#!/usr/bin/env python3
"""
ark-data-lint - Sanity-check CAIDA Ark warts files.

Per-VP aggregation across all provided warts files.  Flags VPs whose:
  - TTL rewrite rate exceeds --ttl-rewrite-threshold  (TTL)
  - response rate falls below --response-threshold    (RESP)
  - fully-silent trace fraction exceeds --empty-threshold (EMPTY)
  - >= --silent-min leading hops are silent in >= --silent-threshold
    of intermediate-responding traces                 (SILENT)
  - one or more files could not be read               (ERR)

A VP can carry any combination of these flags.  See the legend
appended to each report for column meanings.

Run with --help for the full option list.

Usage:
    python3 ark-data-lint [options] <file> [<file> ...]
    python3 ark-data-lint [options] --ipv4-dir PATH --ipv6-dir PATH
"""

import argparse
import enum
import glob
import os
import subprocess
import sys
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import date, timedelta
from typing import NamedTuple

try:
    from scamper import ScamperFile, ScamperTrace, ScamperTraceStop
except ImportError:
    print(
        "ERROR: scamper Python module not found.\n"
        "Build from https://www.caida.org/catalog/software/scamper/",
        file=sys.stderr,
    )
    sys.exit(1)


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Filename suffixes accepted as warts input; matched with
# str.endswith() in is_warts_file().
WARTS_EXTENSIONS = (".warts", ".warts.gz")

# Leading dot-component that identifies the IPv6 prefix-probing
# filename format; used to distinguish it from the IPv4 team-probing
# format when parsing filenames.
IPV6_FILENAME_PREFIX = "topo-v6"

# Stop reasons that indicate the destination produced a terminal
# reply.  Completed = probe reached destination; Unreach = ICMP
# unreachable from destination; Icmp = other ICMP error (typically
# from destination or very near it).  All other stop reasons
# (GapLimit, Loop, HopLimit, NoReason, GSS, Halted, Error,
# InProgress) indicate no response from the destination.
# check_trace() tests trace.stop_reason for membership in this set to
# decide whether a trace counts toward RESP%.
RESPONDING_STOP_REASONS = frozenset({
    ScamperTraceStop.Completed,
    ScamperTraceStop.Unreach,
    ScamperTraceStop.Icmp,
})

# Flag labels shown in the report and counted in the summary.
# Centralized so the help text, vp_flags(), and summary stay in sync
# if new checks are added later.  vp_flags() emits them in the order
# TTL, RESP, EMPTY, SILENT, ERR.
FLAG_TTL = "TTL"
FLAG_RESP = "RESP"
FLAG_EMPTY = "EMPTY"
FLAG_SILENT = "SILENT"
FLAG_ERR = "ERR"

def build_legend(silent_threshold: float) -> str:
    """Return the column legend that is appended to a report.

    `silent_threshold` is a fraction; it is rendered as a whole-number
    percentage inside the SILENT entry.  The result is a multi-line
    string without a trailing newline.
    """
    percent = f"{silent_threshold * 100:.0f}%"
    # One list element per output line; long lines are split with
    # implicit string concatenation only to keep the source narrow.
    rows = [
        "  TRACES     total traces aggregated for this VP",
        "  TTL%       fraction of TTL-eligible traces with a"
        " quoted-TTL rewrite",
        "  RESP%      fraction of traces where the destination"
        " replied",
        "  EMPTY%     fraction of traces with no responding hop at"
        " all",
        "  FIRSTHOP   configured starting TTL for this VP's probing"
        " (comma-",
        "             separated if it differed across files)",
        "  SILENT     number of leading hops, counting from"
        " FIRSTHOP, that are",
        f"             silent in at least {percent} of traces",
        "  ERR        flag: one or more of the VP's files could not"
        " be read",
        "             (count shown next to the flags)",
    ]
    return "\n".join(rows)


# ---------------------------------------------------------------------------
# Trace classification
# ---------------------------------------------------------------------------


class TtlCheck(enum.Enum):
    """Outcome of the TTL rewrite check on a single trace.

    _classify_ttl() assigns exactly one member per trace.
    process_file() counts REWRITTEN and NO_TERMINAL_HOP explicitly;
    NO_REWRITE traces are the remaining TTL-eligible ones.
    """

    REWRITTEN = "rewritten"
    # A terminal ICMP error was found, and its quoted TTL exceeded
    # the probe TTL by more than the tolerance.

    NO_REWRITE = "no_rewrite"
    # A terminal ICMP error was found; its quoted TTL was within
    # tolerance.

    NO_TERMINAL_HOP = "no_terminal_hop"
    # No terminal ICMP error hop was found in this trace, so the
    # TTL check cannot be applied.  This is independent of whether
    # the destination responded: e.g. a stop_reason=Completed trace
    # typically has no terminal ICMP error hop but did reach the
    # destination.


