add new qloud-c-sdk component

This commit is contained in:
mculover666
2022-03-25 10:06:56 +08:00
parent 565cd29e94
commit a3ac2e56d8
166 changed files with 35027 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
file(GLOB src_mqtt_client ${CMAKE_CURRENT_SOURCE_DIR}/src/*.c)
set(inc_mqtt_client ${CMAKE_CURRENT_SOURCE_DIR}/inc/)
set(src_services ${src_services} ${src_mqtt_client} PARENT_SCOPE)
set(inc_services ${inc_services} ${inc_mqtt_client} PARENT_SCOPE)
file(GLOB src_mqtt_sample ${CMAKE_CURRENT_SOURCE_DIR}/sample/mqtt_sample.c)
add_executable(mqtt_sample ${src_mqtt_sample})
target_link_libraries(mqtt_sample ${libsdk})
if( ${CONFIG_IOT_TEST} STREQUAL "ON")
file(GLOB src_unit_test ${CMAKE_CURRENT_SOURCE_DIR}/test/*.cc)
set(inc_mqtt_client_test ${CMAKE_CURRENT_SOURCE_DIR}/test)
set(src_test ${src_test} ${src_unit_test} PARENT_SCOPE)
set(inc_test ${inc_test} ${inc_mqtt_client_test} PARENT_SCOPE)
endif()

View File

@@ -0,0 +1,446 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file mqtt_client.h
* @brief mqtt client internel api
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-05-31
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-05-31 <td>1.0 <td>fancyxu <td>first commit
* <tr><td>2021-07-08 <td>1.1 <td>fancyxu <td>fix code standard of IotReturnCode and QcloudIotClient
* </table>
*/
#ifndef IOT_HUB_DEVICE_C_SDK_SERVICES_COMMON_MQTT_CLIENT_INC_MQTT_CLIENT_H_
#define IOT_HUB_DEVICE_C_SDK_SERVICES_COMMON_MQTT_CLIENT_INC_MQTT_CLIENT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "qcloud_iot_mqtt_client.h"
#include "mqtt_packet.h"
#include "network_interface.h"
#include "utils_list.h"
#include "utils_base64.h"
#include "utils_hmac.h"
/**
* @brief Packet id, random from [1 - 65536]
*
*/
#define MAX_PACKET_ID (65535)
/**
* @brief Max size of conn Id
*
*/
#define MAX_CONN_ID_LEN (6)
/**
* @brief Max number of topic subscribed
*
*/
#define MAX_MESSAGE_HANDLERS (10)
/**
* @brief Max number in repub list
*
*/
#define MAX_REPUB_NUM (20)
/**
* @brief Minimal wait interval when reconnect
*
*/
#define MIN_RECONNECT_WAIT_INTERVAL (1000)
/**
* @brief Minimal MQTT timeout value
*
*/
#define MIN_COMMAND_TIMEOUT (500)
/**
* @brief Maxmal MQTT timeout value
*
*/
#define MAX_COMMAND_TIMEOUT (20000)
/**
* @brief Minimal TLS handshaking timeout value (unit: ms)
*
*/
#define QCLOUD_IOT_TLS_HANDSHAKE_TIMEOUT (5 * 1000)
/**
* @brief Enable repeat packet id filter
*
*/
#define MQTT_RMDUP_MSG_ENABLED
/**
* @brief Connect status of mqtt server
*
*/
typedef enum {
NOTCONNECTED = 0,
CONNECTED = 1,
} ConnStatus;
/**
* @brief mqtt connection mode
*
*/
typedef enum {
FIRST_CONNECT = 0,
RECONNECT = 1,
} ConnMode;
/*
* @brief Subscribe status of topic.
*
*/
typedef enum {
SUB_ACK_NOT_RECEIVED = 0,
SUB_ACK_RECEIVED = 1,
} SubStatus;
/**
* @brief data structure for topic subscription handle
*/
typedef struct {
char *topic_filter; /**< topic name, wildcard filter is supported */
SubStatus status; /**< status of sub handle */
SubscribeParams params; /**< params needed to subscribe */
} SubTopicHandle;
/**
* @brief MQTT QCloud IoT Client structure
*
*/
typedef struct {
DeviceInfo *device_info; /**< device info needed to connect server */
uint16_t next_packet_id; /**< MQTT random packet id */
uint32_t command_timeout_ms; /**< MQTT command timeout, unit:ms */
uint8_t write_buf[QCLOUD_IOT_MQTT_TX_BUF_LEN]; /**< MQTT write buffer */
uint8_t read_buf[QCLOUD_IOT_MQTT_RX_BUF_LEN]; /**< MQTT read buffer */
size_t write_buf_size; /**< size of MQTT write buffer */
size_t read_buf_size; /**< size of MQTT read buffer */
MQTTEventHandler event_handle; /**< callback for MQTT event */
uint8_t auto_connect_enable; /**< enable auto connection or not */
uint8_t default_subscribe; /**< no subscribe packet send, only add subhandle */
void *lock_generic; /**< mutex/lock for this client struture */
void *lock_write_buf; /**< mutex/lock for write buffer */
void *list_pub_wait_ack; /**< puback waiting list */
void *list_sub_wait_ack; /**< suback waiting list */
char host_addr[HOST_STR_LENGTH]; /**< MQTT server host */
MQTTGetNextHostIp get_next_host_ip; // get host ip
const char *main_host;
const char *backup_host; // if host not connect will try this.
IotNetwork network_stack; /**< MQTT network stack */
MQTTPacketConnectOption options; /**< handle to connection parameters */
char conn_id[MAX_CONN_ID_LEN]; /**< connect id */
SubTopicHandle sub_handles[MAX_MESSAGE_HANDLERS]; /**< subscription handle array */
Timer ping_timer; /**< MQTT ping timer */
Timer reconnect_delay_timer; /**< MQTT reconnect delay timer */
uint8_t was_manually_disconnected; /**< was disconnect by server or device */
uint8_t is_ping_outstanding; /**< 1 = ping request is sent while ping response not arrived yet */
uint32_t current_reconnect_wait_interval; /**< unit:ms */
uint8_t is_connected; /**< is connected or not */
uint32_t counter_network_disconnected; /**< number of disconnection*/
#ifdef MQTT_RMDUP_MSG_ENABLED
#define MQTT_MAX_REPEAT_BUF_LEN 10
uint16_t repeat_packet_id_buf[MQTT_MAX_REPEAT_BUF_LEN]; /**< repeat packet id buffer */
unsigned int current_packet_id_cnt; /**< index of packet id buffer */
#endif
} QcloudIotClient;
/**
* @brief topic publish info
*
*/
typedef struct {
uint8_t *buf; /**< msg buffer */
uint32_t len; /**< msg length */
uint16_t packet_id; /**< packet id */
Timer pub_start_time; /**< timer for puback waiting */
} QcloudIotPubInfo;
/**
* @brief topic subscribe/unsubscribe info
*
*/
typedef struct {
uint8_t *buf; /**< msg buffer */
uint16_t len; /**< msg length */
MQTTPacketType type; /**< type: sub or unsub */
uint16_t packet_id; /**< packet id */
Timer sub_start_time; /**< timer for suback waiting */
SubTopicHandle handler; /**< handle of topic subscribed(unsubscribed) */
} QcloudIotSubInfo;
/**************************************************************************************
* common
**************************************************************************************/
/**
* @brief Get the next packet id object.
*
* @param[in,out] client pointer to mqtt client
* @return packet id
*/
uint16_t get_next_packet_id(QcloudIotClient *client);
/**
* @brief Get the next conn id object.
*
* @param[out] conn_id buffer to save conn id
*/
void get_next_conn_id(char *conn_id);
/**
* @brief Set the client conn state object.
*
* @param[in,out] client pointer to mqtt client
* @param[in] connected connect status, @see ConnStatus
*/
void set_client_conn_state(QcloudIotClient *client, uint8_t connected);
/**
* @brief Get the client conn state object.
*
* @param[in,out] client
* @return @see ConnStatus
*/
uint8_t get_client_conn_state(QcloudIotClient *client);
/**
* @brief Send mqtt packet, timeout = command_timeout_ms.
*
* @param[in,out] client pointer to mqtt client
* @param[in] length length of data to be sent, data is saved in client write_buf
* @return @see IotReturnCode
*/
int send_mqtt_packet(QcloudIotClient *client, size_t length);
/**************************************************************************************
* connect
**************************************************************************************/
/**
* @brief Connect MQTT server.
*
* @param[in,out] client pointer to mqtt client
* @param[in] mode when connect error. Choose different connection strategies according to the mode
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_connect(QcloudIotClient *client, ConnMode mode);
/**
* @brief Reconnect MQTT server.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_attempt_reconnect(QcloudIotClient *client);
/**
* @brief Disconnect MQTT server.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_disconnect(QcloudIotClient *client);
/**
* @brief Serialize and send pingreq packet.
*
* @param[in,out] client pointer to mqtt client
* @param[in] try_times if failed, retry times
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_pingreq(QcloudIotClient *client, int try_times);
/**************************************************************************************
* publish
**************************************************************************************/
/**
* @brief Serialize and send publish packet.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] topic_name topic to publish
* @param[in] params publish params
* @return >=0 for packet id, < 0 for failed @see IotReturnCode
*/
int qcloud_iot_mqtt_publish(QcloudIotClient *client, const char *topic_name, const PublishParams *params);
/**
* @brief Deserialize publish packet and deliver_message.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] params publish params
* @return >=0 for packet id, < 0 for failed @see IotReturnCode
*/
int qcloud_iot_mqtt_handle_publish(QcloudIotClient *client);
/**
* @brief Deserialize puback packet.
*
* @param[in,out] client pointer to mqtt_client
* @return 0 for success.
*/
int qcloud_iot_mqtt_handle_puback(QcloudIotClient *client);
/**
* @brief Process puback waiting timout.
*
* @param[in,out] client pointer to mqtt_client
*/
void qcloud_iot_mqtt_check_pub_timeout(QcloudIotClient *client);
/**************************************************************************************
* subscribe
**************************************************************************************/
/**
* @brief Serialize and send subscribe packet.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic to subscribe
* @param[in] params subscribe params
* @return >=0 for packet id, < 0 for failed @see IotReturnCode
*/
int qcloud_iot_mqtt_subscribe(QcloudIotClient *client, const char *topic_filter, const SubscribeParams *params);
/**
* @brief Deserialize suback packet and return sub result.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_handle_suback(QcloudIotClient *client);
/**
* @brief Serialize and send unsubscribe packet.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic to unsubscribe
* @return >=0 packet id, < 0 for failed @see IotReturnCode
*/
int qcloud_iot_mqtt_unsubscribe(QcloudIotClient *client, const char *topic_filter);
/**
* @brief Deserialize unsuback packet and remove from list.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_handle_unsuback(QcloudIotClient *client);
/**
* @brief Process suback waiting timeout.
*
* @param[in,out] client pointer to mqtt client
*/
void qcloud_iot_mqtt_check_sub_timeout(QcloudIotClient *client);
/**
* @brief Resubscribe topic when reconnect.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_resubscribe(QcloudIotClient *client);
/**
* @brief Return if topic is sub ready.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic filter
* @return true for ready
* @return false for not ready
*/
bool qcloud_iot_mqtt_is_sub_ready(QcloudIotClient *client, const char *topic_filter);
/**
* @brief Get usr data, usr should handle lock/unlock usrdata itself in callback and caller.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic filter
* @return NULL or user data
*/
void *qcloud_iot_mqtt_get_subscribe_usr_data(QcloudIotClient *client, const char *topic_filter);
/**
* @brief Clear sub handle array.
*
* @param[in,out] client pointer to mqtt client
*/
void qcloud_iot_mqtt_sub_handle_array_clear(QcloudIotClient *client);
/**
* @brief Clear suback wait list and clear sub handle.
*
* @param[in,out] client pointer to mqtt client
*/
void qcloud_iot_mqtt_suback_wait_list_clear(QcloudIotClient *client);
/**************************************************************************************
* yield
**************************************************************************************/
/**
* @brief Check connection and keep alive state, read/handle MQTT message in synchronized way.
*
* @param[in,out] client pointer to mqtt client
* @param[in] timeout_ms timeout value (unit: ms) for this operation
*
* @return QCLOUD_RET_SUCCESS when success, QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT when try reconnecting, others @see
* IotReturnCode
*/
int qcloud_iot_mqtt_yield(QcloudIotClient *client, uint32_t timeout_ms);
/**
* @brief Wait read specific mqtt packet, such as connack.
*
* @param[in,out] client pointer to mqtt client
* @param[in] packet_type packet type except to read
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_wait_for_read(QcloudIotClient *client, uint8_t packet_type);
#ifdef __cplusplus
}
#endif
#endif // IOT_HUB_DEVICE_C_SDK_SERVICES_COMMON_MQTT_CLIENT_INC_MQTT_CLIENT_H_

View File

@@ -0,0 +1,306 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file mqtt_sample.c
* @brief a simple sample for mqtt client
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-05-31
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-05-31 <td>1.0 <td>fancyxu <td>first commit
* <tr><td>2021-07-08 <td>1.1 <td>fancyxu <td>fix code standard of IotReturnCode and QcloudIotClient
* </table>
*/
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "qcloud_iot_common.h"
#include "utils_log.h"
/**
* @brief MQTT event callback, @see MQTTEventHandleFun
*
* @param[in] client pointer to mqtt client
* @param[in] handle_context context
* @param[in] msg msg
*/
static void _mqtt_event_handler(void *client, void *handle_context, MQTTEventMsg *msg)
{
MQTTMessage *mqtt_message = (MQTTMessage *)msg->msg;
uintptr_t packet_id = (uintptr_t)msg->msg;
switch (msg->event_type) {
case MQTT_EVENT_UNDEF:
Log_i("undefined event occur.");
break;
case MQTT_EVENT_DISCONNECT:
Log_i("MQTT disconnect.");
break;
case MQTT_EVENT_RECONNECT:
Log_i("MQTT reconnect.");
break;
case MQTT_EVENT_PUBLISH_RECEIVED:
Log_i("topic message arrived but without any related handle: topic=%.*s, topic_msg=%.*s",
mqtt_message->topic_len, STRING_PTR_PRINT_SANITY_CHECK(mqtt_message->topic_name),
mqtt_message->payload_len, STRING_PTR_PRINT_SANITY_CHECK((char *)mqtt_message->payload));
break;
case MQTT_EVENT_SUBSCRIBE_SUCCESS:
Log_i("subscribe success, packet-id=%u", (unsigned int)packet_id);
break;
case MQTT_EVENT_SUBSCRIBE_TIMEOUT:
Log_i("subscribe wait ack timeout, packet-id=%u", (unsigned int)packet_id);
break;
case MQTT_EVENT_SUBSCRIBE_NACK:
Log_i("subscribe nack, packet-id=%u", (unsigned int)packet_id);
break;
case MQTT_EVENT_UNSUBSCRIBE_SUCCESS:
Log_i("unsubscribe success, packet-id=%u", (unsigned int)packet_id);
break;
case MQTT_EVENT_UNSUBSCRIBE_TIMEOUT:
Log_i("unsubscribe timeout, packet-id=%u", (unsigned int)packet_id);
break;
case MQTT_EVENT_UNSUBSCRIBE_NACK:
Log_i("unsubscribe nack, packet-id=%u", (unsigned int)packet_id);
break;
case MQTT_EVENT_PUBLISH_SUCCESS:
Log_i("publish success, packet-id=%u", (unsigned int)packet_id);
break;
case MQTT_EVENT_PUBLISH_TIMEOUT:
Log_i("publish timeout, packet-id=%u", (unsigned int)packet_id);
break;
case MQTT_EVENT_PUBLISH_NACK:
Log_i("publish nack, packet-id=%u", (unsigned int)packet_id);
break;
default:
Log_i("Should NOT arrive here.");
break;
}
}
/**
* @brief Setup MQTT construct parameters.
*
* @param[in,out] initParams @see MQTTInitParams
* @param[in] device_info @see DeviceInfo
*/
static void _setup_connect_init_params(MQTTInitParams *init_params, DeviceInfo *device_info)
{
init_params->device_info = device_info;
init_params->command_timeout = QCLOUD_IOT_MQTT_COMMAND_TIMEOUT;
init_params->keep_alive_interval_ms = QCLOUD_IOT_MQTT_KEEP_ALIVE_INTERNAL;
init_params->auto_connect_enable = 1;
init_params->event_handle.h_fp = _mqtt_event_handler;
init_params->event_handle.context = NULL;
}
/**
* @brief Publish a test mqtt message.
*
* @param[in, out] client pointer to mqtt client
* @param[in] topic_keyword topic keyword
* @param[in] qos qos to publish
* @return packet id (>=0) when success, or err code (<0) @see IotReturnCode
*/
static int _publish_test_msg(void *client, const char *topic_keyword, QoS qos)
{
char topic_name[MAX_SIZE_OF_CLOUD_TOPIC] = {0};
DeviceInfo *dev_info = IOT_MQTT_GetDeviceInfo(client);
HAL_Snprintf(topic_name, sizeof(topic_name), "%s/%s/%s", STRING_PTR_PRINT_SANITY_CHECK(dev_info->product_id),
STRING_PTR_PRINT_SANITY_CHECK(dev_info->device_name), STRING_PTR_PRINT_SANITY_CHECK(topic_keyword));
static int test_count = 0;
PublishParams pub_params = DEFAULT_PUB_PARAMS;
pub_params.qos = qos;
char publish_content[128] = {0};
HAL_Snprintf(publish_content, sizeof(publish_content), "{\"action\": \"publish_test\", \"count\": \"%d\"}",
test_count++);
pub_params.payload = publish_content;
pub_params.payload_len = strlen(publish_content);
return IOT_MQTT_Publish(client, topic_name, &pub_params);
}
/**
* @brief Callback when MQTT msg arrives @see OnMessageHandler
*
* @param[in, out] client pointer to mqtt client
* @param[in] message publish message from server
* @param[in] usr_data user data of SubscribeParams, @see SubscribeParams
*/
static void _on_message_callback(void *client, const MQTTMessage *message, void *user_data)
{
if (!message) {
return;
}
Log_i("Receive Message With topicName:%.*s, payload:%.*s", message->topic_len,
STRING_PTR_PRINT_SANITY_CHECK(message->topic_name), message->payload_len,
STRING_PTR_PRINT_SANITY_CHECK(message->payload_str));
}
/**
* @brief Subscribe MQTT topic and wait for sub result.
*
* @param[in, out] client pointer to mqtt client
* @param[in] topic_keyword topic keyword
* @param[in] qos qos to subscribe
* @return @see IotReturnCode
*/
static int _subscribe_topic_wait_result(void *client, char *topic_keyword, QoS qos)
{
char topic_name[MAX_SIZE_OF_CLOUD_TOPIC] = {0};
DeviceInfo *dev_info = IOT_MQTT_GetDeviceInfo(client);
HAL_Snprintf(topic_name, sizeof(topic_name), "%s/%s/%s", STRING_PTR_PRINT_SANITY_CHECK(dev_info->product_id),
STRING_PTR_PRINT_SANITY_CHECK(dev_info->device_name), STRING_PTR_PRINT_SANITY_CHECK(topic_keyword));
SubscribeParams sub_params = DEFAULT_SUB_PARAMS;
sub_params.qos = qos;
sub_params.on_message_handler = _on_message_callback;
return IOT_MQTT_SubscribeSync(client, topic_name, &sub_params);
}
/**
* @brief Unsubscribe MQTT topic.
*
* @param[in, out] client pointer to mqtt client
* @param[in] topic_keyword topic keyword
* @return @see IotReturnCode
*/
static int _unsubscribe_topic(void *client, char *topic_keyword)
{
char topic_name[MAX_SIZE_OF_CLOUD_TOPIC] = {0};
DeviceInfo *dev_info = IOT_MQTT_GetDeviceInfo(client);
HAL_Snprintf(topic_name, sizeof(topic_name), "%s/%s/%s", STRING_PTR_PRINT_SANITY_CHECK(dev_info->product_id),
STRING_PTR_PRINT_SANITY_CHECK(dev_info->device_name), STRING_PTR_PRINT_SANITY_CHECK(topic_keyword));
int rc = IOT_MQTT_Unsubscribe(client, topic_name);
if (rc < 0) {
Log_e("MQTT unsubscribe FAILED: %d", rc);
return rc;
}
return IOT_MQTT_Yield(client, 500); // wait for unsuback
}
// ----------------------------------------------------------------------------
// Main
// ----------------------------------------------------------------------------
static int sg_main_exit = 0;
#ifdef __linux__
#include <signal.h>
#include <pthread.h>
#include <unistd.h>
static void _main_exit(int sig)
{
Log_e("demo exit by signal:%d\n", sig);
sg_main_exit = 1;
}
#endif
int main(int argc, char **argv)
{
#ifdef __linux__
signal(SIGINT, _main_exit);
#endif
int rc;
// init log level
LogHandleFunc func = {0};
func.log_malloc = HAL_Malloc;
func.log_free = HAL_Free;
func.log_get_current_time_str = HAL_Timer_Current;
func.log_printf = HAL_Printf;
utils_log_init(func, LOG_LEVEL_DEBUG, 2048);
DeviceInfo device_info;
rc = HAL_GetDevInfo((void *)&device_info);
if (rc) {
Log_e("get device info failed: %d", rc);
return rc;
}
// init connection
MQTTInitParams init_params = DEFAULT_MQTT_INIT_PARAMS;
_setup_connect_init_params(&init_params, &device_info);
// create MQTT client and connect with server
void *client = IOT_MQTT_Construct(&init_params);
if (client) {
Log_i("Cloud Device Construct Success");
} else {
Log_e("MQTT Construct failed!");
return QCLOUD_ERR_FAILURE;
}
// subscribe normal topics and wait result
rc = _subscribe_topic_wait_result(client, "data", QOS0);
if (rc) {
Log_e("Client Subscribe Topic Failed: %d", rc);
return rc;
}
do {
rc = _publish_test_msg(client, "data", QOS1);
if (rc < 0) {
Log_e("client publish topic failed :%d.", rc);
}
rc = IOT_MQTT_Yield(client, 2000);
if (rc == QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT) {
HAL_SleepMs(1000);
continue;
} else if (rc != QCLOUD_RET_SUCCESS && rc != QCLOUD_RET_MQTT_RECONNECTED) {
Log_e("exit with error: %d", rc);
break;
}
} while (!sg_main_exit);
rc = _unsubscribe_topic(client, "data");
rc |= IOT_MQTT_Destroy(&client);
utils_log_deinit();
return rc;
}

View File

@@ -0,0 +1,592 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file mqtt_client.c
* @brief
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-05-28
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-05-28 <td>1.0 <td>fancyxu <td>first commit
* <tr><td>2021-07-07 <td>1.1 <td>fancyxu <td>support user host for unittest
* <tr><td>2021-07-08 <td>1.1 <td>fancyxu <td>fix code standard of IotReturnCode and QcloudIotClient
* </table>
*/
#include "mqtt_client.h"
#define MQTT_VERSION_3_1_1 4
// 20 for timestamp length & delimiter
#define MAX_MQTT_CONNECT_USR_NAME_LEN (MAX_SIZE_OF_CLIENT_ID + QCLOUD_IOT_DEVICE_SDK_APPID_LEN + MAX_CONN_ID_LEN + 20)
#define MAX_MQTT_CONNECT_PASSWORD_LEN (51)
/**************************************************************************************
* static method
**************************************************************************************/
/**
* @brief Get random packet id, when start.
*
* @return packet id
*/
static uint16_t _get_random_start_packet_id(void)
{
srand(HAL_Timer_CurrentSec());
return rand() % 65536 + 1;
}
/**
* @brief Init list_pub_wait_ack and list_sub_wait_ack
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
static int _mqtt_client_list_init(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
UtilsListFunc func = {
.list_malloc = HAL_Malloc,
.list_free = HAL_Free,
.list_lock_init = HAL_MutexCreate,
.list_lock_deinit = HAL_MutexDestroy,
.list_lock = HAL_MutexLock,
.list_unlock = HAL_MutexUnlock,
};
client->list_pub_wait_ack = utils_list_create(func, MAX_REPUB_NUM);
if (!client->list_pub_wait_ack) {
Log_e("create pub wait list failed.");
goto error;
}
client->list_sub_wait_ack = utils_list_create(func, MAX_MESSAGE_HANDLERS);
if (!client->list_sub_wait_ack) {
Log_e("create sub wait list failed.");
goto error;
}
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
error:
utils_list_destroy(client->list_pub_wait_ack);
client->list_pub_wait_ack = NULL;
utils_list_destroy(client->list_sub_wait_ack);
client->list_sub_wait_ack = NULL;
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
/**
* @brief Init network, tcp(with AUTH_WITH_NO_TLS) or tls.
*
* @param[in,out] client pointer to mqtt client
*/
static void _mqtt_client_network_init(QcloudIotClient *client, const MQTTInitParams *params)
{
HAL_Snprintf(client->host_addr, HOST_STR_LENGTH, "%s.%s", client->device_info->product_id,
params->host ? params->host : QCLOUD_IOT_MQTT_DIRECT_DOMAIN);
client->main_host = params->host;
client->backup_host = params->backup_host;
client->get_next_host_ip = params->get_next_host_ip;
#ifndef AUTH_WITH_NO_TLS
// device param for TLS connection
#ifdef AUTH_MODE_CERT
Log_d("cert file: %s", STRING_PTR_PRINT_SANITY_CHECK(client->device_info->cert_file));
Log_d("key file: %s", STRING_PTR_PRINT_SANITY_CHECK(client->device_info->key_file));
client->network_stack.ssl_connect_params.cert_file = client->device_info->cert_file;
client->network_stack.ssl_connect_params.key_file = client->device_info->key_file;
client->network_stack.ssl_connect_params.ca_crt = iot_ca_get();
client->network_stack.ssl_connect_params.ca_crt_len = strlen(client->network_stack.ssl_connect_params.ca_crt);
#else
client->network_stack.ssl_connect_params.psk = (char *)client->device_info->device_secret_decode;
client->network_stack.ssl_connect_params.psk_length = client->device_info->device_secret_decode_len;
client->network_stack.ssl_connect_params.psk_id = client->device_info->client_id;
client->network_stack.ssl_connect_params.ca_crt = NULL;
client->network_stack.ssl_connect_params.ca_crt_len = 0;
#endif
client->network_stack.host = client->host_addr;
client->network_stack.port = MQTT_SERVER_PORT_TLS;
client->network_stack.ssl_connect_params.timeout_ms = client->command_timeout_ms > QCLOUD_IOT_TLS_HANDSHAKE_TIMEOUT
? client->command_timeout_ms
: QCLOUD_IOT_TLS_HANDSHAKE_TIMEOUT;
client->network_stack.type = IOT_NETWORK_TYPE_TLS;
#else
client->network_stack.host = client->host_addr;
client->network_stack.port = MQTT_SERVER_PORT_NO_TLS;
client->network_stack.type = IOT_NETWORK_TYPE_TCP;
#endif
qcloud_iot_network_init(&(client->network_stack));
}
/**
* @brief Init mqtt connection options.
*
* @param[in,out] client pointer to mqtt client
* @param[in] params mqtt init params, @see MQTTInitParams
* @return int
*/
static int _mqtt_client_connect_option_init(QcloudIotClient *client, const MQTTInitParams *params)
{
IOT_FUNC_ENTRY;
int rc = 0;
long cur_timesec = 0;
client->options.mqtt_version = MQTT_VERSION_3_1_1;
// Upper limit of keep alive interval is (11.5 * 60) seconds
client->options.client_id = client->device_info->client_id;
client->options.keep_alive_interval =
params->keep_alive_interval_ms / 1000 > 690 ? 690 : params->keep_alive_interval_ms / 1000;
client->options.clean_session = params->clean_session;
// calculate user name & password
client->options.username = (char *)HAL_Malloc(MAX_MQTT_CONNECT_USR_NAME_LEN);
if (!client->options.username) {
Log_e("malloc username failed!");
rc = QCLOUD_ERR_MALLOC;
goto error;
}
cur_timesec =
(MAX_ACCESS_EXPIRE_TIMEOUT <= 0) ? 0x7fffffffL : (HAL_Timer_CurrentSec() + MAX_ACCESS_EXPIRE_TIMEOUT / 1000);
get_next_conn_id(client->conn_id);
HAL_Snprintf(client->options.username, MAX_MQTT_CONNECT_USR_NAME_LEN, "%s;%s;%s;%ld", client->options.client_id,
QCLOUD_IOT_DEVICE_SDK_APPID, client->conn_id, cur_timesec);
#if defined(AUTH_WITH_NO_TLS) && defined(AUTH_MODE_KEY)
char sign[41] = {0};
client->options.password = (char *)HAL_Malloc(MAX_MQTT_CONNECT_PASSWORD_LEN);
if (!client->options.password) {
Log_e("malloc password failed!");
rc = QCLOUD_ERR_MALLOC;
goto error;
}
utils_hmac_sha1(client->options.username, strlen(client->options.username),
client->device_info->device_secret_decode, client->device_info->device_secret_decode_len, sign);
HAL_Snprintf(client->options.password, MAX_MQTT_CONNECT_PASSWORD_LEN, "%s;hmacsha1", sign);
#endif
IOT_FUNC_EXIT_RC(rc);
error:
HAL_Free(client->options.username);
client->options.username = NULL;
HAL_Free(client->options.password);
client->options.password = NULL;
IOT_FUNC_EXIT_RC(rc);
}
/**
* @brief Init mqtt client.
*
* @param[in,out] client pointer to mqtt client
* @param[in] params mqtt init params, @see MQTTInitParams
* @return @see IotReturnCode
*
* @note
* 1. init device info.
* 2. init mutex and var
* 3. init list @see _mqtt_client_list_init
* 4. init connect option @see _mqtt_client_connect_option_init
* 5. init network @see _mqtt_client_network_init
*/
static int _qcloud_iot_mqtt_client_init(QcloudIotClient *client, const MQTTInitParams *params)
{
IOT_FUNC_ENTRY;
int rc = 0;
memset(client, 0x0, sizeof(QcloudIotClient));
// set device info
client->device_info = params->device_info;
rc = HAL_Snprintf(client->device_info->client_id, MAX_SIZE_OF_CLIENT_ID, "%s%s", client->device_info->product_id,
client->device_info->device_name);
#ifdef AUTH_MODE_KEY
utils_base64decode(client->device_info->device_secret_decode, MAX_SIZE_OF_DECODE_PSK_LENGTH,
&client->device_info->device_secret_decode_len, (uint8_t *)client->device_info->device_secret,
strlen(client->device_info->device_secret));
#endif
Log_i("SDK_Ver: %s, Product_ID: %s, Device_Name: %s", QCLOUD_IOT_DEVICE_SDK_VERSION,
client->device_info->product_id, client->device_info->device_name);
// command timeout should be in range [MIN_COMMAND_TIMEOUT, MAX_COMMAND_TIMEOUT]
client->command_timeout_ms =
(params->command_timeout < MIN_COMMAND_TIMEOUT)
? MIN_COMMAND_TIMEOUT
: (params->command_timeout > MAX_COMMAND_TIMEOUT ? MAX_COMMAND_TIMEOUT : params->command_timeout);
// packet id, random from [1 - 65536]
client->next_packet_id = _get_random_start_packet_id();
client->write_buf_size = QCLOUD_IOT_MQTT_TX_BUF_LEN;
client->read_buf_size = QCLOUD_IOT_MQTT_RX_BUF_LEN;
client->event_handle = params->event_handle;
client->auto_connect_enable = params->auto_connect_enable;
client->default_subscribe = params->default_subscribe;
client->lock_generic = HAL_MutexCreate();
if (!client->lock_generic) {
goto error;
}
client->lock_write_buf = HAL_MutexCreate();
if (!client->lock_write_buf) {
goto error;
}
rc = _mqtt_client_list_init(client);
if (rc) {
goto error;
}
rc = _mqtt_client_connect_option_init(client, params);
if (rc) {
goto error;
}
_mqtt_client_network_init(client, params);
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
error:
HAL_MutexDestroy(client->lock_generic);
client->lock_generic = NULL;
HAL_MutexDestroy(client->lock_write_buf);
client->lock_write_buf = NULL;
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
/**
* @brief Deinit mqtt client.
*
* @param[in,out] client pointer to mqtt client
*/
static void _qcloud_iot_mqtt_client_deinit(QcloudIotClient *client)
{
HAL_Free(client->options.username);
HAL_Free(client->options.password);
HAL_MutexDestroy(client->lock_generic);
HAL_MutexDestroy(client->lock_write_buf);
qcloud_iot_mqtt_sub_handle_array_clear(client);
qcloud_iot_mqtt_suback_wait_list_clear(client);
utils_list_destroy(client->list_pub_wait_ack);
utils_list_destroy(client->list_sub_wait_ack);
Log_i("release mqtt client resources");
}
/**************************************************************************************
* API
**************************************************************************************/
/**
* @brief Create MQTT client and connect to MQTT server.
*
* @param[in] params MQTT init parameters
* @return a valid MQTT client handle when success, or NULL otherwise
*/
void *IOT_MQTT_Construct(const MQTTInitParams *params)
{
POINTER_SANITY_CHECK(params, NULL);
POINTER_SANITY_CHECK(params->device_info, NULL);
STRING_PTR_SANITY_CHECK(params->device_info->product_id, NULL);
STRING_PTR_SANITY_CHECK(params->device_info->device_name, NULL);
#ifdef AUTH_MODE_CERT
STRING_PTR_SANITY_CHECK(params->device_info->dev_cert_file_name, NULL);
STRING_PTR_SANITY_CHECK(params->device_info->dev_key_file_name, NULL);
#else
STRING_PTR_SANITY_CHECK(params->device_info->device_secret, NULL);
#endif
int rc = 0;
QcloudIotClient *client = NULL;
// create and init MQTTClient
client = (QcloudIotClient *)HAL_Malloc(sizeof(QcloudIotClient));
if (!client) {
Log_e("malloc MQTTClient failed");
return NULL;
}
rc = _qcloud_iot_mqtt_client_init(client, params);
if (rc) {
Log_e("mqtt init failed: %d", rc);
goto exit;
}
if (!params->connect_when_construct) {
return client;
}
rc = qcloud_iot_mqtt_connect(client, FIRST_CONNECT);
if (rc) {
Log_e("mqtt connect with id: %s failed: %d", STRING_PTR_PRINT_SANITY_CHECK(client->conn_id), rc);
goto exit;
}
Log_i("mqtt connect with id: %s success", client->conn_id);
return client;
exit:
_qcloud_iot_mqtt_client_deinit(client);
HAL_Free(client);
return NULL;
}
/**
* @brief Connect Mqtt server if not connect.
*
* @param[in,out] client pointer to mqtt client pointer, should using the pointer of IOT_MQTT_Construct return.
* @return @see IotReturnCode
*/
int IOT_MQTT_Connect(void *client)
{
POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
if (!get_client_conn_state(client)) {
IOT_FUNC_EXIT_RC(qcloud_iot_mqtt_connect(client, FIRST_CONNECT));
}
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
/**
* @brief Close connection and destroy MQTT client.
*
* @param[in,out] client pointer to mqtt client pointer, should using the pointer of IOT_MQTT_Construct return.
* @return @see IotReturnCode
*/
int IOT_MQTT_Destroy(void **client)
{
POINTER_SANITY_CHECK(*client, QCLOUD_ERR_INVAL);
QcloudIotClient *mqtt_client = (QcloudIotClient *)(*client);
int rc = qcloud_iot_mqtt_disconnect(mqtt_client);
if (rc) {
// disconnect network stack by force
mqtt_client->network_stack.disconnect(&(mqtt_client->network_stack));
set_client_conn_state(mqtt_client, NOTCONNECTED);
}
_qcloud_iot_mqtt_client_deinit(mqtt_client);
HAL_Free(*client);
*client = NULL;
Log_i("mqtt release!");
return rc;
}
/**
* @brief Check connection and keep alive state, read/handle MQTT packet in synchronized way.
*
* @param[in,out] client pointer to mqtt client
* @param[in] timeout_ms timeout value (unit: ms) for this operation
* @return QCLOUD_RET_SUCCESS when success, QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT when try reconnecting, others @see
* IotReturnCode
*/
int IOT_MQTT_Yield(void *client, uint32_t timeout_ms)
{
POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QcloudIotClient *mqtt_client = (QcloudIotClient *)client;
return qcloud_iot_mqtt_yield(mqtt_client, timeout_ms);
}
/**
* @brief Publish MQTT message.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_name topic to publish
* @param[in] params @see PublishParams
* @return packet id (>=0) when success, or err code (<0) @see IotReturnCode
*/
int IOT_MQTT_Publish(void *client, const char *topic_name, const PublishParams *params)
{
POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
POINTER_SANITY_CHECK(params, QCLOUD_ERR_INVAL);
STRING_PTR_SANITY_CHECK(topic_name, QCLOUD_ERR_INVAL);
QcloudIotClient *mqtt_client = (QcloudIotClient *)client;
if (!get_client_conn_state(client)) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
if (strlen(topic_name) > MAX_SIZE_OF_CLOUD_TOPIC) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MAX_TOPIC_LENGTH);
}
if (QOS2 == params->qos) {
Log_e("QoS2 is not supported currently");
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_QOS_NOT_SUPPORT);
}
return qcloud_iot_mqtt_publish(mqtt_client, topic_name, params);
}
/**
* @brief Subscribe MQTT topic.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic filter to subscribe
* @param[in] params @see SubscribeParams
* @return packet id (>=0) when success, or err code (<0) @see IotReturnCode
*/
int IOT_MQTT_Subscribe(void *client, const char *topic_filter, const SubscribeParams *params)
{
POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
POINTER_SANITY_CHECK(params, QCLOUD_ERR_INVAL);
STRING_PTR_SANITY_CHECK(topic_filter, QCLOUD_ERR_INVAL);
QcloudIotClient *mqtt_client = (QcloudIotClient *)client;
if (!get_client_conn_state(client) && !mqtt_client->default_subscribe) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
if (strlen(topic_filter) > MAX_SIZE_OF_CLOUD_TOPIC) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MAX_TOPIC_LENGTH);
}
if (QOS2 == params->qos) {
Log_e("QoS2 is not supported currently");
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_QOS_NOT_SUPPORT);
}
return qcloud_iot_mqtt_subscribe(mqtt_client, topic_filter, params);
}
/**
* @brief Unsubscribe MQTT topic.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic filter to unsubscribe
* @return packet id (>=0) when success, or err code (<0) @see IotReturnCode
*/
int IOT_MQTT_Unsubscribe(void *client, const char *topic_filter)
{
POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
STRING_PTR_SANITY_CHECK(topic_filter, QCLOUD_ERR_INVAL);
QcloudIotClient *mqtt_client = (QcloudIotClient *)client;
if (!get_client_conn_state(client)) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
if (strlen(topic_filter) > MAX_SIZE_OF_CLOUD_TOPIC) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MAX_TOPIC_LENGTH);
}
return qcloud_iot_mqtt_unsubscribe(mqtt_client, topic_filter);
}
/**
* @brief Check if MQTT topic has been subscribed or not
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic filter to subscribe
* @return true already subscribed
* @return false not ready
*/
bool IOT_MQTT_IsSubReady(void *client, const char *topic_filter)
{
POINTER_SANITY_CHECK(client, false);
STRING_PTR_SANITY_CHECK(topic_filter, false);
QcloudIotClient *mqtt_client = (QcloudIotClient *)client;
return qcloud_iot_mqtt_is_sub_ready(mqtt_client, topic_filter);
}
/**
* @brief Get user data in subscribe.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic filter to subscribe
* @return NULL or user data
*/
void *IOT_MQTT_GetSubUsrData(void *client, const char *topic_filter)
{
POINTER_SANITY_CHECK(client, NULL);
STRING_PTR_SANITY_CHECK(topic_filter, NULL);
QcloudIotClient *mqtt_client = (QcloudIotClient *)client;
return qcloud_iot_mqtt_get_subscribe_usr_data(mqtt_client, topic_filter);
}
/**
* @brief Subscribe and wait sub ready.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic filter to subscribe
* @param[in] params @see SubscribeParams
* @return @see IotReturnCode
*/
int IOT_MQTT_SubscribeSync(void *client, const char *topic_filter, const SubscribeParams *params)
{
POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
POINTER_SANITY_CHECK(params, QCLOUD_ERR_INVAL);
STRING_PTR_SANITY_CHECK(topic_filter, QCLOUD_ERR_INVAL);
int rc;
QcloudIotClient *mqtt_client = (QcloudIotClient *)client;
int cnt_sub = mqtt_client->command_timeout_ms / QCLOUD_IOT_MQTT_YIELD_TIMEOUT;
if (IOT_MQTT_IsSubReady(client, topic_filter)) {
// if already sub, free the user data
if (params->user_data_free) {
params->user_data_free(params->user_data);
}
return QCLOUD_RET_SUCCESS;
}
rc = IOT_MQTT_Subscribe(client, topic_filter, params);
if (rc < 0) {
Log_e("topic subscribe failed: %d, cnt: %d", rc, cnt_sub);
return rc;
}
while (cnt_sub-- >= 0 && rc >= 0 && !IOT_MQTT_IsSubReady(client, topic_filter)) {
/**
* @brief wait for subscription result
*
*/
rc = IOT_MQTT_Yield(client, QCLOUD_IOT_MQTT_YIELD_TIMEOUT);
}
return IOT_MQTT_IsSubReady(client, topic_filter) ? QCLOUD_RET_SUCCESS : QCLOUD_ERR_FAILURE;
}
/**
* @brief Check if MQTT is connected.
*
* @param[in,out] client pointer to mqtt client
* @return true connected
* @return false no connected
*/
bool IOT_MQTT_IsConnected(void *client)
{
POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
QcloudIotClient *mqtt_client = (QcloudIotClient *)client;
return get_client_conn_state(mqtt_client);
}
/**
* @brief Get device info using to connect mqtt server.
*
* @param[in,out] client pointer to mqtt client
* @return @see DeviceInfo
*/
DeviceInfo *IOT_MQTT_GetDeviceInfo(void *client)
{
POINTER_SANITY_CHECK(client, NULL);
QcloudIotClient *mqtt_client = (QcloudIotClient *)client;
return mqtt_client->device_info;
}

