Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2171,6 +2171,7 @@ project(':clients:clients-integration-tests') {
testImplementation project(':raft')
testImplementation project(':server')
testImplementation project(':storage')
testImplementation testFixtures(project(':storage'))
testImplementation project(':core').sourceSets.test.output
testImplementation testFixtures(project(':clients'))
implementation project(':server-common')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -267,6 +268,71 @@ public void testRejectedRecordsWrittenToDlqWithCopyRecordEnabled() throws Except
verifyDlqMetrics(groupId, recordCount);
}

/**
* End-to-end DLQ copy when the source records have been tiered to remote storage. The source topic enables
* tiered storage and rolls a segment per record, with a 45s total retention and a 5s local retention so the
* early offsets are offloaded to remote storage and then deleted locally (well before the remote segments
* expire). Once the local segments are gone (verified via the earliest-local offset advancing past them), the
* records are rejected with record copy enabled - so the DLQ record fetcher must read the original records
* back from remote storage. The resulting DLQ records carrying the original key/value confirm the fetcher
* successfully pulled them from remote storage.
*
* <p>Tiered storage is backed by the local-filesystem {@code LocalTieredStorage} RSM and the default
* {@code TopicBasedRemoteLogMetadataManager}; short task/cleanup intervals keep the offload + local-delete
* cycle quick.
*/
@ClusterTest(
serverProperties = {
@ClusterConfigProperty(key = "remote.log.storage.system.enable", value = "true"),
@ClusterConfigProperty(key = "remote.log.storage.manager.class.name",
value = "org.apache.kafka.server.log.remote.storage.LocalTieredStorage"),
@ClusterConfigProperty(key = "remote.log.manager.task.interval.ms", value = "500"),
@ClusterConfigProperty(key = "remote.log.metadata.manager.listener.name", value = "EXTERNAL"),
@ClusterConfigProperty(key = "rlmm.config.remote.log.metadata.topic.replication.factor", value = "1"),
@ClusterConfigProperty(key = "rlmm.config.remote.log.metadata.topic.num.partitions", value = "1"),
@ClusterConfigProperty(key = "log.retention.check.interval.ms", value = "500"),
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100")
}
)
public void testDlqCopiesRecordsReadFromRemoteStorage() throws Exception {
String groupId = "dlq-remote-group";
// The broker's default share-group DLQ topic prefix is "dlq.", so the topic name must start with it.
String dlqTopic = "dlq.remote";
String sourceTopic = "dlq-remote-source";
int recordCount = 5;

alterShareAutoOffsetReset(groupId, "earliest");
createDlqTopic(dlqTopic);
alterShareGroupConfig(groupId, GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, dlqTopic);
// Record copy enabled: the DLQ fetcher must read the original records back to copy their key/value.
alterShareGroupConfig(groupId, GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "true");

// Tiered source topic: one segment per record, 45s total retention and 5s local retention so inactive
// segments are offloaded to remote storage and then deleted locally shortly after, while the remote
// segments comfortably survive the rest of the test.
createRemoteStorageSourceTopic(sourceTopic, 45_000L, 5_000L);

produceTo(sourceTopic, 0, recordCount);

// Wait until the early offsets have been offloaded to remote storage AND removed locally - i.e. the
// earliest *local* offset has advanced past them, so reading those offsets must now hit remote storage.
// The last record stays in the active (never-offloaded) segment, so the earliest local offset should
// reach recordCount - 1. A generous timeout (well inside the 45s remote retention) absorbs remote-log
// metadata-manager startup; it normally resolves a few seconds after the 5s local retention elapses.
waitForCondition(() -> earliestLocalOffset(sourceTopic, 0) >= recordCount - 1,
30_000L, 500L,
() -> "Source records were not tiered to remote storage and removed locally in time");

// Reject every record. Both the share fetch (to deliver them) and the DLQ record fetcher (to copy them)
// must read the tiered offsets back from remote storage.
rejectRecords(groupId, sourceTopic, recordCount);

// Record copy is enabled, so every DLQ record must carry the original key/value. For the tiered offsets
// (no longer present locally) that is only possible if the DLQ fetcher pulled them from remote storage.
verifyDlqTopicRecords(dlqTopic, groupId, sourceTopic, 0, expectedSourceOffsets(recordCount), true);
verifyDlqMetrics(groupId, recordCount);
}

