feat: SSE support in provider #975
Walkthrough

Introduces Server-Sent Events (SSE) streaming as a refresh strategy for the Python provider SDK and implements SSE broadcasting infrastructure in the Rust backend.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Python Client
    participant Provider as LocalResolutionProvider
    participant SSEServer as SSE Stream Endpoint
    participant AppState as AppState/SSE Broadcaster
    Client->>Provider: initialize(SseStrategy)
    Provider->>Provider: start SSE background loop
    Provider->>SSEServer: connect via aiohttp
    SSEServer->>AppState: subscribe_sse(schema_name)
    AppState->>SSEServer: Receiver<()>
    SSEServer-->>Client: SSE stream opened (keepalive pings)
    Note over AppState: Config change triggers
    AppState->>AppState: notify_change() called
    AppState->>AppState: broadcast SSE signal
    AppState-->>SSEServer: watch::Receiver detects change
    SSEServer-->>Client: emit config_change event
    Client->>Provider: SSE event received
    Provider->>Provider: refresh() called
    Provider->>Provider: on_config_change(before, after)
    Provider-->>Client: callback invoked with changes
```
```mermaid
sequenceDiagram
    participant Handler as API Handler (create/update/delete)
    participant NotifyChange as notify_change()
    participant SSEBroadcaster as SSE Broadcaster
    participant WebhookExecutor as execute_webhook_call()
    Handler->>NotifyChange: call notify_change(WebhookData)
    NotifyChange->>SSEBroadcaster: get_sse_sender() & send()
    SSEBroadcaster-->>NotifyChange: signal dispatched (fire-and-forget)
    NotifyChange->>WebhookExecutor: execute_webhook_call(data)
    WebhookExecutor-->>NotifyChange: bool success
    NotifyChange-->>Handler: return bool
    Handler->>Handler: map to HttpResponse (Ok or 512)
```
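The second diagram's fire-and-forget broadcast (one sender per workspace schema, any number of subscribers) can be modeled in a few lines of Python. This is a hedged analogue using `asyncio.Condition` rather than the Rust `tokio::watch` channel the PR actually uses; `SseBroadcaster`, `notify_change`, and `wait_for_change` are illustrative names only:

```python
import asyncio
from collections import defaultdict

class SseBroadcaster:
    """Toy per-schema broadcaster, analogous to the map of watch senders."""

    def __init__(self) -> None:
        # One condition variable per schema, created lazily on first use.
        self._conditions: dict = defaultdict(asyncio.Condition)

    async def notify_change(self, schema_name: str) -> None:
        cond = self._conditions[schema_name]
        async with cond:
            cond.notify_all()  # fire-and-forget: fine if nobody is listening

    async def wait_for_change(self, schema_name: str, timeout: float = 2.0) -> bool:
        cond = self._conditions[schema_name]
        async with cond:
            try:
                await asyncio.wait_for(cond.wait(), timeout)
                return True
            except asyncio.TimeoutError:
                return False

async def demo() -> bool:
    broadcaster = SseBroadcaster()
    waiter = asyncio.create_task(broadcaster.wait_for_change("workspace_a"))
    await asyncio.sleep(0.05)  # let the subscriber start waiting first
    await broadcaster.notify_change("workspace_a")
    return await waiter
```

A real `watch` channel also coalesces signals (subscribers only observe "something changed since I last looked"), which is consistent with the server emitting a generic event rather than a payload.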
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~35 minutes
🚥 Pre-merge checks: ✅ 5 passed checks
Pull request overview
Adds server-sent events (SSE) signaling on workspace changes, and introduces an SSE-based refresh strategy in the Python provider so SDK clients can refresh immediately when config/experiments are updated.
Changes:
- Add an SSE endpoint (`/stream`) to the Superposition service and wire it into routing.
- Add an SSE broadcaster to shared `AppState`, and introduce `notify_change()` to broadcast refresh signals alongside existing webhook notifications.
- Add `SseStrategy` to the Python provider and implement an SSE listener loop + example.
Reviewed changes
Copilot reviewed 16 out of 17 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| crates/superposition/src/stream/handlers.rs | Implements the SSE HTTP handler and streaming response. |
| crates/superposition/src/stream.rs | Exposes the stream endpoints module. |
| crates/superposition/src/main.rs | Registers /stream scope under workspace-specific routes. |
| crates/superposition/src/app_state.rs | Initializes sse_broadcaster in the service AppState. |
| crates/superposition/Cargo.toml | Adds direct deps used by the new SSE handler (tokio, futures-util). |
| crates/service_utils/src/service/types.rs | Extends AppState with SSE broadcaster map + subscribe/get helpers. |
| crates/service_utils/src/helpers.rs | Adds notify_change() to broadcast SSE refresh signals before webhook execution. |
| crates/experimentation_platform/src/api/experiments/handlers.rs | Switches webhook calls to notify_change() so experiments also trigger SSE refresh. |
| crates/context_aware_config/src/api/dimension/handlers.rs | Switches webhook calls to notify_change() so dimension changes trigger SSE refresh. |
| crates/context_aware_config/src/api/default_config/handlers.rs | Switches webhook calls to notify_change() so default-config changes trigger SSE refresh. |
| crates/context_aware_config/src/api/context/handlers.rs | Switches webhook calls to notify_change() so context changes trigger SSE refresh. |
| clients/python/provider/superposition_provider/types.py | Adds SseStrategy and includes it in the RefreshStrategy union. |
| clients/python/provider/superposition_provider/local_provider.py | Implements SSE loop strategy and adds optional on_config_change callback + refresh changes. |
| clients/python/provider/superposition_provider/http_data_source.py | Attempts to close underlying aiohttp session on close. |
| clients/python/provider/superposition_provider/__init__.py | Exports SseStrategy and updates docs to mention SSE. |
| clients/python/provider/examples/sse_example.py | Adds a runnable example demonstrating SSE-based refresh. |
| Cargo.lock | Updates lockfile for new Rust dependencies. |
```diff
@@ -0,0 +1,43 @@
+use actix_web::{HttpResponse, Scope, web::Data};
+use futures_util::stream::{self, StreamExt};
```
`StreamExt` is imported but never used in this file. This will trigger an unused-import warning (and can fail builds if warnings are denied). Remove `StreamExt` or use it explicitly.
```diff
-use futures_util::stream::{self, StreamExt};
+use futures_util::stream::{self};
```
```rust
    pub sse_broadcaster: Mutex<HashMap<String, watch::Sender<()>>>,
}

impl AppState {
    pub fn get_sse_sender(&self, schema_name: &str) -> watch::Sender<()> {
        let mut map = self.sse_broadcaster.lock().expect("sse_broadcaster lock poisoned");
        map.entry(schema_name.to_string())
            .or_insert_with(|| watch::channel(()).0)
            .clone()
    }
```
`sse_broadcaster` stores a per-schema `watch::Sender` in a global `HashMap` with no eviction/removal path. In long-running services with many workspaces/schemas, this can grow without bound. Consider removing entries when there are no receivers (e.g., check `sender.receiver_count()` / `is_closed()` and prune), or use a bounded cache (LRU/TTL) for schema keys.
```rust
/// Calls `execute_webhook_call` and also broadcasts an SSE "config changed"
/// signal so that connected SDK clients refresh immediately.
pub async fn notify_change<T>(
    data: WebhookData<T>,
    workspace_context: &WorkspaceContext,
    state: &Data<AppState>,
    conn: &mut DBConnection,
) -> bool
where
    T: Serialize,
{
    // Broadcast SSE signal (fire-and-forget; ok if no subscribers).
    let sender = state.get_sse_sender(&workspace_context.schema_name);
    let _ = sender.send(());

    execute_webhook_call(data, workspace_context, state, conn).await
```
`notify_change` is used from both config endpoints and experimentation endpoints, but the docstring describes only a "config changed" signal. This is misleading for callers and for SSE consumers. Consider renaming the helper and/or documenting that it represents a generic "workspace changed" refresh signal (or accept an event type so experiments vs config can be distinguished).
```python
async for line_bytes in resp.content:
    self_ref = weak_self()
    if self_ref is None:
        logger.info("Provider garbage collected, stopping SSE loop.")
        return
    line = line_bytes.decode("utf-8", errors="replace").strip()
    logger.debug(f"SSE raw line: {line!r}")
    if not line or line.startswith(":"):
        continue
    if line.startswith("event:") or line.startswith("data:"):
        async def _do_refresh(ref=self_ref):
            await asyncio.sleep(debounce_s)
            try:
                await ref.refresh()
            except Exception as e:
                logger.warning(f"SSE-triggered refresh failed: {e}")
        if debounce_task and not debounce_task.done():
            debounce_task.cancel()
        debounce_task = asyncio.create_task(_do_refresh())
```
The SSE client loop iterates `async for line_bytes in resp.content`, but aiohttp yields arbitrary chunk boundaries, not guaranteed line-delimited SSE frames. This can merge/split `event:`/`data:` lines and cause missed or spurious refresh triggers. Use a line-based reader (e.g., `await resp.content.readline()` in a loop) and parse SSE events based on `\n\n` frame separation.
Suggested replacement:

```python
event_lines: List[str] = []
while True:
    line_bytes = await resp.content.readline()
    if line_bytes == b"":
        break
    self_ref = weak_self()
    if self_ref is None:
        logger.info("Provider garbage collected, stopping SSE loop.")
        return
    line = line_bytes.decode("utf-8", errors="replace").rstrip("\r\n")
    logger.debug(f"SSE raw line: {line!r}")
    if line == "":
        if any(
            event_line.startswith("event:") or event_line.startswith("data:")
            for event_line in event_lines
        ):
            async def _do_refresh(ref=self_ref):
                await asyncio.sleep(debounce_s)
                try:
                    await ref.refresh()
                except Exception as e:
                    logger.warning(f"SSE-triggered refresh failed: {e}")
            if debounce_task and not debounce_task.done():
                debounce_task.cancel()
            debounce_task = asyncio.create_task(_do_refresh())
        event_lines = []
        continue
    if line.startswith(":"):
        continue
    event_lines.append(line)
```
```python
@dataclass
class SseStrategy:
    """SSE-based refresh strategy.

    Connects to the server's SSE endpoint and refreshes when a change event
    is received. Reconnects automatically on connection failure.

    Requires SuperpositionOptions to build the SSE endpoint URL and authenticate.
    """
    superposition_options: SuperpositionOptions
    reconnect_delay: int = 5  # seconds between reconnect attempts
    debounce_ms: int = 500  # debounce rapid successive events
```
`SseStrategy` carries a separate `SuperpositionOptions` instance, which can diverge from the `HttpDataSource` options passed to the provider (endpoint/token/org/workspace). This can lead to refreshing from a different workspace than the one being evaluated. Consider deriving SSE connection settings from the primary data source (when it's HTTP) or validating that the options match on initialization.
```rust
tokio::select! {
    result = rx.changed() => match result {
        Ok(()) => {
            let payload = "event: config_change\ndata: {}\n\n";
```
The SSE event name is hard-coded to `config_change`, but this stream is triggered via `notify_change` from experiment mutations as well. If clients rely on `event:` semantics, they may mis-handle experiment-only updates. Consider using a more generic event name (e.g., `workspace_change`) or emitting distinct events for config vs experiments.
```diff
-let payload = "event: config_change\ndata: {}\n\n";
+let payload = "event: workspace_change\ndata: {}\n\n";
```
```python
)
for result in results:
    if isinstance(result, Exception):
        logger.warning(f"Error during refresh: {result}")
```
`refresh()` now uses `asyncio.gather(..., return_exceptions=True)` and only logs exceptions. This changes behavior: callers (and `_ensure_fresh_data` with `use_stale_on_error=False`) can no longer detect refresh failures because errors are swallowed. Consider preserving the prior behavior (let exceptions propagate) or re-raising when any refresh task fails.
Suggested replacement:

```python
)
errors = []
for result in results:
    if isinstance(result, Exception):
        logger.warning(f"Error during refresh: {result}")
        errors.append(result)
if errors:
    raise errors[0]
```
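To make the behavioral difference concrete, here is a minimal, self-contained sketch showing `return_exceptions=True` combined with an explicit re-raise, so failures are both logged and still visible to callers. The fetchers `fetch_config`/`fetch_experiments` are hypothetical stand-ins for the provider's private ones:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def fetch_config() -> str:
    return "config-ok"

async def fetch_experiments() -> str:
    raise RuntimeError("experiments endpoint down")

async def refresh() -> None:
    results = await asyncio.gather(
        fetch_config(), fetch_experiments(), return_exceptions=True
    )
    errors = [r for r in results if isinstance(r, Exception)]
    for err in errors:
        logger.warning("Error during refresh: %s", err)
    # Still surface the first failure so callers can react to it.
    if errors:
        raise errors[0]

def run() -> str:
    try:
        asyncio.run(refresh())
        return "ok"
    except RuntimeError as e:
        return str(e)
```

Without the final `raise`, `run()` would report success even though the experiments fetch failed.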
```python
logger.warning(f"SSE-triggered refresh failed: {e}")
if debounce_task and not debounce_task.done():
    debounce_task.cancel()
debounce_task = asyncio.create_task(_do_refresh())
```
When debouncing SSE events, the previous `debounce_task` is cancelled but never awaited/joined. Cancelled tasks can still surface "Task exception was never retrieved"/pending-task warnings depending on timing. Consider awaiting cancellation (with `contextlib.suppress(asyncio.CancelledError)`) or adding a done-callback that consumes exceptions.
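A minimal sketch of the suggested cleanup (illustrative names, not the provider's code): cancel the stale debounce task, then await it under `contextlib.suppress(asyncio.CancelledError)` so the cancellation is consumed rather than surfacing later as an unretrieved-exception warning:

```python
import asyncio
import contextlib

async def debounced(fired: list, delay: float = 0.05) -> None:
    await asyncio.sleep(delay)
    fired.append("refresh")

async def main() -> list:
    fired: list = []
    task = asyncio.create_task(debounced(fired))
    await asyncio.sleep(0)   # first event's debounce timer starts...
    task.cancel()            # ...then a second event arrives before it fires
    with contextlib.suppress(asyncio.CancelledError):
        await task           # join so the cancellation is fully consumed
    task = asyncio.create_task(debounced(fired))
    await task               # only the last event triggers a refresh
    return fired
```

The key point is the `await task` after `cancel()`: cancelling alone only requests cancellation; joining guarantees the task is finished (and its exception retrieved) before a replacement is scheduled.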
```python
if self.client:
    try:
        http_client = getattr(self.client._config, "http_client", None)
        session = getattr(http_client, "_session", None)
        if session and not session.closed:
            await session.close()
    except Exception:
        pass
```
`HttpDataSource.close()` reaches into private SDK internals (`self.client._config.http_client._session`) and silently swallows all exceptions. If the SDK changes, this may stop closing sessions without any signal. Consider preferring a public close API if available, and at least log (debug/warn) when this cleanup path fails.
Suggested replacement:

```python
if not self.client:
    return
client = self.client
try:
    close_method = getattr(client, "close", None)
    if callable(close_method):
        close_result = close_method()
        if hasattr(close_result, "__await__"):
            await close_result
        return
    http_client = getattr(getattr(client, "_config", None), "http_client", None)
    session = getattr(http_client, "_session", None)
    if session and not session.closed:
        await session.close()
except Exception:
    logger.warning("Failed to close Superposition HTTP client cleanly", exc_info=True)
finally:
```
Actionable comments posted: 5
🧹 Nitpick comments (2)
crates/superposition/src/stream/handlers.rs (1)

20-36: Nit: the `keepalive` arm can starve out `rx.changed()` in the worst case; consider biasing the select.

`tokio::select!` polls arms in pseudo-random order by default. If many keepalive ticks coincide with `rx.changed()` notifications, the keepalive branch may occasionally win, delaying a `config_change` emission by up to one polling cycle. Not correctness-critical (the `rx.changed()` state persists across the next iteration and fires on the following poll), but `tokio::select! { biased; result = rx.changed() => ..., _ = keepalive.tick() => ... }` will prefer real events over keepalives.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/superposition/src/stream/handlers.rs` around lines 20 - 36, the select in stream::unfold currently can let the keepalive tick starve rx.changed(); change the tokio::select! inside the async closure to a biased select so rx.changed() is preferred (use tokio::select! { biased; result = rx.changed() => ..., _ = keepalive.tick() => ... }) ensuring the config_change path (rx.changed()) wins over keepalive ticks; update the select invocation near event_stream creation where rx and keepalive are matched.

crates/service_utils/src/service/types.rs (1)

102-116: Avoid panicking the request handler on lock poisoning; also note unbounded map growth.

Two minor concerns with the new broadcaster:

- `self.sse_broadcaster.lock().expect("sse_broadcaster lock poisoned")` will panic the handler if a prior holder panicked while holding the lock. Since the critical section here is trivial and infallible (only `HashMap` entry insertion + `Sender::clone()`), poisoning should be impossible in practice, but recovering via `PoisonError::into_inner()` is cheap insurance and avoids worker-process fallout.
- The `sse_broadcaster` map is append-only: entries are created lazily in `get_sse_sender` and never removed. This is bounded by the number of distinct `schema_name`s ever seen for the process lifetime, which is probably fine, but worth noting if workspaces are ever churned frequently.

🛡️ Suggested hardening

```diff
 pub fn get_sse_sender(&self, schema_name: &str) -> watch::Sender<()> {
-    let mut map = self.sse_broadcaster.lock().expect("sse_broadcaster lock poisoned");
+    let mut map = self
+        .sse_broadcaster
+        .lock()
+        .unwrap_or_else(|poisoned| poisoned.into_inner());
     map.entry(schema_name.to_string())
         .or_insert_with(|| watch::channel(()).0)
         .clone()
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/service_utils/src/service/types.rs` around lines 102 - 116, get_sse_sender currently panics on Mutex poisoning and never removes entries, so change the lock acquisition to recover from PoisonError (e.g. self.sse_broadcaster.lock().unwrap_or_else(|e| e.into_inner())) instead of expect, and update get_sse_sender/subscribe_sse to avoid unbounded map growth by pruning or replacing stale senders (for example, check the watch::Sender receiver count and remove/replace entries with no receivers, or switch sse_broadcaster to a bounded cache like an LRU); keep references to the symbols sse_broadcaster, get_sse_sender and subscribe_sse when making these changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@clients/python/provider/examples/sse_example.py`:
- Around line 27-31: Replace asyncio.get_event_loop() with
asyncio.get_running_loop() inside the coroutine and rename the lambda parameter
from l to loop_arg (or _loop) for clarity; update the set_exception_handler call
so the handler signature is correct (use loop_arg, ctx) and when delegating to
the original handler call _orig(loop_arg, ctx) instead of _orig(ctx), keeping
_orig and set_exception_handler usage as shown in the snippet.
In `@clients/python/provider/superposition_provider/http_data_source.py`:
- Around line 221-231: The close method should not reach into private internals
or silently swallow errors: in SuperpositionProvider.close, first check for and
call a public close/aclose on self.client (e.g., if hasattr(self.client,
"aclose"): await self.client.aclose() elif hasattr(self.client, "close"): await
maybe_awaitable(self.client.close())), and only if no public close exists fall
back to the current getattr chain (client._config -> http_client -> _session)
but catch exceptions and log them at debug/warning instead of pass; update
references to self.client, client._config, http_client and _session accordingly
and ensure you do not suppress the exception silently. (Implement
maybe_awaitable as appropriate or await when coroutine.)
In `@clients/python/provider/superposition_provider/local_provider.py`:
- Around line 363-377: Keep the resolve_all_features call as-is (it’s
inherited), remove the dead _sse_connected_event field and any
assignments/usages of it, and delete the redundant conditional around
options.org_id (SuperpositionProviderOptions.org_id is required so the "if
options.org_id:" branch is unnecessary). For the SSE loop (the method that reads
the event stream and currently handles "event:" / "data:" lines one-by-one),
change the parser to accumulate successive lines into a buffer and only treat an
empty line as the end of a single SSE event; on that boundary parse the buffered
event/data block and trigger a single refresh (e.g., call the existing
refresh/notification path once per completed event) rather than firing on every
line. Ensure you update any related cancellation/cleanup logic accordingly so
events are debounced by the SSE boundary.
- Around line 508-569: The SSE loop has four fixes: remove or stop guarding
header addition by a dead check on options.org_id in _sse_loop
(SuperpositionOptions requires a non-empty org_id) or document that empty is
allowed; either wire self._sse_connected_event into initialize() to await the
first connection (await self._sse_connected_event.wait()) or remove the
attribute and its set() call (symbols: self._sse_connected_event, initialize);
replace per-line handling in _sse_loop/_do_refresh with proper SSE parsing by
buffering resp.content lines into an event until a blank line then trigger a
single debounce refresh task per complete event (symbols: _sse_loop,
_do_refresh, debounce_task); and narrow the broad inner except to avoid
swallowing asyncio.CancelledError (catch aiohttp.ClientError and
asyncio.TimeoutError explicitly, and re-raise or let CancelledError propagate).
Optionally make sock_read timeout configurable rather than hard-coding 30s if
server keepalive cadence may change.
In `@crates/service_utils/src/helpers.rs`:
- Around line 463-479: notify_change currently broadcasts the SSE via
state.get_sse_sender(...).send(()) before calling execute_webhook_call, which
causes clients to refresh even when webhook delivery fails; change the control
flow so execute_webhook_call(data, workspace_context, state, conn).await is
invoked first and only if it returns true then call sender.send(()), so SSE is
emitted on successful webhook delivery; while here, consider replacing the
std::sync::Mutex used inside state.get_sse_sender with a non-blocking
alternative (tokio::sync::Mutex or parking_lot::Mutex) to avoid holding a
blocking mutex inside the async fn.
---
Nitpick comments:
In `@crates/service_utils/src/service/types.rs`:
- Around line 102-116: get_sse_sender currently panics on Mutex poisoning and
never removes entries, so change the lock acquisition to recover from
PoisonError (e.g. self.sse_broadcaster.lock().unwrap_or_else(|e|
e.into_inner())) instead of expect, and update get_sse_sender/subscribe_sse to
avoid unbounded map growth by pruning or replacing stale senders (for example,
check the watch::Sender receiver count and remove/replace entries with no
receivers, or switch sse_broadcaster to a bounded cache like an LRU); keep
references to the symbols sse_broadcaster, get_sse_sender and subscribe_sse when
making these changes.
In `@crates/superposition/src/stream/handlers.rs`:
- Around line 20-36: The select in stream::unfold currently can let the
keepalive tick starve rx.changed(); change the tokio::select! inside the async
closure to a biased select so rx.changed() is preferred (use tokio::select! {
biased; result = rx.changed() => ..., _ = keepalive.tick() => ... }) ensuring
the config_change path (rx.changed()) wins over keepalive ticks; update the
select invocation near event_stream creation where rx and keepalive are matched.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: add4b1a4-5f88-413f-9dd6-9cd8f7ac71bc
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (16)
- clients/python/provider/examples/sse_example.py
- clients/python/provider/superposition_provider/__init__.py
- clients/python/provider/superposition_provider/http_data_source.py
- clients/python/provider/superposition_provider/local_provider.py
- clients/python/provider/superposition_provider/types.py
- crates/context_aware_config/src/api/context/handlers.rs
- crates/context_aware_config/src/api/default_config/handlers.rs
- crates/context_aware_config/src/api/dimension/handlers.rs
- crates/experimentation_platform/src/api/experiments/handlers.rs
- crates/service_utils/src/helpers.rs
- crates/service_utils/src/service/types.rs
- crates/superposition/Cargo.toml
- crates/superposition/src/app_state.rs
- crates/superposition/src/main.rs
- crates/superposition/src/stream.rs
- crates/superposition/src/stream/handlers.rs
```python
loop = asyncio.get_event_loop()
_orig = loop.default_exception_handler
loop.set_exception_handler(
    lambda l, ctx: None if ctx.get("message") == "Unclosed client session" else _orig(ctx)
)
```
Minor: use `get_running_loop()` and rename the ambiguous `l`.

`asyncio.get_event_loop()` inside a coroutine is deprecated since Python 3.10 (and emits a DeprecationWarning in 3.12); in an `async def`, `asyncio.get_running_loop()` is the idiomatic replacement. Also, the lambda parameter `l` trips Ruff E741; rename it to `loop_arg` (or `_loop`) for clarity.
♻️ Suggested cleanup

```diff
-    loop = asyncio.get_event_loop()
+    loop = asyncio.get_running_loop()
     _orig = loop.default_exception_handler
     loop.set_exception_handler(
-        lambda l, ctx: None if ctx.get("message") == "Unclosed client session" else _orig(ctx)
+        lambda _loop, ctx: None if ctx.get("message") == "Unclosed client session" else _orig(ctx)
     )
```

🧰 Tools
🪛 Ruff (0.15.10)
[error] 30-30: Ambiguous variable name: l
(E741)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@clients/python/provider/examples/sse_example.py` around lines 27 - 31,
Replace asyncio.get_event_loop() with asyncio.get_running_loop() inside the
coroutine and rename the lambda parameter from l to loop_arg (or _loop) for
clarity; update the set_exception_handler call so the handler signature is
correct (use loop_arg, ctx) and when delegating to the original handler call
_orig(loop_arg, ctx) instead of _orig(ctx), keeping _orig and
set_exception_handler usage as shown in the snippet.
```python
async def close(self) -> None:
    """Close the HTTP client and its underlying aiohttp session."""
    if self.client:
        try:
            http_client = getattr(self.client._config, "http_client", None)
            session = getattr(http_client, "_session", None)
            if session and not session.closed:
                await session.close()
        except Exception:
            pass
    self.client = None
```
Reaching into private SDK internals + silent exception swallow.

`self.client._config.http_client._session` traverses two layers of underscore-prefixed attributes of the SDK/HTTP client. Any internal rename in `superposition_sdk` or its HTTP backend will silently stop closing the aiohttp session (re-introducing the "Unclosed client session" warnings this change is meant to fix), and the broad `except Exception: pass` hides it. Consider:

- Exposing a public `close()`/`aclose()` on the SDK client (or its HTTP client) and calling that instead.
- At minimum, logging the exception at debug/warning so regressions are observable (Ruff S110/BLE001).
Proposed logging tweak

```diff
     try:
         http_client = getattr(self.client._config, "http_client", None)
         session = getattr(http_client, "_session", None)
         if session and not session.closed:
             await session.close()
-    except Exception:
-        pass
+    except Exception as e:  # noqa: BLE001
+        logger.debug("Failed to close underlying aiohttp session: %s", e)
```

🧰 Tools
🪛 Ruff (0.15.10)
[error] 229-230: try-except-pass detected, consider logging the exception
(S110)
[warning] 229-229: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@clients/python/provider/superposition_provider/http_data_source.py` around
lines 221 - 231, The close method should not reach into private internals or
silently swallow errors: in SuperpositionProvider.close, first check for and
call a public close/aclose on self.client (e.g., if hasattr(self.client,
"aclose"): await self.client.aclose() elif hasattr(self.client, "close"): await
maybe_awaitable(self.client.close())), and only if no public close exists fall
back to the current getattr chain (client._config -> http_client -> _session)
but catch exceptions and log them at debug/warning instead of pass; update
references to self.client, client._config, http_client and _session accordingly
and ensure you do not suppress the exception silently. (Implement
maybe_awaitable as appropriate or await when coroutine.)
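The prompt above references a `maybe_awaitable` helper without defining it. One plausible shape (an assumption, not SDK code; `SyncClient`/`AsyncClient` are hypothetical stand-ins) that lets the same call site handle both a synchronous `close()` and a coroutine-returning one:

```python
import asyncio
import inspect
from typing import Any

async def maybe_awaitable(result: Any) -> Any:
    """Await `result` if it is awaitable, otherwise return it unchanged."""
    if inspect.isawaitable(result):
        return await result
    return result

class SyncClient:
    def close(self) -> str:
        return "sync-closed"

class AsyncClient:
    async def close(self) -> str:
        return "async-closed"

async def close_client(client: Any) -> str:
    # Prefer a public close API; fall back handling is omitted here.
    close_method = getattr(client, "close", None)
    if callable(close_method):
        return await maybe_awaitable(close_method())
    return "no-close"

def demo() -> tuple:
    return (
        asyncio.run(close_client(SyncClient())),
        asyncio.run(close_client(AsyncClient())),
    )
```

`inspect.isawaitable` covers coroutines, Tasks, and Futures, so the helper stays correct even if the SDK later wraps its close in a Task.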
```python
before = self.resolve_all_features(EvaluationContext()) if self.on_config_change and self.ffi_cache else None

results = await asyncio.gather(
    self._fetch_and_cache_config(),
    self._fetch_and_cache_experiments(),
    return_exceptions=True,
)
for result in results:
    if isinstance(result, Exception):
        logger.warning(f"Error during refresh: {result}")

if self.on_config_change and before is not None:
    after = self.resolve_all_features(EvaluationContext())
    if before != after:
        self.on_config_change(before, after)
```
🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
# Confirm where resolve_all_features is defined (likely interfaces.py AllFeatureProvider).
fd -e py . clients/python/provider | xargs rg -nP '\bdef\s+resolve_all_features\b' -C2
```

Repository: juspay/superposition
Length of output: 482

🏁 Script executed:

```shell
# Check LocalProvider class definition and inheritance
rg -n 'class LocalProvider' clients/python/provider -A 5
```

Length of output: 693

🏁 Script executed:

```shell
# Check for _sse_connected_event usage
rg -n '_sse_connected_event' clients/python/provider
```

Length of output: 326

🏁 Script executed:

```shell
# Find options/org_id dataclass definition
rg -n 'org_id' clients/python/provider/superposition_provider/local_provider.py
```

Length of output: 278

🏁 Script executed:

```shell
# Review SSE parsing logic - search for SSE event handling
rg -n 'event:|data:' clients/python/provider -A 2 -B 2
```

Length of output: 6681

🏁 Script executed:

```shell
# Get complete LocalProviderOptions definition
rg -n 'class LocalProviderOptions' clients/python/provider -A 20
```

Length of output: 1951

🏁 Script executed:

```shell
# Check if _sse_connected_event is ever awaited or used
rg -n 'await.*_sse_connected_event|_sse_connected_event.wait|_sse_connected_event.set' clients/python/provider
```

Length of output: 46

🏁 Script executed:

```shell
# Find LocalProvider class definition and verify inheritance
rg -n 'class LocalProvider' clients/python/provider -A 10
```

Length of output: 1130

🏁 Script executed:

```shell
# Search for LocalProvider class definition in local_provider.py
rg -n 'class LocalProvider' clients/python/provider/superposition_provider/local_provider.py -A 15
```

Length of output: 46

🏁 Script executed:

```shell
# Get context around lines 510-520 to see what options object is used there
sed -n '510,520p' clients/python/provider/superposition_provider/local_provider.py
```

Length of output: 740

🏁 Script executed:

```shell
# Check if AllFeatureProvider is imported in local_provider.py
rg -n 'AllFeatureProvider|from.*interfaces' clients/python/provider/superposition_provider/local_provider.py
```

Length of output: 279

🏁 Script executed:

```shell
# Get full signature of resolve_all_features
rg -n 'def resolve_all_features' clients/python/provider/superposition_provider/interfaces.py -A 5
```

Length of output: 1040

🏁 Script executed:

```shell
# Also verify lines 363-377 match what was shown in review
sed -n '363,377p' clients/python/provider/superposition_provider/local_provider.py
```

Length of output: 714
Remove the AttributeError concern — `resolve_all_features` is correctly inherited from `AllFeatureProvider`.
The method is defined in the `AllFeatureProvider` interface (signature: `context: Optional[EvaluationContext]` → `Dict[str, Any]`) and `LocalResolutionProvider` inherits from it, so this call is valid.
That said, address these issues:
- `_sse_connected_event` is a dead field: set at line 520 but never awaited or used. Remove it.
- Redundant `org_id` check at line 516: since `org_id` is a required field in `SuperpositionProviderOptions` (no default value), `if options.org_id:` always evaluates to `True`. Remove the conditional.
- SSE parsing at line 553 fires on every `event:` or `data:` line individually: this violates the Server-Sent Events spec, which defines event boundaries at blank lines. Currently, a single SSE event may trigger multiple refresh calls in rapid succession before being cancelled, causing unnecessary churn. Restructure to accumulate lines until a blank line is encountered.
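The spec-correct accumulation this asks for can be sketched with a stdlib-only helper. `parse_sse_lines` is a hypothetical name for illustration, not the provider's actual code: lines starting with `:` are keepalive comments, and a blank line terminates exactly one event.

```python
def parse_sse_lines(lines):
    """Group raw SSE lines into events; each event is a dict of field -> value."""
    events = []
    current = {}
    for raw in lines:
        line = raw.rstrip("\n")
        if line.startswith(":"):   # comment / keepalive ping, per the SSE spec
            continue
        if not line:               # blank line: event boundary
            if current:
                events.append(current)
                current = {}
            continue
        field, _, value = line.partition(":")
        current[field.strip()] = value.lstrip(" ")
    return events

stream = [
    ": keepalive\n",
    "event: config_change\n",
    "data: {}\n",
    "\n",                          # terminator -> exactly one event
]
print(parse_sse_lines(stream))
# -> [{'event': 'config_change', 'data': '{}'}]
```

With this shape, the refresh would be triggered once per appended event instead of once per matching line.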
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@clients/python/provider/superposition_provider/local_provider.py` around
lines 363 - 377, Keep the resolve_all_features call as-is (it’s inherited),
remove the dead _sse_connected_event field and any assignments/usages of it, and
delete the redundant conditional around options.org_id
(SuperpositionProviderOptions.org_id is required so the "if options.org_id:"
branch is unnecessary). For the SSE loop (the method that reads the event stream
and currently handles "event:" / "data:" lines one-by-one), change the parser to
accumulate successive lines into a buffer and only treat an empty line as the
end of a single SSE event; on that boundary parse the buffered event/data block
and trigger a single refresh (e.g., call the existing refresh/notification path
once per completed event) rather than firing on every line. Ensure you update
any related cancellation/cleanup logic accordingly so events are debounced by
the SSE boundary.
```python
case SseStrategy():
    import weakref

    strategy: SseStrategy = self.refresh_strategy
    options = strategy.superposition_options
    debounce_s = strategy.debounce_ms / 1000
    reconnect_delay = max(strategy.reconnect_delay, 1)
    sse_url = f"{options.endpoint.rstrip('/')}/{options.org_id}/{options.workspace_id}/stream"
    sse_headers = {"Authorization": f"Bearer {options.token}"}
    if options.org_id:
        sse_headers["x-org-id"] = options.org_id
    weak_self = weakref.ref(self)
    connected_event = asyncio.Event()
    self._sse_connected_event = connected_event

    async def _sse_loop():
        import aiohttp

        logger.info(f"Starting SSE refresh (url={sse_url})")
        try:
            async with aiohttp.ClientSession() as session:
                while True:
                    if weak_self() is None:
                        logger.info("Provider garbage collected, stopping SSE loop.")
                        return
                    try:
                        async with session.get(
                            sse_url,
                            headers=sse_headers,
                            timeout=aiohttp.ClientTimeout(total=None, sock_read=30),
                        ) as resp:
                            if resp.status != 200:
                                logger.warning(f"SSE endpoint returned {resp.status}, retrying in {reconnect_delay}s")
                                await asyncio.sleep(reconnect_delay)
                                continue
                            logger.info("SSE connection established")
                            connected_event.set()
                            debounce_task = None
                            async for line_bytes in resp.content:
                                self_ref = weak_self()
                                if self_ref is None:
                                    logger.info("Provider garbage collected, stopping SSE loop.")
                                    return
                                line = line_bytes.decode("utf-8", errors="replace").strip()
                                logger.debug(f"SSE raw line: {line!r}")
                                if not line or line.startswith(":"):
                                    continue
                                if line.startswith("event:") or line.startswith("data:"):
                                    async def _do_refresh(ref=self_ref):
                                        await asyncio.sleep(debounce_s)
                                        try:
                                            await ref.refresh()
                                        except Exception as e:
                                            logger.warning(f"SSE-triggered refresh failed: {e}")

                                    if debounce_task and not debounce_task.done():
                                        debounce_task.cancel()
                                    debounce_task = asyncio.create_task(_do_refresh())
                    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                        logger.warning(f"SSE connection error: {e}, retrying in {reconnect_delay}s")
                        await asyncio.sleep(reconnect_delay)
        except asyncio.CancelledError:
            logger.info("SSE loop cancelled")

    self._background_task = asyncio.create_task(_sse_loop())
```
🧩 Analysis chain
🌐 Web query:
aiohttp StreamReader async for iteration yields lines or chunks?
💡 Result:
aiohttp StreamReader's default async for iteration (async for line in stream_reader:) yields lines (bytes ending with \n). For chunks, use explicit methods like iter_chunked(n), iter_any, or iter_chunks.
Citations:
- 1: https://aiohttp.readthedocs.org/en/stable/streams.html
- 2: https://aiohttp.readthedocs.io/en/stable/streams.html
- 3: https://docs.aiohttp.org/en/v3.7.4/streams.html
- 4: http://aiohttp.readthedocs.org/en/stable/_sources/streams.rst.txt
- 5: https://docs.aiohttp.org/en/v3.8.3/_sources/streams.rst.txt
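A stdlib-only sketch of that behavior: an async iterable consumed with `async for`, yielding one bytes line per iteration. `fake_content` is a stand-in for aiohttp's `resp.content`, which (per its docs) iterates the same way, so the loop body sees whole lines rather than arbitrary chunks.

```python
import asyncio

async def fake_content():
    # Simulates a line-oriented async byte stream like aiohttp's StreamReader.
    for chunk in (b"event: config_change\n", b"data: {}\n", b"\n"):
        yield chunk

async def read_lines():
    lines = []
    async for line_bytes in fake_content():
        # Same decode/strip step as the provider's SSE loop.
        lines.append(line_bytes.decode("utf-8", errors="replace").strip())
    return lines

print(asyncio.run(read_lines()))
# -> ['event: config_change', 'data: {}', '']
```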
🏁 Script executed:
# Find SuperpositionOptions class definition
find . -type f -name "*.py" | xargs grep -l "class SuperpositionOptions"
Length of output: 121
🏁 Script executed:
# Search for _sse_connected_event usage in codebase
rg "_sse_connected_event" --type py
Length of output: 319
🏁 Script executed:
# Check SuperpositionOptions structure and org_id field
rg -A 20 "class SuperpositionOptions" --type py
Length of output: 1826
🏁 Script executed:
# Check if _sse_connected_event is awaited in initialize() method
rg "initialize" -A 20 --type py clients/python/provider/superposition_provider/local_provider.py | head -50
Length of output: 1787
🏁 Script executed:
# Check types.py for org_id field definition and default values
cat clients/python/provider/superposition_provider/types.py | head -100
Length of output: 2767
🏁 Script executed:
# View the full SSE loop code around line 540-570 to see blank line handling
sed -n '535,570p' clients/python/provider/superposition_provider/local_provider.py
Length of output: 2791
SSE loop: several small issues — unused `_sse_connected_event`, dead `if org_id`, inefficient per-line triggering, and a broad `except`.
- Lines 516–517: `options.org_id` is a required, non-default `str` in `SuperpositionOptions`, so `if options.org_id:` is effectively dead code. Remove the conditional or confirm that empty strings are valid.
- Lines 519–520: `self._sse_connected_event` is stored and `.set()` on connect but never awaited anywhere (no `await self._sse_connected_event.wait()` in `initialize()` or elsewhere). Either wire it into `initialize()` to block until the first SSE connection is established, or remove the field.
- Lines 553–562 (SSE parsing): SSE events are delimited by blank lines, and a typical event has both `event:` and `data:` lines. The current code triggers `_do_refresh` on each matching line, creating two tasks per event; the first is immediately cancelled by the second, so it works due to the cancel-replace pattern, but it is inefficient and not correct SSE parsing. Buffer lines until the blank-line terminator, then trigger once per event.
- Lines 554–559: `except Exception as e` is very broad (Ruff BLE001) but safe in this inner async context. Consider explicitly excluding `CancelledError` if needed.
- Line 535: `timeout=aiohttp.ClientTimeout(total=None, sock_read=30)` means a 30s idle read will close the connection. Since the server emits keepalives every 15s, this is fine, but it couples client and server behavior tightly: if the keepalive interval is ever raised past 30s, the connection will flap.
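The cancel-replace debounce pattern mentioned above can be shown in isolation. This is a hypothetical sketch, not the provider's code: two rapid triggers (e.g. an `event:` line followed by a `data:` line) each schedule a delayed refresh, the first pending task is cancelled, and only one refresh actually runs.

```python
import asyncio

async def main():
    refreshes = []

    async def do_refresh(debounce_s=0.05):
        await asyncio.sleep(debounce_s)   # debounce window
        refreshes.append("refreshed")

    debounce_task = None
    for _ in range(2):                    # two rapid triggers for one SSE event
        if debounce_task and not debounce_task.done():
            debounce_task.cancel()        # cancel-replace: drop the pending one
        debounce_task = asyncio.create_task(do_refresh())
        await asyncio.sleep(0)            # yield so the task gets scheduled
    await asyncio.sleep(0.1)              # let the surviving task fire
    return refreshes

print(asyncio.run(main()))
# -> ['refreshed']
```

This is why the current per-line triggering happens to work, at the cost of creating and cancelling a throwaway task per extra line.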
♻️ Suggested SSE parsing fix

```diff
-                            debounce_task = None
-                            async for line_bytes in resp.content:
+                            debounce_task = None
+                            pending_event = False
+                            async for line_bytes in resp.content:
                                 self_ref = weak_self()
                                 if self_ref is None:
                                     logger.info("Provider garbage collected, stopping SSE loop.")
                                     return
                                 line = line_bytes.decode("utf-8", errors="replace").strip()
                                 logger.debug(f"SSE raw line: {line!r}")
-                                if not line or line.startswith(":"):
-                                    continue
-                                if line.startswith("event:") or line.startswith("data:"):
+                                if line.startswith(":"):
+                                    continue
+                                if not line:
+                                    # event terminator — trigger once per event
+                                    if not pending_event:
+                                        continue
+                                    pending_event = False
                                     async def _do_refresh(ref=self_ref):
                                         await asyncio.sleep(debounce_s)
                                         try:
                                             await ref.refresh()
                                         except Exception as e:
                                             logger.warning(f"SSE-triggered refresh failed: {e}")
                                     if debounce_task and not debounce_task.done():
                                         debounce_task.cancel()
                                     debounce_task = asyncio.create_task(_do_refresh())
+                                elif line.startswith("event:") or line.startswith("data:"):
+                                    pending_event = True
```

🧰 Tools
🪛 Ruff (0.15.10)
[warning] 558-558: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@clients/python/provider/superposition_provider/local_provider.py` around
lines 508 - 569, the SSE loop needs four fixes: remove or stop guarding header
addition by a dead check on options.org_id in _sse_loop (SuperpositionOptions
requires a non-empty org_id) or document that empty is allowed; either wire
self._sse_connected_event into initialize() to await the first connection (await
self._sse_connected_event.wait()) or remove the attribute and its set() call
(symbols: self._sse_connected_event, initialize); replace per-line handling in
_sse_loop/_do_refresh with proper SSE parsing by buffering resp.content lines
into an event until a blank line then trigger a single debounce refresh task per
complete event (symbols: _sse_loop, _do_refresh, debounce_task); and narrow the
broad inner except to avoid swallowing asyncio.CancelledError (catch
aiohttp.ClientError and asyncio.TimeoutError explicitly, and re-raise or let
CancelledError propagate). Optionally make sock_read timeout configurable rather
than hard-coding 30s if server keepalive cadence may change.
```rust
/// Calls `execute_webhook_call` and also broadcasts an SSE "config changed"
/// signal so that connected SDK clients refresh immediately.
pub async fn notify_change<T>(
    data: WebhookData<T>,
    workspace_context: &WorkspaceContext,
    state: &Data<AppState>,
    conn: &mut DBConnection,
) -> bool
where
    T: Serialize,
{
    // Broadcast SSE signal (fire-and-forget; ok if no subscribers).
    let sender = state.get_sse_sender(&workspace_context.schema_name);
    let _ = sender.send(());

    execute_webhook_call(data, workspace_context, state, conn).await
}
```
🧩 Analysis chain
🏁 Script executed:
# First, verify the function exists and check the exact code at lines 463-479
cat -n crates/service_utils/src/helpers.rs | sed -n '463,479p'
Length of output: 754
🏁 Script executed:
# Find all callers of notify_change to understand the usage pattern
rg --type rust "notify_change" -B 3 -A 3
Length of output: 14832
🏁 Script executed:
# Check if there are any tests for notify_change
rg --type rust "notify_change" crates/service_utils/src -A 10 | grep -E "(test|#\[|assert|expect)"
Length of output: 46
🏁 Script executed:
# Look for any documentation or comments about the intended behavior
rg --type rust "SSE|webhook|broadcast" crates/service_utils/src/helpers.rs -B 2 -A 2
Length of output: 3418
🏁 Script executed:
# Look at the context around a few notify_change calls to see if DB is committed before the call
rg --type rust "notify_change" crates/experimentation_platform/src/api/experiments/handlers.rs -B 15 | head -50
Length of output: 1830
🏁 Script executed:
# Search for when transactions are committed in the handlers
rg --type rust "commit|transaction|insert|update" crates/experimentation_platform/src/api/experiments/handlers.rs -B 2 -A 2 | head -80
Length of output: 2833
🏁 Script executed:
# Check if 512 is a custom status code defined in the codebase
rg --type rust "512" -r
Length of output: 477
🏁 Script executed:
# Look for any architectural documentation or comments about webhook/SSE interaction
rg --type rust "SSE|webhook" crates/service_utils/src/helpers.rs -B 5 -A 5
Length of output: 5362
🏁 Script executed:
# Check if there are any GitHub issues or ADRs (architecture decision records) mentioning this behavior
find . -type f \( -name "*.md" -o -name "*.txt" \) | head -20 | xargs grep -l "webhook\|SSE" 2>/dev/null
Length of output: 109
SSE broadcast fires even when the webhook call fails — appears intentional.
notify_change broadcasts the SSE signal before invoking execute_webhook_call, so when webhook delivery fails, the caller returns an error status to the client while SSE subscribers have already been notified to refresh. However, the DB change is already committed at this point (the transaction completes before notify_change is called), so this fire-and-forget pattern appears intentional—allowing SDK clients to refresh immediately regardless of webhook delivery success.
Two implementation notes:
- `state.get_sse_sender(...)` acquires a `std::sync::Mutex` guard inside an `async fn`. The critical section is tiny (a HashMap entry insert), so this is fine in practice, but if the map ever grows or contention increases, consider `parking_lot::Mutex` or `tokio::sync::Mutex` to avoid blocking a worker thread.
- `sender.send(())` on `tokio::sync::watch` always marks the channel changed regardless of the prior value; if you ever need to collapse duplicate notifications, watch already does that for idle receivers, so no action is needed — just flagging for awareness.
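A rough Python analogue of this fire-and-forget, per-schema broadcast may make the semantics concrete. The real code uses Rust's `tokio::sync::watch`; `SseBroadcaster` below is hypothetical and built on `asyncio.Event`, whose semantics differ slightly (no retained last value), but it shows the two key properties: sending with no subscribers is harmless, and a subscriber simply waits for the next signal.

```python
import asyncio

class SseBroadcaster:
    def __init__(self):
        self._events = {}                 # schema_name -> asyncio.Event

    def _event(self, schema):
        return self._events.setdefault(schema, asyncio.Event())

    def send(self, schema):
        # Fire-and-forget: setting an Event with no waiters is a no-op.
        self._event(schema).set()

    async def wait_for_change(self, schema):
        ev = self._event(schema)
        await ev.wait()
        ev.clear()                        # re-arm for the next notification

async def main():
    bus = SseBroadcaster()
    bus.send("orphan_schema")             # no subscribers: harmless
    waiter = asyncio.create_task(bus.wait_for_change("ws1"))
    await asyncio.sleep(0)                # let the waiter start waiting
    bus.send("ws1")                       # analogous to sender.send(())
    await waiter
    return "notified"

print(asyncio.run(main()))
# -> notified
```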
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/service_utils/src/helpers.rs` around lines 463 - 479, notify_change
currently broadcasts the SSE via state.get_sse_sender(...).send(()) before
calling execute_webhook_call, which causes clients to refresh even when webhook
delivery fails; change the control flow so execute_webhook_call(data,
workspace_context, state, conn).await is invoked first and only if it returns
true then call sender.send(()), so SSE is emitted on successful webhook
delivery; while here, consider replacing the std::sync::Mutex used inside
state.get_sse_sender with a non-blocking alternative (tokio::sync::Mutex or
parking_lot::Mutex) to avoid holding a blocking mutex inside the async fn.
This pull request introduces a new SSE-based refresh strategy to the Python provider, enabling real-time configuration updates via server-sent events. It also refactors Rust backend handlers to use a new unified notification function. The most important changes are grouped below:
Python Provider: SSE Refresh Strategy and Example
- Added `SseStrategy` as a new refresh strategy in the provider, allowing the client to listen for real-time configuration changes via SSE. This includes changes to the type definitions, provider logic, and the refresh-strategy startup logic. (`clients/python/provider/superposition_provider/types.py` [1] [2]; `clients/python/provider/superposition_provider/local_provider.py` [3] [4] [5] [6] [7]; `clients/python/provider/superposition_provider/__init__.py` [8] [9] [10])
- Added a new example, `sse_example.py`, demonstrating how to use the new SSE refresh strategy in a client application. (`clients/python/provider/examples/sse_example.py` R1-R59)
Python Provider: Maintenance and Improvements
- Changes to the HTTP data source. (`clients/python/provider/superposition_provider/http_data_source.py` L222-R230)
- Added an `on_config_change` callback to the local provider, allowing clients to react to configuration changes. The provider now calls this callback after a refresh if changes are detected. (`clients/python/provider/superposition_provider/local_provider.py` [1] [2] [3])
- Imported `Callable` in `local_provider.py`. (`clients/python/provider/superposition_provider/local_provider.py` L12-R12)
Rust Backend: Notification Refactor
- Handlers now use the new unified `notify_change` function instead of the previous `execute_webhook_call`, streamlining the change notification logic throughout the codebase. (`crates/context_aware_config/src/api/context/handlers.rs` [1] [2] [3] [4] [5] [6] [7]; `crates/context_aware_config/src/api/default_config/handlers.rs` [8] [9] [10] [11]; `crates/context_aware_config/src/api/dimension/handlers.rs` [12])
New Features
Improvements