#!/usr/bin/ruby

#############################################################################
## Aggregates downloaded team-probing scamper traces into hourly files.
##
## This writes fully sorted but uncompressed hourly files to a staging area.
## It is the job of team-shelver to compress these sorted hourly files and
## push them to the live hourly directory.
##
## Although hourly files are created precisely on the hour, the contents of
## hourly files are not precise.  It's possible for an hourly file to contain
## traces started in the previous hour because scamper doesn't write traces in
## sorted order (and we don't sort traces, either, during processing).
## Normally (always?), this lagging will only happen for traces started
## within the last 2 minutes of the previous hour.  This also means that
## out-of-order traces only occur near the beginning of each hourly file.
##
## Despite the slight spilling over of traces between hourly files, the
## first trace in each hourly file is guaranteed to start in the hour
## defined by the file.
##
##
## Some other assumptions:
##
##   * For simplicity, a single warts file can only contain traces from a
##     single team and a single list.
##
##     Also, a warts file must contain traces from only a single cycle.
##     This requirement is needed to ensure that generated hourly files
##     also have a single cycle per file, which is depended upon by
##     team-shelver.  If any of these constraints aren't met, then traces
##     won't be properly divided up by team-shelver into cycles (in
##     particular, traces from the beginning hour of the next cycle may
##     show up at the end of the previous cycle).
##
## $Id: team-hourly,v 1.24 2019/03/07 21:23:45 youngh Exp $
#############################################################################

# Make sibling files loadable and set the process name shown by ps
# (HourlyCreator also uses it as the default Kafka SASL credentials).
$LOAD_PATH << File.dirname(__FILE__)
$PROGRAM_NAME = "ark-team-hourly"

require 'rubygems'
require 'fileutils'
require 'optparse'
require 'ostruct'
require 'shellwords'

require 'rdkafka'

# Default values for every command-line option.  The help strings below
# interpolate these defaults, so they must be set before the parser is built.
$options = OpenStruct.new(
  activity: "team-probing",
  kafka_host: "127.0.0.1",
  kafka_port: 9092,
  staging: "/staging",
  seqnum: 1,
  delete_input: true
)

opts = OptionParser.new do |parser|
  parser.on("--activity", "=NAME",
            "Ark activity to process (#{$options.activity})") { |v| $options.activity = v }

  parser.on("--staging", "=DIR",
            "staging dir for Ark data (#{$options.staging})") { |v| $options.staging = v }

  parser.on("--host", "=IP",
            "Kafka host (#{$options.kafka_host})") { |v| $options.kafka_host = v }

  parser.on("--port", "=NUM", Integer,
            "Kafka port (#{$options.kafka_port})") { |v| $options.kafka_port = v }

  parser.on("--seqnum", "=NUM",
            "starting sequence num for hourly files (#{$options.seqnum})") do |v|
    $options.seqnum = v.to_i
  end

  parser.on("--[no-]delete", TrueClass,
            "delete input files after sorting? (#{$options.delete_input})") do |v|
    $options.delete_input = v
  end

  parser.on("--dry-run", TrueClass, "only show what will happen") { |v| $options.dry_run = v }

  parser.on("--force", TrueClass, "ignore out of order hourly files") { |v| $options.force = v }

  parser.on("-v", "--[no-]verbose", TrueClass, "show detailed progress") { |v| $options.verbose = v }
end

# Consume recognized options from ARGV; on a bad option print the error and
# the usage text, then exit with status 1.
begin
  ARGV.replace opts.parse(*ARGV)
rescue OptionParser::ParseError => e
  $stderr.puts "ERROR: #{e}"
  $stderr.puts opts
  exit 1
end

#............................................................................

# Resolve the staging area and derive the per-activity download and hourly
# directories, plus the Kafka topic that announces new hourly files.
$options.staging = File.expand_path $options.staging

$download_dir = sprintf "%s/%s", $options.staging, $options.activity

$hourly_dir = sprintf "%s/%s.hourly", $options.staging, $options.activity