class TraceResult(NamedTuple):
    """Independent per-trace signals extracted in a single pass.

    Built by check_trace() and consumed by process_file(), which folds
    each field into the per-VP VpStats counters.
    """
    # Verdict of the quoted-TTL rewrite check for this trace.
    ttl: TtlCheck
    # True when trace.stop_reason is one of RESPONDING_STOP_REASONS,
    # i.e. the destination produced a terminal reply.
    responded: bool
    # Zero-indexed hop number of the first intermediate-router (TTL-
    # exceeded) response.  None if no such hop exists in the trace --
    # which excludes the trace from the SILENT check entirely.
    first_intermediate_idx: int | None
    # Starting TTL configured for this trace's probing (typically 1).
    firsthop: int
    # trace.stop_reason_str, e.g. "GapLimit", "Completed", "Unreach".
    # Aggregated into VpStats.stop_reasons for --debug RESP details.
    stop_reason_str: str
    # True if no hop in the trace responded at all (no intermediate
    # and no destination reply) -- a fully-silent trace, typically
    # aborted at the gap limit.  Distinct from the SILENT check,
    # which is about leading silence in traces that do eventually
    # respond.
    empty: bool


# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------


@dataclass
class VpStats:
    vp: str
    traces: int = 0
    ttl_rewrites: int = 0
    ttl_no_terminal: int = 0  # traces with no terminal ICMP error hop
    responding: int = 0       # traces where destination replied
    errors: int = 0           # exceptions during processing
    # firsthop values observed across this VP's traces.  Normally a
    # single value (uniform per probing config); kept as a Counter so
    # the rare case of a config change across files surfaces in the
    # report instead of being silently flattened.
    firsthop_values: Counter = field(default_factory=Counter)
    # Leading-silence depth distribution over intermediate-responding
    # traces.  Depth = the count of silent hops before the first
    # intermediate (TTL-exceeded, non-destination) response:
    # first_intermediate_idx - (firsthop - 1).  Depth 0 means the
    # first probed hop responded; depth k means k silent leading
    # hops.  Computed per-trace so a VP whose firsthop differs across
    # files still aggregates correctly.  SILENT is derived from this.
    depths: Counter = field(default_factory=Counter)
    # Count of intermediate-responding traces: those with at least
    # one intermediate-router (TTL-exceeded) response.  Equals
    # sum(depths.values()); the SILENT denominator.  Traces whose
    # only response is the destination's, and fully-silent traces,
    # are not counted here -- they say nothing about near-network
    # router behaviour.
    intermediate: int = 0
    # Count of fully-silent traces: no hop responded at all (no
    # intermediate and no destination reply).  EMPTY% = empty /
    # traces.
    empty: int = 0
    # Histogram of trace.stop_reason_str values seen for this VP.
    # Populated unconditionally; only shown when --debug and the VP
    # is flagged RESP or EMPTY.
    stop_reasons: Counter = field(default_factory=Counter)

    @property
    def empty_frac(self) -> float:
        """Fraction of all traces with no responding hop at all."""
        return (
            self.empty / self.traces
            if self.traces > 0 else 0.0
        )

    @property
    def ttl_eligible(self) -> int:
        """Traces where the TTL check could produce a verdict."""
        return self.traces - self.ttl_no_terminal

    @property
    def ttl_rewrite_rate(self) -> float:
        """Fraction of TTL-eligible traces that were rewritten."""
        return (
            self.ttl_rewrites / self.ttl_eligible
            if self.ttl_eligible > 0 else 0.0
        )

    @property
    def response_rate(self) -> float:
        """Fraction of all traces where the destination replied."""
        return (
            self.responding / self.traces
            if self.traces > 0 else 0.0
        )

    def silent_depth(self, threshold: float) -> int | None:
        """
        Number of leading hops (counting from firsthop) that are
        silent in at least `threshold` of intermediate-responding
        traces.

        Walks outward from the first probed hop.  At hop k the
        "still-silent" fraction is the share of intermediate traces
        whose first response is at depth >= k (i.e. silent through
        every hop up to k).  That fraction only decreases with k, so
        there is a single crossing of the threshold; SILENT is the
        last hop at or above it.  Returns None when the VP has no
        intermediate-responding traces (nothing to measure on).
        """
        n = self.intermediate
        if n == 0:
            return None
        # ge = count of traces with depth >= k, walked upward.
        ge = n - self.depths.get(0, 0)   # depth >= 1
        k = 1
        silent = 0
        while ge / n >= threshold and ge > 0:
            silent = k
            ge -= self.depths.get(k, 0)  # depth >= k + 1
            k += 1
        return silent

    @property
    def firsthop_str(self) -> str:
        """Display string for the firsthop column.  Single value
        when uniform; comma-separated distinct values when not."""
        if not self.firsthop_values:
            return "N/A"
        return ",".join(
            str(v) for v in sorted(self.firsthop_values)
        )


