#!/usr/bin/ruby

#############################################################################
## Consumes file completion notices from a Kafka topic and downloads the files
## to the storage server at CAIDA.
##
## This script should generically work with any activities running on Ark
## (such as team probing) that conform to the required directory structure.
## Specifically, files to be downloaded must be stored on each monitor
## in the directory $ARK/activity/<activity>/finished
## (e.g., $ARK/activity/team-probing/finished/team-1-1.warts).
##
## Multiple instances of this script can be run (and typically will be run)
## to improve throughput and to improve robustness against slow or hung
## transfers.
##
## This script runs on a central server at CAIDA.
##
## $Id: ark-collector,v 1.56 2018/10/11 22:11:21 youngh Exp $
#############################################################################

# TODO how many partitions do my kafka topics have? is this dynamic? need at
# least as many partitions as I want to have consumers, more is better

# PACKAGES
# https://docs.confluent.io/platform/current/installation/installing_cp/deb-ubuntu.html#systemd-ubuntu-debian-install
# https://packages.confluent.io/deb/7.4/pool/main/c/confluent-kafka/
# apt-get install confluent-kafka openjdk-17-jre-headless

# view status of consumers:
# kafka-consumer-groups --command-config /etc/kafka/config.properties --bootstrap-server 127.0.0.1:9092 --all-groups --all-topics --describe

# Name the process (visible in ps; also the fallback SASL credential further
# down) and make helper libraries that live next to this script requirable.
$PROGRAM_NAME = "ark-collector"

$script_dir = File.dirname(__FILE__)
$LOAD_PATH.push($script_dir)

require 'rubygems'
require 'ostruct'
require 'optparse'
require 'syslog'
require 'thread'
require 'fileutils'
require 'socket'
require 'json'
require 'shellwords'

require 'rdkafka'
require 'arkutil/processutils'

# Any exception in any thread (e.g. a download worker) takes the whole
# process down rather than silently killing just that thread.
Thread.abort_on_exception = true

$hostname = Socket::gethostname

# Default settings; most of these can be overridden on the command line.
$options = OpenStruct.new
$options.staging = "/staging"
$options.remote_root = "/var/lib/ark/activity"
$options.delete_downloaded = true
$options.max_slow_attempts = 1
$options.arksync = $script_dir + "/arksync"
$options.remote_rsync_path = "/usr/bin/rsync"
$options.collector = "127.0.0.1:9092"
$options.topic = "ark-activity-data"
$options.id = "ark-collector-" + Process.pid.to_s

# Prefer the arksync shipped next to this script, else fall back to PATH.
# File.exist? (singular): File.exists? was removed in Ruby 3.2.
unless File.exist? $options.arksync
  $options.arksync = "arksync" # assume it's in PATH
end

# Command-line handling. Each switch's help text shows its current default.
opts = OptionParser.new

opts.on("--staging", "=DIR",
        "local data-staging root dir (#{$options.staging})") { |dir| $options.staging = dir }

opts.on("--remote-root", "=DIR",
        "remote activity root dir (#{$options.remote_root})") { |dir| $options.remote_root = dir }

opts.on("--[no-]delete-on-server", TrueClass,
        "delete files after downloading (#{$options.delete_downloaded})") do |flag|
  $options.delete_downloaded = flag
end

# TODO does this need to be exposed? Repurpose it now that the consumer and
# kafka topic are tracking retries, rather than arksync blocking and waiting?
opts.on("--max-slow-attempts", Integer, "=NUM",
        "max 'slow' retry attempts (15=24 hours) (#{$options.max_slow_attempts})") do |num|
  $options.max_slow_attempts = num
end

# The retry option was removed because it is no longer needed to match the
# old ark-collector's behaviour. Previously, a failed fetch left the marinda
# lock for the probe held until it was reset by hand (at which point every
# file listed in marinda for that probe was fetched). Now failed fetches are
# retried from the kafka retry topic, and restarting the measurement software
# on a probe re-announces all data files still present on it.
# TODO maybe make this read from the dead letter queue?
opts.on("-R", "--retry", "=MONITOR", "(removed, no longer required)") do |_monitor|
  $stderr.puts "Option -R/--retry disabled, retries should be automatic"
  exit 1
