Make exporter survive connection interruption #357
Conversation
Walkthrough
The changes in the pull request focus on the exporter's connection handling, adding retry logic with backoff to its listen and status streams.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Controller
    participant Exporter
    participant ListenStream
    Controller->>Exporter: Sends request
    Exporter->>ListenStream: Push request into listen_tx
    ListenStream-->>Exporter: Emit request via listen_rx
    alt Handling error
        Exporter->>Exporter: Log error and retry with backoff
    end
```
```mermaid
sequenceDiagram
    participant Controller
    participant Exporter
    participant StatusStream
    Controller->>Exporter: Sends status update
    Exporter->>StatusStream: Push update into status_tx
    StatusStream-->>Exporter: Emit update via status_rx
    alt Handling error
        Exporter->>Exporter: Log error and retry with backoff
    end
```
✅ Deploy Preview for jumpstarter-docs ready!
Actionable comments posted: 0
🧹 Nitpick comments (3)
packages/jumpstarter/jumpstarter/exporter/exporter.py (3)
40-45: Connection handling wrapped in `try`/`except`. A nice addition for basic error handling. However, consider whether logging at error level (`logger.error`) would be more appropriate for visibility, or whether you want to re-raise in certain failure scenarios.
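A minimal sketch of that suggestion, using a hypothetical `connect` coroutine rather than the exporter's actual connection code:

```python
import logging

logger = logging.getLogger(__name__)

async def connect_with_visibility(connect):
    """Run a (hypothetical) connection coroutine, surfacing failures loudly."""
    try:
        await connect()
    except Exception as e:
        # error-level logging keeps the failure visible in normal log output
        logger.error("Exporter connection failed: %s", e)
        # re-raise so unrecoverable failures are not silently swallowed
        raise
```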
71-85: Introducing a long-lived `listen` loop with retries. Good approach to continuously watch for incoming requests. However, this may loop infinitely if errors persist. Consider adding a maximum retry count or an exponential backoff to avoid a potential hard loop.
Example (adding a brief exponential backoff note):

```diff
  except Exception as e:
      logger.info("Listen stream interrupted, restarting: {}".format(e))
+     # Consider an exponential backoff or maximum retry limit here
      await sleep(3)
```
98-107: Similar retry logic for the `status` routine. As with the `listen` loop, use caution regarding infinite retry. You may want to introduce adjustable sleep intervals (e.g., exponential backoff) or a maximum retry limit.
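For both loops, one way to combine a retry ceiling with exponential backoff is sketched below; the helper name and parameters are illustrative and not taken from the actual exporter code:

```python
import logging

from anyio import sleep

logger = logging.getLogger(__name__)

async def run_with_backoff(stream_once, max_retries=5, base_delay=3, max_delay=60):
    """Re-run a streaming coroutine after failures, with capped exponential backoff."""
    attempt = 0
    while True:
        try:
            await stream_once()
            attempt = 0  # a clean run resets the retry budget
        except Exception as e:
            attempt += 1
            if attempt > max_retries:
                logger.error("Giving up after %s retries: %s", max_retries, e)
                raise
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            logger.info("Stream interrupted, retrying in %ss: %s", delay, e)
            await sleep(delay)
```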
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- packages/jumpstarter-cli-admin/jumpstarter_cli_admin/get_test.py (2 hunks)
- packages/jumpstarter/jumpstarter/exporter/exporter.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (11)
- GitHub Check: Redirect rules - jumpstarter-docs
- GitHub Check: Header rules - jumpstarter-docs
- GitHub Check: Pages changed - jumpstarter-docs
- GitHub Check: e2e
- GitHub Check: pytest-matrix (3.11)
- GitHub Check: pytest-matrix (3.12)
- GitHub Check: pytest-matrix (3.13)
- GitHub Check: build-and-push-image (jumpstarter-dev/jumpstarter-dev .devfile/Containerfile)
- GitHub Check: build-and-push-image (jumpstarter-dev/jumpstarter-utils Dockerfile.utils)
- GitHub Check: build-and-push-image (jumpstarter-dev/jumpstarter Dockerfile)
- GitHub Check: build-and-push-image (jumpstarter-dev/jumpstarter-devspace .devfile/Containerfile.client)
🔇 Additional comments (6)
packages/jumpstarter-cli-admin/jumpstarter_cli_admin/get_test.py (2)
767-767: Added blank line before decorator for improved readability. No functional changes introduced; this is purely a readability enhancement.
1145-1145: Added blank line before decorator for improved readability. No functional changes introduced; this is purely a readability enhancement.
packages/jumpstarter/jumpstarter/exporter/exporter.py (4)
7-7: Importing `sleep` from anyio. This import is used for the new retry logic. Good addition for asynchronously handling retries.
87-87: Asynchronous iteration over `listen_rx`. This leverages the new `listen_tx`/`listen_rx` streams seamlessly. No issues spotted.
96-97: Creating memory object streams for status updates. Good pattern to handle asynchronous status messages.
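For context, the producer/consumer pattern behind these streams looks roughly like this generic anyio sketch (the message payloads here are placeholders, not the exporter's actual status types):

```python
import anyio
from anyio import create_memory_object_stream

async def main():
    send, receive = create_memory_object_stream()

    async def producer():
        async with send:
            for i in range(3):
                await send.send(f"status-{i}")  # push updates into the stream

    async with anyio.create_task_group() as tg:
        tg.start_soon(producer)
        async with receive:
            async for item in receive:  # consume updates as they arrive
                print("received", item)

anyio.run(main)
```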
109-110: Starting the `status` coroutine within the task group. This is consistent with the overall async pattern in the exporter. Nicely done.
```python
listen_tx, listen_rx = create_memory_object_stream()

async def listen():
    while True:
```
I am worried about our previous experiences with re-connections in a long-running service...
Can we add a limit to retries? And maybe a flag to enable this?
Implemented the retry limit, yet to add a flag.
Force-pushed from ebadc18 to 1d0fd22.
Actionable comments posted: 1
🧹 Nitpick comments (2)
packages/jumpstarter/jumpstarter/exporter/exporter.py (2)
99-100: Consider adding error handling for the consumer loop. While you've added robust error handling for the producer side (the `listen` function), the consumer loop doesn't have equivalent error handling. This might be intentional if errors here should cause termination, but for consistency consider adding error handling.

```diff
- async for request in listen_rx:
+ try:
+     async for request in listen_rx:
+         logger.info("Handling new connection request on lease %s", lease_name)
+         tg.start_soon(
+             self.__handle, path, request.router_endpoint, request.router_token, self.tls, self.grpc_options
+         )
+ except Exception as e:
+     logger.error("Error consuming connection requests: %s", e)
+     # Handle consumer errors appropriately
```
110-132: Refactor duplicate retry logic into a helper function. The retry logic in `status` is identical to the logic in `listen`. Consider extracting this pattern into a helper function to avoid code duplication and ensure consistent behavior.

```diff
  # Add at the beginning of the class or as a standalone function:
+ async def _stream_with_retry(self, stream_func, stream_tx, retries=5, backoff=3):
+     """Helper function for streaming with retry logic."""
+     retries_left = retries
+     while True:
+         try:
+             async for item in stream_func():
+                 await stream_tx.send(item)
+         except Exception as e:
+             if retries_left > 0:
+                 retries_left -= 1
+                 logger.info(
+                     "Stream interrupted, restarting in %ss, %s retries left: %s",
+                     backoff, retries_left, e
+                 )
+                 await sleep(backoff)
+             else:
+                 logger.error("Maximum retries exceeded, waiting for manual intervention: %s", e)
+                 await sleep(60)
+                 retries_left = retries
+         else:
+             retries_left = retries

  # Then in handle method:
+ async def listen():
+     controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())
+     await self._stream_with_retry(
+         lambda: controller.Listen(jumpstarter_pb2.ListenRequest(lease_name=lease_name)),
+         listen_tx,
+         self.max_retries if self.retry_enabled else 0,
+         self.retry_backoff
+     )

  # And in serve method:
+ async def status():
+     controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())
+     await self._stream_with_retry(
+         lambda: controller.Status(jumpstarter_pb2.StatusRequest()),
+         status_tx,
+         self.max_retries if self.retry_enabled else 0,
+         self.retry_backoff
+     )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- packages/jumpstarter/jumpstarter/exporter/exporter.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (11)
- GitHub Check: Redirect rules - jumpstarter-docs
- GitHub Check: Header rules - jumpstarter-docs
- GitHub Check: Pages changed - jumpstarter-docs
- GitHub Check: e2e
- GitHub Check: pytest-matrix (3.13)
- GitHub Check: pytest-matrix (3.12)
- GitHub Check: pytest-matrix (3.11)
- GitHub Check: build-and-push-image (jumpstarter-dev/jumpstarter-dev .devfile/Containerfile)
- GitHub Check: build-and-push-image (jumpstarter-dev/jumpstarter Dockerfile)
- GitHub Check: build-and-push-image (jumpstarter-dev/jumpstarter-devspace .devfile/Containerfile.client)
- GitHub Check: build-and-push-image (jumpstarter-dev/jumpstarter-utils Dockerfile.utils)
🔇 Additional comments (5)
packages/jumpstarter/jumpstarter/exporter/exporter.py (5)
41-46: Good addition of error handling. The try-except block for handling connection failures is a good start for making the exporter more robust. This implementation will properly log errors when connections fail.
73-95: Consider making retry parameters configurable. While adding retry logic with backoff is an excellent improvement, hardcoding the retry count and backoff duration limits flexibility. This addresses part of the previous comment about adding retry limits, but doesn't provide a way to configure or disable this behavior.
Consider making these parameters configurable through class initialization:
```diff
 @dataclass(kw_only=True)
 class Exporter(AbstractAsyncContextManager, Metadata):
     channel_factory: Callable[[], grpc.aio.Channel]
     device_factory: Callable[[], Driver]
     lease_name: str = field(init=False, default="")
     tls: TLSConfigV1Alpha1 = field(default_factory=TLSConfigV1Alpha1)
     grpc_options: dict[str, str] = field(default_factory=dict)
+    max_retries: int = field(default=5)
+    retry_backoff: int = field(default=3)
+    retry_enabled: bool = field(default=True)
```

Then update the `listen` function signature:

```diff
- async def listen(retries=5, backoff=3):
+ async def listen():
+     retries = self.max_retries if self.retry_enabled else 0
+     backoff = self.retry_backoff
```
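If these proposed fields were adopted, the retry behaviour could be tuned or disabled per exporter instance (for example via `retry_enabled=False`) without touching the loop code itself, which would also cover the earlier request in this thread for a flag to enable or disable reconnection.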
96-96: LGTM: Task startup looks good. The use of task groups to manage the asynchronous listen operation is properly implemented.
135-135: Good job using memory streams for decoupling producers and consumers. Using memory object streams to separate the producer (status) from the consumer loop improves code modularity and error isolation, which is a good design practice.
133-135: ❓ Verification inconclusive
Consider synchronizing cancellation across both status and listen streams.
The task group is properly set up, but there's a potential race condition if a status change occurs while the `status` function is in a retry backoff. Make sure both streams are properly cancelled when the service state changes.
Verify how task cancellation propagates to inner functions during backoff periods. Does the cancellation immediately terminate the sleeping tasks?
🏁 Script executed:
```bash
#!/bin/bash
# Search for anyio sleep cancellation behavior
rg -A 3 -B 3 "cancel.*sleep|sleep.*cancel" --glob "*.py"

# Check for explicit cancellation handling with sleep
rg -A 3 -B 3 "try.*sleep.*except.*CancelledError|CancelledError.*sleep" --glob "*.py"
```

Length of output: 146
I'll perform an additional investigation into the cancellation behavior during backoff periods in the exporter. To help clarify whether sleeping tasks (such as those in a retry backoff) are immediately cancelled, please run the following scripts:
If these searches still produce no output, it might indicate that cancellation or backoff logic is managed indirectly (for example, via anyio’s built-in mechanisms). In that case—and because the absence of explicit sleep cancellation handling makes it unclear if backoff sleeping tasks are immediately terminated—I recommend that you manually verify the cancellation propagation (e.g., by checking anyio’s documentation or your test suite behavior).
Action Required: Verify Cancellation Synchronization for Backoff Tasks
The current task group setup in exporter.py (lines 133-135) is correct, but a potential race condition remains if a status change occurs while the `status` function is in a retry backoff. Specifically, it's unclear if sleeping tasks within backoff periods are immediately cancelled. Please ensure that cancellation propagates correctly across both the status and listen streams when the service state changes.
🏁 Script executed:
```bash
#!/bin/bash
echo "Searching exporter.py for sleep usage to uncover backoff/sleep patterns..."
rg -A 5 -B 5 "sleep(" packages/jumpstarter/jumpstarter/exporter/exporter.py

echo "Searching exporter.py for CancelledError handling..."
rg -A 5 -B 5 "CancelledError" packages/jumpstarter/jumpstarter/exporter/exporter.py

echo "Searching globally for backoff usage in Python files..."
rg -A 3 -B 3 "backoff" --glob "*.py"
```

Length of output: 3866
Ensure proper cancellation propagation during retry backoff
- The exporter's `status` (and similarly `listen`) function uses `await sleep(backoff)` within a try/except block. Since AnyIO's sleep is cancellation-friendly, a running backoff should be interrupted by a CancelledError when the task group is cancelled (see the sketch below).
- However, there's no explicit synchronization between the streams outside the task group context. Please verify (e.g., via tests or documentation review) that cancellation indeed terminates the sleeping tasks immediately when the service state changes.
- If cancellation is not prompt during backoff, consider adding explicit cancellation checks before or after the sleep to avoid any potential race condition.
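As a quick, self-contained illustration of the behaviour the review asks to verify (not code from the exporter), the following anyio sketch cancels a task group while one task is sleeping; the sleep is interrupted immediately rather than running out its full backoff:

```python
import anyio

async def main():
    async def backoff_sleeper():
        try:
            await anyio.sleep(60)  # stands in for a long retry backoff
        except anyio.get_cancelled_exc_class():
            print("backoff sleep was cancelled promptly")
            raise  # cancellation must always be re-raised

    async with anyio.create_task_group() as tg:
        tg.start_soon(backoff_sleeper)
        await anyio.sleep(0.1)    # give the sleeper time to start
        tg.cancel_scope.cancel()  # simulate the service state changing

anyio.run(main)
```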