Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions clients/python/provider/examples/sse_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import asyncio
import logging
import os
import sys

_PYTHON_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, os.path.join(_PYTHON_DIR, "sdk"))
sys.path.insert(0, os.path.join(_PYTHON_DIR, "bindings"))

from openfeature.evaluation_context import EvaluationContext
from superposition_provider import LocalResolutionProvider, HttpDataSource
from superposition_provider.types import SuperpositionOptions, SseStrategy

# Timestamped INFO output for this example; silence the chatty SDK logger and
# keep only warnings from the provider so config-change logs stand out.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
logging.getLogger("superposition_sdk").setLevel(logging.CRITICAL)
logging.getLogger("superposition_provider").setLevel(logging.WARNING)
log = logging.getLogger(__name__)


# Connection settings for the Superposition server; each one can be
# overridden via the corresponding environment variable.
ENDPOINT = os.environ.get("SUPERPOSITION_ENDPOINT", "http://localhost:8080")
TOKEN = os.environ.get("SUPERPOSITION_TOKEN", "token")
ORG = os.environ.get("SUPERPOSITION_ORG_ID", "localorg")
WORKSPACE = os.environ.get("SUPERPOSITION_WORKSPACE", "dev")


async def main():
loop = asyncio.get_event_loop()
_orig = loop.default_exception_handler
loop.set_exception_handler(
lambda l, ctx: None if ctx.get("message") == "Unclosed client session" else _orig(ctx)
)
Comment on lines +27 to +31
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Minor: use get_running_loop() and rename ambiguous l.

asyncio.get_event_loop() inside a coroutine is deprecated since Python 3.10 (and emits a DeprecationWarning in 3.12); in an async def, asyncio.get_running_loop() is the idiomatic replacement. Also, the lambda parameter l trips Ruff E741 — rename to loop_arg (or _loop) for clarity.

♻️ Suggested cleanup
-    loop = asyncio.get_event_loop()
+    loop = asyncio.get_running_loop()
     _orig = loop.default_exception_handler
     loop.set_exception_handler(
-        lambda l, ctx: None if ctx.get("message") == "Unclosed client session" else _orig(ctx)
+        lambda _loop, ctx: None if ctx.get("message") == "Unclosed client session" else _orig(ctx)
     )
🧰 Tools
🪛 Ruff (0.15.10)

[error] 30-30: Ambiguous variable name: l

