#!/usr/bin/ruby

#############################################################################
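## This worker probes a random destination and the ::1 address in every
## routed IPv6 prefix.  Each invocation runs a single measurement cycle; the
## parent ark-v6-prober re-invokes it to keep probing on an ongoing basis.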
##
## $Id: v6-prober-worker,v 1.2 2023/06/15 03:50:03 mjl Exp $
#############################################################################

# Set the process title (as seen by ps(1)) for this worker.
$PROGRAM_NAME = "ark-v6-prober-worker"

require 'rubygems'
require 'ostruct'
require 'optparse'
require 'syslog'
require 'fileutils'
require 'net/http'

require 'arkutil/fileutils'
require 'arkutil/generalutils'
require 'arkutil/logging'
require 'arkutil/producer'
require 'rdkafka'

# Default settings; several of these can be overridden on the command line.
$options = OpenStruct.new(
  cycle_duration: 12 * 60 * 60,          # secs (12 hours)
  scamper_port: 8745,                    # scamper control socket port
  reconnect_delay: 5 * 60,               # secs
  notify_collector: true,
  collector: "collector.ark.caida.org",
  target: "targets.ark.caida.org"        # HTTPS server that serves target files
)

# Topic and activity name used in collector notifications; the activity
# name is also part of the target-file download URL.
$topic = "ark-activity-data"
$activity = "topo-v6"

opts = OptionParser.new

# Because there aren't that many prefixes, we don't want to just let
# scamper probe at 100pps, since we'll repeatedly probe target networks
# too closely in time.  Hence, we artificially space out measurements
# until a single cycle takes --cycle-duration seconds to complete.
opts.on("--cycle-duration", "=NUM", Integer,
        "min duration (secs) for a cycle (#{$options.cycle_duration})") do |v|
  $options.cycle_duration = v
end

opts.on("-m", "--monitor", "=MONITOR", "canonical monitor name") do |v|
  $options.monitor = v
end

# OptionParser has already converted NUM to an Integer.
opts.on("-s", "--scamper-port", "=NUM", Integer,
        "control socket port (#{$options.scamper_port})") do |v|
  $options.scamper_port = v
end

opts.on("--[no-]notify-collector", TrueClass,
        "notify ark-collector of finished files (true)") do |v|
  $options.notify_collector = v
end

opts.on("--syslog", TrueClass,
        "use Syslog instead of StderrLogger") do |v|
  $options.syslog = v
end

opts.on("--collector", "=ADDR", String,
        "collector server to report to (#{$options.collector})") do |v|
  $options.collector = v
end

# Like --collector, the server that target files are downloaded from can
# be overridden (e.g. to test against a different server).
opts.on("--target", "=ADDR", String,
        "server to download target files from (#{$options.target})") do |v|
  $options.target = v
end

opts.on("-v", "--[no-]verbose", TrueClass, "show detailed progress") do |v|
  $options.verbose = v
end

begin
  ARGV.replace opts.parse(*ARGV)
rescue OptionParser::ParseError => e
  $stderr.puts "ERROR: " + e.to_s
  $stderr.puts opts
  exit 1
end

# --monitor is mandatory: the monitor name is used in output filenames,
# sc_prefixprober metadata, collector notifications and key/cert lookups.
$monitor = $options.monitor
unless $monitor
  $stderr.puts "ERROR: missing --monitor argument"
  $stderr.puts opts
  exit 1
end

#===========================================================================

# Per-activity working directory: downloaded *.targets files and the
# in-progress warts output live here; completed cycles are moved into the
# finished/ subdirectory.
$activity_root = "/var/lib/ark/activity/topo-v6"
$finished_dir = File.join($activity_root, "finished")

#===========================================================================

# Log either to syslog (ident "v6-prober", facility LOCAL0) or to stderr.
if $options.syslog
  # Echo syslog messages to stderr as well unless detached.
  # NOTE(review): nothing in this file sets $options.detach, so LOG_PERROR
  # is currently always included -- confirm that is intended.
  log_flags = $options.detach ? Syslog::LOG_PID : (Syslog::LOG_PID | Syslog::LOG_PERROR)
  $log = Syslog.open("v6-prober", log_flags, Syslog::LOG_LOCAL0)
  # NOTE(review): the mask is only changed when --verbose is given;
  # otherwise syslog's default mask is left in place.
  Syslog.mask = Syslog::LOG_UPTO(Syslog::LOG_DEBUG) if $options.verbose
else
  $log = ArkUtil::StderrLogger.new
end

# On SIGTERM/SIGINT, log the reason and exit with status 2.
{ "TERM" => "exiting on SIGTERM", "INT" => "exiting on SIGINT" }.each do |sig, message|
  Signal.trap(sig) do
    $log.info message
    exit 2
  end
end

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Echo +command+ to stdout, then run it with Kernel#system, discarding the
# command's stdout.  If the command cannot be run or exits unsuccessfully,
# report it on stderr and terminate the script with exit status 1.
def sh(command)
  puts command
  succeeded = system(command, out: File::NULL)
  return if succeeded

  $stderr.printf "ERROR: couldn't execute command: %s\n", command
  exit 1
end

# Names of target files, e.g. "20230615.targets".  Anchored with \A/\z rather
# than ^/$ so that a directory entry containing a newline cannot match: the
# selected name ends up interpolated into the sc_prefixprober shell command.
TARGET_FILE_RE = /\A(\d+)\.targets\z/

# Return the basename of the newest non-empty targets file in $activity_root,
# or nil if there is none.
#
# Before looking, stale *.targets files are deleted (find's "-mtime +2",
# i.e. last modified at least three whole days ago).  Files are ordered by
# their numeric prefix (YYYYMMDD for downloaded files).  Zero-length files
# are skipped because they cannot drive a measurement cycle; one may be
# left behind by an empty download.
def find_target_file
  # follow symlinks because $activity_root can be a symlink
  cmd = sprintf "find -L %s -name \\*.targets -mtime +2 -delete", $activity_root
  sh cmd

  files = Dir.entries($activity_root)
             .grep(TARGET_FILE_RE)
             .select { |name| File.size?(File.join($activity_root, name)) }
             .sort_by { |name| [name[TARGET_FILE_RE, 1].to_i, name] }

  files.each { |name| printf ">> %s\n", name } if $options.verbose

  files.last
end

# True when `ip route get` succeeds for a fixed global IPv6 address, i.e.
# the kernel has an IPv6 route out; false otherwise (including when the
# command cannot be run at all).
def got_ipv6
  route_check = "ip route get 2001:48d0:101:501:: >/dev/null 2>&1"
  system(route_check) ? true : false
end

# Download the current targets file for this activity from the target server
# over HTTPS (client key/cert, server verified against the CA file) and save
# it in $activity_root as YYYYMMDD.targets (local date).
#
# The body is streamed into a ".partial" file that is renamed into place only
# after the whole body has been written.  An interrupted transfer therefore
# never leaves a truncated file that find_target_file could pick up.
#
# Returns true on success.  Returns false on any expected failure: no file
# available, rate limiting, an unexpected HTTP status, network/TLS/timeout
# errors, or a local I/O error while saving.  The caller retries and then
# falls back to an older targets file, so these must not abort the script.
def download_targets_file
  http_config = {
    use_ssl: true,
    verify_mode: OpenSSL::SSL::VERIFY_PEER,
    key: OpenSSL::PKey::EC.new(File.read(ArkUtil::ArkProducer.which_key($monitor))),
    cert: OpenSSL::X509::Certificate.new(File.read(ArkUtil::ArkProducer.which_cert($monitor))),
    #extra_chain_cert: OpenSSL::X509::Certificate.load(File.read(ArkUtil::ArkProducer.which_cert($monitor))),
    ca_file: ArkUtil::ArkProducer.which_cacert,
    ciphers: "DEFAULT@SECLEVEL=#{ArkUtil::ArkProducer.which_seclevel($monitor)}",
  }

  filename = Time.now.strftime("%Y%m%d.targets")
  final_path = File.join($activity_root, filename)
  partial_path = final_path + ".partial"

  begin
    Net::HTTP.start($options.target, 443, http_config) do |http|
      http.request_get("/activity/#{$activity}/target") do |response|
        case response
        when Net::HTTPOK
          $stderr.puts "saving " + filename
          File.open(partial_path, "w") do |io|
            response.read_body { |chunk| io.write(chunk) }
          end
          File.rename(partial_path, final_path)
          return true
        when Net::HTTPServiceUnavailable
          $stderr.puts "server has no target files available"
        when Net::HTTPTooManyRequests
          $stderr.puts "server is rate limiting us"
        else
          $stderr.printf "unexpected response from target server: %s %s\n",
                         response.code, response.message
        end
      end
    end
  rescue SocketError, SystemCallError, IOError, Timeout::Error,
         OpenSSL::SSL::SSLError, Net::HTTPBadResponse => e
    # SystemCallError covers ECONNREFUSED/ECONNRESET as well as e.g.
    # EHOSTUNREACH/ETIMEDOUT and local errors while writing the file;
    # Timeout::Error covers Net::OpenTimeout and Net::ReadTimeout;
    # IOError covers EOFError from a connection closed mid-transfer.
    $stderr.printf "failed to download targets from '%s': %s (%s)\n",
                   $options.target, e.message, e.class
  ensure
    # No-op after a successful rename; removes any partial download otherwise.
    FileUtils.rm_f(partial_path)
  end

  false
end

# Run one measurement cycle:
#   1. return early if there is no IPv6 route;
#   2. try to download a fresh targets file, backing off between attempts;
#   3. pick the newest usable targets file (fresh or older);
#   4. run sc_prefixprober against it, writing an ".inprogress" warts file
#      that is renamed into $finished_dir once the prober succeeds;
#   5. if enabled, notify the collector of the finished file.
#
# A failing sc_prefixprober run terminates the script (see sh).
def execute_measurements
  # check if we have IPv6 connectivity.  if we do not, then we exit
  # back to the ark-v6-prober, which will sleep until 24 hours has
  # passed since the last time it called ark-v6-prober-worker.
  unless got_ipv6
    $log.info "No IPv6 connectivity, sleeping till next cycle"
    return
  end

  # Try to download the most up-to-date targets file, backing off for 1, 2,
  # 3 and 4 minutes between attempts.  Sleeping after the final failure
  # would only delay the cycle: we fall back to an older file right away.
  max_attempts = 5
  1.upto(max_attempts) do |attempt|
    break if download_targets_file
    sleep(attempt * 60) if attempt < max_attempts
  end

  # find the newest targets file with contents, even if we didn't
  # manage to download a new file this time around
  prefix_file = find_target_file

  # there is no useful target file available, try again tomorrow
  if prefix_file.nil?
    $log.info "No valid target file, sleeping till next cycle"
    return
  end

  ArkUtil::sleep_at_least 3   # keep cycle start times unique per monitor
  now = Time.now.utc
  cycle_timestamp = now.to_i
  cycle_yyyymmdd = now.strftime "%Y%m%d"
  cycle_basename = sprintf "%d.%s.l8.%s", cycle_timestamp, $monitor,
                           cycle_yyyymmdd

  # sc_prefixprober writes (-o) to the .inprogress file; it is moved into
  # $finished_dir under its final name only after a successful run.
  inprogress_path = $activity_root + "/" + cycle_basename + ".inprogress.warts.gz"
  cycle_filename = cycle_basename + ".warts.gz"

  descr = sprintf "cycle_start=%d prefixes=%s", cycle_timestamp, prefix_file

  command = sprintf "/usr/bin/sc_prefixprober -p %d -c 'trace -P icmp-paris -q 3' -o %s -a %s/%s -d %d -L id=8 -L name=ipv6.allpref -L \"descr=%s\" -L \"monitor=%s\" -L cycle-id=%d -O first -O random",
                    $options.scamper_port, inprogress_path, $activity_root,
                    prefix_file, $options.cycle_duration, descr,
                    $options.monitor, cycle_timestamp

  sh command

  File.rename inprogress_path, $finished_dir + "/" + cycle_filename

  return unless $options.notify_collector

  ArkUtil::ArkProducer.new($options.collector, $monitor, $topic) do |producer|
    data = {
      monitor: $monitor,
      activity: $activity,
      filename: cycle_filename
    }
    producer.produce(data)
  end
end

#############################################################################
# MAIN
#############################################################################

## On startup, send a completion notification to the collector (via
## ArkUtil::ArkProducer) for each finished warts file that still exists on
## this monitor, so that those files will be downloaded and processed.
##
## Be sure to do this before starting probing; otherwise, notifications
## won't be in the right order and will cause problems.
##
## Matches "<unix-time>.<monitor>.l8.<YYYYMMDD>.warts[.gz]", the names that
## execute_measurements gives finished cycles.  Anchored with \A/\z and with
## every literal dot escaped.
FINISHED_FILE_RE = /\A\d{10,}\.[^.]+\.l8\.\d{8}\.warts(?:\.gz)?\z/

if $options.notify_collector
  unsent = Dir.entries($finished_dir).grep(FINISHED_FILE_RE).sort
  unless unsent.empty?
    ArkUtil::ArkProducer.new($options.collector, $monitor, $topic) do |producer|
      unsent.each do |filename|
        $log.info "sending completion notification for %s", filename
        data = {
          monitor: $monitor,
          activity: $activity,
          filename: filename
        }
        producer.produce(data)
      end
    end
  end
end

# Run a single measurement cycle.  The parent ark-v6-prober is responsible
# for invoking this worker again for subsequent cycles.
execute_measurements