@dataclass
class DatasetResult:
    """Results for one dataset (ipv4, ipv6, or explicit files).

    main() creates one instance per input mode that was requested;
    format_full_report() renders each instance as its own section and
    skips instances whose vp_stats list is empty.
    """
    label: str  # section title, printed as "=== <label> ==="
    directory: str  # source directory, or "(command line)"
    target_date: str  # YYYY-MM-DD or ""
    file_count: int  # number of files handed to analyze_files()
    # Per-VP statistics as returned by analyze_files().
    vp_stats: list[VpStats] = field(default_factory=list)


# ---------------------------------------------------------------------------
# Filename parsing and filtering
# ---------------------------------------------------------------------------


def is_warts_file(path: str) -> bool:
    """Tell whether `path` ends with one of WARTS_EXTENSIONS."""
    return any(path.endswith(suffix) for suffix in WARTS_EXTENSIONS)


def vp_from_filename(path: str) -> str:
    """
    Return the VP name encoded in a warts filename.

    Two layouts are recognised, told apart by the first
    dot-separated component of the basename:

    IPv4: {vp}.team-probing.{cycle}.{date}.warts.gz
      -> the VP is the first component

    IPv6: topo-v6.l8.{date}.{timestamp}.{vp}.warts.gz
      -> the VP is the last component ahead of .warts.gz
    """
    name = os.path.basename(path)
    # Strip ".gz" first and ".warts" second; removesuffix leaves the
    # string alone when the suffix is absent, so both ".warts" and
    # ".warts.gz" names come out the same.
    for suffix in (".gz", ".warts"):
        name = name.removesuffix(suffix)
    fields = name.split(".")
    return fields[-1] if fields[0] == IPV6_FILENAME_PREFIX else fields[0]


def date_from_ipv6_filename(path: str) -> str | None:
    """
    Return the date component of an IPv6 warts filename.

    Expected layout: topo-v6.l8.{YYYYMMDD}.{timestamp}.{vp}.warts.gz
    The third dot-separated component is returned as-is (it is not
    validated as a date).  None is returned when the basename does
    not start with the IPv6 prefix or has fewer than three components.
    """
    stem = os.path.basename(path)
    for suffix in (".gz", ".warts"):
        stem = stem.removesuffix(suffix)
    fields = stem.split(".")
    if len(fields) < 3 or fields[0] != IPV6_FILENAME_PREFIX:
        return None
    return fields[2]


# ---------------------------------------------------------------------------
# Directory discovery
# ---------------------------------------------------------------------------


def discover_ipv4_files(
    base_dir: str, target_date: date
) -> list[str]:
    """
    Find warts files for a specific date under the IPv4
    team-probing directory structure.

    Structure: {base_dir}/YYYY/cycle-YYYYMMDD/*.warts.gz

    Args:
        base_dir: root of the IPv4 team-probing tree.
        target_date: date whose cycle directory is listed.

    Returns:
        Sorted list of paths in the cycle directory that have a warts
        extension.  The list is empty (and a WARNING is written to
        stderr) when the cycle directory does not exist or holds no
        warts files.
    """
    date_str = target_date.strftime("%Y%m%d")
    year_str = target_date.strftime("%Y")
    cycle_dir = os.path.join(
        base_dir, year_str, f"cycle-{date_str}"
    )
    if not os.path.isdir(cycle_dir):
        print(
            f"WARNING: IPv4 cycle directory not found: "
            f"{cycle_dir}",
            file=sys.stderr,
        )
        return []
    # The directory part is user-supplied; escape it so characters
    # such as "[", "]", "*" or "?" in the path are matched literally
    # instead of being interpreted as glob metacharacters.
    pattern = os.path.join(glob.escape(cycle_dir), "*")
    files = sorted(
        f for f in glob.glob(pattern)
        if is_warts_file(f)
    )
    if not files:
        print(
            f"WARNING: no warts files in {cycle_dir}",
            file=sys.stderr,
        )
    return files


def discover_ipv6_files(
    base_dir: str, target_date: date
) -> list[str]:
    """
    Find warts files for a specific date under the IPv6
    prefix-probing directory structure.

    Structure: {base_dir}/YYYY/MM/<files with date in name>
    Files: topo-v6.l8.YYYYMMDD.{timestamp}.{vp}.warts.gz

    Args:
        base_dir: root of the IPv6 prefix-probing tree.
        target_date: date to select; the month directory is listed
            and files are kept only when the date embedded in their
            name equals this date.

    Returns:
        Sorted list of matching paths.  The list is empty (and a
        WARNING is written to stderr) when the month directory does
        not exist or no file matches.
    """
    date_str = target_date.strftime("%Y%m%d")
    year_str = target_date.strftime("%Y")
    month_str = target_date.strftime("%m")
    month_dir = os.path.join(base_dir, year_str, month_str)
    if not os.path.isdir(month_dir):
        print(
            f"WARNING: IPv6 month directory not found: "
            f"{month_dir}",
            file=sys.stderr,
        )
        return []
    # The directory part is user-supplied; escape it so characters
    # such as "[", "]", "*" or "?" in the path are matched literally
    # instead of being interpreted as glob metacharacters.
    pattern = os.path.join(glob.escape(month_dir), "*")
    files = sorted(
        f for f in glob.glob(pattern)
        if is_warts_file(f)
        and date_from_ipv6_filename(f) == date_str
    )
    if not files:
        print(
            f"WARNING: no IPv6 warts files for {date_str} in "
            f"{month_dir}",
            file=sys.stderr,
        )
    return files