(E741)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@clients/python/provider/examples/sse_example.py` around lines 27 - 31,
Replace asyncio.get_event_loop() with asyncio.get_running_loop() inside the
coroutine and rename the lambda parameter from l to loop_arg (or _loop) for
clarity; update the set_exception_handler call so the handler signature is
correct (use loop_arg, ctx) and when delegating to the original handler keep
calling _orig(ctx) — _orig is a bound method, so it takes only the context dict — keeping _orig and
set_exception_handler usage as shown in the snippet.


options = SuperpositionOptions(endpoint=ENDPOINT, token=TOKEN, org_id=ORG, workspace_id=WORKSPACE)

def on_config_change(before, after):
for key, value in after.items():
if before.get(key) != value:
log.info(f"[UPDATE] {key}: {before.get(key)!r} -> {value!r}")

provider = LocalResolutionProvider(
primary_source=HttpDataSource(options),
refresh_strategy=SseStrategy(superposition_options=options, reconnect_delay=5),
on_config_change=on_config_change,
)

await provider.initialize(EvaluationContext())
log.info(f"Initial config: {provider.resolve_all_features(EvaluationContext())}")
log.info("Listening for SSE config changes (Ctrl-C to stop)")

try:
await asyncio.Event().wait()
except KeyboardInterrupt:
pass
finally:
await provider.shutdown()


if __name__ == "__main__":
    # Entry point: run the SSE example until interrupted (Ctrl-C).
    asyncio.run(main())
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Provides OpenFeature-compliant feature flag providers with support for:
- Local resolution with caching (LocalResolutionProvider)
- Remote evaluation without caching (SuperpositionAPIProvider)
- Configurable refresh strategies (Polling, OnDemand, Watch, Manual)
- Configurable refresh strategies (Polling, OnDemand, Watch, SSE, Manual)
- File-based and HTTP-based data sources
- Full FFI integration for performance
"""
Expand All @@ -29,6 +29,7 @@
PollingStrategy,
OnDemandStrategy,
WatchStrategy,
SseStrategy,
ManualStrategy,
RefreshStrategy as RefreshStrategyType,
ConfigurationOptions,
Expand Down Expand Up @@ -59,6 +60,7 @@
"PollingStrategy",
"OnDemandStrategy",
"WatchStrategy",
"SseStrategy",
"ManualStrategy",
"RefreshStrategyType",
"ConfigurationOptions",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ def supports_experiments(self) -> bool:
return True

async def close(self) -> None:
    """Close the HTTP client and its underlying aiohttp session.

    Best-effort cleanup: prefers a public ``close``/``aclose`` method on the
    SDK client when one exists, and only falls back to the private
    ``_config.http_client._session`` chain otherwise. Failures are logged at
    debug level instead of being silently swallowed, so a future SDK rename
    that breaks this cleanup path stays observable. Never raises.
    """
    if self.client:
        try:
            # Prefer a public close API if the SDK exposes one; it is far
            # less fragile than reaching into underscore-prefixed internals.
            closer = getattr(self.client, "aclose", None) or getattr(
                self.client, "close", None
            )
            if callable(closer):
                result = closer()
                if hasattr(result, "__await__"):
                    await result
            else:
                # Fallback: reach into SDK internals to close the underlying
                # aiohttp session (avoids "Unclosed client session" warnings).
                http_client = getattr(
                    getattr(self.client, "_config", None), "http_client", None
                )
                session = getattr(http_client, "_session", None)
                if session and not session.closed:
                    await session.close()
        except Exception:
            # Log rather than `pass` (Ruff S110/BLE001): cleanup regressions
            # should be visible without crashing shutdown.
            import logging

            logging.getLogger(__name__).debug(
                "Failed to close Superposition HTTP client cleanly", exc_info=True
            )
Comment on lines 201 to +208
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HttpDataSource.close() reaches into private SDK internals (self.client._config.http_client._session) and silently swallows all exceptions. If the SDK changes, this may stop closing sessions without any signal. Consider preferring a public close API if available, and at least log (debug/warn) when this cleanup path fails.

Suggested change
if self.client:
try:
http_client = getattr(self.client._config, "http_client", None)
session = getattr(http_client, "_session", None)
if session and not session.closed:
await session.close()
except Exception:
pass
if not self.client:
return
client = self.client
try:
close_method = getattr(client, "close", None)
if callable(close_method):
close_result = close_method()
if hasattr(close_result, "__await__"):
await close_result
return
http_client = getattr(getattr(client, "_config", None), "http_client", None)
session = getattr(http_client, "_session", None)
if session and not session.closed:
await session.close()
except Exception:
logger.warning("Failed to close Superposition HTTP client cleanly", exc_info=True)
finally:

Copilot uses AI. Check for mistakes.
self.client = None
Comment on lines 199 to 209
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Reaching into private SDK internals + silent exception swallow.

self.client._config.http_client._session traverses two layers of underscore-prefixed attributes of the SDK/HTTP client. Any internal rename in superposition_sdk or its HTTP backend will silently stop closing the aiohttp session (re-introducing the "Unclosed client session" warnings this change is meant to fix), and the broad except Exception: pass hides it. Consider:

  • Exposing a public close()/aclose() on the SDK client (or its HTTP client) and calling that instead.
  • At minimum, logging the exception at debug/warning so regressions are observable (ruff S110/BLE001).
Proposed logging tweak
-            try:
-                http_client = getattr(self.client._config, "http_client", None)
-                session = getattr(http_client, "_session", None)
-                if session and not session.closed:
-                    await session.close()
-            except Exception:
-                pass
+            try:
+                http_client = getattr(self.client._config, "http_client", None)
+                session = getattr(http_client, "_session", None)
+                if session and not session.closed:
+                    await session.close()
+            except Exception as e:  # noqa: BLE001
+                logger.debug("Failed to close underlying aiohttp session: %s", e)
🧰 Tools
🪛 Ruff (0.15.10)

[error] 229-230: try-except-pass detected, consider logging the exception

(S110)


[warning] 229-229: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@clients/python/provider/superposition_provider/http_data_source.py` around
lines 221 - 231, The close method should not reach into private internals or
silently swallow errors: in SuperpositionProvider.close, first check for and
call a public close/aclose on self.client (e.g., if hasattr(self.client,
"aclose"): await self.client.aclose() elif hasattr(self.client, "close"): await
maybe_awaitable(self.client.close())), and only if no public close exists fall
back to the current getattr chain (client._config -> http_client -> _session)
but catch exceptions and log them at debug/warning instead of pass; update
references to self.client, client._config, http_client and _session accordingly
and ensure you do not suppress the exception silently. (Implement
maybe_awaitable as appropriate or await when coroutine.)

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import json
import weakref
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any, Tuple, Union, Sequence, Mapping
from typing import Callable, Dict, List, Optional, Any, Tuple, Union, Sequence, Mapping

