Ensure proper cancellation of in-flight workflow tasks #1819
thepatrickchin wants to merge 7 commits into NVIDIA:develop from
Walkthrough
Enforces workflow schema presence, preempts and awaits any in-flight workflow task before starting a new one, makes the workflow run cancellation-aware (no completion or observability messages are sent on cancel), tightens done-callback clearing, and prevents `Runner.__aexit__` from masking propagated exceptions. Adds async tests for Runner exit and cancellation behavior.
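The "preempt and await" step in the walkthrough can be illustrated with plain asyncio. This is a minimal sketch, not the PR's code: `Handler`, `start`, and `events` are hypothetical names, and `asyncio.gather(..., return_exceptions=True)` is used here as one way to wait for the old task without re-raising its `CancelledError`.

```python
import asyncio


class Handler:
    """Hypothetical stand-in for the WebSocket message handler."""

    def __init__(self):
        self._running_task = None
        self.events = []

    async def _run_workflow(self, name):
        try:
            self.events.append(f"{name}:start")
            await asyncio.sleep(3600)  # simulates a workflow blocked on OAuth
        except asyncio.CancelledError:
            self.events.append(f"{name}:cancelled")
            raise

    async def start(self, name):
        previous = self._running_task
        if previous is not None:
            previous.cancel()
            # Wait until the old task has finished unwinding before replacing it.
            await asyncio.gather(previous, return_exceptions=True)
        self._running_task = asyncio.create_task(self._run_workflow(name))


async def demo():
    handler = Handler()
    await handler.start("first")
    await asyncio.sleep(0)  # let the first task begin
    await handler.start("second")
    await asyncio.sleep(0)  # let the second task begin
    handler._running_task.cancel()
    await asyncio.gather(handler._running_task, return_exceptions=True)
    return handler.events
```

Running `asyncio.run(demo())` shows the first task recording its cancellation before the second one records its start, which is the ordering the walkthrough describes.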
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py (1)
449-500: ⚠️ Potential issue | 🟠 Major
Make the pending trace local to the workflow task.
If either awaited send in this `finally` raises, the reset at the end is skipped and the handler keeps the previous run's trace. The next run can then suppress its own trace or emit stale data.
💡 One way to contain the state to a single workflow run
    - _cancelled = False
    + _cancelled = False
    + pending_observability_trace: ResponseObservabilityTrace | None = None
      try:
          auth_callback = self._flow_handler.authenticate if self._flow_handler else None
          async with self._session_manager.session(user_id=self._user_id,
                                                   user_message_id=user_message_id,
                                                   conversation_id=conversation_id,
                                                   http_connection=self._socket,
                                                   user_input_callback=self.human_interaction_callback,
                                                   user_authentication_callback=auth_callback) as session:
              self._session_manager._context.metadata._request.payload = self._user_message_payload
              async for value in generate_streaming_response(payload,
                                                             session=session,
                                                             streaming=True,
                                                             step_adaptor=self._step_adaptor,
                                                             result_type=result_type,
                                                             output_type=output_type):
                  # Store observability trace to send after completion message
                  if isinstance(value, ResponseObservabilityTrace):
    -                 if self._pending_observability_trace is None:
    -                     self._pending_observability_trace = value
    +                 if pending_observability_trace is None:
    +                     pending_observability_trace = value
                      continue
    @@
      finally:
          if not _cancelled:
              await self.create_websocket_message(data_model=SystemResponseContent(),
                                                  message_type=WebSocketMessageType.RESPONSE_MESSAGE,
                                                  status=WebSocketMessageStatus.COMPLETE)
              # Send observability trace after completion message
    -         if self._pending_observability_trace is not None:
    -             await self.create_websocket_message(data_model=self._pending_observability_trace,
    +         if pending_observability_trace is not None:
    +             await self.create_websocket_message(data_model=pending_observability_trace,
                                                      message_type=WebSocketMessageType.OBSERVABILITY_TRACE_MESSAGE)
    -
    -     self._pending_observability_trace = None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py` around lines 449 - 500, The handler-level _pending_observability_trace must be made local to the workflow run: at the start of the workflow (before streaming/generating responses in generate_streaming_response) capture the current self._pending_observability_trace into a local variable (e.g. pending_observability_trace = self._pending_observability_trace) and immediately clear the handler state (self._pending_observability_trace = None) so the handler no longer holds stale state; then use that local pending_observability_trace when sending the observability message in the finally block (instead of referencing self._pending_observability_trace) so any exceptions while awaiting create_websocket_message won't leave stale traces on the handler object. Ensure references in this change target _pending_observability_trace and the finally send that calls create_websocket_message (and leave generate_streaming_response and authenticate/session logic unchanged).
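To make the "local to the workflow run" idea concrete, here is a small self-contained sketch. It assumes a hypothetical `run_workflow(stream, send)` coroutine and uses plain dicts and strings in place of `ResponseObservabilityTrace` and the WebSocket message types; none of these names come from the PR.

```python
import asyncio


async def run_workflow(stream, send):
    pending_trace = None  # lives only for this run, never on the handler
    cancelled = False
    try:
        async for value in stream:
            if isinstance(value, dict) and value.get("kind") == "trace":
                if pending_trace is None:
                    pending_trace = value
                continue
            await send(value)
    except asyncio.CancelledError:
        cancelled = True
        raise
    finally:
        if not cancelled:
            await send("COMPLETE")
            if pending_trace is not None:
                await send(pending_trace)


async def demo():
    sent = []

    async def failing_send(message):
        if message == "COMPLETE":
            raise ConnectionError("socket closed")
        sent.append(message)

    async def ok_send(message):
        sent.append(message)

    async def stream(items):
        for item in items:
            yield item

    # First run captures a trace, then fails while sending COMPLETE.
    try:
        await run_workflow(stream([{"kind": "trace", "id": 1}, "a"]), failing_send)
    except ConnectionError:
        pass
    # Second run has no trace of its own and must not see the first run's trace.
    await run_workflow(stream(["b"]), ok_send)
    return sent
```

Because the trace is a local variable, a failed send in the first run cannot leak state into the second run, and no explicit reset is needed.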
🧹 Nitpick comments (1)
packages/nvidia_nat_core/tests/nat/runtime/test_runner.py (1)
287-319: Please cover the actual workflow-replacement path too.These cases only validate
Runner.__aexit__. The regression inpackages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.pyhappens when one live workflow task is cancelled and another starts immediately, so that handoff still needs a regression test.As per coding guidelines, "Maintain >= 80% test coverage; add or update tests when introducing changes."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/nvidia_nat_core/tests/nat/runtime/test_runner.py` around lines 287 - 319, Add a new async test that exercises the workflow-replacement path: use WorkflowBuilder to create two distinct entry functions (or reuse entry_fn then immediately start a second run), start a Runner for the first workflow, trigger an asyncio.CancelledError inside that running workflow (so it is cancelled), and immediately start a second Runner invocation to simulate handoff; assert that the cancelled task’s CancelledError is not masked by a ValueError and that the second workflow runs to completion (e.g., returns expected result or calls result()), referencing Runner, WorkflowBuilder, entry_fn (or a second entry function), ContextState, and ExporterManager to set up and verify the handoff behavior tied to the message_handler regression.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py`:
- Around line 278-297: The current logic cancels the previous
self._running_workflow_task and immediately starts a new asyncio.create_task
pointing at self._run_workflow, but the cancelled task can still mutate
handler-scoped state on self and race with the replacement; to fix, either
(preferred) serialize run startup by awaiting the cancelled task to finish
before creating the new one (e.g., call self._running_workflow_task.cancel()
then await it/await with timeout or use asyncio.shield to observe completion) so
the old run cannot teardown shared state after the new one starts, or
(alternative) change _run_workflow to accept a per-run context object (pass a
new RunContext instance into _run_workflow) and ensure all per-run state is
stored on that object instead of self; update references to
_running_workflow_task, _run_workflow, _conversation_id, and the
_done_callback/_worker.remove_conversation_handler logic accordingly so they
operate on the serialized/isolated run rather than shared handler state.
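The alternative mentioned above, a per-run context object, might look roughly like the following. `RunContext` and its fields are hypothetical; the comment only names the idea, so this is a sketch of the shape rather than the project's design.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class RunContext:
    """Hypothetical per-run state; nothing here is stored on the handler."""
    conversation_id: str
    payload: str
    sent: list = field(default_factory=list)


async def run_workflow(ctx, gate):
    await gate.wait()
    # Each run reads and writes only its own context.
    ctx.sent.append(f"{ctx.conversation_id}:{ctx.payload}")


async def demo():
    gate = asyncio.Event()
    first = RunContext("conv-1", "hello")
    second = RunContext("conv-2", "world")
    tasks = [asyncio.create_task(run_workflow(ctx, gate)) for ctx in (first, second)]
    gate.set()
    await asyncio.gather(*tasks)
    return first.sent, second.sent
```

Even when both runs are live at the same time, neither can observe the other's conversation ID or payload, which is the isolation the comment is asking for.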
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 4cbaff47-8f18-4c01-99aa-218943804912
📒 Files selected for processing (3)
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py
packages/nvidia_nat_core/src/nat/runtime/runner.py
packages/nvidia_nat_core/tests/nat/runtime/test_runner.py
🧹 Nitpick comments (1)
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py (1)
278-284: Consider adding debug logging for cancelled task cleanup.
The pattern of catching and suppressing exceptions during cleanup is intentional here. However, logging at debug level would aid troubleshooting and satisfy static analysis (S110/BLE001) without changing behavior.
♻️ Optional: Add debug logging for visibility
      if self._running_workflow_task is not None:
          self._running_workflow_task.cancel()
          try:
              await self._running_workflow_task
    -     except (asyncio.CancelledError, Exception):
    -         pass
    +     except asyncio.CancelledError:
    +         logger.debug("Previous workflow task cancelled successfully")
    +     except Exception:
    +         logger.debug("Previous workflow task raised exception during cancellation", exc_info=True)
          self._running_workflow_task = None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py` around lines 278 - 284, Add debug-level logging around the cancellation/cleanup of self._running_workflow_task: before calling self._running_workflow_task.cancel() log that the running workflow task is being cancelled (include task repr/id), and inside the except (asyncio.CancelledError, Exception) block log the caught exception at debug level (including exception info) so cleanup still suppresses errors but is visible in logs; use the component's existing logger (e.g., self._logger or the module logger) and keep behavior unchanged while emitting these debug messages around the cancel(), await self._running_workflow_task, and the exception handler.
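As a standalone illustration of the logging suggestion, the helper below cancels a task, waits for it, and records the outcome at debug level. `cancel_and_wait` is a hypothetical name and the module-level `logger` is an assumption; the handler's real logger may differ.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def cancel_and_wait(task):
    logger.debug("Cancelling previous workflow task %r", task)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Note: this also absorbs a cancellation aimed at the caller itself.
        logger.debug("Previous workflow task cancelled")
    except Exception:
        # The task failed while unwinding; keep the traceback in the debug log.
        logger.debug("Previous workflow task failed during cancellation", exc_info=True)
```

Behavior is unchanged compared with a bare `pass`: both outcomes are still suppressed, but they now leave a record when debug logging is enabled.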
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 0a00474d-41ca-447c-afbe-373bad03ec1f
📒 Files selected for processing (1)
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py
- Cancel any in-flight workflow task when a new user message arrives so that a workflow stuck waiting on an OAuth redirect (e.g. blocked popup) can be cleanly replaced.
- Guard Runner.__aexit__ from raising a secondary ValueError when an exception (e.g. CancelledError) is already propagating.
- Suppress the COMPLETE/observability messages in _run_workflow when the task was cancelled.
Signed-off-by: Patrick Chin <8509935+thepatrickchin@users.noreply.github.com>
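The `__aexit__` guard described in this commit can be sketched with a simplified context manager. This `Runner` is a hypothetical stand-in, not the real `nat` class, and the error message is invented for illustration.

```python
import asyncio


class Runner:
    """Hypothetical, simplified runner; not the actual nat Runner."""

    def __init__(self):
        self._finished = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc_type is not None:
            # Something is already propagating; do not replace it with ValueError.
            return False
        if not self._finished:
            raise ValueError("Runner exited without calling result()")
        return False

    async def result(self):
        self._finished = True
        return "ok"


async def demo():
    outcomes = []
    try:
        async with Runner():
            raise asyncio.CancelledError()
    except asyncio.CancelledError:
        outcomes.append("CancelledError")
    try:
        async with Runner():
            pass  # clean exit without calling result()
    except ValueError:
        outcomes.append("ValueError")
    async with Runner() as runner:
        outcomes.append(await runner.result())
    return outcomes
```

The misuse check still fires on a clean exit, but a `CancelledError` raised inside the block reaches the caller unchanged.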
- Only clear _running_workflow_task when the callback matches the current task
- Move conversation handler removal outside the task-identity check so a cancelled task's callback still cleans up the handler when no new task was created (e.g. RuntimeError before asyncio.create_task)
Signed-off-by: Patrick Chin <8509935+thepatrickchin@users.noreply.github.com>
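A minimal sketch of the task-identity check in a done-callback follows. `Handler`, `start`, and `cleaned_up` are hypothetical names; the cleanup counter stands in for the conversation handler removal mentioned in the commit.

```python
import asyncio


class Handler:
    """Hypothetical handler tracking a single running task."""

    def __init__(self):
        self._running_task = None
        self.cleaned_up = 0

    def start(self, coro):
        task = asyncio.create_task(coro)
        self._running_task = task

        def _done_callback(done_task):
            # Cleanup runs for every finished task, replaced or not.
            self.cleaned_up += 1
            # Only clear the slot if it still points at the task that finished.
            if self._running_task is done_task:
                self._running_task = None

        task.add_done_callback(_done_callback)
        return task


async def demo():
    handler = Handler()
    old = handler.start(asyncio.sleep(3600))
    await asyncio.sleep(0)
    old.cancel()
    new = handler.start(asyncio.sleep(3600))  # replaced before old's callback runs
    await asyncio.gather(old, return_exceptions=True)
    await asyncio.sleep(0)
    still_tracked = handler._running_task is new
    new.cancel()
    await asyncio.gather(new, return_exceptions=True)
    await asyncio.sleep(0)
    return still_tracked, handler._running_task, handler.cleaned_up
```

Without the identity check, the old task's callback would set `_running_task` to `None` and the handler would lose track of the replacement.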
Reset _pending_observability_trace unconditionally in the finally block so a stale trace from a cancelled task cannot persist into the next workflow execution
Signed-off-by: Patrick Chin <8509935+thepatrickchin@users.noreply.github.com>
Signed-off-by: Patrick Chin <8509935+thepatrickchin@users.noreply.github.com>
_done_callback closed over self._conversation_id directly, which can change if a new message arrives before the task completes. The callback could then remove the wrong conversation's handler registration. Capture the value into a local variable so each callback always cleans up the conversation it was created for.
Signed-off-by: Patrick Chin <8509935+thepatrickchin@users.noreply.github.com>
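The closure issue this commit fixes is ordinary Python late binding, and can be reproduced without asyncio. The class and method names below are hypothetical and exist only to contrast the two behaviors.

```python
class Handler:
    """Hypothetical handler used to contrast late-bound and captured values."""

    def __init__(self, conversation_id):
        self._conversation_id = conversation_id

    def late_bound_callback(self):
        # Reads self._conversation_id when the callback runs, not when it is created.
        return lambda: self._conversation_id

    def captured_callback(self):
        conversation_id = self._conversation_id  # snapshot at creation time
        return lambda: conversation_id


handler = Handler("conv-1")
late = handler.late_bound_callback()
captured = handler.captured_callback()
handler._conversation_id = "conv-2"  # a new message arrives before the task completes
```

After the reassignment, `late()` returns `"conv-2"` while `captured()` still returns `"conv-1"`, so only the captured form cleans up the conversation it was created for.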
Prevents a race where the old task's finally block mutates shared handler state (e.g. pending_observability_trace) concurrently with a new workflow run. Also suppress CancelledError in Runner.__aexit__ and add tests to cover both clean-exit and cancellation paths.
Signed-off-by: Patrick Chin <8509935+thepatrickchin@users.noreply.github.com>
cd5a3b3 to dc91e06
Actionable comments posted: 1
♻️ Duplicate comments (1)
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py (1)
272-293: ⚠️ Potential issue | 🟠 Major
Stage the replacement request locally until the old task is gone.
Line 272 commits the new request onto handler-scoped fields before the previous task has exited. The old run still reads `self._message_parent_id`/`self._conversation_id` in `create_websocket_message()` and `self._user_message_payload` on Line 462, and its done-callback on Lines 288-293 can also remove the newly registered conversation handler if it completes in that window. Parse from local values first, then copy them onto `self` only after the cancel/await block.
💡 Suggested shape
      try:
    -     self._initialize_workflow_request(user_message_as_validated_type)
    -     message_content: typing.Any = await self._process_websocket_user_message(user_message_as_validated_type)
    -
    -     if self._workflow_schema_type is None:
    +     workflow_schema_type = user_message_as_validated_type.schema_type
    +     if workflow_schema_type is None:
              raise RuntimeError("Workflow schema type is not initialized")

    +     message_content: typing.Any = await self._process_websocket_user_message(
    +         user_message_as_validated_type,
    +         workflow_schema_type,
    +     )
          if self._running_workflow_task is not None:
              self._running_workflow_task.cancel()
              try:
                  await self._running_workflow_task
              except (asyncio.CancelledError, Exception):
                  pass
              self._running_workflow_task = None
    +
    +     self._initialize_workflow_request(user_message_as_validated_type)
`_process_websocket_user_message()` would need to take `workflow_schema_type` as an argument so this parsing stays `self`-free.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py` around lines 272 - 293, The handler is assigning new request state onto self (e.g., self._message_parent_id, self._conversation_id, self._user_message_payload via _process_websocket_user_message) before the previous _running_workflow_task has fully exited, which can cause races in create_websocket_message and the _done_callback; fix by parsing/staging all request-specific values into local variables first (call _process_websocket_user_message with needed context like workflow_schema_type instead of relying on self), then perform the cancel/await of self._running_workflow_task, and only after the old task is cleared copy those local variables onto self (and then register _done_callback and set self._running_workflow_task). Ensure references to _message_parent_id, _conversation_id, _user_message_payload, _process_websocket_user_message, _running_workflow_task, _done_callback, _worker.get_conversation_handler, and _worker.remove_conversation_handler are updated accordingly.
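The ordering this comment asks for (stage locally, wait for the old task, then commit onto `self`) can be shown in isolation. The sketch assumes a hypothetical `on_message` method and a dict-shaped message; the real handler's parsing is more involved.

```python
import asyncio


class Handler:
    """Hypothetical handler showing stage-then-commit ordering."""

    def __init__(self):
        self._conversation_id = None
        self._running_task = None
        self.seen = []

    async def _run(self):
        try:
            await asyncio.sleep(3600)
        finally:
            # The old run reads handler state while it unwinds.
            self.seen.append(self._conversation_id)

    async def on_message(self, message):
        conversation_id = message["conversation_id"]  # staged in a local first
        if self._running_task is not None:
            self._running_task.cancel()
            await asyncio.gather(self._running_task, return_exceptions=True)
        # Commit only after the previous task has fully exited.
        self._conversation_id = conversation_id
        self._running_task = asyncio.create_task(self._run())


async def demo():
    handler = Handler()
    await handler.on_message({"conversation_id": "conv-1"})
    await asyncio.sleep(0)
    await handler.on_message({"conversation_id": "conv-2"})
    await asyncio.sleep(0)
    handler._running_task.cancel()
    await asyncio.gather(handler._running_task, return_exceptions=True)
    return handler.seen
```

If the assignment to `self._conversation_id` were moved above the cancel/await block, the first run's `finally` would observe `"conv-2"` instead of its own conversation.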
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py`:
- Around line 493-504: The finally block may be cancelled while awaiting
create_websocket_message, leaving self._pending_observability_trace set across
requests; fix by wrapping the sends that reference
self._pending_observability_trace in an inner try/finally so that regardless of
cancellation you always clear self._pending_observability_trace in the inner
finally. Specifically, around the await self.create_websocket_message(...) calls
(the ones using SystemResponseContent() and self._pending_observability_trace
and WebSocketMessageType.*), add an inner try that performs the sends and an
inner finally that sets self._pending_observability_trace = None to guarantee
cleanup even if awaits are cancelled.
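The inner try/finally suggested here can be reduced to a few lines. The sketch below uses a hypothetical `finish_run` method and a sleeping `_send` in place of `create_websocket_message`, purely to show that the reset survives cancellation of the awaited send.

```python
import asyncio


class Handler:
    """Hypothetical handler with a handler-scoped pending trace."""

    def __init__(self):
        self._pending_trace = None

    async def _send(self, message):
        await asyncio.sleep(3600)  # simulates a slow socket write

    async def finish_run(self):
        try:
            await self._send("COMPLETE")
            if self._pending_trace is not None:
                await self._send(self._pending_trace)
        finally:
            # Runs even if the awaits above are cancelled or raise.
            self._pending_trace = None


async def demo():
    handler = Handler()
    handler._pending_trace = {"trace": 1}
    task = asyncio.create_task(handler.finish_run())
    await asyncio.sleep(0)  # let the task block inside _send
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return handler._pending_trace
```

Cancelling the task while it is blocked in the first send still leaves `_pending_trace` cleared, so the next request starts from a clean state.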
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: b2bae0b0-1da3-4ece-be8d-ff4d624563dc
📒 Files selected for processing (3)
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py
packages/nvidia_nat_core/src/nat/runtime/runner.py
packages/nvidia_nat_core/tests/nat/runtime/test_runner.py
✅ Files skipped from review due to trivial changes (1)
- packages/nvidia_nat_core/tests/nat/runtime/test_runner.py
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/nvidia_nat_core/src/nat/runtime/runner.py
Signed-off-by: Patrick Chin <8509935+thepatrickchin@users.noreply.github.com>
🧹 Nitpick comments (1)
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py (1)
278-284: Consider debug-level logging for suppressed exceptions.
The silent `pass` is intentional for graceful cancellation cleanup, but logging at `DEBUG` level would aid troubleshooting without adding noise. Static analysis also flags this pattern (S110, BLE001).
♻️ Optional: Add debug logging
      if self._running_workflow_task is not None:
          self._running_workflow_task.cancel()
          try:
              await self._running_workflow_task
    -     except (asyncio.CancelledError, Exception):
    -         pass
    +     except asyncio.CancelledError:
    +         logger.debug("Previous workflow task cancelled successfully")
    +     except Exception:
    +         logger.debug("Previous workflow task raised exception during cancellation", exc_info=True)
          self._running_workflow_task = None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py` around lines 278 - 284, The except block that currently swallows CancelledError and other Exception for self._running_workflow_task should log the caught exception at debug level instead of silently passing; update the handler around self._running_workflow_task.cancel() / await self._running_workflow_task to call the module logger (or self.logger) with a debug message including the exception info (e.g., logger.debug("workflow task cancelled/failed", exc_info=exc) or similar) when catching asyncio.CancelledError and Exception so suppressed errors are recorded for troubleshooting while keeping behavior unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: baa9419b-ef7f-4c5c-8234-504d966e24ad
📒 Files selected for processing (1)
packages/nvidia_nat_core/src/nat/front_ends/fastapi/message_handler.py
Description
Fixes a bug where an in-flight workflow task was not cancelled when the user clicked "Stop Generating" and submitted a new prompt. This left the handler stuck, most visibly when an OAuth login window was blocked by popup blocker or closed by the user.
Closes #1818
By Submitting this PR I confirm:
Summary by CodeRabbit
Bug Fixes
Tests