# ---------------------------------------------------------------------------
# Detection logic
# ---------------------------------------------------------------------------


def _classify_ttl(
    trace: ScamperTrace, ttl_tolerance: int
) -> TtlCheck:
    """Run the quoted-TTL rewrite check on one trace.

    Returns REWRITTEN as soon as a usable terminal ICMP error hop
    quotes a TTL more than `ttl_tolerance` above the probe TTL,
    NO_REWRITE when usable terminal hops exist but none exceeds the
    tolerance, and NO_TERMINAL_HOP when there is no usable one.
    """
    eligible = False
    for hop in trace.hops():
        # Only terminal ICMP errors that quote the probe are of
        # interest.  TTL-exceeded replies from mid-path routers quote
        # the probe as well, but their icmp_q_ttl is ~0 by design, so
        # they say nothing about the TTL the probe arrived with.
        if hop is None or not hop.is_icmp_q() or hop.is_icmp_ttl_exp():
            continue

        sent_ttl = hop.probe_ttl
        quoted_ttl = hop.icmp_q_ttl
        if sent_ttl is None or quoted_ttl is None:
            # Without both TTL fields the hop cannot yield a verdict
            # and therefore must not make the trace eligible.
            continue

        if quoted_ttl > sent_ttl + ttl_tolerance:
            return TtlCheck.REWRITTEN
        eligible = True

    if eligible:
        return TtlCheck.NO_REWRITE
    return TtlCheck.NO_TERMINAL_HOP


def _scan_leading(trace: ScamperTrace) -> tuple[int | None, bool]:
    """Locate the first intermediate reply in a trace.

    Hop indices from max(0, firsthop - 1) up to, but excluding,
    trace.stop_hop are visited in order.  A hop counts as intermediate
    when it is an ICMP TTL-exceeded reply whose source differs from
    the trace destination.

    Returns (index, True) for the first such hop.  Otherwise returns
    (None, answered), where `answered` tells whether any visited hop
    was present (not None) at all.
    """
    first_idx = max(0, trace.firsthop - 1)
    last_idx = trace.stop_hop
    destination = trace.dst
    answered = False
    for idx in range(first_idx, last_idx):
        reply = trace.hop(idx)
        if reply is None:
            continue
        answered = True
        if reply.is_icmp_ttl_exp() and reply.src != destination:
            return idx, True
    return None, answered


def check_trace(
    trace: ScamperTrace, ttl_tolerance: int
) -> TraceResult:
    """Collect every per-trace signal the report needs.

    Combines the TTL rewrite verdict, the destination-replied test
    (stop reason in RESPONDING_STOP_REASONS), and the leading-hop scan
    into one TraceResult.
    """
    ttl_verdict = _classify_ttl(trace, ttl_tolerance)
    dest_replied = trace.stop_reason in RESPONDING_STOP_REASONS
    first_idx, answered = _scan_leading(trace)
    # Positional order follows the TraceResult field order:
    # ttl, responded, first_intermediate_idx, firsthop,
    # stop_reason_str, empty.
    return TraceResult(
        ttl_verdict,
        dest_replied,
        first_idx,
        trace.firsthop,
        trace.stop_reason_str,
        not answered,
    )


# ---------------------------------------------------------------------------
# File processing
# ---------------------------------------------------------------------------


def _record_trace(stats: VpStats, result: TraceResult) -> None:
    """Fold one trace's signals into the running per-VP counters."""
    if result.ttl is TtlCheck.REWRITTEN:
        stats.ttl_rewrites += 1
    elif result.ttl is TtlCheck.NO_TERMINAL_HOP:
        stats.ttl_no_terminal += 1
    # TtlCheck.NO_REWRITE needs no counter: it is an eligible trace
    # in which no rewrite was observed.
    if result.responded:
        stats.responding += 1
    if result.empty:
        stats.empty += 1
    stats.firsthop_values[result.firsthop] += 1
    stats.stop_reasons[result.stop_reason_str] += 1

    # Only traces with at least one intermediate-router
    # (TTL-exceeded) reply take part in the SILENT measurement.
    first_idx = result.first_intermediate_idx
    if first_idx is None:
        return
    stats.intermediate += 1
    # Leading-silence depth = silent hops ahead of that first reply:
    #   0 -> the very first probed hop answered
    #   k -> k leading hops were silent
    # Measuring relative to firsthop keeps VPs with different firsthop
    # settings comparable, and lets a VP with mixed settings
    # aggregate correctly.
    stats.depths[first_idx - result.firsthop + 1] += 1


