feat(io): Implement parallel reading in SparkReceiverIO (Fixes #37410) #37411
Conversation
Adds `withNumReaders()` to `SparkReceiverIO` and implements parallel execution using `Create.of(shards)` + `Reshuffle`. This addresses scalability limitations by allowing work distribution across multiple workers.
Pull request overview
This PR attempts to add parallel reading capability to SparkReceiverIO to address a scalability bottleneck where data reading was previously restricted to a single worker. The implementation adds a withNumReaders(int) configuration option and changes the internal architecture from using Impulse.create() to using Create.of(shards) + Reshuffle for distributing work across multiple workers.
Changes:
- Added `withNumReaders(int)` builder method to configure the number of concurrent readers
- Modified `ReadFromSparkReceiverWithOffsetDoFn` to accept `Integer` shard identifiers instead of `byte[]`
- Implemented parallel work distribution using the `Create.of(shards)` + `Reshuffle` pattern when `numReaders > 1`
- Added test case `testReadFromCustomReceiverWithParallelism` to verify parallel reading behavior
- Updated CHANGES.md to document the new feature
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| SparkReceiverIO.java | Added numReaders configuration, modified expand() method to support parallel reading with sharding, changed DoFn input type from byte[] to Integer |
| ReadFromSparkReceiverWithOffsetDoFn.java | Updated DoFn signature to accept Integer element type instead of byte[], reformatted some comments |
| SparkReceiverIOTest.java | Added new test case for parallel reading with 3 readers |
| ReadFromSparkReceiverWithOffsetDoFnTest.java | Updated test element type from byte[] to Integer |
| CHANGES.md | Added entry documenting parallel reading support |
```java
 * A number of workers to read from Spark {@link Receiver}.
 *
 * <p>If this value is not set, or set to 1, the reading will be performed on a single worker.
```
Copilot AI (Jan 24, 2026):
The documentation for withNumReaders claims it will allow reading from multiple workers, but it doesn't explain that each reader will independently read all data, resulting in duplicates. The documentation should clearly state the expected behavior - whether it's intended to duplicate data for redundancy or to distribute work without duplication. Currently, the behavior doesn't match the stated goal of addressing a "scalability bottleneck" since duplicating data doesn't improve scalability.
Suggested change:
```diff
- * A number of workers to read from Spark {@link Receiver}.
- *
- * <p>If this value is not set, or set to 1, the reading will be performed on a single worker.
+ * Configures how many independent workers (readers) will read from the same Spark
+ * {@link Receiver}.
+ *
+ * <p>Each configured reader connects to the underlying source independently and will
+ * typically observe the full stream of data. As a result, records may be duplicated
+ * across readers; this option does <b>not</b> shard or partition the input among workers.
+ *
+ * <p>This setting is intended for use cases where redundant consumption of the same data
+ * is acceptable (for example, to increase robustness when dealing with flaky sources),
+ * and should not be used as a mechanism for load-balancing or avoiding scalability
+ * bottlenecks via input partitioning. If you require a single logical read without
+ * duplicates, leave {@code numReaders} at its default of {@code 1} and apply your own
+ * partitioning or deduplication to the resulting {@link PCollection}.
+ *
+ * <p>If this value is not set, or set to {@code 1}, the reading will be performed on a
+ * single worker.
```
@ATHARVA262005 (author) replied:
Good catch. I have refactored the implementation to support proper sharding.
- I added setShard(int shardId, int numShards) to the HasOffset interface.
- The DoFn now passes the unique shard ID to the Receiver via setShard.
- I updated the documentation to clarify that the receiver is expected to handle partitioning based on these parameters.
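For context, a minimal sketch of what the extended interface could look like after this change; the default no-op body and the exact Javadoc wording are assumptions, not the committed code:

```java
/** Sketch of HasOffset with the new setShard hook (details are assumptions). */
public interface HasOffset {
  /** Inclusive start offset from which the reading should be started. */
  void setStartOffset(Long startOffset);

  /** End offset of the range this receiver may read. */
  Long getEndOffset();

  /**
   * New hook: tells the receiver which shard it serves (0-based shardId out of
   * numShards) so it can partition the input, e.g. by modulo on record indices.
   * A default no-op keeps existing receivers source-compatible.
   */
  default void setShard(int shardId, int numShards) {}
}
```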
```java
  checkArgument(startPollTimeoutSec != null, "Start poll timeout can not be null");
  return toBuilder().setStartPollTimeoutSec(startPollTimeoutSec).build();
}
```
Copilot AI (Jan 24, 2026):
The Javadoc comment for withStartOffset was removed, creating inconsistency with the other configuration methods (withPullFrequencySec, withStartPollTimeoutSec, etc.) which all have Javadoc comments. The comment should be restored to maintain documentation consistency across the API.
Suggested change:
```diff
+ /** Inclusive start offset from which the reading should be started. */
```
@ATHARVA262005 (author) replied:
Fixed. I have restored the Javadoc for withStartOffset for consistency with the other configuration methods.
```diff
  public ProcessContinuation processElement(
-     @Element byte[] element,
+     @Element Integer element,
```
Copilot AI (Jan 24, 2026):
The element parameter (representing the shard ID) is never used in the processElement method. This means that when multiple readers are configured (via withNumReaders), each DoFn instance will independently create a SparkReceiver starting from the same startOffset, resulting in duplicate data being read. For example, with 3 readers, the same 20 records will be read 3 times, producing 60 total records with duplicates.
This defeats the purpose of parallel reading for scalability. The shard ID should be used to either:
- Partition the offset range among readers (e.g., reader 0 reads offsets 0-6, reader 1 reads 7-13, reader 2 reads 14-19), or
- Use the shard ID to configure the Spark Receiver to read from different partitions/sources
Without this coordination, the feature creates duplicate data rather than distributing work.
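To illustrate the first option, a hedged sketch of evenly splitting an offset range across readers; all variable names here are hypothetical:

```java
// Hypothetical even split of [0, totalRecords) across numReaders readers.
long totalRecords = 20;
int numReaders = 3;
int shardId = 2; // the shard this reader is responsible for
long rangeSize = (totalRecords + numReaders - 1) / numReaders; // ceiling division -> 7
long shardStart = shardId * rangeSize;                          // 14
long shardEnd = Math.min(shardStart + rangeSize, totalRecords); // 20, i.e. offsets 14-19
```

With 20 records and 3 readers this yields the ranges [0, 7), [7, 14), and [14, 20), matching the example above.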
@ATHARVA262005 (author) replied:
Fixed. The processElement method now uses the element (shard ID) and passes it to the receiver:

```java
if (sparkReceiver instanceof HasOffset) {
  ((HasOffset) sparkReceiver).setShard(element, numReaders);
}
```

```java
for (int j = 0; j < 3; j++) {
  for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
    expected.add(String.valueOf(i));
  }
```
Copilot AI (Jan 24, 2026):
The test expects 60 records (3 readers * 20 records each), which means it's testing for duplicate data rather than proper parallel reading. This test validates the buggy behavior where each reader independently reads all data from the source. The test should instead verify that 20 unique records are read once, distributed across the 3 readers without duplication.
Suggested change:
```diff
- for (int j = 0; j < 3; j++) {
-   for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
-     expected.add(String.valueOf(i));
-   }
+ for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
+   expected.add(String.valueOf(i));
```
@ATHARVA262005 (author) replied:
Agreed. I have updated CustomReceiverWithOffset to implement the setShard logic (using simple modulo partitioning).
I also updated the test assertion to expect 20 unique records in total (strings '0' to '19', each exactly once), verifying that the work is correctly distributed without duplication.
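A rough sketch of the modulo partitioning described; the loop shape and surrounding receiver code are assumptions about CustomReceiverWithOffset rather than the exact test code:

```java
// Inside the receiver's read loop: each shard emits only the records whose
// index maps to it, so the union across shards covers every record exactly once.
for (int i = 0; i < RECORDS_COUNT; i++) {
  if (i % numShards == shardId) {
    store(String.valueOf(i)); // Receiver.store(...) pushes a record downstream
  }
}
```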
```java
.apply(Impulse.create())
.apply(
    MapElements.into(TypeDescriptors.integers())
        .via(
            new SerializableFunction<byte[], Integer>() {
              @Override
              public Integer apply(byte[] input) {
                return 0;
              }
            }))
```
Copilot AI (Jan 24, 2026):
The backward compatibility path introduces unnecessary complexity by converting the Impulse output (byte[]) to Integer via MapElements. This conversion is not needed: using Create.of(0) directly is simpler and consistent with the multi-reader path. The current approach adds an extra transformation step without clear benefit.
Suggested change:
```diff
- .apply(Impulse.create())
- .apply(
-     MapElements.into(TypeDescriptors.integers())
-         .via(
-             new SerializableFunction<byte[], Integer>() {
-               @Override
-               public Integer apply(byte[] input) {
-                 return 0;
-               }
-             }))
+ .apply(Create.of(0))
```
@ATHARVA262005 (author) replied:
Fixed. I removed the Impulse + MapElements chain and replaced it with Create.of(0) as suggested. This is cleaner and consistent with the multi-reader path.
1. Implemented setShard() in HasOffset to allow receivers to handle partitioning (prevents data duplication).
2. Updated CustomReceiverWithOffset to filter records based on shardId.
3. Updated the DoFn to pass shardId/numShards to the receiver.
4. Restored Javadoc for withStartOffset.
5. Simplified backward compatibility logic in expand() using Create.of().
6. Updated tests to verify parallel reading produces the correct, unique record set.
Assigning reviewers: R: @Abacn for label java. Note: if you would like to opt out of this review, comment with one of the available bot commands.
The PR bot will only process comments in the main thread (not review comments).
Description
This PR addresses a scalability bottleneck in `SparkReceiverIO` where data reading was previously restricted to a single worker.

Previously, the `Read` transform used `Impulse.create()`, which limited the downstream Splittable DoFn execution to a single initial restriction. This change introduces proper parallelization logic, allowing users to configure multiple concurrent readers.

Key Changes:
- `withNumReaders(int)`: a new configuration option on the `SparkReceiverIO` builder.
- `Create.of(shards)` + `Reshuffle` pattern in `expand()`: when `numReaders > 1`, this forces the distribution of reading tasks across available Beam workers (see the sketch after this list).
- Refactored `ReadFromSparkReceiverWithOffsetDoFn` to accept explicit `Integer` shard identifiers instead of generic `byte[]`.
- Backward compatibility: when `numReaders` is not set or set to 1, the IO mimics the original behavior exactly.
withNumReaders(int): A new configuration option to theSparkReceiverIObuilder.Create.of(shards)+Reshufflepattern inexpand(). WhennumReaders > 1, this forces the distribution of reading tasks across available Beam workers.ReadFromSparkReceiverWithOffsetDoFnto accept explicitIntegershard identifiers instead of genericbyte[].numReadersis partially set or defaults to 1, the IO mimics the original behavior exactly.Issue Links
Fixes #37410
Tests
I have added a new unit test `testReadFromCustomReceiverWithParallelism` to `SparkReceiverIOTest.java`, which verifies that with 3 configured readers every record is read exactly once, i.e. the work is distributed across readers without duplication.
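For reference, a minimal sketch of the kind of assertion this test makes, assuming JUnit with Beam's TestPipeline and PAssert; the pipeline construction and the output variable are elided assumptions:

```java
// With numReaders = 3, every record 0..RECORDS_COUNT-1 should appear
// exactly once in the output (no per-reader duplication).
List<String> expected = new ArrayList<>();
for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
  expected.add(String.valueOf(i));
}
PAssert.that(output).containsInAnyOrder(expected);
pipeline.run().waitUntilFinish();
```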