improve cstx board demo

This commit is contained in:
mculover666
2020-08-30 15:27:43 +08:00
parent a8d7951ea9
commit 4fa63c83d9
2 changed files with 148 additions and 84 deletions

View File

@@ -27,19 +27,63 @@ void mqtt_set_esp8266_port(hal_uart_port_t port) {
}
#endif
k_event_t report_result_event;
k_event_flag_t report_success = 1<<0;
k_event_flag_t report_fail = 1<<1;
/* <20><><EFBFBD><EFBFBD>ģ<EFBFBD><C4A3> */
#define REPORT_DATA_TEMPLATE "{\"method\":\"report\",\"clientToken\":\"00000001\",\"params\":{\"TempValue\":%d.%-2d}}"
#define CONTROL_REPLY_DATA "{\"method\":\"control_reply\",\"clientToken\":\"%s\",\"code\":0,\"status\":\"success\"}"
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>󻺴<EFBFBD><F3BBBAB4><EFBFBD><EFBFBD><EFBFBD> */
#define MESSAGE_MAX 5
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
#define PAYLOAD_PARSER_TASK_PRIO 4
#define PAYLOAD_PARSER_TASK_SIZE 1024
typedef struct payload_mail_st {
void* client;
void* payload;
} payload_mail_t;
k_mail_q_t mail_q;
uint8_t mail_pool[MESSAGE_MAX * sizeof(payload_mail_t *)];
char report_buf[200];
char reply_buf[200];
k_event_t report_result_event;
k_event_flag_t report_success = 1<<0;
k_event_flag_t report_fail = 1<<1;
k_mutex_t mqtt_publish_mutex;
k_task_t payload_parser_task;
k_stack_t payload_parser_task_stack[PAYLOAD_PARSER_TASK_SIZE];
static void tos_topic_handler(void* client, message_data_t* msg)
{
(void) client;
char* payload_ptr = NULL;
payload_mail_t payload_mail;
/* <20><>ӡ<EFBFBD><D3A1>־ */
MQTT_LOG_I("-----------------------------------------------------------------------------------");
MQTT_LOG_I("%s:%d %s()...\ntopic: %s, qos: %d. \nmessage:\n\t%s\n", __FILE__, __LINE__, __FUNCTION__,
msg->topic_name, msg->message->qos, (char*)msg->message->payload);
MQTT_LOG_I("-----------------------------------------------------------------------------------\n");
/* <20><><EFBFBD>붯̬<EBB6AF>ڴ<EFBFBD><DAB4>ռ<D5BC><E4A3AC><EFBFBD><EFBFBD>payload */
payload_ptr = (char*)tos_mmheap_alloc(msg->message->payloadlen);
strcpy(payload_ptr, (char*)msg->message->payload);
payload_mail.client = client;
payload_mail.payload = payload_ptr;
/* <20><><EFBFBD>͸<EFBFBD><CDB8><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
tos_mail_q_post(&mail_q, &payload_mail, sizeof(payload_mail_t));
return;
}
void msg_payload_parser_task(void *args)
{
cJSON* cjson_root = NULL;
cJSON* cjson_status = NULL;
cJSON* cjson_method = NULL;
@@ -50,97 +94,103 @@ static void tos_topic_handler(void* client, message_data_t* msg)
char* method = NULL;
char* client_token = NULL;
int relay_status = 0;
k_event_flag_t event_flag = report_fail;
mqtt_message_t reply_msg;
int error;
/* <20><>ӡ<EFBFBD><D3A1>־ */
MQTT_LOG_I("-----------------------------------------------------------------------------------");
MQTT_LOG_I("%s:%d %s()...\ntopic: %s, qos: %d. \nmessage:\n\t%s\n", __FILE__, __LINE__, __FUNCTION__,
msg->topic_name, msg->message->qos, (char*)msg->message->payload);
MQTT_LOG_I("-----------------------------------------------------------------------------------\n");
size_t payload_mail_size;
mqtt_message_t reply_msg;
payload_mail_t payload_mail;
k_event_flag_t event_flag = report_fail;
/* ʹ<><CAB9>cjson<6F><6E><EFBFBD><EFBFBD><EFBFBD>ϱ<EFBFBD><CFB1><EFBFBD>Ӧ<EFBFBD><D3A6><EFBFBD><EFBFBD> */
cjson_root = cJSON_Parse((char*)msg->message->payload);
if (cjson_root == NULL) {
printf("subscribe message parser fail\r\n");
event_flag = report_fail;
goto exit;
}
/* <20><>ȡ<EFBFBD><C8A1>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD> */
cjson_method = cJSON_GetObjectItem(cjson_root, "method");
method = cJSON_GetStringValue(cjson_method);
/* <20>ж<EFBFBD><D0B6><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>͵<EFBFBD><CDB5><EFBFBD>Ϣ */
if (strstr(method, "report_reply")) {
//<2F><><EFBFBD><EFBFBD><EFBFBD>ϱ<EFBFBD><CFB1><EFBFBD>Ӧ<EFBFBD><D3A6><EFBFBD>ģ<EFBFBD><C4A3><EFBFBD>ȡstatus״ֵ̬
cjson_status = cJSON_GetObjectItem(cjson_root, "status");
status = cJSON_GetStringValue(cjson_status);
if (cjson_status == NULL || status == NULL) {
printf("report reply status parser fail\r\n");
event_flag = report_fail;
goto exit;
}
//<2F>ж<EFBFBD>status״̬
if (strstr(status,"success")) {
event_flag = report_success;
}else {
event_flag = report_fail;
}
} else if (strstr(method, "control")) {
//<2F>յ<EFBFBD>ƽ̨<C6BD>·<EFBFBD><C2B7>Ŀ<EFBFBD><C4BF>Ʊ<EFBFBD><C6B1>ģ<EFBFBD><C4A3><EFBFBD>ȡclient_token<65><6E><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ϱ<EFBFBD><CFB1><EFBFBD>Ӧ
cjson_client_token = cJSON_GetObjectItem(cjson_root, "clientToken");
client_token = cJSON_GetStringValue(cjson_client_token);
printf("parse client token:%s\r\n", client_token);
//<2F><>ȡ relay_status
cjson_params = cJSON_GetObjectItem(cjson_root, "params");
cjson_relay_status = cJSON_GetObjectItem(cjson_params, "relay_status");
relay_status = cjson_relay_status->valueint;
//<2F><><EFBFBD><EFBFBD> relay_status ִ<><D6B4><EFBFBD><EFBFBD>Ӧ<EFBFBD>Ķ<EFBFBD><C4B6><EFBFBD>
if (relay_status == 0) {
HAL_GPIO_WritePin(RELAY_GPIO_Port, RELAY_Pin, GPIO_PIN_SET);
} else if (relay_status == 1) {
HAL_GPIO_WritePin(RELAY_GPIO_Port, RELAY_Pin, GPIO_PIN_RESET);
}
memset(reply_buf, 0, sizeof(reply_buf));
sprintf(reply_buf, CONTROL_REPLY_DATA, client_token);
memset(&reply_msg, 0, sizeof(reply_msg));
reply_msg.qos = QOS0;
reply_msg.payload = (void *) reply_buf;
printf("control reply:\r\n\t%s\r\n", reply_buf);
error = mqtt_publish(client, "$thing/up/property/E2IGF491FP/dev001", &reply_msg);
MQTT_LOG_D("control reply publish error is %#0x", error);
while (1) {
/* <20>ͷ<EFBFBD>cjsonʹ<6E>õĶ<C3B5>̬<EFBFBD>ڴ<EFBFBD> */
cJSON_Delete(cjson_root);
cjson_root = NULL;
method = NULL;
status = NULL;
client_token = NULL;
relay_status = 0;
return;
/* <20><><EFBFBD>õȴ<C3B5><C8B4><EFBFBD>Ϣ */
tos_mail_q_pend(&mail_q, &payload_mail, &payload_mail_size, TOS_TIME_FOREVER);
/* <20>ȴ<EFBFBD><C8B4><EFBFBD>֮<EFBFBD><D6AE><EFBFBD><EFBFBD>ʼʹ<CABC><CAB9>cjson<6F><6E><EFBFBD><EFBFBD> */
cjson_root = cJSON_Parse((char*)payload_mail.payload);
if (cjson_root == NULL) {
tos_mmheap_free(payload_mail.payload);
printf("payload message parser fail\r\n");
continue;
}
/* <20><>ȡ<EFBFBD><C8A1>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD> */
cjson_method = cJSON_GetObjectItem(cjson_root, "method");
method = cJSON_GetStringValue(cjson_method);
/* <20>ж<EFBFBD><D0B6><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>͵<EFBFBD><CDB5><EFBFBD>Ϣ */
if (strstr(method, "report_reply")) {
//<2F><><EFBFBD><EFBFBD><EFBFBD>ϱ<EFBFBD><CFB1><EFBFBD>Ӧ<EFBFBD><D3A6><EFBFBD>ģ<EFBFBD><C4A3><EFBFBD>ȡstatus״ֵ̬
cjson_status = cJSON_GetObjectItem(cjson_root, "status");
status = cJSON_GetStringValue(cjson_status);
if (cjson_status == NULL || status == NULL) {
printf("report reply status parser fail\r\n");
event_flag = report_fail;
tos_mmheap_free(payload_mail.payload);
continue;
}
//<2F>ж<EFBFBD>status״̬
if (strstr(status,"success")) {
event_flag = report_success;
}else {
event_flag = report_fail;
}
//<2F><><EFBFBD>ѵȴ<D1B5><C8B4><EFBFBD><EFBFBD><EFBFBD>
tos_event_post(&report_result_event, event_flag);
} else if (strstr(method, "control")) {
//<2F>յ<EFBFBD>ƽ̨<C6BD>·<EFBFBD><C2B7>Ŀ<EFBFBD><C4BF>Ʊ<EFBFBD><C6B1>ģ<EFBFBD><C4A3><EFBFBD>ȡclient_token<65><6E><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ϱ<EFBFBD><CFB1><EFBFBD>Ӧ
cjson_client_token = cJSON_GetObjectItem(cjson_root, "clientToken");
client_token = cJSON_GetStringValue(cjson_client_token);
printf("parse client token:%s\r\n", client_token);
//<2F><>ȡ relay_status
cjson_params = cJSON_GetObjectItem(cjson_root, "params");
cjson_relay_status = cJSON_GetObjectItem(cjson_params, "relay_status");
relay_status = cjson_relay_status->valueint;
//<2F><><EFBFBD><EFBFBD> relay_status ִ<><D6B4><EFBFBD><EFBFBD>Ӧ<EFBFBD>Ķ<EFBFBD><C4B6><EFBFBD>
if (relay_status == 0) {
HAL_GPIO_WritePin(RELAY_GPIO_Port, RELAY_Pin, GPIO_PIN_SET);
} else if (relay_status == 1) {
HAL_GPIO_WritePin(RELAY_GPIO_Port, RELAY_Pin, GPIO_PIN_RESET);
}
memset(reply_buf, 0, sizeof(reply_buf));
sprintf(reply_buf, CONTROL_REPLY_DATA, client_token);
memset(&reply_msg, 0, sizeof(reply_msg));
reply_msg.qos = QOS0;
reply_msg.payload = (void *) reply_buf;
printf("control reply:\r\n\t%s\r\n", reply_buf);
tos_mutex_pend(&mqtt_publish_mutex);
error = mqtt_publish(payload_mail.client, "$thing/up/property/E2IGF491FP/dev001", &reply_msg);
MQTT_LOG_D("control reply publish error is %#0x", error);
tos_mutex_post(&mqtt_publish_mutex);
}
tos_mmheap_free(payload_mail.payload);
}
exit:
cJSON_Delete(cjson_root);
cjson_root = NULL;
status = NULL;
tos_event_post(&report_result_event, event_flag);
return;
}
void mqttclient_task(void)
{
int error;
@@ -213,6 +263,14 @@ void mqttclient_task(void)
HAL_GPIO_WritePin(LED_G_GPIO_Port, LED_G_Pin, GPIO_PIN_RESET);
}
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><E4A3AC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ݽ<EFBFBD><DDBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
tos_mutex_create(&mqtt_publish_mutex);
tos_mail_q_create(&mail_q, mail_pool, MESSAGE_MAX, sizeof(k_mail_q_t));
tos_task_create(&payload_parser_task, "payload_parser_task",
msg_payload_parser_task, NULL, PAYLOAD_PARSER_TASK_PRIO,
payload_parser_task_stack, PAYLOAD_PARSER_TASK_SIZE, 10);
error = mqtt_subscribe(client, "$thing/down/property/E2IGF491FP/dev001", QOS0, tos_topic_handler);
MQTT_LOG_D("mqtt subscribe error is %#0x", error);
@@ -230,10 +288,14 @@ void mqttclient_task(void)
msg.qos = QOS0;
msg.payload = (void *) report_buf;
tos_mutex_pend(&mqtt_publish_mutex);
error = mqtt_publish(client, "$thing/up/property/E2IGF491FP/dev001", &msg);
MQTT_LOG_D("data report publish error is %#0x", error);
tos_mutex_post(&mqtt_publish_mutex);
tos_event_pend(&report_result_event,
report_success|report_fail,
&match_flag,

View File

@@ -21,6 +21,8 @@
#define TOS_CFG_MUTEX_EN 1u
#define TOS_CFG_MAIL_QUEUE_EN 1u
#define TOS_CFG_TIMER_EN 1u
#define TOS_CFG_SEM_EN 1u