#!/usr/bin/ruby

#############################################################################
## Reads the <activity>.daily-archive topic in Kafka and writes out completed
## cycles to the <activity>.cycle-completion topic.
##
## Detecting cycle completions is a little tricky.  The ideal way is to
## take into account both which monitors have data to download for each
## cycle and which monitors are up/reachable at any given moment.  With
## this information, we can decide whether to mark a cycle as complete
## right away or wait until lagging downloads finish (from monitors that
## may not be reachable at the moment but might later).
##
## Unfortunately, we lack both pieces of information, so our cycle
## completion detection has to operate heuristically based on a key
## observation--we know a cycle has ended when we see a daily file for the
## next cycle.  The decision then becomes how long to wait after the start
## of the next cycle (and thus the end of the previous cycle) before
## marking the previous cycle as complete.  This delay determines how long
## we're willing to wait for downloads of the prior cycle to finish before
## declaring the prior cycle complete.  This requires making a tradeoff
## between how tolerant we are of connectivity losses to monitors (for
## daily file downloads) and how much delay we're willing to accept before
## triggering the generation of derived datasets that work on whole cycles
## at a time (e.g., cycle AS links generation).
##
## $Id: team-cycle-completer,v 1.8 2019/03/09 03:51:15 youngh Exp $
#############################################################################

# Append this script's own directory to the library search path, and set the
# program name reported for this process.
$LOAD_PATH << File.dirname(__FILE__)
$PROGRAM_NAME = "ark-team-cycle-completer"

require 'rubygems'
require 'fileutils'
require 'optparse'
require 'ostruct'

require 'rdkafka'

# Built-in defaults.  Each of these can be overridden by the command-line
# option of the matching name declared below.
$options = OpenStruct.new(
  # The number of entries to list in the cycle-completion.log.  This should
  # be large enough to ensure a few weeks worth of cycles in the log.
  count: 60,
  interval: 3600,    # secs between log updates
  delay: 12 * 3600,  # secs to wait after last daily to complete a cycle
  archive_dir: "/data/topology/ark/data/team-probing/list-7.allpref24/team-1/daily",
  activity: "team-probing",
  kafka_host: "127.0.0.1",
  kafka_port: 9092,
  force: true
)

opts = OptionParser.new

# Options that take a value.  The current default is shown in the help text.
opts.on("--count", "=NUM", Integer,
        "number of log entries (#{$options.count})") { |v| $options.count = v }

opts.on("--interval", "=SECS", Integer,
        "secs between log updates (#{$options.interval})") { |v| $options.interval = v }

opts.on("--delay", "=SECS", Integer,
        "secs to wait for lagging daily files before completing a cycle (#{$options.delay})") { |v| $options.delay = v }

opts.on("--archive-dir", "=DIR",
        "daily archive directory (#{$options.archive_dir})") { |v| $options.archive_dir = v }

opts.on("--activity", "=NAME",
        "Ark activity to process (#{$options.activity})") { |v| $options.activity = v }

opts.on("--host", "=IP", "Kafka host (#{$options.kafka_host})") { |v| $options.kafka_host = v }

opts.on("--port", "=NUM", Integer, "Kafka port (#{$options.kafka_port})") { |v| $options.kafka_port = v }

# Boolean switches.  Those without a default above stay nil until given.
opts.on("--dry-run", TrueClass, "only show what will happen") { |v| $options.dry_run = v }

opts.on("--[no-]force", TrueClass, "ignore out of order daily files") { |v| $options.force = v }

opts.on("--log-only", TrueClass,
        "write cycle-completion.log and exit") { |v| $options.log_only = v }

opts.on("-v", "--[no-]verbose", TrueClass,  "show detailed progress") { |v| $options.verbose = v }

# Consume the recognized options, leaving only positional arguments in ARGV.
# On a malformed command line, print the problem plus the usage summary.
begin
  ARGV.replace(opts.parse(ARGV))
rescue OptionParser::ParseError => e
  $stderr.puts "ERROR: #{e}"
  $stderr.puts opts
  exit 1
end

#............................................................................

