Part of TI's mqtt
Dependents: mqtt_V1 cc3100_Test_mqtt_CM3
Revision 0:547251f42a60, committed 2015-06-06
- Comitter:
- dflet
- Date:
- Sat Jun 06 13:29:08 2015 +0000
- Commit message:
- Part of mqtt_V1
Changed in this revision
diff -r 000000000000 -r 547251f42a60 client_mgmt.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/client_mgmt.cpp Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,790 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#include "server_util.h" +#include "client_mgmt.h" +#include "server_pkts.h" + +namespace mbed_mqtt { + +#ifndef CFG_SR_MAX_CL_ID_SIZE +#define MAX_CLIENT_ID_LEN 64 +#else +#define MAX_CLIENT_ID_LEN CFG_SR_MAX_CL_ID_SIZE +#endif + +#ifndef CFG_SR_MAX_NUM_CLIENT +#define MAX_CLIENTS 16 /* Must not exceed 32 */ +#else +#define MAX_CLIENTS CFG_SR_MAX_NUM_CLIENT + +#if (CFG_SR_MAX_NUM_CLIENT > 32) +#error "CFG_SR_MAX_NUM_CLIENT must not exceed 32" +#endif +#endif + +static struct client_usr { + + void *ctx; /* Client net */ + void *app; /* Client app */ + + void *will; /* Will topic */ + uint32_t index; + uint32_t n_sub; + +#define MQ_CONNECT_FLAG 0x00000001 +#define CLEAN_SESS_FLAG 0x00000002 +#define ASSIGNMENT_FLAG 0x00000004 /* No state or connected */ + + uint32_t flags; + + char client_id[MAX_CLIENT_ID_LEN]; + + struct pub_qos2_cq qos2_rx_cq; + struct pub_qos2_cq qos2_tx_cq; + +} users[MAX_CLIENTS]; + +// TBD consistency between inline and macro functions + +static inline bool is_connected(struct client_usr *cl_usr) +{ + return (cl_usr->flags & MQ_CONNECT_FLAG)? true : false; +} + +static inline bool has_clean_session(struct client_usr *cl_usr) +{ + return (cl_usr->flags & CLEAN_SESS_FLAG)? true : false; +} + +static inline bool is_assigned(struct client_usr *cl_usr) +{ + return (cl_usr->flags & ASSIGNMENT_FLAG)? true : false; +} + +static inline void set_connect_state(struct client_usr *cl_usr, + bool connected) +{ + if(connected) + cl_usr->flags |= MQ_CONNECT_FLAG; + else + cl_usr->flags &= ~MQ_CONNECT_FLAG; +} + +static inline void set_clean_session(struct client_usr *cl_usr, + bool clean_session) +{ + if(clean_session) + cl_usr->flags |= CLEAN_SESS_FLAG; + else + cl_usr->flags &= ~CLEAN_SESS_FLAG; +} + +static inline void set_assigned(struct client_usr *cl_usr, + bool assignment) +{ + if(assignment) + cl_usr->flags |= ASSIGNMENT_FLAG; + else + cl_usr->flags &= ~ASSIGNMENT_FLAG; +} + +static void cl_usr_reset(struct client_usr *cl_usr) +{ + cl_usr->ctx = NULL; + cl_usr->app = NULL; + cl_usr->will = NULL; + + cl_usr->n_sub = 0; + cl_usr->flags = 0; + + cl_usr->client_id[0] = '\0'; + + qos2_pub_cq_reset(&cl_usr->qos2_rx_cq); + qos2_pub_cq_reset(&cl_usr->qos2_tx_cq); +} + +static void cl_usr_init(void) +{ + int32_t idx = 0; + for(idx = 0; idx < MAX_CLIENTS; idx++) { + struct client_usr *cl_usr = users + idx; + cl_usr->index = idx; + + cl_usr_reset(cl_usr); + } +} + +static void cl_usr_free(struct client_usr *cl_usr) +{ + cl_usr_reset(cl_usr); + cl_usr->flags &= ~(MQ_CONNECT_FLAG | + CLEAN_SESS_FLAG | + ASSIGNMENT_FLAG); +} + +void cl_sub_count_add(void *usr_cl) +{ + struct client_usr *cl_usr = (struct client_usr*) usr_cl; + + if(is_connected(cl_usr)) { + cl_usr->n_sub++; + USR_INFO("%s has added a new sub, now total is %u\n\r", + cl_usr->client_id, cl_usr->n_sub); + + } + + return; +} + +void cl_sub_count_del(void *usr_cl) +{ + struct client_usr *cl_usr = (struct client_usr*) usr_cl; + + if(is_connected(cl_usr)) { + cl_usr->n_sub--; + USR_INFO("%s has deleted a sub, now total is %u\n\r", + cl_usr->client_id, cl_usr->n_sub); + } + + return; +} + +/*---------------------------------------------------------------------------- + * QoS2 PUB RX Message handling mechanism and associated house-keeping + *--------------------------------------------------------------------------*/ +static inline bool qos2_pub_rx_logup(struct client_usr *cl_usr, uint16_t msg_id) +{ + return qos2_pub_cq_logup(&cl_usr->qos2_rx_cq, msg_id); +} + +static inline bool ack2_msg_id_logup(struct client_usr *cl_usr, uint16_t msg_id) +{ + return qos2_pub_cq_logup(&cl_usr->qos2_tx_cq, msg_id); +} + +static inline bool qos2_pub_rx_unlog(struct client_usr *cl_usr, uint16_t msg_id) +{ + return qos2_pub_cq_unlog(&cl_usr->qos2_rx_cq, msg_id); +} + +static inline bool ack2_msg_id_unlog(struct client_usr *cl_usr, uint16_t msg_id) +{ + return qos2_pub_cq_unlog(&cl_usr->qos2_tx_cq, msg_id); +} + +static inline bool qos2_pub_rx_is_done(struct client_usr *cl_usr, uint16_t msg_id) +{ + return qos2_pub_cq_check(&cl_usr->qos2_rx_cq, msg_id); +} + +bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id) +{ + struct client_usr *cl_usr = (struct client_usr*) usr_cl; + + return cl_usr && (qos2_pub_rx_is_done(cl_usr, msg_id) || + qos2_pub_rx_logup(cl_usr, msg_id)); +} + +static void ack2_msg_id_dispatch(struct client_usr *cl_usr) +{ + struct pub_qos2_cq *tx_cq = &cl_usr->qos2_tx_cq; + uint8_t rd_idx = tx_cq->rd_idx; + uint8_t n_free = tx_cq->n_free; + uint8_t i = 0; + + for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free); i++) { + if(mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, MQTT_QOS1, + true, tx_cq->id_vec[i]) <= 0) + break; + } + + return; +} + +static void ack2_msg_id_purge(struct client_usr *cl_usr) +{ + qos2_pub_cq_reset(&cl_usr->qos2_tx_cq); + qos2_pub_cq_reset(&cl_usr->qos2_rx_cq); +} + +/* + +*/ +static struct mqtt_ack_wlist wl_qos_ack1 = {NULL, NULL}; +static struct mqtt_ack_wlist *qos_ack1_wl = &wl_qos_ack1; + +/* + +*/ +static struct mqtt_ack_wlist wl_mqp_sess = {NULL, NULL}; +static struct mqtt_ack_wlist *sess_mqp_wl = &wl_mqp_sess; + +#define MQP_CL_MAP_GET(mqp) (mqp->private_) +#define MQP_CL_MAP_SET(mqp, cl_map) (mqp->private_ |= cl_map) +#define MQP_CL_MAP_CLR(mqp, cl_map) (mqp->private_ &= ~cl_map) + +#define CL_BMAP(cl_usr) (1 << cl_usr->index) + +static inline +int32_t _pub_dispatch(struct client_usr *cl_usr, struct mqtt_packet *mqp, + bool dup) +{ + /* Error, if any, is handled in 'cl_net_close()' ....*/ + return mqtt_server_pub_dispatch(cl_usr->ctx, mqp, dup); +} + +static void ack1_wl_mqp_dispatch(struct client_usr *cl_usr) +{ + struct mqtt_packet *mqp = NULL; + + for(mqp = qos_ack1_wl->head; NULL != mqp; mqp = mqp->next) + if(MQP_CL_MAP_GET(mqp) & CL_BMAP(cl_usr)) + _pub_dispatch(cl_usr, mqp, true); +} + +#if 0 +static struct client_usr *find_cl_usr(uint32_t index) +{ + struct client_usr *cl_usr = users + 0; + + int32_t i = 0; + for(i = 0; i < MAX_CLIENTS; i++, cl_usr++) { + if(index == cl_usr->index) + break; + } + + return (MAX_CLIENTS == i)? NULL : cl_usr; +} +#endif + +#define IS_CL_USR_FREE(cl_usr) (( 0 == cl_usr->flags) && \ + ('\0' == cl_usr->client_id[0])) + +#define IS_CL_INACTIVE(cl_usr) (( 0 == cl_usr->flags) && \ + ('\0' != cl_usr->client_id[0])) + +#define NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr) \ + ((((MQTT_QOS1 == qos) && has_clean_session(cl_usr)) || \ + ((MQTT_QOS0 == qos)))? \ + false : true) + +static inline uint32_t _cl_bmap_get(void *usr_cl) +{ + struct client_usr *cl_usr = (struct client_usr*) usr_cl; + + return IS_CL_USR_FREE(cl_usr)? 0 : CL_BMAP(cl_usr); +} + +uint32_t cl_bmap_get(void *usr_cl) +{ + return usr_cl? _cl_bmap_get(usr_cl) : 0; +} + +void *cl_app_hndl_get(void *usr_cl) +{ + struct client_usr *cl_usr = (struct client_usr*) usr_cl; + + return (cl_usr && is_connected((client_usr*) usr_cl))? cl_usr->app : NULL; +} + +void *cl_will_hndl_get(void *usr_cl) +{ + struct client_usr *cl_usr = (struct client_usr*) usr_cl; + + return (cl_usr && is_connected((client_usr*)usr_cl))? cl_usr->will : NULL; +} + +static void pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp) +{ + //uint32_t n_bits = sizeof(uint32_t) << 3; /* Multiply by 8 */ + uint32_t sp_map = 0; /* client Map for sessions present */ + enum mqtt_qos qos = ENUM_QOS(mqp->fh_byte1);/* QOS */ + const uint32_t cl_ref = cl_map; /* Keep ref to original */ + + int32_t i = 0; + for(i = 0; i < MAX_CLIENTS; i++) { + if(cl_map & (1 << i)) { + struct client_usr *cl_usr = users + i; //find_cl_usr(i); + if(is_connected(cl_usr)) + if((_pub_dispatch(cl_usr, mqp, false) > 0) && + NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr)) + continue;/* Processing done; next CL */ + + /* CL: unconnected / PUB Err / QOS1 PKT (clean sess) */ + cl_map &= ~(1 << i); + + if(IS_CL_INACTIVE(cl_usr)) + sp_map |= (1 << i); /* CL: Maintain session */ + } + } + + if(sp_map) { + struct mqtt_packet *cpy = mqp_server_copy(mqp); /* Make copy */ + if(cpy) { + MQP_CL_MAP_SET(cpy, sp_map); + mqp_ack_wlist_append(sess_mqp_wl, cpy); + } else + sp_map = 0; + } + + if(cl_map) { /* Wait List Publish */ + MQP_CL_MAP_SET(mqp, cl_map); + mqp_ack_wlist_append(qos_ack1_wl, mqp); + } else + mqp_free(mqp); /* PUB MQP now has no more use; must be freed */ + + USR_INFO("PUBLISH: CL Map(0x%08x): For Ack 0x%08x, Inactive 0x%08x\n\r", + cl_ref, cl_map, sp_map); + + return; +} + +void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp) +{ + pub_dispatch(cl_map, mqp); + return; +} + +/* Move to mqtt_common.h and remove it from mqtt_client.h */ +static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp) +{ + mqp_free(mqp); + return MQP_ERR_PKT_LEN; +} + +/* Move this to a common file */ +int32_t cl_pub_msg_send(void *usr_cl, + const struct utf8_string *topic, const uint8_t *data_buf, + uint32_t data_len, enum mqtt_qos qos, bool retain) +{ + struct mqtt_packet *mqp = NULL; + + if((NULL == topic) || + ((data_len > 0) && (NULL == data_buf)) || + (NULL == usr_cl)) + return MQP_ERR_FNPARAM; + + mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len); + if(NULL == mqp) + return MQP_ERR_PKT_AVL; + + if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) || + (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) + return len_err_free_mqp(mqp); + + mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain)); + + pub_dispatch(_cl_bmap_get(usr_cl), mqp); + + return MQP_CONTENT_LEN(mqp); +} + +static void wl_remove(struct mqtt_ack_wlist *list, + struct mqtt_packet *prev, + struct mqtt_packet *elem) +{ + if(prev) + prev->next = elem->next; + else + list->head = elem->next; + + if(NULL == list->head) + list->tail = NULL; + + if(list->tail == elem) + list->tail = prev; + + return; +} + +static void sess_wl_mqp_dispatch(struct client_usr *cl_usr) +{ + struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL; + uint32_t cl_map = CL_BMAP(cl_usr); + + for(mqp = sess_mqp_wl->head; NULL != mqp; prev = mqp, mqp = next) { + struct mqtt_packet *cpy = NULL; + + if(0 == (MQP_CL_MAP_GET(mqp) & cl_map)) + continue; /* MQP & CL: no association */ + + /* MQP has associated client(s) - process it */ + + /* Dissociate this client from MQP */ + MQP_CL_MAP_CLR(mqp, cl_map); + next = mqp->next; /* House keeping */ + + if(0 == MQP_CL_MAP_GET(mqp)) { + /* MQP w/ no clients, remove from WL */ + wl_remove(sess_mqp_wl, prev, mqp); + + /* Ensure correct arrangement for WL */ + cpy = mqp; mqp = prev; + } else { + /* MQP is associated w/ other clients */ + cpy = mqp_server_copy(mqp); + if(NULL == cpy) + continue; + } + + /* Got packet from session, dispatch it to CL */ + pub_dispatch(cl_map, cpy); + } + + return; +} + +static +struct mqtt_packet *wl_mqp_unmap_find(struct mqtt_ack_wlist *wl, + struct client_usr *cl_usr, uint16_t msg_id, + struct mqtt_packet **prev) +{ + struct mqtt_packet *mqp = NULL; + + *prev = NULL; + for(mqp = wl->head; NULL != mqp; *prev = mqp, mqp = mqp->next) { + if(msg_id == mqp->msg_id) { + /* Found a MQP whose msg_id matches with input */ + MQP_CL_MAP_CLR(mqp, CL_BMAP(cl_usr)); + return mqp; + } + } + + return NULL; +} + +static bool wl_rmfree_try(struct mqtt_ack_wlist *wl, struct mqtt_packet *prev, + struct mqtt_packet *mqp) +{ + if(0 == MQP_CL_MAP_GET(mqp)) { + wl_remove(wl, prev, mqp); + mqp_free(mqp); + + return true; + } + + return false; +} + +static bool ack1_unmap_rmfree_try(struct client_usr *cl_usr, uint16_t msg_id) +{ + struct mqtt_packet *prev = NULL, *mqp = NULL; + + if(false == has_clean_session(cl_usr)) { + mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev); + if(mqp) + wl_rmfree_try(qos_ack1_wl, prev, mqp); + } + + return true; +} + +static +void wl_purge(struct mqtt_ack_wlist *wl, struct client_usr *cl_usr) +{ + struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL; + uint32_t bmap = CL_BMAP(cl_usr); + + for(mqp = wl->head; NULL != mqp; prev = mqp, mqp = next) { + next = mqp->next; + + /* Ideally, check should be done to see if cl_usr and mqp are + associated. If yes, then the bit should be cleared. At the + moment, blind clearing of the bit has been implemented and + it has no side effects. + */ + MQP_CL_MAP_CLR(mqp, bmap); + + /* MQP with no clients has no more use, so try deleting MQP */ + if(wl_rmfree_try(wl, prev, mqp)) + mqp = prev; /* MQP deleted, prep for next iteration */ + } + + return; +} + +static void ack1_wl_mqp_purge(struct client_usr *cl_usr) +{ + wl_purge(qos_ack1_wl, cl_usr); +} + +static void sess_wl_mqp_purge(struct client_usr *cl_usr) +{ + wl_purge(sess_mqp_wl, cl_usr); +} + +static void session_resume(struct client_usr *cl_usr) +{ + ack1_wl_mqp_dispatch(cl_usr); + ack2_msg_id_dispatch(cl_usr); + sess_wl_mqp_dispatch(cl_usr); +} + +static void session_delete(struct client_usr *cl_usr) +{ + ack1_wl_mqp_purge(cl_usr); + ack2_msg_id_purge(cl_usr); + sess_wl_mqp_purge(cl_usr); +} + +static bool has_session_data(struct client_usr *cl_usr) +{ + uint32_t map = CL_BMAP(cl_usr); + struct mqtt_packet *elem; + + if(cl_usr->n_sub) + return true; + + elem = qos_ack1_wl->head; + while(elem) { + if(MQP_CL_MAP_GET(elem) & map) + return true; + + elem = elem->next; + } + + elem = sess_mqp_wl->head; + while(elem) { + if(MQP_CL_MAP_GET(elem) & map) + return true; + + elem = elem->next; + } + + return false; +} + +bool cl_can_session_delete(void *usr_cl) +{ + struct client_usr *cl_usr = (struct client_usr*) usr_cl; + + return cl_usr? (has_clean_session(cl_usr) || + !has_session_data(cl_usr)) : false; +} + +void cl_on_net_close(void *usr_cl) +{ + struct client_usr *cl_usr = (struct client_usr*) usr_cl; + + if(is_assigned(cl_usr)) { + if(false == has_session_data(cl_usr)) + cl_usr_free(cl_usr); + } else if(has_clean_session(cl_usr)) { + session_delete(cl_usr); + cl_usr_free(cl_usr); + } else { + set_connect_state(cl_usr, false); + cl_usr->ctx = NULL; + cl_usr->app = NULL; + } + + return; +} + +static bool proc_pub_rel_rx(struct client_usr *cl_usr, uint16_t msg_id) +{ + mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBCOMP, + MQTT_QOS0, true, msg_id); + + if(qos2_pub_rx_is_done(cl_usr, msg_id)) + qos2_pub_rx_unlog(cl_usr, msg_id); + + return true; +} + +static bool proc_pub_rec_rx(struct client_usr *cl_usr, uint16_t msg_id) +{ + struct mqtt_packet *prev = NULL, *mqp = NULL; + + mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev); + if(mqp && ack2_msg_id_logup(cl_usr, msg_id)) { + + wl_rmfree_try(qos_ack1_wl, prev, mqp); + + mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, + MQTT_QOS1, true, msg_id); + + return true; + } + + return false; +} + +bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id) +{ + struct client_usr *cl_usr = (struct client_usr*) usr_cl; + bool rv = false; + + if(NULL == cl_usr) + return rv; + + switch(msg_type) { + + case MQTT_PUBACK: + rv = ack1_unmap_rmfree_try(cl_usr, msg_id); + break; + + case MQTT_PUBREC: + rv = proc_pub_rec_rx(cl_usr, msg_id); + break; + + case MQTT_PUBREL: + rv = proc_pub_rel_rx(cl_usr, msg_id); + break; + + case MQTT_PUBCOMP: + rv = ack2_msg_id_unlog(cl_usr, msg_id); + break; + + default: + break; + } + + return rv; +} + +static void assign_cl_index_as_id(struct client_usr *cl_usr) +{ + /* TBD: need a better implementation */ + char *client_id = cl_usr->client_id; + + client_id[0] = 'S'; + client_id[1] = 'e'; + client_id[2] = 'l'; + client_id[3] = 'f'; + client_id[4] = '-'; + client_id[5] = ((cl_usr->index & 0xf0) >> 4) + 0x30; + client_id[6] = ((cl_usr->index & 0x0f)) + 0x30; + client_id[7] = '\0'; + + /* Make sure that above array size is with in MAX_CLIENT_ID_LEN */ + + return; +} + +static struct client_usr *assign_cl_usr(char *client_id, uint8_t *vh_buf) +{ + struct client_usr *cl_usr, *fr_usr = NULL; + int32_t idx = 0; + for(idx = 0; idx < MAX_CLIENTS; idx++) { + cl_usr = users + idx; + if((NULL == fr_usr) && IS_CL_USR_FREE(cl_usr)) + fr_usr = cl_usr; /* Note 1st free cl_usr */ + + if((NULL == client_id) && (NULL != fr_usr)) { + /* Free cl_usr is present to create CL-ID */ + break; + + } else if((NULL != client_id) && + (0 == strncmp(cl_usr->client_id, client_id, + MAX_CLIENT_ID_LEN))) { + /* Found a client obj with exact ID match */ + if(is_connected(cl_usr)) { + /* Error: CL-ID is already active */ + vh_buf[1] = CONNACK_RC_CLI_REJECT; + cl_usr = NULL; + } + break; + } + } + + if(idx == MAX_CLIENTS) { /* Did not find a match */ + cl_usr = fr_usr; + if(NULL == cl_usr) + /* Server Resource unavailable */ + vh_buf[1] = CONNACK_RC_SVR_UNAVBL; + } + + if(NULL != cl_usr) { + if(NULL == client_id) + assign_cl_index_as_id(cl_usr); /* Get ID */ + else if(IS_CL_USR_FREE(cl_usr)) + strncpy(cl_usr->client_id, client_id, + MAX_CLIENT_ID_LEN); + + set_assigned(cl_usr, true); + } + + USR_INFO("Assignment of cl_usr %s (detail: index %d and name %s) \n\r", + cl_usr? "Succeeded" : "Failed", cl_usr? (int32_t)cl_usr->index : -1, + cl_usr? cl_usr->client_id : ""); + + return cl_usr; +} + +uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id, + void *app_cl, void *will, void **usr_cl) +{ + uint8_t vh_buf[] = {0x00, CONNACK_RC_REQ_ACCEPT}; + struct client_usr *cl_usr; + + if(client_id && ('\0' == client_id[0])) + /* Shouldn't happen: CLIENT ID with a NUL only string */ + return CONNACK_RC_CLI_REJECT; + + cl_usr = assign_cl_usr(client_id, vh_buf); + if(NULL == cl_usr) + return vh_buf[1]; /* Use vh_buf from assign_cl_usr() */ + + if((false == clean_session) && has_session_data(cl_usr)) + vh_buf[0] = 0x01; /* Set Session Present Flag */ + +#if 0 + if(0x00 == vh_buf[1]) + *usr_cl = (void*)cl_usr; +#endif + *usr_cl = (void*)cl_usr; + + cl_usr->ctx = ctx_cl; + cl_usr->app = app_cl; + cl_usr->will = will; + + return ((vh_buf[0] << 8) | vh_buf[1]); +} + +void cl_on_connack_send(void *usr_cl, bool clean_session) +{ + struct client_usr *cl_usr = (struct client_usr*) usr_cl; + + if(false == is_assigned(cl_usr)) { + USR_INFO("Fatal: CONNACK for unassigned cl_usr Id %u, abort\n\r", + cl_usr->index); + return; + } + + set_assigned(cl_usr, false); + set_clean_session(cl_usr, clean_session); + set_connect_state(cl_usr, true); + + if(clean_session) { + session_delete(cl_usr); + } else { + session_resume(cl_usr); + } + + DBG_INFO("Server is now connected to client %s\n\r", cl_usr->client_id); + + return; +} + +int32_t cl_mgmt_init(void) +{ + cl_usr_init(); + + return 0; +} + +}//namespace mbed_mqtt
diff -r 000000000000 -r 547251f42a60 client_mgmt.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/client_mgmt.h Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,69 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#ifndef __CLIENT_MGMT_H__ +#define __CLIENT_MGMT_H__ + +#include "mqtt_common.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +namespace mbed_mqtt { + +uint32_t cl_bmap_get(void *usr_cl); + +void *cl_app_hndl_get(void *usr_cl); + +void *cl_will_hndl_get(void *usr_cl); + +bool cl_can_session_delete(void *usr_cl); + +void cl_sub_count_add(void *usr_cl); + +void cl_sub_count_del(void *usr_cl); + +bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id); + +void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp); + +int32_t cl_pub_msg_send(void *usr_cl, + const struct utf8_string *topic, const uint8_t *data_buf, + uint32_t data_len, enum mqtt_qos qos, bool retain); + +void cl_on_net_close(void *usr_cl); + +bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id); + +/* uint16_t composition: MSB is CONNACK-Flags and LSB is CONNACK-RC. The place-holder + '*usr_cl' has valid value, only if CONNACK-RC in return value is 0. +*/ +uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id, + void *app_cl, void *will, void **usr_cl); + +void cl_on_connack_send(void *usr_cl, bool clean_session); + +int32_t cl_mgmt_init(void); + +}//namespace mbed_mqtt + +#ifdef __cplusplus +} +#endif + +#endif +
diff -r 000000000000 -r 547251f42a60 server_core.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server_core.cpp Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,1695 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#include "client_mgmt.h" +#include "server_util.h" +#include "server_pkts.h" +#include "server_plug.h" +#include "server_core.h" + +namespace mbed_mqtt { + +#ifndef CFG_SR_MAX_SUBTOP_LEN +#define MAX_SUBTOP_LEN 32 +#else +#define MAX_SUBTOP_LEN CFG_SR_MAX_SUBTOP_LEN +#endif + +#ifndef CFG_SR_MAX_TOPIC_NODE +#define MAX_TOP_NODES 32 +#else +#define MAX_TOP_NODES CFG_SR_MAX_TOPIC_NODE +#endif + +/* A topic name (or string or tree or hierarchy) is handled as a series of nodes. + A 'topic node' refers to a level in the topic tree or hierarchy and it stores + the associated part of the topic string at that level. For example, a topic + having a name "abc/x/1" will have three nodes for each of the subtopics, viz, + "abc/", "x/" and "1". Further, the association between the nodes is indicated + as following. + + -- down link hierarchy --> -- down link hierarchy --> + "abc/" "x/" "1" + <-- up link hierarchy -- <-- up link hierarchy -- + + To extend this example, another topic having a name "abc/y/2" will be managed + as following. + + -- down link hierarchy --> -- down link hierarchy --> + "abc/" "x/" "1" + <-- up link hierarchy -- <-- up link hierarchy -- + | ^ + | | + down link neighbour up link neighbour + | | + V | + -- down link hierarchy --> + "y/" "2" + <-- up link hierarchy -- + + To reduce wasted in byte alignments, the structure has been hand packed. +*/ +static struct topic_node { + + struct topic_node *dn_nhbr; /* Down Link Neighbour node */ + struct topic_node *up_nhbr; /* Up Link Neighbour node */ + + struct topic_node *dn_hier; /* Down Link Hierarchy node */ + struct topic_node *up_hier; /* Up Link Hierarchy node */ + + uint32_t cl_map[3]; /* Subscribers for each QOS */ + + uint8_t *my_data; /* Leaf node: retained data */ + uint32_t my_dlen; + + void *will_cl; /* CL OBJ of WILL Leaf node */ + +#define TNODE_PROP_RETAIN_DATA 0x04 + /* Bits 0,1 for QOS and rest are flags */ + uint8_t my_prop; + + uint8_t pg_map; /* Map: application plugins */ + + uint16_t toplen; /* Length of node sub-topic */ + char subtop[MAX_SUBTOP_LEN];/* NUL terminated sub-topic */ + +} nodes[MAX_TOP_NODES]; + +/* + Housekeeping to manage subtopics (nodes) at run-time. +*/ +static struct _node_stack { + struct topic_node *node; + uint32_t val1; + uint32_t val2; + +} node_stack[MAX_TOP_NODES]; + +static int32_t stack_idx = 0; + +static void stack_add(struct topic_node *node, uint32_t val1, uint32_t val2) +{ + node_stack[stack_idx].node = node; + node_stack[stack_idx].val1 = val1; + node_stack[stack_idx].val2 = val2; + + stack_idx++; +} + +static inline struct _node_stack *stack_pop() +{ + if(0 == stack_idx) + return NULL; + + return node_stack + (--stack_idx); +} + +static inline bool is_node_retain(const struct topic_node *node) +{ + return (node->my_prop & TNODE_PROP_RETAIN_DATA)? true : false; +} + +static inline void node_retain_set(struct topic_node *node, bool retain) +{ + if(retain) + node->my_prop |= TNODE_PROP_RETAIN_DATA; + else + node->my_prop &= ~TNODE_PROP_RETAIN_DATA; +} + +static inline void node_qid_set(struct topic_node *node, uint8_t qid) +{ + node->my_prop &= ~QID_VMASK; + node->my_prop |= qid; +} + +static inline uint8_t node_qid_get(struct topic_node *node) +{ + return node->my_prop & QID_VMASK; +} + +static inline bool is_node_willed(const struct topic_node *node) +{ + return (NULL != node->will_cl)? true : false; +} + +static inline bool enrolls_plugin(const struct topic_node *node) +{ + return (PG_MAP_ALL_DFLTS != node->pg_map)? true : false; +} + +static struct topic_node *free_nodes = NULL; +static struct topic_node *root_node = NULL; + +static void node_reset(struct topic_node *node) +{ + node->dn_nhbr = NULL; + node->up_nhbr = NULL; + node->dn_hier = NULL; + node->up_hier = NULL; + + node->cl_map[0] = node->cl_map[1] = node->cl_map[2] = 0; + node->my_data = NULL; + node->my_dlen = 0; + node->will_cl = NULL; + node->my_prop = QID_VMASK; + + node->pg_map = PG_MAP_ALL_DFLTS; + node->toplen = 0; +} + +static void topic_node_init(void) +{ + int i = 0; + + for(i = 0; i < MAX_TOP_NODES; i++) { + struct topic_node *node = nodes + i; + + node_reset(node); + + node->dn_nhbr = free_nodes; + free_nodes = node; + } + + return; +} + +static struct topic_node *alloc_topic_node(void) +{ + struct topic_node *node = free_nodes; + if(NULL != node) { + free_nodes = node->dn_nhbr; + + node_reset(node); + } + + return node; +} + +static void free_node(struct topic_node *node) +{ + node_reset(node); + + node->dn_nhbr = free_nodes; + free_nodes = node; +} + +static bool is_end_of_subtop(const char *topstr, char const **next_subtop) +{ + bool rv = false; + + if('\0' == *topstr) { + *next_subtop = NULL; /* Reached end of topstr */ + rv = true; + } + + if('/' == *topstr) { + /* Next sub-topic is NULL, if a '\0' follows '/' */ + *next_subtop = *(topstr + 1)? topstr + 1 : NULL; + rv = true; + } + + return rv; +} + +static int32_t subtop_read(const char *topstr, char *subtop_buf, uint16_t buf_length, + char const **next_subtop) +{ + int32_t idx = 0, len = buf_length; + + *next_subtop = topstr; + + while(idx < (len - 1)) { + subtop_buf[idx++] = *topstr; + + if(is_end_of_subtop(topstr, next_subtop)) + break; + + topstr++; + } + + if(idx == len) { + USR_INFO("S: Fatal, insufficient buffer for sub-str\n\r"); + return -1; /* zero or insufficient buffer provided */ + } + + /* NUL terminated buffer: unless '\0' ended copy, make one */ + if('\0' == subtop_buf[idx - 1]) + idx--; /* A fix up */ + else + subtop_buf[idx] = '\0'; + + return idx; +} + +static +struct topic_node *alloc_node_subtop(const char *topstr, char const **next_subtop) +{ + uint16_t len = 0; + struct topic_node *node = alloc_topic_node(); + if(NULL == node) + return NULL; + + len = subtop_read(topstr, node->subtop, MAX_SUBTOP_LEN, next_subtop); + if(len <= 0) { + free_node(node); + return NULL; + } + + node->toplen = len; + + return node; +} + +/* Returns true if string 'subtop' is part of string 'topstr' otherwise false. + Additionally, on success, pointer to next subtopic in 'topstr' is provided. +*/ +bool is_first_subtop(const char *subtop, const char *topstr, char const **next_subtop) +{ + while(*subtop == *topstr) { + if(is_end_of_subtop(topstr, next_subtop)) + return true; + + subtop++; + topstr++; + } + + return false; +} + +/* Find a topic node in neighbour list that matches first subtopic in 'topstr'. + Additionally, on success, pointer to next subtopic in 'topstr' is provided. +*/ +struct topic_node *subtop_nhbr_node_find(const struct topic_node *root_nh, + const char *topstr, + char const **next_subtop) +{ + /* This routine does not make use of 'next_subtop' */ + while(root_nh) { + if(true == is_first_subtop(root_nh->subtop, topstr, next_subtop)) + break; + + root_nh = root_nh->dn_nhbr; + } + + return (struct topic_node*) root_nh;/* Bad: const from poiner removed */ +} + +/* Find a topic node in neighbour list that matches the given 'subtop' string */ +struct topic_node *nhbr_node_find(const struct topic_node *root_nh, + const char *subtop) +{ + const char *next_subtop = NULL; + + return subtop_nhbr_node_find(root_nh, subtop, &next_subtop); +} + +/* Find leaf node of branch-combo that matches complete 'topstr'. + Modus Operandi: For each sub topic in 'topstr', go across neighbour list, + then for matching neighbour node, make its child node as root of neighbour + list for another iteration for next sub topic. +*/ +/* Complex */ +static struct topic_node *lowest_node_find(const char *topstr, bool *flag_nh, + char const **next_subtop) +{ + struct topic_node *root_nh = root_node, *node = NULL; + + *next_subtop = topstr; + *flag_nh = false; + + while(root_nh) { + node = subtop_nhbr_node_find(root_nh, topstr, next_subtop); + if(NULL == node) { /* Partial or no match */ + *flag_nh = true; + node = root_nh; + break; + } + + /* NULL '*next_subtop' indicates a complete match */ + if(NULL == (*next_subtop)) + break; + + root_nh = node->dn_hier; + topstr = *next_subtop; + } + + return node; +} + +struct topic_node *leaf_node_find(const char *topstr) +{ + const char *next_subtop; + bool flag_nh; + struct topic_node *node = lowest_node_find(topstr, &flag_nh, + &next_subtop); + + return (NULL == next_subtop)? node : NULL; +} + +static void try_node_delete(struct topic_node *node); /* Forward declaration */ + +/* Create 'child only' hierarchy of nodes to hold all sub topics in 'topstr'. + The routine returns a start node of hierarchy and also provides leaf node. +*/ +static +struct topic_node *hier_nodes_create(const char *topstr, struct topic_node **leaf) +{ + struct topic_node *base = NULL, *node = NULL, *prev = NULL; + const char *next_subtop = NULL; + + do { + node = alloc_node_subtop(topstr, &next_subtop); + + if(NULL == node) { + /* Allocation of a node failed, free up + the previous allocations, if any, in + the hierarchy that was being created */ + if(prev) + try_node_delete(prev); + + base = NULL; + break; + } + + if(NULL == prev) { + // prev = node; + base = node; /* First node of hierarchy */ + } else { + prev->dn_hier = node; + node->up_hier = prev; + // prev = node; + } + + prev = node; + topstr = next_subtop; + + } while(next_subtop); + + *leaf = node; + + DBG_INFO("S: Hierarchy of nodes created: Base: %s & Leaf: %s\n\r", + base? base->subtop : " ", (*leaf)? (*leaf)->subtop : " "); + + return base; +} + +static void install_nhbr_node(struct topic_node *base, struct topic_node *node) +{ + while(base->dn_nhbr) { + base = base->dn_nhbr; + } + + base->dn_nhbr = node; + node->up_nhbr = base; + + DBG_INFO("S: %s added as a neighbour to %s\n\r", + node->subtop, base->subtop); +} + +static void set_up_hier_nodes(struct topic_node *up_hier, + struct topic_node *dn_hier) +{ + up_hier->dn_hier = dn_hier; + dn_hier->up_hier = up_hier; + + DBG_INFO("%s added as DN HIER to %s \n\r", + dn_hier->subtop, up_hier->subtop); +} + +static inline void install_topic_root_node(struct topic_node *node) +{ + root_node = node; +} + +/* Create or update tree to create branch-combo to refer to 'topstr'. + Modus operandi: + +*/ +struct topic_node *topic_node_create(const char *topstr) +{ + struct topic_node *base, *node, *leaf; + const char *next_subtop = topstr; + bool flag_nh; + + base = lowest_node_find(topstr, &flag_nh, &next_subtop); + if(NULL == next_subtop) + return base; /* Found exact match */ + + /* Control reaches here, if either no or partial branch-combo has been + found for 'topstr'. Now, let's create remaining branches for string + 'next_subtop' and assign them to appropriately to topic tree. + */ + DBG_INFO("S: Creating Hierarchy for %s\n\r", next_subtop); + + node = hier_nodes_create(next_subtop, &leaf); + if(node) { + if(NULL == base) + install_topic_root_node(node); + else if(flag_nh) + install_nhbr_node(base, node); + else + set_up_hier_nodes(base, node); + + return leaf; + } + + return NULL; +} + +static bool can_delete_node(const struct topic_node *node) +{ + if(node->up_nhbr && node->up_hier) { + USR_INFO("S: fatal, node w/ up-nhbr & up-hier.\n\r"); + return false; + } + + if(node->dn_nhbr || + node->dn_hier) + return false; + + return true; +} + +static struct topic_node *node_delete(struct topic_node *node) +{ + struct topic_node *next = NULL; + + if(false == can_delete_node(node)) + return NULL; + + if(node->up_nhbr) { + node->up_nhbr->dn_nhbr = NULL; + next = node->up_nhbr; + } + + if(node->up_hier) { + node->up_hier->dn_hier = NULL; + next = node->up_hier; + } + + if((NULL == node->up_nhbr) && + (NULL == node->up_hier)) + root_node = NULL; + + DBG_INFO("S: Deleted node %s\n\r", node->subtop); + + free_node(node); + + return next; +} + +struct topbuf_desc { + char *buffer; + uint16_t maxlen; + uint16_t offset; +}; + +static bool topbuf_add(struct topbuf_desc *buf_desc, const struct topic_node *node) +{ + const char *next_subtop; + char *buf = buf_desc->buffer + buf_desc->offset; + uint16_t len = buf_desc->maxlen - buf_desc->offset; + + int32_t rv = subtop_read(node->subtop, buf, len, &next_subtop); + if(rv < 0) + return false; + + if(NULL != next_subtop) { + USR_INFO("S: topstr_add fatal, bad subtop.\n\r"); + return false; + } + + buf_desc->offset += rv; + + return true; +} + +static bool topbuf_cpy(struct topbuf_desc *buf_desc, const char *subtop) +{ + const char *next_subtop; + char *buf = buf_desc->buffer + buf_desc->offset; + uint16_t len = buf_desc->maxlen - buf_desc->offset; + + int32_t rv = subtop_read(subtop, buf, len, &next_subtop); + if(rv < 0) + return false; + + if(NULL != next_subtop) { + USR_INFO("S: topstr_copy fatal, bad subtop.\n\r"); + return false; + } + + buf_desc->offset += rv; + + return true; +} + +static bool has_a_wildcard(const struct topic_node *node) +{ + const char *str = node->subtop; + while('\0' != *str) { + if(('+' == *str) || ('#' == *str)) + return true; + + str++; + } + + return false; +} + +static bool is_node_SUB_subtop(const struct topic_node *node, const char *subtop) +{ + if(false == has_a_wildcard(node)) + return ((0 == strcmp(node->subtop, subtop)) || + (node->dn_hier && (0 == strcmp("+/", subtop))) || + (!node->dn_hier && (0 == strcmp("+", subtop))))? + true : false; + + return false; +} + +/* Search node tree, created by PUB retention, for the branch combo, whose + absolute sub-topic sequence 'matches', in entirety, the topic, which is + being subscribed. The 'match' criteria is met either through the + wildcard sub-topics or exact compare. + + The hierarchical search starts at the specified 'base' node and ends at + the leaf node. If the sequence of 'base-to-leaf' sub-topics 'match' the + 'topSUB', then the leaf node is returned, otherwise a NULL is returned. + + As part of hierarchical search, neighbouring nodes, if any, are logged + for subsequent iteration of the routine. +*/ +static struct topic_node *pub_hier_search(const char *topSUB, + const struct topic_node *base, + struct topbuf_desc *mk_pubtop) +{ + const struct topic_node *node = base, *prev = NULL; + const char *next_subtop = topSUB; + char subtop[MAX_SUBTOP_LEN]; + + while(next_subtop && node) { + if(subtop_read(topSUB, subtop, MAX_SUBTOP_LEN, + &next_subtop) <= 0) + break; + + if(node->dn_nhbr) + stack_add(node->dn_nhbr, (uint32_t)topSUB, mk_pubtop->offset); + + if(false == is_node_SUB_subtop(node, subtop)) + break; /* Node doesn't form part of published topic */ + + if(false == topbuf_add(mk_pubtop, node)) + break; + + prev = node; + node = node->dn_hier; + + topSUB = next_subtop; + } + + if(NULL != next_subtop) + node = NULL; + else + node = prev; + + return (struct topic_node *)node; +} + +static bool is_node_PUB_subtop(const struct topic_node *node, + const char *subtop, bool endtop) +{ + /* Assumes that subtop hasn't got any wildcard characater */ + return ((0 == strcmp(subtop, node->subtop)) || + (!endtop && (0 == strcmp("+/", node->subtop))) || + (endtop && (0 == strcmp("+", node->subtop))))? + true : false; +} + +/* Search node tree, created by subscriptions, for the branch combo, whose + sub-topic sequence 'matches', in entirety, the absolute topic, to which + data has been published. The 'match' criteria is met either through the + wildcard sub-topics or exact compare. + + The hierarchical search starts at the specified 'base' node and ends at + the leaf node. If the sequence of 'base-to-leaf' sub-topics 'match' the + 'topPUB', then the leaf node is returned, otherwise a NULL is returned. + + As part of hierarchical search, neighbouring nodes, if any, are logged + for subsequent iteration of the routine. +*/ +static struct topic_node *SUB_leaf_search(const char *topPUB, + const struct topic_node *base) +{ + const struct topic_node *node = base, *prev = NULL; + const char *next_subtop = topPUB; + char subtop[MAX_SUBTOP_LEN]; + + while(next_subtop && node) { + if(subtop_read(topPUB, subtop, MAX_SUBTOP_LEN, + &next_subtop) <= 0) + break; + + if(node->dn_nhbr) + stack_add(node->dn_nhbr, (uint32_t)topPUB, 0); + + if(0 == strcmp("#", node->subtop)) + goto SUB_leaf_search_exit1; + + if(false == is_node_PUB_subtop(node, subtop, !next_subtop)) + break; /* Node doesn't form part of published topic */ + + prev = node; + node = node->dn_hier; + + topPUB = next_subtop; + } + + if(NULL != next_subtop) + node = NULL; + else + node = prev; + +SUB_leaf_search_exit1: + return (struct topic_node*) node; // Bad +} + + +/*------------------------------------------------------------------------- + * MQTT Routines + *------------------------------------------------------------------------- + */ + +#define WBUF_LEN MQP_SERVER_RX_LEN /* Assignment to ease implementation */ +static char work_buf[WBUF_LEN]; + +static void try_node_delete(struct topic_node *node) +{ + while(node) { + + if(is_node_retain(node) || + is_node_willed(node) || + enrolls_plugin(node) || + node->cl_map[0] || + node->cl_map[1] || + node->cl_map[2]) + break; + + node = node_delete(node); + } +} + +/* Move this to a common file */ +static void pub_msg_send(const struct utf8_string *topic, const uint8_t *data_buf, + uint32_t data_len, uint8_t fh_flags, uint32_t cl_map) +{ + enum mqtt_qos qos = ENUM_QOS(fh_flags); + struct mqtt_packet *mqp = NULL; + + mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len); + if(NULL == mqp) + return; + + if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) || + (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) { + mqp_free(mqp); + return; + } + + mqp_prep_fh(mqp, fh_flags); + + if(cl_map) + cl_pub_dispatch(cl_map, mqp); + + return; +} + +static struct topic_node *SUB_node_create(const char *topSUB, uint8_t qid, void *usr_cl) +{ + struct topic_node *leaf = topic_node_create(topSUB); + if(leaf) { + uint8_t j = 0; + uint32_t map = cl_bmap_get(usr_cl); + + for(j = 0; j < 3; j++) + /* Client: clear QOS of existing sub, if any */ + leaf->cl_map[j] &= ~map; + + leaf->cl_map[qid] |= map; + + cl_sub_count_add(usr_cl); + } + + return leaf; +} + +static uint8_t proc_pub_leaf(struct topic_node *leaf, const struct utf8_string *topic, + enum mqtt_qos qos, void *usr_cl) +{ + uint8_t qid = QOS_VALUE(qos); + + if(is_node_retain(leaf)) { + /* If it is an earlier created topic w/ retained + data, then pick lower of the two QOS(s) */ + qid = MIN(node_qid_get(leaf), qid); + + /* Publish the retained data to this client */ + pub_msg_send(topic, leaf->my_data, leaf->my_dlen, + MAKE_FH_FLAGS(false, qid, true), + cl_bmap_get(usr_cl)); + } + + return qid; +} + +/* Multi-level wild-card subscription - search of all of the tree i.e. + "no topic" ('no_top') in particular and publish it to client, if + the hierarchy has no wild-card node. */ +static +uint8_t proc_pub_hier_no_top(struct topic_node *base, struct topbuf_desc *mk_pubtop, + enum mqtt_qos qos, void *usr_cl) +{ + struct topic_node *node = base, *leaf = NULL; + uint8_t ack = QOS_VALUE(qos); + + /* 1. Find the leaf node of a non wildcard branch-combo */ + while(node) { + if(node->dn_nhbr) + stack_add(node->dn_nhbr, 0, mk_pubtop->offset); + + if(has_a_wildcard(node)) + break; + + if(false == topbuf_add(mk_pubtop, node)) + break; + + leaf = node; + node = node->dn_hier; + } + + /* A non NULL value of 'node' would indicate a hierarchy with a + wildcard (sub-)topic (the 'node') - not suitable for PUB. */ + + if(NULL == node) { + /* 2. Send retained data, if any, to SUB Client */ + struct utf8_string topic = {mk_pubtop->buffer, + mk_pubtop->offset}; + + /* In this version, at this juncture, the 'leaf' + will not be NULL. Nevertheless a check (for + the sake of static analytical tools).........*/ + if(leaf) + ack = proc_pub_leaf(leaf, &topic, qos, usr_cl); + } + + return ack; +} + +/* Multi-level wild-card subscription - search of all of the tree i.e. + "no topic" ('no_top') in particular and publish it to client, if + a hierarchy in the tree has no wild-card node. */ +static +uint8_t proc_pub_tree_no_top(struct topic_node *base, struct topbuf_desc *mk_pubtop, + enum mqtt_qos qos, void *usr_cl) +{ + uint32_t stack_ref = stack_idx; + uint8_t min = QOS_VALUE(qos); + + if(base != NULL) + stack_add(base, 0, mk_pubtop->offset); + + while(stack_ref < stack_idx) { + struct _node_stack *stack = stack_pop(); + uint8_t ack; + + mk_pubtop->offset = (uint16_t) stack->val2; + + ack = proc_pub_hier_no_top(stack->node, mk_pubtop, qos, usr_cl); + if(ack < min) + min = ack; + } + + return min; +} + +static uint8_t proc_pub_hier_SUBtop(const char *topSUB, const struct topic_node *base, + struct topbuf_desc *mk_pubtop, enum mqtt_qos qos, + void *usr_cl) +{ + struct topic_node *leaf = pub_hier_search(topSUB, base, mk_pubtop); + uint8_t min = QOS_VALUE(qos); + + if(leaf) { + struct utf8_string topic = {mk_pubtop->buffer, + mk_pubtop->offset}; + + min = proc_pub_leaf(leaf, &topic, qos, usr_cl); + } + + return min; +} + +/* used by sl or no wc */ +static uint8_t proc_pub_tree_SUBtop(const char *topSUB, struct topic_node *base, + struct topbuf_desc *mk_pubtop, + enum mqtt_qos qos, void *usr_cl) +{ + uint32_t stack_ref = stack_idx; + uint8_t min = QOS_VALUE(qos); + + if(NULL != base) + stack_add(base, (uint32_t)topSUB, mk_pubtop->offset); + + while(stack_ref < stack_idx) { + struct _node_stack *stack = stack_pop(); + uint8_t ack; + + mk_pubtop->offset = stack->val2; + ack = proc_pub_hier_SUBtop((char*)stack->val1, stack->node, + mk_pubtop, qos, usr_cl); + + if(ack < min) + min = ack; + } + + return min; +} + +static +uint8_t proc_sub_ml_wc_hier(const char *grandpa_topSUB, char *parent_subtop, + struct topic_node *base, struct topbuf_desc *mk_pubtop, + enum mqtt_qos qos, void *usr_cl) +{ + uint8_t min = QOS_VALUE(qos), ack = QFL_VALUE; + char *subtop = NULL; + + /* 1. Search hier node for 'grandpa' and if found, get to parent level */ + if('\0' != grandpa_topSUB[0]) { + struct topic_node *leaf = pub_hier_search(grandpa_topSUB, base, + mk_pubtop); + if(NULL == leaf) + return min; + + base = leaf->dn_hier; /* nhbr root at parent level */ + } + + /* 2. If present, process parent as a leaf and get its down hierarchy */ + subtop = parent_subtop; + if(('\0' != subtop[0]) && ('+' != subtop[0]) && ('/' != subtop[0])) { + uint16_t offset = mk_pubtop->offset; /* Refer to grandpa's pubtop */ + uint16_t sublen = 0; + + while(subtop[sublen]){sublen++;} + + ack = proc_pub_tree_SUBtop(subtop, base, mk_pubtop, qos, usr_cl); + mk_pubtop->offset = offset; /* Restores grandpa's pubtop */ + + subtop[sublen] = '/'; /* Make parent's hier subtop */ + + base = nhbr_node_find(base, subtop); + if(base) + base = topbuf_cpy(mk_pubtop, subtop)? + base->dn_hier : NULL; + subtop[sublen] = '\0'; /* Get back, original subtop */ + } + + min = MIN(min, ack); + /* 3. Process '#' WC by walking thru entire sub-tree of parent 'base' */ + if(NULL != base) + ack = proc_pub_tree_no_top(base, mk_pubtop, qos, usr_cl); + + return MIN(min, ack); +} + +static uint8_t proc_sub_ml_wc_tree(char *grandpa_topSUB, char *parent_subtop, + struct topic_node *base, + enum mqtt_qos qos, void *usr_cl) +{ + struct topbuf_desc mk_pubtop = {work_buf, WBUF_LEN, 0 /* offset */}; + uint32_t stack_ref = stack_idx; + uint8_t min = QOS_VALUE(qos); + + if(NULL != base) + stack_add(base, (uint32_t)grandpa_topSUB, mk_pubtop.offset); + + while(stack_ref < stack_idx) { + struct _node_stack *stack = stack_pop(); + uint8_t ack; + + mk_pubtop.offset = stack->val2; + ack = proc_sub_ml_wc_hier((char*)stack->val1, parent_subtop, + stack->node, &mk_pubtop, qos, usr_cl); + if(ack < min) + min = ack; + } + + return min; +} + +static uint8_t ml_wc_nodes_create(char *parent_topSUB, uint16_t toplen, uint8_t qid, void *usr_cl) +{ + struct topic_node *parent_leaf = NULL; + + if('\0' != parent_topSUB[0]) { + parent_leaf = SUB_node_create(parent_topSUB, qid, usr_cl); + if(NULL == parent_leaf) + return QFL_VALUE; + } + + /* Get the topic SUB to it's original state */ + if(toplen > 1) parent_topSUB[toplen - 2] = '/'; + parent_topSUB[toplen - 1] = '#'; + + if(NULL == SUB_node_create(parent_topSUB, qid, usr_cl)) { + /* Failed to create WC topic, so delete parent as well. + In this revision, 'parent_leaf' will not be a 'NULL' + at this juncture, nevertheless a check (for tools) */ + if(parent_leaf) + node_delete(parent_leaf); + + return QFL_VALUE; + } + + return qid; +} + +/* Process Multi-level Wildcard Topic SUBSCRIBE */ +static uint8_t proc_sub_ml_wildcard(char *topSUB, uint16_t toplen, enum mqtt_qos qos, + void *usr_cl) +{ + uint16_t len = 0, limit = MIN(toplen, MAX_SUBTOP_LEN); + char subtop[MAX_SUBTOP_LEN], *ptr; + uint8_t min = QOS_VALUE(qos); + + /* 'topSUB': Need to create grandpa topic and parent-subtopic */ + topSUB[toplen - 1] = '\0'; /* Remove '#' */ + if(toplen > 1) /* Remove '/' */ + topSUB[toplen - 2] = '\0'; + + do { /* Do processing to get parent sub-topic into buffer */ + if('/' == topSUB[toplen - len - 1]) + break; /* found '/' */ + + len++; /* Copy parent characters */ + subtop[MAX_SUBTOP_LEN - len] = topSUB[toplen - len]; + } while(len < limit); + + if((toplen > len) && ('/' != topSUB[toplen - len - 1])) + return QFL_VALUE; /* Bad Length */ + + topSUB[toplen - len] = '\0'; /* End of grand-pa's topic name */ + ptr = subtop + MAX_SUBTOP_LEN - len; /* Parent's leaf subtop */ + min = proc_sub_ml_wc_tree(topSUB, ptr, root_node, qos, usr_cl); + + /* Make branch-combo to complete processing of parent' topic */ + strcpy(topSUB + toplen - len, ptr); // topSUB[toplen - len] = *ptr; + + /* Create nodes for multi-level wildcard topic & it's parent */ + min = ml_wc_nodes_create(topSUB, toplen, min, usr_cl); + + return min; +} + +/* Process Single-level Wildcard or No Wild Card Topic SUBSCRIBE */ +static +uint8_t proc_sub_sl_or_no_wc(const char *topSUB, enum mqtt_qos qos, void *usr_cl) +{ + struct topbuf_desc mk_pubtop = {work_buf, WBUF_LEN, 0 /* offset */}; + uint8_t min = QOS_VALUE(qos); + + /* For single level wildcard or absolute topic, find PUB nodes */ + min = proc_pub_tree_SUBtop(topSUB, root_node, &mk_pubtop, qos, usr_cl); + + if(NULL == SUB_node_create(topSUB, min, usr_cl)) + min = QFL_VALUE; + + return min; +} + +static uint16_t proc_forward_slash(char *buf, uint16_t len) +{ + uint16_t i, j; + for(i = 1, j = 1; i < len; i++) { + char curr = buf[i]; + if(('/' == curr) && (buf[i - 1] == curr)) + continue; /* Drop consecutive '/' */ + + buf[j++] = curr; + } + + if((1 != j) && ('/' == buf[j - 1])) + j--; /* Topic can not end with a '/' */ + + buf[j] = '\0'; + + return j; +} + +static inline bool is_valid_char_order(char prev, char curr) +{ + return ((('/' != prev) && ('+' == curr)) || + (('+' == prev) && ('/' != curr)) || + (('/' != prev) && ('#' == curr)) || + (('#' == prev)))? false : true; +} + + +static bool is_valid_SUB_top(const char *buf, uint16_t len) +{ + char prev, curr; + uint16_t i = 0; + + if((0 == len) || ('\0' == *buf)) + return false; + + curr = buf[0]; + for(i = 1; (i < len) && ('\0' != curr); i++) { + prev = curr; + curr = buf[i]; + + if(false == is_valid_char_order(prev, curr)) + break; + } + + return (i == len)? true : false; +} + +static bool proc_sub_msg_rx(void *usr_cl, const struct utf8_strqos *qos_topics, + uint32_t n_topics, uint16_t msg_id, uint8_t *ack) +{ + int32_t i = 0; + for(i = 0; i < n_topics; i++) { + const struct utf8_strqos *qos_top = qos_topics + i; + enum mqtt_qos qos = qos_top->qosreq; + char *buf = (char*)qos_top->buffer; + uint16_t len = qos_top->length; + + /* Remove zero-topics and trailing '/' from SUB top */ + len = proc_forward_slash(buf, len); + ack[i] = QFL_VALUE; + if(false == is_valid_SUB_top(buf, len)) + continue; + + buf[len] = '\0'; /* Dirty trick, cheeky one */ + + ack[i] = ('#' == buf[len - 1])? + proc_sub_ml_wildcard(buf, len, qos, usr_cl) : + proc_sub_sl_or_no_wc(buf, qos, usr_cl); + + DBG_INFO("SUB Topic%-2d %s is ACK'ed w/ 0x%02x\n\r", + i + 1, buf, ack[i]); + } + + return true; /* Send SUB-ACK and do not close network */ +} + +static +bool proc_sub_msg_rx_locked(void *usr_cl, const struct utf8_strqos *qos_topics, + uint32_t n_topics, uint16_t msg_id, uint8_t *ack) +{ + return proc_sub_msg_rx(usr_cl, qos_topics, n_topics, msg_id, ack); +} + +static void leaf_un_sub(struct topic_node *leaf, void *usr_cl) +{ + uint8_t j = 0; + uint32_t map = cl_bmap_get(usr_cl); + + for(j = 0; j < 3; j++) { + /* Client: clear QOS of existing sub, if any */ + if(0 == (leaf->cl_map[j] & map)) + continue; + + leaf->cl_map[j] &= ~map; + cl_sub_count_del(usr_cl); + + try_node_delete(leaf); + + break; + } +} + +static bool proc_un_sub_msg(void *usr_cl, const struct utf8_string *topics, + uint32_t n_topics, uint16_t msg_id) +{ + uint32_t i = 0; + + for(i = 0; i < n_topics; i++) { + const struct utf8_string *topic = topics + i; + struct topic_node *leaf = NULL; + uint16_t len = topic->length; + + /* The maximum length of 'work_buf' is same as that of RX buffer + in the PKT-LIB. Therefore, the WBUF_LEN is not being checked + against the length of the topic (a constituent of RX buffer). + */ + strncpy(work_buf, topic->buffer, topic->length); + work_buf[len] = '\0'; + + if('#' == work_buf[len - 1]) { /* Multi-level Wildcard */ + work_buf[len - 1] = '\0'; + if(len > 1) + work_buf[len - 2] = '\0'; + + leaf = leaf_node_find(work_buf); + if(leaf) + leaf_un_sub(leaf, usr_cl); + + if(len > 1) + work_buf[len - 2] = '/'; + work_buf[len - 1] = '#'; + } + + leaf = leaf_node_find(work_buf); + if(leaf) + leaf_un_sub(leaf, usr_cl); + } + + return true; /* Do not close network */ +} + +static +bool proc_un_sub_msg_locked(void *usr_cl, const struct utf8_string *topics, + uint32_t n_topics, uint16_t msg_id) +{ + return proc_un_sub_msg(usr_cl, topics, n_topics, msg_id); +} + +static void +leaf_msg_send(const struct topic_node *leaf, const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, bool dup, enum mqtt_qos qos, + bool retain) +{ + uint8_t qid = 0, fh_fgs = 0; + + for(qid = 0; qid < 3; qid++) { + uint8_t map = leaf->cl_map[qid]; + fh_fgs = MAKE_FH_FLAGS(dup, MIN(qid, QOS_VALUE(qos)), retain); + + if(map) + pub_msg_send(topic, data_buf, data_len, fh_fgs, map); + } + + if(enrolls_plugin(leaf)) + plugin_publish(leaf->pg_map, topic, data_buf, data_len, + dup, qos, retain); + + return; +} + +static void node_data_set(struct topic_node *node, uint8_t *data, + uint32_t dlen, uint8_t qid, bool retain) +{ + node->my_data = data; + node->my_dlen = dlen; + + node_qid_set(node, qid); + node_retain_set(node, retain); + + return; +} + +static bool node_data_update(struct topic_node *node, bool drop_qid0, + const uint8_t *data_buf, uint32_t data_len, + uint8_t qid, bool retain) +{ +#define NODE_DATA_RESET_PARAMS NULL, 0, 0, false + + /* Assumes that caller has provided either reset or valid params */ + + uint8_t *data = NULL; + + if(node->my_dlen) + my_free(node->my_data); + + /* Watch out for assignment in 'if' statement - avoid such smarts */ + if((drop_qid0 && (0 == qid)) || (data_buf && !(data = (uint8_t*)my_malloc(data_len)))) { + node_data_set(node, NODE_DATA_RESET_PARAMS); + } else { + + if(data) + buf_wr_nbytes(data, data_buf, data_len); + + node_data_set(node, data, data_len, qid, retain); + } + + return ((!!data) ^ (!!data_len))? false : true; +} + +static inline bool is_wildcard_char(char c) +{ + return (('+' == c) || ('#' == c))? true : false; +} + +static int32_t pub_topic_read(const struct utf8_string *topic, char *buf, uint32_t len) +{ + uint32_t i = 0; + uint32_t toplen = topic->length; + + if(len < (toplen + 1)) + return -1; + + for(i = 0; i < toplen; i++) { + char c = topic->buffer[i]; + if(is_wildcard_char(c)) + return -1; /* Invalid: wildcard in PUB topic */ + + if('\0' == c) + return -1; /* Invalid: NUL char in PUB topic */ + + buf[i] = c; + } + + buf[i] = '\0'; + + return i; +} + +static +void proc_sub_tree_topPUB(const char *topPUB, const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, enum mqtt_qos qos, + bool retain) +{ + struct topic_node *leaf = NULL; + uint32_t stack_ref = stack_idx; + + if(NULL != root_node) + stack_add(root_node, (uint32_t)topPUB, 0 /* Not used */); + + while(stack_ref < stack_idx) { + struct _node_stack *stack = stack_pop(); + + /* Find leaf node of SUB that matches the PUB topic */ + leaf = SUB_leaf_search((char*)stack->val1, stack->node); + if(leaf) + leaf_msg_send(leaf, topic, data_buf, data_len, + false, qos, retain); + } +} + +static bool _proc_pub_msg_rx(void *usr_cl, const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, uint8_t msg_id, + enum mqtt_qos qos, bool retain) +{ + int32_t err = -1; + + /* Prior to msg processing, chk for topic or buffer errors */ + if((pub_topic_read(topic, work_buf, WBUF_LEN) <= 0) || + (proc_forward_slash(work_buf, topic->length) <= 0)) + goto _proc_pub_msg_rx_exit; + + /* If a valid MSG ID is specified for a QOS2 pkt, track it */ + err = -2; + if((msg_id) && + (MQTT_QOS2 == qos) && + (false == cl_mgmt_qos2_pub_rx_update(usr_cl, msg_id))) + goto _proc_pub_msg_rx_exit; + + /* Forward data to all subscribers of PUB topic in server */ + proc_sub_tree_topPUB(work_buf, topic, data_buf, + data_len, qos, false); + + err = 0; + if(retain) { + struct topic_node *leaf = topic_node_create(work_buf); + if((NULL == leaf) || + (false == node_data_update(leaf, true, data_buf, data_len, + QOS_VALUE(qos), retain))) + err = -3; /* Resources no more available */ + + if(leaf) + try_node_delete(leaf); + } + + _proc_pub_msg_rx_exit: + DBG_INFO("Processing of PUB message from %s (0x%08x) has %s (%d)\n\r", + usr_cl? "client" : "plugin", usr_cl? cl_bmap_get(usr_cl) : 0, + err? "failed" : "succeeded", err); + + return (err < 0)? false : true; +} + +static +bool proc_pub_msg_rx_locked(void *usr_cl, const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, uint16_t msg_id, + bool dup, enum mqtt_qos qos, bool retain) +{ + return _proc_pub_msg_rx(usr_cl, topic, data_buf, data_len, + msg_id, qos, retain); +} + +static int32_t utf8_str_rd(const struct utf8_string *utf8, char *buf, uint32_t len) +{ + if((NULL == utf8) || (utf8->length > (len - 1))) + return -1; + + buf_wr_nbytes((uint8_t*)buf, (uint8_t*)utf8->buffer, utf8->length); + buf[utf8->length] = '\0'; + + return utf8->length; +} + +#define CONN_FLAGS_WQID_GET(conn_flags) \ + ((conn_flags >> 3) & QID_VMASK) /* WILL QOS VAL */ + +static uint16_t proc_connect_rx(void *ctx_cl, uint8_t conn_flags, + struct utf8_string * const *utf8_vec, void **usr_cl) +{ + struct topic_node *leaf = NULL; + struct utf8_string *utf8 = NULL; + void *app_cl = NULL; + uint16_t utf8_len = 0; + uint16_t rv = plugin_connect(MQC_UTF8_CLIENTID(utf8_vec), + MQC_UTF8_USERNAME(utf8_vec), + MQC_UTF8_PASSWORD(utf8_vec), + &app_cl); + if(rv) + goto proc_connect_rx_exit1; /* Admin did not permit connection */ + + rv = CONNACK_RC_SVR_UNAVBL; /* Server (resource) unavailable */ + + utf8 = MQC_UTF8_WILL_TOP(utf8_vec); + if(utf8 && utf8->length) { + utf8_str_rd(utf8, work_buf, WBUF_LEN); + + leaf = topic_node_create(work_buf); + if(NULL == leaf) + goto proc_connect_rx_exit2; + + if(false == node_data_update(leaf, false, + (uint8_t*)MQC_WILL_MSG_BUF(utf8_vec), + MQC_WILL_MSG_LEN(utf8_vec), + CONN_FLAGS_WQID_GET(conn_flags), + conn_flags & WILL_RETAIN_VAL)) + goto proc_connect_rx_exit3; + } + + utf8 = MQC_UTF8_CLIENTID(utf8_vec); + if(utf8) + utf8_len = utf8_str_rd(utf8, work_buf, WBUF_LEN); + + rv = cl_connect_rx(ctx_cl, (conn_flags & CLEAN_START_VAL)? true : false, + utf8_len? work_buf : NULL, app_cl, leaf, usr_cl); + if(CONNACK_RC_REQ_ACCEPT == (rv & 0xFF)) { + if(leaf) + leaf->will_cl = *usr_cl; + + return rv; /* Connection successful */ + } + + if(leaf) + node_data_update(leaf, true, NODE_DATA_RESET_PARAMS); + + proc_connect_rx_exit3: try_node_delete(leaf); + proc_connect_rx_exit2: plugin_disconn(app_cl, true); + proc_connect_rx_exit1: + return rv; +} + +static +uint16_t proc_connect_rx_locked(void *ctx_cl, uint8_t conn_flags, + struct utf8_string * const *utf8_vec, void **usr_cl) +{ + return proc_connect_rx(ctx_cl, conn_flags, utf8_vec, usr_cl); +} + +static void session_hier_delete(struct topic_node *node, void *usr_cl) +{ + struct topic_node *prev = NULL; + uint32_t cl_map = cl_bmap_get(usr_cl); + + while(node) { + int32_t i = 0; + for(i = 0; i < 3; i++) { + if(node->cl_map[i] & cl_map) { + node->cl_map[i] &= ~cl_map; + cl_sub_count_del(usr_cl); + /* Client/Topic/QID 1-to-1 map */ + break; + } + } + + if(node->dn_nhbr) + stack_add(node->dn_nhbr, 0, 0); + + prev = node; + node = node->dn_hier; + } + + if(prev) + try_node_delete(prev); +} + +void session_tree_delete(void *usr_cl) +{ + uint32_t stack_ref = stack_idx; + + if(NULL != root_node) + stack_add(root_node, 0, 0); + + while(stack_ref < stack_idx) { + struct _node_stack *stack = stack_pop(); + session_hier_delete(stack->node, usr_cl); + } +} + +static void proc_client_will(struct topic_node *leaf) +{ + uint32_t wbuf_len = WBUF_LEN - 1; /* Make space for '\0' in wbuf */ + uint32_t offset = wbuf_len; + struct utf8_string topic; + struct topic_node *node; + + work_buf[offset] = '\0'; /* Ensures wbuf is NUL terminated */ + node = leaf; + + /* Prepare a topic string by walking back from leaf to root */ + do { + if(offset < node->toplen) + return; + + offset -= node->toplen; + strncpy(work_buf + offset, node->subtop, node->toplen); + + while(node->up_nhbr) + node = node->up_nhbr; + + node = node->up_hier; + + } while(node); + + topic.buffer = work_buf + offset; + topic.length = wbuf_len - offset; + +#define MK_QOS_ENUM(qid) ((enum mqtt_qos)(qid & QID_VMASK)) + + proc_sub_tree_topPUB((char*)topic.buffer, + &topic, leaf->my_data, leaf->my_dlen, + MK_QOS_ENUM(node_qid_get(leaf)), + is_node_retain(leaf)); + +} + +static void on_cl_net_close(void *usr_cl, bool due2err) +{ + struct topic_node *leaf = NULL; + void *app_cl = NULL; + + /* See if client has a WILL that it intends to broadcast */ + leaf = (struct topic_node*) cl_will_hndl_get(usr_cl); + if(NULL != leaf) { + if(usr_cl != leaf->will_cl) + return; /* Mismatch: should never happen */ + + if(due2err) + proc_client_will(leaf); /* pls broadcast */ + + /* Network is closing, so cleanup WILL msg store */ + node_data_update(leaf, true, NODE_DATA_RESET_PARAMS); + leaf->will_cl = NULL; + try_node_delete(leaf); + } + + /* If not needed for future, delete session info */ + if(cl_can_session_delete(usr_cl)) + session_tree_delete(usr_cl); + + /* Inform app that client has been disconnected */ + app_cl = cl_app_hndl_get(usr_cl); + plugin_disconn(app_cl, due2err); + + cl_on_net_close(usr_cl); +} + +static +void on_cl_net_close_locked(void *usr_cl, bool due2err) +{ + on_cl_net_close(usr_cl, due2err); + return; +} + +static void on_connack_send(void *usr_cl, bool clean_session) +{ + /* If asserted, then need to start w/ clean state */ + if(clean_session) + session_tree_delete(usr_cl); + + cl_on_connack_send(usr_cl, clean_session); + + return; +} + +static +void on_connack_send_locked(void *usr_cl, bool clean_session) +{ + on_connack_send(usr_cl, clean_session); + return; +} + +static +bool proc_notify_ack_locked(void *usr_cl, uint8_t msg_type, uint16_t msg_id) +{ + return cl_notify_ack(usr_cl, msg_type, msg_id); +} + +static int32_t proc_topic_enroll(uint8_t pg_id, const struct utf8_string *topic, + enum mqtt_qos qos) +{ + struct topic_node *leaf = NULL; + uint16_t len = 0; + + if((NULL == topic) || (NULL == topic->buffer) || (0 == topic->length)) + return -1; + + if(WBUF_LEN < (topic->length + 1)) + return -2; + + len = topic->length; + strncpy(work_buf, topic->buffer, len); + work_buf[len] = '\0'; + + leaf = topic_node_create(work_buf); + if(NULL == leaf) + return -3; + + PG_MAP_VAL_SETUP(leaf->pg_map, QOS_VALUE(qos), pg_id); + + return 0; +} + +static +int32_t proc_topic_enroll_locked(uint8_t pg_id, const struct utf8_string *topic, + enum mqtt_qos qos) +{ + int32_t rv = 0; + + MUTEX_LOCKIN(); + rv = proc_topic_enroll(pg_id, topic, qos); + MUTEX_UNLOCK(); + + return rv; +} + +static int32_t proc_topic_cancel(uint8_t pg_id, const struct utf8_string *topic) +{ + struct topic_node *leaf = NULL; + uint16_t len = 0; + + if(NULL == topic) + return -1; + + if(WBUF_LEN < (topic->length + 1)) + return -2; + + len = topic->length; + strncpy(work_buf, topic->buffer, len); + work_buf[len] = '\0'; + + leaf = leaf_node_find(work_buf); + if(NULL == leaf) + return -2; + + PG_MAP_VAL_RESET(leaf->pg_map, pg_id); + + try_node_delete(leaf); + + return 0; +} + +static +int32_t proc_topic_cancel_locked(uint8_t pg_id, const struct utf8_string *topic) +{ + int32_t rv = 0; + + MUTEX_LOCKIN(); + rv = proc_topic_cancel(pg_id, topic); + MUTEX_UNLOCK(); + + return rv; +} + +static +int32_t proc_app_pub_send_locked(const struct utf8_string *topic, const uint8_t *data_buf, + uint32_t data_len, enum mqtt_qos qos, bool retain) +{ + bool rv; + + MUTEX_LOCKIN(); + /* Received from application, process topic for distribution */ + rv = _proc_pub_msg_rx(NULL, topic, data_buf, data_len, + 0x00, qos, retain); + MUTEX_UNLOCK(); + + return rv? (int32_t)data_len : -1; +} + +int32_t mqtt_server_init(const struct mqtt_server_lib_cfg *lib_cfg, + const struct mqtt_server_app_cfg *app_cfg) +{ + /* If mutex is specified, then the following set of callbacks + are invoked in the locked state - enumerated by 'locked' */ + struct mqtt_server_msg_cbs pkts_cbs = {proc_connect_rx_locked, + proc_sub_msg_rx_locked, + proc_un_sub_msg_locked, + proc_pub_msg_rx_locked, + proc_notify_ack_locked, + on_cl_net_close_locked, + on_connack_send_locked}; + + struct plugin_core_msg_cbs core_cbs = {proc_topic_enroll_locked, + proc_topic_cancel_locked, + proc_app_pub_send_locked}; + + util_params_set(lib_cfg->debug_printf, + lib_cfg->mutex, + lib_cfg->mutex_lockin, + lib_cfg->mutex_unlock); + + USR_INFO("Version: Server LIB %s, Common LIB %s.\n\r", + MQTT_SERVER_VERSTR, MQTT_COMMON_VERSTR); + + topic_node_init(); + + cl_mgmt_init(); + + plugin_init(&core_cbs); + + mqtt_server_lib_init(lib_cfg, &pkts_cbs); + + return 0; +} + +}//namespace mbed_mqtt
diff -r 000000000000 -r 547251f42a60 server_core.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server_core.h Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,274 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + + +#ifndef __SERVER_CORE_H__ +#define __SERVER_CORE_H__ + +#include "server_pkts.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +namespace mbed_mqtt { + +/**@file server_core.h + The MQTT server daemon, a task, provisions the high level abstractions for the + smart applicaltions. This is an intelligent layer that utilizes the services + of the MQTT Server Packet Library and is responsible for the functions of the + topic management, the client management and support of multiple server + applications. + + The light-weight server enables the services of the MQTT protocol for an + application to either extend and / or control the existing functions to suit + the deployment specific scenario. These applications in practice are the + plug-ins / add-ons to the core server functionalities. Specifically, + these applications, among others capabilities, can be used for analyzing and + approving the credentials of the remote clients, acting as a bridge between + a MQTT external client and the server, and a snooper to learn about all the + data transactions to / from the server. + + The server is targeted to conform to MQTT 3.1.1 specification. + + The services of the server are multi-task safe. Platform specific atomicity + constructs are used, through abstractions, by the server to maintain data + coherency and synchronization. + + The server offers the following compile line configurable parameters (-D opt) + - <b> CFG_SR_MAX_MQP_TX_LEN: </b> the constant buffer length allocated for a TX. + \n\n + - <b> CFG_SR_MAX_SUBTOP_LEN: </b> the maximum buffer size to hold a sub-topic. + For e.g., in the topic /x/y/z, the phrase /x, /y and z are sub-topics. + \n\n + - <b> CFG_SR_MAX_TOPIC_NODE: </b> the maximum number of topic nodes in server. + For e.g., in the topic /x/y/z, there are three nodes (/x, /y and z). + \n\n + - <b> CFG_SR_MAX_CL_ID_SIZE: </b> the maximum length of the client-id string. + \n\n + - <b> CFG_SR_MAX_NUM_CLIENT: </b> the maximum number of clients to be managed. + Note this is different from the maximum number of 'contexts'. A large number + of clients can be managed using fewer number of 'contexts' (connections). + \n\n + + @note Any future extensions & development must follow the following guidelines. + A new API or an extension to the existing API + a) must be rudimentary + b) must not imply a rule or policy (including a state machine) + b) must ensure simple design and implementation +*/ + + +/** @defgroup server_daemon The Server Daemon API(s) + @{ +*/ + +struct mqtt_server_app_cbs { + + /** Connection request from remote client - assess credentials. + This routine presents, to the application, the information about the + credentials of the remote client that is trying to establish a MQTT + connection with the server. The application shall utilize these data + in conjunction with its policy to either allow or deny connection to + the server. + + Should the application choose to allow the remote client to establish + a MQTT connection, then it must provide in 'app-user' (place-holder), + a handle that refers to the user of this connection. The server will + provide this handle in follow-up routines to enable the application + to refer to the associated connection in its implementation. + + @param[in] client_id UTF8 based ID of the remote client - is set to + NULL to indicate a zero-length client id. + @param[in] user_name UTF8 based user-name provided by the remote + client. It is set to NULL if the client did not provide an user name + @param[in] pass_word UTF8 based pass-word provided by the remote + client. It is set to NULL if the client did not provide a pass-word + @param[out] app_usr place-holder for application to provide a handle + to the user of this specific connection / client. + + @return 16bit value for the variable header in the CONNACK message. + The MSB in the return value refers to the 8bit parameter of the + acknowledge flags and must be set 0. The LSB in the return value + refers to the 8bit 'return code' parameter in the CONNACK message + and must be set accordingly. + */ + uint16_t (*connect)(const struct utf8_string *client_id, + const struct utf8_string *user_name, + const struct utf8_string *pass_word, + void **app_usr); + + /** Indicate a PUBLISH message from the network. + This routine presents, to the application, the topic and the related + data along with other qualifiers published by one of the clients + associated with this server. The application must enroll with the + server the particular topic for which the data should be notified. + + @param[in] topic UTF8 topic Name for which data has been published + @param[in] data_buf the published binary data for the topic + @param[in] data_len the length of the binary data + @param[in] dup is this a duplicate data from remote client? + @param[in] qos quality of service of the message / data + @param[in] retain should the server retain the data? + + @return none + */ + void (*publish)(const struct utf8_string *topic, + const uint8_t *payload, uint32_t pay_len, + bool dup, uint8_t qos, bool retain); + + /** Notify disconnection to the remote client. + This routine is invoked by the server to declare to the application + to a particular client has been terminated and follow-up, if needed, + can now commence. This routine aids the application by indicating if + an error condition had caused the disconnection. + + @param[in] app_usr handle to refer to the user of the connection in + the application + @param[in] due2err has the connection been closed due to an error? + + @return none + */ + void (*disconn)(const void *app_usr, bool due2err); +}; + +/** Enroll with server to receive data for a topic + This routine registers with the server, the specified topic for which the + application should receive any published data from the network. Whenever, + any of the remote clients that are connected to the server or applications, + this or other, publishes data for the specified topic, the server will + present the published information to this application. + + As part of the enrollment, the application should provide the maxmimum QoS + with which the server should provide the published data. If the topic data + received by the server bears a QoS higher than the one specified by the + application, the server shall lower it to the QoS level preferred by the + application. Otherwise, the QoS of the data shall be presented 'as-is'. In + other words, the application should expect a published data with a lower QoS. + + @param[in] app_hnd handle to the application context in the server. This + handle is provided by server @ref mqtt_server_app_register() + @param[in] topic UTF8 based string for which the application needs to start + getting the published data + @param[in] qos the meximum QoS the application intends to receive data for + this particular topic + + @return 0 on sucess, otherwise a negative value on error. +*/ +int32_t mqtt_server_topic_enroll(const void *app_hnd, + const struct utf8_string *topic, enum mqtt_qos qos); + +/** Cancel previous enrollment to receive data for a topic + This routines terminates the previous registration, if any, made by the + application to receive any publishised data for the specified topic. Once, + the enrollment is removed, the application, there after, will not receive + any data for this topic from the server. + + @param[in] app_hnd handle to the application context in the server. This + handle is provided by server @ref mqtt_server_app_register() + @param[in] topic UTF8 based string for which the application needs to stop + getting the published data + + @return 0 on success, otherwise a negative value on error. +*/ +int32_t mqtt_server_topic_disenroll(const void *app_hnd, + const struct utf8_string *topic); + +/** Send data to network for a topic + This routine offers the binary data along-with associated properties for a + specific topic to the server. The server, based on the subscriptions from the + remote clients and the enrollments made by the local applications for this + topic, will send the binary data and its qualifiers, adjusted for the maximum + subscribed or enrolled QoS, to the remote clients and the local applications. + + @param[in] topic UTF8 topic Name for which data has been published + @param[in] data_buf the published binary data for the topic + @param[in] data_len the length of the binary data + @param[in] qos quality of service of the message / data + @param[in] retain should the server retain the data? + + @return on success, the length of data sent, otherwise -1 on error. +*/ +int32_t mqtt_server_app_pub_send(const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, + enum mqtt_qos qos, bool retain); + +/** Register an application with the server. + This routine makes known to the server about an application identified by + its name and creates a context / reference for the application in the + server. + + An application intending to utlize the servicse of the MQTT server must be + first registered with the server. + + @param[in] cbs callback routines from the application to be invoked by the + server + @param[in] name refers to the name of the application. The application must + retain the memory used by the 'name' after the function call. The server + does not copy the name into its internal buffers. + + @return a valid handle to context of application in the server, othewise + NULL on error +*/ +void *mqtt_server_app_register(const struct mqtt_server_app_cbs *cbs, + const char *name); + + +/** Configuration for the applications that utilize the MQTT Server Daemon. + At the moment this configuration is not used and has been incorporated + to support forward compatibility (future use) +*/ +struct mqtt_server_app_cfg { + + void *place_holder; /**< Dummy, not used as of now */ +}; + +/** Initialize the MQTT Server (Task / Daemon). + This routine initializes the server implementation and sets it up using + the provided configuration. The server implementation must be initialized + prior to invoking of any other routine or service. + + This routine should be invoked as part of the platform initialization. + + @note This routine must be invoked only once in an run of the system. This + routine internally invokes the @ref mqtt_server_lib_init( ) and therefore, + there is no need to invoke the @ref mqtt_server_lib_init( ) explicitly. + + The server needs to be in a state to listen to incoming MQTT connection + requests. Therefore, the platform sequence after provisioning the buffer + using the API @ref mqtt_server_register_net_svc, must invoke the API @ref + mqtt_server_run, in an infinite loop, to keep the server daemon active. + + @param[in] lib_cfg configuration information for the MQTT server packet + library. + @param[in] app_cfg configuration information for the server applications. + + @return 0 on success, otherwise -1 on error +*/ +int32_t mqtt_server_init(const struct mqtt_server_lib_cfg *lib_cfg, + const struct mqtt_server_app_cfg *app_cfg); + +/** @} */ /* End of server_daemon */ + +}//namespace mbed_mqtt + +#ifdef __cplusplus +} +#endif + +#endif + +
diff -r 000000000000 -r 547251f42a60 server_pkts.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server_pkts.cpp Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,1000 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#include "server_pkts.h" + +namespace mbed_mqtt { + +/*----------------------------------------------------------------------------- + * Note: Do not create additional dependency of this file on any header other + * than server_pkts.h. Specifically, server_pkts.[hc] in conjunction with the + * mqtt_common.[hc] files must be facilitated to create a stand-alone library. + *----------------------------------------------------------------------------- + */ + +static void *mutex = NULL; +static void (*mutex_lockin)(void*) = NULL; +static void (*mutex_unlock)(void*) = NULL; + +#define MUTEX_LOCKIN() if(mutex_lockin) mutex_lockin(mutex); +#define MUTEX_UNLOCK() if(mutex_unlock) mutex_unlock(mutex); + +static bool aux_dbg_enbl = false; +static int32_t (*debug_printf)(const char *fmt, ...) = NULL; + +#define USR_INFO debug_printf +#define DBG_INFO(I, ...) if(aux_dbg_enbl) debug_printf(I, ##__VA_ARGS__) + +static int32_t mqp_buf_rd_utf8(const uint8_t *buf, const uint8_t *end, + struct utf8_string *utf8) +{ + const uint8_t *ref = buf; /* Reference */ + uint16_t len = 0; /* UTF8 Size */ + + if(end - buf < 2) + return -1; /* No valid buffer to hold UTF8 size */ + + buf += buf_rd_nbo_2B(buf, &len); + if(end - buf < len) + return -1; /* No valid buffer to hold UTF8 name */ + + utf8->length = len; + utf8->buffer = len? (char*)buf : NULL; + + return buf + len - ref; +} + +static struct mqtt_server_msg_cbs usr_obj, *usr_cbs = NULL; +static const struct device_net_services *net_ops = NULL; + +#ifndef CFG_SR_MQTT_CTXS +#define MAX_NWCONN 6 +#else +#define MAX_NWCONN CFG_SR_MQTT_CTXS +#endif + +static struct client_ctx contexts[MAX_NWCONN]; + +static struct client_ctx *used_ctxs = NULL; +static struct client_ctx *free_ctxs = NULL; + + +#define NETWORK_CLOSE_FLAG 0x00200000 +#define NW_CONN_ERROR_FLAG 0x00400000 +#define RCVD_CONN_MSG_FLAG 0x00800000 + +#define NEED_NET_CLOSE(cl_ctx) (cl_ctx->flags & NETWORK_CLOSE_FLAG) + +static void cl_ctx_init(void) +{ + int32_t i = 0; + for(i = 0; i < MAX_NWCONN; i++) { + struct client_ctx *cl_ctx = contexts + i; + + cl_ctx_reset(cl_ctx); + + cl_ctx->next = free_ctxs; + free_ctxs = cl_ctx; + } +} + +static void cl_ctx_free(struct client_ctx *cl_ctx) +{ + cl_ctx_reset(cl_ctx); + + cl_ctx->next = free_ctxs; + free_ctxs = cl_ctx; + + return; +} + +static struct client_ctx *cl_ctx_alloc(void) +{ + struct client_ctx *cl_ctx = free_ctxs; + if(cl_ctx) { + free_ctxs = cl_ctx->next; + cl_ctx->next = NULL; + } else + USR_INFO("S: fatal, no free cl_ctx\n\r"); + + return cl_ctx; +} + +static inline bool had_rcvd_conn_msg(struct client_ctx *cl_ctx) +{ + return (cl_ctx->flags & RCVD_CONN_MSG_FLAG); +} + +static inline void set_rcvd_conn_msg(struct client_ctx *cl_ctx) +{ + cl_ctx->flags |= RCVD_CONN_MSG_FLAG; +} + +static void used_ctxs_insert(struct client_ctx *cl_ctx) +{ + cl_ctx_timeout_insert(&used_ctxs, cl_ctx); +} + +static void used_ctxs_remove(struct client_ctx *cl_ctx) +{ + cl_ctx_remove(&used_ctxs, cl_ctx); +} + +static int32_t loopb_net = -1; +static const uint8_t LOOP_DATA[] = {0x00, 0x01}; +#define LOOP_DLEN sizeof(LOOP_DATA) +static uint16_t loopback_port = 0; +static bool pending_trigs = false; + +static int32_t loopb_trigger(void) +{ + uint8_t ip_addr[] = {127,0,0,1}; + int32_t rv = 0; + + if((-1 != loopb_net) && (false == pending_trigs)) { + rv = net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN, + loopback_port, ip_addr, 4); + if(0 == rv) + pending_trigs = true; + } + + return rv; +} + +static void do_net_close_rx(struct client_ctx *cl_ctx, bool due2err) +{ + DBG_INFO("S: RX closing Net %d ...\n\r", (int32_t)cl_ctx->net); + + net_ops->close(cl_ctx->net); + cl_ctx->net = -1; + + if(cl_ctx->usr) + usr_cbs->on_cl_net_close(cl_ctx->usr, due2err); + + used_ctxs_remove(cl_ctx); + cl_ctx_free(cl_ctx); +} + +static void do_net_close_tx(struct client_ctx *cl_ctx, bool due2err) +{ + if(due2err) + cl_ctx->flags |= NW_CONN_ERROR_FLAG; + + cl_ctx->flags |= NETWORK_CLOSE_FLAG; + + loopb_trigger(); +} + +static int32_t cl_ctx_send(struct client_ctx *cl_ctx, uint8_t *buf, uint32_t len) +{ + int32_t rv = net_ops->send(cl_ctx->net, buf, len, NULL); + if(rv <= 0) { + do_net_close_tx(cl_ctx, true); + rv = MQP_ERR_NETWORK; + } + + USR_INFO("S: FH-B1 0x%02x, len %u to net %d: %s\n\r", + *buf, len, cl_ctx->net, rv? "Sent" : "Fail"); + return rv; +} + +static +int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type, enum mqtt_qos qos, + bool has_vh, uint16_t vh_data) +{ + uint8_t buf[4]; + uint32_t len = 2; + + if(false == had_rcvd_conn_msg(cl_ctx)) + return MQP_ERR_NOTCONN; + + buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false)); + buf[1] = has_vh ? 2 : 0; + + if(has_vh) + len += buf_wr_nbo_2B(buf + 2, vh_data); + + return cl_ctx_send(cl_ctx, buf, len); +} + +static +int32_t _mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh, + uint16_t vh_data) +{ + struct client_ctx *cl_ctx = (struct client_ctx*) ctx_cl; + + return cl_ctx? vh_msg_send((client_ctx*)ctx_cl, msg_type, qos, + has_vh, vh_data) : -1; +} + +int32_t mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh, + uint16_t vh_data) +{ + return _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data); +} + +int32_t mqtt_vh_msg_send_locked(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, + bool has_vh, uint16_t vh_data) +{ + int32_t rv; + + MUTEX_LOCKIN(); + rv = _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data); + MUTEX_UNLOCK(); + + return rv; +} + +int32_t mqtt_connack_send(void *ctx_cl, uint8_t *vh_buf) +{ + struct client_ctx *cl_ctx = (struct client_ctx *) ctx_cl; + + int32_t rv = vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0, + true, (vh_buf[0] << 8) | vh_buf[1]); + + if((rv > 0) && (0x00 != vh_buf[1])) + do_net_close_tx(cl_ctx, true); + + return rv; +} + +static +int32_t _mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup) +{ + int32_t rv = 0; + uint8_t *buf = MQP_FHEADER_BUF(mqp); + + if(dup) + *buf |= DUP_FLAG_VAL(true); + + rv = cl_ctx_send((struct client_ctx*)ctx_cl, buf, MQP_CONTENT_LEN(mqp)); + + *buf &= ~DUP_FLAG_VAL(true); + + return rv; +} + +int32_t mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup) +{ + return _mqtt_server_pub_dispatch(ctx_cl, mqp, dup); +} + +int32_t +mqtt_server_pub_dispatch_locked(void *ctx_cl, struct mqtt_packet *mqp, bool dup) +{ + int32_t rv; + + MUTEX_LOCKIN(); + rv = _mqtt_server_pub_dispatch(ctx_cl, mqp, dup); + MUTEX_UNLOCK(); + + return rv; +} + +#define MQP_MAX_TOPICS 16 +#define MQP_SUBACK_PAY_OFS (MAX_FH_LEN + 2) + +static int32_t sub_ack_send(struct client_ctx *cl_ctx, uint8_t *buf, uint8_t pay_ofs, + uint32_t pay_len, uint16_t msg_id) +{ + uint8_t *ref = buf += MAX_FH_LEN; + + if(MQP_SUBACK_PAY_OFS != pay_ofs) + return MQP_ERR_PKT_LEN; + + buf += buf_wr_nbo_2B(buf, msg_id); + ref -= mqp_buf_tail_wr_remlen(ref - MAX_REMLEN_BYTES, + pay_len + buf - ref); + + ref -= 1; + *ref = MAKE_FH_BYTE1(MQTT_SUBACK, + MAKE_FH_FLAGS(false, MQTT_QOS0, false)); + + return cl_ctx_send(cl_ctx, ref, pay_len + buf - ref); +} + +static inline int32_t unsub_ack_send(struct client_ctx *cl_ctx, uint16_t msg_id) +{ + return vh_msg_send(cl_ctx, MQTT_UNSUBACK, MQTT_QOS0, true, msg_id); +} + +/*---------------------------------------------------------------------------- + * Receive Routines + *---------------------------------------------------------------------------- + */ + +/* Candidate to be moved to mqtt_common.c file */ +static bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw) +{ + uint8_t *buf = MQP_VHEADER_BUF(mqp_raw); + + if(mqp_raw->pl_len < 2) + return false; /* Bytes for MSG ID not available */ + + buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id); + mqp_raw->vh_len += 2; + mqp_raw->pl_len -= 2; + + return true; +} + +#define BRK_IF_RD_ERR_UTF8(buf, end, utf8) \ + if(rd_buf_utf8(buf, end, utf8) < 0) \ + break; + +static int32_t buf_rd_utf8_qos(uint8_t *buf, uint8_t *end, struct utf8_strqos *utf8_qos) +{ + struct utf8_string utf8; + uint8_t *ref = buf; + + buf += mqp_buf_rd_utf8(buf, end, &utf8); + + /* Assess that UTF8 has been read and QOS can be read */ + if((buf > ref) && (end > buf)) { + utf8_qos->buffer = utf8.buffer; + utf8_qos->length = utf8.length; + utf8_qos->qosreq = (enum mqtt_qos)*buf++; + + return buf - ref; + } + + return -1; +} + +static bool _proc_sub_msg_rx(struct mqtt_packet *mqp_raw, + struct utf8_strqos *qos_topics, uint32_t *n_topics) +{ + uint8_t *buf, *end; + uint32_t i = 0; + + if(false == mqp_proc_vh_msg_id_rx(mqp_raw)) + return false; /* Problem in contents received from client */ + + buf = MQP_PAYLOAD_BUF(mqp_raw); + end = buf + mqp_raw->pl_len; + + for(i = 0; (i < *n_topics) && (buf < end); i++) { + struct utf8_strqos *qos_top = qos_topics + i; + int32_t len = buf_rd_utf8_qos(buf, end, qos_top); + if(len < 0) + break; /* Failed to read Topic */ + + buf += len; + } + + *n_topics = i; + + return ((0 == i) || (buf != end))? false : true; +} + +static +bool proc_sub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + uint32_t n_topics = MQP_MAX_TOPICS; + uint16_t msg_id; + + struct utf8_strqos qos_topics[MQP_MAX_TOPICS]; + uint8_t ack[MQP_MAX_TOPICS + MQP_SUBACK_PAY_OFS]; + + if(false == _proc_sub_msg_rx(mqp_raw, qos_topics, &n_topics)) + return false; + + msg_id = mqp_raw->msg_id; + + /* All topics have been now put in array, pass-on info to upper layer */ + if(usr_cbs->sub_msg_rx(cl_ctx->usr, qos_topics, n_topics, + msg_id, ack + MQP_SUBACK_PAY_OFS)) { + + sub_ack_send(cl_ctx, ack, MQP_SUBACK_PAY_OFS, n_topics, msg_id); + + return true; + } + + return false; +} + + +static bool _proc_unsub_msg_rx(struct mqtt_packet *mqp_raw, + struct utf8_string *topics, uint32_t *n_topics) +{ + uint8_t *buf, *end; + uint32_t i = 0; + + if(false == mqp_proc_vh_msg_id_rx(mqp_raw)) + return false; /* Problem in contents received from client */ + + buf = MQP_PAYLOAD_BUF(mqp_raw); + end = buf + mqp_raw->pl_len; + + for(i = 0; (i < *n_topics) && (buf < end); i++) { + struct utf8_string *topic = topics + i; + int32_t len = mqp_buf_rd_utf8(buf, end, topic); + if(len < 0) + break; /* Failed to read Topic */ + + buf += len; + } + + *n_topics = i; + + return ((0 == i) || (buf != end))? false : true; +} + +static +bool proc_unsub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + uint32_t n_topics = MQP_MAX_TOPICS; + uint16_t msg_id; + + struct utf8_string topics[MQP_MAX_TOPICS]; + + if(false == _proc_unsub_msg_rx(mqp_raw, topics, &n_topics)) + return false; + + msg_id = mqp_raw->msg_id; + + /* All topics have been now put in array, pass-on info to upper layer */ + if(usr_cbs->un_sub_msg(cl_ctx->usr, topics, n_topics, msg_id)) { + unsub_ack_send(cl_ctx, msg_id); + return true; + } + + return false; +} + +static bool proc_pingreq_rx(struct client_ctx *cl_ctx) +{ + vh_msg_send(cl_ctx, MQTT_PINGRSP, MQTT_QOS0, false, 0x00); + return true; +} + +static bool proc_disconn_rx(struct client_ctx *cl_ctx) +{ + do_net_close_rx(cl_ctx, false); + return true; +} + +static +bool proc_pub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + bool rv = mqp_proc_pub_rx(mqp_raw); + uint8_t B = mqp_raw->fh_byte1; + enum mqtt_qos qos = ENUM_QOS(B); + struct utf8_string topic; + uint16_t msg_id = 0; + + if(false == rv) + return rv; /* Didn't get a good PUB Packet */ + + msg_id = mqp_raw->msg_id; + + topic.buffer = (char*)MQP_PUB_TOP_BUF(mqp_raw); + topic.length = MQP_PUB_TOP_LEN(mqp_raw); + + rv = usr_cbs->pub_msg_rx(cl_ctx->usr, &topic, + MQP_PUB_PAY_BUF(mqp_raw), + MQP_PUB_PAY_LEN(mqp_raw), + msg_id, BOOL_DUP(B), qos, + BOOL_RETAIN(B)); + if(false == rv) + return rv; + + if(MQTT_QOS1 == qos) + vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id); + + if(MQTT_QOS2 == qos) + vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id); + + return rv; +} + +static +bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + if((false == mqp_proc_msg_id_ack_rx(mqp_raw, false)) || + (false == usr_cbs->ack_notify(cl_ctx->usr, + mqp_raw->msg_type, + mqp_raw->msg_id))) + return false; + + return true; +} + +#define IO_MON_NO_TIMEOUT (0xFFFFFFFF) // TBD +//#define KA_TIMEOUT_NONE (0xFFFFFFFF) + +static void rx_timeout_update(struct client_ctx *cl_ctx) +{ + if(false == had_rcvd_conn_msg(cl_ctx)) + return; + + cl_ctx_timeout_update(cl_ctx, net_ops->time()); + + used_ctxs_remove(cl_ctx); + used_ctxs_insert(cl_ctx); + + return; +} + +static bool proc_protocol_info(struct utf8_string *utf8, uint8_t ver) +{ + const char *buf = utf8->buffer; + + /* Check for protocol version 3.1.1 */ + if((4 == utf8->length) && + (buf[0] == 'M') && + (buf[1] == 'Q') && + (buf[2] == 'T') && + (buf[3] == 'T') && + (0x04 == ver)) + return true; + + /* Check for protocol version 3.1 */ + if((6 == utf8->length) && + (buf[0] == 'M') && + (buf[1] == 'Q') && + (buf[2] == 'I') && + (buf[3] == 's') && + (buf[4] == 'd') && + (buf[5] == 'p') && + (0x03 == ver)) + return true; + + return false; +} + +static uint16_t proc_connect_vh_rx(struct mqtt_packet *mqp_raw, + uint8_t *conn_flags, uint16_t *ka_secs) +{ + struct utf8_string utf8; + uint8_t *buf = MQP_PAYLOAD_BUF(mqp_raw); + uint8_t *end = buf + mqp_raw->pl_len; + uint8_t *ref = buf; + + buf += mqp_buf_rd_utf8(buf, end, &utf8); + if(end - buf < 1) + return CONNACK_RC_BAD_PROTOV; /* No proto ver */ + + if(false == proc_protocol_info(&utf8, *buf++)) + return CONNACK_RC_BAD_PROTOV; + + *conn_flags = *buf++; + + if(end - buf < 2) + return 0xFF; /* Bad packet composition */ + + *ka_secs = (buf[0] << 8) | buf[1]; + buf += 2; + *ka_secs += *ka_secs >> 1; + + mqp_raw->vh_len = buf - ref; + mqp_raw->pl_len -= buf - ref; + + return 0; +} + +#define RET_IF_RD_CONN_ERROR(buf, end, utf8) \ + { \ + int32_t len = mqp_buf_rd_utf8(buf, end, utf8); \ + if(len < 0) \ + return 0x00FF; \ + \ + buf += len; \ + } + +uint16_t proc_connect_pl_rx(const uint8_t *buf, const uint8_t *end, uint8_t conn_flags, + struct utf8_string *free_utf8s, + struct utf8_string **used_refs) +{ + struct utf8_string *utf8; + + utf8 = used_refs[0] = free_utf8s + 0; + RET_IF_RD_CONN_ERROR(buf, end, utf8); + + if(conn_flags & WILL_CONFIG_VAL) { + utf8 = used_refs[1] = free_utf8s + 1; + RET_IF_RD_CONN_ERROR(buf, end, utf8); + + utf8 = used_refs[2] = free_utf8s + 2; + RET_IF_RD_CONN_ERROR(buf, end, utf8); + } + + if((0 == (conn_flags & USER_NAME_OPVAL)) && + (0 != (conn_flags & PASS_WORD_OPVAL))) + return 0x00FF; /* Bad combination */ + + if(conn_flags & USER_NAME_OPVAL) { + utf8 = used_refs[3] = free_utf8s + 3; + RET_IF_RD_CONN_ERROR(buf, end, utf8); + } + + if(conn_flags & PASS_WORD_OPVAL) { + utf8 = used_refs[4] = free_utf8s + 4; + RET_IF_RD_CONN_ERROR(buf, end, utf8); + } + + return 0; +} + +static +bool proc_connect_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + struct utf8_string *used_refs[5] = {NULL, NULL, NULL, NULL, NULL}; + struct utf8_string free_utf8s[5]; + uint8_t conn_flags, *buf, *end; + bool clean_session = false; + uint16_t ack_vh16 = 0; /* Variable Header of CONNACK (response) Message */ + + set_rcvd_conn_msg(cl_ctx); + + ack_vh16 = proc_connect_vh_rx(mqp_raw, &conn_flags, &cl_ctx->ka_secs); + if(ack_vh16) + goto proc_connect_rx_exit1; + + buf = MQP_PAYLOAD_BUF(mqp_raw); + end = buf + mqp_raw->pl_len; + ack_vh16 = proc_connect_pl_rx(buf, end, conn_flags, + free_utf8s, used_refs); + if(ack_vh16) + goto proc_connect_rx_exit1; + + clean_session = (conn_flags & CLEAN_START_VAL)? true : false; + ack_vh16 = (!used_refs[0]->length && !clean_session)? + CONNACK_RC_CLI_REJECT : 0; /* Validate 0 byte Client ID */ + if(ack_vh16) + goto proc_connect_rx_exit1; + + /* UTF8 Order: Client ID, Will Topic, Will Msg, User Name, Pass Word */ + ack_vh16 = usr_cbs->connect_rx(cl_ctx, conn_flags, &used_refs[0], + &cl_ctx->usr); + proc_connect_rx_exit1: + + DBG_INFO("S: CONNACK RC (16bits) is %u (%s)\n\r", ack_vh16, + ack_vh16 & 0xFF? "error" : "good"); + + if(0xFF != (ack_vh16 & 0xFF)) + vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0, true, ack_vh16); + + if(CONNACK_RC_REQ_ACCEPT == (ack_vh16 & 0xFF)) { + rx_timeout_update(cl_ctx); + usr_cbs->on_connack_send(cl_ctx->usr, clean_session); + } else { + return false; + } + + return true; +} + + +static void recv_hvec_load(int32_t *recv_hvec, uint32_t size, struct client_ctx *list) +{ + int32_t i = 0; + + for(i = 0; (i < size) && (NULL != list); i++, list = list->next) + recv_hvec[i] = list->net; + + recv_hvec[i] = -1; + + return; +} + +static bool process_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + uint8_t msg_type = mqp_raw->msg_type; + bool rv = false; + + USR_INFO("S: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d\n\r", + mqp_raw->fh_byte1, cl_ctx->net); + + if((MQTT_CONNECT != msg_type) ^ had_rcvd_conn_msg(cl_ctx)) + goto process_recv_exit1; /* Proto Violation */ + + rx_timeout_update(cl_ctx); + + switch(msg_type) { + + case MQTT_CONNECT: + rv = proc_connect_rx(cl_ctx, mqp_raw); + break; + + case MQTT_DISCONNECT: + rv = proc_disconn_rx(cl_ctx); + break; + + case MQTT_SUBSCRIBE: + rv = proc_sub_msg_rx(cl_ctx, mqp_raw); + break; + + case MQTT_UNSUBSCRIBE: + rv = proc_unsub_msg_rx(cl_ctx, mqp_raw); + break; + + case MQTT_PINGREQ: + rv = proc_pingreq_rx(cl_ctx); + break; + + case MQTT_PUBLISH: + rv = proc_pub_msg_rx(cl_ctx, mqp_raw); + break; + + case MQTT_PUBACK: + case MQTT_PUBREC: + case MQTT_PUBREL: + case MQTT_PUBCOMP: + rv = proc_ack_msg_rx(cl_ctx, mqp_raw); + break; + + default: + break; + } + + process_recv_exit1: + DBG_INFO("S: Processing of MSG ID 0x%02x: %s\n\r", + mqp_raw->msg_id, rv? "Good" : "Fail"); + + return rv; +} + + +/* Terminate net connections which haven't received PKTs beyond expected time. + Caller must ensure atomic enviroment for execution of this routine. +*/ +static void ka_sequence(uint32_t *secs2wait) +{ + struct client_ctx *cl_ctx = used_ctxs; /* Sorted for timeout (ascend) */ + uint32_t now_secs = net_ops->time(); + + while(NULL != cl_ctx) { + struct client_ctx *next = cl_ctx->next; + if(NEED_NET_CLOSE(cl_ctx) || !(cl_ctx->timeout > now_secs)) { + bool due2err = false; + if(cl_ctx->flags & NW_CONN_ERROR_FLAG) + due2err = true; + + cl_ctx->flags &= ~(NW_CONN_ERROR_FLAG | + NETWORK_CLOSE_FLAG); + + /* Close network: Timeout or TX err */ + do_net_close_rx(cl_ctx, due2err); + } + + cl_ctx = next; + } + + cl_ctx = used_ctxs; + if(((NULL != cl_ctx) && (KA_TIMEOUT_NONE == cl_ctx->timeout)) || + ((NULL == cl_ctx))) + *secs2wait = IO_MON_NO_TIMEOUT; + else + *secs2wait = cl_ctx->timeout - now_secs; + + return; +} + +/* Put a new functiona name such as mk_new_ctx() or setup_ctx() and + processing to restrict limit number of connections. + + Return value as well. +*/ +static bool accept_ctx(int32_t net, uint32_t wait_secs) +{ + struct client_ctx *cl_ctx = cl_ctx_alloc(); + if(NULL == cl_ctx) + return false; + + cl_ctx->net = net_ops->accept(net, cl_ctx->remote_ip, + &cl_ctx->ip_length); + if(-1 == cl_ctx->net) { + cl_ctx_free(cl_ctx); + USR_INFO("S: failed to accept new NW connection\n\r"); + return false; + } + + DBG_INFO("Accepted new connection (fd) %d\n\r", (int32_t)cl_ctx->net); + + /* Timeout to receive MQTT_CONNECT */ + cl_ctx->timeout = wait_secs + net_ops->time(); + + used_ctxs_insert(cl_ctx); + return true; +} + +static struct client_ctx *net_cl_ctx_find(int32_t net) +{ + struct client_ctx *cl_ctx = used_ctxs; + while(cl_ctx) { + if(net == cl_ctx->net) + break; + + cl_ctx = cl_ctx->next; + } + + if(NULL == cl_ctx) + USR_INFO("Did not find ctx for net %d\n\r", net); + + return cl_ctx; +} + +static int32_t recv_hvec[MAX_NWCONN + 1 + 1 + 1]; /* LISTEN + LOOPBACK + VEC END */ +static int32_t send_hvec = -1; +static int32_t rsvd_hvec = -1; +static int32_t listen_net = -1; + +static struct mqtt_packet rx_mqp; +static uint8_t rxb[MQP_SERVER_RX_LEN]; +static uint16_t listener_port = 0; + +static inline +int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out) +{ + int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, timed_out, NULL); + if(rv <= 0) + rv = MQP_ERR_NETWORK; + + return rv; +} + +static int32_t proc_loopback_recv(int32_t net) +{ + uint8_t buf[LOOP_DLEN]; + + /* Thanks for waking-up thread and do nothing in this routine */ + int32_t rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0); + pending_trigs = false; + + if(rv <= 0) { + net_ops->close(net); + return MQP_ERR_LIBQUIT; + } + + return rv; +} + +static void proc_net_data_recv(int32_t net) +{ + struct client_ctx *cl_ctx = net_cl_ctx_find(net); + bool dummy; + int32_t rv; + + mqp_reset(&rx_mqp); /* Start w/ a clean buffer */ + + rv = net_recv(net, &rx_mqp, 0, &dummy); + if(rv > 0) + /* Working Principle: Only RX processing errors should be + reported as 'false'. Status of TX as a follow-up to RX + messages need not be reported by the xyz_rx() routines. + Error observed in TX is either dealt in next iteration + of RX loop. + */ + if(false == process_recv(cl_ctx, &rx_mqp)) + rv = MQP_ERR_CONTENT; + + if(rv < 0) + do_net_close_rx(cl_ctx, rv); +} + +static bool accept_and_recv_locked(int32_t *recv_hnds, int32_t n_hnds, uint32_t wait_secs) +{ + bool rv = true; + int32_t idx = 0; + + MUTEX_LOCKIN(); + + for(idx = 0; (idx < n_hnds) && (rv == true); idx++) { + int32_t net = recv_hvec[idx]; + if(net == listen_net) { + rv = accept_ctx(listen_net, wait_secs); + } else if(loopback_port && (net == loopb_net)) { + if(proc_loopback_recv(loopb_net) < 0) + rv = false; + } else { + proc_net_data_recv(net); + } + } + + MUTEX_UNLOCK(); + + return rv; +} + +int32_t mqtt_server_run(uint32_t wait_secs) // TBD break into two functions +{ + uint32_t secs2wait = 0; + int32_t n_hnds = 0; + + USR_INFO("S: MQTT Server Run invoked ....\n\r"); + + if(NULL == net_ops) + return MQP_ERR_NET_OPS; + + if(loopback_port) { + loopb_net = net_ops->open(DEV_NETCONN_OPT_UDP, NULL, + loopback_port, NULL); + if(-1 == loopb_net) + return MQP_ERR_LIBQUIT; + } + + listen_net = net_ops->listen(0, listener_port, NULL); + if(-1 == listen_net) + return MQP_ERR_LIBQUIT; + + do { + int32_t *r_hvec = recv_hvec + 0; + + *r_hvec++ = listen_net; + if(loopback_port) + *r_hvec++ = loopb_net; + + /* MQTT Timeouts: close expired conns; get time to next expiry */ + ka_sequence(&secs2wait); + + /* Prepare array of net handles. Must've atleast listen handle */ + // recv_hvec_load(&recv_hvec[2], MAX_NWCONN + 1, used_ctxs); + recv_hvec_load(r_hvec, MAX_NWCONN + 1, used_ctxs); + + n_hnds = net_ops->io_mon(recv_hvec, &send_hvec, + &rsvd_hvec, secs2wait); + if(n_hnds < 0) + return MQP_ERR_LIBQUIT; + + if(false == accept_and_recv_locked(recv_hvec, n_hnds, wait_secs)) + return MQP_ERR_LIBQUIT; + + } while(1); +} + +int32_t mqtt_server_register_net_svc(const struct device_net_services *net) +{ + if(net && net->io_mon && net->close && net->send && + net->recv && net->time && net->listen) { + net_ops = net; + return 0; + } + + return -1; +} + +int32_t mqtt_server_lib_init(const struct mqtt_server_lib_cfg *lib_cfg, + const struct mqtt_server_msg_cbs *msg_cbs) +{ + cl_ctx_init(); + + if((NULL == lib_cfg) || + (0 == lib_cfg->listener_port) || + (NULL == lib_cfg->debug_printf)) + return -1; + + debug_printf = lib_cfg->debug_printf; /* Facilitate debug */ + + loopback_port = lib_cfg->loopback_port; + listener_port = lib_cfg->listener_port; + + mutex = lib_cfg->mutex; + mutex_lockin = lib_cfg->mutex_lockin; + mutex_unlock = lib_cfg->mutex_unlock; + + aux_dbg_enbl = lib_cfg->aux_debug_en; + debug_printf = lib_cfg->debug_printf; + + usr_cbs = &usr_obj; + + memcpy(usr_cbs, msg_cbs, sizeof(struct mqtt_server_msg_cbs)); + + mqp_buffer_attach(&rx_mqp, rxb, MQP_SERVER_RX_LEN, 0); + + return 0; +} + +}//namespace mbed_mqtt
diff -r 000000000000 -r 547251f42a60 server_pkts.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server_pkts.h Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,440 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#ifndef __SERVER_PKTS_H__ +#define __SERVER_PKTS_H__ + +#include "mqtt_common.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +namespace mbed_mqtt { + +/*----------------------------------------------------------------------------- + * Note: Do not create additional dependency of this file on any header other + * than mqtt_common.h. Specifically, server_pkts.[hc] in conjunction with the + * mqtt_common.[hc] files must be facilitated to create a stand-alone library. + *----------------------------------------------------------------------------- + */ + +/**@file server_pkts.h + The C library provisions the interface / API(s) for the MQTT Server Packet LIB. + + This is a light-weight library that enables the services of the MQTT protcol + for user's server application(s) to exchange the MQTT packets with one or + more remote clients. The Server Packet LIB is a simple and easy-to-use + implementation to support both un-packing of the messages received from the + remote clients and formation of packets to be sent to the remote clients. + + The library is targeted to conform to MQTT 3.1.1 specification. + + The Server Packet LIB is a highly portable software and implies a very limited + set of dependencies on a platform. Importantly, these limited dependencies are + common features used in the embedded and the networking world, and can be + easily adapted to the target platforms. The services of the library are + multi-task safe. Platform specific atomicity constructs are used, through + abstractions, by the library to maintain data coherency and synchronization. + In addition, the library can be configured to support several in-flight + messages. + + The Server Packet LIB can support multiple and simultaneous MQTT connections + from clients. However, the responsibility of managing the clients and topics, + authentication and approval for connections and supporting any other needs + that specific to the deployment remains with the user application. + + The number of the network connections that the Server Packet LIB can support + is configurable through the compile line option / flag <b> -DCFG_SR_MQTT_CTXS + </b>. In addition, the maximum length of the RX buffer used by the server is + also configurable through the compile line option / flag <b> + -DCFG_SR_MAX_MQP_RX_LEN </b>. + + @note Any future extensions & development must follow the following guidelines. + A new API or an extension to the existing API + a) must be rudimentary + b) must not imply a rule or policy (including a state machine) + b) must ensure simple design and implementation +*/ + + +/** @defgroup server_pktlib The Server Library API(s) + @{ +*/ + +/** @defgroup conn_utf8_help_group Helper Macros for RX CONNECT + @{ +*/ + +/** Yields pointer to the UTF8 content */ +#define MQ_CONN_UTF8_BUF(utf8) ((utf8)? (utf8)->buffer : NULL) + +/** Length or size of the UTF8 content */ +#define MQ_CONN_UTF8_LEN(utf8) ((utf8)? (utf8)->length : 0) + +#define MQC_UTF8_CLIENTID(utf8_vec) (utf8_vec[0]) /**< Returns Client ID */ +#define MQC_UTF8_WILL_TOP(utf8_vec) (utf8_vec[1]) /**< Returns Will Topic */ +#define MQC_UTF8_WILL_MSG(utf8_vec) (utf8_vec[2]) /**< Returns Will Data */ +#define MQC_UTF8_USERNAME(utf8_vec) (utf8_vec[3]) /**< Returns User Name */ +#define MQC_UTF8_PASSWORD(utf8_vec) (utf8_vec[4]) /**< Returns Pass Word */ + +#define MQC_CLIENTID_BUF(utf8_vec) MQ_CONN_UTF8_BUF(MQC_UTF8_CLIENTID(utf8_vec)) +#define MQC_CLIENTID_LEN(utf8_vec) MQ_CONN_UTF8_LEN(MQC_UTF8_CLIENTID(utf8_vec)) + +#define MQC_WILL_TOP_BUF(utf8_vec) MQ_CONN_UTF8_BUF(MQC_UTF8_WILL_TOP(utf8_vec)) +#define MQC_WILL_TOP_LEN(utf8_vec) MQ_CONN_UTF8_LEN(MQC_UTF8_WILL_TOP(utf8_vec)) + +#define MQC_WILL_MSG_BUF(utf8_vec) MQ_CONN_UTF8_BUF(MQC_UTF8_WILL_MSG(utf8_vec)) +#define MQC_WILL_MSG_LEN(utf8_vec) MQ_CONN_UTF8_LEN(MQC_UTF8_WILL_MSG(utf8_vec)) + +#define MQC_USERNAME_BUF(utf8_vec) MQ_CONN_UTF8_BUF(MQC_UTF8_USERNAME(utf8_vec)) +#define MQC_USERNAME_LEN(utf8_vec) MQ_CONN_UTF8_LEN(MQC_UTF8_USERNAME(utf8_vec)) + +#define MQC_PASSWORD_BUF(utf8_vec) MQ_CONN_UTF8_BUF(MQC_UTF8_PASSWORD(utf8_vec)) +#define MQC_PASSWORD_LEN(utf8_vec) MQ_CONN_UTF8_LEN(MQC_UTF8_PASSWORD(utf8_vec)) + +/** @} */ + +#ifndef CFG_SR_MAX_MQP_RX_LEN +#define MQP_SERVER_RX_LEN 1024 /**< Max size(B) of RX Buffer for MQTT Server */ +#else +#define MQP_SERVER_RX_LEN CFG_SR_MAX_MQP_RX_LEN +#endif + +/** Send a Variable Header only message to the client. + Application should this routine to send PUBREL and PUBCOMP messages. + + @param[in] ctx_cl handle to the underlying network context in the LIB. This + handle is provided to the application by the LIB in the CONNECT callback. + @param[in] msg_type message type that has to be sent to the client + @param[in] qos QoS with which the message needs to send to server + @param[in] has_vh does this message has data in the variable header? + @param[in] vh_data data <MSB:LSB> to be included in the variable header + @return on success, the number of bytes transferred. Otherwise, errors + defined in @ref lib_err_group +*/ +int32_t mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, + bool has_vh, uint16_t vh_data); + +/** mqtt_vh_msg_send() with mutual exclusion (in multi-task application). + This routine ensures that the LIB sends the specified VH MSG onto the network + in a manner that excludes execution of any other control. This API has been + enabled to support the scenarios, where the multi-tasked user application has + chosen to use a mutex object other than the one provisioned in the packet LIB + to streamline / serialize accesses to the services of the packet LIB. + + Refer to @ref mqtt_vh_msg_send for details +*/ +int32_t mqtt_vh_msg_send_locked(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, + bool has_vh, uint16_t vh_data); + +/** Dispatch application constructed PUBLISH message to the client. + Prior to sending the message to the client, this routine shall update the + fixed-header to account for the duplicate flag that has been indicated by + the caller. + + The caller must populate the payload information of topic and data before + invoking this service. In addition, the application must prepare, for the + packet, the fix header leaving aside the duplicate flag - this flag shall + be included in the fix heder by the LIB. + + This service facilitates the application to re-use, iteratively, a single + PUBLISH packet for multiple remote clients that have subscribed to the + same topic for which the data is being published. The LIB does not free-up + the MQTT packet after sending the packet to the remote client and the + application is responsible for managing the packet container / memory + + @param[in] ctx_cl handle to the underlying network context in the LIB. This + handle is provided to the application by the LIB in the CONNECT callback. + @param[in] mqp app created PUBLISH message alongwith the fixed header + @param[in] dup is this a duplicate message that is being sent to client? + @return on success, the number of bytes transferred. Otherwise, error + defined in @ref lib_err_group +*/ +int32_t mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup); + +/** mqtt_server_pub_dispatch() with mutual exclusion (in multi-task application). + This routine ensures that the LIB sends the specified packet onto the network + in a manner that excludes execution of any other control. This API has been + enabled to support the scenarios, where the multi-tasked user application has + chosen to use a mutex object other than the one provisioned in the packet LIB + to streamline / serialize accesses to the services of the packet LIB. + + Refer to @ref mqtt_server_pub_dispatch for other details. +*/ +int32_t mqtt_server_pub_dispatch_locked(void *ctx_cl, struct mqtt_packet *mqp, + bool dup); + +/** Run the server packet LIB for the specified wait time. + This routine yields the control back to the application after the specified + duration of wait time. Such an arrangement enable the application to make + overall progress to meet it intended functionality. + + The wait time implies the maximum intermediate duration between the reception + of two successive messages from the server. If no message is received before + the expiry of the wait time, the routine returns. However, the routine would + continue to block, in case, messages are being received within the successive + period of wait time. + + @param[in] wait_secs maximum time to wait for a message from the server + + @note if the value of MQP_ERR_LIBQUIT is returned, then system must be + restarted. +*/ +int32_t mqtt_server_run(uint32_t wait_secs); + +/** Abstraction for the device specific network services + Network services for communication with the clients + + @param[in] net refers to network services supported by the platform + @return on success, 0, otherwise -1 + + @ref net_ops_group + @note all entries in net must be supported by the platform. +*/ +int32_t mqtt_server_register_net_svc(const struct device_net_services *net); + + +/** <b> Working Principle </b> for implementing the call-back services: + Implementation of the call-back services should report in return value, only + about errors found in the RX processing. Specifically, the status of TX as a + follow-up to RX message (represented as a call-back service) need not be + reported to the server packet library. + + Error observed in TX (supported by appropriate API(s) / services in the + service packet library) is recorded in the 'context' and shall be dealt in + the next iteration of RX loop. +*/ +struct mqtt_server_msg_cbs { + + /** Indicate the CONNECT Message from the client + This routine provides, to the application, information about the + connection request that a remote client has made. The application + should utilize the credential and other data presented by the LIB + to authenticate, analyze and finally, approve or deny the request. + + Application at its discretion can also imply deployment specific + policies to make decision about allowing or refusing the request. + + The return value of this routine is a 16bit value that commensurates + with the 'variable header' of the CONNACK message. Specifically, the + LSB of the 16bit return value corresponds to the 8bit 'return code' + parameter in the CONNACK message and the MSB to the 'acknowledge + flags'. The application must set a valid return value. + + The LIB uses the return value to compose the CONNACK message to the + remote client. If the LSB of return value is a 'non zero' value, + then the LIB, after sending the CONNACK message to the remote client, + will terminate the network connection. + + @param[in] ctx_cl handle to the underlying network context in the LIB + @param[in] conn_flags options received in CONNECT message from server + @param[in] utf8_vec vector / array of pointers to UTF8 information. + The order of UTF8 information is client-id, will topic, will-message, + user-name and pass-word. A NULL in vector element indicates absence + of that particular UTF8 information. + @param[out] usr place holder for application to provide connection + specific handle. In subsequent calls from the implementation this + handle will be passed as a parameter to enable application to refer + to the particular active connection. + + @return 16bit value for the variable header of the CONNACK message, + MSB is CONNACK-Flags, LSB is CONNACK-RC + */ + uint16_t (*connect_rx)(void *ctx_cl, uint8_t conn_flags, + struct utf8_string * const *utf8_vec, void **usr); + + /** Indicate the SUBSCRIBE Message from the client + This routine provides, to the application, information about the + topics that the remote client intends to subscribe to. + + On return from this routine, if the application has found a problem + in the processing of message, then the LIB will simply terminate the + associated network connection. + + @param[in] usr_cl handle to connection context in the application + @param[in] qos_topics an array of topic along-with its qos + @param[in] n_topics the count / number of elements in the array + @param[in] msg_id the associated message ID provided by the client + @param[in] acks place holder array for the application to provide + finalized qos for each of the subscribed topic. The order of ack + is same as that of qos_topics + + @return The application should return false, if it encounters any + problem in the processing of topic. Otherwise, true. + + @note The memory space pointed by the 'buffer' field in the elements + of 'qos_topics' array has an additional byte available beyond the + size of memory indicated by the 'length' field. This extra byte can + be used by the application to NUL terminate the buffer. <b> This + quirk is applicable to this routine only. </b> + */ + bool (*sub_msg_rx)(void *usr_cl, const struct utf8_strqos *qos_topics, + uint32_t n_topics, uint16_t msg_id, uint8_t *acks); + + /** Indicate the UNSUBSCRIBE Message from the client + This routine provides, to the application, information about the + topics that the remote client intends to unsubscribe. + + On return from this routine, if the application has found a problem + in the processing of message, then the LIB will simply terminate the + associated network connection. + + @param[in] usr_cl handle to connection context in the application + @param[in] topics an array of topic in the message + @param[in] n_topics the count / number of elements in the array + @param[in] msg_id the associated message ID provided by the client + + @return The application should return false, if it encounters any + problem in the processing of topic. Otherwise, true. + */ + bool (*un_sub_msg)(void *usr_cl, const struct utf8_string *topics, + uint32_t n_topics, uint16_t msg_id); + + /** Indicate the PUBLISH Message from the client. + This routine provides, to the application, the binary data along-with + its qualifiers and the topic to which a remote client has published + data. + + On return from this routine, if the application has found a problem + in processing of the contents of the PUBLISH message, the LIB will + simply terminate the associated network connection. Otherwise, + depending upon the QoS level of the PUBLISH message, the LIB shall + dispatch the ACK (PUBACK or PUBREC) to the client, thereby, + relieveing the application from this support. + + @param[in] usr_cl handle to connection context in the application + @param[in] topic UTF8 Topic Name for which data has been published + @param[in] data_buf the published binary data for the topic + @param[in] data_len the length of the binary data + @param[in] msg_id the associated message ID provided by the client + @param[in] dup has client indicated this as a duplicate message + @param[in] qos quality of service of the message + @param[in] retain should the server retain the data? + + @return The application should return false, if it encounters any + problem in the processing of data, topic and related resources. + Otherwise, true. + */ + bool (*pub_msg_rx)(void *usr_cl, const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, uint16_t msg_id, + bool dup, enum mqtt_qos qos, bool retain); + + /** Notify the acknowledgement that was received from the remote client. + Following are the messages that are notified by the server LIB. + + PUBACK, PUBREC, PUBREL, PUBCOMP + + On return from this routine, if the application has found problem + in processing the ACK message, then the LIB will simply terminate + the associated network connection + + @param[in] usr_cl handle to connection context in the application + @param[in] msg_type refers to the type of ACK message + @param[in] msg_id the associated message ID provided by the client + @return application should return false if the ACK was not expected + by it or no reference was found for it. Otherwise true. + */ + bool (*ack_notify)(void *usr_cl, uint8_t msg_type, uint16_t msg_id); + + /** Notify that network connection to client has been closed. + This routine is invoked by the LIB to declare to the application that + the network connection to a particular client has been terminated and + follow-up, if needed, can now commence. If configured, removal of the + client session and / or dispatch of the WILL message, will be typical + aspects, among others, to follow-up. The routine aids the application + by indicating if an error condition had caused the closure. + + This routine is invoked by the LIB irrespective of the source entity, + server or client, that has caused the closure of the network. + + @param[in] usr_cl handle to connection context in the application + @param[in] due2err has the connection been closed due to an error? + */ + void (*on_cl_net_close)(void *usr_cl, bool due2err); + + /** Notify that CONNACK has been sent to the specified client. + This routine is invoked by the LIB to enable the application to make + progress and follow-up on the session information for the particular + client. Specifically, this routine facilitates the application to + either delete the session or re-send / sync-up the pending messages + associated with the client. The follow-up action is depenedent upon + the 'clean_session' option in the CONNECT message from the client. + @param[in] usr_cl handle to connection context in the application + @param[in] clean_session was a clean session requested in CONNECT? + */ + void (*on_connack_send)(void *usr_cl, bool clean_session); +}; + +struct mqtt_server_lib_cfg { + + /** Port to listen to incoming network connections from the clients */ + uint16_t listener_port; + + /** If the server application has more than one task and / or supports + at-least one plug-in, then a non-zero value must be provided. + Otherwise, this parameter must be set to zero. This parameter is + used by the implementation to synchronize multiple tasks for the + network connection. + */ + uint16_t loopback_port; + + /** For a multi-task enviroment, provide a handle to platform mutex */ + void *mutex; + void (*mutex_lockin)(void *mutex); + void (*mutex_unlock)(void *mutex); + + int32_t (*debug_printf)(const char *format, ...); /**< Debug, mandatory */ + bool aux_debug_en; /**< Assert to indicate additional debug info */ + +}; + +/** Initialize the MQTT Server Packet library + This routine initializes the packet and network constructs that are required + to manage the multiple network connetions. The server packet LIB must be + initialized prior to invoking of any other routine or service. + + @note This routine must be invoked only once in an run of the system. + + If there are more than one application (tasks) that utilize the services + of the server packet library, then certain configuration must be set in + the LIB @see struct mqtt_server_lib_cfg + + The application should also provision the platform network specific network + services into the packet library @see mqtt_server_register_net_svc. + + Once, the server LIB has been intialized successfully, it must be put into + action, to listen to requests for incoming connections, by invoking the API + @ref mqtt_server_run. + + @param[in] cfg configuration information for the MQTT server packet library + @param[in] cbs callback routines that LIB will invoke into the application + + @return 0 on success otherwise -1. +*/ +int32_t mqtt_server_lib_init(const struct mqtt_server_lib_cfg *cfg, + const struct mqtt_server_msg_cbs *cbs); + +/** @} */ /* End of server_pktlib */ + +}//namespace mbed_mqtt + +#ifdef __cplusplus +} +#endif + +#endif +
diff -r 000000000000 -r 547251f42a60 server_plug.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server_plug.cpp Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,219 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#include "server_plug.h" +#include "server_core.h" +#include "server_util.h" + +namespace mbed_mqtt { + +#define MAX_PLUGINS PG_MAP_MAX_ELEMS +#define PG_NAME_LEN 32 + +static struct plugin_desc { + + char *name; + uint8_t index; + uint8_t inuse; + + struct mqtt_server_app_cbs app_cbs; + +} plugins[MAX_PLUGINS]; + +#define PLUGIN(app_hnd) ((struct plugin_desc*) app_hnd) + +static inline bool is_inuse(struct plugin_desc *plugin) +{ + return plugin->inuse? true : false; +} + +static inline void inuse_set(struct plugin_desc *plugin, bool inuse) +{ + plugin->inuse = inuse? 0x01 : 0x00; +} + +static struct plugin_desc *acl_pg = NULL; + +static struct plugin_core_msg_cbs *msg_cbs, msg_cbacks = {NULL, NULL, NULL}; + +static inline struct plugin_desc *plugin_find(int32_t idx) +{ + return plugins + idx; +} + +static void plugin_reset(struct plugin_desc *plugin) +{ + //plugin->app_cbs = NULL; ==> TBD + inuse_set(plugin, false); + + return; +} + +static struct plugin_desc *plugin_alloc(void) +{ + struct plugin_desc *plugin = NULL;; + int32_t idx = 0; + + for(idx = 0; idx < MAX_PLUGINS; idx++) { + plugin = plugins + idx; + if(false == is_inuse(plugin)) { + inuse_set(plugin, true); + break; + } + } + + DBG_INFO("Plugin alloc %s\n\r", + (MAX_PLUGINS == idx)? "Failed" : "Success"); + + return (MAX_PLUGINS != idx)? plugin : NULL; +} + +#if 0 +static void plugin_free(struct plugin_desc *plugin) +{ + plugin_reset(plugin); +} +#endif + +uint16_t plugin_connect(const struct utf8_string *clientId, + const struct utf8_string *username, + const struct utf8_string *password, + void **app_usr) +{ + uint16_t rv = CONNACK_RC_REQ_ACCEPT; /* Accept everything from MQTT network */ + + *app_usr = NULL; + if(acl_pg) + rv = acl_pg->app_cbs.connect(clientId, username, + password, app_usr); + + return rv; +} + +int32_t plugin_publish(uint8_t pg_map, const struct utf8_string *topic, + const uint8_t *payload, uint32_t pay_len, + bool dup, uint8_t qos, bool retain) +{ + int32_t i = 0; + for(i = 0; i < MAX_PLUGINS; i++) { + if(PG_MAP_HAS_VALUE(pg_map, i)) { + struct plugin_desc *plugin = plugin_find(i); + + DBG_INFO("Publishing to Plugin ID: %d (%s)\n\r", + plugin->index, plugin->name); + + if(false == is_inuse(plugin)) + continue; /* Must not happen */ + + plugin->app_cbs.publish(topic, payload, pay_len, + dup, qos, retain); + } + } + + /* TBD for error value return. */ + + return pay_len; +} + +int32_t plugin_disconn(const void *app_usr, bool due2err) +{ + if(acl_pg) + acl_pg->app_cbs.disconn(app_usr, due2err); + + /* TBD for error value return. */ + + return 0; +} + +int32_t mqtt_server_topic_enroll(const void *app_hnd, + const struct utf8_string *topic, enum mqtt_qos qos) +{ + return app_hnd? + msg_cbs->topic_enroll(PLUGIN(app_hnd)->index, topic, qos) : -1; +} + +int32_t mqtt_server_topic_disenroll(const void *app_hnd, + const struct utf8_string *topic) +{ + return app_hnd? + msg_cbs->topic_cancel(PLUGIN(app_hnd)->index, topic) : -1; +} + +int32_t mqtt_server_app_pub_send(const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, + enum mqtt_qos qos, bool retain) +{ + return msg_cbs->publish(topic, data_buf, data_len, qos, retain); +} + +static void *server_app_register(const struct mqtt_server_app_cbs *cbs, + const char *name) +{ + struct plugin_desc *plugin = plugin_alloc(); + if(NULL != plugin) { + strncpy(plugin->name, name, PG_NAME_LEN - 1); + memcpy(&plugin->app_cbs, cbs, + sizeof(struct mqtt_server_app_cbs)); + + if((NULL == acl_pg) && cbs->connect) + acl_pg = plugin; + + } + return plugin; +} + +void *mqtt_server_app_register(const struct mqtt_server_app_cbs *cbs, + const char *name) +{ + if((NULL == cbs) || + ((!!cbs->connect) ^ (!!cbs->disconn)) || + (acl_pg && cbs->connect)) + return NULL; + + return server_app_register(cbs, name); +} + +static bool inited = false; + +int32_t plugin_init(const struct plugin_core_msg_cbs *cbs) +{ + int32_t idx = 0; + + if(inited) + return -1; + + if(NULL == cbs) + return -2; + + if(!(cbs->topic_enroll && cbs->topic_cancel && cbs->publish)) + return -3; + + for(idx = 0; idx < MAX_PLUGINS; idx++) { + struct plugin_desc *plugin = plugins + idx; + plugin->index = idx; + + plugin_reset(plugin); + } + + msg_cbs = &msg_cbacks; + memcpy(msg_cbs, cbs, sizeof(struct plugin_core_msg_cbs)); + + inited = true; + + USR_INFO("Plugin module has been initialized.\n\r"); + return 0; +} + +}//namespace mbed_mqtt
diff -r 000000000000 -r 547251f42a60 server_plug.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server_plug.h Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,87 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#ifndef __SERVER_PLUG_H__ +#define __SERVER_PLUG_H__ + +#include "server_pkts.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +namespace mbed_mqtt { + +/* Used by Server Core Logic */ + +#define PG_MAP_BITS_SIZE 2 +#define PG_MAP_BITS_MASK ((1 << PG_MAP_BITS_SIZE) - 1) +#define PG_MAP_MAX_ELEMS 4 /* should be able accomodate in 1 byte */ +#define PG_MAP_ALL_DFLTS ((1 << (PG_MAP_BITS_SIZE * PG_MAP_MAX_ELEMS)) - 1) + +#define PG_MAP_HAS_VALUE(pg_map, index) \ + (((~pg_map) >> (index * PG_MAP_BITS_SIZE)) & PG_MAP_BITS_MASK) + +#define PG_MAP_VAL_SETUP(pg_map, qid, index) \ + { \ + uint32_t ofst = index * PG_MAP_BITS_SIZE; \ + pg_map &= ~(PG_MAP_BITS_MASK << ofst); \ + pg_map |= qid << ofst; \ + } + +#define PG_MAP_VAL_RESET(pg_map, index) \ + pg_map |= PG_MAP_BITS_MASK << (index * PG_MAP_BITS_SIZE); + +#if (PG_MAP_BITS_MASK != QID_VMASK) +#error "PG_MAP_BITS_MASK must be same as 2bit QOS_VMASK" +#endif + +#if ((PG_MAP_MAX_ELEMS * PG_MAP_BITS_SIZE) > 8) +#error "Make size-of pg_map greate than 1 byte" +#endif + +struct plugin_core_msg_cbs { + + int32_t (*topic_enroll)(uint8_t pg_id, const struct utf8_string *topic, + enum mqtt_qos qos); + int32_t (*topic_cancel)(uint8_t pg_id, const struct utf8_string *topic); + int32_t (*publish)(const struct utf8_string *topic, const uint8_t *data_buf, + uint32_t data_len, enum mqtt_qos qos, bool retain); +}; + +int32_t plugin_init(const struct plugin_core_msg_cbs *cbs); + +/* uint16_t composition: MSB is CONNACK-Flags = 0, LSB is CONNACK-RC */ +uint16_t plugin_connect(const struct utf8_string *clientId, + const struct utf8_string *username, + const struct utf8_string *password, + void **app_usr); + +int32_t plugin_publish(uint8_t pg_map, const struct utf8_string *topic, + const uint8_t *payload, uint32_t pay_len, + bool dup, uint8_t qos, bool retain); + +int32_t plugin_disconn(const void *app_usr, bool due2err); + +}//namespace mbed_mqtt + +#ifdef __cplusplus +} +#endif + +#endif + +
diff -r 000000000000 -r 547251f42a60 server_util.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server_util.cpp Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,135 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#include "server_util.h" + +namespace mbed_mqtt { + +static uint16_t msg_id = 0xFFFF; +static inline uint16_t assign_new_msg_id() +{ + return msg_id += 2; +} + +uint16_t mqp_new_id_server(void) +{ + return assign_new_msg_id(); +} + +static void my_pkt_free(struct mqtt_packet *mqp) +{ + my_free((void*) mqp); +} + +/*---------------------------------------------------------------------------- + * Try to prevent fragmentation by consistently allocating a fixed memory size + *---------------------------------------------------------------------------- + */ +#ifndef CFG_SR_MAX_MQP_TX_LEN +#define MQP_SERVER_TX_LEN 1024 +#else +#define MQP_SERVER_TX_LEN CFG_SR_MAX_MQP_TX_LEN +#endif + +/*------------------ Fixed memory configuration ends -----------------------*/ + +static struct mqtt_packet *server_mqp_alloc(uint8_t msg_type, uint32_t buf_sz, uint8_t offset) +{ + uint32_t mqp_sz = sizeof(struct mqtt_packet); + struct mqtt_packet *mqp = NULL; + + buf_sz += offset; + + if((mqp_sz + buf_sz) > MQP_SERVER_TX_LEN) { + USR_INFO("S: fatal, buf alloc len > MQP_SERVER_TX_LEN\n\r"); + return NULL; + } + + mqp = (mqtt_packet*)my_malloc(MQP_SERVER_TX_LEN); + if(NULL != mqp) { + mqp_init(mqp, offset); + + mqp->msg_type = msg_type; + mqp->maxlen = buf_sz; + mqp->buffer = (uint8_t*)mqp + mqp_sz; + + mqp->free = my_pkt_free; + + } else { + USR_INFO("S: fatal, failed to alloc Server MQP\n\r"); + } + + return mqp; +} + +struct mqtt_packet *mqp_server_alloc(uint8_t msg_type, uint32_t buf_sz) +{ + return server_mqp_alloc(msg_type, buf_sz, MAX_FH_LEN); +} + +struct mqtt_packet *mqp_server_copy(const struct mqtt_packet *mqp) +{ + struct mqtt_packet *cpy = server_mqp_alloc(mqp->msg_type, + mqp->maxlen, 0); + if(NULL != cpy) { + uint8_t *buffer = cpy->buffer; + + /* Copy to overwrite everything in 'cpy' from source 'mqp' */ + buf_wr_nbytes((uint8_t*)cpy, (uint8_t*)mqp, sizeof(struct mqtt_packet)); + + cpy->buffer = buffer; /* Restore buffer and copy */ + buf_wr_nbytes(cpy->buffer, mqp->buffer, mqp->maxlen); + } + + return cpy; +} + +int32_t (*util_dbg_prn)(const char *fmt, ...) = NULL; +bool util_prn_aux = false; + +static void (*lockin_mutex)(void*) = NULL; +static void (*unlock_mutex)(void*) = NULL; +static void *mutex_hnd = NULL; + +void util_mutex_lockin(void) +{ + if(lockin_mutex) + lockin_mutex(mutex_hnd); + + return; +} + +void util_mutex_unlock(void) +{ + if(unlock_mutex) + unlock_mutex(mutex_hnd); + + return; +} + +void util_params_set(int32_t (*dbg_prn)(const char *fmt, ...), + void *mutex, + void (*mutex_lockin)(void*), + void (*mutex_unlock)(void*)) +{ + util_dbg_prn = dbg_prn; + mutex_hnd = mutex; + lockin_mutex = mutex_lockin; + unlock_mutex = mutex_unlock; + + return; +} + +}//namespace mbed_mqtt {
diff -r 000000000000 -r 547251f42a60 server_util.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server_util.h Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,67 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#ifndef __SERVER_UTIL_H__ +#define __SERVER_UTIL_H__ + +#include "mqtt_common.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +namespace mbed_mqtt { + +#define my_malloc malloc +#define my_free free + +#define MQTT_SERVER_VERSTR "1.0.1" + +#define MIN(a,b) ((a > b)? b : a) + +uint16_t mqp_new_id_server(void); +struct mqtt_packet *mqp_server_alloc(uint8_t msg_type, uint32_t buf_sz); + +struct mqtt_packet *mqp_server_copy(const struct mqtt_packet *mqp); + +extern int32_t (*util_dbg_prn)(const char *fmt, ...); +extern bool util_prn_aux; + +void util_mutex_lockin(void); +void util_mutex_unlock(void); + +#define MUTEX_LOCKIN() util_mutex_lockin() +#define MUTEX_UNLOCK() util_mutex_unlock() + +#define USR_INFO(FMT, ...) if(util_dbg_prn) util_dbg_prn(FMT, ##__VA_ARGS__) + +#define DBG_INFO(FMT, ...) \ + if(util_prn_aux && util_dbg_prn) \ + util_dbg_prn(FMT, ##__VA_ARGS__) + +void util_params_set(int32_t (*dbg_prn)(const char *fmt, ...), + void *mutex, + void (*mutex_lockin)(void*), + void (*mutex_unlock)(void*)); + +}//namespace mbed_mqtt { + +#ifdef __cplusplus +} +#endif + +#endif +