TI's MQTT Demo with freertos CM4F

Dependencies:   mbed

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers server_core.cpp Source File

server_core.cpp

00001 /******************************************************************************
00002 *
00003 *   Copyright (C) 2014 Texas Instruments Incorporated
00004 *
00005 *   All rights reserved. Property of Texas Instruments Incorporated.
00006 *   Restricted rights to use, duplicate or disclose this code are
00007 *   granted through contract.
00008 *
00009 *   The program may not be used without the written permission of
00010 *   Texas Instruments Incorporated or against the terms and conditions
00011 *   stipulated in the agreement under which this program has been supplied,
00012 *   and under no circumstances can it be used with non-TI connectivity device.
00013 *
00014 ******************************************************************************/
00015 
00016 #include "client_mgmt.h"
00017 #include "server_util.h"
00018 #include "server_pkts.h"
00019 #include "server_plug.h"
00020 #include "server_core.h"
00021 
00022 namespace mbed_mqtt {
00023 
/* Size, in bytes, of the buffer that holds one sub-topic (one level of a topic
   string) together with its NUL terminator. May be overridden at build time. */
#ifdef CFG_SR_MAX_SUBTOP_LEN
#define MAX_SUBTOP_LEN CFG_SR_MAX_SUBTOP_LEN
#else
#define MAX_SUBTOP_LEN 32
#endif

/* Number of entries in the statically allocated pool of topic nodes.
   May be overridden at build time. */
#ifdef CFG_SR_MAX_TOPIC_NODE
#define MAX_TOP_NODES  CFG_SR_MAX_TOPIC_NODE
#else
#define MAX_TOP_NODES  32
#endif
00035 
00036 /* A topic name (or string or tree or hierarchy) is handled as a series of nodes.
00037    A 'topic node' refers to a level in the topic tree or hierarchy and it stores
00038    the associated part of the topic string at that level. For example, a topic
00039    having a name "abc/x/1" will have three nodes for each of the subtopics, viz,
00040    "abc/", "x/" and "1". Further, the association between the nodes is indicated
00041    as following.
00042 
00043           -- down link hierarchy -->      -- down link hierarchy --> 
00044    "abc/"                            "x/"                            "1"
00045           <-- up   link hierarchy --      <-- up   link hierarchy --
00046 
00047    To extend this example, another topic having a name "abc/y/2" will be managed
00048    as following.
00049   
00050           -- down link hierarchy -->      -- down link hierarchy --> 
00051    "abc/"                            "x/"                            "1"
00052           <-- up   link hierarchy --      <-- up   link hierarchy --
00053                                     |   ^
00054                                     |   |
00055                   down link neighbour   up link neighbour
00056                                     |   |
00057                                     V   |
00058                                           -- down link hierarchy -->
00059                                      "y/"                            "2" 
00060                                           <-- up   link hierarchy --
00061 
00062    To reduce wasted in byte alignments, the structure has been hand packed.
00063 */
00064 static struct topic_node {
00065 
00066         struct topic_node *dn_nhbr;   /* Down Link Neighbour node */
00067         struct topic_node *up_nhbr;   /* Up   Link Neighbour node */
00068 
00069         struct topic_node *dn_hier;   /* Down Link Hierarchy node */
00070         struct topic_node *up_hier;   /* Up   Link Hierarchy node */
00071 
00072         uint32_t                cl_map[3]; /* Subscribers for each QOS */
00073 
00074         uint8_t                *my_data;   /* Leaf node: retained data */
00075         uint32_t                my_dlen;
00076 
00077         void              *will_cl;   /* CL OBJ of WILL Leaf node */
00078 
00079 #define TNODE_PROP_RETAIN_DATA  0x04
00080         /* Bits 0,1 for QOS and rest are flags */
00081         uint8_t                 my_prop;
00082 
00083         uint8_t                 pg_map;    /* Map: application plugins */
00084 
00085         uint16_t    toplen;                /* Length of node sub-topic */
00086         char     subtop[MAX_SUBTOP_LEN];/* NUL terminated sub-topic */
00087 
00088 } nodes[MAX_TOP_NODES];
00089 
00090 /* 
00091     Housekeeping to manage subtopics (nodes) at run-time.
00092 */
00093 static struct _node_stack {
00094         struct topic_node *node;
00095         uint32_t                val1;
00096         uint32_t                val2;
00097 
00098 } node_stack[MAX_TOP_NODES];
00099 
00100 static int32_t stack_idx = 0;
00101 
00102 static void stack_add(struct topic_node *node, uint32_t val1, uint32_t val2)
00103 {
00104         node_stack[stack_idx].node = node;
00105         node_stack[stack_idx].val1 = val1;
00106         node_stack[stack_idx].val2 = val2;
00107 
00108         stack_idx++;
00109 }
00110 
00111 static inline struct _node_stack *stack_pop()
00112 {
00113         if(0 == stack_idx)
00114                 return NULL;
00115 
00116         return node_stack + (--stack_idx);
00117 }
00118 
00119 static inline bool is_node_retain(const struct topic_node *node)
00120 {
00121         return (node->my_prop & TNODE_PROP_RETAIN_DATA)? true : false;
00122 }
00123 
00124 static inline void node_retain_set(struct topic_node *node, bool retain)
00125 {
00126         if(retain)
00127                 node->my_prop |=  TNODE_PROP_RETAIN_DATA;
00128         else
00129                 node->my_prop &= ~TNODE_PROP_RETAIN_DATA;
00130 }
00131 
00132 static inline void node_qid_set(struct topic_node *node, uint8_t qid)
00133 {
00134         node->my_prop &= ~QID_VMASK;
00135         node->my_prop |=  qid;
00136 }
00137 
00138 static inline uint8_t node_qid_get(struct topic_node *node)
00139 {
00140         return node->my_prop & QID_VMASK;
00141 }
00142 
00143 static inline bool is_node_willed(const struct topic_node *node)
00144 {
00145         return (NULL != node->will_cl)? true : false;
00146 }
00147 
00148 static inline bool enrolls_plugin(const struct topic_node *node)
00149 {
00150         return (PG_MAP_ALL_DFLTS != node->pg_map)? true : false;
00151 }
00152 
/* Entry point of the topic tree: the node at which every search of the
   top-level neighbour list starts (NULL while the tree is empty) */
static struct topic_node *root_node  = NULL;

/* Head of the list of unused nodes, chained through their 'dn_nhbr' member */
static struct topic_node *free_nodes = NULL;
00155 
00156 static void node_reset(struct topic_node *node)
00157 {
00158         node->dn_nhbr = NULL;
00159         node->up_nhbr = NULL;
00160         node->dn_hier = NULL;
00161         node->up_hier = NULL;
00162 
00163         node->cl_map[0] = node->cl_map[1] = node->cl_map[2] = 0;
00164         node->my_data = NULL;
00165         node->my_dlen = 0;
00166         node->will_cl = NULL;
00167         node->my_prop = QID_VMASK;
00168 
00169         node->pg_map  = PG_MAP_ALL_DFLTS;
00170         node->toplen  = 0;
00171 }
00172 
00173 static void topic_node_init(void)
00174 {
00175         int i = 0;
00176 
00177         for(i = 0; i < MAX_TOP_NODES; i++) {
00178                 struct topic_node *node  = nodes + i;
00179 
00180                 node_reset(node);
00181 
00182                 node->dn_nhbr = free_nodes;
00183                 free_nodes  = node;
00184         }
00185 
00186         return;
00187 }
00188 
00189 static struct topic_node *alloc_topic_node(void)
00190 {
00191         struct topic_node *node = free_nodes;
00192         if(NULL != node) {
00193                 free_nodes  = node->dn_nhbr;
00194 
00195                 node_reset(node);
00196         }
00197 
00198         return node;
00199 }
00200 
00201 static void free_node(struct topic_node *node)
00202 {
00203         node_reset(node);
00204         
00205         node->dn_nhbr = free_nodes;
00206         free_nodes     = node;
00207 }
00208 
/* Tell whether the character at 'topstr' ends a sub-topic, i.e. whether it is
   either the terminating NUL or a '/' separator.

   When it does, '*next_subtop' is set to the start of the following sub-topic
   or to NULL if there is none (end of string, or a '/' directly followed by
   the NUL). When it does not, '*next_subtop' is left untouched.
*/
static bool is_end_of_subtop(const char *topstr, char const **next_subtop)
{
        switch(*topstr) {

        case '\0':
                *next_subtop = NULL;   /* Reached end of topstr */
                return true;

        case '/':
                /* A trailing '/' has no sub-topic after it */
                *next_subtop = ('\0' != topstr[1])? topstr + 1 : NULL;
                return true;

        default:
                return false;
        }
}
00226 
00227 static int32_t subtop_read(const char *topstr, char *subtop_buf, uint16_t buf_length,
00228                        char const **next_subtop)
00229 {
00230         int32_t idx = 0, len = buf_length;
00231 
00232         *next_subtop = topstr;
00233 
00234         while(idx < (len - 1)) {
00235                 subtop_buf[idx++] = *topstr;
00236 
00237                 if(is_end_of_subtop(topstr, next_subtop))
00238                         break;
00239 
00240                 topstr++;
00241         }
00242 
00243         if(idx == len) {
00244                 USR_INFO("S: Fatal, insufficient buffer for sub-str\n\r");
00245                 return -1; /* zero or insufficient buffer provided */
00246         }
00247 
00248         /* NUL terminated buffer: unless '\0' ended copy, make one */
00249         if('\0' == subtop_buf[idx - 1])
00250                 idx--; /* A fix up */
00251         else
00252                 subtop_buf[idx] = '\0';
00253 
00254         return idx;
00255 }
00256 
00257 static 
00258 struct topic_node *alloc_node_subtop(const char *topstr, char const **next_subtop)
00259 {
00260         uint16_t len = 0;
00261         struct topic_node *node = alloc_topic_node();
00262         if(NULL == node)
00263                 return NULL;
00264 
00265         len = subtop_read(topstr, node->subtop, MAX_SUBTOP_LEN, next_subtop);
00266         if(len <= 0) {
00267                 free_node(node);
00268                 return NULL;
00269         }
00270 
00271         node->toplen = len;
00272 
00273         return node;
00274 }
00275 
00276 /* Returns true if string 'subtop' is part of string 'topstr' otherwise false.
00277    Additionally, on success, pointer to next subtopic in 'topstr' is provided.
00278 */
00279 bool is_first_subtop(const char *subtop, const char *topstr, char const **next_subtop)
00280 {
00281         while(*subtop == *topstr) {
00282                 if(is_end_of_subtop(topstr, next_subtop))
00283                         return true;
00284 
00285                 subtop++;
00286                 topstr++;
00287         }
00288 
00289         return false;
00290 }
00291 
00292 /* Find a topic node in neighbour list that matches first subtopic in 'topstr'.
00293    Additionally, on success, pointer to next subtopic in 'topstr' is provided.
00294 */
00295 struct topic_node *subtop_nhbr_node_find(const struct topic_node *root_nh,
00296                                          const char *topstr, 
00297                                          char const **next_subtop)
00298 {
00299         /* This routine does not make use of 'next_subtop' */
00300         while(root_nh) {
00301                 if(true == is_first_subtop(root_nh->subtop, topstr, next_subtop))
00302                         break;
00303 
00304                 root_nh = root_nh->dn_nhbr;
00305         }
00306 
00307         return (struct topic_node*) root_nh;/* Bad: const from poiner removed */
00308 }
00309 
00310 /* Find a topic node in neighbour list that matches the given 'subtop' string */
00311 struct topic_node *nhbr_node_find(const struct topic_node *root_nh,
00312                                   const char *subtop)
00313 {
00314         const char *next_subtop = NULL;
00315         
00316         return subtop_nhbr_node_find(root_nh, subtop, &next_subtop);
00317 }
00318 
00319 /* Find leaf node of branch-combo that matches complete 'topstr'. 
00320    Modus Operandi:  For each sub topic in 'topstr', go across neighbour list,
00321    then for matching neighbour node, make its child node as root of neighbour
00322    list for another iteration for next sub topic.
00323 */
00324 /* Complex */
00325 static struct topic_node *lowest_node_find(const char *topstr, bool *flag_nh, 
00326                                            char const **next_subtop)
00327 {
00328         struct topic_node *root_nh = root_node, *node = NULL;
00329 
00330         *next_subtop = topstr;
00331         *flag_nh = false;
00332 
00333         while(root_nh) {
00334                 node = subtop_nhbr_node_find(root_nh, topstr, next_subtop);
00335                 if(NULL == node) {         /* Partial or no match */
00336                         *flag_nh = true;
00337                         node = root_nh;
00338                         break;
00339                 }
00340 
00341                 /* NULL '*next_subtop' indicates a complete match */
00342                 if(NULL == (*next_subtop))
00343                         break;
00344 
00345                 root_nh = node->dn_hier;
00346                 topstr  = *next_subtop;
00347         }
00348 
00349         return node;
00350 }
00351 
00352 struct topic_node *leaf_node_find(const char *topstr)
00353 {
00354         const char *next_subtop;
00355         bool flag_nh;   
00356         struct topic_node *node = lowest_node_find(topstr, &flag_nh,
00357                                                    &next_subtop);
00358 
00359         return (NULL == next_subtop)? node : NULL;
00360 }
00361 
00362 static void try_node_delete(struct topic_node *node); /* Forward declaration */
00363 
00364 /* Create 'child only' hierarchy of nodes to hold all sub topics in 'topstr'. 
00365    The routine returns a start node of hierarchy and also provides leaf node.
00366 */
00367 static 
00368 struct topic_node *hier_nodes_create(const char *topstr, struct topic_node **leaf)
00369 {
00370         struct topic_node *base = NULL, *node = NULL, *prev = NULL;
00371         const char *next_subtop = NULL;
00372 
00373         do {
00374                 node = alloc_node_subtop(topstr, &next_subtop);
00375 
00376                 if(NULL == node) {
00377                         /* Allocation of a node failed, free up
00378                            the previous allocations, if any, in
00379                            the hierarchy that was being created */
00380                         if(prev)
00381                                 try_node_delete(prev);
00382                         
00383                         base = NULL;
00384                         break;
00385                 }
00386 
00387                 if(NULL == prev) {
00388                         //                        prev = node;
00389                         base = node; /* First node of hierarchy */
00390                 } else {
00391                         prev->dn_hier = node;
00392                         node->up_hier = prev;
00393                         //                        prev          = node;
00394                 }
00395 
00396                 prev   = node;
00397                 topstr = next_subtop;
00398 
00399         } while(next_subtop);
00400 
00401         *leaf = node;
00402 
00403         DBG_INFO("S: Hierarchy of nodes created: Base: %s & Leaf: %s\n\r",
00404                  base? base->subtop : " ", (*leaf)? (*leaf)->subtop : " ");
00405 
00406         return base;
00407 }
00408 
00409 static void install_nhbr_node(struct topic_node *base, struct topic_node *node)
00410 {
00411         while(base->dn_nhbr) {
00412                 base = base->dn_nhbr;
00413         }
00414 
00415         base->dn_nhbr = node;
00416         node->up_nhbr = base;
00417 
00418         DBG_INFO("S: %s added as a neighbour to %s\n\r",
00419                  node->subtop, base->subtop);
00420 }
00421 
00422 static void set_up_hier_nodes(struct topic_node *up_hier, 
00423                               struct topic_node *dn_hier)
00424 {
00425         up_hier->dn_hier = dn_hier;
00426         dn_hier->up_hier = up_hier;
00427 
00428         DBG_INFO("%s added as DN HIER to %s \n\r",
00429                  dn_hier->subtop, up_hier->subtop);
00430 }
00431 
00432 static inline void install_topic_root_node(struct topic_node *node)
00433 {
00434         root_node = node;
00435 }
00436 
00437 /* Create or update tree to create branch-combo to refer to 'topstr'.
00438    Modus operandi: 
00439   
00440 */
00441 struct topic_node *topic_node_create(const char *topstr)
00442 {
00443         struct topic_node *base, *node, *leaf;
00444         const char *next_subtop = topstr;
00445         bool flag_nh;
00446 
00447         base = lowest_node_find(topstr, &flag_nh, &next_subtop);
00448         if(NULL == next_subtop)
00449                 return base;   /* Found exact match */
00450 
00451         /* Control reaches here, if either no or partial branch-combo has been
00452            found for 'topstr'. Now, let's create remaining branches for string
00453            'next_subtop' and assign them to appropriately to topic tree.
00454         */
00455         DBG_INFO("S: Creating Hierarchy for %s\n\r", next_subtop);
00456 
00457         node = hier_nodes_create(next_subtop, &leaf);
00458         if(node) {
00459                 if(NULL == base)
00460                         install_topic_root_node(node);
00461                 else if(flag_nh)
00462                         install_nhbr_node(base, node);
00463                 else 
00464                         set_up_hier_nodes(base, node);
00465 
00466                 return leaf;
00467         }
00468 
00469         return NULL;
00470 }
00471 
00472 static bool can_delete_node(const struct topic_node *node)
00473 {
00474         if(node->up_nhbr && node->up_hier) {
00475                 USR_INFO("S: fatal, node w/ up-nhbr & up-hier.\n\r");
00476                 return false;
00477         }
00478 
00479         if(node->dn_nhbr  ||
00480            node->dn_hier)
00481                 return false;
00482 
00483         return true;
00484 }
00485 
00486 static struct topic_node *node_delete(struct topic_node *node)
00487 {
00488         struct topic_node *next = NULL;
00489 
00490         if(false == can_delete_node(node))
00491                 return NULL;
00492 
00493         if(node->up_nhbr) {
00494                 node->up_nhbr->dn_nhbr = NULL;
00495                 next = node->up_nhbr;
00496         }
00497 
00498         if(node->up_hier) {
00499                 node->up_hier->dn_hier = NULL;
00500                 next = node->up_hier;
00501         }
00502 
00503         if((NULL == node->up_nhbr) &&
00504            (NULL == node->up_hier))
00505                 root_node = NULL;
00506 
00507         DBG_INFO("S: Deleted node %s\n\r", node->subtop);
00508 
00509         free_node(node);
00510 
00511         return next;
00512 }
00513 
/* Describes a buffer in which a topic string is put together piece by piece */
struct topbuf_desc {
        char   *buffer;    /* Start of the buffer                    */
        uint16_t   maxlen; /* Size of the buffer, in bytes           */
        uint16_t   offset; /* Number of bytes of 'buffer' in use     */
};
00519 
00520 static bool topbuf_add(struct topbuf_desc *buf_desc, const struct topic_node *node)
00521 {
00522         const char *next_subtop;
00523         char  *buf = buf_desc->buffer + buf_desc->offset;
00524         uint16_t  len = buf_desc->maxlen - buf_desc->offset;
00525 
00526         int32_t rv = subtop_read(node->subtop, buf, len, &next_subtop);
00527         if(rv < 0)
00528                 return false;
00529 
00530         if(NULL != next_subtop) {
00531                 USR_INFO("S: topstr_add fatal, bad subtop.\n\r");
00532                 return false;
00533         }
00534 
00535         buf_desc->offset += rv;
00536 
00537         return true;
00538 }
00539 
00540 static bool topbuf_cpy(struct topbuf_desc *buf_desc, const char *subtop)
00541 {
00542         const char *next_subtop;
00543         char  *buf = buf_desc->buffer + buf_desc->offset;
00544         uint16_t  len = buf_desc->maxlen - buf_desc->offset;
00545 
00546         int32_t rv = subtop_read(subtop, buf, len, &next_subtop);
00547         if(rv < 0)
00548                 return false;
00549 
00550         if(NULL != next_subtop) {
00551                 USR_INFO("S: topstr_copy fatal, bad subtop.\n\r");
00552                 return false;
00553         }
00554 
00555         buf_desc->offset += rv;
00556 
00557         return true;
00558 }
00559 
00560 static bool has_a_wildcard(const struct topic_node *node)
00561 {
00562         const char *str = node->subtop;
00563         while('\0' != *str) {
00564                 if(('+' == *str) || ('#' == *str))
00565                         return true;
00566 
00567                 str++;
00568         }
00569 
00570         return false;
00571 }        
00572 
00573 static bool is_node_SUB_subtop(const struct topic_node *node, const char *subtop)
00574 {
00575         if(false == has_a_wildcard(node))
00576                 return ((0 == strcmp(node->subtop, subtop))                ||
00577                         (node->dn_hier  && (0 == strcmp("+/", subtop)))    ||
00578                         (!node->dn_hier && (0 == strcmp("+",  subtop))))?
00579                         true : false;
00580 
00581        return false;
00582 }
00583 
00584 /* Search node tree, created by PUB retention, for the branch combo, whose
00585    absolute sub-topic sequence 'matches', in entirety, the topic, which is
00586    being subscribed. The 'match' criteria is met either through the
00587    wildcard sub-topics or exact compare.
00588 
00589    The hierarchical search starts at the specified 'base' node and ends at
00590    the leaf node. If the sequence of 'base-to-leaf' sub-topics 'match' the
00591    'topSUB', then the leaf node is returned, otherwise a NULL is returned.
00592 
00593    As part of hierarchical search, neighbouring nodes, if any, are logged
00594    for subsequent iteration of the routine.
00595 */
00596 static struct topic_node *pub_hier_search(const char *topSUB,
00597                                           const struct topic_node *base, 
00598                                           struct topbuf_desc *mk_pubtop)
00599 {
00600         const struct topic_node *node = base, *prev = NULL;
00601         const char *next_subtop = topSUB;
00602         char subtop[MAX_SUBTOP_LEN];
00603 
00604         while(next_subtop && node) {
00605                 if(subtop_read(topSUB, subtop, MAX_SUBTOP_LEN,
00606                                &next_subtop) <= 0)
00607                         break;
00608 
00609                 if(node->dn_nhbr)
00610                         stack_add(node->dn_nhbr, (uint32_t)topSUB, mk_pubtop->offset);
00611 
00612                 if(false == is_node_SUB_subtop(node, subtop))
00613                         break;    /* Node doesn't form part of published topic */
00614 
00615                 if(false == topbuf_add(mk_pubtop, node))
00616                         break;
00617 
00618                 prev = node;
00619                 node = node->dn_hier;
00620 
00621                 topSUB = next_subtop;
00622         }
00623 
00624         if(NULL != next_subtop)
00625                 node = NULL;
00626         else 
00627                 node = prev;
00628 
00629         return (struct topic_node *)node;
00630 }
00631 
00632 static bool is_node_PUB_subtop(const struct topic_node *node,
00633                                const char *subtop, bool endtop)
00634 {
00635         /* Assumes that subtop hasn't got any wildcard characater */
00636         return ((0 == strcmp(subtop, node->subtop))               ||
00637                 (!endtop && (0 == strcmp("+/", node->subtop)))    ||
00638                 (endtop  && (0 == strcmp("+",  node->subtop))))?
00639                 true : false;
00640 }
00641 
00642 /* Search node tree, created by subscriptions, for the branch combo, whose
00643    sub-topic sequence 'matches', in entirety, the absolute topic, to which
00644    data has been published. The 'match' criteria is met either through the
00645    wildcard sub-topics or exact compare.
00646 
00647    The hierarchical search starts at the specified 'base' node and ends at
00648    the leaf node. If the sequence of 'base-to-leaf' sub-topics 'match' the
00649    'topPUB', then the leaf node is returned, otherwise a NULL is returned.
00650 
00651    As part of hierarchical search, neighbouring nodes, if any, are logged
00652    for subsequent iteration of the routine.
00653 */ 
00654 static struct topic_node *SUB_leaf_search(const char *topPUB,
00655                                           const struct topic_node *base)
00656 {
00657         const struct topic_node *node = base, *prev = NULL;
00658         const char *next_subtop = topPUB;
00659         char subtop[MAX_SUBTOP_LEN];
00660 
00661         while(next_subtop && node) {
00662                 if(subtop_read(topPUB, subtop, MAX_SUBTOP_LEN,
00663                                &next_subtop) <= 0)
00664                         break;
00665 
00666                 if(node->dn_nhbr)
00667                         stack_add(node->dn_nhbr, (uint32_t)topPUB, 0);
00668 
00669                 if(0 == strcmp("#", node->subtop))
00670                         goto SUB_leaf_search_exit1;
00671 
00672                 if(false == is_node_PUB_subtop(node, subtop, !next_subtop))
00673                         break; /* Node doesn't form part of published topic */
00674 
00675                 prev = node;
00676                 node = node->dn_hier;
00677 
00678                 topPUB = next_subtop;
00679         }
00680 
00681         if(NULL != next_subtop)
00682                 node = NULL;
00683         else 
00684                 node = prev;
00685 
00686 SUB_leaf_search_exit1:
00687         return (struct topic_node*) node; // Bad
00688 }
00689 
00690 
00691 /*-------------------------------------------------------------------------
00692  * MQTT Routines
00693  *-------------------------------------------------------------------------
00694  */
00695 
00696 #define WBUF_LEN   MQP_SERVER_RX_LEN /* Assignment to ease implementation */
00697 static char work_buf[WBUF_LEN];
00698 
00699 static void try_node_delete(struct topic_node *node)
00700 {
00701         while(node) {
00702 
00703                 if(is_node_retain(node) ||
00704                    is_node_willed(node) ||
00705                    enrolls_plugin(node) ||
00706                    node->cl_map[0]      ||
00707                    node->cl_map[1]      ||
00708                    node->cl_map[2])
00709                         break;
00710 
00711                 node = node_delete(node);
00712         }
00713 }
00714 
00715 /* Move this to a common file */
00716 static void pub_msg_send(const struct utf8_string *topic, const uint8_t *data_buf,
00717                          uint32_t data_len, uint8_t fh_flags, uint32_t cl_map)
00718 {
00719         enum mqtt_qos qos = ENUM_QOS(fh_flags);
00720         struct mqtt_packet *mqp = NULL;
00721 
00722         mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len);
00723         if(NULL == mqp)
00724                 return;
00725 
00726         if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) ||
00727            (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) {
00728                 mqp_free(mqp);
00729                 return;
00730         }
00731 
00732         mqp_prep_fh(mqp, fh_flags);
00733 
00734         if(cl_map)
00735                 cl_pub_dispatch(cl_map, mqp);
00736 
00737         return;
00738 }
00739 
00740 static struct topic_node *SUB_node_create(const char *topSUB, uint8_t qid, void *usr_cl)
00741 {
00742         struct topic_node *leaf = topic_node_create(topSUB);
00743         if(leaf) {
00744                 uint8_t    j = 0;
00745                 uint32_t map = cl_bmap_get(usr_cl);
00746 
00747                 for(j = 0; j < 3; j++)
00748                         /* Client: clear QOS of existing sub, if any */
00749                         leaf->cl_map[j] &= ~map;
00750 
00751                 leaf->cl_map[qid] |= map;
00752 
00753                 cl_sub_count_add(usr_cl); 
00754         }
00755 
00756         return leaf;
00757 }
00758 
00759 static uint8_t proc_pub_leaf(struct topic_node *leaf, const struct utf8_string *topic,
00760                         enum mqtt_qos qos, void *usr_cl)
00761 {
00762         uint8_t  qid = QOS_VALUE(qos);
00763 
00764         if(is_node_retain(leaf)) {
00765                 /* If it is an earlier created topic w/ retained
00766                    data, then pick lower of the two QOS(s)  */
00767                 qid = MIN(node_qid_get(leaf), qid);
00768 
00769                 /* Publish the retained data to this client */
00770                 pub_msg_send(topic, leaf->my_data, leaf->my_dlen,
00771                              MAKE_FH_FLAGS(false, qid, true),
00772                              cl_bmap_get(usr_cl));
00773         }
00774 
00775         return qid;
00776 }
00777 
00778 /* Multi-level wild-card subscription - search of all of the tree i.e.
00779    "no topic" ('no_top') in particular and publish it to client, if
00780    the hierarchy has no wild-card node. */
00781 static
00782 uint8_t proc_pub_hier_no_top(struct topic_node *base, struct topbuf_desc *mk_pubtop,
00783                         enum mqtt_qos qos, void *usr_cl)
00784 {
00785         struct topic_node *node = base, *leaf = NULL;
00786         uint8_t ack = QOS_VALUE(qos);
00787 
00788         /* 1. Find the leaf node of a non wildcard branch-combo */
00789         while(node) {
00790                 if(node->dn_nhbr)
00791                         stack_add(node->dn_nhbr, 0, mk_pubtop->offset);
00792 
00793                 if(has_a_wildcard(node))
00794                         break;
00795 
00796                 if(false == topbuf_add(mk_pubtop, node))
00797                         break;
00798 
00799                 leaf = node;
00800                 node = node->dn_hier;
00801         }
00802 
00803         /* A non NULL value of 'node' would indicate a hierarchy with a
00804            wildcard (sub-)topic (the 'node') - not suitable for PUB. */
00805 
00806         if(NULL == node) {
00807                 /* 2. Send retained data, if any, to SUB Client */
00808                 struct utf8_string topic = {mk_pubtop->buffer,
00809                                             mk_pubtop->offset};
00810 
00811                 /* In this version, at this juncture, the 'leaf'
00812                    will not be NULL. Nevertheless a check (for
00813                    the sake of static analytical tools).........*/
00814                 if(leaf)
00815                         ack = proc_pub_leaf(leaf, &topic, qos, usr_cl);
00816         }
00817 
00818         return ack;
00819 }
00820 
00821 /* Multi-level wild-card subscription - search of all of the tree i.e.
00822    "no topic" ('no_top') in particular and publish it to client, if
00823    a hierarchy in the tree has no wild-card node. */
00824 static
00825 uint8_t proc_pub_tree_no_top(struct topic_node *base, struct topbuf_desc *mk_pubtop,
00826                         enum mqtt_qos qos, void *usr_cl)
00827 {
00828         uint32_t stack_ref = stack_idx;
00829         uint8_t min = QOS_VALUE(qos);
00830 
00831         if(base != NULL)
00832                 stack_add(base, 0, mk_pubtop->offset);
00833 
00834         while(stack_ref < stack_idx) {
00835                 struct _node_stack *stack = stack_pop();
00836                 uint8_t ack;
00837 
00838                 mk_pubtop->offset = (uint16_t) stack->val2;
00839 
00840                 ack = proc_pub_hier_no_top(stack->node, mk_pubtop, qos, usr_cl);
00841                 if(ack < min)
00842                         min = ack;
00843         }
00844 
00845         return min;
00846 }
00847 
00848 static uint8_t proc_pub_hier_SUBtop(const char *topSUB,  const struct topic_node *base,
00849                                struct topbuf_desc *mk_pubtop, enum mqtt_qos qos,
00850                                void *usr_cl)
00851 {
00852         struct topic_node *leaf = pub_hier_search(topSUB, base, mk_pubtop);
00853         uint8_t min = QOS_VALUE(qos);
00854 
00855         if(leaf) {
00856                 struct utf8_string topic = {mk_pubtop->buffer,
00857                                             mk_pubtop->offset};
00858 
00859                 min = proc_pub_leaf(leaf, &topic, qos, usr_cl);
00860         }
00861 
00862         return min;
00863 }
00864 
00865 /* used by sl or no wc */
00866 static uint8_t proc_pub_tree_SUBtop(const char *topSUB, struct topic_node *base,
00867                                struct topbuf_desc *mk_pubtop,
00868                                enum mqtt_qos qos, void *usr_cl)
00869 {
00870         uint32_t stack_ref = stack_idx;
00871         uint8_t min = QOS_VALUE(qos);
00872 
00873         if(NULL != base)
00874                 stack_add(base, (uint32_t)topSUB, mk_pubtop->offset);
00875 
00876         while(stack_ref < stack_idx) {
00877                 struct _node_stack *stack = stack_pop();
00878                 uint8_t ack;
00879 
00880                 mk_pubtop->offset = stack->val2;
00881                 ack = proc_pub_hier_SUBtop((char*)stack->val1,  stack->node,
00882                                            mk_pubtop, qos, usr_cl);
00883 
00884                 if(ack < min)
00885                         min = ack;
00886         }
00887 
00888         return min;
00889 }
00890 
/* Process one hierarchy for a multi-level wild-card ('#') SUBSCRIBE.

   grandpa_topSUB: topic string that precedes the parent subtopic; may be "".
   parent_subtop:  subtopic directly in front of the removed "/#"; may be "".
                   It is modified in place (a '/' is appended temporarily) and
                   restored before returning.
   base:           node at which the search of this hierarchy starts.
   mk_pubtop:      descriptor of the PUB topic being assembled.

   Returns the smallest of the QoS value of the request and the values
   produced by the processing steps below. */