# Validate the pre-existing directories *before* creating the hourly
# directory: FileUtils.mkpath creates missing parents, so doing it first would
# silently create a missing staging directory and defeat this check.
[["Ark staging", $options.staging],
 ["download", $download_dir]].each do |label, path|
  unless File.directory? path
    $stderr.printf "ERROR: %s directory '%s' doesn't exist\n", label, path
    exit 1
  end
end

FileUtils.mkpath $hourly_dir unless File.directory? $hourly_dir

$hourly_topic = $options.activity + ".hourly"

#............................................................................

# Cooperative SIGTERM handling.  The handler exits on the spot only when
# $can_exit_immediately is true (nothing visible in this file sets it);
# otherwise it only sets $exit_requested, which exit_if_requested checks.
$exit_requested = false
$can_exit_immediately = false

trap("TERM") do
  unless $can_exit_immediately
    $stderr.puts "got SIGTERM; will exit at next opportunity"
    $exit_requested = true
    next
  end

  $stderr.puts "exiting on SIGTERM"
  exit 1
end

# Exits with status 1 if a deferred SIGTERM is pending; otherwise returns nil.
def exit_if_requested
  return unless $exit_requested

  $stderr.puts "exiting on SIGTERM"
  exit 1
end


#===========================================================================

# Consumes "ark-downloaded-data" notifications, groups each monitor's
# downloaded warts files into hourly bins, concatenates every completed bin
# into an uncompressed hourly file under $hourly_dir, and announces each new
# hourly file on the $hourly_topic Kafka topic.
class HourlyCreator

  # Input files smaller than this many bytes are skipped instead of being
  # decompressed (presumably they hold no traces -- TODO confirm).
  MIN_GZ_FILE_SIZE = 512

  # Valid monitor names.  This is the same pattern HOURLY_RE requires for the
  # monitor prefix of an hourly file name.  Monitor names arrive in Kafka
  # payloads and are used in filesystem paths and shell commands, so they are
  # validated before use.
  MONITOR_RE = /\A[a-z][a-z0-9-]{0,31}\z/

  # Builds the Kafka consumer and producer from $options.  SASL credentials
  # come from ARK_TEAM_HOURLY_USERNAME / ARK_TEAM_HOURLY_PASSWORD and default
  # to the program name ($0).
  def initialize
    host_port = sprintf "%s:%d", $options.kafka_host, $options.kafka_port
    collector_config = {
        :"bootstrap.servers" => host_port,
        :"client.id" => "ark-team-hourly",
        :"sasl.mechanism" => "SCRAM-SHA-256",
        :"sasl.username" => (ENV['ARK_TEAM_HOURLY_USERNAME'] || $0),
        :"sasl.password" => (ENV['ARK_TEAM_HOURLY_PASSWORD'] || $0),
        :"security.protocol" => "SASL_PLAINTEXT",
    }
    producer_config = collector_config
    consumer_config = collector_config.merge({
        :"group.id" => "ark-team-hourly",
        :"enable.auto.commit" => false,
        :"auto.offset.reset" => "earliest",
    })
    @consumer = Rdkafka::Config.new(consumer_config).consumer
    @producer = Rdkafka::Config.new(producer_config).producer

    @current_hourly = {}  # monitor => <cycle>.<yyyymmdd-hh>
    @hourly_files = Hash.new { |h,k| h[k] = [] }  # monitor => [ file ]
  end


  # dca-us.team-probing.c006950.20180916-00.warts
  # \A/\z (not ^/$) so that a multi-line payload can never match.
  HOURLY_RE = /\A([a-z][a-z0-9-]{0,31})\.[a-z0-9-]+\.(c\d+\.\d{8}-\d{2})\.warts\z/

  # Rebuilds @current_hourly (monitor => latest "<cycle>.<yyyymmdd-hh>") by
  # reading the hourly topic until a 250 ms poll returns nothing.  Exits with
  # status 1 on an unparsable entry, and also on an out-of-order entry unless
  # --force is given (then the entry is ignored).
  def recover_state
    puts "Recovering hourly state ..."

    n = 0
    topic = $hourly_topic
    @consumer.subscribe(topic)

    # NOTE(review): subscribe() is asynchronous; if the group assignment is not
    # complete by the first 250 ms poll, this loop ends at once and nothing is
    # recovered -- confirm this cannot happen in practice.
    loop do
      message = @consumer.poll(250)
      break unless message

      n += 1
      printf ".... %d\n", n if (n % 1000).zero?

      m = HOURLY_RE.match message.payload
      unless m
        $stderr.printf "ERROR: %d: %p\n", message.offset, message.payload
        exit 1
      end

      monitor, cycle_yyyymmdd_hh = m.captures
      old_value = @current_hourly[monitor]
      if old_value && cycle_yyyymmdd_hh <= old_value
        $stderr.printf "ERROR: hourly files out of order for %s:" +
                       " found %s, which is <= previous %s\n", monitor,
                       cycle_yyyymmdd_hh, old_value
        next if $options.force
        exit 1
      end

      @current_hourly[monitor] = cycle_yyyymmdd_hh
    end

    # NOTE(review): committing means a later restart resumes this topic after
    # these messages, so a monitor with no new hourly file since then will not
    # be recovered -- confirm this is intended.
    @consumer.commit if n > 0 && !$options.dry_run

    @consumer.unsubscribe

    printf "%d files in %s.\n", n, topic
    puts "\nCurrent hourly state:"
    @current_hourly.sort.each do |monitor, cycle_date|
      printf "  % 7s %s\n", monitor, cycle_date
    end
  end


  # dca-us.team-probing.1537083276.c006950.20180916.warts.gz
  # \A/\z (not ^/$): the matched name is later used in shell commands.
  DOWNLOADED_RE = /\A[a-z][a-z0-9-]{0,31}\.[a-z0-9-]+\.(\d+)\.(c\d+)\.\d{8}\.warts\.gz\z/

  # Consumes "ark-downloaded-data" forever, committing after every message
  # (except in dry-run mode).  Polling explicitly, rather than using
  # Consumer#each (which never returns), lets a deferred SIGTERM be honored
  # between messages and while idle.  Only returns via exit.
  def process_downloaded_data
    current_downloaded = {}  # monitor => <cycle>.<timestamp>

    puts "Processing 'downloaded-data' ..."
    @consumer.subscribe("ark-downloaded-data")

    loop do
      exit_if_requested()

      message = @consumer.poll(250)
      next unless message

      handle_downloaded_message(message, current_downloaded)
      @consumer.commit unless $options.dry_run
    end
  end


  # Concatenates the queued downloaded files of +monitor+ into +outfile+
  # under $hourly_dir/<monitor>.  Input files are deleted afterwards if
  # --delete is in effect.
  #
  # Returns true if the hourly file was (or, in dry-run mode, would be)
  # created.  Returns false if +outfile+ already exists (can happen for the
  # first hourly file per monitor after a restart) or if no usable input
  # remained.  Exits with status 1 if any shell command fails.
  def create_hourly_file(monitor, outfile)
    paths = @hourly_files[monitor].map { |file|
      sprintf "%s/%s/%s", $download_dir, monitor, file
    }
    if $options.verbose
      puts outfile + ":"
      puts paths.join("\n")
      puts
    end

    outdir = "#{$hourly_dir}/#{monitor}"
    FileUtils.mkdir outdir unless File.directory? outdir

    outpath = outdir + "/" + outfile
    # File.exist? -- the File.exists? alias was removed in Ruby 3.2.
    if File.exist? outpath
      printf "Skipping: hourly file '%s' already exists.\n", outpath
      delete_input_files_if_requested paths
      return false  # can happen on the first hourly file per mon after restart
    end

    tmp_paths = []
    paths.each do |path|
      unless File.exist? path
        printf "WARNING: missing file '%s'\n", path
        next
      end

      stat = File.stat path
      if stat.size < MIN_GZ_FILE_SIZE
        printf "... skipping %d-byte file '%s'\n", stat.size, path
        next
      end

      # Hidden (dot-prefixed) decompressed copy next to the output.
      tmp_path = sprintf "%s/.%s", outdir, File.basename(path, ".gz")
      tmp_paths << tmp_path

      sh sprintf("gzip -dc %s >%s", Shellwords.escape(path),
                 Shellwords.escape(tmp_path))
    end

    if tmp_paths.empty?
      delete_input_files_if_requested paths
      return false
    end

    # Build under a hidden name, then rename into place.
    tmp_outpath = "#{outdir}/.#{outfile}"
    sh sprintf("sc_wartscat -o %s %s", Shellwords.escape(tmp_outpath),
               Shellwords.join(tmp_paths))
    sh sprintf("mv %s %s", Shellwords.escape(tmp_outpath),
               Shellwords.escape(outpath))
    sh "rm " + Shellwords.join(tmp_paths)

    delete_input_files_if_requested paths
    true
  end


  # Removes +paths+ when --delete is in effect (the default).  Does nothing
  # for an empty list.
  def delete_input_files_if_requested(paths)
    return unless $options.delete_input
    return if paths.empty?

    # Use -f to ensure success exit status even when a file is missing
    # (which can happen).
    sh "rm -f " + Shellwords.join(paths)
  end


  # Runs +command+ through the shell.  It is echoed when --verbose or
  # --dry-run is given, and skipped entirely in dry-run mode.  Exits with
  # status 1 if the command fails.  Callers must shell-escape any
  # interpolated values.
  def sh(command)
    puts command if $options.verbose || $options.dry_run
    return if $options.dry_run

    unless system command
      msg = sprintf "couldn't execute command: %p: %s", $?, command
      $stderr.puts "ERROR: " + msg
      exit 1
    end
  end


  private

  # Handles one "ark-downloaded-data" message with payload
  # "<monitor>|<activity>|<file>".  Messages for other activities are ignored.
  # Malformed messages (bad file name or monitor name) and out-of-order
  # messages are reported and skipped.  A file in the monitor's current hour
  # is queued.  A file in a later hour first flushes the queued files into an
  # hourly file and then starts a new bin.  A file in an earlier hour is
  # skipped.
  #
  # current_downloaded: monitor => "<cycle>.<timestamp>" of the newest
  # file accepted so far; updated in place.
  def handle_downloaded_message(message, current_downloaded)
    monitor, activity, file = message.payload.split("|")
    return if activity != $options.activity

    m = DOWNLOADED_RE.match file
    valid = m && MONITOR_RE.match?(monitor)
    unless valid
      printf "ERROR: %d: %p\n", message.offset, message.payload
      return
    end

    printf "%d: %p\n", message.offset, message.payload
    timestamp, cycle = m.captures

    cycle_timestamp = cycle + "." + timestamp
    old_value = current_downloaded[monitor]
    if old_value && cycle_timestamp <= old_value
      $stderr.printf "WARNING: downloaded files out of order for %s:" +
                     " found %s, which is <= previous %s\n", monitor,
                     cycle_timestamp, old_value
      return
    end
    current_downloaded[monitor] = cycle_timestamp

    yyyymmdd_hh = Time.at(timestamp.to_i).gmtime.strftime "%Y%m%d-%H"
    file_hourly = cycle + "." + yyyymmdd_hh

    current_hourly = @current_hourly[monitor]
    if current_hourly.nil? || file_hourly == current_hourly
      @current_hourly[monitor] = file_hourly unless current_hourly
      @hourly_files[monitor] << file
    elsif file_hourly > current_hourly
      publish_hourly_file(monitor, activity, current_hourly)
      @current_hourly[monitor] = file_hourly
      @hourly_files[monitor] = [file]
    else # file_hourly < current_hourly
      puts " ... skipping old file."
    end
  end

  # Writes the hourly file for +monitor+'s bin +hour+ and, unless in dry-run
  # mode, announces it on $hourly_topic.  Waiting on the delivery handle
  # ensures the announcement is acknowledged before the caller commits the
  # offset of the message that triggered it.
  def publish_hourly_file(monitor, activity, hour)
    outfile = sprintf "%s.%s.%s.warts", monitor, activity, hour
    return unless create_hourly_file(monitor, outfile) && !$options.dry_run

    @producer.produce(topic: $hourly_topic, payload: outfile).wait
  end

end


#############################################################################
# MAIN
#############################################################################

# Entry point: rebuild per-monitor hourly state from the hourly topic, then
# process download notifications.
HourlyCreator.new.tap do |creator|
  creator.recover_state
  creator.process_downloaded_data
end
