the push is modified as follows:

1. delete some junk files.
2. mqttclient added comments and updated to v1.0.2.
3. update README.md.
4. update the author's own server certificate.
5. minor changes to salof.
This commit is contained in:
jiejieTop
2020-03-21 21:47:08 +08:00
parent c0cb4460b7
commit 476e0dec02
43 changed files with 1100 additions and 15612 deletions

View File

@@ -0,0 +1,81 @@
/*
* @Author: jiejie
* @Github: https://github.com/jiejieTop
* @Date: 2020-02-25 03:36:09
* @LastEditTime: 2020-02-25 07:16:43
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
*/
#ifndef _DEFCONFIG_H_
#define _DEFCONFIG_H_
#include "mqtt_config.h"
#ifndef MQTT_MAX_PACKET_ID
#define MQTT_MAX_PACKET_ID (0xFFFF - 1)
#endif // !MQTT_MAX_PACKET_ID
#ifndef MQTT_TOPIC_LEN_MAX
#define MQTT_TOPIC_LEN_MAX 64
#endif // !MQTT_TOPIC_LEN_MAX
#ifndef MQTT_ACK_HANDLER_NUM_MAX
#define MQTT_ACK_HANDLER_NUM_MAX 64
#endif // !MQTT_ACK_HANDLER_NUM_MAX
#ifndef MQTT_DEFAULT_BUF_SIZE
#define MQTT_DEFAULT_BUF_SIZE 1024
#endif // !MQTT_DEFAULT_BUF_SIZE
#ifndef MQTT_DEFAULT_CMD_TIMEOUT
#define MQTT_DEFAULT_CMD_TIMEOUT 4000
#endif // !MQTT_DEFAULT_CMD_TIMEOUT
#ifndef MQTT_MAX_CMD_TIMEOUT
#define MQTT_MAX_CMD_TIMEOUT 20000
#endif // !MQTT_MAX_CMD_TIMEOUT
#ifndef MQTT_MIN_CMD_TIMEOUT
#define MQTT_MIN_CMD_TIMEOUT 1000
#endif // !MQTT_MIN_CMD_TIMEOUT
#ifndef MQTT_KEEP_ALIVE_INTERVAL
#define MQTT_KEEP_ALIVE_INTERVAL 100 // unit: second
#endif // !MQTT_KEEP_ALIVE_INTERVAL
#ifndef MQTT_VERSION
#define MQTT_VERSION 4 // 4 is mqtt 3.1.1
#endif // !MQTT_VERSION
#ifndef MQTT_RECONNECT_DEFAULT_DURATION
#define MQTT_RECONNECT_DEFAULT_DURATION 1000
#endif // !MQTT_RECONNECT_DEFAULT_DURATION
#ifndef MQTT_THREAD_STACK_SIZE
#define MQTT_THREAD_STACK_SIZE 4096
#endif // !MQTT_THREAD_STACK_SIZE
#ifndef MQTT_THREAD_PRIO
#define MQTT_THREAD_PRIO 5
#endif // !MQTT_THREAD_PRIO
#ifndef MQTT_THREAD_TICK
#define MQTT_THREAD_TICK 50
#endif // !MQTT_THREAD_TICK
#ifdef MQTT_NETWORK_TYPE_TLS
#ifndef MQTT_TLS_HANDSHAKE_TIMEOUT
#define MQTT_TLS_HANDSHAKE_TIMEOUT (5 * 1000)
#endif // !MQTT_TLS_HANDSHAKE_TIMEOUT
#include "mbedtls/ssl.h"
#include "mbedtls/entropy.h"
#include "mbedtls/net_sockets.h"
#include "mbedtls/ctr_drbg.h"
#include "mbedtls/error.h"
#include "mbedtls/debug.h"
#endif /* MQTT_NETWORK_TYPE_TLS */
#endif /* _DEFCONFIG_H_ */

View File

