Skip to content

Commit 90f2a7d

Browse files
committed
Add Redis integration for BabelQueue with Python 3.12 support
1 parent bdeda0e commit 90f2a7d

10 files changed

Lines changed: 638 additions & 13 deletions

File tree

.github/workflows/ci.yml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,34 @@ jobs:
3131
3232
- name: Run tests
3333
run: pytest
34+
35+
integration:
36+
name: Redis integration
37+
runs-on: ubuntu-latest
38+
services:
39+
redis:
40+
image: redis:7
41+
ports:
42+
- 6379:6379
43+
options: >-
44+
--health-cmd "redis-cli ping"
45+
--health-interval 5s
46+
--health-timeout 3s
47+
--health-retries 10
48+
steps:
49+
- uses: actions/checkout@v4
50+
51+
- name: Setup Python
52+
uses: actions/setup-python@v5
53+
with:
54+
python-version: '3.12'
55+
56+
- name: Install (with redis extra)
57+
run: |
58+
python -m pip install --upgrade pip
59+
pip install -e ".[redis,dev]"
60+
61+
- name: Run tests (Redis transport included)
62+
env:
63+
BABELQUEUE_TEST_REDIS: redis://localhost:6379/0
64+
run: pytest

CHANGELOG.md

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,18 @@ The envelope wire format is versioned separately by `meta.schema_version`
99

1010
## [Unreleased]
1111

12+
### Added
13+
- **Runtime**`BabelQueue(broker_url=...)` app with a `@app.handler("urn:...")`
14+
decorator, `publish()`, and a `consume()` / `run()` loop. Routes by URN over the
15+
canonical envelope; `attempts`-based retry → opt-in dead-letter queue;
16+
`on_unknown_urn` strategies (`fail`/`delete`/`release`/`dead_letter`).
17+
- **Transports** — a pluggable `Transport` abstraction with `InMemoryTransport`
18+
(`memory://`, for tests/local) and `RedisTransport` (`redis://`, reliable-queue
19+
pattern via `BLMOVE` + a processing list). Redis client is an optional extra
20+
(`pip install "babelqueue[redis]"`), imported lazily — the core stays zero-dep.
21+
22+
## [0.1.0] - 2026-06-06
23+
1224
### Added
1325
- `EnvelopeCodec` — builds (`make`, `from_message`), encodes and decodes the
1426
canonical `{job, trace_id, data, meta, attempts}` envelope (`schema_version` 1).
@@ -22,9 +34,7 @@ The envelope wire format is versioned separately by `meta.schema_version`
2234

2335
### Notes
2436
- Pre-1.0: the public API may change before the `1.0.0` tag.
25-
- **Zero runtime dependencies** (standard library only). Requires Python `>=3.9`.
26-
- This is the framework-agnostic **core**. The broker runtime
27-
(`BabelQueue(broker_url=...)` + `@app.handler`, over `redis`/`pika`) and the
28-
Celery/Django adapters are planned next iterations, built on this core.
37+
- The core has **zero runtime dependencies** (standard library only); Python `>=3.9`.
2938

30-
[Unreleased]: https://github.com/BabelQueue/babelqueue-python/commits/main
39+
[Unreleased]: https://github.com/BabelQueue/babelqueue-python/compare/v0.1.0...HEAD
40+
[0.1.0]: https://github.com/BabelQueue/babelqueue-python/releases/tag/v0.1.0

