Update revision to use TI's mqtt and Freertos.

Dependencies:   mbed client server

Fork of cc3100_Test_mqtt_CM3 by David Fletcher

mqtt_V1/common/mqtt_common.cpp

Committer:
dflet
Date:
2015-09-03
Revision:
3:a8c249046181

File content as of revision 3:a8c249046181:

/******************************************************************************
*
*   Copyright (C) 2014 Texas Instruments Incorporated
*
*   All rights reserved. Property of Texas Instruments Incorporated.
*   Restricted rights to use, duplicate or disclose this code are
*   granted through contract.
*
*   The program may not be used without the written permission of
*   Texas Instruments Incorporated or against the terms and conditions
*   stipulated in the agreement under which this program has been supplied,
*   and under no circumstances can it be used with non-TI connectivity device.
*
******************************************************************************/

/*
   mqtt_common.c

   This module implements routines that are common to both client and server
   components.
*/

#include "mqtt_common.h"
#include "cli_uart.h"

namespace mbed_mqtt {

static void free_mqp(struct mqtt_packet *mqp)
{
        if((NULL != mqp->free) && (0 == --mqp->n_refs))
                mqp->free(mqp);

        return;
}

void mqp_free(struct mqtt_packet *mqp)
{
        free_mqp(mqp);
}

static void reset_mqp(struct mqtt_packet *mqp)
{

        /* Fields not handled here are meant to be left unaltered. */
        mqp->msg_type = 0x00;
        mqp->fh_byte1 = 0x00;
        mqp->msg_id   = 0;
        mqp->fh_len   = 0x00;
        mqp->vh_len   = 0;
        mqp->pl_len   = 0;
        mqp->private_  = 0;
}

void mqp_reset(struct mqtt_packet *mqp)
{
        reset_mqp(mqp);
}

void mqp_init(struct mqtt_packet *mqp, uint8_t offset)
{
        reset_mqp(mqp);

        mqp->offset = offset;
        mqp->n_refs = 0x01;
        mqp->next   = NULL;
}

static int32_t buf_wr_utf8(uint8_t *buf, const struct utf8_string *utf8)
{
        uint8_t *ref = buf;

        buf += buf_wr_nbo_2B(buf,      utf8->length);
        buf += buf_wr_nbytes(buf, (uint8_t*)utf8->buffer, utf8->length);

        return buf - ref;
}

int32_t mqp_buf_wr_utf8(uint8_t *buf, const struct utf8_string *utf8)
{
        return buf_wr_utf8(buf, utf8);
}

#define MAX_REMLEN_BYTES  (MAX_FH_LEN - 1)

static int32_t buf_tail_wr_remlen(uint8_t *buf, uint32_t remlen)
{

        uint8_t val[MAX_REMLEN_BYTES], i = 0;

        do {
                val[i] = remlen & 0x7F; /* MOD 128 */
                remlen = remlen >> 7;   /* DIV 128 */

                if(remlen)
                        val[i] |= 0x80;

                i++;

        } while(remlen > 0);

        buf_wr_nbytes(buf + MAX_REMLEN_BYTES - i, val, i);
        
        return i;       /* # bytes written in buf */
}

int32_t mqp_buf_tail_wr_remlen(uint8_t *buf, uint32_t remlen)
{
        return buf_tail_wr_remlen(buf, remlen);
}

static int32_t buf_rd_remlen(uint8_t *buf, uint32_t *remlen)
{
        uint32_t val = 0, mul = 0;
        int32_t i = 0;

        do {
                val += (buf[i] & 0x7F) << mul;
                mul += 7;

        } while((buf[i++] & 0x80)       &&
                (i < MAX_REMLEN_BYTES));

        *remlen = val;

        /* Return -1 if value was not found */
        return (buf[i - 1] & 0x80)? -1 : i;
}

int32_t mqp_buf_rd_remlen(uint8_t *buf, uint32_t *remlen)
{
        return buf_rd_remlen(buf, remlen);
}

int32_t mqp_pub_append_topic(struct mqtt_packet *mqp, const struct utf8_string *topic,
                     uint16_t msg_id)
{
        uint8_t *buf = MQP_VHEADER_BUF(mqp), *ref = buf;

        if(0 != mqp->vh_len){
                Uart_Write((uint8_t*)"Topic has been already added\r\n");
                return -1; /* Topic has been already added */
        }
        if(MQP_FREEBUF_LEN(mqp) < (topic + msg_id? 2 : 0)){
                Uart_Write((uint8_t*)"Can't WR topic\r\n");
                return MQP_ERR_PKT_LEN; /* Can't WR topic */
        }
        buf += buf_wr_utf8(buf, topic);

        if(0 != msg_id) {  /* MSG ID 0 indicates ==> QoS0 */
                mqp->msg_id = msg_id;
                buf += buf_wr_nbo_2B(buf, mqp->msg_id);
        }

        mqp->vh_len += buf - ref;

        return buf - ref;
}

int32_t mqp_pub_append_data(struct mqtt_packet *mqp, const uint8_t *data_buf,
                        uint32_t data_len)
{
        uint8_t *buf = MQP_PAYLOAD_BUF(mqp) + mqp->pl_len, *ref = buf;

        if(0 == mqp->vh_len)
                return -1;    /* Must include topic first */

        if(MQP_FREEBUF_LEN(mqp) < data_len)
                return MQP_ERR_PKT_LEN; /* Can't WR  data */

        /* Including payload info for PUBLISH */
        buf += buf_wr_nbytes(buf, data_buf, data_len);
        mqp->pl_len += buf - ref;

        return buf - ref;
}

static bool proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw)
{
        uint8_t *buf = MQP_VHEADER_BUF(mqp_raw);

        if(mqp_raw->pl_len < 2)
                return false;    /* Bytes for MSG ID not available */

        buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id);
        mqp_raw->vh_len += 2;
        mqp_raw->pl_len -= 2;

        return true;
}

bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw)
{
        return proc_vh_msg_id_rx(mqp_raw);
}

bool mqp_proc_msg_id_ack_rx(struct mqtt_packet *mqp_raw, bool has_pl)
{
        if((false == proc_vh_msg_id_rx(mqp_raw)) ||
           (has_pl ^ (!!mqp_raw->pl_len)))
                return false;

        return true;
}

bool mqp_proc_pub_rx(struct mqtt_packet *mqp_raw)
{
        uint8_t *buf = MQP_VHEADER_BUF(mqp_raw), *ref = buf;
        uint16_t tmp = 0;

        if(mqp_raw->pl_len < (buf - ref + 2))    /* Length Check  */
                return false;      /* No space to hold Topic size */

        buf += buf_rd_nbo_2B(buf, &tmp);         /* Topic  Length */
        buf += tmp;                              /* Topic Content */

        if(MQTT_QOS0 != ENUM_QOS(mqp_raw->fh_byte1)) {
                if(mqp_raw->pl_len < (buf - ref + 2))
                        return false;  /* No space to hold MSG ID */

                buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id);
        }

        mqp_raw->vh_len += buf - ref;
        mqp_raw->pl_len -= buf - ref;

        return true;
}

bool mqp_ack_wlist_append(struct mqtt_ack_wlist *list,
                          struct mqtt_packet    *elem)
{
        elem->next = NULL;

        if(list->tail) {
                list->tail->next = elem;
                list->tail = elem;
        } else {
                list->tail = elem;
                list->head = elem;
        }

        return true;
}

struct mqtt_packet *mqp_ack_wlist_remove(struct mqtt_ack_wlist *list,
                                          uint16_t msg_id)
{
        struct mqtt_packet *elem = list->head, *prev = NULL;

        while(elem) {
                if(msg_id == elem->msg_id) {
                        if(prev)
                                prev->next = elem->next;
                        else
                                list->head = elem->next;

                        if(NULL == list->head)
                                list->tail = NULL;

                        if(list->tail == elem)
                                list->tail = prev;

                        break;
                }

                prev = elem;
                elem = elem->next;
        }

        return elem;
}

