Part of TI's mqtt

Dependents:   mqtt_V1 cc3100_Test_mqtt_CM3

client_mgmt.cpp

Committer:
dflet
Date:
2015-06-06
Revision:
0:547251f42a60

File content as of revision 0:547251f42a60:

/******************************************************************************
*
*   Copyright (C) 2014 Texas Instruments Incorporated
*
*   All rights reserved. Property of Texas Instruments Incorporated.
*   Restricted rights to use, duplicate or disclose this code are
*   granted through contract.
*
*   The program may not be used without the written permission of
*   Texas Instruments Incorporated or against the terms and conditions
*   stipulated in the agreement under which this program has been supplied,
*   and under no circumstances can it be used with non-TI connectivity device.
*
******************************************************************************/

#include "server_util.h"
#include "client_mgmt.h"
#include "server_pkts.h"

namespace mbed_mqtt {

#ifndef CFG_SR_MAX_CL_ID_SIZE
#define MAX_CLIENT_ID_LEN 64
#else
#define MAX_CLIENT_ID_LEN CFG_SR_MAX_CL_ID_SIZE
#endif

#ifndef CFG_SR_MAX_NUM_CLIENT
#define MAX_CLIENTS       16 /* Must not exceed 32 */
#else
#define MAX_CLIENTS       CFG_SR_MAX_NUM_CLIENT

#if (CFG_SR_MAX_NUM_CLIENT > 32)
#error "CFG_SR_MAX_NUM_CLIENT must not exceed 32"
#endif
#endif

static struct client_usr {

        void  *ctx;  /* Client net */
        void  *app;  /* Client app */

        void  *will; /* Will topic */
        uint32_t   index;
        uint32_t   n_sub;

#define MQ_CONNECT_FLAG  0x00000001
#define CLEAN_SESS_FLAG  0x00000002
#define ASSIGNMENT_FLAG  0x00000004   /* No state or connected */

        uint32_t   flags;

        char    client_id[MAX_CLIENT_ID_LEN];

