Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 47 additions & 21 deletions internal/metrics/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"

"github.com/optiqor/kerno/internal/bpf"
Expand All @@ -24,15 +26,15 @@ type Bridge struct {
logger *slog.Logger

mu sync.Mutex
seen map[string]int // metric name → cardinality count
seen map[string]map[string]struct{} // metric name → unique label tuples
cancelFn context.CancelFunc
}

// NewBridge creates a metrics bridge that logs through logger.
//
// The returned Bridge starts with an empty cardinality table: seen maps a
// metric name to the set of label tuples already admitted for that metric.
// The per-metric sets are created lazily by cardinalityOK.
func NewBridge(logger *slog.Logger) *Bridge {
	return &Bridge{
		logger: logger,
		seen:   make(map[string]map[string]struct{}),
	}
}

Expand Down Expand Up @@ -63,14 +65,38 @@ func (b *Bridge) Stop() {
}

// cardinalityOK returns true if the metric has not exceeded the label
// cardinality limit. This is a simple counter — not a true cardinality
// tracker (would need an LRU or HyperLogLog for production), but
// sufficient as a safety valve.
func (b *Bridge) cardinalityOK(metric string) bool {
// cardinality limit. Existing label tuples remain allowed after the limit;
// only new tuples are dropped.
func (b *Bridge) cardinalityOK(metric string, labels ...string) bool {
key := labelTupleKey(labels)

b.mu.Lock()
defer b.mu.Unlock()
b.seen[metric]++
return b.seen[metric] <= LabelCardinalityLimit

tuples := b.seen[metric]
if tuples == nil {
tuples = make(map[string]struct{})
b.seen[metric] = tuples
}
if _, ok := tuples[key]; ok {
return true
}
if len(tuples) >= LabelCardinalityLimit {
return false
}
tuples[key] = struct{}{}
return true
}

// labelTupleKey encodes an ordered list of label values as a single string
// suitable for use as a map key.
//
// Each value is written as "<byte length>:<value>|". The length prefix keeps
// the encoding unambiguous even when a value itself contains ':' or '|'.
// An empty or nil slice yields the empty string.
func labelTupleKey(labels []string) string {
	var key strings.Builder
	for i := range labels {
		value := labels[i]
		key.WriteString(strconv.Itoa(len(value)) + ":" + value + "|")
	}
	return key.String()
}

func (b *Bridge) consume(ctx context.Context, name string, ch <-chan bpf.RawEvent) {
Expand Down Expand Up @@ -113,11 +139,11 @@ func (b *Bridge) recordSyscall(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("syscall_latency").Inc()
return
}
if !b.cardinalityOK("syscall") {
return
}
comm := event.CommString()
sc := bpf.SyscallName(event.SyscallNr)
if !b.cardinalityOK("syscall", sc, comm) {
return
}
SyscallDuration.WithLabelValues(sc, comm).Observe(float64(event.LatencyNs))
SyscallTotal.WithLabelValues(sc, comm).Inc()
}
Expand All @@ -128,12 +154,12 @@ func (b *Bridge) recordTCP(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("tcp_monitor").Inc()
return
}
if !b.cardinalityOK("tcp") {
return
}
src := event.SrcAddr().String()
dst := event.DstAddr().String()
comm := event.CommString()
if !b.cardinalityOK("tcp", src, dst, comm) {
return
}

TCPConnectionsTotal.WithLabelValues(src, dst, comm).Inc()

Expand Down Expand Up @@ -161,11 +187,11 @@ func (b *Bridge) recordDiskIO(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("disk_io").Inc()
return
}
if !b.cardinalityOK("disk_io") {
return
}
dev := formatDev(event.Dev)
op := event.OpString()
if !b.cardinalityOK("disk_io", dev, op) {
return
}
DiskIODuration.WithLabelValues(dev, op).Observe(float64(event.LatencyNs))
DiskIOBytesTotal.WithLabelValues(dev, op).Add(float64(event.NrBytes))
}
Expand All @@ -176,10 +202,10 @@ func (b *Bridge) recordSchedDelay(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("sched_delay").Inc()
return
}
if !b.cardinalityOK("sched_delay") {
comm := event.CommString()
if !b.cardinalityOK("sched_delay", comm) {
return
}
comm := event.CommString()
SchedDelay.WithLabelValues(comm).Observe(float64(event.RunqDelayNs))
}

Expand All @@ -189,10 +215,10 @@ func (b *Bridge) recordFD(raw bpf.RawEvent) {
CollectorErrorsTotal.WithLabelValues("fd_track").Inc()
return
}
if !b.cardinalityOK("fd_track") {
comm := event.CommString()
if !b.cardinalityOK("fd_track", comm) {
return
}
comm := event.CommString()
switch event.Op {
case bpf.FDOpOpen:
FDOpenTotal.WithLabelValues(comm).Inc()
Expand Down
24 changes: 21 additions & 3 deletions internal/metrics/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package metrics
import (
"encoding/binary"
"log/slog"
"strconv"
"testing"

"github.com/prometheus/client_golang/prometheus/testutil"
Expand Down Expand Up @@ -229,7 +230,7 @@ func TestCardinalityLimit(t *testing.T) {

var got bool
for i := 0; i < tt.calls; i++ {
got = b.cardinalityOK("test_metric")
got = b.cardinalityOK("test_metric", "tuple", strconv.Itoa(i))
}

if got != tt.wantLast {
Expand All @@ -242,13 +243,30 @@ func TestCardinalityLimit(t *testing.T) {
b := NewBridge(slog.Default())

for i := 0; i < LabelCardinalityLimit+1; i++ {
b.cardinalityOK("test_metric")
b.cardinalityOK("test_metric", strconv.Itoa(i))
}

if !b.cardinalityOK("other_metric") {
if !b.cardinalityOK("other_metric", "new_tuple") {
t.Error("expected cardinalityOK to return true for different metric")
}
})

t.Run("existing_tuple_still_allowed_after_limit", func(t *testing.T) {
b := NewBridge(slog.Default())

for i := 0; i < LabelCardinalityLimit; i++ {
if !b.cardinalityOK("test_metric", strconv.Itoa(i)) {
t.Fatalf("tuple %d rejected before limit", i)
}
}

if !b.cardinalityOK("test_metric", "0") {
t.Error("expected existing tuple to remain allowed after limit")
}
if b.cardinalityOK("test_metric", "brand_new_tuple") {
t.Error("expected new tuple to be rejected after limit")
}
})
}

func TestFormatDev(t *testing.T) {
Expand Down
Loading