Skip to content

高级特性

本文档介绍FDP-P2P的高级MQTT特性应用。

持久会话

持久会话允许低功耗设备间歇性连接而不丢失消息。

工作原理

关键配置

配置项说明
Clean Sessionfalse启用持久会话
Client ID固定值必须使用相同的Client ID恢复会话
QoS1 或 2QoS 0消息不会被存储

代码示例

javascript
// 设备端连接配置
const client = mqtt.connect('mqtt://broker.example.com', {
  clientId: 'device-001',  // 必须固定
  clean: false,            // 持久会话
});

// 首次连接后订阅
client.on('connect', () => {
  // 持久会话会保留订阅,但首次连接需要订阅
  client.subscribe('nodes/device-001/pending', { qos: 1 });
});

注意事项

Broker限制

Broker通常对持久会话有限制:

  • 离线消息数量上限
  • 消息大小限制
  • 消息保留时间

请查阅你所使用的Broker文档。

遗嘱消息

遗嘱消息用于在节点异常断开时自动通知其他节点。

工作原理

配置方式

javascript
const client = mqtt.connect('mqtt://broker.example.com', {
  clientId: 'device-001',
  will: {
    topic: 'nodes/device-001/status',
    payload: JSON.stringify({
      time: Math.floor(Date.now() / 1000),
      online: false
    }),
    qos: 1,
    retain: true  // 保留消息
  }
});

触发条件

场景是否触发遗嘱
网络故障✅ 触发
程序崩溃✅ 触发
Keep-alive超时✅ 触发
正常断开(DISCONNECT)❌ 不触发

最佳实践

  1. 正常上线时发布在线状态

    json
    {"time": 1700000000, "online": true, "version": "1.0.0"}
  2. 定期更新状态(如每分钟一次)

  3. 正常下线时主动发布离线状态

    javascript
    // 正常退出前
    client.publish('nodes/device-001/status',
      JSON.stringify({ time: now, online: false }),
      { retain: true }
    );
    client.end();  // 正常断开,不触发遗嘱
  4. 遗嘱作为兜底,确保异常情况也能通知

保留消息

保留消息让新订阅者立即获取最新状态。

工作原理

特点

  • 每个主题只能有一个保留消息
  • 新消息会替换旧消息
  • 发送空载荷可删除保留消息
  • 保留消息由Broker永久存储(直到被替换或删除)

适用场景

场景适合使用保留消息
设备在线状态✅ 是
设备当前配置✅ 是
任务消息❌ 否
进度更新❌ 否

删除保留消息

javascript
// 发送空载荷删除保留消息
client.publish('nodes/device-001/status', '', { retain: true });

QoS级别

级别说明

QoS名称保证适用场景
0最多一次可能丢失进度更新、日志
1至少一次不丢失,可能重复FDP默认
2恰好一次不丢失,不重复资金交易(开销大)

FDP推荐配置

消息类型推荐QoS
Pending1
Ack1
Progress0
Complete1
Failed1
Status1

QoS 1重复场景

QoS 1可能在以下情况产生重复消息:

  1. PUBACK确认包在网络中丢失
  2. 断开重连后Broker重传未确认消息
  3. 网络延迟导致超时重发

重复率参考

  • 稳定网络:< 1%
  • 不稳定网络(移动网络):5-10%

处理重复

FDP要求业务层实现幂等处理,参考任务生命周期 - 幂等性要求

消息顺序

MQTT不保证顺序

即使使用QoS 1,消息也可能乱序到达:

  • Broker处理重排序
  • 网络路由变化
  • 重传机制

顺序敏感场景处理

如果业务需要保证顺序:

方案1:序列号

json
{
  "msg_id": "...",
  "seq": 42,
  "action": "..."
}

接收方只处理seq > last_processed_seq的消息。

方案2:时间戳过滤

json
{
  "msg_id": "...",
  "time": 1700000042,
  "action": "set_config",
  "value": {...}
}

对于配置类操作,只应用时间戳最新的。

方案3:依赖链

json
{
  "msg_id": "msg-003",
  "depends_on": "msg-002",
  "action": "..."
}

等待依赖消息处理完成后再处理。

连接保活

Keep-alive机制

MQTT使用Keep-alive检测连接状态:

javascript
const client = mqtt.connect('mqtt://broker.example.com', {
  keepalive: 60,  // 秒
});
  • 客户端在keepalive时间内无消息时发送PINGREQ
  • Broker回复PINGRESP
  • 超过1.5倍keepalive时间无响应则认为断开

推荐值

场景Keep-alive
常驻设备60秒
移动应用30秒
低功耗设备300秒或更长

安全建议

传输加密

方式端口说明
TCP1883明文,仅用于开发
TLS8883生产环境必须
WebSocketws://.../明文
WSSwss://.../加密WebSocket

认证方式

  • 用户名/密码认证
  • 客户端证书(mTLS)
  • Token认证

端对端安全

TLS只保护传输通道,不能验证消息发送者身份。如需端对端安全,请使用FDP-Security安全协议

物联网设备通信协议文档