diff --git a/internal/metrics/bridge.go b/internal/metrics/bridge.go index 697185d..265ef96 100644 --- a/internal/metrics/bridge.go +++ b/internal/metrics/bridge.go @@ -24,7 +24,7 @@ type Bridge struct { logger *slog.Logger mu sync.Mutex - seen map[string]int // metric name → cardinality count + seen map[string]map[string]struct{} // metric name → cardinality count cancelFn context.CancelFunc } @@ -32,7 +32,7 @@ type Bridge struct { func NewBridge(logger *slog.Logger) *Bridge { return &Bridge{ logger: logger, - seen: make(map[string]int), + seen: make(map[string]map[string]struct{}), } } @@ -66,11 +66,22 @@ func (b *Bridge) Stop() { // cardinality limit. This is a simple counter — not a true cardinality // tracker (would need an LRU or HyperLogLog for production), but // sufficient as a safety valve. -func (b *Bridge) cardinalityOK(metric string) bool { +func (b *Bridge) cardinalityOK(metric, labelKey string) bool { b.mu.Lock() defer b.mu.Unlock() - b.seen[metric]++ - return b.seen[metric] <= LabelCardinalityLimit + s, ok := b.seen[metric] + if !ok { + s = make(map[string]struct{}) + b.seen[metric] = s + } + if _, exists := s[labelKey]; exists { + return true + } + if len(s) >= LabelCardinalityLimit { + return false + } + s[labelKey] = struct{}{} + return true } func (b *Bridge) consume(ctx context.Context, name string, ch <-chan bpf.RawEvent) { @@ -113,11 +124,11 @@ func (b *Bridge) recordSyscall(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("syscall_latency").Inc() return } - if !b.cardinalityOK("syscall") { - return - } comm := event.CommString() sc := bpf.SyscallName(event.SyscallNr) + if !b.cardinalityOK("syscall", sc+"\x00"+comm) { + return + } SyscallDuration.WithLabelValues(sc, comm).Observe(float64(event.LatencyNs)) SyscallTotal.WithLabelValues(sc, comm).Inc() } @@ -128,12 +139,12 @@ func (b *Bridge) recordTCP(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("tcp_monitor").Inc() return } - if !b.cardinalityOK("tcp") { - return - } src := event.SrcAddr().String() dst := event.DstAddr().String() comm := event.CommString() + if !b.cardinalityOK("tcp", src+"\x00"+dst+"\x00"+comm) { + return + } TCPConnectionsTotal.WithLabelValues(src, dst, comm).Inc() @@ -161,11 +172,11 @@ func (b *Bridge) recordDiskIO(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("disk_io").Inc() return } - if !b.cardinalityOK("disk_io") { - return - } dev := formatDev(event.Dev) op := event.OpString() + if !b.cardinalityOK("disk_io", dev+"\x00"+op) { + return + } DiskIODuration.WithLabelValues(dev, op).Observe(float64(event.LatencyNs)) DiskIOBytesTotal.WithLabelValues(dev, op).Add(float64(event.NrBytes)) } @@ -176,10 +187,10 @@ func (b *Bridge) recordSchedDelay(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("sched_delay").Inc() return } - if !b.cardinalityOK("sched_delay") { + comm := event.CommString() + if !b.cardinalityOK("sched_delay", comm) { return } - comm := event.CommString() SchedDelay.WithLabelValues(comm).Observe(float64(event.RunqDelayNs)) } @@ -189,10 +200,10 @@ func (b *Bridge) recordFD(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("fd_track").Inc() return } - if !b.cardinalityOK("fd_track") { + comm := event.CommString() + if !b.cardinalityOK("fd_track", comm) { return } - comm := event.CommString() switch event.Op { case bpf.FDOpOpen: FDOpenTotal.WithLabelValues(comm).Inc() diff --git a/internal/metrics/bridge_test.go b/internal/metrics/bridge_test.go index f19b413..7b642bc 100644 --- a/internal/metrics/bridge_test.go +++ b/internal/metrics/bridge_test.go @@ -5,6 +5,7 @@ package metrics import ( "encoding/binary" + "fmt" "log/slog" "testing" @@ -206,46 +207,47 @@ func TestRecordSchedDelay(t *testing.T) { } func TestCardinalityLimit(t *testing.T) { - tests := []struct { + cases := []struct { name string - calls int + unique int wantLast bool }{ - { - name: "at_limit", - calls: LabelCardinalityLimit, - wantLast: true, - }, - { - name: "one_past_limit", - calls: LabelCardinalityLimit + 1, - wantLast: false, - }, + {"at limit", LabelCardinalityLimit, true}, + {"one past limit", LabelCardinalityLimit + 1, false}, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { b := NewBridge(slog.Default()) - var got bool - for i := 0; i < tt.calls; i++ { - got = b.cardinalityOK("test_metric") + for i := 0; i < c.unique; i++ { + got = b.cardinalityOK("test_metric", fmt.Sprintf("k%d", i)) } - - if got != tt.wantLast { - t.Fatalf("last cardinalityOK() = %v, want %v", got, tt.wantLast) + if got != c.wantLast { + t.Fatalf("last cardinalityOK() = %v, want %v", got, c.wantLast) } }) } - t.Run("different_metric_still_allowed", func(t *testing.T) { + t.Run("known key passes when full", func(t *testing.T) { b := NewBridge(slog.Default()) + for i := 0; i < LabelCardinalityLimit; i++ { + b.cardinalityOK("test_metric", fmt.Sprintf("k%d", i)) + } + if !b.cardinalityOK("test_metric", "k0") { + t.Error("known key should pass even when bucket is full") + } + if b.cardinalityOK("test_metric", "brand_new") { + t.Error("new key should be rejected when bucket is full") + } + }) + t.Run("different_metric_still_allowed", func(t *testing.T) { + b := NewBridge(slog.Default()) for i := 0; i < LabelCardinalityLimit+1; i++ { - b.cardinalityOK("test_metric") + b.cardinalityOK("test_metric", fmt.Sprintf("k%d", i)) } - - if !b.cardinalityOK("other_metric") { + if !b.cardinalityOK("other_metric", "any") { t.Error("expected cardinalityOK to return true for different metric") } })