Add support for stream responses #605
base: development
Pull request overview
This PR adds comprehensive streaming response support to the storey data processing library, enabling steps to yield multiple chunks of data incrementally rather than returning a single result. The implementation includes new primitive types for streaming, modifications to existing flow steps, a new Collector step for aggregating streams, and extensive test coverage.
Changes:
- Added streaming primitives (StreamChunk, StreamCompletion, StreamingError) and modified Map, MapClass, Complete, Reduce, and ParallelExecution steps to support generator functions
- Introduced Collector step to aggregate streaming chunks back into single events
- Updated AwaitableResult and AsyncAwaitableResult to return generators for streaming responses
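To make the chunk/completion/collector idea concrete, here is a minimal plain-Python sketch. It does not use storey's API: `Chunk`, `Completion`, `stream_step`, and `collect` are hypothetical stand-ins invented for illustration, and the real `StreamChunk`, `StreamCompletion`, and `Collector` may carry different fields and behave differently.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List, Union


@dataclass
class Chunk:
    """Hypothetical stand-in for a streaming chunk (not storey's StreamChunk)."""
    stream_id: str
    body: Any


@dataclass
class Completion:
    """Hypothetical stand-in for an end-of-stream marker."""
    stream_id: str


def stream_step(stream_id: str, text: str) -> Iterator[Union[Chunk, Completion]]:
    """A generator step: yield one chunk per word, then a completion marker."""
    for word in text.split():
        yield Chunk(stream_id, word)
    yield Completion(stream_id)


def collect(messages: Iterable[Union[Chunk, Completion]]) -> Dict[str, List[Any]]:
    """Buffer chunks per stream; release a stream's list once it completes."""
    pending: Dict[str, List[Any]] = {}
    done: Dict[str, List[Any]] = {}
    for message in messages:
        if isinstance(message, Chunk):
            pending.setdefault(message.stream_id, []).append(message.body)
        else:
            # Completion: move the buffered chunks out of the pending map.
            done[message.stream_id] = pending.pop(message.stream_id, [])
    return done


collected = collect(stream_step("event-1", "hello streaming world"))
print(collected)
```

The point of the sketch is only the shape of the protocol: a step yields several chunks, a separate marker closes the stream, and a downstream aggregator turns the chunks back into a single result.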
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| storey/dtypes.py | Adds StreamChunk, StreamCompletion, and StreamingError classes for streaming support |
| storey/flow.py | Adds _StreamingStepMixin and updates Map, MapClass, Complete, Reduce, ParallelExecution, and Choice to handle streaming |
| storey/sources.py | Updates AwaitableResult and AsyncAwaitableResult to support streaming generators |
| storey/steps/collector.py | Implements new Collector step for aggregating streaming chunks |
| storey/steps/__init__.py | Exports the new Collector step |
| storey/__init__.py | Exports StreamingError and Collector for public API |
| tests/test_streaming.py | Comprehensive test suite covering streaming primitives, Map/MapClass streaming, Collector, Complete, error handling, graph splits, and cyclic graphs |
| tests/test_flow.py | Refactors cycle creation to use cleaner .to() API instead of direct _outlets manipulation |
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated no new comments.
alxtkr77
left a comment
tests/test_streaming.py - Test Duplication
The 56 tests have significant structural duplication - almost every test has both a sync and async version with ~90% identical code (27 pairs).
Consider using pytest parametrization to consolidate:

@pytest.fixture(params=["sync", "async"])
def flow_context(request):
    if request.param == "sync":
        return SyncEmitSource, lambda f: f()
    else:
        return AsyncEmitSource, lambda f: asyncio.run(f())

def test_collector_basic(self, flow_context):
    source_cls, run = flow_context
    # Single implementation handles both

This would reduce tests from 56 to ~30 while maintaining the same coverage, and make future maintenance easier.
alxtkr77
left a comment
flow.py - Duplicate _is_generator method
The _is_generator() method is defined identically in two places:
- _StreamingStepMixin (line ~201)
- ParallelExecutionRunnable (line ~1448)
Consider reusing the mixin method or extracting to a shared utility to avoid duplication.
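One way the shared-utility option could look is sketched below. The body of the real `_is_generator()` is not shown in this conversation, so the check here (sync or async generator object, via the standard `inspect` module) is an assumption, and `is_generator`, `StepMixinSketch`, and `RunnableSketch` are hypothetical names rather than storey classes.

```python
import inspect
from typing import Any


def is_generator(obj: Any) -> bool:
    """Return True if obj is a sync or async generator object (assumed semantics)."""
    return inspect.isgenerator(obj) or inspect.isasyncgen(obj)


class StepMixinSketch:
    """Hypothetical stand-in for the streaming mixin."""
    # Delegate to the module-level helper instead of redefining the check.
    _is_generator = staticmethod(is_generator)


class RunnableSketch:
    """Hypothetical stand-in for the parallel-execution runnable."""
    _is_generator = staticmethod(is_generator)


def numbers():
    yield 1


async def async_numbers():
    yield 1


print(StepMixinSketch()._is_generator(numbers()))        # sync generator object
print(RunnableSketch()._is_generator(async_numbers()))   # async generator object
print(is_generator([1, 2, 3]))                           # plain list
```

Keeping the check in one module-level function means a later change to what counts as a streaming result only has to be made once.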
event_copy.original_event._awaitable_result = awaitable_result
event_copy.original_event._original_events = original_events
tasks.append(asyncio.get_running_loop().create_task(outlets[i]._do_and_recover(event_copy)))
# Restore after deepcopy
The StreamCompletion branch duplicates significant logic from the regular event branch (handling awaitable_result, original_events, deepcopy loop).
Consider extracting the common pattern into a helper method to reduce duplication.
async def _emit_streaming_chunks(self, original_event, generator):
    """Emit streaming chunks from a generator, then send StreamCompletion.
Consider adding type hints to the parameters:

async def _emit_streaming_chunks(
    self,
    original_event: Event,
    generator: Union[Generator, AsyncGenerator]
) -> None:

self._expected_completions = expected_completions
# Map from event id -> {"chunks": [], "completions": 0, "first_event": Event}
self._collected_streams: dict[str, dict] = defaultdict(
    lambda: {"chunks": [], "completions": 0, "first_event": None}
The _collected_streams dict grows unbounded if streams never complete (e.g., due to upstream errors or flow termination). While _cleanup warns about incomplete streams, consider adding:
- A configurable timeout for incomplete streams
- Or a max size limit to prevent memory issues in long-running flows
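For illustration only, the two suggestions could be combined in a small bounded store like the sketch below. This is not part of the PR: `BoundedStreamStore`, `max_streams`, `ttl_seconds`, and the injected `clock` are all hypothetical names, and the eviction policy (drop expired streams first, then the oldest one when full) is an assumption.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, List, Optional, Tuple


class BoundedStreamStore:
    """Hypothetical buffer for in-flight streams with a size cap and a timeout."""

    def __init__(
        self,
        max_streams: int,
        ttl_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_streams = max_streams
        self._ttl = ttl_seconds
        self._clock = clock
        # stream id -> (creation time, chunks), kept in insertion order
        self._streams: "OrderedDict[str, Tuple[float, List[Any]]]" = OrderedDict()

    def add_chunk(self, stream_id: str, chunk: Any) -> List[str]:
        """Append a chunk; return the ids of any streams evicted along the way."""
        evicted = self._evict_expired()
        if stream_id not in self._streams:
            if len(self._streams) >= self._max_streams:
                # Full: drop the oldest incomplete stream to make room.
                oldest_id, _ = self._streams.popitem(last=False)
                evicted.append(oldest_id)
            self._streams[stream_id] = (self._clock(), [])
        self._streams[stream_id][1].append(chunk)
        return evicted

    def complete(self, stream_id: str) -> Optional[List[Any]]:
        """Remove and return the chunks for a finished stream, or None if unknown."""
        entry = self._streams.pop(stream_id, None)
        return entry[1] if entry is not None else None

    def _evict_expired(self) -> List[str]:
        now = self._clock()
        expired = [
            sid for sid, (created, _) in self._streams.items() if now - created > self._ttl
        ]
        for sid in expired:
            del self._streams[sid]
        return expired


store = BoundedStreamStore(max_streams=2, ttl_seconds=30.0)
store.add_chunk("event-1", "a")
store.add_chunk("event-1", "b")
print(store.complete("event-1"))
```

Returning the evicted ids lets the caller decide whether to log a warning or emit an error for each dropped stream.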
These two things have been explicitly excluded from the scope of this feature (max size from the start, timeout was dropped).
assert result == [10, 20]

def test_collector_single_chunk_unwrap(self):
This test doesn't have an async counterpart (test_async_collector_single_chunk_unwrap), unlike the other Collector tests. Consider adding for consistency.
ML-11875
ML-11877