end

# TODO it might be worth having multiple retry topics of different durations?
# e.g. retry hourly for a day, then daily for a week. Look into how many
# messages end up in the dead letter topic
opts.on("-t", "--topic", "=TOPIC", String,
        "fetch messages from the specified topic (#{$options.topic})") { |name| $options.topic = name }

opts.on("-d", "--[no-]detach", TrueClass,
        "run as detached (daemon) process") { |flag| $options.detach = flag }

opts.on("--collector", "=ADDR", String,
        "collector server to report to (#{$options.collector})") { |addr| $options.collector = addr }

opts.on("--id", "=ID", String,
        "kafka client id (#{$options.id})") { |ident| $options.id = ident }

opts.on("-v", "--[no-]verbose", TrueClass, "show detailed progress") { |flag| $options.verbose = flag }

begin
  ARGV.replace(opts.parse(*ARGV))
rescue OptionParser::ParseError => e
  $stderr.puts "ERROR: #{e}"
  $stderr.puts opts
  exit 1
end

# Both roots have defaults, but refuse to run if either was cleared.
unless $options.staging && $options.remote_root
  $stderr.puts "ERROR: missing --staging or --remote-root argument"
  $stderr.puts opts
  exit 1
end

$local_root, $remote_root = $options.staging, $options.remote_root

# Downloads land under the staging root, so it must already be a directory.
unless File.directory?($local_root)
  $stderr.puts "ERROR: local data-staging root directory '#{$local_root}' " \
               "doesn't exist or is not a directory"
  exit 1
end

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Log to syslog (facility LOCAL0, tagged with our PID), and also echo every
# message to stderr when running in the foreground.
syslog_options = Syslog::LOG_PID
syslog_options |= Syslog::LOG_PERROR unless $options.detach
$log = Syslog.open("ark-collector", syslog_options, Syslog::LOG_LOCAL0)
# The default syslog mask already admits every priority, so setting
# LOG_UPTO(LOG_DEBUG) only for --verbose was a no-op. Mask debug messages out
# explicitly unless --verbose was given.
Syslog.mask = Syslog::LOG_UPTO($options.verbose ? Syslog::LOG_DEBUG : Syslog::LOG_INFO)

# SIGTERM and SIGINT stop the collector straight away.
%w[TERM INT].each do |signame|
  Signal.trap(signame) do
    $log.info "exiting on SIG#{signame}"
    exit 1
  end
end

# SIGHUP exits at once only when no download is in flight
# ($can_exit_immediately). Otherwise it records the request, which
# exit_if_requested honours after the current message is finished.
$can_exit_immediately = false
$exit_requested = false

Signal.trap("HUP") do
  if $can_exit_immediately
    $log.info "exiting on SIGHUP"
    exit 1
  end

  $log.info "got SIGHUP; will exit at next opportunity"
  $exit_requested = true
end

# Exit (status 1) if a deferred SIGHUP is pending; otherwise return nil.
def exit_if_requested
  return unless $exit_requested

  $log.info "exiting on SIGHUP"
  exit 1
end

# Log an exception (class, message and backtrace) to syslog at error level.
#
# exn - the exception to report.
#
# Returns the "ClassName: message" summary string. An exception that was
# constructed but never raised has a nil backtrace; it is logged with an
# empty backtrace rather than making the error handler itself raise.
def log_exception(exn)
  msg = "#{exn.class.name}: #{exn}"
  $log.err "aborting on exception: %s; backtrace: %s", msg,
    Array(exn.backtrace).join(" <= ")
  msg
