Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions internal/chplan/equal_invariants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2004,3 +2004,103 @@ func TestMetricsHistogramOverTime_Equal_InnerNilBoth(t *testing.T) {
t.Errorf("both Inner nil with equal sibling fields should be Equal")
}
}

// TestUnionAll_Equal_Positive verifies that two independently built
// UnionAll trees holding the same arms in the same order compare Equal.
func TestUnionAll_Equal_Positive(t *testing.T) {
	t.Parallel()
	// newUnion builds a fresh tree on every call so the comparison is
	// structural rather than pointer identity.
	newUnion := func() *chplan.UnionAll {
		arms := []chplan.Node{
			&chplan.Scan{Table: "otel_metrics_histogram"},
			&chplan.Scan{Table: "otel_metrics_sum"},
		}
		return &chplan.UnionAll{Inputs: arms}
	}
	left, right := newUnion(), newUnion()
	if left.Equal(right) {
		return
	}
	t.Fatalf("identical UnionAll trees should be Equal")
}

// TestUnionAll_Equal_Negative_InputsLen verifies that UnionAlls with a
// different number of arms are not Equal, even when the shared prefix
// matches.
func TestUnionAll_Equal_Negative_InputsLen(t *testing.T) {
	t.Parallel()
	twoArms := &chplan.UnionAll{Inputs: []chplan.Node{
		&chplan.Scan{Table: "a"},
		&chplan.Scan{Table: "b"},
	}}
	oneArm := &chplan.UnionAll{Inputs: []chplan.Node{
		&chplan.Scan{Table: "a"},
	}}
	if equal := twoArms.Equal(oneArm); equal {
		t.Errorf("different Inputs length should not be Equal")
	}
}

// TestUnionAll_Equal_Negative_InputContent verifies that a difference in
// a single arm (here the second Scan's Table) makes two otherwise
// identical UnionAlls unequal.
func TestUnionAll_Equal_Negative_InputContent(t *testing.T) {
	t.Parallel()
	// union builds a two-arm UnionAll whose first arm is always "x".
	union := func(second string) *chplan.UnionAll {
		return &chplan.UnionAll{Inputs: []chplan.Node{
			&chplan.Scan{Table: "x"},
			&chplan.Scan{Table: second},
		}}
	}
	lhs := union("y")
	rhs := union("z")
	if lhs.Equal(rhs) {
		t.Errorf("differing per-arm subtree should not be Equal")
	}
}

// TestUnionAll_Equal_Negative_InputOrder verifies that Equal compares
// arms position by position: the same arms in swapped order are not
// Equal.
func TestUnionAll_Equal_Negative_InputOrder(t *testing.T) {
	t.Parallel()
	forward := &chplan.UnionAll{Inputs: []chplan.Node{
		&chplan.Scan{Table: "a"},
		&chplan.Scan{Table: "b"},
	}}
	reversed := &chplan.UnionAll{Inputs: []chplan.Node{
		&chplan.Scan{Table: "b"},
		&chplan.Scan{Table: "a"},
	}}
	// UNION ALL yields the same multiset whichever arm comes first, but
	// the emitted SQL text follows arm order, so Equal is deliberately
	// positional.
	if swappedEqual := forward.Equal(reversed); swappedEqual {
		t.Errorf("differing per-arm order should not be Equal under positional semantics")
	}
}

// TestUnionAll_Equal_Negative_OtherType verifies that a UnionAll never
// compares Equal to a node of a different concrete type, even when that
// node matches its only arm.
func TestUnionAll_Equal_Negative_OtherType(t *testing.T) {
	t.Parallel()
	union := &chplan.UnionAll{
		Inputs: []chplan.Node{&chplan.Scan{Table: "a"}},
	}
	scan := &chplan.Scan{Table: "a"}
	if !union.Equal(scan) {
		return
	}
	t.Errorf("UnionAll.Equal must reject non-UnionAll node types")
}

