#!/usr/bin/ruby

#############################################################################
## Transfer files to or from Ark monitors using rsync.
##
## This reads monitor information from
## cvs:WIP/topology/archipelago/initialize-ts/monitors.yaml
##
## You should either have monitors.yaml in the current working directory,
## or have the environment variable ARKUTIL_SPEC_PATH set to the path
## of the monitors.yaml file.
##
## $Id: arksync,v 1.27 2017/11/28 22:48:37 youngh Exp $
#############################################################################

require 'rubygems'
require 'ostruct'
require 'optparse'
require 'monitor'

require 'arkutil/generalutils'
require 'arkutil/monitorutils'
require 'arkutil/management'
require "arkutil/rsync"

# Global option state shared by the rest of the script.  Only the options
# listed here get explicit defaults; the others (ssh_identity_file, test,
# debug, verbose) stay unset (nil) on the OpenStruct until given on the
# command line.
$options = OpenStruct.new
$options.force = false
$options.substitute = true
$options.parallel = true
$options.max_slow_attempts = 1
$options.max_fast_attempts = 3
$options.ssh_port = false

opts = OptionParser.new
opts.banner = "Usage: #{$0} [options] -- [rsync-options] <path1> <path2> ..."

# Let arkutil register its monitor-selection options on the same parser.
ArkUtil.configure_monitor_spec_option_parser $options, opts

# -f: limit both retry counters to a single attempt (applied after parsing,
# below, so it wins over --max-*-attempts regardless of argument order).
opts.on("-f", TrueClass,
        "don't do fast/slow retries (#{$options.force})") do |v|
  $options.force = true
end

opts.on("-i", String, "=FILE",
        "set the ssh '-i identity_file' option") do |v|
  $options.ssh_identity_file = v
end

opts.on("--[no-]substitute", TrueClass,
        "substitute variables %MON, %IP, %OS (true)") do |v|
  $options.substitute = v
end

opts.on("--[no-]parallel", TrueClass,
        "perform rsyncs in parallel (true)") do |v|
  $options.parallel = v
end

opts.on("--max-slow-attempts", Integer, "=NUM",
        "max 'slow' retry attempts (15=24 hours) " +
        "(#{$options.max_slow_attempts})") do |v|
  $options.max_slow_attempts = v
end

opts.on("--max-fast-attempts", Integer, "=NUM",
        "max 'fast' retry attempts (#{$options.max_fast_attempts})") do |v|
  $options.max_fast_attempts = v
end

# When enabled, the main loop adds "-p <port>" to the ssh command for
# monitors whose configured ssh_port is not 22.
opts.on("--[no-]ssh-port", TrueClass,
        "explicitly specify ssh port to use to rsync") do |v|
  $options.ssh_port = v
end

opts.on("--[no-]test", TrueClass,
        "only show, not execute, the final rsync command") do |v|
  $options.test = v
end

opts.on("--[no-]debug", TrueClass, "enable debugging output") do |v|
  $options.debug = v
end

opts.on("-v", "--[no-]verbose", TrueClass,  "show detailed progress") do |v|
  $options.verbose = v
end

# Parse the command line; whatever the parser does not consume (the rsync
# options and paths) is left in ARGV for the main loop.  On a parse error,
# print the message plus usage to stderr and exit with status 1.
begin
  ARGV.replace opts.parse(*ARGV)
rescue OptionParser::ParseError
  $stderr.puts "ERROR: " + $!.to_s
  $stderr.puts opts
  exit 1
end

ArkUtil.validate_monitor_spec_options $options

# -f overrides any retry limits given on the command line.
if $options.force
  $options.max_slow_attempts = 1
  $options.max_fast_attempts = 1
end

$stdout.sync = true  # for progress output

#============================================================================

