Zachary D. Rowitsch 38e6dcc34a chore: archive v1.0 phase directories to milestones/v1.0-phases/
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 01:33:15 -04:00
---
phase: 01-data-pipeline
plan: 03
type: execute
wave: 3
depends_on: ["01-01", "01-02"]
files_modified:
- tcptop/src/aggregator.rs
- tcptop/src/output.rs
- tcptop/src/main.rs
- tcptop/tests/pipeline_test.rs
autonomous: true
requirements:
- DATA-06
- DATA-07
must_haves:
  truths:
    - "Running tcptop on Linux with root prints per-connection data as streaming lines to stdout"
    - "Each line shows protocol, local/remote addr+port, PID, process name, TCP state, bytes in/out, packets in/out, RTT, bandwidth rate"
    - "UDP flows are tracked by 4-tuple and expire after 5 seconds of idle"
    - "Bandwidth rates (KB/s, MB/s) are calculated per connection per tick interval"
    - "Closed TCP connections print a [CLOSED] line and are removed after one tick"
    - "Pre-existing connections show [PARTIAL] marker"
    - "Human-readable sizes are used (1.2 MB, 340 KB/s)"
  artifacts:
    - path: "tcptop/src/aggregator.rs"
      provides: "ConnectionTable with tick-based rate calculation and UDP timeout"
      contains: "struct ConnectionTable"
    - path: "tcptop/src/output.rs"
      provides: "Streaming stdout formatter with human-readable sizes"
      contains: "fn format_bytes"
    - path: "tcptop/src/main.rs"
      provides: "Tokio event loop wiring collector, aggregator, and output"
      contains: "#[tokio::main]"
    - path: "tcptop/tests/pipeline_test.rs"
      provides: "Unit test proving data flows from CollectorEvent through aggregator to output"
      contains: "fn test_pipeline"
  key_links:
    - from: "tcptop/src/main.rs"
      to: "tcptop/src/collector/linux.rs"
      via: "creates LinuxCollector and calls start()"
      pattern: "LinuxCollector"
    - from: "tcptop/src/main.rs"
      to: "tcptop/src/aggregator.rs"
      via: "receives CollectorEvents and updates ConnectionTable"
      pattern: "ConnectionTable"
    - from: "tcptop/src/aggregator.rs"
      to: "tcptop/src/output.rs"
      via: "passes connection records for formatting"
      pattern: "format_connections"
---
<objective>
Implement the connection aggregator (bandwidth calculation, UDP timeout, lifecycle management), the streaming stdout output formatter, and wire everything together in main.rs with a tokio event loop.
Purpose: Complete the proof-of-life data pipeline -- from kernel events through aggregation to human-readable streaming output. This plan delivers the Phase 1 success criteria: running `tcptop` prints real per-connection data.
Output: Working end-to-end pipeline from eBPF events to formatted stdout lines with bandwidth rates, UDP flow management, and connection lifecycle handling.
</objective>
<execution_context>
@/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/workflows/execute-plan.md
@/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-data-pipeline/01-CONTEXT.md
@.planning/phases/01-data-pipeline/01-RESEARCH.md
@.planning/phases/01-data-pipeline/01-01-SUMMARY.md
@.planning/phases/01-data-pipeline/01-02-SUMMARY.md
<interfaces>
<!-- From Plan 01 and Plan 02 -->
From tcptop/src/model.rs:
- ConnectionKey { protocol: Protocol, local_addr: IpAddr, local_port: u16, remote_addr: IpAddr, remote_port: u16 }
- ConnectionRecord { key, pid, process_name, tcp_state: Option<TcpState>, bytes_in/out: u64, packets_in/out: u64, rate_in/out: f64, prev_bytes_in/out: u64, rtt_us: Option<u32>, last_seen: Instant, is_partial: bool, is_closed: bool }
- Protocol { Tcp, Udp }
- TcpState with as_str() -> &str and from_kernel(u32)
From tcptop/src/collector/mod.rs:
- enum CollectorEvent { TcpSend{key, pid, comm, bytes, srtt_us}, TcpRecv{..}, UdpSend{key, pid, comm, bytes}, UdpRecv{..}, TcpStateChange{key, pid, old_state, new_state} }
- trait NetworkCollector { start(tx), stop(), bootstrap_existing() }
From tcptop/src/collector/linux.rs:
- LinuxCollector::new() -> Result<Self>
</interfaces>
</context>
<tasks>
<task type="auto">
<name>Task 1: Implement aggregator and output formatter</name>
<files>
tcptop/src/aggregator.rs,
tcptop/src/output.rs
</files>
<read_first>
tcptop/src/model.rs (ConnectionRecord, ConnectionKey, TcpState -- exact field names and types),
tcptop/src/collector/mod.rs (CollectorEvent enum variants -- exact field names),
.planning/phases/01-data-pipeline/01-RESEARCH.md (Pattern 3: Connection Table code, output format example),
.planning/phases/01-data-pipeline/01-CONTEXT.md (D-01 through D-08, D-12, D-14, D-15 for output behavior)
</read_first>
<action>
**tcptop/src/aggregator.rs:**
Implement ConnectionTable that processes CollectorEvents and maintains per-connection state.
```rust
use crate::collector::CollectorEvent;
use crate::model::{ConnectionKey, ConnectionRecord, Protocol, TcpState};
use std::collections::HashMap;
use std::time::{Duration, Instant};

const UDP_IDLE_TIMEOUT: Duration = Duration::from_secs(5); // per D-06

pub struct ConnectionTable {
    connections: HashMap<ConnectionKey, ConnectionRecord>,
    last_tick: Instant,
}

impl ConnectionTable {
    pub fn new() -> Self {
        Self {
            connections: HashMap::new(),
            last_tick: Instant::now(),
        }
    }

    /// Seed with pre-existing connections from the /proc bootstrap
    pub fn seed(&mut self, records: Vec<ConnectionRecord>) {
        for record in records {
            self.connections.insert(record.key.clone(), record);
        }
    }

    /// Process a single collector event, updating the connection table
    pub fn update(&mut self, event: CollectorEvent) {
        match event {
            CollectorEvent::TcpSend { key, pid, comm, bytes, srtt_us } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Tcp);
                record.bytes_out += bytes as u64; // per DATA-01
                record.packets_out += 1; // per DATA-02
                if srtt_us > 0 {
                    record.rtt_us = Some(srtt_us); // per DATA-05 (already >>3 in collector)
                }
                record.last_seen = Instant::now();
            }
            CollectorEvent::TcpRecv { key, pid, comm, bytes, srtt_us } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Tcp);
                record.bytes_in += bytes as u64; // per DATA-01
                record.packets_in += 1; // per DATA-02
                if srtt_us > 0 {
                    record.rtt_us = Some(srtt_us);
                }
                record.last_seen = Instant::now();
            }
            CollectorEvent::UdpSend { key, pid, comm, bytes } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Udp);
                record.bytes_out += bytes as u64;
                record.packets_out += 1;
                record.last_seen = Instant::now();
            }
            CollectorEvent::UdpRecv { key, pid, comm, bytes } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Udp);
                record.bytes_in += bytes as u64;
                record.packets_in += 1;
                record.last_seen = Instant::now();
            }
            CollectorEvent::TcpStateChange { key, pid, old_state: _, new_state } => {
                if let Some(record) = self.connections.get_mut(&key) {
                    record.tcp_state = TcpState::from_kernel(new_state);
                    // PID=0 enrichment (Pitfall 3 from Plan 02): inet_sock_set_state
                    // fires in softirq context where PID=0. Inherit the PID from the
                    // existing record, which was populated by earlier kprobe events.
                    if pid != 0 {
                        record.pid = pid;
                    }
                    // per D-12: mark as closed; removed on the next tick
                    if new_state == 7 { // TCP_CLOSE
                        record.is_closed = true;
                    }
                }
                // No existing record for this key means the state change is for a
                // connection we have no data events for -- ignore it (no PID to attribute)
            }
        }
    }

    /// Called periodically to calculate rates and expire flows.
    /// Returns (active connections, newly closed connections for [CLOSED] output).
    pub fn tick(&mut self) -> (Vec<&ConnectionRecord>, Vec<ConnectionRecord>) {
        let now = Instant::now();
        let dt = now.duration_since(self.last_tick).as_secs_f64();
        if dt <= 0.0 {
            return (self.connections.values().collect(), Vec::new());
        }
        // Calculate bandwidth rates per DATA-06
        for record in self.connections.values_mut() {
            record.rate_in = (record.bytes_in.saturating_sub(record.prev_bytes_in)) as f64 / dt;
            record.rate_out = (record.bytes_out.saturating_sub(record.prev_bytes_out)) as f64 / dt;
            record.prev_bytes_in = record.bytes_in;
            record.prev_bytes_out = record.bytes_out;
        }
        // Collect closed TCP connections for [CLOSED] output per D-14
        let mut closed = Vec::new();
        let mut to_remove = Vec::new();
        for (key, record) in &self.connections {
            if record.is_closed {
                closed.push(record.clone());
                to_remove.push(key.clone());
            }
        }
        for key in to_remove {
            self.connections.remove(&key);
        }
        // Expire UDP flows idle longer than 5 seconds per D-06
        self.connections.retain(|_key, record| {
            if record.key.protocol == Protocol::Udp {
                record.last_seen.elapsed() < UDP_IDLE_TIMEOUT
            } else {
                true
            }
        });
        self.last_tick = now;
        (self.connections.values().collect(), closed)
    }

    fn get_or_create(&mut self, key: &ConnectionKey, pid: u32, comm: &str, protocol: Protocol) -> &mut ConnectionRecord {
        self.connections.entry(key.clone()).or_insert_with(|| ConnectionRecord {
            key: key.clone(),
            pid,
            process_name: comm.to_string(),
            tcp_state: if protocol == Protocol::Tcp { Some(TcpState::Established) } else { None },
            bytes_in: 0,
            bytes_out: 0,
            packets_in: 0,
            packets_out: 0,
            rate_in: 0.0,
            rate_out: 0.0,
            prev_bytes_in: 0,
            prev_bytes_out: 0,
            rtt_us: None,
            last_seen: Instant::now(),
            is_partial: false,
            is_closed: false,
        })
    }

    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
}
```
**tcptop/src/output.rs (per D-01, D-02, D-03, D-04, D-14):**
Streaming stdout formatter. Each tick prints all active connections, one per line. Closed connections get a `[CLOSED]` line.
```rust
use crate::model::{ConnectionRecord, Protocol};

/// Format bytes into a human-readable string per D-02.
/// Shows the raw value alongside the scaled one: "1258291 (1.2M)".
/// Below 1024, just the raw count: "340B".
pub fn format_bytes(bytes: u64) -> String {
    if bytes < 1024 {
        format!("{}B", bytes)
    } else if bytes < 1024 * 1024 {
        format!("{} ({:.1}K)", bytes, bytes as f64 / 1024.0)
    } else if bytes < 1024 * 1024 * 1024 {
        format!("{} ({:.1}M)", bytes, bytes as f64 / (1024.0 * 1024.0))
    } else {
        format!("{} ({:.1}G)", bytes, bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    }
}

/// Format a rate (bytes/sec) into a human-readable string
pub fn format_rate(rate: f64) -> String {
    if rate < 1.0 {
        "0B/s".to_string()
    } else if rate < 1024.0 {
        format!("{:.0}B/s", rate)
    } else if rate < 1024.0 * 1024.0 {
        format!("{:.1}KB/s", rate / 1024.0)
    } else {
        format!("{:.1}MB/s", rate / (1024.0 * 1024.0))
    }
}

/// Format RTT in microseconds to human-readable
pub fn format_rtt(rtt_us: Option<u32>) -> String {
    match rtt_us {
        None => "-".to_string(),
        // no float precision needed for the integer microsecond case
        Some(us) if us < 1000 => format!("{}us", us),
        Some(us) if us < 1_000_000 => format!("{:.1}ms", us as f64 / 1000.0),
        Some(us) => format!("{:.1}s", us as f64 / 1_000_000.0),
    }
}

/// Print the header line (called once at startup)
pub fn print_header() {
    println!(
        "{:<5} {:<22} {:<22} {:<7} {:<12} {:<12} {:<14} {:<14} {:<8} {:<8} {:<8} {:<10} {:<10}",
        "PROTO", "LOCAL", "REMOTE", "PID", "PROCESS", "STATE",
        "BYTES_IN", "BYTES_OUT", "PKTS_IN", "PKTS_OUT", "RTT",
        "RATE_IN", "RATE_OUT"
    );
}

/// Print one connection line per D-03
pub fn print_connection(record: &ConnectionRecord) {
    let proto = match record.key.protocol {
        Protocol::Tcp => "TCP",
        Protocol::Udp => "UDP",
    };
    let local = format!("{}:{}", record.key.local_addr, record.key.local_port);
    let remote = format!("{}:{}", record.key.remote_addr, record.key.remote_port);
    let state = match &record.tcp_state {
        Some(s) => s.as_str(),
        None => "UDP", // per D-07: show "UDP" for UDP flows
    };
    let partial_marker = if record.is_partial { " [PARTIAL]" } else { "" };
    println!(
        "{:<5} {:<22} {:<22} {:<7} {:<12} {:<12} {:<14} {:<14} {:<8} {:<8} {:<8} {:<10} {:<10}{}",
        proto,
        local,
        remote,
        record.pid,
        truncate(&record.process_name, 11),
        state,
        format_bytes(record.bytes_in),
        format_bytes(record.bytes_out),
        record.packets_in,
        record.packets_out,
        format_rtt(record.rtt_us),
        format_rate(record.rate_in),
        format_rate(record.rate_out),
        partial_marker,
    );
}

/// Print a [CLOSED] line per D-14
pub fn print_closed(record: &ConnectionRecord) {
    println!(
        "[CLOSED] {} {}:{} -> {}:{} ({}, PID {})",
        match record.key.protocol { Protocol::Tcp => "TCP", Protocol::Udp => "UDP" },
        record.key.local_addr,
        record.key.local_port,
        record.key.remote_addr,
        record.key.remote_port,
        record.process_name,
        record.pid,
    );
}

/// Print all connections for one tick cycle per D-01
pub fn print_tick(active: &[&ConnectionRecord], closed: &[ConnectionRecord]) {
    for record in active {
        print_connection(record);
    }
    for record in closed {
        print_closed(record);
    }
}

fn truncate(s: &str, max: usize) -> String {
    // char-based so a multibyte process name can't panic on a byte boundary
    if s.chars().count() <= max {
        s.to_string()
    } else {
        let cut: String = s.chars().take(max - 1).collect();
        format!("{}~", cut)
    }
}
```
Note: The exact column widths and format strings may need adjustment after seeing real data. Per D-04, this is throwaway scaffolding -- prioritize getting data visible over pixel-perfect formatting.
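As a quick sanity check on the tick math above, the rate formula from `ConnectionTable::tick` can be exercised standalone (hypothetical numbers; the real code derives `dt` from `Instant`):

```rust
// Sketch of the per-tick rate calculation (DATA-06): delta bytes / delta time.
fn rate(bytes_now: u64, bytes_prev: u64, dt_secs: f64) -> f64 {
    // saturating_sub clamps to 0 instead of underflowing if a counter resets
    bytes_now.saturating_sub(bytes_prev) as f64 / dt_secs
}

fn main() {
    // 1 MiB transferred during a 1-second tick -> 1048576 B/s (1.0 MB/s)
    assert_eq!(rate(2 * 1024 * 1024, 1024 * 1024, 1.0), 1024.0 * 1024.0);
    // Counter reset: prev > now clamps to a zero rate rather than wrapping
    assert_eq!(rate(100, 200, 1.0), 0.0);
    println!("ok");
}
```

The same delta then feeds `format_rate`, so the displayed KB/s / MB/s values are always per-tick averages, not instantaneous rates.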
</action>
<verify>
<automated>cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop 2>&1 | tail -10</automated>
</verify>
<acceptance_criteria>
- tcptop/src/aggregator.rs contains `pub struct ConnectionTable`
- tcptop/src/aggregator.rs contains `pub fn tick(&mut self)` that returns active and closed connections
- tcptop/src/aggregator.rs contains `pub fn update(&mut self, event: CollectorEvent)`
- tcptop/src/aggregator.rs contains `pub fn seed(&mut self, records: Vec<ConnectionRecord>)`
- tcptop/src/aggregator.rs contains `Duration::from_secs(5)` (UDP idle timeout per D-06)
- tcptop/src/aggregator.rs contains `rate_in =` AND `rate_out =` (bandwidth calculation per DATA-06)
- tcptop/src/aggregator.rs contains `is_closed = true` (TCP close detection per D-12)
- tcptop/src/aggregator.rs contains `connections.retain` (UDP expiry)
- tcptop/src/aggregator.rs contains `if pid != 0` (PID=0 enrichment for TcpStateChange per Pitfall 3)
- tcptop/src/output.rs contains `pub fn format_bytes(bytes: u64) -> String`
- tcptop/src/output.rs contains `pub fn format_rate(rate: f64) -> String`
- tcptop/src/output.rs contains `[CLOSED]` (per D-14)
- tcptop/src/output.rs contains `[PARTIAL]` (per D-15)
- tcptop/src/output.rs contains `"UDP"` in state column context (per D-07)
- `cargo check -p tcptop` succeeds
</acceptance_criteria>
<done>ConnectionTable processes events with bandwidth rate calculation, UDP timeout, and PID=0 enrichment for state change events. Output formatter produces streaming lines with human-readable sizes, [CLOSED] events, and [PARTIAL] markers.</done>
</task>
<task type="auto">
<name>Task 2: Wire tokio event loop in main.rs and add pipeline unit test</name>
<files>
tcptop/src/main.rs,
tcptop/tests/pipeline_test.rs
</files>
<read_first>
tcptop/src/main.rs (current state from Plan 01),
tcptop/src/collector/mod.rs (NetworkCollector trait, CollectorEvent),
tcptop/src/collector/linux.rs (LinuxCollector::new),
tcptop/src/aggregator.rs (ConnectionTable API from Task 1),
tcptop/src/output.rs (print_header, print_tick from Task 1),
tcptop/src/privilege.rs (check_privileges),
.planning/phases/01-data-pipeline/01-CONTEXT.md (D-01 streaming output, D-04 throwaway scaffolding)
</read_first>
<action>
**Part A: Replace the placeholder main.rs with the full tokio event loop.**
```rust
mod aggregator;
mod collector;
mod model;
mod output;
mod privilege;
mod proc_bootstrap;

use aggregator::ConnectionTable;
use collector::linux::LinuxCollector;
use collector::NetworkCollector;

use anyhow::Result;
use log::info;
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    // Privilege check per D-09, D-10, D-11
    privilege::check_privileges();

    // Create collector and aggregator
    let mut collector = LinuxCollector::new()?;
    let mut table = ConnectionTable::new();

    // Bootstrap pre-existing connections per D-15
    match collector.bootstrap_existing() {
        Ok(existing) => {
            info!("Bootstrapped {} pre-existing connections", existing.len());
            table.seed(existing);
        }
        Err(e) => {
            log::warn!("Failed to bootstrap existing connections: {}", e);
        }
    }

    // Channel for collector -> aggregator communication.
    // Buffer size 4096 to avoid blocking the RingBuf reader (per anti-pattern in research)
    let (tx, mut rx) = mpsc::channel(4096);

    // Spawn the collector in a background task
    let collector_handle = tokio::spawn(async move {
        if let Err(e) = collector.start(tx).await {
            log::error!("Collector error: {}", e);
        }
    });

    // Print the header once
    output::print_header();

    // Tick interval for rate calculation and output (1 second)
    let mut tick = interval(Duration::from_secs(1));

    // Signal handling for graceful shutdown
    let mut sigint = tokio::signal::unix::signal(
        tokio::signal::unix::SignalKind::interrupt()
    )?;
    let mut sigterm = tokio::signal::unix::signal(
        tokio::signal::unix::SignalKind::terminate()
    )?;

    loop {
        tokio::select! {
            // Process incoming events from the collector
            Some(event) = rx.recv() => {
                table.update(event);
            }
            // Periodic tick: calculate rates and print
            _ = tick.tick() => {
                let (active, closed) = table.tick();
                output::print_tick(&active, &closed);
            }
            // Graceful shutdown on SIGINT or SIGTERM
            _ = sigint.recv() => {
                info!("Received SIGINT, shutting down");
                break;
            }
            _ = sigterm.recv() => {
                info!("Received SIGTERM, shutting down");
                break;
            }
        }
    }

    // Cleanup: aborting the collector task drops the Ebpf struct,
    // which detaches all eBPF programs
    collector_handle.abort();
    Ok(())
}
```
**Key behaviors:**
- Privilege check runs FIRST, before any async setup
- Bootstrap pre-existing connections BEFORE starting collector
- mpsc channel buffer = 4096 (decouples RingBuf reader from output per anti-pattern guidance)
- `tokio::select!` multiplexes event processing, periodic tick, and signal handling
- 1-second tick interval for rate calculation and output refresh
- SIGINT and SIGTERM handled for graceful shutdown (detaches eBPF programs)
- Output is streaming lines per D-01: no screen clearing, no cursor manipulation
- Header printed once at startup
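The bounded-buffer rationale can be illustrated with std's `sync_channel` as a stand-in for `tokio::sync::mpsc` (a hypothetical capacity of 2 instead of 4096; the async channel behaves the same way at capacity):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Bounded channel: once the consumer lags by `capacity` messages,
    // sends stop succeeding -- which is why the real buffer is sized
    // generously (4096) to keep the RingBuf reader from stalling.
    let (tx, rx) = sync_channel::<u32>(2);
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    // Third send fails: the buffer is full until the consumer drains it.
    assert!(matches!(tx.try_send(3), Err(TrySendError::Full(3))));
    // Draining one slot frees capacity again.
    assert_eq!(rx.recv().unwrap(), 1);
    tx.try_send(3).unwrap();
}
```

The same pressure exists in the tokio channel: an undersized buffer would make the collector task await on `send`, backing up into the RingBuf reader.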
**Note on signal handling:** The code uses `tokio::signal` instead of the `signal-hook` crate for simplicity since we're already in a tokio context. If `signal-hook` is preferred (it's in Cargo.toml), the executor may swap -- the behavior is identical.
Ensure all `mod` declarations match the actual file structure. The module tree should be:
- `main.rs` -> `mod aggregator`, `mod collector`, `mod model`, `mod output`, `mod privilege`, `mod proc_bootstrap`
- `collector/mod.rs` -> `pub mod linux`
**Part B: Create a unit test that proves the data pipeline works end-to-end without eBPF/root.**
Create `tcptop/tests/pipeline_test.rs` -- an integration test that feeds synthetic `CollectorEvent`s through `ConnectionTable.update()` then calls `tick()` and verifies the output.
```rust
//! Integration test: proves the CollectorEvent -> ConnectionTable -> output
//! pipeline works without requiring eBPF, root, or Linux.

use std::net::{IpAddr, Ipv4Addr};

// These imports reference the tcptop crate's public modules. Integration tests
// link against the crate's library target, so if no lib target exists yet,
// expose the modules through one (see the note below) or convert to unit tests.

#[test]
fn test_pipeline_tcp_send_updates_table() {
    // Create a ConnectionTable
    let mut table = tcptop::aggregator::ConnectionTable::new();

    // Synthesize a TcpSend event
    let key = tcptop::model::ConnectionKey {
        protocol: tcptop::model::Protocol::Tcp,
        local_addr: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
        local_port: 12345,
        remote_addr: IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)),
        remote_port: 443,
    };
    let event = tcptop::collector::CollectorEvent::TcpSend {
        key: key.clone(),
        pid: 1234,
        comm: "curl".to_string(),
        bytes: 1500,
        srtt_us: 5000,
    };

    // Feed the event into the aggregator
    table.update(event);

    // Tick to calculate rates
    let (active, closed) = table.tick();

    // Verify: one active connection, none closed
    assert_eq!(active.len(), 1);
    assert_eq!(closed.len(), 0);

    // Verify the connection carries the correct data
    let record = active[0];
    assert_eq!(record.pid, 1234);
    assert_eq!(record.bytes_out, 1500);
    assert_eq!(record.packets_out, 1);
    assert_eq!(record.rtt_us, Some(5000));
    assert!(!record.is_closed);
    assert!(!record.is_partial);
}

#[test]
fn test_pipeline_tcp_close_lifecycle() {
    let mut table = tcptop::aggregator::ConnectionTable::new();
    let key = tcptop::model::ConnectionKey {
        protocol: tcptop::model::Protocol::Tcp,
        local_addr: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
        local_port: 12345,
        remote_addr: IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)),
        remote_port: 443,
    };

    // First: a data event creates the connection
    table.update(tcptop::collector::CollectorEvent::TcpSend {
        key: key.clone(), pid: 1234, comm: "curl".to_string(), bytes: 100, srtt_us: 0,
    });
    // Then: a state change to CLOSE (state 7)
    table.update(tcptop::collector::CollectorEvent::TcpStateChange {
        key: key.clone(), pid: 0, old_state: 1, new_state: 7,
    });

    // Tick: the closed connection should appear in the closed list
    let (active, closed) = table.tick();
    assert_eq!(active.len(), 0, "closed connection should not be in active list");
    assert_eq!(closed.len(), 1, "closed connection should appear in closed list");
    assert_eq!(closed[0].pid, 1234, "PID should be inherited from earlier event, not 0");
}

#[test]
fn test_output_format_bytes() {
    assert_eq!(tcptop::output::format_bytes(500), "500B");
    assert!(tcptop::output::format_bytes(2048).contains("K"));
    assert!(tcptop::output::format_bytes(2_000_000).contains("M"));
}

#[test]
fn test_output_format_rate() {
    assert_eq!(tcptop::output::format_rate(0.0), "0B/s");
    assert!(tcptop::output::format_rate(2048.0).contains("KB/s"));
}
```
**IMPORTANT:** The test imports assume `aggregator`, `collector`, `model`, and `output` are reachable from integration tests. Cargo links `tests/*.rs` against the crate's *library* target only, so marking modules `pub` in `main.rs` is not sufficient for a binary-only crate. The executor must either:
(a) Add a `src/lib.rs` that declares the modules with `pub mod`, with `main.rs` consuming them via `use tcptop::...` (preferred), or
(b) Convert to unit tests inside `aggregator.rs` using `#[cfg(test)] mod tests { ... }`.
The key requirement is that the tests exist and pass, proving the data flow works without eBPF/root.
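If the lib-target route is taken, a minimal sketch of the crate plumbing (module names assumed from the layout described in this plan) is:

```rust
// tcptop/src/lib.rs (sketch) -- lets tests/ do `use tcptop::aggregator::...`
pub mod aggregator;
pub mod collector;
pub mod model;
pub mod output;
pub mod privilege;
pub mod proc_bootstrap;
```

`main.rs` then drops its `mod` declarations and imports everything through the `tcptop` crate name instead.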
</action>
<verify>
<automated>cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop 2>&1 | tail -5 && cargo test -p tcptop --test pipeline_test 2>&1 | tail -20</automated>
</verify>
<acceptance_criteria>
- tcptop/src/main.rs contains `#[tokio::main]`
- tcptop/src/main.rs contains `privilege::check_privileges()` BEFORE any async work
- tcptop/src/main.rs contains `LinuxCollector::new()`
- tcptop/src/main.rs contains `ConnectionTable::new()`
- tcptop/src/main.rs contains `collector.bootstrap_existing()`
- tcptop/src/main.rs contains `table.seed(`
- tcptop/src/main.rs contains `mpsc::channel(` with buffer size >= 1024
- tcptop/src/main.rs contains `collector.start(tx)` inside a tokio::spawn
- tcptop/src/main.rs contains `table.update(event)` (processing events from channel)
- tcptop/src/main.rs contains `table.tick()` (periodic rate calculation)
- tcptop/src/main.rs contains `output::print_tick(` (formatted output)
- tcptop/src/main.rs contains `output::print_header()` (header printed once)
- tcptop/src/main.rs contains `tokio::select!` (event loop multiplexing)
- tcptop/src/main.rs contains `SignalKind::interrupt` OR `signal::ctrl_c` (graceful shutdown)
- tcptop/src/main.rs contains `mod aggregator` AND `mod collector` AND `mod model` AND `mod output` AND `mod privilege` AND `mod proc_bootstrap`
- tcptop/tests/pipeline_test.rs exists with at least 3 tests
- `cargo check -p tcptop` succeeds
- `cargo test -p tcptop --test pipeline_test` passes (all tests green)
</acceptance_criteria>
<done>Main event loop wires collector, aggregator, and output together with tokio::select! multiplexing. Pipeline unit tests prove data flow from synthetic CollectorEvents through ConnectionTable to output formatters -- verifiable without eBPF, root, or Linux. Privilege check runs first, pre-existing connections bootstrapped, events flow from eBPF through channel to aggregator to stdout. Graceful shutdown on SIGINT/SIGTERM.</done>
</task>
</tasks>
<verification>
1. `cargo check -p tcptop` exits 0 (full pipeline compiles)
2. `cargo test -p tcptop --test pipeline_test` exits 0 (pipeline data flow proven)
3. `grep "tokio::main" tcptop/src/main.rs` returns a match
4. `grep "ConnectionTable" tcptop/src/aggregator.rs` returns a match
5. `grep "format_bytes" tcptop/src/output.rs` returns a match
6. `grep "CLOSED" tcptop/src/output.rs` returns a match
7. `grep "select!" tcptop/src/main.rs` returns a match
8. `grep "check_privileges" tcptop/src/main.rs` returns a match
9. `grep "pid != 0" tcptop/src/aggregator.rs` returns a match (PID=0 enrichment)
</verification>
<success_criteria>
- ConnectionTable processes all CollectorEvent variants and calculates bandwidth rates
- PID=0 enrichment: TcpStateChange with pid=0 inherits PID from existing connection record
- UDP flows expire after 5 seconds of idle per D-06
- Closed TCP connections produce [CLOSED] line per D-14 and are removed
- Pre-existing connections marked [PARTIAL] per D-15
- Human-readable byte formatting per D-02 (e.g., "1258291 (1.2M)")
- Output is streaming lines per D-01 (no cursor manipulation)
- Main event loop multiplexes events, periodic output, and signal handling
- Full pipeline compiles: privilege -> bootstrap -> collect -> aggregate -> output
- Pipeline unit tests pass, proving data flow without eBPF/root
</success_criteria>
<output>
After completion, create `.planning/phases/01-data-pipeline/01-03-SUMMARY.md`
</output>