Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,27 @@ public PollResult poll(long currentTimeMs) {
}
});

// Iterate over the session handlers again to remove any partitions which are no longer
// being fetched and which also have no acknowledgements to send. This handles the case
// where a node has not otherwise been picked up in this poll, but we need to send a
// ShareFetch to remove the stale partitions from the share session.
sessionHandlers.forEach((nodeId, sessionHandler) -> {
Node node = cluster.nodeById(nodeId);
if (node != null && !handlerMap.containsKey(node) && !sessionHandler.sessionPartitionMap().isEmpty()) {
if (nodesWithPendingRequests.contains(node.id())) {
log.trace("Skipping fetch because previous fetch request to {} has not been processed", nodeId);
} else {
Set<TopicPartition> currentPartitionsToFetch = new HashSet<>(partitionsToFetch());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to maybe include this logic the previous loop (the one processing the unsent acknowledgements) just after we do the acknowledgements check to avoid passing over the main map twice?
Since we are having the !handlerMap.containsKey(node) check here, it should not coincide with any partitions added in that acknowledgements loop pass.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it, but the map is going to be small. I preferred having the logic separate of the third loop.

boolean hasPartitionsToRemove = sessionHandler.sessionPartitions().stream()
.anyMatch(tip -> !currentPartitionsToFetch.contains(tip.topicPartition()));
if (hasPartitionsToRemove) {
handlerMap.put(node, sessionHandler);
log.debug("Added fetch request for previously subscribed partitions without acknowledgements to node {}", nodeId);
}
}
}
});

// Iterate over the share session handlers and build a list of UnsentRequests.
List<UnsentRequest> requests = handlerMap.entrySet().stream().map(entry -> {
Node target = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1115,20 +1115,65 @@ public void testShareFetchWithSubscriptionChangeMultipleNodesEmptyAcknowledgemen
// Change the subscription.
subscriptions.assignFromSubscribed(List.of(tp1));

// Now we will be sending the request to node1 only as leader for tip1 is node1.
// We do not build the request for tip0 as there are no acknowledgements to send.
// We build a request to node 1 to fetch tip1, and a request to node 0 to remove tip0
// from the share session even though there are no acknowledgements to send.
NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult();
assertEquals(1, pollResult.unsentRequests.size());
assertEquals(nodeId1, pollResult.unsentRequests.get(0).node().get());
assertEquals(2, pollResult.unsentRequests.size());

ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
ShareFetchRequest.Builder node0Builder, node1Builder;
if (pollResult.unsentRequests.get(0).node().get() == nodeId0) {
node0Builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
node1Builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(1).requestBuilder();
assertEquals(nodeId1, pollResult.unsentRequests.get(1).node().get());
} else {
node0Builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(1).requestBuilder();
node1Builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
assertEquals(nodeId0, pollResult.unsentRequests.get(1).node().get());
assertEquals(nodeId1, pollResult.unsentRequests.get(0).node().get());
}

assertEquals(1, builder.data().topics().size());
ShareFetchRequestData.FetchTopic fetchTopic = builder.data().topics().stream().findFirst().get();
// node1 fetches the newly assigned partition tip1.
assertEquals(1, node1Builder.data().topics().size());
ShareFetchRequestData.FetchTopic fetchTopic = node1Builder.data().topics().stream().findFirst().get();
assertEquals(tip1.topicId(), fetchTopic.topicId());
assertEquals(1, fetchTopic.partitions().size());
assertEquals(1, fetchTopic.partitions().stream().findFirst().get().partitionIndex());
assertEquals(0, builder.data().forgottenTopicsData().size());
assertEquals(0, node1Builder.data().forgottenTopicsData().size());

// node0 removes tip0 from the share session and fetches nothing.
assertEquals(0, node0Builder.data().topics().size());
assertEquals(1, node0Builder.data().forgottenTopicsData().size());
assertEquals(tip0.topicId(), node0Builder.data().forgottenTopicsData().get(0).topicId());
assertEquals(1, node0Builder.data().forgottenTopicsData().get(0).partitions().size());
assertEquals(0, node0Builder.data().forgottenTopicsData().get(0).partitions().get(0));
}

@Test
public void testShareFetchRemovesUnassignedPartitionFromSession() {
buildRequestManager();

assignFromSubscribed(Set.of(tp0));

// Establish the share session by fetching from tp0.
sendFetchAndVerifyResponse(records, emptyAcquiredRecords, Errors.NONE);
fetchRecords();

// The partition is no longer assigned and there are no acknowledgements to send.
subscriptions.assignFromSubscribed(Set.of());

// We still build a ShareFetch to remove tip0 from the share session on the broker.
NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult();
assertEquals(1, pollResult.unsentRequests.size());

ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
assertEquals(0, builder.data().topics().size());
assertEquals(1, builder.data().forgottenTopicsData().size());
assertEquals(tip0.topicId(), builder.data().forgottenTopicsData().get(0).topicId());
assertEquals(1, builder.data().forgottenTopicsData().get(0).partitions().size());
assertEquals(0, builder.data().forgottenTopicsData().get(0).partitions().get(0));

// The partition has already been removed from the session, so no further ShareFetch is built.
assertEquals(0, shareConsumeRequestManager.sendFetches());
}

@Test
Expand Down
Loading