View File

@@ -0,0 +1,130 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file mqtt_client_common.c
* @brief common api
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-05-28
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-05-28 <td>1.0 <td>fancyxu <td>first commit
* <tr><td>2021-07-08 <td>1.1 <td>fancyxu <td>fix code standard of IotReturnCode and QcloudIotClient
* </table>
*/
#include "mqtt_client.h"
/**
* @brief Get the next packet id object.
*
* @param[in,out] client pointer to mqtt client
* @return packet id
*/
uint16_t get_next_packet_id(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
POINTER_SANITY_CHECK(client, QCLOUD_ERR_INVAL);
HAL_MutexLock(client->lock_generic);
client->next_packet_id = (uint16_t)((MAX_PACKET_ID == client->next_packet_id) ? 1 : (client->next_packet_id + 1));
HAL_MutexUnlock(client->lock_generic);
IOT_FUNC_EXIT_RC(client->next_packet_id);
}
/**
* @brief Get the next conn id object.
*
* @param[out] conn_id buffer to save conn id
*/
void get_next_conn_id(char *conn_id)
{
int i;
srand(HAL_Timer_CurrentSec());
for (i = 0; i < MAX_CONN_ID_LEN - 1; i++) {
int flag = rand() % 3;
switch (flag) {
case 0:
conn_id[i] = (rand() % 26) + 'a';
break;
case 1:
conn_id[i] = (rand() % 26) + 'A';
break;
case 2:
conn_id[i] = (rand() % 10) + '0';
break;
}
}
conn_id[MAX_CONN_ID_LEN - 1] = '\0';
}
/**
* @brief Set the client conn state object.
*
* @param[in,out] client pointer to mqtt client
* @param[in] connected connect status, @see ConnStatus
*/
void set_client_conn_state(QcloudIotClient *client, uint8_t connected)
{
HAL_MutexLock(client->lock_generic);
client->is_connected = connected;
HAL_MutexUnlock(client->lock_generic);
}
/**
* @brief Get the client conn state object.
*
* @param[in,out] client
* @return @see ConnStatus
*/
uint8_t get_client_conn_state(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
uint8_t is_connected = 0;
HAL_MutexLock(client->lock_generic);
is_connected = client->is_connected;
HAL_MutexUnlock(client->lock_generic);
IOT_FUNC_EXIT_RC(is_connected);
}
/**
* @brief Send mqtt packet, timeout = command_timeout_ms.
*
* @param[in,out] client pointer to mqtt client
* @param[in] length length of data to be sent, data is saved in client write_buf
* @return @see IotReturnCode
*/
int send_mqtt_packet(QcloudIotClient *client, size_t length)
{
IOT_FUNC_ENTRY;
int rc = QCLOUD_RET_SUCCESS;
size_t sent_len = 0;
if (length >= client->write_buf_size) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
rc = client->network_stack.write(&(client->network_stack), client->write_buf, length, client->command_timeout_ms,
&sent_len);
rc = QCLOUD_ERR_TCP_WRITE_TIMEOUT == rc ? QCLOUD_ERR_MQTT_REQUEST_TIMEOUT : rc;
IOT_FUNC_EXIT_RC(rc);
}

