#!/usr/bin/ruby

#############################################################################
## Executes scamper on each monitor to conduct IPv4/IPv6 traceroute
## measurements while communicating with teamx-master to obtain targets.
##
## This is useful for team-probing and tracetun measurements and possibly
## for yarrp6 (with a yarrp-specific target generator).
#############################################################################

# Set the program name reported for this process.
$0 = "ark-team-prober"

require 'rubygems'
require 'ostruct'
require 'optparse'
require 'fileutils'
require 'net/http'

require 'rdkafka'
require 'arkutil/producer'

# Command-line options, seeded with the default server names. The defaults
# are also shown in the corresponding option descriptions below.
$options = OpenStruct.new(
  target: "targets.ark.caida.org",
  collector: "collector.ark.caida.org"
)

opts = OptionParser.new

opts.on("-m", "--monitor", "=MONITOR", "canonical monitor name") { |name| $options.monitor = name }

opts.on("--target", "=ADDR", String,
        "server to fetch targets from (#{$options.target})") { |addr| $options.target = addr }

opts.on("--collector", "=ADDR", String,
        "collector server to report to (#{$options.collector})") { |addr| $options.collector = addr }

opts.on("-v", "--[no-]verbose", TrueClass,  "show detailed progress") { |flag| $options.verbose = flag }

# Parse the command line, leaving only the non-option arguments in ARGV.
# A malformed command line prints the error plus the usage text and exits.
begin
  leftover = opts.parse(*ARGV)
  ARGV.replace(leftover)
rescue OptionParser::ParseError => e
  $stderr.puts "ERROR: #{e}"
  $stderr.puts opts
  exit 1
end

# The monitor name is mandatory; it is used in file names and reports.
if !$options.monitor
  $stderr.puts "ERROR: missing --monitor argument"
  $stderr.puts opts
  exit 1
end

$monitor = $options.monitor
$activity = "team-probing"
$topic = "ark-activity-data"
$root = "/var/lib/ark/activity/#{$activity}"
$finished_dir = $root + "/finished"
$ramdisk = "/ramdisk"

# Creates +path+ together with any missing parent directories. If that
# fails, reports the path on stderr and terminates the script with status 1.
def mkpath_or_exit(path)
  FileUtils.mkpath path
rescue
  $stderr.puts "Could not mkpath #{path}"
  exit 1
end

if File.symlink?($root) && !File.directory?($root)
  # this block of code will run if a symlink exists, but the directory
  # underlying the symlink does not exist.
  # A relative link target is relative to the directory holding the link,
  # not to the current working directory, so resolve it before creating it.
  link_target = File.expand_path(File.readlink($root), File.dirname($root))
  mkpath_or_exit(link_target)
elsif File.directory?($ramdisk) && !File.directory?($root)
  # if we have /ramdisk available, use it -- create /ramdisk/team-probing
  # and symlink /var/lib/ark/activity/team-probing to it.
  ramdisk_root = "#{$ramdisk}/#{$activity}"
  mkpath_or_exit(ramdisk_root)
  # the symlink can only be created if its parent directory exists
  mkpath_or_exit(File.dirname($root))
  begin
    FileUtils.ln_s(ramdisk_root, $root)
  rescue
    $stderr.puts "Could not symlink #{ramdisk_root} to #{$root}"
    exit 1
  end
elsif !File.directory?($root)
  # we do not have /ramdisk available, so just create
  # /var/lib/ark/activity/team-probing
  mkpath_or_exit($root)
end

# make sure the root directory exists, before going any further
unless File.directory?($root)
  $stderr.printf "ERROR: missing activity directory '%s'\n", $root
  exit 1
end

# do not start probing if there's a stop-probing file present
if File.file?($root + "/stop-probing")
  $stderr.puts "found stop-probing file; exiting"
  exit 0
end

# create the finished directory if it does not exist.
mkpath_or_exit($finished_dir)

#---------------------------------------------------------------------------

