Skip to content

Commit 1ec80d1

Browse files
wenyanshi-123shiwenyan
andauthored
Modify MQTT interface related documents (#764)
Co-authored-by: shiwenyan <wenyan.shi@timecho.com>
1 parent 2f96d1c commit 1ec80d1

File tree

4 files changed

+339
-57
lines changed

4 files changed

+339
-57
lines changed

src/UserGuide/Master/Table/API/Programming-MQTT.md

Lines changed: 159 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,20 @@
2222

2323
## 1. Overview
2424

25-
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
25+
[MQTT](Message Queuing Telemetry Transport)(http://mqtt.org/) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
2626

2727
IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages.
2828

29-
![](/img/mqtt-table-en-1.png)
29+
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
3030

3131

32-
## 2. Configuration
3332

34-
By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`.
35-
36-
| **Property** | **Description** | **Default** |
37-
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- | ------------------- |
38-
| `enable_mqtt_service` | Enable/ disable the MQTT service. | FALSE |
39-
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
40-
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
41-
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
42-
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​****Options: `json` (tree model), `line` (table model).** | **json** |
43-
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |
44-
45-
## 3. Write Protocol
33+
## 2. Built-in MQTT Service
34+
The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
35+
and then write the data into storage immediately.
36+
The MQTT topic corresponds to IoTDB timeseries.The first segment of the MQTT topic (split by `/`) is used as the database name.The table name is derived from the `<measurement>` in the line protocol.
37+
The messages payload can be format to events by `PayloadFormatter` which loaded by java SPI, and the implementation of `PayloadFormatter` for table is `LinePayloadFormatter`.
38+
The following is the line protocol syntax of MQTT message payload and an example:
4639

4740
* Line Protocol Syntax
4841

@@ -56,7 +49,23 @@ By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IO
5649
myMeasurement,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1556813561098000000
5750
```
5851

59-
![](/img/mqtt-table-en-2.png)
52+
53+
54+
## 3. MQTT Configurations
55+
56+
By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`.
57+
58+
Configurations are as follows:
59+
60+
| **Property** | **Description** | **Default** |
61+
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- |-------------|
62+
| `enable_mqtt_service` | Enable/ disable the MQTT service. | false |
63+
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
64+
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
65+
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
66+
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​****Options: `json` (tree model), `line` (table model).** | **json** |
67+
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |
68+
6069

6170
## 4. Naming Conventions
6271

@@ -88,3 +97,137 @@ The table name is derived from the `<measurement>` in the line protocol.
8897
| 1`i32`<br>123`i32` | INT32 |
8998
| `"xxx"` | TEXT |
9099
| `t`,`T`,`true`,`True`,`TRUE`<br> `f`,`F`,`false`,`False`,`FALSE` | BOOLEAN |
100+
101+
102+
## 5. Coding Examples
103+
The following is an example which a mqtt client send messages to IoTDB server.
104+
105+
```java
106+
MQTT mqtt = new MQTT();
107+
mqtt.setHost("127.0.0.1", 1883);
108+
mqtt.setUserName("root");
109+
mqtt.setPassword("root");
110+
111+
BlockingConnection connection = mqtt.blockingConnection();
112+
connection.connect();
113+
114+
for (int i = 0; i < 10; i++) {
115+
String payload = String.format("test%d,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1", random.nextDouble());
116+
117+
connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
118+
}
119+
120+
connection.disconnect();
121+
122+
```
123+
124+
## 6. Customize your MQTT Message Format
125+
126+
If you do not like the above Line format, you can customize your MQTT Message format by just writing several lines
127+
of codes. An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/master/example/mqtt-customize) project.
128+
129+
Steps:
130+
1. Create a java project, and add dependency:
131+
```xml
132+
<dependency>
133+
<groupId>org.apache.iotdb</groupId>
134+
<artifactId>iotdb-server</artifactId>
135+
<version>2.0.4-SNAPSHOT</version>
136+
</dependency>
137+
```
138+
2. Define your implementation which implements `org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`
139+
e.g.,
140+
141+
```java
142+
package org.apache.iotdb.mqtt.server;
143+
144+
import io.netty.buffer.ByteBuf;
145+
import org.apache.iotdb.db.protocol.mqtt.Message;
146+
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
147+
148+
import java.nio.charset.StandardCharsets;
149+
import java.util.ArrayList;
150+
import java.util.Arrays;
151+
import java.util.List;
152+
153+
public class CustomizedLinePayloadFormatter implements PayloadFormatter {
154+
155+
@Override
156+
public List<Message> format(String topic, ByteBuf payload) {
157+
// Suppose the payload is a line format
158+
if (payload == null) {
159+
return null;
160+
}
161+
162+
String line = payload.toString(StandardCharsets.UTF_8);
163+
// parse data from the line and generate Messages and put them into List<Meesage> ret
164+
List<Message> ret = new ArrayList<>();
165+
// this is just an example, so we just generate some Messages directly
166+
for (int i = 0; i < 3; i++) {
167+
long ts = i;
168+
TableMessage message = new TableMessage();
169+
170+
// Parsing Database Name
171+
message.setDatabase("db" + i);
172+
173+
//Parsing Table Names
174+
message.setTable("t" + i);
175+
176+
// Parsing Tags
177+
List<String> tagKeys = new ArrayList<>();
178+
tagKeys.add("tag1" + i);
179+
tagKeys.add("tag2" + i);
180+
List<Object> tagValues = new ArrayList<>();
181+
tagValues.add("t_value1" + i);
182+
tagValues.add("t_value2" + i);
183+
message.setTagKeys(tagKeys);
184+
message.setTagValues(tagValues);
185+
186+
// Parsing Attributes
187+
List<String> attributeKeys = new ArrayList<>();
188+
List<Object> attributeValues = new ArrayList<>();
189+
attributeKeys.add("attr1" + i);
190+
attributeKeys.add("attr2" + i);
191+
attributeValues.add("a_value1" + i);
192+
attributeValues.add("a_value2" + i);
193+
message.setAttributeKeys(attributeKeys);
194+
message.setAttributeValues(attributeValues);
195+
196+
// Parsing Fields
197+
List<String> fields = Arrays.asList("field1" + i, "field2" + i);
198+
List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT);
199+
List<Object> values = Arrays.asList("4.0" + i, "5.0" + i);
200+
message.setFields(fields);
201+
message.setDataTypes(dataTypes);
202+
message.setValues(values);
203+
204+
//// Parsing timestamp
205+
message.setTimestamp(ts);
206+
ret.add(message);
207+
}
208+
return ret;
209+
}
210+
211+
@Override
212+
public String getName() {
213+
// set the value of mqtt_payload_formatter in iotdb-system.properties as the following string:
214+
return "CustomizedLine";
215+
}
216+
}
217+
```
218+
3. modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`:
219+
clean the file and put your implementation class name into the file.
220+
In this example, the content is: `org.apache.iotdb.mqtt.server.CustomizedLinePayloadFormatter`
221+
4. compile your implementation as a jar file: `mvn package -DskipTests`
222+
223+
224+
Then, in your server:
225+
1. Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder.
226+
2. Update configuration to enable MQTT service. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`)
227+
3. Set the value of `mqtt_payload_formatter` in `conf/iotdb-system.properties` as the value of getName() in your implementation
228+
, in this example, the value is `CustomizedLine`
229+
4. Launch the IoTDB server.
230+
5. Now IoTDB will use your implementation to parse the MQTT message.
231+
232+
More: the message format can be anything you want. For example, if it is a binary format,
233+
just use `payload.forEachByte()` or `payload.array` to get bytes content.

src/UserGuide/Master/Tree/API/Programming-MQTT.md

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,16 @@
2020
-->
2121
# MQTT Protocol
2222

23-
[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol.
24-
It was designed as an extremely lightweight publish/subscribe messaging transport.
25-
It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
23+
## 1. Overview
2624

27-
IoTDB supports the MQTT v3.1(an OASIS Standard) protocol.
28-
IoTDB server includes a built-in MQTT service that allows remote devices send messages into IoTDB server directly.
25+
[MQTT](Message Queuing Telemetry Transport)(http://mqtt.org/) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
2926

30-
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
27+
IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages.
3128

29+
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
3230

33-
## 1. Built-in MQTT Service
31+
[Programming-MQTT.md](Programming-MQTT.md)
32+
## 2. Built-in MQTT Service
3433
The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
3534
and then write the data into storage immediately.
3635
The MQTT topic corresponds to IoTDB timeseries.
@@ -58,22 +57,22 @@ or json array of the above two.
5857

5958
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png">
6059

61-
## 2. MQTT Configurations
60+
## 3. MQTT Configurations
6261
The IoTDB MQTT service load configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties` by default.
6362

6463
Configurations are as follows:
6564

66-
| NAME | DESCRIPTION | DEFAULT |
65+
| **Property** | DESCRIPTION | DEFAULT |
6766
| ------------- |:-------------:|:------:|
68-
| enable_mqtt_service | whether to enable the mqtt service | false |
69-
| mqtt_host | the mqtt service binding host | 127.0.0.1 |
70-
| mqtt_port | the mqtt service binding port | 1883 |
71-
| mqtt_handler_pool_size | the handler pool size for handing the mqtt messages | 1 |
72-
| mqtt_payload_formatter | the mqtt message payload formatter | json |
73-
| mqtt_max_message_size | the max mqtt message size in byte| 1048576 |
67+
| `enable_mqtt_service` | whether to enable the mqtt service | false |
68+
| `mqtt_host` | the mqtt service binding host | 127.0.0.1 |
69+
| `mqtt_port` | the mqtt service binding port | 1883 |
70+
| `mqtt_handler_pool_size` | the handler pool size for handing the mqtt messages | 1 |
71+
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​****Options: `json` (tree model), `line` (table model).** | **json** |
72+
| `mqtt_max_message_size` | the max mqtt message size in byte| 1048576 |
7473

7574

76-
## 3. Coding Examples
75+
## 4. Coding Examples
7776
The following is an example which a mqtt client send messages to IoTDB server.
7877

7978
```java
@@ -101,7 +100,7 @@ connection.disconnect();
101100

102101
```
103102

104-
## 4. Customize your MQTT Message Format
103+
## 5. Customize your MQTT Message Format
105104

106105
If you do not like the above Json format, you can customize your MQTT Message format by just writing several lines
107106
of codes. An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/master/example/mqtt-customize) project.
@@ -112,7 +111,7 @@ Steps:
112111
<dependency>
113112
<groupId>org.apache.iotdb</groupId>
114113
<artifactId>iotdb-server</artifactId>
115-
<version>1.1.0-SNAPSHOT</version>
114+
<version>2.0.4-SNAPSHOT</version>
116115
</dependency>
117116
```
118117
2. Define your implementation which implements `org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`
@@ -133,7 +132,7 @@ import java.util.List;
133132
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
134133

135134
@Override
136-
public List<Message> format(ByteBuf payload) {
135+
public List<Message> format(String topic, ByteBuf payload) {
137136
// Suppose the payload is a json format
138137
if (payload == null) {
139138
return null;

0 commit comments

Comments
 (0)