first commit for opensource

first commit for opensource
This commit is contained in:
supowang
2019-09-16 13:19:50 +08:00
parent 08ab013b8e
commit edb2879617
6303 changed files with 5472815 additions and 23 deletions

View File

@@ -0,0 +1,352 @@
#include "qcloud.h"
__QCLOUD_STATIC__ qcloud_err_t mqtt_client_network_host_construct(qcloud_network_t *network, qcloud_device_t *device)
{
int server_len;
char mqtt_server[QCLOUD_SERVER_DOMAIN_MAX];
memset(network->host, 0, sizeof(network->host));
server_len = osal_snprintf(mqtt_server, sizeof(mqtt_server), "%s.%s", device->product_id, qcloud_mqtt_server);
if (server_len < 0 || server_len > QCLOUD_SERVER_DOMAIN_MAX - 1) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
memcpy(network->host, mqtt_server, sizeof(network->host));
network->port = qcloud_mqtt_port;
return QCLOUD_ERR_SUCCESS;
}
__QCLOUD_STATIC__ qcloud_err_t mqtt_client_network_init(qcloud_network_t *network, qcloud_device_t *device)
{
#if (QCLOUD_CFG_TLS_EN > 0u)
QCLOUD_FUNC_EXIT_RC_IF_NOT(qcloud_tls_init(&network->tls_opt, device), QCLOUD_ERR_SUCCESS, QCLOUD_ERR_FAILURE);
QCLOUD_FUNC_EXIT_RC_IF_NOT(qcloud_network_tls_init(network), QCLOUD_ERR_SUCCESS, QCLOUD_ERR_FAILURE);
#else
QCLOUD_FUNC_EXIT_RC_IF_NOT(qcloud_network_tcp_init(network), QCLOUD_ERR_SUCCESS, QCLOUD_ERR_FAILURE);
#endif
QCLOUD_FUNC_EXIT_RC_IF_NOT(mqtt_client_network_host_construct(network, device), QCLOUD_ERR_SUCCESS, QCLOUD_ERR_FAILURE);
return QCLOUD_ERR_SUCCESS;
}
__QCLOUD_STATIC__ uint16_t mqtt_client_random_packet_id_generate(void)
{
#define PACKET_ID_MAX (65535)
srand((unsigned)osal_timer_current_sec());
return rand() % (PACKET_ID_MAX + 1) + 1;
}
__QCLOUD_STATIC__ qcloud_err_t mqtt_client_construct(qcloud_mqtt_client_t *client,
mqtt_event_handler_fn_t handler,
void *handler_context,
qcloud_auto_connect_state_t auto_connect_state)
{
client->event_handler.handler = handler;
client->event_handler.context = handler_context;
client->auto_connect_state = auto_connect_state;
client->command_timeout = QCLOUD_MQTT_COMMAND_TIMEOUT;
client->connection_state = QCLOUD_MQTT_CONNECTION_STATE_DISCONNECTED;
// packet id 取随机数 1- 65536
client->packet_id = mqtt_client_random_packet_id_generate();
client->ping_outstanding = 0;
client->is_manually_disconnected = QCLOUD_FALSE;
client->network_disconnect_counter = 0;
if ((client->global_lock = osal_mutex_create()) == NULL) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
if ((client->tx_lock = osal_mutex_create()) == NULL) {
QCLOUD_LOG_E("write buf lock failed.");
goto errout;
}
if ((client->ack_pend_list_lock = osal_mutex_create()) == NULL) {
QCLOUD_LOG_E("ack list lock failed.");
goto errout;
}
if ((client->msg_handler_list_lock = osal_mutex_create()) == NULL) {
QCLOUD_LOG_E("handler list lock failed.");
goto errout;
}
qcloud_list_init(&client->ack_pend_list);
qcloud_list_init(&client->msg_handler_list);
// ping定时器以及重连延迟定时器相关初始化
osal_timer_init(&client->ping_timer);
osal_timer_init(&client->reconnect_timer);
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
errout:
if (client->global_lock) {
osal_mutex_destroy(client->global_lock);
client->global_lock = NULL;
}
if (client->tx_lock) {
osal_mutex_destroy(client->tx_lock);
client->tx_lock = NULL;
}
if (client->ack_pend_list_lock) {
osal_mutex_destroy(client->ack_pend_list_lock);
client->ack_pend_list_lock = NULL;
}
if (client->msg_handler_list_lock) {
osal_mutex_destroy(client->msg_handler_list_lock);
client->msg_handler_list_lock = NULL;
}
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE)
}
#if (QCLOUD_CFG_TLS_EN == 0u) && (QCLOUD_CFG_AUTH_MODE == QCLOUD_AUTH_MODE_KEY)
__QCLOUD_STATIC__ qcloud_err_t mqtt_client_key_decode(uint8_t *decoded_buf, size_t decoded_buf_size, const char *key, size_t *decoded_key_len)
{
QCLOUD_POINTER_SANITY_CHECK(decoded_buf, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(key, QCLOUD_ERR_INVAL);
int key_len = 0;
memset(decoded_buf, 0, decoded_buf_size);
key_len = strlen(key);
if (qcloud_utils_base64_decode(decoded_buf, decoded_buf_size, decoded_key_len,
(unsigned char *)key, key_len) != 0) {
QCLOUD_LOG_E("psk decode failed!");
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_INVAL);
}
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
__QCLOUD_STATIC__ qcloud_err_t mqtt_client_password_encode(mqtt_connect_opt_t *connect_opt, uint8_t *key, size_t key_len)
{
char digest[41];
int password_len = 0;
memset(digest, 0, sizeof(digest));
utils_hmac_sha1(connect_opt->username, strlen(connect_opt->username), digest, (const char *)key, key_len);
password_len = osal_snprintf(connect_opt->password, sizeof(connect_opt->password), "%s;hmacsha1", digest);
if (password_len < 0 || password_len >= QCLOUD_DEVICE_PASSWORD_MAX) {
QCLOUD_LOG_E("password encode failed");
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_DEV_INFO);
}
connect_opt->password_len = password_len;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
#endif
__QCLOUD_STATIC__ qcloud_err_t mqtt_client_id_generate(mqtt_connect_opt_t *connect_opt, qcloud_device_t *device)
{
int client_id_len = 0;
memset(connect_opt->client_id, 0, sizeof(connect_opt->client_id));
client_id_len = osal_snprintf(connect_opt->client_id, sizeof(connect_opt->client_id), "%s%s", device->product_id, device->device_name);
if (client_id_len < 0 || client_id_len >= QCLOUD_MQTT_DEVICE_CLIENT_ID_MAX) {
QCLOUD_LOG_E("client id generate failed");
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_DEV_INFO);
}
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
__QCLOUD_STATIC__ qcloud_err_t mqtt_client_username_generate(mqtt_connect_opt_t *connect_opt)
{
uint32_t now;
int username_len = 0;
char connection_id[QCLOUD_MQTT_CONNECT_ID_MAX + 1];
now = osal_timer_current_sec() + QCLOUD_MQTT_ACCESS_EXPIRE_TIMEOUT_MAX / 1000;
mqtt_glue_connect_id_generate(connection_id);
memset(connect_opt->username, 0, sizeof(connect_opt->username));
username_len = osal_snprintf(connect_opt->username, sizeof(connect_opt->username), "%s;%s;%s;%ld",
connect_opt->client_id, QCLOUD_APPID, connection_id, now);
if (username_len < 0 || username_len >= QCLOUD_DEVICE_USERNAME_MAX) {
QCLOUD_LOG_E("username generate failed");
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_DEV_INFO);
}
connect_opt->username_len = username_len;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
__QCLOUD_INTERNAL__ void mqtt_client_connection_state_set(qcloud_mqtt_client_t *client, qcloud_mqtt_con_status_t state)
{
osal_mutex_lock(client->global_lock);
client->connection_state = state;
osal_mutex_unlock(client->global_lock);
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_connect_opt_create(mqtt_connect_opt_t *connect_opt,
qcloud_device_t *device,
mqtt_version_t mqtt_version,
uint16_t keep_alive_interval,
mqtt_clean_session_state_t clean_session)
{
QCLOUD_POINTER_SANITY_CHECK(connect_opt, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(device, QCLOUD_ERR_INVAL);
#if (QCLOUD_CFG_TLS_EN == 0u)
uint8_t decoded_key[QCLOUD_PSK_MAX];
size_t decoded_key_len;
#endif
memset(connect_opt, 0, sizeof(mqtt_connect_opt_t));
// 1. client id
QCLOUD_FUNC_EXIT_RC_IF_NOT(mqtt_client_id_generate(connect_opt, device), QCLOUD_ERR_SUCCESS, QCLOUD_ERR_FAILURE);
// 2. username
QCLOUD_FUNC_EXIT_RC_IF_NOT(mqtt_client_username_generate(connect_opt), QCLOUD_ERR_SUCCESS, QCLOUD_ERR_FAILURE);
#if (QCLOUD_CFG_TLS_EN == 0u) && (QCLOUD_CFG_AUTH_MODE == QCLOUD_AUTH_MODE_KEY)
// 3. key(temporary)
QCLOUD_FUNC_EXIT_RC_IF_NOT(mqtt_client_key_decode(decoded_key, sizeof(decoded_key), device->key, &decoded_key_len), QCLOUD_ERR_SUCCESS, QCLOUD_ERR_FAILURE);
// 4. password
QCLOUD_FUNC_EXIT_RC_IF_NOT(mqtt_client_password_encode(connect_opt, decoded_key, decoded_key_len), QCLOUD_ERR_SUCCESS, QCLOUD_ERR_FAILURE);
#endif
connect_opt->mqtt_version = mqtt_version;
// keep alive interval is in second, no long than 11.5 minutes(11.5 * 60 seconds)
connect_opt->keep_alive_interval = QCLOUD_MIN(keep_alive_interval, 11.5 * 60);
connect_opt->clean_session = clean_session;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_client_create(qcloud_mqtt_client_t *client,
qcloud_device_t *device,
mqtt_event_handler_fn_t handler,
void *handler_context,
qcloud_auto_connect_state_t auto_connect_state)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(device, QCLOUD_ERR_INVAL);
memset(client, 0, sizeof(qcloud_mqtt_client_t));
QCLOUD_FUNC_EXIT_RC_IF_NOT(mqtt_client_network_init(&client->network, device), QCLOUD_ERR_SUCCESS, QCLOUD_ERR_FAILURE);
QCLOUD_FUNC_EXIT_RC_IF_NOT(mqtt_client_construct(client, handler, handler_context, auto_connect_state), QCLOUD_ERR_SUCCESS, QCLOUD_ERR_FAILURE);
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
__QCLOUD_API__ void qcloud_mqtt_client_destroy(qcloud_mqtt_client_t *client)
{
#if (QCLOUD_CFG_DUPLICATED_MSG_REMOVE_EN > 0u)
mqtt_glue_packet_id_cache_reset();
#endif
if (qcloud_mqtt_client_is_connected(client)) {
qcloud_mqtt_client_disconnect(client);
}
mqtt_glue_ack_list_destroy(client);
osal_mutex_destroy(client->ack_pend_list_lock);
osal_mutex_destroy(client->global_lock);
osal_mutex_destroy(client->tx_lock);
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_client_publish(qcloud_mqtt_client_t *client, char *topic, mqtt_publish_opt_t *publish_opt)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(topic, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(publish_opt, QCLOUD_ERR_INVAL);
QCLOUD_FUNC_EXIT_RC(qcloud_mqtt_publish(client, topic, publish_opt));
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_client_subscribe(qcloud_mqtt_client_t *client, const char *topic_filter, mqtt_subscribe_opt_t *subscribe_opt)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(topic_filter, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(subscribe_opt, QCLOUD_ERR_INVAL);
QCLOUD_FUNC_EXIT_RC(qcloud_mqtt_subscribe(client, topic_filter, subscribe_opt));
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_client_unsubscribe(qcloud_mqtt_client_t *client, const char *topic_filter)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(topic_filter, QCLOUD_ERR_INVAL);
QCLOUD_FUNC_EXIT_RC(qcloud_mqtt_unsubscribe(client, topic_filter));
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_client_yield(qcloud_mqtt_client_t *client, mqtt_connect_opt_t *connect_opt, uint32_t timeout_ms)
{
QCLOUD_FUNC_EXIT_RC(qcloud_mqtt_yield(client, connect_opt, timeout_ms));
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_client_disconnect(qcloud_mqtt_client_t *client)
{
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_FUNC_EXIT_RC(qcloud_mqtt_disconnect(client));
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_client_connect(qcloud_mqtt_client_t *client, mqtt_connect_opt_t *connect_opt)
{
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(connect_opt, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
QCLOUD_FUNC_EXIT_RC_IF(client->connection_state, QCLOUD_MQTT_CONNECTION_STATE_CONNECTED, QCLOUD_ERR_MQTT_ALREADY_CONNECTED);
rc = qcloud_mqtt_connect(client, connect_opt);
if (rc != QCLOUD_ERR_SUCCESS) {
QCLOUD_LOG_E("mqtt connect failed: %d", rc);
} else {
QCLOUD_LOG_I("mqtt connect success");
}
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
__QCLOUD_API__ int qcloud_mqtt_client_is_connected(qcloud_mqtt_client_t *client)
{
int is_connected = 0;
if (!client) {
return QCLOUD_FALSE;
}
osal_mutex_lock(client->global_lock);
is_connected = client->connection_state == QCLOUD_MQTT_CONNECTION_STATE_CONNECTED;
osal_mutex_unlock(client->global_lock);
return is_connected ? QCLOUD_TRUE : QCLOUD_FALSE;
}

View File

@@ -0,0 +1,962 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Sergio R. Caprile - non-blocking packet read functions for stream transport
*******************************************************************************/
#include "qcloud.h"
__QCLOUD_STATIC__ int mqtt_common_packet_length_get(int rem_len)
{
rem_len += 1; /* header byte */
/* now remaining_length field */
if (rem_len < 128) {
rem_len += 1;
} else if (rem_len < 16384) {
rem_len += 2;
} else if (rem_len < 2097151) {
rem_len += 3;
} else {
rem_len += 4;
}
return rem_len;
}
/**
* Determines the length of the MQTT connect packet that would be produced using the supplied connect options.
* @param options the options to be used to build the connect packet
* @param the length of buffer needed to contain the serialized version of the packet
* @return int indicating function execution status
*/
__QCLOUD_STATIC__ int mqtt_common_serialize_connect_packet_length(mqtt_connect_opt_t *connect_opt)
{
int len = 0;
/* variable depending on MQTT or MQIsdp */
if (connect_opt->mqtt_version == 3) {
len = 12;
} else if (connect_opt->mqtt_version == 4) {
len = 10;
}
len += strlen(connect_opt->client_id) + 2;
if (connect_opt->username_len) {
len += connect_opt->username_len + 2;
}
if (connect_opt->password_len) {
len += connect_opt->password_len + 2;
}
return len;
}
/**
* Determines the length of the MQTT publish packet that would be produced using the supplied parameters
* @param qos the MQTT QoS of the publish (packetid is omitted for QoS 0)
* @param topicName the topic name to be used in the publish
* @param payload_len the length of the payload to be sent
* @return the length of buffer needed to contain the serialized version of the packet
*/
__QCLOUD_STATIC__ int mqtt_common_serialize_publish_packet_length(int qos, char *topic, size_t payload_len)
{
int len = 0;
len += 2 + strlen(topic) + payload_len;
if (qos > MQTT_QOS0) {
len += 2; /* packet id */
}
return len;
}
/**
* Determines the length of the MQTT subscribe packet that would be produced using the supplied parameters
* @param count the number of topic filter strings in topicFilters
* @param topicFilters the array of topic filter strings to be used in the publish
* @return the length of buffer needed to contain the serialized version of the packet
*/
__QCLOUD_STATIC__ int mqtt_common_serialize_subscribe_packet_length(uint32_t count, char *topic_filters[])
{
int i;
int len = 2; /* packet id */
for (i = 0; i < count; ++i) {
len += 2 + strlen(topic_filters[i]) + 1; /* length + topic + req_qos */
}
return len;
}
/**
* Determines the length of the MQTT unsubscribe packet that would be produced using the supplied parameters
* @param count the number of topic filter strings in topicFilters
* @param topicFilters the array of topic filter strings to be used in the publish
* @return the length of buffer needed to contain the serialized version of the packet
*/
__QCLOUD_STATIC__ int mqtt_common_serialize_unsubscribe_packet_length(uint32_t count, char *topic_filters[])
{
int i = 0;
int len = 2; /* packet id */
for (i = 0; i < count; ++i) {
len += 2 + strlen(topic_filters[i]); /* length + topic*/
}
return len;
}
/**
* Decodes the message length according to the MQTT algorithm
* @param getcharfn pointer to function to read the next character from the data source
* @param value the decoded length returned
* @return the number of bytes read from the socket
*/
__QCLOUD_STATIC__ qcloud_err_t mqtt_common_packet_do_decode_from_buf(uint32_t (*getcharfn)(uint8_t *, uint32_t), uint32_t *value, uint32_t *read_bytes_len)
{
QCLOUD_FUNC_ENTRY;
uint8_t c;
uint32_t multiplier = 1;
uint32_t len = 0, get_len;
*value = 0;
do {
#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) {
/* bad data */
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_PACKET_READ);
}
get_len = 0;
get_len = (*getcharfn)(&c, 1);
if (1 != get_len) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
*value += (c & 127) * multiplier;
multiplier *= 128;
} while ((c & 128) != 0);
*read_bytes_len = len;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
static uint8_t *bufptr;
__QCLOUD_STATIC__ uint32_t __bufchar(uint8_t *c, uint32_t count)
{
uint32_t i;
for (i = 0; i < count; ++i) {
*c = *bufptr++;
}
return count;
}
/**
* Encodes the message length according to the MQTT algorithm
* @param buf the buffer into which the encoded data is written
* @param length the length to be encoded
* @return the number of bytes written to buffer
*/
__QCLOUD_INTERNAL__ int mqtt_common_packet_encode(uint8_t *buf, int length)
{
QCLOUD_FUNC_ENTRY;
int rc = 0;
uint8_t encode_byte;
do {
encode_byte = (uint8_t)(length % 128);
length /= 128;
/* if there are more digits to encode, set the top bit of this digit */
if (length > 0) {
encode_byte |= 0x80;
}
buf[rc++] = encode_byte;
} while (length > 0);
return rc;
}
__QCLOUD_STATIC__ qcloud_err_t mqtt_common_packet_decode_from_buf(uint8_t *buf, uint32_t *value, uint32_t *read_bytes_len)
{
bufptr = buf;
return mqtt_common_packet_do_decode_from_buf(__bufchar, value, read_bytes_len);
}
/**
* Calculates uint16 packet id from two bytes read from the input buffer
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the value calculated
*/
__QCLOUD_STATIC__ uint16_t mqtt_common_packet_read_dbyte(uint8_t **pptr)
{
uint8_t *ptr = *pptr;
uint8_t firstByte = (uint8_t) (*ptr);
uint8_t secondByte = (uint8_t) (*(ptr + 1));
uint16_t len = (uint16_t) (secondByte + (256 * firstByte));
*pptr += 2;
return len;
}
/**
* Reads one character from the input buffer.
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the character read
*/
__QCLOUD_STATIC__ uint8_t mqtt_common_packet_read_byte(uint8_t **pptr)
{
uint8_t c = **pptr;
(*pptr)++;
return c;
}
/**
* @param mqttstring the MQTTString structure into which the data is to be read
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param enddata pointer to the end of the data: do not read beyond
* @return SUCCESS if successful, FAILURE if not
*/
__QCLOUD_STATIC__ qcloud_err_t mqtt_common_packet_read_string(char **string, uint16_t *string_len, uint8_t **pptr, uint8_t *enddata)
{
/* the first two bytes are the length of the string */
if (enddata - (*pptr) <= 1) { /* enough length to read the integer? */
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
*string_len = mqtt_common_packet_read_dbyte(pptr); /* increments pptr to point past length */
if (*string_len > QCLOUD_MQTT_CLIENT_RX_BUF_LEN){
QCLOUD_LOG_E("string length overflow!");
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
if (&(*pptr)[*string_len] > enddata) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
*string = (char *)*pptr;
*pptr += *string_len;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Writes one character to an output buffer.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param c the character to write
*/
__QCLOUD_STATIC__ void mqtt_common_packet_write_byte(uint8_t **pptr, uint8_t c)
{
**pptr = c;
(*pptr)++;
}
/**
* Writes an integer as 2 bytes to an output buffer.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param anInt the integer to write
*/
__QCLOUD_STATIC__ void mqtt_common_packet_write_dbyte(uint8_t **pptr, uint16_t dbyte)
{
**pptr = (uint8_t)(dbyte / 256);
(*pptr)++;
**pptr = (uint8_t)(dbyte % 256);
(*pptr)++;
}
/**
* Writes a "UTF" string to an output buffer. Converts C string to length-delimited.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param string the C string to write
*/
__QCLOUD_STATIC__ void mqtt_common_packet_write_string(uint8_t **pptr, const char *string)
{
size_t len = strlen(string);
mqtt_common_packet_write_dbyte(pptr, (uint16_t)len);
memcpy(*pptr, string, len);
*pptr += len;
}
/**
* Serializes the ack packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buf_len the length in bytes of the supplied buffer
* @param packet_type the MQTT packet type: 1.PUBACK; 2.PUBREL; 3.PUBCOMP
* @param dup the MQTT dup flag
* @param packet_id the MQTT packet identifier
* @return serialized length, or error if 0
*/
__QCLOUD_STATIC__ qcloud_err_t mqtt_common_serialize_ack_packet(uint8_t *buf,
size_t buf_len,
mqtt_packet_t packet_type,
uint8_t dup,
uint16_t packet_id,
uint32_t *serialized_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
mqtt_header_t header = {0};
uint8_t *ptr = buf;
/* Minimum byte length required by ACK headers is
* 2 for fixed and 2 for variable part */
if (buf_len < 4) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
header.bits.type = packet_type;
header.bits.dup = dup;
header.bits.qos = (packet_type == MQTT_PACKET_TYPE_PUBREL) ? 1 : 0;
mqtt_common_packet_write_byte(&ptr, header.byte); /* write header */
ptr += mqtt_common_packet_encode(ptr, 2); /* write remaining length */
mqtt_common_packet_write_dbyte(&ptr, packet_id);
*serialized_len = ptr - buf;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Deserializes the supplied (wire) buffer into an ack
* @param packet_type returned integer - the MQTT packet type
* @param dup returned integer - the MQTT dup flag
* @param packet_id returned integer - the MQTT packet identifier
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buf_len the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_deserialize_ack_packet(uint8_t *packet_type,
uint8_t *dup,
uint16_t *packet_id,
uint8_t *buf,
size_t buf_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(packet_type, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(dup, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(packet_id, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
mqtt_header_t header = {0};
uint8_t ack_code;
uint8_t *curdata = buf, *enddata = NULL;
uint32_t decoded_len = 0, read_bytes_len = 0;
/* PUBACK fixed header size is two bytes, variable header is 2 bytes, MQTT v3.1.1 Specification 3.4.1 */
if (buf_len < 4) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
header.byte = mqtt_common_packet_read_byte(&curdata);
*dup = header.bits.dup;
*packet_type = header.bits.type;
/* read remaining length */
rc = mqtt_common_packet_decode_from_buf(curdata, &decoded_len, &read_bytes_len);
if (QCLOUD_ERR_SUCCESS != rc) {
QCLOUD_FUNC_EXIT_RC(rc);
}
curdata += read_bytes_len;
enddata = curdata + decoded_len;
if (enddata - curdata < 2) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
*packet_id = mqtt_common_packet_read_dbyte(&curdata);
// 返回错误码处理
if (enddata - curdata >= 1) {
ack_code = mqtt_common_packet_read_byte(&curdata);
if (ack_code != 0) {
QCLOUD_LOG_E("deserialize ack packet failure! 0x%02x", ack_code);
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
}
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Deserializes the supplied (wire) buffer into suback data
* @param packet_id returned integer - the MQTT packet identifier
* @param max_count - the maximum number of members allowed in the grantedQoSs array
* @param count returned integer - number of members in the grantedQoSs array
* @param grantedQoSs returned array of integers - the granted qualities of service
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buf_len the length in bytes of the data in the supplied buffer
* @return error code. 1 is success, 0 is failure
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_deserialize_suback_packet(uint16_t *packet_id,
uint32_t max_count,
uint32_t *count,
int granted_qoss[],
uint8_t *buf,
size_t buf_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(packet_id, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(count, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(granted_qoss, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
mqtt_header_t header = {0};
uint8_t *curdata = buf, *enddata = NULL;
uint32_t decoded_len = 0, read_bytes_len = 0;
// SUBACK头部大小为4字节, 负载部分至少为1字节QOS返回码
if (buf_len < 5) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
// 读取报文固定头部的第一个字节
header.byte = mqtt_common_packet_read_byte(&curdata);
if (header.bits.type != MQTT_PACKET_TYPE_SUBACK) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
// 读取报文固定头部的剩余长度
rc = mqtt_common_packet_decode_from_buf(curdata, &decoded_len, &read_bytes_len);
if (rc != QCLOUD_ERR_SUCCESS) {
QCLOUD_FUNC_EXIT_RC(rc);
}
curdata += read_bytes_len;
enddata = curdata + decoded_len;
if (enddata - curdata < 2) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
// 读取报文可变头部的报文标识符
*packet_id = mqtt_common_packet_read_dbyte(&curdata);
// 读取报文的负载部分
*count = 0;
while (curdata < enddata) {
if (*count > max_count) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
granted_qoss[(*count)++] = mqtt_common_packet_read_byte(&curdata);
}
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Deserializes the supplied (wire) buffer into unsuback data
* @param packet_id returned integer - the MQTT packet identifier
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buf_len the length in bytes of the data in the supplied buffer
* @return int indicating function execution status
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_deserialize_unsuback_packet(uint16_t *packet_id,
uint8_t *buf,
size_t buf_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(packet_id, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
uint8_t type = 0, dup = 0;
rc = mqtt_common_deserialize_ack_packet(&type, &dup, packet_id, buf, buf_len);
if (rc != QCLOUD_ERR_SUCCESS || type != MQTT_PACKET_TYPE_UNSUBACK) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Serializes the connect options into the buffer.
* @param buf the buffer into which the packet will be serialized
* @param len the length in bytes of the supplied buffer
* @param options the options to be used to build the connect packet
* @param serialized length
* @return int indicating function execution status
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_serialize_connect_packet(uint8_t *buf,
size_t buf_len,
mqtt_connect_opt_t *connect_opt,
uint32_t *serialized_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(connect_opt, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
int packet_len = 0;
uint8_t *ptr = buf;
mqtt_header_t header = {0};
mqtt_connect_flag_t flags = {0};
packet_len = mqtt_common_serialize_connect_packet_length(connect_opt);
if (mqtt_common_packet_length_get(packet_len) > buf_len) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
header.byte = 0;
header.bits.type = MQTT_PACKET_TYPE_CONNECT;
// 报文固定头部第一个字节
mqtt_common_packet_write_byte(&ptr, header.byte); /* write header */
// 报文固定头部剩余长度字段
ptr += mqtt_common_packet_encode(ptr, packet_len); /* write remaining length */
// 报文可变头部协议名 + 协议版本号
if (connect_opt->mqtt_version == 4) {
mqtt_common_packet_write_string(&ptr, "MQTT");
mqtt_common_packet_write_byte(&ptr, (uint8_t)4);
} else {
mqtt_common_packet_write_string(&ptr, "MQIsdp");
mqtt_common_packet_write_byte(&ptr, (uint8_t)3);
}
// 报文可变头部连接标识位
flags.all = 0;
flags.bits.cleansession = connect_opt->clean_session;
if (connect_opt->username_len) {
flags.bits.username = 1;
}
if (connect_opt->password_len) {
flags.bits.password = 1;
}
mqtt_common_packet_write_byte(&ptr, flags.all);
// 报文可变头部心跳周期/保持连接, 一个以秒为单位的时间间隔, 表示为一个16位的字
mqtt_common_packet_write_dbyte(&ptr, connect_opt->keep_alive_interval);
// 有效负载部分: 客户端标识符
mqtt_common_packet_write_string(&ptr, connect_opt->client_id);
// 用户名
if (flags.bits.username) {
mqtt_common_packet_write_string(&ptr, connect_opt->username);
}
if (flags.bits.password) {
mqtt_common_packet_write_string(&ptr, connect_opt->password);
}
*serialized_len = ptr - buf;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Deserializes the supplied (wire) buffer into connack data - return code
* @param sessionPresent the session present flag returned (only for MQTT 3.1.1)
* @param connack_rc returned integer value of the connack return code
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buflen the length in bytes of the data in the supplied buffer
* @return int indicating function execution status
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_deserialize_connack_packet(uint8_t *session_present,
uint8_t *connack_rc,
uint8_t *buf,
size_t buf_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(session_present, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(connack_rc, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
mqtt_header_t header = {0};
mqtt_connack_flags_t flags = {0};
uint8_t *curdata = buf, *enddata = NULL;
uint32_t decoded_len = 0, read_bytes_len = 0;
// CONNACK 头部大小是固定的2字节长度, 可变头部也是两个字节的长度, 无有效负载
if (buf_len < 4) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
// 读取固定头部第一个字节
header.byte = mqtt_common_packet_read_byte(&curdata);
if (header.bits.type != MQTT_PACKET_TYPE_CONNACK) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
// 读取固定头部剩余长度字段
QCLOUD_FUNC_EXIT_RC_IF_NOT(rc = mqtt_common_packet_decode_from_buf(curdata, &decoded_len, &read_bytes_len), QCLOUD_ERR_SUCCESS, rc);
curdata += read_bytes_len;
enddata = curdata + decoded_len;
if (enddata - curdata != 2) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
// 读取可变头部-连接确认标志 参考MQTT协议说明文档3.2.2.1小结
flags.all = mqtt_common_packet_read_byte(&curdata);
*session_present = flags.bits.sessionpresent;
// 读取可变头部-连接返回码 参考MQTT协议说明文档3.2.2.3小结
*connack_rc = mqtt_common_packet_read_byte(&curdata);
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Serializes a 0-length packet into the supplied buffer, ready for writing to a socket
* @param buf the buffer into which the packet will be serialized
* @param buf_len the length in bytes of the supplied buffer, to avoid overruns
* @param packettype the message type
* @param serialized length
* @return int indicating function execution status
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_serialize_zero_payload_packet(uint8_t *buf,
size_t buf_len,
mqtt_packet_t packet_type,
uint32_t *serialized_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
uint8_t *ptr = buf;
mqtt_header_t header = {0};
/* Buffer should have at least 2 bytes for the header */
if (buf_len < 2) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
header.byte = 0;
header.bits.type = packet_type;
/* write header */
mqtt_common_packet_write_byte(&ptr, header.byte);
/* write remaining length */
ptr += mqtt_common_packet_encode(ptr, 0);
*serialized_len = ptr - buf;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Deserializes the supplied (wire) buffer into publish data
* @param dup returned integer - the MQTT dup flag
* @param qos returned integer - the MQTT QoS value
* @param retained returned integer - the MQTT retained flag
* @param packet_id returned integer - the MQTT packet identifier
* @param topicName returned MQTTString - the MQTT topic in the publish
* @param payload returned byte buffer - the MQTT publish payload
* @param payload_len returned integer - the length of the MQTT payload
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buf_len the length in bytes of the data in the supplied buffer
* @return error code. 1 is success
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_deserialize_publish_packet(uint8_t *dup, int *qos,
uint8_t *retained, uint16_t *packet_id,
char **topic, uint16_t *topic_len,
uint8_t **payload, size_t *payload_len,
uint8_t *buf, size_t buf_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(dup, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(qos, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(retained, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(packet_id, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
uint32_t decoded_len = 0, read_bytes_len = 0;
mqtt_header_t header = {0};
uint8_t *curdata = buf, *enddata = NULL;
/* Publish header size is at least four bytes.
* Fixed header is two bytes.
* Variable header size depends on QoS And Topic Name.
* QoS level 0 doesn't have a message identifier (0 - 2 bytes)
* Topic Name length fields decide size of topic name field (at least 2 bytes)
* MQTT v3.1.1 Specification 3.3.1 */
if (buf_len < 4) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
header.byte = mqtt_common_packet_read_byte(&curdata);
if (header.bits.type != MQTT_PACKET_TYPE_PUBLISH) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
*dup = header.bits.dup;
*qos = header.bits.qos;
*retained = header.bits.retain;
/* read remaining length */
rc = mqtt_common_packet_decode_from_buf(curdata, &decoded_len, &read_bytes_len); /* read remaining length */
if (QCLOUD_ERR_SUCCESS != rc) {
QCLOUD_FUNC_EXIT_RC(rc);
}
curdata += read_bytes_len;
enddata = curdata + decoded_len;
/* do we have enough data to read the protocol version byte? */
if (mqtt_common_packet_read_string(topic, topic_len, &curdata, enddata) != QCLOUD_ERR_SUCCESS ||
enddata - curdata < 0) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
if (*qos > MQTT_QOS0) {
*packet_id = mqtt_common_packet_read_dbyte(&curdata);
}
*payload_len = enddata - curdata;
*payload = curdata;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Serializes a puback packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_serialize_puback_packet(uint8_t *buf,
size_t buf_len,
uint16_t packet_id,
uint32_t *serialized_len)
{
return mqtt_common_serialize_ack_packet(buf, buf_len, MQTT_PACKET_TYPE_PUBACK, 0, packet_id, serialized_len);
}
/**
* Serializes a pubrel packet into the supplied buffer.
* @param buf the buffer into which the packet will be serialized
* @param buflen the length in bytes of the supplied buffer
* @param dup integer - the MQTT dup flag
* @param packetid integer - the MQTT packet identifier
* @return serialized length, or error if 0
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_serialize_pubrel_packet(uint8_t *buf,
size_t buf_len,
uint8_t dup,
uint16_t packet_id,
uint32_t *serialized_len)
{
return mqtt_common_serialize_ack_packet(buf, buf_len, MQTT_PACKET_TYPE_PUBREL, dup, packet_id, serialized_len);
}
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_serialize_pubrec_packet(uint8_t *buf,
size_t buf_len,
uint16_t packet_id,
uint32_t *serialized_len)
{
return mqtt_common_serialize_ack_packet(buf, buf_len, MQTT_PACKET_TYPE_PUBREC, 0, packet_id, serialized_len);
}
/**
* Serializes the supplied publish data into the supplied buffer, ready for sending
* @param buf the buffer into which the packet will be serialized
* @param buf_len the length in bytes of the supplied buffer
* @param dup integer - the MQTT dup flag
* @param qos integer - the MQTT QoS value
* @param retained integer - the MQTT retained flag
* @param packet_id integer - the MQTT packet identifier
* @param topicName MQTTString - the MQTT topic in the publish
* @param payload byte buffer - the MQTT publish payload
* @param payload_len integer - the length of the MQTT payload
* @return the length of the serialized data. <= 0 indicates error
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_serialize_publish_packet(uint8_t *buf,
size_t buf_len,
uint8_t dup,
int qos,
uint8_t retained,
uint16_t packet_id,
char *topic,
uint8_t *payload,
size_t payload_len,
uint32_t *serialized_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(payload, QCLOUD_ERR_INVAL);
uint8_t *ptr = buf;
mqtt_header_t header = {0};
int rem_len = 0;
rem_len = mqtt_common_serialize_publish_packet_length(qos, topic, payload_len);
if (mqtt_common_packet_length_get(rem_len) > buf_len) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
header.bits.type = MQTT_PACKET_TYPE_PUBLISH;
header.bits.dup = dup;
header.bits.qos = qos;
header.bits.retain = retained;
mqtt_common_packet_write_byte(&ptr, header.byte); /* write header */
ptr += mqtt_common_packet_encode(ptr, rem_len); /* write remaining length */;
mqtt_common_packet_write_string(&ptr, topic); /* Variable Header: Topic Name */
if (qos > MQTT_QOS0) {
mqtt_common_packet_write_dbyte(&ptr, packet_id); /* Variable Header: Topic Name */
}
memcpy(ptr, payload, payload_len);
ptr += payload_len;
*serialized_len = ptr - buf;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Serializes the supplied subscribe data into the supplied buffer, ready for sending
* @param buf the buffer into which the packet will be serialized
* @param buf_len the length in bytes of the supplied bufferr
* @param dup integer - the MQTT dup flag
* @param packet_id integer - the MQTT packet identifier
* @param count - number of members in the topicFilters and reqQos arrays
* @param topicFilters - array of topic filter names
* @param requestedQoSs - array of requested QoS
* @return the length of the serialized data. <= 0 indicates error
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_serialize_subscribe_packet(uint8_t *buf,
size_t buf_len,
uint8_t dup,
uint16_t packet_id,
uint32_t count,
char *topic_filters[],
int requested_qoss[],
uint32_t *serialized_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
int i = 0;
uint8_t *ptr = buf;
mqtt_header_t header = {0};
uint32_t rem_len = 0;
// SUBSCRIBE报文的剩余长度 = 报文标识符(2 byte) + count * (长度字段(2 byte) + topicLen + qos(1 byte))
rem_len = mqtt_common_serialize_subscribe_packet_length(count, topic_filters);
if (mqtt_common_packet_length_get(rem_len) > buf_len) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
// 初始化报文头部
header.byte = 0;
header.bits.type = MQTT_PACKET_TYPE_SUBSCRIBE;
header.bits.dup = dup;
header.bits.qos = MQTT_QOS1;
// 写报文固定头部第一个字节
mqtt_common_packet_write_byte(&ptr, header.byte);
// 写报文固定头部剩余长度字段
ptr += mqtt_common_packet_encode(ptr, rem_len);
// 写可变头部: 报文标识符
mqtt_common_packet_write_dbyte(&ptr, packet_id);
// 写报文的负载部分数据
for (i = 0; i < count; ++i) {
mqtt_common_packet_write_string(&ptr, topic_filters[i]);
mqtt_common_packet_write_byte(&ptr, (uint8_t)requested_qoss[i]);
}
*serialized_len = ptr - buf;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
/**
* Serializes the supplied unsubscribe data into the supplied buffer, ready for sending
* @param buf the raw buffer data, of the correct length determined by the remaining length field
* @param buf_len the length in bytes of the data in the supplied buffer
* @param dup integer - the MQTT dup flag
* @param packet_id integer - the MQTT packet identifier
* @param count - number of members in the topicFilters array
* @param topicFilters - array of topic filter names
* @param serialized_len - the length of the serialized data
* @return int indicating function execution status
*/
__QCLOUD_INTERNAL__ qcloud_err_t mqtt_common_serialize_unsubscribe_packet(uint8_t *buf, size_t buf_len,
uint8_t dup, uint16_t packet_id,
uint32_t count, char *topic_filters[],
uint32_t *serialized_len)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
int i = 0;
uint8_t *ptr = buf;
mqtt_header_t header = {0};
uint32_t rem_len = 0;
rem_len = mqtt_common_serialize_unsubscribe_packet_length(count, topic_filters);
if (mqtt_common_packet_length_get(rem_len) > buf_len) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
header.byte = 0;
header.bits.type = MQTT_PACKET_TYPE_UNSUBSCRIBE;
header.bits.dup = dup;
header.bits.qos = MQTT_QOS1;
mqtt_common_packet_write_byte(&ptr, header.byte); /* write header */
ptr += mqtt_common_packet_encode(ptr, rem_len); /* write remaining length */
mqtt_common_packet_write_dbyte(&ptr, packet_id);
for (i = 0; i < count; ++i) {
mqtt_common_packet_write_string(&ptr, topic_filters[i]);
}
*serialized_len = ptr - buf;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}

View File

@@ -0,0 +1,153 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "qcloud.h"
__QCLOUD_STATIC__ qcloud_err_t mqtt_connack2errno(uint8_t connack_rc)
{
switch (connack_rc) {
case MQTT_CONNACK_CONNECTION_ACCEPTED:
return QCLOUD_ERR_MQTT_CONNACK_CONNECTION_ACCEPTED;
case MQTT_CONANCK_UNACCEPTABLE_PROTOCOL_VERSION_ERROR:
return QCLOUD_ERR_MQTT_CONANCK_UNACCEPTABLE_PROTOCOL_VERSION;
case MQTT_CONNACK_IDENTIFIER_REJECTED_ERROR:
return QCLOUD_ERR_MQTT_CONNACK_IDENTIFIER_REJECTED;
case MQTT_CONNACK_SERVER_UNAVAILABLE_ERROR:
return QCLOUD_ERR_MQTT_CONNACK_SERVER_UNAVAILABLE;
case MQTT_CONNACK_BAD_USERDATA_ERROR:
return QCLOUD_ERR_MQTT_CONNACK_BAD_USERDATA;
case MQTT_CONNACK_NOT_AUTHORIZED_ERROR:
return QCLOUD_ERR_MQTT_CONNACK_NOT_AUTHORIZED;
default:
return QCLOUD_ERR_MQTT_CONNACK_UNKNOWN;
}
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_connect(qcloud_mqtt_client_t *client, mqtt_connect_opt_t *connect_opt)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(connect_opt, QCLOUD_ERR_INVAL);
osal_timer_t timer;
uint32_t serialized_len = 0;
uint8_t connack_rc = 0, session_present = 0;
qcloud_err_t rc = QCLOUD_ERR_FAILURE;
osal_timer_init(&timer);
osal_timer_countdown_ms(&timer, client->command_timeout);
QCLOUD_FUNC_EXIT_RC_IF_NOT(rc = client->network.connect(&(client->network)), QCLOUD_ERR_SUCCESS, rc);
osal_mutex_lock(client->tx_lock);
// 序列化CONNECT报文
rc = mqtt_common_serialize_connect_packet(client->tx_buffer, sizeof(client->tx_buffer), connect_opt, &serialized_len);
if (rc != QCLOUD_ERR_SUCCESS || serialized_len == 0) {
osal_mutex_unlock(client->tx_lock);
goto errout;
}
// 发送CONNECT报文
rc = mqtt_glue_packet_send(client, serialized_len, &timer);
if (QCLOUD_ERR_SUCCESS != rc) {
osal_mutex_unlock(client->tx_lock);
goto errout;
}
osal_mutex_unlock(client->tx_lock);
// 阻塞等待CONNACK的报文,
rc = mqtt_glue_spin4ack(client, &timer, (uint8_t)MQTT_PACKET_TYPE_CONNACK);
if (QCLOUD_ERR_SUCCESS != rc) {
goto errout;
}
// 反序列化CONNACK包, 检查返回码
rc = mqtt_common_deserialize_connack_packet(&session_present, &connack_rc, client->rx_buffer, sizeof(client->rx_buffer));
if (QCLOUD_ERR_SUCCESS != rc) {
goto errout;
}
rc = mqtt_connack2errno(connack_rc);
if (rc != QCLOUD_ERR_MQTT_CONNACK_CONNECTION_ACCEPTED) {
goto errout;
}
mqtt_client_connection_state_set(client, QCLOUD_MQTT_CONNECTION_STATE_CONNECTED);
client->keep_alive_interval = connect_opt->keep_alive_interval;
osal_mutex_lock(client->global_lock);
client->is_manually_disconnected = QCLOUD_FALSE;
client->ping_outstanding = 0;
osal_timer_countdown(&client->ping_timer, client->keep_alive_interval);
osal_mutex_unlock(client->global_lock);
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
errout:
client->network.disconnect(&client->network);
QCLOUD_FUNC_EXIT_RC(rc);
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_disconnect(qcloud_mqtt_client_t *client)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
osal_timer_t timer;
uint32_t serialized_len = 0;
if (!qcloud_mqtt_client_is_connected(client)) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
mqtt_glue_msg_handler_list_destroy(client);
// 1. 组disconnect包
osal_mutex_lock(client->tx_lock);
rc = mqtt_common_serialize_zero_payload_packet(client->tx_buffer, sizeof(client->tx_buffer), MQTT_PACKET_TYPE_DISCONNECT, &serialized_len);
if (rc != QCLOUD_ERR_SUCCESS) {
osal_mutex_unlock(client->tx_lock);
QCLOUD_FUNC_EXIT_RC(rc);
}
osal_timer_init(&timer);
osal_timer_countdown_ms(&timer, client->command_timeout);
// 2. 发送disconnect包
if (serialized_len > 0) {
rc = mqtt_glue_packet_send(client, serialized_len, &timer);
if (rc != QCLOUD_ERR_SUCCESS) {
osal_mutex_unlock(client->tx_lock);
QCLOUD_FUNC_EXIT_RC(rc);
}
}
osal_mutex_unlock(client->tx_lock);
// 3. 断开底层TCP连接, 并修改相关标识位
client->network.disconnect(&(client->network));
mqtt_client_connection_state_set(client, QCLOUD_MQTT_CONNECTION_STATE_DISCONNECTED);
client->is_manually_disconnected = QCLOUD_TRUE;
QCLOUD_LOG_I("mqtt disconnect!");
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
#ifdef __cplusplus
}
#endif

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,65 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "qcloud.h"
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_publish(qcloud_mqtt_client_t *client, char *topic, mqtt_publish_opt_t *publish_opt)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(publish_opt, QCLOUD_ERR_INVAL);
QCLOUD_STRING_PTR_SANITY_CHECK(topic, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
osal_timer_t timer;
uint32_t len = 0;
size_t topic_len = 0;
topic_len = strlen(topic);
if (topic_len > QCLOUD_MQTT_TOPIC_SIZE_MAX) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MAX_TOPIC_LENGTH);
}
if (publish_opt->qos == MQTT_QOS2) {
QCLOUD_LOG_E("QoS2 isn't supported yet");
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_QOS_NOT_SUPPORT);
}
if (!qcloud_mqtt_client_is_connected(client)) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
osal_timer_init(&timer);
osal_timer_countdown_ms(&timer, client->command_timeout);
osal_mutex_lock(client->tx_lock);
if (publish_opt->qos == MQTT_QOS1) {
publish_opt->id = mqtt_glue_packet_id_generate(client);
}
QCLOUD_LOG_D("publish topic seq=%d|name=%s|payload=%s", publish_opt->id, topic, (char *)publish_opt->payload);
rc = mqtt_common_serialize_publish_packet(client->tx_buffer, sizeof(client->tx_buffer),
0, publish_opt->qos, publish_opt->retained, publish_opt->id,
topic, (uint8_t *) publish_opt->payload, publish_opt->payload_len, &len);
if (QCLOUD_ERR_SUCCESS != rc) {
osal_mutex_unlock(client->tx_lock);
QCLOUD_FUNC_EXIT_RC(rc);
}
rc = mqtt_glue_packet_send(client, len, &timer);
if (rc == QCLOUD_ERR_SUCCESS && publish_opt->qos > MQTT_QOS0) {
mqtt_glue_ack_list_record(client, QCLOUD_MQTT_ACK_TYPE_PUBACK, NULL, publish_opt->id, len);
}
osal_mutex_unlock(client->tx_lock);
QCLOUD_FUNC_EXIT_RC(rc);
}
#ifdef __cplusplus
}
#endif

View File

@@ -0,0 +1,89 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "qcloud.h"
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_subscribe(qcloud_mqtt_client_t *client, const char *topic_filter, mqtt_subscribe_opt_t *subscribe_opt)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(subscribe_opt, QCLOUD_ERR_INVAL);
QCLOUD_STRING_PTR_SANITY_CHECK(topic_filter, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
osal_timer_t timer;
size_t topic_len;
uint32_t len = 0;
uint16_t packet_id = 0;
char *topic_filter_mutable = NULL;
qcloud_mqtt_msg_handler_t *msg_handler = NULL;
topic_len = strlen(topic_filter);
if (topic_len > QCLOUD_MQTT_TOPIC_SIZE_MAX) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MAX_TOPIC_LENGTH);
}
if (subscribe_opt->qos == MQTT_QOS2) {
QCLOUD_LOG_E("QoS2 not supported yet");
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_QOS_NOT_SUPPORT);
}
if (!qcloud_mqtt_client_is_connected(client)) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN)
}
/* topic filter should be valid in the whole sub life */
topic_filter_mutable = mqtt_glue_string_const2mutable(topic_filter, topic_len);
if (!topic_filter_mutable) {
QCLOUD_LOG_E("malloc failed");
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
osal_timer_init(&timer);
osal_timer_countdown_ms(&timer, client->command_timeout);
osal_mutex_lock(client->tx_lock);
// 序列化SUBSCRIBE报文
packet_id = mqtt_glue_packet_id_generate(client);
QCLOUD_LOG_D("topic name=%s|packet id=%d|private data=%s", topic_filter_mutable, packet_id, (char *)subscribe_opt->private_data);
rc = mqtt_common_serialize_subscribe_packet(client->tx_buffer, sizeof(client->tx_buffer),
0, packet_id, 1, &topic_filter_mutable,
(int *)&subscribe_opt->qos, &len);
if (QCLOUD_ERR_SUCCESS != rc) {
osal_mutex_unlock(client->tx_lock);
mqtt_glue_string_mutable_free(topic_filter_mutable);
QCLOUD_FUNC_EXIT_RC(rc);
}
// 发送SUBSCRIBE报文
rc = mqtt_glue_packet_send(client, len, &timer);
if (QCLOUD_ERR_SUCCESS != rc) {
osal_mutex_unlock(client->tx_lock);
mqtt_glue_string_mutable_free(topic_filter_mutable);
QCLOUD_FUNC_EXIT_RC(rc);
}
msg_handler = mqtt_glue_msg_handler_create(topic_filter_mutable, subscribe_opt);
if (!msg_handler) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
rc = mqtt_glue_ack_list_record(client, QCLOUD_MQTT_ACK_TYPE_SUBACK, msg_handler, packet_id, len);
if (QCLOUD_ERR_SUCCESS != rc) {
osal_mutex_unlock(client->tx_lock);
mqtt_glue_msg_handler_destory(msg_handler);
QCLOUD_FUNC_EXIT_RC(rc);
}
osal_mutex_unlock(client->tx_lock);
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
#ifdef __cplusplus
}
#endif

View File

@@ -0,0 +1,83 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "qcloud.h"
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_unsubscribe(qcloud_mqtt_client_t *client, const char *topic_filter)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_STRING_PTR_SANITY_CHECK(topic_filter, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
osal_timer_t timer;
size_t topic_len = 0;
uint32_t len = 0;
uint16_t packet_id = 0;
int is_subscribed = QCLOUD_FALSE;
char *topic_filter_mutable = NULL;
topic_len = strlen(topic_filter);
if (topic_len > QCLOUD_MQTT_TOPIC_SIZE_MAX) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MAX_TOPIC_LENGTH);
}
mqtt_glue_msg_handler_uninstall(client, topic_filter, &is_subscribed);
if (!is_subscribed) {
QCLOUD_LOG_E("not subscribed: %s", topic_filter);
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_UNSUB_FAIL);
}
if (!qcloud_mqtt_client_is_connected(client)) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
/* topic filter should be valid in the whole sub life */
topic_filter_mutable = mqtt_glue_string_const2mutable(topic_filter, topic_len);
if (!topic_filter_mutable) {
QCLOUD_LOG_E("malloc failed");
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
osal_timer_init(&timer);
osal_timer_countdown_ms(&timer, client->command_timeout);
osal_mutex_lock(client->tx_lock);
packet_id = mqtt_glue_packet_id_generate(client);
rc = mqtt_common_serialize_unsubscribe_packet(client->tx_buffer, sizeof(client->tx_buffer),
0, packet_id, 1, &topic_filter_mutable, &len);
mqtt_glue_string_mutable_free(topic_filter_mutable);
if (QCLOUD_ERR_SUCCESS != rc) {
osal_mutex_unlock(client->tx_lock);
QCLOUD_FUNC_EXIT_RC(rc);
}
/* send the unsubscribe packet */
rc = mqtt_glue_packet_send(client, len, &timer);
if (QCLOUD_ERR_SUCCESS != rc) {
osal_mutex_unlock(client->tx_lock);
QCLOUD_FUNC_EXIT_RC(rc);
}
rc = mqtt_glue_ack_list_record(client, QCLOUD_MQTT_ACK_TYPE_UNSUBACK, NULL, packet_id, len);
if (QCLOUD_ERR_SUCCESS != rc) {
QCLOUD_LOG_E("push publish into to pubInfolist failed: %d", rc);
osal_mutex_unlock(client->tx_lock);
QCLOUD_FUNC_EXIT_RC(rc);
}
osal_mutex_unlock(client->tx_lock);
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
#ifdef __cplusplus
}
#endif

View File

@@ -0,0 +1,286 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "qcloud.h"
__QCLOUD_STATIC__ qcloud_err_t mqtt_yield_resubscribe(qcloud_mqtt_client_t *client)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
char *topic = NULL;
mqtt_subscribe_opt_t subscribe_opt;
qcloud_list_t *curr, *next;
qcloud_mqtt_msg_handler_t *msg_handler;
if (!qcloud_mqtt_client_is_connected(client)) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
if (qcloud_list_empty(&client->msg_handler_list)) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
osal_mutex_lock(client->msg_handler_list_lock);
QCLOUD_LIST_FOR_EACH_SAFE(curr, next, &client->msg_handler_list) {
msg_handler = QCLOUD_LIST_ENTRY(curr, qcloud_mqtt_msg_handler_t, list);
subscribe_opt.message_handler = msg_handler->handler;
subscribe_opt.qos = msg_handler->qos;
subscribe_opt.private_data = msg_handler->private_data;
rc = qcloud_mqtt_subscribe(client, msg_handler->topic_filter_mutable, &subscribe_opt);
if (rc != QCLOUD_ERR_SUCCESS) {
osal_mutex_unlock(client->msg_handler_list_lock);
QCLOUD_LOG_E("resubscribe failed %d, topic: %s", rc, topic);
QCLOUD_FUNC_EXIT_RC(rc);
}
}
osal_mutex_unlock(client->msg_handler_list_lock);
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
__QCLOUD_STATIC__ qcloud_err_t mqtt_yield_do_reconnect(qcloud_mqtt_client_t *client, mqtt_connect_opt_t *connect_opt)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_POINTER_SANITY_CHECK(connect_opt, QCLOUD_ERR_INVAL);
qcloud_err_t rc;
QCLOUD_LOG_I("attempt to reconnect...");
if (qcloud_mqtt_client_is_connected(client)) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_ALREADY_CONNECTED);
}
rc = qcloud_mqtt_connect(client, connect_opt);
if (!qcloud_mqtt_client_is_connected(client)) {
QCLOUD_FUNC_EXIT_RC(rc);
}
rc = mqtt_yield_resubscribe(client);
if (rc != QCLOUD_ERR_SUCCESS) {
QCLOUD_FUNC_EXIT_RC(rc);
}
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_RECONNECTED);
}
/**
* @brief 处理非手动断开连接的情况
*
* @param client
* @return
*/
__QCLOUD_STATIC__ qcloud_err_t mqtt_yield_try_disconnect(qcloud_mqtt_client_t *client)
{
QCLOUD_FUNC_ENTRY;
qcloud_err_t rc;
if (!qcloud_mqtt_client_is_connected(client)) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
rc = qcloud_mqtt_disconnect(client);
// 若断开连接失败, 强制断开底层TLS层连接
if (rc != QCLOUD_ERR_SUCCESS) {
client->network.disconnect(&(client->network));
mqtt_client_connection_state_set(client, QCLOUD_MQTT_CONNECTION_STATE_DISCONNECTED);
}
mqtt_glue_callback_involve(client, MQTT_EVENT_DISCONNECT, NULL);
// 非手动断开连接
client->is_manually_disconnected = QCLOUD_FALSE;
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
/**
* @brief 处理自动重连的相关逻辑
*
* @param client
* @return
*/
__QCLOUD_STATIC__ qcloud_err_t mqtt_yield_try_reconnect(qcloud_mqtt_client_t *client, mqtt_connect_opt_t *connect_opt)
{
QCLOUD_FUNC_ENTRY;
qcloud_err_t rc = QCLOUD_ERR_MQTT_RECONNECTED;
// 自动重连等待时间还未过期, 还未到重连的时候, 返回正在进行重连
if (!osal_timer_is_expired(&client->reconnect_timer)) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT);
}
rc = mqtt_yield_do_reconnect(client, connect_opt);
if (rc == QCLOUD_ERR_MQTT_RECONNECTED) {
QCLOUD_LOG_E("reconnect success");
mqtt_glue_callback_involve(client, MQTT_EVENT_RECONNECT, NULL);
QCLOUD_FUNC_EXIT_RC(rc);
} else {
QCLOUD_LOG_E("attempt reconnect failed %d", rc);
rc = QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT;
}
client->reconnect_try_duration *= 2;
if (client->reconnect_try_duration > QCLOUD_MQTT_RECONNECT_TRY_THRESHOLD) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_RECONNECT_TIMEOUT);
}
osal_timer_countdown_ms(&client->reconnect_timer, client->reconnect_try_duration);
QCLOUD_FUNC_EXIT_RC(rc);
}
/**
* @brief 处理与服务器维持心跳的相关逻辑
*
* @param client
* @return
*/
__QCLOUD_STATIC__ qcloud_err_t mqtt_yield_keep_alive(qcloud_mqtt_client_t *client, mqtt_connect_opt_t *connect_opt)
{
#define MQTT_PING_RETRY_TIMES 2
QCLOUD_FUNC_ENTRY;
int try = 0;
qcloud_err_t rc;
osal_timer_t timer;
uint32_t serialized_len = 0;
if (!connect_opt->keep_alive_interval) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
if (!osal_timer_is_expired(&client->ping_timer)) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
if (client->ping_outstanding >= MQTT_PING_RETRY_TIMES) {
// reaching here means we haven't received any MQTT packet for a long time(keep_alive_interval)
QCLOUD_LOG_E("fail to recv MQTT msg.");
QCLOUD_FUNC_EXIT_RC(mqtt_yield_try_disconnect(client));
}
/* there is no ping outstanding - send one */
osal_mutex_lock(client->tx_lock);
rc = mqtt_common_serialize_zero_payload_packet(client->tx_buffer, sizeof(client->tx_buffer), MQTT_PACKET_TYPE_PINGREQ, &serialized_len);
if (QCLOUD_ERR_SUCCESS != rc) {
osal_mutex_unlock(client->tx_lock);
QCLOUD_FUNC_EXIT_RC(rc);
}
/* send the ping packet */
osal_timer_init(&timer);
do {
osal_timer_countdown_ms(&timer, client->command_timeout);
rc = mqtt_glue_packet_send(client, serialized_len, &timer);
} while (QCLOUD_ERR_SUCCESS != rc && (try++ < 3));
if (QCLOUD_ERR_SUCCESS != rc) {
osal_mutex_unlock(client->tx_lock);
// if send a PING fails, propably the connection is not OK and we decide to disconnect and begin reconnection attempts
QCLOUD_LOG_E("fail to send PING request.");
rc = mqtt_yield_try_disconnect(client);
QCLOUD_FUNC_EXIT_RC(rc);
}
osal_mutex_unlock(client->tx_lock);
osal_mutex_lock(client->global_lock);
++client->ping_outstanding;
/* start a timer to wait for PINGRESP from server */
osal_timer_countdown(&client->ping_timer, QCLOUD_MIN(5, connect_opt->keep_alive_interval / 2));
osal_mutex_unlock(client->global_lock);
QCLOUD_LOG_D("PING request %u sent.", client->ping_outstanding);
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_SUCCESS);
}
__QCLOUD_API__ qcloud_err_t qcloud_mqtt_yield(qcloud_mqtt_client_t *client, mqtt_connect_opt_t *connect_opt, uint32_t timeout_ms)
{
QCLOUD_FUNC_ENTRY;
QCLOUD_POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QCLOUD_NUMBERIC_SANITY_CHECK(timeout_ms, QCLOUD_ERR_INVAL);
qcloud_err_t rc = QCLOUD_ERR_SUCCESS;
osal_timer_t timer;
uint8_t packet_type;
// 1. 检查连接是否已经手动断开
if (!qcloud_mqtt_client_is_connected(client)) {
if (client->is_manually_disconnected) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_MANUALLY_DISCONNECTED);
} else if (client->auto_connect_state != QCLOUD_AUTO_CONN_STATE_ENABLED) {
QCLOUD_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
}
osal_timer_init(&timer);
osal_timer_countdown_ms(&timer, timeout_ms);
// 3. 循环读取消息以及心跳包管理
while (!osal_timer_is_expired(&timer)) {
if (!qcloud_mqtt_client_is_connected(client)) {
if (client->reconnect_try_duration > QCLOUD_MQTT_RECONNECT_TRY_THRESHOLD) {
rc = QCLOUD_ERR_MQTT_RECONNECT_TIMEOUT;
break;
}
rc = mqtt_yield_try_reconnect(client, connect_opt);
continue;
}
rc = mqtt_glue_spin(client, &timer, &packet_type);
if (rc == QCLOUD_ERR_SUCCESS) {
/* check list of ACK pend list to remove node that is timeout */
mqtt_glue_ack_list_scan(client);
rc = mqtt_yield_keep_alive(client, connect_opt);
} else if (rc == QCLOUD_ERR_SSL_READ_TIMEOUT ||
rc == QCLOUD_ERR_SSL_READ ||
rc == QCLOUD_ERR_TCP_PEER_SHUTDOWN ||
rc == QCLOUD_ERR_TCP_READ_FAIL) {
QCLOUD_LOG_E("network failed, MQTT disconnect %d", rc);
rc = mqtt_yield_try_disconnect(client);
}
if (rc == QCLOUD_ERR_MQTT_NO_CONN) {
++client->network_disconnect_counter;
if (client->auto_connect_state != QCLOUD_AUTO_CONN_STATE_ENABLED) {
break;
}
client->reconnect_try_duration = QCLOUD_MQTT_RECONNECT_TRY_INITIAL;
osal_timer_countdown_ms(&client->reconnect_timer, client->reconnect_try_duration);
// 如果超时时间到了,则会直接返回
rc = QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT;
} else if (rc != QCLOUD_ERR_SUCCESS) {
break;
}
}
QCLOUD_FUNC_EXIT_RC(rc);
}
#ifdef __cplusplus
}
#endif