MQTT Agent 示例

本示例展示了如何使用 coreMQTT-Agent 库实现线程安全的 MQTT 客户端应用。

功能说明

  • 基于 coreMQTT-Agent 的线程安全 MQTT 操作

  • 使用 FreeRTOS 队列进行线程间消息传递

  • 支持异步订阅和发布操作

  • 命令完成回调机制

  • 多任务环境下的 MQTT 通信

硬件连接

本示例使用芯片内部 WiFi 外设,无需额外接线。

coreMQTT-Agent 简介

coreMQTT-Agent 是对 coreMQTT 库的线程安全封装,主要特性包括:

1. 线程安全

  • 所有 MQTT 操作通过命令队列序列化执行

  • 避免多线程环境下的竞态条件

  • 支持从多个任务中安全调用 MQTT API

2. 异步操作模型

  • 订阅/发布操作不会阻塞调用者

  • 通过回调函数通知操作完成

  • 可使用信号量实现同步等待

3. 专用 Agent 任务

  • 运行 MQTTAgent_CommandLoop() 处理所有 MQTT 操作

  • 统一管理网络收发和 MQTT 协议处理

  • 简化多线程 MQTT 应用开发

架构设计

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Publisher  │     │ Subscriber  │     │  Other Task │
│    Task     │     │    Task     │     │             │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                   │
       │   Command Queue   │   Command Queue   │
       ├───────────────────┼───────────────────┤
       │                   │                   │
       ▼                   ▼                   ▼
┌────────────────────────────────────────────────────┐
│           MQTT Agent Task (Command Loop)           │
│  ┌──────────────────────────────────────────────┐  │
│  │  MQTTAgent_CommandLoop()                     │  │
│  │  - 接收命令队列中的命令                       │  │
│  │  - 执行 MQTT 操作 (Subscribe/Publish/etc)    │  │
│  │  - 处理网络事件和消息                         │  │
│  │  - 调用完成回调                               │  │
│  └──────────────────────────────────────────────┘  │
└──────────────────────┬─────────────────────────────┘
                       │
                       ▼
              ┌─────────────────┐
              │  MQTT Broker    │
              └─────────────────┘

核心 API

Agent 初始化

MQTTStatus_t MQTTAgent_Init(
    MQTTAgentContext_t * pMqttAgentContext,
    const MQTTAgentMessageInterface_t * pMsgInterface,
    const MQTTFixedBuffer_t * pNetworkBuffer,
    const TransportInterface_t * pTransportInterface,
    MQTTGetCurrentTimeFunc_t getCurrentTimeMs,
    IncomingPubCallback_t incomingCallback,
    void * pIncomingPacketContext
);

Agent 命令循环 (在专用任务中运行)

MQTTStatus_t MQTTAgent_CommandLoop(MQTTAgentContext_t * pMqttAgentContext);

线程安全的订阅操作

MQTTStatus_t MQTTAgent_Subscribe(
    const MQTTAgentContext_t * pMqttAgentContext,
    MQTTAgentSubscribeArgs_t * pSubscriptionArgs,
    MQTTAgentCommandInfo_t * pCommandInfo
);

线程安全的发布操作

MQTTStatus_t MQTTAgent_Publish(
    const MQTTAgentContext_t * pMqttAgentContext,
    MQTTPublishInfo_t * pPublishInfo,
    MQTTAgentCommandInfo_t * pCommandInfo
);

示例代码说明

1. 消息接口实现

示例实现了 MQTTAgentMessageInterface_t 结构,使用 FreeRTOS 队列:

MQTTAgentMessageInterface_t messageInterface = {
    .pMsgCtx = &g_command_queue_context,
    .send = agent_send_command,        // 发送命令到队列
    .recv = agent_recv_command,        // 从队列接收命令
    .getCommand = agent_get_command,   // 获取空闲命令结构
    .releaseCommand = agent_release_command  // 释放命令结构
};

2. Agent 任务

运行 MQTTAgent_CommandLoop() 的专用任务:

static void mqtt_agent_task(void *arg)
{
    MQTTStatus_t status = MQTTAgent_CommandLoop(&g_agent_context);
    // 命令循环退出表示连接断开或发生错误
}

3. 异步订阅

从任意任务中调用,不会阻塞:

MQTTAgentSubscribeArgs_t subscribeArgs = {
    .pSubscribeInfo = subscribeInfo,
    .numSubscriptions = 1
};

MQTTAgentCommandInfo_t commandInfo = {
    .cmdCompleteCallback = subscribe_command_callback,
    .pCmdCompleteCallbackContext = &subscribe_sem
};

MQTTAgent_Subscribe(&g_agent_context, &subscribeArgs, &commandInfo);

4. 异步发布

支持 QoS 0/1/2,使用回调通知完成:

MQTTPublishInfo_t publishInfo = {
    .qos = MQTTQoS1,
    .pTopicName = "/test/topic",
    .topicNameLength = strlen("/test/topic"),
    .pPayload = "Hello from Agent!",
    .payloadLength = strlen("Hello from Agent!")
};

MQTTAgentCommandInfo_t commandInfo = {
    .cmdCompleteCallback = publish_command_callback,
    .pCmdCompleteCallbackContext = &publish_sem
};

MQTTAgent_Publish(&g_agent_context, &publishInfo, &commandInfo);

5. 命令完成回调

