update qcloud sdk
1. iot-hub sdk update to 3.2.0 2. iot-explorer update to 3.1.1
This commit is contained in:
583
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client.c
vendored
Normal file
583
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client.c
vendored
Normal file
@@ -0,0 +1,583 @@
|
||||
/*
|
||||
* Tencent is pleased to support the open source community by making IoT Hub available.
|
||||
* Copyright (C) 2018-2020 THL A29 Limited, a Tencent company. All rights reserved.
|
||||
|
||||
* Licensed under the MIT License (the "License"); you may not use this file except in
|
||||
* compliance with the License. You may obtain a copy of the License at
|
||||
* http://opensource.org/licenses/MIT
|
||||
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is
|
||||
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||
* either express or implied. See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
#include <ctype.h>
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "lite-utils.h"
|
||||
#include "log_upload.h"
|
||||
#include "mqtt_client.h"
|
||||
#include "qcloud_iot_ca.h"
|
||||
#include "qcloud_iot_common.h"
|
||||
#include "qcloud_iot_device.h"
|
||||
#include "qcloud_iot_export.h"
|
||||
#include "qcloud_iot_import.h"
|
||||
#include "utils_base64.h"
|
||||
#include "utils_list.h"
|
||||
|
||||
static uint16_t _get_random_start_packet_id(void)
|
||||
{
|
||||
srand((unsigned)HAL_GetTimeMs());
|
||||
return rand() % 65536 + 1;
|
||||
}
|
||||
|
||||
DeviceInfo *IOT_MQTT_GetDeviceInfo(void *pClient)
|
||||
{
|
||||
POINTER_SANITY_CHECK(pClient, NULL);
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
|
||||
return &mqtt_client->device_info;
|
||||
}
|
||||
|
||||
// currently return a constant value
|
||||
int IOT_MQTT_GetErrCode(void)
|
||||
{
|
||||
return QCLOUD_ERR_FAILURE;
|
||||
}
|
||||
|
||||
void *IOT_MQTT_Construct(MQTTInitParams *pParams)
|
||||
{
|
||||
POINTER_SANITY_CHECK(pParams, NULL);
|
||||
STRING_PTR_SANITY_CHECK(pParams->product_id, NULL);
|
||||
STRING_PTR_SANITY_CHECK(pParams->device_name, NULL);
|
||||
|
||||
Qcloud_IoT_Client *mqtt_client = NULL;
|
||||
|
||||
// create and init MQTTClient
|
||||
if ((mqtt_client = (Qcloud_IoT_Client *)HAL_Malloc(sizeof(Qcloud_IoT_Client))) == NULL) {
|
||||
Log_e("malloc MQTTClient failed");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int rc = qcloud_iot_mqtt_init(mqtt_client, pParams);
|
||||
if (rc != QCLOUD_RET_SUCCESS) {
|
||||
Log_e("mqtt init failed: %d", rc);
|
||||
HAL_Free(mqtt_client);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
MQTTConnectParams connect_params = DEFAULT_MQTTCONNECT_PARAMS;
|
||||
connect_params.client_id = mqtt_client->device_info.client_id;
|
||||
// Upper limit of keep alive interval is (11.5 * 60) seconds
|
||||
connect_params.keep_alive_interval = Min(pParams->keep_alive_interval_ms / 1000, 690);
|
||||
connect_params.clean_session = pParams->clean_session;
|
||||
connect_params.auto_connect_enable = pParams->auto_connect_enable;
|
||||
#if defined(AUTH_WITH_NOTLS) && defined(AUTH_MODE_KEY)
|
||||
if (pParams->device_secret == NULL) {
|
||||
Log_e("Device secret is null!");
|
||||
qcloud_iot_mqtt_fini(mqtt_client);
|
||||
HAL_Free(mqtt_client);
|
||||
return NULL;
|
||||
}
|
||||
size_t src_len = strlen(pParams->device_secret);
|
||||
size_t len;
|
||||
memset(mqtt_client->psk_decode, 0x00, DECODE_PSK_LENGTH);
|
||||
rc = qcloud_iot_utils_base64decode(mqtt_client->psk_decode, DECODE_PSK_LENGTH, &len,
|
||||
(unsigned char *)pParams->device_secret, src_len);
|
||||
connect_params.device_secret = (char *)mqtt_client->psk_decode;
|
||||
connect_params.device_secret_len = len;
|
||||
if (rc != QCLOUD_RET_SUCCESS) {
|
||||
Log_e("Device secret decode err, secret:%s", pParams->device_secret);
|
||||
qcloud_iot_mqtt_fini(mqtt_client);
|
||||
HAL_Free(mqtt_client);
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
rc = qcloud_iot_mqtt_connect(mqtt_client, &connect_params);
|
||||
if (rc != QCLOUD_RET_SUCCESS) {
|
||||
Log_e("mqtt connect with id: %s failed: %d", mqtt_client->options.conn_id, rc);
|
||||
qcloud_iot_mqtt_fini(mqtt_client);
|
||||
HAL_Free(mqtt_client);
|
||||
return NULL;
|
||||
} else {
|
||||
Log_i("mqtt connect with id: %s success", mqtt_client->options.conn_id);
|
||||
}
|
||||
|
||||
#ifdef LOG_UPLOAD
|
||||
// log subscribe topics
|
||||
if (is_log_uploader_init()) {
|
||||
int log_level;
|
||||
rc = qcloud_get_log_level(mqtt_client, &log_level);
|
||||
// rc = qcloud_log_topic_subscribe(mqtt_client);
|
||||
if (rc < 0) {
|
||||
Log_e("client get log topic failed: %d", rc);
|
||||
}
|
||||
set_log_mqtt_client((void *)mqtt_client);
|
||||
|
||||
IOT_Log_Upload(true);
|
||||
}
|
||||
#endif
|
||||
return mqtt_client;
|
||||
}
|
||||
|
||||
int IOT_MQTT_Destroy(void **pClient)
|
||||
{
|
||||
POINTER_SANITY_CHECK(*pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)(*pClient);
|
||||
|
||||
int rc = qcloud_iot_mqtt_disconnect(mqtt_client);
|
||||
// disconnect network stack by force
|
||||
if (rc != QCLOUD_RET_SUCCESS) {
|
||||
mqtt_client->network_stack.disconnect(&(mqtt_client->network_stack));
|
||||
set_client_conn_state(mqtt_client, NOTCONNECTED);
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
|
||||
/* notify this event to topic subscriber */
|
||||
if (NULL != mqtt_client->sub_handles[i].topic_filter && NULL != mqtt_client->sub_handles[i].sub_event_handler)
|
||||
mqtt_client->sub_handles[i].sub_event_handler(mqtt_client, MQTT_EVENT_CLIENT_DESTROY,
|
||||
mqtt_client->sub_handles[i].handler_user_data);
|
||||
|
||||
if (NULL != mqtt_client->sub_handles[i].topic_filter) {
|
||||
HAL_Free((void *)mqtt_client->sub_handles[i].topic_filter);
|
||||
mqtt_client->sub_handles[i].topic_filter = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef MQTT_RMDUP_MSG_ENABLED
|
||||
reset_repeat_packet_id_buffer(mqtt_client);
|
||||
#endif
|
||||
|
||||
HAL_MutexDestroy(mqtt_client->lock_generic);
|
||||
HAL_MutexDestroy(mqtt_client->lock_write_buf);
|
||||
|
||||
HAL_MutexDestroy(mqtt_client->lock_list_sub);
|
||||
HAL_MutexDestroy(mqtt_client->lock_list_pub);
|
||||
|
||||
list_destroy(mqtt_client->list_pub_wait_ack);
|
||||
list_destroy(mqtt_client->list_sub_wait_ack);
|
||||
|
||||
HAL_Free(*pClient);
|
||||
*pClient = NULL;
|
||||
#ifdef LOG_UPLOAD
|
||||
set_log_mqtt_client(NULL);
|
||||
#endif
|
||||
Log_i("mqtt release!");
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
int IOT_MQTT_Yield(void *pClient, uint32_t timeout_ms)
|
||||
{
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
|
||||
|
||||
#ifdef MULTITHREAD_ENABLED
|
||||
/* only one instance of yield is allowed in running state*/
|
||||
if (mqtt_client->thread_running) {
|
||||
HAL_SleepMs(timeout_ms);
|
||||
return QCLOUD_RET_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
int rc = qcloud_iot_mqtt_yield(mqtt_client, timeout_ms);
|
||||
|
||||
#ifdef LOG_UPLOAD
|
||||
/* do instant log uploading if MQTT communication error */
|
||||
if (rc == QCLOUD_RET_SUCCESS)
|
||||
IOT_Log_Upload(false);
|
||||
else
|
||||
IOT_Log_Upload(true);
|
||||
#endif
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
int IOT_MQTT_Publish(void *pClient, char *topicName, PublishParams *pParams)
|
||||
{
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
|
||||
|
||||
return qcloud_iot_mqtt_publish(mqtt_client, topicName, pParams);
|
||||
}
|
||||
|
||||
int IOT_MQTT_Subscribe(void *pClient, char *topicFilter, SubscribeParams *pParams)
|
||||
{
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
|
||||
|
||||
return qcloud_iot_mqtt_subscribe(mqtt_client, topicFilter, pParams);
|
||||
}
|
||||
|
||||
int IOT_MQTT_Unsubscribe(void *pClient, char *topicFilter)
|
||||
{
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
|
||||
|
||||
return qcloud_iot_mqtt_unsubscribe(mqtt_client, topicFilter);
|
||||
}
|
||||
|
||||
bool IOT_MQTT_IsSubReady(void *pClient, char *topicFilter)
|
||||
{
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
|
||||
|
||||
return qcloud_iot_mqtt_is_sub_ready(mqtt_client, topicFilter);
|
||||
}
|
||||
|
||||
bool IOT_MQTT_IsConnected(void *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
|
||||
|
||||
IOT_FUNC_EXIT_RC(get_client_conn_state(mqtt_client) == 1)
|
||||
}
|
||||
|
||||
#ifdef MULTITHREAD_ENABLED
|
||||
|
||||
static void _mqtt_yield_thread(void *ptr)
|
||||
{
|
||||
int rc = QCLOUD_RET_SUCCESS;
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)ptr;
|
||||
|
||||
Log_i("MQTT client %s start loop", mqtt_client->device_info.client_id);
|
||||
while (mqtt_client->thread_running) {
|
||||
int rc = qcloud_iot_mqtt_yield(mqtt_client, 500);
|
||||
|
||||
#ifdef LOG_UPLOAD
|
||||
/* do instant log uploading if MQTT communication error */
|
||||
if (rc == QCLOUD_RET_SUCCESS)
|
||||
IOT_Log_Upload(false);
|
||||
else
|
||||
IOT_Log_Upload(true);
|
||||
#endif
|
||||
|
||||
if (rc == QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT) {
|
||||
HAL_SleepMs(500);
|
||||
continue;
|
||||
} else if (rc == QCLOUD_RET_MQTT_MANUALLY_DISCONNECTED || rc == QCLOUD_ERR_MQTT_RECONNECT_TIMEOUT) {
|
||||
Log_e("MQTT Yield thread exit with error: %d", rc);
|
||||
break;
|
||||
} else if (rc != QCLOUD_RET_SUCCESS && rc != QCLOUD_RET_MQTT_RECONNECTED) {
|
||||
Log_e("MQTT Yield thread error: %d", rc);
|
||||
}
|
||||
|
||||
HAL_SleepMs(200);
|
||||
}
|
||||
|
||||
mqtt_client->thread_running = false;
|
||||
mqtt_client->thread_exit_code = rc;
|
||||
|
||||
#ifdef LOG_UPLOAD
|
||||
IOT_Log_Upload(true);
|
||||
#endif
|
||||
|
||||
Log_i("MQTT client %s stop loop", mqtt_client->device_info.client_id);
|
||||
}
|
||||
|
||||
int IOT_MQTT_StartLoop(void *pClient)
|
||||
{
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
|
||||
ThreadParams thread_params = {0};
|
||||
thread_params.thread_func = _mqtt_yield_thread;
|
||||
thread_params.thread_name = "MQTT_yield_thread";
|
||||
thread_params.user_arg = pClient;
|
||||
thread_params.stack_size = 4096;
|
||||
thread_params.priority = 1;
|
||||
mqtt_client->thread_running = true;
|
||||
|
||||
int rc = HAL_ThreadCreate(&thread_params);
|
||||
if (rc) {
|
||||
Log_e("create mqtt yield thread fail: %d", rc);
|
||||
return QCLOUD_ERR_FAILURE;
|
||||
}
|
||||
|
||||
HAL_SleepMs(500);
|
||||
return QCLOUD_RET_SUCCESS;
|
||||
}
|
||||
|
||||
void IOT_MQTT_StopLoop(void *pClient)
|
||||
{
|
||||
POINTER_SANITY_CHECK_RTN(pClient);
|
||||
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
|
||||
mqtt_client->thread_running = false;
|
||||
HAL_SleepMs(1000);
|
||||
return;
|
||||
}
|
||||
|
||||
bool IOT_MQTT_GetLoopStatus(void *pClient, int *exit_code)
|
||||
{
|
||||
POINTER_SANITY_CHECK(pClient, false);
|
||||
Qcloud_IoT_Client *mqtt_client = (Qcloud_IoT_Client *)pClient;
|
||||
*exit_code = mqtt_client->thread_exit_code;
|
||||
return mqtt_client->thread_running;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
static inline void _strlowr(char *str)
|
||||
{
|
||||
while (*str != '\0') {
|
||||
*str = tolower(*str);
|
||||
str++;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
int qcloud_iot_mqtt_init(Qcloud_IoT_Client *pClient, MQTTInitParams *pParams)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(pParams, QCLOUD_ERR_INVAL);
|
||||
|
||||
memset(pClient, 0x0, sizeof(Qcloud_IoT_Client));
|
||||
|
||||
int rc = iot_device_info_set(&(pClient->device_info), pParams->product_id, pParams->device_name);
|
||||
if (rc != QCLOUD_RET_SUCCESS) {
|
||||
Log_e("failed to set device info: %d", rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
int size =
|
||||
HAL_Snprintf(pClient->host_addr, HOST_STR_LENGTH, "%s.%s", pParams->product_id, QCLOUD_IOT_MQTT_DIRECT_DOMAIN);
|
||||
if (size < 0 || size > HOST_STR_LENGTH - 1) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
// enable below code for some special platform
|
||||
//_strlowr(s_qcloud_iot_host);
|
||||
|
||||
int i = 0;
|
||||
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
|
||||
pClient->sub_handles[i].topic_filter = NULL;
|
||||
pClient->sub_handles[i].message_handler = NULL;
|
||||
pClient->sub_handles[i].sub_event_handler = NULL;
|
||||
pClient->sub_handles[i].qos = QOS0;
|
||||
pClient->sub_handles[i].handler_user_data = NULL;
|
||||
}
|
||||
|
||||
if (pParams->command_timeout < MIN_COMMAND_TIMEOUT)
|
||||
pParams->command_timeout = MIN_COMMAND_TIMEOUT;
|
||||
if (pParams->command_timeout > MAX_COMMAND_TIMEOUT)
|
||||
pParams->command_timeout = MAX_COMMAND_TIMEOUT;
|
||||
pClient->command_timeout_ms = pParams->command_timeout;
|
||||
|
||||
// packet id, random from [1 - 65536]
|
||||
pClient->next_packet_id = _get_random_start_packet_id();
|
||||
pClient->write_buf_size = QCLOUD_IOT_MQTT_TX_BUF_LEN;
|
||||
pClient->read_buf_size = QCLOUD_IOT_MQTT_RX_BUF_LEN;
|
||||
pClient->is_ping_outstanding = 0;
|
||||
pClient->was_manually_disconnected = 0;
|
||||
pClient->counter_network_disconnected = 0;
|
||||
|
||||
pClient->event_handle = pParams->event_handle;
|
||||
|
||||
pClient->lock_generic = HAL_MutexCreate();
|
||||
if (NULL == pClient->lock_generic) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
set_client_conn_state(pClient, NOTCONNECTED);
|
||||
|
||||
if ((pClient->lock_write_buf = HAL_MutexCreate()) == NULL) {
|
||||
Log_e("create write buf lock failed.");
|
||||
goto error;
|
||||
}
|
||||
if ((pClient->lock_list_sub = HAL_MutexCreate()) == NULL) {
|
||||
Log_e("create sub list lock failed.");
|
||||
goto error;
|
||||
}
|
||||
if ((pClient->lock_list_pub = HAL_MutexCreate()) == NULL) {
|
||||
Log_e("create pub list lock failed.");
|
||||
goto error;
|
||||
}
|
||||
|
||||
if ((pClient->list_pub_wait_ack = list_new()) == NULL) {
|
||||
Log_e("create pub wait list failed.");
|
||||
goto error;
|
||||
}
|
||||
pClient->list_pub_wait_ack->free = HAL_Free;
|
||||
|
||||
if ((pClient->list_sub_wait_ack = list_new()) == NULL) {
|
||||
Log_e("create sub wait list failed.");
|
||||
goto error;
|
||||
}
|
||||
pClient->list_sub_wait_ack->free = HAL_Free;
|
||||
|
||||
#ifndef AUTH_WITH_NOTLS
|
||||
// device param for TLS connection
|
||||
#ifdef AUTH_MODE_CERT
|
||||
Log_d("cert file: %s", pParams->cert_file);
|
||||
Log_d("key file: %s", pParams->key_file);
|
||||
|
||||
strncpy(pClient->cert_file_path, pParams->cert_file, FILE_PATH_MAX_LEN - 1);
|
||||
strncpy(pClient->key_file_path, pParams->key_file, FILE_PATH_MAX_LEN - 1);
|
||||
|
||||
pClient->network_stack.ssl_connect_params.cert_file = pClient->cert_file_path;
|
||||
pClient->network_stack.ssl_connect_params.key_file = pClient->key_file_path;
|
||||
pClient->network_stack.ssl_connect_params.ca_crt = iot_ca_get();
|
||||
pClient->network_stack.ssl_connect_params.ca_crt_len = strlen(pClient->network_stack.ssl_connect_params.ca_crt);
|
||||
#else
|
||||
if (pParams->device_secret != NULL) {
|
||||
size_t src_len = strlen(pParams->device_secret);
|
||||
size_t len;
|
||||
memset(pClient->psk_decode, 0x00, DECODE_PSK_LENGTH);
|
||||
qcloud_iot_utils_base64decode(pClient->psk_decode, DECODE_PSK_LENGTH, &len,
|
||||
(unsigned char *)pParams->device_secret, src_len);
|
||||
pClient->network_stack.ssl_connect_params.psk = (char *)pClient->psk_decode;
|
||||
pClient->network_stack.ssl_connect_params.psk_length = len;
|
||||
} else {
|
||||
Log_e("psk is empty!");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_INVAL);
|
||||
}
|
||||
if (strnlen(pClient->device_info.client_id, MAX_SIZE_OF_CLIENT_ID) == 0) {
|
||||
Log_e("psk id is empty!");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_INVAL);
|
||||
}
|
||||
pClient->network_stack.ssl_connect_params.psk_id = pClient->device_info.client_id;
|
||||
pClient->network_stack.ssl_connect_params.ca_crt = NULL;
|
||||
pClient->network_stack.ssl_connect_params.ca_crt_len = 0;
|
||||
#endif
|
||||
|
||||
pClient->network_stack.host = pClient->host_addr;
|
||||
pClient->network_stack.port = MQTT_SERVER_PORT_TLS;
|
||||
pClient->network_stack.ssl_connect_params.timeout_ms =
|
||||
pClient->command_timeout_ms > QCLOUD_IOT_TLS_HANDSHAKE_TIMEOUT ? pClient->command_timeout_ms
|
||||
: QCLOUD_IOT_TLS_HANDSHAKE_TIMEOUT;
|
||||
|
||||
#else
|
||||
pClient->network_stack.host = pClient->host_addr;
|
||||
pClient->network_stack.port = MQTT_SERVER_PORT_NOTLS;
|
||||
#endif
|
||||
|
||||
// init network stack
|
||||
qcloud_iot_mqtt_network_init(&(pClient->network_stack));
|
||||
|
||||
// ping timer and reconnect delay timer
|
||||
InitTimer(&(pClient->ping_timer));
|
||||
InitTimer(&(pClient->reconnect_delay_timer));
|
||||
|
||||
#ifdef SYSTEM_COMM
|
||||
pClient->sys_state.result_recv_ok = false;
|
||||
pClient->sys_state.topic_sub_ok = false;
|
||||
pClient->sys_state.time = 0;
|
||||
#endif
|
||||
|
||||
#ifdef MULTITHREAD_ENABLED
|
||||
pClient->thread_running = false;
|
||||
#endif
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
|
||||
error:
|
||||
if (pClient->list_pub_wait_ack) {
|
||||
pClient->list_pub_wait_ack->free(pClient->list_pub_wait_ack);
|
||||
pClient->list_pub_wait_ack = NULL;
|
||||
}
|
||||
if (pClient->list_sub_wait_ack) {
|
||||
pClient->list_sub_wait_ack->free(pClient->list_sub_wait_ack);
|
||||
pClient->list_sub_wait_ack = NULL;
|
||||
}
|
||||
if (pClient->lock_generic) {
|
||||
HAL_MutexDestroy(pClient->lock_generic);
|
||||
pClient->lock_generic = NULL;
|
||||
}
|
||||
if (pClient->lock_list_sub) {
|
||||
HAL_MutexDestroy(pClient->lock_list_sub);
|
||||
pClient->lock_list_sub = NULL;
|
||||
}
|
||||
if (pClient->lock_list_pub) {
|
||||
HAL_MutexDestroy(pClient->lock_list_pub);
|
||||
pClient->lock_list_pub = NULL;
|
||||
}
|
||||
if (pClient->lock_write_buf) {
|
||||
HAL_MutexDestroy(pClient->lock_write_buf);
|
||||
pClient->lock_write_buf = NULL;
|
||||
}
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE)
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_fini(Qcloud_IoT_Client *mqtt_client)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(mqtt_client, QCLOUD_ERR_INVAL);
|
||||
|
||||
HAL_MutexDestroy(mqtt_client->lock_generic);
|
||||
HAL_MutexDestroy(mqtt_client->lock_write_buf);
|
||||
|
||||
HAL_MutexDestroy(mqtt_client->lock_list_sub);
|
||||
HAL_MutexDestroy(mqtt_client->lock_list_pub);
|
||||
|
||||
list_destroy(mqtt_client->list_pub_wait_ack);
|
||||
list_destroy(mqtt_client->list_sub_wait_ack);
|
||||
|
||||
Log_i("release mqtt client resources");
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_set_autoreconnect(Qcloud_IoT_Client *pClient, bool value)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
pClient->options.auto_connect_enable = (uint8_t)value;
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
bool qcloud_iot_mqtt_is_autoreconnect_enabled(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
bool is_enabled = false;
|
||||
if (pClient->options.auto_connect_enable == 1) {
|
||||
is_enabled = true;
|
||||
}
|
||||
|
||||
IOT_FUNC_EXIT_RC(is_enabled);
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_get_network_disconnected_count(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
IOT_FUNC_EXIT_RC(pClient->counter_network_disconnected);
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_reset_network_disconnected_count(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
pClient->counter_network_disconnected = 0;
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
1405
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_common.c
vendored
Normal file
1405
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_common.c
vendored
Normal file
File diff suppressed because it is too large
Load Diff
477
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_connect.c
vendored
Normal file
477
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_connect.c
vendored
Normal file
@@ -0,0 +1,477 @@
|
||||
/*******************************************************************************
|
||||
* Copyright (c) 2014 IBM Corp.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
|
||||
*******************************************************************************/
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <limits.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "mqtt_client.h"
|
||||
#include "qcloud_iot_common.h"
|
||||
#include "utils_hmac.h"
|
||||
|
||||
#define MQTT_CONNECT_FLAG_USERNAME 0x80
|
||||
#define MQTT_CONNECT_FLAG_PASSWORD 0x40
|
||||
#define MQTT_CONNECT_FLAG_WILL_RETAIN 0x20
|
||||
#define MQTT_CONNECT_FLAG_WILL_QOS2 0x18
|
||||
#define MQTT_CONNECT_FLAG_WILL_QOS1 0x08
|
||||
#define MQTT_CONNECT_FLAG_WILL_QOS0 0x00
|
||||
#define MQTT_CONNECT_FLAG_WILL_FLAG 0x04
|
||||
#define MQTT_CONNECT_FLAG_CLEAN_SES 0x02
|
||||
|
||||
#define MQTT_CONNACK_FLAG_SES_PRE 0x01
|
||||
|
||||
/**
|
||||
* Connect return code
|
||||
*/
|
||||
typedef enum {
|
||||
CONNACK_CONNECTION_ACCEPTED = 0, // connection accepted
|
||||
CONANCK_UNACCEPTABLE_PROTOCOL_VERSION_ERROR = 1, // connection refused: unaccpeted protocol verison
|
||||
CONNACK_IDENTIFIER_REJECTED_ERROR = 2, // connection refused: identifier rejected
|
||||
CONNACK_SERVER_UNAVAILABLE_ERROR = 3, // connection refused: server unavailable
|
||||
CONNACK_BAD_USERDATA_ERROR = 4, // connection refused: bad user name or password
|
||||
CONNACK_NOT_AUTHORIZED_ERROR = 5 // connection refused: not authorized
|
||||
} MQTTConnackReturnCodes;
|
||||
|
||||
/**
|
||||
* Determines the length of the MQTT connect packet that would be produced using the supplied connect options.
|
||||
* @param options the options to be used to build the connect packet
|
||||
* @param the length of buffer needed to contain the serialized version of the packet
|
||||
* @return int indicating function execution status
|
||||
*/
|
||||
static uint32_t _get_packet_connect_rem_len(MQTTConnectParams *options)
|
||||
{
|
||||
size_t len = 0;
|
||||
/* variable depending on MQTT or MQIsdp */
|
||||
if (3 == options->mqtt_version) {
|
||||
len = 12;
|
||||
} else if (4 == options->mqtt_version) {
|
||||
len = 10;
|
||||
}
|
||||
|
||||
len += strlen(options->client_id) + 2;
|
||||
|
||||
if (options->username) {
|
||||
len += strlen(options->username) + 2;
|
||||
}
|
||||
|
||||
if (options->password) {
|
||||
len += strlen(options->password) + 2;
|
||||
}
|
||||
|
||||
return (uint32_t)len;
|
||||
}
|
||||
|
||||
static void _copy_connect_params(MQTTConnectParams *destination, MQTTConnectParams *source)
|
||||
{
|
||||
POINTER_SANITY_CHECK_RTN(destination);
|
||||
POINTER_SANITY_CHECK_RTN(source);
|
||||
|
||||
/* In case of reconnecting, source == destination */
|
||||
if (source == destination) {
|
||||
return;
|
||||
}
|
||||
|
||||
destination->mqtt_version = source->mqtt_version;
|
||||
destination->client_id = source->client_id;
|
||||
destination->username = source->username;
|
||||
destination->keep_alive_interval = source->keep_alive_interval;
|
||||
destination->clean_session = source->clean_session;
|
||||
destination->auto_connect_enable = source->auto_connect_enable;
|
||||
#ifdef AUTH_WITH_NOTLS
|
||||
destination->device_secret = source->device_secret;
|
||||
destination->device_secret_len = source->device_secret_len;
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes the connect options into the buffer.
|
||||
* @param buf the buffer into which the packet will be serialized
|
||||
* @param len the length in bytes of the supplied buffer
|
||||
* @param options the options to be used to build the connect packet
|
||||
* @param serialized length
|
||||
* @return int indicating function execution status
|
||||
*/
|
||||
static int _serialize_connect_packet(unsigned char *buf, size_t buf_len, MQTTConnectParams *options,
|
||||
uint32_t *serialized_len)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(options, QCLOUD_ERR_INVAL);
|
||||
STRING_PTR_SANITY_CHECK(options->client_id, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
|
||||
|
||||
unsigned char *ptr = buf;
|
||||
unsigned char header = 0;
|
||||
unsigned char flags = 0;
|
||||
uint32_t rem_len = 0;
|
||||
int rc;
|
||||
|
||||
long cur_timesec = HAL_Timer_current_sec() + MAX_ACCESS_EXPIRE_TIMEOUT / 1000;
|
||||
if (cur_timesec <= 0 || MAX_ACCESS_EXPIRE_TIMEOUT <= 0) {
|
||||
cur_timesec = LONG_MAX;
|
||||
}
|
||||
long cur_timesec_bak = cur_timesec;
|
||||
int cur_timesec_len = 0;
|
||||
while (cur_timesec_bak != 0) {
|
||||
cur_timesec_bak /= 10;
|
||||
++cur_timesec_len;
|
||||
}
|
||||
|
||||
int username_len =
|
||||
strlen(options->client_id) + strlen(QCLOUD_IOT_DEVICE_SDK_APPID) + MAX_CONN_ID_LEN + cur_timesec_len + 4;
|
||||
options->username = (char *)HAL_Malloc(username_len);
|
||||
if (options->username == NULL) {
|
||||
Log_e("malloc username failed!");
|
||||
rc = QCLOUD_ERR_MALLOC;
|
||||
goto err_exit;
|
||||
}
|
||||
|
||||
get_next_conn_id(options->conn_id);
|
||||
HAL_Snprintf(options->username, username_len, "%s;%s;%s;%ld", options->client_id, QCLOUD_IOT_DEVICE_SDK_APPID,
|
||||
options->conn_id, cur_timesec);
|
||||
|
||||
#if defined(AUTH_WITH_NOTLS) && defined(AUTH_MODE_KEY)
|
||||
if (options->device_secret != NULL && options->username != NULL) {
|
||||
char sign[41] = {0};
|
||||
utils_hmac_sha1(options->username, strlen(options->username), sign, options->device_secret,
|
||||
options->device_secret_len);
|
||||
options->password = (char *)HAL_Malloc(51);
|
||||
if (options->password == NULL) {
|
||||
Log_e("malloc password failed!");
|
||||
rc = QCLOUD_ERR_MALLOC;
|
||||
goto err_exit;
|
||||
}
|
||||
HAL_Snprintf(options->password, 51, "%s;hmacsha1", sign);
|
||||
}
|
||||
#endif
|
||||
|
||||
rem_len = _get_packet_connect_rem_len(options);
|
||||
if (get_mqtt_packet_len(rem_len) > buf_len) {
|
||||
Log_e("get_mqtt_packet_len failed!");
|
||||
rc = QCLOUD_ERR_BUF_TOO_SHORT;
|
||||
goto err_exit;
|
||||
}
|
||||
|
||||
rc = mqtt_init_packet_header(&header, CONNECT, QOS0, 0, 0);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
Log_e("mqtt_init_packet_header failed!");
|
||||
goto err_exit;
|
||||
}
|
||||
|
||||
// 1st byte in fixed header
|
||||
mqtt_write_char(&ptr, header);
|
||||
|
||||
// remaining length
|
||||
ptr += mqtt_write_packet_rem_len(ptr, rem_len);
|
||||
|
||||
// MQTT protocol name and version in variable header
|
||||
if (4 == options->mqtt_version) {
|
||||
mqtt_write_utf8_string(&ptr, "MQTT");
|
||||
mqtt_write_char(&ptr, (unsigned char)4);
|
||||
} else {
|
||||
mqtt_write_utf8_string(&ptr, "MQIsdp");
|
||||
mqtt_write_char(&ptr, (unsigned char)3);
|
||||
}
|
||||
|
||||
// flags in variable header
|
||||
flags |= (options->clean_session) ? MQTT_CONNECT_FLAG_CLEAN_SES : 0;
|
||||
flags |= (options->username != NULL) ? MQTT_CONNECT_FLAG_USERNAME : 0;
|
||||
|
||||
#if defined(AUTH_WITH_NOTLS) && defined(AUTH_MODE_KEY)
|
||||
flags |= MQTT_CONNECT_FLAG_PASSWORD;
|
||||
#endif
|
||||
|
||||
mqtt_write_char(&ptr, flags);
|
||||
|
||||
// keep alive interval (unit:ms) in variable header
|
||||
mqtt_write_uint_16(&ptr, options->keep_alive_interval);
|
||||
|
||||
// client id
|
||||
mqtt_write_utf8_string(&ptr, options->client_id);
|
||||
|
||||
if ((flags & MQTT_CONNECT_FLAG_USERNAME) && options->username != NULL) {
|
||||
mqtt_write_utf8_string(&ptr, options->username);
|
||||
HAL_Free(options->username);
|
||||
options->username = NULL;
|
||||
}
|
||||
|
||||
if ((flags & MQTT_CONNECT_FLAG_PASSWORD) && options->password != NULL) {
|
||||
mqtt_write_utf8_string(&ptr, options->password);
|
||||
HAL_Free(options->password);
|
||||
options->password = NULL;
|
||||
}
|
||||
|
||||
*serialized_len = (uint32_t)(ptr - buf);
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
|
||||
err_exit:
|
||||
HAL_Free(options->username);
|
||||
options->username = NULL;
|
||||
|
||||
HAL_Free(options->password);
|
||||
options->password = NULL;
|
||||
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserializes the supplied (wire) buffer into connack data - return code
|
||||
* @param sessionPresent the session present flag returned (only for MQTT 3.1.1)
|
||||
* @param connack_rc returned integer value of the connack return code
|
||||
* @param buf the raw buffer data, of the correct length determined by the remaining length field
|
||||
* @param buflen the length in bytes of the data in the supplied buffer
|
||||
* @return int indicating function execution status
|
||||
*/
|
||||
static int _deserialize_connack_packet(uint8_t *sessionPresent, int *connack_rc, unsigned char *buf, size_t buflen)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(sessionPresent, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(connack_rc, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
|
||||
|
||||
unsigned char header, type = 0;
|
||||
unsigned char *curdata = buf;
|
||||
unsigned char *enddata = NULL;
|
||||
int rc;
|
||||
uint32_t decodedLen = 0, readBytesLen = 0;
|
||||
unsigned char flags = 0;
|
||||
unsigned char connack_rc_char;
|
||||
|
||||
// CONNACK: 2 bytes in fixed header and 2 bytes in variable header, no payload
|
||||
if (4 > buflen) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
|
||||
}
|
||||
|
||||
header = mqtt_read_char(&curdata);
|
||||
type = (header & MQTT_HEADER_TYPE_MASK) >> MQTT_HEADER_TYPE_SHIFT;
|
||||
if (CONNACK != type) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
rc = mqtt_read_packet_rem_len_form_buf(curdata, &decodedLen, &readBytesLen);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
curdata += (readBytesLen);
|
||||
enddata = curdata + decodedLen;
|
||||
if (enddata - curdata != 2) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
// variable header - connack flag, refer to MQTT spec 3.2.2.1
|
||||
flags = mqtt_read_char(&curdata);
|
||||
*sessionPresent = flags & MQTT_CONNACK_FLAG_SES_PRE;
|
||||
|
||||
// variable header - return code, refer to MQTT spec 3.2.2.3
|
||||
connack_rc_char = mqtt_read_char(&curdata);
|
||||
switch (connack_rc_char) {
|
||||
case CONNACK_CONNECTION_ACCEPTED:
|
||||
*connack_rc = QCLOUD_RET_MQTT_CONNACK_CONNECTION_ACCEPTED;
|
||||
break;
|
||||
case CONANCK_UNACCEPTABLE_PROTOCOL_VERSION_ERROR:
|
||||
*connack_rc = QCLOUD_ERR_MQTT_CONNACK_UNACCEPTABLE_PROTOCOL_VERSION;
|
||||
break;
|
||||
case CONNACK_IDENTIFIER_REJECTED_ERROR:
|
||||
*connack_rc = QCLOUD_ERR_MQTT_CONNACK_IDENTIFIER_REJECTED;
|
||||
break;
|
||||
case CONNACK_SERVER_UNAVAILABLE_ERROR:
|
||||
*connack_rc = QCLOUD_ERR_MQTT_CONNACK_SERVER_UNAVAILABLE;
|
||||
break;
|
||||
case CONNACK_BAD_USERDATA_ERROR:
|
||||
*connack_rc = QCLOUD_ERR_MQTT_CONNACK_BAD_USERDATA;
|
||||
break;
|
||||
case CONNACK_NOT_AUTHORIZED_ERROR:
|
||||
*connack_rc = QCLOUD_ERR_MQTT_CONNACK_NOT_AUTHORIZED;
|
||||
break;
|
||||
default:
|
||||
*connack_rc = QCLOUD_ERR_MQTT_CONNACK_UNKNOWN;
|
||||
break;
|
||||
}
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Setup connection with MQTT server
|
||||
*
|
||||
* @param pClient
|
||||
* @param options
|
||||
* @return
|
||||
*/
|
||||
static int _mqtt_connect(Qcloud_IoT_Client *pClient, MQTTConnectParams *options)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
Timer connect_timer;
|
||||
int connack_rc = QCLOUD_ERR_FAILURE, rc = QCLOUD_ERR_FAILURE;
|
||||
uint8_t sessionPresent = 0;
|
||||
uint32_t len = 0;
|
||||
|
||||
InitTimer(&connect_timer);
|
||||
countdown_ms(&connect_timer, pClient->command_timeout_ms);
|
||||
|
||||
if (NULL != options) {
|
||||
_copy_connect_params(&(pClient->options), options);
|
||||
}
|
||||
|
||||
// TCP or TLS network connect
|
||||
rc = pClient->network_stack.connect(&(pClient->network_stack));
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
HAL_MutexLock(pClient->lock_write_buf);
|
||||
// serialize CONNECT packet
|
||||
rc = _serialize_connect_packet(pClient->write_buf, pClient->write_buf_size, &(pClient->options), &len);
|
||||
if (QCLOUD_RET_SUCCESS != rc || 0 == len) {
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
// send CONNECT packet
|
||||
rc = send_mqtt_packet(pClient, len, &connect_timer);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
|
||||
// wait for CONNACK
|
||||
rc = wait_for_read(pClient, CONNACK, &connect_timer, QOS0);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
// deserialize CONNACK and check reture code
|
||||
rc = _deserialize_connack_packet(&sessionPresent, &connack_rc, pClient->read_buf, pClient->read_buf_size);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
if (QCLOUD_RET_MQTT_CONNACK_CONNECTION_ACCEPTED != connack_rc) {
|
||||
IOT_FUNC_EXIT_RC(connack_rc);
|
||||
}
|
||||
|
||||
set_client_conn_state(pClient, CONNECTED);
|
||||
HAL_MutexLock(pClient->lock_generic);
|
||||
pClient->was_manually_disconnected = 0;
|
||||
pClient->is_ping_outstanding = 0;
|
||||
countdown(&pClient->ping_timer, pClient->options.keep_alive_interval);
|
||||
HAL_MutexUnlock(pClient->lock_generic);
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_connect(Qcloud_IoT_Client *pClient, MQTTConnectParams *pParams)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
int rc;
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(pParams, QCLOUD_ERR_INVAL);
|
||||
|
||||
// check connection state first
|
||||
if (get_client_conn_state(pClient)) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_MQTT_ALREADY_CONNECTED);
|
||||
}
|
||||
|
||||
rc = _mqtt_connect(pClient, pParams);
|
||||
|
||||
// disconnect network if connect fail
|
||||
if (rc != QCLOUD_RET_SUCCESS) {
|
||||
pClient->network_stack.disconnect(&(pClient->network_stack));
|
||||
}
|
||||
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_attempt_reconnect(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
int rc;
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
Log_i("attempt to reconnect...");
|
||||
|
||||
if (get_client_conn_state(pClient)) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_MQTT_ALREADY_CONNECTED);
|
||||
}
|
||||
|
||||
rc = qcloud_iot_mqtt_connect(pClient, &pClient->options);
|
||||
|
||||
if (!get_client_conn_state(pClient)) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
rc = qcloud_iot_mqtt_resubscribe(pClient);
|
||||
if (rc != QCLOUD_RET_SUCCESS) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_MQTT_RECONNECTED);
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_disconnect(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
int rc;
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
Timer timer;
|
||||
uint32_t serialized_len = 0;
|
||||
|
||||
if (get_client_conn_state(pClient) == 0) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
|
||||
}
|
||||
|
||||
HAL_MutexLock(pClient->lock_write_buf);
|
||||
rc = serialize_packet_with_zero_payload(pClient->write_buf, pClient->write_buf_size, DISCONNECT, &serialized_len);
|
||||
if (rc != QCLOUD_RET_SUCCESS) {
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
InitTimer(&timer);
|
||||
countdown_ms(&timer, pClient->command_timeout_ms);
|
||||
|
||||
if (serialized_len > 0) {
|
||||
rc = send_mqtt_packet(pClient, serialized_len, &timer);
|
||||
if (rc != QCLOUD_RET_SUCCESS) {
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
}
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
|
||||
pClient->network_stack.disconnect(&(pClient->network_stack));
|
||||
set_client_conn_state(pClient, NOTCONNECTED);
|
||||
pClient->was_manually_disconnected = 1;
|
||||
|
||||
Log_i("mqtt disconnect!");
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
60
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_net.c
vendored
Normal file
60
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_net.c
vendored
Normal file
@@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Tencent is pleased to support the open source community by making IoT Hub available.
|
||||
* Copyright (C) 2018-2020 THL A29 Limited, a Tencent company. All rights reserved.
|
||||
|
||||
* Licensed under the MIT License (the "License"); you may not use this file except in
|
||||
* compliance with the License. You may obtain a copy of the License at
|
||||
* http://opensource.org/licenses/MIT
|
||||
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is
|
||||
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
||||
* either express or implied. See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "mqtt_client_net.h"
|
||||
|
||||
// TODO: how to implement
|
||||
/**
|
||||
* @brief Check if TLS connection is valid
|
||||
*
|
||||
* @param pNetwork
|
||||
* @return
|
||||
*/
|
||||
int qcloud_iot_mqtt_tls_is_connected(Network *pNetwork)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Init network stack
|
||||
*
|
||||
* @param pNetwork
|
||||
* @param pConnectParams
|
||||
* @return
|
||||
*/
|
||||
int qcloud_iot_mqtt_network_init(Network *pNetwork)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* first choice: TLS */
|
||||
pNetwork->type = NETWORK_TLS;
|
||||
|
||||
#ifdef AUTH_WITH_NOTLS
|
||||
pNetwork->type = NETWORK_TCP;
|
||||
#endif
|
||||
|
||||
rc = network_init(pNetwork);
|
||||
pNetwork->is_connected = qcloud_iot_mqtt_tls_is_connected;
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
383
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_publish.c
vendored
Normal file
383
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_publish.c
vendored
Normal file
@@ -0,0 +1,383 @@
|
||||
/*******************************************************************************
|
||||
* Copyright (c) 2014 IBM Corp.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Ian Craggs - initial API and implementation and/or initial documentation
|
||||
* Ian Craggs - fix for https://bugs.eclipse.org/bugs/show_bug.cgi?id=453144
|
||||
*******************************************************************************/
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#include "mqtt_client.h"
|
||||
#include "utils_list.h"
|
||||
|
||||
/**
|
||||
* @param mqttstring the MQTTString structure into which the data is to be read
|
||||
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
|
||||
* @param enddata pointer to the end of the data: do not read beyond
|
||||
* @return SUCCESS if successful, FAILURE if not
|
||||
*/
|
||||
static int _read_string_with_len(char **string, uint16_t *stringLen, unsigned char **pptr, unsigned char *enddata)
|
||||
{
|
||||
int rc = QCLOUD_ERR_FAILURE;
|
||||
|
||||
/* the first two bytes are the length of the string */
|
||||
/* enough length to read the integer? */
|
||||
if (enddata - (*pptr) > 1) {
|
||||
*stringLen = mqtt_read_uint16_t(pptr); /* increments pptr to point past length */
|
||||
|
||||
if (*stringLen > QCLOUD_IOT_MQTT_RX_BUF_LEN) {
|
||||
Log_e("stringLen exceed QCLOUD_IOT_MQTT_RX_BUF_LEN");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
if (&(*pptr)[*stringLen] <= enddata) {
|
||||
*string = (char *)*pptr;
|
||||
*pptr += *stringLen;
|
||||
rc = QCLOUD_RET_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines the length of the MQTT publish packet that would be produced using the supplied parameters
|
||||
* @param qos the MQTT QoS of the publish (packetid is omitted for QoS 0)
|
||||
* @param topicName the topic name to be used in the publish
|
||||
* @param payload_len the length of the payload to be sent
|
||||
* @return the length of buffer needed to contain the serialized version of the packet
|
||||
*/
|
||||
static uint32_t _get_publish_packet_len(uint8_t qos, char *topicName, size_t payload_len)
|
||||
{
|
||||
size_t len = 0;
|
||||
|
||||
len += 2 + strlen(topicName) + payload_len;
|
||||
if (qos > 0) {
|
||||
len += 2; /* packetid */
|
||||
}
|
||||
return (uint32_t)len;
|
||||
}
|
||||
|
||||
static int _mask_push_pubInfo_to(Qcloud_IoT_Client *c, int len, unsigned short msgId, ListNode **node)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
if (!c || !node) {
|
||||
Log_e("invalid parameters!");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_PUSH_TO_LIST_FAILED);
|
||||
}
|
||||
|
||||
if ((len < 0) || (len > c->write_buf_size)) {
|
||||
Log_e("the param of len is error!");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
HAL_MutexLock(c->lock_list_pub);
|
||||
|
||||
if (c->list_pub_wait_ack->len >= MAX_REPUB_NUM) {
|
||||
HAL_MutexUnlock(c->lock_list_pub);
|
||||
Log_e("more than %u elements in republish list. List overflow!", c->list_pub_wait_ack->len);
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
QcloudIotPubInfo *repubInfo = (QcloudIotPubInfo *)HAL_Malloc(sizeof(QcloudIotPubInfo) + len);
|
||||
if (NULL == repubInfo) {
|
||||
HAL_MutexUnlock(c->lock_list_pub);
|
||||
Log_e("memory malloc failed!");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
repubInfo->node_state = MQTT_NODE_STATE_NORMANL;
|
||||
repubInfo->msg_id = msgId;
|
||||
repubInfo->len = len;
|
||||
InitTimer(&repubInfo->pub_start_time);
|
||||
countdown_ms(&repubInfo->pub_start_time, c->command_timeout_ms);
|
||||
|
||||
repubInfo->buf = (unsigned char *)repubInfo + sizeof(QcloudIotPubInfo);
|
||||
|
||||
memcpy(repubInfo->buf, c->write_buf, len);
|
||||
|
||||
*node = list_node_new(repubInfo);
|
||||
if (NULL == *node) {
|
||||
HAL_MutexUnlock(c->lock_list_pub);
|
||||
HAL_Free(repubInfo);
|
||||
Log_e("list_node_new failed!");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
list_rpush(c->list_pub_wait_ack, *node);
|
||||
|
||||
HAL_MutexUnlock(c->lock_list_pub);
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserializes the supplied (wire) buffer into publish data
|
||||
* @param dup returned integer - the MQTT dup flag
|
||||
* @param qos returned integer - the MQTT QoS value
|
||||
* @param retained returned integer - the MQTT retained flag
|
||||
* @param packet_id returned integer - the MQTT packet identifier
|
||||
* @param topicName returned MQTTString - the MQTT topic in the publish
|
||||
* @param payload returned byte buffer - the MQTT publish payload
|
||||
* @param payload_len returned integer - the length of the MQTT payload
|
||||
* @param buf the raw buffer data, of the correct length determined by the remaining length field
|
||||
* @param buf_len the length in bytes of the data in the supplied buffer
|
||||
* @return error code. 1 is success
|
||||
*/
|
||||
int deserialize_publish_packet(uint8_t *dup, QoS *qos, uint8_t *retained, uint16_t *packet_id, char **topicName,
|
||||
uint16_t *topicNameLen, unsigned char **payload, size_t *payload_len, unsigned char *buf,
|
||||
size_t buf_len)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(dup, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(qos, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(retained, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(packet_id, QCLOUD_ERR_INVAL);
|
||||
|
||||
unsigned char header, type = 0;
|
||||
unsigned char *curdata = buf;
|
||||
unsigned char *enddata = NULL;
|
||||
int rc;
|
||||
uint32_t decodedLen = 0;
|
||||
uint32_t readBytesLen = 0;
|
||||
|
||||
/* Publish header size is at least four bytes.
|
||||
* Fixed header is two bytes.
|
||||
* Variable header size depends on QoS And Topic Name.
|
||||
* QoS level 0 doesn't have a message identifier (0 - 2 bytes)
|
||||
* Topic Name length fields decide size of topic name field (at least 2 bytes)
|
||||
* MQTT v3.1.1 Specification 3.3.1 */
|
||||
if (4 > buf_len) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
|
||||
}
|
||||
|
||||
header = mqtt_read_char(&curdata);
|
||||
type = (header & MQTT_HEADER_TYPE_MASK) >> MQTT_HEADER_TYPE_SHIFT;
|
||||
*dup = (header & MQTT_HEADER_DUP_MASK) >> MQTT_HEADER_DUP_SHIFT;
|
||||
*qos = (QoS)((header & MQTT_HEADER_QOS_MASK) >> MQTT_HEADER_QOS_SHIFT);
|
||||
*retained = header & MQTT_HEADER_RETAIN_MASK;
|
||||
|
||||
if (PUBLISH != type) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
/* read remaining length */
|
||||
rc = mqtt_read_packet_rem_len_form_buf(curdata, &decodedLen, &readBytesLen);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
curdata += (readBytesLen);
|
||||
enddata = curdata + decodedLen;
|
||||
|
||||
/* do we have enough data to read the protocol version byte? */
|
||||
if (QCLOUD_RET_SUCCESS != _read_string_with_len(topicName, topicNameLen, &curdata, enddata) ||
|
||||
(0 > (enddata - curdata))) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
if (QOS0 != *qos) {
|
||||
*packet_id = mqtt_read_uint16_t(&curdata);
|
||||
}
|
||||
|
||||
*payload_len = (size_t)(enddata - curdata);
|
||||
*payload = curdata;
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes the ack packet into the supplied buffer.
|
||||
* @param buf the buffer into which the packet will be serialized
|
||||
* @param buf_len the length in bytes of the supplied buffer
|
||||
* @param packet_type the MQTT packet type: 1.PUBACK; 2.PUBREL; 3.PUBCOMP
|
||||
* @param dup the MQTT dup flag
|
||||
* @param packet_id the MQTT packet identifier
|
||||
* @return serialized length, or error if 0
|
||||
*/
|
||||
int serialize_pub_ack_packet(unsigned char *buf, size_t buf_len, MessageTypes packet_type, uint8_t dup,
|
||||
uint16_t packet_id, uint32_t *serialized_len)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
|
||||
|
||||
unsigned char header = 0;
|
||||
unsigned char *ptr = buf;
|
||||
QoS requestQoS = (PUBREL == packet_type) ? QOS1 : QOS0; // refer to MQTT spec 3.6.1
|
||||
int rc = mqtt_init_packet_header(&header, packet_type, requestQoS, dup, 0);
|
||||
|
||||
/* Minimum byte length required by ACK headers is
|
||||
* 2 for fixed and 2 for variable part */
|
||||
if (4 > buf_len) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
|
||||
}
|
||||
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
mqtt_write_char(&ptr, header); /* write header */
|
||||
|
||||
ptr += mqtt_write_packet_rem_len(ptr, 2); /* write remaining length */
|
||||
mqtt_write_uint_16(&ptr, packet_id);
|
||||
*serialized_len = (uint32_t)(ptr - buf);
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes the supplied publish data into the supplied buffer, ready for sending
|
||||
* @param buf the buffer into which the packet will be serialized
|
||||
* @param buf_len the length in bytes of the supplied buffer
|
||||
* @param dup integer - the MQTT dup flag
|
||||
* @param qos integer - the MQTT QoS value
|
||||
* @param retained integer - the MQTT retained flag
|
||||
* @param packet_id integer - the MQTT packet identifier
|
||||
* @param topicName MQTTString - the MQTT topic in the publish
|
||||
* @param payload byte buffer - the MQTT publish payload
|
||||
* @param payload_len integer - the length of the MQTT payload
|
||||
* @return the length of the serialized data. <= 0 indicates error
|
||||
*/
|
||||
static int _serialize_publish_packet(unsigned char *buf, size_t buf_len, uint8_t dup, QoS qos, uint8_t retained,
|
||||
uint16_t packet_id, char *topicName, unsigned char *payload, size_t payload_len,
|
||||
uint32_t *serialized_len)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(payload, QCLOUD_ERR_INVAL);
|
||||
|
||||
unsigned char *ptr = buf;
|
||||
unsigned char header = 0;
|
||||
uint32_t rem_len = 0;
|
||||
int rc;
|
||||
|
||||
rem_len = _get_publish_packet_len(qos, topicName, payload_len);
|
||||
if (get_mqtt_packet_len(rem_len) > buf_len) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
|
||||
}
|
||||
|
||||
rc = mqtt_init_packet_header(&header, PUBLISH, qos, dup, retained);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
mqtt_write_char(&ptr, header); /* write header */
|
||||
|
||||
ptr += mqtt_write_packet_rem_len(ptr, rem_len); /* write remaining length */
|
||||
;
|
||||
|
||||
mqtt_write_utf8_string(&ptr, topicName); /* Variable Header: Topic Name */
|
||||
|
||||
if (qos > 0) {
|
||||
mqtt_write_uint_16(&ptr, packet_id); /* Variable Header: Topic Name */
|
||||
}
|
||||
|
||||
memcpy(ptr, payload, payload_len);
|
||||
ptr += payload_len;
|
||||
|
||||
*serialized_len = (uint32_t)(ptr - buf);
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_publish(Qcloud_IoT_Client *pClient, char *topicName, PublishParams *pParams)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(pParams, QCLOUD_ERR_INVAL);
|
||||
STRING_PTR_SANITY_CHECK(topicName, QCLOUD_ERR_INVAL);
|
||||
|
||||
Timer timer;
|
||||
uint32_t len = 0;
|
||||
int rc;
|
||||
|
||||
ListNode *node = NULL;
|
||||
|
||||
size_t topicLen = strlen(topicName);
|
||||
if (topicLen > MAX_SIZE_OF_CLOUD_TOPIC) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MAX_TOPIC_LENGTH);
|
||||
}
|
||||
|
||||
if (pParams->qos == QOS2) {
|
||||
Log_e("QoS2 is not supported currently");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_QOS_NOT_SUPPORT);
|
||||
}
|
||||
|
||||
if (!get_client_conn_state(pClient)) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
|
||||
}
|
||||
|
||||
InitTimer(&timer);
|
||||
countdown_ms(&timer, pClient->command_timeout_ms);
|
||||
|
||||
HAL_MutexLock(pClient->lock_write_buf);
|
||||
if (pParams->qos == QOS1) {
|
||||
pParams->id = get_next_packet_id(pClient);
|
||||
if (IOT_Log_Get_Level() <= eLOG_DEBUG) {
|
||||
Log_d("publish topic seq=%d|topicName=%s|payload=%s", pParams->id, topicName, (char *)pParams->payload);
|
||||
} else {
|
||||
Log_i("publish topic seq=%d|topicName=%s", pParams->id, topicName);
|
||||
}
|
||||
} else {
|
||||
if (IOT_Log_Get_Level() <= eLOG_DEBUG) {
|
||||
Log_d("publish packetID=%d|topicName=%s|payload=%s", pParams->id, topicName, (char *)pParams->payload);
|
||||
} else {
|
||||
Log_i("publish packetID=%d|topicName=%s", pParams->id, topicName);
|
||||
}
|
||||
}
|
||||
|
||||
rc = _serialize_publish_packet(pClient->write_buf, pClient->write_buf_size, 0, pParams->qos, pParams->retained,
|
||||
pParams->id, topicName, (unsigned char *)pParams->payload, pParams->payload_len,
|
||||
&len);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
if (pParams->qos > QOS0) {
|
||||
rc = _mask_push_pubInfo_to(pClient, len, pParams->id, &node);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
Log_e("push publish into to pubInfolist failed!");
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
}
|
||||
|
||||
/* send the publish packet */
|
||||
rc = send_mqtt_packet(pClient, len, &timer);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
if (pParams->qos > QOS0) {
|
||||
HAL_MutexLock(pClient->lock_list_pub);
|
||||
list_remove(pClient->list_pub_wait_ack, node);
|
||||
HAL_MutexUnlock(pClient->lock_list_pub);
|
||||
}
|
||||
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
|
||||
IOT_FUNC_EXIT_RC(pParams->id);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
251
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_subscribe.c
vendored
Normal file
251
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_subscribe.c
vendored
Normal file
@@ -0,0 +1,251 @@
|
||||
/*******************************************************************************
|
||||
* Copyright (c) 2014 IBM Corp.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Ian Craggs - initial API and implementation and/or initial documentation
|
||||
*******************************************************************************/
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#include "mqtt_client.h"
|
||||
|
||||
/**
|
||||
* Determines the length of the MQTT subscribe packet that would be produced using the supplied parameters
|
||||
* @param count the number of topic filter strings in topicFilters
|
||||
* @param topicFilters the array of topic filter strings to be used in the publish
|
||||
* @return the length of buffer needed to contain the serialized version of the packet
|
||||
*/
|
||||
static uint32_t _get_subscribe_packet_rem_len(uint32_t count, char **topicFilters)
|
||||
{
|
||||
size_t i;
|
||||
size_t len = 2; /* packetid */
|
||||
|
||||
for (i = 0; i < count; ++i) {
|
||||
len += 2 + strlen(*topicFilters + i) + 1; /* length + topic + req_qos */
|
||||
}
|
||||
|
||||
return (uint32_t)len;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes the supplied subscribe data into the supplied buffer, ready for sending
|
||||
* @param buf the buffer into which the packet will be serialized
|
||||
* @param buf_len the length in bytes of the supplied bufferr
|
||||
* @param dup integer - the MQTT dup flag
|
||||
* @param packet_id integer - the MQTT packet identifier
|
||||
* @param count - number of members in the topicFilters and reqQos arrays
|
||||
* @param topicFilters - array of topic filter names
|
||||
* @param requestedQoSs - array of requested QoS
|
||||
* @return the length of the serialized data. <= 0 indicates error
|
||||
*/
|
||||
static int _serialize_subscribe_packet(unsigned char *buf, size_t buf_len, uint8_t dup, uint16_t packet_id,
|
||||
uint32_t count, char **topicFilters, QoS *requestedQoSs,
|
||||
uint32_t *serialized_len)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
|
||||
|
||||
unsigned char *ptr = buf;
|
||||
unsigned char header = 0;
|
||||
uint32_t rem_len = 0;
|
||||
uint32_t i = 0;
|
||||
int rc;
|
||||
|
||||
// remaining length of SUBSCRIBE packet = packet type(2 byte) + count * (remaining length(2 byte) + topicLen + qos(1
|
||||
// byte))
|
||||
rem_len = _get_subscribe_packet_rem_len(count, topicFilters);
|
||||
if (get_mqtt_packet_len(rem_len) > buf_len) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
|
||||
}
|
||||
// init header
|
||||
rc = mqtt_init_packet_header(&header, SUBSCRIBE, QOS1, dup, 0);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
// 1st byte in fixed header
|
||||
mqtt_write_char(&ptr, header);
|
||||
// remaining length
|
||||
ptr += mqtt_write_packet_rem_len(ptr, rem_len);
|
||||
// variable header
|
||||
mqtt_write_uint_16(&ptr, packet_id);
|
||||
// payload
|
||||
for (i = 0; i < count; ++i) {
|
||||
mqtt_write_utf8_string(&ptr, *topicFilters + i);
|
||||
mqtt_write_char(&ptr, (unsigned char)requestedQoSs[i]);
|
||||
}
|
||||
|
||||
*serialized_len = (uint32_t)(ptr - buf);
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_subscribe(Qcloud_IoT_Client *pClient, char *topicFilter, SubscribeParams *pParams)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
int rc;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(pParams, QCLOUD_ERR_INVAL);
|
||||
// POINTER_SANITY_CHECK(pParams->on_message_handler, QCLOUD_ERR_INVAL);
|
||||
STRING_PTR_SANITY_CHECK(topicFilter, QCLOUD_ERR_INVAL);
|
||||
|
||||
Timer timer;
|
||||
uint32_t len = 0;
|
||||
uint16_t packet_id = 0;
|
||||
|
||||
ListNode *node = NULL;
|
||||
|
||||
size_t topicLen = strlen(topicFilter);
|
||||
if (topicLen > MAX_SIZE_OF_CLOUD_TOPIC) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MAX_TOPIC_LENGTH);
|
||||
}
|
||||
|
||||
if (pParams->qos == QOS2) {
|
||||
Log_e("QoS2 is not supported currently");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_QOS_NOT_SUPPORT);
|
||||
}
|
||||
|
||||
if (!get_client_conn_state(pClient)) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN)
|
||||
}
|
||||
|
||||
/* topic filter should be valid in the whole sub life */
|
||||
char *topic_filter_stored = HAL_Malloc(topicLen + 1);
|
||||
if (topic_filter_stored == NULL) {
|
||||
Log_e("malloc failed");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
|
||||
strcpy(topic_filter_stored, topicFilter);
|
||||
topic_filter_stored[topicLen] = 0;
|
||||
|
||||
InitTimer(&timer);
|
||||
countdown_ms(&timer, pClient->command_timeout_ms);
|
||||
|
||||
HAL_MutexLock(pClient->lock_write_buf);
|
||||
packet_id = get_next_packet_id(pClient);
|
||||
Log_d("topicName=%s|packet_id=%d", topic_filter_stored, packet_id);
|
||||
|
||||
rc = _serialize_subscribe_packet(pClient->write_buf, pClient->write_buf_size, 0, packet_id, 1, &topic_filter_stored,
|
||||
&pParams->qos, &len);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
HAL_Free(topic_filter_stored);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
/* add node into sub ack wait list */
|
||||
SubTopicHandle sub_handle;
|
||||
sub_handle.topic_filter = topic_filter_stored;
|
||||
sub_handle.message_handler = pParams->on_message_handler;
|
||||
sub_handle.sub_event_handler = pParams->on_sub_event_handler;
|
||||
sub_handle.qos = pParams->qos;
|
||||
sub_handle.handler_user_data = pParams->user_data;
|
||||
|
||||
rc = push_sub_info_to(pClient, len, (unsigned int)packet_id, SUBSCRIBE, &sub_handle, &node);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
Log_e("push publish into to pubInfolist failed!");
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
HAL_Free(topic_filter_stored);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
// send SUBSCRIBE packet
|
||||
rc = send_mqtt_packet(pClient, len, &timer);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
HAL_MutexLock(pClient->lock_list_sub);
|
||||
list_remove(pClient->list_sub_wait_ack, node);
|
||||
HAL_MutexUnlock(pClient->lock_list_sub);
|
||||
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
HAL_Free(topic_filter_stored);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
|
||||
IOT_FUNC_EXIT_RC(packet_id);
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_resubscribe(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
int rc;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
uint32_t itr = 0;
|
||||
char * topic = NULL;
|
||||
SubscribeParams temp_param;
|
||||
|
||||
if (NULL == pClient) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_INVAL);
|
||||
}
|
||||
|
||||
if (!get_client_conn_state(pClient)) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
|
||||
}
|
||||
|
||||
for (itr = 0; itr < MAX_MESSAGE_HANDLERS; itr++) {
|
||||
topic = (char *)pClient->sub_handles[itr].topic_filter;
|
||||
if (topic == NULL) {
|
||||
continue;
|
||||
}
|
||||
temp_param.on_message_handler = pClient->sub_handles[itr].message_handler;
|
||||
temp_param.on_sub_event_handler = pClient->sub_handles[itr].sub_event_handler;
|
||||
temp_param.qos = pClient->sub_handles[itr].qos;
|
||||
temp_param.user_data = pClient->sub_handles[itr].handler_user_data;
|
||||
|
||||
rc = qcloud_iot_mqtt_subscribe(pClient, topic, &temp_param);
|
||||
if (rc < 0) {
|
||||
Log_e("resubscribe failed %d, topic: %s", rc, topic);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
}
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
bool qcloud_iot_mqtt_is_sub_ready(Qcloud_IoT_Client *pClient, char *topicFilter)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
POINTER_SANITY_CHECK(pClient, false);
|
||||
STRING_PTR_SANITY_CHECK(topicFilter, false);
|
||||
|
||||
size_t topicLen = strlen(topicFilter);
|
||||
if (topicLen > MAX_SIZE_OF_CLOUD_TOPIC) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
HAL_MutexLock(pClient->lock_generic);
|
||||
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
|
||||
if ((pClient->sub_handles[i].topic_filter != NULL &&
|
||||
!strcmp(pClient->sub_handles[i].topic_filter, topicFilter)) ||
|
||||
strstr(topicFilter, "/#") != NULL || strstr(topicFilter, "/+") != NULL) {
|
||||
HAL_MutexUnlock(pClient->lock_generic);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
HAL_MutexUnlock(pClient->lock_generic);
|
||||
return false;
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
199
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_unsubscribe.c
vendored
Normal file
199
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_unsubscribe.c
vendored
Normal file
@@ -0,0 +1,199 @@
|
||||
/*******************************************************************************
|
||||
* Copyright (c) 2014 IBM Corp.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Ian Craggs - initial API and implementation and/or initial documentation
|
||||
*******************************************************************************/
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#include "mqtt_client.h"
|
||||
|
||||
/**
|
||||
* Determines the length of the MQTT unsubscribe packet that would be produced using the supplied parameters
|
||||
* @param count the number of topic filter strings in topicFilters
|
||||
* @param topicFilters the array of topic filter strings to be used in the publish
|
||||
* @return the length of buffer needed to contain the serialized version of the packet
|
||||
*/
|
||||
static uint32_t _get_unsubscribe_packet_rem_len(uint32_t count, char **topicFilters)
|
||||
{
|
||||
size_t i;
|
||||
size_t len = 2; /* packetid */
|
||||
|
||||
for (i = 0; i < count; ++i) {
|
||||
len += 2 + strlen(*topicFilters + i); /* length + topic*/
|
||||
}
|
||||
|
||||
return (uint32_t)len;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes the supplied unsubscribe data into the supplied buffer, ready for sending
|
||||
* @param buf the raw buffer data, of the correct length determined by the remaining length field
|
||||
* @param buf_len the length in bytes of the data in the supplied buffer
|
||||
* @param dup integer - the MQTT dup flag
|
||||
* @param packet_id integer - the MQTT packet identifier
|
||||
* @param count - number of members in the topicFilters array
|
||||
* @param topicFilters - array of topic filter names
|
||||
* @param serialized_len - the length of the serialized data
|
||||
* @return int indicating function execution status
|
||||
*/
|
||||
static int _serialize_unsubscribe_packet(unsigned char *buf, size_t buf_len, uint8_t dup, uint16_t packet_id,
|
||||
uint32_t count, char **topicFilters, uint32_t *serialized_len)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(buf, QCLOUD_ERR_INVAL);
|
||||
POINTER_SANITY_CHECK(serialized_len, QCLOUD_ERR_INVAL);
|
||||
|
||||
unsigned char *ptr = buf;
|
||||
unsigned char header = 0;
|
||||
uint32_t rem_len = 0;
|
||||
uint32_t i = 0;
|
||||
int rc;
|
||||
|
||||
rem_len = _get_unsubscribe_packet_rem_len(count, topicFilters);
|
||||
if (get_mqtt_packet_len(rem_len) > buf_len) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
|
||||
}
|
||||
|
||||
rc = mqtt_init_packet_header(&header, UNSUBSCRIBE, QOS1, dup, 0);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
mqtt_write_char(&ptr, header); /* write header */
|
||||
|
||||
ptr += mqtt_write_packet_rem_len(ptr, rem_len); /* write remaining length */
|
||||
|
||||
mqtt_write_uint_16(&ptr, packet_id);
|
||||
|
||||
for (i = 0; i < count; ++i) {
|
||||
mqtt_write_utf8_string(&ptr, *topicFilters + i);
|
||||
}
|
||||
|
||||
*serialized_len = (uint32_t)(ptr - buf);
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
int qcloud_iot_mqtt_unsubscribe(Qcloud_IoT_Client *pClient, char *topicFilter)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
int rc;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
STRING_PTR_SANITY_CHECK(topicFilter, QCLOUD_ERR_INVAL);
|
||||
|
||||
int i = 0;
|
||||
Timer timer;
|
||||
uint32_t len = 0;
|
||||
uint16_t packet_id = 0;
|
||||
bool suber_exists = false;
|
||||
|
||||
ListNode *node = NULL;
|
||||
|
||||
size_t topicLen = strlen(topicFilter);
|
||||
if (topicLen > MAX_SIZE_OF_CLOUD_TOPIC) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MAX_TOPIC_LENGTH);
|
||||
}
|
||||
|
||||
/* Remove from message handler array */
|
||||
HAL_MutexLock(pClient->lock_generic);
|
||||
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
|
||||
if ((pClient->sub_handles[i].topic_filter != NULL &&
|
||||
!strcmp(pClient->sub_handles[i].topic_filter, topicFilter)) ||
|
||||
strstr(topicFilter, "/#") != NULL || strstr(topicFilter, "/+") != NULL) {
|
||||
/* notify this event to topic subscriber */
|
||||
if (NULL != pClient->sub_handles[i].sub_event_handler)
|
||||
pClient->sub_handles[i].sub_event_handler(pClient, MQTT_EVENT_UNSUBSCRIBE,
|
||||
pClient->sub_handles[i].handler_user_data);
|
||||
|
||||
/* Free the topic filter malloced in qcloud_iot_mqtt_subscribe */
|
||||
HAL_Free((void *)pClient->sub_handles[i].topic_filter);
|
||||
pClient->sub_handles[i].topic_filter = NULL;
|
||||
|
||||
/* We don't want to break here, if the same topic is registered
|
||||
* with 2 callbacks. Unlikely scenario */
|
||||
suber_exists = true;
|
||||
}
|
||||
}
|
||||
HAL_MutexUnlock(pClient->lock_generic);
|
||||
|
||||
if (suber_exists == false) {
|
||||
Log_e("subscription does not exists: %s", topicFilter);
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_UNSUB_FAIL);
|
||||
}
|
||||
|
||||
if (!get_client_conn_state(pClient)) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
|
||||
}
|
||||
|
||||
/* topic filter should be valid in the whole sub life */
|
||||
char *topic_filter_stored = HAL_Malloc(topicLen + 1);
|
||||
if (topic_filter_stored == NULL) {
|
||||
Log_e("malloc failed");
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
|
||||
}
|
||||
strcpy(topic_filter_stored, topicFilter);
|
||||
topic_filter_stored[topicLen] = 0;
|
||||
|
||||
InitTimer(&timer);
|
||||
countdown_ms(&timer, pClient->command_timeout_ms);
|
||||
|
||||
HAL_MutexLock(pClient->lock_write_buf);
|
||||
packet_id = get_next_packet_id(pClient);
|
||||
rc = _serialize_unsubscribe_packet(pClient->write_buf, pClient->write_buf_size, 0, packet_id, 1,
|
||||
&topic_filter_stored, &len);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
HAL_Free(topic_filter_stored);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
SubTopicHandle sub_handle;
|
||||
sub_handle.topic_filter = topic_filter_stored;
|
||||
sub_handle.sub_event_handler = NULL;
|
||||
sub_handle.message_handler = NULL;
|
||||
sub_handle.handler_user_data = NULL;
|
||||
|
||||
rc = push_sub_info_to(pClient, len, (unsigned int)packet_id, UNSUBSCRIBE, &sub_handle, &node);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
Log_e("push publish into to pubInfolist failed: %d", rc);
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
HAL_Free(topic_filter_stored);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
/* send the unsubscribe packet */
|
||||
rc = send_mqtt_packet(pClient, len, &timer);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
HAL_MutexLock(pClient->lock_list_sub);
|
||||
list_remove(pClient->list_sub_wait_ack, node);
|
||||
HAL_MutexUnlock(pClient->lock_list_sub);
|
||||
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
HAL_Free(topic_filter_stored);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
|
||||
IOT_FUNC_EXIT_RC(packet_id);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
501
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_yield.c
vendored
Normal file
501
components/connectivity/qcloud-iot-hub-sdk/3rdparty/sdk_src/protocol/mqtt/mqtt_client_yield.c
vendored
Normal file
@@ -0,0 +1,501 @@
|
||||
/*******************************************************************************
|
||||
* Copyright (c) 2014 IBM Corp.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
|
||||
*******************************************************************************/
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "log_upload.h"
|
||||
#include "mqtt_client.h"
|
||||
#include "qcloud_iot_import.h"
|
||||
|
||||
static uint32_t _get_random_interval(void)
|
||||
{
|
||||
srand((unsigned)HAL_GetTimeMs());
|
||||
/* range: 1000 - 2000 ms, in 10ms unit */
|
||||
return (rand() % 100 + 100) * 10;
|
||||
}
|
||||
|
||||
static void _iot_disconnect_callback(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
if (NULL != pClient->event_handle.h_fp) {
|
||||
MQTTEventMsg msg;
|
||||
msg.event_type = MQTT_EVENT_DISCONNECT;
|
||||
msg.msg = NULL;
|
||||
|
||||
pClient->event_handle.h_fp(pClient, pClient->event_handle.context, &msg);
|
||||
}
|
||||
}
|
||||
|
||||
static void _reconnect_callback(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
if (NULL != pClient->event_handle.h_fp) {
|
||||
MQTTEventMsg msg;
|
||||
msg.event_type = MQTT_EVENT_RECONNECT;
|
||||
msg.msg = NULL;
|
||||
|
||||
pClient->event_handle.h_fp(pClient, pClient->event_handle.context, &msg);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief handle exceptional disconnection
|
||||
*
|
||||
* @param pClient
|
||||
* @return
|
||||
*/
|
||||
static int _handle_disconnect(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
int rc;
|
||||
|
||||
if (0 == get_client_conn_state(pClient)) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
|
||||
}
|
||||
|
||||
rc = qcloud_iot_mqtt_disconnect(pClient);
|
||||
// disconnect network stack by force
|
||||
if (rc != QCLOUD_RET_SUCCESS) {
|
||||
pClient->network_stack.disconnect(&(pClient->network_stack));
|
||||
set_client_conn_state(pClient, NOTCONNECTED);
|
||||
}
|
||||
|
||||
Log_e("disconnect MQTT for some reasons..");
|
||||
|
||||
_iot_disconnect_callback(pClient);
|
||||
|
||||
// exceptional disconnection
|
||||
pClient->was_manually_disconnected = 0;
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief handle reconnect
|
||||
*
|
||||
* @param pClient
|
||||
* @return
|
||||
*/
|
||||
static int _handle_reconnect(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
int8_t isPhysicalLayerConnected = 1;
|
||||
int rc = QCLOUD_RET_MQTT_RECONNECTED;
|
||||
|
||||
// reconnect control by delay timer (increase interval exponentially )
|
||||
if (!expired(&(pClient->reconnect_delay_timer))) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT);
|
||||
}
|
||||
|
||||
if (NULL != pClient->network_stack.is_connected) {
|
||||
isPhysicalLayerConnected =
|
||||
(int8_t)pClient->network_stack.is_connected(&(pClient->network_stack)); // always return 1
|
||||
}
|
||||
|
||||
if (isPhysicalLayerConnected) {
|
||||
rc = qcloud_iot_mqtt_attempt_reconnect(pClient);
|
||||
if (rc == QCLOUD_RET_MQTT_RECONNECTED) {
|
||||
Log_e("attempt to reconnect success.");
|
||||
_reconnect_callback(pClient);
|
||||
#ifdef LOG_UPLOAD
|
||||
if (is_log_uploader_init()) {
|
||||
int log_level;
|
||||
if (qcloud_get_log_level(pClient, &log_level) < 0) {
|
||||
Log_e("client get log topic failed: %d", rc);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
} else {
|
||||
Log_e("attempt to reconnect failed, errCode: %d", rc);
|
||||
rc = QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT;
|
||||
}
|
||||
}
|
||||
|
||||
pClient->current_reconnect_wait_interval *= 2;
|
||||
|
||||
if (MAX_RECONNECT_WAIT_INTERVAL < pClient->current_reconnect_wait_interval) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_RECONNECT_TIMEOUT);
|
||||
}
|
||||
countdown_ms(&(pClient->reconnect_delay_timer), pClient->current_reconnect_wait_interval);
|
||||
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief handle MQTT keep alive (hearbeat with server)
|
||||
*
|
||||
* @param pClient
|
||||
* @return
|
||||
*/
|
||||
static int _mqtt_keep_alive(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
#define MQTT_PING_RETRY_TIMES 2
|
||||
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
int rc;
|
||||
Timer timer;
|
||||
uint32_t serialized_len = 0;
|
||||
|
||||
if (0 == pClient->options.keep_alive_interval) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
if (!expired(&pClient->ping_timer)) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
if (pClient->is_ping_outstanding >= MQTT_PING_RETRY_TIMES) {
|
||||
// Reaching here means we haven't received any MQTT packet for a long time (keep_alive_interval)
|
||||
Log_e("Fail to recv MQTT msg. Something wrong with the connection.");
|
||||
rc = _handle_disconnect(pClient);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
/* there is no ping outstanding - send one */
|
||||
HAL_MutexLock(pClient->lock_write_buf);
|
||||
rc = serialize_packet_with_zero_payload(pClient->write_buf, pClient->write_buf_size, PINGREQ, &serialized_len);
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
/* send the ping packet */
|
||||
int i = 0;
|
||||
InitTimer(&timer);
|
||||
do {
|
||||
countdown_ms(&timer, pClient->command_timeout_ms);
|
||||
rc = send_mqtt_packet(pClient, serialized_len, &timer);
|
||||
} while (QCLOUD_RET_SUCCESS != rc && (i++ < 3));
|
||||
|
||||
if (QCLOUD_RET_SUCCESS != rc) {
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
// If sending a PING fails, propably the connection is not OK and we decide to disconnect and begin reconnection
|
||||
// attempts
|
||||
Log_e("Fail to send PING request. Something wrong with the connection.");
|
||||
rc = _handle_disconnect(pClient);
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
HAL_MutexUnlock(pClient->lock_write_buf);
|
||||
|
||||
HAL_MutexLock(pClient->lock_generic);
|
||||
pClient->is_ping_outstanding++;
|
||||
/* start a timer to wait for PINGRESP from server */
|
||||
countdown(&pClient->ping_timer, Min(5, pClient->options.keep_alive_interval / 2));
|
||||
HAL_MutexUnlock(pClient->lock_generic);
|
||||
Log_d("PING request %u has been sent...", pClient->is_ping_outstanding);
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check connection and keep alive state, read/handle MQTT message in synchronized way
|
||||
*
|
||||
* @param pClient handle to MQTT client
|
||||
* @param timeout_ms timeout value (unit: ms) for this operation
|
||||
*
|
||||
* @return QCLOUD_RET_SUCCESS when success, QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT when try reconnecing, or err code for
|
||||
* failure
|
||||
*/
|
||||
int qcloud_iot_mqtt_yield(Qcloud_IoT_Client *pClient, uint32_t timeout_ms)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
int rc = QCLOUD_RET_SUCCESS;
|
||||
Timer timer;
|
||||
uint8_t packet_type;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
NUMBERIC_SANITY_CHECK(timeout_ms, QCLOUD_ERR_INVAL);
|
||||
|
||||
// 1. check if manually disconnect
|
||||
if (!get_client_conn_state(pClient) && pClient->was_manually_disconnected == 1) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_MQTT_MANUALLY_DISCONNECTED);
|
||||
}
|
||||
|
||||
// 2. check connection state and if auto reconnect is enabled
|
||||
if (!get_client_conn_state(pClient) && pClient->options.auto_connect_enable == 0) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
|
||||
}
|
||||
|
||||
InitTimer(&timer);
|
||||
countdown_ms(&timer, timeout_ms);
|
||||
|
||||
// 3. main loop for packet reading/handling and keep alive maintainance
|
||||
while (!expired(&timer)) {
|
||||
if (!get_client_conn_state(pClient)) {
|
||||
if (pClient->current_reconnect_wait_interval > MAX_RECONNECT_WAIT_INTERVAL) {
|
||||
rc = QCLOUD_ERR_MQTT_RECONNECT_TIMEOUT;
|
||||
break;
|
||||
}
|
||||
rc = _handle_reconnect(pClient);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
rc = cycle_for_read(pClient, &timer, &packet_type, QOS0);
|
||||
|
||||
if (rc == QCLOUD_RET_SUCCESS) {
|
||||
/* check list of wait publish ACK to remove node that is ACKED or timeout */
|
||||
qcloud_iot_mqtt_pub_info_proc(pClient);
|
||||
|
||||
/* check list of wait subscribe(or unsubscribe) ACK to remove node that is ACKED or timeout */
|
||||
qcloud_iot_mqtt_sub_info_proc(pClient);
|
||||
|
||||
rc = _mqtt_keep_alive(pClient);
|
||||
} else if (rc == QCLOUD_ERR_SSL_READ_TIMEOUT || rc == QCLOUD_ERR_SSL_READ ||
|
||||
rc == QCLOUD_ERR_TCP_PEER_SHUTDOWN || rc == QCLOUD_ERR_TCP_READ_FAIL) {
|
||||
Log_e("network read failed, rc: %d. MQTT Disconnect.", rc);
|
||||
rc = _handle_disconnect(pClient);
|
||||
}
|
||||
|
||||
if (rc == QCLOUD_ERR_MQTT_NO_CONN) {
|
||||
pClient->counter_network_disconnected++;
|
||||
|
||||
if (pClient->options.auto_connect_enable == 1) {
|
||||
pClient->current_reconnect_wait_interval = _get_random_interval();
|
||||
countdown_ms(&(pClient->reconnect_delay_timer), pClient->current_reconnect_wait_interval);
|
||||
|
||||
// reconnect timeout
|
||||
rc = QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else if (rc != QCLOUD_RET_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
// workaround wrapper for qcloud_iot_mqtt_yield for multi-thread mode
|
||||
int qcloud_iot_mqtt_yield_mt(Qcloud_IoT_Client *mqtt_client, uint32_t timeout_ms)
|
||||
{
|
||||
POINTER_SANITY_CHECK(mqtt_client, QCLOUD_ERR_INVAL);
|
||||
NUMBERIC_SANITY_CHECK(timeout_ms, QCLOUD_ERR_INVAL);
|
||||
|
||||
#ifdef MULTITHREAD_ENABLED
|
||||
/* only one instance of yield is allowed in running state*/
|
||||
if (mqtt_client->thread_running) {
|
||||
HAL_SleepMs(timeout_ms);
|
||||
return QCLOUD_RET_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
return qcloud_iot_mqtt_yield(mqtt_client, timeout_ms);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief puback waiting timeout process
|
||||
*
|
||||
* @param pClient reference to MQTTClient
|
||||
*
|
||||
*/
|
||||
int qcloud_iot_mqtt_pub_info_proc(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
|
||||
POINTER_SANITY_CHECK(pClient, QCLOUD_ERR_INVAL);
|
||||
|
||||
HAL_MutexLock(pClient->lock_list_pub);
|
||||
do {
|
||||
if (0 == pClient->list_pub_wait_ack->len) {
|
||||
break;
|
||||
}
|
||||
|
||||
ListIterator *iter;
|
||||
ListNode * node = NULL;
|
||||
ListNode * temp_node = NULL;
|
||||
|
||||
if (NULL == (iter = list_iterator_new(pClient->list_pub_wait_ack, LIST_TAIL))) {
|
||||
Log_e("new list failed");
|
||||
break;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
node = list_iterator_next(iter);
|
||||
|
||||
if (NULL != temp_node) {
|
||||
list_remove(pClient->list_pub_wait_ack, temp_node);
|
||||
temp_node = NULL;
|
||||
}
|
||||
|
||||
if (NULL == node) {
|
||||
break; /* end of list */
|
||||
}
|
||||
|
||||
QcloudIotPubInfo *repubInfo = (QcloudIotPubInfo *)node->val;
|
||||
if (NULL == repubInfo) {
|
||||
Log_e("node's value is invalid!");
|
||||
temp_node = node;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* remove invalid node */
|
||||
if (MQTT_NODE_STATE_INVALID == repubInfo->node_state) {
|
||||
temp_node = node;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!pClient->is_connected) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/* check the request if timeout or not */
|
||||
if (left_ms(&repubInfo->pub_start_time) > 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
HAL_MutexUnlock(pClient->lock_list_pub);
|
||||
/* If wait ACK timeout, remove the node from list */
|
||||
/* It is up to user to do republishing or not */
|
||||
temp_node = node;
|
||||
|
||||
countdown_ms(&repubInfo->pub_start_time, pClient->command_timeout_ms);
|
||||
HAL_MutexLock(pClient->lock_list_pub);
|
||||
|
||||
/* notify timeout event */
|
||||
if (NULL != pClient->event_handle.h_fp) {
|
||||
MQTTEventMsg msg;
|
||||
msg.event_type = MQTT_EVENT_PUBLISH_TIMEOUT;
|
||||
msg.msg = (void *)(uintptr_t)repubInfo->msg_id;
|
||||
pClient->event_handle.h_fp(pClient, pClient->event_handle.context, &msg);
|
||||
}
|
||||
}
|
||||
|
||||
list_iterator_destroy(iter);
|
||||
|
||||
} while (0);
|
||||
|
||||
HAL_MutexUnlock(pClient->lock_list_pub);
|
||||
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief suback waiting timeout process
|
||||
*
|
||||
* @param pClient reference to MQTTClient
|
||||
*
|
||||
*/
|
||||
int qcloud_iot_mqtt_sub_info_proc(Qcloud_IoT_Client *pClient)
|
||||
{
|
||||
IOT_FUNC_ENTRY;
|
||||
int rc = QCLOUD_RET_SUCCESS;
|
||||
|
||||
if (!pClient) {
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_ERR_INVAL);
|
||||
}
|
||||
|
||||
HAL_MutexLock(pClient->lock_list_sub);
|
||||
do {
|
||||
if (0 == pClient->list_sub_wait_ack->len) {
|
||||
break;
|
||||
}
|
||||
|
||||
ListIterator *iter;
|
||||
ListNode * node = NULL;
|
||||
ListNode * temp_node = NULL;
|
||||
uint16_t packet_id = 0;
|
||||
MessageTypes msg_type;
|
||||
|
||||
if (NULL == (iter = list_iterator_new(pClient->list_sub_wait_ack, LIST_TAIL))) {
|
||||
Log_e("new list failed");
|
||||
HAL_MutexUnlock(pClient->lock_list_sub);
|
||||
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
node = list_iterator_next(iter);
|
||||
|
||||
if (NULL != temp_node) {
|
||||
list_remove(pClient->list_sub_wait_ack, temp_node);
|
||||
temp_node = NULL;
|
||||
}
|
||||
|
||||
if (NULL == node) {
|
||||
break; /* end of list */
|
||||
}
|
||||
|
||||
QcloudIotSubInfo *sub_info = (QcloudIotSubInfo *)node->val;
|
||||
if (NULL == sub_info) {
|
||||
Log_e("node's value is invalid!");
|
||||
temp_node = node;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* remove invalid node */
|
||||
if (MQTT_NODE_STATE_INVALID == sub_info->node_state) {
|
||||
temp_node = node;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pClient->is_connected <= 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/* check the request if timeout or not */
|
||||
if (left_ms(&sub_info->sub_start_time) > 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/* When arrive here, it means timeout to wait ACK */
|
||||
packet_id = sub_info->msg_id;
|
||||
msg_type = sub_info->type;
|
||||
|
||||
/* Wait MQTT SUBSCRIBE ACK timeout */
|
||||
if (NULL != pClient->event_handle.h_fp) {
|
||||
MQTTEventMsg msg;
|
||||
|
||||
if (SUBSCRIBE == msg_type) {
|
||||
/* subscribe timeout */
|
||||
msg.event_type = MQTT_EVENT_SUBCRIBE_TIMEOUT;
|
||||
msg.msg = (void *)(uintptr_t)packet_id;
|
||||
|
||||
/* notify this event to topic subscriber */
|
||||
if (NULL != sub_info->handler.sub_event_handler)
|
||||
sub_info->handler.sub_event_handler(pClient, MQTT_EVENT_SUBCRIBE_TIMEOUT,
|
||||
sub_info->handler.handler_user_data);
|
||||
|
||||
} else {
|
||||
/* unsubscribe timeout */
|
||||
msg.event_type = MQTT_EVENT_UNSUBCRIBE_TIMEOUT;
|
||||
msg.msg = (void *)(uintptr_t)packet_id;
|
||||
}
|
||||
|
||||
pClient->event_handle.h_fp(pClient, pClient->event_handle.context, &msg);
|
||||
}
|
||||
|
||||
if (NULL != sub_info->handler.topic_filter)
|
||||
HAL_Free((void *)(sub_info->handler.topic_filter));
|
||||
|
||||
temp_node = node;
|
||||
}
|
||||
|
||||
list_iterator_destroy(iter);
|
||||
|
||||
} while (0);
|
||||
|
||||
HAL_MutexUnlock(pClient->lock_list_sub);
|
||||
|
||||
IOT_FUNC_EXIT_RC(rc);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
Reference in New Issue
Block a user