TI's mqtt.

Dependencies:   client server

Committer:
dflet
Date:
Sat Jun 06 13:37:53 2015 +0000
Revision:
1:673880ad39ad
Parent:
0:698866e331b2
TI's mqtt

Who changed what in which revision?

User | Revision | Line number | New contents of line
dflet 0:698866e331b2 1 /******************************************************************************
dflet 0:698866e331b2 2 *
dflet 0:698866e331b2 3 * Copyright (C) 2014 Texas Instruments Incorporated
dflet 0:698866e331b2 4 *
dflet 0:698866e331b2 5 * All rights reserved. Property of Texas Instruments Incorporated.
dflet 0:698866e331b2 6 * Restricted rights to use, duplicate or disclose this code are
dflet 0:698866e331b2 7 * granted through contract.
dflet 0:698866e331b2 8 *
dflet 0:698866e331b2 9 * The program may not be used without the written permission of
dflet 0:698866e331b2 10 * Texas Instruments Incorporated or against the terms and conditions
dflet 0:698866e331b2 11 * stipulated in the agreement under which this program has been supplied,
dflet 0:698866e331b2 12 * and under no circumstances can it be used with non-TI connectivity device.
dflet 0:698866e331b2 13 *
dflet 0:698866e331b2 14 ******************************************************************************/
dflet 0:698866e331b2 15
dflet 0:698866e331b2 16 /*
dflet 0:698866e331b2 17 mqtt_common.c
dflet 0:698866e331b2 18
dflet 0:698866e331b2 19 This module implements routines that are common to both client and server
dflet 0:698866e331b2 20 components.
dflet 0:698866e331b2 21 */
dflet 0:698866e331b2 22
dflet 0:698866e331b2 23 #include "mqtt_common.h"
dflet 0:698866e331b2 24 #include "cli_uart.h"
dflet 0:698866e331b2 25
dflet 0:698866e331b2 26 namespace mbed_mqtt {
dflet 0:698866e331b2 27
dflet 0:698866e331b2 28 static void free_mqp(struct mqtt_packet *mqp)
dflet 0:698866e331b2 29 {
dflet 0:698866e331b2 30 if((NULL != mqp->free) && (0 == --mqp->n_refs))
dflet 0:698866e331b2 31 mqp->free(mqp);
dflet 0:698866e331b2 32
dflet 0:698866e331b2 33 return;
dflet 0:698866e331b2 34 }
dflet 0:698866e331b2 35
dflet 0:698866e331b2 36 void mqp_free(struct mqtt_packet *mqp)
dflet 0:698866e331b2 37 {
dflet 0:698866e331b2 38 free_mqp(mqp);
dflet 0:698866e331b2 39 }
dflet 0:698866e331b2 40
dflet 0:698866e331b2 41 static void reset_mqp(struct mqtt_packet *mqp)
dflet 0:698866e331b2 42 {
dflet 0:698866e331b2 43
dflet 0:698866e331b2 44 /* Fields not handled here are meant to be left unaltered. */
dflet 0:698866e331b2 45 mqp->msg_type = 0x00;
dflet 0:698866e331b2 46 mqp->fh_byte1 = 0x00;
dflet 0:698866e331b2 47 mqp->msg_id = 0;
dflet 0:698866e331b2 48 mqp->fh_len = 0x00;
dflet 0:698866e331b2 49 mqp->vh_len = 0;
dflet 0:698866e331b2 50 mqp->pl_len = 0;
dflet 0:698866e331b2 51 mqp->private_ = 0;
dflet 0:698866e331b2 52 }
dflet 0:698866e331b2 53
dflet 0:698866e331b2 54 void mqp_reset(struct mqtt_packet *mqp)
dflet 0:698866e331b2 55 {
dflet 0:698866e331b2 56 reset_mqp(mqp);
dflet 0:698866e331b2 57 }
dflet 0:698866e331b2 58
dflet 0:698866e331b2 59 void mqp_init(struct mqtt_packet *mqp, uint8_t offset)
dflet 0:698866e331b2 60 {
dflet 0:698866e331b2 61 reset_mqp(mqp);
dflet 0:698866e331b2 62
dflet 0:698866e331b2 63 mqp->offset = offset;
dflet 0:698866e331b2 64 mqp->n_refs = 0x01;
dflet 0:698866e331b2 65 mqp->next = NULL;
dflet 0:698866e331b2 66 }
dflet 0:698866e331b2 67
dflet 0:698866e331b2 68 static int32_t buf_wr_utf8(uint8_t *buf, const struct utf8_string *utf8)
dflet 0:698866e331b2 69 {
dflet 0:698866e331b2 70 uint8_t *ref = buf;
dflet 0:698866e331b2 71
dflet 0:698866e331b2 72 buf += buf_wr_nbo_2B(buf, utf8->length);
dflet 0:698866e331b2 73 buf += buf_wr_nbytes(buf, (uint8_t*)utf8->buffer, utf8->length);
dflet 0:698866e331b2 74
dflet 0:698866e331b2 75 return buf - ref;
dflet 0:698866e331b2 76 }
dflet 0:698866e331b2 77
dflet 0:698866e331b2 78 int32_t mqp_buf_wr_utf8(uint8_t *buf, const struct utf8_string *utf8)
dflet 0:698866e331b2 79 {
dflet 0:698866e331b2 80 return buf_wr_utf8(buf, utf8);
dflet 0:698866e331b2 81 }
dflet 0:698866e331b2 82
dflet 0:698866e331b2 83 #define MAX_REMLEN_BYTES (MAX_FH_LEN - 1)
dflet 0:698866e331b2 84
dflet 0:698866e331b2 85 static int32_t buf_tail_wr_remlen(uint8_t *buf, uint32_t remlen)
dflet 0:698866e331b2 86 {
dflet 0:698866e331b2 87
dflet 0:698866e331b2 88 uint8_t val[MAX_REMLEN_BYTES], i = 0;
dflet 0:698866e331b2 89
dflet 0:698866e331b2 90 do {
dflet 0:698866e331b2 91 val[i] = remlen & 0x7F; /* MOD 128 */
dflet 0:698866e331b2 92 remlen = remlen >> 7; /* DIV 128 */
dflet 0:698866e331b2 93
dflet 0:698866e331b2 94 if(remlen)
dflet 0:698866e331b2 95 val[i] |= 0x80;
dflet 0:698866e331b2 96
dflet 0:698866e331b2 97 i++;
dflet 0:698866e331b2 98
dflet 0:698866e331b2 99 } while(remlen > 0);
dflet 0:698866e331b2 100
dflet 0:698866e331b2 101 buf_wr_nbytes(buf + MAX_REMLEN_BYTES - i, val, i);
dflet 0:698866e331b2 102
dflet 0:698866e331b2 103 return i; /* # bytes written in buf */
dflet 0:698866e331b2 104 }
dflet 0:698866e331b2 105
dflet 0:698866e331b2 106 int32_t mqp_buf_tail_wr_remlen(uint8_t *buf, uint32_t remlen)
dflet 0:698866e331b2 107 {
dflet 0:698866e331b2 108 return buf_tail_wr_remlen(buf, remlen);
dflet 0:698866e331b2 109 }
dflet 0:698866e331b2 110
dflet 0:698866e331b2 111 static int32_t buf_rd_remlen(uint8_t *buf, uint32_t *remlen)
dflet 0:698866e331b2 112 {
dflet 0:698866e331b2 113 uint32_t val = 0, mul = 0;
dflet 0:698866e331b2 114 int32_t i = 0;
dflet 0:698866e331b2 115
dflet 0:698866e331b2 116 do {
dflet 0:698866e331b2 117 val += (buf[i] & 0x7F) << mul;
dflet 0:698866e331b2 118 mul += 7;
dflet 0:698866e331b2 119
dflet 0:698866e331b2 120 } while((buf[i++] & 0x80) &&
dflet 0:698866e331b2 121 (i < MAX_REMLEN_BYTES));
dflet 0:698866e331b2 122
dflet 0:698866e331b2 123 *remlen = val;
dflet 0:698866e331b2 124
dflet 0:698866e331b2 125 /* Return -1 if value was not found */
dflet 0:698866e331b2 126 return (buf[i - 1] & 0x80)? -1 : i;
dflet 0:698866e331b2 127 }
dflet 0:698866e331b2 128
dflet 0:698866e331b2 129 int32_t mqp_buf_rd_remlen(uint8_t *buf, uint32_t *remlen)
dflet 0:698866e331b2 130 {
dflet 0:698866e331b2 131 return buf_rd_remlen(buf, remlen);
dflet 0:698866e331b2 132 }
dflet 0:698866e331b2 133
dflet 0:698866e331b2 134 int32_t mqp_pub_append_topic(struct mqtt_packet *mqp, const struct utf8_string *topic,
dflet 0:698866e331b2 135 uint16_t msg_id)
dflet 0:698866e331b2 136 {
dflet 0:698866e331b2 137 uint8_t *buf = MQP_VHEADER_BUF(mqp), *ref = buf;
dflet 0:698866e331b2 138
dflet 0:698866e331b2 139 if(0 != mqp->vh_len){
dflet 0:698866e331b2 140 Uart_Write((uint8_t*)"Topic has been already added\r\n");
dflet 0:698866e331b2 141 return -1; /* Topic has been already added */
dflet 0:698866e331b2 142 }
dflet 0:698866e331b2 143 if(MQP_FREEBUF_LEN(mqp) < (topic + msg_id? 2 : 0)){
dflet 0:698866e331b2 144 Uart_Write((uint8_t*)"Can't WR topic\r\n");
dflet 0:698866e331b2 145 return MQP_ERR_PKT_LEN; /* Can't WR topic */
dflet 0:698866e331b2 146 }
dflet 0:698866e331b2 147 buf += buf_wr_utf8(buf, topic);
dflet 0:698866e331b2 148
dflet 0:698866e331b2 149 if(0 != msg_id) { /* MSG ID 0 indicates ==> QoS0 */
dflet 0:698866e331b2 150 mqp->msg_id = msg_id;
dflet 0:698866e331b2 151 buf += buf_wr_nbo_2B(buf, mqp->msg_id);
dflet 0:698866e331b2 152 }
dflet 0:698866e331b2 153
dflet 0:698866e331b2 154 mqp->vh_len += buf - ref;
dflet 0:698866e331b2 155
dflet 0:698866e331b2 156 return buf - ref;
dflet 0:698866e331b2 157 }
dflet 0:698866e331b2 158
dflet 0:698866e331b2 159 int32_t mqp_pub_append_data(struct mqtt_packet *mqp, const uint8_t *data_buf,
dflet 0:698866e331b2 160 uint32_t data_len)
dflet 0:698866e331b2 161 {
dflet 0:698866e331b2 162 uint8_t *buf = MQP_PAYLOAD_BUF(mqp) + mqp->pl_len, *ref = buf;
dflet 0:698866e331b2 163
dflet 0:698866e331b2 164 if(0 == mqp->vh_len)
dflet 0:698866e331b2 165 return -1; /* Must include topic first */
dflet 0:698866e331b2 166
dflet 0:698866e331b2 167 if(MQP_FREEBUF_LEN(mqp) < data_len)
dflet 0:698866e331b2 168 return MQP_ERR_PKT_LEN; /* Can't WR data */
dflet 0:698866e331b2 169
dflet 0:698866e331b2 170 /* Including payload info for PUBLISH */
dflet 0:698866e331b2 171 buf += buf_wr_nbytes(buf, data_buf, data_len);
dflet 0:698866e331b2 172 mqp->pl_len += buf - ref;
dflet 0:698866e331b2 173
dflet 0:698866e331b2 174 return buf - ref;
dflet 0:698866e331b2 175 }
dflet 0:698866e331b2 176
dflet 0:698866e331b2 177 static bool proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw)
dflet 0:698866e331b2 178 {
dflet 0:698866e331b2 179 uint8_t *buf = MQP_VHEADER_BUF(mqp_raw);
dflet 0:698866e331b2 180
dflet 0:698866e331b2 181 if(mqp_raw->pl_len < 2)
dflet 0:698866e331b2 182 return false; /* Bytes for MSG ID not available */
dflet 0:698866e331b2 183
dflet 0:698866e331b2 184 buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id);
dflet 0:698866e331b2 185 mqp_raw->vh_len += 2;
dflet 0:698866e331b2 186 mqp_raw->pl_len -= 2;
dflet 0:698866e331b2 187
dflet 0:698866e331b2 188 return true;
dflet 0:698866e331b2 189 }
dflet 0:698866e331b2 190
dflet 0:698866e331b2 191 bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw)
dflet 0:698866e331b2 192 {
dflet 0:698866e331b2 193 return proc_vh_msg_id_rx(mqp_raw);
dflet 0:698866e331b2 194 }
dflet 0:698866e331b2 195
dflet 0:698866e331b2 196 bool mqp_proc_msg_id_ack_rx(struct mqtt_packet *mqp_raw, bool has_pl)
dflet 0:698866e331b2 197 {
dflet 0:698866e331b2 198 if((false == proc_vh_msg_id_rx(mqp_raw)) ||
dflet 0:698866e331b2 199 (has_pl ^ (!!mqp_raw->pl_len)))
dflet 0:698866e331b2 200 return false;
dflet 0:698866e331b2 201
dflet 0:698866e331b2 202 return true;
dflet 0:698866e331b2 203 }
dflet 0:698866e331b2 204
dflet 0:698866e331b2 205 bool mqp_proc_pub_rx(struct mqtt_packet *mqp_raw)
dflet 0:698866e331b2 206 {
dflet 0:698866e331b2 207 uint8_t *buf = MQP_VHEADER_BUF(mqp_raw), *ref = buf;
dflet 0:698866e331b2 208 uint16_t tmp = 0;
dflet 0:698866e331b2 209
dflet 0:698866e331b2 210 if(mqp_raw->pl_len < (buf - ref + 2)) /* Length Check */
dflet 0:698866e331b2 211 return false; /* No space to hold Topic size */
dflet 0:698866e331b2 212
dflet 0:698866e331b2 213 buf += buf_rd_nbo_2B(buf, &tmp); /* Topic Length */
dflet 0:698866e331b2 214 buf += tmp; /* Topic Content */
dflet 0:698866e331b2 215
dflet 0:698866e331b2 216 if(MQTT_QOS0 != ENUM_QOS(mqp_raw->fh_byte1)) {
dflet 0:698866e331b2 217 if(mqp_raw->pl_len < (buf - ref + 2))
dflet 0:698866e331b2 218 return false; /* No space to hold MSG ID */
dflet 0:698866e331b2 219
dflet 0:698866e331b2 220 buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id);
dflet 0:698866e331b2 221 }
dflet 0:698866e331b2 222
dflet 0:698866e331b2 223 mqp_raw->vh_len += buf - ref;
dflet 0:698866e331b2 224 mqp_raw->pl_len -= buf - ref;
dflet 0:698866e331b2 225
dflet 0:698866e331b2 226 return true;
dflet 0:698866e331b2 227 }
dflet 0:698866e331b2 228
dflet 0:698866e331b2 229 bool mqp_ack_wlist_append(struct mqtt_ack_wlist *list,
dflet 0:698866e331b2 230 struct mqtt_packet *elem)
dflet 0:698866e331b2 231 {
dflet 0:698866e331b2 232 elem->next = NULL;
dflet 0:698866e331b2 233
dflet 0:698866e331b2 234 if(list->tail) {
dflet 0:698866e331b2 235 list->tail->next = elem;
dflet 0:698866e331b2 236 list->tail = elem;
dflet 0:698866e331b2 237 } else {
dflet 0:698866e331b2 238 list->tail = elem;
dflet 0:698866e331b2 239 list->head = elem;
dflet 0:698866e331b2 240 }
dflet 0:698866e331b2 241
dflet 0:698866e331b2 242 return true;
dflet 0:698866e331b2 243 }
dflet 0:698866e331b2 244
dflet 0:698866e331b2 245 struct mqtt_packet *mqp_ack_wlist_remove(struct mqtt_ack_wlist *list,
dflet 0:698866e331b2 246 uint16_t msg_id)
dflet 0:698866e331b2 247 {
dflet 0:698866e331b2 248 struct mqtt_packet *elem = list->head, *prev = NULL;
dflet 0:698866e331b2 249
dflet 0:698866e331b2 250 while(elem) {
dflet 0:698866e331b2 251 if(msg_id == elem->msg_id) {
dflet 0:698866e331b2 252 if(prev)
dflet 0:698866e331b2 253 prev->next = elem->next;
dflet 0:698866e331b2 254 else
dflet 0:698866e331b2 255 list->head = elem->next;
dflet 0:698866e331b2 256
dflet 0:698866e331b2 257 if(NULL == list->head)
dflet 0:698866e331b2 258 list->tail = NULL;
dflet 0:698866e331b2 259
dflet 0:698866e331b2 260 if(list->tail == elem)
dflet 0:698866e331b2 261 list->tail = prev;
dflet 0:698866e331b2 262
dflet 0:698866e331b2 263 break;
dflet 0:698866e331b2 264 }
dflet 0:698866e331b2 265
dflet 0:698866e331b2 266 prev = elem;
dflet 0:698866e331b2 267 elem = elem->next;
dflet 0:698866e331b2 268 }
dflet 0:698866e331b2 269
dflet 0:698866e331b2 270 return elem;
dflet 0:698866e331b2 271 }
dflet 0:698866e331b2 272
dflet 0:698866e331b2 273 void mqp_ack_wlist_purge(struct mqtt_ack_wlist *list)
dflet 0:698866e331b2 274 {
dflet 0:698866e331b2 275
dflet 0:698866e331b2 276 struct mqtt_packet *elem = list->head;
dflet 0:698866e331b2 277
dflet 0:698866e331b2 278 while(elem) {
dflet 0:698866e331b2 279 struct mqtt_packet *next = elem->next;
dflet 0:698866e331b2 280 free_mqp(elem);
dflet 0:698866e331b2 281 elem = next;
dflet 0:698866e331b2 282 }
dflet 0:698866e331b2 283
dflet 0:698866e331b2 284 list->head = NULL;
dflet 0:698866e331b2 285 list->tail = NULL;
dflet 0:698866e331b2 286
dflet 0:698866e331b2 287 return;
dflet 0:698866e331b2 288 }
dflet 0:698866e331b2 289
dflet 0:698866e331b2 290 void secure_conn_struct_init(struct secure_conn *nw_security)
dflet 0:698866e331b2 291 {
dflet 0:698866e331b2 292 nw_security->method = nw_security->cipher = NULL;
dflet 0:698866e331b2 293 nw_security->files = NULL;
dflet 0:698866e331b2 294 nw_security->n_file = 0;
dflet 0:698866e331b2 295
dflet 0:698866e331b2 296 return;
dflet 0:698866e331b2 297 }
dflet 0:698866e331b2 298
dflet 0:698866e331b2 299 int32_t mqp_prep_fh(struct mqtt_packet *mqp, uint8_t flags)
dflet 0:698866e331b2 300 {
dflet 0:698866e331b2 301
dflet 0:698866e331b2 302 uint32_t remlen = mqp->vh_len + mqp->pl_len;
dflet 0:698866e331b2 303 uint8_t *buf = mqp->buffer + mqp->offset;
dflet 0:698866e331b2 304 uint8_t *ref = buf;
dflet 0:698866e331b2 305
dflet 0:698866e331b2 306 buf -= buf_tail_wr_remlen(buf - MAX_REMLEN_BYTES, remlen);
dflet 0:698866e331b2 307
dflet 0:698866e331b2 308 buf -= 1; /* Make space for FH Byte1 */
dflet 0:698866e331b2 309 mqp->fh_byte1 = *buf = MAKE_FH_BYTE1(mqp->msg_type, flags);
dflet 0:698866e331b2 310
dflet 0:698866e331b2 311 mqp->fh_len = ref - buf;
dflet 0:698866e331b2 312 mqp->offset -= ref - buf;
dflet 0:698866e331b2 313
dflet 0:698866e331b2 314 return ref - buf;
dflet 0:698866e331b2 315 }
dflet 0:698866e331b2 316
dflet 0:698866e331b2 317 /*
dflet 0:698866e331b2 318 Since, the network connection is a TCP socket stream, it is imperative that
dflet 0:698866e331b2 319 adequate checks are put in place to identify a MQTT packet and isolate it
dflet 0:698866e331b2 320 for further processing. The intent of following routine is to read just one
dflet 0:698866e331b2 321 packet from continuous stream and leave rest for the next iteration.
dflet 0:698866e331b2 322 */
dflet 0:698866e331b2 323
dflet 0:698866e331b2 324 #define RET_IF_ERR_IN_NET_RECV(net, buf, len, wait_secs, timed_out, ctx)\
dflet 0:698866e331b2 325 rv = net_ops->recv(net, buf, len, wait_secs, timed_out, ctx); \
dflet 0:698866e331b2 326 if(rv < 1) \
dflet 0:698866e331b2 327 return MQP_ERR_NETWORK;
dflet 0:698866e331b2 328
dflet 0:698866e331b2 329 int32_t mqp_recv(int32_t net, const struct device_net_services *net_ops,
dflet 0:698866e331b2 330 struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out,
dflet 0:698866e331b2 331 void *ctx)
dflet 0:698866e331b2 332 {
dflet 0:698866e331b2 333 uint8_t *buf = MQP_FHEADER_BUF(mqp), *ref = buf, fh_len = mqp->fh_len;
dflet 0:698866e331b2 334 uint32_t pl_len = mqp->pl_len, remlen = 0;
dflet 0:698866e331b2 335 int32_t rv;
dflet 0:698866e331b2 336
dflet 0:698866e331b2 337 while(2 > fh_len) {
dflet 0:698866e331b2 338 RET_IF_ERR_IN_NET_RECV(net, buf + fh_len, 1, wait_secs,
dflet 0:698866e331b2 339 timed_out, ctx);
dflet 0:698866e331b2 340
dflet 0:698866e331b2 341 mqp->fh_len = ++fh_len;
dflet 0:698866e331b2 342 }
dflet 0:698866e331b2 343
dflet 0:698866e331b2 344 while(buf[fh_len - 1] & 0x80) {
dflet 0:698866e331b2 345 if(fh_len > MAX_FH_LEN)
dflet 0:698866e331b2 346 return MQP_ERR_NOT_DEF;/* Shouldn't happen */
dflet 0:698866e331b2 347
dflet 0:698866e331b2 348 RET_IF_ERR_IN_NET_RECV(net, buf + fh_len, 1, wait_secs,
dflet 0:698866e331b2 349 timed_out, ctx);
dflet 0:698866e331b2 350
dflet 0:698866e331b2 351 mqp->fh_len = ++fh_len;
dflet 0:698866e331b2 352 }
dflet 0:698866e331b2 353
dflet 0:698866e331b2 354 mqp_buf_rd_remlen(buf + 1, &remlen);
dflet 0:698866e331b2 355 if(mqp->maxlen < (remlen + fh_len))
dflet 0:698866e331b2 356 return MQP_ERR_PKT_LEN; /* Inadequate free buffer */
dflet 0:698866e331b2 357
dflet 0:698866e331b2 358 buf += fh_len; /* Try to read all data that follows FH */
dflet 0:698866e331b2 359 while(pl_len < remlen) {
dflet 0:698866e331b2 360 RET_IF_ERR_IN_NET_RECV(net, buf + pl_len, remlen - pl_len,
dflet 0:698866e331b2 361 wait_secs, timed_out, ctx);
dflet 0:698866e331b2 362
dflet 0:698866e331b2 363 mqp->pl_len = pl_len += rv;
dflet 0:698866e331b2 364 }
dflet 0:698866e331b2 365
dflet 0:698866e331b2 366 /* Set up MQTT Packet for received data from broker */
dflet 0:698866e331b2 367 buf_wr_nbytes(MQP_FHEADER_BUF(mqp), ref, fh_len);
dflet 0:698866e331b2 368 mqp->fh_byte1 = *ref;
dflet 0:698866e331b2 369 mqp->msg_type = MSG_TYPE(*ref);
dflet 0:698866e331b2 370
dflet 0:698866e331b2 371 return fh_len + remlen;
dflet 0:698866e331b2 372 }
dflet 0:698866e331b2 373
dflet 0:698866e331b2 374 /*----------------------------------------------------------------------------
dflet 0:698866e331b2 375 * QoS2 PUB RX Message handling mechanism and associated house-keeping
dflet 0:698866e331b2 376 *--------------------------------------------------------------------------*/
dflet 0:698866e331b2 377 void qos2_pub_cq_reset(struct pub_qos2_cq *cq)
dflet 0:698866e331b2 378 {
dflet 0:698866e331b2 379
dflet 0:698866e331b2 380 // memset(cq->id_vec, 0, sizeof(cq->id_vec));
dflet 0:698866e331b2 381 buf_set((uint8_t*)cq->id_vec, 0, sizeof(cq->id_vec));
dflet 0:698866e331b2 382 cq->n_free = MAX_PUBREL_INFLT;
dflet 0:698866e331b2 383 cq->rd_idx = 0;
dflet 0:698866e331b2 384 cq->wr_idx = 0;
dflet 0:698866e331b2 385
dflet 0:698866e331b2 386 return;
dflet 0:698866e331b2 387 }
dflet 0:698866e331b2 388
dflet 0:698866e331b2 389 bool qos2_pub_cq_logup(struct pub_qos2_cq *cq, uint16_t msg_id)
dflet 0:698866e331b2 390 {
dflet 0:698866e331b2 391 if(0 != cq->n_free) {
dflet 0:698866e331b2 392 cq->id_vec[cq->wr_idx++] = msg_id;
dflet 0:698866e331b2 393 cq->wr_idx &= MAX_PUBREL_INFLT - 1;
dflet 0:698866e331b2 394 cq->n_free--;
dflet 0:698866e331b2 395 return true;
dflet 0:698866e331b2 396 }
dflet 0:698866e331b2 397
dflet 0:698866e331b2 398 /* Must ensure through an out-of-band arrangement that a remote doesn't
dflet 0:698866e331b2 399 push more than MAX_PUBREL_INFLT in-flight QOS2 PUB packets to local.
dflet 0:698866e331b2 400 If it still happens that the local receives more in-flight messages,
dflet 0:698866e331b2 401 then it has no option but to close the network connection.
dflet 0:698866e331b2 402
dflet 0:698866e331b2 403 Closing of network connection is left to invoker to of this service.
dflet 0:698866e331b2 404 */
dflet 0:698866e331b2 405
dflet 0:698866e331b2 406 return false;
dflet 0:698866e331b2 407 }
dflet 0:698866e331b2 408
dflet 0:698866e331b2 409 bool qos2_pub_cq_unlog(struct pub_qos2_cq *cq, uint16_t msg_id)
dflet 0:698866e331b2 410 {
dflet 0:698866e331b2 411 /* Simple implementation. Should be good for embedded system */
dflet 0:698866e331b2 412 if(cq->n_free < MAX_PUBREL_INFLT) {
dflet 0:698866e331b2 413 /* Follow-up messages for QOS2 PUB must be transacted in the
dflet 0:698866e331b2 414 same order as the initial sequence of QOS2 PUB dispatches.
dflet 0:698866e331b2 415 Therefore, checking the first entry should be OK
dflet 0:698866e331b2 416 */
dflet 0:698866e331b2 417 if(msg_id == cq->id_vec[cq->rd_idx++]) {
dflet 0:698866e331b2 418 cq->rd_idx &= MAX_PUBREL_INFLT - 1;
dflet 0:698866e331b2 419 cq->n_free++;
dflet 0:698866e331b2 420 return true;
dflet 0:698866e331b2 421 }
dflet 0:698866e331b2 422 }
dflet 0:698866e331b2 423
dflet 0:698866e331b2 424 return false;
dflet 0:698866e331b2 425 }
dflet 0:698866e331b2 426
dflet 0:698866e331b2 427 bool qos2_pub_cq_check(struct pub_qos2_cq *cq, uint16_t msg_id)
dflet 0:698866e331b2 428 {
dflet 0:698866e331b2 429 uint8_t rd_idx = cq->rd_idx;
dflet 0:698866e331b2 430 uint8_t n_free = cq->n_free;
dflet 0:698866e331b2 431 uint8_t i = 0;
dflet 0:698866e331b2 432
dflet 0:698866e331b2 433 /* Go through the vector to see if packet ID is still active */
dflet 0:698866e331b2 434 for(i = 0; i < (MAX_PUBREL_INFLT - n_free); i++) {
dflet 0:698866e331b2 435 if(msg_id == cq->id_vec[rd_idx++])
dflet 0:698866e331b2 436 return true;
dflet 0:698866e331b2 437
dflet 0:698866e331b2 438 rd_idx &= MAX_PUBREL_INFLT - 1;
dflet 0:698866e331b2 439 }
dflet 0:698866e331b2 440
dflet 0:698866e331b2 441 return false;
dflet 0:698866e331b2 442 }
dflet 0:698866e331b2 443
dflet 0:698866e331b2 444 /*----------------------------------------------------------------------------
dflet 0:698866e331b2 445 * Client Context related details and associated house-keeping
dflet 0:698866e331b2 446 *--------------------------------------------------------------------------*/
dflet 0:698866e331b2 447 void cl_ctx_reset(struct client_ctx *cl_ctx)
dflet 0:698866e331b2 448 {
dflet 0:698866e331b2 449 cl_ctx->usr = NULL;
dflet 0:698866e331b2 450 cl_ctx->net = -1;
dflet 0:698866e331b2 451 cl_ctx->ip_length = 16;
dflet 0:698866e331b2 452
dflet 0:698866e331b2 453 cl_ctx->timeout = 0;
dflet 0:698866e331b2 454 cl_ctx->ka_secs = 0;
dflet 0:698866e331b2 455
dflet 0:698866e331b2 456 cl_ctx->flags = 0;
dflet 0:698866e331b2 457
dflet 0:698866e331b2 458 return;
dflet 0:698866e331b2 459 }
dflet 0:698866e331b2 460
dflet 0:698866e331b2 461 void cl_ctx_timeout_insert(struct client_ctx **head, struct client_ctx *elem)
dflet 0:698866e331b2 462 {
dflet 0:698866e331b2 463
dflet 0:698866e331b2 464 struct client_ctx *curr, *prev = NULL;
dflet 0:698866e331b2 465
dflet 0:698866e331b2 466 if(NULL == *head) {
dflet 0:698866e331b2 467 *head = elem;
dflet 0:698866e331b2 468 return;
dflet 0:698866e331b2 469 }
dflet 0:698866e331b2 470
dflet 0:698866e331b2 471 curr = *head;
dflet 0:698866e331b2 472 while(curr) {
dflet 0:698866e331b2 473 if(elem->timeout < curr->timeout) {
dflet 0:698866e331b2 474 if(NULL == prev) {
dflet 0:698866e331b2 475 elem->next = *head;
dflet 0:698866e331b2 476 *head = elem;
dflet 0:698866e331b2 477 } else {
dflet 0:698866e331b2 478 prev->next = elem;
dflet 0:698866e331b2 479 elem->next = curr;
dflet 0:698866e331b2 480 }
dflet 0:698866e331b2 481
dflet 0:698866e331b2 482 break;
dflet 0:698866e331b2 483 }
dflet 0:698866e331b2 484
dflet 0:698866e331b2 485 prev = curr;
dflet 0:698866e331b2 486 curr = curr->next;
dflet 0:698866e331b2 487 }
dflet 0:698866e331b2 488
dflet 0:698866e331b2 489 if(NULL == curr)
dflet 0:698866e331b2 490 prev->next = elem;
dflet 0:698866e331b2 491 }
dflet 0:698866e331b2 492
dflet 0:698866e331b2 493 void cl_ctx_remove(struct client_ctx **head, struct client_ctx *elem)
dflet 0:698866e331b2 494 {
dflet 0:698866e331b2 495 struct client_ctx *curr = *head, *prev = NULL;
dflet 0:698866e331b2 496
dflet 0:698866e331b2 497 while(curr) {
dflet 0:698866e331b2 498 if(curr == elem) {
dflet 0:698866e331b2 499 if(NULL == prev) {
dflet 0:698866e331b2 500 *head = elem->next;
dflet 0:698866e331b2 501 prev = *head;
dflet 0:698866e331b2 502 } else {
dflet 0:698866e331b2 503 prev->next = elem->next;
dflet 0:698866e331b2 504 }
dflet 0:698866e331b2 505
dflet 0:698866e331b2 506 elem->next = NULL;
dflet 0:698866e331b2 507 break;
dflet 0:698866e331b2 508 }
dflet 0:698866e331b2 509
dflet 0:698866e331b2 510 prev = curr;
dflet 0:698866e331b2 511 curr = curr->next;
dflet 0:698866e331b2 512 }
dflet 0:698866e331b2 513 }
dflet 0:698866e331b2 514
dflet 0:698866e331b2 515 void cl_ctx_timeout_update(struct client_ctx *cl_ctx, uint32_t now_secs)
dflet 0:698866e331b2 516 {
dflet 0:698866e331b2 517 uint32_t timeout = KA_TIMEOUT_NONE;
dflet 0:698866e331b2 518 uint16_t ka_secs = cl_ctx->ka_secs;
dflet 0:698866e331b2 519
dflet 0:698866e331b2 520 if((0 == ka_secs) && (KA_TIMEOUT_NONE == cl_ctx->timeout))
dflet 0:698866e331b2 521 return;
dflet 0:698866e331b2 522
dflet 0:698866e331b2 523 if(0 != ka_secs)
dflet 0:698866e331b2 524 timeout = now_secs + ka_secs;
dflet 0:698866e331b2 525
dflet 0:698866e331b2 526 cl_ctx->timeout = timeout;
dflet 0:698866e331b2 527
dflet 0:698866e331b2 528 return;
dflet 0:698866e331b2 529 }
dflet 0:698866e331b2 530
dflet 0:698866e331b2 531 }//namespace mbed_mqtt
dflet 0:698866e331b2 532