Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions internal/chplan/scan.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package chplan

// Scan reads rows from a single ClickHouse table. If Columns is empty, the
// Scan reads rows from a ClickHouse table. If Columns is empty, the
// emitter projects `*` and downstream Project nodes can narrow it; otherwise
// only the listed columns are read.
//
Expand All @@ -9,10 +9,30 @@ package chplan
// for synthetic single-row sources like `system.one` that the no-arg
// date functions scan. Empty Database emits just the bare table name,
// matching the original behaviour for every user-facing table.
//
// UnionTables — non-empty when the scan resolves across multiple
// physical tables that share the metric-row shape (MetricName, Attributes,
// TimeUnix, Value). The PromQL `__name__` matcher path uses this for
// unsuffixed names that could live in either the Gauge or Sum table:
// OTel hostmetrics / sqlquery emitters ship cumulative sums under bare
// names (`system_cpu_time`, `clickhouse_event`) that the Prom-naming
// heuristic alone routes to the Gauge table, returning empty. When
// UnionTables is set, the emitter renders a single
// `merge(<database>, '^(<t_1>|<t_2>|...)$')` table-function call over the
// listed tables. Table is left empty in this mode; the emitter rejects the
// ambiguous case where both Table and UnionTables are set.
//
// Each arm is projected through the union-stable column subset
// (MetricName, Attributes, TimeUnix, Value, StartTimeUnix, ServiceName)
// so the Sum-only columns (AggregationTemporality, IsMonotonic) don't
// break the union's column-list match. Downstream wrappers (LWR,
// RangeWindow) read only the common columns, so the projection narrow
// is lossless for the metric-row pipeline.
type Scan struct {
Database string
Table string
Columns []string
Database string
Table string
UnionTables []string
Columns []string
}

// planNode is an intentionally empty marker method on *Scan.
// NOTE(review): presumably this satisfies an unexported method of the
// Node interface so only package-local types can be plan nodes — confirm.
func (*Scan) planNode() {}
Expand All @@ -21,13 +41,24 @@ func (*Scan) Children() []Node { return nil }