static 
uint8_t proc_sub_ml_wc_hier(const char *grandpa_topSUB, char *parent_subtop,
                       struct topic_node *base, struct topbuf_desc *mk_pubtop,
                       enum mqtt_qos qos, void *usr_cl)
{
        /* 'ack' starts as QFL_VALUE and is only overwritten by steps 2 and 3 */
        uint8_t min = QOS_VALUE(qos), ack = QFL_VALUE;
        char *subtop = NULL;

        /* 1. Search hier node for 'grandpa' and if found, get to parent level */
        if('\0' != grandpa_topSUB[0]) {
                struct topic_node *leaf = pub_hier_search(grandpa_topSUB, base,
                                                          mk_pubtop);
                if(NULL == leaf)
                        return min;      /* Nothing in this hierarchy matches */

                base = leaf->dn_hier;            /* nhbr root at parent level */
        }

        /* 2. If present, process parent as a leaf and get its down hierarchy */
        subtop = parent_subtop;
        if(('\0' != subtop[0]) && ('+' != subtop[0]) && ('/' != subtop[0])) {
                uint16_t offset = mk_pubtop->offset;  /* Refer to grandpa's pubtop */
                uint16_t sublen = 0;

                /* Length of the parent subtopic (hand-rolled strlen) */
                while(subtop[sublen]){sublen++;}

                ack = proc_pub_tree_SUBtop(subtop, base, mk_pubtop, qos, usr_cl);
                mk_pubtop->offset = offset;      /* Restores grandpa's pubtop */

                /* NOTE(review): this overwrites the terminator, so the string is
                   ended by whatever byte follows it - presumably another '\0'
                   supplied by the caller; confirm. */
                subtop[sublen] = '/';            /* Make parent's hier subtop */

                base = nhbr_node_find(base, subtop);
                if(base)
                        base =  topbuf_cpy(mk_pubtop, subtop)?
                                base->dn_hier : NULL;
                subtop[sublen] =  '\0';          /* Get back, original subtop */
        }

