---
phase: 01-data-pipeline
plan: 03
type: execute
wave: 3
depends_on: ["01-01", "01-02"]
files_modified:
  - tcptop/src/aggregator.rs
  - tcptop/src/output.rs
  - tcptop/src/main.rs
  - tcptop/tests/pipeline_test.rs
autonomous: true
requirements:
  - DATA-06
  - DATA-07
must_haves:
  truths:
    - "Running tcptop on Linux with root prints per-connection data as streaming lines to stdout"
    - "Each line shows protocol, local/remote addr+port, PID, process name, TCP state, bytes in/out, packets in/out, RTT, bandwidth rate"
    - "UDP flows are tracked by 4-tuple and expire after 5 seconds of idle"
    - "Bandwidth rates (KB/s, MB/s) are calculated per connection per tick interval"
    - "Closed TCP connections print a [CLOSED] line and are removed after one tick"
    - "Pre-existing connections show [PARTIAL] marker"
    - "Human-readable sizes are used (1.2 MB, 340 KB/s)"
  artifacts:
    - path: "tcptop/src/aggregator.rs"
      provides: "ConnectionTable with tick-based rate calculation and UDP timeout"
      contains: "struct ConnectionTable"
    - path: "tcptop/src/output.rs"
      provides: "Streaming stdout formatter with human-readable sizes"
      contains: "fn format_bytes"
    - path: "tcptop/src/main.rs"
      provides: "Tokio event loop wiring collector, aggregator, and output"
      contains: "#[tokio::main]"
    - path: "tcptop/tests/pipeline_test.rs"
      provides: "Unit test proving data flows from CollectorEvent through aggregator to output"
      contains: "fn test_pipeline"
  key_links:
    - from: "tcptop/src/main.rs"
      to: "tcptop/src/collector/linux.rs"
      via: "creates LinuxCollector and calls start()"
      pattern: "LinuxCollector"
    - from: "tcptop/src/main.rs"
      to: "tcptop/src/aggregator.rs"
      via: "receives CollectorEvents and updates ConnectionTable"
      pattern: "ConnectionTable"
    - from: "tcptop/src/aggregator.rs"
      to: "tcptop/src/output.rs"
      via: "passes connection records for formatting"
      pattern: "format_connections"
---

<objective>
Implement the connection aggregator (bandwidth calculation, UDP timeout, lifecycle management) and the streaming stdout output formatter, then wire everything together in main.rs with a tokio event loop.

Purpose: Complete the proof-of-life data pipeline -- from kernel events through aggregation to human-readable streaming output. This plan delivers the Phase 1 success criterion: running `tcptop` prints real per-connection data.

Output: Working end-to-end pipeline from eBPF events to formatted stdout lines with bandwidth rates, UDP flow management, and connection lifecycle handling.
</objective>

<execution_context>
@/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/workflows/execute-plan.md
@/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/templates/summary.md
</execution_context>

<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-data-pipeline/01-CONTEXT.md
@.planning/phases/01-data-pipeline/01-RESEARCH.md
@.planning/phases/01-data-pipeline/01-01-SUMMARY.md
@.planning/phases/01-data-pipeline/01-02-SUMMARY.md

<interfaces>
<!-- From Plan 01 and Plan 02 -->

From tcptop/src/model.rs:
- ConnectionKey { protocol: Protocol, local_addr: IpAddr, local_port: u16, remote_addr: IpAddr, remote_port: u16 }
- ConnectionRecord { key, pid, process_name, tcp_state: Option<TcpState>, bytes_in/out: u64, packets_in/out: u64, rate_in/out: f64, prev_bytes_in/out: u64, rtt_us: Option<u32>, last_seen: Instant, is_partial: bool, is_closed: bool }
- Protocol { Tcp, Udp }
- TcpState with as_str() -> &str and from_kernel(u32)

From tcptop/src/collector/mod.rs:
- enum CollectorEvent { TcpSend{key, pid, comm, bytes, srtt_us}, TcpRecv{..}, UdpSend{key, pid, comm, bytes}, UdpRecv{..}, TcpStateChange{key, pid, old_state, new_state} }
- trait NetworkCollector { start(tx), stop(), bootstrap_existing() }

From tcptop/src/collector/linux.rs:
- LinuxCollector::new() -> Result<Self>
</interfaces>

</context>

<tasks>

<task type="auto">
<name>Task 1: Implement aggregator and output formatter</name>
<files>
tcptop/src/aggregator.rs,
tcptop/src/output.rs
</files>
<read_first>
tcptop/src/model.rs (ConnectionRecord, ConnectionKey, TcpState -- exact field names and types),
tcptop/src/collector/mod.rs (CollectorEvent enum variants -- exact field names),
.planning/phases/01-data-pipeline/01-RESEARCH.md (Pattern 3: Connection Table code, output format example),
.planning/phases/01-data-pipeline/01-CONTEXT.md (D-01 through D-08, D-12, D-14, D-15 for output behavior)
</read_first>
<action>
**tcptop/src/aggregator.rs:**

Implement ConnectionTable, which processes CollectorEvents and maintains per-connection state.

```rust
use crate::collector::CollectorEvent;
use crate::model::{ConnectionKey, ConnectionRecord, Protocol, TcpState};
use std::collections::HashMap;
use std::time::{Duration, Instant};

const UDP_IDLE_TIMEOUT: Duration = Duration::from_secs(5); // per D-06

pub struct ConnectionTable {
    connections: HashMap<ConnectionKey, ConnectionRecord>,
    last_tick: Instant,
}

impl ConnectionTable {
    pub fn new() -> Self {
        Self {
            connections: HashMap::new(),
            last_tick: Instant::now(),
        }
    }

    /// Seed with pre-existing connections from /proc bootstrap
    pub fn seed(&mut self, records: Vec<ConnectionRecord>) {
        for record in records {
            self.connections.insert(record.key.clone(), record);
        }
    }

    /// Process a single collector event, updating the connection table
    pub fn update(&mut self, event: CollectorEvent) {
        match event {
            CollectorEvent::TcpSend { key, pid, comm, bytes, srtt_us } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Tcp);
                record.bytes_out += bytes as u64; // per DATA-01
                record.packets_out += 1; // per DATA-02
                if srtt_us > 0 {
                    record.rtt_us = Some(srtt_us); // per DATA-05 (already >>3 in collector)
                }
                record.last_seen = Instant::now();
            }
            CollectorEvent::TcpRecv { key, pid, comm, bytes, srtt_us } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Tcp);
                record.bytes_in += bytes as u64; // per DATA-01
                record.packets_in += 1; // per DATA-02
                if srtt_us > 0 {
                    record.rtt_us = Some(srtt_us);
                }
                record.last_seen = Instant::now();
            }
            CollectorEvent::UdpSend { key, pid, comm, bytes } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Udp);
                record.bytes_out += bytes as u64;
                record.packets_out += 1;
                record.last_seen = Instant::now();
            }
            CollectorEvent::UdpRecv { key, pid, comm, bytes } => {
                let record = self.get_or_create(&key, pid, &comm, Protocol::Udp);
                record.bytes_in += bytes as u64;
                record.packets_in += 1;
                record.last_seen = Instant::now();
            }
            CollectorEvent::TcpStateChange { key, pid, old_state: _, new_state } => {
                if let Some(record) = self.connections.get_mut(&key) {
                    record.tcp_state = TcpState::from_kernel(new_state);
                    // PID=0 enrichment (Pitfall 3 from Plan 02): inet_sock_set_state
                    // fires in softirq context where PID=0. Inherit the PID from the
                    // existing record, which was populated by earlier kprobe events.
                    if pid != 0 {
                        record.pid = pid;
                    }
                    // per D-12: mark as closed; removed on the next tick
                    if new_state == 7 { // TCP_CLOSE
                        record.is_closed = true;
                    }
                }
                // If there is no existing record for this key, the state change is
                // for a connection we haven't seen data for -- ignore it (no PID to
                // attribute it to).
            }
        }
    }

    /// Called periodically to calculate rates and expire flows.
    /// Returns (active connections, newly closed connections for [CLOSED] output).
    pub fn tick(&mut self) -> (Vec<&ConnectionRecord>, Vec<ConnectionRecord>) {
        let now = Instant::now();
        let dt = now.duration_since(self.last_tick).as_secs_f64();
        if dt <= 0.0 {
            return (self.connections.values().collect(), Vec::new());
        }

        // Calculate bandwidth rates per DATA-06
        for record in self.connections.values_mut() {
            record.rate_in = (record.bytes_in.saturating_sub(record.prev_bytes_in)) as f64 / dt;
            record.rate_out = (record.bytes_out.saturating_sub(record.prev_bytes_out)) as f64 / dt;
            record.prev_bytes_in = record.bytes_in;
            record.prev_bytes_out = record.bytes_out;
        }

        // Collect closed TCP connections for [CLOSED] output per D-14
        let mut closed = Vec::new();
        let mut to_remove = Vec::new();
        for (key, record) in &self.connections {
            if record.is_closed {
                closed.push(record.clone());
                to_remove.push(key.clone());
            }
        }
        for key in to_remove {
            self.connections.remove(&key);
        }

        // Expire UDP flows idle > 5 seconds per D-06
        self.connections.retain(|_key, record| {
            if record.key.protocol == Protocol::Udp {
                record.last_seen.elapsed() < UDP_IDLE_TIMEOUT
            } else {
                true
            }
        });

        self.last_tick = now;
        (self.connections.values().collect(), closed)
    }

    fn get_or_create(&mut self, key: &ConnectionKey, pid: u32, comm: &str, protocol: Protocol) -> &mut ConnectionRecord {
        self.connections.entry(key.clone()).or_insert_with(|| ConnectionRecord {
            key: key.clone(),
            pid,
            process_name: comm.to_string(),
            tcp_state: if protocol == Protocol::Tcp { Some(TcpState::Established) } else { None },
            bytes_in: 0,
            bytes_out: 0,
            packets_in: 0,
            packets_out: 0,
            rate_in: 0.0,
            rate_out: 0.0,
            prev_bytes_in: 0,
            prev_bytes_out: 0,
            rtt_us: None,
            last_seen: Instant::now(),
            is_partial: false,
            is_closed: false,
        })
    }

    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
}
```

**tcptop/src/output.rs (per D-01, D-02, D-03, D-04, D-14):**

Streaming stdout formatter. Each tick prints all active connections, one per line. Closed connections get a `[CLOSED]` line.

```rust
use crate::model::{ConnectionRecord, Protocol};

/// Format bytes into a human-readable string per D-02.
/// Shows the raw value alongside when it helps: "1258291 (1.2M)".
/// For small values (raw < 1024), just show the raw count: "340B".
pub fn format_bytes(bytes: u64) -> String {
    if bytes < 1024 {
        format!("{}B", bytes)
    } else if bytes < 1024 * 1024 {
        format!("{} ({:.1}K)", bytes, bytes as f64 / 1024.0)
    } else if bytes < 1024 * 1024 * 1024 {
        format!("{} ({:.1}M)", bytes, bytes as f64 / (1024.0 * 1024.0))
    } else {
        format!("{} ({:.1}G)", bytes, bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    }
}

/// Format a rate (bytes/sec) into a human-readable string
pub fn format_rate(rate: f64) -> String {
    if rate < 1.0 {
        "0B/s".to_string()
    } else if rate < 1024.0 {
        format!("{:.0}B/s", rate)
    } else if rate < 1024.0 * 1024.0 {
        format!("{:.1}KB/s", rate / 1024.0)
    } else {
        format!("{:.1}MB/s", rate / (1024.0 * 1024.0))
    }
}

/// Format RTT in microseconds to human-readable
pub fn format_rtt(rtt_us: Option<u32>) -> String {
    match rtt_us {
        None => "-".to_string(),
        Some(us) if us < 1000 => format!("{}us", us),
        Some(us) if us < 1_000_000 => format!("{:.1}ms", us as f64 / 1000.0),
        Some(us) => format!("{:.1}s", us as f64 / 1_000_000.0),
    }
}

/// Print the header line (called once at startup)
pub fn print_header() {
    println!(
        "{:<5} {:<22} {:<22} {:<7} {:<12} {:<12} {:<14} {:<14} {:<8} {:<8} {:<8} {:<10} {:<10}",
        "PROTO", "LOCAL", "REMOTE", "PID", "PROCESS", "STATE",
        "BYTES_IN", "BYTES_OUT", "PKTS_IN", "PKTS_OUT", "RTT",
        "RATE_IN", "RATE_OUT"
    );
}

/// Print one connection line per D-03
pub fn print_connection(record: &ConnectionRecord) {
    let proto = match record.key.protocol {
        Protocol::Tcp => "TCP",
        Protocol::Udp => "UDP",
    };
    let local = format!("{}:{}", record.key.local_addr, record.key.local_port);
    let remote = format!("{}:{}", record.key.remote_addr, record.key.remote_port);
    let state = match &record.tcp_state {
        Some(s) => s.as_str(),
        None => "UDP", // per D-07: show "UDP" for UDP flows
    };
    let partial_marker = if record.is_partial { " [PARTIAL]" } else { "" };

    println!(
        "{:<5} {:<22} {:<22} {:<7} {:<12} {:<12} {:<14} {:<14} {:<8} {:<8} {:<8} {:<10} {:<10}{}",
        proto,
        local,
        remote,
        record.pid,
        truncate(&record.process_name, 11),
        state,
        format_bytes(record.bytes_in),
        format_bytes(record.bytes_out),
        record.packets_in,
        record.packets_out,
        format_rtt(record.rtt_us),
        format_rate(record.rate_in),
        format_rate(record.rate_out),
        partial_marker,
    );
}

/// Print a [CLOSED] line per D-14
pub fn print_closed(record: &ConnectionRecord) {
    println!(
        "[CLOSED] {} {}:{} -> {}:{} ({}, PID {})",
        match record.key.protocol { Protocol::Tcp => "TCP", Protocol::Udp => "UDP" },
        record.key.local_addr,
        record.key.local_port,
        record.key.remote_addr,
        record.key.remote_port,
        record.process_name,
        record.pid,
    );
}

/// Print all connections for one tick cycle per D-01
pub fn print_tick(active: &[&ConnectionRecord], closed: &[ConnectionRecord]) {
    for record in active {
        print_connection(record);
    }
    for record in closed {
        print_closed(record);
    }
}

/// Truncate to `max` characters, appending `~` when shortened.
/// Operates on char boundaries so multi-byte process names cannot panic.
fn truncate(s: &str, max: usize) -> String {
    if s.chars().count() <= max {
        s.to_string()
    } else {
        let cut: String = s.chars().take(max.saturating_sub(1)).collect();
        format!("{}~", cut)
    }
}
```

Note: The exact column widths and format strings may need adjustment after seeing real data. Per D-04, this is throwaway scaffolding -- prioritize getting data visible over pixel-perfect formatting.
</action>
<verify>
<automated>cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop 2>&1 | tail -10</automated>
</verify>
<acceptance_criteria>
- tcptop/src/aggregator.rs contains `pub struct ConnectionTable`
- tcptop/src/aggregator.rs contains `pub fn tick(&mut self)` that returns active and closed connections
- tcptop/src/aggregator.rs contains `pub fn update(&mut self, event: CollectorEvent)`
- tcptop/src/aggregator.rs contains `pub fn seed(&mut self, records: Vec<ConnectionRecord>)`
- tcptop/src/aggregator.rs contains `Duration::from_secs(5)` (UDP idle timeout per D-06)
- tcptop/src/aggregator.rs contains `rate_in =` AND `rate_out =` (bandwidth calculation per DATA-06)
- tcptop/src/aggregator.rs contains `is_closed = true` (TCP close detection per D-12)
- tcptop/src/aggregator.rs contains `connections.retain` (UDP expiry)
- tcptop/src/aggregator.rs contains `if pid != 0` (PID=0 enrichment for TcpStateChange per Pitfall 3)
- tcptop/src/output.rs contains `pub fn format_bytes(bytes: u64) -> String`
- tcptop/src/output.rs contains `pub fn format_rate(rate: f64) -> String`
- tcptop/src/output.rs contains `[CLOSED]` (per D-14)
- tcptop/src/output.rs contains `[PARTIAL]` (per D-15)
- tcptop/src/output.rs contains `"UDP"` in the state column context (per D-07)
- `cargo check -p tcptop` succeeds
</acceptance_criteria>
<done>ConnectionTable processes events with bandwidth rate calculation, UDP timeout, and PID=0 enrichment for state change events. Output formatter produces streaming lines with human-readable sizes, [CLOSED] events, and [PARTIAL] markers.</done>
</task>

<task type="auto">
<name>Task 2: Wire tokio event loop in main.rs and add pipeline unit test</name>
<files>
tcptop/src/main.rs,
tcptop/tests/pipeline_test.rs
</files>
<read_first>
tcptop/src/main.rs (current state from Plan 01),
tcptop/src/collector/mod.rs (NetworkCollector trait, CollectorEvent),
tcptop/src/collector/linux.rs (LinuxCollector::new),
tcptop/src/aggregator.rs (ConnectionTable API from Task 1),
tcptop/src/output.rs (print_header, print_tick from Task 1),
tcptop/src/privilege.rs (check_privileges),
.planning/phases/01-data-pipeline/01-CONTEXT.md (D-01 streaming output, D-04 throwaway scaffolding)
</read_first>
<action>
**Part A: Replace the placeholder main.rs with the full tokio event loop.**

```rust
mod aggregator;
mod collector;
mod model;
mod output;
mod privilege;
mod proc_bootstrap;

use aggregator::ConnectionTable;
use anyhow::Result;
use collector::linux::LinuxCollector;
use collector::NetworkCollector;
use log::info;
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    // Privilege check per D-09, D-10, D-11
    privilege::check_privileges();

    // Create the collector and aggregator
    let mut collector = LinuxCollector::new()?;
    let mut table = ConnectionTable::new();

    // Bootstrap pre-existing connections per D-15
    match collector.bootstrap_existing() {
        Ok(existing) => {
            info!("Bootstrapped {} pre-existing connections", existing.len());
            table.seed(existing);
        }
        Err(e) => {
            log::warn!("Failed to bootstrap existing connections: {}", e);
        }
    }

    // Channel for collector -> aggregator communication.
    // Buffer size 4096 avoids blocking the RingBuf reader (per the anti-pattern in research).
    let (tx, mut rx) = mpsc::channel(4096);

    // Spawn the collector in a background task
    let collector_handle = tokio::spawn(async move {
        if let Err(e) = collector.start(tx).await {
            log::error!("Collector error: {}", e);
        }
    });

    // Print the header once
    output::print_header();

    // Tick interval for rate calculation and output (1 second)
    let mut tick = interval(Duration::from_secs(1));

    // Signal handling for graceful shutdown
    let mut sigint = tokio::signal::unix::signal(
        tokio::signal::unix::SignalKind::interrupt()
    )?;
    let mut sigterm = tokio::signal::unix::signal(
        tokio::signal::unix::SignalKind::terminate()
    )?;

    loop {
        tokio::select! {
            // Process incoming events from the collector
            Some(event) = rx.recv() => {
                table.update(event);
            }
            // Periodic tick: calculate rates and print
            _ = tick.tick() => {
                let (active, closed) = table.tick();
                output::print_tick(&active, &closed);
            }
            // Graceful shutdown on SIGINT or SIGTERM
            _ = sigint.recv() => {
                info!("Received SIGINT, shutting down");
                break;
            }
            _ = sigterm.recv() => {
                info!("Received SIGTERM, shutting down");
                break;
            }
        }
    }

    // Cleanup: aborting the collector task drops the Ebpf struct,
    // which detaches all eBPF programs
    collector_handle.abort();
    Ok(())
}
```

**Key behaviors:**
- Privilege check runs FIRST, before any async setup
- Bootstrap pre-existing connections BEFORE starting the collector
- mpsc channel buffer = 4096 (decouples the RingBuf reader from output per the anti-pattern guidance)
- `tokio::select!` multiplexes event processing, the periodic tick, and signal handling
- 1-second tick interval for rate calculation and output refresh
- SIGINT and SIGTERM handled for graceful shutdown (detaches eBPF programs)
- Output is streaming lines per D-01: no screen clearing, no cursor manipulation
- Header printed once at startup

**Note on signal handling:** The code uses `tokio::signal` instead of the `signal-hook` crate for simplicity, since we're already in a tokio context. If `signal-hook` is preferred (it's in Cargo.toml), the executor may swap it in -- the behavior is identical.

Ensure all `mod` declarations match the actual file structure. The module tree should be:
- `main.rs` -> `mod aggregator`, `mod collector`, `mod model`, `mod output`, `mod privilege`, `mod proc_bootstrap`
- `collector/mod.rs` -> `pub mod linux`

**Part B: Create a unit test that proves the data pipeline works end-to-end without eBPF/root.**
|
|
|
|
Create `tcptop/tests/pipeline_test.rs` -- an integration test that feeds synthetic `CollectorEvent`s through `ConnectionTable.update()` then calls `tick()` and verifies the output.
|
|
|
|
```rust
//! Integration test: proves the CollectorEvent -> ConnectionTable -> output pipeline
//! works without requiring eBPF, root, or Linux.

use std::net::{IpAddr, Ipv4Addr};

// These imports reference the tcptop crate's public modules. If the modules are
// not reachable from integration tests (a binary crate is not importable from
// tests/), expose them through a library target or convert these to unit tests.
// Adjust as needed.

#[test]
fn test_pipeline_tcp_send_updates_table() {
    // Create a ConnectionTable
    let mut table = tcptop::aggregator::ConnectionTable::new();

    // Synthesize a TcpSend event
    let key = tcptop::model::ConnectionKey {
        protocol: tcptop::model::Protocol::Tcp,
        local_addr: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
        local_port: 12345,
        remote_addr: IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)),
        remote_port: 443,
    };
    let event = tcptop::collector::CollectorEvent::TcpSend {
        key: key.clone(),
        pid: 1234,
        comm: "curl".to_string(),
        bytes: 1500,
        srtt_us: 5000,
    };

    // Feed the event into the aggregator
    table.update(event);

    // Tick to calculate rates
    let (active, closed) = table.tick();

    // Verify: one active connection, none closed
    assert_eq!(active.len(), 1);
    assert_eq!(closed.len(), 0);

    // Verify the connection has correct data
    let record = active[0];
    assert_eq!(record.pid, 1234);
    assert_eq!(record.bytes_out, 1500);
    assert_eq!(record.packets_out, 1);
    assert_eq!(record.rtt_us, Some(5000));
    assert!(!record.is_closed);
    assert!(!record.is_partial);
}

#[test]
fn test_pipeline_tcp_close_lifecycle() {
    let mut table = tcptop::aggregator::ConnectionTable::new();

    let key = tcptop::model::ConnectionKey {
        protocol: tcptop::model::Protocol::Tcp,
        local_addr: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
        local_port: 12345,
        remote_addr: IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)),
        remote_port: 443,
    };

    // First: a data event creates the connection
    table.update(tcptop::collector::CollectorEvent::TcpSend {
        key: key.clone(), pid: 1234, comm: "curl".to_string(), bytes: 100, srtt_us: 0,
    });

    // Then: a state change to CLOSE (state 7)
    table.update(tcptop::collector::CollectorEvent::TcpStateChange {
        key: key.clone(), pid: 0, old_state: 1, new_state: 7,
    });

    // Tick: should return the closed connection in the closed list
    let (active, closed) = table.tick();
    assert_eq!(active.len(), 0, "closed connection should not be in active list");
    assert_eq!(closed.len(), 1, "closed connection should appear in closed list");
    assert_eq!(closed[0].pid, 1234, "PID should be inherited from earlier event, not 0");
}

#[test]
fn test_output_format_bytes() {
    assert_eq!(tcptop::output::format_bytes(500), "500B");
    assert!(tcptop::output::format_bytes(2048).contains("K"));
    assert!(tcptop::output::format_bytes(2_000_000).contains("M"));
}

#[test]
fn test_output_format_rate() {
    assert_eq!(tcptop::output::format_rate(0.0), "0B/s");
    assert!(tcptop::output::format_rate(2048.0).contains("KB/s"));
}
```

**IMPORTANT:** The test imports assume `aggregator`, `collector`, `model`, and `output` are accessible from integration tests. Integration tests in `tests/` link against the crate's *library* target, so marking the modules `pub` inside `main.rs` is not sufficient. The executor must either:

(a) expose the relevant modules through a library target so the tests can write `use tcptop::...` (preferred for a binary crate with tests), or

(b) convert these to unit tests inside `aggregator.rs` using `#[cfg(test)] mod tests { ... }`.

The key requirement is that the tests exist and pass, proving the data flow works without eBPF/root.
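
For option (a), one workable layout is a thin `src/lib.rs` that declares the modules publicly (a hypothetical sketch -- module names assumed from this plan). During scaffolding, the `mod` declarations can remain in `main.rs` as well; both crates then compile the same source files, which is acceptable here since the acceptance criteria expect them in `main.rs`:

```rust
// src/lib.rs (hypothetical) -- exposes the modules so integration tests
// can write `use tcptop::aggregator::ConnectionTable;`
pub mod aggregator;
pub mod collector;
pub mod model;
pub mod output;
pub mod privilege;
pub mod proc_bootstrap;
```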
</action>
<verify>
<automated>cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop 2>&1 | tail -5 && cargo test -p tcptop --test pipeline_test 2>&1 | tail -20</automated>
</verify>
<acceptance_criteria>
- tcptop/src/main.rs contains `#[tokio::main]`
- tcptop/src/main.rs contains `privilege::check_privileges()` BEFORE any async work
- tcptop/src/main.rs contains `LinuxCollector::new()`
- tcptop/src/main.rs contains `ConnectionTable::new()`
- tcptop/src/main.rs contains `collector.bootstrap_existing()`
- tcptop/src/main.rs contains `table.seed(`
- tcptop/src/main.rs contains `mpsc::channel(` with buffer size >= 1024
- tcptop/src/main.rs contains `collector.start(tx)` inside a tokio::spawn
- tcptop/src/main.rs contains `table.update(event)` (processing events from the channel)
- tcptop/src/main.rs contains `table.tick()` (periodic rate calculation)
- tcptop/src/main.rs contains `output::print_tick(` (formatted output)
- tcptop/src/main.rs contains `output::print_header()` (header printed once)
- tcptop/src/main.rs contains `tokio::select!` (event loop multiplexing)
- tcptop/src/main.rs contains `SignalKind::interrupt` OR `signal::ctrl_c` (graceful shutdown)
- tcptop/src/main.rs contains `mod aggregator` AND `mod collector` AND `mod model` AND `mod output` AND `mod privilege` AND `mod proc_bootstrap`
- tcptop/tests/pipeline_test.rs exists with at least 3 tests
- `cargo check -p tcptop` succeeds
- `cargo test -p tcptop --test pipeline_test` passes (all tests green)
</acceptance_criteria>
<done>Main event loop wires collector, aggregator, and output together with tokio::select! multiplexing. Pipeline unit tests prove data flow from synthetic CollectorEvents through ConnectionTable to the output formatters -- verifiable without eBPF, root, or Linux. Privilege check runs first, pre-existing connections are bootstrapped, and events flow from eBPF through the channel to the aggregator to stdout. Graceful shutdown on SIGINT/SIGTERM.</done>
</task>

</tasks>

<verification>
1. `cargo check -p tcptop` exits 0 (full pipeline compiles)
2. `cargo test -p tcptop --test pipeline_test` exits 0 (pipeline data flow proven)
3. `grep "tokio::main" tcptop/src/main.rs` returns a match
4. `grep "ConnectionTable" tcptop/src/aggregator.rs` returns a match
5. `grep "format_bytes" tcptop/src/output.rs` returns a match
6. `grep "CLOSED" tcptop/src/output.rs` returns a match
7. `grep "select!" tcptop/src/main.rs` returns a match
8. `grep "check_privileges" tcptop/src/main.rs` returns a match
9. `grep "pid != 0" tcptop/src/aggregator.rs` returns a match (PID=0 enrichment)
</verification>

<success_criteria>
- ConnectionTable processes all CollectorEvent variants and calculates bandwidth rates
- PID=0 enrichment: TcpStateChange with pid=0 inherits the PID from the existing connection record
- UDP flows expire after 5 seconds of idle per D-06
- Closed TCP connections produce a [CLOSED] line per D-14 and are removed
- Pre-existing connections marked [PARTIAL] per D-15
- Human-readable byte formatting per D-02 (e.g., "1258291 (1.2M)")
- Output is streaming lines per D-01 (no cursor manipulation)
- Main event loop multiplexes events, periodic output, and signal handling
- Full pipeline compiles: privilege -> bootstrap -> collect -> aggregate -> output
- Pipeline unit tests pass, proving data flow without eBPF/root
</success_criteria>

<output>
After completion, create `.planning/phases/01-data-pipeline/01-03-SUMMARY.md`
</output>