#!/usr/bin/ruby

#############################################################################
## Makes latest daily files publicly downloadable by placing them in the
## right YYYY/cycle-YYYYMMDD directory.
##
## For --offline runs, do something like the following:
##
##   $ find /staging/team-probing.daily -type f \
##     -name \*.warts | sort | team-daily-archiver --offline -v
##
## $Id: team-daily-archiver,v 1.10 2018/11/14 20:57:50 youngh Exp $
#############################################################################

# Replace the program name ($0) with a fixed identifier instead of the path
# the script was started with.  The same value also serves as the fallback
# Kafka SASL username/password in DailyArchiver#initialize when the
# ARK_TEAM_DAILY_ARCHIVER_* environment variables are not set.
$0 = "ark-team-daily-archiver"

require 'rubygems'
require 'fileutils'
require 'optparse'
require 'ostruct'

require 'sqlite3'
require "rdkafka"

# Runtime configuration shared with the rest of the script through the
# $options global.  The assignments below are the defaults; every one of
# them can be overridden from the command line.
$options = OpenStruct.new
$options.activity = "team-probing"
$options.archive_dir = "/data/topology/ark/data/team-probing/list-7.allpref24/team-1/daily"
$options.staging = "/staging"
# topov6 targets and intermediary files are in /staging/topo-v6.update-prep/
# so for now do something similar here, rather than using something like on
# indy: {/usr/local,/var}/lib/ark/activity/team-probing/. Where should these
# ultimately be?
$options.local_root = "/staging/team-probing.update-prep"
# The state database lives under local_root by default.  Its path is only
# resolved after the command line has been parsed (see the end of this
# section) so that --local-root actually influences it; computing it here
# would freeze the default before --local-root is seen.
state_db_basename = "cycle.db"
$options.state_db = nil
$options.kafka_host = "127.0.0.1"
$options.kafka_port = 9092
$options.compress = true
$options.log = true

opts = OptionParser.new

opts.on("--activity", "=NAME",
        "Ark activity to process (#{$options.activity})") do |v|
  $options.activity = v
end

opts.on("--archive-dir", "=DIR",
        "daily archive directory (#{$options.archive_dir})") do |v|
  $options.archive_dir = v
end

opts.on("--staging", "=DIR",
        "staging dir for Ark data (#{$options.staging})") do |v|
  $options.staging = v
end

opts.on("--local-root", "=DIR",
        "local root directory for activity (#{$options.local_root})") do |v|
  $options.local_root = v
end

opts.on("--state-db", "=PATH",
        "teamx-master database of cycle start dates " \
        "(#{File.join($options.local_root, state_db_basename)})") do |v|
  $options.state_db = v
end

opts.on("--host", "=IP", "Kafka host (#{$options.kafka_host})") do |v|
  $options.kafka_host = v
end

opts.on("--port", "=NUM", Integer, "Kafka port (#{$options.kafka_port})") do |v|
  $options.kafka_port = v
end

opts.on("--[no-]compress", TrueClass, "compress archived daily files") do |v|
  $options.compress = v
end

opts.on("--[no-]log", TrueClass,
        "log daily files to .daily-archive topic") do |v|
  $options.log = v
end

opts.on("--offline", TrueClass, "process offline daily files, then exit") do |v|
  $options.offline = v
end

opts.on("--dry-run", TrueClass, "only show what will happen") do |v|
  $options.dry_run = v
end

opts.on("-v", "--[no-]verbose", TrueClass,  "show detailed progress") do |v|
  $options.verbose = v
end

# Parse the command line, leaving only the non-option arguments in ARGV
# (they are read through ARGF in --offline mode).  Any parse error prints
# the message plus the usage summary and exits with status 1.
begin
  ARGV.replace opts.parse(*ARGV)
rescue OptionParser::ParseError => e
  $stderr.puts "ERROR: " + e.to_s
  $stderr.puts opts
  exit 1
end

# An explicit --state-db wins; otherwise fall back to <local-root>/cycle.db
# using whatever local root is in effect after parsing.
$options.state_db ||= File.join($options.local_root, state_db_basename)

#............................................................................

# Resolve the staging root to an absolute path and derive the per-activity
# directory holding the daily files (<staging>/<activity>.daily).
$options.staging = File.expand_path($options.staging)
$daily_dir = format("%s/%s.daily", $options.staging, $options.activity)

# Every directory the archiver relies on must already exist; report the
# first one that does not and give up.
required_dirs = {
  "Ark staging" => $options.staging,
  "daily" => $daily_dir,
  "archive" => $options.archive_dir,
}
missing = required_dirs.find { |_label, path| !File.directory?(path) }
if missing
  $stderr.printf "ERROR: %s directory '%s' doesn't exist\n", *missing
  exit 1
end

# Kafka topics: file names are read from the first and, when logging is
# enabled, archived paths are announced on the second.
$daily_topic = "#{$options.activity}.daily"
$archive_topic = "#{$options.activity}.daily-archive"

