Skip to content

feat: Add Intent Bus distributed worker integration#20

Open
dsecurity49 wants to merge 6 commits into
syrin-labs:mainfrom
dsecurity49:feat/intent-bus-integration
Open

feat: Add Intent Bus distributed worker integration#20
dsecurity49 wants to merge 6 commits into
syrin-labs:mainfrom
dsecurity49:feat/intent-bus-integration

Conversation

@dsecurity49

@dsecurity49 dsecurity49 commented May 15, 2026

Copy link
Copy Markdown

Description

This PR adds an integration for Intent Bus, a zero-trust, decentralized dispatch and transport layer. As discussed in the Discord, this allows Syrin agents to be deployed as distributed workers across varied environments (like Termux/Android) without requiring open ports or complex infrastructure.

Key Features

  • At-Least-Once Delivery: Uses atomic claim semantics to ensure missions are never lost.
  • State Recovery: Integrates Syrin checkpoints with the Intent Bus KV store to allow missions to resume after worker crashes or network drops.
  • Telemetry Support: Includes a ChiasmObserver for real-time task tracking and observability.
  • Profile-Driven CLI: Supports JSON/YAML configuration profiles for easy node management.

Relevant Links

The integration includes a core adapter (intent_bus_syrin.py), a full worker implementation, a trigger CLI, and comprehensive documentation in TECHNICAL.md.

Summary by CodeRabbit

  • New Features

    • Intent Bus integration: distributed, zero-trust mission execution with a worker runtime, CLI dispatch, and interactive publish/trigger tools
  • Documentation

    • Added installation guide, technical specification, API reference, advanced examples, and updated integrations list
  • Examples

    • New example profiles and local-dev/demo workflows for summarization, research, and review scenarios
  • Tests

    • Added unit tests and a stress test for mission publishing
  • Chores

    • Gitignore updated for common dev artifacts and credential files

@coderabbitai

coderabbitai Bot commented May 15, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

This PR introduces Intent Bus, a zero-trust distributed execution layer for Syrin agents. It includes comprehensive documentation, a Python harness (IntentBusSyrinHarness) for claiming missions from an intent bus and running agents, CLI tools for dispatching missions, worker bootstrap logic with multi-provider LLM support, state persistence/recovery, optional dashboard telemetry, and framework tests with example profiles.

Changes

Intent Bus Syrin Integration

Layer / File(s) Summary
Documentation and Project Setup
README.md, intent_bus/.gitignore, intent_bus/INSTALL.md, intent_bus/README.md, intent_bus/TECHNICAL.md, intent_bus/examples/README.md
Root README registers the Intent Bus integration; project-level gitignore excludes Python artifacts, secrets, and local config; INSTALL.md documents prerequisites, dependencies, and startup; intent_bus README provides overview and quick-start; TECHNICAL.md specifies API reference and operational patterns; examples README walks through working workflow and multi-worker orchestration.
Core Framework Implementation
intent_bus/intent_bus_syrin.py
IntentBusSyrinHarness bridges WorkerRuntime callbacks to async mission handling, manages background asyncio event loop, and enforces mission timeouts. SyrinMissionContext wraps per-mission state/telemetry/heartbeat operations. ChiasmObserver sends HTTP POST/PATCH telemetry to dashboard. Error translation maps common exceptions (auth, rate limit, timeout, config) to user-facing messages. Observability configuration manages LiteLLM and intent-bus logger verbosity.
Worker Entrypoint and Agent Factory
intent_bus/worker.py
Bootstraps harness with CLI argument parsing (goal, namespace, model, capabilities, timeout, dashboard URL, profile). Loads optional JSON/YAML profiles for config overrides and --fast mode for testing. Resolves Intent Bus and provider-specific API keys from environment or home directory. Configures observability. Builds an agent factory that selects Syrin Model wrapper (Gemini/Google/Anthropic/OpenAI) by provider string and returns an Agent with the chosen model and system prompt. Instantiates harness with optional telemetry observer and starts serving the configured goal/namespace.
Mission Dispatch Interface
intent_bus/interface.py, intent_bus/trigger.py
trigger.py publishes a single CLI instruction to an intent queue (with optional profile override). interface.py provides an interactive REPL that repeatedly prompts for missions and publishes them. Both load API keys from ~/.apikey and support JSON/YAML profile loading for goal/namespace/model overrides.
Framework Testing
intent_bus/tests/stress_test.py, intent_bus/tests/test_framework.py
test_framework.py validates error translation, ChiasmObserver HTTP call sequence and payload, SyrinMissionContext telemetry emission, and JSON profile override behavior. stress_test.py load-tests the bus by publishing 10 concurrent missions with expected response validation.
Example Configuration Profiles
intent_bus/examples/profiles/local-dev.yaml, intent_bus/examples/profiles/researcher.json, intent_bus/examples/profiles/reviewer.json, intent_bus/examples/profiles/summarizer.json
Provides example profiles for local development, research workloads, reviewer workflows, and a summarizer sample used in examples.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

