#!/usr/bin/ruby

#############################################################################
## Monitors and reports on successful data download by ark-collector.
##
## When ark-collector finishes downloading a file from a monitor, it writes
## a DOWNLOAD-STAT tuple into the collector tuple-space region.  This script
## does a take on those tuples and emails out daily summaries of successful
## downloads to ark-status@caida.org.  A separate email summary is sent out
## for each activity (e.g., team probing).
##
## (In the current code the download stats are read as JSON messages from
## the "ark-download-stat" topic through an rdkafka consumer.)
##
## Nov 2010 update:
##
##   sqlite3-ruby (1.3.2) on indy has an incompatible API or a bug
##   compared to sqlite3-ruby (1.2.5) on monsterzero.  If there are
##   multiple parameters to #execute or #get_first_value, then they
##   must be supplied as an array (if there's just one value, then
##   it can be provided as a direct value).
##
## --------------------------------------------------------------------------
## Copyright (C) 2008,2010 The Regents of the University of California.
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
## 
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
## 
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
##
## $Id: ark-download-reporter,v 1.40 2014/07/17 02:17:13 youngh Exp $
#############################################################################

# Fix the program name to a constant string.  The same value is reused
# further down as the fallback SASL username/password for the collector
# connection when the corresponding environment variables are unset.
$0 = "ark-download-reporter"

require 'rubygems'
require 'ostruct'
require 'optparse'
require 'socket'
require 'syslog'
require 'fileutils'
require 'thread'
require 'net/http'
require 'json'

require 'sqlite3'

require 'arkutil/fileutils'
require 'arkutil/processutils'
require 'arkutil/generalutils'
require 'arkutil/interactiveutils'

# Let an unhandled exception in any thread take down the whole process.
Thread.abort_on_exception = true

$hostname = Socket::gethostname

# Default settings; the command-line options below override them.
$options = OpenStruct.new
$options.email_address = "ark-status@caida.org"
$options.staging = "/staging"
$options.use_tuple_space = true   # consume download stats from the collector
$options.use_lock_file = true     # take the exclusive lock before running
$options.special_run = false      # set by the one-shot modes below
$options.collector = "127.0.0.1:9092"

opts = OptionParser.new
opts.on("--email-address", "=ADDRESS",
	"recipient email address (#{$options.email_address})") do |v|
  $options.email_address = v
end

opts.on("--staging", "=DIR",
	"staging dir for Ark data (#{$options.staging})") do |v|
  $options.staging = v
end

# One-shot mode: import stats from a file instead of the collector.
opts.on("--test-input", "=FILE",
        "process download stats from test input file") do |v|
  $options.test_input = v
  $options.special_run = true
end

# One-shot mode: print the report instead of emailing it; runs without
# taking the lock file.
opts.on("--report-downloads", TrueClass,
        "print out a daily report of all downloads") do |v|
  $options.report_downloads = v
  $options.use_lock_file = false
  $options.special_run = true
end

# One-shot mode: interactively add a row to the inactive_monitor table;
# runs without taking the lock file.
opts.on("--add-inactive", TrueClass,
        "interactively register an inactive monitor") do |v|
  $options.add_inactive = v
  $options.use_lock_file = false
  $options.special_run = true
end

opts.on("-d", "--[no-]detach", TrueClass,
	"run as detached (daemon) process") do |v|
  $options.detach = v
end

opts.on("--collector", "=ADDR", String,
    "collector server to fetch data from (#{$options.collector})") do |v|
  $options.collector = v
end

opts.on("-v", "--[no-]verbose", TrueClass,  "show detailed progress") do |v|
  $options.verbose = v
end

# Parse the command line, leaving any non-option arguments in ARGV.  On a
# parse error, print the error and the usage summary, then exit with status 1.
begin
  ARGV.replace opts.parse(*ARGV)
rescue OptionParser::ParseError
  $stderr.puts "ERROR: " + $!.to_s
  $stderr.puts opts
  exit 1
end

#===========================================================================

# Resolve the staging directory to an absolute path and verify that it
# exists before deriving any other path from it.
original_staging = $options.staging
$options.staging = File.expand_path $options.staging

unless File.directory? $options.staging
  $stderr.puts "ERROR: Ark staging directory '#{$options.staging}' " +
    ($options.staging == original_staging ? "" :
     "(expansion of '#{original_staging}') ") +
    "doesn't exist or is not a directory"
  exit 1
