diff --git a/board/TencentOS_tiny_EVB_MX_Plus/KEIL/mqtt_based_qcould_firmware/TencentOS_tiny.uvoptx b/board/TencentOS_tiny_EVB_MX_Plus/KEIL/mqtt_based_qcould_firmware/TencentOS_tiny.uvoptx index 6d848292..60dc296c 100644 --- a/board/TencentOS_tiny_EVB_MX_Plus/KEIL/mqtt_based_qcould_firmware/TencentOS_tiny.uvoptx +++ b/board/TencentOS_tiny_EVB_MX_Plus/KEIL/mqtt_based_qcould_firmware/TencentOS_tiny.uvoptx @@ -790,7 +790,7 @@ kernel - 0 + 1 0 0 0 @@ -1182,7 +1182,7 @@ devices - 0 + 1 0 0 0 @@ -1222,7 +1222,7 @@ at - 0 + 1 0 0 0 diff --git a/devices/esp8266_tencent_firmware/esp8266_tencent_firmware.c b/devices/esp8266_tencent_firmware/esp8266_tencent_firmware.c index 973418c7..d0b8b446 100644 --- a/devices/esp8266_tencent_firmware/esp8266_tencent_firmware.c +++ b/devices/esp8266_tencent_firmware/esp8266_tencent_firmware.c @@ -345,8 +345,7 @@ static int esp8266_tencent_firmware_ota_read_fwdata(uint8_t *ota_fw_data_buffer, return tos_chr_fifo_pop_stream(&ota_fw_data_chr_fifo, ota_fw_data_buffer, read_len); } -__STATIC__ uint8_t topic_buffer[64]; -__STATIC__ uint8_t payload_buffer[64]; +static mqtt_message_t mqtt_message; void esp8266_tencent_firmware_recvpub(void) { @@ -366,14 +365,14 @@ void esp8266_tencent_firmware_recvpub(void) } else if (data == ',') { break; } - if (read_len < sizeof(topic_buffer)) { - topic_buffer[read_len++] = data; + if (read_len < sizeof(mqtt_message.topic)) { + mqtt_message.topic[read_len++] = data; } } - if (read_len == sizeof(topic_buffer)) { - topic_buffer[read_len - 1] = '\0'; + if (read_len == sizeof(mqtt_message.topic)) { + mqtt_message.topic[read_len - 1] = '\0'; } else { - topic_buffer[read_len] = '\0'; + mqtt_message.topic[read_len] = '\0'; } while (1) { @@ -386,17 +385,16 @@ void esp8266_tencent_firmware_recvpub(void) payload_len = payload_len * 10 + (data - '0'); } - if (payload_len > sizeof(payload_buffer)) { - payload_len = sizeof(payload_buffer); + if (payload_len > sizeof(mqtt_message.payload)) { + payload_len = sizeof(mqtt_message.payload); } - read_len = tos_at_uart_read(payload_buffer, payload_len + 2); + read_len = tos_at_uart_read((uint8_t*)mqtt_message.payload, payload_len + 2); if (read_len != payload_len + 2) { return; } - printf("topic received: %s\n", topic_buffer); - printf("payload: %s\n", payload_buffer); + tos_mail_q_post(&mqtt_message_mail, &mqtt_message, sizeof(mqtt_message_t)); } void esp8266_tencent_firmware_recvcmd(void) diff --git a/examples/tencent_firmware_mqtt/tencent_firmware_mqtt.c b/examples/tencent_firmware_mqtt/tencent_firmware_mqtt.c index f14a9ac9..2334224f 100644 --- a/examples/tencent_firmware_mqtt/tencent_firmware_mqtt.c +++ b/examples/tencent_firmware_mqtt/tencent_firmware_mqtt.c @@ -2,6 +2,15 @@ #include "esp8266_tencent_firmware.h" #include "tencent_firmware_module_wrapper.h" +void default_message_handler(mqtt_message_t* msg) +{ + printf("callback:\r\n"); + printf("---------------------------------------------------------\r\n"); + printf("\ttopic:%s\r\n", msg->topic); + printf("\tpayload:%s\r\n", msg->payload); + printf("---------------------------------------------------------\r\n"); +} + void mqtt_demo_task(void) { char payload[256] = {0}; @@ -37,23 +46,23 @@ void mqtt_demo_task(void) printf("MQTT: %s\n", state == MQTT_STATE_CONNECTED ? "CONNECTED" : "DISCONNECTED"); } - static char topic_name[TOPIC_MAX_SIZE] = {0}; - int size = snprintf(topic_name, TOPIC_MAX_SIZE, "%s/%s/data", product_id, device_name); + static char topic_name[TOPIC_NAME_MAX_SIZE] = {0}; + int size = snprintf(topic_name, TOPIC_NAME_MAX_SIZE, "%s/%s/data", product_id, device_name); if (size < 0 || size > sizeof(topic_name) - 1) { printf("topic content length not enough! content size:%d buf size:%d", size, (int)sizeof(topic_name)); } - if (tos_tf_module_mqtt_sub(topic_name, QOS0) != 0) { + if (tos_tf_module_mqtt_sub(topic_name, QOS0, default_message_handler) != 0) { printf("module mqtt sub fail\n"); } else { printf("module mqtt sub success\n"); } while (1) { - HAL_Delay(1000); + tos_sleep_ms(1000); /* use AT+PUB AT command */ - memset(payload, 0, sizeof(payload)); + memset(payload, 0, sizeof(payload)); strncpy(payload, "{\\\"type\\\":\\\"get\\\"\\, \\\"clientToken\\\":\\\"03UKNYBUZG-0\\\"}", sizeof(payload)); printf("message publish: %s\n", payload); if (tos_tf_module_mqtt_pub(topic_name, QOS0, payload) != 0) { diff --git a/net/tencent_firmware_module_wrapper/tencent_firmware_module_wrapper.c b/net/tencent_firmware_module_wrapper/tencent_firmware_module_wrapper.c index 2c64f178..67b8a0df 100644 --- a/net/tencent_firmware_module_wrapper/tencent_firmware_module_wrapper.c +++ b/net/tencent_firmware_module_wrapper/tencent_firmware_module_wrapper.c @@ -2,6 +2,48 @@ static tencent_firmware_module_t *g_tencent_firmware_module = NULL; +static k_task_t mqtt_message_handle_task; +static k_stack_t mqtt_message_handle_task_stack[MQTT_MESSAGE_HANDLE_TASK_STACK_SIZE]; + +static mqtt_message_t mqtt_message; +static uint8_t mqtt_message_pool[MQTT_MESSAGE_POOL_SIZE]; + +static k_list_t mqtt_sub_list; +static k_mmblk_pool_t sub_list_node_mbp; +static mqtt_message_handlers_t sub_list_node_pool[MQTT_SUB_TOPIC_HANDLES_POOL_SIZE]; + +k_mail_q_t mqtt_message_mail; + +void mqtt_message_handle_task_entry(void *arg) +{ + /* + topic:$thing/down/property/xxx/xxx + payload: + "{ + "method":"xxx", + "clientToken":"xxx", + "code":0, + "status":"success" + }" + */ + size_t message_size; + k_list_t *cur; + mqtt_message_handlers_t *mqtt_message_handler; + + while (K_TRUE) { + tos_mail_q_pend(&mqtt_message_mail, &mqtt_message, &message_size, TOS_TIME_FOREVER); + + TOS_LIST_FOR_EACH(cur, &mqtt_sub_list) { + mqtt_message_handler = TOS_LIST_ENTRY(cur, mqtt_message_handlers_t, list); + + if (strcmp(mqtt_message_handler->topic_filter, mqtt_message.topic) == 0) { + mqtt_message_handler->handler(&mqtt_message); + } + } + } +} + + int tos_tf_module_register(tencent_firmware_module_t *module) { if (!g_tencent_firmware_module) { @@ -12,10 +54,27 @@ int tos_tf_module_register(tencent_firmware_module_t *module) } int tos_tf_module_init(void) -{ +{ + if (tos_mail_q_create(&mqtt_message_mail, mqtt_message_pool, MQTT_MESSAGE_NUM_MAX, sizeof(mqtt_message_t)) != K_ERR_NONE) { + return -1; + } + + tos_list_init(&mqtt_sub_list); + + if (tos_mmblk_pool_create(&sub_list_node_mbp, sub_list_node_pool, MQTT_SUB_TOPIC_MAX, sizeof(mqtt_message_handlers_t)) != K_ERR_NONE) { + return -1; + } + + if (tos_task_create(&mqtt_message_handle_task, "mqtt_message_handle", + mqtt_message_handle_task_entry, NULL, MQTT_MESSAGE_HANDLE_TASK_PRIO, + mqtt_message_handle_task_stack, MQTT_MESSAGE_HANDLE_TASK_STACK_SIZE, 10) != K_ERR_NONE) { + return -1; + } + if (g_tencent_firmware_module && g_tencent_firmware_module->init) { return g_tencent_firmware_module->init(); } + return -1; } @@ -59,8 +118,20 @@ int tos_tf_module_mqtt_publ(const char *topic, qos_t qos, char *payload) return -1; } -int tos_tf_module_mqtt_sub(char *topic, qos_t qos) +int tos_tf_module_mqtt_sub(char *topic, qos_t qos, message_handler_t handle) { + mqtt_message_handlers_t *mqtt_message_handler; + + if (tos_mmblk_alloc(&sub_list_node_mbp, (void*)&mqtt_message_handler) != K_ERR_NONE) { + return -1; + } + + mqtt_message_handler->topic_filter = topic; + mqtt_message_handler->qos = qos; + mqtt_message_handler->handler = handle; + + tos_list_add_tail(&mqtt_message_handler->list, &mqtt_sub_list); + if (g_tencent_firmware_module && g_tencent_firmware_module->mqtt_sub) { return g_tencent_firmware_module->mqtt_sub(topic, qos); } @@ -69,6 +140,20 @@ int tos_tf_module_mqtt_sub(char *topic, qos_t qos) int tos_tf_module_mqtt_unsub(const char *topic) { + k_list_t *cur; + mqtt_message_handlers_t *mqtt_message_handler; + + TOS_LIST_FOR_EACH(cur, &mqtt_sub_list) { + mqtt_message_handler = TOS_LIST_ENTRY(cur, mqtt_message_handlers_t, list); + + if (strcmp(mqtt_message_handler->topic_filter, topic) == 0) { + tos_list_del(&mqtt_message_handler->list); + if (tos_mmblk_free(&sub_list_node_mbp, (void*)&mqtt_message_handler) != K_ERR_NONE) { + return -1; + } + } + } + if (g_tencent_firmware_module && g_tencent_firmware_module->mqtt_unsub) { return g_tencent_firmware_module->mqtt_unsub(topic); } diff --git a/net/tencent_firmware_module_wrapper/tencent_firmware_module_wrapper.h b/net/tencent_firmware_module_wrapper/tencent_firmware_module_wrapper.h index 3df31555..2044989a 100644 --- a/net/tencent_firmware_module_wrapper/tencent_firmware_module_wrapper.h +++ b/net/tencent_firmware_module_wrapper/tencent_firmware_module_wrapper.h @@ -20,6 +20,7 @@ #include #include +#include "tos_k.h" #define AUTH_MODE_KEY 1 #define AUTH_MODE_CERT 2 @@ -39,10 +40,18 @@ #define DEVICE_KEY_FILE_NAME_MAX_SIZE 128 -#define TOPIC_MAX_SIZE ((DEVICE_NAME_MAX_SIZE) + (PRODUCT_ID_MAX_SIZE) + 64) +#define TOPIC_NAME_MAX_SIZE ((DEVICE_NAME_MAX_SIZE) + (PRODUCT_ID_MAX_SIZE) + 64) #define PUB_PAYLOAD_MAX_SIZE 200 +#define MQTT_MESSAGE_HANDLE_TASK_STACK_SIZE 512 +#define MQTT_MESSAGE_HANDLE_TASK_PRIO 3 +#define MQTT_MESSAGE_NUM_MAX 3 +#define MQTT_MESSAGE_POOL_SIZE MQTT_MESSAGE_NUM_MAX*sizeof(mqtt_message_t) + +#define MQTT_SUB_TOPIC_MAX 5 +#define MQTT_SUB_TOPIC_HANDLES_POOL_SIZE MQTT_SUB_TOPIC_MAX * sizeof(mqtt_message_handlers_t) + typedef enum mqtt_state_en { MQTT_STATE_DISCONNECTED, MQTT_STATE_CONNECTED, @@ -68,6 +77,22 @@ typedef struct mqtt_param_st { uint8_t auto_connect_enable; } mqtt_param_t; +typedef struct mqtt_message_st { + char topic[64]; + char payload[128]; +} mqtt_message_t; + +typedef void (*message_handler_t)(mqtt_message_t* msg); + +typedef struct mqtt_message_handlers_st { + k_list_t list; + qos_t qos; + const char* topic_filter; + message_handler_t handler; +} mqtt_message_handlers_t; + +extern k_mail_q_t mqtt_message_mail; + #define DEFAULT_MQTT_PARAMS { TLS_MODE_PSK, MQTT_COMMAND_TIMEOUT, 240, 1, 1 } typedef struct device_info_st { @@ -201,10 +226,11 @@ int tos_tf_module_mqtt_publ(const char *topic, qos_t qos, char *payload); * * @param[in] topic mqtt topic * @param[in] qos quality of service + * @param[in] handle will be callback when topic arrive * * @return errcode */ -int tos_tf_module_mqtt_sub(char *topic, qos_t qos); +int tos_tf_module_mqtt_sub(char *topic, qos_t qos, message_handler_t handle); /** * @brief Unsubscribe a mqtt topic.