        /* If step 2 was skipped, 'ack' still holds QFL_VALUE here */
        min = MIN(min, ack);        
        /* 3. Process '#' WC by walking thru entire sub-tree of parent 'base' */
        if(NULL != base)
                ack = proc_pub_tree_no_top(base, mk_pubtop, qos, usr_cl);

        return MIN(min, ack);
}
00936 
00937 static uint8_t proc_sub_ml_wc_tree(char *grandpa_topSUB, char *parent_subtop,
00938                               struct topic_node *base,
00939                               enum mqtt_qos qos, void *usr_cl)
00940 {      
00941         struct topbuf_desc mk_pubtop = {work_buf, WBUF_LEN, 0 /* offset */};
00942         uint32_t stack_ref = stack_idx;
00943         uint8_t min = QOS_VALUE(qos);
00944 
00945         if(NULL != base)
00946                 stack_add(base, (uint32_t)grandpa_topSUB, mk_pubtop.offset);
00947 
00948         while(stack_ref < stack_idx) {
00949                 struct _node_stack *stack = stack_pop();
00950                 uint8_t ack;
00951 
00952                 mk_pubtop.offset = stack->val2;
00953                 ack = proc_sub_ml_wc_hier((char*)stack->val1, parent_subtop,
00954                                           stack->node, &mk_pubtop, qos, usr_cl);
00955                 if(ack < min)
00956                         min = ack;
00957         }
00958 
00959         return min;
00960 }
00961 
00962 static uint8_t ml_wc_nodes_create(char *parent_topSUB, uint16_t toplen, uint8_t qid, void *usr_cl)
00963 {
00964         struct topic_node *parent_leaf = NULL;
00965 
00966         if('\0' != parent_topSUB[0]) {
00967                 parent_leaf = SUB_node_create(parent_topSUB, qid, usr_cl);
00968                 if(NULL == parent_leaf)
00969                         return QFL_VALUE;
00970         }
00971 
00972         /* Get the topic SUB to it's original state */
00973         if(toplen > 1) parent_topSUB[toplen - 2] = '/';
00974         parent_topSUB[toplen - 1] = '#';
00975 
00976         if(NULL == SUB_node_create(parent_topSUB, qid, usr_cl)) {
00977                 /* Failed to create WC topic, so delete parent as well.
00978                    In this revision, 'parent_leaf' will not be a 'NULL'
00979                    at this juncture, nevertheless a check (for tools) */
00980                 if(parent_leaf)
00981                         node_delete(parent_leaf);
00982 
00983                 return QFL_VALUE;
00984         }
00985 
00986         return qid;
00987 }
00988 
/* Process Multi-level Wildcard Topic SUBSCRIBE

   topSUB: topic filter whose last character is '#'; it is edited in place
           while being processed and is rebuilt by ml_wc_nodes_create().
   toplen: length of 'topSUB', including the trailing '#'.

   The filter is split into a 'grandpa' topic (left in 'topSUB') and the
   'parent' subtopic that directly precedes "/#" (copied, right-aligned, into
   the local 'subtop' buffer).

   Returns QFL_VALUE if the parent subtopic does not fit into MAX_SUBTOP_LEN
   characters, otherwise the result of ml_wc_nodes_create(). */
