diff --git a/README.md b/README.md index 64ad007..5a8bac4 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,10 @@ Channel = "stable" # stable / beta # UATruncate = 1024 # truncate user-agent at this length # URITruncate = 2048 # truncate request URI at this length # ExtraUAPatterns = ["MyCustomCrawler/"] # local additions to the bot list +# Field aliases — only set when discovery cannot infer the right name +# (e.g. operator-defined `set $custom $http_referer;`). Empty falls back. +# [BotLogs.FieldAliases] +# Referer = "ref" ``` | Parameter | Default | Description | @@ -190,6 +194,7 @@ Channel = "stable" # stable / beta | `BotLogs.UATruncate` | `1024` | Max UA length per event | | `BotLogs.URITruncate` | `2048` | Max URI length per event | | `BotLogs.ExtraUAPatterns` | `[]` | Local additions to known-bots UA patterns | +| `BotLogs.FieldAliases.*` | auto | Per-format field-name overrides (UserAgent/Host/ServerName/RemoteAddr/Referer). Discovery auto-detects from `log_format`; only set when operator uses non-standard variables nginx config | ### Environment variables @@ -380,11 +385,19 @@ Enabling `[BotLogs]` extends nginx auto-discovery to log_formats *without* timing histograms are simply skipped per-line for those. Installs without `[BotLogs]` keep the original behavior. -**Required log_format variables**: `$http_user_agent`, `$server_name`, -`$remote_addr`, `$http_referer`. Any user-supplied `[Nginx].ExtraLabels` -are dropped while bot-logs is enabled (ParsedLine has only four label slots) -— this is logged on startup. Missing variables surface as empty strings; -events still ship. +**Required log_format variables**: `$http_user_agent`, `$host` / +`$server_name`, `$remote_addr`, `$http_referer`. Field names are +auto-detected from each tailed `log_format` — common alternates work +out of the box (`http_host`, `realip_remote_addr`, `http_x_real_ip`, +`http_x_forwarded_for`, `http_referrer` typo, custom JSON keys). The +resolution and its provenance (override / detected / default) are +emitted as a `botlog: resolved field aliases` log line at startup. +When discovery cannot infer the right name (e.g. operator-defined +`set $custom $http_referer;`), override via `[BotLogs.FieldAliases]`. +If a format genuinely lacks UA, a `botlog_no_ua_field` warning is +raised and `topsrv_collector_config_warnings_total{kind=...}` ticks. +Operator-supplied `[Nginx].ExtraLabels` are kept on metrics labels; +required botlog fields are unioned into `ExtractFields` for parsing. Metrics: `topsrv_botlog_events_total{state=enqueued|sent|spooled|dropped}`, `topsrv_botlog_match_total{family}`, `topsrv_botlog_send_errors_total{kind}`, diff --git a/cfg/local.toml.dist b/cfg/local.toml.dist index 650fe35..e3f2434 100644 --- a/cfg/local.toml.dist +++ b/cfg/local.toml.dist @@ -51,3 +51,9 @@ Channel = "stable" # stable / beta # UATruncate = 1024 # truncate user-agent at this length # URITruncate = 2048 # truncate request URI at this length # ExtraUAPatterns = ["MyCustomCrawler/"] # local additions to the bot list +# Field aliases — override only if discovery cannot infer the right name +# (e.g. nginx `set $custom $http_referer;` puts $custom in your log_format). +# Empty entries fall back to auto-detected, then to default nginx variable. +# [BotLogs.FieldAliases] +# Referer = "ref" +# UserAgent = "" diff --git a/docs/metrics.md b/docs/metrics.md index ed85cc5..e3b9a81 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -306,4 +306,4 @@ Per-collector instrumentation. Any collector registered via `addCollector` is wr |--------|------|--------|-------------| | `topsrv_collector_scrape_duration_seconds` | gauge | collector | Last scrape duration. Alert: `> 5s` = monitoring is adding overhead to the target | | `topsrv_collector_scrape_panics_total` | counter | collector | Panics recovered during Collect. Any non-zero rate = bug, page immediately | -| `topsrv_collector_config_warnings_total` | counter | kind | Operator-config warnings raised at startup. `kind` ∈ {`high_card_label` (denylisted variable in ExtraLabels), `missing_extract` (ExtraLabels references variable absent from ExtractFields → empty label values), `truncated_extract` (ExtractFields > MaxExtras=8 → tail dropped), `botlog_no_ua_field` (BotLogs enabled but no log_format carries http_user_agent)}. Any non-zero value points at a fixable config — see the matching WARN in stdout/journald for the offending names | +| `topsrv_collector_config_warnings_total` | counter | kind | Operator-config warnings raised at startup. `kind` ∈ {`high_card_label` (denylisted variable in ExtraLabels), `missing_extract` (ExtraLabels references variable absent from ExtractFields → empty label values), `truncated_extract` (ExtractFields > MaxExtras=8 → tail dropped), `botlog_no_ua_field` (BotLogs enabled but no log_format carries http_user_agent — override via `[BotLogs.FieldAliases].UserAgent`), `botlog_alias_mismatch` (tailed paths resolve to non-identical botlog aliases; first path's resolution is used)}. Any non-zero value points at a fixable config — see the matching WARN in stdout/journald for the offending names | diff --git a/internal/app/app.go b/internal/app/app.go index 38d55fd..5322522 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -92,6 +92,11 @@ type App struct { // Observer's positional field-index resolution. extractFields []string + // Per-format field aliases resolved from discovered log_format strings. + // registerLogCollector picks one canonical set (warning on mismatch) and + // registerBotLogs hands it to NewObserver. + botlogAliases botlog.FieldAliases + // Tracks pusher/log collector/smart/updater so Shutdown can wait for // their drain paths before the process exits. bg sync.WaitGroup @@ -132,7 +137,7 @@ func New(appName, version string, logger embedlog.Logger, cfg Config) *App { }, []string{"collector"}), configWarnings: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "topsrv_collector_config_warnings_total", - Help: "Operator-config warnings raised at startup. kind=high_card_label|missing_extract|truncated_extract|botlog_no_ua_field.", + Help: "Operator-config warnings raised at startup. kind=high_card_label|missing_extract|truncated_extract|botlog_no_ua_field|botlog_alias_mismatch.", }, []string{"kind"}), } a.registry.MustRegister(a.scrapeDuration, a.scrapePanics, a.configWarnings) @@ -356,10 +361,11 @@ func (a *App) registerLogCollector(ctx context.Context, cfg nginx.LogConfig) { cfg.ExtractFields = cfg.ExtraLabels if a.cfg.BotLogs != nil && a.cfg.BotLogs.Enabled { - cfg.ExtractFields = mergeUnique(cfg.ExtraLabels, botlog.RequiredFields()) - if !logHasUserAgent(cfg) { + a.botlogAliases = a.resolveBotlogAliases(ctx, cfg) + cfg.ExtractFields = mergeUnique(cfg.ExtraLabels, botlog.RequiredFields(a.botlogAliases)) + if a.botlogAliases.UserAgent == "" { a.warnConfig(ctx, "botlog_no_ua_field", - "BotLogs enabled but no tailed log_format contains http_user_agent; events will never match — check nginx/angie log_format directives", + "BotLogs enabled but no tailed log_format contains http_user_agent; events will never match — check nginx/angie log_format directives, or set [BotLogs.FieldAliases].UserAgent to the custom field name", "log_paths", cfg.LogPaths) } } @@ -417,16 +423,94 @@ func truncateList(s []string, n int) []string { return append(append([]string(nil), s[:n]...), "…+more") } -func logHasUserAgent(cfg nginx.LogConfig) bool { - if strings.Contains(cfg.LogFormat, "http_user_agent") { - return true +// resolveBotlogAliases inspects every log_format paired with cfg's LogPaths +// (default plus per-path overrides) and returns one canonical alias set for +// the Observer. The TOML override from BotLogs.FieldAliases wins, then the +// detected aliases, with DefaultAliases as the last-resort layer. +// +// Multi-format setups: if paths resolve to non-identical alias sets we keep +// the first path's resolution and emit a config warning — the v2 plan is +// per-path Observer wiring once an operator hits this in production. +func (a *App) resolveBotlogAliases(ctx context.Context, cfg nginx.LogConfig) botlog.FieldAliases { + override := botlog.FieldAliases{} + if a.cfg.BotLogs != nil { + override = a.cfg.BotLogs.FieldAliases + } + + formatFor := func(p string) (string, bool) { + if f, ok := cfg.LogFormats[p]; ok { + return f, cfg.JSONPaths[p] + } + return cfg.LogFormat, false } - for _, f := range cfg.LogFormats { - if strings.Contains(f, "http_user_agent") { - return true + + // Memoise per (format, isJSON) — N paths sharing one log_format incur + // one DetectAliases call instead of N. + type formatKey struct { + format string + isJSON bool + } + detectCache := make(map[formatKey]botlog.FieldAliases) + + var canonical, canonicalDetected botlog.FieldAliases + var canonicalPath string + mismatched := make([]string, 0) + for _, p := range cfg.LogPaths { + format, isJSON := formatFor(p) + key := formatKey{format, isJSON} + detected, ok := detectCache[key] + if !ok { + detected = botlog.DetectAliases(format, isJSON) + detectCache[key] = detected + } + resolved := override.WithFallback(detected).WithFallback(botlog.DefaultAliases()) + if canonicalPath == "" { + canonical = resolved + canonicalDetected = detected + canonicalPath = p + continue + } + if resolved != canonical { + mismatched = append(mismatched, p) + } + } + if len(mismatched) > 0 { + a.warnConfig(ctx, "botlog_alias_mismatch", + "log_formats across tailed paths resolve to different botlog aliases; using first path's resolution", + "canonical_path", canonicalPath, "mismatched_paths", truncateList(mismatched, 8)) + } + if canonicalPath != "" { + a.Print(ctx, "botlog: resolved field aliases", + "aliases", canonical.String(), + "sources", aliasSources(override, canonicalDetected), + "canonical_path", canonicalPath) + } + return canonical +} + +// aliasSources renders per-field provenance: "override" if the operator set +// the alias via [BotLogs.FieldAliases], "detected" if it came from +// log_format inspection, "default" if neither matched and DefaultAliases +// supplied the value. Mirrors FieldAliases.String() shape so log readers can +// align the two lines. +func aliasSources(override, detected botlog.FieldAliases) string { + src := func(o, d string) string { + switch { + case o != "": + return "override" + case d != "": + return "detected" + default: + return "default" } } - return false + return fmt.Sprintf("ua=%s host=%s server=%s remote=%s referer=%s", + src(override.UserAgent, detected.UserAgent), + src(override.Host, detected.Host), + src(override.ServerName, detected.ServerName), + src(override.RemoteAddr, detected.RemoteAddr), + src(override.Referer, detected.Referer), + ) } // highCardLabels returns the intersection of labels with the denylist. @@ -516,7 +600,7 @@ func (a *App) registerBotLogs(ctx context.Context) { return } bp := botlog.NewPusher(a.Logger, a.appName, a.version, *a.cfg.BotLogs, a.registry) - obs := botlog.NewObserver(bp, *a.cfg.BotLogs, a.hostname, a.extractFields) + obs := botlog.NewObserver(bp, *a.cfg.BotLogs, a.hostname, a.extractFields, a.botlogAliases) a.logCollector.AddObserver(obs) a.goBackground(func() { bp.Run(ctx) }) a.Print(ctx, "botlog: observer attached", "endpoint", a.cfg.BotLogs.Endpoint, "spool", a.cfg.BotLogs.SpoolDir) diff --git a/internal/app/app_test.go b/internal/app/app_test.go index ed83698..c3c103e 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -1,8 +1,10 @@ package app import ( + "context" "testing" + "github.com/vmkteam/topsrv/internal/topsrv/botlog" "github.com/vmkteam/topsrv/internal/topsrv/nginx" "github.com/prometheus/client_golang/prometheus" @@ -59,30 +61,72 @@ func TestInstrumentedCollectorRecoversPanic(t *testing.T) { assert.InDelta(t, 1.0, counterValue(t, panics), 1e-9, "panics counter must be incremented once") } -func TestLogHasUserAgent(t *testing.T) { - cases := []struct { - name string - cfg nginx.LogConfig - want bool - }{ - {"empty", nginx.LogConfig{}, false}, - {"single format with UA", nginx.LogConfig{LogFormat: "$remote_addr - $http_user_agent"}, true}, - {"single format without UA", nginx.LogConfig{LogFormat: "$remote_addr $request_time"}, false}, - {"map with UA", nginx.LogConfig{LogFormats: map[string]string{"/a": "$http_user_agent"}}, true}, - {"map without UA", nginx.LogConfig{LogFormats: map[string]string{"/a": "$remote_addr"}}, false}, - {"any-of map has UA", nginx.LogConfig{LogFormats: map[string]string{ - "/a": "$remote_addr", - "/b": "$http_user_agent", - }}, true}, - {"json with UA", nginx.LogConfig{LogFormats: map[string]string{ - "/a": `{"ua":"$http_user_agent"}`, - }}, true}, +// newAliasTestApp builds the minimal App required to exercise +// resolveBotlogAliases — Logger for warnConfig, configWarnings counter for +// the warning side-effect, and a BotLogs config carrying the override. +func newAliasTestApp(t *testing.T, override botlog.FieldAliases) *App { + t.Helper() + a := &App{ + cfg: Config{BotLogs: &botlog.Config{FieldAliases: override}}, + configWarnings: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "test_config_warnings_total", Help: "test", + }, []string{"kind"}), } - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - assert.Equal(t, tc.want, logHasUserAgent(tc.cfg)) - }) + return a +} + +func TestResolveBotlogAliases_SinglePathStandardFormat(t *testing.T) { + a := newAliasTestApp(t, botlog.FieldAliases{}) + cfg := nginx.LogConfig{ + LogPaths: []string{"/var/log/nginx/a.log"}, + LogFormat: `$remote_addr [$time_local] "$request" $status "$http_referer" "$http_user_agent"`, + } + got := a.resolveBotlogAliases(context.Background(), cfg) + assert.Equal(t, "http_user_agent", got.UserAgent) + assert.Equal(t, "http_referer", got.Referer) + assert.Equal(t, "remote_addr", got.RemoteAddr) +} + +func TestResolveBotlogAliases_OverrideWins(t *testing.T) { + a := newAliasTestApp(t, botlog.FieldAliases{Referer: "ref"}) + cfg := nginx.LogConfig{ + LogPaths: []string{"/var/log/nginx/a.log"}, + LogFormat: `$remote_addr "$http_referer" "$http_user_agent"`, + } + got := a.resolveBotlogAliases(context.Background(), cfg) + assert.Equal(t, "ref", got.Referer, "explicit override beats auto-detected") + assert.Equal(t, "http_user_agent", got.UserAgent, "non-overridden fields keep detection") +} + +func TestResolveBotlogAliases_MismatchEmitsWarning(t *testing.T) { + a := newAliasTestApp(t, botlog.FieldAliases{}) + // Two paths, different formats with diverging detected aliases: + // path A uses $http_referer, path B uses $http_referrer. + cfg := nginx.LogConfig{ + LogPaths: []string{"/var/log/a.log", "/var/log/b.log"}, + LogFormats: map[string]string{ + "/var/log/a.log": `$remote_addr "$http_referer" "$http_user_agent"`, + "/var/log/b.log": `$remote_addr "$http_referrer" "$http_user_agent"`, + }, + } + got := a.resolveBotlogAliases(context.Background(), cfg) + assert.Equal(t, "http_referer", got.Referer, "first path's resolution wins") + assert.InDelta(t, 1.0, counterValue(t, a.configWarnings.WithLabelValues("botlog_alias_mismatch")), 1e-9) +} + +func TestResolveBotlogAliases_AllPathsAgreeNoWarning(t *testing.T) { + a := newAliasTestApp(t, botlog.FieldAliases{}) + // Two paths sharing the same format → no mismatch warning, no DetectAliases + // re-run (memoised by (format,isJSON)). + cfg := nginx.LogConfig{ + LogPaths: []string{"/var/log/a.log", "/var/log/b.log"}, + LogFormats: map[string]string{ + "/var/log/a.log": `$remote_addr "$http_referer" "$http_user_agent"`, + "/var/log/b.log": `$remote_addr "$http_referer" "$http_user_agent"`, + }, } + a.resolveBotlogAliases(context.Background(), cfg) + assert.InDelta(t, 0.0, counterValue(t, a.configWarnings.WithLabelValues("botlog_alias_mismatch")), 1e-9) } // TestInstrumentedCollectorNormalCallNoPanicCount verifies the counter stays zero diff --git a/internal/topsrv/botlog/aliases.go b/internal/topsrv/botlog/aliases.go new file mode 100644 index 0000000..1efa446 --- /dev/null +++ b/internal/topsrv/botlog/aliases.go @@ -0,0 +1,182 @@ +package botlog + +import ( + "regexp" + "slices" + "strings" + "sync" +) + +// FieldAliases resolves the per-format names of the five nginx variables +// botlog needs. For non-JSON formats the value is the nginx variable name +// (matches gonx's field naming, e.g. "http_referer" for "$http_referer"). +// For JSON formats the value is the JSON key wrapping the variable in the +// log_format (e.g. "ref" for `"ref":"$http_referer"`). An empty string means +// the format doesn't carry the field and downstream code should ship empty. +type FieldAliases struct { + UserAgent string + Host string + ServerName string + RemoteAddr string + Referer string +} + +// DefaultAliases returns the canonical nginx variable names — used as the +// last-resort fallback when discovery is unavailable and no explicit +// override is configured. +func DefaultAliases() FieldAliases { + return FieldAliases{ + UserAgent: fieldUserAgent, + Host: fieldHost, + ServerName: fieldServerName, + RemoteAddr: fieldRemoteAddr, + Referer: fieldReferer, + } +} + +// WithFallback returns a copy of a with each empty field replaced by the +// corresponding value from fallback. Reads "use a, falling back to fallback +// for fields a left empty" — supports layering chains like +// `override.WithFallback(detected).WithFallback(DefaultAliases())`. +func (a FieldAliases) WithFallback(fallback FieldAliases) FieldAliases { + out := a + if out.UserAgent == "" { + out.UserAgent = fallback.UserAgent + } + if out.Host == "" { + out.Host = fallback.Host + } + if out.ServerName == "" { + out.ServerName = fallback.ServerName + } + if out.RemoteAddr == "" { + out.RemoteAddr = fallback.RemoteAddr + } + if out.Referer == "" { + out.Referer = fallback.Referer + } + return out +} + +// Names returns the resolved field names in a stable order, dropping empty +// entries — suitable for merging into LogConfig.ExtractFields so the parser +// knows which nginx variables to copy into ParsedLine.Extras. +func (a FieldAliases) Names() []string { + out := []string{a.UserAgent, a.Host, a.ServerName, a.RemoteAddr, a.Referer} + return slices.DeleteFunc(out, func(s string) bool { return s == "" }) +} + +// Known nginx-variable variants per semantic field. First match wins. +// Ordering reflects operator preference (standard → common typo → fallback). +var ( + uaCandidates = []string{"http_user_agent"} + hostCandidates = []string{"host", "http_host"} + serverCandidates = []string{"server_name"} + remoteCandidates = []string{"remote_addr", "realip_remote_addr", "http_x_real_ip", "http_x_forwarded_for"} + refererCandidates = []string{"http_referer", "http_referrer", "referer"} +) + +// DetectAliases inspects an nginx log_format string and returns the resolved +// field name for each semantic field. Empty values for fields the format +// doesn't carry — caller layers DefaultAliases under and an explicit override +// on top via WithFallback. +// +// For non-JSON formats (combined / key=value / logfmt / hybrid), the result +// is the nginx variable name without the '$' prefix. For JSON formats, the +// result is the JSON key wrapping the variable in the format string. +func DetectAliases(format string, isJSON bool) FieldAliases { + if isJSON { + return FieldAliases{ + UserAgent: detectJSONKey(format, uaCandidates), + Host: detectJSONKey(format, hostCandidates), + ServerName: detectJSONKey(format, serverCandidates), + RemoteAddr: detectJSONKey(format, remoteCandidates), + Referer: detectJSONKey(format, refererCandidates), + } + } + return FieldAliases{ + UserAgent: detectTextVar(format, uaCandidates), + Host: detectTextVar(format, hostCandidates), + ServerName: detectTextVar(format, serverCandidates), + RemoteAddr: detectTextVar(format, remoteCandidates), + Referer: detectTextVar(format, refererCandidates), + } +} + +// detectTextVar returns the first candidate that appears as $candidate in +// format (word-boundary delimited, so $http_referer does not match +// $http_referrer). gonx uses the variable name itself as the field name, +// so the returned string is the lookup key for entry.Field(). +func detectTextVar(format string, candidates []string) string { + for _, name := range candidates { + if textVarRe(name).MatchString(format) { + return name + } + } + return "" +} + +// detectJSONKey returns the JSON key that wraps the first matched candidate +// variable in a JSON log_format — patterns like `"":"$"` or +// the variant with whitespace around the colon. Returns "" if no candidate +// is wrapped. +func detectJSONKey(format string, candidates []string) string { + for _, name := range candidates { + if m := jsonKeyRe(name).FindStringSubmatch(format); m != nil { + return m[1] + } + } + return "" +} + +// reCache memoises compiled regexes — DetectAliases runs once per log_format +// at startup, and across N tailed paths that share a format we'd otherwise +// compile the same patterns N times. +var reCache sync.Map // string → *regexp.Regexp + +func cachedRegexp(key, pattern string) *regexp.Regexp { + if v, ok := reCache.Load(key); ok { + if re, ok := v.(*regexp.Regexp); ok { + return re + } + } + re := regexp.MustCompile(pattern) + reCache.Store(key, re) + return re +} + +// textVarRe matches "$name" with a word boundary so http_referer does not +// accidentally match the longer http_referrer. +func textVarRe(name string) *regexp.Regexp { + return cachedRegexp("t:"+name, `\$`+regexp.QuoteMeta(name)+`\b`) +} + +// jsonKeyRe matches `"":"$"` (with optional whitespace around the +// colon) and captures the JSON key. The variable side is word-boundaried so +// $http_referer does not consume $http_referrer's key. +func jsonKeyRe(name string) *regexp.Regexp { + return cachedRegexp("j:"+name, `"([^"]+)"\s*:\s*"\$`+regexp.QuoteMeta(name)+`\b`) +} + +// String renders aliases as a one-liner suitable for startup logging. +func (a FieldAliases) String() string { + var sb strings.Builder + sb.WriteString("ua=") + sb.WriteString(orDash(a.UserAgent)) + sb.WriteString(" host=") + sb.WriteString(orDash(a.Host)) + sb.WriteString(" server=") + sb.WriteString(orDash(a.ServerName)) + sb.WriteString(" remote=") + sb.WriteString(orDash(a.RemoteAddr)) + sb.WriteString(" referer=") + sb.WriteString(orDash(a.Referer)) + return sb.String() +} + +func orDash(s string) string { + if s == "" { + return "-" + } + return s +} diff --git a/internal/topsrv/botlog/aliases_test.go b/internal/topsrv/botlog/aliases_test.go new file mode 100644 index 0000000..66ae588 --- /dev/null +++ b/internal/topsrv/botlog/aliases_test.go @@ -0,0 +1,167 @@ +package botlog + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultAliases(t *testing.T) { + a := DefaultAliases() + assert.Equal(t, "http_user_agent", a.UserAgent) + assert.Equal(t, "host", a.Host) + assert.Equal(t, "server_name", a.ServerName) + assert.Equal(t, "remote_addr", a.RemoteAddr) + assert.Equal(t, "http_referer", a.Referer) +} + +func TestFieldAliases_Names(t *testing.T) { + a := FieldAliases{UserAgent: "ua", Host: "", ServerName: "sn", RemoteAddr: "ip", Referer: "ref"} + assert.Equal(t, []string{"ua", "sn", "ip", "ref"}, a.Names()) +} + +func TestFieldAliases_WithFallback(t *testing.T) { + base := DefaultAliases() + override := FieldAliases{Referer: "ref"} + got := override.WithFallback(base) + assert.Equal(t, "ref", got.Referer, "non-empty stays") + assert.Equal(t, "http_user_agent", got.UserAgent, "empty falls back") + assert.Equal(t, "host", got.Host) +} + +func TestDetectAliases_TextCombined(t *testing.T) { + // Standard combined format — every field uses the canonical variable. + format := `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"` + got := DetectAliases(format, false) + assert.Equal(t, "http_user_agent", got.UserAgent) + assert.Equal(t, "remote_addr", got.RemoteAddr) + assert.Equal(t, "http_referer", got.Referer) + assert.Empty(t, got.Host, "combined format has no $host") + assert.Empty(t, got.ServerName) +} + +func TestDetectAliases_TextKeyValue(t *testing.T) { + // Key=value style: $variables wrapped in literal label="..." pairs. + // gonx still names fields after the variable, not the wrapper. + format := `time="$time_iso8601" h="$host" sn="$server_name" ip="$remote_addr" ua="$http_user_agent" ref="$http_referer"` + got := DetectAliases(format, false) + assert.Equal(t, "host", got.Host) + assert.Equal(t, "server_name", got.ServerName) + assert.Equal(t, "remote_addr", got.RemoteAddr) + assert.Equal(t, "http_user_agent", got.UserAgent) + assert.Equal(t, "http_referer", got.Referer) +} + +func TestDetectAliases_TextLogfmt(t *testing.T) { + // Bare key=$var (no quotes around values). + format := `time=$time_local host=$host ua=$http_user_agent referer=$http_referer` + got := DetectAliases(format, false) + assert.Equal(t, "host", got.Host) + assert.Equal(t, "http_user_agent", got.UserAgent) + assert.Equal(t, "http_referer", got.Referer) +} + +func TestDetectAliases_TextHybridWithFallbacks(t *testing.T) { + // Hybrid format: no $remote_addr (uses XFF), no $http_referer (uses + // http_referrer typo variant). Both should be picked up via fallback. + format := `$http_x_forwarded_for [$time_local] "$request" $status "$http_referrer" "$http_user_agent" h="$host"` + got := DetectAliases(format, false) + assert.Equal(t, "http_x_forwarded_for", got.RemoteAddr, "falls back to XFF when remote_addr absent") + assert.Equal(t, "http_referrer", got.Referer, "picks up typo variant") + assert.Equal(t, "http_user_agent", got.UserAgent) + assert.Equal(t, "host", got.Host) +} + +func TestDetectAliases_TextWordBoundary(t *testing.T) { + // $http_referrer must NOT be matched by the $http_referer candidate. + // Without word boundary, regex would match the prefix and report wrong. + // refererCandidates lists "http_referer" before "http_referrer", so a + // missing word boundary would silently return "http_referer" here. + format := `... "$http_referrer" ...` + got := DetectAliases(format, false) + assert.Equal(t, "http_referrer", got.Referer) + assert.NotEqual(t, "http_referer", got.Referer, "prefix-only match would be a regex bug") +} + +func TestDetectAliases_JSONStandardKeys(t *testing.T) { + // dl.old-games.su-style: JSON keys match the nginx variable name 1:1. + format := `{"time_local":"$time_local","remote_addr":"$remote_addr","host":"$host","request_uri":"$request_uri","http_referer":"$http_referer","http_user_agent":"$http_user_agent"}` + got := DetectAliases(format, true) + assert.Equal(t, "remote_addr", got.RemoteAddr) + assert.Equal(t, "host", got.Host) + assert.Equal(t, "http_referer", got.Referer) + assert.Equal(t, "http_user_agent", got.UserAgent) + assert.Empty(t, got.ServerName, "format omits server_name") +} + +func TestDetectAliases_JSONCustomKeys(t *testing.T) { + // Operator chose short keys: "ua", "ref", "ip". Our code must follow. + format := `{"ua":"$http_user_agent","ip":"$remote_addr","ref":"$http_referer","h":"$host"}` + got := DetectAliases(format, true) + assert.Equal(t, "ua", got.UserAgent) + assert.Equal(t, "ip", got.RemoteAddr) + assert.Equal(t, "ref", got.Referer) + assert.Equal(t, "h", got.Host) +} + +func TestDetectAliases_JSONWhitespace(t *testing.T) { + // Some operators pretty-print log_format with spaces around the colon. + format := `{"http_user_agent" : "$http_user_agent", "host":"$host"}` + got := DetectAliases(format, true) + assert.Equal(t, "http_user_agent", got.UserAgent) + assert.Equal(t, "host", got.Host) +} + +func TestDetectAliases_JSONReferrerTypo(t *testing.T) { + // Operator's JSON key has the typo, nginx variable too. + format := `{"http_referrer":"$http_referrer","http_user_agent":"$http_user_agent"}` + got := DetectAliases(format, true) + assert.Equal(t, "http_referrer", got.Referer) +} + +func TestDetectAliases_JSONFallbackToXFF(t *testing.T) { + // JSON format without $remote_addr — pick XFF as the next best identity. + format := `{"client_ip":"$http_x_forwarded_for","ua":"$http_user_agent"}` + got := DetectAliases(format, true) + assert.Equal(t, "client_ip", got.RemoteAddr) +} + +func TestDetectAliases_TextRealipRemoteAddr(t *testing.T) { + // Angie/nginx realip style (mangoads.com): log_format carries only + // $realip_remote_addr, no $remote_addr. Before the fix the regex + // `\$remote_addr\b` did not match this variable (the prefix in front of + // `remote_addr` is `realip_`, not `$`), the alias stayed empty, and + // gatesrv fell back to the agent's peer IP (often 127.0.0.1). + format := `time="$time_iso8601" clientIp="$realip_remote_addr" ua="$http_user_agent"` + got := DetectAliases(format, false) + assert.Equal(t, "realip_remote_addr", got.RemoteAddr) +} + +func TestDetectAliases_TextRemoteAddrWinsOverRealip(t *testing.T) { + // When log_format carries both variables, $remote_addr must win — + // it is already post real_ip substitution, while $realip_remote_addr + // holds the pre-substitution peer (CF edge / load balancer). + format := `addr="$remote_addr" peer="$realip_remote_addr" ua="$http_user_agent"` + got := DetectAliases(format, false) + assert.Equal(t, "remote_addr", got.RemoteAddr) +} + +func TestDetectAliases_EmptyFormatGivesEmpty(t *testing.T) { + // Format with none of the candidates — every alias empty. Caller layers + // DefaultAliases or surfaces a startup warning. + got := DetectAliases(`$status $body_bytes_sent`, false) + assert.Equal(t, FieldAliases{}, got) +} + +func TestFieldAliases_String(t *testing.T) { + // Startup log relies on this exact shape — dashboards or oncall grep for + // the "ua=… host=… server=… remote=… referer=…" tokens. + full := FieldAliases{UserAgent: "ua", Host: "h", ServerName: "sn", RemoteAddr: "ip", Referer: "ref"} + assert.Equal(t, "ua=ua host=h server=sn remote=ip referer=ref", full.String()) + + assert.Equal(t, "ua=- host=- server=- remote=- referer=-", + FieldAliases{}.String(), "empty fields render as -") + + partial := FieldAliases{UserAgent: "http_user_agent", Referer: "http_referer"} + assert.Equal(t, "ua=http_user_agent host=- server=- remote=- referer=http_referer", partial.String()) +} diff --git a/internal/topsrv/botlog/config.go b/internal/topsrv/botlog/config.go index 529e2d7..dc51527 100644 --- a/internal/topsrv/botlog/config.go +++ b/internal/topsrv/botlog/config.go @@ -45,6 +45,12 @@ type Config struct { URITruncate int // max URI length stored per event; default 2048 ExtraUAPatterns []string // local additions to knownBots (substring, case-sensitive) + // FieldAliases is an explicit override for the per-format auto-detection + // of nginx field names. Set when discovery cannot infer the right name + // (e.g. operator uses `set $custom $http_referer;` in nginx config). + // Empty fields mean "fall back to auto-detected (or default) name". + FieldAliases FieldAliases + parsedBatchInterval time.Duration // populated by Validate } diff --git a/internal/topsrv/botlog/integration_test.go b/internal/topsrv/botlog/integration_test.go index 1439e44..900ffe6 100644 --- a/internal/topsrv/botlog/integration_test.go +++ b/internal/topsrv/botlog/integration_test.go @@ -97,9 +97,9 @@ func TestE2E_TailFileToIngest(t *testing.T) { logC := nginx.NewLogCollector(embedlog.Logger{}, nginx.LogConfig{ LogPaths: []string{logPath}, JSONPaths: map[string]bool{logPath: true}, - ExtractFields: RequiredFields(), + ExtractFields: RequiredFields(DefaultAliases()), }) - obs := NewObserver(p, cfg, "smoke-host", RequiredFields()) + obs := NewObserver(p, cfg, "smoke-host", RequiredFields(DefaultAliases()), DefaultAliases()) logC.AddObserver(obs) ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/topsrv/botlog/observer.go b/internal/topsrv/botlog/observer.go index c87da7f..840e634 100644 --- a/internal/topsrv/botlog/observer.go +++ b/internal/topsrv/botlog/observer.go @@ -18,13 +18,15 @@ const ( fieldReferer = "http_referer" ) -// RequiredFields lists the nginx variables the LogCollector must read into -// ParsedLine.Extras for the bot-log Observer to work. These are NOT promoted -// to Prometheus labels — app.registerLogCollector merges them into -// LogConfig.ExtractFields, leaving operator-supplied ExtraLabels (low -// cardinality) as the only Prometheus label set. -func RequiredFields() []string { - return []string{fieldUserAgent, fieldHost, fieldServerName, fieldRemoteAddr, fieldReferer} +// RequiredFields returns the field names that LogCollector must read into +// ParsedLine.Extras for the bot-log Observer to work, resolved via aliases. +// Empty aliases (fields the operator's log_format doesn't carry) are dropped +// — those Event fields ship empty. These names are NOT promoted to Prometheus +// labels — app.registerLogCollector merges them into LogConfig.ExtractFields, +// leaving operator-supplied ExtraLabels (low cardinality) as the only +// Prometheus label set. +func RequiredFields(aliases FieldAliases) []string { + return aliases.Names() } // maxHostLen caps the request Host header value retained on an Event. Picked @@ -64,21 +66,34 @@ type Observer struct { // NewObserver wires an Observer against an already-constructed Pusher. // extractFields must mirror the LogCollector's ExtractFields slice — its // indices determine where each variable lands in ParsedLine.Extras. -func NewObserver(p *Pusher, cfg Config, hostname string, extractFields []string) *Observer { +// aliases is the per-format resolution of which field name carries which +// semantic value (UA, Host, etc.); empty entries collapse to idx -1 and the +// Event ships those fields empty. +func NewObserver(p *Pusher, cfg Config, hostname string, extractFields []string, aliases FieldAliases) *Observer { return &Observer{ pusher: p, hostname: hostname, uaTruncate: cfg.UATruncate, uriTruncate: cfg.URITruncate, extraPatterns: cfg.ExtraUAPatterns, - idxUA: slices.Index(extractFields, fieldUserAgent), - idxHost: slices.Index(extractFields, fieldHost), - idxServerName: slices.Index(extractFields, fieldServerName), - idxRemoteAddr: slices.Index(extractFields, fieldRemoteAddr), - idxReferer: slices.Index(extractFields, fieldReferer), + idxUA: indexOrSkip(extractFields, aliases.UserAgent), + idxHost: indexOrSkip(extractFields, aliases.Host), + idxServerName: indexOrSkip(extractFields, aliases.ServerName), + idxRemoteAddr: indexOrSkip(extractFields, aliases.RemoteAddr), + idxReferer: indexOrSkip(extractFields, aliases.Referer), } } +// indexOrSkip is slices.Index that maps "" to -1 directly, avoiding the +// false-positive where an empty alias collides with a "" stub left in +// extractFields. +func indexOrSkip(extractFields []string, name string) int { + if name == "" { + return -1 + } + return slices.Index(extractFields, name) +} + // OnLogLine satisfies nginx.LogObserver. Matches the UA first to skip Fields // construction on the ~90% non-bot traffic — the hot path is one Extras read // and one substring scan when no bot is seen. diff --git a/internal/topsrv/botlog/observer_test.go b/internal/topsrv/botlog/observer_test.go index 5656e68..cb10fbb 100644 --- a/internal/topsrv/botlog/observer_test.go +++ b/internal/topsrv/botlog/observer_test.go @@ -18,12 +18,19 @@ import ( ) func TestRequiredFieldsContents(t *testing.T) { - // RequiredFields contract: nginx variables botlog needs read into - // ParsedLine.Extras. Order is no longer load-bearing (Observer resolves - // indices at runtime), but the set must stay stable across versions. + // RequiredFields contract: with default aliases, the returned set covers + // the five canonical nginx variables botlog needs read into Extras. Order + // is not load-bearing (Observer resolves indices at runtime), but the set + // must stay stable across versions. assert.ElementsMatch(t, []string{fieldUserAgent, fieldHost, fieldServerName, fieldRemoteAddr, fieldReferer}, - RequiredFields()) + RequiredFields(DefaultAliases())) +} + +func TestRequiredFieldsHonoursAliases(t *testing.T) { + // Custom JSON keys → RequiredFields returns the aliased names. + a := FieldAliases{UserAgent: "ua", Host: "h", ServerName: "sn", RemoteAddr: "ip", Referer: "ref"} + assert.ElementsMatch(t, []string{"ua", "h", "sn", "ip", "ref"}, RequiredFields(a)) } func newObserverPair(t *testing.T) (*Observer, *Pusher) { @@ -36,9 +43,9 @@ func newObserverPair(t *testing.T) (*Observer, *Pusher) { } require.NoError(t, cfg.Validate(topsrv.PushConfig{})) p := NewPusher(embedlog.Logger{}, "topsrv-test", "test", cfg, prometheus.NewRegistry()) - // Tests use the canonical RequiredFields() order: ua, host, server_name, - // remote_addr, referer (see botParsedLine). - o := NewObserver(p, cfg, "web01", RequiredFields()) + // Tests use the canonical default aliases — ExtractFields and Observer + // indices line up with botParsedLine's Extras layout. + o := NewObserver(p, cfg, "web01", RequiredFields(DefaultAliases()), DefaultAliases()) return o, p } @@ -195,9 +202,9 @@ func TestObserver_PluggableThroughLogCollector(t *testing.T) { logC := nginx.NewLogCollector(embedlog.Logger{}, nginx.LogConfig{ LogPaths: []string{logPath}, JSONPaths: map[string]bool{logPath: true}, - ExtractFields: RequiredFields(), + ExtractFields: RequiredFields(DefaultAliases()), }) - obs := NewObserver(p, cfg, "web01", RequiredFields()) + obs := NewObserver(p, cfg, "web01", RequiredFields(DefaultAliases()), DefaultAliases()) logC.AddObserver(obs) logC.ParseJSONLine(`{"status":"200","body_bytes_sent":"100","request_time":"0.1","request_uri":"/a",` + @@ -219,7 +226,7 @@ func TestObserver_IndicesResolvedAtRuntime(t *testing.T) { // Operator labels first, botlog's required fields after — UA at index 2. extract := []string{"server_name", "http_platform", fieldUserAgent, fieldRemoteAddr, fieldReferer} - obs := NewObserver(p, cfg, "host1", extract) + obs := NewObserver(p, cfg, "host1", extract, DefaultAliases()) pl := &nginx.ParsedLine{ Status: "200", @@ -246,7 +253,7 @@ func TestObserver_MissingFieldsSafe(t *testing.T) { p := NewPusher(embedlog.Logger{}, "topsrv-test", "test", cfg, prometheus.NewRegistry()) // Only UA — no server_name / remote_addr / referer fields tailed. - obs := NewObserver(p, cfg, "host1", []string{fieldUserAgent}) + obs := NewObserver(p, cfg, "host1", []string{fieldUserAgent}, DefaultAliases()) pl := &nginx.ParsedLine{ Status: "200", URI: "/a", diff --git a/internal/topsrv/postgres/collector.go b/internal/topsrv/postgres/collector.go index 3de7323..1e078e2 100644 --- a/internal/topsrv/postgres/collector.go +++ b/internal/topsrv/postgres/collector.go @@ -18,7 +18,21 @@ const ( collectTimeout = 10 * time.Second versionPG14 = 140000 // pg_stat_wal introduced versionPG17 = 170000 // pg_stat_checkpointer, pg_stat_wal.stats_reset introduced + versionPG18 = 180000 // pg_stat_wal.wal_write_time/wal_sync_time removed (moved into pg_stat_io) settingRefreshInterval = time.Minute + + // appNamesTTL bounds how long a (queryid, application_name) pair stays + // in memory after it was last seen in pg_stat_activity. Long enough + // (1h) that a query absent from a few scrapes still has its caller + // resolved; short enough that one-shot pids/uuids don't accumulate. + appNamesTTL = time.Hour + appNamesSweepPeriod = 5 * time.Minute + + // Relation names probed by relHasColumn. Kept as named constants so a + // rename or typo can't silently disable feature detection. + relPgStatStmts = "pg_stat_statements" + relPgStatWal = "pg_stat_wal" + relPgStatActivity = "pg_stat_activity" ) // Collector collects PostgreSQL metrics via SQL queries. @@ -32,9 +46,10 @@ type Collector struct { pool *pgxpool.Pool // nil until first successful ensureReady versionNum int // server_version_num, cached database string // current_database() — scope for per-DB views (pg_stat_user_tables/indexes) - statementsTimeCol string // "total_exec_time" (PG13+) or "total_time" (PG12-) + statementsTimeCol string // "total_exec_time" (PG13+); empty iff pg_stat_statements unavailable hasWalBytes bool // pg_stat_statements has wal_bytes column hasToplevel bool // pg_stat_statements has toplevel column (PG14+) + hasWalIOTime bool // pg_stat_wal has wal_write_time/wal_sync_time (PG14..PG17) archiveEnabled bool // archive_mode is 'on' or 'always' hasActivityQID bool // pg_stat_activity.query_id column present (PG14+) @@ -42,10 +57,13 @@ type Collector struct { queryMetaMu sync.RWMutex queryMeta []QueryMeta - // queryid → set of application_names (accumulated from pg_stat_activity samples). - // Sampled by a background ticker independent of Prometheus scrape to capture short queries. + // queryid → application_name → last-seen time. Sampled by a background + // ticker independent of Prometheus scrape to catch short-lived queries. + // Entries older than appNamesTTL are pruned by a separate sweeper so a + // rotating app_name (pid/uuid suffix per process restart) does not turn + // the map into a slow memory leak. appNamesMu sync.RWMutex - appNames map[int64]map[string]bool + appNames map[int64]map[string]time.Time appSampleLastErr time.Time // rate-limit sampler error logs to 1/minute appSampleCancel context.CancelFunc appSampleDone chan struct{} @@ -166,13 +184,14 @@ func NewCollector(logger embedlog.Logger, dsn string) (*Collector, error) { cfg.ConnConfig.RuntimeParams["application_name"] = "topsrv" c := &Collector{ - Logger: logger, - cfg: cfg, - statementsTimeCol: "total_exec_time", // refined by detectFeatures once reachable - appNames: make(map[int64]map[string]bool), - prevStmts: make(map[string]stmtPrev), - histBuckets: newHistBuckets(), - settingsCache: map[string]float64{}, + Logger: logger, + cfg: cfg, + // statementsTimeCol stays "" until detectFeatures confirms pg_stat_statements + // is present with total_exec_time (PG13+/extension 1.8+). + appNames: make(map[int64]map[string]time.Time), + prevStmts: make(map[string]stmtPrev), + histBuckets: newHistBuckets(), + settingsCache: map[string]float64{}, } c.initDescriptors() return c, nil @@ -228,19 +247,27 @@ func (c *Collector) detectFeatures(ctx context.Context) error { _ = c.pool.QueryRow(ctx, "SELECT current_database()").Scan(&c.database) - // pg_stat_statements column probes (extension views aren't in information_schema). - hasCol := func(col string) bool { - var ok int - return c.pool.QueryRow(ctx, - "SELECT 1 FROM pg_attribute a JOIN pg_class c ON c.oid = a.attrelid WHERE c.relname = 'pg_stat_statements' AND a.attname = $1", col).Scan(&ok) == nil - } - if !hasCol("total_exec_time") { - c.statementsTimeCol = "total_time" + // pg_stat_statements: probe the canonical column. relHasColumn returns + // false both when the relation is absent (switchToLargestDB landed in a + // DB without CREATE EXTENSION) and when it exists but is pre-1.8 (no + // total_exec_time — pre-PG13, unsupported). statementsTimeCol == "" + // thereafter signals "skip collectStatements" everywhere. + if c.relHasColumn(ctx, relPgStatStmts, "total_exec_time") { + c.statementsTimeCol = "total_exec_time" + c.hasWalBytes = c.relHasColumn(ctx, relPgStatStmts, "wal_bytes") + c.hasToplevel = c.relHasColumn(ctx, relPgStatStmts, "toplevel") + c.Print(ctx, "postgres: pg_stat_statements detected", "database", c.database, "wal_bytes", c.hasWalBytes, "toplevel", c.hasToplevel) + } else { + c.Print(ctx, "postgres: pg_stat_statements disabled", "database", c.database, "reason", "relation or total_exec_time column missing") } - c.hasWalBytes = hasCol("wal_bytes") - c.hasToplevel = hasCol("toplevel") - if c.hasToplevel { - c.Print(ctx, "postgres: pg_stat_statements toplevel filter enabled") + + // pg_stat_wal.wal_write_time / wal_sync_time removed in PG18 (moved to + // pg_stat_io). Probe instead of version-gating — survives backports. + if c.versionNum >= versionPG14 { + c.hasWalIOTime = c.relHasColumn(ctx, relPgStatWal, "wal_write_time") + if !c.hasWalIOTime { + c.Print(ctx, "postgres: pg_stat_wal write/sync timing columns absent — wal_io_time metric will not be emitted", "version", c.versionNum) + } } // archive_mode ('on' and 'always' both produce archiver stats). @@ -253,9 +280,7 @@ func (c *Collector) detectFeatures(ctx context.Context) error { } // pg_stat_activity.query_id (PG14+) — required for the app-name sampler. - _ = c.pool.QueryRow(ctx, - `SELECT EXISTS (SELECT 1 FROM pg_attribute a JOIN pg_class c ON c.oid = a.attrelid - WHERE c.relname = 'pg_stat_activity' AND a.attname = 'query_id')`).Scan(&c.hasActivityQID) + c.hasActivityQID = c.relHasColumn(ctx, relPgStatActivity, "query_id") if c.hasActivityQID { c.startAppNamesSampler(1 * time.Second) @@ -265,6 +290,18 @@ func (c *Collector) detectFeatures(ctx context.Context) error { return nil } +// relHasColumn reports whether the given relation has the given column. It +// uses to_regclass so a missing relation simply yields false instead of an +// SQL exception — callers can probe extensions/views that may or may not +// exist in the current database. +func (c *Collector) relHasColumn(ctx context.Context, rel, col string) bool { + var ok int + err := c.pool.QueryRow(ctx, + "SELECT 1 FROM pg_attribute WHERE attrelid = to_regclass($1) AND attname = $2", + rel, col).Scan(&ok) + return err == nil +} + // Name returns a human-readable collector name. func (c *Collector) Name() string { return "postgres" } diff --git a/internal/topsrv/postgres/collector_test.go b/internal/topsrv/postgres/collector_test.go index 56478a4..fe97541 100644 --- a/internal/topsrv/postgres/collector_test.go +++ b/internal/topsrv/postgres/collector_test.go @@ -126,3 +126,37 @@ func TestEnsureReadyRetryable(t *testing.T) { assert.False(t, pg.initDone, "initDone must stay false after failures so detectFeatures runs on recovery") } + +// TestPruneAppNamesEvictsStale is the regression test for the slow memory +// leak that grew RSS by ~70MB/day on busy Postgres hosts: a never-cleared +// map[queryid][application_name] accumulated forever as processes restarted +// with new pid-suffixed app_names. pruneAppNames must drop entries older +// than appNamesTTL and clean up newly-empty queryid sub-maps. +func TestPruneAppNamesEvictsStale(t *testing.T) { + c := &Collector{appNames: make(map[int64]map[string]time.Time)} + now := time.Now() + + c.appNames[1] = map[string]time.Time{ + "fresh-app": now.Add(-time.Minute), + "stale-app": now.Add(-2 * appNamesTTL), + } + c.appNames[2] = map[string]time.Time{ + "only-stale": now.Add(-2 * appNamesTTL), + } + c.appNames[3] = map[string]time.Time{ + "only-fresh": now, + } + + c.pruneAppNames(now) + + assert.Len(t, c.appNames[1], 1, "qid=1 keeps the fresh app_name only") + _, hasStale := c.appNames[1]["stale-app"] + assert.False(t, hasStale, "stale entry must be dropped") + _, hasFresh := c.appNames[1]["fresh-app"] + assert.True(t, hasFresh, "fresh entry must survive") + + _, qid2Present := c.appNames[2] + assert.False(t, qid2Present, "qid=2 had only stale entries — sub-map dropped entirely") + + assert.Len(t, c.appNames[3], 1, "qid=3 untouched") +} diff --git a/internal/topsrv/postgres/replication.go b/internal/topsrv/postgres/replication.go index 495cfb4..1e49115 100644 --- a/internal/topsrv/postgres/replication.go +++ b/internal/topsrv/postgres/replication.go @@ -69,27 +69,35 @@ func (c *Collector) collectWAL(ctx context.Context, ch chan<- prometheus.Metric) } } -// collectStatWAL emits pg_stat_wal metrics (PG14+). wal_bytes is intentionally skipped — -// it has different semantics than topsrv_pg_wal_bytes (LSN position from pg_current_wal_lsn). +// collectStatWAL emits pg_stat_wal metrics (PG14+). wal_bytes is intentionally +// skipped — it has different semantics than topsrv_pg_wal_bytes (LSN position +// from pg_current_wal_lsn). PG18 removed wal_write_time/wal_sync_time from +// pg_stat_wal (the timings moved to pg_stat_io); on those versions the +// wal_io_time metric is skipped instead of breaking the whole query. func (c *Collector) collectStatWAL(ctx context.Context, ch chan<- prometheus.Metric) { if c.versionNum < versionPG14 { return } var records, fpi, buffersFull int64 var writeTime, syncTime float64 - err := c.pool.QueryRow(ctx, `SELECT wal_records, wal_fpi, wal_buffers_full, - wal_write_time, wal_sync_time - FROM pg_stat_wal`). - Scan(&records, &fpi, &buffersFull, &writeTime, &syncTime) - if err != nil { + q := `SELECT wal_records, wal_fpi, wal_buffers_full` + scanArgs := []any{&records, &fpi, &buffersFull} + if c.hasWalIOTime { + q += `, wal_write_time, wal_sync_time` + scanArgs = append(scanArgs, &writeTime, &syncTime) + } + q += ` FROM pg_stat_wal` + if err := c.pool.QueryRow(ctx, q).Scan(scanArgs...); err != nil { c.queryWarn("stat_wal", err) return } ch <- prometheus.MustNewConstMetric(c.statWalRecords, prometheus.CounterValue, float64(records)) ch <- prometheus.MustNewConstMetric(c.statWalFpi, prometheus.CounterValue, float64(fpi)) ch <- prometheus.MustNewConstMetric(c.statWalBuffersFull, prometheus.CounterValue, float64(buffersFull)) - ch <- prometheus.MustNewConstMetric(c.statWalIoTime, prometheus.CounterValue, writeTime*msToSec, "write") - ch <- prometheus.MustNewConstMetric(c.statWalIoTime, prometheus.CounterValue, syncTime*msToSec, "sync") + if c.hasWalIOTime { + ch <- prometheus.MustNewConstMetric(c.statWalIoTime, prometheus.CounterValue, writeTime*msToSec, "write") + ch <- prometheus.MustNewConstMetric(c.statWalIoTime, prometheus.CounterValue, syncTime*msToSec, "sync") + } } // collectArchiver emits pg_stat_archiver metrics. NULL last_*_time => metric is not emitted diff --git a/internal/topsrv/postgres/statements.go b/internal/topsrv/postgres/statements.go index 901ac80..e9b7198 100644 --- a/internal/topsrv/postgres/statements.go +++ b/internal/topsrv/postgres/statements.go @@ -71,10 +71,10 @@ func newHistBuckets() map[float64]uint64 { func (c *Collector) collectStatements(ctx context.Context, ch chan<- prometheus.Metric) { // appName mapping is maintained by a background ticker (startAppNamesSampler). - // pg_stat_statements 1.8+ (PG13+) renamed total_time → total_exec_time - timeCol := "total_exec_time" - if c.statementsTimeCol != "" { - timeCol = c.statementsTimeCol + // Extension absent in the current database (or pre-1.8) — emit nothing + // rather than fire a query that will 42P01/42703 every scrape. + if c.statementsTimeCol == "" { + return } // PG17 (pg_stat_statements 1.11) renamed blk_read_time → shared_blk_read_time, blk_write_time → shared_blk_write_time @@ -83,16 +83,8 @@ func (c *Collector) collectStatements(ctx context.Context, ch chan<- prometheus. blkReadCol, blkWriteCol = "shared_blk_read_time", "shared_blk_write_time" } - // pg_stat_statements 1.8+ (PG13+) renamed min_time → min_exec_time (same as total_time → total_exec_time). - minCol, maxCol := "min_exec_time", "max_exec_time" - if timeCol == "total_time" { - minCol, maxCol = "min_time", "max_time" - } - r := strings.NewReplacer( - "{time_col}", timeCol, - "{min_col}", minCol, - "{max_col}", maxCol, + "{time_col}", c.statementsTimeCol, "{blk_read_col}", blkReadCol, "{blk_write_col}", blkWriteCol, ) @@ -118,7 +110,7 @@ func (c *Collector) collectStatements(ctx context.Context, ch chan<- prometheus. sum(s.shared_blks_hit), sum(s.shared_blks_read), sum(s.shared_blks_dirtied), sum(s.{blk_read_col}), sum(s.{blk_write_col}), sum(s.temp_blks_read), sum(s.temp_blks_written), - min(s.{min_col}), max(s.{max_col})` + walBytesCol + ` + min(s.min_exec_time), max(s.max_exec_time)` + walBytesCol + ` FROM pg_stat_statements s JOIN pg_database d ON d.oid = s.dbid WHERE s.userid != 0` + toplevelFilter + ` @@ -144,7 +136,7 @@ func (c *Collector) collectStatements(ctx context.Context, ch chan<- prometheus. } topMeta := c.selectTopStatements(all, topMetaN) - c.collectQueryMeta(ctx, topMeta, timeCol) + c.collectQueryMeta(ctx, topMeta) } // scanStatements reads all pg_stat_statements rows, computes deltas against previous snapshot, @@ -244,7 +236,7 @@ func (c *Collector) addToBuckets(sec float64) { } // collectQueryMeta fetches full query texts for top queries and stores them for push to gatesrv. -func (c *Collector) collectQueryMeta(ctx context.Context, topSet map[string]bool, timeCol string) { +func (c *Collector) collectQueryMeta(ctx context.Context, topSet map[string]bool) { if len(topSet) == 0 { return } @@ -254,13 +246,12 @@ func (c *Collector) collectQueryMeta(ctx context.Context, topSet map[string]bool toplevelFilter = " AND s.toplevel" } - q := strings.NewReplacer("{time_col}", timeCol).Replace( - `SELECT s.dbid::text || ':' || s.queryid::text, + q := `SELECT s.dbid::text || ':' || s.queryid::text, s.queryid::text, d.datname, s.query FROM pg_stat_statements s JOIN pg_database d ON d.oid = s.dbid WHERE s.userid != 0` + toplevelFilter + ` - ORDER BY s.{time_col} DESC`) + ORDER BY s.` + c.statementsTimeCol + ` DESC` rows, err := c.pool.Query(ctx, q) if err != nil { return @@ -306,6 +297,7 @@ func (c *Collector) sampleAppNames(ctx context.Context) { } defer rows.Close() + now := time.Now() c.appNamesMu.Lock() defer c.appNamesMu.Unlock() for rows.Next() { @@ -315,30 +307,55 @@ func (c *Collector) sampleAppNames(ctx context.Context) { continue } if c.appNames[qid] == nil { - c.appNames[qid] = make(map[string]bool) + c.appNames[qid] = make(map[string]time.Time) + } + c.appNames[qid][app] = now + } +} + +// pruneAppNames drops (queryid, application_name) pairs older than +// appNamesTTL. Runs on its own ticker (appNamesSweepPeriod) so a quiet +// queryid can't pin app_names in memory forever. +func (c *Collector) pruneAppNames(now time.Time) { + cutoff := now.Add(-appNamesTTL) + c.appNamesMu.Lock() + defer c.appNamesMu.Unlock() + for qid, apps := range c.appNames { + for app, seen := range apps { + if seen.Before(cutoff) { + delete(apps, app) + } + } + if len(apps) == 0 { + delete(c.appNames, qid) } - c.appNames[qid][app] = true } } // startAppNamesSampler runs sampleAppNames on an independent ticker so short-lived queries -// get captured between Prometheus scrapes. Cancelled via Close(). +// get captured between Prometheus scrapes, and a slower ticker prunes +// (queryid, application_name) pairs whose lastSeen is older than appNamesTTL. +// Cancelled via Close(). func (c *Collector) startAppNamesSampler(interval time.Duration) { var ctx context.Context ctx, c.appSampleCancel = context.WithCancel(context.Background()) c.appSampleDone = make(chan struct{}) go func() { defer close(c.appSampleDone) - t := time.NewTicker(interval) - defer t.Stop() + sampleT := time.NewTicker(interval) + defer sampleT.Stop() + pruneT := time.NewTicker(appNamesSweepPeriod) + defer pruneT.Stop() for { select { case <-ctx.Done(): return - case <-t.C: + case <-sampleT.C: sampleCtx, cancel := context.WithTimeout(ctx, 2*time.Second) c.sampleAppNames(sampleCtx) cancel() + case <-pruneT.C: + c.pruneAppNames(time.Now()) } } }() diff --git a/internal/topsrv/updater.go b/internal/topsrv/updater.go index f89e8c7..0c4390d 100644 --- a/internal/topsrv/updater.go +++ b/internal/topsrv/updater.go @@ -32,6 +32,19 @@ const ( updateMaxBackups = 5 updateExitCode = 42 updateStateFile = "update-state.json" + backupPrefix = "topsrv-" + + // updateCrashWindow bounds how long after an update we treat restarts + // as potentially crash-loop-relevant. After it, the counter is reset. + updateCrashWindow = 5 * time.Minute + + // updateStableThreshold is how long the new binary must run before we + // declare it stable and zero RestartCount. Picked so a fast-crashing + // binary cannot beat the threshold even with k8s/systemd restart-backoff. + updateStableThreshold = 60 * time.Second + + // updateMaxRestarts is the crash count that triggers a rollback. + updateMaxRestarts = 3 ) // UpdateConfig contains auto-update settings. @@ -56,6 +69,10 @@ type updateState struct { PreviousVersion string `json:"previous_version"` CurrentVersion string `json:"current_version"` RestartCount int `json:"restart_count"` + // Graceful is set on ctx.Done so the next checkRollback skips the + // crash increment. Cleared on every checkRollback so a subsequent + // real crash still counts. + Graceful bool `json:"graceful,omitempty"` } // Updater checks for new agent versions and performs self-update. @@ -131,8 +148,18 @@ func deriveUpdateEndpoint(pushEndpoint string) string { } // Run starts the update check loop. Blocks until ctx is cancelled. +// +// Crash-loop bookkeeping has two complementary signals: +// - markStable, called after the binary has been alive for +// updateStableThreshold, zeroes RestartCount so a healthy process +// accumulates nothing. +// - markGraceful, called on ctx.Done, lets the next start skip the +// RestartCount bump — manual / supervised restarts are not crashes. func (u *Updater) Run(ctx context.Context) { u.checkRollback() + defer u.markGracefulIfCancelled(ctx) + + go u.markStableAfter(ctx, updateStableThreshold) jitter := time.Duration(rand.IntN(60)) * time.Second u.Printf("update: started, endpoint=%s, interval=%s, jitter=%s", u.endpoint, u.interval, jitter) @@ -159,6 +186,50 @@ func (u *Updater) Run(ctx context.Context) { } } +// markStableAfter zeroes RestartCount once the new binary survives the +// threshold. Cancelled if ctx fires first — a fast graceful shutdown should +// not be treated as proof of stability. +func (u *Updater) markStableAfter(ctx context.Context, after time.Duration) { + select { + case <-time.After(after): + case <-ctx.Done(): + return + } + state, err := u.loadState() + if err != nil { + return + } + if state.RestartCount == 0 { + return + } + state.RestartCount = 0 + u.saveState(state) + u.Printf("update: stable after %s, restart counter reset", after) +} + +// markGracefulIfCancelled writes the Graceful flag only when ctx has +// fired — i.e. SIGTERM / systemctl stop / k8s drain. A panic-driven exit +// leaves ctx un-cancelled and must not mask itself as a clean shutdown. +func (u *Updater) markGracefulIfCancelled(ctx context.Context) { + if ctx.Err() == nil { + return + } + u.markGraceful() +} + +// markGraceful records that this process is exiting via ctx.Done — the +// next start of checkRollback skips its increment. Best-effort: loadState +// errors mean there is no post-update state to mark. +func (u *Updater) markGraceful() { + state, err := u.loadState() + if err != nil || state.Graceful { + return + } + state.Graceful = true + u.saveState(state) + u.Printf("update: graceful shutdown marked") +} + func (u *Updater) check(ctx context.Context) { resp, err := u.fetchUpdate(ctx) if err != nil { @@ -373,7 +444,7 @@ func (u *Updater) backup() error { return err } - dest := filepath.Join(u.backupDir, "topsrv-"+u.version) + dest := filepath.Join(u.backupDir, backupPrefix+u.version) src, err := os.Open(u.binPath) if err != nil { return err @@ -443,7 +514,7 @@ func compareVersionedNames(a, b string) int { // extractVersion extracts version string from backup filename like "topsrv-0.0.9" or "topsrv-v0.0.9". func extractVersion(name string) string { - _, after, ok := strings.Cut(name, "topsrv-") + _, after, ok := strings.Cut(name, backupPrefix) if !ok { return "" } @@ -477,43 +548,69 @@ func (u *Updater) replace(newBinPath string) error { return os.Rename(tmpPath, u.binPath) } -// checkRollback detects crash-loop after update and rolls back. +// checkRollback rolls back to PreviousVersion on a crash-loop. The Graceful +// flag and markStable keep supervised / healthy restarts from counting. func (u *Updater) checkRollback() { state, err := u.loadState() if err != nil { return } - if time.Since(state.LastUpdate) < 5*time.Minute { - state.RestartCount++ - u.saveState(state) - - if state.RestartCount >= 3 { - u.Printf("update: crash-loop detected (%d restarts in %s), rolling back to %s", - state.RestartCount, time.Since(state.LastUpdate).Round(time.Second), state.PreviousVersion) - u.rollback(state.PreviousVersion) + // Outside the post-update window — nothing to track; tidy the counter + // if a previous run left one behind. + if time.Since(state.LastUpdate) >= updateCrashWindow { + if state.RestartCount != 0 || state.Graceful { + state.RestartCount = 0 + state.Graceful = false + u.saveState(state) } - } else if state.RestartCount > 0 { - // Stable — reset counter. - state.RestartCount = 0 + return + } + + // Previous shutdown was graceful — clear the flag and skip the bump. + // Always clear so the next start, if it crashes, is counted normally. + if state.Graceful { + state.Graceful = false u.saveState(state) + return + } + + state.RestartCount++ + u.saveState(state) + + if state.RestartCount >= updateMaxRestarts { + u.Printf("update: crash-loop detected (%d restarts in %s), rolling back to %s", + state.RestartCount, time.Since(state.LastUpdate).Round(time.Second), state.PreviousVersion) + u.rollback(state.PreviousVersion) } } func (u *Updater) rollback(version string) { - backupPath := filepath.Join(u.backupDir, "topsrv-"+version) - if _, err := os.Stat(backupPath); err != nil { - u.Errorf("update: rollback failed, backup not found: %s", backupPath) + if err := u.attemptRollback(version); err != nil { + u.Errorf("update: rollback failed: %v", err) return } + u.Printf("update: rolled back to %s, restarting", version) + os.Exit(updateExitCode) +} +// attemptRollback does the rollback work without os.Exit so the success +// path is unit-testable. On success the post-update window is closed so +// the supervisor's restart after our os.Exit cannot trip checkRollback +// into another rollback of the same binary — version history is left in +// place for post-mortem. +func (u *Updater) attemptRollback(version string) error { + backupPath := filepath.Join(u.backupDir, backupPrefix+version) if err := u.replace(backupPath); err != nil { - u.Errorf("update: rollback replace failed: %v", err) - return + return fmt.Errorf("replace %s: %w", backupPath, err) } - u.Printf("update: rolled back to %s, restarting", version) - os.Exit(updateExitCode) + state, _ := u.loadState() + state.LastUpdate = time.Time{} + state.RestartCount = 0 + state.Graceful = false + u.saveState(state) + return nil } func (u *Updater) loadState() (updateState, error) { diff --git a/internal/topsrv/updater_test.go b/internal/topsrv/updater_test.go index 4042fa1..b29e69b 100644 --- a/internal/topsrv/updater_test.go +++ b/internal/topsrv/updater_test.go @@ -3,6 +3,7 @@ package topsrv import ( "archive/tar" "compress/gzip" + "context" "crypto/sha256" "encoding/hex" "encoding/json" @@ -187,6 +188,217 @@ func TestStateLoadMissing(t *testing.T) { assert.Error(t, err) } +// TestCheckRollback_GracefulSkipsIncrement is the regression test for the +// "ручные рестарты считаются как crash" bug. SIGTERM-driven shutdowns mark +// the state graceful so the next start does not bump the counter. +func TestCheckRollback_GracefulSkipsIncrement(t *testing.T) { + u := &Updater{stateDir: t.TempDir()} + u.saveState(updateState{ + LastUpdate: time.Now(), + RestartCount: 2, + Graceful: true, + }) + + u.checkRollback() + + got, err := u.loadState() + require.NoError(t, err) + assert.Equal(t, 2, got.RestartCount, "graceful exit must not bump RestartCount") + assert.False(t, got.Graceful, "flag is consumed so a later real crash counts") +} + +func TestCheckRollback_CrashWithinWindowIncrements(t *testing.T) { + u := &Updater{stateDir: t.TempDir()} + u.saveState(updateState{ + LastUpdate: time.Now(), + RestartCount: 1, + }) + + u.checkRollback() + + got, err := u.loadState() + require.NoError(t, err) + assert.Equal(t, 2, got.RestartCount) +} + +func TestCheckRollback_OutsideWindowResets(t *testing.T) { + u := &Updater{stateDir: t.TempDir()} + u.saveState(updateState{ + LastUpdate: time.Now().Add(-2 * updateCrashWindow), + PreviousVersion: "1.0.0", + RestartCount: 2, + Graceful: true, + }) + + u.checkRollback() + + got, err := u.loadState() + require.NoError(t, err) + assert.Equal(t, 0, got.RestartCount, "stale counter is cleared") + assert.False(t, got.Graceful, "stale graceful flag is cleared") + assert.Equal(t, "1.0.0", got.PreviousVersion, "other fields are preserved") +} + +func TestCheckRollback_MissingStateNoop(t *testing.T) { + u := &Updater{stateDir: t.TempDir()} + assert.NotPanics(t, u.checkRollback, "missing state file must be tolerated") + _, err := u.loadState() + assert.Error(t, err, "checkRollback must not create state when there isn't one") +} + +// TestCheckRollback_ThresholdAttemptsRollback drives the counter to the +// threshold and verifies rollback is attempted. Backup is intentionally +// absent so rollback exits via Errorf before reaching os.Exit — the +// observable effect is that the counter reached updateMaxRestarts. +func TestCheckRollback_ThresholdAttemptsRollback(t *testing.T) { + tmp := t.TempDir() + u := &Updater{ + stateDir: tmp, + backupDir: filepath.Join(tmp, "missing-backups"), + } + u.saveState(updateState{ + LastUpdate: time.Now(), + RestartCount: updateMaxRestarts - 1, + PreviousVersion: "1.0.0", + }) + + u.checkRollback() + + got, err := u.loadState() + require.NoError(t, err) + assert.GreaterOrEqual(t, got.RestartCount, updateMaxRestarts, + "counter must reach the threshold so the rollback branch is taken") +} + +func TestMarkGracefulSetsFlag(t *testing.T) { + u := &Updater{stateDir: t.TempDir()} + u.saveState(updateState{LastUpdate: time.Now(), RestartCount: 1}) + + u.markGraceful() + + got, err := u.loadState() + require.NoError(t, err) + assert.True(t, got.Graceful) + assert.Equal(t, 1, got.RestartCount, "markGraceful must not touch the counter") +} + +// TestMarkGracefulIfCancelled_PanicLeavesFlagFalse is the panic-safety +// regression: Run's defer must distinguish "ctx cancelled (real graceful +// shutdown)" from "panic unwinding (real crash)". Without the ctx.Err() +// guard, a panic in Run would write Graceful=true and mask itself from +// crash-loop detection on the next start. +func TestMarkGracefulIfCancelled_PanicLeavesFlagFalse(t *testing.T) { + u := &Updater{stateDir: t.TempDir()} + u.saveState(updateState{LastUpdate: time.Now()}) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + // Live ctx — simulates panic-unwinding path where defer fires but ctx + // was never cancelled. Must NOT mark graceful. + u.markGracefulIfCancelled(ctx) + got, err := u.loadState() + require.NoError(t, err) + assert.False(t, got.Graceful, "live ctx must not mark graceful (panic case)") + + // Now cancel and call again — must mark graceful. + cancel() + u.markGracefulIfCancelled(ctx) + got, err = u.loadState() + require.NoError(t, err) + assert.True(t, got.Graceful, "cancelled ctx marks graceful") +} + +func TestAttemptRollback_MissingBackup(t *testing.T) { + tmp := t.TempDir() + u := &Updater{ + stateDir: tmp, + backupDir: filepath.Join(tmp, "missing"), + } + u.saveState(updateState{LastUpdate: time.Now(), RestartCount: updateMaxRestarts}) + + err := u.attemptRollback("1.0.0") + require.ErrorIs(t, err, os.ErrNotExist, "missing backup must surface as not-exist for callers") + + got, err := u.loadState() + require.NoError(t, err) + assert.Equal(t, updateMaxRestarts, got.RestartCount, + "failed rollback must not touch state — operator needs the original counters for diagnosis") +} + +func TestAttemptRollback_SuccessClosesWindow(t *testing.T) { + tmp := t.TempDir() + binPath := filepath.Join(tmp, "topsrv") + require.NoError(t, os.WriteFile(binPath, []byte("current"), 0o755)) + backupDir := filepath.Join(tmp, "backups") + require.NoError(t, os.MkdirAll(backupDir, 0o755)) + require.NoError(t, os.WriteFile(filepath.Join(backupDir, "topsrv-1.0.0"), []byte("previous"), 0o755)) + + u := &Updater{ + stateDir: tmp, + backupDir: backupDir, + binPath: binPath, + } + u.saveState(updateState{ + LastUpdate: time.Now(), + PreviousVersion: "1.0.0", + CurrentVersion: "1.1.0", + RestartCount: updateMaxRestarts, + Graceful: true, + }) + + require.NoError(t, u.attemptRollback("1.0.0")) + + // Binary replaced. + data, err := os.ReadFile(binPath) + require.NoError(t, err) + assert.Equal(t, "previous", string(data)) + + // State cleared enough to keep checkRollback out of the window on the + // next supervisor restart, but version history is preserved for + // post-mortem. + got, err := u.loadState() + require.NoError(t, err) + assert.True(t, got.LastUpdate.IsZero(), "window closed") + assert.Equal(t, 0, got.RestartCount) + assert.False(t, got.Graceful) + assert.Equal(t, "1.0.0", got.PreviousVersion, "version history preserved") + assert.Equal(t, "1.1.0", got.CurrentVersion) +} + +func TestMarkStableAfterResetsCounter(t *testing.T) { + u := &Updater{stateDir: t.TempDir()} + u.saveState(updateState{LastUpdate: time.Now(), RestartCount: 2}) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + done := make(chan struct{}) + go func() { + u.markStableAfter(ctx, 10*time.Millisecond) + close(done) + }() + <-done + + got, err := u.loadState() + require.NoError(t, err) + assert.Equal(t, 0, got.RestartCount, "uptime threshold resets counter") +} + +func TestMarkStableAfterCancelled(t *testing.T) { + u := &Updater{stateDir: t.TempDir()} + u.saveState(updateState{LastUpdate: time.Now(), RestartCount: 2}) + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + u.markStableAfter(ctx, time.Hour) // returns immediately via ctx + + got, err := u.loadState() + require.NoError(t, err) + assert.Equal(t, 2, got.RestartCount, "cancelled threshold must not reset") +} + func TestCompareVersionedNames(t *testing.T) { tests := []struct { a, b string