引言
MQTT(Message Queuing Telemetry Transport)是物联网领域最流行的轻量级消息协议。它基于发布/订阅模式,具有低功耗、低带宽、高可靠的特点,非常适合资源受限的嵌入式设备。
本文从协议原理到实战项目,带你全面掌握嵌入式 MQTT 开发。
MQTT 协议核心概念
1.1 架构模型
核心组件:
- Broker:消息中转站(EMQX、Mosquitto、HiveMQ)
- Publisher:消息发布者(传感器、设备)
- Subscriber:消息订阅者(App、云端、网关)
- Topic:消息主题(如
home/living-room/temp)
1.2 QoS 服务质量等级
| QoS | 名称 | 说明 | 应用场景 |
|---|---|---|---|
| 0 | At most once | 最多一次,不保证到达 | 传感器周期性数据 |
| 1 | At least once | 至少一次,可能重复 | 控制指令、状态更新 |
| 2 | Exactly once | 恰好一次,最可靠 | 计费、关键告警 |
1.3 Topic 命名规范
✅ 推荐格式:
| Topic 示例 | 说明 |
|---|---|
| home/living-room/temperature | 设备类型/位置/参数 |
| factory/line1/motor/speed | 工厂/产线/设备/参数 |
| agric/greenhouse/01/humidity | 行业/区域/编号/参数 |
❌ 避免:
| Topic 示例 | 问题 |
|---|---|
| temp | 太模糊,无法扩展 |
| home/temp | 缺少层级 |
| sensor_001/data | 语义不明确 |
开发环境搭建
2.1 Broker 选择
| Broker | 语言 | 特点 | 适用场景 |
|---|---|---|---|
| EMQX | Erlang | 高性能、分布式、支持百万连接 | 企业级、云平台 |
| Mosquitto | C | 轻量、稳定、易部署 | 小型项目、本地测试 |
| HiveMQ | Java | 企业级、扩展性强 | 大型工业应用 |
2.2 使用 Docker 快速部署 EMQX
# 启动 EMQX Broker
docker run -d --name emqx \
-p 1883:1883 \
-p 8083:8083 \
-p 8084:8084 \
-p 8883:8883 \
-p 18083:18083 \
emqx/emqx:5.0
# 访问管理控制台:http://localhost:18083
# 默认账号:admin / public
2.3 客户端库选择
| 平台 | 推荐库 | 特点 |
|---|---|---|
| ESP32 (Arduino) | PubSubClient | 轻量、成熟 |
| ESP32 (ESP-IDF) | esp-mqtt | 官方支持、功能全 |
| STM32 HAL | MQTT-C | 纯 C、可移植 |
| Rust embedded | rumqtt | 异步、安全 |
| Python | paho-mqtt | 功能丰富 |
ESP32 MQTT 实战
3.1 硬件连接
| 模块 | ESP32 引脚 | 说明 |
|---|---|---|
| DHT22 温湿度 | GPIO 4 | 数字传感器 |
| LED 指示灯 | GPIO 2 | 状态指示 |
| 按键 | GPIO 0 | 手动触发 |
3.2 Arduino 代码示例
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
// WiFi 配置
const char* ssid = "YourWiFi";
const char* password = "YourPassword";
// MQTT 配置
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* mqtt_user = "admin";
const char* mqtt_pass = "public";
// 主题配置
const char* topic_temp = "home/living-room/temperature";
const char* topic_humid = "home/living-room/humidity";
const char* topic_status = "home/living-room/status";
// 传感器配置
#define DHTPIN 4
#define DHTTYPE DHT22
DHT dht(DHTPIN, DHTTYPE);
WiFiClient espClient;
PubSubClient client(espClient);
// 回调函数(接收订阅消息)
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("收到消息 [");
Serial.print(topic);
Serial.print("]: ");
String message;
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.println(message);
// 处理控制指令
if (String(topic) == "home/living-room/led/control") {
if (message == "on") {
digitalWrite(2, HIGH);
} else if (message == "off") {
digitalWrite(2, LOW);
}
}
}
// 重连逻辑
void reconnect() {
while (!client.connected()) {
Serial.print("尝试 MQTT 连接...");
String clientId = "ESP32-Client-" + String(random(0xffff), HEX);
if (client.connect(clientId.c_str(), mqtt_user, mqtt_pass)) {
Serial.println("已连接!");
// 订阅控制主题
client.subscribe("home/living-room/led/control");
// 发布在线状态
client.publish(topic_status, "online");
} else {
Serial.print("失败,rc=");
Serial.print(client.state());
Serial.println(" 5 秒后重试");
delay(5000);
}
}
}
void setup() {
Serial.begin(115200);
dht.begin();
pinMode(2, OUTPUT);
// 连接 WiFi
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("\nWiFi 已连接");
// 配置 MQTT
client.setServer(mqtt_server, mqtt_port);
client.setCallback(callback);
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
// 读取传感器数据
float temp = dht.readTemperature();
float humid = dht.readHumidity();
if (!isnan(temp) && !isnan(humid)) {
// 发布温湿度数据(QoS=1,保留消息)
String tempStr = String(temp, 1);
String humidStr = String(humid, 1);
client.publish(topic_temp, tempStr.c_str(), true);
client.publish(topic_humid, humidStr.c_str(), true);
Serial.println("已发布:温度=" + tempStr + "°C, 湿度=" + humidStr + "%");
}
delay(10000); // 每 10 秒发布一次
}
3.3 使用 ESP-IDF 官方 MQTT 库
#include "mqtt_client.h"
#include "esp_log.h"
static const char *TAG = "MQTT_EXAMPLE";
esp_mqtt_client_handle_t client;
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
switch (event->event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT 已连接");
esp_mqtt_client_subscribe(client, "home/living-room/led/control", 1);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT 已断开");
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "订阅成功,msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "收到主题:%.*s", event->topic_len, event->topic);
ESP_LOGI(TAG, "数据:%.*s", event->data_len, event->data);
break;
default:
break;
}
return ESP_OK;
}
void mqtt_app_start(void)
{
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = "mqtt://192.168.1.100:1883",
.credentials.username = "admin",
.credentials.authentication.password = "public",
};
client = esp_mqtt_client_init(&mqtt_cfg);
esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler_cb, NULL);
esp_mqtt_client_start(client);
}
STM32 + MQTT 实战
4.1 使用 MQTT-C 库
#include "mqtt.h"
#include <stdio.h>
#include <string.h>
#include "stm32f4xx_hal.h"
// 网络层(基于 TCP)
int net_read(void *context, unsigned char *buf, int count) {
return HAL_TCP_Read(context, buf, count, 1000);
}
int net_write(void *context, const unsigned char *buf, int count) {
return HAL_TCP_Write(context, buf, count, 1000);
}
// MQTT 客户端
struct mqtt_client mqtt_client;
uint8_t sendbuf[512];
uint8_t recvbuf[512];
void mqtt_task(void *argument) {
struct mqtt_response response;
// 初始化
mqtt_init(&mqtt_client, tcp_socket,
sendbuf, sizeof(sendbuf),
recvbuf, sizeof(recvbuf),
publish_callback);
// 连接 Broker
mqtt_connect(&mqtt_client, "STM32-Client",
NULL, NULL, 0, NULL, NULL,
MQTT_CONNECT_CLEAN_SESSION,
60);
while (1) {
mqtt_sync(&mqtt_client);
// 发布传感器数据
char payload[32];
sprintf(payload, "%.2f", read_sensor());
mqtt_publish(&mqtt_client, "factory/sensor/01",
payload, strlen(payload), MQTT_QOS_1);
HAL_Delay(5000);
}
}
4.2 基于 FreeRTOS 的 MQTT 任务
// 发布任务
void mqtt_publish_task(void *argument) {
for (;;) {
float temp = read_temperature();
float humid = read_humidity();
char json_msg[128];
sprintf(json_msg, "{\"temp\":%.2f,\"humid\":%.2f}", temp, humid);
mqtt_publish(&mqtt_client, "home/sensor/data",
json_msg, strlen(json_msg), MQTT_QOS_1);
vTaskDelay(pdMS_TO_TICKS(10000));
}
}
// 订阅处理任务
void mqtt_subscribe_task(void *argument) {
for (;;) {
struct mqtt_response *resp = mqtt_response_queue_poll();
if (resp != NULL) {
if (resp->fixed_header.control_type == MQTT_CONTROL_PUBLISH) {
handle_incoming_message(resp->publish.topic_name,
resp->publish.payload,
resp->publish.payload_size);
}
}
vTaskDelay(pdMS_TO_TICKS(100));
}
}
高级应用
5.1 TLS/SSL 加密连接
#include <WiFiClientSecure.h>
WiFiClientSecure espClient;
PubSubClient client(espClient);
void setup() {
// 配置 CA 证书
espClient.setCACert(root_ca_cert);
// 配置客户端证书(双向认证)
espClient.setCertificate(client_cert);
espClient.setPrivateKey(client_key);
client.setServer("mqtt.example.com", 8883);
}
5.2 遗嘱消息(Last Will)
// 设备异常离线时自动发布遗嘱消息
client.setWill(topic_status, "offline", true, 1);
client.connect("ESP32-Client", mqtt_user, mqtt_pass);
5.3 Retain 保留消息
// 发布保留消息(新订阅者立即收到最新值)
client.publish(topic_temp, "25.5", true, 1);
// 清除保留消息
client.publish(topic_temp, "", true, 1);
5.4 主题通配符
# 通配符
匹配任意层级
home/# → home/living-room/temp, home/bedroom/humidity
+ 通配符
匹配单层
home/+/temp → home/living-room/temp, home/bedroom/temp
完整项目:智能家居传感器节点
6.1 系统架构
6.2 主题设计
📡 传感器数据
home/{room}/sensor/{type}
→ home/living-room/sensor/temperature
→ home/bedroom/sensor/humidity
→ home/bedroom/sensor/humidity
📊 设备状态
home/{device}/status
→ home/light/status
→ home/ac/status
→ home/ac/status
🎮 控制指令
home/{device}/control/{action}
→ home/light/control/on
→ home/ac/control/set_temp
→ home/ac/control/set_temp
6.3 数据格式(JSON)
// 传感器上报
{
"device_id": "esp32-living-01",
"timestamp": 1712012400,
"data": {
"temperature": 25.5,
"humidity": 60.2,
"light": 850
}
}
// 控制指令
{
"command": "set_temperature",
"value": 26,
"mode": "cool"
}
6.4 Node-RED 可视化
# 安装 Node-RED
docker run -d --name nodered \
-p 1880:1880 \
-v node_red_data:/data \
nodered/node-red
# 安装 MQTT 节点
npm install node-red-contrib-mqtt
性能优化
7.1 降低功耗
// ESP32 深度睡眠 + MQTT
#include <esp_deep_sleep.h>
void loop() {
// 读取并发布数据
publish_sensor_data();
// 进入深度睡眠(10 分钟)
esp_deep_sleep(10 * 60 * 1000000);
}
7.2 减少流量
// 变化时发布(而非定时)
float last_temp = 0;
float current_temp = read_temperature();
if (abs(current_temp - last_temp) > 0.5) {
client.publish(topic_temp, String(current_temp).c_str());
last_temp = current_temp;
}
7.3 批量发布
// 多条消息打包
String batch_msg = "{\"sensors\":[";
for (int i = 0; i < sensor_count; i++) {
batch_msg += sensors[i].toJson();
if (i < sensor_count - 1) batch_msg += ",";
}
batch_msg += "]}";
client.publish("factory/batch", batch_msg.c_str());
常见问题
Q1: 连接频繁断开
原因:Keep Alive 时间过短或网络不稳定。
解决:
client.setKeepAlive(120); // 延长到 120 秒
client.setSocketTimeout(5000);
Q2: 消息丢失
原因:QoS 等级过低或 Broker 配置问题。
解决:
- 关键数据使用 QoS 1 或 QoS 2
- 启用 Broker 消息持久化
- 客户端实现本地消息队列
Q3: 内存不足
原因:缓冲区过大或连接数过多。
解决:
// 减小缓冲区
PubSubClient client(espClient);
client.setBufferSize(256); // 默认 128
总结
嵌入式 MQTT 开发要点:
- 选择合适 Broker:EMQX(企业级)、Mosquitto(轻量)
- 理解 QoS 等级:根据场景选择 0/1/2
- 规范 Topic 命名:层级清晰、语义明确
- 实现重连机制:确保连接可靠性
- 优化功耗流量:深度睡眠、变化发布
掌握 MQTT,让你的嵌入式设备轻松接入物联网!
本文基于 MQTT 3.1.1/5.0 协议和 2026 年主流嵌入式平台编写。
参考资料
- MQTT 官方文档:https://mqtt.org/documentation
- EMQX 文档:https://docs.emqx.com/
- PubSubClient 库:https://pubsubclient.knolleary.net/
- ESP-IDF MQTT:https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/protocols/mqtt.html