        struct pub_qos2_cq qos2_rx_cq;
        struct pub_qos2_cq qos2_tx_cq;

} users[MAX_CLIENTS];

// TBD consistency between inline and macro functions

static inline bool is_connected(struct client_usr *cl_usr)
{
        return (cl_usr->flags & MQ_CONNECT_FLAG)? true : false;
}

static inline bool has_clean_session(struct client_usr *cl_usr)
{
        return (cl_usr->flags & CLEAN_SESS_FLAG)? true : false;
}

static inline bool is_assigned(struct client_usr *cl_usr)
{
        return (cl_usr->flags & ASSIGNMENT_FLAG)? true : false;
}

static inline void set_connect_state(struct client_usr *cl_usr,
                                     bool connected)
{
        if(connected)
                cl_usr->flags |=  MQ_CONNECT_FLAG;
        else 
                cl_usr->flags &= ~MQ_CONNECT_FLAG;
}

static inline void set_clean_session(struct client_usr *cl_usr,
                                     bool clean_session)
{
        if(clean_session)
                cl_usr->flags |=  CLEAN_SESS_FLAG;
        else
                cl_usr->flags &= ~CLEAN_SESS_FLAG;
}

static inline void set_assigned(struct client_usr *cl_usr,
                                bool assignment)
{
        if(assignment)
                cl_usr->flags |=  ASSIGNMENT_FLAG;
        else
                cl_usr->flags &= ~ASSIGNMENT_FLAG;
}

static void cl_usr_reset(struct client_usr *cl_usr)
{
        cl_usr->ctx  = NULL;
        cl_usr->app  = NULL;
        cl_usr->will = NULL;

        cl_usr->n_sub = 0;
        cl_usr->flags = 0;
        
        cl_usr->client_id[0] = '\0';

        qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
        qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
}

static void cl_usr_init(void)
{
        int32_t idx = 0;
        for(idx = 0; idx < MAX_CLIENTS; idx++) {
                struct client_usr *cl_usr = users + idx;
                cl_usr->index =  idx;

                cl_usr_reset(cl_usr);
        }
}

static void cl_usr_free(struct client_usr *cl_usr)
{
        cl_usr_reset(cl_usr);
        cl_usr->flags &= ~(MQ_CONNECT_FLAG | 
                           CLEAN_SESS_FLAG |
                           ASSIGNMENT_FLAG);
}

void cl_sub_count_add(void *usr_cl)
{
        struct client_usr *cl_usr = (struct client_usr*) usr_cl;

        if(is_connected(cl_usr)) {
                cl_usr->n_sub++;
                USR_INFO("%s has added a new sub, now total is %u\n\r",
                         cl_usr->client_id, cl_usr->n_sub);

        }

        return;
}

void cl_sub_count_del(void *usr_cl)
{
        struct client_usr *cl_usr = (struct client_usr*) usr_cl;

        if(is_connected(cl_usr)) {
                cl_usr->n_sub--;
                USR_INFO("%s has deleted a sub, now total is %u\n\r",
                         cl_usr->client_id, cl_usr->n_sub);
        }

        return;
}

/*----------------------------------------------------------------------------
 * QoS2 PUB RX Message handling mechanism and associated house-keeping
 *--------------------------------------------------------------------------*/
static inline bool qos2_pub_rx_logup(struct client_usr *cl_usr, uint16_t msg_id)
{
        return qos2_pub_cq_logup(&cl_usr->qos2_rx_cq, msg_id);
}

static inline bool ack2_msg_id_logup(struct client_usr *cl_usr, uint16_t msg_id)
{
        return qos2_pub_cq_logup(&cl_usr->qos2_tx_cq, msg_id);
}

static inline bool qos2_pub_rx_unlog(struct client_usr *cl_usr, uint16_t msg_id)
{
        return qos2_pub_cq_unlog(&cl_usr->qos2_rx_cq, msg_id);
}

static inline bool ack2_msg_id_unlog(struct client_usr *cl_usr, uint16_t msg_id)
{
        return qos2_pub_cq_unlog(&cl_usr->qos2_tx_cq, msg_id);
}

static inline bool qos2_pub_rx_is_done(struct client_usr *cl_usr, uint16_t msg_id)
{
        return qos2_pub_cq_check(&cl_usr->qos2_rx_cq, msg_id);
}

bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id)
{
        struct client_usr *cl_usr = (struct client_usr*) usr_cl;

        return cl_usr && (qos2_pub_rx_is_done(cl_usr, msg_id) ||
                          qos2_pub_rx_logup(cl_usr, msg_id));
}

static void ack2_msg_id_dispatch(struct client_usr *cl_usr)
{
        struct pub_qos2_cq *tx_cq = &cl_usr->qos2_tx_cq;
        uint8_t rd_idx = tx_cq->rd_idx;
        uint8_t n_free = tx_cq->n_free;
        uint8_t i = 0;

        for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free); i++) {
                if(mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, MQTT_QOS1,
                                    true, tx_cq->id_vec[i]) <= 0)
                        break;
        }

        return;
}

static void ack2_msg_id_purge(struct client_usr *cl_usr)
{
        qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
        qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
}

/*

*/
static struct mqtt_ack_wlist  wl_qos_ack1 = {NULL, NULL};
static struct mqtt_ack_wlist *qos_ack1_wl = &wl_qos_ack1;

/*

*/
static struct mqtt_ack_wlist  wl_mqp_sess = {NULL, NULL};
static struct mqtt_ack_wlist *sess_mqp_wl = &wl_mqp_sess;

#define MQP_CL_MAP_GET(mqp)         (mqp->private_)
#define MQP_CL_MAP_SET(mqp, cl_map) (mqp->private_ |=  cl_map)
#define MQP_CL_MAP_CLR(mqp, cl_map) (mqp->private_ &= ~cl_map)

#define CL_BMAP(cl_usr)             (1 << cl_usr->index)

static inline
int32_t _pub_dispatch(struct client_usr *cl_usr, struct mqtt_packet *mqp,
                 bool dup)
{
        /* Error, if any, is handled in 'cl_net_close()' ....*/
        return mqtt_server_pub_dispatch(cl_usr->ctx, mqp, dup);
}