end

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# TODO can we override the rebalance_cb to pause new partitions?
# https://github.com/appsignal/rdkafka-ruby/blob/main/lib/rdkafka/bindings.rb
# https://stackoverflow.com/questions/5944278/overriding-method-by-another-defined-in-module
#
# Records rdkafka rebalance notifications so that the polling loop can
# pause/resume the affected partitions later. Calling pause/resume inside the
# rebalance handlers themselves leads to recursive locking, so the handlers
# only queue the change and update_paused_tpls applies it.
class RebalanceListener

  def initialize
    # Ordered [:pause | :resume, partition_list] actions not yet applied.
    # A queue (rather than one "latest" list per kind) means several
    # rebalances between two update_paused_tpls calls are all honoured, in
    # the order they happened.
    @pending_actions = []
  end

  # Partitions gained: pause them, since we may be mid-way through a message.
  def on_partitions_assigned(partitions)
    @pending_actions << [:pause, partitions]
    $log.debug "Rebalancing, assigned partitions: %s", partitions.to_s
  end

  # Partitions lost: unpause them.
  def on_partitions_revoked(partitions)
    @pending_actions << [:resume, partitions]
    $log.debug "Rebalancing, lost partitions: %s", partitions.to_s
  end

  # Run during the polling loop: apply every queued pause/resume, oldest
  # first, then clear the queue. An assign -> revoke -> assign sequence
  # therefore leaves the partition paused. Empty lists are skipped.
  def update_paused_tpls
    until @pending_actions.empty?
      action, partitions = @pending_actions.shift
      $g_collector.public_send(action, partitions) unless partitions.empty?
    end
  end
end

# Kafka connection settings shared by the consumer (which reads
# file-completion notices) and the producer (which writes retry, dead letter
# and download-stat messages).
collector_config = {
  :"bootstrap.servers" => $options.collector,
  :"client.id" => $options.id,
  :"sasl.mechanism" => "SCRAM-SHA-256",
  :"sasl.username" => (ENV['ARK_COLLECTOR_USERNAME'] || $0),
  :"sasl.password" => (ENV['ARK_COLLECTOR_PASSWORD'] || $0),
  :"security.protocol" => "SASL_PLAINTEXT",
}
# TODO check that this configuration will retry sending messages forever
producer_config = collector_config.merge({
  :"enable.idempotence" => true,
  :"message.timeout.ms" => 0,
  :"message.send.max.retries" => 10000000,
})
# ark-collector processes using the same topic share a group id, so that
# partitions are divided among them.
# ark-collector processes use unique group instance ids, so that they can
# reconnect quickly and get the same partitions without causing a rebalance
# TODO only set group.instance.id if $options.id is not the default value?
consumer_config = collector_config.merge({
  :"group.id" => "ark-collector-" + $options.topic,
  :"group.instance.id" => $options.id,
  :"enable.auto.commit" => false,
  :"enable.auto.offset.store" => false,
  :"auto.offset.reset" => "earliest",
  :"partition.assignment.strategy" => "cooperative-sticky",
})

# config needs to be instantiated before the rebalance listener can be set
$rebalancer = RebalanceListener.new
config = Rdkafka::Config.new(consumer_config)
config.consumer_rebalance_listener = $rebalancer

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Detach *before* creating any Kafka client. librdkafka clients run
# background threads, and threads do not survive fork(), so a consumer or
# producer created beforehand would be unusable in the daemonized process.
# NOTE(review): assumes ArkUtil.daemonize forks — confirm.
if $options.detach
  $stderr.puts "starting collector process in background\n"
  ArkUtil.daemonize()
  $log.info("started collector process in background")
end

$g_collector = config.consumer
$g_collector.subscribe($options.topic)

# producer writes to the retry, dead letter and download-stat topics
$g_producer = Rdkafka::Config.new(producer_config).producer

#============================================================================
#============================================================================

# Return the partition numbers currently assigned to this consumer as plain
# Integers, for easy comparison with a message's partition.
#
# The consumer subscribes to a single topic, so partitions from every topic
# in the assignment are merged. An empty assignment (e.g. after a rebalance
# has moved every partition to other consumers), or a topic entry with no
# partition list, yields [] instead of raising NoMethodError.
def partition_assignment_list
  $g_collector.assignment.to_h.values.flat_map do |partitions|
    Array(partitions).map(&:partition)
  end
end


# Seconds a message from the retry topic must have waited (measured from its
# Kafka timestamp) before another download attempt is made.
RETRY_DELAY_SECS = 3600

# A failed download is requeued until it has failed this many times; after
# that it is sent to the dead letter topic instead.
MAX_DOWNLOAD_ATTEMPTS = 48