View File

@@ -0,0 +1,279 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file mqtt_client_connect.c
* @brief
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-05-28
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-05-28 <td>1.0 <td>fancyxu <td>first commit
* <tr><td>2021-07-08 <td>1.1 <td>fancyxu <td>fix code standard of IotReturnCode and QcloudIotClient
* </table>
*/
#include "mqtt_client.h"
/**
* @brief switch to domain mode
*
*/
typedef enum {
HOST_DOMAIN = 1,
BACKUP_DOMAIN = 0,
} DomainMode;
/**
* @brief Serialize and send connect packet.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
static int _mqtt_connect(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
uint8_t session_present, connack_rc;
int rc, packet_len;
// TCP or TLS network connect
rc = client->network_stack.connect(&(client->network_stack));
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
// send MQTT CONNECT packet
HAL_MutexLock(client->lock_write_buf);
packet_len = mqtt_connect_packet_serialize(client->write_buf, client->write_buf_size, &client->options);
if (packet_len > 0) {
rc = send_mqtt_packet(client, packet_len);
} else {
rc = packet_len == MQTT_ERR_SHORT_BUFFER ? QCLOUD_ERR_BUF_TOO_SHORT : QCLOUD_ERR_FAILURE;
}
HAL_MutexUnlock(client->lock_write_buf);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
// recv MQTT CONNACK packet
rc = qcloud_iot_mqtt_wait_for_read(client, CONNACK);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
rc = mqtt_connack_packet_deserialize(client->read_buf, client->read_buf_size, &session_present, &connack_rc);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
if (CONNACK_CONNECTION_ACCEPTED != connack_rc) {
IOT_FUNC_EXIT_RC(connack_rc);
}
// set connect state
set_client_conn_state(client, CONNECTED);
HAL_MutexLock(client->lock_generic);
client->was_manually_disconnected = client->is_ping_outstanding = 0;
HAL_Timer_Countdown(&client->ping_timer, client->options.keep_alive_interval);
HAL_MutexUnlock(client->lock_generic);
IOT_FUNC_EXIT_RC(rc);
}
/**
* @brief switch domain
*
* @param[in,out] client pointer to mqtt client
* @param[in] mode @see DomainMode
* @return @see IotReturnCode
*/
static void _mqtt_switch_domain(QcloudIotClient *client, DomainMode mode)
{
if (mode == BACKUP_DOMAIN && client->backup_host) {
HAL_Snprintf(client->host_addr, HOST_STR_LENGTH, "%s.%s", client->device_info->product_id, client->backup_host);
client->network_stack.host = client->host_addr;
} else if (mode == HOST_DOMAIN) {
HAL_Snprintf(client->host_addr, HOST_STR_LENGTH, "%s.%s", client->device_info->product_id, client->main_host);
client->network_stack.host = client->host_addr;
}
}
/**
* @brief set connect host
*
* @param[in,out] client pointer to mqtt client
* @param[in] srv_ip server ip string
* @return @see IotReturnCode
*/
static int _mqtt_set_srv_ip(QcloudIotClient *client, char *srv_ip)
{
int size = HAL_Snprintf(client->host_addr, HOST_STR_LENGTH, "%s", srv_ip);
if (size < 0 || size > HOST_STR_LENGTH - 1) {
Log_e("gen host name failed: %d", size);
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
} else {
client->network_stack.host = client->host_addr;
Log_i("using HOST IP : %s", srv_ip);
}
return QCLOUD_RET_SUCCESS;
}
/**
* @brief Connect MQTT server.
*
* @param[in,out] client pointer to mqtt client
* @param[in] mode when connect error. Choose different connection strategies according to the mode
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_connect(QcloudIotClient *client, ConnMode mode)
{
IOT_FUNC_ENTRY;
int rc = 0;
// check connection state first
if (get_client_conn_state(client)) {
IOT_FUNC_EXIT_RC(QCLOUD_RET_MQTT_ALREADY_CONNECTED);
}
int connect_fail_cnt = 0;
int max_connect_fail_switch_cnt = (mode == FIRST_CONNECT) ? 3 : 1;
while ((rc = _mqtt_connect(client))) {
client->network_stack.disconnect(&(client->network_stack));
connect_fail_cnt++;
if (connect_fail_cnt == max_connect_fail_switch_cnt && client->backup_host) {
_mqtt_switch_domain(client, BACKUP_DOMAIN);
continue;
}
if (connect_fail_cnt > max_connect_fail_switch_cnt) {
if (!client->get_next_host_ip) {
break;
}
char *srv_ip = client->get_next_host_ip();
if (!srv_ip) {
break;
}
_mqtt_set_srv_ip(client, srv_ip);
}
}
// In reconnection mode, the connection is successfully switched back to the primary domain name
if (!rc && connect_fail_cnt && mode == RECONNECT) {
_mqtt_switch_domain(client, HOST_DOMAIN);
}
IOT_FUNC_EXIT_RC(rc);
}
/**
* @brief Reconnect MQTT server.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_attempt_reconnect(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
int rc = 0;
Log_i("attempt to reconnect...");
if (get_client_conn_state(client)) {
IOT_FUNC_EXIT_RC(QCLOUD_RET_MQTT_ALREADY_CONNECTED);
}
rc = qcloud_iot_mqtt_connect(client, RECONNECT);
if (!get_client_conn_state(client)) {
IOT_FUNC_EXIT_RC(rc);
}
#if 0 // clean session is 0, user don't need resubscribe only if mqtt server error.
if (!client->options.clean_session) {
IOT_FUNC_EXIT_RC(QCLOUD_RET_MQTT_RECONNECTED);
}
#endif
rc = qcloud_iot_mqtt_resubscribe(client);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
IOT_FUNC_EXIT_RC(QCLOUD_RET_MQTT_RECONNECTED);
}
/**
* @brief Disconnect MQTT server.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_disconnect(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
int rc, packet_len = 0;
if (!get_client_conn_state(client)) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
HAL_MutexLock(client->lock_write_buf);
packet_len = mqtt_disconnect_packet_serialize(client->write_buf, client->write_buf_size);
if (packet_len > 0) {
rc = send_mqtt_packet(client, packet_len);
} else {
rc = packet_len == MQTT_ERR_SHORT_BUFFER ? QCLOUD_ERR_BUF_TOO_SHORT : QCLOUD_ERR_FAILURE;
}
HAL_MutexUnlock(client->lock_write_buf);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
client->network_stack.disconnect(&(client->network_stack));
set_client_conn_state(client, NOTCONNECTED);
client->was_manually_disconnected = 1;
Log_i("mqtt disconnect!");
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
/**
* @brief Serialize and send pingreq packet.
*
* @param[in,out] client pointer to mqtt client
* @param[in] try_times if failed, retry times
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_pingreq(QcloudIotClient *client, int try_times)
{
IOT_FUNC_ENTRY;
int rc, packet_len, i = 0;
HAL_MutexLock(client->lock_write_buf);
packet_len = mqtt_pingreq_packet_serialize(client->write_buf, client->write_buf_size);
if (packet_len > 0) {
do {
rc = send_mqtt_packet(client, packet_len);
} while (rc && (i++ < try_times));
} else {
rc = packet_len == MQTT_ERR_SHORT_BUFFER ? QCLOUD_ERR_BUF_TOO_SHORT : QCLOUD_ERR_FAILURE;
}
HAL_MutexUnlock(client->lock_write_buf);
IOT_FUNC_EXIT_RC(rc);
}

View File

@@ -0,0 +1,397 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file mqtt_client_publish.c
* @brief
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-05-28
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-05-28 <td>1.0 <td>fancyxu <td>first commit
* <tr><td>2021-07-08 <td>1.1 <td>fancyxu <td>fix code standard of IotReturnCode and QcloudIotClient
* </table>
*/
#include "mqtt_client.h"
/**
* @brief Push pub info to list for republish.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] packet_len packet len of publish packet
* @param[in] packet_id packet id
* @param[out] node list node
* @return @see IotReturnCode
*/
static int _push_pub_info_to_list(QcloudIotClient *client, int packet_len, uint16_t packet_id, void **node)
{
IOT_FUNC_ENTRY;
void * list = client->list_pub_wait_ack;
QcloudIotPubInfo *repub_info = NULL;
// construct republish info
repub_info = (QcloudIotPubInfo *)HAL_Malloc(sizeof(QcloudIotPubInfo) + packet_len);
if (!repub_info) {
Log_e("memory malloc failed!");
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
repub_info->buf = (uint8_t *)repub_info + sizeof(QcloudIotPubInfo);
repub_info->len = packet_len;
repub_info->packet_id = packet_id;
memcpy(repub_info->buf, client->write_buf, packet_len); // save the whole packet
HAL_Timer_CountdownMs(&repub_info->pub_start_time, client->command_timeout_ms);
// push republish info to list
*node = utils_list_push(list, repub_info);
if (!*node) {
HAL_Free(repub_info);
Log_e("list push failed! Check the list len!");
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
/**
* @brief Check pub wait list timeout.
*
* @param[in,out] list pointer to pub wait ack list.
* @param[in] node pointer to list node
* @param[in] val pointer to value, @see QcloudIotPubInfo
* @param[in] usr_data @see QcloudIotClient
* @return @see UtilsListResult
*/
static UtilsListResult _pub_wait_list_process_check_timeout(void *list, void *node, void *val, void *usr_data)
{
IOT_FUNC_ENTRY;
MQTTEventMsg msg;
QcloudIotPubInfo *repub_info = (QcloudIotPubInfo *)val;
QcloudIotClient * client = (QcloudIotClient *)usr_data;
// check the request if timeout or not
if (HAL_Timer_Remain(&repub_info->pub_start_time) > 0) {
IOT_FUNC_EXIT_RC(LIST_TRAVERSE_CONTINUE);
}
// notify timeout event
if (client->event_handle.h_fp) {
msg.event_type = MQTT_EVENT_PUBLISH_TIMEOUT;
msg.msg = (void *)(uintptr_t)repub_info->packet_id;
client->event_handle.h_fp(client, client->event_handle.context, &msg);
}
utils_list_remove(list, node);
IOT_FUNC_EXIT_RC(LIST_TRAVERSE_CONTINUE);
}
/**
* @brief Remove info from pub wait list.
*
* @param[in,out] list pointer to pub wait ack list.
* @param[in] node pointer to list node
* @param[in] val pointer to value, @see QcloudIotPubInfo
* @param[in] usr_data pointer to packet id
* @return @see UtilsListResult
*/
static UtilsListResult _pub_wait_list_process_remove_info(void *list, void *node, void *val, void *usr_data)
{
IOT_FUNC_ENTRY;
QcloudIotPubInfo *repub_info = (QcloudIotPubInfo *)val;
if (repub_info->packet_id == *((uint16_t *)usr_data)) {
utils_list_remove(list, node);
IOT_FUNC_EXIT_RC(LIST_TRAVERSE_BREAK);
}
IOT_FUNC_EXIT_RC(LIST_TRAVERSE_CONTINUE);
}
/**
* @brief Remove node signed with packet id from publish ACK wait list.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] packet_id packet id
*/
static void _remove_pub_info_from_list(QcloudIotClient *client, uint16_t packet_id)
{
utils_list_process(client->list_pub_wait_ack, LIST_HEAD, _pub_wait_list_process_remove_info, &packet_id);
}
/**
* @brief Check if topic match
*
* @param[in] topic_filter topic name filter, wildcard is supported
* @param[in] topic_name topic name, no wildcard
* @param[in] topic_name_len length of topic name
* @return 1 for matched, 0 for no matched
*
* @note assume topic filter and name is in correct format;
* # can only be at end;
* + and # can only be next to separator.
*/
static uint8_t _is_topic_matched(char *topic_filter, const char *topic_name, uint16_t topic_name_len)
{
char *curf = topic_filter;
char *curn = (char *)topic_name;
char *curn_end = curn + topic_name_len;
while (*curf && curn < curn_end) {
if (*curn == '/' && *curf != '/')
break;
if (*curf != '+' && *curf != '#' && *curf != *curn)
break;
if (*curf == '+') { // skip until we meet the next separator, or end of string
char *nextpos = curn + 1;
while (nextpos < curn_end && *nextpos != '/') nextpos = ++curn + 1;
} else if (*curf == '#')
curn = curn_end - 1; // skip until end of string
curf++;
curn++;
};
return (curn == curn_end) && (*curf == '\0');
}
/**
* @brief deliver the message to user callback
*
* @param[in,out] client pointer to mqtt client
* @param[in] message message to deliver, @see MQTTMessage
*/
static void _deliver_message(QcloudIotClient *client, MQTTMessage *message)
{
int i;
MQTTEventMsg msg;
HAL_MutexLock(client->lock_generic);
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
if (client->sub_handles[i].topic_filter &&
_is_topic_matched((char *)client->sub_handles[i].topic_filter, message->topic_name, message->topic_len)) {
if (client->sub_handles[i].params.on_message_handler) {
HAL_MutexUnlock(client->lock_generic);
// if found, then handle it, then return
client->sub_handles[i].params.on_message_handler(client, message,
client->sub_handles[i].params.user_data);
return;
}
}
}
HAL_MutexUnlock(client->lock_generic);
/* Message handler not found for topic */
/* May be we do not care change FAILURE use SUCCESS*/
Log_d("no matching any topic, call default handle function");
if (client->event_handle.h_fp) {
msg.event_type = MQTT_EVENT_PUBLISH_RECEIVED;
msg.msg = message;
client->event_handle.h_fp(client, client->event_handle.context, &msg);
}
}
#ifdef MQTT_RMDUP_MSG_ENABLED
/**
* @brief Check if packet id repeat.
*
* @param[in,out] client pointer to mqtt client
* @param[in] packet_id packet_id
* @return < 0 for failed.
*/
static int _get_packet_id_repeat_buf(QcloudIotClient *client, uint16_t packet_id)
{
int i;
for (i = 0; i < MQTT_MAX_REPEAT_BUF_LEN; ++i) {
if (packet_id == client->repeat_packet_id_buf[i]) {
return packet_id;
}
}
return -1;
}
/**
* @brief Add packet id, if buf full, then overwrite.
*
* @param[in,out] client pointer to mqtt client
* @param[in] packet_id packet_id
*/
static void _add_packet_id_to_repeat_buf(QcloudIotClient *client, uint16_t packet_id)
{
if (_get_packet_id_repeat_buf(client, packet_id) >= 0) {
return;
}
client->repeat_packet_id_buf[client->current_packet_id_cnt++] = packet_id;
client->current_packet_id_cnt %= MQTT_MAX_REPEAT_BUF_LEN;
}
#endif
/**
* @brief Serialize and send publish packet.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] topic_name topic to publish
* @param[in] params publish params
* @return >=0 for packet id, < 0 for failed @see IotReturnCode
*/
int qcloud_iot_mqtt_publish(QcloudIotClient *client, const char *topic_name, const PublishParams *params)
{
IOT_FUNC_ENTRY;
int rc, packet_len;
MQTTPublishFlags flags;
void * node = NULL;
uint16_t packet_id = 0;
if (params->qos > QOS0) {
packet_id = get_next_packet_id(client);
}
Log_d("publish qos=%d|packet_id=%d|topic_name=%s|payload=%s", params->qos, packet_id, topic_name,
STRING_PTR_PRINT_SANITY_CHECK((char *)params->payload));
flags.dup = params->dup;
flags.qos = params->qos;
flags.retain = params->retain;
// serialize packet
HAL_MutexLock(client->lock_write_buf);
packet_len = mqtt_publish_packet_serialize(client->write_buf, client->write_buf_size, &flags, packet_id, topic_name,
params->payload, params->payload_len);
if (packet_len < 0) {
HAL_MutexUnlock(client->lock_write_buf);
rc = packet_len == MQTT_ERR_SHORT_BUFFER ? QCLOUD_ERR_BUF_TOO_SHORT : QCLOUD_ERR_FAILURE;
IOT_FUNC_EXIT_RC(rc);
}
if (params->qos > QOS0) {
rc = _push_pub_info_to_list(client, packet_len, packet_id, &node);
if (rc) {
Log_e("push publish info failed!");
HAL_MutexUnlock(client->lock_write_buf);
IOT_FUNC_EXIT_RC(rc);
}
}
// send the publish packet
rc = send_mqtt_packet(client, packet_len);
HAL_MutexUnlock(client->lock_write_buf);
if (rc) {
if (params->qos > QOS0) {
utils_list_remove(client->list_pub_wait_ack, node);
}
IOT_FUNC_EXIT_RC(rc);
}
IOT_FUNC_EXIT_RC(packet_id);
}
/**
* @brief Deserialize publish packet and deliver_message.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] params publish params
* @return >=0 for packet id, < 0 for failed @see IotReturnCode
*/
int qcloud_iot_mqtt_handle_publish(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
int rc, packet_len = 0;
MQTTPublishFlags flags;
MQTTMessage msg;
rc = mqtt_publish_packet_deserialize(client->read_buf, client->read_buf_size, &flags, &msg.packet_id,
&msg.topic_name, &msg.topic_len, &msg.payload, &msg.payload_len);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
msg.qos = flags.qos;
if (QOS0 == msg.qos) {
// No further processing required for QOS0
_deliver_message(client, &msg);
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
// only Qos 1 is support
#ifdef MQTT_RMDUP_MSG_ENABLED
// check if packet_id has been received before
if (_get_packet_id_repeat_buf(client, msg.packet_id) < 0)
#endif
{
// deliver to msg callback
_deliver_message(client, &msg);
}
#ifdef MQTT_RMDUP_MSG_ENABLED
// just add packet id
_add_packet_id_to_repeat_buf(client, msg.packet_id);
#endif
// reply with puback
HAL_MutexLock(client->lock_write_buf);
packet_len = mqtt_puback_packet_serialize(client->write_buf, client->write_buf_size, msg.packet_id);
if (packet_len > 0) {
rc = send_mqtt_packet(client, packet_len);
} else {
rc = packet_len == MQTT_ERR_SHORT_BUFFER ? QCLOUD_ERR_BUF_TOO_SHORT : QCLOUD_ERR_FAILURE;
}
HAL_MutexUnlock(client->lock_write_buf);
IOT_FUNC_EXIT_RC(rc);
}
/**
* @brief Deserialize puback packet.
*
* @param[in,out] client pointer to mqtt_client
* @return 0 for success.
*/
int qcloud_iot_mqtt_handle_puback(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
int rc;
uint16_t packet_id;
MQTTEventMsg msg;
rc = mqtt_puback_packet_deserialize(client->read_buf, client->read_buf_size, &packet_id);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
_remove_pub_info_from_list(client, packet_id);
/* notify this event to user callback */
if (client->event_handle.h_fp) {
msg.event_type = MQTT_EVENT_PUBLISH_SUCCESS;
msg.msg = (void *)(uintptr_t)packet_id;
client->event_handle.h_fp(client, client->event_handle.context, &msg);
}
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
/**
* @brief Process puback waiting timout.
*
* @param[in,out] client pointer to mqtt_client
*/
void qcloud_iot_mqtt_check_pub_timeout(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
utils_list_process(client->list_pub_wait_ack, LIST_HEAD, _pub_wait_list_process_check_timeout, client);
IOT_FUNC_EXIT;
}

View File

@@ -0,0 +1,643 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file mqtt_client_subscribe.c
* @brief
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-05-28
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-05-28 <td>1.0 <td>fancyxu <td>first commit
* <tr><td>2021-07-08 <td>1.1 <td>fancyxu <td>fix code standard of IotReturnCode and QcloudIotClient
* </table>
*/
#include "mqtt_client.h"
/**
* @brief Context of subscribe
*
*/
typedef struct {
uint16_t packet_id;
SubTopicHandle handler;
} QcloudIotSubConext;
/**
* @brief Free topic_filter and user_data
*
* @param[in] handler subtopic handle
*/
static void _clear_sub_handle(SubTopicHandle *handler)
{
if (handler->topic_filter) {
HAL_Free(handler->topic_filter);
handler->topic_filter = NULL;
}
if (handler->params.user_data && handler->params.user_data_free) {
handler->params.user_data_free(handler->params.user_data);
handler->params.user_data = NULL;
}
}
/**
* @brief Push node to subscribe(unsubscribe) ACK wait list.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] packet_len packet len of publish packet
* @param[in] packet_id packet id
* @param[in] type mqtt packet type SUBSCRIBE or UNSUBSCRIBE
* @param[in] handler subtopic handle
* @param[out] node node to push to list
* @return @see IotReturnCode
*/
static int _push_sub_info_to_list(QcloudIotClient *client, int packet_len, uint16_t packet_id, MQTTPacketType type,
const SubTopicHandle *handler, void **node)
{
IOT_FUNC_ENTRY;
void *list = client->list_sub_wait_ack;
QcloudIotSubInfo *sub_info = NULL;
sub_info = (QcloudIotSubInfo *)HAL_Malloc(sizeof(QcloudIotSubInfo) + packet_len);
if (!sub_info) {
Log_e("memory malloc failed!");
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
sub_info->buf = (uint8_t *)sub_info + sizeof(QcloudIotSubInfo);
sub_info->len = packet_len;
sub_info->type = type;
sub_info->packet_id = packet_id;
sub_info->handler = *handler;
memcpy(sub_info->buf, client->write_buf, packet_len);
HAL_Timer_CountdownMs(&sub_info->sub_start_time, client->command_timeout_ms);
*node = utils_list_push(list, sub_info);
if (!*node) {
HAL_Free(sub_info);
Log_e("list push failed! Check the list len!");
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
/**
* @brief Check pub wait list timeout.
*
* @param[in,out] list pointer to pub wait ack list.
* @param[in] node pointer to list node
* @param[in] val pointer to value, @see QcloudIotPubInfo
* @param[in] usr_data @see QcloudIotClient
* @return @see UtilsListResult
*/
static UtilsListResult _sub_wait_list_process_pop_info(void *list, void *node, void *val, void *usr_data)
{
QcloudIotSubConext *sub_context = (QcloudIotSubConext *)usr_data;
QcloudIotSubInfo *sub_info = (QcloudIotSubInfo *)val;
if (sub_info->packet_id == sub_context->packet_id) {
memcpy(&sub_context->handler, &sub_info->handler, sizeof(SubTopicHandle));
utils_list_remove(list, node);
return LIST_TRAVERSE_BREAK;
}
return LIST_TRAVERSE_CONTINUE;
}
/**
* @brief Check sub wait list timeout.
*
* @param[in,out] list pointer to sub wait ack list.
* @param[in] node pointer to list node
* @param[in] val pointer to value, @see QcloudIotSubInfo
* @param[in] usr_data @see QcloudIotClient
* @return @see UtilsListResult
*/
static UtilsListResult _sub_wait_list_process_check_timeout(void *list, void *node, void *val, void *usr_data)
{
MQTTEventMsg msg;
QcloudIotSubInfo *sub_info = (QcloudIotSubInfo *)val;
QcloudIotClient *client = (QcloudIotClient *)usr_data;
// check the request if timeout or not
if (HAL_Timer_Remain(&sub_info->sub_start_time) > 0) {
return LIST_TRAVERSE_CONTINUE;
}
// notify timeout event
if (client->event_handle.h_fp) {
msg.event_type = SUBSCRIBE == sub_info->type ? MQTT_EVENT_SUBSCRIBE_TIMEOUT : MQTT_EVENT_UNSUBSCRIBE_TIMEOUT;
msg.msg = (void *)(uintptr_t)sub_info->packet_id;
if (sub_info->handler.params.on_sub_event_handler) {
sub_info->handler.params.on_sub_event_handler(client, MQTT_EVENT_SUBSCRIBE_TIMEOUT,
sub_info->handler.params.user_data);
}
client->event_handle.h_fp(client, client->event_handle.context, &msg);
}
_clear_sub_handle(&sub_info->handler);
utils_list_remove(list, node);
return LIST_TRAVERSE_CONTINUE;
}
/**
* @brief Clear sub wait list.
*
* @param[in,out] list pointer to sub wait list.
* @param[in] node pointer to list node
* @param[in] val pointer to value, @see QcloudIotSubInfo
* @param[in] usr_data null
* @return @see UtilsListResult
*/
static UtilsListResult _sub_wait_list_process_clear(void *list, void *node, void *val, void *usr_data)
{
QcloudIotSubInfo *sub_info = (QcloudIotSubInfo *)val;
if (sub_info->type == UNSUBSCRIBE) {
_clear_sub_handle(&sub_info->handler);
}
utils_list_remove(list, node);
return LIST_TRAVERSE_CONTINUE;
}
/**
* @brief Pop node signed with packet id from subscribe ACK wait list, and return the sub handler
*
* @param[in,out] client pointer to mqtt_client
* @param[in] packet_id packet id
* @param[out] sub_handle @see SubTopicHandle
*/
static void _pop_sub_info_from_list(QcloudIotClient *client, uint16_t packet_id, SubTopicHandle *sub_handle)
{
QcloudIotSubConext sub_context = {.packet_id = packet_id};
utils_list_process(client->list_sub_wait_ack, LIST_HEAD, _sub_wait_list_process_pop_info, &sub_context);
memcpy(sub_handle, &sub_context.handler, sizeof(SubTopicHandle));
}
/**
* @brief Remove sub handle when unsubscribe.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] topic_filter topic to remove
* @return true topic exist
* @return false topic no exist
*/
static bool _remove_sub_handle_from_array(QcloudIotClient *client, const char *topic_filter)
{
int i;
bool topic_exists = false;
// remove from message handler array
HAL_MutexLock(client->lock_generic);
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
if ((client->sub_handles[i].topic_filter && !strcmp(client->sub_handles[i].topic_filter, topic_filter)) ||
strstr(topic_filter, "/#") || strstr(topic_filter, "/+")) {
// notify this event to topic subscriber
if (client->sub_handles[i].params.on_sub_event_handler) {
client->sub_handles[i].params.on_sub_event_handler(client, MQTT_EVENT_UNSUBSCRIBE,
client->sub_handles[i].params.user_data);
}
_clear_sub_handle(&client->sub_handles[i]);
// we don't want to break here, if the same topic is registered*with 2 callbacks.Unlikely scenario
topic_exists = true;
}
}
HAL_MutexUnlock(client->lock_generic);
return topic_exists;
}
/**
* @brief Add sub handle when subscribe.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] sub_handle sub_handle to be add to array
* @return true topic exist
* @return false topic no exist
*/
static int _add_sub_handle_to_array(QcloudIotClient *client, const SubTopicHandle *sub_handle)
{
IOT_FUNC_ENTRY;
int i, i_free = -1;
HAL_MutexLock(client->lock_generic);
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
if (client->sub_handles[i].topic_filter) {
if (!strcmp(client->sub_handles[i].topic_filter, sub_handle->topic_filter)) {
i_free = i;
// free the memory before
_clear_sub_handle(&client->sub_handles[i]);
Log_w("Identical topic found: %s", sub_handle->topic_filter);
break;
}
} else {
i_free = i_free == -1 ? i : i_free;
}
}
if (-1 == i_free) {
Log_e("NO more @sub_handles space!");
HAL_MutexUnlock(client->lock_generic);
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
client->sub_handles[i_free] = *sub_handle;
HAL_MutexUnlock(client->lock_generic);
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
/**
* @brief Set sub handle status.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] topic_filter topic to set status
* @param[in] status @see SubStatus
*/
static void _set_sub_handle_status_to_array(QcloudIotClient *client, const char *topic_filter, SubStatus status)
{
IOT_FUNC_ENTRY;
int i;
HAL_MutexLock(client->lock_generic);
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
if (client->sub_handles[i].topic_filter) {
if (!strcmp(client->sub_handles[i].topic_filter, topic_filter)) {
client->sub_handles[i].status = status;
break;
}
}
}
HAL_MutexUnlock(client->lock_generic);
IOT_FUNC_EXIT;
}
/**
* @brief Serialize and send subscribe packet.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic to subscribe
* @param[in] params subscribe params
* @return >=0 for packet id, < 0 for failed @see IotReturnCode
*/
int qcloud_iot_mqtt_subscribe(QcloudIotClient *client, const char *topic_filter, const SubscribeParams *params)
{
IOT_FUNC_ENTRY;
int rc, packet_len, qos = params->qos;
uint16_t packet_id;
char *topic_filter_stored;
void *node = NULL;
SubTopicHandle sub_handle;
// topic filter should be valid in the whole sub life
topic_filter_stored = HAL_Malloc(strlen(topic_filter) + 1);
if (!topic_filter_stored) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MALLOC);
}
strncpy(topic_filter_stored, topic_filter, strlen(topic_filter) + 1);
sub_handle.topic_filter = topic_filter_stored;
sub_handle.params = *params;
sub_handle.status = client->default_subscribe ? SUB_ACK_RECEIVED : SUB_ACK_NOT_RECEIVED;
// add sub handle first to process
rc = _add_sub_handle_to_array(client, &sub_handle);
if (rc) {
goto exit;
}
if (client->default_subscribe) {
return 0;
}
packet_id = get_next_packet_id(client);
Log_d("subscribe topic_name=%s|packet_id=%d", topic_filter_stored, packet_id);
// serialize packet
HAL_MutexLock(client->lock_write_buf);
packet_len = mqtt_subscribe_packet_serialize(client->write_buf, client->write_buf_size, packet_id, 1,
&topic_filter_stored, &qos);
if (packet_len < 0) {
HAL_MutexUnlock(client->lock_write_buf);
rc = packet_len == MQTT_ERR_SHORT_BUFFER ? QCLOUD_ERR_BUF_TOO_SHORT : QCLOUD_ERR_FAILURE;
goto exit;
}
// add node into sub ack wait list
rc = _push_sub_info_to_list(client, packet_len, packet_id, SUBSCRIBE, &sub_handle, &node);
if (rc) {
HAL_MutexUnlock(client->lock_write_buf);
goto exit;
}
// send packet
rc = send_mqtt_packet(client, packet_len);
HAL_MutexUnlock(client->lock_write_buf);
if (rc) {
utils_list_remove(client->list_sub_wait_ack, node);
goto exit;
}
IOT_FUNC_EXIT_RC(packet_id);
exit:
_remove_sub_handle_from_array(client, topic_filter_stored);
IOT_FUNC_EXIT_RC(rc);
}
/**
* @brief Deserialize suback packet and return sub result.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_handle_suback(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
int rc, count = 0;
uint16_t packet_id = 0;
int granted_qos;
MQTTEventMsg msg = {MQTT_EVENT_SUBSCRIBE_SUCCESS, NULL};
SubTopicHandle sub_handle = {0};
MQTTEventType event_type = MQTT_EVENT_SUBSCRIBE_SUCCESS;
rc = mqtt_suback_packet_deserialize(client->read_buf, client->read_buf_size, 1, &count, &packet_id, &granted_qos);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
msg.msg = (void *)(uintptr_t)packet_id;
// pop sub info and get sub handle
_pop_sub_info_from_list(client, packet_id, &sub_handle);
if (!sub_handle.topic_filter) {
Log_w("can't get sub handle from list!");
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS); // in case of resubscribe when reconnect
}
// check return code in SUBACK packet: 0x00(QOS0, SUCCESS),0x01(QOS1, SUCCESS),0x02(QOS2,SUCCESS),0x80(Failure)
if (0x80 == granted_qos) {
msg.event_type = MQTT_EVENT_SUBSCRIBE_NACK;
event_type = MQTT_EVENT_SUBSCRIBE_NACK;
Log_e("MQTT SUBSCRIBE failed, packet_id: %u topic: %s", packet_id, sub_handle.topic_filter);
_remove_sub_handle_from_array(client, sub_handle.topic_filter);
rc = QCLOUD_ERR_MQTT_SUB;
}
if (MQTT_EVENT_SUBSCRIBE_SUCCESS == msg.event_type) {
_set_sub_handle_status_to_array(client, sub_handle.topic_filter, SUB_ACK_RECEIVED);
}
// notify this event to user callback
if (client->event_handle.h_fp) {
client->event_handle.h_fp(client, client->event_handle.context, &msg);
}
// notify this event to topic subscriber
if (sub_handle.params.on_sub_event_handler) {
sub_handle.params.on_sub_event_handler(client, event_type, sub_handle.params.user_data);
}
IOT_FUNC_EXIT_RC(rc);
}
/**
* @brief Serialize and send unsubscribe packet.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic to unsubscribe
* @return >=0 packet id, < 0 for failed @see IotReturnCode
*/
int qcloud_iot_mqtt_unsubscribe(QcloudIotClient *client, const char *topic_filter)
{
IOT_FUNC_ENTRY;
int rc, packet_len;
uint16_t packet_id;
void *node = NULL;
SubTopicHandle sub_handle;
memset(&sub_handle, 0, sizeof(SubTopicHandle));
// remove from sub handle
if (!_remove_sub_handle_from_array(client, topic_filter)) {
Log_w("subscription does not exists: %s", topic_filter);
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_UNSUB_FAIL);
}
// topic filter should be valid in the whole sub life
char *topic_filter_stored = HAL_Malloc(strlen(topic_filter) + 1);
if (!topic_filter_stored) {
Log_e("malloc failed");
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
strncpy(topic_filter_stored, topic_filter, strlen(topic_filter) + 1);
sub_handle.topic_filter = topic_filter_stored;
packet_id = get_next_packet_id(client);
Log_d("unsubscribe topic_name=%s|packet_id=%d", topic_filter_stored, packet_id);
HAL_MutexLock(client->lock_write_buf);
packet_len = mqtt_unsubscribe_packet_serialize(client->write_buf, client->write_buf_size, packet_id, 1,
&topic_filter_stored);
if (packet_len < 0) {
HAL_MutexUnlock(client->lock_write_buf);
rc = packet_len == MQTT_ERR_SHORT_BUFFER ? QCLOUD_ERR_BUF_TOO_SHORT : QCLOUD_ERR_FAILURE;
goto exit;
}
// add node into sub ack wait list
rc = _push_sub_info_to_list(client, packet_len, packet_id, UNSUBSCRIBE, &sub_handle, &node);
if (rc) {
Log_e("push unsubscribe info failed!");
HAL_MutexUnlock(client->lock_write_buf);
goto exit;
}
/* send the unsubscribe packet */
rc = send_mqtt_packet(client, packet_len);
HAL_MutexUnlock(client->lock_write_buf);
if (rc) {
utils_list_remove(client->list_sub_wait_ack, node);
goto exit;
}
IOT_FUNC_EXIT_RC(packet_id);
exit:
_clear_sub_handle(&sub_handle);
IOT_FUNC_EXIT_RC(rc);
}
/**
* @brief Deserialize unsuback packet and remove from list.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_handle_unsuback(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
int rc;
uint16_t packet_id = 0;
SubTopicHandle sub_handle = {0};
MQTTEventMsg msg;
rc = mqtt_unsuback_packet_deserialize(client->read_buf, client->read_buf_size, &packet_id);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
_pop_sub_info_from_list(client, packet_id, &sub_handle);
_clear_sub_handle(&sub_handle);
if (client->event_handle.h_fp) {
msg.event_type = MQTT_EVENT_UNSUBSCRIBE_SUCCESS;
msg.msg = (void *)(uintptr_t)packet_id;
client->event_handle.h_fp(client, client->event_handle.context, &msg);
}
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
/**
* @brief Process suback waiting timeout.
*
* @param[in,out] client pointer to mqtt client
*/
void qcloud_iot_mqtt_check_sub_timeout(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
utils_list_process(client->list_sub_wait_ack, LIST_HEAD, _sub_wait_list_process_check_timeout, client);
IOT_FUNC_EXIT;
}
/**
* @brief Resubscribe topic when reconnect.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_resubscribe(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
int rc, packet_len, qos, itr = 0;
uint16_t packet_id;
SubTopicHandle *sub_handle;
for (itr = 0; itr < MAX_MESSAGE_HANDLERS; itr++) {
sub_handle = &client->sub_handles[itr];
if (!sub_handle->topic_filter) {
continue;
}
packet_id = get_next_packet_id(client);
Log_d("subscribe topic_name=%s|packet_id=%d", sub_handle->topic_filter, packet_id);
HAL_MutexLock(client->lock_write_buf);
qos = sub_handle->params.qos;
packet_len = mqtt_subscribe_packet_serialize(client->write_buf, client->write_buf_size, packet_id, 1,
&sub_handle->topic_filter, &qos);
if (packet_len < 0) {
HAL_MutexUnlock(client->lock_write_buf);
continue;
}
// send packet
rc = send_mqtt_packet(client, packet_len);
HAL_MutexUnlock(client->lock_write_buf);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
}
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
/**
* @brief Return if topic is sub ready.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic filter
* @return true for ready
* @return false for not ready
*/
bool qcloud_iot_mqtt_is_sub_ready(QcloudIotClient *client, const char *topic_filter)
{
IOT_FUNC_ENTRY;
int i = 0;
HAL_MutexLock(client->lock_generic);
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
if ((client->sub_handles[i].topic_filter && !strcmp(client->sub_handles[i].topic_filter, topic_filter)) ||
strstr(topic_filter, "/#") || strstr(topic_filter, "/+")) {
HAL_MutexUnlock(client->lock_generic);
return client->sub_handles[i].status == SUB_ACK_RECEIVED;
}
}
HAL_MutexUnlock(client->lock_generic);
IOT_FUNC_EXIT_RC(false);
}
/**
* @brief Get usr data, usr should handle lock/unlock usrdata itself in callback and caller.
*
* @param[in,out] client pointer to mqtt client
* @param[in] topic_filter topic filter
* @return NULL or user data
*/
void *qcloud_iot_mqtt_get_subscribe_usr_data(QcloudIotClient *client, const char *topic_filter)
{
IOT_FUNC_ENTRY;
int i = 0;
HAL_MutexLock(client->lock_generic);
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
if ((client->sub_handles[i].topic_filter && !strcmp(client->sub_handles[i].topic_filter, topic_filter)) ||
strstr(topic_filter, "/#") || strstr(topic_filter, "/+")) {
HAL_MutexUnlock(client->lock_generic);
return client->sub_handles[i].params.user_data;
}
}
HAL_MutexUnlock(client->lock_generic);
IOT_FUNC_EXIT_RC(NULL);
}
/**
* @brief Clear sub handle array.
*
* @param[in,out] client pointer to mqtt client
*/
void qcloud_iot_mqtt_sub_handle_array_clear(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
int i;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
/* notify this event to topic subscriber */
if (client->sub_handles[i].topic_filter && client->sub_handles[i].params.on_sub_event_handler) {
client->sub_handles[i].params.on_sub_event_handler(client, MQTT_EVENT_CLIENT_DESTROY,
client->sub_handles[i].params.user_data);
}
_clear_sub_handle(&client->sub_handles[i]);
}
IOT_FUNC_EXIT;
}
/**
* @brief Clear suback wait list and clear sub handle.
*
* @param[in,out] client pointer to mqtt client
*/
void qcloud_iot_mqtt_suback_wait_list_clear(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
utils_list_process(client->list_sub_wait_ack, LIST_HEAD, _sub_wait_list_process_clear, client);
IOT_FUNC_EXIT;
}

View File

@@ -0,0 +1,554 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file mqtt_client_yield.c
* @brief
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-05-28
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-05-28 <td>1.0 <td>fancyxu <td>first commit
* <tr><td>2021-07-08 <td>1.1 <td>fancyxu <td>fix code standard of IotReturnCode and QcloudIotClient
* </table>
*/
#include "mqtt_client.h"
/**
* @brief Remain waiting time after MQTT data is received (unit: ms)
*
*/
#define QCLOUD_IOT_MQTT_MAX_REMAIN_WAIT_MS (100)
/**
* @brief Read one byte from network for mqtt packet header except remaining length.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] timeout_ms timeout to read
* @param[out] packet_type packet type to get
* @param[in,out] pptr pointer to buf pointer
* @return @see IotReturnCode
*/
static int _read_packet_header(QcloudIotClient *client, uint32_t timeout_ms, uint8_t *packet_type, uint8_t **pptr)
{
int rc = 0;
size_t read_len = 0;
// refresh timeout
timeout_ms = timeout_ms <= 0 ? 1 : timeout_ms;
rc = client->network_stack.read(&(client->network_stack), *pptr, 1, timeout_ms, &read_len);
if (rc == QCLOUD_ERR_SSL_NOTHING_TO_READ || rc == QCLOUD_ERR_TCP_NOTHING_TO_READ) {
return QCLOUD_ERR_MQTT_NOTHING_TO_READ;
}
// get packet type
*packet_type = (**pptr & 0xf0) >> 4;
(*pptr)++;
return rc;
}
/**
* @brief Read 1~4 bytes from network for mqtt packet header remaining length.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] timeout_ms timeout to read
* @param[out] plen read len
* @param[in,out] pptr pointer to buf pointer
* @return int
*/
static int _read_packet_remaining_len(QcloudIotClient *client, uint32_t timeout_ms, uint32_t *plen, uint8_t **pptr)
{
IOT_FUNC_ENTRY;
uint8_t *buf = *pptr;
uint8_t c = 0;
uint32_t multiplier = 1;
uint32_t count = 0;
uint32_t len = 0;
size_t rlen = 0;
// refresh timeout
timeout_ms = timeout_ms <= 0 ? 1 : timeout_ms;
timeout_ms += QCLOUD_IOT_MQTT_MAX_REMAIN_WAIT_MS;
do {
if (++count > MAX_NO_OF_REMAINING_LENGTH_BYTES) {
/* bad data */
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_PACKET_READ)
}
if (client->network_stack.read(&(client->network_stack), buf, 1, timeout_ms, &rlen)) {
/* The value argument is the important value. len is just used temporarily
* and never used by the calling function for anything else */
IOT_FUNC_EXIT_RC(QCLOUD_ERR_FAILURE);
}
c = *buf++;
len += (c & 127) * multiplier;
multiplier *= 128;
} while (c & 128);
if (plen) {
*plen = len;
}
*pptr += count;
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
/**
* @brief Discard the packet for mqtt read buf not enough.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] timeout_ms timeout to read
* @param[in] rem_len remaining length to read
*/
static void _discard_packet_for_short_buf(QcloudIotClient *client, uint32_t timeout_ms, uint32_t rem_len)
{
int rc;
size_t bytes_to_be_read, total_bytes_read = 0;
size_t read_len;
timeout_ms = timeout_ms <= 0 ? 1 : timeout_ms;
timeout_ms += QCLOUD_IOT_MQTT_MAX_REMAIN_WAIT_MS;
bytes_to_be_read = client->read_buf_size;
do {
rc = client->network_stack.read(&(client->network_stack), client->read_buf, bytes_to_be_read, timeout_ms,
&read_len);
if (rc) {
break;
}
total_bytes_read += read_len;
bytes_to_be_read =
(rem_len - total_bytes_read) >= client->read_buf_size ? client->read_buf_size : rem_len - total_bytes_read;
} while (total_bytes_read < rem_len);
}
/**
* @brief Read rem_len bytes from network for mqtt packet payload.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] timeout_ms timeout to read
* @param[in] rem_len length to read
* @param[out] buf buffer to save payload
* @return @see IotReturnCode
*/
static int _read_packet_payload(QcloudIotClient *client, uint32_t timeout_ms, uint32_t rem_len, uint8_t *buf)
{
int rc;
size_t read_len = 0;
timeout_ms = timeout_ms <= 0 ? 1 : timeout_ms;
timeout_ms += QCLOUD_IOT_MQTT_MAX_REMAIN_WAIT_MS;
rc = client->network_stack.read(&(client->network_stack), buf, rem_len, timeout_ms, &read_len);
if (rc) {
return rc;
}
if (read_len != rem_len) {
#ifdef AUTH_WITH_NO_TLS
return QCLOUD_ERR_SSL_READ_TIMEOUT;
#else
return QCLOUD_ERR_TCP_READ_TIMEOUT;
#endif
}
return QCLOUD_RET_SUCCESS;
}
/**
* @brief Read MQTT packet from network stack
*
* @param[in,out] client pointer to mqtt_client
* @param[in,out] timer timeout timer
* @param[out] packet_type MQTT packet type
* @return @see IotReturnCode
*
* @note
* 1. read 1st byte in fixed header and check if valid
* 2. read the remaining length
* 3. read payload according to remaining length
*/
static int _read_mqtt_packet(QcloudIotClient *client, Timer *timer, uint8_t *packet_type)
{
IOT_FUNC_ENTRY;
int rc = 0;
uint32_t rem_len = 0, packet_len = 0;
uint8_t *packet_read_buf = client->read_buf;
// 1. read 1st byte in fixed header and check if valid
rc = _read_packet_header(client, HAL_Timer_Remain(timer), packet_type, &packet_read_buf);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
// 2. read the remaining length
rc = _read_packet_remaining_len(client, HAL_Timer_Remain(timer), &rem_len, &packet_read_buf);
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
// if read buffer is not enough to read the remaining length, discard the packet
packet_len = packet_read_buf - client->read_buf + rem_len;
if (packet_len >= client->read_buf_size) {
_discard_packet_for_short_buf(client, HAL_Timer_Remain(timer), rem_len);
Log_e("MQTT Recv buffer not enough: %lu < %d", client->read_buf_size, rem_len);
IOT_FUNC_EXIT_RC(QCLOUD_ERR_BUF_TOO_SHORT);
}
// 3. read payload according to remaining length
if (rem_len > 0) {
rc = _read_packet_payload(client, HAL_Timer_Remain(timer), rem_len, packet_read_buf);
}
IOT_FUNC_EXIT_RC(rc);
}
/**
* @brief Reset ping counter & ping timer.
*
* @param[in,out] client pointer to mqtt_client
*/
static void _handle_pingresp_packet(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
HAL_MutexLock(client->lock_generic);
client->is_ping_outstanding = 0;
HAL_Timer_Countdown(&client->ping_timer, client->options.keep_alive_interval);
HAL_MutexUnlock(client->lock_generic);
IOT_FUNC_EXIT;
}
/**
* @brief Read a mqtt packet from network and handle it.
*
* @param[in,out] client pointer to mqtt_client
* @param[in] timer timer for operation
* @param[out] packet_type packet type of packet read
* @return @see IotReturnCode
*/
static int _cycle_for_read(QcloudIotClient *client, Timer *timer, uint8_t *packet_type)
{
IOT_FUNC_ENTRY;
int rc;
/* read the socket, see what work is due */
rc = _read_mqtt_packet(client, timer, packet_type);
if (QCLOUD_ERR_MQTT_NOTHING_TO_READ == rc) {
/* Nothing to read, not a cycle failure */
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
if (rc) {
IOT_FUNC_EXIT_RC(rc);
}
switch (*packet_type) {
case CONNACK:
case PUBREC:
case PUBREL:
case PUBCOMP:
case PINGRESP:
break;
case PUBACK:
rc = qcloud_iot_mqtt_handle_puback(client);
break;
case SUBACK:
rc = qcloud_iot_mqtt_handle_suback(client);
break;
case UNSUBACK:
rc = qcloud_iot_mqtt_handle_unsuback(client);
break;
case PUBLISH:
rc = qcloud_iot_mqtt_handle_publish(client);
break;
default:
// Either unknown packet type or failure occurred should not happen
IOT_FUNC_EXIT_RC(QCLOUD_ERR_RX_MESSAGE_INVAL);
break;
}
switch (*packet_type) {
// Recv below msgs are all considered as PING OK
case PUBACK:
case SUBACK:
case UNSUBACK:
case PINGRESP:
_handle_pingresp_packet(client);
break;
// Recv downlink pub means link is OK but we still need to send PING request
case PUBLISH:
HAL_MutexLock(client->lock_generic);
client->is_ping_outstanding = 0;
HAL_MutexUnlock(client->lock_generic);
break;
}
IOT_FUNC_EXIT_RC(rc);
}
/**
* @brief Set reconnect wait interval if auto conect is enable.
*
* @param[in,out] client pointer to mqtt client
*/
static void _set_reconnect_wait_interval(QcloudIotClient *client)
{
client->counter_network_disconnected++;
if (client->auto_connect_enable) {
srand(HAL_Timer_CurrentSec());
// range: 1000 - 2000 ms, in 10ms unit
client->current_reconnect_wait_interval = (rand() % 100 + 100) * 10;
HAL_Timer_CountdownMs(&(client->reconnect_delay_timer), client->current_reconnect_wait_interval);
}
}
/**
* @brief Handle disconnect if connection is error.
*
* @param[in,out] client pointer to mqtt client
*/
static void _handle_disconnect(QcloudIotClient *client)
{
int rc;
MQTTEventMsg msg;
if (!get_client_conn_state(client)) {
_set_reconnect_wait_interval(client);
return;
}
rc = qcloud_iot_mqtt_disconnect(client);
// disconnect network stack by force
if (rc) {
client->network_stack.disconnect(&(client->network_stack));
set_client_conn_state(client, NOTCONNECTED);
}
Log_e("disconnect MQTT for some reasons..");
// notify event
if (client->event_handle.h_fp) {
msg.event_type = MQTT_EVENT_DISCONNECT;
msg.msg = NULL;
client->event_handle.h_fp(client, client->event_handle.context, &msg);
}
// exceptional disconnection
client->was_manually_disconnected = 0;
_set_reconnect_wait_interval(client);
}
/**
* @brief Handle reconnect.
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
static int _handle_reconnect(QcloudIotClient *client)
{
IOT_FUNC_ENTRY;
int rc = QCLOUD_RET_MQTT_RECONNECTED;
MQTTEventMsg msg;
// reconnect control by delay timer (increase interval exponentially )
if (!HAL_Timer_Expired(&(client->reconnect_delay_timer))) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT);
}
rc = qcloud_iot_mqtt_attempt_reconnect(client);
if (QCLOUD_RET_MQTT_RECONNECTED == rc) {
Log_e("attempt to reconnect success.");
// notify event
if (client->event_handle.h_fp) {
msg.event_type = MQTT_EVENT_RECONNECT;
msg.msg = NULL;
client->event_handle.h_fp(client, client->event_handle.context, &msg);
}
IOT_FUNC_EXIT_RC(rc);
}
Log_e("attempt to reconnect failed, errCode: %d", rc);
client->current_reconnect_wait_interval *= 2;
if (MAX_RECONNECT_WAIT_INTERVAL < client->current_reconnect_wait_interval) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_RECONNECT_TIMEOUT);
}
HAL_Timer_CountdownMs(&(client->reconnect_delay_timer), client->current_reconnect_wait_interval);
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT);
}
/**
* @brief Handle MQTT keep alive (hearbeat with server).
*
* @param[in,out] client pointer to mqtt client
* @return @see IotReturnCode
*/
static int _mqtt_keep_alive(QcloudIotClient *client)
{
#define MQTT_PING_RETRY_TIMES 2
#define MQTT_PING_SEND_RETRY_TIMES 3
IOT_FUNC_ENTRY;
int rc = 0;
if (0 == client->options.keep_alive_interval) {
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
if (!HAL_Timer_Expired(&client->ping_timer)) {
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
if (client->is_ping_outstanding >= MQTT_PING_RETRY_TIMES) {
// reaching here means we haven't received any MQTT packet for a long time (keep_alive_interval)
Log_e("Fail to recv MQTT msg. Something wrong with the connection.");
_handle_disconnect(client);
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
// there is no ping outstanding - send one
rc = qcloud_iot_mqtt_pingreq(client, MQTT_PING_SEND_RETRY_TIMES);
if (rc) {
// if sending a PING fails, propably the connection is not OK and we decide to disconnect and begin reconnection
// attempts
Log_e("Fail to send PING request. Something wrong with the connection.");
_handle_disconnect(client);
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
// start a timer to wait for PINGRESP from server
HAL_MutexLock(client->lock_generic);
client->is_ping_outstanding++;
HAL_Timer_CountdownMs(&client->ping_timer, client->command_timeout_ms);
HAL_MutexUnlock(client->lock_generic);
Log_d("PING request %u has been sent...", client->is_ping_outstanding);
IOT_FUNC_EXIT_RC(QCLOUD_RET_SUCCESS);
}
/**
* @brief Check connection and keep alive state, read/handle MQTT message in synchronized way.
*
* @param[in,out] client pointer to mqtt client
* @param[in] timeout_ms timeout value (unit: ms) for this operation
*
* @return QCLOUD_RET_SUCCESS when success, QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT when try reconnecting, others @see
* IotReturnCode
*/
int qcloud_iot_mqtt_yield(QcloudIotClient *client, uint32_t timeout_ms)
{
IOT_FUNC_ENTRY;
int rc = QCLOUD_RET_SUCCESS;
uint8_t packet_type;
Timer timer;
// 1. check if manually disconnect
if (!get_client_conn_state(client) && client->was_manually_disconnected == 1) {
IOT_FUNC_EXIT_RC(QCLOUD_RET_MQTT_MANUALLY_DISCONNECTED);
}
// 2. check connection state and if auto reconnect is enabled
if (!get_client_conn_state(client) && client->auto_connect_enable == 0) {
IOT_FUNC_EXIT_RC(QCLOUD_ERR_MQTT_NO_CONN);
}
// 3. main loop for packet reading/handling and keep alive maintainance
HAL_Timer_CountdownMs(&timer, timeout_ms);
while (!HAL_Timer_Expired(&timer)) {
// handle reconnect
if (!get_client_conn_state(client)) {
if (client->current_reconnect_wait_interval > MAX_RECONNECT_WAIT_INTERVAL) {
rc = QCLOUD_ERR_MQTT_RECONNECT_TIMEOUT;
break;
}
rc = _handle_reconnect(client);
continue;
}
// read and handle packet
rc = _cycle_for_read(client, &timer, &packet_type);
switch (rc) {
case QCLOUD_RET_SUCCESS:
// check list of wait publish ACK to remove node that is ACKED or timeout
qcloud_iot_mqtt_check_pub_timeout(client);
// check list of wait subscribe(or unsubscribe) ACK to remove node that is ACKED or timeout
qcloud_iot_mqtt_check_sub_timeout(client);
rc = _mqtt_keep_alive(client);
if (rc) {
IOT_FUNC_EXIT_RC(client->auto_connect_enable ? QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT : rc);
}
break;
case QCLOUD_ERR_SSL_READ_TIMEOUT:
case QCLOUD_ERR_SSL_READ:
case QCLOUD_ERR_TCP_PEER_SHUTDOWN:
case QCLOUD_ERR_TCP_READ_TIMEOUT:
case QCLOUD_ERR_TCP_READ_FAIL:
Log_e("network read failed, rc: %d. MQTT Disconnect.", rc);
_handle_disconnect(client);
IOT_FUNC_EXIT_RC(client->auto_connect_enable ? QCLOUD_ERR_MQTT_ATTEMPTING_RECONNECT
: QCLOUD_ERR_MQTT_NO_CONN);
break;
default: // others, just return
IOT_FUNC_EXIT_RC(rc);
}
}
IOT_FUNC_EXIT_RC(rc);
}
/**
* @brief Wait read specific mqtt packet, such as connack.
*
* @param[in,out] client pointer to mqtt client
* @param[in] packet_type packet type except to read
* @return @see IotReturnCode
*/
int qcloud_iot_mqtt_wait_for_read(QcloudIotClient *client, uint8_t packet_type)
{
IOT_FUNC_ENTRY;
int rc;
uint8_t read_packet_type = 0;
Timer timer;
HAL_Timer_CountdownMs(&timer, client->command_timeout_ms);
do {
if (HAL_Timer_Expired(&timer)) {
rc = QCLOUD_ERR_MQTT_REQUEST_TIMEOUT;
break;
}
rc = _cycle_for_read(client, &timer, &read_packet_type);
} while (QCLOUD_RET_SUCCESS == rc && read_packet_type != packet_type);
IOT_FUNC_EXIT_RC(rc);
}

View File

@@ -0,0 +1,8 @@
Language: Cpp
BasedOnStyle: Google
ColumnLimit: 120
DerivePointerAlignment: true
PointerAlignment: Left
SortIncludes: true
IncludeBlocks: Preserve
IndentPPDirectives: AfterHash

View File

@@ -0,0 +1,64 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file test_mqtt_client.cc
* @brief unittest for mqtt client
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-07-07
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-07-07 <td>1.0 <td>fancyxu <td>first commit
* <tr><td>2021-07-08 <td>1.1 <td>fancyxu <td>support tls test
* <tr><td>2021-07-12 <td>1.1 <td>fancyxu <td>fix connect twice in 5s error
* </table>
*/
#include "mqtt_client_test.h"
namespace mqtt_client_unittest {
void MqttClientTest::SetUp() {
LogHandleFunc func = {0};
func.log_malloc = HAL_Malloc;
func.log_free = HAL_Free;
func.log_get_current_time_str = HAL_Timer_Current;
func.log_printf = HAL_Printf;
utils_log_init(func, LOG_LEVEL_DEBUG, 2048);
ASSERT_EQ(HAL_GetDevInfo(reinterpret_cast<void *>(&device_info)), 0);
MQTTInitParams init_params = DEFAULT_MQTT_INIT_PARAMS;
init_params.device_info = &device_info;
init_params.command_timeout = QCLOUD_IOT_MQTT_COMMAND_TIMEOUT;
init_params.keep_alive_interval_ms = QCLOUD_IOT_MQTT_KEEP_ALIVE_INTERNAL;
init_params.auto_connect_enable = 1;
init_params.event_handle.h_fp = NULL;
init_params.event_handle.context = NULL;
HAL_SleepMs(5000); // for iot hub can not connect twice in 5 s
client = IOT_MQTT_Construct(&init_params);
ASSERT_NE(client, nullptr);
}
void MqttClientTest::TearDown() {
IOT_MQTT_Destroy(&client);
utils_log_deinit();
}
} // namespace mqtt_client_unittest

View File

@@ -0,0 +1,53 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file mqtt_client_test.h
* @brief
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-07-18
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-07-18 <td>1.0 <td>fancyxu <td>first commit
* </table>
*/
#ifndef IOT_HUB_DEVICE_C_SDK_SERVICES_COMMON_MQTT_CLIENT_TEST_MQTT_CLIENT_TEST_H_
#define IOT_HUB_DEVICE_C_SDK_SERVICES_COMMON_MQTT_CLIENT_TEST_MQTT_CLIENT_TEST_H_
#include "gtest/gtest.h"
#include "qcloud_iot_common.h"
namespace mqtt_client_unittest {
/**
* @brief test fixture of mqtt client
*
*/
class MqttClientTest : public testing::Test {
protected:
void SetUp() override;
void TearDown() override;
void *client = NULL;
DeviceInfo device_info;
};
} // namespace mqtt_client_unittest
#endif // IOT_HUB_DEVICE_C_SDK_SERVICES_COMMON_MQTT_CLIENT_TEST_MQTT_CLIENT_TEST_H_

View File

@@ -0,0 +1,155 @@
/**
* @copyright
*
* Tencent is pleased to support the open source community by making IoT Hub available.
* Copyright(C) 2018 - 2021 THL A29 Limited, a Tencent company.All rights reserved.
*
* Licensed under the MIT License(the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
* @file test_mqtt_client.cc
* @brief unittest for mqtt client
* @author fancyxu (fancyxu@tencent.com)
* @version 1.0
* @date 2021-07-07
*
* @par Change Log:
* <table>
* <tr><th>Date <th>Version <th>Author <th>Description
* <tr><td>2021-07-07 <td>1.0 <td>fancyxu <td>first commit
* <tr><td>2021-07-08 <td>1.1 <td>fancyxu <td>support tls test
* <tr><td>2021-07-12 <td>1.1 <td>fancyxu <td>fix connect twice in 5s error
* <tr><td>2021-07-18 <td>1.2 <td>fancyxu <td>remove MqttClientTest for common use
* </table>
*/
#include <iostream>
#include <string>
#include "mqtt_client_test.h"
namespace mqtt_client_unittest {
/**
* @brief Test subscribe.
*
*/
TEST_F(MqttClientTest, subscribe) {
int wait_cnt;
char topic_name[MAX_SIZE_OF_CLOUD_TOPIC] = {0};
HAL_Snprintf(topic_name, sizeof(topic_name), "%s/%s/data", device_info.product_id, device_info.device_name);
SubscribeParams sub_params = DEFAULT_SUB_PARAMS;
/**
* @brief QOS0
*
*/
wait_cnt = 10;
sub_params.qos = QOS0;
ASSERT_GE(IOT_MQTT_Subscribe(client, topic_name, &sub_params), 0);
while (!IOT_MQTT_IsSubReady(client, topic_name) && (wait_cnt > 0)) {
ASSERT_EQ(IOT_MQTT_Yield(client, QCLOUD_IOT_MQTT_YIELD_TIMEOUT), 0);
wait_cnt--;
}
ASSERT_NE(wait_cnt, 0);
ASSERT_GE(IOT_MQTT_Unsubscribe(client, topic_name), 0);
ASSERT_EQ(IOT_MQTT_Yield(client, QCLOUD_IOT_MQTT_YIELD_TIMEOUT), 0);
/**
* @brief QOS1
*
*/
wait_cnt = 10;
sub_params.qos = QOS1;
ASSERT_GE(IOT_MQTT_Subscribe(client, topic_name, &sub_params), 0);
while (!IOT_MQTT_IsSubReady(client, topic_name) && (wait_cnt > 0)) {
ASSERT_EQ(IOT_MQTT_Yield(client, QCLOUD_IOT_MQTT_YIELD_TIMEOUT), 0);
wait_cnt--;
}
ASSERT_NE(wait_cnt, 0);
ASSERT_GE(IOT_MQTT_Unsubscribe(client, topic_name), 0);
ASSERT_EQ(IOT_MQTT_Yield(client, QCLOUD_IOT_MQTT_YIELD_TIMEOUT), 0);
/**
* @brief sub sync
*
*/
ASSERT_GE(IOT_MQTT_SubscribeSync(client, topic_name, &sub_params), 0);
ASSERT_GE(IOT_MQTT_Unsubscribe(client, topic_name), 0);
ASSERT_EQ(IOT_MQTT_Yield(client, QCLOUD_IOT_MQTT_YIELD_TIMEOUT), 0);
}
/**
* @brief Test publish.
*
*/
TEST_F(MqttClientTest, publish) {
char topic_name[MAX_SIZE_OF_CLOUD_TOPIC] = {0};
HAL_Snprintf(topic_name, sizeof(topic_name), "%s/%s/data", device_info.product_id, device_info.device_name);
char topic_content[] = "{\"action\": \"publish_test\", \"count\": \"0\"}";
PublishParams pub_params = DEFAULT_PUB_PARAMS;
pub_params.payload = topic_content;
pub_params.payload_len = strlen(topic_content);
/**
* @brief QOS0
*
*/
pub_params.qos = QOS0;
ASSERT_GE(IOT_MQTT_Publish(client, topic_name, &pub_params), 0);
/**
* @brief QOS1
*
*/
pub_params.qos = QOS1;
ASSERT_GE(IOT_MQTT_Publish(client, topic_name, &pub_params), 0);
}
/**
* @brief Test clean session.
*
*/
TEST_F(MqttClientTest, clean_session) {
IOT_MQTT_Destroy(&client);
HAL_SleepMs(5000); // for iot hub can not connect twice in 5 s
SubscribeParams sub_params = DEFAULT_SUB_PARAMS;
char topic_name[MAX_SIZE_OF_CLOUD_TOPIC] = {0};
ASSERT_EQ(HAL_GetDevInfo(reinterpret_cast<void *>(&device_info)), 0);
HAL_Snprintf(topic_name, sizeof(topic_name), "%s/%s/data", device_info.product_id, device_info.device_name);
MQTTInitParams init_params = DEFAULT_MQTT_INIT_PARAMS;
init_params.device_info = &device_info;
init_params.clean_session = 0;
client = IOT_MQTT_Construct(&init_params);
ASSERT_NE(client, nullptr);
ASSERT_GE(IOT_MQTT_SubscribeSync(client, topic_name, &sub_params), 0);
IOT_MQTT_Destroy(&client);
HAL_SleepMs(5000);
init_params.clean_session = 0;
init_params.connect_when_construct = 0;
init_params.default_subscribe = 1;
client = IOT_MQTT_Construct(&init_params);
ASSERT_NE(client, nullptr);
ASSERT_GE(IOT_MQTT_SubscribeSync(client, topic_name, &sub_params), 0);
ASSERT_EQ(IOT_MQTT_Connect(client), 0);
}
} // namespace mqtt_client_unittest