static void ack1_wl_mqp_dispatch(struct client_usr *cl_usr)
{
        struct mqtt_packet *mqp = NULL;

        for(mqp = qos_ack1_wl->head; NULL != mqp; mqp = mqp->next)
                if(MQP_CL_MAP_GET(mqp) & CL_BMAP(cl_usr))
                        _pub_dispatch(cl_usr, mqp, true);
}

#if  0
static struct client_usr *find_cl_usr(uint32_t index)
{
        struct client_usr *cl_usr = users + 0;

        int32_t i = 0;
        for(i = 0; i < MAX_CLIENTS; i++, cl_usr++) {
                if(index == cl_usr->index)
                        break;
        }

        return (MAX_CLIENTS == i)? NULL : cl_usr;
}
#endif

#define IS_CL_USR_FREE(cl_usr) ((  0  == cl_usr->flags)        &&       \
                                ('\0' == cl_usr->client_id[0]))

#define IS_CL_INACTIVE(cl_usr) ((  0  == cl_usr->flags)        &&       \
                                ('\0' != cl_usr->client_id[0]))

#define NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr)                          \
        ((((MQTT_QOS1 == qos) && has_clean_session(cl_usr)) ||          \
          ((MQTT_QOS0 == qos)))?                                        \
         false : true)

static inline uint32_t _cl_bmap_get(void *usr_cl)
{
        struct client_usr *cl_usr = (struct client_usr*) usr_cl;

        return IS_CL_USR_FREE(cl_usr)? 0 : CL_BMAP(cl_usr);
}

uint32_t cl_bmap_get(void *usr_cl)
{
        return usr_cl? _cl_bmap_get(usr_cl) : 0;
}

void *cl_app_hndl_get(void *usr_cl)
{
        struct client_usr *cl_usr = (struct client_usr*) usr_cl;

        return (cl_usr && is_connected((client_usr*) usr_cl))? cl_usr->app : NULL;
}

void *cl_will_hndl_get(void *usr_cl)
{
        struct client_usr *cl_usr = (struct client_usr*) usr_cl;

        return (cl_usr && is_connected((client_usr*)usr_cl))? cl_usr->will : NULL;
}

static void pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
{
        //uint32_t n_bits = sizeof(uint32_t) << 3;    /* Multiply by 8 */
        uint32_t sp_map = 0; /* client Map for sessions present */
        enum mqtt_qos qos = ENUM_QOS(mqp->fh_byte1);/* QOS */
        const uint32_t cl_ref = cl_map; /* Keep ref to original */

        int32_t i = 0;
        for(i = 0; i < MAX_CLIENTS; i++) {
                if(cl_map & (1 << i)) {
                        struct client_usr *cl_usr = users + i; //find_cl_usr(i);
                        if(is_connected(cl_usr))
                                if((_pub_dispatch(cl_usr, mqp, false) > 0)  &&
                                   NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr))
                                        continue;/* Processing done; next CL */

                        /* CL: unconnected / PUB Err / QOS1 PKT (clean sess) */
                        cl_map &= ~(1 << i); 

                        if(IS_CL_INACTIVE(cl_usr)) 
                                sp_map |= (1 << i);  /* CL: Maintain session */
                }
        }

        if(sp_map) {
                struct mqtt_packet *cpy = mqp_server_copy(mqp); /* Make copy */
                if(cpy) {
                        MQP_CL_MAP_SET(cpy, sp_map);
                        mqp_ack_wlist_append(sess_mqp_wl, cpy);
                } else 
                        sp_map = 0;
        }

        if(cl_map) {                                    /* Wait List Publish */
                MQP_CL_MAP_SET(mqp, cl_map);
                mqp_ack_wlist_append(qos_ack1_wl, mqp);
        } else
                mqp_free(mqp); /* PUB MQP now has no more use; must be freed */

        USR_INFO("PUBLISH: CL Map(0x%08x): For Ack 0x%08x, Inactive 0x%08x\n\r",
                 cl_ref, cl_map, sp_map);

        return;
}

void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
{
        pub_dispatch(cl_map, mqp);
        return;
}

/* Move to mqtt_common.h and remove it from mqtt_client.h */
static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp)
{
        mqp_free(mqp);
        return MQP_ERR_PKT_LEN;
}

