#!/usr/bin/ruby

#############################################################################
## Makes latest hourly files publicly downloadable.
##
## $Id: team-hourly-archiver,v 1.8 2019/10/02 20:11:54 youngh Exp $
#############################################################################

# Make files that live next to this script loadable via `require`.
$LOAD_PATH << File.dirname(__FILE__)

# Rename the process so it is easy to identify in process listings.
$PROGRAM_NAME = "ark-team-hourly-archiver"

require 'rubygems'
require 'fileutils'
require 'optparse'
require 'ostruct'

require 'rdkafka'

# Built-in defaults; each of these has a matching command-line option that
# overrides it.
$options = OpenStruct.new(
  :activity    => "team-probing",
  :archive_dir => "/data/topology/ark/data/team-probing/list-7.allpref24/team-1/hourly",
  :kafka_host  => "127.0.0.1",
  :kafka_port  => 9092,
  :staging     => "/staging",
  :compress    => true
)

# Command-line definition.  Help strings show the current default in
# parentheses where one exists.
opts = OptionParser.new

# --- options that take a value ---------------------------------------------

opts.on("--next-seqnum", "=NUM", Integer,
        "next hourly sequence number to use") { |num| $options.next_seqnum = num }

opts.on("--activity", "=NAME",
        "Ark activity to process (#{$options.activity})") { |name| $options.activity = name }

opts.on("--archive-dir", "=DIR",
        "hourly archive directory (#{$options.archive_dir})") { |dir| $options.archive_dir = dir }

opts.on("--staging", "=DIR",
        "staging dir for Ark data (#{$options.staging})") { |dir| $options.staging = dir }

opts.on("--host", "=IP",
        "Kafka host (#{$options.kafka_host})") { |ip| $options.kafka_host = ip }

opts.on("--port", "=NUM", Integer,
        "Kafka port (#{$options.kafka_port})") { |num| $options.kafka_port = num }

# --- boolean switches -------------------------------------------------------

opts.on("--[no-]compress", TrueClass,
        "compress archived hourly files") { |flag| $options.compress = flag }

opts.on("--force", TrueClass,
        "continue recovery despite warnings") { |flag| $options.force = flag }

opts.on("--dry-run", TrueClass,
        "only show what will happen") { |flag| $options.dry_run = flag }

opts.on("-v", "--[no-]verbose", TrueClass,
        "show detailed progress") { |flag| $options.verbose = flag }

# Parse, leaving only the non-option arguments in ARGV.  On a bad option,
# report the problem plus the usage summary and quit.
begin
  leftover = opts.parse(ARGV)
  ARGV.replace(leftover)
rescue OptionParser::ParseError => e
  $stderr.puts "ERROR: #{e}"
  $stderr.puts opts
  exit 1
end

#............................................................................

# Normalize the staging directory to an absolute path and derive the
# per-activity hourly directory that lives beneath it.
$options.staging = File.expand_path($options.staging)
$hourly_dir = "#{$options.staging}/#{$options.activity}.hourly"

# All three directories must already exist; report the first one (in this
# order) that is missing and stop.
required_dirs = [
  ["Ark staging", $options.staging],
  ["hourly",      $hourly_dir],
  ["archive",     $options.archive_dir],
]

missing = required_dirs.find { |_label, path| !File.directory?(path) }
if missing
  label, path = missing
  $stderr.puts "ERROR: #{label} directory '#{path}' doesn't exist"
  exit 1
end

# Name of the Kafka topic that is consumed for this activity.
$hourly_topic = $options.activity + ".hourly"

#............................................................................

# Shutdown coordination flags:
#   $can_exit_immediately - when true, the SIGTERM handler exits on the spot
#   $exit_requested       - set by the handler when the exit must be deferred
$can_exit_immediately = false
$exit_requested = false

# SIGTERM either terminates right away (when that has been declared safe) or
# only records the request so the main loop can stop at a convenient point.
Signal.trap("TERM") do
  unless $can_exit_immediately
    $stderr.puts "got SIGTERM; will exit at next opportunity"
    $exit_requested = true
    next
  end

  $stderr.puts "exiting on SIGTERM"
  exit 1
end