#............................................................................

# SIGTERM bookkeeping.
#   $can_exit_immediately - when true, the handler terminates the process
#                           on the spot.
#   $exit_requested       - set by the handler when it must defer; polled
#                           by exit_if_requested at safe points.
$can_exit_immediately = false
$exit_requested = false

Signal.trap("TERM") do
  unless $can_exit_immediately
    # Work may be in progress: only record the request and let the main
    # flow stop on its own.
    $stderr.puts "got SIGTERM; will exit at next opportunity"
    $exit_requested = true
    next
  end

  $stderr.puts "exiting on SIGTERM"
  exit 1
end

# Safe-point check for a deferred SIGTERM: when $exit_requested has been
# set, print a notice on stderr and terminate with status 1; otherwise do
# nothing.
def exit_if_requested
  return unless $exit_requested

  $stderr.puts "exiting on SIGTERM"
  exit 1
end


#===========================================================================

# Read-only lookup of cycle start dates in the state database
# ($options.state_db), used to map a cycle id onto its archive
# subdirectory "YYYY/cycle-YYYYMMDD".
class CyclesDB

  def initialize
    @db = SQLite3::Database.new $options.state_db, :readonly => true
    @db.busy_handler do |n| true end   # always retry on busy

    @dir_cache = {}  # cycle => YYYY/cycle-YYYYMMDD
  end


  # Returns the archive subdirectory (relative to $options.archive_dir) for
  # +cycle+, e.g. "2018/cycle-20180916".
  #
  # The first lookup of a cycle queries the Cycles table, caches the result
  # and creates the directory under the archive root (skipped on --dry-run).
  # Exits the process with status 1 if the cycle has no record.
  def cycle_dir(cycle)
    cached = @dir_cache[cycle]
    return cached if cached

    start_date = @db.get_first_value(
      "SELECT start_date FROM Cycles WHERE cycle_id=?", cycle)
    unless start_date
      $stderr.printf "ERROR: no record in Cycles database for cycle %p\n",
                     cycle
      exit 1
    end

    # Depending on how the column is typed, SQLite may hand the value back
    # as an Integer rather than a String.  Integer#[] with a range is a bit
    # slice, not a character slice, so normalise to a String before taking
    # the four-digit year prefix.
    start_date = start_date.to_s
    dir = sprintf "%s/cycle-%s", start_date[0, 4], start_date
    @dir_cache[cycle] = dir

    path = $options.archive_dir + "/" + dir
    printf "Creating %s ...\n", path if $options.verbose
    FileUtils.mkpath path unless $options.dry_run

    dir
  end

end


#===========================================================================

