mqttclient version 1.0.2 released...
This commit is contained in:
@@ -2,7 +2,7 @@
|
|||||||
* @Author: jiejie
|
* @Author: jiejie
|
||||||
* @Github: https://github.com/jiejieTop
|
* @Github: https://github.com/jiejieTop
|
||||||
* @Date: 2019-12-09 21:31:25
|
* @Date: 2019-12-09 21:31:25
|
||||||
* @LastEditTime: 2020-03-15 01:33:01
|
* @LastEditTime: 2020-04-23 15:12:36
|
||||||
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
|
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
|
||||||
*/
|
*/
|
||||||
#include "mqttclient.h"
|
#include "mqttclient.h"
|
||||||
@@ -297,12 +297,15 @@ static int mqtt_deliver_message(mqtt_client_t* c, MQTTString* topic_name, mqtt_m
|
|||||||
mqtt_new_message_data(&md, topic_name, message); /* make a message data */
|
mqtt_new_message_data(&md, topic_name, message); /* make a message data */
|
||||||
msg_handler->handler(c, &md); /* deliver the message */
|
msg_handler->handler(c, &md); /* deliver the message */
|
||||||
rc = MQTT_SUCCESS_ERROR;
|
rc = MQTT_SUCCESS_ERROR;
|
||||||
} else
|
} else if (NULL != c->interceptor_handler) {
|
||||||
goto exit;
|
message_data_t md;
|
||||||
|
mqtt_new_message_data(&md, topic_name, message); /* make a message data */
|
||||||
|
c->interceptor_handler(c, &md);
|
||||||
|
rc = MQTT_SUCCESS_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
memset(message->payload, 0, strlen(message->payload));
|
memset(message->payload, 0, strlen(message->payload));
|
||||||
memset(topic_name->lenstring.data, 0, topic_name->lenstring.len);
|
memset(topic_name->lenstring.data, 0, topic_name->lenstring.len);
|
||||||
exit:
|
|
||||||
|
|
||||||
RETURN_ERROR(rc);
|
RETURN_ERROR(rc);
|
||||||
}
|
}
|
||||||
@@ -448,9 +451,6 @@ static int mqtt_msg_handler_is_exist(mqtt_client_t* c, message_handlers_t *handl
|
|||||||
list_t *curr, *next;
|
list_t *curr, *next;
|
||||||
message_handlers_t *msg_handler;
|
message_handlers_t *msg_handler;
|
||||||
|
|
||||||
if ((NULL == c) || (NULL == handler))
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
if (list_is_empty(&c->msg_handler_list))
|
if (list_is_empty(&c->msg_handler_list))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
@@ -459,7 +459,8 @@ static int mqtt_msg_handler_is_exist(mqtt_client_t* c, message_handlers_t *handl
|
|||||||
|
|
||||||
/* determine whether a node already exists by mqtt topic, but wildcards are not supported */
|
/* determine whether a node already exists by mqtt topic, but wildcards are not supported */
|
||||||
if ((NULL != msg_handler->topic_filter) && (mqtt_is_topic_equals(msg_handler->topic_filter, handler->topic_filter))) {
|
if ((NULL != msg_handler->topic_filter) && (mqtt_is_topic_equals(msg_handler->topic_filter, handler->topic_filter))) {
|
||||||
LOG_W("%s:%d %s()...msg_handler->topic_filter: %s, handler->topic_filter: %s", __FILE__, __LINE__, __FUNCTION__, msg_handler->topic_filter, handler->topic_filter);
|
LOG_W("%s:%d %s()...msg_handler->topic_filter: %s, handler->topic_filter: %s",
|
||||||
|
__FILE__, __LINE__, __FUNCTION__, msg_handler->topic_filter, handler->topic_filter);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -469,6 +470,9 @@ static int mqtt_msg_handler_is_exist(mqtt_client_t* c, message_handlers_t *handl
|
|||||||
|
|
||||||
static int mqtt_msg_handlers_install(mqtt_client_t* c, message_handlers_t *handler)
|
static int mqtt_msg_handlers_install(mqtt_client_t* c, message_handlers_t *handler)
|
||||||
{
|
{
|
||||||
|
if ((NULL == c) || (NULL == handler))
|
||||||
|
RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
|
||||||
|
|
||||||
if (mqtt_msg_handler_is_exist(c, handler)) {
|
if (mqtt_msg_handler_is_exist(c, handler)) {
|
||||||
mqtt_msg_handler_destory(handler);
|
mqtt_msg_handler_destory(handler);
|
||||||
RETURN_ERROR(MQTT_SUCCESS_ERROR);
|
RETURN_ERROR(MQTT_SUCCESS_ERROR);
|
||||||
@@ -578,7 +582,7 @@ static int mqtt_try_reconnect(mqtt_client_t* c)
|
|||||||
|
|
||||||
rc = mqtt_try_do_reconnect(c);
|
rc = mqtt_try_do_reconnect(c);
|
||||||
|
|
||||||
if (platform_timer_is_expired(&c->reconnect_timer)) {
|
if ((MQTT_SUCCESS_ERROR != rc) && (platform_timer_is_expired(&c->reconnect_timer))) {
|
||||||
platform_timer_cutdown(&c->reconnect_timer, c->reconnect_try_duration);
|
platform_timer_cutdown(&c->reconnect_timer, c->reconnect_try_duration);
|
||||||
if (NULL != c->reconnect_handler)
|
if (NULL != c->reconnect_handler)
|
||||||
c->reconnect_handler(c, c->reconnect_date);
|
c->reconnect_handler(c, c->reconnect_date);
|
||||||
@@ -673,6 +677,7 @@ static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
|
|||||||
|
|
||||||
if (is_nack) {
|
if (is_nack) {
|
||||||
mqtt_msg_handler_destory(msg_handler); /* subscribe topic failed, destory message handler */
|
mqtt_msg_handler_destory(msg_handler); /* subscribe topic failed, destory message handler */
|
||||||
|
LOG_D("subscribe topic failed...");
|
||||||
RETURN_ERROR(MQTT_SUBSCRIBE_NOT_ACK_ERROR);
|
RETURN_ERROR(MQTT_SUBSCRIBE_NOT_ACK_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -840,6 +845,54 @@ static int mqtt_wait_packet(mqtt_client_t* c, int packet_type, platform_timer_t*
|
|||||||
RETURN_ERROR(rc);
|
RETURN_ERROR(rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int mqtt_yield(mqtt_client_t* c, int timeout_ms)
|
||||||
|
{
|
||||||
|
int rc = MQTT_SUCCESS_ERROR;
|
||||||
|
client_state_t state;
|
||||||
|
platform_timer_t timer;
|
||||||
|
|
||||||
|
if (NULL == c)
|
||||||
|
RETURN_ERROR(MQTT_FAILED_ERROR);
|
||||||
|
|
||||||
|
if (0 == timeout_ms)
|
||||||
|
timeout_ms = c->cmd_timeout;
|
||||||
|
|
||||||
|
platform_timer_init(&timer);
|
||||||
|
platform_timer_cutdown(&timer, timeout_ms);
|
||||||
|
|
||||||
|
while (!platform_timer_is_expired(&timer)) {
|
||||||
|
state = mqtt_get_client_state(c);
|
||||||
|
if (CLIENT_STATE_CLEAN_SESSION == state) {
|
||||||
|
RETURN_ERROR(MQTT_CLEAN_SESSION_ERROR);
|
||||||
|
} else if (CLIENT_STATE_CONNECTED != state) {
|
||||||
|
/* mqtt not connect, need reconnect */
|
||||||
|
rc = mqtt_try_reconnect(c);
|
||||||
|
|
||||||
|
if (MQTT_RECONNECT_TIMEOUT_ERROR == rc)
|
||||||
|
RETURN_ERROR(rc);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* mqtt connected, handle mqtt packet */
|
||||||
|
rc = mqtt_packet_handle(c, &timer);
|
||||||
|
|
||||||
|
if (rc >= 0) {
|
||||||
|
/* scan ack list, destroy ack handler that have timed out or resend them */
|
||||||
|
mqtt_ack_list_scan(c);
|
||||||
|
|
||||||
|
} else if (MQTT_NOT_CONNECT_ERROR == rc) {
|
||||||
|
LOG_E("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, __FUNCTION__);
|
||||||
|
|
||||||
|
/* reconnect timer cutdown */
|
||||||
|
platform_timer_cutdown(&c->reconnect_timer, c->reconnect_try_duration);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RETURN_ERROR(rc);
|
||||||
|
}
|
||||||
|
|
||||||
static void mqtt_yield_thread(void *arg)
|
static void mqtt_yield_thread(void *arg)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
@@ -882,9 +935,6 @@ static int mqtt_connect_with_results(mqtt_client_t* c)
|
|||||||
if (CLIENT_STATE_CONNECTED == mqtt_get_client_state(c))
|
if (CLIENT_STATE_CONNECTED == mqtt_get_client_state(c))
|
||||||
RETURN_ERROR(MQTT_SUCCESS_ERROR);
|
RETURN_ERROR(MQTT_SUCCESS_ERROR);
|
||||||
|
|
||||||
platform_timer_init(&connect_timer);
|
|
||||||
platform_timer_cutdown(&connect_timer, c->cmd_timeout);
|
|
||||||
|
|
||||||
rc = c->network->connect(c->network);
|
rc = c->network->connect(c->network);
|
||||||
if (MQTT_SUCCESS_ERROR != rc)
|
if (MQTT_SUCCESS_ERROR != rc)
|
||||||
RETURN_ERROR(rc);
|
RETURN_ERROR(rc);
|
||||||
@@ -906,6 +956,9 @@ static int mqtt_connect_with_results(mqtt_client_t* c)
|
|||||||
if ((len = MQTTSerialize_connect(c->write_buf, c->write_buf_size, &connect_data)) <= 0)
|
if ((len = MQTTSerialize_connect(c->write_buf, c->write_buf_size, &connect_data)) <= 0)
|
||||||
goto exit;
|
goto exit;
|
||||||
|
|
||||||
|
platform_timer_init(&connect_timer);
|
||||||
|
platform_timer_cutdown(&connect_timer, c->cmd_timeout);
|
||||||
|
|
||||||
/* send connect packet */
|
/* send connect packet */
|
||||||
if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
|
if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
|
||||||
goto exit;
|
goto exit;
|
||||||
@@ -1032,6 +1085,7 @@ int mqtt_init(mqtt_client_t* c, client_init_params_t* init)
|
|||||||
|
|
||||||
c->reconnect_date = init->reconnect_date;
|
c->reconnect_date = init->reconnect_date;
|
||||||
c->reconnect_handler = init->reconnect_handler;
|
c->reconnect_handler = init->reconnect_handler;
|
||||||
|
c->interceptor_handler = NULL;
|
||||||
|
|
||||||
// c->network->network_params = &init->connect_params.network_params;
|
// c->network->network_params = &init->connect_params.network_params;
|
||||||
if ((rc = network_init(c->network, &init->connect_params.network_params)) < 0)
|
if ((rc = network_init(c->network, &init->connect_params.network_params)) < 0)
|
||||||
@@ -1052,10 +1106,22 @@ int mqtt_init(mqtt_client_t* c, client_init_params_t* init)
|
|||||||
|
|
||||||
int mqtt_release(mqtt_client_t* c)
|
int mqtt_release(mqtt_client_t* c)
|
||||||
{
|
{
|
||||||
|
platform_timer_t timer;
|
||||||
|
|
||||||
if (NULL == c)
|
if (NULL == c)
|
||||||
RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
|
RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
|
||||||
|
|
||||||
while (CLIENT_STATE_INVALID != mqtt_get_client_state(c)); /* wait for the clean session to complete */
|
platform_timer_init(&timer);
|
||||||
|
platform_timer_cutdown(&timer, c->cmd_timeout);
|
||||||
|
|
||||||
|
/* wait for the clean session to complete */
|
||||||
|
while ((CLIENT_STATE_INVALID != mqtt_get_client_state(c))) {
|
||||||
|
// platform_timer_usleep(1000); // 1ms avoid compiler optimization.
|
||||||
|
if (platform_timer_is_expired(&timer)) {
|
||||||
|
LOG_E("%s:%d %s()... mqtt release failed...", __FILE__, __LINE__, __FUNCTION__);
|
||||||
|
RETURN_ERROR(MQTT_FAILED_ERROR)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (NULL != c->network) {
|
if (NULL != c->network) {
|
||||||
platform_memory_free(c->network);
|
platform_memory_free(c->network);
|
||||||
@@ -1067,9 +1133,9 @@ int mqtt_release(mqtt_client_t* c)
|
|||||||
c->read_buf = NULL;
|
c->read_buf = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL != c->read_buf) {
|
if (NULL != c->write_buf) {
|
||||||
platform_memory_free(c->read_buf);
|
platform_memory_free(c->write_buf);
|
||||||
c->read_buf = NULL;
|
c->write_buf = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(c, 0, sizeof(mqtt_client_t));
|
memset(c, 0, sizeof(mqtt_client_t));
|
||||||
@@ -1203,7 +1269,7 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg
|
|||||||
|
|
||||||
platform_mutex_lock(&c->write_lock);
|
platform_mutex_lock(&c->write_lock);
|
||||||
|
|
||||||
if (msg->qos == QOS1 || msg->qos == QOS2) {
|
if (msg->qos != QOS0) {
|
||||||
if (mqtt_ack_handler_is_maximum(c)) {
|
if (mqtt_ack_handler_is_maximum(c)) {
|
||||||
rc = MQTT_ACK_HANDLER_NUM_TOO_MUCH; /* the recorded ack handler has reached the maximum */
|
rc = MQTT_ACK_HANDLER_NUM_TOO_MUCH; /* the recorded ack handler has reached the maximum */
|
||||||
goto exit;
|
goto exit;
|
||||||
@@ -1240,50 +1306,35 @@ exit:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int mqtt_yield(mqtt_client_t* c, int timeout_ms)
|
int mqtt_list_subscribe_topic(mqtt_client_t* c)
|
||||||
{
|
{
|
||||||
int rc = MQTT_SUCCESS_ERROR;
|
int i = 0;
|
||||||
client_state_t state;
|
list_t *curr, *next;
|
||||||
platform_timer_t timer;
|
message_handlers_t *msg_handler;
|
||||||
|
|
||||||
if (NULL == c)
|
if (NULL == c)
|
||||||
RETURN_ERROR(MQTT_FAILED_ERROR);
|
RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
|
||||||
|
|
||||||
if (0 == timeout_ms)
|
if (list_is_empty(&c->msg_handler_list))
|
||||||
timeout_ms = c->cmd_timeout;
|
LOG_I("%s:%d %s()... there are no subscribed topics...", __FILE__, __LINE__, __FUNCTION__);
|
||||||
|
|
||||||
platform_timer_init(&timer);
|
LIST_FOR_EACH_SAFE(curr, next, &c->msg_handler_list) {
|
||||||
platform_timer_cutdown(&timer, timeout_ms);
|
msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
|
||||||
|
/* determine whether a node already exists by mqtt topic, but wildcards are not supported */
|
||||||
while (!platform_timer_is_expired(&timer)) {
|
if (NULL != msg_handler->topic_filter) {
|
||||||
state = mqtt_get_client_state(c);
|
LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__, __LINE__, __FUNCTION__, ++i ,msg_handler->topic_filter);
|
||||||
if (CLIENT_STATE_CLEAN_SESSION == state) {
|
|
||||||
RETURN_ERROR(MQTT_CLEAN_SESSION_ERROR);
|
|
||||||
} else if (CLIENT_STATE_CONNECTED != state) {
|
|
||||||
/* mqtt not connect, need reconnect */
|
|
||||||
rc = mqtt_try_reconnect(c);
|
|
||||||
|
|
||||||
if (MQTT_RECONNECT_TIMEOUT_ERROR == rc)
|
|
||||||
RETURN_ERROR(rc);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* mqtt connected, handle mqtt packet */
|
|
||||||
rc = mqtt_packet_handle(c, &timer);
|
|
||||||
|
|
||||||
if (rc >= 0) {
|
|
||||||
/* scan ack list, destroy ack handler that have timed out or resend them */
|
|
||||||
mqtt_ack_list_scan(c);
|
|
||||||
|
|
||||||
} else if (MQTT_NOT_CONNECT_ERROR == rc) {
|
|
||||||
LOG_E("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, __FUNCTION__);
|
|
||||||
|
|
||||||
/* reconnect timer cutdown */
|
|
||||||
platform_timer_cutdown(&c->reconnect_timer, c->reconnect_try_duration);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RETURN_ERROR(rc);
|
RETURN_ERROR(MQTT_SUCCESS_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
int mqtt_set_interceptor_handler(mqtt_client_t* c, interceptor_handler_t handler)
|
||||||
|
{
|
||||||
|
if (NULL == handler)
|
||||||
|
RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
|
||||||
|
|
||||||
|
c->interceptor_handler = handler;
|
||||||
|
|
||||||
|
RETURN_ERROR(MQTT_SUCCESS_ERROR);
|
||||||
}
|
}
|
||||||
|
@@ -2,7 +2,7 @@
|
|||||||
* @Author: jiejie
|
* @Author: jiejie
|
||||||
* @Github: https://github.com/jiejieTop
|
* @Github: https://github.com/jiejieTop
|
||||||
* @Date: 2019-12-09 21:31:25
|
* @Date: 2019-12-09 21:31:25
|
||||||
* @LastEditTime: 2020-03-15 01:12:28
|
* @LastEditTime: 2020-04-18 12:29:23
|
||||||
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
|
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
|
||||||
*/
|
*/
|
||||||
#ifndef _MQTTCLIENT_H_
|
#ifndef _MQTTCLIENT_H_
|
||||||
@@ -57,6 +57,7 @@ typedef struct message_data {
|
|||||||
mqtt_message_t *message;
|
mqtt_message_t *message;
|
||||||
} message_data_t;
|
} message_data_t;
|
||||||
|
|
||||||
|
typedef void (*interceptor_handler_t)(void* client, message_data_t* msg);
|
||||||
typedef void (*message_handler_t)(void* client, message_data_t* msg);
|
typedef void (*message_handler_t)(void* client, message_data_t* msg);
|
||||||
typedef void (*reconnect_handler_t)(void* client, void* reconnect_date);
|
typedef void (*reconnect_handler_t)(void* client, void* reconnect_date);
|
||||||
|
|
||||||
@@ -115,6 +116,7 @@ typedef struct mqtt_client {
|
|||||||
platform_timer_t last_sent;
|
platform_timer_t last_sent;
|
||||||
platform_timer_t last_received;
|
platform_timer_t last_received;
|
||||||
connect_params_t *connect_params;
|
connect_params_t *connect_params;
|
||||||
|
interceptor_handler_t interceptor_handler;
|
||||||
} mqtt_client_t;
|
} mqtt_client_t;
|
||||||
|
|
||||||
typedef struct client_init_params{
|
typedef struct client_init_params{
|
||||||
@@ -135,8 +137,8 @@ int mqtt_disconnect(mqtt_client_t* c);
|
|||||||
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
|
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
|
||||||
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
|
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
|
||||||
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);
|
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);
|
||||||
int mqtt_yield(mqtt_client_t* c, int timeout_ms);
|
int mqtt_list_subscribe_topic(mqtt_client_t* c);
|
||||||
|
int mqtt_set_interceptor_handler(mqtt_client_t* c, interceptor_handler_t handler);
|
||||||
|
|
||||||
|
|
||||||
#endif /* _MQTTCLIENT_H_ */
|
#endif /* _MQTTCLIENT_H_ */
|
||||||
|
@@ -2,7 +2,7 @@
|
|||||||
* @Author: jiejie
|
* @Author: jiejie
|
||||||
* @Github: https://github.com/jiejieTop
|
* @Github: https://github.com/jiejieTop
|
||||||
* @Date: 2019-12-11 21:53:07
|
* @Date: 2019-12-11 21:53:07
|
||||||
* @LastEditTime : 2020-01-18 13:54:38
|
* @LastEditTime: 2020-04-23 15:03:32
|
||||||
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
|
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
|
||||||
*/
|
*/
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
@@ -17,12 +17,7 @@ extern const char *test_ca_get();
|
|||||||
mqtt_client_t client;
|
mqtt_client_t client;
|
||||||
client_init_params_t init_params;
|
client_init_params_t init_params;
|
||||||
|
|
||||||
// static void reconnect_handler(void* client, void* reconnect_date)
|
static void topic1_handler(void* client, message_data_t* msg)
|
||||||
// {
|
|
||||||
// LOG_E("%s:%d %s()...mqtt is reconnecting, reconnect_date is %s", __FILE__, __LINE__, __FUNCTION__, (char*)reconnect_date);
|
|
||||||
// }
|
|
||||||
|
|
||||||
static void topic_test1_handler(void* client, message_data_t* msg)
|
|
||||||
{
|
{
|
||||||
(void) client;
|
(void) client;
|
||||||
LOG_I("-----------------------------------------------------------------------------------");
|
LOG_I("-----------------------------------------------------------------------------------");
|
||||||
@@ -30,38 +25,32 @@ static void topic_test1_handler(void* client, message_data_t* msg)
|
|||||||
LOG_I("-----------------------------------------------------------------------------------");
|
LOG_I("-----------------------------------------------------------------------------------");
|
||||||
}
|
}
|
||||||
|
|
||||||
// void *mqtt_unsubscribe_thread(void *arg)
|
|
||||||
// {
|
|
||||||
// sleep(2);
|
|
||||||
// mqtt_unsubscribe(&client, "test");
|
|
||||||
|
|
||||||
// // sleep(10);
|
|
||||||
// mqtt_disconnect(&client);
|
|
||||||
|
|
||||||
// sleep(2);
|
|
||||||
|
|
||||||
// mqtt_connect(&client);
|
|
||||||
|
|
||||||
// pthread_exit(NULL);
|
|
||||||
// }
|
|
||||||
|
|
||||||
void *mqtt_publish_thread(void *arg)
|
void *mqtt_publish_thread(void *arg)
|
||||||
{
|
{
|
||||||
char buf[80] = { 0 };
|
char buf[100] = { 0 };
|
||||||
mqtt_message_t msg;
|
mqtt_message_t msg;
|
||||||
memset(&msg, 0, sizeof(msg));
|
memset(&msg, 0, sizeof(msg));
|
||||||
sprintf(buf, "welcome to mqttclient, this is a publish test...");
|
sprintf(buf, "welcome to mqttclient, this is a publish test...");
|
||||||
|
|
||||||
msg.qos = 2;
|
|
||||||
msg.payload = (void *) buf;
|
|
||||||
// msg.payloadlen = strlen(buf);
|
|
||||||
while(1) {
|
|
||||||
mqtt_publish(&client, "testtopic1-acer3", &msg);
|
|
||||||
mqtt_publish(&client, "testtopic2-acer3", &msg);
|
|
||||||
mqtt_publish(&client, "testtopic3-acer3", &msg);
|
|
||||||
mqtt_publish(&client, "testtopic4-acer3", &msg);
|
|
||||||
// LOG_I("random_number is %d",random_number());
|
|
||||||
sleep(2);
|
sleep(2);
|
||||||
|
|
||||||
|
mqtt_list_subscribe_topic(&client);
|
||||||
|
|
||||||
|
msg.payload = (void *) buf;
|
||||||
|
|
||||||
|
while(1) {
|
||||||
|
sprintf(buf, "welcome to mqttclient, this is a publish test, a rand number: %d ...", random_number());
|
||||||
|
|
||||||
|
msg.qos = 0;
|
||||||
|
mqtt_publish(&client, "topic1", &msg);
|
||||||
|
|
||||||
|
msg.qos = 1;
|
||||||
|
mqtt_publish(&client, "topic2", &msg);
|
||||||
|
|
||||||
|
msg.qos = 2;
|
||||||
|
mqtt_publish(&client, "topic3", &msg);
|
||||||
|
|
||||||
|
sleep(4);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -71,40 +60,33 @@ int main(void)
|
|||||||
// pthread_t thread1;
|
// pthread_t thread1;
|
||||||
pthread_t thread2;
|
pthread_t thread2;
|
||||||
|
|
||||||
init_params.read_buf_size = 1024;
|
printf("\nwelcome to mqttclient test...\n");
|
||||||
init_params.write_buf_size = 1024;
|
|
||||||
// init_params.reconnect_date = "this is a test";
|
|
||||||
// init_params.reconnect_handler = reconnect_handler;
|
|
||||||
init_params.connect_params.network_params.network_ssl_params.ca_crt = test_ca_get();
|
|
||||||
init_params.connect_params.network_params.addr = "www.jiejie01.top"; //"47.95.164.112";//"jiejie01.top"; //"129.204.201.235"; //"192.168.1.101";
|
|
||||||
init_params.connect_params.network_params.port = "8883";
|
|
||||||
init_params.connect_params.user_name = random_string(10); // random_string(10); //"jiejietop-acer1";
|
|
||||||
init_params.connect_params.password = random_string(10);; //random_string(10); // "123456";
|
|
||||||
init_params.connect_params.client_id = random_string(10);; //random_string(10); // "clientid-acer1";
|
|
||||||
init_params.connect_params.clean_session = 1;
|
|
||||||
|
|
||||||
log_init();
|
log_init();
|
||||||
|
|
||||||
|
init_params.read_buf_size = 1024;
|
||||||
|
init_params.write_buf_size = 1024;
|
||||||
|
|
||||||
|
#ifdef MQTT_NETWORK_TYPE_TLS
|
||||||
|
init_params.connect_params.network_params.network_ssl_params.ca_crt = test_ca_get();
|
||||||
|
init_params.connect_params.network_params.port = "8883";
|
||||||
|
#else
|
||||||
|
init_params.connect_params.network_params.port = "1883";
|
||||||
|
#endif
|
||||||
|
init_params.connect_params.network_params.addr = "www.jiejie01.top"; //"47.95.164.112";//"jiejie01.top"; //"129.204.201.235"; //"192.168.1.101";
|
||||||
|
|
||||||
|
init_params.connect_params.user_name = random_string(10); // random_string(10); //"jiejietop-acer1";
|
||||||
|
init_params.connect_params.password = random_string(10); //random_string(10); // "123456";
|
||||||
|
init_params.connect_params.client_id = random_string(10); //random_string(10); // "clientid-acer1";
|
||||||
|
init_params.connect_params.clean_session = 1;
|
||||||
|
|
||||||
mqtt_init(&client, &init_params);
|
mqtt_init(&client, &init_params);
|
||||||
|
|
||||||
mqtt_connect(&client);
|
mqtt_connect(&client);
|
||||||
|
|
||||||
LOG_D("mqtt connect success...");
|
mqtt_subscribe(&client, "topic1", QOS0, topic1_handler);
|
||||||
|
mqtt_subscribe(&client, "topic2", QOS1, NULL);
|
||||||
mqtt_subscribe(&client, "testtopic1-acer3", QOS2, topic_test1_handler);
|
mqtt_subscribe(&client, "topic3", QOS2, NULL);
|
||||||
mqtt_subscribe(&client, "testtopic2-acer3", QOS2, NULL);
|
|
||||||
mqtt_subscribe(&client, "testtopic3-acer3", QOS2, NULL);
|
|
||||||
mqtt_subscribe(&client, "testtopic4-acer3", QOS2, NULL);
|
|
||||||
mqtt_subscribe(&client, "testtopic5-acer3", QOS1, NULL);
|
|
||||||
mqtt_subscribe(&client, "testtopic6-acer3", QOS2, NULL);
|
|
||||||
mqtt_subscribe(&client, "testtopic7-acer3", QOS0, NULL);
|
|
||||||
|
|
||||||
// LOG_E("create mqtt publish thread fail");
|
|
||||||
// res = pthread_create(&thread1, NULL, mqtt_unsubscribe_thread, NULL);
|
|
||||||
// if(res != 0) {
|
|
||||||
// LOG_I("create thread2 fail");
|
|
||||||
// exit(res);
|
|
||||||
// }
|
|
||||||
|
|
||||||
res = pthread_create(&thread2, NULL, mqtt_publish_thread, NULL);
|
res = pthread_create(&thread2, NULL, mqtt_publish_thread, NULL);
|
||||||
if(res != 0) {
|
if(res != 0) {
|
||||||
|
@@ -2,41 +2,42 @@
|
|||||||
* @Author: jiejie
|
* @Author: jiejie
|
||||||
* @Github: https://github.com/jiejieTop
|
* @Github: https://github.com/jiejieTop
|
||||||
* @Date: 2020-01-12 10:51:11
|
* @Date: 2020-01-12 10:51:11
|
||||||
* @LastEditTime : 2020-01-16 00:18:45
|
* @LastEditTime: 2020-03-21 21:07:24
|
||||||
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
|
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
|
||||||
*/
|
*/
|
||||||
static const char *test_ca_crt = {
|
static const char *test_ca_crt = {
|
||||||
"-----BEGIN CERTIFICATE-----\r\n"
|
"-----BEGIN CERTIFICATE-----\r\n"
|
||||||
"MIIFlzCCBH+gAwIBAgIQBf0c/dicZUkWCJnzWyxOuDANBgkqhkiG9w0BAQsFADBy\r\n"
|
"MIIFrTCCBJWgAwIBAgIQDApSGhCHMtFicAaXgcO0vjANBgkqhkiG9w0BAQsFADBy\r\n"
|
||||||
"MQswCQYDVQQGEwJDTjElMCMGA1UEChMcVHJ1c3RBc2lhIFRlY2hub2xvZ2llcywg\r\n"
|
"MQswCQYDVQQGEwJDTjElMCMGA1UEChMcVHJ1c3RBc2lhIFRlY2hub2xvZ2llcywg\r\n"
|
||||||
"SW5jLjEdMBsGA1UECxMURG9tYWluIFZhbGlkYXRlZCBTU0wxHTAbBgNVBAMTFFRy\r\n"
|
"SW5jLjEdMBsGA1UECxMURG9tYWluIFZhbGlkYXRlZCBTU0wxHTAbBgNVBAMTFFRy\r\n"
|
||||||
"dXN0QXNpYSBUTFMgUlNBIENBMB4XDTE5MDMyOTAwMDAwMFoXDTIwMDMyODEyMDAw\r\n"
|
"dXN0QXNpYSBUTFMgUlNBIENBMB4XDTIwMDEyOTAwMDAwMFoXDTIxMDQyODEyMDAw\r\n"
|
||||||
"MFowGzEZMBcGA1UEAxMQd3d3LmppZWppZTAxLnRvcDCCASIwDQYJKoZIhvcNAQEB\r\n"
|
"MFowGzEZMBcGA1UEAxMQd3d3LmppZWppZTAxLnRvcDCCASIwDQYJKoZIhvcNAQEB\r\n"
|
||||||
"BQADggEPADCCAQoCggEBAKV5ot0wC7L2sa2ABzLDaYYmxbX61FpHYscY8R1R8nUQ\r\n"
|
"BQADggEPADCCAQoCggEBAKIDMo45w085LaCMG1LWY5b8V94zDqdt+weVhKolgsLZ\r\n"
|
||||||
"pRfbj8eCNcg6DZf85OX/sSmtqRnk2Z+Nu9ML5KO5sSvVCr/eL0bW8jqlQ39FuyoR\r\n"
|
"htAQTDrafBx1sNJtOpa8ADeQkFbWOTEy3tgViOBvBr+8Qhl6vYsESJrg7DXeCVRm\r\n"
|
||||||
"vsLkUP8iXIkHyEulPagYK8yFNCf0eR0c/SQO0U+UI4JrVOtwFHV/y8IbZ9pEboBH\r\n"
|
"04pk+cFrdWYRE70AUz8RXRuaWLv1Fu2L+qlymrnZB/WBJFnETINh6yzqY8FNETUV\r\n"
|
||||||
"2II2OYC8azavFoUQXVn+niQiYJb9KGN6Jz4mCgVeGXfoKjW5qtQuCtj0rdpDx+UX\r\n"
|
"EL08eE0LoXt/4b7iAJYEFRYTyBKjLpkr04e92SQIuL/l42j92lYNOjYfYOlpWZUC\r\n"
|
||||||
"JWP5ktLCCfV5Ke+Vb0Ovahr8kASremR/XSh+K8pP+94uxrMp79wc4YVEiMbKEb8j\r\n"
|
"cp4WHXEDd1YXdTMmXpfsU3VlYS3RTusOFsgXpLFET9xGRvtDSu5qw6rPGf/y/PhT\r\n"
|
||||||
"PVnOJwF7d7bjOU2M0baq5ouxE1xl8c2xIc8zABIrUuMCAwEAAaOCAn4wggJ6MB8G\r\n"
|
"1LFB/xlL2E6Rpo/6VWuQ8A5rA+H3D1I/fIBB97orMYUCAwEAAaOCApQwggKQMB8G\r\n"
|
||||||
"A1UdIwQYMBaAFH/TmfOgRw4xAFZWIo63zJ7dygGKMB0GA1UdDgQWBBS4EllqeVPA\r\n"
|
"A1UdIwQYMBaAFH/TmfOgRw4xAFZWIo63zJ7dygGKMB0GA1UdDgQWBBQyzyOTN1l5\r\n"
|
||||||
"jt5ZiROEesnBWP5vDTApBgNVHREEIjAgghB3d3cuamllamllMDEudG9wggxqaWVq\r\n"
|
"Rg1Ih1tQ0TPYMqfw+jApBgNVHREEIjAgghB3d3cuamllamllMDEudG9wggxqaWVq\r\n"
|
||||||
"aWUwMS50b3AwDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggr\r\n"
|
"aWUwMS50b3AwDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggr\r\n"
|
||||||
"BgEFBQcDAjBMBgNVHSAERTBDMDcGCWCGSAGG/WwBAjAqMCgGCCsGAQUFBwIBFhxo\r\n"
|
"BgEFBQcDAjBMBgNVHSAERTBDMDcGCWCGSAGG/WwBAjAqMCgGCCsGAQUFBwIBFhxo\r\n"
|
||||||
"dHRwczovL3d3dy5kaWdpY2VydC5jb20vQ1BTMAgGBmeBDAECATB9BggrBgEFBQcB\r\n"
|
"dHRwczovL3d3dy5kaWdpY2VydC5jb20vQ1BTMAgGBmeBDAECATCBkgYIKwYBBQUH\r\n"
|
||||||
"AQRxMG8wIQYIKwYBBQUHMAGGFWh0dHA6Ly9vY3NwLmRjb2NzcC5jbjBKBggrBgEF\r\n"
|
"AQEEgYUwgYIwNAYIKwYBBQUHMAGGKGh0dHA6Ly9zdGF0dXNlLmRpZ2l0YWxjZXJ0\r\n"
|
||||||
"BQcwAoY+aHR0cDovL2NhY2VydHMuZGlnaXRhbGNlcnR2YWxpZGF0aW9uLmNvbS9U\r\n"
|
"dmFsaWRhdGlvbi5jb20wSgYIKwYBBQUHMAKGPmh0dHA6Ly9jYWNlcnRzLmRpZ2l0\r\n"
|
||||||
"cnVzdEFzaWFUTFNSU0FDQS5jcnQwCQYDVR0TBAIwADCCAQQGCisGAQQB1nkCBAIE\r\n"
|
"YWxjZXJ0dmFsaWRhdGlvbi5jb20vVHJ1c3RBc2lhVExTUlNBQ0EuY3J0MAkGA1Ud\r\n"
|
||||||
"gfUEgfIA8AB2ALvZ37wfinG1k5Qjl6qSe0c4V5UKq1LoGpCWZDaOHtGFAAABacpL\r\n"
|
"EwQCMAAwggEEBgorBgEEAdZ5AgQCBIH1BIHyAPAAdQB9PvL4j/+IVWgkwsDKnlKJ\r\n"
|
||||||
"x9kAAAQDAEcwRQIgbNEvEkLAgPDhqZGrXqV3rmByWiQ5CgTUnRo8vRKYH7ICIQDS\r\n"
|
"eSvFDngJfy5ql2iZfiLw1wAAAW/wKSTgAAAEAwBGMEQCIGFq0FdvZfXf4lV20Am1\r\n"
|
||||||
"V+bNpM7cOoPADyZsmRDxdu+kUfkhD3csdkALmMyobAB2AId1v+dZfPiMQ5lfvfNu\r\n"
|
"HRP6F7wxzkesK0r1566sNqvxAiBp5W3iTLWEgeJa/PfH5hX/d+K5CIyXScLa4qqa\r\n"
|
||||||
"/1aNR1Y2/0q1YMG06v9eoIMPAAABacpLyO0AAAQDAEcwRQIgP69W3D/iVIaVFVc/\r\n"
|
"MCLHwwB3AFzcQ5L+5qtFRLFemtRW5hA3+9X6R9yhc5SyXub2xw7KAAABb/ApJJIA\r\n"
|
||||||
"+I98YB083woBDWfquBpZq2dXA5ECIQDtP465d8Q0UuNK1gRYCZOe1Daexdndkt2g\r\n"
|
"AAQDAEgwRgIhANEsjShyRf0GGpwJ6ZTQKBHo933rlSpaNIvor7cG8RBQAiEAkeDf\r\n"
|
||||||
"8zServG3HjANBgkqhkiG9w0BAQsFAAOCAQEADgP0Q4vUgYAR6e98x/tP12Rpsqkt\r\n"
|
"7+n+zyEGZUMOYI0E0R2chjPBJGvtw1yD12sxekowDQYJKoZIhvcNAQELBQADggEB\r\n"
|
||||||
"l0Mbpr12lqSDmH2QfmoWRIRmyRrbpR2fkPKikrib3ezTmSJI70saELY8ceLZiaQu\r\n"
|
"AIzaZ5X1So+xVe2JWkMfmJA8IQhdp9WghCDLRORcIggcY9BtYxFSBdusxIa1bhdt\r\n"
|
||||||
"Vq6zT/8xXDUcWJbeRgUVo8J3b20oxJLLsf8QmmEMdycKsUJv0iybQq9CCqay8Tq3\r\n"
|
"rCY6RoepCwTrhV9PotwMgZtSOu8szHHRlqX8zNUhIh628yzPSTDDZ4xgeJvlGAkT\r\n"
|
||||||
"mGJ2bc6zAqWUjcugLSvh/c6QPp9d8UGyVre/UJFbGVpk8swkhXFTypUVcm+TNqzW\r\n"
|
"Zlv0XrezkDLRZkKN9R6KX1ccaPNbn9PD6SMtpHPbE7UEZYfrV3wAJhFzsyhp2JF4\r\n"
|
||||||
"A3kKC3k5Sk+l1CTNHgQTO7+T+/9anzLuyf/FNTO3WLav/J/FobMbXycqbwYemisk\r\n"
|
"KLLNPaeDgDM3Lu6tUm/bznDEyxi7/ZoR+7fSQAMF5Jo1ysKUAOC00I9Ne+7eSbTh\r\n"
|
||||||
"kS9xodt6dEDc5wf8nWlUOHzU/n0WCjr4Cgl1gRdTRFwy+cLEhLMZOu4vjQ==\r\n"
|
"flV//8NBN+Z2ShCV0uFedf6ugDUMOuOUCtp0c7N+sM1IVE5MOhLRDAGRIUyi7/43\r\n"
|
||||||
|
"dJ0Okust1fXo4UTDGJtyp30=\r\n"
|
||||||
"-----END CERTIFICATE-----\r\n"
|
"-----END CERTIFICATE-----\r\n"
|
||||||
"-----BEGIN CERTIFICATE-----\r\n"
|
"-----BEGIN CERTIFICATE-----\r\n"
|
||||||
"MIIErjCCA5agAwIBAgIQBYAmfwbylVM0jhwYWl7uLjANBgkqhkiG9w0BAQsFADBh\r\n"
|
"MIIErjCCA5agAwIBAgIQBYAmfwbylVM0jhwYWl7uLjANBgkqhkiG9w0BAQsFADBh\r\n"
|
||||||
|
Reference in New Issue
Block a user