# Bookkeeping for the outcome of each monitor's transfer.
#
# The add_* methods and print_progress run under the MonitorMixin lock.
# success? and print read the tables without taking the lock.
class TransferStats

  include MonitorMixin

  def initialize
    super   # NOTE: required so that MonitorMixin sets up its lock
    @skipped = []         # [ monitor ]
    @active = {}          # monitor => true
    @success = {}         # monitor => [ msg, ... ]
    @failure = {}         # monitor => [ result, stderr ]
    @last_progress = nil  # [active, success, failure] counts last printed
  end

  # Records a monitor as skipped.
  def add_skipped(monitor)
    synchronize { @skipped.push(monitor) }
  end

  # Marks a monitor's transfer as in progress.
  def add_active(monitor)
    synchronize { @active.store(monitor, true) }
  end

  # Moves a monitor from active to succeeded, keeping the summary
  # messages given in +stats+.
  def add_success(monitor, *stats)
    synchronize do
      @active.delete(monitor)
      @success.store(monitor, stats)
    end
  end

  # Moves a monitor from active to failed, keeping the result and the
  # stderr text.
  def add_failure(monitor, result, stderr)
    synchronize do
      @active.delete(monitor)
      @failure.store(monitor, [result, stderr])
    end
  end

  # True when no failure has been recorded.
  def success?
    @failure.size == 0
  end

  # Prints the active/finished/failed monitor lists, but only if the
  # counts differ from the last time they were printed.  Returns the
  # number of active transfers.
  def print_progress
    synchronize do
      counts = [@active, @success, @failure].map { |table| table.size }
      if counts != @last_progress
        @last_progress = counts
        [[">> %d active: %s\n", @active],
         [">> %d finished: %s\n", @success],
         [">> %d failed: %s\n", @failure]].each do |fmt, table|
          printf fmt, table.size, table.keys.sort.join(",")
        end
      end

      @active.size
    end
  end

  # Prints the final report: per-monitor success lines, per-monitor
  # failure details, then overall totals and monitor lists.
  #
  # NOTE: this method is named +print+, so inside this class a bare
  # +print+ call would invoke it rather than Kernel#print.
  def print
    hrule = "=" * 70

    if !@success.empty?
      printf "\n%s\nSUCCESSES\n%s\n\n", hrule, hrule
      @success.sort.each do |monitor, lines|
        lines.each { |line| printf "%s: %s\n", monitor, line }
      end
    end

    if !@failure.empty?
      printf "\n%s\nFAILURES\n", hrule
      @failure.sort.each do |monitor, (result, stderr)|
        printf "%s\n%s: transfer failed: %p\n\n%s\n", hrule,
          monitor, result, stderr
      end
    end

    puts
    puts "." * 70

    succeeded = ArkUtil.count_with_unit(@success.size, "transfer", "s")
    skipped = ArkUtil.count_with_unit(@skipped.length, "monitor", "s")
    printf "\n%s succeeded, %d failed, %s skipped.\n",
      succeeded, @failure.size, skipped

    printf ">> succeeded: %s\n", @success.keys.sort.join(",")
    printf ">> failed: %s\n", @failure.keys.sort.join(",")
    printf ">> skipped: %s\n", @skipped.join(",")
  end

end

# Script-wide transfer bookkeeping, updated by execute() and the main loop.
$stats = TransferStats.new


#============================================================================