static uint8_t proc_sub_ml_wildcard(char *topSUB, uint16_t toplen, enum mqtt_qos qos,
                               void *usr_cl)
{
        /* 'len' counts the characters copied from the tail of 'topSUB' */
        uint16_t len = 0, limit = MIN(toplen, MAX_SUBTOP_LEN);
        char subtop[MAX_SUBTOP_LEN], *ptr;
        uint8_t min = QOS_VALUE(qos);

        /* 'topSUB': Need to create grandpa topic and parent-subtopic */
        topSUB[toplen - 1] = '\0'; /* Remove '#' */
        if(toplen > 1)             /* Remove '/' */
                topSUB[toplen - 2] = '\0';

        /* Walk backwards from the end of 'topSUB'; the '\0' characters written
           above are copied too and become the terminator(s) of 'subtop' */
        do {    /* Do processing to get parent sub-topic into buffer */
                if('/' == topSUB[toplen - len - 1])
                        break;      /* found '/' */

                len++; /* Copy parent characters */
                subtop[MAX_SUBTOP_LEN - len] = topSUB[toplen - len];
        } while(len < limit);

        /* Characters remain in front of the copied part but the one just ahead
           is not '/': the parent subtopic did not fit into 'subtop' */
        if((toplen > len) && ('/' != topSUB[toplen - len - 1]))
                return QFL_VALUE;  /* Bad Length */

        topSUB[toplen - len] = '\0'; /* End of grand-pa's topic name */
        ptr = subtop + MAX_SUBTOP_LEN - len; /* Parent's leaf subtop */
        min = proc_sub_ml_wc_tree(topSUB, ptr, root_node, qos, usr_cl);

        /* Make branch-combo to complete processing of parent' topic */
        /* Appends the parent subtopic back onto the grandpa topic in 'topSUB' */
        strcpy(topSUB + toplen - len, ptr); // topSUB[toplen - len] = *ptr;

        /* Create nodes for multi-level wildcard topic & it's parent */
        min = ml_wc_nodes_create(topSUB, toplen, min, usr_cl);

        return min;
}
01025 
01026 /* Process Single-level Wildcard or No Wild Card Topic SUBSCRIBE */
01027 static 
01028 uint8_t proc_sub_sl_or_no_wc(const char *topSUB, enum mqtt_qos qos, void *usr_cl)
01029 {
01030         struct topbuf_desc mk_pubtop = {work_buf, WBUF_LEN, 0 /* offset */};
01031         uint8_t min = QOS_VALUE(qos);
01032 
01033         /* For single level wildcard or absolute topic, find PUB nodes */
01034         min = proc_pub_tree_SUBtop(topSUB, root_node, &mk_pubtop, qos, usr_cl);
01035 
01036         if(NULL == SUB_node_create(topSUB, min, usr_cl))
01037                 min = QFL_VALUE;
01038 
01039         return min;
01040 }
01041 
/* Normalize the '/' separators of a topic in place.

   Runs of consecutive '/' are collapsed into a single '/', and one trailing
   '/' is removed unless the topic consists of just that character. The result
   is NUL terminated, so 'buf' must have room for 'len' + 1 characters.

   buf: topic characters (not necessarily NUL terminated on entry).
   len: number of topic characters in 'buf'.

   Returns the length of the normalized topic. An empty topic (len == 0) is
   returned as length 0 and 'buf' is left untouched; without this guard the
   routine would report a length of 1 and write to buf[1]. */
