Part of TI's mqtt
Dependents: mqtt_V1 cc3100_Test_mqtt_CM3
client_mgmt.cpp
- Committer:
- dflet
- Date:
- 2015-06-06
- Revision:
- 0:547251f42a60
File content as of revision 0:547251f42a60:
/****************************************************************************** * * Copyright (C) 2014 Texas Instruments Incorporated * * All rights reserved. Property of Texas Instruments Incorporated. * Restricted rights to use, duplicate or disclose this code are * granted through contract. * * The program may not be used without the written permission of * Texas Instruments Incorporated or against the terms and conditions * stipulated in the agreement under which this program has been supplied, * and under no circumstances can it be used with non-TI connectivity device. * ******************************************************************************/ #include "server_util.h" #include "client_mgmt.h" #include "server_pkts.h" namespace mbed_mqtt { #ifndef CFG_SR_MAX_CL_ID_SIZE #define MAX_CLIENT_ID_LEN 64 #else #define MAX_CLIENT_ID_LEN CFG_SR_MAX_CL_ID_SIZE #endif #ifndef CFG_SR_MAX_NUM_CLIENT #define MAX_CLIENTS 16 /* Must not exceed 32 */ #else #define MAX_CLIENTS CFG_SR_MAX_NUM_CLIENT #if (CFG_SR_MAX_NUM_CLIENT > 32) #error "CFG_SR_MAX_NUM_CLIENT must not exceed 32" #endif #endif static struct client_usr { void *ctx; /* Client net */ void *app; /* Client app */ void *will; /* Will topic */ uint32_t index; uint32_t n_sub; #define MQ_CONNECT_FLAG 0x00000001 #define CLEAN_SESS_FLAG 0x00000002 #define ASSIGNMENT_FLAG 0x00000004 /* No state or connected */ uint32_t flags; char client_id[MAX_CLIENT_ID_LEN]; struct pub_qos2_cq qos2_rx_cq; struct pub_qos2_cq qos2_tx_cq; } users[MAX_CLIENTS]; // TBD consistency between inline and macro functions static inline bool is_connected(struct client_usr *cl_usr) { return (cl_usr->flags & MQ_CONNECT_FLAG)? true : false; } static inline bool has_clean_session(struct client_usr *cl_usr) { return (cl_usr->flags & CLEAN_SESS_FLAG)? true : false; } static inline bool is_assigned(struct client_usr *cl_usr) { return (cl_usr->flags & ASSIGNMENT_FLAG)? true : false; } static inline void set_connect_state(struct client_usr *cl_usr, bool connected) { if(connected) cl_usr->flags |= MQ_CONNECT_FLAG; else cl_usr->flags &= ~MQ_CONNECT_FLAG; } static inline void set_clean_session(struct client_usr *cl_usr, bool clean_session) { if(clean_session) cl_usr->flags |= CLEAN_SESS_FLAG; else cl_usr->flags &= ~CLEAN_SESS_FLAG; } static inline void set_assigned(struct client_usr *cl_usr, bool assignment) { if(assignment) cl_usr->flags |= ASSIGNMENT_FLAG; else cl_usr->flags &= ~ASSIGNMENT_FLAG; } static void cl_usr_reset(struct client_usr *cl_usr) { cl_usr->ctx = NULL; cl_usr->app = NULL; cl_usr->will = NULL; cl_usr->n_sub = 0; cl_usr->flags = 0; cl_usr->client_id[0] = '\0'; qos2_pub_cq_reset(&cl_usr->qos2_rx_cq); qos2_pub_cq_reset(&cl_usr->qos2_tx_cq); } static void cl_usr_init(void) { int32_t idx = 0; for(idx = 0; idx < MAX_CLIENTS; idx++) { struct client_usr *cl_usr = users + idx; cl_usr->index = idx; cl_usr_reset(cl_usr); } } static void cl_usr_free(struct client_usr *cl_usr) { cl_usr_reset(cl_usr); cl_usr->flags &= ~(MQ_CONNECT_FLAG | CLEAN_SESS_FLAG | ASSIGNMENT_FLAG); } void cl_sub_count_add(void *usr_cl) { struct client_usr *cl_usr = (struct client_usr*) usr_cl; if(is_connected(cl_usr)) { cl_usr->n_sub++; USR_INFO("%s has added a new sub, now total is %u\n\r", cl_usr->client_id, cl_usr->n_sub); } return; } void cl_sub_count_del(void *usr_cl) { struct client_usr *cl_usr = (struct client_usr*) usr_cl; if(is_connected(cl_usr)) { cl_usr->n_sub--; USR_INFO("%s has deleted a sub, now total is %u\n\r", cl_usr->client_id, cl_usr->n_sub); } return; } /*---------------------------------------------------------------------------- * QoS2 PUB RX Message handling mechanism and associated house-keeping *--------------------------------------------------------------------------*/ static inline bool qos2_pub_rx_logup(struct client_usr *cl_usr, uint16_t msg_id) { return qos2_pub_cq_logup(&cl_usr->qos2_rx_cq, msg_id); } static inline bool ack2_msg_id_logup(struct client_usr *cl_usr, uint16_t msg_id) { return qos2_pub_cq_logup(&cl_usr->qos2_tx_cq, msg_id); } static inline bool qos2_pub_rx_unlog(struct client_usr *cl_usr, uint16_t msg_id) { return qos2_pub_cq_unlog(&cl_usr->qos2_rx_cq, msg_id); } static inline bool ack2_msg_id_unlog(struct client_usr *cl_usr, uint16_t msg_id) { return qos2_pub_cq_unlog(&cl_usr->qos2_tx_cq, msg_id); } static inline bool qos2_pub_rx_is_done(struct client_usr *cl_usr, uint16_t msg_id) { return qos2_pub_cq_check(&cl_usr->qos2_rx_cq, msg_id); } bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id) { struct client_usr *cl_usr = (struct client_usr*) usr_cl; return cl_usr && (qos2_pub_rx_is_done(cl_usr, msg_id) || qos2_pub_rx_logup(cl_usr, msg_id)); } static void ack2_msg_id_dispatch(struct client_usr *cl_usr) { struct pub_qos2_cq *tx_cq = &cl_usr->qos2_tx_cq; uint8_t rd_idx = tx_cq->rd_idx; uint8_t n_free = tx_cq->n_free; uint8_t i = 0; for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free); i++) { if(mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, MQTT_QOS1, true, tx_cq->id_vec[i]) <= 0) break; } return; } static void ack2_msg_id_purge(struct client_usr *cl_usr) { qos2_pub_cq_reset(&cl_usr->qos2_tx_cq); qos2_pub_cq_reset(&cl_usr->qos2_rx_cq); } /* */ static struct mqtt_ack_wlist wl_qos_ack1 = {NULL, NULL}; static struct mqtt_ack_wlist *qos_ack1_wl = &wl_qos_ack1; /* */ static struct mqtt_ack_wlist wl_mqp_sess = {NULL, NULL}; static struct mqtt_ack_wlist *sess_mqp_wl = &wl_mqp_sess; #define MQP_CL_MAP_GET(mqp) (mqp->private_) #define MQP_CL_MAP_SET(mqp, cl_map) (mqp->private_ |= cl_map) #define MQP_CL_MAP_CLR(mqp, cl_map) (mqp->private_ &= ~cl_map) #define CL_BMAP(cl_usr) (1 << cl_usr->index) static inline int32_t _pub_dispatch(struct client_usr *cl_usr, struct mqtt_packet *mqp, bool dup) { /* Error, if any, is handled in 'cl_net_close()' ....*/ return mqtt_server_pub_dispatch(cl_usr->ctx, mqp, dup); } static void ack1_wl_mqp_dispatch(struct client_usr *cl_usr) { struct mqtt_packet *mqp = NULL; for(mqp = qos_ack1_wl->head; NULL != mqp; mqp = mqp->next) if(MQP_CL_MAP_GET(mqp) & CL_BMAP(cl_usr)) _pub_dispatch(cl_usr, mqp, true); } #if 0 static struct client_usr *find_cl_usr(uint32_t index) { struct client_usr *cl_usr = users + 0; int32_t i = 0; for(i = 0; i < MAX_CLIENTS; i++, cl_usr++) { if(index == cl_usr->index) break; } return (MAX_CLIENTS == i)? NULL : cl_usr; } #endif #define IS_CL_USR_FREE(cl_usr) (( 0 == cl_usr->flags) && \ ('\0' == cl_usr->client_id[0])) #define IS_CL_INACTIVE(cl_usr) (( 0 == cl_usr->flags) && \ ('\0' != cl_usr->client_id[0])) #define NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr) \ ((((MQTT_QOS1 == qos) && has_clean_session(cl_usr)) || \ ((MQTT_QOS0 == qos)))? \ false : true) static inline uint32_t _cl_bmap_get(void *usr_cl) { struct client_usr *cl_usr = (struct client_usr*) usr_cl; return IS_CL_USR_FREE(cl_usr)? 0 : CL_BMAP(cl_usr); } uint32_t cl_bmap_get(void *usr_cl) { return usr_cl? _cl_bmap_get(usr_cl) : 0; } void *cl_app_hndl_get(void *usr_cl) { struct client_usr *cl_usr = (struct client_usr*) usr_cl; return (cl_usr && is_connected((client_usr*) usr_cl))? cl_usr->app : NULL; } void *cl_will_hndl_get(void *usr_cl) { struct client_usr *cl_usr = (struct client_usr*) usr_cl; return (cl_usr && is_connected((client_usr*)usr_cl))? cl_usr->will : NULL; } static void pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp) { //uint32_t n_bits = sizeof(uint32_t) << 3; /* Multiply by 8 */ uint32_t sp_map = 0; /* client Map for sessions present */ enum mqtt_qos qos = ENUM_QOS(mqp->fh_byte1);/* QOS */ const uint32_t cl_ref = cl_map; /* Keep ref to original */ int32_t i = 0; for(i = 0; i < MAX_CLIENTS; i++) { if(cl_map & (1 << i)) { struct client_usr *cl_usr = users + i; //find_cl_usr(i); if(is_connected(cl_usr)) if((_pub_dispatch(cl_usr, mqp, false) > 0) && NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr)) continue;/* Processing done; next CL */ /* CL: unconnected / PUB Err / QOS1 PKT (clean sess) */ cl_map &= ~(1 << i); if(IS_CL_INACTIVE(cl_usr)) sp_map |= (1 << i); /* CL: Maintain session */ } } if(sp_map) { struct mqtt_packet *cpy = mqp_server_copy(mqp); /* Make copy */ if(cpy) { MQP_CL_MAP_SET(cpy, sp_map); mqp_ack_wlist_append(sess_mqp_wl, cpy); } else sp_map = 0; } if(cl_map) { /* Wait List Publish */ MQP_CL_MAP_SET(mqp, cl_map); mqp_ack_wlist_append(qos_ack1_wl, mqp); } else mqp_free(mqp); /* PUB MQP now has no more use; must be freed */ USR_INFO("PUBLISH: CL Map(0x%08x): For Ack 0x%08x, Inactive 0x%08x\n\r", cl_ref, cl_map, sp_map); return; } void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp) { pub_dispatch(cl_map, mqp); return; } /* Move to mqtt_common.h and remove it from mqtt_client.h */ static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp) { mqp_free(mqp); return MQP_ERR_PKT_LEN; } /* Move this to a common file */ int32_t cl_pub_msg_send(void *usr_cl, const struct utf8_string *topic, const uint8_t *data_buf, uint32_t data_len, enum mqtt_qos qos, bool retain) { struct mqtt_packet *mqp = NULL; if((NULL == topic) || ((data_len > 0) && (NULL == data_buf)) || (NULL == usr_cl)) return MQP_ERR_FNPARAM; mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len); if(NULL == mqp) return MQP_ERR_PKT_AVL; if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) || (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) return len_err_free_mqp(mqp); mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain)); pub_dispatch(_cl_bmap_get(usr_cl), mqp); return MQP_CONTENT_LEN(mqp); } static void wl_remove(struct mqtt_ack_wlist *list, struct mqtt_packet *prev, struct mqtt_packet *elem) { if(prev) prev->next = elem->next; else list->head = elem->next; if(NULL == list->head) list->tail = NULL; if(list->tail == elem) list->tail = prev; return; } static void sess_wl_mqp_dispatch(struct client_usr *cl_usr) { struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL; uint32_t cl_map = CL_BMAP(cl_usr); for(mqp = sess_mqp_wl->head; NULL != mqp; prev = mqp, mqp = next) { struct mqtt_packet *cpy = NULL; if(0 == (MQP_CL_MAP_GET(mqp) & cl_map)) continue; /* MQP & CL: no association */ /* MQP has associated client(s) - process it */ /* Dissociate this client from MQP */ MQP_CL_MAP_CLR(mqp, cl_map); next = mqp->next; /* House keeping */ if(0 == MQP_CL_MAP_GET(mqp)) { /* MQP w/ no clients, remove from WL */ wl_remove(sess_mqp_wl, prev, mqp); /* Ensure correct arrangement for WL */ cpy = mqp; mqp = prev; } else { /* MQP is associated w/ other clients */ cpy = mqp_server_copy(mqp); if(NULL == cpy) continue; } /* Got packet from session, dispatch it to CL */ pub_dispatch(cl_map, cpy); } return; } static struct mqtt_packet *wl_mqp_unmap_find(struct mqtt_ack_wlist *wl, struct client_usr *cl_usr, uint16_t msg_id, struct mqtt_packet **prev) { struct mqtt_packet *mqp = NULL; *prev = NULL; for(mqp = wl->head; NULL != mqp; *prev = mqp, mqp = mqp->next) { if(msg_id == mqp->msg_id) { /* Found a MQP whose msg_id matches with input */ MQP_CL_MAP_CLR(mqp, CL_BMAP(cl_usr)); return mqp; } } return NULL; } static bool wl_rmfree_try(struct mqtt_ack_wlist *wl, struct mqtt_packet *prev, struct mqtt_packet *mqp) { if(0 == MQP_CL_MAP_GET(mqp)) { wl_remove(wl, prev, mqp); mqp_free(mqp); return true; } return false; } static bool ack1_unmap_rmfree_try(struct client_usr *cl_usr, uint16_t msg_id) { struct mqtt_packet *prev = NULL, *mqp = NULL; if(false == has_clean_session(cl_usr)) { mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev); if(mqp) wl_rmfree_try(qos_ack1_wl, prev, mqp); } return true; } static void wl_purge(struct mqtt_ack_wlist *wl, struct client_usr *cl_usr) { struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL; uint32_t bmap = CL_BMAP(cl_usr); for(mqp = wl->head; NULL != mqp; prev = mqp, mqp = next) { next = mqp->next; /* Ideally, check should be done to see if cl_usr and mqp are associated. If yes, then the bit should be cleared. At the moment, blind clearing of the bit has been implemented and it has no side effects. */ MQP_CL_MAP_CLR(mqp, bmap); /* MQP with no clients has no more use, so try deleting MQP */ if(wl_rmfree_try(wl, prev, mqp)) mqp = prev; /* MQP deleted, prep for next iteration */ } return; } static void ack1_wl_mqp_purge(struct client_usr *cl_usr) { wl_purge(qos_ack1_wl, cl_usr); } static void sess_wl_mqp_purge(struct client_usr *cl_usr) { wl_purge(sess_mqp_wl, cl_usr); } static void session_resume(struct client_usr *cl_usr) { ack1_wl_mqp_dispatch(cl_usr); ack2_msg_id_dispatch(cl_usr); sess_wl_mqp_dispatch(cl_usr); } static void session_delete(struct client_usr *cl_usr) { ack1_wl_mqp_purge(cl_usr); ack2_msg_id_purge(cl_usr); sess_wl_mqp_purge(cl_usr); } static bool has_session_data(struct client_usr *cl_usr) { uint32_t map = CL_BMAP(cl_usr); struct mqtt_packet *elem; if(cl_usr->n_sub) return true; elem = qos_ack1_wl->head; while(elem) { if(MQP_CL_MAP_GET(elem) & map) return true; elem = elem->next; } elem = sess_mqp_wl->head; while(elem) { if(MQP_CL_MAP_GET(elem) & map) return true; elem = elem->next; } return false; } bool cl_can_session_delete(void *usr_cl) { struct client_usr *cl_usr = (struct client_usr*) usr_cl; return cl_usr? (has_clean_session(cl_usr) || !has_session_data(cl_usr)) : false; } void cl_on_net_close(void *usr_cl) { struct client_usr *cl_usr = (struct client_usr*) usr_cl; if(is_assigned(cl_usr)) { if(false == has_session_data(cl_usr)) cl_usr_free(cl_usr); } else if(has_clean_session(cl_usr)) { session_delete(cl_usr); cl_usr_free(cl_usr); } else { set_connect_state(cl_usr, false); cl_usr->ctx = NULL; cl_usr->app = NULL; } return; } static bool proc_pub_rel_rx(struct client_usr *cl_usr, uint16_t msg_id) { mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBCOMP, MQTT_QOS0, true, msg_id); if(qos2_pub_rx_is_done(cl_usr, msg_id)) qos2_pub_rx_unlog(cl_usr, msg_id); return true; } static bool proc_pub_rec_rx(struct client_usr *cl_usr, uint16_t msg_id) { struct mqtt_packet *prev = NULL, *mqp = NULL; mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev); if(mqp && ack2_msg_id_logup(cl_usr, msg_id)) { wl_rmfree_try(qos_ack1_wl, prev, mqp); mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, MQTT_QOS1, true, msg_id); return true; } return false; } bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id) { struct client_usr *cl_usr = (struct client_usr*) usr_cl; bool rv = false; if(NULL == cl_usr) return rv; switch(msg_type) { case MQTT_PUBACK: rv = ack1_unmap_rmfree_try(cl_usr, msg_id); break; case MQTT_PUBREC: rv = proc_pub_rec_rx(cl_usr, msg_id); break; case MQTT_PUBREL: rv = proc_pub_rel_rx(cl_usr, msg_id); break; case MQTT_PUBCOMP: rv = ack2_msg_id_unlog(cl_usr, msg_id); break; default: break; } return rv; } static void assign_cl_index_as_id(struct client_usr *cl_usr) { /* TBD: need a better implementation */ char *client_id = cl_usr->client_id; client_id[0] = 'S'; client_id[1] = 'e'; client_id[2] = 'l'; client_id[3] = 'f'; client_id[4] = '-'; client_id[5] = ((cl_usr->index & 0xf0) >> 4) + 0x30; client_id[6] = ((cl_usr->index & 0x0f)) + 0x30; client_id[7] = '\0'; /* Make sure that above array size is with in MAX_CLIENT_ID_LEN */ return; } static struct client_usr *assign_cl_usr(char *client_id, uint8_t *vh_buf) { struct client_usr *cl_usr, *fr_usr = NULL; int32_t idx = 0; for(idx = 0; idx < MAX_CLIENTS; idx++) { cl_usr = users + idx; if((NULL == fr_usr) && IS_CL_USR_FREE(cl_usr)) fr_usr = cl_usr; /* Note 1st free cl_usr */ if((NULL == client_id) && (NULL != fr_usr)) { /* Free cl_usr is present to create CL-ID */ break; } else if((NULL != client_id) && (0 == strncmp(cl_usr->client_id, client_id, MAX_CLIENT_ID_LEN))) { /* Found a client obj with exact ID match */ if(is_connected(cl_usr)) { /* Error: CL-ID is already active */ vh_buf[1] = CONNACK_RC_CLI_REJECT; cl_usr = NULL; } break; } } if(idx == MAX_CLIENTS) { /* Did not find a match */ cl_usr = fr_usr; if(NULL == cl_usr) /* Server Resource unavailable */ vh_buf[1] = CONNACK_RC_SVR_UNAVBL; } if(NULL != cl_usr) { if(NULL == client_id) assign_cl_index_as_id(cl_usr); /* Get ID */ else if(IS_CL_USR_FREE(cl_usr)) strncpy(cl_usr->client_id, client_id, MAX_CLIENT_ID_LEN); set_assigned(cl_usr, true); } USR_INFO("Assignment of cl_usr %s (detail: index %d and name %s) \n\r", cl_usr? "Succeeded" : "Failed", cl_usr? (int32_t)cl_usr->index : -1, cl_usr? cl_usr->client_id : ""); return cl_usr; } uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id, void *app_cl, void *will, void **usr_cl) { uint8_t vh_buf[] = {0x00, CONNACK_RC_REQ_ACCEPT}; struct client_usr *cl_usr; if(client_id && ('\0' == client_id[0])) /* Shouldn't happen: CLIENT ID with a NUL only string */ return CONNACK_RC_CLI_REJECT; cl_usr = assign_cl_usr(client_id, vh_buf); if(NULL == cl_usr) return vh_buf[1]; /* Use vh_buf from assign_cl_usr() */ if((false == clean_session) && has_session_data(cl_usr)) vh_buf[0] = 0x01; /* Set Session Present Flag */ #if 0 if(0x00 == vh_buf[1]) *usr_cl = (void*)cl_usr; #endif *usr_cl = (void*)cl_usr; cl_usr->ctx = ctx_cl; cl_usr->app = app_cl; cl_usr->will = will; return ((vh_buf[0] << 8) | vh_buf[1]); } void cl_on_connack_send(void *usr_cl, bool clean_session) { struct client_usr *cl_usr = (struct client_usr*) usr_cl; if(false == is_assigned(cl_usr)) { USR_INFO("Fatal: CONNACK for unassigned cl_usr Id %u, abort\n\r", cl_usr->index); return; } set_assigned(cl_usr, false); set_clean_session(cl_usr, clean_session); set_connect_state(cl_usr, true); if(clean_session) { session_delete(cl_usr); } else { session_resume(cl_usr); } DBG_INFO("Server is now connected to client %s\n\r", cl_usr->client_id); return; } int32_t cl_mgmt_init(void) { cl_usr_init(); return 0; } }//namespace mbed_mqtt