void mqp_ack_wlist_purge(struct mqtt_ack_wlist *list)
{

        struct mqtt_packet *elem = list->head;

        while(elem) {
                struct mqtt_packet *next = elem->next;
                free_mqp(elem);
                elem = next;
        }

        list->head = NULL;
        list->tail = NULL;

        return;
}

void secure_conn_struct_init(struct secure_conn *nw_security)
{
        nw_security->method = nw_security->cipher = NULL; 
        nw_security->files  = NULL;
        nw_security->n_file = 0;

        return;
}

int32_t mqp_prep_fh(struct mqtt_packet *mqp, uint8_t flags)
{

        uint32_t remlen = mqp->vh_len + mqp->pl_len;
        uint8_t *buf    = mqp->buffer + mqp->offset;
        uint8_t *ref    = buf;

        buf -= buf_tail_wr_remlen(buf - MAX_REMLEN_BYTES, remlen);

        buf -= 1; /* Make space for FH Byte1 */        
        mqp->fh_byte1 = *buf = MAKE_FH_BYTE1(mqp->msg_type, flags);

        mqp->fh_len   = ref - buf;
        mqp->offset  -= ref - buf;
        
        return ref - buf;
}

/*
    Since, the network connection is a TCP socket stream, it is imperative that
    adequate checks are put in place to identify a MQTT packet and isolate it
    for further processing. The intent of following routine is to read just one
    packet from continuous stream and leave rest for the next iteration.
*/

#define RET_IF_ERR_IN_NET_RECV(net, buf, len, wait_secs, timed_out, ctx)\
        rv = net_ops->recv(net, buf, len, wait_secs, timed_out, ctx);   \
        if(rv < 1)                                                      \
                return MQP_ERR_NETWORK;

int32_t mqp_recv(int32_t net, const struct device_net_services *net_ops,
             struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out,
             void *ctx)
{
        uint8_t *buf = MQP_FHEADER_BUF(mqp), *ref = buf, fh_len = mqp->fh_len;
        uint32_t pl_len = mqp->pl_len, remlen = 0;
        int32_t rv;

        while(2 > fh_len) {
                RET_IF_ERR_IN_NET_RECV(net, buf + fh_len, 1, wait_secs,
                                       timed_out, ctx);

                mqp->fh_len = ++fh_len;
        }

        while(buf[fh_len - 1] & 0x80) {
                if(fh_len > MAX_FH_LEN)
                        return MQP_ERR_NOT_DEF;/* Shouldn't happen */
                
                 RET_IF_ERR_IN_NET_RECV(net, buf + fh_len, 1, wait_secs,
                                        timed_out, ctx);

                 mqp->fh_len = ++fh_len;
        }

        mqp_buf_rd_remlen(buf + 1, &remlen);
        if(mqp->maxlen < (remlen + fh_len))
                return MQP_ERR_PKT_LEN;  /* Inadequate free buffer */

        buf += fh_len;     /* Try to read all data that follows FH */
        while(pl_len < remlen) {
               RET_IF_ERR_IN_NET_RECV(net, buf + pl_len, remlen - pl_len,
                                      wait_secs, timed_out, ctx);

               mqp->pl_len = pl_len += rv;
        }

        /* Set up MQTT Packet for received data from broker */
        buf_wr_nbytes(MQP_FHEADER_BUF(mqp), ref, fh_len);
        mqp->fh_byte1 = *ref;
        mqp->msg_type = MSG_TYPE(*ref);

        return fh_len + remlen;
}

/*----------------------------------------------------------------------------
 * QoS2 PUB RX Message handling mechanism and associated house-keeping
 *--------------------------------------------------------------------------*/
void qos2_pub_cq_reset(struct pub_qos2_cq *cq)
{

        //        memset(cq->id_vec, 0, sizeof(cq->id_vec));
        buf_set((uint8_t*)cq->id_vec, 0, sizeof(cq->id_vec));
        cq->n_free = MAX_PUBREL_INFLT;
        cq->rd_idx = 0;
        cq->wr_idx = 0;

        return;
}

