MQTT是什么?
轻量级的,基于发布-订阅模式的消息传输协议,用于物联网、移动应用。适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。可实现高效通信。
MQTT协议特点
提供轻量级、可靠、安全、双向通信、连续有状态的物联网通信。
MQTT工作原理
MQTT 协议基于 TCP/IP 协议;
主要有以下几个概念,以读者通过邮局向报社订阅报纸为例说明:
- 代理服务器 (Broker) ,邮局,MQTT Broker(邮局)是消息的中转站,它负责接收发送者(发件人)发送的消息,并将它们传递给接收者(收件人)。就像邮局是信息传递的中介。
- 客户端(Client), 类比为报纸的发送者和接受者。MQTT Client(客户端)是消息的生产者或消费者,可以是报纸发送者(发件人)或报纸接收者(收件人)。
- 发布者(Publisher),报社, 就像发件人使用邮局来发送报纸。
- 订阅者(Subscriber),读者, 收件人使用邮局来接收抱着。
- 主题(Topic),类比为报纸,主题是消息的分类标识,用于指定消息的目的地。就像邮件需要一个地址来指定接收者,MQTT消息需要一个主题来指定接收者。
- 消息(Message), 类比为具体某期的报纸,消息是MQTT中的数据单元,可以包含文本、二进制数据或任何其他内容。
- 消息质量 (QoS) , 类比为邮寄报纸的速度和可靠性。MQTT定义了不同级别的服务质量(0、1和2)
- Retain (保留消息), 类比为报纸的存档版,保留消息是MQTT Broker(邮局)保留的最新一条消息,新的订阅者可以在订阅时接收到它,就像邮局可以保留最新的报纸供后来的订阅者查看。
MQTT协议应用
| 发送端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #include <iostream> #include "mqtt/client.h"
int main() { const std::string serverAddress = "tcp://broker.example.com:1883"; const std::string clientId = "mqtt_client_cpp"; const std::string topic = "test/topic"; const std::string payload = "Hello, MQTT from C++";
mqtt::async_client client(serverAddress, clientId);
mqtt::connect_options connOpts; connOpts.set_keep_alive_interval(20); connOpts.set_clean_session(true);
try { client.connect(connOpts)->wait(); std::cout << "Connected to MQTT broker." << std::endl;
mqtt::message_ptr pubmsg = mqtt::make_message(topic, payload); pubmsg->set_qos(1);
client.publish(pubmsg)->wait(); std::cout << "Message published." << std::endl;
client.disconnect()->wait(); std::cout << "Disconnected from MQTT broker." << std::endl; } catch (const mqtt::exception& exc) { std::cerr << "Error: " << exc.what() << std::endl; return 1; }
return 0; }
|
| 接受端代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| #include <iostream> #include "mqtt/client.h"
class MyCallback : public virtual mqtt::callback { mqtt::async_client& client; void connection_lost(const std::string& cause) override { std::cout << "Connection lost: " << cause << std::endl;
try { client.reconnect(); std::cout << "Reconnected to MQTT broker." << std::endl; } catch (const mqtt::exception& exc) { std::cerr << "Reconnection error: " << exc.what() << std::endl; }std::cout << "Connection lost: " << cause << std::endl; }
void delivery_complete(const mqtt::delivery_token_ptr& token) override { std::cout << "Message delivery completed" << std::endl; }
void message_arrived(const std::string& topic, mqtt::message_ptr msg) override { std::cout << "Message received on topic: " << topic << std::endl; std::cout << "Message content: " << msg->to_string() << std::endl; }
public: MyCallback(mqtt::async_client& c) : client(c) {} };
int main() { const std::string serverAddress = "tcp://broker.example.com:1883"; const std::string clientId = "mqtt_client_cpp"; const std::string topic = "test/topic";
mqtt::async_client client(serverAddress, clientId); client.set_callback(MyCallback(client));
mqtt::connect_options connOpts; connOpts.set_keep_alive_interval(20); connOpts.set_clean_session(true);
try { client.connect(connOpts)->wait(); std::cout << "Connected to MQTT broker." << std::endl;
client.subscribe(topic, 1)->wait(); std::cout << "Subscribed to topic: " << topic << std::endl;
while (true) { } } catch (const mqtt::exception& exc) { std::cerr << "Error: " << exc.what() << std::endl; return 1; }
return 0; }
|
接受端加入重连机制;
MQTT 协议入门:基础知识和快速教程
免费的公共 MQTT 服务器