Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions internal/metrics/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ type Bridge struct {
logger *slog.Logger

mu sync.Mutex
seen map[string]int // metric name → cardinality count
seen map[string]map[string]struct{} // metric name → cardinality count
cancelFn context.CancelFunc
}

// NewBridge creates a metrics bridge.
func NewBridge(logger *slog.Logger) *Bridge {
return &Bridge{
logger: logger,
seen: make(map[string]int),
seen: make(map[string]map[string]struct{}),
}
}

Expand Down Expand Up @@ -66,11 +66,22 @@ func (b *Bridge) Stop() {
// cardinality limit. This is a simple counter — not a true cardinality
// tracker (would need an LRU or HyperLogLog for production), but
// sufficient as a safety valve.
func (b *Bridge) cardinalityOK(metric string) bool {
func (b *Bridge) cardinalityOK(metric, labelKey string) bool {
b.mu.Lock()
defer b.mu.Unlock()
b.seen[metric]++
return b.seen[metric] <= LabelCardinalityLimit
s, ok := b.seen[metric]
if !ok {
s = make(map[string]struct{})
b.seen[metric] = s
}
if _, exists := s[labelKey]; exists {
return true
}
if len(s) >= LabelCardinalityLimit {
return false
}
s[labelKey] = struct{}{}
return true
}

func (b *Bridge) consume(ctx context.Context, name string, ch <-chan bpf.RawEvent) {
Expand Down Expand Up @@ -113,11 +124,11 @@ func (b *Bridge) recordSyscall(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("syscall_latency").Inc()
return
}
if !b.cardinalityOK("syscall") {
return
}
comm := event.CommString()
sc := bpf.SyscallName(event.SyscallNr)
if !b.cardinalityOK("syscall", sc+"\x00"+comm) {
return
}
SyscallDuration.WithLabelValues(sc, comm).Observe(float64(event.LatencyNs))
SyscallTotal.WithLabelValues(sc, comm).Inc()
}
Expand All @@ -128,12 +139,12 @@ func (b *Bridge) recordTCP(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("tcp_monitor").Inc()
return
}
if !b.cardinalityOK("tcp") {
return
}
src := event.SrcAddr().String()
dst := event.DstAddr().String()
comm := event.CommString()
if !b.cardinalityOK("tcp", src+"\x00"+dst+"\x00"+comm) {
return
}

TCPConnectionsTotal.WithLabelValues(src, dst, comm).Inc()

Expand Down Expand Up @@ -161,11 +172,11 @@ func (b *Bridge) recordDiskIO(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("disk_io").Inc()
return
}
if !b.cardinalityOK("disk_io") {
return
}
dev := formatDev(event.Dev)
op := event.OpString()
if !b.cardinalityOK("disk_io", dev+"\x00"+op) {
return
}
DiskIODuration.WithLabelValues(dev, op).Observe(float64(event.LatencyNs))
DiskIOBytesTotal.WithLabelValues(dev, op).Add(float64(event.NrBytes))
}
Expand All @@ -176,10 +187,10 @@ func (b *Bridge) recordSchedDelay(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("sched_delay").Inc()
return
}
if !b.cardinalityOK("sched_delay") {
comm := event.CommString()
if !b.cardinalityOK("sched_delay", comm) {
return
}
comm := event.CommString()
SchedDelay.WithLabelValues(comm).Observe(float64(event.RunqDelayNs))
}

Expand All @@ -189,10 +200,10 @@ func (b *Bridge) recordFD(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("fd_track").Inc()
return
}
if !b.cardinalityOK("fd_track") {
comm := event.CommString()
if !b.cardinalityOK("fd_track", comm) {
return
}
comm := event.CommString()
switch event.Op {
case bpf.FDOpOpen:
FDOpenTotal.WithLabelValues(comm).Inc()
Expand Down
50 changes: 26 additions & 24 deletions internal/metrics/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package metrics

import (
"encoding/binary"
"fmt"
"log/slog"
"testing"

Expand Down Expand Up @@ -206,46 +207,47 @@ func TestRecordSchedDelay(t *testing.T) {
}

func TestCardinalityLimit(t *testing.T) {
tests := []struct {
cases := []struct {
name string
calls int
unique int
wantLast bool
}{
{
name: "at_limit",
calls: LabelCardinalityLimit,
wantLast: true,
},
{
name: "one_past_limit",
calls: LabelCardinalityLimit + 1,
wantLast: false,
},
{"at limit", LabelCardinalityLimit, true},
{"one past limit", LabelCardinalityLimit + 1, false},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
b := NewBridge(slog.Default())

var got bool
for i := 0; i < tt.calls; i++ {
got = b.cardinalityOK("test_metric")
for i := 0; i < c.unique; i++ {
got = b.cardinalityOK("test_metric", fmt.Sprintf("k%d", i))
}

if got != tt.wantLast {
t.Fatalf("last cardinalityOK() = %v, want %v", got, tt.wantLast)
if got != c.wantLast {
t.Fatalf("last cardinalityOK() = %v, want %v", got, c.wantLast)
}
})
}

t.Run("different_metric_still_allowed", func(t *testing.T) {
t.Run("known key passes when full", func(t *testing.T) {
b := NewBridge(slog.Default())
for i := 0; i < LabelCardinalityLimit; i++ {
b.cardinalityOK("test_metric", fmt.Sprintf("k%d", i))
}
if !b.cardinalityOK("test_metric", "k0") {
t.Error("known key should pass even when bucket is full")
}
if b.cardinalityOK("test_metric", "brand_new") {
t.Error("new key should be rejected when bucket is full")
}
})

t.Run("different_metric_still_allowed", func(t *testing.T) {
b := NewBridge(slog.Default())
for i := 0; i < LabelCardinalityLimit+1; i++ {
b.cardinalityOK("test_metric")
b.cardinalityOK("test_metric", fmt.Sprintf("k%d", i))
}

if !b.cardinalityOK("other_metric") {
if !b.cardinalityOK("other_metric", "any") {
t.Error("expected cardinalityOK to return true for different metric")
}
})
Expand Down
Loading