David Fletcher
/
cc3100_Test_mqtt_CM4F
TI's MQTT Demo with freertos CM4F
Embed:
(wiki syntax)
Show/hide line numbers
mqtt_common.cpp
00001 /****************************************************************************** 00002 * 00003 * Copyright (C) 2014 Texas Instruments Incorporated 00004 * 00005 * All rights reserved. Property of Texas Instruments Incorporated. 00006 * Restricted rights to use, duplicate or disclose this code are 00007 * granted through contract. 00008 * 00009 * The program may not be used without the written permission of 00010 * Texas Instruments Incorporated or against the terms and conditions 00011 * stipulated in the agreement under which this program has been supplied, 00012 * and under no circumstances can it be used with non-TI connectivity device. 00013 * 00014 ******************************************************************************/ 00015 00016 /* 00017 mqtt_common.c 00018 00019 This module implements routines that are common to both client and server 00020 components. 00021 */ 00022 00023 #include "mqtt_common.h" 00024 #include "cli_uart.h" 00025 00026 namespace mbed_mqtt { 00027 00028 static void free_mqp(struct mqtt_packet *mqp) 00029 { 00030 if((NULL != mqp->free) && (0 == --mqp->n_refs)) 00031 mqp->free(mqp); 00032 00033 return; 00034 } 00035 00036 void mqp_free(struct mqtt_packet *mqp) 00037 { 00038 free_mqp(mqp); 00039 } 00040 00041 static void reset_mqp(struct mqtt_packet *mqp) 00042 { 00043 00044 /* Fields not handled here are meant to be left unaltered. */ 00045 mqp->msg_type = 0x00; 00046 mqp->fh_byte1 = 0x00; 00047 mqp->msg_id = 0; 00048 mqp->fh_len = 0x00; 00049 mqp->vh_len = 0; 00050 mqp->pl_len = 0; 00051 mqp->private_ = 0; 00052 } 00053 00054 void mqp_reset(struct mqtt_packet *mqp) 00055 { 00056 reset_mqp(mqp); 00057 } 00058 00059 void mqp_init(struct mqtt_packet *mqp, uint8_t offset) 00060 { 00061 reset_mqp(mqp); 00062 00063 mqp->offset = offset; 00064 mqp->n_refs = 0x01; 00065 mqp->next = NULL; 00066 } 00067 00068 static int32_t buf_wr_utf8(uint8_t *buf, const struct utf8_string *utf8) 00069 { 00070 uint8_t *ref = buf; 00071 00072 buf += buf_wr_nbo_2B(buf, utf8->length); 00073 buf += buf_wr_nbytes(buf, (uint8_t*)utf8->buffer, utf8->length); 00074 00075 return buf - ref; 00076 } 00077 00078 int32_t mqp_buf_wr_utf8(uint8_t *buf, const struct utf8_string *utf8) 00079 { 00080 return buf_wr_utf8(buf, utf8); 00081 } 00082 00083 #define MAX_REMLEN_BYTES (MAX_FH_LEN - 1) 00084 00085 static int32_t buf_tail_wr_remlen(uint8_t *buf, uint32_t remlen) 00086 { 00087 00088 uint8_t val[MAX_REMLEN_BYTES], i = 0; 00089 00090 do { 00091 val[i] = remlen & 0x7F; /* MOD 128 */ 00092 remlen = remlen >> 7; /* DIV 128 */ 00093 00094 if(remlen) 00095 val[i] |= 0x80; 00096 00097 i++; 00098 00099 } while(remlen > 0); 00100 00101 buf_wr_nbytes(buf + MAX_REMLEN_BYTES - i, val, i); 00102 00103 return i; /* # bytes written in buf */ 00104 } 00105 00106 int32_t mqp_buf_tail_wr_remlen(uint8_t *buf, uint32_t remlen) 00107 { 00108 return buf_tail_wr_remlen(buf, remlen); 00109 } 00110 00111 static int32_t buf_rd_remlen(uint8_t *buf, uint32_t *remlen) 00112 { 00113 uint32_t val = 0, mul = 0; 00114 int32_t i = 0; 00115 00116 do { 00117 val += (buf[i] & 0x7F) << mul; 00118 mul += 7; 00119 00120 } while((buf[i++] & 0x80) && 00121 (i < MAX_REMLEN_BYTES)); 00122 00123 *remlen = val; 00124 00125 /* Return -1 if value was not found */ 00126 return (buf[i - 1] & 0x80)? -1 : i; 00127 } 00128 00129 int32_t mqp_buf_rd_remlen(uint8_t *buf, uint32_t *remlen) 00130 { 00131 return buf_rd_remlen(buf, remlen); 00132 } 00133 00134 int32_t mqp_pub_append_topic(struct mqtt_packet *mqp, const struct utf8_string *topic, 00135 uint16_t msg_id) 00136 { 00137 uint8_t *buf = MQP_VHEADER_BUF(mqp), *ref = buf; 00138 00139 if(0 != mqp->vh_len){ 00140 Uart_Write((uint8_t*)"Topic has been already added\r\n"); 00141 return -1; /* Topic has been already added */ 00142 } 00143 if(MQP_FREEBUF_LEN(mqp) < (topic + msg_id? 2 : 0)){ 00144 Uart_Write((uint8_t*)"Can't WR topic\r\n"); 00145 return MQP_ERR_PKT_LEN; /* Can't WR topic */ 00146 } 00147 buf += buf_wr_utf8(buf, topic); 00148 00149 if(0 != msg_id) { /* MSG ID 0 indicates ==> QoS0 */ 00150 mqp->msg_id = msg_id; 00151 buf += buf_wr_nbo_2B(buf, mqp->msg_id); 00152 } 00153 00154 mqp->vh_len += buf - ref; 00155 00156 return buf - ref; 00157 } 00158 00159 int32_t mqp_pub_append_data(struct mqtt_packet *mqp, const uint8_t *data_buf, 00160 uint32_t data_len) 00161 { 00162 uint8_t *buf = MQP_PAYLOAD_BUF(mqp) + mqp->pl_len, *ref = buf; 00163 00164 if(0 == mqp->vh_len) 00165 return -1; /* Must include topic first */ 00166 00167 if(MQP_FREEBUF_LEN(mqp) < data_len) 00168 return MQP_ERR_PKT_LEN; /* Can't WR data */ 00169 00170 /* Including payload info for PUBLISH */ 00171 buf += buf_wr_nbytes(buf, data_buf, data_len); 00172 mqp->pl_len += buf - ref; 00173 00174 return buf - ref; 00175 } 00176 00177 static bool proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw) 00178 { 00179 uint8_t *buf = MQP_VHEADER_BUF(mqp_raw); 00180 00181 if(mqp_raw->pl_len < 2) 00182 return false; /* Bytes for MSG ID not available */ 00183 00184 buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id); 00185 mqp_raw->vh_len += 2; 00186 mqp_raw->pl_len -= 2; 00187 00188 return true; 00189 } 00190 00191 bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw) 00192 { 00193 return proc_vh_msg_id_rx(mqp_raw); 00194 } 00195 00196 bool mqp_proc_msg_id_ack_rx(struct mqtt_packet *mqp_raw, bool has_pl) 00197 { 00198 if((false == proc_vh_msg_id_rx(mqp_raw)) || 00199 (has_pl ^ (!!mqp_raw->pl_len))) 00200 return false; 00201 00202 return true; 00203 } 00204 00205 bool mqp_proc_pub_rx(struct mqtt_packet *mqp_raw) 00206 { 00207 uint8_t *buf = MQP_VHEADER_BUF(mqp_raw), *ref = buf; 00208 uint16_t tmp = 0; 00209 00210 if(mqp_raw->pl_len < (buf - ref + 2)) /* Length Check */ 00211 return false; /* No space to hold Topic size */ 00212 00213 buf += buf_rd_nbo_2B(buf, &tmp); /* Topic Length */ 00214 buf += tmp; /* Topic Content */ 00215 00216 if(MQTT_QOS0 != ENUM_QOS(mqp_raw->fh_byte1)) { 00217 if(mqp_raw->pl_len < (buf - ref + 2)) 00218 return false; /* No space to hold MSG ID */ 00219 00220 buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id); 00221 } 00222 00223 mqp_raw->vh_len += buf - ref; 00224 mqp_raw->pl_len -= buf - ref; 00225 00226 return true; 00227 } 00228 00229 bool mqp_ack_wlist_append(struct mqtt_ack_wlist *list, 00230 struct mqtt_packet *elem) 00231 { 00232 elem->next = NULL; 00233 00234 if(list->tail) { 00235 list->tail->next = elem; 00236 list->tail = elem; 00237 } else { 00238 list->tail = elem; 00239 list->head = elem; 00240 } 00241 00242 return true; 00243 } 00244 00245 struct mqtt_packet *mqp_ack_wlist_remove(struct mqtt_ack_wlist *list, 00246 uint16_t msg_id) 00247 { 00248 struct mqtt_packet *elem = list->head, *prev = NULL; 00249 00250 while(elem) { 00251 if(msg_id == elem->msg_id) { 00252 if(prev) 00253 prev->next = elem->next; 00254 else 00255 list->head = elem->next; 00256 00257 if(NULL == list->head) 00258 list->tail = NULL; 00259 00260 if(list->tail == elem) 00261 list->tail = prev; 00262 00263 break; 00264 } 00265 00266 prev = elem; 00267 elem = elem->next; 00268 } 00269 00270 return elem; 00271 } 00272 00273 void mqp_ack_wlist_purge(struct mqtt_ack_wlist *list) 00274 { 00275 00276 struct mqtt_packet *elem = list->head; 00277 00278 while(elem) { 00279 struct mqtt_packet *next = elem->next; 00280 free_mqp(elem); 00281 elem = next; 00282 } 00283 00284 list->head = NULL; 00285 list->tail = NULL; 00286 00287 return; 00288 } 00289 00290 void secure_conn_struct_init(struct secure_conn *nw_security) 00291 { 00292 nw_security->method = nw_security->cipher = NULL; 00293 nw_security->files = NULL; 00294 nw_security->n_file = 0; 00295 00296 return; 00297 } 00298 00299 int32_t mqp_prep_fh(struct mqtt_packet *mqp, uint8_t flags) 00300 { 00301 00302 uint32_t remlen = mqp->vh_len + mqp->pl_len; 00303 uint8_t *buf = mqp->buffer + mqp->offset; 00304 uint8_t *ref = buf; 00305 00306 buf -= buf_tail_wr_remlen(buf - MAX_REMLEN_BYTES, remlen); 00307 00308 buf -= 1; /* Make space for FH Byte1 */ 00309 mqp->fh_byte1 = *buf = MAKE_FH_BYTE1(mqp->msg_type, flags); 00310 00311 mqp->fh_len = ref - buf; 00312 mqp->offset -= ref - buf; 00313 00314 return ref - buf; 00315 } 00316 00317 /* 00318 Since, the network connection is a TCP socket stream, it is imperative that 00319 adequate checks are put in place to identify a MQTT packet and isolate it 00320 for further processing. The intent of following routine is to read just one 00321 packet from continuous stream and leave rest for the next iteration. 00322 */ 00323 00324 #define RET_IF_ERR_IN_NET_RECV(net, buf, len, wait_secs, timed_out, ctx)\ 00325 rv = net_ops->recv(net, buf, len, wait_secs, timed_out, ctx); \ 00326 if(rv < 1) \ 00327 return MQP_ERR_NETWORK; 00328 00329 int32_t mqp_recv(int32_t net, const struct device_net_services *net_ops, 00330 struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out, 00331 void *ctx) 00332 { 00333 uint8_t *buf = MQP_FHEADER_BUF(mqp), *ref = buf, fh_len = mqp->fh_len; 00334 uint32_t pl_len = mqp->pl_len, remlen = 0; 00335 int32_t rv; 00336 00337 while(2 > fh_len) { 00338 RET_IF_ERR_IN_NET_RECV(net, buf + fh_len, 1, wait_secs, 00339 timed_out, ctx); 00340 00341 mqp->fh_len = ++fh_len; 00342 } 00343 00344 while(buf[fh_len - 1] & 0x80) { 00345 if(fh_len > MAX_FH_LEN) 00346 return MQP_ERR_NOT_DEF;/* Shouldn't happen */ 00347 00348 RET_IF_ERR_IN_NET_RECV(net, buf + fh_len, 1, wait_secs, 00349 timed_out, ctx); 00350 00351 mqp->fh_len = ++fh_len; 00352 } 00353 00354 mqp_buf_rd_remlen(buf + 1, &remlen); 00355 if(mqp->maxlen < (remlen + fh_len)) 00356 return MQP_ERR_PKT_LEN; /* Inadequate free buffer */ 00357 00358 buf += fh_len; /* Try to read all data that follows FH */ 00359 while(pl_len < remlen) { 00360 RET_IF_ERR_IN_NET_RECV(net, buf + pl_len, remlen - pl_len, 00361 wait_secs, timed_out, ctx); 00362 00363 mqp->pl_len = pl_len += rv; 00364 } 00365 00366 /* Set up MQTT Packet for received data from broker */ 00367 buf_wr_nbytes(MQP_FHEADER_BUF(mqp), ref, fh_len); 00368 mqp->fh_byte1 = *ref; 00369 mqp->msg_type = MSG_TYPE(*ref); 00370 00371 return fh_len + remlen; 00372 } 00373 00374 /*---------------------------------------------------------------------------- 00375 * QoS2 PUB RX Message handling mechanism and associated house-keeping 00376 *--------------------------------------------------------------------------*/ 00377 void qos2_pub_cq_reset(struct pub_qos2_cq *cq) 00378 { 00379 00380 // memset(cq->id_vec, 0, sizeof(cq->id_vec)); 00381 buf_set((uint8_t*)cq->id_vec, 0, sizeof(cq->id_vec)); 00382 cq->n_free = MAX_PUBREL_INFLT; 00383 cq->rd_idx = 0; 00384 cq->wr_idx = 0; 00385 00386 return; 00387 } 00388 00389 bool qos2_pub_cq_logup(struct pub_qos2_cq *cq, uint16_t msg_id) 00390 { 00391 if(0 != cq->n_free) { 00392 cq->id_vec[cq->wr_idx++] = msg_id; 00393 cq->wr_idx &= MAX_PUBREL_INFLT - 1; 00394 cq->n_free--; 00395 return true; 00396 } 00397 00398 /* Must ensure through an out-of-band arrangement that a remote doesn't 00399 push more than MAX_PUBREL_INFLT in-flight QOS2 PUB packets to local. 00400 If it still happens that the local receives more in-flight messages, 00401 then it has no option but to close the network connection. 00402 00403 Closing of network connection is left to invoker to of this service. 00404 */ 00405 00406 return false; 00407 } 00408 00409 bool qos2_pub_cq_unlog(struct pub_qos2_cq *cq, uint16_t msg_id) 00410 { 00411 /* Simple implementation. Should be good for embedded system */ 00412 if(cq->n_free < MAX_PUBREL_INFLT) { 00413 /* Follow-up messages for QOS2 PUB must be transacted in the 00414 same order as the initial sequence of QOS2 PUB dispatches. 00415 Therefore, checking the first entry should be OK 00416 */ 00417 if(msg_id == cq->id_vec[cq->rd_idx++]) { 00418 cq->rd_idx &= MAX_PUBREL_INFLT - 1; 00419 cq->n_free++; 00420 return true; 00421 } 00422 } 00423 00424 return false; 00425 } 00426 00427 bool qos2_pub_cq_check(struct pub_qos2_cq *cq, uint16_t msg_id) 00428 { 00429 uint8_t rd_idx = cq->rd_idx; 00430 uint8_t n_free = cq->n_free; 00431 uint8_t i = 0; 00432 00433 /* Go through the vector to see if packet ID is still active */ 00434 for(i = 0; i < (MAX_PUBREL_INFLT - n_free); i++) { 00435 if(msg_id == cq->id_vec[rd_idx++]) 00436 return true; 00437 00438 rd_idx &= MAX_PUBREL_INFLT - 1; 00439 } 00440 00441 return false; 00442 } 00443 00444 /*---------------------------------------------------------------------------- 00445 * Client Context related details and associated house-keeping 00446 *--------------------------------------------------------------------------*/ 00447 void cl_ctx_reset(struct client_ctx *cl_ctx) 00448 { 00449 cl_ctx->usr = NULL; 00450 cl_ctx->net = -1; 00451 cl_ctx->ip_length = 16; 00452 00453 cl_ctx->timeout = 0; 00454 cl_ctx->ka_secs = 0; 00455 00456 cl_ctx->flags = 0; 00457 00458 return; 00459 } 00460 00461 void cl_ctx_timeout_insert(struct client_ctx **head, struct client_ctx *elem) 00462 { 00463 00464 struct client_ctx *curr, *prev = NULL; 00465 00466 if(NULL == *head) { 00467 *head = elem; 00468 return; 00469 } 00470 00471 curr = *head; 00472 while(curr) { 00473 if(elem->timeout < curr->timeout) { 00474 if(NULL == prev) { 00475 elem->next = *head; 00476 *head = elem; 00477 } else { 00478 prev->next = elem; 00479 elem->next = curr; 00480 } 00481 00482 break; 00483 } 00484 00485 prev = curr; 00486 curr = curr->next; 00487 } 00488 00489 if(NULL == curr) 00490 prev->next = elem; 00491 } 00492 00493 void cl_ctx_remove(struct client_ctx **head, struct client_ctx *elem) 00494 { 00495 struct client_ctx *curr = *head, *prev = NULL; 00496 00497 while(curr) { 00498 if(curr == elem) { 00499 if(NULL == prev) { 00500 *head = elem->next; 00501 prev = *head; 00502 } else { 00503 prev->next = elem->next; 00504 } 00505 00506 elem->next = NULL; 00507 break; 00508 } 00509 00510 prev = curr; 00511 curr = curr->next; 00512 } 00513 } 00514 00515 void cl_ctx_timeout_update(struct client_ctx *cl_ctx, uint32_t now_secs) 00516 { 00517 uint32_t timeout = KA_TIMEOUT_NONE; 00518 uint16_t ka_secs = cl_ctx->ka_secs; 00519 00520 if((0 == ka_secs) && (KA_TIMEOUT_NONE == cl_ctx->timeout)) 00521 return; 00522 00523 if(0 != ka_secs) 00524 timeout = now_secs + ka_secs; 00525 00526 cl_ctx->timeout = timeout; 00527 00528 return; 00529 } 00530 00531 }//namespace mbed_mqtt 00532
Generated on Wed Jul 13 2022 09:55:38 by 1.7.2