Appearance
MQTT实现指南
本文档提供FDP-P2P在MQTT传输层上的实现指南。
连接配置
基本参数
| 参数 | 推荐值 | 说明 |
|---|---|---|
| MQTT版本 | 3.1.1 或 5.0 | 5.0支持更多特性 |
| Clean Session | false | 启用持久会话 |
| QoS | 1 | 至少一次送达 |
| Client ID | 唯一且稳定 | 用于恢复持久会话 |
连接示例
javascript
const client = mqtt.connect('mqtt://broker.example.com', {
clientId: 'device-001',
username: 'device-001',
password: 'secret',
clean: false, // 持久会话
});主题映射
FDP-P2P主题与MQTT主题一一对应:
FDP主题 → MQTT主题
nodes/{username}/pending → nodes/{username}/pending
nodes/{username}/ack → nodes/{username}/ack
nodes/{username}/progress → nodes/{username}/progress
nodes/{username}/complete → nodes/{username}/complete
nodes/{username}/failed → nodes/{username}/failed
nodes/{username}/status → nodes/{username}/statusACL配置
安全原则
- 节点只能订阅自己的主题
- 节点可以发布到任何节点的主题
- 状态主题对所有节点可读
EMQX ACL规则
erlang
%% 允许节点订阅自己的所有主题
{allow, all, subscribe, ["nodes/${username}/+"]}.
%% 允许节点发布到任何节点的 pending 主题
{allow, all, publish, ["nodes/+/pending"]}.
%% 允许节点发布到任何节点的响应主题
{allow, all, publish, ["nodes/+/ack"]}.
{allow, all, publish, ["nodes/+/progress"]}.
{allow, all, publish, ["nodes/+/complete"]}.
{allow, all, publish, ["nodes/+/failed"]}.
%% 允许节点发布自己的状态
{allow, all, publish, ["nodes/${username}/status"]}.
%% 允许任何人订阅任何人的状态
{allow, all, subscribe, ["nodes/+/status"]}.
%% 拒绝其他所有操作
{deny, all}.Mosquitto ACL规则
# 用户只能订阅自己的主题
pattern read nodes/%u/#
# 用户可以发布到任何节点
pattern write nodes/+/pending
pattern write nodes/+/ack
pattern write nodes/+/progress
pattern write nodes/+/complete
pattern write nodes/+/failed
# 用户只能发布自己的状态
pattern write nodes/%u/status
# 任何人都可以读取状态
pattern read nodes/+/status消息发布
发送任务
javascript
const msg = {
sender: 'app-001',
receiver: 'device-001',
msg_id: uuid(),
action: 'unlock',
time: Math.floor(Date.now() / 1000),
exp: Math.floor(Date.now() / 1000) + 60,
value: { door: 'front' }
};
client.publish(
`nodes/${msg.receiver}/pending`,
JSON.stringify(msg),
{ qos: 1 }
);回复结果
javascript
// 发送完成消息
const complete = {
msg_id: receivedMsg.msg_id,
value: { status: 'unlocked' }
};
client.publish(
`nodes/${receivedMsg.sender}/complete`,
JSON.stringify(complete),
{ qos: 1 }
);发布状态(保留消息)
javascript
const status = {
time: Math.floor(Date.now() / 1000),
online: true,
battery: 85,
version: '1.2.0'
};
client.publish(
`nodes/${myUsername}/status`,
JSON.stringify(status),
{ qos: 1, retain: true } // 保留消息
);消息订阅
设备端订阅
只需接收任务的设备:
javascript
client.subscribe(`nodes/${myUsername}/pending`, { qos: 1 });
client.on('message', (topic, payload) => {
const msg = JSON.parse(payload);
// 检查过期
if (msg.exp < Date.now() / 1000) {
return; // 丢弃过期消息
}
// 处理任务
handleTask(msg);
});应用端订阅
需要接收响应的应用:
javascript
client.subscribe([
`nodes/${myUsername}/ack`,
`nodes/${myUsername}/progress`,
`nodes/${myUsername}/complete`,
`nodes/${myUsername}/failed`,
], { qos: 1 });命令行测试
使用 mosquitto_pub/sub
终端1 - 设备订阅任务:
bash
mosquitto_sub -h broker.example.com -t "nodes/device-001/pending" \
-u device-001 -P secret -q 1终端2 - 应用订阅响应:
bash
mosquitto_sub -h broker.example.com -t "nodes/app-001/complete" \
-u app-001 -P secret -q 1终端3 - 应用发送任务:
bash
mosquitto_pub -h broker.example.com -t "nodes/device-001/pending" \
-u app-001 -P secret -q 1 \
-m '{"sender":"app-001","receiver":"device-001","msg_id":"test-001","action":"ping","time":1700000000,"exp":9999999999}'终端4 - 设备回复完成:
bash
mosquitto_pub -h broker.example.com -t "nodes/app-001/complete" \
-u device-001 -P secret -q 1 \
-m '{"msg_id":"test-001","value":"pong"}'错误处理
连接断开
javascript
client.on('offline', () => {
console.log('MQTT连接断开');
// MQTT客户端会自动重连
// 持久会话保证不丢消息
});
client.on('reconnect', () => {
console.log('正在重连...');
});发布失败
javascript
client.publish(topic, payload, { qos: 1 }, (err) => {
if (err) {
console.error('发布失败:', err);
// 实现重试逻辑
}
});性能建议
消息大小
| 场景 | 建议 |
|---|---|
| 常规控制指令 | < 1KB |
| 状态上报 | < 500B |
| 配置下发 | < 10KB |
| 大数据传输 | 分片或使用其他通道 |
发布频率
| 消息类型 | 建议频率 |
|---|---|
| 任务消息 | 按需发送 |
| 进度更新 | ≤ 1次/秒 |
| 状态上报 | ≈ 1次/分钟 |
并发连接
- EMQX Serverless免费额度:约10设备24小时在线/月
- 自建Broker:根据服务器配置决定
