Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.scalable.SegmentInfo;
import org.apache.pulsar.common.scalable.SegmentTopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -675,8 +674,9 @@ private static String deserializeLeaderBrokerId(byte[] bytes) {
}

/**
* Best-effort delete underlying persistent topics for all segments.
* Uses the internal admin client which handles cross-broker routing.
* Best-effort delete the underlying topic for every segment in the DAG. Uses the
* segment-aware admin endpoint, which routes to the segment-owning broker via the
* standard bundle-ownership lookup.
*/
private CompletableFuture<Void> deleteSegmentTopics(TopicName parentTopic,
ScalableTopicMetadata metadata,
Expand All @@ -685,10 +685,11 @@ private CompletableFuture<Void> deleteSegmentTopics(TopicName parentTopic,
var admin = pulsar().getAdminClient();
CompletableFuture<?>[] futures = metadata.getSegments().values().stream()
.map(seg -> {
String name = segmentPersistentName(parentTopic, seg);
return admin.topics().deleteAsync(name, force)
String segmentTopicName = SegmentTopicName.fromParent(
parentTopic, seg.hashRange(), seg.segmentId()).toString();
return admin.scalableTopics().deleteSegmentAsync(segmentTopicName, force)
.exceptionally(ex -> {
log.warn().attr("segment", name).exceptionMessage(ex)
log.warn().attr("segment", segmentTopicName).exceptionMessage(ex)
.log("Failed to delete segment topic");
return null;
});
Expand All @@ -701,15 +702,4 @@ private CompletableFuture<Void> deleteSegmentTopics(TopicName parentTopic,
return CompletableFuture.completedFuture(null);
}
}

/**
* Convert a segment:// topic name to persistent:// for the underlying managed ledger topic.
*/
private String segmentPersistentName(TopicName parentTopic, SegmentInfo segment) {
TopicName segTopic = SegmentTopicName.fromParent(
parentTopic, segment.hashRange(), segment.segmentId());
return "persistent://" + segTopic.getTenant() + "/"
+ segTopic.getNamespacePortion() + "/"
+ segTopic.getLocalName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,107 @@ public void terminateSegment(
});
}

@PUT
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}")
@ApiOperation(value = "Create a subscription cursor on the segment topic at the earliest"
+ " position. Super-user only.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Subscription cursor created (or already existed)"),
@ApiResponse(code = 401, message = "This operation requires super-user access"),
@ApiResponse(code = 403, message = "This operation requires super-user access"),
@ApiResponse(code = 500, message = "Internal server error")})
public void createSubscription(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify the parent topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", required = true)
@PathParam("descriptor") String descriptor,
@ApiParam(value = "Subscription name", required = true)
@PathParam("subscription") String subscription,
@ApiParam(value = "Whether leader broker redirected this call to this broker.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
TopicName segmentTopic = segmentTopicName(tenant, namespace, encodedTopic, descriptor);

validateSuperUserAccessAsync()
.thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, authoritative))
.thenCompose(__ -> pulsar().getBrokerService().getOrCreateTopic(segmentTopic.toString()))
.thenCompose(topic -> topic.createSubscription(subscription,
CommandSubscribe.InitialPosition.Earliest, false, null))
.thenAccept(__ -> {
log.info().attr("clientAppId", clientAppId()).attr("segment", segmentTopic)
.attr("subscription", subscription)
.log("Created subscription on segment topic");
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
log.error().attr("clientAppId", clientAppId()).attr("segment", segmentTopic)
.attr("subscription", subscription)
.exception(ex).log("Failed to create subscription on segment");
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}")
@ApiOperation(value = "Delete a subscription cursor on the segment topic. Super-user only.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Subscription cursor deleted (or never existed)"),
@ApiResponse(code = 401, message = "This operation requires super-user access"),
@ApiResponse(code = 403, message = "This operation requires super-user access"),
@ApiResponse(code = 500, message = "Internal server error")})
public void deleteSubscription(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify the parent topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", required = true)
@PathParam("descriptor") String descriptor,
@ApiParam(value = "Subscription name", required = true)
@PathParam("subscription") String subscription,
@ApiParam(value = "Whether leader broker redirected this call to this broker.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
TopicName segmentTopic = segmentTopicName(tenant, namespace, encodedTopic, descriptor);

validateSuperUserAccessAsync()
.thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, authoritative))
.thenCompose(__ -> pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
.thenCompose(optTopic -> {
if (optTopic.isEmpty()) {
// Topic not loaded → no cursor to delete. Idempotent success.
return CompletableFuture.completedFuture(null);
}
var sub = optTopic.get().getSubscription(subscription);
if (sub == null) {
// Subscription doesn't exist on this segment — idempotent success.
return CompletableFuture.completedFuture(null);
}
return sub.delete();
})
.thenAccept(__ -> {
log.info().attr("clientAppId", clientAppId()).attr("segment", segmentTopic)
.attr("subscription", subscription)
.log("Deleted subscription on segment topic");
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
log.error().attr("clientAppId", clientAppId()).attr("segment", segmentTopic)
.attr("subscription", subscription)
.exception(ex).log("Failed to delete subscription on segment");
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/backlog")
@ApiOperation(value = "Number of unconsumed entries in the segment topic for the "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,11 +698,11 @@ private CompletableFuture<Void> clearSubscriptionBacklogOnSegment(SegmentInfo se
}

private CompletableFuture<Void> createSubscriptionOnSegment(SegmentInfo segment, String subscription) {
String persistentName = toSegmentUnderlyingPersistentName(segment);
String segmentTopicName = toSegmentPersistentName(segment);
try {
return brokerService.getPulsar().getAdminClient()
.topics().createSubscriptionAsync(persistentName, subscription,
org.apache.pulsar.client.api.MessageId.earliest)
.scalableTopics()
.createSegmentSubscriptionAsync(segmentTopicName, subscription)
.exceptionally(ex -> {
Throwable cause = org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException.ConflictException) {
Expand All @@ -717,17 +717,18 @@ private CompletableFuture<Void> createSubscriptionOnSegment(SegmentInfo segment,
}

private CompletableFuture<Void> deleteSubscriptionOnSegment(SegmentInfo segment, String subscription) {
String persistentName = toSegmentUnderlyingPersistentName(segment);
String segmentTopicName = toSegmentPersistentName(segment);
try {
return brokerService.getPulsar().getAdminClient()
.topics().deleteSubscriptionAsync(persistentName, subscription, true)
.scalableTopics()
.deleteSegmentSubscriptionAsync(segmentTopicName, subscription)
.exceptionally(ex -> {
Throwable cause = org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException) {
return null;
}
log.warn().attr("subscription", subscription)
.attr("segment", persistentName).exceptionMessage(cause)
.attr("segment", segmentTopicName).exceptionMessage(cause)
.log("Failed to delete subscription from segment");
return null;
});
Expand Down Expand Up @@ -1083,19 +1084,6 @@ private String toSegmentPersistentName(SegmentInfo segment) {
return segmentTopicName.toString();
}

/**
* Return the {@code persistent://} form of a segment's underlying managed-ledger topic,
* suitable for the standard {@link org.apache.pulsar.client.admin.Topics} admin API.
* The segment-owning broker is discovered by the admin client's normal bundle routing.
*/
private String toSegmentUnderlyingPersistentName(SegmentInfo segment) {
TopicName segmentTopicName = SegmentTopicName.fromParent(
topicName, segment.hashRange(), segment.segmentId());
return "persistent://" + segmentTopicName.getTenant() + "/"
+ segmentTopicName.getNamespacePortion() + "/"
+ segmentTopicName.getLocalName();
}

private CompletableFuture<Void> terminateSegmentTopic(String segmentTopicName) {
try {
return brokerService.getPulsar().getAdminClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.ScalableTopics;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ScalableTopicStats;
Expand Down Expand Up @@ -118,14 +117,14 @@ public void setUp() throws Exception {
// Default: all admin ops succeed.
when(topics.getSubscriptionsAsync(anyString()))
.thenReturn(CompletableFuture.completedFuture(java.util.List.of()));
when(topics.createSubscriptionAsync(anyString(), anyString(), any(MessageId.class)))
.thenReturn(CompletableFuture.completedFuture(null));
when(topics.deleteSubscriptionAsync(anyString(), anyString(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
when(scalableTopics.createSegmentAsync(anyString(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(scalableTopics.terminateSegmentAsync(anyString()))
.thenReturn(CompletableFuture.completedFuture(null));
when(scalableTopics.createSegmentSubscriptionAsync(anyString(), anyString()))
.thenReturn(CompletableFuture.completedFuture(null));
when(scalableTopics.deleteSegmentSubscriptionAsync(anyString(), anyString()))
.thenReturn(CompletableFuture.completedFuture(null));

controller = newController(topicName);
}
Expand Down Expand Up @@ -291,9 +290,9 @@ public void testCreateSubscriptionStream() throws Exception {
resources.getSubscriptionAsync(topicName, "sub-stream").get();
assertTrue(persisted.isPresent());
assertEquals(persisted.get().type(), SubscriptionType.STREAM);
// Propagated to every active segment via admin.topics().createSubscriptionAsync().
verify(topics, org.mockito.Mockito.times(INITIAL_SEGMENTS))
.createSubscriptionAsync(anyString(), anyString(), any(MessageId.class));
// Propagated to every active segment via the segment-subscription admin endpoint.
verify(scalableTopics, org.mockito.Mockito.times(INITIAL_SEGMENTS))
.createSegmentSubscriptionAsync(anyString(), anyString());
}

@Test
Expand Down Expand Up @@ -324,8 +323,8 @@ public void testDeleteSubscription() throws Exception {
controller.deleteSubscription("sub-a").get();
assertFalse(resources.getSubscriptionAsync(topicName, "sub-a").get().isPresent());
// Propagated a delete to every segment (all segments incl. any sealed ones).
verify(topics, org.mockito.Mockito.atLeast(INITIAL_SEGMENTS))
.deleteSubscriptionAsync(anyString(), anyString(), anyBoolean());
verify(scalableTopics, org.mockito.Mockito.atLeast(INITIAL_SEGMENTS))
.deleteSegmentSubscriptionAsync(anyString(), anyString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.v5;

import static org.testng.Assert.assertEquals;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import lombok.Cleanup;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.common.policies.data.ScalableSubscriptionType;
import org.testng.annotations.Test;

/**
* Coverage for {@code admin.scalableTopics().createSubscription(...)}: the admin
* API must materialize a cursor on every active segment so a consumer that
* subscribes <em>after</em> messages are produced still receives those
* messages — the whole point of pre-creating the subscription.
*
* <p>The behavioural assertion (a late consumer sees pre-subscription messages)
* is the user-facing guarantee, and any regression in
* {@code ScalableTopicController.createSubscriptionOnSegment} — which converts
* each {@code SegmentInfo} to the underlying {@code persistent://} topic and
* pre-creates the cursor through the standard topic admin API — would surface
* here as messages going missing.
*/
public class V5ScalableSubscriptionAdminTest extends V5ClientBaseTest {

@Test
public void testPreCreatedSubscriptionRetainsPreProductionMessages() throws Exception {
String topic = newScalableTopic(3);
String subscription = "pre-created-sub";

// Pre-create the subscription on the scalable topic. This must materialize a
// cursor on every active segment so that subsequent produces are retained
// until the consumer drains them.
admin.scalableTopics().createSubscription(topic, subscription,
ScalableSubscriptionType.QUEUE);

// Produce *before* any consumer subscribes.
@Cleanup
Producer<String> producer = v5Client.newProducer(Schema.string())
.topic(topic)
.create();
int n = 30;
Set<String> sent = new HashSet<>();
for (int i = 0; i < n; i++) {
String v = "msg-" + i;
producer.newMessage().key("k-" + i).value(v).send();
sent.add(v);
}

// Subscribe with the SAME subscription name. If createSubscription truly
// pre-created cursors on every segment, the consumer must receive every
// message produced above. If it didn't, the consumer attaches at "latest"
// by default and drops the entire backlog.
@Cleanup
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
.topic(topic)
.subscriptionName(subscription)
.subscribe();

Set<String> received = new HashSet<>();
long deadline = System.currentTimeMillis() + 30_000L;
while (received.size() < n && System.currentTimeMillis() < deadline) {
Message<String> msg = consumer.receive(Duration.ofSeconds(1));
if (msg != null) {
received.add(msg.value());
consumer.acknowledge(msg.id());
}
}
assertEquals(received, sent,
"pre-created subscription must retain every message produced before consumer subscribed");
}
}
Loading
Loading