Update revision to use TI's mqtt and Freertos.

Dependencies:   mbed client server

Fork of cc3100_Test_mqtt_CM3 by David Fletcher

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers mqtt_common.cpp Source File

mqtt_common.cpp

00001 /******************************************************************************
00002 *
00003 *   Copyright (C) 2014 Texas Instruments Incorporated
00004 *
00005 *   All rights reserved. Property of Texas Instruments Incorporated.
00006 *   Restricted rights to use, duplicate or disclose this code are
00007 *   granted through contract.
00008 *
00009 *   The program may not be used without the written permission of
00010 *   Texas Instruments Incorporated or against the terms and conditions
00011 *   stipulated in the agreement under which this program has been supplied,
00012 *   and under no circumstances can it be used with non-TI connectivity device.
00013 *
00014 ******************************************************************************/
00015 
00016 /*
00017    mqtt_common.c
00018 
00019    This module implements routines that are common to both client and server
00020    components.
00021 */
00022 
00023 #include "mqtt_common.h"
00024 #include "cli_uart.h"
00025 
00026 namespace mbed_mqtt {
00027 
00028 static void free_mqp(struct mqtt_packet *mqp)
00029 {
00030         if((NULL != mqp->free) && (0 == --mqp->n_refs))
00031                 mqp->free(mqp);
00032 
00033         return;
00034 }
00035 
00036 void mqp_free(struct mqtt_packet *mqp)
00037 {
00038         free_mqp(mqp);
00039 }
00040 
00041 static void reset_mqp(struct mqtt_packet *mqp)
00042 {
00043 
00044         /* Fields not handled here are meant to be left unaltered. */
00045         mqp->msg_type = 0x00;
00046         mqp->fh_byte1 = 0x00;
00047         mqp->msg_id   = 0;
00048         mqp->fh_len   = 0x00;
00049         mqp->vh_len   = 0;
00050         mqp->pl_len   = 0;
00051         mqp->private_  = 0;
00052 }
00053 
00054 void mqp_reset(struct mqtt_packet *mqp)
00055 {
00056         reset_mqp(mqp);
00057 }
00058 
00059 void mqp_init(struct mqtt_packet *mqp, uint8_t offset)
00060 {
00061         reset_mqp(mqp);
00062 
00063         mqp->offset = offset;
00064         mqp->n_refs = 0x01;
00065         mqp->next   = NULL;
00066 }
00067 
00068 static int32_t buf_wr_utf8(uint8_t *buf, const struct utf8_string *utf8)
00069 {
00070         uint8_t *ref = buf;
00071 
00072         buf += buf_wr_nbo_2B(buf,      utf8->length);
00073         buf += buf_wr_nbytes(buf, (uint8_t*)utf8->buffer, utf8->length);
00074 
00075         return buf - ref;
00076 }
00077 
00078 int32_t mqp_buf_wr_utf8(uint8_t *buf, const struct utf8_string *utf8)
00079 {
00080         return buf_wr_utf8(buf, utf8);
00081 }
00082 
00083 #define MAX_REMLEN_BYTES  (MAX_FH_LEN - 1)
00084 
00085 static int32_t buf_tail_wr_remlen(uint8_t *buf, uint32_t remlen)
00086 {
00087 
00088         uint8_t val[MAX_REMLEN_BYTES], i = 0;
00089 
00090         do {
00091                 val[i] = remlen & 0x7F; /* MOD 128 */
00092                 remlen = remlen >> 7;   /* DIV 128 */
00093 
00094                 if(remlen)
00095                         val[i] |= 0x80;
00096 
00097                 i++;
00098 
00099         } while(remlen > 0);
00100 
00101         buf_wr_nbytes(buf + MAX_REMLEN_BYTES - i, val, i);
00102         
00103         return i;       /* # bytes written in buf */
00104 }
00105 
00106 int32_t mqp_buf_tail_wr_remlen(uint8_t *buf, uint32_t remlen)
00107 {
00108         return buf_tail_wr_remlen(buf, remlen);
00109 }
00110 
00111 static int32_t buf_rd_remlen(uint8_t *buf, uint32_t *remlen)
00112 {
00113         uint32_t val = 0, mul = 0;
00114         int32_t i = 0;
00115 
00116         do {
00117                 val += (buf[i] & 0x7F) << mul;
00118                 mul += 7;
00119 
00120         } while((buf[i++] & 0x80)       &&
00121                 (i < MAX_REMLEN_BYTES));
00122 
00123         *remlen = val;
00124 
00125         /* Return -1 if value was not found */
00126         return (buf[i - 1] & 0x80)? -1 : i;
00127 }
00128 
00129 int32_t mqp_buf_rd_remlen(uint8_t *buf, uint32_t *remlen)
00130 {
00131         return buf_rd_remlen(buf, remlen);
00132 }
00133 
00134 int32_t mqp_pub_append_topic(struct mqtt_packet *mqp, const struct utf8_string *topic,
00135                      uint16_t msg_id)
00136 {
00137         uint8_t *buf = MQP_VHEADER_BUF(mqp), *ref = buf;
00138 
00139         if(0 != mqp->vh_len){
00140                 Uart_Write((uint8_t*)"Topic has been already added\r\n");
00141                 return -1; /* Topic has been already added */
00142         }
00143         if(MQP_FREEBUF_LEN(mqp) < (topic + msg_id? 2 : 0)){
00144                 Uart_Write((uint8_t*)"Can't WR topic\r\n");
00145                 return MQP_ERR_PKT_LEN; /* Can't WR topic */
00146         }
00147         buf += buf_wr_utf8(buf, topic);
00148 
00149         if(0 != msg_id) {  /* MSG ID 0 indicates ==> QoS0 */
00150                 mqp->msg_id = msg_id;
00151                 buf += buf_wr_nbo_2B(buf, mqp->msg_id);
00152         }
00153 
00154         mqp->vh_len += buf - ref;
00155 
00156         return buf - ref;
00157 }
00158 
00159 int32_t mqp_pub_append_data(struct mqtt_packet *mqp, const uint8_t *data_buf,
00160                         uint32_t data_len)
00161 {
00162         uint8_t *buf = MQP_PAYLOAD_BUF(mqp) + mqp->pl_len, *ref = buf;
00163 
00164         if(0 == mqp->vh_len)
00165                 return -1;    /* Must include topic first */
00166 
00167         if(MQP_FREEBUF_LEN(mqp) < data_len)
00168                 return MQP_ERR_PKT_LEN; /* Can't WR  data */
00169 
00170         /* Including payload info for PUBLISH */
00171         buf += buf_wr_nbytes(buf, data_buf, data_len);
00172         mqp->pl_len += buf - ref;
00173 
00174         return buf - ref;
00175 }
00176 
00177 static bool proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw)
00178 {
00179         uint8_t *buf = MQP_VHEADER_BUF(mqp_raw);
00180 
00181         if(mqp_raw->pl_len < 2)
00182                 return false;    /* Bytes for MSG ID not available */
00183 
00184         buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id);
00185         mqp_raw->vh_len += 2;
00186         mqp_raw->pl_len -= 2;
00187 
00188         return true;
00189 }
00190 
00191 bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw)
00192 {
00193         return proc_vh_msg_id_rx(mqp_raw);
00194 }
00195 
00196 bool mqp_proc_msg_id_ack_rx(struct mqtt_packet *mqp_raw, bool has_pl)
00197 {
00198         if((false == proc_vh_msg_id_rx(mqp_raw)) ||
00199            (has_pl ^ (!!mqp_raw->pl_len)))
00200                 return false;
00201 
00202         return true;
00203 }
00204 
00205 bool mqp_proc_pub_rx(struct mqtt_packet *mqp_raw)
00206 {
00207         uint8_t *buf = MQP_VHEADER_BUF(mqp_raw), *ref = buf;
00208         uint16_t tmp = 0;
00209 
00210         if(mqp_raw->pl_len < (buf - ref + 2))    /* Length Check  */
00211                 return false;      /* No space to hold Topic size */
00212 
00213         buf += buf_rd_nbo_2B(buf, &tmp);         /* Topic  Length */
00214         buf += tmp;                              /* Topic Content */
00215 
00216         if(MQTT_QOS0 != ENUM_QOS(mqp_raw->fh_byte1)) {
00217                 if(mqp_raw->pl_len < (buf - ref + 2))
00218                         return false;  /* No space to hold MSG ID */
00219 
00220                 buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id);
00221         }
00222 
00223         mqp_raw->vh_len += buf - ref;
00224         mqp_raw->pl_len -= buf - ref;
00225 
00226         return true;
00227 }
00228 
00229 bool mqp_ack_wlist_append(struct mqtt_ack_wlist *list,
00230                           struct mqtt_packet    *elem)
00231 {
00232         elem->next = NULL;
00233 
00234         if(list->tail) {
00235                 list->tail->next = elem;
00236                 list->tail = elem;
00237         } else {
00238                 list->tail = elem;
00239                 list->head = elem;
00240         }
00241 
00242         return true;
00243 }
00244 
00245 struct mqtt_packet *mqp_ack_wlist_remove(struct mqtt_ack_wlist *list,
00246                                           uint16_t msg_id)
00247 {
00248         struct mqtt_packet *elem = list->head, *prev = NULL;
00249 
00250         while(elem) {
00251                 if(msg_id == elem->msg_id) {
00252                         if(prev)
00253                                 prev->next = elem->next;
00254                         else
00255                                 list->head = elem->next;
00256 
00257                         if(NULL == list->head)
00258                                 list->tail = NULL;
00259 
00260                         if(list->tail == elem)
00261                                 list->tail = prev;
00262 
00263                         break;
00264                 }
00265 
00266                 prev = elem;
00267                 elem = elem->next;
00268         }
00269 
00270         return elem;
00271 }
00272 
00273 void mqp_ack_wlist_purge(struct mqtt_ack_wlist *list)
00274 {
00275 
00276         struct mqtt_packet *elem = list->head;
00277 
00278         while(elem) {
00279                 struct mqtt_packet *next = elem->next;
00280                 free_mqp(elem);
00281                 elem = next;
00282         }
00283 
00284         list->head = NULL;
00285         list->tail = NULL;
00286 
00287         return;
00288 }
00289 
00290 void secure_conn_struct_init(struct secure_conn *nw_security)
00291 {
00292         nw_security->method = nw_security->cipher = NULL; 
00293         nw_security->files  = NULL;
00294         nw_security->n_file = 0;
00295 
00296         return;
00297 }
00298 
00299 int32_t mqp_prep_fh(struct mqtt_packet *mqp, uint8_t flags)
00300 {
00301 
00302         uint32_t remlen = mqp->vh_len + mqp->pl_len;
00303         uint8_t *buf    = mqp->buffer + mqp->offset;
00304         uint8_t *ref    = buf;
00305 
00306         buf -= buf_tail_wr_remlen(buf - MAX_REMLEN_BYTES, remlen);
00307 
00308         buf -= 1; /* Make space for FH Byte1 */        
00309         mqp->fh_byte1 = *buf = MAKE_FH_BYTE1(mqp->msg_type, flags);
00310 
00311         mqp->fh_len   = ref - buf;
00312         mqp->offset  -= ref - buf;
00313         
00314         return ref - buf;
00315 }
00316 
00317 /*
00318     Since, the network connection is a TCP socket stream, it is imperative that
00319     adequate checks are put in place to identify a MQTT packet and isolate it
00320     for further processing. The intent of following routine is to read just one
00321     packet from continuous stream and leave rest for the next iteration.
00322 */
00323 
00324 #define RET_IF_ERR_IN_NET_RECV(net, buf, len, wait_secs, timed_out, ctx)\
00325         rv = net_ops->recv(net, buf, len, wait_secs, timed_out, ctx);   \
00326         if(rv < 1)                                                      \
00327                 return MQP_ERR_NETWORK;
00328 
00329 int32_t mqp_recv(int32_t net, const struct device_net_services *net_ops,
00330              struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out,
00331              void *ctx)
00332 {
00333         uint8_t *buf = MQP_FHEADER_BUF(mqp), *ref = buf, fh_len = mqp->fh_len;
00334         uint32_t pl_len = mqp->pl_len, remlen = 0;
00335         int32_t rv;
00336 
00337         while(2 > fh_len) {
00338                 RET_IF_ERR_IN_NET_RECV(net, buf + fh_len, 1, wait_secs,
00339                                        timed_out, ctx);
00340 
00341                 mqp->fh_len = ++fh_len;
00342         }
00343 
00344         while(buf[fh_len - 1] & 0x80) {
00345                 if(fh_len > MAX_FH_LEN)
00346                         return MQP_ERR_NOT_DEF;/* Shouldn't happen */
00347                 
00348                  RET_IF_ERR_IN_NET_RECV(net, buf + fh_len, 1, wait_secs,
00349                                         timed_out, ctx);
00350 
00351                  mqp->fh_len = ++fh_len;
00352         }
00353 
00354         mqp_buf_rd_remlen(buf + 1, &remlen);
00355         if(mqp->maxlen < (remlen + fh_len))
00356                 return MQP_ERR_PKT_LEN;  /* Inadequate free buffer */
00357 
00358         buf += fh_len;     /* Try to read all data that follows FH */
00359         while(pl_len < remlen) {
00360                RET_IF_ERR_IN_NET_RECV(net, buf + pl_len, remlen - pl_len,
00361                                       wait_secs, timed_out, ctx);
00362 
00363                mqp->pl_len = pl_len += rv;
00364         }
00365 
00366         /* Set up MQTT Packet for received data from broker */
00367         buf_wr_nbytes(MQP_FHEADER_BUF(mqp), ref, fh_len);
00368         mqp->fh_byte1 = *ref;
00369         mqp->msg_type = MSG_TYPE(*ref);
00370 
00371         return fh_len + remlen;
00372 }
00373 
00374 /*----------------------------------------------------------------------------
00375  * QoS2 PUB RX Message handling mechanism and associated house-keeping
00376  *--------------------------------------------------------------------------*/
00377 void qos2_pub_cq_reset(struct pub_qos2_cq *cq)
00378 {
00379 
00380         //        memset(cq->id_vec, 0, sizeof(cq->id_vec));
00381         buf_set((uint8_t*)cq->id_vec, 0, sizeof(cq->id_vec));
00382         cq->n_free = MAX_PUBREL_INFLT;
00383         cq->rd_idx = 0;
00384         cq->wr_idx = 0;
00385 
00386         return;
00387 }
00388 
00389 bool qos2_pub_cq_logup(struct pub_qos2_cq *cq, uint16_t msg_id)
00390 {
00391         if(0 != cq->n_free) {
00392                 cq->id_vec[cq->wr_idx++]  = msg_id;
00393                 cq->wr_idx &= MAX_PUBREL_INFLT - 1;
00394                 cq->n_free--;
00395                 return true;
00396         }
00397 
00398         /* Must ensure through an out-of-band arrangement that a remote doesn't
00399            push more than MAX_PUBREL_INFLT in-flight QOS2 PUB packets to local.
00400            If it still happens that the local receives more in-flight messages,
00401            then it has no option but to close the network connection.
00402 
00403            Closing of network connection is left to invoker to of this service.
00404         */
00405 
00406         return false;
00407 }
00408 
00409 bool qos2_pub_cq_unlog(struct pub_qos2_cq *cq, uint16_t msg_id)
00410 {
00411         /* Simple implementation. Should be good for embedded system */
00412         if(cq->n_free < MAX_PUBREL_INFLT) {
00413                 /* Follow-up messages for QOS2 PUB must be transacted in the
00414                    same order as the initial sequence of QOS2 PUB dispatches.
00415                    Therefore, checking the first entry should be OK
00416                 */
00417                 if(msg_id == cq->id_vec[cq->rd_idx++]) {
00418                         cq->rd_idx &= MAX_PUBREL_INFLT - 1;
00419                         cq->n_free++;
00420                         return true;
00421                 }
00422         }
00423 
00424         return false;
00425 }
00426 
00427 bool qos2_pub_cq_check(struct pub_qos2_cq *cq, uint16_t msg_id)
00428 {
00429         uint8_t rd_idx = cq->rd_idx;
00430         uint8_t n_free = cq->n_free;
00431         uint8_t i = 0;
00432 
00433         /* Go through the vector to see if packet ID is still active */
00434         for(i = 0; i < (MAX_PUBREL_INFLT - n_free); i++) {
00435                     if(msg_id == cq->id_vec[rd_idx++]) 
00436                             return true;
00437 
00438                     rd_idx &= MAX_PUBREL_INFLT - 1;
00439         }
00440 
00441         return false;
00442 }
00443 
00444 /*----------------------------------------------------------------------------
00445  * Client Context related details and associated house-keeping
00446  *--------------------------------------------------------------------------*/
00447 void cl_ctx_reset(struct client_ctx *cl_ctx)
00448 {
00449         cl_ctx->usr       = NULL;
00450         cl_ctx->net       = -1;
00451         cl_ctx->ip_length = 16;
00452 
00453         cl_ctx->timeout   = 0;
00454         cl_ctx->ka_secs   = 0;
00455 
00456         cl_ctx->flags     = 0;
00457 
00458         return;
00459 }
00460 
00461 void cl_ctx_timeout_insert(struct client_ctx **head, struct client_ctx *elem)
00462 {
00463 
00464         struct client_ctx *curr, *prev = NULL;
00465 
00466         if(NULL == *head) {
00467                 *head = elem;
00468                 return;
00469         }
00470 
00471         curr = *head;
00472         while(curr) {
00473                 if(elem->timeout < curr->timeout) {
00474                         if(NULL == prev) {
00475                                 elem->next = *head;
00476                                 *head = elem;
00477                         } else {
00478                                 prev->next = elem;
00479                                 elem->next = curr;
00480                         }
00481 
00482                         break;
00483                 }
00484                 
00485                 prev = curr;
00486                 curr = curr->next;
00487         }
00488 
00489         if(NULL == curr)
00490                 prev->next = elem;
00491 }
00492 
00493 void cl_ctx_remove(struct client_ctx **head, struct client_ctx *elem)
00494 {
00495         struct client_ctx *curr = *head, *prev = NULL;
00496 
00497         while(curr) {
00498                 if(curr == elem) {
00499                         if(NULL == prev) {
00500                                 *head = elem->next;
00501                                 prev  = *head;
00502                         } else {
00503                                 prev->next = elem->next;
00504                         }
00505 
00506                         elem->next = NULL;
00507                         break;
00508                 }
00509 
00510                 prev = curr;
00511                 curr = curr->next;
00512         }
00513 }
00514 
00515 void cl_ctx_timeout_update(struct client_ctx *cl_ctx, uint32_t now_secs)
00516 {
00517         uint32_t timeout = KA_TIMEOUT_NONE;
00518         uint16_t ka_secs = cl_ctx->ka_secs;
00519 
00520         if((0 == ka_secs) && (KA_TIMEOUT_NONE == cl_ctx->timeout))
00521                 return;
00522 
00523         if(0 != ka_secs)
00524                 timeout = now_secs + ka_secs;
00525 
00526         cl_ctx->timeout = timeout;
00527 
00528         return;
00529 }
00530 
00531 }//namespace mbed_mqtt 
00532