def process_file(
    path: str, stats: VpStats, ttl_tolerance: int
) -> None:
    """Read every trace in one warts file and add it to `stats`.

    `stats` is modified in place.  A RuntimeError raised while opening
    or reading the file is reported on stderr and counted in
    stats.errors; whatever was accumulated before the error is kept.
    """
    try:
        with ScamperFile(path, filter_types=[ScamperTrace]) as warts:
            for trace in warts:
                # Count the trace before classifying it, so the total
                # includes a trace whose classification fails.
                stats.traces += 1
                _record_trace(stats, check_trace(trace, ttl_tolerance))
    except RuntimeError as exc:
        print(
            f"WARNING: could not process {path}: {exc}",
            file=sys.stderr,
        )
        stats.errors += 1


def analyze_files(
    files: list[str],
    ttl_tolerance: int,
) -> list[VpStats]:
    """
    Build per-VP statistics from a list of warts files.

    Paths without a warts extension are dropped without notice.  The
    remaining paths are grouped by the VP name parsed from the
    filename; VPs are handled in sorted name order and each VP's files
    in sorted path order.  One VpStats per VP is returned, in that
    same VP order.
    """
    by_vp: dict[str, list[str]] = defaultdict(list)
    for candidate in filter(is_warts_file, files):
        by_vp[vp_from_filename(candidate)].append(candidate)

    collected: list[VpStats] = []
    for vp in sorted(by_vp):
        entry = VpStats(vp=vp)
        for path in sorted(by_vp[vp]):
            process_file(path, entry, ttl_tolerance)
        collected.append(entry)
    return collected


# ---------------------------------------------------------------------------
# Reporting
# ---------------------------------------------------------------------------


def vp_flags(
    s: VpStats,
    ttl_rewrite_threshold: float,
    response_threshold: float,
    empty_threshold: float,
    silent_threshold: float,
    silent_min: int,
) -> list[str]:
    """List the flag labels that apply to one VP.

    Labels are returned in the fixed order TTL, RESP, EMPTY, SILENT,
    ERR; the list is empty when the VP passes every check.
    """
    depth = s.silent_depth(silent_threshold)
    has_traces = s.traces > 0
    # (label, triggered) pairs in report order.
    checks = (
        (
            FLAG_TTL,
            s.ttl_eligible > 0
            and s.ttl_rewrite_rate > ttl_rewrite_threshold,
        ),
        (
            FLAG_RESP,
            has_traces and s.response_rate < response_threshold,
        ),
        (
            FLAG_EMPTY,
            has_traces and s.empty_frac > empty_threshold,
        ),
        (
            FLAG_SILENT,
            depth is not None and depth >= silent_min,
        ),
        (FLAG_ERR, s.errors > 0),
    )
    return [label for label, triggered in checks if triggered]


def _format_debug_rows(s: VpStats, flags: list[str]) -> list[str]:
    """Return the indented `--debug` detail lines for one VP.

    The first line always lists the raw counts behind the headline
    percentages.  A "depth top:" line follows for SILENT-flagged VPs
    and a "stop_reasons:" line for RESP- or EMPTY-flagged VPs.  A VP
    without traces yields no lines.
    """
    if s.traces == 0:
        return []

    def with_share(label: str, count: int) -> str:
        # A positive count is shown together with its share of all
        # traces; otherwise only "<label>=0" is printed.
        if count > 0:
            share = count / s.traces * 100
            return f"{label}={count:,} ({share:.1f}% of traces)"
        return f"{label}=0"

    def top_five(counter: Counter) -> str:
        # Five most common entries, rendered as "<key>x<count>".
        return ", ".join(
            f"{key}x{count:,}" for key, count in counter.most_common(5)
        )

    ttl_part = with_share("ttl_elig", s.ttl_eligible)
    intermediate_part = with_share("intermediate", s.intermediate)
    rows = [
        f"    {ttl_part}  resp={s.responding:,}  "
        f"empty={s.empty:,}  {intermediate_part}"
    ]

    # Flag-specific detail: the depth distribution SILENT is derived
    # from, and the stop-reason histogram for response problems.
    if s.depths and FLAG_SILENT in flags:
        rows.append(f"    depth top: {top_five(s.depths)}")
    if s.stop_reasons and (FLAG_RESP in flags or FLAG_EMPTY in flags):
        rows.append(f"    stop_reasons: {top_five(s.stop_reasons)}")
    return rows