README.md

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,48 @@ dlq = dead_letter.annotate(envelope, "failed", "orders", attempts=3, error="boom
8080
# publish `EnvelopeCodec.encode(dlq)` to the "orders.dlq" queue
8181
```
8282

83-
## What's here vs. coming
83+
## Runtime — produce & consume
8484

85-
- **Now (this package):** the codec, contracts, dead-letter and unknown-URN
86-
helpers, plus the shared conformance fixtures. Bring your own broker client.
87-
- **Next (planned):** a built-in runtime — `BabelQueue(broker_url=...)` with an
88-
`@app.handler("urn:…")` decorator over `redis`/`pika` — and **Celery** / **Django**
89-
adapters. Install via extras (`babelqueue[redis]`, `babelqueue[celery]`, …).
85+
For an end-to-end app, use `BabelQueue` with a broker. Redis support comes via an
86+
extra:
87+
88+
```bash
89+
pip install "babelqueue[redis]"
90+
```
91+
92+
```python
93+
from babelqueue import BabelQueue
94+
95+
app = BabelQueue("redis://localhost:6379/0", queue="orders")
96+
97+
@app.handler("urn:babel:orders:created")
98+
def on_order_created(data, meta): # AI/ML, data processing, anything
99+
print("order", data["order_id"])
100+
101+
# producer (any service, any language) …
102+
app.publish("urn:babel:orders:created", {"order_id": 1042})
103+
104+
# worker
105+
app.run() # consume forever (Ctrl-C to stop)
106+
```
107+
108+
- **Routing** is by URN; the wire format is the canonical envelope, so this
109+
consumes messages produced by *any* BabelQueue SDK.
110+
- **Handlers** receive `(data, meta)`, or `(data, meta, message)` to get the full
111+
envelope (incl. `trace_id`).
112+
- **Retry & dead-letter:** failures are retried up to `max_attempts` (bumping the
113+
envelope's `attempts`); enable `dead_letter=True` to quarantine exhausted
114+
messages on `<queue>.dlq`. `on_unknown_urn` = `fail` | `delete` | `release` | `dead_letter`.
115+
- **Transports:** `redis://` (reliable-queue pattern) and `memory://` (in-process,
116+
great for tests/local). Bring your own by passing `transport=...`.
117+
118+
> RabbitMQ (`pika`) and **Celery** / **Django** adapters are the next iterations.
119+
120+
## What's here
121+
122+
The codec/contracts/dead-letter (zero-dep core) **and** the `BabelQueue` runtime
123+
above (in-memory built in; Redis via the `[redis]` extra). For framework
124+
integration, the Celery and Django adapters are planned.
90125

91126
## Testing
92127

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "babelqueue"
7-
version = "0.1.0"
7+
version = "0.2.0"
88
description = "Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers."
99
readme = "README.md"
1010
requires-python = ">=3.9"

src/babelqueue/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,26 @@
1212
from __future__ import annotations
1313

1414
from . import dead_letter
15+
from .app import BabelQueue
1516
from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec
1617
from .contracts import HasTraceId, PolyglotMessage
1718
from .exceptions import BabelQueueError, UnknownUrnError
1819
from .routing import UnknownUrnStrategy
20+
from .transport import InMemoryTransport, ReceivedMessage, Transport
1921

20-
__version__ = "0.1.0"
22+
__version__ = "0.2.0"
2123