@@ -2,7 +2,7 @@
* @Author: jiejie
* @Github: https://github.com/jiejieTop
* @Date: 2019-12-09 21:31:25
* @LastEditTime : 2020-02-16 02:50:32
* @LastEditTime: 2020-03-15 01:33:01
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
*/
#include "mqttclient.h"
@@ -28,22 +28,35 @@ static void mqtt_set_client_state(mqtt_client_t* c, client_state_t state)
platform_mutex_unlock(&c->global_lock);
}
static int mqtt_is_connected(mqtt_client_t* c)
{
client_state_t state;
state = mqtt_get_client_state(c);
if (CLIENT_STATE_CLEAN_SESSION == state) {
RETURN_ERROR(MQTT_CLEAN_SESSION_ERROR);
} else if (CLIENT_STATE_CONNECTED != state) {
RETURN_ERROR(MQTT_NOT_CONNECT_ERROR);
}
RETURN_ERROR(MQTT_SUCCESS_ERROR);
}
static int mqtt_set_publish_dup(mqtt_client_t* c, unsigned char dup)
{
unsigned char *read_data = c->write_buf;
unsigned char *write_data = c->write_buf;
MQTTHeader header = {0};
MQTTHeader header = {0};
if (NULL == c->write_buf)
RETURN_ERROR(MQTT_SET_PUBLISH_DUP_FAILED);
header.byte = readChar(&read_data); /* read header */
header.byte = readChar(&read_data); /* read header */
if (header.bits.type != PUBLISH)
RETURN_ERROR(MQTT_SET_PUBLISH_DUP_FAILED);
header.bits.dup = dup;
writeChar(&write_data, header.byte); /* write header */
header.bits.dup = dup;
writeChar(&write_data, header.byte); /* write header */
RETURN_ERROR(MQTT_SUCCESS_ERROR);
}
@@ -98,10 +111,10 @@ static int mqtt_decode_packet(mqtt_client_t* c, int* value, int timeout)
rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit;
}
rc = c->network->read(c->network, &i, 1, timeout);
rc = c->network->read(c->network, &i, 1, timeout); /* read network data */
if (rc != 1)
goto exit;
*value += (i & 127) * multiplier;
*value += (i & 127) * multiplier; /* decode data length according to mqtt protocol */
multiplier *= 128;
} while ((i & 128) != 0);
exit:
@@ -128,7 +141,7 @@ static void mqtt_packet_drain(mqtt_client_t* c, platform_timer_t *timer, int pac
bytes2read = packet_len - total_bytes_read;
}
}
} while ((total_bytes_read < packet_len) && (0 != read_len));
} while ((total_bytes_read < packet_len) && (0 != read_len)); /* read and discard all corrupted data */
}
static int mqtt_read_packet(mqtt_client_t* c, int* packet_type, platform_timer_t* timer)
@@ -141,6 +154,9 @@ static int mqtt_read_packet(mqtt_client_t* c, int* packet_type, platform_timer_t
if (NULL == packet_type)
RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
platform_timer_init(timer);
platform_timer_cutdown(timer, c->cmd_timeout);
/* 1. read the header byte. This has the packet type in it */
rc = c->network->read(c->network, c->read_buf, len, platform_timer_remain(timer));
if (rc != len)
@@ -153,8 +169,11 @@ static int mqtt_read_packet(mqtt_client_t* c, int* packet_type, platform_timer_t
len += MQTTPacket_encode(c->read_buf + len, remain_len);
if ((len + remain_len) > c->read_buf_size) {
/* mqtt buffer is too short, read and discard all corrupted data */
mqtt_packet_drain(c, timer, remain_len);
RETURN_ERROR(MQTT_BUFFER_TOO_SHORT_ERROR);
RETURN_ERROR(MQTT_BUFFER_TOO_SHORT_ERROR);
}
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
@@ -174,9 +193,13 @@ static int mqtt_send_packet(mqtt_client_t* c, int length, platform_timer_t* time
int len = 0;
int sent = 0;
platform_timer_init(timer);
platform_timer_cutdown(timer, c->cmd_timeout);
/* send mqtt packet in a blocking manner or exit when it timer is expired */
while ((sent < length) && (!platform_timer_is_expired(timer))) {
len = c->network->write(c->network, &c->write_buf[sent], length, platform_timer_remain(timer));
if (len < 0) // there was an error writing the data
if (len <= 0) // there was an error writing the data
break;
sent += len;
}
@@ -215,8 +238,11 @@ static char mqtt_topic_is_matched(char* topic_filter, MQTTString* topic_name)
{
if (*curn == '/' && *curf != '/')
break;
if (*curf != '+' && *curf != '#' && *curf != *curn)
/* support wildcards for MQTT topics, such as '#' '+' */
if (*curf != '+' && *curf != '#' && *curf != *curn)
break;
if (*curf == '+') {
char* nextpos = curn + 1;
while (nextpos < curn_end && *nextpos != '/')
@@ -236,7 +262,7 @@ static void mqtt_new_message_data(message_data_t* md, MQTTString* topic_name, mq
int len;
len = (topic_name->lenstring.len < MQTT_TOPIC_LEN_MAX - 1) ? topic_name->lenstring.len : MQTT_TOPIC_LEN_MAX - 1;
memcpy(md->topic_name, topic_name->lenstring.data, len);
md->topic_name[len] = '\0';
md->topic_name[len] = '\0'; /* the topic name is too long and will be truncated */
md->message = message;
}
@@ -245,9 +271,11 @@ static message_handlers_t *mqtt_get_msg_handler(mqtt_client_t* c, MQTTString* to
list_t *curr, *next;
message_handlers_t *msg_handler;
/* traverse the msg_handler_list to find the matching message handler */
LIST_FOR_EACH_SAFE(curr, next, &c->msg_handler_list) {
msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
/* judge topic is equal or match, support wildcard, such as '#' '+' */
if ((NULL != msg_handler->topic_filter) && ((MQTTPacket_equals(topic_name, (char*)msg_handler->topic_filter)) ||
(mqtt_topic_is_matched((char*)msg_handler->topic_filter, topic_name)))) {
return msg_handler;
@@ -261,12 +289,13 @@ static int mqtt_deliver_message(mqtt_client_t* c, MQTTString* topic_name, mqtt_m
int rc = MQTT_FAILED_ERROR;
message_handlers_t *msg_handler;
/* get mqtt message handler */
msg_handler = mqtt_get_msg_handler(c, topic_name);
if (NULL != msg_handler) {
message_data_t md;
mqtt_new_message_data(&md, topic_name, message);
msg_handler->handler(c, &md);
mqtt_new_message_data(&md, topic_name, message); /* make a message data */
msg_handler->handler(c, &md); /* deliver the message */
rc = MQTT_SUCCESS_ERROR;
} else
goto exit;
@@ -288,14 +317,14 @@ static ack_handlers_t *mqtt_ack_handler_create(mqtt_client_t* c, int type, unsig
list_init(&ack_handler->list);
platform_timer_init(&ack_handler->timer);
platform_timer_cutdown(&ack_handler->timer, c->cmd_timeout);
platform_timer_cutdown(&ack_handler->timer, c->cmd_timeout); /* No response within timeout will be destroyed or resent */
ack_handler->type = type;
ack_handler->packet_id = packet_id;
ack_handler->payload_len = payload_len;
ack_handler->payload = (unsigned char *)ack_handler + sizeof(ack_handlers_t);
ack_handler->handler = handler;
memcpy(ack_handler->payload, c->write_buf, payload_len);
memcpy(ack_handler->payload, c->write_buf, payload_len); /* save the data in ack handler*/
return ack_handler;
}
@@ -304,7 +333,7 @@ static void mqtt_ack_handler_destroy(ack_handlers_t* ack_handler)
{
if (NULL != &ack_handler->list) {
list_del(&ack_handler->list);
platform_memory_free(ack_handler);
platform_memory_free(ack_handler); /* delete ack handler from the list, and free memory */
}
}
@@ -313,13 +342,13 @@ static void mqtt_ack_handler_resend(mqtt_client_t* c, ack_handlers_t* ack_handle
platform_timer_t timer;
platform_timer_init(&timer);
platform_timer_cutdown(&timer, c->cmd_timeout);
platform_timer_cutdown(&ack_handler->timer, c->cmd_timeout);
platform_timer_cutdown(&ack_handler->timer, c->cmd_timeout); /* timeout, recutdown */
platform_mutex_lock(&c->write_lock);
memcpy(c->write_buf, ack_handler->payload, ack_handler->payload_len);
memcpy(c->write_buf, ack_handler->payload, ack_handler->payload_len); /* copy data to write buf form ack handler */
mqtt_send_packet(c, ack_handler->payload_len, &timer);
LOG_E("%s:%d %s()... resend %d package, packet_id is %d ", __FILE__, __LINE__, __FUNCTION__, ack_handler->type, ack_handler->packet_id);
mqtt_send_packet(c, ack_handler->payload_len, &timer); /* resend data */
LOG_W("%s:%d %s()... resend %d package, packet_id is %d ", __FILE__, __LINE__, __FUNCTION__, ack_handler->type, ack_handler->packet_id);
platform_mutex_unlock(&c->write_lock);
}
@@ -334,7 +363,9 @@ static int mqtt_ack_list_node_is_exist(mqtt_client_t* c, int type, unsigned shor
LIST_FOR_EACH_SAFE(curr, next, &c->ack_handler_list) {
ack_handler = LIST_ENTRY(curr, ack_handlers_t, list);
if ((packet_id == ack_handler->packet_id) && (type == ack_handler->type))
/* For mqtt packets of qos1 and qos2, you can use the packet id and type as the unique
identifier to determine whether the node already exists and avoid repeated addition. */
if ((packet_id == ack_handler->packet_id) && (type == ack_handler->type))
return 1;
}
@@ -346,9 +377,11 @@ static int mqtt_ack_list_record(mqtt_client_t* c, int type, unsigned short packe
int rc = MQTT_SUCCESS_ERROR;
ack_handlers_t *ack_handler = NULL;
/* Determine if the node already exists */
if (mqtt_ack_list_node_is_exist(c, type, packet_id))
RETURN_ERROR(MQTT_ACK_NODE_IS_EXIST);
/* create a ack handler node */
ack_handler = mqtt_ack_handler_create(c, type, packet_id, payload_len, handler);
if (NULL == ack_handler)
RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
@@ -377,6 +410,7 @@ static int mqtt_ack_list_unrecord(mqtt_client_t* c, int type, unsigned short pac
if (handler)
*handler = ack_handler->handler;
/* destroy a ack handler node */
mqtt_ack_handler_destroy(ack_handler);
mqtt_subtract_ack_handler_num(c);
}
@@ -395,13 +429,13 @@ static message_handlers_t *mqtt_msg_handler_create(const char* topic_filter, mqt
list_init(&msg_handler->list);
msg_handler->qos = qos;
msg_handler->handler = handler;
msg_handler->handler = handler; /* register callback handler */
msg_handler->topic_filter = topic_filter;
return msg_handler;
}
static void mqtt_msg_handler_destroy(message_handlers_t *msg_handler)
static void mqtt_msg_handler_destory(message_handlers_t *msg_handler)
{
if (NULL != &msg_handler->list) {
list_del(&msg_handler->list);
@@ -423,6 +457,7 @@ static int mqtt_msg_handler_is_exist(mqtt_client_t* c, message_handlers_t *handl
LIST_FOR_EACH_SAFE(curr, next, &c->msg_handler_list) {
msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
/* determine whether a node already exists by mqtt topic, but wildcards are not supported */
if ((NULL != msg_handler->topic_filter) && (mqtt_is_topic_equals(msg_handler->topic_filter, handler->topic_filter))) {
LOG_W("%s:%d %s()...msg_handler->topic_filter: %s, handler->topic_filter: %s", __FILE__, __LINE__, __FUNCTION__, msg_handler->topic_filter, handler->topic_filter);
return 1;
@@ -435,10 +470,11 @@ static int mqtt_msg_handler_is_exist(mqtt_client_t* c, message_handlers_t *handl
static int mqtt_msg_handlers_install(mqtt_client_t* c, message_handlers_t *handler)
{
if (mqtt_msg_handler_is_exist(c, handler)) {
mqtt_msg_handler_destroy(handler);
mqtt_msg_handler_destory(handler);
RETURN_ERROR(MQTT_SUCCESS_ERROR);
}
/* install to msg_handler_list*/
list_add_tail(&handler->list, &c->msg_handler_list);
RETURN_ERROR(MQTT_SUCCESS_ERROR);
@@ -451,6 +487,7 @@ static void mqtt_clean_session(mqtt_client_t* c)
ack_handlers_t *ack_handler;
message_handlers_t *msg_handler;
/* release all ack_handler_list memory */
if (!(list_is_empty(&c->ack_handler_list))) {
LIST_FOR_EACH_SAFE(curr, next, &c->ack_handler_list) {
ack_handler = LIST_ENTRY(curr, ack_handlers_t, list);
@@ -459,6 +496,7 @@ static void mqtt_clean_session(mqtt_client_t* c)
list_del_init(&c->ack_handler_list);
}
/* release all msg_handler_list memory */
if (!(list_is_empty(&c->msg_handler_list))) {
LIST_FOR_EACH_SAFE(curr, next, &c->msg_handler_list) {
msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
@@ -467,6 +505,8 @@ static void mqtt_clean_session(mqtt_client_t* c)
}
list_del_init(&c->msg_handler_list);
}
mqtt_set_client_state(c, CLIENT_STATE_INVALID);
}
static void mqtt_ack_list_scan(mqtt_client_t* c)
@@ -484,6 +524,8 @@ static void mqtt_ack_list_scan(mqtt_client_t* c)
continue;
if ((ack_handler->type == PUBACK) || (ack_handler->type == PUBREC) || (ack_handler->type == PUBREL) || (ack_handler->type == PUBCOMP)) {
/* timeout has occurred. for qos1 and qos2 packets, you need to resend them. */
mqtt_ack_handler_resend(c, ack_handler);
continue;
}
@@ -505,6 +547,7 @@ static int mqtt_try_resubscribe(mqtt_client_t* c)
LIST_FOR_EACH_SAFE(curr, next, &c->msg_handler_list) {
msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
/* resubscribe topic */
if ((rc = mqtt_subscribe(c, msg_handler->topic_filter, msg_handler->qos, msg_handler->handler)) == MQTT_ACK_HANDLER_NUM_TOO_MUCH)
LOG_W("%s:%d %s()... mqtt ack handler num too much ...", __FILE__, __LINE__, __FUNCTION__);
@@ -518,10 +561,10 @@ static int mqtt_try_do_reconnect(mqtt_client_t* c)
int rc = MQTT_CONNECT_FAILED_ERROR;
if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c))
rc = mqtt_connect(c);
rc = mqtt_connect(c); /* reconnect */
if (MQTT_SUCCESS_ERROR == rc) {
rc = mqtt_try_resubscribe(c);
rc = mqtt_try_resubscribe(c); /* resubscribe */
}
LOG_I("%s:%d %s()... mqtt try connect result is %#x", __FILE__, __LINE__, __FUNCTION__, rc);
@@ -556,15 +599,15 @@ static int mqtt_publish_ack_packet(mqtt_client_t *c, unsigned short packet_id, i
platform_mutex_lock(&c->write_lock);
switch (packet_type) {
case PUBREC:
len = MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBREL, 0, packet_id);
rc = mqtt_ack_list_record(c, PUBCOMP, packet_id, len, NULL);
case PUBREC:
len = MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBREL, 0, packet_id); /* make a PUBREL ack packet */
rc = mqtt_ack_list_record(c, PUBCOMP, packet_id, len, NULL); /* record ack, expect to receive PUBCOMP*/
if (MQTT_SUCCESS_ERROR != rc)
goto exit;
break;
case PUBREL:
len = MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBCOMP, 0, packet_id);
len = MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBCOMP, 0, packet_id); /* make a PUBCOMP ack packet */
break;
default:
@@ -591,11 +634,15 @@ static int mqtt_puback_and_pubcomp_packet_handle(mqtt_client_t *c, platform_time
unsigned short packet_id;
unsigned char dup, packet_type;
rc = mqtt_is_connected(c);
if (MQTT_SUCCESS_ERROR != rc)
RETURN_ERROR(rc);
if (MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size) != 1)
rc = MQTT_PUBREC_PACKET_ERROR;
(void) dup;
rc = mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
rc = mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL); /* unrecord ack handler */
RETURN_ERROR(rc);
}
@@ -608,7 +655,12 @@ static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
unsigned short packet_id;
int is_nack = 0;
message_handlers_t *msg_handler = NULL;
rc = mqtt_is_connected(c);
if (MQTT_SUCCESS_ERROR != rc)
RETURN_ERROR(rc);
/* deserialize subscribe ack packet */
if (MQTTDeserialize_suback(&packet_id, 1, &count, (int*)&granted_qos, c->read_buf, c->read_buf_size) != 1)
RETURN_ERROR(MQTT_SUBSCRIBE_ACK_PACKET_ERROR);
@@ -620,7 +672,7 @@ static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
if (is_nack) {
mqtt_msg_handler_destroy(msg_handler);
mqtt_msg_handler_destory(msg_handler); /* subscribe topic failed, destory message handler */
RETURN_ERROR(MQTT_SUBSCRIBE_NOT_ACK_ERROR);
}
@@ -634,16 +686,20 @@ static int mqtt_unsuback_packet_handle(mqtt_client_t *c, platform_timer_t *timer
int rc = MQTT_FAILED_ERROR;
message_handlers_t *msg_handler;
unsigned short packet_id = 0;
rc = mqtt_is_connected(c);
if (MQTT_SUCCESS_ERROR != rc)
RETURN_ERROR(rc);
if (MQTTDeserialize_unsuback(&packet_id, c->read_buf, c->read_buf_size) != 1)
RETURN_ERROR(MQTT_UNSUBSCRIBE_ACK_PACKET_ERROR);
rc = mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler);
rc = mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler); /* unrecord ack handler, and get message handler */
if (!msg_handler)
RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
mqtt_msg_handler_destroy(msg_handler);
mqtt_msg_handler_destory(msg_handler); /* destory message handler */
RETURN_ERROR(rc);
}
@@ -655,6 +711,10 @@ static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
mqtt_message_t msg;
int qos;
msg.payloadlen = 0;
rc = mqtt_is_connected(c);
if (MQTT_SUCCESS_ERROR != rc)
RETURN_ERROR(rc);
if (MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->read_buf, c->read_buf_size) != 1)
@@ -662,6 +722,7 @@ static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
msg.qos = (mqtt_qos_t)qos;
/* for qos1 and qos2, you need to send a ack packet */
if (msg.qos != QOS0) {
platform_mutex_lock(&c->write_lock);
@@ -684,6 +745,7 @@ static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
if (msg.qos != QOS2)
mqtt_deliver_message(c, &topic_name, &msg);
else {
/* record the received of a qos2 message and only processes it when the qos2 message is received for the first time */
if ((rc = mqtt_ack_list_record(c, PUBREL, msg.id, len, NULL)) != MQTT_ACK_NODE_IS_EXIST)
mqtt_deliver_message(c, &topic_name, &msg);
}
@@ -697,12 +759,16 @@ static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer
int rc = MQTT_FAILED_ERROR;
unsigned short packet_id;
unsigned char dup, packet_type;
rc = mqtt_is_connected(c);
if (MQTT_SUCCESS_ERROR != rc)
RETURN_ERROR(rc);
if (MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size) != 1)
RETURN_ERROR(MQTT_PUBREC_PACKET_ERROR);
(void) dup;
rc = mqtt_publish_ack_packet(c, packet_id, packet_type);
rc = mqtt_publish_ack_packet(c, packet_id, packet_type); /* make a ack packet and send it */
rc = mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
RETURN_ERROR(rc);
@@ -710,21 +776,17 @@ static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
{
int rc = MQTT_SUCCESS_ERROR;
int packet_type = 0;
rc = mqtt_read_packet(c, &packet_type, timer);
platform_timer_init(timer);
platform_timer_cutdown(timer, c->cmd_timeout);
switch (packet_type) {
case 0: /* timed out reading packet */
break;
case CONNACK:
break;
case CONNACK: /* has been processed */
goto exit;
case PUBACK:
case PUBCOMP:
@@ -749,19 +811,15 @@ static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
break;
case PINGRESP:
c->ping_outstanding = 0;
c->ping_outstanding = 0; /* keep alive ping success */
break;
default:
goto exit;
}
if (mqtt_keep_alive(c) != MQTT_SUCCESS_ERROR) {
mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
rc = MQTT_NOT_CONNECT_ERROR;
}
rc = mqtt_keep_alive(c);
exit:
if (rc == MQTT_SUCCESS_ERROR)
rc = packet_type;
@@ -785,11 +843,19 @@ static int mqtt_wait_packet(mqtt_client_t* c, int packet_type, platform_timer_t*
static void mqtt_yield_thread(void *arg)
{
int rc;
client_state_t state;
mqtt_client_t *c = (mqtt_client_t *)arg;
state = mqtt_get_client_state(c);
if (CLIENT_STATE_CONNECTED != state) {
LOG_W("%s:%d %s()..., mqtt is not connected to the server...", __FILE__, __LINE__, __FUNCTION__);
platform_thread_stop(c->thread); /* mqtt is not connected to the server, stop thread */
}
while (1) {
rc = mqtt_yield(c, c->cmd_timeout);
if (MQTT_CLOSE_SESSION_ERROR == rc) {
LOG_E("%s:%d %s()..., mqtt close session....", __FILE__, __LINE__, __FUNCTION__);
if (MQTT_CLEAN_SESSION_ERROR == rc) {
LOG_E("%s:%d %s()..., mqtt clean session....", __FILE__, __LINE__, __FUNCTION__);
c->network->disconnect(c->network);
mqtt_clean_session(c);
goto exit;
@@ -836,8 +902,11 @@ static int mqtt_connect_with_results(mqtt_client_t* c)
platform_mutex_lock(&c->write_lock);
/* serialize connect packet */
if ((len = MQTTSerialize_connect(c->write_buf, c->write_buf_size, &connect_data)) <= 0)
goto exit;
/* send connect packet */
if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
goto exit;
@@ -851,12 +920,24 @@ static int mqtt_connect_with_results(mqtt_client_t* c)
exit:
if (rc == MQTT_SUCCESS_ERROR) {
if(NULL ==c->thread)
if(NULL == c->thread) {
/* connect success, and need init mqtt thread */
c->thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK);
c->ping_outstanding = 0;
mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
if (NULL != c->thread) {
mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
platform_thread_startup(c->thread);
platform_thread_start(c->thread); /* start run mqtt thread */
}
} else {
mqtt_set_client_state(c, CLIENT_STATE_CONNECTED); /* reconnect, mqtt thread is already exists */
}
c->ping_outstanding = 0; /* reset ping outstanding */
} else {
mqtt_set_client_state(c, CLIENT_STATE_INITIALIZED);
mqtt_set_client_state(c, CLIENT_STATE_INITIALIZED); /* connect failed */
}
platform_mutex_unlock(&c->write_lock);
@@ -869,15 +950,18 @@ exit:
int mqtt_keep_alive(mqtt_client_t* c)
{
int rc = MQTT_SUCCESS_ERROR;
rc = mqtt_is_connected(c);
if (MQTT_SUCCESS_ERROR != rc)
RETURN_ERROR(rc);
if (platform_timer_is_expired(&c->last_sent) || platform_timer_is_expired(&c->last_received)) {
if (c->ping_outstanding) {
LOG_W("%s:%d %s()... ping outstanding", __FILE__, __LINE__, __FUNCTION__);
rc = MQTT_FAILED_ERROR; /* PINGRESP not received in keepalive interval */
mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
rc = MQTT_NOT_CONNECT_ERROR; /* PINGRESP not received in keepalive interval */
} else {
platform_timer_t timer;
platform_timer_init(&timer);
platform_timer_cutdown(&timer, c->cmd_timeout);
int len = MQTTSerialize_pingreq(c->write_buf, c->write_buf_size);
if (len > 0 && (rc = mqtt_send_packet(c, len, &timer)) == MQTT_SUCCESS_ERROR) // send the ping packet
c->ping_outstanding++;
@@ -903,14 +987,15 @@ int mqtt_init(mqtt_client_t* c, client_init_params_t* init)
}
memset(c->network, 0, sizeof(network_t));
if ((MQTT_MIN_PAYLOAD_SIZE <= init->read_buf_size) || (MQTT_MAX_PAYLOAD_SIZE >= init->read_buf_size))
if ((MQTT_MIN_PAYLOAD_SIZE >= init->read_buf_size) || (MQTT_MAX_PAYLOAD_SIZE <= init->read_buf_size))
init->read_buf_size = MQTT_DEFAULT_BUF_SIZE;
if ((MQTT_MIN_PAYLOAD_SIZE <= init->write_buf_size) || (MQTT_MAX_PAYLOAD_SIZE >= init->read_buf_size))
if ((MQTT_MIN_PAYLOAD_SIZE >= init->write_buf_size) || (MQTT_MAX_PAYLOAD_SIZE <= init->read_buf_size))
init->write_buf_size = MQTT_DEFAULT_BUF_SIZE;
c->read_buf = (unsigned char*) platform_memory_alloc(init->read_buf_size);
c->write_buf = (unsigned char*) platform_memory_alloc(init->write_buf_size);
if ((NULL == c->read_buf) || (NULL == c->write_buf)){
if ((NULL == c->read_buf) || (NULL == c->write_buf)) {
LOG_E("%s:%d %s()... malloc buf failed...", __FILE__, __LINE__, __FUNCTION__);
RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
}
@@ -930,8 +1015,10 @@ int mqtt_init(mqtt_client_t* c, client_init_params_t* init)
if (0 == init->connect_params.keep_alive_interval)
init->connect_params.keep_alive_interval = MQTT_KEEP_ALIVE_INTERVAL;
if (0 == init->connect_params.mqtt_version)
init->connect_params.mqtt_version = MQTT_VERSION;
if (0 == init->reconnect_try_duration)
init->reconnect_try_duration = MQTT_RECONNECT_DEFAULT_DURATION;
@@ -968,8 +1055,9 @@ int mqtt_release(mqtt_client_t* c)
if (NULL == c)
RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
while (CLIENT_STATE_INVALID != mqtt_get_client_state(c)); /* wait for the clean session to complete */
if (NULL != c->network) {
network_release(c->network);
platform_memory_free(c->network);
c->network = NULL;
}
@@ -984,8 +1072,6 @@ int mqtt_release(mqtt_client_t* c)
c->read_buf = NULL;
}
mqtt_clean_session(c);
memset(c, 0, sizeof(mqtt_client_t));
RETURN_ERROR(MQTT_SUCCESS_ERROR);
@@ -993,6 +1079,7 @@ int mqtt_release(mqtt_client_t* c)
int mqtt_connect(mqtt_client_t* c)
{
/* connect server in blocking mode and wait for connection result */
return mqtt_connect_with_results(c);
}
@@ -1007,13 +1094,14 @@ int mqtt_disconnect(mqtt_client_t* c)
platform_mutex_lock(&c->write_lock);
len = MQTTSerialize_disconnect(c->write_buf, c->write_buf_size);
/* serialize disconnect packet and send it */
len = MQTTSerialize_disconnect(c->write_buf, c->write_buf_size);
if (len > 0)
rc = mqtt_send_packet(c, len, &timer);
platform_mutex_unlock(&c->write_lock);
mqtt_set_client_state(c, CLIENT_STATE_INVALID);
mqtt_set_client_state(c, CLIENT_STATE_CLEAN_SESSION);
RETURN_ERROR(rc);
}
@@ -1031,22 +1119,22 @@ int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, m
if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c))
RETURN_ERROR(MQTT_NOT_CONNECT_ERROR);
platform_timer_init(&timer);
platform_timer_cutdown(&timer, c->cmd_timeout);
platform_mutex_lock(&c->write_lock);
packet_id = mqtt_get_next_packet_id(c);
/* serialize subscribe packet and send it */
len = MQTTSerialize_subscribe(c->write_buf, c->write_buf_size, 0, packet_id, 1, &topic, (int*)&qos);
if (len <= 0)
goto exit;
if ((rc = mqtt_send_packet(c, len, &timer)) != MQTT_SUCCESS_ERROR)
goto exit;
if (NULL == handler)
handler = default_msg_handler;
handler = default_msg_handler; /* if handler is not specified, the default handler is used */
/* create a message and record it */
msg_handler = mqtt_msg_handler_create(topic_filter, qos, handler);
if (NULL == msg_handler)
RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
@@ -1073,18 +1161,17 @@ int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter)
if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c))
RETURN_ERROR(MQTT_NOT_CONNECT_ERROR);
platform_timer_init(&timer);
platform_timer_cutdown(&timer, c->cmd_timeout);
platform_mutex_lock(&c->write_lock);
packet_id = mqtt_get_next_packet_id(c);
/* serialize unsubscribe packet and send it */
if ((len = MQTTSerialize_unsubscribe(c->write_buf, c->write_buf_size, 0, packet_id, 1, &topic)) <= 0)
goto exit;
if ((rc = mqtt_send_packet(c, len, &timer)) != MQTT_SUCCESS_ERROR)
goto exit;
/* create a message and record it */
msg_handler = mqtt_msg_handler_create((const char*)topic_filter, QOS0, NULL);
if (NULL == msg_handler)
RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
@@ -1114,19 +1201,17 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg
if ((NULL != msg->payload) && (0 == msg->payloadlen))
msg->payloadlen = strlen((char*)msg->payload);
platform_timer_init(&timer);
platform_timer_cutdown(&timer, c->cmd_timeout);
platform_mutex_lock(&c->write_lock);
if (msg->qos == QOS1 || msg->qos == QOS2) {
if (mqtt_ack_handler_is_maximum(c)) {
rc = MQTT_ACK_HANDLER_NUM_TOO_MUCH;
rc = MQTT_ACK_HANDLER_NUM_TOO_MUCH; /* the recorded ack handler has reached the maximum */
goto exit;
}
msg->id = mqtt_get_next_packet_id(c);
}
/* serialize publish packet and send it */
len = MQTTSerialize_publish(c->write_buf, c->write_buf_size, 0, msg->qos, msg->retained, msg->id,
topic, (unsigned char*)msg->payload, msg->payloadlen);
if (len <= 0)
@@ -1136,12 +1221,15 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg
goto exit;
if (QOS0 != msg->qos) {
mqtt_set_publish_dup(c,1);
mqtt_set_publish_dup(c,1); /* may resend this data, set the udp flag in advance */
if (QOS1 == msg->qos) {
rc = mqtt_ack_list_record(c, PUBACK, msg->id, len, NULL);
/* expect to receive PUBACK, otherwise data will be resent */
rc = mqtt_ack_list_record(c, PUBACK, msg->id, len, NULL);
} else if (QOS2 == msg->qos) {
rc = mqtt_ack_list_record(c, PUBREC, msg->id, len, NULL);
/* expect to receive PUBREC, otherwise data will be resent */
rc = mqtt_ack_list_record(c, PUBREC, msg->id, len, NULL);
}
}
@@ -1169,26 +1257,33 @@ int mqtt_yield(mqtt_client_t* c, int timeout_ms)
while (!platform_timer_is_expired(&timer)) {
state = mqtt_get_client_state(c);
if (CLIENT_STATE_INVALID == state) {
RETURN_ERROR(MQTT_CLOSE_SESSION_ERROR);
if (CLIENT_STATE_CLEAN_SESSION == state) {
RETURN_ERROR(MQTT_CLEAN_SESSION_ERROR);
} else if (CLIENT_STATE_CONNECTED != state) {
/* mqtt not connect, need reconnect */
rc = mqtt_try_reconnect(c);
if (MQTT_RECONNECT_TIMEOUT_ERROR == rc)
RETURN_ERROR(rc);
continue;
}
/* mqtt connected, handle mqtt packet */
rc = mqtt_packet_handle(c, &timer);
if (rc >= 0) {
/* scan ack list, destroy ack handler that have timed out or resend them */
mqtt_ack_list_scan(c);
} else if (MQTT_NOT_CONNECT_ERROR == rc) {
LOG_E("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, __FUNCTION__);
/* reconnect timer cutdown */
platform_timer_cutdown(&c->reconnect_timer, c->reconnect_try_duration);
} else {
break;
}
}
RETURN_ERROR(rc);
}

View File

@@ -2,7 +2,7 @@
* @Author: jiejie
* @Github: https://github.com/jiejieTop
* @Date: 2019-12-09 21:31:25
* @LastEditTime : 2020-01-11 20:11:47
* @LastEditTime: 2020-03-15 01:12:28
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
*/
#ifndef _MQTTCLIENT_H_
@@ -17,7 +17,7 @@
#include "platform_memory.h"
#include "platform_mutex.h"
#include "platform_thread.h"
#include "mqtt_config.h"
#include "mqtt_defconfig.h"
#include "network.h"
#include "random.h"
#include "error.h"
@@ -35,6 +35,7 @@ typedef enum client_state {
CLIENT_STATE_INITIALIZED = 0,
CLIENT_STATE_CONNECTED = 1,
CLIENT_STATE_DISCONNECTED = 2,
CLIENT_STATE_CLEAN_SESSION = 3
}client_state_t;
typedef struct mqtt_connack_data {
@@ -93,10 +94,10 @@ typedef struct connect_params {
typedef struct mqtt_client {
unsigned short packet_id;
unsigned char *read_buf;
unsigned char *write_buf;
unsigned char ping_outstanding;
unsigned char ack_handler_number;
unsigned char *read_buf;
unsigned char *write_buf;
unsigned int cmd_timeout;
unsigned int read_buf_size;
unsigned int write_buf_size;