end

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# All working files (state database, lock file) live in a hidden
# subdirectory of the staging directory, created on first use.
$options.prep_dir = $options.staging + "/.download-stats"

$options.state_db = $options.prep_dir + "/state.db"
$options.lock_file = $options.prep_dir + "/download-reporter.LOCK"

FileUtils.mkpath $options.prep_dir unless File.directory? $options.prep_dir

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# The one-shot modes never talk to the collector and never detach.
if $options.special_run
  if $options.test_input
    $stderr.puts "Entering test mode: only processing download stats in test file\n"
  elsif $options.report_downloads
    $stderr.puts "Entering test mode: only reporting downloads\n"
  elsif $options.add_inactive
    $stderr.puts "Entering interactive mode to register an inactive monitor\n"
  end

  $stderr.puts "Ignoring --detach option\n" if $options.detach
  $options.use_tuple_space = false
  $options.detach = false
end


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Writes an exception to syslog at error level, including its backtrace
# (frames joined with " <= ").
#
# Returns the "ClassName: message" summary string that was logged.
def log_exception(exn)
  summary = exn.class.name + ": #{exn}"
  trace = exn.backtrace.join(" <= ")
  $log.err("aborting on exception: %s; backtrace: %s", summary, trace)
  summary
end

#===========================================================================

# Open syslog under the program name, tagging each entry with the PID.
# LOG_PERROR is added only when not detaching.
syslog_options = Syslog::LOG_PID
syslog_options |= Syslog::LOG_PERROR unless $options.detach
$log = Syslog.open("ark-download-reporter", syslog_options, Syslog::LOG_LOCAL0)
# In verbose mode let everything up to and including debug-level through.
Syslog.mask = Syslog::LOG_UPTO(Syslog::LOG_DEBUG) if $options.verbose

# SIGTERM and SIGINT exit immediately with status 2.
Signal.trap("TERM") do
  $log.info "exiting on SIGTERM"
  exit 2
end

Signal.trap("INT")  do
  $log.info "exiting on SIGINT"
  exit 2
end

# See the comments at process_download_stats_from_tuple_space() for
# the SIGHUP exiting policy and procedure.

# Set by the SIGHUP handler and polled by the collector-reading thread,
# which then schedules the actual exit.
$exit_requested = false

Signal.trap("HUP") do
  $log.info "got SIGHUP; will exit at next opportunity"
  $exit_requested = true
end


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Despite the historical option name, the "tuple space" is reached through
# an rdkafka consumer subscribed to the "ark-download-stat" topic.  rdkafka
# is required lazily so that the one-shot modes do not need it.
if $options.use_tuple_space
  require 'rdkafka'

  collector_config = {
      :"bootstrap.servers" => $options.collector,
      :"client.id" => "ark-download-reporter",
      :"group.id" => "ark-download-reporter",
      # Offsets are stored and committed explicitly by
      # process_download_stats_from_tuple_space() via store_offset/commit,
      # so both automatic mechanisms are switched off.
      :"enable.auto.commit" => false,
      :"enable.auto.offset.store" => false,
      :"auto.offset.reset" => "earliest",
      :"sasl.mechanism" => "SCRAM-SHA-256",
      # Credentials come from the environment; the program name is the
      # fallback for both username and password.
      :"sasl.username" => (ENV['ARK_DOWNLOAD_REPORTER_USERNAME'] || $0),
      :"sasl.password" => (ENV['ARK_DOWNLOAD_REPORTER_PASSWORD'] || $0),
      :"security.protocol" => "SASL_PLAINTEXT",
  }

  $g_collector = Rdkafka::Config.new(collector_config).consumer
  $g_collector.subscribe("ark-download-stat")
