Part of TI's mqtt

Dependents:   mqtt_V1 cc3100_Test_mqtt_CM3

Revision:
0:547251f42a60
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/server_pkts.cpp	Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,1000 @@
+/******************************************************************************
+*
+*   Copyright (C) 2014 Texas Instruments Incorporated
+*
+*   All rights reserved. Property of Texas Instruments Incorporated.
+*   Restricted rights to use, duplicate or disclose this code are
+*   granted through contract.
+*
+*   The program may not be used without the written permission of
+*   Texas Instruments Incorporated or against the terms and conditions
+*   stipulated in the agreement under which this program has been supplied,
+*   and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#include "server_pkts.h"
+
+namespace mbed_mqtt {
+
+/*-----------------------------------------------------------------------------
+ * Note: Do not create additional dependency of this file on any header other
+ * than server_pkts.h. Specifically, server_pkts.[hc] in conjunction with the
+ * mqtt_common.[hc] files must be facilitated to create a stand-alone library.
+ *-----------------------------------------------------------------------------
+ */
+
+static void  *mutex = NULL;
+static void (*mutex_lockin)(void*) = NULL;
+static void (*mutex_unlock)(void*) = NULL;
+
+#define MUTEX_LOCKIN() if(mutex_lockin) mutex_lockin(mutex);
+#define MUTEX_UNLOCK() if(mutex_unlock) mutex_unlock(mutex);
+
+static bool aux_dbg_enbl = false;
+static int32_t (*debug_printf)(const char *fmt, ...) = NULL;
+
+#define USR_INFO debug_printf
+#define DBG_INFO(I, ...) if(aux_dbg_enbl) debug_printf(I, ##__VA_ARGS__)
+
+static int32_t mqp_buf_rd_utf8(const uint8_t *buf, const uint8_t *end,
+                           struct utf8_string *utf8)
+{
+        const uint8_t *ref = buf; /* Reference */
+        uint16_t len = 0;         /* UTF8 Size */
+
+        if(end - buf < 2)
+                return -1; /* No valid buffer to hold UTF8 size */
+
+        buf += buf_rd_nbo_2B(buf, &len);
+        if(end - buf < len)
+                return -1; /* No valid buffer to hold UTF8 name */
+
+        utf8->length = len;
+        utf8->buffer = len? (char*)buf : NULL;
+
+        return buf + len - ref;
+}
+
+static struct mqtt_server_msg_cbs usr_obj, *usr_cbs = NULL;
+static const struct device_net_services *net_ops = NULL;
+
+#ifndef CFG_SR_MQTT_CTXS
+#define MAX_NWCONN 6
+#else
+#define MAX_NWCONN CFG_SR_MQTT_CTXS
+#endif
+
+static struct client_ctx contexts[MAX_NWCONN];
+
+static struct client_ctx *used_ctxs = NULL;
+static struct client_ctx *free_ctxs = NULL;
+
+
+#define NETWORK_CLOSE_FLAG         0x00200000
+#define NW_CONN_ERROR_FLAG         0x00400000
+#define RCVD_CONN_MSG_FLAG         0x00800000
+
+#define NEED_NET_CLOSE(cl_ctx) (cl_ctx->flags & NETWORK_CLOSE_FLAG)
+
+static void cl_ctx_init(void)
+{
+        int32_t i = 0;
+        for(i = 0; i < MAX_NWCONN; i++) {
+                struct client_ctx *cl_ctx = contexts + i;
+
+                cl_ctx_reset(cl_ctx);
+
+                cl_ctx->next = free_ctxs;
+                free_ctxs = cl_ctx;
+        }
+}
+
+static void cl_ctx_free(struct client_ctx *cl_ctx)
+{
+        cl_ctx_reset(cl_ctx);
+
+        cl_ctx->next = free_ctxs;
+        free_ctxs = cl_ctx;
+
+        return;
+}
+
+static struct client_ctx *cl_ctx_alloc(void)
+{
+        struct client_ctx *cl_ctx = free_ctxs;
+        if(cl_ctx) {
+                free_ctxs = cl_ctx->next;
+                cl_ctx->next = NULL;
+        } else
+                USR_INFO("S: fatal, no free cl_ctx\n\r");
+
+        return cl_ctx;
+}
+
+static inline bool had_rcvd_conn_msg(struct client_ctx *cl_ctx)
+{
+        return (cl_ctx->flags & RCVD_CONN_MSG_FLAG);
+}
+
+static inline void set_rcvd_conn_msg(struct client_ctx *cl_ctx)
+{
+        cl_ctx->flags |= RCVD_CONN_MSG_FLAG;
+}
+
+static void used_ctxs_insert(struct client_ctx *cl_ctx)
+{
+        cl_ctx_timeout_insert(&used_ctxs, cl_ctx);
+}
+
+static void used_ctxs_remove(struct client_ctx *cl_ctx)
+{
+        cl_ctx_remove(&used_ctxs, cl_ctx);
+}
+
+static int32_t loopb_net        = -1;
+static const uint8_t LOOP_DATA[] = {0x00, 0x01};
+#define LOOP_DLEN sizeof(LOOP_DATA)
+static uint16_t  loopback_port   = 0;
+static bool pending_trigs   = false;
+
+static int32_t loopb_trigger(void)
+{
+        uint8_t ip_addr[] = {127,0,0,1};
+        int32_t rv = 0;
+        
+        if((-1 != loopb_net) && (false == pending_trigs)) {
+                rv = net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN,
+                                        loopback_port, ip_addr, 4);
+                if(0 == rv)
+                        pending_trigs = true;
+        }
+
+        return rv;
+}
+
+static void do_net_close_rx(struct client_ctx *cl_ctx, bool due2err)
+{
+        DBG_INFO("S: RX closing Net %d ...\n\r", (int32_t)cl_ctx->net);
+
+        net_ops->close(cl_ctx->net);
+        cl_ctx->net = -1;
+
+        if(cl_ctx->usr)
+                usr_cbs->on_cl_net_close(cl_ctx->usr, due2err);
+
+        used_ctxs_remove(cl_ctx);
+        cl_ctx_free(cl_ctx);
+}
+
+static void do_net_close_tx(struct client_ctx *cl_ctx, bool due2err)
+{
+        if(due2err)
+                cl_ctx->flags |= NW_CONN_ERROR_FLAG;
+
+        cl_ctx->flags |= NETWORK_CLOSE_FLAG;
+
+        loopb_trigger();
+}
+
+static int32_t cl_ctx_send(struct client_ctx *cl_ctx, uint8_t *buf, uint32_t len)
+{
+        int32_t rv = net_ops->send(cl_ctx->net, buf, len, NULL);
+        if(rv <= 0) {
+                do_net_close_tx(cl_ctx, true);
+                rv = MQP_ERR_NETWORK;
+        }
+
+        USR_INFO("S: FH-B1 0x%02x, len %u to net %d: %s\n\r",
+                 *buf, len, cl_ctx->net, rv? "Sent" : "Fail");
+        return rv;
+}
+
+static
+int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type, enum mqtt_qos qos,
+                bool has_vh, uint16_t vh_data)
+{
+        uint8_t  buf[4];
+        uint32_t len = 2;
+
+        if(false == had_rcvd_conn_msg(cl_ctx))
+                return MQP_ERR_NOTCONN;
+
+        buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false));
+        buf[1] = has_vh ? 2 : 0;
+
+        if(has_vh)
+                len += buf_wr_nbo_2B(buf + 2, vh_data);
+
+        return cl_ctx_send(cl_ctx, buf, len);
+}
+
+static
+int32_t _mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh,
+                      uint16_t vh_data)
+{
+        struct client_ctx *cl_ctx = (struct client_ctx*) ctx_cl;
+
+        return cl_ctx? vh_msg_send((client_ctx*)ctx_cl, msg_type, qos,
+                                   has_vh, vh_data) : -1;
+}
+
+int32_t  mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh,
+                      uint16_t vh_data)
+{
+        return _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data);
+}
+
+int32_t mqtt_vh_msg_send_locked(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos,
+                            bool has_vh, uint16_t vh_data)
+{
+        int32_t rv;
+
+        MUTEX_LOCKIN();
+        rv = _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data);
+        MUTEX_UNLOCK();
+
+        return rv;
+}
+
+int32_t mqtt_connack_send(void *ctx_cl, uint8_t *vh_buf)
+{
+        struct client_ctx *cl_ctx = (struct client_ctx *) ctx_cl;
+
+        int32_t rv = vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0,
+                             true, (vh_buf[0] << 8) | vh_buf[1]);
+
+        if((rv > 0) && (0x00 != vh_buf[1]))
+                do_net_close_tx(cl_ctx, true);
+
+        return rv;
+}
+
+static
+int32_t _mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
+{
+        int32_t rv = 0;
+        uint8_t *buf = MQP_FHEADER_BUF(mqp);
+
+        if(dup)
+                *buf |= DUP_FLAG_VAL(true);
+
+        rv = cl_ctx_send((struct client_ctx*)ctx_cl, buf, MQP_CONTENT_LEN(mqp));
+
+        *buf &= ~DUP_FLAG_VAL(true);
+
+        return rv;
+}
+
+int32_t mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
+{
+        return _mqtt_server_pub_dispatch(ctx_cl, mqp, dup);
+}
+
+int32_t 
+mqtt_server_pub_dispatch_locked(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
+{
+        int32_t rv;
+
+        MUTEX_LOCKIN();
+        rv = _mqtt_server_pub_dispatch(ctx_cl, mqp, dup);
+        MUTEX_UNLOCK();
+
+        return rv;
+}
+
+#define MQP_MAX_TOPICS      16
+#define MQP_SUBACK_PAY_OFS (MAX_FH_LEN + 2)
+
+static int32_t sub_ack_send(struct client_ctx *cl_ctx, uint8_t *buf, uint8_t pay_ofs,
+                        uint32_t pay_len, uint16_t msg_id)
+{
+        uint8_t *ref = buf += MAX_FH_LEN;
+
+        if(MQP_SUBACK_PAY_OFS != pay_ofs)
+                return MQP_ERR_PKT_LEN;
+
+        buf += buf_wr_nbo_2B(buf, msg_id);
+        ref -= mqp_buf_tail_wr_remlen(ref - MAX_REMLEN_BYTES, 
+                                      pay_len + buf - ref);
+
+        ref -= 1;
+        *ref = MAKE_FH_BYTE1(MQTT_SUBACK, 
+                             MAKE_FH_FLAGS(false, MQTT_QOS0, false));
+
+        return cl_ctx_send(cl_ctx, ref, pay_len + buf - ref);
+}
+
+static inline int32_t unsub_ack_send(struct client_ctx *cl_ctx, uint16_t msg_id)
+{
+        return vh_msg_send(cl_ctx, MQTT_UNSUBACK, MQTT_QOS0, true, msg_id);
+}
+
+/*----------------------------------------------------------------------------
+ * Receive Routines
+ *----------------------------------------------------------------------------
+ */
+
+/* Candidate to be moved to mqtt_common.c file */
+static bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw)
+{
+        uint8_t *buf = MQP_VHEADER_BUF(mqp_raw);
+
+        if(mqp_raw->pl_len < 2)
+                return false;    /* Bytes for MSG ID not available */
+
+        buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id);
+        mqp_raw->vh_len += 2;
+        mqp_raw->pl_len -= 2;
+
+        return true;
+}
+
+#define BRK_IF_RD_ERR_UTF8(buf, end, utf8)      \
+        if(rd_buf_utf8(buf, end, utf8) < 0)     \
+                break;
+
+static int32_t buf_rd_utf8_qos(uint8_t *buf, uint8_t *end, struct utf8_strqos *utf8_qos)
+{
+        struct utf8_string utf8;
+        uint8_t *ref = buf;
+
+        buf += mqp_buf_rd_utf8(buf, end, &utf8);
+
+        /* Assess that UTF8 has been read and QOS can be read */
+        if((buf > ref) && (end > buf)) {
+                utf8_qos->buffer = utf8.buffer;
+                utf8_qos->length = utf8.length;
+                utf8_qos->qosreq = (enum mqtt_qos)*buf++;
+
+                return buf - ref;
+        }
+
+        return -1;
+}
+
+static bool _proc_sub_msg_rx(struct mqtt_packet *mqp_raw,
+                             struct utf8_strqos *qos_topics, uint32_t *n_topics)
+{
+        uint8_t *buf, *end;
+        uint32_t i = 0;
+
+        if(false == mqp_proc_vh_msg_id_rx(mqp_raw))
+                return false;   /* Problem in contents received from client */
+
+        buf = MQP_PAYLOAD_BUF(mqp_raw);
+        end = buf + mqp_raw->pl_len;
+
+        for(i = 0; (i < *n_topics) && (buf < end); i++) {
+                struct utf8_strqos *qos_top = qos_topics + i;
+                int32_t len = buf_rd_utf8_qos(buf, end, qos_top);
+                if(len < 0)
+                        break; /* Failed to read Topic */
+
+                buf += len;
+        }
+
+        *n_topics = i;
+
+        return ((0 == i) || (buf != end))? false : true;
+}
+
+static
+bool proc_sub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+        uint32_t n_topics = MQP_MAX_TOPICS;
+        uint16_t msg_id;
+
+        struct utf8_strqos qos_topics[MQP_MAX_TOPICS];
+        uint8_t ack[MQP_MAX_TOPICS  +  MQP_SUBACK_PAY_OFS];
+
+        if(false == _proc_sub_msg_rx(mqp_raw, qos_topics, &n_topics))
+                return false;
+
+        msg_id = mqp_raw->msg_id;
+
+        /* All topics have been now put in array, pass-on info to upper layer */
+        if(usr_cbs->sub_msg_rx(cl_ctx->usr, qos_topics, n_topics,
+                               msg_id, ack + MQP_SUBACK_PAY_OFS)) {
+
+                sub_ack_send(cl_ctx, ack, MQP_SUBACK_PAY_OFS, n_topics, msg_id);
+
+                return true;
+        }
+
+        return false;
+}
+
+
+static bool _proc_unsub_msg_rx(struct mqtt_packet *mqp_raw,
+                               struct utf8_string *topics, uint32_t *n_topics)
+{
+        uint8_t *buf, *end;
+        uint32_t i = 0;
+        
+        if(false == mqp_proc_vh_msg_id_rx(mqp_raw))
+                return false;   /* Problem in contents received from client */
+
+        buf = MQP_PAYLOAD_BUF(mqp_raw);
+        end = buf + mqp_raw->pl_len;
+
+        for(i = 0; (i < *n_topics) && (buf < end); i++) {
+                struct utf8_string *topic = topics + i;
+                int32_t len = mqp_buf_rd_utf8(buf, end, topic);
+                if(len < 0)
+                        break; /* Failed to read Topic */
+                
+                buf += len;
+        }
+
+        *n_topics = i;
+
+        return ((0 == i) || (buf != end))? false : true;
+}
+
+static
+bool proc_unsub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+        uint32_t n_topics = MQP_MAX_TOPICS;
+        uint16_t msg_id;
+
+        struct utf8_string topics[MQP_MAX_TOPICS];
+
+        if(false == _proc_unsub_msg_rx(mqp_raw, topics, &n_topics))
+                return false;
+
+        msg_id = mqp_raw->msg_id;
+
+        /* All topics have been now put in array, pass-on info to upper layer */
+        if(usr_cbs->un_sub_msg(cl_ctx->usr, topics, n_topics, msg_id)) {
+                unsub_ack_send(cl_ctx, msg_id);
+                return true;
+        }
+
+        return false;
+}
+
+static bool proc_pingreq_rx(struct client_ctx *cl_ctx)
+{
+        vh_msg_send(cl_ctx, MQTT_PINGRSP, MQTT_QOS0, false, 0x00);
+        return true;
+}
+
+static bool proc_disconn_rx(struct client_ctx *cl_ctx)
+{
+        do_net_close_rx(cl_ctx, false);
+        return true;
+}
+
+static
+bool proc_pub_msg_rx(struct client_ctx  *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+        bool rv = mqp_proc_pub_rx(mqp_raw);
+        uint8_t B = mqp_raw->fh_byte1;
+        enum mqtt_qos qos = ENUM_QOS(B);
+        struct utf8_string topic;
+        uint16_t msg_id = 0;
+
+        if(false == rv)
+                return rv; /* Didn't get a good PUB Packet */
+
+        msg_id = mqp_raw->msg_id;
+
+        topic.buffer = (char*)MQP_PUB_TOP_BUF(mqp_raw);
+        topic.length =      MQP_PUB_TOP_LEN(mqp_raw);
+
+        rv = usr_cbs->pub_msg_rx(cl_ctx->usr, &topic, 
+                                 MQP_PUB_PAY_BUF(mqp_raw),
+                                 MQP_PUB_PAY_LEN(mqp_raw),
+                                 msg_id, BOOL_DUP(B), qos, 
+                                 BOOL_RETAIN(B));
+        if(false == rv)
+                return rv;
+
+        if(MQTT_QOS1 == qos)
+                vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id);
+
+        if(MQTT_QOS2 == qos)
+                vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id);
+
+        return rv;
+}
+
+static 
+bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+        if((false == mqp_proc_msg_id_ack_rx(mqp_raw, false)) ||
+           (false == usr_cbs->ack_notify(cl_ctx->usr,
+                                         mqp_raw->msg_type,
+                                         mqp_raw->msg_id)))
+                return false;
+
+        return true;
+}
+
+#define IO_MON_NO_TIMEOUT (0xFFFFFFFF) // TBD
+//#define KA_TIMEOUT_NONE   (0xFFFFFFFF)
+
+static void rx_timeout_update(struct client_ctx *cl_ctx)
+{
+        if(false == had_rcvd_conn_msg(cl_ctx))
+                return;
+
+        cl_ctx_timeout_update(cl_ctx, net_ops->time());
+
+        used_ctxs_remove(cl_ctx);
+        used_ctxs_insert(cl_ctx);
+
+        return;
+}
+
+static bool proc_protocol_info(struct utf8_string *utf8, uint8_t ver)
+{
+        const char *buf = utf8->buffer;
+
+        /* Check for protocol version 3.1.1 */
+        if((4 == utf8->length)  &&
+           (buf[0] == 'M')      &&
+           (buf[1] == 'Q')      &&
+           (buf[2] == 'T')      &&
+           (buf[3] == 'T')      &&
+           (0x04 == ver))
+                return true;
+
+        /* Check for protocol version 3.1   */
+        if((6 == utf8->length)  &&
+           (buf[0] == 'M')      &&
+           (buf[1] == 'Q')      &&
+           (buf[2] == 'I')      &&
+           (buf[3] == 's')      &&
+           (buf[4] == 'd')      &&
+           (buf[5] == 'p')      &&
+           (0x03 == ver))
+                return true;
+
+        return false;
+}
+
+static uint16_t proc_connect_vh_rx(struct mqtt_packet  *mqp_raw,
+                              uint8_t *conn_flags, uint16_t *ka_secs)
+{
+        struct utf8_string utf8;
+        uint8_t *buf = MQP_PAYLOAD_BUF(mqp_raw);
+        uint8_t *end = buf + mqp_raw->pl_len;
+        uint8_t *ref = buf;
+
+        buf += mqp_buf_rd_utf8(buf, end, &utf8);
+        if(end - buf < 1)
+                return CONNACK_RC_BAD_PROTOV; /* No proto ver */
+
+        if(false == proc_protocol_info(&utf8, *buf++))
+                return CONNACK_RC_BAD_PROTOV;
+
+        *conn_flags = *buf++;
+
+        if(end - buf < 2)
+                return 0xFF;        /* Bad packet composition */
+
+        *ka_secs  = (buf[0] << 8) | buf[1];
+        buf      += 2;
+        *ka_secs += *ka_secs >> 1;
+
+        mqp_raw->vh_len  = buf - ref;
+        mqp_raw->pl_len -= buf - ref;
+
+        return 0;
+}
+
+#define RET_IF_RD_CONN_ERROR(buf, end, utf8)                   \
+        {                                                      \
+                int32_t len = mqp_buf_rd_utf8(buf, end, utf8);     \
+                if(len < 0)                                    \
+                        return 0x00FF;                         \
+                                                               \
+                buf += len;                                    \
+        }
+
+uint16_t proc_connect_pl_rx(const uint8_t *buf, const uint8_t *end, uint8_t conn_flags,
+                       struct utf8_string *free_utf8s,
+                       struct utf8_string **used_refs)
+{
+        struct utf8_string *utf8;
+
+        utf8 = used_refs[0] = free_utf8s + 0;
+        RET_IF_RD_CONN_ERROR(buf, end, utf8);
+
+        if(conn_flags & WILL_CONFIG_VAL) {
+                utf8 = used_refs[1] = free_utf8s + 1;
+                RET_IF_RD_CONN_ERROR(buf, end, utf8);
+
+                utf8 = used_refs[2] = free_utf8s + 2;
+                RET_IF_RD_CONN_ERROR(buf, end, utf8);
+        }
+
+        if((0 == (conn_flags & USER_NAME_OPVAL))  &&
+           (0 != (conn_flags & PASS_WORD_OPVAL)))
+                return 0x00FF;  /* Bad combination */
+
+        if(conn_flags & USER_NAME_OPVAL) {
+                utf8 = used_refs[3] = free_utf8s + 3;
+                RET_IF_RD_CONN_ERROR(buf, end, utf8);
+        }
+
+        if(conn_flags & PASS_WORD_OPVAL) {
+                utf8 = used_refs[4] = free_utf8s + 4;
+                RET_IF_RD_CONN_ERROR(buf, end, utf8);
+        }
+
+        return 0;
+}
+
+static
+bool proc_connect_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+        struct utf8_string *used_refs[5] = {NULL, NULL, NULL, NULL, NULL};
+        struct utf8_string free_utf8s[5];
+        uint8_t conn_flags, *buf, *end;
+        bool clean_session = false;
+        uint16_t ack_vh16 = 0; /* Variable Header of CONNACK (response) Message */
+
+        set_rcvd_conn_msg(cl_ctx);
+
+        ack_vh16 = proc_connect_vh_rx(mqp_raw, &conn_flags, &cl_ctx->ka_secs);
+        if(ack_vh16) 
+                goto proc_connect_rx_exit1;
+
+        buf = MQP_PAYLOAD_BUF(mqp_raw);
+        end = buf + mqp_raw->pl_len;
+        ack_vh16 = proc_connect_pl_rx(buf,  end, conn_flags,
+                                      free_utf8s, used_refs);
+        if(ack_vh16)
+                goto proc_connect_rx_exit1;
+
+        clean_session = (conn_flags & CLEAN_START_VAL)? true : false;
+        ack_vh16 = (!used_refs[0]->length && !clean_session)?
+                   CONNACK_RC_CLI_REJECT : 0;   /* Validate 0 byte Client ID */
+        if(ack_vh16)
+                goto proc_connect_rx_exit1;
+
+        /* UTF8 Order: Client ID, Will Topic, Will Msg, User Name, Pass Word */
+        ack_vh16 = usr_cbs->connect_rx(cl_ctx, conn_flags, &used_refs[0],
+                                       &cl_ctx->usr);
+ proc_connect_rx_exit1:
+
+        DBG_INFO("S: CONNACK RC (16bits) is %u (%s)\n\r", ack_vh16,
+                 ack_vh16 & 0xFF? "error" : "good");
+
+        if(0xFF != (ack_vh16 & 0xFF))
+                vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0, true, ack_vh16);
+
+        if(CONNACK_RC_REQ_ACCEPT == (ack_vh16 & 0xFF)) {
+                rx_timeout_update(cl_ctx);
+                usr_cbs->on_connack_send(cl_ctx->usr, clean_session);
+        } else {
+                return false;
+        }
+
+        return true;
+}
+
+
+static void recv_hvec_load(int32_t *recv_hvec, uint32_t size, struct client_ctx *list)
+{
+        int32_t i = 0;
+
+        for(i = 0; (i < size) && (NULL != list); i++, list = list->next)
+                recv_hvec[i] = list->net;
+
+        recv_hvec[i] = -1;
+
+        return;
+}
+
+static bool process_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+        uint8_t msg_type = mqp_raw->msg_type;
+        bool rv = false;
+
+        USR_INFO("S: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d\n\r",
+                 mqp_raw->fh_byte1, cl_ctx->net);
+
+        if((MQTT_CONNECT != msg_type) ^ had_rcvd_conn_msg(cl_ctx))
+                goto process_recv_exit1; /* Proto Violation */
+
+        rx_timeout_update(cl_ctx);
+
+        switch(msg_type) {
+
+        case MQTT_CONNECT:
+                rv = proc_connect_rx(cl_ctx, mqp_raw);
+                break;
+
+        case MQTT_DISCONNECT:
+                rv = proc_disconn_rx(cl_ctx);
+                break;
+
+        case MQTT_SUBSCRIBE:
+                rv = proc_sub_msg_rx(cl_ctx, mqp_raw);
+                break;
+
+        case MQTT_UNSUBSCRIBE:
+                rv = proc_unsub_msg_rx(cl_ctx, mqp_raw);
+                break;
+
+        case MQTT_PINGREQ:
+                rv = proc_pingreq_rx(cl_ctx);
+                break;
+
+        case MQTT_PUBLISH:
+                rv = proc_pub_msg_rx(cl_ctx, mqp_raw);
+                break;
+
+        case MQTT_PUBACK:
+        case MQTT_PUBREC:
+        case MQTT_PUBREL:
+        case MQTT_PUBCOMP:
+                rv = proc_ack_msg_rx(cl_ctx, mqp_raw);
+                break;
+
+        default:
+                break;
+        }
+
+ process_recv_exit1:
+        DBG_INFO("S: Processing of MSG ID 0x%02x: %s\n\r",
+                 mqp_raw->msg_id, rv? "Good" : "Fail");
+
+        return rv;
+}
+
+
+/* Terminate net connections which haven't received PKTs beyond expected time.
+   Caller must ensure atomic enviroment for execution of this routine.
+*/
+static void ka_sequence(uint32_t *secs2wait)
+{
+        struct client_ctx *cl_ctx = used_ctxs; /* Sorted for timeout (ascend) */
+        uint32_t now_secs = net_ops->time();
+
+        while(NULL != cl_ctx) {
+                struct client_ctx *next = cl_ctx->next;
+                if(NEED_NET_CLOSE(cl_ctx) || !(cl_ctx->timeout > now_secs)) {
+                        bool due2err = false;
+                        if(cl_ctx->flags & NW_CONN_ERROR_FLAG)
+                                due2err = true;
+
+                        cl_ctx->flags &= ~(NW_CONN_ERROR_FLAG |
+                                           NETWORK_CLOSE_FLAG);
+
+                        /* Close network: Timeout or TX err */
+                        do_net_close_rx(cl_ctx, due2err);
+                }
+
+                cl_ctx = next;
+        }
+
+        cl_ctx = used_ctxs;
+        if(((NULL != cl_ctx) && (KA_TIMEOUT_NONE == cl_ctx->timeout)) ||
+           ((NULL == cl_ctx)))
+                *secs2wait = IO_MON_NO_TIMEOUT;
+        else
+                *secs2wait = cl_ctx->timeout - now_secs;
+
+        return;
+}
+
+/* Put a new functiona name such as mk_new_ctx() or setup_ctx() and
+   processing to restrict limit number of connections.
+
+   Return value as well.
+*/
+static bool accept_ctx(int32_t net, uint32_t wait_secs)
+{
+        struct client_ctx *cl_ctx = cl_ctx_alloc();
+        if(NULL == cl_ctx)
+                return false;
+
+        cl_ctx->net = net_ops->accept(net, cl_ctx->remote_ip,
+                                      &cl_ctx->ip_length);
+        if(-1 == cl_ctx->net) {
+                cl_ctx_free(cl_ctx);
+                USR_INFO("S: failed to accept new NW connection\n\r");
+                return false;
+        }
+
+        DBG_INFO("Accepted new connection (fd) %d\n\r", (int32_t)cl_ctx->net);
+
+        /* Timeout to receive MQTT_CONNECT */
+        cl_ctx->timeout = wait_secs + net_ops->time(); 
+
+        used_ctxs_insert(cl_ctx);
+        return true;
+}
+
+static struct client_ctx *net_cl_ctx_find(int32_t net)
+{
+        struct client_ctx *cl_ctx = used_ctxs;
+        while(cl_ctx) {
+                if(net == cl_ctx->net)
+                        break;
+
+                cl_ctx = cl_ctx->next;
+        }
+
+        if(NULL == cl_ctx)
+                USR_INFO("Did not find ctx for net %d\n\r", net); 
+
+        return cl_ctx;
+}
+
+static int32_t recv_hvec[MAX_NWCONN + 1 + 1 + 1]; /* LISTEN + LOOPBACK + VEC END */
+static int32_t send_hvec  = -1;
+static int32_t rsvd_hvec  = -1;
+static int32_t listen_net = -1;
+
+static struct mqtt_packet rx_mqp;
+static uint8_t rxb[MQP_SERVER_RX_LEN];
+static uint16_t listener_port = 0;
+
+static inline
+int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out)
+{
+        int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, timed_out, NULL);
+        if(rv <= 0)
+                rv = MQP_ERR_NETWORK;
+
+        return rv;
+}
+
+static int32_t proc_loopback_recv(int32_t net)
+{
+        uint8_t buf[LOOP_DLEN];
+
+        /* Thanks for waking-up thread and do nothing in this routine */
+        int32_t rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0);
+        pending_trigs = false;
+
+        if(rv <= 0) {
+                net_ops->close(net);
+                return MQP_ERR_LIBQUIT;
+        }
+
+        return rv;
+}
+
+static void proc_net_data_recv(int32_t net)
+{
+        struct client_ctx *cl_ctx = net_cl_ctx_find(net);
+        bool dummy;
+        int32_t rv;
+
+        mqp_reset(&rx_mqp); /* Start w/ a clean buffer */
+
+        rv = net_recv(net, &rx_mqp, 0, &dummy);
+        if(rv > 0)
+                /* Working Principle: Only RX processing errors should be
+                   reported as 'false'. Status of TX as a follow-up to RX
+                   messages need not be reported by the xyz_rx() routines.
+                   Error observed in TX is either dealt in next iteration
+                   of RX loop.
+                */
+                if(false == process_recv(cl_ctx, &rx_mqp))
+                        rv = MQP_ERR_CONTENT;
+
+        if(rv < 0)
+                do_net_close_rx(cl_ctx, rv);
+}
+
+static bool accept_and_recv_locked(int32_t *recv_hnds, int32_t n_hnds, uint32_t wait_secs)
+{
+        bool rv = true;
+        int32_t idx = 0;
+
+        MUTEX_LOCKIN();
+
+        for(idx = 0; (idx < n_hnds) && (rv == true); idx++) {
+                int32_t net = recv_hvec[idx];
+                if(net == listen_net) {
+                        rv = accept_ctx(listen_net, wait_secs);
+                } else if(loopback_port && (net == loopb_net)) {
+                        if(proc_loopback_recv(loopb_net) < 0)
+                                rv = false;
+                } else {
+                        proc_net_data_recv(net);
+                }
+        }
+
+        MUTEX_UNLOCK();
+
+        return rv;
+}
+
+int32_t mqtt_server_run(uint32_t wait_secs)   // TBD break into two functions
+{
+        uint32_t secs2wait = 0;
+        int32_t n_hnds = 0;
+
+        USR_INFO("S: MQTT Server Run invoked ....\n\r");
+
+        if(NULL == net_ops)
+                return MQP_ERR_NET_OPS;
+
+        if(loopback_port) {
+                loopb_net  = net_ops->open(DEV_NETCONN_OPT_UDP, NULL, 
+                                           loopback_port, NULL);
+                if(-1 == loopb_net)
+                        return MQP_ERR_LIBQUIT;
+        }
+
+        listen_net = net_ops->listen(0, listener_port, NULL);
+        if(-1 == listen_net)
+                return MQP_ERR_LIBQUIT;
+
+        do {
+                int32_t *r_hvec = recv_hvec + 0;
+
+                *r_hvec++ = listen_net;
+                if(loopback_port)
+                        *r_hvec++ = loopb_net;
+
+                /* MQTT Timeouts: close expired conns; get time to next expiry */
+                ka_sequence(&secs2wait); 
+
+                /* Prepare array of net handles. Must've atleast listen handle */
+                //                recv_hvec_load(&recv_hvec[2], MAX_NWCONN + 1,  used_ctxs);
+                recv_hvec_load(r_hvec, MAX_NWCONN + 1,  used_ctxs);
+
+                n_hnds = net_ops->io_mon(recv_hvec, &send_hvec,
+                                         &rsvd_hvec, secs2wait);
+                if(n_hnds < 0)
+                        return MQP_ERR_LIBQUIT;
+
+                if(false == accept_and_recv_locked(recv_hvec, n_hnds, wait_secs))
+                        return MQP_ERR_LIBQUIT;
+
+        } while(1);
+}
+
+int32_t mqtt_server_register_net_svc(const struct device_net_services *net)
+{
+        if(net && net->io_mon && net->close && net->send &&
+           net->recv && net->time && net->listen) {
+                net_ops = net;
+                return 0;
+        }
+
+        return -1;
+}
+
+int32_t mqtt_server_lib_init(const struct mqtt_server_lib_cfg *lib_cfg,
+                         const struct mqtt_server_msg_cbs *msg_cbs)
+{
+        cl_ctx_init();
+
+        if((NULL == lib_cfg)                ||
+           (0    == lib_cfg->listener_port) ||
+           (NULL == lib_cfg->debug_printf))
+                return -1;
+
+        debug_printf = lib_cfg->debug_printf; /* Facilitate debug */
+
+        loopback_port = lib_cfg->loopback_port;
+        listener_port = lib_cfg->listener_port;
+
+        mutex         = lib_cfg->mutex;
+        mutex_lockin  = lib_cfg->mutex_lockin;
+        mutex_unlock  = lib_cfg->mutex_unlock;
+
+        aux_dbg_enbl  = lib_cfg->aux_debug_en;
+        debug_printf  = lib_cfg->debug_printf;
+
+        usr_cbs = &usr_obj;
+
+        memcpy(usr_cbs, msg_cbs, sizeof(struct mqtt_server_msg_cbs));
+
+        mqp_buffer_attach(&rx_mqp, rxb, MQP_SERVER_RX_LEN, 0);
+
+        return 0;
+}
+
+}//namespace mbed_mqtt