MQTT的基础认识及简单应用

MQTT是什么?

轻量级的,基于发布-订阅模式的消息传输协议,用于物联网、移动应用。适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。可实现高效通信。

MQTT协议特点

提供轻量级、可靠、安全、双向通信、连续有状态的物联网通信。

MQTT工作原理

MQTT 协议基于 TCP/IP 协议;
主要有以下几个概念,以读者通过邮局向报社订阅报纸为例说明:

  • 代理服务器 (Broker) ,邮局,MQTT Broker(邮局)是消息的中转站,它负责接收发送者(发件人)发送的消息,并将它们传递给接收者(收件人)。就像邮局是信息传递的中介。
  • 客户端(Client), 类比为报纸的发送者和接受者。MQTT Client(客户端)是消息的生产者或消费者,可以是报纸发送者(发件人)或报纸接收者(收件人)。
    • 发布者(Publisher),报社, 就像发件人使用邮局来发送报纸。
    • 订阅者(Subscriber),读者, 收件人使用邮局来接收抱着。
  • 主题(Topic),类比为报纸,主题是消息的分类标识,用于指定消息的目的地。就像邮件需要一个地址来指定接收者,MQTT消息需要一个主题来指定接收者。
  • 消息(Message), 类比为具体某期的报纸,消息是MQTT中的数据单元,可以包含文本、二进制数据或任何其他内容。
  • 消息质量 (QoS) , 类比为邮寄报纸的速度和可靠性。MQTT定义了不同级别的服务质量(0、1和2)
  • Retain (保留消息), 类比为报纸的存档版,保留消息是MQTT Broker(邮局)保留的最新一条消息,新的订阅者可以在订阅时接收到它,就像邮局可以保留最新的报纸供后来的订阅者查看。

MQTT协议应用

| 发送端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include "mqtt/client.h"

int main() {
const std::string serverAddress = "tcp://broker.example.com:1883"; // MQTT代理服务器地址
const std::string clientId = "mqtt_client_cpp"; // 客户端ID
const std::string topic = "test/topic"; // MQTT主题
const std::string payload = "Hello, MQTT from C++"; // 消息内容

mqtt::async_client client(serverAddress, clientId);

mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);

try {
client.connect(connOpts)->wait();
std::cout << "Connected to MQTT broker." << std::endl;

mqtt::message_ptr pubmsg = mqtt::make_message(topic, payload);
pubmsg->set_qos(1);

client.publish(pubmsg)->wait();
std::cout << "Message published." << std::endl;

client.disconnect()->wait();
std::cout << "Disconnected from MQTT broker." << std::endl;
} catch (const mqtt::exception& exc) {
std::cerr << "Error: " << exc.what() << std::endl;
return 1;
}

return 0;
}


| 接受端代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <iostream>
#include "mqtt/client.h"

class MyCallback : public virtual mqtt::callback {
mqtt::async_client& client;
void connection_lost(const std::string& cause) override {
std::cout << "Connection lost: " << cause << std::endl;

// 在连接丢失后尝试重新连接
try {
client.reconnect();
std::cout << "Reconnected to MQTT broker." << std::endl;
} catch (const mqtt::exception& exc) {
std::cerr << "Reconnection error: " << exc.what() << std::endl;
}std::cout << "Connection lost: " << cause << std::endl;
}

void delivery_complete(const mqtt::delivery_token_ptr& token) override {
std::cout << "Message delivery completed" << std::endl;
}

void message_arrived(const std::string& topic, mqtt::message_ptr msg) override {
std::cout << "Message received on topic: " << topic << std::endl;
std::cout << "Message content: " << msg->to_string() << std::endl;
}

public:
MyCallback(mqtt::async_client& c) : client(c) {}
};

int main() {
const std::string serverAddress = "tcp://broker.example.com:1883"; // MQTT代理服务器地址
const std::string clientId = "mqtt_client_cpp"; // 客户端ID
const std::string topic = "test/topic"; // 订阅的MQTT主题

mqtt::async_client client(serverAddress, clientId);
client.set_callback(MyCallback(client));

mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);

try {
client.connect(connOpts)->wait();
std::cout << "Connected to MQTT broker." << std::endl;

client.subscribe(topic, 1)->wait();
std::cout << "Subscribed to topic: " << topic << std::endl;

while (true) {
// 在此处可以处理其他逻辑
}
} catch (const mqtt::exception& exc) {
std::cerr << "Error: " << exc.what() << std::endl;
return 1;
}

return 0;
}

接受端加入重连机制;

MQTT 协议入门:基础知识和快速教程

免费的公共 MQTT 服务器