static uint16_t proc_forward_slash(char *buf, uint16_t len)
{
        uint16_t rd, wr;

        if(0 == len)
                return 0;   /* Nothing to normalize, nothing to terminate */

        /* buf[0] is always kept; 'wr' trails 'rd' once a '/' has been dropped */
        for(rd = 1, wr = 1; rd < len; rd++) {
                char curr = buf[rd];
                if(('/'  == curr) && (buf[rd - 1] == curr))
                        continue; /* Drop consecutive '/' */

                buf[wr++] = curr;
        }

        if((1 != wr) && ('/' == buf[wr - 1]))
                wr--;      /* Topic can not end with a '/' */

        buf[wr] = '\0';

        return wr;
}
01060 
/* Check a pair of adjacent characters of a SUBSCRIBE topic filter.

   The pair is rejected when
     - '+' is not directly preceded by '/',
     - '+' is not directly followed by '/',
     - '#' is not directly preceded by '/', or
     - any character follows '#'.

   Returns true if the pair is acceptable. */
static inline bool is_valid_char_order(char prev, char curr)
{
        return ((('/' != prev) && ('+' == curr))  ||
                (('+' == prev) && ('/' != curr))  ||
                (('/' != prev) && ('#' == curr))  ||
                (('#' == prev)))? false : true;
}