# Delegate object handed to ArkUtil::RsyncState (see execute).  Each callback
# prints a one-line progress message prefixed with state.id and remembers
# the figures needed later by summarize_success.
#
# NOTE(review): when and how often each callback fires is decided by
# ArkUtil::RsyncState / ArkUtil::Rsync, which are not part of this file.
class MinimalRsyncStateDelegate

  attr_reader :xfer, :sent_bytes, :received_bytes, :rate, :size, :speedup

  def initialize
    @xfer = 1
    @check_left = 0
    @check_total = 0
    @transfer_summary = nil
    @total_summary = nil

    # Filled in by transfer_stats / total_stats.  Initialised explicitly so
    # the attr_readers never touch an undefined instance variable.
    @sent_bytes = nil
    @received_bytes = nil
    @rate = nil
    @size = nil
    @speedup = nil
  end

  # Announces the first slow attempt as the start of the rsync, and any
  # later one with its attempt count.
  def slow_attempt(state, attempts)
    if attempts == 1
      # Only one placeholder here: passing +attempts+ as well would trigger
      # "too many arguments for format string" (a warning under -w, an
      # ArgumentError under $DEBUG).
      printf "%s: rsync started\n", state.id
    else
      printf "%s: %d slow attempts\n", state.id, attempts
    end
  end

  # Reports fast attempts after the first one (or always, with --debug).
  def fast_attempt(state, attempts)
    if attempts > 1 || $options.debug
      printf "%s: %d fast attempts\n", state.id, attempts
    end
  end

  # Remembers the latest non-nil counters and prints state.progress.
  # +bytes+ and +percent+ are accepted but not used here.
  def progress(state, bytes, percent, xfer, check_left, check_total)
    @xfer = xfer if xfer
    @check_left = check_left if check_left
    @check_total = check_total if check_total
    printf "%s: %s\n", state.id, state.progress
  end

  # Stores the transfer figures and prints state.transfer_stats.
  def transfer_stats(state, sent_bytes, received_bytes, rate)
    @sent_bytes = sent_bytes
    @received_bytes = received_bytes
    @rate = rate
    printf "%s: %s\n", state.id, state.transfer_stats
  end

  # Stores the total size and speedup and prints state.total_stats.
  def total_stats(state, size, speedup)
    @size = size
    @speedup = speedup
    printf "%s: %s\n", state.id, state.total_stats
  end

  def current_file(state, name)
    printf "%s: ... %s\n", state.id, name
  end

  def unknown_line(state, line)
    printf "??? %s: %p\n", state.id, line
  end

  # Returns a one-line summary of the stored figures.  Any figure that was
  # never reported is shown as 0 instead of raising on nil, so a transfer
  # that succeeded is not turned into an "unexpected exception" failure
  # merely because a stats callback did not fire.
  def summarize_success
    sprintf ">%d, <%d, =%d @ %0.f Kb/s, %0.f speedup",
      @sent_bytes || 0, @received_bytes || 0, @size || 0,
      (@rate || 0) / 1000.0, @speedup || 0
  end

end


#============================================================================

# Builds an rsync options hash with short timeouts/delays and at most two
# slow and two fast attempts.  The only reference to it in this file is a
# commented-out call in execute.
def create_dev_rsync_options
  {
    # slow retries
    :initial_slow_timeout     => 10, # seconds
    :max_slow_timeout         => 20, # seconds
    :slow_timeout_multiplier  => 2,
    :max_slow_attempts        => 2,

    # connect delays
    :initial_connect_delay    => 5,  # seconds
    :max_connect_delay        => 10, # seconds
    :connect_delay_multiplier => 2,

    # fast retries
    :initial_fast_delay       => 4,  # seconds
    :max_fast_delay           => 8,  # seconds
    :fast_delay_multiplier    => 2,
    :max_fast_attempts        => 2
  }
end


# Runs one rsync +command+ for +monitor+ through ArkUtil::Rsync, records the
# outcome in $stats, and prints a "<monitor>: RESULT: ..." line.
#
# Any StandardError raised along the way is printed and recorded as a
# failure with the result "uncaught exception".
#
# NOTE(review): when rsync.execute returns false ("file(s) to transfer are
# missing"), nothing is recorded in $stats, so a monitor previously marked
# active stays active.  Confirm whether that is intended.
def execute(monitor, command)
  puts "CMD>> " + command if $options.verbose
  delegate = MinimalRsyncStateDelegate.new

  state_options = {}
  state_options[:debug] = true if $options.debug
  state = ArkUtil::RsyncState.new(monitor, delegate, state_options)

  # XXX a separate Rsync instance per execution is not really necessary
  rsync_options = {
    :max_slow_attempts => $options.max_slow_attempts,
    :max_fast_attempts => $options.max_fast_attempts
  }
  rsync_options[:debug] = true if $options.debug
  rsync = ArkUtil::Rsync.new(rsync_options)

  result = rsync.execute(command, state)
  outcome =
    case result
    when true
      summary = ArkUtil.count_with_unit(delegate.xfer, "file", "s") +
        "; " + delegate.summarize_success
      $stats.add_success(monitor, summary)
      "success"
    when false
      "file(s) to transfer are missing"
    else
      $stats.add_failure(monitor, result, state.stderr)
      "transfer failed"
    end

  printf "%s: RESULT: %s\n", monitor, outcome

