Part of TI's mqtt
Dependents: mqtt_V1 cc3100_Test_mqtt_CM3
Diff: server_core.cpp
- Revision:
- 0:547251f42a60
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server_core.cpp Sat Jun 06 13:29:08 2015 +0000 @@ -0,0 +1,1695 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +#include "client_mgmt.h" +#include "server_util.h" +#include "server_pkts.h" +#include "server_plug.h" +#include "server_core.h" + +namespace mbed_mqtt { + +#ifndef CFG_SR_MAX_SUBTOP_LEN +#define MAX_SUBTOP_LEN 32 +#else +#define MAX_SUBTOP_LEN CFG_SR_MAX_SUBTOP_LEN +#endif + +#ifndef CFG_SR_MAX_TOPIC_NODE +#define MAX_TOP_NODES 32 +#else +#define MAX_TOP_NODES CFG_SR_MAX_TOPIC_NODE +#endif + +/* A topic name (or string or tree or hierarchy) is handled as a series of nodes. + A 'topic node' refers to a level in the topic tree or hierarchy and it stores + the associated part of the topic string at that level. For example, a topic + having a name "abc/x/1" will have three nodes for each of the subtopics, viz, + "abc/", "x/" and "1". Further, the association between the nodes is indicated + as following. + + -- down link hierarchy --> -- down link hierarchy --> + "abc/" "x/" "1" + <-- up link hierarchy -- <-- up link hierarchy -- + + To extend this example, another topic having a name "abc/y/2" will be managed + as following. + + -- down link hierarchy --> -- down link hierarchy --> + "abc/" "x/" "1" + <-- up link hierarchy -- <-- up link hierarchy -- + | ^ + | | + down link neighbour up link neighbour + | | + V | + -- down link hierarchy --> + "y/" "2" + <-- up link hierarchy -- + + To reduce wasted in byte alignments, the structure has been hand packed. +*/ +static struct topic_node { + + struct topic_node *dn_nhbr; /* Down Link Neighbour node */ + struct topic_node *up_nhbr; /* Up Link Neighbour node */ + + struct topic_node *dn_hier; /* Down Link Hierarchy node */ + struct topic_node *up_hier; /* Up Link Hierarchy node */ + + uint32_t cl_map[3]; /* Subscribers for each QOS */ + + uint8_t *my_data; /* Leaf node: retained data */ + uint32_t my_dlen; + + void *will_cl; /* CL OBJ of WILL Leaf node */ + +#define TNODE_PROP_RETAIN_DATA 0x04 + /* Bits 0,1 for QOS and rest are flags */ + uint8_t my_prop; + + uint8_t pg_map; /* Map: application plugins */ + + uint16_t toplen; /* Length of node sub-topic */ + char subtop[MAX_SUBTOP_LEN];/* NUL terminated sub-topic */ + +} nodes[MAX_TOP_NODES]; + +/* + Housekeeping to manage subtopics (nodes) at run-time. +*/ +static struct _node_stack { + struct topic_node *node; + uint32_t val1; + uint32_t val2; + +} node_stack[MAX_TOP_NODES]; + +static int32_t stack_idx = 0; + +static void stack_add(struct topic_node *node, uint32_t val1, uint32_t val2) +{ + node_stack[stack_idx].node = node; + node_stack[stack_idx].val1 = val1; + node_stack[stack_idx].val2 = val2; + + stack_idx++; +} + +static inline struct _node_stack *stack_pop() +{ + if(0 == stack_idx) + return NULL; + + return node_stack + (--stack_idx); +} + +static inline bool is_node_retain(const struct topic_node *node) +{ + return (node->my_prop & TNODE_PROP_RETAIN_DATA)? true : false; +} + +static inline void node_retain_set(struct topic_node *node, bool retain) +{ + if(retain) + node->my_prop |= TNODE_PROP_RETAIN_DATA; + else + node->my_prop &= ~TNODE_PROP_RETAIN_DATA; +} + +static inline void node_qid_set(struct topic_node *node, uint8_t qid) +{ + node->my_prop &= ~QID_VMASK; + node->my_prop |= qid; +} + +static inline uint8_t node_qid_get(struct topic_node *node) +{ + return node->my_prop & QID_VMASK; +} + +static inline bool is_node_willed(const struct topic_node *node) +{ + return (NULL != node->will_cl)? true : false; +} + +static inline bool enrolls_plugin(const struct topic_node *node) +{ + return (PG_MAP_ALL_DFLTS != node->pg_map)? true : false; +} + +static struct topic_node *free_nodes = NULL; +static struct topic_node *root_node = NULL; + +static void node_reset(struct topic_node *node) +{ + node->dn_nhbr = NULL; + node->up_nhbr = NULL; + node->dn_hier = NULL; + node->up_hier = NULL; + + node->cl_map[0] = node->cl_map[1] = node->cl_map[2] = 0; + node->my_data = NULL; + node->my_dlen = 0; + node->will_cl = NULL; + node->my_prop = QID_VMASK; + + node->pg_map = PG_MAP_ALL_DFLTS; + node->toplen = 0; +} + +static void topic_node_init(void) +{ + int i = 0; + + for(i = 0; i < MAX_TOP_NODES; i++) { + struct topic_node *node = nodes + i; + + node_reset(node); + + node->dn_nhbr = free_nodes; + free_nodes = node; + } + + return; +} + +static struct topic_node *alloc_topic_node(void) +{ + struct topic_node *node = free_nodes; + if(NULL != node) { + free_nodes = node->dn_nhbr; + + node_reset(node); + } + + return node; +} + +static void free_node(struct topic_node *node) +{ + node_reset(node); + + node->dn_nhbr = free_nodes; + free_nodes = node; +} + +static bool is_end_of_subtop(const char *topstr, char const **next_subtop) +{ + bool rv = false; + + if('\0' == *topstr) { + *next_subtop = NULL; /* Reached end of topstr */ + rv = true; + } + + if('/' == *topstr) { + /* Next sub-topic is NULL, if a '\0' follows '/' */ + *next_subtop = *(topstr + 1)? topstr + 1 : NULL; + rv = true; + } + + return rv; +} + +static int32_t subtop_read(const char *topstr, char *subtop_buf, uint16_t buf_length, + char const **next_subtop) +{ + int32_t idx = 0, len = buf_length; + + *next_subtop = topstr; + + while(idx < (len - 1)) { + subtop_buf[idx++] = *topstr; + + if(is_end_of_subtop(topstr, next_subtop)) + break; + + topstr++; + } + + if(idx == len) { + USR_INFO("S: Fatal, insufficient buffer for sub-str\n\r"); + return -1; /* zero or insufficient buffer provided */ + } + + /* NUL terminated buffer: unless '\0' ended copy, make one */ + if('\0' == subtop_buf[idx - 1]) + idx--; /* A fix up */ + else + subtop_buf[idx] = '\0'; + + return idx; +} + +static +struct topic_node *alloc_node_subtop(const char *topstr, char const **next_subtop) +{ + uint16_t len = 0; + struct topic_node *node = alloc_topic_node(); + if(NULL == node) + return NULL; + + len = subtop_read(topstr, node->subtop, MAX_SUBTOP_LEN, next_subtop); + if(len <= 0) { + free_node(node); + return NULL; + } + + node->toplen = len; + + return node; +} + +/* Returns true if string 'subtop' is part of string 'topstr' otherwise false. + Additionally, on success, pointer to next subtopic in 'topstr' is provided. +*/ +bool is_first_subtop(const char *subtop, const char *topstr, char const **next_subtop) +{ + while(*subtop == *topstr) { + if(is_end_of_subtop(topstr, next_subtop)) + return true; + + subtop++; + topstr++; + } + + return false; +} + +/* Find a topic node in neighbour list that matches first subtopic in 'topstr'. + Additionally, on success, pointer to next subtopic in 'topstr' is provided. +*/ +struct topic_node *subtop_nhbr_node_find(const struct topic_node *root_nh, + const char *topstr, + char const **next_subtop) +{ + /* This routine does not make use of 'next_subtop' */ + while(root_nh) { + if(true == is_first_subtop(root_nh->subtop, topstr, next_subtop)) + break; + + root_nh = root_nh->dn_nhbr; + } + + return (struct topic_node*) root_nh;/* Bad: const from poiner removed */ +} + +/* Find a topic node in neighbour list that matches the given 'subtop' string */ +struct topic_node *nhbr_node_find(const struct topic_node *root_nh, + const char *subtop) +{ + const char *next_subtop = NULL; + + return subtop_nhbr_node_find(root_nh, subtop, &next_subtop); +} + +/* Find leaf node of branch-combo that matches complete 'topstr'. + Modus Operandi: For each sub topic in 'topstr', go across neighbour list, + then for matching neighbour node, make its child node as root of neighbour + list for another iteration for next sub topic. +*/ +/* Complex */ +static struct topic_node *lowest_node_find(const char *topstr, bool *flag_nh, + char const **next_subtop) +{ + struct topic_node *root_nh = root_node, *node = NULL; + + *next_subtop = topstr; + *flag_nh = false; + + while(root_nh) { + node = subtop_nhbr_node_find(root_nh, topstr, next_subtop); + if(NULL == node) { /* Partial or no match */ + *flag_nh = true; + node = root_nh; + break; + } + + /* NULL '*next_subtop' indicates a complete match */ + if(NULL == (*next_subtop)) + break; + + root_nh = node->dn_hier; + topstr = *next_subtop; + } + + return node; +} + +struct topic_node *leaf_node_find(const char *topstr) +{ + const char *next_subtop; + bool flag_nh; + struct topic_node *node = lowest_node_find(topstr, &flag_nh, + &next_subtop); + + return (NULL == next_subtop)? node : NULL; +} + +static void try_node_delete(struct topic_node *node); /* Forward declaration */ + +/* Create 'child only' hierarchy of nodes to hold all sub topics in 'topstr'. + The routine returns a start node of hierarchy and also provides leaf node. +*/ +static +struct topic_node *hier_nodes_create(const char *topstr, struct topic_node **leaf) +{ + struct topic_node *base = NULL, *node = NULL, *prev = NULL; + const char *next_subtop = NULL; + + do { + node = alloc_node_subtop(topstr, &next_subtop); + + if(NULL == node) { + /* Allocation of a node failed, free up + the previous allocations, if any, in + the hierarchy that was being created */ + if(prev) + try_node_delete(prev); + + base = NULL; + break; + } + + if(NULL == prev) { + // prev = node; + base = node; /* First node of hierarchy */ + } else { + prev->dn_hier = node; + node->up_hier = prev; + // prev = node; + } + + prev = node; + topstr = next_subtop; + + } while(next_subtop); + + *leaf = node; + + DBG_INFO("S: Hierarchy of nodes created: Base: %s & Leaf: %s\n\r", + base? base->subtop : " ", (*leaf)? (*leaf)->subtop : " "); + + return base; +} + +static void install_nhbr_node(struct topic_node *base, struct topic_node *node) +{ + while(base->dn_nhbr) { + base = base->dn_nhbr; + } + + base->dn_nhbr = node; + node->up_nhbr = base; + + DBG_INFO("S: %s added as a neighbour to %s\n\r", + node->subtop, base->subtop); +} + +static void set_up_hier_nodes(struct topic_node *up_hier, + struct topic_node *dn_hier) +{ + up_hier->dn_hier = dn_hier; + dn_hier->up_hier = up_hier; + + DBG_INFO("%s added as DN HIER to %s \n\r", + dn_hier->subtop, up_hier->subtop); +} + +static inline void install_topic_root_node(struct topic_node *node) +{ + root_node = node; +} + +/* Create or update tree to create branch-combo to refer to 'topstr'. + Modus operandi: + +*/ +struct topic_node *topic_node_create(const char *topstr) +{ + struct topic_node *base, *node, *leaf; + const char *next_subtop = topstr; + bool flag_nh; + + base = lowest_node_find(topstr, &flag_nh, &next_subtop); + if(NULL == next_subtop) + return base; /* Found exact match */ + + /* Control reaches here, if either no or partial branch-combo has been + found for 'topstr'. Now, let's create remaining branches for string + 'next_subtop' and assign them to appropriately to topic tree. + */ + DBG_INFO("S: Creating Hierarchy for %s\n\r", next_subtop); + + node = hier_nodes_create(next_subtop, &leaf); + if(node) { + if(NULL == base) + install_topic_root_node(node); + else if(flag_nh) + install_nhbr_node(base, node); + else + set_up_hier_nodes(base, node); + + return leaf; + } + + return NULL; +} + +static bool can_delete_node(const struct topic_node *node) +{ + if(node->up_nhbr && node->up_hier) { + USR_INFO("S: fatal, node w/ up-nhbr & up-hier.\n\r"); + return false; + } + + if(node->dn_nhbr || + node->dn_hier) + return false; + + return true; +} + +static struct topic_node *node_delete(struct topic_node *node) +{ + struct topic_node *next = NULL; + + if(false == can_delete_node(node)) + return NULL; + + if(node->up_nhbr) { + node->up_nhbr->dn_nhbr = NULL; + next = node->up_nhbr; + } + + if(node->up_hier) { + node->up_hier->dn_hier = NULL; + next = node->up_hier; + } + + if((NULL == node->up_nhbr) && + (NULL == node->up_hier)) + root_node = NULL; + + DBG_INFO("S: Deleted node %s\n\r", node->subtop); + + free_node(node); + + return next; +} + +struct topbuf_desc { + char *buffer; + uint16_t maxlen; + uint16_t offset; +}; + +static bool topbuf_add(struct topbuf_desc *buf_desc, const struct topic_node *node) +{ + const char *next_subtop; + char *buf = buf_desc->buffer + buf_desc->offset; + uint16_t len = buf_desc->maxlen - buf_desc->offset; + + int32_t rv = subtop_read(node->subtop, buf, len, &next_subtop); + if(rv < 0) + return false; + + if(NULL != next_subtop) { + USR_INFO("S: topstr_add fatal, bad subtop.\n\r"); + return false; + } + + buf_desc->offset += rv; + + return true; +} + +static bool topbuf_cpy(struct topbuf_desc *buf_desc, const char *subtop) +{ + const char *next_subtop; + char *buf = buf_desc->buffer + buf_desc->offset; + uint16_t len = buf_desc->maxlen - buf_desc->offset; + + int32_t rv = subtop_read(subtop, buf, len, &next_subtop); + if(rv < 0) + return false; + + if(NULL != next_subtop) { + USR_INFO("S: topstr_copy fatal, bad subtop.\n\r"); + return false; + } + + buf_desc->offset += rv; + + return true; +} + +static bool has_a_wildcard(const struct topic_node *node) +{ + const char *str = node->subtop; + while('\0' != *str) { + if(('+' == *str) || ('#' == *str)) + return true; + + str++; + } + + return false; +} + +static bool is_node_SUB_subtop(const struct topic_node *node, const char *subtop) +{ + if(false == has_a_wildcard(node)) + return ((0 == strcmp(node->subtop, subtop)) || + (node->dn_hier && (0 == strcmp("+/", subtop))) || + (!node->dn_hier && (0 == strcmp("+", subtop))))? + true : false; + + return false; +} + +/* Search node tree, created by PUB retention, for the branch combo, whose + absolute sub-topic sequence 'matches', in entirety, the topic, which is + being subscribed. The 'match' criteria is met either through the + wildcard sub-topics or exact compare. + + The hierarchical search starts at the specified 'base' node and ends at + the leaf node. If the sequence of 'base-to-leaf' sub-topics 'match' the + 'topSUB', then the leaf node is returned, otherwise a NULL is returned. + + As part of hierarchical search, neighbouring nodes, if any, are logged + for subsequent iteration of the routine. +*/ +static struct topic_node *pub_hier_search(const char *topSUB, + const struct topic_node *base, + struct topbuf_desc *mk_pubtop) +{ + const struct topic_node *node = base, *prev = NULL; + const char *next_subtop = topSUB; + char subtop[MAX_SUBTOP_LEN]; + + while(next_subtop && node) { + if(subtop_read(topSUB, subtop, MAX_SUBTOP_LEN, + &next_subtop) <= 0) + break; + + if(node->dn_nhbr) + stack_add(node->dn_nhbr, (uint32_t)topSUB, mk_pubtop->offset); + + if(false == is_node_SUB_subtop(node, subtop)) + break; /* Node doesn't form part of published topic */ + + if(false == topbuf_add(mk_pubtop, node)) + break; + + prev = node; + node = node->dn_hier; + + topSUB = next_subtop; + } + + if(NULL != next_subtop) + node = NULL; + else + node = prev; + + return (struct topic_node *)node; +} + +static bool is_node_PUB_subtop(const struct topic_node *node, + const char *subtop, bool endtop) +{ + /* Assumes that subtop hasn't got any wildcard characater */ + return ((0 == strcmp(subtop, node->subtop)) || + (!endtop && (0 == strcmp("+/", node->subtop))) || + (endtop && (0 == strcmp("+", node->subtop))))? + true : false; +} + +/* Search node tree, created by subscriptions, for the branch combo, whose + sub-topic sequence 'matches', in entirety, the absolute topic, to which + data has been published. The 'match' criteria is met either through the + wildcard sub-topics or exact compare. + + The hierarchical search starts at the specified 'base' node and ends at + the leaf node. If the sequence of 'base-to-leaf' sub-topics 'match' the + 'topPUB', then the leaf node is returned, otherwise a NULL is returned. + + As part of hierarchical search, neighbouring nodes, if any, are logged + for subsequent iteration of the routine. +*/ +static struct topic_node *SUB_leaf_search(const char *topPUB, + const struct topic_node *base) +{ + const struct topic_node *node = base, *prev = NULL; + const char *next_subtop = topPUB; + char subtop[MAX_SUBTOP_LEN]; + + while(next_subtop && node) { + if(subtop_read(topPUB, subtop, MAX_SUBTOP_LEN, + &next_subtop) <= 0) + break; + + if(node->dn_nhbr) + stack_add(node->dn_nhbr, (uint32_t)topPUB, 0); + + if(0 == strcmp("#", node->subtop)) + goto SUB_leaf_search_exit1; + + if(false == is_node_PUB_subtop(node, subtop, !next_subtop)) + break; /* Node doesn't form part of published topic */ + + prev = node; + node = node->dn_hier; + + topPUB = next_subtop; + } + + if(NULL != next_subtop) + node = NULL; + else + node = prev; + +SUB_leaf_search_exit1: + return (struct topic_node*) node; // Bad +} + + +/*------------------------------------------------------------------------- + * MQTT Routines + *------------------------------------------------------------------------- + */ + +#define WBUF_LEN MQP_SERVER_RX_LEN /* Assignment to ease implementation */ +static char work_buf[WBUF_LEN]; + +static void try_node_delete(struct topic_node *node) +{ + while(node) { + + if(is_node_retain(node) || + is_node_willed(node) || + enrolls_plugin(node) || + node->cl_map[0] || + node->cl_map[1] || + node->cl_map[2]) + break; + + node = node_delete(node); + } +} + +/* Move this to a common file */ +static void pub_msg_send(const struct utf8_string *topic, const uint8_t *data_buf, + uint32_t data_len, uint8_t fh_flags, uint32_t cl_map) +{ + enum mqtt_qos qos = ENUM_QOS(fh_flags); + struct mqtt_packet *mqp = NULL; + + mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len); + if(NULL == mqp) + return; + + if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) || + (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) { + mqp_free(mqp); + return; + } + + mqp_prep_fh(mqp, fh_flags); + + if(cl_map) + cl_pub_dispatch(cl_map, mqp); + + return; +} + +static struct topic_node *SUB_node_create(const char *topSUB, uint8_t qid, void *usr_cl) +{ + struct topic_node *leaf = topic_node_create(topSUB); + if(leaf) { + uint8_t j = 0; + uint32_t map = cl_bmap_get(usr_cl); + + for(j = 0; j < 3; j++) + /* Client: clear QOS of existing sub, if any */ + leaf->cl_map[j] &= ~map; + + leaf->cl_map[qid] |= map; + + cl_sub_count_add(usr_cl); + } + + return leaf; +} + +static uint8_t proc_pub_leaf(struct topic_node *leaf, const struct utf8_string *topic, + enum mqtt_qos qos, void *usr_cl) +{ + uint8_t qid = QOS_VALUE(qos); + + if(is_node_retain(leaf)) { + /* If it is an earlier created topic w/ retained + data, then pick lower of the two QOS(s) */ + qid = MIN(node_qid_get(leaf), qid); + + /* Publish the retained data to this client */ + pub_msg_send(topic, leaf->my_data, leaf->my_dlen, + MAKE_FH_FLAGS(false, qid, true), + cl_bmap_get(usr_cl)); + } + + return qid; +} + +/* Multi-level wild-card subscription - search of all of the tree i.e. + "no topic" ('no_top') in particular and publish it to client, if + the hierarchy has no wild-card node. */ +static +uint8_t proc_pub_hier_no_top(struct topic_node *base, struct topbuf_desc *mk_pubtop, + enum mqtt_qos qos, void *usr_cl) +{ + struct topic_node *node = base, *leaf = NULL; + uint8_t ack = QOS_VALUE(qos); + + /* 1. Find the leaf node of a non wildcard branch-combo */ + while(node) { + if(node->dn_nhbr) + stack_add(node->dn_nhbr, 0, mk_pubtop->offset); + + if(has_a_wildcard(node)) + break; + + if(false == topbuf_add(mk_pubtop, node)) + break; + + leaf = node; + node = node->dn_hier; + } + + /* A non NULL value of 'node' would indicate a hierarchy with a + wildcard (sub-)topic (the 'node') - not suitable for PUB. */ + + if(NULL == node) { + /* 2. Send retained data, if any, to SUB Client */ + struct utf8_string topic = {mk_pubtop->buffer, + mk_pubtop->offset}; + + /* In this version, at this juncture, the 'leaf' + will not be NULL. Nevertheless a check (for + the sake of static analytical tools).........*/ + if(leaf) + ack = proc_pub_leaf(leaf, &topic, qos, usr_cl); + } + + return ack; +} + +/* Multi-level wild-card subscription - search of all of the tree i.e. + "no topic" ('no_top') in particular and publish it to client, if + a hierarchy in the tree has no wild-card node. */ +static +uint8_t proc_pub_tree_no_top(struct topic_node *base, struct topbuf_desc *mk_pubtop, + enum mqtt_qos qos, void *usr_cl) +{ + uint32_t stack_ref = stack_idx; + uint8_t min = QOS_VALUE(qos); + + if(base != NULL) + stack_add(base, 0, mk_pubtop->offset); + + while(stack_ref < stack_idx) { + struct _node_stack *stack = stack_pop(); + uint8_t ack; + + mk_pubtop->offset = (uint16_t) stack->val2; + + ack = proc_pub_hier_no_top(stack->node, mk_pubtop, qos, usr_cl); + if(ack < min) + min = ack; + } + + return min; +} + +static uint8_t proc_pub_hier_SUBtop(const char *topSUB, const struct topic_node *base, + struct topbuf_desc *mk_pubtop, enum mqtt_qos qos, + void *usr_cl) +{ + struct topic_node *leaf = pub_hier_search(topSUB, base, mk_pubtop); + uint8_t min = QOS_VALUE(qos); + + if(leaf) { + struct utf8_string topic = {mk_pubtop->buffer, + mk_pubtop->offset}; + + min = proc_pub_leaf(leaf, &topic, qos, usr_cl); + } + + return min; +} + +/* used by sl or no wc */ +static uint8_t proc_pub_tree_SUBtop(const char *topSUB, struct topic_node *base, + struct topbuf_desc *mk_pubtop, + enum mqtt_qos qos, void *usr_cl) +{ + uint32_t stack_ref = stack_idx; + uint8_t min = QOS_VALUE(qos); + + if(NULL != base) + stack_add(base, (uint32_t)topSUB, mk_pubtop->offset); + + while(stack_ref < stack_idx) { + struct _node_stack *stack = stack_pop(); + uint8_t ack; + + mk_pubtop->offset = stack->val2; + ack = proc_pub_hier_SUBtop((char*)stack->val1, stack->node, + mk_pubtop, qos, usr_cl); + + if(ack < min) + min = ack; + } + + return min; +} + +static +uint8_t proc_sub_ml_wc_hier(const char *grandpa_topSUB, char *parent_subtop, + struct topic_node *base, struct topbuf_desc *mk_pubtop, + enum mqtt_qos qos, void *usr_cl) +{ + uint8_t min = QOS_VALUE(qos), ack = QFL_VALUE; + char *subtop = NULL; + + /* 1. Search hier node for 'grandpa' and if found, get to parent level */ + if('\0' != grandpa_topSUB[0]) { + struct topic_node *leaf = pub_hier_search(grandpa_topSUB, base, + mk_pubtop); + if(NULL == leaf) + return min; + + base = leaf->dn_hier; /* nhbr root at parent level */ + } + + /* 2. If present, process parent as a leaf and get its down hierarchy */ + subtop = parent_subtop; + if(('\0' != subtop[0]) && ('+' != subtop[0]) && ('/' != subtop[0])) { + uint16_t offset = mk_pubtop->offset; /* Refer to grandpa's pubtop */ + uint16_t sublen = 0; + + while(subtop[sublen]){sublen++;} + + ack = proc_pub_tree_SUBtop(subtop, base, mk_pubtop, qos, usr_cl); + mk_pubtop->offset = offset; /* Restores grandpa's pubtop */ + + subtop[sublen] = '/'; /* Make parent's hier subtop */ + + base = nhbr_node_find(base, subtop); + if(base) + base = topbuf_cpy(mk_pubtop, subtop)? + base->dn_hier : NULL; + subtop[sublen] = '\0'; /* Get back, original subtop */ + } + + min = MIN(min, ack); + /* 3. Process '#' WC by walking thru entire sub-tree of parent 'base' */ + if(NULL != base) + ack = proc_pub_tree_no_top(base, mk_pubtop, qos, usr_cl); + + return MIN(min, ack); +} + +static uint8_t proc_sub_ml_wc_tree(char *grandpa_topSUB, char *parent_subtop, + struct topic_node *base, + enum mqtt_qos qos, void *usr_cl) +{ + struct topbuf_desc mk_pubtop = {work_buf, WBUF_LEN, 0 /* offset */}; + uint32_t stack_ref = stack_idx; + uint8_t min = QOS_VALUE(qos); + + if(NULL != base) + stack_add(base, (uint32_t)grandpa_topSUB, mk_pubtop.offset); + + while(stack_ref < stack_idx) { + struct _node_stack *stack = stack_pop(); + uint8_t ack; + + mk_pubtop.offset = stack->val2; + ack = proc_sub_ml_wc_hier((char*)stack->val1, parent_subtop, + stack->node, &mk_pubtop, qos, usr_cl); + if(ack < min) + min = ack; + } + + return min; +} + +static uint8_t ml_wc_nodes_create(char *parent_topSUB, uint16_t toplen, uint8_t qid, void *usr_cl) +{ + struct topic_node *parent_leaf = NULL; + + if('\0' != parent_topSUB[0]) { + parent_leaf = SUB_node_create(parent_topSUB, qid, usr_cl); + if(NULL == parent_leaf) + return QFL_VALUE; + } + + /* Get the topic SUB to it's original state */ + if(toplen > 1) parent_topSUB[toplen - 2] = '/'; + parent_topSUB[toplen - 1] = '#'; + + if(NULL == SUB_node_create(parent_topSUB, qid, usr_cl)) { + /* Failed to create WC topic, so delete parent as well. + In this revision, 'parent_leaf' will not be a 'NULL' + at this juncture, nevertheless a check (for tools) */ + if(parent_leaf) + node_delete(parent_leaf); + + return QFL_VALUE; + } + + return qid; +} + +/* Process Multi-level Wildcard Topic SUBSCRIBE */ +static uint8_t proc_sub_ml_wildcard(char *topSUB, uint16_t toplen, enum mqtt_qos qos, + void *usr_cl) +{ + uint16_t len = 0, limit = MIN(toplen, MAX_SUBTOP_LEN); + char subtop[MAX_SUBTOP_LEN], *ptr; + uint8_t min = QOS_VALUE(qos); + + /* 'topSUB': Need to create grandpa topic and parent-subtopic */ + topSUB[toplen - 1] = '\0'; /* Remove '#' */ + if(toplen > 1) /* Remove '/' */ + topSUB[toplen - 2] = '\0'; + + do { /* Do processing to get parent sub-topic into buffer */ + if('/' == topSUB[toplen - len - 1]) + break; /* found '/' */ + + len++; /* Copy parent characters */ + subtop[MAX_SUBTOP_LEN - len] = topSUB[toplen - len]; + } while(len < limit); + + if((toplen > len) && ('/' != topSUB[toplen - len - 1])) + return QFL_VALUE; /* Bad Length */ + + topSUB[toplen - len] = '\0'; /* End of grand-pa's topic name */ + ptr = subtop + MAX_SUBTOP_LEN - len; /* Parent's leaf subtop */ + min = proc_sub_ml_wc_tree(topSUB, ptr, root_node, qos, usr_cl); + + /* Make branch-combo to complete processing of parent' topic */ + strcpy(topSUB + toplen - len, ptr); // topSUB[toplen - len] = *ptr; + + /* Create nodes for multi-level wildcard topic & it's parent */ + min = ml_wc_nodes_create(topSUB, toplen, min, usr_cl); + + return min; +} + +/* Process Single-level Wildcard or No Wild Card Topic SUBSCRIBE */ +static +uint8_t proc_sub_sl_or_no_wc(const char *topSUB, enum mqtt_qos qos, void *usr_cl) +{ + struct topbuf_desc mk_pubtop = {work_buf, WBUF_LEN, 0 /* offset */}; + uint8_t min = QOS_VALUE(qos); + + /* For single level wildcard or absolute topic, find PUB nodes */ + min = proc_pub_tree_SUBtop(topSUB, root_node, &mk_pubtop, qos, usr_cl); + + if(NULL == SUB_node_create(topSUB, min, usr_cl)) + min = QFL_VALUE; + + return min; +} + +static uint16_t proc_forward_slash(char *buf, uint16_t len) +{ + uint16_t i, j; + for(i = 1, j = 1; i < len; i++) { + char curr = buf[i]; + if(('/' == curr) && (buf[i - 1] == curr)) + continue; /* Drop consecutive '/' */ + + buf[j++] = curr; + } + + if((1 != j) && ('/' == buf[j - 1])) + j--; /* Topic can not end with a '/' */ + + buf[j] = '\0'; + + return j; +} + +static inline bool is_valid_char_order(char prev, char curr) +{ + return ((('/' != prev) && ('+' == curr)) || + (('+' == prev) && ('/' != curr)) || + (('/' != prev) && ('#' == curr)) || + (('#' == prev)))? false : true; +} + + +static bool is_valid_SUB_top(const char *buf, uint16_t len) +{ + char prev, curr; + uint16_t i = 0; + + if((0 == len) || ('\0' == *buf)) + return false; + + curr = buf[0]; + for(i = 1; (i < len) && ('\0' != curr); i++) { + prev = curr; + curr = buf[i]; + + if(false == is_valid_char_order(prev, curr)) + break; + } + + return (i == len)? true : false; +} + +static bool proc_sub_msg_rx(void *usr_cl, const struct utf8_strqos *qos_topics, + uint32_t n_topics, uint16_t msg_id, uint8_t *ack) +{ + int32_t i = 0; + for(i = 0; i < n_topics; i++) { + const struct utf8_strqos *qos_top = qos_topics + i; + enum mqtt_qos qos = qos_top->qosreq; + char *buf = (char*)qos_top->buffer; + uint16_t len = qos_top->length; + + /* Remove zero-topics and trailing '/' from SUB top */ + len = proc_forward_slash(buf, len); + ack[i] = QFL_VALUE; + if(false == is_valid_SUB_top(buf, len)) + continue; + + buf[len] = '\0'; /* Dirty trick, cheeky one */ + + ack[i] = ('#' == buf[len - 1])? + proc_sub_ml_wildcard(buf, len, qos, usr_cl) : + proc_sub_sl_or_no_wc(buf, qos, usr_cl); + + DBG_INFO("SUB Topic%-2d %s is ACK'ed w/ 0x%02x\n\r", + i + 1, buf, ack[i]); + } + + return true; /* Send SUB-ACK and do not close network */ +} + +static +bool proc_sub_msg_rx_locked(void *usr_cl, const struct utf8_strqos *qos_topics, + uint32_t n_topics, uint16_t msg_id, uint8_t *ack) +{ + return proc_sub_msg_rx(usr_cl, qos_topics, n_topics, msg_id, ack); +} + +static void leaf_un_sub(struct topic_node *leaf, void *usr_cl) +{ + uint8_t j = 0; + uint32_t map = cl_bmap_get(usr_cl); + + for(j = 0; j < 3; j++) { + /* Client: clear QOS of existing sub, if any */ + if(0 == (leaf->cl_map[j] & map)) + continue; + + leaf->cl_map[j] &= ~map; + cl_sub_count_del(usr_cl); + + try_node_delete(leaf); + + break; + } +} + +static bool proc_un_sub_msg(void *usr_cl, const struct utf8_string *topics, + uint32_t n_topics, uint16_t msg_id) +{ + uint32_t i = 0; + + for(i = 0; i < n_topics; i++) { + const struct utf8_string *topic = topics + i; + struct topic_node *leaf = NULL; + uint16_t len = topic->length; + + /* The maximum length of 'work_buf' is same as that of RX buffer + in the PKT-LIB. Therefore, the WBUF_LEN is not being checked + against the length of the topic (a constituent of RX buffer). + */ + strncpy(work_buf, topic->buffer, topic->length); + work_buf[len] = '\0'; + + if('#' == work_buf[len - 1]) { /* Multi-level Wildcard */ + work_buf[len - 1] = '\0'; + if(len > 1) + work_buf[len - 2] = '\0'; + + leaf = leaf_node_find(work_buf); + if(leaf) + leaf_un_sub(leaf, usr_cl); + + if(len > 1) + work_buf[len - 2] = '/'; + work_buf[len - 1] = '#'; + } + + leaf = leaf_node_find(work_buf); + if(leaf) + leaf_un_sub(leaf, usr_cl); + } + + return true; /* Do not close network */ +} + +static +bool proc_un_sub_msg_locked(void *usr_cl, const struct utf8_string *topics, + uint32_t n_topics, uint16_t msg_id) +{ + return proc_un_sub_msg(usr_cl, topics, n_topics, msg_id); +} + +static void +leaf_msg_send(const struct topic_node *leaf, const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, bool dup, enum mqtt_qos qos, + bool retain) +{ + uint8_t qid = 0, fh_fgs = 0; + + for(qid = 0; qid < 3; qid++) { + uint8_t map = leaf->cl_map[qid]; + fh_fgs = MAKE_FH_FLAGS(dup, MIN(qid, QOS_VALUE(qos)), retain); + + if(map) + pub_msg_send(topic, data_buf, data_len, fh_fgs, map); + } + + if(enrolls_plugin(leaf)) + plugin_publish(leaf->pg_map, topic, data_buf, data_len, + dup, qos, retain); + + return; +} + +static void node_data_set(struct topic_node *node, uint8_t *data, + uint32_t dlen, uint8_t qid, bool retain) +{ + node->my_data = data; + node->my_dlen = dlen; + + node_qid_set(node, qid); + node_retain_set(node, retain); + + return; +} + +static bool node_data_update(struct topic_node *node, bool drop_qid0, + const uint8_t *data_buf, uint32_t data_len, + uint8_t qid, bool retain) +{ +#define NODE_DATA_RESET_PARAMS NULL, 0, 0, false + + /* Assumes that caller has provided either reset or valid params */ + + uint8_t *data = NULL; + + if(node->my_dlen) + my_free(node->my_data); + + /* Watch out for assignment in 'if' statement - avoid such smarts */ + if((drop_qid0 && (0 == qid)) || (data_buf && !(data = (uint8_t*)my_malloc(data_len)))) { + node_data_set(node, NODE_DATA_RESET_PARAMS); + } else { + + if(data) + buf_wr_nbytes(data, data_buf, data_len); + + node_data_set(node, data, data_len, qid, retain); + } + + return ((!!data) ^ (!!data_len))? false : true; +} + +static inline bool is_wildcard_char(char c) +{ + return (('+' == c) || ('#' == c))? true : false; +} + +static int32_t pub_topic_read(const struct utf8_string *topic, char *buf, uint32_t len) +{ + uint32_t i = 0; + uint32_t toplen = topic->length; + + if(len < (toplen + 1)) + return -1; + + for(i = 0; i < toplen; i++) { + char c = topic->buffer[i]; + if(is_wildcard_char(c)) + return -1; /* Invalid: wildcard in PUB topic */ + + if('\0' == c) + return -1; /* Invalid: NUL char in PUB topic */ + + buf[i] = c; + } + + buf[i] = '\0'; + + return i; +} + +static +void proc_sub_tree_topPUB(const char *topPUB, const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, enum mqtt_qos qos, + bool retain) +{ + struct topic_node *leaf = NULL; + uint32_t stack_ref = stack_idx; + + if(NULL != root_node) + stack_add(root_node, (uint32_t)topPUB, 0 /* Not used */); + + while(stack_ref < stack_idx) { + struct _node_stack *stack = stack_pop(); + + /* Find leaf node of SUB that matches the PUB topic */ + leaf = SUB_leaf_search((char*)stack->val1, stack->node); + if(leaf) + leaf_msg_send(leaf, topic, data_buf, data_len, + false, qos, retain); + } +} + +static bool _proc_pub_msg_rx(void *usr_cl, const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, uint8_t msg_id, + enum mqtt_qos qos, bool retain) +{ + int32_t err = -1; + + /* Prior to msg processing, chk for topic or buffer errors */ + if((pub_topic_read(topic, work_buf, WBUF_LEN) <= 0) || + (proc_forward_slash(work_buf, topic->length) <= 0)) + goto _proc_pub_msg_rx_exit; + + /* If a valid MSG ID is specified for a QOS2 pkt, track it */ + err = -2; + if((msg_id) && + (MQTT_QOS2 == qos) && + (false == cl_mgmt_qos2_pub_rx_update(usr_cl, msg_id))) + goto _proc_pub_msg_rx_exit; + + /* Forward data to all subscribers of PUB topic in server */ + proc_sub_tree_topPUB(work_buf, topic, data_buf, + data_len, qos, false); + + err = 0; + if(retain) { + struct topic_node *leaf = topic_node_create(work_buf); + if((NULL == leaf) || + (false == node_data_update(leaf, true, data_buf, data_len, + QOS_VALUE(qos), retain))) + err = -3; /* Resources no more available */ + + if(leaf) + try_node_delete(leaf); + } + + _proc_pub_msg_rx_exit: + DBG_INFO("Processing of PUB message from %s (0x%08x) has %s (%d)\n\r", + usr_cl? "client" : "plugin", usr_cl? cl_bmap_get(usr_cl) : 0, + err? "failed" : "succeeded", err); + + return (err < 0)? false : true; +} + +static +bool proc_pub_msg_rx_locked(void *usr_cl, const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, uint16_t msg_id, + bool dup, enum mqtt_qos qos, bool retain) +{ + return _proc_pub_msg_rx(usr_cl, topic, data_buf, data_len, + msg_id, qos, retain); +} + +static int32_t utf8_str_rd(const struct utf8_string *utf8, char *buf, uint32_t len) +{ + if((NULL == utf8) || (utf8->length > (len - 1))) + return -1; + + buf_wr_nbytes((uint8_t*)buf, (uint8_t*)utf8->buffer, utf8->length); + buf[utf8->length] = '\0'; + + return utf8->length; +} + +#define CONN_FLAGS_WQID_GET(conn_flags) \ + ((conn_flags >> 3) & QID_VMASK) /* WILL QOS VAL */ + +static uint16_t proc_connect_rx(void *ctx_cl, uint8_t conn_flags, + struct utf8_string * const *utf8_vec, void **usr_cl) +{ + struct topic_node *leaf = NULL; + struct utf8_string *utf8 = NULL; + void *app_cl = NULL; + uint16_t utf8_len = 0; + uint16_t rv = plugin_connect(MQC_UTF8_CLIENTID(utf8_vec), + MQC_UTF8_USERNAME(utf8_vec), + MQC_UTF8_PASSWORD(utf8_vec), + &app_cl); + if(rv) + goto proc_connect_rx_exit1; /* Admin did not permit connection */ + + rv = CONNACK_RC_SVR_UNAVBL; /* Server (resource) unavailable */ + + utf8 = MQC_UTF8_WILL_TOP(utf8_vec); + if(utf8 && utf8->length) { + utf8_str_rd(utf8, work_buf, WBUF_LEN); + + leaf = topic_node_create(work_buf); + if(NULL == leaf) + goto proc_connect_rx_exit2; + + if(false == node_data_update(leaf, false, + (uint8_t*)MQC_WILL_MSG_BUF(utf8_vec), + MQC_WILL_MSG_LEN(utf8_vec), + CONN_FLAGS_WQID_GET(conn_flags), + conn_flags & WILL_RETAIN_VAL)) + goto proc_connect_rx_exit3; + } + + utf8 = MQC_UTF8_CLIENTID(utf8_vec); + if(utf8) + utf8_len = utf8_str_rd(utf8, work_buf, WBUF_LEN); + + rv = cl_connect_rx(ctx_cl, (conn_flags & CLEAN_START_VAL)? true : false, + utf8_len? work_buf : NULL, app_cl, leaf, usr_cl); + if(CONNACK_RC_REQ_ACCEPT == (rv & 0xFF)) { + if(leaf) + leaf->will_cl = *usr_cl; + + return rv; /* Connection successful */ + } + + if(leaf) + node_data_update(leaf, true, NODE_DATA_RESET_PARAMS); + + proc_connect_rx_exit3: try_node_delete(leaf); + proc_connect_rx_exit2: plugin_disconn(app_cl, true); + proc_connect_rx_exit1: + return rv; +} + +static +uint16_t proc_connect_rx_locked(void *ctx_cl, uint8_t conn_flags, + struct utf8_string * const *utf8_vec, void **usr_cl) +{ + return proc_connect_rx(ctx_cl, conn_flags, utf8_vec, usr_cl); +} + +static void session_hier_delete(struct topic_node *node, void *usr_cl) +{ + struct topic_node *prev = NULL; + uint32_t cl_map = cl_bmap_get(usr_cl); + + while(node) { + int32_t i = 0; + for(i = 0; i < 3; i++) { + if(node->cl_map[i] & cl_map) { + node->cl_map[i] &= ~cl_map; + cl_sub_count_del(usr_cl); + /* Client/Topic/QID 1-to-1 map */ + break; + } + } + + if(node->dn_nhbr) + stack_add(node->dn_nhbr, 0, 0); + + prev = node; + node = node->dn_hier; + } + + if(prev) + try_node_delete(prev); +} + +void session_tree_delete(void *usr_cl) +{ + uint32_t stack_ref = stack_idx; + + if(NULL != root_node) + stack_add(root_node, 0, 0); + + while(stack_ref < stack_idx) { + struct _node_stack *stack = stack_pop(); + session_hier_delete(stack->node, usr_cl); + } +} + +static void proc_client_will(struct topic_node *leaf) +{ + uint32_t wbuf_len = WBUF_LEN - 1; /* Make space for '\0' in wbuf */ + uint32_t offset = wbuf_len; + struct utf8_string topic; + struct topic_node *node; + + work_buf[offset] = '\0'; /* Ensures wbuf is NUL terminated */ + node = leaf; + + /* Prepare a topic string by walking back from leaf to root */ + do { + if(offset < node->toplen) + return; + + offset -= node->toplen; + strncpy(work_buf + offset, node->subtop, node->toplen); + + while(node->up_nhbr) + node = node->up_nhbr; + + node = node->up_hier; + + } while(node); + + topic.buffer = work_buf + offset; + topic.length = wbuf_len - offset; + +#define MK_QOS_ENUM(qid) ((enum mqtt_qos)(qid & QID_VMASK)) + + proc_sub_tree_topPUB((char*)topic.buffer, + &topic, leaf->my_data, leaf->my_dlen, + MK_QOS_ENUM(node_qid_get(leaf)), + is_node_retain(leaf)); + +} + +static void on_cl_net_close(void *usr_cl, bool due2err) +{ + struct topic_node *leaf = NULL; + void *app_cl = NULL; + + /* See if client has a WILL that it intends to broadcast */ + leaf = (struct topic_node*) cl_will_hndl_get(usr_cl); + if(NULL != leaf) { + if(usr_cl != leaf->will_cl) + return; /* Mismatch: should never happen */ + + if(due2err) + proc_client_will(leaf); /* pls broadcast */ + + /* Network is closing, so cleanup WILL msg store */ + node_data_update(leaf, true, NODE_DATA_RESET_PARAMS); + leaf->will_cl = NULL; + try_node_delete(leaf); + } + + /* If not needed for future, delete session info */ + if(cl_can_session_delete(usr_cl)) + session_tree_delete(usr_cl); + + /* Inform app that client has been disconnected */ + app_cl = cl_app_hndl_get(usr_cl); + plugin_disconn(app_cl, due2err); + + cl_on_net_close(usr_cl); +} + +static +void on_cl_net_close_locked(void *usr_cl, bool due2err) +{ + on_cl_net_close(usr_cl, due2err); + return; +} + +static void on_connack_send(void *usr_cl, bool clean_session) +{ + /* If asserted, then need to start w/ clean state */ + if(clean_session) + session_tree_delete(usr_cl); + + cl_on_connack_send(usr_cl, clean_session); + + return; +} + +static +void on_connack_send_locked(void *usr_cl, bool clean_session) +{ + on_connack_send(usr_cl, clean_session); + return; +} + +static +bool proc_notify_ack_locked(void *usr_cl, uint8_t msg_type, uint16_t msg_id) +{ + return cl_notify_ack(usr_cl, msg_type, msg_id); +} + +static int32_t proc_topic_enroll(uint8_t pg_id, const struct utf8_string *topic, + enum mqtt_qos qos) +{ + struct topic_node *leaf = NULL; + uint16_t len = 0; + + if((NULL == topic) || (NULL == topic->buffer) || (0 == topic->length)) + return -1; + + if(WBUF_LEN < (topic->length + 1)) + return -2; + + len = topic->length; + strncpy(work_buf, topic->buffer, len); + work_buf[len] = '\0'; + + leaf = topic_node_create(work_buf); + if(NULL == leaf) + return -3; + + PG_MAP_VAL_SETUP(leaf->pg_map, QOS_VALUE(qos), pg_id); + + return 0; +} + +static +int32_t proc_topic_enroll_locked(uint8_t pg_id, const struct utf8_string *topic, + enum mqtt_qos qos) +{ + int32_t rv = 0; + + MUTEX_LOCKIN(); + rv = proc_topic_enroll(pg_id, topic, qos); + MUTEX_UNLOCK(); + + return rv; +} + +static int32_t proc_topic_cancel(uint8_t pg_id, const struct utf8_string *topic) +{ + struct topic_node *leaf = NULL; + uint16_t len = 0; + + if(NULL == topic) + return -1; + + if(WBUF_LEN < (topic->length + 1)) + return -2; + + len = topic->length; + strncpy(work_buf, topic->buffer, len); + work_buf[len] = '\0'; + + leaf = leaf_node_find(work_buf); + if(NULL == leaf) + return -2; + + PG_MAP_VAL_RESET(leaf->pg_map, pg_id); + + try_node_delete(leaf); + + return 0; +} + +static +int32_t proc_topic_cancel_locked(uint8_t pg_id, const struct utf8_string *topic) +{ + int32_t rv = 0; + + MUTEX_LOCKIN(); + rv = proc_topic_cancel(pg_id, topic); + MUTEX_UNLOCK(); + + return rv; +} + +static +int32_t proc_app_pub_send_locked(const struct utf8_string *topic, const uint8_t *data_buf, + uint32_t data_len, enum mqtt_qos qos, bool retain) +{ + bool rv; + + MUTEX_LOCKIN(); + /* Received from application, process topic for distribution */ + rv = _proc_pub_msg_rx(NULL, topic, data_buf, data_len, + 0x00, qos, retain); + MUTEX_UNLOCK(); + + return rv? (int32_t)data_len : -1; +} + +int32_t mqtt_server_init(const struct mqtt_server_lib_cfg *lib_cfg, + const struct mqtt_server_app_cfg *app_cfg) +{ + /* If mutex is specified, then the following set of callbacks + are invoked in the locked state - enumerated by 'locked' */ + struct mqtt_server_msg_cbs pkts_cbs = {proc_connect_rx_locked, + proc_sub_msg_rx_locked, + proc_un_sub_msg_locked, + proc_pub_msg_rx_locked, + proc_notify_ack_locked, + on_cl_net_close_locked, + on_connack_send_locked}; + + struct plugin_core_msg_cbs core_cbs = {proc_topic_enroll_locked, + proc_topic_cancel_locked, + proc_app_pub_send_locked}; + + util_params_set(lib_cfg->debug_printf, + lib_cfg->mutex, + lib_cfg->mutex_lockin, + lib_cfg->mutex_unlock); + + USR_INFO("Version: Server LIB %s, Common LIB %s.\n\r", + MQTT_SERVER_VERSTR, MQTT_COMMON_VERSTR); + + topic_node_init(); + + cl_mgmt_init(); + + plugin_init(&core_cbs); + + mqtt_server_lib_init(lib_cfg, &pkts_cbs); + + return 0; +} + +}//namespace mbed_mqtt