#============================================================================
# Runs one probing cycle against the targets in +target_file+ (a file name
# relative to $root) and reports the resulting output file to the collector.
#
# cycle       - integer cycle id; embedded in the output file name and passed
#               to the worker command.
# target_file - name of the previously downloaded target file.
#
# The worker writes to an ".inprogress" path under $root/finished; once it
# succeeds the target file is deleted and the output is renamed to its final
# name. If the target file is missing, the method only sleeps and returns.
def handle_probe_request(cycle, target_file)
  target_path = "#{$root}/#{target_file}"
  if !File.exist?(target_path)
    $stderr.printf "missing target file '%s'\n", target_path
    sleep_at_least 30  # prevent fast loops caused by a bug or other problem
    return
  end

  sleep_at_least 2  # hack to prevent re-use of timestamps in filenames
  started = Time.now
  out_name = format("%s.%s.%d.c%06d.%s.warts.gz",
                    $monitor, $activity, started.to_i, cycle,
                    yyyymmdd_utc(started))
  out_path = "#{$root}/finished/#{out_name}"
  inprogress_path = "#{out_path}.inprogress"

  # sh terminates the script if the worker fails, so the cleanup below only
  # happens after a successful run.
  sh format("/usr/bin/ark-team-prober-worker %d %s %s",
            cycle, target_path, inprogress_path)

  File.delete(target_path)
  File.rename(inprogress_path, out_path)

  $stderr.printf "probed in %.1f secs\n", Time.now - started

  # create a fresh producer for each report so certs get reloaded
  ArkUtil::ArkProducer.new($options.collector, $monitor, $topic) do |producer|
    report = { monitor: $monitor, activity: $activity, filename: out_name }
    producer.produce(report)
  end
end


#def handle_control_message(command)
#  case command
#  when "quit"
#    $stderr.puts "Exiting by request..."
#    exit 0
#  else
#    $stderr.printf "invalid CONTROL command: %p\n", command
#  end
#end


# Echoes +command+ to stdout and runs it with Kernel#system.
#
# Returns nil when the command succeeds. If it fails or cannot be started,
# writes an error including the process status to stderr and exits the
# script with status 1.
def sh(command)
  puts command
  return if system(command)

  $stderr.puts "ERROR: " + format("couldn't execute command: %p: %s", $?, command)
  exit 1
end


# Announces the delay on stderr, then sleeps until at least +delay+ seconds
# have been accounted for.
#
# Kernel#sleep returns the number of seconds it slept, so the remaining time
# is re-slept whenever a sleep call returns less than was asked for.
# Returns nil; a delay of zero or less does not sleep at all.
def sleep_at_least(delay)
  $stderr.printf "sleeping for %d seconds\n", delay
  remaining = delay
  remaining -= sleep(remaining) while remaining > 0
end


# Saves the body of a target-file +response+ into a new file under $root.
#
# The response must carry an "x-ark-team-probing-cycle" header whose integer
# value is greater than zero; otherwise nothing is written.
#
# Returns [cycle_id, filename] on success, where filename is relative to
# $root, or [false, false] when the header is missing or not positive.
def download_target_file(response)
  cycle_header = response["x-ark-team-probing-cycle"]
  cycle = cycle_header ? cycle_header.to_i : 0

  if cycle <= 0
    $stderr.puts "invalid response to target file request"
    return [false, false]
  end

  filename = format("targets.%d.%s", Time.now.to_i, $options.monitor)
  $stderr.puts "saving " + filename
  # stream the body to disk chunk by chunk instead of buffering it in memory
  File.open("#{$root}/#{filename}", 'w') do |io|
    response.read_body { |chunk| io.write chunk }
  end

  [cycle, filename]
end


# Formats +timestamp+ (anything Time.at accepts, e.g. epoch seconds or a
# Time) as an eight-digit "YYYYMMDD" date string in UTC.
def yyyymmdd_utc(timestamp)
  Time.at(timestamp).utc.strftime("%Y%m%d")
end