// TestUnionAll_Children_ReturnsCopy verifies that Children hands back the
// same node values as Inputs, but in a slice that does not alias
// Inputs.
func TestUnionAll_Children_ReturnsCopy(t *testing.T) {
	t.Parallel()
	first := &chplan.Scan{Table: "a"}
	union := &chplan.UnionAll{
		Inputs: []chplan.Node{first, &chplan.Scan{Table: "b"}},
	}

	got := union.Children()
	if n := len(got); n != 2 {
		t.Fatalf("Children() len = %d; want 2", n)
	}
	if got[0] != first {
		t.Errorf("Children()[0] should preserve the underlying node identity")
	}

	// Overwrite an element of the returned slice; the UnionAll's own
	// Inputs must still hold the original node afterwards.
	got[0] = &chplan.Scan{Table: "tampered"}
	if union.Inputs[0] != first {
		t.Errorf("Children() must return a copy, not the underlying slice")
	}
}
71 changes: 71 additions & 0 deletions internal/chplan/union_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package chplan

// UnionAll concatenates the row streams of two or more sibling subtrees
// without deduplication. Lowers to a `(SELECT …) UNION ALL (SELECT …)
// [UNION ALL (SELECT …)]` chain — every arm renders as a parenthesised
// SELECT so the union remains binding-safe regardless of arm shape.
//
// Used by the PromQL classic-histogram companion-suffix routing where a
// `<base>_count` / `<base>_sum` reference can resolve against EITHER:
//
//   - the OTel-CH histogram row (a single row written under the bare
//     `<base>` name with `Count` / `Sum` columns), the convention the
//     OTel exporter uses; OR
//   - the OTel-CH sum row (a row written under the suffixed
//     `<base>_count` / `<base>_sum` name), the shape hostmetrics /
//     sqlquery emitters produce for counts that aren't actually
//     histogram companions (`system_cpu_logical_count`,
//     `system_processes_count`, `system_filesystem_inodes_count`, …).
//
// The two arms produce non-overlapping rows by construction — each
// scan-side MetricName filter only admits rows from one physical layout —
// so UNION ALL is correct: no row appears in both arms, and a `DISTINCT`
// would be a wasteful no-op. The two physical tables have different
// column shapes (histogram lacks `Value`; sum lacks `Count`), so the
// existing single-`merge()` UnionTables path can't fan them; each arm
// owns its own Project that synthesises the canonical Sample-row
// quadruple (MetricName, Attributes, TimeUnix, Value).
//
// Every Inputs element MUST project the same output column shape
// — the chsql emitter relies on positional column matching across
// UNION ALL arms (ClickHouse's behaviour). The PromQL lowering
// guarantees this by feeding each arm through the same canonical
// Sample-row Project.
type UnionAll struct {
	// Inputs lists the per-arm subtrees in stable left-to-right order.
	// An empty Inputs is an invariant violation: the chsql emitter
	// (emitUnionAll) rejects it with ErrUnsupported. A single-arm
	// UnionAll is degenerate but is NOT rejected — the emitter renders
	// it as that one arm's subquery with no `UNION ALL` keyword — so
	// the lowering is responsible for always supplying two or more
	// arms.
	Inputs []Node
}

// planNode is a marker method with an empty body; it performs no work.
// NOTE(review): presumably it exists so *UnionAll satisfies an
// unexported method of the Node interface — confirm against the Node
// declaration.
func (*UnionAll) planNode() {}

// Children returns the arms of the union in the same left-to-right order
// as Inputs. The result is a freshly allocated slice holding the same
// Node values, so callers may reorder or overwrite its elements without
// affecting u.Inputs. An empty Inputs yields a non-nil, zero-length
// slice.
func (u *UnionAll) Children() []Node {
	arms := make([]Node, 0, len(u.Inputs))
	arms = append(arms, u.Inputs...)
	return arms
}