def format_report(
    vp_stats: list[VpStats],
    ttl_rewrite_threshold: float,
    response_threshold: float,
    empty_threshold: float,
    silent_threshold: float,
    silent_min: int,
    verbose: bool,
    sort_by: str,
    debug: bool = False,
) -> tuple[str, list[VpStats]]:
    """Render the table for one dataset.

    Returns (text, flagged) where `flagged` holds every VP that
    triggered at least one flag, in display order.  Unflagged VPs are
    listed only when `verbose` is set; `debug` adds detail rows under
    each listed VP.  An unknown `sort_by` falls back to sorting by VP
    name.
    """

    def by_silent(s):
        # VPs without intermediate data go last (leading bool); among
        # the rest, the deeper SILENT value comes first.
        depth = s.silent_depth(silent_threshold)
        return depth is None, -(depth or 0)

    orderings = {
        "vp": lambda s: s.vp,
        "traces": lambda s: -s.traces,
        "ttl": lambda s: -s.ttl_rewrite_rate,
        "response": lambda s: s.response_rate,
        "empty": lambda s: -s.empty_frac,
        "silent": by_silent,
    }
    key_fn = orderings.get(sort_by)
    if key_fn is None:
        key_fn = orderings["vp"]
    ordered = sorted(vp_stats, key=key_fn)

    header = " ".join([
        f"{'VP':<8}",
        f"{'TRACES':>9}",
        f"{'TTL%':>7}",
        f"{'RESP%':>7}",
        f"{'EMPTY%':>7}",
        f"{'FIRSTHOP':>8}",
        f"{'SILENT':>6}",
    ]) + "  FLAGS"
    rule = "-" * len(header)
    not_available = f"{'N/A':>7}"

    flagged: list[VpStats] = []
    lines: list[str] = [header, rule]
    for s in ordered:
        flags = vp_flags(
            s,
            ttl_rewrite_threshold,
            response_threshold,
            empty_threshold,
            silent_threshold,
            silent_min,
        )
        if flags:
            flagged.append(s)
        elif not verbose:
            # Unflagged VPs are only listed in verbose mode.
            continue

        if s.ttl_eligible == 0:
            ttl_cell = not_available
        else:
            ttl_cell = f"{s.ttl_rewrite_rate * 100:6.2f}%"

        if s.traces == 0:
            resp_cell = empty_cell = not_available
        else:
            resp_cell = f"{s.response_rate * 100:6.2f}%"
            empty_cell = f"{s.empty_frac * 100:6.2f}%"

        # SILENT column: leading hops silent in >= silent_threshold of
        # the intermediate-responding traces, or N/A without any.
        depth = s.silent_depth(silent_threshold)
        silent_cell = "N/A" if depth is None else str(depth)

        flag_cell = "+".join(flags) if flags else "---"
        if s.errors:
            flag_cell += f" ({s.errors} err)"

        cells = [
            f"{s.vp:<8}",
            f"{s.traces:>9,}",
            ttl_cell,
            resp_cell,
            empty_cell,
            f"{s.firsthop_str:>8}",
            f"{silent_cell:>6}",
        ]
        lines.append(" ".join(cells) + f"  {flag_cell}")
        if debug:
            lines.extend(_format_debug_rows(s, flags))

    lines.append(rule)
    # The leading "\n" puts a blank line between the table and the
    # summary once the lines are joined.
    summary = (
        f"\n{len(flagged)} VP(s) flagged "
        f"(TTL rewrite > {ttl_rewrite_threshold * 100:.1f}%, "
        f"response rate < {response_threshold * 100:.1f}%, "
        f"empty > {empty_threshold * 100:.1f}%, "
        f"SILENT >= {silent_min} hops silent in >= "
        f"{silent_threshold * 100:.0f}%, "
        f"or ERR: file read errors)."
    )
    lines.append(summary)

    return "\n".join(lines), flagged


def format_full_report(
    results: list[DatasetResult],
    ttl_rewrite_threshold: float,
    response_threshold: float,
    empty_threshold: float,
    silent_threshold: float,
    silent_min: int,
    verbose: bool,
    sort_by: str,
    debug: bool = False,
) -> tuple[str, list[VpStats]]:
    """
    Assemble the report for every dataset.

    Each dataset with at least one VP becomes a section made of a
    short header followed by its format_report() table; sections are
    separated by a blank line and the legend is appended once at the
    end.  With no sections the text is empty.

    Returns (report_text, all_flagged_vps).
    """
    sections: list[str] = []
    all_flagged: list[VpStats] = []

    for dataset in results:
        if not dataset.vp_stats:
            continue

        heading = [
            f"=== {dataset.label} ===",
            f"Source: {dataset.directory}",
        ]
        if dataset.target_date:
            heading.append(f"Date:   {dataset.target_date}")
        heading += [f"Files:  {dataset.file_count}", ""]

        table, flagged = format_report(
            dataset.vp_stats,
            ttl_rewrite_threshold=ttl_rewrite_threshold,
            response_threshold=response_threshold,
            empty_threshold=empty_threshold,
            silent_threshold=silent_threshold,
            silent_min=silent_min,
            verbose=verbose,
            sort_by=sort_by,
            debug=debug,
        )
        sections.append("\n".join([*heading, table]))
        all_flagged += flagged

    if not sections:
        return "", all_flagged
    report_text = "\n\n".join(sections)
    return report_text + "\n\n" + build_legend(silent_threshold), all_flagged


