diff --git a/README.md b/README.md index 9fdc9e5..efcea34 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ That's it. System, disk, network, netstat, process, and S.M.A.R.T. metrics are c | **S.M.A.R.T.** | Disk health (ATA attributes, NVMe health log, temperature, wear, errors) | smartctl_exporter | | **SSL Certificates** | Certificate expiry monitoring (auto-discovered from nginx/angie config) | — | | **Bot-logs** *(opt-in)* | Ships UA-classified bot events from nginx access logs to topsrv.io as gzipped ndjson with disk-backed WAL. 38 families: global/RU/Asian search, AI 2026 crawlers, SEO tools, social link previews, archive | — | +| **Packages** | Installed-package inventory: dpkg/rpm/apk parsed pure-Go (no shell-out). Aggregates on `/metrics`; full snapshot (NEVRA, vendor, GPG key, signature digest, modularityLabel, autoInstalled, repoOrigin, licenses) pushed to `/v1/inventory` for CVE matching | apt-prom-exporter / pkg-exporter | ## Auto-discovery @@ -143,6 +144,16 @@ Channel = "stable" # stable / beta # Disabled = true # set to disable # Interval = "5m" +# Package inventory (optional — auto-discovery detects dpkg/rpm/apk via files). +# On /metrics only aggregates are exposed (counts, scan duration, error counter). +# Full per-package snapshot is pushed to /v1/inventory with kind="packages". +# [Packages] +# Disabled = false # set to fully skip the collector +# Interval = "6h" # snapshot scan period (±10% jitter to avoid herds) +# Managers = [] # auto-detect by default; e.g. ["dpkg","rpm"] to force a subset +# DisablePush = false # set to skip POSTing snapshots to /v1/inventory (keep /metrics only) +# MaxPackages = 10000 # safety cap; logs a warning and truncates if exceeded + # Bot-logs (optional — ships UA-classified bot events to topsrv.io for analytics) # [BotLogs] # Enabled = true @@ -184,6 +195,11 @@ Channel = "stable" # stable / beta | `Angie.AccessLogs` | `[]` | Paths to access log files | | `Smart.Disabled` | `false` | Disable S.M.A.R.T. collector | | `Smart.Interval` | `5m` | S.M.A.R.T. polling interval | +| `Packages.Disabled` | `false` | Disable package inventory collector | +| `Packages.Interval` | `6h` | Inventory scan period (±10% jitter); pushed to `/v1/inventory` | +| `Packages.Managers` | `[]` | Force subset of managers (`dpkg`/`rpm`/`apk`); empty = auto-detect | +| `Packages.DisablePush` | `false` | Skip POSTing snapshots to `/v1/inventory` (keeps `/metrics` aggregates) | +| `Packages.MaxPackages` | `10000` | Truncate snapshot if exceeded (logs a warning) | | `BotLogs.Enabled` | `false` | Ship UA-classified bot events to topsrv.io | | `BotLogs.Token` | — | Bot-logs ingest bearer token (separate from `Push.Token`) | | `BotLogs.Endpoint` | derived | Ingest URL; defaults to `[Push].Endpoint` with `/v1/bot-logs` path | diff --git a/cfg/local.toml.dist b/cfg/local.toml.dist index e3f2434..594c74d 100644 --- a/cfg/local.toml.dist +++ b/cfg/local.toml.dist @@ -32,6 +32,17 @@ Channel = "stable" # stable / beta # ExtraLabels = ["server_name"] # AccessLogs = ["/var/log/angie/access.log"] +# Package inventory (optional — auto-discovery detects dpkg/rpm/apk via files). +# Sends a periodic snapshot to /v1/inventory with kind="packages". +# On /metrics only aggregates are exposed (counts, upgradable, scan duration). +# [Packages] +# Disabled = false # set true to fully skip the collector +# Interval = "6h" # snapshot scan period (±10% jitter to avoid herds) +# Managers = [] # auto-detect by default; e.g. ["dpkg","rpm"] to force a subset +# DisablePush = false # set true to skip POSTing snapshots to /v1/inventory (kept on /metrics aggregates) +# MaxPackages = 10000 # safety cap; logs a warning and truncates if exceeded +# CheckUpgrades = false # (Phase 4, not wired yet) — will populate topsrv_packages_upgradable from local apt/dnf cache + # Bot-logs ingest to topsrv.io (optional — ships bot traffic events for analytics). # When enabled, botlog reads four nginx variables (http_user_agent, server_name, # remote_addr, http_referer) from each access-log line. These are NOT added to diff --git a/docs/metrics.md b/docs/metrics.md index 46dda6f..268d339 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -313,6 +313,20 @@ Alerting guidance: - `rate(topsrv_botlog_events_total{state="dropped", reason="spool_evict"}[5m]) > 0` — WAL budget exceeded; raise `MaxSpoolMB` or investigate why receiver isn't catching up - `rate(topsrv_botlog_events_total{state="sent"}[5m]) == 0` while `match_total` grows — pipeline stuck between observer and ingest +## Packages (`topsrv_packages_*`) + +Installed-package inventory by manager (dpkg / rpm / apk). The full per-package snapshot is NOT exposed here — it ships to gatesrv via `/v1/inventory` (see `docs/packages-collector-implementation.md`). On `/metrics` only low-cardinality aggregates are kept (manager × small set of gauges ⇒ 5–8 series per host). + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `topsrv_packages_installed` | gauge | manager | Number of installed packages by manager | +| `topsrv_packages_held` | gauge | manager | Packages held back (dpkg hold / dnf versionlock) | +| `topsrv_packages_scan_duration_seconds` | gauge | manager | Duration of the last inventory scan. Alert: `> 5s` on hosts with <5000 packages = degraded disk or BDB mid-write | +| `topsrv_packages_scan_errors_total` | counter | manager | Cumulative scan failures by manager. Alert: `increase(... [1h]) > 0` = parser/permission/IO error — check logs | +| `topsrv_packages_last_scan_timestamp_seconds` | gauge | manager | Unix ts of last successful scan. Alert: `time() - X > 12h` (Interval=6h default + 2× slack) = collector stuck | +| `topsrv_packages_last_push_timestamp_seconds` | gauge | kind | Unix ts of last successful inventory push to `/v1/inventory`, by kind (`packages`, future `repos`/`packageHistory`). Alert: `time() - X > 24h` = gatesrv unreachable | +| `topsrv_packages_manager_info` | gauge | manager, os_id, os_version | Always 1 — labels carry distro context for joining with other metrics | + ## Self-monitoring (`topsrv_collector_*`) Per-collector instrumentation. Any collector registered via `addCollector` is wrapped to record its last scrape duration and recover from panics without breaking `/metrics`. diff --git a/go.mod b/go.mod index 591bc1f..8e44d3c 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.6 require ( github.com/BurntSushi/toml v1.6.0 github.com/anatol/smart.go v0.0.0-20260213010714-b8c428e1f7f2 + github.com/anchore/go-rpmdb v0.1.0 github.com/jackc/pgx/v5 v5.9.1 github.com/namsral/flag v1.7.4-pre github.com/nxadm/tail v1.4.11 @@ -16,17 +17,20 @@ require ( github.com/stretchr/testify v1.11.1 github.com/vmkteam/appkit v0.1.2 github.com/vmkteam/embedlog v0.1.3 + modernc.org/sqlite v1.50.1 ) require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/ebitengine/purego v0.10.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/getsentry/sentry-go v0.35.3 // indirect github.com/getsentry/sentry-go/echo v0.35.3 // indirect github.com/go-ole/go-ole v1.2.6 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect @@ -38,9 +42,11 @@ require ( github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/prometheus/procfs v0.17.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/smartystreets/goconvey v1.8.1 // indirect github.com/tklauser/go-sysconf v0.3.16 // indirect github.com/tklauser/numcpus v0.11.0 // indirect @@ -50,10 +56,14 @@ require ( go.yaml.in/yaml/v2 v2.4.3 // indirect golang.org/x/crypto v0.47.0 // indirect golang.org/x/net v0.48.0 // indirect - golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.41.0 // indirect + golang.org/x/sync v0.20.0 // indirect + golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.33.0 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/protobuf v1.36.9 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + modernc.org/libc v1.72.3 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect ) diff --git a/go.sum b/go.sum index 23d3aab..f2f6196 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/anatol/smart.go v0.0.0-20260213010714-b8c428e1f7f2 h1:J6/IHsKLFywW0BQ github.com/anatol/smart.go v0.0.0-20260213010714-b8c428e1f7f2/go.mod h1:d/4DjXysOOORPHYnOZnExlH4BTIEPGNwDt1727Wnpfg= github.com/anatol/vmtest v0.0.0-20250627153117-302402d269a6 h1:zdaWj/ncXyzpPH3YqACvJXMrJxkkILrnWbjHojHBctc= github.com/anatol/vmtest v0.0.0-20250627153117-302402d269a6/go.mod h1:m5pN88x7ZnEDGXZldwg7RCX+EikR9qz/iSI2GzXq++Y= +github.com/anchore/go-rpmdb v0.1.0 h1:Q8dc208/HYzCqhx0L1zurfm1UPil24hlo9NjkdFmLdE= +github.com/anchore/go-rpmdb v0.1.0/go.mod h1:eQVa6QFGzKy0qMcnW2pez0XBczvgwSjw9vA23qifEyU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -11,6 +13,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/ebitengine/purego v0.10.0 h1:QIw4xfpWT6GWTzaW5XEKy3HXoqrJGx1ijYHzTF0/ISU= github.com/ebitengine/purego v0.10.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= @@ -19,6 +23,8 @@ github.com/getsentry/sentry-go v0.35.3 h1:u5IJaEqZyPdWqe/hKlBKBBnMTSxB/HenCqF3QL github.com/getsentry/sentry-go v0.35.3/go.mod h1:mdL49ixwT2yi57k5eh7mpnDyPybixPzlzEJFu0Z76QA= github.com/getsentry/sentry-go/echo v0.35.3 h1:aJ0e4kGuH7T1ggAd3LOYwAyQV0bq37AX36vNPr6JYnM= github.com/getsentry/sentry-go/echo v0.35.3/go.mod h1:zQn5wNGqJUwIlA6z/pi7CFeXiUGrWkzue28C0Mfbz/Q= +github.com/glebarez/go-sqlite v1.20.3 h1:89BkqGOXR9oRmG58ZrzgoY/Fhy5x0M+/WV48U5zVrZ4= +github.com/glebarez/go-sqlite v1.20.3/go.mod h1:u3N6D/wftiAzIOJtZl6BmedqxmmkDfH3q+ihjqxC9u0= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= @@ -26,8 +32,14 @@ github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -64,6 +76,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/namsral/flag v1.7.4-pre h1:b2ScHhoCUkbsq0d2C15Mv+VU8bl8hAXV8arnWiOHNZs= github.com/namsral/flag v1.7.4-pre/go.mod h1:OXldTctbM6SWH1K899kPZcf65KxJiD7MsceFUpB5yDo= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= @@ -82,6 +96,8 @@ github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9Z github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0= github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/satyrius/gonx v1.4.0 h1:F3uxif5Yx6FBzdQAh79bHQK6CTJugOcN0w0Z8azQuQg= @@ -119,19 +135,25 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= +golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= -golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -142,3 +164,31 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.28.2 h1:3tQ0lf2ADtoby2EtSP+J7IE2SHwEJdP8ioR59wx7XpY= +modernc.org/cc/v4 v4.28.2/go.mod h1:OnovgIhbbMXMu1aISnJ0wvVD1KnW+cAUJkIrAWh+kVI= +modernc.org/ccgo/v4 v4.34.0 h1:yRLPFZieg532OT4rp4JFNIVcquwalMX26G95WQDqwCQ= +modernc.org/ccgo/v4 v4.34.0/go.mod h1:AS5WYMyBakQ+fhsHhtP8mWB82KTGPkNNJDGfGQCe0/A= +modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM= +modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo= +modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.72.3 h1:ZnDF4tXn4NBXFutMMQC4vtbTFSXhhKzR73fv0beZEAU= +modernc.org/libc v1.72.3/go.mod h1:dn0dZNnnn1clLyvRxLxYExxiKRZIRENOfqQ8XEeg4Qs= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.2.0 h1:tGyef5ApycA7FSEOMraay9SaTk5zmbx7Tu+cJs4QKZg= +modernc.org/opt v0.2.0/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.50.1 h1:l+cQvn0sd0zJJtfygGHuQJ5AjlrwXmWPw4KP3ZMwr9w= +modernc.org/sqlite v1.50.1/go.mod h1:tcNzv5p84E0skkmJn038y+hWJbLQXQqEnQfeh5r2JLM= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/internal/app/app.go b/internal/app/app.go index e43b2a1..fc352ed 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -17,6 +17,7 @@ import ( "github.com/vmkteam/topsrv/internal/topsrv/angie" "github.com/vmkteam/topsrv/internal/topsrv/botlog" "github.com/vmkteam/topsrv/internal/topsrv/nginx" + "github.com/vmkteam/topsrv/internal/topsrv/packages" "github.com/vmkteam/topsrv/internal/topsrv/postgres" "github.com/vmkteam/topsrv/internal/topsrv/smart" @@ -36,6 +37,7 @@ type Config struct { Angie *AngieConfig `toml:"Angie,omitempty"` Smart *smart.Config `toml:"Smart,omitempty"` BotLogs *botlog.Config `toml:"BotLogs,omitempty"` + Packages *packages.Config `toml:"Packages,omitempty"` } type ServerConfig struct { @@ -265,6 +267,9 @@ func (a *App) registerCollectors(ctx context.Context, services []topsrv.Service) // S.M.A.R.T. disk monitoring — always enabled, [Smart] overrides interval. a.registerSmart(ctx) + + // Package inventory — opt-out, [Packages].Disabled=true to skip. + a.registerPackages(ctx) } // discoverAccessLogs extracts access logs from a DiscoverResult and returns a LogConfig. @@ -619,6 +624,27 @@ func (a *App) registerSmart(ctx context.Context) { a.goBackground(func() { c.Run(ctx) }) } +func (a *App) registerPackages(ctx context.Context) { + cfg := a.cfg.Packages + if cfg != nil && cfg.Disabled { + return + } + if cfg == nil { + cfg = &packages.Config{} + } + if err := cfg.Validate(); err != nil { + a.Error(ctx, "packages: invalid config", "error", err) + return + } + c := packages.NewCollector(a.Logger, *cfg) + a.addCollector(c) + a.goBackground(func() { c.Run(ctx) }) + + if a.pusher != nil && !cfg.DisablePush { + a.pusher.AddInventoryProvider(c) + } +} + func (a *App) registerNginx(ctx context.Context, services []topsrv.Service) { ngxCfg := a.cfg.Nginx diff --git a/internal/topsrv/collector.go b/internal/topsrv/collector.go index c6ce87d..be96469 100644 --- a/internal/topsrv/collector.go +++ b/internal/topsrv/collector.go @@ -1,6 +1,7 @@ package topsrv import ( + "github.com/vmkteam/topsrv/internal/topsrv/packages" "github.com/vmkteam/topsrv/internal/topsrv/postgres" "github.com/prometheus/client_golang/prometheus" @@ -13,11 +14,31 @@ type Collector interface { Name() string } -// QueryMetaProvider returns query metadata for push to gatesrv. +// QueryMetaProvider returns query metadata for push to gatesrv /v1/meta. type QueryMetaProvider interface { QueryMeta() []postgres.QueryMeta } +// InventoryProvider returns inventory snapshots for push to gatesrv +// /v1/inventory. Each payload carries its own `kind` discriminator +// (packages, repos, packageHistory, ...). Returns nil/empty when there's +// nothing fresh to push — pusher won't send empty payloads. +// +// One-shot semantics: provider should drain its buffer after Inventory() +// is called so each snapshot is sent at most once. The optional +// InventoryAckReceiver mixin lets the provider learn which sends actually +// succeeded so it can re-queue on failure. +type InventoryProvider interface { + Inventory() []packages.Payload +} + +// InventoryAckReceiver is optionally implemented by an InventoryProvider to +// observe successful pushes. The pusher type-asserts and calls when available +// — providers that don't need ack tracking can skip it. +type InventoryAckReceiver interface { + OnInventoryPushed(kind string) +} + // Service is a discovered service on the host. type Service struct { Type string // "postgresql", "nginx", "angie", "redis", "php-fpm" diff --git a/internal/topsrv/packages/apk.go b/internal/topsrv/packages/apk.go new file mode 100644 index 0000000..0500919 --- /dev/null +++ b/internal/topsrv/packages/apk.go @@ -0,0 +1,167 @@ +//go:build linux + +package packages + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "strconv" + "strings" + + "github.com/vmkteam/embedlog" +) + +const ( + apkInstalledPath = "/lib/apk/db/installed" + apkWorldPath = "/etc/apk/world" +) + +// Apk is the apk-tools (Alpine) manager. +type Apk struct { + embedlog.Logger + root string +} + +func NewApk(logger embedlog.Logger, root string) *Apk { + return &Apk{Logger: logger, root: root} +} + +func (m *Apk) Name() string { return ManagerApk } + +// Scan reads /lib/apk/db/installed. apk-tools encodes each package as a +// paragraph with single-letter keys (P=name, V=version, A=arch, I=installed +// size bytes, o=origin, m=maintainer, L=license, t=build time, c=git commit, +// C=checksum, U=URL, T=description). Records are blank-line separated, no +// continuation lines. +func (m *Apk) Scan(_ context.Context) (Snapshot, error) { + f, err := os.Open(m.root + apkInstalledPath) + if err != nil { + return Snapshot{}, fmt.Errorf("open apk db: %w", err) + } + defer f.Close() + + records, err := m.parseDB(f) + if err != nil { + return Snapshot{}, fmt.Errorf("parse apk db: %w", err) + } + + pkgs := make([]Package, 0, len(records)) + for _, r := range records { + if r["P"] == "" { + continue + } + size, _ := strconv.ParseInt(r["I"], 10, 64) + installTime, _ := strconv.ParseInt(r["t"], 10, 64) + pkg := Package{ + Name: r["P"], + Version: r["V"], + Arch: r["A"], + Status: StatusGeneric, + SourceName: r["o"], // origin: apk's source-package equivalent + Vendor: r["m"], // maintainer; apk has no separate Vendor field + InstallTime: installTime, + InstalledSize: size, + Homepage: r["U"], + GitCommit: r["c"], + } + if r["L"] != "" { + pkg.Licenses = []string{r["L"]} + } + m.setChecksum(&pkg, r["C"]) + pkgs = append(pkgs, pkg) + } + + m.enrichAutoInstalled(pkgs) + + return Snapshot{Manager: ManagerApk, Packages: pkgs}, nil +} + +// setChecksum parses apk's "C:" field, which encodes a hash with an algorithm +// prefix: Q1 = SHA1, Q2 = SHA256. Strips the prefix for cleaner JSON and +// records the algorithm explicitly. +func (m *Apk) setChecksum(pkg *Package, c string) { + switch { + case c == "": + return + case strings.HasPrefix(c, "Q1"): + pkg.SigDigest = c[2:] + pkg.SigAlgorithm = "sha1" + case strings.HasPrefix(c, "Q2"): + pkg.SigDigest = c[2:] + pkg.SigAlgorithm = "sha256" + default: + pkg.SigDigest = c + } +} + +// enrichAutoInstalled flags packages NOT listed in /etc/apk/world as +// auto-installed (i.e. pulled in as a dependency). apk doesn't record install +// reason per-package; `world` is the canonical list of manually-requested +// packages. Anything installed but not in world arrived as a dependency. +// +// apk has NO per-package repoOrigin: the `installed` DB doesn't store which +// repository each package came from. Reconstructing this would require +// scanning every APKINDEX.tar.gz in /var/cache/apk/, which is brittle and +// slow — skipped. +func (m *Apk) enrichAutoInstalled(pkgs []Package) { + f, err := os.Open(m.root + apkWorldPath) + if err != nil { + return + } + defer f.Close() + + manual := make(map[string]bool, 64) + sc := bufio.NewScanner(f) + for sc.Scan() { + // world entries may carry version constraints (e.g. "nginx>1.20" or + // "package=1.2-r3") — strip them to leave just the name. + line := strings.TrimSpace(sc.Text()) + if line == "" || line[0] == '#' { + continue + } + name := line + for i, r := range name { + if r == '<' || r == '>' || r == '=' || r == '~' { + name = name[:i] + break + } + } + manual[strings.TrimSpace(name)] = true + } + + for i := range pkgs { + if !manual[pkgs[i].Name] { + pkgs[i].AutoInstalled = true + } + } +} + +func (m *Apk) parseDB(r io.Reader) ([]map[string]string, error) { + sc := bufio.NewScanner(r) + sc.Buffer(make([]byte, 64*1024), 64*1024) + + var records []map[string]string + cur := map[string]string{} + flush := func() { + if len(cur) > 0 { + records = append(records, cur) + cur = map[string]string{} + } + } + for sc.Scan() { + line := sc.Text() + if line == "" { + flush() + continue + } + if len(line) < 2 || line[1] != ':' { + continue + } + cur[line[:1]] = line[2:] + } + flush() + return records, sc.Err() +} diff --git a/internal/topsrv/packages/collector.go b/internal/topsrv/packages/collector.go new file mode 100644 index 0000000..972348f --- /dev/null +++ b/internal/topsrv/packages/collector.go @@ -0,0 +1,158 @@ +package packages + +import ( + "context" + "math/rand/v2" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/vmkteam/embedlog" +) + +// Collector reports installed-package counts and snapshot health on /metrics, +// and ships full inventory snapshots to /v1/inventory. +// +// Mirrors the smart/ pattern: a background goroutine populates `cache` on a +// long interval (default 6h), and scrape-time Collect() just drains it. The +// expensive filesystem walk never blocks /metrics. +type Collector struct { + embedlog.Logger + + cfg Config + + installed *prometheus.Desc + held *prometheus.Desc + scanDuration *prometheus.Desc + scanErrors *prometheus.Desc + lastScanTS *prometheus.Desc + lastPushTS *prometheus.Desc + managerInfo *prometheus.Desc + + mu sync.Mutex + cache []prometheus.Metric + scanned bool //nolint:unused // mutated in packages_linux.go (build-tagged) + + // pending holds payloads produced by the last scan, drained by Inventory(). + // One-shot: each scan overwrites the previous pending if the pusher hasn't + // drained — only the latest snapshot is ever sent. + pending []Payload + lastPushUnix map[string]int64 // kind → last successful push unix sec + scanErrCount map[string]int // manager → cumulative scan failure count +} + +// NewCollector wires descriptors. Caller must Validate() the config first. +func NewCollector(logger embedlog.Logger, cfg Config) *Collector { + return &Collector{ + Logger: logger, + cfg: cfg, + installed: prometheus.NewDesc( + "topsrv_packages_installed", + "Number of installed packages by manager.", + []string{"manager"}, nil), + held: prometheus.NewDesc( + "topsrv_packages_held", + "Packages held back (dpkg hold / dnf versionlock).", + []string{"manager"}, nil), + scanDuration: prometheus.NewDesc( + "topsrv_packages_scan_duration_seconds", + "Duration of the last inventory scan.", + []string{"manager"}, nil), + scanErrors: prometheus.NewDesc( + "topsrv_packages_scan_errors_total", + "Cumulative scan failures by manager.", + []string{"manager"}, nil), + lastScanTS: prometheus.NewDesc( + "topsrv_packages_last_scan_timestamp_seconds", + "Unix ts of last successful inventory scan.", + []string{"manager"}, nil), + lastPushTS: prometheus.NewDesc( + "topsrv_packages_last_push_timestamp_seconds", + "Unix ts of last successful inventory push to /v1/inventory, by kind.", + []string{"kind"}, nil), + managerInfo: prometheus.NewDesc( + "topsrv_packages_manager_info", + "Package manager and OS info.", + []string{"manager", "os_id", "os_version"}, nil), + } +} + +func (c *Collector) Name() string { return "packages" } + +func (c *Collector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.installed + ch <- c.held + ch <- c.scanDuration + ch <- c.scanErrors + ch <- c.lastScanTS + ch <- c.lastPushTS + ch <- c.managerInfo +} + +func (c *Collector) Collect(ch chan<- prometheus.Metric) { + c.mu.Lock() + defer c.mu.Unlock() + for _, m := range c.cache { + ch <- m + } + for kind, ts := range c.lastPushUnix { + ch <- prometheus.MustNewConstMetric(c.lastPushTS, prometheus.GaugeValue, float64(ts), kind) + } + for mgr, n := range c.scanErrCount { + ch <- prometheus.MustNewConstMetric(c.scanErrors, prometheus.CounterValue, float64(n), mgr) + } +} + +// Inventory drains the pending snapshot buffer. Called by Pusher each tick. +// One-shot: subsequent calls return nil until the next scan finishes. +func (c *Collector) Inventory() []Payload { + c.mu.Lock() + defer c.mu.Unlock() + out := c.pending + c.pending = nil + return out +} + +// OnInventoryPushed implements InventoryAckReceiver. Surfaced via +// topsrv_packages_last_push_timestamp_seconds. +func (c *Collector) OnInventoryPushed(kind string) { + c.mu.Lock() + if c.lastPushUnix == nil { + c.lastPushUnix = make(map[string]int64, 4) + } + c.lastPushUnix[kind] = time.Now().Unix() + c.mu.Unlock() +} + +// Run starts the background scan loop. Blocks until ctx is cancelled. +// Interval gets ±10% jitter per iteration so co-deployed agents don't +// stampede gatesrv at the same wall-clock instant. +func (c *Collector) Run(ctx context.Context) { + interval := c.cfg.ParsedInterval() + c.Print(ctx, "packages: started", "interval", interval) + + c.scan(ctx) + + for { + timer := time.NewTimer(interval + jitter(interval)) + select { + case <-ctx.Done(): + timer.Stop() + c.Print(ctx, "packages: stopped") + return + case <-timer.C: + c.scan(ctx) + } + } +} + +// jitter returns a uniformly-distributed perturbation in ±10% of base. The +// math/rand/v2 package is auto-seeded per-process from the OS, so two agents +// started together drift apart naturally — no hostname-seeding needed. +func jitter(base time.Duration) time.Duration { + span := int64(base) / 10 + if span <= 0 { + return 0 + } + return time.Duration(rand.Int64N(2*span) - span) +} diff --git a/internal/topsrv/packages/config.go b/internal/topsrv/packages/config.go new file mode 100644 index 0000000..27e24cd --- /dev/null +++ b/internal/topsrv/packages/config.go @@ -0,0 +1,64 @@ +package packages + +import ( + "fmt" + "time" +) + +const ( + defaultInterval = 6 * time.Hour + defaultMaxPackages = 10000 +) + +// Config configures the package inventory collector. Loaded from the +// [Packages] TOML section. Disable-style flags (Disabled, DisablePush) use +// zero=false so absence of the TOML section means "enabled with defaults" — +// same pattern as [Postgres].Disabled / [Smart].Disabled. +type Config struct { + Disabled bool // skip the collector entirely + Interval string // snapshot scan period; default "6h" + Managers []string // empty = auto-detect (dpkg|rpm|apk by marker files) + CheckUpgrades bool // populate topsrv_packages_upgradable from local apt/dnf cache (Phase 4) + DisablePush bool // when true, skip POSTing snapshots to /v1/inventory + MaxPackages int // safety cap; logs warning and truncates if exceeded + + parsedInterval time.Duration +} + +// ParsedInterval returns the configured Interval as a time.Duration, falling +// back to the default when unset or malformed. +func (c *Config) ParsedInterval() time.Duration { + if c == nil || c.parsedInterval == 0 { + return defaultInterval + } + return c.parsedInterval +} + +// MaxPackagesOrDefault returns the configured cap or the default. +func (c *Config) MaxPackagesOrDefault() int { + if c == nil || c.MaxPackages <= 0 { + return defaultMaxPackages + } + return c.MaxPackages +} + +// Validate parses Interval and applies defaults. Returns an error only for +// malformed values that the operator likely meant to set. +func (c *Config) Validate() error { + if c == nil { + return nil + } + if c.Interval == "" { + c.parsedInterval = defaultInterval + return nil + } + d, err := time.ParseDuration(c.Interval) + if err != nil { + return fmt.Errorf("packages: invalid Interval %q: %w", c.Interval, err) + } + if d < time.Minute { + return fmt.Errorf("packages: Interval %q too small (minimum 1m)", c.Interval) + } + c.parsedInterval = d + return nil +} diff --git a/internal/topsrv/packages/dnf_history.go b/internal/topsrv/packages/dnf_history.go new file mode 100644 index 0000000..9b75e55 --- /dev/null +++ b/internal/topsrv/packages/dnf_history.go @@ -0,0 +1,122 @@ +//go:build linux + +package packages + +import ( + "database/sql" + "fmt" + "os" +) + +const dnfHistoryPath = "/var/lib/dnf/history.sqlite" + +// libdnf TransactionItemReason codes (stable since 2020). Map 1:1 to what +// `dnf history info` reports under "Reason". +const ( + dnfReasonNone = 0 + dnfReasonDependency = 1 + dnfReasonUser = 2 + dnfReasonClean = 3 + dnfReasonWeakDependency = 4 + dnfReasonGroup = 5 + dnfReasonExternalUser = 6 +) + +// libdnf TransactionItemAction. We only care about state-changing actions. +const ( + dnfActionInstall = 1 + dnfActionDowngrade = 2 + dnfActionUpgrade = 6 + dnfActionReinstall = 9 +) + +const dnfStateDone = 1 + +// enrichFromDnfHistory merges repoOrigin + autoInstalled by walking +// /var/lib/dnf/history.sqlite (libdnf schema). For each (name, arch) we keep: +// - reason from the *initial* INSTALL (libdnf zeroes reason on UPGRADE rows, +// so taking the latest would lose user/dep classification) +// - repo from the *latest* state-changing action (later upgrades may pull +// from a different repo than the initial install) +// +// Opened read-only with immutable=1 — lock-safe vs concurrent `dnf install`. +// Absence of history.sqlite is normal on rpm-only systems — silently skip. +func (m *Rpm) enrichFromDnfHistory(pkgs []Package) { + dbPath := m.root + dnfHistoryPath + if _, err := os.Stat(dbPath); err != nil { + return + } + + db, err := sql.Open("sqlite", fmt.Sprintf("file:%s?mode=ro&immutable=1", dbPath)) + if err != nil { + return + } + defer db.Close() + + rows, err := db.Query(` + SELECT + r.name, + r.arch, + COALESCE(repo.repoid, ''), + ti.reason, + ti.action + FROM trans_item ti + JOIN rpm r ON r.item_id = ti.item_id + LEFT JOIN repo ON repo.id = ti.repo_id + WHERE ti.state = ? + AND ti.action IN (?, ?, ?, ?) + ORDER BY ti.id ASC + `, dnfStateDone, dnfActionInstall, dnfActionDowngrade, dnfActionUpgrade, dnfActionReinstall) + if err != nil { + return // schema may differ in dnf5; silently skip + } + defer rows.Close() + + type entry struct { + repo string + initialReason int + hasInstall bool + } + latest := make(map[string]entry, 256) + + for rows.Next() { + var name, arch, repo string + var reason, action int + if err := rows.Scan(&name, &arch, &repo, &reason, &action); err != nil { + continue + } + key := name + "/" + arch + e := latest[key] + if action == dnfActionInstall && !e.hasInstall { + e.initialReason = reason + e.hasInstall = true + } + if repo != "" { + e.repo = repo // latest non-empty wins + } + latest[key] = e + } + + for i := range pkgs { + e, ok := latest[pkgs[i].Name+"/"+pkgs[i].Arch] + if !ok { + continue + } + // "@System" = repo unknown / installed before dnf history tracking. + // Don't surface that as a meaningful repo origin. + if e.repo != "" && e.repo != "@System" { + pkgs[i].RepoOrigin = e.repo + } + if e.hasInstall && isAutoInstallReason(e.initialReason) { + pkgs[i].AutoInstalled = true + } + } +} + +func isAutoInstallReason(r int) bool { + switch r { + case dnfReasonDependency, dnfReasonWeakDependency, dnfReasonGroup: + return true + } + return false +} diff --git a/internal/topsrv/packages/dpkg.go b/internal/topsrv/packages/dpkg.go new file mode 100644 index 0000000..246223e --- /dev/null +++ b/internal/topsrv/packages/dpkg.go @@ -0,0 +1,184 @@ +//go:build linux + +package packages + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "strconv" + "strings" + + "github.com/vmkteam/embedlog" +) + +const ( + dpkgStatusPath = "/var/lib/dpkg/status" + aptExtStatesPath = "/var/lib/apt/extended_states" +) + +// Dpkg is the dpkg/apt manager. Owns parsing of /var/lib/dpkg/status, +// /var/lib/apt/extended_states (autoInstalled enrichment), and the shared +// RFC822 paragraph format used by both files. +type Dpkg struct { + embedlog.Logger + root string +} + +func NewDpkg(logger embedlog.Logger, root string) *Dpkg { + return &Dpkg{Logger: logger, root: root} +} + +func (m *Dpkg) Name() string { return ManagerDpkg } + +// Scan reads /var/lib/dpkg/status, filters to installed/held packages, and +// merges apt's auto-installed classification. dpkg's `Version` field +// already embeds epoch as a prefix ("1:6.7p1-5+deb8u2") — exactly what +// Vulners expects on the wire. +func (m *Dpkg) Scan(_ context.Context) (Snapshot, error) { + f, err := os.Open(m.root + dpkgStatusPath) + if err != nil { + return Snapshot{}, fmt.Errorf("open dpkg status: %w", err) + } + defer f.Close() + + records, err := m.parseRFC822(f) + if err != nil { + return Snapshot{}, fmt.Errorf("parse dpkg status: %w", err) + } + + pkgs := make([]Package, 0, len(records)) + for _, r := range records { + status := r["Status"] + if status != StatusInstalled && status != StatusHoldInstalled { + continue + } + name := r["Package"] + srcName, srcVersion := m.parseSource(r["Source"], name) + + size, _ := strconv.ParseInt(r["Installed-Size"], 10, 64) + size *= 1024 // dpkg reports KiB + + pkgs = append(pkgs, Package{ + Name: name, + Version: r["Version"], // already includes epoch prefix when present + Arch: r["Architecture"], + Status: status, + SourceName: srcName, + SourceVersion: srcVersion, + Vendor: r["Maintainer"], // dpkg has no Vendor; Maintainer fills the supply-chain slot + InstalledSize: size, + Section: r["Section"], + Priority: r["Priority"], + Homepage: r["Homepage"], + }) + } + + m.enrichAutoInstalled(pkgs) + + return Snapshot{Manager: ManagerDpkg, Packages: pkgs}, nil +} + +// parseSource splits dpkg's `Source:` field into name+version. Two valid forms: +// +// Source: openssl → ("openssl", "") +// Source: openssl (1.1.1-1ubuntu2) → ("openssl", "1.1.1-1ubuntu2") +// +// Empty Source is the common case (binary name == source name); per Debian +// convention we fall back to binaryName, otherwise CVE-mapping by source breaks. +func (m *Dpkg) parseSource(raw, binaryName string) (name, version string) { + if raw == "" { + return binaryName, "" + } + if i := strings.IndexByte(raw, '('); i > 0 { + name = strings.TrimSpace(raw[:i]) + j := strings.IndexByte(raw[i+1:], ')') + if j > 0 { + version = strings.TrimSpace(raw[i+1 : i+1+j]) + } + return name, version + } + return raw, "" +} + +// enrichAutoInstalled merges apt's manually-installed vs auto-installed +// classification. apt records this in extended_states (also RFC822 paragraphs), +// separate from dpkg's status. Absence of the file is normal on systems +// that haven't run apt — silently skip. +func (m *Dpkg) enrichAutoInstalled(pkgs []Package) { + f, err := os.Open(m.root + aptExtStatesPath) + if err != nil { + return + } + defer f.Close() + + records, err := m.parseRFC822(f) + if err != nil { + return + } + + // key by "name/arch" — multiarch lets the same name coexist (libssl3:amd64 + libssl3:i386) + auto := make(map[string]bool, len(records)) + for _, r := range records { + if r["Auto-Installed"] != "1" { + continue + } + auto[r["Package"]+"/"+r["Architecture"]] = true + } + + for i := range pkgs { + if auto[pkgs[i].Name+"/"+pkgs[i].Arch] { + pkgs[i].AutoInstalled = true + } + } +} + +// parseRFC822 parses a paragraph-per-record file. Records are blank-line +// separated. Continuation lines start with a space (or tab) and are joined to +// the previous field with "\n". Format used by dpkg status, apt +// extended_states, and apt history.log. +func (m *Dpkg) parseRFC822(r io.Reader) ([]map[string]string, error) { + sc := bufio.NewScanner(r) + sc.Buffer(make([]byte, 1<<20), 1<<20) // 1 MiB lines (dpkg Description can be long) + + var records []map[string]string + cur := map[string]string{} + lastKey := "" + + flush := func() { + if len(cur) > 0 { + records = append(records, cur) + cur = map[string]string{} + lastKey = "" + } + } + + for sc.Scan() { + line := sc.Text() + if line == "" { + flush() + continue + } + if line[0] == ' ' || line[0] == '\t' { + if lastKey != "" { + cur[lastKey] += "\n" + strings.TrimSpace(line) + } + continue + } + idx := strings.IndexByte(line, ':') + if idx <= 0 { + continue + } + key := line[:idx] + val := strings.TrimSpace(line[idx+1:]) + cur[key] = val + lastKey = key + } + flush() + if err := sc.Err(); err != nil { + return nil, err + } + return records, nil +} diff --git a/internal/topsrv/packages/inventory.go b/internal/topsrv/packages/inventory.go new file mode 100644 index 0000000..c7a89d7 --- /dev/null +++ b/internal/topsrv/packages/inventory.go @@ -0,0 +1,83 @@ +package packages + +import "time" + +// Manager values used in label sets, JSON, and discovery dispatch. +const ( + ManagerDpkg = "dpkg" + ManagerRpm = "rpm" + ManagerApk = "apk" +) + +// Status values for Package.Status. dpkg distinguishes installed vs held; +// rpm and apk collapse to a generic "installed". +const ( + StatusInstalled = "install ok installed" + StatusHoldInstalled = "hold ok installed" + StatusGeneric = "installed" +) + +// Payload — body shape for POST /v1/inventory. The kind discriminator routes +// snapshots on gatesrv. Host-level facts (osId, kernelRelease, ...) live in +// `host`, not in every package row — keeps payload small. +type Payload struct { + Kind string `json:"kind"` + ScannedAt time.Time `json:"scannedAt"` + Host HostMeta `json:"host"` + Data any `json:"data"` // Snapshot | ReposSnapshot | HistorySnapshot (Phase 4) +} + +// HostMeta — required upstream by Vulners audit (osId+osVersionId+kernelRelease). +// kernelRelease is critical: Vulners' agent filters kernel-*/linux-image-* not +// matching `uname -r` to avoid false positives on inactive kernels. +type HostMeta struct { + OsID string `json:"osId"` + OsVersionID string `json:"osVersionId"` + OsVersionCodename string `json:"osVersionCodename,omitempty"` + OsIDLike []string `json:"osIdLike,omitempty"` + OsPrettyName string `json:"osPrettyName,omitempty"` + KernelRelease string `json:"kernelRelease"` + KernelArch string `json:"kernelArch,omitempty"` + PackageManager string `json:"packageManager"` +} + +// Snapshot is the data shape for kind="packages". +type Snapshot struct { + Manager string `json:"manager"` + Packages []Package `json:"packages"` +} + +// Package — security-oriented inventory entry. Field grouping mirrors the +// MUST/SHOULD/NICE classification from the Vulners + Syft/Trivy analysis; +// see docs/packages-collector-implementation.md "Security data model". +type Package struct { + // MUST — Vulners audit requires these + Name string `json:"name"` + Version string `json:"version"` // dpkg: includes epoch prefix ("1:6.7p1-5+deb8u2") + Release string `json:"release,omitempty"` + Epoch *int `json:"epoch,omitempty"` // rpm only; nil = absent + Arch string `json:"arch,omitempty"` + Status string `json:"status,omitempty"` + + // SHOULD — supply-chain, accuracy, drift + SourceName string `json:"sourceName,omitempty"` + SourceVersion string `json:"sourceVersion,omitempty"` + ModularityLabel string `json:"modularityLabel,omitempty"` // rpm RHEL 8+ + Vendor string `json:"vendor,omitempty"` + GpgKeyID string `json:"gpgKeyId,omitempty"` + SigDigest string `json:"sigDigest,omitempty"` + SigAlgorithm string `json:"sigAlgorithm,omitempty"` + AutoInstalled bool `json:"autoInstalled,omitempty"` + RepoOrigin string `json:"repoOrigin,omitempty"` + IsActiveKernel bool `json:"isActiveKernel,omitempty"` + IsOldKernel bool `json:"isOldKernel,omitempty"` + + // NICE — SBOM, forensics + InstallTime int64 `json:"installTime,omitempty"` + InstalledSize int64 `json:"installedSize,omitempty"` + Licenses []string `json:"licenses,omitempty"` + Section string `json:"section,omitempty"` + Priority string `json:"priority,omitempty"` + Homepage string `json:"homepage,omitempty"` + GitCommit string `json:"gitCommit,omitempty"` +} diff --git a/internal/topsrv/packages/manager.go b/internal/topsrv/packages/manager.go new file mode 100644 index 0000000..c7035e3 --- /dev/null +++ b/internal/topsrv/packages/manager.go @@ -0,0 +1,48 @@ +//go:build linux + +package packages + +import ( + "context" + "os" + + "github.com/vmkteam/embedlog" +) + +// managerFunc bundles a manager's name with its Scan method. No interface +// here on purpose — concrete types are clearer (Go philosophy: interfaces +// only when polymorphism is actually needed for dispatch). This struct just +// holds the dispatch pair, the implementations live on *Dpkg, *Rpm, *Apk. +type managerFunc struct { + name string + scan func(context.Context) (Snapshot, error) +} + +// detectManagers probes the filesystem for marker files of each supported +// package manager and returns dispatch entries for every match. Order: +// dpkg first (Ubuntu/Debian) → rpm → apk. Hosts with multiple managers +// (containers with mixed origin) yield multiple entries. +// +// `root` enables chroot-style scans (e.g. /mnt/host-root); production passes "". +func detectManagers(logger embedlog.Logger, root string) []managerFunc { + var out []managerFunc + if fileExists(root + dpkgStatusPath) { + d := NewDpkg(logger, root) + out = append(out, managerFunc{name: d.Name(), scan: d.Scan}) + } + if r := NewRpm(logger, root); r.findDB() != "" { + out = append(out, managerFunc{name: r.Name(), scan: r.Scan}) + } + if fileExists(root + apkInstalledPath) { + a := NewApk(logger, root) + out = append(out, managerFunc{name: a.Name(), scan: a.Scan}) + } + return out +} + +// fileExists hides the err-ignoring intent of Stat. Marker-file probes +// don't distinguish "missing" from "no permission" — both mean "skip". +func fileExists(path string) bool { + _, err := os.Stat(path) + return err == nil +} diff --git a/internal/topsrv/packages/osinfo.go b/internal/topsrv/packages/osinfo.go new file mode 100644 index 0000000..2f62d30 --- /dev/null +++ b/internal/topsrv/packages/osinfo.go @@ -0,0 +1,51 @@ +//go:build linux + +package packages + +import ( + "bufio" + "os" + "runtime" + "strings" +) + +// detectHost populates HostMeta from /etc/os-release + /proc/sys/kernel/osrelease. +// PackageManager is left empty — the caller fills it per-snapshot since one host +// may have multiple managers (rare: dpkg+rpm cohabitation in containers). +// `root` enables chroot-style scans (e.g. /mnt/host-root for sidecar deployments); +// the production collector passes "". +func detectHost(root string) HostMeta { + h := HostMeta{ + KernelArch: runtime.GOARCH, + } + + if f, err := os.Open(root + "/etc/os-release"); err == nil { + defer f.Close() + sc := bufio.NewScanner(f) + for sc.Scan() { + k, v, ok := strings.Cut(sc.Text(), "=") + if !ok { + continue + } + v = strings.Trim(v, `"`) + switch k { + case "ID": + h.OsID = v + case "VERSION_ID": + h.OsVersionID = v + case "VERSION_CODENAME": + h.OsVersionCodename = v + case "ID_LIKE": + h.OsIDLike = strings.Fields(v) + case "PRETTY_NAME": + h.OsPrettyName = v + } + } + } + + if b, err := os.ReadFile(root + "/proc/sys/kernel/osrelease"); err == nil { + h.KernelRelease = strings.TrimSpace(string(b)) + } + + return h +} diff --git a/internal/topsrv/packages/packages_linux.go b/internal/topsrv/packages/packages_linux.go new file mode 100644 index 0000000..9e8a8b7 --- /dev/null +++ b/internal/topsrv/packages/packages_linux.go @@ -0,0 +1,148 @@ +//go:build linux + +package packages + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// inventoryKindPackages is the discriminator used in /v1/inventory for the +// installed-packages snapshot. Repos / packageHistory land in Phase 4. +const inventoryKindPackages = "packages" + +// scan walks the filesystem package databases and refreshes the cache. +// Called from Run() under a long interval (default 6h). +func (c *Collector) scan(ctx context.Context) { + host := detectHost("") + managers := c.resolveManagers() + + limit := c.cfg.MaxPackagesOrDefault() + now := time.Now().UTC() + + var ( + metrics []prometheus.Metric + snapshots []Snapshot + errBumps []string + ) + for _, m := range managers { + if ctx.Err() != nil { + return + } + snap, mx, ok := c.scanManager(ctx, m, host, limit, now) + metrics = append(metrics, mx...) + if !ok { + errBumps = append(errBumps, m.name) + continue + } + snapshots = append(snapshots, snap) + } + + pending := buildPendingPayloads(host, snapshots, now, !c.cfg.DisablePush) + + c.mu.Lock() + c.cache = metrics + c.pending = pending + c.scanned = true + if c.scanErrCount == nil { + c.scanErrCount = make(map[string]int, len(errBumps)) + } + for _, name := range errBumps { + c.scanErrCount[name]++ + } + c.mu.Unlock() +} + +// resolveManagers honours an explicit [Packages].Managers list when set, +// otherwise falls back to filesystem detection. Order is preserved. +func (c *Collector) resolveManagers() []managerFunc { + if len(c.cfg.Managers) == 0 { + return detectManagers(c.Logger, "") + } + var out []managerFunc + for _, name := range c.cfg.Managers { + switch name { + case ManagerDpkg: + d := NewDpkg(c.Logger, "") + out = append(out, managerFunc{name: d.Name(), scan: d.Scan}) + case ManagerRpm: + r := NewRpm(c.Logger, "") + out = append(out, managerFunc{name: r.Name(), scan: r.Scan}) + case ManagerApk: + a := NewApk(c.Logger, "") + out = append(out, managerFunc{name: a.Name(), scan: a.Scan}) + } + } + return out +} + +// scanManager runs one manager's Scan(), builds its metric vector, and +// returns ok=false on parse failure. Caller bumps the error counter on !ok. +func (c *Collector) scanManager(ctx context.Context, m managerFunc, host HostMeta, limit int, now time.Time) (Snapshot, []prometheus.Metric, bool) { + start := time.Now() + snap, err := m.scan(ctx) + duration := time.Since(start).Seconds() + + if err != nil { + c.Error(ctx, "packages: scan failed", "manager", m.name, "error", err) + return Snapshot{}, nil, false + } + + total := len(snap.Packages) + if total > limit { + snap.Packages = snap.Packages[:limit] + c.Print(ctx, "packages: snapshot truncated", "manager", m.name, "total", total, "limit", limit) + } + + held := countHeld(snap.Packages) + + c.mu.Lock() + firstScan := !c.scanned + c.mu.Unlock() + if firstScan { + c.Print(ctx, "packages: initial scan complete", + "manager", m.name, "total", total, "held", held, "duration_s", duration) + } + + return snap, []prometheus.Metric{ + prometheus.MustNewConstMetric(c.installed, prometheus.GaugeValue, float64(total), m.name), + prometheus.MustNewConstMetric(c.held, prometheus.GaugeValue, float64(held), m.name), + prometheus.MustNewConstMetric(c.scanDuration, prometheus.GaugeValue, duration, m.name), + prometheus.MustNewConstMetric(c.lastScanTS, prometheus.GaugeValue, float64(now.Unix()), m.name), + prometheus.MustNewConstMetric(c.managerInfo, prometheus.GaugeValue, 1, m.name, host.OsID, host.OsVersionID), + }, true +} + +// buildPendingPayloads constructs the /v1/inventory payloads. One payload per +// snapshot keeps gatesrv routing simple (one POST per kind+manager). When +// PushInventory is disabled we still keep metrics but skip the buffer — +// Inventory() returns nil. +func buildPendingPayloads(host HostMeta, snaps []Snapshot, now time.Time, pushEnabled bool) []Payload { + if !pushEnabled { + return nil + } + out := make([]Payload, 0, len(snaps)) + for _, s := range snaps { + h := host + h.PackageManager = s.Manager + out = append(out, Payload{ + Kind: inventoryKindPackages, + ScannedAt: now, + Host: h, + Data: s, + }) + } + return out +} + +func countHeld(pkgs []Package) int { + n := 0 + for _, p := range pkgs { + if p.Status == StatusHoldInstalled { + n++ + } + } + return n +} diff --git a/internal/topsrv/packages/packages_other.go b/internal/topsrv/packages/packages_other.go new file mode 100644 index 0000000..6ed3ffb --- /dev/null +++ b/internal/topsrv/packages/packages_other.go @@ -0,0 +1,9 @@ +//go:build !linux + +package packages + +import "context" + +// scan is a no-op on non-Linux platforms. dpkg/rpm/apk databases live under +// /var/lib// on Linux only. +func (c *Collector) scan(_ context.Context) {} diff --git a/internal/topsrv/packages/parsers_test.go b/internal/topsrv/packages/parsers_test.go new file mode 100644 index 0000000..468aac2 --- /dev/null +++ b/internal/topsrv/packages/parsers_test.go @@ -0,0 +1,152 @@ +//go:build linux + +package packages + +import ( + "strings" + "testing" +) + +func TestRpmParseSrcRpm(t *testing.T) { + m := &Rpm{} + cases := []struct { + in string + wantName string + wantVer string + }{ + // Standard: name has no hyphens. + {"openssl-1.1.1k-7.el8_6.src.rpm", "openssl", "1.1.1k-7.el8_6"}, + // Tricky: name contains hyphens — last two hyphens split version/release, + // everything before is the source name. Vulners CVE mapping depends on this. + {"kernel-headers-5.14.0-362.el9.src.rpm", "kernel-headers", "5.14.0-362.el9"}, + {"libnss-systemd-252.16-1.fc39.src.rpm", "libnss-systemd", "252.16-1.fc39"}, + // Without .src suffix (some rpm builds). + {"foo-1.0-1.rpm", "foo", "1.0-1"}, + // Degenerate cases — must not panic. + {"foo", "foo", ""}, + {"", "", ""}, + } + for _, tc := range cases { + gotName, gotVer := m.parseSrcRpm(tc.in) + if gotName != tc.wantName || gotVer != tc.wantVer { + t.Errorf("parseSrcRpm(%q) = (%q, %q), want (%q, %q)", + tc.in, gotName, gotVer, tc.wantName, tc.wantVer) + } + } +} + +func TestDpkgParseSource(t *testing.T) { + m := &Dpkg{} + cases := []struct { + raw, binary string + wantName string + wantVer string + }{ + // Empty Source: → fallback to binary name (Debian convention). + {"", "openssl", "openssl", ""}, + // Just a source name, no version in parens. + {"openssl", "libssl3", "openssl", ""}, + // Source with version. + {"openssl (1.1.1-1ubuntu2)", "libssl3", "openssl", "1.1.1-1ubuntu2"}, + // Malformed: open paren without close — graceful degradation, no panic. + {"openssl (", "libssl3", "openssl", ""}, + // Whitespace around tokens. + {" foo ( 1.2.3 ) ", "x", "foo", "1.2.3"}, + } + for _, tc := range cases { + gotName, gotVer := m.parseSource(tc.raw, tc.binary) + if gotName != tc.wantName || gotVer != tc.wantVer { + t.Errorf("parseSource(%q, %q) = (%q, %q), want (%q, %q)", + tc.raw, tc.binary, gotName, gotVer, tc.wantName, tc.wantVer) + } + } +} + +func TestDpkgParseRFC822(t *testing.T) { + m := &Dpkg{} + // Two packages, multi-line Description (continuation with leading space), + // blank-line separator between records. + in := `Package: foo +Version: 1.0 +Description: short + long line continues + second continued line + +Package: bar +Version: 2.0 +Description: another +` + records, err := m.parseRFC822(strings.NewReader(in)) + if err != nil { + t.Fatalf("parseRFC822: %v", err) + } + if len(records) != 2 { + t.Fatalf("got %d records, want 2", len(records)) + } + if got := records[0]["Package"]; got != "foo" { + t.Errorf("record[0].Package = %q, want foo", got) + } + if got := records[0]["Description"]; got != "short\nlong line continues\nsecond continued line" { + t.Errorf("record[0].Description continuation broken: %q", got) + } + if got := records[1]["Version"]; got != "2.0" { + t.Errorf("record[1].Version = %q, want 2.0", got) + } +} + +func TestDpkgParseRFC822Empty(t *testing.T) { + m := &Dpkg{} + records, err := m.parseRFC822(strings.NewReader("")) + if err != nil { + t.Fatalf("parseRFC822(empty): %v", err) + } + if len(records) != 0 { + t.Errorf("got %d records, want 0", len(records)) + } +} + +func TestApkSetChecksum(t *testing.T) { + m := &Apk{} + cases := []struct { + in string + wantDig string + wantAlgo string + }{ + {"", "", ""}, + {"Q1abc123", "abc123", "sha1"}, + {"Q2xyz789", "xyz789", "sha256"}, + {"raw-no-prefix", "raw-no-prefix", ""}, // forward-compat: unknown prefix → keep raw + } + for _, tc := range cases { + var p Package + m.setChecksum(&p, tc.in) + if p.SigDigest != tc.wantDig || p.SigAlgorithm != tc.wantAlgo { + t.Errorf("setChecksum(%q) = (%q, %q), want (%q, %q)", + tc.in, p.SigDigest, p.SigAlgorithm, tc.wantDig, tc.wantAlgo) + } + } +} + +func TestRpmExtractKeyID(t *testing.T) { + m := &Rpm{} + cases := []struct { + fields []string + want string + }{ + // Standard rpm PGP string. + {[]string{"RSA/SHA256, Mon Nov 20 12:00:00, Key ID 199e2f91fd431d51"}, "199e2f91fd431d51"}, + // Lowercase normalization. + {[]string{"Key ID ABCD1234"}, "abcd1234"}, + // Fallback to second field when first empty. + {[]string{"", "DSA, Key ID 1234abcd5678"}, "1234abcd5678"}, + // No match at all. + {[]string{"", ""}, ""}, + {[]string{"no key id here"}, ""}, + } + for _, tc := range cases { + got := m.extractKeyID(tc.fields...) + if got != tc.want { + t.Errorf("extractKeyID(%v) = %q, want %q", tc.fields, got, tc.want) + } + } +} diff --git a/internal/topsrv/packages/rpm.go b/internal/topsrv/packages/rpm.go new file mode 100644 index 0000000..c857211 --- /dev/null +++ b/internal/topsrv/packages/rpm.go @@ -0,0 +1,188 @@ +//go:build linux + +package packages + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "regexp" + "strings" + + rpmdb "github.com/anchore/go-rpmdb/pkg" + "github.com/vmkteam/embedlog" + // modernc.org/sqlite registers driver "sqlite" — required by anchore/go-rpmdb's + // sqlite3 backend which calls sql.Open("sqlite", "file:...?mode=ro&immutable=1") + // without registering one itself. Same pattern Trivy and Syft use. Phase 5 + // will gate this import behind a build tag for slim builds. + _ "modernc.org/sqlite" +) + +// rpmDBCandidates lists every supported rpm database path in priority order. +// First-match wins. SQLite (RHEL 9+, Fedora 32+) is tried first, then NDB +// (SUSE), then BDB (RHEL 7-8). Each variant has a sysimage equivalent for +// CoreOS / OSTree layouts. +var rpmDBCandidates = []string{ + "/var/lib/rpm/rpmdb.sqlite", // RHEL 9+, Fedora 32+ + "/var/lib/rpm/Packages.db", // SUSE / openSUSE NDB + "/var/lib/rpm/Packages", // RHEL 7-8 BDB + "/usr/lib/sysimage/rpm/rpmdb.sqlite", // CoreOS / OSTree + "/usr/lib/sysimage/rpm/Packages.db", + "/usr/lib/sysimage/rpm/Packages", +} + +// keyIDRe extracts the short key id from rpm's PGP/RSAHeader fields. rpm prints +// signatures as "RSA/SHA256, Mon Nov 20 ..., Key ID 199e2f91fd431d51". +var keyIDRe = regexp.MustCompile(`(?i)Key ID ([0-9a-f]{8,40})`) + +// Rpm is the rpm manager (RHEL/Fedora/Rocky/Alma/CentOS/SUSE). +type Rpm struct { + embedlog.Logger + root string +} + +func NewRpm(logger embedlog.Logger, root string) *Rpm { + return &Rpm{Logger: logger, root: root} +} + +func (m *Rpm) Name() string { return ManagerRpm } + +// Scan parses the rpm database. BDB (`Packages`) is copy-then-parsed +// (Syft/Trivy pattern) to avoid mid-write inconsistency during `rpm -i` — +// BDB writes pages in-place. SQLite is opened read-only with `immutable=1` +// inside anchore/go-rpmdb. NDB is append-only and safe. +func (m *Rpm) Scan(_ context.Context) (Snapshot, error) { + openPath, cleanup, err := m.openDB() + if err != nil { + return Snapshot{}, err + } + defer cleanup() + + db, err := rpmdb.Open(openPath) + if err != nil { + return Snapshot{}, fmt.Errorf("rpmdb open %s: %w", openPath, err) + } + defer db.Close() + + infos, err := db.ListPackages() + if err != nil { + return Snapshot{}, fmt.Errorf("rpmdb list: %w", err) + } + + pkgs := make([]Package, 0, len(infos)) + for _, p := range infos { + pkgs = append(pkgs, m.toPackage(p)) + } + + m.enrichFromDnfHistory(pkgs) + + return Snapshot{Manager: ManagerRpm, Packages: pkgs}, nil +} + +// findDB returns the first existing rpm database path under m.root, or "" if +// none found. Used by both Scan() (via openDB) and detectManagers(). +func (m *Rpm) findDB() string { + for _, p := range rpmDBCandidates { + if fileExists(m.root + p) { + return m.root + p + } + } + return "" +} + +// openDB locates the rpm database and, for BDB only, copies it to a tempfile +// to escape mid-write inconsistency. Returns the path to read and a cleanup +// function. Callers always defer cleanup(). +func (m *Rpm) openDB() (string, func(), error) { + dbPath := m.findDB() + if dbPath == "" { + return "", func() {}, fmt.Errorf("no rpm database found under %q", m.root) + } + if filepath.Base(dbPath) != "Packages" { + return dbPath, func() {}, nil // sqlite/ndb — opened directly + } + tmp, err := m.copyToTemp(dbPath) + if err != nil { + return "", func() {}, fmt.Errorf("snapshot rpm BDB: %w", err) + } + return tmp, func() { os.Remove(tmp) }, nil +} + +func (m *Rpm) toPackage(p *rpmdb.PackageInfo) Package { + srcName, srcVer := m.parseSrcRpm(p.SourceRpm) + pkg := Package{ + Name: p.Name, + Version: p.Version, + Release: p.Release, + Epoch: p.Epoch, + Arch: p.Arch, + Status: StatusGeneric, + SourceName: srcName, + SourceVersion: srcVer, + ModularityLabel: p.Modularitylabel, + Vendor: p.Vendor, + GpgKeyID: m.extractKeyID(p.PGP, p.RSAHeader), + InstallTime: int64(p.InstallTime), + InstalledSize: int64(p.Size), + } + if p.SigMD5 != "" { + pkg.SigDigest = p.SigMD5 + pkg.SigAlgorithm = "md5" + } + if p.License != "" { + pkg.Licenses = []string{p.License} + } + return pkg +} + +// parseSrcRpm splits "name-VERSION-RELEASE.src.rpm" into name and VERSION-RELEASE. +// rpm guarantees this shape — RELEASE is anything after the last hyphen, +// VERSION is everything between the first and last hyphen. NEVRA without epoch. +func (m *Rpm) parseSrcRpm(s string) (name, version string) { + if s == "" { + return "", "" + } + s = strings.TrimSuffix(s, ".rpm") + s = strings.TrimSuffix(s, ".src") + relIdx := strings.LastIndexByte(s, '-') + if relIdx <= 0 { + return s, "" + } + verIdx := strings.LastIndexByte(s[:relIdx], '-') + if verIdx <= 0 { + return s, s[relIdx+1:] + } + return s[:verIdx], s[verIdx+1:] +} + +// extractKeyID grabs the GPG short key id from rpm's PGP/RSAHeader strings. +// Returns the first match (PGP first, RSAHeader fallback). Both fields may be +// empty for unsigned/locally-built packages — that absence is meaningful. +func (m *Rpm) extractKeyID(fields ...string) string { + for _, f := range fields { + if match := keyIDRe.FindStringSubmatch(f); len(match) == 2 { + return strings.ToLower(match[1]) + } + } + return "" +} + +func (m *Rpm) copyToTemp(src string) (string, error) { + in, err := os.Open(src) + if err != nil { + return "", err + } + defer in.Close() + out, err := os.CreateTemp("", "rpmdb-topsrv-*") + if err != nil { + return "", err + } + defer out.Close() + if _, err := io.Copy(out, in); err != nil { + os.Remove(out.Name()) + return "", err + } + return out.Name(), nil +} diff --git a/internal/topsrv/push.go b/internal/topsrv/push.go index 155ce8f..9ceaa44 100644 --- a/internal/topsrv/push.go +++ b/internal/topsrv/push.go @@ -14,6 +14,7 @@ import ( "sort" "time" + "github.com/vmkteam/topsrv/internal/topsrv/packages" "github.com/vmkteam/topsrv/internal/topsrv/postgres" "github.com/prometheus/client_golang/prometheus" @@ -42,13 +43,15 @@ type PushConfig struct { type Pusher struct { embedlog.Logger - cfg PushConfig - registry *prometheus.Registry - client *http.Client - interval time.Duration - hostname string - metaProviders []QueryMetaProvider - metaEndpoint string // derived: /v1/write → /v1/meta + cfg PushConfig + registry *prometheus.Registry + client *http.Client + interval time.Duration + hostname string + metaProviders []QueryMetaProvider + inventoryProviders []InventoryProvider + metaEndpoint string // derived: /v1/write → /v1/meta + inventoryEndpoint string // derived: /v1/write → /v1/inventory } func NewPusher(logger embedlog.Logger, appName, version string, cfg PushConfig, registry *prometheus.Registry) *Pusher { @@ -63,13 +66,14 @@ func NewPusher(logger embedlog.Logger, appName, version string, cfg PushConfig, } return &Pusher{ - Logger: logger, - cfg: cfg, - registry: registry, - client: appkit.NewHTTPClient(appName, version, pushTimeout), - interval: interval, - hostname: hostname, - metaEndpoint: deriveMetaEndpoint(cfg.Endpoint), + Logger: logger, + cfg: cfg, + registry: registry, + client: appkit.NewHTTPClient(appName, version, pushTimeout), + interval: interval, + hostname: hostname, + metaEndpoint: deriveEndpoint(cfg.Endpoint, "/v1/meta"), + inventoryEndpoint: deriveEndpoint(cfg.Endpoint, "/v1/inventory"), } } @@ -100,6 +104,11 @@ func (p *Pusher) AddMetaProvider(mp QueryMetaProvider) { p.metaProviders = append(p.metaProviders, mp) } +// AddInventoryProvider registers a provider for /v1/inventory snapshot push. +func (p *Pusher) AddInventoryProvider(ip InventoryProvider) { + p.inventoryProviders = append(p.inventoryProviders, ip) +} + // Flush performs a final gather and attempts to send metrics. // If the send fails, data is spooled to disk for retry on next startup. func (p *Pusher) Flush() { @@ -146,6 +155,10 @@ func (p *Pusher) push(ctx context.Context) { if len(p.metaProviders) > 0 { p.sendMeta(ctx) } + + if len(p.inventoryProviders) > 0 { + p.sendInventory(ctx) + } } // gather collects metrics and encodes as gzipped Prometheus text format. @@ -221,28 +234,10 @@ func (p *Pusher) send(ctx context.Context, data []byte) error { return nil } -// spool saves failed payload to disk for later retry. +// spool saves failed metrics payload to disk for later retry. Uses +// SpoolDir/.gz; trimming caps the directory at pushMaxSpoolSize. func (p *Pusher) spool(ctx context.Context, data []byte) { - if p.cfg.SpoolDir == "" { - return - } - if err := os.MkdirAll(p.cfg.SpoolDir, 0o750); err != nil { - p.Error(ctx, "push: spool mkdir failed", "error", err) - return - } - - name := fmt.Sprintf("%d.gz", time.Now().UnixMilli()) - path := filepath.Join(p.cfg.SpoolDir, name) - if err := os.WriteFile(path, data, 0o640); err != nil { - p.Error(ctx, "push: spool write failed", "error", err) - return - } - - files, _ := filepath.Glob(filepath.Join(p.cfg.SpoolDir, "*.gz")) - p.Print(ctx, "push: spooled", "name", name, "pending", len(files)) - - // Trim old files if spool is too large. - p.trimSpool() + p.spoolFile(ctx, "", "", "gz", data) } // retrySpool sends buffered payloads from spool dir (oldest first). @@ -275,13 +270,14 @@ func (p *Pusher) retrySpool(ctx context.Context) { } } -// deriveMetaEndpoint replaces /v1/write with /v1/meta in the push endpoint URL. -func deriveMetaEndpoint(pushEndpoint string) string { +// deriveEndpoint replaces the push endpoint's path with `path` (e.g. /v1/meta +// or /v1/inventory) while preserving scheme/host/credentials. +func deriveEndpoint(pushEndpoint, path string) string { u, err := url.Parse(pushEndpoint) if err != nil { return "" } - u.Path = "/v1/meta" + u.Path = path return u.String() } @@ -289,7 +285,41 @@ type metaPayload struct { Queries []postgres.QueryMeta `json:"queries"` } -// sendMeta pushes query metadata from all providers to gatesrv. +// postJSON POSTs a JSON body to `url` with the agent's auth + hostname headers. +// On non-2xx it returns an error with the first 200 bytes of the response body +// so callers (and operators reading logs) can see validation messages from +// gatesrv. Common path for both /v1/meta and /v1/inventory pushes. +func (p *Pusher) postJSON(ctx context.Context, url string, body []byte) error { + ctx, cancel := context.WithTimeout(ctx, pushTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + if p.cfg.Token != "" { + req.Header.Set("Authorization", "Bearer "+p.cfg.Token) + } + if p.hostname != "" { + req.Header.Set("X-Hostname", p.hostname) + } + + resp, err := p.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + preview, _ := io.ReadAll(io.LimitReader(resp.Body, 200)) + return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(preview)) + } + _, _ = io.Copy(io.Discard, resp.Body) + return nil +} + +// sendMeta pushes query metadata from all providers to gatesrv /v1/meta. func (p *Pusher) sendMeta(ctx context.Context) { if p.metaEndpoint == "" { return @@ -308,42 +338,80 @@ func (p *Pusher) sendMeta(ctx context.Context) { return } - ctx, cancel := context.WithTimeout(ctx, pushTimeout) - defer cancel() + if err := p.postJSON(ctx, p.metaEndpoint, body); err != nil { + p.Error(ctx, "meta: send failed", "error", err) + } +} - req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.metaEndpoint, bytes.NewReader(body)) - if err != nil { +// sendInventory pushes inventory snapshots from all providers to +// /v1/inventory. Each Payload is sent as a separate request — gatesrv routes +// by `kind`. Providers return nil/empty when there's nothing fresh. +// +// On 2xx the provider's optional OnInventoryPushed is invoked. +// On failure we spool to SpoolDir/inventory/. +func (p *Pusher) sendInventory(ctx context.Context) { + if p.inventoryEndpoint == "" { return } - req.Header.Set("Content-Type", "application/json") - if p.cfg.Token != "" { - req.Header.Set("Authorization", "Bearer "+p.cfg.Token) - } - if p.hostname != "" { - req.Header.Set("X-Hostname", p.hostname) + for _, ip := range p.inventoryProviders { + for _, payload := range ip.Inventory() { + p.sendOneInventory(ctx, ip, payload) + } } +} - resp, err := p.client.Do(req) +func (p *Pusher) sendOneInventory(ctx context.Context, ip InventoryProvider, payload packages.Payload) { + body, err := json.Marshal(payload) if err != nil { - p.Error(ctx, "meta: send failed", "error", err) + p.Error(ctx, "inventory: marshal failed", "kind", payload.Kind, "error", err) return } - defer resp.Body.Close() - _, _ = io.Copy(io.Discard, resp.Body) - if resp.StatusCode >= 400 { - p.Error(ctx, "meta: HTTP error", "status", resp.StatusCode) + if err := p.postJSON(ctx, p.inventoryEndpoint, body); err != nil { + p.Error(ctx, "inventory: send failed", "kind", payload.Kind, "error", err) + p.spoolFile(ctx, "inventory", payload.Kind, "json", body) + return + } + + if ack, ok := ip.(InventoryAckReceiver); ok { + ack.OnInventoryPushed(payload.Kind) } } -// trimSpool removes oldest files if spool exceeds max size. -func (p *Pusher) trimSpool() { - files, err := filepath.Glob(filepath.Join(p.cfg.SpoolDir, "*.gz")) - if err != nil || len(files) <= pushMaxSpoolSize { +// spoolFile writes a failed payload to //-. +// and trims older files of the same subdir+ext to pushMaxSpoolSize. subdir="" +// places files directly in SpoolDir (legacy metrics layout: prefix=unixms, +// ext=gz, no kind prefix). +func (p *Pusher) spoolFile(ctx context.Context, subdir, prefix, ext string, body []byte) { + if p.cfg.SpoolDir == "" { return } - sort.Strings(files) - for _, path := range files[:len(files)-pushMaxSpoolSize] { - os.Remove(path) + dir := p.cfg.SpoolDir + if subdir != "" { + dir = filepath.Join(dir, subdir) + } + if err := os.MkdirAll(dir, 0o750); err != nil { + p.Error(ctx, "spool: mkdir failed", "subdir", subdir, "error", err) + return + } + name := fmt.Sprintf("%d.%s", time.Now().UnixMilli(), ext) + if prefix != "" { + name = prefix + "-" + name + } + path := filepath.Join(dir, name) + if err := os.WriteFile(path, body, 0o640); err != nil { + p.Error(ctx, "spool: write failed", "subdir", subdir, "error", err) + return + } + files, _ := filepath.Glob(filepath.Join(dir, "*."+ext)) + p.Print(ctx, "spool: queued", "subdir", subdir, "name", name, "pending", len(files)) + + if len(files) > pushMaxSpoolSize { + sort.Strings(files) + dropped := files[:len(files)-pushMaxSpoolSize] + for _, f := range dropped { + os.Remove(f) + } + p.Print(ctx, "spool: dropped oldest", "subdir", subdir, "dropped", len(dropped), "kept", pushMaxSpoolSize) } } diff --git a/internal/topsrv/push_test.go b/internal/topsrv/push_test.go index 6e3273f..00271f0 100644 --- a/internal/topsrv/push_test.go +++ b/internal/topsrv/push_test.go @@ -108,7 +108,8 @@ func TestPusherSpoolTrim(t *testing.T) { } p := NewPusher(embedlog.Logger{}, "topsrv", "test", PushConfig{SpoolDir: dir}, nil) - p.trimSpool() + // spool() writes one more file then trims down to pushMaxSpoolSize. + p.spool(context.Background(), []byte("data")) files, _ := filepath.Glob(filepath.Join(dir, "*.gz")) assert.Len(t, files, pushMaxSpoolSize)