diff --git a/.bumpversion.toml b/.bumpversion.toml index c25d9d42..82dc93a5 100644 --- a/.bumpversion.toml +++ b/.bumpversion.toml @@ -1,5 +1,5 @@ [tool.bumpversion] -current_version = "6.1.0" +current_version = "7.0.0" commit = false tag = false diff --git a/.claude/skills/review-pr/SKILL.md b/.claude/skills/review-pr/SKILL.md index 2e4dcab1..5e8db9b0 100644 --- a/.claude/skills/review-pr/SKILL.md +++ b/.claude/skills/review-pr/SKILL.md @@ -130,6 +130,20 @@ Group the callsites from 2.5b by execution context. Typical contexts in this cod Every entry on this list must be reviewed in Step 3. +### 2.5e Build profile facts + +**This sub-step runs at every level, including levels 0 and 1 where the rest of Step 2.5 is skipped.** A single `Cargo.toml` setting can flip the panic-safety story for the entire crate; agents must reason from the actual profile, not from defaults. + +Read `questdb-rs/Cargo.toml` and `questdb-rs-ffi/Cargo.toml` and record, with file:line citations: + +- **panic strategy** per profile (`[profile.release]`, `[profile.dev]`). If `panic = "abort"` in either, **every `catch_unwind` in that crate is a no-op for that profile** and every reachable panic is a process abort. Agents 2, 3, and 4 (and the level-0 inline review) must not credit `catch_unwind` as a panic guard under `panic = "abort"`. The only acceptable defense under abort-panic is proving no panic path exists. +- **overflow-checks** per profile. If `overflow-checks = false` in release (the default), integer overflow wraps silently in release builds instead of panicking — bugs that look like panics in test builds disappear into wrong values in production. State which mode applies. +- **`[profile.*.package.*]` overrides** if present — a per-dependency profile can reintroduce unwinding for one crate even when the workspace defaults to abort. +- **`#[global_allocator]`** if defined anywhere in the workspace. A custom allocator changes the OOM behavior (some abort, some unwind, some return null). 
+- **lto / codegen-units / strip** — informational; flag if they look unusual. + +A review without this section is incomplete. State the panic mode in one line at the top of every Step 3 agent prompt so the agent reasons from the right premise. + ## Step 3: Parallel review Every agent receives: @@ -141,6 +155,7 @@ Every agent receives: - **Bugs at callsites outside the diff outrank bugs inside the diff.** A confirmed bug in a file the PR did not touch but that calls a changed symbol is a P0 finding. - **"Looks correct in isolation" is not a valid conclusion.** Before clearing a changed symbol, the agent must walk the callsite inventory from 2.5b and explicitly state, per callsite, whether the new behavior is still correct there. - **The diff is the entry point, not the scope.** If the change surface map shows the symbol is reachable from N other files, the review covers N+1 files. +- **Crate-wide settings affect untouched code.** A change to `Cargo.toml` (panic strategy, allocator, feature defaults, MSRV, profile overrides), a new `#[global_allocator]`, or a new `panic_handler` retroactively changes the safety story for every existing function in the crate — not just the diff. When `Cargo.toml`, build scripts, or workspace-level config files appear in the diff, the review covers the panic/allocation/overflow contract of the **entire affected crate**, not just the touched lines. The same applies when 2.5e records a profile fact (e.g. `panic = "abort"`) that invalidates existing safety patterns in untouched code. - A single finding of the form "in `test_line_sender.cpp` the new behavior of `line_sender_buffer_column_f64` causes Y" is worth more than five findings inside the diff. ### Agents @@ -160,7 +175,12 @@ Launch the following agents in parallel. - **C++ exceptions escaping into C:** the C++ wrapper (`include/questdb/ingress/*.hpp`) is reachable from pure-C callers via inline forwarders. 
Any path where the wrapper can throw (`std::bad_alloc`, `std::system_error`, user-defined `throw`) and reach a C caller is undefined behavior. Verify wrapper functions called from C are `noexcept` or only invoked from C++ contexts. - **SIGPIPE on broken sockets:** writing to a closed peer raises SIGPIPE by default on Linux/macOS, killing the process. Verify TCP/HTTP write paths set `MSG_NOSIGNAL` or mask SIGPIPE. -Every fallible operation must use `Result`/`Option` with proper error propagation. Every `extern "C"` function must wrap its body in `catch_unwind` or prove no panic path exists. +**Panic strategy is the foundation.** Before reasoning about any panic guard, look up the `panic` setting from Step 2.5e: + +- **Under `panic = "abort"`**, `catch_unwind` is a no-op — it cannot catch anything because nothing unwinds. Every reachable panic is a process abort regardless of where the `catch_unwind` is placed. The only acceptable defense is *proving no panic path exists*: front-load every length check, replace `unwrap`/`expect`/indexing on wire-derived or caller-supplied values with `Result`-returning equivalents, validate before allocating, use `checked_*` arithmetic. A `catch_unwind` wrapper in this mode is misleading documentation, not a safety net — flag it if it gives the reader false confidence. +- **Under `panic = "unwind"`**, every `extern "C"` function must wrap its body in `catch_unwind` AND every `Drop` impl on the unwind path must be panic-free (double-panic aborts the process). Fallible operations must use `Result`/`Option` with proper error propagation. + +State which panic mode applies in the agent's first sentence. Every panic-related finding must be evaluated under the actual mode, not the textbook one. **Agent 3 — FFI boundary safety:** Check every `#[no_mangle]` / `extern "C"` function. 
Verify: NULL pointer checks on all pointer arguments, proper error propagation across the FFI boundary (no panics escaping into C), correct ownership transfer semantics (who allocates, who frees), buffer length validation, string encoding correctness (UTF-8 ↔ C strings, NUL handling), and that the C header (`include/questdb/ingress/line_sender.h`) and C++ wrapper (`include/questdb/ingress/line_sender.hpp` + the split `line_sender_core.hpp` / `line_sender_array.hpp` / `line_sender_decimal.hpp`) accurately reflect the Rust implementation. If `cbindgen.toml` is involved, verify generated output matches handwritten headers. @@ -250,10 +270,12 @@ Review the diff for: - All `unsafe` blocks have documented safety invariants - No undefined behavior: dangling pointers, use-after-free, double-free, data races - Proper `Send`/`Sync` bounds on public types -- No panics that can escape FFI boundaries (every `extern "C"` function uses `catch_unwind` or proves panics are impossible) +- No panics that can escape FFI boundaries — and the meaning of "escape" depends on the panic strategy (see Step 2.5e). Under `panic = "abort"`, `catch_unwind` is a no-op and *every* reachable panic is a fatal escape; the FFI function must prove no panic path exists. Under `panic = "unwind"`, every `extern "C"` function must wrap its body in `catch_unwind`. ### Crash surface -Anything that aborts the Rust side aborts the host process. Beyond panics, check for: +Anything that aborts the Rust side aborts the host process. The first check is the panic strategy itself — everything else is downstream of it. + +- **Panic strategy** (from Step 2.5e): under `panic = "abort"`, the entire `catch_unwind` defense collapses — every panic across the entire crate is fatal. Verify the profile before crediting any panic guard. A finding that says "the panic at X is caught by `catch_unwind` at Y" is incorrect under abort-panic. 
- Direct aborts: `std::process::abort()`, `libc::abort()`, `std::intrinsics::abort()` - Allocation-failure aborts: any allocation sized by an untrusted length parameter must validate the bound before allocating (Rust's default allocator aborts on OOM) - Stack overflow: unbounded recursion, recursive `Drop` impls, deeply nested untrusted input diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..a9988c76 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "questdb"] + path = questdb + url = https://github.com/questdb/questdb.git + branch = master diff --git a/CMakeLists.txt b/CMakeLists.txt index fd10cd2c..76587cb8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.15.0) -project(c-questdb-client VERSION 6.1.0) +project(c-questdb-client VERSION 7.0.0) set(CPACK_PROJECT_NAME ${PROJECT_NAME}) set(CPACK_PROJECT_VERSION ${PROJECT_VERSION}) @@ -31,12 +31,66 @@ option( "Build shared library dependencies instead of static." OFF) +# Opt in to the synchronous WebSocket egress reader (`line_reader_*` +# C/C++ surface). Pulls in the `tungstenite` + `zstd` Rust crates as +# transitive dependencies, so downstreams that only need the line +# sender can flip this OFF to keep the resulting library minimal. +# Defaults ON for the in-tree build because the `line_reader_*` +# examples and tests need it; external consumers who add this +# project via `add_subdirectory` should set +# `-DQUESTDB_ENABLE_READER=OFF` if they don't want the surface. +option( + QUESTDB_ENABLE_READER + "Enable the synchronous WebSocket egress reader (line_reader_* API). Adds tungstenite+zstd." + ON) + +# When QUESTDB_TESTS_AND_EXAMPLES is enabled the reader-related +# examples and tests need the reader API present. Refusing to build +# in that combination is friendlier than producing a confusing link +# error from missing `line_reader_*` symbols. 
+if(QUESTDB_TESTS_AND_EXAMPLES AND NOT QUESTDB_ENABLE_READER) + message(FATAL_ERROR + "QUESTDB_TESTS_AND_EXAMPLES=ON requires QUESTDB_ENABLE_READER=ON: " + "the line_reader_* examples and tests would fail to link without it.") +endif() + +# Compile in the `tls_verify=false` (sender) / `tls_verify=unsafe_off` +# (egress reader) escape hatch. ON by default to preserve the legacy +# behaviour of the shipped C ABI; security-conscious distributions can +# flip it OFF (`-DQUESTDB_ENABLE_INSECURE_SKIP_VERIFY=OFF`) to harden +# the resulting library — `line_sender_opts_tls_verify` then disappears +# from the symbol table and `tls_verify=unsafe_off` in a connect string +# is rejected at parse time. +option( + QUESTDB_ENABLE_INSECURE_SKIP_VERIFY + "Compile in support for tls_verify off. Allows downstream code to disable TLS certificate verification at runtime." + ON) + +option( + QUESTDB_SANITIZE + "Build the C/C++ tests with -fsanitize=address,undefined." + OFF) + # Build static and dynamic lib written in Rust by invoking `cargo`. # Imports `questdb_client` target. 
add_subdirectory(corrosion) -corrosion_import_crate( - MANIFEST_PATH questdb-rs-ffi/Cargo.toml - LOCKED) # Use `Cargo.lock` +set(QUESTDB_CARGO_FEATURES "") +if(QUESTDB_ENABLE_READER) + list(APPEND QUESTDB_CARGO_FEATURES sync-reader-ws) +endif() +if(QUESTDB_ENABLE_INSECURE_SKIP_VERIFY) + list(APPEND QUESTDB_CARGO_FEATURES insecure-skip-verify) +endif() +if(QUESTDB_CARGO_FEATURES) + corrosion_import_crate( + MANIFEST_PATH questdb-rs-ffi/Cargo.toml + LOCKED + FEATURES ${QUESTDB_CARGO_FEATURES}) +else() + corrosion_import_crate( + MANIFEST_PATH questdb-rs-ffi/Cargo.toml + LOCKED) +endif() target_include_directories( questdb_client INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/include) @@ -44,7 +98,7 @@ if(WIN32) set_target_properties( questdb_client-shared PROPERTIES - DEFINE_SYMBOL "LINESENDER_DYN_LIB") + DEFINE_SYMBOL "QUESTDB_CLIENT_DYN_LIB") target_link_libraries( questdb_client-shared INTERFACE wsock32 ws2_32 ntdll crypt32 Secur32 Ncrypt) @@ -82,6 +136,27 @@ function(set_compile_flags TARGET_NAME) endif() endfunction() +function(apply_sanitizers TARGET_NAME) + if(NOT QUESTDB_SANITIZE) + return() + endif() + if(MSVC) + message(WARNING + "QUESTDB_SANITIZE is not supported on MSVC (its ASan runtime is " + "not validated against the rustc-built library); skipping.") + else() + target_compile_options( + ${TARGET_NAME} PRIVATE + -fsanitize=address,undefined + -fno-sanitize-recover=all + -fno-omit-frame-pointer + -g) + target_link_options( + ${TARGET_NAME} PRIVATE + -fsanitize=address,undefined) + endif() +endfunction() + # Examples function(compile_example TARGET_NAME) list(POP_FRONT ARGV) @@ -187,6 +262,24 @@ if (QUESTDB_TESTS_AND_EXAMPLES) compile_example( line_sender_cpp_example_decimal_binary examples/line_sender_cpp_example_decimal_binary.cpp) + compile_example( + line_reader_c_example_from_conf + examples/line_reader_c_example_from_conf.c) + compile_example( + line_reader_cpp_example_from_conf + examples/line_reader_cpp_example_from_conf.cpp) + compile_example( + 
line_reader_c_example_with_binds + examples/line_reader_c_example_with_binds.c) + compile_example( + line_reader_cpp_example_with_binds + examples/line_reader_cpp_example_with_binds.cpp) + compile_example( + line_reader_cpp_example_columns + examples/line_reader_cpp_example_columns.cpp) + compile_example( + line_reader_c_example_columns + examples/line_reader_c_example_columns.c) # Include Rust tests as part of the tests run add_test( @@ -207,9 +300,21 @@ if (QUESTDB_TESTS_AND_EXAMPLES) ${TARGET_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) set_compile_flags(${TARGET_NAME}) + apply_sanitizers(${TARGET_NAME}) add_test( NAME ${TARGET_NAME} COMMAND ${TARGET_NAME}) + if(QUESTDB_SANITIZE AND NOT MSVC) + # Leak detection is off: the Rust library is not instrumented and + # legitimately retains process-global allocations at exit, which + # LSan cannot tell apart from an FFI leak without a suppression + # file. ASan's use-after-free / out-of-bounds / double-free checks + # stay active. + set_tests_properties( + ${TARGET_NAME} PROPERTIES + ENVIRONMENT + "ASAN_OPTIONS=detect_leaks=0;UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1") + endif() endfunction() compile_test( @@ -217,6 +322,42 @@ if (QUESTDB_TESTS_AND_EXAMPLES) cpp_test/mock_server.cpp cpp_test/test_line_sender.cpp) + # Live-broker integration tests for the egress reader. Skips per-test + # if no broker is reachable on QDB_LIVE_BROKER_HOST:QDB_LIVE_BROKER_HTTP_PORT + # (defaults: localhost:9000), so this test is safe to wire into ctest + # even on machines without a broker. + compile_test( + test_line_reader + cpp_test/test_line_reader.cpp) + + # Broker-independent smoke test for the line_reader FFI. Targets a + # guaranteed-closed port (127.0.0.1:1) and asserts the FFI surfaces + # a non-NULL error that can be inspected and freed. 
Uses standard + # exit-code semantics so SIGSEGV / SIGABRT correctly fail the test + # (the previous WILL_FAIL-based smoke treated any non-zero exit as a + # pass, and inverted to a failure when QuestDB happened to be running + # on the developer's machine). + compile_test( + line_reader_c_smoke + cpp_test/smoke_line_reader.c) + + # Broker-independent C++ tests for the line_reader FFI. Covers the + # error-handling surface, parser rejection paths, the connect-failure + # path against a closed port, NULL-idempotent free / close functions, + # `from_env`, and the C++ `line_reader_error` exception wrapper. + compile_test( + test_line_reader_offline + cpp_test/test_line_reader_offline.cpp) + + # Mock-server-driven C++ tests for the line_reader FFI. Drives the + # reader against an in-process WebSocket + QWP1 mock so the + # column-getter / bind-encoding / server_info / error-code / stats + # surface that previously needed a live broker now runs in CI. + compile_test( + test_line_reader_mock + cpp_test/qwp_mock_server.cpp + cpp_test/test_line_reader_mock.cpp) + # System testing Python3 script. # This will download the latest QuestDB instance from Github, # thus will also require a Java 11 installation to run the tests. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..863b2ce1 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,116 @@ +# Contributing + +Thanks for your interest in `c-questdb-client`. This guide covers what you need +to know before opening a PR. + +## Where to ask questions + +- General usage / "is this a bug?" → [Community Forum](https://community.questdb.io/) +- Confirmed bugs and feature requests → [GitHub issues](https://github.com/questdb/c-questdb-client/issues) +- Server-side questions (storage engine, SQL, WAL) belong in the + [`questdb/questdb`](https://github.com/questdb/questdb) repo, not here. 
+ +## Repo layout + +| Path | What lives here | +| ---- | --------------- | +| `questdb-rs/` | Core Rust crate (`questdb-rs` on crates.io). Pure Rust; all protocol logic. | +| `questdb-rs-ffi/` | C ABI shim. Pure FFI exports — no business logic. | +| `include/` | Hand-maintained C / C++ headers. **Not** generated by cbindgen. | +| `cpp_test/` | C++ test suite. Mirrors C++ wrapper coverage. | +| `system_test/` | Python integration suites that spawn a real QuestDB server. | +| `examples/` | Buildable C / C++ examples. | +| `ci/` | Azure Pipelines YAML. | +| `doc/` | Long-form docs: build, release, security, considerations. | + +For a deeper tour see [`doc/DEV_NOTES.md`](doc/DEV_NOTES.md). + +## Build & test + +Full build instructions live in [`doc/BUILD.md`](doc/BUILD.md). The short +version for Rust-only work: + +```sh +cd questdb-rs +cargo build --features almost-all-features +cargo test --features almost-all-features +``` + +For the C / C++ side: + +```sh +cmake -S . -B build -DQUESTDB_TESTS_AND_EXAMPLES=ON +cmake --build build +ctest --test-dir build +``` + +`almost-all-features` enables every cross-compatible feature flag at once (it +deliberately omits the mutually-exclusive `aws-lc-crypto` / `ring-crypto` pair — +see [`questdb-rs/Cargo.toml`](questdb-rs/Cargo.toml)). + +## Coding standards + +- **Rust:** `cargo fmt` + `cargo clippy --all-targets -- -D warnings` must + pass. The `almost-all-features` flag is the canonical lint target. Apply + this to both `questdb-rs/` and `questdb-rs-ffi/`. +- **C / C++:** run `clang-format` on touched files (config lives in + [`.clang-format`](.clang-format)). +- **Comments:** explain *why*, not *what*. The reviewers look closely at any + comment that restates the code. +- **Unsafe Rust:** keep `unsafe` blocks narrow and document the safety + invariants the caller must uphold. +- **No new runtime dependencies** in `questdb-rs` without discussion — the + library statically links everything it ships. 
+ +## Pre-commit hook + +[`.githooks/pre-commit`](.githooks/pre-commit) runs `cargo fmt --check` and +`cargo clippy` on staged changes. It is opt-in — enable it once per clone: + +```sh +git config core.hooksPath .githooks +``` + +## CI coverage + +PR CI runs the offline / mock-based suites only. The `questdb` submodule is +**not** checked out (`submodules: false` in +[`ci/run_tests_pipeline.yaml`](ci/run_tests_pipeline.yaml)), so these suites +do **not** run on every PR: + +- The 79 `live-server-tests` Rust tests in `questdb-rs/` +- The C++ `test_line_reader` live suite +- The Python failover system test (`system_test/test_egress_failover.py`) +- The Python ingestion integration test (`system_test/test.py`) + +End-to-end coverage against a real QuestDB broker lives in the separate +`TestVsQuestDBMaster` job, which clones `questdb/questdb` master, builds the +jar with JDK 25 + Maven, and runs the integration suites. **That job is the +gate for live-server correctness — PR CI alone does not prove it.** Watch the +job result before merging any change that touches the wire protocol, transport, +or failover paths. + +To reproduce the live suites locally: + +```sh +git submodule update --init --recursive +# build the jar under questdb/core/target/ — see questdb/README.md +cd questdb-rs +cargo test --features live-server-tests +``` + +## Submitting a pull request + +1. Branch off `main`. Keep the diff focused — unrelated refactors get split + into separate PRs. +2. Write or update tests. Bug fixes need a regression test; new public APIs + need at least one usage test. +3. Run `cargo fmt`, `cargo clippy`, and the relevant test suite locally + before pushing. +4. Open the PR against `main` with a description that explains the *why*. + Link the issue if there is one. +5. Watch CI. The `TestVsQuestDBMaster` job (see above) needs to pass for any + protocol-level change. 
+ +By submitting a contribution, you agree it is licensed under the project's +[Apache 2.0 License](LICENSE). diff --git a/cbindgen.toml b/cbindgen.toml index 09277c6b..d80629a0 100644 --- a/cbindgen.toml +++ b/cbindgen.toml @@ -40,10 +40,17 @@ includes = [] # ["my_great_lib.h"] no_includes = true after_includes = """ -#if defined(LINESENDER_DYN_LIB) && defined(_MSC_VER) -# define LINESENDER_API __declspec(dllimport) +/* `LINESENDER_DYN_LIB` is the historical name of this toggle, from when the + library shipped only the line sender. Accepted as an alias so consumers + predating the `QUESTDB_CLIENT_*` naming keep linking unchanged. */ +#if defined(LINESENDER_DYN_LIB) && !defined(QUESTDB_CLIENT_DYN_LIB) +# define QUESTDB_CLIENT_DYN_LIB +#endif + +#if defined(QUESTDB_CLIENT_DYN_LIB) && defined(_MSC_VER) +# define QUESTDB_CLIENT_API __declspec(dllimport) #else -# define LINESENDER_API +# define QUESTDB_CLIENT_API #endif """ @@ -64,5 +71,5 @@ style = "type" usize_is_size_t = true [fn] -prefix = "LINESENDER_API" +prefix = "QUESTDB_CLIENT_API" args = "vertical" diff --git a/ci/run_all_tests.py b/ci/run_all_tests.py index e8c6f250..5076e94f 100644 --- a/ci/run_all_tests.py +++ b/ci/run_all_tests.py @@ -19,14 +19,30 @@ def run_cmd(*args, cwd=None): sys.stderr.write(f'Command `{args_str}` failed with return code {cpe.returncode}.\n') sys.exit(cpe.returncode) +def find_binary(build_dir, name, exe_suffix): + return next(iter(build_dir.glob(f'**/{name}{exe_suffix}'))) + + def main(): build_dir = pathlib.Path('build') - exe_suffix = '.exe' if platform.system() == 'Windows' else '' - test_line_sender_path = next(iter( - build_dir.glob(f'**/test_line_sender{exe_suffix}'))) build_cxx20_dir = pathlib.Path('build_CXX20') - test_line_sender_path_CXX20 = next(iter( - build_cxx20_dir.glob(f'**/test_line_sender{exe_suffix}'))) + exe_suffix = '.exe' if platform.system() == 'Windows' else '' + + # Test binaries to invoke from each build tree. 
All are + # broker-independent or skip-on-no-broker, so they are safe to run + # unconditionally in CI. + cpp_tests = [ + 'test_line_sender', + 'test_line_reader_offline', + 'test_line_reader_mock', + 'line_reader_c_smoke', + 'test_line_reader', # live-broker; skips per-test when no broker reachable + ] + test_paths = [ + (d, find_binary(d, name, exe_suffix)) + for d in (build_dir, build_cxx20_dir) + for name in cpp_tests + ] system_test_path = pathlib.Path('system_test') / 'test.py' qdb_v = '9.2.0' # The version of QuestDB we'll test against. @@ -49,8 +65,8 @@ def main(): run_cmd('cargo', 'test', '--features=almost-all-features', '--', '--nocapture', cwd='questdb-rs') run_cmd('cargo', 'test', cwd='questdb-rs-ffi') - run_cmd(str(test_line_sender_path)) - run_cmd(str(test_line_sender_path_CXX20)) + for _, path in test_paths: + run_cmd(str(path)) run_cmd('python3', str(system_test_path), 'run', '--versions', qdb_v, '-v') # run_cmd('python3', str(system_test_path), 'run', '--repo', './questdb', '-v') diff --git a/ci/run_fuzz_pipeline.yaml b/ci/run_fuzz_pipeline.yaml index 61820c04..def86182 100644 --- a/ci/run_fuzz_pipeline.yaml +++ b/ci/run_fuzz_pipeline.yaml @@ -2,26 +2,29 @@ trigger: none pr: none -# Hourly cron against the default branch, always run even if there were no -# changes since the previous run. The Linux leg now runs on the self-hosted +# Hourly cron against the default branch (and active egress feature +# branches), always run even if there were no changes since the previous +# run. The Linux leg of the QWP/WS fuzz runs on the self-hosted # hetzner-incus pool (matching questdb/questdb's ci/test-fuzz.yml) — the # Microsoft-hosted ubuntu-latest agents were hitting "Free disk space on / # is lower than 5%" and OOM-killing the test with SIGBUS mid-run. The -# mac/windows legs stay on Microsoft-hosted images. 
The equivalent -# PR-time job in run_tests_pipeline.yaml (TestQwpWsFuzz) has the same -# split; this pipeline is the always-on stability watchdog and the -# source of Slack #builds alerts. +# mac/windows legs stay on Microsoft-hosted images. The pure-Rust QWP +# egress proptest fuzz runs cross-platform on hosted images (no JDK / no +# server). Equivalent PR-time jobs live in run_tests_pipeline.yaml +# (TestQwpWsFuzz / TestQwpEgressLiveServerFuzz); this pipeline is the +# always-on stability watchdog and the source of Slack #builds alerts. schedules: - cron: "0 * * * *" - displayName: Run QWP/WS fuzz every hour + displayName: Run QWP/WS + QWP egress fuzz every hour branches: include: - main + - vi_egress always: true stages: - stage: ScheduledFuzz - displayName: "Scheduled QWP/WS fuzz" + displayName: "Scheduled QWP/WS + QWP egress fuzz" jobs: - job: TestQwpWsFuzz displayName: "QWP/WS fuzz suite (mac/windows hosted)" @@ -147,6 +150,13 @@ stages: exit 1 fi done + # `set -x` traces `echo "##vso[...]VALUE"` as `+ echo + # '##vso[...]VALUE'`; Azure's stdout parser matches `##vso[` + # anywhere on the line and takes everything past `]` to EOL + # as the value, including the trailing `'` bash added. + # Disable trace just for these two echoes so JAVA_HOME does + # not end up as `/usr/lib/jvm/java-25-openjdk-amd64'`. + set +x echo "##vso[task.setvariable variable=JAVA_HOME_17_X64]$JAVA_PATH_17" echo "##vso[task.setvariable variable=JAVA_HOME]$JAVA_PATH_25" displayName: "Install missing deps + resolve JDKs" @@ -171,6 +181,8 @@ stages: fi CARGO_BIN="$(dirname "$(command -v cargo)")" echo "Using cargo from: $CARGO_BIN" + # See JAVA_HOME comment above re: `set -x` + `##vso[...]`. 
+ set +x echo "##vso[task.prependpath]$CARGO_BIN" displayName: "Resolve Rust toolchain" - template: compile.yaml @@ -207,6 +219,104 @@ stages: pathToPublish: $(Build.ArtifactStagingDirectory)/qdb-log-$(Agent.OS).zip artifactName: qdb-log-$(Agent.OS)-linux + # Pure-Rust proptest fuzz for the QWP egress codec. No server, no + # JDK — runs cross-platform on Microsoft-hosted images. Crank + # PROPTEST_CASES well above the in-source default so the scheduled + # cron actually explores beyond what a standard `cargo test` + # would. Failures replay locally via the regression seeds checked + # in under questdb-rs/proptest-regressions/egress/. + - job: TestQwpEgressFuzz + displayName: "QWP egress fuzz suite (cross-platform)" + strategy: + matrix: + linux: + imageName: "ubuntu-latest" + mac: + imageName: "macos-latest" + windows-2022: + imageName: "windows-2022" + pool: + vmImage: $(imageName) + timeoutInMinutes: 90 + steps: + - checkout: self + fetchDepth: 1 + lfs: false + submodules: false + - template: compile.yaml + - bash: | + set -euxo pipefail + cd questdb-rs + # `cargo test` only accepts one positional filter, and a filter + # under `--test NAME` applies inside that integration bin — it + # does not reach `mod tests` blocks in the lib. Split into one + # invocation per scope so the proptest modules in src/egress/ + # actually run. + PROPTEST_CASES=10000 cargo test --release \ + --features sync-reader-ws \ + --test qwp_egress_bounds_fuzz \ + --test qwp_egress_fragmentation_fuzz + PROPTEST_CASES=10000 cargo test --release \ + --features sync-reader-ws \ + --lib egress::binds::tests + PROPTEST_CASES=10000 cargo test --release \ + --features sync-reader-ws \ + --lib egress::decoder::tests + displayName: "Run QWP egress fuzz suite" + + # Live-server fuzz: bind / fragmentation / random-schema cases + # that need a real QuestDB. Builds the questdb jar from `master`, + # then runs the `egress_live_server_*_fuzz` cargo tests. 
+ # Linux-only — keeps the scheduled Maven cost bounded; the + # pull-request-time pipeline (run_tests_pipeline.yaml) already + # runs the same suite for inbound changes. + # Seed varies per build via `QWP_EGRESS_FUZZ_SEED=$(Build.BuildId)` + # so the scheduled cron actually explores beyond the deterministic + # default that pull-request runs use. + - job: TestQwpEgressLiveServerFuzz + displayName: "QWP egress live-server fuzz (scheduled)" + pool: + vmImage: "ubuntu-latest" + timeoutInMinutes: 30 + steps: + - checkout: self + fetchDepth: 1 + lfs: false + submodules: false + - template: compile.yaml + - script: | + git clone --depth 1 https://github.com/questdb/questdb.git + displayName: git clone questdb + - script: | + set -euo pipefail + JDK_URL="https://api.adoptium.net/v3/binary/latest/25/ga/linux/x64/jdk/hotspot/normal/eclipse" + sudo mkdir -p /opt/jdk25 + curl -fsSL "$JDK_URL" | sudo tar -xz -C /opt/jdk25 --strip-components=1 + echo "##vso[task.setvariable variable=JAVA_HOME]/opt/jdk25" + echo "##vso[task.prependpath]/opt/jdk25/bin" + displayName: "Install JDK 25" + - task: Maven@3 + displayName: "Compile QuestDB" + inputs: + mavenPOMFile: "questdb/pom.xml" + jdkVersionOption: "default" + options: "-DskipTests -Pbuild-web-console" + - script: | + set -euxo pipefail + cd questdb-rs + # `--test NAME` selectors limit the build to the fuzz + # binaries we own. Other live-server tests are excluded + # from this scheduled run so a transient unrelated failure + # there can't drown out a real fuzz regression. + QWP_EGRESS_FUZZ_SEED="$(Build.BuildId)" \ + cargo test --features live-server-tests \ + --test egress_live_server_bind_fuzz \ + --test egress_live_server_fragmentation_fuzz \ + --test egress_live_server_fuzz \ + --test egress_live_server_alter_fuzz \ + -- --nocapture + displayName: "QWP egress live-server fuzz (varying seed)" + # Mirrors questdb/questdb's docker-release-pipeline.yml NotifyOnFailure # pattern. 
BUILDS_SLACK_HOOK_URL must be configured as a secret # pipeline variable (or attached via a shared variable group) before @@ -217,6 +327,8 @@ stages: dependsOn: - TestQwpWsFuzz - TestQwpWsFuzzLinux + - TestQwpEgressFuzz + - TestQwpEgressLiveServerFuzz condition: failed() pool: vmImage: "ubuntu-latest" @@ -227,7 +339,7 @@ stages: inputs: script: | curl -X POST -H 'Content-type: application/json' \ - --data '{"text":"🚨 *c-questdb-client QWP/WS fuzz failed*\n\nBranch: `$(Build.SourceBranchName)`\nBuild: <$(System.CollectionUri)$(System.TeamProject)/_build/results?buildId=$(Build.BuildId)|#$(Build.BuildId)>\nCommit: `$(Build.SourceVersion)`"}' \ + --data '{"text":"🚨 *c-questdb-client QWP/WS + egress fuzz failed*\n\nBranch: `$(Build.SourceBranchName)`\nBuild: <$(System.CollectionUri)$(System.TeamProject)/_build/results?buildId=$(Build.BuildId)|#$(Build.BuildId)>\nCommit: `$(Build.SourceVersion)`"}' \ "$(BUILDS_SLACK_HOOK_URL)" env: BUILDS_SLACK_HOOK_URL: $(BUILDS_SLACK_HOOK_URL) diff --git a/ci/run_tests_pipeline.yaml b/ci/run_tests_pipeline.yaml index 2a2da047..57ddd5d5 100644 --- a/ci/run_tests_pipeline.yaml +++ b/ci/run_tests_pipeline.yaml @@ -114,6 +114,37 @@ stages: cd tls_proxy cargo clippy --all-targets --all-features -- -D warnings displayName: "tls_proxy: clippy" + - script: | + cd system_test + cd failover_clients + cargo fmt --all -- --check + displayName: "failover_clients: fmt" + - script: | + cd system_test + cd failover_clients + cargo clippy --all-targets --all-features + displayName: "failover_clients: clippy" + - job: SanitizeCppTests + displayName: "C/C++ tests under ASan + UBSan" + pool: + vmImage: "ubuntu-latest" + timeoutInMinutes: 60 + steps: + - checkout: self + fetchDepth: 1 + lfs: false + submodules: false + - script: | + cmake -S . 
-B build_sanitize \ + -DCMAKE_BUILD_TYPE=RelWithDebInfo \ + -DQUESTDB_TESTS_AND_EXAMPLES=ON \ + -DQUESTDB_SANITIZE=ON + displayName: "Configure (ASan + UBSan)" + - script: cmake --build build_sanitize --config Release + displayName: "Build" + - script: ctest --output-on-failure -E rust_tests + workingDirectory: build_sanitize + displayName: "ctest under ASan + UBSan" # Runs the full system_test/test.py against a fresh build of # QuestDB master. Moved off Microsoft-hosted ubuntu-latest because # the QWP/WS fuzz portion of the run filled the agent disk ("No @@ -160,6 +191,13 @@ stages: exit 1 fi done + # `set -x` traces `echo "##vso[...]VALUE"` as `+ echo + # '##vso[...]VALUE'`; Azure's stdout parser matches `##vso[` + # anywhere on the line and takes everything past `]` to EOL + # as the value, including the trailing `'` bash added. + # Disable trace just for these two echoes so JAVA_HOME does + # not end up as `/usr/lib/jvm/java-25-openjdk-amd64'`. + set +x echo "##vso[task.setvariable variable=JAVA_HOME_17_X64]$JAVA_PATH_17" echo "##vso[task.setvariable variable=JAVA_HOME]$JAVA_PATH_25" displayName: "Install missing deps + resolve JDKs" @@ -177,6 +215,8 @@ stages: fi CARGO_BIN="$(dirname "$(command -v cargo)")" echo "Using cargo from: $CARGO_BIN" + # See JAVA_HOME comment above re: `set -x` + `##vso[...]`. 
+ set +x echo "##vso[task.prependpath]$CARGO_BIN" displayName: "Resolve Rust toolchain" - template: compile.yaml @@ -195,6 +235,9 @@ stages: - script: | python3 system_test/test.py run --repo ./questdb -v displayName: "integration test" + - script: | + python3 system_test/test_egress_failover.py run --repo ./questdb -v + displayName: "egress integration test" - task: ArchiveFiles@2 displayName: "Compress QuestDB server log on failure" condition: failed() @@ -327,6 +370,13 @@ stages: exit 1 fi done + # `set -x` traces `echo "##vso[...]VALUE"` as `+ echo + # '##vso[...]VALUE'`; Azure's stdout parser matches `##vso[` + # anywhere on the line and takes everything past `]` to EOL + # as the value, including the trailing `'` bash added. + # Disable trace just for these two echoes so JAVA_HOME does + # not end up as `/usr/lib/jvm/java-25-openjdk-amd64'`. + set +x echo "##vso[task.setvariable variable=JAVA_HOME_17_X64]$JAVA_PATH_17" echo "##vso[task.setvariable variable=JAVA_HOME]$JAVA_PATH_25" displayName: "Install missing deps + resolve JDKs" @@ -344,6 +394,8 @@ stages: fi CARGO_BIN="$(dirname "$(command -v cargo)")" echo "Using cargo from: $CARGO_BIN" + # See JAVA_HOME comment above re: `set -x` + `##vso[...]`. + set +x echo "##vso[task.prependpath]$CARGO_BIN" displayName: "Resolve Rust toolchain" - template: compile.yaml @@ -379,3 +431,202 @@ stages: inputs: pathToPublish: $(Build.ArtifactStagingDirectory)/qdb-log-$(Agent.OS).zip artifactName: qdb-log-$(Agent.OS)-linux + + # Live-server fuzz coverage for the OSS-questdb tests that need a + # real server (bind / fragmentation / random-schema / ALTER + # orchestration). `--test NAME` selectors limit the build to the + # fuzz binaries so other live-server tests don't drag this job + # into unrelated breakage. Default seed gives a deterministic + # sweep for inbound changes; the hourly cron in + # run_fuzz_pipeline.yaml varies the seed per build. 
+ - job: TestQwpEgressLiveServerFuzz + displayName: "QWP egress live-server fuzz suite" + pool: + vmImage: "ubuntu-latest" + timeoutInMinutes: 30 + steps: + - checkout: self + fetchDepth: 1 + lfs: false + submodules: false + - template: compile.yaml + - script: | + git clone --depth 1 https://github.com/questdb/questdb.git + displayName: git clone questdb + - script: | + set -euo pipefail + JDK_URL="https://api.adoptium.net/v3/binary/latest/25/ga/linux/x64/jdk/hotspot/normal/eclipse" + sudo mkdir -p /opt/jdk25 + curl -fsSL "$JDK_URL" | sudo tar -xz -C /opt/jdk25 --strip-components=1 + echo "##vso[task.setvariable variable=JAVA_HOME]/opt/jdk25" + echo "##vso[task.prependpath]/opt/jdk25/bin" + displayName: "Install JDK 25 (required by QuestDB master)" + - task: Maven@3 + displayName: "Compile QuestDB" + inputs: + mavenPOMFile: "questdb/pom.xml" + jdkVersionOption: "default" + options: "-DskipTests -Pbuild-web-console" + - script: | + set -euxo pipefail + cd questdb-rs + # `--test NAME` selectors keep the build scoped to the + # fuzz binaries plus the pipelined-reader integration + # suite. The pipelined tests share the same live-server + # fixture and exercise the only path that drives the + # background-IO worker end-to-end (worker startup, + # event-channel publication, terminal events, cancel + # drain, sync↔pipelined per-row equivalence). M9 of + # the level-3 review added `--test egress_pipelined` + # here; before that the integration tests never ran + # in CI even though they passed locally. 
+ cargo test --features live-server-tests \ + --test egress_live_server_bind_fuzz \ + --test egress_live_server_fragmentation_fuzz \ + --test egress_live_server_fuzz \ + --test egress_live_server_alter_fuzz \ + --test egress_pipelined \ + -- --nocapture + displayName: "QWP egress live-server fuzz + pipelined" + + # Cross-repo trigger: fire the questdb-enterprise + # build-and-test-e2e-c-client pipeline with this build's SHA so + # the c_client-marked failover tests (Rust today; C / C++ as + # those sidecars land) run against an Enterprise primary. The + # Enterprise pipeline posts a status check back on the PR + # (`enterprise-e2e-c-client` context); this job fire-and-forgets + # and does not block the PR. + # + # Fork PRs are intentionally skipped: System.AccessToken is not + # exposed to fork builds, so the dispatch call would fail with + # 401. We'd rather skip silently than spam fork PRs with + # spurious red checks. + # + # Prerequisites the questdb infra team needs to have configured + # (one-time, in the Azure DevOps UI): + # 1. Register ci/build-and-test-e2e-c-client.yaml in the + # `questdb-enterprise` ADO project as a pipeline. Name must + # match `enterprisePipelineName` below. + # 2. Create a PAT in the `questdb-enterprise` ADO project + # with `Build (Read & execute)` scope (User settings → + # Personal access tokens → New). Add it as a secret + # variable on THIS pipeline named ENT_DISPATCH_PAT + # (Pipelines → questdb.c-questdb-client → Edit → + # Variables → +, mark as secret). PAT-based auth is used + # rather than $(System.AccessToken) because the two + # pipelines live in different ADO projects, and tokens + # issued for this project can't see the Enterprise + # project's pipelines API without it. 
+ - job: TriggerEnterpriseCClientE2E + displayName: "Trigger questdb-enterprise c_client e2e" + # Use the hetzner-incus self-hosted pool: the Microsoft-hosted + # queue is consistently deeper than incus, and this is a + # ~10-second curl+jq job so it picks up a slot quickly here. + # The hetzner image already ships curl + jq (the Enterprise + # ReportToOssPr stage uses both). + pool: + name: hetzner-incus + # Skip on fork PRs: secret pipeline variables (and System + # tokens) are not exposed to fork builds, so the REST POST + # would 401. Skipping rather than failing keeps fork-PR CI + # clean. + condition: and(succeeded(), ne(variables['System.PullRequest.IsFork'], 'True')) + variables: + # Hardcoded because the Enterprise pipeline lives in its own + # ADO project, not the one this pipeline runs in. + enterpriseProject: 'questdb-enterprise' + enterprisePipelineName: 'build-and-test-e2e-c-client' + steps: + - checkout: none + - bash: | + set -euo pipefail + + ORG_URL="$(System.TeamFoundationCollectionUri)" + PROJECT="$(enterpriseProject)" + PIPELINE_NAME="$(enterprisePipelineName)" + + if [ -z "${ENT_DISPATCH_PAT:-}" ]; then + echo "ERROR: ENT_DISPATCH_PAT secret pipeline variable is not set." >&2 + echo "Create a PAT in the questdb-enterprise ADO project with" >&2 + echo "'Build (Read & execute)' scope and add it as a secret" >&2 + echo "variable named ENT_DISPATCH_PAT on this pipeline." >&2 + exit 1 + fi + + # Resolve pipeline ID by name. The pipelines list API + # returns every pipeline in the project; we filter + # client-side because the $filter query parameter on + # this endpoint is limited. + echo "Looking up '${PIPELINE_NAME}' in '${PROJECT}'..." 
+ PIPELINES=$(curl -fsS -u ":${ENT_DISPATCH_PAT}" \ + "${ORG_URL}${PROJECT}/_apis/pipelines?api-version=7.0") + PIPELINE_ID=$(echo "$PIPELINES" | jq -r --arg n "$PIPELINE_NAME" \ + '.value[] | select(.name == $n) | .id' | head -1) + + if [ -z "$PIPELINE_ID" ] || [ "$PIPELINE_ID" = "null" ]; then + echo "ERROR: pipeline '${PIPELINE_NAME}' not found in project '${PROJECT}'." >&2 + echo "Register ci/build-and-test-e2e-c-client.yaml in" >&2 + echo "the Azure DevOps UI under that name to enable the" >&2 + echo "cross-repo c_client e2e check." >&2 + exit 1 + fi + + CLIENT_PR_NUMBER="$(System.PullRequest.PullRequestNumber)" + if [ -z "${CLIENT_PR_NUMBER}" ] || [ "${CLIENT_PR_NUMBER}" = "None" ]; then + # Branch builds (e.g. nightly on main) still benefit + # from the cross-repo check, but there's no PR number + # to attach a status to. The Enterprise pipeline's + # report stage skips the GitHub-status POST when + # cClientPrNumber is empty. + CLIENT_PR_NUMBER="" + fi + + # Forward the client branch name as a hint for the + # Enterprise side to try a same-name branch match + # (xyz → xyz, fallback to main). For a PR build, + # System.PullRequest.SourceBranch is `refs/heads/xyz`; + # for a direct branch build the branch name is in + # Build.SourceBranchName already. Strip refs/heads/ so + # the Enterprise step can use it directly as a git ref. + if [ "$(Build.Reason)" = "PullRequest" ]; then + CLIENT_BRANCH="$(System.PullRequest.SourceBranch)" + CLIENT_BRANCH="${CLIENT_BRANCH#refs/heads/}" + else + CLIENT_BRANCH="$(Build.SourceBranchName)" + fi + echo "Client branch hint: ${CLIENT_BRANCH}" + + # `templateParameters` matches the `parameters:` block + # in build-and-test-e2e-c-client.yaml. 
+ BODY=$(jq -n \ + --arg commit "$(Build.SourceVersion)" \ + --arg pr "${CLIENT_PR_NUMBER}" \ + --arg branch "${CLIENT_BRANCH}" \ + '{ + templateParameters: { + cClientCommit: $commit, + cClientPrNumber: $pr, + clientBranch: $branch + }, + resources: { repositories: { self: { refName: "refs/heads/main" } } } + }') + + echo "POSTing pipeline run with body:" + echo "$BODY" | jq . + + RESPONSE=$(curl -fsS -u ":${ENT_DISPATCH_PAT}" \ + -H "Content-Type: application/json" \ + -X POST \ + -d "$BODY" \ + "${ORG_URL}${PROJECT}/_apis/pipelines/${PIPELINE_ID}/runs?api-version=7.0") + + RUN_URL=$(echo "$RESPONSE" | jq -r '._links.web.href // ""') + echo "Enterprise build queued: ${RUN_URL}" + displayName: "Queue enterprise c_client e2e" + env: + # The PAT is a secret pipeline variable that must be set + # on this pipeline in the ADO UI (Variables → +, name + # ENT_DISPATCH_PAT, mark as secret). Secret variables + # aren't auto-exposed to script env; the explicit + # `env:` mapping below is required. + ENT_DISPATCH_PAT: $(ENT_DISPATCH_PAT) diff --git a/cpp_test/qwp_mock_server.cpp b/cpp_test/qwp_mock_server.cpp new file mode 100644 index 00000000..e3b44bed --- /dev/null +++ b/cpp_test/qwp_mock_server.cpp @@ -0,0 +1,1177 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2025 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + ******************************************************************************/ + +#include "qwp_mock_server.hpp" + +#include +#include +#include +#include +#include + +#ifdef _WIN32 +#include +#include +#pragma comment(lib, "Ws2_32.lib") +using socket_t = SOCKET; +using ssize_t = std::intptr_t; +#define INVALID_SOCKET_VALUE INVALID_SOCKET +#define close_socket(s) closesocket(s) +// Winsock spells the shutdown constants differently. +#define QWP_SHUT_RDWR SD_BOTH +#define QWP_SHUT_WR SD_SEND +// Windows TCP has no SIGPIPE; closed-peer writes return WSAECONNRESET. +#define QWP_MSG_NOSIGNAL 0 +#else +#include +#include +#include +#include +#include +#include +using socket_t = int; +#define INVALID_SOCKET_VALUE (-1) +#define close_socket(s) ::close(s) +#define QWP_SHUT_RDWR SHUT_RDWR +#define QWP_SHUT_WR SHUT_WR +// Suppress SIGPIPE on closed-peer writes. Linux exposes the flag per +// `send()` call (`MSG_NOSIGNAL`); macOS/BSD exposes it as a per-socket +// option (`SO_NOSIGPIPE` set via `setsockopt`). Define both portably so +// the mock server can refuse to take down the test process when a +// client closes the connection mid-frame. +#ifdef MSG_NOSIGNAL +#define QWP_MSG_NOSIGNAL MSG_NOSIGNAL +#else +#define QWP_MSG_NOSIGNAL 0 +#endif +#endif + +namespace +{ +// Set the per-socket "do not raise SIGPIPE on closed-peer writes" option. +// macOS/BSD use `SO_NOSIGPIPE` because they lack `MSG_NOSIGNAL`; Linux +// already covers this via the `QWP_MSG_NOSIGNAL` flag on each `send()`. +// Windows has no SIGPIPE. Call this immediately after `socket()`/ +// `accept()` for any fd the mock server will write to. 
+inline void set_no_sigpipe([[maybe_unused]] socket_t fd) +{ +#if defined(SO_NOSIGPIPE) + int one = 1; + (void)::setsockopt( + fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)); +#endif +} +} // namespace + +namespace qwp_mock +{ + +// =========================================================================== +// SHA1 + Base64 — used by the WebSocket handshake to compute +// Sec-WebSocket-Accept. Hand-rolled to avoid pulling in a crypto dep. +// =========================================================================== + +namespace +{ + +struct Sha1State +{ + uint32_t h[5]; + uint64_t total_bits; + uint8_t buf[64]; + size_t buf_len; +}; + +inline uint32_t rotl(uint32_t x, int n) { return (x << n) | (x >> (32 - n)); } + +void sha1_init(Sha1State& s) +{ + s.h[0] = 0x67452301; + s.h[1] = 0xEFCDAB89; + s.h[2] = 0x98BADCFE; + s.h[3] = 0x10325476; + s.h[4] = 0xC3D2E1F0; + s.total_bits = 0; + s.buf_len = 0; +} + +void sha1_compress(Sha1State& s, const uint8_t* block) +{ + uint32_t w[80]; + for (int i = 0; i < 16; ++i) + { + w[i] = (uint32_t(block[i * 4]) << 24) | (uint32_t(block[i * 4 + 1]) << 16) | + (uint32_t(block[i * 4 + 2]) << 8) | uint32_t(block[i * 4 + 3]); + } + for (int i = 16; i < 80; ++i) + w[i] = rotl(w[i - 3] ^ w[i - 8] ^ w[i - 14] ^ w[i - 16], 1); + + uint32_t a = s.h[0], b = s.h[1], c = s.h[2], d = s.h[3], e = s.h[4]; + for (int i = 0; i < 80; ++i) + { + uint32_t f, k; + if (i < 20) + { + f = (b & c) | (~b & d); + k = 0x5A827999; + } + else if (i < 40) + { + f = b ^ c ^ d; + k = 0x6ED9EBA1; + } + else if (i < 60) + { + f = (b & c) | (b & d) | (c & d); + k = 0x8F1BBCDC; + } + else + { + f = b ^ c ^ d; + k = 0xCA62C1D6; + } + uint32_t t = rotl(a, 5) + f + e + k + w[i]; + e = d; + d = c; + c = rotl(b, 30); + b = a; + a = t; + } + s.h[0] += a; + s.h[1] += b; + s.h[2] += c; + s.h[3] += d; + s.h[4] += e; +} + +void sha1_update(Sha1State& s, const uint8_t* data, size_t len) +{ + s.total_bits += uint64_t(len) * 8; + while (len > 0) + { + size_t take = std::min(64 
- s.buf_len, len); + std::memcpy(s.buf + s.buf_len, data, take); + s.buf_len += take; + data += take; + len -= take; + if (s.buf_len == 64) + { + sha1_compress(s, s.buf); + s.buf_len = 0; + } + } +} + +void sha1_finish(Sha1State& s, uint8_t out[20]) +{ + s.buf[s.buf_len++] = 0x80; + if (s.buf_len > 56) + { + std::memset(s.buf + s.buf_len, 0, 64 - s.buf_len); + sha1_compress(s, s.buf); + s.buf_len = 0; + } + std::memset(s.buf + s.buf_len, 0, 56 - s.buf_len); + for (int i = 7; i >= 0; --i) + s.buf[56 + i] = uint8_t(s.total_bits >> ((7 - i) * 8)); + sha1_compress(s, s.buf); + for (int i = 0; i < 5; ++i) + { + out[i * 4] = uint8_t(s.h[i] >> 24); + out[i * 4 + 1] = uint8_t(s.h[i] >> 16); + out[i * 4 + 2] = uint8_t(s.h[i] >> 8); + out[i * 4 + 3] = uint8_t(s.h[i]); + } +} + +std::string base64_encode(const uint8_t* data, size_t len) +{ + static const char tbl[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + std::string out; + out.reserve((len + 2) / 3 * 4); + size_t i = 0; + for (; i + 3 <= len; i += 3) + { + uint32_t v = (uint32_t(data[i]) << 16) | (uint32_t(data[i + 1]) << 8) | + uint32_t(data[i + 2]); + out.push_back(tbl[(v >> 18) & 0x3F]); + out.push_back(tbl[(v >> 12) & 0x3F]); + out.push_back(tbl[(v >> 6) & 0x3F]); + out.push_back(tbl[v & 0x3F]); + } + if (i < len) + { + uint32_t v = uint32_t(data[i]) << 16; + if (i + 1 < len) + v |= uint32_t(data[i + 1]) << 8; + out.push_back(tbl[(v >> 18) & 0x3F]); + out.push_back(tbl[(v >> 12) & 0x3F]); + out.push_back((i + 1 < len) ? 
tbl[(v >> 6) & 0x3F] : '='); + out.push_back('='); + } + return out; +} + +std::string compute_ws_accept(const std::string& sec_key) +{ + static const char GUID[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + Sha1State s; + sha1_init(s); + sha1_update( + s, reinterpret_cast(sec_key.data()), sec_key.size()); + sha1_update(s, reinterpret_cast(GUID), sizeof(GUID) - 1); + uint8_t hash[20]; + sha1_finish(s, hash); + return base64_encode(hash, 20); +} + +} // anonymous namespace + +// =========================================================================== +// Wire helpers (public API). +// =========================================================================== + +void encode_varint_u64(uint64_t v, std::vector& out) +{ + while ((v & ~uint64_t(0x7F)) != 0) + { + out.push_back(uint8_t((v & 0x7F) | 0x80)); + v >>= 7; + } + out.push_back(uint8_t(v)); +} + +std::vector framed( + uint8_t version, uint8_t flags, uint16_t table_count, + const std::vector& payload) +{ + std::vector out; + out.reserve(12 + payload.size()); + out.push_back('Q'); + out.push_back('W'); + out.push_back('P'); + out.push_back('1'); + out.push_back(version); + out.push_back(flags); + out.push_back(uint8_t(table_count)); + out.push_back(uint8_t(table_count >> 8)); + uint32_t plen = uint32_t(payload.size()); + out.push_back(uint8_t(plen)); + out.push_back(uint8_t(plen >> 8)); + out.push_back(uint8_t(plen >> 16)); + out.push_back(uint8_t(plen >> 24)); + out.insert(out.end(), payload.begin(), payload.end()); + return out; +} + +std::vector server_info_frame( + uint8_t role, + const std::string& cluster_id, + const std::string& node_id, + uint64_t epoch, + uint32_t capabilities, + int64_t server_wall_ns) +{ + std::vector p; + p.push_back(MSG_SERVER_INFO); + p.push_back(role); + for (int i = 0; i < 8; ++i) + p.push_back(uint8_t(epoch >> (i * 8))); + for (int i = 0; i < 4; ++i) + p.push_back(uint8_t(capabilities >> (i * 8))); + for (int i = 0; i < 8; ++i) + p.push_back(uint8_t(static_cast(server_wall_ns) 
>> (i * 8))); + uint16_t cl = uint16_t(cluster_id.size()); + p.push_back(uint8_t(cl)); + p.push_back(uint8_t(cl >> 8)); + p.insert(p.end(), cluster_id.begin(), cluster_id.end()); + uint16_t nl = uint16_t(node_id.size()); + p.push_back(uint8_t(nl)); + p.push_back(uint8_t(nl >> 8)); + p.insert(p.end(), node_id.begin(), node_id.end()); + return framed(2, 0, 0, p); +} + +std::vector result_end_frame(int64_t request_id) +{ + std::vector p; + p.push_back(MSG_RESULT_END); + for (int i = 0; i < 8; ++i) + p.push_back(uint8_t(request_id >> (i * 8))); + encode_varint_u64(0, p); // final_seq + encode_varint_u64(0, p); // total_rows (not asserted by client beyond plumbing) + return framed(2, 0, 0, p); +} + +std::vector exec_done_frame( + int64_t request_id, uint8_t op_type, uint64_t rows_affected) +{ + std::vector p; + p.push_back(MSG_EXEC_DONE); + for (int i = 0; i < 8; ++i) + p.push_back(uint8_t(request_id >> (i * 8))); + p.push_back(op_type); + encode_varint_u64(rows_affected, p); + return framed(2, 0, 0, p); +} + +std::vector query_error_frame( + int64_t request_id, uint8_t status_code, const std::string& message) +{ + std::vector p; + p.push_back(MSG_QUERY_ERROR); + for (int i = 0; i < 8; ++i) + p.push_back(uint8_t(request_id >> (i * 8))); + p.push_back(status_code); + // msg_len is u16 LE, not a varint. + uint16_t mlen = uint16_t(message.size()); + p.push_back(uint8_t(mlen)); + p.push_back(uint8_t(mlen >> 8)); + p.insert(p.end(), message.begin(), message.end()); + return framed(2, 0, 0, p); +} + +std::vector cache_reset_frame(uint8_t mask) +{ + std::vector p = {MSG_CACHE_RESET, mask}; + return framed(2, 0, 0, p); +} + +std::vector result_batch_frame( + int64_t request_id, uint64_t batch_seq, uint64_t schema_id, + size_t row_count, const std::vector& columns) +{ + std::vector p; + p.push_back(MSG_RESULT_BATCH); + for (int i = 0; i < 8; ++i) + p.push_back(uint8_t(request_id >> (i * 8))); + encode_varint_u64(batch_seq, p); + + // Table block. 
+ encode_varint_u64(0, p); // empty table name + encode_varint_u64(uint64_t(row_count), p); + encode_varint_u64(uint64_t(columns.size()), p); + + // Schema section: Full mode (0x00). + p.push_back(0x00); + encode_varint_u64(schema_id, p); + for (const auto& c : columns) + { + encode_varint_u64(uint64_t(c.name.size()), p); + p.insert(p.end(), c.name.begin(), c.name.end()); + p.push_back(c.kind); + } + + // Per-column data. + for (const auto& c : columns) + p.insert(p.end(), c.data.begin(), c.data.end()); + + // RESULT_BATCH frames have table_count = 1. + return framed(2, 0, 1, p); +} + +std::vector result_batch_frame_with_dict( + int64_t request_id, uint64_t batch_seq, uint64_t schema_id, + size_t row_count, const std::vector& columns, + uint64_t dict_delta_start, + const std::vector& dict_entries) +{ + std::vector p; + p.push_back(MSG_RESULT_BATCH); + for (int i = 0; i < 8; ++i) + p.push_back(uint8_t(request_id >> (i * 8))); + encode_varint_u64(batch_seq, p); + + // Delta symbol-dict section (FLAG_DELTA_SYMBOL_DICT in the header). + encode_varint_u64(dict_delta_start, p); + encode_varint_u64(uint64_t(dict_entries.size()), p); + for (const auto& s : dict_entries) + { + encode_varint_u64(uint64_t(s.size()), p); + p.insert(p.end(), s.begin(), s.end()); + } + + encode_varint_u64(0, p); // empty table name + encode_varint_u64(uint64_t(row_count), p); + encode_varint_u64(uint64_t(columns.size()), p); + + p.push_back(0x00); // Schema: full mode + encode_varint_u64(schema_id, p); + for (const auto& c : columns) + { + encode_varint_u64(uint64_t(c.name.size()), p); + p.insert(p.end(), c.name.begin(), c.name.end()); + p.push_back(c.kind); + } + for (const auto& c : columns) + p.insert(p.end(), c.data.begin(), c.data.end()); + + // flags = FLAG_DELTA_SYMBOL_DICT (0x08); table_count = 1. 
+ return framed(2, 0x08, 1, p); +} + +std::vector symbol_column_bytes(const std::vector& codes) +{ + std::vector out; + out.push_back(0x00); // null_flag = no validity (all rows non-null) + for (uint32_t code : codes) + encode_varint_u64(uint64_t(code), out); + return out; +} + +std::vector fixed_column_bytes( + size_t row_count, const std::vector& packed_values) +{ + (void)row_count; + std::vector out; + out.push_back(0x00); // null_flag = no validity + out.insert(out.end(), packed_values.begin(), packed_values.end()); + return out; +} + +std::vector fixed_column_bytes_nullable( + size_t row_count, + const std::vector& is_null, + const std::vector& packed_non_null_values, + size_t elem_size) +{ + assert(is_null.size() == row_count); + std::vector out; + bool any_null = std::any_of(is_null.begin(), is_null.end(), + [](bool b) { return b; }); + if (!any_null) + { + out.push_back(0x00); + out.insert(out.end(), packed_non_null_values.begin(), + packed_non_null_values.end()); + return out; + } + out.push_back(0x01); // null_flag = validity present + const size_t bitmap_len = (row_count + 7) / 8; + std::vector bitmap(bitmap_len, 0); + for (size_t i = 0; i < row_count; ++i) + if (is_null[i]) + bitmap[i >> 3] |= uint8_t(1u << (i & 7)); + out.insert(out.end(), bitmap.begin(), bitmap.end()); + out.insert(out.end(), packed_non_null_values.begin(), + packed_non_null_values.end()); + (void)elem_size; + return out; +} + +std::vector varlen_column_bytes( + const std::vector>& rows) +{ + std::vector out; + out.push_back(0x00); // no validity (every row non-null) + // Wire format: `(non_null + 1) × u32_le offsets`, then `total_bytes` + // raw data. Note: the egress decoder expects offsets *immediately + // after* the null_flag, no varint length prefix. 
+ uint32_t off = 0; + auto push_u32 = [&](uint32_t v) + { + out.push_back(uint8_t(v)); + out.push_back(uint8_t(v >> 8)); + out.push_back(uint8_t(v >> 16)); + out.push_back(uint8_t(v >> 24)); + }; + push_u32(off); + for (const auto& r : rows) + { + off += uint32_t(r.size()); + push_u32(off); + } + for (const auto& r : rows) + out.insert(out.end(), r.begin(), r.end()); + return out; +} + +std::vector decimal64_column_bytes( + const std::vector& values, int8_t scale) +{ + std::vector out; + out.push_back(0x00); // validity: no nulls + encode_varint_u64(uint64_t(uint8_t(scale)), out); + for (int64_t v : values) + for (int i = 0; i < 8; ++i) + out.push_back(uint8_t(v >> (i * 8))); + return out; +} + +std::vector decimal128_column_bytes( + const std::vector>& values, int8_t scale) +{ + std::vector out; + out.push_back(0x00); + encode_varint_u64(uint64_t(uint8_t(scale)), out); + for (const auto& v : values) + out.insert(out.end(), v.begin(), v.end()); + return out; +} + +std::vector decimal256_column_bytes( + const std::vector>& values, int8_t scale) +{ + std::vector out; + out.push_back(0x00); // validity: no nulls + out.push_back(uint8_t(scale)); // 1B scale (decode_decimal_wide reads u8) + for (const auto& v : values) + out.insert(out.end(), v.begin(), v.end()); + return out; +} + +std::vector geohash_column_bytes( + const std::vector& is_null, + const std::vector& packed_non_null_values, + uint8_t precision_bits) +{ + std::vector out; + bool any_null = std::any_of(is_null.begin(), is_null.end(), + [](bool b) { return b; }); + if (!any_null) + { + out.push_back(0x00); + } + else + { + out.push_back(0x01); + const size_t bitmap_len = (is_null.size() + 7) / 8; + std::vector bitmap(bitmap_len, 0); + for (size_t i = 0; i < is_null.size(); ++i) + if (is_null[i]) + bitmap[i >> 3] |= uint8_t(1u << (i & 7)); + out.insert(out.end(), bitmap.begin(), bitmap.end()); + } + encode_varint_u64(uint64_t(precision_bits), out); + out.insert(out.end(), packed_non_null_values.begin(), + 
packed_non_null_values.end()); + return out; +} + +std::vector array_column_bytes( + const std::vector>& rows) +{ + std::vector out; + bool any_null = std::any_of(rows.begin(), rows.end(), + [](const std::optional& r) { return !r.has_value(); }); + if (!any_null) + { + out.push_back(0x00); + } + else + { + out.push_back(0x01); + const size_t bitmap_len = (rows.size() + 7) / 8; + std::vector bitmap(bitmap_len, 0); + for (size_t i = 0; i < rows.size(); ++i) + if (!rows[i].has_value()) + bitmap[i >> 3] |= uint8_t(1u << (i & 7)); + out.insert(out.end(), bitmap.begin(), bitmap.end()); + } + for (const auto& row : rows) + { + if (!row.has_value()) + continue; + out.push_back(uint8_t(row->shape.size())); + for (uint32_t dim : row->shape) + { + out.push_back(uint8_t(dim)); + out.push_back(uint8_t(dim >> 8)); + out.push_back(uint8_t(dim >> 16)); + out.push_back(uint8_t(dim >> 24)); + } + out.insert(out.end(), row->data.begin(), row->data.end()); + } + return out; +} + +// =========================================================================== +// MockServer implementation. +// =========================================================================== + +namespace +{ + +bool send_all(socket_t fd, const uint8_t* data, size_t len) +{ + while (len > 0) + { + ssize_t n = ::send(fd, reinterpret_cast(data), +#ifdef _WIN32 + int(len), +#else + len, +#endif + QWP_MSG_NOSIGNAL); + if (n <= 0) + return false; + data += n; + len -= size_t(n); + } + return true; +} + +bool recv_all(socket_t fd, uint8_t* data, size_t len) +{ + while (len > 0) + { + ssize_t n = ::recv(fd, reinterpret_cast(data), +#ifdef _WIN32 + int(len), +#else + len, +#endif + 0); + if (n <= 0) + return false; + data += n; + len -= size_t(n); + } + return true; +} + +// Read the HTTP request, find Sec-WebSocket-Key, write the upgrade +// response with X-QWP-Version: 2. Returns true on success. 
+bool ws_handshake(socket_t fd, bool reject_401)
+{
+    std::string buf;
+    buf.reserve(1024);
+    char b;
+    while (true)
+    {
+        ssize_t n = ::recv(fd, &b, 1, 0);
+        if (n <= 0)
+            return false;
+        buf.push_back(b);
+        if (buf.size() >= 4 &&
+            buf.compare(buf.size() - 4, 4, "\r\n\r\n") == 0)
+            break;
+        if (buf.size() > 8192)
+            return false;
+    }
+
+    if (reject_401)
+    {
+        const char resp[] =
+            "HTTP/1.1 401 Unauthorized\r\nContent-Length: 0\r\n"
+            "Connection: close\r\n\r\n";
+        send_all(fd, reinterpret_cast<const uint8_t*>(resp), sizeof(resp) - 1);
+        return false;
+    }
+
+    // Find Sec-WebSocket-Key (case-insensitive).
+    std::string key;
+    {
+        size_t p = 0;
+        while (p < buf.size())
+        {
+            size_t eol = buf.find("\r\n", p);
+            if (eol == std::string::npos)
+                break;
+            std::string line = buf.substr(p, eol - p);
+            p = eol + 2;
+            // Lowercase the header name portion before the colon.
+            size_t colon = line.find(':');
+            if (colon == std::string::npos)
+                continue;
+            std::string name = line.substr(0, colon);
+            std::transform(name.begin(), name.end(), name.begin(),
+                [](char c) { return char(std::tolower(c)); });
+            if (name == "sec-websocket-key")
+            {
+                key = line.substr(colon + 1);
+                // Trim whitespace.
+                size_t s = key.find_first_not_of(" \t");
+                size_t e = key.find_last_not_of(" \t");
+                if (s == std::string::npos)
+                    key.clear();
+                else
+                    key = key.substr(s, e - s + 1);
+                break;
+            }
+        }
+    }
+    if (key.empty())
+        return false;
+
+    std::string accept = compute_ws_accept(key);
+    std::string resp =
+        "HTTP/1.1 101 Switching Protocols\r\n"
+        "Upgrade: websocket\r\n"
+        "Connection: Upgrade\r\n"
+        "X-QWP-Version: 2\r\n"
+        "Sec-WebSocket-Accept: " +
+        accept + "\r\n\r\n";
+    return send_all(fd, reinterpret_cast<const uint8_t*>(resp.data()),
+        resp.size());
+}
+
+// Read a single WebSocket frame from `fd`. Returns:
+//   {opcode, payload} — opcode is the low 4 bits (0x1 text, 0x2 binary,
+//   0x8 close, 0x9 ping, 0xA pong).
+// On error / connection close returns opcode = -1.
+struct WsFrame
+{
+    int opcode;
+    std::vector<uint8_t> payload;
+};
+
+WsFrame ws_read(socket_t fd)
+{
+    WsFrame f{-1, {}};
+    uint8_t hdr[2];
+    if (!recv_all(fd, hdr, 2))
+        return f;
+    f.opcode = hdr[0] & 0x0F;
+    bool masked = (hdr[1] & 0x80) != 0;
+    uint64_t plen = hdr[1] & 0x7F;
+    if (plen == 126)
+    {
+        uint8_t ext[2];
+        if (!recv_all(fd, ext, 2))
+        {
+            f.opcode = -1;
+            return f;
+        }
+        plen = (uint64_t(ext[0]) << 8) | ext[1];
+    }
+    else if (plen == 127)
+    {
+        uint8_t ext[8];
+        if (!recv_all(fd, ext, 8))
+        {
+            f.opcode = -1;
+            return f;
+        }
+        plen = 0;
+        for (int i = 0; i < 8; ++i)
+            plen = (plen << 8) | ext[i];
+    }
+    uint8_t mask[4] = {0};
+    if (masked && !recv_all(fd, mask, 4))
+    {
+        f.opcode = -1;
+        return f;
+    }
+    f.payload.resize(size_t(plen));
+    if (plen > 0 && !recv_all(fd, f.payload.data(), size_t(plen)))
+    {
+        f.opcode = -1;
+        return f;
+    }
+    if (masked)
+        for (size_t i = 0; i < f.payload.size(); ++i)
+            f.payload[i] ^= mask[i & 3];
+    return f;
+}
+
+// Graceful end-of-script teardown. Sends a single WebSocket Close
+// control frame (opcode 0x88, FIN bit set, zero-length payload) so
+// the client sees a clean protocol-level close, then half-closes the
+// TCP send side (`shutdown(SHUT_WR)`) so the FIN propagates AFTER
+// any buffered RESULT_END / EXEC_DONE bytes are delivered.
+//
+// Why this matters: a bare `close(fd)` on macOS surfaces to the
+// client as a TCP RST when there's unACK'd data in the local send
+// buffer, and the kernel discards still-undelivered data on RST. The
+// failure mode looks like a race — RESULT_END *was* sent, but the
+// client read returns "Connection reset by peer" instead of seeing
+// the frame. Half-close avoids that: the FIN is queued behind the
+// data, so the client drains the recv buffer cleanly and then sees
+// EOF on a subsequent read.
+void graceful_close(socket_t fd)
+{
+    // RFC 6455 §5.5.1 server Close: 0x88 = FIN | OP_CLOSE; payload = 0.
+    static const std::uint8_t ws_close[2] = {0x88, 0x00};
+    (void)send(
+        fd,
+        reinterpret_cast<const char*>(ws_close),
+        sizeof(ws_close),
+        QWP_MSG_NOSIGNAL);
+    (void)::shutdown(fd, QWP_SHUT_WR);
+    close_socket(fd);
+}
+
+// Write a single (server-side, unmasked) binary WebSocket frame.
+bool ws_write_binary(socket_t fd, const std::vector<uint8_t>& payload)
+{
+    std::vector<uint8_t> hdr;
+    hdr.push_back(0x82); // FIN + BINARY
+    size_t len = payload.size();
+    if (len < 126)
+    {
+        hdr.push_back(uint8_t(len));
+    }
+    else if (len <= 0xFFFF)
+    {
+        hdr.push_back(126);
+        hdr.push_back(uint8_t(len >> 8));
+        hdr.push_back(uint8_t(len));
+    }
+    else
+    {
+        hdr.push_back(127);
+        for (int i = 7; i >= 0; --i)
+            hdr.push_back(uint8_t(len >> (i * 8)));
+    }
+    if (!send_all(fd, hdr.data(), hdr.size()))
+        return false;
+    return send_all(fd, payload.data(), payload.size());
+}
+
+// Read frames until we observe a binary frame whose payload starts with
+// `expected_kind`. Stash the captured payload into `out_captured`. Returns
+// the request_id from a QUERY_REQUEST (offset 1, i64 LE) when the
+// expected kind is QUERY_REQUEST; -1 otherwise. Returns -1 on error.
+int64_t read_until_kind(
+    socket_t fd, uint8_t expected_kind,
+    std::vector<std::vector<uint8_t>>& out_captured,
+    std::mutex& out_captured_mtx)
+{
+    while (true)
+    {
+        WsFrame f = ws_read(fd);
+        if (f.opcode < 0)
+            return -1;
+        if (f.opcode == 0x8)
+            return -1; // close
+        if (f.opcode == 0x9)
+        {
+            // Reply pong with same payload, then keep reading.
+            std::vector<uint8_t> hdr = {0x8A, uint8_t(f.payload.size())};
+            send_all(fd, hdr.data(), hdr.size());
+            if (!f.payload.empty())
+                send_all(fd, f.payload.data(), f.payload.size());
+            continue;
+        }
+        if (f.opcode != 0x2) // not binary
+            continue;
+        if (f.payload.empty())
+            continue;
+        {
+            std::lock_guard<std::mutex> g(out_captured_mtx);
+            out_captured.push_back(f.payload);
+        }
+        if (f.payload[0] == expected_kind)
+        {
+            if (expected_kind == MSG_QUERY_REQUEST && f.payload.size() >= 9)
+            {
+                int64_t rid = 0;
+                for (int i = 0; i < 8; ++i)
+                    rid |= int64_t(f.payload[1 + i]) << (i * 8);
+                return rid;
+            }
+            return 0;
+        }
+    }
+}
+
+} // anonymous namespace
+
+struct MockServer::Impl
+{
+    socket_t listen_fd = INVALID_SOCKET_VALUE;
+    uint16_t port = 0;
+    std::vector