A rabbit hops through intent queues so deep,
Syrin agents claim missions while workers sleep,
State persists and checkpoints hum in tune,
Dashboards blink bright beneath the moon,
Hooray for missions finished soon! 🐰✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 71.79% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: Add Intent Bus distributed worker integration' clearly and concisely describes the primary addition—an Intent Bus integration enabling distributed worker functionality. It directly reflects the main objective and changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

🧹 Nitpick comments (1)
intent_bus/tests/stress_test.py (1)

20-29: ⚡ Quick win

This stress test is sequential, not concurrent.

Line 20 publishes one mission at a time, so it doesn't exercise concurrent publish pressure as advertised. Use a small thread pool (or async client path) to actually fan out requests.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@intent_bus/tests/stress_test.py` around lines 20 - 29, The loop in the stress
test currently publishes missions sequentially (for i in range..., calling
client.publish inside the try block), so replace the sequential publish with
concurrent publishes by using a small thread pool or asyncio: e.g., submit
client.publish tasks for each payload (referencing the existing payload
construction and client.publish call) via ThreadPoolExecutor.map or gather async
tasks, then wait for all futures to complete; keep the same payload format
(including STATUS_GREEN_MISSION_{i}) and error handling around individual
publish calls to preserve logging and test semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@intent_bus/examples/README.md`:
- Line 48: The README line claiming “No two workers will ever process the same
mission” is incorrect given at-least-once delivery; update the sentence around
“Atomic Claims” and the example that mentions `trigger.py` to state that claims
are atomic (only one worker will hold the claim at a time) but that missions may
still be redelivered and processed more than once under retries or partial
failures (i.e., at-least-once semantics), and suggest deduplication or
idempotency as mitigation.
- Line 5: README references a missing profile examples/profiles/summarizer.json
which will cause a file-not-found error; fix by either adding the file with the
intended JSON content or updating the README to point to an existing profile.
Locate the references to examples/profiles/summarizer.json in
intent_bus/examples/README.md (lines showing the “Create a specific profile”
step and subsequent examples) and either: 1) create
examples/profiles/summarizer.json with the content shown in the README, or 2)
replace each occurrence with an existing profile filename such as
examples/profiles/researcher.json or examples/profiles/reviewer.json so the
walkthrough runs without errors.

In `@intent_bus/INSTALL.md`:
- Around line 61-63: The CLI arguments Markdown table under the "Worker CLI
Arguments" heading is not surrounded by blank lines (triggering MD058); edit the
INSTALL.md section containing the "## Worker CLI Arguments" heading and the
table (the pipe-delimited table block) to insert a blank line before the table
and a blank line after the table so the table is separated from surrounding
content.

In `@intent_bus/intent_bus_syrin.py`:
- Around line 223-251: The event loop records errors but keeps processing so a
later "run_end" can erroneously mark the mission successful; inside the
run_mission event handler (the block handling event.type == "error"), after
computing clean_err (via self._translate_error) and calling self._display and
updating event.data, short-circuit the mission by either setting a terminal
error state and breaking/returning from run_mission or raising an exception that
will be caught by the surrounding try/await (so asyncio.wait_for will fail);
ensure any observer/telemetry notifications (self._notify_observer_async,
ctx.emit_telemetry) and checkpoint persistence
(ctx.persist_state(agent.save_checkpoint())) still run if needed before exiting,
and ensure final_result cannot be left as a success value when an error has
occurred.
- Around line 40-48: Validate and restrict the request URL scheme before calling
urllib.request.urlopen in _send_request: parse self.url (and combined
f"{self.url}{endpoint}") and ensure the scheme is either "http" or "https",
raising/logging an error or returning None if not allowed; perform this check at
the start of the _send_request method so urlopen is never called with
unsupported schemes (reference _send_request, self.url, endpoint, and
urllib.request.urlopen).
- Around line 282-283: The shutdown code currently swallows all exceptions from
asyncio.run_coroutine_threadsafe(_cleanup(), self._loop).result(timeout=10)
which hides failures; replace the bare except with capturing the exception (e.g.
except Exception as e:) and log it at warning/error level including exception
context instead of passing — use the instance logger if available
(self._logger.warning(..., exc_info=e)) or fallback to
logging.getLogger(__name__).warning(..., exc_info=e) so cleanup failures (from
_cleanup(), bus close, loop stop) are recorded for diagnosis while still
permitting graceful shutdown.

In `@intent_bus/interface.py`:
- Around line 39-50: The REPL currently hardcodes target_goal and namespace when
packaging and publishing the intent (see payload, target_goal, and
client.publish), so wire the CLI flags --goal, --namespace and any profile
override into the REPL loop and use those variables instead of the literals;
replace the hardcoded target_goal = "gemma_test_mission" and namespace="default"
with the configured goal and namespace values (and apply any profile override to
select the goal/worker) so client.publish(goal=configured_goal, payload=payload,
namespace=configured_namespace) publishes to the chosen target.
- Around line 19-25: The startup exception handler currently swallows errors and
returns, leaving a zero exit code; update the except block that catches
exceptions from load_secret and IntentClient (symbols: load_secret,
IntentClient) to terminate the process with a non-zero exit (e.g., call
sys.exit(1) or re-raise as SystemExit) instead of returning; add an import for
sys if missing and keep the existing error print before exiting so shell callers
correctly observe failure on missing ~/.apikey or client init failures.

In `@intent_bus/tests/test_framework.py`:
- Around line 11-31: Add a tearDown method to the test class that stops the
background loop/thread started by IntentBusSyrinHarness in setUp: call the
harness shutdown method (e.g., self.harness.stop() or self.harness.shutdown() or
self.harness.close() if implemented) and join any worker thread (e.g.,
self.harness._thread.join(timeout)) to ensure it terminates; if no shutdown
method exists, stop the event loop safely via
self.harness.loop.call_soon_threadsafe(self.harness.loop.stop) and then join the
thread. Ensure the method handles missing attributes gracefully (try/except
AttributeError) so tests always clean up the harness.

In `@intent_bus/trigger.py`:
- Around line 71-72: The exception handler in the dispatch/publish code (the
except Exception as e block in intent_bus/trigger.py) only prints the error and
exits with code 0; change it to return a failing exit status by either
re-raising the exception or calling sys.exit(1). Specifically, update the except
block around the dispatch/publish logic (the block that currently does
print(f"Failed to dispatch: {e}")) to import sys if needed and call sys.exit(1)
after logging the error (or raise the caught exception) so callers get a nonzero
exit code on failure.
- Around line 47-49: The code uses profile = load_profile(args.profile) and
immediately calls profile.get(...), which will raise if load_profile returned a
non-dict (list, str, None); update the logic in trigger.py around load_profile
to validate that profile is an object/dict before using .get: if profile is not
a dict, raise or log a clear error mentioning args.profile, otherwise set
args.goal = profile.get("goal", args.goal) and args.namespace =
profile.get("namespace", args.namespace); reference the load_profile call and
the args.goal/args.namespace assignments so you modify the correct block.

In `@intent_bus/worker.py`:
- Around line 43-44: Validate the dashboard URL scheme before calling
urllib.request.Request/urllib.request.urlopen: parse args.dashboard (e.g., with
urllib.parse.urlparse), ensure parsed.scheme is either "http" or "https", and if
not, raise or log a clear error and skip the diagnostics probe; only construct
the Request and call urllib.request.urlopen when the scheme is valid.

---

Nitpick comments:
In `@intent_bus/tests/stress_test.py`:
- Around line 20-29: The loop in the stress test currently publishes missions
sequentially (for i in range..., calling client.publish inside the try block),
so replace the sequential publish with concurrent publishes by using a small
thread pool or asyncio: e.g., submit client.publish tasks for each payload
(referencing the existing payload construction and client.publish call) via
ThreadPoolExecutor.map or gather async tasks, then wait for all futures to
complete; keep the same payload format (including STATUS_GREEN_MISSION_{i}) and
error handling around individual publish calls to preserve logging and test
semantics.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ccf7dd3b-f2a2-479a-b215-60ddf5055c33

📥 Commits

Reviewing files that changed from the base of the PR and between 05a9399 and ceb7100.

📒 Files selected for processing (16)
  • README.md
  • intent_bus/.gitignore
  • intent_bus/INSTALL.md
  • intent_bus/README.md
  • intent_bus/TECHNICAL.md
  • intent_bus/examples/README.md
  • intent_bus/examples/profiles/local-dev.yaml
  • intent_bus/examples/profiles/researcher.json
  • intent_bus/examples/profiles/reviewer.json
  • intent_bus/intent_bus_syrin.py
  • intent_bus/interface.py
  • intent_bus/tests/__init__.py
  • intent_bus/tests/stress_test.py
  • intent_bus/tests/test_framework.py
  • intent_bus/trigger.py
  • intent_bus/worker.py

Comment thread intent_bus/examples/README.md
Comment thread intent_bus/examples/README.md Outdated
Comment thread intent_bus/INSTALL.md
Comment thread intent_bus/intent_bus_syrin.py
Comment thread intent_bus/intent_bus_syrin.py
Comment thread intent_bus/interface.py Outdated
Comment thread intent_bus/tests/test_framework.py
Comment thread intent_bus/trigger.py
Comment thread intent_bus/trigger.py
Comment thread intent_bus/worker.py Outdated
@dsecurity49 dsecurity49 reopened this May 16, 2026
- Fix profile validation logic (load file before checking dict)
- Add dashboard URL scheme validation (http/https only)
- Exit non-zero on startup failures
- Fail mission immediately after error event
- Log shutdown exceptions at WARNING level
- Fix interface.py YAML profile loading

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@intent_bus/interface.py`:
- Line 55: The inline single-line compound if statements (e.g., "if not bus_key:
raise ValueError(...)" in intent_bus/interface.py) should be expanded into
multi-line if blocks to satisfy Ruff E701; replace those one-liners with an
explicit if block that raises inside the body (same for the similar occurrences
around the shown 65–66 region), keeping the same condition and exception
type/message and preserving surrounding logic and indentation.

In `@intent_bus/worker.py`:
- Around line 140-153: The code does not validate chiasm_key when args.dashboard
is set, so ChiasmObserver may be constructed with an empty API key; add a
validation check before creating ChiasmObserver: when args.dashboard is truthy
ensure chiasm_key is present (similar to the existing bus_key/active_key checks)
and exit with a clear fatal message if missing; update the block that creates
ChiasmObserver (referencing chiasm_key, args.dashboard, and ChiasmObserver) to
only instantiate the observer after this validation.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 47bb7f47-c8a8-4065-86a9-da4348a4d47d

📥 Commits

Reviewing files that changed from the base of the PR and between ceb7100 and 10ed95c.

📒 Files selected for processing (7)
  • intent_bus/examples/README.md
  • intent_bus/examples/profiles/summarizer.json
  • intent_bus/intent_bus_syrin.py
  • intent_bus/interface.py
  • intent_bus/tests/test_framework.py
  • intent_bus/trigger.py
  • intent_bus/worker.py
✅ Files skipped from review due to trivial changes (2)
  • intent_bus/examples/profiles/summarizer.json
  • intent_bus/examples/README.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • intent_bus/tests/test_framework.py

Comment thread intent_bus/interface.py Outdated
Comment thread intent_bus/worker.py

@dsecurity49 dsecurity49 left a comment

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
intent_bus/intent_bus_syrin.py (1)

319-339: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

error events bypass both telemetry and observer updates.

The break on Line 331 exits before the shared routing block on Lines 333-339, so failed missions never emit syrin_trace_error and never reach the dashboard observer through this path. That leaves the most important terminal state invisible unless the failure comes from the outer timeout/exception handler instead.

Suggested fix
                     if event.type == "error":
                         raw_err = event.data.get('msg', 'Unknown Error') if isinstance(event.data, dict) else str(event.data)
                         if not str(raw_err).strip(): 
                             raw_err = "Unknown Error"
                             
                         clean_err = self._translate_error(raw_err)
                         fatal_error_msg = clean_err
                         mission_failed = True

                         if isinstance(event.data, dict): 
                             event.data['msg'] = clean_err
+                        if event.type in self.telemetry_filter:
+                            await ctx.emit_telemetry(event.type, event.data)
+                        if self.observer:
+                            payload = event.data if isinstance(event.data, dict) else {"data": event.data}
+                            await self._notify_observer_async(ctx.mission_id, event.type, payload)
                         self._display(ctx.mission_id, "🔴 MISSION FAILED:", clean_err, "RED")
                         break  # Stop processing events after error
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@intent_bus/intent_bus_syrin.py` around lines 319 - 339, The error handling
block currently breaks before routing so error events skip telemetry and
observer updates; update the error branch in the loop that handles event.type ==
"error" (the code using _translate_error, _display and setting
fatal_error_msg/mission_failed) to first emit telemetry and notify observers
just like the shared routing code: call await ctx.emit_telemetry(event.type,
event.data) if event.type in self.telemetry_filter and call await
self._notify_observer_async(ctx.mission_id, event.type, payload) (building
payload the same way you do elsewhere) before performing the break, or
alternatively move the existing routing lines to execute for error events as
well; keep use of clean_err/event.data['msg'] and then break to stop further
processing.
intent_bus/examples/README.md (1)

5-24: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Clarify the working directory for these example commands.

worker.py / trigger.py and examples/profiles/summarizer.json are relative to different roots, and this page never says where to run them from. Copy-pasting from the repo root or from intent_bus/examples/ will fail on at least one of those paths. Please either add an explicit cd intent_bus step or rewrite the commands to use repo-root-safe paths.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@intent_bus/examples/README.md` around lines 5 - 24, The example commands
assume a working directory but don't state it; update the README examples to use
repo-root-safe paths or add an explicit cd instruction: either prepend paths
with intent_bus/ (e.g., run python intent_bus/worker.py --profile
intent_bus/examples/profiles/summarizer.json and python intent_bus/trigger.py
"..." --profile intent_bus/examples/profiles/summarizer.json) or add a step like
`cd intent_bus` before the worker.py/trigger.py commands so references to
worker.py, trigger.py, and examples/profiles/summarizer.json resolve correctly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@intent_bus/examples/README.md`:
- Around line 52-56: The README's recovery walkthrough references log messages
that the harness never emits—specifically '[[STATE]] Syncing state to KV
store...' and '🟡 [State Warning] Recovered previous checkpoint'—so update the
steps to reflect the actual behavior: remove or replace those exact strings in
the walkthrough and instead instruct users to confirm recovery by observing the
worker terminal for the harness's real checkpoint/restore log lines (or simply
verifying the agent resumes from its last progress after restart), and reword
steps 3–5 to describe killing the worker, restarting it, and checking that the
task resumes rather than expecting the non-existent log messages.

In `@intent_bus/intent_bus_syrin.py`:
- Around line 228-235: The three error-mapping if blocks using err_str in
intent_bus_syrin.py are ordered so the generic 400 match short-circuits more
specific diagnostics; move the specific "not provided"/"llm provider not" check
(the third if that returns "[Configuration Error] Invalid model syntax or
missing provider prefix.") above the generic 400/bad request branch (the first
if returning "[Configuration Error] Unknown, deprecated, or decommissioned model
specified.") or alternatively remove "400" and "badrequest" from the generic
branch so err_str is first tested for the provider-missing tokens before falling
back to the generic 400 handler; keep checks for upstream 5xx errors unchanged.
- Around line 422-425: The current try/except only calls self.shutdown() on
KeyboardInterrupt, which leaks resources on other exceptions; change the block
around runtime.listen(goal=goal, handler=runner_bridge, namespace=namespace,
full_envelope=True) to ensure teardown always runs by using a
try/except/finally: keep handling KeyboardInterrupt specially if needed (e.g.,
swallow or log) but move the self.shutdown() call into the finally clause so
shutdown() is invoked for successful exit, KeyboardInterrupt, and any other
exception from runtime.listen.

---

Outside diff comments:
In `@intent_bus/examples/README.md`:
- Around line 5-24: The example commands assume a working directory but don't
state it; update the README examples to use repo-root-safe paths or add an
explicit cd instruction: either prepend paths with intent_bus/ (e.g., run python
intent_bus/worker.py --profile intent_bus/examples/profiles/summarizer.json and
python intent_bus/trigger.py "..." --profile
intent_bus/examples/profiles/summarizer.json) or add a step like `cd intent_bus`
before the worker.py/trigger.py commands so references to worker.py, trigger.py,
and examples/profiles/summarizer.json resolve correctly.

In `@intent_bus/intent_bus_syrin.py`:
- Around line 319-339: The error handling block currently breaks before routing
so error events skip telemetry and observer updates; update the error branch in
the loop that handles event.type == "error" (the code using _translate_error,
_display and setting fatal_error_msg/mission_failed) to first emit telemetry and
notify observers just like the shared routing code: call await
ctx.emit_telemetry(event.type, event.data) if event.type in
self.telemetry_filter and call await self._notify_observer_async(ctx.mission_id,
event.type, payload) (building payload the same way you do elsewhere) before
performing the break, or alternatively move the existing routing lines to
execute for error events as well; keep use of clean_err/event.data['msg'] and
then break to stop further processing.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: adf90102-df8b-47e1-aeda-dd75c5088f88

📥 Commits

Reviewing files that changed from the base of the PR and between 10ed95c and 40a2ff9.

📒 Files selected for processing (8)
  • intent_bus/INSTALL.md
  • intent_bus/README.md
  • intent_bus/TECHNICAL.md
  • intent_bus/examples/README.md
  • intent_bus/examples/profiles/reviewer.json
  • intent_bus/intent_bus_syrin.py
  • intent_bus/interface.py
  • intent_bus/worker.py
✅ Files skipped from review due to trivial changes (2)
  • intent_bus/README.md
  • intent_bus/TECHNICAL.md
🚧 Files skipped from review as they are similar to previous changes (3)
  • intent_bus/INSTALL.md
  • intent_bus/interface.py
  • intent_bus/worker.py

Comment on lines +52 to +56
1. Dispatch a long mission (e.g., writing a 10-page essay).
2. Look for `[[STATE]] Syncing state to KV store...` in the worker terminal.
3. **Kill the worker** (`Ctrl+C`). The Intent Bus drops the claim after 60 seconds.
4. Restart the worker.
5. Watch the log: `[🟡 [State Warning] Recovered previous checkpoint]`. The agent resumes writing from where it was killed rather than starting over.

@coderabbitai coderabbitai Bot Jun 1, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

The recovery walkthrough points to log messages the harness does not emit.

The implementation never prints [[STATE]] Syncing state to KV store..., and 🟡 [State Warning] is only shown on recovery failures, not on a successful checkpoint restore. As written, users cannot validate the flow by watching for the messages this guide tells them to expect.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@intent_bus/examples/README.md` around lines 52 - 56, The README's recovery
walkthrough references log messages that the harness never emits—specifically
'[[STATE]] Syncing state to KV store...' and '🟡 [State Warning] Recovered
previous checkpoint'—so update the steps to reflect the actual behavior: remove
or replace those exact strings in the walkthrough and instead instruct users to
confirm recovery by observing the worker terminal for the harness's real
checkpoint/restore log lines (or simply verifying the agent resumes from its
last progress after restart), and reword steps 3–5 to describe killing the
worker, restarting it, and checking that the task resumes rather than expecting
the non-existent log messages.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix it

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -n "STATE\|checkpoint\|recover\|persist\|KV\|Syncing\|Warning\|Recovered" intent_bus/intent_bus_syrin.py

Length of output: 362


🏁 Script executed:

#!/bin/bash
# Search for all logging/print statements related to state in the harness
rg -n "print\|log\|logger\|logging\|STATE\|state\|checkpoint\|recover" intent_bus/intent_bus_syrin.py | head -80

Length of output: 441


🏁 Script executed:

#!/bin/bash
fd intent_bus_syrin.py
cat intent_bus/intent_bus_syrin.py 2>/dev/null || echo "File not found at that path"
find . -name "intent_bus_syrin.py" 2>/dev/null

Length of output: 19036



Now I have the full picture. Here's what the code actually does:

  • State persistence (step 2): persist_state() is called silently when a checkpoint_ready event fires — there is no [[STATE]] Syncing state to KV store... print anywhere.
  • State recovery (step 5): Successful recovery is completely silent. 🟡 [State Warning] only appears when recover_state() throws an exception, not on a successful restore.

Fix the two incorrect log-message references in the State Recovery Walkthrough:

  • Step 2: remove the nonexistent [[STATE]] log string; tell users state is persisted silently.
  • Step 5: remove the nonexistent 🟡 [State Warning] success log; tell users to observe that the agent continues from its last progress point rather than restarting.

Failed to handle agent chat message. Please try again.

Comment on lines +228 to +235
if any(k in err_str for k in ["400", "bad request", "badrequest", "decommissioned", "not found", "does not exist", "invalid_request_error"]):
return "[Configuration Error] Unknown, deprecated, or decommissioned model specified. (Non-retryable)"

if any(k in err_str for k in ["500", "502", "503", "504", "server error", "overloaded", "busy"]):
return "[Upstream Provider Error] The model server is currently overloaded or down."

if any(k in err_str for k in ["400", "badrequest", "not provided", "llm provider not"]):
return "[Configuration Error] Invalid model syntax or missing provider prefix."

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reorder the 400 classifiers to avoid misleading error messages.

The first 400 / badrequest branch returns immediately, so messages like “provider not provided” never reach the more specific mapping below. That turns a fixable model-prefix/configuration mistake into a misleading “unknown or decommissioned model” error.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@intent_bus/intent_bus_syrin.py` around lines 228 - 235, The three
error-mapping if blocks using err_str in intent_bus_syrin.py are ordered so the
generic 400 match short-circuits more specific diagnostics; move the specific
"not provided"/"llm provider not" check (the third if that returns
"[Configuration Error] Invalid model syntax or missing provider prefix.") above
the generic 400/bad request branch (the first if returning "[Configuration
Error] Unknown, deprecated, or decommissioned model specified.") or
alternatively remove "400" and "badrequest" from the generic branch so err_str
is first tested for the provider-missing tokens before falling back to the
generic 400 handler; keep checks for upstream 5xx errors unchanged.

Comment on lines +422 to +425
try:
runtime.listen(goal=goal, handler=runner_bridge, namespace=namespace, full_envelope=True)
except KeyboardInterrupt:
self.shutdown()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Always call shutdown() when listen() exits.

Right now cleanup only runs on KeyboardInterrupt. If WorkerRuntime.listen(...) raises any other exception, the daemon event loop thread and bus resources are left behind. Move teardown into a finally so failed startups and runtime errors do not leak the harness.

Suggested fix
-        try: 
-            runtime.listen(goal=goal, handler=runner_bridge, namespace=namespace, full_envelope=True)
-        except KeyboardInterrupt: 
-            self.shutdown()
+        try:
+            runtime.listen(goal=goal, handler=runner_bridge, namespace=namespace, full_envelope=True)
+        except KeyboardInterrupt:
+            pass
+        finally:
+            self.shutdown()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@intent_bus/intent_bus_syrin.py` around lines 422 - 425, The current
try/except only calls self.shutdown() on KeyboardInterrupt, which leaks
resources on other exceptions; change the block around runtime.listen(goal=goal,
handler=runner_bridge, namespace=namespace, full_envelope=True) to ensure
teardown always runs by using a try/except/finally: keep handling
KeyboardInterrupt specially if needed (e.g., swallow or log) but move the
self.shutdown() call into the finally clause so shutdown() is invoked for
successful exit, KeyboardInterrupt, and any other exception from runtime.listen.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant