Zachary D. Rowitsch 38e6dcc34a chore: archive v1.0 phase directories to milestones/v1.0-phases/
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 01:33:15 -04:00


phase: 01-data-pipeline
plan: 03
type: execute
wave: 3
depends_on: 01-01, 01-02
files_modified: tcptop/src/aggregator.rs, tcptop/src/output.rs, tcptop/src/main.rs, tcptop/tests/pipeline_test.rs
autonomous: true
requirements: DATA-06, DATA-07
**Must-haves -- truths:**

- Running tcptop on Linux with root prints per-connection data as streaming lines to stdout
- Each line shows protocol, local/remote addr+port, PID, process name, TCP state, bytes in/out, packets in/out, RTT, and bandwidth rate
- UDP flows are tracked by 4-tuple and expire after 5 seconds of idle
- Bandwidth rates (KB/s, MB/s) are calculated per connection per tick interval
- Closed TCP connections print a [CLOSED] line and are removed after one tick
- Pre-existing connections show a [PARTIAL] marker
- Human-readable sizes are used (1.2 MB, 340 KB/s)
**Must-haves -- artifacts:**

| path | provides | contains |
| --- | --- | --- |
| tcptop/src/aggregator.rs | ConnectionTable with tick-based rate calculation and UDP timeout | `struct ConnectionTable` |
| tcptop/src/output.rs | Streaming stdout formatter with human-readable sizes | `fn format_bytes` |
| tcptop/src/main.rs | Tokio event loop wiring collector, aggregator, and output | `#[tokio::main]` |
| tcptop/tests/pipeline_test.rs | Unit test proving data flows from CollectorEvent through aggregator to output | `fn test_pipeline` |
**Must-haves -- key links:**

| from | to | via | pattern |
| --- | --- | --- | --- |
| tcptop/src/main.rs | tcptop/src/collector/linux.rs | creates LinuxCollector and calls start() | LinuxCollector |
| tcptop/src/main.rs | tcptop/src/aggregator.rs | receives CollectorEvents and updates ConnectionTable | ConnectionTable |
| tcptop/src/aggregator.rs | tcptop/src/output.rs | passes connection records for formatting | format_connections |
Implement the connection aggregator (bandwidth calculation, UDP timeout, lifecycle management), the streaming stdout output formatter, and wire everything together in main.rs with a tokio event loop.

Purpose: Complete the proof-of-life data pipeline -- from kernel events through aggregation to human-readable streaming output. This plan delivers the Phase 1 success criteria: running tcptop prints real per-connection data.

Output: Working end-to-end pipeline from eBPF events to formatted stdout lines with bandwidth rates, UDP flow management, and connection lifecycle handling.

<execution_context> @/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/workflows/execute-plan.md @/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/01-data-pipeline/01-CONTEXT.md @.planning/phases/01-data-pipeline/01-RESEARCH.md @.planning/phases/01-data-pipeline/01-01-SUMMARY.md @.planning/phases/01-data-pipeline/01-02-SUMMARY.md

From tcptop/src/model.rs:

  • ConnectionKey { protocol: Protocol, local_addr: IpAddr, local_port: u16, remote_addr: IpAddr, remote_port: u16 }
  • ConnectionRecord { key, pid, process_name, tcp_state: Option<TcpState>, bytes_in/out: u64, packets_in/out: u64, rate_in/out: f64, prev_bytes_in/out: u64, rtt_us: Option<u32>, last_seen: Instant, is_partial: bool, is_closed: bool }
  • Protocol { Tcp, Udp }
  • TcpState with as_str() -> &str and from_kernel(u32)

From tcptop/src/collector/mod.rs:

  • enum CollectorEvent { TcpSend{key, pid, comm, bytes, srtt_us}, TcpRecv{..}, UdpSend{key, pid, comm, bytes}, UdpRecv{..}, TcpStateChange{key, pid, old_state, new_state} }
  • trait NetworkCollector { start(tx), stop(), bootstrap_existing() }

From tcptop/src/collector/linux.rs:

  • LinuxCollector::new() -> Result<Self>
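The interface listing doesn't show the `from_kernel` mapping. Assuming the standard kernel numbering from include/net/tcp_states.h (TCP_ESTABLISHED = 1 through TCP_CLOSING = 11), a hypothetical sketch of what model.rs likely does -- the real implementation may differ:

```rust
// Hypothetical sketch of TcpState::from_kernel, assuming the kernel's
// numbering from include/net/tcp_states.h; the real model.rs may differ.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum TcpState {
    Established, SynSent, SynRecv, FinWait1, FinWait2,
    TimeWait, Close, CloseWait, LastAck, Listen, Closing,
}

impl TcpState {
    fn from_kernel(v: u32) -> Option<TcpState> {
        use TcpState::*;
        Some(match v {
            1 => Established,
            2 => SynSent,
            3 => SynRecv,
            4 => FinWait1,
            5 => FinWait2,
            6 => TimeWait,
            7 => Close, // the value the aggregator checks to mark [CLOSED]
            8 => CloseWait,
            9 => LastAck,
            10 => Listen,
            11 => Closing,
            _ => return None, // unknown value: leave the state unset
        })
    }
}

fn main() {
    assert_eq!(TcpState::from_kernel(7), Some(TcpState::Close));
    assert_eq!(TcpState::from_kernel(99), None);
}
```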
**Task 1: Implement aggregator and output formatter**

Files: tcptop/src/aggregator.rs, tcptop/src/output.rs

Context: tcptop/src/model.rs (ConnectionRecord, ConnectionKey, TcpState -- exact field names and types), tcptop/src/collector/mod.rs (CollectorEvent enum variants -- exact field names), .planning/phases/01-data-pipeline/01-RESEARCH.md (Pattern 3: Connection Table code, output format example), .planning/phases/01-data-pipeline/01-CONTEXT.md (D-01 through D-08, D-12, D-14, D-15 for output behavior)

**tcptop/src/aggregator.rs:**
Implement ConnectionTable that processes CollectorEvents and maintains per-connection state.

```rust
use crate::collector::CollectorEvent;
use crate::model::{ConnectionKey, ConnectionRecord, Protocol, TcpState};
use std::collections::HashMap;
use std::time::{Duration, Instant};

const UDP_IDLE_TIMEOUT: Duration = Duration::from_secs(5); // per D-06

pub struct ConnectionTable {
    connections: HashMap<ConnectionKey, ConnectionRecord>,
    last_tick: Instant,
}

impl ConnectionTable {
    pub fn new() -> Self {
        Self {
            connections: HashMap::new(),
            last_tick: Instant::now(),
        }
    }

    /// Seed with pre-existing connections from /proc bootstrap
    pub fn seed(&mut self, records: Vec<ConnectionRecord>) {
        for record in records {
            self.connections.insert(record.key.clone(), record);
        }
    }

    /// Process a single collector event, updating the connection table
    pub fn update(&mut self, event: CollectorEvent) {
        match event {
            CollectorEvent::TcpSend { key, pid, comm, bytes, srtt_us } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Tcp);
                record.bytes_out += bytes as u64;    // per DATA-01
                record.packets_out += 1;             // per DATA-02
                if srtt_us > 0 {
                    record.rtt_us = Some(srtt_us);   // per DATA-05 (already >>3 in collector)
                }
                record.last_seen = Instant::now();
            }
            CollectorEvent::TcpRecv { key, pid, comm, bytes, srtt_us } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Tcp);
                record.bytes_in += bytes as u64;     // per DATA-01
                record.packets_in += 1;              // per DATA-02
                if srtt_us > 0 {
                    record.rtt_us = Some(srtt_us);
                }
                record.last_seen = Instant::now();
            }
            CollectorEvent::UdpSend { key, pid, comm, bytes } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Udp);
                record.bytes_out += bytes as u64;
                record.packets_out += 1;
                record.last_seen = Instant::now();
            }
            CollectorEvent::UdpRecv { key, pid, comm, bytes } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Udp);
                record.bytes_in += bytes as u64;
                record.packets_in += 1;
                record.last_seen = Instant::now();
            }
            CollectorEvent::TcpStateChange { key, pid, old_state: _, new_state } => {
                if let Some(record) = self.connections.get_mut(&key) {
                    record.tcp_state = TcpState::from_kernel(new_state);
                    // PID=0 enrichment (Pitfall 3 from Plan 02): inet_sock_set_state
                    // fires in softirq context where PID=0. Inherit PID from the
                    // existing record which was populated by earlier kprobe events.
                    if pid != 0 {
                        record.pid = pid;
                    }
                    // per D-12: mark as closed, will be removed next tick
                    if new_state == 7 { // TCP_CLOSE
                        record.is_closed = true;
                    }
                }
                // If no existing record for this key, the state change is for a
                // connection we haven't seen data for -- ignore it (no PID to attribute)
            }
        }
    }

    /// Called periodically to calculate rates and expire flows
    /// Returns (active connections, newly closed connections for [CLOSED] output)
    pub fn tick(&mut self) -> (Vec<&ConnectionRecord>, Vec<ConnectionRecord>) {
        let now = Instant::now();
        let dt = now.duration_since(self.last_tick).as_secs_f64();
        if dt <= 0.0 {
            return (self.connections.values().collect(), Vec::new());
        }

        // Calculate bandwidth rates per DATA-06
        for record in self.connections.values_mut() {
            record.rate_in = (record.bytes_in.saturating_sub(record.prev_bytes_in)) as f64 / dt;
            record.rate_out = (record.bytes_out.saturating_sub(record.prev_bytes_out)) as f64 / dt;
            record.prev_bytes_in = record.bytes_in;
            record.prev_bytes_out = record.bytes_out;
        }

        // Collect closed TCP connections for [CLOSED] output per D-14
        let mut closed = Vec::new();
        let mut to_remove = Vec::new();
        for (key, record) in &self.connections {
            if record.is_closed {
                closed.push(record.clone());
                to_remove.push(key.clone());
            }
        }
        for key in to_remove {
            self.connections.remove(&key);
        }

        // Expire UDP flows idle > 5 seconds per D-06
        self.connections.retain(|_key, record| {
            if record.key.protocol == Protocol::Udp {
                record.last_seen.elapsed() < UDP_IDLE_TIMEOUT
            } else {
                true
            }
        });

        self.last_tick = now;
        (self.connections.values().collect(), closed)
    }

    fn get_or_create(&mut self, key: &ConnectionKey, pid: u32, comm: &str, protocol: Protocol) -> &mut ConnectionRecord {
        self.connections.entry(key.clone()).or_insert_with(|| {
            ConnectionRecord {
                key: key.clone(),
                pid,
                process_name: comm.to_string(),
                tcp_state: if protocol == Protocol::Tcp { Some(TcpState::Established) } else { None },
                bytes_in: 0,
                bytes_out: 0,
                packets_in: 0,
                packets_out: 0,
                rate_in: 0.0,
                rate_out: 0.0,
                prev_bytes_in: 0,
                prev_bytes_out: 0,
                rtt_us: None,
                last_seen: Instant::now(),
                is_partial: false,
                is_closed: false,
            }
        })
    }

    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
}
```

**tcptop/src/output.rs (per D-01, D-02, D-03, D-04, D-14):**

Streaming stdout formatter. Each tick prints all active connections, one per line. Closed connections get a `[CLOSED]` line.

```rust
use crate::model::{ConnectionRecord, Protocol};

/// Format bytes into a human-readable string per D-02.
/// Shows the raw value alongside the scaled one: "1258291 (1.2M)".
/// Below 1024 bytes, only the raw value is shown: "340B".
pub fn format_bytes(bytes: u64) -> String {
    if bytes < 1024 {
        format!("{}B", bytes)
    } else if bytes < 1024 * 1024 {
        format!("{} ({:.1}K)", bytes, bytes as f64 / 1024.0)
    } else if bytes < 1024 * 1024 * 1024 {
        format!("{} ({:.1}M)", bytes, bytes as f64 / (1024.0 * 1024.0))
    } else {
        format!("{} ({:.1}G)", bytes, bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    }
}

/// Format rate (bytes/sec) into human-readable string
pub fn format_rate(rate: f64) -> String {
    if rate < 1.0 {
        "0B/s".to_string()
    } else if rate < 1024.0 {
        format!("{:.0}B/s", rate)
    } else if rate < 1024.0 * 1024.0 {
        format!("{:.1}KB/s", rate / 1024.0)
    } else {
        format!("{:.1}MB/s", rate / (1024.0 * 1024.0))
    }
}

/// Format RTT in microseconds to human-readable
pub fn format_rtt(rtt_us: Option<u32>) -> String {
    match rtt_us {
        None => "-".to_string(),
        Some(us) if us < 1000 => format!("{:.0}us", us),
        Some(us) if us < 1_000_000 => format!("{:.1}ms", us as f64 / 1000.0),
        Some(us) => format!("{:.1}s", us as f64 / 1_000_000.0),
    }
}

/// Print header line (called once at startup)
pub fn print_header() {
    println!(
        "{:<5} {:<22} {:<22} {:<7} {:<12} {:<12} {:<14} {:<14} {:<8} {:<8} {:<8} {:<10} {:<10}",
        "PROTO", "LOCAL", "REMOTE", "PID", "PROCESS", "STATE",
        "BYTES_IN", "BYTES_OUT", "PKTS_IN", "PKTS_OUT", "RTT",
        "RATE_IN", "RATE_OUT"
    );
}

/// Print one connection line per D-03
pub fn print_connection(record: &ConnectionRecord) {
    let proto = match record.key.protocol {
        Protocol::Tcp => "TCP",
        Protocol::Udp => "UDP",
    };
    let local = format!("{}:{}", record.key.local_addr, record.key.local_port);
    let remote = format!("{}:{}", record.key.remote_addr, record.key.remote_port);
    let state = match &record.tcp_state {
        Some(s) => s.as_str(),
        None => "UDP",  // per D-07: show "UDP" for UDP flows
    };
    let partial_marker = if record.is_partial { " [PARTIAL]" } else { "" };

    println!(
        "{:<5} {:<22} {:<22} {:<7} {:<12} {:<12} {:<14} {:<14} {:<8} {:<8} {:<8} {:<10} {:<10}{}",
        proto,
        local,
        remote,
        record.pid,
        truncate(&record.process_name, 11),
        state,
        format_bytes(record.bytes_in),
        format_bytes(record.bytes_out),
        record.packets_in,
        record.packets_out,
        format_rtt(record.rtt_us),
        format_rate(record.rate_in),
        format_rate(record.rate_out),
        partial_marker,
    );
}

/// Print [CLOSED] line per D-14
pub fn print_closed(record: &ConnectionRecord) {
    println!(
        "[CLOSED] {} {}:{} -> {}:{} ({}, PID {})",
        match record.key.protocol { Protocol::Tcp => "TCP", Protocol::Udp => "UDP" },
        record.key.local_addr,
        record.key.local_port,
        record.key.remote_addr,
        record.key.remote_port,
        record.process_name,
        record.pid,
    );
}

/// Print all connections for one tick cycle per D-01
pub fn print_tick(active: &[&ConnectionRecord], closed: &[ConnectionRecord]) {
    for record in active {
        print_connection(record);
    }
    for record in closed {
        print_closed(record);
    }
}

fn truncate(s: &str, max: usize) -> String {
    if s.chars().count() <= max {
        s.to_string()
    } else {
        // Count chars, not bytes: byte-index slicing can panic on a
        // non-ASCII process name that splits a UTF-8 boundary.
        let cut: String = s.chars().take(max - 1).collect();
        format!("{}~", cut)
    }
}
```

Note: The exact column widths and format strings may need adjustment after seeing real data. Per D-04, this is throwaway scaffolding -- prioritize getting data visible over pixel-perfect formatting.
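One reason widths drift: Rust's `{:<N}` specifier pads to a *minimum* width but never truncates, so any over-long field shifts every column after it. A quick std-only check of that behavior:

```rust
fn main() {
    // Short content is padded out to the column width...
    assert_eq!(format!("{:<5}|", "TCP"), "TCP  |");
    // ...but long content overflows rather than truncating, which is
    // why print_connection() runs the process name through truncate().
    assert_eq!(format!("{:<5}|", "ESTABLISHED"), "ESTABLISHED|");
}
```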
Verification: `cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop 2>&1 | tail -10`

- tcptop/src/aggregator.rs contains `pub struct ConnectionTable`
- tcptop/src/aggregator.rs contains `pub fn tick(&mut self)` that returns active and closed connections
- tcptop/src/aggregator.rs contains `pub fn update(&mut self, event: CollectorEvent)`
- tcptop/src/aggregator.rs contains `pub fn seed(&mut self, records: Vec<ConnectionRecord>)`
- tcptop/src/aggregator.rs contains `Duration::from_secs(5)` (UDP idle timeout per D-06)
- tcptop/src/aggregator.rs contains `rate_in =` AND `rate_out =` (bandwidth calculation per DATA-06)
- tcptop/src/aggregator.rs contains `is_closed = true` (TCP close detection per D-12)
- tcptop/src/aggregator.rs contains `connections.retain` (UDP expiry)
- tcptop/src/aggregator.rs contains `if pid != 0` (PID=0 enrichment for TcpStateChange per Pitfall 3)
- tcptop/src/output.rs contains `pub fn format_bytes(bytes: u64) -> String`
- tcptop/src/output.rs contains `pub fn format_rate(rate: f64) -> String`
- tcptop/src/output.rs contains `[CLOSED]` (per D-14)
- tcptop/src/output.rs contains `[PARTIAL]` (per D-15)
- tcptop/src/output.rs contains `"UDP"` in state column context (per D-07)
- `cargo check -p tcptop` succeeds

ConnectionTable processes events with bandwidth rate calculation, UDP timeout, and PID=0 enrichment for state change events. The output formatter produces streaming lines with human-readable sizes, [CLOSED] events, and [PARTIAL] markers.
**Task 2: Wire tokio event loop in main.rs and add pipeline unit test**

Files: tcptop/src/main.rs, tcptop/tests/pipeline_test.rs

Context: tcptop/src/main.rs (current state from Plan 01), tcptop/src/collector/mod.rs (NetworkCollector trait, CollectorEvent), tcptop/src/collector/linux.rs (LinuxCollector::new), tcptop/src/aggregator.rs (ConnectionTable API from Task 1), tcptop/src/output.rs (print_header, print_tick from Task 1), tcptop/src/privilege.rs (check_privileges), .planning/phases/01-data-pipeline/01-CONTEXT.md (D-01 streaming output, D-04 throwaway scaffolding)

**Part A: Replace the placeholder main.rs with the full tokio event loop.**
```rust
mod aggregator;
mod collector;
mod model;
mod output;
mod privilege;
mod proc_bootstrap;

use aggregator::ConnectionTable;
use collector::linux::LinuxCollector;
use collector::NetworkCollector;
use anyhow::Result;
use log::info;
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    // Privilege check per D-09, D-10, D-11
    privilege::check_privileges();

    // Create collector and aggregator
    let mut collector = LinuxCollector::new()?;
    let mut table = ConnectionTable::new();

    // Bootstrap pre-existing connections per D-15
    match collector.bootstrap_existing() {
        Ok(existing) => {
            info!("Bootstrapped {} pre-existing connections", existing.len());
            table.seed(existing);
        }
        Err(e) => {
            log::warn!("Failed to bootstrap existing connections: {}", e);
        }
    }

    // Channel for collector -> aggregator communication
    // Buffer size 4096 to avoid blocking the RingBuf reader (per anti-pattern in research)
    let (tx, mut rx) = mpsc::channel(4096);

    // Spawn collector in background task
    let collector_handle = tokio::spawn(async move {
        if let Err(e) = collector.start(tx).await {
            log::error!("Collector error: {}", e);
        }
    });

    // Print header once
    output::print_header();

    // Tick interval for rate calculation and output (1 second)
    let mut tick = interval(Duration::from_secs(1));

    // Signal handling for graceful shutdown
    let mut sigint = tokio::signal::unix::signal(
        tokio::signal::unix::SignalKind::interrupt()
    )?;
    let mut sigterm = tokio::signal::unix::signal(
        tokio::signal::unix::SignalKind::terminate()
    )?;

    loop {
        tokio::select! {
            // Process incoming events from collector
            Some(event) = rx.recv() => {
                table.update(event);
            }
            // Periodic tick: calculate rates and print
            _ = tick.tick() => {
                let (active, closed) = table.tick();
                output::print_tick(&active, &closed);
            }
            // Graceful shutdown on SIGINT or SIGTERM
            _ = sigint.recv() => {
                info!("Received SIGINT, shutting down");
                break;
            }
            _ = sigterm.recv() => {
                info!("Received SIGTERM, shutting down");
                break;
            }
        }
    }

    // Cleanup: collector handle will be dropped, which drops the Ebpf struct
    // and detaches all eBPF programs
    collector_handle.abort();
    Ok(())
}
```

**Key behaviors:**
- Privilege check runs FIRST, before any async setup
- Bootstrap pre-existing connections BEFORE starting collector
- mpsc channel buffer = 4096 (decouples RingBuf reader from output per anti-pattern guidance)
- `tokio::select!` multiplexes event processing, periodic tick, and signal handling
- 1-second tick interval for rate calculation and output refresh
- SIGINT and SIGTERM handled for graceful shutdown (detaches eBPF programs)
- Output is streaming lines per D-01: no screen clearing, no cursor manipulation
- Header printed once at startup

**Note on signal handling:** The code uses `tokio::signal` instead of the `signal-hook` crate for simplicity since we're already in a tokio context. If `signal-hook` is preferred (it's in Cargo.toml), the executor may swap -- the behavior is identical.

Ensure all `mod` declarations match the actual file structure. The module tree should be:
- `main.rs` -> `mod aggregator`, `mod collector`, `mod model`, `mod output`, `mod privilege`, `mod proc_bootstrap`
- `collector/mod.rs` -> `pub mod linux`

**Part B: Create a unit test that proves the data pipeline works end-to-end without eBPF/root.**

Create `tcptop/tests/pipeline_test.rs` -- an integration test that feeds synthetic `CollectorEvent`s through `ConnectionTable.update()` then calls `tick()` and verifies the output.

```rust
//! Integration test: proves CollectorEvent -> ConnectionTable -> output pipeline
//! works without requiring eBPF, root, or Linux.

use std::net::{IpAddr, Ipv4Addr};

// These imports reference the tcptop crate's public modules.
// If modules are not public, make aggregator, collector (CollectorEvent), model,
// and output pub(crate) or add a test helper. Adjust as needed.

#[test]
fn test_pipeline_tcp_send_updates_table() {
    // Create a ConnectionTable
    let mut table = tcptop::aggregator::ConnectionTable::new();

    // Synthesize a TcpSend event
    let key = tcptop::model::ConnectionKey {
        protocol: tcptop::model::Protocol::Tcp,
        local_addr: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
        local_port: 12345,
        remote_addr: IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)),
        remote_port: 443,
    };
    let event = tcptop::collector::CollectorEvent::TcpSend {
        key: key.clone(),
        pid: 1234,
        comm: "curl".to_string(),
        bytes: 1500,
        srtt_us: 5000,
    };

    // Feed event into aggregator
    table.update(event);

    // Tick to calculate rates
    let (active, closed) = table.tick();

    // Verify: one active connection, no closed
    assert_eq!(active.len(), 1);
    assert_eq!(closed.len(), 0);

    // Verify connection has correct data
    let record = active[0];
    assert_eq!(record.pid, 1234);
    assert_eq!(record.bytes_out, 1500);
    assert_eq!(record.packets_out, 1);
    assert_eq!(record.rtt_us, Some(5000));
    assert!(!record.is_closed);
    assert!(!record.is_partial);
}

#[test]
fn test_pipeline_tcp_close_lifecycle() {
    let mut table = tcptop::aggregator::ConnectionTable::new();

    let key = tcptop::model::ConnectionKey {
        protocol: tcptop::model::Protocol::Tcp,
        local_addr: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
        local_port: 12345,
        remote_addr: IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)),
        remote_port: 443,
    };

    // First: a data event creates the connection
    table.update(tcptop::collector::CollectorEvent::TcpSend {
        key: key.clone(), pid: 1234, comm: "curl".to_string(), bytes: 100, srtt_us: 0,
    });

    // Then: a state change to CLOSE (state 7)
    table.update(tcptop::collector::CollectorEvent::TcpStateChange {
        key: key.clone(), pid: 0, old_state: 1, new_state: 7,
    });

    // Tick: should return the closed connection in closed list
    let (active, closed) = table.tick();
    assert_eq!(active.len(), 0, "closed connection should not be in active list");
    assert_eq!(closed.len(), 1, "closed connection should appear in closed list");
    assert_eq!(closed[0].pid, 1234, "PID should be inherited from earlier event, not 0");
}

#[test]
fn test_output_format_bytes() {
    assert_eq!(tcptop::output::format_bytes(500), "500B");
    assert!(tcptop::output::format_bytes(2048).contains("K"));
    assert!(tcptop::output::format_bytes(2_000_000).contains("M"));
}

#[test]
fn test_output_format_rate() {
    assert_eq!(tcptop::output::format_rate(0.0), "0B/s");
    assert!(tcptop::output::format_rate(2048.0).contains("KB/s"));
}
```

**IMPORTANT:** The test imports assume `aggregator`, `collector`, `model`, and `output` are accessible from integration tests. If the crate's modules are private (`mod` not `pub mod`), the executor must either:
(a) Make the relevant modules `pub` in main.rs (preferred for a binary crate with tests), or
(b) Convert to unit tests inside `aggregator.rs` using `#[cfg(test)] mod tests { ... }`.
The key requirement is that the tests exist and pass, proving the data flow works without eBPF/root.
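Option (b) would take roughly the following shape. This is a hypothetical layout sketch with a stand-in helper, not the real `format_bytes` from output.rs:

```rust
// Hypothetical shape of option (b): unit tests colocated with the code
// under test, compiled only by `cargo test` via #[cfg(test)].
pub fn format_bytes(bytes: u64) -> String {
    if bytes < 1024 {
        format!("{}B", bytes)
    } else {
        format!("{} ({:.1}K)", bytes, bytes as f64 / 1024.0)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn raw_bytes_under_1k() {
        assert_eq!(format_bytes(500), "500B");
    }

    #[test]
    fn kib_shows_raw_and_human() {
        assert_eq!(format_bytes(2048), "2048 (2.0K)");
    }
}

fn main() {
    // main() only makes this sketch runnable standalone; it is not part
    // of the suggested layout.
    assert_eq!(format_bytes(2048), "2048 (2.0K)");
}
```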
Verification: `cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop 2>&1 | tail -5 && cargo test -p tcptop --test pipeline_test 2>&1 | tail -20`

- tcptop/src/main.rs contains `#[tokio::main]`
- tcptop/src/main.rs contains `privilege::check_privileges()` BEFORE any async work
- tcptop/src/main.rs contains `LinuxCollector::new()`
- tcptop/src/main.rs contains `ConnectionTable::new()`
- tcptop/src/main.rs contains `collector.bootstrap_existing()`
- tcptop/src/main.rs contains `table.seed(`
- tcptop/src/main.rs contains `mpsc::channel(` with buffer size >= 1024
- tcptop/src/main.rs contains `collector.start(tx)` inside a tokio::spawn
- tcptop/src/main.rs contains `table.update(event)` (processing events from channel)
- tcptop/src/main.rs contains `table.tick()` (periodic rate calculation)
- tcptop/src/main.rs contains `output::print_tick(` (formatted output)
- tcptop/src/main.rs contains `output::print_header()` (header printed once)
- tcptop/src/main.rs contains `tokio::select!` (event loop multiplexing)
- tcptop/src/main.rs contains `SignalKind::interrupt` OR `signal::ctrl_c` (graceful shutdown)
- tcptop/src/main.rs contains `mod aggregator` AND `mod collector` AND `mod model` AND `mod output` AND `mod privilege` AND `mod proc_bootstrap`
- tcptop/tests/pipeline_test.rs exists with at least 3 tests
- `cargo check -p tcptop` succeeds
- `cargo test -p tcptop --test pipeline_test` passes (all tests green)

The main event loop wires collector, aggregator, and output together with tokio::select! multiplexing. Pipeline unit tests prove data flow from synthetic CollectorEvents through ConnectionTable to the output formatters -- verifiable without eBPF, root, or Linux. The privilege check runs first, pre-existing connections are bootstrapped, and events flow from eBPF through the channel to the aggregator to stdout, with graceful shutdown on SIGINT/SIGTERM.

1. `cargo check -p tcptop` exits 0 (full pipeline compiles)
2. `cargo test -p tcptop --test pipeline_test` exits 0 (pipeline data flow proven)
3. `grep "tokio::main" tcptop/src/main.rs` returns a match
4. `grep "ConnectionTable" tcptop/src/aggregator.rs` returns a match
5. `grep "format_bytes" tcptop/src/output.rs` returns a match
6. `grep "CLOSED" tcptop/src/output.rs` returns a match
7. `grep "select!" tcptop/src/main.rs` returns a match
8. `grep "check_privileges" tcptop/src/main.rs` returns a match
9. `grep "pid != 0" tcptop/src/aggregator.rs` returns a match (PID=0 enrichment)

<success_criteria>

  • ConnectionTable processes all CollectorEvent variants and calculates bandwidth rates
  • PID=0 enrichment: TcpStateChange with pid=0 inherits PID from existing connection record
  • UDP flows expire after 5 seconds of idle per D-06
  • Closed TCP connections produce [CLOSED] line per D-14 and are removed
  • Pre-existing connections marked [PARTIAL] per D-15
  • Human-readable byte formatting per D-02 (e.g., "1258291 (1.2M)")
  • Output is streaming lines per D-01 (no cursor manipulation)
  • Main event loop multiplexes events, periodic output, and signal handling
  • Full pipeline compiles: privilege -> bootstrap -> collect -> aggregate -> output
  • Pipeline unit tests pass, proving data flow without eBPF/root </success_criteria>
After completion, create `.planning/phases/01-data-pipeline/01-03-SUMMARY.md`