from openfeature.provider import (
AbstractProvider,
Expand All @@ -26,7 +26,7 @@
from . import FetchResponse
from .data_source import SuperpositionDataSource, ConfigData, ExperimentData
from .interfaces import AllFeatureProvider, FeatureExperimentMeta
from .types import RefreshStrategy, OnDemandStrategy, WatchStrategy, PollingStrategy, ManualStrategy, default_on_demand_strategy
from .types import RefreshStrategy, OnDemandStrategy, WatchStrategy, PollingStrategy, ManualStrategy, SseStrategy, default_on_demand_strategy

logger = logging.getLogger(__name__)

Expand All @@ -45,6 +45,7 @@ def __init__(
primary_source: SuperpositionDataSource,
fallback_source: Optional[SuperpositionDataSource] = None,
refresh_strategy: RefreshStrategy = default_on_demand_strategy(),
on_config_change: Optional[Callable[[Dict[str, Any], Dict[str, Any]], None]] = None,
):
"""Initialize local resolution provider.

Expand All @@ -56,6 +57,7 @@ def __init__(
self.primary_source = primary_source
self.fallback_source = fallback_source
self.refresh_strategy = refresh_strategy
self.on_config_change = on_config_change

self.metadata = Metadata(name="LocalResolutionProvider")
self.status = ProviderStatus.NOT_READY
Expand All @@ -68,6 +70,8 @@ def __init__(

# Background task for refresh strategy
self._background_task: Optional[asyncio.Task] = None
# Set once the SSE connection is established (SseStrategy only)
self._sse_connected_event: Optional[asyncio.Event] = None

async def initialize(self, context: EvaluationContext):
"""Initialize the provider.
Expand Down Expand Up @@ -358,7 +362,21 @@ async def refresh(self) -> None:

Useful for MANUAL refresh strategy.
"""
await asyncio.gather(self._fetch_and_cache_config(), self._fetch_and_cache_experiments())
before = self.resolve_all_features(EvaluationContext()) if self.on_config_change and self.ffi_cache else None

results = await asyncio.gather(
self._fetch_and_cache_config(),
self._fetch_and_cache_experiments(),
return_exceptions=True,
)
for result in results:
if isinstance(result, Exception):
logger.warning(f"Error during refresh: {result}")

Comment on lines +371 to +375
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refresh() now uses asyncio.gather(..., return_exceptions=True) and only logs exceptions. This changes behavior: callers (and _ensure_fresh_data with use_stale_on_error=False) can no longer detect refresh failures because errors are swallowed. Consider preserving the prior behavior (let exceptions propagate) or re-raising when any refresh task fails.

Suggested change
)
for result in results:
if isinstance(result, Exception):
logger.warning(f"Error during refresh: {result}")
)
errors = []
for result in results:
if isinstance(result, Exception):
logger.warning(f"Error during refresh: {result}")
errors.append(result)
if errors:
raise errors[0]

Copilot uses AI. Check for mistakes.
if self.on_config_change and before is not None:
after = self.resolve_all_features(EvaluationContext())
if before != after:
self.on_config_change(before, after)
Comment on lines +365 to +379
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm where resolve_all_features is defined (likely interfaces.py AllFeatureProvider).
fd -e py . clients/python/provider | xargs rg -nP '\bdef\s+resolve_all_features\b' -C2

Repository: juspay/superposition

Length of output: 482


🏁 Script executed:

# Check LocalProvider class definition and inheritance
rg -n 'class LocalProvider' clients/python/provider -A 5

Repository: juspay/superposition

Length of output: 693


🏁 Script executed:

# Check for _sse_connected_event usage
rg -n '_sse_connected_event' clients/python/provider

Repository: juspay/superposition

Length of output: 326


🏁 Script executed:

# Find options/org_id dataclass definition
rg -n 'org_id' clients/python/provider/superposition_provider/local_provider.py

Repository: juspay/superposition

Length of output: 278


🏁 Script executed:

# Review SSE parsing logic - search for SSE event handling
rg -n 'event:|data:' clients/python/provider -A 2 -B 2

Repository: juspay/superposition

Length of output: 6681


🏁 Script executed:

# Get complete LocalProviderOptions definition
rg -n 'class LocalProviderOptions' clients/python/provider -A 20

Repository: juspay/superposition

Length of output: 1951


🏁 Script executed:

# Check if _sse_connected_event is ever awaited or used
rg -n 'await.*_sse_connected_event|_sse_connected_event.wait|_sse_connected_event.set' clients/python/provider

Repository: juspay/superposition

Length of output: 46


🏁 Script executed:

# Find LocalProvider class definition and verify inheritance
rg -n 'class LocalProvider' clients/python/provider -A 10

Repository: juspay/superposition

Length of output: 1130


🏁 Script executed:

# Search for LocalProvider class definition in local_provider.py
rg -n 'class LocalProvider' clients/python/provider/superposition_provider/local_provider.py -A 15

Repository: juspay/superposition

Length of output: 46


🏁 Script executed:

# Get context around lines 510-520 to see what options object is used there
sed -n '510,520p' clients/python/provider/superposition_provider/local_provider.py

Repository: juspay/superposition

Length of output: 740


🏁 Script executed:

# Check if AllFeatureProvider is imported in local_provider.py
rg -n 'AllFeatureProvider|from.*interfaces' clients/python/provider/superposition_provider/local_provider.py

Repository: juspay/superposition

Length of output: 279


🏁 Script executed:

# Get full signature of resolve_all_features
rg -n 'def resolve_all_features' clients/python/provider/superposition_provider/interfaces.py -A 5

Repository: juspay/superposition

Length of output: 1040


🏁 Script executed:

# Also verify lines 363-377 match what was shown in review
sed -n '363,377p' clients/python/provider/superposition_provider/local_provider.py

Repository: juspay/superposition

Length of output: 714


Remove the AttributeError concern — resolve_all_features is correctly inherited from AllFeatureProvider.

The method is defined in the AllFeatureProvider interface (signature: context: Optional[EvaluationContext] → Dict[str, Any]) and LocalResolutionProvider inherits from it, so this call is valid.

That said, address these issues:

  • _sse_connected_event is a dead field: Set at line 520 but never awaited or used. Remove it.
  • Redundant org_id check at line 516: Since org_id is a required field in SuperpositionProviderOptions (no default value), if options.org_id: always evaluates to True. Remove the conditional.
  • SSE parsing at line 553 fires on every event: or data: line individually: This violates the Server-Sent Events spec, which defines event boundaries at blank lines. Currently, a single SSE event may trigger multiple refresh calls in rapid succession before being cancelled, causing unnecessary churn. Restructure to accumulate lines until a blank line is encountered.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@clients/python/provider/superposition_provider/local_provider.py` around
lines 363 - 377, Keep the resolve_all_features call as-is (it’s inherited),
remove the dead _sse_connected_event field and any assignments/usages of it, and
delete the redundant conditional around options.org_id
(SuperpositionProviderOptions.org_id is required so the "if options.org_id:"
branch is unnecessary). For the SSE loop (the method that reads the event stream
and currently handles "event:" / "data:" lines one-by-one), change the parser to
accumulate successive lines into a buffer and only treat an empty line as the
end of a single SSE event; on that boundary parse the buffered event/data block
and trigger a single refresh (e.g., call the existing refresh/notification path
once per completed event) rather than firing on every line. Ensure you update
any related cancellation/cleanup logic accordingly so events are debounced by
the SSE boundary.


# --- Private helpers ---

Expand Down Expand Up @@ -574,7 +592,75 @@ async def _watch_loop() -> None:
case WatchStrategy():
self._background_task = asyncio.create_task(_watch_loop())
case PollingStrategy():
self._background_task = asyncio.create_task(_polling_loop())
self._background_task = asyncio.create_task(self._polling_loop())
case SseStrategy():
import weakref
strategy: SseStrategy = self.refresh_strategy
options = strategy.superposition_options
debounce_s = strategy.debounce_ms / 1000
reconnect_delay = max(strategy.reconnect_delay, 1)
sse_url = f"{options.endpoint.rstrip('/')}/{options.org_id}/{options.workspace_id}/stream"
sse_headers = {"Authorization": f"Bearer {options.token}"}
if options.org_id:
sse_headers["x-org-id"] = options.org_id
weak_self = weakref.ref(self)
connected_event = asyncio.Event()
self._sse_connected_event = connected_event

async def _sse_loop():
import aiohttp
logger.info(f"Starting SSE refresh (url={sse_url})")
try:
async with aiohttp.ClientSession() as session:
while True:
if weak_self() is None:
logger.info("Provider garbage collected, stopping SSE loop.")
return
try:
async with session.get(
sse_url,
headers=sse_headers,
timeout=aiohttp.ClientTimeout(total=None, sock_read=30),
) as resp:
if resp.status != 200:
logger.warning(f"SSE endpoint returned {resp.status}, retrying in {reconnect_delay}s")
await asyncio.sleep(reconnect_delay)
continue
logger.info("SSE connection established")
connected_event.set()
self_ref = weak_self()
if self_ref is not None:
try:
await self_ref.refresh()
except Exception as e:
logger.warning(f"Reconnect refresh failed: {e}")
debounce_task = None
async for line_bytes in resp.content:
self_ref = weak_self()
if self_ref is None:
logger.info("Provider garbage collected, stopping SSE loop.")
return
line = line_bytes.decode("utf-8", errors="replace").strip()
logger.debug(f"SSE raw line: {line!r}")
if not line or line.startswith(":"):
continue
if line.startswith("event:") or line.startswith("data:"):
async def _do_refresh(ref=self_ref):
await asyncio.sleep(debounce_s)
try:
await ref.refresh()
except Exception as e:
logger.warning(f"SSE-triggered refresh failed: {e}")
if debounce_task and not debounce_task.done():
debounce_task.cancel()
debounce_task = asyncio.create_task(_do_refresh())
Comment on lines +638 to +656
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SSE client loop iterates async for line_bytes in resp.content, but aiohttp yields arbitrary chunk boundaries, not guaranteed line-delimited SSE frames. This can merge/split event:/data: lines and cause missed or spurious refresh triggers. Use a line-based reader (e.g., await resp.content.readline() in a loop) and parse SSE events based on \n\n frame separation.

Suggested change
async for line_bytes in resp.content:
self_ref = weak_self()
if self_ref is None:
logger.info("Provider garbage collected, stopping SSE loop.")
return
line = line_bytes.decode("utf-8", errors="replace").strip()
logger.debug(f"SSE raw line: {line!r}")
if not line or line.startswith(":"):
continue
if line.startswith("event:") or line.startswith("data:"):
async def _do_refresh(ref=self_ref):
await asyncio.sleep(debounce_s)
try:
await ref.refresh()
except Exception as e:
logger.warning(f"SSE-triggered refresh failed: {e}")
if debounce_task and not debounce_task.done():
debounce_task.cancel()
debounce_task = asyncio.create_task(_do_refresh())
event_lines: List[str] = []
while True:
line_bytes = await resp.content.readline()
if line_bytes == b"":
break
self_ref = weak_self()
if self_ref is None:
logger.info("Provider garbage collected, stopping SSE loop.")
return
line = line_bytes.decode("utf-8", errors="replace").rstrip("\r\n")
logger.debug(f"SSE raw line: {line!r}")
if line == "":
if any(
event_line.startswith("event:") or event_line.startswith("data:")
for event_line in event_lines
):
async def _do_refresh(ref=self_ref):
await asyncio.sleep(debounce_s)
try:
await ref.refresh()
except Exception as e:
logger.warning(f"SSE-triggered refresh failed: {e}")
if debounce_task and not debounce_task.done():
debounce_task.cancel()
debounce_task = asyncio.create_task(_do_refresh())
event_lines = []
continue
if line.startswith(":"):
continue
event_lines.append(line)

Copilot uses AI. Check for mistakes.
Comment on lines +653 to +656
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When debouncing SSE events, the previous debounce_task is cancelled but never awaited/joined. Cancelled tasks can still surface "Task exception was never retrieved"/pending-task warnings depending on timing. Consider awaiting cancellation (with contextlib.suppress(asyncio.CancelledError)) or adding a done-callback that consumes exceptions.

Copilot uses AI. Check for mistakes.
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
logger.warning(f"SSE connection error: {e}, retrying in {reconnect_delay}s")
await asyncio.sleep(reconnect_delay)
except asyncio.CancelledError:
logger.info("SSE loop cancelled")

self._background_task = asyncio.create_task(_sse_loop())
Comment on lines +596 to +663
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

aiohttp StreamReader async for iteration yields lines or chunks?

💡 Result:

aiohttp StreamReader's default async for iteration (async for line in stream_reader:) yields lines (bytes ending with \n). For chunks, use explicit methods like iter_chunked(n), iter_any, or iter_chunks.

Citations:


🏁 Script executed:

# Find SuperpositionOptions class definition
find . -type f -name "*.py" | xargs grep -l "class SuperpositionOptions"

Repository: juspay/superposition

Length of output: 121


🏁 Script executed:

# Search for _sse_connected_event usage in codebase
rg "_sse_connected_event" --type py

Repository: juspay/superposition

Length of output: 319


🏁 Script executed:

# Check SuperpositionOptions structure and org_id field
rg -A 20 "class SuperpositionOptions" --type py

Repository: juspay/superposition

Length of output: 1826


🏁 Script executed:

# Check if _sse_connected_event is awaited in initialize() method
rg "initialize" -A 20 --type py clients/python/provider/superposition_provider/local_provider.py | head -50

Repository: juspay/superposition

Length of output: 1787


🏁 Script executed:

# Check types.py for org_id field definition and default values
cat clients/python/provider/superposition_provider/types.py | head -100

Repository: juspay/superposition

Length of output: 2767


🏁 Script executed:

# View the full SSE loop code around line 540-570 to see blank line handling
sed -n '535,570p' clients/python/provider/superposition_provider/local_provider.py

Repository: juspay/superposition

Length of output: 2791


SSE loop: several small issues — unused _sse_connected_event, dead if org_id, inefficient per-line triggering, and broad except.

  1. Line 516–517: options.org_id is a required, non-default str in SuperpositionOptions, so if options.org_id: is effectively dead code. Remove the conditional or confirm that empty strings are valid.

  2. Line 519–520: self._sse_connected_event is stored and .set() on connect but never awaited anywhere (no await self._sse_connected_event.wait() in initialize() or elsewhere). Either wire it into initialize() to block until the first SSE connection is established, or remove the field.

  3. Line 553–562 (SSE parsing): SSE events are delimited by blank lines, and a typical event has both event: and data: lines. The current code triggers _do_refresh on each matching line, creating two tasks per event—the first is immediately cancelled by the second, so it works due to the cancel-replace pattern, but it's inefficient and not correct SSE parsing. Buffer lines until the blank line terminator, then trigger once per event.

  4. Line 554–559: except Exception as e is very broad (Ruff BLE001) but safe in this inner async context. Consider explicitly excluding CancelledError if needed.

  5. Line 535: timeout=aiohttp.ClientTimeout(total=None, sock_read=30) means a 30s idle read will close the connection. Since the server emits keepalives every 15s, this is fine, but it couples client and server behavior tightly: if keepalive interval is raised past 30s, the connection will flap.

♻️ Suggested SSE parsing fix
-                                        debounce_task = None
-                                        async for line_bytes in resp.content:
+                                        debounce_task = None
+                                        pending_event = False
+                                        async for line_bytes in resp.content:
                                             self_ref = weak_self()
                                             if self_ref is None:
                                                 logger.info("Provider garbage collected, stopping SSE loop.")
                                                 return
                                             line = line_bytes.decode("utf-8", errors="replace").strip()
                                             logger.debug(f"SSE raw line: {line!r}")
-                                            if not line or line.startswith(":"):
-                                                continue
-                                            if line.startswith("event:") or line.startswith("data:"):
+                                            if line.startswith(":"):
+                                                continue
+                                            if not line:
+                                                # event terminator — trigger once per event
+                                                if not pending_event:
+                                                    continue
+                                                pending_event = False
                                                 async def _do_refresh(ref=self_ref):
                                                     await asyncio.sleep(debounce_s)
                                                     try:
                                                         await ref.refresh()
                                                     except Exception as e:
                                                         logger.warning(f"SSE-triggered refresh failed: {e}")
                                                 if debounce_task and not debounce_task.done():
                                                     debounce_task.cancel()
                                                 debounce_task = asyncio.create_task(_do_refresh())
+                                            elif line.startswith("event:") or line.startswith("data:"):
+                                                pending_event = True
🧰 Tools
🪛 Ruff (0.15.10)

[warning] 558-558: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@clients/python/provider/superposition_provider/local_provider.py` around
lines 508 - 569, The SSE loop has four fixes: remove or stop guarding header
addition by a dead check on options.org_id in _sse_loop (SuperpositionOptions
requires a non-empty org_id) or document that empty is allowed; either wire
self._sse_connected_event into initialize() to await the first connection (await
self._sse_connected_event.wait()) or remove the attribute and its set() call
(symbols: self._sse_connected_event, initialize); replace per-line handling in
_sse_loop/_do_refresh with proper SSE parsing by buffering resp.content lines
into an event until a blank line then trigger a single debounce refresh task per
complete event (symbols: _sse_loop, _do_refresh, debounce_task); and narrow the
broad inner except to avoid swallowing asyncio.CancelledError (catch
aiohttp.ClientError and asyncio.TimeoutError explicitly, and re-raise or let
CancelledError propagate). Optionally make sock_read timeout configurable rather
than hard-coding 30s if server keepalive cadence may change.

case ManualStrategy():
logger.debug("MANUAL strategy: caller must invoke refresh()")
case OnDemandStrategy():
Expand Down
15 changes: 14 additions & 1 deletion clients/python/provider/superposition_provider/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ class WatchStrategy:
def default_watch_strategy():
return WatchStrategy(500)

@dataclass
class SseStrategy:
    """SSE-based refresh strategy.

    Connects to the server's SSE endpoint and refreshes when a change event
    is received. Reconnects automatically on connection failure.

    Requires SuperpositionOptions to build the SSE endpoint URL and authenticate.
    """
    # Connection settings (endpoint, token, org, workspace) used to build the
    # SSE stream URL and its auth headers.
    superposition_options: SuperpositionOptions
    reconnect_delay: int = 5  # seconds to wait between reconnect attempts
    debounce_ms: int = 500  # collapse rapid successive events into one refresh

Comment on lines +74 to +86
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SseStrategy carries a separate SuperpositionOptions instance, which can diverge from the HttpDataSource options passed to the provider (endpoint/token/org/workspace). This can lead to refreshing from a different workspace than the one being evaluated. Consider deriving SSE connection settings from the primary data source (when it’s HTTP) or validating that the options match on initialization.

Copilot uses AI. Check for mistakes.
@dataclass
class ManualStrategy:
"""Manual refresh strategy.
Expand All @@ -81,7 +94,7 @@ class ManualStrategy:


# Union type for all refresh strategies
RefreshStrategy = Union[PollingStrategy, OnDemandStrategy, WatchStrategy, ManualStrategy]
RefreshStrategy = Union[PollingStrategy, OnDemandStrategy, WatchStrategy, SseStrategy, ManualStrategy]


# ============================================================================
Expand Down
14 changes: 7 additions & 7 deletions crates/context_aware_config/src/api/context/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use diesel::{
use serde_json::{Map, Value};
use service_utils::{
helpers::{
WebhookData, execute_webhook_call, fetch_dimensions_info_map, parse_config_tags,
WebhookData, fetch_dimensions_info_map, notify_change, parse_config_tags,
},
middlewares::auth_z::{Action as AuthZAction, AuthZ},
service::types::{
Expand Down Expand Up @@ -186,7 +186,7 @@ async fn create_handler(
};

let webhook_status =
execute_webhook_call(data, &workspace_context, &state, &mut conn).await;
notify_change(data, &workspace_context, &state, &mut conn).await;

let mut http_resp = if webhook_status {
HttpResponse::Ok()
Expand Down Expand Up @@ -322,7 +322,7 @@ async fn update_handler(
};

let webhook_status =
execute_webhook_call(data, &workspace_context, &state, &mut conn).await;
notify_change(data, &workspace_context, &state, &mut conn).await;

let mut http_resp = if webhook_status {
HttpResponse::Ok()
Expand Down Expand Up @@ -467,7 +467,7 @@ async fn move_handler(
};

let webhook_status =
execute_webhook_call(data, &workspace_context, &state, &mut conn).await;
notify_change(data, &workspace_context, &state, &mut conn).await;

let mut http_resp = if webhook_status {
HttpResponse::Ok()
Expand Down Expand Up @@ -754,7 +754,7 @@ async fn delete_handler(
};

let webhook_status =
execute_webhook_call(data, &workspace_context, &state, &mut conn).await;
notify_change(data, &workspace_context, &state, &mut conn).await;

let mut http_resp = if webhook_status {
HttpResponse::NoContent()
Expand Down Expand Up @@ -1129,7 +1129,7 @@ async fn bulk_operations_handler(
};

let webhook_status =
execute_webhook_call(data, &workspace_context, &state, &mut conn).await;
notify_change(data, &workspace_context, &state, &mut conn).await;

let mut resp_builder = if webhook_status {
HttpResponse::Ok()
Expand Down Expand Up @@ -1245,7 +1245,7 @@ async fn weight_recompute_handler(
};

let webhook_status =
execute_webhook_call(data, &workspace_context, &state, &mut conn).await;
notify_change(data, &workspace_context, &state, &mut conn).await;

let mut http_resp = if webhook_status {
HttpResponse::Ok()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use diesel::{
use jsonschema::ValidationError;
use serde_json::Value;
use service_utils::{
helpers::{WebhookData, execute_webhook_call, parse_config_tags},
helpers::{WebhookData, notify_change, parse_config_tags},
service::types::{
AppHeader, AppState, CustomHeaders, DbConnection, EncryptionKey, SchemaName,
WorkspaceContext,
Expand Down Expand Up @@ -188,7 +188,7 @@ async fn create_handler(
};

let webhook_status =
execute_webhook_call(data, &workspace_context, &state, &mut conn).await;
notify_change(data, &workspace_context, &state, &mut conn).await;

let mut http_resp = if webhook_status {
HttpResponse::Ok()
Expand Down Expand Up @@ -348,7 +348,7 @@ async fn update_handler(
};

let webhook_status =
execute_webhook_call(data, &workspace_context, &state, &mut conn).await;
notify_change(data, &workspace_context, &state, &mut conn).await;

let mut http_resp = if webhook_status {
HttpResponse::Ok()
Expand Down Expand Up @@ -586,7 +586,7 @@ async fn delete_handler(
};

let webhook_status =
execute_webhook_call(data, &workspace_context, &state, &mut conn).await;
notify_change(data, &workspace_context, &state, &mut conn).await;

let mut http_resp = if webhook_status {
HttpResponse::Ok()
Expand Down
Loading
Loading