MqttKit.c 35 KB


  1. /**
  2. ************************************************************
  3. ************************************************************
  4. ************************************************************
  5. * 文件名: MqttKit.c
  6. *
  7. * 作者: 张继瑞
  8. *
  9. * 日期: 2018-04-27
  10. *
  11. * 版本: V1.6
  12. *
  13. * 说明: MQTT协议
  14. *
  15. * 修改记录: V1.1:解决MQTT_PacketSubscribe订阅不为2个topic
  16. * 个数时协议错误的bug
  17. * V1.2:修复MQTT_PacketCmdResp的bug
  18. * V1.3:将strncpy替换为memcpy,解决潜在bug
  19. * V1.4:修复 MQTT_PacketPublishAck
  20. * MQTT_PacketPublishRel
  21. * 函数封包错误的bug
  22. * V1.5:增加 MQTT_UnPacketCmd
  23. * MQTT_UnPacketPublish
  24. * 接口对消息内容长度的提取参数
  25. * V1.6:增加二进制文件上传接口
  26. ************************************************************
  27. ************************************************************
  28. ************************************************************
  29. **/
  30. #include "main.h"
  31. //#include "MqttKit.h"
  32. //#include <string.h>
  33. //#include <stdio.h>
  34. #define CMD_TOPIC_PREFIX "$creq"
  35. //==========================================================
  36. // 函数名称: EDP_NewBuffer
  37. //
  38. // 函数功能: 申请内存
  39. //
  40. // 入口参数: edpPacket:包结构体
  41. // size:大小
  42. //
  43. // 返回参数: 无
  44. //
  45. // 说明: 1.可使用动态分配来分配内存
  46. // 2.可使用局部或全局数组来指定内存
  47. //==========================================================
  48. void MQTT_NewBuffer(MQTT_PACKET_STRUCTURE *mqttPacket, uint32 size)
  49. {
  50. uint32 i = 0;
  51. if(mqttPacket->_data == NULL)
  52. {
  53. mqttPacket->_memFlag = MEM_FLAG_ALLOC;
  54. mqttPacket->_data = (uint8 *)MQTT_MallocBuffer(size);
  55. if(mqttPacket->_data != NULL)
  56. {
  57. mqttPacket->_len = 0;
  58. mqttPacket->_size = size;
  59. for(; i < mqttPacket->_size; i++)
  60. mqttPacket->_data[i] = 0;
  61. }
  62. }
  63. else
  64. {
  65. mqttPacket->_memFlag = MEM_FLAG_STATIC;
  66. for(; i < mqttPacket->_size; i++)
  67. mqttPacket->_data[i] = 0;
  68. mqttPacket->_len = 0;
  69. if(mqttPacket->_size < size)
  70. mqttPacket->_data = NULL;
  71. }
  72. }
  73. //==========================================================
  74. // 函数名称: MQTT_DeleteBuffer
  75. //
  76. // 函数功能: 释放数据内存
  77. //
  78. // 入口参数: edpPacket:包结构体
  79. //
  80. // 返回参数: 无
  81. //
  82. // 说明:
  83. //==========================================================
  84. void MQTT_DeleteBuffer(MQTT_PACKET_STRUCTURE *mqttPacket)
  85. {
  86. if(mqttPacket->_memFlag == MEM_FLAG_ALLOC)
  87. MQTT_FreeBuffer(mqttPacket->_data);
  88. mqttPacket->_data = NULL;
  89. mqttPacket->_len = 0;
  90. mqttPacket->_size = 0;
  91. mqttPacket->_memFlag = MEM_FLAG_NULL;
  92. }
  93. int32 MQTT_DumpLength(size_t len, uint8 *buf)
  94. {
  95. int32 i = 0;
  96. for(i = 1; i <= 4; ++i)
  97. {
  98. *buf = len % 128;
  99. len >>= 7;
  100. if(len > 0)
  101. {
  102. *buf |= 128;
  103. ++buf;
  104. }
  105. else
  106. {
  107. return i;
  108. }
  109. }
  110. return -1;
  111. }
  112. int32 MQTT_ReadLength(const uint8 *stream, int32 size, uint32 *len)
  113. {
  114. int32 i;
  115. const uint8 *in = stream;
  116. uint32 multiplier = 1;
  117. *len = 0;
  118. for(i = 0; i < size; ++i)
  119. {
  120. *len += (in[i] & 0x7f) * multiplier;
  121. if(!(in[i] & 0x80))
  122. {
  123. return i + 1;
  124. }
  125. multiplier <<= 7;
  126. if(multiplier >= 2097152) //128 * *128 * *128
  127. {
  128. return -2; // error, out of range
  129. }
  130. }
  131. return -1; // not complete
  132. }
  133. //==========================================================
  134. // 函数名称: MQTT_UnPacketRecv
  135. //
  136. // 函数功能: MQTT数据接收类型判断
  137. //
  138. // 入口参数: dataPtr:接收的数据指针
  139. //
  140. // 返回参数: 0-成功 其他-失败原因
  141. //
  142. // 说明:
  143. //==========================================================
  144. uint8 MQTT_UnPacketRecv(uint8 *dataPtr)
  145. {
  146. uint8 status = 255;
  147. uint8 type = dataPtr[0] >> 4; //类型检查
  148. if(type < 1 || type > 14)
  149. return status;
  150. if(type == MQTT_PKT_PUBLISH)
  151. {
  152. uint8 *msgPtr;
  153. uint32 remain_len = 0;
  154. msgPtr = dataPtr + MQTT_ReadLength(dataPtr + 1, 4, &remain_len) + 1;
  155. if(remain_len < 2 || dataPtr[0] & 0x01) //retain
  156. return 255;
  157. if(remain_len < ((uint16)msgPtr[0] << 8 | msgPtr[1]) + 2)
  158. return 255;
  159. if(strstr((int8 *)msgPtr + 2, CMD_TOPIC_PREFIX) != NULL) //如果是命令下发
  160. status = MQTT_PKT_CMD;
  161. else
  162. status = MQTT_PKT_PUBLISH;
  163. }
  164. else
  165. status = type;
  166. return status;
  167. }
  168. //==========================================================
  169. // 函数名称: MQTT_PacketConnect
  170. //
  171. // 函数功能: 连接消息组包
  172. //
  173. // 入口参数: user:用户名:产品ID
  174. // password:密码:鉴权信息或apikey
  175. // devid:设备ID
  176. // cTime:连接保持时间
  177. // clean_session:离线消息清除标志
  178. // qos:重发标志
  179. // will_topic:异常离线topic
  180. // will_msg:异常离线消息
  181. // will_retain:消息推送标志
  182. // mqttPacket:包指针
  183. //
  184. // 返回参数: 0-成功 其他-失败
  185. //
  186. // 说明:
  187. //==========================================================
  188. uint8 MQTT_PacketConnect(const int8 *user, const int8 *password, const int8 *devid,
  189. uint16 cTime, uint1 clean_session, uint1 qos,
  190. const int8 *will_topic, const int8 *will_msg, int32 will_retain,
  191. MQTT_PACKET_STRUCTURE *mqttPacket)
  192. {
  193. uint8 flags = 0;
  194. uint8 will_topic_len = 0;
  195. uint16 total_len = 15;
  196. int16 len = 0, devid_len = strlen(devid);
  197. if(!devid)
  198. return 1;
  199. total_len += devid_len + 2;
  200. //断线后,是否清理离线消息:1-清理 0-不清理--------------------------------------------
  201. if(clean_session)
  202. {
  203. flags |= MQTT_CONNECT_CLEAN_SESSION;
  204. }
  205. //异常掉线情况下,服务器发布的topic------------------------------------------------------
  206. if(will_topic)
  207. {
  208. flags |= MQTT_CONNECT_WILL_FLAG;
  209. will_topic_len = strlen(will_topic);
  210. total_len += 4 + will_topic_len + strlen(will_msg);
  211. }
  212. //qos级别--主要用于PUBLISH(发布态)消息的,保证消息传递的次数-----------------------------
  213. switch((unsigned char)qos)
  214. {
  215. case MQTT_QOS_LEVEL0:
  216. flags |= MQTT_CONNECT_WILL_QOS0; //最多一次
  217. break;
  218. case MQTT_QOS_LEVEL1:
  219. flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_QOS1); //最少一次
  220. break;
  221. case MQTT_QOS_LEVEL2:
  222. flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_QOS2); //只有一次
  223. break;
  224. default:
  225. return 2;
  226. }
  227. //主要用于PUBLISH(发布态)的消息,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它。如果不设那么推送至当前订阅的就释放了
  228. if(will_retain)
  229. {
  230. flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_RETAIN);
  231. }
  232. //账号为空 密码为空---------------------------------------------------------------------
  233. if(!user || !password)
  234. {
  235. return 3;
  236. }
  237. flags |= MQTT_CONNECT_USER_NAME | MQTT_CONNECT_PASSORD;
  238. total_len += strlen(user) + strlen(password) + 4;
  239. //分配内存-----------------------------------------------------------------------------
  240. MQTT_NewBuffer(mqttPacket, total_len);
  241. if(mqttPacket->_data == NULL)
  242. return 4;
  243. memset(mqttPacket->_data, 0, total_len);
  244. /*************************************固定头部***********************************************/
  245. //固定头部----------------------连接请求类型---------------------------------------------
  246. mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_CONNECT << 4;
  247. //固定头部----------------------剩余长度值-----------------------------------------------
  248. len = MQTT_DumpLength(total_len - 5, mqttPacket->_data + mqttPacket->_len);
  249. if(len < 0)
  250. {
  251. MQTT_DeleteBuffer(mqttPacket);
  252. return 5;
  253. }
  254. else
  255. mqttPacket->_len += len;
  256. /*************************************可变头部***********************************************/
  257. //可变头部----------------------协议名长度 和 协议名--------------------------------------
  258. mqttPacket->_data[mqttPacket->_len++] = 0;
  259. mqttPacket->_data[mqttPacket->_len++] = 4;
  260. mqttPacket->_data[mqttPacket->_len++] = 'M';
  261. mqttPacket->_data[mqttPacket->_len++] = 'Q';
  262. mqttPacket->_data[mqttPacket->_len++] = 'T';
  263. mqttPacket->_data[mqttPacket->_len++] = 'T';
  264. //可变头部----------------------protocol level 4-----------------------------------------
  265. mqttPacket->_data[mqttPacket->_len++] = 4;
  266. //可变头部----------------------连接标志(该函数开头处理的数据)-----------------------------
  267. mqttPacket->_data[mqttPacket->_len++] = flags;
  268. //可变头部----------------------保持连接的时间(秒)----------------------------------------
  269. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(cTime);
  270. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(cTime);
  271. /*************************************消息体************************************************/
  272. //消息体----------------------------devid长度、devid-------------------------------------
  273. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(devid_len);
  274. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(devid_len);
  275. strncat((int8 *)mqttPacket->_data + mqttPacket->_len, devid, devid_len);
  276. mqttPacket->_len += devid_len;
  277. //消息体----------------------------will_flag 和 will_msg---------------------------------
  278. if(flags & MQTT_CONNECT_WILL_FLAG)
  279. {
  280. unsigned short mLen = 0;
  281. if(!will_msg)
  282. will_msg = "";
  283. mLen = strlen(will_topic);
  284. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(mLen);
  285. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(mLen);
  286. strncat((int8 *)mqttPacket->_data + mqttPacket->_len, will_topic, mLen);
  287. mqttPacket->_len += mLen;
  288. mLen = strlen(will_msg);
  289. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(mLen);
  290. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(mLen);
  291. strncat((int8 *)mqttPacket->_data + mqttPacket->_len, will_msg, mLen);
  292. mqttPacket->_len += mLen;
  293. }
  294. //消息体----------------------------use---------------------------------------------------
  295. if(flags & MQTT_CONNECT_USER_NAME)
  296. {
  297. unsigned short user_len = strlen(user);
  298. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(user_len);
  299. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(user_len);
  300. strncat((int8 *)mqttPacket->_data + mqttPacket->_len, user, user_len);
  301. mqttPacket->_len += user_len;
  302. }
  303. //消息体----------------------------password----------------------------------------------
  304. if(flags & MQTT_CONNECT_PASSORD)
  305. {
  306. unsigned short psw_len = strlen(password);
  307. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(psw_len);
  308. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(psw_len);
  309. strncat((int8 *)mqttPacket->_data + mqttPacket->_len, password, psw_len);
  310. mqttPacket->_len += psw_len;
  311. }
  312. return 0;
  313. }
  314. //==========================================================
  315. // 函数名称: MQTT_PacketDisConnect
  316. //
  317. // 函数功能: 断开连接消息组包
  318. //
  319. // 入口参数: mqttPacket:包指针
  320. //
  321. // 返回参数: 0-成功 1-失败
  322. //
  323. // 说明:
  324. //==========================================================
  325. uint1 MQTT_PacketDisConnect(MQTT_PACKET_STRUCTURE *mqttPacket)
  326. {
  327. MQTT_NewBuffer(mqttPacket, 2);
  328. if(mqttPacket->_data == NULL)
  329. return 1;
  330. /*************************************固定头部***********************************************/
  331. //固定头部----------------------头部消息-------------------------------------------------
  332. mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_DISCONNECT << 4;
  333. //固定头部----------------------剩余长度值-----------------------------------------------
  334. mqttPacket->_data[mqttPacket->_len++] = 0;
  335. return 0;
  336. }
  337. //==========================================================
  338. // 函数名称: MQTT_UnPacketConnectAck
  339. //
  340. // 函数功能: 连接消息解包
  341. //
  342. // 入口参数: rev_data:接收的数据
  343. //
  344. // 返回参数: 1、255-失败 其他-平台的返回码
  345. //
  346. // 说明:
  347. //==========================================================
  348. uint8 MQTT_UnPacketConnectAck(uint8 *rev_data)
  349. {
  350. if(rev_data[1] != 2)
  351. return 1;
  352. if(rev_data[2] == 0 || rev_data[2] == 1)
  353. return rev_data[3];
  354. else
  355. return 255;
  356. }
  357. //==========================================================
  358. // 函数名称: MQTT_PacketSaveData
  359. //
  360. // 函数功能: 数据点上传组包
  361. //
  362. // 入口参数: devid:设备ID(可为空)
  363. // send_buf:json缓存buf
  364. // send_len:json总长
  365. // type_bin_head:bin文件的消息头
  366. // type:类型
  367. //
  368. // 返回参数: 0-成功 1-失败
  369. //
  370. // 说明:
  371. //==========================================================
  372. uint1 MQTT_PacketSaveData(const int8 *devid, int16 send_len, int8 *type_bin_head, uint8 type, MQTT_PACKET_STRUCTURE *mqttPacket)
  373. {
  374. if(MQTT_PacketPublish(MQTT_PUBLISH_ID, "$dp", NULL, send_len + 3, MQTT_QOS_LEVEL1, 0, 1, mqttPacket) == 0)
  375. {
  376. mqttPacket->_data[mqttPacket->_len++] = type; //类型
  377. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(send_len);
  378. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(send_len);
  379. }
  380. else
  381. return 1;
  382. return 0;
  383. }
  384. //==========================================================
  385. // 函数名称: MQTT_PacketSaveBinData
  386. //
  387. // 函数功能: 为禁止文件上传组包
  388. //
  389. // 入口参数: name:数据流名字
  390. // file_len:文件长度
  391. // mqttPacket:包指针
  392. //
  393. // 返回参数: 0-成功 1-失败
  394. //
  395. // 说明:
  396. //==========================================================
  397. uint1 MQTT_PacketSaveBinData(const int8 *name, int16 file_len, MQTT_PACKET_STRUCTURE *mqttPacket)
  398. {
  399. uint1 result = 1;
  400. int8 *bin_head = NULL;
  401. uint8 bin_head_len = 0;
  402. int8 *payload = NULL;
  403. int32 payload_size = 0;
  404. bin_head = (int8 *)MQTT_MallocBuffer(13 + strlen(name));
  405. if(bin_head == NULL)
  406. return result;
  407. sprintf(bin_head, "{\"ds_id\":\"%s\"}", name);
  408. bin_head_len = strlen(bin_head);
  409. payload_size = 7 + bin_head_len + file_len;
  410. payload = (int8 *)MQTT_MallocBuffer(payload_size - file_len);
  411. if(payload == NULL)
  412. {
  413. MQTT_FreeBuffer(bin_head);
  414. return result;
  415. }
  416. payload[0] = 2; //类型
  417. payload[1] = MOSQ_MSB(bin_head_len);
  418. payload[2] = MOSQ_LSB(bin_head_len);
  419. memcpy(payload + 3, bin_head, bin_head_len);
  420. payload[bin_head_len + 3] = (file_len >> 24) & 0xFF;
  421. payload[bin_head_len + 4] = (file_len >> 16) & 0xFF;
  422. payload[bin_head_len + 5] = (file_len >> 8) & 0xFF;
  423. payload[bin_head_len + 6] = file_len & 0xFF;
  424. if(MQTT_PacketPublish(MQTT_PUBLISH_ID, "$dp", payload, payload_size, MQTT_QOS_LEVEL1, 0, 1, mqttPacket) == 0)
  425. result = 0;
  426. MQTT_FreeBuffer(bin_head);
  427. MQTT_FreeBuffer(payload);
  428. return result;
  429. }
  430. //==========================================================
  431. // 函数名称: MQTT_UnPacketCmd
  432. //
  433. // 函数功能: 命令下发解包
  434. //
  435. // 入口参数: rev_data:接收的数据指针
  436. // cmdid:cmdid-uuid
  437. // req:命令
  438. //
  439. // 返回参数: 0-成功 其他-失败原因
  440. //
  441. // 说明:
  442. //==========================================================
  443. uint8 MQTT_UnPacketCmd(uint8 *rev_data, int8 **cmdid, int8 **req, uint16 *req_len)
  444. {
  445. int8 *dataPtr = strchr((int8 *)rev_data + 6, '/'); //加6是跳过头信息
  446. uint32 remain_len = 0;
  447. if(dataPtr == NULL) //未找到'/'
  448. return 1;
  449. dataPtr++; //跳过'/'
  450. MQTT_ReadLength(rev_data + 1, 4, &remain_len); //读取剩余字节
  451. *cmdid = (int8 *)MQTT_MallocBuffer(37); //cmdid固定36字节,多分配一个结束符的位置
  452. if(*cmdid == NULL)
  453. return 2;
  454. memset(*cmdid, 0, 37); //全部清零
  455. memcpy(*cmdid, (const int8 *)dataPtr, 36); //复制cmdid
  456. dataPtr += 36;
  457. *req_len = remain_len - 44; //命令长度 = 剩余长度(remain_len) - 2 - 5($creq) - 1(\) - cmdid长度
  458. *req = (int8 *)MQTT_MallocBuffer(*req_len + 1); //分配命令长度+1
  459. if(*req == NULL)
  460. {
  461. MQTT_FreeBuffer(*cmdid);
  462. return 3;
  463. }
  464. memset(*req, 0, *req_len + 1); //清零
  465. memcpy(*req, (const int8 *)dataPtr, *req_len); //复制命令
  466. return 0;
  467. }
  468. //==========================================================
  469. // 函数名称: MQTT_PacketCmdResp
  470. //
  471. // 函数功能: 命令回复组包
  472. //
  473. // 入口参数: cmdid:cmdid
  474. // req:命令
  475. // mqttPacket:包指针
  476. //
  477. // 返回参数: 0-成功 1-失败
  478. //
  479. // 说明:
  480. //==========================================================
  481. uint1 MQTT_PacketCmdResp(const int8 *cmdid, const int8 *req, MQTT_PACKET_STRUCTURE *mqttPacket)
  482. {
  483. uint16 cmdid_len = strlen(cmdid);
  484. uint16 req_len = strlen(req);
  485. _Bool status = 0;
  486. int8 *payload = MQTT_MallocBuffer(cmdid_len + 6);
  487. if(payload == NULL)
  488. return 1;
  489. memset(payload, 0, cmdid_len + 6);
  490. memcpy(payload, "$crsp/", 6);
  491. strncat(payload, cmdid, cmdid_len);
  492. if(MQTT_PacketPublish(MQTT_PUBLISH_ID, payload, req, strlen(req), MQTT_QOS_LEVEL0, 0, 1, mqttPacket) == 0)
  493. status = 0;
  494. else
  495. status = 1;
  496. MQTT_FreeBuffer(payload);
  497. return status;
  498. }
  499. //==========================================================
  500. // 函数名称: MQTT_PacketSubscribe
  501. //
  502. // 函数功能: Subscribe消息组包
  503. //
  504. // 入口参数: pkt_id:pkt_id
  505. // qos:消息重发次数
  506. // topics:订阅的消息
  507. // topics_cnt:订阅的消息个数
  508. // mqttPacket:包指针
  509. //
  510. // 返回参数: 0-成功 其他-失败
  511. //
  512. // 说明:
  513. //==========================================================
  514. uint8 MQTT_PacketSubscribe(uint16 pkt_id, enum MqttQosLevel qos, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket)
  515. {
  516. uint32 topic_len = 0, remain_len = 0;
  517. int16 len = 0;
  518. uint8 i = 0;
  519. if(pkt_id == 0)
  520. return 1;
  521. //计算topic长度-------------------------------------------------------------------------
  522. for(; i < topics_cnt; i++)
  523. {
  524. if(topics[i] == NULL)
  525. return 2;
  526. topic_len += strlen(topics[i]);
  527. }
  528. //2 bytes packet id + topic filter(2 bytes topic + topic length + 1 byte reserve)------
  529. remain_len = 2 + 3 * topics_cnt + topic_len;
  530. //分配内存------------------------------------------------------------------------------
  531. MQTT_NewBuffer(mqttPacket, remain_len + 5);
  532. if(mqttPacket->_data == NULL)
  533. return 3;
  534. /*************************************固定头部***********************************************/
  535. //固定头部----------------------头部消息-------------------------------------------------
  536. mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_SUBSCRIBE << 4 | 0x02;
  537. //固定头部----------------------剩余长度值-----------------------------------------------
  538. len = MQTT_DumpLength(remain_len, mqttPacket->_data + mqttPacket->_len);
  539. if(len < 0)
  540. {
  541. MQTT_DeleteBuffer(mqttPacket);
  542. return 4;
  543. }
  544. else
  545. mqttPacket->_len += len;
  546. /*************************************payload***********************************************/
  547. //payload----------------------pkt_id---------------------------------------------------
  548. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);
  549. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);
  550. //payload----------------------topic_name-----------------------------------------------
  551. for(i = 0; i < topics_cnt; i++)
  552. {
  553. topic_len = strlen(topics[i]);
  554. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);
  555. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);
  556. strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topics[i], topic_len);
  557. mqttPacket->_len += topic_len;
  558. mqttPacket->_data[mqttPacket->_len++] = qos & 0xFF;
  559. }
  560. return 0;
  561. }
  562. //==========================================================
  563. // 函数名称: MQTT_UnPacketSubscrebe
  564. //
  565. // 函数功能: Subscribe的回复消息解包
  566. //
  567. // 入口参数: rev_data:接收到的信息
  568. //
  569. // 返回参数: 0-成功 其他-失败
  570. //
  571. // 说明:
  572. //==========================================================
  573. uint8 MQTT_UnPacketSubscribe(uint8 *rev_data)
  574. {
  575. uint8 result = 255;
  576. if(rev_data[2] == MOSQ_MSB(MQTT_SUBSCRIBE_ID) && rev_data[3] == MOSQ_LSB(MQTT_SUBSCRIBE_ID))
  577. {
  578. switch(rev_data[4])
  579. {
  580. case 0x00:
  581. case 0x01:
  582. case 0x02:
  583. //MQTT Subscribe OK
  584. result = 0;
  585. break;
  586. case 0x80:
  587. //MQTT Subscribe Failed
  588. result = 1;
  589. break;
  590. default:
  591. //MQTT Subscribe UnKnown Err
  592. result = 2;
  593. break;
  594. }
  595. }
  596. return result;
  597. }
  598. //==========================================================
  599. // 函数名称: MQTT_PacketUnSubscribe
  600. //
  601. // 函数功能: UnSubscribe消息组包
  602. //
  603. // 入口参数: pkt_id:pkt_id
  604. // qos:消息重发次数
  605. // topics:订阅的消息
  606. // topics_cnt:订阅的消息个数
  607. // mqttPacket:包指针
  608. //
  609. // 返回参数: 0-成功 其他-失败
  610. //
  611. // 说明:
  612. //==========================================================
  613. uint8 MQTT_PacketUnSubscribe(uint16 pkt_id, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket)
  614. {
  615. uint32 topic_len = 0, remain_len = 0;
  616. int16 len = 0;
  617. uint8 i = 0;
  618. if(pkt_id == 0)
  619. return 1;
  620. //计算topic长度-------------------------------------------------------------------------
  621. for(; i < topics_cnt; i++)
  622. {
  623. if(topics[i] == NULL)
  624. return 2;
  625. topic_len += strlen(topics[i]);
  626. }
  627. //2 bytes packet id, 2 bytes topic length + topic + 1 byte reserve---------------------
  628. remain_len = 2 + (topics_cnt << 1) + topic_len;
  629. //分配内存------------------------------------------------------------------------------
  630. MQTT_NewBuffer(mqttPacket, remain_len + 5);
  631. if(mqttPacket->_data == NULL)
  632. return 3;
  633. /*************************************固定头部***********************************************/
  634. //固定头部----------------------头部消息-------------------------------------------------
  635. mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_UNSUBSCRIBE << 4 | 0x02;
  636. //固定头部----------------------剩余长度值-----------------------------------------------
  637. len = MQTT_DumpLength(remain_len, mqttPacket->_data + mqttPacket->_len);
  638. if(len < 0)
  639. {
  640. MQTT_DeleteBuffer(mqttPacket);
  641. return 4;
  642. }
  643. else
  644. mqttPacket->_len += len;
  645. /*************************************payload***********************************************/
  646. //payload----------------------pkt_id---------------------------------------------------
  647. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);
  648. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);
  649. //payload----------------------topic_name-----------------------------------------------
  650. for(i = 0; i < topics_cnt; i++)
  651. {
  652. topic_len = strlen(topics[i]);
  653. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);
  654. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);
  655. strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topics[i], topic_len);
  656. mqttPacket->_len += topic_len;
  657. }
  658. return 0;
  659. }
  660. //==========================================================
  661. // 函数名称: MQTT_UnPacketUnSubscribe
  662. //
  663. // 函数功能: UnSubscribe的回复消息解包
  664. //
  665. // 入口参数: rev_data:接收到的信息
  666. //
  667. // 返回参数: 0-成功 其他-失败
  668. //
  669. // 说明:
  670. //==========================================================
  671. uint1 MQTT_UnPacketUnSubscribe(uint8 *rev_data)
  672. {
  673. uint1 result = 1;
  674. if(rev_data[2] == MOSQ_MSB(MQTT_UNSUBSCRIBE_ID) && rev_data[3] == MOSQ_LSB(MQTT_UNSUBSCRIBE_ID))
  675. {
  676. result = 0;
  677. }
  678. return result;
  679. }
  680. //==========================================================
  681. // 函数名称: MQTT_PacketPublish
  682. //
  683. // 函数功能: Pulish消息组包
  684. //
  685. // 入口参数: pkt_id:pkt_id
  686. // topic:发布的topic
  687. // payload:消息体
  688. // payload_len:消息体长度
  689. // qos:重发次数
  690. // retain:离线消息推送
  691. // own:
  692. // mqttPacket:包指针
  693. //
  694. // 返回参数: 0-成功 其他-失败
  695. //
  696. // 说明:
  697. //==========================================================
  698. uint8 MQTT_PacketPublish(uint16 pkt_id, const int8 *topic,
  699. const int8 *payload, uint32 payload_len,
  700. enum MqttQosLevel qos, int32 retain, int32 own,
  701. MQTT_PACKET_STRUCTURE *mqttPacket)
  702. {
  703. uint32 total_len = 0, topic_len = 0;
  704. uint32 data_len = 0;
  705. int32 len = 0;
  706. uint8 flags = 0;
  707. //pkt_id检查----------------------------------------------------------------------------
  708. if(pkt_id == 0)
  709. return 1;
  710. //$dp为系统上传数据点的指令--------------------------------------------------------------
  711. for(topic_len = 0; topic[topic_len] != '\0'; ++topic_len)
  712. {
  713. if((topic[topic_len] == '#') || (topic[topic_len] == '+'))
  714. return 2;
  715. }
  716. //Publish消息---------------------------------------------------------------------------
  717. flags |= MQTT_PKT_PUBLISH << 4;
  718. //retain标志----------------------------------------------------------------------------
  719. if(retain)
  720. flags |= 0x01;
  721. //总长度--------------------------------------------------------------------------------
  722. total_len = topic_len + payload_len + 2;
  723. //qos级别--主要用于PUBLISH(发布态)消息的,保证消息传递的次数-----------------------------
  724. switch(qos)
  725. {
  726. case MQTT_QOS_LEVEL0:
  727. flags |= MQTT_CONNECT_WILL_QOS0; //最多一次
  728. break;
  729. case MQTT_QOS_LEVEL1:
  730. flags |= 0x02; //最少一次
  731. total_len += 2;
  732. break;
  733. case MQTT_QOS_LEVEL2:
  734. flags |= 0x04; //只有一次
  735. total_len += 2;
  736. break;
  737. default:
  738. return 3;
  739. }
  740. //分配内存------------------------------------------------------------------------------
  741. if(payload[0] == 2)
  742. {
  743. uint32 data_len_t = 0;
  744. while(payload[data_len_t++] != '}');
  745. data_len_t -= 3;
  746. data_len = data_len_t + 7;
  747. data_len_t = payload_len - data_len;
  748. MQTT_NewBuffer(mqttPacket, total_len + 3 - data_len_t);
  749. if(mqttPacket->_data == NULL)
  750. return 4;
  751. memset(mqttPacket->_data, 0, total_len + 3 - data_len_t);
  752. }
  753. else
  754. {
  755. MQTT_NewBuffer(mqttPacket, total_len + 3);
  756. if(mqttPacket->_data == NULL)
  757. return 4;
  758. memset(mqttPacket->_data, 0, total_len + 3);
  759. }
  760. /*************************************固定头部***********************************************/
  761. //固定头部----------------------头部消息-------------------------------------------------
  762. mqttPacket->_data[mqttPacket->_len++] = flags;
  763. //固定头部----------------------剩余长度值-----------------------------------------------
  764. len = MQTT_DumpLength(total_len, mqttPacket->_data + mqttPacket->_len);
  765. if(len < 0)
  766. {
  767. MQTT_DeleteBuffer(mqttPacket);
  768. return 5;
  769. }
  770. else
  771. mqttPacket->_len += len;
  772. /*************************************可变头部***********************************************/
  773. //可变头部----------------------写入topic长度、topic-------------------------------------
  774. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);
  775. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);
  776. strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topic, topic_len);
  777. mqttPacket->_len += topic_len;
  778. if(qos != MQTT_QOS_LEVEL0)
  779. {
  780. mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);
  781. mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);
  782. }
  783. //可变头部----------------------写入payload----------------------------------------------
  784. if(payload != NULL)
  785. {
  786. if(payload[0] == 2)
  787. {
  788. memcpy((int8 *)mqttPacket->_data + mqttPacket->_len, payload, data_len);
  789. mqttPacket->_len += data_len;
  790. }
  791. else
  792. {
  793. memcpy((int8 *)mqttPacket->_data + mqttPacket->_len, payload, payload_len);
  794. mqttPacket->_len += payload_len;
  795. }
  796. }
  797. return 0;
  798. }
  799. //==========================================================
  800. // 函数名称: MQTT_UnPacketPublish
  801. //
  802. // 函数功能: Publish消息解包
  803. //
  804. // 入口参数: flags:MQTT相关标志信息
  805. // pkt:指向可变头部
  806. // size:固定头部中的剩余长度信息
  807. //
  808. // 返回参数: 0-成功 其他-失败原因
  809. //
  810. // 说明:
  811. //==========================================================
  812. uint8 MQTT_UnPacketPublish(uint8 *rev_data, int8 **topic, uint16 *topic_len, int8 **payload, uint16 *payload_len, uint8 *qos, uint16 *pkt_id)
  813. {
  814. const int8 flags = rev_data[0] & 0x0F;
  815. uint8 *msgPtr;
  816. uint32 remain_len = 0;
  817. const int8 dup = flags & 0x08;
  818. *qos = (flags & 0x06) >> 1;
  819. msgPtr = rev_data + MQTT_ReadLength(rev_data + 1, 4, &remain_len) + 1;
  820. if(remain_len < 2 || flags & 0x01) //retain
  821. return 255;
  822. *topic_len = (uint16)msgPtr[0] << 8 | msgPtr[1];
  823. if(remain_len < *topic_len + 2)
  824. return 255;
  825. if(strstr((int8 *)msgPtr + 2, CMD_TOPIC_PREFIX) != NULL) //如果是命令下发
  826. return MQTT_PKT_CMD;
  827. switch(*qos)
  828. {
  829. case MQTT_QOS_LEVEL0: // qos0 have no packet identifier
  830. if(0 != dup)
  831. return 255;
  832. *topic = MQTT_MallocBuffer(*topic_len + 1); //为topic分配内存
  833. if(*topic == NULL)
  834. return 255;
  835. memset(*topic, 0, *topic_len + 1);
  836. memcpy(*topic, (int8 *)msgPtr + 2, *topic_len); //复制数据
  837. *payload_len = remain_len - 2 - *topic_len; //为payload分配内存
  838. *payload = MQTT_MallocBuffer(*payload_len + 1);
  839. if(*payload == NULL) //如果失败
  840. {
  841. MQTT_FreeBuffer(*topic); //则需要把topic的内存释放掉
  842. return 255;
  843. }
  844. memset(*payload, 0, *payload_len + 1);
  845. memcpy(*payload, (int8 *)msgPtr + 2 + *topic_len, *payload_len);
  846. break;
  847. case MQTT_QOS_LEVEL1:
  848. case MQTT_QOS_LEVEL2:
  849. if(*topic_len + 2 > remain_len)
  850. return 255;
  851. *pkt_id = (uint16)msgPtr[*topic_len + 2] << 8 | msgPtr[*topic_len + 3];
  852. if(pkt_id == 0)
  853. return 255;
  854. *topic = MQTT_MallocBuffer(*topic_len + 1); //为topic分配内存
  855. if(*topic == NULL)
  856. return 255;
  857. memset(*topic, 0, *topic_len + 1);
  858. memcpy(*topic, (int8 *)msgPtr + 2, *topic_len); //复制数据
  859. *payload_len = remain_len - 4 - *topic_len;
  860. *payload = MQTT_MallocBuffer(*payload_len + 1); //为payload分配内存
  861. if(*payload == NULL) //如果失败
  862. {
  863. MQTT_FreeBuffer(*topic); //则需要把topic的内存释放掉
  864. return 255;
  865. }
  866. memset(*payload, 0, *payload_len + 1);
  867. memcpy(*payload, (int8 *)msgPtr + 4 + *topic_len, *payload_len);
  868. break;
  869. default:
  870. return 255;
  871. }
  872. if(strchr((int8 *)topic, '+') || strchr((int8 *)topic, '#'))
  873. return 255;
  874. return 0;
  875. }
  876. //==========================================================
  877. // 函数名称: MQTT_PacketPublishAck
  878. //
  879. // 函数功能: Publish Ack消息组包
  880. //
  881. // 入口参数: pkt_id:packet id
  882. // mqttPacket:包指针
  883. //
  884. // 返回参数: 0-成功 1-失败原因
  885. //
  886. // 说明: 当收到的Publish消息的QoS等级为1时,需要Ack回复
  887. //==========================================================
  888. uint1 MQTT_PacketPublishAck(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
  889. {
  890. MQTT_NewBuffer(mqttPacket, 4);
  891. if(mqttPacket->_data == NULL)
  892. return 1;
  893. /*************************************固定头部***********************************************/
  894. //固定头部----------------------头部消息-------------------------------------------------
  895. mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBACK << 4;
  896. //固定头部----------------------剩余长度-------------------------------------------------
  897. mqttPacket->_data[mqttPacket->_len++] = 2;
  898. /*************************************可变头部***********************************************/
  899. //可变头部----------------------pkt_id长度-----------------------------------------------
  900. mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;
  901. mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;
  902. return 0;
  903. }
  904. //==========================================================
  905. // 函数名称: MQTT_UnPacketPublishAck
  906. //
  907. // 函数功能: Publish Ack消息解包
  908. //
  909. // 入口参数: rev_data:收到的数据
  910. //
  911. // 返回参数: 0-成功 1-失败原因
  912. //
  913. // 说明:
  914. //==========================================================
  915. uint1 MQTT_UnPacketPublishAck(uint8 *rev_data)
  916. {
  917. if(rev_data[1] != 2)
  918. return 1;
  919. if(rev_data[2] == MOSQ_MSB(MQTT_PUBLISH_ID) && rev_data[3] == MOSQ_LSB(MQTT_PUBLISH_ID))
  920. return 0;
  921. else
  922. return 1;
  923. }
  924. //==========================================================
  925. // 函数名称: MQTT_PacketPublishRec
  926. //
  927. // 函数功能: Publish Rec消息组包
  928. //
  929. // 入口参数: pkt_id:packet id
  930. // mqttPacket:包指针
  931. //
  932. // 返回参数: 0-成功 1-失败原因
  933. //
  934. // 说明: 当收到的Publish消息的QoS等级为2时,先收到rec
  935. //==========================================================
  936. uint1 MQTT_PacketPublishRec(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
  937. {
  938. MQTT_NewBuffer(mqttPacket, 4);
  939. if(mqttPacket->_data == NULL)
  940. return 1;
  941. /*************************************固定头部***********************************************/
  942. //固定头部----------------------头部消息-------------------------------------------------
  943. mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBREC << 4;
  944. //固定头部----------------------剩余长度-------------------------------------------------
  945. mqttPacket->_data[mqttPacket->_len++] = 2;
  946. /*************************************可变头部***********************************************/
  947. //可变头部----------------------pkt_id长度-----------------------------------------------
  948. mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;
  949. mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;
  950. return 0;
  951. }
  952. //==========================================================
  953. // 函数名称: MQTT_UnPacketPublishRec
  954. //
  955. // 函数功能: Publish Rec消息解包
  956. //
  957. // 入口参数: rev_data:接收到的数据
  958. //
  959. // 返回参数: 0-成功 1-失败
  960. //
  961. // 说明:
  962. //==========================================================
  963. uint1 MQTT_UnPacketPublishRec(uint8 *rev_data)
  964. {
  965. if(rev_data[1] != 2)
  966. return 1;
  967. if(rev_data[2] == MOSQ_MSB(MQTT_PUBLISH_ID) && rev_data[3] == MOSQ_LSB(MQTT_PUBLISH_ID))
  968. return 0;
  969. else
  970. return 1;
  971. }
  972. //==========================================================
  973. // 函数名称: MQTT_PacketPublishRel
  974. //
  975. // 函数功能: Publish Rel消息组包
  976. //
  977. // 入口参数: pkt_id:packet id
  978. // mqttPacket:包指针
  979. //
  980. // 返回参数: 0-成功 1-失败原因
  981. //
  982. // 说明: 当收到的Publish消息的QoS等级为2时,先收到rec,再回复rel
  983. //==========================================================
  984. uint1 MQTT_PacketPublishRel(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
  985. {
  986. MQTT_NewBuffer(mqttPacket, 4);
  987. if(mqttPacket->_data == NULL)
  988. return 1;
  989. /*************************************固定头部***********************************************/
  990. //固定头部----------------------头部消息-------------------------------------------------
  991. mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBREL << 4 | 0x02;
  992. //固定头部----------------------剩余长度-------------------------------------------------
  993. mqttPacket->_data[mqttPacket->_len++] = 2;
  994. /*************************************可变头部***********************************************/
  995. //可变头部----------------------pkt_id长度-----------------------------------------------
  996. mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;
  997. mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;
  998. return 0;
  999. }
  1000. //==========================================================
  1001. // 函数名称: MQTT_UnPacketPublishRel
  1002. //
  1003. // 函数功能: Publish Rel消息解包
  1004. //
  1005. // 入口参数: rev_data:接收到的数据
  1006. //
  1007. // 返回参数: 0-成功 1-失败
  1008. //
  1009. // 说明:
  1010. //==========================================================
  1011. uint1 MQTT_UnPacketPublishRel(uint8 *rev_data, uint16 pkt_id)
  1012. {
  1013. if(rev_data[1] != 2)
  1014. return 1;
  1015. if(rev_data[2] == MOSQ_MSB(pkt_id) && rev_data[3] == MOSQ_LSB(pkt_id))
  1016. return 0;
  1017. else
  1018. return 1;
  1019. }
  1020. //==========================================================
  1021. // 函数名称: MQTT_PacketPublishComp
  1022. //
  1023. // 函数功能: Publish Comp消息组包
  1024. //
  1025. // 入口参数: pkt_id:packet id
  1026. // mqttPacket:包指针
  1027. //
  1028. // 返回参数: 0-成功 1-失败原因
  1029. //
  1030. // 说明: 当收到的Publish消息的QoS等级为2时,先收到rec,再回复rel
  1031. //==========================================================
  1032. uint1 MQTT_PacketPublishComp(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket)
  1033. {
  1034. MQTT_NewBuffer(mqttPacket, 4);
  1035. if(mqttPacket->_data == NULL)
  1036. return 1;
  1037. /*************************************固定头部***********************************************/
  1038. //固定头部----------------------头部消息-------------------------------------------------
  1039. mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PUBCOMP << 4;
  1040. //固定头部----------------------剩余长度-------------------------------------------------
  1041. mqttPacket->_data[mqttPacket->_len++] = 2;
  1042. /*************************************可变头部***********************************************/
  1043. //可变头部----------------------pkt_id长度-----------------------------------------------
  1044. mqttPacket->_data[mqttPacket->_len++] = pkt_id >> 8;
  1045. mqttPacket->_data[mqttPacket->_len++] = pkt_id & 0xff;
  1046. return 0;
  1047. }
  1048. //==========================================================
  1049. // 函数名称: MQTT_UnPacketPublishComp
  1050. //
  1051. // 函数功能: Publish Comp消息解包
  1052. //
  1053. // 入口参数: rev_data:接收到的数据
  1054. //
  1055. // 返回参数: 0-成功 1-失败
  1056. //
  1057. // 说明:
  1058. //==========================================================
  1059. uint1 MQTT_UnPacketPublishComp(uint8 *rev_data)
  1060. {
  1061. if(rev_data[1] != 2)
  1062. return 1;
  1063. if(rev_data[2] == MOSQ_MSB(MQTT_PUBLISH_ID) && rev_data[3] == MOSQ_LSB(MQTT_PUBLISH_ID))
  1064. return 0;
  1065. else
  1066. return 1;
  1067. }
  1068. //==========================================================
  1069. // 函数名称: MQTT_PacketPing
  1070. //
  1071. // 函数功能: 心跳请求组包
  1072. //
  1073. // 入口参数: mqttPacket:包指针
  1074. //
  1075. // 返回参数: 0-成功 1-失败
  1076. //
  1077. // 说明:
  1078. //==========================================================
  1079. uint1 MQTT_PacketPing(MQTT_PACKET_STRUCTURE *mqttPacket)
  1080. {
  1081. MQTT_NewBuffer(mqttPacket, 2);
  1082. if(mqttPacket->_data == NULL)
  1083. return 1;
  1084. /*************************************固定头部***********************************************/
  1085. //固定头部----------------------头部消息-------------------------------------------------
  1086. mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_PINGREQ << 4;
  1087. //固定头部----------------------剩余长度-------------------------------------------------
  1088. mqttPacket->_data[mqttPacket->_len++] = 0;
  1089. return 0;
  1090. }