/* Validate a SUBSCRIBE topic filter of 'len' characters.

   The filter is rejected when it is empty, when it contains a NUL character
   anywhere within 'len' (including the last position), or when the wild-card
   characters are misplaced (see is_valid_char_order()).

   Returns true for a valid filter. */
static bool is_valid_SUB_top(const char *buf, uint16_t len)
{
        uint16_t i = 0;

        if((0 == len) || ('\0' == buf[0]))
                return false;

        for(i = 1; i < len; i++) {
                /* A NUL is not a legal topic character, wherever it appears */
                if('\0' == buf[i])
                        return false;

                if(false == is_valid_char_order(buf[i - 1], buf[i]))
                        return false;
        }

        return true;
}
01089 
01090 static bool proc_sub_msg_rx(void *usr_cl, const struct utf8_strqos *qos_topics,
01091                             uint32_t n_topics, uint16_t msg_id, uint8_t *ack)
01092 {
01093         int32_t i = 0;
01094         for(i = 0; i < n_topics; i++) {
01095                 const struct utf8_strqos *qos_top = qos_topics + i;
01096                 enum mqtt_qos qos = qos_top->qosreq;
01097                 char *buf = (char*)qos_top->buffer;
01098                 uint16_t len = qos_top->length;
01099 
01100                 /* Remove zero-topics and trailing '/' from SUB top */
01101                 len = proc_forward_slash(buf, len);
01102                 ack[i] = QFL_VALUE;
01103                 if(false == is_valid_SUB_top(buf, len))
01104                         continue;
01105 
01106                 buf[len] = '\0';  /* Dirty trick, cheeky one */
01107 
01108                 ack[i] = ('#' == buf[len - 1])? 
01109                         proc_sub_ml_wildcard(buf, len, qos, usr_cl) : 
01110                         proc_sub_sl_or_no_wc(buf, qos, usr_cl);
01111 
01112                 DBG_INFO("SUB Topic%-2d %s is ACK'ed w/ 0x%02x\n\r",
01113                          i + 1, buf, ack[i]);
01114         }
01115 
01116         return true; /* Send SUB-ACK and do not close network */
01117 }
01118 
01119 static
01120 bool proc_sub_msg_rx_locked(void *usr_cl, const struct utf8_strqos *qos_topics,
01121                             uint32_t n_topics, uint16_t msg_id, uint8_t *ack)
01122 {
01123         return proc_sub_msg_rx(usr_cl, qos_topics, n_topics, msg_id, ack);
01124 }
01125 
01126 static void leaf_un_sub(struct topic_node *leaf, void *usr_cl)
01127 {
01128         uint8_t  j   = 0;
01129         uint32_t map = cl_bmap_get(usr_cl);
01130         
01131         for(j = 0; j < 3; j++) {
01132                 /* Client: clear QOS of existing sub, if any */
01133                 if(0 == (leaf->cl_map[j] & map))
01134                         continue;
01135 
01136                 leaf->cl_map[j]  &= ~map;
01137                 cl_sub_count_del(usr_cl);
01138 
01139                 try_node_delete(leaf);
01140 
01141                 break;
01142         }
01143 }
01144 
01145 static bool proc_un_sub_msg(void *usr_cl, const struct utf8_string *topics,
01146                             uint32_t n_topics, uint16_t msg_id)
01147 {
01148         uint32_t i = 0;
01149 
01150         for(i = 0; i < n_topics; i++) {
01151                 const struct utf8_string *topic = topics + i;
01152                 struct topic_node *leaf  = NULL;
01153                 uint16_t len = topic->length;
01154 
01155                 /* The maximum length of 'work_buf' is same as that of RX buffer
01156                    in the PKT-LIB. Therefore, the WBUF_LEN is not being checked
01157                    against the length of the topic (a constituent of RX buffer).
01158                 */
01159                 strncpy(work_buf, topic->buffer, topic->length);
01160                 work_buf[len] = '\0';
01161 
01162                 if('#' == work_buf[len - 1]) { /* Multi-level Wildcard */
01163                         work_buf[len - 1]         = '\0';
01164                         if(len > 1)
01165                                 work_buf[len - 2] = '\0';
01166 
01167                         leaf = leaf_node_find(work_buf);
01168                         if(leaf)
01169                                 leaf_un_sub(leaf, usr_cl);
01170 
01171                         if(len > 1)
01172                                 work_buf[len - 2] = '/';
01173                         work_buf[len - 1]         = '#';
01174                 }
01175 
01176                 leaf = leaf_node_find(work_buf); 
01177                 if(leaf)
01178                         leaf_un_sub(leaf, usr_cl);
01179         }
01180 
01181         return true; /* Do not close network */
01182 }
01183 
01184 static
01185 bool proc_un_sub_msg_locked(void *usr_cl, const struct utf8_string *topics,
01186                             uint32_t n_topics, uint16_t msg_id)
01187 {
01188         return proc_un_sub_msg(usr_cl, topics, n_topics, msg_id);
01189 }
01190 
01191 static void
01192 leaf_msg_send(const struct topic_node *leaf, const struct utf8_string *topic,
01193               const uint8_t *data_buf, uint32_t data_len, bool dup, enum mqtt_qos qos,
01194               bool retain)
01195 {
01196         uint8_t qid = 0, fh_fgs = 0;
01197 
01198         for(qid = 0; qid < 3; qid++) {
01199                 uint8_t map = leaf->cl_map[qid];
01200                 fh_fgs = MAKE_FH_FLAGS(dup, MIN(qid, QOS_VALUE(qos)), retain);
01201 
01202                 if(map)
01203                         pub_msg_send(topic, data_buf, data_len, fh_fgs, map);
01204         }
01205 
01206         if(enrolls_plugin(leaf))
01207                 plugin_publish(leaf->pg_map, topic, data_buf, data_len,
01208                                dup, qos, retain);
01209 
01210         return;
01211 }
01212 
01213 static void node_data_set(struct topic_node *node, uint8_t *data,
01214                           uint32_t dlen, uint8_t qid, bool retain)
01215 {
01216         node->my_data  = data;
01217         node->my_dlen  = dlen;
01218 
01219         node_qid_set(node, qid);
01220         node_retain_set(node, retain);
01221 
01222         return;
01223 }
01224 
/* Replace the data stored in 'node'.

   Data currently held by the node (my_dlen != 0) is freed first. The node is
   then reset (no data, length 0, qid 0, retain false) when
     - 'drop_qid0' is set and 'qid' is 0, or
     - 'data_buf' is given but the copy buffer can not be allocated.
   Otherwise the node receives a private copy of 'data_buf' (when one was
   given) together with 'data_len', 'qid' and 'retain'.

   Returns true when "a copy was made" and "data_len is non-zero" agree, and
   false when they differ.

   NOTE(review): with 'drop_qid0' set, qid 0 and a non-zero 'data_len', no copy
   is made, so false is returned even though nothing failed - confirm that the
   callers intend to treat this as an error.

   Also defines NODE_DATA_RESET_PARAMS, the argument list for a reset. */
