#!/usr/bin/ruby

#############################################################################
## Makes the latest cycle files publicly downloadable by placing them in the
## right YYYY/MM directory.
##
## For --offline runs, do something like the following:
##
##   $ find /staging/topo-v6 -type f -name \*.warts \
##     | sort | topo6-archiver --offline -v
##
## $Id: topo6-archiver,v 1.32 2023/06/07 02:14:05 mjl Exp $
#############################################################################

# Let `require` find helper libraries stored next to this script, and give
# the process a recognizable name (the name is also the fallback Kafka SASL
# credential when the credential environment variables are unset).
$LOAD_PATH << File.dirname(__FILE__)
$PROGRAM_NAME = "ark-topo6-archiver"

require 'rubygems'
require 'fileutils'
require 'optparse'
require 'ostruct'

require 'rdkafka'

# Default settings; each can be overridden on the command line.  Settings
# not listed here (e.g. offline, dry_run, verbose) stay nil until an option
# sets them.
$options = OpenStruct.new(
  activity: "topo-v6",
  archive_dir: "/data/topology/ark/data/topo-v6/list-8.ipv6.allpref",
  staging: "/staging",
  kafka_host: "127.0.0.1",
  kafka_port: 9092,
  recover: true,
  delete_processed: true,
  compress: true,
  log: true
)

# Command-line options; each one overrides the matching $options setting.
# Arguments left over after option parsing remain in ARGV (in --offline
# mode they are read through ARGF as lists of cycle file paths).
opts = OptionParser.new

opts.on("--activity", "=NAME",
        "Ark activity to process (#{$options.activity})") do |v|
  $options.activity = v
end

opts.on("--archive-dir", "=DIR",
        "cycle archive directory (#{$options.archive_dir})") do |v|
  $options.archive_dir = v
end

opts.on("--staging-dir", "=DIR",
        "staging dir for Ark data (#{$options.staging})") do |v|
  $options.staging = v
end

opts.on("--host", "=IP", "Kafka host (#{$options.kafka_host})") do |v|
  $options.kafka_host = v
end

opts.on("--port", "=NUM", Integer, "Kafka port (#{$options.kafka_port})") do |v|
  $options.kafka_port = v
end

opts.on("--[no-]recover", TrueClass,
        "recover state from .cycle-archive topic") do |v|
  $options.recover = v
end

# Read by CycleArchiver#recover_state: when set, out-of-order entries in the
# archive topic are reported and skipped instead of aborting the run.
opts.on("--force", TrueClass,
        "skip out-of-order .cycle-archive entries when recovering") do |v|
  $options.force = v
end

opts.on("--[no-]delete-processed", TrueClass,
        "delete processed downloaded files") do |v|
  $options.delete_processed = v
end

opts.on("--[no-]compress", TrueClass, "compress archived cycle files") do |v|
  $options.compress = v
end

opts.on("--[no-]log", TrueClass,
        "log cycle files to .cycle-archive topic") do |v|
  $options.log = v
end

opts.on("--offline", TrueClass, "process offline cycle files, then exit") do |v|
  $options.offline = v
end

opts.on("--dry-run", TrueClass, "only show what will happen") do |v|
  $options.dry_run = v
end

opts.on("-v", "--[no-]verbose", TrueClass, "show detailed progress") do |v|
  $options.verbose = v
end

begin
  # Removes recognized options from ARGV in place, leaving the operands.
  opts.parse!
rescue OptionParser::ParseError => e
  $stderr.puts "ERROR: " + e.message
  $stderr.puts opts
  exit 1
end

#............................................................................

$options.staging = File.expand_path($options.staging)

# Downloaded cycle files live under <staging>/<activity>/<monitor>/.
$download_dir = "#{$options.staging}/#{$options.activity}"

# Refuse to start unless every directory read from or written to exists.
{
  "Ark staging" => $options.staging,
  "download" => $download_dir,
  "archive" => $options.archive_dir,
}.each do |label, dir|
  next if File.directory?(dir)

  $stderr.printf "ERROR: %s directory '%s' doesn't exist\n", label, dir
  exit 1
end

