From ded0a2114f3188d590ccccb9e5b9c0bfc97d0ec8 Mon Sep 17 00:00:00 2001 From: Saurabh Kumar Bajpai Date: Mon, 8 Jun 2026 14:18:06 +0530 Subject: [PATCH] fix(metrics): track unique cardinality label tuples --- internal/metrics/bridge.go | 68 +++++++++++++++++++++++---------- internal/metrics/bridge_test.go | 24 ++++++++++-- 2 files changed, 68 insertions(+), 24 deletions(-) diff --git a/internal/metrics/bridge.go b/internal/metrics/bridge.go index 697185d..7883298 100644 --- a/internal/metrics/bridge.go +++ b/internal/metrics/bridge.go @@ -7,6 +7,8 @@ import ( "context" "fmt" "log/slog" + "strconv" + "strings" "sync" "github.com/optiqor/kerno/internal/bpf" @@ -24,7 +26,7 @@ type Bridge struct { logger *slog.Logger mu sync.Mutex - seen map[string]int // metric name → cardinality count + seen map[string]map[string]struct{} // metric name → unique label tuples cancelFn context.CancelFunc } @@ -32,7 +34,7 @@ type Bridge struct { func NewBridge(logger *slog.Logger) *Bridge { return &Bridge{ logger: logger, - seen: make(map[string]int), + seen: make(map[string]map[string]struct{}), } } @@ -63,14 +65,38 @@ func (b *Bridge) Stop() { } // cardinalityOK returns true if the metric has not exceeded the label -// cardinality limit. This is a simple counter — not a true cardinality -// tracker (would need an LRU or HyperLogLog for production), but -// sufficient as a safety valve. -func (b *Bridge) cardinalityOK(metric string) bool { +// cardinality limit. Existing label tuples remain allowed after the limit; +// only new tuples are dropped. +func (b *Bridge) cardinalityOK(metric string, labels ...string) bool { + key := labelTupleKey(labels) + b.mu.Lock() defer b.mu.Unlock() - b.seen[metric]++ - return b.seen[metric] <= LabelCardinalityLimit + + tuples := b.seen[metric] + if tuples == nil { + tuples = make(map[string]struct{}) + b.seen[metric] = tuples + } + if _, ok := tuples[key]; ok { + return true + } + if len(tuples) >= LabelCardinalityLimit { + return false + } + tuples[key] = struct{}{} + return true +} + +func labelTupleKey(labels []string) string { + var b strings.Builder + for _, label := range labels { + b.WriteString(strconv.Itoa(len(label))) + b.WriteByte(':') + b.WriteString(label) + b.WriteByte('|') + } + return b.String() } func (b *Bridge) consume(ctx context.Context, name string, ch <-chan bpf.RawEvent) { @@ -113,11 +139,11 @@ func (b *Bridge) recordSyscall(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("syscall_latency").Inc() return } - if !b.cardinalityOK("syscall") { - return - } comm := event.CommString() sc := bpf.SyscallName(event.SyscallNr) + if !b.cardinalityOK("syscall", sc, comm) { + return + } SyscallDuration.WithLabelValues(sc, comm).Observe(float64(event.LatencyNs)) SyscallTotal.WithLabelValues(sc, comm).Inc() } @@ -128,12 +154,12 @@ func (b *Bridge) recordTCP(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("tcp_monitor").Inc() return } - if !b.cardinalityOK("tcp") { - return - } src := event.SrcAddr().String() dst := event.DstAddr().String() comm := event.CommString() + if !b.cardinalityOK("tcp", src, dst, comm) { + return + } TCPConnectionsTotal.WithLabelValues(src, dst, comm).Inc() @@ -161,11 +187,11 @@ func (b *Bridge) recordDiskIO(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("disk_io").Inc() return } - if !b.cardinalityOK("disk_io") { - return - } dev := formatDev(event.Dev) op := event.OpString() + if !b.cardinalityOK("disk_io", dev, op) { + return + } DiskIODuration.WithLabelValues(dev, op).Observe(float64(event.LatencyNs)) DiskIOBytesTotal.WithLabelValues(dev, op).Add(float64(event.NrBytes)) } @@ -176,10 +202,10 @@ func (b *Bridge) recordSchedDelay(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("sched_delay").Inc() return } - if !b.cardinalityOK("sched_delay") { + comm := event.CommString() + if !b.cardinalityOK("sched_delay", comm) { return } - comm := event.CommString() SchedDelay.WithLabelValues(comm).Observe(float64(event.RunqDelayNs)) } @@ -189,10 +215,10 @@ func (b *Bridge) recordFD(raw bpf.RawEvent) { CollectorErrorsTotal.WithLabelValues("fd_track").Inc() return } - if !b.cardinalityOK("fd_track") { + comm := event.CommString() + if !b.cardinalityOK("fd_track", comm) { return } - comm := event.CommString() switch event.Op { case bpf.FDOpOpen: FDOpenTotal.WithLabelValues(comm).Inc() diff --git a/internal/metrics/bridge_test.go b/internal/metrics/bridge_test.go index f19b413..78cd29c 100644 --- a/internal/metrics/bridge_test.go +++ b/internal/metrics/bridge_test.go @@ -6,6 +6,7 @@ package metrics import ( "encoding/binary" "log/slog" + "strconv" "testing" "github.com/prometheus/client_golang/prometheus/testutil" @@ -229,7 +230,7 @@ func TestCardinalityLimit(t *testing.T) { var got bool for i := 0; i < tt.calls; i++ { - got = b.cardinalityOK("test_metric") + got = b.cardinalityOK("test_metric", "tuple", strconv.Itoa(i)) } if got != tt.wantLast { @@ -242,13 +243,30 @@ func TestCardinalityLimit(t *testing.T) { b := NewBridge(slog.Default()) for i := 0; i < LabelCardinalityLimit+1; i++ { - b.cardinalityOK("test_metric") + b.cardinalityOK("test_metric", strconv.Itoa(i)) } - if !b.cardinalityOK("other_metric") { + if !b.cardinalityOK("other_metric", "new_tuple") { t.Error("expected cardinalityOK to return true for different metric") } }) + + t.Run("existing_tuple_still_allowed_after_limit", func(t *testing.T) { + b := NewBridge(slog.Default()) + + for i := 0; i < LabelCardinalityLimit; i++ { + if !b.cardinalityOK("test_metric", strconv.Itoa(i)) { + t.Fatalf("tuple %d rejected before limit", i) + } + } + + if !b.cardinalityOK("test_metric", "0") { + t.Error("expected existing tuple to remain allowed after limit") + } + if b.cardinalityOK("test_metric", "brand_new_tuple") { + t.Error("expected new tuple to be rejected after limit") + } + }) } func TestFormatDev(t *testing.T) {