static bool node_data_update(struct topic_node *node, bool drop_qid0,
                             const uint8_t *data_buf, uint32_t data_len, 
                             uint8_t qid, bool retain)
{
#define NODE_DATA_RESET_PARAMS  NULL, 0, 0, false

        /* Assumes that caller has provided either reset or valid params */

        uint8_t *data  = NULL;

        /* Release the data that the node currently holds */
        if(node->my_dlen)
                my_free(node->my_data);

        /* Watch out for assignment in 'if' statement - avoid such smarts */
        /* The allocation is attempted only if the first condition is false
           and 'data_buf' is not NULL; 'data' stays NULL otherwise */
        if((drop_qid0 && (0 == qid)) || (data_buf  && !(data = (uint8_t*)my_malloc(data_len)))) {
                node_data_set(node, NODE_DATA_RESET_PARAMS);
        } else {

                if(data)
                        buf_wr_nbytes(data, data_buf, data_len);

                node_data_set(node, data, data_len, qid, retain);
        }

        /* XOR is set when exactly one of 'data' and 'data_len' is non-zero */
        return ((!!data) ^ (!!data_len))? false : true; 
}
01251 
/* Returns true if 'c' is one of the topic wild-card characters '+' or '#'. */
static inline bool is_wildcard_char(char c)
{
        switch(c) {
        case '+':
        case '#':
                return true;
        default:
                return false;
        }
}
01256 
01257 static int32_t pub_topic_read(const struct utf8_string *topic, char *buf, uint32_t len)
01258 {
01259         uint32_t i = 0;
01260         uint32_t toplen = topic->length;  
01261 
01262         if(len < (toplen + 1))
01263                 return -1;
01264 
01265         for(i = 0; i < toplen; i++) {
01266                 char c = topic->buffer[i];
01267                 if(is_wildcard_char(c))
01268                         return -1; /* Invalid: wildcard in PUB topic */
01269 
01270                 if('\0' == c)
01271                         return -1; /* Invalid: NUL char in PUB topic */
01272 
01273                 buf[i] = c;
01274         }
01275 
01276         buf[i] = '\0';
01277 
01278         return i;
01279 }
01280 
01281 static
01282 void proc_sub_tree_topPUB(const char *topPUB,   const struct utf8_string *topic,
01283                           const uint8_t *data_buf, uint32_t data_len, enum mqtt_qos qos,
01284                           bool retain)
01285 {
01286         struct topic_node *leaf = NULL;
01287         uint32_t stack_ref = stack_idx;
01288 
01289         if(NULL != root_node)
01290                 stack_add(root_node, (uint32_t)topPUB, 0 /* Not used */);
01291 
01292         while(stack_ref < stack_idx) {
01293                 struct _node_stack *stack = stack_pop();
01294 
01295                 /* Find leaf node of SUB that matches the PUB topic */
01296                 leaf = SUB_leaf_search((char*)stack->val1, stack->node); 
01297                 if(leaf)
01298                         leaf_msg_send(leaf, topic, data_buf, data_len,
01299                                       false, qos, retain);
01300         }
01301 }
01302 
01303 static bool _proc_pub_msg_rx(void *usr_cl, const struct utf8_string *topic,
01304                              const uint8_t *data_buf, uint32_t data_len, uint8_t msg_id,
01305                              enum mqtt_qos qos, bool retain)
01306 {
01307         int32_t err = -1;
01308 
01309         /* Prior to msg processing, chk for topic or buffer errors */
01310         if((pub_topic_read(topic,  work_buf,  WBUF_LEN) <= 0) ||
01311            (proc_forward_slash(work_buf, topic->length) <= 0))
01312                 goto _proc_pub_msg_rx_exit;
01313 
01314         /* If a valid MSG ID is specified for a QOS2 pkt, track it */
01315         err = -2;
01316         if((msg_id)           &&
01317            (MQTT_QOS2 == qos) &&
01318            (false == cl_mgmt_qos2_pub_rx_update(usr_cl, msg_id)))
01319                 goto _proc_pub_msg_rx_exit;
01320 
01321         /* Forward data to all subscribers of PUB topic in  server */
01322         proc_sub_tree_topPUB(work_buf, topic, data_buf,
01323                              data_len, qos, false);
01324 
01325         err = 0;
01326         if(retain) {
01327                 struct topic_node *leaf = topic_node_create(work_buf);
01328                 if((NULL  == leaf) ||
01329                    (false == node_data_update(leaf, true, data_buf, data_len,
01330                                               QOS_VALUE(qos), retain)))
01331                         err = -3;   /* Resources no more available */
01332 
01333                 if(leaf)
01334                         try_node_delete(leaf);
01335         }
01336 
01337  _proc_pub_msg_rx_exit:
01338         DBG_INFO("Processing of PUB message from %s (0x%08x) has %s (%d)\n\r",
01339                  usr_cl? "client" : "plugin", usr_cl? cl_bmap_get(usr_cl) : 0,
01340                  err? "failed" : "succeeded", err);
01341 
01342         return (err < 0)? false : true;
01343 }
01344 
01345 static
01346 bool proc_pub_msg_rx_locked(void *usr_cl, const struct utf8_string *topic,
01347                             const uint8_t *data_buf, uint32_t data_len, uint16_t msg_id,
01348                             bool dup, enum mqtt_qos qos, bool retain)
01349 {
01350         return _proc_pub_msg_rx(usr_cl, topic, data_buf, data_len,
01351                                 msg_id, qos, retain);
01352 }
01353 
01354 static int32_t utf8_str_rd(const struct utf8_string *utf8, char *buf, uint32_t len)
01355 {
01356         if((NULL == utf8) || (utf8->length > (len - 1)))
01357                 return -1;
01358 
01359         buf_wr_nbytes((uint8_t*)buf, (uint8_t*)utf8->buffer, utf8->length);
01360         buf[utf8->length] = '\0';
01361 
01362         return utf8->length;
01363 }
01364 
/* Extract the WILL QoS value from the CONNECT flags: shift right by three and
   mask with QID_VMASK. The argument is parenthesized so that an expression
   such as 'a | b' is shifted as a whole. */
#define CONN_FLAGS_WQID_GET(conn_flags)                         \
        (((conn_flags) >> 3) & QID_VMASK)      /* WILL QOS VAL */
