引言

MQTT(Message Queuing Telemetry Transport)是物联网领域最流行的轻量级消息协议。它基于发布/订阅模式,具有低功耗、低带宽、高可靠的特点,非常适合资源受限的嵌入式设备。

本文从协议原理到实战项目,带你全面掌握嵌入式 MQTT 开发。

MQTT 协议核心概念

1.1 架构模型

MQTT BrokerEMQX / MosquittoESP32 传感器发布:temp/humiditySTM32 控制器发布:status/alert手机 App订阅:temp/humidity云端服务订阅:所有主题本地网关订阅:status/alert
MQTT 发布/订阅架构

核心组件

  • Broker:消息中转站(EMQX、Mosquitto、HiveMQ)
  • Publisher:消息发布者(传感器、设备)
  • Subscriber:消息订阅者(App、云端、网关)
  • Topic:消息主题(如 home/living-room/temp

1.2 QoS 服务质量等级

QoS名称说明应用场景
0At most once最多一次,不保证到达传感器周期性数据
1At least once至少一次,可能重复控制指令、状态更新
2Exactly once恰好一次,最可靠计费、关键告警

1.3 Topic 命名规范

✅ 推荐格式:
Topic 示例说明
home/living-room/temperature设备类型/位置/参数
factory/line1/motor/speed工厂/产线/设备/参数
agric/greenhouse/01/humidity行业/区域/编号/参数
❌ 避免:
Topic 示例问题
temp太模糊,无法扩展
home/temp缺少层级
sensor_001/data语义不明确

开发环境搭建

2.1 Broker 选择

Broker语言特点适用场景
EMQXErlang高性能、分布式、支持百万连接企业级、云平台
MosquittoC轻量、稳定、易部署小型项目、本地测试
HiveMQJava企业级、扩展性强大型工业应用

2.2 使用 Docker 快速部署 EMQX

# 启动 EMQX Broker
docker run -d --name emqx \
  -p 1883:1883 \
  -p 8083:8083 \
  -p 8084:8084 \
  -p 8883:8883 \
  -p 18083:18083 \
  emqx/emqx:5.0

# 访问管理控制台:http://localhost:18083
# 默认账号:admin / public

2.3 客户端库选择

平台推荐库特点
ESP32 (Arduino)PubSubClient轻量、成熟
ESP32 (ESP-IDF)esp-mqtt官方支持、功能全
STM32 HALMQTT-C纯 C、可移植
Rust embeddedrumqtt异步、安全
Pythonpaho-mqtt功能丰富

ESP32 MQTT 实战

3.1 硬件连接

模块ESP32 引脚说明
DHT22 温湿度GPIO 4数字传感器
LED 指示灯GPIO 2状态指示
按键GPIO 0手动触发

3.2 Arduino 代码示例

#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>

// WiFi 配置
const char* ssid = "YourWiFi";
const char* password = "YourPassword";

// MQTT 配置
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* mqtt_user = "admin";
const char* mqtt_pass = "public";

// 主题配置
const char* topic_temp = "home/living-room/temperature";
const char* topic_humid = "home/living-room/humidity";
const char* topic_status = "home/living-room/status";

// 传感器配置
#define DHTPIN 4
#define DHTTYPE DHT22
DHT dht(DHTPIN, DHTTYPE);

WiFiClient espClient;
PubSubClient client(espClient);

// 回调函数(接收订阅消息)
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("收到消息 [");
  Serial.print(topic);
  Serial.print("]: ");
  
  String message;
  for (int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  Serial.println(message);
  
  // 处理控制指令
  if (String(topic) == "home/living-room/led/control") {
    if (message == "on") {
      digitalWrite(2, HIGH);
    } else if (message == "off") {
      digitalWrite(2, LOW);
    }
  }
}

// 重连逻辑
void reconnect() {
  while (!client.connected()) {
    Serial.print("尝试 MQTT 连接...");
    
    String clientId = "ESP32-Client-" + String(random(0xffff), HEX);
    if (client.connect(clientId.c_str(), mqtt_user, mqtt_pass)) {
      Serial.println("已连接!");
      
      // 订阅控制主题
      client.subscribe("home/living-room/led/control");
      
      // 发布在线状态
      client.publish(topic_status, "online");
    } else {
      Serial.print("失败,rc=");
      Serial.print(client.state());
      Serial.println(" 5 秒后重试");
      delay(5000);
    }
  }
}

void setup() {
  Serial.begin(115200);
  dht.begin();
  
  pinMode(2, OUTPUT);
  
  // 连接 WiFi
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("\nWiFi 已连接");
  
  // 配置 MQTT
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(callback);
}

void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();
  
  // 读取传感器数据
  float temp = dht.readTemperature();
  float humid = dht.readHumidity();
  
  if (!isnan(temp) && !isnan(humid)) {
    // 发布温湿度数据(QoS=1,保留消息)
    String tempStr = String(temp, 1);
    String humidStr = String(humid, 1);
    
    client.publish(topic_temp, tempStr.c_str(), true);
    client.publish(topic_humid, humidStr.c_str(), true);
    
    Serial.println("已发布:温度=" + tempStr + "°C, 湿度=" + humidStr + "%");
  }
  
  delay(10000);  // 每 10 秒发布一次
}

