#!/usr/bin/ruby

#############################################################################
## Reads the <activity>.daily-archive topic in Kafka and writes out the
## last N created daily files to the daily-creation.log.
##
## $Id: team-daily-logger,v 1.9 2019/03/08 01:14:04 youngh Exp $
#############################################################################

# Make files that sit next to this script requirable, and give the
# process a fixed program name.
$LOAD_PATH.push(File.dirname(__FILE__))
$PROGRAM_NAME = "ark-team-daily-logger"

require 'rubygems'
require 'fileutils'
require 'optparse'
require 'ostruct'

require 'rdkafka'

# Runtime settings, pre-populated with their defaults.  The command-line
# options defined below override individual fields.
#
# count: the number of daily files to list in the daily-creation.log.  This
# should be a few multiples of the total number of monitors so that at least
# a few days worth of daily files always show up in the log.
$options = OpenStruct.new(
  count: 600,
  interval: 3600, # secs between log updates
  activity: "team-probing",
  archive_dir: "/data/topology/ark/data/team-probing/list-7.allpref24/team-1/daily",
  kafka_host: "127.0.0.1",
  kafka_port: 9092
)

# Each help string shows the default value that is in effect when the
# option is not given.
opts = OptionParser.new do |parser|
  parser.on("--count", "=NUM", Integer,
            "number of log entries (#{$options.count})") do |num|
    $options.count = num
  end

  parser.on("--interval", "=SECS", Integer,
            "secs between log updates (#{$options.interval})") do |secs|
    $options.interval = secs
  end

  parser.on("--activity", "=NAME",
            "Ark activity to process (#{$options.activity})") do |name|
    $options.activity = name
  end

  parser.on("--archive-dir", "=DIR",
            "daily archive directory (#{$options.archive_dir})") do |dir|
    $options.archive_dir = dir
  end

  parser.on("--host", "=IP", "Kafka host (#{$options.kafka_host})") do |ip|
    $options.kafka_host = ip
  end

  parser.on("--port", "=NUM", Integer, "Kafka port (#{$options.kafka_port})") do |num|
    $options.kafka_port = num
  end

  parser.on("--dry-run", TrueClass, "only show what will happen") do |flag|
    $options.dry_run = flag
  end

  parser.on("-v", "--[no-]verbose", TrueClass,  "show detailed progress") do |flag|
    $options.verbose = flag
  end
end

# Parse the command line, leaving only the non-option arguments in ARGV.
# On a bad option, report the problem together with the usage text and quit.
begin
  ARGV.replace(opts.parse(*ARGV))
rescue OptionParser::ParseError => parse_error
  $stderr.puts "ERROR: #{parse_error}"
  $stderr.puts opts
  exit 1
end

#............................................................................

# Refuse to start unless every required directory exists (currently only
# the archive directory).
{ "archive" => $options.archive_dir }.each_pair do |label, path|
  next if File.directory?(path)

  $stderr.printf("ERROR: %s directory '%s' doesn't exist\n", label, path)
  exit 1
end

# Name of the Kafka topic to read: "<activity>.daily-archive".
$archive_topic = $options.activity + ".daily-archive"

#............................................................................

# SIGTERM handling is governed by two flags:
#   $can_exit_immediately - when true, SIGTERM terminates the process right
#                           away from inside the handler
#   $exit_requested       - set by the handler when the exit has to be
#                           deferred; checked later by exit_if_requested
$exit_requested = false
$can_exit_immediately = true

Signal.trap("TERM") do
  unless $can_exit_immediately
    # Exiting is not allowed right now: just record the request.
    $stderr.puts "got SIGTERM; will exit at next opportunity"
    $exit_requested = true
    next
  end

  $stderr.puts "exiting on SIGTERM"
  exit 1
end

# Terminates the process with status 1 if $exit_requested has been set;
# otherwise does nothing and returns nil.
def exit_if_requested
  return unless $exit_requested

  $stderr.puts "exiting on SIGTERM"
  exit 1
end


#===========================================================================