# Terminates the process with status 1 if a deferred SIGTERM is pending;
# otherwise does nothing.
def exit_if_requested
  return unless $exit_requested

  $stderr.puts "exiting on SIGTERM"
  exit 1
end


#===========================================================================

# Consumes hourly-file announcements from the Kafka topic named by
# $hourly_topic and copies each announced file from the staging area
# ($hourly_dir/<monitor>/<file>) into $options.archive_dir, prefixing the
# name with an increasing 8-digit sequence number.
class HourlyArchiver

  # Milliseconds a single Kafka poll waits for a message.
  POLL_TIMEOUT_MS = 250

  # Creates the Kafka consumer.  Offsets are committed manually
  # (enable.auto.commit is off).  SASL credentials come from the environment
  # and fall back to the program name ($0) when unset.
  def initialize
    host_port = sprintf "%s:%d", $options.kafka_host, $options.kafka_port
    consumer_config = {
        :"bootstrap.servers" => host_port,
        :"client.id" => "ark-team-hourly-archiver",
        :"group.id" => "ark-team-hourly-archiver",
        :"enable.auto.commit" => false,
        :"auto.offset.reset" => "earliest",
        :"sasl.mechanism" => "SCRAM-SHA-256",
        :"sasl.username" => (ENV['ARK_TEAM_HOURLY_ARCHIVER_USERNAME'] || $0),
        :"sasl.password" => (ENV['ARK_TEAM_HOURLY_ARCHIVER_PASSWORD'] || $0),
        :"security.protocol" => "SASL_PLAINTEXT",
    }
    @consumer = Rdkafka::Config.new(consumer_config).consumer
    # NOTE(review): not referenced anywhere else in this class.
    @current_daily = {}  # monitor => <cycle>.<yyyymmdd>
    @next_seqnum = nil
  end


  # An archived hourly filename is just an hourly file with a seqnum prepended.
  # 06528124.zrh-ch.team-probing.c006981.20181003-16.warts
  #
  # Anchored with \A/\z (not ^/$) so a name containing a newline cannot pass.
  # Captures: seqnum, original hourly filename, optional ".gz".
  ARCHIVE_FILE_RE = /\A(\d+)\.([a-z]{3}\d?-[a-z]{2}\.[a-z0-9-]+\.c\d+\.\d{8}-\d{2}\.warts)(\.gz)?\z/

  # Determines the next sequence number and positions the consumer group
  # just past the last hourly file that was already archived.
  #
  # With --next-seqnum the number is taken verbatim and no scan is done.
  # Otherwise the archive directory is scanned for the last archived file and
  # the topic is read (committing as it goes) until that file's message is
  # found.  Exits with status 1 when the archive holds an unrecognized entry,
  # when it is empty, or when the file is not found in the topic and --force
  # was not given.
  def recover_state
    if $options.next_seqnum
      @next_seqnum = $options.next_seqnum
      # NOTE(review): the original comment said processing then starts at
      # offset 0; with a previously committed group offset the consumer would
      # presumably resume from there instead -- confirm.
      return
    end

    puts "Recovering seq number and last archived hourly file ..."

    # Entries are visited in lexicographic order, so the last match wins.
    # Seqnums are written zero-padded to 8 digits, which makes that order
    # numeric as long as they fit in 8 digits.
    last_seqnum = last_hourly_file = nil
    Dir.entries($options.archive_dir).sort.each do |entry|
      next if entry.start_with?(".")   # ".", ".." and in-progress temp files
      puts entry if $options.verbose

      m = ARCHIVE_FILE_RE.match entry
      if m
        last_seqnum, last_hourly_file, _gz = m.captures
      else
        $stderr.printf "ERROR: archive file has unknown naming scheme: %s\n",
                       entry
        exit 1
      end
    end

    unless last_seqnum
      $stderr.puts "ERROR: no archived hourly files; specify --next-seqnum" +
                   " to manually reset state"
      exit 1
    end

    printf "... found %s.\n", last_seqnum
    @next_seqnum = last_seqnum.to_i + 1

    @consumer.subscribe($hourly_topic)
    loop do
      # NOTE(review): a poll that times out is treated as "end of topic".
      # The first poll after subscribing may time out before any message is
      # delivered -- confirm that one POLL_TIMEOUT_MS wait is sufficient.
      message = @consumer.poll(POLL_TIMEOUT_MS)
      break unless message

      if message.payload == last_hourly_file
        printf "Found last archived hourly file '%s' at offset %d in" +
               " topic %s.\n", last_hourly_file, message.offset, $hourly_topic
        @consumer.commit unless $options.dry_run
        @consumer.unsubscribe
        return
      end
      @consumer.commit unless $options.dry_run
    end

    @consumer.unsubscribe

    $stderr.printf "WARNING: couldn't find last archived hourly file" +
                   " '%s' in topic %s.\n", last_hourly_file, $hourly_topic
    unless $options.force
      $stderr.puts "ERROR: please specify --force to continue anyway"
      exit 1
    end
  end


  # dca-us.team-probing.c006950.20180916-00.warts
  #
  # Anchored with \A/\z so the whole payload must be a well-formed name; the
  # payload is later used to build file paths.  Captures: monitor, activity.
  HOURLY_RE = /\A([a-z]{3}\d?-[a-z]{2})\.([a-z0-9-]+)\.c\d+\.\d{8}-\d{2}\.warts\z/

  # Main loop: archives every hourly file announced on the topic and commits
  # the offset after each message (unless --dry-run).
  #
  # Polls explicitly rather than using Consumer#each so that a pending
  # SIGTERM is noticed between messages and during idle periods;
  # exit_if_requested is checked before every poll.  Never returns normally.
  def process_hourly_files
    puts "Processing '#{$hourly_topic}' ..."
    @consumer.subscribe($hourly_topic)
    loop do
      exit_if_requested

      message = @consumer.poll(POLL_TIMEOUT_MS)
      next unless message

      handle_hourly_message message
      @consumer.commit unless $options.dry_run
    end
  end


  # Copies one hourly file into the archive under the next sequence number.
  #
  # monitor     - monitor name; the subdirectory of $hourly_dir to read from
  # hourly_file - bare hourly filename
  #
  # A missing source file only produces a warning and does not consume a
  # sequence number.  The file is first written as a dot-prefixed temporary
  # name in the archive directory, optionally gzipped, and then renamed into
  # place.
  def copy_hourly_file(monitor, hourly_file)
    source_path = sprintf "%s/%s/%s", $hourly_dir, monitor, hourly_file
    unless File.exist? source_path
      printf "WARNING: hourly file '%s' missing\n", source_path
      return
    end

    dest_file = sprintf "%08d.%s", @next_seqnum, hourly_file
    @next_seqnum += 1

    tmp_dest_path = $options.archive_dir + "/." + dest_file
    sh "cp", "-p", source_path, tmp_dest_path

    if $options.compress
      # -f overwrites a stale .gz left behind by an earlier interrupted run.
      sh "gzip", "-f", tmp_dest_path

      tmp_dest_path += ".gz"
      dest_file += ".gz"
    end

    dest_path = $options.archive_dir + "/" + dest_file
    sh "mv", tmp_dest_path, dest_path
  end


  # Runs an external command, exiting with status 1 if it fails.
  #
  # command - program name, or a complete shell command line when no args
  #           are given
  # args    - optional arguments; when present the program is executed
  #           directly without a shell, so no quoting is needed
  #
  # The command is echoed in verbose and dry-run mode; in dry-run mode it is
  # not executed.
  def sh(command, *args)
    command_line = [command, *args].join(" ")
    puts command_line if $options.verbose || $options.dry_run
    return if $options.dry_run

    unless system(command, *args)
      msg = sprintf "couldn't execute command: %p: %s", $?, command_line
      $stderr.puts "ERROR: " + msg
      exit 1
    end
  end


  private

  # Logs one topic message and archives the file it names when the name is
  # well-formed and belongs to the configured activity.
  def handle_hourly_message(message)
    m = HOURLY_RE.match message.payload
    unless m
      printf "ERROR: %d: %p\n", message.offset, message.payload
      return
    end

    printf "%d: %p\n", message.offset, message.payload
    monitor, activity = m.captures
    return if activity != $options.activity

    copy_hourly_file monitor, message.payload
  end

end


#############################################################################
# MAIN
#############################################################################

# Work out where to resume (sequence number and topic position), then archive
# hourly files as they are announced.
archiver = HourlyArchiver.new
archiver.recover_state
archiver.process_hourly_files
