diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 4ff391ecfc1d6..ac3f891288bf8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -248,6 +248,27 @@ public PollResult poll(long currentTimeMs) { } }); + // Iterate over the session handlers again to remove any partitions which are no longer + // being fetched and which also have no acknowledgements to send. This handles the case + // where a node has not otherwise been picked up in this poll, but we need to send a + // ShareFetch to remove the stale partitions from the share session. + sessionHandlers.forEach((nodeId, sessionHandler) -> { + Node node = cluster.nodeById(nodeId); + if (node != null && !handlerMap.containsKey(node) && !sessionHandler.sessionPartitionMap().isEmpty()) { + if (nodesWithPendingRequests.contains(node.id())) { + log.trace("Skipping fetch because previous fetch request to {} has not been processed", nodeId); + } else { + Set currentPartitionsToFetch = new HashSet<>(partitionsToFetch()); + boolean hasPartitionsToRemove = sessionHandler.sessionPartitions().stream() + .anyMatch(tip -> !currentPartitionsToFetch.contains(tip.topicPartition())); + if (hasPartitionsToRemove) { + handlerMap.put(node, sessionHandler); + log.debug("Added fetch request for previously subscribed partitions without acknowledgements to node {}", nodeId); + } + } + } + }); + // Iterate over the share session handlers and build a list of UnsentRequests. List requests = handlerMap.entrySet().stream().map(entry -> { Node target = entry.getKey(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index ca8d0ca08655a..4530965a35d10 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -1115,20 +1115,65 @@ public void testShareFetchWithSubscriptionChangeMultipleNodesEmptyAcknowledgemen // Change the subscription. subscriptions.assignFromSubscribed(List.of(tp1)); - // Now we will be sending the request to node1 only as leader for tip1 is node1. - // We do not build the request for tip0 as there are no acknowledgements to send. + // We build a request to node 1 to fetch tip1, and a request to node 0 to remove tip0 + // from the share session even though there are no acknowledgements to send. NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult(); - assertEquals(1, pollResult.unsentRequests.size()); - assertEquals(nodeId1, pollResult.unsentRequests.get(0).node().get()); + assertEquals(2, pollResult.unsentRequests.size()); - ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder(); + ShareFetchRequest.Builder node0Builder, node1Builder; + if (pollResult.unsentRequests.get(0).node().get() == nodeId0) { + node0Builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder(); + node1Builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(1).requestBuilder(); + assertEquals(nodeId1, pollResult.unsentRequests.get(1).node().get()); + } else { + node0Builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(1).requestBuilder(); + node1Builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder(); + assertEquals(nodeId0, pollResult.unsentRequests.get(1).node().get()); + assertEquals(nodeId1, pollResult.unsentRequests.get(0).node().get()); + } - assertEquals(1, builder.data().topics().size()); - ShareFetchRequestData.FetchTopic fetchTopic = builder.data().topics().stream().findFirst().get(); + // node1 fetches the newly assigned partition tip1. + assertEquals(1, node1Builder.data().topics().size()); + ShareFetchRequestData.FetchTopic fetchTopic = node1Builder.data().topics().stream().findFirst().get(); assertEquals(tip1.topicId(), fetchTopic.topicId()); assertEquals(1, fetchTopic.partitions().size()); assertEquals(1, fetchTopic.partitions().stream().findFirst().get().partitionIndex()); - assertEquals(0, builder.data().forgottenTopicsData().size()); + assertEquals(0, node1Builder.data().forgottenTopicsData().size()); + + // node0 removes tip0 from the share session and fetches nothing. + assertEquals(0, node0Builder.data().topics().size()); + assertEquals(1, node0Builder.data().forgottenTopicsData().size()); + assertEquals(tip0.topicId(), node0Builder.data().forgottenTopicsData().get(0).topicId()); + assertEquals(1, node0Builder.data().forgottenTopicsData().get(0).partitions().size()); + assertEquals(0, node0Builder.data().forgottenTopicsData().get(0).partitions().get(0)); + } + + @Test + public void testShareFetchRemovesUnassignedPartitionFromSession() { + buildRequestManager(); + + assignFromSubscribed(Set.of(tp0)); + + // Establish the share session by fetching from tp0. + sendFetchAndVerifyResponse(records, emptyAcquiredRecords, Errors.NONE); + fetchRecords(); + + // The partition is no longer assigned and there are no acknowledgements to send. + subscriptions.assignFromSubscribed(Set.of()); + + // We still build a ShareFetch to remove tip0 from the share session on the broker. + NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult(); + assertEquals(1, pollResult.unsentRequests.size()); + + ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder(); + assertEquals(0, builder.data().topics().size()); + assertEquals(1, builder.data().forgottenTopicsData().size()); + assertEquals(tip0.topicId(), builder.data().forgottenTopicsData().get(0).topicId()); + assertEquals(1, builder.data().forgottenTopicsData().get(0).partitions().size()); + assertEquals(0, builder.data().forgottenTopicsData().get(0).partitions().get(0)); + + // The partition has already been removed from the session, so no further ShareFetch is built. + assertEquals(0, shareConsumeRequestManager.sendFetches()); } @Test