# ---------------------------------------------------------------------------
# Email
# ---------------------------------------------------------------------------


def send_email(
    recipients: str, subject: str, body: str
) -> bool:
    """Mail `body` to `recipients` through the system `mail` command.

    `recipients` is a comma-separated address list; blank entries are
    ignored.  Returns True when `mail` exits with status 0.  Every
    failure (no usable recipient, missing command, non-zero exit,
    timeout, or any other exception) is reported on stderr and
    returns False.
    """
    stripped = (part.strip() for part in recipients.split(","))
    addrs = [addr for addr in stripped if addr]
    if not addrs:
        print(
            "ERROR: --mailto has no valid recipients.",
            file=sys.stderr,
        )
        return False

    command = ["mail", "-s", subject, "-a", "From: ark-status@caida.org"]
    command.extend(addrs)
    try:
        # The body is fed on stdin; give up after 30 seconds.
        proc = subprocess.run(
            command,
            input=body,
            text=True,
            capture_output=True,
            timeout=30,
        )
        if proc.returncode == 0:
            return True
        print(
            f"WARNING: mail command failed "
            f"(rc={proc.returncode}): {proc.stderr}",
            file=sys.stderr,
        )
    except FileNotFoundError:
        print(
            "ERROR: 'mail' command not found. Install mailutils "
            "or equivalent.",
            file=sys.stderr,
        )
    except Exception as exc:
        print(
            f"ERROR: could not send email: {exc}",
            file=sys.stderr,
        )
    return False


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------