// Equal reports whether other is a *UnionAll whose arms are pairwise
// Equal to u's arms, compared position by position.
//
// Order is significant: `(A) UNION ALL (B)` and `(B) UNION ALL (A)`
// produce the same multiset, but the emitted SQL byte-stream differs, so
// the comparison is positional rather than set-based.
//
// Nil handling (never panics on nil):
//   - other of any concrete type except *UnionAll (including a nil
//     interface) is not Equal;
//   - a nil *UnionAll is Equal only to another nil *UnionAll;
//   - a nil arm is Equal only to a nil arm at the same position.
func (u *UnionAll) Equal(other Node) bool {
	o, ok := other.(*UnionAll)
	if !ok {
		return false
	}
	// A typed-nil *UnionAll passes the assertion above; guard it (and a
	// nil receiver) before touching Inputs.
	if u == nil || o == nil {
		return u == o
	}
	if len(u.Inputs) != len(o.Inputs) {
		return false
	}
	for i, mine := range u.Inputs {
		theirs := o.Inputs[i]
		switch {
		case mine == nil && theirs == nil:
			// Both arms absent at this position: treat as matching.
			continue
		case mine == nil || theirs == nil:
			// Exactly one arm absent; calling Equal on a nil interface
			// would panic, so answer directly.
			return false
		case !mine.Equal(theirs):
			return false
		}
	}
	return true
}
2 changes: 2 additions & 0 deletions internal/chsql/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ func (e *emitter) emitNode(n chplan.Node) error {
return e.emitCrossJoin(v)
case *chplan.SetOperation:
return e.emitSetOperation(v)
case *chplan.UnionAll:
return e.emitUnionAll(v)
default:
return fmt.Errorf("%w: node %T", ErrUnsupported, n)
}
Expand Down
32 changes: 32 additions & 0 deletions internal/chsql/emit_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,38 @@ func (e *emitter) emitStepGrid(g *chplan.StepGrid) error {
// `_cerb_n` on the right; the outer Filter reads both by bare name
// (no collision), so the L/R aliases are inert beyond satisfying
// the parser invariant.
// emitUnionAll renders an N-way UNION ALL over u.Inputs into the
// emitter's buffer.
//
// Arms are written in their stored left-to-right order, each through
// emitSubquery, with the literal separator " UNION ALL " between
// consecutive arms. emitSubquery is expected to wrap each arm in
// parentheses, giving the `(SELECT …) UNION ALL (SELECT …)` shape —
// confirm against its definition.
//
// Edge cases:
//   - zero inputs is a programmer error and returns an error wrapping
//     ErrUnsupported without writing anything;
//   - a single input is emitted as just that arm's subquery, with no
//     `UNION ALL` keyword.
//
// The first error returned by emitSubquery aborts the emit and is
// returned unchanged; anything already written to the buffer stays
// there.
//
// Used by the PromQL classic-histogram companion-suffix routing to fan a
// `_count` / `_sum` selector across the histogram + sum tables. See
// chplan.UnionAll for the structural contract.
func (e *emitter) emitUnionAll(u *chplan.UnionAll) error {
	arms := u.Inputs
	if len(arms) == 0 {
		return fmt.Errorf("%w: UnionAll has no inputs", ErrUnsupported)
	}
	// The first arm carries no separator; every later arm is preceded
	// by one.
	if err := e.emitSubquery(arms[0]); err != nil {
		return err
	}
	for _, arm := range arms[1:] {
		e.b.WriteString(" UNION ALL ")
		if err := e.emitSubquery(arm); err != nil {
			return err
		}
	}
	return nil
}

func (e *emitter) emitCrossJoin(j *chplan.CrossJoin) error {
leftSub, err := e.subqueryFrag(j.Left)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions internal/optimizer/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,26 @@ func rewriteChildren(n chplan.Node, fn func(chplan.Node) (chplan.Node, bool)) (c
cp.KExpr = newKExpr
}
return &cp, true
case *chplan.UnionAll:
// Recurse into each arm so existing optimizer rules
// (constant-fold, PREWHERE promotion, etc.) can rewrite the
// per-arm Project(Filter(Scan)) subtrees the PromQL
// classic-histogram companion-suffix lowering emits.
newInputs := make([]chplan.Node, len(v.Inputs))
changed := false
for i, in := range v.Inputs {
newIn, ch := fn(in)
if ch {
changed = true
}
newInputs[i] = newIn
}
if !changed {
return v, false
}
cp := *v
cp.Inputs = newInputs
return &cp, true
}
return n, false
}
Loading
Loading