KAFKA-20678: Add timeout in replica manager log reader remote read. #22707

Open
smjn wants to merge 3 commits into apache:trunk from smjn:KAFKA-20678-2
@smjn smjn commented Jun 30, 2026

Collaborator

ReplicaManagerLogReader.readAsync fetches data from remote storage when
remote storage is enabled, and the response is asynchronous. The current
code does not bound how long this remote read can take. This PR bounds it
with the existing remoteFetchMaxWaitMs by scheduling a timeout task in the
replica manager's timer wheel. If the read has not completed when the task
fires, the pending future is completed exceptionally with a TimeoutException.
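
For readers following the discussion, the pattern described above can be sketched roughly as follows. This is a minimal illustration and not the PR's code: it uses a JDK `ScheduledExecutorService` as a stand-in for the replica manager's timer wheel, and `BoundedRemoteRead`, `withTimeout`, and the `description` parameter are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedRemoteRead {
    // Assumption: a JDK scheduler stands in for the broker's timer wheel.
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "remote-read-timeout");
            thread.setDaemon(true); // do not keep the JVM alive for pending timeouts
            return thread;
        });

    // Fails the future with a TimeoutException if it is still pending after timeoutMs.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future,
                                                long timeoutMs,
                                                String description) {
        ScheduledFuture<?> timeoutTask = TIMER.schedule(
            () -> future.completeExceptionally(new TimeoutException(
                "Remote read for " + description + " did not complete within " + timeoutMs + " ms.")),
            timeoutMs, TimeUnit.MILLISECONDS);
        // Cancel the timeout once the read completes (either outcome) so it does not linger.
        future.whenComplete((value, exception) -> timeoutTask.cancel(false));
        return future;
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        withTimeout(neverCompletes, 50, "topic-0");
        try {
            neverCompletes.get();
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getClass().getSimpleName()); // prints "TimeoutException"
        }
    }
}
```

On Java 9 and later, `CompletableFuture.orTimeout` provides similar behavior on a JDK-internal scheduler. An explicit task, as in the sketch, keeps the timeout on a timer the caller controls.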

A new integration test has been added to ShareConsumerDLQTest to verify
tiering-based DLQ end to end.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>

@github-actions github-actions Bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka labels Jun 30, 2026
@smjn smjn requested a review from apoorvmittal10 June 30, 2026 08:33
@github-actions github-actions Bot added build Gradle build or GitHub Actions clients labels Jun 30, 2026

@apoorvmittal10 apoorvmittal10 left a comment

Contributor

Thanks for the PR, one comment.

Comment on lines +235 to +237
future.completeExceptionally(new TimeoutException(
"Remote read for " + remoteStorageFetchInfo + " did not complete within " + timeoutMs + " ms."));
}

Contributor

Don't we also need to cancel the pending future for the remote calls?
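
One way to read this concern: the timeout fails the caller-facing future, but the underlying remote read keeps running unless it is cancelled as well. A rough sketch of cancelling both, assuming the remote read is submitted to an executor that returns a cancellable `Future` handle (`CancelOnTimeout`, `readWithTimeout`, and the executor wiring are hypothetical, not the broker's API):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CancelOnTimeout {
    // Assumption: a JDK scheduler stands in for the broker's timer wheel.
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "remote-read-timeout");
            thread.setDaemon(true);
            return thread;
        });

    static <T> CompletableFuture<T> readWithTimeout(ExecutorService remotePool,
                                                    Callable<T> remoteRead,
                                                    long timeoutMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // Handle to the in-flight remote work, kept so the timeout can cancel it.
        Future<?> remoteTask = remotePool.submit(() -> {
            try {
                result.complete(remoteRead.call());
            } catch (Exception e) {
                result.completeExceptionally(e); // no-op if the timeout already fired
            }
        });
        ScheduledFuture<?> timeoutTask = TIMER.schedule(() -> {
            boolean timedOut = result.completeExceptionally(new TimeoutException(
                "Remote read did not complete within " + timeoutMs + " ms."));
            if (timedOut) {
                remoteTask.cancel(true); // interrupt the remote read so it stops consuming resources
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        result.whenComplete((value, exception) -> timeoutTask.cancel(false));
        return result;
    }
}
```

Whether interruption actually stops the remote read depends on the remote storage implementation responding to it, so this only illustrates the shape of the change.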

};
// Cancel the timer task once the read completes (either outcome) so it does not linger in the wheel.
future.whenComplete((info, exception) -> timeoutTask.cancel());
replicaManager.addShareFetchTimerRequest(timeoutTask);

Contributor

Also, why is the share fetch timer API being used here? And should we make it the caller's responsibility to handle the timeouts?

Comment on lines +235 to +236
future.completeExceptionally(new TimeoutException(
"Remote read for " + remoteStorageFetchInfo + " did not complete within " + timeoutMs + " ms."));

Contributor

Just completing it exceptionally might not be a good idea, as the method can still return the locally fetched data while skipping the remote storage partitions.
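
As an illustration of this alternative, a timed-out remote partition could be dropped from the response instead of failing the whole read. The sketch below assumes partition data can be modeled as a map from a partition name to a string payload. `PartialFetch` and `fetch` are hypothetical names, and `completeOnTimeout` (Java 9+) stands in for whatever timer the broker would use.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class PartialFetch {
    // Merges local data with whatever remote reads finish within timeoutMs.
    // Remote partitions that time out are skipped rather than failing the result.
    static CompletableFuture<Map<String, String>> fetch(Map<String, String> local,
                                                        Map<String, CompletableFuture<String>> remote,
                                                        long timeoutMs) {
        Map<String, CompletableFuture<String>> bounded = new LinkedHashMap<>();
        // A null value marks a remote read that timed out. Note that
        // completeOnTimeout completes the passed-in future itself.
        remote.forEach((partition, future) ->
            bounded.put(partition, future.completeOnTimeout(null, timeoutMs, TimeUnit.MILLISECONDS)));

        CompletableFuture<Void> all =
            CompletableFuture.allOf(bounded.values().toArray(new CompletableFuture<?>[0]));

        return all.thenApply(ignored -> {
            Map<String, String> result = new LinkedHashMap<>(local);
            bounded.forEach((partition, future) -> {
                String data = future.join(); // already complete, does not block
                if (data != null) {
                    result.put(partition, data);
                }
            });
            return result;
        });
    }
}
```

In this sketch a remote read that fails for a reason other than the timeout still fails the combined future. Whether those errors should also be downgraded to a skip is a separate design decision.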
