diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java index 32468c4889259..3cdece40851f6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java @@ -57,7 +57,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TopicOperation; -import org.apache.pulsar.common.scalable.SegmentInfo; import org.apache.pulsar.common.scalable.SegmentTopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -675,8 +674,9 @@ private static String deserializeLeaderBrokerId(byte[] bytes) { } /** - * Best-effort delete underlying persistent topics for all segments. - * Uses the internal admin client which handles cross-broker routing. + * Best-effort delete the underlying topic for every segment in the DAG. Uses the + * segment-aware admin endpoint, which routes to the segment-owning broker via the + * standard bundle-ownership lookup. */ private CompletableFuture deleteSegmentTopics(TopicName parentTopic, ScalableTopicMetadata metadata, @@ -685,10 +685,11 @@ private CompletableFuture deleteSegmentTopics(TopicName parentTopic, var admin = pulsar().getAdminClient(); CompletableFuture[] futures = metadata.getSegments().values().stream() .map(seg -> { - String name = segmentPersistentName(parentTopic, seg); - return admin.topics().deleteAsync(name, force) + String segmentTopicName = SegmentTopicName.fromParent( + parentTopic, seg.hashRange(), seg.segmentId()).toString(); + return admin.scalableTopics().deleteSegmentAsync(segmentTopicName, force) .exceptionally(ex -> { - log.warn().attr("segment", name).exceptionMessage(ex) + log.warn().attr("segment", segmentTopicName).exceptionMessage(ex) .log("Failed to delete segment topic"); return null; }); @@ -701,15 +702,4 @@ private CompletableFuture deleteSegmentTopics(TopicName parentTopic, return CompletableFuture.completedFuture(null); } } - - /** - * Convert a segment:// topic name to persistent:// for the underlying managed ledger topic. - */ - private String segmentPersistentName(TopicName parentTopic, SegmentInfo segment) { - TopicName segTopic = SegmentTopicName.fromParent( - parentTopic, segment.hashRange(), segment.segmentId()); - return "persistent://" + segTopic.getTenant() + "/" - + segTopic.getNamespacePortion() + "/" - + segTopic.getLocalName(); - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java index 507119518b07d..98d09613d7d48 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java @@ -175,6 +175,107 @@ public void terminateSegment( }); } + @PUT + @Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}") + @ApiOperation(value = "Create a subscription cursor on the segment topic at the earliest" + + " position. Super-user only.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Subscription cursor created (or already existed)"), + @ApiResponse(code = 401, message = "This operation requires super-user access"), + @ApiResponse(code = 403, message = "This operation requires super-user access"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void createSubscription( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify the parent topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", required = true) + @PathParam("descriptor") String descriptor, + @ApiParam(value = "Subscription name", required = true) + @PathParam("subscription") String subscription, + @ApiParam(value = "Whether leader broker redirected this call to this broker.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateNamespaceName(tenant, namespace); + TopicName segmentTopic = segmentTopicName(tenant, namespace, encodedTopic, descriptor); + + validateSuperUserAccessAsync() + .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, authoritative)) + .thenCompose(__ -> pulsar().getBrokerService().getOrCreateTopic(segmentTopic.toString())) + .thenCompose(topic -> topic.createSubscription(subscription, + CommandSubscribe.InitialPosition.Earliest, false, null)) + .thenAccept(__ -> { + log.info().attr("clientAppId", clientAppId()).attr("segment", segmentTopic) + .attr("subscription", subscription) + .log("Created subscription on segment topic"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + log.error().attr("clientAppId", clientAppId()).attr("segment", segmentTopic) + .attr("subscription", subscription) + .exception(ex).log("Failed to create subscription on segment"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}") + @ApiOperation(value = "Delete a subscription cursor on the segment topic. Super-user only.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Subscription cursor deleted (or never existed)"), + @ApiResponse(code = 401, message = "This operation requires super-user access"), + @ApiResponse(code = 403, message = "This operation requires super-user access"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void deleteSubscription( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify the parent topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", required = true) + @PathParam("descriptor") String descriptor, + @ApiParam(value = "Subscription name", required = true) + @PathParam("subscription") String subscription, + @ApiParam(value = "Whether leader broker redirected this call to this broker.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateNamespaceName(tenant, namespace); + TopicName segmentTopic = segmentTopicName(tenant, namespace, encodedTopic, descriptor); + + validateSuperUserAccessAsync() + .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, authoritative)) + .thenCompose(__ -> pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString())) + .thenCompose(optTopic -> { + if (optTopic.isEmpty()) { + // Topic not loaded → no cursor to delete. Idempotent success. + return CompletableFuture.completedFuture(null); + } + var sub = optTopic.get().getSubscription(subscription); + if (sub == null) { + // Subscription doesn't exist on this segment — idempotent success. + return CompletableFuture.completedFuture(null); + } + return sub.delete(); + }) + .thenAccept(__ -> { + log.info().attr("clientAppId", clientAppId()).attr("segment", segmentTopic) + .attr("subscription", subscription) + .log("Deleted subscription on segment topic"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + log.error().attr("clientAppId", clientAppId()).attr("segment", segmentTopic) + .attr("subscription", subscription) + .exception(ex).log("Failed to delete subscription on segment"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @GET @Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/backlog") @ApiOperation(value = "Number of unconsumed entries in the segment topic for the " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java index 2d6c61e2256fa..ca32495de5d3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java @@ -698,11 +698,11 @@ private CompletableFuture clearSubscriptionBacklogOnSegment(SegmentInfo se } private CompletableFuture createSubscriptionOnSegment(SegmentInfo segment, String subscription) { - String persistentName = toSegmentUnderlyingPersistentName(segment); + String segmentTopicName = toSegmentPersistentName(segment); try { return brokerService.getPulsar().getAdminClient() - .topics().createSubscriptionAsync(persistentName, subscription, - org.apache.pulsar.client.api.MessageId.earliest) + .scalableTopics() + .createSegmentSubscriptionAsync(segmentTopicName, subscription) .exceptionally(ex -> { Throwable cause = org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex); if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException.ConflictException) { @@ -717,17 +717,18 @@ private CompletableFuture createSubscriptionOnSegment(SegmentInfo segment, } private CompletableFuture deleteSubscriptionOnSegment(SegmentInfo segment, String subscription) { - String persistentName = toSegmentUnderlyingPersistentName(segment); + String segmentTopicName = toSegmentPersistentName(segment); try { return brokerService.getPulsar().getAdminClient() - .topics().deleteSubscriptionAsync(persistentName, subscription, true) + .scalableTopics() + .deleteSegmentSubscriptionAsync(segmentTopicName, subscription) .exceptionally(ex -> { Throwable cause = org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex); if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException) { return null; } log.warn().attr("subscription", subscription) - .attr("segment", persistentName).exceptionMessage(cause) + .attr("segment", segmentTopicName).exceptionMessage(cause) .log("Failed to delete subscription from segment"); return null; }); @@ -1083,19 +1084,6 @@ private String toSegmentPersistentName(SegmentInfo segment) { return segmentTopicName.toString(); } - /** - * Return the {@code persistent://} form of a segment's underlying managed-ledger topic, - * suitable for the standard {@link org.apache.pulsar.client.admin.Topics} admin API. - * The segment-owning broker is discovered by the admin client's normal bundle routing. - */ - private String toSegmentUnderlyingPersistentName(SegmentInfo segment) { - TopicName segmentTopicName = SegmentTopicName.fromParent( - topicName, segment.hashRange(), segment.segmentId()); - return "persistent://" + segmentTopicName.getTenant() + "/" - + segmentTopicName.getNamespacePortion() + "/" - + segmentTopicName.getLocalName(); - } - private CompletableFuture terminateSegmentTopic(String segmentTopicName) { try { return brokerService.getPulsar().getAdminClient() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java index 9da30e662d438..91467caa45ca7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java @@ -47,7 +47,6 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.ScalableTopics; import org.apache.pulsar.client.admin.Topics; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.ScalableConsumerType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ScalableTopicStats; @@ -118,14 +117,14 @@ public void setUp() throws Exception { // Default: all admin ops succeed. when(topics.getSubscriptionsAsync(anyString())) .thenReturn(CompletableFuture.completedFuture(java.util.List.of())); - when(topics.createSubscriptionAsync(anyString(), anyString(), any(MessageId.class))) - .thenReturn(CompletableFuture.completedFuture(null)); - when(topics.deleteSubscriptionAsync(anyString(), anyString(), anyBoolean())) - .thenReturn(CompletableFuture.completedFuture(null)); when(scalableTopics.createSegmentAsync(anyString(), any())) .thenReturn(CompletableFuture.completedFuture(null)); when(scalableTopics.terminateSegmentAsync(anyString())) .thenReturn(CompletableFuture.completedFuture(null)); + when(scalableTopics.createSegmentSubscriptionAsync(anyString(), anyString())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(scalableTopics.deleteSegmentSubscriptionAsync(anyString(), anyString())) + .thenReturn(CompletableFuture.completedFuture(null)); controller = newController(topicName); } @@ -291,9 +290,9 @@ public void testCreateSubscriptionStream() throws Exception { resources.getSubscriptionAsync(topicName, "sub-stream").get(); assertTrue(persisted.isPresent()); assertEquals(persisted.get().type(), SubscriptionType.STREAM); - // Propagated to every active segment via admin.topics().createSubscriptionAsync(). - verify(topics, org.mockito.Mockito.times(INITIAL_SEGMENTS)) - .createSubscriptionAsync(anyString(), anyString(), any(MessageId.class)); + // Propagated to every active segment via the segment-subscription admin endpoint. + verify(scalableTopics, org.mockito.Mockito.times(INITIAL_SEGMENTS)) + .createSegmentSubscriptionAsync(anyString(), anyString()); } @Test @@ -324,8 +323,8 @@ public void testDeleteSubscription() throws Exception { controller.deleteSubscription("sub-a").get(); assertFalse(resources.getSubscriptionAsync(topicName, "sub-a").get().isPresent()); // Propagated a delete to every segment (all segments incl. any sealed ones). - verify(topics, org.mockito.Mockito.atLeast(INITIAL_SEGMENTS)) - .deleteSubscriptionAsync(anyString(), anyString(), anyBoolean()); + verify(scalableTopics, org.mockito.Mockito.atLeast(INITIAL_SEGMENTS)) + .deleteSegmentSubscriptionAsync(anyString(), anyString()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java new file mode 100644 index 0000000000000..d55495999d5ea --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.common.policies.data.ScalableSubscriptionType; +import org.testng.annotations.Test; + +/** + * Coverage for {@code admin.scalableTopics().createSubscription(...)}: the admin + * API must materialize a cursor on every active segment so a consumer that + * subscribes after messages are produced still receives those + * messages — the whole point of pre-creating the subscription. + * + *

The behavioural assertion (a late consumer sees pre-subscription messages) + * is the user-facing guarantee, and any regression in + * {@code ScalableTopicController.createSubscriptionOnSegment} — which converts + * each {@code SegmentInfo} to the underlying {@code persistent://} topic and + * pre-creates the cursor through the standard topic admin API — would surface + * here as messages going missing. + */ +public class V5ScalableSubscriptionAdminTest extends V5ClientBaseTest { + + @Test + public void testPreCreatedSubscriptionRetainsPreProductionMessages() throws Exception { + String topic = newScalableTopic(3); + String subscription = "pre-created-sub"; + + // Pre-create the subscription on the scalable topic. This must materialize a + // cursor on every active segment so that subsequent produces are retained + // until the consumer drains them. + admin.scalableTopics().createSubscription(topic, subscription, + ScalableSubscriptionType.QUEUE); + + // Produce *before* any consumer subscribes. + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + int n = 30; + Set sent = new HashSet<>(); + for (int i = 0; i < n; i++) { + String v = "msg-" + i; + producer.newMessage().key("k-" + i).value(v).send(); + sent.add(v); + } + + // Subscribe with the SAME subscription name. If createSubscription truly + // pre-created cursors on every segment, the consumer must receive every + // message produced above. If it didn't, the consumer attaches at "latest" + // by default and drops the entire backlog. + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Set received = new HashSet<>(); + long deadline = System.currentTimeMillis() + 30_000L; + while (received.size() < n && System.currentTimeMillis() < deadline) { + Message msg = consumer.receive(Duration.ofSeconds(1)); + if (msg != null) { + received.add(msg.value()); + consumer.acknowledge(msg.id()); + } + } + assertEquals(received, sent, + "pre-created subscription must retain every message produced before consumer subscribed"); + } +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java index 10da169d14e08..f9fb01444a6ba 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java @@ -313,6 +313,32 @@ CompletableFuture seekSubscriptionAsync(String topic, String subscription, */ CompletableFuture deleteSegmentAsync(String segmentTopic, boolean force); + /** + * Create a subscription cursor on the given segment topic at the earliest position. + * The call routes to the broker that owns the segment. + * + *

Used internally by {@link org.apache.pulsar.broker.service.scalable.ScalableTopicController + * ScalableTopicController} to fan a new scalable-topic subscription out across every + * active segment so a future consumer doesn't drop the backlog. + * + * @param segmentTopic Full segment topic name ({@code segment://tenant/namespace/topic/descriptor}) + * @param subscription Subscription name + */ + CompletableFuture createSegmentSubscriptionAsync(String segmentTopic, String subscription); + + /** + * Delete a subscription cursor on the given segment topic. The call routes to the broker + * that owns the segment. + * + *

Used internally by {@link org.apache.pulsar.broker.service.scalable.ScalableTopicController + * ScalableTopicController} when a scalable-topic subscription is deleted, so no orphan + * cursors remain on any segment in the DAG. + * + * @param segmentTopic Full segment topic name ({@code segment://tenant/namespace/topic/descriptor}) + * @param subscription Subscription name + */ + CompletableFuture deleteSegmentSubscriptionAsync(String segmentTopic, String subscription); + /** * Returns the number of unconsumed entries in the given subscription's cursor on the * segment topic — i.e. the per-subscription backlog. The call routes to the broker diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java index 5a040123d2ed5..57d26c7011acd 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java @@ -290,6 +290,28 @@ public CompletableFuture deleteSegmentAsync(String segmentTopic, boolean f return asyncDeleteRequest(path); } + @Override + public CompletableFuture createSegmentSubscriptionAsync(String segmentTopic, + String subscription) { + TopicName tn = TopicName.get(segmentTopic); + WebTarget path = adminSegments + .path(tn.getTenant()).path(tn.getNamespacePortion()) + .path(tn.getLocalName()).path(tn.getSegmentDescriptor()) + .path("subscription").path(subscription); + return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override + public CompletableFuture deleteSegmentSubscriptionAsync(String segmentTopic, + String subscription) { + TopicName tn = TopicName.get(segmentTopic); + WebTarget path = adminSegments + .path(tn.getTenant()).path(tn.getNamespacePortion()) + .path(tn.getLocalName()).path(tn.getSegmentDescriptor()) + .path("subscription").path(subscription); + return asyncDeleteRequest(path); + } + @Override public CompletableFuture getSegmentSubscriptionBacklogAsync(String segmentTopic, String subscription) {