Skip to content

MQTT实现指南

本文档提供FDP-P2P在MQTT传输层上的实现指南。

连接配置

基本参数

参数推荐值说明
MQTT版本3.1.1 或 5.05.0支持更多特性
Clean Sessionfalse启用持久会话
QoS1至少一次送达
Client ID唯一且稳定用于恢复持久会话

连接示例

javascript
const client = mqtt.connect('mqtt://broker.example.com', {
  clientId: 'device-001',
  username: 'device-001',
  password: 'secret',
  clean: false,  // 持久会话
});

主题映射

FDP-P2P主题与MQTT主题一一对应:

FDP主题                    → MQTT主题
nodes/{username}/pending   → nodes/{username}/pending
nodes/{username}/ack       → nodes/{username}/ack
nodes/{username}/progress  → nodes/{username}/progress
nodes/{username}/complete  → nodes/{username}/complete
nodes/{username}/failed    → nodes/{username}/failed
nodes/{username}/status    → nodes/{username}/status

ACL配置

安全原则

  1. 节点只能订阅自己的主题
  2. 节点可以发布到任何节点的主题
  3. 状态主题对所有节点可读

EMQX ACL规则

erlang
%% 允许节点订阅自己的所有主题
{allow, all, subscribe, ["nodes/${username}/+"]}.

%% 允许节点发布到任何节点的 pending 主题
{allow, all, publish, ["nodes/+/pending"]}.

%% 允许节点发布到任何节点的响应主题
{allow, all, publish, ["nodes/+/ack"]}.
{allow, all, publish, ["nodes/+/progress"]}.
{allow, all, publish, ["nodes/+/complete"]}.
{allow, all, publish, ["nodes/+/failed"]}.

%% 允许节点发布自己的状态
{allow, all, publish, ["nodes/${username}/status"]}.

%% 允许任何人订阅任何人的状态
{allow, all, subscribe, ["nodes/+/status"]}.

%% 拒绝其他所有操作
{deny, all}.

Mosquitto ACL规则

# 用户只能订阅自己的主题
pattern read nodes/%u/#

# 用户可以发布到任何节点
pattern write nodes/+/pending
pattern write nodes/+/ack
pattern write nodes/+/progress
pattern write nodes/+/complete
pattern write nodes/+/failed

# 用户只能发布自己的状态
pattern write nodes/%u/status

# 任何人都可以读取状态
pattern read nodes/+/status

消息发布

发送任务

javascript
const msg = {
  sender: 'app-001',
  receiver: 'device-001',
  msg_id: uuid(),
  action: 'unlock',
  time: Math.floor(Date.now() / 1000),
  exp: Math.floor(Date.now() / 1000) + 60,
  value: { door: 'front' }
};

client.publish(
  `nodes/${msg.receiver}/pending`,
  JSON.stringify(msg),
  { qos: 1 }
);

回复结果

javascript
// 发送完成消息
const complete = {
  msg_id: receivedMsg.msg_id,
  value: { status: 'unlocked' }
};

client.publish(
  `nodes/${receivedMsg.sender}/complete`,
  JSON.stringify(complete),
  { qos: 1 }
);

发布状态(保留消息)

javascript
const status = {
  time: Math.floor(Date.now() / 1000),
  online: true,
  battery: 85,
  version: '1.2.0'
};

client.publish(
  `nodes/${myUsername}/status`,
  JSON.stringify(status),
  { qos: 1, retain: true }  // 保留消息
);

消息订阅

设备端订阅

只需接收任务的设备:

javascript
client.subscribe(`nodes/${myUsername}/pending`, { qos: 1 });

client.on('message', (topic, payload) => {
  const msg = JSON.parse(payload);

  // 检查过期
  if (msg.exp < Date.now() / 1000) {
    return;  // 丢弃过期消息
  }

  // 处理任务
  handleTask(msg);
});

应用端订阅

需要接收响应的应用:

javascript
client.subscribe([
  `nodes/${myUsername}/ack`,
  `nodes/${myUsername}/progress`,
  `nodes/${myUsername}/complete`,
  `nodes/${myUsername}/failed`,
], { qos: 1 });

命令行测试

使用 mosquitto_pub/sub

终端1 - 设备订阅任务:

bash
mosquitto_sub -h broker.example.com -t "nodes/device-001/pending" \
  -u device-001 -P secret -q 1

终端2 - 应用订阅响应:

bash
mosquitto_sub -h broker.example.com -t "nodes/app-001/complete" \
  -u app-001 -P secret -q 1

终端3 - 应用发送任务:

bash
mosquitto_pub -h broker.example.com -t "nodes/device-001/pending" \
  -u app-001 -P secret -q 1 \
  -m '{"sender":"app-001","receiver":"device-001","msg_id":"test-001","action":"ping","time":1700000000,"exp":9999999999}'

终端4 - 设备回复完成:

bash
mosquitto_pub -h broker.example.com -t "nodes/app-001/complete" \
  -u device-001 -P secret -q 1 \
  -m '{"msg_id":"test-001","value":"pong"}'

错误处理

连接断开

javascript
client.on('offline', () => {
  console.log('MQTT连接断开');
  // MQTT客户端会自动重连
  // 持久会话保证不丢消息
});

client.on('reconnect', () => {
  console.log('正在重连...');
});

发布失败

javascript
client.publish(topic, payload, { qos: 1 }, (err) => {
  if (err) {
    console.error('发布失败:', err);
    // 实现重试逻辑
  }
});

性能建议

消息大小

场景建议
常规控制指令< 1KB
状态上报< 500B
配置下发< 10KB
大数据传输分片或使用其他通道

发布频率

消息类型建议频率
任务消息按需发送
进度更新≤ 1次/秒
状态上报≈ 1次/分钟

并发连接

  • EMQX Serverless免费额度:约10设备24小时在线/月
  • 自建Broker:根据服务器配置决定

物联网设备通信协议文档