01367 
01368 static uint16_t proc_connect_rx(void *ctx_cl, uint8_t conn_flags,
01369                            struct utf8_string * const *utf8_vec, void **usr_cl)
01370 {
01371         struct topic_node  *leaf = NULL;
01372         struct utf8_string *utf8 = NULL;
01373         void *app_cl = NULL;
01374         uint16_t utf8_len = 0;
01375         uint16_t rv = plugin_connect(MQC_UTF8_CLIENTID(utf8_vec),
01376                                 MQC_UTF8_USERNAME(utf8_vec),
01377                                 MQC_UTF8_PASSWORD(utf8_vec),
01378                                 &app_cl);
01379         if(rv)
01380                 goto proc_connect_rx_exit1; /* Admin did not permit connection */
01381 
01382         rv = CONNACK_RC_SVR_UNAVBL; /* Server (resource) unavailable */
01383 
01384         utf8 = MQC_UTF8_WILL_TOP(utf8_vec);
01385         if(utf8 && utf8->length) {
01386                 utf8_str_rd(utf8, work_buf, WBUF_LEN);
01387 
01388                 leaf = topic_node_create(work_buf);
01389                 if(NULL == leaf)
01390                         goto proc_connect_rx_exit2;
01391 
01392                 if(false == node_data_update(leaf, false, 
01393                                              (uint8_t*)MQC_WILL_MSG_BUF(utf8_vec),
01394                                              MQC_WILL_MSG_LEN(utf8_vec),
01395                                              CONN_FLAGS_WQID_GET(conn_flags),
01396                                              conn_flags & WILL_RETAIN_VAL))
01397                         goto proc_connect_rx_exit3;
01398         }
01399 
01400         utf8 = MQC_UTF8_CLIENTID(utf8_vec);
01401         if(utf8)
01402                 utf8_len = utf8_str_rd(utf8, work_buf, WBUF_LEN);
01403                 
01404         rv = cl_connect_rx(ctx_cl, (conn_flags & CLEAN_START_VAL)? true : false,
01405                            utf8_len? work_buf : NULL, app_cl, leaf, usr_cl);
01406         if(CONNACK_RC_REQ_ACCEPT == (rv & 0xFF)) {
01407                 if(leaf)
01408                         leaf->will_cl = *usr_cl;
01409 
01410                 return rv; /* Connection successful */
01411         }
01412 
01413         if(leaf)
01414                 node_data_update(leaf, true, NODE_DATA_RESET_PARAMS);
01415 
01416  proc_connect_rx_exit3: try_node_delete(leaf);
01417  proc_connect_rx_exit2: plugin_disconn(app_cl, true);
01418  proc_connect_rx_exit1:
01419         return rv;
01420 }
01421 
01422 static
01423 uint16_t proc_connect_rx_locked(void *ctx_cl, uint8_t conn_flags,
01424                            struct utf8_string * const *utf8_vec, void **usr_cl)
01425 {
01426         return proc_connect_rx(ctx_cl, conn_flags, utf8_vec, usr_cl);
01427 }
01428 
01429 static void session_hier_delete(struct topic_node *node, void *usr_cl)
01430 {
01431         struct topic_node *prev = NULL;
01432         uint32_t cl_map = cl_bmap_get(usr_cl);
01433 
01434         while(node) {
01435                 int32_t i = 0;
01436                 for(i = 0; i < 3; i++) {
01437                         if(node->cl_map[i] & cl_map) {
01438                                 node->cl_map[i] &= ~cl_map; 
01439                                 cl_sub_count_del(usr_cl);
01440                                 /* Client/Topic/QID 1-to-1 map */
01441                                 break; 
01442                         }
01443                 }
01444 
01445                 if(node->dn_nhbr)
01446                         stack_add(node->dn_nhbr, 0, 0);
01447 
01448                 prev = node;
01449                 node = node->dn_hier;
01450         }
01451 
01452         if(prev)
01453                 try_node_delete(prev);
01454 }
01455 
01456 void session_tree_delete(void *usr_cl)
01457 {
01458         uint32_t stack_ref = stack_idx;
01459         
01460         if(NULL != root_node)
01461                 stack_add(root_node, 0, 0);
01462 
01463         while(stack_ref < stack_idx) {
01464                 struct _node_stack *stack = stack_pop();
01465                 session_hier_delete(stack->node, usr_cl);
01466         }
01467 }
01468 
01469 static void proc_client_will(struct topic_node *leaf)
01470 {
01471         uint32_t wbuf_len = WBUF_LEN - 1; /* Make space for '\0' in wbuf */
01472         uint32_t offset   = wbuf_len;
01473         struct utf8_string topic;
01474         struct topic_node  *node;
01475 
01476         work_buf[offset] = '\0';  /* Ensures wbuf is NUL terminated */
01477         node = leaf;
01478 
01479         /* Prepare a topic string by walking back from leaf to root */
01480         do {
01481                 if(offset < node->toplen)
01482                         return;
01483 
01484                 offset -= node->toplen;
01485                 strncpy(work_buf + offset, node->subtop, node->toplen);
01486 
01487                 while(node->up_nhbr)
01488                         node = node->up_nhbr;
01489 
01490                 node = node->up_hier;
01491 
01492         } while(node);
01493 
01494         topic.buffer = work_buf + offset;
01495         topic.length = wbuf_len - offset;
01496         
01497 #define MK_QOS_ENUM(qid) ((enum mqtt_qos)(qid & QID_VMASK))
01498 
01499         proc_sub_tree_topPUB((char*)topic.buffer, 
01500                              &topic, leaf->my_data, leaf->my_dlen,
01501                              MK_QOS_ENUM(node_qid_get(leaf)),
01502                              is_node_retain(leaf));
01503 
01504 }
01505 
01506 static void on_cl_net_close(void *usr_cl, bool due2err)
01507 {
01508         struct topic_node *leaf = NULL;
01509         void *app_cl = NULL;
01510 
01511         /* See if client has a WILL that it intends to broadcast */
01512         leaf = (struct topic_node*) cl_will_hndl_get(usr_cl);
01513         if(NULL != leaf) {
01514                 if(usr_cl != leaf->will_cl)
01515                         return; /* Mismatch: should never happen */
01516 
01517                 if(due2err)
01518                         proc_client_will(leaf); /* pls broadcast */
01519 
01520                 /* Network is closing, so cleanup WILL msg store */
01521                 node_data_update(leaf, true, NODE_DATA_RESET_PARAMS);
01522                 leaf->will_cl =  NULL;
01523                 try_node_delete(leaf);
01524         }
01525 
01526         /* If not needed for future, delete session info */
01527         if(cl_can_session_delete(usr_cl))
01528                 session_tree_delete(usr_cl);
01529 
01530         /* Inform app that client has been disconnected  */
01531         app_cl = cl_app_hndl_get(usr_cl);
01532         plugin_disconn(app_cl, due2err);
01533 
01534         cl_on_net_close(usr_cl);
01535 }
01536 
01537 static
01538 void on_cl_net_close_locked(void *usr_cl, bool due2err)
01539 {
01540         on_cl_net_close(usr_cl, due2err);
01541         return;
01542 }
01543 
01544 static void on_connack_send(void *usr_cl, bool clean_session)
01545 {
01546         /* If asserted, then need to start w/ clean state */
01547         if(clean_session)
01548                 session_tree_delete(usr_cl);  
01549 
01550         cl_on_connack_send(usr_cl, clean_session);
01551 
01552         return;
01553 }
01554 
01555 static
01556 void on_connack_send_locked(void *usr_cl, bool clean_session)
01557 {
01558         on_connack_send(usr_cl, clean_session);
01559         return;
01560 }
01561 
01562 static
01563 bool proc_notify_ack_locked(void *usr_cl, uint8_t msg_type, uint16_t msg_id)
01564 {
01565         return cl_notify_ack(usr_cl, msg_type, msg_id);
01566 }
01567 
01568 static int32_t proc_topic_enroll(uint8_t pg_id, const struct utf8_string *topic,
01569                              enum mqtt_qos qos)
01570 {
01571         struct topic_node *leaf = NULL;
01572         uint16_t len = 0;
01573 
01574         if((NULL == topic) || (NULL == topic->buffer) || (0 == topic->length))
01575                 return -1;
01576 
01577         if(WBUF_LEN < (topic->length + 1))
01578                 return -2;
01579 
01580         len = topic->length;
01581         strncpy(work_buf, topic->buffer, len);
01582         work_buf[len] = '\0';
01583 
01584         leaf = topic_node_create(work_buf);
01585         if(NULL == leaf)
01586                 return -3;
01587 
01588         PG_MAP_VAL_SETUP(leaf->pg_map, QOS_VALUE(qos), pg_id);
01589 
01590         return 0;
01591 }
01592 
01593 static
01594 int32_t proc_topic_enroll_locked(uint8_t pg_id, const struct utf8_string *topic,
01595                              enum mqtt_qos qos)
01596 {
01597         int32_t rv = 0;
01598 
01599         MUTEX_LOCKIN();
01600         rv = proc_topic_enroll(pg_id, topic, qos);
01601         MUTEX_UNLOCK();
01602 
01603         return rv;
01604 }
01605 
01606 static int32_t proc_topic_cancel(uint8_t pg_id, const struct utf8_string *topic)
01607 {
01608         struct topic_node *leaf = NULL;
01609         uint16_t len = 0;
01610 
01611         if(NULL == topic)
01612                 return -1;
01613 
01614         if(WBUF_LEN < (topic->length + 1))
01615                 return -2;
01616 
01617         len = topic->length;
01618         strncpy(work_buf, topic->buffer, len);
01619         work_buf[len] = '\0';
01620 
01621         leaf = leaf_node_find(work_buf);
01622         if(NULL == leaf)
01623                 return -2;
01624 
01625         PG_MAP_VAL_RESET(leaf->pg_map, pg_id);
01626 
01627         try_node_delete(leaf);
01628 
01629         return 0;
01630 }
01631 
01632 static
01633 int32_t proc_topic_cancel_locked(uint8_t pg_id, const struct utf8_string *topic)
01634 {
01635         int32_t rv = 0;
01636 
01637         MUTEX_LOCKIN();
01638         rv = proc_topic_cancel(pg_id, topic);
01639         MUTEX_UNLOCK();
01640 
01641         return rv;
01642 }
01643 
01644 static
01645 int32_t proc_app_pub_send_locked(const struct utf8_string *topic, const uint8_t *data_buf,
01646                              uint32_t data_len, enum mqtt_qos qos, bool retain)
01647 {
01648         bool rv;
01649 
01650         MUTEX_LOCKIN();
01651         /* Received from application, process topic for distribution */
01652         rv = _proc_pub_msg_rx(NULL, topic, data_buf, data_len,
01653                               0x00, qos, retain);
01654         MUTEX_UNLOCK();
01655 
01656         return rv? (int32_t)data_len : -1;
01657 }
01658 
01659 int32_t mqtt_server_init(const struct mqtt_server_lib_cfg *lib_cfg,
01660                      const struct mqtt_server_app_cfg *app_cfg)
01661 {
01662         /* If mutex is specified, then the following set of callbacks
01663            are invoked in the locked state - enumerated by 'locked' */
01664         struct mqtt_server_msg_cbs pkts_cbs = {proc_connect_rx_locked, 
01665                                                proc_sub_msg_rx_locked,
01666                                                proc_un_sub_msg_locked,
01667                                                proc_pub_msg_rx_locked,
01668                                                proc_notify_ack_locked,
01669                                                on_cl_net_close_locked,
01670                                                on_connack_send_locked};
01671 
01672         struct plugin_core_msg_cbs core_cbs = {proc_topic_enroll_locked,
01673                                                proc_topic_cancel_locked,
01674                                                proc_app_pub_send_locked};
01675 
01676         util_params_set(lib_cfg->debug_printf,
01677                         lib_cfg->mutex,
01678                         lib_cfg->mutex_lockin,
01679                         lib_cfg->mutex_unlock);
01680 
01681         USR_INFO("Version: Server LIB %s, Common LIB %s.\n\r",
01682                  MQTT_SERVER_VERSTR, MQTT_COMMON_VERSTR);
01683 
01684         topic_node_init();
01685 
01686         cl_mgmt_init();
01687 
01688         plugin_init(&core_cbs);
01689 
01690         mqtt_server_lib_init(lib_cfg, &pkts_cbs);
01691 
01692         return 0;
01693 }
01694 
01695 }//namespace mbed_mqtt