2224
__all__ = [
25+
"BabelQueue",
2326
"EnvelopeCodec",
2427
"SCHEMA_VERSION",
2528
"SOURCE_LANG",
2629
"PolyglotMessage",
2730
"HasTraceId",
2831
"UnknownUrnStrategy",
32+
"Transport",
33+
"InMemoryTransport",
34+
"ReceivedMessage",
2935
"BabelQueueError",
3036
"UnknownUrnError",
3137
"dead_letter",

src/babelqueue/app.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
"""The BabelQueue runtime: produce and consume polyglot messages.
2+
3+
from babelqueue import BabelQueue
4+
5+
app = BabelQueue("redis://localhost:6379/0", queue="orders")
6+
7+
@app.handler("urn:babel:orders:created")
8+
def on_order_created(data, meta):
9+
... # AI/ML, data processing, anything
10+
11+
app.publish("urn:babel:orders:created", {"order_id": 1042})
12+
app.run() # consume forever
13+
14+
Routing is by URN; the wire format is the canonical envelope (shared core codec),
15+
so this interoperates with the PHP/Laravel, Symfony, Go, ... SDKs. Retry uses the
16+
top-level ``attempts`` counter; failures past ``max_attempts`` go to a dead-letter
17+
queue when enabled.
18+
"""
19+
20+
from __future__ import annotations
21+
22+
import inspect
23+
from typing import Any, Callable, Dict, Mapping, Optional
24+
25+
from . import dead_letter
26+
from .codec import EnvelopeCodec
27+
from .exceptions import UnknownUrnError
28+
from .routing import UnknownUrnStrategy
29+
from .transport import ReceivedMessage, Transport, make_transport
30+
31+
Handler = Callable[..., None]
32+
33+
34+
class BabelQueue:
35+
def __init__(
36+
self,
37+
broker_url: str = "memory://",
38+
*,
39+
transport: Optional[Transport] = None,
40+
queue: str = "default",
41+
on_unknown_urn: str = UnknownUrnStrategy.FAIL,
42+
max_attempts: int = 3,
43+
dead_letter: bool = False,
44+
dead_letter_queue: Optional[str] = None,
45+
dead_letter_suffix: str = ".dlq",
46+
) -> None:
47+
self.transport = transport if transport is not None else make_transport(broker_url)
48+
self.queue = queue
49+
self.on_unknown_urn = on_unknown_urn
50+
self.max_attempts = max_attempts
51+
self.dead_letter_enabled = bool(dead_letter)
52+
self.dead_letter_queue = dead_letter_queue
53+
self.dead_letter_suffix = dead_letter_suffix
54+
self._handlers: Dict[str, Handler] = {}
55+
56+
# -- Produce ------------------------------------------------------------
57+
58+
def publish(
59+
self,
60+
urn: str,
61+
data: Mapping[str, Any],
62+
*,
63+
queue: Optional[str] = None,
64+
trace_id: Optional[str] = None,
65+
) -> str:
66+
"""Publish a message; returns its id (``meta.id``)."""
67+
target = queue or self.queue
68+
envelope = EnvelopeCodec.make(urn, data, queue=target, trace_id=trace_id)
69+
self.transport.publish(target, EnvelopeCodec.encode(envelope))
70+
return envelope["meta"]["id"]
71+
72+
# -- Register handlers --------------------------------------------------
73+
74+
def handler(self, urn: str) -> Callable[[Handler], Handler]:
75+
"""Decorator: register ``fn`` as the handler for ``urn``."""
76+
77+
def decorator(fn: Handler) -> Handler:
78+
self._handlers[urn] = fn
79+
return fn
80+
81+
return decorator
82+
83+
def register(self, urn: str, fn: Handler) -> None:
84+
self._handlers[urn] = fn
85+
86+
# -- Consume ------------------------------------------------------------
87+
88+
def consume(
89+
self,
90+
queue: Optional[str] = None,
91+
*,
92+
max_messages: Optional[int] = None,
93+
timeout: float = 1.0,
94+
) -> int:
95+
"""Consume messages until interrupted (or ``max_messages`` processed).
96+
97+
Returns the number of messages processed. With ``max_messages`` set, the
98+
loop stops once that many are handled or the queue drains within ``timeout``.
99+
"""
100+
target = queue or self.queue
101+
processed = 0
102+
try:
103+
while max_messages is None or processed < max_messages:
104+
received = self.transport.pop(target, timeout=timeout)
105+
if received is None:
106+
if max_messages is not None:
107+
break
108+
continue
109+
self.dispatch(received)
110+
processed += 1
111+
except KeyboardInterrupt: # pragma: no cover - graceful Ctrl-C
112+
pass
113+
return processed
114+
115+
run = consume
116+
117+
def dispatch(self, received: ReceivedMessage) -> None:
118+
"""Route one reserved message to its handler and acknowledge it."""
119+
envelope = EnvelopeCodec.decode(received.body)
120+
urn = str(envelope.get("job") or envelope.get("urn") or "")
121+
handler = self._handlers.get(urn) if urn else None
122+
123+
try:
124+
if handler is None:
125+
self._route_unknown(urn, received, envelope)
126+
return
127+
self._invoke(handler, envelope)
128+
self.transport.ack(received)
129+
except Exception as exc: # noqa: BLE001 - one bad message must not kill the loop
130+
self._retry_or_dead_letter(received, envelope, exc)
131+
132+
# -- Internals ----------------------------------------------------------
133+
134+
def _invoke(self, handler: Handler, envelope: Mapping[str, Any]) -> None:
135+
data = dict(envelope.get("data") or {})
136+
meta = dict(envelope.get("meta") or {})
137+
if _handler_wants_envelope(handler):
138+
handler(data, meta, dict(envelope))
139+
else:
140+
handler(data, meta)
141+
142+
def _route_unknown(self, urn: str, received: ReceivedMessage, envelope: Mapping[str, Any]) -> None:
143+
strategy = self.on_unknown_urn
144+
if strategy == UnknownUrnStrategy.DELETE:
145+
self.transport.ack(received)
146+
return
147+
if strategy == UnknownUrnStrategy.RELEASE:
148+
self.transport.publish(received.queue, received.body)
149+
self.transport.ack(received)
150+
return
151+
if strategy == UnknownUrnStrategy.DEAD_LETTER:
152+
self._dead_letter(received, dict(envelope), "unknown_urn", None)
153+
return
154+
# FAIL — surfaced through the retry/dead-letter path (never kills the loop).
155+
raise UnknownUrnError(
156+
f"No handler mapped for URN [{urn or '(empty)'}]."
157+
)
158+
159+
def _retry_or_dead_letter(
160+
self, received: ReceivedMessage, envelope: Dict[str, Any], exc: BaseException
161+
) -> None:
162+
attempts = int(envelope.get("attempts", 0)) + 1
163+
envelope["attempts"] = attempts
164+
165+
if attempts < self.max_attempts:
166+
self.transport.publish(received.queue, EnvelopeCodec.encode(envelope))
167+
self.transport.ack(received)
168+
return
169+
170+
if self.dead_letter_enabled:
171+
reason = "unknown_urn" if isinstance(exc, UnknownUrnError) else "failed"
172+
self._dead_letter(received, envelope, reason, exc)
173+
return
174+
175+
# Retries exhausted, no DLQ configured — drop it (ack so it leaves the queue).
176+
self.transport.ack(received)
177+
178+
def _dead_letter(
179+
self,
180+
received: ReceivedMessage,
181+
envelope: Dict[str, Any],
182+
reason: str,
183+
exc: Optional[BaseException],
184+
) -> None:
185+
original_queue = str((envelope.get("meta") or {}).get("queue") or received.queue)
186+
annotated = dead_letter.annotate(
187+
envelope,
188+
reason,
189+
original_queue,
190+
int(envelope.get("attempts", 0)),
191+
error=(str(exc) if exc is not None else None),
192+
exception=(type(exc).__name__ if exc is not None else None),
193+
)
194+
target = self.dead_letter_queue or (received.queue + self.dead_letter_suffix)
195+
self.transport.publish(target, EnvelopeCodec.encode(annotated))
196+
self.transport.ack(received)
197+
198+
199+
def _handler_wants_envelope(fn: Handler) -> bool:
200+
"""True if the handler takes a 3rd positional arg (the full envelope)."""
201+
try:
202+
params = list(inspect.signature(fn).parameters.values())
203+
except (TypeError, ValueError): # pragma: no cover - builtins/C callables
204+
return False
205+
positional = [
206+
p for p in params
207+
if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
208+
]
209+
has_varargs = any(p.kind == p.VAR_POSITIONAL for p in params)
210+
return has_varargs or len(positional) >= 3

0 commit comments

Comments
 (0)