Zachary D. Rowitsch 38e6dcc34a chore: archive v1.0 phase directories to milestones/v1.0-phases/
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 01:33:15 -04:00

phase: 01-data-pipeline
plan: 02
type: execute
wave: 2
depends_on: 01-01
files_modified:
  - tcptop-ebpf/src/main.rs
  - tcptop/src/collector/linux.rs
  - tcptop/src/proc_bootstrap.rs
autonomous: true
requirements: DATA-01, DATA-02, DATA-03, DATA-04, DATA-05, DATA-07
must_haves:
  truths:
    - eBPF programs attach to tcp_sendmsg, tcp_recvmsg, udp_sendmsg, udp_recvmsg kprobes and sock:inet_sock_set_state tracepoint
    - Each kprobe extracts PID, process name, connection tuple, byte count, and RTT (TCP only) and writes to RingBuf
    - State change tracepoint captures old/new TCP state and connection tuple
    - Linux collector loads eBPF programs, reads RingBuf events via AsyncFd, and sends CollectorEvents through mpsc channel
    - Pre-existing connections are bootstrapped from /proc/net/tcp and /proc/net/udp on startup
    - TcpStateChange events with PID=0 inherit PID from the existing connection record in the aggregator's ConnectionTable
  artifacts:
    - path: tcptop-ebpf/src/main.rs
      provides: 5 eBPF programs (4 kprobes + 1 tracepoint) writing to shared RingBuf
      contains: "#[kprobe]"
    - path: tcptop/src/collector/linux.rs
      provides: LinuxCollector implementing NetworkCollector trait
      contains: impl NetworkCollector for LinuxCollector
    - path: tcptop/src/proc_bootstrap.rs
      provides: Parser for /proc/net/tcp and /proc/net/udp
      contains: proc/net/tcp
  key_links:
    - from: tcptop-ebpf/src/main.rs
      to: tcptop-common/src/lib.rs
      via: TcptopEvent written to RingBuf
      pattern: EVENTS.reserve
    - from: tcptop/src/collector/linux.rs
      to: tcptop-ebpf/src/main.rs
      via: loads compiled eBPF bytecode and attaches programs
      pattern: include_bytes_aligned
    - from: tcptop/src/collector/linux.rs
      to: tcptop/src/collector/mod.rs
      via: implements NetworkCollector trait
      pattern: impl NetworkCollector
    - from: tcptop/src/collector/linux.rs
      to: tcptop/src/proc_bootstrap.rs
      via: calls bootstrap for pre-existing connections
      pattern: bootstrap
Implement the eBPF kernel programs and the Linux collector that loads them, reads events from the ring buffer, and translates them into CollectorEvents.

Purpose: This is the core data pipeline -- kernel eBPF programs capture real TCP/UDP events and the userspace collector makes them available through the platform-agnostic trait. This is the hardest engineering in the project.

Output: Working eBPF programs that attach to kernel hook points, a Linux collector that reads events and pushes them through an mpsc channel, and /proc bootstrapping for pre-existing connections.

<execution_context> @/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/workflows/execute-plan.md @/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/01-data-pipeline/01-CONTEXT.md @.planning/phases/01-data-pipeline/01-RESEARCH.md @.planning/phases/01-data-pipeline/01-01-SUMMARY.md

From tcptop-common/src/lib.rs:

  • TcptopEvent { event_type: u32, data: TcptopEventData }
  • TcptopEventData is a #[repr(C)] pub union with fields data_event: DataEvent and state_event: StateEvent
  • DataEvent { pid, comm, af_family, sport, dport, saddr: [u8;16], daddr: [u8;16], bytes, srtt_us }
  • StateEvent { pid, af_family, sport, dport, saddr: [u8;16], daddr: [u8;16], old_state, new_state }
  • EVENT_TCP_SEND=1, EVENT_TCP_RECV=2, EVENT_UDP_SEND=3, EVENT_UDP_RECV=4, EVENT_TCP_STATE=5
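
Since these layouts are locked, one way to sanity-check them is a standalone mirror of the types. The field types below for `bytes` (u64) and `srtt_us` (u32) are assumptions inferred from how the Task 2 collector code uses them, not confirmed from tcptop-common:

```rust
// Standalone mirror of the locked Plan 01 layout -- the u64/u32 widths
// for `bytes` and `srtt_us` are assumptions, not confirmed.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct DataEvent {
    pub pid: u32,
    pub comm: [u8; 16],
    pub af_family: u16,
    pub sport: u16,
    pub dport: u16,
    pub saddr: [u8; 16],
    pub daddr: [u8; 16],
    pub bytes: u64,
    pub srtt_us: u32,
}

#[repr(C)]
#[derive(Clone, Copy)]
pub struct StateEvent {
    pub pid: u32,
    pub af_family: u16,
    pub sport: u16,
    pub dport: u16,
    pub saddr: [u8; 16],
    pub daddr: [u8; 16],
    pub old_state: u32,
    pub new_state: u32,
}

#[repr(C)]
pub union TcptopEventData {
    pub data_event: DataEvent,
    pub state_event: StateEvent,
}

#[repr(C)]
pub struct TcptopEvent {
    pub event_type: u32, // tag selecting which union view is valid
    pub data: TcptopEventData,
}

pub const EVENT_TCP_STATE: u32 = 5;

fn main() {
    // The union occupies as much space as its largest member; the
    // event_type tag lives outside the union.
    assert!(std::mem::size_of::<TcptopEventData>() >= std::mem::size_of::<DataEvent>());
    assert!(std::mem::size_of::<TcptopEventData>() >= std::mem::size_of::<StateEvent>());

    let ev = TcptopEvent {
        event_type: EVENT_TCP_STATE,
        data: TcptopEventData {
            state_event: StateEvent {
                pid: 0, af_family: 2, sport: 443, dport: 51000,
                saddr: [0; 16], daddr: [0; 16], old_state: 1, new_state: 4,
            },
        },
    };
    // Reading a union field is unsafe: only the view named by the tag is valid.
    let s = unsafe { ev.data.state_event };
    println!("old_state={} new_state={}", s.old_state, s.new_state);
}
```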

From tcptop/src/collector/mod.rs:

  • trait NetworkCollector { async fn start(&mut self, tx: mpsc::Sender<CollectorEvent>) -> Result<()>; async fn stop(&mut self) -> Result<()>; fn bootstrap_existing(&self) -> Result<Vec<ConnectionRecord>>; }
  • enum CollectorEvent { TcpSend{..}, TcpRecv{..}, UdpSend{..}, UdpRecv{..}, TcpStateChange{..} }
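
The `{..}` fields are elided above; reconstructed from how `parse_event` in Task 2 builds each variant, this is roughly the shape (treat the exact field list, and the stand-in `ConnectionKey`/`Protocol` definitions, as assumptions):

```rust
use std::net::IpAddr;

// Hypothetical stand-ins for crate::model types, only for this sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Protocol { Tcp, Udp }

#[derive(Debug, Clone, PartialEq)]
pub struct ConnectionKey {
    pub protocol: Protocol,
    pub local_addr: IpAddr,
    pub local_port: u16,
    pub remote_addr: IpAddr,
    pub remote_port: u16,
}

// Field lists inferred from the Task 2 parse_event code (assumption).
pub enum CollectorEvent {
    TcpSend { key: ConnectionKey, pid: u32, comm: String, bytes: u64, srtt_us: u32 },
    TcpRecv { key: ConnectionKey, pid: u32, comm: String, bytes: u64, srtt_us: u32 },
    UdpSend { key: ConnectionKey, pid: u32, comm: String, bytes: u64 },
    UdpRecv { key: ConnectionKey, pid: u32, comm: String, bytes: u64 },
    TcpStateChange { key: ConnectionKey, pid: u32, old_state: u32, new_state: u32 },
}

fn describe(ev: &CollectorEvent) -> &'static str {
    match ev {
        CollectorEvent::TcpSend { .. } => "tcp-send",
        CollectorEvent::TcpRecv { .. } => "tcp-recv",
        CollectorEvent::UdpSend { .. } => "udp-send",
        CollectorEvent::UdpRecv { .. } => "udp-recv",
        CollectorEvent::TcpStateChange { .. } => "tcp-state",
    }
}

fn main() {
    let key = ConnectionKey {
        protocol: Protocol::Tcp,
        local_addr: "127.0.0.1".parse().unwrap(),
        local_port: 8080,
        remote_addr: "127.0.0.1".parse().unwrap(),
        remote_port: 50000,
    };
    let ev = CollectorEvent::TcpSend { key, pid: 1234, comm: "curl".into(), bytes: 512, srtt_us: 1800 };
    println!("{}", describe(&ev)); // prints "tcp-send"
}
```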

From tcptop/src/model.rs:

  • ConnectionKey { protocol, local_addr, local_port, remote_addr, remote_port }
  • ConnectionRecord { key, pid, process_name, tcp_state, bytes_in/out, packets_in/out, rate_in/out, rtt_us, last_seen, is_partial, is_closed }
  • Protocol { Tcp, Udp }
  • TcpState with from_kernel(u32) method
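
`from_kernel` maps the kernel's numeric TCP states (include/net/tcp_states.h, where TCP_ESTABLISHED = 1 through TCP_NEW_SYN_RECV = 12) to the enum. The variant names below are assumptions; the numbering is the kernel's, and returning `Option` matches the bootstrap code assigning `None` for UDP:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TcpState {
    Established, SynSent, SynRecv, FinWait1, FinWait2, TimeWait,
    Close, CloseWait, LastAck, Listen, Closing, NewSynRecv,
}

impl TcpState {
    // Numeric values follow include/net/tcp_states.h.
    pub fn from_kernel(v: u32) -> Option<TcpState> {
        Some(match v {
            1 => TcpState::Established,
            2 => TcpState::SynSent,
            3 => TcpState::SynRecv,
            4 => TcpState::FinWait1,
            5 => TcpState::FinWait2,
            6 => TcpState::TimeWait,
            7 => TcpState::Close,
            8 => TcpState::CloseWait,
            9 => TcpState::LastAck,
            10 => TcpState::Listen,
            11 => TcpState::Closing,
            12 => TcpState::NewSynRecv,
            _ => return None, // unknown or future kernel state
        })
    }
}

fn main() {
    assert_eq!(TcpState::from_kernel(1), Some(TcpState::Established));
    assert_eq!(TcpState::from_kernel(10), Some(TcpState::Listen));
    assert_eq!(TcpState::from_kernel(0), None);
    println!("ok");
}
```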
**Task 1: Implement eBPF kernel programs (kprobes + tracepoint)**

Files: tcptop-ebpf/src/main.rs

Context: tcptop-ebpf/src/main.rs (current skeleton from Plan 01), tcptop-common/src/lib.rs (shared types -- must use these exact struct layouts including the union), .planning/phases/01-data-pipeline/01-RESEARCH.md (hook strategy, code examples, pitfalls 1-5)

Replace the skeleton eBPF program with full implementations of all 5 hook points. Every program writes a TcptopEvent to the shared EVENTS RingBuf.
**eBPF programs to implement:**

1. **tcp_sendmsg kprobe** -- attach to `tcp_sendmsg`
   - arg0: `*sock` -- extract connection tuple (saddr, daddr, sport, dport, af_family) via `bpf_probe_read_kernel` from `sock->__sk_common`
   - arg2: `size` -- byte count for this send call
   - Read `srtt_us` from `tcp_sock` (cast sock ptr): the field stores smoothed RTT shifted left by 3, so read raw value (userspace will shift right by 3 per Pitfall 5)
   - Get PID via `bpf_get_current_pid_tgid() >> 32`
   - Get process name via `bpf_get_current_comm()`
   - Write TcptopEvent with event_type = EVENT_TCP_SEND

2. **tcp_recvmsg kprobe** -- attach to `tcp_recvmsg`
   - arg0: `*sock` -- same tuple extraction as tcp_sendmsg
   - For byte count: use kretprobe approach. If aya supports kretprobe (via `#[kretprobe]`), capture return value as bytes received. If kretprobe is not straightforward, capture from msghdr arg or use a BPF HashMap to store entry args and read on return.
   - Read `srtt_us` same as tcp_sendmsg
   - Write TcptopEvent with event_type = EVENT_TCP_RECV
   - NOTE: If kretprobe is complex, start with kprobe on entry and use msg->msg_iter.count for byte estimate. Document the approach taken.

3. **udp_sendmsg kprobe** -- attach to `udp_sendmsg`
   - arg0: `*sock`, arg2: `size` (byte count)
   - Extract 4-tuple from sock->__sk_common
   - srtt_us = 0 (UDP has no RTT)
   - Write TcptopEvent with event_type = EVENT_UDP_SEND

4. **udp_recvmsg kprobe** -- attach to `udp_recvmsg`
   - Same approach as tcp_recvmsg for byte count
   - srtt_us = 0
   - Write TcptopEvent with event_type = EVENT_UDP_RECV

5. **inet_sock_set_state tracepoint** -- attach to `sock:inet_sock_set_state`
   - Read tracepoint args: old_state, new_state, sport, dport, saddr, daddr, af_family
   - PID: use `bpf_get_current_pid_tgid() >> 32`, but note Pitfall 3 -- this tracepoint often fires in softirq context, where the current task is not the socket owner and the helper reports PID 0.
   - **PID=0 handling approach (chosen: option b -- aggregator-side enrichment):** The eBPF program writes PID=0 as-is. The userspace aggregator in Plan 03 handles this: when `ConnectionTable.update()` receives a `TcpStateChange` with `pid=0`, it looks up the connection by key in the existing table and inherits the PID from the existing `ConnectionRecord`. This works because kprobes on tcp_sendmsg/tcp_recvmsg will have already populated the connection with the correct PID before any state change fires. This avoids adding a BPF HashMap for sock_ptr->pid tracking, keeping the eBPF programs simpler.
   - Write TcptopEvent with event_type = EVENT_TCP_STATE
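
A minimal sketch of the aggregator-side enrichment described in option b (the real `ConnectionTable` lives in Plan 03; the key and record types here are placeholders for ConnectionKey/ConnectionRecord):

```rust
use std::collections::HashMap;

// Placeholder types: (local_port, remote_port) stands in for the full
// connection key; Record stands in for ConnectionRecord.
type Key = (u16, u16);
struct Record { pid: u32 }

struct ConnectionTable { conns: HashMap<Key, Record> }

impl ConnectionTable {
    fn new() -> Self { Self { conns: HashMap::new() } }

    // Data-path events (tcp_sendmsg/tcp_recvmsg kprobes) always carry the real PID.
    fn on_data(&mut self, key: Key, pid: u32) {
        self.conns.entry(key).or_insert(Record { pid }).pid = pid;
    }

    // State changes may arrive with pid == 0 (softirq context); in that case
    // inherit the PID from the record the data-path events already created.
    fn on_state_change(&mut self, key: Key, event_pid: u32) -> u32 {
        match self.conns.get_mut(&key) {
            Some(rec) => {
                if event_pid != 0 { rec.pid = event_pid; }
                rec.pid
            }
            None => event_pid, // unknown connection: nothing to inherit
        }
    }
}

fn main() {
    let mut table = ConnectionTable::new();
    table.on_data((44321, 443), 1234);                // kprobe populated the PID
    let pid = table.on_state_change((44321, 443), 0); // tracepoint fired with PID=0
    assert_eq!(pid, 1234);                            // inherited from existing record
    println!("enriched pid = {}", pid);
}
```

This is why no BPF HashMap is needed kernel-side: by the time a state change fires, a send or recv kprobe has usually already recorded the owning PID.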

**Critical eBPF constraints:**
- All `#[repr(C)]` structs from tcptop-common, no deviation. TcptopEventData is a **union** (locked in Plan 01).
- Use `bpf_probe_read_kernel` for every kernel pointer dereference
- No unbounded loops, no dynamic allocation
- Stack limit: 512 bytes per program -- don't copy entire structs onto stack
- Handle `EVENTS.reserve::<TcptopEvent>(0)` returning `None` gracefully (just return 0, event is dropped)
- For IPv4: write 4 bytes to saddr[0..4], zero the rest. Set af_family = AF_INET (2)
- For IPv6: write 16 bytes to saddr[0..16]. Set af_family = AF_INET6 (10)
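
The last two constraints can be exercised as a self-contained round-trip: `encode` mirrors what the eBPF side writes into the `[u8; 16]` buffers, and `parse` mirrors the userspace decoding in Task 2:

```rust
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

const AF_INET: u16 = 2;
const AF_INET6: u16 = 10;

// eBPF side: IPv4 occupies bytes [0..4] with the rest zeroed; IPv6 fills all 16.
fn encode(addr: IpAddr) -> (u16, [u8; 16]) {
    let mut buf = [0u8; 16];
    match addr {
        IpAddr::V4(v4) => { buf[..4].copy_from_slice(&v4.octets()); (AF_INET, buf) }
        IpAddr::V6(v6) => { buf.copy_from_slice(&v6.octets()); (AF_INET6, buf) }
    }
}

// Userspace side: af_family selects how many bytes are meaningful.
fn parse(af_family: u16, bytes: &[u8; 16]) -> IpAddr {
    if af_family == AF_INET {
        IpAddr::V4(Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3]))
    } else {
        IpAddr::V6(Ipv6Addr::from(*bytes))
    }
}

fn main() {
    for s in ["192.0.2.1", "2001:db8::1"] {
        let addr: IpAddr = s.parse().unwrap();
        let (af, buf) = encode(addr);
        assert_eq!(parse(af, &buf), addr); // round-trips for both families
    }
    println!("round-trip ok");
}
```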

**Helper function pattern:**
Extract a shared helper for reading the sock tuple to avoid code duplication across the 4 kprobes:
```rust
#[inline(always)]
unsafe fn read_sock_tuple(sock: *const core::ffi::c_void) -> Result<(u16, [u8;16], [u8;16], u16, u16), i64> {
    // Read __sk_common.skc_family, skc_rcv_saddr, skc_daddr, skc_num, skc_dport
    // using bpf_probe_read_kernel for each field
}
```

**Drop counter (per Pitfall 7):**
Add an eBPF array map for counting dropped events:
```rust
#[map]
static DROP_COUNT: Array<u32> = Array::with_max_entries(1, 0);
```
Increment when `EVENTS.reserve()` returns None.
**Verification:** `cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop-ebpf 2>&1 | tail -10`

- tcptop-ebpf/src/main.rs contains `#[kprobe]` at least 4 times (tcp_sendmsg, tcp_recvmsg, udp_sendmsg, udp_recvmsg)
- tcptop-ebpf/src/main.rs contains `#[tracepoint]` at least 1 time (inet_sock_set_state)
- tcptop-ebpf/src/main.rs contains `EVENTS: RingBuf`
- tcptop-ebpf/src/main.rs contains `EVENTS.reserve` (writing events to ring buffer)
- tcptop-ebpf/src/main.rs contains `bpf_get_current_pid_tgid` (PID capture)
- tcptop-ebpf/src/main.rs contains `bpf_get_current_comm` (process name capture)
- tcptop-ebpf/src/main.rs contains `bpf_probe_read_kernel` (safe kernel memory reads)
- tcptop-ebpf/src/main.rs contains `srtt_us` (RTT reading from tcp_sock per DATA-05)
- tcptop-ebpf/src/main.rs contains `DROP_COUNT` (drop counter per Pitfall 7)
- tcptop-ebpf/src/main.rs contains `use tcptop_common` (imports shared types)
- tcptop-ebpf/src/main.rs contains `EVENT_TCP_SEND` AND `EVENT_UDP_SEND` AND `EVENT_TCP_STATE`
- tcptop-ebpf/src/main.rs accesses union fields via `.data.data_event` and `.data.state_event` (matches locked union layout from Plan 01)
- Note: `cargo check -p tcptop-ebpf` may fail without bpf-linker installed -- that is acceptable. The code should be syntactically valid Rust.

**Done:** Five eBPF programs implemented: 4 kprobes for tcp_sendmsg/tcp_recvmsg/udp_sendmsg/udp_recvmsg and 1 tracepoint for inet_sock_set_state. All write TcptopEvent to shared RingBuf using union layout. State change tracepoint passes PID=0 through to userspace (aggregator handles enrichment). Drop counter tracks ring buffer overflow.
**Task 2: Implement Linux collector and /proc bootstrap**

Files: tcptop/src/collector/linux.rs, tcptop/src/collector/mod.rs, tcptop/src/proc_bootstrap.rs

Context: tcptop/src/collector/mod.rs (NetworkCollector trait and CollectorEvent enum), tcptop/src/model.rs (ConnectionRecord, ConnectionKey, Protocol, TcpState), tcptop-common/src/lib.rs (shared types for deserializing ring buffer events -- note TcptopEventData is a union), tcptop-ebpf/src/main.rs (to understand what the eBPF programs produce), .planning/phases/01-data-pipeline/01-RESEARCH.md (Pattern 1 userspace consumer code, procfs usage, anti-patterns)

Implement the Linux/eBPF collector and the /proc/net bootstrapper.
**tcptop/src/proc_bootstrap.rs (per D-15):**
Use the `procfs` crate to parse pre-existing connections:

```rust
use crate::model::{ConnectionKey, ConnectionRecord, Protocol, TcpState};
use anyhow::Result;
use std::net::IpAddr;
use std::time::Instant;

pub fn bootstrap_connections() -> Result<Vec<ConnectionRecord>> {
    let mut connections = Vec::new();

    // Parse /proc/net/tcp for TCP connections
    if let Ok(tcp) = procfs::net::tcp() {
        for entry in tcp {
            let key = ConnectionKey {
                protocol: Protocol::Tcp,
                local_addr: entry.local_address.ip(),   // procfs crate returns SocketAddr
                local_port: entry.local_address.port(),
                remote_addr: entry.remote_address.ip(),
                remote_port: entry.remote_address.port(),
            };
            connections.push(ConnectionRecord {
                key,
                pid: entry.inode as u32,  // inode, not PID -- need /proc/*/fd walk to map
                process_name: String::new(), // enriched later
                tcp_state: TcpState::from_kernel(entry.state as u32),
                bytes_in: 0,
                bytes_out: 0,
                packets_in: 0,
                packets_out: 0,
                rate_in: 0.0,
                rate_out: 0.0,
                prev_bytes_in: 0,
                prev_bytes_out: 0,
                rtt_us: None,
                last_seen: Instant::now(),
                is_partial: true,   // per D-15: mark as partial
                is_closed: false,
            });
        }
    }

    // Parse /proc/net/udp for UDP connections
    if let Ok(udp) = procfs::net::udp() {
        for entry in udp {
            let key = ConnectionKey {
                protocol: Protocol::Udp,
                local_addr: entry.local_address.ip(),
                local_port: entry.local_address.port(),
                remote_addr: entry.remote_address.ip(),
                remote_port: entry.remote_address.port(),
            };
            connections.push(ConnectionRecord {
                key,
                pid: entry.inode as u32,
                process_name: String::new(),
                tcp_state: None,      // per D-07: no state for UDP
                bytes_in: 0,
                bytes_out: 0,
                packets_in: 0,
                packets_out: 0,
                rate_in: 0.0,
                rate_out: 0.0,
                prev_bytes_in: 0,
                prev_bytes_out: 0,
                rtt_us: None,
                last_seen: Instant::now(),
                is_partial: true,
                is_closed: false,
            });
        }
    }

    // Also parse /proc/net/tcp6 and /proc/net/udp6 for IPv6
    // procfs crate may handle this via tcp6() and udp6() methods

    Ok(connections)
}

/// Walk /proc/*/fd to map socket inodes to PIDs
/// This enriches the inode-based entries from /proc/net/*
pub fn enrich_pids(connections: &mut [ConnectionRecord]) -> Result<()> {
    // Use procfs::process::all_processes() to iterate /proc/*/fd
    // For each process, read fd symlinks, find socket:[inode] entries
    // Map inode -> PID and update matching ConnectionRecords
    // This is best-effort -- skip processes we can't read
    if let Ok(procs) = procfs::process::all_processes() {
        for proc_result in procs {
            if let Ok(proc) = proc_result {
                let pid = proc.pid() as u32;
                let comm = proc.stat().map(|s| s.comm).unwrap_or_default();
                if let Ok(fds) = proc.fd() {
                    for fd_result in fds {
                        if let Ok(fd_info) = fd_result {
                            // Check if fd target is a socket
                            // Match inode to connection records
                            // Update pid and process_name
                        }
                    }
                }
            }
        }
    }
    Ok(())
}
```

Note: The exact `procfs` crate API may differ from the above pseudocode. The executor MUST read the actual procfs 0.18 API (check docs.rs/procfs) and adjust method names. The key behaviors are:
- Parse /proc/net/tcp, /proc/net/tcp6, /proc/net/udp, /proc/net/udp6
- Map socket inodes to PIDs via /proc/*/fd
- Mark all bootstrapped connections as `is_partial: true`
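
The procfs crate handles the raw parsing, but for reference: /proc/net/tcp prints each IPv4 endpoint as `ADDR:PORT` in hex, with the address in the kernel's native byte order (so 127.0.0.1 appears as `0100007F` on little-endian machines) and the port in plain host-order hex. A hand-rolled parser for that little-endian case:

```rust
use std::net::Ipv4Addr;

// Parse one /proc/net/tcp IPv4 endpoint field such as "0100007F:0050".
// Assumes a little-endian host, where the 8 hex digits are the address
// bytes reversed; the port is plain hex.
fn parse_v4_endpoint(field: &str) -> Option<(Ipv4Addr, u16)> {
    let (addr_hex, port_hex) = field.split_once(':')?;
    let raw = u32::from_str_radix(addr_hex, 16).ok()?;
    let port = u16::from_str_radix(port_hex, 16).ok()?;
    // to_le_bytes undoes the reversal: 0x0100007F -> [0x7F, 0x00, 0x00, 0x01]
    Some((Ipv4Addr::from(raw.to_le_bytes()), port))
}

fn main() {
    let (addr, port) = parse_v4_endpoint("0100007F:0050").unwrap();
    assert_eq!(addr, Ipv4Addr::new(127, 0, 0, 1));
    assert_eq!(port, 80);
    println!("{}:{}", addr, port); // 127.0.0.1:80
}
```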

**tcptop/src/collector/linux.rs:**
Implement `LinuxCollector` struct that implements `NetworkCollector` trait.

```rust
use super::{CollectorEvent, NetworkCollector};
use crate::model::{ConnectionKey, ConnectionRecord, Protocol};
use crate::proc_bootstrap;
use anyhow::{Context, Result};
use aya::maps::RingBuf;
use aya::programs::{KProbe, TracePoint};
use aya::Ebpf;
use log::{info, warn};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use tcptop_common::*;
use tokio::io::unix::AsyncFd;
use tokio::sync::mpsc;
use std::os::fd::AsRawFd;

pub struct LinuxCollector {
    bpf: Option<Ebpf>,
}

impl LinuxCollector {
    pub fn new() -> Result<Self> {
        Ok(Self { bpf: None })
    }

    fn load_and_attach(&mut self) -> Result<()> {
        // Load eBPF bytecode compiled by aya-build
        let mut bpf = Ebpf::load(
            aya::include_bytes_aligned!(concat!(env!("OUT_DIR"), "/tcptop"))
        )?;

        // Initialize aya-log for kernel-side debug logging
        if let Err(e) = aya_log::EbpfLogger::init(&mut bpf) {
            warn!("Failed to init eBPF logger: {}", e);
        }

        // Attach kprobes
        let program: &mut KProbe = bpf.program_mut("tcp_sendmsg").unwrap().try_into()?;
        program.load()?;
        program.attach("tcp_sendmsg", 0)?;

        let program: &mut KProbe = bpf.program_mut("tcp_recvmsg").unwrap().try_into()?;
        program.load()?;
        program.attach("tcp_recvmsg", 0)?;

        let program: &mut KProbe = bpf.program_mut("udp_sendmsg").unwrap().try_into()?;
        program.load()?;
        program.attach("udp_sendmsg", 0)?;

        let program: &mut KProbe = bpf.program_mut("udp_recvmsg").unwrap().try_into()?;
        program.load()?;
        program.attach("udp_recvmsg", 0)?;

        // Attach tracepoint
        let program: &mut TracePoint = bpf.program_mut("inet_sock_set_state").unwrap().try_into()?;
        program.load()?;
        program.attach("sock", "inet_sock_set_state")?;

        self.bpf = Some(bpf);
        Ok(())
    }

    fn parse_event(raw: &[u8]) -> Option<CollectorEvent> {
        if raw.len() < core::mem::size_of::<TcptopEvent>() {
            return None;
        }
        let event: &TcptopEvent = unsafe { &*(raw.as_ptr() as *const TcptopEvent) };

        match event.event_type {
            EVENT_TCP_SEND | EVENT_TCP_RECV => {
                let d = unsafe { &event.data.data_event };
                let key = Self::data_event_to_key(d, Protocol::Tcp);
                let comm = Self::comm_to_string(&d.comm);
                let srtt = d.srtt_us >> 3; // Pitfall 5: shift right by 3 for actual microseconds
                if event.event_type == EVENT_TCP_SEND {
                    Some(CollectorEvent::TcpSend { key, pid: d.pid, comm, bytes: d.bytes, srtt_us: srtt })
                } else {
                    Some(CollectorEvent::TcpRecv { key, pid: d.pid, comm, bytes: d.bytes, srtt_us: srtt })
                }
            }
            EVENT_UDP_SEND | EVENT_UDP_RECV => {
                let d = unsafe { &event.data.data_event };
                let key = Self::data_event_to_key(d, Protocol::Udp);
                let comm = Self::comm_to_string(&d.comm);
                if event.event_type == EVENT_UDP_SEND {
                    Some(CollectorEvent::UdpSend { key, pid: d.pid, comm, bytes: d.bytes })
                } else {
                    Some(CollectorEvent::UdpRecv { key, pid: d.pid, comm, bytes: d.bytes })
                }
            }
            EVENT_TCP_STATE => {
                let s = unsafe { &event.data.state_event };
                let key = ConnectionKey {
                    protocol: Protocol::Tcp,
                    local_addr: Self::parse_addr(s.af_family, &s.saddr),
                    local_port: s.sport,
                    remote_addr: Self::parse_addr(s.af_family, &s.daddr),
                    remote_port: s.dport,
                };
                Some(CollectorEvent::TcpStateChange { key, pid: s.pid, old_state: s.old_state, new_state: s.new_state })
            }
            _ => None,
        }
    }

    fn data_event_to_key(d: &DataEvent, protocol: Protocol) -> ConnectionKey {
        ConnectionKey {
            protocol,
            local_addr: Self::parse_addr(d.af_family, &d.saddr),
            local_port: d.sport,
            remote_addr: Self::parse_addr(d.af_family, &d.daddr),
            remote_port: d.dport,
        }
    }

    fn parse_addr(af_family: u16, bytes: &[u8; 16]) -> IpAddr {
        if af_family == AF_INET {
            IpAddr::V4(Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3]))
        } else {
            IpAddr::V6(Ipv6Addr::from(*bytes))
        }
    }

    fn comm_to_string(comm: &[u8; 16]) -> String {
        let end = comm.iter().position(|&b| b == 0).unwrap_or(16);
        String::from_utf8_lossy(&comm[..end]).to_string()
    }
}

#[async_trait::async_trait]
impl NetworkCollector for LinuxCollector {
    async fn start(&mut self, tx: mpsc::Sender<CollectorEvent>) -> Result<()> {
        self.load_and_attach().context("Failed to load eBPF programs")?;

        let bpf = self.bpf.as_mut().unwrap();
        let ring_buf = RingBuf::try_from(bpf.map_mut("EVENTS").unwrap())?;
        let mut async_fd = AsyncFd::new(ring_buf)?;
        // The exact AsyncFd wrapping may differ -- aya's RingBuf may need
        // to be wrapped differently. Check aya 0.13 docs for the async ring buffer API.
        // Alternative: use aya::maps::ring_buf::RingBuf with poll-based reading in a tokio::spawn.

        info!("eBPF programs attached, reading events...");

        loop {
            let mut guard = async_fd.readable_mut().await?;
            // `ring_buf` was moved into the AsyncFd above, so drain it
            // through the guard rather than the moved local binding.
            while let Some(item) = guard.get_inner_mut().next() {
                if let Some(event) = Self::parse_event(&item) {
                    if tx.send(event).await.is_err() {
                        return Ok(()); // receiver dropped, shutting down
                    }
                }
            }
            guard.clear_ready();
        }
    }

    async fn stop(&mut self) -> Result<()> {
        // Dropping the Ebpf struct detaches all programs
        self.bpf.take();
        Ok(())
    }

    fn bootstrap_existing(&self) -> Result<Vec<ConnectionRecord>> {
        proc_bootstrap::bootstrap_connections()
    }
}
```

**IMPORTANT:** The exact aya 0.13 API for async RingBuf reading may differ from the above pseudocode. The executor MUST:
1. Check if `aya::maps::RingBuf` implements `AsRawFd` (needed for AsyncFd)
2. If not, check for `aya::maps::ring_buf::AsyncRingBuf` or similar
3. Check if `include_bytes_aligned!` is the correct macro path
4. Adapt the loading and reading code to match the actual aya 0.13.1 API

The key behaviors that MUST be preserved regardless of API details:
- Load compiled eBPF bytecode
- Attach to all 5 hook points
- Read events from ring buffer asynchronously
- Parse TcptopEvent using the **union** layout: access `event.data.data_event` or `event.data.state_event` based on `event_type` tag
- Send events through mpsc channel
- Apply srtt_us >> 3 for RTT conversion (Pitfall 5)
- Handle IPv4 and IPv6 addresses correctly

**Update tcptop/src/collector/mod.rs:**
Uncomment or add `pub mod linux;` if it was a placeholder. Make sure `linux.rs` is properly declared as a submodule.
**Verification:** `cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop 2>&1 | tail -10`

- tcptop/src/collector/linux.rs contains `impl NetworkCollector for LinuxCollector`
- tcptop/src/collector/linux.rs contains `include_bytes_aligned!` (loading eBPF bytecode)
- tcptop/src/collector/linux.rs contains `KProbe` AND `TracePoint` (program types)
- tcptop/src/collector/linux.rs contains `tcp_sendmsg` AND `tcp_recvmsg` AND `udp_sendmsg` AND `udp_recvmsg` (kprobe attachment names)
- tcptop/src/collector/linux.rs contains `inet_sock_set_state` (tracepoint attachment)
- tcptop/src/collector/linux.rs contains `srtt_us >> 3` (RTT shift per Pitfall 5)
- tcptop/src/collector/linux.rs contains `AF_INET` AND `Ipv4Addr` AND `Ipv6Addr` (dual-stack address parsing)
- tcptop/src/collector/linux.rs contains `tx.send` (sending events through mpsc channel)
- tcptop/src/collector/linux.rs accesses `event.data.data_event` and `event.data.state_event` (union access matching Plan 01 locked layout)
- tcptop/src/proc_bootstrap.rs contains `is_partial: true` (per D-15)
- tcptop/src/proc_bootstrap.rs contains `proc/net/tcp` (parsing pre-existing TCP connections)
- tcptop/src/proc_bootstrap.rs contains `proc/net/udp` OR `udp()` (parsing pre-existing UDP connections)
- tcptop/src/collector/mod.rs contains `pub mod linux`
- `cargo check -p tcptop` succeeds (the userspace crate compiles)

**Done:** Linux eBPF collector implements NetworkCollector trait, loads and attaches 5 eBPF programs, reads events from RingBuf asynchronously, and converts them to CollectorEvents using union access. /proc bootstrap provides pre-existing connections marked as partial. TcpStateChange events pass PID=0 through to aggregator for enrichment. All address parsing handles both IPv4 and IPv6.

**Final checks:**

1. `cargo check -p tcptop` exits 0 (userspace with collector compiles)
2. `grep -c "kprobe" tcptop-ebpf/src/main.rs` returns >= 4
3. `grep "tracepoint" tcptop-ebpf/src/main.rs` returns at least 1 match
4. `grep "impl NetworkCollector" tcptop/src/collector/linux.rs` returns a match
5. `grep "is_partial: true" tcptop/src/proc_bootstrap.rs` returns at least 1 match
6. `grep "srtt_us >> 3" tcptop/src/collector/linux.rs` returns a match

<success_criteria>

  • eBPF crate has 5 programs (4 kprobes + 1 tracepoint) writing TcptopEvent to RingBuf
  • LinuxCollector implements NetworkCollector with load/attach/read/parse pipeline
  • /proc bootstrap parses tcp, tcp6, udp, udp6 and marks connections as partial
  • Event parsing correctly handles IPv4 and IPv6 addresses
  • RTT values are shifted right by 3 in userspace
  • TcpStateChange events with PID=0 are passed through (aggregator handles enrichment per Pitfall 3)
  • Userspace crate compiles (eBPF crate compilation requires bpf-linker) </success_criteria>
After completion, create `.planning/phases/01-data-pipeline/01-02-SUMMARY.md`