From eb02bbf3c068bea2c74e776253bd10788e047a42 Mon Sep 17 00:00:00 2001 From: ramnnn2006 Date: Tue, 9 Jun 2026 12:45:07 +0530 Subject: [PATCH] fix(metrics): track unique cardinality label tuples cardinalityOK was incrementing seen[metric] on every event, so a single busy (syscall, comm) pair could exhaust the 5000 limit in under a second and permanently silence all observations for that category, including already-tracked series. fix: change seen from map[string]int to map[string]map[string]struct{} and add a labelKey param to cardinalityOK. each caller now passes the actual label tuple (joined with \x00) so only genuinely new combinations count toward the cap. known tuples always pass through even when full. updated TestCardinalityLimit to test new semantics: unique keys fill the bucket, known keys still pass when full, new keys are rejected, different metrics stay isolated. Signed-off-by: ramnnn2006 --- internal/metrics/bridge.go | 47 +++++++++++++++++++------------ internal/metrics/bridge_test.go | 50 +++++++++++++++++---------------- 2 files changed, 55 insertions(+), 42 deletions(-) diff --git a/internal/metrics/bridge.go b/internal/metrics/bridge.go index 697185d..265ef96 100644 --- a/internal/metrics/bridge.go +++ b/internal/metrics/bridge.go @@ -24,7 +24,7 @@ type Bridge struct { logger *slog.Logger mu sync.Mutex - seen map[string]int // metric name → cardinality count + seen map[string]map[string]struct{} // metric name → cardinality count cancelFn context.CancelFunc } @@ -32,7 +32,7 @@ type Bridge struct { func NewBridge(logger *slog.Logger) *Bridge { return &Bridge{ logger: logger, - seen: make(map[string]int), + seen: make(map[string]map[string]struct{}), } } @@ -66,11 +66,22 @@ func (b *Bridge) Stop() { // cardinality limit. This is a simple counter — not a true cardinality // tracker (would need an LRU or HyperLogLog for production), but // sufficient as a safety valve. -func (b *Bridge) cardinalityOK(metric string) bool { +func (b *Bridge) cardinalityOK(metric, labelKey string) bool { b.mu.Lock() defer b.mu.Unlock() - b.seen[metric]++ - return b.seen[metric] <= LabelCardinalityLimit + s, ok := b.seen[metric] + if !ok { + s = make(map[string]struct{}) + b.seen[metric] = s + } + if _, exists := s[labelKey]; exists { + return true + } + if len(s) >= LabelCardinalityLimit { + return false + } + s[labelKey] = struct{}{} + return true } func (b *Bridge) consume(ctx context.Context, name string, ch <-chan bpf.RawEvent) { @@ -113,11 +124,11 @@ func (b *Bridge) recordSyscall(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("syscall_latency").Inc() return } - if !b.cardinalityOK("syscall") { - return - } comm := event.CommString() sc := bpf.SyscallName(event.SyscallNr) + if !b.cardinalityOK("syscall", sc+"\x00"+comm) { + return + } SyscallDuration.WithLabelValues(sc, comm).Observe(float64(event.LatencyNs)) SyscallTotal.WithLabelValues(sc, comm).Inc() } @@ -128,12 +139,12 @@ func (b *Bridge) recordTCP(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("tcp_monitor").Inc() return } - if !b.cardinalityOK("tcp") { - return - } src := event.SrcAddr().String() dst := event.DstAddr().String() comm := event.CommString() + if !b.cardinalityOK("tcp", src+"\x00"+dst+"\x00"+comm) { + return + } TCPConnectionsTotal.WithLabelValues(src, dst, comm).Inc() @@ -161,11 +172,11 @@ func (b *Bridge) recordDiskIO(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("disk_io").Inc() return } - if !b.cardinalityOK("disk_io") { - return - } dev := formatDev(event.Dev) op := event.OpString() + if !b.cardinalityOK("disk_io", dev+"\x00"+op) { + return + } DiskIODuration.WithLabelValues(dev, op).Observe(float64(event.LatencyNs)) DiskIOBytesTotal.WithLabelValues(dev, op).Add(float64(event.NrBytes)) } @@ -176,10 +187,10 @@ func (b *Bridge) recordSchedDelay(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("sched_delay").Inc() return } - if !b.cardinalityOK("sched_delay") { + comm := event.CommString() + if !b.cardinalityOK("sched_delay", comm) { return } - comm := event.CommString() SchedDelay.WithLabelValues(comm).Observe(float64(event.RunqDelayNs)) } @@ -189,10 +200,10 @@ func (b *Bridge) recordFD(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("fd_track").Inc() return } - if !b.cardinalityOK("fd_track") { + comm := event.CommString() + if !b.cardinalityOK("fd_track", comm) { return } - comm := event.CommString() switch event.Op { case bpf.FDOpOpen: FDOpenTotal.WithLabelValues(comm).Inc() diff --git a/internal/metrics/bridge_test.go b/internal/metrics/bridge_test.go index f19b413..7b642bc 100644 --- a/internal/metrics/bridge_test.go +++ b/internal/metrics/bridge_test.go @@ -5,6 +5,7 @@ package metrics import ( "encoding/binary" + "fmt" "log/slog" "testing" @@ -206,46 +207,47 @@ func TestRecordSchedDelay(t *testing.T) { } func TestCardinalityLimit(t *testing.T) { - tests := []struct { + cases := []struct { name string - calls int + unique int wantLast bool }{ - { - name: "at_limit", - calls: LabelCardinalityLimit, - wantLast: true, - }, - { - name: "one_past_limit", - calls: LabelCardinalityLimit + 1, - wantLast: false, - }, + {"at limit", LabelCardinalityLimit, true}, + {"one past limit", LabelCardinalityLimit + 1, false}, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { b := NewBridge(slog.Default()) - var got bool - for i := 0; i < tt.calls; i++ { - got = b.cardinalityOK("test_metric") + for i := 0; i < c.unique; i++ { + got = b.cardinalityOK("test_metric", fmt.Sprintf("k%d", i)) } - - if got != tt.wantLast { - t.Fatalf("last cardinalityOK() = %v, want %v", got, tt.wantLast) + if got != c.wantLast { + t.Fatalf("last cardinalityOK() = %v, want %v", got, c.wantLast) } }) } - t.Run("different_metric_still_allowed", func(t *testing.T) { + t.Run("known key passes when full", func(t *testing.T) { b := NewBridge(slog.Default()) + for i := 0; i < LabelCardinalityLimit; i++ { + b.cardinalityOK("test_metric", fmt.Sprintf("k%d", i)) + } + if !b.cardinalityOK("test_metric", "k0") { + t.Error("known key should pass even when bucket is full") + } + if b.cardinalityOK("test_metric", "brand_new") { + t.Error("new key should be rejected when bucket is full") + } + }) + t.Run("different_metric_still_allowed", func(t *testing.T) { + b := NewBridge(slog.Default()) for i := 0; i < LabelCardinalityLimit+1; i++ { - b.cardinalityOK("test_metric") + b.cardinalityOK("test_metric", fmt.Sprintf("k%d", i)) } - - if !b.cardinalityOK("other_metric") { + if !b.cardinalityOK("other_metric", "any") { t.Error("expected cardinalityOK to return true for different metric") } })