diff --git a/README.md b/README.md index ea0694f752..174bf57e4f 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,7 @@ mounting them into the `apachepulsar/pulsar` Docker image. | Kafka | Apache Kafka | | Kinesis | Amazon Kinesis Data Streams | | MongoDB | MongoDB | +| MQTT | MQTT broker | | Redis | Redis | | Solr | Apache Solr | diff --git a/distribution/io/build.gradle.kts b/distribution/io/build.gradle.kts index 9aa7b81470..66f3a6c7fe 100644 --- a/distribution/io/build.gradle.kts +++ b/distribution/io/build.gradle.kts @@ -65,6 +65,7 @@ dependencies { connectorNars(project(":influxdb")) connectorNars(project(":redis")) connectorNars(project(":solr")) + connectorNars(project(":mqtt")) connectorNars(project(":dynamodb")) connectorNars(project(":alluxio")) connectorNars(project(":azure-data-explorer")) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e397b65b12..ca155662eb 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -176,6 +176,7 @@ aerospike-client = "4.5.0" aws-sdk = "1.12.788" aws-sdk2 = "2.32.28" rabbitmq-client = "5.28.0" +hivemq-mqtt-client = "1.3.13" cassandra-driver = "3.11.2" mongodb-driver = "5.4.0" influxdb-client = "7.3.0" @@ -494,6 +495,7 @@ solr-test-framework = { module = "org.apache.solr:solr-test-framework", version. solr-core = { module = "org.apache.solr:solr-core", version.ref = "solr" } # Messaging rabbitmq-amqp-client = { module = "com.rabbitmq:amqp-client", version.ref = "rabbitmq-client" } +hivemq-mqtt-client = { module = "com.hivemq:hivemq-mqtt-client", version.ref = "hivemq-mqtt-client" } nsq-j = { module = "com.sproutsocial:nsq-j", version.ref = "nsq-client" } # Time series influxdb-client-java = { module = "com.influxdb:influxdb-client-java", version.ref = "influxdb-client" } diff --git a/mqtt/build.gradle.kts b/mqtt/build.gradle.kts new file mode 100644 index 0000000000..343be0c079 --- /dev/null +++ b/mqtt/build.gradle.kts @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("pulsar-connectors.java-conventions") + id("pulsar-connectors.nar-conventions") +} + +dependencies { + implementation(libs.pulsar.io.common) + implementation(libs.pulsar.io.core) + implementation(libs.pulsar.functions.instance) + implementation(libs.jackson.databind) + implementation(libs.jackson.dataformat.yaml) + implementation(libs.commons.lang3) + implementation(libs.hivemq.mqtt.client) + + testImplementation(libs.testcontainers) +} diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java new file mode 100644 index 0000000000..2331147d78 --- /dev/null +++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.core.annotations.Connector; +import org.apache.pulsar.io.core.annotations.IOType; + +@Connector( + name = "mqtt-sink", + type = IOType.SINK, + help = "A sink connector that moves messages from Pulsar to MQTT.", + configClass = MqttSinkConfig.class +) +@Slf4j +public class MqttSink implements Sink { + + private MqttSinkConfig mqttSinkConfig; + private Mqtt5AsyncClient mqttClient; + private MqttQos mqttQos; + + @Override + public void open(Map config, SinkContext sinkContext) throws Exception { + mqttSinkConfig = MqttSinkConfig.load(config, sinkContext); + mqttSinkConfig.validate(); + mqttQos = MqttQos.fromCode(mqttSinkConfig.getQos()); + + var builder = MqttClient.builder() + .useMqttVersion5() + .serverHost(mqttSinkConfig.getServerHost()) + .serverPort(mqttSinkConfig.getServerPort()); + + if (StringUtils.isNotBlank(mqttSinkConfig.getClientId())) { + builder = builder.identifier(mqttSinkConfig.getClientId()); + } + if (mqttSinkConfig.isSslEnabled()) { + builder = builder.sslWithDefaultConfig(); + } + + mqttClient = builder.buildAsync(); + if (StringUtils.isNotBlank(mqttSinkConfig.getUsername())) { + var authBuilder = mqttClient.connectWith() + .cleanStart(mqttSinkConfig.isCleanStart()) + .keepAlive(mqttSinkConfig.getKeepAliveIntervalSec()) + .simpleAuth() + .username(mqttSinkConfig.getUsername()); + if (mqttSinkConfig.getPassword() != null) { + authBuilder = authBuilder.password(mqttSinkConfig.getPassword().getBytes(StandardCharsets.UTF_8)); + } + authBuilder.applySimpleAuth() + .send() + .get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS); + } else { + mqttClient.connectWith() + .cleanStart(mqttSinkConfig.isCleanStart()) + .keepAlive(mqttSinkConfig.getKeepAliveIntervalSec()) + .send() + .get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS); + } + log.info("MQTT sink connected to {}:{}.", + mqttSinkConfig.getServerHost(), mqttSinkConfig.getServerPort()); + } + + @Override + public void write(Record record) { + try { + byte[] payload = record.getValue() == null ? new byte[0] : record.getValue(); + mqttClient.publishWith() + .topic(mqttSinkConfig.getTopic()) + .qos(mqttQos) + .payload(payload) + .send() + .whenComplete((result, throwable) -> { + if (throwable == null) { + record.ack(); + } else { + record.fail(); + log.warn("Failed to publish message to MQTT topic {}", + mqttSinkConfig.getTopic(), throwable); + } + }); + } catch (Exception e) { + record.fail(); + log.warn("Failed to schedule MQTT publish for topic {}", mqttSinkConfig.getTopic(), e); + } + } + + @Override + public void close() { + if (mqttClient == null) { + return; + } + + try { + mqttClient.disconnectWith() + .send() + .get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + log.warn("Failed to disconnect MQTT client cleanly", e); + } + } +} diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java new file mode 100644 index 0000000000..4ed1cd927a --- /dev/null +++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import lombok.Data; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.core.annotations.FieldDoc; + +@Data +@Accessors(chain = true) +public class MqttSinkConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + @FieldDoc( + required = true, + defaultValue = "", + help = "The MQTT broker host.") + private String serverHost; + + @FieldDoc( + required = true, + defaultValue = "1883", + help = "The MQTT broker port.") + private int serverPort = 1883; + + @FieldDoc( + required = true, + defaultValue = "", + help = "The MQTT topic to publish messages to.") + private String topic; + + @FieldDoc( + defaultValue = "", + help = "MQTT client id used for the broker connection.") + private String clientId; + + @FieldDoc( + defaultValue = "", + sensitive = true, + help = "MQTT username.") + private String username; + + @FieldDoc( + defaultValue = "", + sensitive = true, + help = "MQTT password.") + private String password; + + @FieldDoc( + defaultValue = "0", + help = "MQTT QoS level for outgoing messages. Valid values: 0, 1, 2.") + private int qos = 0; + + @FieldDoc( + defaultValue = "60", + help = "MQTT keep alive interval in seconds.") + private int keepAliveIntervalSec = 60; + + @FieldDoc( + defaultValue = "10000", + help = "Timeout in milliseconds for MQTT connect/disconnect operations.") + private long connectionTimeoutMs = 10000L; + + @FieldDoc( + defaultValue = "true", + help = "Whether to start with a clean session.") + private boolean cleanStart = true; + + @FieldDoc( + defaultValue = "false", + help = "Enable SSL/TLS with the client default SSL configuration.") + private boolean sslEnabled = false; + + public static MqttSinkConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), MqttSinkConfig.class); + } + + public static MqttSinkConfig load(Map map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, MqttSinkConfig.class, sinkContext); + } + + public void validate() { + Preconditions.checkArgument(StringUtils.isNotBlank(serverHost), "serverHost cannot be blank"); + Preconditions.checkArgument(serverPort > 0, "serverPort must be a positive integer"); + Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank"); + Preconditions.checkArgument(qos >= 0 && qos <= 2, "qos must be one of 0, 1, 2"); + Preconditions.checkArgument(keepAliveIntervalSec >= 0, "keepAliveIntervalSec must be >= 0"); + Preconditions.checkArgument(connectionTimeoutMs > 0, "connectionTimeoutMs must be > 0"); + } +} diff --git a/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml b/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml new file mode 100644 index 0000000000..d56ad14364 --- /dev/null +++ b/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +name: mqtt-sink +description: MQTT sink connector +sinkClass: org.apache.pulsar.io.mqtt.MqttSink +sinkConfigClass: org.apache.pulsar.io.mqtt.MqttSinkConfig diff --git a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java new file mode 100644 index 0000000000..e341f6ec8f --- /dev/null +++ b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.io.core.SinkContext; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +public class MqttSinkConfigTest { + + @Test + public void loadFromYamlFileTest() throws IOException { + File yamlFile = getFile("sinkConfig.yaml"); + MqttSinkConfig config = MqttSinkConfig.load(yamlFile.getAbsolutePath()); + assertNotNull(config); + assertEquals(config.getServerHost(), "localhost"); + assertEquals(config.getServerPort(), 1883); + assertEquals(config.getTopic(), "test/topic"); + assertEquals(config.getClientId(), "pulsar-mqtt-test"); + assertEquals(config.getUsername(), "mqtt-user"); + assertEquals(config.getPassword(), "mqtt-password"); + assertEquals(config.getQos(), 1); + assertEquals(config.getKeepAliveIntervalSec(), 45); + assertEquals(config.getConnectionTimeoutMs(), 15000L); + assertTrue(config.isCleanStart()); + assertFalse(config.isSslEnabled()); + } + + @Test + public void loadFromMapTest() throws IOException { + SinkContext sinkContext = Mockito.mock(SinkContext.class); + MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(), sinkContext); + + assertNotNull(config); + assertEquals(config.getServerHost(), "localhost"); + assertEquals(config.getServerPort(), 1883); + assertEquals(config.getTopic(), "test/topic"); + assertEquals(config.getClientId(), "pulsar-mqtt-test"); + assertEquals(config.getQos(), 1); + } + + @Test + public void loadFromMapCredentialsFromSecretTest() throws IOException { + Map map = baseConfigMap(); + map.remove("username"); + map.remove("password"); + + SinkContext sinkContext = Mockito.mock(SinkContext.class); + Mockito.when(sinkContext.getSecret("username")).thenReturn("mqtt-user"); + Mockito.when(sinkContext.getSecret("password")).thenReturn("mqtt-password"); + + MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext); + assertEquals(config.getUsername(), "mqtt-user"); + assertEquals(config.getPassword(), "mqtt-password"); + } + + @Test + public void validValidateTest() throws IOException { + SinkContext sinkContext = Mockito.mock(SinkContext.class); + MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(), sinkContext); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "qos must be one of 0, 1, 2") + public void invalidQosValidateTest() throws IOException { + Map map = baseConfigMap(); + map.put("qos", 3); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext); + config.validate(); + } + + private static Map baseConfigMap() { + Map map = new HashMap<>(); + map.put("serverHost", "localhost"); + map.put("serverPort", 1883); + map.put("topic", "test/topic"); + map.put("clientId", "pulsar-mqtt-test"); + map.put("username", "mqtt-user"); + map.put("password", "mqtt-password"); + map.put("qos", 1); + map.put("keepAliveIntervalSec", 45); + map.put("connectionTimeoutMs", 15000); + map.put("cleanStart", true); + map.put("sslEnabled", false); + return map; + } + + private File getFile(String name) { + ClassLoader classLoader = getClass().getClassLoader(); + return new File(classLoader.getResource(name).getFile()); + } +} diff --git a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java new file mode 100644 index 0000000000..ba9a1e1332 --- /dev/null +++ b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.SinkContext; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class MqttSinkTest { + + private static final int MQTT_PORT = 1883; + private static final String TEST_TOPIC = "pulsar/mqtt/e2e"; + private static final DockerImageName MOSQUITTO_IMAGE = DockerImageName.parse("eclipse-mosquitto:2"); + + private final GenericContainer mqttContainer = new GenericContainer<>(MOSQUITTO_IMAGE) + .withExposedPorts(MQTT_PORT); + + @BeforeClass(alwaysRun = true) + public void beforeClass() { + mqttContainer.start(); + } + + @AfterClass(alwaysRun = true) + public void afterClass() { + mqttContainer.stop(); + } + + @Test + public void testWriteE2EWithMosquitto() throws Exception { + BlockingQueue receivedPayloads = new LinkedBlockingQueue<>(); + CountDownLatch ackLatch = new CountDownLatch(3); + AtomicBoolean failCalled = new AtomicBoolean(false); + + Mqtt5AsyncClient subscriber = MqttClient.builder() + .useMqttVersion5() + .serverHost(mqttContainer.getHost()) + .serverPort(mqttContainer.getMappedPort(MQTT_PORT)) + .identifier("mqtt-sink-e2e-subscriber") + .buildAsync(); + + subscriber.connectWith() + .cleanStart(true) + .send() + .get(10, TimeUnit.SECONDS); + subscriber.subscribeWith() + .topicFilter(TEST_TOPIC) + .callback(publish -> receivedPayloads.add( + new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8))) + .send() + .get(10, TimeUnit.SECONDS); + + Map config = new HashMap<>(); + config.put("serverHost", mqttContainer.getHost()); + config.put("serverPort", mqttContainer.getMappedPort(MQTT_PORT)); + config.put("topic", TEST_TOPIC); + config.put("qos", 1); + config.put("connectionTimeoutMs", 10000); + config.put("clientId", "mqtt-sink-e2e-publisher"); + + SinkContext sinkContext = mock(SinkContext.class); + try (MqttSink sink = new MqttSink()) { + sink.open(config, sinkContext); + + for (int i = 0; i < 3; i++) { + sink.write(new TestRecord(("msg-" + i).getBytes(StandardCharsets.UTF_8), ackLatch, failCalled)); + } + + assertTrue(ackLatch.await(10, TimeUnit.SECONDS), "Timed out waiting for record.ack()"); + assertFalse(failCalled.get(), "record.fail() should not be called on successful publish"); + + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-0"); + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-1"); + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-2"); + } finally { + subscriber.disconnectWith() + .sessionExpiryInterval(0) + .send() + .get(10, TimeUnit.SECONDS); + } + } + + private static final class TestRecord implements Record { + private final byte[] value; + private final CountDownLatch ackLatch; + private final AtomicBoolean failCalled; + + private TestRecord(byte[] value, CountDownLatch ackLatch, AtomicBoolean failCalled) { + this.value = value; + this.ackLatch = ackLatch; + this.failCalled = failCalled; + } + + @Override + public byte[] getValue() { + return value; + } + + @Override + public void ack() { + ackLatch.countDown(); + } + + @Override + public void fail() { + failCalled.set(true); + } + } +} diff --git a/mqtt/src/test/resources/sinkConfig.yaml b/mqtt/src/test/resources/sinkConfig.yaml new file mode 100644 index 0000000000..69eb3f7ee5 --- /dev/null +++ b/mqtt/src/test/resources/sinkConfig.yaml @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +serverHost: localhost +serverPort: 1883 +topic: test/topic +clientId: pulsar-mqtt-test +username: mqtt-user +password: mqtt-password +qos: 1 +keepAliveIntervalSec: 45 +connectionTimeoutMs: 15000 +cleanStart: true +sslEnabled: false diff --git a/settings.gradle.kts b/settings.gradle.kts index d186f01b9c..cdef210517 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -86,6 +86,7 @@ include("nsq") include("rabbitmq") include("redis") include("solr") +include("mqtt") // JDBC — parent + sub-modules with qualified names to avoid clashes with debezium include("jdbc")