diff --git a/build.gradle b/build.gradle
index 5f7a0c4ff700e..ef7676cb57483 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2171,6 +2171,7 @@ project(':clients:clients-integration-tests') {
testImplementation project(':raft')
testImplementation project(':server')
testImplementation project(':storage')
+ testImplementation testFixtures(project(':storage'))
testImplementation project(':core').sourceSets.test.output
testImplementation testFixtures(project(':clients'))
implementation project(':server-common')
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerDLQTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerDLQTest.java
index b3f7eb09afb3e..e73212a9b0851 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerDLQTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerDLQTest.java
@@ -20,6 +20,7 @@
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
@@ -267,6 +268,71 @@ public void testRejectedRecordsWrittenToDlqWithCopyRecordEnabled() throws Except
verifyDlqMetrics(groupId, recordCount);
}
+ /**
+ * End-to-end DLQ copy when the source records have been tiered to remote storage. The source topic enables
+ * tiered storage and rolls a segment per record, with a 45s total retention and a 5s local retention so the
+ * early offsets are offloaded to remote storage and then deleted locally (well before the remote segments
+ * expire). Once the local segments are gone (verified via the earliest-local offset advancing past them), the
+ * records are rejected with record copy enabled - so the DLQ record fetcher must read the original records
+ * back from remote storage. The resulting DLQ records carrying the original key/value confirm the fetcher
+ * successfully pulled them from remote storage.
+ *
+ * <p>Tiered storage is backed by the local-filesystem {@code LocalTieredStorage} RSM and the default
+ * {@code TopicBasedRemoteLogMetadataManager}; short task/cleanup intervals keep the offload + local-delete
+ * cycle quick.
+ */
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key = "remote.log.storage.system.enable", value = "true"),
+ @ClusterConfigProperty(key = "remote.log.storage.manager.class.name",
+ value = "org.apache.kafka.server.log.remote.storage.LocalTieredStorage"),
+ @ClusterConfigProperty(key = "remote.log.manager.task.interval.ms", value = "500"),
+ @ClusterConfigProperty(key = "remote.log.metadata.manager.listener.name", value = "EXTERNAL"),
+ @ClusterConfigProperty(key = "rlmm.config.remote.log.metadata.topic.replication.factor", value = "1"),
+ @ClusterConfigProperty(key = "rlmm.config.remote.log.metadata.topic.num.partitions", value = "1"),
+ @ClusterConfigProperty(key = "log.retention.check.interval.ms", value = "500"),
+ @ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100")
+ }
+ )
+ public void testDlqCopiesRecordsReadFromRemoteStorage() throws Exception {
+ String groupId = "dlq-remote-group";
+ // The broker's default share-group DLQ topic prefix is "dlq.", so the topic name must start with it.
+ String dlqTopic = "dlq.remote";
+ String sourceTopic = "dlq-remote-source";
+ int recordCount = 5;
+
+ alterShareAutoOffsetReset(groupId, "earliest");
+ createDlqTopic(dlqTopic);
+ alterShareGroupConfig(groupId, GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, dlqTopic);
+ // Record copy enabled: the DLQ fetcher must read the original records back to copy their key/value.
+ alterShareGroupConfig(groupId, GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "true");
+
+ // Tiered source topic: one segment per record, 45s total retention and 5s local retention so inactive
+ // segments are offloaded to remote storage and then deleted locally shortly after, while the remote
+ // segments comfortably survive the rest of the test.
+ createRemoteStorageSourceTopic(sourceTopic, 45_000L, 5_000L);
+
+ produceTo(sourceTopic, 0, recordCount);
+
+ // Wait until the early offsets have been offloaded to remote storage AND removed locally - i.e. the
+ // earliest *local* offset has advanced past them, so reading those offsets must now hit remote storage.
+ // The last record stays in the active (never-offloaded) segment, so the earliest local offset should
+ // reach recordCount - 1. A generous timeout (well inside the 45s remote retention) absorbs remote-log
+ // metadata-manager startup; it normally resolves a few seconds after the 5s local retention elapses.
+ // NOTE(review): in the worst case this wait consumes 30s of the 45s retention passed above, leaving the
+ // reject + verify steps below roughly 15s before the remote segments may expire - confirm that is enough.
+ waitForCondition(() -> earliestLocalOffset(sourceTopic, 0) >= recordCount - 1,
+ 30_000L, 500L,
+ () -> "Source records were not tiered to remote storage and removed locally in time");
+
+ // Reject every record. Both the share fetch (to deliver them) and the DLQ record fetcher (to copy them)
+ // must read the tiered offsets back from remote storage.
+ rejectRecords(groupId, sourceTopic, recordCount);
+
+ // Record copy is enabled, so every DLQ record must carry the original key/value. For the tiered offsets
+ // (no longer present locally) that is only possible if the DLQ fetcher pulled them from remote storage.
+ verifyDlqTopicRecords(dlqTopic, groupId, sourceTopic, 0, expectedSourceOffsets(recordCount), true);
+ verifyDlqMetrics(groupId, recordCount);
+ }
+
/**
* Rejects records from a multi-partition source topic and verifies they are routed to the correct DLQ
* partition. The destination partition is {@code sourcePartition % numDlqPartitions}; with a DLQ topic that
@@ -537,13 +603,23 @@ private void verifyDlqTopicCreated(String dlqTopic) throws Exception {
*/
private void verifyDlqTopicRecords(String dlqTopic, String groupId, Set expectedSourceOffsets,
boolean copyEnabled) throws InterruptedException {
+ // Delegates to the explicit-source overload, using the base topic-partition `tp` as the source.
+ verifyDlqTopicRecords(dlqTopic, groupId, tp.topic(), tp.partition(), expectedSourceOffsets, copyEnabled);
+ }
+
+ /**
+ * As {@link #verifyDlqTopicRecords(String, String, Set, boolean)}, but for an explicit source topic-partition
+ * (rather than the base {@code tp}). The DLQ records' context headers must reference {@code sourceTopic} /
+ * {@code sourcePartition}.
+ */
+ private void verifyDlqTopicRecords(String dlqTopic, String groupId, String sourceTopic, int sourcePartition,
+ Set expectedSourceOffsets, boolean copyEnabled) throws InterruptedException {
List> dlqRecords = readDlqPartition(dlqTopic, 0, expectedSourceOffsets.size());
assertEquals(expectedSourceOffsets.size(), dlqRecords.size(), "Unexpected number of records on the DLQ topic");
Set actualSourceOffsets = new HashSet<>();
for (ConsumerRecord record : dlqRecords) {
if (copyEnabled) {
- // produceMessages() produces records with key "key" and value "value".
+ // produceMessages()/produceTo() produce records with key "key" and value "value".
assertEquals("key", new String(Objects.requireNonNull(record.key()), StandardCharsets.UTF_8),
"DLQ record key should be copied when record copy is enabled");
assertEquals("value", new String(Objects.requireNonNull(record.value()), StandardCharsets.UTF_8),
@@ -553,8 +629,8 @@ private void verifyDlqTopicRecords(String dlqTopic, String groupId, Set ex
assertNull(record.value(), "DLQ record value should be null when record copy is disabled");
}
assertEquals(groupId, headerValue(record, HEADER_DLQ_ERRORS_GROUP));
- assertEquals(tp.topic(), headerValue(record, HEADER_DLQ_ERRORS_TOPIC));
- assertEquals(Integer.toString(tp.partition()), headerValue(record, HEADER_DLQ_ERRORS_PARTITION));
+ assertEquals(sourceTopic, headerValue(record, HEADER_DLQ_ERRORS_TOPIC));
+ assertEquals(Integer.toString(sourcePartition), headerValue(record, HEADER_DLQ_ERRORS_PARTITION));
actualSourceOffsets.add(Long.parseLong(Objects.requireNonNull(headerValue(record, HEADER_DLQ_ERRORS_OFFSET))));
}
assertEquals(expectedSourceOffsets, actualSourceOffsets, "DLQ records should cover every expected source offset");
@@ -591,6 +667,36 @@ private void createDlqTopic(String topicName, int numPartitions) {
}, "Failed to create DLQ topic");
}
+    // Creates a single-partition, replication-factor-1 source topic with tiered storage enabled.
+    //
+    // `retentionMs` is the topic's total retention and `localRetentionMs` its local retention. Callers pass a
+    // short local retention so inactive segments are deleted from local storage soon after they are offloaded
+    // (reads of those offsets must then hit remote storage), and a generous total retention so the remote
+    // segments are not deleted while the test is still running.
+    //
+    // Any failure while creating the topic fails the test with "Failed to create remote-storage source topic".
+    private void createRemoteStorageSourceTopic(String topic, long retentionMs, long localRetentionMs) {
+        assertDoesNotThrow(() -> {
+            try (Admin admin = createAdminClient()) {
+                Map<String, String> configs = Map.of(
+                    TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true",
+                    TopicConfig.RETENTION_MS_CONFIG, Long.toString(retentionMs),
+                    TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, Long.toString(localRetentionMs),
+                    // Intended to roll a segment for every record (an index entry per record and an index that
+                    // fills almost immediately), so each inactive segment can be offloaded then deleted locally.
+                    TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1",
+                    TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "12");
+                admin.createTopics(Set.of(new NewTopic(topic, 1, (short) 1).configs(configs))).all().get();
+            }
+        }, "Failed to create remote-storage source topic");
+    }
+
+    // Looks up, via Admin#listOffsets with OffsetSpec.earliestLocal(), the earliest offset of the given
+    // topic-partition that is still held in local storage. Anything below the returned offset is no longer
+    // available locally (e.g. it was offloaded to remote storage and then deleted), so reads of it must go to
+    // remote storage. The tiering test polls this to confirm the offload + local-delete cycle has happened.
+    private long earliestLocalOffset(String topic, int partition) throws Exception {
+        TopicPartition target = new TopicPartition(topic, partition);
+        Map<TopicPartition, OffsetSpec> request = Map.of(target, OffsetSpec.earliestLocal());
+        try (Admin adminClient = createAdminClient()) {
+            long offset = adminClient.listOffsets(request).partitionResult(target).get().offset();
+            return offset;
+        }
+    }
+
// Produces `count` records (key "key", value "value") to a specific topic-partition.
private void produceTo(String topic, int partition, int count) {
try (Producer producer = createProducer()) {
diff --git a/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java b/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java
index acebdb6a735aa..77e9a0636f2c6 100644
--- a/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java
+++ b/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java
@@ -25,6 +25,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.share.LogReader;
import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
@@ -37,6 +38,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -116,13 +118,13 @@ public CompletableFuture> readAsy
LinkedHashMap localReadResults =
read(fetchParams, partitionsToFetch, topicPartitionFetchOffsets, partitionMaxBytes);
- // One future per partition; combined into a single future once every partition has resolved.
+ // Only look at partitions with non-null read results.
LinkedHashMap> futures = new LinkedHashMap<>();
for (TopicIdPartition topicIdPartition : partitionsToFetch) {
LogReadResult logReadResult = localReadResults.get(topicIdPartition);
if (logReadResult == null) {
- futures.put(topicIdPartition, CompletableFuture.completedFuture(
- new LogReadResult(Errors.UNKNOWN_SERVER_ERROR)));
+ // No local read result for this partition: leave it out of the returned map rather than reporting an error.
+ log.debug("Log read result for partition {} is null", topicIdPartition);
continue;
}
@@ -147,6 +149,7 @@ public CompletableFuture> readAsy
return withInfoAndError(logReadResult, localFetchDataInfo, Errors.forException(cause));
}
if (remoteFetchDataInfo == null) {
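+ // The remote read produced no data, but the local read itself succeeded: keep the partition in the
+ // result (carrying the local fetch info) and flag it as an error rather than skipping it.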
return withInfoAndError(logReadResult, localFetchDataInfo, Errors.UNKNOWN_SERVER_ERROR);
}
return withInfoAndError(logReadResult, remoteFetchDataInfo, Errors.NONE);
@@ -185,8 +188,9 @@ private static LogReadResult withInfoAndError(LogReadResult base, FetchDataInfo
* RemoteStorageFetchInfo is the descriptor surfaced by a prior local read as
* FetchDataInfo#delayedRemoteStorageFetch. The read runs on the remote storage reader pool so the
* caller's thread is not blocked; the future completes exceptionally when remote storage is not
- * configured or the read could not be completed. Used internally by readAsync (package-private so
- * it remains unit-testable).
+ * configured, the read could not be completed, or the read does not finish within the configured
+ * remote fetch timeout ({@code remote.fetch.max.wait.ms}). Used internally by readAsync
+ * (package-private so it remains unit-testable).
*/
// Visibility for testing
CompletableFuture readRemote(RemoteStorageFetchInfo remoteStorageFetchInfo) {
@@ -218,6 +222,33 @@ CompletableFuture readRemote(RemoteStorageFetchInfo remoteStorage
future.completeExceptionally(e);
}
+        // Bound the wait on the remote read so a stalled remote tier cannot pin the read (and the
+        // resources held while it is pending) indefinitely. Use the broker's timer wheel - as
+        // DelayedShareFetch does for its remote fetch - rather than CompletableFuture#orTimeout. On
+        // expiry the future completes exceptionally with a TimeoutException, which the caller treats
+        // as a (skippable) read error.
+        if (!future.isDone()) {
+            long timeoutMs = remoteFetchMaxWaitMs();
+            TimerTask timeoutTask = new TimerTask(timeoutMs) {
+                @Override
+                public void run() {
+                    future.completeExceptionally(new TimeoutException(
+                        "Remote read for " + remoteStorageFetchInfo + " did not complete within " + timeoutMs + " ms."));
+                }
+            };
+            // Schedule the task BEFORE attaching the cancel hook. TimerTask#cancel() is understood to only
+            // remove a task that is already in the wheel (confirm against TimerTask), so if the read completed
+            // between the isDone() check and scheduling, a hook attached first would cancel nothing and the
+            // task would linger until expiry. Attached afterwards, whenComplete runs the hook immediately when
+            // the future is already done, so the task is always removed once the read finishes (either outcome).
+            replicaManager.addShareFetchTimerRequest(timeoutTask);
+            future.whenComplete((info, exception) -> timeoutTask.cancel());
+        }
+
return future;
}
+
+ /**
+ * The maximum time to wait for a remote-tier read, taken from the broker's
+ * {@code remote.fetch.max.wait.ms}. Read per call since the config is dynamically reconfigurable.
+ */
+ private long remoteFetchMaxWaitMs() {
+ return replicaManager.config().remoteLogManagerConfig().remoteFetchMaxWaitMs();
+ }
}
diff --git a/core/src/test/java/kafka/server/share/ReplicaManagerLogReaderTest.java b/core/src/test/java/kafka/server/share/ReplicaManagerLogReaderTest.java
index f0f75b329f2f4..036f4f54cdeea 100644
--- a/core/src/test/java/kafka/server/share/ReplicaManagerLogReaderTest.java
+++ b/core/src/test/java/kafka/server/share/ReplicaManagerLogReaderTest.java
@@ -16,16 +16,20 @@
*/
package kafka.server.share;
+import kafka.server.KafkaConfig;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogReadResult;
@@ -36,12 +40,14 @@
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import scala.Option;
@@ -70,6 +76,18 @@ public class ReplicaManagerLogReaderTest {
new TopicIdPartition(Uuid.randomUuid(), 0, "topic");
private static final TopicIdPartition TOPIC_ID_PARTITION_2 =
new TopicIdPartition(Uuid.randomUuid(), 1, "topic");
+ // A real (non-mock) RemoteLogManagerConfig built from the config definition with no overrides, so every
+ // setting takes its default. readRemote() reads remote.fetch.max.wait.ms from it for its timeout.
+ private static final RemoteLogManagerConfig REMOTE_LOG_MANAGER_CONFIG =
+ new RemoteLogManagerConfig(new AbstractConfig(RemoteLogManagerConfig.configDef(), Map.of(), false));
+
+    // Builds a ReplicaManager mock whose config().remoteLogManagerConfig() chain returns
+    // REMOTE_LOG_MANAGER_CONFIG, so readRemote() can resolve remote.fetch.max.wait.ms for its timeout.
+    private static ReplicaManager mockReplicaManager() {
+        KafkaConfig brokerConfig = mock(KafkaConfig.class);
+        when(brokerConfig.remoteLogManagerConfig()).thenReturn(REMOTE_LOG_MANAGER_CONFIG);
+
+        ReplicaManager mockedReplicaManager = mock(ReplicaManager.class);
+        when(mockedReplicaManager.config()).thenReturn(brokerConfig);
+        return mockedReplicaManager;
+    }
private static RemoteStorageFetchInfo remoteStorageFetchInfo() {
return new RemoteStorageFetchInfo(
@@ -124,7 +142,7 @@ private static LinkedHashMap maxBytes(TopicIdPartitio
@Test
public void testReadReturnsEmptyWhenNoPartitionsToFetch() {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
LinkedHashMap result =
@@ -136,7 +154,7 @@ public void testReadReturnsEmptyWhenNoPartitionsToFetch() {
@Test
public void testReadReturnsResultsFromReplicaManager() {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
LogReadResult logReadResult = mock(LogReadResult.class);
Seq> readFromLogResult =
CollectionConverters.asScala(List.of(new Tuple2<>(TOPIC_ID_PARTITION, logReadResult))).toSeq();
@@ -157,7 +175,7 @@ public void testReadReturnsResultsFromReplicaManager() {
@Test
public void testReadRemoteCompletesExceptionallyWhenRemoteLogManagerNotConfigured() {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
when(replicaManager.remoteLogManager()).thenReturn(Option.empty());
ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
@@ -170,7 +188,7 @@ public void testReadRemoteCompletesExceptionallyWhenRemoteLogManagerNotConfigure
@Test
public void testReadRemoteCompletesWithFetchedData() throws Exception {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
@@ -190,7 +208,7 @@ public void testReadRemoteCompletesWithFetchedData() throws Exception {
@Test
public void testReadRemoteCompletesExceptionallyWhenReadResultHasError() {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
@@ -211,7 +229,7 @@ public void testReadRemoteCompletesExceptionallyWhenReadResultHasError() {
@Test
public void testReadRemoteCompletesExceptionallyWhenReadResultIsEmpty() {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
@@ -231,7 +249,7 @@ public void testReadRemoteCompletesExceptionallyWhenReadResultIsEmpty() {
@Test
public void testReadRemoteCompletesExceptionallyWhenSchedulingRejected() {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
@@ -246,9 +264,29 @@ public void testReadRemoteCompletesExceptionallyWhenSchedulingRejected() {
assertSame(rejected, exception.getCause());
}
+    /**
+     * Verifies that readRemote() fails with a TimeoutException when the remote read never completes.
+     * The mocked RemoteLogManager never invokes the read callback, and the timer wheel is simulated by
+     * running the scheduled timeout task as soon as it is handed to addShareFetchTimerRequest().
+     */
+    @Test
+    public void testReadRemoteTimesOutWhenRemoteReadNeverCompletes() {
+        ReplicaManager replicaManager = mockReplicaManager();
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+        // asyncRead never invokes the callback, so the remote read never completes on its own.
+        // Simulate the timer wheel firing by running the scheduled timeout task immediately. The explicit
+        // <TimerTask> type witness is required: without it getArgument(0) is inferred as Object and run()
+        // does not resolve.
+        doAnswer(invocation -> {
+            invocation.<TimerTask>getArgument(0).run();
+            return null;
+        }).when(replicaManager).addShareFetchTimerRequest(any());
+
+        ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
+        CompletableFuture<FetchDataInfo> future = logReader.readRemote(remoteStorageFetchInfo());
+
+        // The timeout task has already run, so the future must be failed before any waiting happens.
+        assertTrue(future.isCompletedExceptionally());
+        ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS));
+        assertInstanceOf(TimeoutException.class, exception.getCause());
+    }
+
@Test
public void testReadAsyncReturnsEmptyWhenNoPartitionsToFetch() {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
LinkedHashMap result =
@@ -260,7 +298,7 @@ public void testReadAsyncReturnsEmptyWhenNoPartitionsToFetch() {
@Test
public void testReadAsyncReturnsLocalDataWhenNotTiered() throws Exception {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
FetchDataInfo localData = localData();
stubReadFromLog(replicaManager,
List.of(new Tuple2<>(TOPIC_ID_PARTITION, localReadResult(localData, Errors.NONE))));
@@ -280,7 +318,7 @@ public void testReadAsyncReturnsLocalDataWhenNotTiered() throws Exception {
@Test
public void testReadAsyncFollowsRemoteWhenTieredAndReadRemoteTrue() throws Exception {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
@@ -308,7 +346,7 @@ public void testReadAsyncFollowsRemoteWhenTieredAndReadRemoteTrue() throws Excep
@Test
public void testReadAsyncSkipsRemoteWhenReadRemoteFalse() throws Exception {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
FetchDataInfo tieredData = tieredData(remoteStorageFetchInfo());
stubReadFromLog(replicaManager,
List.of(new Tuple2<>(TOPIC_ID_PARTITION, localReadResult(tieredData, Errors.NONE))));
@@ -327,7 +365,7 @@ public void testReadAsyncSkipsRemoteWhenReadRemoteFalse() throws Exception {
@Test
public void testReadAsyncReturnsErrorFromLocalRead() throws Exception {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
FetchDataInfo localData = localData();
stubReadFromLog(replicaManager,
List.of(new Tuple2<>(TOPIC_ID_PARTITION, localReadResult(localData, Errors.UNKNOWN_SERVER_ERROR))));
@@ -345,7 +383,7 @@ public void testReadAsyncReturnsErrorFromLocalRead() throws Exception {
@Test
public void testReadAsyncReturnsErrorWhenRemoteReadFails() throws Exception {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
@@ -372,7 +410,7 @@ public void testReadAsyncReturnsErrorWhenRemoteReadFails() throws Exception {
@Test
public void testReadAsyncResolvesPartitionsIndependently() throws Exception {
- ReplicaManager replicaManager = mock(ReplicaManager.class);
+ ReplicaManager replicaManager = mockReplicaManager();
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
@@ -400,4 +438,23 @@ public void testReadAsyncResolvesPartitionsIndependently() throws Exception {
assertSame(remoteData, result.get(TOPIC_ID_PARTITION).info());
assertSame(localData, result.get(TOPIC_ID_PARTITION_2).info());
}
+
+    /**
+     * Verifies that readAsync() omits a requested partition for which the local read returned no result,
+     * while still returning the partitions that did produce a result.
+     */
+    @Test
+    public void testReadAsyncSkipsPartitionsWithNoReadResult() throws Exception {
+        ReplicaManager replicaManager = mockReplicaManager();
+        // The local read returns a result only for partition 2; partition 1 has no read result at all.
+        FetchDataInfo localData = localData();
+        stubReadFromLog(replicaManager,
+            List.of(new Tuple2<>(TOPIC_ID_PARTITION_2, localReadResult(localData, Errors.NONE))));
+
+        ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
+        // Typed map (not raw) so that result.get(...) yields a LogReadResult and info() resolves.
+        LinkedHashMap<TopicIdPartition, LogReadResult> result =
+            logReader.readAsync(fetchParams(), Set.of(TOPIC_ID_PARTITION, TOPIC_ID_PARTITION_2),
+                offsets(TOPIC_ID_PARTITION, TOPIC_ID_PARTITION_2),
+                maxBytes(TOPIC_ID_PARTITION, TOPIC_ID_PARTITION_2), true).get(10, TimeUnit.SECONDS);
+
+        // Partition 1 had no read result, so it is skipped (absent from the map); only partition 2 is present.
+        assertEquals(Set.of(TOPIC_ID_PARTITION_2), result.keySet());
+        assertSame(localData, result.get(TOPIC_ID_PARTITION_2).info());
+    }
}