| phase | plan | type | wave | depends_on | files_modified | autonomous | requirements | must_haves |
|---|---|---|---|---|---|---|---|---|
| 01-data-pipeline | 02 | execute | 2 | | | true | | |
Purpose: This is the core data pipeline -- kernel eBPF programs capture real TCP/UDP events and the userspace collector makes them available through the platform-agnostic trait. This is the hardest engineering in the project.
Output: Working eBPF programs that attach to kernel hook points, a Linux collector that reads events and pushes them through an mpsc channel, and /proc bootstrapping for pre-existing connections.
<execution_context> @/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/workflows/execute-plan.md @/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/01-data-pipeline/01-CONTEXT.md @.planning/phases/01-data-pipeline/01-RESEARCH.md @.planning/phases/01-data-pipeline/01-01-SUMMARY.md
From tcptop-common/src/lib.rs:
- TcptopEvent { event_type: u32, data: TcptopEventData }
- TcptopEventData is a `#[repr(C)]` `pub union` with fields `data_event: DataEvent` and `state_event: StateEvent`
- DataEvent { pid, comm, af_family, sport, dport, saddr: [u8;16], daddr: [u8;16], bytes, srtt_us }
- StateEvent { pid, af_family, sport, dport, saddr: [u8;16], daddr: [u8;16], old_state, new_state }
- EVENT_TCP_SEND=1, EVENT_TCP_RECV=2, EVENT_UDP_SEND=3, EVENT_UDP_RECV=4, EVENT_TCP_STATE=5
From tcptop/src/collector/mod.rs:
- trait NetworkCollector { async fn start(&mut self, tx: mpsc::Sender<CollectorEvent>) -> Result<()>; async fn stop(&mut self) -> Result<()>; fn bootstrap_existing(&self) -> Result<Vec<ConnectionRecord>>; }
- enum CollectorEvent { TcpSend{..}, TcpRecv{..}, UdpSend{..}, UdpRecv{..}, TcpStateChange{..} }
From tcptop/src/model.rs:
- ConnectionKey { protocol, local_addr, local_port, remote_addr, remote_port }
- ConnectionRecord { key, pid, process_name, tcp_state, bytes_in/out, packets_in/out, rate_in/out, rtt_us, last_seen, is_partial, is_closed }
- Protocol { Tcp, Udp }
- TcpState with from_kernel(u32) method
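**Sanity sketch for `TcpState::from_kernel`:**
The kernel's numeric TCP state values (from `include/net/tcp_states.h`) are stable ABI, so the mapping can be sketched as below. The variant names here are hypothetical -- the real definition in `tcptop/src/model.rs` is authoritative.

```rust
// Hypothetical enum shape -- only the numeric mapping is kernel-defined.
#[derive(Debug, PartialEq)]
pub enum TcpState {
    Established, SynSent, SynRecv, FinWait1, FinWait2, TimeWait,
    Close, CloseWait, LastAck, Listen, Closing, NewSynRecv,
    Unknown(u32),
}

impl TcpState {
    pub fn from_kernel(v: u32) -> Self {
        match v {
            1 => Self::Established,
            2 => Self::SynSent,
            3 => Self::SynRecv,
            4 => Self::FinWait1,
            5 => Self::FinWait2,
            6 => Self::TimeWait,
            7 => Self::Close,
            8 => Self::CloseWait,
            9 => Self::LastAck,
            10 => Self::Listen,
            11 => Self::Closing,
            12 => Self::NewSynRecv,
            other => Self::Unknown(other), // future-proof against new kernel states
        }
    }
}
```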
**eBPF programs to implement:**
1. **tcp_sendmsg kprobe** -- attach to `tcp_sendmsg`
- arg0: `*sock` -- extract connection tuple (saddr, daddr, sport, dport, af_family) via `bpf_probe_read_kernel` from `sock->__sk_common`
- arg2: `size` -- byte count for this send call
- Read `srtt_us` from `tcp_sock` (cast sock ptr): the field stores smoothed RTT shifted left by 3, so read raw value (userspace will shift right by 3 per Pitfall 5)
- Get PID via `bpf_get_current_pid_tgid() >> 32`
- Get process name via `bpf_get_current_comm()`
- Write TcptopEvent with event_type = EVENT_TCP_SEND
2. **tcp_recvmsg kprobe** -- attach to `tcp_recvmsg`
- arg0: `*sock` -- same tuple extraction as tcp_sendmsg
- For byte count: use kretprobe approach. If aya supports kretprobe (via `#[kretprobe]`), capture return value as bytes received. If kretprobe is not straightforward, capture from msghdr arg or use a BPF HashMap to store entry args and read on return.
- Read `srtt_us` same as tcp_sendmsg
- Write TcptopEvent with event_type = EVENT_TCP_RECV
- NOTE: If kretprobe is complex, start with kprobe on entry and use msg->msg_iter.count for byte estimate. Document the approach taken.
3. **udp_sendmsg kprobe** -- attach to `udp_sendmsg`
- arg0: `*sock`, arg2: `size` (byte count)
- Extract 4-tuple from sock->__sk_common
- srtt_us = 0 (UDP has no RTT)
- Write TcptopEvent with event_type = EVENT_UDP_SEND
4. **udp_recvmsg kprobe** -- attach to `udp_recvmsg`
- Same approach as tcp_recvmsg for byte count
- srtt_us = 0
- Write TcptopEvent with event_type = EVENT_UDP_RECV
5. **inet_sock_set_state tracepoint** -- attach to `sock:inet_sock_set_state`
- Read tracepoint args: old_state, new_state, sport, dport, saddr, daddr, af_family
- PID: use `bpf_get_current_pid_tgid() >> 32`, but note Pitfall 3 -- this tracepoint frequently fires in softirq context, where the current task is not the owning process and the PID reads as 0.
- **PID=0 handling approach (chosen: option b -- aggregator-side enrichment):** The eBPF program writes PID=0 as-is. The userspace aggregator in Plan 03 handles this: when `ConnectionTable.update()` receives a `TcpStateChange` with `pid=0`, it looks up the connection by key in the existing table and inherits the PID from the existing `ConnectionRecord`. This works because kprobes on tcp_sendmsg/tcp_recvmsg will have already populated the connection with the correct PID before any state change fires. This avoids adding a BPF HashMap for sock_ptr->pid tracking, keeping the eBPF programs simpler.
- Write TcptopEvent with event_type = EVENT_TCP_STATE
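**Illustration of the PID=0 enrichment (option b):**
A minimal userspace sketch of the inheritance rule, with simplified stand-in types -- the real logic belongs in Plan 03's `ConnectionTable.update()`, and `Key`/`Record` here are trimmed for illustration only:

```rust
use std::collections::HashMap;

// Simplified stand-ins for ConnectionKey / ConnectionRecord.
type Key = (u16, u16); // (local_port, remote_port), illustration only

struct Record { pid: u32 }

/// Apply a TcpStateChange event. If the tracepoint fired in softirq
/// context (event_pid == 0), inherit the PID the data-path kprobes
/// already recorded for this connection.
fn apply_state_change(table: &mut HashMap<Key, Record>, key: Key, event_pid: u32) -> u32 {
    let pid = if event_pid == 0 {
        table.get(&key).map(|r| r.pid).unwrap_or(0)
    } else {
        event_pid
    };
    table.entry(key).or_insert(Record { pid }).pid = pid;
    pid
}
```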
**Critical eBPF constraints:**
- All `#[repr(C)]` structs from tcptop-common, no deviation. TcptopEventData is a **union** (locked in Plan 01).
- Use `bpf_probe_read_kernel` for every kernel pointer dereference
- No unbounded loops, no dynamic allocation
- Stack limit: 512 bytes per program -- don't copy entire structs onto stack
- Handle `EVENTS.reserve::<TcptopEvent>(0)` returning `None` gracefully (just return 0, event is dropped)
- For IPv4: write 4 bytes to saddr[0..4], zero the rest. Set af_family = AF_INET (2)
- For IPv6: write 16 bytes to saddr[0..16]. Set af_family = AF_INET6 (10)
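**Round-trip check for the address convention:**
The IPv4/IPv6 packing rule above can be exercised in plain userspace Rust (constants per the stated values AF_INET=2, AF_INET6=10); `parse_addr` mirrors the userspace decoder from Task 2:

```rust
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

const AF_INET: u16 = 2;
const AF_INET6: u16 = 10;

// What the eBPF side writes: IPv4 in the first 4 bytes, rest zeroed.
fn pack_v4(a: Ipv4Addr) -> [u8; 16] {
    let mut buf = [0u8; 16];
    buf[..4].copy_from_slice(&a.octets());
    buf
}

// What the userspace collector does when reading the event back.
fn parse_addr(af_family: u16, bytes: &[u8; 16]) -> IpAddr {
    if af_family == AF_INET {
        IpAddr::V4(Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3]))
    } else {
        IpAddr::V6(Ipv6Addr::from(*bytes))
    }
}
```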
**Helper function pattern:**
Extract a shared helper for reading the sock tuple to avoid code duplication across the 4 kprobes:
```rust
#[inline(always)]
unsafe fn read_sock_tuple(sock: *const core::ffi::c_void) -> Result<(u16, [u8; 16], [u8; 16], u16, u16), i64> {
    // Read __sk_common.skc_family, skc_rcv_saddr, skc_daddr, skc_num, skc_dport
    // with one bpf_probe_read_kernel call per field; take field offsets from
    // generated kernel bindings rather than hand-coded constants.
    // Note: skc_dport is big-endian (__be16); skc_num is host byte order.
}
```
**Drop counter (per Pitfall 7):**
Add an eBPF array map for counting dropped events:
```rust
#[map]
static DROP_COUNT: Array<u32> = Array::with_max_entries(1, 0);
```
Increment when `EVENTS.reserve()` returns None.
cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop-ebpf 2>&1 | tail -10
- tcptop-ebpf/src/main.rs contains `#[kprobe]` at least 4 times (tcp_sendmsg, tcp_recvmsg, udp_sendmsg, udp_recvmsg)
- tcptop-ebpf/src/main.rs contains `#[tracepoint]` at least 1 time (inet_sock_set_state)
- tcptop-ebpf/src/main.rs contains `EVENTS: RingBuf`
- tcptop-ebpf/src/main.rs contains `EVENTS.reserve` (writing events to ring buffer)
- tcptop-ebpf/src/main.rs contains `bpf_get_current_pid_tgid` (PID capture)
- tcptop-ebpf/src/main.rs contains `bpf_get_current_comm` (process name capture)
- tcptop-ebpf/src/main.rs contains `bpf_probe_read_kernel` (safe kernel memory reads)
- tcptop-ebpf/src/main.rs contains `srtt_us` (RTT reading from tcp_sock per DATA-05)
- tcptop-ebpf/src/main.rs contains `DROP_COUNT` (drop counter per Pitfall 7)
- tcptop-ebpf/src/main.rs contains `use tcptop_common` (imports shared types)
- tcptop-ebpf/src/main.rs contains `EVENT_TCP_SEND` AND `EVENT_UDP_SEND` AND `EVENT_TCP_STATE`
- tcptop-ebpf/src/main.rs accesses union fields via `.data.data_event` and `.data.state_event` (matches locked union layout from Plan 01)
- Note: `cargo check -p tcptop-ebpf` may fail without bpf-linker installed -- that is acceptable. The code should be syntactically valid Rust.
Five eBPF programs implemented: 4 kprobes for tcp_sendmsg/tcp_recvmsg/udp_sendmsg/udp_recvmsg and 1 tracepoint for inet_sock_set_state. All write TcptopEvent to shared RingBuf using union layout. State change tracepoint passes PID=0 through to userspace (aggregator handles enrichment). Drop counter tracks ring buffer overflow.
Task 2: Implement Linux collector and /proc bootstrap
tcptop/src/collector/linux.rs,
tcptop/src/collector/mod.rs,
tcptop/src/proc_bootstrap.rs
tcptop/src/collector/mod.rs (NetworkCollector trait and CollectorEvent enum),
tcptop/src/model.rs (ConnectionRecord, ConnectionKey, Protocol, TcpState),
tcptop-common/src/lib.rs (shared types for deserializing ring buffer events -- note TcptopEventData is a union),
tcptop-ebpf/src/main.rs (to understand what the eBPF programs produce),
.planning/phases/01-data-pipeline/01-RESEARCH.md (Pattern 1 userspace consumer code, procfs usage, anti-patterns)
Implement the Linux/eBPF collector and the /proc/net bootstrapper.
**tcptop/src/proc_bootstrap.rs (per D-15):**
Use the `procfs` crate to parse pre-existing connections:
```rust
use crate::model::{ConnectionKey, ConnectionRecord, Protocol, TcpState};
use anyhow::Result;
use std::net::IpAddr;
use std::time::Instant;
pub fn bootstrap_connections() -> Result<Vec<ConnectionRecord>> {
    let mut connections = Vec::new();
    // Parse /proc/net/tcp for TCP connections
    if let Ok(tcp) = procfs::net::tcp() {
        for entry in tcp {
            let key = ConnectionKey {
                protocol: Protocol::Tcp,
                local_addr: entry.local_address.ip(), // procfs crate returns SocketAddr
                local_port: entry.local_address.port(),
                remote_addr: entry.remote_address.ip(),
                remote_port: entry.remote_address.port(),
            };
            connections.push(ConnectionRecord {
                key,
                pid: entry.inode as u32, // inode, not PID -- enrich_pids maps it below
                process_name: String::new(), // enriched later
                tcp_state: Some(TcpState::from_kernel(entry.state as u32)), // Option: UDP branch uses None
                bytes_in: 0,
                bytes_out: 0,
                packets_in: 0,
                packets_out: 0,
                rate_in: 0.0,
                rate_out: 0.0,
                prev_bytes_in: 0,
                prev_bytes_out: 0,
                rtt_us: None,
                last_seen: Instant::now(),
                is_partial: true, // per D-15: mark as partial
                is_closed: false,
            });
        }
    }
    // Parse /proc/net/udp for UDP connections
    if let Ok(udp) = procfs::net::udp() {
        for entry in udp {
            let key = ConnectionKey {
                protocol: Protocol::Udp,
                local_addr: entry.local_address.ip(),
                local_port: entry.local_address.port(),
                remote_addr: entry.remote_address.ip(),
                remote_port: entry.remote_address.port(),
            };
            connections.push(ConnectionRecord {
                key,
                pid: entry.inode as u32,
                process_name: String::new(),
                tcp_state: None, // per D-07: no state for UDP
                bytes_in: 0,
                bytes_out: 0,
                packets_in: 0,
                packets_out: 0,
                rate_in: 0.0,
                rate_out: 0.0,
                prev_bytes_in: 0,
                prev_bytes_out: 0,
                rtt_us: None,
                last_seen: Instant::now(),
                is_partial: true,
                is_closed: false,
            });
        }
    }
    // Also parse /proc/net/tcp6 and /proc/net/udp6 for IPv6
    // (the procfs crate exposes these as tcp6() and udp6())
    Ok(connections)
}

/// Walk /proc/*/fd to map socket inodes to PIDs.
/// This enriches the inode-based entries from /proc/net/*.
pub fn enrich_pids(connections: &mut [ConnectionRecord]) -> Result<()> {
    // Best-effort: skip processes we can't read (permissions, races)
    if let Ok(procs) = procfs::process::all_processes() {
        for proc in procs.flatten() {
            let pid = proc.pid() as u32;
            let comm = proc.stat().map(|s| s.comm).unwrap_or_default();
            let Ok(fds) = proc.fd() else { continue };
            for fd_info in fds.flatten() {
                // Socket fds read as "socket:[<inode>]" symlink targets
                if let procfs::process::FDTarget::Socket(inode) = fd_info.target {
                    for conn in connections.iter_mut() {
                        // bootstrap stored the socket inode in the pid field as a placeholder
                        if conn.pid == inode as u32 && conn.process_name.is_empty() {
                            conn.pid = pid;
                            conn.process_name = comm.clone();
                        }
                    }
                }
            }
        }
    }
    Ok(())
}
```
Note: The exact `procfs` crate API may differ from the above pseudocode. The executor MUST read the actual procfs 0.18 API (check docs.rs/procfs) and adjust method names. The key behaviors are:
- Parse /proc/net/tcp, /proc/net/tcp6, /proc/net/udp, /proc/net/udp6
- Map socket inodes to PIDs via /proc/*/fd
- Mark all bootstrapped connections as `is_partial: true`
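The inode-to-PID walk hinges on recognizing `socket:[<inode>]` symlink targets under `/proc/<pid>/fd/`. The procfs crate surfaces this as `FDTarget::Socket`, but the raw form is simple to parse by hand if the crate API differs (a hypothetical helper, not one of the planned files):

```rust
/// Parse a /proc/<pid>/fd symlink target like "socket:[12345]" into an inode.
/// Non-socket targets ("/dev/null", "pipe:[999]", ...) return None.
fn socket_inode(target: &str) -> Option<u64> {
    target
        .strip_prefix("socket:[")?
        .strip_suffix(']')?
        .parse()
        .ok()
}
```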
**tcptop/src/collector/linux.rs:**
Implement `LinuxCollector` struct that implements `NetworkCollector` trait.
```rust
use super::{CollectorEvent, NetworkCollector};
use crate::model::{ConnectionKey, ConnectionRecord, Protocol};
use crate::proc_bootstrap;
use anyhow::{Context, Result};
use aya::maps::RingBuf;
use aya::programs::{KProbe, TracePoint};
use aya::Ebpf;
use log::{info, warn};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use tcptop_common::*;
use tokio::io::unix::AsyncFd;
use tokio::sync::mpsc;
use std::os::fd::AsRawFd;
pub struct LinuxCollector {
    bpf: Option<Ebpf>,
}

impl LinuxCollector {
    pub fn new() -> Result<Self> {
        Ok(Self { bpf: None })
    }

    fn load_and_attach(&mut self) -> Result<()> {
        // Load eBPF bytecode compiled by aya-build
        let mut bpf = Ebpf::load(aya::include_bytes_aligned!(concat!(
            env!("OUT_DIR"),
            "/tcptop"
        )))?;
        // Initialize aya-log for kernel-side debug logging
        if let Err(e) = aya_log::EbpfLogger::init(&mut bpf) {
            warn!("Failed to init eBPF logger: {}", e);
        }
        // Attach the 4 kprobes (the eBPF program name matches the kernel symbol here)
        for name in ["tcp_sendmsg", "tcp_recvmsg", "udp_sendmsg", "udp_recvmsg"] {
            let program: &mut KProbe = bpf
                .program_mut(name)
                .with_context(|| format!("program {name} not found"))?
                .try_into()?;
            program.load()?;
            program.attach(name, 0)?;
        }
        // Attach the state-change tracepoint
        let program: &mut TracePoint = bpf
            .program_mut("inet_sock_set_state")
            .context("program inet_sock_set_state not found")?
            .try_into()?;
        program.load()?;
        program.attach("sock", "inet_sock_set_state")?;
        self.bpf = Some(bpf);
        Ok(())
    }

    fn parse_event(raw: &[u8]) -> Option<CollectorEvent> {
        if raw.len() < core::mem::size_of::<TcptopEvent>() {
            return None;
        }
        // Safety: length checked above; BPF ringbuf items are 8-byte aligned,
        // which satisfies the #[repr(C)] event layout.
        let event: &TcptopEvent = unsafe { &*(raw.as_ptr() as *const TcptopEvent) };
        match event.event_type {
            EVENT_TCP_SEND | EVENT_TCP_RECV => {
                let d = unsafe { &event.data.data_event };
                let key = Self::data_event_to_key(d, Protocol::Tcp);
                let comm = Self::comm_to_string(&d.comm);
                let srtt = d.srtt_us >> 3; // Pitfall 5: shift right by 3 for actual microseconds
                if event.event_type == EVENT_TCP_SEND {
                    Some(CollectorEvent::TcpSend { key, pid: d.pid, comm, bytes: d.bytes, srtt_us: srtt })
                } else {
                    Some(CollectorEvent::TcpRecv { key, pid: d.pid, comm, bytes: d.bytes, srtt_us: srtt })
                }
            }
            EVENT_UDP_SEND | EVENT_UDP_RECV => {
                let d = unsafe { &event.data.data_event };
                let key = Self::data_event_to_key(d, Protocol::Udp);
                let comm = Self::comm_to_string(&d.comm);
                if event.event_type == EVENT_UDP_SEND {
                    Some(CollectorEvent::UdpSend { key, pid: d.pid, comm, bytes: d.bytes })
                } else {
                    Some(CollectorEvent::UdpRecv { key, pid: d.pid, comm, bytes: d.bytes })
                }
            }
            EVENT_TCP_STATE => {
                let s = unsafe { &event.data.state_event };
                let key = ConnectionKey {
                    protocol: Protocol::Tcp,
                    local_addr: Self::parse_addr(s.af_family, &s.saddr),
                    local_port: s.sport,
                    remote_addr: Self::parse_addr(s.af_family, &s.daddr),
                    remote_port: s.dport,
                };
                Some(CollectorEvent::TcpStateChange { key, pid: s.pid, old_state: s.old_state, new_state: s.new_state })
            }
            _ => None,
        }
    }

    fn data_event_to_key(d: &DataEvent, protocol: Protocol) -> ConnectionKey {
        ConnectionKey {
            protocol,
            local_addr: Self::parse_addr(d.af_family, &d.saddr),
            local_port: d.sport,
            remote_addr: Self::parse_addr(d.af_family, &d.daddr),
            remote_port: d.dport,
        }
    }

    fn parse_addr(af_family: u16, bytes: &[u8; 16]) -> IpAddr {
        if af_family == AF_INET {
            IpAddr::V4(Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3]))
        } else {
            IpAddr::V6(Ipv6Addr::from(*bytes))
        }
    }

    fn comm_to_string(comm: &[u8; 16]) -> String {
        let end = comm.iter().position(|&b| b == 0).unwrap_or(16);
        String::from_utf8_lossy(&comm[..end]).to_string()
    }
}

#[async_trait::async_trait]
impl NetworkCollector for LinuxCollector {
    async fn start(&mut self, tx: mpsc::Sender<CollectorEvent>) -> Result<()> {
        self.load_and_attach().context("Failed to load eBPF programs")?;
        let bpf = self.bpf.as_mut().unwrap();
        // take_map gives owned access so the RingBuf can live inside AsyncFd
        let ring_buf = RingBuf::try_from(bpf.take_map("EVENTS").context("EVENTS map not found")?)?;
        let mut async_fd = AsyncFd::new(ring_buf)?;
        // The exact AsyncFd wrapping may differ -- aya's RingBuf may need
        // to be wrapped differently. Check aya 0.13 docs for the async ring buffer API.
        // Alternative: use aya::maps::ring_buf::RingBuf with poll-based reading in a tokio::spawn.
        info!("eBPF programs attached, reading events...");
        loop {
            let mut guard = async_fd.readable_mut().await?;
            // Drain all currently available items before clearing readiness
            while let Some(item) = guard.get_inner_mut().next() {
                if let Some(event) = Self::parse_event(&item) {
                    if tx.send(event).await.is_err() {
                        return Ok(()); // receiver dropped, shutting down
                    }
                }
            }
            guard.clear_ready();
        }
    }

    async fn stop(&mut self) -> Result<()> {
        // Dropping the Ebpf struct detaches all programs
        self.bpf.take();
        Ok(())
    }

    fn bootstrap_existing(&self) -> Result<Vec<ConnectionRecord>> {
        proc_bootstrap::bootstrap_connections()
    }
}
```
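The tag-then-union access pattern that `parse_event` relies on can be demonstrated in isolation with trimmed stand-in structs (field sets are simplified here for illustration; the real layout is locked in tcptop-common):

```rust
// Simplified stand-ins for the Plan 01 union layout.
#[repr(C)]
#[derive(Clone, Copy)]
struct DataEvent { pid: u32, bytes: u64 }

#[repr(C)]
#[derive(Clone, Copy)]
struct StateEvent { pid: u32, old_state: u32, new_state: u32 }

#[repr(C)]
union EventData {
    data_event: DataEvent,
    state_event: StateEvent,
}

#[repr(C)]
struct Event { event_type: u32, data: EventData }

const EVENT_TCP_SEND: u32 = 1;
const EVENT_TCP_STATE: u32 = 5;

fn describe(e: &Event) -> String {
    // The event_type tag MUST be checked before touching the union --
    // reading the variant that wasn't written is undefined behavior.
    match e.event_type {
        EVENT_TCP_SEND => {
            let d = unsafe { e.data.data_event };
            format!("send pid={} bytes={}", d.pid, d.bytes)
        }
        EVENT_TCP_STATE => {
            let s = unsafe { e.data.state_event };
            format!("state pid={} {}->{}", s.pid, s.old_state, s.new_state)
        }
        _ => "unknown".into(),
    }
}
```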
**IMPORTANT:** The exact aya 0.13 API for async RingBuf reading may differ from the above pseudocode. The executor MUST:
1. Check if `aya::maps::RingBuf` implements `AsRawFd` (needed for AsyncFd)
2. If not, check for `aya::maps::ring_buf::AsyncRingBuf` or similar
3. Check if `include_bytes_aligned!` is the correct macro path
4. Adapt the loading and reading code to match the actual aya 0.13.1 API
The key behaviors that MUST be preserved regardless of API details:
- Load compiled eBPF bytecode
- Attach to all 5 hook points
- Read events from ring buffer asynchronously
- Parse TcptopEvent using the **union** layout: access `event.data.data_event` or `event.data.state_event` based on `event_type` tag
- Send events through mpsc channel
- Apply srtt_us >> 3 for RTT conversion (Pitfall 5)
- Handle IPv4 and IPv6 addresses correctly
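The srtt conversion (Pitfall 5) is a one-liner but easy to apply on the wrong side of the boundary -- the kernel keeps `tcp_sock.srtt_us` pre-shifted left by 3 for its EWMA, so userspace divides by 8:

```rust
/// Kernel tcp_sock.srtt_us holds the smoothed RTT << 3;
/// shift right by 3 to get actual microseconds.
fn srtt_to_micros(raw_srtt: u32) -> u32 {
    raw_srtt >> 3
}
```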
**Update tcptop/src/collector/mod.rs:**
Uncomment or add `pub mod linux;` if it was a placeholder. Make sure `linux.rs` is properly declared as a submodule.
cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop 2>&1 | tail -10
- tcptop/src/collector/linux.rs contains `impl NetworkCollector for LinuxCollector`
- tcptop/src/collector/linux.rs contains `include_bytes_aligned!` (loading eBPF bytecode)
- tcptop/src/collector/linux.rs contains `KProbe` AND `TracePoint` (program types)
- tcptop/src/collector/linux.rs contains `tcp_sendmsg` AND `tcp_recvmsg` AND `udp_sendmsg` AND `udp_recvmsg` (kprobe attachment names)
- tcptop/src/collector/linux.rs contains `inet_sock_set_state` (tracepoint attachment)
- tcptop/src/collector/linux.rs contains `srtt_us >> 3` (RTT shift per Pitfall 5)
- tcptop/src/collector/linux.rs contains `AF_INET` AND `Ipv4Addr` AND `Ipv6Addr` (dual-stack address parsing)
- tcptop/src/collector/linux.rs contains `tx.send` (sending events through mpsc channel)
- tcptop/src/collector/linux.rs accesses `event.data.data_event` and `event.data.state_event` (union access matching Plan 01 locked layout)
- tcptop/src/proc_bootstrap.rs contains `is_partial: true` (per D-15)
- tcptop/src/proc_bootstrap.rs contains `proc/net/tcp` (parsing pre-existing TCP connections)
- tcptop/src/proc_bootstrap.rs contains `proc/net/udp` OR `udp()` (parsing pre-existing UDP connections)
- tcptop/src/collector/mod.rs contains `pub mod linux`
- `cargo check -p tcptop` succeeds (the userspace crate compiles)
Linux eBPF collector implements NetworkCollector trait, loads and attaches 5 eBPF programs, reads events from RingBuf asynchronously, and converts them to CollectorEvents using union access. /proc bootstrap provides pre-existing connections marked as partial. TcpStateChange events pass PID=0 through to aggregator for enrichment. All address parsing handles both IPv4 and IPv6.
1. `cargo check -p tcptop` exits 0 (userspace with collector compiles)
2. `grep -c "kprobe" tcptop-ebpf/src/main.rs` returns >= 4
3. `grep "tracepoint" tcptop-ebpf/src/main.rs` returns at least 1 match
4. `grep "impl NetworkCollector" tcptop/src/collector/linux.rs` returns a match
5. `grep "is_partial: true" tcptop/src/proc_bootstrap.rs` returns at least 1 match
6. `grep "srtt_us >> 3" tcptop/src/collector/linux.rs` returns a match
<success_criteria>
- eBPF crate has 5 programs (4 kprobes + 1 tracepoint) writing TcptopEvent to RingBuf
- LinuxCollector implements NetworkCollector with load/attach/read/parse pipeline
- /proc bootstrap parses tcp, tcp6, udp, udp6 and marks connections as partial
- Event parsing correctly handles IPv4 and IPv6 addresses
- RTT values are shifted right by 3 in userspace
- TcpStateChange events with PID=0 are passed through (aggregator handles enrichment per Pitfall 3)
- Userspace crate compiles (eBPF crate compilation requires bpf-linker) </success_criteria>