def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse sys.argv.

    Returns the populated argparse.Namespace.  Range checks on the
    threshold options are not done here; main() performs them.

    Note on help strings: argparse applies %-formatting to every help
    text, so a literal percent sign must be written as "%%".
    """
    p = argparse.ArgumentParser(
        description=(
            "Sanity-check CAIDA Ark warts files for TTL rewrites, "
            "low response rates, fully-silent traces, and silent "
            "leading hops."
        ),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    p.add_argument("files", nargs="*", metavar="FILE")
    p.add_argument(
        "--ttl-rewrite-threshold",
        type=float,
        default=0.5,
        help=(
            "TTL rewrite rate above which a VP is flagged TTL"
        ),
    )
    p.add_argument(
        "--response-threshold",
        type=float,
        default=0.10,
        help=(
            "Response rate below which a VP is flagged RESP"
        ),
    )
    p.add_argument(
        "--empty-threshold",
        type=float,
        default=0.25,
        help=(
            "Fraction of fully-silent traces (no responding hop)"
            " above which a VP is flagged EMPTY"
        ),
    )
    p.add_argument(
        "--silent-threshold",
        type=float,
        default=0.90,
        help=(
            "A leading hop counts toward SILENT when it is silent in"
            " at least this fraction of intermediate-responding"
            " traces."
        ),
    )
    p.add_argument(
        "--silent-min",
        type=int,
        default=2,
        help=(
            "Flag a VP SILENT only when its SILENT value reaches this"
            " many hops.  Default 2: one silent leading hop is common"
            " and usually not worth reconfiguring."
        ),
    )
    p.add_argument(
        "--ttl-tolerance",
        type=int,
        default=2,
        help=(
            "Allowable excess of icmp_q_ttl over probe_ttl"
            " before counting as a TTL rewrite"
        ),
    )
    p.add_argument(
        "--sort-by",
        default="vp",
        choices=[
            "vp", "traces", "ttl", "response", "empty", "silent",
        ],
        # "EMPTY%%" renders as "EMPTY%".  A bare "%" followed by
        # " (" is parsed as a format specifier and makes --help fail
        # with "unsupported format character".
        help=(
            "Column to sort by.  'ttl' = TTL rewrite rate"
            " (descending); 'response' = response rate"
            " (ascending, worst first); 'empty' = EMPTY%%"
            " (descending); 'silent' = SILENT (descending, VPs"
            " with no intermediate-responding traces last)"
        ),
    )
    p.add_argument(
        "--verbose",
        action="store_true",
        help="Show all VPs, not just flagged ones",
    )
    p.add_argument(
        "--debug",
        action="store_true",
        help=(
            "Add an indented detail row under each VP showing the"
            " counts behind the headline percentages (ttl_elig,"
            " resp, empty, intermediate).  For SILENT-flagged VPs"
            " also adds a 'depth top:' line; for RESP- or"
            " EMPTY-flagged VPs a 'stop_reasons:' line."
        ),
    )
    p.add_argument(
        "--ipv4-dir",
        metavar="PATH",
        help=(
            "Base directory for IPv4 team-probing data "
            "(e.g. .../team-probing/list-7.allpref24/"
            "team-1/daily)"
        ),
    )
    p.add_argument(
        "--ipv6-dir",
        metavar="PATH",
        help=(
            "Base directory for IPv6 prefix-probing data "
            "(e.g. .../topo-v6/list-8.ipv6.allpref)"
        ),
    )
    p.add_argument(
        "--date",
        metavar="YYYY-MM-DD",
        type=date.fromisoformat,
        help=(
            "Target date for directory discovery "
            "(default: two days ago -- yesterday's IPv6 files are"
            " often still incomplete when the scan runs)"
        ),
    )
    p.add_argument(
        "--mailto",
        metavar="ADDRESS",
        help=(
            "Send email alert if any VPs are flagged. "
            "Comma-separated for multiple recipients."
        ),
    )
    return p.parse_args()


def main() -> int:
    """Run the lint: validate options, analyze inputs, print, mail.

    Returns the process exit status: 1 for invalid options, missing
    input, or a failed email; 0 otherwise (including when VPs are
    flagged).
    """
    args = parse_args()

    # All four thresholds are fractions and must lie in [0, 1].
    fraction_options = (
        ("--ttl-rewrite-threshold", args.ttl_rewrite_threshold),
        ("--response-threshold", args.response_threshold),
        ("--empty-threshold", args.empty_threshold),
        ("--silent-threshold", args.silent_threshold),
    )
    for option, value in fraction_options:
        if 0.0 <= value <= 1.0:
            continue
        print(
            f"ERROR: {option} must be in [0.0, 1.0], "
            f"got {value}.",
            file=sys.stderr,
        )
        return 1

    if args.silent_min < 1:
        print(
            f"ERROR: --silent-min must be >= 1, "
            f"got {args.silent_min}.",
            file=sys.stderr,
        )
        return 1

    has_files = bool(args.files)
    if not (args.ipv4_dir or args.ipv6_dir or has_files):
        print(
            "ERROR: no input files or directories specified.\n"
            "Provide files as arguments, or use --ipv4-dir / "
            "--ipv6-dir for auto-discovery.",
            file=sys.stderr,
        )
        return 1

    target = args.date or (date.today() - timedelta(days=2))
    # Shown to the user in ISO 8601 form (YYYY-MM-DD); the discover_*
    # helpers derive their own YYYYMMDD string from the same `target`
    # object for filename matching.
    target_display = target.isoformat()
    results: list[DatasetResult] = []

    # Directory discovery: IPv4 first, then IPv6, each only when its
    # base directory was given.
    discovery_modes = (
        (args.ipv4_dir, discover_ipv4_files, "IPv4 team-probing"),
        (args.ipv6_dir, discover_ipv6_files, "IPv6 prefix-probing"),
    )
    for directory, discover, label in discovery_modes:
        if not directory:
            continue
        found = discover(directory, target)
        stats = analyze_files(found, args.ttl_tolerance)
        results.append(DatasetResult(
            label=label,
            directory=directory,
            target_date=target_display,
            file_count=len(found),
            vp_stats=stats,
        ))

    # Files named explicitly on the command line.
    if has_files:
        warts_files = list(filter(is_warts_file, args.files))
        skipped = len(args.files) - len(warts_files)
        if skipped:
            print(
                f"NOTE: skipped {skipped} non-warts file(s).",
                file=sys.stderr,
            )
        stats = analyze_files(warts_files, args.ttl_tolerance)
        results.append(DatasetResult(
            label="Explicit files",
            directory="(command line)",
            target_date="",
            file_count=len(warts_files),
            vp_stats=stats,
        ))

    report, all_flagged = format_full_report(
        results,
        ttl_rewrite_threshold=args.ttl_rewrite_threshold,
        response_threshold=args.response_threshold,
        empty_threshold=args.empty_threshold,
        silent_threshold=args.silent_threshold,
        silent_min=args.silent_min,
        verbose=args.verbose,
        sort_by=args.sort_by,
        debug=args.debug,
    )

    # The report always goes to stdout (when there is one).
    if report:
        print(report)

    if not args.mailto:
        return 0

    if not all_flagged:
        # Nothing to alert on: skip the email but leave a trace on
        # stderr so cron logs show that the scan ran.
        print(
            f"No VPs flagged for {target_display}; "
            f"no email sent.",
            file=sys.stderr,
        )
        return 0

    # A VP flagged in several datasets counts once in the subject;
    # the per-dataset detail is in the body.
    n = len({s.vp for s in all_flagged})
    subject = (
        f"Daily Ark data scan: {n} VP(s) flagged -- {target_display}"
    )
    return 0 if send_email(args.mailto, subject, report) else 1


# Script entry point: run main() only when executed directly and use
# its return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
