Update revision to use TI's mqtt and Freertos.
Dependencies: mbed client server
Fork of cc3100_Test_mqtt_CM3 by
mqtt_V1/common/mqtt_common.cpp
- Committer:
- dflet
- Date:
- 2015-09-03
- Revision:
- 3:a8c249046181
File content as of revision 3:a8c249046181:
/****************************************************************************** * * Copyright (C) 2014 Texas Instruments Incorporated * * All rights reserved. Property of Texas Instruments Incorporated. * Restricted rights to use, duplicate or disclose this code are * granted through contract. * * The program may not be used without the written permission of * Texas Instruments Incorporated or against the terms and conditions * stipulated in the agreement under which this program has been supplied, * and under no circumstances can it be used with non-TI connectivity device. * ******************************************************************************/ /* mqtt_common.c This module implements routines that are common to both client and server components. */ #include "mqtt_common.h" #include "cli_uart.h" namespace mbed_mqtt { static void free_mqp(struct mqtt_packet *mqp) { if((NULL != mqp->free) && (0 == --mqp->n_refs)) mqp->free(mqp); return; } void mqp_free(struct mqtt_packet *mqp) { free_mqp(mqp); } static void reset_mqp(struct mqtt_packet *mqp) { /* Fields not handled here are meant to be left unaltered. */ mqp->msg_type = 0x00; mqp->fh_byte1 = 0x00; mqp->msg_id = 0; mqp->fh_len = 0x00; mqp->vh_len = 0; mqp->pl_len = 0; mqp->private_ = 0; } void mqp_reset(struct mqtt_packet *mqp) { reset_mqp(mqp); } void mqp_init(struct mqtt_packet *mqp, uint8_t offset) { reset_mqp(mqp); mqp->offset = offset; mqp->n_refs = 0x01; mqp->next = NULL; } static int32_t buf_wr_utf8(uint8_t *buf, const struct utf8_string *utf8) { uint8_t *ref = buf; buf += buf_wr_nbo_2B(buf, utf8->length); buf += buf_wr_nbytes(buf, (uint8_t*)utf8->buffer, utf8->length); return buf - ref; } int32_t mqp_buf_wr_utf8(uint8_t *buf, const struct utf8_string *utf8) { return buf_wr_utf8(buf, utf8); } #define MAX_REMLEN_BYTES (MAX_FH_LEN - 1) static int32_t buf_tail_wr_remlen(uint8_t *buf, uint32_t remlen) { uint8_t val[MAX_REMLEN_BYTES], i = 0; do { val[i] = remlen & 0x7F; /* MOD 128 */ remlen = remlen >> 7; /* DIV 128 */ if(remlen) val[i] |= 0x80; i++; } while(remlen > 0); buf_wr_nbytes(buf + MAX_REMLEN_BYTES - i, val, i); return i; /* # bytes written in buf */ } int32_t mqp_buf_tail_wr_remlen(uint8_t *buf, uint32_t remlen) { return buf_tail_wr_remlen(buf, remlen); } static int32_t buf_rd_remlen(uint8_t *buf, uint32_t *remlen) { uint32_t val = 0, mul = 0; int32_t i = 0; do { val += (buf[i] & 0x7F) << mul; mul += 7; } while((buf[i++] & 0x80) && (i < MAX_REMLEN_BYTES)); *remlen = val; /* Return -1 if value was not found */ return (buf[i - 1] & 0x80)? -1 : i; } int32_t mqp_buf_rd_remlen(uint8_t *buf, uint32_t *remlen) { return buf_rd_remlen(buf, remlen); } int32_t mqp_pub_append_topic(struct mqtt_packet *mqp, const struct utf8_string *topic, uint16_t msg_id) { uint8_t *buf = MQP_VHEADER_BUF(mqp), *ref = buf; if(0 != mqp->vh_len){ Uart_Write((uint8_t*)"Topic has been already added\r\n"); return -1; /* Topic has been already added */ } if(MQP_FREEBUF_LEN(mqp) < (topic + msg_id? 2 : 0)){ Uart_Write((uint8_t*)"Can't WR topic\r\n"); return MQP_ERR_PKT_LEN; /* Can't WR topic */ } buf += buf_wr_utf8(buf, topic); if(0 != msg_id) { /* MSG ID 0 indicates ==> QoS0 */ mqp->msg_id = msg_id; buf += buf_wr_nbo_2B(buf, mqp->msg_id); } mqp->vh_len += buf - ref; return buf - ref; } int32_t mqp_pub_append_data(struct mqtt_packet *mqp, const uint8_t *data_buf, uint32_t data_len) { uint8_t *buf = MQP_PAYLOAD_BUF(mqp) + mqp->pl_len, *ref = buf; if(0 == mqp->vh_len) return -1; /* Must include topic first */ if(MQP_FREEBUF_LEN(mqp) < data_len) return MQP_ERR_PKT_LEN; /* Can't WR data */ /* Including payload info for PUBLISH */ buf += buf_wr_nbytes(buf, data_buf, data_len); mqp->pl_len += buf - ref; return buf - ref; } static bool proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw) { uint8_t *buf = MQP_VHEADER_BUF(mqp_raw); if(mqp_raw->pl_len < 2) return false; /* Bytes for MSG ID not available */ buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id); mqp_raw->vh_len += 2; mqp_raw->pl_len -= 2; return true; } bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw) { return proc_vh_msg_id_rx(mqp_raw); } bool mqp_proc_msg_id_ack_rx(struct mqtt_packet *mqp_raw, bool has_pl) { if((false == proc_vh_msg_id_rx(mqp_raw)) || (has_pl ^ (!!mqp_raw->pl_len))) return false; return true; } bool mqp_proc_pub_rx(struct mqtt_packet *mqp_raw) { uint8_t *buf = MQP_VHEADER_BUF(mqp_raw), *ref = buf; uint16_t tmp = 0; if(mqp_raw->pl_len < (buf - ref + 2)) /* Length Check */ return false; /* No space to hold Topic size */ buf += buf_rd_nbo_2B(buf, &tmp); /* Topic Length */ buf += tmp; /* Topic Content */ if(MQTT_QOS0 != ENUM_QOS(mqp_raw->fh_byte1)) { if(mqp_raw->pl_len < (buf - ref + 2)) return false; /* No space to hold MSG ID */ buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id); } mqp_raw->vh_len += buf - ref; mqp_raw->pl_len -= buf - ref; return true; } bool mqp_ack_wlist_append(struct mqtt_ack_wlist *list, struct mqtt_packet *elem) { elem->next = NULL; if(list->tail) { list->tail->next = elem; list->tail = elem; } else { list->tail = elem; list->head = elem; } return true; } struct mqtt_packet *mqp_ack_wlist_remove(struct mqtt_ack_wlist *list, uint16_t msg_id) { struct mqtt_packet *elem = list->head, *prev = NULL; while(elem) { if(msg_id == elem->msg_id) { if(prev) prev->next = elem->next; else list->head = elem->next; if(NULL == list->head) list->tail = NULL; if(list->tail == elem) list->tail = prev; break; } prev = elem; elem = elem->next; } return elem; } void mqp_ack_wlist_purge(struct mqtt_ack_wlist *list) { struct mqtt_packet *elem = list->head; while(elem) { struct mqtt_packet *next = elem->next; free_mqp(elem); elem = next; } list->head = NULL; list->tail = NULL; return; } void secure_conn_struct_init(struct secure_conn *nw_security) { nw_security->method = nw_security->cipher = NULL; nw_security->files = NULL; nw_security->n_file = 0; return; } int32_t mqp_prep_fh(struct mqtt_packet *mqp, uint8_t flags) { uint32_t remlen = mqp->vh_len + mqp->pl_len; uint8_t *buf = mqp->buffer + mqp->offset; uint8_t *ref = buf; buf -= buf_tail_wr_remlen(buf - MAX_REMLEN_BYTES, remlen); buf -= 1; /* Make space for FH Byte1 */ mqp->fh_byte1 = *buf = MAKE_FH_BYTE1(mqp->msg_type, flags); mqp->fh_len = ref - buf; mqp->offset -= ref - buf; return ref - buf; } /* Since, the network connection is a TCP socket stream, it is imperative that adequate checks are put in place to identify a MQTT packet and isolate it for further processing. The intent of following routine is to read just one packet from continuous stream and leave rest for the next iteration. */ #define RET_IF_ERR_IN_NET_RECV(net, buf, len, wait_secs, timed_out, ctx)\ rv = net_ops->recv(net, buf, len, wait_secs, timed_out, ctx); \ if(rv < 1) \ return MQP_ERR_NETWORK; int32_t mqp_recv(int32_t net, const struct device_net_services *net_ops, struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out, void *ctx) { uint8_t *buf = MQP_FHEADER_BUF(mqp), *ref = buf, fh_len = mqp->fh_len; uint32_t pl_len = mqp->pl_len, remlen = 0; int32_t rv; while(2 > fh_len) { RET_IF_ERR_IN_NET_RECV(net, buf + fh_len, 1, wait_secs, timed_out, ctx); mqp->fh_len = ++fh_len; } while(buf[fh_len - 1] & 0x80) { if(fh_len > MAX_FH_LEN) return MQP_ERR_NOT_DEF;/* Shouldn't happen */ RET_IF_ERR_IN_NET_RECV(net, buf + fh_len, 1, wait_secs, timed_out, ctx); mqp->fh_len = ++fh_len; } mqp_buf_rd_remlen(buf + 1, &remlen); if(mqp->maxlen < (remlen + fh_len)) return MQP_ERR_PKT_LEN; /* Inadequate free buffer */ buf += fh_len; /* Try to read all data that follows FH */ while(pl_len < remlen) { RET_IF_ERR_IN_NET_RECV(net, buf + pl_len, remlen - pl_len, wait_secs, timed_out, ctx); mqp->pl_len = pl_len += rv; } /* Set up MQTT Packet for received data from broker */ buf_wr_nbytes(MQP_FHEADER_BUF(mqp), ref, fh_len); mqp->fh_byte1 = *ref; mqp->msg_type = MSG_TYPE(*ref); return fh_len + remlen; } /*---------------------------------------------------------------------------- * QoS2 PUB RX Message handling mechanism and associated house-keeping *--------------------------------------------------------------------------*/ void qos2_pub_cq_reset(struct pub_qos2_cq *cq) { // memset(cq->id_vec, 0, sizeof(cq->id_vec)); buf_set((uint8_t*)cq->id_vec, 0, sizeof(cq->id_vec)); cq->n_free = MAX_PUBREL_INFLT; cq->rd_idx = 0; cq->wr_idx = 0; return; } bool qos2_pub_cq_logup(struct pub_qos2_cq *cq, uint16_t msg_id) { if(0 != cq->n_free) { cq->id_vec[cq->wr_idx++] = msg_id; cq->wr_idx &= MAX_PUBREL_INFLT - 1; cq->n_free--; return true; } /* Must ensure through an out-of-band arrangement that a remote doesn't push more than MAX_PUBREL_INFLT in-flight QOS2 PUB packets to local. If it still happens that the local receives more in-flight messages, then it has no option but to close the network connection. Closing of network connection is left to invoker to of this service. */ return false; } bool qos2_pub_cq_unlog(struct pub_qos2_cq *cq, uint16_t msg_id) { /* Simple implementation. Should be good for embedded system */ if(cq->n_free < MAX_PUBREL_INFLT) { /* Follow-up messages for QOS2 PUB must be transacted in the same order as the initial sequence of QOS2 PUB dispatches. Therefore, checking the first entry should be OK */ if(msg_id == cq->id_vec[cq->rd_idx++]) { cq->rd_idx &= MAX_PUBREL_INFLT - 1; cq->n_free++; return true; } } return false; } bool qos2_pub_cq_check(struct pub_qos2_cq *cq, uint16_t msg_id) { uint8_t rd_idx = cq->rd_idx; uint8_t n_free = cq->n_free; uint8_t i = 0; /* Go through the vector to see if packet ID is still active */ for(i = 0; i < (MAX_PUBREL_INFLT - n_free); i++) { if(msg_id == cq->id_vec[rd_idx++]) return true; rd_idx &= MAX_PUBREL_INFLT - 1; } return false; } /*---------------------------------------------------------------------------- * Client Context related details and associated house-keeping *--------------------------------------------------------------------------*/ void cl_ctx_reset(struct client_ctx *cl_ctx) { cl_ctx->usr = NULL; cl_ctx->net = -1; cl_ctx->ip_length = 16; cl_ctx->timeout = 0; cl_ctx->ka_secs = 0; cl_ctx->flags = 0; return; } void cl_ctx_timeout_insert(struct client_ctx **head, struct client_ctx *elem) { struct client_ctx *curr, *prev = NULL; if(NULL == *head) { *head = elem; return; } curr = *head; while(curr) { if(elem->timeout < curr->timeout) { if(NULL == prev) { elem->next = *head; *head = elem; } else { prev->next = elem; elem->next = curr; } break; } prev = curr; curr = curr->next; } if(NULL == curr) prev->next = elem; } void cl_ctx_remove(struct client_ctx **head, struct client_ctx *elem) { struct client_ctx *curr = *head, *prev = NULL; while(curr) { if(curr == elem) { if(NULL == prev) { *head = elem->next; prev = *head; } else { prev->next = elem->next; } elem->next = NULL; break; } prev = curr; curr = curr->next; } } void cl_ctx_timeout_update(struct client_ctx *cl_ctx, uint32_t now_secs) { uint32_t timeout = KA_TIMEOUT_NONE; uint16_t ka_secs = cl_ctx->ka_secs; if((0 == ka_secs) && (KA_TIMEOUT_NONE == cl_ctx->timeout)) return; if(0 != ka_secs) timeout = now_secs + ka_secs; cl_ctx->timeout = timeout; return; } }//namespace mbed_mqtt