1 change: 1 addition & 0 deletions CHANGES.md
@@ -65,6 +65,7 @@
## I/Os

* Add support for Datadog IO (Java) ([#37318](https://github.com/apache/beam/issues/37318)).
* Support for parallel reading in SparkReceiverIO (Java) ([#37410](https://github.com/apache/beam/issues/37410)).

## New Features / Improvements

@@ -33,5 +33,13 @@ public interface HasOffset {
* Some {@link org.apache.spark.streaming.receiver.Receiver} support mechanism of checkpoint (e.g.
* ack). This method should be called before stopping the receiver.
*/
default void setCheckpoint(Long recordsProcessed) {};
default void setCheckpoint(Long recordsProcessed) {}

/**
* Set the shard identifier and the total number of shards for parallel reading.
*
* @param shardId The unique identifier for this shard (reader).
* @param numShards The total number of shards (readers).
*/
default void setShard(int shardId, int numShards) {}
}
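
For context, a minimal sketch of a receiver honoring the new hook (not part of this PR; the class name is hypothetical, the existing setStartOffset/getEndOffset methods of HasOffset are assumed, and the modulo-based partitioning mirrors the CustomReceiverWithOffset test changes further down):

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

// Illustrative only: each of the numShards readers stores the offsets it owns,
// so together they cover the source exactly once, with no duplicates.
public class ShardAwareReceiver extends Receiver<String> implements HasOffset {

  private long startOffset = 0L;
  private int shardId = 0;
  private int numShards = 1;

  public ShardAwareReceiver() {
    super(StorageLevel.MEMORY_AND_DISK_2());
  }

  @Override
  public void setStartOffset(Long startOffset) {
    if (startOffset != null) {
      this.startOffset = startOffset;
    }
  }

  @Override
  public Long getEndOffset() {
    return Long.MAX_VALUE; // unbounded source in this sketch
  }

  @Override
  public void setShard(int shardId, int numShards) {
    // Remember which slice of the source this reader owns.
    this.shardId = shardId;
    this.numShards = numShards;
  }

  @Override
  public void onStart() {
    // A receiver should not block in onStart(); run the read loop on its own thread.
    new Thread(this::receive).start();
  }

  private void receive() {
    long offset = startOffset;
    while (!isStopped()) {
      // Store only the offsets that belong to this shard.
      if (offset % numShards == shardId) {
        store(String.valueOf(offset));
      }
      offset++;
    }
  }

  @Override
  public void onStop() {}
}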
@@ -57,7 +57,7 @@
* ReadFromSparkReceiverWithOffsetDoFn} will move to process the next element.
*/
@UnboundedPerElement
class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<Integer, V> {

private static final Logger LOG =
LoggerFactory.getLogger(ReadFromSparkReceiverWithOffsetDoFn.class);
@@ -79,6 +79,7 @@ class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
private final Long pullFrequencySec;
private final Long startPollTimeoutSec;
private final Long startOffset;
private final int numReaders;

ReadFromSparkReceiverWithOffsetDoFn(SparkReceiverIO.Read<V> transform) {
createWatermarkEstimatorFn = WatermarkEstimators.Manual::new;
@@ -115,10 +116,13 @@ class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
startOffset = DEFAULT_START_OFFSET;
}
this.startOffset = startOffset;

Integer numReadersObj = transform.getNumReaders();
this.numReaders = (numReadersObj != null) ? numReadersObj : 1;
}

@GetInitialRestriction
public OffsetRange initialRestriction(@Element byte[] element) {
public OffsetRange initialRestriction(@Element Integer element) {
return new OffsetRange(startOffset, Long.MAX_VALUE);
}

@@ -134,13 +138,13 @@ public WatermarkEstimator<Instant> newWatermarkEstimator(
}

@GetSize
public double getSize(@Element byte[] element, @Restriction OffsetRange offsetRange) {
public double getSize(@Element Integer element, @Restriction OffsetRange offsetRange) {
return restrictionTracker(element, offsetRange).getProgress().getWorkRemaining();
}

@NewTracker
public OffsetRangeTracker restrictionTracker(
@Element byte[] element, @Restriction OffsetRange restriction) {
@Element Integer element, @Restriction OffsetRange restriction) {
return new OffsetRangeTracker(restriction) {
private final AtomicBoolean isCheckDoneCalled = new AtomicBoolean(false);

@@ -178,7 +182,8 @@ public Coder<OffsetRange> restrictionCoder() {
}

// Need to do an unchecked cast from Object
// because org.apache.spark.streaming.receiver.ReceiverSupervisor accepts Object in push methods
// because org.apache.spark.streaming.receiver.ReceiverSupervisor accepts Object
// in push methods
@SuppressWarnings("unchecked")
private static class SparkConsumerWithOffset<V> implements SparkConsumer<V> {
private final Queue<V> recordsQueue;
@@ -210,8 +215,9 @@ public void start(Receiver<V> sparkReceiver) {
return null;
}
/*
Use only [0] element - data.
The other elements are not needed because they are related to Spark environment options.
* Use only [0] element - data.
* The other elements are not needed because they are related to Spark
* environment options.
*/
Object data = input[0];

@@ -265,7 +271,7 @@ public void stop() {

@ProcessElement
public ProcessContinuation processElement(
@Element byte[] element,
@Element Integer element,
Comment on lines 267 to +274

Copilot AI Jan 24, 2026

The element parameter (representing the shard ID) is never used in the processElement method. This means that when multiple readers are configured (via withNumReaders), each DoFn instance will independently create a SparkReceiver starting from the same startOffset, resulting in duplicate data being read. For example, with 3 readers, the same 20 records will be read 3 times, producing 60 total records with duplicates.

This defeats the purpose of parallel reading for scalability. The shard ID should be used to either:

  1. Partition the offset range among readers (e.g., reader 0 reads offsets 0-6, reader 1 reads 7-13, reader 2 reads 14-19), or
  2. Use the shard ID to configure the Spark Receiver to read from different partitions/sources

Without this coordination, the feature creates duplicate data rather than distributing work.

Author

Fixed. The processElement method now uses the element (Shard ID) and passes it to the receiver:

if (sparkReceiver instanceof HasOffset) {
  ((HasOffset) sparkReceiver).setShard(element, numReaders);
}
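
For comparison, the reviewer's first option (partitioning the offset range itself among readers) was not taken, but a hypothetical version of it could look roughly like this, reusing the DoFn's startOffset and numReaders fields:

// Hypothetical alternative (not what this PR implements): give each shard a
// contiguous, disjoint slice of the offset space in its initial restriction.
@GetInitialRestriction
public OffsetRange initialRestriction(@Element Integer shardId) {
  long sliceSize = (Long.MAX_VALUE - startOffset) / numReaders;
  long from = startOffset + shardId * sliceSize;
  long to = (shardId == numReaders - 1) ? Long.MAX_VALUE : from + sliceSize;
  return new OffsetRange(from, to);
}

The PR instead keeps a single unbounded restriction per shard and pushes the partitioning decision down into the receiver via setShard.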

RestrictionTracker<OffsetRange, Long> tracker,
WatermarkEstimator<Instant> watermarkEstimator,
OutputReceiver<V> receiver) {
@@ -284,6 +290,9 @@ public ProcessContinuation processElement(
}
LOG.debug("Restriction {}", tracker.currentRestriction().toString());
sparkConsumer = new SparkConsumerWithOffset<>(tracker.currentRestriction().getFrom());
if (sparkReceiver instanceof HasOffset) {
((HasOffset) sparkReceiver).setShard(element, numReaders);
}
sparkConsumer.start(sparkReceiver);

Long recordsProcessed = 0L;
@@ -21,9 +21,13 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.transforms.Impulse;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -99,6 +103,8 @@ public abstract static class Read<V> extends PTransform<PBegin, PCollection<V>>

abstract @Nullable Long getStartOffset();

abstract @Nullable Integer getNumReaders();

abstract Builder<V> toBuilder();

@AutoValue.Builder
@@ -117,6 +123,8 @@ abstract Builder<V> setSparkReceiverBuilder(

abstract Builder<V> setStartOffset(Long startOffset);

abstract Builder<V> setNumReaders(Integer numReaders);

abstract Read<V> build();
}

@@ -157,6 +165,16 @@ public Read<V> withStartOffset(Long startOffset) {
return toBuilder().setStartOffset(startOffset).build();
}

/**
* A number of workers to read from Spark {@link Receiver}.
*
* <p>If this value is not set, or set to 1, the reading will be performed on a single worker.
Comment on lines +169 to +171

Copilot AI Jan 24, 2026

The documentation for withNumReaders claims it will allow reading from multiple workers, but it doesn't explain that each reader will independently read all data, resulting in duplicates. The documentation should clearly state the expected behavior - whether it's intended to duplicate data for redundancy or to distribute work without duplication. Currently, the behavior doesn't match the stated goal of addressing a "scalability bottleneck" since duplicating data doesn't improve scalability.

Suggested change
* A number of workers to read from Spark {@link Receiver}.
*
* <p>If this value is not set, or set to 1, the reading will be performed on a single worker.
* Configures how many independent workers (readers) will read from the same Spark
* {@link Receiver}.
*
* <p>Each configured reader connects to the underlying source independently and will
* typically observe the full stream of data. As a result, records may be duplicated
* across readers; this option does <b>not</b> shard or partition the input among workers.
*
* <p>This setting is intended for use cases where redundant consumption of the same data
* is acceptable (for example, to increase robustness when dealing with flaky sources),
* and should not be used as a mechanism for load-balancing or avoiding scalability
* bottlenecks via input partitioning. If you require a single logical read without
* duplicates, leave {@code numReaders} at its default of {@code 1} and apply your own
* partitioning or deduplication to the resulting {@link PCollection}.
*
* <p>If this value is not set, or set to {@code 1}, the reading will be performed on a
* single worker.


Author

Good catch. I have refactored the implementation to support proper sharding.

  1. I added setShard(int shardId, int numShards) to the HasOffset interface.
  2. The DoFn now passes the unique shard ID to the Receiver via setShard.
  3. I updated the documentation to clarify that the receiver is expected to handle partitioning based on these parameters.

*/
public Read<V> withNumReaders(int numReaders) {
checkArgument(numReaders > 0, "Number of readers should be greater than 0");
return toBuilder().setNumReaders(numReaders).build();
}
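
For illustration, wiring these options together in a pipeline might look like the sketch below; it mirrors the new testReadFromCustomReceiverWithParallelism added further down, and the receiver class, offset/timestamp functions, and constants are placeholders rather than part of this change:

// Illustrative only: three readers, each receiving its shard id through
// HasOffset.setShard(shardId, 3) and expected to read a disjoint slice of the source.
ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
    new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();

SparkReceiverIO.Read<String> read =
    SparkReceiverIO.<String>read()
        .withSparkReceiverBuilder(receiverBuilder)
        .withGetOffsetFn(Long::valueOf)
        .withTimestampFn(Instant::parse)
        .withStartOffset(0L)
        .withNumReaders(3);

PCollection<String> records = pipeline.apply(read).setCoder(StringUtf8Coder.of());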

@Override
public PCollection<V> expand(PBegin input) {
validateTransform();
@@ -191,10 +209,20 @@ public PCollection<V> expand(PBegin input) {
sparkReceiverBuilder.getSparkReceiverClass().getName()));
} else {
LOG.info("{} started reading", ReadFromSparkReceiverWithOffsetDoFn.class.getSimpleName());
return input
.apply(Impulse.create())
.apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
// TODO: Split data from SparkReceiver into multiple workers
Integer numReadersObj = sparkReceiverRead.getNumReaders();
if (numReadersObj == null || numReadersObj == 1) {
return input
.apply(Create.of(0))
.apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
} else {
int numReaders = numReadersObj;
List<Integer> shards =
IntStream.range(0, numReaders).boxed().collect(Collectors.toList());
return input
.apply(Create.of(shards))
.apply(Reshuffle.viaRandomKey())
.apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
}
}
}
}
@@ -36,11 +36,13 @@ public class CustomReceiverWithOffset extends Receiver<String> implements HasOffset {
public static final int RECORDS_COUNT = 20;

/*
Used in test for imitation of reading with exception
*/
* Used in test for imitation of reading with exception
*/
public static boolean shouldFailInTheMiddle = false;

private Long startOffset;
private int shardId = 0;
private int numShards = 1;

CustomReceiverWithOffset() {
super(StorageLevel.MEMORY_AND_DISK_2());
@@ -53,6 +55,12 @@ public void setStartOffset(Long startOffset) {
}
}

@Override
public void setShard(int shardId, int numShards) {
this.shardId = shardId;
this.numShards = numShards;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onStart() {
@@ -76,7 +84,9 @@ private void receive() {
LOG.debug("Expected fail in the middle of reading");
throw new IllegalStateException("Expected exception");
}
store(String.valueOf(currentOffset));
if (currentOffset % numShards == shardId) {
store(String.valueOf(currentOffset));
}
currentOffset++;
} else {
break;
@@ -35,7 +35,7 @@
/** Test class for {@link ReadFromSparkReceiverWithOffsetDoFn}. */
public class ReadFromSparkReceiverWithOffsetDoFnTest {

private static final byte[] TEST_ELEMENT = new byte[] {};
private static final Integer TEST_ELEMENT = 0;

private final ReadFromSparkReceiverWithOffsetDoFn<String> dofnInstance =
new ReadFromSparkReceiverWithOffsetDoFn<>(makeReadTransform());
@@ -241,4 +241,32 @@ public void testReadFromReceiverIteratorData() {
PAssert.that(actual).containsInAnyOrder(expected);
pipeline.run().waitUntilFinish(Duration.standardSeconds(15));
}

@Test
public void testReadFromCustomReceiverWithParallelism() {
CustomReceiverWithOffset.shouldFailInTheMiddle = false;
ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
SparkReceiverIO.Read<String> reader =
SparkReceiverIO.<String>read()
.withGetOffsetFn(Long::valueOf)
.withTimestampFn(Instant::parse)
.withPullFrequencySec(PULL_FREQUENCY_SEC)
.withStartPollTimeoutSec(START_POLL_TIMEOUT_SEC)
.withStartOffset(START_OFFSET)
.withSparkReceiverBuilder(receiverBuilder)
.withNumReaders(3);

List<String> expected = new ArrayList<>();
// With sharding enabled in CustomReceiverWithOffset, the total records read across
// all workers should be exactly the set of 0..RECORDS_COUNT-1, each read exactly once.
for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
expected.add(String.valueOf(i));
}
PCollection<String> actual = pipeline.apply(reader).setCoder(StringUtf8Coder.of());

PAssert.that(actual).containsInAnyOrder(expected);
pipeline.run().waitUntilFinish(Duration.standardSeconds(15));
}
}