操作完成后在 Agent 任务上下文中调用:

static void subscribe_command_callback(MQTTAgentCommandContext_t *pCommandContext,
                                       MQTTAgentReturnInfo_t *pReturnInfo)
{
    SemaphoreHandle_t sem = (SemaphoreHandle_t)pCommandContext;

    if (pReturnInfo->returnCode == MQTTSuccess) {
        CLOG("Subscribe command succeeded");
    } else {
        CLOG("Subscribe command failed: %d", pReturnInfo->returnCode);
    }

    xSemaphoreGive(sem);  // 通知等待的任务
}

配置说明

prj.conf 配置

# 启用 coreMQTT 基础库
CONFIG_SDK_MODULE_COREMQTT=y

# 启用 coreMQTT-Agent (线程安全封装)
CONFIG_SDK_MODULE_COREMQTT_AGENT=y

网络配置

修改 main.c 中的连接参数:

#define MQTT_BROKER_ADDR   "broker.emqx.io"  // MQTT Broker 地址
#define MQTT_BROKER_PORT   "1883"            // MQTT Broker 端口
#define MQTT_CLIENT_ID     "arcs_mqtt_agent_client"  // 客户端 ID

示例内容

  1. 初始化文件系统

  2. 连接 WiFi 网络

  3. 连接到 MQTT Broker

  4. 启动 Agent 任务

  5. 订阅主题并定期发布消息

编译

重要提示:在编译前,请先确认您使用的开发板型号。SDK 目前支持以下开发板:

  • arcs_evb - ARCS EVB 评估板

  • arcs_mini - ARCS Mini 开发板

根据您的开发板型号,选择对应的编译命令:

在示例目录下执行编译:

# 使用 arcs_evb 开发板
./build.sh -C -DBOARD=arcs_evb

# 或使用 arcs_mini 开发板
./build.sh -C -DBOARD=arcs_mini

Note

如果在 SDK 根目录执行,需要指定示例路径:

# 使用 arcs_evb 开发板
./build.sh -C -S samples/<示例路径> -DBOARD=arcs_evb

# 或使用 arcs_mini 开发板
./build.sh -C -S samples/<示例路径> -DBOARD=arcs_mini

Note

确保已安装对应的工具链。

烧录

编译完成后,使用 SDK tools 目录下的 cskburn 工具烧录固件:

./tools/burn/cskburn -s /dev/ttyUSB0 -b 3000000 0x0 build/arcs.bin -C arcs

Note

烧录参数说明

  • -s /dev/ttyUSB0:串口设备路径,需要根据实际情况修改 - Linux 系统:通常是 /dev/ttyUSB0/dev/ttyACM0 - 可通过 ls /dev/tty* 命令查看可用串口设备 - 不同开发板或 USB 转串口芯片可能使用不同的设备名

  • -b 3000000:烧录波特率(3Mbps)

  • 0x0:烧录起始地址

  • build/arcs.bin:编译生成的固件路径

  • -C arcs:芯片类型

注意事项

  • 确保开发板已正确连接到电脑

  • 如果无法识别串口设备,请检查 USB 连接线是否正常,或尝试其他 USB 端口

配置 WiFi

在设备串口终端中执行:

# 配置 WiFi SSID
kv set wifi_ssid "your_wifi_name"

# 配置 WiFi 密码
kv set wifi_passwd "your_wifi_password"

# 重启设备
reboot

预期输出

设备启动后会自动:

  1. 初始化文件系统

  2. 连接 WiFi 网络

  3. 连接到 MQTT Broker

  4. 启动 Agent 任务

  5. 订阅 /test/topic 主题

  6. 定期发布消息到 /test/topic

与标准 coreMQTT 的对比

特性

coreMQTT

coreMQTT-Agent

线程安全

否,需要手动加锁

是,内置线程安全

阻塞模式

同步阻塞

异步非阻塞

多任务支持

需要手动同步

原生支持

复杂度

较低

中等

适用场景

单线程应用

多线程 RTOS 应用

性能开销

较小

略高(队列和回调)

测试方法

使用 MQTTX 订阅消息

# 订阅设备发布的消息
mqttx sub -h broker.emqx.io -p 1883 -t "arcs/test/pub"

使用 MQTTX 发布消息

# 向设备发布消息
mqttx pub -h broker.emqx.io -p 1883 -t "arcs/test/sub" -m "Hello ARCS Mqtt Agent"

设备会收到消息并在串口打印。

注意事项

  1. 任务优先级: Agent 任务应有足够高的优先级,确保及时处理网络事件

  2. 栈大小: Agent 任务需要较大的栈空间(建议 ≥ 4KB)

  3. 队列深度: 命令队列深度根据并发命令数量调整

  4. 回调上下文: 回调函数在 Agent 任务中执行,避免长时间阻塞操作

  5. 命令池: 示例使用静态命令池,可根据需要调整大小

故障排除

编译错误: 找不到 core_mqtt_agent.h

确保 prj.conf 中启用了:

CONFIG_SDK_MODULE_COREMQTT_AGENT=y

Agent 命令循环退出

检查:

  • 网络连接是否稳定

  • Broker 地址和端口是否正确

  • Keep-alive 超时设置是否合理

订阅/发布超时

  • 检查命令队列是否已满

  • 确认 Agent 任务正常运行

  • 验证网络连接状态

参考资料