# Poll the consumer once so it stays alive in its group (and rebalance
# callbacks run). Seek back to any message that slipped through from a
# partition that was not yet paused, then apply queued pause/resume changes.
def poll_and_rewind(timeout_ms)
  unwanted = $g_collector.poll(timeout_ms)
  $g_collector.seek(unwanted) if unwanted
  $rebalancer.update_paused_tpls()
end

# Wait, polling, until the retried message msg is due for another attempt.
# Returns true once it is due. Returns false as soon as msg's partition is
# no longer assigned to this consumer, because the new owner handles it.
#
# TODO put the next retry timestamp in the message rather than now?
# TODO do we care if we get rebalanced and an earlier message is now
# available? At this stage I don't think when retries happen is that
# important
def wait_until_retry_due(msg)
  until msg.timestamp + RETRY_DELAY_SECS < Time.now
    # can't just sleep till due, in case rebalancing takes away this message
    poll_and_rewind(1000)
    return false unless partition_assignment_list.include?(msg.partition)
  end
  true
end

# Requeue a notice whose download failed. It goes onto the retry topic with
# an incremented "attempts" header, or onto the dead letter topic once
# MAX_DOWNLOAD_ATTEMPTS is reached.
#
# TODO how long to retry before giving up?
# TODO generate the retry and deadletter topic names from $options.topic
# NOTE(review): delivery is not awaited before the caller commits, so a
# crash in between could lose the requeued message.
def reschedule_failed_download(msg, data)
  attempts = (msg.headers["attempts"] || 0).to_i + 1
  if attempts < MAX_DOWNLOAD_ATTEMPTS
    $log.info "scheduling retry of '%s:%s:%s' (attempts %d)",
              data["monitor"], data["activity"], data["filename"], attempts
    # Use a String key. `"attempts": n` is a Symbol key, so it would leave the
    # existing String "attempts" header in place and send the header twice.
    $g_producer.produce(
      topic: "ark-activity-data-retry",
      payload: msg.payload,
      key: msg.key,
      headers: msg.headers.merge("attempts" => attempts.to_s),
    )
  else
    $log.info "giving up on '%s:%s:%s'",
              data["monitor"], data["activity"], data["filename"]
    $g_producer.produce(
      topic: "ark-activity-data-deadletter",
      payload: msg.payload,
      key: msg.key,
      headers: msg.headers,
    )
  end
end

# Main normal processing loop: download files from all monitors.
#
# Each message is expected to be a JSON object with "monitor", "activity"
# and "filename" keys. While one message is being handled, all assigned
# partitions are paused. The poll() calls that keep the consumer alive
# therefore do not deliver further messages. The offset is committed only if
# this consumer still owns the message's partition afterwards.
def download_files
  $can_exit_immediately = true

  $g_collector.each do |msg|
    $log.debug "@ [%s]", msg

    data = begin
      JSON.parse(msg.payload)
    rescue JSON::ParserError, TypeError
      nil
    end

    # A notice that isn't a JSON object can never be processed. Indexing an
    # Array/Integer with a String would also raise in the download thread.
    # Commit past it.
    unless data.is_a?(Hash)
      $g_collector.store_offset(msg)
      $g_collector.commit
      next
    end

    # pause all assigned partitions while processing this message, as we
    # may need to poll() in order to keep the connection alive but don't
    # want to get any new messages when we do
    $g_collector.pause($g_collector.assignment)

    # TODO there is probably a better way to check for a retried message
    # than checking for the presence of this header, but it works for now
    if msg.headers.key?("attempts") && !wait_until_retry_due(msg)
      # Lost this message's partition while waiting: abandon the message
      # (uncommitted, so its new owner gets it) and go on to the next one.
      $g_collector.resume($g_collector.assignment)
      next
    end

    $can_exit_immediately = false

    # start a thread to perform the download while we continue to poll
    # the server, otherwise it thinks we've gone away
    worker = Thread.new do
      download_file data["monitor"], data["activity"], data["filename"]
    end

    until worker.join(1)
      poll_and_rewind(250)

      # if assignments changed, we may need to stop working on this message
      unless partition_assignment_list.include?(msg.partition)
        $log.info "Lost partition with active message, killing arksync"
        # NOTE(review): Thread#exit ends the Ruby thread; the arksync child
        # process started via system() is probably left running — confirm.
        worker.exit
      end
    end

    # Only act on the outcome while we still own the partition. Otherwise
    # the new owner redelivers the uncommitted message, and requeueing it
    # here as well would create a duplicate.
    if partition_assignment_list.include?(msg.partition)
      # worker.value is false when arksync failed, nil if the worker was
      # killed, and true otherwise. Only false is requeued, hence `== false`.
      reschedule_failed_download(msg, data) if worker.value == false

      # message was processed in some way, commit the offset so we can move on
      $g_collector.store_offset(msg)
      $g_collector.commit
    end

    # ready to receive another message, so unpause all assigned partitions
    $g_collector.resume($g_collector.assignment)

    $can_exit_immediately = true  # must be before exit_if_requested

    exit_if_requested
  end