/* Move this to a common file */
int32_t cl_pub_msg_send(void *usr_cl,
                    const struct utf8_string *topic, const uint8_t *data_buf,
                    uint32_t data_len, enum mqtt_qos qos, bool retain)
{
        struct mqtt_packet *mqp = NULL;

        if((NULL == topic)                        ||
           ((data_len > 0) && (NULL == data_buf)) ||
           (NULL == usr_cl))
                return MQP_ERR_FNPARAM;

        mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len);
        if(NULL == mqp)
                return MQP_ERR_PKT_AVL;

        if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) ||
           (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) 
                return len_err_free_mqp(mqp);
 
        mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain));

        pub_dispatch(_cl_bmap_get(usr_cl), mqp);

        return MQP_CONTENT_LEN(mqp);
}

static void wl_remove(struct mqtt_ack_wlist *list,
                      struct mqtt_packet *prev,
                      struct mqtt_packet *elem)
{
        if(prev)
                prev->next = elem->next;
        else
                list->head = elem->next;

        if(NULL == list->head)
                list->tail = NULL;

        if(list->tail == elem)
                list->tail = prev;

        return;
}

static void sess_wl_mqp_dispatch(struct client_usr *cl_usr)
{
        struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
        uint32_t cl_map = CL_BMAP(cl_usr);

        for(mqp = sess_mqp_wl->head; NULL != mqp; prev = mqp, mqp = next) {
                struct mqtt_packet *cpy = NULL;

                if(0 == (MQP_CL_MAP_GET(mqp) & cl_map))
                        continue; /* MQP & CL: no association */

                /* MQP has associated client(s) -  process it */

                /* Dissociate this client from MQP */
                MQP_CL_MAP_CLR(mqp, cl_map);
                next = mqp->next; /* House keeping */

                if(0 == MQP_CL_MAP_GET(mqp)) {
                        /* MQP w/ no clients,  remove from WL */
                        wl_remove(sess_mqp_wl, prev, mqp);

                        /* Ensure correct arrangement for WL  */
                        cpy = mqp; mqp = prev;
                } else {
                        /* MQP is associated w/ other clients */
                        cpy = mqp_server_copy(mqp);
                        if(NULL == cpy)
                                continue;
                }

                /* Got packet from session, dispatch it to CL */
                pub_dispatch(cl_map, cpy);
        }

        return;
}

static
struct mqtt_packet *wl_mqp_unmap_find(struct mqtt_ack_wlist *wl,
                                      struct client_usr *cl_usr, uint16_t msg_id,
                                      struct mqtt_packet **prev)
{
        struct mqtt_packet *mqp = NULL;

        *prev = NULL;
        for(mqp = wl->head; NULL != mqp; *prev = mqp, mqp = mqp->next) {
                if(msg_id == mqp->msg_id) {
                        /* Found a MQP whose msg_id matches with input */
                        MQP_CL_MAP_CLR(mqp, CL_BMAP(cl_usr));
                        return mqp;
                }
        }

        return NULL;
}

static bool wl_rmfree_try(struct mqtt_ack_wlist *wl, struct mqtt_packet *prev,
                          struct mqtt_packet *mqp)
{
        if(0 == MQP_CL_MAP_GET(mqp)) {
                wl_remove(wl, prev, mqp);
                mqp_free(mqp);

                return true;
        }

        return false;
}

static bool ack1_unmap_rmfree_try(struct client_usr *cl_usr, uint16_t msg_id)
{
        struct mqtt_packet *prev = NULL, *mqp = NULL;

        if(false == has_clean_session(cl_usr)) {
                mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
                if(mqp)
                        wl_rmfree_try(qos_ack1_wl, prev, mqp);
        }

        return true;
}

static 
void wl_purge(struct mqtt_ack_wlist *wl, struct client_usr *cl_usr)
{
        struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
        uint32_t bmap = CL_BMAP(cl_usr);

        for(mqp = wl->head; NULL != mqp; prev = mqp, mqp = next) {
                next = mqp->next;

                /* Ideally, check should be done to see if cl_usr and mqp are
                   associated. If yes, then the bit should be cleared. At the
                   moment, blind clearing of the bit has been implemented and
                   it has no side effects.
                */
                MQP_CL_MAP_CLR(mqp, bmap);

                /* MQP with no clients has no more use, so try deleting MQP */
                if(wl_rmfree_try(wl, prev, mqp))
                        mqp = prev; /* MQP deleted, prep for next iteration */
        }

        return;
}

