#!/usr/bin/ruby

#############################################################################
## Aggregates hourly team-probing scamper traces into daily files.
##
## Although daily files are created precisely on the day, the contents of
## daily files are not precise.  It's possible for a daily file to contain
## traces started in the previous day because scamper doesn't write traces in
## sorted order (and we don't sort traces, either, during processing).
## Normally (always?), this lagging will only happen for traces started
## within the last 2 minutes of the previous day.  This also means that
## out-of-order traces only occur near the beginning of each daily file.
##
## Despite the slight spilling over of traces between daily files, the
## first trace in each daily file is guaranteed to start in the day
## defined by the file.
##
##
## Some other assumptions:
##
##   * For simplicity, a single warts file can only contain traces from a
##     single team and a single list.
##
##     Also, a warts file must contain traces from only a single cycle.
##     This requirement is needed to ensure that generated daily files
##     also have a single cycle per file, which is depended upon by
##     team-shelver.  If any of these constraints aren't met, then traces
##     won't be properly divided up by team-shelver into cycles (in
##     particular, traces from the beginning hour of the next cycle may
##     show up at the end of the previous cycle).
##
## $Id: team-daily,v 1.13 2021/02/13 06:06:52 youngh Exp $
#############################################################################

# Let sibling files in this script's directory be required, and set the
# program name ($0), which DailyCreator also uses as its fallback SASL
# credential.
$LOAD_PATH.push File.dirname(__FILE__)
$PROGRAM_NAME = "ark-team-daily"

require 'rubygems'
require 'fileutils'
require 'optparse'
require 'ostruct'

require 'rdkafka'

# Defaults for every command-line setting; the options declared below
# overwrite individual fields.  dry_run, force and verbose stay unset (nil)
# unless given on the command line.
$options = OpenStruct.new(
  activity: "team-probing",
  kafka_host: "127.0.0.1",
  kafka_port: 9092,
  staging: "/staging",
  delete_input: true
)

opts = OptionParser.new do |parser|
  parser.on("--activity", "=NAME",
            "Ark activity to process (#{$options.activity})") do |name|
    $options.activity = name
  end

  parser.on("--staging", "=DIR",
            "staging dir for Ark data (#{$options.staging})") do |dir|
    $options.staging = dir
  end

  parser.on("--host", "=IP",
            "Kafka host (#{$options.kafka_host})") do |ip|
    $options.kafka_host = ip
  end

  parser.on("--port", "=NUM", Integer,
            "Kafka port (#{$options.kafka_port})") do |num|
    $options.kafka_port = num
  end

  parser.on("--[no-]delete", TrueClass,
            "delete input files after sorting? (#{$options.delete_input})") do |flag|
    $options.delete_input = flag
  end

  parser.on("--dry-run", TrueClass, "only show what will happen") do |flag|
    $options.dry_run = flag
  end

  parser.on("--force", TrueClass, "ignore out of order daily files") do |flag|
    $options.force = flag
  end

  parser.on("-v", "--[no-]verbose", TrueClass,  "show detailed progress") do |flag|
    $options.verbose = flag
  end
end

# Strip the recognized options out of ARGV, leaving only the remaining
# arguments; any parse error prints the usage summary and exits with 1.
begin
  remaining_args = opts.parse(*ARGV)
  ARGV.replace remaining_args
rescue OptionParser::ParseError => parse_error
  $stderr.puts "ERROR: " + parse_error.to_s
  $stderr.puts opts
  exit 1
end

#............................................................................

# Resolve the staging directory and derive the per-activity hourly/daily
# directories from it.
$options.staging = File.expand_path $options.staging

$hourly_dir = sprintf "%s/%s.hourly", $options.staging, $options.activity

$daily_dir = sprintf "%s/%s.daily", $options.staging, $options.activity

# Validate the input directories BEFORE creating anything.  mkpath creates
# every missing parent, so creating the daily directory first would silently
# create a missing (e.g. mistyped) staging directory and the "Ark staging"
# check below could never fail.
[["Ark staging", $options.staging],
 ["hourly", $hourly_dir]].each do |label, path|
  unless File.directory? path
    $stderr.printf "ERROR: %s directory '%s' doesn't exist\n", label, path
    exit 1
  end
end

FileUtils.mkpath $daily_dir unless File.directory? $daily_dir

# Kafka topic names: hourly file names are consumed from the first, created
# daily file names are published to the second.
$hourly_topic = $options.activity + ".hourly"
$daily_topic = $options.activity + ".daily"

#............................................................................

# SIGTERM handling state:
#   $can_exit_immediately -- when true, the handler exits on the spot
#   $exit_requested       -- set by the handler when the exit is deferred;
#                            acted upon by exit_if_requested
$can_exit_immediately = false
$exit_requested = false

Signal.trap("TERM") do
  unless $can_exit_immediately
    # Defer: only record the request so the main flow can stop later.
    $stderr.puts "got SIGTERM; will exit at next opportunity"
    $exit_requested = true
    next
  end

  $stderr.puts "exiting on SIGTERM"
  exit 1
