1. 简介

MQTT(Message Queuing Telemetry Protocol)是一个基于发布/订阅的轻量级通讯协议,通常用于物联网中的网络通信,通常被设计于在低带宽、不稳定或高延迟的网络环境下进行数据传输,因此适用于连接设备之间的通信,特别是在资源有限的环境中

协议由IBM和Eurotech公司共同开发,最初的设计目的是为了解决工业远程监控和传感器通信中的数据传输问题。在1999年的时候,Andy Stanford-Clark和 Arlen Nipper需要一种通过卫星连接原油管道数据采集监控系统的协议,以最大限度地减少电池损耗和带宽,因此发明了MQTT协议

MQTT协议主要包含以下几个特点:

  • 轻量与高效:MQTT协议头非常小,只需要几个字节,与传统的HTTP协议对比,MQTT可以显著地减少数据包大小,提升传输速率
  • 发布 / 订阅:发送者和接收者通过主题(Topic)进行通信,可以实现一对多、多对多的消息传递
  • QoS(服务质量)级别:通过以下3个级别来确保消息的接收与发送
    • QoS 0:至多一次传输,消息可能会丢失
    • QoS 1:至少一次传输,以确保消息到达,但消息可能会重复
    • QoS 2:仅一次传输,确保消息到达且不会重复
  • 持久会话:MQTT协议允许客户端建立持久会话,以便在连接丢失后重新连接时能够恢复之前的订阅和消息传递状态

一般来说,MQTT协议用于低带宽和网络不稳定的物联网场景,在这样的场景下,可以使用极少的代码(也就是QoS相关的配置)为物联网提供可靠的消息服务,以下是一个MQTT使用的典型例子:

MQTT: publish / subscribe architecture

MQTT协议中包含了以下几个部分:客户端、消息和代理(Broker),如下所示:

  • 客户端:使用MQTT协议向代理服务器推送消息或拉取消息进行消费,包含了发布者和订阅者,Publisher 和 Subscriber都属于MQTT Client,这俩是相对的概念
  • 消息代理:中介服务器,也称之为Broker,负责接收所有的消息,过滤消息,确定哪个Client订阅了哪个Topic下的消息,并将消息发送给对应的Client,同时还负责保存会话数据,包括订阅和错过的消息,也负责客户端的身份验证与授权
  • 消息:消息中包含了主题(Topic)标识,可以理解成是消息的分类表示,用于区分不同的消息类型,同时还包含了有效载荷,也就是消息雷荣,可以是任何格式的数据

MQTT

MQTT和消息队列的区别:

  • MQTT会将消息推送给所有对应主题下的订阅者,每个订阅者有相同的负载(数据信息)
  • 消息队列中的一条消息只能够被一个客户端消费

2. Topic

在MQTT中,Topic是指Broker为每个连接的Cliet过滤消息的UTF-8字符串,Topic是一种分层的结构,可以由一个或者多个Topic组成,每个Topic由/进行分割,如下所示:

MQTT Topic

与传统的消息队列进行对比,MQTT的Topic十分轻量级,Client在发布或订阅前无需预先创建对应的Topic,Broker在接收每个Topic前也不需要进行初始化工作

Topic中包含了通配符的概念,通配符共有2种,分别是 +#,其中+代表的是单级通配符,而#代表的是多级通配符

当Client订阅了带有多级通配符的Topic时,不论Topic有多长多深,都会收到通配符号前的Topic内的所有消息,如果只将Topic定义为 #,则会收到所有消息

需要注意的是,多级通配符必须作为Topic中的最后一个字符,并且以/开头,比如sports/#就是一个正确的多级通配符示例

3. MQTT搭建

Tips:MQTT中间件有多种可选,本文选择的是EMQX,以下记录的是EMQX的快速搭建

通过docker-compose的方式快速搭建起EMQX作为Broker,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
version: '3'
services:
emqx1:
container_name: emqx1
image: emqx:latest
environment:
- "EMQX_NAME=emqx"
- "EMQX_HOST=node1.emqx.io"
- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io]"
networks:
emqx-bridge:
aliases:
- node1.emqx.io
ports:
- 18083:18083
- 1883:1883