static void ack1_wl_mqp_purge(struct client_usr *cl_usr)
{
        wl_purge(qos_ack1_wl, cl_usr);
}

static void sess_wl_mqp_purge(struct client_usr *cl_usr)
{
        wl_purge(sess_mqp_wl, cl_usr);
}

static void session_resume(struct client_usr *cl_usr)
{
        ack1_wl_mqp_dispatch(cl_usr);
        ack2_msg_id_dispatch(cl_usr);
        sess_wl_mqp_dispatch(cl_usr);
}

static void session_delete(struct client_usr *cl_usr)
{
        ack1_wl_mqp_purge(cl_usr);
        ack2_msg_id_purge(cl_usr);
        sess_wl_mqp_purge(cl_usr);
}

static bool has_session_data(struct client_usr *cl_usr)
{
        uint32_t map = CL_BMAP(cl_usr);
        struct mqtt_packet *elem;

        if(cl_usr->n_sub)
                return true;

        elem = qos_ack1_wl->head;
        while(elem) {
                if(MQP_CL_MAP_GET(elem) & map) 
                        return true;

                elem = elem->next;
        }

        elem = sess_mqp_wl->head;
        while(elem) {
                if(MQP_CL_MAP_GET(elem) & map) 
                        return true;

                elem = elem->next;
        }

        return false;
}

bool cl_can_session_delete(void *usr_cl)
{
        struct client_usr *cl_usr = (struct client_usr*) usr_cl;

        return cl_usr? (has_clean_session(cl_usr) ||
                        !has_session_data(cl_usr)) : false;
}

void cl_on_net_close(void *usr_cl)
{
        struct client_usr *cl_usr = (struct client_usr*) usr_cl;

        if(is_assigned(cl_usr)) {
                if(false == has_session_data(cl_usr))
                        cl_usr_free(cl_usr);
        } else if(has_clean_session(cl_usr)) {
                session_delete(cl_usr);
                cl_usr_free(cl_usr);
        } else {
                set_connect_state(cl_usr, false);
                cl_usr->ctx = NULL;
                cl_usr->app = NULL;
        }

        return;
}

static bool proc_pub_rel_rx(struct client_usr *cl_usr, uint16_t msg_id)
{
        mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBCOMP,
                         MQTT_QOS0, true, msg_id);

        if(qos2_pub_rx_is_done(cl_usr, msg_id))
                qos2_pub_rx_unlog(cl_usr, msg_id);

        return true;
}

static bool proc_pub_rec_rx(struct client_usr *cl_usr, uint16_t msg_id)
{
        struct mqtt_packet *prev = NULL, *mqp = NULL;

        mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
        if(mqp && ack2_msg_id_logup(cl_usr, msg_id)) {

                wl_rmfree_try(qos_ack1_wl, prev, mqp);

                mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL,
                                 MQTT_QOS1, true, msg_id);

                return true;
        }

        return false;
}

bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id)
{
        struct client_usr *cl_usr = (struct client_usr*) usr_cl;
        bool rv = false;

        if(NULL == cl_usr)
                return rv;

        switch(msg_type) {

        case MQTT_PUBACK:
                rv = ack1_unmap_rmfree_try(cl_usr, msg_id);
                break;

        case MQTT_PUBREC:
                rv = proc_pub_rec_rx(cl_usr, msg_id);
                break;

        case MQTT_PUBREL:
                rv = proc_pub_rel_rx(cl_usr, msg_id);
                break;

        case MQTT_PUBCOMP:
                rv = ack2_msg_id_unlog(cl_usr, msg_id);
                break;

        default:
                break;
        }

        return rv;
}

