diff --git a/src/UserGuide/Master/Table/API/Programming-MQTT.md b/src/UserGuide/Master/Table/API/Programming-MQTT.md new file mode 100644 index 000000000..2c1baf069 --- /dev/null +++ b/src/UserGuide/Master/Table/API/Programming-MQTT.md @@ -0,0 +1,202 @@ + + +# MQTT Protocol + +[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. +It was designed as an extremely lightweight publish/subscribe messaging transport. +It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. + +IoTDB supports the MQTT v3.1(an OASIS Standard) protocol. +IoTDB server includes a built-in MQTT service that allows remote devices send messages into IoTDB server directly. + + + +## Built-in MQTT Service +The built-in MQTT service provides the ability to connect directly to the IoTDB via MQTT. It listens for publish messages from the MQTT client and immediately writes data to storage if the database already exists. Note: If the database does not exist, it will not be created, so it needs to be created in advance. + +The portion of the MQTT topic before / is defined as the name of the database into which the IoTDB is stored, or if / is not present, the topic is defined directly as the name of the database into which the IoTDB is stored. + +Message payloads can be formatted as events by the `PayloadFormatter` loaded by the Java SPI, and the default implementation of the table model is `LinePayloadFormatter`. + +The table model uses the row protocol by default, where attribute_key is an optional field. The following is the definition of the row protocol: +``` +[,=[,=]][ =[,=]] =[,=] [] +``` + +The following is an example of an MQTT message payload: +``` + myTable,tag1=t1,tag2=t2 attr1=a1,attr2=a2 fieldKey="fieldValue" 1740109006000 + myTable,tag1=t1,tag2=t2 fieldKey="fieldValue" 1740109006001 +``` + + + + +## MQTT Configurations +The IoTDB MQTT service load configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties` by default. +Writing to the IoTDB table model using MQTT requires configuring `mqtt_payload_formatter` to `line` in `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`. + +Example +``` properties +enable_mqtt_service=true +mqtt_payload_formatter=line +``` + +Configurations are as follows: + +| NAME | DESCRIPTION | DEFAULT | +| ------------- |:-----------------------------------------------------------------------------------------------------------------------------------------------:|:---------:| +| enable_mqtt_service | whether to enable the mqtt service | false | +| mqtt_host | the mqtt service binding host | 127.0.0.1 | +| mqtt_port | the mqtt service binding port | 1883 | +| mqtt_handler_pool_size | the handler pool size for handing the mqtt messages | 1 | +| mqtt_payload_formatter | the mqtt message payload formatter; Options: [json, line],The built-in json only supports tree models, and the line only supports table models. | json | +| mqtt_max_message_size | the max mqtt message size in byte | 1048576 | + + +## Coding Examples +The following is an example which a mqtt client send messages to IoTDB server. + + ```java +MQTT mqtt = new MQTT(); +mqtt.setHost("127.0.0.1", 1883); +mqtt.setUserName("root"); +mqtt.setPassword("root"); + +BlockingConnection connection = mqtt.blockingConnection(); +connection.connect(); + +String payload = + "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + +payload = + "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n " + + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5"; + connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + +connection.disconnect(); + ``` + + +## Customize your MQTT Message Format + +If you do not like the above Json format, you can customize your MQTT Message format by just writing several lines +of codes. + +Steps: +1. Create a java project, and add dependency: +```xml + + org.apache.iotdb + iotdb-server + 2.0.0 + +``` +2. Define your implementation which implements `org.apache.iotdb.db.protocol.mqtt.PayloadFormatter` + e.g., +```java + +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; + +import io.netty.buffer.ByteBuf; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class CustomizedTablePayloadFormatter implements PayloadFormatter { + + @Override + public List format(ByteBuf payload) { + if (payload == null) { + return Collections.emptyList(); + } + + List ret = new ArrayList<>(); + + for (int i = 0; i < 2; i++) { + long ts = i; + TableMessage message = new TableMessage(); + message.setTable("tableTest"); + List tagKeys = new ArrayList<>(); + List tagValues = new ArrayList<>(); + tagKeys.add("tag1"); + tagValues.add(new Binary[] {new Binary("tagValue1".getBytes(StandardCharsets.UTF_8))}); + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + List attributeKeys = new ArrayList<>(); + List attributeValues = new ArrayList<>(); + attributeKeys.add("attribute1"); + attributeValues.add( + new Binary[] {new Binary("attributeValue1".getBytes(StandardCharsets.UTF_8))}); + message.setAttributeKeys(attributeKeys); + List fields = new ArrayList<>(); + List dataTypes = new ArrayList<>(); + List values = new ArrayList<>(); + fields.add("field"); + values.add(1); + dataTypes.add(TSDataType.INT32); + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + message.setAttributeValues(attributeValues); + message.setTimestamp(ts); + ret.add(message); + } + return ret; + } + + @Override + public String getName() { + return "Customized"; + } + + @Override + public String getType() { + return PayloadFormatter.TABLE_TYPE; + } +} +``` +3. modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`: + clean the file and put your implementation class name into the file. + In this example, the content is: `org.apache.iotdb.mqtt.server.CustomizedTablePayloadFormatter` +4. compile your implementation as a jar file: `mvn package -DskipTests` + + +Then, in your server: +1. Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder. +2. Update configuration to enable MQTT service. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`) +3. Set the value of `mqtt_payload_formatter` in `conf/iotdb-system.properties` as the value of getName() in your implementation + , in this example, the value is `Customized` +4. Launch the IoTDB server. +5. Now IoTDB will use your implementation to parse the MQTT message. + + + diff --git a/src/UserGuide/Master/Tree/API/Programming-MQTT.md b/src/UserGuide/Master/Tree/API/Programming-MQTT.md index 98fca63d4..aaf88467d 100644 --- a/src/UserGuide/Master/Tree/API/Programming-MQTT.md +++ b/src/UserGuide/Master/Tree/API/Programming-MQTT.md @@ -112,7 +112,7 @@ Steps: org.apache.iotdb iotdb-server - 1.1.0-SNAPSHOT + 2.0.0 ``` 2. Define your implementation which implements `org.apache.iotdb.db.protocol.mqtt.PayloadFormatter` @@ -121,9 +121,9 @@ e.g., ```java package org.apache.iotdb.mqtt.server; -import io.netty.buffer.ByteBuf; import org.apache.iotdb.db.protocol.mqtt.Message; import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TreeMessage; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -145,7 +145,7 @@ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { // this is just an example, so we just generate some Messages directly for (int i = 0; i < 2; i++) { long ts = i; - Message message = new Message(); + TreeMessage message = new TreeMessage(); message.setDevice("d" + i); message.setTimestamp(ts); message.setMeasurements(Arrays.asList("s1", "s2")); @@ -160,6 +160,11 @@ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: return "CustomizedJson"; } + + @Override + public String getType() { + return PayloadFormatter.TREE_TYPE; + } } ``` 3. modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`: diff --git a/src/zh/UserGuide/Master/Table/API/Programming-MQTT.md b/src/zh/UserGuide/Master/Table/API/Programming-MQTT.md new file mode 100644 index 000000000..e3327a6ba --- /dev/null +++ b/src/zh/UserGuide/Master/Table/API/Programming-MQTT.md @@ -0,0 +1,200 @@ + + +# MQTT 协议 + +[MQTT](http://mqtt.org/) 是机器对机器(M2M)/“物联网”连接协议。 + +它被设计为一种非常轻量级的发布/订阅消息传递。 + +对于与需要较小代码占用和/或网络带宽非常宝贵的远程位置的连接很有用。 + +IoTDB 支持 MQTT v3.1(OASIS 标准)协议。 +IoTDB 服务器包括内置的 MQTT 服务,该服务允许远程设备将消息直接发送到 IoTDB 服务器。 + + + +## 内置 MQTT 服务 +内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。 它侦听来自 MQTT 客户端的发布消息,如果数据库已经存在,则立即将数据写入存储。注意:如果数据库不存在,将不会创建数据库,所以需提前创建数据库。 + +MQTT 主题中 / 之前的部分会被定义为存入 IoTDB 的数据库名字,如果不存在 / 则主题会被直接定义为存入 IoTDB 的数据库名字。 + +消息有效载荷可以由 Java SPI 加载的`PayloadFormatter`格式化为事件,表模型的默认实现为`LinePayloadFormatter`。 + +表模型默认使用行协议,其中attribute_key为可选字段。以下为行协议的定义: +``` +[,=[,=]][ =[,=]] =[,=] [] +``` + +以下是 MQTT 消息有效负载示例: +``` + myTable,tag1=t1,tag2=t2 attr1=a1,attr2=a2 fieldKey="fieldValue" 1740109006000 + myTable,tag1=t1,tag2=t2 fieldKey="fieldValue" 1740109006001 +``` + + + + +## MQTT 配置 +默认情况下,IoTDB MQTT 服务从`${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`加载配置。 +使用 MQTT 写入IoTDB表模型需要在 `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties` 中配置 `mqtt_payload_formatter` 为 `line`。 + +如 +``` properties +enable_mqtt_service=true +mqtt_payload_formatter=line +``` + +默认配置如下: + +| 名称 | 描述 | 默认 | +| ------------- |:------------------:|:---------:| +| enable_mqtt_service | 是否启用 mqtt 服务 | false | +| mqtt_host | mqtt 服务绑定主机 | 127.0.0.1 | +| mqtt_port | mqtt 服务绑定端口 | 1883 | +| mqtt_handler_pool_size | 处理 mqtt 消息的处理程序池大小 | 1 | +| mqtt_payload_formatter | mqtt 消息有效负载格式化程序;选项: [json,line],内置 json 仅支持树模型,line 仅支持表模型。 | json | +| mqtt_max_message_size | mqtt 消息最大长度(字节) | 1048576 | + + +## 示例代码 +以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。 + + ```java +MQTT mqtt = new MQTT(); +mqtt.setHost("127.0.0.1", 1883); +mqtt.setUserName("root"); +mqtt.setPassword("root"); + +BlockingConnection connection = mqtt.blockingConnection(); +connection.connect(); + +String payload = + "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + +payload = + "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n " + + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5"; + connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + +connection.disconnect(); + ``` + + +## 自定义 MQTT 消息格式 + +事实上可以通过简单编程来实现 MQTT 消息的格式自定义。 + +步骤: +1. 创建一个 Java 项目,增加如下依赖 +```xml + + org.apache.iotdb + iotdb-server + 2.0.0 + +``` +2. 创建一个实现类,实现接口 `org.apache.iotdb.db.mqtt.protocol.PayloadFormatter` + +```java + +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; + +import io.netty.buffer.ByteBuf; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class CustomizedTablePayloadFormatter implements PayloadFormatter { + + @Override + public List format(ByteBuf payload) { + if (payload == null) { + return Collections.emptyList(); + } + + List ret = new ArrayList<>(); + + for (int i = 0; i < 2; i++) { + long ts = i; + TableMessage message = new TableMessage(); + message.setTable("tableTest"); + List tagKeys = new ArrayList<>(); + List tagValues = new ArrayList<>(); + tagKeys.add("tag1"); + tagValues.add(new Binary[] {new Binary("tagValue1".getBytes(StandardCharsets.UTF_8))}); + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + List attributeKeys = new ArrayList<>(); + List attributeValues = new ArrayList<>(); + attributeKeys.add("attribute1"); + attributeValues.add( + new Binary[] {new Binary("attributeValue1".getBytes(StandardCharsets.UTF_8))}); + message.setAttributeKeys(attributeKeys); + List fields = new ArrayList<>(); + List dataTypes = new ArrayList<>(); + List values = new ArrayList<>(); + fields.add("field"); + values.add(1); + dataTypes.add(TSDataType.INT32); + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + message.setAttributeValues(attributeValues); + message.setTimestamp(ts); + ret.add(message); + } + return ret; + } + + @Override + public String getName() { + return "Customized"; + } + + @Override + public String getType() { + return PayloadFormatter.TABLE_TYPE; + } +} +``` +3. 修改项目中的 `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter` 文件: + 将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。 + 在本例中,文件内容为: `org.apache.iotdb.mqtt.server.CustomizedTablePayloadFormatter` +4. 编译项目生成一个 jar 包: `mvn package -DskipTests` + + +在 IoTDB 服务端: +1. 创建 ${IOTDB_HOME}/ext/mqtt/ 文件夹, 将刚才的 jar 包放入此文件夹。 +2. 打开 MQTT 服务参数. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`) +3. 用刚才的实现类中的 getName() 方法的返回值 设置为 `conf/iotdb-system.properties` 中 `mqtt_payload_formatter` 的值, 在本例中,为 `Customized` +4. 启动 IoTDB +5. 搞定 + diff --git a/src/zh/UserGuide/Master/Tree/API/Programming-MQTT.md b/src/zh/UserGuide/Master/Tree/API/Programming-MQTT.md index 4a113cd13..592c9e637 100644 --- a/src/zh/UserGuide/Master/Tree/API/Programming-MQTT.md +++ b/src/zh/UserGuide/Master/Tree/API/Programming-MQTT.md @@ -112,7 +112,7 @@ connection.disconnect(); org.apache.iotdb iotdb-server - 1.3.0-SNAPSHOT + 2.0.0 ``` 2. 创建一个实现类,实现接口 `org.apache.iotdb.db.mqtt.protocol.PayloadFormatter` @@ -120,9 +120,9 @@ connection.disconnect(); ```java package org.apache.iotdb.mqtt.server; -import io.netty.buffer.ByteBuf; import org.apache.iotdb.db.protocol.mqtt.Message; import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TreeMessage; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -144,7 +144,7 @@ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { // this is just an example, so we just generate some Messages directly for (int i = 0; i < 2; i++) { long ts = i; - Message message = new Message(); + TreeMessage message = new TreeMessage(); message.setDevice("d" + i); message.setTimestamp(ts); message.setMeasurements(Arrays.asList("s1", "s2")); @@ -159,7 +159,13 @@ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: return "CustomizedJson"; } + + @Override + public String getType() { + return PayloadFormatter.TREE_TYPE; + } } + ``` 3. 修改项目中的 `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter` 文件: 将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。