# Refuse to start if a required directory is missing.
{ "archive" => $options.archive_dir }.each do |label, path|
  next if File.directory?(path)

  $stderr.printf "ERROR: %s directory '%s' doesn't exist\n", label, path
  exit 1
end

# Kafka topic names are derived from the activity name.
$archive_topic = "#{$options.activity}.daily-archive"
$completion_topic = "#{$options.activity}.cycle-completion"

#............................................................................

# Shutdown coordination: while $can_exit_immediately is false, SIGTERM is only
# recorded in $exit_requested instead of terminating the process right away.
$can_exit_immediately = true
$exit_requested = false

Signal.trap("TERM") do
  unless $can_exit_immediately
    $stderr.puts "got SIGTERM; will exit at next opportunity"
    $exit_requested = true
    next
  end

  $stderr.puts "exiting on SIGTERM"
  exit 1
end

# Terminates the process with status 1 if $exit_requested has been set;
# otherwise does nothing and returns nil.
def exit_if_requested
  return unless $exit_requested

  $stderr.puts "exiting on SIGTERM"
  exit 1
end


#===========================================================================

# Consumes the <activity>.daily-archive topic and publishes a message to the
# <activity>.cycle-completion topic for each cycle judged complete.  A cycle
# is judged complete once a daily file of a later cycle has been seen and
# $options.delay seconds have passed since that later cycle's first daily
# file.
class CycleCompleter

  # A cycle awaiting completion: the timestamp carried by the first
  # daily-archive message seen for the cycle, the cycle ID, and the directory
  # of that daily file.
  Cycle = Struct.new :timestamp, :cycle, :cycle_dir

  # Creates the Kafka clients: one consumer for the daily-archive topic, one
  # for the cycle-completion topic, and a producer.  SASL credentials come
  # from the ARK_TEAM_CYCLE_COMPLETER_USERNAME / _PASSWORD environment
  # variables, each falling back to the program name ($0) when unset.
  def initialize
    host_port = sprintf "%s:%d", $options.kafka_host, $options.kafka_port
    @last_recovered_cycle = nil
    @pending_cycles = []  # [ Cycle ]
    @last_log_write = Time.now.to_i

    collector_config = {
      :"bootstrap.servers" => host_port,
      :"client.id" => "ark-team-cycle-completer",
      :"sasl.mechanism" => "SCRAM-SHA-256",
      :"sasl.username" => (ENV['ARK_TEAM_CYCLE_COMPLETER_USERNAME'] || $0),
      :"sasl.password" => (ENV['ARK_TEAM_CYCLE_COMPLETER_PASSWORD'] || $0),
      :"security.protocol" => "SASL_PLAINTEXT",
    }
    producer_config = collector_config
    consumer_config = collector_config.merge({
      :"group.id" => "ark-team-cycle-completer",
      :"enable.partition.eof" => true,
      :"enable.auto.commit" => false,
      :"auto.offset.reset" => "earliest",
    })
    @archive_consumer = Rdkafka::Config.new(consumer_config).consumer
    @completion_consumer = Rdkafka::Config.new(consumer_config).consumer
    @producer = Rdkafka::Config.new(producer_config).producer
  end


  # Repositions +consumer+ up to +count+ messages before its committed offset
  # on partition 0 of +topic+.
  #
  # Returns the committed offset (0 when nothing has been committed yet).
  # Raises Rdkafka::RdkafkaError for any polling error other than end of
  # partition.
  def seek(consumer, topic, count)
    # poll to get the current offset (and ignore the returned message for now,
    # we'll work our way back up to it later)
    committed = nil
    loop do
      begin
        consumer.poll(1000)
        committed = consumer.committed
        break unless committed.empty?
      rescue Rdkafka::RdkafkaError => error
        raise unless error.is_partition_eof?
      end
    end

    # get the current offset for what should be 1 topic, with 1 partition
    offset = committed.to_h[topic][0].offset.to_i

    # seek() takes a message, so build a fake one with the desired offset.
    # Never go below 0: when fewer than +count+ messages precede the committed
    # offset, a negative target would not mean "start of the partition" (Kafka
    # reserves negative offsets for special positions), so start at offset 0.
    fake_msg = OpenStruct.new(
      topic: topic,
      partition: 0,
      offset: [offset - count, 0].max
    )

    # jump back the desired number of messages
    consumer.seek(fake_msg)

    offset
  end


  # Determines the most recently completed cycle by reading the tail of the
  # cycle-completion topic, and stores it in @last_recovered_cycle (left nil
  # when no completion message is found).
  def recover_state
    puts "Recovering cycle completion state ..."

    @completion_consumer.subscribe($completion_topic)

    # seek back 1 message to get the last cycle id
    seek(@completion_consumer, $completion_topic, 1)

    # TODO just poll for a single message?
    # Consume all recent messages until eof; the last one read wins.
    each_until_eof(@completion_consumer) do |message|
      printf "%d: %p\n", message.offset, message.payload
      _timestamp, cycle, _path = message.payload.split("|")
      @last_recovered_cycle = cycle.to_i
    end

    @completion_consumer.unsubscribe

    printf "Note: Last completed cycle was %d.\n", (@last_recovered_cycle || 0)
  end


  # dca-us.team-probing.c006950.20180916.warts.gz
  # Captures: monitor, activity, cycle ID (the digits after "c"), optional ".gz".
  DAILY_RE = /^([a-z]{3}\d?-[a-z]{2})\.([a-z0-9-]+)\.c(\d+)\.\d{8}\.warts(\.gz)?$/

  # Main loop: consumes "timestamp|path" messages from the daily-archive topic
  # forever, feeding each daily file of the configured activity to
  # process_daily_file and committing the consumer offset afterwards.
  #
  # Messages whose payload is malformed or whose file name does not match
  # DAILY_RE are reported with an "ERROR:" line and skipped.  An out-of-order
  # cycle is skipped when $options.force is set, otherwise the process exits.
  def process_daily_files
    puts "Processing '#{$archive_topic}' ..."
    last_cycle = nil

    @archive_consumer.subscribe($archive_topic)

    loop do
      # End of partition only means "caught up"; start polling again.
      each_until_eof(@archive_consumer) do |message|
        # A payload that is nil or has no "|" leaves path nil; treat it like
        # any other unrecognized message instead of crashing on it.
        timestamp, path = message.payload.to_s.split("|")
        m = path && DAILY_RE.match(File.basename(path))
        unless m
          printf "ERROR: %d: %p\n", message.offset, message.payload
          next
        end

        printf "%d: %p\n", message.offset, message.payload
        _monitor, activity, cycle = m.captures
        cycle = cycle.to_i
        if activity != $options.activity
          @archive_consumer.commit
          next
        end

        if last_cycle && cycle < last_cycle
          # Note: Because daily files may be delayed in getting pushed
          #       out (due to, for example, problems downloading files
          #       from monitors), it's relatively normal for daily files
          #       to be out of order.
          $stderr.printf "WARNING: daily file out of order: found cycle %d"+
                         " which is < previous %d\n", cycle, last_cycle
          next if $options.force
          exit 1
        end
        last_cycle = cycle

        # Defer SIGTERM while the message is handled and its offset committed.
        $can_exit_immediately = false
        process_daily_file timestamp.to_i, cycle, File.dirname(path)
        commit_archive_offset
        $can_exit_immediately = true
        exit_if_requested()
      end
    end
  end


  # Records the daily file's cycle as pending and publishes a completion
  # message for every pending cycle whose successor started at least
  # $options.delay seconds ago.
  #
  # timestamp - Integer timestamp taken from the daily-archive message.
  # cycle     - Integer cycle ID parsed from the daily file name.
  # cycle_dir - directory containing the daily file.
  #
  # Cycles at or below the recovered last-completed cycle are skipped.  With
  # $options.dry_run set, completions are printed but not published.
  def process_daily_file(timestamp, cycle, cycle_dir)
    if @last_recovered_cycle && cycle <= @last_recovered_cycle
      printf " ... skipping old cycle %d.\n", cycle
      return
    end

    if @pending_cycles.empty? || @pending_cycles.last.cycle != cycle
      @pending_cycles << Cycle.new(timestamp, cycle, cycle_dir)
    end

    now = Time.now.to_i
    while @pending_cycles.size >= 2
      # The oldest pending cycle ended when its successor's first daily
      # file appeared.
      successor_start = @pending_cycles[1].timestamp
      break if now - successor_start < $options.delay

      completion_timestamp = successor_start + $options.delay
      completed = @pending_cycles.shift
      p completed
      message = sprintf "%d|%d|%s", completion_timestamp,
                        completed.cycle, completed.cycle_dir
      printf "completed: %s\n", message
      next if $options.dry_run

      # produce() is asynchronous; wait for the delivery report so that the
      # caller does not commit the archive offset for a completion message
      # that was never actually delivered.
      @producer.produce(topic: $completion_topic, payload: message).wait

      if now - @last_log_write >= $options.interval
        @last_log_write = now
        write_completion_log()
      end
    end
  end


  # Rewrites <archive_dir>/cycle-completion.log with up to $options.count of
  # the messages preceding the completion consumer's committed offset, plus
  # any newer ones.  The file is written under a temporary name and then moved
  # into place; the consumer offset is committed only afterwards.
  def write_completion_log
    entries = []
    need_commit = false

    @completion_consumer.subscribe($completion_topic)

    # jump back some messages so there is data to show in the log
    offset = seek(@completion_consumer, $completion_topic, $options.count)

    each_until_eof(@completion_consumer) do |message|
      printf "%d: %p\n", message.offset, message.payload
      entries << message.payload.split("|")
      need_commit = true if message.offset >= offset
    end

    log_path = $options.archive_dir + "/cycle-completion.log"

    tmp_log_path = log_path + ".updating"
    File.open(tmp_log_path, "w") do |out|
      gen_date = Time.now.to_s
      out.print <<EOF
