add sub callback function in esp8266_tc_fw warpper

This commit is contained in:
mculover666
2020-07-29 10:18:09 +08:00
parent c16a12d34b
commit 0f68497eea
5 changed files with 142 additions and 24 deletions

View File

@@ -2,6 +2,48 @@
static tencent_firmware_module_t *g_tencent_firmware_module = NULL;
static k_task_t mqtt_message_handle_task;
static k_stack_t mqtt_message_handle_task_stack[MQTT_MESSAGE_HANDLE_TASK_STACK_SIZE];
static mqtt_message_t mqtt_message;
static uint8_t mqtt_message_pool[MQTT_MESSAGE_POOL_SIZE];
static k_list_t mqtt_sub_list;
static k_mmblk_pool_t sub_list_node_mbp;
static mqtt_message_handlers_t sub_list_node_pool[MQTT_SUB_TOPIC_HANDLES_POOL_SIZE];
k_mail_q_t mqtt_message_mail;
void mqtt_message_handle_task_entry(void *arg)
{
/*
topic:$thing/down/property/xxx/xxx
payload:
"{
"method":"xxx",
"clientToken":"xxx",
"code":0,
"status":"success"
}"
*/
size_t message_size;
k_list_t *cur;
mqtt_message_handlers_t *mqtt_message_handler;
while (K_TRUE) {
tos_mail_q_pend(&mqtt_message_mail, &mqtt_message, &message_size, TOS_TIME_FOREVER);
TOS_LIST_FOR_EACH(cur, &mqtt_sub_list) {
mqtt_message_handler = TOS_LIST_ENTRY(cur, mqtt_message_handlers_t, list);
if (strcmp(mqtt_message_handler->topic_filter, mqtt_message.topic) == 0) {
mqtt_message_handler->handler(&mqtt_message);
}
}
}
}
int tos_tf_module_register(tencent_firmware_module_t *module)
{
if (!g_tencent_firmware_module) {
@@ -12,10 +54,27 @@ int tos_tf_module_register(tencent_firmware_module_t *module)
}
int tos_tf_module_init(void)
{
{
if (tos_mail_q_create(&mqtt_message_mail, mqtt_message_pool, MQTT_MESSAGE_NUM_MAX, sizeof(mqtt_message_t)) != K_ERR_NONE) {
return -1;
}
tos_list_init(&mqtt_sub_list);
if (tos_mmblk_pool_create(&sub_list_node_mbp, sub_list_node_pool, MQTT_SUB_TOPIC_MAX, sizeof(mqtt_message_handlers_t)) != K_ERR_NONE) {
return -1;
}
if (tos_task_create(&mqtt_message_handle_task, "mqtt_message_handle",
mqtt_message_handle_task_entry, NULL, MQTT_MESSAGE_HANDLE_TASK_PRIO,
mqtt_message_handle_task_stack, MQTT_MESSAGE_HANDLE_TASK_STACK_SIZE, 10) != K_ERR_NONE) {
return -1;
}
if (g_tencent_firmware_module && g_tencent_firmware_module->init) {
return g_tencent_firmware_module->init();
}
return -1;
}
@@ -59,8 +118,20 @@ int tos_tf_module_mqtt_publ(const char *topic, qos_t qos, char *payload)
return -1;
}
int tos_tf_module_mqtt_sub(char *topic, qos_t qos)
int tos_tf_module_mqtt_sub(char *topic, qos_t qos, message_handler_t handle)
{
mqtt_message_handlers_t *mqtt_message_handler;
if (tos_mmblk_alloc(&sub_list_node_mbp, (void*)&mqtt_message_handler) != K_ERR_NONE) {
return -1;
}
mqtt_message_handler->topic_filter = topic;
mqtt_message_handler->qos = qos;
mqtt_message_handler->handler = handle;
tos_list_add_tail(&mqtt_message_handler->list, &mqtt_sub_list);
if (g_tencent_firmware_module && g_tencent_firmware_module->mqtt_sub) {
return g_tencent_firmware_module->mqtt_sub(topic, qos);
}
@@ -69,6 +140,20 @@ int tos_tf_module_mqtt_sub(char *topic, qos_t qos)
int tos_tf_module_mqtt_unsub(const char *topic)
{
k_list_t *cur;
mqtt_message_handlers_t *mqtt_message_handler;
TOS_LIST_FOR_EACH(cur, &mqtt_sub_list) {
mqtt_message_handler = TOS_LIST_ENTRY(cur, mqtt_message_handlers_t, list);
if (strcmp(mqtt_message_handler->topic_filter, topic) == 0) {
tos_list_del(&mqtt_message_handler->list);
if (tos_mmblk_free(&sub_list_node_mbp, (void*)&mqtt_message_handler) != K_ERR_NONE) {
return -1;
}
}
}
if (g_tencent_firmware_module && g_tencent_firmware_module->mqtt_unsub) {
return g_tencent_firmware_module->mqtt_unsub(topic);
}

View File

@@ -20,6 +20,7 @@
#include <stdint.h>
#include <stdio.h>
#include "tos_k.h"
#define AUTH_MODE_KEY 1
#define AUTH_MODE_CERT 2
@@ -39,10 +40,18 @@
#define DEVICE_KEY_FILE_NAME_MAX_SIZE 128
#define TOPIC_MAX_SIZE ((DEVICE_NAME_MAX_SIZE) + (PRODUCT_ID_MAX_SIZE) + 64)
#define TOPIC_NAME_MAX_SIZE ((DEVICE_NAME_MAX_SIZE) + (PRODUCT_ID_MAX_SIZE) + 64)
#define PUB_PAYLOAD_MAX_SIZE 200
#define MQTT_MESSAGE_HANDLE_TASK_STACK_SIZE 512
#define MQTT_MESSAGE_HANDLE_TASK_PRIO 3
#define MQTT_MESSAGE_NUM_MAX 3
#define MQTT_MESSAGE_POOL_SIZE MQTT_MESSAGE_NUM_MAX*sizeof(mqtt_message_t)
#define MQTT_SUB_TOPIC_MAX 5
#define MQTT_SUB_TOPIC_HANDLES_POOL_SIZE MQTT_SUB_TOPIC_MAX * sizeof(mqtt_message_handlers_t)
typedef enum mqtt_state_en {
MQTT_STATE_DISCONNECTED,
MQTT_STATE_CONNECTED,
@@ -68,6 +77,22 @@ typedef struct mqtt_param_st {
uint8_t auto_connect_enable;
} mqtt_param_t;
typedef struct mqtt_message_st {
char topic[64];
char payload[128];
} mqtt_message_t;
typedef void (*message_handler_t)(mqtt_message_t* msg);
typedef struct mqtt_message_handlers_st {
k_list_t list;
qos_t qos;
const char* topic_filter;
message_handler_t handler;
} mqtt_message_handlers_t;
extern k_mail_q_t mqtt_message_mail;
#define DEFAULT_MQTT_PARAMS { TLS_MODE_PSK, MQTT_COMMAND_TIMEOUT, 240, 1, 1 }
typedef struct device_info_st {
@@ -201,10 +226,11 @@ int tos_tf_module_mqtt_publ(const char *topic, qos_t qos, char *payload);
*
* @param[in] topic mqtt topic
* @param[in] qos quality of service
* @param[in] handle will be callback when topic arrive
*
* @return errcode
*/
int tos_tf_module_mqtt_sub(char *topic, qos_t qos);
int tos_tf_module_mqtt_sub(char *topic, qos_t qos, message_handler_t handle);
/**
* @brief Unsubscribe a mqtt topic.