# Builds the options hash passed to Net::HTTP.start for talking to the
# target server over TLS with this monitor's client key and certificate.
#
# Returns a Hash with the resolved IPv4 address, the TLS verification
# settings, the client key/cert, the CA file and the cipher string.
def get_http_config
  # resolve the target address ourselves so that we can force use of ipv4,
  # because if it can't do that then it can't perform the measurements either
  ipv4_addr = Addrinfo.getaddrinfo($options.target, 443, :INET).first.ip_address

  # Ruby 2 doesn't expose Net::HTTP::extra_chain_cert like Ruby 3 does, nor
  # does it have OpenSSL::X509::Certificate::load, which makes it hard to
  # load a certificate chain to use the new certificates. Instead the server
  # now provides the intermediate, so we can get away with just a single cert
  # (the Ruby 3 form would have been:
  #  extra_chain_cert: OpenSSL::X509::Certificate.load(File.read(ArkUtil::ArkProducer.which_cert($monitor))))
  client_key = OpenSSL::PKey::EC.new(File.read(ArkUtil::ArkProducer.which_key($monitor)))
  client_cert = OpenSSL::X509::Certificate.new(File.read(ArkUtil::ArkProducer.which_cert($monitor)))
  ca_path = ArkUtil::ArkProducer.which_cacert
  seclevel = ArkUtil::ArkProducer.which_seclevel($monitor)

  {
    ipaddr: ipv4_addr,
    use_ssl: true,
    verify_mode: OpenSSL::SSL::VERIFY_PEER,
    key: client_key,
    cert: client_cert,
    ca_file: ca_path,
    ciphers: "DEFAULT@SECLEVEL=#{seclevel}",
  }
end


#############################################################################
# MAIN
#############################################################################

## Re-sends a completion notification to the collector for each finished
## output file that exists on the current monitor so that they will be
## downloaded and processed.
##
## Be sure to do this before starting probing; otherwise, the notifications
## won't be in the right order and will cause problems.
##
## Example: muc-de.team-probing.1692153679.c010848.20230816.warts.gz
FINISHED_FILE_RE = /^[^.]+\.#{$activity}\.\d+\.c\d+\.\d+\.warts(\.gz)?$/

# Only completed outputs match the pattern; ".inprogress" files do not.
unsent = Dir.entries($finished_dir).grep(FINISHED_FILE_RE)
unless unsent.empty?
  # one producer is shared by all notifications, sent in sorted name order
  ArkUtil::ArkProducer.new($options.collector, $monitor, $topic) do |producer|
    unsent.sort.each do |filename|
      $stderr.puts "sending completion notification for " + filename
      notice = { monitor: $monitor, activity: $activity, filename: filename }
      producer.produce(notice)
    end
  end
end

# Main loop: repeatedly ask the target server for a target file and probe
# it. When no target file was obtained, sleep for a randomized delay before
# asking again. The loop ends when a stop-probing file appears in $root; any
# uncaught StandardError is logged with its backtrace and exits with status 1.
begin
  loop do
    if File.file?($root + "/stop-probing")
      $stderr.puts "found stop-probing file; exiting"
      break
    end

    delay = 600  # default wait, in seconds, when the server gives no hint
    cycleid = nil
    filename = nil
    http_config = get_http_config

    begin
      Net::HTTP.start($options.target, 443, http_config) do |http|
        http.request_get('/activity/team-probing/target') do |response|
          case response
          when Net::HTTPOK
            cycleid, filename = download_target_file(response)
          when Net::HTTPServiceUnavailable
            $stderr.puts "server has no target files available"
          when Net::HTTPTooManyRequests
            $stderr.puts "server is rate limiting us"
          else
            $stderr.printf "unexpected response from target server: %s %s\n",
                           response.code, response.message
          end

          # if there was an expected http error, the response might include a
          # retry-after header with a time to wait. Only a positive number of
          # seconds is honoured: a missing header, "0", a negative value or
          # the HTTP-date form all convert to a non-positive integer, and a
          # non-positive delay would make Random.rand(delay) below raise
          # ArgumentError and kill the prober.
          retry_after = response["retry-after"].to_i
          delay = retry_after if retry_after > 0
        end
      end
    rescue Errno::ECONNREFUSED, Errno::ECONNRESET
      $stderr.printf "failed to connect to target server '%s'\n", $options.target
    rescue Errno::ENETUNREACH
      $stderr.printf "no IPv4 to target server '%s'\n", $options.target
      delay = 86400
    end

    if cycleid && filename
      handle_probe_request(cycleid, filename)
    else
      # TODO is it worth doing exponential backoff, or smarter delays?
      # add up to +delay+ seconds of jitter to the wait
      sleep_at_least(delay + Random.rand(delay))
    end
  end

rescue => e
  msg = sprintf "teamx-driver: exiting on uncaught exception " +
    "at top-level: %p; backtrace: %s", e, e.backtrace.join(" <= ")
  $stderr.puts msg
  exit 1
end
