diff --git a/cmd/bpf-verify/main.go b/cmd/bpf-verify/main.go
index 63be437..720f07f 100644
--- a/cmd/bpf-verify/main.go
+++ b/cmd/bpf-verify/main.go
@@ -1,3 +1,5 @@
+//go:build ebpf
+
 // Copyright 2026 Optiqor contributors
 // SPDX-License-Identifier: Apache-2.0
 
diff --git a/cmd/kerno-mangen/main.go b/cmd/kerno-mangen/main.go
index 9c91e58..df4c504 100644
--- a/cmd/kerno-mangen/main.go
+++ b/cmd/kerno-mangen/main.go
@@ -1,3 +1,5 @@
+//go:build linux
+
 // Copyright 2026 Optiqor contributors
 // SPDX-License-Identifier: Apache-2.0
 
diff --git a/cmd/kerno/main.go b/cmd/kerno/main.go
index 824dfb3..0bf5fc4 100644
--- a/cmd/kerno/main.go
+++ b/cmd/kerno/main.go
@@ -1,3 +1,5 @@
+//go:build linux
+
 // Copyright 2026 Optiqor contributors
 // SPDX-License-Identifier: Apache-2.0
 
diff --git a/internal/bpf/c/dns_monitor.c b/internal/bpf/c/dns_monitor.c
new file mode 100644
index 0000000..d7bcf36
--- /dev/null
+++ b/internal/bpf/c/dns_monitor.c
@@ -0,0 +1,181 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright 2026 Optiqor contributors.
+//
+// dns_monitor.c - Traces UDP/53 DNS sends and receives per pod/process.
+//
+// Hooks:
+//   tracepoint/syscalls/sys_enter_sendmsg -> DNS request events
+//   tracepoint/syscalls/sys_enter_recvmsg -> DNS response events
+//
+// Filter: destination (send) or source (recv) port == 53 only.
+// Output: ring buffer of dns_event structs.
+//
+// The syscall tracepoints expose the caller's userspace msghdr
+// (struct user_msghdr), so it is copied with bpf_probe_read_user() before use.
+
+#include "headers/kerno.h"
+
+#ifndef AF_INET
+#define AF_INET 2
+#endif
+
+// Output ring buffer.
+KERNO_RINGBUF(dns_events);
+
+// In-flight request tracking: key = (pid << 16 | query_id), value = send timestamp_ns.
+KERNO_HASH(dns_inflight, __u64, __u64, 4096);
+
+// Force BTF emission so bpf2go can extract the struct.
+const struct dns_event *_force_btf_dns_event __attribute__((used));
+
+// Helper: read query_id (first 2 bytes of UDP payload = DNS transaction ID).
+// Returns 0 when there is no iovec, it is shorter than 2 bytes, or it cannot be read.
+static __always_inline __u16 read_query_id(const struct user_msghdr *msg)
+{
+	__u8 buf[2] = {0, 0};
+	struct iovec iov = {};
+
+	if (!msg->msg_iov || msg->msg_iovlen == 0)
+		return 0;
+
+	// Read the first iovec from userspace.
+	if (bpf_probe_read_user(&iov, sizeof(iov), msg->msg_iov) != 0)
+		return 0;
+	if (iov.iov_len < 2)
+		return 0;
+
+	if (bpf_probe_read_user(buf, sizeof(buf), iov.iov_base) != 0)
+		return 0;
+	return ((__u16)buf[0] << 8) | buf[1];
+}
+
+// Helper: extract the peer IPv4 address and port from msg_name (sockaddr_in).
+// Both are converted from network to host byte order, which is what the Go
+// side (DNSEvent.SrcAddr/DstAddr) expects. Returns 0 on success, -1 otherwise.
+static __always_inline int read_dest(const struct user_msghdr *msg,
+				     __u32 *daddr, __u16 *dport)
+{
+	struct sockaddr_in sin = {};
+
+	if (!msg->msg_name || msg->msg_namelen < (int)sizeof(sin))
+		return -1;
+
+	if (bpf_probe_read_user(&sin, sizeof(sin), msg->msg_name) != 0)
+		return -1;
+	if (sin.sin_family != AF_INET)
+		return -1;
+
+	*daddr = __builtin_bswap32(sin.sin_addr.s_addr);
+	*dport = __builtin_bswap16(sin.sin_port);
+	return 0;
+}
+
+// --- sys_enter_sendmsg -------------------------------------------------------
+
+SEC("tracepoint/syscalls/sys_enter_sendmsg")
+int tracepoint_sys_enter_sendmsg(struct trace_event_raw_sys_enter *ctx)
+{
+	// ctx->args[1] is the userspace msghdr pointer; work on a stack copy.
+	const void *umsg = (const void *)(long)ctx->args[1];
+	struct user_msghdr msg = {};
+
+	if (!umsg || bpf_probe_read_user(&msg, sizeof(msg), umsg) != 0)
+		return 0;
+
+	__u32 daddr = 0;
+	__u16 dport = 0;
+	if (read_dest(&msg, &daddr, &dport) != 0)
+		return 0;
+
+	// Filter: only DNS (port 53).
+	if (dport != 53)
+		return 0;
+
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u16 qid = read_query_id(&msg);
+
+	// Record send timestamp for latency calculation.
+	__u64 inflight_key = ((pid_tgid >> 32) << 16) | qid;
+	__u64 now = bpf_ktime_get_ns();
+	bpf_map_update_elem(&dns_inflight, &inflight_key, &now, BPF_ANY);
+
+	struct dns_event *e = bpf_ringbuf_reserve(&dns_events, sizeof(*e), 0);
+	if (!e)
+		return 0;
+
+	e->timestamp_ns = now;
+	e->cgroup_id = bpf_get_current_cgroup_id();
+	e->pid = pid_tgid >> 32;
+	e->saddr = 0; // source filled in userspace from socket
+	e->daddr = daddr;
+	e->sport = 0;
+	e->dport = dport;
+	e->query_id = qid;
+	e->event_type = DNS_EVENT_SEND;
+	e->_pad = 0;
+	bpf_get_current_comm(&e->comm, sizeof(e->comm));
+
+	bpf_ringbuf_submit(e, 0);
+	return 0;
+}
+
+// --- sys_enter_recvmsg -------------------------------------------------------
+
+SEC("tracepoint/syscalls/sys_enter_recvmsg")
+int tracepoint_sys_enter_recvmsg(struct trace_event_raw_sys_enter *ctx)
+{
+	// NOTE(review): at syscall entry the kernel has not yet written the sender
+	// address or payload into the caller's buffers, so the values read below
+	// are whatever userspace left there. Confirm whether this hook should move
+	// to sys_exit_recvmsg.
+	const void *umsg = (const void *)(long)ctx->args[1];
+	struct user_msghdr msg = {};
+
+	if (!umsg || bpf_probe_read_user(&msg, sizeof(msg), umsg) != 0)
+		return 0;
+
+	__u32 saddr = 0;
+	__u16 sport = 0;
+	if (read_dest(&msg, &saddr, &sport) != 0)
+		return 0;
+
+	// Only care about responses from port 53.
+	if (sport != 53)
+		return 0;
+
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u16 qid = read_query_id(&msg);
+
+	__u64 inflight_key = ((pid_tgid >> 32) << 16) | qid;
+	__u64 now = bpf_ktime_get_ns();
+	__u64 ts = now;
+	__u64 *send_ns = bpf_map_lookup_elem(&dns_inflight, &inflight_key);
+
+	if (send_ns) {
+		// Copy the value out first: the slot may be reused as soon as the
+		// entry is deleted, so send_ns must not be dereferenced afterwards.
+		ts = *send_ns;
+		bpf_map_delete_elem(&dns_inflight, &inflight_key);
+	}
+
+	struct dns_event *e = bpf_ringbuf_reserve(&dns_events, sizeof(*e), 0);
+	if (!e)
+		return 0;
+
+	e->timestamp_ns = ts; // matching send time when known, else receive time
+	e->cgroup_id = bpf_get_current_cgroup_id();
+	e->pid = pid_tgid >> 32;
+	e->saddr = saddr;
+	e->daddr = 0;
+	e->sport = sport;
+	e->dport = 0;
+	e->query_id = qid;
+	e->event_type = DNS_EVENT_RECV;
+	e->_pad = 0;
+	bpf_get_current_comm(&e->comm, sizeof(e->comm));
+
+	bpf_ringbuf_submit(e, 0);
+	return 0;
+}
+
+char LICENSE[] SEC("license") = "Dual BSD/GPL";
diff --git a/internal/bpf/c/headers/kerno.h b/internal/bpf/c/headers/kerno.h
index ff23598..c23c273 100644
--- a/internal/bpf/c/headers/kerno.h
+++ b/internal/bpf/c/headers/kerno.h
@@ -163,4 +163,27 @@ struct file_event {
 	__type(value, val_type); \
 } name SEC(".maps")
 
+// --- DNS Monitor Event -------------------------------------------------------
+
+#define EVENT_DNS_MONITOR 8
+
+// DNS event subtypes.
+#define DNS_EVENT_SEND 1
+#define DNS_EVENT_RECV 2
+
+// Addresses and ports are stored in host byte order.
+struct dns_event {
+	__u64 timestamp_ns;
+	__u64 cgroup_id;
+	__u32 pid;
+	__u32 saddr;
+	__u32 daddr;
+	__u16 sport;
+	__u16 dport;
+	__u16 query_id;
+	__u8 event_type;
+	__u8 _pad;
+	char comm[TASK_COMM_LEN];
+};
+
 #endif // __KERNO_H__