end

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Detach into the background when requested (never in the one-shot modes,
# which force $options.detach to false earlier).
if $options.detach
  $stderr.puts "starting download reporter process in background\n"
  ArkUtil.daemonize()
  $log.info "started download reporter process in background"
end


#===========================================================================
#===========================================================================

# Opens the SQLite state database at $options.state_db, creating the schema
# when the database file did not exist beforehand.
#
# With a block: yields the database handle, closes it when the block
# finishes (even on an exception), and returns the block's value.
# Without a block: returns the open handle; the caller must close it.
#
# Returns nil (after logging an error) if no handle could be obtained.
def open_download_stats_database(&block)
  # File.exists? was removed in Ruby 3.2; File.exist? works on all versions.
  file_existed = File.exist? $options.state_db

  retval = SQLite3::Database.new $options.state_db
  unless retval
    $log.err "ERROR: couldn't open/create state database file '%s'",
      $options.state_db
    return nil
  end

  #retval.type_translation = true

  # The schema is only installed for a database file that is brand new.
  unless file_existed
    retval.execute_batch <<SCHEMA_SQL_EOF
CREATE TABLE download_log
(
  timestamp INTEGER NOT NULL,
  monitor   TEXT NOT NULL,
  activity  TEXT NOT NULL,
  file      TEXT NOT NULL,
  size      INTEGER NOT NULL
);

CREATE TABLE last_download_date
(
  monitor   TEXT NOT NULL,
  activity  TEXT NOT NULL,
  timestamp INTEGER NOT NULL,

  PRIMARY KEY (monitor, activity)
);

CREATE TABLE inactive_monitor
(
  monitor   TEXT NOT NULL,
  activity  TEXT NOT NULL,
  timestamp INTEGER NOT NULL,
  reason    TEXT NOT NULL,

  PRIMARY KEY (monitor, activity)
);

CREATE TABLE last_processed_date
(
  activity  TEXT NOT NULL PRIMARY KEY,
  timestamp INTEGER NOT NULL
);

SCHEMA_SQL_EOF
  end

  if block
    begin
      yield retval
    ensure
      retval.close
    end
  else
    retval
  end
end


# Test mode: loads download stats from the file named by
# $options.test_input and then generates the daily reports as of the
# beginning of today (UTC).
#
# Each non-blank line holds six whitespace-separated fields; the first is
# ignored and the rest are: timestamp, monitor, activity, file, size.
# Blank lines are skipped.
def import_test_input(db)
  IO.foreach($options.test_input) do |line|
    fields = line.split
    next if fields.empty?  # tolerate blank lines (e.g., at end of file)

    _tag, timestamp, monitor, activity, file, size = fields
    insert_download_stat_txn db, timestamp.to_i, monitor, activity, file,
      size.to_i
  end

  #save_last_download_dates(db, "team-probing" => { "nrt-jp" => 1196974104, "dub-ie" => 1196974136 })
  #save_last_download_dates(db, "team-probing" => { "nrt-jp" => 1196974104 })

  generate_daily_reports db, ArkUtil.beginning_of_today_utc()
end


# Interactive mode: prompts for an activity, monitor, start date, and
# reason, shows a preview, and repeats until the user confirms.  The
# confirmed entry is then stored in (or replaces a row of) the
# inactive_monitor table.
def add_inactive_monitor(db)
  end_timestamp = ArkUtil.beginning_of_today_utc()
  puts "To abort, hit ^C."

  monitor = activity = timestamp = reason = nil
  loop do
    activity = ArkUtil.prompt_and_get_line(
      "activity (hit enter for 'team-probing'):", true)
    activity = "team-probing" if activity == ""

    monitor = ArkUtil.prompt_and_get_line "monitor (e.g., san-us):"
    timestamp = ArkUtil.prompt_and_get_timestamp(
      "UTC date (YYYYMMDD) or timestamp:")
    reason = ArkUtil.prompt_and_get_paragraph "inactive reason:", 61

    # Preview in the same layout the daily report uses for inactive monitors.
    puts "\n\nPreview: #{activity}"
    date = ArkUtil.utc_date_string timestamp
    period = ArkUtil.human_time_period(end_timestamp - timestamp)
    reason_indented = "         " + reason.gsub(/\n(\s*\S)/, "\n         \\1")
    printf("  %s for %s, since %s:\n\n%s\n\n",
           monitor, period, date, reason_indented)

    puts "\n"
    break if ArkUtil.prompt_and_get_line(
      "Is this correct (yes/no)?", false, true)
    puts "\nPlease try again."
  end

  # Multiple bind parameters must be supplied as a single array (see the
  # Nov 2010 note in the file header); newer sqlite3 releases reject the
  # varargs form outright.
  db.execute "INSERT OR REPLACE INTO inactive_monitor VALUES(?, ?, ?, ?)",
    [monitor, activity, timestamp, reason]
  puts "Database updated."
end


# NOTE: We must be careful to exit cleanly and safely on SIGHUP.
#
#       We should never exit while in the midst of generating a daily report,
#       but we *can* exit between reports, even if a report is scheduled to
#       be generated (by having a lambda enqueued in {queue}).
#
#       Most importantly, we should *never* exit in the period between
#       taking a DOWNLOAD-STAT message from the collector and writing out
#       a log entry for it in the database.  Otherwise, we would lose
#       a download stat forever.
#
#       The exit procedure implemented here is conservative, in that we could
#       theoretically exit faster, but at a higher cost in complexity and
#       fragility.  The procedure below only exits after all DOWNLOAD-STAT
#       messages that were already received have been logged.
#
# Structure: two producer threads push lambdas onto {queue}, and the calling
# thread runs them one at a time in the order they were enqueued.
#
#   * the collector thread polls $g_collector and enqueues one lambda per
#     message (insert into the database, then commit the offset);
#   * the timer thread enqueues a daily-report lambda at 2:15AM UTC.
#
# This method does not return normally; the process leaves through the
# exit lambda enqueued after SIGHUP (or through the other signal handlers).
#
# NOTE(review): an exception in either producer thread is logged and the
# thread then ends, while the main loop keeps waiting on the queue --
# confirm that this is the intended failure behaviour.
def process_download_stats_from_tuple_space(db)
  queue = Queue.new

  Thread.new do
    begin
      loop do
        if $exit_requested
          queue << lambda do
            $log.info "exiting on SIGHUP"
            exit 1
          end
          break #quit thread to ensure no more download tasks are ever enqueued
        end

        # Poll with a short timeout rather than blocking indefinitely, so
        # that $exit_requested is re-checked regularly while idle.
        msg = $g_collector.poll(250)
        next unless msg

        begin
          data = JSON.parse(msg.payload)
          # Valid JSON that is not an object (a number, an array, ...) can't
          # carry the expected fields; treat it like any other bad message.
          raise TypeError, "payload is not a JSON object" unless data.is_a?(Hash)
        rescue JSON::ParserError, TypeError
          # TODO send to a dead message queue for later evaluation?
          # commit the message like any other, but do no other work
          #
          # The message text must be passed as an argument, never as the
          # format string itself: a '%' in the payload would otherwise
          # raise here and end this thread.
          $log.info "failed to parse message %s", msg.to_s
          queue << lambda do
            $g_collector.store_offset(msg)
            $g_collector.commit
          end
          next
        end

        queue << lambda do
          insert_download_stat_txn(
            db,
            data["timestamp"],
            data["monitor"],
            data["activity"],
            data["file"],
            data["size"]
          )
          # commit the offset after the lambda has inserted the data
          $g_collector.store_offset(msg)
          $g_collector.commit
        end
      end
    rescue
      log_exception $!
    end
  end

  Thread.new do
    begin
      loop do
        # 2:15AM today or the next day UTC
        now = Time.now.to_i
        today = ArkUtil.beginning_of_today_utc()
        wakeup_time = today + 2 * 3600 + 15 * 60
        wakeup_time += 24 * 3600 if now >= wakeup_time
        sleep_amount = wakeup_time - now
        $log.debug "sleeping %d seconds till %s", sleep_amount,
          Time.at(wakeup_time).to_s

        ArkUtil.sleep_at_least sleep_amount
        queue << lambda do
          generate_daily_reports db, ArkUtil.beginning_of_today_utc()
        end
      end
    rescue
      log_exception $!
    end
  end

  # Main event loop: all database work happens here, one task at a time.
  loop do
    queue.pop.call
  end
end


# Records one download stat inside a single database transaction:
# appends a row to download_log and, if this download is newer than the
# recorded one (or none is recorded), updates last_download_date for the
# (monitor, activity) pair.
#
# timestamp and size are expected to be integers.
def insert_download_stat_txn(db, timestamp, monitor, activity, file, size)
  db.transaction do |txn|
    # Multiple bind parameters are passed as one array (see the Nov 2010
    # note in the file header); newer sqlite3 releases reject varargs.
    txn.execute "INSERT INTO download_log VALUES(?, ?, ?, ?, ?)",
      [timestamp, monitor, activity, file, size]

    last_timestamp = txn.get_first_value(
      "SELECT timestamp FROM last_download_date WHERE monitor = ? AND activity = ?",
      [monitor, activity])

    # We *should* be processing timestamps in non-decreasing order, but
    # do the check here just to make sure as a precautionary measure.
    # to_i guards against a driver that hands the value back as a string,
    # matching what load_last_download_dates does.
    if last_timestamp.nil? || timestamp > last_timestamp.to_i
      txn.execute "INSERT OR REPLACE INTO last_download_date VALUES(?, ?, ?)",
        [monitor, activity, timestamp]
    end
  end
end


# Builds one report per activity from the download_log entries older than
# end_timestamp.  Each report has up to three sections:
#
#   MISSING DATA     monitors with a recorded last download but no download
#                    in this batch (see the exclusions below)
#   DAILY DOWNLOADS  bytes downloaded per monitor in this batch
#   INACTIVE         rows of the inactive_monitor table for the activity
#
# With --report-downloads the report is only printed.  Otherwise it is
# emailed and the reported download_log entries for the activity are
# deleted.
def generate_daily_reports(db, end_timestamp)
  # activity => monitor => bytes total
  daily_summary = Hash.new { |h,k| h[k] = Hash.new 0 }

  db.execute("SELECT * FROM download_log WHERE timestamp < ?",
             end_timestamp) do |row|
    _timestamp, monitor, activity, _file, size = row
    daily_summary[activity][monitor] += size
  end

  p daily_summary if $options.verbose

  last_download_dates = load_last_download_dates db
  inactive_monitors = load_inactive_monitors db

  begin_timestamp = end_timestamp - 24 * 3600
  begin_date = ArkUtil.utc_date_string begin_timestamp

  # Monitor metadata decides which monitors are expected to deliver data.
  # If it can't be fetched, every monitor counts as disabled and the
  # MISSING DATA section stays empty -- so at least leave a trace in the log.
  begin
    uri = URI("https://api.arkmon.caida.org/public/monitors")
    metadata = JSON.parse(Net::HTTP.get(uri))
  rescue
    $log.warning "couldn't fetch monitor metadata: %s: %s",
      $!.class.name, $!.to_s
    metadata = []
  end

  # Anything other than a list of monitors (e.g., an error object) is
  # unusable for the lookups below.
  unless metadata.is_a?(Array)
    $log.warning "unexpected monitor metadata of type %s; ignoring",
      metadata.class.name
    metadata = []
  end

  (daily_summary.keys + last_download_dates.keys).uniq.sort.each do |activity|
    report = []  # text of the report email body
    report << "========== #{begin_date} ==========\n\n"

    missing_data = {}  # monitor => timestamp
    last_download_dates[activity].each do |monitor, timestamp|
      # exclude monitors that have data, or last reported more than 28 days ago,
      # or are explicitly dead, or have the activity disabled
      unless (daily_summary[activity].has_key? monitor) ||
             (end_timestamp - timestamp > 28 * 24 * 60 * 60) ||
             (!monitor_enabled?(metadata, monitor, activity))
        missing_data[monitor] = timestamp
      end
    end

    p missing_data if $options.verbose

    unless missing_data.empty?
      missing_count = ArkUtil.count_with_unit missing_data.size, "monitor"
      report << "MISSING DATA: #{missing_count}\n\n"
      # most recently seen monitors first
      missing_data.to_a.sort_by {|x| x[1]}.reverse.each do |monitor, timestamp|
        date = Time.at(timestamp).to_s
        period = ArkUtil.human_time_period(end_timestamp - timestamp)
        report << "  #{monitor} for #{period} (since #{date})\n"
      end
      report << "\n"
    end

    daily_count =
      ArkUtil.count_with_unit daily_summary[activity].size, "monitor"
    report << "DAILY DOWNLOADS: #{daily_count}\n\n"
    # smallest download totals first
    daily_summary[activity].to_a.sort_by { |x| x[1] }.each do |monitor, bytes|
      last_timestamp = last_download_dates[activity][monitor]
      date = Time.at(last_timestamp).to_s
      period = ArkUtil.human_time_period(end_timestamp - last_timestamp)
      megabytes = bytes / 1_048_576.0
      report << sprintf("  %s: %6.1fMB  (%s ago on %s)\n",
                        monitor, megabytes, period, date)
    end

    # activity => monitor => { :timestamp, :reason }
    inactive_count =
      ArkUtil.count_with_unit inactive_monitors[activity].size, "monitor"
    if inactive_monitors[activity].size > 0
      report << "\nINACTIVE: #{inactive_count}\n\n"
      # most recently deactivated monitors first
      inactive_monitors[activity].to_a.sort { |x, y|
        y[1][:timestamp] <=> x[1][:timestamp] }.each do |monitor, info|
        date = ArkUtil.utc_date_string info[:timestamp]
        period = ArkUtil.human_time_period(end_timestamp - info[:timestamp])
        reason = "         " + info[:reason].gsub(/\n(\s*\S)/, "\n         \\1")
        report << sprintf("  %s for %s, since %s:\n\n%s\n\n",
                          monitor, period, date, reason)
      end
    end

    puts report.join("") if $options.verbose || $options.report_downloads

    unless $options.report_downloads
      send_email activity, report.join("")
      clear_download_log db, activity, end_timestamp
    end
  end
end

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Returns, as an integer, how many download_log rows have a timestamp
# earlier than end_timestamp.
def count_download_log_entries(db, end_timestamp)
  sql = "SELECT count(*) FROM download_log WHERE timestamp < ? "
  pending = db.get_first_value(sql, end_timestamp)
  pending.to_i
end


# In one transaction, deletes the download_log rows of the given activity
# that are older than end_timestamp and records end_timestamp as the
# activity's last processed date.
def clear_download_log(db, activity, end_timestamp)
  db.transaction do |txn|
    # Multiple bind parameters are passed as one array (see the Nov 2010
    # note in the file header); newer sqlite3 releases reject varargs.
    txn.execute "DELETE FROM download_log WHERE activity = ? AND timestamp < ?",
      [activity, end_timestamp]

    txn.execute "INSERT OR REPLACE INTO last_processed_date VALUES(?, ?)",
      [activity, end_timestamp]
  end
end


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Reads the whole last_download_date table.
#
# Returns a nested hash, activity => monitor => timestamp (integer).
# Looking up an unknown activity yields (and stores) an empty hash.
def load_last_download_dates(db)
  dates = Hash.new { |by_activity, name| by_activity[name] = {} }
  db.execute("SELECT * FROM last_download_date") do |(monitor, activity, timestamp)|
    dates[activity][monitor] = timestamp.to_i
  end
  dates
end


# Stores (inserting or replacing) every entry of dates in the
# last_download_date table through a single prepared statement.
#
# dates: activity => monitor => timestamp
def save_last_download_dates(db, dates)
  sql = "INSERT OR REPLACE INTO last_download_date VALUES(?, ?, ?)"
  db.prepare(sql) do |stmt|
    dates.each_pair do |activity, by_monitor|
      by_monitor.each_pair do |monitor, timestamp|
        stmt.execute(monitor, activity, timestamp)
      end
    end
  end
end


# Stores (inserting or replacing) the last download timestamp of one
# (monitor, activity) pair.
def update_last_download_date(db, monitor, activity, timestamp)
  # Multiple bind parameters are passed as one array (see the Nov 2010
  # note in the file header); newer sqlite3 releases reject varargs.
  db.execute "INSERT OR REPLACE INTO last_download_date VALUES(?, ?, ?)",
    [monitor, activity, timestamp]
end


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Reads the whole last_processed_date table.
#
# Returns a hash, activity => timestamp (integer).
def load_last_processed_dates(db)
  processed = {}
  db.execute("SELECT * FROM last_processed_date") do |(activity, timestamp)|
    processed[activity] = timestamp.to_i
  end
  processed
end


# Stores (inserting or replacing) every entry of dates in the
# last_processed_date table through a single prepared statement.
#
# dates: activity => timestamp
def save_last_processed_dates(db, dates)
  sql = "INSERT OR REPLACE INTO last_processed_date VALUES(?, ?)"
  db.prepare(sql) do |stmt|
    dates.each_pair { |activity, timestamp| stmt.execute(activity, timestamp) }
  end
end


# Stores (inserting or replacing) the last processed timestamp of one
# activity.
def update_last_processed_date(db, activity, timestamp)
  # Multiple bind parameters are passed as one array (see the Nov 2010
  # note in the file header); newer sqlite3 releases reject varargs.
  db.execute "INSERT OR REPLACE INTO last_processed_date VALUES(?, ?)",
    [activity, timestamp]
end


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Reads the whole inactive_monitor table.
#
# Returns a nested hash:
#   activity => monitor => { :timestamp => integer, :reason => text }
# Unknown activities and monitors yield (and store) empty hashes.
def load_inactive_monitors(db)
  inactive = Hash.new do |by_activity, activity|
    by_activity[activity] = Hash.new do |by_monitor, monitor|
      by_monitor[monitor] = {}
    end
  end

  db.execute("SELECT * FROM inactive_monitor") do |(monitor, activity, timestamp, reason)|
    entry = inactive[activity][monitor]
    entry[:timestamp] = timestamp.to_i
    entry[:reason] = reason
  end

  inactive
end

# rather than the separate "inactive" database table, use the current state
# from the dory database to determine if a node should be considered missing
#
# metadata: list of monitor records (hashes with "node", "status", and
#           "activities" keys) as returned by the public monitors endpoint
# monitor:  monitor name (e.g., "san-us")
# activity: activity name as used by this script (e.g., "team-probing")
#
# Returns true only if the monitor is listed, isn't "Dead", and has the
# activity enabled; false in every other case, including unusable metadata.
def monitor_enabled?(metadata, monitor, activity)
  # dory has different ideas about what activities are called, so translate
  activity_api_names = {
    "prefix-probing" => "IPv4 prefix probing",
    "team-probing" => "IPv4 team probing",
    "topo-v6" => "IPv6 probing",
  }

  # assume disabled if we don't know anything about the activity
  api_activity = activity_api_names[activity]
  return false unless api_activity

  # metadata that isn't a list (nil, or an error object from the API)
  # tells us nothing, so assume disabled rather than blowing up
  return false unless metadata.is_a?(Array)

  # the public monitors endpoint only returns "Active" monitors by default,
  # so assume disabled if we can't find the node
  info = metadata.find { |m| m.is_a?(Hash) && m["node"] == monitor }
  return false unless info

  # otherwise a node is enabled if it isn't dead, and has the activity enabled
  info["status"] != "Dead" && !!(info["activities"]&.include?(api_activity))
end


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# Emails one activity's report by piping body into the mail command.
# The subject is "<activity> daily downloads"; sender and recipient are
# both $options.email_address.
#
# Failures (non-zero exit status or an exception) are logged, not raised.
def send_email(activity, body)
  begin
    # Works on FreeBSD:
    # command = "mail -s '#{activity} daily downloads'" +
    #  " #{$options.email_address} -F 'Ark Download Reporter'" +
    #  " -f #{$options.email_address}"

    # Works on Linux:
    #
    # The command is given as an argument vector so that no shell is
    # involved: activity names originate from collector messages and must
    # not be able to inject shell syntax through a stray quote.  The
    # address is split on whitespace to keep accepting several
    # space-separated recipients, as the shell form did.
    recipients = $options.email_address.split
    command = ["mail", "-s", "#{activity} daily downloads",
               "-a", "From: #{$options.email_address}", *recipients]
    IO.popen(command, "w") do |pipe|
      pipe.puts body
    end

    # $? is set once the pipe has been closed at the end of the block.
    unless $?.success?
      $log.err "ERROR: couldn't email report: command failed (%p): %s",
        $?, command.join(" ")
    end

  rescue
    msg = $!.class.name + ": " + $!.to_s
    $log.err "ERROR: couldn't email report: got exception: %s; backtrace: %s",
      msg, $!.backtrace.join(" <= ")
  end
end



#############################################################################
# MAIN
#############################################################################

# Take the exclusive lock unless this is one of the modes that run without
# it (--report-downloads and --add-inactive clear use_lock_file).
if $options.use_lock_file
  lock = ArkUtil.obtain_exclusive_lock $options.lock_file
  unless lock
    $log.err "ERROR: couldn't obtain lock on '%s'", $options.lock_file
    exit 1
  end
end

# Dispatch on the run mode; the database handle is closed when the block
# finishes.
open_download_stats_database do |db|
  if $options.test_input
    import_test_input db
  elsif $options.report_downloads
    # Use the end of today so that today's downloads are included.
    end_timestamp = ArkUtil.beginning_of_today_utc() + 24*3600
    generate_daily_reports db, end_timestamp
  elsif $options.add_inactive
    add_inactive_monitor db
  else
    # Normal operation: first catch up on entries from before today that
    # are still in the log, then process new download stats.
    end_timestamp = ArkUtil.beginning_of_today_utc()
    if count_download_log_entries(db, end_timestamp) > 0
      generate_daily_reports db, end_timestamp
    end
    process_download_stats_from_tuple_space db
  end
end

# NOTE(review): in normal operation process_download_stats_from_tuple_space
# loops forever and the process leaves via exit, so this release is
# presumably only reached in --test-input mode -- confirm.
if $options.use_lock_file
  ArkUtil.release_exclusive_lock lock
end
