Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 49 additions & 23 deletions internal/metrics/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ type Bridge struct {
logger *slog.Logger

mu sync.Mutex
seen map[string]int // metric name → cardinality count
seen map[string]map[string]struct{} // metric name -> unique label tuple set
cancelFn context.CancelFunc
}

// NewBridge creates a metrics bridge.
func NewBridge(logger *slog.Logger) *Bridge {
return &Bridge{
logger: logger,
seen: make(map[string]int),
seen: make(map[string]map[string]struct{}),
}
}

Expand Down Expand Up @@ -62,15 +62,38 @@ func (b *Bridge) Stop() {
}
}

// cardinalityOK returns true if the metric has not exceeded the label
// cardinality limit. This is a simple counter — not a true cardinality
// tracker (would need an LRU or HyperLogLog for production), but
// sufficient as a safety valve.
func (b *Bridge) cardinalityOK(metric string) bool {
// cardinalityOK returns true when a metric label tuple can be recorded.
// Previously seen tuples always remain allowed; only new tuples are rejected
// after the per-metric cardinality budget is exhausted.
func (b *Bridge) cardinalityOK(metric string, labels ...string) bool {
labelKey := labelsKey(labels...)

b.mu.Lock()
defer b.mu.Unlock()
b.seen[metric]++
return b.seen[metric] <= LabelCardinalityLimit

tuples, ok := b.seen[metric]
if !ok {
tuples = make(map[string]struct{})
b.seen[metric] = tuples
}

if _, ok := tuples[labelKey]; ok {
return true
}
if len(tuples) >= LabelCardinalityLimit {
return false
}

tuples[labelKey] = struct{}{}
return true
}

// labelsKey encodes a label tuple as a single string suitable for use as a
// map key. Each label is written as "<byte length>:<label>;", so labels that
// themselves contain ':' or ';' still yield distinct keys: ("a;", "b") and
// ("a", ";b") encode differently. With no labels the result is the empty
// string.
func labelsKey(labels ...string) string {
	// Append into one buffer instead of growing a string with += on every
	// iteration, which would re-copy the accumulated key for each label.
	var key []byte
	for _, label := range labels {
		key = fmt.Appendf(key, "%d:%s;", len(label), label)
	}
	return string(key)
}

func (b *Bridge) consume(ctx context.Context, name string, ch <-chan bpf.RawEvent) {
Expand Down Expand Up @@ -113,11 +136,11 @@ func (b *Bridge) recordSyscall(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("syscall_latency").Inc()
return
}
if !b.cardinalityOK("syscall") {
return
}
comm := event.CommString()
sc := bpf.SyscallName(event.SyscallNr)
if !b.cardinalityOK("syscall", sc, comm) {
return
}
SyscallDuration.WithLabelValues(sc, comm).Observe(float64(event.LatencyNs))
SyscallTotal.WithLabelValues(sc, comm).Inc()
}
Expand All @@ -128,12 +151,12 @@ func (b *Bridge) recordTCP(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("tcp_monitor").Inc()
return
}
if !b.cardinalityOK("tcp") {
return
}
src := event.SrcAddr().String()
dst := event.DstAddr().String()
comm := event.CommString()
if !b.cardinalityOK("tcp", src, dst, comm) {
return
}

TCPConnectionsTotal.WithLabelValues(src, dst, comm).Inc()

Expand Down Expand Up @@ -161,11 +184,11 @@ func (b *Bridge) recordDiskIO(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("disk_io").Inc()
return
}
if !b.cardinalityOK("disk_io") {
return
}
dev := formatDev(event.Dev)
op := event.OpString()
if !b.cardinalityOK("disk_io", dev, op) {
return
}
DiskIODuration.WithLabelValues(dev, op).Observe(float64(event.LatencyNs))
DiskIOBytesTotal.WithLabelValues(dev, op).Add(float64(event.NrBytes))
}
Expand All @@ -176,10 +199,10 @@ func (b *Bridge) recordSchedDelay(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("sched_delay").Inc()
return
}
if !b.cardinalityOK("sched_delay") {
comm := event.CommString()
if !b.cardinalityOK("sched_delay", comm) {
return
}
comm := event.CommString()
SchedDelay.WithLabelValues(comm).Observe(float64(event.RunqDelayNs))
}

Expand All @@ -189,14 +212,17 @@ func (b *Bridge) recordFD(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("fd_track").Inc()
return
}
if !b.cardinalityOK("fd_track") {
return
}
comm := event.CommString()
switch event.Op {
case bpf.FDOpOpen:
if !b.cardinalityOK("fd_open", comm) {
return
}
FDOpenTotal.WithLabelValues(comm).Inc()
case bpf.FDOpClose:
if !b.cardinalityOK("fd_close", comm) {
return
}
FDCloseTotal.WithLabelValues(comm).Inc()
}
}
Expand Down
50 changes: 36 additions & 14 deletions internal/metrics/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package metrics
import (
"encoding/binary"
"log/slog"
"strconv"
"testing"

"github.com/prometheus/client_golang/prometheus/testutil"
Expand Down Expand Up @@ -207,19 +208,22 @@ func TestRecordSchedDelay(t *testing.T) {

func TestCardinalityLimit(t *testing.T) {
tests := []struct {
name string
calls int
wantLast bool
name string
labels []string
calls int
want bool
}{
{
name: "at_limit",
calls: LabelCardinalityLimit,
wantLast: true,
name: "same_tuple_still_allowed_at_limit",
labels: []string{"read", "nginx"},
calls: LabelCardinalityLimit,
want: true,
},
{
name: "one_past_limit",
calls: LabelCardinalityLimit + 1,
wantLast: false,
name: "same_tuple_still_allowed_past_limit",
labels: []string{"read", "nginx"},
calls: LabelCardinalityLimit + 1,
want: true,
},
}

Expand All @@ -229,23 +233,41 @@ func TestCardinalityLimit(t *testing.T) {

var got bool
for i := 0; i < tt.calls; i++ {
got = b.cardinalityOK("test_metric")
got = b.cardinalityOK("test_metric", tt.labels...)
}

if got != tt.wantLast {
t.Fatalf("last cardinalityOK() = %v, want %v", got, tt.wantLast)
if got != tt.want {
t.Fatalf("last cardinalityOK() = %v, want %v", got, tt.want)
}
})
}

t.Run("new_tuple_rejected_after_limit", func(t *testing.T) {
b := NewBridge(slog.Default())

for i := 0; i < LabelCardinalityLimit; i++ {
if !b.cardinalityOK("test_metric", "syscall", strconv.Itoa(i)) {
t.Fatalf("tuple %d unexpectedly rejected before limit", i)
}
}

if b.cardinalityOK("test_metric", "syscall", "over-limit") {
t.Fatal("expected new tuple beyond limit to be rejected")
}

if !b.cardinalityOK("test_metric", "syscall", "0") {
t.Fatal("expected previously seen tuple to remain allowed")
}
})

t.Run("different_metric_still_allowed", func(t *testing.T) {
b := NewBridge(slog.Default())

for i := 0; i < LabelCardinalityLimit+1; i++ {
b.cardinalityOK("test_metric")
b.cardinalityOK("test_metric", "same", "tuple")
}

if !b.cardinalityOK("other_metric") {
if !b.cardinalityOK("other_metric", "new", "tuple") {
t.Error("expected cardinalityOK to return true for different metric")
}
})
Expand Down
Loading