# Kafka topic that archived cycle files are logged to, and from which the
# cycle state is recovered at startup.
$archive_topic = "#{$options.activity}.cycle-archive"

#............................................................................

# SIGTERM handling.  Unless $can_exit_immediately is set, the handler only
# records the request; the main loops call exit_if_requested after finishing
# each cycle file, so a file is never abandoned half-processed.
$can_exit_immediately = false
$exit_requested = false

Signal.trap("TERM") do
  unless $can_exit_immediately
    $stderr.puts "got SIGTERM; will exit at next opportunity"
    $exit_requested = true
    next
  end

  $stderr.puts "exiting on SIGTERM"
  exit 1
end

# Exits with status 1 if a SIGTERM was received earlier; otherwise a no-op
# returning nil.
def exit_if_requested
  return unless $exit_requested

  $stderr.puts "exiting on SIGTERM"
  exit 1
end


#===========================================================================

class CycleArchiver

  # Creates the Kafka consumer and producer used by the archiver.
  #
  # The broker address comes from --host/--port.  SASL credentials come from
  # ARK_TOPO6_ARCHIVER_USERNAME / ARK_TOPO6_ARCHIVER_PASSWORD, each falling
  # back to the program name ($0) when unset.  Consumer offsets are never
  # auto-committed; the processing methods commit explicitly.
  #
  # NOTE(review): the same consumer group id is used for the archive topic
  # (recover_state) and for ark-downloaded-data, and recover_state commits
  # its offsets.  Since "auto.offset.reset" only applies when the group has
  # no committed offset, a later run's recovery presumably resumes after the
  # last committed archive message instead of replaying the whole topic --
  # confirm that this incremental recovery is intended.
  def initialize
    host_port = sprintf "%s:%d", $options.kafka_host, $options.kafka_port
    collector_config = {
        :"bootstrap.servers" => host_port,
        :"client.id" => "ark-topo6-archiver",
        :"sasl.mechanism" => "SCRAM-SHA-256",
        :"sasl.username" => (ENV['ARK_TOPO6_ARCHIVER_USERNAME'] || $0),
        :"sasl.password" => (ENV['ARK_TOPO6_ARCHIVER_PASSWORD'] || $0),
        :"security.protocol" => "SASL_PLAINTEXT",
    }
    producer_config = collector_config
    consumer_config = collector_config.merge({
        :"group.id" => "ark-topo6-archiver",
        :"enable.auto.commit" => false,
        :"auto.offset.reset" => "earliest",
    })
    @consumer = Rdkafka::Config.new(consumer_config).consumer
    @producer = Rdkafka::Config.new(producer_config).producer

    @current_cycle = {}  # monitor => timestamp of newest archived cycle
  end


  # Archived cycle file name as logged to the archive topic, e.g.
  #   topo-v6.l8.20180830.1535615746.sjc2-us.warts.gz
  # Captures: cycle timestamp, monitor name, optional ".gz".
  # \A/\z anchor the whole string (^/$ would accept embedded newlines).
  CYCLE_RE = /\Atopo-v6\.l8\.\d{8}\.(\d+)\.([a-z]{3}\d?-[a-z]{2})\.warts(\.gz)?\z/

  # Rebuilds @current_cycle (monitor => newest archived timestamp) from the
  # messages currently available on the archive topic.  Each message is
  # "<log time>|<YYYY/MM/archived file name>", as written by copy_cycle_file.
  #
  # Reading stops at the first poll that returns nothing within 250 ms.
  # Exits with status 1 on a malformed message, or when a monitor's
  # timestamps go backwards (with --force such an entry is reported and
  # skipped instead).  Offsets are committed unless --dry-run.
  def recover_state
    puts "Recovering cycle state ..."

    @consumer.subscribe($archive_topic)
    n = 0

    # NOTE(review): right after subscribe the consumer may still be joining
    # its group, so an early empty poll could end recovery prematurely --
    # confirm 250 ms is long enough in practice.
    while (message = @consumer.poll(250))
      n += 1
      printf ".... %d\n", n if n % 1000 == 0

      # A nil payload or one without "|" is treated as malformed rather than
      # crashing in File.basename.
      _log_time, log_path = message.payload.to_s.split("|")
      m = log_path && CYCLE_RE.match(File.basename(log_path))
      unless m
        $stderr.printf "ERROR: %d: %p\n", message.offset, message.payload
        exit 1
      end

      timestamp = m[1].to_i
      monitor = m[2]

      last_timestamp = @current_cycle[monitor]
      if last_timestamp && timestamp <= last_timestamp
        $stderr.printf "ERROR: cycle files out of order for %s:" +
                       " found %d, which is <= previous %d\n", monitor,
                       timestamp, last_timestamp
        next if $options.force
        exit 1
      end

      @current_cycle[monitor] = timestamp
    end

    @consumer.commit if n > 0 && !$options.dry_run

    @consumer.unsubscribe

    printf "%d files in %s.\n", n, $archive_topic
    puts "\nCurrent cycle state:"
    @current_cycle.sort.each do |monitor, timestamp|
      printf "  % 7s %s (%s)\n", monitor, timestamp, Time.at(timestamp)
    end
  end


  # Downloaded cycle file name, e.g.
  #   1536034475.yto-ca.l8.20180904.warts
  #   1686031043.hlz2-nz.l8.20230606.warts.gz
  # Captures: cycle timestamp, monitor name, YYYYMMDD.
  # \A/\z anchor the whole string: the name later becomes part of file paths
  # and external commands, so embedded newlines must not slip through.
  DOWNLOADED_RE = /\A(\d+)\.([a-z]{3}\d?-[a-z]{2})\.l8\.(\d{8})\.warts(?:\.gz)?\z/

  # Consumes "ark-downloaded-data" messages ("<monitor>|<activity>|<file>")
  # and archives those belonging to $options.activity.  Each message's
  # offset is committed after it is handled (unless --dry-run), and a
  # pending SIGTERM is honored between messages.
  def process_downloaded_data
    current_downloaded = {}  # monitor => timestamp

    puts "Processing 'downloaded-data' ..."
    @consumer.subscribe("ark-downloaded-data")
    @consumer.each do |message|
      # The monitor field is unused: process_cycle_file takes the monitor from
      # the validated file name instead.
      _monitor, activity, filename = message.payload.to_s.split("|")
      if activity != $options.activity
        @consumer.commit unless $options.dry_run
        next
      end

      m = DOWNLOADED_RE.match filename
      if m
        printf "%d: %p\n", message.offset, message.payload
        process_cycle_file filename, m, current_downloaded
      else
        $stderr.printf "ERROR: %d: %p\n", message.offset, message.payload
      end
      @consumer.commit unless $options.dry_run
      exit_if_requested()
    end
  end


  # Processes cycle file paths read line by line from ARGF (files named on
  # the command line, or stdin).  Blank lines are ignored; an unrecognized
  # file name is fatal.  Only each path's basename is used: copy_cycle_file
  # looks the file up under $download_dir/<monitor>/.
  def process_offline_cycle_files
    current_downloaded = {}  # monitor => timestamp

    puts "Processing offline cycle files ..."
    ARGF.each_line do |line|
      path = line.chomp
      next if path.match?(/\A\s*\z/)

      filename = File.basename path
      m = DOWNLOADED_RE.match filename
      unless m
        $stderr.printf "ERROR: invalid cycle file name '%s' (full path: %s)\n",
                       filename, path
        exit 1
      end

      puts filename
      process_cycle_file filename, m, current_downloaded
      exit_if_requested()
    end
  end


  # Archives one downloaded cycle file unless it is stale.
  #
  # filename           - bare file name that matched DOWNLOADED_RE
  # matchdata          - the DOWNLOADED_RE match of filename
  # current_downloaded - Hash of monitor => newest timestamp seen so far in
  #                      this run's input; updated in place
  #
  # Input that is not newer than the previous input for the same monitor is
  # warned about and ignored.  Otherwise the file is copied only if it is
  # newer than the last archived cycle for its monitor, and @current_cycle
  # is advanced (even if copy_cycle_file finds the source missing).
  def process_cycle_file(filename, matchdata, current_downloaded)
    timestamp, monitor, yyyymmdd = matchdata.captures
    timestamp = timestamp.to_i

    last_downloaded = current_downloaded[monitor]
    if last_downloaded && timestamp <= last_downloaded
      $stderr.printf "WARNING: downloaded files out of order for %s:" +
                     " found %d, which is <= previous %d\n", monitor,
                     timestamp, last_downloaded
      return
    end
    current_downloaded[monitor] = timestamp

    last_cycle = @current_cycle[monitor]
    if last_cycle.nil? || timestamp > last_cycle
      copy_cycle_file monitor, filename, yyyymmdd, timestamp
      @current_cycle[monitor] = timestamp
    else
      puts " ... skipping old file."
    end
  end


  # Copies a downloaded cycle file into $options.archive_dir/YYYY/MM under
  # the archive name "topo-v6.l8.YYYYMMDD.<timestamp>.<monitor>.warts[.gz]".
  #
  # The copy is written to a dot-prefixed temporary name in the destination
  # directory (and gzipped there with --compress if not already compressed),
  # then renamed into place, so the final name only appears once the file
  # is complete.  Afterwards the archived path relative to the archive dir
  # is logged to the archive topic (unless --no-log or --dry-run) and the
  # downloaded source is removed (with --delete-processed).  A missing
  # source file only produces a warning.
  def copy_cycle_file(monitor, filename, yyyymmdd, timestamp)
    source_path = File.join($download_dir, monitor, filename)
    unless File.exist?(source_path)  # File.exists? was removed in Ruby 3.2
      printf "WARNING: cycle file '%s' missing\n", source_path
      return
    end

    year = yyyymmdd[0..3]
    month = yyyymmdd[4..5]
    dest_dir = File.join($options.archive_dir, year, month)
    sh "mkdir", "-p", dest_dir

    # e.g. topo-v6.l8.20180830.1535615746.sjc2-us.warts(.gz)
    outfile = sprintf "topo-v6.l8.%s.%d.%s.warts", yyyymmdd, timestamp, monitor
    already_compressed = filename.end_with?(".gz")
    outfile += ".gz" if already_compressed

    tmp_dest_path = File.join(dest_dir, "." + outfile)
    sh "cp", "-p", source_path, tmp_dest_path

    if $options.compress && !already_compressed
      # Remove any .gz left by an earlier attempt; gzip will not overwrite
      # an existing output file without -f.
      sh "rm", "-f", tmp_dest_path + ".gz"
      sh "gzip", tmp_dest_path
      tmp_dest_path += ".gz"
      outfile += ".gz"
    end

    sh "mv", tmp_dest_path, File.join(dest_dir, outfile)

    if $options.log && !$options.dry_run
      message = sprintf "%d|%s", Time.now.to_i, File.join(year, month, outfile)
      # Wait for delivery so the archive log entry is durable before the
      # source is deleted and the download offset is committed; raises if
      # delivery fails.
      @producer.produce(topic: $archive_topic, payload: message).wait
    end

    sh "rm", source_path if $options.delete_processed
  end


  # Runs an external command, echoing it first under --verbose or --dry-run.
  # With --dry-run the command is only printed, never run.  Exits the script
  # with status 1 if the command fails.
  #
  # Pass the program and its arguments separately (sh "cp", src, dst) to run
  # without a shell, so paths containing spaces or shell metacharacters are
  # passed literally.  A single string argument is still run by the shell.
  def sh(*command)
    display = command.join(" ")
    puts display if $options.verbose || $options.dry_run
    return if $options.dry_run

    unless system(*command)
      msg = sprintf "couldn't execute command: %p: %s", $?, display
      $stderr.puts "ERROR: " + msg
      exit 1
    end
  end

end


#############################################################################
# MAIN
#############################################################################

archiver = CycleArchiver.new
archiver.recover_state if $options.recover

# --offline archives the cycle file paths read via ARGF; otherwise archive
# files as they are announced on the ark-downloaded-data topic.
entry_point = $options.offline ? :process_offline_cycle_files : :process_downloaded_data
archiver.public_send(entry_point)