end

# Exits with status 1 if the SIGTERM handler has set $exit_requested;
# otherwise does nothing and returns nil.
def exit_if_requested
  return unless $exit_requested

  $stderr.puts "exiting on SIGTERM"
  exit 1
end


#===========================================================================

# Consumes hourly warts file names from the hourly Kafka topic, concatenates
# each monitor's hourly files into one daily file with sc_wartscat once a
# file for a later <cycle>.<yyyymmdd> shows up, and publishes the name of
# every created daily file to the daily topic.
class DailyCreator

  # Sets up one Kafka consumer and one producer sharing the same connection
  # settings, plus the empty per-monitor state.
  def initialize
    host_port = sprintf "%s:%d", $options.kafka_host, $options.kafka_port
    # NOTE(review): when the ARK_TEAM_DAILY_* environment variables are
    # unset, the program name ($0) is used as both username and password.
    collector_config = {
        :"bootstrap.servers" => host_port,
        :"client.id" => "ark-team-daily",
        :"sasl.mechanism" => "SCRAM-SHA-256",
        :"sasl.username" => (ENV['ARK_TEAM_DAILY_USERNAME'] || $0),
        :"sasl.password" => (ENV['ARK_TEAM_DAILY_PASSWORD'] || $0),
        :"security.protocol" => "SASL_PLAINTEXT",
    }
    producer_config = collector_config
    consumer_config = collector_config.merge({
        :"group.id" => "ark-team-daily",
        :"enable.auto.commit" => false,
        :"auto.offset.reset" => "earliest",
    })
    @consumer = Rdkafka::Config.new(consumer_config).consumer
    @producer = Rdkafka::Config.new(producer_config).producer

    @current_daily = {}  # monitor => <cycle>.<yyyymmdd>
    @daily_files = Hash.new { |h,k| h[k] = [] }  # monitor => [ file ]
  end


  # dca-us.team-probing.c006950.20180916.warts
  #
  # Anchored with \A/\z (not ^/$) so that a payload with embedded newlines
  # cannot match on just one of its lines.
  DAILY_RE = /\A([a-z]{3}\d?-[a-z]{2})\.[a-z0-9-]+\.(c\d+\.\d{8})\.warts\z/

  # Reads the daily topic and records, per monitor, the <cycle>.<yyyymmdd>
  # of the latest daily file announced there.
  #
  # Exits with status 1 on a payload that doesn't look like a daily file
  # name, or (unless --force) on a daily file that isn't newer than the
  # previous one for the same monitor.
  #
  # NOTE(review): reading stops at the first poll that returns nothing
  # within 250 ms; confirm this cannot happen before the topic has been
  # read completely (e.g. while the consumer is still joining its group).
  def recover_state
    puts "Recovering daily state ..."

    @consumer.subscribe($daily_topic)
    n = 0

    loop do
      message = @consumer.poll(250)
      break unless message

      n += 1
      printf ".... %d\n", n if n % 1000 == 0

      m = DAILY_RE.match message.payload
      unless m
        $stderr.printf "ERROR: %d: %p\n", message.offset, message.payload
        exit 1
      end

      monitor, cycle_yyyymmdd = m.captures
      old_value = @current_daily[monitor]
      if old_value && cycle_yyyymmdd <= old_value
        $stderr.printf "ERROR: daily files out of order for %s:" +
                       " found %s, which is <= previous %s\n", monitor,
                       cycle_yyyymmdd, old_value
        next if $options.force
        exit 1
      end

      @current_daily[monitor] = cycle_yyyymmdd
    end

    @consumer.commit if n > 0 && !$options.dry_run

    @consumer.unsubscribe

    # Report the topic actually read rather than a hard-coded name, so the
    # message stays correct when --activity is not the default.
    printf "%d files in %s.\n", n, $daily_topic
    puts "\nCurrent daily state:"
    @current_daily.sort.each do |monitor, cycle_date|
      printf "  % 7s %s\n", monitor, cycle_date
    end
  end


  # dca-us.team-probing.c006950.20180916-00.warts
  #
  # Anchored with \A/\z for the same reason as DAILY_RE; the matched payload
  # is later used verbatim as a file name.
  HOURLY_RE = /\A([a-z]{3}\d?-[a-z]{2})\.([a-z0-9-]+)\.(c\d+\.\d{8})(-\d{2})\.warts\z/

  # Consumes the hourly topic until the process exits, handling one hourly
  # file name at a time.
  #
  # The consumer is polled explicitly (rather than iterated with #each) so
  # that a pending SIGTERM is checked between polls: exit_if_requested runs
  # before every poll, i.e. after each handled message and also while the
  # topic is idle.
  def process_hourly_files
    current_hourly = {}  # monitor => <cycle>.<yyyymmdd>-<hh>

    puts "Processing '#{$hourly_topic}' ..."
    @consumer.subscribe($hourly_topic)
    loop do
      exit_if_requested
      message = @consumer.poll(250)
      process_hourly_message(message, current_hourly) if message
    end
  end


  # Concatenates the hourly files collected for +monitor+ into the daily
  # file +outfile+ under <daily dir>/<monitor>/.
  #
  # Hourly files that no longer exist are ignored.  The output is first
  # written to a dot-prefixed name in the same directory and then moved
  # into place.  Input files are removed afterwards when deletion is
  # enabled.
  #
  # Returns true if the daily file was created (or would have been, in
  # dry-run mode); false if no input file exists or if the daily file
  # already exists.
  def create_daily_file(monitor, outfile)
    paths = @daily_files[monitor].map { |file|
      File.join($hourly_dir, monitor, file)
    }
    if $options.verbose
      puts outfile + ":"
      puts paths.join("\n")
      puts
    end

    # File.exist? -- File.exists? no longer exists as of Ruby 3.2.
    present, missing = paths.partition { |path| File.exist? path }
    missing.each do |path|
      printf "Ignoring missing hourly file '%s'\n", path
    end

    return false if present.empty?

    outdir = "#{$daily_dir}/#{monitor}"
    FileUtils.mkdir outdir unless File.directory? outdir

    outpath = outdir + "/" + outfile
    if File.exist? outpath
      printf "Skipping: daily file '%s' already exists.\n", outpath
      delete_input_files_if_requested present
      return false  # can happen on the first daily file after restart
    end

    tmppath = File.join(outdir, "." + outfile)
    sh "sc_wartscat", "-o", tmppath, *present
    sh "mv", tmppath, outpath

    delete_input_files_if_requested present
    true
  end


  # Removes the given hourly files unless input deletion was disabled
  # (--no-delete).
  def delete_input_files_if_requested(paths)
    return unless $options.delete_input

    # Use -f to ensure success exit status even when a file is missing
    # (which can happen).
    sh "rm", "-f", *paths
  end


  # Runs an external command, exiting with status 1 if it fails.
  #
  # The command may be given as separate arguments (program, arg, ...), in
  # which case it is executed directly and no argument is interpreted by a
  # shell, or as a single string, which is handed to Kernel#system as is.
  # In dry-run mode the command is only printed.
  def sh(*command)
    printable = command.join(" ")
    puts printable if $options.verbose || $options.dry_run
    return if $options.dry_run

    return if system(*command)

    msg = sprintf "couldn't execute command: %p: %s", $?, printable
    $stderr.puts "ERROR: " + msg
    exit 1
  end


  private

  # Handles one message from the hourly topic: queues the named hourly file
  # for its monitor, and when the file belongs to a later <cycle>.<yyyymmdd>
  # than the monitor's current one, first creates and announces the daily
  # file for the files queued so far.
  #
  # +current_hourly+ maps monitor => last <cycle>.<yyyymmdd>-<hh> seen and
  # is used to skip hourly files that arrive out of order.
  def process_hourly_message(message, current_hourly)
    m = HOURLY_RE.match message.payload
    unless m
      printf "ERROR: %d: %p\n", message.offset, message.payload
      commit_offsets
      return
    end

    printf "%d: %p\n", message.offset, message.payload
    monitor, activity, cycle_yyyymmdd, hh = m.captures
    return unless activity == $options.activity

    cycle_yyyymmdd_hh = cycle_yyyymmdd + hh
    old_value = current_hourly[monitor]
    if old_value && cycle_yyyymmdd_hh <= old_value
      $stderr.printf "WARNING: hourly files out of order for %s:" +
                     " found %s, which is <= previous %s\n", monitor,
                     cycle_yyyymmdd_hh, old_value
      commit_offsets
      return   # XXX perhaps exit instead? or only continue on --force?
    end
    current_hourly[monitor] = cycle_yyyymmdd_hh

    file_daily = cycle_yyyymmdd
    current_daily = @current_daily[monitor]
    if current_daily.nil? || file_daily == current_daily
      @current_daily[monitor] = file_daily unless current_daily
      @daily_files[monitor] << message.payload

    elsif file_daily > current_daily
      outfile = sprintf "%s.%s.%s.warts", monitor, activity, current_daily
      if create_daily_file(monitor, outfile) && !$options.dry_run
        # Wait for the delivery report so the hourly offset is not
        # committed before the daily announcement has been delivered.
        @producer.produce(topic: $daily_topic, payload: outfile).wait
      end

      @current_daily[monitor] = file_daily
      @daily_files[monitor] = [message.payload]

    else # file_daily < current_daily
      puts " ... skipping old file."
    end

    commit_offsets
  end


  # Commits the consumer's offsets, except in dry-run mode.
  def commit_offsets
    @consumer.commit unless $options.dry_run
  end

end


#############################################################################
# MAIN
#############################################################################

# Recover the per-monitor state from the daily topic first, then start
# consuming the hourly topic.
daily_creator = DailyCreator.new
daily_creator.recover_state
daily_creator.process_hourly_files
