---
phase: 01-data-pipeline
plan: 02
type: execute
wave: 2
depends_on: ["01-01"]
files_modified:
  - tcptop-ebpf/src/main.rs
  - tcptop/src/collector/linux.rs
  - tcptop/src/proc_bootstrap.rs
autonomous: true
requirements:
  - DATA-01
  - DATA-02
  - DATA-03
  - DATA-04
  - DATA-05
  - DATA-07
must_haves:
  truths:
    - "eBPF programs attach to tcp_sendmsg, tcp_recvmsg, udp_sendmsg, udp_recvmsg kprobes and sock:inet_sock_set_state tracepoint"
    - "Each kprobe extracts PID, process name, connection tuple, byte count, and RTT (TCP only) and writes to RingBuf"
    - "State change tracepoint captures old/new TCP state and connection tuple"
    - "Linux collector loads eBPF programs, reads RingBuf events via AsyncFd, and sends CollectorEvents through mpsc channel"
    - "Pre-existing connections are bootstrapped from /proc/net/tcp and /proc/net/udp on startup"
    - "TcpStateChange events with PID=0 inherit PID from the existing connection record in the aggregator's ConnectionTable"
  artifacts:
    - path: "tcptop-ebpf/src/main.rs"
      provides: "5 eBPF programs (4 kprobes + 1 tracepoint) writing to shared RingBuf"
      contains: "#[kprobe]"
    - path: "tcptop/src/collector/linux.rs"
      provides: "LinuxCollector implementing NetworkCollector trait"
      contains: "impl NetworkCollector for LinuxCollector"
    - path: "tcptop/src/proc_bootstrap.rs"
      provides: "Parser for /proc/net/tcp and /proc/net/udp"
      contains: "proc/net/tcp"
  key_links:
    - from: "tcptop-ebpf/src/main.rs"
      to: "tcptop-common/src/lib.rs"
      via: "TcptopEvent written to RingBuf"
      pattern: "EVENTS\\.reserve"
    - from: "tcptop/src/collector/linux.rs"
      to: "tcptop-ebpf/src/main.rs"
      via: "loads compiled eBPF bytecode and attaches programs"
      pattern: "include_bytes_aligned"
    - from: "tcptop/src/collector/linux.rs"
      to: "tcptop/src/collector/mod.rs"
      via: "implements NetworkCollector trait"
      pattern: "impl NetworkCollector"
    - from: "tcptop/src/collector/linux.rs"
      to: "tcptop/src/proc_bootstrap.rs"
      via: "calls bootstrap for pre-existing connections"
      pattern: "bootstrap"
---

<objective>
Implement the eBPF kernel programs and the Linux collector that loads them, reads events from the ring buffer, and translates them into CollectorEvents.

Purpose: This is the core data pipeline -- kernel eBPF programs capture real TCP/UDP events, and the userspace collector makes them available through the platform-agnostic trait. This is the hardest engineering in the project.

Output: Working eBPF programs that attach to kernel hook points, a Linux collector that reads events and pushes them through an mpsc channel, and /proc bootstrapping for pre-existing connections.
</objective>

<execution_context>
@/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/workflows/execute-plan.md
@/Users/zrowitsch/local_src/tcptop/.claude/get-shit-done/templates/summary.md
</execution_context>

<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-data-pipeline/01-CONTEXT.md
@.planning/phases/01-data-pipeline/01-RESEARCH.md
@.planning/phases/01-data-pipeline/01-01-SUMMARY.md

<interfaces>
<!-- From Plan 01 -- types the executor must use -->

From tcptop-common/src/lib.rs:
- TcptopEvent { event_type: u32, data: TcptopEventData }
- TcptopEventData is a `#[repr(C)] pub union` with fields `data_event: DataEvent` and `state_event: StateEvent`
- DataEvent { pid, comm, af_family, sport, dport, saddr: [u8;16], daddr: [u8;16], bytes, srtt_us }
- StateEvent { pid, af_family, sport, dport, saddr: [u8;16], daddr: [u8;16], old_state, new_state }
- EVENT_TCP_SEND=1, EVENT_TCP_RECV=2, EVENT_UDP_SEND=3, EVENT_UDP_RECV=4, EVENT_TCP_STATE=5

From tcptop/src/collector/mod.rs:
- trait NetworkCollector { async fn start(&mut self, tx: mpsc::Sender<CollectorEvent>); async fn stop(&mut self); fn bootstrap_existing(&self) -> Result<Vec<ConnectionRecord>>; }
- enum CollectorEvent { TcpSend{..}, TcpRecv{..}, UdpSend{..}, UdpRecv{..}, TcpStateChange{..} }

From tcptop/src/model.rs:
- ConnectionKey { protocol, local_addr, local_port, remote_addr, remote_port }
- ConnectionRecord { key, pid, process_name, tcp_state, bytes_in/out, packets_in/out, rate_in/out, rtt_us, last_seen, is_partial, is_closed }
- Protocol { Tcp, Udp }
- TcpState with from_kernel(u32) method
</interfaces>
</context>

<tasks>

<task type="auto">
<name>Task 1: Implement eBPF kernel programs (kprobes + tracepoint)</name>
<files>
tcptop-ebpf/src/main.rs
</files>
<read_first>
tcptop-ebpf/src/main.rs (current skeleton from Plan 01),
tcptop-common/src/lib.rs (shared types -- must use these exact struct layouts, including the union),
.planning/phases/01-data-pipeline/01-RESEARCH.md (hook strategy, code examples, pitfalls 1-5)
</read_first>
<action>
Replace the skeleton eBPF program with full implementations of all 5 hook points. Every program writes a TcptopEvent to the shared EVENTS RingBuf.

**eBPF programs to implement:**

1. **tcp_sendmsg kprobe** -- attach to `tcp_sendmsg`
   - arg0: `*sock` -- extract the connection tuple (saddr, daddr, sport, dport, af_family) via `bpf_probe_read_kernel` from `sock->__sk_common`
   - arg2: `size` -- byte count for this send call
   - Read `srtt_us` from `tcp_sock` (cast the sock ptr): the field stores the smoothed RTT shifted left by 3, so read the raw value (userspace shifts right by 3 per Pitfall 5)
   - Get the PID via `bpf_get_current_pid_tgid() >> 32`
   - Get the process name via `bpf_get_current_comm()`
   - Write a TcptopEvent with event_type = EVENT_TCP_SEND

2. **tcp_recvmsg kprobe** -- attach to `tcp_recvmsg`
   - arg0: `*sock` -- same tuple extraction as tcp_sendmsg
   - For the byte count: use the kretprobe approach. If aya supports kretprobes (via `#[kretprobe]`), capture the return value as bytes received. If a kretprobe is not straightforward, capture from the msghdr arg or use a BPF HashMap to store the entry args and read them on return.
   - Read `srtt_us` the same way as tcp_sendmsg
   - Write a TcptopEvent with event_type = EVENT_TCP_RECV
   - NOTE: If the kretprobe is complex, start with a kprobe on entry and use msg->msg_iter.count as a byte estimate. Document the approach taken.
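
The entry/return pairing described in item 2 can be modeled in plain userspace Rust. In the real eBPF program the map would be a BPF HashMap keyed by `bpf_get_current_pid_tgid()`; `RecvTracker`, `EntryCtx`, and their fields are illustrative names for this sketch, not part of the plan's interfaces:

```rust
use std::collections::HashMap;

// Model of the kprobe/kretprobe pairing: stash the entry context keyed by
// thread id, pop it on return, and pair it with the return value (the
// number of bytes actually received).
struct EntryCtx {
    sock_id: u64, // stands in for the sock pointer captured at entry
}

#[derive(Default)]
struct RecvTracker {
    in_flight: HashMap<u64, EntryCtx>,
}

impl RecvTracker {
    fn on_entry(&mut self, tid: u64, sock_id: u64) {
        self.in_flight.insert(tid, EntryCtx { sock_id });
    }

    // Emits (sock_id, bytes) only when the return value is a positive byte
    // count; error returns like -EAGAIN produce no event.
    fn on_return(&mut self, tid: u64, retval: i64) -> Option<(u64, u64)> {
        let ctx = self.in_flight.remove(&tid)?;
        if retval > 0 {
            Some((ctx.sock_id, retval as u64))
        } else {
            None
        }
    }
}
```

This is why the entry-only kprobe fallback in the NOTE above overcounts: `msg->msg_iter.count` is the buffer capacity, not what the call actually returned.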

3. **udp_sendmsg kprobe** -- attach to `udp_sendmsg`
   - arg0: `*sock`, arg2: `size` (byte count)
   - Extract the 4-tuple from `sock->__sk_common`
   - srtt_us = 0 (UDP has no RTT)
   - Write a TcptopEvent with event_type = EVENT_UDP_SEND

4. **udp_recvmsg kprobe** -- attach to `udp_recvmsg`
   - Same approach as tcp_recvmsg for the byte count
   - srtt_us = 0
   - Write a TcptopEvent with event_type = EVENT_UDP_RECV

5. **inet_sock_set_state tracepoint** -- attach to `sock:inet_sock_set_state`
   - Read the tracepoint args: old_state, new_state, sport, dport, saddr, daddr, af_family
   - PID: use `bpf_get_current_pid_tgid()`, but note Pitfall 3 -- this tracepoint can fire in softirq context, where the current PID is 0.
   - **PID=0 handling approach (chosen: option b -- aggregator-side enrichment):** The eBPF program writes PID=0 as-is. The userspace aggregator in Plan 03 handles this: when `ConnectionTable.update()` receives a `TcpStateChange` with `pid=0`, it looks up the connection by key in the existing table and inherits the PID from the existing `ConnectionRecord`. This works because the kprobes on tcp_sendmsg/tcp_recvmsg will already have populated the connection with the correct PID before any state change fires. This avoids adding a BPF HashMap for sock_ptr->pid tracking, keeping the eBPF programs simpler.
   - Write a TcptopEvent with event_type = EVENT_TCP_STATE
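
A minimal sketch of that aggregator-side rule, assuming a trimmed-down key and record (the real `ConnectionTable` arrives in Plan 03 with more fields; the names here are illustrative):

```rust
use std::collections::HashMap;

// Illustrative stand-ins for ConnectionKey/ConnectionRecord.
type Key = (u16, u16); // (local_port, remote_port), abbreviated for the sketch

#[derive(Debug, PartialEq)]
struct Record {
    pid: u32,
    state: u32,
}

struct ConnectionTable {
    map: HashMap<Key, Record>,
}

impl ConnectionTable {
    fn apply_state_change(&mut self, key: Key, pid: u32, new_state: u32) {
        let rec = self.map.entry(key).or_insert(Record { pid, state: new_state });
        // Pitfall 3: PID=0 means the tracepoint fired in softirq context,
        // so keep the PID the data-path kprobes already recorded.
        if pid != 0 {
            rec.pid = pid;
        }
        rec.state = new_state;
    }
}
```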

**Critical eBPF constraints:**
- Use the `#[repr(C)]` structs from tcptop-common with no deviation. TcptopEventData is a **union** (locked in Plan 01).
- Use `bpf_probe_read_kernel` for every kernel pointer dereference
- No unbounded loops, no dynamic allocation
- Stack limit: 512 bytes per program -- don't copy entire structs onto the stack
- Handle `EVENTS.reserve::<TcptopEvent>(0)` returning `None` gracefully (just return 0; the event is dropped)
- For IPv4: write 4 bytes to saddr[0..4] and zero the rest. Set af_family = AF_INET (2)
- For IPv6: write 16 bytes to saddr[0..16]. Set af_family = AF_INET6 (10)

**Helper function pattern:**

Extract a shared helper for reading the sock tuple to avoid duplicating the code across the 4 kprobes:

```rust
#[inline(always)]
unsafe fn read_sock_tuple(sock: *const core::ffi::c_void) -> Result<(u16, [u8; 16], [u8; 16], u16, u16), i64> {
    // Read __sk_common.skc_family, skc_rcv_saddr, skc_daddr, skc_num, skc_dport
    // using bpf_probe_read_kernel for each field.
    // Note: skc_num (local port) is in host byte order, while skc_dport is in
    // network byte order and needs a from_be conversion before use.
}
```

**Drop counter (per Pitfall 7):**

Add an eBPF array map for counting dropped events:

```rust
#[map]
static DROP_COUNT: Array<u32> = Array::with_max_entries(1, 0);
```

Increment it when `EVENTS.reserve()` returns None.
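
The contract here -- never block, drop and count when the buffer is full -- can be modeled in plain Rust with a bounded channel standing in for the RingBuf (all names in this sketch are illustrative, not the eBPF API):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};

// When the buffer has no room, the event is dropped and the counter is
// incremented; the producer never blocks. This mirrors what the eBPF side
// does when EVENTS.reserve() returns None.
static DROP_COUNT: AtomicU64 = AtomicU64::new(0);

fn try_emit<T>(tx: &SyncSender<T>, event: T) -> bool {
    match tx.try_send(event) {
        Ok(()) => true,
        Err(_) => {
            DROP_COUNT.fetch_add(1, Ordering::Relaxed);
            false
        }
    }
}
```

Userspace can read the counter periodically to surface an event-loss metric instead of silently undercounting traffic.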
</action>
<verify>
<automated>cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop-ebpf 2>&1 | tail -10</automated>
</verify>
<acceptance_criteria>
- tcptop-ebpf/src/main.rs contains `#[kprobe]` at least 4 times (tcp_sendmsg, tcp_recvmsg, udp_sendmsg, udp_recvmsg)
- tcptop-ebpf/src/main.rs contains `#[tracepoint]` at least 1 time (inet_sock_set_state)
- tcptop-ebpf/src/main.rs contains `EVENTS: RingBuf`
- tcptop-ebpf/src/main.rs contains `EVENTS.reserve` (writing events to the ring buffer)
- tcptop-ebpf/src/main.rs contains `bpf_get_current_pid_tgid` (PID capture)
- tcptop-ebpf/src/main.rs contains `bpf_get_current_comm` (process name capture)
- tcptop-ebpf/src/main.rs contains `bpf_probe_read_kernel` (safe kernel memory reads)
- tcptop-ebpf/src/main.rs contains `srtt_us` (RTT read from tcp_sock per DATA-05)
- tcptop-ebpf/src/main.rs contains `DROP_COUNT` (drop counter per Pitfall 7)
- tcptop-ebpf/src/main.rs contains `use tcptop_common` (imports shared types)
- tcptop-ebpf/src/main.rs contains `EVENT_TCP_SEND` AND `EVENT_UDP_SEND` AND `EVENT_TCP_STATE`
- tcptop-ebpf/src/main.rs accesses union fields via `.data.data_event` and `.data.state_event` (matches the locked union layout from Plan 01)
- Note: `cargo check -p tcptop-ebpf` may fail without bpf-linker installed -- that is acceptable. The code should be syntactically valid Rust.
</acceptance_criteria>
<done>Five eBPF programs implemented: 4 kprobes for tcp_sendmsg/tcp_recvmsg/udp_sendmsg/udp_recvmsg and 1 tracepoint for inet_sock_set_state. All write TcptopEvent to the shared RingBuf using the union layout. The state-change tracepoint passes PID=0 through to userspace (the aggregator handles enrichment). A drop counter tracks ring buffer overflow.</done>
</task>

<task type="auto">
<name>Task 2: Implement Linux collector and /proc bootstrap</name>
<files>
tcptop/src/collector/linux.rs,
tcptop/src/collector/mod.rs,
tcptop/src/proc_bootstrap.rs
</files>
<read_first>
tcptop/src/collector/mod.rs (NetworkCollector trait and CollectorEvent enum),
tcptop/src/model.rs (ConnectionRecord, ConnectionKey, Protocol, TcpState),
tcptop-common/src/lib.rs (shared types for deserializing ring buffer events -- note TcptopEventData is a union),
tcptop-ebpf/src/main.rs (to understand what the eBPF programs produce),
.planning/phases/01-data-pipeline/01-RESEARCH.md (Pattern 1 userspace consumer code, procfs usage, anti-patterns)
</read_first>
<action>
Implement the Linux/eBPF collector and the /proc/net bootstrapper.

**tcptop/src/proc_bootstrap.rs (per D-15):**

Use the `procfs` crate to parse pre-existing connections:

```rust
use crate::model::{ConnectionKey, ConnectionRecord, Protocol, TcpState};
use anyhow::Result;
use std::net::IpAddr;
use std::time::Instant;

pub fn bootstrap_connections() -> Result<Vec<ConnectionRecord>> {
    let mut connections = Vec::new();

    // Parse /proc/net/tcp for TCP connections
    if let Ok(tcp) = procfs::net::tcp() {
        for entry in tcp {
            let key = ConnectionKey {
                protocol: Protocol::Tcp,
                local_addr: entry.local_address.ip(), // procfs returns SocketAddr
                local_port: entry.local_address.port(),
                remote_addr: entry.remote_address.ip(),
                remote_port: entry.remote_address.port(),
            };
            connections.push(ConnectionRecord {
                key,
                pid: entry.inode as u32, // actually the socket inode, not a PID -- enrich_pids maps it
                process_name: String::new(), // enriched later
                tcp_state: TcpState::from_kernel(entry.state as u32),
                bytes_in: 0,
                bytes_out: 0,
                packets_in: 0,
                packets_out: 0,
                rate_in: 0.0,
                rate_out: 0.0,
                prev_bytes_in: 0,
                prev_bytes_out: 0,
                rtt_us: None,
                last_seen: Instant::now(),
                is_partial: true, // per D-15: mark bootstrapped connections as partial
                is_closed: false,
            });
        }
    }

    // Parse /proc/net/udp for UDP connections
    if let Ok(udp) = procfs::net::udp() {
        for entry in udp {
            let key = ConnectionKey {
                protocol: Protocol::Udp,
                local_addr: entry.local_address.ip(),
                local_port: entry.local_address.port(),
                remote_addr: entry.remote_address.ip(),
                remote_port: entry.remote_address.port(),
            };
            connections.push(ConnectionRecord {
                key,
                pid: entry.inode as u32,
                process_name: String::new(),
                tcp_state: None, // per D-07: no state for UDP
                bytes_in: 0,
                bytes_out: 0,
                packets_in: 0,
                packets_out: 0,
                rate_in: 0.0,
                rate_out: 0.0,
                prev_bytes_in: 0,
                prev_bytes_out: 0,
                rtt_us: None,
                last_seen: Instant::now(),
                is_partial: true,
                is_closed: false,
            });
        }
    }

    // Also parse /proc/net/tcp6 and /proc/net/udp6 for IPv6 --
    // the procfs crate exposes these as tcp6() and udp6()

    Ok(connections)
}

/// Walk /proc/*/fd to map socket inodes to PIDs.
/// This enriches the inode-based entries from /proc/net/*.
/// Best-effort -- processes we can't read are skipped.
pub fn enrich_pids(connections: &mut [ConnectionRecord]) -> Result<()> {
    if let Ok(procs) = procfs::process::all_processes() {
        for proc in procs.flatten() {
            let pid = proc.pid() as u32;
            let comm = proc.stat().map(|s| s.comm).unwrap_or_default();
            if let Ok(fds) = proc.fd() {
                for fd_info in fds.flatten() {
                    // fd targets that are sockets appear as "socket:[inode]";
                    // procfs decodes them as FDTarget::Socket(inode)
                    if let procfs::process::FDTarget::Socket(inode) = fd_info.target {
                        for conn in connections.iter_mut() {
                            // pid still holds the inode for un-enriched records
                            if conn.process_name.is_empty() && conn.pid as u64 == inode {
                                conn.pid = pid;
                                conn.process_name = comm.clone();
                            }
                        }
                    }
                }
            }
        }
    }
    Ok(())
}
```

Note: the exact `procfs` crate API may differ from the pseudocode above. The executor MUST read the actual procfs 0.18 API (check docs.rs/procfs) and adjust method names. The key behaviors are:
- Parse /proc/net/tcp, /proc/net/tcp6, /proc/net/udp, /proc/net/udp6
- Map socket inodes to PIDs via /proc/*/fd
- Mark all bootstrapped connections as `is_partial: true`

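Should the crate API prove unsuitable, the raw /proc/net field format is simple enough to parse by hand. A sketch for the IPv4 case -- the kernel prints its big-endian address as a native u32, so on little-endian machines the hex value must be byte-swapped; `parse_proc_v4` is a hypothetical helper, not part of the plan:

```rust
use std::net::Ipv4Addr;

// Decode one "ADDR:PORT" field from /proc/net/tcp or /proc/net/udp (IPv4).
// Example raw field on a little-endian host: "0100007F:1F90" -> 127.0.0.1:8080.
fn parse_proc_v4(field: &str) -> Option<(Ipv4Addr, u16)> {
    let (addr_hex, port_hex) = field.split_once(':')?;
    let raw = u32::from_str_radix(addr_hex, 16).ok()?;
    // Swap back to network order before constructing the address.
    let addr = Ipv4Addr::from(raw.swap_bytes());
    // The port is printed as plain hex, no swapping needed.
    let port = u16::from_str_radix(port_hex, 16).ok()?;
    Some((addr, port))
}
```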
**tcptop/src/collector/linux.rs:**

Implement a `LinuxCollector` struct that implements the `NetworkCollector` trait.

```rust
use super::{CollectorEvent, NetworkCollector};
use crate::model::{ConnectionKey, ConnectionRecord, Protocol};
use crate::proc_bootstrap;
use anyhow::{Context, Result};
use aya::maps::RingBuf;
use aya::programs::{KProbe, TracePoint};
use aya::Ebpf;
use log::{info, warn};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use tcptop_common::*;
use tokio::io::unix::AsyncFd;
use tokio::sync::mpsc;

pub struct LinuxCollector {
    bpf: Option<Ebpf>,
}

impl LinuxCollector {
    pub fn new() -> Result<Self> {
        Ok(Self { bpf: None })
    }

    fn load_and_attach(&mut self) -> Result<()> {
        // Load the eBPF bytecode compiled by aya-build
        let mut bpf = Ebpf::load(aya::include_bytes_aligned!(concat!(env!("OUT_DIR"), "/tcptop")))?;

        // Initialize aya-log for kernel-side debug logging
        if let Err(e) = aya_log::EbpfLogger::init(&mut bpf) {
            warn!("Failed to init eBPF logger: {}", e);
        }

        // Attach the four kprobes (program name matches the hooked symbol)
        for name in ["tcp_sendmsg", "tcp_recvmsg", "udp_sendmsg", "udp_recvmsg"] {
            let program: &mut KProbe = bpf.program_mut(name).unwrap().try_into()?;
            program.load()?;
            program.attach(name, 0)?;
        }

        // Attach the tracepoint
        let program: &mut TracePoint = bpf.program_mut("inet_sock_set_state").unwrap().try_into()?;
        program.load()?;
        program.attach("sock", "inet_sock_set_state")?;

        self.bpf = Some(bpf);
        Ok(())
    }

    fn parse_event(raw: &[u8]) -> Option<CollectorEvent> {
        if raw.len() < core::mem::size_of::<TcptopEvent>() {
            return None;
        }
        // Safety: length checked above; the eBPF side wrote a TcptopEvent here
        let event: &TcptopEvent = unsafe { &*(raw.as_ptr() as *const TcptopEvent) };

        match event.event_type {
            EVENT_TCP_SEND | EVENT_TCP_RECV => {
                let d = unsafe { &event.data.data_event };
                let key = Self::data_event_to_key(d, Protocol::Tcp);
                let comm = Self::comm_to_string(&d.comm);
                let srtt = d.srtt_us >> 3; // Pitfall 5: kernel stores srtt shifted left by 3
                if event.event_type == EVENT_TCP_SEND {
                    Some(CollectorEvent::TcpSend { key, pid: d.pid, comm, bytes: d.bytes, srtt_us: srtt })
                } else {
                    Some(CollectorEvent::TcpRecv { key, pid: d.pid, comm, bytes: d.bytes, srtt_us: srtt })
                }
            }
            EVENT_UDP_SEND | EVENT_UDP_RECV => {
                let d = unsafe { &event.data.data_event };
                let key = Self::data_event_to_key(d, Protocol::Udp);
                let comm = Self::comm_to_string(&d.comm);
                if event.event_type == EVENT_UDP_SEND {
                    Some(CollectorEvent::UdpSend { key, pid: d.pid, comm, bytes: d.bytes })
                } else {
                    Some(CollectorEvent::UdpRecv { key, pid: d.pid, comm, bytes: d.bytes })
                }
            }
            EVENT_TCP_STATE => {
                let s = unsafe { &event.data.state_event };
                let key = ConnectionKey {
                    protocol: Protocol::Tcp,
                    local_addr: Self::parse_addr(s.af_family, &s.saddr),
                    local_port: s.sport,
                    remote_addr: Self::parse_addr(s.af_family, &s.daddr),
                    remote_port: s.dport,
                };
                Some(CollectorEvent::TcpStateChange { key, pid: s.pid, old_state: s.old_state, new_state: s.new_state })
            }
            _ => None,
        }
    }

    fn data_event_to_key(d: &DataEvent, protocol: Protocol) -> ConnectionKey {
        ConnectionKey {
            protocol,
            local_addr: Self::parse_addr(d.af_family, &d.saddr),
            local_port: d.sport,
            remote_addr: Self::parse_addr(d.af_family, &d.daddr),
            remote_port: d.dport,
        }
    }

    fn parse_addr(af_family: u16, bytes: &[u8; 16]) -> IpAddr {
        if af_family == AF_INET {
            IpAddr::V4(Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3]))
        } else {
            IpAddr::V6(Ipv6Addr::from(*bytes))
        }
    }

    fn comm_to_string(comm: &[u8; 16]) -> String {
        let end = comm.iter().position(|&b| b == 0).unwrap_or(16);
        String::from_utf8_lossy(&comm[..end]).to_string()
    }
}

#[async_trait::async_trait]
impl NetworkCollector for LinuxCollector {
    async fn start(&mut self, tx: mpsc::Sender<CollectorEvent>) -> Result<()> {
        self.load_and_attach().context("Failed to load eBPF programs")?;

        let bpf = self.bpf.as_mut().unwrap();
        let ring_buf = RingBuf::try_from(bpf.map_mut("EVENTS").unwrap())?;
        let mut async_fd = AsyncFd::new(ring_buf)?;
        // The exact AsyncFd wrapping may differ -- aya's RingBuf may need to
        // be wrapped differently. Check the aya 0.13 docs for the async ring
        // buffer API. Alternative: poll-based reading in a tokio::spawn.

        info!("eBPF programs attached, reading events...");

        loop {
            let mut guard = async_fd.readable_mut().await?;
            // Drain everything currently in the buffer before re-arming;
            // the RingBuf is accessed through the guard to satisfy borrows.
            let ring_buf = guard.get_inner_mut();
            while let Some(item) = ring_buf.next() {
                if let Some(event) = Self::parse_event(&item) {
                    if tx.send(event).await.is_err() {
                        return Ok(()); // receiver dropped, shutting down
                    }
                }
            }
            guard.clear_ready();
        }
    }

    async fn stop(&mut self) -> Result<()> {
        // Dropping the Ebpf struct detaches all programs
        self.bpf.take();
        Ok(())
    }

    fn bootstrap_existing(&self) -> Result<Vec<ConnectionRecord>> {
        proc_bootstrap::bootstrap_connections()
    }
}
```

**IMPORTANT:** The exact aya 0.13 API for async RingBuf reading may differ from the pseudocode above. The executor MUST:
1. Check whether `aya::maps::RingBuf` implements `AsRawFd` (needed for AsyncFd)
2. If not, check for `aya::maps::ring_buf::AsyncRingBuf` or similar
3. Check whether `include_bytes_aligned!` is the correct macro path
4. Adapt the loading and reading code to match the actual aya 0.13.1 API

The key behaviors that MUST be preserved regardless of API details:
- Load the compiled eBPF bytecode
- Attach to all 5 hook points
- Read events from the ring buffer asynchronously
- Parse TcptopEvent using the **union** layout: access `event.data.data_event` or `event.data.state_event` based on the `event_type` tag
- Send events through the mpsc channel
- Apply srtt_us >> 3 for the RTT conversion (Pitfall 5)
- Handle IPv4 and IPv6 addresses correctly

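The tag-dispatched union read in miniature. The field sets here are trimmed and illustrative (the real TcptopEvent layout is locked in Plan 01), but the tag dispatch and the srtt shift are exactly the behaviors listed above:

```rust
use std::mem;

const EVENT_TCP_SEND: u32 = 1;

// Trimmed, illustrative versions of the Plan 01 structs.
#[derive(Clone, Copy)]
#[repr(C)]
struct DataEvent { pid: u32, bytes: u64, srtt_us: u32 }

#[derive(Clone, Copy)]
#[repr(C)]
struct StateEvent { pid: u32, old_state: u32, new_state: u32 }

#[derive(Clone, Copy)]
#[repr(C)]
union EventData { data_event: DataEvent, state_event: StateEvent }

#[repr(C)]
struct Event { event_type: u32, data: EventData }

// event_type is the tag that says which union arm is live; read_unaligned
// guards against a ring-buffer slice that is not 8-byte aligned.
fn parse(raw: &[u8]) -> Option<(u32, u64, u32)> {
    if raw.len() < mem::size_of::<Event>() {
        return None;
    }
    let ev: Event = unsafe { (raw.as_ptr() as *const Event).read_unaligned() };
    match ev.event_type {
        EVENT_TCP_SEND => {
            let d = unsafe { ev.data.data_event };
            Some((d.pid, d.bytes, d.srtt_us >> 3)) // Pitfall 5: stored as srtt << 3
        }
        _ => None,
    }
}
```

Reading the union arm that does not match the tag would yield garbage, which is why the tag check must come first and why Plan 01 locked the layout.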

**Update tcptop/src/collector/mod.rs:**

Uncomment or add `pub mod linux;` if it was a placeholder. Make sure `linux.rs` is properly declared as a submodule.
</action>
<verify>
<automated>cd /Users/zrowitsch/local_src/tcptop && cargo check -p tcptop 2>&1 | tail -10</automated>
</verify>
<acceptance_criteria>
- tcptop/src/collector/linux.rs contains `impl NetworkCollector for LinuxCollector`
- tcptop/src/collector/linux.rs contains `include_bytes_aligned!` (loading eBPF bytecode)
- tcptop/src/collector/linux.rs contains `KProbe` AND `TracePoint` (program types)
- tcptop/src/collector/linux.rs contains `tcp_sendmsg` AND `tcp_recvmsg` AND `udp_sendmsg` AND `udp_recvmsg` (kprobe attachment names)
- tcptop/src/collector/linux.rs contains `inet_sock_set_state` (tracepoint attachment)
- tcptop/src/collector/linux.rs contains `srtt_us >> 3` (RTT shift per Pitfall 5)
- tcptop/src/collector/linux.rs contains `AF_INET` AND `Ipv4Addr` AND `Ipv6Addr` (dual-stack address parsing)
- tcptop/src/collector/linux.rs contains `tx.send` (sending events through the mpsc channel)
- tcptop/src/collector/linux.rs accesses `event.data.data_event` and `event.data.state_event` (union access matching the Plan 01 locked layout)
- tcptop/src/proc_bootstrap.rs contains `is_partial: true` (per D-15)
- tcptop/src/proc_bootstrap.rs contains `proc/net/tcp` (parsing pre-existing TCP connections)
- tcptop/src/proc_bootstrap.rs contains `proc/net/udp` OR `udp()` (parsing pre-existing UDP connections)
- tcptop/src/collector/mod.rs contains `pub mod linux`
- `cargo check -p tcptop` succeeds (the userspace crate compiles)
</acceptance_criteria>
<done>Linux eBPF collector implements the NetworkCollector trait, loads and attaches 5 eBPF programs, reads events from the RingBuf asynchronously, and converts them to CollectorEvents using union access. /proc bootstrap provides pre-existing connections marked as partial. TcpStateChange events pass PID=0 through to the aggregator for enrichment. All address parsing handles both IPv4 and IPv6.</done>
</task>

</tasks>

<verification>
1. `cargo check -p tcptop` exits 0 (userspace with collector compiles)
2. `grep -c "kprobe" tcptop-ebpf/src/main.rs` returns >= 4
3. `grep "tracepoint" tcptop-ebpf/src/main.rs` returns at least 1 match
4. `grep "impl NetworkCollector" tcptop/src/collector/linux.rs` returns a match
5. `grep "is_partial: true" tcptop/src/proc_bootstrap.rs` returns at least 1 match
6. `grep "srtt_us >> 3" tcptop/src/collector/linux.rs` returns a match
</verification>

<success_criteria>
- eBPF crate has 5 programs (4 kprobes + 1 tracepoint) writing TcptopEvent to the RingBuf
- LinuxCollector implements NetworkCollector with a load/attach/read/parse pipeline
- /proc bootstrap parses tcp, tcp6, udp, and udp6 and marks connections as partial
- Event parsing correctly handles IPv4 and IPv6 addresses
- RTT values are shifted right by 3 in userspace
- TcpStateChange events with PID=0 are passed through (the aggregator handles enrichment per Pitfall 3)
- The userspace crate compiles (eBPF crate compilation requires bpf-linker)
</success_criteria>

<output>
After completion, create `.planning/phases/01-data-pipeline/01-02-SUMMARY.md`
</output>