/**
* Rejects records from a multi-partition source topic and verifies they are routed to the correct DLQ
* partition. The destination partition is {@code sourcePartition % numDlqPartitions}; with a DLQ topic that
Expand Down Expand Up @@ -537,13 +603,23 @@ private void verifyDlqTopicCreated(String dlqTopic) throws Exception {
*/
private void verifyDlqTopicRecords(String dlqTopic, String groupId, Set<Long> expectedSourceOffsets,
boolean copyEnabled) throws InterruptedException {
verifyDlqTopicRecords(dlqTopic, groupId, tp.topic(), tp.partition(), expectedSourceOffsets, copyEnabled);
}

/**
* As {@link #verifyDlqTopicRecords(String, String, Set, boolean)}, but for an explicit source topic-partition
* (rather than the base {@code tp}). The DLQ records' context headers must reference {@code sourceTopic} /
* {@code sourcePartition}.
*/
private void verifyDlqTopicRecords(String dlqTopic, String groupId, String sourceTopic, int sourcePartition,
Set<Long> expectedSourceOffsets, boolean copyEnabled) throws InterruptedException {
List<ConsumerRecord<byte[], byte[]>> dlqRecords = readDlqPartition(dlqTopic, 0, expectedSourceOffsets.size());

assertEquals(expectedSourceOffsets.size(), dlqRecords.size(), "Unexpected number of records on the DLQ topic");
Set<Long> actualSourceOffsets = new HashSet<>();
for (ConsumerRecord<byte[], byte[]> record : dlqRecords) {
if (copyEnabled) {
// produceMessages() produces records with key "key" and value "value".
// produceMessages()/produceTo() produce records with key "key" and value "value".
assertEquals("key", new String(Objects.requireNonNull(record.key()), StandardCharsets.UTF_8),
"DLQ record key should be copied when record copy is enabled");
assertEquals("value", new String(Objects.requireNonNull(record.value()), StandardCharsets.UTF_8),
Expand All @@ -553,8 +629,8 @@ private void verifyDlqTopicRecords(String dlqTopic, String groupId, Set<Long> ex
assertNull(record.value(), "DLQ record value should be null when record copy is disabled");
}
assertEquals(groupId, headerValue(record, HEADER_DLQ_ERRORS_GROUP));
assertEquals(tp.topic(), headerValue(record, HEADER_DLQ_ERRORS_TOPIC));
assertEquals(Integer.toString(tp.partition()), headerValue(record, HEADER_DLQ_ERRORS_PARTITION));
assertEquals(sourceTopic, headerValue(record, HEADER_DLQ_ERRORS_TOPIC));
assertEquals(Integer.toString(sourcePartition), headerValue(record, HEADER_DLQ_ERRORS_PARTITION));
actualSourceOffsets.add(Long.parseLong(Objects.requireNonNull(headerValue(record, HEADER_DLQ_ERRORS_OFFSET))));
}
assertEquals(expectedSourceOffsets, actualSourceOffsets, "DLQ records should cover every expected source offset");
Expand Down Expand Up @@ -591,6 +667,36 @@ private void createDlqTopic(String topicName, int numPartitions) {
}, "Failed to create DLQ topic");
}

// Creates a single-partition source topic with tiered storage enabled and one log segment per record (via
// per-record index entries). A short local retention (`localRetentionMs`) deletes inactive segments from
// local storage soon after they are offloaded to remote storage, so reading those offsets must hit remote
// storage; the total retention (`retentionMs`) is kept generous so the remote segments are not deleted while
// the test is still running.
private void createRemoteStorageSourceTopic(String topic, long retentionMs, long localRetentionMs) {
assertDoesNotThrow(() -> {
try (Admin admin = createAdminClient()) {
Map<String, String> configs = Map.of(
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true",
TopicConfig.RETENTION_MS_CONFIG, Long.toString(retentionMs),
TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, Long.toString(localRetentionMs),
// Roll a segment for every record so each inactive segment can be offloaded then deleted locally.
TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1",
TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "12");
admin.createTopics(Set.of(new NewTopic(topic, 1, (short) 1).configs(configs))).all().get();
}
}, "Failed to create remote-storage source topic");
}

// The earliest offset still held in local storage. Offsets below this have been removed locally (e.g. after
// being offloaded to remote storage), so reading them must hit remote storage. Used to confirm tiering.
private long earliestLocalOffset(String topic, int partition) throws Exception {
TopicPartition topicPartition = new TopicPartition(topic, partition);
try (Admin admin = createAdminClient()) {
return admin.listOffsets(Map.of(topicPartition, OffsetSpec.earliestLocal()))
.partitionResult(topicPartition).get().offset();
}
}

// Produces `count` records (key "key", value "value") to a specific topic-partition.
private void produceTo(String topic, int partition, int count) {
try (Producer<byte[], byte[]> producer = createProducer()) {
Expand Down
41 changes: 36 additions & 5 deletions core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.share.LogReader;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
Expand All @@ -37,6 +38,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import scala.Tuple2;
Expand Down Expand Up @@ -116,13 +118,13 @@ public CompletableFuture<LinkedHashMap<TopicIdPartition, LogReadResult>> readAsy
LinkedHashMap<TopicIdPartition, LogReadResult> localReadResults =
read(fetchParams, partitionsToFetch, topicPartitionFetchOffsets, partitionMaxBytes);

// One future per partition; combined into a single future once every partition has resolved.
// Only look at partitions with non-null read results.
LinkedHashMap<TopicIdPartition, CompletableFuture<LogReadResult>> futures = new LinkedHashMap<>();
for (TopicIdPartition topicIdPartition : partitionsToFetch) {
LogReadResult logReadResult = localReadResults.get(topicIdPartition);
if (logReadResult == null) {
futures.put(topicIdPartition, CompletableFuture.completedFuture(
new LogReadResult(Errors.UNKNOWN_SERVER_ERROR)));
// Just skip
log.debug("Log read result for partition {} is null", topicIdPartition);
continue;
}

Expand All @@ -147,6 +149,7 @@ public CompletableFuture<LinkedHashMap<TopicIdPartition, LogReadResult>> readAsy
return withInfoAndError(logReadResult, localFetchDataInfo, Errors.forException(cause));
}
if (remoteFetchDataInfo == null) {
// We want to return successful local read results so no skipping.
return withInfoAndError(logReadResult, localFetchDataInfo, Errors.UNKNOWN_SERVER_ERROR);
}
return withInfoAndError(logReadResult, remoteFetchDataInfo, Errors.NONE);
Expand Down Expand Up @@ -185,8 +188,9 @@ private static LogReadResult withInfoAndError(LogReadResult base, FetchDataInfo
* RemoteStorageFetchInfo is the descriptor surfaced by a prior local read as
* FetchDataInfo#delayedRemoteStorageFetch. The read runs on the remote storage reader pool so the
* caller's thread is not blocked; the future completes exceptionally when remote storage is not
* configured or the read could not be completed. Used internally by readAsync (package-private so
* it remains unit-testable).
* configured, the read could not be completed, or the read does not finish within the configured
* remote fetch timeout ({@code remote.fetch.max.wait.ms}). Used internally by readAsync
* (package-private so it remains unit-testable).
*/
// Visibility for testing
CompletableFuture<FetchDataInfo> readRemote(RemoteStorageFetchInfo remoteStorageFetchInfo) {
Expand Down Expand Up @@ -218,6 +222,33 @@ CompletableFuture<FetchDataInfo> readRemote(RemoteStorageFetchInfo remoteStorage
future.completeExceptionally(e);
}

// Bound the wait on the remote read so a stalled remote tier cannot pin the read (and the
// resources held while it is pending) indefinitely. Use the broker's timer wheel - as
// DelayedShareFetch does for its remote fetch - rather than CompletableFuture#orTimeout. On
// expiry the future completes exceptionally with a TimeoutException, which the caller treats
// as a (skippable) read error.
if (!future.isDone()) {
long timeoutMs = remoteFetchMaxWaitMs();
TimerTask timeoutTask = new TimerTask(timeoutMs) {
@Override
public void run() {
future.completeExceptionally(new TimeoutException(
"Remote read for " + remoteStorageFetchInfo + " did not complete within " + timeoutMs + " ms."));
Comment on lines +235 to +236

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just completing it exceptionally might not be a good idea as the method can still return the locally fetched data while skipping the remote storage partitions.

}
Comment on lines +235 to +237

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we also need not to cancel the pending future for remote calls?

};
// Cancel the timer task once the read completes (either outcome) so it does not linger in the wheel.
future.whenComplete((info, exception) -> timeoutTask.cancel());
replicaManager.addShareFetchTimerRequest(timeoutTask);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also why share fetch timer API being used here? Also should we make it callers responsibility to handle the timeouts?

}

return future;
}

/**
* The maximum time to wait for a remote-tier read, taken from the broker's
* {@code remote.fetch.max.wait.ms}. Read per call since the config is dynamically reconfigurable.
*/
private long remoteFetchMaxWaitMs() {
return replicaManager.config().remoteLogManagerConfig().remoteFetchMaxWaitMs();
}
}
Loading
Loading