# Format: 1
# Fields: timestamp cycle cycle_dir
# Generated: #{gen_date}
# --------------------------------------------------------------------------
# Note: There may be files for more than one cycle in a given cycle_dir.
#       Check the cycle ID encoded in daily file names, and be sure to only
#       use daily files for completed cycles in a given cycle_dir.
# --------------------------------------------------------------------------
EOF
      entries.each do |timestamp, cycle, cycle_dir|
        out.printf "%d\t%d\t%s\n", timestamp, cycle, cycle_dir
      end
    end

    command = sprintf "mv %s %s", tmp_log_path, log_path
    sh command

    # only commit offsets once the file is written and moved into place,
    # and only if there were new messages otherwise it triggers an error
    @completion_consumer.commit if need_commit

    @completion_consumer.unsubscribe
  end


  # Runs +command+ through the shell.  The command is echoed when verbose or
  # dry-run is enabled and is not executed in dry-run mode.  Exits the process
  # with status 1 if the command fails.
  def sh(command)
    puts command if $options.verbose || $options.dry_run
    return if $options.dry_run

    unless system command
      msg = sprintf "couldn't execute command: %p: %s", $?, command
      $stderr.puts "ERROR: " + msg
      exit 1
    end
  end


  private

  # Yields every message from +consumer+ until it signals end of partition;
  # any other Kafka error is re-raised.
  def each_until_eof(consumer, &block)
    consumer.each(&block)
  rescue Rdkafka::RdkafkaError => error
    raise unless error.is_partition_eof?
  end

  # Commits the archive consumer's offset, tolerating a :no_offset error.
  def commit_archive_offset
    # TODO find the root cause. Possibly related to interleaving
    # consumers, or the eof exception the completion_consumer gets
    # (though the consumers should be independent...)
    # for now, if commit fails due to no offset let it slide
    @archive_consumer.commit
  rescue Rdkafka::RdkafkaError => error
    raise if error.code != :no_offset
    puts "TEMP: ignoring no_offset when committing archive_consumer"
  end

end


#############################################################################
# MAIN
#############################################################################

completer = CycleCompleter.new

# --log-only just regenerates cycle-completion.log; otherwise recover the last
# completed cycle and then process the daily-archive topic.
steps = if $options.log_only
          [:write_completion_log]
        else
          [:recover_state, :process_daily_files]
        end
steps.each { |step| completer.public_send(step) }
