Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: mqtt_V1 cc3100_Test_mqtt_CM3
Revision 0:547251f42a60, committed 2015-06-06
- Comitter:
- dflet
- Date:
- Sat Jun 06 13:29:08 2015 +0000
- Commit message:
- Part of mqtt_V1
Changed in this revision
diff -r 000000000000 -r 547251f42a60 client_mgmt.cpp
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/client_mgmt.cpp Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,790 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#include "server_util.h"
+#include "client_mgmt.h"
+#include "server_pkts.h"
+
+namespace mbed_mqtt {
+
+#ifndef CFG_SR_MAX_CL_ID_SIZE
+#define MAX_CLIENT_ID_LEN 64
+#else
+#define MAX_CLIENT_ID_LEN CFG_SR_MAX_CL_ID_SIZE
+#endif
+
+#ifndef CFG_SR_MAX_NUM_CLIENT
+#define MAX_CLIENTS 16 /* Must not exceed 32 */
+#else
+#define MAX_CLIENTS CFG_SR_MAX_NUM_CLIENT
+
+#if (CFG_SR_MAX_NUM_CLIENT > 32)
+#error "CFG_SR_MAX_NUM_CLIENT must not exceed 32"
+#endif
+#endif
+
+static struct client_usr {
+
+ void *ctx; /* Client net */
+ void *app; /* Client app */
+
+ void *will; /* Will topic */
+ uint32_t index;
+ uint32_t n_sub;
+
+#define MQ_CONNECT_FLAG 0x00000001
+#define CLEAN_SESS_FLAG 0x00000002
+#define ASSIGNMENT_FLAG 0x00000004 /* No state or connected */
+
+ uint32_t flags;
+
+ char client_id[MAX_CLIENT_ID_LEN];
+
+ struct pub_qos2_cq qos2_rx_cq;
+ struct pub_qos2_cq qos2_tx_cq;
+
+} users[MAX_CLIENTS];
+
+// TBD consistency between inline and macro functions
+
+static inline bool is_connected(struct client_usr *cl_usr)
+{
+ return (cl_usr->flags & MQ_CONNECT_FLAG)? true : false;
+}
+
+static inline bool has_clean_session(struct client_usr *cl_usr)
+{
+ return (cl_usr->flags & CLEAN_SESS_FLAG)? true : false;
+}
+
+static inline bool is_assigned(struct client_usr *cl_usr)
+{
+ return (cl_usr->flags & ASSIGNMENT_FLAG)? true : false;
+}
+
+static inline void set_connect_state(struct client_usr *cl_usr,
+ bool connected)
+{
+ if(connected)
+ cl_usr->flags |= MQ_CONNECT_FLAG;
+ else
+ cl_usr->flags &= ~MQ_CONNECT_FLAG;
+}
+
+static inline void set_clean_session(struct client_usr *cl_usr,
+ bool clean_session)
+{
+ if(clean_session)
+ cl_usr->flags |= CLEAN_SESS_FLAG;
+ else
+ cl_usr->flags &= ~CLEAN_SESS_FLAG;
+}
+
+static inline void set_assigned(struct client_usr *cl_usr,
+ bool assignment)
+{
+ if(assignment)
+ cl_usr->flags |= ASSIGNMENT_FLAG;
+ else
+ cl_usr->flags &= ~ASSIGNMENT_FLAG;
+}
+
+static void cl_usr_reset(struct client_usr *cl_usr)
+{
+ cl_usr->ctx = NULL;
+ cl_usr->app = NULL;
+ cl_usr->will = NULL;
+
+ cl_usr->n_sub = 0;
+ cl_usr->flags = 0;
+
+ cl_usr->client_id[0] = '\0';
+
+ qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
+ qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
+}
+
+static void cl_usr_init(void)
+{
+ int32_t idx = 0;
+ for(idx = 0; idx < MAX_CLIENTS; idx++) {
+ struct client_usr *cl_usr = users + idx;
+ cl_usr->index = idx;
+
+ cl_usr_reset(cl_usr);
+ }
+}
+
+static void cl_usr_free(struct client_usr *cl_usr)
+{
+ cl_usr_reset(cl_usr);
+ cl_usr->flags &= ~(MQ_CONNECT_FLAG |
+ CLEAN_SESS_FLAG |
+ ASSIGNMENT_FLAG);
+}
+
+void cl_sub_count_add(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ if(is_connected(cl_usr)) {
+ cl_usr->n_sub++;
+ USR_INFO("%s has added a new sub, now total is %u\n\r",
+ cl_usr->client_id, cl_usr->n_sub);
+
+ }
+
+ return;
+}
+
+void cl_sub_count_del(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ if(is_connected(cl_usr)) {
+ cl_usr->n_sub--;
+ USR_INFO("%s has deleted a sub, now total is %u\n\r",
+ cl_usr->client_id, cl_usr->n_sub);
+ }
+
+ return;
+}
+
+/*----------------------------------------------------------------------------
+ * QoS2 PUB RX Message handling mechanism and associated house-keeping
+ *--------------------------------------------------------------------------*/
+static inline bool qos2_pub_rx_logup(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ return qos2_pub_cq_logup(&cl_usr->qos2_rx_cq, msg_id);
+}
+
+static inline bool ack2_msg_id_logup(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ return qos2_pub_cq_logup(&cl_usr->qos2_tx_cq, msg_id);
+}
+
+static inline bool qos2_pub_rx_unlog(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ return qos2_pub_cq_unlog(&cl_usr->qos2_rx_cq, msg_id);
+}
+
+static inline bool ack2_msg_id_unlog(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ return qos2_pub_cq_unlog(&cl_usr->qos2_tx_cq, msg_id);
+}
+
+static inline bool qos2_pub_rx_is_done(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ return qos2_pub_cq_check(&cl_usr->qos2_rx_cq, msg_id);
+}
+
+bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ return cl_usr && (qos2_pub_rx_is_done(cl_usr, msg_id) ||
+ qos2_pub_rx_logup(cl_usr, msg_id));
+}
+
+static void ack2_msg_id_dispatch(struct client_usr *cl_usr)
+{
+ struct pub_qos2_cq *tx_cq = &cl_usr->qos2_tx_cq;
+ uint8_t rd_idx = tx_cq->rd_idx;
+ uint8_t n_free = tx_cq->n_free;
+ uint8_t i = 0;
+
+ for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free); i++) {
+ if(mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, MQTT_QOS1,
+ true, tx_cq->id_vec[i]) <= 0)
+ break;
+ }
+
+ return;
+}
+
+static void ack2_msg_id_purge(struct client_usr *cl_usr)
+{
+ qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
+ qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
+}
+
+/*
+
+*/
+static struct mqtt_ack_wlist wl_qos_ack1 = {NULL, NULL};
+static struct mqtt_ack_wlist *qos_ack1_wl = &wl_qos_ack1;
+
+/*
+
+*/
+static struct mqtt_ack_wlist wl_mqp_sess = {NULL, NULL};
+static struct mqtt_ack_wlist *sess_mqp_wl = &wl_mqp_sess;
+
+#define MQP_CL_MAP_GET(mqp) (mqp->private_)
+#define MQP_CL_MAP_SET(mqp, cl_map) (mqp->private_ |= cl_map)
+#define MQP_CL_MAP_CLR(mqp, cl_map) (mqp->private_ &= ~cl_map)
+
+#define CL_BMAP(cl_usr) (1 << cl_usr->index)
+
+static inline
+int32_t _pub_dispatch(struct client_usr *cl_usr, struct mqtt_packet *mqp,
+ bool dup)
+{
+ /* Error, if any, is handled in 'cl_net_close()' ....*/
+ return mqtt_server_pub_dispatch(cl_usr->ctx, mqp, dup);
+}
+
+static void ack1_wl_mqp_dispatch(struct client_usr *cl_usr)
+{
+ struct mqtt_packet *mqp = NULL;
+
+ for(mqp = qos_ack1_wl->head; NULL != mqp; mqp = mqp->next)
+ if(MQP_CL_MAP_GET(mqp) & CL_BMAP(cl_usr))
+ _pub_dispatch(cl_usr, mqp, true);
+}
+
+#if 0
+static struct client_usr *find_cl_usr(uint32_t index)
+{
+ struct client_usr *cl_usr = users + 0;
+
+ int32_t i = 0;
+ for(i = 0; i < MAX_CLIENTS; i++, cl_usr++) {
+ if(index == cl_usr->index)
+ break;
+ }
+
+ return (MAX_CLIENTS == i)? NULL : cl_usr;
+}
+#endif
+
+#define IS_CL_USR_FREE(cl_usr) (( 0 == cl_usr->flags) && \
+ ('\0' == cl_usr->client_id[0]))
+
+#define IS_CL_INACTIVE(cl_usr) (( 0 == cl_usr->flags) && \
+ ('\0' != cl_usr->client_id[0]))
+
+#define NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr) \
+ ((((MQTT_QOS1 == qos) && has_clean_session(cl_usr)) || \
+ ((MQTT_QOS0 == qos)))? \
+ false : true)
+
+static inline uint32_t _cl_bmap_get(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ return IS_CL_USR_FREE(cl_usr)? 0 : CL_BMAP(cl_usr);
+}
+
+uint32_t cl_bmap_get(void *usr_cl)
+{
+ return usr_cl? _cl_bmap_get(usr_cl) : 0;
+}
+
+void *cl_app_hndl_get(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ return (cl_usr && is_connected((client_usr*) usr_cl))? cl_usr->app : NULL;
+}
+
+void *cl_will_hndl_get(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ return (cl_usr && is_connected((client_usr*)usr_cl))? cl_usr->will : NULL;
+}
+
+static void pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
+{
+ //uint32_t n_bits = sizeof(uint32_t) << 3; /* Multiply by 8 */
+ uint32_t sp_map = 0; /* client Map for sessions present */
+ enum mqtt_qos qos = ENUM_QOS(mqp->fh_byte1);/* QOS */
+ const uint32_t cl_ref = cl_map; /* Keep ref to original */
+
+ int32_t i = 0;
+ for(i = 0; i < MAX_CLIENTS; i++) {
+ if(cl_map & (1 << i)) {
+ struct client_usr *cl_usr = users + i; //find_cl_usr(i);
+ if(is_connected(cl_usr))
+ if((_pub_dispatch(cl_usr, mqp, false) > 0) &&
+ NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr))
+ continue;/* Processing done; next CL */
+
+ /* CL: unconnected / PUB Err / QOS1 PKT (clean sess) */
+ cl_map &= ~(1 << i);
+
+ if(IS_CL_INACTIVE(cl_usr))
+ sp_map |= (1 << i); /* CL: Maintain session */
+ }
+ }
+
+ if(sp_map) {
+ struct mqtt_packet *cpy = mqp_server_copy(mqp); /* Make copy */
+ if(cpy) {
+ MQP_CL_MAP_SET(cpy, sp_map);
+ mqp_ack_wlist_append(sess_mqp_wl, cpy);
+ } else
+ sp_map = 0;
+ }
+
+ if(cl_map) { /* Wait List Publish */
+ MQP_CL_MAP_SET(mqp, cl_map);
+ mqp_ack_wlist_append(qos_ack1_wl, mqp);
+ } else
+ mqp_free(mqp); /* PUB MQP now has no more use; must be freed */
+
+ USR_INFO("PUBLISH: CL Map(0x%08x): For Ack 0x%08x, Inactive 0x%08x\n\r",
+ cl_ref, cl_map, sp_map);
+
+ return;
+}
+
+void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
+{
+ pub_dispatch(cl_map, mqp);
+ return;
+}
+
+/* Move to mqtt_common.h and remove it from mqtt_client.h */
+static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp)
+{
+ mqp_free(mqp);
+ return MQP_ERR_PKT_LEN;
+}
+
+/* Move this to a common file */
+int32_t cl_pub_msg_send(void *usr_cl,
+ const struct utf8_string *topic, const uint8_t *data_buf,
+ uint32_t data_len, enum mqtt_qos qos, bool retain)
+{
+ struct mqtt_packet *mqp = NULL;
+
+ if((NULL == topic) ||
+ ((data_len > 0) && (NULL == data_buf)) ||
+ (NULL == usr_cl))
+ return MQP_ERR_FNPARAM;
+
+ mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len);
+ if(NULL == mqp)
+ return MQP_ERR_PKT_AVL;
+
+ if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) ||
+ (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len))))
+ return len_err_free_mqp(mqp);
+
+ mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain));
+
+ pub_dispatch(_cl_bmap_get(usr_cl), mqp);
+
+ return MQP_CONTENT_LEN(mqp);
+}
+
+static void wl_remove(struct mqtt_ack_wlist *list,
+ struct mqtt_packet *prev,
+ struct mqtt_packet *elem)
+{
+ if(prev)
+ prev->next = elem->next;
+ else
+ list->head = elem->next;
+
+ if(NULL == list->head)
+ list->tail = NULL;
+
+ if(list->tail == elem)
+ list->tail = prev;
+
+ return;
+}
+
+static void sess_wl_mqp_dispatch(struct client_usr *cl_usr)
+{
+ struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
+ uint32_t cl_map = CL_BMAP(cl_usr);
+
+ for(mqp = sess_mqp_wl->head; NULL != mqp; prev = mqp, mqp = next) {
+ struct mqtt_packet *cpy = NULL;
+
+ if(0 == (MQP_CL_MAP_GET(mqp) & cl_map))
+ continue; /* MQP & CL: no association */
+
+ /* MQP has associated client(s) - process it */
+
+ /* Dissociate this client from MQP */
+ MQP_CL_MAP_CLR(mqp, cl_map);
+ next = mqp->next; /* House keeping */
+
+ if(0 == MQP_CL_MAP_GET(mqp)) {
+ /* MQP w/ no clients, remove from WL */
+ wl_remove(sess_mqp_wl, prev, mqp);
+
+ /* Ensure correct arrangement for WL */
+ cpy = mqp; mqp = prev;
+ } else {
+ /* MQP is associated w/ other clients */
+ cpy = mqp_server_copy(mqp);
+ if(NULL == cpy)
+ continue;
+ }
+
+ /* Got packet from session, dispatch it to CL */
+ pub_dispatch(cl_map, cpy);
+ }
+
+ return;
+}
+
+static
+struct mqtt_packet *wl_mqp_unmap_find(struct mqtt_ack_wlist *wl,
+ struct client_usr *cl_usr, uint16_t msg_id,
+ struct mqtt_packet **prev)
+{
+ struct mqtt_packet *mqp = NULL;
+
+ *prev = NULL;
+ for(mqp = wl->head; NULL != mqp; *prev = mqp, mqp = mqp->next) {
+ if(msg_id == mqp->msg_id) {
+ /* Found a MQP whose msg_id matches with input */
+ MQP_CL_MAP_CLR(mqp, CL_BMAP(cl_usr));
+ return mqp;
+ }
+ }
+
+ return NULL;
+}
+
+static bool wl_rmfree_try(struct mqtt_ack_wlist *wl, struct mqtt_packet *prev,
+ struct mqtt_packet *mqp)
+{
+ if(0 == MQP_CL_MAP_GET(mqp)) {
+ wl_remove(wl, prev, mqp);
+ mqp_free(mqp);
+
+ return true;
+ }
+
+ return false;
+}
+
+static bool ack1_unmap_rmfree_try(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ struct mqtt_packet *prev = NULL, *mqp = NULL;
+
+ if(false == has_clean_session(cl_usr)) {
+ mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
+ if(mqp)
+ wl_rmfree_try(qos_ack1_wl, prev, mqp);
+ }
+
+ return true;
+}
+
+static
+void wl_purge(struct mqtt_ack_wlist *wl, struct client_usr *cl_usr)
+{
+ struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
+ uint32_t bmap = CL_BMAP(cl_usr);
+
+ for(mqp = wl->head; NULL != mqp; prev = mqp, mqp = next) {
+ next = mqp->next;
+
+ /* Ideally, check should be done to see if cl_usr and mqp are
+ associated. If yes, then the bit should be cleared. At the
+ moment, blind clearing of the bit has been implemented and
+ it has no side effects.
+ */
+ MQP_CL_MAP_CLR(mqp, bmap);
+
+ /* MQP with no clients has no more use, so try deleting MQP */
+ if(wl_rmfree_try(wl, prev, mqp))
+ mqp = prev; /* MQP deleted, prep for next iteration */
+ }
+
+ return;
+}
+
+static void ack1_wl_mqp_purge(struct client_usr *cl_usr)
+{
+ wl_purge(qos_ack1_wl, cl_usr);
+}
+
+static void sess_wl_mqp_purge(struct client_usr *cl_usr)
+{
+ wl_purge(sess_mqp_wl, cl_usr);
+}
+
+static void session_resume(struct client_usr *cl_usr)
+{
+ ack1_wl_mqp_dispatch(cl_usr);
+ ack2_msg_id_dispatch(cl_usr);
+ sess_wl_mqp_dispatch(cl_usr);
+}
+
+static void session_delete(struct client_usr *cl_usr)
+{
+ ack1_wl_mqp_purge(cl_usr);
+ ack2_msg_id_purge(cl_usr);
+ sess_wl_mqp_purge(cl_usr);
+}
+
+static bool has_session_data(struct client_usr *cl_usr)
+{
+ uint32_t map = CL_BMAP(cl_usr);
+ struct mqtt_packet *elem;
+
+ if(cl_usr->n_sub)
+ return true;
+
+ elem = qos_ack1_wl->head;
+ while(elem) {
+ if(MQP_CL_MAP_GET(elem) & map)
+ return true;
+
+ elem = elem->next;
+ }
+
+ elem = sess_mqp_wl->head;
+ while(elem) {
+ if(MQP_CL_MAP_GET(elem) & map)
+ return true;
+
+ elem = elem->next;
+ }
+
+ return false;
+}
+
+bool cl_can_session_delete(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ return cl_usr? (has_clean_session(cl_usr) ||
+ !has_session_data(cl_usr)) : false;
+}
+
+void cl_on_net_close(void *usr_cl)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ if(is_assigned(cl_usr)) {
+ if(false == has_session_data(cl_usr))
+ cl_usr_free(cl_usr);
+ } else if(has_clean_session(cl_usr)) {
+ session_delete(cl_usr);
+ cl_usr_free(cl_usr);
+ } else {
+ set_connect_state(cl_usr, false);
+ cl_usr->ctx = NULL;
+ cl_usr->app = NULL;
+ }
+
+ return;
+}
+
+static bool proc_pub_rel_rx(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBCOMP,
+ MQTT_QOS0, true, msg_id);
+
+ if(qos2_pub_rx_is_done(cl_usr, msg_id))
+ qos2_pub_rx_unlog(cl_usr, msg_id);
+
+ return true;
+}
+
+static bool proc_pub_rec_rx(struct client_usr *cl_usr, uint16_t msg_id)
+{
+ struct mqtt_packet *prev = NULL, *mqp = NULL;
+
+ mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
+ if(mqp && ack2_msg_id_logup(cl_usr, msg_id)) {
+
+ wl_rmfree_try(qos_ack1_wl, prev, mqp);
+
+ mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL,
+ MQTT_QOS1, true, msg_id);
+
+ return true;
+ }
+
+ return false;
+}
+
+bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+ bool rv = false;
+
+ if(NULL == cl_usr)
+ return rv;
+
+ switch(msg_type) {
+
+ case MQTT_PUBACK:
+ rv = ack1_unmap_rmfree_try(cl_usr, msg_id);
+ break;
+
+ case MQTT_PUBREC:
+ rv = proc_pub_rec_rx(cl_usr, msg_id);
+ break;
+
+ case MQTT_PUBREL:
+ rv = proc_pub_rel_rx(cl_usr, msg_id);
+ break;
+
+ case MQTT_PUBCOMP:
+ rv = ack2_msg_id_unlog(cl_usr, msg_id);
+ break;
+
+ default:
+ break;
+ }
+
+ return rv;
+}
+
+static void assign_cl_index_as_id(struct client_usr *cl_usr)
+{
+ /* TBD: need a better implementation */
+ char *client_id = cl_usr->client_id;
+
+ client_id[0] = 'S';
+ client_id[1] = 'e';
+ client_id[2] = 'l';
+ client_id[3] = 'f';
+ client_id[4] = '-';
+ client_id[5] = ((cl_usr->index & 0xf0) >> 4) + 0x30;
+ client_id[6] = ((cl_usr->index & 0x0f)) + 0x30;
+ client_id[7] = '\0';
+
+ /* Make sure that above array size is with in MAX_CLIENT_ID_LEN */
+
+ return;
+}
+
+static struct client_usr *assign_cl_usr(char *client_id, uint8_t *vh_buf)
+{
+ struct client_usr *cl_usr, *fr_usr = NULL;
+ int32_t idx = 0;
+ for(idx = 0; idx < MAX_CLIENTS; idx++) {
+ cl_usr = users + idx;
+ if((NULL == fr_usr) && IS_CL_USR_FREE(cl_usr))
+ fr_usr = cl_usr; /* Note 1st free cl_usr */
+
+ if((NULL == client_id) && (NULL != fr_usr)) {
+ /* Free cl_usr is present to create CL-ID */
+ break;
+
+ } else if((NULL != client_id) &&
+ (0 == strncmp(cl_usr->client_id, client_id,
+ MAX_CLIENT_ID_LEN))) {
+ /* Found a client obj with exact ID match */
+ if(is_connected(cl_usr)) {
+ /* Error: CL-ID is already active */
+ vh_buf[1] = CONNACK_RC_CLI_REJECT;
+ cl_usr = NULL;
+ }
+ break;
+ }
+ }
+
+ if(idx == MAX_CLIENTS) { /* Did not find a match */
+ cl_usr = fr_usr;
+ if(NULL == cl_usr)
+ /* Server Resource unavailable */
+ vh_buf[1] = CONNACK_RC_SVR_UNAVBL;
+ }
+
+ if(NULL != cl_usr) {
+ if(NULL == client_id)
+ assign_cl_index_as_id(cl_usr); /* Get ID */
+ else if(IS_CL_USR_FREE(cl_usr))
+ strncpy(cl_usr->client_id, client_id,
+ MAX_CLIENT_ID_LEN);
+
+ set_assigned(cl_usr, true);
+ }
+
+ USR_INFO("Assignment of cl_usr %s (detail: index %d and name %s) \n\r",
+ cl_usr? "Succeeded" : "Failed", cl_usr? (int32_t)cl_usr->index : -1,
+ cl_usr? cl_usr->client_id : "");
+
+ return cl_usr;
+}
+
+uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id,
+ void *app_cl, void *will, void **usr_cl)
+{
+ uint8_t vh_buf[] = {0x00, CONNACK_RC_REQ_ACCEPT};
+ struct client_usr *cl_usr;
+
+ if(client_id && ('\0' == client_id[0]))
+ /* Shouldn't happen: CLIENT ID with a NUL only string */
+ return CONNACK_RC_CLI_REJECT;
+
+ cl_usr = assign_cl_usr(client_id, vh_buf);
+ if(NULL == cl_usr)
+ return vh_buf[1]; /* Use vh_buf from assign_cl_usr() */
+
+ if((false == clean_session) && has_session_data(cl_usr))
+ vh_buf[0] = 0x01; /* Set Session Present Flag */
+
+#if 0
+ if(0x00 == vh_buf[1])
+ *usr_cl = (void*)cl_usr;
+#endif
+ *usr_cl = (void*)cl_usr;
+
+ cl_usr->ctx = ctx_cl;
+ cl_usr->app = app_cl;
+ cl_usr->will = will;
+
+ return ((vh_buf[0] << 8) | vh_buf[1]);
+}
+
+void cl_on_connack_send(void *usr_cl, bool clean_session)
+{
+ struct client_usr *cl_usr = (struct client_usr*) usr_cl;
+
+ if(false == is_assigned(cl_usr)) {
+ USR_INFO("Fatal: CONNACK for unassigned cl_usr Id %u, abort\n\r",
+ cl_usr->index);
+ return;
+ }
+
+ set_assigned(cl_usr, false);
+ set_clean_session(cl_usr, clean_session);
+ set_connect_state(cl_usr, true);
+
+ if(clean_session) {
+ session_delete(cl_usr);
+ } else {
+ session_resume(cl_usr);
+ }
+
+ DBG_INFO("Server is now connected to client %s\n\r", cl_usr->client_id);
+
+ return;
+}
+
+int32_t cl_mgmt_init(void)
+{
+ cl_usr_init();
+
+ return 0;
+}
+
+}//namespace mbed_mqtt
diff -r 000000000000 -r 547251f42a60 client_mgmt.h
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/client_mgmt.h Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,69 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#ifndef __CLIENT_MGMT_H__
+#define __CLIENT_MGMT_H__
+
+#include "mqtt_common.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+namespace mbed_mqtt {
+
+uint32_t cl_bmap_get(void *usr_cl);
+
+void *cl_app_hndl_get(void *usr_cl);
+
+void *cl_will_hndl_get(void *usr_cl);
+
+bool cl_can_session_delete(void *usr_cl);
+
+void cl_sub_count_add(void *usr_cl);
+
+void cl_sub_count_del(void *usr_cl);
+
+bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id);
+
+void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp);
+
+int32_t cl_pub_msg_send(void *usr_cl,
+ const struct utf8_string *topic, const uint8_t *data_buf,
+ uint32_t data_len, enum mqtt_qos qos, bool retain);
+
+void cl_on_net_close(void *usr_cl);
+
+bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id);
+
+/* uint16_t composition: MSB is CONNACK-Flags and LSB is CONNACK-RC. The place-holder
+ '*usr_cl' has valid value, only if CONNACK-RC in return value is 0.
+*/
+uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id,
+ void *app_cl, void *will, void **usr_cl);
+
+void cl_on_connack_send(void *usr_cl, bool clean_session);
+
+int32_t cl_mgmt_init(void);
+
+}//namespace mbed_mqtt
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
diff -r 000000000000 -r 547251f42a60 server_core.cpp
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server_core.cpp Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,1695 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#include "client_mgmt.h"
+#include "server_util.h"
+#include "server_pkts.h"
+#include "server_plug.h"
+#include "server_core.h"
+
+namespace mbed_mqtt {
+
+#ifndef CFG_SR_MAX_SUBTOP_LEN
+#define MAX_SUBTOP_LEN 32
+#else
+#define MAX_SUBTOP_LEN CFG_SR_MAX_SUBTOP_LEN
+#endif
+
+#ifndef CFG_SR_MAX_TOPIC_NODE
+#define MAX_TOP_NODES 32
+#else
+#define MAX_TOP_NODES CFG_SR_MAX_TOPIC_NODE
+#endif
+
+/* A topic name (or string or tree or hierarchy) is handled as a series of nodes.
+ A 'topic node' refers to a level in the topic tree or hierarchy and it stores
+ the associated part of the topic string at that level. For example, a topic
+ having a name "abc/x/1" will have three nodes for each of the subtopics, viz,
+ "abc/", "x/" and "1". Further, the association between the nodes is indicated
+ as following.
+
+ -- down link hierarchy --> -- down link hierarchy -->
+ "abc/" "x/" "1"
+ <-- up link hierarchy -- <-- up link hierarchy --
+
+ To extend this example, another topic having a name "abc/y/2" will be managed
+ as following.
+
+ -- down link hierarchy --> -- down link hierarchy -->
+ "abc/" "x/" "1"
+ <-- up link hierarchy -- <-- up link hierarchy --
+ | ^
+ | |
+ down link neighbour up link neighbour
+ | |
+ V |
+ -- down link hierarchy -->
+ "y/" "2"
+ <-- up link hierarchy --
+
+ To reduce wasted in byte alignments, the structure has been hand packed.
+*/
+static struct topic_node {
+
+ struct topic_node *dn_nhbr; /* Down Link Neighbour node */
+ struct topic_node *up_nhbr; /* Up Link Neighbour node */
+
+ struct topic_node *dn_hier; /* Down Link Hierarchy node */
+ struct topic_node *up_hier; /* Up Link Hierarchy node */
+
+ uint32_t cl_map[3]; /* Subscribers for each QOS */
+
+ uint8_t *my_data; /* Leaf node: retained data */
+ uint32_t my_dlen;
+
+ void *will_cl; /* CL OBJ of WILL Leaf node */
+
+#define TNODE_PROP_RETAIN_DATA 0x04
+ /* Bits 0,1 for QOS and rest are flags */
+ uint8_t my_prop;
+
+ uint8_t pg_map; /* Map: application plugins */
+
+ uint16_t toplen; /* Length of node sub-topic */
+ char subtop[MAX_SUBTOP_LEN];/* NUL terminated sub-topic */
+
+} nodes[MAX_TOP_NODES];
+
+/*
+ Housekeeping to manage subtopics (nodes) at run-time.
+*/
+static struct _node_stack {
+ struct topic_node *node;
+ uint32_t val1;
+ uint32_t val2;
+
+} node_stack[MAX_TOP_NODES];
+
+static int32_t stack_idx = 0;
+
+static void stack_add(struct topic_node *node, uint32_t val1, uint32_t val2)
+{
+ node_stack[stack_idx].node = node;
+ node_stack[stack_idx].val1 = val1;
+ node_stack[stack_idx].val2 = val2;
+
+ stack_idx++;
+}
+
+static inline struct _node_stack *stack_pop()
+{
+ if(0 == stack_idx)
+ return NULL;
+
+ return node_stack + (--stack_idx);
+}
+
+static inline bool is_node_retain(const struct topic_node *node)
+{
+ return (node->my_prop & TNODE_PROP_RETAIN_DATA)? true : false;
+}
+
+static inline void node_retain_set(struct topic_node *node, bool retain)
+{
+ if(retain)
+ node->my_prop |= TNODE_PROP_RETAIN_DATA;
+ else
+ node->my_prop &= ~TNODE_PROP_RETAIN_DATA;
+}
+
+static inline void node_qid_set(struct topic_node *node, uint8_t qid)
+{
+ node->my_prop &= ~QID_VMASK;
+ node->my_prop |= qid;
+}
+
+static inline uint8_t node_qid_get(struct topic_node *node)
+{
+ return node->my_prop & QID_VMASK;
+}
+
+static inline bool is_node_willed(const struct topic_node *node)
+{
+ return (NULL != node->will_cl)? true : false;
+}
+
+static inline bool enrolls_plugin(const struct topic_node *node)
+{
+ return (PG_MAP_ALL_DFLTS != node->pg_map)? true : false;
+}
+
+static struct topic_node *free_nodes = NULL;
+static struct topic_node *root_node = NULL;
+
+static void node_reset(struct topic_node *node)
+{
+ node->dn_nhbr = NULL;
+ node->up_nhbr = NULL;
+ node->dn_hier = NULL;
+ node->up_hier = NULL;
+
+ node->cl_map[0] = node->cl_map[1] = node->cl_map[2] = 0;
+ node->my_data = NULL;
+ node->my_dlen = 0;
+ node->will_cl = NULL;
+ node->my_prop = QID_VMASK;
+
+ node->pg_map = PG_MAP_ALL_DFLTS;
+ node->toplen = 0;
+}
+
+static void topic_node_init(void)
+{
+ int i = 0;
+
+ for(i = 0; i < MAX_TOP_NODES; i++) {
+ struct topic_node *node = nodes + i;
+
+ node_reset(node);
+
+ node->dn_nhbr = free_nodes;
+ free_nodes = node;
+ }
+
+ return;
+}
+
+static struct topic_node *alloc_topic_node(void)
+{
+ struct topic_node *node = free_nodes;
+ if(NULL != node) {
+ free_nodes = node->dn_nhbr;
+
+ node_reset(node);
+ }
+
+ return node;
+}
+
+static void free_node(struct topic_node *node)
+{
+ node_reset(node);
+
+ node->dn_nhbr = free_nodes;
+ free_nodes = node;
+}
+
+static bool is_end_of_subtop(const char *topstr, char const **next_subtop)
+{
+ bool rv = false;
+
+ if('\0' == *topstr) {
+ *next_subtop = NULL; /* Reached end of topstr */
+ rv = true;
+ }
+
+ if('/' == *topstr) {
+ /* Next sub-topic is NULL, if a '\0' follows '/' */
+ *next_subtop = *(topstr + 1)? topstr + 1 : NULL;
+ rv = true;
+ }
+
+ return rv;
+}
+
+static int32_t subtop_read(const char *topstr, char *subtop_buf, uint16_t buf_length,
+ char const **next_subtop)
+{
+ int32_t idx = 0, len = buf_length;
+
+ *next_subtop = topstr;
+
+ while(idx < (len - 1)) {
+ subtop_buf[idx++] = *topstr;
+
+ if(is_end_of_subtop(topstr, next_subtop))
+ break;
+
+ topstr++;
+ }
+
+ if(idx == len) {
+ USR_INFO("S: Fatal, insufficient buffer for sub-str\n\r");
+ return -1; /* zero or insufficient buffer provided */
+ }
+
+ /* NUL terminated buffer: unless '\0' ended copy, make one */
+ if('\0' == subtop_buf[idx - 1])
+ idx--; /* A fix up */
+ else
+ subtop_buf[idx] = '\0';
+
+ return idx;
+}
+
+static
+struct topic_node *alloc_node_subtop(const char *topstr, char const **next_subtop)
+{
+ uint16_t len = 0;
+ struct topic_node *node = alloc_topic_node();
+ if(NULL == node)
+ return NULL;
+
+ len = subtop_read(topstr, node->subtop, MAX_SUBTOP_LEN, next_subtop);
+ if(len <= 0) {
+ free_node(node);
+ return NULL;
+ }
+
+ node->toplen = len;
+
+ return node;
+}
+
+/* Returns true if string 'subtop' is part of string 'topstr' otherwise false.
+ Additionally, on success, pointer to next subtopic in 'topstr' is provided.
+*/
+bool is_first_subtop(const char *subtop, const char *topstr, char const **next_subtop)
+{
+ while(*subtop == *topstr) {
+ if(is_end_of_subtop(topstr, next_subtop))
+ return true;
+
+ subtop++;
+ topstr++;
+ }
+
+ return false;
+}
+
+/* Find a topic node in neighbour list that matches first subtopic in 'topstr'.
+ Additionally, on success, pointer to next subtopic in 'topstr' is provided.
+*/
+struct topic_node *subtop_nhbr_node_find(const struct topic_node *root_nh,
+ const char *topstr,
+ char const **next_subtop)
+{
+ /* This routine does not make use of 'next_subtop' */
+ while(root_nh) {
+ if(true == is_first_subtop(root_nh->subtop, topstr, next_subtop))
+ break;
+
+ root_nh = root_nh->dn_nhbr;
+ }
+
+ return (struct topic_node*) root_nh;/* Bad: const from poiner removed */
+}
+
+/* Find a topic node in neighbour list that matches the given 'subtop' string */
+struct topic_node *nhbr_node_find(const struct topic_node *root_nh,
+ const char *subtop)
+{
+ const char *next_subtop = NULL;
+
+ return subtop_nhbr_node_find(root_nh, subtop, &next_subtop);
+}
+
+/* Find leaf node of branch-combo that matches complete 'topstr'.
+ Modus Operandi: For each sub topic in 'topstr', go across neighbour list,
+ then for matching neighbour node, make its child node as root of neighbour
+ list for another iteration for next sub topic.
+*/
+/* Complex */
+static struct topic_node *lowest_node_find(const char *topstr, bool *flag_nh,
+ char const **next_subtop)
+{
+ struct topic_node *root_nh = root_node, *node = NULL;
+
+ *next_subtop = topstr;
+ *flag_nh = false;
+
+ while(root_nh) {
+ node = subtop_nhbr_node_find(root_nh, topstr, next_subtop);
+ if(NULL == node) { /* Partial or no match */
+ *flag_nh = true;
+ node = root_nh;
+ break;
+ }
+
+ /* NULL '*next_subtop' indicates a complete match */
+ if(NULL == (*next_subtop))
+ break;
+
+ root_nh = node->dn_hier;
+ topstr = *next_subtop;
+ }
+
+ return node;
+}
+
+struct topic_node *leaf_node_find(const char *topstr)
+{
+ const char *next_subtop;
+ bool flag_nh;
+ struct topic_node *node = lowest_node_find(topstr, &flag_nh,
+ &next_subtop);
+
+ return (NULL == next_subtop)? node : NULL;
+}
+
+static void try_node_delete(struct topic_node *node); /* Forward declaration */
+
+/* Create 'child only' hierarchy of nodes to hold all sub topics in 'topstr'.
+ The routine returns a start node of hierarchy and also provides leaf node.
+*/
+static
+struct topic_node *hier_nodes_create(const char *topstr, struct topic_node **leaf)
+{
+ struct topic_node *base = NULL, *node = NULL, *prev = NULL;
+ const char *next_subtop = NULL;
+
+ do {
+ node = alloc_node_subtop(topstr, &next_subtop);
+
+ if(NULL == node) {
+ /* Allocation of a node failed, free up
+ the previous allocations, if any, in
+ the hierarchy that was being created */
+ if(prev)
+ try_node_delete(prev);
+
+ base = NULL;
+ break;
+ }
+
+ if(NULL == prev) {
+ // prev = node;
+ base = node; /* First node of hierarchy */
+ } else {
+ prev->dn_hier = node;
+ node->up_hier = prev;
+ // prev = node;
+ }
+
+ prev = node;
+ topstr = next_subtop;
+
+ } while(next_subtop);
+
+ *leaf = node;
+
+ DBG_INFO("S: Hierarchy of nodes created: Base: %s & Leaf: %s\n\r",
+ base? base->subtop : " ", (*leaf)? (*leaf)->subtop : " ");
+
+ return base;
+}
+
+static void install_nhbr_node(struct topic_node *base, struct topic_node *node)
+{
+ while(base->dn_nhbr) {
+ base = base->dn_nhbr;
+ }
+
+ base->dn_nhbr = node;
+ node->up_nhbr = base;
+
+ DBG_INFO("S: %s added as a neighbour to %s\n\r",
+ node->subtop, base->subtop);
+}
+
+static void set_up_hier_nodes(struct topic_node *up_hier,
+ struct topic_node *dn_hier)
+{
+ up_hier->dn_hier = dn_hier;
+ dn_hier->up_hier = up_hier;
+
+ DBG_INFO("%s added as DN HIER to %s \n\r",
+ dn_hier->subtop, up_hier->subtop);
+}
+
+static inline void install_topic_root_node(struct topic_node *node)
+{
+ root_node = node;
+}
+
+/* Create or update tree to create branch-combo to refer to 'topstr'.
+ Modus operandi:
+
+*/
+struct topic_node *topic_node_create(const char *topstr)
+{
+ struct topic_node *base, *node, *leaf;
+ const char *next_subtop = topstr;
+ bool flag_nh;
+
+ base = lowest_node_find(topstr, &flag_nh, &next_subtop);
+ if(NULL == next_subtop)
+ return base; /* Found exact match */
+
+ /* Control reaches here, if either no or partial branch-combo has been
+ found for 'topstr'. Now, let's create remaining branches for string
+ 'next_subtop' and assign them to appropriately to topic tree.
+ */
+ DBG_INFO("S: Creating Hierarchy for %s\n\r", next_subtop);
+
+ node = hier_nodes_create(next_subtop, &leaf);
+ if(node) {
+ if(NULL == base)
+ install_topic_root_node(node);
+ else if(flag_nh)
+ install_nhbr_node(base, node);
+ else
+ set_up_hier_nodes(base, node);
+
+ return leaf;
+ }
+
+ return NULL;
+}
+
+static bool can_delete_node(const struct topic_node *node)
+{
+ if(node->up_nhbr && node->up_hier) {
+ USR_INFO("S: fatal, node w/ up-nhbr & up-hier.\n\r");
+ return false;
+ }
+
+ if(node->dn_nhbr ||
+ node->dn_hier)
+ return false;
+
+ return true;
+}
+
+static struct topic_node *node_delete(struct topic_node *node)
+{
+ struct topic_node *next = NULL;
+
+ if(false == can_delete_node(node))
+ return NULL;
+
+ if(node->up_nhbr) {
+ node->up_nhbr->dn_nhbr = NULL;
+ next = node->up_nhbr;
+ }
+
+ if(node->up_hier) {
+ node->up_hier->dn_hier = NULL;
+ next = node->up_hier;
+ }
+
+ if((NULL == node->up_nhbr) &&
+ (NULL == node->up_hier))
+ root_node = NULL;
+
+ DBG_INFO("S: Deleted node %s\n\r", node->subtop);
+
+ free_node(node);
+
+ return next;
+}
+
+struct topbuf_desc {
+ char *buffer;
+ uint16_t maxlen;
+ uint16_t offset;
+};
+
+static bool topbuf_add(struct topbuf_desc *buf_desc, const struct topic_node *node)
+{
+ const char *next_subtop;
+ char *buf = buf_desc->buffer + buf_desc->offset;
+ uint16_t len = buf_desc->maxlen - buf_desc->offset;
+
+ int32_t rv = subtop_read(node->subtop, buf, len, &next_subtop);
+ if(rv < 0)
+ return false;
+
+ if(NULL != next_subtop) {
+ USR_INFO("S: topstr_add fatal, bad subtop.\n\r");
+ return false;
+ }
+
+ buf_desc->offset += rv;
+
+ return true;
+}
+
+static bool topbuf_cpy(struct topbuf_desc *buf_desc, const char *subtop)
+{
+ const char *next_subtop;
+ char *buf = buf_desc->buffer + buf_desc->offset;
+ uint16_t len = buf_desc->maxlen - buf_desc->offset;
+
+ int32_t rv = subtop_read(subtop, buf, len, &next_subtop);
+ if(rv < 0)
+ return false;
+
+ if(NULL != next_subtop) {
+ USR_INFO("S: topstr_copy fatal, bad subtop.\n\r");
+ return false;
+ }
+
+ buf_desc->offset += rv;
+
+ return true;
+}
+
+static bool has_a_wildcard(const struct topic_node *node)
+{
+ const char *str = node->subtop;
+ while('\0' != *str) {
+ if(('+' == *str) || ('#' == *str))
+ return true;
+
+ str++;
+ }
+
+ return false;
+}
+
+static bool is_node_SUB_subtop(const struct topic_node *node, const char *subtop)
+{
+ if(false == has_a_wildcard(node))
+ return ((0 == strcmp(node->subtop, subtop)) ||
+ (node->dn_hier && (0 == strcmp("+/", subtop))) ||
+ (!node->dn_hier && (0 == strcmp("+", subtop))))?
+ true : false;
+
+ return false;
+}
+
+/* Search node tree, created by PUB retention, for the branch combo, whose
+ absolute sub-topic sequence 'matches', in entirety, the topic, which is
+ being subscribed. The 'match' criteria is met either through the
+ wildcard sub-topics or exact compare.
+
+ The hierarchical search starts at the specified 'base' node and ends at
+ the leaf node. If the sequence of 'base-to-leaf' sub-topics 'match' the
+ 'topSUB', then the leaf node is returned, otherwise a NULL is returned.
+
+ As part of hierarchical search, neighbouring nodes, if any, are logged
+ for subsequent iteration of the routine.
+*/
+static struct topic_node *pub_hier_search(const char *topSUB,
+ const struct topic_node *base,
+ struct topbuf_desc *mk_pubtop)
+{
+ const struct topic_node *node = base, *prev = NULL;
+ const char *next_subtop = topSUB;
+ char subtop[MAX_SUBTOP_LEN];
+
+ while(next_subtop && node) {
+ if(subtop_read(topSUB, subtop, MAX_SUBTOP_LEN,
+ &next_subtop) <= 0)
+ break;
+
+ if(node->dn_nhbr)
+ stack_add(node->dn_nhbr, (uint32_t)topSUB, mk_pubtop->offset);
+
+ if(false == is_node_SUB_subtop(node, subtop))
+ break; /* Node doesn't form part of published topic */
+
+ if(false == topbuf_add(mk_pubtop, node))
+ break;
+
+ prev = node;
+ node = node->dn_hier;
+
+ topSUB = next_subtop;
+ }
+
+ if(NULL != next_subtop)
+ node = NULL;
+ else
+ node = prev;
+
+ return (struct topic_node *)node;
+}
+
+static bool is_node_PUB_subtop(const struct topic_node *node,
+ const char *subtop, bool endtop)
+{
+ /* Assumes that subtop hasn't got any wildcard characater */
+ return ((0 == strcmp(subtop, node->subtop)) ||
+ (!endtop && (0 == strcmp("+/", node->subtop))) ||
+ (endtop && (0 == strcmp("+", node->subtop))))?
+ true : false;
+}
+
+/* Search node tree, created by subscriptions, for the branch combo, whose
+ sub-topic sequence 'matches', in entirety, the absolute topic, to which
+ data has been published. The 'match' criteria is met either through the
+ wildcard sub-topics or exact compare.
+
+ The hierarchical search starts at the specified 'base' node and ends at
+ the leaf node. If the sequence of 'base-to-leaf' sub-topics 'match' the
+ 'topPUB', then the leaf node is returned, otherwise a NULL is returned.
+
+ As part of hierarchical search, neighbouring nodes, if any, are logged
+ for subsequent iteration of the routine.
+*/
+static struct topic_node *SUB_leaf_search(const char *topPUB,
+ const struct topic_node *base)
+{
+ const struct topic_node *node = base, *prev = NULL;
+ const char *next_subtop = topPUB;
+ char subtop[MAX_SUBTOP_LEN];
+
+ while(next_subtop && node) {
+ if(subtop_read(topPUB, subtop, MAX_SUBTOP_LEN,
+ &next_subtop) <= 0)
+ break;
+
+ if(node->dn_nhbr)
+ stack_add(node->dn_nhbr, (uint32_t)topPUB, 0);
+
+ if(0 == strcmp("#", node->subtop))
+ goto SUB_leaf_search_exit1;
+
+ if(false == is_node_PUB_subtop(node, subtop, !next_subtop))
+ break; /* Node doesn't form part of published topic */
+
+ prev = node;
+ node = node->dn_hier;
+
+ topPUB = next_subtop;
+ }
+
+ if(NULL != next_subtop)
+ node = NULL;
+ else
+ node = prev;
+
+SUB_leaf_search_exit1:
+ return (struct topic_node*) node; // Bad
+}
+
+
+/*-------------------------------------------------------------------------
+ * MQTT Routines
+ *-------------------------------------------------------------------------
+ */
+
+#define WBUF_LEN MQP_SERVER_RX_LEN /* Assignment to ease implementation */
+static char work_buf[WBUF_LEN];
+
+static void try_node_delete(struct topic_node *node)
+{
+ while(node) {
+
+ if(is_node_retain(node) ||
+ is_node_willed(node) ||
+ enrolls_plugin(node) ||
+ node->cl_map[0] ||
+ node->cl_map[1] ||
+ node->cl_map[2])
+ break;
+
+ node = node_delete(node);
+ }
+}
+
+/* Move this to a common file */
+static void pub_msg_send(const struct utf8_string *topic, const uint8_t *data_buf,
+ uint32_t data_len, uint8_t fh_flags, uint32_t cl_map)
+{
+ enum mqtt_qos qos = ENUM_QOS(fh_flags);
+ struct mqtt_packet *mqp = NULL;
+
+ mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len);
+ if(NULL == mqp)
+ return;
+
+ if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) ||
+ (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) {
+ mqp_free(mqp);
+ return;
+ }
+
+ mqp_prep_fh(mqp, fh_flags);
+
+ if(cl_map)
+ cl_pub_dispatch(cl_map, mqp);
+
+ return;
+}
+
+static struct topic_node *SUB_node_create(const char *topSUB, uint8_t qid, void *usr_cl)
+{
+ struct topic_node *leaf = topic_node_create(topSUB);
+ if(leaf) {
+ uint8_t j = 0;
+ uint32_t map = cl_bmap_get(usr_cl);
+
+ for(j = 0; j < 3; j++)
+ /* Client: clear QOS of existing sub, if any */
+ leaf->cl_map[j] &= ~map;
+
+ leaf->cl_map[qid] |= map;
+
+ cl_sub_count_add(usr_cl);
+ }
+
+ return leaf;
+}
+
+static uint8_t proc_pub_leaf(struct topic_node *leaf, const struct utf8_string *topic,
+ enum mqtt_qos qos, void *usr_cl)
+{
+ uint8_t qid = QOS_VALUE(qos);
+
+ if(is_node_retain(leaf)) {
+ /* If it is an earlier created topic w/ retained
+ data, then pick lower of the two QOS(s) */
+ qid = MIN(node_qid_get(leaf), qid);
+
+ /* Publish the retained data to this client */
+ pub_msg_send(topic, leaf->my_data, leaf->my_dlen,
+ MAKE_FH_FLAGS(false, qid, true),
+ cl_bmap_get(usr_cl));
+ }
+
+ return qid;
+}
+
+/* Multi-level wild-card subscription - search of all of the tree i.e.
+ "no topic" ('no_top') in particular and publish it to client, if
+ the hierarchy has no wild-card node. */
+static
+uint8_t proc_pub_hier_no_top(struct topic_node *base, struct topbuf_desc *mk_pubtop,
+ enum mqtt_qos qos, void *usr_cl)
+{
+ struct topic_node *node = base, *leaf = NULL;
+ uint8_t ack = QOS_VALUE(qos);
+
+ /* 1. Find the leaf node of a non wildcard branch-combo */
+ while(node) {
+ if(node->dn_nhbr)
+ stack_add(node->dn_nhbr, 0, mk_pubtop->offset);
+
+ if(has_a_wildcard(node))
+ break;
+
+ if(false == topbuf_add(mk_pubtop, node))
+ break;
+
+ leaf = node;
+ node = node->dn_hier;
+ }
+
+ /* A non NULL value of 'node' would indicate a hierarchy with a
+ wildcard (sub-)topic (the 'node') - not suitable for PUB. */
+
+ if(NULL == node) {
+ /* 2. Send retained data, if any, to SUB Client */
+ struct utf8_string topic = {mk_pubtop->buffer,
+ mk_pubtop->offset};
+
+ /* In this version, at this juncture, the 'leaf'
+ will not be NULL. Nevertheless a check (for
+ the sake of static analytical tools).........*/
+ if(leaf)
+ ack = proc_pub_leaf(leaf, &topic, qos, usr_cl);
+ }
+
+ return ack;
+}
+
+/* Multi-level wild-card subscription - search of all of the tree i.e.
+ "no topic" ('no_top') in particular and publish it to client, if
+ a hierarchy in the tree has no wild-card node. */
+static
+uint8_t proc_pub_tree_no_top(struct topic_node *base, struct topbuf_desc *mk_pubtop,
+ enum mqtt_qos qos, void *usr_cl)
+{
+ uint32_t stack_ref = stack_idx;
+ uint8_t min = QOS_VALUE(qos);
+
+ if(base != NULL)
+ stack_add(base, 0, mk_pubtop->offset);
+
+ while(stack_ref < stack_idx) {
+ struct _node_stack *stack = stack_pop();
+ uint8_t ack;
+
+ mk_pubtop->offset = (uint16_t) stack->val2;
+
+ ack = proc_pub_hier_no_top(stack->node, mk_pubtop, qos, usr_cl);
+ if(ack < min)
+ min = ack;
+ }
+
+ return min;
+}
+
+static uint8_t proc_pub_hier_SUBtop(const char *topSUB, const struct topic_node *base,
+ struct topbuf_desc *mk_pubtop, enum mqtt_qos qos,
+ void *usr_cl)
+{
+ struct topic_node *leaf = pub_hier_search(topSUB, base, mk_pubtop);
+ uint8_t min = QOS_VALUE(qos);
+
+ if(leaf) {
+ struct utf8_string topic = {mk_pubtop->buffer,
+ mk_pubtop->offset};
+
+ min = proc_pub_leaf(leaf, &topic, qos, usr_cl);
+ }
+
+ return min;
+}
+
+/* used by sl or no wc */
+static uint8_t proc_pub_tree_SUBtop(const char *topSUB, struct topic_node *base,
+ struct topbuf_desc *mk_pubtop,
+ enum mqtt_qos qos, void *usr_cl)
+{
+ uint32_t stack_ref = stack_idx;
+ uint8_t min = QOS_VALUE(qos);
+
+ if(NULL != base)
+ stack_add(base, (uint32_t)topSUB, mk_pubtop->offset);
+
+ while(stack_ref < stack_idx) {
+ struct _node_stack *stack = stack_pop();
+ uint8_t ack;
+
+ mk_pubtop->offset = stack->val2;
+ ack = proc_pub_hier_SUBtop((char*)stack->val1, stack->node,
+ mk_pubtop, qos, usr_cl);
+
+ if(ack < min)
+ min = ack;
+ }
+
+ return min;
+}
+
+static
+uint8_t proc_sub_ml_wc_hier(const char *grandpa_topSUB, char *parent_subtop,
+ struct topic_node *base, struct topbuf_desc *mk_pubtop,
+ enum mqtt_qos qos, void *usr_cl)
+{
+ uint8_t min = QOS_VALUE(qos), ack = QFL_VALUE;
+ char *subtop = NULL;
+
+ /* 1. Search hier node for 'grandpa' and if found, get to parent level */
+ if('\0' != grandpa_topSUB[0]) {
+ struct topic_node *leaf = pub_hier_search(grandpa_topSUB, base,
+ mk_pubtop);
+ if(NULL == leaf)
+ return min;
+
+ base = leaf->dn_hier; /* nhbr root at parent level */
+ }
+
+ /* 2. If present, process parent as a leaf and get its down hierarchy */
+ subtop = parent_subtop;
+ if(('\0' != subtop[0]) && ('+' != subtop[0]) && ('/' != subtop[0])) {
+ uint16_t offset = mk_pubtop->offset; /* Refer to grandpa's pubtop */
+ uint16_t sublen = 0;
+
+ while(subtop[sublen]){sublen++;}
+
+ ack = proc_pub_tree_SUBtop(subtop, base, mk_pubtop, qos, usr_cl);
+ mk_pubtop->offset = offset; /* Restores grandpa's pubtop */
+
+ subtop[sublen] = '/'; /* Make parent's hier subtop */
+
+ base = nhbr_node_find(base, subtop);
+ if(base)
+ base = topbuf_cpy(mk_pubtop, subtop)?
+ base->dn_hier : NULL;
+ subtop[sublen] = '\0'; /* Get back, original subtop */
+ }
+
+ min = MIN(min, ack);
+ /* 3. Process '#' WC by walking thru entire sub-tree of parent 'base' */
+ if(NULL != base)
+ ack = proc_pub_tree_no_top(base, mk_pubtop, qos, usr_cl);
+
+ return MIN(min, ack);
+}
+
+static uint8_t proc_sub_ml_wc_tree(char *grandpa_topSUB, char *parent_subtop,
+ struct topic_node *base,
+ enum mqtt_qos qos, void *usr_cl)
+{
+ struct topbuf_desc mk_pubtop = {work_buf, WBUF_LEN, 0 /* offset */};
+ uint32_t stack_ref = stack_idx;
+ uint8_t min = QOS_VALUE(qos);
+
+ if(NULL != base)
+ stack_add(base, (uint32_t)grandpa_topSUB, mk_pubtop.offset);
+
+ while(stack_ref < stack_idx) {
+ struct _node_stack *stack = stack_pop();
+ uint8_t ack;
+
+ mk_pubtop.offset = stack->val2;
+ ack = proc_sub_ml_wc_hier((char*)stack->val1, parent_subtop,
+ stack->node, &mk_pubtop, qos, usr_cl);
+ if(ack < min)
+ min = ack;
+ }
+
+ return min;
+}
+
+static uint8_t ml_wc_nodes_create(char *parent_topSUB, uint16_t toplen, uint8_t qid, void *usr_cl)
+{
+ struct topic_node *parent_leaf = NULL;
+
+ if('\0' != parent_topSUB[0]) {
+ parent_leaf = SUB_node_create(parent_topSUB, qid, usr_cl);
+ if(NULL == parent_leaf)
+ return QFL_VALUE;
+ }
+
+ /* Get the topic SUB to it's original state */
+ if(toplen > 1) parent_topSUB[toplen - 2] = '/';
+ parent_topSUB[toplen - 1] = '#';
+
+ if(NULL == SUB_node_create(parent_topSUB, qid, usr_cl)) {
+ /* Failed to create WC topic, so delete parent as well.
+ In this revision, 'parent_leaf' will not be a 'NULL'
+ at this juncture, nevertheless a check (for tools) */
+ if(parent_leaf)
+ node_delete(parent_leaf);
+
+ return QFL_VALUE;
+ }
+
+ return qid;
+}
+
+/* Process Multi-level Wildcard Topic SUBSCRIBE */
+static uint8_t proc_sub_ml_wildcard(char *topSUB, uint16_t toplen, enum mqtt_qos qos,
+ void *usr_cl)
+{
+ uint16_t len = 0, limit = MIN(toplen, MAX_SUBTOP_LEN);
+ char subtop[MAX_SUBTOP_LEN], *ptr;
+ uint8_t min = QOS_VALUE(qos);
+
+ /* 'topSUB': Need to create grandpa topic and parent-subtopic */
+ topSUB[toplen - 1] = '\0'; /* Remove '#' */
+ if(toplen > 1) /* Remove '/' */
+ topSUB[toplen - 2] = '\0';
+
+ do { /* Do processing to get parent sub-topic into buffer */
+ if('/' == topSUB[toplen - len - 1])
+ break; /* found '/' */
+
+ len++; /* Copy parent characters */
+ subtop[MAX_SUBTOP_LEN - len] = topSUB[toplen - len];
+ } while(len < limit);
+
+ if((toplen > len) && ('/' != topSUB[toplen - len - 1]))
+ return QFL_VALUE; /* Bad Length */
+
+ topSUB[toplen - len] = '\0'; /* End of grand-pa's topic name */
+ ptr = subtop + MAX_SUBTOP_LEN - len; /* Parent's leaf subtop */
+ min = proc_sub_ml_wc_tree(topSUB, ptr, root_node, qos, usr_cl);
+
+ /* Make branch-combo to complete processing of parent' topic */
+ strcpy(topSUB + toplen - len, ptr); // topSUB[toplen - len] = *ptr;
+
+ /* Create nodes for multi-level wildcard topic & it's parent */
+ min = ml_wc_nodes_create(topSUB, toplen, min, usr_cl);
+
+ return min;
+}
+
+/* Process Single-level Wildcard or No Wild Card Topic SUBSCRIBE */
+static
+uint8_t proc_sub_sl_or_no_wc(const char *topSUB, enum mqtt_qos qos, void *usr_cl)
+{
+ struct topbuf_desc mk_pubtop = {work_buf, WBUF_LEN, 0 /* offset */};
+ uint8_t min = QOS_VALUE(qos);
+
+ /* For single level wildcard or absolute topic, find PUB nodes */
+ min = proc_pub_tree_SUBtop(topSUB, root_node, &mk_pubtop, qos, usr_cl);
+
+ if(NULL == SUB_node_create(topSUB, min, usr_cl))
+ min = QFL_VALUE;
+
+ return min;
+}
+
+static uint16_t proc_forward_slash(char *buf, uint16_t len)
+{
+ uint16_t i, j;
+ for(i = 1, j = 1; i < len; i++) {
+ char curr = buf[i];
+ if(('/' == curr) && (buf[i - 1] == curr))
+ continue; /* Drop consecutive '/' */
+
+ buf[j++] = curr;
+ }
+
+ if((1 != j) && ('/' == buf[j - 1]))
+ j--; /* Topic can not end with a '/' */
+
+ buf[j] = '\0';
+
+ return j;
+}
+
+static inline bool is_valid_char_order(char prev, char curr)
+{
+ return ((('/' != prev) && ('+' == curr)) ||
+ (('+' == prev) && ('/' != curr)) ||
+ (('/' != prev) && ('#' == curr)) ||
+ (('#' == prev)))? false : true;
+}
+
+
+static bool is_valid_SUB_top(const char *buf, uint16_t len)
+{
+ char prev, curr;
+ uint16_t i = 0;
+
+ if((0 == len) || ('\0' == *buf))
+ return false;
+
+ curr = buf[0];
+ for(i = 1; (i < len) && ('\0' != curr); i++) {
+ prev = curr;
+ curr = buf[i];
+
+ if(false == is_valid_char_order(prev, curr))
+ break;
+ }
+
+ return (i == len)? true : false;
+}
+
+static bool proc_sub_msg_rx(void *usr_cl, const struct utf8_strqos *qos_topics,
+ uint32_t n_topics, uint16_t msg_id, uint8_t *ack)
+{
+ int32_t i = 0;
+ for(i = 0; i < n_topics; i++) {
+ const struct utf8_strqos *qos_top = qos_topics + i;
+ enum mqtt_qos qos = qos_top->qosreq;
+ char *buf = (char*)qos_top->buffer;
+ uint16_t len = qos_top->length;
+
+ /* Remove zero-topics and trailing '/' from SUB top */
+ len = proc_forward_slash(buf, len);
+ ack[i] = QFL_VALUE;
+ if(false == is_valid_SUB_top(buf, len))
+ continue;
+
+ buf[len] = '\0'; /* Dirty trick, cheeky one */
+
+ ack[i] = ('#' == buf[len - 1])?
+ proc_sub_ml_wildcard(buf, len, qos, usr_cl) :
+ proc_sub_sl_or_no_wc(buf, qos, usr_cl);
+
+ DBG_INFO("SUB Topic%-2d %s is ACK'ed w/ 0x%02x\n\r",
+ i + 1, buf, ack[i]);
+ }
+
+ return true; /* Send SUB-ACK and do not close network */
+}
+
+static
+bool proc_sub_msg_rx_locked(void *usr_cl, const struct utf8_strqos *qos_topics,
+ uint32_t n_topics, uint16_t msg_id, uint8_t *ack)
+{
+ return proc_sub_msg_rx(usr_cl, qos_topics, n_topics, msg_id, ack);
+}
+
+static void leaf_un_sub(struct topic_node *leaf, void *usr_cl)
+{
+ uint8_t j = 0;
+ uint32_t map = cl_bmap_get(usr_cl);
+
+ for(j = 0; j < 3; j++) {
+ /* Client: clear QOS of existing sub, if any */
+ if(0 == (leaf->cl_map[j] & map))
+ continue;
+
+ leaf->cl_map[j] &= ~map;
+ cl_sub_count_del(usr_cl);
+
+ try_node_delete(leaf);
+
+ break;
+ }
+}
+
+static bool proc_un_sub_msg(void *usr_cl, const struct utf8_string *topics,
+ uint32_t n_topics, uint16_t msg_id)
+{
+ uint32_t i = 0;
+
+ for(i = 0; i < n_topics; i++) {
+ const struct utf8_string *topic = topics + i;
+ struct topic_node *leaf = NULL;
+ uint16_t len = topic->length;
+
+ /* The maximum length of 'work_buf' is same as that of RX buffer
+ in the PKT-LIB. Therefore, the WBUF_LEN is not being checked
+ against the length of the topic (a constituent of RX buffer).
+ */
+ strncpy(work_buf, topic->buffer, topic->length);
+ work_buf[len] = '\0';
+
+ if('#' == work_buf[len - 1]) { /* Multi-level Wildcard */
+ work_buf[len - 1] = '\0';
+ if(len > 1)
+ work_buf[len - 2] = '\0';
+
+ leaf = leaf_node_find(work_buf);
+ if(leaf)
+ leaf_un_sub(leaf, usr_cl);
+
+ if(len > 1)
+ work_buf[len - 2] = '/';
+ work_buf[len - 1] = '#';
+ }
+
+ leaf = leaf_node_find(work_buf);
+ if(leaf)
+ leaf_un_sub(leaf, usr_cl);
+ }
+
+ return true; /* Do not close network */
+}
+
+static
+bool proc_un_sub_msg_locked(void *usr_cl, const struct utf8_string *topics,
+ uint32_t n_topics, uint16_t msg_id)
+{
+ return proc_un_sub_msg(usr_cl, topics, n_topics, msg_id);
+}
+
+static void
+leaf_msg_send(const struct topic_node *leaf, const struct utf8_string *topic,
+ const uint8_t *data_buf, uint32_t data_len, bool dup, enum mqtt_qos qos,
+ bool retain)
+{
+ uint8_t qid = 0, fh_fgs = 0;
+
+ for(qid = 0; qid < 3; qid++) {
+ uint8_t map = leaf->cl_map[qid];
+ fh_fgs = MAKE_FH_FLAGS(dup, MIN(qid, QOS_VALUE(qos)), retain);
+
+ if(map)
+ pub_msg_send(topic, data_buf, data_len, fh_fgs, map);
+ }
+
+ if(enrolls_plugin(leaf))
+ plugin_publish(leaf->pg_map, topic, data_buf, data_len,
+ dup, qos, retain);
+
+ return;
+}
+
+static void node_data_set(struct topic_node *node, uint8_t *data,
+ uint32_t dlen, uint8_t qid, bool retain)
+{
+ node->my_data = data;
+ node->my_dlen = dlen;
+
+ node_qid_set(node, qid);
+ node_retain_set(node, retain);
+
+ return;
+}
+
+static bool node_data_update(struct topic_node *node, bool drop_qid0,
+ const uint8_t *data_buf, uint32_t data_len,
+ uint8_t qid, bool retain)
+{
+#define NODE_DATA_RESET_PARAMS NULL, 0, 0, false
+
+ /* Assumes that caller has provided either reset or valid params */
+
+ uint8_t *data = NULL;
+
+ if(node->my_dlen)
+ my_free(node->my_data);
+
+ /* Watch out for assignment in 'if' statement - avoid such smarts */
+ if((drop_qid0 && (0 == qid)) || (data_buf && !(data = (uint8_t*)my_malloc(data_len)))) {
+ node_data_set(node, NODE_DATA_RESET_PARAMS);
+ } else {
+
+ if(data)
+ buf_wr_nbytes(data, data_buf, data_len);
+
+ node_data_set(node, data, data_len, qid, retain);
+ }
+
+ return ((!!data) ^ (!!data_len))? false : true;
+}
+
+static inline bool is_wildcard_char(char c)
+{
+ return (('+' == c) || ('#' == c))? true : false;
+}
+
+static int32_t pub_topic_read(const struct utf8_string *topic, char *buf, uint32_t len)
+{
+ uint32_t i = 0;
+ uint32_t toplen = topic->length;
+
+ if(len < (toplen + 1))
+ return -1;
+
+ for(i = 0; i < toplen; i++) {
+ char c = topic->buffer[i];
+ if(is_wildcard_char(c))
+ return -1; /* Invalid: wildcard in PUB topic */
+
+ if('\0' == c)
+ return -1; /* Invalid: NUL char in PUB topic */
+
+ buf[i] = c;
+ }
+
+ buf[i] = '\0';
+
+ return i;
+}
+
+static
+void proc_sub_tree_topPUB(const char *topPUB, const struct utf8_string *topic,
+ const uint8_t *data_buf, uint32_t data_len, enum mqtt_qos qos,
+ bool retain)
+{
+ struct topic_node *leaf = NULL;
+ uint32_t stack_ref = stack_idx;
+
+ if(NULL != root_node)
+ stack_add(root_node, (uint32_t)topPUB, 0 /* Not used */);
+
+ while(stack_ref < stack_idx) {
+ struct _node_stack *stack = stack_pop();
+
+ /* Find leaf node of SUB that matches the PUB topic */
+ leaf = SUB_leaf_search((char*)stack->val1, stack->node);
+ if(leaf)
+ leaf_msg_send(leaf, topic, data_buf, data_len,
+ false, qos, retain);
+ }
+}
+
+static bool _proc_pub_msg_rx(void *usr_cl, const struct utf8_string *topic,
+ const uint8_t *data_buf, uint32_t data_len, uint8_t msg_id,
+ enum mqtt_qos qos, bool retain)
+{
+ int32_t err = -1;
+
+ /* Prior to msg processing, chk for topic or buffer errors */
+ if((pub_topic_read(topic, work_buf, WBUF_LEN) <= 0) ||
+ (proc_forward_slash(work_buf, topic->length) <= 0))
+ goto _proc_pub_msg_rx_exit;
+
+ /* If a valid MSG ID is specified for a QOS2 pkt, track it */
+ err = -2;
+ if((msg_id) &&
+ (MQTT_QOS2 == qos) &&
+ (false == cl_mgmt_qos2_pub_rx_update(usr_cl, msg_id)))
+ goto _proc_pub_msg_rx_exit;
+
+ /* Forward data to all subscribers of PUB topic in server */
+ proc_sub_tree_topPUB(work_buf, topic, data_buf,
+ data_len, qos, false);
+
+ err = 0;
+ if(retain) {
+ struct topic_node *leaf = topic_node_create(work_buf);
+ if((NULL == leaf) ||
+ (false == node_data_update(leaf, true, data_buf, data_len,
+ QOS_VALUE(qos), retain)))
+ err = -3; /* Resources no more available */
+
+ if(leaf)
+ try_node_delete(leaf);
+ }
+
+ _proc_pub_msg_rx_exit:
+ DBG_INFO("Processing of PUB message from %s (0x%08x) has %s (%d)\n\r",
+ usr_cl? "client" : "plugin", usr_cl? cl_bmap_get(usr_cl) : 0,
+ err? "failed" : "succeeded", err);
+
+ return (err < 0)? false : true;
+}
+
+static
+bool proc_pub_msg_rx_locked(void *usr_cl, const struct utf8_string *topic,
+ const uint8_t *data_buf, uint32_t data_len, uint16_t msg_id,
+ bool dup, enum mqtt_qos qos, bool retain)
+{
+ return _proc_pub_msg_rx(usr_cl, topic, data_buf, data_len,
+ msg_id, qos, retain);
+}
+
+static int32_t utf8_str_rd(const struct utf8_string *utf8, char *buf, uint32_t len)
+{
+ if((NULL == utf8) || (utf8->length > (len - 1)))
+ return -1;
+
+ buf_wr_nbytes((uint8_t*)buf, (uint8_t*)utf8->buffer, utf8->length);
+ buf[utf8->length] = '\0';
+
+ return utf8->length;
+}
+
+#define CONN_FLAGS_WQID_GET(conn_flags) \
+ ((conn_flags >> 3) & QID_VMASK) /* WILL QOS VAL */
+
+static uint16_t proc_connect_rx(void *ctx_cl, uint8_t conn_flags,
+ struct utf8_string * const *utf8_vec, void **usr_cl)
+{
+ struct topic_node *leaf = NULL;
+ struct utf8_string *utf8 = NULL;
+ void *app_cl = NULL;
+ uint16_t utf8_len = 0;
+ uint16_t rv = plugin_connect(MQC_UTF8_CLIENTID(utf8_vec),
+ MQC_UTF8_USERNAME(utf8_vec),
+ MQC_UTF8_PASSWORD(utf8_vec),
+ &app_cl);
+ if(rv)
+ goto proc_connect_rx_exit1; /* Admin did not permit connection */
+
+ rv = CONNACK_RC_SVR_UNAVBL; /* Server (resource) unavailable */
+
+ utf8 = MQC_UTF8_WILL_TOP(utf8_vec);
+ if(utf8 && utf8->length) {
+ utf8_str_rd(utf8, work_buf, WBUF_LEN);
+
+ leaf = topic_node_create(work_buf);
+ if(NULL == leaf)
+ goto proc_connect_rx_exit2;
+
+ if(false == node_data_update(leaf, false,
+ (uint8_t*)MQC_WILL_MSG_BUF(utf8_vec),
+ MQC_WILL_MSG_LEN(utf8_vec),
+ CONN_FLAGS_WQID_GET(conn_flags),
+ conn_flags & WILL_RETAIN_VAL))
+ goto proc_connect_rx_exit3;
+ }
+
+ utf8 = MQC_UTF8_CLIENTID(utf8_vec);
+ if(utf8)
+ utf8_len = utf8_str_rd(utf8, work_buf, WBUF_LEN);
+
+ rv = cl_connect_rx(ctx_cl, (conn_flags & CLEAN_START_VAL)? true : false,
+ utf8_len? work_buf : NULL, app_cl, leaf, usr_cl);
+ if(CONNACK_RC_REQ_ACCEPT == (rv & 0xFF)) {
+ if(leaf)
+ leaf->will_cl = *usr_cl;
+
+ return rv; /* Connection successful */
+ }
+
+ if(leaf)
+ node_data_update(leaf, true, NODE_DATA_RESET_PARAMS);
+
+ proc_connect_rx_exit3: try_node_delete(leaf);
+ proc_connect_rx_exit2: plugin_disconn(app_cl, true);
+ proc_connect_rx_exit1:
+ return rv;
+}
+
+static
+uint16_t proc_connect_rx_locked(void *ctx_cl, uint8_t conn_flags,
+ struct utf8_string * const *utf8_vec, void **usr_cl)
+{
+ return proc_connect_rx(ctx_cl, conn_flags, utf8_vec, usr_cl);
+}
+
+static void session_hier_delete(struct topic_node *node, void *usr_cl)
+{
+ struct topic_node *prev = NULL;
+ uint32_t cl_map = cl_bmap_get(usr_cl);
+
+ while(node) {
+ int32_t i = 0;
+ for(i = 0; i < 3; i++) {
+ if(node->cl_map[i] & cl_map) {
+ node->cl_map[i] &= ~cl_map;
+ cl_sub_count_del(usr_cl);
+ /* Client/Topic/QID 1-to-1 map */
+ break;
+ }
+ }
+
+ if(node->dn_nhbr)
+ stack_add(node->dn_nhbr, 0, 0);
+
+ prev = node;
+ node = node->dn_hier;
+ }
+
+ if(prev)
+ try_node_delete(prev);
+}
+
+void session_tree_delete(void *usr_cl)
+{
+ uint32_t stack_ref = stack_idx;
+
+ if(NULL != root_node)
+ stack_add(root_node, 0, 0);
+
+ while(stack_ref < stack_idx) {
+ struct _node_stack *stack = stack_pop();
+ session_hier_delete(stack->node, usr_cl);
+ }
+}
+
+static void proc_client_will(struct topic_node *leaf)
+{
+ uint32_t wbuf_len = WBUF_LEN - 1; /* Make space for '\0' in wbuf */
+ uint32_t offset = wbuf_len;
+ struct utf8_string topic;
+ struct topic_node *node;
+
+ work_buf[offset] = '\0'; /* Ensures wbuf is NUL terminated */
+ node = leaf;
+
+ /* Prepare a topic string by walking back from leaf to root */
+ do {
+ if(offset < node->toplen)
+ return;
+
+ offset -= node->toplen;
+ strncpy(work_buf + offset, node->subtop, node->toplen);
+
+ while(node->up_nhbr)
+ node = node->up_nhbr;
+
+ node = node->up_hier;
+
+ } while(node);
+
+ topic.buffer = work_buf + offset;
+ topic.length = wbuf_len - offset;
+
+#define MK_QOS_ENUM(qid) ((enum mqtt_qos)(qid & QID_VMASK))
+
+ proc_sub_tree_topPUB((char*)topic.buffer,
+ &topic, leaf->my_data, leaf->my_dlen,
+ MK_QOS_ENUM(node_qid_get(leaf)),
+ is_node_retain(leaf));
+
+}
+
+static void on_cl_net_close(void *usr_cl, bool due2err)
+{
+ struct topic_node *leaf = NULL;
+ void *app_cl = NULL;
+
+ /* See if client has a WILL that it intends to broadcast */
+ leaf = (struct topic_node*) cl_will_hndl_get(usr_cl);
+ if(NULL != leaf) {
+ if(usr_cl != leaf->will_cl)
+ return; /* Mismatch: should never happen */
+
+ if(due2err)
+ proc_client_will(leaf); /* pls broadcast */
+
+ /* Network is closing, so cleanup WILL msg store */
+ node_data_update(leaf, true, NODE_DATA_RESET_PARAMS);
+ leaf->will_cl = NULL;
+ try_node_delete(leaf);
+ }
+
+ /* If not needed for future, delete session info */
+ if(cl_can_session_delete(usr_cl))
+ session_tree_delete(usr_cl);
+
+ /* Inform app that client has been disconnected */
+ app_cl = cl_app_hndl_get(usr_cl);
+ plugin_disconn(app_cl, due2err);
+
+ cl_on_net_close(usr_cl);
+}
+
+static
+void on_cl_net_close_locked(void *usr_cl, bool due2err)
+{
+ on_cl_net_close(usr_cl, due2err);
+ return;
+}
+
+static void on_connack_send(void *usr_cl, bool clean_session)
+{
+ /* If asserted, then need to start w/ clean state */
+ if(clean_session)
+ session_tree_delete(usr_cl);
+
+ cl_on_connack_send(usr_cl, clean_session);
+
+ return;
+}
+
+static
+void on_connack_send_locked(void *usr_cl, bool clean_session)
+{
+ on_connack_send(usr_cl, clean_session);
+ return;
+}
+
+static
+bool proc_notify_ack_locked(void *usr_cl, uint8_t msg_type, uint16_t msg_id)
+{
+ return cl_notify_ack(usr_cl, msg_type, msg_id);
+}
+
+static int32_t proc_topic_enroll(uint8_t pg_id, const struct utf8_string *topic,
+ enum mqtt_qos qos)
+{
+ struct topic_node *leaf = NULL;
+ uint16_t len = 0;
+
+ if((NULL == topic) || (NULL == topic->buffer) || (0 == topic->length))
+ return -1;
+
+ if(WBUF_LEN < (topic->length + 1))
+ return -2;
+
+ len = topic->length;
+ strncpy(work_buf, topic->buffer, len);
+ work_buf[len] = '\0';
+
+ leaf = topic_node_create(work_buf);
+ if(NULL == leaf)
+ return -3;
+
+ PG_MAP_VAL_SETUP(leaf->pg_map, QOS_VALUE(qos), pg_id);
+
+ return 0;
+}
+
+static
+int32_t proc_topic_enroll_locked(uint8_t pg_id, const struct utf8_string *topic,
+ enum mqtt_qos qos)
+{
+ int32_t rv = 0;
+
+ MUTEX_LOCKIN();
+ rv = proc_topic_enroll(pg_id, topic, qos);
+ MUTEX_UNLOCK();
+
+ return rv;
+}
+
+static int32_t proc_topic_cancel(uint8_t pg_id, const struct utf8_string *topic)
+{
+ struct topic_node *leaf = NULL;
+ uint16_t len = 0;
+
+ if(NULL == topic)
+ return -1;
+
+ if(WBUF_LEN < (topic->length + 1))
+ return -2;
+
+ len = topic->length;
+ strncpy(work_buf, topic->buffer, len);
+ work_buf[len] = '\0';
+
+ leaf = leaf_node_find(work_buf);
+ if(NULL == leaf)
+ return -2;
+
+ PG_MAP_VAL_RESET(leaf->pg_map, pg_id);
+
+ try_node_delete(leaf);
+
+ return 0;
+}
+
+static
+int32_t proc_topic_cancel_locked(uint8_t pg_id, const struct utf8_string *topic)
+{
+ int32_t rv = 0;
+
+ MUTEX_LOCKIN();
+ rv = proc_topic_cancel(pg_id, topic);
+ MUTEX_UNLOCK();
+
+ return rv;
+}
+
+static
+int32_t proc_app_pub_send_locked(const struct utf8_string *topic, const uint8_t *data_buf,
+ uint32_t data_len, enum mqtt_qos qos, bool retain)
+{
+ bool rv;
+
+ MUTEX_LOCKIN();
+ /* Received from application, process topic for distribution */
+ rv = _proc_pub_msg_rx(NULL, topic, data_buf, data_len,
+ 0x00, qos, retain);
+ MUTEX_UNLOCK();
+
+ return rv? (int32_t)data_len : -1;
+}
+
+int32_t mqtt_server_init(const struct mqtt_server_lib_cfg *lib_cfg,
+ const struct mqtt_server_app_cfg *app_cfg)
+{
+ /* If mutex is specified, then the following set of callbacks
+ are invoked in the locked state - enumerated by 'locked' */
+ struct mqtt_server_msg_cbs pkts_cbs = {proc_connect_rx_locked,
+ proc_sub_msg_rx_locked,
+ proc_un_sub_msg_locked,
+ proc_pub_msg_rx_locked,
+ proc_notify_ack_locked,
+ on_cl_net_close_locked,
+ on_connack_send_locked};
+
+ struct plugin_core_msg_cbs core_cbs = {proc_topic_enroll_locked,
+ proc_topic_cancel_locked,
+ proc_app_pub_send_locked};
+
+ util_params_set(lib_cfg->debug_printf,
+ lib_cfg->mutex,
+ lib_cfg->mutex_lockin,
+ lib_cfg->mutex_unlock);
+
+ USR_INFO("Version: Server LIB %s, Common LIB %s.\n\r",
+ MQTT_SERVER_VERSTR, MQTT_COMMON_VERSTR);
+
+ topic_node_init();
+
+ cl_mgmt_init();
+
+ plugin_init(&core_cbs);
+
+ mqtt_server_lib_init(lib_cfg, &pkts_cbs);
+
+ return 0;
+}
+
+}//namespace mbed_mqtt
diff -r 000000000000 -r 547251f42a60 server_core.h
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server_core.h Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,274 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+
+#ifndef __SERVER_CORE_H__
+#define __SERVER_CORE_H__
+
+#include "server_pkts.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+namespace mbed_mqtt {
+
+/**@file server_core.h
+ The MQTT server daemon, a task, provisions the high level abstractions for the
+ smart applicaltions. This is an intelligent layer that utilizes the services
+ of the MQTT Server Packet Library and is responsible for the functions of the
+ topic management, the client management and support of multiple server
+ applications.
+
+ The light-weight server enables the services of the MQTT protocol for an
+ application to either extend and / or control the existing functions to suit
+ the deployment specific scenario. These applications in practice are the
+ plug-ins / add-ons to the core server functionalities. Specifically,
+ these applications, among others capabilities, can be used for analyzing and
+ approving the credentials of the remote clients, acting as a bridge between
+ a MQTT external client and the server, and a snooper to learn about all the
+ data transactions to / from the server.
+
+ The server is targeted to conform to MQTT 3.1.1 specification.
+
+ The services of the server are multi-task safe. Platform specific atomicity
+ constructs are used, through abstractions, by the server to maintain data
+ coherency and synchronization.
+
+ The server offers the following compile line configurable parameters (-D opt)
+ - <b> CFG_SR_MAX_MQP_TX_LEN: </b> the constant buffer length allocated for a TX.
+ \n\n
+ - <b> CFG_SR_MAX_SUBTOP_LEN: </b> the maximum buffer size to hold a sub-topic.
+ For e.g., in the topic /x/y/z, the phrase /x, /y and z are sub-topics.
+ \n\n
+ - <b> CFG_SR_MAX_TOPIC_NODE: </b> the maximum number of topic nodes in server.
+ For e.g., in the topic /x/y/z, there are three nodes (/x, /y and z).
+ \n\n
+ - <b> CFG_SR_MAX_CL_ID_SIZE: </b> the maximum length of the client-id string.
+ \n\n
+ - <b> CFG_SR_MAX_NUM_CLIENT: </b> the maximum number of clients to be managed.
+ Note this is different from the maximum number of 'contexts'. A large number
+ of clients can be managed using fewer number of 'contexts' (connections).
+ \n\n
+
+ @note Any future extensions & development must follow the following guidelines.
+ A new API or an extension to the existing API
+ a) must be rudimentary
+ b) must not imply a rule or policy (including a state machine)
+ b) must ensure simple design and implementation
+*/
+
+
+/** @defgroup server_daemon The Server Daemon API(s)
+ @{
+*/
+
+struct mqtt_server_app_cbs {
+
+ /** Connection request from remote client - assess credentials.
+ This routine presents, to the application, the information about the
+ credentials of the remote client that is trying to establish a MQTT
+ connection with the server. The application shall utilize these data
+ in conjunction with its policy to either allow or deny connection to
+ the server.
+
+ Should the application choose to allow the remote client to establish
+ a MQTT connection, then it must provide in 'app-user' (place-holder),
+ a handle that refers to the user of this connection. The server will
+ provide this handle in follow-up routines to enable the application
+ to refer to the associated connection in its implementation.
+
+ @param[in] client_id UTF8 based ID of the remote client - is set to
+ NULL to indicate a zero-length client id.
+ @param[in] user_name UTF8 based user-name provided by the remote
+ client. It is set to NULL if the client did not provide an user name
+ @param[in] pass_word UTF8 based pass-word provided by the remote
+ client. It is set to NULL if the client did not provide a pass-word
+ @param[out] app_usr place-holder for application to provide a handle
+ to the user of this specific connection / client.
+
+ @return 16bit value for the variable header in the CONNACK message.
+ The MSB in the return value refers to the 8bit parameter of the
+ acknowledge flags and must be set 0. The LSB in the return value
+ refers to the 8bit 'return code' parameter in the CONNACK message
+ and must be set accordingly.
+ */
+ uint16_t (*connect)(const struct utf8_string *client_id,
+ const struct utf8_string *user_name,
+ const struct utf8_string *pass_word,
+ void **app_usr);
+
+ /** Indicate a PUBLISH message from the network.
+ This routine presents, to the application, the topic and the related
+ data along with other qualifiers published by one of the clients
+ associated with this server. The application must enroll with the
+ server the particular topic for which the data should be notified.
+
+ @param[in] topic UTF8 topic Name for which data has been published
+ @param[in] data_buf the published binary data for the topic
+ @param[in] data_len the length of the binary data
+ @param[in] dup is this a duplicate data from remote client?
+ @param[in] qos quality of service of the message / data
+ @param[in] retain should the server retain the data?
+
+ @return none
+ */
+ void (*publish)(const struct utf8_string *topic,
+ const uint8_t *payload, uint32_t pay_len,
+ bool dup, uint8_t qos, bool retain);
+
+ /** Notify disconnection to the remote client.
+ This routine is invoked by the server to declare to the application
+ to a particular client has been terminated and follow-up, if needed,
+ can now commence. This routine aids the application by indicating if
+ an error condition had caused the disconnection.
+
+ @param[in] app_usr handle to refer to the user of the connection in
+ the application
+ @param[in] due2err has the connection been closed due to an error?
+
+ @return none
+ */
+ void (*disconn)(const void *app_usr, bool due2err);
+};
+
+/** Enroll with server to receive data for a topic
+ This routine registers with the server, the specified topic for which the
+ application should receive any published data from the network. Whenever,
+ any of the remote clients that are connected to the server or applications,
+ this or other, publishes data for the specified topic, the server will
+ present the published information to this application.
+
+ As part of the enrollment, the application should provide the maxmimum QoS
+ with which the server should provide the published data. If the topic data
+ received by the server bears a QoS higher than the one specified by the
+ application, the server shall lower it to the QoS level preferred by the
+ application. Otherwise, the QoS of the data shall be presented 'as-is'. In
+ other words, the application should expect a published data with a lower QoS.
+
+ @param[in] app_hnd handle to the application context in the server. This
+ handle is provided by server @ref mqtt_server_app_register()
+ @param[in] topic UTF8 based string for which the application needs to start
+ getting the published data
+ @param[in] qos the meximum QoS the application intends to receive data for
+ this particular topic
+
+ @return 0 on sucess, otherwise a negative value on error.
+*/
+int32_t mqtt_server_topic_enroll(const void *app_hnd,
+ const struct utf8_string *topic, enum mqtt_qos qos);
+
+/** Cancel previous enrollment to receive data for a topic
+ This routines terminates the previous registration, if any, made by the
+ application to receive any publishised data for the specified topic. Once,
+ the enrollment is removed, the application, there after, will not receive
+ any data for this topic from the server.
+
+ @param[in] app_hnd handle to the application context in the server. This
+ handle is provided by server @ref mqtt_server_app_register()
+ @param[in] topic UTF8 based string for which the application needs to stop
+ getting the published data
+
+ @return 0 on success, otherwise a negative value on error.
+*/
+int32_t mqtt_server_topic_disenroll(const void *app_hnd,
+ const struct utf8_string *topic);
+
+/** Send data to network for a topic
+ This routine offers the binary data along-with associated properties for a
+ specific topic to the server. The server, based on the subscriptions from the
+ remote clients and the enrollments made by the local applications for this
+ topic, will send the binary data and its qualifiers, adjusted for the maximum
+ subscribed or enrolled QoS, to the remote clients and the local applications.
+
+ @param[in] topic UTF8 topic Name for which data has been published
+ @param[in] data_buf the published binary data for the topic
+ @param[in] data_len the length of the binary data
+ @param[in] qos quality of service of the message / data
+ @param[in] retain should the server retain the data?
+
+ @return on success, the length of data sent, otherwise -1 on error.
+*/
+int32_t mqtt_server_app_pub_send(const struct utf8_string *topic,
+ const uint8_t *data_buf, uint32_t data_len,
+ enum mqtt_qos qos, bool retain);
+
+/** Register an application with the server.
+ This routine makes known to the server about an application identified by
+ its name and creates a context / reference for the application in the
+ server.
+
+ An application intending to utlize the servicse of the MQTT server must be
+ first registered with the server.
+
+ @param[in] cbs callback routines from the application to be invoked by the
+ server
+ @param[in] name refers to the name of the application. The application must
+ retain the memory used by the 'name' after the function call. The server
+ does not copy the name into its internal buffers.
+
+ @return a valid handle to context of application in the server, othewise
+ NULL on error
+*/
+void *mqtt_server_app_register(const struct mqtt_server_app_cbs *cbs,
+ const char *name);
+
+
+/** Configuration for the applications that utilize the MQTT Server Daemon.
+ At the moment this configuration is not used and has been incorporated
+ to support forward compatibility (future use)
+*/
+struct mqtt_server_app_cfg {
+
+ void *place_holder; /**< Dummy, not used as of now */
+};
+
+/** Initialize the MQTT Server (Task / Daemon).
+ This routine initializes the server implementation and sets it up using
+ the provided configuration. The server implementation must be initialized
+ prior to invoking of any other routine or service.
+
+ This routine should be invoked as part of the platform initialization.
+
+ @note This routine must be invoked only once in an run of the system. This
+ routine internally invokes the @ref mqtt_server_lib_init( ) and therefore,
+ there is no need to invoke the @ref mqtt_server_lib_init( ) explicitly.
+
+ The server needs to be in a state to listen to incoming MQTT connection
+ requests. Therefore, the platform sequence after provisioning the buffer
+ using the API @ref mqtt_server_register_net_svc, must invoke the API @ref
+ mqtt_server_run, in an infinite loop, to keep the server daemon active.
+
+ @param[in] lib_cfg configuration information for the MQTT server packet
+ library.
+ @param[in] app_cfg configuration information for the server applications.
+
+ @return 0 on success, otherwise -1 on error
+*/
+int32_t mqtt_server_init(const struct mqtt_server_lib_cfg *lib_cfg,
+ const struct mqtt_server_app_cfg *app_cfg);
+
+/** @} */ /* End of server_daemon */
+
+}//namespace mbed_mqtt
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
+
diff -r 000000000000 -r 547251f42a60 server_pkts.cpp
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server_pkts.cpp Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,1000 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#include "server_pkts.h"
+
+namespace mbed_mqtt {
+
+/*-----------------------------------------------------------------------------
+ * Note: Do not create additional dependency of this file on any header other
+ * than server_pkts.h. Specifically, server_pkts.[hc] in conjunction with the
+ * mqtt_common.[hc] files must be facilitated to create a stand-alone library.
+ *-----------------------------------------------------------------------------
+ */
+
+static void *mutex = NULL;
+static void (*mutex_lockin)(void*) = NULL;
+static void (*mutex_unlock)(void*) = NULL;
+
+#define MUTEX_LOCKIN() if(mutex_lockin) mutex_lockin(mutex);
+#define MUTEX_UNLOCK() if(mutex_unlock) mutex_unlock(mutex);
+
+static bool aux_dbg_enbl = false;
+static int32_t (*debug_printf)(const char *fmt, ...) = NULL;
+
+#define USR_INFO debug_printf
+#define DBG_INFO(I, ...) if(aux_dbg_enbl) debug_printf(I, ##__VA_ARGS__)
+
+static int32_t mqp_buf_rd_utf8(const uint8_t *buf, const uint8_t *end,
+ struct utf8_string *utf8)
+{
+ const uint8_t *ref = buf; /* Reference */
+ uint16_t len = 0; /* UTF8 Size */
+
+ if(end - buf < 2)
+ return -1; /* No valid buffer to hold UTF8 size */
+
+ buf += buf_rd_nbo_2B(buf, &len);
+ if(end - buf < len)
+ return -1; /* No valid buffer to hold UTF8 name */
+
+ utf8->length = len;
+ utf8->buffer = len? (char*)buf : NULL;
+
+ return buf + len - ref;
+}
+
+static struct mqtt_server_msg_cbs usr_obj, *usr_cbs = NULL;
+static const struct device_net_services *net_ops = NULL;
+
+#ifndef CFG_SR_MQTT_CTXS
+#define MAX_NWCONN 6
+#else
+#define MAX_NWCONN CFG_SR_MQTT_CTXS
+#endif
+
+static struct client_ctx contexts[MAX_NWCONN];
+
+static struct client_ctx *used_ctxs = NULL;
+static struct client_ctx *free_ctxs = NULL;
+
+
+#define NETWORK_CLOSE_FLAG 0x00200000
+#define NW_CONN_ERROR_FLAG 0x00400000
+#define RCVD_CONN_MSG_FLAG 0x00800000
+
+#define NEED_NET_CLOSE(cl_ctx) (cl_ctx->flags & NETWORK_CLOSE_FLAG)
+
+static void cl_ctx_init(void)
+{
+ int32_t i = 0;
+ for(i = 0; i < MAX_NWCONN; i++) {
+ struct client_ctx *cl_ctx = contexts + i;
+
+ cl_ctx_reset(cl_ctx);
+
+ cl_ctx->next = free_ctxs;
+ free_ctxs = cl_ctx;
+ }
+}
+
+static void cl_ctx_free(struct client_ctx *cl_ctx)
+{
+ cl_ctx_reset(cl_ctx);
+
+ cl_ctx->next = free_ctxs;
+ free_ctxs = cl_ctx;
+
+ return;
+}
+
+static struct client_ctx *cl_ctx_alloc(void)
+{
+ struct client_ctx *cl_ctx = free_ctxs;
+ if(cl_ctx) {
+ free_ctxs = cl_ctx->next;
+ cl_ctx->next = NULL;
+ } else
+ USR_INFO("S: fatal, no free cl_ctx\n\r");
+
+ return cl_ctx;
+}
+
+static inline bool had_rcvd_conn_msg(struct client_ctx *cl_ctx)
+{
+ return (cl_ctx->flags & RCVD_CONN_MSG_FLAG);
+}
+
+static inline void set_rcvd_conn_msg(struct client_ctx *cl_ctx)
+{
+ cl_ctx->flags |= RCVD_CONN_MSG_FLAG;
+}
+
+static void used_ctxs_insert(struct client_ctx *cl_ctx)
+{
+ cl_ctx_timeout_insert(&used_ctxs, cl_ctx);
+}
+
+static void used_ctxs_remove(struct client_ctx *cl_ctx)
+{
+ cl_ctx_remove(&used_ctxs, cl_ctx);
+}
+
+static int32_t loopb_net = -1;
+static const uint8_t LOOP_DATA[] = {0x00, 0x01};
+#define LOOP_DLEN sizeof(LOOP_DATA)
+static uint16_t loopback_port = 0;
+static bool pending_trigs = false;
+
+static int32_t loopb_trigger(void)
+{
+ uint8_t ip_addr[] = {127,0,0,1};
+ int32_t rv = 0;
+
+ if((-1 != loopb_net) && (false == pending_trigs)) {
+ rv = net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN,
+ loopback_port, ip_addr, 4);
+ if(0 == rv)
+ pending_trigs = true;
+ }
+
+ return rv;
+}
+
+static void do_net_close_rx(struct client_ctx *cl_ctx, bool due2err)
+{
+ DBG_INFO("S: RX closing Net %d ...\n\r", (int32_t)cl_ctx->net);
+
+ net_ops->close(cl_ctx->net);
+ cl_ctx->net = -1;
+
+ if(cl_ctx->usr)
+ usr_cbs->on_cl_net_close(cl_ctx->usr, due2err);
+
+ used_ctxs_remove(cl_ctx);
+ cl_ctx_free(cl_ctx);
+}
+
+static void do_net_close_tx(struct client_ctx *cl_ctx, bool due2err)
+{
+ if(due2err)
+ cl_ctx->flags |= NW_CONN_ERROR_FLAG;
+
+ cl_ctx->flags |= NETWORK_CLOSE_FLAG;
+
+ loopb_trigger();
+}
+
+static int32_t cl_ctx_send(struct client_ctx *cl_ctx, uint8_t *buf, uint32_t len)
+{
+ int32_t rv = net_ops->send(cl_ctx->net, buf, len, NULL);
+ if(rv <= 0) {
+ do_net_close_tx(cl_ctx, true);
+ rv = MQP_ERR_NETWORK;
+ }
+
+ USR_INFO("S: FH-B1 0x%02x, len %u to net %d: %s\n\r",
+ *buf, len, cl_ctx->net, rv? "Sent" : "Fail");
+ return rv;
+}
+
+static
+int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type, enum mqtt_qos qos,
+ bool has_vh, uint16_t vh_data)
+{
+ uint8_t buf[4];
+ uint32_t len = 2;
+
+ if(false == had_rcvd_conn_msg(cl_ctx))
+ return MQP_ERR_NOTCONN;
+
+ buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false));
+ buf[1] = has_vh ? 2 : 0;
+
+ if(has_vh)
+ len += buf_wr_nbo_2B(buf + 2, vh_data);
+
+ return cl_ctx_send(cl_ctx, buf, len);
+}
+
+static
+int32_t _mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh,
+ uint16_t vh_data)
+{
+ struct client_ctx *cl_ctx = (struct client_ctx*) ctx_cl;
+
+ return cl_ctx? vh_msg_send((client_ctx*)ctx_cl, msg_type, qos,
+ has_vh, vh_data) : -1;
+}
+
+int32_t mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos, bool has_vh,
+ uint16_t vh_data)
+{
+ return _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data);
+}
+
+int32_t mqtt_vh_msg_send_locked(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos,
+ bool has_vh, uint16_t vh_data)
+{
+ int32_t rv;
+
+ MUTEX_LOCKIN();
+ rv = _mqtt_vh_msg_send(ctx_cl, msg_type, qos, has_vh, vh_data);
+ MUTEX_UNLOCK();
+
+ return rv;
+}
+
+int32_t mqtt_connack_send(void *ctx_cl, uint8_t *vh_buf)
+{
+ struct client_ctx *cl_ctx = (struct client_ctx *) ctx_cl;
+
+ int32_t rv = vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0,
+ true, (vh_buf[0] << 8) | vh_buf[1]);
+
+ if((rv > 0) && (0x00 != vh_buf[1]))
+ do_net_close_tx(cl_ctx, true);
+
+ return rv;
+}
+
+static
+int32_t _mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
+{
+ int32_t rv = 0;
+ uint8_t *buf = MQP_FHEADER_BUF(mqp);
+
+ if(dup)
+ *buf |= DUP_FLAG_VAL(true);
+
+ rv = cl_ctx_send((struct client_ctx*)ctx_cl, buf, MQP_CONTENT_LEN(mqp));
+
+ *buf &= ~DUP_FLAG_VAL(true);
+
+ return rv;
+}
+
+int32_t mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
+{
+ return _mqtt_server_pub_dispatch(ctx_cl, mqp, dup);
+}
+
+int32_t
+mqtt_server_pub_dispatch_locked(void *ctx_cl, struct mqtt_packet *mqp, bool dup)
+{
+ int32_t rv;
+
+ MUTEX_LOCKIN();
+ rv = _mqtt_server_pub_dispatch(ctx_cl, mqp, dup);
+ MUTEX_UNLOCK();
+
+ return rv;
+}
+
+#define MQP_MAX_TOPICS 16
+#define MQP_SUBACK_PAY_OFS (MAX_FH_LEN + 2)
+
+static int32_t sub_ack_send(struct client_ctx *cl_ctx, uint8_t *buf, uint8_t pay_ofs,
+ uint32_t pay_len, uint16_t msg_id)
+{
+ uint8_t *ref = buf += MAX_FH_LEN;
+
+ if(MQP_SUBACK_PAY_OFS != pay_ofs)
+ return MQP_ERR_PKT_LEN;
+
+ buf += buf_wr_nbo_2B(buf, msg_id);
+ ref -= mqp_buf_tail_wr_remlen(ref - MAX_REMLEN_BYTES,
+ pay_len + buf - ref);
+
+ ref -= 1;
+ *ref = MAKE_FH_BYTE1(MQTT_SUBACK,
+ MAKE_FH_FLAGS(false, MQTT_QOS0, false));
+
+ return cl_ctx_send(cl_ctx, ref, pay_len + buf - ref);
+}
+
+static inline int32_t unsub_ack_send(struct client_ctx *cl_ctx, uint16_t msg_id)
+{
+ return vh_msg_send(cl_ctx, MQTT_UNSUBACK, MQTT_QOS0, true, msg_id);
+}
+
+/*----------------------------------------------------------------------------
+ * Receive Routines
+ *----------------------------------------------------------------------------
+ */
+
+/* Candidate to be moved to mqtt_common.c file */
+static bool mqp_proc_vh_msg_id_rx(struct mqtt_packet *mqp_raw)
+{
+ uint8_t *buf = MQP_VHEADER_BUF(mqp_raw);
+
+ if(mqp_raw->pl_len < 2)
+ return false; /* Bytes for MSG ID not available */
+
+ buf += buf_rd_nbo_2B(buf, &mqp_raw->msg_id);
+ mqp_raw->vh_len += 2;
+ mqp_raw->pl_len -= 2;
+
+ return true;
+}
+
+#define BRK_IF_RD_ERR_UTF8(buf, end, utf8) \
+ if(rd_buf_utf8(buf, end, utf8) < 0) \
+ break;
+
+static int32_t buf_rd_utf8_qos(uint8_t *buf, uint8_t *end, struct utf8_strqos *utf8_qos)
+{
+ struct utf8_string utf8;
+ uint8_t *ref = buf;
+
+ buf += mqp_buf_rd_utf8(buf, end, &utf8);
+
+ /* Assess that UTF8 has been read and QOS can be read */
+ if((buf > ref) && (end > buf)) {
+ utf8_qos->buffer = utf8.buffer;
+ utf8_qos->length = utf8.length;
+ utf8_qos->qosreq = (enum mqtt_qos)*buf++;
+
+ return buf - ref;
+ }
+
+ return -1;
+}
+
+static bool _proc_sub_msg_rx(struct mqtt_packet *mqp_raw,
+ struct utf8_strqos *qos_topics, uint32_t *n_topics)
+{
+ uint8_t *buf, *end;
+ uint32_t i = 0;
+
+ if(false == mqp_proc_vh_msg_id_rx(mqp_raw))
+ return false; /* Problem in contents received from client */
+
+ buf = MQP_PAYLOAD_BUF(mqp_raw);
+ end = buf + mqp_raw->pl_len;
+
+ for(i = 0; (i < *n_topics) && (buf < end); i++) {
+ struct utf8_strqos *qos_top = qos_topics + i;
+ int32_t len = buf_rd_utf8_qos(buf, end, qos_top);
+ if(len < 0)
+ break; /* Failed to read Topic */
+
+ buf += len;
+ }
+
+ *n_topics = i;
+
+ return ((0 == i) || (buf != end))? false : true;
+}
+
+static
+bool proc_sub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+ uint32_t n_topics = MQP_MAX_TOPICS;
+ uint16_t msg_id;
+
+ struct utf8_strqos qos_topics[MQP_MAX_TOPICS];
+ uint8_t ack[MQP_MAX_TOPICS + MQP_SUBACK_PAY_OFS];
+
+ if(false == _proc_sub_msg_rx(mqp_raw, qos_topics, &n_topics))
+ return false;
+
+ msg_id = mqp_raw->msg_id;
+
+ /* All topics have been now put in array, pass-on info to upper layer */
+ if(usr_cbs->sub_msg_rx(cl_ctx->usr, qos_topics, n_topics,
+ msg_id, ack + MQP_SUBACK_PAY_OFS)) {
+
+ sub_ack_send(cl_ctx, ack, MQP_SUBACK_PAY_OFS, n_topics, msg_id);
+
+ return true;
+ }
+
+ return false;
+}
+
+
+static bool _proc_unsub_msg_rx(struct mqtt_packet *mqp_raw,
+ struct utf8_string *topics, uint32_t *n_topics)
+{
+ uint8_t *buf, *end;
+ uint32_t i = 0;
+
+ if(false == mqp_proc_vh_msg_id_rx(mqp_raw))
+ return false; /* Problem in contents received from client */
+
+ buf = MQP_PAYLOAD_BUF(mqp_raw);
+ end = buf + mqp_raw->pl_len;
+
+ for(i = 0; (i < *n_topics) && (buf < end); i++) {
+ struct utf8_string *topic = topics + i;
+ int32_t len = mqp_buf_rd_utf8(buf, end, topic);
+ if(len < 0)
+ break; /* Failed to read Topic */
+
+ buf += len;
+ }
+
+ *n_topics = i;
+
+ return ((0 == i) || (buf != end))? false : true;
+}
+
+static
+bool proc_unsub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+ uint32_t n_topics = MQP_MAX_TOPICS;
+ uint16_t msg_id;
+
+ struct utf8_string topics[MQP_MAX_TOPICS];
+
+ if(false == _proc_unsub_msg_rx(mqp_raw, topics, &n_topics))
+ return false;
+
+ msg_id = mqp_raw->msg_id;
+
+ /* All topics have been now put in array, pass-on info to upper layer */
+ if(usr_cbs->un_sub_msg(cl_ctx->usr, topics, n_topics, msg_id)) {
+ unsub_ack_send(cl_ctx, msg_id);
+ return true;
+ }
+
+ return false;
+}
+
+static bool proc_pingreq_rx(struct client_ctx *cl_ctx)
+{
+ vh_msg_send(cl_ctx, MQTT_PINGRSP, MQTT_QOS0, false, 0x00);
+ return true;
+}
+
+static bool proc_disconn_rx(struct client_ctx *cl_ctx)
+{
+ do_net_close_rx(cl_ctx, false);
+ return true;
+}
+
+static
+bool proc_pub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+ bool rv = mqp_proc_pub_rx(mqp_raw);
+ uint8_t B = mqp_raw->fh_byte1;
+ enum mqtt_qos qos = ENUM_QOS(B);
+ struct utf8_string topic;
+ uint16_t msg_id = 0;
+
+ if(false == rv)
+ return rv; /* Didn't get a good PUB Packet */
+
+ msg_id = mqp_raw->msg_id;
+
+ topic.buffer = (char*)MQP_PUB_TOP_BUF(mqp_raw);
+ topic.length = MQP_PUB_TOP_LEN(mqp_raw);
+
+ rv = usr_cbs->pub_msg_rx(cl_ctx->usr, &topic,
+ MQP_PUB_PAY_BUF(mqp_raw),
+ MQP_PUB_PAY_LEN(mqp_raw),
+ msg_id, BOOL_DUP(B), qos,
+ BOOL_RETAIN(B));
+ if(false == rv)
+ return rv;
+
+ if(MQTT_QOS1 == qos)
+ vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id);
+
+ if(MQTT_QOS2 == qos)
+ vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id);
+
+ return rv;
+}
+
+static
+bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+ if((false == mqp_proc_msg_id_ack_rx(mqp_raw, false)) ||
+ (false == usr_cbs->ack_notify(cl_ctx->usr,
+ mqp_raw->msg_type,
+ mqp_raw->msg_id)))
+ return false;
+
+ return true;
+}
+
+#define IO_MON_NO_TIMEOUT (0xFFFFFFFF) // TBD
+//#define KA_TIMEOUT_NONE (0xFFFFFFFF)
+
+static void rx_timeout_update(struct client_ctx *cl_ctx)
+{
+ if(false == had_rcvd_conn_msg(cl_ctx))
+ return;
+
+ cl_ctx_timeout_update(cl_ctx, net_ops->time());
+
+ used_ctxs_remove(cl_ctx);
+ used_ctxs_insert(cl_ctx);
+
+ return;
+}
+
+static bool proc_protocol_info(struct utf8_string *utf8, uint8_t ver)
+{
+ const char *buf = utf8->buffer;
+
+ /* Check for protocol version 3.1.1 */
+ if((4 == utf8->length) &&
+ (buf[0] == 'M') &&
+ (buf[1] == 'Q') &&
+ (buf[2] == 'T') &&
+ (buf[3] == 'T') &&
+ (0x04 == ver))
+ return true;
+
+ /* Check for protocol version 3.1 */
+ if((6 == utf8->length) &&
+ (buf[0] == 'M') &&
+ (buf[1] == 'Q') &&
+ (buf[2] == 'I') &&
+ (buf[3] == 's') &&
+ (buf[4] == 'd') &&
+ (buf[5] == 'p') &&
+ (0x03 == ver))
+ return true;
+
+ return false;
+}
+
+static uint16_t proc_connect_vh_rx(struct mqtt_packet *mqp_raw,
+ uint8_t *conn_flags, uint16_t *ka_secs)
+{
+ struct utf8_string utf8;
+ uint8_t *buf = MQP_PAYLOAD_BUF(mqp_raw);
+ uint8_t *end = buf + mqp_raw->pl_len;
+ uint8_t *ref = buf;
+
+ buf += mqp_buf_rd_utf8(buf, end, &utf8);
+ if(end - buf < 1)
+ return CONNACK_RC_BAD_PROTOV; /* No proto ver */
+
+ if(false == proc_protocol_info(&utf8, *buf++))
+ return CONNACK_RC_BAD_PROTOV;
+
+ *conn_flags = *buf++;
+
+ if(end - buf < 2)
+ return 0xFF; /* Bad packet composition */
+
+ *ka_secs = (buf[0] << 8) | buf[1];
+ buf += 2;
+ *ka_secs += *ka_secs >> 1;
+
+ mqp_raw->vh_len = buf - ref;
+ mqp_raw->pl_len -= buf - ref;
+
+ return 0;
+}
+
+#define RET_IF_RD_CONN_ERROR(buf, end, utf8) \
+ { \
+ int32_t len = mqp_buf_rd_utf8(buf, end, utf8); \
+ if(len < 0) \
+ return 0x00FF; \
+ \
+ buf += len; \
+ }
+
+uint16_t proc_connect_pl_rx(const uint8_t *buf, const uint8_t *end, uint8_t conn_flags,
+ struct utf8_string *free_utf8s,
+ struct utf8_string **used_refs)
+{
+ struct utf8_string *utf8;
+
+ utf8 = used_refs[0] = free_utf8s + 0;
+ RET_IF_RD_CONN_ERROR(buf, end, utf8);
+
+ if(conn_flags & WILL_CONFIG_VAL) {
+ utf8 = used_refs[1] = free_utf8s + 1;
+ RET_IF_RD_CONN_ERROR(buf, end, utf8);
+
+ utf8 = used_refs[2] = free_utf8s + 2;
+ RET_IF_RD_CONN_ERROR(buf, end, utf8);
+ }
+
+ if((0 == (conn_flags & USER_NAME_OPVAL)) &&
+ (0 != (conn_flags & PASS_WORD_OPVAL)))
+ return 0x00FF; /* Bad combination */
+
+ if(conn_flags & USER_NAME_OPVAL) {
+ utf8 = used_refs[3] = free_utf8s + 3;
+ RET_IF_RD_CONN_ERROR(buf, end, utf8);
+ }
+
+ if(conn_flags & PASS_WORD_OPVAL) {
+ utf8 = used_refs[4] = free_utf8s + 4;
+ RET_IF_RD_CONN_ERROR(buf, end, utf8);
+ }
+
+ return 0;
+}
+
+static
+bool proc_connect_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+ struct utf8_string *used_refs[5] = {NULL, NULL, NULL, NULL, NULL};
+ struct utf8_string free_utf8s[5];
+ uint8_t conn_flags, *buf, *end;
+ bool clean_session = false;
+ uint16_t ack_vh16 = 0; /* Variable Header of CONNACK (response) Message */
+
+ set_rcvd_conn_msg(cl_ctx);
+
+ ack_vh16 = proc_connect_vh_rx(mqp_raw, &conn_flags, &cl_ctx->ka_secs);
+ if(ack_vh16)
+ goto proc_connect_rx_exit1;
+
+ buf = MQP_PAYLOAD_BUF(mqp_raw);
+ end = buf + mqp_raw->pl_len;
+ ack_vh16 = proc_connect_pl_rx(buf, end, conn_flags,
+ free_utf8s, used_refs);
+ if(ack_vh16)
+ goto proc_connect_rx_exit1;
+
+ clean_session = (conn_flags & CLEAN_START_VAL)? true : false;
+ ack_vh16 = (!used_refs[0]->length && !clean_session)?
+ CONNACK_RC_CLI_REJECT : 0; /* Validate 0 byte Client ID */
+ if(ack_vh16)
+ goto proc_connect_rx_exit1;
+
+ /* UTF8 Order: Client ID, Will Topic, Will Msg, User Name, Pass Word */
+ ack_vh16 = usr_cbs->connect_rx(cl_ctx, conn_flags, &used_refs[0],
+ &cl_ctx->usr);
+ proc_connect_rx_exit1:
+
+ DBG_INFO("S: CONNACK RC (16bits) is %u (%s)\n\r", ack_vh16,
+ ack_vh16 & 0xFF? "error" : "good");
+
+ if(0xFF != (ack_vh16 & 0xFF))
+ vh_msg_send(cl_ctx, MQTT_CONNACK, MQTT_QOS0, true, ack_vh16);
+
+ if(CONNACK_RC_REQ_ACCEPT == (ack_vh16 & 0xFF)) {
+ rx_timeout_update(cl_ctx);
+ usr_cbs->on_connack_send(cl_ctx->usr, clean_session);
+ } else {
+ return false;
+ }
+
+ return true;
+}
+
+
+static void recv_hvec_load(int32_t *recv_hvec, uint32_t size, struct client_ctx *list)
+{
+ int32_t i = 0;
+
+ for(i = 0; (i < size) && (NULL != list); i++, list = list->next)
+ recv_hvec[i] = list->net;
+
+ recv_hvec[i] = -1;
+
+ return;
+}
+
+static bool process_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
+{
+ uint8_t msg_type = mqp_raw->msg_type;
+ bool rv = false;
+
+ USR_INFO("S: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d\n\r",
+ mqp_raw->fh_byte1, cl_ctx->net);
+
+ if((MQTT_CONNECT != msg_type) ^ had_rcvd_conn_msg(cl_ctx))
+ goto process_recv_exit1; /* Proto Violation */
+
+ rx_timeout_update(cl_ctx);
+
+ switch(msg_type) {
+
+ case MQTT_CONNECT:
+ rv = proc_connect_rx(cl_ctx, mqp_raw);
+ break;
+
+ case MQTT_DISCONNECT:
+ rv = proc_disconn_rx(cl_ctx);
+ break;
+
+ case MQTT_SUBSCRIBE:
+ rv = proc_sub_msg_rx(cl_ctx, mqp_raw);
+ break;
+
+ case MQTT_UNSUBSCRIBE:
+ rv = proc_unsub_msg_rx(cl_ctx, mqp_raw);
+ break;
+
+ case MQTT_PINGREQ:
+ rv = proc_pingreq_rx(cl_ctx);
+ break;
+
+ case MQTT_PUBLISH:
+ rv = proc_pub_msg_rx(cl_ctx, mqp_raw);
+ break;
+
+ case MQTT_PUBACK:
+ case MQTT_PUBREC:
+ case MQTT_PUBREL:
+ case MQTT_PUBCOMP:
+ rv = proc_ack_msg_rx(cl_ctx, mqp_raw);
+ break;
+
+ default:
+ break;
+ }
+
+ process_recv_exit1:
+ DBG_INFO("S: Processing of MSG ID 0x%02x: %s\n\r",
+ mqp_raw->msg_id, rv? "Good" : "Fail");
+
+ return rv;
+}
+
+
+/* Terminate net connections which haven't received PKTs beyond expected time.
+ Caller must ensure atomic enviroment for execution of this routine.
+*/
+static void ka_sequence(uint32_t *secs2wait)
+{
+ struct client_ctx *cl_ctx = used_ctxs; /* Sorted for timeout (ascend) */
+ uint32_t now_secs = net_ops->time();
+
+ while(NULL != cl_ctx) {
+ struct client_ctx *next = cl_ctx->next;
+ if(NEED_NET_CLOSE(cl_ctx) || !(cl_ctx->timeout > now_secs)) {
+ bool due2err = false;
+ if(cl_ctx->flags & NW_CONN_ERROR_FLAG)
+ due2err = true;
+
+ cl_ctx->flags &= ~(NW_CONN_ERROR_FLAG |
+ NETWORK_CLOSE_FLAG);
+
+ /* Close network: Timeout or TX err */
+ do_net_close_rx(cl_ctx, due2err);
+ }
+
+ cl_ctx = next;
+ }
+
+ cl_ctx = used_ctxs;
+ if(((NULL != cl_ctx) && (KA_TIMEOUT_NONE == cl_ctx->timeout)) ||
+ ((NULL == cl_ctx)))
+ *secs2wait = IO_MON_NO_TIMEOUT;
+ else
+ *secs2wait = cl_ctx->timeout - now_secs;
+
+ return;
+}
+
+/* Put a new functiona name such as mk_new_ctx() or setup_ctx() and
+ processing to restrict limit number of connections.
+
+ Return value as well.
+*/
+static bool accept_ctx(int32_t net, uint32_t wait_secs)
+{
+ struct client_ctx *cl_ctx = cl_ctx_alloc();
+ if(NULL == cl_ctx)
+ return false;
+
+ cl_ctx->net = net_ops->accept(net, cl_ctx->remote_ip,
+ &cl_ctx->ip_length);
+ if(-1 == cl_ctx->net) {
+ cl_ctx_free(cl_ctx);
+ USR_INFO("S: failed to accept new NW connection\n\r");
+ return false;
+ }
+
+ DBG_INFO("Accepted new connection (fd) %d\n\r", (int32_t)cl_ctx->net);
+
+ /* Timeout to receive MQTT_CONNECT */
+ cl_ctx->timeout = wait_secs + net_ops->time();
+
+ used_ctxs_insert(cl_ctx);
+ return true;
+}
+
+static struct client_ctx *net_cl_ctx_find(int32_t net)
+{
+ struct client_ctx *cl_ctx = used_ctxs;
+ while(cl_ctx) {
+ if(net == cl_ctx->net)
+ break;
+
+ cl_ctx = cl_ctx->next;
+ }
+
+ if(NULL == cl_ctx)
+ USR_INFO("Did not find ctx for net %d\n\r", net);
+
+ return cl_ctx;
+}
+
+static int32_t recv_hvec[MAX_NWCONN + 1 + 1 + 1]; /* LISTEN + LOOPBACK + VEC END */
+static int32_t send_hvec = -1;
+static int32_t rsvd_hvec = -1;
+static int32_t listen_net = -1;
+
+static struct mqtt_packet rx_mqp;
+static uint8_t rxb[MQP_SERVER_RX_LEN];
+static uint16_t listener_port = 0;
+
+static inline
+int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, bool *timed_out)
+{
+ int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, timed_out, NULL);
+ if(rv <= 0)
+ rv = MQP_ERR_NETWORK;
+
+ return rv;
+}
+
+static int32_t proc_loopback_recv(int32_t net)
+{
+ uint8_t buf[LOOP_DLEN];
+
+ /* Thanks for waking-up thread and do nothing in this routine */
+ int32_t rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0);
+ pending_trigs = false;
+
+ if(rv <= 0) {
+ net_ops->close(net);
+ return MQP_ERR_LIBQUIT;
+ }
+
+ return rv;
+}
+
+static void proc_net_data_recv(int32_t net)
+{
+ struct client_ctx *cl_ctx = net_cl_ctx_find(net);
+ bool dummy;
+ int32_t rv;
+
+ mqp_reset(&rx_mqp); /* Start w/ a clean buffer */
+
+ rv = net_recv(net, &rx_mqp, 0, &dummy);
+ if(rv > 0)
+ /* Working Principle: Only RX processing errors should be
+ reported as 'false'. Status of TX as a follow-up to RX
+ messages need not be reported by the xyz_rx() routines.
+ Error observed in TX is either dealt in next iteration
+ of RX loop.
+ */
+ if(false == process_recv(cl_ctx, &rx_mqp))
+ rv = MQP_ERR_CONTENT;
+
+ if(rv < 0)
+ do_net_close_rx(cl_ctx, rv);
+}
+
+static bool accept_and_recv_locked(int32_t *recv_hnds, int32_t n_hnds, uint32_t wait_secs)
+{
+ bool rv = true;
+ int32_t idx = 0;
+
+ MUTEX_LOCKIN();
+
+ for(idx = 0; (idx < n_hnds) && (rv == true); idx++) {
+ int32_t net = recv_hvec[idx];
+ if(net == listen_net) {
+ rv = accept_ctx(listen_net, wait_secs);
+ } else if(loopback_port && (net == loopb_net)) {
+ if(proc_loopback_recv(loopb_net) < 0)
+ rv = false;
+ } else {
+ proc_net_data_recv(net);
+ }
+ }
+
+ MUTEX_UNLOCK();
+
+ return rv;
+}
+
+int32_t mqtt_server_run(uint32_t wait_secs) // TBD break into two functions
+{
+ uint32_t secs2wait = 0;
+ int32_t n_hnds = 0;
+
+ USR_INFO("S: MQTT Server Run invoked ....\n\r");
+
+ if(NULL == net_ops)
+ return MQP_ERR_NET_OPS;
+
+ if(loopback_port) {
+ loopb_net = net_ops->open(DEV_NETCONN_OPT_UDP, NULL,
+ loopback_port, NULL);
+ if(-1 == loopb_net)
+ return MQP_ERR_LIBQUIT;
+ }
+
+ listen_net = net_ops->listen(0, listener_port, NULL);
+ if(-1 == listen_net)
+ return MQP_ERR_LIBQUIT;
+
+ do {
+ int32_t *r_hvec = recv_hvec + 0;
+
+ *r_hvec++ = listen_net;
+ if(loopback_port)
+ *r_hvec++ = loopb_net;
+
+ /* MQTT Timeouts: close expired conns; get time to next expiry */
+ ka_sequence(&secs2wait);
+
+ /* Prepare array of net handles. Must've atleast listen handle */
+ // recv_hvec_load(&recv_hvec[2], MAX_NWCONN + 1, used_ctxs);
+ recv_hvec_load(r_hvec, MAX_NWCONN + 1, used_ctxs);
+
+ n_hnds = net_ops->io_mon(recv_hvec, &send_hvec,
+ &rsvd_hvec, secs2wait);
+ if(n_hnds < 0)
+ return MQP_ERR_LIBQUIT;
+
+ if(false == accept_and_recv_locked(recv_hvec, n_hnds, wait_secs))
+ return MQP_ERR_LIBQUIT;
+
+ } while(1);
+}
+
+int32_t mqtt_server_register_net_svc(const struct device_net_services *net)
+{
+ if(net && net->io_mon && net->close && net->send &&
+ net->recv && net->time && net->listen) {
+ net_ops = net;
+ return 0;
+ }
+
+ return -1;
+}
+
+int32_t mqtt_server_lib_init(const struct mqtt_server_lib_cfg *lib_cfg,
+ const struct mqtt_server_msg_cbs *msg_cbs)
+{
+ cl_ctx_init();
+
+ if((NULL == lib_cfg) ||
+ (0 == lib_cfg->listener_port) ||
+ (NULL == lib_cfg->debug_printf))
+ return -1;
+
+ debug_printf = lib_cfg->debug_printf; /* Facilitate debug */
+
+ loopback_port = lib_cfg->loopback_port;
+ listener_port = lib_cfg->listener_port;
+
+ mutex = lib_cfg->mutex;
+ mutex_lockin = lib_cfg->mutex_lockin;
+ mutex_unlock = lib_cfg->mutex_unlock;
+
+ aux_dbg_enbl = lib_cfg->aux_debug_en;
+ debug_printf = lib_cfg->debug_printf;
+
+ usr_cbs = &usr_obj;
+
+ memcpy(usr_cbs, msg_cbs, sizeof(struct mqtt_server_msg_cbs));
+
+ mqp_buffer_attach(&rx_mqp, rxb, MQP_SERVER_RX_LEN, 0);
+
+ return 0;
+}
+
+}//namespace mbed_mqtt
diff -r 000000000000 -r 547251f42a60 server_pkts.h
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server_pkts.h Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,440 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#ifndef __SERVER_PKTS_H__
+#define __SERVER_PKTS_H__
+
+#include "mqtt_common.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+namespace mbed_mqtt {
+
+/*-----------------------------------------------------------------------------
+ * Note: Do not create additional dependency of this file on any header other
+ * than mqtt_common.h. Specifically, server_pkts.[hc] in conjunction with the
+ * mqtt_common.[hc] files must be facilitated to create a stand-alone library.
+ *-----------------------------------------------------------------------------
+ */
+
+/**@file server_pkts.h
+ The C library provisions the interface / API(s) for the MQTT Server Packet LIB.
+
+ This is a light-weight library that enables the services of the MQTT protcol
+ for user's server application(s) to exchange the MQTT packets with one or
+ more remote clients. The Server Packet LIB is a simple and easy-to-use
+ implementation to support both un-packing of the messages received from the
+ remote clients and formation of packets to be sent to the remote clients.
+
+ The library is targeted to conform to MQTT 3.1.1 specification.
+
+ The Server Packet LIB is a highly portable software and implies a very limited
+ set of dependencies on a platform. Importantly, these limited dependencies are
+ common features used in the embedded and the networking world, and can be
+ easily adapted to the target platforms. The services of the library are
+ multi-task safe. Platform specific atomicity constructs are used, through
+ abstractions, by the library to maintain data coherency and synchronization.
+ In addition, the library can be configured to support several in-flight
+ messages.
+
+ The Server Packet LIB can support multiple and simultaneous MQTT connections
+ from clients. However, the responsibility of managing the clients and topics,
+ authentication and approval for connections and supporting any other needs
+ that specific to the deployment remains with the user application.
+
+ The number of the network connections that the Server Packet LIB can support
+ is configurable through the compile line option / flag <b> -DCFG_SR_MQTT_CTXS
+ </b>. In addition, the maximum length of the RX buffer used by the server is
+ also configurable through the compile line option / flag <b>
+ -DCFG_SR_MAX_MQP_RX_LEN </b>.
+
+ @note Any future extensions & development must follow the following guidelines.
+ A new API or an extension to the existing API
+ a) must be rudimentary
+ b) must not imply a rule or policy (including a state machine)
+ b) must ensure simple design and implementation
+*/
+
+
+/** @defgroup server_pktlib The Server Library API(s)
+ @{
+*/
+
+/** @defgroup conn_utf8_help_group Helper Macros for RX CONNECT
+ @{
+*/
+
+/** Yields pointer to the UTF8 content */
+#define MQ_CONN_UTF8_BUF(utf8) ((utf8)? (utf8)->buffer : NULL)
+
+/** Length or size of the UTF8 content */
+#define MQ_CONN_UTF8_LEN(utf8) ((utf8)? (utf8)->length : 0)
+
+#define MQC_UTF8_CLIENTID(utf8_vec) (utf8_vec[0]) /**< Returns Client ID */
+#define MQC_UTF8_WILL_TOP(utf8_vec) (utf8_vec[1]) /**< Returns Will Topic */
+#define MQC_UTF8_WILL_MSG(utf8_vec) (utf8_vec[2]) /**< Returns Will Data */
+#define MQC_UTF8_USERNAME(utf8_vec) (utf8_vec[3]) /**< Returns User Name */
+#define MQC_UTF8_PASSWORD(utf8_vec) (utf8_vec[4]) /**< Returns Pass Word */
+
+#define MQC_CLIENTID_BUF(utf8_vec) MQ_CONN_UTF8_BUF(MQC_UTF8_CLIENTID(utf8_vec))
+#define MQC_CLIENTID_LEN(utf8_vec) MQ_CONN_UTF8_LEN(MQC_UTF8_CLIENTID(utf8_vec))
+
+#define MQC_WILL_TOP_BUF(utf8_vec) MQ_CONN_UTF8_BUF(MQC_UTF8_WILL_TOP(utf8_vec))
+#define MQC_WILL_TOP_LEN(utf8_vec) MQ_CONN_UTF8_LEN(MQC_UTF8_WILL_TOP(utf8_vec))
+
+#define MQC_WILL_MSG_BUF(utf8_vec) MQ_CONN_UTF8_BUF(MQC_UTF8_WILL_MSG(utf8_vec))
+#define MQC_WILL_MSG_LEN(utf8_vec) MQ_CONN_UTF8_LEN(MQC_UTF8_WILL_MSG(utf8_vec))
+
+#define MQC_USERNAME_BUF(utf8_vec) MQ_CONN_UTF8_BUF(MQC_UTF8_USERNAME(utf8_vec))
+#define MQC_USERNAME_LEN(utf8_vec) MQ_CONN_UTF8_LEN(MQC_UTF8_USERNAME(utf8_vec))
+
+#define MQC_PASSWORD_BUF(utf8_vec) MQ_CONN_UTF8_BUF(MQC_UTF8_PASSWORD(utf8_vec))
+#define MQC_PASSWORD_LEN(utf8_vec) MQ_CONN_UTF8_LEN(MQC_UTF8_PASSWORD(utf8_vec))
+
+/** @} */
+
+#ifndef CFG_SR_MAX_MQP_RX_LEN
+#define MQP_SERVER_RX_LEN 1024 /**< Max size(B) of RX Buffer for MQTT Server */
+#else
+#define MQP_SERVER_RX_LEN CFG_SR_MAX_MQP_RX_LEN
+#endif
+
+/** Send a Variable Header only message to the client.
+ Application should this routine to send PUBREL and PUBCOMP messages.
+
+ @param[in] ctx_cl handle to the underlying network context in the LIB. This
+ handle is provided to the application by the LIB in the CONNECT callback.
+ @param[in] msg_type message type that has to be sent to the client
+ @param[in] qos QoS with which the message needs to send to server
+ @param[in] has_vh does this message has data in the variable header?
+ @param[in] vh_data data <MSB:LSB> to be included in the variable header
+ @return on success, the number of bytes transferred. Otherwise, errors
+ defined in @ref lib_err_group
+*/
+int32_t mqtt_vh_msg_send(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos,
+ bool has_vh, uint16_t vh_data);
+
+/** mqtt_vh_msg_send() with mutual exclusion (in multi-task application).
+ This routine ensures that the LIB sends the specified VH MSG onto the network
+ in a manner that excludes execution of any other control. This API has been
+ enabled to support the scenarios, where the multi-tasked user application has
+ chosen to use a mutex object other than the one provisioned in the packet LIB
+ to streamline / serialize accesses to the services of the packet LIB.
+
+ Refer to @ref mqtt_vh_msg_send for details
+*/
+int32_t mqtt_vh_msg_send_locked(void *ctx_cl, uint8_t msg_type, enum mqtt_qos qos,
+ bool has_vh, uint16_t vh_data);
+
+/** Dispatch application constructed PUBLISH message to the client.
+ Prior to sending the message to the client, this routine shall update the
+ fixed-header to account for the duplicate flag that has been indicated by
+ the caller.
+
+ The caller must populate the payload information of topic and data before
+ invoking this service. In addition, the application must prepare, for the
+ packet, the fix header leaving aside the duplicate flag - this flag shall
+ be included in the fix heder by the LIB.
+
+ This service facilitates the application to re-use, iteratively, a single
+ PUBLISH packet for multiple remote clients that have subscribed to the
+ same topic for which the data is being published. The LIB does not free-up
+ the MQTT packet after sending the packet to the remote client and the
+ application is responsible for managing the packet container / memory
+
+ @param[in] ctx_cl handle to the underlying network context in the LIB. This
+ handle is provided to the application by the LIB in the CONNECT callback.
+ @param[in] mqp app created PUBLISH message alongwith the fixed header
+ @param[in] dup is this a duplicate message that is being sent to client?
+ @return on success, the number of bytes transferred. Otherwise, error
+ defined in @ref lib_err_group
+*/
+int32_t mqtt_server_pub_dispatch(void *ctx_cl, struct mqtt_packet *mqp, bool dup);
+
+/** mqtt_server_pub_dispatch() with mutual exclusion (in multi-task application).
+ This routine ensures that the LIB sends the specified packet onto the network
+ in a manner that excludes execution of any other control. This API has been
+ enabled to support the scenarios, where the multi-tasked user application has
+ chosen to use a mutex object other than the one provisioned in the packet LIB
+ to streamline / serialize accesses to the services of the packet LIB.
+
+ Refer to @ref mqtt_server_pub_dispatch for other details.
+*/
+int32_t mqtt_server_pub_dispatch_locked(void *ctx_cl, struct mqtt_packet *mqp,
+ bool dup);
+
+/** Run the server packet LIB for the specified wait time.
+ This routine yields the control back to the application after the specified
+ duration of wait time. Such an arrangement enable the application to make
+ overall progress to meet it intended functionality.
+
+ The wait time implies the maximum intermediate duration between the reception
+ of two successive messages from the server. If no message is received before
+ the expiry of the wait time, the routine returns. However, the routine would
+ continue to block, in case, messages are being received within the successive
+ period of wait time.
+
+ @param[in] wait_secs maximum time to wait for a message from the server
+
+ @note if the value of MQP_ERR_LIBQUIT is returned, then system must be
+ restarted.
+*/
+int32_t mqtt_server_run(uint32_t wait_secs);
+
+/** Abstraction for the device specific network services
+ Network services for communication with the clients
+
+ @param[in] net refers to network services supported by the platform
+ @return on success, 0, otherwise -1
+
+ @ref net_ops_group
+ @note all entries in net must be supported by the platform.
+*/
+int32_t mqtt_server_register_net_svc(const struct device_net_services *net);
+
+
+/** <b> Working Principle </b> for implementing the call-back services:
+ Implementation of the call-back services should report in return value, only
+ about errors found in the RX processing. Specifically, the status of TX as a
+ follow-up to RX message (represented as a call-back service) need not be
+ reported to the server packet library.
+
+ Error observed in TX (supported by appropriate API(s) / services in the
+ service packet library) is recorded in the 'context' and shall be dealt in
+ the next iteration of RX loop.
+*/
+struct mqtt_server_msg_cbs {
+
+ /** Indicate the CONNECT Message from the client
+ This routine provides, to the application, information about the
+ connection request that a remote client has made. The application
+ should utilize the credential and other data presented by the LIB
+ to authenticate, analyze and finally, approve or deny the request.
+
+ Application at its discretion can also imply deployment specific
+ policies to make decision about allowing or refusing the request.
+
+ The return value of this routine is a 16bit value that commensurates
+ with the 'variable header' of the CONNACK message. Specifically, the
+ LSB of the 16bit return value corresponds to the 8bit 'return code'
+ parameter in the CONNACK message and the MSB to the 'acknowledge
+ flags'. The application must set a valid return value.
+
+ The LIB uses the return value to compose the CONNACK message to the
+ remote client. If the LSB of return value is a 'non zero' value,
+ then the LIB, after sending the CONNACK message to the remote client,
+ will terminate the network connection.
+
+ @param[in] ctx_cl handle to the underlying network context in the LIB
+ @param[in] conn_flags options received in CONNECT message from server
+ @param[in] utf8_vec vector / array of pointers to UTF8 information.
+ The order of UTF8 information is client-id, will topic, will-message,
+ user-name and pass-word. A NULL in vector element indicates absence
+ of that particular UTF8 information.
+ @param[out] usr place holder for application to provide connection
+ specific handle. In subsequent calls from the implementation this
+ handle will be passed as a parameter to enable application to refer
+ to the particular active connection.
+
+ @return 16bit value for the variable header of the CONNACK message,
+ MSB is CONNACK-Flags, LSB is CONNACK-RC
+ */
+ uint16_t (*connect_rx)(void *ctx_cl, uint8_t conn_flags,
+ struct utf8_string * const *utf8_vec, void **usr);
+
+ /** Indicate the SUBSCRIBE Message from the client
+ This routine provides, to the application, information about the
+ topics that the remote client intends to subscribe to.
+
+ On return from this routine, if the application has found a problem
+ in the processing of message, then the LIB will simply terminate the
+ associated network connection.
+
+ @param[in] usr_cl handle to connection context in the application
+ @param[in] qos_topics an array of topic along-with its qos
+ @param[in] n_topics the count / number of elements in the array
+ @param[in] msg_id the associated message ID provided by the client
+ @param[in] acks place holder array for the application to provide
+ finalized qos for each of the subscribed topic. The order of ack
+ is same as that of qos_topics
+
+ @return The application should return false, if it encounters any
+ problem in the processing of topic. Otherwise, true.
+
+ @note The memory space pointed by the 'buffer' field in the elements
+ of 'qos_topics' array has an additional byte available beyond the
+ size of memory indicated by the 'length' field. This extra byte can
+ be used by the application to NUL terminate the buffer. <b> This
+ quirk is applicable to this routine only. </b>
+ */
+ bool (*sub_msg_rx)(void *usr_cl, const struct utf8_strqos *qos_topics,
+ uint32_t n_topics, uint16_t msg_id, uint8_t *acks);
+
+ /** Indicate the UNSUBSCRIBE Message from the client
+ This routine provides, to the application, information about the
+ topics that the remote client intends to unsubscribe.
+
+ On return from this routine, if the application has found a problem
+ in the processing of message, then the LIB will simply terminate the
+ associated network connection.
+
+ @param[in] usr_cl handle to connection context in the application
+ @param[in] topics an array of topic in the message
+ @param[in] n_topics the count / number of elements in the array
+ @param[in] msg_id the associated message ID provided by the client
+
+ @return The application should return false, if it encounters any
+ problem in the processing of topic. Otherwise, true.
+ */
+ bool (*un_sub_msg)(void *usr_cl, const struct utf8_string *topics,
+ uint32_t n_topics, uint16_t msg_id);
+
+ /** Indicate the PUBLISH Message from the client.
+ This routine provides, to the application, the binary data along-with
+ its qualifiers and the topic to which a remote client has published
+ data.
+
+ On return from this routine, if the application has found a problem
+ in processing of the contents of the PUBLISH message, the LIB will
+ simply terminate the associated network connection. Otherwise,
+ depending upon the QoS level of the PUBLISH message, the LIB shall
+ dispatch the ACK (PUBACK or PUBREC) to the client, thereby,
+ relieveing the application from this support.
+
+ @param[in] usr_cl handle to connection context in the application
+ @param[in] topic UTF8 Topic Name for which data has been published
+ @param[in] data_buf the published binary data for the topic
+ @param[in] data_len the length of the binary data
+ @param[in] msg_id the associated message ID provided by the client
+ @param[in] dup has client indicated this as a duplicate message
+ @param[in] qos quality of service of the message
+ @param[in] retain should the server retain the data?
+
+ @return The application should return false, if it encounters any
+ problem in the processing of data, topic and related resources.
+ Otherwise, true.
+ */
+ bool (*pub_msg_rx)(void *usr_cl, const struct utf8_string *topic,
+ const uint8_t *data_buf, uint32_t data_len, uint16_t msg_id,
+ bool dup, enum mqtt_qos qos, bool retain);
+
+ /** Notify the acknowledgement that was received from the remote client.
+ Following are the messages that are notified by the server LIB.
+
+ PUBACK, PUBREC, PUBREL, PUBCOMP
+
+ On return from this routine, if the application has found problem
+ in processing the ACK message, then the LIB will simply terminate
+ the associated network connection
+
+ @param[in] usr_cl handle to connection context in the application
+ @param[in] msg_type refers to the type of ACK message
+ @param[in] msg_id the associated message ID provided by the client
+ @return application should return false if the ACK was not expected
+ by it or no reference was found for it. Otherwise true.
+ */
+ bool (*ack_notify)(void *usr_cl, uint8_t msg_type, uint16_t msg_id);
+
+ /** Notify that network connection to client has been closed.
+ This routine is invoked by the LIB to declare to the application that
+ the network connection to a particular client has been terminated and
+ follow-up, if needed, can now commence. If configured, removal of the
+ client session and / or dispatch of the WILL message, will be typical
+ aspects, among others, to follow-up. The routine aids the application
+ by indicating if an error condition had caused the closure.
+
+ This routine is invoked by the LIB irrespective of the source entity,
+ server or client, that has caused the closure of the network.
+
+ @param[in] usr_cl handle to connection context in the application
+ @param[in] due2err has the connection been closed due to an error?
+ */
+ void (*on_cl_net_close)(void *usr_cl, bool due2err);
+
+ /** Notify that CONNACK has been sent to the specified client.
+ This routine is invoked by the LIB to enable the application to make
+ progress and follow-up on the session information for the particular
+ client. Specifically, this routine facilitates the application to
+ either delete the session or re-send / sync-up the pending messages
+ associated with the client. The follow-up action is depenedent upon
+ the 'clean_session' option in the CONNECT message from the client.
+ @param[in] usr_cl handle to connection context in the application
+ @param[in] clean_session was a clean session requested in CONNECT?
+ */
+ void (*on_connack_send)(void *usr_cl, bool clean_session);
+};
+
+struct mqtt_server_lib_cfg {
+
+ /** Port to listen to incoming network connections from the clients */
+ uint16_t listener_port;
+
+ /** If the server application has more than one task and / or supports
+ at-least one plug-in, then a non-zero value must be provided.
+ Otherwise, this parameter must be set to zero. This parameter is
+ used by the implementation to synchronize multiple tasks for the
+ network connection.
+ */
+ uint16_t loopback_port;
+
+ /** For a multi-task enviroment, provide a handle to platform mutex */
+ void *mutex;
+ void (*mutex_lockin)(void *mutex);
+ void (*mutex_unlock)(void *mutex);
+
+ int32_t (*debug_printf)(const char *format, ...); /**< Debug, mandatory */
+ bool aux_debug_en; /**< Assert to indicate additional debug info */
+
+};
+
+/** Initialize the MQTT Server Packet library
+ This routine initializes the packet and network constructs that are required
+ to manage the multiple network connetions. The server packet LIB must be
+ initialized prior to invoking of any other routine or service.
+
+ @note This routine must be invoked only once in an run of the system.
+
+ If there are more than one application (tasks) that utilize the services
+ of the server packet library, then certain configuration must be set in
+ the LIB @see struct mqtt_server_lib_cfg
+
+ The application should also provision the platform network specific network
+ services into the packet library @see mqtt_server_register_net_svc.
+
+ Once, the server LIB has been intialized successfully, it must be put into
+ action, to listen to requests for incoming connections, by invoking the API
+ @ref mqtt_server_run.
+
+ @param[in] cfg configuration information for the MQTT server packet library
+ @param[in] cbs callback routines that LIB will invoke into the application
+
+ @return 0 on success otherwise -1.
+*/
+int32_t mqtt_server_lib_init(const struct mqtt_server_lib_cfg *cfg,
+ const struct mqtt_server_msg_cbs *cbs);
+
+/** @} */ /* End of server_pktlib */
+
+}//namespace mbed_mqtt
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
diff -r 000000000000 -r 547251f42a60 server_plug.cpp
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server_plug.cpp Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,219 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#include "server_plug.h"
+#include "server_core.h"
+#include "server_util.h"
+
+namespace mbed_mqtt {
+
+#define MAX_PLUGINS PG_MAP_MAX_ELEMS
+#define PG_NAME_LEN 32
+
+static struct plugin_desc {
+
+ char *name;
+ uint8_t index;
+ uint8_t inuse;
+
+ struct mqtt_server_app_cbs app_cbs;
+
+} plugins[MAX_PLUGINS];
+
+#define PLUGIN(app_hnd) ((struct plugin_desc*) app_hnd)
+
+static inline bool is_inuse(struct plugin_desc *plugin)
+{
+ return plugin->inuse? true : false;
+}
+
+static inline void inuse_set(struct plugin_desc *plugin, bool inuse)
+{
+ plugin->inuse = inuse? 0x01 : 0x00;
+}
+
+static struct plugin_desc *acl_pg = NULL;
+
+static struct plugin_core_msg_cbs *msg_cbs, msg_cbacks = {NULL, NULL, NULL};
+
+static inline struct plugin_desc *plugin_find(int32_t idx)
+{
+ return plugins + idx;
+}
+
+static void plugin_reset(struct plugin_desc *plugin)
+{
+ //plugin->app_cbs = NULL; ==> TBD
+ inuse_set(plugin, false);
+
+ return;
+}
+
+static struct plugin_desc *plugin_alloc(void)
+{
+ struct plugin_desc *plugin = NULL;;
+ int32_t idx = 0;
+
+ for(idx = 0; idx < MAX_PLUGINS; idx++) {
+ plugin = plugins + idx;
+ if(false == is_inuse(plugin)) {
+ inuse_set(plugin, true);
+ break;
+ }
+ }
+
+ DBG_INFO("Plugin alloc %s\n\r",
+ (MAX_PLUGINS == idx)? "Failed" : "Success");
+
+ return (MAX_PLUGINS != idx)? plugin : NULL;
+}
+
+#if 0
+static void plugin_free(struct plugin_desc *plugin)
+{
+ plugin_reset(plugin);
+}
+#endif
+
+uint16_t plugin_connect(const struct utf8_string *clientId,
+ const struct utf8_string *username,
+ const struct utf8_string *password,
+ void **app_usr)
+{
+ uint16_t rv = CONNACK_RC_REQ_ACCEPT; /* Accept everything from MQTT network */
+
+ *app_usr = NULL;
+ if(acl_pg)
+ rv = acl_pg->app_cbs.connect(clientId, username,
+ password, app_usr);
+
+ return rv;
+}
+
+int32_t plugin_publish(uint8_t pg_map, const struct utf8_string *topic,
+ const uint8_t *payload, uint32_t pay_len,
+ bool dup, uint8_t qos, bool retain)
+{
+ int32_t i = 0;
+ for(i = 0; i < MAX_PLUGINS; i++) {
+ if(PG_MAP_HAS_VALUE(pg_map, i)) {
+ struct plugin_desc *plugin = plugin_find(i);
+
+ DBG_INFO("Publishing to Plugin ID: %d (%s)\n\r",
+ plugin->index, plugin->name);
+
+ if(false == is_inuse(plugin))
+ continue; /* Must not happen */
+
+ plugin->app_cbs.publish(topic, payload, pay_len,
+ dup, qos, retain);
+ }
+ }
+
+ /* TBD for error value return. */
+
+ return pay_len;
+}
+
+int32_t plugin_disconn(const void *app_usr, bool due2err)
+{
+ if(acl_pg)
+ acl_pg->app_cbs.disconn(app_usr, due2err);
+
+ /* TBD for error value return. */
+
+ return 0;
+}
+
+int32_t mqtt_server_topic_enroll(const void *app_hnd,
+ const struct utf8_string *topic, enum mqtt_qos qos)
+{
+ return app_hnd?
+ msg_cbs->topic_enroll(PLUGIN(app_hnd)->index, topic, qos) : -1;
+}
+
+int32_t mqtt_server_topic_disenroll(const void *app_hnd,
+ const struct utf8_string *topic)
+{
+ return app_hnd?
+ msg_cbs->topic_cancel(PLUGIN(app_hnd)->index, topic) : -1;
+}
+
+int32_t mqtt_server_app_pub_send(const struct utf8_string *topic,
+ const uint8_t *data_buf, uint32_t data_len,
+ enum mqtt_qos qos, bool retain)
+{
+ return msg_cbs->publish(topic, data_buf, data_len, qos, retain);
+}
+
+static void *server_app_register(const struct mqtt_server_app_cbs *cbs,
+ const char *name)
+{
+ struct plugin_desc *plugin = plugin_alloc();
+ if(NULL != plugin) {
+ strncpy(plugin->name, name, PG_NAME_LEN - 1);
+ memcpy(&plugin->app_cbs, cbs,
+ sizeof(struct mqtt_server_app_cbs));
+
+ if((NULL == acl_pg) && cbs->connect)
+ acl_pg = plugin;
+
+ }
+ return plugin;
+}
+
+void *mqtt_server_app_register(const struct mqtt_server_app_cbs *cbs,
+ const char *name)
+{
+ if((NULL == cbs) ||
+ ((!!cbs->connect) ^ (!!cbs->disconn)) ||
+ (acl_pg && cbs->connect))
+ return NULL;
+
+ return server_app_register(cbs, name);
+}
+
+static bool inited = false;
+
+int32_t plugin_init(const struct plugin_core_msg_cbs *cbs)
+{
+ int32_t idx = 0;
+
+ if(inited)
+ return -1;
+
+ if(NULL == cbs)
+ return -2;
+
+ if(!(cbs->topic_enroll && cbs->topic_cancel && cbs->publish))
+ return -3;
+
+ for(idx = 0; idx < MAX_PLUGINS; idx++) {
+ struct plugin_desc *plugin = plugins + idx;
+ plugin->index = idx;
+
+ plugin_reset(plugin);
+ }
+
+ msg_cbs = &msg_cbacks;
+ memcpy(msg_cbs, cbs, sizeof(struct plugin_core_msg_cbs));
+
+ inited = true;
+
+ USR_INFO("Plugin module has been initialized.\n\r");
+ return 0;
+}
+
+}//namespace mbed_mqtt
diff -r 000000000000 -r 547251f42a60 server_plug.h
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server_plug.h Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,87 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#ifndef __SERVER_PLUG_H__
+#define __SERVER_PLUG_H__
+
+#include "server_pkts.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+namespace mbed_mqtt {
+
+/* Used by Server Core Logic */
+
+#define PG_MAP_BITS_SIZE 2
+#define PG_MAP_BITS_MASK ((1 << PG_MAP_BITS_SIZE) - 1)
+#define PG_MAP_MAX_ELEMS 4 /* should be able accomodate in 1 byte */
+#define PG_MAP_ALL_DFLTS ((1 << (PG_MAP_BITS_SIZE * PG_MAP_MAX_ELEMS)) - 1)
+
+#define PG_MAP_HAS_VALUE(pg_map, index) \
+ (((~pg_map) >> (index * PG_MAP_BITS_SIZE)) & PG_MAP_BITS_MASK)
+
+#define PG_MAP_VAL_SETUP(pg_map, qid, index) \
+ { \
+ uint32_t ofst = index * PG_MAP_BITS_SIZE; \
+ pg_map &= ~(PG_MAP_BITS_MASK << ofst); \
+ pg_map |= qid << ofst; \
+ }
+
+#define PG_MAP_VAL_RESET(pg_map, index) \
+ pg_map |= PG_MAP_BITS_MASK << (index * PG_MAP_BITS_SIZE);
+
+#if (PG_MAP_BITS_MASK != QID_VMASK)
+#error "PG_MAP_BITS_MASK must be same as 2bit QOS_VMASK"
+#endif
+
+#if ((PG_MAP_MAX_ELEMS * PG_MAP_BITS_SIZE) > 8)
+#error "Make size-of pg_map greate than 1 byte"
+#endif
+
+struct plugin_core_msg_cbs {
+
+ int32_t (*topic_enroll)(uint8_t pg_id, const struct utf8_string *topic,
+ enum mqtt_qos qos);
+ int32_t (*topic_cancel)(uint8_t pg_id, const struct utf8_string *topic);
+ int32_t (*publish)(const struct utf8_string *topic, const uint8_t *data_buf,
+ uint32_t data_len, enum mqtt_qos qos, bool retain);
+};
+
+int32_t plugin_init(const struct plugin_core_msg_cbs *cbs);
+
+/* uint16_t composition: MSB is CONNACK-Flags = 0, LSB is CONNACK-RC */
+uint16_t plugin_connect(const struct utf8_string *clientId,
+ const struct utf8_string *username,
+ const struct utf8_string *password,
+ void **app_usr);
+
+int32_t plugin_publish(uint8_t pg_map, const struct utf8_string *topic,
+ const uint8_t *payload, uint32_t pay_len,
+ bool dup, uint8_t qos, bool retain);
+
+int32_t plugin_disconn(const void *app_usr, bool due2err);
+
+}//namespace mbed_mqtt
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
+
diff -r 000000000000 -r 547251f42a60 server_util.cpp
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server_util.cpp Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,135 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#include "server_util.h"
+
+namespace mbed_mqtt {
+
+static uint16_t msg_id = 0xFFFF;
+static inline uint16_t assign_new_msg_id()
+{
+ return msg_id += 2;
+}
+
+uint16_t mqp_new_id_server(void)
+{
+ return assign_new_msg_id();
+}
+
+static void my_pkt_free(struct mqtt_packet *mqp)
+{
+ my_free((void*) mqp);
+}
+
+/*----------------------------------------------------------------------------
+ * Try to prevent fragmentation by consistently allocating a fixed memory size
+ *----------------------------------------------------------------------------
+ */
+#ifndef CFG_SR_MAX_MQP_TX_LEN
+#define MQP_SERVER_TX_LEN 1024
+#else
+#define MQP_SERVER_TX_LEN CFG_SR_MAX_MQP_TX_LEN
+#endif
+
+/*------------------ Fixed memory configuration ends -----------------------*/
+
+static struct mqtt_packet *server_mqp_alloc(uint8_t msg_type, uint32_t buf_sz, uint8_t offset)
+{
+ uint32_t mqp_sz = sizeof(struct mqtt_packet);
+ struct mqtt_packet *mqp = NULL;
+
+ buf_sz += offset;
+
+ if((mqp_sz + buf_sz) > MQP_SERVER_TX_LEN) {
+ USR_INFO("S: fatal, buf alloc len > MQP_SERVER_TX_LEN\n\r");
+ return NULL;
+ }
+
+ mqp = (mqtt_packet*)my_malloc(MQP_SERVER_TX_LEN);
+ if(NULL != mqp) {
+ mqp_init(mqp, offset);
+
+ mqp->msg_type = msg_type;
+ mqp->maxlen = buf_sz;
+ mqp->buffer = (uint8_t*)mqp + mqp_sz;
+
+ mqp->free = my_pkt_free;
+
+ } else {
+ USR_INFO("S: fatal, failed to alloc Server MQP\n\r");
+ }
+
+ return mqp;
+}
+
+struct mqtt_packet *mqp_server_alloc(uint8_t msg_type, uint32_t buf_sz)
+{
+ return server_mqp_alloc(msg_type, buf_sz, MAX_FH_LEN);
+}
+
+struct mqtt_packet *mqp_server_copy(const struct mqtt_packet *mqp)
+{
+ struct mqtt_packet *cpy = server_mqp_alloc(mqp->msg_type,
+ mqp->maxlen, 0);
+ if(NULL != cpy) {
+ uint8_t *buffer = cpy->buffer;
+
+ /* Copy to overwrite everything in 'cpy' from source 'mqp' */
+ buf_wr_nbytes((uint8_t*)cpy, (uint8_t*)mqp, sizeof(struct mqtt_packet));
+
+ cpy->buffer = buffer; /* Restore buffer and copy */
+ buf_wr_nbytes(cpy->buffer, mqp->buffer, mqp->maxlen);
+ }
+
+ return cpy;
+}
+
+int32_t (*util_dbg_prn)(const char *fmt, ...) = NULL;
+bool util_prn_aux = false;
+
+static void (*lockin_mutex)(void*) = NULL;
+static void (*unlock_mutex)(void*) = NULL;
+static void *mutex_hnd = NULL;
+
+void util_mutex_lockin(void)
+{
+ if(lockin_mutex)
+ lockin_mutex(mutex_hnd);
+
+ return;
+}
+
+void util_mutex_unlock(void)
+{
+ if(unlock_mutex)
+ unlock_mutex(mutex_hnd);
+
+ return;
+}
+
+void util_params_set(int32_t (*dbg_prn)(const char *fmt, ...),
+ void *mutex,
+ void (*mutex_lockin)(void*),
+ void (*mutex_unlock)(void*))
+{
+ util_dbg_prn = dbg_prn;
+ mutex_hnd = mutex;
+ lockin_mutex = mutex_lockin;
+ unlock_mutex = mutex_unlock;
+
+ return;
+}
+
+}//namespace mbed_mqtt {
diff -r 000000000000 -r 547251f42a60 server_util.h
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/server_util.h Sat Jun 06 13:29:08 2015 +0000
@@ -0,0 +1,67 @@
+/******************************************************************************
+*
+* Copyright (C) 2014 Texas Instruments Incorporated
+*
+* All rights reserved. Property of Texas Instruments Incorporated.
+* Restricted rights to use, duplicate or disclose this code are
+* granted through contract.
+*
+* The program may not be used without the written permission of
+* Texas Instruments Incorporated or against the terms and conditions
+* stipulated in the agreement under which this program has been supplied,
+* and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#ifndef __SERVER_UTIL_H__
+#define __SERVER_UTIL_H__
+
+#include "mqtt_common.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+namespace mbed_mqtt {
+
+#define my_malloc malloc
+#define my_free free
+
+#define MQTT_SERVER_VERSTR "1.0.1"
+
+#define MIN(a,b) ((a > b)? b : a)
+
+uint16_t mqp_new_id_server(void);
+struct mqtt_packet *mqp_server_alloc(uint8_t msg_type, uint32_t buf_sz);
+
+struct mqtt_packet *mqp_server_copy(const struct mqtt_packet *mqp);
+
+extern int32_t (*util_dbg_prn)(const char *fmt, ...);
+extern bool util_prn_aux;
+
+void util_mutex_lockin(void);
+void util_mutex_unlock(void);
+
+#define MUTEX_LOCKIN() util_mutex_lockin()
+#define MUTEX_UNLOCK() util_mutex_unlock()
+
+#define USR_INFO(FMT, ...) if(util_dbg_prn) util_dbg_prn(FMT, ##__VA_ARGS__)
+
+#define DBG_INFO(FMT, ...) \
+ if(util_prn_aux && util_dbg_prn) \
+ util_dbg_prn(FMT, ##__VA_ARGS__)
+
+void util_params_set(int32_t (*dbg_prn)(const char *fmt, ...),
+ void *mutex,
+ void (*mutex_lockin)(void*),
+ void (*mutex_unlock)(void*));
+
+}//namespace mbed_mqtt {
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+