3.3 使用 ESP-IDF 官方 MQTT 库

#include "mqtt_client.h"
#include "esp_log.h"

static const char *TAG = "MQTT_EXAMPLE";

esp_mqtt_client_handle_t client;

static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
    switch (event->event_id) {
        case MQTT_EVENT_CONNECTED:
            ESP_LOGI(TAG, "MQTT 已连接");
            esp_mqtt_client_subscribe(client, "home/living-room/led/control", 1);
            break;
        case MQTT_EVENT_DISCONNECTED:
            ESP_LOGI(TAG, "MQTT 已断开");
            break;
        case MQTT_EVENT_SUBSCRIBED:
            ESP_LOGI(TAG, "订阅成功,msg_id=%d", event->msg_id);
            break;
        case MQTT_EVENT_DATA:
            ESP_LOGI(TAG, "收到主题:%.*s", event->topic_len, event->topic);
            ESP_LOGI(TAG, "数据:%.*s", event->data_len, event->data);
            break;
        default:
            break;
    }
    return ESP_OK;
}

void mqtt_app_start(void)
{
    esp_mqtt_client_config_t mqtt_cfg = {
        .broker.address.uri = "mqtt://192.168.1.100:1883",
        .credentials.username = "admin",
        .credentials.authentication.password = "public",
    };
    
    client = esp_mqtt_client_init(&mqtt_cfg);
    esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler_cb, NULL);
    esp_mqtt_client_start(client);
}

STM32 + MQTT 实战

4.1 使用 MQTT-C 库

#include "mqtt.h"
#include <stdio.h>
#include <string.h>
#include "stm32f4xx_hal.h"

// 网络层(基于 TCP)
int net_read(void *context, unsigned char *buf, int count) {
    return HAL_TCP_Read(context, buf, count, 1000);
}

int net_write(void *context, const unsigned char *buf, int count) {
    return HAL_TCP_Write(context, buf, count, 1000);
}

// MQTT 客户端
struct mqtt_client mqtt_client;
uint8_t sendbuf[512];
uint8_t recvbuf[512];

void mqtt_task(void *argument) {
    struct mqtt_response response;
    
    // 初始化
    mqtt_init(&mqtt_client, tcp_socket,
              sendbuf, sizeof(sendbuf),
              recvbuf, sizeof(recvbuf),
              publish_callback);
    
    // 连接 Broker
    mqtt_connect(&mqtt_client, "STM32-Client",
                 NULL, NULL, 0, NULL, NULL,
                 MQTT_CONNECT_CLEAN_SESSION,
                 60);
    
    while (1) {
        mqtt_sync(&mqtt_client);
        
        // 发布传感器数据
        char payload[32];
        sprintf(payload, "%.2f", read_sensor());
        mqtt_publish(&mqtt_client, "factory/sensor/01",
                     payload, strlen(payload), MQTT_QOS_1);
        
        HAL_Delay(5000);
    }
}

4.2 基于 FreeRTOS 的 MQTT 任务

// 发布任务
void mqtt_publish_task(void *argument) {
    for (;;) {
        float temp = read_temperature();
        float humid = read_humidity();
        
        char json_msg[128];
        sprintf(json_msg, "{\"temp\":%.2f,\"humid\":%.2f}", temp, humid);
        
        mqtt_publish(&mqtt_client, "home/sensor/data",
                     json_msg, strlen(json_msg), MQTT_QOS_1);
        
        vTaskDelay(pdMS_TO_TICKS(10000));
    }
}

// 订阅处理任务
void mqtt_subscribe_task(void *argument) {
    for (;;) {
        struct mqtt_response *resp = mqtt_response_queue_poll();
        if (resp != NULL) {
            if (resp->fixed_header.control_type == MQTT_CONTROL_PUBLISH) {
                handle_incoming_message(resp->publish.topic_name,
                                       resp->publish.payload,
                                       resp->publish.payload_size);
            }
        }
        vTaskDelay(pdMS_TO_TICKS(100));
    }
}

高级应用

5.1 TLS/SSL 加密连接

#include <WiFiClientSecure.h>

WiFiClientSecure espClient;
PubSubClient client(espClient);