networks:
emqx-bridge:
driver: bridge

通过docker-compose up -d即可快速启动EMQX服务,其中暴露了1883作为MQTT协议的通讯接口,18083作为UI管理页面的端口,可以通过http://<ip_address>:18083进入到EMQX的DashBoard(默认用户名为admin,密码为public),如下所示:

EMQX

4. 代码示例

Tips:以下代码中需要依赖 implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
public class MqttDemo {
public static void main(String[] args) throws MqttException {
MqttMessageSubscriber subscriber = MqttMessageSubscriber.initialize(new CustomizeMqttCallback());
subscriber.subscribe("test");

MqttMessagePublisher publisher = MqttMessagePublisher.getInstance();
publisher.publish("test", "Hello World!");
}

public static class MqttMessagePublisher implements Closeable {
private static final String BROKER_URL = System.getenv("BROKER_URL");
private static final String USERNAME = System.getenv("MQTT_USERNAME");
private static final String PASSWORD = System.getenv("MQTT_PASSWORD");
private static final int QOS = Integer.parseInt(Objects.requireNonNull(System.getenv("MQTT_QOS")));
private static final String CLIENT_ID = UUID.randomUUID().toString();
private final MqttClient client;

private MqttMessagePublisher() throws MqttException {
client = new MqttClient(BROKER_URL, CLIENT_ID);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(USERNAME);
mqttConnectOptions.setPassword(PASSWORD.toCharArray());
mqttConnectOptions.setCleanSession(true);
client.connect(mqttConnectOptions);
}

public void publish(String topic, String message) throws MqttException {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(QOS);
client.publish(topic, mqttMessage);
System.out.println("Message published on topic: " + topic + ", message: " + message);
}

public static MqttMessagePublisher getInstance() {
return MqttMessagePublisherHolder.INSTANCE;
}

@Override
public void close() {
try {
if (client != null && client.isConnected()) {
client.disconnect();
}
} catch (MqttException e) {
e.printStackTrace();
}
}

private static class MqttMessagePublisherHolder {
private static final MqttMessagePublisher INSTANCE;

static {
try {
INSTANCE = new MqttMessagePublisher();
} catch (MqttException e) {
throw new ExceptionInInitializerError("Failed to create MqttMessagePublisher instance: " + e.getMessage());
}
}
}
}

public static class CustomizeMqttCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost to MQTT broker: " + cause.getMessage());
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println("Message received on topic: " + topic + ", message: " + payload);
// Process the received message here
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Called when message delivery is complete (QoS 1 and QoS 2 messages)
}
}

public static class MqttMessageSubscriber {
private static final String BROKER_URL = System.getenv("BROKER_URL");
private static final String USERNAME = System.getenv("MQTT_USERNAME");
private static final String PASSWORD = System.getenv("MQTT_PASSWORD");
private static final int QOS = Integer.parseInt(Objects.requireNonNull(System.getenv("MQTT_QOS")));
private static final String CLIENT_ID = UUID.randomUUID().toString();
private final MqttClient client;

private MqttMessageSubscriber(MqttCallback callback) throws MqttException {
client = new MqttClient(BROKER_URL, CLIENT_ID);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(USERNAME);
mqttConnectOptions.setPassword(PASSWORD.toCharArray());
mqttConnectOptions.setCleanSession(true);
client.setCallback(callback);
client.connect(mqttConnectOptions);
}

public static MqttMessageSubscriber initialize(MqttCallback callback) throws MqttException {
return new MqttMessageSubscriber(callback);
}

public void subscribe(String topic) throws MqttException {
client.subscribe(topic, QOS);
}

public void close() {
try {
if (client != null && client.isConnected()) {
client.disconnect();
}
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}

其中 BROKER_URL 等相关参数需要通过环境变量进行配置,各个参数如下所示:

  • BROKER_URL:代表的是Broker的URL,一般是tcp:<ip_address>:1883
  • MQTT_USERNAME:用于认证的用户名,可前往EMQX DashBoard - Access Control - Authentication进行配置
  • MQTT_PASSWORD:用于认证的密码,同上
  • MQTT_QOS:需要应用的QoS质量等级

5. 相关链接