From 4fa63c83d9e83405c22dfebf64fdab6e90bc6a3b Mon Sep 17 00:00:00 2001 From: mculover666 <2412828003@qq.com> Date: Sun, 30 Aug 2020 15:27:43 +0800 Subject: [PATCH] improve cstx board demo --- .../mqttclient_iot_explorer_ds18b20_relay.c | 230 +++++++++++------- .../TOS_CONFIG/tos_config.h | 2 + 2 files changed, 148 insertions(+), 84 deletions(-) diff --git a/board/CSTX_STM32F103RE_DTU/KEIL/mqttclient_iot_explorer_ds18b20_relay/demo/mqttclient_iot_explorer_ds18b20_relay.c b/board/CSTX_STM32F103RE_DTU/KEIL/mqttclient_iot_explorer_ds18b20_relay/demo/mqttclient_iot_explorer_ds18b20_relay.c index 4d7cb46e..3fd6865c 100644 --- a/board/CSTX_STM32F103RE_DTU/KEIL/mqttclient_iot_explorer_ds18b20_relay/demo/mqttclient_iot_explorer_ds18b20_relay.c +++ b/board/CSTX_STM32F103RE_DTU/KEIL/mqttclient_iot_explorer_ds18b20_relay/demo/mqttclient_iot_explorer_ds18b20_relay.c @@ -27,19 +27,63 @@ void mqtt_set_esp8266_port(hal_uart_port_t port) { } #endif -k_event_t report_result_event; -k_event_flag_t report_success = 1<<0; -k_event_flag_t report_fail = 1<<1; - +/* 数据模板 */ #define REPORT_DATA_TEMPLATE "{\"method\":\"report\",\"clientToken\":\"00000001\",\"params\":{\"TempValue\":%d.%-2d}}" #define CONTROL_REPLY_DATA "{\"method\":\"control_reply\",\"clientToken\":\"%s\",\"code\":0,\"status\":\"success\"}" +/* 接收数据最大缓存条数 */ +#define MESSAGE_MAX 5 + +/* 解析任务参数 */ +#define PAYLOAD_PARSER_TASK_PRIO 4 +#define PAYLOAD_PARSER_TASK_SIZE 1024 + +typedef struct payload_mail_st { + void* client; + void* payload; +} payload_mail_t; + +k_mail_q_t mail_q; +uint8_t mail_pool[MESSAGE_MAX * sizeof(payload_mail_t *)]; + char report_buf[200]; char reply_buf[200]; +k_event_t report_result_event; +k_event_flag_t report_success = 1<<0; +k_event_flag_t report_fail = 1<<1; + +k_mutex_t mqtt_publish_mutex; + +k_task_t payload_parser_task; +k_stack_t payload_parser_task_stack[PAYLOAD_PARSER_TASK_SIZE]; + static void tos_topic_handler(void* client, message_data_t* msg) { - (void) client; + char* payload_ptr = NULL; + payload_mail_t payload_mail; + + /* 打印日志 */ + MQTT_LOG_I("-----------------------------------------------------------------------------------"); + MQTT_LOG_I("%s:%d %s()...\ntopic: %s, qos: %d. \nmessage:\n\t%s\n", __FILE__, __LINE__, __FUNCTION__, + msg->topic_name, msg->message->qos, (char*)msg->message->payload); + MQTT_LOG_I("-----------------------------------------------------------------------------------\n"); + + /* 申请动态内存空间,存放payload */ + payload_ptr = (char*)tos_mmheap_alloc(msg->message->payloadlen); + strcpy(payload_ptr, (char*)msg->message->payload); + + payload_mail.client = client; + payload_mail.payload = payload_ptr; + + /* 发送给解析任务 */ + tos_mail_q_post(&mail_q, &payload_mail, sizeof(payload_mail_t)); + + return; +} + +void msg_payload_parser_task(void *args) +{ cJSON* cjson_root = NULL; cJSON* cjson_status = NULL; cJSON* cjson_method = NULL; @@ -50,97 +94,103 @@ static void tos_topic_handler(void* client, message_data_t* msg) char* method = NULL; char* client_token = NULL; int relay_status = 0; - k_event_flag_t event_flag = report_fail; - mqtt_message_t reply_msg; + int error; - - /* 打印日志 */ - MQTT_LOG_I("-----------------------------------------------------------------------------------"); - MQTT_LOG_I("%s:%d %s()...\ntopic: %s, qos: %d. \nmessage:\n\t%s\n", __FILE__, __LINE__, __FUNCTION__, - msg->topic_name, msg->message->qos, (char*)msg->message->payload); - MQTT_LOG_I("-----------------------------------------------------------------------------------\n"); + size_t payload_mail_size; + mqtt_message_t reply_msg; + payload_mail_t payload_mail; + k_event_flag_t event_flag = report_fail; - /* 使用cjson解析上报响应数据 */ - cjson_root = cJSON_Parse((char*)msg->message->payload); - if (cjson_root == NULL) { - printf("subscribe message parser fail\r\n"); - event_flag = report_fail; - goto exit; - } - - /* 提取消息类型 */ - cjson_method = cJSON_GetObjectItem(cjson_root, "method"); - method = cJSON_GetStringValue(cjson_method); - - /* 判断是哪种类型的消息 */ - if (strstr(method, "report_reply")) { - - //数据上报响应报文,提取status状态值 - cjson_status = cJSON_GetObjectItem(cjson_root, "status"); - status = cJSON_GetStringValue(cjson_status); - if (cjson_status == NULL || status == NULL) { - printf("report reply status parser fail\r\n"); - event_flag = report_fail; - goto exit; - } - - //判断status状态 - if (strstr(status,"success")) { - event_flag = report_success; - }else { - event_flag = report_fail; - } - } else if (strstr(method, "control")) { - - //收到平台下发的控制报文,提取client_token,用于上报响应 - cjson_client_token = cJSON_GetObjectItem(cjson_root, "clientToken"); - client_token = cJSON_GetStringValue(cjson_client_token); - - printf("parse client token:%s\r\n", client_token); - - //提取 relay_status - cjson_params = cJSON_GetObjectItem(cjson_root, "params"); - cjson_relay_status = cJSON_GetObjectItem(cjson_params, "relay_status"); - relay_status = cjson_relay_status->valueint; - - //根据 relay_status 执行相应的动作 - if (relay_status == 0) { - HAL_GPIO_WritePin(RELAY_GPIO_Port, RELAY_Pin, GPIO_PIN_SET); - } else if (relay_status == 1) { - HAL_GPIO_WritePin(RELAY_GPIO_Port, RELAY_Pin, GPIO_PIN_RESET); - } - - memset(reply_buf, 0, sizeof(reply_buf)); - sprintf(reply_buf, CONTROL_REPLY_DATA, client_token); - memset(&reply_msg, 0, sizeof(reply_msg)); - reply_msg.qos = QOS0; - reply_msg.payload = (void *) reply_buf; - - printf("control reply:\r\n\t%s\r\n", reply_buf); - - error = mqtt_publish(client, "$thing/up/property/E2IGF491FP/dev001", &reply_msg); - - MQTT_LOG_D("control reply publish error is %#0x", error); - + while (1) { + /* 释放cjson使用的动态内存 */ cJSON_Delete(cjson_root); cjson_root = NULL; method = NULL; + status = NULL; client_token = NULL; relay_status = 0; - return; + /* 永久等待消息 */ + tos_mail_q_pend(&mail_q, &payload_mail, &payload_mail_size, TOS_TIME_FOREVER); + + /* 等待到之后开始使用cjson解析 */ + cjson_root = cJSON_Parse((char*)payload_mail.payload); + if (cjson_root == NULL) { + tos_mmheap_free(payload_mail.payload); + printf("payload message parser fail\r\n"); + continue; + } + + /* 提取消息类型 */ + cjson_method = cJSON_GetObjectItem(cjson_root, "method"); + method = cJSON_GetStringValue(cjson_method); + + /* 判断是哪种类型的消息 */ + if (strstr(method, "report_reply")) { + + //数据上报响应报文,提取status状态值 + cjson_status = cJSON_GetObjectItem(cjson_root, "status"); + status = cJSON_GetStringValue(cjson_status); + if (cjson_status == NULL || status == NULL) { + printf("report reply status parser fail\r\n"); + event_flag = report_fail; + tos_mmheap_free(payload_mail.payload); + continue; + } + + //判断status状态 + if (strstr(status,"success")) { + event_flag = report_success; + }else { + event_flag = report_fail; + } + + //唤醒等待任务 + tos_event_post(&report_result_event, event_flag); + + } else if (strstr(method, "control")) { + + //收到平台下发的控制报文,提取client_token,用于上报响应 + cjson_client_token = cJSON_GetObjectItem(cjson_root, "clientToken"); + client_token = cJSON_GetStringValue(cjson_client_token); + + printf("parse client token:%s\r\n", client_token); + + //提取 relay_status + cjson_params = cJSON_GetObjectItem(cjson_root, "params"); + cjson_relay_status = cJSON_GetObjectItem(cjson_params, "relay_status"); + relay_status = cjson_relay_status->valueint; + + //根据 relay_status 执行相应的动作 + if (relay_status == 0) { + HAL_GPIO_WritePin(RELAY_GPIO_Port, RELAY_Pin, GPIO_PIN_SET); + } else if (relay_status == 1) { + HAL_GPIO_WritePin(RELAY_GPIO_Port, RELAY_Pin, GPIO_PIN_RESET); + } + + memset(reply_buf, 0, sizeof(reply_buf)); + sprintf(reply_buf, CONTROL_REPLY_DATA, client_token); + memset(&reply_msg, 0, sizeof(reply_msg)); + reply_msg.qos = QOS0; + reply_msg.payload = (void *) reply_buf; + + printf("control reply:\r\n\t%s\r\n", reply_buf); + + tos_mutex_pend(&mqtt_publish_mutex); + + error = mqtt_publish(payload_mail.client, "$thing/up/property/E2IGF491FP/dev001", &reply_msg); + + MQTT_LOG_D("control reply publish error is %#0x", error); + + tos_mutex_post(&mqtt_publish_mutex); + } + + tos_mmheap_free(payload_mail.payload); } -exit: - cJSON_Delete(cjson_root); - cjson_root = NULL; - status = NULL; - - tos_event_post(&report_result_event, event_flag); - - return; } + void mqttclient_task(void) { int error; @@ -213,6 +263,14 @@ void mqttclient_task(void) HAL_GPIO_WritePin(LED_G_GPIO_Port, LED_G_Pin, GPIO_PIN_RESET); } + /* 创建邮箱,创建数据解析任务 */ + tos_mutex_create(&mqtt_publish_mutex); + tos_mail_q_create(&mail_q, mail_pool, MESSAGE_MAX, sizeof(k_mail_q_t)); + tos_task_create(&payload_parser_task, "payload_parser_task", + msg_payload_parser_task, NULL, PAYLOAD_PARSER_TASK_PRIO, + payload_parser_task_stack, PAYLOAD_PARSER_TASK_SIZE, 10); + + error = mqtt_subscribe(client, "$thing/down/property/E2IGF491FP/dev001", QOS0, tos_topic_handler); MQTT_LOG_D("mqtt subscribe error is %#0x", error); @@ -230,10 +288,14 @@ void mqttclient_task(void) msg.qos = QOS0; msg.payload = (void *) report_buf; + tos_mutex_pend(&mqtt_publish_mutex); + error = mqtt_publish(client, "$thing/up/property/E2IGF491FP/dev001", &msg); MQTT_LOG_D("data report publish error is %#0x", error); + tos_mutex_post(&mqtt_publish_mutex); + tos_event_pend(&report_result_event, report_success|report_fail, &match_flag, diff --git a/board/CSTX_STM32F103RE_DTU/TOS_CONFIG/tos_config.h b/board/CSTX_STM32F103RE_DTU/TOS_CONFIG/tos_config.h index 2dcf307f..54d27ffe 100644 --- a/board/CSTX_STM32F103RE_DTU/TOS_CONFIG/tos_config.h +++ b/board/CSTX_STM32F103RE_DTU/TOS_CONFIG/tos_config.h @@ -21,6 +21,8 @@ #define TOS_CFG_MUTEX_EN 1u +#define TOS_CFG_MAIL_QUEUE_EN 1u + #define TOS_CFG_TIMER_EN 1u #define TOS_CFG_SEM_EN 1u