---
phase: 01-data-pipeline
plan: 03
subsystem: data-pipeline
tags: [tokio, aggregator, streaming-output, bandwidth-calculation, connection-lifecycle]

# Dependency graph
requires:
  - phase: 01-data-pipeline/01
    provides: "shared data model (ConnectionKey, ConnectionRecord, Protocol, TcpState) and privilege check"
  - phase: 01-data-pipeline/02
    provides: "LinuxCollector with eBPF kprobes, RingBuf event loop, CollectorEvent enum, proc_bootstrap"
provides:
  - "ConnectionTable aggregator with tick-based bandwidth rate calculation"
  - "UDP flow timeout (5s idle expiry)"
  - "TCP close lifecycle with [CLOSED] output"
  - "Streaming stdout formatter with human-readable sizes"
  - "Full tokio event loop wiring collector -> aggregator -> output"
  - "Pipeline integration tests proving data flow without eBPF/root"
affects: [02-tui, 03-output-packaging]

# Tech tracking
tech-stack:
  added: [tokio-signal]
  patterns: [lib.rs re-export for integration testing, tokio::select! event loop, mpsc channel decoupling]

key-files:
  created:
    - tcptop/src/aggregator.rs
    - tcptop/src/output.rs
    - tcptop/src/lib.rs
    - tcptop/tests/pipeline_test.rs
  modified:
    - tcptop/src/main.rs

key-decisions:
  - "Created lib.rs to expose modules for integration testing (binary crates can't be imported by tests)"
  - "Used tokio::signal instead of signal-hook crate for SIGINT/SIGTERM (already in tokio context)"
  - "mpsc channel buffer 4096 to decouple RingBuf reader from aggregator processing"

patterns-established:
  - "lib.rs re-exports pub modules; main.rs is thin wrapper"
  - "tokio::select! multiplexes events, periodic tick, and signal handling"
  - "ConnectionTable::tick() returns (active, closed) tuple for output"

requirements-completed: [DATA-06, DATA-07]

# Metrics
duration: 2min
completed: 2026-03-21
---

# Phase 01 Plan 03: Data Pipeline Wiring Summary

**Connection aggregator with tick-based bandwidth calculation, UDP timeout, TCP close lifecycle, and streaming stdout output, wired through a tokio event loop**

## Performance

- **Duration:** 2 min
- **Started:** 2026-03-21T23:20:27Z
- **Completed:** 2026-03-21T23:22:52Z
- **Tasks:** 2
- **Files modified:** 5

## Accomplishments
- ConnectionTable processes all 5 CollectorEvent variants and recalculates bandwidth rates on each tick
- PID=0 enrichment for TcpStateChange events (the event inherits the PID from the existing connection record)
- UDP flows expire after 5s idle; closed TCP connections produce a [CLOSED] line and are removed
- Human-readable formatters for bytes (500 renders as 500B, 2048 as 2.0K), rates (1.5KB/s), and RTT (5.0ms)
- Full tokio event loop with graceful SIGINT/SIGTERM shutdown
- 4 pipeline integration tests prove data flow from synthetic events through the aggregator to output

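The tick, expiry, and close behaviour listed above can be sketched roughly as follows. This is a minimal illustration and not the code in `tcptop/src/aggregator.rs`: the field names, the `on_data` and `on_tcp_close` methods, and the explicit `now` parameter are all hypothetical, and the sketch assumes idle UDP flows are dropped silently rather than reported as closed.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical, simplified stand-ins for the shared data model.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Protocol {
    Tcp,
    Udp,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ConnectionKey {
    pub protocol: Protocol,
    pub local_port: u16,
    pub remote_port: u16,
}

#[derive(Clone, Debug)]
pub struct ConnectionRecord {
    pub key: ConnectionKey,
    pub pid: u32,
    pub bytes_total: u64,
    bytes_at_last_tick: u64,
    pub rate_bytes_per_sec: f64,
    pub last_activity: Instant,
    pub closed: bool,
}

/// Idle period after which a UDP flow is dropped (5s, as stated in the summary).
pub const UDP_IDLE_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Default)]
pub struct ConnectionTable {
    records: HashMap<ConnectionKey, ConnectionRecord>,
    last_tick: Option<Instant>,
}

impl ConnectionTable {
    /// Record a data event: add bytes and refresh the activity timestamp.
    pub fn on_data(&mut self, key: ConnectionKey, pid: u32, bytes: u64, now: Instant) {
        let rec = self.records.entry(key).or_insert_with(|| ConnectionRecord {
            key,
            pid,
            bytes_total: 0,
            bytes_at_last_tick: 0,
            rate_bytes_per_sec: 0.0,
            last_activity: now,
            closed: false,
        });
        rec.bytes_total += bytes;
        rec.last_activity = now;
    }

    /// Mark a TCP connection closed. A PID of 0 means "unknown", so the
    /// record keeps the PID it already has (the PID=0 enrichment idea).
    pub fn on_tcp_close(&mut self, key: ConnectionKey, pid: u32) {
        if let Some(rec) = self.records.get_mut(&key) {
            if pid != 0 {
                rec.pid = pid;
            }
            rec.closed = true;
        }
    }

    /// Recompute rates from the byte delta since the previous tick, drop idle
    /// UDP flows, and return (active, closed); closed records are removed.
    pub fn tick(&mut self, now: Instant) -> (Vec<ConnectionRecord>, Vec<ConnectionRecord>) {
        let elapsed = self
            .last_tick
            .map(|t| now.duration_since(t).as_secs_f64())
            .unwrap_or(0.0);
        self.last_tick = Some(now);
        for rec in self.records.values_mut() {
            let delta = rec.bytes_total - rec.bytes_at_last_tick;
            rec.rate_bytes_per_sec = if elapsed > 0.0 { delta as f64 / elapsed } else { 0.0 };
            rec.bytes_at_last_tick = rec.bytes_total;
        }
        // Assumption: expired UDP flows disappear without a [CLOSED] entry.
        self.records.retain(|k, r| {
            !(k.protocol == Protocol::Udp
                && now.duration_since(r.last_activity) >= UDP_IDLE_TIMEOUT)
        });
        let closed: Vec<_> = self.records.values().filter(|r| r.closed).cloned().collect();
        self.records.retain(|_, r| !r.closed);
        let active = self.records.values().cloned().collect();
        (active, closed)
    }
}
```

Passing `now` explicitly is a choice made for this sketch so that synthetic events can be replayed without sleeping; the real `tick()` may read the clock itself.
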
## Task Commits

Each task was committed atomically:

1. **Task 1: Implement aggregator and output formatter** - `c091aaa` (feat)
2. **Task 2: Wire tokio event loop and pipeline tests** - `a99c088` (feat)

## Files Created/Modified
- `tcptop/src/aggregator.rs` - ConnectionTable with tick-based rate calculation, UDP timeout, TCP close lifecycle
- `tcptop/src/output.rs` - Streaming stdout formatter with human-readable sizes, [CLOSED]/[PARTIAL] markers
- `tcptop/src/lib.rs` - Public module re-exports for integration testing
- `tcptop/src/main.rs` - Full tokio::select! event loop wiring collector, aggregator, and output
- `tcptop/tests/pipeline_test.rs` - 4 integration tests proving pipeline data flow

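For orientation, the human-readable formatting in `output.rs` could look something like the sketch below. The function names, the 1024-based unit steps, and the microsecond RTT input are assumptions made for illustration; only the sample outputs (500B, 2.0K, 1.5KB/s, 5.0ms) come from this summary.

```rust
/// Format a byte count: exact below 1024, otherwise one decimal and a unit suffix.
/// Assumes 1024-based units.
pub fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 4] = ["K", "M", "G", "T"];
    if bytes < 1024 {
        return format!("{}B", bytes);
    }
    let mut value = bytes as f64 / 1024.0;
    let mut unit = 0;
    // Step up through the units until the value fits below 1024.
    while value >= 1024.0 && unit < UNITS.len() - 1 {
        value /= 1024.0;
        unit += 1;
    }
    format!("{:.1}{}", value, UNITS[unit])
}

/// Format a bandwidth rate given in bytes per second.
pub fn format_rate(bytes_per_sec: f64) -> String {
    const UNITS: [&str; 3] = ["KB/s", "MB/s", "GB/s"];
    if bytes_per_sec < 1024.0 {
        return format!("{:.0}B/s", bytes_per_sec);
    }
    let mut value = bytes_per_sec / 1024.0;
    let mut unit = 0;
    while value >= 1024.0 && unit < UNITS.len() - 1 {
        value /= 1024.0;
        unit += 1;
    }
    format!("{:.1}{}", value, UNITS[unit])
}

/// Format a round-trip time. Assumes the input is in microseconds.
pub fn format_rtt(rtt_micros: u64) -> String {
    format!("{:.1}ms", rtt_micros as f64 / 1000.0)
}
```

Under these assumptions, `format_bytes(2048)` yields `2.0K` and `format_rate(1536.0)` yields `1.5KB/s`, matching the samples quoted in the Accomplishments list.
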
## Decisions Made
- Created lib.rs to expose modules for integration testing: integration tests cannot import from a binary crate, so lib.rs re-exports the pub modules
- Used tokio::signal instead of the signal-hook crate for graceful shutdown, since we're already in a tokio context
- Set the mpsc channel buffer size to 4096 to decouple the RingBuf reader from aggregator processing, per anti-pattern guidance

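The decoupling idea behind the bounded channel can be illustrated with the standard library alone. Note the swap: the project uses a tokio mpsc channel, while this sketch substitutes `std::sync::mpsc::sync_channel` and a plain thread so that it runs without extra crates. The `CollectorEvent::Data` variant, `spawn_reader`, and `aggregate` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical stand-in for the real CollectorEvent enum.
#[derive(Debug)]
pub enum CollectorEvent {
    Data { bytes: u64 },
}

/// Buffer size quoted in the decision above.
pub const CHANNEL_CAPACITY: usize = 4096;

/// Spawn a producer thread that plays the role of the RingBuf reader.
/// It only blocks when the buffer is full, so a briefly slow consumer
/// does not stall it.
pub fn spawn_reader(events: Vec<CollectorEvent>) -> Receiver<CollectorEvent> {
    let (tx, rx) = sync_channel(CHANNEL_CAPACITY);
    thread::spawn(move || {
        for event in events {
            // Stop early if the consumer has gone away.
            if tx.send(event).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the consumer's receive loop.
    });
    rx
}

/// Consumer side: drain the channel and fold events into a running total,
/// standing in for the aggregator.
pub fn aggregate(rx: Receiver<CollectorEvent>) -> u64 {
    let mut total = 0;
    for event in rx {
        match event {
            CollectorEvent::Data { bytes } => total += bytes,
        }
    }
    total
}
```

What the real reader does when the buffer fills (wait or drop events) is not stated in this summary; the sketch waits.
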
## Deviations from Plan

### Auto-fixed Issues

**1. [Rule 3 - Blocking] Created lib.rs for integration test access**
- **Found during:** Task 2 (pipeline unit test)
- **Issue:** Binary crate modules cannot be imported by integration tests via `tcptop::aggregator::*`
- **Fix:** Created lib.rs that re-exports pub modules; main.rs uses `tcptop::` imports
- **Files modified:** tcptop/src/lib.rs, tcptop/src/main.rs
- **Verification:** `cargo test -p tcptop --test pipeline_test` passes all 4 tests
- **Committed in:** a99c088 (Task 2 commit)

---

**Total deviations:** 1 auto-fixed (1 blocking)
**Impact on plan:** Standard Rust pattern for testable binaries. No scope creep.

## Issues Encountered
None

## User Setup Required
None - no external service configuration required.

## Next Phase Readiness
- Phase 1 data pipeline complete: eBPF events flow from the kernel through the collector and aggregator to formatted stdout
- Ready for Phase 2 (TUI) to replace the streaming output with a ratatui-based interface
- ConnectionTable API (tick, update, seed) provides the data layer the TUI will consume

---
*Phase: 01-data-pipeline*
*Completed: 2026-03-21*