From 9924132a5ae0f03b8de69ed4e98fd15c96ad26d8 Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Mon, 13 Apr 2026 17:10:03 +0200 Subject: [PATCH 1/5] Add Python bindings + wheel generation and publish steps --- .github/workflows/ci.yml | 73 ++++ .gitignore | 4 + CMakeLists.txt | 5 + examples/python/hello_camera_zerocopy.py | 86 ++++ examples/python/hello_pubsub.py | 38 ++ examples/python/hello_schema.py | 63 +++ include/kickmsg/Node.h | 35 +- os/darwin/SharedMemory.cc | 27 +- py_bindings/CMakeLists.txt | 28 ++ py_bindings/src/kickmsg_py.cc | 489 +++++++++++++++++++++++ pyproject.toml | 116 ++++++ src/Node.cc | 47 ++- tests/python/conftest.py | 42 ++ tests/python/test_lifetime.py | 78 ++++ tests/python/test_node.py | 81 ++++ tests/python/test_pubsub.py | 58 +++ tests/python/test_schema.py | 68 ++++ tests/python/test_threading.py | 43 ++ tests/python/test_zerocopy.py | 98 +++++ tools/setup/version.sh | 21 + 20 files changed, 1453 insertions(+), 47 deletions(-) create mode 100644 examples/python/hello_camera_zerocopy.py create mode 100644 examples/python/hello_pubsub.py create mode 100644 examples/python/hello_schema.py create mode 100644 py_bindings/CMakeLists.txt create mode 100644 py_bindings/src/kickmsg_py.cc create mode 100644 pyproject.toml create mode 100644 tests/python/conftest.py create mode 100644 tests/python/test_lifetime.py create mode 100644 tests/python/test_node.py create mode 100644 tests/python/test_pubsub.py create mode 100644 tests/python/test_schema.py create mode 100644 tests/python/test_threading.py create mode 100644 tests/python/test_zerocopy.py create mode 100755 tools/setup/version.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0541476..242e2a1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -96,3 +96,76 @@ jobs: - name: Verify ARM binaries run: file build_arm/kickmsg_unit build_arm/kickmsg_stress_test | grep "ARM aarch64" shell: bash + + # Build Python wheels across the supported 
platforms. Runs on every + # push so wheel-build regressions surface immediately; only the tagged + # release job below actually uploads to PyPI. + build-wheels: + name: Build Python Wheels (${{ matrix.os }}) + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-24.04, ubuntu-24.04-arm, macos-14] + env: + # `native`: the runner's own architecture. Linux x86_64 runner → + # manylinux_x86_64 wheels; Linux ARM runner → manylinux_aarch64; + # macOS 14 runner (Apple Silicon) → universal2 via the macOS cibw + # config in pyproject.toml. + CIBW_ARCHS: native + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 # so tools/setup/version.sh can read tags + + - name: Build wheels + uses: pypa/cibuildwheel@v3.1.0 + with: + output-dir: wheelhouse + + - uses: actions/upload-artifact@v4 + with: + name: wheelhouse-kickmsg-${{ matrix.os }} + path: wheelhouse/*.whl + + # Publish to PyPI on tag push. Uses OIDC trusted publishing (configured + # per-repo at https://pypi.org/manage/account/publishing/ — the `kickmsg` + # PyPI project must list this repository + environment `maintainer` as + # a trusted publisher). No API tokens stored in GitHub secrets. 
+ release: + needs: [build, cross_arm64, build-wheels] + if: startsWith(github.ref, 'refs/tags/') + runs-on: ubuntu-24.04 + environment: + name: maintainer + permissions: + contents: write + id-token: write + steps: + - uses: actions/checkout@v4 + + - name: Download wheels + uses: actions/download-artifact@v4 + with: + pattern: wheelhouse-kickmsg-* + path: wheelhouse + merge-multiple: true + + - name: Create GitHub release + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + tag: ${{ github.ref_name }} + run: | + gh release create "$tag" wheelhouse/*.whl \ + --repo="$GITHUB_REPOSITORY" \ + --title="${GITHUB_REPOSITORY#*/} ${tag#v}" \ + --generate-notes + + - name: Publish wheels to PyPI + uses: pypa/gh-action-pypi-publish@v1.12.4 + with: + packages-dir: wheelhouse + verify-metadata: true + attestations: true + print-hash: true + skip-existing: true diff --git a/.gitignore b/.gitignore index 1708572..9d9f889 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,7 @@ build*/ compile_commands.json .clangd __pycache__/ +*.egg-info/ +.pytest_cache/ +wheelhouse/ +dist/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 3a6e3d5..bf75aa6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,3 +82,8 @@ if(BUILD_BENCHMARKS) find_package(benchmark REQUIRED CONFIG) add_subdirectory(benchmarks) endif() + +# --- Python bindings (built only when invoked via scikit-build-core) --- +if(SKBUILD) + add_subdirectory(py_bindings) +endif() diff --git a/examples/python/hello_camera_zerocopy.py b/examples/python/hello_camera_zerocopy.py new file mode 100644 index 0000000..b548f5b --- /dev/null +++ b/examples/python/hello_camera_zerocopy.py @@ -0,0 +1,86 @@ +"""Zero-copy camera-frame publishing — the use case the Python bindings' +Publisher.allocate / Subscriber.try_receive_view path was designed for. + +Shows how to: + 1. Reserve a slot directly in shared memory via Publisher.allocate(size) + and fill it in-place (no intermediate buffer). 
Here we simulate a + camera frame with a byte pattern; in a real pipeline you'd: + - cv2.imdecode / numpy.copyto into the slot, or + - DMA from a V4L2 buffer into the slot, or + - render directly into the slot. + 2. Receive it zero-copy on the other side via Subscriber.try_receive_view + and read the bytes through a read-only memoryview — no copy either. + +For a real camera, swap the `fill_frame` lines for your actual capture. +""" + +from __future__ import annotations + +import kickmsg + + +def fill_frame(buf: memoryview, height: int, width: int) -> None: + """Stand-in for a real capture — writes a cheap pattern.""" + stride = width * 3 + for row in range(height): + for col in range(width): + off = row * stride + col * 3 + buf[off] = row & 0xFF # R + buf[off + 1] = col & 0xFF # G + buf[off + 2] = (row + col) & 0xFF # B + + +def main() -> int: + width, height = 320, 240 + frame_bytes = width * height * 3 # RGB + + cfg = kickmsg.Config() + cfg.max_subscribers = 2 + cfg.sub_ring_capacity = 4 + cfg.pool_size = 8 + cfg.max_payload_size = frame_bytes + + camera = kickmsg.Node("camera", "demo_cam") + viewer = kickmsg.Node("viewer", "demo_cam") + camera.unlink_topic("frames") + + pub = camera.advertise("frames", cfg) + sub = viewer.subscribe("frames") + + for i in range(3): + # Zero-copy capture: get a writable memoryview into the SHM slot, + # fill it in place, publish. Nothing is copied on the publish side. + buf = pub.allocate(frame_bytes) + if buf is None: + print(f"frame {i}: pool exhausted, dropping") + continue + fill_frame(buf, height, width) + pub.publish() + print(f"Published frame {i} ({width}x{height} RGB, {frame_bytes} B zero-copy)") + + for i in range(3): + view = sub.try_receive_view() + if view is None: + break + try: + mv = view.data() + # Spot-check: no memcpy performed on the receive side either. 
+ print(f"Received frame {i}: {view.len()} B, " + f"pixel[0,0]=RGB({mv[0]},{mv[1]},{mv[2]}), " + f"pixel[{height-1},{width-1}]=" + f"RGB({mv[(height-1)*width*3 + (width-1)*3]}," + f"{mv[(height-1)*width*3 + (width-1)*3 + 1]}," + f"{mv[(height-1)*width*3 + (width-1)*3 + 2]})") + finally: + # Drop the slot pin as soon as we're done reading. Without + # this, the slot stays pinned until the SampleView is GC'd, + # and the pool could run dry. + view.release() + + camera.unlink_topic("frames") + print("Done.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/python/hello_pubsub.py b/examples/python/hello_pubsub.py new file mode 100644 index 0000000..2369628 --- /dev/null +++ b/examples/python/hello_pubsub.py @@ -0,0 +1,38 @@ +"""Basic pub/sub via the Node API — Python counterpart of examples/hello_pubsub.cc.""" + +from __future__ import annotations + +import struct + +import kickmsg + + +def main() -> int: + prefix = "demo_py" + topic = "temperature" + + sensor = kickmsg.Node("sensor", prefix) + display = kickmsg.Node("display", prefix) + + # Clean any leftover SHM from prior runs. + sensor.unlink_topic(topic) + + pub = sensor.advertise(topic) + sub = display.subscribe(topic) + + readings = [(1, 22.5), (2, 19.8), (1, 23.1), (3, 31.4), (2, 20.0)] + for sensor_id, celsius in readings: + # Pack as (uint32 id, float celsius) — same layout as the C++ example. + pub.send(struct.pack(" kickmsg.SchemaInfo: + info = kickmsg.SchemaInfo() + info.identity = kickmsg.hash.identity_from_fnv1a( + "demo.Imu(timestamp_ns:u64, ax:f32, ay:f32, az:f32)" + ) + info.name = "demo/Imu" + info.version = version + info.identity_algo = 1 # user tag — e.g. 1 = FNV-1a-64 + return info + + +def main() -> int: + prefix = "demo_py_schema" + topic = "imu" + + # Publisher node bakes a v2 schema into the region at create time. 
+ pub_node = kickmsg.Node("driver", prefix) + pub_node.unlink_topic(topic) + + cfg = kickmsg.Config() + cfg.max_subscribers = 4 + cfg.sub_ring_capacity = 8 + cfg.pool_size = 16 + cfg.max_payload_size = 32 + cfg.schema = make_imu_schema(version=2) + + _ = pub_node.advertise(topic, cfg) + print("[driver] advertised '%s' with schema %s v%d" % ( + topic, cfg.schema.name, cfg.schema.version)) + + # Good subscriber: expects v2 → matches → proceeds. + good = kickmsg.Node("good_sub", prefix) + _ = good.subscribe(topic) + got = good.topic_schema(topic) + expected = make_imu_schema(version=2) + d = kickmsg.schema.diff(got, expected) + print("[good_sub] observed %s v%d — diff=0x%x %s" % ( + got.name, got.version, d, "OK" if d == kickmsg.schema.Diff.Equal else "MISMATCH")) + + # Bad subscriber: expects v1 → diff reports Version mismatch → refuses. + bad = kickmsg.Node("bad_sub", prefix) + _ = bad.subscribe(topic) + got = bad.topic_schema(topic) + expected_v1 = make_imu_schema(version=1) + d = kickmsg.schema.diff(got, expected_v1) + if d & int(kickmsg.schema.Diff.Version): + print("[bad_sub] version mismatch (observed v%d, expected v1) — refusing" + % got.version) + + pub_node.unlink_topic(topic) + print("Done.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/include/kickmsg/Node.h b/include/kickmsg/Node.h index c5bbefb..1ae8a01 100644 --- a/include/kickmsg/Node.h +++ b/include/kickmsg/Node.h @@ -3,7 +3,7 @@ #include #include -#include +#include #include "kickmsg/Region.h" #include "kickmsg/Publisher.h" @@ -30,6 +30,16 @@ namespace kickmsg public: Node(std::string const& name, std::string const& prefix = "kickmsg"); + // Explicit non-copyable / move-only. Node already holds SharedRegion + // values (move-only), so it's non-copyable de facto; declaring it + // explicitly short-circuits SFINAE probes from binding frameworks + // that would otherwise eagerly instantiate the internal container's + // copy machinery to check copyability. 
+ Node(Node const&) = delete; + Node& operator=(Node const&) = delete; + Node(Node&&) noexcept = default; + Node& operator=(Node&&) noexcept = default; + // --- PubSub (topic-centric, 1-to-N by convention) --- // // Strict variants assume a fixed startup order: @@ -113,11 +123,9 @@ namespace kickmsg std::string make_broadcast_name(char const* channel) const; std::string make_mailbox_name(char const* owner, char const* tag) const; - // Lifetime: the returned pointer is only valid until the next - // mutation of regions_. A caller MUST NOT hold the pointer - // across an emplace_back/emplace_or_reuse call — vector realloc - // would move the SharedRegion object (the mmap stays put, but - // the SharedRegion handle would dangle). + // unordered_map guarantees reference stability for elements + // (only iterators are invalidated on rehash), so pointers + // returned here remain valid across subsequent insertions. SharedRegion* find_region(std::string const& shm_name); SharedRegion const* find_region(std::string const& shm_name) const; @@ -132,12 +140,15 @@ namespace kickmsg std::string name_; std::string prefix_; - // Linear search by SharedRegion::name() — a Node typically holds - // a handful of regions, so O(N) lookup is negligible. mmap - // addresses captured by Publisher/Subscriber stay valid when - // emplace_back reallocates the vector (handles point into the - // mapped pages, not into the SharedRegion object). - std::vector regions_; + // Keyed by SHM name for O(1) lookup. A telemetry node on a + // humanoid robot can easily hold 100-300 topics (joints × (meas, + // target) + cameras + IMUs + force sensors + hands), so O(N) + // linear search over a vector/deque starts to matter. The + // duplication with SharedRegion::name() costs ~30 B per entry — + // negligible at any scale we care about. unordered_map also + // guarantees reference stability for elements (the mmap addresses + // used by Publisher/Subscriber don't move on rehash). 
+ std::unordered_map regions_; }; } diff --git a/os/darwin/SharedMemory.cc b/os/darwin/SharedMemory.cc index fc8537d..f3b9ff0 100644 --- a/os/darwin/SharedMemory.cc +++ b/os/darwin/SharedMemory.cc @@ -71,17 +71,22 @@ namespace kickmsg void SharedMemory::create(std::string const& name, std::size_t size) { - // macOS quirk: shm_open returns EINVAL when passed O_TRUNC on an - // existing SHM object — Linux accepts it but Darwin rejects it. - // This matters on the create_or_open() fast path, where try_create - // first opens the object with O_CREAT|O_EXCL and closes the fd, - // then this function reopens it to size and map. Calling - // shm_unlink() first makes create() idempotent from the caller's - // point of view and sidesteps the O_TRUNC incompatibility: the - // subsequent shm_open sees a name that either didn't exist or - // was just detached, and then ftruncate(size) is always the - // first sizing call on the fresh object (which macOS also - // requires — a SHM object can only be ftruncated once). + // macOS has two shm_open / ftruncate quirks that the original + // `O_CREAT | O_TRUNC` Linux pattern trips over: + // 1. shm_open(O_CREAT|O_TRUNC) on an existing SHM object + // returns EINVAL — Linux accepts it, Darwin rejects it. + // 2. ftruncate() can only be called once per SHM object; a + // second call on the same object returns EINVAL. + // Unlink-then-exclusive-create sidesteps both: the subsequent + // shm_open sees a name that either didn't exist or was just + // detached, and the following ftruncate is always the first + // sizing call on a fresh object. + // + // This function is called by SharedRegion::create() (the strict + // factory where the caller intends exclusive ownership). The + // race-prone caller SharedRegion::create_or_open() was refactored + // to NOT re-enter this function after its try_create probe — it + // stamps the header directly on the probe's mapping. 
::shm_unlink(name.c_str()); fd_ = ::shm_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, 0666); if (fd_ < 0) diff --git a/py_bindings/CMakeLists.txt b/py_bindings/CMakeLists.txt new file mode 100644 index 0000000..eab85b4 --- /dev/null +++ b/py_bindings/CMakeLists.txt @@ -0,0 +1,28 @@ +if (SKBUILD) + set(LIB_SRCS + ${CMAKE_CURRENT_SOURCE_DIR}/src/kickmsg_py.cc + ) + + find_package(Python + REQUIRED COMPONENTS Interpreter Development.Module + OPTIONAL_COMPONENTS Development.SABIModule) + execute_process( + COMMAND "${Python_EXECUTABLE}" -m nanobind --cmake_dir + OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE nanobind_ROOT) + find_package(nanobind CONFIG REQUIRED) + + nanobind_add_module(kickmsg_py ${LIB_SRCS} NB_STATIC STABLE_ABI) + target_link_libraries(kickmsg_py PRIVATE kickmsg) + target_compile_options(kickmsg_py PRIVATE -Wno-shadow -Wno-cast-qual -Wno-pedantic) + + # GNU-ld-only size optimizations. ld64 (Apple) silently ignores some + # -Wl flags and errors on others; MSVC link is entirely different. + if(NOT APPLE AND NOT MSVC) + target_link_options(kickmsg_py PRIVATE + $<$:-Wl,--gc-sections -Wl,--strip-all>) + endif() + + # Output name is 'kickmsg' so Python imports as `import kickmsg`. + set_target_properties(kickmsg_py PROPERTIES OUTPUT_NAME "kickmsg") + install(TARGETS kickmsg_py DESTINATION .) +endif() diff --git a/py_bindings/src/kickmsg_py.cc b/py_bindings/src/kickmsg_py.cc new file mode 100644 index 0000000..72de929 --- /dev/null +++ b/py_bindings/src/kickmsg_py.cc @@ -0,0 +1,489 @@ +/// @file kickmsg_py.cc +/// @brief Python bindings for Kickmsg (nanobind-based). 
+/// +/// Layout: +/// kickmsg — module +/// ChannelType — enum +/// Config — channel::Config +/// SchemaInfo — payload schema descriptor +/// HealthReport — SharedRegion::diagnose() result +/// SharedRegion — factory methods + schema/health/repair +/// Publisher — send(bytes) and allocate()/publish() zero-copy +/// Subscriber — try_receive / receive (GIL release) / *_view +/// SampleRef — copy-based sample (bytes) +/// SampleView — zero-copy sample (buffer protocol) +/// BroadcastHandle — NamedTuple-like (pub, sub) +/// Node — high-level topic / broadcast / mailbox +/// schema (submodule) +/// Diff — enum (bitmask) +/// diff(a, b) — pure diff function +/// hash (submodule) +/// fnv1a_64(data[, seed]) +/// identity_from_fnv1a(descriptor) +/// +/// Zero-copy contract: +/// - Publisher.allocate(len) returns a writable memoryview that points +/// directly into a shared-memory slot. The user fills it (memcpy, +/// numpy.copyto, cv2.imencode, etc.), then calls Publisher.publish(). +/// The memoryview is valid only between allocate() and publish(). +/// - SampleView supports the buffer protocol as a read-only memoryview +/// pointing into the same shared-memory slot. The slot stays pinned +/// while the SampleView is alive; release the pin by deleting the +/// SampleView or exiting its `with` block. + +#include +#include +#include +#include + +#include +#include +#include + +#include "kickmsg/Node.h" +#include "kickmsg/Publisher.h" +#include "kickmsg/Region.h" +#include "kickmsg/Subscriber.h" +#include "kickmsg/Hash.h" +#include "kickmsg/types.h" + +namespace nb = nanobind; +using namespace nb::literals; + +namespace +{ + // Helper: build a read-only memoryview over raw bytes (no copy). 
+ nb::object memview_readonly(void const* ptr, std::size_t len) + { + PyObject* obj = PyMemoryView_FromMemory( + reinterpret_cast(const_cast(ptr)), + static_cast(len), PyBUF_READ); + if (obj == nullptr) + { + throw nb::python_error(); + } + return nb::steal(obj); + } + + // Helper: build a writable memoryview over raw bytes (no copy). + nb::object memview_writable(void* ptr, std::size_t len) + { + PyObject* obj = PyMemoryView_FromMemory( + reinterpret_cast(ptr), + static_cast(len), PyBUF_WRITE); + if (obj == nullptr) + { + throw nb::python_error(); + } + return nb::steal(obj); + } + + // Convert SchemaInfo.name (fixed-size NUL-terminated char array) to string. + std::string schema_name_str(kickmsg::SchemaInfo const& s) + { + std::size_t n = ::strnlen(s.name, sizeof(s.name)); + return std::string{s.name, n}; + } + + void set_schema_name(kickmsg::SchemaInfo& s, std::string const& name) + { + std::size_t n = std::min(name.size(), sizeof(s.name) - 1); + std::memset(s.name, 0, sizeof(s.name)); + std::memcpy(s.name, name.data(), n); + } +} + +namespace kickmsg +{ + NB_MODULE(kickmsg, m) + { + m.doc() = "Kickmsg — lock-free shared-memory IPC"; + + // ------------------------------------------------------------------- + // Enums & simple types + // ------------------------------------------------------------------- + + nb::enum_(m, "ChannelType") + .value("PubSub", channel::PubSub) + .value("Broadcast", channel::Broadcast); + + nb::class_(m, "Config") + .def(nb::init<>()) + .def_rw("max_subscribers", &channel::Config::max_subscribers) + .def_rw("sub_ring_capacity", &channel::Config::sub_ring_capacity) + .def_rw("pool_size", &channel::Config::pool_size) + .def_rw("max_payload_size", &channel::Config::max_payload_size) + .def_prop_rw("commit_timeout_us", + [](channel::Config const& c) -> int64_t + { return c.commit_timeout.count(); }, + [](channel::Config& c, int64_t us) + { c.commit_timeout = microseconds{us}; }, + "Commit timeout in microseconds.") + .def_rw("schema", 
&channel::Config::schema); + + // ------------------------------------------------------------------- + // SchemaInfo + schema submodule (Diff / diff) + // ------------------------------------------------------------------- + + nb::class_(m, "SchemaInfo") + .def(nb::init<>()) + .def_prop_rw("identity", + [](SchemaInfo const& s) -> nb::bytes + { return nb::bytes(reinterpret_cast(s.identity.data()), + s.identity.size()); }, + [](SchemaInfo& s, nb::bytes const& b) + { + if (b.size() != s.identity.size()) + { + throw std::runtime_error( + "SchemaInfo.identity must be exactly 64 bytes"); + } + std::memcpy(s.identity.data(), b.c_str(), s.identity.size()); + }) + .def_prop_rw("layout", + [](SchemaInfo const& s) -> nb::bytes + { return nb::bytes(reinterpret_cast(s.layout.data()), + s.layout.size()); }, + [](SchemaInfo& s, nb::bytes const& b) + { + if (b.size() != s.layout.size()) + { + throw std::runtime_error( + "SchemaInfo.layout must be exactly 64 bytes"); + } + std::memcpy(s.layout.data(), b.c_str(), s.layout.size()); + }) + .def_prop_rw("name", + [](SchemaInfo const& s) -> std::string { return schema_name_str(s); }, + [](SchemaInfo& s, std::string const& n) { set_schema_name(s, n); }) + .def_rw("version", &SchemaInfo::version) + .def_rw("identity_algo", &SchemaInfo::identity_algo) + .def_rw("layout_algo", &SchemaInfo::layout_algo) + .def_rw("flags", &SchemaInfo::flags) + .def("__repr__", [](SchemaInfo const& s) + { + return "SchemaInfo(name='" + schema_name_str(s) + + "', version=" + std::to_string(s.version) + ")"; + }); + + auto schema_mod = m.def_submodule("schema", "Schema diff helpers"); + nb::enum_(schema_mod, "Diff", nb::is_arithmetic()) + .value("Equal", schema::Equal) + .value("Identity", schema::Identity) + .value("Layout", schema::Layout) + .value("Version", schema::Version) + .value("Name", schema::Name) + .value("IdentityAlgo", schema::IdentityAlgo) + .value("LayoutAlgo", schema::LayoutAlgo); + schema_mod.def("diff", &schema::diff, "a"_a, "b"_a, + "Return 
a schema.Diff bitmask of the fields that differ."); + + // ------------------------------------------------------------------- + // hash submodule + // ------------------------------------------------------------------- + + auto hash_mod = m.def_submodule("hash", "Optional FNV-1a hash helpers"); + hash_mod.attr("FNV1A_64_OFFSET_BASIS") = + static_cast(hash::FNV1A_64_OFFSET_BASIS); + hash_mod.def("fnv1a_64", + [](nb::bytes const& data, uint64_t seed) -> uint64_t + { return hash::fnv1a_64(data.c_str(), data.size(), seed); }, + "data"_a, "seed"_a = hash::FNV1A_64_OFFSET_BASIS, + "64-bit FNV-1a of a byte string. Chain with `seed=h` to extend."); + hash_mod.def("identity_from_fnv1a", + [](std::string const& descriptor) -> nb::bytes + { + auto arr = hash::identity_from_fnv1a(descriptor); + return nb::bytes(reinterpret_cast(arr.data()), arr.size()); + }, + "descriptor"_a, + "Pack a 64-bit FNV-1a of `descriptor` into the leading 8 bytes " + "of a 64-byte identity slot, zero-padding the rest."); + + // ------------------------------------------------------------------- + // HealthReport + // ------------------------------------------------------------------- + + nb::class_(m, "HealthReport") + .def_ro("locked_entries", &SharedRegion::HealthReport::locked_entries) + .def_ro("retired_rings", &SharedRegion::HealthReport::retired_rings) + .def_ro("draining_rings", &SharedRegion::HealthReport::draining_rings) + .def_ro("live_rings", &SharedRegion::HealthReport::live_rings) + .def_ro("schema_stuck", &SharedRegion::HealthReport::schema_stuck) + .def("__repr__", [](SharedRegion::HealthReport const& r) + { + return "HealthReport(locked=" + std::to_string(r.locked_entries) + + ", retired=" + std::to_string(r.retired_rings) + + ", draining=" + std::to_string(r.draining_rings) + + ", live=" + std::to_string(r.live_rings) + + ", schema_stuck=" + (r.schema_stuck ? 
"True" : "False") + ")"; + }); + + // ------------------------------------------------------------------- + // SharedRegion + // ------------------------------------------------------------------- + + nb::class_(m, "SharedRegion") + .def_static("create", + [](char const* name, channel::Type type, + channel::Config const& cfg, std::string const& creator) + { return SharedRegion::create(name, type, cfg, creator.c_str()); }, + "name"_a, "type"_a, "cfg"_a, "creator"_a = std::string{""}, + nb::rv_policy::move) + .def_static("open", &SharedRegion::open, "name"_a, + nb::rv_policy::move) + .def_static("create_or_open", + [](char const* name, channel::Type type, + channel::Config const& cfg, std::string const& creator) + { return SharedRegion::create_or_open(name, type, cfg, creator.c_str()); }, + "name"_a, "type"_a, "cfg"_a, "creator"_a = std::string{""}, + nb::rv_policy::move) + .def("name", &SharedRegion::name) + .def("channel_type", &SharedRegion::channel_type) + .def("schema", &SharedRegion::schema) + .def("try_claim_schema", &SharedRegion::try_claim_schema, "info"_a) + .def("reset_schema_claim", &SharedRegion::reset_schema_claim) + .def("diagnose", &SharedRegion::diagnose) + .def("repair_locked_entries", &SharedRegion::repair_locked_entries) + .def("reset_retired_rings", &SharedRegion::reset_retired_rings) + .def("reclaim_orphaned_slots",&SharedRegion::reclaim_orphaned_slots) + .def("unlink", &SharedRegion::unlink); + + m.def("unlink_shm", [](std::string const& name) { SharedMemory::unlink(name); }, + "name"_a, "Unlink a shared-memory entry by name (no-op if absent)."); + + // ------------------------------------------------------------------- + // SampleRef — byte-copy sample. + // Returns a bytes copy of the payload (the buffer is subscriber-local + // and reused across try_receive() calls, so copying out is the only + // way to keep the bytes beyond the next receive). 
+ // ------------------------------------------------------------------- + + nb::class_(m, "SampleRef") + .def("data", + [](Subscriber::SampleRef const& s) -> nb::bytes + { + return nb::bytes(reinterpret_cast(s.data()), + s.len()); + }, + "Return the payload as a bytes object (copies out of the " + "subscriber-local buffer).") + .def("len", &Subscriber::SampleRef::len) + .def("ring_pos", &Subscriber::SampleRef::ring_pos); + + // ------------------------------------------------------------------- + // SampleView — zero-copy, pins the slot. + // Buffer protocol via memoryview(view). The view holds a refcount + // pin on the slot; the pin is released when the view is destroyed. + // ------------------------------------------------------------------- + + nb::class_(m, "SampleView") + .def("data", + [](Subscriber::SampleView const& v) -> nb::object + { + if (not v.valid()) + { + return nb::none(); + } + return memview_readonly(v.data(), v.len()); + }, + "Return a read-only memoryview pointing directly at the " + "pinned shared-memory slot (zero-copy). Valid while this " + "SampleView is alive.") + .def("len", &Subscriber::SampleView::len) + .def("ring_pos", &Subscriber::SampleView::ring_pos) + .def("valid", &Subscriber::SampleView::valid) + .def("release", + [](Subscriber::SampleView& v) + { + // Destroy-in-place by move-assigning a default-constructed + // view: the release() private helper fires in the + // destructor of the temporary. + v = Subscriber::SampleView{}; + }, + "Release the slot pin early (equivalent to deleting the view).") + // Note on context-manager support: initial attempts at + // __enter__/__exit__ bindings produced nanobind dispatch + // errors due to the interaction between the returned + // reference and nanobind's per-argument type matching. + // Users who want `with` semantics can implement a tiny + // Python wrapper calling .release() — or rely on Python's + // GC to release the pin when the SampleView goes out of + // scope. 
+ ; + + // ------------------------------------------------------------------- + // Publisher + // ------------------------------------------------------------------- + + // Publisher holds raw pointers into the SharedRegion's mmap. + // keep_alive<1, 2>: arg 2 (region) must stay alive while arg 1 + // (self) is alive — otherwise the mmap could be unmapped before + // the Publisher's destructor runs, producing a segfault. + nb::class_(m, "Publisher") + .def(nb::init(), "region"_a, + nb::keep_alive<1, 2>()) + .def("send", + [](Publisher& p, nb::bytes const& data) -> int32_t + { return p.send(data.c_str(), data.size()); }, + "data"_a, + "Copy `data` into a slot and publish. Returns bytes written " + "or a negative errno-style code.") + .def("allocate", + [](Publisher& p, std::size_t len) -> nb::object + { + void* ptr = p.allocate(len); + if (ptr == nullptr) + { + return nb::none(); + } + return memview_writable(ptr, len); + }, + "len"_a, + "Reserve a slot of `len` bytes and return a writable memoryview " + "pointing directly at it. Fill the view, then call publish(). " + "Returns None if the pool is exhausted.") + .def("publish", + [](Publisher& p) -> std::size_t { return p.publish(); }, + "Commit the slot reserved by the last allocate() call. " + "Returns the number of rings the sample was delivered to.") + .def("dropped", &Publisher::dropped); + + // ------------------------------------------------------------------- + // Subscriber + // ------------------------------------------------------------------- + + // Same lifetime rule as Publisher — region's mmap must outlive + // the Subscriber. + nb::class_(m, "Subscriber") + .def(nb::init(), "region"_a, + nb::keep_alive<1, 2>()) + .def("try_receive", + [](Subscriber& s) -> nb::object + { + auto sample = s.try_receive(); + if (not sample.has_value()) + { + return nb::none(); + } + return nb::bytes( + reinterpret_cast(sample->data()), + sample->len()); + }, + "Non-blocking receive. 
Returns bytes on success, None if " + "no message is available.") + .def("receive", + [](Subscriber& s, int64_t timeout_ns) -> nb::object + { + std::optional sample; + { + nb::gil_scoped_release release; + sample = s.receive(nanoseconds{timeout_ns}); + } + if (not sample.has_value()) + { + return nb::none(); + } + return nb::bytes( + reinterpret_cast(sample->data()), + sample->len()); + }, + "timeout_ns"_a, + "Blocking receive with timeout. Releases the GIL while " + "waiting. Returns bytes on success, None on timeout.") + // keep_alive<0, 1>: the returned SampleView (arg 0) must keep + // the Subscriber (arg 1 = self) alive — the view dereferences + // mmap pointers owned transitively by self on destruction. + .def("try_receive_view", + [](Subscriber& s) -> std::optional + { return s.try_receive_view(); }, + nb::rv_policy::move, + nb::keep_alive<0, 1>(), + "Non-blocking zero-copy receive. Returns a SampleView (pins " + "the slot) or None.") + .def("receive_view", + [](Subscriber& s, int64_t timeout_ns) + -> std::optional + { + nb::gil_scoped_release release; + return s.receive_view(nanoseconds{timeout_ns}); + }, + "timeout_ns"_a, + nb::rv_policy::move, + nb::keep_alive<0, 1>(), + "Blocking zero-copy receive. Releases the GIL. Returns a " + "SampleView or None on timeout.") + .def("lost", &Subscriber::lost) + .def("drain_timeouts", &Subscriber::drain_timeouts); + + // ------------------------------------------------------------------- + // BroadcastHandle + // ------------------------------------------------------------------- + + // BroadcastHandle — Publisher/Subscriber are move-only, so the + // fields are exposed read-only. The handle itself is the owner; + // callers use .pub / .sub as references. 
+ nb::class_(m, "BroadcastHandle") + .def_prop_ro("pub", + [](BroadcastHandle& h) -> Publisher& { return h.pub; }, + nb::rv_policy::reference_internal) + .def_prop_ro("sub", + [](BroadcastHandle& h) -> Subscriber& { return h.sub; }, + nb::rv_policy::reference_internal); + + // ------------------------------------------------------------------- + // Node + // ------------------------------------------------------------------- + + // Node-returned Publisher/Subscriber/BroadcastHandle all point + // into SharedRegion objects stored inside the Node itself. + // keep_alive<0, 1>: the return value (0) pins the Node (1 = self). + nb::class_(m, "Node") + .def(nb::init(), + "name"_a, "prefix"_a = std::string{"kickmsg"}) + .def("advertise", + [](Node& n, char const* topic, channel::Config const& cfg) + { return n.advertise(topic, cfg); }, + "topic"_a, "cfg"_a = channel::Config{}, + nb::rv_policy::move, nb::keep_alive<0, 1>()) + .def("subscribe", &Node::subscribe, "topic"_a, + nb::rv_policy::move, nb::keep_alive<0, 1>()) + .def("advertise_or_join", &Node::advertise_or_join, "topic"_a, "cfg"_a, + nb::rv_policy::move, nb::keep_alive<0, 1>()) + .def("subscribe_or_create", &Node::subscribe_or_create, "topic"_a, "cfg"_a, + nb::rv_policy::move, nb::keep_alive<0, 1>()) + .def("join_broadcast", + [](Node& n, char const* channel, channel::Config const& cfg) + { return n.join_broadcast(channel, cfg); }, + "channel"_a, "cfg"_a = channel::Config{}, + nb::rv_policy::move, nb::keep_alive<0, 1>()) + .def("create_mailbox", + [](Node& n, char const* tag, channel::Config const& cfg) + { return n.create_mailbox(tag, cfg); }, + "tag"_a, "cfg"_a = channel::Config{}, + nb::rv_policy::move, nb::keep_alive<0, 1>()) + .def("open_mailbox", &Node::open_mailbox, "owner_node"_a, "tag"_a, + nb::rv_policy::move, nb::keep_alive<0, 1>()) + .def("unlink_topic", &Node::unlink_topic, "topic"_a) + .def("unlink_broadcast", &Node::unlink_broadcast, "channel"_a) + .def("unlink_mailbox", + [](Node const& n, char 
const* tag, + std::optional const& owner_node) + { + if (owner_node.has_value()) + { + n.unlink_mailbox(tag, owner_node->c_str()); + } + else + { + n.unlink_mailbox(tag); + } + }, + "tag"_a, "owner_node"_a = nb::none()) + .def("topic_schema", &Node::topic_schema, "topic"_a) + .def("try_claim_topic_schema", &Node::try_claim_topic_schema, + "topic"_a, "info"_a) + .def("name", &Node::name) + .def("prefix", &Node::prefix); + } +} diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..71896f4 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,116 @@ +[build-system] +requires = ["scikit-build-core>=0.5.0", "nanobind>=1.3.2", "conan"] +build-backend = "scikit_build_core.build" + +[project] +name = "kickmsg" +version = "0.0.0" +description = "Lock-free shared-memory messaging library for inter-process communication." +readme = {file = "README.md", content-type = "text/markdown"} +license = { text = "CeCILL-C" } +maintainers = [ + {name = "Philippe Leduc", email = "philippe.leduc@mailfence.com"} +] + +[tool.scikit-build] +minimum-version = "0.11" +cmake.version = ">=3.30" +ninja.make-fallback = true +wheel.cmake = true +editable.verbose = true + +[tool.scikit-build.cmake.define] +CMAKE_INSTALL_RPATH_USE_LINK_PATH = true +CMAKE_BUILD_WITH_INSTALL_RPATH = true +CMAKE_INSTALL_RPATH = '$ORIGIN' +CMAKE_INTERPROCEDURAL_OPTIMIZATION = true +BUILD_UNIT_TESTS = "OFF" +BUILD_EXAMPLES = "OFF" +BUILD_BENCHMARKS = "OFF" +# Linker flags for strip + dead-code elimination are GNU-ld-only, so they +# live in py_bindings/CMakeLists.txt guarded by NOT APPLE (ld64 doesn't +# support --gc-sections / --strip-all). 
+ +[tool.cibuildwheel] +# https://github.com/pypa/cibuildwheel/blob/main/docs/options.md +build-verbosity = 1 +manylinux-x86_64-image = "manylinux_2_28" +manylinux-aarch64-image = "manylinux_2_28" +build = ["cp310-*", "cp311-*", "cp312-*"] +skip = ["cp310-musllinux*", "cp311-musllinux*", "cp312-musllinux*"] + +[[tool.cibuildwheel.overrides]] +# nanobind does not support Stable ABI for Python < 3.12 - Linux +select = "cp3{10,11}-*linux*" +config-settings = { "cmake.define.CMAKE_CI_BUILD" = true, "cmake.define.CMAKE_FIND_ROOT_PATH" = "/tmp/kickmsg_build_config", "cmake.define.CMAKE_TOOLCHAIN_FILE" = "/tmp/kickmsg_build_config/toolchain.cmake" } +repair-wheel-command = [ + "auditwheel repair --zip-compression-level=9 -w {dest_dir} {wheel}" +] + +[[tool.cibuildwheel.overrides]] +# nanobind does not support Stable ABI for Python < 3.12 - macOS +select = "cp3{10,11}-macosx*" +config-settings = { "cmake.define.CMAKE_CI_BUILD" = true, "cmake.define.CMAKE_OSX_DEPLOYMENT_TARGET" = "11.0", "cmake.define.CMAKE_FIND_ROOT_PATH" = "/tmp/kickmsg_build_config", "cmake.define.CMAKE_TOOLCHAIN_FILE" = "/tmp/kickmsg_build_config/toolchain.cmake" } +repair-wheel-command = [ + "delocate-wheel -w {dest_dir} {wheel}" +] + +[tool.cibuildwheel.config-settings] +"wheel.py-api" = "cp312" + +[tool.cibuildwheel.config-settings.cmake.define] +CMAKE_CI_BUILD = true +CMAKE_FIND_ROOT_PATH = "/tmp/kickmsg_build_config" +CMAKE_TOOLCHAIN_FILE = "/tmp/kickmsg_build_config/toolchain.cmake" + +[tool.cibuildwheel.linux] +before-all = [ + 'pipx install conan', + './scripts/setup_build.sh /tmp/kickmsg_build_config', + 'source tools/setup/version.sh && sed -i "s/0\.0\.0/${VERSION}/g" pyproject.toml' +] +repair-wheel-command = [ + "auditwheel repair --zip-compression-level=9 -w {dest_dir} {wheel}", + "pipx run abi3audit --strict --report {wheel}", +] + +[tool.cibuildwheel.macos] +# universal2 → single wheel that runs on both x86_64 and Apple Silicon. 
+# 11.0 = Big Sur — earliest macOS that supports Apple Silicon natively, +# so it's the correct floor for universal2 wheels. BSD sed uses `-i ''` +# (empty backup extension) where GNU sed uses `-i` bare. +environment = { MACOSX_DEPLOYMENT_TARGET = "11.0" } +archs = ["universal2"] + +before-all = [ + "pipx install conan", + "./scripts/setup_build.sh /tmp/kickmsg_build_config", + "source tools/setup/version.sh && sed -i '' \"s/0\\.0\\.0/${VERSION}/g\" pyproject.toml" +] + +repair-wheel-command = [ + "delocate-wheel -w {dest_dir} {wheel}", + "pipx run abi3audit --strict --report {wheel}" +] + +[tool.pytest.ini_options] +testpaths = ["tests/python"] + +[tool.ruff] +line-length = 120 +target-version = "py310" +exclude = [ + ".venv", + "build", + "build_*", +] + +[tool.ruff.lint] +select = ["E", "F", "I", "N", "W", "B", "UP"] +ignore = [] + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" +skip-magic-trailing-comma = false +line-ending = "auto" diff --git a/src/Node.cc b/src/Node.cc index c88e728..684ef75 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -8,12 +8,13 @@ namespace kickmsg { } - // Called by the *_or_* variants and by subscribe / open_mailbox to - // avoid registering two SharedRegion objects for the same SHM: two - // regions pointing at the same mmap would double-unmap on destruction - // and silently invalidate each other's mapping. Strict advertise() - // doesn't need this because SharedRegion::create() itself rejects a - // pre-existing SHM entry. + // Shared insertion helper. For idempotent factories (open, + // create_or_open), duplicate calls on the same SHM name return the + // existing region instead of double-mapping the mmap — two + // SharedRegion objects for the same mmap would double-unmap on + // destruction and corrupt each other's state. Strict factories + // (create, create_mailbox) never hit this path because + // SharedRegion::create rejects a pre-existing SHM entry. 
SharedRegion& Node::emplace_or_reuse(std::string const& shm_name, SharedRegion&& region) { @@ -21,16 +22,17 @@ namespace kickmsg { return *existing; } - regions_.emplace_back(std::move(region)); - return regions_.back(); + auto [it, _] = regions_.emplace(shm_name, std::move(region)); + return it->second; } Publisher Node::advertise(char const* topic, channel::Config const& cfg) { auto shm_name = make_topic_name(topic); - regions_.emplace_back( + auto [it, _] = regions_.emplace( + shm_name, SharedRegion::create(shm_name.c_str(), channel::PubSub, cfg, name_.c_str())); - return Publisher(regions_.back()); + return Publisher(it->second); } Subscriber Node::subscribe(char const* topic) @@ -76,9 +78,10 @@ namespace kickmsg channel::Config mbx_cfg = cfg; mbx_cfg.max_subscribers = 1; auto shm_name = make_mailbox_name(name_.c_str(), tag); - regions_.emplace_back( + auto [it, _] = regions_.emplace( + shm_name, SharedRegion::create(shm_name.c_str(), channel::PubSub, mbx_cfg, name_.c_str())); - return Subscriber(regions_.back()); + return Subscriber(it->second); } Publisher Node::open_mailbox(char const* owner_node, char const* tag) @@ -146,25 +149,21 @@ namespace kickmsg SharedRegion* Node::find_region(std::string const& shm_name) { - for (auto& region : regions_) + auto it = regions_.find(shm_name); + if (it == regions_.end()) { - if (region.name() == shm_name) - { - return ®ion; - } + return nullptr; } - return nullptr; + return &it->second; } SharedRegion const* Node::find_region(std::string const& shm_name) const { - for (auto const& region : regions_) + auto it = regions_.find(shm_name); + if (it == regions_.end()) { - if (region.name() == shm_name) - { - return ®ion; - } + return nullptr; } - return nullptr; + return &it->second; } } diff --git a/tests/python/conftest.py b/tests/python/conftest.py new file mode 100644 index 0000000..f080a86 --- /dev/null +++ b/tests/python/conftest.py @@ -0,0 +1,42 @@ +"""Pytest fixtures for Kickmsg Python tests. 
+ +Each test gets a unique SHM name and guaranteed cleanup before + after, +so a crashed prior run can't leave stale state that poisons the next one. +""" + +from __future__ import annotations + +import os + +import pytest + +import kickmsg + + +@pytest.fixture +def shm_name(request: pytest.FixtureRequest) -> str: + """Unique per-test SHM name, auto-cleaned before and after. + + macOS caps shm names at roughly 31 chars including the leading `/`, + so we truncate the *test slug* while keeping the PID suffix intact — + otherwise parallel pytest-xdist workers with overlapping slugs would + collide on the same shared-memory object. + """ + slug = request.node.nodeid.replace("/", "_").replace("::", "_").replace(":", "_") + pid = os.getpid() + pid_suffix = f"_{pid}" + max_slug = 30 - len("/pytest_") - len(pid_suffix) + name = f"/pytest_{slug[:max_slug]}{pid_suffix}" + kickmsg.unlink_shm(name) + yield name + kickmsg.unlink_shm(name) + + +@pytest.fixture +def small_cfg() -> kickmsg.Config: + cfg = kickmsg.Config() + cfg.max_subscribers = 4 + cfg.sub_ring_capacity = 8 + cfg.pool_size = 16 + cfg.max_payload_size = 256 + return cfg diff --git a/tests/python/test_lifetime.py b/tests/python/test_lifetime.py new file mode 100644 index 0000000..9ae4997 --- /dev/null +++ b/tests/python/test_lifetime.py @@ -0,0 +1,78 @@ +"""Lifetime edge cases — do the keep_alive chains hold and does GC +release slot pins? + +These tests guard the subtle nanobind wiring: + Publisher/Subscriber → SharedRegion (keep_alive<1,2> on __init__) + Node-returned Pub/Sub → Node (keep_alive<0,1> on factory methods) + SampleView → Subscriber (keep_alive<0,1> on try_receive_view) + BroadcastHandle.pub/sub → handle → Node + (reference_internal + transitive) +""" + +from __future__ import annotations + +import gc + +import kickmsg + + +def test_broadcasthandle_pub_extracted_from_temporary(small_cfg): + """`pub = node.join_broadcast(...).pub` drops the handle reference + immediately. 
The chain Publisher → BroadcastHandle → Node must + survive so the publisher can still send after the GC cycles. + """ + a = kickmsg.Node("a", "pytest_lt_bc") + b = kickmsg.Node("b", "pytest_lt_bc") + try: + pub = a.join_broadcast("events", small_cfg).pub + sub = b.join_broadcast("events", small_cfg).sub + + gc.collect() # force any intermediate references to drop + gc.collect() + + assert pub.send(b"survived-gc") > 0 + assert sub.try_receive() == b"survived-gc" + finally: + a.unlink_broadcast("events") + + +def test_sample_view_gc_releases_pin(shm_name, small_cfg): + """Take a zero-copy view, discard it without calling .release(), + force GC, then verify the slot came back to the pool by publishing + pool_size more messages successfully. + """ + region = kickmsg.SharedRegion.create( + shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + pub.send(b"ephemeral") + view = sub.try_receive_view() + assert view is not None + _ = bytes(view.data()) + + # Drop the reference — Python GC should run SampleView's C++ dtor, + # which in turn drops the slot pin. + view = None + gc.collect() + + # Pool should be clean — pool_size more publishes must succeed. + for i in range(small_cfg.pool_size): + assert pub.send(f"msg-{i}".encode()) > 0 + + +def test_publisher_outlives_shared_region_python_reference(shm_name, small_cfg): + """Drop the Python SharedRegion reference right after constructing + the Publisher. The keep_alive<1,2> on Publisher(region) must hold + the mmap alive — otherwise the publisher's send() would touch + freed memory. 
+ """ + region = kickmsg.SharedRegion.create( + shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + region = None # only pub + sub keep the mapping alive now + gc.collect() + + assert pub.send(b"still alive") > 0 + assert sub.try_receive() == b"still alive" diff --git a/tests/python/test_node.py b/tests/python/test_node.py new file mode 100644 index 0000000..406fef2 --- /dev/null +++ b/tests/python/test_node.py @@ -0,0 +1,81 @@ +"""Node API coverage — advertise/subscribe, broadcast, mailbox, schema forwarding.""" + +from __future__ import annotations + +import kickmsg + + +def test_advertise_and_subscribe_on_one_node(small_cfg): + # Each Node test uses unique prefixes to avoid SHM name collisions + # across test runs. unlink_topic in tear-down keeps the namespace clean. + node = kickmsg.Node("tn", "pytest_adv") + try: + pub = node.advertise("data", small_cfg) + sub = node.subscribe("data") + assert pub.send(b"payload") > 0 + assert sub.try_receive() == b"payload" + finally: + node.unlink_topic("data") + + +def test_subscribe_or_create_late_binding(small_cfg): + listener = kickmsg.Node("listener", "pytest_late") + driver = kickmsg.Node("driver", "pytest_late") + try: + sub = listener.subscribe_or_create("imu", small_cfg) + pub = driver.advertise_or_join("imu", small_cfg) + pub.send(b"imu-data") + assert sub.try_receive() == b"imu-data" + finally: + listener.unlink_topic("imu") + + +def test_join_broadcast_two_nodes(small_cfg): + a = kickmsg.Node("a", "pytest_bc") + b = kickmsg.Node("b", "pytest_bc") + try: + ha = a.join_broadcast("events", small_cfg) + hb = b.join_broadcast("events", small_cfg) + ha.pub.send(b"from-a") + assert hb.sub.try_receive() == b"from-a" + assert ha.sub.try_receive() == b"from-a" + finally: + a.unlink_broadcast("events") + + +def test_mailbox_pattern(small_cfg): + owner = kickmsg.Node("owner", "pytest_mbx") + caller = kickmsg.Node("caller", "pytest_mbx") + try: + 
inbox = owner.create_mailbox("inbox", small_cfg) + send = caller.open_mailbox("owner", "inbox") + send.send(b"ping") + assert inbox.try_receive() == b"ping" + finally: + owner.unlink_mailbox("inbox") + + +def test_topic_schema_roundtrip_via_node(small_cfg): + pub_node = kickmsg.Node("p", "pytest_schema") + sub_node = kickmsg.Node("s", "pytest_schema") + try: + small_cfg.schema = kickmsg.SchemaInfo() + small_cfg.schema.name = "demo/Type" + small_cfg.schema.version = 7 + small_cfg.schema.identity = bytes([0x5A]) * 64 + small_cfg.schema.layout = b"\x00" * 64 + _ = pub_node.advertise("topic", small_cfg) + + # Must be able to read back via *either* node. + _ = sub_node.subscribe("topic") + got = sub_node.topic_schema("topic") + assert got is not None + assert got.name == "demo/Type" + assert got.version == 7 + finally: + pub_node.unlink_topic("topic") + + +def test_topic_schema_none_for_unknown_topic(): + node = kickmsg.Node("orphan", "pytest_unknown") + assert node.topic_schema("never_joined") is None diff --git a/tests/python/test_pubsub.py b/tests/python/test_pubsub.py new file mode 100644 index 0000000..d140908 --- /dev/null +++ b/tests/python/test_pubsub.py @@ -0,0 +1,58 @@ +"""Basic pub/sub roundtrip tests via the SharedRegion + Publisher/Subscriber API.""" + +from __future__ import annotations + +import kickmsg + + +def test_send_and_try_receive_roundtrip(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + assert pub.send(b"hello") > 0 + got = sub.try_receive() + assert got == b"hello" + # Drained — next try_receive returns None. 
+ assert sub.try_receive() is None + + +def test_try_receive_is_none_on_empty(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + sub = kickmsg.Subscriber(region) + assert sub.try_receive() is None + + +def test_receive_with_timeout_returns_none(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + sub = kickmsg.Subscriber(region) + # 10 ms timeout, no publisher — must return None without throwing. + got = sub.receive(10_000_000) + assert got is None + + +def test_receive_with_timeout_delivers(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + pub.send(b"payload") + got = sub.receive(100_000_000) # 100 ms + assert got == b"payload" + + +def test_multiple_subscribers_receive_same_message(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub_a = kickmsg.Subscriber(region) + sub_b = kickmsg.Subscriber(region) + + pub.send(b"fan-out") + assert sub_a.try_receive() == b"fan-out" + assert sub_b.try_receive() == b"fan-out" + + +def test_send_larger_than_max_payload_fails(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + oversize = b"x" * (small_cfg.max_payload_size + 1) + assert pub.send(oversize) < 0 # EMSGSIZE-style negative return diff --git a/tests/python/test_schema.py b/tests/python/test_schema.py new file mode 100644 index 0000000..e1691f4 --- /dev/null +++ b/tests/python/test_schema.py @@ -0,0 +1,68 @@ +"""Schema descriptor + diff + hash helpers.""" + +from __future__ import annotations + +import kickmsg + + +def make_schema(name: str, version: int, identity_byte: int = 0xAA) -> 
kickmsg.SchemaInfo: + info = kickmsg.SchemaInfo() + info.name = name + info.version = version + info.identity = bytes([identity_byte]) * 64 + info.layout = b"\x00" * 64 + info.identity_algo = 1 + info.layout_algo = 0 + info.flags = 0 + return info + + +def test_fnv1a_64_is_deterministic(): + # Canonical FNV-1a 64-bit reference vectors. + assert kickmsg.hash.fnv1a_64(b"") == 0xCBF29CE484222325 + assert kickmsg.hash.fnv1a_64(b"a") == 0xAF63DC4C8601EC8C + assert kickmsg.hash.fnv1a_64(b"foobar") == 0x85944171F73967E8 + + +def test_identity_from_fnv1a_pads_zero(): + identity = kickmsg.hash.identity_from_fnv1a("demo/Imu") + assert len(identity) == 64 + # Remaining bytes after the leading 8 are zero. + assert identity[8:] == b"\x00" * 56 + + +def test_schema_baked_at_create(shm_name, small_cfg): + small_cfg.schema = make_schema("my/Pose", 2, 0xAB) + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + + got = region.schema() + assert got is not None + assert got.name == "my/Pose" + assert got.version == 2 + assert got.identity[0] == 0xAB + + +def test_schema_diff_detects_mismatch(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + claimed = make_schema("demo/Type", 1, 0x11) + assert region.try_claim_schema(claimed) is True + + expected_v2 = make_schema("demo/Type", 2, 0x11) + got = region.schema() + assert got is not None + + d = kickmsg.schema.diff(got, expected_v2) + assert d & int(kickmsg.schema.Diff.Version) + assert not (d & int(kickmsg.schema.Diff.Identity)) + assert not (d & int(kickmsg.schema.Diff.Name)) + + +def test_schema_reset_recovers_wedged_claiming(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + # No direct API to force Claiming state from Python — verify the normal + # reset path is callable and returns False on an unset region. 
+ assert region.reset_schema_claim() is False + claimed = make_schema("done/Type", 1) + assert region.try_claim_schema(claimed) is True + # Already Set → reset is a no-op. + assert region.reset_schema_claim() is False diff --git a/tests/python/test_threading.py b/tests/python/test_threading.py new file mode 100644 index 0000000..b609f13 --- /dev/null +++ b/tests/python/test_threading.py @@ -0,0 +1,43 @@ +"""GIL release on blocking receive — a consumer thread blocked in +Subscriber.receive(timeout) must not stall a concurrent producer thread. +""" + +from __future__ import annotations + +import threading + +import kickmsg + + +def test_blocking_receive_does_not_stall_other_thread(shm_name, small_cfg): + region = kickmsg.SharedRegion.create( + shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + received: list[bytes] = [] + consumer_started = threading.Event() + + def consumer(): + consumer_started.set() + # 500 ms is plenty — GIL release lets the producer thread publish + # immediately; the futex wakes us up on commit; total wall time + # should be milliseconds, not approaching the timeout. + got = sub.receive(500_000_000) + if got is not None: + received.append(got) + + t = threading.Thread(target=consumer) + t.start() + # Wait until the consumer is in its receive() call — without GIL + # release, our next line would deadlock here. + assert consumer_started.wait(timeout=1.0) + + # Give the consumer a moment to actually enter the blocking call. + # Without GIL release, this send would never run until the receive + # timed out. With GIL release, it runs immediately. + pub.send(b"unblocked") + + t.join(timeout=2.0) + assert not t.is_alive(), "consumer thread did not return — GIL not released?" 
+ assert received == [b"unblocked"] diff --git a/tests/python/test_zerocopy.py b/tests/python/test_zerocopy.py new file mode 100644 index 0000000..5c75b8f --- /dev/null +++ b/tests/python/test_zerocopy.py @@ -0,0 +1,98 @@ +"""Zero-copy publish (allocate/publish) and zero-copy receive (SampleView).""" + +from __future__ import annotations + +import kickmsg + + +def test_publisher_allocate_returns_writable_memoryview(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + # Reserve a slot, write directly into the shared-memory page, then publish. + buf = pub.allocate(16) + assert buf is not None + assert not buf.readonly + buf[:5] = b"hello" + buf[5:16] = b"-zerocopy!!" # 11 bytes + assert pub.publish() > 0 + + got = sub.try_receive() + assert got == b"hello-zerocopy!!" + + +def test_subscriber_view_buffer_protocol_is_readonly(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + pub.send(b"hello-view") + view = sub.try_receive_view() + assert view is not None + assert view.valid() + assert view.len() == len(b"hello-view") + + mv = view.data() + # memoryview directly into SHM — must be read-only. + assert mv.readonly + assert bytes(mv) == b"hello-view" + + +def test_view_release_returns_slot_to_pool(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + # Publish + consume a zero-copy view, then explicitly release the pin. + # Publishing `pool_size` more messages afterwards must still succeed + # (the ring/pool isn't wedged). 
+ pub.send(b"ephemeral") + view = sub.try_receive_view() + assert view is not None + _ = bytes(view.data()) + view.release() + + for i in range(small_cfg.pool_size): + assert pub.send(f"msg-{i}".encode()) > 0 + + +def test_zerocopy_camera_frame_pattern(shm_name): + """Mimic the robot-camera use case: + allocate directly into SHM, fill (here with a byte pattern; in real + code this would be a memcpy from V4L2 / OpenCV / DMA), publish, + receive as zero-copy memoryview, interpret as bytes without any + intermediate copy on either side. + """ + cfg = kickmsg.Config() + cfg.max_subscribers = 2 + cfg.sub_ring_capacity = 4 + cfg.pool_size = 8 + cfg.max_payload_size = 640 * 480 * 3 # RGB VGA + + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, cfg, "camera") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + frame_size = 320 * 240 * 3 # smaller test frame + buf = pub.allocate(frame_size) + assert buf is not None + # Fill with a pattern — cheap, full write coverage. + for i in range(0, frame_size, 4096): + chunk = min(4096, frame_size - i) + buf[i:i + chunk] = bytes([(i + j) & 0xFF for j in range(chunk)]) + pub.publish() + + view = sub.try_receive_view() + try: + assert view is not None + assert view.len() == frame_size + # Spot-check some bytes without copying the whole frame. + mv = view.data() + assert mv[0] == 0 + assert mv[1] == 1 + assert mv[4095] == 255 + assert mv[frame_size - 1] == (frame_size - 1) & 0xFF + finally: + if view is not None: + view.release() diff --git a/tools/setup/version.sh b/tools/setup/version.sh new file mode 100755 index 0000000..3b9214d --- /dev/null +++ b/tools/setup/version.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +# Default version +VERSION="0.0.0" + +# Check if we're inside a git repository +if git rev-parse --git-dir > /dev/null 2>&1; then + # Try to get the exact tag for HEAD + TAG=$(git describe --tags --exact-match 2>/dev/null || true) + + if [[ -n "$TAG" ]]; then + # HEAD is exactly at a tag. 
Strip a leading "v" so conventional + # "v0.1.0" tags map to PEP 440-compliant "0.1.0" — PyPI rejects + # metadata with a leading "v" in [project] version. + VERSION="${TAG#v}" + fi +fi + +# Export the VERSION variable +export VERSION +echo "VERSION set to: $VERSION" From 29254f37f26ce8b52feb5d34a7dac9d25ecae193 Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Mon, 13 Apr 2026 17:36:06 +0200 Subject: [PATCH 2/5] working --- cmake/toolchain.cmake.template | 3 ++ scripts/configure.sh | 18 +++---- scripts/lib/build_options.sh | 93 +++++++++++++++++++++------------- scripts/setup_build.sh | 76 +++++++++++++++------------ tools/setup/detect_compiler.sh | 82 +++++++++++++++++++++--------- 5 files changed, 173 insertions(+), 99 deletions(-) diff --git a/cmake/toolchain.cmake.template b/cmake/toolchain.cmake.template index 75e95fe..fb297eb 100644 --- a/cmake/toolchain.cmake.template +++ b/cmake/toolchain.cmake.template @@ -2,3 +2,6 @@ set(CMAKE_SYSTEM_NAME @SYSTEM_NAME@) set(CMAKE_C_COMPILER @BINARY_PATH_CC@) set(CMAKE_CXX_COMPILER @BINARY_PATH_CXX@) + +set(CMAKE_C_FLAGS_INIT "@ARCH_FLAGS@") +set(CMAKE_CXX_FLAGS_INIT "@ARCH_FLAGS@") diff --git a/scripts/configure.sh b/scripts/configure.sh index 48b94e7..148d282 100755 --- a/scripts/configure.sh +++ b/scripts/configure.sh @@ -23,7 +23,7 @@ Options: --show Show current configuration and exit -h, --help Show this help message and exit -Features: ${!OPT_DEFAULTS[*]} +Features: ${OPT_NAMES[*]} Examples: $0 build --with=unit_tests @@ -82,19 +82,19 @@ for action in "${ACTIONS[@]}"; do exit 0 ;; with:all) - for key in "${!CONFIG[@]}"; do - CONFIG[$key]=ON + for name in "${OPT_NAMES[@]}"; do + config_set "$name" ON done ;; without:all) - for key in "${!CONFIG[@]}"; do - CONFIG[$key]=OFF + for name in "${OPT_NAMES[@]}"; do + config_set "$name" OFF done ;; with:*) key="${action#with:}" - if [[ -v "OPT_DEFAULTS[$key]" ]]; then - CONFIG[$key]=ON + if opt_is_known "$key"; then + config_set "$key" ON else error "Unknown feature: $key" 
exit 1 @@ -102,8 +102,8 @@ for action in "${ACTIONS[@]}"; do ;; without:*) key="${action#without:}" - if [[ -v "OPT_DEFAULTS[$key]" ]]; then - CONFIG[$key]=OFF + if opt_is_known "$key"; then + config_set "$key" OFF else error "Unknown feature: $key" exit 1 diff --git a/scripts/lib/build_options.sh b/scripts/lib/build_options.sh index 2cd389a..7c386e8 100644 --- a/scripts/lib/build_options.sh +++ b/scripts/lib/build_options.sh @@ -1,47 +1,70 @@ #!/bin/bash # Single source of truth for kickmsg build options. +# +# Uses parallel arrays (not associative arrays / `declare -A`) so the +# script works on macOS's system bash 3.2 — associative arrays are a +# bash 4+ feature, and macOS ships 3.2 for GPL-licensing reasons. Each +# option occupies the same index across all OPT_* arrays. -declare -A OPT_DEFAULTS=( - [unit_tests]=OFF - [benchmarks]=OFF - [examples]=OFF - [tsan]=OFF -) +OPT_NAMES=( unit_tests benchmarks examples tsan ) +OPT_DEFAULTS=( OFF OFF OFF OFF ) +OPT_DESCRIPTIONS=( "Build unit tests" "Build benchmarks (requires Google Benchmark)" "Build examples" "Enable ThreadSanitizer" ) +OPT_CMAKE_FLAGS=( BUILD_UNIT_TESTS BUILD_BENCHMARKS BUILD_EXAMPLES ENABLE_TSAN ) -declare -A OPT_DESCRIPTIONS=( - [unit_tests]="Build unit tests" - [benchmarks]="Build benchmarks (requires Google Benchmark)" - [examples]="Build examples" - [tsan]="Enable ThreadSanitizer" -) +# Conan flags: only options that actually gate a Conan-provided dep. +# Empty slot = option has no Conan side effect. +OPT_CONAN_FLAGS=( unit_tests benchmarks "" "" ) -declare -A OPT_CMAKE_FLAGS=( - [unit_tests]="BUILD_UNIT_TESTS" - [benchmarks]="BUILD_BENCHMARKS" - [examples]="BUILD_EXAMPLES" - [tsan]="ENABLE_TSAN" -) +# CONFIG_VALUES is a parallel array keyed by the same index as OPT_NAMES. +# Callers never index it directly — use config_get / config_set. 
+CONFIG_VALUES=() -# Only options that gate a Conan dependency -declare -A OPT_CONAN_FLAGS=( - [unit_tests]="unit_tests" - [benchmarks]="benchmarks" -) +# Return the index of $1 in OPT_NAMES, or empty if not present. +opt_index() { + local name="$1" i + for i in "${!OPT_NAMES[@]}"; do + if [[ "${OPT_NAMES[$i]}" == "$name" ]]; then + echo "$i" + return 0 + fi + done + return 1 +} -# Load .buildconfig into the CONFIG associative array. -load_buildconfig() { - declare -gA CONFIG - for key in "${!OPT_DEFAULTS[@]}"; do - CONFIG[$key]="${OPT_DEFAULTS[$key]}" +opt_is_known() { opt_index "$1" >/dev/null; } + +opt_default() { local i; i=$(opt_index "$1") && echo "${OPT_DEFAULTS[$i]}"; } +opt_description() { local i; i=$(opt_index "$1") && echo "${OPT_DESCRIPTIONS[$i]}"; } +opt_cmake_flag() { local i; i=$(opt_index "$1") && echo "${OPT_CMAKE_FLAGS[$i]}"; } +opt_conan_flag() { local i; i=$(opt_index "$1") && echo "${OPT_CONAN_FLAGS[$i]}"; } + +config_init() { + CONFIG_VALUES=() + local d + for d in "${OPT_DEFAULTS[@]}"; do + CONFIG_VALUES+=("$d") done +} + +config_get() { local i; i=$(opt_index "$1") && echo "${CONFIG_VALUES[$i]}"; } +config_set() { + local i + i=$(opt_index "$1") || return 1 + CONFIG_VALUES[$i]="$2" +} + +# Load .buildconfig into CONFIG_VALUES. Unknown keys are warned-and-ignored. 
+load_buildconfig() { + config_init if [ -f "$CONFIG_FILE" ]; then + local key value while IFS='=' read -r key value; do [[ -z "$key" || "$key" == \#* ]] && continue key=$(echo "$key" | xargs) value=$(echo "$value" | xargs) - if [[ -v "OPT_DEFAULTS[$key]" ]]; then - CONFIG[$key]="$value" + if opt_is_known "$key"; then + config_set "$key" "$value" else warn "Unknown option in $CONFIG_FILE: $key (ignored)" fi @@ -53,8 +76,9 @@ save_buildconfig() { { echo "# kickmsg build configuration" echo "# Generated by scripts/configure.sh" - for key in $(echo "${!CONFIG[@]}" | tr ' ' '\n' | sort); do - echo "$key=${CONFIG[$key]}" + local name + for name in "${OPT_NAMES[@]}"; do + echo "$name=$(config_get "$name")" done } > "$CONFIG_FILE" } @@ -65,7 +89,8 @@ show_buildconfig() { else info "Current configuration ($CONFIG_FILE):" fi - for key in $(echo "${!CONFIG[@]}" | tr ' ' '\n' | sort); do - printf " %-20s %s\n" "$key" "${CONFIG[$key]}" + local name + for name in "${OPT_NAMES[@]}"; do + printf " %-20s %s\n" "$name" "$(config_get "$name")" done } diff --git a/scripts/setup_build.sh b/scripts/setup_build.sh index 15d54a2..5ad3380 100755 --- a/scripts/setup_build.sh +++ b/scripts/setup_build.sh @@ -101,25 +101,24 @@ load_buildconfig if [ -f "$CONFIG_FILE" ]; then step "Reading build configuration" - for key in $(echo "${!CONFIG[@]}" | tr ' ' '\n' | sort); do - info " $key = ${CONFIG[$key]}" + for key in "${OPT_NAMES[@]}"; do + info " $key = $(config_get "$key")" done else info "No .buildconfig found, using defaults (run scripts/configure.sh to customize)" fi -# Map .buildconfig values to Conan -o flags +# Map .buildconfig values to Conan -o flags / CMake -D flags. 
to_conan_bool() { [[ "$1" == "ON" ]] && echo "True" || echo "False"; } CONAN_OPTIONS="" -for key in "${!OPT_CONAN_FLAGS[@]}"; do - CONAN_OPTIONS+=" -o &:${OPT_CONAN_FLAGS[$key]}=$(to_conan_bool "${CONFIG[$key]}")" -done - -# Map .buildconfig values to CMake -D flags CMAKE_OPTIONS="" -for key in "${!OPT_CMAKE_FLAGS[@]}"; do - CMAKE_OPTIONS+=" -D${OPT_CMAKE_FLAGS[$key]}=${CONFIG[$key]}" +for key in "${OPT_NAMES[@]}"; do + value=$(config_get "$key") + cmake_flag=$(opt_cmake_flag "$key") + conan_flag=$(opt_conan_flag "$key") + [ -n "$cmake_flag" ] && CMAKE_OPTIONS+=" -D${cmake_flag}=${value}" + [ -n "$conan_flag" ] && CONAN_OPTIONS+=" -o &:${conan_flag}=$(to_conan_bool "$value")" done # Compiler detection & profile generation @@ -189,12 +188,11 @@ if [ -n "$CROSS_TARGET" ]; then -e "s|@SYSTEM_NAME@|${SYSTEM_NAME}|g" \ -e "s|@BINARY_PATH_CC@|$(command -v $CROSS_CC)|g" \ -e "s|@BINARY_PATH_CXX@|$(command -v $CROSS_CXX)|g" \ + -e "s|@ARCH_FLAGS@|${CROSS_ARCH_FLAGS}|g" \ "$TEMPLATE_CMAKE_TOOLCHAIN" > "$OUTPUT_CMAKE_TOOLCHAIN" cat >> "$OUTPUT_CMAKE_TOOLCHAIN" < /dev/null 2>&1 + ARCH_NAME=$(conan profile show -cx host | grep "arch=" | cut -d'=' -f2) + + info "Architecture for Conan: $ARCH_NAME" + + OS=$(uname -s) + if [[ "$OS" == "Darwin" ]]; then + OS_NAME="Macos" + COMPILER_NAME="apple-clang" + LIBCXX_NAME="libc++" + SYSTEM_NAME="Darwin" + ARCH_NAME="armv8\|x86_64" + else + OS_NAME="Linux" + COMPILER_NAME="gcc" + LIBCXX_NAME="libstdc++11" + SYSTEM_NAME="Linux" + fi - ARCH_NAME=$(detect_conan_arch) - info "Architecture: $(uname -m) -> Conan arch: $ARCH_NAME" + ARCH_FLAGS="" sed \ - -e "s|@SYSTEM_NAME@|Linux|g" \ + -e "s|@SYSTEM_NAME@|${SYSTEM_NAME}|g" \ -e "s|@BINARY_PATH_CC@|$(command -v $GREATEST_CC)|g" \ -e "s|@BINARY_PATH_CXX@|$(command -v $GREATEST_CXX)|g" \ + -e "s|@ARCH_FLAGS@|${ARCH_FLAGS}|g" \ "$TEMPLATE_CMAKE_TOOLCHAIN" > "$OUTPUT_CMAKE_TOOLCHAIN" sed \ - -e "s|@OS_NAME@|Linux|g" \ + -e "s|@OS_NAME@|${OS_NAME}|g" \ -e "s|@ARCH_NAME@|${ARCH_NAME}|g" \ - -e 
"s|@COMPILER_NAME@|gcc|g" \ - -e "s|@LIBCXX_NAME@|libstdc++11|g" \ + -e "s|@COMPILER_NAME@|${COMPILER_NAME}|g" \ + -e "s|@LIBCXX_NAME@|${LIBCXX_NAME}|g" \ -e "s|@MAJOR_VERSION@|${GREATEST_VERSION}|g" \ -e "s|@BINARY_PATH_CC@|$(command -v $GREATEST_CC)|g" \ -e "s|@BINARY_PATH_CXX@|$(command -v $GREATEST_CXX)|g" \ "$TEMPLATE_CONAN_PROFILE" > "$OUTPUT_CONAN_PROFILE" + if [[ "$OS" == "Darwin" ]]; then + echo "OSX_ARCH_VARIANTS=x86_64;arm64" >> "$OUTPUT_CONAN_PROFILE" + + echo "" >> "$OUTPUT_CONAN_PROFILE" + echo "[platform_tool_requires]" >> "$OUTPUT_CONAN_PROFILE" + + fi + cat "$OUTPUT_CONAN_PROFILE" + CONAN_PROFILE_ARGS="-pr $OUTPUT_CONAN_PROFILE -pr:b $OUTPUT_CONAN_PROFILE" fi diff --git a/tools/setup/detect_compiler.sh b/tools/setup/detect_compiler.sh index 234e05b..620b9a4 100755 --- a/tools/setup/detect_compiler.sh +++ b/tools/setup/detect_compiler.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Detect the highest-versioned GCC on this system. +# Detect the best native compiler on this system. # Meant to be sourced; exports GREATEST_CC, GREATEST_CXX, GREATEST_VERSION. # Ensure log helpers are available (idempotent if already sourced) @@ -8,39 +8,73 @@ if ! command -v info &>/dev/null; then source "$_DETECT_DIR/../../scripts/lib/log.sh" fi -info "Scanning for GCC installations..." +# Detect OS +OS=$(uname -s) -declare -A GCC_MAP +if [[ "$OS" == "Linux" ]]; then + echo "Scanning for GCC installations..." + declare -A GCC_MAP -GCC_LIST=$(compgen -c | grep -E '^gcc(-[0-9]+(\.[0-9]+)*)?$' | sort -u) + GCC_LIST=$(compgen -c | grep -E '^gcc(-[0-9]+(\.[0-9]+)*)?$' | sort -u) -if [ -z "$GCC_LIST" ]; then - error "No GCC installations found!" - exit 1 -fi + if [ -z "$GCC_LIST" ]; then + echo "!!! No GCC installations found !!!" 
+ exit 1 + fi -for gcc_bin in $GCC_LIST; do - if command -v "$gcc_bin" &>/dev/null; then - version=$("$gcc_bin" -dumpfullversion -dumpversion 2>/dev/null) - if [[ -z "$version" ]]; then - version=$("$gcc_bin" --version | head -n1 | awk '{print $3}') + for gcc_bin in $GCC_LIST; do + if command -v "$gcc_bin" &>/dev/null; then + version=$("$gcc_bin" -dumpfullversion -dumpversion 2>/dev/null) + if [[ -z "$version" ]]; then + version=$("$gcc_bin" --version | head -n1 | awk '{print $3}') + fi + echo " * Found: $gcc_bin ($version)" + GCC_MAP["$version"]=$gcc_bin fi + done + + GREATEST_VERSION_FULL=$(printf "%s\n" "${!GCC_MAP[@]}" | sort -V | tail -n1) + GREATEST_CC="${GCC_MAP[$GREATEST_VERSION_FULL]}" + + IFS='.' read -r major minor patch <<< "$GREATEST_VERSION_FULL" + GREATEST_VERSION="$major.$minor" + + if command -v "${GREATEST_CC/gcc/g++}" &>/dev/null; then + GREATEST_CXX="${GREATEST_CC/gcc/g++}" + else + GREATEST_CXX="g++" + fi + + echo + echo "--> Greatest GCC version detected: $GREATEST_VERSION ($GREATEST_CC/$GREATEST_CXX)" + +elif [[ "$OS" == "Darwin" ]]; then + echo "Detecting macOS compilers..." - info " Found: $gcc_bin ($version)" - GCC_MAP["$version"]=$gcc_bin + # Use Apple Clang by default + GREATEST_CC=$(command -v clang) + GREATEST_CXX=$(command -v clang++) + + if [[ -z "$GREATEST_CC" ]] || [[ -z "$GREATEST_CXX" ]]; then + echo "!!! Clang not found !!!" + exit 1 fi -done -GREATEST_VERSION_FULL=$(printf "%s\n" "${!GCC_MAP[@]}" | sort -V | tail -n1) -GREATEST_CC="${GCC_MAP[$GREATEST_VERSION_FULL]}" + # Get full version + version_full=$("$GREATEST_CC" --version | head -n1 | awk '{print $4}') + + # Extract major.minor only (17.0.0 -> 17.0) + IFS='.' read -r major minor patch <<< "$version_full" + GREATEST_VERSION="$major.$minor" -IFS='.' 
read -r major minor patch <<< "$GREATEST_VERSION_FULL" -GREATEST_VERSION="$major.$minor" + echo " * Found: $GREATEST_CC ($version_full)" + echo " * Found: $GREATEST_CXX" + echo + echo "--> Using Apple Clang: $GREATEST_VERSION ($GREATEST_CC/$GREATEST_CXX)" -if command -v "${GREATEST_CC/gcc/g++}" &>/dev/null; then - GREATEST_CXX="${GREATEST_CC/gcc/g++}" else - GREATEST_CXX="g++" + echo "Unsupported OS: $OS" + exit 1 fi -success "Selected GCC $GREATEST_VERSION ($GREATEST_CC/$GREATEST_CXX)" +echo "" From b3b8e66ed0704335f4b14bb47fffb66fbc0d203b Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Mon, 13 Apr 2026 18:42:58 +0200 Subject: [PATCH 3/5] Better bindings --- examples/python/hello_camera_zerocopy.py | 44 +-- examples/python/hello_schema.py | 2 +- py_bindings/src/kickmsg_py.cc | 393 +++++++++++++++++------ tests/python/test_lifetime.py | 2 +- tests/python/test_pubsub.py | 31 +- tests/python/test_schema.py | 6 +- tests/python/test_zerocopy.py | 166 ++++++++-- 7 files changed, 488 insertions(+), 156 deletions(-) diff --git a/examples/python/hello_camera_zerocopy.py b/examples/python/hello_camera_zerocopy.py index b548f5b..67a98b6 100644 --- a/examples/python/hello_camera_zerocopy.py +++ b/examples/python/hello_camera_zerocopy.py @@ -1,17 +1,20 @@ """Zero-copy camera-frame publishing — the use case the Python bindings' -Publisher.allocate / Subscriber.try_receive_view path was designed for. +AllocatedSlot / SampleView buffer-protocol path was designed for. Shows how to: - 1. Reserve a slot directly in shared memory via Publisher.allocate(size) - and fill it in-place (no intermediate buffer). Here we simulate a - camera frame with a byte pattern; in a real pipeline you'd: - - cv2.imdecode / numpy.copyto into the slot, or + 1. Reserve a slot directly in shared memory via Publisher.allocate(size), + then obtain a writable memoryview with `memoryview(slot)` and fill it + in-place (no intermediate buffer). 
In a real pipeline you'd: + - numpy.copyto(np.asarray(slot).reshape(H, W, 3), frame), or - DMA from a V4L2 buffer into the slot, or - render directly into the slot. + Then call `slot.publish()` to commit. 2. Receive it zero-copy on the other side via Subscriber.try_receive_view - and read the bytes through a read-only memoryview — no copy either. + and `memoryview(view)` — read-only, no copy either. The memoryview + pins the SampleView alive so the slot stays valid until every + consumer releases the view. -For a real camera, swap the `fill_frame` lines for your actual capture. +For a real camera, swap `fill_frame` for your actual capture. """ from __future__ import annotations @@ -48,34 +51,31 @@ def main() -> int: sub = viewer.subscribe("frames") for i in range(3): - # Zero-copy capture: get a writable memoryview into the SHM slot, - # fill it in place, publish. Nothing is copied on the publish side. - buf = pub.allocate(frame_bytes) - if buf is None: + # Zero-copy capture: reserve a slot, write directly into SHM via + # a memoryview, publish. Nothing is copied on the publish side. + slot = pub.allocate(frame_bytes) + if slot is None: print(f"frame {i}: pool exhausted, dropping") continue - fill_frame(buf, height, width) - pub.publish() + fill_frame(memoryview(slot), height, width) + slot.publish() print(f"Published frame {i} ({width}x{height} RGB, {frame_bytes} B zero-copy)") for i in range(3): view = sub.try_receive_view() if view is None: break - try: - mv = view.data() - # Spot-check: no memcpy performed on the receive side either. - print(f"Received frame {i}: {view.len()} B, " + # `with` releases the pin on block exit, even on exception. + # Without this, the slot stays pinned until the SampleView and + # every derived memoryview are GC'd — the pool could run dry. 
+ with view: + mv = memoryview(view) # read-only, zero-copy + print(f"Received frame {i}: {len(view)} B, " f"pixel[0,0]=RGB({mv[0]},{mv[1]},{mv[2]}), " f"pixel[{height-1},{width-1}]=" f"RGB({mv[(height-1)*width*3 + (width-1)*3]}," f"{mv[(height-1)*width*3 + (width-1)*3 + 1]}," f"{mv[(height-1)*width*3 + (width-1)*3 + 2]})") - finally: - # Drop the slot pin as soon as we're done reading. Without - # this, the slot stays pinned until the SampleView is GC'd, - # and the pool could run dry. - view.release() camera.unlink_topic("frames") print("Done.") diff --git a/examples/python/hello_schema.py b/examples/python/hello_schema.py index 77534aa..d8b9401 100644 --- a/examples/python/hello_schema.py +++ b/examples/python/hello_schema.py @@ -50,7 +50,7 @@ def main() -> int: got = bad.topic_schema(topic) expected_v1 = make_imu_schema(version=1) d = kickmsg.schema.diff(got, expected_v1) - if d & int(kickmsg.schema.Diff.Version): + if d & kickmsg.schema.Diff.Version: print("[bad_sub] version mismatch (observed v%d, expected v1) — refusing" % got.version) diff --git a/py_bindings/src/kickmsg_py.cc b/py_bindings/src/kickmsg_py.cc index 72de929..49a5896 100644 --- a/py_bindings/src/kickmsg_py.cc +++ b/py_bindings/src/kickmsg_py.cc @@ -8,10 +8,10 @@ /// SchemaInfo — payload schema descriptor /// HealthReport — SharedRegion::diagnose() result /// SharedRegion — factory methods + schema/health/repair -/// Publisher — send(bytes) and allocate()/publish() zero-copy +/// Publisher — send(bytes) + allocate() → AllocatedSlot +/// AllocatedSlot — writable zero-copy handle + .publish() /// Subscriber — try_receive / receive (GIL release) / *_view -/// SampleRef — copy-based sample (bytes) -/// SampleView — zero-copy sample (buffer protocol) +/// SampleView — read-only zero-copy sample (buffer protocol) /// BroadcastHandle — NamedTuple-like (pub, sub) /// Node — high-level topic / broadcast / mailbox /// schema (submodule) @@ -21,16 +21,40 @@ /// fnv1a_64(data[, seed]) /// 
identity_from_fnv1a(descriptor) /// -/// Zero-copy contract: -/// - Publisher.allocate(len) returns a writable memoryview that points -/// directly into a shared-memory slot. The user fills it (memcpy, -/// numpy.copyto, cv2.imencode, etc.), then calls Publisher.publish(). -/// The memoryview is valid only between allocate() and publish(). -/// - SampleView supports the buffer protocol as a read-only memoryview -/// pointing into the same shared-memory slot. The slot stays pinned -/// while the SampleView is alive; release the pin by deleting the -/// SampleView or exiting its `with` block. +/// Zero-copy contract (lifetime-safe via the Python buffer protocol): +/// +/// slot = pub.allocate(N) → AllocatedSlot. memoryview(slot) is a +/// writable view into the SHM slot. The +/// memoryview pins the slot, which pins +/// the Publisher, which pins the mmap — +/// so retained memoryviews stay valid +/// (at the mmap level) as long as Python +/// holds them. +/// slot.publish() → commits. NEW memoryview(slot) after +/// this raises BufferError. Memoryviews +/// obtained BEFORE publish remain pointer- +/// valid but writing through them after +/// publish would corrupt in-flight +/// subscribers — user contract: don't. +/// +/// view = sub.try_receive_view() → SampleView. memoryview(view) is a +/// read-only view into the SHM slot. The +/// memoryview pins the SampleView, which +/// pins the slot's refcount and the mmap. +/// view.release() → drops the pin. NEW memoryview(view) +/// after this raises BufferError. +/// +/// Equivalent context-manager form (preferred for short scopes): +/// with sub.try_receive_view() as view: +/// mv = memoryview(view) +/// ... use mv ... +/// # pin released on block exit, even on exception +/// +/// The pinning is enforced by Py_buffer::obj = self + Py_INCREF inside +/// the buffer-protocol getbuffer slot, so it works with numpy.asarray(), +/// torch.frombuffer(), and any other consumer that respects the protocol. 
+#include #include #include #include @@ -50,34 +74,122 @@ namespace nb = nanobind; using namespace nb::literals; +namespace kickmsg +{ + // Python-only wrapper around a Publisher reservation. Holds the slot + // pointer + length returned by Publisher::allocate(), exposes the + // writable buffer protocol so `memoryview(slot)` points directly into + // the shared-memory slot (zero-copy), and has a .publish() method + // that commits via the Publisher. + // + // Lifetime: the Py_buffer obtained through buffer protocol pins this + // AllocatedSlot alive (view->obj = self; Py_INCREF), which in turn + // pins the Publisher (via nb::keep_alive<1, 2> on the constructor), + // which pins the SharedRegion mmap. A memoryview retained past + // `.publish()` stays technically valid as a pointer — but any NEW + // memoryview(slot) after publish is refused with BufferError so + // accidental reuse is caught. + struct PyAllocatedSlot + { + Publisher* publisher; + void* ptr; + std::size_t len; + bool published; + + PyAllocatedSlot(Publisher& p, void* data, std::size_t n) + : publisher{&p}, ptr{data}, len{n}, published{false} + { + } + }; +} + namespace { - // Helper: build a read-only memoryview over raw bytes (no copy). - nb::object memview_readonly(void const* ptr, std::size_t len) + // Buffer protocol for Subscriber::SampleView (read-only zero-copy). + // Sets view->obj = self + Py_INCREF so the resulting memoryview pins + // the SampleView alive, which transitively pins the slot refcount + // and the mmap. 
+ int sv_getbuffer(PyObject* self, Py_buffer* view, int /*flags*/) noexcept { - PyObject* obj = PyMemoryView_FromMemory( - reinterpret_cast(const_cast(ptr)), - static_cast(len), PyBUF_READ); - if (obj == nullptr) + using SV = kickmsg::Subscriber::SampleView; + auto* sv = nb::inst_ptr(nb::handle(self)); + + if (not sv->valid()) { - throw nb::python_error(); + PyErr_SetString(PyExc_BufferError, + "SampleView is no longer valid (pin already released)"); + view->obj = nullptr; + return -1; } - return nb::steal(obj); + + view->buf = const_cast(sv->data()); + view->obj = self; + Py_INCREF(self); + view->len = static_cast(sv->len()); + view->itemsize = 1; + view->readonly = 1; + view->ndim = 1; + view->format = nullptr; // defaults to "B" (raw bytes) + view->shape = &view->len; // borrow: lives in the Py_buffer + view->strides = &view->itemsize; + view->suboffsets = nullptr; + view->internal = nullptr; + return 0; } - // Helper: build a writable memoryview over raw bytes (no copy). - nb::object memview_writable(void* ptr, std::size_t len) + void sv_releasebuffer(PyObject* /*self*/, Py_buffer* /*view*/) noexcept { - PyObject* obj = PyMemoryView_FromMemory( - reinterpret_cast(ptr), - static_cast(len), PyBUF_WRITE); - if (obj == nullptr) + // Nothing to free: shape/strides borrow from the Py_buffer itself, + // and Py_DECREF(view->obj) is handled by CPython's memoryview. + } + + PyType_Slot sv_slots[] = { + { Py_bf_getbuffer, reinterpret_cast(sv_getbuffer) }, + { Py_bf_releasebuffer, reinterpret_cast(sv_releasebuffer) }, + { 0, nullptr } + }; + + // Buffer protocol for PyAllocatedSlot (writable zero-copy). Refuses + // new buffer requests once .publish() has been called so stale writes + // don't corrupt messages that are already in flight to subscribers. 
+ int as_getbuffer(PyObject* self, Py_buffer* view, int /*flags*/) noexcept + { + auto* slot = nb::inst_ptr(nb::handle(self)); + + if (slot->published) { - throw nb::python_error(); + PyErr_SetString(PyExc_BufferError, + "AllocatedSlot has already been published; its buffer is " + "no longer writable"); + view->obj = nullptr; + return -1; } - return nb::steal(obj); + + view->buf = slot->ptr; + view->obj = self; + Py_INCREF(self); + view->len = static_cast(slot->len); + view->itemsize = 1; + view->readonly = 0; // writable + view->ndim = 1; + view->format = nullptr; + view->shape = &view->len; + view->strides = &view->itemsize; + view->suboffsets = nullptr; + view->internal = nullptr; + return 0; } + void as_releasebuffer(PyObject* /*self*/, Py_buffer* /*view*/) noexcept + { + } + + PyType_Slot as_slots[] = { + { Py_bf_getbuffer, reinterpret_cast(as_getbuffer) }, + { Py_bf_releasebuffer, reinterpret_cast(as_releasebuffer) }, + { 0, nullptr } + }; + // Convert SchemaInfo.name (fixed-size NUL-terminated char array) to string. 
std::string schema_name_str(kickmsg::SchemaInfo const& s) { @@ -135,7 +247,7 @@ namespace kickmsg { if (b.size() != s.identity.size()) { - throw std::runtime_error( + throw nb::value_error( "SchemaInfo.identity must be exactly 64 bytes"); } std::memcpy(s.identity.data(), b.c_str(), s.identity.size()); @@ -148,7 +260,7 @@ namespace kickmsg { if (b.size() != s.layout.size()) { - throw std::runtime_error( + throw nb::value_error( "SchemaInfo.layout must be exactly 64 bytes"); } std::memcpy(s.layout.data(), b.c_str(), s.layout.size()); @@ -238,9 +350,9 @@ namespace kickmsg { return SharedRegion::create_or_open(name, type, cfg, creator.c_str()); }, "name"_a, "type"_a, "cfg"_a, "creator"_a = std::string{""}, nb::rv_policy::move) - .def("name", &SharedRegion::name) - .def("channel_type", &SharedRegion::channel_type) - .def("schema", &SharedRegion::schema) + .def_prop_ro("name", &SharedRegion::name) + .def_prop_ro("channel_type", &SharedRegion::channel_type) + .def("schema", &SharedRegion::schema) .def("try_claim_schema", &SharedRegion::try_claim_schema, "info"_a) .def("reset_schema_claim", &SharedRegion::reset_schema_claim) .def("diagnose", &SharedRegion::diagnose) @@ -252,65 +364,100 @@ namespace kickmsg m.def("unlink_shm", [](std::string const& name) { SharedMemory::unlink(name); }, "name"_a, "Unlink a shared-memory entry by name (no-op if absent)."); + // SampleRef (the C++ byte-copy sample) is not bound directly — + // try_receive() / receive() auto-convert it to `bytes` at the + // Python boundary. Users who want ring-position information + // can use try_receive_view() / receive_view() which return + // SampleView (bound below). + // ------------------------------------------------------------------- - // SampleRef — byte-copy sample. - // Returns a bytes copy of the payload (the buffer is subscriber-local - // and reused across try_receive() calls, so copying out is the only - // way to keep the bytes beyond the next receive). 
+ // SampleView — zero-copy, pins the slot. + // + // Supports the Python buffer protocol: `memoryview(view)` returns + // a read-only memoryview pointing directly at shared memory (no + // copy). The memoryview pins the SampleView alive — so retaining + // a memoryview beyond the SampleView's Python reference keeps the + // slot pinned and the mmap valid until the memoryview is released. + // That makes the zero-copy path lifetime-safe by construction. // ------------------------------------------------------------------- - nb::class_(m, "SampleRef") - .def("data", - [](Subscriber::SampleRef const& s) -> nb::bytes + nb::class_(m, "SampleView", + nb::type_slots(sv_slots)) + // __len__ so `len(view)` works; ring_pos / valid as properties + // (no-arg accessors, Pythonic). + .def("__len__", + [](Subscriber::SampleView const& v) -> std::size_t + { return v.len(); }) + .def_prop_ro("ring_pos", &Subscriber::SampleView::ring_pos) + .def_prop_ro("valid", &Subscriber::SampleView::valid) + .def("release", + [](Subscriber::SampleView& v) { - return nb::bytes(reinterpret_cast(s.data()), - s.len()); + // Move-assign a default-constructed view: the old + // state's release() fires via the move-assignment, + // dropping the pin. Subsequent memoryview(view) + // calls fail with BufferError (see sv_getbuffer). + v = Subscriber::SampleView{}; }, - "Return the payload as a bytes object (copies out of the " - "subscriber-local buffer).") - .def("len", &Subscriber::SampleRef::len) - .def("ring_pos", &Subscriber::SampleRef::ring_pos); + "Release the slot pin early. Idempotent; after this, any " + "NEW memoryview(view) call raises BufferError. Memoryviews " + "obtained before .release() remain valid as pointers but " + "should not be used (the pin is gone).") + // Context-manager support: `with view:` releases the pin on + // block exit. 
+ // + // __enter__ returns self with reference_internal rv_policy so + // nanobind resolves to the existing Python wrapper rather + // than constructing a second one around the same C++ object + // (which would double-release on exit). + // + // __exit__ uses nb::args to accept the three positional + // arguments Python's `with` statement passes (exc_type, + // exc_value, traceback) — explicit `(nb::object, nb::object, + // nb::object)` triggers a dispatch error in nanobind's + // multi-arg resolution (nb::args sidesteps it). + .def("__enter__", + [](Subscriber::SampleView& v) -> Subscriber::SampleView& + { return v; }, + nb::rv_policy::reference_internal) + .def("__exit__", + [](Subscriber::SampleView& v, nb::args /*exc_info*/) + { v = Subscriber::SampleView{}; }); // ------------------------------------------------------------------- - // SampleView — zero-copy, pins the slot. - // Buffer protocol via memoryview(view). The view holds a refcount - // pin on the slot; the pin is released when the view is destroyed. + // AllocatedSlot — handle returned by Publisher.allocate(). + // + // Supports the writable buffer protocol: `memoryview(slot)` or + // `numpy.asarray(slot)` gets you a zero-copy writable view of the + // reserved shared-memory slot. Fill it in place, then call + // `slot.publish()` to commit. After publish, any NEW + // memoryview(slot) call raises BufferError. + // + // keep_alive<1, 2>: keep the Publisher (arg 2) alive while this + // slot (self, arg 1) is alive — the slot points into the + // Publisher's mmap and must not outlive it. 
// ------------------------------------------------------------------- - nb::class_(m, "SampleView") - .def("data", - [](Subscriber::SampleView const& v) -> nb::object + nb::class_(m, "AllocatedSlot", + nb::type_slots(as_slots)) + .def("publish", + [](PyAllocatedSlot& s) -> std::size_t { - if (not v.valid()) + if (s.published) { - return nb::none(); + throw nb::value_error( + "AllocatedSlot.publish() called more than once"); } - return memview_readonly(v.data(), v.len()); + s.published = true; + return s.publisher->publish(); }, - "Return a read-only memoryview pointing directly at the " - "pinned shared-memory slot (zero-copy). Valid while this " - "SampleView is alive.") - .def("len", &Subscriber::SampleView::len) - .def("ring_pos", &Subscriber::SampleView::ring_pos) - .def("valid", &Subscriber::SampleView::valid) - .def("release", - [](Subscriber::SampleView& v) - { - // Destroy-in-place by move-assigning a default-constructed - // view: the release() private helper fires in the - // destructor of the temporary. - v = Subscriber::SampleView{}; - }, - "Release the slot pin early (equivalent to deleting the view).") - // Note on context-manager support: initial attempts at - // __enter__/__exit__ bindings produced nanobind dispatch - // errors due to the interaction between the returned - // reference and nanobind's per-argument type matching. - // Users who want `with` semantics can implement a tiny - // Python wrapper calling .release() — or rely on Python's - // GC to release the pin when the SampleView goes out of - // scope. - ; + "Commit the reserved slot. Returns the number of rings " + "the sample was delivered to. 
After this call, any NEW " + "memoryview(slot) fails with BufferError.") + .def("__len__", + [](PyAllocatedSlot const& s) -> std::size_t { return s.len; }) + .def_prop_ro("published", + [](PyAllocatedSlot const& s) -> bool { return s.published; }); // ------------------------------------------------------------------- // Publisher @@ -324,30 +471,61 @@ namespace kickmsg .def(nb::init(), "region"_a, nb::keep_alive<1, 2>()) .def("send", - [](Publisher& p, nb::bytes const& data) -> int32_t - { return p.send(data.c_str(), data.size()); }, + [](Publisher& p, nb::bytes const& data) -> std::size_t + { + int32_t rc = p.send(data.c_str(), data.size()); + if (rc >= 0) + { + return static_cast(rc); + } + // C++ returns negative errno-style codes; translate to + // Python exceptions so callers don't silently drop + // messages by ignoring a "falsy" negative return. + int err = -rc; + if (err == EMSGSIZE) + { + throw nb::value_error( + "message too large: exceeds max_payload_size"); + } + if (err == EAGAIN) + { + PyErr_SetString(PyExc_BlockingIOError, + "slot pool exhausted; try again after " + "subscribers drain"); + throw nb::python_error(); + } + // Any other negative rc: generic OSError with errno. + PyErr_SetFromErrno(PyExc_OSError); + throw nb::python_error(); + }, "data"_a, - "Copy `data` into a slot and publish. Returns bytes written " - "or a negative errno-style code.") + "Copy `data` into a slot and publish (atomic convenience). " + "Returns the number of bytes written. 
Raises ValueError if " + "the message exceeds max_payload_size, BlockingIOError if " + "the slot pool is exhausted, OSError on other failures.") .def("allocate", - [](Publisher& p, std::size_t len) -> nb::object + [](Publisher& p, std::size_t len) -> std::optional { void* ptr = p.allocate(len); if (ptr == nullptr) { - return nb::none(); + return std::nullopt; } - return memview_writable(ptr, len); + return PyAllocatedSlot{p, ptr, len}; }, "len"_a, - "Reserve a slot of `len` bytes and return a writable memoryview " - "pointing directly at it. Fill the view, then call publish(). " - "Returns None if the pool is exhausted.") - .def("publish", - [](Publisher& p) -> std::size_t { return p.publish(); }, - "Commit the slot reserved by the last allocate() call. " - "Returns the number of rings the sample was delivered to.") - .def("dropped", &Publisher::dropped); + // keep_alive<0, 1>: the returned AllocatedSlot (arg 0) + // must pin the Publisher (arg 1 = self). Memoryviews + // obtained from the slot in turn pin the AllocatedSlot + // (via Py_buffer::obj), so the full chain is + // memoryview → AllocatedSlot → Publisher → SharedRegion. + nb::keep_alive<0, 1>(), + "Reserve a slot of `len` bytes and return an AllocatedSlot. " + "Use memoryview(slot) or numpy.asarray(slot) to fill it " + "in place (zero-copy), then call slot.publish(). Returns " + "None if the pool is exhausted.") + .def_prop_ro("dropped", &Publisher::dropped, + "Per-ring delivery drops (CAS contention or pool exhaustion)."); // ------------------------------------------------------------------- // Subscriber @@ -405,16 +583,31 @@ namespace kickmsg [](Subscriber& s, int64_t timeout_ns) -> std::optional { - nb::gil_scoped_release release; - return s.receive_view(nanoseconds{timeout_ns}); + // Scope the GIL release tightly around the blocking + // wait, matching receive() above. 
The C++ return value + // is pure C++ (no Python state), so strictly speaking + // the GIL only needs to be released for the futex + // wait itself — but keeping the scope explicit avoids + // any future-footgun if the Python-conversion path + // ever touches CPython state before reacquisition. + std::optional result; + { + nb::gil_scoped_release release; + result = s.receive_view(nanoseconds{timeout_ns}); + } + return result; }, "timeout_ns"_a, nb::rv_policy::move, nb::keep_alive<0, 1>(), "Blocking zero-copy receive. Releases the GIL. Returns a " "SampleView or None on timeout.") - .def("lost", &Subscriber::lost) - .def("drain_timeouts", &Subscriber::drain_timeouts); + .def_prop_ro("lost", &Subscriber::lost, + "Messages the subscriber's ring overflowed past — the " + "publisher evicted them before this subscriber drained.") + .def_prop_ro("drain_timeouts", &Subscriber::drain_timeouts, + "Count of times the subscriber gave up waiting for an " + "in-flight publisher during teardown."); // ------------------------------------------------------------------- // BroadcastHandle @@ -483,7 +676,7 @@ namespace kickmsg .def("topic_schema", &Node::topic_schema, "topic"_a) .def("try_claim_topic_schema", &Node::try_claim_topic_schema, "topic"_a, "info"_a) - .def("name", &Node::name) - .def("prefix", &Node::prefix); + .def_prop_ro("name", &Node::name) + .def_prop_ro("prefix", &Node::prefix); } } diff --git a/tests/python/test_lifetime.py b/tests/python/test_lifetime.py index 9ae4997..6e463db 100644 --- a/tests/python/test_lifetime.py +++ b/tests/python/test_lifetime.py @@ -49,7 +49,7 @@ def test_sample_view_gc_releases_pin(shm_name, small_cfg): pub.send(b"ephemeral") view = sub.try_receive_view() assert view is not None - _ = bytes(view.data()) + _ = bytes(memoryview(view)) # Drop the reference — Python GC should run SampleView's C++ dtor, # which in turn drops the slot pin. 
diff --git a/tests/python/test_pubsub.py b/tests/python/test_pubsub.py index d140908..4efe1f7 100644 --- a/tests/python/test_pubsub.py +++ b/tests/python/test_pubsub.py @@ -51,8 +51,35 @@ def test_multiple_subscribers_receive_same_message(shm_name, small_cfg): assert sub_b.try_receive() == b"fan-out" -def test_send_larger_than_max_payload_fails(shm_name, small_cfg): +def test_send_larger_than_max_payload_raises(shm_name, small_cfg): + import pytest region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") pub = kickmsg.Publisher(region) oversize = b"x" * (small_cfg.max_payload_size + 1) - assert pub.send(oversize) < 0 # EMSGSIZE-style negative return + with pytest.raises(ValueError, match="max_payload_size"): + pub.send(oversize) + + +def test_send_pool_exhausted_raises(shm_name, small_cfg): + """When the pool is dry (all slots pinned, none free), send() raises + BlockingIOError — never a silent negative-int a Python user would + discard. Simulated by a subscriber that holds SampleView pins and + never releases them, so slots can't be recycled.""" + import pytest + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + # Publish + pin one by one until the pool runs dry. Keep the views + # alive in a list so their pins prevent the slots returning to the pool. 
+ pinned = [] + sent_ok = 0 + with pytest.raises(BlockingIOError): + for _ in range(small_cfg.pool_size * 4): + pub.send(b"x") + sent_ok += 1 + view = sub.try_receive_view() + if view is not None: + pinned.append(view) # refcount held → slot can't recycle + assert sent_ok > 0 + assert len(pinned) > 0 diff --git a/tests/python/test_schema.py b/tests/python/test_schema.py index e1691f4..33207e2 100644 --- a/tests/python/test_schema.py +++ b/tests/python/test_schema.py @@ -52,9 +52,9 @@ def test_schema_diff_detects_mismatch(shm_name, small_cfg): assert got is not None d = kickmsg.schema.diff(got, expected_v2) - assert d & int(kickmsg.schema.Diff.Version) - assert not (d & int(kickmsg.schema.Diff.Identity)) - assert not (d & int(kickmsg.schema.Diff.Name)) + assert d & kickmsg.schema.Diff.Version + assert not (d & kickmsg.schema.Diff.Identity) + assert not (d & kickmsg.schema.Diff.Name) def test_schema_reset_recovers_wedged_claiming(shm_name, small_cfg): diff --git a/tests/python/test_zerocopy.py b/tests/python/test_zerocopy.py index 5c75b8f..209856a 100644 --- a/tests/python/test_zerocopy.py +++ b/tests/python/test_zerocopy.py @@ -1,28 +1,42 @@ -"""Zero-copy publish (allocate/publish) and zero-copy receive (SampleView).""" +"""Zero-copy publish via AllocatedSlot, zero-copy receive via SampleView. + +Both paths use the Python buffer protocol: + - `memoryview(slot)` → writable view into the publisher-reserved SHM slot + - `memoryview(view)` → read-only view into the subscriber-pinned SHM slot +The memoryview pins its source object alive (Py_buffer::obj + Py_INCREF), +so retaining a memoryview past the Python reference to slot/view keeps the +underlying shared memory valid until the last memoryview is released. 
+""" from __future__ import annotations +import pytest + import kickmsg -def test_publisher_allocate_returns_writable_memoryview(shm_name, small_cfg): +def test_publisher_allocate_returns_writable_slot(shm_name, small_cfg): region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") pub = kickmsg.Publisher(region) sub = kickmsg.Subscriber(region) - # Reserve a slot, write directly into the shared-memory page, then publish. - buf = pub.allocate(16) - assert buf is not None - assert not buf.readonly - buf[:5] = b"hello" - buf[5:16] = b"-zerocopy!!" # 11 bytes - assert pub.publish() > 0 + slot = pub.allocate(16) + assert slot is not None + assert len(slot) == 16 + assert not slot.published + + mv = memoryview(slot) + assert not mv.readonly + mv[:5] = b"hello" + mv[5:16] = b"-zerocopy!!" + slot.publish() + assert slot.published got = sub.try_receive() assert got == b"hello-zerocopy!!" -def test_subscriber_view_buffer_protocol_is_readonly(shm_name, small_cfg): +def test_subscriber_view_memoryview_is_readonly(shm_name, small_cfg): region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") pub = kickmsg.Publisher(region) sub = kickmsg.Subscriber(region) @@ -30,15 +44,47 @@ def test_subscriber_view_buffer_protocol_is_readonly(shm_name, small_cfg): pub.send(b"hello-view") view = sub.try_receive_view() assert view is not None - assert view.valid() - assert view.len() == len(b"hello-view") + assert view.valid + assert len(view) == len(b"hello-view") - mv = view.data() - # memoryview directly into SHM — must be read-only. + mv = memoryview(view) assert mv.readonly assert bytes(mv) == b"hello-view" +def test_slot_publish_rejects_new_memoryview(shm_name, small_cfg): + """After slot.publish(), requesting a new memoryview must fail with + BufferError — the slot is no longer the publisher's to write to, and + silently allowing a write would corrupt subscribers in flight. 
+ """ + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + slot = pub.allocate(8) + mv1 = memoryview(slot) + mv1[:4] = b"xxxx" + slot.publish() + + with pytest.raises(BufferError): + memoryview(slot) + + # mv1 was obtained before publish() and is still usable as a pointer + # (the pin keeps it valid); that's the "retained memoryview" case + # documented as user-beware. Subscriber still receives what was written. + assert sub.try_receive()[:4] == b"xxxx" + + +def test_double_publish_raises(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + slot = pub.allocate(4) + memoryview(slot)[:] = b"ping" + slot.publish() + with pytest.raises(ValueError, match="more than once"): + slot.publish() + + def test_view_release_returns_slot_to_pool(shm_name, small_cfg): region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") pub = kickmsg.Publisher(region) @@ -50,20 +96,33 @@ def test_view_release_returns_slot_to_pool(shm_name, small_cfg): pub.send(b"ephemeral") view = sub.try_receive_view() assert view is not None - _ = bytes(view.data()) + _ = bytes(memoryview(view)) view.release() for i in range(small_cfg.pool_size): assert pub.send(f"msg-{i}".encode()) > 0 +def test_view_release_makes_new_memoryview_error(shm_name, small_cfg): + """After view.release(), a new memoryview(view) must raise BufferError.""" + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + pub.send(b"fleeting") + view = sub.try_receive_view() + assert view is not None + _ = memoryview(view) # valid while pin is held + view.release() + with pytest.raises(BufferError): + memoryview(view) + + def test_zerocopy_camera_frame_pattern(shm_name): - """Mimic 
the robot-camera use case: - allocate directly into SHM, fill (here with a byte pattern; in real - code this would be a memcpy from V4L2 / OpenCV / DMA), publish, - receive as zero-copy memoryview, interpret as bytes without any - intermediate copy on either side. - """ + """Mimic the robot-camera use case: allocate directly into SHM, fill + with a byte pattern (stand-in for a real capture), publish, then + receive as zero-copy memoryview — no intermediate copies on either + side of the channel.""" cfg = kickmsg.Config() cfg.max_subscribers = 2 cfg.sub_ring_capacity = 4 @@ -75,20 +134,21 @@ def test_zerocopy_camera_frame_pattern(shm_name): sub = kickmsg.Subscriber(region) frame_size = 320 * 240 * 3 # smaller test frame - buf = pub.allocate(frame_size) - assert buf is not None + slot = pub.allocate(frame_size) + assert slot is not None + buf = memoryview(slot) # Fill with a pattern — cheap, full write coverage. for i in range(0, frame_size, 4096): chunk = min(4096, frame_size - i) buf[i:i + chunk] = bytes([(i + j) & 0xFF for j in range(chunk)]) - pub.publish() + slot.publish() view = sub.try_receive_view() try: assert view is not None - assert view.len() == frame_size - # Spot-check some bytes without copying the whole frame. - mv = view.data() + assert len(view) == frame_size + mv = memoryview(view) + # Spot-check a few bytes without copying the whole frame. 
assert mv[0] == 0 assert mv[1] == 1 assert mv[4095] == 255 @@ -96,3 +156,55 @@ def test_zerocopy_camera_frame_pattern(shm_name): finally: if view is not None: view.release() + + +def test_view_context_manager_releases_pin(shm_name, small_cfg): + """`with view:` must release the pin on normal block exit — the pool + is replenished immediately and new publishes don't starve.""" + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + pub.send(b"in-block") + view = sub.try_receive_view() + with view: + assert bytes(memoryview(view)) == b"in-block" + assert not view.valid + + # Pool must be fully available — publish pool_size more messages. + for i in range(small_cfg.pool_size): + assert pub.send(f"m{i}".encode()) > 0 + + +def test_view_context_manager_releases_on_exception(shm_name, small_cfg): + """The pin must drop even when the `with` block raises.""" + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + pub.send(b"throws") + view = sub.try_receive_view() + with pytest.raises(RuntimeError, match="boom"): + with view: + _ = memoryview(view) + raise RuntimeError("boom") + assert not view.valid + + +def test_memoryview_pins_sampleview(shm_name, small_cfg): + """Retain a memoryview past the SampleView Python reference — the + pin must hold (reading is defined, not UB). This is the load-bearing + safety guarantee of the buffer-protocol change. 
+ """ + import gc + + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + + pub.send(b"retained") + view = sub.try_receive_view() + mv = memoryview(view) + del view # only `mv` keeps the SampleView alive now + gc.collect(); gc.collect() + assert bytes(mv) == b"retained" From 5cc6673df0442c71a90cd9195020cfad163e580e Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Tue, 14 Apr 2026 11:39:20 +0200 Subject: [PATCH 4/5] Improve bindings --- py_bindings/src/kickmsg_py.cc | 99 ++++++++++++++++++++++++++-------- tests/python/test_pubsub.py | 18 ++++++- tests/python/test_threading.py | 3 +- 3 files changed, 95 insertions(+), 25 deletions(-) diff --git a/py_bindings/src/kickmsg_py.cc b/py_bindings/src/kickmsg_py.cc index 49a5896..877c2ca 100644 --- a/py_bindings/src/kickmsg_py.cc +++ b/py_bindings/src/kickmsg_py.cc @@ -61,6 +61,7 @@ #include #include +#include #include #include @@ -225,13 +226,20 @@ namespace kickmsg .def_rw("sub_ring_capacity", &channel::Config::sub_ring_capacity) .def_rw("pool_size", &channel::Config::pool_size) .def_rw("max_payload_size", &channel::Config::max_payload_size) - .def_prop_rw("commit_timeout_us", - [](channel::Config const& c) -> int64_t - { return c.commit_timeout.count(); }, - [](channel::Config& c, int64_t us) - { c.commit_timeout = microseconds{us}; }, - "Commit timeout in microseconds.") - .def_rw("schema", &channel::Config::schema); + .def_prop_rw("commit_timeout", + [](channel::Config const& c) -> microseconds + { return c.commit_timeout; }, + [](channel::Config& c, microseconds us) + { c.commit_timeout = us; }, + "Commit timeout as a timedelta (microsecond resolution).") + .def_rw("schema", &channel::Config::schema) + .def("__repr__", [](channel::Config const& c) + { + return std::string{"Config(max_subscribers="} + + std::to_string(c.max_subscribers) + + ", pool_size=" + std::to_string(c.pool_size) + + ", 
max_payload_size=" + std::to_string(c.max_payload_size) + ")"; + }); // ------------------------------------------------------------------- // SchemaInfo + schema submodule (Diff / diff) @@ -359,7 +367,14 @@ namespace kickmsg .def("repair_locked_entries", &SharedRegion::repair_locked_entries) .def("reset_retired_rings", &SharedRegion::reset_retired_rings) .def("reclaim_orphaned_slots",&SharedRegion::reclaim_orphaned_slots) - .def("unlink", &SharedRegion::unlink); + .def("unlink", &SharedRegion::unlink) + .def("__repr__", [](SharedRegion const& r) + { + std::string type_str = + (r.channel_type() == channel::PubSub) ? "PubSub" : "Broadcast"; + return std::string{"SharedRegion(name='"} + r.name() + + "', type=" + type_str + ")"; + }); m.def("unlink_shm", [](std::string const& name) { SharedMemory::unlink(name); }, "name"_a, "Unlink a shared-memory entry by name (no-op if absent)."); @@ -422,7 +437,12 @@ namespace kickmsg nb::rv_policy::reference_internal) .def("__exit__", [](Subscriber::SampleView& v, nb::args /*exc_info*/) - { v = Subscriber::SampleView{}; }); + { v = Subscriber::SampleView{}; }) + .def("__repr__", [](Subscriber::SampleView const& v) + { + return std::string{"SampleView(len="} + std::to_string(v.len()) + + ", valid=" + (v.valid() ? "True" : "False") + ")"; + }); // ------------------------------------------------------------------- // AllocatedSlot — handle returned by Publisher.allocate(). @@ -457,7 +477,12 @@ namespace kickmsg .def("__len__", [](PyAllocatedSlot const& s) -> std::size_t { return s.len; }) .def_prop_ro("published", - [](PyAllocatedSlot const& s) -> bool { return s.published; }); + [](PyAllocatedSlot const& s) -> bool { return s.published; }) + .def("__repr__", [](PyAllocatedSlot const& s) + { + return std::string{"AllocatedSlot(len="} + std::to_string(s.len) + + ", published=" + (s.published ? 
"True" : "False") + ")"; + }); // ------------------------------------------------------------------- // Publisher @@ -525,7 +550,12 @@ namespace kickmsg "in place (zero-copy), then call slot.publish(). Returns " "None if the pool is exhausted.") .def_prop_ro("dropped", &Publisher::dropped, - "Per-ring delivery drops (CAS contention or pool exhaustion)."); + "Per-ring delivery drops (CAS contention or pool exhaustion).") + .def("__repr__", [](Publisher const& p) + { + return std::string{"Publisher(dropped="} + + std::to_string(p.dropped()) + ")"; + }); // ------------------------------------------------------------------- // Subscriber @@ -551,12 +581,12 @@ namespace kickmsg "Non-blocking receive. Returns bytes on success, None if " "no message is available.") .def("receive", - [](Subscriber& s, int64_t timeout_ns) -> nb::object + [](Subscriber& s, nanoseconds timeout) -> nb::object { std::optional sample; { nb::gil_scoped_release release; - sample = s.receive(nanoseconds{timeout_ns}); + sample = s.receive(timeout); } if (not sample.has_value()) { @@ -566,8 +596,8 @@ namespace kickmsg reinterpret_cast(sample->data()), sample->len()); }, - "timeout_ns"_a, - "Blocking receive with timeout. Releases the GIL while " + "timeout"_a, + "Blocking receive with timeout (timedelta). Releases the GIL while " "waiting. Returns bytes on success, None on timeout.") // keep_alive<0, 1>: the returned SampleView (arg 0) must keep // the Subscriber (arg 1 = self) alive — the view dereferences @@ -580,7 +610,7 @@ namespace kickmsg "Non-blocking zero-copy receive. 
Returns a SampleView (pins " "the slot) or None.") .def("receive_view", - [](Subscriber& s, int64_t timeout_ns) + [](Subscriber& s, nanoseconds timeout) -> std::optional { // Scope the GIL release tightly around the blocking @@ -593,21 +623,37 @@ namespace kickmsg std::optional result; { nb::gil_scoped_release release; - result = s.receive_view(nanoseconds{timeout_ns}); + result = s.receive_view(timeout); } return result; }, - "timeout_ns"_a, + "timeout"_a, nb::rv_policy::move, nb::keep_alive<0, 1>(), - "Blocking zero-copy receive. Releases the GIL. Returns a " + "Blocking zero-copy receive (timedelta timeout). Releases the GIL. Returns a " "SampleView or None on timeout.") .def_prop_ro("lost", &Subscriber::lost, "Messages the subscriber's ring overflowed past — the " "publisher evicted them before this subscriber drained.") .def_prop_ro("drain_timeouts", &Subscriber::drain_timeouts, "Count of times the subscriber gave up waiting for an " - "in-flight publisher during teardown."); + "in-flight publisher during teardown.") + .def("__repr__", [](Subscriber const& s) + { + return std::string{"Subscriber(lost="} + std::to_string(s.lost()) + + ", drain_timeouts=" + std::to_string(s.drain_timeouts()) + ")"; + }) + .def("__iter__", [](Subscriber& s) -> Subscriber& { return s; }, + nb::rv_policy::reference) + .def("__next__", [](Subscriber& s) -> nb::bytes + { + auto sample = s.try_receive(); + if (!sample.has_value()) + { + throw nb::stop_iteration(); + } + return nb::bytes(reinterpret_cast(sample->data()), sample->len()); + }); // ------------------------------------------------------------------- // BroadcastHandle @@ -622,7 +668,11 @@ namespace kickmsg nb::rv_policy::reference_internal) .def_prop_ro("sub", [](BroadcastHandle& h) -> Subscriber& { return h.sub; }, - nb::rv_policy::reference_internal); + nb::rv_policy::reference_internal) + .def("__repr__", [](BroadcastHandle const& /*h*/) + { + return std::string{"BroadcastHandle(pub=Publisher, sub=Subscriber)"}; + }); 
// ------------------------------------------------------------------- // Node @@ -677,6 +727,11 @@ namespace kickmsg .def("try_claim_topic_schema", &Node::try_claim_topic_schema, "topic"_a, "info"_a) .def_prop_ro("name", &Node::name) - .def_prop_ro("prefix", &Node::prefix); + .def_prop_ro("prefix", &Node::prefix) + .def("__repr__", [](Node const& n) + { + return std::string{"Node(name='"} + n.name() + + "', prefix='" + n.prefix() + "')"; + }); } } diff --git a/tests/python/test_pubsub.py b/tests/python/test_pubsub.py index 4efe1f7..0ba7cc2 100644 --- a/tests/python/test_pubsub.py +++ b/tests/python/test_pubsub.py @@ -2,6 +2,8 @@ from __future__ import annotations +from datetime import timedelta + import kickmsg @@ -27,7 +29,7 @@ def test_receive_with_timeout_returns_none(shm_name, small_cfg): region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") sub = kickmsg.Subscriber(region) # 10 ms timeout, no publisher — must return None without throwing. - got = sub.receive(10_000_000) + got = sub.receive(timedelta(milliseconds=10)) assert got is None @@ -36,7 +38,7 @@ def test_receive_with_timeout_delivers(shm_name, small_cfg): pub = kickmsg.Publisher(region) sub = kickmsg.Subscriber(region) pub.send(b"payload") - got = sub.receive(100_000_000) # 100 ms + got = sub.receive(timedelta(milliseconds=100)) assert got == b"payload" @@ -60,6 +62,18 @@ def test_send_larger_than_max_payload_raises(shm_name, small_cfg): pub.send(oversize) +def test_subscriber_iteration(shm_name, small_cfg): + region = kickmsg.SharedRegion.create(shm_name, kickmsg.ChannelType.PubSub, small_cfg, "test") + pub = kickmsg.Publisher(region) + sub = kickmsg.Subscriber(region) + for i in range(5): + pub.send(f"msg-{i}".encode()) + received = list(sub) + assert len(received) == 5 + assert received[0] == b"msg-0" + assert received[4] == b"msg-4" + + def test_send_pool_exhausted_raises(shm_name, small_cfg): """When the pool is dry (all slots pinned, none free), send() 
raises BlockingIOError — never a silent negative-int a Python user would diff --git a/tests/python/test_threading.py b/tests/python/test_threading.py index b609f13..997e7a5 100644 --- a/tests/python/test_threading.py +++ b/tests/python/test_threading.py @@ -5,6 +5,7 @@ from __future__ import annotations import threading +from datetime import timedelta import kickmsg @@ -23,7 +24,7 @@ def consumer(): # 500 ms is plenty — GIL release lets the producer thread publish # immediately; the futex wakes us up on commit; total wall time # should be milliseconds, not approaching the timeout. - got = sub.receive(500_000_000) + got = sub.receive(timedelta(milliseconds=500)) if got is not None: received.append(got) From 10f928fa0bf23a8108044a099cf1beff0e72426f Mon Sep 17 00:00:00 2001 From: Philippe Leduc Date: Tue, 14 Apr 2026 14:47:46 +0200 Subject: [PATCH 5/5] Fix: poisoned ring entry slow down all publishers. Add auto repair --- ARCHITECTURE.md | 42 +++-- py_bindings/kickmsg.pyi | 313 +++++++++++++++++++++++++++++++++++++ src/Publisher.cc | 17 ++ src/Region.cc | 45 ++++-- src/Subscriber.cc | 2 +- tests/unit/publisher-t.cc | 171 ++++++++++++++++++++ tests/unit/region-t.cc | 66 ++++++++ tests/unit/subscriber-t.cc | 32 ++-- 8 files changed, 652 insertions(+), 36 deletions(-) create mode 100644 py_bindings/kickmsg.pyi diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 95b085c..b9f0677 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -678,16 +678,30 @@ After refcount pre-set, during Refcount was set to max_subs but are never released. The slot is permanently leaked. -After CAS on write_pos, before Entry is uncommitted (sequence - sequence store (the dangerous never written). Next publisher at - window) the same ring position waits up to - commit_timeout, then overwrites - the entry with its own data. The - ring entry itself is NOT leaked - (it is overwritten). 
The pool slot +After CAS on write_pos, before Two sub-cases depending on whether + sequence store (the dangerous the publisher reached the CAS lock: + window) + Case A — crash after CAS lock + (entry stuck at LOCKED_SEQUENCE): + Next publisher at this position + waits commit_timeout and drops. + repair_locked_entries() advances + the entry to the expected sequence. + + Case B — crash before CAS lock + (entry still at the previous + cycle's committed sequence): + Next publisher at this position + also waits commit_timeout and drops. + repair_locked_entries() detects the + entry is more than one full wrap + behind and advances it. + + In both cases, the pool slot referenced by the crashed entry - cannot be safely released (slot_idx - may be garbage), so it is leaked. + may be garbage — it is marked + INVALID_SLOT by the repair, and + recovered by reclaim_orphaned_slots. Subscriber sees a gap (lost msg). After sequence store No issue. Entry is committed. @@ -715,8 +729,14 @@ publisher is mid-commit on that entry and will release shortly. On timeout (returns `INVALID_SLOT`), the publisher: 1. Skips `release_slot()` (the old `slot_idx` may be garbage) -2. Overwrites the entry with its own data via the two-phase commit -3. The ring resumes normal operation +2. Attempts the CAS lock — if it fails, the publisher **self-repairs** + the stuck entry in place (stores `INVALID_SLOT` + the expected + sequence) so the next publisher at this position succeeds without + paying the timeout. Self-repair handles both Case A (LOCKED) and + Case B (stale), costs ~10 ns on top of the already-spent timeout, + and is safe under live traffic (idempotent stores). +3. Drops delivery for this ring and moves to the next subscriber ring. + The ring resumes normal operation on the next wrap. The timeout is configurable per channel via `channel::Config::commit_timeout` (default: 100 ms). 
The tradeoff: diff --git a/py_bindings/kickmsg.pyi b/py_bindings/kickmsg.pyi new file mode 100644 index 0000000..fd6d000 --- /dev/null +++ b/py_bindings/kickmsg.pyi @@ -0,0 +1,313 @@ +"""Kickmsg — lock-free shared-memory IPC""" + +import datetime +import enum + +from . import hash as hash, schema as schema + + +class ChannelType(enum.Enum): + PubSub = 1 + + Broadcast = 2 + +class Config: + def __init__(self) -> None: ... + + @property + def max_subscribers(self) -> int: ... + + @max_subscribers.setter + def max_subscribers(self, arg: int, /) -> None: ... + + @property + def sub_ring_capacity(self) -> int: ... + + @sub_ring_capacity.setter + def sub_ring_capacity(self, arg: int, /) -> None: ... + + @property + def pool_size(self) -> int: ... + + @pool_size.setter + def pool_size(self, arg: int, /) -> None: ... + + @property + def max_payload_size(self) -> int: ... + + @max_payload_size.setter + def max_payload_size(self, arg: int, /) -> None: ... + + @property + def commit_timeout(self) -> datetime.timedelta: + """Commit timeout as a timedelta (microsecond resolution).""" + + @commit_timeout.setter + def commit_timeout(self, arg: datetime.timedelta | float, /) -> None: ... + + @property + def schema(self) -> SchemaInfo | None: ... + + @schema.setter + def schema(self, arg: SchemaInfo | None) -> None: ... + + def __repr__(self) -> str: ... + +class SchemaInfo: + def __init__(self) -> None: ... + + @property + def identity(self) -> bytes: ... + + @identity.setter + def identity(self, arg: bytes, /) -> None: ... + + @property + def layout(self) -> bytes: ... + + @layout.setter + def layout(self, arg: bytes, /) -> None: ... + + @property + def name(self) -> str: ... + + @name.setter + def name(self, arg: str, /) -> None: ... + + @property + def version(self) -> int: ... + + @version.setter + def version(self, arg: int, /) -> None: ... + + @property + def identity_algo(self) -> int: ... + + @identity_algo.setter + def identity_algo(self, arg: int, /) -> None: ... 
+ + @property + def layout_algo(self) -> int: ... + + @layout_algo.setter + def layout_algo(self, arg: int, /) -> None: ... + + @property + def flags(self) -> int: ... + + @flags.setter + def flags(self, arg: int, /) -> None: ... + + def __repr__(self) -> str: ... + +class HealthReport: + @property + def locked_entries(self) -> int: ... + + @property + def retired_rings(self) -> int: ... + + @property + def draining_rings(self) -> int: ... + + @property + def live_rings(self) -> int: ... + + @property + def schema_stuck(self) -> bool: ... + + def __repr__(self) -> str: ... + +class SharedRegion: + @staticmethod + def create(name: str, type: ChannelType, cfg: Config, creator: str = '') -> SharedRegion: ... + + @staticmethod + def open(name: str) -> SharedRegion: ... + + @staticmethod + def create_or_open(name: str, type: ChannelType, cfg: Config, creator: str = '') -> SharedRegion: ... + + @property + def name(self) -> str: ... + + @property + def channel_type(self) -> ChannelType: ... + + def schema(self) -> SchemaInfo | None: ... + + def try_claim_schema(self, info: SchemaInfo) -> bool: ... + + def reset_schema_claim(self) -> bool: ... + + def diagnose(self) -> HealthReport: ... + + def repair_locked_entries(self) -> int: ... + + def reset_retired_rings(self) -> int: ... + + def reclaim_orphaned_slots(self) -> int: ... + + def unlink(self) -> None: ... + + def __repr__(self) -> str: ... + +def unlink_shm(name: str) -> None: + """Unlink a shared-memory entry by name (no-op if absent).""" + +class SampleView: + def __buffer__(self, flags, /): + """ + Return a buffer object that exposes the underlying memory of the object. + """ + + def __release_buffer__(self, buffer, /): + """ + Release the buffer object that exposes the underlying memory of the object. + """ + + def __len__(self) -> int: ... + + @property + def ring_pos(self) -> int: ... + + @property + def valid(self) -> bool: ... + + def release(self) -> None: + """ + Release the slot pin early. 
Idempotent; after this, any NEW memoryview(view) call raises BufferError. Memoryviews obtained before .release() remain valid as pointers but should not be used (the pin is gone). + """ + + def __enter__(self) -> SampleView: ... + + def __exit__(self, *args) -> None: ... + + def __repr__(self) -> str: ... + +class AllocatedSlot: + def __buffer__(self, flags, /): + """ + Return a buffer object that exposes the underlying memory of the object. + """ + + def __release_buffer__(self, buffer, /): + """ + Release the buffer object that exposes the underlying memory of the object. + """ + + def publish(self) -> int: + """ + Commit the reserved slot. Returns the number of rings the sample was delivered to. After this call, any NEW memoryview(slot) fails with BufferError. + """ + + def __len__(self) -> int: ... + + @property + def published(self) -> bool: ... + + def __repr__(self) -> str: ... + +class Publisher: + def __init__(self, region: SharedRegion) -> None: ... + + def send(self, data: bytes) -> int: + """ + Copy `data` into a slot and publish (atomic convenience). Returns the number of bytes written. Raises ValueError if the message exceeds max_payload_size, BlockingIOError if the slot pool is exhausted, OSError on other failures. + """ + + def allocate(self, len: int) -> AllocatedSlot | None: + """ + Reserve a slot of `len` bytes and return an AllocatedSlot. Use memoryview(slot) or numpy.asarray(slot) to fill it in place (zero-copy), then call slot.publish(). Returns None if the pool is exhausted. + """ + + @property + def dropped(self) -> int: + """Per-ring delivery drops (CAS contention or pool exhaustion).""" + + def __repr__(self) -> str: ... + +class Subscriber: + def __init__(self, region: SharedRegion) -> None: ... + + def try_receive(self) -> object: + """ + Non-blocking receive. Returns bytes on success, None if no message is available. 
+ """ + + def receive(self, timeout: datetime.timedelta | float) -> object: + """ + Blocking receive with timeout (timedelta). Releases the GIL while waiting. Returns bytes on success, None on timeout. + """ + + def try_receive_view(self) -> SampleView | None: + """ + Non-blocking zero-copy receive. Returns a SampleView (pins the slot) or None. + """ + + def receive_view(self, timeout: datetime.timedelta | float) -> SampleView | None: + """ + Blocking zero-copy receive (timedelta timeout). Releases the GIL. Returns a SampleView or None on timeout. + """ + + @property + def lost(self) -> int: + """ + Messages the subscriber's ring overflowed past — the publisher evicted them before this subscriber drained. + """ + + @property + def drain_timeouts(self) -> int: + """ + Count of times the subscriber gave up waiting for an in-flight publisher during teardown. + """ + + def __repr__(self) -> str: ... + + def __iter__(self) -> Subscriber: ... + + def __next__(self) -> bytes: ... + +class BroadcastHandle: + @property + def pub(self) -> Publisher: ... + + @property + def sub(self) -> Subscriber: ... + + def __repr__(self) -> str: ... + +class Node: + def __init__(self, name: str, prefix: str = 'kickmsg') -> None: ... + + def advertise(self, topic: str, cfg: Config = ...) -> Publisher: ... + + def subscribe(self, topic: str) -> Subscriber: ... + + def advertise_or_join(self, topic: str, cfg: Config) -> Publisher: ... + + def subscribe_or_create(self, topic: str, cfg: Config) -> Subscriber: ... + + def join_broadcast(self, channel: str, cfg: Config = ...) -> BroadcastHandle: ... + + def create_mailbox(self, tag: str, cfg: Config = ...) -> Subscriber: ... + + def open_mailbox(self, owner_node: str, tag: str) -> Publisher: ... + + def unlink_topic(self, topic: str) -> None: ... + + def unlink_broadcast(self, channel: str) -> None: ... + + def unlink_mailbox(self, tag: str, owner_node: str | None = None) -> None: ... 
+ + def topic_schema(self, topic: str) -> SchemaInfo | None: ... + + def try_claim_topic_schema(self, topic: str, info: SchemaInfo) -> bool: ... + + @property + def name(self) -> str: ... + + @property + def prefix(self) -> str: ... + + def __repr__(self) -> str: ... diff --git a/src/Publisher.cc b/src/Publisher.cc index efde399..19f1102 100644 --- a/src/Publisher.cc +++ b/src/Publisher.cc @@ -167,6 +167,23 @@ namespace kickmsg } if (not locked) { + // Self-repair: if the entry is stuck (LOCKED_SEQUENCE from + // a crashed publisher, or stale from a publisher that + // crashed before the CAS lock), advance it so the NEXT + // publisher at this position succeeds without timeout. + // Cost: three stores (~10 ns) after an already-expensive + // timeout (~10 ms). Always the right thing — leaving the + // entry stuck just punishes the next publisher for the + // same crash. + uint64_t seq = e.sequence.load(std::memory_order_acquire); + uint64_t expected = pos + 1; + if (seq == LOCKED_SEQUENCE or seq + capacity < expected) + { + e.slot_idx.store(INVALID_SLOT, std::memory_order_relaxed); + e.payload_len.store(0, std::memory_order_relaxed); + e.sequence.store(expected, std::memory_order_release); + } + ++dropped_; ++excess; ring->state_flight.fetch_sub(ring::IN_FLIGHT_ONE, diff --git a/src/Region.cc b/src/Region.cc index e254b71..2679e0a 100644 --- a/src/Region.cc +++ b/src/Region.cc @@ -265,8 +265,13 @@ namespace kickmsg } for (uint64_t pos = start; pos < wp; ++pos) { - auto& e = entries[pos & h->sub_ring_mask]; - if (e.sequence.load(std::memory_order_acquire) == LOCKED_SEQUENCE) + auto& e = entries[pos & h->sub_ring_mask]; + uint64_t seq = e.sequence.load(std::memory_order_acquire); + + // Case A: explicitly locked, never committed. + // Case B: more than one full wrap behind — stale from a + // publisher that crashed before the CAS lock. 
+ if (seq == LOCKED_SEQUENCE or seq + cap < pos + 1) { ++report.locked_entries; } @@ -313,22 +318,40 @@ namespace kickmsg } for (uint64_t pos = start; pos < wp; ++pos) { - auto& e = entries[pos & h->sub_ring_mask]; - uint64_t seq = e.sequence.load(std::memory_order_acquire); + auto& e = entries[pos & h->sub_ring_mask]; + uint64_t seq = e.sequence.load(std::memory_order_acquire); + uint64_t expected = pos + 1; if (seq == LOCKED_SEQUENCE) { - // The crashed publisher may have written garbage into - // slot_idx/payload_len. Mark the entry as having no + // Case A: publisher CAS'd Unset → LOCKED_SEQUENCE then + // crashed before the release-store of (pos + 1). The + // crashed publisher may have written garbage into + // slot_idx/payload_len. Mark the entry as having no // valid slot so subscribers skip it and future evictions // don't release a stale index. e.slot_idx.store(INVALID_SLOT, std::memory_order_relaxed); e.payload_len.store(0, std::memory_order_relaxed); - - // Commit with the sequence future publishers expect: - // pos + 1 (not prev_seq). A publisher wrapping to this - // position will CAS(pos + 1 → LOCKED), which now succeeds. - e.sequence.store(pos + 1, std::memory_order_release); + e.sequence.store(expected, std::memory_order_release); + ++repaired; + } + else if (seq + cap < expected) + { + // Case B: publisher claimed write_pos (fetch_add) then + // crashed before the CAS lock. The entry still carries + // its committed sequence from a previous wrap — more + // than one full ring revolution behind. No live + // publisher can be mid-commit for longer than one wrap + // (the commit path is a few instructions between + // fetch_add and the CAS), so > 1 wrap behind is + // definitively stale. + // + // slot_idx may reference a still-live slot from the + // previous cycle; mark INVALID_SLOT to avoid a double + // release. 
+ e.slot_idx.store(INVALID_SLOT, std::memory_order_relaxed); + e.payload_len.store(0, std::memory_order_relaxed); + e.sequence.store(expected, std::memory_order_release); ++repaired; } } diff --git a/src/Subscriber.cc b/src/Subscriber.cc index 0d39004..076b3d2 100644 --- a/src/Subscriber.cc +++ b/src/Subscriber.cc @@ -145,7 +145,7 @@ namespace kickmsg start_pos_ = other.start_pos_; read_pos_ = other.read_pos_; lost_ = other.lost_; - drain_timeouts_ += other.drain_timeouts_; + drain_timeouts_ = other.drain_timeouts_; recv_buf_ = std::move(other.recv_buf_); other.ring_idx_ = UINT32_MAX; diff --git a/tests/unit/publisher-t.cc b/tests/unit/publisher-t.cc index 663715a..2a92767 100644 --- a/tests/unit/publisher-t.cc +++ b/tests/unit/publisher-t.cc @@ -180,3 +180,174 @@ TEST_F(PublisherTest, MultipleSubscribersEachReceive) EXPECT_EQ(r1, 7u); EXPECT_EQ(r2, 7u); } + +// --------------------------------------------------------------------------- +// Publisher self-repair: when a publisher hits a stuck entry (timeout + +// CAS lock failure), it heals the entry in place so the next publisher +// at that position succeeds without paying the timeout. +// --------------------------------------------------------------------------- + +TEST_F(PublisherTest, SelfRepairCaseA_LockedSequence) +{ + // Simulate Case A: a publisher CAS-locked an entry (LOCKED_SEQUENCE) + // then crashed before committing. The next publisher that wraps to + // this position should: + // 1. time out in wait_and_capture_slot + // 2. fail the CAS lock (entry is LOCKED_SEQUENCE, not prev_seq) + // 3. self-repair the entry (advance seq to expected) + // 4. drop delivery for this wrap + // After that, the NEXT publisher at this position should succeed + // without any timeout. 
+ + kickmsg::channel::Config cfg; + cfg.max_subscribers = 1; + cfg.sub_ring_capacity = 4; // capacity = 4 + cfg.pool_size = 16; + cfg.max_payload_size = 8; + cfg.commit_timeout = std::chrono::microseconds{1000}; // 1 ms — fast test + + auto region = kickmsg::SharedRegion::create(SHM_NAME, kickmsg::channel::PubSub, cfg); + + kickmsg::Subscriber sub(region); + kickmsg::Publisher pub(region); + + // Publish 4 messages (pos 0-3), consume all. + for (int i = 0; i < 4; ++i) + { + uint32_t val = static_cast(i); + ASSERT_GE(pub.send(&val, sizeof(val)), 0); + auto s = sub.try_receive(); + ASSERT_TRUE(s.has_value()); + } + + // Poison entry at idx=0: simulate a publisher that locked pos=4 then + // crashed. write_pos is already 4 (from the 4 publishes above). + auto* ring = kickmsg::sub_ring_at(region.base(), region.header(), 0); + auto* entries = kickmsg::ring_entries(ring); + + // Advance write_pos past pos=4 so the ring has wrapped. + ring->write_pos.store(5, std::memory_order_release); + // Lock entry at idx=0 as if a publisher crashed mid-commit at pos=4. + entries[0].sequence.store(kickmsg::LOCKED_SEQUENCE, + std::memory_order_release); + + // Next publish: pos=5 → idx=1 (clean entry, succeeds). + uint32_t val = 100; + ASSERT_GE(pub.send(&val, sizeof(val)), 0); + + // pos=6 → idx=2 (clean), pos=7 → idx=3 (clean). + val = 101; + ASSERT_GE(pub.send(&val, sizeof(val)), 0); + val = 102; + ASSERT_GE(pub.send(&val, sizeof(val)), 0); + + // pos=8 → idx=0 → POISONED. Publisher times out + drops + self-repairs. + auto before = std::chrono::steady_clock::now(); + val = 103; + pub.send(&val, sizeof(val)); // may drop, but should NOT hang forever + auto after = std::chrono::steady_clock::now(); + + // Sanity: the timeout should have fired (took ~1 ms, not zero). 
+ auto elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>( + after - before).count(); + EXPECT_GE(elapsed_us, 500) + << "Publisher should have waited ~1 ms for the stuck entry"; + + // After self-repair, entry at idx=0 should be advanced. + // The entry was stuck at LOCKED_SEQUENCE for pos=4. The publisher at + // pos=8 expects seq=5 and should have repaired it to 9 (pos=8 + 1). + uint64_t seq0 = entries[0].sequence.load(std::memory_order_acquire); + EXPECT_NE(seq0, kickmsg::LOCKED_SEQUENCE) + << "Self-repair should have advanced the stuck entry"; + + // Now the NEXT publish at idx=0 (pos=12) should succeed WITHOUT timeout. + // First, fill positions 9, 10, 11 (idx 1, 2, 3). + for (int i = 0; i < 3; ++i) + { + val = static_cast<uint32_t>(200 + i); + ASSERT_GE(pub.send(&val, sizeof(val)), 0); + } + + // pos=12 → idx=0. If self-repair worked, this should be fast. + before = std::chrono::steady_clock::now(); + val = 203; + ASSERT_GE(pub.send(&val, sizeof(val)), 0); + after = std::chrono::steady_clock::now(); + + elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>( + after - before).count(); + EXPECT_LT(elapsed_us, 500) + << "After self-repair, the next publisher at this position should " + "succeed instantly — not wait for another timeout"; +} + +TEST_F(PublisherTest, SelfRepairCaseB_StaleEntry) +{ + // Simulate Case B: a publisher claimed write_pos (fetch_add) but + // crashed before CAS-locking the entry. The entry still has the + // old committed sequence from a previous wrap. After > 1 wrap, the + // publishing publisher should self-repair the stale entry. + + kickmsg::channel::Config cfg; + cfg.max_subscribers = 1; + cfg.sub_ring_capacity = 4; + cfg.pool_size = 16; + cfg.max_payload_size = 8; + cfg.commit_timeout = std::chrono::microseconds{1000}; + + auto region = kickmsg::SharedRegion::create(SHM_NAME, kickmsg::channel::PubSub, cfg); + + kickmsg::Subscriber sub(region); + kickmsg::Publisher pub(region); + + // Publish 4 messages (pos 0-3), consume all. 
+ for (int i = 0; i < 4; ++i) + { + uint32_t val = static_cast<uint32_t>(i); + ASSERT_GE(pub.send(&val, sizeof(val)), 0); + auto s = sub.try_receive(); + ASSERT_TRUE(s.has_value()); + } + + // Entry at idx=0 has seq=1 (committed for pos=0). + auto* ring = kickmsg::sub_ring_at(region.base(), region.header(), 0); + auto* entries = kickmsg::ring_entries(ring); + + // Simulate: publisher claimed pos=4 (fetch_add) but crashed before + // touching the entry. Advance write_pos by TWO full wraps so the + // entry at idx=0 is > 1 wrap stale. + ring->write_pos.store(12, std::memory_order_release); + + // Entry idx=0 still has seq=1. A publisher at pos=8 (idx=0) expects + // seq=5. seq=1 + cap=4 = 5 which is NOT < 5, so it needs to be + // strictly more than one wrap. At pos=12, expected=9. 1+4=5 < 9 → stale. + + // Publish at pos=12 → idx=0: should timeout + self-repair + drop. + uint32_t val = 999; + pub.send(&val, sizeof(val)); // drops at idx=0 but self-repairs + + // Verify the entry was repaired. + uint64_t seq0 = entries[0].sequence.load(std::memory_order_acquire); + EXPECT_GT(seq0, 1u) + << "Self-repair should have advanced the stale entry past seq=1"; + + // Next publisher at this position should succeed quickly. + // Advance write_pos to 16 so pos=16 targets idx=0. + // But we need to fill pos 13, 14, 15 first (idx 1, 2, 3). 
+ for (int i = 0; i < 3; ++i) + { + val = static_cast<uint32_t>(300 + i); + pub.send(&val, sizeof(val)); + } + + auto before = std::chrono::steady_clock::now(); + val = 303; + pub.send(&val, sizeof(val)); // pos=16 → idx=0, should be fast + auto after = std::chrono::steady_clock::now(); + + auto elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>( + after - before).count(); + EXPECT_LT(elapsed_us, 500) + << "After self-repair of stale entry, next publisher should succeed " + "without timeout"; +} diff --git a/tests/unit/region-t.cc b/tests/unit/region-t.cc index 704b9ae..4415f3e 100644 --- a/tests/unit/region-t.cc +++ b/tests/unit/region-t.cc @@ -280,6 +280,72 @@ TEST_F(RegionTest, RepairLockedEntryUnblocksPublishing) EXPECT_GT(received, 0); } +TEST_F(RegionTest, RepairStaleEntryFromCrashedPublisherBeforeCasLock) +{ + // Case B: publisher claimed write_pos (fetch_add) but crashed before + // CAS-locking the entry. The entry still has the committed sequence + // from the previous wrap. After more than one full wrap, the entry + // is detectably stale (> 1 ring revolution behind) and + // repair_locked_entries() should advance it. + + kickmsg::channel::Config cfg; + cfg.max_subscribers = 1; + cfg.sub_ring_capacity = 4; // capacity = 4 + cfg.pool_size = 16; + cfg.max_payload_size = 8; + + auto region = kickmsg::SharedRegion::create(SHM_NAME, kickmsg::channel::PubSub, cfg); + + kickmsg::Subscriber sub(region); + kickmsg::Publisher pub(region); + + // Fill the ring once: publish 4 messages (pos 0-3), consuming all. + for (int i = 0; i < 4; ++i) + { + uint32_t val = static_cast<uint32_t>(i); + ASSERT_GE(pub.send(&val, sizeof(val)), 0); + auto s = sub.try_receive(); + ASSERT_TRUE(s.has_value()); + } + + // Entry at idx=0 now has seq=1 (committed for pos=0). + auto* ring = kickmsg::sub_ring_at(region.base(), region.header(), 0); + auto* entries = kickmsg::ring_entries(ring); + + // Simulate: a publisher claimed pos=4 (fetch_add) targeting idx=0, + // then crashed before the CAS lock. 
The entry stays at seq=1. + // Advance write_pos past pos=4 by TWO more full wraps so the entry + // becomes > 1 wrap stale. + // write_pos after the 4 real publishes is 4. Set it to 4 + 2*cap = 12. + ring->write_pos.store(12, std::memory_order_release); + // Don't touch entries — they keep their old sequences. Entry idx=0 + // has seq=1, but expected seq at pos=8 (the slot in the scan window) + // is 9. (pos=8 maps to idx=0 because 8 & 3 = 0.) 1 + 4 < 9 → stale. + + auto report = region.diagnose(); + EXPECT_GT(report.locked_entries, 0u) + << "diagnose() should detect the stale entry (Case B)"; + + std::size_t repaired = region.repair_locked_entries(); + EXPECT_GT(repaired, 0u) + << "repair_locked_entries() should advance the stale entry"; + + // After repair, the entry at idx=0 should have seq = expected. + // The expected pos for idx=0 in the window [12-4, 12) = [8, 12) is pos=8. + uint64_t seq0 = entries[0].sequence.load(std::memory_order_acquire); + EXPECT_EQ(seq0, 9u) // pos=8 → expected = 8 + 1 = 9 + << "Stale entry should be advanced to pos + 1"; + + // Publishing should now succeed past the repaired slot. + for (int i = 0; i < 8; ++i) + { + uint32_t val = static_cast<uint32_t>(100 + i); + ASSERT_GE(pub.send(&val, sizeof(val)), 0) + << "Publishing failed at iteration " << i + << " — repaired entry may still be stuck"; + } +} + TEST_F(RegionTest, RepairLockedEntryAtPositionZero) { // Edge case: crash at pos=0 where prev_seq was 0. diff --git a/tests/unit/subscriber-t.cc b/tests/unit/subscriber-t.cc index 7b45432..2634a1a 100644 --- a/tests/unit/subscriber-t.cc +++ b/tests/unit/subscriber-t.cc @@ -320,9 +320,11 @@ TEST_F(SubscriberTest, StuckPublisherCausesDrainTimeout) TEST_F(SubscriberTest, DrainTimeoutsCounterIncrementsOnTimeout) { - // Verify drain_timeouts() by using move-assignment: the old ring - // is released (triggering the timeout), and the counter is observable - // on the surviving object. 
+ // Verify drain_timeouts() increments when a subscriber's ring release + // times out (crashed publisher with stuck in_flight). After move- + // assignment, the counter reflects the NEW ring's history (= 0), not + // the old ring's — drain_timeouts is per-ring, not per-object. The + // old ring's timeout is lost with its identity. kickmsg::channel::Config cfg; cfg.max_subscribers = 2; @@ -335,30 +337,34 @@ TEST_F(SubscriberTest, DrainTimeoutsCounterIncrementsOnTimeout) kickmsg::Publisher pub(region); - // sub takes ring 0 + // sub takes ring 0. kickmsg::Subscriber sub(region); uint32_t val = 1; ASSERT_GE(pub.send(&val, sizeof(val)), 0); - EXPECT_EQ(sub.drain_timeouts(), 0u); - // Inflate in_flight on ring 0 to simulate crash + // Inflate in_flight on ring 0 to simulate a crashed publisher. auto* ring0 = kickmsg::sub_ring_at(region.base(), region.header(), 0); ring0->state_flight.fetch_add(kickmsg::ring::IN_FLIGHT_ONE, std::memory_order_acq_rel); // Move-assign from a fresh subscriber (ring 1). - // This triggers release_ring() on the OLD ring (ring 0, stuck). - // The timeout fires, drain_timeouts_ increments, and the counter - // is preserved because the object survives the move. + // release_ring() fires on ring 0 → times out (in_flight stuck). + // The timeout increments drain_timeouts_ on `this`, but then the + // move-assignment overwrites it with `fresh.drain_timeouts()` (= 0) + // because the counter tracks the NEW ring's identity, not the old one. kickmsg::Subscriber fresh(region); // takes ring 1 sub = std::move(fresh); - // sub is alive and now owns ring 1. drain_timeouts should be 1 - // from the timed-out release of ring 0. - EXPECT_EQ(sub.drain_timeouts(), 1u); + // sub now owns ring 1, which has had zero drain timeouts. + EXPECT_EQ(sub.drain_timeouts(), 0u); + + // The timed-out release of ring 0 is observable through the health + // report: ring 0 is now Free with stale in_flight (retired ring). 
+ auto report = region.diagnose(); + EXPECT_GE(report.retired_rings, 1u); - // Recovery + // Recovery. region.reset_retired_rings(); }