end


# Fetch one finished data file from a monitor into the local staging area
# (#{$local_root}/<activity>/<monitor>/<file>) by running arksync. A ".GOOD"
# marker is created when the per-monitor directory is first made. After a
# successful download, a size record is published to "ark-download-stat" and
# a "monitor|activity|file" record to "ark-downloaded-data".
#
# monitor, activity and file come straight from a Kafka message. They are
# therefore passed to arksync as separate argv entries instead of being
# interpolated into a shell command line (which allowed shell injection).
# NOTE(review): they are not checked for "/" or ".."; a crafted notice could
# still point local_file outside $local_root.
#
# Returns true on a successful download, if the file is already present
# locally, or if arksync succeeded but no local file appeared (taken to mean
# the remote file doesn't exist). Returns false if arksync reported failure.
def download_file(monitor, activity, file)
  remote_file = "#{$remote_root}/#{activity}/finished/#{file}"
  local_dir = "#{$local_root}/#{activity}/#{monitor}"
  local_file = "#{local_dir}/#{file}"

  # File.exist? (singular): File.exists? was removed in Ruby 3.2.
  unless File.exist? local_dir
    FileUtils.mkpath local_dir
    FileUtils.touch local_dir + "/.GOOD"
  end

  if File.exist? local_file
    $log.info "skipping already downloaded file '%s:%s:%s' -> '%s'",
      monitor, activity, file, local_file
    return true
  end

  # Arguments after "--" are handed on to rsync. %MON is a placeholder for
  # the monitor host; presumably arksync substitutes it — confirm in arksync.
  command = [
    $options.arksync,
    "--include=#{monitor}",
    "--max-slow-attempts=#{$options.max_slow_attempts}",
    "--",
    "--ignore-missing-args",
  ]
  command << "--remove-source-files" if $options.delete_downloaded
  command.push("--rsync-path=#{$options.remote_rsync_path}",
               "ark@%MON:#{remote_file}", local_file)
  $log.debug "%s", Shellwords.join(command) if $options.verbose

  unless system(*command)
    $log.err "failed to download '%s:%s:%s'", monitor, activity, file
    return false
  end

  # --ignore-missing-args lets rsync succeed when the remote file is absent,
  # in which case nothing is written locally.
  unless File.exist? local_file
    $log.info "skipping missing remote file '%s:%s:%s'",
              monitor, activity, file
    return true
  end

  $log.info "successful download of '%s:%s:%s'", monitor, activity, file

  stats = {
    timestamp: Time.now.to_i,
    monitor: monitor,
    activity: activity,
    file: file,
    size: File.size(local_file),
  }

  # send the filesize to the stats topic for generating emails
  $g_producer.produce(
    topic: "ark-download-stat",
    payload: stats.to_json,
    key: monitor,
  )

  # send the metadata to another kafka topic for later processing/merging
  $g_producer.produce(
    topic: "ark-downloaded-data",
    payload: "#{monitor}|#{activity}|#{file}",
    key: monitor,
  )
  true
end


#============================================================================
# MAIN
#============================================================================

begin
  $log.debug "threads at startup: %s", Thread.list.map(&:inspect).join("; ")
  download_files
rescue => e
  log_exception e
  # log_exception only logs the error. Exit non-zero so whatever started us
  # (shell, service manager) can tell the collector died rather than
  # finished cleanly.
  exit 1
end
