Having to handle CancelledError instead of it working automatically #485

@chamini2

I currently have to wrap the `sse_response` block like this:

        try:
            async with sse_response(request) as response:
                async for event in self.execute_workflow(
                    layers, caller_user, resolved_workflow, payload
                ):
                    await response.send(json.dumps(event.to_json()))
        except asyncio.CancelledError:
            # aiohttp_sse is not handling the finishing of the response properly
            # and it is raising an internal CancelledError used for stopping the
            # response. We have to catch it and ignore it.
            logger.debug(
                "Ignoring cancelled error in streaming workflow request", exc_info=True
            )

And if we look at the exc_info, the error is raised inside the library:

Ignoring cancelled error in streaming workflow request
Traceback (most recent call last):
  File "/opt/isolate_controller/projects/isolate_controller/isolate_controller/gateway/_gateway.py", line 1071, in streaming_workflow_request
    async with sse_response(request) as response:
  File "/usr/local/lib/python3.11/site-packages/aiohttp_sse/helpers.py", line 61, in __aexit__
    return await self._obj.__aexit__(exc_type, exc, tb)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/aiohttp_sse/__init__.py", line 221, in __aexit__
    await self.wait()
  File "/usr/local/lib/python3.11/site-packages/aiohttp_sse/__init__.py", line 147, in wait
    await self._ping_task
  File "/usr/local/lib/python3.11/site-packages/aiohttp_sse/__init__.py", line 203, in _ping
    await asyncio.sleep(self._ping_interval)
  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 649, in sleep
    return await future
           ^^^^^^^^^^^^
asyncio.exceptions.CancelledError

The relevant code in aiohttp_sse/__init__.py:

    async def wait(self) -> None:
        """EventSourceResponse object is used for streaming data to the client,
        this method returns future, so we can wait until connection will
        be closed or other task explicitly call ``stop_streaming`` method.
        """
        if self._ping_task is None:
            raise RuntimeError("Response is not started")
        try:
            await self._ping_task
        except asyncio.CancelledError:
            if (
                sys.version_info >= (3, 11)
                and (task := asyncio.current_task())
                and task.cancelling()
            ):
                raise

    def stop_streaming(self) -> None:
        """Used in conjunction with ``wait`` could be called from other task
        to notify client that server no longer wants to stream anything.
        """
        if self._ping_task is None:
            raise RuntimeError("Response is not started")
        self._ping_task.cancel()

I do not see any other source of CancelledError being raised in my code, so I suspect the `task.cancelling()` check in `wait()` is somehow getting the wrong task reference from asyncio.current_task()?
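
To make that check easier to reason about, here is a minimal sketch of how `task.cancelling()` behaves in the two situations `wait()` can see. It does not use aiohttp_sse at all: `FakeSSE`, `run_case`, and `handler` are hypothetical names, and `FakeSSE` only copies the `wait()`/`stop_streaming()` logic quoted above. It may not match what happens in the real request path, so treat it as a way to test the hypothesis, not as a diagnosis.

```python
import asyncio
import sys


class FakeSSE:
    """Hypothetical stand-in; copies only the wait()/stop_streaming() logic."""

    def __init__(self) -> None:
        self._ping_task = asyncio.create_task(self._ping())

    async def _ping(self) -> None:
        while True:
            await asyncio.sleep(3600)  # stands in for the real ping loop

    def stop_streaming(self) -> None:
        self._ping_task.cancel()

    async def wait(self) -> None:
        try:
            await self._ping_task
        except asyncio.CancelledError:
            task = asyncio.current_task()
            if sys.version_info >= (3, 11) and task and task.cancelling():
                raise  # the task awaiting wait() has a pending cancel request


async def run_case(cancel_handler: bool) -> str:
    """Return what wait() did with the CancelledError in each scenario."""
    outcome = []
    sse_box = []
    started = asyncio.Event()

    async def handler() -> None:
        sse = FakeSSE()
        sse_box.append(sse)
        started.set()
        try:
            await sse.wait()
            outcome.append("swallowed")
        except asyncio.CancelledError:
            outcome.append("re-raised")
            raise

    task = asyncio.create_task(handler())
    await started.wait()
    await asyncio.sleep(0.01)  # give the handler time to block inside wait()
    if cancel_handler:
        task.cancel()  # something cancels the handler task itself
    else:
        sse_box[0].stop_streaming()  # only the ping task is cancelled
    try:
        await task
    except asyncio.CancelledError:
        pass
    return outcome[0]


if __name__ == "__main__":
    print(asyncio.run(run_case(cancel_handler=False)))
    print(asyncio.run(run_case(cancel_handler=True)))
```

On Python 3.11 or newer, the first case is swallowed and the second is re-raised. If the sketch reflects the real behaviour, the traceback above would mean a cancel request was pending on the handler task when `__aexit__` called `wait()`, so logging `asyncio.current_task().cancelling()` in the `except` block could help confirm or rule that out.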
