feat(flow-collector): horizontal scale via NATS KV TemplateStore + Da…#3148
feat(flow-collector): horizontal scale via NATS KV TemplateStore + Da…#3148mikemiles-dev wants to merge 2 commits into
Conversation
Review Summary by Qodo(Agentic_describe updated until commit d8e67a9)Horizontal scale flow collector via NATS KV TemplateStore and DaemonSet deployment
WalkthroughsDescription• Enable horizontal scaling of flow collector via NATS KV shared template store • Switch K8s deployment from single Deployment to per-node DaemonSet with LoadBalancer service • Preserve exporter source IP through load balancer using externalTrafficPolicy: Local • Add Prometheus metrics endpoint with hand-rolled HTTP server and template-store counters • Implement background metrics ticker to aggregate per-source template-store statistics Diagramflowchart LR
exporters["Exporters<br/>UDP NetFlow/IPFIX/sFlow"]
lb["Cloud L4 LB<br/>externalTrafficPolicy: Local"]
daemonset["DaemonSet<br/>one pod per node"]
nats_kv["NATS JetStream KV<br/>flow_templates bucket"]
prometheus["Prometheus<br/>metrics endpoint"]
exporters -->|source IP preserved| lb
lb -->|distribute across nodes| daemonset
daemonset -->|read/write templates| nats_kv
daemonset -->|expose metrics| prometheus
File Changes1. rust/flow-collector/Cargo.toml
|
Code Review by Qodo
1. Non-ASCII in scaling guide
|
…emonSet Wire the netflow_parser 1.0.3 TemplateStore extension point into the flow collector and switch the K8s deployment from a single Deployment to a per-node DaemonSet behind `Service: LoadBalancer` with `externalTrafficPolicy: Local`. Together these let the collector scale horizontally with node count while preserving exporter source IP and sharing learned NetFlow / IPFIX templates across pods via NATS JetStream KV. Why --- A single flow-collector Deployment was a singleton bottleneck for UDP ingest. Scaling required either source-IP-affinity routing (so every exporter always lands on the same pod and templates stay local) or a shared template store so any pod could decode any flow. We picked the shared-store path because it doesn't depend on the LB layer. The original plan was to front this with Envoy Gateway UDPRoute, but a research spike found UDPRoute does not preserve client source IP today — upstreams see the gateway pod's IP, which would collapse AutoScopedParser's per-(source_ip,source_id) scoping. Service + DaemonSet + `externalTrafficPolicy: Local` is the workable alternative on MetalLB (L2 mode preserves source IP through to the pod). Gateway API stays available for the rest of the stack (HTTP/gRPC/mTLS). Code ---- * `Cargo.toml`: bump netflow_parser 1.0.0 -> 1.0.3 (TemplateStore API); add `bytes` for the NATS KV payload type. * `src/nats_client.rs` (new): Shared `connect_once` and `connect_with_retry` helpers. Both publisher and template-store bootstrap go through these, applying TLS root CA + client cert + creds file from `Config.security` and `Config.nats_creds_file`. Eliminates the easy-to-miss bug where one path used bare `async_nats::connect(url)` and silently broke mTLS deployments. Bounded backoff (60 attempts, 0.5s -> 30s) so neither path crash-loops the pod when NATS is slow to come up. * `src/template_store.rs` (new): `NatsKvTemplateStore` implements the parser's sync `TemplateStore` trait. async_nats is async-only so we bridge via `tokio::task::block_in_place` + `Handle::block_on`. NATS KV keys are restricted to `[A-Za-z0-9._=/-]`; the scope produced by AutoScopedParser (`v9:1.2.3.4:2055/0`, IPv6 with brackets, etc.) is sanitized via underscore replacement. `kind_tag` wildcard arm warns once via `std::sync::Once` if a future netflow_parser version ships a new `TemplateKind` variant. * `src/netflow/mod.rs`: `NetflowHandler::new` accepts an optional `Arc<dyn TemplateStore>` and threads it through `NetflowParserBuilder::with_template_store`. New `TemplateEvent:: Restored` arm in the event-callback (logs at info). When a store is configured, spawns a 1Hz background ticker that aggregates per-source `CacheMetrics` into the listener-level Prometheus counters via a `DeltaState` that tracks per-source last-known values plus a `retired` total — so listener counters never decrease when sources are LRU-evicted. Off the parse_datagram hot path. * `src/listener.rs`: `build_handler` takes `Option<Arc<dyn TemplateStore>>`. sFlow ignores it (template-less). * `src/config.rs`: new optional `TemplateStoreConfig` with `kv_bucket`, `kv_history` (u8, validated `1..=64`), `kv_ttl_secs`, and optional `nats_url` override for split-fault-domain setups. * `src/main.rs`: `bootstrap_template_store(&Config, &TemplateStoreConfig)` uses `nats_client::connect_with_retry` (so TLS/creds/retry match the publisher), then `create_or_update_key_value` for genuinely idempotent KV bucket bootstrap across config drift. Independent NATS connection from the publisher's so KV failures cannot stall publishing and vice versa. * `src/metrics.rs`: `ListenerMetrics` gains `template_store_restored`, `template_store_codec_errors`, `template_store_backend_errors`, `source_count` atomics. New `render_prometheus()` produces text exposition format with HELP/TYPE headers and properly escaped label values (`\` -> `\\`, `"` -> `\"`, `\n` -> `\n`). sFlow listeners are filtered out of `template_store_*` and `flow_collector_sources` rows since those are structurally zero for sFlow. New `run_prometheus_server()` is a hand-rolled HTTP/1.1 server on tokio's TcpListener — `GET /metrics` returns the exposition; anything else 404s. No new deps. Defensive measures against slowloris-style attacks: per-line read timeout (3s), response write timeout (5s), 8 KiB max-request-bytes cap via `AsyncReadExt::take`, and a `Semaphore`-bounded concurrency cap (64) that fast-fails excess connections rather than queueing them so an attacker cannot exhaust file descriptors. Spawned from main.rs when `metrics_addr` is set; runs independently of the log reporter and the publisher so a scrape failure can never stall ingestion. K8s manifests ------------- * `k8s/demo/base/serviceradar-flow-collector.yaml`: `Deployment` -> `DaemonSet`, replicas dropped, unused PVC removed (the Rust binary writes nothing to disk). ConfigMap JSON gains the `template_store` block. Service was already `externalTrafficPolicy: Local` — kept. Added `RollingUpdate` with `maxUnavailable: 1`. HTTP probes on the `/metrics` endpoint replace the old `pgrep` exec probes (catches deadlocked-but-running scenarios). * `helm/serviceradar/templates/flow-collector.yaml`: same kind flip, PVC removal, plus `nodeSelector` / `tolerations` knobs. `templates/NOTES.txt` warns operators about the `replicaCount` and `data.*` value removals and prints the explicit `kubectl delete deployment / pvc` commands needed to clean up orphaned objects from a pre-DaemonSet install. * `helm/serviceradar/values.yaml`: drop `replicaCount` and `data.*`, add `flowCollector.config.template_store` block, default `service.type: LoadBalancer` and `service.externalTrafficPolicy: Local`, add the IPFIX (4739) and metrics (50046) ports. Tests ----- 55 tests pass; `clippy --all-targets -D warnings` clean. New tests: * 5 `NatsKvTemplateStore` key-render tests (IPv4, IPv6, empty scope, distinct scopes, distinct kinds). * 2 `TemplateStoreConfig` deserialization tests. * 2 `DeltaState` monotonic-counter tests covering source eviction and re-emergence and multiple-sources / multiple-kinds aggregation. * `escape_label_handles_quotes_backslashes_newlines`, `template_store_metrics_only_emit_for_netflow_listeners`. * `http_server_serves_metrics_and_404` — end-to-end including a deliberate split-read of the request line that the old single-`read()` code would have failed. * `http_server_drops_idle_connection_within_read_timeout` — slow client closed within 2x READ_TIMEOUT (was: would hang forever). * `http_server_concurrency_cap_fast_fails_excess_connections` — saturate with 64 idle conns, verify a fresh request doesn't hang past 2s (was: would queue and starve real scrapers). Docs ---- `docs/docs/flow-collector-scaling.md`: deployment model with diagram and rationale for `externalTrafficPolicy: Local`; configuration knobs table; small / medium / large / very-large sizing rules of thumb; Prometheus metrics with healthy-vs-trouble interpretations and alerting suggestions; signals you've outgrown the design plus the sharding strategy when you do; "Upgrading from a Pre-DaemonSet Install" section with explicit `kubectl delete deployment / pvc` commands; "Rolling Upgrade Behavior" section explaining how `externalTrafficPolicy: Local` interacts with DaemonSet rolling updates and why `maxUnavailable: 1` is the floor; deploy verification checklist (LB IP, source-IP smoke test, cross-pod template share check, Prometheus scrape). Deliberately avoids speculative throughput numbers — flagged as "benchmark on your hardware" — until real load tests fill them in.
5eac517 to
d8e67a9
Compare
|
Persistent review updated to latest commit d8e67a9 |
…aling doc - Add a default netflow listener on 0.0.0.0:4739 in Helm values and the demo manifest so the advertised IPFIX Service port has a real UDP socket behind it instead of silently dropping datagrams. - Gate the flow-collector liveness/readiness probes on service.ports.metrics.enabled AND config.metrics_addr; fall back to a pgrep exec probe when metrics are off so disabling /metrics no longer crashloops the pod. Document the implication near the demo values' metrics.enabled toggle. - Replace non-ASCII characters (em/en dashes, arrows, box drawing, less-than-or-equal) in docs/docs/flow-collector-scaling.md with ASCII equivalents to satisfy the docs ASCII-only constraint.
…emonSet
Wires the new netflow_parser 1.0.3 TemplateStore extension point into the flow-collector and switches the K8s deployment from a single Deployment to a per-node DaemonSet behind a
Service: LoadBalancerwithexternalTrafficPolicy: Local. Together these two changes let the flow collector scale horizontally with node count while preserving exporter source IP and sharing learned NetFlow / IPFIX templates across pods.Why
A single flow-collector Deployment was a singleton bottleneck for UDP ingest. Scaling it required either source-IP-affinity routing (so every exporter always lands on the same pod and templates stay local) or a shared template store so any pod could decode any flow. We picked the shared-store path because it doesn't depend on the LB layer.
The original plan was to front this with Envoy Gateway UDPRoute, but a research spike found that Envoy Gateway's UDPRoute does not preserve client source IP (https://gateway.envoyproxy.io/docs/tasks/traffic/udp-routing/) — upstreams see the gateway pod's IP, which would collapse AutoScopedParser's per-(source_ip,source_id) scoping. Service+DaemonSet +
externalTrafficPolicy: Localis the workable alternative on MetalLB (L2 mode preserves source IP through to the pod). Gateway API stays available for the rest of the stack (HTTP/gRPC/mTLS).Code changes
Cargo.toml: bump netflow_parser 1.0.0 -> 1.0.3 (TemplateStore API); addbytesfor the NATS KV payload type.src/template_store.rs(new): NatsKvTemplateStore implements the parser's TemplateStore trait. async_nats is async-only and the trait is sync, so we bridge viatokio::task::block_in_place+Handle::block_on. Safe under the multi_thread runtime the collector already uses. NATS KV keys are restricted to [A-Za-z0-9._=/-]; the scope produced by AutoScopedParser ("v9:1.2.3.4:2055/0", IPv6 with brackets, etc.) is sanitized via underscore replacement. Five render tests cover IPv4, IPv6, empty scope, distinct-scopes and distinct-kinds.src/netflow/mod.rs: NetflowHandler::new accepts an optionalArc<dyn TemplateStore>and threads it throughNetflowParserBuilder::with_template_store(...). Adds the new TemplateEvent::Restored arm to the existing event-callback (logs at info). Addsupdate_template_store_metrics()which iterates each per-source NetflowParser viaAutoScopedParser::{ipfix,v9,legacy}_info(), sumstemplate_store_*counters from eachCacheMetrics, and writes the totals into ListenerMetrics atomics. Called under the parser lock inside parse_datagram so the snapshot is always fresh on the next scrape.src/listener.rs: build_handler now takesOption<Arc<dyn TemplateStore>>. sFlow ignores it (template-less).src/config.rs: new optional TemplateStoreConfig{ kv_bucket, kv_history, kv_ttl_secs }. Two deserialization tests.src/main.rs: newbootstrap_template_store()opens an independent NATS connection (separate from the publisher's), gets-or-creates the KV bucket idempotently, and wraps it in NatsKvTemplateStore. Kept on its own connection so KV-bucket flakiness can't backpressure publish and vice versa.src/metrics.rs: ListenerMetrics gains template_store_restored, template_store_codec_errors, template_store_backend_errors, source_count atomics. Newrender_prometheus()produces text exposition format (8 metrics, HELP/TYPE headers, protocol + listen_addr labels). Newrun_prometheus_server()is a hand-rolled HTTP/1.1 server on tokio's TcpListener —GET /metricsreturns the exposition; anything else 404s. No new deps. Spawned from main.rs whenmetrics_addris set; runs independently of the log reporter and the publisher so a scrape failure can never stall ingestion.K8s manifest changes
k8s/demo/base/serviceradar-flow-collector.yaml: Deployment ->DaemonSet, replicas dropped, unused PVC removed (the Rust binary writes nothing to disk). ConfigMap JSON gains the
template_storeblock. Service was alreadyexternalTrafficPolicy: Local— kept. AddedRollingUpdatewithmaxUnavailable: 1.helm/serviceradar/templates/flow-collector.yaml: same kind flip, PVC removal, plusnodeSelector/tolerationsknobs.helm/serviceradar/values.yaml: dropreplicaCountanddata.*, addflowCollector.config.template_storeblock, defaultservice.type: LoadBalancerandservice.externalTrafficPolicy: Local, add the IPFIX (4739) and metrics (50046) ports.Tests
48 tests pass:
What's left to do
Code:
nats kv add flow_templates --replicas=3to override.Operational (no cluster access from here):
nats kv ls flow_templatesshows scoped keys.Decisions still open:
kv_replicasin TemplateStoreConfig or rely on pre-create.IMPORTANT: Please sign the Developer Certificate of Origin
Thank you for your contribution to ServiceRadar. Please note, when contributing, the developer must include
a DCO sign-off statement indicating the DCO acceptance in one commit message. Here
is an example DCO Signed-off-by line in a commit message:
Describe your changes
Issue ticket number and link
Code checklist before requesting a review