update demo mqtt to mqttclient

This commit is contained in:
Chen Han
2022-06-21 18:13:09 +00:00
committed by GitHub
parent 5a8f31a2b5
commit 4c4ac49b76
2 changed files with 88 additions and 69 deletions

View File

@@ -1,8 +1,9 @@
#include "cmsis_os.h"
#include "socket_wrapper.h"
#include "sal_module_wrapper.h"
#include "mqtt_wrapper.h"
// #include "mqtt_wrapper.h"
#include "mqtt_config.h"
#include "mqttclient.h"
#include <stdio.h>
@@ -13,84 +14,73 @@ int sock_id = 0;
void mqtt_publisher(void *pdata);
osThreadDef(mqtt_publisher, osPriorityNormal, 1, MQTT_PUBLISHER_STK_SIZE);
//mqtt_reciever
#define MQTT_RECIEVER_STK_SIZE 1024
void mqtt_reciever(void *pdata);
osThreadDef(mqtt_reciever, osPriorityNormal, 1, MQTT_RECIEVER_STK_SIZE);
static void tos_topic_handler(void* client, message_data_t* msg)
{
(void)client;
MQTT_LOG_I("-----------------------------------------------------------------------------------");
MQTT_LOG_I("%s:%d %s()...\ntopic: %s, qos: %d. \nmessage:\n\t%s\n", __FILE__, __LINE__, __FUNCTION__,
msg->topic_name, msg->message->qos, (char *)msg->message->payload);
MQTT_LOG_I("-----------------------------------------------------------------------------------\n");
}
void mqtt_publisher(void *pdata)
{
mqtt_con_opt_t con_param;
con_param.keep_alive_interval = 2000;
con_param.cleansession = 1;
con_param.username = MQTT_USR_NAME;
con_param.password = MQTT_PASSWORD;
con_param.client_id = MQTT_CLIENT_ID;
int error;
char buf[100] = {0};
mqtt_client_t *client = NULL;
mqtt_message_t msg;
mqtt_pub_opt_t pub_param;
pub_param.dup = 0;
pub_param.qos = 0;
pub_param.retained = 0;
pub_param.id = 0;
pub_param.payload = "hello tencent cloud";
pub_param.payload_len = 20;
pub_param.topic = MQTT_PUBLISH_TOPIC;
memset(&msg, 0, sizeof(msg));
mqtt_sub_opt_t sub_param;
sub_param.count = 1;
sub_param.dup = 0;
sub_param.id = 0;
sub_param.req_qos = 0;
sub_param.topic = MQTT_SUBSCRIBE_TOPIC;
mqtt_log_init();
client = mqtt_lease();
printf("start connect\n");
tos_sal_module_register(get_socket_module());
tos_sal_module_init();
mqtt_set_port(client, MQTT_SERVER_PORT);
mqtt_set_host(client, MQTT_SERVER_IP);
mqtt_set_client_id(client, MQTT_CLIENT_ID);
mqtt_set_user_name(client, MQTT_USR_NAME);
mqtt_set_password(client, MQTT_PASSWORD);
mqtt_set_clean_session(client, 1);
sock_id = tos_mqtt_connect(MQTT_SERVER_IP, MQTT_SERVER_PORT, &con_param);
if (sock_id == -1)
{
printf("connect failed!!!\n");
return -1; //to exit thread
}
printf("connect success\n");
error = mqtt_connect(client);
if (tos_mqtt_subscribe(sock_id, &sub_param) != 0)
{
printf("subscribe failed!!!\n");
}else{
printf("subscribe success\n");
}
MQTT_LOG_D("mqtt connect error is %#x", error);
osThreadCreate(osThread(mqtt_reciever), NULL); // start receive
mqtt_subscribe(client, MQTT_SUBSCRIBE_TOPIC, QOS0, tos_topic_handler);
MQTT_LOG_D("mqtt subscribe error is %#x", error);
memset(&msg, 0, sizeof(msg));
for (;;)
{
printf("\n");
printf("publish topic-->%s| data-->%s| \n", pub_param.topic, pub_param.payload);
if (tos_mqtt_publish(sock_id, &pub_param) != 0) {
printf("publish failed!!!\n");
}
osDelay(2000);
sprintf(buf, "welcome to mqttclient, this is a publish test, a rand number: %d ...", random_number());
msg.qos = QOS0;
msg.payload = (void *)buf;
error = mqtt_publish(client, MQTT_PUBLISH_TOPIC, &msg);
osDelay(4000);
}
}
void mqtt_reciever(void *pdata)
{
uint8_t read_data[100];
int8_t topic[30];
uint32_t read_len;
// void mqtt_reciever(void *pdata)
// {
// uint8_t read_data[100];
// int8_t topic[30];
// uint32_t read_len;
for (;;)
{
read_len = tos_mqtt_receive(topic, sizeof(topic), read_data, sizeof(read_data));
if (read_len >= 0)
{
printf("receive topic-->%s| data-->%s| \n", topic, read_data);
}
osDelay(100);
}
}
// for (;;)
// {
// read_len = tos_mqtt_receive(topic, sizeof(topic), read_data, sizeof(read_data));
// if (read_len >= 0)
// {
// printf("receive topic-->%s| data-->%s| \n", topic, read_data);
// }
// osDelay(100);
// }
// }
int main(void)
{