diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ec61d58d2afe4..ad13c9f325c71 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -323,7 +323,12 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str protected CompletableFuture internalCreateNonPartitionedTopicAsync(boolean authoritative, Map properties) { - CompletableFuture ret = validateNonPartitionTopicNameAsync(topicName.getLocalName()); + CompletableFuture ret = namespaceResources().namespaceExistsAsync(namespaceName) + .thenAccept(exists -> { + if (!exists) { + throw new RestException(Status.NOT_FOUND, "V1 namespace [" + namespaceName + "] does not exist"); + } + }).thenCompose(__ -> validateNonPartitionTopicNameAsync(topicName.getLocalName())); if (topicName.isGlobal()) { ret = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 4424ceded8127..8b7e282e3fa3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -143,7 +143,7 @@ public void getTopics(@Suspended AsyncResponse response, @ApiParam(value = "Include system topic") @QueryParam("includeSystemTopic") boolean includeSystemTopic) { validateNamespaceName(property, cluster, namespace); - validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS) + validateNamespaceOperationAsync(NamespaceName.get(property, cluster, namespace), NamespaceOperation.GET_TOPICS) // Validate that namespace exists, throws 404 if it doesn't exist .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenCompose(policies -> internalGetListOfTopics(policies, mode)) @@ -292,7 +292,8 @@ public void getPermissions(@Suspended AsyncResponse response, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION) + validateNamespaceOperationAsync(NamespaceName.get(property, cluster, namespace), + NamespaceOperation.GET_PERMISSION) .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(namespaceName)) .thenAccept(permissions -> response.resume(permissions)) .exceptionally(ex -> { @@ -823,7 +824,7 @@ public void getBundlesData(@Suspended final AsyncResponse asyncResponse, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); validatePoliciesReadOnlyAccessAsync() - .thenCompose(__ -> validateNamespaceOperationAsync(NamespaceName.get(property, namespace), + .thenCompose(__ -> validateNamespaceOperationAsync(NamespaceName.get(property, cluster, namespace), NamespaceOperation.GET_BUNDLE)) .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenAccept(policies -> asyncResponse.resume(policies.bundles)) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 353f2fa6f2ecd..5cf036e9c5c94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1074,7 +1074,12 @@ && pulsar().getBrokerService().isAuthorizationEnabled()) { } }); } - return CompletableFuture.completedFuture(null); + return namespaceResources().namespaceExistsAsync(namespaceName) + .thenAccept(exists -> { + if (!exists) { + throw new RestException(Status.NOT_FOUND, "Namespace [" + namespaceName + "] does not exist"); + } + }); } public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 91fe43f8b396a..efa131348cd0b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -194,6 +194,26 @@ public void cleanup() throws Exception { conf.setClusterName(configClusterName); } + private void createNamespaceIfAbsent(TopicName topicName) throws Exception { + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + NamespaceName namespaceName = topicName.getNamespaceObject(); + if (!namespaceName.isV2()) { + if (!admin.clusters().getClusters().contains(namespaceName.getCluster())) { + admin.clusters().createCluster(namespaceName.getCluster(), ClusterData.builder() + .brokerServiceUrl(pulsar.getBrokerServiceUrl()).build()); + } + tenantInfo.getAllowedClusters().add(namespaceName.getCluster()); + } + if (!admin.tenants().getTenants().contains(topicName.getTenant())) { + admin.tenants().createTenant(topicName.getTenant(), tenantInfo); + } + try { + admin.namespaces().createNamespace(topicName.getNamespace()); + } catch (Exception ex) { + // Namespace may already exist. + } + } + @Test public void internalConfiguration() throws Exception { ServiceConfiguration conf = pulsar.getConfiguration(); @@ -948,7 +968,7 @@ public void testUpdatePartitionedTopicCoontainedInOldTopic() throws Exception { @Test public void test500Error() throws Exception { final String property = "prop-xyz"; - final String cluster = "use"; + final String cluster = pulsar.getConfig().getClusterName(); final String namespace = "ns"; final String partitionedTopicName = "error-500-topic"; AsyncResponse response1 = mock(AsyncResponse.class); @@ -958,6 +978,7 @@ public void test500Error() throws Exception { NamespaceService namespaceService = pulsar.getNamespaceService(); doReturn(future).when(namespaceService).checkTopicExists(any()); + createNamespaceIfAbsent(TopicName.get("persistent", property, cluster, namespace, partitionedTopicName)); persistentTopics.createPartitionedTopic(response1, property, cluster, namespace, partitionedTopicName, 5, false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java index b9a0849e01f89..76de52e4fbabb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java @@ -68,6 +68,7 @@ public class AdminTopicApiTest extends ProducerConsumerBase { protected void setup() throws Exception { super.internalSetup(); super.producerBaseSetup(); + admin.namespaces().createNamespace("my-property/test/my-ns"); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index 4b796fcca3385..d26c0c967f772 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ClientCnx; @@ -635,4 +636,37 @@ public void testTenantNotExist(TopicDomain topicDomain) throws Exception { assertEquals(getLookupRequestPermits(), lookupPermitsBefore); }); } + + @Test + public void testTopicNameContainsSpecialCharacters() throws Exception { + // V1 topic create failed because v1 namespace does not exist. + String topic = "persistent://public/default/tp/h"; + try { + admin1.topics().createNonPartitionedTopic(topic); + fail("Expected a namespace not found ex, since the v1 namespace does not exist"); + } catch (PulsarAdminException.NotFoundException ex) { + assertTrue(ex.getMessage().contains("does not exist")); + } + try { + admin1.topics().createPartitionedTopic(topic, 1); + fail("Expected a namespace not found ex, since the v1 namespace does not exist"); + } catch (PulsarAdminException.NotFoundException ex) { + assertTrue(ex.getMessage().contains("does not exist")); + } + + // The topic can be created after v1 namespace was created + String v1Ns = "public/" + pulsar1.getConfig().getClusterName() + "/default"; + admin1.namespaces().createNamespace(v1Ns); + String topic2 = "persistent://" + v1Ns + "/h"; + String topic3 = "persistent://" + v1Ns + "/h2"; + admin1.topics().createNonPartitionedTopic(topic2); + admin1.topics().createPartitionedTopic(topic3, 1); + List v1Topics = admin1.topics().getList(v1Ns); + assertTrue(v1Topics.contains(topic2)); + assertTrue(v1Topics.contains(topic3 + "-partition-0")); + + // cleanup. + admin1.topics().delete(topic2, false); + admin1.topics().deletePartitionedTopic(topic3, false); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 752ce1d6d9cd3..9235f8e535980 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -189,6 +189,26 @@ protected void setup() throws Exception { admin.namespaces().createNamespace(testTenant + "/" + testNamespaceLocal); } + private void createNamespaceIfAbsent(TopicName topicName) throws Exception { + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + NamespaceName namespaceName = topicName.getNamespaceObject(); + if (!namespaceName.isV2()) { + if (!admin.clusters().getClusters().contains(namespaceName.getCluster())) { + admin.clusters().createCluster(namespaceName.getCluster(), ClusterData.builder() + .brokerServiceUrl(pulsar.getBrokerServiceUrl()).build()); + } + tenantInfo.getAllowedClusters().add(namespaceName.getCluster()); + } + if (!admin.tenants().getTenants().contains(topicName.getTenant())) { + admin.tenants().createTenant(topicName.getTenant(), tenantInfo); + } + try { + admin.namespaces().createNamespace(topicName.getNamespace()); + } catch (Exception ex) { + // Namespace may already exist. + } + } + @Override @AfterMethod(alwaysRun = true) protected void cleanup() throws Exception { @@ -715,15 +735,19 @@ public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix final String partitionedTopicName = "special-topic"; pulsar.getDefaultManagedLedgerFactory() .open(TopicName.get(nonPartitionTopicName2).getPersistenceNamingEncoding()); + final TopicName topicName = TopicName.get("persistent", testTenant, + pulsar.getConfig().getClusterName(), testNamespace, partitionedTopicName); + createNamespaceIfAbsent(topicName); doAnswer(invocation -> { - persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace"); - persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname"); + persistentTopics.namespaceName = NamespaceName.get(testTenant, testNamespace); + persistentTopics.topicName = topicName; return null; }).when(persistentTopics).validatePartitionedTopicName(any(), any(), any()); doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString()); AsyncResponse response = mock(AsyncResponse.class); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index cbfc5b1d236b4..e2af99f37a5cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -47,7 +47,10 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.Assert; @@ -70,6 +73,26 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + private void createNamespaceIfAbsent(TopicName topicName) throws Exception { + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + NamespaceName namespaceName = topicName.getNamespaceObject(); + if (!namespaceName.isV2()) { + if (!admin.clusters().getClusters().contains(namespaceName.getCluster())) { + admin.clusters().createCluster(namespaceName.getCluster(), ClusterData.builder() + .brokerServiceUrl(pulsar.getBrokerServiceUrl()).build()); + } + tenantInfo.getAllowedClusters().add(namespaceName.getCluster()); + } + if (!admin.tenants().getTenants().contains(topicName.getTenant())) { + admin.tenants().createTenant(topicName.getTenant(), tenantInfo); + } + try { + admin.namespaces().createNamespace(topicName.getNamespace()); + } catch (Exception ex) { + // Namespace may already exist. + } + } + private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 100; private static class TestConsumerStateEventListener implements ConsumerEventListener { @@ -169,7 +192,9 @@ FailoverConsumer createConsumer(String topicName, String subName, String listene @Test public void testSimpleConsumerEventsWithoutPartition() throws Exception { - final String topicName = "persistent://prop/use/ns-abc/failover-topic1-" + System.currentTimeMillis(); + final String topicName = "persistent://prop/" + pulsar.getConfig().getClusterName() + + "/ns-abc/failover-topic1-" + System.currentTimeMillis(); + createNamespaceIfAbsent(TopicName.get(topicName)); final String subName = "sub1"; final int numMsgs = 100; @@ -314,7 +339,9 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { int numPartitions = 4; final String topicName = BrokerTestUtil.newUniqueName( - "persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition"); + "persistent://prop/" + pulsar.getConfig().getClusterName() + + "/ns-abc/testSimpleConsumerEventsWithPartition"); + createNamespaceIfAbsent(TopicName.get(topicName)); final TopicName destName = TopicName.get(topicName); final String subName = "sub1"; final int numMsgs = 100; @@ -501,7 +528,8 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { @Test public void testActiveConsumerFailoverWithDelay() throws Exception { - final String topicName = "persistent://prop/use/ns-abc/failover-topic3"; + final String topicName = "persistent://prop/" + pulsar.getConfig().getClusterName() + "/ns-abc/failover-topic3"; + createNamespaceIfAbsent(TopicName.get(topicName)); final String subName = "sub1"; final int numMsgs = 100; List> receivedMessages = new ArrayList<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java index 375bbff8a4df4..accb6a4d680a0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java @@ -27,13 +27,14 @@ import java.util.Set; import java.util.concurrent.Future; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.tests.EnumValuesDataProvider; import org.slf4j.Logger; @@ -58,11 +59,28 @@ protected void cleanup() throws Exception { internalCleanup(); } + private void createNamespaceIfAbsent(TopicName topicName) throws Exception { + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + NamespaceName namespaceName = topicName.getNamespaceObject(); + if (!namespaceName.isV2()) { + tenantInfo.getAllowedClusters().add(namespaceName.getCluster()); + } + if (!admin.tenants().getTenants().contains(topicName.getTenant())) { + admin.tenants().createTenant(topicName.getTenant(), tenantInfo); + } + try { + admin.namespaces().createNamespace(topicName.getNamespace()); + } catch (Exception ex) { + // Namespace may already exist. + } + } + @Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values") - public void producerSendAsync(TopicType topicType) throws PulsarClientException, PulsarAdminException { + public void producerSendAsync(TopicType topicType) throws Exception { // Given String key = "producerSendAsync-" + topicType; - final String topicName = "persistent://prop/cluster/namespace/topic-" + key; + final String topicName = "persistent://prop/" + pulsar.getConfig().getClusterName() + "/namespace/topic-" + key; + createNamespaceIfAbsent(TopicName.get(topicName)); final String subscriptionName = "my-subscription-" + key; final String messagePrefix = "my-message-" + key + "-"; final int numberOfMessages = 30; @@ -126,15 +144,17 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException, } @Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values") - public void producerSend(TopicType topicType) throws PulsarClientException, PulsarAdminException { + public void producerSend(TopicType topicType) throws Exception { // Given String key = "producerSend-" + topicType; - final String topicName = "persistent://prop/cluster/namespace/topic-" + key; + final String topicName = "persistent://prop/" + pulsar.getConfig().getClusterName() + "/namespace/topic-" + key; + createNamespaceIfAbsent(TopicName.get(topicName)); final String subscriptionName = "my-subscription-" + key; final String messagePrefix = "my-message-" + key + "-"; final int numberOfMessages = 30; if (topicType == TopicType.PARTITIONED) { int numberOfPartitions = 7; + createNamespaceIfAbsent(TopicName.get(topicName)); admin.topics().createPartitionedTopic(topicName, numberOfPartitions); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java index 700e0859de644..f314e21d7b782 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java @@ -50,6 +50,9 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -66,6 +69,15 @@ public class ZeroQueueSizeTest extends BrokerTestBase { @Override public void setup() throws Exception { baseSetup(); + admin.clusters().createCluster("use", + ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + TenantInfoImpl tenantInfo = new TenantInfoImpl(Collections.emptySet(), Set.of("use")); + admin.tenants().createTenant("prop-xyz", tenantInfo); + TenantInfo tenantInfo1 = admin.tenants().getTenantInfo("prop"); + tenantInfo1.getAllowedClusters().add("use"); + admin.tenants().updateTenant("prop", tenantInfo1); + admin.namespaces().createNamespace("prop-xyz/use/ns-abc"); + admin.namespaces().createNamespace("prop/use/ns-abc"); } @AfterClass(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 2930256a75c37..9a6d896596e02 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -902,6 +902,8 @@ public void testDeleteTopicAndSchemaForV1() throws Exception { final String topicOne = "not-partitioned-topic"; final String topic2 = "persistent://" + tenant + "/" + cluster + "/" + namespace + "/partitioned-topic"; + admin.namespaces().createNamespace(tenant + "/" + cluster + "/" + namespace); + // persistent, non-partitioned v1/topic final String topic1 = TopicName.get( TopicDomain.persistent.value(), diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java index fce91d5b2c290..cb7f94d174f04 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java @@ -29,7 +29,6 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -37,12 +36,13 @@ import lombok.Cleanup; import lombok.NoArgsConstructor; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -68,7 +68,7 @@ public void cleanup() throws Exception { } @Test - public void testInitialization() throws InterruptedException, ExecutionException, PulsarAdminException { + public void testInitialization() throws Exception { Properties properties = new Properties(); properties.setProperty("serviceUrl", brokerUrl.toString()); @@ -81,7 +81,7 @@ public void testInitialization() throws InterruptedException, ExecutionException admin.tenants().createTenant(tenantName, tenantInfo); String topicName = String.format("persistent://%s/ns/topic-scale-ns-0/topic", tenantName); - + createNamespaceIfAbsent(TopicName.get(topicName)); int numberOfMessages = 10; @Cleanup("shutdownNow") @@ -123,7 +123,7 @@ public void testNonDurableSubscribe() throws Exception { properties.setProperty("useTls", "false"); final String topicName = getTopicWithRandomSuffix("non-durable"); - admin.topics().createNonPartitionedTopic(topicName); + createNamespaceIfAbsent(TopicName.get(topicName)); int numberOfMessages = 10; @Cleanup("shutdownNow") @@ -215,6 +215,7 @@ public void testRead() throws Exception { properties.setProperty("useTls", "false"); final String topicName = getTopicWithRandomSuffix("reader"); + createNamespaceIfAbsent(TopicName.get(topicName)); admin.topics().createNonPartitionedTopic(topicName); int numberOfMessages = 10; @@ -258,6 +259,22 @@ public void testRead() throws Exception { .until(()->admin.topics().getSubscriptions(topicName).size() == 0); } + private void createNamespaceIfAbsent(TopicName topicName) throws Exception { + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + NamespaceName namespaceName = topicName.getNamespaceObject(); + if (!namespaceName.isV2()) { + tenantInfo.getAllowedClusters().add(namespaceName.getCluster()); + } + if (!admin.tenants().getTenants().contains(topicName.getTenant())) { + admin.tenants().createTenant(topicName.getTenant(), tenantInfo); + } + try { + admin.namespaces().createNamespace(topicName.getNamespace()); + } catch (Exception ex) { + // Namespace may already exist. + } + } + @Test(timeOut = 20000) public void testEncryption() throws Exception { Properties properties = new Properties(); @@ -265,6 +282,7 @@ public void testEncryption() throws Exception { properties.setProperty("useTls", "false"); final String topicName = getTopicWithRandomSuffix("encryption"); + createNamespaceIfAbsent(TopicName.get(topicName)); admin.topics().createNonPartitionedTopic(topicName); final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/"; final int numberOfMessages = 10; @@ -432,8 +450,9 @@ public void testSendMultipleMessage() throws Exception { } } - private static String getTopicWithRandomSuffix(String localNameBase) { - return String.format("persistent://prop/ns-abc/test/%s-%s", localNameBase, UUID.randomUUID().toString()); + private String getTopicWithRandomSuffix(String localNameBase) { + return String.format("persistent://prop/%s/test/%s-%s", pulsar.getConfiguration().getClusterName(), + localNameBase, UUID.randomUUID().toString()); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java index c8cc6a7aa6ad4..b8b1ab3be2d9d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java @@ -43,6 +43,8 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; @@ -104,6 +106,22 @@ protected void setup() throws Exception { proxyService.start(); } + private void createNamespaceIfAbsent(TopicName topicName) throws Exception { + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + NamespaceName namespaceName = topicName.getNamespaceObject(); + if (!namespaceName.isV2()) { + tenantInfo.getAllowedClusters().add(namespaceName.getCluster()); + } + if (!admin.tenants().getTenants().contains(topicName.getTenant())) { + admin.tenants().createTenant(topicName.getTenant(), tenantInfo); + } + try { + admin.namespaces().createNamespace(topicName.getNamespace()); + } catch (Exception ex) { + // Namespace may already exist. + } + } + @Override @AfterMethod(alwaysRun = true) protected void cleanup() throws Exception { @@ -173,6 +191,7 @@ public void testPartitions() throws Exception { String topicName = "persistent://sample/test/local/partitioned-topic" + System.currentTimeMillis(); TenantInfoImpl tenantInfo = createDefaultTenantInfo(); admin.tenants().createTenant("sample", tenantInfo); + createNamespaceIfAbsent(TopicName.get(topicName)); admin.topics().createPartitionedTopic(topicName, 2); @Cleanup diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java index 235672020328b..2086ff74608ef 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java @@ -39,6 +39,8 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; @@ -111,6 +113,22 @@ protected PulsarClient internalSetUpForClient(boolean addCertificates, String lo return clientBuilder.build(); } + private void createNamespaceIfAbsent(TopicName topicName) throws Exception { + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + NamespaceName namespaceName = topicName.getNamespaceObject(); + if (!namespaceName.isV2()) { + tenantInfo.getAllowedClusters().add(namespaceName.getCluster()); + } + if (!admin.tenants().getTenants().contains(topicName.getTenant())) { + admin.tenants().createTenant(topicName.getTenant(), tenantInfo); + } + try { + admin.namespaces().createNamespace(topicName.getNamespace()); + } catch (Exception ex) { + // Namespace may already exist. + } + } + @Override @AfterMethod(alwaysRun = true) protected void cleanup() throws Exception { @@ -160,6 +178,7 @@ public void testPartitions() throws Exception { String topicName = "persistent://sample/test/local/partitioned-topic" + System.currentTimeMillis(); TenantInfoImpl tenantInfo = createDefaultTenantInfo(); admin.tenants().createTenant("sample", tenantInfo); + createNamespaceIfAbsent(TopicName.get(topicName)); admin.topics().createPartitionedTopic(topicName, 2); @Cleanup diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java index ee0f8010b7d79..0c00e9ec2e7c2 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java @@ -91,6 +91,9 @@ protected void setup() throws Exception { Optional proxyLogLevel = Optional.of(2); assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel()); proxyService.start(); + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + admin.tenants().createTenant("sample", tenantInfo); + admin.namespaces().createNamespace("sample/test/local"); } @Override @@ -151,8 +154,6 @@ public void testProducerConsumer() throws Exception { @Test public void testPartitions() throws Exception { - TenantInfoImpl tenantInfo = createDefaultTenantInfo(); - admin.tenants().createTenant("sample", tenantInfo); @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) .build(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 64ea589e023a6..09b7b82d34f0f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -246,6 +246,7 @@ public void testProducerConsumer() throws Exception { public void testPartitions() throws Exception { TenantInfoImpl tenantInfo = createDefaultTenantInfo(); admin.tenants().createTenant("sample", tenantInfo); + admin.namespaces().createNamespace("sample/test/local"); @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) .build(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java index f299dbe39ea10..94e4de367ea15 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java @@ -34,6 +34,9 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; @@ -89,6 +92,26 @@ protected void cleanup() throws Exception { } } + private void createNamespaceIfAbsent(TopicName topicName) throws Exception { + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + NamespaceName namespaceName = topicName.getNamespaceObject(); + if (!namespaceName.isV2()) { + if (!admin.clusters().getClusters().contains(namespaceName.getCluster())) { + admin.clusters().createCluster(namespaceName.getCluster(), ClusterData.builder() + .brokerServiceUrl(pulsar.getBrokerServiceUrl()).build()); + } + tenantInfo.getAllowedClusters().add(namespaceName.getCluster()); + } + if (!admin.tenants().getTenants().contains(topicName.getTenant())) { + admin.tenants().createTenant(topicName.getTenant(), tenantInfo); + } + try { + admin.namespaces().createNamespace(topicName.getNamespace()); + } catch (Exception ex) { + // Namespace may already exist. + } + } + @Test public void testProducer() throws Exception { @Cleanup @@ -96,6 +119,7 @@ public void testProducer() throws Exception { .serviceUrl(proxyService.getServiceUrlTls()) .allowTlsInsecureConnection(false) .tlsTrustCertsFilePath(CA_CERT_FILE_PATH).build(); + admin.topics().createPartitionedTopic("persistent://sample/test/local/topic", 2); Producer producer = client.newProducer(Schema.BYTES) .topic("persistent://sample/test/local/topic").create(); @@ -112,14 +136,17 @@ public void testPartitions() throws Exception { .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(CA_CERT_FILE_PATH).build(); TenantInfoImpl tenantInfo = createDefaultTenantInfo(); admin.tenants().createTenant("sample", tenantInfo); - admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2); + String topicName = "persistent://sample/" + pulsar.getConfig().getClusterName() + + "/local/partitioned-topic"; + createNamespaceIfAbsent(TopicName.get(topicName)); + admin.topics().createPartitionedTopic(topicName, 2); Producer producer = client.newProducer(Schema.BYTES) - .topic("persistent://sample/test/local/partitioned-topic") + .topic(topicName) .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); // Create a consumer directly attached to broker - Consumer consumer = pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic") + Consumer consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-sub").subscribe(); for (int i = 0; i < 10; i++) {