void setup() {
  // 配置 CA 证书
  espClient.setCACert(root_ca_cert);
  
  // 配置客户端证书(双向认证)
  espClient.setCertificate(client_cert);
  espClient.setPrivateKey(client_key);
  
  client.setServer("mqtt.example.com", 8883);
}

5.2 遗嘱消息(Last Will)

// 设备异常离线时自动发布遗嘱消息
client.setWill(topic_status, "offline", true, 1);

client.connect("ESP32-Client", mqtt_user, mqtt_pass);

5.3 Retain 保留消息

// 发布保留消息(新订阅者立即收到最新值)
client.publish(topic_temp, "25.5", true, 1);

// 清除保留消息
client.publish(topic_temp, "", true, 1);

5.4 主题通配符

# 通配符 匹配任意层级
home/#  →  home/living-room/temp, home/bedroom/humidity
+ 通配符 匹配单层
home/+/temp  →  home/living-room/temp, home/bedroom/temp

完整项目:智能家居传感器节点

6.1 系统架构

ESP32 温湿度传感器DHT22 + 光照传感器ESP32 门窗传感器磁簧开关 + 震动检测ESP32 智能插座继电器 + 电量计量EMQX Broker本地部署 / 云端手机 AppiOS / AndroidWeb 管理后台Vue3 + Element Plus云端服务数据持久化 + AI 分析
智能家居 MQTT 系统架构

6.2 主题设计

📡 传感器数据
home/{room}/sensor/{type}
→ home/living-room/sensor/temperature
→ home/bedroom/sensor/humidity
📊 设备状态
home/{device}/status
→ home/light/status
→ home/ac/status
🎮 控制指令
home/{device}/control/{action}
→ home/light/control/on
→ home/ac/control/set_temp

6.3 数据格式(JSON)

// 传感器上报
{
  "device_id": "esp32-living-01",
  "timestamp": 1712012400,
  "data": {
    "temperature": 25.5,
    "humidity": 60.2,
    "light": 850
  }
}

// 控制指令
{
  "command": "set_temperature",
  "value": 26,
  "mode": "cool"
}

6.4 Node-RED 可视化

# 安装 Node-RED
docker run -d --name nodered \
  -p 1880:1880 \
  -v node_red_data:/data \
  nodered/node-red

# 安装 MQTT 节点
npm install node-red-contrib-mqtt

性能优化

7.1 降低功耗

// ESP32 深度睡眠 + MQTT
#include <esp_deep_sleep.h>

void loop() {
  // 读取并发布数据
  publish_sensor_data();
  
  // 进入深度睡眠(10 分钟)
  esp_deep_sleep(10 * 60 * 1000000);
}

7.2 减少流量

// 变化时发布(而非定时)
float last_temp = 0;
float current_temp = read_temperature();

if (abs(current_temp - last_temp) > 0.5) {
  client.publish(topic_temp, String(current_temp).c_str());
  last_temp = current_temp;
}

7.3 批量发布

// 多条消息打包
String batch_msg = "{\"sensors\":[";
for (int i = 0; i < sensor_count; i++) {
  batch_msg += sensors[i].toJson();
  if (i < sensor_count - 1) batch_msg += ",";
}
batch_msg += "]}";

client.publish("factory/batch", batch_msg.c_str());

常见问题

Q1: 连接频繁断开

原因:Keep Alive 时间过短或网络不稳定。

解决

client.setKeepAlive(120);  // 延长到 120 秒
client.setSocketTimeout(5000);

Q2: 消息丢失

原因:QoS 等级过低或 Broker 配置问题。

解决

  • 关键数据使用 QoS 1 或 QoS 2
  • 启用 Broker 消息持久化
  • 客户端实现本地消息队列

Q3: 内存不足

原因:缓冲区过大或连接数过多。

解决

// 减小缓冲区
PubSubClient client(espClient);
client.setBufferSize(256);  // 默认 128

总结

嵌入式 MQTT 开发要点:

  1. 选择合适 Broker:EMQX(企业级)、Mosquitto(轻量)
  2. 理解 QoS 等级:根据场景选择 0/1/2
  3. 规范 Topic 命名:层级清晰、语义明确
  4. 实现重连机制:确保连接可靠性
  5. 优化功耗流量:深度睡眠、变化发布

掌握 MQTT,让你的嵌入式设备轻松接入物联网!


本文基于 MQTT 3.1.1/5.0 协议和 2026 年主流嵌入式平台编写。

参考资料

  • MQTT 官方文档:https://mqtt.org/documentation
  • EMQX 文档:https://docs.emqx.com/
  • PubSubClient 库:https://pubsubclient.knolleary.net/
  • ESP-IDF MQTT:https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/protocols/mqtt.html