-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Add support for tagged output type hints. #37434
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Warning Gemini is experiencing higher than usual traffic and was unable to create the summary. Please try again in a few hours by commenting |
1bdd579 to
a5fcdc0
Compare
|
/gemini summary |
Summary of ChangesThis pull request significantly advances the type hinting capabilities within the Apache Beam Python SDK by introducing robust support for tagged outputs. It enables developers to explicitly define types for all outputs of a transform, whether main or side outputs, through decorator, method chaining, or function annotation styles. This enhancement aims to improve code readability, facilitate static analysis, and reduce runtime type-related errors in complex Beam pipelines that utilize multiple outputs. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
|
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
The pull request introduces comprehensive support for tagged output type hints in Apache Beam's Python SDK. This is a significant enhancement that improves type safety and clarity for multi-output transforms. The changes span across pvalue.py, transforms/core.py, and typehints/decorators.py, with corresponding updates to existing tests and the addition of a new, thorough test file (tagged_output_typehints_test.py). The implementation correctly handles the extraction, propagation, and validation of tagged output types through various mechanisms, including decorators, method chaining, and function annotations. The addition of warning messages for undeclared tags in with_outputs() is a good user experience improvement. The overall approach is well-structured and robust.
|
R: @damccorm |
|
R: @robertwb |
|
R: @jrmccluskey |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
2 similar comments
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
jrmccluskey
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks! I like where this wound up, should be pretty easy for users to add to their pipelines
| return None, {tag: typ} | ||
|
|
||
| if t is TaggedOutput: | ||
| raise TypeError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, should this fail or just warn? I saw an issue #26493 where someone is using typehint -> Iterable[Union[Dict[str, Any], pvalue.TaggedOutput]]:
Their pipeline will break until they fix the typehint. Maybe we should rather warn that the tagged typehints cannot be parsed instead of throwing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably allow this as pvalue.TaggedOutput[*, Any], though perhaps with a warning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking for the sake of update compat, it makes sense to fall back to old behavior for bare TaggedOutput. This means the main and tagged outputs will fall back to Union -> Any -> FastPrimitivesCoder?
If we allow this as pvalue.TaggedOutput[*, Any], there is a chance the main output will now map to a different coder e.g.
def dofn(element) -> int | TaggedOutput
If we still extract main, and map tagged outputs to Any then main type will change from Union[int, TaggedOutput] to int and the coder will change from FastPrimitivesCoder to IntCoder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is generally a good thing. But if needed we could guard this with the update compatibility flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think an update compat flag is necessary? If the coder changed from FastPrimitivesCoder to IntCoder, then updating a pipeline will fail. The user will just have to change their typehints to -> Any and then the update should work.
robertwb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice.
Have you looked into updating type inference to be able to handle TaggedOutput types?
| return None, {tag: typ} | ||
|
|
||
| if t is TaggedOutput: | ||
| raise TypeError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably allow this as pvalue.TaggedOutput[*, Any], though perhaps with a warning.
| yield_type = get_args(return_annotation)[0] | ||
| clean_yield, tagged_types = _extract_main_and_tagged(yield_type) | ||
| clean_main = clean_yield if clean_yield else Any | ||
| return [Iterable[clean_main]], tagged_types |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems a bit asymmetric here to be wrapping the main type in an iterable but not the tagged types. Should we instead be calling this after the (higher level) iterable unwrapping?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed it is not ideal. I'm not sure what is better. Are you suggesting we defer tagged output extraction until during/after strip_iterable?
from_callable doesnt extract any tagged inputs, so
fn(x: int) -> Iterable[int | TaggedOutput[Literal['errors'], str]]
-> output_types = ((Iterable[int | TaggedOutput[Literal['errors'], str]],), {})
convert_to_beam_type passes through TaggedOutput
convert_to_beam_type(Iterable[int | TaggedOutput[Literal['errors'], str]])
-> Iterable[Union[int, TaggedOutput[Literal['errors'], str]]] (TaggedOutput preserved)
strip_iterable() unwraps
Iterable[Union[int, TaggedOutput[...]]] → Union[int, TaggedOutput[...]]
Finally we extract tagged outputs?
|
|
||
| class AnnotationStyleTaggedOutputTest(unittest.TestCase): | ||
| """Tests for function annotation style across all transforms.""" | ||
| def test_map_annotation_union(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woo hoo!
What do you mean by this? Are you referring to beam/sdks/python/apache_beam/pipeline.py Line 995 in 96edda4
This already works and I should remove the TODO comment. The element type will already be set in DoOutputsTuple when the tagged pcollection is created. |
Please add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.