rescue => error
  report = sprintf("arksync(%s): unexpected exception: %s: %s\n%s\n",
                   monitor, error.class.name, error.to_s,
                   error.backtrace.join(" <= "))
  printf "%s", report
  $stats.add_failure(monitor, "uncaught exception", report)
end


# Returns the binary-compatibility OS type string for a given OS version.
#
# version:: OS version string (the main loop passes a monitor's
#           "os_version" value), e.g. "freebsd6.2".
#
# Raises RuntimeError ("ERROR: unknown OS version ...") for any version that
# is not recognised.  This includes nil, which previously surfaced as a
# TypeError from the string concatenation rather than the intended message.
def self.os_type(version)
  case version
  when /freebsd[79]/ then "fbsd71"
  when "freebsd6.2", "freebsd6.4" then "fbsd62"
  when "freebsd4.6" then "fbsd46"
  when /linux/, /ubuntu/ then "linux"
  else
    fail "ERROR: unknown OS version #{version.nil? ? '(none)' : version}"
  end
end


#============================================================================
# MAIN
#============================================================================

# Build one rsync command per monitor and either print it (--test), run it
# inline, or run it in its own thread (--parallel, the default).

threads = []

# Quote the ssh identity file once, outside the per-monitor loop.  Quoting
# it inside the loop and writing it back to $options wrapped the path in
# another pair of quotes for every monitor after the first.
ssh_identity_file = $options.ssh_identity_file
if ssh_identity_file && ssh_identity_file.index(" ")
  ssh_identity_file = "\"" + ssh_identity_file + "\""
end

monitors = ArkUtil.process_monitor_spec_options $options
monitors.sort.each do |monitor, info|
  ArkUtil.print_banner monitor unless $options.parallel

  # Expand %MON / %OS / %IP in each remaining command-line argument (unless
  # --no-substitute) and double-quote arguments containing a space.
  argv = []
  ARGV.each do |arg|
    if $options.substitute
      arg = arg.gsub(/%MON/, monitor).
        gsub(/%OS/, os_type(info["os_version"])).
        gsub(/%IP/, info["ip_address"])
    end

    arg = "\"" + arg + "\"" if arg.index " "
    argv << arg
  end

  ssh_options = "-o \"BatchMode yes\""

  if $options.ssh_port && info["ssh_port"] != 22
    ssh_options += " -p " + info["ssh_port"].to_s
  end

  if ssh_identity_file
    ssh_options += " -i " + ssh_identity_file
  end

  command = "rsync -e 'ssh #{ssh_options}' -av --progress --timeout=120 " + argv.join(" ")

  if $options.test
    puts "TEST>> " + command
  else
    $stats.add_active monitor
    if $options.parallel
      # Distinct block parameter names avoid shadowing the loop's locals.
      threads << Thread.new(monitor, command) do |thr_monitor, thr_command|
        execute thr_monitor, thr_command
      end
    else
      execute monitor, command
    end
  end
end

if $options.parallel
  workers = threads.dup

  # Progress reporter: prints a status update once a second.  It stops when
  # no transfer is marked active, or when every worker thread has finished.
  # The second condition matters because a transfer can end without being
  # recorded as a success or failure (execute records nothing when rsync
  # reports missing files); waiting only for the active count to reach zero
  # would then loop forever.
  threads << Thread.new do
    loop do
      sleep 1
      active = $stats.print_progress()
      break if active == 0 || !workers.any? { |thr| thr.alive? }
    end
  end

  threads.each do |thr|
    thr.join
  end
end

$stats.print
exit($stats.success? ? 0 : 1)