# Reads the most recent messages of the $archive_topic Kafka topic and
# rewrites <archive_dir>/daily-creation.log from them.
#
# Relies on the globals $options (count, archive_dir, kafka_host,
# kafka_port, dry_run, verbose) and $archive_topic.
class DailyLogger

  # Creates a Kafka consumer for $options.kafka_host:$options.kafka_port and
  # subscribes it to $archive_topic.  SASL credentials come from the
  # ARK_TEAM_DAILY_LOGGER_USERNAME / ARK_TEAM_DAILY_LOGGER_PASSWORD
  # environment variables, each falling back to the program name ($0).
  def initialize
    host_port = sprintf "%s:%d", $options.kafka_host, $options.kafka_port
    consumer_config = {
        :"bootstrap.servers" => host_port,
        :"client.id" => "ark-team-daily-logger",
        :"group.id" => "ark-team-daily-logger",
        # process_daily_files rescues the resulting partition-EOF error to
        # know when it has caught up
        :"enable.partition.eof" => true,
        # offsets are committed explicitly, after the log file is in place
        :"enable.auto.commit" => false,
        :"auto.offset.reset" => "earliest",
        :"sasl.mechanism" => "SCRAM-SHA-256",
        :"sasl.username" => (ENV['ARK_TEAM_DAILY_LOGGER_USERNAME'] || $0),
        :"sasl.password" => (ENV['ARK_TEAM_DAILY_LOGGER_PASSWORD'] || $0),
        :"security.protocol" => "SASL_PLAINTEXT",
    }
    @consumer = Rdkafka::Config.new(consumer_config).consumer
    @consumer.subscribe($archive_topic)
  end


  # Closes the Kafka consumer and drops the reference to it.
  def close
    @consumer.close()
    @consumer = nil
  end


  # dca-us.team-probing.c006950.20180916.warts
  #DAILY_RE = /^([a-z]{3}\d?-[a-z]{2})\.([a-z0-9-]+)\.c(\d+)\.\d{8}\.warts$/

  # Rebuilds daily-creation.log from the topic.
  #
  # Rewinds up to $options.count messages before the committed offset, reads
  # every message from there until partition EOF, writes one
  # "timestamp<TAB>daily_path" line per message (payloads are split on "|")
  # to a temporary file, moves that file over daily-creation.log, and
  # finally commits the consumer offsets if any message at or beyond the
  # previously committed offset was seen.
  #
  # Raises Rdkafka::RdkafkaError for any Kafka error other than partition
  # EOF.  Exits the process (via sh) if the move fails.
  def process_daily_files
    puts "Processing '#{$archive_topic}' ..."

    # poll to get the current offset (and ignore the returned message for now,
    # we'll work our way back up to it later)
    loop do
      begin
        @consumer.poll(1000)
        break unless @consumer.committed().empty?
      rescue Rdkafka::RdkafkaError => error
        raise error unless error.is_partition_eof?
      end
    end

    entries = []
    need_commit = false

    # get the current offset for what should be 1 topic, with 1 partition
    # (to_i maps a nil offset to 0 -- presumably the case when nothing has
    # been committed yet; verify against rdkafka)
    offset = @consumer.committed().to_h[$archive_topic][0].offset.to_i

    # Never rewind past the start of the partition: when fewer than
    # $options.count messages precede the committed offset, the plain
    # subtraction would yield a negative offset, which is not a valid
    # position to seek to.
    start_offset = [offset - $options.count, 0].max

    # seek() takes a message, so build a fake one with the desired offset
    fake_msg = OpenStruct.new(
        topic: $archive_topic,
        partition: 0,
        offset: start_offset
    )

    # jump back some messages so there is data to show in the log
    @consumer.seek(fake_msg)

    begin
      # consume all recent messages until eof
      @consumer.each do |message|
        printf "%d: %p\n", message.offset, message.payload
        need_commit = true if message.offset >= offset

        # a message without a payload has nothing to log; skip it rather
        # than crash on nil.split
        next if message.payload.nil?
        entries << message.payload.split("|")
      end
    rescue Rdkafka::RdkafkaError => error
      raise error unless error.is_partition_eof?
    end

    log_path = $options.archive_dir + "/daily-creation.log"

    # write to a temporary name first so the log file is only ever replaced
    # by a completely written file
    tmp_log_path = log_path + ".updating"
    File.open(tmp_log_path, "w") do |out|
      gen_date = Time.now.to_s
      out.print <<EOF
# Format: 1
# Fields: timestamp daily_path
# Generated: #{gen_date}
# --------------------------------------------------------------------------
EOF
      entries.each do |timestamp, filename|
        out.printf "%s\t%s\n", timestamp, filename
      end
    end

    # pass the paths as separate arguments so that no shell is involved and
    # directories containing spaces or shell metacharacters work
    sh "mv", tmp_log_path, log_path

    # only commit offsets once the file is written and moved into place,
    # and only if there were new messages otherwise it triggers an error
    @consumer.commit if need_commit
  end


  # Runs an external command, exiting the process with status 1 on failure.
  #
  # command - either a single command-line string (handed to Kernel#system
  #           as is) or a program name followed by its arguments (run
  #           without a shell).
  #
  # The command is echoed when $options.verbose or $options.dry_run is set,
  # and is not executed at all when $options.dry_run is set.
  def sh(*command)
    command_text = command.join(" ")
    puts command_text if $options.verbose || $options.dry_run
    return if $options.dry_run

    unless system(*command)
      msg = sprintf "couldn't execute command: %p: %s", $?, command_text
      $stderr.puts "ERROR: " + msg
      exit 1
    end
  end

end


#############################################################################
# MAIN
#############################################################################

# Forever: rebuild the log with a fresh consumer, then sleep for
# $options.interval seconds.  SIGTERM is only deferred between the end of
# consumer setup and the end of close; at any other time it exits at once.
loop do
  daily_logger = DailyLogger.new

  $can_exit_immediately = false
  daily_logger.process_daily_files
  daily_logger.close
  daily_logger = nil
  $can_exit_immediately = true

  # honour a SIGTERM that arrived while the update was in progress
  exit_if_requested

  puts "Sleeping till #{Time.now + $options.interval} ..."
  sleep($options.interval)
end
