
feat(qwp): add QWP WebSocket transport (store-and-forward ingest, typed errors, failover, query client)#62

Open
mtopolnik wants to merge 243 commits into main from mt_qwp-egress


@mtopolnik mtopolnik commented Apr 24, 2026

Contributor

Summary

Adds the QWP WebSocket transport to the Go client end to end. QWP is
QuestDB's binary columnar wire protocol — distinct framing and codecs
from ILP, and the only transport that exposes the full QuestDB type
system (int8/16/32, float32, char, date, nanosecond timestamps, uuid,
varchar, geohash, int64 arrays, fixed-width decimals).

The scope is the whole transport, both directions:

  • Ingest rebuilt on a cursor-engine architecture, with an opt-in
    disk-backed store-and-forward layer, a typed SenderError
    API, and multi-host failover.
  • Query/exec (QwpQueryClient) — streaming columnar result batches
    back from the server, with transparent reconnect-and-replay.

Plus README + CLAUDE.md documentation for the whole transport and
runnable examples.

Note: scope grew well past the original "egress only" framing — hence
the retitle from feat(ilp): add QWP egress over WebSocket. QWP is not
a version of ILP, so the scope is qwp, not ilp.


Ingest: cursor-engine architecture

The previous async sender (qwp_sender_async.go) is removed and
replaced by a cursor engine + dedicated send-loop goroutine
(qwp_sender_cursor.go, qwp_sf_*.go). All QWP wire I/O — memory-
backed and disk-backed — now goes through one path:

  • The producer encodes a batch into the cursor engine via a blocking
    append; the send-loop goroutine drains it to the WebSocket, parses
    ACKs, advances the acked FSN, and owns reconnect + replay.
  • Cursor frames are self-sufficient — full schema definitions plus
    the full symbol dictionary from id 0 on every flush. This is what
    makes reconnect/replay/orphan-adoption safe across a fresh server
    connection.
  • WithInFlightWindow(n) / in_flight_window=n is retained but a
    no-op: backpressure is now governed by the engine's segment ring
    and the append deadline. Connect strings carrying it still parse.
  • Flush semantics: Flush, FlushAndGetSequence, and auto-flush all
    publish into the cursor engine and return without waiting for the
    server ACK (Java spec, design/qwp-cursor-durability.md decision #1:
    "flush() never waits for ACK; ACKs are async"). Durability is
    anchored in local persistence (in-RAM segment for memory mode,
    on-disk for SF); the send loop delivers and replays in the background.
    FlushAndGetSequence returns the published FSN (the upper bound of
    any SenderError.ToFsn for that batch); pair it with AwaitAckedFsn,
    the only API that blocks on server acknowledgement, to confirm
    server-side commit.
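
To make the published-versus-acked distinction concrete, here is a minimal sketch. It is not the client's code: fsnTracker and its methods are hypothetical stand-ins for the cursor engine's bookkeeping, and the real AwaitAckedFsn signature is not shown in this description.

```go
package main

import (
	"fmt"
	"sync"
)

// fsnTracker is a hypothetical model of the two sequence numbers described
// above; it is not a type from the client.
type fsnTracker struct {
	mu        sync.Mutex
	cond      *sync.Cond
	published int64 // highest FSN handed to the engine by a flush
	acked     int64 // highest FSN the server has acknowledged
}

func newFsnTracker() *fsnTracker {
	tr := &fsnTracker{published: -1, acked: -1}
	tr.cond = sync.NewCond(&tr.mu)
	return tr
}

// publish models a flush: it assigns the next FSN and returns immediately,
// without waiting for any server response.
func (tr *fsnTracker) publish() int64 {
	tr.mu.Lock()
	defer tr.mu.Unlock()
	tr.published++
	return tr.published
}

// ack models the send loop processing a cumulative server ACK.
func (tr *fsnTracker) ack(fsn int64) {
	tr.mu.Lock()
	if fsn > tr.acked {
		tr.acked = fsn
	}
	tr.mu.Unlock()
	tr.cond.Broadcast()
}

// awaitAcked is the only call in this sketch that blocks on the server.
func (tr *fsnTracker) awaitAcked(fsn int64) {
	tr.mu.Lock()
	defer tr.mu.Unlock()
	for tr.acked < fsn {
		tr.cond.Wait()
	}
}

func main() {
	tr := newFsnTracker()
	fsn := tr.publish() // returns before any ACK exists
	go tr.ack(fsn)      // the background send loop delivers and gets the ACK
	tr.awaitAcked(fsn)  // caller opts in to waiting for server-side commit
	fmt.Println("acked through FSN", fsn)
}
```

A production version would also need a deadline or context on the wait; that is left out to keep the sketch short.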

BenchmarkQwpSenderSteadyState is still 0 allocs/op on the
Table→Symbol→Column→At hot path (pinned by a test).

Store-and-forward (SF) — disk-backed durable buffering

Opt-in via sf_dir on a ws:: / wss:: connection. Outgoing batches
are persisted to mmap'd segment files before they go out on the wire; the
send loop replays from disk across transient disconnects and process
restarts. Brief outages are invisible to user code; an unrecoverable
failure surfaces on the next At / AtNow / Flush.

  • Slot at <sf_dir>/<sender_id>/, guarded by an advisory exclusive
    flock (.lock + .lock.pid sidecar) so two senders can't share a
    slot; released on process exit.
  • Crash recovery: on restart with the same sf_dir + sender_id,
    opens existing segments, validates per-frame CRC32C, recovers a torn
    tail at the last good frame, and resumes from a persisted
    ack-watermark.
  • Pre-allocated disk blocks via per-platform fallocate
    (darwin / linux / other-unix) with a Windows file backend.
  • Orphan-slot adoption (drain_orphans=on): scans sibling slots
    under sf_dir, adopts any holding unacked data on a separate
    connection via background drainers (capped by max_background_drainers,
    visible through QwpSender.BackgroundDrainers()), and drops a
    .failed sentinel if a drainer can't make progress.
  • sf_dir unset = the same engine, memory-backed segments only.
  • Full fsync-durability mode (sf_durability=flush/append,
    request_durable_ack=on) is parsed and validated but deferred:
    these keys currently report "not yet supported"; the ack-watermark
    groundwork is in place.
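
The crash-recovery walk (validate per-frame CRC32C, stop at the last good frame) can be sketched as follows. The frame layout used here, a 4-byte little-endian payload length followed by a 4-byte CRC32C and the payload, is an assumption made for illustration; the actual on-disk segment format is not given in this description.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli) // CRC32C polynomial

// headerSize covers the hypothetical [length u32][crc32c u32] frame header.
const headerSize = 8

// appendFrame writes one frame in the assumed layout.
func appendFrame(seg, payload []byte) []byte {
	var hdr [headerSize]byte
	binary.LittleEndian.PutUint32(hdr[0:4], uint32(len(payload)))
	binary.LittleEndian.PutUint32(hdr[4:8], crc32.Checksum(payload, castagnoli))
	seg = append(seg, hdr[:]...)
	return append(seg, payload...)
}

// recoverSegment walks frames from the start and returns how many are intact
// plus the byte offset just past the last good one. Everything after that
// offset is treated as a torn tail.
func recoverSegment(seg []byte) (frames, goodEnd int) {
	off := 0
	for len(seg)-off >= headerSize {
		n := uint64(binary.LittleEndian.Uint32(seg[off:]))
		want := binary.LittleEndian.Uint32(seg[off+4:])
		start := off + headerSize
		// A zero length is treated as the end of data, so a zero-filled
		// pre-allocated tail is not mistaken for a run of empty frames.
		if n == 0 || n > uint64(len(seg)-start) {
			break
		}
		end := start + int(n)
		if crc32.Checksum(seg[start:end], castagnoli) != want {
			break
		}
		off = end
		frames++
	}
	return frames, off
}

func main() {
	seg := appendFrame(nil, []byte("batch-1"))
	seg = appendFrame(seg, []byte("batch-2"))
	seg = append(seg, 0x07, 0x00) // simulate a crash in the middle of a header
	frames, end := recoverSegment(seg)
	fmt.Printf("recovered %d frames; truncate at byte %d of %d\n", frames, end, len(seg))
}
```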

Typed SenderError API

QWP server rejections surface as *SenderError (sender_error.go,
sender_error_handler.go) carrying a stable Category
(SCHEMA_MISMATCH, PARSE_ERROR, INTERNAL_ERROR, SECURITY_ERROR,
WRITE_ERROR, PROTOCOL_VIOLATION, UNKNOWN), the server message, the
AppliedPolicy, and a [FromFsn, ToFsn] span (join against
FlushAndGetSequence to pinpoint the rejected rows).

  • Two delivery paths, same payload: an async callback via
    WithErrorHandler (dead-letter for DROP_AND_CONTINUE), and the
    producer-side typed error via errors.As after Flush /
    FlushAndGetSequence (after a HALT).
  • Each category resolves to a Policy: HALT (latch; the sender does not
    resume, and close + rebuild is the supported recovery, matching Java) or
    DROP_AND_CONTINUE (drop the rejected span, keep going, recover via
    the async handler).
  • Resolution precedence (highest first): WithErrorPolicyResolver →
    WithErrorPolicy(category, policy) → connect-string
    on_<category>_error → on_server_error → spec defaults.
    PROTOCOL_VIOLATION and UNKNOWN are always HALT and not
    user-configurable.
  • New options: WithErrorHandler, WithErrorPolicy,
    WithErrorPolicyResolver, WithErrorInboxCapacity. Connect-string
    equivalents: on_server_error (halt/drop/auto),
    on_{schema,parse,internal,security,write}_error (halt/drop),
    error_inbox_capacity.
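
The precedence chain above can be read as a simple first-match lookup. The sketch below is illustrative only: the types, field names, and the "unset means no opinion" convention are assumptions, and the spec defaults are passed in rather than reproduced because this description does not list them.

```go
package main

import "fmt"

// Category and Policy are simplified stand-ins for the client's types.
type Category string
type Policy int

const (
	PolicyUnset Policy = iota // hypothetical "no opinion" value
	PolicyHalt
	PolicyDropAndContinue
)

// resolverConfig holds one slot per precedence level, highest first.
type resolverConfig struct {
	resolver        func(Category) Policy // WithErrorPolicyResolver analogue
	perCategory     map[Category]Policy   // WithErrorPolicy analogue
	confPerCategory map[Category]Policy   // on_<category>_error analogue
	confGlobal      Policy                // on_server_error; unset models "auto"
	specDefaults    map[Category]Policy   // supplied by the caller
}

func (c *resolverConfig) resolve(cat Category) Policy {
	// These two are always HALT and cannot be overridden.
	if cat == "PROTOCOL_VIOLATION" || cat == "UNKNOWN" {
		return PolicyHalt
	}
	if c.resolver != nil {
		if p := c.resolver(cat); p != PolicyUnset {
			return p
		}
	}
	if p := c.perCategory[cat]; p != PolicyUnset {
		return p
	}
	if p := c.confPerCategory[cat]; p != PolicyUnset {
		return p
	}
	if c.confGlobal != PolicyUnset {
		return c.confGlobal
	}
	if p := c.specDefaults[cat]; p != PolicyUnset {
		return p
	}
	return PolicyHalt // conservative fallback chosen for this sketch
}

func main() {
	cfg := &resolverConfig{
		perCategory:     map[Category]Policy{"SCHEMA_MISMATCH": PolicyDropAndContinue},
		confPerCategory: map[Category]Policy{"SCHEMA_MISMATCH": PolicyHalt},
	}
	// The programmatic option outranks the connect-string key.
	fmt.Println(cfg.resolve("SCHEMA_MISMATCH") == PolicyDropAndContinue)
}
```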

Multi-host failover & SERVER_INFO (v2)

Ports the v2 egress feature set from the Java reference client; the Go
client advertises qwpMaxSupportedVersion=2 and participates in the v2
control plane. Failover now covers ingest as well as query.

  • SERVER_INFO frame (0x18) decoded into a public QwpServerInfo
    (role, epoch, capabilities, server_wall_ns, cluster_id, node_id).
    Read inside connect() after the upgrade when negotiated version ≥ 2
    and the caller opted in (serverInfoTimeout > 0); ingest senders
    default opted-out so v1 servers keep working unchanged.
  • Multi-endpoint addr= accepts a comma-separated list (or repeated
    addr=). Host tracker (qwp_host_tracker.go) walks endpoints in
    priority order — no shuffle/load-balance — with cancellable
    full-jitter exponential backoff. target (any/primary/replica)
    filters by cluster role; zone is an opaque locality hint (effective
    on the query side; accepted-but-inert for ingest).
  • Reconnect budget knobs: reconnect_max_duration_millis,
    reconnect_initial_backoff_millis, reconnect_max_backoff_millis,
    initial_connect_retry (off / on|sync / async),
    close_flush_timeout_millis. Options: WithReconnectPolicy,
    WithInitialConnectRetry, WithInitialConnectMode,
    WithCloseFlushTimeout, WithSfDir, WithSenderId, WithSfMaxBytes,
    WithSfMaxTotalBytes.
  • Transparent reconnect-and-replay for SELECTs: the dying generation
    is torn down on transport-terminal events, endpoints are walked from
    the next index with the same role filter, the new generation is
    published atomically and resubmitted with a fresh request_id.
    Batches() yields a non-fatal *QwpFailoverReset between
    generations — consumer pattern: errors.As(err, &reset); acc.Discard(); continue.
  • Exec replay is opt-in (WithQwpQueryReplayExec(true)) so
    non-idempotent INSERT/UPDATE/DELETE/DDL don't double-execute.
  • Typed errors: QwpRoleMismatchError (with SawV1Mismatch,
    LastObserved *QwpServerInfo, Unwrap), QwpFailoverExhaustedError,
    QwpFailoverReset. All transport-class faults latch ioErr so a
    poisoned connection can't be reused.
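
As a rough illustration of "cancellable full-jitter exponential backoff", the sketch below draws each sleep uniformly from zero up to a doubling ceiling and aborts the sleep when the context is cancelled. The function names, the endpoint list, and the exact walk order are hypothetical; the host tracker's real logic lives in qwp_host_tracker.go and may differ.

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// backoffCeiling returns min(limit, initial*2^attempt), guarding overflow.
func backoffCeiling(initial, limit time.Duration, attempt int) time.Duration {
	if initial <= 0 || limit <= 0 {
		return 0
	}
	d := initial
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
		if d <= 0 { // doubled past the int64 range
			return limit
		}
	}
	if d > limit {
		d = limit
	}
	return d
}

// fullJitter picks a uniform delay in [0, ceiling). randN is injected so the
// caller controls the random source.
func fullJitter(ceiling time.Duration, randN func(int64) int64) time.Duration {
	if ceiling <= 0 {
		return 0
	}
	return time.Duration(randN(int64(ceiling)))
}

// sleepCtx sleeps for d but returns early with ctx.Err() on cancellation.
func sleepCtx(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

func main() {
	// The timeout plays the role of an overall reconnect budget.
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	endpoints := []string{"primary:9000", "replica:9000"} // hypothetical hosts
	for attempt := 0; ; attempt++ {
		host := endpoints[attempt%len(endpoints)] // fixed priority order, no shuffle
		ceiling := backoffCeiling(10*time.Millisecond, 80*time.Millisecond, attempt)
		wait := fullJitter(ceiling, rand.Int63n)
		fmt.Printf("attempt %d: %s unreachable, sleeping %v\n", attempt, host, wait)
		if err := sleepCtx(ctx, wait); err != nil {
			fmt.Println("budget exhausted:", err)
			return
		}
	}
}
```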

Query/exec: QwpQueryClient

Query side over /read/v1 — streaming columnar result batches back from
the server.

  • NewQwpQueryClient(ctx, opts...) / QwpQueryClientFromConf.
    Options cover address/endpoints, auth (raw header, basic, bearer),
    compression (raw|zstd|auto), buffer-pool depth, initial credit,
    max rows per batch, plus the failover knobs above.
  • Query(ctx, sql, opts...) (*QwpQuery, error) streams
    *QwpResultBatch via a Go 1.23 range-over-func iterator. Typed column
    accessors plus bulk Range* readers (no-null columns lower to a
    single memmove); batches are deep-copyable for use beyond the
    iteration step. Exec for non-SELECT.
  • Per-call typed bind parameters via WithQueryBinds(QwpBindFunc):
    $1, $2, …, ascending-index setters, latched error surfaced through
    the result, capped at the spec's 1024 binds.
  • Cancellation: per-query Cancel + client Close, both cooperating
    with mid-flight batches and already-expired caller ctx (independent
    5s drain, matching Java's shutdownJoinMs). QwpQueryClient is not
    safe for concurrent Query/Exec; Cancel/Close are.

Wire-protocol additions

  • Egress message kinds: QUERY_REQUEST, RESULT_BATCH, RESULT_END,
    QUERY_ERROR, CANCEL, CREDIT, EXEC_DONE; connection-scoped
    CACHE_RESET (0x17) for server-driven symbol-dict / schema-cache
    eviction; SERVER_INFO (0x18).
  • FLAG_ZSTD for zstd-compressed batches; preference negotiated via the
    X-QWP-Accept-Encoding upgrade header.
  • Gorilla-compressed timestamp columns decoded on the egress side.
  • Decoder-only BINARY and IPv4 so arbitrary SELECTs (pg_catalog
    views etc.) decode cleanly. New status codes CANCELLED,
    LIMIT_EXCEEDED.
  • Separate endpoints: /read/v1 (query), /write/v4 (ingest);
    qwpTransport no longer hard-codes the path.

Decoder hardening & spec compliance

  • Hard caps on per-batch rows, table/column name length, array
    dimensionality, and array element count; table_count enforced.
  • VARCHAR offset validation; nDims=0 arrays rejected; Gorilla overrun
    returns a decode error instead of corrupting state.
  • Spec frame-size and version-byte checks on every inbound frame; cursor
    walk guarded against corrupt payloadLen / segment frames.
  • Outgoing SQL preflighted against the spec's 1 MiB cap.
  • Decoder errors poison the client so a half-corrupt connection can't be
    reused.

Hot-path perf

  • Inlined varint fast path (symbol-id and delta-entry loops),
    byte-iterating null-bitmap decode, memmove-based Float64Array /
    Int64Array decode, 8-byte LE Gorilla refills, preallocated layout
    slice, three further query-decoder cuts. Java QWP query benchmarks
    ported (bench/qwp-egress-read, bench/qwp-egress-read-wide).

Config-string additions

conf_parse.go (single source of truth) gains: sf_dir, sender_id,
sf_max_bytes, sf_max_total_bytes, sf_durability,
sf_append_deadline_millis, reconnect_max_duration_millis,
reconnect_initial_backoff_millis, reconnect_max_backoff_millis,
initial_connect_retry, close_flush_timeout_millis, drain_orphans,
max_background_drainers, on_server_error,
on_{schema,parse,internal,security,write}_error, error_inbox_capacity,
request_durable_ack, durable_ack_keepalive_interval_millis, target, zone,
plus max_name_len (exposes WithFileNameLimit for parity with the Java and
Rust clients, both of which require >= 16; the Go client now matches).
Duplicate keys are now rejected for parser parity with Rust.
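
A minimal sketch of duplicate-key rejection in a schema::key=value; style connect string follows. It is not conf_parse.go: parseConf is a hypothetical helper, it ignores any escaping rules the real parser has, and exempting addr from the duplicate check is an assumption based on the repeated addr= support described earlier.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseConf splits "schema::k=v;k=v;" into a schema and a key -> values map.
// Any key other than addr may appear at most once.
func parseConf(conf string) (schema string, kv map[string][]string, err error) {
	schema, rest, ok := strings.Cut(conf, "::")
	if !ok || schema == "" {
		return "", nil, errors.New("missing schema prefix")
	}
	kv = make(map[string][]string)
	for _, pair := range strings.Split(rest, ";") {
		if pair == "" {
			continue // tolerate the trailing ';'
		}
		key, val, ok := strings.Cut(pair, "=")
		if !ok || key == "" {
			return "", nil, fmt.Errorf("malformed pair %q", pair)
		}
		if key == "addr" {
			// addr accepts a comma-separated list and may be repeated.
			kv[key] = append(kv[key], strings.Split(val, ",")...)
			continue
		}
		if _, dup := kv[key]; dup {
			return "", nil, fmt.Errorf("duplicate key %q", key)
		}
		kv[key] = []string{val}
	}
	return schema, kv, nil
}

func main() {
	_, kv, err := parseConf("ws::addr=a:9000,b:9000;sf_dir=/tmp/sf;")
	fmt.Println(kv["addr"], kv["sf_dir"], err)

	_, _, err = parseConf("ws::sf_dir=/x;sf_dir=/y;")
	fmt.Println(err)
}
```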

Docs, examples, CI

  • README: new top-level QWP section (Quickstart switch to ws/wss,
    QWP-only column types via QwpSender, flush/backpressure, auth, error
    handling, multi-host failover, store-and-forward) and a "Querying with
    QwpQueryClient" subsection.
  • CLAUDE.md rewritten to describe the cursor-engine + SF architecture,
    error-handling policy precedence, and the testing invariants.
  • New examples (rendered by examples.manifest.yaml on questdb.io):
    examples/qwp/basic, examples/qwp/sf, examples/qwp/basic-query.
  • CI: new binary-check.yml (guards against committed binaries), Go
    test runs now use -race, Go-version matrix actually tests 1.23.

Benchmarks

This PR contributes 13 go test benchmarks plus 2 standalone
Java-ported bench programs.

Here's how to run the basic benchmark (medium-wide schema, 100,000,000 rows):

$ cd bench/qwp-egress-read
$ go run . -rows 100000000

Here are the results from a local run (both client and server on the same Mac M1 laptop):

Protocol                 time(ms)     rows/sec      MiB/sec
--------                 --------     --------      -------
QWP egress (WS)              2458     40680038      1003.18
PostgreSQL wire             19219      5203153       296.38
HTTP /exec JSON             30706      3256617       211.89

Egress query latency — live server (qwp_egress_bench_test.go,
skipped when no local server is reachable; Go counterparts of the Java
JMH latency benchmarks, reporting p50/p90/p99/p99.9 via
b.ReportMetric):

  • BenchmarkQwpEgressLatency — single query round-trip wall time
    (client opened once, reused). Default SELECT 1 is the parse +
    protocol round-trip floor; QDB_BENCH_SQL folds in storage/cursor
    cost.
  • BenchmarkQwpEgressBindLatency — same round-trip with a
    bind-variable query, isolating bind encode/decode + select-cache
    lookup against the literal SELECT 1.

Decoder hot path — in-process (qwp_bench_test.go):

  • BenchmarkQwpGorillaDecode — Gorilla delta-of-delta timestamp-column
    decode throughput; regression gate for the 8-byte LE refill in
    qwpBitReader. Sub-cases ConstantDelta, SmallJitter, WideJitter.

Result-batch accessor micro-benchmarks — in-process
(qwp_query_batch_perf_test.go) — measure the QwpColumnBatch /
QwpColumn decode-side accessors across access patterns:

  • BenchmarkBatchInt64PerCell / BenchmarkColumnInt64PerCell:
    per-cell via the (col,row) batch surface vs. a hoisted column
    handle.
  • BenchmarkInt64RangeNoNulls / BenchmarkInt64RangeWithNulls — bulk
    Int64Range fast path (no nulls) vs. the per-row scalar loop (nulls).
  • BenchmarkBatchMultiColRowMajor / BenchmarkColumnMultiColRowMajor
    — wide (16-column) row-major scan, batch surface vs. hoisted handles.
  • BenchmarkBatchColumnMajor / BenchmarkColumnMajorHandle:
    column-major scan, batch surface vs. per-column handle.
  • BenchmarkColumnMajorRange / BenchmarkColumnMajorRangePure:
    column-major Int64Range with vs. without a per-row consumer (the
    bulk-read upper bound).

Standalone Java-ported bench programs (each its own go.mod, run as
main; build-gated in CI) — faithful 1:1 ports of the Java
application-style egress benchmarks; each compares QWP egress vs.
PG-wire vs. HTTP /exec:

  • bench/qwp-egress-read — port of QuestDB's QwpEgressReadBenchmark
    (narrow 5-column row); source of the headline number above. This is
    the only incarnation of the read benchmark — there is
    intentionally no BenchmarkQwpEgressRead go test twin, which would
    merely re-measure the same QWP read path the standalone already
    covers.
  • bench/qwp-egress-read-wide — port of QwpEgressReadBenchmarkWide
    (15-column row: five extra DOUBLEs + five high-cardinality SYMBOLs).

(BenchmarkQwpSenderSteadyState / …Nulls, still pinned at 0 allocs/op
on the ingest hot path, predate this PR — they landed with the initial
QWP ingress in #60 and are preserved, not added, here.)

Test plan

Verified on this branch: Go 1.26, race detector, live QuestDB at
localhost:9000. Docker was unavailable in the verification
environment, so the testcontainers-go ILP suite (TestIntegrationSuite
— ILP over HTTP/TCP, haproxy) was not run here and should be run
where Docker is available before merge.

  • go build ./..., go vet ./..., staticcheck@v0.7.0 ./...:
    all clean
  • go test -race -count=1 ./... with -skip TestIntegrationSuite
    — the entire root package passes under the race detector (~53s).
    The live-server QWP ingest / query / error-API integration suites
    genuinely execute (not skipped) and pass.
  • TestIntegrationSuite (testcontainers-go: ILP over HTTP/TCP,
    haproxy) — requires Docker; not run in this environment, to be
    run before merge.
  • BenchmarkQwpSenderSteadyState -benchmem: 0 B/op, 0 allocs/op;
    TestQwpSenderSteadyStateZeroAllocs passes (ingest hot path stays
    allocation-free).
  • Cursor engine + SF: segment ring, allocate/fallocate, classify,
    round-walk, ack-watermark, manager trim, lock sidecar, dispatcher,
    drainer, orphan adoption, send-loop spin/Close-race/timer-leak
    regressions (qwp_sf_*_test.go)
  • Typed error API: category/policy resolution precedence, async vs
    sync delivery, HALT latching, plus live-server integration
    (sender_error_test.go, qwp_error_api_*,
    qwp_error_resilience_test.go)
  • Multi-host failover: host-tracker walk, yield-and-resume,
    max-attempts exhaustion, mid-backoff cancel, server QUERY_ERROR
    not retried, Exec replay opt-in vs default
    (qwp_failover_test.go, qwp_host_tracker_test.go)
  • Query client: streaming batches, typed binds, Cancel during
    iteration, Close during in-flight batch, ctx expiry mid-wait
  • Three ported Java QWP test contracts pass under -race:
    TestQwpEgressIOInPlaceDecodeAliasing,
    TestQwpEgressIOReleaseClosePoolRace, and the QwpConstantsTest
    group (TestQwpFlagBitPositions / TestQwpHeaderSize /
    TestQwpMaxColumnsPerTable / TestQwpIsFixedWidthType). The
    original fourth — the QwpDeltaDictRollbackTest analog,
    TestQwpSyncFlushFailureDoesNotAdvanceMaxSentSymbolId — was
    intentionally removed (commit d3a9744): self-sufficient cursor
    framing ships the full schema + symbol-dict from id 0 on every
    flush, which voids the delta-rollback premise, and the contract
    was deleted upstream on the Java side as well.

mtopolnik and others added 27 commits April 22, 2026 09:18
When the caller's context expired while QwpQuery.Batches() or
QwpQueryClient.Exec was blocked in takeEvent, the iterator yielded
(or Exec returned) without sending CANCEL or draining the remaining
events. The Batches() defer still set q.done = true, so the caller's
defer q.Close() short-circuited via its CAS guard and no CANCEL
reached the server. The dispatcher kept pulling frames for the
abandoned query, eventually filling io.events to its cap and blocking
handleResultBatch; the next c.Query on the same client then parked
on the single-slot requests channel — effectively a deadlock until
the whole client was closed.

Make the takeEvent-error path symmetrical to the !keepGoing break-out
branch: CANCEL the current request and drain to a terminal frame on
a bounded cleanup context, so the dispatcher returns to idle
regardless of q.ctx's state. The shared cleanup is factored into
(*QwpQuery).cancelAndDrainOnCleanupCtx, which Close also reuses.
Exec gets the same CANCEL + drain in its error branch.

Adds TestQwpQueryDrainAfterIteratorCtxExpiry and
TestQwpExecDrainAfterCtxExpiry, both of which fail on the previous
behavior (second query stalls on context deadline) and pass now.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The dispatcher's sendQueryRequest, sendCancel, and sendCredit all
called transport.sendMessage with context.Background(), so a
conn.Write parked on a peer that has stopped draining (TCP
zero-window, hung app) had no ctx to observe shutdown.

In most cases this was masked by coder/websocket's behavior:
cancelling the reader's Read ctx fires an AfterFunc that tears down
the underlying net.Conn, which unblocks an in-flight Write as a side
effect. That AfterFunc is only registered while Read is active, so
the protection disappears as soon as the reader returns from
conn.Read — for example after consuming a frame, while it is parked
on frameCh waiting for the dispatcher. If the dispatcher is stuck in
Write at that moment, shutdown closes shutdownCh and cancels the
reader's ctx, the reader wakes and exits, but the dispatcher's Write
stays parked; doneCh never closes and shutdown returns ctx.Err()
after the caller's timeout. The dispatcher goroutine survives past
Close (and only unwinds when transport.close() tears down the conn
directly), and the 5s cleanup drain in QwpQuery.Close / Exec waits
the full timeout instead of being preemptible.

Collapse readCtx/readCancel into ioCtx/ioCancel and plumb ioCtx
through the three sendMessage call sites. Now both the reader's Read
and the dispatcher's Writes share a single cancel signal fired by
shutdown, and either side's active I/O is enough for coder/websocket
to tear the conn down on cancel.

Add TestQwpEgressIOShutdownUnblocksStuckWrite, which builds the
failure scenario deterministically via a net.Pipe server that
upgrades, emits one valid binary WS frame, then stops reading.
Before the fix the test hits the 1s shutdown timeout; after the fix
shutdown returns in ~200 ms.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
A panic raised inside the Batches() yield body used to skip the
releaseBuffer call and the cancel+drain cleanup. With bufferPoolSize=1
this permanently starved the decode pool; with any pool size the outer
`defer q.done.Store(true)` still flipped done=true during unwinding, so
the caller's `defer q.Close()` CAS failed and the dispatcher was left
parked in receiveLoop for the in-flight query. The next Query or Exec
on the same client deadlocked on the idle dispatcher.

Wrap the yield call in an IIFE whose defer always returns the buffer to
the pool, and — if a panic is in flight — runs cancelAndDrainOnCleanupCtx
before re-panicking so the dispatcher returns to idle before control
leaves the iterator. Add a test that panics inside the yield body with
bufferPoolSize=1 and then runs a second query on the same client, which
now succeeds.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
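
The IIFE-with-defer shape described in the commit above can be sketched as follows. This is a simplified model, not the client's iterator: pool, batches, and the cleanup callback are hypothetical, with cleanup standing in for cancelAndDrainOnCleanupCtx.

```go
package main

import "fmt"

// pool is a toy fixed-size buffer pool backed by a channel.
type pool struct{ free chan []byte }

func newPool(n int) *pool {
	p := &pool{free: make(chan []byte, n)}
	for i := 0; i < n; i++ {
		p.free <- make([]byte, 8)
	}
	return p
}

// batches returns an iterator-shaped function. Each buffer is handed to
// yield inside an immediately invoked closure whose defer always returns the
// buffer to the pool, even when yield panics.
func batches(p *pool, count int, cleanup func()) func(yield func([]byte) bool) {
	return func(yield func([]byte) bool) {
		for i := 0; i < count; i++ {
			buf := <-p.free
			keepGoing := func() bool {
				defer func() {
					p.free <- buf // release on every exit path
					if r := recover(); r != nil {
						cleanup() // return the dispatcher to idle first
						panic(r)  // then let the caller's panic continue
					}
				}()
				return yield(buf)
			}()
			if !keepGoing {
				cleanup()
				return
			}
		}
	}
}

func main() {
	p := newPool(1)
	func() {
		defer func() { fmt.Println("recovered:", recover()) }()
		batches(p, 3, func() { fmt.Println("cleanup ran") })(func([]byte) bool {
			panic("consumer bug")
		})
	}()
	// The single buffer is back in the pool, so a second pass can proceed.
	seen := 0
	batches(p, 3, func() {})(func([]byte) bool { seen++; return true })
	fmt.Println("second pass saw", seen, "batches")
}
```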
The decimal scale and geohash precision bits were written onto
qwpColumnSchemaInfo, which the decoder persists in its
connection-scoped schema registry and which is aliased by every
batch that references the same schema id — including
SerializedBatch snapshots produced by CopyAll. With bufferPoolSize
> 1, the dispatcher decoding batch N+1 could write into the same
struct that a consumer goroutine was reading from batch N, tripping
-race and risking torn reads on weakly-ordered architectures.

Both values actually travel in the DATA section, not the schema
section — they are per-batch, not per-schema. Move them off
qwpColumnSchemaInfo and onto qwpColumnLayout, which is
dispatcher-exclusive until ownership transfers via the events
channel. The schema registry entries are now immutable shared
state.

Accessors (DecimalScale, GeohashPrecisionBits) now read from the
layout, CopyAll propagates the new fields to the snapshot, and
clear zeros them between reuses.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Close previously CAS'd a boolean `done` flag, which prevented double-
close by the same caller but did nothing to coordinate with a Batches
iteration running on another goroutine. Both callers ended up in
drainUntilTerminal competing for the dispatcher's single terminal
event; whichever lost blocked until its cleanup ctx expired (5 s).

Replace `done atomic.Bool` with a three-state `state atomic.Int32`
(Idle / Iterating / Done) and coordinate via CAS:

- Batches() enters via CAS(Idle→Iterating); its defer flips to Done.
- Close() claims cleanup only via CAS(Idle→Done). On failure it is a
  no-op — either an iterator is active and will run its own cancel+
  drain on exit, or the cursor is already Done.
- Cancel() now gates on state == Done instead of the old bool; still
  works during Iterating, which is its whole point.

The submit-error and closed-client paths in Query() set state
directly to Done before any goroutine can observe it, so no CAS is
needed there.

Tighten the QwpQuery doc to spell out the new contract: Close is a
no-op while a Batches iteration is in flight; use Cancel (or cancel
q.ctx) to unblock an in-flight iterator from another goroutine.

Add TestQwpQueryCloseIsNoOpWhileIterating, which parks the iterator
mid-stream and asserts a concurrent Close returns within 500 ms.
Verified to fail against the old buggy Close and pass with the fix.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
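
The Idle / Iterating / Done coordination from the commit above reduces to a few compare-and-swap transitions. The sketch below uses a hypothetical cursor type with a counter in place of the real cancel+drain work.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	stateIdle int32 = iota
	stateIterating
	stateDone
)

// cursor is a toy stand-in for a query handle.
type cursor struct {
	state    atomic.Int32
	cleanups atomic.Int32 // counts how many times cleanup ran
}

func (c *cursor) cleanup() { c.cleanups.Add(1) }

// iterate claims the cursor via CAS(Idle -> Iterating). On exit it runs the
// cleanup itself and only then flips to Done.
func (c *cursor) iterate(body func()) bool {
	if !c.state.CompareAndSwap(stateIdle, stateIterating) {
		return false
	}
	defer func() {
		c.cleanup()
		c.state.Store(stateDone)
	}()
	body()
	return true
}

// close claims cleanup only via CAS(Idle -> Done). If an iterator is active
// or the cursor is already Done, it is a no-op.
func (c *cursor) close() bool {
	if !c.state.CompareAndSwap(stateIdle, stateDone) {
		return false
	}
	c.cleanup()
	return true
}

// cancel is allowed in any state except Done, including during iteration.
func (c *cursor) cancel() bool { return c.state.Load() != stateDone }

func main() {
	c := &cursor{}
	c.iterate(func() {
		fmt.Println("close during iteration claimed cleanup:", c.close())
		fmt.Println("cancel during iteration allowed:", c.cancel())
	})
	fmt.Println("cleanups run:", c.cleanups.Load())
}
```

Because exactly one party wins each transition, the cleanup runs once no matter how Close and the iterator interleave.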
When decoder.decode failed mid-batch, the egress dispatcher only
marked the current query done and returned to the outer loop. The
next Query on the same client was accepted, submitted, and decoded
against a possibly-desynced qwpConnDict / qwpSchemaRegistry. The
delta-dict sync check catches most cases, but a mis-advanced reader
from a single out-of-range size can leave the dict accidentally in
sync at the offset level while values are wrong — silent data
corruption on a subsequent query.

Add an ingress-style terminal latch on qwpEgressIO: ioErr is set
on any decoder- or framing-level error (handleResultBatch /
ResultEnd / QueryError / ExecDone decode failures, dispatchFrame
header-peek failure, unknown msg_kind). Every subsequent
submitQuery reads the latch first and returns the stored error
synchronously, so a fresh query can never land on a desynced
decoder. Mirrors the asyncState.ioErr pattern used by the ingest
side and documented in CLAUDE.md.

Transport-read failures are unchanged — they already tear the
connection down via the reader closing frameCh.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
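
A terminal latch of the kind described in the commit above can be modelled in a few lines. The names below (ioLatch, client, submit) are hypothetical; the sketch only shows the first-error-wins behaviour and the check-before-submit ordering.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var (
	errDecode = errors.New("decode failure")
	errOther  = errors.New("later failure")
)

// ioLatch stores the first terminal error and never clears it.
type ioLatch struct{ err atomic.Pointer[error] }

// fail records err only if no error has been latched yet.
func (l *ioLatch) fail(err error) { l.err.CompareAndSwap(nil, &err) }

// check returns the latched error, or nil while the connection is healthy.
func (l *ioLatch) check() error {
	if p := l.err.Load(); p != nil {
		return *p
	}
	return nil
}

// client is a toy query client that consults the latch before every submit.
type client struct {
	latch     ioLatch
	submitted int
}

func (c *client) submit(sql string) error {
	if err := c.latch.check(); err != nil {
		return err // never hand a new query to a desynced decoder
	}
	c.submitted++
	return nil
}

func main() {
	c := &client{}
	fmt.Println(c.submit("SELECT 1")) // <nil>
	c.latch.fail(errDecode)           // the decoder hit a framing error
	c.latch.fail(errOther)            // ignored: the first error wins
	fmt.Println(c.submit("SELECT 2")) // decode failure
}
```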
Three gaps the recent review called out, paired so -race in CI catches
future regressions in the same class.

1. qwp_query_batch_test.go:
   TestQwpColumnBatchCopyAllScaleAndPrecisionAreRaceFree decodes
   frame A (scale=2, precision=20), takes a
   CopyAll snapshot, then hammers four reader goroutines against
   snapshot.DecimalScale / GeohashPrecisionBits while the main
   goroutine re-decodes frame B (scale=7, precision=40) 200× into the
   source batch. Commit 58e1915 moved scale/precisionBits off the
   connection-scoped qwpColumnSchemaInfo onto per-batch qwpColumnLayout
   — this is the concurrent write-vs-snapshot-read pattern it exists
   to prevent, and without the fix -race flags it immediately.

2. .github/workflows/build.yml: go test now runs with -race. The
   hardening commits on this branch would have had tighter feedback
   with the race detector on; turning it on now ensures future
   concurrency regressions fail CI rather than leaking into a branch.

3. qwp_query_integration_test.go: three egress negative-path tests
   against the live server.

   - TestQwpIntegrationCancelLongRunningQuery: extended to verify the
     post-cancel invariant that actually matters in production — the
     client's dispatcher returned to idle and a follow-up Query
     round-trips without stranding. Old test only checked saw>=1,
     which was a tautology on a query that completes naturally.
     Does not assert Cancel short-circuited the server (localhost
     long_sequence races past Cancel).

   - TestQwpIntegrationCtxDeadlineMidStream: query's ctx expires
     while the iterator is blocked in takeEvent. Exercises the
     takeEvent-ctx-expiry branch in Batches() and confirms
     cancelAndDrainOnCleanupCtx leaves the client usable for a
     follow-up Query.

   - TestQwpIntegrationClientCloseDuringLongQuery: another goroutine
     closes the QwpQueryClient while the iterator is mid-stream. The
     iterator must surface a transport error and exit without
     hanging; Close must return within its own bounded timeout. This
     is the closest in-band proxy for a server-initiated connection
     close.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The per-element decode loop in QwpColumnBatch.Float64Array and
Int64Array was doing a bounds-checked sub-slice, a LittleEndian.Uint64
load, a Float64frombits no-op, and a bounds-checked store on every
element. For array-heavy schemas this was the hottest path in the
per-cell API.

Replace the loop with an unsafe reinterpretation of the payload bytes
as []float64 / []int64 followed by copy, which lowers to memmove.
float64 and int64 are 8 bytes on every supported architecture and Go
stores them little-endian on all targets questdb-client supports, so
the wire layout matches the in-memory layout. The reinterpreted
source slice is only ever read by copy, never dereferenced as an
aligned 8-byte load, so the unaligned payload base pointer is safe.

Guard elems == 0 so the non-null empty-shape case (nDims=1, dim0=0)
does not panic on &l.values[base] when base == len(l.values).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
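
For readers unfamiliar with the memmove trick in the commit above, here is a minimal sketch under the same little-endian-host assumption. It is a variant, not the commit's code: it views the destination slice as bytes instead of viewing the payload as []float64, which reaches the same single copy without ever forming a float64 pointer to a possibly unaligned payload. The function names are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
	"unsafe"
)

// decodeFloat64s copies little-endian float64 values from payload into dst
// with one bulk copy and returns how many values were written.
// It is only correct on little-endian hosts.
func decodeFloat64s(dst []float64, payload []byte) int {
	n := len(payload) / 8
	if n > len(dst) {
		n = len(dst)
	}
	if n == 0 {
		return 0 // avoid taking &dst[0] on an empty slice
	}
	// View the destination as raw bytes; dst is always 8-byte aligned, and
	// the payload is only ever read as bytes.
	dstBytes := unsafe.Slice((*byte)(unsafe.Pointer(&dst[0])), n*8)
	copy(dstBytes, payload[:n*8])
	return n
}

// encodeFloat64s is the portable per-element encoder, used here to build
// test input.
func encodeFloat64s(vals []float64) []byte {
	out := make([]byte, 8*len(vals))
	for i, v := range vals {
		binary.LittleEndian.PutUint64(out[i*8:], math.Float64bits(v))
	}
	return out
}

func main() {
	payload := encodeFloat64s([]float64{1.5, -2.25, 1e300})
	dst := make([]float64, 3)
	n := decodeFloat64s(dst, payload)
	fmt.Println(n, dst)
}
```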
The qwpBitReader hot path was refilling its 64-bit accumulator one
byte at a time, paying a refill check on every call. The Gorilla DoD
decoder issues up to four single-bit prefix reads plus a 7/9/12/32-bit
signed payload per timestamp, so a long TIMESTAMP column was incurring
five or six refill checks per row.

Restructure readBits around a fast path (single shift+mask when the
accumulator already holds enough bits) and a cold readBitsSlow that
prefers an 8-byte little-endian load when the source has 8 bytes
available, falling back to a 1-byte load only at the buffer tail.
Specialise readBit to skip the n-bit machinery once the accumulator
is populated, which is the common case after the first refill. The
Java reference does effectively this; the Go port lost the
optimisation.

bytesConsumed still reflects bits actually read rather than bytes
loaded, so the speculative 8-byte refill cannot mislead the outer
reader's cursor.

Add BenchmarkQwpGorillaDecode over 4096 timestamps across three DoD
distributions as a regression gate. Measured on Apple M1 Pro:

  ConstantDelta  19289 -> 13882 ns/op  (1.39x)
  SmallJitter    76390 -> 42217 ns/op  (1.81x)
  WideJitter    143102 -> 74443 ns/op  (1.92x)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
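
The fast-path/slow-path split from the commit above can be sketched like this. The reader below is a simplified illustration, not qwpBitReader: it assumes least-significant-bit-first ordering and widths of 1 to 32 bits, both of which are assumptions rather than facts from the wire spec.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

type bitReader struct {
	buf []byte
	pos int    // index of the next byte not yet loaded into acc
	acc uint64 // loaded but unread bits, least significant first
	n   uint   // number of valid bits in acc
}

// readBits returns the next k bits (1 <= k <= 32). The common case is a
// single mask and shift; refill is the cold path.
func (r *bitReader) readBits(k uint) (uint64, bool) {
	if r.n < k && !r.refill(k) {
		return 0, false
	}
	v := r.acc & (uint64(1)<<k - 1)
	r.acc >>= k
	r.n -= k
	return v, true
}

// refill tops up the accumulator. With 8 source bytes available it does one
// little-endian load and keeps as many whole bytes as fit; near the end of
// the buffer it falls back to byte-at-a-time loads.
func (r *bitReader) refill(k uint) bool {
	if len(r.buf)-r.pos >= 8 {
		take := (64 - r.n) / 8 // whole bytes that fit above the unread bits
		w := binary.LittleEndian.Uint64(r.buf[r.pos:])
		if take < 8 {
			w &= uint64(1)<<(take*8) - 1 // drop bytes we are not claiming
		}
		r.acc |= w << r.n
		r.pos += int(take)
		r.n += take * 8
		return true
	}
	for r.n < k {
		if r.pos >= len(r.buf) {
			return false
		}
		r.acc |= uint64(r.buf[r.pos]) << r.n
		r.pos++
		r.n += 8
	}
	return true
}

// bytesConsumed reports bytes covered by bits actually read, not bytes
// speculatively loaded.
func (r *bitReader) bytesConsumed() int {
	return (r.pos*8 - int(r.n) + 7) / 8
}

func main() {
	r := &bitReader{buf: []byte{0xA5, 0x3C, 0xFF, 0x00, 0x81, 0x7E, 0x12, 0x34, 0x56}}
	flag, _ := r.readBits(1)
	field, _ := r.readBits(7)
	fmt.Println(flag, field, r.bytesConsumed())
}
```

Calling readBits with k bounded at 32 guarantees that the wide refill always leaves enough bits for the pending read, since at least four whole bytes fit whenever fewer than 32 bits remain.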
parseSymbol reads one varint per non-null row and appendDelta reads one
per delta dictionary entry. Both went through readVarintInt63 →
readVarint → qwpReadVarint, a call chain too large to inline; for
symbol-heavy result sets and bursts of new symbols this dominated the
hot loop even though the common case is a single byte with the high bit
clear.

Hoist the underlying buffer and position into locals and inline the
single-byte fast path directly in each loop. Multi-byte varints, EOF,
and overflow fall back to the existing readVarintInt63 (which still
allocates *qwpDecodeError on the cold path), keeping wrapped error
identity. In appendDelta the per-entry slice bound check is also
inlined so the success path stays loop-local.

While here, drop two dead branches that readVarintInt63 already
guarantees: id64 < 0 in parseSymbol and entryLen < 0 in appendDelta —
the int63 cast cannot produce a negative value. The dictSize range
check is reframed as uint64 ≥ uint64(dictSize) so it stays correct
on platforms where int is 32 bits.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
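
The single-byte fast path from the commit above can be sketched as follows. This is an illustration, not the decoder's code: it assumes QWP varints use the same unsigned LEB128 encoding as Go's encoding/binary, and readVarints with its error value is a hypothetical helper.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errBadVarint = errors.New("truncated or overflowing varint")

// readVarints decodes len(out) varints from buf. The loop keeps the position
// in a local and handles the one-byte case inline; anything else falls back
// to the general decoder.
func readVarints(buf []byte, out []uint64) (pos int, err error) {
	for i := range out {
		// Fast path: a byte with the high bit clear is the whole value.
		if pos < len(buf) && buf[pos] < 0x80 {
			out[i] = uint64(buf[pos])
			pos++
			continue
		}
		// Cold path: multi-byte value, end of input, or overflow.
		v, size := binary.Uvarint(buf[pos:])
		if size <= 0 {
			return pos, errBadVarint
		}
		out[i] = v
		pos += size
	}
	return pos, nil
}

func main() {
	// 0x05 -> 5, 0xAC 0x02 -> 300, 0x7F -> 127
	out := make([]uint64, 3)
	pos, err := readVarints([]byte{0x05, 0xAC, 0x02, 0x7F}, out)
	fmt.Println(out, pos, err)
}
```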
parseNullSection used to read `bitmap[i>>3]` once per row, repeating
the load and bounds check for every row in the same byte. The loop
runs once per nullable column, so the overhead compounds on wide-row
batches.

Walk the bitmap one byte at a time instead. Each byte covers eight
rows, so the load and bounds check happen once per eight iterations.
Fast paths for 0x00 (all non-null) and 0xFF (all null) bytes skip the
inner bit loop entirely with straight-line stores, which are the
common cases for mostly-dense or mostly-sparse columns. A short tail
loop handles the final `rowCount & 7` rows with the same single-load
pattern.

Semantics are unchanged: a set bit still marks a null row (-1 in
nonNullIdx) and a clear bit still assigns the next dense index.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
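
The byte-at-a-time walk from the commit above can be sketched as follows. The function is a hypothetical reconstruction, and the bit order (row i lives in bit i&7 of byte i>>3, least significant first) is an assumption carried over from the bitmap[i>>3] indexing mentioned in the message.

```go
package main

import "fmt"

// fillNonNullIdx expands a null bitmap into out: -1 for a null row, else the
// row's dense index among non-null rows. It returns the non-null count.
// out must hold rowCount entries and bitmap at least (rowCount+7)/8 bytes.
func fillNonNullIdx(bitmap []byte, rowCount int, out []int32) (nonNull int32) {
	full := rowCount >> 3
	row := 0
	for _, b := range bitmap[:full] { // one load and bounds check per 8 rows
		switch b {
		case 0x00: // all eight rows non-null
			for i := 0; i < 8; i++ {
				out[row+i] = nonNull
				nonNull++
			}
		case 0xFF: // all eight rows null
			for i := 0; i < 8; i++ {
				out[row+i] = -1
			}
		default: // mixed byte: test each bit
			for i := 0; i < 8; i++ {
				if b&(1<<i) != 0 {
					out[row+i] = -1
				} else {
					out[row+i] = nonNull
					nonNull++
				}
			}
		}
		row += 8
	}
	// Tail: the final rowCount & 7 rows share a single byte load.
	if tail := rowCount & 7; tail != 0 {
		b := bitmap[full]
		for i := 0; i < tail; i++ {
			if b&(1<<i) != 0 {
				out[row+i] = -1
			} else {
				out[row+i] = nonNull
				nonNull++
			}
		}
	}
	return nonNull
}

func main() {
	out := make([]int32, 11)
	n := fillNonNullIdx([]byte{0x05, 0x02}, 11, out)
	fmt.Println(out, n)
}
```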
The decoder knew columnCount upfront but was growing out.layouts one
element at a time with append, paying a cap check and potential grow
per iteration. Since qwpColumnLayout is ~120 bytes (multiple slice
headers), that overhead adds up on first contact with a new column
count.

Replace the grow loop with the cap-check/reslice pattern already used
for nonNullIdx, timestampBuf, symbolRowIds, and arrayRow* elsewhere in
the same file: if cap is short, allocate exactly columnCount; else
reslice. This loses the amortising-append behavior, but there is
nothing to amortise when the final size is known upfront, and
subsequent batches with the same column count still reuse the backing
array.

Also drops the now-redundant `out.layouts = out.layouts[:columnCount]`
below — the new block sets the length for both grow and shrink cases.
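
For readers unfamiliar with the cap-check/reslice pattern, here is a small sketch. The `layout` type and `sizeLayouts` function are hypothetical stand-ins, not the decoder's actual definitions.

```go
package main

import "fmt"

// layout is a hypothetical stand-in for qwpColumnLayout.
type layout struct{ values, nulls []byte }

// sizeLayouts returns a slice of exactly n layouts, reusing the backing
// array when it is already large enough.
func sizeLayouts(layouts []layout, n int) []layout {
	if cap(layouts) < n {
		return make([]layout, n) // grow once, to the exact size
	}
	return layouts[:n] // reslice: covers shrink and same-size reuse
}

func main() {
	a := sizeLayouts(nil, 4) // first batch allocates
	b := sizeLayouts(a, 2)   // narrower batch reslices
	c := sizeLayouts(b, 4)   // original width reuses the same array
	fmt.Println(len(a), len(b), len(c), &a[0] == &c[0])
}
```

Reused elements keep whatever the previous batch left in them, so in this sketch the caller is expected to reset each layout before use.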

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds a by-value QwpColumn handle returned by
QwpColumnBatch.Column(col), plus four bulk Int64Range / Float64Range
/ Int32Range / Float32Range methods on the handle that materialise
dense fixed-width columns into caller-supplied slices. The bulk
no-nulls path is ~9.4x faster than per-cell access: one memmove via
unsafe.Slice instead of a method call per row.

The handle captures the layout pointer plus the row count in a
16-byte struct returned by value. Per-cell access through the handle
measures the same as through the batch surface — Go's inliner
already hoists b.layouts[col] in common shapes — so the handle's
practical value is ergonomic plus serving as the natural carrier for
the Range methods.

The no-nulls Range fast path slices l.values[fromRow*N:toRow*N]
before reinterpreting via unsafe.Slice, so caller misuse with
toRow > rowCount panics the same way as the per-cell accessor
instead of silently reading past the buffer.

The batch-level (col, row) accessor bodies are intentionally not
delegated to the QwpColumn handle. Routing through
b.Column(col).X(row) ~doubles per-cell latency on Go 1.26 because
the inliner does not flatten the by-value receiver chain, so the
QwpColumn struct construction stays on the hot path. A doc comment
above the typed accessors records why the duplication is intentional.

The shared string-slice decode moves from a QwpColumnBatch method to
a free qwpStringSlice function so both surfaces share it.

Tests cover handle/batch parity, Range correctness across all four
typed variants, the panic-on-misuse contract for the no-nulls fast
path, and the zero-allocation invariant when dst has sufficient
capacity. A new qwp_query_batch_perf_test.go contains
microbenchmarks characterising per-cell vs bulk and handle vs batch
performance for future regression checking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
readVarintInt63 rejects any unsigned varint whose uint64→int64 cast
would flip the sign, so the returned int64 is guaranteed non-negative.
The `< 0` clauses in appendDelta, the table-block header (name,
row_count, column_count, schema_id), parseFullSchema (column name),
and parseGeohash (precision bits) were therefore unreachable.

Drop them. The upper-bound checks (>= qwpDefaultMaxSchemasPerConnection,
> qwpMaxRowsPerBatch, etc.) still exercise the hostile-input paths; the
existing hardening tests — including H12_NegativeSchemaIdVarint, which
feeds a 5-byte varint decoding to 0x80000000 — continue to reject via
the range cap rather than the negative check. parseSymbol's analogous
id64 < 0 was already removed in ffe170e; this aligns the remaining
sites.
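
To see why the range cap alone rejects the H12 input, consider this sketch. `readSchemaID` and `maxSchemas` are hypothetical; `binary.Uvarint` stands in for the client's varint reader.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
)

const maxSchemas = 65535 // hypothetical cap, not the client's real constant

// readSchemaID decodes an unsigned varint, rejects values above int63,
// then applies the upper-bound check. No "< 0" test is needed.
func readSchemaID(buf []byte) (int64, error) {
	v, n := binary.Uvarint(buf)
	if n <= 0 {
		return 0, errors.New("varint truncated or overflowed")
	}
	if v > math.MaxInt64 {
		return 0, errors.New("varint exceeds int63")
	}
	id := int64(v) // cannot be negative past this point
	if id >= maxSchemas {
		return 0, errors.New("schema id out of range")
	}
	return id, nil
}

func main() {
	// 5-byte varint decoding to 0x80000000: negative as an int32,
	// but a plain large positive value as an int64.
	_, err := readSchemaID([]byte{0x80, 0x80, 0x80, 0x80, 0x08})
	fmt.Println(err)
}
```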

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The bytesConsumed() overrun check in parseTimestamp's Gorilla branch
panicked on an invariant that holds for well-formed frames. A panic
here crashes the user's process on malformed network input, which is
the opposite of what a decoder-internal check should do. Surface a
*qwpDecodeError instead so the dispatcher latches it via setIoErr
like any other decode failure — defense-in-depth is cheap on a
network-fed decoder.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds a fluent QwpBinds encoder and a WithQueryBinds option that
threads typed bind parameters through Query and Exec calls. The
full QuestDB type system is covered (18 types including
DECIMAL64/128/256, UUID, LONG256, GEOHASH, VARCHAR,
TIMESTAMP_NANOS, etc.) with matching NullXxxBind variants plus a
DecimalBind helper that auto-selects the narrowest fixed-width
wire form for a given Decimal.

Callers previously had to interpolate values into the SQL text,
which defeats the server's SQL-text-keyed factory cache. With
bind parameters, repeated calls with the same SQL text hit the
cached factory and just vary the payload.

The encoded bind payload is copied into a request-owned slice
inside buildRequest before the qwpRequest reaches the I/O
dispatcher, so a follow-up query's scratch reset cannot race the
dispatcher's read. The QwpBinds scratch stays on QwpQueryClient
across queries to amortize the encoding buffer; the per-request
copy is the only new allocation on the submit path.
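
The ownership hand-off can be sketched as follows. The `binds` and `request` types here are simplified, hypothetical stand-ins for QwpBinds and qwpRequest.

```go
package main

import (
	"bytes"
	"fmt"
)

// binds is a hypothetical reusable encoder scratch.
type binds struct{ buf []byte }

func (b *binds) reset()         { b.buf = b.buf[:0] }
func (b *binds) putByte(v byte) { b.buf = append(b.buf, v) }

// request owns its bind payload; it never aliases the scratch buffer.
type request struct {
	sql   string
	binds []byte
}

// buildRequest copies the encoded payload so a later reset of the
// scratch cannot change what a concurrent reader of the request sees.
func buildRequest(sql string, scratch *binds) request {
	return request{sql: sql, binds: bytes.Clone(scratch.buf)}
}

func main() {
	scratch := &binds{}
	scratch.putByte(1)
	scratch.putByte(2)
	req := buildRequest("SELECT $1, $2", scratch)

	scratch.reset() // the follow-up query reuses the same scratch
	scratch.putByte(9)
	fmt.Println(req.binds, scratch.buf)
}
```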

Tests cover every type's wire layout, null handling, the
fluent-chain short-circuit on latched errors, client-level
plumbing through both Query and Exec, scratch-reset-across-calls,
and a live-server integration test that asserts the server
accepts the payload and returns per-call result sets.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds decode and dispatcher support for the new QWP egress control
message the server emits between queries when a connection-scoped
cache reaches its soft cap. Body is a single reset_mask byte (bit 0
clears the SYMBOL delta dict, bit 1 clears the schema-fingerprint
cache); unknown bits are preserved on decode and ignored on apply so
future bits stay forward-compatible. Ports server commit ea495f01 and
Java client commit 359f2d70.
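
The mask handling might be sketched like this. The bit assignments come from the description above; the constant names and the `connState` type are hypothetical.

```go
package main

import "fmt"

const (
	resetSymbolDict  byte = 1 << 0 // bit 0: SYMBOL delta dict
	resetSchemaCache byte = 1 << 1 // bit 1: schema-fingerprint cache
)

// connState is a hypothetical stand-in for per-connection decoder state.
type connState struct {
	dictEntries, schemas int
}

// applyCacheReset clears the caches named by mask and ignores any bits
// it does not know about.
func (s *connState) applyCacheReset(mask byte) {
	if mask&resetSymbolDict != 0 {
		s.dictEntries = 0
	}
	if mask&resetSchemaCache != 0 {
		s.schemas = 0
	}
}

func main() {
	s := &connState{dictEntries: 100, schemas: 5}
	s.applyCacheReset(0x81) // bit 0 plus an unknown high bit
	fmt.Println(s.dictEntries, s.schemas)
}
```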

The I/O dispatcher routes CACHE_RESET to a new handler that applies
the reset to the decoder's per-connection state and, crucially, does
not mark the current query done and does not emit a user event --
the frame is transparent to the caller. A truncated frame poisons
the connection via the same path used by the other non-RESULT_BATCH
decoders, since a desynced decoder cannot be trusted on subsequent
frames.

qwpConnDict.clear swaps to fresh backing arrays (rather than slicing
len to 0) so any qwpSymbolDictView snapshot a user handler still
holds on a prior batch keeps reading the original bytes; the Java
client can get away with position reset because its buffers live in
native memory, but Go slice aliases would be corrupted by in-place
reuse. qwpSchemaRegistry.clear nils its slot references (letting the
per-id slices be GC'd once the last QwpColumnBatch alias drops) and
truncates the slot slice to zero length while preserving capacity,
so a workload that churns just above the soft cap does not pay for
re-growing the lookup table on every reset.
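
The aliasing hazard is easy to reproduce in isolation. In the sketch below, `dict` is a hypothetical stand-in for qwpConnDict and `view` plays the role of a snapshot a user handler still holds.

```go
package main

import "fmt"

// dict is a hypothetical stand-in for qwpConnDict.
type dict struct{ heap []byte }

// view returns a snapshot that aliases the current backing array.
func (d *dict) view() []byte { return d.heap[:len(d.heap):len(d.heap)] }

// clear swaps in a fresh backing array so earlier views keep their bytes.
func (d *dict) clear() { d.heap = make([]byte, 0, cap(d.heap)) }

func main() {
	d := &dict{heap: []byte("AAPL")}
	snap := d.view()
	d.clear()
	d.heap = append(d.heap, "MSFT"...)
	fmt.Println(string(snap), string(d.heap)) // snapshot is unaffected
}
```

Had `clear` been written as `d.heap = d.heap[:0]`, the append would have overwritten the bytes the snapshot still points at.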

Tests cover the wire round-trip for all four defined mask values,
forward-compat preservation of unknown bits, truncation / wrong-kind
/ bad-magic / FLAG_ZSTD rejection paths, apply semantics against a
seeded decoder for every mask subset, snapshot detachment across a
dict generation change, capacity preservation across clear, and an
end-to-end dispatcher test that runs two queries with a CACHE_RESET
between them and asserts the event stream stays clean while the
decoder caches are empty afterwards.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Three independent perf fixes surfaced by a review of the egress
decoder hot path:

1. Replace the written-but-unread `arrayRowLen` field on
   qwpColumnLayout with `arrayElems`, the precomputed element
   count for each array row. parseArray already had to compute
   the element count for its bounds check against
   qwpMaxArrayElements; storing it costs nothing extra at decode
   time and lets `arrayElementCount` (called by every per-cell
   array accessor) collapse from a multiply loop over the shape
   header into a single int32 cache load plus one nDims byte
   read for the data offset.

2. Add Float64ArrayInto / Int64ArrayInto on QwpColumn. These are
   append-into-dst variants of the existing Float64Array /
   Int64Array accessors, mirroring the contract the bulk *Range
   accessors already use: a hot loop reuses one dst slice across
   rows by truncating with dst[:0], and the per-cell make() that
   otherwise dominates wide-array scans goes away. NULL rows
   leave dst untouched (distinct from the per-cell variant which
   returns nil).

3. Replace the `if n == 64` branches in qwpBitReader.readBits and
   readBitsSlow with `^uint64(0) >> (64 - n)` for the mask and
   `(buf >> 1) >> (n - 1)` for the accumulator drain. Both keep
   the shift count in [0, 63] for n in [1, 64], so Go's compiler
   no longer has to emit the runtime guard it otherwise inserts
   for shifts that may equal the operand width. The Gorilla DoD
   path issues several of these per row.
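
The two shift expressions from item 3 can be checked in isolation. `lowMask` and `drain` are hypothetical names for this sketch; both are only meaningful for n in [1, 64].

```go
package main

import "fmt"

// lowMask returns a mask with the low n bits set, for n in [1, 64].
// The shift count 64-n stays in [0, 63].
func lowMask(n uint) uint64 { return ^uint64(0) >> (64 - n) }

// drain discards the low n bits of buf, for n in [1, 64].
// Splitting the shift keeps each count below the operand width.
func drain(buf uint64, n uint) uint64 { return (buf >> 1) >> (n - 1) }

func main() {
	fmt.Printf("%#x %#x\n", lowMask(4), lowMask(64))
	fmt.Printf("%#x %#x\n", drain(0xFF, 4), drain(^uint64(0), 64))
}
```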

Updates the three array test fixtures in qwp_query_batch_test.go
to set arrayElems (with the actual element count) instead of the
removed arrayRowLen, and adds three new tests covering the Into
accessors' append, NULL, and backing-array-reuse semantics.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Apr 24, 2026


Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Adds a full QWP query stack and related protocol/workflow changes: bind encoding, WebSocket query client and config parsing, egress I/O/dispatcher, RESULT_BATCH decoder and column-major batch surface, Gorilla delta-of-delta timestamp decoder, transport endpoint separation, many protocol hardening updates, extensive tests/benchmarks, README and CI tweaks.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Query client & config**: `qwp_query_client.go`, `qwp_query_client_test.go`, `qwp_query_conf.go` | New QwpQueryClient + QwpQuery cursor APIs, functional options, config string parsing (ws::/wss::), bind plumbing, Query/Exec semantics, and comprehensive client tests (auth, compression, binds, cancel/close, error mapping). |
| **Egress I/O & dispatcher**: `qwp_query_io.go`, `qwp_query_io_test.go` | New egress I/O (reader + dispatcher goroutines), event model (BATCH/END/ERROR/EXEC_DONE), pooled batch buffers, CREDIT flow-control, CANCEL semantics, decoder-poisoning and coordinated shutdown; fully tested. |
| **Result decoding & batch surface**: `qwp_query_decoder.go`, `qwp_query_decoder_test.go`, `qwp_query_batch.go`, `qwp_query_batch_test.go`, `qwp_query_batch_perf_test.go` | Stateful RESULT_BATCH decoder with schema/dict registries, zstd handling, per-column layouts, QwpColumnBatch/QwpColumn typed accessors, CopyAll snapshotting, arrays/symbols/decimals support, and extensive unit/integration/perf tests. |
| **Gorilla decoder & bit I/O**: `qwp_gorilla_decoder.go`, `qwp_gorilla_decoder_test.go`, `qwp_gorilla_test.go`, `qwp_bench_test.go` | Adds LSB-first qwpBitReader and qwpGorillaDecoder for DoD timestamps, bit-level utilities/tests and a benchmark; some zero-allocation tests skip under -race. |
| **Bind encoding & tests**: `qwp_bind_values.go`, `qwp_bind_values_test.go` | New QwpBinds encoder covering many wire types (decimals choosing width, nulls, geohash, varchar, etc.), error-latching, reset/count/Err, plus exhaustive byte-exact tests and failure cases. |
| **Wire parsing & hardening**: `qwp_wire.go`, `qwp_wire_test.go`, `qwp_constants.go`, `qwp_constants_test.go`, `qwp_errors.go`, `qwp_query_errors.go` | Stricter varint checks, new qwpDecodeError and qwpByteReader, readVarintInt63, added egress message kinds/types/statuses (BINARY, IPV4, CANCELLED, LIMIT_EXCEEDED), status-name mapping, and QwpQueryError type; tests updated/expanded. |
| **Transport & endpoints**: `qwp_transport.go`, `qwp_transport_test.go`, `sender.go` | Separate ingest (/write/v4) and egress (/read/v1) endpoints; qwpTransportOpts.endpointPath required; egress-only upgrade headers surfaced; connect fails on empty path; transport close signature simplified; tests adjusted. |
| **Sender & tests**: `qwp_sender.go`, `qwp_sender_test.go`, `qwp_sender_async_test.go` | Senders now set endpointPath: qwpWritePath; Close uses new transport close(); sender tests updated to pass write path and adjust closes. |
| **Buffer/array null semantics**: `qwp_buffer.go`, `qwp_buffer_test.go` | Non-nullable array addNull path now panics (unsupported); public ingest API expects nullable array columns and represents NULL via null bitmap; added guard tests. |
| **Byte-level utilities & decoding helpers**: `qwp_wire.go`, `qwp_gorilla_decoder.go`, `qwp_gorilla_decoder_test.go` | Introduces qwpByteReader, qwpDecodeError, readVarintInt63, and bit-level readers/decoders with validation and clear error propagation; tests cover truncation and signed reads. |
| **Query decoder internals & integration**: `qwp_query_decoder.go`, `qwp_query_decoder_test.go`, `qwp_query_integration_test.go` | Decoder implements per-column decode paths (fixed, decimals, symbols, arrays, Gorilla timestamps), cache-reset semantics, strict zstd validation, hardening checks, and many integration tests. |
| **Race tag tests**: `qwp_race_on_test.go`, `qwp_race_off_test.go` | Build-tagged files expose raceEnabled constant to alter test behavior under -race. |
| **Docs, CI, modules**: `README.md`, `.github/workflows/build.yml`, `go.mod` | README adds QWP-over-WebSocket usage and examples; CI test runs go test -race; go.mod promotes some dependencies from indirect to direct. |
| **Misc tests & small fixes**: `qwp_sender_async_test.go`, `sender_pool_test.go`, many `*_test.go` | Numerous test additions/tweaks (close API, endpointPath usage, allocation-conditional skips, sender close adjustments, guard tests, polling vs goroutine drain). |
| **Examples**: `examples/qwp/query/main.go` | New example showing QwpQueryClient usage, bulk INSERT, and batch vs range read patterns. |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant QwpQueryClient
    participant EgressIO as qwpEgressIO
    participant WS as WebSocket
    participant Decoder as qwpQueryDecoder
    participant Batch as QwpColumnBatch

    Client->>QwpQueryClient: Query(ctx, sql, binds)
    QwpQueryClient->>EgressIO: submitQuery(requestId, sql, binds)
    EgressIO->>WS: send QUERY_REQUEST (upgrade headers)
    WS-->>EgressIO: RESULT_BATCH frame
    EgressIO->>Decoder: decode RESULT_BATCH
    Decoder-->>Batch: build QwpColumnBatch
    EgressIO-->>QwpQueryClient: emit BATCH event
    QwpQueryClient-->>Client: yield *QwpColumnBatch
    Client->>Batch: typed accessors (Int64/String/Array)
    Client->>QwpQueryClient: release batch
    EgressIO->>WS: send CREDIT
    WS-->>EgressIO: RESULT_END
    EgressIO-->>QwpQueryClient: emit END
    QwpQueryClient-->>Client: iterator terminates

sequenceDiagram
    participant Decoder as qwpQueryDecoder
    participant BitReader as qwpBitReader
    participant Gorilla as qwpGorillaDecoder
    participant Column as qwpColumnLayout

    Decoder->>Decoder: parse batch header, optional zstd decompress
    Decoder->>Column: init layout, null bitmap
    alt Gorilla-compressed timestamp column
        Decoder->>BitReader: create reader over bitstream
        BitReader->>Gorilla: supply bits
        Gorilla->>Gorilla: decode DoD -> timestamps
        Gorilla-->>Column: append decoded timestamps
    else Fixed-width / strings / arrays
        Decoder->>Column: decode fixed bytes, offsets, arrays, symbols
    end
    Decoder-->>Decoder: finish columns, return populated batch

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Suggested labels

enhancement

Suggested reviewers

  • bluestreak01
  • puzpuzpuz

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 63.66% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Title check | ✅ Passed | The PR title 'feat(qwp): add QWP WebSocket transport (store-and-forward ingest, typed errors, failover, query client)' clearly and specifically describes the main change: adding QWP query-side egress client functionality over WebSocket with associated improvements. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (11)
README.md (1)

129-135: Use a checked type assertion in the README example to avoid panic.

At Line 134, qwp := sender.(qdb.QwpSender) can panic if the sender was created with a non-QWP config. Prefer the safe assertion form in docs.

Proposed doc snippet update
- qwp := sender.(qdb.QwpSender)
+ qwp, ok := sender.(qdb.QwpSender)
+ if !ok {
+     log.Fatal("sender is not QWP-capable; use ws:// or wss:// config")
+ }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 129 - 135, The README example uses an unchecked type
assertion qwp := sender.(qdb.QwpSender) which can panic if the returned sender
does not implement qdb.QwpSender; change this to a checked assertion (ok form)
when converting the value from LineSenderFromConf, e.g. assign with qwp, ok :=
sender.(qdb.QwpSender) and handle the false case by logging or returning an
error instead of allowing a panic; update the snippet around the sender, qwp and
LineSenderFromConf usage to demonstrate safe handling.
qwp_query_errors.go (1)

47-54: Consider including RequestId in the default error message.

RequestId is the primary correlation key for a query but is not rendered by Error(), so anything that logs the error via %v/%s drops it unless the caller type-asserts. Including it in the formatted string keeps the existing errors.As path intact while making plain-text logs self-contained.

💡 Proposed format change
 func (e *QwpQueryError) Error() string {
 	name := qwpStatusName(e.Status)
 	if e.Message != "" {
-		return fmt.Sprintf("qwp: query error %s (0x%02X): %s",
-			name, byte(e.Status), e.Message)
+		return fmt.Sprintf("qwp: query error %s (0x%02X) [request_id=%d]: %s",
+			name, byte(e.Status), e.RequestId, e.Message)
 	}
-	return fmt.Sprintf("qwp: query error %s (0x%02X)", name, byte(e.Status))
+	return fmt.Sprintf("qwp: query error %s (0x%02X) [request_id=%d]",
+		name, byte(e.Status), e.RequestId)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qwp_query_errors.go` around lines 47 - 54, Update QwpQueryError.Error() to
include the RequestId in its returned string so plain-text logs show the
correlation id; modify the two fmt.Sprintf calls inside QwpQueryError.Error (the
branch that includes e.Message and the fallback) to append a clear
"request_id=%d" or similar with e.RequestId, matching the proposed format
change, preserving the existing status name formatting via
qwpStatusName(e.Status) and the byte cast of e.Status so errors.As behavior is
unchanged.
qwp_wire.go (2)

372-381: Minor: use math.MaxInt64 for the int63 upper bound.

uint64(1<<63-1) is correct but less immediately obvious than uint64(math.MaxInt64), and math is already imported.

-	if v > uint64(1<<63-1) {
+	if v > uint64(math.MaxInt64) {
 		return 0, newQwpDecodeError("varint overflow: value exceeds int63")
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qwp_wire.go` around lines 372 - 381, In readVarintInt63 replace the literal
upper-bound check uint64(1<<63-1) with uint64(math.MaxInt64) to improve clarity;
update the comparison in func (r *qwpByteReader) readVarintInt63() so it uses
uint64(math.MaxInt64) (math is already imported) and keep the existing error
handling and return values unchanged.

356-363: Wrapped varint errors produce a duplicated qwp: prefix.

qwpReadVarint returns errors.New("qwp: varint overflow")/"qwp: varint truncated", and wrapQwpDecodeError prepends "qwp: decode: " around err.Error(). The final message is qwp: decode: qwp: varint overflow, which is what users will see in logs. Either strip the qwp: prefix before wrapping, or switch qwpReadVarint to return plain messages since it's package-internal.

🧹 Minimal cleanup
 // qwpReadVarint decodes an unsigned LEB128 varint from buf.
 func qwpReadVarint(buf []byte) (uint64, int, error) {
 	var v uint64
 	var shift uint
 	for i, b := range buf {
 		if i >= qwpMaxVarintLen {
-			return 0, 0, errors.New("qwp: varint overflow")
+			return 0, 0, errors.New("varint overflow")
 		}
 		if shift == 63 && b&0x7E != 0 {
-			return 0, 0, errors.New("qwp: varint overflow")
+			return 0, 0, errors.New("varint overflow")
 		}
 		v |= uint64(b&0x7F) << shift
 		if b&0x80 == 0 {
 			return v, i + 1, nil
 		}
 		shift += 7
 	}
-	return 0, 0, errors.New("qwp: varint truncated")
+	return 0, 0, errors.New("varint truncated")
 }

Any external call sites already add their own prefix on wrap, and the decode-path wrapper supplies the qwp: decode: envelope.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qwp_wire.go` around lines 356 - 363, The wrapped varint errors get a
duplicated "qwp: " prefix because qwpReadVarint returns errors already starting
with "qwp: " and readVarint wraps them with wrapQwpDecodeError; fix by removing
the "qwp: " prefix from package-internal error strings in qwpReadVarint (or
alternatively make wrapQwpDecodeError idempotent by skipping prepending when
err.Error() already starts with "qwp: "); update qwpReadVarint's error returns
(and any similar package-local helpers) so they return plain messages like
"varint overflow"/"varint truncated" and keep wrapQwpDecodeError as the single
place that adds the "qwp: decode: " envelope.
qwp_bench_test.go (1)

336-387: ts0/ts1 in the test cases don't match the stream's actual first two timestamps.

mk accumulates cur from zero using stepFn, so the stream's ts[0] and ts[1] depend on the step function. The literal ts0/ts1 in the case table happen to agree for ConstantDelta and SmallJitter on the initial delta (which is what the DoDs were computed against), but for WideJitter the encoder's initial delta is 1 (ts[1]-ts[0] = 1_000_001 - 1_000_000) while the case feeds the decoder an initial delta of 1_000_000.

Bit-level decode work stays the same, so the bench timings aren't misleading today — but if anyone later asserts on decoded values (or adds a correctness check like the one in qwp_gorilla_test.go's decodeGorilla), this case will silently disagree with the encoder.

🛠️ Proposed refactor: let `mk` return the actual first two timestamps
-	mk := func(stepFn func(i int) int64) []byte {
+	mk := func(stepFn func(i int) int64) (data []byte, ts0, ts1 int64) {
 		ts := make([]int64, n)
 		var cur int64
 		for i := range ts {
 			cur += stepFn(i)
 			ts[i] = cur
 		}
 		var wb qwpWireBuffer
 		var enc qwpGorillaEncoder
 		enc.encodeTimestamps(&wb, intsToBytes(ts), n)
-		// Strip the 16-byte uncompressed prefix the bit reader doesn't
-		// touch — the decoder's reset() takes only the bit-packed tail.
-		return append([]byte(nil), wb.bytes()[16:]...)
+		// Strip the 16-byte uncompressed prefix the bit reader doesn't
+		// touch — the decoder's reset() takes only the bit-packed tail.
+		return append([]byte(nil), wb.bytes()[16:]...), ts[0], ts[1]
 	}

…and inline the returned ts0/ts1 into each case entry so the cases stay self-consistent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qwp_bench_test.go` around lines 336 - 387, The test's hardcoded ts0/ts1
values don't match the encoded stream for some cases (e.g., WideJitter); change
mk to return the encoded byte slice plus the actual first two timestamps (e.g.,
return ([]byte, int64, int64) or a small struct) by capturing ts[0] and ts[1]
before encoding, then update the cases table to take those returned ts0/ts1
values and pass them into qwpGorillaDecoder.reset in the benchmark loop so the
decoder is initialized with the true initial timestamps.
qwp_query_conf.go (2)

246-260: Move compression_level bounds check into the parser for a clearer error.

validate() enforces [1, 22] at the very end, so a user-supplied compression_level=50 surfaces as the generic "compression level must be in [1, 22]" error without the offending config-string context. Since sibling keys like buffer_pool_size and initial_credit already emit NewInvalidConfigStrError wrappers with the original textual value on parse failure, doing the same for the range check here would keep error messages consistent across keys.

Not blocking — the validation path still catches it.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qwp_query_conf.go` around lines 246 - 260, In the "compression_level" parser
inside the switch (case "compression_level") after converting v with
strconv.Atoi, add a bounds check for n and return NewInvalidConfigStrError when
n is outside [1,22] so the error includes the original string value;
specifically, in the block handling compression_level (where
cfg.compressionLevel is set) validate n ∈ [1,22] and if not return
NewInvalidConfigStrError("invalid compression_level %q: must be in [1, 22]", v)
before assigning cfg.compressionLevel, keeping the existing strconv.Atoi error
handling and leaving validate() as a final safeguard.

138-189: Consider skipping the compression-level range check when compression is "raw".

The docstring at line 77 explicitly says compressionLevel is ignored when compression == "raw", but validate() still rejects compressionLevel outside [1, 22] unconditionally. This is fine for the default (qwpDefaultCompressionLevel = 3) but surprises users who set a sentinel like 0 alongside compression=raw via options — the request is silently constructed but then rejected on a field that's documented as ignored.

Low priority since defaults cover the common case; flagging only because the asymmetry between the docstring and the validation rule is easy to trip over.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qwp_query_conf.go` around lines 138 - 189, The validate() function currently
enforces compressionLevel ∈ [1,22] unconditionally; change it so the range check
is skipped when c.compression == qwpCompressionRaw (since compressionLevel is
documented as ignored for raw). In practice, update the check around
c.compressionLevel in qwpQueryClientConfig.validate to only validate when
c.compression != qwpCompressionRaw (leave validation for qwpCompressionZstd and
qwpCompressionAuto unchanged), referencing the c.compression and
c.compressionLevel fields and the qwpCompressionRaw constant.
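
The suggested rule could look roughly like this. `queryConfig` and the string values for the compression mode are hypothetical stand-ins for qwpQueryClientConfig and its constants.

```go
package main

import "fmt"

// queryConfig is a hypothetical stand-in for the client's config struct.
type queryConfig struct {
	compression      string // "raw", "zstd", or "auto"
	compressionLevel int
}

// validate skips the level range check when compression is raw, since
// the level is documented as ignored in that mode.
func (c queryConfig) validate() error {
	if c.compression == "raw" {
		return nil
	}
	if c.compressionLevel < 1 || c.compressionLevel > 22 {
		return fmt.Errorf("compression level %d must be in [1, 22]",
			c.compressionLevel)
	}
	return nil
}

func main() {
	fmt.Println(queryConfig{"raw", 0}.validate())
	fmt.Println(queryConfig{"zstd", 0}.validate())
}
```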
qwp_bind_values_test.go (1)

86-550: LGTM – exhaustive byte-exact bind coverage.

The suite is thorough: per-type happy paths, null-exhaustive ordering, mixed types, decimal auto-width dispatch (including the null → DECIMAL256 path), boundary rejections (non-BMP char, bad scale, out-of-range geohash precision), index-order invariants, the qwpMaxColumnsPerTable cap, reset/re-encode determinism, large-payload buffer growth, and the fluent-chain short-circuit invariant. Mirroring the header bytes via testBindNonNull / testBindNullFlag / testBindNullBitmap rather than reusing production constants is a nice forcing function against silent protocol drift.

One minor naming nit: TestQwpBindsResetPreservesBuffer reads as "reset preserves buffer contents" but actually asserts the buffer is cleared and re-encoding is deterministic — something like TestQwpBindsResetIsDeterministic or …ClearsState would match the assertions more directly. Non-blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qwp_bind_values_test.go` around lines 86 - 550, Rename the misleading test
TestQwpBindsResetPreservesBuffer to a name that reflects its assertions (e.g.,
TestQwpBindsResetIsDeterministic or TestQwpBindsResetClearsState); update the
test function identifier and any references to it (test runner, comments) so the
new name is used consistently, and keep the test body and assertions in place
(function symbol: TestQwpBindsResetPreservesBuffer -> new name).
qwp_query_batch_test.go (1)

35-1153: LGTM – excellent coverage of the batch/column decode surface.

The fixed-width/nullable/symbol/string/array paths are all covered with explicit byte layouts, and the two CopyAll integrity tests (pool-reuse Int64 and Gorilla-timestamp alias rebinding) are particularly valuable — they pin exactly the bug classes the deep clone is meant to defend against. The Range* OOB-panic and zero-alloc pins, the ArrayInto append/reuse semantics, and the concurrent scale/precision snapshot test (useful even without -race as a value-stability check) round the suite out well.

A couple of non-blocking notes:

  • Line 738–740: the len(batch.layouts[0].timestampBuf) == 0 precondition is (correctly) called out as fragile; if the encoder heuristic ever stops choosing Gorilla for that input, the test silently stops exercising the intended alias-rebinding bug. Consider forcing the Gorilla path explicitly (e.g. via an internal flag or by constructing the layout directly) so regressions in the heuristic don't quietly weaken coverage.
  • Line 714 (v := v): redundant since Go 1.22's per-iteration loop-variable semantics (and the repo is on Go modules / v4), though harmless. Same pattern on line 725.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qwp_query_batch_test.go` around lines 35 - 1153, In
TestQwpColumnBatchCopyAllGorillaTimestampSurvivesPoolReuse the current
precondition check relying on encoder heuristics
(len(batch.layouts[0].timestampBuf) != 0) is fragile; change
encodeSingleColumnBatch (or add a new helper) to accept a flag or construct a
Gorilla-encoded timestamp layout explicitly so the test deterministically takes
the Gorilla path and always populates layout.timestampBuf, and update the test
to call that deterministic helper instead of relying on heuristics; also remove
the redundant loop-variable shadowing v := v in the orig/fresh row-building
loops in TestQwpColumnBatchCopyAllGorillaTimestampSurvivesPoolReuse to avoid
unnecessary code now that Go isolates iteration variables.
qwp_query_client.go (2)

447-453: Extract the duplicated misuse error string.

The identical "qwp query: Exec called on a SELECT-style statement; use Query instead" message appears in both the qwpEventKindBatch and qwpEventKindEnd branches. A single package-scope errors.New (or fmt.Errorf result) keeps the two branches in sync if the wording ever changes.

♻️ Proposed DRY
+var errExecOnSelect = errors.New(
+	"qwp query: Exec called on a SELECT-style statement; use Query instead")
+
...
-			return ExecResult{}, fmt.Errorf(
-				"qwp query: Exec called on a SELECT-style statement; use Query instead")
+			return ExecResult{}, errExecOnSelect
...
-			return ExecResult{}, fmt.Errorf(
-				"qwp query: Exec called on a SELECT-style statement; use Query instead")
+			return ExecResult{}, errExecOnSelect
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qwp_query_client.go` around lines 447 - 453, The duplicated error string used
in the qwpEventKindBatch and qwpEventKindEnd branches should be extracted to a
package-level error variable (Go has no error constants); create a variable like
errExecSelectStyle (using errors.New or fmt.Errorf) and replace the two identical
fmt.Errorf calls with that variable so both branches reference the single shared
error; update the switch handling qwpEventKindBatch and qwpEventKindEnd to
return that error.
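
A minimal sketch of the shared-sentinel idea, assuming simplified stand-ins for the event kinds (`eventKindBatch`, `eventKindEnd`, and `execResultFor` are hypothetical names, not the client's real ones). It also shows a side benefit: callers can match the misuse with `errors.Is`.

```go
package main

import (
	"errors"
	"fmt"
)

// errExecOnSelect is the single shared sentinel proposed in the review.
var errExecOnSelect = errors.New(
	"qwp query: Exec called on a SELECT-style statement; use Query instead")

// eventKind is a hypothetical stand-in for the client's event kinds.
type eventKind int

const (
	eventKindBatch eventKind = iota
	eventKindEnd
	eventKindExecDone
)

// execResultFor mimics the Exec event switch: both misuse branches
// return the same error value, so the wording cannot drift apart.
func execResultFor(kind eventKind) error {
	switch kind {
	case eventKindBatch:
		return errExecOnSelect
	case eventKindEnd:
		return errExecOnSelect
	default:
		return nil
	}
}

func main() {
	err := fmt.Errorf("exec failed: %w", execResultFor(eventKindBatch))
	fmt.Println(errors.Is(err, errExecOnSelect)) // true
}
```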

621-636: Defensive: panic safety for yield on the takeEvent-error path.

The qwpEventKindBatch branch (Lines 640-657) carefully wraps yield in a deferred recover that runs cancelAndDrainOnCleanupCtx before rethrowing — precisely because the outer defer q.state.Store(qwpQueryStateDone) already flipped state, so the caller's defer q.Close() would become a no-op and strand the dispatcher.

The takeEvent-error branch on Line 633 has the same hazard (the dispatcher is still parked in receiveLoop for this query) but calls yield(nil, err) without a recover — if the user's range body panics on the error receipt, the cancel+drain on Line 634 is skipped and the client is stranded for future Query/Exec.

Unlikely in practice (range bodies rarely panic on an error), so flagging as defensive hardening rather than a bug.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@qwp_query_client.go` around lines 621 - 636, The takeEvent error path
currently calls yield(nil, err) directly and then
q.cancelAndDrainOnCleanupCtx(), which can be skipped if the user's yield handler
panics; wrap the yield(nil, err) call in the same panic-safe pattern used in the
qwpEventKindBatch branch: create a deferred recover that on panic first calls
q.cancelAndDrainOnCleanupCtx() then re-panics (or returns the panic), and invoke
yield(nil, err) inside that protected block so cancelAndDrainOnCleanupCtx always
runs even if yield panics.
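
The panic-safe pattern described above can be sketched as follows. This is an illustration under assumptions: `yieldWithCleanup` is a hypothetical helper, and `cleanup` stands in for `cancelAndDrainOnCleanupCtx`; the real iterator code may be shaped differently.

```go
package main

import "fmt"

// yieldWithCleanup calls yield and guarantees cleanup runs exactly once,
// whether yield returns normally or panics. A panic is re-raised after
// cleanup so the caller still observes it.
func yieldWithCleanup(yield func(err error) bool, err error, cleanup func()) {
	defer func() {
		r := recover()
		cleanup()
		if r != nil {
			panic(r)
		}
	}()
	yield(err)
}

func main() {
	cleaned := 0
	func() {
		// The outer recover plays the role of the user's own handler.
		defer func() {
			fmt.Println("recovered:", recover(), "cleanups:", cleaned)
		}()
		yieldWithCleanup(
			func(error) bool { panic("boom") },
			fmt.Errorf("io error"),
			func() { cleaned++ },
		)
	}()
}
```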
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@qwp_gorilla_decoder.go`:
- Around line 183-185: The doc comment about prefix bit order is incorrect and
misleading; update the comment near decodeDoD to state that bits are read
LSB-first so a stored byte 0b01 yields readBit=1 then readBit=0 (i.e., first
bit==1 then second bit==0), which matches the decoder's branches (the initial
readBit==1 path leads to the b==0 branch mapping to DoD=0, then readBit==0
selects the 7-bit signed payload). Replace the existing sentence with a concise
explanation that explicitly shows the LSB-first read order and the resulting bit
sequence for 0b01 to align with decodeDoD's logic.

In `@qwp_query_client.go`:
- Around line 723-729: Cancel() currently unconditionally sets q.cancelled and
calls q.client.io.requestCancel, which can emit duplicate CANCEL frames; change
both Cancel() and cancelAndDrainOnCleanupCtx to atomically gate emission by
using q.cancelled.CompareAndSwap(false, true) (or equivalent CAS) so that
q.client.io.requestCancel(q.requestId) is only invoked when the CAS transitions
false->true; keep storing/reading q.state behavior unchanged and preserve
thread-safety around q.requestId and q.client references.
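
A minimal sketch of the CAS gate, assuming a simplified `query` type; `framesSent` is a hypothetical counter standing in for the call to `io.requestCancel`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type query struct {
	cancelled  atomic.Bool
	framesSent atomic.Int32 // stands in for io.requestCancel(requestId)
}

// Cancel emits the CANCEL frame only on the false->true transition,
// so concurrent or repeated calls cannot emit duplicates.
func (q *query) Cancel() {
	if q.cancelled.CompareAndSwap(false, true) {
		q.framesSent.Add(1)
	}
}

func main() {
	var q query
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			q.Cancel()
		}()
	}
	wg.Wait()
	fmt.Println(q.framesSent.Load()) // 1
}
```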

In `@qwp_query_conf.go`:
- Around line 191-196: Update the docstring for parseQwpQueryConf to remove the
incorrect claim that compression/compression_level are omitted and instead state
that the parser now supports compression and compression_level; keep mention
that tls_roots and tls_roots_password are rejected/unavailable in this Go
module. Reference the function name parseQwpQueryConf and the config keys
compression, compression_level, tls_roots, and tls_roots_password so readers
know which keys are handled and which are unsupported.

In `@qwp_query_decoder.go`:
- Around line 695-722: In parseString (qwpQueryDecoder) after reading the
offsets buffer but before using totalBytes or slicing stringBytes, validate
every 4-byte offset word: decode each offset with binary.LittleEndian.Uint32,
ensure the sequence is non-decreasing (each offset >= previous), ensure the
first offset is 0 and every offset is <= totalBytes (where totalBytes is the
last offset), and reject the frame via newQwpDecodeError if any offset is out of
order or exceeds totalBytes; do this using l.nonNullCount and the offsets slice
so malformed intermediate offsets are caught early and prevent later panics in
qwpStringSlice.
- Around line 1194-1207: The int64sAsBytes function currently reinterprets
host-order int64s as bytes but downstream code (parseTimestamp and
QwpColumnBatch.Int64) reads them with binary.LittleEndian, which will break on
big-endian platforms; fix by either (A) during Gorilla decoding, explicitly
encode/emit timestamps as little-endian byte slices so int64sAsBytes returns
LE-ordered bytes consumed by parseTimestamp/QwpColumnBatch.Int64, or (B)
restrict compilation to little-endian architectures with a build constraint (add
a go:build tag that limits to known little-endian targets) and keep the
reinterpretation; update comments to reflect the chosen approach and ensure
references to int64sAsBytes and parseTimestamp remain consistent.
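
The offset validation from the first item can be sketched like this. It assumes the offsets buffer holds `nonNullCount+1` little-endian uint32 words with the last word equal to `totalBytes`; that layout and the names `validateOffsets` and `encode` are assumptions for illustration, not the decoder's actual code.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// validateOffsets checks that the offsets start at 0, never decrease,
// and never exceed the final offset. It returns totalBytes on success.
func validateOffsets(offsets []byte, nonNullCount int) (uint32, error) {
	if nonNullCount < 0 || len(offsets) != 4*(nonNullCount+1) {
		return 0, errors.New("offsets buffer has wrong length")
	}
	total := binary.LittleEndian.Uint32(offsets[4*nonNullCount:])
	var prev uint32
	for i := 0; i <= nonNullCount; i++ {
		off := binary.LittleEndian.Uint32(offsets[4*i:])
		switch {
		case i == 0 && off != 0:
			return 0, errors.New("first offset must be 0")
		case off < prev:
			return 0, fmt.Errorf("offset %d at index %d decreases", off, i)
		case off > total:
			return 0, fmt.Errorf("offset %d at index %d exceeds total %d", off, i, total)
		}
		prev = off
	}
	return total, nil
}

// encode packs offsets as little-endian uint32 words.
func encode(vals ...uint32) []byte {
	b := make([]byte, 0, 4*len(vals))
	for _, v := range vals {
		b = binary.LittleEndian.AppendUint32(b, v)
	}
	return b
}

func main() {
	fmt.Println(validateOffsets(encode(0, 3, 3, 7), 3))
	fmt.Println(validateOffsets(encode(0, 9, 7), 2))
}
```

Rejecting the frame at this point means later slicing of the string bytes can trust every offset pair.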

In `@qwp_query_io.go`:
- Around line 330-336: submitQuery can succeed if io.shutdownCh is already
closed but io.requests has buffer space; to fix, first do a non-blocking check
of io.shutdownCh and return the shutdown error immediately if it is closed, then
proceed to the existing send/select with io.requests and ctx.Done; update the
submitQuery function to perform a select { case <-io.shutdownCh: return
errors.New("qwp: I/O goroutine shut down") default: } before attempting to send
to io.requests so shutdown deterministically wins when already closed.

---

Nitpick comments:
In `@qwp_bench_test.go`:
- Around line 336-387: The test's hardcoded ts0/ts1 values don't match the
encoded stream for some cases (e.g., WideJitter); change mk to return the
encoded byte slice plus the actual first two timestamps (e.g., return ([]byte,
int64, int64) or a small struct) by capturing ts[0] and ts[1] before encoding,
then update the cases table to take those returned ts0/ts1 values and pass them
into qwpGorillaDecoder.reset in the benchmark loop so the decoder is initialized
with the true initial timestamps.

In `@qwp_bind_values_test.go`:
- Around line 86-550: Rename the misleading test
TestQwpBindsResetPreservesBuffer to a name that reflects its assertions (e.g.,
TestQwpBindsResetIsDeterministic or TestQwpBindsResetClearsState); update the
test function identifier and any references to it (test runner, comments) so the
new name is used consistently, and keep the test body and assertions in place
(function symbol: TestQwpBindsResetPreservesBuffer -> new name).

In `@qwp_query_conf.go`:
- Around line 246-260: In the "compression_level" parser inside the switch (case
"compression_level") after converting v with strconv.Atoi, add a bounds check
for n and return NewInvalidConfigStrError when n is outside [1,22] so the error
includes the original string value; specifically, in the block handling
compression_level (where cfg.compressionLevel is set) validate n ∈ [1,22] and if
not return NewInvalidConfigStrError("invalid compression_level %q: must be in
[1, 22]", v) before assigning cfg.compressionLevel, keeping the existing
strconv.Atoi error handling and leaving validate() as a final safeguard.
- Around line 138-189: The validate() function currently enforces
compressionLevel ∈ [1,22] unconditionally; change it so the range check is
skipped when c.compression == qwpCompressionRaw (since compressionLevel is
documented as ignored for raw). In practice, update the check around
c.compressionLevel in qwpQueryClientConfig.validate to only validate when
c.compression != qwpCompressionRaw (leave validation for qwpCompressionZstd and
qwpCompressionAuto unchanged), referencing the c.compression and
c.compressionLevel fields and the qwpCompressionRaw constant.
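
Both suggestions for `compression_level` can be sketched together. The types and names below (`compression`, `config`, `parseCompressionLevel`) are simplified stand-ins, and plain `fmt.Errorf` replaces `NewInvalidConfigStrError`, whose exact signature is not shown here.

```go
package main

import (
	"fmt"
	"strconv"
)

type compression int

const (
	compressionRaw compression = iota
	compressionZstd
	compressionAuto
)

// parseCompressionLevel rejects out-of-range values at parse time so
// the error can quote the original string.
func parseCompressionLevel(v string) (int, error) {
	n, err := strconv.Atoi(v)
	if err != nil {
		return 0, fmt.Errorf("invalid compression_level %q: %w", v, err)
	}
	if n < 1 || n > 22 {
		return 0, fmt.Errorf("invalid compression_level %q: must be in [1, 22]", v)
	}
	return n, nil
}

type config struct {
	compression      compression
	compressionLevel int
}

// validate skips the range check for raw, where the level is ignored.
func (c config) validate() error {
	if c.compression != compressionRaw &&
		(c.compressionLevel < 1 || c.compressionLevel > 22) {
		return fmt.Errorf("compression_level %d out of range [1, 22]", c.compressionLevel)
	}
	return nil
}

func main() {
	_, err := parseCompressionLevel("23")
	fmt.Println(err)
	fmt.Println(config{compression: compressionRaw}.validate())
}
```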

In `@qwp_query_errors.go`:
- Around line 47-54: Update QwpQueryError.Error() to include the RequestId in
its returned string so plain-text logs show the correlation id; modify the two
fmt.Sprintf calls inside QwpQueryError.Error (the branch that includes e.Message
and the fallback) to append a clear "request_id=%s" or similar with e.RequestId,
preserving the existing status name formatting via qwpStatusName(e.Status) and
the byte cast of e.Status so errors.As behavior is unchanged.

In `@qwp_wire.go`:
- Around line 372-381: In readVarintInt63 replace the literal upper-bound check
uint64(1<<63-1) with uint64(math.MaxInt64) to improve clarity; update the
comparison in func (r *qwpByteReader) readVarintInt63() so it uses
uint64(math.MaxInt64) (math is already imported) and keep the existing error
handling and return values unchanged.
- Around line 356-363: The wrapped varint errors get a duplicated "qwp: " prefix
because qwpReadVarint returns errors already starting with "qwp: " and
readVarint wraps them with wrapQwpDecodeError; fix by removing the "qwp: "
prefix from package-internal error strings in qwpReadVarint (or alternatively
make wrapQwpDecodeError idempotent by skipping prepending when err.Error()
already starts with "qwp: "); update qwpReadVarint's error returns (and any
similar package-local helpers) so they return plain messages like "varint
overflow"/"varint truncated" and keep wrapQwpDecodeError as the single place
that adds the "qwp: decode: " envelope.

In `@README.md`:
- Around line 129-135: The README example uses an unchecked type assertion qwp
:= sender.(qdb.QwpSender) which can panic if the returned sender does not
implement qdb.QwpSender; change this to a checked assertion (ok form) when
converting the value from LineSenderFromConf, e.g. assign with qwp, ok :=
sender.(qdb.QwpSender) and handle the false case by logging or returning an
error instead of allowing a panic; update the snippet around the sender, qwp and
LineSenderFromConf usage to demonstrate safe handling.
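
The checked assertion can be sketched with local stand-in interfaces; `lineSender`, `qwpSender`, and the `qwpOnly` method are hypothetical and only mirror the shape of `qdb.LineSender` and `qdb.QwpSender`.

```go
package main

import "fmt"

type lineSender interface{ Flush() error }

type qwpSender interface {
	lineSender
	qwpOnly() // hypothetical QWP-specific method
}

type httpSender struct{}

func (httpSender) Flush() error { return nil }

type wsSender struct{}

func (wsSender) Flush() error { return nil }
func (wsSender) qwpOnly()     {}

// asQwp uses the two-value assertion form, so a sender of the wrong
// transport yields an error instead of a panic.
func asQwp(s lineSender) (qwpSender, error) {
	q, ok := s.(qwpSender)
	if !ok {
		return nil, fmt.Errorf("sender %T does not implement qwpSender", s)
	}
	return q, nil
}

func main() {
	_, err := asQwp(httpSender{})
	fmt.Println(err)
	_, err = asQwp(wsSender{})
	fmt.Println(err)
}
```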
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 478408a8-7e26-40b9-aee6-308c8ecd020e

📥 Commits

Reviewing files that changed from the base of the PR and between 58fc055 and 1c5e454.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (34)
  • .github/workflows/build.yml
  • README.md
  • go.mod
  • qwp_bench_test.go
  • qwp_bind_values.go
  • qwp_bind_values_test.go
  • qwp_buffer.go
  • qwp_buffer_test.go
  • qwp_constants.go
  • qwp_constants_test.go
  • qwp_errors.go
  • qwp_gorilla_decoder.go
  • qwp_gorilla_decoder_test.go
  • qwp_gorilla_test.go
  • qwp_integration_test.go
  • qwp_query_batch.go
  • qwp_query_batch_perf_test.go
  • qwp_query_batch_test.go
  • qwp_query_client.go
  • qwp_query_client_test.go
  • qwp_query_conf.go
  • qwp_query_decoder.go
  • qwp_query_decoder_test.go
  • qwp_query_errors.go
  • qwp_query_integration_test.go
  • qwp_query_io.go
  • qwp_query_io_test.go
  • qwp_sender_async_test.go
  • qwp_sender_test.go
  • qwp_transport.go
  • qwp_transport_test.go
  • qwp_wire.go
  • qwp_wire_test.go
  • sender.go

mtopolnik and others added 30 commits June 3, 2026 14:13
Mirror the server and the Java and Rust clients: drop QWP's
schema-reference mode, schema ids, the schema registry, and protocol
version 2 from the Go client.

Ingress writes columns inline on every table block (no schema mode byte,
no schema id). Egress carries the schema only in the first RESULT_BATCH
of a query (batch_seq == 0); continuation batches reuse it via a
per-query schema held on the decoder and reset by the IO dispatcher at
each query start. CACHE_RESET keeps only the dict bit. SERVER_INFO is
always read post-upgrade (no version gate) and both directions advertise
the single protocol version. The now-dead v1-mismatch detection
(SawV1Mismatch on QwpRoleMismatchError) is removed, since the egress
client always reads SERVER_INFO.

Tests refreshed: reference-mode and schema-id fixtures dropped or
repurposed as continuation tests, version-negotiation tests collapsed to
the single version, and the egress test mocks now emit SERVER_INFO.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
positionCursorAt() repositions the send cursor after a reconnect, with
fsnAtZero pinned to ackedFsn+1 and nextWireSeq reset to 0, so wireSeq=0
must map to the first unacked frame. When its first
engineFindSegmentContaining() missed that frame, it fell back to the
active segment's publishedOffset(). The producer runs concurrently and
could publish the target frame in that window: the publishedOffset()
read then sat one frame past the target, so the cursor parked past it.
The send loop dropped the target frame and shifted every later frame's
FSN by one, and the server trimmed the unsent span on its next
cumulative ACK — silent row loss while close() reported clean delivery.

publishedCursor is stored after frameCount in tryAppend, and Go atomics
are sequentially consistent, so observing the post-publish offset
guarantees the frameCount bump is visible too. Re-check
engineFindSegmentContaining() after the publishedOffset() barrier: a
frame published in the window is now found and the cursor lands on it.
If the frame is genuinely not published yet, both lookups miss and the
normal send loop picks it up later, leaving the common path unchanged.
The frame-walk is extracted into positionCursorInSegment() so both the
direct hit and the re-check share it.
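
The lookup, barrier, re-check sequence can be modeled in a few lines. This is a toy under stated assumptions: fixed-size frames, a single producer, and a `racing` hook that plays the concurrent producer deterministically. None of the names are the client's real ones.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const frameSize = 16 // fixed-size frames keep the sketch short

type segment struct {
	frameCount      atomic.Int64
	publishedCursor atomic.Int64
}

// tryAppend bumps frameCount first and stores publishedCursor last,
// the ordering the re-check relies on (single producer assumed).
func (s *segment) tryAppend() {
	n := s.frameCount.Add(1)
	s.publishedCursor.Store(n * frameSize)
}

// find returns the byte offset of frame idx if it is visible.
func (s *segment) find(idx int64) (int64, bool) {
	if idx < s.frameCount.Load() {
		return idx * frameSize, true
	}
	return 0, false
}

// positionCursorAt returns where the send cursor should park for target.
func positionCursorAt(s *segment, target int64, racing func()) int64 {
	if off, ok := s.find(target); ok {
		return off
	}
	racing()                             // producer publishes in the window
	fallback := s.publishedCursor.Load() // acts as the barrier
	if off, ok := s.find(target); ok {   // re-check after the barrier
		return off
	}
	return fallback
}

func main() {
	var s segment
	// Without the re-check this would return 16, one frame past the target.
	fmt.Println(positionCursorAt(&s, 0, s.tryAppend)) // 0
}
```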

Mirrors the Java client fix (java-questdb-client PR #40). Adds
TestQwpSfPositionCursorAtReconnectRace, which hammers positionCursorAt()
against a live producer and asserts the cursor never parks past the
target frame; it reproduces the bug pre-fix (best under -race) and is
deterministically green post-fix.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Since questdb/questdb#7200, server master pins core's test-scoped
org.questdb:questdb-client dependency to a -SNAPSHOT version that is
not published to Maven Central. `package -DskipTests` still compiles
tests, so the qwp-fuzz job's server build now fails dependency
resolution: "Could not find artifact
org.questdb:questdb-client:jar:1.3.3-SNAPSHOT".

Mirror the server repo's .github/actions/detect-local-client step:
read questdb.client.version from core/pom.xml and, when it is a
SNAPSHOT, init the java-questdb-client submodule and pass
-Plocal-client so the existing `-pl core -am` reactor builds the
client ahead of core. Release versions keep resolving from Maven
Central with no profile, so the job works on both sides of future
client-release transitions.

Verified against post-#7200 master: the exact CI command plus
-Plocal-client builds the client (1.3.3-SNAPSHOT) in-reactor and
produces the server jar in ~45s with a warm ~/.m2.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Memory mode (no sf_dir) built its sender via a one-shot synchronous
dial of endpoints[0] through a single-host factory, installed no host
tracker, and hard-coded the default reconnect budget. As a result a
ws::addr=node-a,node-b,node-c; connect string — the README's headline
failover example, which is memory mode — only ever dialed node-a: it
ignored the rest of the addr list, initial_connect_retry, and the
reconnect_* budgets, and on node-a's death re-dialed node-a alone
until it latched a terminal HALT, losing buffered data. This
contradicted the README, which states the failover knobs apply
whether or not sf_dir is set.

Route memory mode through the same conf-driven cursor constructor
that SF mode already uses. newQwpCursorLineSenderFromConf now handles
both modes: an empty sf_dir selects a RAM-backed cursor engine (empty
slot path) and the smaller memory-mode total-bytes ceiling, while the
multi-host failover plumbing — host tracker, endpoint factory,
initial-connect mode, and reconnect budgets — is shared. The dead
single-dial branch in newQwpLineSenderFromConf is removed, along with
its now-unused address argument; the microbatch encoder presizing it
carried is preserved (and now applies to SF mode as well). The memory
engine is constructed identically to before, and the orphan-adoption
block stays inert for memory mode because validation already requires
sf_dir for drain_orphans.

Add regression tests: a dead first endpoint now fails over to the
healthy peer in memory mode, and the send loop receives the
multi-host tracker plus the user's reconnect_max_duration_millis.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Five documentation sites still described QWP Flush as a synchronous
barrier that blocks until the server ACKs — the exact opposite of the
publish-only contract the cursor architecture actually implements
(Flush returns once the batch is published into the cursor engine and
never waits for the ACK; the I/O goroutine delivers and replays in the
background). This is a data-loss trap: a memory-mode user trusting
Flush()==nil as server durability would lose unacked rows on process
exit.

Rewrite all five to the publish-only contract and point callers at
FlushAndGetSequence paired with AwaitAckedFsn for server-ACK
confirmation:

  - README.md flush-semantics paragraph ("Flush blocks until the
    server has ACKed everything ... durable on the server").
  - WithInFlightWindow godoc ("value of 1 forces synchronous mode ...
    Defaults to 128") — rewritten to the no-op contract and given a
    Deprecated: tag, since the window has no effect.
  - WithCloseTimeout godoc ("Calling Flush() before Close() guarantees
    all data is ACKed") — replaced with the close-time-drain semantics
    and the memory-loss vs SF-replay distinction.
  - examples/qwp/basic/main.go ("Flush is a synchronous barrier on
    QWP"), which is published to questdb.io and contradicted the same
    file's own async header comment.
  - qwp_sender.go in-code comment ("Explicit Flush() is where the
    drain barrier lives") — there is no drain barrier.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The never-ACKed terminal heuristic in the QWP SF send loop HALTed
after a single connection that sent frames but received zero ACKs,
treating it as an incompatible server build and refusing to retry.
A routine server restart or load-balancer RST landing in the window
between a fresh sender's first frame and its first ACK has the
identical signature, so that one strike could strand recoverable
memory-mode data.

The guard now counts consecutive ACK-less connections in a new
silentConnStrikes counter and only declares the server incompatible
once it reaches qwpSfMaxSilentConnStrikes (2) — at least one full
reconnect+replay cycle that still met nothing but silence. A lone
transient drop reconnects and replays instead. A genuinely
incompatible server still fails fast, now at the cost of exactly one
extra reconnect, so the original port-hammering guard is preserved.
The counter needs no reset: the guard's totalAcks == 0 precondition
freezes it the moment any ACK lands.
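
A rough sketch of the two-strike guard, with hypothetical names (`sendLoop`, `onConnectionLost`) and a simplified notion of what the loop knows when a connection drops:

```go
package main

import "fmt"

const maxSilentConnStrikes = 2

type sendLoop struct {
	totalAcks         int
	silentConnStrikes int
}

// onConnectionLost reports whether the loop should HALT. framesSent and
// acks describe the connection that just dropped. Once any ACK has ever
// landed, totalAcks != 0 and the strike counter can no longer advance,
// which is why it needs no reset.
func (l *sendLoop) onConnectionLost(framesSent, acks int) (halt bool) {
	l.totalAcks += acks
	if l.totalAcks == 0 && framesSent > 0 {
		l.silentConnStrikes++
		return l.silentConnStrikes >= maxSilentConnStrikes
	}
	return false
}

func main() {
	var l sendLoop
	fmt.Println(l.onConnectionLost(3, 0)) // false: lone transient drop
	fmt.Println(l.onConnectionLost(3, 0)) // true: second silent connection
}
```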

Tests: add a silentDropUntilConn knob to the fake server to model a
transient first-connection drop, add
TestQwpSfSendLoopSilentDropOnFirstConnReconnects to pin the recovery
path, and update the terminal test to reflect the two-strike bound.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A QWP cursor flush whose encoded frame exceeded the per-segment byte
cap returned qwpSfErrPayloadTooLarge while retaining the pending rows
(the retain-on-error contract meant for transient backpressure).
Because the segment cap never grows and the per-table split path was
not ported from the Java client, every subsequent flush re-encoded the
same frame and failed identically, and Close lost the batch. This was
reachable with zero misconfiguration: the shipped 8 MiB byte-trigger
default sits above the 4 MiB segment default.

Three changes close it:

  - Clamp: effectiveAutoFlushBytes now also clamps to 90% of the
    per-segment frame cap (maxFrameBytes, derived from the new
    engineMaxFrameBytes accessor), so the soft auto-flush fires before
    a batch can grow past what a single segment holds. Applies in both
    memory and store-and-forward modes, alongside the existing
    server-cap clamp.

  - Drop guards: enqueueCursor now drops an over-segment frame with a
    typed error and resets pending state instead of retaining it, so
    the sender can never become unrecoverable. The >65535-tables encode
    error, the same retain-forever family, gets the same treatment.
    Mirrors the Java client's flushPendingRowsSplit drop-and-throw.

  - Validation: sanitizeQwpConf rejects an explicitly-set
    auto_flush_bytes that exceeds an explicitly-set sf_max_bytes (new
    autoFlushBytesSet flag, set by the parser and WithAutoFlushBytes).
    Gated on explicit-set so the shipped defaults and a merely-lowered
    sf_max_bytes are left to the runtime clamp rather than rejected.
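
The clamp item can be illustrated with a small function. The 90% segment factor comes from the text above; how the server-cap clamp is applied is an assumption here (a plain upper bound, skipped while the cap is 0), and the signature is hypothetical.

```go
package main

import "fmt"

// effectiveAutoFlushBytes returns the byte trigger after clamping to the
// server cap (assumed: simple upper bound, 0 means unknown) and to 90%
// of the per-segment frame cap.
func effectiveAutoFlushBytes(configured, serverCap, maxFrameBytes int) int {
	eff := configured
	if serverCap > 0 && eff > serverCap {
		eff = serverCap
	}
	if segCap := maxFrameBytes * 9 / 10; segCap > 0 && eff > segCap {
		eff = segCap
	}
	return eff
}

func main() {
	const mib = 1 << 20
	// Shipped defaults per the text: 8 MiB trigger, 4 MiB segment.
	eff := effectiveAutoFlushBytes(8*mib, 0, 4*mib)
	fmt.Println(eff < 4*mib) // true: auto-flush fires before the segment cap
}
```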

Tests: new qwp_segment_cap_guard_test.go covers the drop, the Close
path, the clamp-keeps-trigger-below-segment invariant, the no-drift
segment boundary, and the >65535-tables drop; new conf tests cover the
validation and its no-footgun accept cases; the memory-mode clamp
expectations were updated to reflect the segment floor.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The SenderErrorHandler is documented as allowed to call Close() or
Flush(). Because the handler runs on the dispatcher goroutine, those
calls raced the producer goroutine in two ways that could crash the
host process:

1. Engine teardown vs a parked append. On a HALT the send loop stops
   draining, the cursor ring fills, and the producer parks in
   engineAppendBlocking's backpressure spin calling appendOrFsn every
   park interval. A handler-invoked Close() ran engineClose ->
   segmentRingClose, which nil'd and munmapped the active segment while
   the producer was still inside appendOrFsn — a nil-pointer deref in
   memory mode, a SIGBUS on the munmapped pages in SF mode.

2. Producer-state data race. closeCursor and FlushAndGetSequence read
   and wrote producer-owned state (the tableBuffers map ranged in
   buildTableEncodeInfo, the encoder, hasTable, pendingRowCount,
   lastErr) with no happens-before against a producer mid-At(), up to
   Go's fatal "concurrent map iteration and map write".

Fix (1) by serializing the producer's only ring-append entry against
engineClose: a new engine appendMu wraps each appendOrFsn (initial try
and every backpressure-spin retry) and re-checks the closed flag under
it, while engineClose holds the lock across the manager + ring
teardown. A parked producer now unwinds with a clean
qwpSfErrEngineClosed instead of touching a torn-down segment. This is a
per-flush lock, not per-row, so the zero-alloc steady-state hot path is
unchanged.

Fix (2) by detecting when Close()/Flush() run on the dispatcher
goroutine (via the existing qwpGoid/loopGoid machinery, gated so the
runtime.Stack cost is only paid once the dispatcher has started) and
skipping all producer-state access in that case — running only the
goroutine-safe teardown / latched-error surfacing.

A handler-invoked Close() consequently no longer flushes rows the
producer staged but had not flushed itself; it cannot do so safely from
another goroutine. The SenderErrorHandler doc is updated to state this,
and to direct callers wanting a guaranteed flush to do it from the
producer goroutine.

Adds qwp_sender_handler_close_test.go: a deterministic engine-level
regression for the crash, a deterministic behavioral test that
off-producer Close/Flush leaves producer state untouched, and a -race
test driving the documented handler-Close path with a concurrent
producer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The RESULT_BATCH decoder materialises a row-indexed scratch array per
column (nonNullIdx, symbolRowIds, arrayRowStart, arrayElems), each
rowCount entries wide. An all-null column is nearly free on the wire — a
rowCount/8 null bitmap that zstd-compresses to almost nothing — yet
still forces that full rowCount-sized allocation, a 32–96x
amplification. row_count and column_count were each capped individually,
but their product was not, so a frame packed with all-null columns up to
the decompressed-frame cap could drive multi-GiB transient allocations
and OOM the client, defeating the decoder's "rejected before any large
allocation" hardening.

Bound the declared cell count (row_count × column_count) against
qwpMaxCellsPerBatch (= qwpZstdMaxDecompressedSize) up front, before the
per-column loop sizes any index array. A conformant server spends at
least one wire byte per cell, so a legitimate batch never declares more
cells than its maximum possible decompressed byte size; amplified frames
are now rejected with zero index-array allocation.
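
A sketch of the up-front cell-count bound. The cap value below is illustrative only (the text ties the real `qwpMaxCellsPerBatch` to `qwpZstdMaxDecompressedSize`, whose value is not given here), and `checkCellCount` is a hypothetical name. Dividing instead of multiplying keeps the check safe against uint64 overflow.

```go
package main

import "fmt"

const maxCellsPerBatch uint64 = 64 << 20 // illustrative value only

// checkCellCount rejects rowCount*columnCount > maxCellsPerBatch
// without computing the product, which could overflow.
func checkCellCount(rowCount, columnCount uint64) error {
	if columnCount != 0 && rowCount > maxCellsPerBatch/columnCount {
		return fmt.Errorf("batch declares %d x %d cells, over the cap of %d",
			rowCount, columnCount, maxCellsPerBatch)
	}
	return nil
}

func main() {
	fmt.Println(checkCellCount(maxCellsPerBatch, 1)) // <nil>: exactly at the cap
	fmt.Println(checkCellCount(maxCellsPerBatch, 2) != nil)
}
```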

Add hardening tests H7a (an over-cap frame is rejected before any column
is parsed) and H7b (a batch exactly at the cap is not rejected by the
guard).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Close(ctx) with a short or already-expired ctx had shutdown() return
via ctx.Done() before the reader and dispatcher goroutines joined
(doneCh). Close then ran tr.close() unconditionally, which set
t.conn = nil — racing the reader goroutine's per-iteration read of
io.transport.conn (and the dispatcher's sendMessage reads of the same
field). That is a data race and can nil-deref the unsupervised reader
goroutine, crashing the host. reconnectAndReplay's 5s-bounded cleanup
had the same shape.

Make the transport's conn immutable after a successful connect(). It
is the cleaner root-cause fix: close() no longer nils the field, it
just shuts the connection down — which already errors every in-flight
Read/Write — guarded by a sync.Once so repeat calls stay idempotent.
readerRun captures the conn once into a local. Removing the racing
write covers Close, reconnectAndReplay, and the defensive teardown
paths at once, without touching that orchestration. coder/websocket's
Conn.Close is itself safe under concurrent and repeated calls, and the
ingest send loop already joins its I/O goroutine before close(), so it
is unaffected.
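
The immutable-conn idea in miniature, using a hypothetical `fakeConn` in place of the WebSocket connection. The field is written once at connect time and never reassigned; `close()` only shuts the connection down, guarded by a `sync.Once`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("use of closed connection")

type fakeConn struct {
	mu     sync.Mutex
	closed bool
	closes int
}

func (c *fakeConn) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
	c.closes++
}

func (c *fakeConn) Read() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errClosed
	}
	return nil
}

type transport struct {
	conn      *fakeConn // set once by connect, never nil'd afterwards
	closeOnce sync.Once
}

// close is idempotent and leaves conn in place, so a reader goroutine
// that still holds or re-reads conn sees an I/O error, not a nil deref.
func (t *transport) close() { t.closeOnce.Do(t.conn.Close) }

func main() {
	tr := &transport{conn: &fakeConn{}}
	tr.close()
	tr.close()
	fmt.Println(tr.conn != nil, tr.conn.Read(), tr.conn.closes)
}
```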

Add TestQwpQueryClientCloseShortCtxNoReaderRace, which reproduces the
race under `go test -race`. Update TestQwpTransportConnectAndClose to
assert the new invariant: conn is retained but dead (I/O errors) and
close() is idempotent.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A flush whose encoded frame exceeded a wire cap dropped every pending
row in every table and returned a typed error, even though the batch
was usually over the cap only because it aggregated many small tables.
Java's flushPendingRows splits such a batch per table and drops only
the irreducible single-table-over-cap case; the Go "Mirrors Java"
comment was wrong.

enqueueCursor now falls back to enqueueCursorSplit when the combined
frame overruns the server batch cap or the per-segment frame cap. The
split re-encodes each table as its own self-sufficient single-table
frame (full symbol dict from id 0 + full inline schema, so each one
replays on its own) and appends every table that fits. Only a table
whose own frame is still over-cap is irreducible: its rows are dropped
and named in the error while every other table goes out.

The retain-on-error contract now holds per table — a table is reset
only once its frame is in a segment, so a transient append failure
mid-split retains the failed table and the unprocessed tail without
re-sending what already landed; recomputePendingFromBuffers reconciles
the aggregate counters from the buffers in that case.

The shared frameCapExceeded helper replaces the two inline drop guards
and keeps the existing error substrings, so the single-table cap-guard
tests are unaffected. New tests cover the segment-cap split, the
server-cap split, and the all-fit aggregation case.

This does not fully close the async-initial-connect poison: while the
server cap is still 0 (pre-bind), an over-server-cap frame under the
segment cap is persisted whole and can replay into ws-close[1009] on
restart. Closing that needs a separate pre-bind cap or replay-time
revalidation.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The user-supplied WithErrorPolicyResolver callback ran unguarded inside
qwpSfPolicyResolver.resolve, which the receiver goroutine invokes
synchronously when classifying a server rejection. A panic in that
callback would unwind the receiver goroutine and take down the host
process.

Wrap the callback in a callResolver helper that recovers, logs at
[ERROR], and falls back to qwpSfDefaultPolicyFor(c) — mirroring the
existing panic guard around the error handler in
qwpSfErrorDispatcher.deliver. The fallback is always a concrete
Halt/DropAndContinue (never PolicyAuto), so resolve's != PolicyAuto
check short-circuits the precedence chain: a broken resolver yields the
safe spec policy rather than silently deferring to a per-category or
global slot.
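
The guard can be sketched as below. `policy`, `category`, and `defaultPolicyFor` are simplified stand-ins (the real default presumably varies by category); the part that matters is the named result being overwritten inside the deferred recover.

```go
package main

import (
	"fmt"
	"log"
)

type policy int

const (
	policyAuto policy = iota
	policyHalt
	policyDropAndContinue
)

type category int

// defaultPolicyFor is a stand-in that always returns a concrete policy.
func defaultPolicyFor(c category) policy { return policyHalt }

// callResolver runs the user callback; if it panics, the panic is
// logged and the concrete default replaces the result.
func callResolver(resolver func(category) policy, c category) (p policy) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("[ERROR] error policy resolver panicked: %v", r)
			p = defaultPolicyFor(c)
		}
	}()
	return resolver(c)
}

func main() {
	broken := func(category) policy { panic("bug in user code") }
	fmt.Println(callResolver(broken, 0) == policyHalt) // true
}
```

Because the fallback is never `policyAuto`, a caller that treats any non-auto result as final stops at the default rather than consulting lower-precedence slots.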

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
FlushAndGetSequence publishes pending rows into the cursor engine
(durable — an FSN is assigned and the frame is queued for replay) and
then eagerly samples the send loop's latched error. A HALT latched by a
previous batch can land in the window between the publish and that eager
check, so the call returns (-1, err) for a batch that is already sealed
in a segment.

On that path the table buffers were not reset, leaving the rows both
queued for replay and retained as pending. A user following the
documented close+rebuild recovery would re-send the "failed" batch and
double-write it once the SF slot replays the sealed frame.

Reset the buffers inside flushCursor as soon as the enqueue succeeds,
before the eager error check, and drop the now-redundant reset from
FlushAndGetSequence. Retain-on-error is preserved: when enqueueCursor
fails before sealing, flushCursor returns before the reset, so those
un-persisted rows stay pending for the next attempt. Mirrors the Java
client, which resets in flushPendingRows before checkError() throws.

Add a regression test that makes the race deterministic by filling the
engine ring to its cap so the publish parks on backpressure, then
latching the terminal error and freeing a segment so the parked append
completes before the eager check fires.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
target=primary|replica was accepted by the parser and fed into the SF
ingress host tracker, but the ingest connect path does not select
endpoints by server role. The round-walk rejected every successful
upgrade as a role mismatch and re-swept with backoff until
reconnect_max_duration expired (~5 min), then HALTed — a connect/close
storm against the cluster that never delivered a row.

Build the ingress tracker with qwpTargetAny and bind on any successful
upgrade, mirroring how zone= is already handled: target= is accepted
and validated at config time but inert on ingestion. Endpoint selection
by role remains fully honoured on the egress query path
(qwp_query_failover.go), which has its own tracker. No user-facing API
change; WithTarget / target= still parse and validate.

Replace the round-walk tests that pinned the old topology-reject
behavior with TestRoundWalkIngressIgnoresTargetFilter (binds for
any/primary/replica) and add TestQwpIngressAcceptsTargetInert, an
end-to-end guard that connects with target=primary/replica and confirms
delivery via the flush+ACK barrier.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
QWP was flattened to a single wire version (qwpVersion = 0x01; "the
sole QWP protocol version"), and the server emits SERVER_INFO as the
first post-upgrade frame regardless. Comments across the QWP code still
described a pre-flattening world of multiple wire versions where "v1
servers don't emit SERVER_INFO" and "v2 is required" for the role/zone
data — a model that no longer exists and that misexplains current
behavior.

Reword those comments to the current facts:

- Egress: a nil ServerInfo means the client did not consume the frame
  (serverInfoTimeout disabled, or no parseable frame), not that it
  talked to a "v1 server." target=primary/replica needs the role from
  SERVER_INFO; without consuming it the role is unknown.
- Ingest: target= / zone= are inert because the ingestion path does not
  route by server role or zone (an egress-only feature), not because it
  is "v1-pinned" or "never reads SERVER_INFO."
- Drop "QWP v1 protocol specification" / "QWP v1 binary messages" — there
  is one version.

Comment-only; no behavior change. ILP's genuine protocol versions
(V1/V2/V3) are untouched, as are the qwp_transport.go comments about
ingest SERVER_INFO consumption, which describe a separate open question.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The wire-protocol specs (connect/wire-protocols/qwp-{ingress,egress}-
websocket.md), the server code (QwpEgressUpgradeProcessor appends it;
QwpIngressUpgradeProcessor does not), and a live probe all agree: the
server delivers an unsolicited SERVER_INFO frame as the first frame
only on the egress endpoint (/read/v1). The ingest endpoint (/write/v4)
sends none, and the client never expects one — per the ingress spec's
lifecycle it sends data right after the upgrade and the first inbound
frame is an ACK.

Correct the comments accordingly:

- qwp_transport.go claimed "the server always emits SERVER_INFO as the
  first post-upgrade frame" — true only on the read endpoint. Scope it,
  and state that the ingest endpoint sends none and the client never
  expects one (so readAck reads the first frame as an ACK).
- Sharpen the ingress role/zone comments to the spec's wording: ingress
  is role- and zone-blind and never receives SERVER_INFO. Role
  enforcement on ingress is the server's 421 + X-QuestDB-Role upgrade
  reject, not a client-side filter — any node that completes the ingest
  upgrade is already write-eligible, which is precisely why target= (and
  zone=) are correctly inert on ingest.

Add TestQwpServerInfoIsEgressOnly: opts into a synchronous first-frame
read on each endpoint and asserts egress delivers SERVER_INFO while
ingest times out. It fails loudly if the server ever starts emitting
SERVER_INFO on /write/v4 (which would require the ingest read path to
consume and discard it). Self-skips without a live server.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The receiver clamped incoming ACK sequences against nextWireSeq-1,
but nextWireSeq is bumped before sendMessage, so that ceiling
covered the frame currently inside the wire write. A non-compliant
server's early or forged ACK naming that in-flight sequence passed
the clamp and let engineAcknowledge advance ackedFsn over it. The
segment manager then trims (munmaps) the segment whose bytes
sendMessage is still reading — SIGSEGV — or marks a never-delivered
frame acked — silent loss. The ring's own publishedFsn clamp does
not help: a frame is published well before it is sent.

Add a highestFullySent counter, advanced to the frame's wire
sequence only after sendMessage returns and reset to -1 on every
(re)connect. Both receiver clamp sites — the OK path and the
drop-and-continue rejection path — now key off it instead of
nextWireSeq-1. nextWireSeq's pre-bump stays; it now only feeds the
sender's own wireSeq/fsnSent derivation. A spec-honoring server is
unaffected: it can only ACK sequence N after fully receiving frame
N, by which point sendMessage(N) has returned. ACKs are cumulative
and idempotent, so the rare in-process store-visibility lag
self-heals on the next ACK.
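
The clamp can be illustrated with a simplified model. The `sendState` type and its methods below are hypothetical and only mirror the counters named in this message; the real send loop and receiver are more involved.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sendState is a simplified, hypothetical model of the send loop's counters.
type sendState struct {
	nextWireSeq      int64        // bumped before the wire write starts
	highestFullySent atomic.Int64 // advanced only after the write returns
}

func newSendState() *sendState {
	s := &sendState{}
	s.highestFullySent.Store(-1) // nothing fully sent on a fresh connection
	return s
}

// beginSend reserves the next wire sequence (the pre-bump).
func (s *sendState) beginSend() int64 {
	seq := s.nextWireSeq
	s.nextWireSeq++
	return seq
}

// finishSend records that the frame with this sequence left the wire write.
func (s *sendState) finishSend(seq int64) { s.highestFullySent.Store(seq) }

// clampAck returns the highest sequence the receiver may acknowledge, and
// false when no frame has been fully sent yet.
func (s *sendState) clampAck(ackSeq int64) (int64, bool) {
	sent := s.highestFullySent.Load()
	if sent < 0 {
		return 0, false
	}
	if ackSeq > sent {
		ackSeq = sent
	}
	return ackSeq, true
}

func main() {
	s := newSendState()
	seq0 := s.beginSend()
	s.finishSend(seq0)
	seq1 := s.beginSend() // frame 1 is still inside the wire write

	fmt.Println(s.clampAck(seq1)) // forged ACK for the in-flight frame
}
```

In this model a forged ACK naming the in-flight sequence 1 is clamped back to 0, whereas a ceiling of `nextWireSeq-1` would have let it through.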

Add TestQwpSfSendLoopReceiverClampsForgedAckToFullySent, which
drives receiverLoop with a frame pinned mid-sendMessage and a
forged ACK naming it, covering both the OK and rejection clamp
sites.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
qwpTransport.connect() builds a fresh one-shot http.Transport per call
with keep-alives at the default. On a non-101 upgrade response (421
role-reject, 503 proxy, ...) coder/websocket reads the body to EOF and
closes it, which returns the keep-alive TCP conn to that transport's
idle pool. Nothing reuses the abandoned transport or calls
CloseIdleConnections on it, so the parked conn plus its two persistConn
read/write goroutines leak. Role-rejects are steady-state in a failover
topology, so the leak accumulates — dozens to hundreds against a
503-ing proxy over a multi-minute outage budget.

Set DisableKeepAlives on the one-shot transport. This prevents pooling
at the source: tryPutIdleConn rejects the conn, it is closed, and the
goroutines exit. It is race-free, unlike a post-hoc CloseIdleConnections
call, which can run before the transport's readLoop has parked the conn.
A successful 101 hijacks the conn out of pool management, so the flag
never affects the live WebSocket, and it adds no Connection: close
header to conflict with the WS upgrade.

Add TestQwpTransportUpgradeRejectNoConnLeak, which drives 30 non-101
rejects through connect() and asserts the goroutine count stays flat.
It fails without the fix (~2 leaked goroutines per cycle).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Two idle-path costs in the QWP store-and-forward stack, from review
comment M13.

The send loop's fallback park timer was 50µs, re-armed on every idle
iteration. The publish doorbell already makes steady-state sends
event-driven, so this timer only bounds recovery from a missed wakeup
and never gates send latency; at 50µs it burned ~13% of a core per
idle sender on Darwin. Widening it to 1ms cuts the idle wake rate 20x
with no steady-state latency change. The Java spec's 50µs assumes
LockSupport.parkNanos, which is cheap to rearm; a Go time.Timer
re-armed at 20kHz is not, so matching the constant would not match the
cost. The single default feeds the ingest loop, the SF cursor, and the
orphan drainers.

The segment manager's 1ms worker tick allocated a fresh ring-snapshot
slice every tick. It now refills a worker-owned ringSnapshot field via
append(s[:0], ...), reusing the backing array for zero steady-state
allocations.
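
A small sketch of the slice-reuse idiom, assuming a hypothetical `worker` type with a `ringSnapshot` field of `int64` values (the real element type is not shown in this message):

```go
package main

import (
	"fmt"
	"testing"
)

// worker is a hypothetical stand-in for the segment manager's worker state.
type worker struct {
	ringSnapshot []int64 // reused across ticks; owned by the worker only
}

// snapshot copies the live values into the worker-owned slice, reusing its
// backing array whenever the capacity is already sufficient.
func (w *worker) snapshot(live []int64) []int64 {
	w.ringSnapshot = append(w.ringSnapshot[:0], live...)
	return w.ringSnapshot
}

func main() {
	w := &worker{}
	live := []int64{10, 11, 12, 13}
	w.snapshot(live) // the first tick sizes the backing array

	// Later ticks of the same size should not need a new backing array.
	allocs := testing.AllocsPerRun(1000, func() { w.snapshot(live) })
	fmt.Println("allocs per tick:", allocs)
}
```

The trade-off is that the returned slice is only valid until the next tick, which is acceptable when a single goroutine owns both the field and its consumers.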

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Dump mode (WithQwpDumpWriter) wires the client to an in-process
net.Pipe and a fake WebSocket server rather than a real connection.
net.Pipe is synchronous — a Write blocks until the peer reads — so the
fake server's OK ACK could reach the receiver before the send loop
stored highestFullySent. That store happens only after sendMessage
returns, because it gates segment munmap; until it lands the receiver
clamps any ACK away via its highestFullySent < 0 guard. The sole ACK
was dropped, ackedFsn never advanced, and Close drained until its 5s
timeout. The failure was scheduler-sensitive: ~9/10 under default
GOMAXPROCS and 100% under GOMAXPROCS=1. TestQwpDumpWriter is the only
test exercising this path.

A real socket cannot hit this: kernel send-buffering returns
sendMessage (and stores highestFullySent) long before any network ACK
returns. asyncWritePipeConn restores that ordering by buffering the
dump-mode client write — Write queues and returns immediately, and a
single pump goroutine drains the queue to the synchronous pipe in FIFO
order. It also closes a latent pump/fake-server goroutine leak on dial
failure: the deferred cleanup now closes the buffered conn, which both
stops the pump and closes the pipe. Dump mode stays fully in-process,
which the public WithQwpDumpWriter relies on.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A mixed-case column name forced getOrCreateColumn off its fast path
and into a strings.ToLower map probe on every cursor miss. ToLower
allocates a fresh lowercase key whenever the name has an uppercase
letter, so a sparse mixed-case writer paid one allocation per
cursor-miss column, every row — measured at +17 allocs and ~25%
ns/op over the all-lowercase path. The lowercase-only zero-alloc
benchmark never exercised it.

The cursor fast path now compares with an ASCII case-insensitive
fold (qwpASCIIEqualFold) instead of a case-sensitive ==, matching
the server's ASCII-only column-name folding (Java
Chars.toLowerCaseAscii). A same-order writer keeps the fast path
regardless of casing, with no allocation. The fold guards the 0x20
case bit behind a letter check so legal name punctuation that
differs only in that bit ('@'/'`', '['/'{', ']'/'}', '^'/'~')
never compares equal.
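
One way such a fold could look, as a sketch rather than the actual `qwpASCIIEqualFold` source; the function name `asciiEqualFold` is illustrative:

```go
package main

import "fmt"

// asciiEqualFold reports whether a and b are equal when ASCII letters are
// compared case-insensitively. All other bytes must match exactly.
func asciiEqualFold(a, b string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := 0; i < len(a); i++ {
		x, y := a[i], b[i]
		if x == y {
			continue
		}
		// The bytes may differ only in the 0x20 case bit...
		if x^y != 0x20 {
			return false
		}
		// ...and only when the byte is an ASCII letter.
		lower := x | 0x20
		if lower < 'a' || lower > 'z' {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(asciiEqualFold("Price", "price")) // letters fold
	fmt.Println(asciiEqualFold("a@b", "a`b"))     // punctuation does not
}
```

The letter check is what separates this from a bare `x|0x20 == y|0x20` comparison, which would treat pairs such as `'['` and `'{'` as equal.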

On a cursor miss the slow path probes columnIndex with the name
verbatim before lowering. That hits the canonical key for
lowercase names and any memoized casing-variant alias, so the
common case skips ToLower. A ToLower hit memoizes the verbatim
casing as an alias of the canonical key, recorded in aliasKeys.
A map hit also resyncs the sequential cursor to idx+1, restoring
the fast path for the rest of the row after a sparse skip.

cancelRow drops the memoized aliases when it removes columns: an
alias maps to a column index that truncation — or the
committedColumnCount==0 reset case, which removes every column —
can leave dangling past the columns slice. Alias keys always carry
an uppercase letter, so dropping them never touches the
all-lowercase canonical keys of surviving columns.

A new mixed-case sparse benchmark and zero-alloc pin cover the
gap, alongside white-box tests for the cursor resync and the
alias cleanup.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Two QWP SF close paths used unbounded waits that could hang the
sender's Close() forever.

The error dispatcher's close() joined the loop goroutine with a bare
wg.Wait(). A user SenderErrorHandler that never returns leaves the
loop parked inside deliver(), so it never observes done and never
calls wg.Done() — close() then waits indefinitely. (The old comment
calling the drain timeout a "hard ceiling on close() blocking time"
only held for a slow handler, not a wedged one.) close() now joins
under qwpSfDispatcherCloseJoinTimeout, abandons the parked goroutine,
counts the still-queued notifications as dropped, and logs a warning.

The drainer pool's close() bounded its polite grace, then cancelled
the master ctx and waited on <-doneCh with no bound. Cancellation
unwinds ctx-aware blocking (TCP dials, the poll loop) but not
drainerRun's engine-open phase — flock, mmap, a full CRC scan of a
possibly-huge slot, or hung NFS make no ctx checks. A drainer wedged
there kept close() blocked despite the cancel. A bounded
qwpSfDrainerPoolHardCloseGrace now follows the cancel; a drainer
still alive past it is abandoned with a logged count, leaving its
slot a valid orphan for a future sender to re-adopt.

Both abandon paths leak the wedged goroutine until its underlying
call returns, which is inherent: a goroutine stuck in user code or an
un-cancellable syscall cannot be force-killed. Each fix ships with a
regression test that pins a handler / ignores ctx and asserts close()
returns within the new bound.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address a pre-release review of the QWP public API, fixing
"one-way door" naming and visibility issues that would be costly
to change after a tag.

- Export the constants behind documented byte/uint32 fields so they
  are usable without reading the source: QwpRole* for
  QwpServerInfo.Role, QwpType* for QwpColumnBatch.ColumnType,
  QwpCapZone for QwpServerInfo.Capabilities, and a curated set of
  QwpOpType* (mirroring CompiledQuery.TYPE_*) for ExecResult.OpType.
  A pin test locks the hand-entered QwpOpType* values to the
  server's discriminators.

- Export QwpTargetFilter and have both WithTarget (ingest) and
  WithQwpQueryTarget (egress) take it, replacing the egress option's
  string argument and the dead "stash 255, validate() catches it"
  sentinel with a real range check in validate().

- Drop the SerializedBatch = QwpColumnBatch alias; CopyAll now
  returns *QwpColumnBatch, with the alias's lifetime notes folded
  into the method doc.

- Export the store-and-forward backpressure sentinel as
  ErrBackpressureTimeout (errors.Is-matchable) so the most
  operationally important transient error is no longer only
  string-matchable.

- Rename QueryOption to QwpQueryOption and WithQueryBinds to
  WithQwpQueryBinds for a uniformly Qwp-prefixed query surface.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The cursor/SF durability layer's headline guarantees were under-tested:
several were never exercised in CI, and a few had no test at all. This
addresses review item M14 by closing those gaps.

- CI now runs the 0-allocs/op invariant. The Test*ZeroAllocs pins
  self-skip under -race, and build.yml's only test step was -race, so
  the allocation-free guarantee never actually ran. Add a cheap
  non-race `go test -run ZeroAllocs` step.

- Disk-backed replay is now exercised outside the jar-gated fuzz
  workflow. Parametrize the gap-free-replay, reconnect-after-close, and
  drop-and-continue send-loop tests over {memory, disk} engine
  backings.

- ENOSPC/fallocate failure is now covered. Add a qwpSfReserveNewBlocksFn
  seam (mirroring the Java FilesFacade fault-injection point) so a
  reservation failure can be injected without filling a disk, then
  assert qwpSfAllocate surfaces the error without extending the file
  (no sparse mapping -> no later SIGBUS) and qwpSfCreateSegment unlinks
  the partial .sfa.

- Flush's no-ACK-wait contract is now pinned for the pending-rows path
  (only the zero-pending fast path was covered). Against a silent-ACK
  server, FlushAndGetSequence returns promptly with rows published
  while ackedFsn stays behind publishedFsn.

- The .sfa on-disk format is now locked against the Java MmapSegment
  layout with a golden-image conformance test covering the constants,
  the Go reader, and the Go writer (byte-for-byte modulo createdMicros).

The remaining M14 item - a lying ACK with seq beyond the last
fully-sent frame - was already covered by the receiver's forged-ACK
clamp regression test.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The CLAUDE.md QWP section and two source comments carried claims that
would misdirect maintenance:

- "the trackers exist for tests and external observers" implied
  maxSentSymbolId / batchMaxSymbolId could be dropped. In fact
  batchMaxSymbolId is the batchMaxId arg to
  encodeMultiTableWithDeltaDict and bounds the dict written to the wire
  (writeDeltaDict emits globalDict[0..batchMaxSymbolId]); dropping it
  would silently truncate the symbol dictionary. maxSentSymbolId is the
  cross-flush high-water mark resetAfterFlush rewinds batchMaxSymbolId
  to. Both fields are load-bearing.

- The qwp_sender.go field comment called maxSentSymbolId "the highest
  symbol ID ACKed by the server"; it actually advances at append time
  (right after engineAppendBlocking), never on server ACK.

- The disk-backed segment path was written
  <sf_dir>/<sender_id>/<slot>/*.sfa, which has a phantom level: the
  per-sender directory is itself the slot, so the real layout is
  <sf_dir>/<sender_id>/*.sfa.

Corrects CLAUDE.md, the review-pr skill (same phantom path), and the
qwp_sender.go / qwp_sender_cursor.go comments. Documentation and
comments only; no behavior change.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Three independent findings from a QWP review pass.

Dump-mode fake server: emit 0-based cumulative ACK sequences. It
incremented seq before building the frame, so the first batch (FSN 0)
was acked as sequence 1. The ring.acknowledge publishedFsn clamp
silently absorbed the off-by-one, so dump mode exercised a different
ACK path than production and could mask sequencing bugs. Build the
ACK with the current seq and post-increment instead.

Segment ring: re-arm the high-water-mark backup wakeup on every
rotation. wakeupRequestedForActive had two set sites and no clear
site, so after the first rotation or HWM crossing it latched forever
and the backup manager-nudge fired only once over the ring's whole
lifetime instead of once per active segment. Reset it to false when a
spare is promoted so each fresh active can again nudge a slow segment
manager. A regression test pins the per-segment behavior.

AwaitAckedFsn: wait event-driven instead of polling a 5ms ticker. The
poll imposed a ~2.5ms mean confirmation floor (<=200-400 confirmed
batches/s). It now blocks on a broadcast channel that ring.acknowledge
closes on every ackedFsn advance, plus the send loop's done channel
(closed on Close and on every HALT, so a waiter never has to wait for
ctx expiry) and the caller's ctx. The notify channel is lazily created
and nil when idle,
so an ACK with no waiter costs only a mutex and the producer hot path
is untouched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The bulk batch accessors documented mis-typed calls as producing
"numeric noise, not a type error", but several actually panicked with
opaque runtime errors: Int64Range on a bit-packed BOOLEAN overran the
values slice, and Float64Array/Int64Array on a non-array column indexed
the empty arrayRowStart side table. Make the contract honest by guarding
where the check is cheap and documenting the truth where it is not.

The *Range accessors now require a matching fixed element width and the
array accessors require a DOUBLE_ARRAY/LONG_ARRAY column; a mismatch
panics with a typed message naming the column and its wire type. The
guards run once per bulk call, so they stay off the zero-alloc row-sweep
path (benchmarks still report 0 allocs/op). Same-width reinterpretation
is intentionally preserved: Int64Range on a DOUBLE column still yields
the raw bits, as before. The per-cell accessors stay unguarded for
latency, and their doc now states plainly that a mis-typed per-cell call
is undefined and may reinterpret bytes or panic.

New qwpIsArrayType and qwpTypeName helpers back the guards and their
messages.

Tests pin the new behavior and close the related egress gaps called out
in the review: Range/array type-mismatch panics and per-cell OOB
characterization; CopyAll survival across pool reuse for SYMBOL (cloned
row ids + frozen dict view) and ARRAY (cloned side tables + rebound
payload) columns; CACHE_RESET arriving mid-query; credit starvation with
a never-releasing consumer; and the egress bind path. The review's
"v1-server (no SERVER_INFO)" bind item was reframed to a plain egress
bind-path test: SERVER_INFO is endpoint-based (egress always sends it),
not version-gated, so a no-SERVER_INFO egress server is not a real
scenario.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address a batch of QWP review findings spanning the send loop,
egress I/O, transport, and dependencies.

Transport lifetime: the SF send loop's terminal (HALT) exits
returned without closing the active WebSocket, so a dead socket and
its server-side connection lingered until the user called Close().
run() now closes the transport on every loop exit; the swap is
idempotent against the clean-shutdown path.

Egress framing and buffers: parseFrameHeader now validates the
header's declared payload_length against the actual body size and
rejects a mismatch instead of decoding a desynced frame. The
dispatcher recycles the pooled read buffer for every non-RESULT_BATCH
frame (RESULT_END, QUERY_ERROR, EXEC_DONE, CACHE_RESET, and the error
paths) rather than dropping it to GC, restoring the read-buffer pool
win on query-heavy workloads.

Credit attribution: a batch buffer released after its query ended and
the next one began added its bytes to the new query's CREDIT window.
Buffers are now stamped with their request id and credit only the
query the dispatcher is currently serving.

Upgrade errors: a dial failure carrying a response (notably a 101
status with a bad Sec-WebSocket-Accept) discarded the real cause
behind a "rejected with HTTP 101" message. QwpUpgradeRejectError now
wraps the dial error via Unwrap and surfaces it in the 101 message;
host-role classification is unchanged.

Docs and deps: refreshed stale async-era comments that referenced the
removed qwpAsyncState path and the old Flush-waits-for-ACK contract.
Swapped golang.org/x/exp/slices for the stdlib (dropping the direct
dependency) and bumped klauspost/compress to v1.18.4, the newest
release that stays within the go 1.23 floor.

Added regression tests for the payload_length check and the
upgrade-error wrapping.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address a batch of QWP egress review comments spanning two hot-path
optimizations, one durability fix, several weak/stale tests, and
release housekeeping.

Flush no longer walks every table buffer. Table() now records the
buffers it touches in a per-cycle dirty set, and buildTableEncodeInfo,
resetAfterFlush, and recomputePendingFromBuffers iterate only that set
instead of the whole tableBuffers map. A sender juggling hundreds of
tables now pays per flush only for the handful actually written; the
emitted frames are unchanged, only the iteration scope narrows.
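
A rough sketch of a per-cycle dirty set, using hypothetical `sender` and `tableBuffer` types that only model the bookkeeping (no encoding is performed):

```go
package main

import "fmt"

// tableBuffer is a hypothetical, stripped-down per-table buffer.
type tableBuffer struct {
	name  string
	rows  int
	dirty bool
}

// sender is a hypothetical model holding all buffers plus the dirty set.
type sender struct {
	tableBuffers map[string]*tableBuffer
	dirty        []*tableBuffer // buffers touched since the last flush
}

// table returns the named buffer, marking it dirty on first touch per cycle.
func (s *sender) table(name string) *tableBuffer {
	tb, ok := s.tableBuffers[name]
	if !ok {
		tb = &tableBuffer{name: name}
		s.tableBuffers[name] = tb
	}
	if !tb.dirty {
		tb.dirty = true
		s.dirty = append(s.dirty, tb)
	}
	return tb
}

// flush visits only the dirty buffers and returns how many it visited.
func (s *sender) flush() int {
	n := len(s.dirty)
	for _, tb := range s.dirty {
		tb.rows = 0 // stands in for encode + reset
		tb.dirty = false
	}
	s.dirty = s.dirty[:0]
	return n
}

func main() {
	s := &sender{tableBuffers: map[string]*tableBuffer{}}
	for i := 0; i < 100; i++ {
		s.table(fmt.Sprintf("t%d", i)).rows++
	}
	fmt.Println(s.flush()) // first cycle touches every table

	s.table("t1").rows++
	s.table("t7").rows++
	s.table("t7").rows++
	fmt.Println(s.flush()) // later cycle visits only the two touched tables
}
```

The per-buffer `dirty` flag keeps repeated `table` calls for the same name from adding duplicates to the set.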

The 2D/3D array column writers no longer allocate a flattened temp and
copy every element twice. A new reserveArrayValue helper reserves the
header + payload in arrayData once, and the typed writers stream
elements straight in after a cheap shape-regularity pre-check. The
zero-alloc steady-state hot path is preserved.

The SF ack-watermark open path now forces a real disk block under an
existing 16-byte file by reading its bytes and writing them back.
qwpSfAllocate no-ops on an already-full-size file, so a foreign sparse
watermark on a full disk would otherwise SIGBUS the manager goroutine
on first store; ENOSPC now surfaces at open and degrades to the
no-watermark fallback instead.

Test fixes:
- Delete the weak TestErrorApiHaltVsConcurrentFlush (asserted any-of-N
  and exercised an unsupported concurrent-producer pattern); the strict
  sibling already pins the all-of-N post-latch contract.
- Race TestQwpEgressIOReleaseClosePoolRace against the real shutdown()
  and dispatcher teardown rather than a hand-rolled copy of it.
- Strengthen TestQwpExecOptInReplaysTransparently to assert the fault
  fired and the replayed EXEC_DONE decodes to the expected result.
- Drop the misleading ws+retry_timeout parser happy-case and add
  TestQwpSanitizeRejectsRetryTimeout for the sanitizer rejection.

Housekeeping:
- examples.manifest.yaml: qwp-ingest is option-based, so use the addr:
  shape rather than a conf: literal.
- Add the license banner to the e2e sidecar; fix the non-standard
  banner variant in qwp_egress_bench_test.go.
- .gitignore: add Python caches and the example/bench/sidecar build
  artifacts.
- Bump the stale qwpClientId from go/4.1.0 to go/4.3.0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Commit ef6ca50 bumped the root module's github.com/klauspost/compress
from v1.17.0 to v1.18.4, but the three modules that depend on the root
through a replace directive were not re-tidied, so their go.mod/go.sum
kept the stale v1.17.0 indirect pin.

CI runs `go mod tidy -diff` over every bench/*/go.mod, which fails with
an actionable diff on that drift. Re-tidy all three replace-linked
modules so they match the bumped version:

  - bench/qwp-egress-read
  - bench/qwp-egress-read-wide
  - system_test/enterprise_e2e/sidecar

The sidecar is not gated by the bench tidy-diff loop, but it shares the
same drift, so it is re-tidied here for consistency and for the
enterprise e2e build.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>