Zachary D. Rowitsch 38e6dcc34a chore: archive v1.0 phase directories to milestones/v1.0-phases/
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 01:33:15 -04:00


---
phase: 01-data-pipeline
plan: 03
subsystem: data-pipeline
tags: [tokio, aggregator, streaming-output, bandwidth-calculation, connection-lifecycle]
# Dependency graph
requires:
  - phase: 01-data-pipeline/01
    provides: "shared data model (ConnectionKey, ConnectionRecord, Protocol, TcpState) and privilege check"
  - phase: 01-data-pipeline/02
    provides: "LinuxCollector with eBPF kprobes, RingBuf event loop, CollectorEvent enum, proc_bootstrap"
provides:
- "ConnectionTable aggregator with tick-based bandwidth rate calculation"
- "UDP flow timeout (5s idle expiry)"
- "TCP close lifecycle with [CLOSED] output"
- "Streaming stdout formatter with human-readable sizes"
- "Full tokio event loop wiring collector -> aggregator -> output"
- "Pipeline integration tests proving data flow without eBPF/root"
affects: [02-tui, 03-output-packaging]
# Tech tracking
tech-stack:
  added: [tokio-signal]
  patterns: [lib.rs re-export for integration testing, tokio::select! event loop, mpsc channel decoupling]
key-files:
  created:
    - tcptop/src/aggregator.rs
    - tcptop/src/output.rs
    - tcptop/src/lib.rs
    - tcptop/tests/pipeline_test.rs
  modified:
    - tcptop/src/main.rs
key-decisions:
- "Created lib.rs to expose modules for integration testing (binary crates cannot be imported by tests)"
- "Used tokio::signal instead of signal-hook crate for SIGINT/SIGTERM (already in tokio context)"
- "mpsc channel buffer 4096 to decouple RingBuf reader from aggregator processing"
patterns-established:
- "lib.rs re-exports pub modules; main.rs is thin wrapper"
- "tokio::select! multiplexes events, periodic tick, and signal handling"
- "ConnectionTable::tick() returns (active, closed) tuple for output"
requirements-completed: [DATA-06, DATA-07]
# Metrics
duration: 2min
completed: 2026-03-21
---
# Phase 01 Plan 03: Data Pipeline Wiring Summary
**Connection aggregator with tick-based bandwidth calculation, UDP timeout, TCP close lifecycle, and streaming stdout output wired through tokio event loop**
## Performance
- **Duration:** 2 min
- **Started:** 2026-03-21T23:20:27Z
- **Completed:** 2026-03-21T23:22:52Z
- **Tasks:** 2
- **Files modified:** 5
## Accomplishments
- ConnectionTable processes all 5 CollectorEvent variants with bandwidth rate calculation per tick
- PID=0 enrichment for TcpStateChange events (inherits PID from existing connection record)
- UDP flows expire after 5s idle; closed TCP connections produce [CLOSED] line and are removed
- Human-readable formatters for bytes (500B, 2048 (2.0K)), rates (1.5KB/s), and RTT (5.0ms)
- Full tokio event loop with graceful SIGINT/SIGTERM shutdown
- 4 pipeline integration tests prove data flow from synthetic events through aggregator to output
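
As a rough illustration of the tick-based rate calculation, UDP idle expiry, and TCP close handling listed above, the following std-only sketch shows one way such a table could work. Every name in it (`Table`, `ConnKey`, `ConnRecord`, `record_bytes`, `mark_closed`) is a hypothetical simplification and not the actual `ConnectionTable` API. Dropping expired UDP flows silently, rather than reporting them, is also an assumption.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical, simplified stand-ins for the real data model.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Protocol {
    Tcp,
    Udp,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ConnKey {
    pub proto: Protocol,
    pub local_port: u16,
    pub remote_port: u16,
}

#[derive(Clone, Debug)]
pub struct ConnRecord {
    pub bytes_total: u64,
    bytes_at_last_tick: u64,
    pub rate_bytes_per_sec: f64,
    pub last_seen: Instant,
    pub closed: bool,
}

// The 5s idle expiry comes from the summary; the constant name is made up.
pub const UDP_IDLE_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Default)]
pub struct Table {
    conns: HashMap<ConnKey, ConnRecord>,
}

impl Table {
    /// Accumulate bytes for a connection, creating the record on first sight.
    pub fn record_bytes(&mut self, key: ConnKey, bytes: u64, now: Instant) {
        let rec = self.conns.entry(key).or_insert(ConnRecord {
            bytes_total: 0,
            bytes_at_last_tick: 0,
            rate_bytes_per_sec: 0.0,
            last_seen: now,
            closed: false,
        });
        rec.bytes_total += bytes;
        rec.last_seen = now;
    }

    /// Flag a connection as closed; it is reported and removed on the next tick.
    pub fn mark_closed(&mut self, key: &ConnKey) {
        if let Some(rec) = self.conns.get_mut(key) {
            rec.closed = true;
        }
    }

    /// Recompute rates from the byte delta since the previous tick and
    /// return an (active, closed) pair, mirroring the tuple described above.
    pub fn tick(
        &mut self,
        elapsed: Duration,
        now: Instant,
    ) -> (Vec<(ConnKey, ConnRecord)>, Vec<(ConnKey, ConnRecord)>) {
        let secs = elapsed.as_secs_f64();
        let mut active = Vec::new();
        let mut closed = Vec::new();
        self.conns.retain(|key, rec| {
            let delta = rec.bytes_total - rec.bytes_at_last_tick;
            rec.rate_bytes_per_sec = if secs > 0.0 { delta as f64 / secs } else { 0.0 };
            rec.bytes_at_last_tick = rec.bytes_total;
            if rec.closed {
                closed.push((*key, rec.clone()));
                return false; // reported once, then removed
            }
            let idle = now.saturating_duration_since(rec.last_seen);
            if key.proto == Protocol::Udp && idle >= UDP_IDLE_TIMEOUT {
                return false; // assumption: idle UDP flows are dropped silently
            }
            active.push((*key, rec.clone()));
            true
        });
        (active, closed)
    }
}
```

Passing `now` and `elapsed` in explicitly, instead of reading the clock inside `tick`, keeps the sketch deterministic under test, which is in the spirit of the pipeline tests that run without eBPF or root.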
## Task Commits
Each task was committed atomically:
1. **Task 1: Implement aggregator and output formatter** - `c091aaa` (feat)
2. **Task 2: Wire tokio event loop and pipeline tests** - `a99c088` (feat)
## Files Created/Modified
- `tcptop/src/aggregator.rs` - ConnectionTable with tick-based rate calculation, UDP timeout, TCP close lifecycle
- `tcptop/src/output.rs` - Streaming stdout formatter with human-readable sizes, [CLOSED]/[PARTIAL] markers
- `tcptop/src/lib.rs` - Public module re-exports for integration testing
- `tcptop/src/main.rs` - Full tokio::select! event loop wiring collector, aggregator, and output
- `tcptop/tests/pipeline_test.rs` - 4 integration tests proving pipeline data flow
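
To make the formatter behaviour concrete, here is a minimal sketch that reproduces the example strings quoted in this summary (`500B`, `2.0K`, `1.5KB/s`, `5.0ms`). The function names, the 1024-based units, and the choice of microseconds as the RTT input unit are assumptions for illustration; the real `output.rs` may differ, for example in how it renders 2048 bytes.

```rust
/// Format a byte count, e.g. 500 -> "500B", 2048 -> "2.0K".
/// Assumption: binary (1024-based) units with single-letter suffixes.
pub fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 4] = ["K", "M", "G", "T"];
    if bytes < 1024 {
        return format!("{}B", bytes);
    }
    let mut value = bytes as f64 / 1024.0;
    let mut unit = 0;
    while value >= 1024.0 && unit < UNITS.len() - 1 {
        value /= 1024.0;
        unit += 1;
    }
    format!("{:.1}{}", value, UNITS[unit])
}

/// Format a transfer rate, e.g. 1536.0 -> "1.5KB/s".
pub fn format_rate(bytes_per_sec: f64) -> String {
    const UNITS: [&str; 4] = ["KB/s", "MB/s", "GB/s", "TB/s"];
    if bytes_per_sec < 1024.0 {
        return format!("{:.0}B/s", bytes_per_sec);
    }
    let mut value = bytes_per_sec / 1024.0;
    let mut unit = 0;
    while value >= 1024.0 && unit < UNITS.len() - 1 {
        value /= 1024.0;
        unit += 1;
    }
    format!("{:.1}{}", value, UNITS[unit])
}

/// Format a round-trip time given in microseconds (assumed input unit),
/// e.g. 5000 -> "5.0ms".
pub fn format_rtt(rtt_us: u32) -> String {
    format!("{:.1}ms", rtt_us as f64 / 1000.0)
}
```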
## Decisions Made
- Created lib.rs to expose modules for integration testing: tests under `tests/` can only link against a library target, not a binary crate, so lib.rs re-exports the pub modules
- Used tokio::signal instead of signal-hook crate for graceful shutdown since we're already in a tokio context
- mpsc channel buffer size 4096 to decouple the RingBuf reader from aggregator processing per anti-pattern guidance
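
The channel decoupling in the last decision can be sketched without any dependencies. The example below deliberately swaps in `std::sync::mpsc::sync_channel` and a thread for the tokio mpsc channel and task used by the project, so it is an analogue rather than the project's code. `Event`, `run_pipeline`, and `CHANNEL_CAPACITY` are hypothetical names; only the capacity of 4096 comes from this summary.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Buffer size taken from the summary; the constant name is made up.
pub const CHANNEL_CAPACITY: usize = 4096;

// Hypothetical, reduced event type standing in for CollectorEvent.
#[derive(Debug)]
pub enum Event {
    Data { bytes: u64 },
    Close,
}

/// Run a producer ("reader") and a consumer ("aggregator") joined by a
/// bounded channel. Returns (total bytes seen, number of close events).
pub fn run_pipeline(events: Vec<Event>) -> (u64, usize) {
    let (tx, rx) = sync_channel::<Event>(CHANNEL_CAPACITY);

    // Producer: pushes events as fast as it can. When the buffer is full,
    // `send` blocks, which applies backpressure instead of growing memory.
    let reader = thread::spawn(move || {
        for ev in events {
            if tx.send(ev).is_err() {
                break; // consumer went away
            }
        }
        // `tx` is dropped here, which ends the consumer loop below.
    });

    // Consumer: drains the channel at its own pace.
    let mut total = 0u64;
    let mut closes = 0usize;
    for ev in rx {
        match ev {
            Event::Data { bytes } => total += bytes,
            Event::Close => closes += 1,
        }
    }

    reader.join().expect("reader thread panicked");
    (total, closes)
}
```

The point of the bounded buffer is that a burst from the producer is absorbed up to the capacity, while a consistently slow consumer eventually slows the producer down rather than letting the queue grow without limit.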
## Deviations from Plan
### Auto-fixed Issues
**1. [Rule 3 - Blocking] Created lib.rs for integration test access**
- **Found during:** Task 2 (pipeline unit test)
- **Issue:** Binary crate modules cannot be imported by integration tests via `tcptop::aggregator::*`
- **Fix:** Created lib.rs that re-exports pub modules; main.rs uses `tcptop::` imports
- **Files modified:** tcptop/src/lib.rs, tcptop/src/main.rs
- **Verification:** `cargo test -p tcptop --test pipeline_test` passes all 4 tests
- **Committed in:** a99c088 (Task 2 commit)
---
**Total deviations:** 1 auto-fixed (1 blocking)
**Impact on plan:** Standard Rust pattern for testable binaries. No scope creep.
## Issues Encountered
None
## User Setup Required
None - no external service configuration required.
## Next Phase Readiness
- Phase 1 data pipeline complete: eBPF events flow from kernel through collector, aggregator, to formatted stdout
- Ready for Phase 2 (TUI) to replace streaming output with ratatui-based interface
- ConnectionTable API (tick, update, seed) provides the data layer the TUI will consume
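
For readers picking this up in Phase 2, the sketch below shows how a `seed` call followed by state-change updates might interact, including the PID=0 enrichment listed under Accomplishments. The types (`PidTable`, `Record`, `State`, `Key`) and the method `on_state_change` are hypothetical; the real `ConnectionTable` signatures are not shown in this summary and may look quite different.

```rust
use std::collections::HashMap;

// Hypothetical subset of TcpState.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Established,
    Closed,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record {
    pub pid: u32,
    pub state: State,
}

// Hypothetical key: (local port, remote port).
pub type Key = (u16, u16);

#[derive(Default)]
pub struct PidTable {
    records: HashMap<Key, Record>,
}

impl PidTable {
    /// Pre-populate a connection that existed before event collection
    /// started (assumed to be the role of `seed`).
    pub fn seed(&mut self, key: Key, pid: u32) {
        self.records.insert(key, Record { pid, state: State::Established });
    }

    /// Apply a state change. An event carrying PID 0 keeps the PID already
    /// stored for that connection instead of overwriting it.
    /// Returns the PID associated with the connection after the update.
    pub fn on_state_change(&mut self, key: Key, event_pid: u32, new_state: State) -> u32 {
        match self.records.get_mut(&key) {
            Some(rec) => {
                if event_pid != 0 {
                    rec.pid = event_pid;
                }
                rec.state = new_state;
                rec.pid
            }
            None => {
                // No prior record: nothing to inherit from, so store as-is.
                self.records.insert(key, Record { pid: event_pid, state: new_state });
                event_pid
            }
        }
    }

    pub fn get(&self, key: &Key) -> Option<&Record> {
        self.records.get(key)
    }
}
```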
---
*Phase: 01-data-pipeline*
*Completed: 2026-03-21*