# Copies daily warts files from the staging area ($daily_dir) into the
# public archive ($options.archive_dir), under the cycle directory supplied
# by the CyclesDB-like object given to the constructor.  File names come
# either from the Kafka topic $daily_topic or, in offline mode, from ARGF.
class DailyArchiver

  # Upper bound, in milliseconds, on how long one Kafka poll may block.
  # Keeping it short lets a pending SIGTERM be honoured promptly even when
  # the topic is idle.
  POLL_TIMEOUT_MS = 250

  # db - object responding to cycle_dir(cycle) (see CyclesDB).
  def initialize(db)
    @db = db

    host_port = sprintf "%s:%d", $options.kafka_host, $options.kafka_port
    collector_config = {
        :"bootstrap.servers" => host_port,
        :"client.id" => "ark-team-daily-archiver",
        :"sasl.mechanism" => "SCRAM-SHA-256",
        # Credentials fall back to the program name when the environment
        # variables are not set.
        :"sasl.username" => (ENV['ARK_TEAM_DAILY_ARCHIVER_USERNAME'] || $0),
        :"sasl.password" => (ENV['ARK_TEAM_DAILY_ARCHIVER_PASSWORD'] || $0),
        :"security.protocol" => "SASL_PLAINTEXT",
    }
    producer_config = collector_config
    consumer_config = collector_config.merge({
        :"group.id" => "ark-team-daily-archiver",
        :"enable.auto.commit" => false,
        :"auto.offset.reset" => "earliest",
    })
    @consumer = Rdkafka::Config.new(consumer_config).consumer
    @producer = Rdkafka::Config.new(producer_config).producer

    @current_daily = {}  # monitor => <cycle>.<yyyymmdd>
  end


  # dca-us.team-probing.c006950.20180916.warts
  #
  # Captures: monitor, activity, cycle number.  Anchored with \A and \z so
  # the WHOLE string has to be a file name: with ^ and $ a multi-line
  # payload could match on a single line while the full payload was still
  # used to build paths and commands.
  DAILY_RE = /\A([a-z][a-z0-9-]{0,31})\.([a-z0-9-]+)\.c(\d+)\.\d{8}\.warts\z/

  # Consumes daily file names from $daily_topic and archives those that
  # belong to $options.activity.  Offsets are committed after each message
  # that was handled (or skipped), except on --dry-run.
  #
  # Does not return on its own; the process ends through exit_if_requested
  # once a SIGTERM has been recorded.
  def process_daily_files
    puts "Processing '#{$daily_topic}' ..."
    @consumer.subscribe($daily_topic)
    loop do
      # Poll with a timeout instead of using @consumer.each: each only
      # returns once the consumer is closed, which would make this safe
      # point unreachable and a deferred SIGTERM would never take effect.
      exit_if_requested()

      message = @consumer.poll(POLL_TIMEOUT_MS)
      next unless message

      m = DAILY_RE.match message.payload
      if m
        printf "%d: %p\n", message.offset, message.payload
        monitor, activity, cycle = m.captures
        if activity == $options.activity
          copy_daily_file monitor, cycle.to_i, message.payload
        end
      else
        printf "ERROR: %d: %p\n", message.offset, message.payload
      end
      @consumer.commit unless $options.dry_run
    end
  end


  # Archives the daily files listed, one path per line, on ARGF (files named
  # on the command line, or standard input).  Blank lines are skipped and
  # files of other activities are ignored.  Only the base name of each path
  # is used; the file itself is looked up under $daily_dir/<monitor>/.
  #
  # Exits with status 1 on the first name that is not a valid daily file
  # name.
  def process_offline_daily_files
    puts "Processing offline daily files ..."
    ARGF.each do |path|
      next if path =~ /^\s*$/
      path.chomp!

      filename = File::basename path
      m = DAILY_RE.match filename
      unless m
        $stderr.printf "ERROR: invalid daily file name '%s' (full path: %s)\n",
                       filename, path
        exit 1
      end

      puts filename
      monitor, activity, cycle = m.captures
      if activity == $options.activity
        copy_daily_file monitor, cycle.to_i, filename
      end

      exit_if_requested()
    end
  end


  # Copies $daily_dir/<monitor>/<daily_file> into the archive directory of
  # +cycle+, optionally gzip-compressing it, and optionally announces the
  # archived path on $archive_topic.
  #
  # The file is first written under a dot-prefixed temporary name in the
  # destination directory and renamed into place as the last step.  A
  # missing source file only produces a warning.
  #
  # monitor    - monitor name (subdirectory of $daily_dir)
  # cycle      - cycle number, passed to the db's cycle_dir
  # daily_file - base name of the daily file
  def copy_daily_file(monitor, cycle, daily_file)
    source_path = sprintf "%s/%s/%s", $daily_dir, monitor, daily_file
    unless File.exist? source_path
      printf "WARNING: daily file '%s' missing\n", source_path
      return
    end

    cycle_dir = @db.cycle_dir cycle
    dest_path = sprintf "%s/%s/%s", $options.archive_dir, cycle_dir, daily_file

    tmp_dest_path = sprintf "%s/%s/.%s", $options.archive_dir, cycle_dir,
                            daily_file
    # Commands are passed as argument vectors so that paths containing
    # spaces or shell metacharacters are handled verbatim.
    sh "cp", "-p", source_path, tmp_dest_path

    if $options.compress
      # Clear any stale compressed temporary first; gzip would otherwise
      # refuse to overwrite it.
      sh "rm", "-f", tmp_dest_path + ".gz"
      sh "gzip", tmp_dest_path

      tmp_dest_path += ".gz"
      dest_path += ".gz"
    end

    sh "mv", tmp_dest_path, dest_path

    if $options.log && !$options.dry_run
      log_path = cycle_dir + "/" + daily_file
      log_path += ".gz" if $options.compress
      message = sprintf "%d|%s", Time.now.to_i, log_path
      # NOTE(review): the result of produce is not waited on here, so
      # delivery is presumably asynchronous -- confirm that queued messages
      # cannot be lost when the process exits right afterwards.
      @producer.produce(topic: $archive_topic, payload: message)
    end
  end


  # Runs an external command, echoing it first when --verbose or --dry-run
  # is in effect; on --dry-run nothing is executed.
  #
  # command - either the program followed by its arguments as separate
  #           strings (executed directly, without a shell), or one single
  #           string (handed to Kernel#system unchanged, which may use the
  #           shell to run it).
  #
  # Exits the process with status 1 if the command cannot be run or
  # finishes with a non-zero status.
  def sh(*command)
    text = command.join(" ")
    puts text if $options.verbose || $options.dry_run
    return if $options.dry_run

    unless system(*command)
      msg = sprintf "couldn't execute command: %p: %s", $?, text
      $stderr.puts "ERROR: " + msg
      exit 1
    end
  end

end


#############################################################################
# MAIN
#############################################################################

# Open the cycle database first, then build the archiver on top of it and
# run the mode selected on the command line: a one-shot pass over the files
# listed on ARGF (--offline), or the Kafka-driven loop.
archiver = DailyArchiver.new(CyclesDB.new)

entry_point = $options.offline ? :process_offline_daily_files : :process_daily_files
archiver.public_send(entry_point)