// Equal reports whether other is a *Scan whose Database, Table, Columns
// and UnionTables all match s. Columns and UnionTables are compared by
// length and then element by element, so ordering is significant and a
// nil slice compares equal to an empty one.
func (s *Scan) Equal(other Node) bool {
o, ok := other.(*Scan)
if !ok || s.Database != o.Database || s.Table != o.Table || len(s.Columns) != len(o.Columns) {
if !ok || s.Database != o.Database || s.Table != o.Table {
return false
}
// Column lists must have the same length and the same names in the
// same positions.
if len(s.Columns) != len(o.Columns) {
return false
}
for i := range s.Columns {
if s.Columns[i] != o.Columns[i] {
return false
}
}
// Same positional comparison for the union member tables.
if len(s.UnionTables) != len(o.UnionTables) {
return false
}
for i := range s.UnionTables {
if s.UnionTables[i] != o.UnionTables[i] {
return false
}
}
return true
}
115 changes: 113 additions & 2 deletions internal/chsql/emit_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func (e *emitter) splice(b *Builder) {
}

func (e *emitter) emitScan(s *chplan.Scan) error {
if err := validateScanShape(s); err != nil {
return err
}
sb := NewQuery().From(scanTableFrag(s))
if len(s.Columns) > 0 {
cols := make([]Frag, 0, len(s.Columns))
Expand All @@ -76,6 +79,19 @@ func (e *emitter) emitScan(s *chplan.Scan) error {
return nil
}

// validateScanShape checks that a Scan names exactly one source: either
// Table or UnionTables. It returns an error when neither is set (the
// emitter has no table to read from) and when both are set (the source
// would be ambiguous); otherwise it returns nil.
func validateScanShape(s *chplan.Scan) error {
	hasTable := s.Table != ""
	hasUnion := len(s.UnionTables) > 0
	switch {
	case !hasTable && !hasUnion:
		return fmt.Errorf("chsql: Scan has neither Table nor UnionTables set")
	case hasTable && hasUnion:
		return fmt.Errorf("chsql: Scan has both Table=%q and UnionTables=%v set; pick one", s.Table, s.UnionTables)
	default:
		return nil
	}
}

// emitOneRow renders `SELECT 1` — a no-FROM SELECT that ClickHouse
// evaluates to a single literal row. Used by chplan.Project over
// chplan.OneRow to materialise PromQL `time()` / `vector(scalar)` /
Expand Down Expand Up @@ -130,7 +146,7 @@ func (e *emitter) emitStepGrid(g *chplan.StepGrid) error {
// up to and including End. Matches Prom's range-query step grid
// (the upstream evaluator emits (end-start)/step + 1 samples per
// series in the canonical case).
numAnchors := (g.End.Sub(g.Start).Nanoseconds())/stepNS + 1
numAnchors := g.End.Sub(g.Start).Nanoseconds()/stepNS + 1
if numAnchors < 1 {
numAnchors = 1
}
Expand Down Expand Up @@ -183,13 +199,95 @@ func (e *emitter) emitCrossJoin(j *chplan.CrossJoin) error {
// quoted table name; when Database is non-empty (synthetic single-row
// sources like `system.one`) it emits the qualified `<db>`.`<tbl>` shape
// so CH resolves the system database directly.
//
// When UnionTables is non-empty (the OTel-hostmetrics / sqlquery fallback
// the PromQL matcher path uses for unsuffixed names), the table reference
// renders as a `merge(currentDatabase(), '<regex>')` table function call.
// CH's `merge()` reads the union of all tables in the named database
// whose name matches the regex, projecting only the columns common to
// every member — the gauge / sum / histogram tables share the metric-row
// quadruple (MetricName, Attributes, TimeUnix, Value) plus all the
// envelope columns (Resource* / Scope* / ServiceName / StartTimeUnix /
// Flags / Exemplars), so the union covers everything the downstream
// LWR / RangeWindow / projection wrappers reference. The Sum-only
// columns (AggregationTemporality, IsMonotonic) drop out of the merged
// view; no metric-row consumer reads them, so the narrow is safe.
//
// The merge() call gets per-arm PREWHERE granule pruning automatically:
// CH translates a top-level predicate into the underlying tables'
// PREWHERE during the ReadFromMerge planning step (verified via
// `EXPLAIN`). The emitter therefore doesn't have to manually fan
// PREWHERE per arm — the legacy emitFilterScan path drives the single
// PREWHERE on the outer SELECT and CH does the rest.
func scanTableFrag(s *chplan.Scan) Frag {
	switch {
	case len(s.UnionTables) > 0:
		// Multi-table scan: hand off to the merge() table-function renderer.
		return mergeTableFrag(s.Database, s.UnionTables)
	case s.Database != "":
		// Explicit database: reference the table through Qual(db, table).
		return Qual(s.Database, s.Table)
	default:
		// No database given: reference the table by its bare name.
		return Col(s.Table)
	}
}

// mergeTableFrag renders the CH `merge(currentDatabase(), '<regex>')`
// table-function call that backs Scan.UnionTables. The database argument
// uses `currentDatabase()` when the Scan's Database is empty so the
// fanout follows whichever database the connection-time setting selected
// (cerberus's clickhouse-go client opens its session against the
// configured CERBERUS_CH_DATABASE — `otel` by default). When the Scan
// explicitly names a Database, that literal is used directly.
//
// The regex anchors at both ends (`^…$`) so the table-name pattern
// matches only the exact members of UnionTables — a stray
// `otel_metrics_gauge_v2` (or whatever) won't accidentally pull into
// the scan. Pipe-separated alternation enumerates the member names,
// each escaped by regexQuoteMeta against accidental metacharacters
// (the OTel-CH defaults are all plain `[a-z_0-9]+` so escaping is a
// no-op in practice but the safety net is cheap).
func mergeTableFrag(db string, tables []string) Frag {
	// Assemble the anchored alternation `^(t1|t2|...)$` from the
	// regex-escaped member names.
	var pattern strings.Builder
	pattern.WriteString("^(")
	for i, name := range tables {
		if i > 0 {
			pattern.WriteByte('|')
		}
		pattern.WriteString(regexQuoteMeta(name))
	}
	pattern.WriteString(")$")

	// First merge() argument: a quoted database literal when one was
	// given, otherwise the currentDatabase() call.
	database := "currentDatabase()"
	if db != "" {
		database = "'" + escapeSingleQuotes(db) + "'"
	}

	// The rendered text is fixed once the inputs are known, so build it
	// up front and have the fragment write it in one go.
	call := "merge(" + database + ", '" + escapeSingleQuotes(pattern.String()) + "')"
	return func(b *Builder) {
		b.sb.WriteString(call)
	}
}

// escapeSingleQuotes escapes s so it can be embedded inside a
// single-quoted ClickHouse string literal.
//
// Backslashes are doubled as well as single quotes: ClickHouse treats
// `\` as an escape character inside string literals, so an unescaped
// `\\` would collapse to a single backslash and a trailing `\` would
// swallow the closing quote of the literal. Quotes are doubled (`''`),
// which introduces no new backslashes, so the two passes cannot
// interfere with each other.
func escapeSingleQuotes(s string) string {
	s = strings.ReplaceAll(s, `\`, `\\`)
	return strings.ReplaceAll(s, "'", "''")
}

// regexQuoteMeta backslash-escapes every RE2 metacharacter in s so the
// result matches s literally when used as a regex. It is hand-rolled
// rather than importing `regexp` solely for QuoteMeta: the OTel-CH
// default table names are all plain `[a-z_0-9]+`, but the override
// surface in `internal/schema/otel.go` is config-driven, so a user could
// in principle supply a name containing a metacharacter. The escaped set
// covers the RE2 surface CH's `merge()` regex argument understands.
func regexQuoteMeta(s string) string {
	var quoted strings.Builder
	quoted.Grow(len(s))
	for _, ch := range s {
		switch ch {
		case '\\', '.', '+', '*', '?', '(', ')', '|', '[', ']', '{', '}', '^', '$':
			// Metacharacter: prefix it with a backslash.
			quoted.WriteByte('\\')
		}
		quoted.WriteRune(ch)
	}
	return quoted.String()
}

func (e *emitter) emitFilter(f *chplan.Filter) error {
// Pre-flight the predicate so a chplan error surfaces here, not
// inside the Where-render callback (where the error has no path
Expand Down Expand Up @@ -230,7 +328,20 @@ func (e *emitter) emitFilter(f *chplan.Filter) error {
// promotion always activates in that case when the table shape has
// wide columns registered.
func (e *emitter) emitFilterScan(f *chplan.Filter, scan *chplan.Scan) error {
shape := tableShapeFor(scan.Table)
if err := validateScanShape(scan); err != nil {
return err
}
// For a UnionTables scan every member table shares the metric-row
// shape (the OTel-CH metrics tables all order by (ServiceName,
// MetricName, Attributes, TimeUnix) and carry the same wide columns
// — see internal/chsql/tableshape.go). Resolving against the first
// member is correct for shape lookup; the PREWHERE/WHERE split CH
// then translates uniformly to every arm of the merge() fanout.
shapeKey := scan.Table
if shapeKey == "" && len(scan.UnionTables) > 0 {
shapeKey = scan.UnionTables[0]
}
shape := tableShapeFor(shapeKey)
conjuncts := flattenAnd(f.Predicate)
conjuncts = orderedConjuncts(conjuncts, shape)

Expand Down
60 changes: 55 additions & 5 deletions internal/promql/lower.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,24 @@ func lower(expr parser.Expr, s schema.Metrics, ctx lowerCtx) (chplan.Node, error
// itself.
func lowerVectorSelector(v *parser.VectorSelector, s schema.Metrics, ctx lowerCtx) (chplan.Node, error) {
metricName := metricNameFromMatchers(v.LabelMatchers)
table := s.GaugeTable
// Resolve the candidate physical tables for this matcher.
//
// `TablesFor` admits the OTel-emitter reality that cerberus's
// original suffix heuristic (`TableFor`) missed: hostmetrics /
// sqlquery / prometheus-self ship cumulative sums under bare names
// (`system_cpu_time`, `clickhouse_event`, `otelcol_process_uptime`)
// that the Prom convention reserves for gauges. Returning the
// (Gauge, Sum) pair for unsuffixed names lets the scan resolve
// against either physical layout — the MetricName PREWHERE makes
// the empty arm cost-free.
//
// Suffixed names (`_total` / `_count` / `_sum` / `_bucket`) still
// route to a single table via TableFor; histogram-companion +
// bucket selectors below override to the histogram table without
// touching the union path. Existing fixtures stay byte-stable.
tables := []string{s.GaugeTable}
if metricName != "" {
table = s.TableFor(metricName)
tables = s.TablesFor(metricName)
}

// Classic-histogram companion routing: `<base>_count` / `<base>_sum`
Expand All @@ -158,18 +173,18 @@ func lowerVectorSelector(v *parser.VectorSelector, s schema.Metrics, ctx lowerCt
var bucketSuffixed string
var bucketLeMatchers []*labels.Matcher
if bareBucket, ok := isClassicBucketSelector(metricName, s); ok {
table = s.HistogramTable
tables = []string{s.HistogramTable}
bucketSuffixed = metricName
var scanMatchers []*labels.Matcher
scanMatchers, bucketLeMatchers = splitBucketMatchers(matchers, bareBucket)
matchers = scanMatchers
} else if bare, col, ok := s.HistogramCompanionColumn(metricName); ok && s.HistogramTable != "" {
table = s.HistogramTable
tables = []string{s.HistogramTable}
matchers = rewriteMetricName(matchers, bare)
companionValueColumn = col
}

scan := &chplan.Scan{Table: table}
scan := scanFromTables(tables)

pred := buildPredicate(matchers, s)
// Build the input subtree the LWR / range-vector pipeline consumes.
Expand Down Expand Up @@ -405,6 +420,41 @@ func rewriteMetricName(matchers []*labels.Matcher, bareName string) []*labels.Ma
return out
}

// scanFromTables returns the chplan.Scan node for a metric-matcher
// lowering. A single-element `tables` slice routes to the legacy
// `Scan{Table: …}` shape so existing fixtures and emit paths remain
// byte-stable; a multi-element slice routes to `Scan{UnionTables: …}`
// which the chsql emitter renders as a CH `merge(currentDatabase(),
// '<regex>')` table function call (see `chsql.scanTableFrag`). The
// multi-element path supports the OTel-emitter case where a bare
// (unsuffixed) metric name could be either a Gauge or a cumulative
// Sum — the suffix heuristic alone can't disambiguate. The empty
// slice is treated as a Gauge-only fallback (the caller's default
// when the matcher carries no `__name__`).
func scanFromTables(tables []string) *chplan.Scan {
	if len(tables) == 0 {
		// No candidates at all. Hand back an empty Scan instead of
		// panicking; the chsql emitter's scan validation then reports
		// "Scan has neither Table nor UnionTables set".
		return &chplan.Scan{}
	}
	if len(tables) == 1 {
		// Exactly one candidate keeps the single-table Scan shape.
		return &chplan.Scan{Table: tables[0]}
	}
	// Several candidates: give the plan node its own copy so a later
	// in-place mutation of UnionTables cannot reach the caller's slice.
	owned := make([]string, len(tables))
	copy(owned, tables)
	return &chplan.Scan{UnionTables: owned}
}

// wrapInstantLatestPerSeries adds the LWR + staleness predicates on
// top of (scan, pred) and collapses to one row per `(MetricName,
// Attributes)` series via `argMax(Value, TimeUnix)`. The output
Expand Down
48 changes: 48 additions & 0 deletions internal/schema/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,14 @@ func (m Metrics) HistogramCompanionColumn(metricName string) (bareName, valueCol
// `_bucket` suffixes are treated as cumulative (Sum table); everything else
// goes to the Gauge table. This is the same convention the Prometheus
// remote-write integration uses for OTel metrics.
//
// The single-table return is preserved for byte-stable SQL on the cases
// where the suffix heuristic is reliable. For ambiguous unsuffixed names —
// OTel hostmetrics receiver emits cumulative sums under bare names like
// `system_cpu_time` / `system_disk_io`, and the sqlquery receiver emits
// `clickhouse_event` — callers should consult [Metrics.TablesFor] which
// returns the (Gauge, Sum) pair so the matcher resolves against either
// physical layout. See TablesFor for the read-side rationale.
func (m Metrics) TableFor(metricName string) string {
for _, suf := range []string{"_count", "_total", "_sum", "_bucket"} {
if hasSuffix(metricName, suf) {
Expand All @@ -394,6 +402,46 @@ func (m Metrics) TableFor(metricName string) string {
return m.GaugeTable
}

// TablesFor returns the metric tables a PromQL `__name__` matcher may
// resolve against. Where [Metrics.TableFor] commits to a single table
// based on a Prom-naming suffix, TablesFor admits the OTel reality:
// upstream emitters (hostmetrics, sqlquery, prometheus/self) ship
// cumulative sums under bare names that the Prom convention reserves
// for gauges. The catalog endpoints (`/api/v1/series`,
// `/api/v1/label/...`) already UNION across all metric tables; the
// matcher-resolve side must do the same or the dashboard surface
// returns "Unable to fetch labels" for every system_/clickhouse_*
// metric whose data lives in `otel_metrics_sum` but whose name doesn't
// trip the suffix check.
//
// The return slice is the candidate set in stable order — Gauge first
// for byte-stable SQL on the simple-gauge case, then Sum for the
// hostmetrics-shaped fallback. The `_total` / `_count` / `_sum` /
// `_bucket` suffixed names route to a single table (Sum or, with
// HistogramCompanionColumn / isClassicBucketSelector, Histogram) and
// keep the single-element return so existing fixtures stay stable.
//
// Sum-without-suffix is the case the v0.1 heuristic missed; TablesFor
// fans the lookup across (Gauge, Sum) so the matcher finds rows
// regardless of which side actually stored them. The empty arm
// contributes zero rows under the MetricName PREWHERE — the union is a
// no-op for any metric whose name is unambiguous after the suffix
// check.
func (m Metrics) TablesFor(metricName string) []string {
	// Names carrying one of the cumulative suffixes resolve to the Sum
	// table alone.
	switch {
	case hasSuffix(metricName, "_count"),
		hasSuffix(metricName, "_total"),
		hasSuffix(metricName, "_sum"),
		hasSuffix(metricName, "_bucket"):
		return []string{m.SumTable}
	}
	// Unsuffixed name. With no distinct Sum table configured there is
	// nothing to fan out to, so only the Gauge table is a candidate.
	if m.SumTable == "" || m.SumTable == m.GaugeTable {
		return []string{m.GaugeTable}
	}
	// Otherwise the name could live in either table: Gauge first, then Sum.
	return []string{m.GaugeTable, m.SumTable}
}

// hasSuffix reports whether s ends with suffix. An empty suffix matches
// every string.
func hasSuffix(s, suffix string) bool {
	start := len(s) - len(suffix)
	if start < 0 {
		// suffix is longer than s, so it cannot match.
		return false
	}
	return s[start:] == suffix
}
Loading
Loading