bool qos2_pub_cq_logup(struct pub_qos2_cq *cq, uint16_t msg_id)
{
        if(0 != cq->n_free) {
                cq->id_vec[cq->wr_idx++]  = msg_id;
                cq->wr_idx &= MAX_PUBREL_INFLT - 1;
                cq->n_free--;
                return true;
        }

        /* Must ensure through an out-of-band arrangement that a remote doesn't
           push more than MAX_PUBREL_INFLT in-flight QOS2 PUB packets to local.
           If it still happens that the local receives more in-flight messages,
           then it has no option but to close the network connection.

           Closing of network connection is left to invoker to of this service.
        */

        return false;
}

bool qos2_pub_cq_unlog(struct pub_qos2_cq *cq, uint16_t msg_id)
{
        /* Simple implementation. Should be good for embedded system */
        if(cq->n_free < MAX_PUBREL_INFLT) {
                /* Follow-up messages for QOS2 PUB must be transacted in the
                   same order as the initial sequence of QOS2 PUB dispatches.
                   Therefore, checking the first entry should be OK
                */
                if(msg_id == cq->id_vec[cq->rd_idx++]) {
                        cq->rd_idx &= MAX_PUBREL_INFLT - 1;
                        cq->n_free++;
                        return true;
                }
        }

        return false;
}

bool qos2_pub_cq_check(struct pub_qos2_cq *cq, uint16_t msg_id)
{
        uint8_t rd_idx = cq->rd_idx;
        uint8_t n_free = cq->n_free;
        uint8_t i = 0;

        /* Go through the vector to see if packet ID is still active */
        for(i = 0; i < (MAX_PUBREL_INFLT - n_free); i++) {
                    if(msg_id == cq->id_vec[rd_idx++]) 
                            return true;

                    rd_idx &= MAX_PUBREL_INFLT - 1;
        }

        return false;
}

/*----------------------------------------------------------------------------
 * Client Context related details and associated house-keeping
 *--------------------------------------------------------------------------*/
void cl_ctx_reset(struct client_ctx *cl_ctx)
{
        cl_ctx->usr       = NULL;
        cl_ctx->net       = -1;
        cl_ctx->ip_length = 16;

        cl_ctx->timeout   = 0;
        cl_ctx->ka_secs   = 0;

        cl_ctx->flags     = 0;

        return;
}

void cl_ctx_timeout_insert(struct client_ctx **head, struct client_ctx *elem)
{

        struct client_ctx *curr, *prev = NULL;

        if(NULL == *head) {
                *head = elem;
                return;
        }

        curr = *head;
        while(curr) {
                if(elem->timeout < curr->timeout) {
                        if(NULL == prev) {
                                elem->next = *head;
                                *head = elem;
                        } else {
                                prev->next = elem;
                                elem->next = curr;
                        }

                        break;
                }
                
                prev = curr;
                curr = curr->next;
        }

        if(NULL == curr)
                prev->next = elem;
}

void cl_ctx_remove(struct client_ctx **head, struct client_ctx *elem)
{
        struct client_ctx *curr = *head, *prev = NULL;

        while(curr) {
                if(curr == elem) {
                        if(NULL == prev) {
                                *head = elem->next;
                                prev  = *head;
                        } else {
                                prev->next = elem->next;
                        }

                        elem->next = NULL;
                        break;
                }

                prev = curr;
                curr = curr->next;
        }
}

void cl_ctx_timeout_update(struct client_ctx *cl_ctx, uint32_t now_secs)
{
        uint32_t timeout = KA_TIMEOUT_NONE;
        uint16_t ka_secs = cl_ctx->ka_secs;

        if((0 == ka_secs) && (KA_TIMEOUT_NONE == cl_ctx->timeout))
                return;

        if(0 != ka_secs)
                timeout = now_secs + ka_secs;

        cl_ctx->timeout = timeout;

        return;
}

}//namespace mbed_mqtt