diff --git a/internal/collector/memory.go b/internal/collector/memory.go index f8d0ac8..dd0d4c4 100644 --- a/internal/collector/memory.go +++ b/internal/collector/memory.go @@ -158,6 +158,10 @@ func (c *MemoryCollector) poll() error { } } + var swapUsed uint64 + if swapTotal > swapFree { + swapUsed = swapTotal - swapFree + } c.snap = MemorySnapshot{ TotalBytes: total, UsedBytes: used, @@ -165,7 +169,7 @@ func (c *MemoryCollector) poll() error { GrowthRateBytesPerSec: growth, AvailableBytes: available, SwapTotalBytes: swapTotal, - SwapUsedBytes: swapTotal - swapFree, + SwapUsedBytes: swapUsed, } c.prev = memSample{used: used, at: now} c.have = true diff --git a/internal/collector/memory_collector_test.go b/internal/collector/memory_collector_test.go index 4cb43da..fc6d367 100644 --- a/internal/collector/memory_collector_test.go +++ b/internal/collector/memory_collector_test.go @@ -58,45 +58,130 @@ func TestParseMeminfoLine(t *testing.T) { {"BadValue: not_a_number kB", "", 0, false}, } for _, c := range cases { - k, v, ok := parseMeminfoLine(c.in) - if ok != c.want { - t.Errorf("parseMeminfoLine(%q) ok=%v, want %v", c.in, ok, c.want) - } - if ok && (k != c.key || v != c.val) { - t.Errorf("parseMeminfoLine(%q) = (%q, %d), want (%q, %d)", c.in, k, v, c.key, c.val) - } + t.Run(c.in, func(t *testing.T) { + k, v, ok := parseMeminfoLine(c.in) + if ok != c.want { + t.Errorf("parseMeminfoLine(%q) ok=%v, want %v", c.in, ok, c.want) + } + if ok && (k != c.key || v != c.val) { + t.Errorf("parseMeminfoLine(%q) = (%q, %d), want (%q, %d)", c.in, k, v, c.key, c.val) + } + }) } } func TestMemoryCollectorPoll(t *testing.T) { - path := writeMeminfo(t, 16<<30, 8<<30, 4<<30, 2<<30) // 16GB total, 8GB avail - - c := NewMemoryCollector(newSilentLogger(), 50*time.Millisecond) - c.procPath = path - - if err := c.poll(); err != nil { - t.Fatalf("poll: %v", err) - } - - snap, ok := c.Snapshot().(*MemorySnapshot) - if !ok || snap == nil { - t.Fatal("expected non-nil snapshot") - } - if 
snap.TotalBytes != 16<<30 { - t.Errorf("TotalBytes = %d, want %d", snap.TotalBytes, uint64(16<<30)) - } - if snap.UsedBytes != 8<<30 { - t.Errorf("UsedBytes = %d, want %d", snap.UsedBytes, uint64(8<<30)) - } - if snap.AvailableBytes != 8<<30 { - t.Errorf("AvailableBytes = %d, want %d", snap.AvailableBytes, uint64(8<<30)) - } - if snap.SwapUsedBytes != 2<<30 { - t.Errorf("SwapUsedBytes = %d, want %d", snap.SwapUsedBytes, uint64(2<<30)) + cases := []struct { + name string + total uint64 + available uint64 + swapTotal uint64 + swapFree uint64 + wantTotal uint64 + wantUsed uint64 + wantAvail uint64 + wantSwapUsed uint64 + wantPctLo float64 + wantPctHi float64 + wantErr bool + }{ + { + name: "half used with swap", + total: 16 << 30, + available: 8 << 30, + swapTotal: 4 << 30, + swapFree: 2 << 30, + wantTotal: 16 << 30, + wantUsed: 8 << 30, + wantAvail: 8 << 30, + wantSwapUsed: 2 << 30, + wantPctLo: 49.9, + wantPctHi: 50.1, + }, + { + name: "no swap configured", + total: 8 << 30, + available: 4 << 30, + wantTotal: 8 << 30, + wantUsed: 4 << 30, + wantAvail: 4 << 30, + wantPctLo: 49.9, + wantPctHi: 50.1, + }, + { + name: "swapfree exceeds swaptotal, guard clamps to zero", + total: 4 << 30, + available: 2 << 30, + swapTotal: 1 << 30, + swapFree: 2 << 30, + wantTotal: 4 << 30, + wantUsed: 2 << 30, + wantAvail: 2 << 30, + wantPctLo: 49.9, + wantPctHi: 50.1, + }, + { + name: "fully used memory", + total: 4 << 30, + available: 0, + wantTotal: 4 << 30, + wantUsed: 4 << 30, + wantPctLo: 99.9, + wantPctHi: 100.1, + }, + { + name: "exactly at 75 percent used", + total: 4 << 30, + available: 1 << 30, + wantTotal: 4 << 30, + wantUsed: 3 << 30, + wantAvail: 1 << 30, + wantPctLo: 74.9, + wantPctHi: 75.1, + }, + { + name: "zero total returns error", + total: 0, + wantErr: true, + }, } - // Used pct should be ~50%. 
- if snap.UsedPct < 49.0 || snap.UsedPct > 51.0 { - t.Errorf("UsedPct = %v, want ~50", snap.UsedPct) + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + path := writeMeminfo(t, c.total, c.available, c.swapTotal, c.swapFree) + col := NewMemoryCollector(newSilentLogger(), 50*time.Millisecond) + col.procPath = path + + err := col.poll() + if c.wantErr { + if err == nil { + t.Fatal("expected poll error, got nil") + } + return + } + if err != nil { + t.Fatalf("poll: %v", err) + } + + snap, ok := col.Snapshot().(*MemorySnapshot) + if !ok || snap == nil { + t.Fatal("expected non-nil snapshot") + } + if snap.TotalBytes != c.wantTotal { + t.Errorf("TotalBytes = %d, want %d", snap.TotalBytes, c.wantTotal) + } + if snap.UsedBytes != c.wantUsed { + t.Errorf("UsedBytes = %d, want %d", snap.UsedBytes, c.wantUsed) + } + if snap.AvailableBytes != c.wantAvail { + t.Errorf("AvailableBytes = %d, want %d", snap.AvailableBytes, c.wantAvail) + } + if snap.SwapUsedBytes != c.wantSwapUsed { + t.Errorf("SwapUsedBytes = %d, want %d", snap.SwapUsedBytes, c.wantSwapUsed) + } + if snap.UsedPct < c.wantPctLo || snap.UsedPct > c.wantPctHi { + t.Errorf("UsedPct = %v, want [%v, %v]", snap.UsedPct, c.wantPctLo, c.wantPctHi) + } + }) } } diff --git a/internal/collector/memory_test.go b/internal/collector/memory_test.go index 9e7507e..087a8d8 100644 --- a/internal/collector/memory_test.go +++ b/internal/collector/memory_test.go @@ -17,7 +17,7 @@ import ( // usage under this many MB under sustained 100K events/s load. const memoryBudgetMB = 50 -// readPeakMB is the heap-in-use ceiling we treat as the working-set +// readHeapInuseMB reports the heap-in-use size we treat as the working-set // memory cost. Reported via t.Logf so CI surfaces the actual number. 
func readHeapInuseMB() float64 { runtime.GC() @@ -30,154 +30,164 @@ func readHeapInuseMB() float64 { // TestCollectorMemoryHighCardinalityBurst pumps a burst of events // through every collector with cardinality far exceeding each LRU cap. // The invariant under test: no matter how many unique keys are seen, -// the working set stays bounded by the cap × per-key size. +// the working set stays bounded by the cap x per-key size. func TestCollectorMemoryHighCardinalityBurst(t *testing.T) { if testing.Short() { t.Skip("skipping memory test in -short mode") } - sc := NewSyscallCollector(newSilentLogger(), nil) - tc := NewTCPCollector(newSilentLogger(), nil) - oc := NewOOMCollector(newSilentLogger(), nil) - dc := NewDiskIOCollector(newSilentLogger(), nil) - schc := NewSchedCollector(newSilentLogger(), nil) - fc := NewFDCollector(newSilentLogger(), nil) - - baseMB := readHeapInuseMB() - - const events = 2_000_000 - - // 16K unique comms so we churn the LRU well past its 4K cap. - const distinctComms = 16_000 - const distinctSyscalls = 400 + cases := []struct { + name string + events int + comms int + syscalls int + }{ + {"small burst", 200_000, 4_000, 100}, + {"large burst", 2_000_000, 16_000, 400}, + } - for i := 0; i < events; i++ { - comm := fmt.Sprintf("p%05d", i%distinctComms) - nr := uint32(i % distinctSyscalls) + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + sc := NewSyscallCollector(newSilentLogger(), nil) + tc := NewTCPCollector(newSilentLogger(), nil) + oc := NewOOMCollector(newSilentLogger(), nil) + dc := NewDiskIOCollector(newSilentLogger(), nil) + schc := NewSchedCollector(newSilentLogger(), nil) + fc := NewFDCollector(newSilentLogger(), nil) + + baseMB := readHeapInuseMB() + + for i := 0; i < c.events; i++ { + comm := fmt.Sprintf("p%05d", i%c.comms) + nr := uint32(i % c.syscalls) + + sc.record(makeSyscallEvent(comm, nr, uint64((i%1000)+1)*1000, 0)) + schc.record(&bpf.SchedEvent{ + PID: uint32(i % c.comms), + RunqDelayNs: uint64((i % 500) 
+ 1), + }) + dc.record(makeDiskEvent("RWS"[i%3], uint64((i%1000)+1)*1000, 4096)) + fc.record(&bpf.FDEvent{ + PID: uint32(i % c.comms), + Op: bpf.FDOp((i % 2) + 1), + }) + + tcpEvent := &bpf.TCPEvent{ + SAddr: uint32(i), + DAddr: uint32(i + 1), + SPort: uint16(i % 65535), + DPort: 80, + RTTUs: uint32((i % 500) + 1), + EventType: bpf.TCPEventRTT, + } + copy(tcpEvent.Comm[:], comm) + tc.record(tcpEvent) + + // OOM is intentionally rare — exercise the bounded-log path. + if i%10000 == 0 { + oc.record(makeOOMEvent(comm, uint32(i))) + } + } - sc.record(makeSyscallEvent(comm, nr, uint64((i%1000)+1)*1000, 0)) + peakMB := readHeapInuseMB() + growthMB := peakMB - baseMB + t.Logf("%s: %d events through 6 collectors -> heap growth %.2f MB (peak %.2f MB)", + c.name, c.events, growthMB, peakMB) - schc.record(&bpf.SchedEvent{ - PID: uint32(i % distinctComms), - RunqDelayNs: uint64((i % 500) + 1), - }) + if growthMB > memoryBudgetMB { + t.Errorf("heap grew %.2f MB, exceeds %d MB budget", growthMB, memoryBudgetMB) + } - dc.record(makeDiskEvent("RWS"[i%3], uint64((i%1000)+1)*1000, 4096)) + // Force snapshots to ensure they don't reveal hidden allocations. + for _, snapper := range []interface { + Snapshot() any + }{sc, tc, oc, dc, schc, fc} { + _ = snapper.Snapshot() + } - fc.record(&bpf.FDEvent{ - PID: uint32(i % distinctComms), - Op: bpf.FDOp((i % 2) + 1), + // LRU should be at its cap, not unbounded. + if got := sc.keys.Len(); got > sc.cap { + t.Errorf("syscall LRU Len=%d > cap=%d", got, sc.cap) + } + if got := tc.conns.Len(); got > tc.cap { + t.Errorf("tcp LRU Len=%d > cap=%d", got, tc.cap) + } + if got := schc.keys.Len(); got > schc.cap { + t.Errorf("sched LRU Len=%d > cap=%d", got, schc.cap) + } + if got := fc.keys.Len(); got > fc.cap { + t.Errorf("fd LRU Len=%d > cap=%d", got, fc.cap) + } }) - - // TCP: vary 4-tuple to grow connection map. 
- tcpEvent := &bpf.TCPEvent{ - SAddr: uint32(i), - DAddr: uint32(i + 1), - SPort: uint16(i % 65535), - DPort: 80, - RTTUs: uint32((i % 500) + 1), - EventType: bpf.TCPEventRTT, - } - copy(tcpEvent.Comm[:], comm) - tc.record(tcpEvent) - - // OOM is intentionally rare — exercise the bounded-log path. - if i%10000 == 0 { - oc.record(makeOOMEvent(comm, uint32(i))) - } - } - - peakMB := readHeapInuseMB() - growthMB := peakMB - baseMB - t.Logf("burst: %d events through 6 collectors → heap growth %.2f MB (peak %.2f MB)", - events, growthMB, peakMB) - - if growthMB > memoryBudgetMB { - t.Errorf("heap grew %.2f MB, exceeds %d MB budget", growthMB, memoryBudgetMB) - } - - // Force snapshots to ensure they don't reveal hidden allocations. - for _, snapper := range []interface { - Snapshot() any - }{sc, tc, oc, dc, schc, fc} { - _ = snapper.Snapshot() - } - - // LRU should be at its cap, not unbounded. - if got := sc.keys.Len(); got > sc.cap { - t.Errorf("syscall LRU Len=%d > cap=%d", got, sc.cap) - } - if got := tc.conns.Len(); got > tc.cap { - t.Errorf("tcp LRU Len=%d > cap=%d", got, tc.cap) - } - if got := schc.keys.Len(); got > schc.cap { - t.Errorf("sched LRU Len=%d > cap=%d", got, schc.cap) - } - if got := fc.keys.Len(); got > fc.cap { - t.Errorf("fd LRU Len=%d > cap=%d", got, fc.cap) } } // TestCollectorMemorySustained100Kps exercises the working set under -// sustained 100K events/s load for a few seconds. It rate-limits via -// a token loop instead of time.Sleep per event so the achieved rate -// matches the target on slower CI runners. +// sustained load for a few seconds. It rate-limits via a token loop +// instead of time.Sleep per event so the achieved rate matches the +// target on slower CI runners. 
func TestCollectorMemorySustained100Kps(t *testing.T) { if testing.Short() { t.Skip("skipping memory test in -short mode") } - sc := NewSyscallCollector(newSilentLogger(), nil) - dc := NewDiskIOCollector(newSilentLogger(), nil) - schc := NewSchedCollector(newSilentLogger(), nil) - - baseMB := readHeapInuseMB() - - const targetRate = 100_000 // events/sec - const duration = 3 * time.Second - const batchSize = 1000 - tickInterval := time.Second / (targetRate / batchSize) - - var ( - totalEvents int - mu sync.Mutex - stopAt = time.Now().Add(duration) - ) - - ticker := time.NewTicker(tickInterval) - defer ticker.Stop() - - commTemplate := func(i int) string { return fmt.Sprintf("p%04d", i%4000) } - - start := time.Now() - for time.Now().Before(stopAt) { - <-ticker.C - for j := 0; j < batchSize; j++ { - i := totalEvents + j - comm := commTemplate(i) - sc.record(makeSyscallEvent(comm, uint32(i%200), uint64((i%1000)+1)*1000, 0)) - schc.record(&bpf.SchedEvent{PID: uint32(i % 4000), RunqDelayNs: uint64((i % 500) + 1)}) - dc.record(makeDiskEvent("RWS"[i%3], uint64((i%1000)+1)*1000, 4096)) - } - mu.Lock() - totalEvents += batchSize - mu.Unlock() + cases := []struct { + name string + targetRate int + duration time.Duration + }{ + {"50k/s for 1s", 50_000, 1 * time.Second}, + {"100k/s for 3s", 100_000, 3 * time.Second}, } - elapsed := time.Since(start) - achievedRate := float64(totalEvents*3) / elapsed.Seconds() // 3 collectors per batch - peakMB := readHeapInuseMB() - growthMB := peakMB - baseMB + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + sc := NewSyscallCollector(newSilentLogger(), nil) + dc := NewDiskIOCollector(newSilentLogger(), nil) + schc := NewSchedCollector(newSilentLogger(), nil) - t.Logf("sustained: %d events × 3 collectors in %v = %.0f events/s; heap growth %.2f MB", - totalEvents, elapsed.Round(time.Millisecond), achievedRate, growthMB) + baseMB := readHeapInuseMB() - if achievedRate < float64(targetRate)*0.8 { - t.Logf("WARNING: achieved rate 
%.0f/s is far below %d target — runner may be slow", achievedRate, targetRate) - } - if growthMB > memoryBudgetMB { - t.Errorf("heap grew %.2f MB under %.0f events/s, exceeds %d MB budget", - growthMB, achievedRate, memoryBudgetMB) + const batchSize = 1000 + tickInterval := time.Second / time.Duration(c.targetRate/batchSize) + + var totalEvents int + stopAt := time.Now().Add(c.duration) + + ticker := time.NewTicker(tickInterval) + defer ticker.Stop() + + commTemplate := func(i int) string { return fmt.Sprintf("p%04d", i%4000) } + + start := time.Now() + for time.Now().Before(stopAt) { + <-ticker.C + for j := 0; j < batchSize; j++ { + i := totalEvents + j + comm := commTemplate(i) + sc.record(makeSyscallEvent(comm, uint32(i%200), uint64((i%1000)+1)*1000, 0)) + schc.record(&bpf.SchedEvent{PID: uint32(i % 4000), RunqDelayNs: uint64((i % 500) + 1)}) + dc.record(makeDiskEvent("RWS"[i%3], uint64((i%1000)+1)*1000, 4096)) + } + totalEvents += batchSize + } + elapsed := time.Since(start) + + achievedRate := float64(totalEvents*3) / elapsed.Seconds() + peakMB := readHeapInuseMB() + growthMB := peakMB - baseMB + + t.Logf("sustained (%s): %d events x 3 collectors in %v = %.0f events/s; heap growth %.2f MB", + c.name, totalEvents, elapsed.Round(time.Millisecond), achievedRate, growthMB) + + if achievedRate < float64(c.targetRate)*0.8 { + t.Logf("WARNING: achieved rate %.0f/s is far below %d target -- runner may be slow", achievedRate, c.targetRate) + } + if growthMB > memoryBudgetMB { + t.Errorf("heap grew %.2f MB under %.0f events/s, exceeds %d MB budget", + growthMB, achievedRate, memoryBudgetMB) + } + }) } } @@ -186,32 +196,43 @@ func TestCollectorMemorySustained100Kps(t *testing.T) { // memory but adds confidence that the sustained test isn't masking // concurrency bugs. 
func TestCollectorConcurrentRecord(t *testing.T) { - sc := NewSyscallCollector(newSilentLogger(), nil) - - const goroutines = 16 - const eventsPerGoroutine = 100_000 - - var wg sync.WaitGroup - wg.Add(goroutines) - for g := 0; g < goroutines; g++ { - go func(seed int) { - defer wg.Done() - for i := 0; i < eventsPerGoroutine; i++ { - sc.record(makeSyscallEvent( - fmt.Sprintf("p%04d", (seed*eventsPerGoroutine+i)%4000), - uint32(i%100), - uint64((i%1000)+1)*1000, - 0, - )) - } - }(g) + cases := []struct { + name string + goroutines int + eventsPerGoroutine int + }{ + {"1 goroutine", 1, 200_000}, + {"4 goroutines", 4, 50_000}, + {"16 goroutines", 16, 100_000}, } - wg.Wait() - snap := sc.Snapshot().(*SyscallSnapshot) - expected := uint64(goroutines * eventsPerGoroutine) - if snap.TotalCount != expected { - t.Errorf("TotalCount = %d, want %d (concurrent writes lost events)", - snap.TotalCount, expected) + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + sc := NewSyscallCollector(newSilentLogger(), nil) + + var wg sync.WaitGroup + wg.Add(c.goroutines) + for g := 0; g < c.goroutines; g++ { + go func(seed int) { + defer wg.Done() + for i := 0; i < c.eventsPerGoroutine; i++ { + sc.record(makeSyscallEvent( + fmt.Sprintf("p%04d", (seed*c.eventsPerGoroutine+i)%4000), + uint32(i%100), + uint64((i%1000)+1)*1000, + 0, + )) + } + }(g) + } + wg.Wait() + + snap := sc.Snapshot().(*SyscallSnapshot) + expected := uint64(c.goroutines * c.eventsPerGoroutine) + if snap.TotalCount != expected { + t.Errorf("TotalCount = %d, want %d (concurrent writes lost events)", + snap.TotalCount, expected) + } + }) } }