Part of TI's mqtt

Dependents:   mqtt_V1 cc3100_Test_mqtt_CM3

Committer:
dflet
Date:
Sat Jun 06 13:29:08 2015 +0000
Revision:
0:547251f42a60
Part of mqtt_V1

Who changed what in which revision?

UserRevisionLine numberNew contents of line
dflet 0:547251f42a60 1 /******************************************************************************
dflet 0:547251f42a60 2 *
dflet 0:547251f42a60 3 * Copyright (C) 2014 Texas Instruments Incorporated
dflet 0:547251f42a60 4 *
dflet 0:547251f42a60 5 * All rights reserved. Property of Texas Instruments Incorporated.
dflet 0:547251f42a60 6 * Restricted rights to use, duplicate or disclose this code are
dflet 0:547251f42a60 7 * granted through contract.
dflet 0:547251f42a60 8 *
dflet 0:547251f42a60 9 * The program may not be used without the written permission of
dflet 0:547251f42a60 10 * Texas Instruments Incorporated or against the terms and conditions
dflet 0:547251f42a60 11 * stipulated in the agreement under which this program has been supplied,
dflet 0:547251f42a60 12 * and under no circumstances can it be used with non-TI connectivity device.
dflet 0:547251f42a60 13 *
dflet 0:547251f42a60 14 ******************************************************************************/
dflet 0:547251f42a60 15
dflet 0:547251f42a60 16 #include "server_pkts.h"
dflet 0:547251f42a60 17
dflet 0:547251f42a60 18 namespace mbed_mqtt {
dflet 0:547251f42a60 19
dflet 0:547251f42a60 20 /*-----------------------------------------------------------------------------
dflet 0:547251f42a60 21 * Note: Do not create additional dependency of this file on any header other
dflet 0:547251f42a60 22 * than server_pkts.h. Specifically, server_pkts.[hc] in conjunction with the
dflet 0:547251f42a60 23 * mqtt_common.[hc] files must be facilitated to create a stand-alone library.
dflet 0:547251f42a60 24 *-----------------------------------------------------------------------------
dflet 0:547251f42a60 25 */
dflet 0:547251f42a60 26
dflet 0:547251f42a60 27 static void *mutex = NULL;
dflet 0:547251f42a60 28 static void (*mutex_lockin)(void*) = NULL;
dflet 0:547251f42a60 29 static void (*mutex_unlock)(void*) = NULL;
dflet 0:547251f42a60 30
dflet 0:547251f42a60 31 #define MUTEX_LOCKIN() if(mutex_lockin) mutex_lockin(mutex);
dflet 0:547251f42a60 32 #define MUTEX_UNLOCK() if(mutex_unlock) mutex_unlock(mutex);
dflet 0:547251f42a60 33
dflet 0:547251f42a60 34 static bool aux_dbg_enbl = false;
dflet 0:547251f42a60 35 static int32_t (*debug_printf)(const char *fmt, ...) = NULL;
dflet 0:547251f42a60 36
dflet 0:547251f42a60 37 #define USR_INFO debug_printf
dflet 0:547251f42a60 38 #define DBG_INFO(I, ...) if(aux_dbg_enbl) debug_printf(I, ##__VA_ARGS__)
dflet 0:547251f42a60 39
dflet 0:547251f42a60 40 static int32_t mqp_buf_rd_utf8(const uint8_t *buf, const uint8_t *end,
dflet 0:547251f42a60 41 struct utf8_string *utf8)
dflet 0:547251f42a60 42 {
dflet 0:547251f42a60 43 const uint8_t *ref = buf; /* Reference */
dflet 0:547251f42a60 44 uint16_t len = 0; /* UTF8 Size */
dflet 0:547251f42a60 45
dflet 0:547251f42a60 46 if(end - buf < 2)
dflet 0:547251f42a60 47 return -1; /* No valid buffer to hold UTF8 size */
dflet 0:547251f42a60 48
dflet 0:547251f42a60 49 buf += buf_rd_nbo_2B(buf, &len);
dflet 0:547251f42a60 50 if(end - buf < len)
dflet 0:547251f42a60 51 return -1; /* No valid buffer to hold UTF8 name */
dflet 0:547251f42a60 52
dflet 0:547251f42a60 53 utf8->length = len;
dflet 0:547251f42a60 54 utf8->buffer = len? (char*)buf : NULL;
dflet 0:547251f42a60 55
dflet 0:547251f42a60 56 return buf + len - ref;
dflet 0:547251f42a60 57 }
dflet 0:547251f42a60 58
dflet 0:547251f42a60 59 static struct mqtt_server_msg_cbs usr_obj, *usr_cbs = NULL;
dflet 0:547251f42a60 60 static const struct device_net_services *net_ops = NULL;
dflet 0:547251f42a60 61
dflet 0:547251f42a60 62 #ifndef CFG_SR_MQTT_CTXS
dflet 0:547251f42a60 63 #define MAX_NWCONN 6
dflet 0:547251f42a60 64 #else
dflet 0:547251f42a60 65 #define MAX_NWCONN CFG_SR_MQTT_CTXS
dflet 0:547251f42a60 66 #endif
dflet 0:547251f42a60 67
dflet 0:547251f42a60 68 static struct client_ctx contexts[MAX_NWCONN];
dflet 0:547251f42a60 69
dflet 0:547251f42a60 70 static struct client_ctx *used_ctxs = NULL;
dflet 0:547251f42a60 71 static struct client_ctx *free_ctxs = NULL;
dflet 0:547251f42a60 72
dflet 0:547251f42a60 73
dflet 0:547251f42a60 74 #define NETWORK_CLOSE_FLAG 0x00200000
dflet 0:547251f42a60 75 #define NW_CONN_ERROR_FLAG 0x00400000
dflet 0:547251f42a60 76 #define RCVD_CONN_MSG_FLAG 0x00800000
dflet 0:547251f42a60 77
dflet 0:547251f42a60 78 #define NEED_NET_CLOSE(cl_ctx) (cl_ctx->flags & NETWORK_CLOSE_FLAG)
dflet 0:547251f42a60 79
dflet 0:547251f42a60 80 static void cl_ctx_init(void)
dflet 0:547251f42a60 81 {
dflet 0:547251f42a60 82 int32_t i = 0;
dflet 0:547251f42a60 83 for(i = 0; i < MAX_NWCONN; i++) {
dflet 0:547251f42a60 84 struct client_ctx *cl_ctx = contexts + i;
dflet 0:547251f42a60 85
dflet 0:547251f42a60 86 cl_ctx_reset(cl_ctx);
dflet 0:547251f42a60 87
dflet 0:547251f42a60 88 cl_ctx->next = free_ctxs;
dflet 0:547251f42a60 89 free_ctxs = cl_ctx;
dflet 0:547251f42a60 90 }
dflet 0:547251f42a60 91 }
dflet 0:547251f42a60 92
dflet 0:547251f42a60 93 static void cl_ctx_free(struct client_ctx *cl_ctx)
dflet 0:547251f42a60 94 {
dflet 0:547251f42a60 95 cl_ctx_reset(cl_ctx);
dflet 0:547251f42a60 96
dflet 0:547251f42a60 97 cl_ctx->next = free_ctxs;
dflet 0:547251f42a60 98 free_ctxs = cl_ctx;
dflet 0:547251f42a60 99
dflet 0:547251f42a60 100 return;
dflet 0:547251f42a60 101 }
dflet 0:547251f42a60 102
dflet 0:547251f42a60 103 static struct client_ctx *cl_ctx_alloc(void)
dflet 0:547251f42a60 104 {
dflet 0:547251f42a60 105 struct client_ctx *cl_ctx = free_ctxs;
dflet 0:547251f42a60 106 if(cl_ctx) {
dflet 0:547251f42a60 107 free_ctxs = cl_ctx->next;
dflet 0:547251f42a60 108 cl_ctx->next = NULL;
dflet 0:547251f42a60 109 } else
dflet 0:547251f42a60 110 USR_INFO("S: fatal, no free cl_ctx\n\r");
dflet 0:547251f42a60 111
dflet 0:547251f42a60 112 return cl_ctx;
dflet 0:547251f42a60 113 }
dflet 0:547251f42a60 114
dflet 0:547251f42a60 115 static inline bool had_rcvd_conn_msg(struct client_ctx *cl_ctx)
dflet 0:547251f42a60 116 {
dflet 0:547251f42a60 117 return (cl_ctx->flags & RCVD_CONN_MSG_FLAG);
dflet 0:547251f42a60 118 }
dflet 0:547251f42a60 119
dflet 0:547251f42a60 120 static inline void set_rcvd_conn_msg(struct client_ctx *cl_ctx)
dflet 0:547251f42a60 121 {
dflet 0:547251f42a60 122 cl_ctx->flags |= RCVD_CONN_MSG_FLAG;
dflet 0:547251f42a60 123 }
dflet 0:547251f42a60 124
dflet 0:547251f42a60 125 static void used_ctxs_insert(struct client_ctx *cl_ctx)
dflet 0:547251f42a60 126 {
dflet 0:547251f42a60 127 cl_ctx_timeout_insert(&used_ctxs, cl_ctx);
dflet 0:547251f42a60 128 }
dflet 0:547251f42a60 129
dflet 0:547251f42a60 130 static void used_ctxs_remove(struct client_ctx *cl_ctx)
dflet 0:547251f42a60 131 {
dflet 0:547251f42a60 132 cl_ctx_remove(&used_ctxs, cl_ctx);
dflet 0:547251f42a60 133 }
dflet 0:547251f42a60 134
dflet 0:547251f42a60 135 static int32_t loopb_net = -1;
dflet 0:547251f42a60 136 static const uint8_t LOOP_DATA[] = {0x00, 0x01};
dflet 0:547251f42a60 137 #define LOOP_DLEN sizeof(LOOP_DATA)
dflet 0:547251f42a60 138 static uint16_t loopback_port = 0;
dflet 0:547251f42a60 139 static bool pending_trigs = false;
dflet 0:547251f42a60 140
dflet 0:547251f42a60 141 static int32_t loopb_trigger(void)
dflet 0:547251f42a60 142 {
dflet 0:547251f42a60 143 uint8_t ip_addr[] = {127,0,0,1};
dflet 0:547251f42a60 144 int32_t rv = 0;
dflet 0:547251f42a60 145
dflet 0:547251f42a60 146 if((-1 != loopb_net) && (false == pending_trigs)) {
dflet 0:547251f42a60 147 rv = net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN,
dflet 0:547251f42a60 148 loopback_port, ip_addr, 4);
dflet 0:547251f42a60 149 if(0 == rv)
dflet 0:547251f42a60 150 pending_trigs = true;
dflet 0:547251f42a60 151 }
dflet 0:547251f42a60 152
dflet 0:547251f42a60 153 return rv;
dflet 0:547251f42a60 154 }
dflet 0:547251f42a60 155
dflet 0:547251f42a60 156 static void do_net_close_rx(struct client_ctx *cl_ctx, bool due2err)
dflet 0:547251f42a60 157 {
dflet 0:547251f42a60 158 DBG_INFO("S: RX closing Net %d ...\n\r", (int32_t)cl_ctx->net);
dflet 0:547251f42a60 159
dflet 0:547251f42a60 160 net_ops->close(cl_ctx->net);
dflet 0:547251f42a60 161 cl_ctx->net = -1;
dflet 0:547251f42a60 162
dflet 0:547251f42a60 163 if(cl_ctx->usr)
dflet 0:547251f42a60 164 usr_cbs->on_cl_net_close(cl_ctx->usr, due2err);
dflet 0:547251f42a60 165
dflet 0:547251f42a60 166 used_ctxs_remove(cl_ctx);
dflet 0:547251f42a60 167 cl_ctx_free(cl_ctx);
dflet 0:547251f42a60 168 }
dflet 0:547251f42a60 169
dflet 0:547251f42a60 170 static void do_net_close_tx(struct client_ctx *cl_ctx, bool due2err)
dflet 0:547251f42a60 171 {
dflet 0:547251f42a60 172 if(due2err)
dflet 0:547251f42a60 173 cl_ctx->flags |= NW_CONN_ERROR_FLAG;
dflet 0:547251f42a60 174
dflet 0:547251f42a60 175 cl_ctx->flags |= NETWORK_CLOSE_FLAG;
dflet 0:547251f42a60 176
dflet 0:547251f42a60 177 loopb_trigger();
dflet 0:547251f42a60 178 }
dflet 0:547251f42a60 179
dflet 0:547251f42a60 180 static int32_t cl_ctx_send(struct client_ctx *cl_ctx, uint8_t *buf, uint32_t len)
dflet 0:547251f42a60 181 {
dflet 0:547251f42a60 182 int32_t rv = net_ops->send(cl_ctx->net, buf, len, NULL);
dflet 0:547251f42a60 183 if(rv <= 0) {
dflet 0:547251f42a60 184 do_net_close_tx(cl_ctx, true);
dflet 0:547251f42a60 185 rv = MQP_ERR_NETWORK;
dflet 0:547251f42a60 186 }
dflet 0:547251f42a60 187
dflet 0:547251f42a60 188 USR_INFO("S: FH-B1 0x%02x, len %u to net %d: %s\n\r",
dflet 0:547251f42a60 189 *buf, len, cl_ctx->net, rv? "Sent" : "Fail");
dflet 0:547251f42a60 190 return rv;
dflet 0:547251f42a60 191 }
dflet 0:547251f42a60 192
dflet 0:547251f42a60 193 static
dflet 0:547251f42a60 194 int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type, enum mqtt_qos qos,
dflet 0:547251f42a60 195 bool has_vh, uint16_t vh_data)
dflet 0:547251f42a60 196 {
dflet 0:547251f42a60 197 uint8_t buf[4];
dflet 0:547251f42a60 198 uint32_t len = 2;
dflet 0:547251f42a60 199
dflet 0:547251f42a60 200 if(false == had_rcvd_conn_msg(cl_ctx))
dflet 0:547251f42a60 201 return MQP_ERR_NOTCONN;
dflet 0:547251f42a60 202
dflet 0:547251f42a60 203 buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false));
dflet 0:547251f42a60 204 buf[1] = has_vh ? 2 : 0;
dflet 0:547251f42a60 205
dflet 0:547251f42a60 206 if(has_vh)
dflet 0:547251f42a60 207 len += buf_wr_nbo_2B(buf + 2, vh_data);
dflet 0:547251f42a60 208
dflet 0:547251f42a60 209 return cl_ctx_send(cl_ctx, buf, len);
dflet 0:547251f42a60 210 }
dflet 0:547251f42a60 211
dflet 0:547251f42a60 212 static
dflet 0:547251f42a60 213 int32_t _mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh,
dflet 0:547251f42a60 214 uint16_t vh_data)
dflet 0:547251f42a60 215 {
dflet 0:547251f42a60 216 struct client_ctx *cl_ctx = (struct client_ctx*) ctx_cl;
dflet 0:547251f42a60 217
dflet 0:547251f42a60 218 return cl_ctx? vh_msg_send((client_ctx*)ctx_cl, msg_type, qos,
dflet 0:547251f42a60 219 has_vh, vh_data) : -1;
dflet 0:547251f42a60 220 }
dflet 0:547251f42a60 221
dflet 0:547251f42a60 222 int32_t mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh,
dflet 0:547251f42a60 223 uint16_t vh_data)
dflet 0:547251f42a60 224 {
dflet 0:547251f42a60 225 return _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data);
dflet 0:547251f42a60 226 }
dflet 0:547251f42a60 227
dflet 0:547251f42a60 228 int32_t mqtt_vh_msg_send_locked(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos,
dflet 0:547251f42a60 229 bool has_vh, uint16_t vh_data)
dflet 0:547251f42a60 230 {
dflet 0:547251f42a60 231 int32_t rv;
dflet 0:547251f42a60 232
dflet 0:547251f42a60 233 MUTEX_LOCKIN();
dflet 0:547251f42a60 234 rv = _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data);
dflet 0:547251f42a60 235 MUTEX_UNLOCK();
dflet 0:547251f42a60 236
dflet 0:547251f42a60 237 return rv;
dflet 0:547251f42a60 238 }
dflet 0:547251f42a60 239
dflet 0:547251f42a60 240 int32_t mqtt_connack_send(void *ctx_cl, uint8_t *vh_buf)
dflet 0:547251f42a60 241 {
dflet 0:547251f42a60 242 struct client_ctx *cl_ctx = (struct client_ctx *) ctx_cl;
dflet 0:547251f42a60 243
dflet 0:547251f42a60 244 int32_t rv = vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0,
dflet 0:547251f42a60 245 true, (vh_buf[0] << 8) | vh_buf[1]);
dflet 0:547251f42a60 246
dflet 0:547251f42a60 247 if((rv > 0) && (0x00 != vh_buf[1]))
dflet 0:547251f42a60 248 do_net_close_tx(cl_ctx, true);
dflet 0:547251f42a60 249
dflet 0:547251f42a60 250 return rv;
dflet 0:547251f42a60 251 }
dflet 0:547251f42a60 252
dflet 0:547251f42a60 253 static
dflet 0:547251f42a60 254 int32_t _mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
dflet 0:547251f42a60 255 {
dflet 0:547251f42a60 256 int32_t rv = 0;
dflet 0:547251f42a60 257 uint8_t *buf = MQP_FHEADER_BUF(mqp);
dflet 0:547251f42a60 258
dflet 0:547251f42a60 259 if(dup)
dflet 0:547251f42a60 260 *buf |= DUP_FLAG_VAL(true);
dflet 0:547251f42a60 261
dflet 0:547251f42a60 262 rv = cl_ctx_send((struct client_ctx*)ctx_cl, buf, MQP_CONTENT_LEN(mqp));
dflet 0:547251f42a60 263
dflet 0:547251f42a60 264 *buf &= ~DUP_FLAG_VAL(true);
dflet 0:547251f42a60 265
dflet 0:547251f42a60 266 return rv;
dflet 0:547251f42a60 267 }
dflet 0:547251f42a60 268
dflet 0:547251f42a60 269 int32_t mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
dflet 0:547251f42a60 270 {
dflet 0:547251f42a60 271 return _mqtt_server_pub_dispatch(ctx_cl, mqp, dup);
dflet 0:547251f42a60 272 }
dflet 0:547251f42a60 273
dflet 0:547251f42a60 274 int32_t
dflet 0:547251f42a60 275 mqtt_server_pub_dispatch_locked(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
dflet 0:547251f42a60 276 {
dflet 0:547251f42a60 277 int32_t rv;
dflet 0:547251f42a60 278
dflet 0:547251f42a60 279 MUTEX_LOCKIN();
dflet 0:547251f42a60 280 rv = _mqtt_server_pub_dispatch(ctx_cl, mqp, dup);
dflet 0:547251f42a60 281 MUTEX_UNLOCK();
dflet 0:547251f42a60 282
dflet 0:547251f42a60 283 return rv;
dflet 0:547251f42a60 284 }
dflet 0:547251f42a60 285
dflet 0:547251f42a60 286 #define MQP_MAX_TOPICS 16
dflet 0:547251f42a60 287 #define MQP_SUBACK_PAY_OFS (MAX_FH_LEN + 2)
dflet 0:547251f42a60 288
dflet 0:547251f42a60 289 static int32_t sub_ack_send(struct client_ctx *cl_ctx, uint8_t *buf, uint8_t pay_ofs,
dflet 0:547251f42a60 290 uint32_t pay_len, uint16_t msg_id)
dflet 0:547251f42a60 291 {
dflet 0:547251f42a60 292 uint8_t *ref = buf += MAX_FH_LEN;
dflet 0:547251f42a60 293
dflet 0:547251f42a60 294 if(MQP_SUBACK_PAY_OFS != pay_ofs)
dflet 0:547251f42a60 295 return MQP_ERR_PKT_LEN;
dflet 0:547251f42a60 296
dflet 0:547251f42a60 297 buf += buf_wr_nbo_2B(buf, msg_id);
dflet 0:547251f42a60 298 ref -= mqp_buf_tail_wr_remlen(ref - MAX_REMLEN_BYTES,
dflet 0:547251f42a60 299 pay_len + buf - ref);
dflet 0:547251f42a60 300
dflet 0:547251f42a60 301 ref -= 1;
dflet 0:547251f42a60 302 *ref = MAKE_FH_BYTE1(MQTT_SUBACK,
dflet 0:547251f42a60 303 MAKE_FH_FLAGS(false, MQTT_QOS0, false));
dflet 0:547251f42a60 304
dflet 0:547251f42a60 305 return cl_ctx_send(cl_ctx, ref, pay_len + buf - ref);
dflet 0:547251f42a60 306 }
dflet 0:547251f42a60 307
dflet 0:547251f42a60 308 static inline int32_t unsub_ack_send(struct client_ctx *cl_ctx, uint16_t msg_id)
dflet 0:547251f42a60 309 {
dflet 0:547251f42a60 310 return vh_msg_send(cl_ctx, MQTT_UNSUBACK, MQTT_QOS0, true, msg_id);
dflet 0:547251f42a60 311 }
dflet 0:547251f42a60 312
dflet 0:547251f42a60 313 /*----------------------------------------------------------------------------
dflet 0:547251f42a60 314 * Receive Routines
dflet 0:547251f42a60 315 *----------------------------------------------------------------------------
dflet 0:547251f42a60 316 */
dflet 0:547251f42a60 317
dflet 0:547251f42a60 318 /* Candidate to be moved to mqtt_common.c file */
dflet 0:547251f42a60 319 static bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw)
dflet 0:547251f42a60 320 {
dflet 0:547251f42a60 321 uint8_t *buf = MQP_VHEADER_BUF(mqp_raw);
dflet 0:547251f42a60 322
dflet 0:547251f42a60 323 if(mqp_raw->pl_len < 2)
dflet 0:547251f42a60 324 return false; /* Bytes for MSG ID not available */
dflet 0:547251f42a60 325
dflet 0:547251f42a60 326 buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id);
dflet 0:547251f42a60 327 mqp_raw->vh_len += 2;
dflet 0:547251f42a60 328 mqp_raw->pl_len -= 2;
dflet 0:547251f42a60 329
dflet 0:547251f42a60 330 return true;
dflet 0:547251f42a60 331 }
dflet 0:547251f42a60 332
dflet 0:547251f42a60 333 #define BRK_IF_RD_ERR_UTF8(buf, end, utf8) \
dflet 0:547251f42a60 334 if(rd_buf_utf8(buf, end, utf8) < 0) \
dflet 0:547251f42a60 335 break;
dflet 0:547251f42a60 336
dflet 0:547251f42a60 337 static int32_t buf_rd_utf8_qos(uint8_t *buf, uint8_t *end, struct utf8_strqos *utf8_qos)
dflet 0:547251f42a60 338 {
dflet 0:547251f42a60 339 struct utf8_string utf8;
dflet 0:547251f42a60 340 uint8_t *ref = buf;
dflet 0:547251f42a60 341
dflet 0:547251f42a60 342 buf += mqp_buf_rd_utf8(buf, end, &utf8);
dflet 0:547251f42a60 343
dflet 0:547251f42a60 344 /* Assess that UTF8 has been read and QOS can be read */
dflet 0:547251f42a60 345 if((buf > ref) && (end > buf)) {
dflet 0:547251f42a60 346 utf8_qos->buffer = utf8.buffer;
dflet 0:547251f42a60 347 utf8_qos->length = utf8.length;
dflet 0:547251f42a60 348 utf8_qos->qosreq = (enum mqtt_qos)*buf++;
dflet 0:547251f42a60 349
dflet 0:547251f42a60 350 return buf - ref;
dflet 0:547251f42a60 351 }
dflet 0:547251f42a60 352
dflet 0:547251f42a60 353 return -1;
dflet 0:547251f42a60 354 }
dflet 0:547251f42a60 355
dflet 0:547251f42a60 356 static bool _proc_sub_msg_rx(struct mqtt_packet *mqp_raw,
dflet 0:547251f42a60 357 struct utf8_strqos *qos_topics, uint32_t *n_topics)
dflet 0:547251f42a60 358 {
dflet 0:547251f42a60 359 uint8_t *buf, *end;
dflet 0:547251f42a60 360 uint32_t i = 0;
dflet 0:547251f42a60 361
dflet 0:547251f42a60 362 if(false == mqp_proc_vh_msg_id_rx(mqp_raw))
dflet 0:547251f42a60 363 return false; /* Problem in contents received from client */
dflet 0:547251f42a60 364
dflet 0:547251f42a60 365 buf = MQP_PAYLOAD_BUF(mqp_raw);
dflet 0:547251f42a60 366 end = buf + mqp_raw->pl_len;
dflet 0:547251f42a60 367
dflet 0:547251f42a60 368 for(i = 0; (i < *n_topics) && (buf < end); i++) {
dflet 0:547251f42a60 369 struct utf8_strqos *qos_top = qos_topics + i;
dflet 0:547251f42a60 370 int32_t len = buf_rd_utf8_qos(buf, end, qos_top);
dflet 0:547251f42a60 371 if(len < 0)
dflet 0:547251f42a60 372 break; /* Failed to read Topic */
dflet 0:547251f42a60 373
dflet 0:547251f42a60 374 buf += len;
dflet 0:547251f42a60 375 }
dflet 0:547251f42a60 376
dflet 0:547251f42a60 377 *n_topics = i;
dflet 0:547251f42a60 378
dflet 0:547251f42a60 379 return ((0 == i) || (buf != end))? false : true;
dflet 0:547251f42a60 380 }
dflet 0:547251f42a60 381
dflet 0:547251f42a60 382 static
dflet 0:547251f42a60 383 bool proc_sub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:547251f42a60 384 {
dflet 0:547251f42a60 385 uint32_t n_topics = MQP_MAX_TOPICS;
dflet 0:547251f42a60 386 uint16_t msg_id;
dflet 0:547251f42a60 387
dflet 0:547251f42a60 388 struct utf8_strqos qos_topics[MQP_MAX_TOPICS];
dflet 0:547251f42a60 389 uint8_t ack[MQP_MAX_TOPICS + MQP_SUBACK_PAY_OFS];
dflet 0:547251f42a60 390
dflet 0:547251f42a60 391 if(false == _proc_sub_msg_rx(mqp_raw, qos_topics, &n_topics))
dflet 0:547251f42a60 392 return false;
dflet 0:547251f42a60 393
dflet 0:547251f42a60 394 msg_id = mqp_raw->msg_id;
dflet 0:547251f42a60 395
dflet 0:547251f42a60 396 /* All topics have been now put in array, pass-on info to upper layer */
dflet 0:547251f42a60 397 if(usr_cbs->sub_msg_rx(cl_ctx->usr, qos_topics, n_topics,
dflet 0:547251f42a60 398 msg_id, ack + MQP_SUBACK_PAY_OFS)) {
dflet 0:547251f42a60 399
dflet 0:547251f42a60 400 sub_ack_send(cl_ctx, ack, MQP_SUBACK_PAY_OFS, n_topics, msg_id);
dflet 0:547251f42a60 401
dflet 0:547251f42a60 402 return true;
dflet 0:547251f42a60 403 }
dflet 0:547251f42a60 404
dflet 0:547251f42a60 405 return false;
dflet 0:547251f42a60 406 }
dflet 0:547251f42a60 407
dflet 0:547251f42a60 408
dflet 0:547251f42a60 409 static bool _proc_unsub_msg_rx(struct mqtt_packet *mqp_raw,
dflet 0:547251f42a60 410 struct utf8_string *topics, uint32_t *n_topics)
dflet 0:547251f42a60 411 {
dflet 0:547251f42a60 412 uint8_t *buf, *end;
dflet 0:547251f42a60 413 uint32_t i = 0;
dflet 0:547251f42a60 414
dflet 0:547251f42a60 415 if(false == mqp_proc_vh_msg_id_rx(mqp_raw))
dflet 0:547251f42a60 416 return false; /* Problem in contents received from client */
dflet 0:547251f42a60 417
dflet 0:547251f42a60 418 buf = MQP_PAYLOAD_BUF(mqp_raw);
dflet 0:547251f42a60 419 end = buf + mqp_raw->pl_len;
dflet 0:547251f42a60 420
dflet 0:547251f42a60 421 for(i = 0; (i < *n_topics) && (buf < end); i++) {
dflet 0:547251f42a60 422 struct utf8_string *topic = topics + i;
dflet 0:547251f42a60 423 int32_t len = mqp_buf_rd_utf8(buf, end, topic);
dflet 0:547251f42a60 424 if(len < 0)
dflet 0:547251f42a60 425 break; /* Failed to read Topic */
dflet 0:547251f42a60 426
dflet 0:547251f42a60 427 buf += len;
dflet 0:547251f42a60 428 }
dflet 0:547251f42a60 429
dflet 0:547251f42a60 430 *n_topics = i;
dflet 0:547251f42a60 431
dflet 0:547251f42a60 432 return ((0 == i) || (buf != end))? false : true;
dflet 0:547251f42a60 433 }
dflet 0:547251f42a60 434
dflet 0:547251f42a60 435 static
dflet 0:547251f42a60 436 bool proc_unsub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:547251f42a60 437 {
dflet 0:547251f42a60 438 uint32_t n_topics = MQP_MAX_TOPICS;
dflet 0:547251f42a60 439 uint16_t msg_id;
dflet 0:547251f42a60 440
dflet 0:547251f42a60 441 struct utf8_string topics[MQP_MAX_TOPICS];
dflet 0:547251f42a60 442
dflet 0:547251f42a60 443 if(false == _proc_unsub_msg_rx(mqp_raw, topics, &n_topics))
dflet 0:547251f42a60 444 return false;
dflet 0:547251f42a60 445
dflet 0:547251f42a60 446 msg_id = mqp_raw->msg_id;
dflet 0:547251f42a60 447
dflet 0:547251f42a60 448 /* All topics have been now put in array, pass-on info to upper layer */
dflet 0:547251f42a60 449 if(usr_cbs->un_sub_msg(cl_ctx->usr, topics, n_topics, msg_id)) {
dflet 0:547251f42a60 450 unsub_ack_send(cl_ctx, msg_id);
dflet 0:547251f42a60 451 return true;
dflet 0:547251f42a60 452 }
dflet 0:547251f42a60 453
dflet 0:547251f42a60 454 return false;
dflet 0:547251f42a60 455 }
dflet 0:547251f42a60 456
dflet 0:547251f42a60 457 static bool proc_pingreq_rx(struct client_ctx *cl_ctx)
dflet 0:547251f42a60 458 {
dflet 0:547251f42a60 459 vh_msg_send(cl_ctx, MQTT_PINGRSP, MQTT_QOS0, false, 0x00);
dflet 0:547251f42a60 460 return true;
dflet 0:547251f42a60 461 }
dflet 0:547251f42a60 462
dflet 0:547251f42a60 463 static bool proc_disconn_rx(struct client_ctx *cl_ctx)
dflet 0:547251f42a60 464 {
dflet 0:547251f42a60 465 do_net_close_rx(cl_ctx, false);
dflet 0:547251f42a60 466 return true;
dflet 0:547251f42a60 467 }
dflet 0:547251f42a60 468
dflet 0:547251f42a60 469 static
dflet 0:547251f42a60 470 bool proc_pub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:547251f42a60 471 {
dflet 0:547251f42a60 472 bool rv = mqp_proc_pub_rx(mqp_raw);
dflet 0:547251f42a60 473 uint8_t B = mqp_raw->fh_byte1;
dflet 0:547251f42a60 474 enum mqtt_qos qos = ENUM_QOS(B);
dflet 0:547251f42a60 475 struct utf8_string topic;
dflet 0:547251f42a60 476 uint16_t msg_id = 0;
dflet 0:547251f42a60 477
dflet 0:547251f42a60 478 if(false == rv)
dflet 0:547251f42a60 479 return rv; /* Didn't get a good PUB Packet */
dflet 0:547251f42a60 480
dflet 0:547251f42a60 481 msg_id = mqp_raw->msg_id;
dflet 0:547251f42a60 482
dflet 0:547251f42a60 483 topic.buffer = (char*)MQP_PUB_TOP_BUF(mqp_raw);
dflet 0:547251f42a60 484 topic.length = MQP_PUB_TOP_LEN(mqp_raw);
dflet 0:547251f42a60 485
dflet 0:547251f42a60 486 rv = usr_cbs->pub_msg_rx(cl_ctx->usr, &topic,
dflet 0:547251f42a60 487 MQP_PUB_PAY_BUF(mqp_raw),
dflet 0:547251f42a60 488 MQP_PUB_PAY_LEN(mqp_raw),
dflet 0:547251f42a60 489 msg_id, BOOL_DUP(B), qos,
dflet 0:547251f42a60 490 BOOL_RETAIN(B));
dflet 0:547251f42a60 491 if(false == rv)
dflet 0:547251f42a60 492 return rv;
dflet 0:547251f42a60 493
dflet 0:547251f42a60 494 if(MQTT_QOS1 == qos)
dflet 0:547251f42a60 495 vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id);
dflet 0:547251f42a60 496
dflet 0:547251f42a60 497 if(MQTT_QOS2 == qos)
dflet 0:547251f42a60 498 vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id);
dflet 0:547251f42a60 499
dflet 0:547251f42a60 500 return rv;
dflet 0:547251f42a60 501 }
dflet 0:547251f42a60 502
dflet 0:547251f42a60 503 static
dflet 0:547251f42a60 504 bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:547251f42a60 505 {
dflet 0:547251f42a60 506 if((false == mqp_proc_msg_id_ack_rx(mqp_raw, false)) ||
dflet 0:547251f42a60 507 (false == usr_cbs->ack_notify(cl_ctx->usr,
dflet 0:547251f42a60 508 mqp_raw->msg_type,
dflet 0:547251f42a60 509 mqp_raw->msg_id)))
dflet 0:547251f42a60 510 return false;
dflet 0:547251f42a60 511
dflet 0:547251f42a60 512 return true;
dflet 0:547251f42a60 513 }
dflet 0:547251f42a60 514
dflet 0:547251f42a60 515 #define IO_MON_NO_TIMEOUT (0xFFFFFFFF) // TBD
dflet 0:547251f42a60 516 //#define KA_TIMEOUT_NONE (0xFFFFFFFF)
dflet 0:547251f42a60 517
dflet 0:547251f42a60 518 static void rx_timeout_update(struct client_ctx *cl_ctx)
dflet 0:547251f42a60 519 {
dflet 0:547251f42a60 520 if(false == had_rcvd_conn_msg(cl_ctx))
dflet 0:547251f42a60 521 return;
dflet 0:547251f42a60 522
dflet 0:547251f42a60 523 cl_ctx_timeout_update(cl_ctx, net_ops->time());
dflet 0:547251f42a60 524
dflet 0:547251f42a60 525 used_ctxs_remove(cl_ctx);
dflet 0:547251f42a60 526 used_ctxs_insert(cl_ctx);
dflet 0:547251f42a60 527
dflet 0:547251f42a60 528 return;
dflet 0:547251f42a60 529 }
dflet 0:547251f42a60 530
dflet 0:547251f42a60 531 static bool proc_protocol_info(struct utf8_string *utf8, uint8_t ver)
dflet 0:547251f42a60 532 {
dflet 0:547251f42a60 533 const char *buf = utf8->buffer;
dflet 0:547251f42a60 534
dflet 0:547251f42a60 535 /* Check for protocol version 3.1.1 */
dflet 0:547251f42a60 536 if((4 == utf8->length) &&
dflet 0:547251f42a60 537 (buf[0] == 'M') &&
dflet 0:547251f42a60 538 (buf[1] == 'Q') &&
dflet 0:547251f42a60 539 (buf[2] == 'T') &&
dflet 0:547251f42a60 540 (buf[3] == 'T') &&
dflet 0:547251f42a60 541 (0x04 == ver))
dflet 0:547251f42a60 542 return true;
dflet 0:547251f42a60 543
dflet 0:547251f42a60 544 /* Check for protocol version 3.1 */
dflet 0:547251f42a60 545 if((6 == utf8->length) &&
dflet 0:547251f42a60 546 (buf[0] == 'M') &&
dflet 0:547251f42a60 547 (buf[1] == 'Q') &&
dflet 0:547251f42a60 548 (buf[2] == 'I') &&
dflet 0:547251f42a60 549 (buf[3] == 's') &&
dflet 0:547251f42a60 550 (buf[4] == 'd') &&
dflet 0:547251f42a60 551 (buf[5] == 'p') &&
dflet 0:547251f42a60 552 (0x03 == ver))
dflet 0:547251f42a60 553 return true;
dflet 0:547251f42a60 554
dflet 0:547251f42a60 555 return false;
dflet 0:547251f42a60 556 }
dflet 0:547251f42a60 557
dflet 0:547251f42a60 558 static uint16_t proc_connect_vh_rx(struct mqtt_packet *mqp_raw,
dflet 0:547251f42a60 559 uint8_t *conn_flags, uint16_t *ka_secs)
dflet 0:547251f42a60 560 {
dflet 0:547251f42a60 561 struct utf8_string utf8;
dflet 0:547251f42a60 562 uint8_t *buf = MQP_PAYLOAD_BUF(mqp_raw);
dflet 0:547251f42a60 563 uint8_t *end = buf + mqp_raw->pl_len;
dflet 0:547251f42a60 564 uint8_t *ref = buf;
dflet 0:547251f42a60 565
dflet 0:547251f42a60 566 buf += mqp_buf_rd_utf8(buf, end, &utf8);
dflet 0:547251f42a60 567 if(end - buf < 1)
dflet 0:547251f42a60 568 return CONNACK_RC_BAD_PROTOV; /* No proto ver */
dflet 0:547251f42a60 569
dflet 0:547251f42a60 570 if(false == proc_protocol_info(&utf8, *buf++))
dflet 0:547251f42a60 571 return CONNACK_RC_BAD_PROTOV;
dflet 0:547251f42a60 572
dflet 0:547251f42a60 573 *conn_flags = *buf++;
dflet 0:547251f42a60 574
dflet 0:547251f42a60 575 if(end - buf < 2)
dflet 0:547251f42a60 576 return 0xFF; /* Bad packet composition */
dflet 0:547251f42a60 577
dflet 0:547251f42a60 578 *ka_secs = (buf[0] << 8) | buf[1];
dflet 0:547251f42a60 579 buf += 2;
dflet 0:547251f42a60 580 *ka_secs += *ka_secs >> 1;
dflet 0:547251f42a60 581
dflet 0:547251f42a60 582 mqp_raw->vh_len = buf - ref;
dflet 0:547251f42a60 583 mqp_raw->pl_len -= buf - ref;
dflet 0:547251f42a60 584
dflet 0:547251f42a60 585 return 0;
dflet 0:547251f42a60 586 }
dflet 0:547251f42a60 587
dflet 0:547251f42a60 588 #define RET_IF_RD_CONN_ERROR(buf, end, utf8) \
dflet 0:547251f42a60 589 { \
dflet 0:547251f42a60 590 int32_t len = mqp_buf_rd_utf8(buf, end, utf8); \
dflet 0:547251f42a60 591 if(len < 0) \
dflet 0:547251f42a60 592 return 0x00FF; \
dflet 0:547251f42a60 593 \
dflet 0:547251f42a60 594 buf += len; \
dflet 0:547251f42a60 595 }
dflet 0:547251f42a60 596
dflet 0:547251f42a60 597 uint16_t proc_connect_pl_rx(const uint8_t *buf, const uint8_t *end, uint8_t conn_flags,
dflet 0:547251f42a60 598 struct utf8_string *free_utf8s,
dflet 0:547251f42a60 599 struct utf8_string **used_refs)
dflet 0:547251f42a60 600 {
dflet 0:547251f42a60 601 struct utf8_string *utf8;
dflet 0:547251f42a60 602
dflet 0:547251f42a60 603 utf8 = used_refs[0] = free_utf8s + 0;
dflet 0:547251f42a60 604 RET_IF_RD_CONN_ERROR(buf, end, utf8);
dflet 0:547251f42a60 605
dflet 0:547251f42a60 606 if(conn_flags & WILL_CONFIG_VAL) {
dflet 0:547251f42a60 607 utf8 = used_refs[1] = free_utf8s + 1;
dflet 0:547251f42a60 608 RET_IF_RD_CONN_ERROR(buf, end, utf8);
dflet 0:547251f42a60 609
dflet 0:547251f42a60 610 utf8 = used_refs[2] = free_utf8s + 2;
dflet 0:547251f42a60 611 RET_IF_RD_CONN_ERROR(buf, end, utf8);
dflet 0:547251f42a60 612 }
dflet 0:547251f42a60 613
dflet 0:547251f42a60 614 if((0 == (conn_flags & USER_NAME_OPVAL)) &&
dflet 0:547251f42a60 615 (0 != (conn_flags & PASS_WORD_OPVAL)))
dflet 0:547251f42a60 616 return 0x00FF; /* Bad combination */
dflet 0:547251f42a60 617
dflet 0:547251f42a60 618 if(conn_flags & USER_NAME_OPVAL) {
dflet 0:547251f42a60 619 utf8 = used_refs[3] = free_utf8s + 3;
dflet 0:547251f42a60 620 RET_IF_RD_CONN_ERROR(buf, end, utf8);
dflet 0:547251f42a60 621 }
dflet 0:547251f42a60 622
dflet 0:547251f42a60 623 if(conn_flags & PASS_WORD_OPVAL) {
dflet 0:547251f42a60 624 utf8 = used_refs[4] = free_utf8s + 4;
dflet 0:547251f42a60 625 RET_IF_RD_CONN_ERROR(buf, end, utf8);
dflet 0:547251f42a60 626 }
dflet 0:547251f42a60 627
dflet 0:547251f42a60 628 return 0;
dflet 0:547251f42a60 629 }
dflet 0:547251f42a60 630
dflet 0:547251f42a60 631 static
dflet 0:547251f42a60 632 bool proc_connect_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:547251f42a60 633 {
dflet 0:547251f42a60 634 struct utf8_string *used_refs[5] = {NULL, NULL, NULL, NULL, NULL};
dflet 0:547251f42a60 635 struct utf8_string free_utf8s[5];
dflet 0:547251f42a60 636 uint8_t conn_flags, *buf, *end;
dflet 0:547251f42a60 637 bool clean_session = false;
dflet 0:547251f42a60 638 uint16_t ack_vh16 = 0; /* Variable Header of CONNACK (response) Message */
dflet 0:547251f42a60 639
dflet 0:547251f42a60 640 set_rcvd_conn_msg(cl_ctx);
dflet 0:547251f42a60 641
dflet 0:547251f42a60 642 ack_vh16 = proc_connect_vh_rx(mqp_raw, &conn_flags, &cl_ctx->ka_secs);
dflet 0:547251f42a60 643 if(ack_vh16)
dflet 0:547251f42a60 644 goto proc_connect_rx_exit1;
dflet 0:547251f42a60 645
dflet 0:547251f42a60 646 buf = MQP_PAYLOAD_BUF(mqp_raw);
dflet 0:547251f42a60 647 end = buf + mqp_raw->pl_len;
dflet 0:547251f42a60 648 ack_vh16 = proc_connect_pl_rx(buf, end, conn_flags,
dflet 0:547251f42a60 649 free_utf8s, used_refs);
dflet 0:547251f42a60 650 if(ack_vh16)
dflet 0:547251f42a60 651 goto proc_connect_rx_exit1;
dflet 0:547251f42a60 652
dflet 0:547251f42a60 653 clean_session = (conn_flags & CLEAN_START_VAL)? true : false;
dflet 0:547251f42a60 654 ack_vh16 = (!used_refs[0]->length && !clean_session)?
dflet 0:547251f42a60 655 CONNACK_RC_CLI_REJECT : 0; /* Validate 0 byte Client ID */
dflet 0:547251f42a60 656 if(ack_vh16)
dflet 0:547251f42a60 657 goto proc_connect_rx_exit1;
dflet 0:547251f42a60 658
dflet 0:547251f42a60 659 /* UTF8 Order: Client ID, Will Topic, Will Msg, User Name, Pass Word */
dflet 0:547251f42a60 660 ack_vh16 = usr_cbs->connect_rx(cl_ctx, conn_flags, &used_refs[0],
dflet 0:547251f42a60 661 &cl_ctx->usr);
dflet 0:547251f42a60 662 proc_connect_rx_exit1:
dflet 0:547251f42a60 663
dflet 0:547251f42a60 664 DBG_INFO("S: CONNACK RC (16bits) is %u (%s)\n\r", ack_vh16,
dflet 0:547251f42a60 665 ack_vh16 & 0xFF? "error" : "good");
dflet 0:547251f42a60 666
dflet 0:547251f42a60 667 if(0xFF != (ack_vh16 & 0xFF))
dflet 0:547251f42a60 668 vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0, true, ack_vh16);
dflet 0:547251f42a60 669
dflet 0:547251f42a60 670 if(CONNACK_RC_REQ_ACCEPT == (ack_vh16 & 0xFF)) {
dflet 0:547251f42a60 671 rx_timeout_update(cl_ctx);
dflet 0:547251f42a60 672 usr_cbs->on_connack_send(cl_ctx->usr, clean_session);
dflet 0:547251f42a60 673 } else {
dflet 0:547251f42a60 674 return false;
dflet 0:547251f42a60 675 }
dflet 0:547251f42a60 676
dflet 0:547251f42a60 677 return true;
dflet 0:547251f42a60 678 }
dflet 0:547251f42a60 679
dflet 0:547251f42a60 680
dflet 0:547251f42a60 681 static void recv_hvec_load(int32_t *recv_hvec, uint32_t size, struct client_ctx *list)
dflet 0:547251f42a60 682 {
dflet 0:547251f42a60 683 int32_t i = 0;
dflet 0:547251f42a60 684
dflet 0:547251f42a60 685 for(i = 0; (i < size) && (NULL != list); i++, list = list->next)
dflet 0:547251f42a60 686 recv_hvec[i] = list->net;
dflet 0:547251f42a60 687
dflet 0:547251f42a60 688 recv_hvec[i] = -1;
dflet 0:547251f42a60 689
dflet 0:547251f42a60 690 return;
dflet 0:547251f42a60 691 }
dflet 0:547251f42a60 692
dflet 0:547251f42a60 693 static bool process_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:547251f42a60 694 {
dflet 0:547251f42a60 695 uint8_t msg_type = mqp_raw->msg_type;
dflet 0:547251f42a60 696 bool rv = false;
dflet 0:547251f42a60 697
dflet 0:547251f42a60 698 USR_INFO("S: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d\n\r",
dflet 0:547251f42a60 699 mqp_raw->fh_byte1, cl_ctx->net);
dflet 0:547251f42a60 700
dflet 0:547251f42a60 701 if((MQTT_CONNECT != msg_type) ^ had_rcvd_conn_msg(cl_ctx))
dflet 0:547251f42a60 702 goto process_recv_exit1; /* Proto Violation */
dflet 0:547251f42a60 703
dflet 0:547251f42a60 704 rx_timeout_update(cl_ctx);
dflet 0:547251f42a60 705
dflet 0:547251f42a60 706 switch(msg_type) {
dflet 0:547251f42a60 707
dflet 0:547251f42a60 708 case MQTT_CONNECT:
dflet 0:547251f42a60 709 rv = proc_connect_rx(cl_ctx, mqp_raw);
dflet 0:547251f42a60 710 break;
dflet 0:547251f42a60 711
dflet 0:547251f42a60 712 case MQTT_DISCONNECT:
dflet 0:547251f42a60 713 rv = proc_disconn_rx(cl_ctx);
dflet 0:547251f42a60 714 break;
dflet 0:547251f42a60 715
dflet 0:547251f42a60 716 case MQTT_SUBSCRIBE:
dflet 0:547251f42a60 717 rv = proc_sub_msg_rx(cl_ctx, mqp_raw);
dflet 0:547251f42a60 718 break;
dflet 0:547251f42a60 719
dflet 0:547251f42a60 720 case MQTT_UNSUBSCRIBE:
dflet 0:547251f42a60 721 rv = proc_unsub_msg_rx(cl_ctx, mqp_raw);
dflet 0:547251f42a60 722 break;
dflet 0:547251f42a60 723
dflet 0:547251f42a60 724 case MQTT_PINGREQ:
dflet 0:547251f42a60 725 rv = proc_pingreq_rx(cl_ctx);
dflet 0:547251f42a60 726 break;
dflet 0:547251f42a60 727
dflet 0:547251f42a60 728 case MQTT_PUBLISH:
dflet 0:547251f42a60 729 rv = proc_pub_msg_rx(cl_ctx, mqp_raw);
dflet 0:547251f42a60 730 break;
dflet 0:547251f42a60 731
dflet 0:547251f42a60 732 case MQTT_PUBACK:
dflet 0:547251f42a60 733 case MQTT_PUBREC:
dflet 0:547251f42a60 734 case MQTT_PUBREL:
dflet 0:547251f42a60 735 case MQTT_PUBCOMP:
dflet 0:547251f42a60 736 rv = proc_ack_msg_rx(cl_ctx, mqp_raw);
dflet 0:547251f42a60 737 break;
dflet 0:547251f42a60 738
dflet 0:547251f42a60 739 default:
dflet 0:547251f42a60 740 break;
dflet 0:547251f42a60 741 }
dflet 0:547251f42a60 742
dflet 0:547251f42a60 743 process_recv_exit1:
dflet 0:547251f42a60 744 DBG_INFO("S: Processing of MSG ID 0x%02x: %s\n\r",
dflet 0:547251f42a60 745 mqp_raw->msg_id, rv? "Good" : "Fail");
dflet 0:547251f42a60 746
dflet 0:547251f42a60 747 return rv;
dflet 0:547251f42a60 748 }
dflet 0:547251f42a60 749
dflet 0:547251f42a60 750
dflet 0:547251f42a60 751 /* Terminate net connections which haven't received PKTs beyond expected time.
dflet 0:547251f42a60 752 Caller must ensure atomic enviroment for execution of this routine.
dflet 0:547251f42a60 753 */
dflet 0:547251f42a60 754 static void ka_sequence(uint32_t *secs2wait)
dflet 0:547251f42a60 755 {
dflet 0:547251f42a60 756 struct client_ctx *cl_ctx = used_ctxs; /* Sorted for timeout (ascend) */
dflet 0:547251f42a60 757 uint32_t now_secs = net_ops->time();
dflet 0:547251f42a60 758
dflet 0:547251f42a60 759 while(NULL != cl_ctx) {
dflet 0:547251f42a60 760 struct client_ctx *next = cl_ctx->next;
dflet 0:547251f42a60 761 if(NEED_NET_CLOSE(cl_ctx) || !(cl_ctx->timeout > now_secs)) {
dflet 0:547251f42a60 762 bool due2err = false;
dflet 0:547251f42a60 763 if(cl_ctx->flags & NW_CONN_ERROR_FLAG)
dflet 0:547251f42a60 764 due2err = true;
dflet 0:547251f42a60 765
dflet 0:547251f42a60 766 cl_ctx->flags &= ~(NW_CONN_ERROR_FLAG |
dflet 0:547251f42a60 767 NETWORK_CLOSE_FLAG);
dflet 0:547251f42a60 768
dflet 0:547251f42a60 769 /* Close network: Timeout or TX err */
dflet 0:547251f42a60 770 do_net_close_rx(cl_ctx, due2err);
dflet 0:547251f42a60 771 }
dflet 0:547251f42a60 772
dflet 0:547251f42a60 773 cl_ctx = next;
dflet 0:547251f42a60 774 }
dflet 0:547251f42a60 775
dflet 0:547251f42a60 776 cl_ctx = used_ctxs;
dflet 0:547251f42a60 777 if(((NULL != cl_ctx) && (KA_TIMEOUT_NONE == cl_ctx->timeout)) ||
dflet 0:547251f42a60 778 ((NULL == cl_ctx)))
dflet 0:547251f42a60 779 *secs2wait = IO_MON_NO_TIMEOUT;
dflet 0:547251f42a60 780 else
dflet 0:547251f42a60 781 *secs2wait = cl_ctx->timeout - now_secs;
dflet 0:547251f42a60 782
dflet 0:547251f42a60 783 return;
dflet 0:547251f42a60 784 }
dflet 0:547251f42a60 785
dflet 0:547251f42a60 786 /* Put a new functiona name such as mk_new_ctx() or setup_ctx() and
dflet 0:547251f42a60 787 processing to restrict limit number of connections.
dflet 0:547251f42a60 788
dflet 0:547251f42a60 789 Return value as well.
dflet 0:547251f42a60 790 */
dflet 0:547251f42a60 791 static bool accept_ctx(int32_t net, uint32_t wait_secs)
dflet 0:547251f42a60 792 {
dflet 0:547251f42a60 793 struct client_ctx *cl_ctx = cl_ctx_alloc();
dflet 0:547251f42a60 794 if(NULL == cl_ctx)
dflet 0:547251f42a60 795 return false;
dflet 0:547251f42a60 796
dflet 0:547251f42a60 797 cl_ctx->net = net_ops->accept(net, cl_ctx->remote_ip,
dflet 0:547251f42a60 798 &cl_ctx->ip_length);
dflet 0:547251f42a60 799 if(-1 == cl_ctx->net) {
dflet 0:547251f42a60 800 cl_ctx_free(cl_ctx);
dflet 0:547251f42a60 801 USR_INFO("S: failed to accept new NW connection\n\r");
dflet 0:547251f42a60 802 return false;
dflet 0:547251f42a60 803 }
dflet 0:547251f42a60 804
dflet 0:547251f42a60 805 DBG_INFO("Accepted new connection (fd) %d\n\r", (int32_t)cl_ctx->net);
dflet 0:547251f42a60 806
dflet 0:547251f42a60 807 /* Timeout to receive MQTT_CONNECT */
dflet 0:547251f42a60 808 cl_ctx->timeout = wait_secs + net_ops->time();
dflet 0:547251f42a60 809
dflet 0:547251f42a60 810 used_ctxs_insert(cl_ctx);
dflet 0:547251f42a60 811 return true;
dflet 0:547251f42a60 812 }
dflet 0:547251f42a60 813
dflet 0:547251f42a60 814 static struct client_ctx *net_cl_ctx_find(int32_t net)
dflet 0:547251f42a60 815 {
dflet 0:547251f42a60 816 struct client_ctx *cl_ctx = used_ctxs;
dflet 0:547251f42a60 817 while(cl_ctx) {
dflet 0:547251f42a60 818 if(net == cl_ctx->net)
dflet 0:547251f42a60 819 break;
dflet 0:547251f42a60 820
dflet 0:547251f42a60 821 cl_ctx = cl_ctx->next;
dflet 0:547251f42a60 822 }
dflet 0:547251f42a60 823
dflet 0:547251f42a60 824 if(NULL == cl_ctx)
dflet 0:547251f42a60 825 USR_INFO("Did not find ctx for net %d\n\r", net);
dflet 0:547251f42a60 826
dflet 0:547251f42a60 827 return cl_ctx;
dflet 0:547251f42a60 828 }
dflet 0:547251f42a60 829
dflet 0:547251f42a60 830 static int32_t recv_hvec[MAX_NWCONN + 1 + 1 + 1]; /* LISTEN + LOOPBACK + VEC END */
dflet 0:547251f42a60 831 static int32_t send_hvec = -1;
dflet 0:547251f42a60 832 static int32_t rsvd_hvec = -1;
dflet 0:547251f42a60 833 static int32_t listen_net = -1;
dflet 0:547251f42a60 834
dflet 0:547251f42a60 835 static struct mqtt_packet rx_mqp;
dflet 0:547251f42a60 836 static uint8_t rxb[MQP_SERVER_RX_LEN];
dflet 0:547251f42a60 837 static uint16_t listener_port = 0;
dflet 0:547251f42a60 838
dflet 0:547251f42a60 839 static inline
dflet 0:547251f42a60 840 int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out)
dflet 0:547251f42a60 841 {
dflet 0:547251f42a60 842 int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, timed_out, NULL);
dflet 0:547251f42a60 843 if(rv <= 0)
dflet 0:547251f42a60 844 rv = MQP_ERR_NETWORK;
dflet 0:547251f42a60 845
dflet 0:547251f42a60 846 return rv;
dflet 0:547251f42a60 847 }
dflet 0:547251f42a60 848
dflet 0:547251f42a60 849 static int32_t proc_loopback_recv(int32_t net)
dflet 0:547251f42a60 850 {
dflet 0:547251f42a60 851 uint8_t buf[LOOP_DLEN];
dflet 0:547251f42a60 852
dflet 0:547251f42a60 853 /* Thanks for waking-up thread and do nothing in this routine */
dflet 0:547251f42a60 854 int32_t rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0);
dflet 0:547251f42a60 855 pending_trigs = false;
dflet 0:547251f42a60 856
dflet 0:547251f42a60 857 if(rv <= 0) {
dflet 0:547251f42a60 858 net_ops->close(net);
dflet 0:547251f42a60 859 return MQP_ERR_LIBQUIT;
dflet 0:547251f42a60 860 }
dflet 0:547251f42a60 861
dflet 0:547251f42a60 862 return rv;
dflet 0:547251f42a60 863 }
dflet 0:547251f42a60 864
dflet 0:547251f42a60 865 static void proc_net_data_recv(int32_t net)
dflet 0:547251f42a60 866 {
dflet 0:547251f42a60 867 struct client_ctx *cl_ctx = net_cl_ctx_find(net);
dflet 0:547251f42a60 868 bool dummy;
dflet 0:547251f42a60 869 int32_t rv;
dflet 0:547251f42a60 870
dflet 0:547251f42a60 871 mqp_reset(&rx_mqp); /* Start w/ a clean buffer */
dflet 0:547251f42a60 872
dflet 0:547251f42a60 873 rv = net_recv(net, &rx_mqp, 0, &dummy);
dflet 0:547251f42a60 874 if(rv > 0)
dflet 0:547251f42a60 875 /* Working Principle: Only RX processing errors should be
dflet 0:547251f42a60 876 reported as 'false'. Status of TX as a follow-up to RX
dflet 0:547251f42a60 877 messages need not be reported by the xyz_rx() routines.
dflet 0:547251f42a60 878 Error observed in TX is either dealt in next iteration
dflet 0:547251f42a60 879 of RX loop.
dflet 0:547251f42a60 880 */
dflet 0:547251f42a60 881 if(false == process_recv(cl_ctx, &rx_mqp))
dflet 0:547251f42a60 882 rv = MQP_ERR_CONTENT;
dflet 0:547251f42a60 883
dflet 0:547251f42a60 884 if(rv < 0)
dflet 0:547251f42a60 885 do_net_close_rx(cl_ctx, rv);
dflet 0:547251f42a60 886 }
dflet 0:547251f42a60 887
dflet 0:547251f42a60 888 static bool accept_and_recv_locked(int32_t *recv_hnds, int32_t n_hnds, uint32_t wait_secs)
dflet 0:547251f42a60 889 {
dflet 0:547251f42a60 890 bool rv = true;
dflet 0:547251f42a60 891 int32_t idx = 0;
dflet 0:547251f42a60 892
dflet 0:547251f42a60 893 MUTEX_LOCKIN();
dflet 0:547251f42a60 894
dflet 0:547251f42a60 895 for(idx = 0; (idx < n_hnds) && (rv == true); idx++) {
dflet 0:547251f42a60 896 int32_t net = recv_hvec[idx];
dflet 0:547251f42a60 897 if(net == listen_net) {
dflet 0:547251f42a60 898 rv = accept_ctx(listen_net, wait_secs);
dflet 0:547251f42a60 899 } else if(loopback_port && (net == loopb_net)) {
dflet 0:547251f42a60 900 if(proc_loopback_recv(loopb_net) < 0)
dflet 0:547251f42a60 901 rv = false;
dflet 0:547251f42a60 902 } else {
dflet 0:547251f42a60 903 proc_net_data_recv(net);
dflet 0:547251f42a60 904 }
dflet 0:547251f42a60 905 }
dflet 0:547251f42a60 906
dflet 0:547251f42a60 907 MUTEX_UNLOCK();
dflet 0:547251f42a60 908
dflet 0:547251f42a60 909 return rv;
dflet 0:547251f42a60 910 }
dflet 0:547251f42a60 911
dflet 0:547251f42a60 912 int32_t mqtt_server_run(uint32_t wait_secs) // TBD break into two functions
dflet 0:547251f42a60 913 {
dflet 0:547251f42a60 914 uint32_t secs2wait = 0;
dflet 0:547251f42a60 915 int32_t n_hnds = 0;
dflet 0:547251f42a60 916
dflet 0:547251f42a60 917 USR_INFO("S: MQTT Server Run invoked ....\n\r");
dflet 0:547251f42a60 918
dflet 0:547251f42a60 919 if(NULL == net_ops)
dflet 0:547251f42a60 920 return MQP_ERR_NET_OPS;
dflet 0:547251f42a60 921
dflet 0:547251f42a60 922 if(loopback_port) {
dflet 0:547251f42a60 923 loopb_net = net_ops->open(DEV_NETCONN_OPT_UDP, NULL,
dflet 0:547251f42a60 924 loopback_port, NULL);
dflet 0:547251f42a60 925 if(-1 == loopb_net)
dflet 0:547251f42a60 926 return MQP_ERR_LIBQUIT;
dflet 0:547251f42a60 927 }
dflet 0:547251f42a60 928
dflet 0:547251f42a60 929 listen_net = net_ops->listen(0, listener_port, NULL);
dflet 0:547251f42a60 930 if(-1 == listen_net)
dflet 0:547251f42a60 931 return MQP_ERR_LIBQUIT;
dflet 0:547251f42a60 932
dflet 0:547251f42a60 933 do {
dflet 0:547251f42a60 934 int32_t *r_hvec = recv_hvec + 0;
dflet 0:547251f42a60 935
dflet 0:547251f42a60 936 *r_hvec++ = listen_net;
dflet 0:547251f42a60 937 if(loopback_port)
dflet 0:547251f42a60 938 *r_hvec++ = loopb_net;
dflet 0:547251f42a60 939
dflet 0:547251f42a60 940 /* MQTT Timeouts: close expired conns; get time to next expiry */
dflet 0:547251f42a60 941 ka_sequence(&secs2wait);
dflet 0:547251f42a60 942
dflet 0:547251f42a60 943 /* Prepare array of net handles. Must've atleast listen handle */
dflet 0:547251f42a60 944 // recv_hvec_load(&recv_hvec[2], MAX_NWCONN + 1, used_ctxs);
dflet 0:547251f42a60 945 recv_hvec_load(r_hvec, MAX_NWCONN + 1, used_ctxs);
dflet 0:547251f42a60 946
dflet 0:547251f42a60 947 n_hnds = net_ops->io_mon(recv_hvec, &send_hvec,
dflet 0:547251f42a60 948 &rsvd_hvec, secs2wait);
dflet 0:547251f42a60 949 if(n_hnds < 0)
dflet 0:547251f42a60 950 return MQP_ERR_LIBQUIT;
dflet 0:547251f42a60 951
dflet 0:547251f42a60 952 if(false == accept_and_recv_locked(recv_hvec, n_hnds, wait_secs))
dflet 0:547251f42a60 953 return MQP_ERR_LIBQUIT;
dflet 0:547251f42a60 954
dflet 0:547251f42a60 955 } while(1);
dflet 0:547251f42a60 956 }
dflet 0:547251f42a60 957
dflet 0:547251f42a60 958 int32_t mqtt_server_register_net_svc(const struct device_net_services *net)
dflet 0:547251f42a60 959 {
dflet 0:547251f42a60 960 if(net && net->io_mon && net->close && net->send &&
dflet 0:547251f42a60 961 net->recv && net->time && net->listen) {
dflet 0:547251f42a60 962 net_ops = net;
dflet 0:547251f42a60 963 return 0;
dflet 0:547251f42a60 964 }
dflet 0:547251f42a60 965
dflet 0:547251f42a60 966 return -1;
dflet 0:547251f42a60 967 }
dflet 0:547251f42a60 968
dflet 0:547251f42a60 969 int32_t mqtt_server_lib_init(const struct mqtt_server_lib_cfg *lib_cfg,
dflet 0:547251f42a60 970 const struct mqtt_server_msg_cbs *msg_cbs)
dflet 0:547251f42a60 971 {
dflet 0:547251f42a60 972 cl_ctx_init();
dflet 0:547251f42a60 973
dflet 0:547251f42a60 974 if((NULL == lib_cfg) ||
dflet 0:547251f42a60 975 (0 == lib_cfg->listener_port) ||
dflet 0:547251f42a60 976 (NULL == lib_cfg->debug_printf))
dflet 0:547251f42a60 977 return -1;
dflet 0:547251f42a60 978
dflet 0:547251f42a60 979 debug_printf = lib_cfg->debug_printf; /* Facilitate debug */
dflet 0:547251f42a60 980
dflet 0:547251f42a60 981 loopback_port = lib_cfg->loopback_port;
dflet 0:547251f42a60 982 listener_port = lib_cfg->listener_port;
dflet 0:547251f42a60 983
dflet 0:547251f42a60 984 mutex = lib_cfg->mutex;
dflet 0:547251f42a60 985 mutex_lockin = lib_cfg->mutex_lockin;
dflet 0:547251f42a60 986 mutex_unlock = lib_cfg->mutex_unlock;
dflet 0:547251f42a60 987
dflet 0:547251f42a60 988 aux_dbg_enbl = lib_cfg->aux_debug_en;
dflet 0:547251f42a60 989 debug_printf = lib_cfg->debug_printf;
dflet 0:547251f42a60 990
dflet 0:547251f42a60 991 usr_cbs = &usr_obj;
dflet 0:547251f42a60 992
dflet 0:547251f42a60 993 memcpy(usr_cbs, msg_cbs, sizeof(struct mqtt_server_msg_cbs));
dflet 0:547251f42a60 994
dflet 0:547251f42a60 995 mqp_buffer_attach(&rx_mqp, rxb, MQP_SERVER_RX_LEN, 0);
dflet 0:547251f42a60 996
dflet 0:547251f42a60 997 return 0;
dflet 0:547251f42a60 998 }
dflet 0:547251f42a60 999
dflet 0:547251f42a60 1000 }//namespace mbed_mqtt