diff --git a/internal/bpf/closer.go b/internal/bpf/closer.go
new file mode 100644
index 0000000..49e96e2
--- /dev/null
+++ b/internal/bpf/closer.go
@@ -0,0 +1,14 @@
+//go:build ebpf
+
+// Copyright 2026 Optiqor contributors
+// SPDX-License-Identifier: Apache-2.0
+
+package bpf
+
+// closerFunc adapts a plain function to the io.Closer interface.
+type closerFunc func()
+
+func (f closerFunc) Close() error {
+	f()
+	return nil
+}
diff --git a/internal/bpf/decode.go b/internal/bpf/decode.go
new file mode 100644
index 0000000..4a9e66f
--- /dev/null
+++ b/internal/bpf/decode.go
@@ -0,0 +1,51 @@
+// Copyright 2026 Optiqor contributors
+// SPDX-License-Identifier: Apache-2.0
+
+package bpf
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+)
+
+// decodeEvent unpacks a little-endian ring-buffer sample into a new T.
+// kind names the event family in the wrapped error ("decoding <kind> event").
+// binary.Read fails if data is shorter than the fixed-size layout of T.
+func decodeEvent[T any](data []byte, kind string) (*T, error) {
+	var event T
+	if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, &event); err != nil {
+		return nil, fmt.Errorf("decoding %s event: %w", kind, err)
+	}
+	return &event, nil
+}
+
+// DecodeSyscallEvent decodes a raw event into a typed SyscallEvent.
+func DecodeSyscallEvent(data []byte) (*SyscallEvent, error) {
+	return decodeEvent[SyscallEvent](data, "syscall")
+}
+
+// DecodeTCPEvent decodes a raw event into a typed TCPEvent.
+func DecodeTCPEvent(data []byte) (*TCPEvent, error) {
+	return decodeEvent[TCPEvent](data, "tcp")
+}
+
+// DecodeOOMEvent decodes a raw event into a typed OOMEvent.
+func DecodeOOMEvent(data []byte) (*OOMEvent, error) {
+	return decodeEvent[OOMEvent](data, "oom")
+}
+
+// DecodeDiskEvent decodes a raw event into a typed DiskEvent.
+func DecodeDiskEvent(data []byte) (*DiskEvent, error) {
+	return decodeEvent[DiskEvent](data, "disk")
+}
+
+// DecodeSchedEvent decodes a raw event into a typed SchedEvent.
+func DecodeSchedEvent(data []byte) (*SchedEvent, error) {
+	return decodeEvent[SchedEvent](data, "sched")
+}
+
+// DecodeFDEvent decodes a raw event into a typed FDEvent.
+func DecodeFDEvent(data []byte) (*FDEvent, error) {
+	return decodeEvent[FDEvent](data, "fd")
+}
diff --git a/internal/bpf/decode_fuzz_test.go 
b/internal/bpf/decode_fuzz_test.go index 4df504f..7d55504 100644 --- a/internal/bpf/decode_fuzz_test.go +++ b/internal/bpf/decode_fuzz_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/bpf/decode_test.go b/internal/bpf/decode_test.go index 8ea032a..06c54cc 100644 --- a/internal/bpf/decode_test.go +++ b/internal/bpf/decode_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 @@ -94,7 +96,7 @@ func TestDecodeTCPEvent(t *testing.T) { SAddr: binary.BigEndian.Uint32([]byte{10, 0, 0, 1}), DAddr: binary.BigEndian.Uint32([]byte{8, 8, 8, 8}), SPort: 54321, - DPort: 443, // already in host byte order (BPF normalizes before writing) + DPort: 443, EventType: TCPEventRTT, RTTUs: 250, } @@ -489,20 +491,4 @@ func TestIsSyscallError(t *testing.T) { } } -func TestTCPEventPortByteOrder(t *testing.T) { - // Verify that ports in TCPEvent are host byte order. - // The BPF program applies bpf_ntohs() to dport before writing, - // so both fields should be readable directly without swapping. - e := TCPEvent{ - SPort: 54321, // ephemeral source port, host order - DPort: 443, // HTTPS destination port, host order (not 47873) - } - if e.DPort == 0xBB01 { // 47873 = 443 byte-swapped - t.Error("DPort is in network byte order; expected host byte order after BPF normalization") - } - if e.DPort != 443 { - t.Errorf("DPort = %d, want 443", e.DPort) - } -} - var _ = net.IPv4(0, 0, 0, 0) diff --git a/internal/bpf/disk_io.go b/internal/bpf/disk_io.go index f80cd27..98d97fe 100644 --- a/internal/bpf/disk_io.go +++ b/internal/bpf/disk_io.go @@ -1,3 +1,5 @@ +//go:build ebpf + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 @@ -125,10 +127,3 @@ func (l *DiskIOLoader) close() { } // DecodeDiskEvent decodes a raw event into a typed DiskEvent. 
-func DecodeDiskEvent(data []byte) (*DiskEvent, error) {
-	var event DiskEvent
-	if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, &event); err != nil {
-		return nil, fmt.Errorf("decoding disk event: %w", err)
-	}
-	return &event, nil
-}
diff --git a/internal/bpf/dns_monitor.go b/internal/bpf/dns_monitor.go
new file mode 100644
index 0000000..58e9940
--- /dev/null
+++ b/internal/bpf/dns_monitor.go
@@ -0,0 +1,146 @@
+//go:build ebpf
+
+// Copyright 2026 Optiqor contributors
+// SPDX-License-Identifier: Apache-2.0
+
+package bpf
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"log/slog"
+
+	"github.com/cilium/ebpf"
+	"github.com/cilium/ebpf/link"
+	"github.com/cilium/ebpf/ringbuf"
+)
+
+//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -cflags "-O2 -g -Wall -Werror -D__TARGET_ARCH_x86 -I c/headers" -target bpfel -type dns_event dnsMonitor c/dns_monitor.c
+
+// DNSMonitorLoader manages the dns_monitor eBPF program.
+type DNSMonitorLoader struct {
+	logger *slog.Logger
+	objs   *dnsMonitorObjects
+	links  []link.Link
+	reader *ringbuf.Reader
+}
+
+// NewDNSMonitorLoader creates a new loader.
+func NewDNSMonitorLoader(logger *slog.Logger) *DNSMonitorLoader {
+	return &DNSMonitorLoader{
+		logger: logger.With("loader", "dns_monitor"),
+	}
+}
+
+// Name implements Loader.
+func (l *DNSMonitorLoader) Name() string { return "dns_monitor" }
+
+// Load implements Loader.
+func (l *DNSMonitorLoader) Load() (io.Closer, error) {
+	l.objs = &dnsMonitorObjects{}
+	if err := loadDnsMonitorObjects(l.objs, &ebpf.CollectionOptions{}); err != nil {
+		return nil, fmt.Errorf("loading dns_monitor objects: %w", err)
+	}
+
+	// Attach tracepoint/syscalls/sys_enter_sendmsg.
+	sendLink, err := link.Tracepoint("syscalls", "sys_enter_sendmsg",
+		l.objs.TracepointSysEnterSendmsg, nil)
+	if err != nil {
+		l.close()
+		return nil, fmt.Errorf("attaching sys_enter_sendmsg: %w", err)
+	}
+	l.links = append(l.links, sendLink)
+
+	// Attach tracepoint/syscalls/sys_enter_recvmsg.
+	recvLink, err := link.Tracepoint("syscalls", "sys_enter_recvmsg",
+		l.objs.TracepointSysEnterRecvmsg, nil)
+	if err != nil {
+		l.close()
+		return nil, fmt.Errorf("attaching sys_enter_recvmsg: %w", err)
+	}
+	l.links = append(l.links, recvLink)
+
+	// Open ring buffer reader.
+	reader, err := ringbuf.NewReader(l.objs.DnsEvents)
+	if err != nil {
+		l.close()
+		return nil, fmt.Errorf("opening dns ring buffer: %w", err)
+	}
+	l.reader = reader
+
+	l.logger.Info("dns_monitor eBPF program loaded and attached")
+	return closerFunc(l.close), nil
+}
+
+// Events implements Loader.
+func (l *DNSMonitorLoader) Events(ctx context.Context) (<-chan RawEvent, error) {
+	if l.reader == nil {
+		return nil, fmt.Errorf("loader not loaded; call Load() first")
+	}
+	ch := make(chan RawEvent, 256)
+	go l.readLoop(ctx, l.reader, ch)
+	return ch, nil
+}
+
+// readLoop forwards ring buffer samples to ch until ctx is cancelled or the
+// reader is closed. It reads from the reader captured by Events rather than
+// l.reader, because close() resets that field to nil from another goroutine.
+func (l *DNSMonitorLoader) readLoop(ctx context.Context, reader *ringbuf.Reader, ch chan<- RawEvent) {
+	defer close(ch)
+	for {
+		record, err := reader.Read()
+		if err != nil {
+			// A closed reader or a cancelled context is a normal shutdown.
+			if errors.Is(err, ringbuf.ErrClosed) || ctx.Err() != nil {
+				return
+			}
+			l.logger.Warn("dns ring buffer read error", "error", err)
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case ch <- RawEvent{
+			Type: EventDNSMonitor,
+			Data: bytes.Clone(record.RawSample),
+		}:
+		}
+	}
+}
+
+// close releases the reader, the tracepoint links and the loaded objects.
+// It is safe to call on a partially loaded loader; failures are logged
+// because the io.Closer adapter has no way to return them.
+func (l *DNSMonitorLoader) close() {
+	if l.reader != nil {
+		if err := l.reader.Close(); err != nil {
+			l.logger.Warn("closing dns ring buffer reader", "error", err)
+		}
+		l.reader = nil
+	}
+	for _, lnk := range l.links {
+		if err := lnk.Close(); err != nil {
+			l.logger.Warn("closing dns tracepoint link", "error", err)
+		}
+	}
+	l.links = nil
+	if l.objs != nil {
+		if err := l.objs.Close(); err != nil {
+			l.logger.Warn("closing dns_monitor objects", "error", err)
+		}
+		l.objs = nil
+	}
+}
+
+// DecodeDNSEvent decodes a raw event into a typed DNSEvent.
+func DecodeDNSEvent(data []byte) (*DNSEvent, error) { + var event DNSEvent + if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, &event); err != nil { + return nil, fmt.Errorf("decoding dns event: %w", err) + } + return &event, nil +} diff --git a/internal/bpf/events.go b/internal/bpf/events.go index dd365f2..7b4d521 100644 --- a/internal/bpf/events.go +++ b/internal/bpf/events.go @@ -16,7 +16,7 @@ const TaskCommLen = 16 // MaxFilenameLen matches MAX_FILENAME_LEN in kerno.h. const MaxFilenameLen = 256 -// ─── Syscall Latency Event ───────────────────────────────────────────────── +// ─── Syscall Latency Event ───────────────────────────────────────────────── // SyscallEvent matches struct syscall_event in kerno.h. // Field order and sizes MUST be identical to the C struct. @@ -41,7 +41,7 @@ func (e *SyscallEvent) Latency() time.Duration { return time.Duration(e.LatencyNs) } -// ─── TCP Monitor Event ───────────────────────────────────────────────────── +// ─── TCP Monitor Event ───────────────────────────────────────────────────── // TCPEventType is the subtype of a TCP event. type TCPEventType uint8 @@ -111,7 +111,7 @@ func (e *TCPEvent) RTT() time.Duration { return time.Duration(e.RTTUs) * time.Microsecond } -// ─── OOM Kill Event ──────────────────────────────────────────────────────── +// ─── OOM Kill Event ──────────────────────────────────────────────────────── // OOMEvent matches struct oom_event in kerno.h. type OOMEvent struct { @@ -131,7 +131,7 @@ func (e *OOMEvent) CommString() string { return nullTermString(e.Comm[:]) } -// ─── Disk I/O Event ──────────────────────────────────────────────────────── +// ─── Disk I/O Event ──────────────────────────────────────────────────────── // DiskEvent matches struct disk_event in kerno.h. 
type DiskEvent struct { @@ -170,7 +170,7 @@ func (e *DiskEvent) OpString() string { } } -// ─── Scheduler Delay Event ───────────────────────────────────────────────── +// ─── Scheduler Delay Event ───────────────────────────────────────────────── // SchedEvent matches struct sched_event in kerno.h. type SchedEvent struct { @@ -192,7 +192,7 @@ func (e *SchedEvent) RunqDelay() time.Duration { return time.Duration(e.RunqDelayNs) } -// ─── File Descriptor Track Event ─────────────────────────────────────────── +// ─── File Descriptor Track Event ─────────────────────────────────────────── // FDOp is the type of file descriptor operation. type FDOp uint8 @@ -230,7 +230,7 @@ func (e *FDEvent) CommString() string { return nullTermString(e.Comm[:]) } -// ─── File Audit Event ────────────────────────────────────────────────────── +// ─── File Audit Event ────────────────────────────────────────────────────── // FileEvent matches struct file_event in kerno.h. type FileEvent struct { @@ -254,7 +254,7 @@ func (e *FileEvent) FilenameString() string { return nullTermString(e.Filename[:]) } -// ─── Helpers ──────────────────────────────────────────────────────────────── +// ─── Helpers ──────────────────────────────────────────────────────────────── // nullTermString converts a null-terminated byte slice to a Go string. func nullTermString(b []byte) string { @@ -265,3 +265,60 @@ func nullTermString(b []byte) string { } return string(b) } + +// --- DNS Monitor Event ------------------------------------------------------- + +// DNSEventType is the subtype of a DNS event. +type DNSEventType uint8 + +const ( + DNSEventSend DNSEventType = 1 + DNSEventRecv DNSEventType = 2 +) + +// String returns a human-readable name. +func (t DNSEventType) String() string { + switch t { + case DNSEventSend: + return "send" + case DNSEventRecv: + return "recv" + default: + return fmt.Sprintf("unknown(%d)", t) + } +} + +// DNSEvent matches struct dns_event in kerno.h exactly. 
+// Field order and sizes MUST be identical to the C struct. +type DNSEvent struct { + TimestampNs uint64 + CgroupID uint64 + PID uint32 + SAddr uint32 + DAddr uint32 + SPort uint16 + DPort uint16 + QueryID uint16 + EventType DNSEventType + Pad0 uint8 + Comm [TaskCommLen]byte +} + +// CommString returns the process name as a Go string. +func (e *DNSEvent) CommString() string { + return nullTermString(e.Comm[:]) +} + +// SrcAddr returns the source IP address. +func (e *DNSEvent) SrcAddr() net.IP { + ip := make(net.IP, 4) + binary.BigEndian.PutUint32(ip, e.SAddr) + return ip +} + +// DstAddr returns the destination IP address. +func (e *DNSEvent) DstAddr() net.IP { + ip := make(net.IP, 4) + binary.BigEndian.PutUint32(ip, e.DAddr) + return ip +} diff --git a/internal/bpf/fd_track.go b/internal/bpf/fd_track.go index 2a3053f..ea84208 100644 --- a/internal/bpf/fd_track.go +++ b/internal/bpf/fd_track.go @@ -1,3 +1,5 @@ +//go:build ebpf + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 @@ -125,10 +127,3 @@ func (l *FDTrackLoader) close() { } // DecodeFDEvent decodes a raw event into a typed FDEvent. 
-func DecodeFDEvent(data []byte) (*FDEvent, error) { - var event FDEvent - if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, &event); err != nil { - return nil, fmt.Errorf("decoding fd event: %w", err) - } - return &event, nil -} diff --git a/internal/bpf/gen_stub.go b/internal/bpf/gen_stub.go index 606d05e..76044a2 100644 --- a/internal/bpf/gen_stub.go +++ b/internal/bpf/gen_stub.go @@ -20,12 +20,15 @@ package bpf import ( + "context" "fmt" + "io" + "log/slog" "github.com/cilium/ebpf" ) -// ─── Syscall Latency stubs ────────────────────────────────────────────────── +// ─── Syscall Latency stubs ────────────────────────────────────────────────── type syscallLatencyObjects struct { TracepointSysEnter *ebpf.Program `ebpf:"tracepoint_sys_enter"` @@ -39,7 +42,7 @@ func loadSyscallLatencyObjects(obj *syscallLatencyObjects, opts *ebpf.Collection func (o *syscallLatencyObjects) Close() error { return nil } -// ─── TCP Monitor stubs ────────────────────────────────────────────────────── +// ─── TCP Monitor stubs ────────────────────────────────────────────────────── type tcpMonitorObjects struct { TracepointTcpRetransmit *ebpf.Program `ebpf:"tracepoint_tcp_retransmit"` @@ -53,7 +56,7 @@ func loadTcpMonitorObjects(obj *tcpMonitorObjects, opts *ebpf.CollectionOptions) func (o *tcpMonitorObjects) Close() error { return nil } -// ─── OOM Track stubs ──────────────────────────────────────────────────────── +// ─── OOM Track stubs ──────────────────────────────────────────────────────── type oomTrackObjects struct { KprobeOomKill *ebpf.Program `ebpf:"kprobe_oom_kill"` @@ -66,7 +69,7 @@ func loadOomTrackObjects(obj *oomTrackObjects, opts *ebpf.CollectionOptions) err func (o *oomTrackObjects) Close() error { return nil } -// ─── Disk I/O stubs ───────────────────────────────────────────────────────── +// ─── Disk I/O stubs ───────────────────────────────────────────────────────── type diskIOObjects struct { TracepointBlockRqIssue *ebpf.Program 
`ebpf:"tracepoint_block_rq_issue"` @@ -80,7 +83,7 @@ func loadDiskIOObjects(obj *diskIOObjects, opts *ebpf.CollectionOptions) error { func (o *diskIOObjects) Close() error { return nil } -// ─── Sched Delay stubs ────────────────────────────────────────────────────── +// ─── Sched Delay stubs ────────────────────────────────────────────────────── type schedDelayObjects struct { TracepointSchedWakeup *ebpf.Program `ebpf:"tracepoint_sched_wakeup"` @@ -94,7 +97,7 @@ func loadSchedDelayObjects(obj *schedDelayObjects, opts *ebpf.CollectionOptions) func (o *schedDelayObjects) Close() error { return nil } -// ─── FD Track stubs ───────────────────────────────────────────────────────── +// ─── FD Track stubs ───────────────────────────────────────────────────────── type fdTrackObjects struct { TracepointSysExitOpenat *ebpf.Program `ebpf:"tracepoint_sys_exit_openat"` @@ -107,3 +110,84 @@ func loadFdTrackObjects(obj *fdTrackObjects, opts *ebpf.CollectionOptions) error } func (o *fdTrackObjects) Close() error { return nil } + +// --- DNS Monitor stubs ------------------------------------------------------- + +type dnsMonitorObjects struct { + TracepointSysEnterSendmsg *ebpf.Program `ebpf:"tracepoint_sys_enter_sendmsg"` + TracepointSysEnterRecvmsg *ebpf.Program `ebpf:"tracepoint_sys_enter_recvmsg"` + DnsEvents *ebpf.Map `ebpf:"dns_events"` + DnsInflight *ebpf.Map `ebpf:"dns_inflight"` +} + +func loadDnsMonitorObjects(obj *dnsMonitorObjects, opts *ebpf.CollectionOptions) error { + return fmt.Errorf("eBPF programs not compiled; run 'make generate' first") +} + +func (o *dnsMonitorObjects) Close() error { return nil } + +// --- Loader stubs (non-ebpf builds) ----------------------------------------- + +func NewSyscallLatencyLoader(logger *slog.Logger) *SyscallLatencyLoader { + return &SyscallLatencyLoader{} +} +func NewTCPMonitorLoader(logger *slog.Logger) *TCPMonitorLoader { return &TCPMonitorLoader{} } +func NewOOMTrackLoader(logger *slog.Logger) *OOMTrackLoader { return 
&OOMTrackLoader{} } +func NewDiskIOLoader(logger *slog.Logger) *DiskIOLoader { return &DiskIOLoader{} } +func NewSchedDelayLoader(logger *slog.Logger) *SchedDelayLoader { return &SchedDelayLoader{} } +func NewFDTrackLoader(logger *slog.Logger) *FDTrackLoader { return &FDTrackLoader{} } +func NewDNSMonitorLoader(logger *slog.Logger) *DNSMonitorLoader { return &DNSMonitorLoader{} } + +// --- Loader type stubs (non-ebpf builds) ------------------------------------ + +type SyscallLatencyLoader struct{} +type TCPMonitorLoader struct{} +type OOMTrackLoader struct{} +type DiskIOLoader struct{} +type SchedDelayLoader struct{} +type FDTrackLoader struct{} +type DNSMonitorLoader struct{} + +// --- Loader methods for non-ebpf builds -------------------------------------- + +func (l *SyscallLatencyLoader) Load() (io.Closer, error) { return nil, fmt.Errorf("eBPF not compiled") } +func (l *SyscallLatencyLoader) Events(ctx context.Context) (<-chan RawEvent, error) { + return nil, fmt.Errorf("eBPF not compiled") +} +func (l *SyscallLatencyLoader) Name() string { return "syscall_latency" } + +func (l *TCPMonitorLoader) Load() (io.Closer, error) { return nil, fmt.Errorf("eBPF not compiled") } +func (l *TCPMonitorLoader) Events(ctx context.Context) (<-chan RawEvent, error) { + return nil, fmt.Errorf("eBPF not compiled") +} +func (l *TCPMonitorLoader) Name() string { return "tcp_monitor" } + +func (l *OOMTrackLoader) Load() (io.Closer, error) { return nil, fmt.Errorf("eBPF not compiled") } +func (l *OOMTrackLoader) Events(ctx context.Context) (<-chan RawEvent, error) { + return nil, fmt.Errorf("eBPF not compiled") +} +func (l *OOMTrackLoader) Name() string { return "oom_track" } + +func (l *DiskIOLoader) Load() (io.Closer, error) { return nil, fmt.Errorf("eBPF not compiled") } +func (l *DiskIOLoader) Events(ctx context.Context) (<-chan RawEvent, error) { + return nil, fmt.Errorf("eBPF not compiled") +} +func (l *DiskIOLoader) Name() string { return "disk_io" } + +func (l 
*SchedDelayLoader) Load() (io.Closer, error) { return nil, fmt.Errorf("eBPF not compiled") } +func (l *SchedDelayLoader) Events(ctx context.Context) (<-chan RawEvent, error) { + return nil, fmt.Errorf("eBPF not compiled") +} +func (l *SchedDelayLoader) Name() string { return "sched_delay" } + +func (l *FDTrackLoader) Load() (io.Closer, error) { return nil, fmt.Errorf("eBPF not compiled") } +func (l *FDTrackLoader) Events(ctx context.Context) (<-chan RawEvent, error) { + return nil, fmt.Errorf("eBPF not compiled") +} +func (l *FDTrackLoader) Name() string { return "fd_track" } + +func (l *DNSMonitorLoader) Load() (io.Closer, error) { return nil, fmt.Errorf("eBPF not compiled") } +func (l *DNSMonitorLoader) Events(ctx context.Context) (<-chan RawEvent, error) { + return nil, fmt.Errorf("eBPF not compiled") +} +func (l *DNSMonitorLoader) Name() string { return "dns_monitor" } diff --git a/internal/bpf/loader.go b/internal/bpf/loader.go index 995e300..731347d 100644 --- a/internal/bpf/loader.go +++ b/internal/bpf/loader.go @@ -54,6 +54,7 @@ const ( EventSchedDelay EventType = 5 EventFDTrack EventType = 6 EventFileAudit EventType = 7 + EventDNSMonitor EventType = 8 ) // String returns the human-readable name of the event type. @@ -73,15 +74,9 @@ func (t EventType) String() string { return "fd_track" case EventFileAudit: return "file_audit" + case EventDNSMonitor: + return "dns_monitor" default: return fmt.Sprintf("unknown(%d)", t) } } - -// closerFunc adapts a plain function to the io.Closer interface. 
-type closerFunc func() - -func (f closerFunc) Close() error { - f() - return nil -} diff --git a/internal/bpf/oom_track.go b/internal/bpf/oom_track.go index 68280be..a61f9a2 100644 --- a/internal/bpf/oom_track.go +++ b/internal/bpf/oom_track.go @@ -1,3 +1,5 @@ +//go:build ebpf + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 @@ -117,10 +119,3 @@ func (l *OOMTrackLoader) close() { } // DecodeOOMEvent decodes a raw event into a typed OOMEvent. -func DecodeOOMEvent(data []byte) (*OOMEvent, error) { - var event OOMEvent - if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, &event); err != nil { - return nil, fmt.Errorf("decoding oom event: %w", err) - } - return &event, nil -} diff --git a/internal/bpf/sched_delay.go b/internal/bpf/sched_delay.go index 6666a13..21a3a22 100644 --- a/internal/bpf/sched_delay.go +++ b/internal/bpf/sched_delay.go @@ -1,3 +1,5 @@ +//go:build ebpf + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 @@ -125,10 +127,3 @@ func (l *SchedDelayLoader) close() { } // DecodeSchedEvent decodes a raw event into a typed SchedEvent. -func DecodeSchedEvent(data []byte) (*SchedEvent, error) { - var event SchedEvent - if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, &event); err != nil { - return nil, fmt.Errorf("decoding sched event: %w", err) - } - return &event, nil -} diff --git a/internal/bpf/syscall_latency.go b/internal/bpf/syscall_latency.go index f5e4981..2cb299b 100644 --- a/internal/bpf/syscall_latency.go +++ b/internal/bpf/syscall_latency.go @@ -1,3 +1,5 @@ +//go:build ebpf + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 @@ -125,10 +127,3 @@ func (l *SyscallLatencyLoader) close() { } // DecodeSyscallEvent decodes a raw event into a typed SyscallEvent. 
-func DecodeSyscallEvent(data []byte) (*SyscallEvent, error) { - var event SyscallEvent - if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, &event); err != nil { - return nil, fmt.Errorf("decoding syscall event: %w", err) - } - return &event, nil -} diff --git a/internal/bpf/tcp_monitor.go b/internal/bpf/tcp_monitor.go index 26476d6..66c7190 100644 --- a/internal/bpf/tcp_monitor.go +++ b/internal/bpf/tcp_monitor.go @@ -1,3 +1,5 @@ +//go:build ebpf + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 @@ -125,10 +127,3 @@ func (l *TCPMonitorLoader) close() { } // DecodeTCPEvent decodes a raw event into a typed TCPEvent. -func DecodeTCPEvent(data []byte) (*TCPEvent, error) { - var event TCPEvent - if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, &event); err != nil { - return nil, fmt.Errorf("decoding tcp event: %w", err) - } - return &event, nil -} diff --git a/internal/chaos/cpu.go b/internal/chaos/cpu.go index 2b6e1c3..f750b67 100644 --- a/internal/chaos/cpu.go +++ b/internal/chaos/cpu.go @@ -12,7 +12,7 @@ import ( "sync" "sync/atomic" - "golang.org/x/sys/unix" + "time" ) // CPUScenario saturates the host CPU by running tight loops on multiple @@ -89,10 +89,9 @@ func (s CPUScenario) Run(ctx context.Context, opts Options) error { // TASK_INTERRUPTIBLE → TASK_RUNNING, which is what // kerno's sched_delay collector measures. 
runtime.LockOSThread() - r := rand.New(rand.NewSource(seed)) //nolint:gosec - ts := unix.Timespec{Sec: 0, Nsec: 1_000_000} // 1 ms + r := rand.New(rand.NewSource(seed)) //nolint:gosec for ctx.Err() == nil { - _ = unix.Nanosleep(&ts, nil) + time.Sleep(time.Millisecond) var local float64 for k := 0; k < 5_000; k++ { local += math.Sqrt(r.Float64()) diff --git a/internal/chaos/dns.go b/internal/chaos/dns.go new file mode 100644 index 0000000..01f76c2 --- /dev/null +++ b/internal/chaos/dns.go @@ -0,0 +1,88 @@ +// Copyright 2026 Optiqor contributors +// SPDX-License-Identifier: Apache-2.0 + +package chaos + +import ( + "context" + "fmt" + "net" + "runtime" + "sync" + "sync/atomic" +) + +// DNSFloodScenario fires N DNS lookups per second against the local resolver +// (127.0.0.53 or systemd-resolved) to drive dns_high_latency and +// dns_failure_rate doctor rules. +type DNSFloodScenario struct{} + +func init() { Register(DNSFloodScenario{}) } + +// Name implements Scenario. +func (DNSFloodScenario) Name() string { return "dns-flood" } + +// Description implements Scenario. +func (DNSFloodScenario) Description() string { + return "Fire DNS lookups at high rate to stress CoreDNS and trigger DNS doctor rules" +} + +// PairedRule implements Scenario. +func (DNSFloodScenario) PairedRule() string { return "dns_high_latency" } + +// Run implements Scenario. +func (s DNSFloodScenario) Run(ctx context.Context, opts Options) error { + workers := workersFromIntensity(opts.Intensity, runtime.NumCPU()) + + // Domains to resolve — mix of real and synthetic to stress the resolver. 
+ domains := []string{ + "kubernetes.default.svc.cluster.local.", + "kube-dns.kube-system.svc.cluster.local.", + "google.com.", + "nonexistent-host-kerno-chaos.local.", + "another-fake-host-kerno.cluster.local.", + } + + resolver := &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + d := net.Dialer{} + // Try local resolver first; fall back to 8.8.8.8 for bare-metal. + conn, err := d.DialContext(ctx, "udp", "127.0.0.53:53") + if err != nil { + conn, err = d.DialContext(ctx, "udp", "8.8.8.8:53") + } + return conn, err + }, + } + + var ( + wg sync.WaitGroup + total atomic.Uint64 + errCnt atomic.Uint64 + ) + + fmt.Fprintf(opts.Out, " %d workers flooding DNS lookups\n", workers) + + for i := 0; i < workers; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + idx := workerID % len(domains) + for ctx.Err() == nil { + domain := domains[idx%len(domains)] + idx++ + _, err := resolver.LookupHost(ctx, domain) + total.Add(1) + if err != nil && ctx.Err() == nil { + errCnt.Add(1) + } + } + }(i) + } + + wg.Wait() + fmt.Fprintf(opts.Out, " completed %d DNS lookups (%d errors)\n", + total.Load(), errCnt.Load()) + return nil +} diff --git a/internal/chaos/paired_rule_test.go b/internal/chaos/paired_rule_test.go index ea47d28..1349096 100644 --- a/internal/chaos/paired_rule_test.go +++ b/internal/chaos/paired_rule_test.go @@ -25,7 +25,9 @@ var validDoctorRules = map[string]bool{ "healthy_system": true, // "multiple" is a sentinel used by cascade — not a real rule but a // recognized placeholder. 
- "multiple": true, + "multiple": true, + "dns_high_latency": true, + "dns_failure_rate": true, } // TestPairedRulesExistInDoctor enforces that every chaos scenario's diff --git a/internal/cli/audit.go b/internal/cli/audit.go index afee606..9ea9744 100644 --- a/internal/cli/audit.go +++ b/internal/cli/audit.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/completion_test.go b/internal/cli/completion_test.go index a4d49a8..4925274 100644 --- a/internal/cli/completion_test.go +++ b/internal/cli/completion_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/doctor.go b/internal/cli/doctor.go index 5076c11..caa4969 100644 --- a/internal/cli/doctor.go +++ b/internal/cli/doctor.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/doctor_flags_test.go b/internal/cli/doctor_flags_test.go index fe5dd36..de3af8e 100644 --- a/internal/cli/doctor_flags_test.go +++ b/internal/cli/doctor_flags_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/explain.go b/internal/cli/explain.go index e92489c..bb7eeb3 100644 --- a/internal/cli/explain.go +++ b/internal/cli/explain.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/predict.go b/internal/cli/predict.go index d1674ba..c6c3cbc 100644 --- a/internal/cli/predict.go +++ b/internal/cli/predict.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/root.go b/internal/cli/root.go index 9209a44..8576104 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -1,3 +1,5 @@ +//go:build 
linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/start.go b/internal/cli/start.go index 08cfae7..deebdd2 100644 --- a/internal/cli/start.go +++ b/internal/cli/start.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/start_test.go b/internal/cli/start_test.go index 6d58840..e21eef0 100644 --- a/internal/cli/start_test.go +++ b/internal/cli/start_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 package cli diff --git a/internal/cli/trace.go b/internal/cli/trace.go index c52fc1d..682455b 100644 --- a/internal/cli/trace.go +++ b/internal/cli/trace.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/trace_disk.go b/internal/cli/trace_disk.go index d138fcb..40c556f 100644 --- a/internal/cli/trace_disk.go +++ b/internal/cli/trace_disk.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/trace_disk_flags_test.go b/internal/cli/trace_disk_flags_test.go index 114258d..ec55e6e 100644 --- a/internal/cli/trace_disk_flags_test.go +++ b/internal/cli/trace_disk_flags_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/trace_sched.go b/internal/cli/trace_sched.go index c817de2..844f225 100644 --- a/internal/cli/trace_sched.go +++ b/internal/cli/trace_sched.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/trace_syscall.go b/internal/cli/trace_syscall.go index f5d0764..8ea7f94 100644 --- a/internal/cli/trace_syscall.go +++ b/internal/cli/trace_syscall.go @@ -1,3 +1,5 @@ +//go:build linux + // 
Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/trace_test.go b/internal/cli/trace_test.go index 1aefa43..96e7485 100644 --- a/internal/cli/trace_test.go +++ b/internal/cli/trace_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/watch.go b/internal/cli/watch.go index 740084b..be45d03 100644 --- a/internal/cli/watch.go +++ b/internal/cli/watch.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/watch_fd.go b/internal/cli/watch_fd.go index acc3c79..2e64ec5 100644 --- a/internal/cli/watch_fd.go +++ b/internal/cli/watch_fd.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/watch_fd_flags_test.go b/internal/cli/watch_fd_flags_test.go index b212f15..3a9d870 100644 --- a/internal/cli/watch_fd_flags_test.go +++ b/internal/cli/watch_fd_flags_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/watch_oom.go b/internal/cli/watch_oom.go index ab3f5d3..52c9f5d 100644 --- a/internal/cli/watch_oom.go +++ b/internal/cli/watch_oom.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/watch_oom_flags_test.go b/internal/cli/watch_oom_flags_test.go index e4e81be..d6f1544 100644 --- a/internal/cli/watch_oom_flags_test.go +++ b/internal/cli/watch_oom_flags_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/watch_tcp.go b/internal/cli/watch_tcp.go index 65c90e3..75d4e83 100644 --- a/internal/cli/watch_tcp.go +++ b/internal/cli/watch_tcp.go @@ -1,3 +1,5 @@ +//go:build linux + // 
Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/cli/watch_test.go b/internal/cli/watch_test.go index f2d99fd..0fb4606 100644 --- a/internal/cli/watch_test.go +++ b/internal/cli/watch_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/collector/cgroup_memory_test.go b/internal/collector/cgroup_memory_test.go index 6135dcf..c9c46f9 100644 --- a/internal/collector/cgroup_memory_test.go +++ b/internal/collector/cgroup_memory_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/collector/disk.go b/internal/collector/disk.go index dc796dc..15b60f2 100644 --- a/internal/collector/disk.go +++ b/internal/collector/disk.go @@ -1,3 +1,5 @@ +//go:build linux || integration + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/collector/dns.go b/internal/collector/dns.go new file mode 100644 index 0000000..cc53b9e --- /dev/null +++ b/internal/collector/dns.go @@ -0,0 +1,260 @@ +//go:build ebpf + +// Copyright 2026 Optiqor contributors +// SPDX-License-Identifier: Apache-2.0 + +package collector + +import ( + "context" + "fmt" + "log/slog" + "sort" + "sync" + "time" + + "github.com/optiqor/kerno/internal/bpf" + "github.com/optiqor/kerno/internal/collector/aggregator" +) + +const ( + // dnsFailureTimeout is how long we wait for a response before marking a + // DNS query as failed. Matches the issue spec (5s). + dnsFailureTimeout = 5 * time.Second + + // maxDNSConsumers caps the per-process tracking table to prevent unbounded growth. + maxDNSConsumers = 512 +) + +// dnsQueryKey tracks an in-flight DNS query. +type dnsQueryKey struct { + pid uint32 + queryID uint16 +} + +// dnsInFlight is a pending DNS send waiting for its matching recv. 
+type dnsInFlight struct { + sentAt time.Time + comm string +} + +// dnsProcessAgg holds per-process DNS counters. +type dnsProcessAgg struct { + pid uint32 + comm string + requests uint64 + failures uint64 +} + +// DNSCollector consumes dns_monitor eBPF events and aggregates +// per-pod/process DNS metrics into a DNSSnapshot. +type DNSCollector struct { + logger *slog.Logger + loader *bpf.DNSMonitorLoader + + mu sync.Mutex + inflight map[dnsQueryKey]*dnsInFlight + latencyHist *aggregator.Histogram + totalReqs uint64 + totalResps uint64 + totalFails uint64 + windowStart time.Time + processes map[uint32]*dnsProcessAgg // keyed by pid + + cancelFn context.CancelFunc + done chan struct{} +} + +// NewDNSCollector creates a new DNS collector. +func NewDNSCollector(logger *slog.Logger, loader *bpf.DNSMonitorLoader) *DNSCollector { + return &DNSCollector{ + logger: logger.With("collector", "dns"), + loader: loader, + inflight: make(map[dnsQueryKey]*dnsInFlight), + latencyHist: aggregator.New(), + windowStart: time.Now(), + processes: make(map[uint32]*dnsProcessAgg), + done: make(chan struct{}), + } +} + +// Name implements Collector. +func (c *DNSCollector) Name() string { return "dns" } + +// Start implements Collector. +func (c *DNSCollector) Start(ctx context.Context) error { + runCtx, cancel := context.WithCancel(ctx) + c.cancelFn = cancel + + ch, err := c.loader.Events(runCtx) + if err != nil { + cancel() + return fmt.Errorf("opening dns events: %w", err) + } + + go c.consume(runCtx, ch) + go c.reapLoop(runCtx) + return nil +} + +// Stop implements Collector. 
+func (c *DNSCollector) Stop() { + if c.cancelFn != nil { + c.cancelFn() + } + select { + case <-c.done: + case <-time.After(2 * time.Second): + c.logger.Warn("dns collector did not stop within timeout") + } +} + +func (c *DNSCollector) consume(ctx context.Context, ch <-chan bpf.RawEvent) { + defer close(c.done) + for { + select { + case <-ctx.Done(): + return + case raw, ok := <-ch: + if !ok { + return + } + event, err := bpf.DecodeDNSEvent(raw.Data) + if err != nil { + c.logger.Debug("dns decode error", "error", err) + continue + } + c.record(event) + } + } +} + +// reapLoop periodically expires in-flight queries that never got a response. +func (c *DNSCollector) reapLoop(ctx context.Context) { + ticker := time.NewTicker(dnsFailureTimeout) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + c.reapExpired(now) + } + } +} + +func (c *DNSCollector) reapExpired(now time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + for key, inf := range c.inflight { + if now.Sub(inf.sentAt) >= dnsFailureTimeout { + c.totalFails++ + if p := c.getOrCreateProcess(key.pid, inf.comm); p != nil { + p.failures++ + } + delete(c.inflight, key) + } + } +} + +func (c *DNSCollector) record(event *bpf.DNSEvent) { + c.mu.Lock() + defer c.mu.Unlock() + + comm := event.CommString() + key := dnsQueryKey{pid: event.PID, queryID: event.QueryID} + + switch event.EventType { + case bpf.DNSEventSend: + c.totalReqs++ + if p := c.getOrCreateProcess(event.PID, comm); p != nil { + p.requests++ + } + // Only track if we have room. + if len(c.inflight) < 65536 { + c.inflight[key] = &dnsInFlight{ + sentAt: time.Unix(0, int64(event.TimestampNs)), + comm: comm, + } + } + + case bpf.DNSEventRecv: + c.totalResps++ + if inf, ok := c.inflight[key]; ok { + latencyNs := uint64(event.TimestampNs) - uint64(inf.sentAt.UnixNano()) + // Sanity check: only record if latency is under 30s. 
+ if latencyNs < uint64(30*time.Second) { + c.latencyHist.Record(latencyNs) + } + delete(c.inflight, key) + } + } +} + +func (c *DNSCollector) getOrCreateProcess(pid uint32, comm string) *dnsProcessAgg { + if p, ok := c.processes[pid]; ok { + return p + } + if len(c.processes) >= maxDNSConsumers { + return nil + } + p := &dnsProcessAgg{pid: pid, comm: comm} + c.processes[pid] = p + return p +} + +// Snapshot implements Collector. Returns *DNSSnapshot. +func (c *DNSCollector) Snapshot() any { + c.mu.Lock() + defer c.mu.Unlock() + + elapsed := time.Since(c.windowStart).Seconds() + if elapsed < 1 { + elapsed = 1 + } + + latSnap := c.latencyHist.Snapshot() + + // Build top consumers list. + procs := make([]*dnsProcessAgg, 0, len(c.processes)) + for _, p := range c.processes { + procs = append(procs, p) + } + sort.Slice(procs, func(i, j int) bool { + return procs[i].requests > procs[j].requests + }) + limit := 10 + if len(procs) < limit { + limit = len(procs) + } + consumers := make([]DNSConsumerEntry, 0, limit) + for _, p := range procs[:limit] { + consumers = append(consumers, DNSConsumerEntry{ + PID: p.pid, + Comm: p.comm, + Requests: p.requests, + Failures: p.failures, + }) + } + + var failureRate float64 + if c.totalReqs > 0 { + failureRate = float64(c.totalFails) / float64(c.totalReqs) * 100.0 + } + + return &DNSSnapshot{ + RequestRate: float64(c.totalReqs) / elapsed, + ResponseRate: float64(c.totalResps) / elapsed, + FailureRate: failureRate, + TotalRequests: c.totalReqs, + TotalResponses: c.totalResps, + TotalFailures: c.totalFails, + Latency: Percentiles{ + P50: time.Duration(latSnap.Percentile(50)), + P95: time.Duration(latSnap.Percentile(95)), + P99: time.Duration(latSnap.Percentile(99)), + Max: time.Duration(latSnap.Max()), + }, + TopConsumers: consumers, + } +} diff --git a/internal/collector/dns_stub.go b/internal/collector/dns_stub.go new file mode 100644 index 0000000..fc5d5af --- /dev/null +++ b/internal/collector/dns_stub.go @@ -0,0 +1,25 @@ +// 
Copyright 2026 Optiqor contributors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !ebpf + +package collector + +import ( + "context" + "log/slog" +) + +// DNSCollector is a no-op stub when eBPF is not compiled in. +// The real implementation lives in dns.go (build tag: ebpf). +type DNSCollector struct{} + +// NewDNSCollector returns a no-op collector. +func NewDNSCollector(logger *slog.Logger, loader any) *DNSCollector { + return &DNSCollector{} +} + +func (c *DNSCollector) Name() string { return "dns" } +func (c *DNSCollector) Start(_ context.Context) error { return nil } +func (c *DNSCollector) Stop() {} +func (c *DNSCollector) Snapshot() any { return (*DNSSnapshot)(nil) } diff --git a/internal/collector/fd.go b/internal/collector/fd.go index faf8c63..b27321a 100644 --- a/internal/collector/fd.go +++ b/internal/collector/fd.go @@ -1,3 +1,5 @@ +//go:build linux || integration + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/collector/memory_collector_test.go b/internal/collector/memory_collector_test.go index 4cb43da..4b27c29 100644 --- a/internal/collector/memory_collector_test.go +++ b/internal/collector/memory_collector_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/collector/memory_test.go b/internal/collector/memory_test.go index 9e7507e..624afa2 100644 --- a/internal/collector/memory_test.go +++ b/internal/collector/memory_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/collector/oom.go b/internal/collector/oom.go index 707ddf8..403f3d6 100644 --- a/internal/collector/oom.go +++ b/internal/collector/oom.go @@ -1,3 +1,5 @@ +//go:build linux || integration + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/collector/sched.go b/internal/collector/sched.go index 
9189be4..00e0c41 100644 --- a/internal/collector/sched.go +++ b/internal/collector/sched.go @@ -1,3 +1,5 @@ +//go:build linux || integration + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/collector/signals.go b/internal/collector/signals.go index 5ca624e..e10aade 100644 --- a/internal/collector/signals.go +++ b/internal/collector/signals.go @@ -8,19 +8,10 @@ import ( ) // Signals is the combined snapshot from all collectors at a point in time. -// This is the single struct that the doctor engine, exporters, and dashboard -// all consume. It provides a holistic view of kernel health. type Signals struct { - // Timestamp is when this snapshot was taken. - Timestamp time.Time `json:"timestamp"` - - // Duration is the analysis window (e.g., 30s for doctor). - Duration time.Duration `json:"duration"` - - // Host contains basic host identification. - Host HostInfo `json:"host"` - - // Per-signal snapshots (nil if collector is disabled or has no data). + Timestamp time.Time `json:"timestamp"` + Duration time.Duration `json:"duration"` + Host HostInfo `json:"host"` Syscall *SyscallSnapshot `json:"syscall,omitempty"` TCP *TCPSnapshot `json:"tcp,omitempty"` OOM *OOMSnapshot `json:"oom,omitempty"` @@ -29,6 +20,7 @@ type Signals struct { FD *FDSnapshot `json:"fd,omitempty"` Memory *MemorySnapshot `json:"memory,omitempty"` CgroupMemory *CgroupMemorySnapshot `json:"cgroupMemory,omitempty"` + DNS *DNSSnapshot `json:"dns,omitempty"` } // HostInfo identifies the machine being observed. @@ -39,8 +31,6 @@ type HostInfo struct { Arch string `json:"arch"` } -// ─── Percentiles ──────────────────────────────────────────────────────────── - // Percentiles holds common latency distribution values. 
type Percentiles struct { P50 time.Duration `json:"p50"` @@ -49,46 +39,30 @@ type Percentiles struct { Max time.Duration `json:"max"` } -// ─── Syscall Snapshot ─────────────────────────────────────────────────────── - // SyscallSnapshot is an aggregated view of syscall latencies over a window. type SyscallSnapshot struct { - // Entries keyed by (syscall_nr, comm). - Entries []SyscallEntry `json:"entries"` - - // TotalCount is the total number of syscall events observed. - TotalCount uint64 `json:"totalCount"` + Entries []SyscallEntry `json:"entries"` + TotalCount uint64 `json:"totalCount"` } -// SyscallEntry represents latency stats for one (syscall, process) pair. type SyscallEntry struct { SyscallNr uint32 `json:"syscallNr"` - Name string `json:"name"` // resolved syscall name + Name string `json:"name"` Comm string `json:"comm"` Count uint64 `json:"count"` - ErrorCount uint64 `json:"errorCount"` // syscalls that returned error + ErrorCount uint64 `json:"errorCount"` Latency Percentiles `json:"latency"` } -// ─── TCP Snapshot ─────────────────────────────────────────────────────────── - // TCPSnapshot is an aggregated view of TCP connection health over a window. type TCPSnapshot struct { - // Connections tracked during the window. - ActiveConnections int `json:"activeConnections"` - - // Retransmit statistics. - TotalRetransmits uint64 `json:"totalRetransmits"` - RetransmitRate float64 `json:"retransmitRate"` // percentage - - // RTT distribution across all connections. - RTT Percentiles `json:"rtt"` - - // Top connections by retransmit count. + ActiveConnections int `json:"activeConnections"` + TotalRetransmits uint64 `json:"totalRetransmits"` + RetransmitRate float64 `json:"retransmitRate"` + RTT Percentiles `json:"rtt"` TopRetransmitters []TCPConnectionEntry `json:"topRetransmitters,omitempty"` } -// TCPConnectionEntry represents stats for a single TCP 4-tuple. 
type TCPConnectionEntry struct { SrcAddr string `json:"srcAddr"` DstAddr string `json:"dstAddr"` @@ -99,16 +73,12 @@ type TCPConnectionEntry struct { Retransmits uint32 `json:"retransmits"` } -// ─── OOM Snapshot ─────────────────────────────────────────────────────────── - // OOMSnapshot contains OOM kill events observed during the window. -// Every OOM event is captured — no aggregation (each one is critical). type OOMSnapshot struct { Events []OOMEventEntry `json:"events"` Count int `json:"count"` } -// OOMEventEntry is a single OOM kill event. type OOMEventEntry struct { Timestamp time.Time `json:"timestamp"` PID uint32 `json:"pid"` @@ -120,40 +90,25 @@ type OOMEventEntry struct { CgroupID uint64 `json:"cgroupId"` } -// ─── Disk I/O Snapshot ────────────────────────────────────────────────────── - // DiskIOSnapshot is an aggregated view of block I/O latencies over a window. type DiskIOSnapshot struct { - // Per-operation latency distributions. ReadLatency Percentiles `json:"readLatency"` WriteLatency Percentiles `json:"writeLatency"` SyncLatency Percentiles `json:"syncLatency"` - - // Counts. - TotalReads uint64 `json:"totalReads"` - TotalWrites uint64 `json:"totalWrites"` - TotalSyncs uint64 `json:"totalSyncs"` - - // Throughput. - ReadBytes uint64 `json:"readBytes"` - WriteBytes uint64 `json:"writeBytes"` + TotalReads uint64 `json:"totalReads"` + TotalWrites uint64 `json:"totalWrites"` + TotalSyncs uint64 `json:"totalSyncs"` + ReadBytes uint64 `json:"readBytes"` + WriteBytes uint64 `json:"writeBytes"` } -// ─── Scheduler Snapshot ───────────────────────────────────────────────────── - // SchedSnapshot is an aggregated view of CPU run queue delays over a window. type SchedSnapshot struct { - // Global run queue delay distribution. - RunqDelay Percentiles `json:"runqDelay"` - - // Per-process entries with highest delays. 
+ RunqDelay Percentiles `json:"runqDelay"` TopDelayed []SchedEntry `json:"topDelayed,omitempty"` - - // TotalCount is the total number of scheduling events observed. - TotalCount uint64 `json:"totalCount"` + TotalCount uint64 `json:"totalCount"` } -// SchedEntry represents scheduling stats for one process. type SchedEntry struct { PID uint32 `json:"pid"` Comm string `json:"comm"` @@ -161,91 +116,71 @@ type SchedEntry struct { RunqDelay Percentiles `json:"runqDelay"` } -// ─── FD Snapshot ──────────────────────────────────────────────────────────── - // FDSnapshot tracks file descriptor open/close rates to detect leaks. type FDSnapshot struct { - // Per-PID FD tracking. - Entries []FDEntry `json:"entries,omitempty"` - - // Global counters. - TotalOpens uint64 `json:"totalOpens"` - TotalCloses uint64 `json:"totalCloses"` - NetDelta int64 `json:"netDelta"` // opens - closes - GrowthRate float64 `json:"growthRate"` // FDs per second + Entries []FDEntry `json:"entries,omitempty"` + TotalOpens uint64 `json:"totalOpens"` + TotalCloses uint64 `json:"totalCloses"` + NetDelta int64 `json:"netDelta"` + GrowthRate float64 `json:"growthRate"` } -// FDEntry represents FD stats for one process. type FDEntry struct { PID uint32 `json:"pid"` Comm string `json:"comm"` Opens uint64 `json:"opens"` Closes uint64 `json:"closes"` NetDelta int64 `json:"netDelta"` - GrowthRate float64 `json:"growthRate"` // FDs per second + GrowthRate float64 `json:"growthRate"` } -// ─── Cgroup Memory Snapshot ────────────────────────────────────────────────── - -// CgroupMemorySnapshot holds per-container cgroup v2 memory state, populated -// by the CgroupMemoryCollector. On systems without cgroup v2 limits this will -// be nil or contain an empty Containers slice. +// CgroupMemorySnapshot holds per-container cgroup v2 memory state. type CgroupMemorySnapshot struct { Containers []CgroupMemoryEntry `json:"containers"` } -// CgroupMemoryEntry is the memory state for a single cgroup (container). 
type CgroupMemoryEntry struct { - // CgroupPath is the absolute path of the cgroup directory. - CgroupPath string `json:"cgroupPath"` - // Pod is the pod name or UID extracted from the cgroup path. - Pod string `json:"pod"` - // Namespace is the Kubernetes namespace (empty when enrichment is not available). - Namespace string `json:"namespace"` - - // CurrentBytes is the value of memory.current. - CurrentBytes uint64 `json:"currentBytes"` - // LimitBytes is the value of memory.max. - LimitBytes uint64 `json:"limitBytes"` - // HighBytes is the value of memory.high (0 = not set). - HighBytes uint64 `json:"highBytes"` - // UsedPct is CurrentBytes / LimitBytes * 100. - UsedPct float64 `json:"usedPct"` - - // GrowthRateBytesPerSec is the rate of change of CurrentBytes between polls. + CgroupPath string `json:"cgroupPath"` + Pod string `json:"pod"` + Namespace string `json:"namespace"` + CurrentBytes uint64 `json:"currentBytes"` + LimitBytes uint64 `json:"limitBytes"` + HighBytes uint64 `json:"highBytes"` + UsedPct float64 `json:"usedPct"` GrowthRateBytesPerSec float64 `json:"growthRateBytesPerSec"` - // HighEventRate is the rate of memory.events.high increments per second. - HighEventRate float64 `json:"highEventRate"` + HighEventRate float64 `json:"highEventRate"` + EventsHigh uint64 `json:"eventsHigh"` + EventsMax uint64 `json:"eventsMax"` + EventsOOM uint64 `json:"eventsOOM"` + EventsOOMKill uint64 `json:"eventsOOMKill"` +} - // Event counters from memory.events. - EventsHigh uint64 `json:"eventsHigh"` - EventsMax uint64 `json:"eventsMax"` - EventsOOM uint64 `json:"eventsOOM"` - EventsOOMKill uint64 `json:"eventsOOMKill"` +// DNSSnapshot is an aggregated view of DNS query health over a window. 
+type DNSSnapshot struct { + RequestRate float64 `json:"requestRate"` + ResponseRate float64 `json:"responseRate"` + FailureRate float64 `json:"failureRate"` + TotalRequests uint64 `json:"totalRequests"` + TotalResponses uint64 `json:"totalResponses"` + TotalFailures uint64 `json:"totalFailures"` + Latency Percentiles `json:"latency"` + TopConsumers []DNSConsumerEntry `json:"topConsumers,omitempty"` } -// ─── Memory Snapshot ───────────────────────────────────────────────────────── +type DNSConsumerEntry struct { + PID uint32 `json:"pid"` + Comm string `json:"comm"` + Requests uint64 `json:"requests"` + Failures uint64 `json:"failures"` +} // MemorySnapshot tracks system memory usage and pressure. type MemorySnapshot struct { - // TotalBytes is total system memory. - TotalBytes uint64 `json:"totalBytes"` - - // UsedBytes is current memory in use (excluding caches/buffers). - UsedBytes uint64 `json:"usedBytes"` - - // UsedPct is the percentage of memory in use (0–100). - UsedPct float64 `json:"usedPct"` - - // GrowthRateBytesPerSec is the rate of memory consumption growth. + TotalBytes uint64 `json:"totalBytes"` + UsedBytes uint64 `json:"usedBytes"` + UsedPct float64 `json:"usedPct"` GrowthRateBytesPerSec float64 `json:"growthRateBytesPerSec"` - - // AvailableBytes is memory available for allocation without swapping. - AvailableBytes uint64 `json:"availableBytes"` - - // SwapUsedBytes is current swap usage. - SwapUsedBytes uint64 `json:"swapUsedBytes"` - - // SwapTotalBytes is total swap space. 
- SwapTotalBytes uint64 `json:"swapTotalBytes"` + AvailableBytes uint64 `json:"availableBytes"` + SwapUsedBytes uint64 `json:"swapUsedBytes"` + SwapTotalBytes uint64 `json:"swapTotalBytes"` } diff --git a/internal/collector/syscall.go b/internal/collector/syscall.go index 7b4431c..b638aa9 100644 --- a/internal/collector/syscall.go +++ b/internal/collector/syscall.go @@ -1,3 +1,5 @@ +//go:build linux || integration + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/collector/syscall_test.go b/internal/collector/syscall_test.go index c4e1963..d2bfb44 100644 --- a/internal/collector/syscall_test.go +++ b/internal/collector/syscall_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/collector/tcp.go b/internal/collector/tcp.go index f9c7abd..e35b449 100644 --- a/internal/collector/tcp.go +++ b/internal/collector/tcp.go @@ -1,3 +1,5 @@ +//go:build linux || integration + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/doctor/rules.go b/internal/doctor/rules.go index 93d771b..5e93d95 100644 --- a/internal/doctor/rules.go +++ b/internal/doctor/rules.go @@ -14,7 +14,7 @@ import ( ) // Evaluate runs all diagnostic rules against the collected signals and returns -// findings sorted by severity. This is the deterministic core of kerno doctor — +// findings sorted by severity. This is the deterministic core of kerno doctor — // no AI, no network calls, always available. func Evaluate(signals *collector.Signals, thresholds config.DoctorThresholds) []Finding { var findings []Finding @@ -30,6 +30,8 @@ func Evaluate(signals *collector.Signals, thresholds config.DoctorThresholds) [] findings = append(findings, evalSyscallErrorRate(signals)...) findings = append(findings, evalMemoryLimitPressure(signals)...) findings = append(findings, evalMemoryHighThrottling(signals)...) 
+ findings = append(findings, evalDNSHighLatency(signals, thresholds)...) + findings = append(findings, evalDNSFailureRate(signals, thresholds)...) // If nothing found, emit "healthy system" info. if len(findings) == 0 { @@ -40,7 +42,7 @@ func Evaluate(signals *collector.Signals, thresholds config.DoctorThresholds) [] return findings } -// ── Rule 1: Disk I/O Bottleneck ───────────────────────────────────────────── +// ── Rule 1: Disk I/O Bottleneck ───────────────────────────────────────────── func evalDiskIOBottleneck(s *collector.Signals, t config.DoctorThresholds) []Finding { if s.DiskIO == nil { @@ -51,7 +53,7 @@ func evalDiskIOBottleneck(s *collector.Signals, t config.DoctorThresholds) []Fin warningNs := time.Duration(t.DiskP99WarningNs) criticalNs := time.Duration(t.DiskP99CriticalNs) - // Check sync latency (fsync — most impactful for databases). + // Check sync latency (fsync — most impactful for databases). if syncP99 := s.DiskIO.SyncLatency.P99; syncP99 > 0 { if syncP99 >= criticalNs { findings = append(findings, Finding{ @@ -59,7 +61,7 @@ func evalDiskIOBottleneck(s *collector.Signals, t config.DoctorThresholds) []Fin Rule: "disk_io_bottleneck", Title: "Disk I/O Bottleneck Detected", Signal: "diskio", - Cause: "Storage device is saturated — sync/fsync operations are blocking", + Cause: "Storage device is saturated — sync/fsync operations are blocking", Impact: "Database writes and file syncs are delayed, causing cascade latency", Evidence: fmt.Sprintf("sync P99=%s (threshold: %s), %d sync ops in window", syncP99, criticalNs, s.DiskIO.TotalSyncs), Fix: []string{"Check disk IOPS: iostat -x 1 5", "Check write queue depth", "Consider faster storage or async fsync"}, @@ -92,7 +94,7 @@ func evalDiskIOBottleneck(s *collector.Signals, t config.DoctorThresholds) []Fin Title: "Critical Disk Write Latency", Signal: "diskio", Cause: "Block-level write operations are critically slow", - Impact: "All write I/O is affected — applications may hang or timeout", + 
Impact: "All write I/O is affected — applications may hang or timeout", Evidence: fmt.Sprintf("write P99=%s (threshold: %s), %d writes", writeP99, criticalNs, s.DiskIO.TotalWrites), Fix: []string{"Check device health: smartctl -a /dev/sdX", "Check for I/O scheduler issues"}, Metric: "disk_write_p99", @@ -104,7 +106,7 @@ func evalDiskIOBottleneck(s *collector.Signals, t config.DoctorThresholds) []Fin return findings } -// ── Rule 2/3: OOM Kill ────────────────────────────────────────────────────── +// ── Rule 2/3: OOM Kill ────────────────────────────────────────────────────── func evalOOMKillOccurred(s *collector.Signals) []Finding { if s.OOM == nil || s.OOM.Count == 0 { @@ -119,7 +121,7 @@ func evalOOMKillOccurred(s *collector.Signals) []Finding { Title: "OOM Kill Detected", Signal: "oom", Cause: fmt.Sprintf("Process %s (pid %d) was killed by the OOM killer", evt.Comm, evt.PID), - Impact: "Process was terminated — service disruption likely", + Impact: "Process was terminated — service disruption likely", Evidence: fmt.Sprintf("OOM score: %d, RSS pages: %d, total pages: %d", evt.OOMScore, evt.RSSPages, evt.TotalPages), Fix: []string{ fmt.Sprintf("Check memory limits for process: cat /proc/%d/cgroup", evt.PID), @@ -135,7 +137,7 @@ func evalOOMKillOccurred(s *collector.Signals) []Finding { return findings } -// ── Rule 4: TCP Retransmit Storm ──────────────────────────────────────────── +// ── Rule 4: TCP Retransmit Storm ──────────────────────────────────────────── func evalTCPRetransmitStorm(s *collector.Signals, t config.DoctorThresholds) []Finding { if s.TCP == nil { @@ -153,7 +155,7 @@ func evalTCPRetransmitStorm(s *collector.Signals, t config.DoctorThresholds) []F Title: "TCP Retransmit Storm", Signal: "tcp", Cause: "Network path degradation causing excessive retransmissions", - Impact: fmt.Sprintf("%.1f%% of TCP segments are being retransmitted — every connection has a chance of latency spike", rate), + Impact: fmt.Sprintf("%.1f%% of TCP segments are being 
retransmitted — every connection has a chance of latency spike", rate), Evidence: fmt.Sprintf("retransmit rate=%.1f%% (threshold: %.1f%%), %d total retransmits, %d active connections", rate, t.TCPRetransmitPct, s.TCP.TotalRetransmits, s.TCP.ActiveConnections), Fix: []string{"Check network errors: ethtool -S eth0 | grep -i error", "Check for packet loss: ping -c 100 ", "Consider pod/service placement (cross-AZ traffic)"}, Metric: "tcp_retransmit_pct", @@ -164,14 +166,14 @@ func evalTCPRetransmitStorm(s *collector.Signals, t config.DoctorThresholds) []F // Add top retransmitter info if available. if len(s.TCP.TopRetransmitters) > 0 { top := s.TCP.TopRetransmitters[0] - f.Evidence += fmt.Sprintf(", top: %s:%d → %s:%d (%d retransmits)", + f.Evidence += fmt.Sprintf(", top: %s:%d → %s:%d (%d retransmits)", top.SrcAddr, top.SrcPort, top.DstAddr, top.DstPort, top.Retransmits) } return []Finding{f} } -// ── Rule 5: TCP RTT Degradation ───────────────────────────────────────────── +// ── Rule 5: TCP RTT Degradation ───────────────────────────────────────────── func evalTCPRTTDegradation(s *collector.Signals, _ config.DoctorThresholds) []Finding { if s.TCP == nil { @@ -190,7 +192,7 @@ func evalTCPRTTDegradation(s *collector.Signals, _ config.DoctorThresholds) []Fi Title: "Elevated TCP Round-Trip Time", Signal: "tcp", Cause: "Network latency is higher than expected", - Impact: fmt.Sprintf("Every TCP round-trip adds %s of latency — impacts all network-dependent operations", s.TCP.RTT.P99), + Impact: fmt.Sprintf("Every TCP round-trip adds %s of latency — impacts all network-dependent operations", s.TCP.RTT.P99), Evidence: fmt.Sprintf("RTT P99=%s, P50=%s (threshold: %s)", s.TCP.RTT.P99, s.TCP.RTT.P50, rttThreshold), Fix: []string{"Check network path: traceroute ", "Check for congestion: ss -ti", "Consider co-locating services to reduce hops"}, Metric: "tcp_rtt_p99", @@ -199,7 +201,7 @@ func evalTCPRTTDegradation(s *collector.Signals, _ config.DoctorThresholds) []Fi }} } -// ── 
Rule 6: Scheduler Contention ──────────────────────────────────────────── +// ── Rule 6: Scheduler Contention ──────────────────────────────────────────── func evalSchedulerContention(s *collector.Signals, t config.DoctorThresholds) []Finding { if s.Sched == nil { @@ -225,7 +227,7 @@ func evalSchedulerContention(s *collector.Signals, t config.DoctorThresholds) [] Title: "CPU Scheduler Contention", Signal: "sched", Cause: "Processes are waiting in the CPU run queue longer than expected", - Impact: fmt.Sprintf("Every context switch adds ~%s of delay — compounds with I/O latency", delay), + Impact: fmt.Sprintf("Every context switch adds ~%s of delay — compounds with I/O latency", delay), Evidence: fmt.Sprintf("runqueue P99=%s, P50=%s (warning: %s, critical: %s)", delay, s.Sched.RunqDelay.P50, warningNs, criticalNs), Fix: []string{"Check CPU usage: top -H", "Consider increasing CPU count or reducing worker threads", "Check for noisy neighbors on shared nodes"}, Metric: "sched_runq_p99", @@ -243,7 +245,7 @@ func evalSchedulerContention(s *collector.Signals, t config.DoctorThresholds) [] return []Finding{f} } -// ── Rule 7: FD Leak ───────────────────────────────────────────────────────── +// ── Rule 7: FD Leak ───────────────────────────────────────────────────────── func evalFDLeak(s *collector.Signals, t config.DoctorThresholds) []Finding { if s.FD == nil { @@ -291,7 +293,7 @@ func evalFDLeak(s *collector.Signals, t config.DoctorThresholds) []Finding { return []Finding{f} } -// ── Rule 8: Syscall Latency High ──────────────────────────────────────────── +// ── Rule 8: Syscall Latency High ──────────────────────────────────────────── // evalSyscallLatencyHigh emits at most one finding per run, even when many // (syscall, comm) pairs cross the threshold. 
The worst pair drives severity @@ -310,7 +312,7 @@ func evalSyscallLatencyHigh(s *collector.Signals, t config.DoctorThresholds) []F for _, entry := range s.Syscall.Entries { // Voluntary-blocking syscalls (futex, epoll_wait, poll, ...) // have latency dominated by userspace wait time, not by the - // kernel — flagging them produces false positives on idle hosts. + // kernel — flagging them produces false positives on idle hosts. if bpf.IsBlockingSyscall(entry.SyscallNr) { continue } @@ -372,7 +374,7 @@ func evalSyscallLatencyHigh(s *collector.Signals, t config.DoctorThresholds) []F }} } -// ── Rule 9: OOM Imminent ───────────────────────────────────────────────────── +// ── Rule 9: OOM Imminent ───────────────────────────────────────────────────── func evalOOMImminent(s *collector.Signals, t config.DoctorThresholds) []Finding { if s.Memory == nil { @@ -381,7 +383,7 @@ func evalOOMImminent(s *collector.Signals, t config.DoctorThresholds) []Finding threshold := t.OOMMemoryPct // Negative threshold disables the rule. Zero is treated literally - // (fires on any non-zero usage) — useful for tests; default config + // (fires on any non-zero usage) — useful for tests; default config // supplies 90.0 for production. if threshold < 0 { return nil @@ -392,12 +394,12 @@ func evalOOMImminent(s *collector.Signals, t config.DoctorThresholds) []Finding } sev := SeverityWarning - title := "Memory Pressure — OOM Risk" + title := "Memory Pressure — OOM Risk" // If memory is >95% AND growing, it's critical. 
if s.Memory.UsedPct > 95.0 && s.Memory.GrowthRateBytesPerSec > 0 { sev = SeverityCritical - title = "OOM Imminent — Memory Nearly Exhausted" + title = "OOM Imminent — Memory Nearly Exhausted" } f := Finding{ @@ -426,7 +428,7 @@ func evalOOMImminent(s *collector.Signals, t config.DoctorThresholds) []Finding return []Finding{f} } -// ── Rule 10: Syscall Error Rate ────────────────────────────────────────────── +// ── Rule 10: Syscall Error Rate ────────────────────────────────────────────── // evalSyscallErrorRate emits at most one finding per run. See the same // invariant note on evalSyscallLatencyHigh. @@ -477,7 +479,7 @@ func evalSyscallErrorRate(s *collector.Signals) []Finding { } var ev strings.Builder - fmt.Fprintf(&ev, "%d syscalls have error rate ≥ 1%%. Worst: %s(%s)=%.1f%% (%d/%d).", + fmt.Fprintf(&ev, "%d syscalls have error rate ≥ 1%%. Worst: %s(%s)=%.1f%% (%d/%d).", len(entries), name, top.entry.Comm, top.rate, top.entry.ErrorCount, top.entry.Count) title := fmt.Sprintf("High Syscall Error Rate (%d affected)", len(entries)) @@ -501,7 +503,7 @@ func evalSyscallErrorRate(s *collector.Signals) []Finding { }} } -// ── Rule 12: Memory Limit Pressure ────────────────────────────────────────── +// ── Rule 12: Memory Limit Pressure ────────────────────────────────────────── // evalMemoryLimitPressure fires for each container that is close to its // cgroup v2 memory.max limit. WARNING at >85 %; CRITICAL at >95 % with @@ -588,7 +590,7 @@ func evalMemoryLimitPressure(s *collector.Signals) []Finding { return findings } -// ── Rule 13: Memory High Throttling ───────────────────────────────────────── +// ── Rule 13: Memory High Throttling ───────────────────────────────────────── // evalMemoryHighThrottling fires when the kernel is reclaiming memory under // the memory.high soft limit at a sustained rate of more than 1 event/sec. 
@@ -659,7 +661,7 @@ func formatBytes(b uint64) string { } } -// ── Rule 11: Healthy System ───────────────────────────────────────────────── +// ── Rule 11: Healthy System ───────────────────────────────────────────────── func evalHealthySystem(s *collector.Signals) Finding { evidence := "All kernel signals within normal thresholds" @@ -676,8 +678,106 @@ func evalHealthySystem(s *collector.Signals) Finding { Title: "System Healthy", Signal: "all", Cause: "No issues detected during the analysis window", - Impact: "None — all signals are within configured thresholds", + Impact: "None — all signals are within configured thresholds", Evidence: evidence, Fix: []string{"Run kerno doctor --continuous for ongoing monitoring"}, } } + +// --- Rule: DNS High Latency -------------------------------------------------- + +func evalDNSHighLatency(s *collector.Signals, _ config.DoctorThresholds) []Finding { + if s.DNS == nil { + return nil + } + + p99 := s.DNS.Latency.P99 + if p99 == 0 { + return nil + } + + warningThreshold := 100 * time.Millisecond + criticalThreshold := 500 * time.Millisecond + + if p99 < warningThreshold { + return nil + } + + sev := SeverityWarning + thresh := warningThreshold + if p99 >= criticalThreshold { + sev = SeverityCritical + thresh = criticalThreshold + } + + return []Finding{{ + Severity: sev, + Rule: "dns_high_latency", + Title: "DNS Resolution Latency Elevated", + Signal: "dns", + Cause: "CoreDNS or upstream DNS resolver is responding slowly", + Impact: fmt.Sprintf("Every DNS lookup adds %s — cascades into request timeouts and misleading 'connection refused' errors", p99), + Evidence: fmt.Sprintf("DNS P99=%s, P50=%s (warning: %s, critical: %s), %.1f req/s", p99, s.DNS.Latency.P50, warningThreshold, criticalThreshold, s.DNS.RequestRate), + Fix: []string{ + "Check CoreDNS pod health: kubectl -n kube-system get pods -l k8s-app=kube-dns", + "Check CoreDNS logs: kubectl -n kube-system logs -l k8s-app=kube-dns", + "Check DNS query rate: kerno doctor 
--signal dns", + "Consider adding DNS caching (ndots tuning, dnsPolicy: None with custom nameservers)", + }, + Metric: "dns_latency_p99", + Value: float64(p99.Nanoseconds()), + Threshold: float64(thresh.Nanoseconds()), + }} +} + +// --- Rule: DNS Failure Rate -------------------------------------------------- + +func evalDNSFailureRate(s *collector.Signals, _ config.DoctorThresholds) []Finding { + if s.DNS == nil || s.DNS.TotalRequests == 0 { + return nil + } + + rate := s.DNS.FailureRate + + warningThreshold := 1.0 // 1% timeouts + criticalThreshold := 5.0 // 5% timeouts + + if rate < warningThreshold { + return nil + } + + sev := SeverityWarning + thresh := warningThreshold + if rate >= criticalThreshold { + sev = SeverityCritical + thresh = criticalThreshold + } + + f := Finding{ + Severity: sev, + Rule: "dns_failure_rate", + Title: "DNS Query Failure Rate Elevated", + Signal: "dns", + Cause: "DNS queries are timing out without a response within 5 seconds", + Impact: fmt.Sprintf("%.1f%% of DNS lookups are failing — applications see 'no such host' or hang waiting for resolution", rate), + Evidence: fmt.Sprintf("failure rate=%.1f%% (warning: %.1f%%, critical: %.1f%%), %d/%d queries failed, %.1f req/s", rate, warningThreshold, criticalThreshold, s.DNS.TotalFailures, s.DNS.TotalRequests, s.DNS.RequestRate), + Fix: []string{ + "Check if CoreDNS is reachable: kubectl -n kube-system get svc kube-dns", + "Check CoreDNS CPU/memory: kubectl -n kube-system top pods -l k8s-app=kube-dns", + "Test DNS resolution manually: kubectl run -it --rm debug --image=busybox -- nslookup kubernetes.default", + "Increase CoreDNS replicas if under load: kubectl -n kube-system scale deploy coredns --replicas=3", + }, + Metric: "dns_failure_rate_pct", + Value: rate, + Threshold: thresh, + } + + // Add top failing consumers if available. 
+ if len(s.DNS.TopConsumers) > 0 { + top := s.DNS.TopConsumers[0] + f.Evidence += fmt.Sprintf(", top consumer: %s (pid %d, %d reqs, %d failures)", top.Comm, top.PID, top.Requests, top.Failures) + f.Process = top.Comm + } + + return []Finding{f} +} diff --git a/internal/metrics/bridge.go b/internal/metrics/bridge.go index 697185d..8fdd25f 100644 --- a/internal/metrics/bridge.go +++ b/internal/metrics/bridge.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/metrics/bridge_test.go b/internal/metrics/bridge_test.go index f19b413..5732439 100644 --- a/internal/metrics/bridge_test.go +++ b/internal/metrics/bridge_test.go @@ -1,3 +1,5 @@ +//go:build linux + // Copyright 2026 Optiqor contributors // SPDX-License-Identifier: Apache-2.0 diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 8b41ac3..220d8fe 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -6,7 +6,7 @@ // All metrics use the "kerno_" namespace prefix. The package exposes typed // metric variables that event consumers (the bridge in collector.go) can // update directly. A dedicated custom registry is used so that the default -// Go process metrics are excluded — only Kerno metrics are exposed. +// Go process metrics are excluded — only Kerno metrics are exposed. package metrics import ( @@ -21,7 +21,7 @@ const Namespace = "kerno" // and gives us precise control over what /metrics exposes. var Registry = prometheus.NewRegistry() -// ─── Syscall Metrics ────────────────────────────────────────────────────── +// ─── Syscall Metrics ────────────────────────────────────────────────────── // SyscallDuration tracks syscall latency distributions. 
var SyscallDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ @@ -38,7 +38,7 @@ var SyscallTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Help: "Total number of traced syscall events.", }, []string{"syscall", "process"}) -// ─── TCP Metrics ────────────────────────────────────────────────────────── +// ─── TCP Metrics ────────────────────────────────────────────────────────── // TCPRTT tracks TCP round-trip time distributions. var TCPRTT = prometheus.NewSummaryVec(prometheus.SummaryOpts{ @@ -62,7 +62,7 @@ var TCPConnectionsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Help: "Total TCP connection events observed.", }, []string{"src", "dst", "process"}) -// ─── OOM Metrics ────────────────────────────────────────────────────────── +// ─── OOM Metrics ────────────────────────────────────────────────────────── // OOMKillsTotal counts the number of OOM kill events. var OOMKillsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -71,7 +71,7 @@ var OOMKillsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Help: "Total OOM kill events observed.", }, []string{"process"}) -// ─── Disk I/O Metrics ───────────────────────────────────────────────────── +// ─── Disk I/O Metrics ───────────────────────────────────────────────────── // DiskIODuration tracks disk I/O latency distributions. var DiskIODuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ @@ -88,7 +88,7 @@ var DiskIOBytesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Help: "Total bytes processed by disk I/O operations.", }, []string{"device", "operation"}) -// ─── Scheduler Metrics ──────────────────────────────────────────────────── +// ─── Scheduler Metrics ──────────────────────────────────────────────────── // SchedDelay tracks CPU run queue delay distributions. 
var SchedDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{ @@ -98,7 +98,7 @@ var SchedDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Objectives: map[float64]float64{0.5: 0.05, 0.95: 0.01, 0.99: 0.001}, }, []string{"process"}) -// ─── FD Metrics ─────────────────────────────────────────────────────────── +// ─── FD Metrics ─────────────────────────────────────────────────────────── // FDOpenTotal tracks the total number of file descriptor opens. var FDOpenTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -114,7 +114,7 @@ var FDCloseTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Help: "Total file descriptor close operations.", }, []string{"process"}) -// ─── Cgroup Memory Metrics ──────────────────────────────────────────────── +// ─── Cgroup Memory Metrics ──────────────────────────────────────────────── // CgroupMemoryPressurePct tracks per-container memory usage as a percentage // of the cgroup memory limit. Labeled by pod only; namespace label will be @@ -125,7 +125,7 @@ var CgroupMemoryPressurePct = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Help: "Per-container memory usage as a percentage of the cgroup memory.max limit.", }, []string{"pod"}) -// ─── Self-Monitoring Metrics ────────────────────────────────────────────── +// ─── Self-Monitoring Metrics ────────────────────────────────────────────── // CollectorEventsTotal counts events processed per collector. var CollectorEventsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{