Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: mqtt_V1 cc3100_Test_mqtt_CM3
Diff: client_mgmt.cpp
- Revision:
- 0:547251f42a60
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/client_mgmt.cpp Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,790 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#include "server_util.h"
+#include "client_mgmt.h"
+#include "server_pkts.h"
+
+namespace mbed_mqtt {
+
+#ifndef CFG_SR_MAX_CL_ID_SIZE
+#define MAX_CLIENT_ID_LEN 64
+#else
+#define MAX_CLIENT_ID_LEN CFG_SR_MAX_CL_ID_SIZE
+#endif
+
+#ifndef CFG_SR_MAX_NUM_CLIENT
+#define MAX_CLIENTS 16 /* Must not exceed 32 */
+#else
+#define MAX_CLIENTS CFG_SR_MAX_NUM_CLIENT
+
+#if (CFG_SR_MAX_NUM_CLIENT > 32)
+#error "CFG_SR_MAX_NUM_CLIENT must not exceed 32"
+#endif
+#endif
+
+static struct client_usr {
+
+ void *ctx; /* Client net */
+ void *app; /* Client app */
+
+ void *will; /* Will topic */
+ uint32_t index;
+ uint32_t n_sub;
+
+#define MQ_CONNECT_FLAG 0x00000001
+#define CLEAN_SESS_FLAG 0x00000002
+#define ASSIGNMENT_FLAG 0x00000004 /* No state or connected */
+
+ uint32_t flags;
+
+ char client_id[MAX_CLIENT_ID_LEN];
+
+ struct pub_qos2_cq qos2_rx_cq;
+ struct pub_qos2_cq qos2_tx_cq;
+
+} users[MAX_CLIENTS];
+
+// TBD consistency between inline and macro functions
+
+static inline bool is_connected(struct client_usr *cl_usr)
+{
+ return (cl_usr->flags & MQ_CONNECT_FLAG)? true : false;
+}
+
+static inline bool has_clean_session(struct client_usr *cl_usr)
+{
+ return (cl_usr->flags & CLEAN_SESS_FLAG)? true : false;
+}
+
+static inline bool is_assigned(struct client_usr *cl_usr)
+{
+ return (cl_usr->flags & ASSIGNMENT_FLAG)? true : false;
+}
+
+static inline void set_connect_state(struct client_usr *cl_usr,
+ bool connected)
+{
+ if(connected)
+ cl_usr->flags |= MQ_CONNECT_FLAG;
+ else
+ cl_usr->flags &= ~MQ_CONNECT_FLAG;
+}
+
+static inline void set_clean_session(struct client_usr *cl_usr,
+ bool clean_session)
+{
+ if(clean_session)
+ cl_usr->flags |= CLEAN_SESS_FLAG;
+ else
+ cl_usr->flags &= ~CLEAN_SESS_FLAG;
+}
+
+static inline void set_assigned(struct client_usr *cl_usr,
+ bool assignment)
+{
+ if(assignment)
+ cl_usr->flags |= ASSIGNMENT_FLAG;
+ else
+ cl_usr->flags &= ~ASSIGNMENT_FLAG;
+}
+
+static void cl_usr_reset(struct client_usr *cl_usr)
+{
+ cl_usr->ctx = NULL;
+ cl_usr->app = NULL;
+ cl_usr->will = NULL;
+
+ cl_usr->n_sub = 0;
+ cl_usr->flags = 0;
+
+ cl_usr->client_id[0] = '\0';
+
+ qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
+ qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
+}
+
+static void cl_usr_init(void)
+{
+ int32_t idx = 0;
+ for(idx = 0; idx < MAX_CLIENTS; idx++) {
+ struct client_usr *cl_usr = users + idx;
+ cl_usr->index = idx;
+
+ cl_usr_reset(cl_usr);
+ }
+}
+
+static void cl_usr_free(struct client_usr *cl_usr)
+{
+ cl_usr_reset(cl_usr);
+ cl_usr->flags &= ~(MQ_CONNECT_FLAG |
+ CLEAN_SESS_FLAG |
+ ASSIGNMENT_FLAG);
+}
+
+void cl_sub_count_add(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ if(is_connected(cl_usr)) {
+ cl_usr->n_sub++;
+ USR_INFO("%s has added a new sub, now total is %u\n\r",
+ cl_usr->client_id, cl_usr->n_sub);
+
+ }
+
+ return;
+}
+
+void cl_sub_count_del(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ if(is_connected(cl_usr)) {
+ cl_usr->n_sub--;
+ USR_INFO("%s has deleted a sub, now total is %u\n\r",
+ cl_usr->client_id, cl_usr->n_sub);
+ }
+
+ return;
+}
+
+/*----------------------------------------------------------------------------
+ * QoS2 PUB RX Message handling mechanism and associated house-keeping
+ *--------------------------------------------------------------------------*/
+static inline bool qos2_pub_rx_logup(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ return qos2_pub_cq_logup(&cl_usr->qos2_rx_cq, msg_id);
+}
+
+static inline bool ack2_msg_id_logup(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ return qos2_pub_cq_logup(&cl_usr->qos2_tx_cq, msg_id);
+}
+
+static inline bool qos2_pub_rx_unlog(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ return qos2_pub_cq_unlog(&cl_usr->qos2_rx_cq, msg_id);
+}
+
+static inline bool ack2_msg_id_unlog(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ return qos2_pub_cq_unlog(&cl_usr->qos2_tx_cq, msg_id);
+}
+
+static inline bool qos2_pub_rx_is_done(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ return qos2_pub_cq_check(&cl_usr->qos2_rx_cq, msg_id);
+}
+
+bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ return cl_usr && (qos2_pub_rx_is_done(cl_usr, msg_id) ||
+ qos2_pub_rx_logup(cl_usr, msg_id));
+}
+
+static void ack2_msg_id_dispatch(struct client_usr *cl_usr)
+{
+ struct pub_qos2_cq *tx_cq = &cl_usr->qos2_tx_cq;
+ uint8_t rd_idx = tx_cq->rd_idx;
+ uint8_t n_free = tx_cq->n_free;
+ uint8_t i = 0;
+
+ for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free); i++) {
+ if(mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, MQTT_QOS1,
+ true, tx_cq->id_vec[i]) <= 0)
+ break;
+ }
+
+ return;
+}
+
+static void ack2_msg_id_purge(struct client_usr *cl_usr)
+{
+ qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
+ qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
+}
+
+/*
+
+*/
+static struct mqtt_ack_wlist wl_qos_ack1 = {NULL, NULL};
+static struct mqtt_ack_wlist *qos_ack1_wl = &wl_qos_ack1;
+
+/*
+
+*/
+static struct mqtt_ack_wlist wl_mqp_sess = {NULL, NULL};
+static struct mqtt_ack_wlist *sess_mqp_wl = &wl_mqp_sess;
+
+#define MQP_CL_MAP_GET(mqp) (mqp->private_)
+#define MQP_CL_MAP_SET(mqp, cl_map) (mqp->private_ |= cl_map)
+#define MQP_CL_MAP_CLR(mqp, cl_map) (mqp->private_ &= ~cl_map)
+
+#define CL_BMAP(cl_usr) (1 << cl_usr->index)
+
+static inline
+int32_t _pub_dispatch(struct client_usr *cl_usr, struct mqtt_packet *mqp,
+ bool dup)
+{
+ /* Error, if any, is handled in 'cl_net_close()' ....*/
+ return mqtt_server_pub_dispatch(cl_usr->ctx, mqp, dup);
+}
+
+static void ack1_wl_mqp_dispatch(struct client_usr *cl_usr)
+{
+ struct mqtt_packet *mqp = NULL;
+
+ for(mqp = qos_ack1_wl->head; NULL != mqp; mqp = mqp->next)
+ if(MQP_CL_MAP_GET(mqp) & CL_BMAP(cl_usr))
+ _pub_dispatch(cl_usr, mqp, true);
+}
+
+#if 0
+static struct client_usr *find_cl_usr(uint32_t index)
+{
+ struct client_usr *cl_usr = users + 0;
+
+ int32_t i = 0;
+ for(i = 0; i < MAX_CLIENTS; i++, cl_usr++) {
+ if(index == cl_usr->index)
+ break;
+ }
+
+ return (MAX_CLIENTS == i)? NULL : cl_usr;
+}
+#endif
+
+#define IS_CL_USR_FREE(cl_usr) (( 0 == cl_usr->flags) && \
+ ('\0' == cl_usr->client_id[0]))
+
+#define IS_CL_INACTIVE(cl_usr) (( 0 == cl_usr->flags) && \
+ ('\0' != cl_usr->client_id[0]))
+
+#define NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr) \
+ ((((MQTT_QOS1 == qos) && has_clean_session(cl_usr)) || \
+ ((MQTT_QOS0 == qos)))? \
+ false : true)
+
+static inline uint32_t _cl_bmap_get(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ return IS_CL_USR_FREE(cl_usr)? 0 : CL_BMAP(cl_usr);
+}
+
+uint32_t cl_bmap_get(void *usr_cl)
+{
+ return usr_cl? _cl_bmap_get(usr_cl) : 0;
+}
+
+void *cl_app_hndl_get(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ return (cl_usr && is_connected((client_usr*) usr_cl))? cl_usr->app : NULL;
+}
+
+void *cl_will_hndl_get(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ return (cl_usr && is_connected((client_usr*)usr_cl))? cl_usr->will : NULL;
+}
+
+static void pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
+{
+ //uint32_t n_bits = sizeof(uint32_t) << 3; /* Multiply by 8 */
+ uint32_t sp_map = 0; /* client Map for sessions present */
+ enum mqtt_qos qos = ENUM_QOS(mqp->fh_byte1);/* QOS */
+ const uint32_t cl_ref = cl_map; /* Keep ref to original */
+
+ int32_t i = 0;
+ for(i = 0; i < MAX_CLIENTS; i++) {
+ if(cl_map & (1 << i)) {
+ struct client_usr *cl_usr = users + i; //find_cl_usr(i);
+ if(is_connected(cl_usr))
+ if((_pub_dispatch(cl_usr, mqp, false) > 0) &&
+ NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr))
+ continue;/* Processing done; next CL */
+
+ /* CL: unconnected / PUB Err / QOS1 PKT (clean sess) */
+ cl_map &= ~(1 << i);
+
+ if(IS_CL_INACTIVE(cl_usr))
+ sp_map |= (1 << i); /* CL: Maintain session */
+ }
+ }
+
+ if(sp_map) {
+ struct mqtt_packet *cpy = mqp_server_copy(mqp); /* Make copy */
+ if(cpy) {
+ MQP_CL_MAP_SET(cpy, sp_map);
+ mqp_ack_wlist_append(sess_mqp_wl, cpy);
+ } else
+ sp_map = 0;
+ }
+
+ if(cl_map) { /* Wait List Publish */
+ MQP_CL_MAP_SET(mqp, cl_map);
+ mqp_ack_wlist_append(qos_ack1_wl, mqp);
+ } else
+ mqp_free(mqp); /* PUB MQP now has no more use; must be freed */
+
+ USR_INFO("PUBLISH: CL Map(0x%08x): For Ack 0x%08x, Inactive 0x%08x\n\r",
+ cl_ref, cl_map, sp_map);
+
+ return;
+}
+
+void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
+{
+ pub_dispatch(cl_map, mqp);
+ return;
+}
+
+/* Move to mqtt_common.h and remove it from mqtt_client.h */
+static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp)
+{
+ mqp_free(mqp);
+ return MQP_ERR_PKT_LEN;
+}
+
+/* Move this to a common file */
+int32_t cl_pub_msg_send(void *usr_cl,
+ const struct utf8_string *topic, const uint8_t *data_buf,
+ uint32_t data_len, enum mqtt_qos qos, bool retain)
+{
+ struct mqtt_packet *mqp = NULL;
+
+ if((NULL == topic) ||
+ ((data_len > 0) && (NULL == data_buf)) ||
+ (NULL == usr_cl))
+ return MQP_ERR_FNPARAM;
+
+ mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len);
+ if(NULL == mqp)
+ return MQP_ERR_PKT_AVL;
+
+ if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) ||
+ (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len))))
+ return len_err_free_mqp(mqp);
+
+ mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain));
+
+ pub_dispatch(_cl_bmap_get(usr_cl), mqp);
+
+ return MQP_CONTENT_LEN(mqp);
+}
+
+static void wl_remove(struct mqtt_ack_wlist *list,
+ struct mqtt_packet *prev,
+ struct mqtt_packet *elem)
+{
+ if(prev)
+ prev->next = elem->next;
+ else
+ list->head = elem->next;
+
+ if(NULL == list->head)
+ list->tail = NULL;
+
+ if(list->tail == elem)
+ list->tail = prev;
+
+ return;
+}
+
+static void sess_wl_mqp_dispatch(struct client_usr *cl_usr)
+{
+ struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
+ uint32_t cl_map = CL_BMAP(cl_usr);
+
+ for(mqp = sess_mqp_wl->head; NULL != mqp; prev = mqp, mqp = next) {
+ struct mqtt_packet *cpy = NULL;
+
+ if(0 == (MQP_CL_MAP_GET(mqp) & cl_map))
+ continue; /* MQP & CL: no association */
+
+ /* MQP has associated client(s) - process it */
+
+ /* Dissociate this client from MQP */
+ MQP_CL_MAP_CLR(mqp, cl_map);
+ next = mqp->next; /* House keeping */
+
+ if(0 == MQP_CL_MAP_GET(mqp)) {
+ /* MQP w/ no clients, remove from WL */
+ wl_remove(sess_mqp_wl, prev, mqp);
+
+ /* Ensure correct arrangement for WL */
+ cpy = mqp; mqp = prev;
+ } else {
+ /* MQP is associated w/ other clients */
+ cpy = mqp_server_copy(mqp);
+ if(NULL == cpy)
+ continue;
+ }
+
+ /* Got packet from session, dispatch it to CL */
+ pub_dispatch(cl_map, cpy);
+ }
+
+ return;
+}
+
+static
+struct mqtt_packet *wl_mqp_unmap_find(struct mqtt_ack_wlist *wl,
+ struct client_usr *cl_usr, uint16_t msg_id,
+ struct mqtt_packet **prev)
+{
+ struct mqtt_packet *mqp = NULL;
+
+ *prev = NULL;
+ for(mqp = wl->head; NULL != mqp; *prev = mqp, mqp = mqp->next) {
+ if(msg_id == mqp->msg_id) {
+ /* Found a MQP whose msg_id matches with input */
+ MQP_CL_MAP_CLR(mqp, CL_BMAP(cl_usr));
+ return mqp;
+ }
+ }
+
+ return NULL;
+}
+
+static bool wl_rmfree_try(struct mqtt_ack_wlist *wl, struct mqtt_packet *prev,
+ struct mqtt_packet *mqp)
+{
+ if(0 == MQP_CL_MAP_GET(mqp)) {
+ wl_remove(wl, prev, mqp);
+ mqp_free(mqp);
+
+ return true;
+ }
+
+ return false;
+}
+
+static bool ack1_unmap_rmfree_try(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ struct mqtt_packet *prev = NULL, *mqp = NULL;
+
+ if(false == has_clean_session(cl_usr)) {
+ mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
+ if(mqp)
+ wl_rmfree_try(qos_ack1_wl, prev, mqp);
+ }
+
+ return true;
+}
+
+static
+void wl_purge(struct mqtt_ack_wlist *wl, struct client_usr *cl_usr)
+{
+ struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
+ uint32_t bmap = CL_BMAP(cl_usr);
+
+ for(mqp = wl->head; NULL != mqp; prev = mqp, mqp = next) {
+ next = mqp->next;
+
+ /* Ideally, check should be done to see if cl_usr and mqp are
+ associated. If yes, then the bit should be cleared. At the
+ moment, blind clearing of the bit has been implemented and
+ it has no side effects.
+ */
+ MQP_CL_MAP_CLR(mqp, bmap);
+
+ /* MQP with no clients has no more use, so try deleting MQP */
+ if(wl_rmfree_try(wl, prev, mqp))
+ mqp = prev; /* MQP deleted, prep for next iteration */
+ }
+
+ return;
+}
+
+static void ack1_wl_mqp_purge(struct client_usr *cl_usr)
+{
+ wl_purge(qos_ack1_wl, cl_usr);
+}
+
+static void sess_wl_mqp_purge(struct client_usr *cl_usr)
+{
+ wl_purge(sess_mqp_wl, cl_usr);
+}
+
+static void session_resume(struct client_usr *cl_usr)
+{
+ ack1_wl_mqp_dispatch(cl_usr);
+ ack2_msg_id_dispatch(cl_usr);
+ sess_wl_mqp_dispatch(cl_usr);
+}
+
+static void session_delete(struct client_usr *cl_usr)
+{
+ ack1_wl_mqp_purge(cl_usr);
+ ack2_msg_id_purge(cl_usr);
+ sess_wl_mqp_purge(cl_usr);
+}
+
+static bool has_session_data(struct client_usr *cl_usr)
+{
+ uint32_t map = CL_BMAP(cl_usr);
+ struct mqtt_packet *elem;
+
+ if(cl_usr->n_sub)
+ return true;
+
+ elem = qos_ack1_wl->head;
+ while(elem) {
+ if(MQP_CL_MAP_GET(elem) & map)
+ return true;
+
+ elem = elem->next;
+ }
+
+ elem = sess_mqp_wl->head;
+ while(elem) {
+ if(MQP_CL_MAP_GET(elem) & map)
+ return true;
+
+ elem = elem->next;
+ }
+
+ return false;
+}
+
+bool cl_can_session_delete(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ return cl_usr? (has_clean_session(cl_usr) ||
+ !has_session_data(cl_usr)) : false;
+}
+
+void cl_on_net_close(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ if(is_assigned(cl_usr)) {
+ if(false == has_session_data(cl_usr))
+ cl_usr_free(cl_usr);
+ } else if(has_clean_session(cl_usr)) {
+ session_delete(cl_usr);
+ cl_usr_free(cl_usr);
+ } else {
+ set_connect_state(cl_usr, false);
+ cl_usr->ctx = NULL;
+ cl_usr->app = NULL;
+ }
+
+ return;
+}
+
+static bool proc_pub_rel_rx(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBCOMP,
+ MQTT_QOS0, true, msg_id);
+
+ if(qos2_pub_rx_is_done(cl_usr, msg_id))
+ qos2_pub_rx_unlog(cl_usr, msg_id);
+
+ return true;
+}
+
+static bool proc_pub_rec_rx(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ struct mqtt_packet *prev = NULL, *mqp = NULL;
+
+ mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
+ if(mqp && ack2_msg_id_logup(cl_usr, msg_id)) {
+
+ wl_rmfree_try(qos_ack1_wl, prev, mqp);
+
+ mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL,
+ MQTT_QOS1, true, msg_id);
+
+ return true;
+ }
+
+ return false;
+}
+
+bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+ bool rv = false;
+
+ if(NULL == cl_usr)
+ return rv;
+
+ switch(msg_type) {
+
+ case MQTT_PUBACK:
+ rv = ack1_unmap_rmfree_try(cl_usr, msg_id);
+ break;
+
+ case MQTT_PUBREC:
+ rv = proc_pub_rec_rx(cl_usr, msg_id);
+ break;
+
+ case MQTT_PUBREL:
+ rv = proc_pub_rel_rx(cl_usr, msg_id);
+ break;
+
+ case MQTT_PUBCOMP:
+ rv = ack2_msg_id_unlog(cl_usr, msg_id);
+ break;
+
+ default:
+ break;
+ }
+
+ return rv;
+}
+
+static void assign_cl_index_as_id(struct client_usr *cl_usr)
+{
+ /* TBD: need a better implementation */
+ char *client_id = cl_usr->client_id;
+
+ client_id[0] = 'S';
+ client_id[1] = 'e';
+ client_id[2] = 'l';
+ client_id[3] = 'f';
+ client_id[4] = '-';
+ client_id[5] = ((cl_usr->index & 0xf0) >> 4) + 0x30;
+ client_id[6] = ((cl_usr->index & 0x0f)) + 0x30;
+ client_id[7] = '\0';
+
+ /* Make sure that above array size is with in MAX_CLIENT_ID_LEN */
+
+ return;
+}
+
+static struct client_usr *assign_cl_usr(char *client_id, uint8_t *vh_buf)
+{
+ struct client_usr *cl_usr, *fr_usr = NULL;
+ int32_t idx = 0;
+ for(idx = 0; idx < MAX_CLIENTS; idx++) {
+ cl_usr = users + idx;
+ if((NULL == fr_usr) && IS_CL_USR_FREE(cl_usr))
+ fr_usr = cl_usr; /* Note 1st free cl_usr */
+
+ if((NULL == client_id) && (NULL != fr_usr)) {
+ /* Free cl_usr is present to create CL-ID */
+ break;
+
+ } else if((NULL != client_id) &&
+ (0 == strncmp(cl_usr->client_id, client_id,
+ MAX_CLIENT_ID_LEN))) {
+ /* Found a client obj with exact ID match */
+ if(is_connected(cl_usr)) {
+ /* Error: CL-ID is already active */
+ vh_buf[1] = CONNACK_RC_CLI_REJECT;
+ cl_usr = NULL;
+ }
+ break;
+ }
+ }
+
+ if(idx == MAX_CLIENTS) { /* Did not find a match */
+ cl_usr = fr_usr;
+ if(NULL == cl_usr)
+ /* Server Resource unavailable */
+ vh_buf[1] = CONNACK_RC_SVR_UNAVBL;
+ }
+
+ if(NULL != cl_usr) {
+ if(NULL == client_id)
+ assign_cl_index_as_id(cl_usr); /* Get ID */
+ else if(IS_CL_USR_FREE(cl_usr))
+ strncpy(cl_usr->client_id, client_id,
+ MAX_CLIENT_ID_LEN);
+
+ set_assigned(cl_usr, true);
+ }
+
+ USR_INFO("Assignment of cl_usr %s (detail: index %d and name %s) \n\r",
+ cl_usr? "Succeeded" : "Failed", cl_usr? (int32_t)cl_usr->index : -1,
+ cl_usr? cl_usr->client_id : "");
+
+ return cl_usr;
+}
+
+uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id,
+ void *app_cl, void *will, void **usr_cl)
+{
+ uint8_t vh_buf[] = {0x00, CONNACK_RC_REQ_ACCEPT};
+ struct client_usr *cl_usr;
+
+ if(client_id && ('\0' == client_id[0]))
+ /* Shouldn't happen: CLIENT ID with a NUL only string */
+ return CONNACK_RC_CLI_REJECT;
+
+ cl_usr = assign_cl_usr(client_id, vh_buf);
+ if(NULL == cl_usr)
+ return vh_buf[1]; /* Use vh_buf from assign_cl_usr() */
+
+ if((false == clean_session) && has_session_data(cl_usr))
+ vh_buf[0] = 0x01; /* Set Session Present Flag */
+
+#if 0
+ if(0x00 == vh_buf[1])
+ *usr_cl = (void*)cl_usr;
+#endif
+ *usr_cl = (void*)cl_usr;
+
+ cl_usr->ctx = ctx_cl;
+ cl_usr->app = app_cl;
+ cl_usr->will = will;
+
+ return ((vh_buf[0] << 8) | vh_buf[1]);
+}
+
+void cl_on_connack_send(void *usr_cl, bool clean_session)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ if(false == is_assigned(cl_usr)) {
+ USR_INFO("Fatal: CONNACK for unassigned cl_usr Id %u, abort\n\r",
+ cl_usr->index);
+ return;
+ }
+
+ set_assigned(cl_usr, false);
+ set_clean_session(cl_usr, clean_session);
+ set_connect_state(cl_usr, true);
+
+ if(clean_session) {
+ session_delete(cl_usr);
+ } else {
+ session_resume(cl_usr);
+ }
+
+ DBG_INFO("Server is now connected to client %s\n\r", cl_usr->client_id);
+
+ return;
+}
+
+int32_t cl_mgmt_init(void)
+{
+ cl_usr_init();
+
+ return 0;
+}
+
+}//namespace mbed_mqtt