Part of TI's mqtt

Dependents:   mqtt_V1 cc3100_Test_mqtt_CM3

Revision:
0:547251f42a60
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/client_mgmt.cpp	Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,790 @@
+/******************************************************************************
+*
+*   Copyright (C) 2014 Texas Instruments Incorporated
+*
+*   All rights reserved. Property of Texas Instruments Incorporated.
+*   Restricted rights to use, duplicate or disclose this code are
+*   granted through contract.
+*
+*   The program may not be used without the written permission of
+*   Texas Instruments Incorporated or against the terms and conditions
+*   stipulated in the agreement under which this program has been supplied,
+*   and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#include "server_util.h"
+#include "client_mgmt.h"
+#include "server_pkts.h"
+
+namespace mbed_mqtt {
+
+#ifndef CFG_SR_MAX_CL_ID_SIZE
+#define MAX_CLIENT_ID_LEN 64
+#else
+#define MAX_CLIENT_ID_LEN CFG_SR_MAX_CL_ID_SIZE
+#endif
+
+#ifndef CFG_SR_MAX_NUM_CLIENT
+#define MAX_CLIENTS       16 /* Must not exceed 32 */
+#else
+#define MAX_CLIENTS       CFG_SR_MAX_NUM_CLIENT
+
+#if (CFG_SR_MAX_NUM_CLIENT > 32)
+#error "CFG_SR_MAX_NUM_CLIENT must not exceed 32"
+#endif
+#endif
+
+static struct client_usr {
+
+        void  *ctx;  /* Client net */
+        void  *app;  /* Client app */
+
+        void  *will; /* Will topic */
+        uint32_t   index;
+        uint32_t   n_sub;
+
+#define MQ_CONNECT_FLAG  0x00000001
+#define CLEAN_SESS_FLAG  0x00000002
+#define ASSIGNMENT_FLAG  0x00000004   /* No state or connected */
+
+        uint32_t   flags;
+
+        char    client_id[MAX_CLIENT_ID_LEN];
+
+        struct pub_qos2_cq qos2_rx_cq;
+        struct pub_qos2_cq qos2_tx_cq;
+
+} users[MAX_CLIENTS];
+
+// TBD consistency between inline and macro functions
+
+static inline bool is_connected(struct client_usr *cl_usr)
+{
+        return (cl_usr->flags & MQ_CONNECT_FLAG)? true : false;
+}
+
+static inline bool has_clean_session(struct client_usr *cl_usr)
+{
+        return (cl_usr->flags & CLEAN_SESS_FLAG)? true : false;
+}
+
+static inline bool is_assigned(struct client_usr *cl_usr)
+{
+        return (cl_usr->flags & ASSIGNMENT_FLAG)? true : false;
+}
+
+static inline void set_connect_state(struct client_usr *cl_usr,
+                                     bool connected)
+{
+        if(connected)
+                cl_usr->flags |=  MQ_CONNECT_FLAG;
+        else 
+                cl_usr->flags &= ~MQ_CONNECT_FLAG;
+}
+
+static inline void set_clean_session(struct client_usr *cl_usr,
+                                     bool clean_session)
+{
+        if(clean_session)
+                cl_usr->flags |=  CLEAN_SESS_FLAG;
+        else
+                cl_usr->flags &= ~CLEAN_SESS_FLAG;
+}
+
+static inline void set_assigned(struct client_usr *cl_usr,
+                                bool assignment)
+{
+        if(assignment)
+                cl_usr->flags |=  ASSIGNMENT_FLAG;
+        else
+                cl_usr->flags &= ~ASSIGNMENT_FLAG;
+}
+
+static void cl_usr_reset(struct client_usr *cl_usr)
+{
+        cl_usr->ctx  = NULL;
+        cl_usr->app  = NULL;
+        cl_usr->will = NULL;
+
+        cl_usr->n_sub = 0;
+        cl_usr->flags = 0;
+        
+        cl_usr->client_id[0] = '\0';
+
+        qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
+        qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
+}
+
+static void cl_usr_init(void)
+{
+        int32_t idx = 0;
+        for(idx = 0; idx < MAX_CLIENTS; idx++) {
+                struct client_usr *cl_usr = users + idx;
+                cl_usr->index =  idx;
+
+                cl_usr_reset(cl_usr);
+        }
+}
+
+static void cl_usr_free(struct client_usr *cl_usr)
+{
+        cl_usr_reset(cl_usr);
+        cl_usr->flags &= ~(MQ_CONNECT_FLAG | 
+                           CLEAN_SESS_FLAG |
+                           ASSIGNMENT_FLAG);
+}
+
+void cl_sub_count_add(void *usr_cl)
+{
+        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+        if(is_connected(cl_usr)) {
+                cl_usr->n_sub++;
+                USR_INFO("%s has added a new sub, now total is %u\n\r",
+                         cl_usr->client_id, cl_usr->n_sub);
+
+        }
+
+        return;
+}
+
+void cl_sub_count_del(void *usr_cl)
+{
+        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+        if(is_connected(cl_usr)) {
+                cl_usr->n_sub--;
+                USR_INFO("%s has deleted a sub, now total is %u\n\r",
+                         cl_usr->client_id, cl_usr->n_sub);
+        }
+
+        return;
+}
+
+/*----------------------------------------------------------------------------
+ * QoS2 PUB RX Message handling mechanism and associated house-keeping
+ *--------------------------------------------------------------------------*/
+static inline bool qos2_pub_rx_logup(struct client_usr *cl_usr, uint16_t msg_id)
+{
+        return qos2_pub_cq_logup(&cl_usr->qos2_rx_cq, msg_id);
+}
+
+static inline bool ack2_msg_id_logup(struct client_usr *cl_usr, uint16_t msg_id)
+{
+        return qos2_pub_cq_logup(&cl_usr->qos2_tx_cq, msg_id);
+}
+
+static inline bool qos2_pub_rx_unlog(struct client_usr *cl_usr, uint16_t msg_id)
+{
+        return qos2_pub_cq_unlog(&cl_usr->qos2_rx_cq, msg_id);
+}
+
+static inline bool ack2_msg_id_unlog(struct client_usr *cl_usr, uint16_t msg_id)
+{
+        return qos2_pub_cq_unlog(&cl_usr->qos2_tx_cq, msg_id);
+}
+
+static inline bool qos2_pub_rx_is_done(struct client_usr *cl_usr, uint16_t msg_id)
+{
+        return qos2_pub_cq_check(&cl_usr->qos2_rx_cq, msg_id);
+}
+
+bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id)
+{
+        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+        return cl_usr && (qos2_pub_rx_is_done(cl_usr, msg_id) ||
+                          qos2_pub_rx_logup(cl_usr, msg_id));
+}
+
+static void ack2_msg_id_dispatch(struct client_usr *cl_usr)
+{
+        struct pub_qos2_cq *tx_cq = &cl_usr->qos2_tx_cq;
+        uint8_t rd_idx = tx_cq->rd_idx;
+        uint8_t n_free = tx_cq->n_free;
+        uint8_t i = 0;
+
+        for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free); i++) {
+                if(mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, MQTT_QOS1,
+                                    true, tx_cq->id_vec[i]) <= 0)
+                        break;
+        }
+
+        return;
+}
+
+static void ack2_msg_id_purge(struct client_usr *cl_usr)
+{
+        qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
+        qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
+}
+
+/*
+
+*/
+static struct mqtt_ack_wlist  wl_qos_ack1 = {NULL, NULL};
+static struct mqtt_ack_wlist *qos_ack1_wl = &wl_qos_ack1;
+
+/*
+
+*/
+static struct mqtt_ack_wlist  wl_mqp_sess = {NULL, NULL};
+static struct mqtt_ack_wlist *sess_mqp_wl = &wl_mqp_sess;
+
+#define MQP_CL_MAP_GET(mqp)         (mqp->private_)
+#define MQP_CL_MAP_SET(mqp, cl_map) (mqp->private_ |=  cl_map)
+#define MQP_CL_MAP_CLR(mqp, cl_map) (mqp->private_ &= ~cl_map)
+
+#define CL_BMAP(cl_usr)             (1 << cl_usr->index)
+
+static inline
+int32_t _pub_dispatch(struct client_usr *cl_usr, struct mqtt_packet *mqp,
+                 bool dup)
+{
+        /* Error, if any, is handled in 'cl_net_close()' ....*/
+        return mqtt_server_pub_dispatch(cl_usr->ctx, mqp, dup);
+}
+
+static void ack1_wl_mqp_dispatch(struct client_usr *cl_usr)
+{
+        struct mqtt_packet *mqp = NULL;
+
+        for(mqp = qos_ack1_wl->head; NULL != mqp; mqp = mqp->next)
+                if(MQP_CL_MAP_GET(mqp) & CL_BMAP(cl_usr))
+                        _pub_dispatch(cl_usr, mqp, true);
+}
+
+#if  0
+static struct client_usr *find_cl_usr(uint32_t index)
+{
+        struct client_usr *cl_usr = users + 0;
+
+        int32_t i = 0;
+        for(i = 0; i < MAX_CLIENTS; i++, cl_usr++) {
+                if(index == cl_usr->index)
+                        break;
+        }
+
+        return (MAX_CLIENTS == i)? NULL : cl_usr;
+}
+#endif
+
+#define IS_CL_USR_FREE(cl_usr) ((  0  == cl_usr->flags)        &&       \
+                                ('\0' == cl_usr->client_id[0]))
+
+#define IS_CL_INACTIVE(cl_usr) ((  0  == cl_usr->flags)        &&       \
+                                ('\0' != cl_usr->client_id[0]))
+
+#define NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr)                          \
+        ((((MQTT_QOS1 == qos) && has_clean_session(cl_usr)) ||          \
+          ((MQTT_QOS0 == qos)))?                                        \
+         false : true)
+
+static inline uint32_t _cl_bmap_get(void *usr_cl)
+{
+        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+        return IS_CL_USR_FREE(cl_usr)? 0 : CL_BMAP(cl_usr);
+}
+
+uint32_t cl_bmap_get(void *usr_cl)
+{
+        return usr_cl? _cl_bmap_get(usr_cl) : 0;
+}
+
+void *cl_app_hndl_get(void *usr_cl)
+{
+        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+        return (cl_usr && is_connected((client_usr*) usr_cl))? cl_usr->app : NULL;
+}
+
+void *cl_will_hndl_get(void *usr_cl)
+{
+        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+        return (cl_usr && is_connected((client_usr*)usr_cl))? cl_usr->will : NULL;
+}
+
+static void pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
+{
+        //uint32_t n_bits = sizeof(uint32_t) << 3;    /* Multiply by 8 */
+        uint32_t sp_map = 0; /* client Map for sessions present */
+        enum mqtt_qos qos = ENUM_QOS(mqp->fh_byte1);/* QOS */
+        const uint32_t cl_ref = cl_map; /* Keep ref to original */
+
+        int32_t i = 0;
+        for(i = 0; i < MAX_CLIENTS; i++) {
+                if(cl_map & (1 << i)) {
+                        struct client_usr *cl_usr = users + i; //find_cl_usr(i);
+                        if(is_connected(cl_usr))
+                                if((_pub_dispatch(cl_usr, mqp, false) > 0)  &&
+                                   NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr))
+                                        continue;/* Processing done; next CL */
+
+                        /* CL: unconnected / PUB Err / QOS1 PKT (clean sess) */
+                        cl_map &= ~(1 << i); 
+
+                        if(IS_CL_INACTIVE(cl_usr)) 
+                                sp_map |= (1 << i);  /* CL: Maintain session */
+                }
+        }
+
+        if(sp_map) {
+                struct mqtt_packet *cpy = mqp_server_copy(mqp); /* Make copy */
+                if(cpy) {
+                        MQP_CL_MAP_SET(cpy, sp_map);
+                        mqp_ack_wlist_append(sess_mqp_wl, cpy);
+                } else 
+                        sp_map = 0;
+        }
+
+        if(cl_map) {                                    /* Wait List Publish */
+                MQP_CL_MAP_SET(mqp, cl_map);
+                mqp_ack_wlist_append(qos_ack1_wl, mqp);
+        } else
+                mqp_free(mqp); /* PUB MQP now has no more use; must be freed */
+
+        USR_INFO("PUBLISH: CL Map(0x%08x): For Ack 0x%08x, Inactive 0x%08x\n\r",
+                 cl_ref, cl_map, sp_map);
+
+        return;
+}
+
+void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
+{
+        pub_dispatch(cl_map, mqp);
+        return;
+}
+
+/* Move to mqtt_common.h and remove it from mqtt_client.h */
+static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp)
+{
+        mqp_free(mqp);
+        return MQP_ERR_PKT_LEN;
+}
+
+/* Move this to a common file */
+int32_t cl_pub_msg_send(void *usr_cl,
+                    const struct utf8_string *topic, const uint8_t *data_buf,
+                    uint32_t data_len, enum mqtt_qos qos, bool retain)
+{
+        struct mqtt_packet *mqp = NULL;
+
+        if((NULL == topic)                        ||
+           ((data_len > 0) && (NULL == data_buf)) ||
+           (NULL == usr_cl))
+                return MQP_ERR_FNPARAM;
+
+        mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len);
+        if(NULL == mqp)
+                return MQP_ERR_PKT_AVL;
+
+        if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) ||
+           (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) 
+                return len_err_free_mqp(mqp);
+ 
+        mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain));
+
+        pub_dispatch(_cl_bmap_get(usr_cl), mqp);
+
+        return MQP_CONTENT_LEN(mqp);
+}
+
+static void wl_remove(struct mqtt_ack_wlist *list,
+                      struct mqtt_packet *prev,
+                      struct mqtt_packet *elem)
+{
+        if(prev)
+                prev->next = elem->next;
+        else
+                list->head = elem->next;
+
+        if(NULL == list->head)
+                list->tail = NULL;
+
+        if(list->tail == elem)
+                list->tail = prev;
+
+        return;
+}
+
+static void sess_wl_mqp_dispatch(struct client_usr *cl_usr)
+{
+        struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
+        uint32_t cl_map = CL_BMAP(cl_usr);
+
+        for(mqp = sess_mqp_wl->head; NULL != mqp; prev = mqp, mqp = next) {
+                struct mqtt_packet *cpy = NULL;
+
+                if(0 == (MQP_CL_MAP_GET(mqp) & cl_map))
+                        continue; /* MQP & CL: no association */
+
+                /* MQP has associated client(s) -  process it */
+
+                /* Dissociate this client from MQP */
+                MQP_CL_MAP_CLR(mqp, cl_map);
+                next = mqp->next; /* House keeping */
+
+                if(0 == MQP_CL_MAP_GET(mqp)) {
+                        /* MQP w/ no clients,  remove from WL */
+                        wl_remove(sess_mqp_wl, prev, mqp);
+
+                        /* Ensure correct arrangement for WL  */
+                        cpy = mqp; mqp = prev;
+                } else {
+                        /* MQP is associated w/ other clients */
+                        cpy = mqp_server_copy(mqp);
+                        if(NULL == cpy)
+                                continue;
+                }
+
+                /* Got packet from session, dispatch it to CL */
+                pub_dispatch(cl_map, cpy);
+        }
+
+        return;
+}
+
+static
+struct mqtt_packet *wl_mqp_unmap_find(struct mqtt_ack_wlist *wl,
+                                      struct client_usr *cl_usr, uint16_t msg_id,
+                                      struct mqtt_packet **prev)
+{
+        struct mqtt_packet *mqp = NULL;
+
+        *prev = NULL;
+        for(mqp = wl->head; NULL != mqp; *prev = mqp, mqp = mqp->next) {
+                if(msg_id == mqp->msg_id) {
+                        /* Found a MQP whose msg_id matches with input */
+                        MQP_CL_MAP_CLR(mqp, CL_BMAP(cl_usr));
+                        return mqp;
+                }
+        }
+
+        return NULL;
+}
+
+static bool wl_rmfree_try(struct mqtt_ack_wlist *wl, struct mqtt_packet *prev,
+                          struct mqtt_packet *mqp)
+{
+        if(0 == MQP_CL_MAP_GET(mqp)) {
+                wl_remove(wl, prev, mqp);
+                mqp_free(mqp);
+
+                return true;
+        }
+
+        return false;
+}
+
+static bool ack1_unmap_rmfree_try(struct client_usr *cl_usr, uint16_t msg_id)
+{
+        struct mqtt_packet *prev = NULL, *mqp = NULL;
+
+        if(false == has_clean_session(cl_usr)) {
+                mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
+                if(mqp)
+                        wl_rmfree_try(qos_ack1_wl, prev, mqp);
+        }
+
+        return true;
+}
+
+static 
+void wl_purge(struct mqtt_ack_wlist *wl, struct client_usr *cl_usr)
+{
+        struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
+        uint32_t bmap = CL_BMAP(cl_usr);
+
+        for(mqp = wl->head; NULL != mqp; prev = mqp, mqp = next) {
+                next = mqp->next;
+
+                /* Ideally, check should be done to see if cl_usr and mqp are
+                   associated. If yes, then the bit should be cleared. At the
+                   moment, blind clearing of the bit has been implemented and
+                   it has no side effects.
+                */
+                MQP_CL_MAP_CLR(mqp, bmap);
+
+                /* MQP with no clients has no more use, so try deleting MQP */
+                if(wl_rmfree_try(wl, prev, mqp))
+                        mqp = prev; /* MQP deleted, prep for next iteration */
+        }
+
+        return;
+}
+
+static void ack1_wl_mqp_purge(struct client_usr *cl_usr)
+{
+        wl_purge(qos_ack1_wl, cl_usr);
+}
+
+static void sess_wl_mqp_purge(struct client_usr *cl_usr)
+{
+        wl_purge(sess_mqp_wl, cl_usr);
+}
+
+static void session_resume(struct client_usr *cl_usr)
+{
+        ack1_wl_mqp_dispatch(cl_usr);
+        ack2_msg_id_dispatch(cl_usr);
+        sess_wl_mqp_dispatch(cl_usr);
+}
+
+static void session_delete(struct client_usr *cl_usr)
+{
+        ack1_wl_mqp_purge(cl_usr);
+        ack2_msg_id_purge(cl_usr);
+        sess_wl_mqp_purge(cl_usr);
+}
+
+static bool has_session_data(struct client_usr *cl_usr)
+{
+        uint32_t map = CL_BMAP(cl_usr);
+        struct mqtt_packet *elem;
+
+        if(cl_usr->n_sub)
+                return true;
+
+        elem = qos_ack1_wl->head;
+        while(elem) {
+                if(MQP_CL_MAP_GET(elem) & map) 
+                        return true;
+
+                elem = elem->next;
+        }
+
+        elem = sess_mqp_wl->head;
+        while(elem) {
+                if(MQP_CL_MAP_GET(elem) & map) 
+                        return true;
+
+                elem = elem->next;
+        }
+
+        return false;
+}
+
+bool cl_can_session_delete(void *usr_cl)
+{
+        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+        return cl_usr? (has_clean_session(cl_usr) ||
+                        !has_session_data(cl_usr)) : false;
+}
+
+void cl_on_net_close(void *usr_cl)
+{
+        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+        if(is_assigned(cl_usr)) {
+                if(false == has_session_data(cl_usr))
+                        cl_usr_free(cl_usr);
+        } else if(has_clean_session(cl_usr)) {
+                session_delete(cl_usr);
+                cl_usr_free(cl_usr);
+        } else {
+                set_connect_state(cl_usr, false);
+                cl_usr->ctx = NULL;
+                cl_usr->app = NULL;
+        }
+
+        return;
+}
+
+static bool proc_pub_rel_rx(struct client_usr *cl_usr, uint16_t msg_id)
+{
+        mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBCOMP,
+                         MQTT_QOS0, true, msg_id);
+
+        if(qos2_pub_rx_is_done(cl_usr, msg_id))
+                qos2_pub_rx_unlog(cl_usr, msg_id);
+
+        return true;
+}
+
+static bool proc_pub_rec_rx(struct client_usr *cl_usr, uint16_t msg_id)
+{
+        struct mqtt_packet *prev = NULL, *mqp = NULL;
+
+        mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
+        if(mqp && ack2_msg_id_logup(cl_usr, msg_id)) {
+
+                wl_rmfree_try(qos_ack1_wl, prev, mqp);
+
+                mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL,
+                                 MQTT_QOS1, true, msg_id);
+
+                return true;
+        }
+
+        return false;
+}
+
+bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id)
+{
+        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+        bool rv = false;
+
+        if(NULL == cl_usr)
+                return rv;
+
+        switch(msg_type) {
+
+        case MQTT_PUBACK:
+                rv = ack1_unmap_rmfree_try(cl_usr, msg_id);
+                break;
+
+        case MQTT_PUBREC:
+                rv = proc_pub_rec_rx(cl_usr, msg_id);
+                break;
+
+        case MQTT_PUBREL:
+                rv = proc_pub_rel_rx(cl_usr, msg_id);
+                break;
+
+        case MQTT_PUBCOMP:
+                rv = ack2_msg_id_unlog(cl_usr, msg_id);
+                break;
+
+        default:
+                break;
+        }
+
+        return rv;
+}
+
+static void assign_cl_index_as_id(struct client_usr *cl_usr)
+{
+        /* TBD: need a better implementation */
+        char *client_id = cl_usr->client_id;
+
+        client_id[0] = 'S';
+        client_id[1] = 'e';
+        client_id[2] = 'l';
+        client_id[3] = 'f';
+        client_id[4] = '-';
+        client_id[5] = ((cl_usr->index & 0xf0) >> 4) + 0x30;
+        client_id[6] = ((cl_usr->index & 0x0f))      + 0x30;
+        client_id[7] = '\0';
+
+        /* Make sure that above array size is with in MAX_CLIENT_ID_LEN */
+
+        return;
+}
+
+static struct client_usr *assign_cl_usr(char *client_id, uint8_t *vh_buf)
+{
+        struct client_usr *cl_usr, *fr_usr = NULL;
+        int32_t idx = 0;
+        for(idx = 0; idx < MAX_CLIENTS; idx++) {
+                cl_usr = users + idx;
+                if((NULL == fr_usr) && IS_CL_USR_FREE(cl_usr))
+                        fr_usr = cl_usr;  /* Note 1st free cl_usr */
+
+                if((NULL == client_id) && (NULL != fr_usr)) {
+                        /* Free cl_usr is present to create CL-ID */
+                        break;
+
+                } else if((NULL != client_id) &&
+                          (0 == strncmp(cl_usr->client_id, client_id,
+                                        MAX_CLIENT_ID_LEN))) {
+                        /* Found a client obj with exact ID match */
+                        if(is_connected(cl_usr)) {
+                                /* Error: CL-ID is already active */
+                                vh_buf[1] = CONNACK_RC_CLI_REJECT;
+                                cl_usr    = NULL;
+                        }
+                        break;
+                }
+        }
+
+        if(idx == MAX_CLIENTS) {          /* Did not find a match */
+                cl_usr   = fr_usr;
+                if(NULL == cl_usr)
+                        /* Server Resource unavailable */
+                        vh_buf[1] = CONNACK_RC_SVR_UNAVBL;
+        }
+
+        if(NULL != cl_usr) {
+                if(NULL == client_id)
+                        assign_cl_index_as_id(cl_usr); /* Get ID */
+                else if(IS_CL_USR_FREE(cl_usr))
+                        strncpy(cl_usr->client_id, client_id,
+                                MAX_CLIENT_ID_LEN);
+
+                set_assigned(cl_usr, true);
+        }
+
+        USR_INFO("Assignment of cl_usr %s (detail: index %d and name %s) \n\r",
+                 cl_usr? "Succeeded" : "Failed", cl_usr? (int32_t)cl_usr->index : -1,
+                 cl_usr? cl_usr->client_id : "");
+
+        return cl_usr;
+}
+
+uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id,
+                  void *app_cl, void *will, void **usr_cl)
+{
+        uint8_t vh_buf[] = {0x00, CONNACK_RC_REQ_ACCEPT};
+        struct client_usr *cl_usr;
+
+        if(client_id && ('\0' == client_id[0]))
+                /* Shouldn't happen: CLIENT ID with a NUL only string */
+                return CONNACK_RC_CLI_REJECT;
+
+        cl_usr = assign_cl_usr(client_id, vh_buf);
+        if(NULL == cl_usr)
+                return vh_buf[1]; /* Use vh_buf from assign_cl_usr() */
+
+        if((false == clean_session) && has_session_data(cl_usr))
+                vh_buf[0] = 0x01; /* Set Session Present Flag */
+
+#if  0
+        if(0x00 == vh_buf[1])
+                *usr_cl = (void*)cl_usr;
+#endif
+        *usr_cl = (void*)cl_usr;
+
+        cl_usr->ctx  = ctx_cl;
+        cl_usr->app  = app_cl;
+        cl_usr->will = will;
+
+        return ((vh_buf[0] << 8) | vh_buf[1]);
+}
+
+void cl_on_connack_send(void *usr_cl, bool clean_session)
+{
+        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+        if(false == is_assigned(cl_usr)) {
+                USR_INFO("Fatal: CONNACK for unassigned cl_usr Id %u, abort\n\r",
+                         cl_usr->index);
+                return;
+        }
+
+        set_assigned(cl_usr, false);
+        set_clean_session(cl_usr, clean_session);
+        set_connect_state(cl_usr, true);
+
+        if(clean_session) {
+                session_delete(cl_usr);
+        } else {
+                session_resume(cl_usr);
+        }
+
+        DBG_INFO("Server is now connected to client %s\n\r", cl_usr->client_id);
+
+        return;
+}
+
+int32_t cl_mgmt_init(void)
+{
+        cl_usr_init();
+
+        return 0;
+}
+
+}//namespace mbed_mqtt