Merged
73 changes: 73 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -96,3 +96,76 @@ jobs:
- name: Verify ARM binaries
run: file build_arm/kickmsg_unit build_arm/kickmsg_stress_test | grep "ARM aarch64"
shell: bash

# Build Python wheels across the supported platforms. Runs on every
# push so wheel-build regressions surface immediately; only the tagged
# release job below actually uploads to PyPI.
build-wheels:
name: Build Python Wheels (${{ matrix.os }})
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-24.04, ubuntu-24.04-arm, macos-14]
env:
# `native`: the runner's own architecture. Linux x86_64 runner →
# manylinux_x86_64 wheels; Linux ARM runner → manylinux_aarch64;
# macOS 14 runner (Apple Silicon) → universal2 via the macOS cibw
# config in pyproject.toml.
CIBW_ARCHS: native
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # so tools/setup/version.sh can read tags

- name: Build wheels
uses: pypa/cibuildwheel@v3.1.0
with:
output-dir: wheelhouse

- uses: actions/upload-artifact@v4
with:
name: wheelhouse-kickmsg-${{ matrix.os }}
path: wheelhouse/*.whl

# Publish to PyPI on tag push. Uses OIDC trusted publishing (configured
# per-repo at https://pypi.org/manage/account/publishing/ — the `kickmsg`
# PyPI project must list this repository + environment `maintainer` as
# a trusted publisher). No API tokens stored in GitHub secrets.
release:
needs: [build, cross_arm64, build-wheels]
if: startsWith(github.ref, 'refs/tags/')
runs-on: ubuntu-24.04
environment:
name: maintainer
permissions:
contents: write
id-token: write
steps:
- uses: actions/checkout@v4

- name: Download wheels
uses: actions/download-artifact@v4
with:
pattern: wheelhouse-kickmsg-*
path: wheelhouse
merge-multiple: true

- name: Create GitHub release
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
tag: ${{ github.ref_name }}
run: |
gh release create "$tag" wheelhouse/*.whl \
--repo="$GITHUB_REPOSITORY" \
--title="${GITHUB_REPOSITORY#*/} ${tag#v}" \
--generate-notes

- name: Publish wheels to PyPI
uses: pypa/gh-action-pypi-publish@v1.12.4
with:
packages-dir: wheelhouse
verify-metadata: true
attestations: true
print-hash: true
skip-existing: true
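The workflow comment points at "the macOS cibw config in pyproject.toml" for the universal2 wheels. That section is not part of this diff, so the following is only an illustrative sketch of what a typical cibuildwheel macOS section looks like, not the repo's actual file:

```toml
[tool.cibuildwheel.macos]
# Build one fat wheel covering arm64 + x86_64 on the Apple Silicon runner.
archs = ["universal2"]
```

With `CIBW_ARCHS: native` set in the workflow env, the Linux runners still build only their own architecture; an `archs` override like this one applies to the macOS runner only.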
4 changes: 4 additions & 0 deletions .gitignore
@@ -9,3 +9,7 @@ build*/
compile_commands.json
.clangd
__pycache__/
*.egg-info/
.pytest_cache/
wheelhouse/
dist/
42 changes: 31 additions & 11 deletions ARCHITECTURE.md
@@ -678,16 +678,30 @@ After refcount pre-set, during Refcount was set to max_subs but
are never released. The slot is
permanently leaked.

After CAS on write_pos, before Entry is uncommitted (sequence
sequence store (the dangerous never written). Next publisher at
window) the same ring position waits up to
commit_timeout, then overwrites
the entry with its own data. The
ring entry itself is NOT leaked
(it is overwritten). The pool slot
After CAS on write_pos, before Two sub-cases depending on whether
sequence store (the dangerous the publisher reached the CAS lock:
window)
Case A — crash after CAS lock
(entry stuck at LOCKED_SEQUENCE):
Next publisher at this position
waits commit_timeout and drops.
repair_locked_entries() advances
the entry to the expected sequence.

Case B — crash before CAS lock
(entry still at the previous
cycle's committed sequence):
Next publisher at this position
also waits commit_timeout and drops.
repair_locked_entries() detects the
entry is more than one full wrap
behind and advances it.

In both cases, the pool slot
referenced by the crashed entry
cannot be safely released (slot_idx
may be garbage), so it is leaked.
may be garbage); it is marked
INVALID_SLOT by the repair and
recovered by reclaim_orphaned_slots().
Subscriber sees a gap (lost msg).

After sequence store No issue. Entry is committed.
@@ -715,8 +729,14 @@ publisher is mid-commit on that entry and will release shortly.

On timeout (returns `INVALID_SLOT`), the publisher:
1. Skips `release_slot()` (the old `slot_idx` may be garbage)
2. Overwrites the entry with its own data via the two-phase commit
3. The ring resumes normal operation
2. Attempts the CAS lock — if it fails, the publisher **self-repairs**
the stuck entry in place (stores `INVALID_SLOT` + the expected
sequence) so the next publisher at this position succeeds without
paying the timeout. Self-repair handles both Case A (LOCKED) and
Case B (stale), costs ~10 ns on top of the already-spent timeout,
and is safe under live traffic (idempotent stores).
3. Drops delivery for this ring and moves to the next subscriber ring.
The ring resumes normal operation on the next wrap.

The timeout is configurable per channel via `channel::Config::commit_timeout`
(default: 100 ms). The tradeoff:
5 changes: 5 additions & 0 deletions CMakeLists.txt
@@ -82,3 +82,8 @@ if(BUILD_BENCHMARKS)
find_package(benchmark REQUIRED CONFIG)
add_subdirectory(benchmarks)
endif()

# --- Python bindings (built only when invoked via scikit-build-core) ---
if(SKBUILD)
add_subdirectory(py_bindings)
endif()
3 changes: 3 additions & 0 deletions cmake/toolchain.cmake.template
@@ -2,3 +2,6 @@ set(CMAKE_SYSTEM_NAME @SYSTEM_NAME@)

set(CMAKE_C_COMPILER @BINARY_PATH_CC@)
set(CMAKE_CXX_COMPILER @BINARY_PATH_CXX@)

set(CMAKE_C_FLAGS_INIT "@ARCH_FLAGS@")
set(CMAKE_CXX_FLAGS_INIT "@ARCH_FLAGS@")
86 changes: 86 additions & 0 deletions examples/python/hello_camera_zerocopy.py
@@ -0,0 +1,86 @@
"""Zero-copy camera-frame publishing — the use case the Python bindings'
AllocatedSlot / SampleView buffer-protocol path was designed for.

Shows how to:
1. Reserve a slot directly in shared memory via Publisher.allocate(size),
then obtain a writable memoryview with `memoryview(slot)` and fill it
in-place (no intermediate buffer). In a real pipeline you'd:
- numpy.copyto(np.asarray(slot).reshape(H, W, 3), frame), or
- DMA from a V4L2 buffer into the slot, or
- render directly into the slot.
Then call `slot.publish()` to commit.
2. Receive it zero-copy on the other side via Subscriber.try_receive_view
and `memoryview(view)` — read-only, no copy either. The memoryview
pins the SampleView alive so the slot stays valid until every
consumer releases the view.

For a real camera, swap `fill_frame` for your actual capture.
"""

from __future__ import annotations

import kickmsg


def fill_frame(buf: memoryview, height: int, width: int) -> None:
"""Stand-in for a real capture — writes a cheap pattern."""
stride = width * 3
for row in range(height):
for col in range(width):
off = row * stride + col * 3
buf[off] = row & 0xFF # R
buf[off + 1] = col & 0xFF # G
buf[off + 2] = (row + col) & 0xFF # B


def main() -> int:
width, height = 320, 240
frame_bytes = width * height * 3 # RGB

cfg = kickmsg.Config()
cfg.max_subscribers = 2
cfg.sub_ring_capacity = 4
cfg.pool_size = 8
cfg.max_payload_size = frame_bytes

camera = kickmsg.Node("camera", "demo_cam")
viewer = kickmsg.Node("viewer", "demo_cam")
camera.unlink_topic("frames")

pub = camera.advertise("frames", cfg)
sub = viewer.subscribe("frames")

for i in range(3):
# Zero-copy capture: reserve a slot, write directly into SHM via
# a memoryview, publish. Nothing is copied on the publish side.
slot = pub.allocate(frame_bytes)
if slot is None:
print(f"frame {i}: pool exhausted, dropping")
continue
fill_frame(memoryview(slot), height, width)
slot.publish()
print(f"Published frame {i} ({width}x{height} RGB, {frame_bytes} B zero-copy)")

for i in range(3):
view = sub.try_receive_view()
if view is None:
break
# `with` releases the pin on block exit, even on exception.
# Without this, the slot stays pinned until the SampleView and
# every derived memoryview are GC'd — the pool could run dry.
with view:
mv = memoryview(view) # read-only, zero-copy
print(f"Received frame {i}: {len(view)} B, "
f"pixel[0,0]=RGB({mv[0]},{mv[1]},{mv[2]}), "
f"pixel[{height-1},{width-1}]="
f"RGB({mv[(height-1)*width*3 + (width-1)*3]},"
f"{mv[(height-1)*width*3 + (width-1)*3 + 1]},"
f"{mv[(height-1)*width*3 + (width-1)*3 + 2]})")

camera.unlink_topic("frames")
print("Done.")
return 0


if __name__ == "__main__":
raise SystemExit(main())
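The in-place write that `fill_frame` relies on is plain buffer-protocol behavior and can be demonstrated without kickmsg at all; here a `bytearray` stands in for the shared-memory slot:

```python
def write_pixel(buf: memoryview, width: int, row: int, col: int,
                rgb: tuple[int, int, int]) -> None:
    # Same layout as fill_frame above: 3 bytes per pixel, row-major.
    off = (row * width + col) * 3
    buf[off:off + 3] = bytes(rgb)


backing = bytearray(4 * 4 * 3)               # stands in for the SHM slot
write_pixel(memoryview(backing), 4, 1, 2, (10, 20, 30))
off = (1 * 4 + 2) * 3
print(bytes(backing[off:off + 3]))           # mutated in place, no copy
```

Writing through the memoryview mutates `backing` directly; with an `AllocatedSlot` the same assignment lands straight in shared memory.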
38 changes: 38 additions & 0 deletions examples/python/hello_pubsub.py
@@ -0,0 +1,38 @@
"""Basic pub/sub via the Node API — Python counterpart of examples/hello_pubsub.cc."""

from __future__ import annotations

import struct

import kickmsg


def main() -> int:
prefix = "demo_py"
topic = "temperature"

sensor = kickmsg.Node("sensor", prefix)
display = kickmsg.Node("display", prefix)

# Clean any leftover SHM from prior runs.
sensor.unlink_topic(topic)

pub = sensor.advertise(topic)
sub = display.subscribe(topic)

readings = [(1, 22.5), (2, 19.8), (1, 23.1), (3, 31.4), (2, 20.0)]
for sensor_id, celsius in readings:
# Pack as (uint32 id, float celsius) — same layout as the C++ example.
pub.send(struct.pack("<If", sensor_id, celsius))

while (sample := sub.try_receive()) is not None:
sensor_id, celsius = struct.unpack("<If", sample)
print(f"Sensor {sensor_id}: {celsius:.1f} C")

sensor.unlink_topic(topic)
print("Done.")
return 0


if __name__ == "__main__":
raise SystemExit(main())
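The `<If` layout in the example is 8 bytes: a little-endian uint32 followed by a float32. A quick round-trip check also shows why the example formats with `:.1f`: 22.5 is exactly representable in float32, but a value like 19.8 comes back as the nearest float32 and needs rounding for display.

```python
import struct

packed = struct.pack("<If", 1, 22.5)
assert struct.calcsize("<If") == 8 and len(packed) == 8

sensor_id, celsius = struct.unpack("<If", packed)
print(sensor_id, celsius)   # → 1 22.5  (22.5 is exact in float32)

# 19.8 is not exactly representable; unpack returns the nearest float32.
(_, approx) = struct.unpack("<If", struct.pack("<If", 2, 19.8))
print(f"{approx:.1f}")      # → 19.8 after rounding for display
```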
63 changes: 63 additions & 0 deletions examples/python/hello_schema.py
@@ -0,0 +1,63 @@
"""Schema descriptor in Python — counterpart of examples/hello_schema.cc."""

from __future__ import annotations

import kickmsg


def make_imu_schema(version: int) -> kickmsg.SchemaInfo:
info = kickmsg.SchemaInfo()
info.identity = kickmsg.hash.identity_from_fnv1a(
"demo.Imu(timestamp_ns:u64, ax:f32, ay:f32, az:f32)"
)
info.name = "demo/Imu"
info.version = version
info.identity_algo = 1 # user tag — e.g. 1 = FNV-1a-64
return info


def main() -> int:
prefix = "demo_py_schema"
topic = "imu"

# Publisher node bakes a v2 schema into the region at create time.
pub_node = kickmsg.Node("driver", prefix)
pub_node.unlink_topic(topic)

cfg = kickmsg.Config()
cfg.max_subscribers = 4
cfg.sub_ring_capacity = 8
cfg.pool_size = 16
cfg.max_payload_size = 32
cfg.schema = make_imu_schema(version=2)

_ = pub_node.advertise(topic, cfg)
print("[driver] advertised '%s' with schema %s v%d" % (
topic, cfg.schema.name, cfg.schema.version))

# Good subscriber: expects v2 → matches → proceeds.
good = kickmsg.Node("good_sub", prefix)
_ = good.subscribe(topic)
got = good.topic_schema(topic)
expected = make_imu_schema(version=2)
d = kickmsg.schema.diff(got, expected)
print("[good_sub] observed %s v%d — diff=0x%x %s" % (
got.name, got.version, d, "OK" if d == kickmsg.schema.Diff.Equal else "MISMATCH"))

# Bad subscriber: expects v1 → diff reports Version mismatch → refuses.
bad = kickmsg.Node("bad_sub", prefix)
_ = bad.subscribe(topic)
got = bad.topic_schema(topic)
expected_v1 = make_imu_schema(version=1)
d = kickmsg.schema.diff(got, expected_v1)
if d & kickmsg.schema.Diff.Version:
print("[bad_sub] version mismatch (observed v%d, expected v1) — refusing"
% got.version)

pub_node.unlink_topic(topic)
print("Done.")
return 0


if __name__ == "__main__":
raise SystemExit(main())
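The `identity_from_fnv1a` call above hashes the schema's signature string. FNV-1a-64 itself is tiny; a pure-Python reference is shown below (the binding presumably computes this natively, so treat this as a spec of the algorithm, not kickmsg's code):

```python
FNV64_OFFSET = 0xCBF29CE484222325
FNV64_PRIME = 0x100000001B3


def fnv1a_64(data: bytes) -> int:
    """Reference FNV-1a 64-bit hash: XOR each byte in, then multiply."""
    h = FNV64_OFFSET
    for b in data:
        h = ((h ^ b) * FNV64_PRIME) & 0xFFFFFFFFFFFFFFFF
    return h


identity = fnv1a_64(b"demo.Imu(timestamp_ns:u64, ax:f32, ay:f32, az:f32)")
print(f"0x{identity:016x}")
```

Because the hash covers the full field list, any change to a field name or type yields a new identity, which is what lets `kickmsg.schema.diff` flag structural mismatches independently of the version number.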