static void assign_cl_index_as_id(struct client_usr *cl_usr)
{
        /* TBD: need a better implementation */
        char *client_id = cl_usr->client_id;

        client_id[0] = 'S';
        client_id[1] = 'e';
        client_id[2] = 'l';
        client_id[3] = 'f';
        client_id[4] = '-';
        client_id[5] = ((cl_usr->index & 0xf0) >> 4) + 0x30;
        client_id[6] = ((cl_usr->index & 0x0f))      + 0x30;
        client_id[7] = '\0';

        /* Make sure that above array size is with in MAX_CLIENT_ID_LEN */

        return;
}

static struct client_usr *assign_cl_usr(char *client_id, uint8_t *vh_buf)
{
        struct client_usr *cl_usr, *fr_usr = NULL;
        int32_t idx = 0;
        for(idx = 0; idx < MAX_CLIENTS; idx++) {
                cl_usr = users + idx;
                if((NULL == fr_usr) && IS_CL_USR_FREE(cl_usr))
                        fr_usr = cl_usr;  /* Note 1st free cl_usr */

                if((NULL == client_id) && (NULL != fr_usr)) {
                        /* Free cl_usr is present to create CL-ID */
                        break;

                } else if((NULL != client_id) &&
                          (0 == strncmp(cl_usr->client_id, client_id,
                                        MAX_CLIENT_ID_LEN))) {
                        /* Found a client obj with exact ID match */
                        if(is_connected(cl_usr)) {
                                /* Error: CL-ID is already active */
                                vh_buf[1] = CONNACK_RC_CLI_REJECT;
                                cl_usr    = NULL;
                        }
                        break;
                }
        }

        if(idx == MAX_CLIENTS) {          /* Did not find a match */
                cl_usr   = fr_usr;
                if(NULL == cl_usr)
                        /* Server Resource unavailable */
                        vh_buf[1] = CONNACK_RC_SVR_UNAVBL;
        }

        if(NULL != cl_usr) {
                if(NULL == client_id)
                        assign_cl_index_as_id(cl_usr); /* Get ID */
                else if(IS_CL_USR_FREE(cl_usr))
                        strncpy(cl_usr->client_id, client_id,
                                MAX_CLIENT_ID_LEN);

                set_assigned(cl_usr, true);
        }

        USR_INFO("Assignment of cl_usr %s (detail: index %d and name %s) \n\r",
                 cl_usr? "Succeeded" : "Failed", cl_usr? (int32_t)cl_usr->index : -1,
                 cl_usr? cl_usr->client_id : "");

        return cl_usr;
}

uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id,
                  void *app_cl, void *will, void **usr_cl)
{
        uint8_t vh_buf[] = {0x00, CONNACK_RC_REQ_ACCEPT};
        struct client_usr *cl_usr;

        if(client_id && ('\0' == client_id[0]))
                /* Shouldn't happen: CLIENT ID with a NUL only string */
                return CONNACK_RC_CLI_REJECT;

        cl_usr = assign_cl_usr(client_id, vh_buf);
        if(NULL == cl_usr)
                return vh_buf[1]; /* Use vh_buf from assign_cl_usr() */

        if((false == clean_session) && has_session_data(cl_usr))
                vh_buf[0] = 0x01; /* Set Session Present Flag */

#if  0
        if(0x00 == vh_buf[1])
                *usr_cl = (void*)cl_usr;
#endif
        *usr_cl = (void*)cl_usr;

        cl_usr->ctx  = ctx_cl;
        cl_usr->app  = app_cl;
        cl_usr->will = will;

        return ((vh_buf[0] << 8) | vh_buf[1]);
}

void cl_on_connack_send(void *usr_cl, bool clean_session)
{
        struct client_usr *cl_usr = (struct client_usr*) usr_cl;

        if(false == is_assigned(cl_usr)) {
                USR_INFO("Fatal: CONNACK for unassigned cl_usr Id %u, abort\n\r",
                         cl_usr->index);
                return;
        }

        set_assigned(cl_usr, false);
        set_clean_session(cl_usr, clean_session);
        set_connect_state(cl_usr, true);

        if(clean_session) {
                session_delete(cl_usr);
        } else {
                session_resume(cl_usr);
        }

        DBG_INFO("Server is now connected to client %s\n\r", cl_usr->client_id);

        return;
}

int32_t cl_mgmt_init(void)
{
        cl_usr_init();

        return 0;
}

}//namespace mbed_mqtt