TI's MQTT Demo with freertos CM4F
Embed:
(wiki syntax)
Show/hide line numbers
server_core.cpp
00001 /****************************************************************************** 00002 * 00003 * Copyright (C) 2014 Texas Instruments Incorporated 00004 * 00005 * All rights reserved. Property of Texas Instruments Incorporated. 00006 * Restricted rights to use, duplicate or disclose this code are 00007 * granted through contract. 00008 * 00009 * The program may not be used without the written permission of 00010 * Texas Instruments Incorporated or against the terms and conditions 00011 * stipulated in the agreement under which this program has been supplied, 00012 * and under no circumstances can it be used with non-TI connectivity device. 00013 * 00014 ******************************************************************************/ 00015 00016 #include "client_mgmt.h" 00017 #include "server_util.h" 00018 #include "server_pkts.h" 00019 #include "server_plug.h" 00020 #include "server_core.h" 00021 00022 namespace mbed_mqtt { 00023 00024 #ifndef CFG_SR_MAX_SUBTOP_LEN 00025 #define MAX_SUBTOP_LEN 32 00026 #else 00027 #define MAX_SUBTOP_LEN CFG_SR_MAX_SUBTOP_LEN 00028 #endif 00029 00030 #ifndef CFG_SR_MAX_TOPIC_NODE 00031 #define MAX_TOP_NODES 32 00032 #else 00033 #define MAX_TOP_NODES CFG_SR_MAX_TOPIC_NODE 00034 #endif 00035 00036 /* A topic name (or string or tree or hierarchy) is handled as a series of nodes. 00037 A 'topic node' refers to a level in the topic tree or hierarchy and it stores 00038 the associated part of the topic string at that level. For example, a topic 00039 having a name "abc/x/1" will have three nodes for each of the subtopics, viz, 00040 "abc/", "x/" and "1". Further, the association between the nodes is indicated 00041 as following. 00042 00043 -- down link hierarchy --> -- down link hierarchy --> 00044 "abc/" "x/" "1" 00045 <-- up link hierarchy -- <-- up link hierarchy -- 00046 00047 To extend this example, another topic having a name "abc/y/2" will be managed 00048 as following. 
00049 00050 -- down link hierarchy --> -- down link hierarchy --> 00051 "abc/" "x/" "1" 00052 <-- up link hierarchy -- <-- up link hierarchy -- 00053 | ^ 00054 | | 00055 down link neighbour up link neighbour 00056 | | 00057 V | 00058 -- down link hierarchy --> 00059 "y/" "2" 00060 <-- up link hierarchy -- 00061 00062 To reduce wasted in byte alignments, the structure has been hand packed. 00063 */ 00064 static struct topic_node { 00065 00066 struct topic_node *dn_nhbr; /* Down Link Neighbour node */ 00067 struct topic_node *up_nhbr; /* Up Link Neighbour node */ 00068 00069 struct topic_node *dn_hier; /* Down Link Hierarchy node */ 00070 struct topic_node *up_hier; /* Up Link Hierarchy node */ 00071 00072 uint32_t cl_map[3]; /* Subscribers for each QOS */ 00073 00074 uint8_t *my_data; /* Leaf node: retained data */ 00075 uint32_t my_dlen; 00076 00077 void *will_cl; /* CL OBJ of WILL Leaf node */ 00078 00079 #define TNODE_PROP_RETAIN_DATA 0x04 00080 /* Bits 0,1 for QOS and rest are flags */ 00081 uint8_t my_prop; 00082 00083 uint8_t pg_map; /* Map: application plugins */ 00084 00085 uint16_t toplen; /* Length of node sub-topic */ 00086 char subtop[MAX_SUBTOP_LEN];/* NUL terminated sub-topic */ 00087 00088 } nodes[MAX_TOP_NODES]; 00089 00090 /* 00091 Housekeeping to manage subtopics (nodes) at run-time. 
00092 */ 00093 static struct _node_stack { 00094 struct topic_node *node; 00095 uint32_t val1; 00096 uint32_t val2; 00097 00098 } node_stack[MAX_TOP_NODES]; 00099 00100 static int32_t stack_idx = 0; 00101 00102 static void stack_add(struct topic_node *node, uint32_t val1, uint32_t val2) 00103 { 00104 node_stack[stack_idx].node = node; 00105 node_stack[stack_idx].val1 = val1; 00106 node_stack[stack_idx].val2 = val2; 00107 00108 stack_idx++; 00109 } 00110 00111 static inline struct _node_stack *stack_pop() 00112 { 00113 if(0 == stack_idx) 00114 return NULL; 00115 00116 return node_stack + (--stack_idx); 00117 } 00118 00119 static inline bool is_node_retain(const struct topic_node *node) 00120 { 00121 return (node->my_prop & TNODE_PROP_RETAIN_DATA)? true : false; 00122 } 00123 00124 static inline void node_retain_set(struct topic_node *node, bool retain) 00125 { 00126 if(retain) 00127 node->my_prop |= TNODE_PROP_RETAIN_DATA; 00128 else 00129 node->my_prop &= ~TNODE_PROP_RETAIN_DATA; 00130 } 00131 00132 static inline void node_qid_set(struct topic_node *node, uint8_t qid) 00133 { 00134 node->my_prop &= ~QID_VMASK; 00135 node->my_prop |= qid; 00136 } 00137 00138 static inline uint8_t node_qid_get(struct topic_node *node) 00139 { 00140 return node->my_prop & QID_VMASK; 00141 } 00142 00143 static inline bool is_node_willed(const struct topic_node *node) 00144 { 00145 return (NULL != node->will_cl)? true : false; 00146 } 00147 00148 static inline bool enrolls_plugin(const struct topic_node *node) 00149 { 00150 return (PG_MAP_ALL_DFLTS != node->pg_map)? 
true : false; 00151 } 00152 00153 static struct topic_node *free_nodes = NULL; 00154 static struct topic_node *root_node = NULL; 00155 00156 static void node_reset(struct topic_node *node) 00157 { 00158 node->dn_nhbr = NULL; 00159 node->up_nhbr = NULL; 00160 node->dn_hier = NULL; 00161 node->up_hier = NULL; 00162 00163 node->cl_map[0] = node->cl_map[1] = node->cl_map[2] = 0; 00164 node->my_data = NULL; 00165 node->my_dlen = 0; 00166 node->will_cl = NULL; 00167 node->my_prop = QID_VMASK; 00168 00169 node->pg_map = PG_MAP_ALL_DFLTS; 00170 node->toplen = 0; 00171 } 00172 00173 static void topic_node_init(void) 00174 { 00175 int i = 0; 00176 00177 for(i = 0; i < MAX_TOP_NODES; i++) { 00178 struct topic_node *node = nodes + i; 00179 00180 node_reset(node); 00181 00182 node->dn_nhbr = free_nodes; 00183 free_nodes = node; 00184 } 00185 00186 return; 00187 } 00188 00189 static struct topic_node *alloc_topic_node(void) 00190 { 00191 struct topic_node *node = free_nodes; 00192 if(NULL != node) { 00193 free_nodes = node->dn_nhbr; 00194 00195 node_reset(node); 00196 } 00197 00198 return node; 00199 } 00200 00201 static void free_node(struct topic_node *node) 00202 { 00203 node_reset(node); 00204 00205 node->dn_nhbr = free_nodes; 00206 free_nodes = node; 00207 } 00208 00209 static bool is_end_of_subtop(const char *topstr, char const **next_subtop) 00210 { 00211 bool rv = false; 00212 00213 if('\0' == *topstr) { 00214 *next_subtop = NULL; /* Reached end of topstr */ 00215 rv = true; 00216 } 00217 00218 if('/' == *topstr) { 00219 /* Next sub-topic is NULL, if a '\0' follows '/' */ 00220 *next_subtop = *(topstr + 1)? 
topstr + 1 : NULL; 00221 rv = true; 00222 } 00223 00224 return rv; 00225 } 00226 00227 static int32_t subtop_read(const char *topstr, char *subtop_buf, uint16_t buf_length, 00228 char const **next_subtop) 00229 { 00230 int32_t idx = 0, len = buf_length; 00231 00232 *next_subtop = topstr; 00233 00234 while(idx < (len - 1)) { 00235 subtop_buf[idx++] = *topstr; 00236 00237 if(is_end_of_subtop(topstr, next_subtop)) 00238 break; 00239 00240 topstr++; 00241 } 00242 00243 if(idx == len) { 00244 USR_INFO("S: Fatal, insufficient buffer for sub-str\n\r"); 00245 return -1; /* zero or insufficient buffer provided */ 00246 } 00247 00248 /* NUL terminated buffer: unless '\0' ended copy, make one */ 00249 if('\0' == subtop_buf[idx - 1]) 00250 idx--; /* A fix up */ 00251 else 00252 subtop_buf[idx] = '\0'; 00253 00254 return idx; 00255 } 00256 00257 static 00258 struct topic_node *alloc_node_subtop(const char *topstr, char const **next_subtop) 00259 { 00260 uint16_t len = 0; 00261 struct topic_node *node = alloc_topic_node(); 00262 if(NULL == node) 00263 return NULL; 00264 00265 len = subtop_read(topstr, node->subtop, MAX_SUBTOP_LEN, next_subtop); 00266 if(len <= 0) { 00267 free_node(node); 00268 return NULL; 00269 } 00270 00271 node->toplen = len; 00272 00273 return node; 00274 } 00275 00276 /* Returns true if string 'subtop' is part of string 'topstr' otherwise false. 00277 Additionally, on success, pointer to next subtopic in 'topstr' is provided. 00278 */ 00279 bool is_first_subtop(const char *subtop, const char *topstr, char const **next_subtop) 00280 { 00281 while(*subtop == *topstr) { 00282 if(is_end_of_subtop(topstr, next_subtop)) 00283 return true; 00284 00285 subtop++; 00286 topstr++; 00287 } 00288 00289 return false; 00290 } 00291 00292 /* Find a topic node in neighbour list that matches first subtopic in 'topstr'. 00293 Additionally, on success, pointer to next subtopic in 'topstr' is provided. 
00294 */ 00295 struct topic_node *subtop_nhbr_node_find(const struct topic_node *root_nh, 00296 const char *topstr, 00297 char const **next_subtop) 00298 { 00299 /* This routine does not make use of 'next_subtop' */ 00300 while(root_nh) { 00301 if(true == is_first_subtop(root_nh->subtop, topstr, next_subtop)) 00302 break; 00303 00304 root_nh = root_nh->dn_nhbr; 00305 } 00306 00307 return (struct topic_node*) root_nh;/* Bad: const from poiner removed */ 00308 } 00309 00310 /* Find a topic node in neighbour list that matches the given 'subtop' string */ 00311 struct topic_node *nhbr_node_find(const struct topic_node *root_nh, 00312 const char *subtop) 00313 { 00314 const char *next_subtop = NULL; 00315 00316 return subtop_nhbr_node_find(root_nh, subtop, &next_subtop); 00317 } 00318 00319 /* Find leaf node of branch-combo that matches complete 'topstr'. 00320 Modus Operandi: For each sub topic in 'topstr', go across neighbour list, 00321 then for matching neighbour node, make its child node as root of neighbour 00322 list for another iteration for next sub topic. 
00323 */ 00324 /* Complex */ 00325 static struct topic_node *lowest_node_find(const char *topstr, bool *flag_nh, 00326 char const **next_subtop) 00327 { 00328 struct topic_node *root_nh = root_node, *node = NULL; 00329 00330 *next_subtop = topstr; 00331 *flag_nh = false; 00332 00333 while(root_nh) { 00334 node = subtop_nhbr_node_find(root_nh, topstr, next_subtop); 00335 if(NULL == node) { /* Partial or no match */ 00336 *flag_nh = true; 00337 node = root_nh; 00338 break; 00339 } 00340 00341 /* NULL '*next_subtop' indicates a complete match */ 00342 if(NULL == (*next_subtop)) 00343 break; 00344 00345 root_nh = node->dn_hier; 00346 topstr = *next_subtop; 00347 } 00348 00349 return node; 00350 } 00351 00352 struct topic_node *leaf_node_find(const char *topstr) 00353 { 00354 const char *next_subtop; 00355 bool flag_nh; 00356 struct topic_node *node = lowest_node_find(topstr, &flag_nh, 00357 &next_subtop); 00358 00359 return (NULL == next_subtop)? node : NULL; 00360 } 00361 00362 static void try_node_delete(struct topic_node *node); /* Forward declaration */ 00363 00364 /* Create 'child only' hierarchy of nodes to hold all sub topics in 'topstr'. 00365 The routine returns a start node of hierarchy and also provides leaf node. 
00366 */ 00367 static 00368 struct topic_node *hier_nodes_create(const char *topstr, struct topic_node **leaf) 00369 { 00370 struct topic_node *base = NULL, *node = NULL, *prev = NULL; 00371 const char *next_subtop = NULL; 00372 00373 do { 00374 node = alloc_node_subtop(topstr, &next_subtop); 00375 00376 if(NULL == node) { 00377 /* Allocation of a node failed, free up 00378 the previous allocations, if any, in 00379 the hierarchy that was being created */ 00380 if(prev) 00381 try_node_delete(prev); 00382 00383 base = NULL; 00384 break; 00385 } 00386 00387 if(NULL == prev) { 00388 // prev = node; 00389 base = node; /* First node of hierarchy */ 00390 } else { 00391 prev->dn_hier = node; 00392 node->up_hier = prev; 00393 // prev = node; 00394 } 00395 00396 prev = node; 00397 topstr = next_subtop; 00398 00399 } while(next_subtop); 00400 00401 *leaf = node; 00402 00403 DBG_INFO("S: Hierarchy of nodes created: Base: %s & Leaf: %s\n\r", 00404 base? base->subtop : " ", (*leaf)? (*leaf)->subtop : " "); 00405 00406 return base; 00407 } 00408 00409 static void install_nhbr_node(struct topic_node *base, struct topic_node *node) 00410 { 00411 while(base->dn_nhbr) { 00412 base = base->dn_nhbr; 00413 } 00414 00415 base->dn_nhbr = node; 00416 node->up_nhbr = base; 00417 00418 DBG_INFO("S: %s added as a neighbour to %s\n\r", 00419 node->subtop, base->subtop); 00420 } 00421 00422 static void set_up_hier_nodes(struct topic_node *up_hier, 00423 struct topic_node *dn_hier) 00424 { 00425 up_hier->dn_hier = dn_hier; 00426 dn_hier->up_hier = up_hier; 00427 00428 DBG_INFO("%s added as DN HIER to %s \n\r", 00429 dn_hier->subtop, up_hier->subtop); 00430 } 00431 00432 static inline void install_topic_root_node(struct topic_node *node) 00433 { 00434 root_node = node; 00435 } 00436 00437 /* Create or update tree to create branch-combo to refer to 'topstr'. 
00438 Modus operandi: 00439 00440 */ 00441 struct topic_node *topic_node_create(const char *topstr) 00442 { 00443 struct topic_node *base, *node, *leaf; 00444 const char *next_subtop = topstr; 00445 bool flag_nh; 00446 00447 base = lowest_node_find(topstr, &flag_nh, &next_subtop); 00448 if(NULL == next_subtop) 00449 return base; /* Found exact match */ 00450 00451 /* Control reaches here, if either no or partial branch-combo has been 00452 found for 'topstr'. Now, let's create remaining branches for string 00453 'next_subtop' and assign them to appropriately to topic tree. 00454 */ 00455 DBG_INFO("S: Creating Hierarchy for %s\n\r", next_subtop); 00456 00457 node = hier_nodes_create(next_subtop, &leaf); 00458 if(node) { 00459 if(NULL == base) 00460 install_topic_root_node(node); 00461 else if(flag_nh) 00462 install_nhbr_node(base, node); 00463 else 00464 set_up_hier_nodes(base, node); 00465 00466 return leaf; 00467 } 00468 00469 return NULL; 00470 } 00471 00472 static bool can_delete_node(const struct topic_node *node) 00473 { 00474 if(node->up_nhbr && node->up_hier) { 00475 USR_INFO("S: fatal, node w/ up-nhbr & up-hier.\n\r"); 00476 return false; 00477 } 00478 00479 if(node->dn_nhbr || 00480 node->dn_hier) 00481 return false; 00482 00483 return true; 00484 } 00485 00486 static struct topic_node *node_delete(struct topic_node *node) 00487 { 00488 struct topic_node *next = NULL; 00489 00490 if(false == can_delete_node(node)) 00491 return NULL; 00492 00493 if(node->up_nhbr) { 00494 node->up_nhbr->dn_nhbr = NULL; 00495 next = node->up_nhbr; 00496 } 00497 00498 if(node->up_hier) { 00499 node->up_hier->dn_hier = NULL; 00500 next = node->up_hier; 00501 } 00502 00503 if((NULL == node->up_nhbr) && 00504 (NULL == node->up_hier)) 00505 root_node = NULL; 00506 00507 DBG_INFO("S: Deleted node %s\n\r", node->subtop); 00508 00509 free_node(node); 00510 00511 return next; 00512 } 00513 00514 struct topbuf_desc { 00515 char *buffer; 00516 uint16_t maxlen; 00517 uint16_t offset; 
00518 }; 00519 00520 static bool topbuf_add(struct topbuf_desc *buf_desc, const struct topic_node *node) 00521 { 00522 const char *next_subtop; 00523 char *buf = buf_desc->buffer + buf_desc->offset; 00524 uint16_t len = buf_desc->maxlen - buf_desc->offset; 00525 00526 int32_t rv = subtop_read(node->subtop, buf, len, &next_subtop); 00527 if(rv < 0) 00528 return false; 00529 00530 if(NULL != next_subtop) { 00531 USR_INFO("S: topstr_add fatal, bad subtop.\n\r"); 00532 return false; 00533 } 00534 00535 buf_desc->offset += rv; 00536 00537 return true; 00538 } 00539 00540 static bool topbuf_cpy(struct topbuf_desc *buf_desc, const char *subtop) 00541 { 00542 const char *next_subtop; 00543 char *buf = buf_desc->buffer + buf_desc->offset; 00544 uint16_t len = buf_desc->maxlen - buf_desc->offset; 00545 00546 int32_t rv = subtop_read(subtop, buf, len, &next_subtop); 00547 if(rv < 0) 00548 return false; 00549 00550 if(NULL != next_subtop) { 00551 USR_INFO("S: topstr_copy fatal, bad subtop.\n\r"); 00552 return false; 00553 } 00554 00555 buf_desc->offset += rv; 00556 00557 return true; 00558 } 00559 00560 static bool has_a_wildcard(const struct topic_node *node) 00561 { 00562 const char *str = node->subtop; 00563 while('\0' != *str) { 00564 if(('+' == *str) || ('#' == *str)) 00565 return true; 00566 00567 str++; 00568 } 00569 00570 return false; 00571 } 00572 00573 static bool is_node_SUB_subtop(const struct topic_node *node, const char *subtop) 00574 { 00575 if(false == has_a_wildcard(node)) 00576 return ((0 == strcmp(node->subtop, subtop)) || 00577 (node->dn_hier && (0 == strcmp("+/", subtop))) || 00578 (!node->dn_hier && (0 == strcmp("+", subtop))))? 00579 true : false; 00580 00581 return false; 00582 } 00583 00584 /* Search node tree, created by PUB retention, for the branch combo, whose 00585 absolute sub-topic sequence 'matches', in entirety, the topic, which is 00586 being subscribed. 
The 'match' criteria is met either through the 00587 wildcard sub-topics or exact compare. 00588 00589 The hierarchical search starts at the specified 'base' node and ends at 00590 the leaf node. If the sequence of 'base-to-leaf' sub-topics 'match' the 00591 'topSUB', then the leaf node is returned, otherwise a NULL is returned. 00592 00593 As part of hierarchical search, neighbouring nodes, if any, are logged 00594 for subsequent iteration of the routine. 00595 */ 00596 static struct topic_node *pub_hier_search(const char *topSUB, 00597 const struct topic_node *base, 00598 struct topbuf_desc *mk_pubtop) 00599 { 00600 const struct topic_node *node = base, *prev = NULL; 00601 const char *next_subtop = topSUB; 00602 char subtop[MAX_SUBTOP_LEN]; 00603 00604 while(next_subtop && node) { 00605 if(subtop_read(topSUB, subtop, MAX_SUBTOP_LEN, 00606 &next_subtop) <= 0) 00607 break; 00608 00609 if(node->dn_nhbr) 00610 stack_add(node->dn_nhbr, (uint32_t)topSUB, mk_pubtop->offset); 00611 00612 if(false == is_node_SUB_subtop(node, subtop)) 00613 break; /* Node doesn't form part of published topic */ 00614 00615 if(false == topbuf_add(mk_pubtop, node)) 00616 break; 00617 00618 prev = node; 00619 node = node->dn_hier; 00620 00621 topSUB = next_subtop; 00622 } 00623 00624 if(NULL != next_subtop) 00625 node = NULL; 00626 else 00627 node = prev; 00628 00629 return (struct topic_node *)node; 00630 } 00631 00632 static bool is_node_PUB_subtop(const struct topic_node *node, 00633 const char *subtop, bool endtop) 00634 { 00635 /* Assumes that subtop hasn't got any wildcard characater */ 00636 return ((0 == strcmp(subtop, node->subtop)) || 00637 (!endtop && (0 == strcmp("+/", node->subtop))) || 00638 (endtop && (0 == strcmp("+", node->subtop))))? 00639 true : false; 00640 } 00641 00642 /* Search node tree, created by subscriptions, for the branch combo, whose 00643 sub-topic sequence 'matches', in entirety, the absolute topic, to which 00644 data has been published. 
The 'match' criteria is met either through the 00645 wildcard sub-topics or exact compare. 00646 00647 The hierarchical search starts at the specified 'base' node and ends at 00648 the leaf node. If the sequence of 'base-to-leaf' sub-topics 'match' the 00649 'topPUB', then the leaf node is returned, otherwise a NULL is returned. 00650 00651 As part of hierarchical search, neighbouring nodes, if any, are logged 00652 for subsequent iteration of the routine. 00653 */ 00654 static struct topic_node *SUB_leaf_search(const char *topPUB, 00655 const struct topic_node *base) 00656 { 00657 const struct topic_node *node = base, *prev = NULL; 00658 const char *next_subtop = topPUB; 00659 char subtop[MAX_SUBTOP_LEN]; 00660 00661 while(next_subtop && node) { 00662 if(subtop_read(topPUB, subtop, MAX_SUBTOP_LEN, 00663 &next_subtop) <= 0) 00664 break; 00665 00666 if(node->dn_nhbr) 00667 stack_add(node->dn_nhbr, (uint32_t)topPUB, 0); 00668 00669 if(0 == strcmp("#", node->subtop)) 00670 goto SUB_leaf_search_exit1; 00671 00672 if(false == is_node_PUB_subtop(node, subtop, !next_subtop)) 00673 break; /* Node doesn't form part of published topic */ 00674 00675 prev = node; 00676 node = node->dn_hier; 00677 00678 topPUB = next_subtop; 00679 } 00680 00681 if(NULL != next_subtop) 00682 node = NULL; 00683 else 00684 node = prev; 00685 00686 SUB_leaf_search_exit1: 00687 return (struct topic_node*) node; // Bad 00688 } 00689 00690 00691 /*------------------------------------------------------------------------- 00692 * MQTT Routines 00693 *------------------------------------------------------------------------- 00694 */ 00695 00696 #define WBUF_LEN MQP_SERVER_RX_LEN /* Assignment to ease implementation */ 00697 static char work_buf[WBUF_LEN]; 00698 00699 static void try_node_delete(struct topic_node *node) 00700 { 00701 while(node) { 00702 00703 if(is_node_retain(node) || 00704 is_node_willed(node) || 00705 enrolls_plugin(node) || 00706 node->cl_map[0] || 00707 node->cl_map[1] || 00708 
node->cl_map[2]) 00709 break; 00710 00711 node = node_delete(node); 00712 } 00713 } 00714 00715 /* Move this to a common file */ 00716 static void pub_msg_send(const struct utf8_string *topic, const uint8_t *data_buf, 00717 uint32_t data_len, uint8_t fh_flags, uint32_t cl_map) 00718 { 00719 enum mqtt_qos qos = ENUM_QOS(fh_flags); 00720 struct mqtt_packet *mqp = NULL; 00721 00722 mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len); 00723 if(NULL == mqp) 00724 return; 00725 00726 if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) || 00727 (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) { 00728 mqp_free(mqp); 00729 return; 00730 } 00731 00732 mqp_prep_fh(mqp, fh_flags); 00733 00734 if(cl_map) 00735 cl_pub_dispatch(cl_map, mqp); 00736 00737 return; 00738 } 00739 00740 static struct topic_node *SUB_node_create(const char *topSUB, uint8_t qid, void *usr_cl) 00741 { 00742 struct topic_node *leaf = topic_node_create(topSUB); 00743 if(leaf) { 00744 uint8_t j = 0; 00745 uint32_t map = cl_bmap_get(usr_cl); 00746 00747 for(j = 0; j < 3; j++) 00748 /* Client: clear QOS of existing sub, if any */ 00749 leaf->cl_map[j] &= ~map; 00750 00751 leaf->cl_map[qid] |= map; 00752 00753 cl_sub_count_add(usr_cl); 00754 } 00755 00756 return leaf; 00757 } 00758 00759 static uint8_t proc_pub_leaf(struct topic_node *leaf, const struct utf8_string *topic, 00760 enum mqtt_qos qos, void *usr_cl) 00761 { 00762 uint8_t qid = QOS_VALUE(qos); 00763 00764 if(is_node_retain(leaf)) { 00765 /* If it is an earlier created topic w/ retained 00766 data, then pick lower of the two QOS(s) */ 00767 qid = MIN(node_qid_get(leaf), qid); 00768 00769 /* Publish the retained data to this client */ 00770 pub_msg_send(topic, leaf->my_data, leaf->my_dlen, 00771 MAKE_FH_FLAGS(false, qid, true), 00772 cl_bmap_get(usr_cl)); 00773 } 00774 00775 return qid; 00776 } 00777 00778 /* Multi-level wild-card subscription - search of all of the tree i.e. 
00779 "no topic" ('no_top') in particular and publish it to client, if 00780 the hierarchy has no wild-card node. */ 00781 static 00782 uint8_t proc_pub_hier_no_top(struct topic_node *base, struct topbuf_desc *mk_pubtop, 00783 enum mqtt_qos qos, void *usr_cl) 00784 { 00785 struct topic_node *node = base, *leaf = NULL; 00786 uint8_t ack = QOS_VALUE(qos); 00787 00788 /* 1. Find the leaf node of a non wildcard branch-combo */ 00789 while(node) { 00790 if(node->dn_nhbr) 00791 stack_add(node->dn_nhbr, 0, mk_pubtop->offset); 00792 00793 if(has_a_wildcard(node)) 00794 break; 00795 00796 if(false == topbuf_add(mk_pubtop, node)) 00797 break; 00798 00799 leaf = node; 00800 node = node->dn_hier; 00801 } 00802 00803 /* A non NULL value of 'node' would indicate a hierarchy with a 00804 wildcard (sub-)topic (the 'node') - not suitable for PUB. */ 00805 00806 if(NULL == node) { 00807 /* 2. Send retained data, if any, to SUB Client */ 00808 struct utf8_string topic = {mk_pubtop->buffer, 00809 mk_pubtop->offset}; 00810 00811 /* In this version, at this juncture, the 'leaf' 00812 will not be NULL. Nevertheless a check (for 00813 the sake of static analytical tools).........*/ 00814 if(leaf) 00815 ack = proc_pub_leaf(leaf, &topic, qos, usr_cl); 00816 } 00817 00818 return ack; 00819 } 00820 00821 /* Multi-level wild-card subscription - search of all of the tree i.e. 00822 "no topic" ('no_top') in particular and publish it to client, if 00823 a hierarchy in the tree has no wild-card node. 
*/ 00824 static 00825 uint8_t proc_pub_tree_no_top(struct topic_node *base, struct topbuf_desc *mk_pubtop, 00826 enum mqtt_qos qos, void *usr_cl) 00827 { 00828 uint32_t stack_ref = stack_idx; 00829 uint8_t min = QOS_VALUE(qos); 00830 00831 if(base != NULL) 00832 stack_add(base, 0, mk_pubtop->offset); 00833 00834 while(stack_ref < stack_idx) { 00835 struct _node_stack *stack = stack_pop(); 00836 uint8_t ack; 00837 00838 mk_pubtop->offset = (uint16_t) stack->val2; 00839 00840 ack = proc_pub_hier_no_top(stack->node, mk_pubtop, qos, usr_cl); 00841 if(ack < min) 00842 min = ack; 00843 } 00844 00845 return min; 00846 } 00847 00848 static uint8_t proc_pub_hier_SUBtop(const char *topSUB, const struct topic_node *base, 00849 struct topbuf_desc *mk_pubtop, enum mqtt_qos qos, 00850 void *usr_cl) 00851 { 00852 struct topic_node *leaf = pub_hier_search(topSUB, base, mk_pubtop); 00853 uint8_t min = QOS_VALUE(qos); 00854 00855 if(leaf) { 00856 struct utf8_string topic = {mk_pubtop->buffer, 00857 mk_pubtop->offset}; 00858 00859 min = proc_pub_leaf(leaf, &topic, qos, usr_cl); 00860 } 00861 00862 return min; 00863 } 00864 00865 /* used by sl or no wc */ 00866 static uint8_t proc_pub_tree_SUBtop(const char *topSUB, struct topic_node *base, 00867 struct topbuf_desc *mk_pubtop, 00868 enum mqtt_qos qos, void *usr_cl) 00869 { 00870 uint32_t stack_ref = stack_idx; 00871 uint8_t min = QOS_VALUE(qos); 00872 00873 if(NULL != base) 00874 stack_add(base, (uint32_t)topSUB, mk_pubtop->offset); 00875 00876 while(stack_ref < stack_idx) { 00877 struct _node_stack *stack = stack_pop(); 00878 uint8_t ack; 00879 00880 mk_pubtop->offset = stack->val2; 00881 ack = proc_pub_hier_SUBtop((char*)stack->val1, stack->node, 00882 mk_pubtop, qos, usr_cl); 00883 00884 if(ack < min) 00885 min = ack; 00886 } 00887 00888 return min; 00889 } 00890 00891 static 00892 uint8_t proc_sub_ml_wc_hier(const char *grandpa_topSUB, char *parent_subtop, 00893 struct topic_node *base, struct topbuf_desc *mk_pubtop, 00894 enum 
mqtt_qos qos, void *usr_cl) 00895 { 00896 uint8_t min = QOS_VALUE(qos), ack = QFL_VALUE; 00897 char *subtop = NULL; 00898 00899 /* 1. Search hier node for 'grandpa' and if found, get to parent level */ 00900 if('\0' != grandpa_topSUB[0]) { 00901 struct topic_node *leaf = pub_hier_search(grandpa_topSUB, base, 00902 mk_pubtop); 00903 if(NULL == leaf) 00904 return min; 00905 00906 base = leaf->dn_hier; /* nhbr root at parent level */ 00907 } 00908 00909 /* 2. If present, process parent as a leaf and get its down hierarchy */ 00910 subtop = parent_subtop; 00911 if(('\0' != subtop[0]) && ('+' != subtop[0]) && ('/' != subtop[0])) { 00912 uint16_t offset = mk_pubtop->offset; /* Refer to grandpa's pubtop */ 00913 uint16_t sublen = 0; 00914 00915 while(subtop[sublen]){sublen++;} 00916 00917 ack = proc_pub_tree_SUBtop(subtop, base, mk_pubtop, qos, usr_cl); 00918 mk_pubtop->offset = offset; /* Restores grandpa's pubtop */ 00919 00920 subtop[sublen] = '/'; /* Make parent's hier subtop */ 00921 00922 base = nhbr_node_find(base, subtop); 00923 if(base) 00924 base = topbuf_cpy(mk_pubtop, subtop)? 00925 base->dn_hier : NULL; 00926 subtop[sublen] = '\0'; /* Get back, original subtop */ 00927 } 00928 00929 min = MIN(min, ack); 00930 /* 3. 
Process '#' WC by walking thru entire sub-tree of parent 'base' */ 00931 if(NULL != base) 00932 ack = proc_pub_tree_no_top(base, mk_pubtop, qos, usr_cl); 00933 00934 return MIN(min, ack); 00935 } 00936 00937 static uint8_t proc_sub_ml_wc_tree(char *grandpa_topSUB, char *parent_subtop, 00938 struct topic_node *base, 00939 enum mqtt_qos qos, void *usr_cl) 00940 { 00941 struct topbuf_desc mk_pubtop = {work_buf, WBUF_LEN, 0 /* offset */}; 00942 uint32_t stack_ref = stack_idx; 00943 uint8_t min = QOS_VALUE(qos); 00944 00945 if(NULL != base) 00946 stack_add(base, (uint32_t)grandpa_topSUB, mk_pubtop.offset); 00947 00948 while(stack_ref < stack_idx) { 00949 struct _node_stack *stack = stack_pop(); 00950 uint8_t ack; 00951 00952 mk_pubtop.offset = stack->val2; 00953 ack = proc_sub_ml_wc_hier((char*)stack->val1, parent_subtop, 00954 stack->node, &mk_pubtop, qos, usr_cl); 00955 if(ack < min) 00956 min = ack; 00957 } 00958 00959 return min; 00960 } 00961 00962 static uint8_t ml_wc_nodes_create(char *parent_topSUB, uint16_t toplen, uint8_t qid, void *usr_cl) 00963 { 00964 struct topic_node *parent_leaf = NULL; 00965 00966 if('\0' != parent_topSUB[0]) { 00967 parent_leaf = SUB_node_create(parent_topSUB, qid, usr_cl); 00968 if(NULL == parent_leaf) 00969 return QFL_VALUE; 00970 } 00971 00972 /* Get the topic SUB to it's original state */ 00973 if(toplen > 1) parent_topSUB[toplen - 2] = '/'; 00974 parent_topSUB[toplen - 1] = '#'; 00975 00976 if(NULL == SUB_node_create(parent_topSUB, qid, usr_cl)) { 00977 /* Failed to create WC topic, so delete parent as well. 
00978 In this revision, 'parent_leaf' will not be a 'NULL' 00979 at this juncture, nevertheless a check (for tools) */ 00980 if(parent_leaf) 00981 node_delete(parent_leaf); 00982 00983 return QFL_VALUE; 00984 } 00985 00986 return qid; 00987 } 00988 00989 /* Process Multi-level Wildcard Topic SUBSCRIBE */ 00990 static uint8_t proc_sub_ml_wildcard(char *topSUB, uint16_t toplen, enum mqtt_qos qos, 00991 void *usr_cl) 00992 { 00993 uint16_t len = 0, limit = MIN(toplen, MAX_SUBTOP_LEN); 00994 char subtop[MAX_SUBTOP_LEN], *ptr; 00995 uint8_t min = QOS_VALUE(qos); 00996 00997 /* 'topSUB': Need to create grandpa topic and parent-subtopic */ 00998 topSUB[toplen - 1] = '\0'; /* Remove '#' */ 00999 if(toplen > 1) /* Remove '/' */ 01000 topSUB[toplen - 2] = '\0'; 01001 01002 do { /* Do processing to get parent sub-topic into buffer */ 01003 if('/' == topSUB[toplen - len - 1]) 01004 break; /* found '/' */ 01005 01006 len++; /* Copy parent characters */ 01007 subtop[MAX_SUBTOP_LEN - len] = topSUB[toplen - len]; 01008 } while(len < limit); 01009 01010 if((toplen > len) && ('/' != topSUB[toplen - len - 1])) 01011 return QFL_VALUE; /* Bad Length */ 01012 01013 topSUB[toplen - len] = '\0'; /* End of grand-pa's topic name */ 01014 ptr = subtop + MAX_SUBTOP_LEN - len; /* Parent's leaf subtop */ 01015 min = proc_sub_ml_wc_tree(topSUB, ptr, root_node, qos, usr_cl); 01016 01017 /* Make branch-combo to complete processing of parent' topic */ 01018 strcpy(topSUB + toplen - len, ptr); // topSUB[toplen - len] = *ptr; 01019 01020 /* Create nodes for multi-level wildcard topic & it's parent */ 01021 min = ml_wc_nodes_create(topSUB, toplen, min, usr_cl); 01022 01023 return min; 01024 } 01025 01026 /* Process Single-level Wildcard or No Wild Card Topic SUBSCRIBE */ 01027 static 01028 uint8_t proc_sub_sl_or_no_wc(const char *topSUB, enum mqtt_qos qos, void *usr_cl) 01029 { 01030 struct topbuf_desc mk_pubtop = {work_buf, WBUF_LEN, 0 /* offset */}; 01031 uint8_t min = QOS_VALUE(qos); 01032 01033 /* 
For single level wildcard or absolute topic, find PUB nodes */ 01034 min = proc_pub_tree_SUBtop(topSUB, root_node, &mk_pubtop, qos, usr_cl); 01035 01036 if(NULL == SUB_node_create(topSUB, min, usr_cl)) 01037 min = QFL_VALUE; 01038 01039 return min; 01040 } 01041 01042 static uint16_t proc_forward_slash(char *buf, uint16_t len) 01043 { 01044 uint16_t i, j; 01045 for(i = 1, j = 1; i < len; i++) { 01046 char curr = buf[i]; 01047 if(('/' == curr) && (buf[i - 1] == curr)) 01048 continue; /* Drop consecutive '/' */ 01049 01050 buf[j++] = curr; 01051 } 01052 01053 if((1 != j) && ('/' == buf[j - 1])) 01054 j--; /* Topic can not end with a '/' */ 01055 01056 buf[j] = '\0'; 01057 01058 return j; 01059 } 01060 01061 static inline bool is_valid_char_order(char prev, char curr) 01062 { 01063 return ((('/' != prev) && ('+' == curr)) || 01064 (('+' == prev) && ('/' != curr)) || 01065 (('/' != prev) && ('#' == curr)) || 01066 (('#' == prev)))? false : true; 01067 } 01068 01069 01070 static bool is_valid_SUB_top(const char *buf, uint16_t len) 01071 { 01072 char prev, curr; 01073 uint16_t i = 0; 01074 01075 if((0 == len) || ('\0' == *buf)) 01076 return false; 01077 01078 curr = buf[0]; 01079 for(i = 1; (i < len) && ('\0' != curr); i++) { 01080 prev = curr; 01081 curr = buf[i]; 01082 01083 if(false == is_valid_char_order(prev, curr)) 01084 break; 01085 } 01086 01087 return (i == len)? 
true : false; 01088 } 01089 01090 static bool proc_sub_msg_rx(void *usr_cl, const struct utf8_strqos *qos_topics, 01091 uint32_t n_topics, uint16_t msg_id, uint8_t *ack) 01092 { 01093 int32_t i = 0; 01094 for(i = 0; i < n_topics; i++) { 01095 const struct utf8_strqos *qos_top = qos_topics + i; 01096 enum mqtt_qos qos = qos_top->qosreq; 01097 char *buf = (char*)qos_top->buffer; 01098 uint16_t len = qos_top->length; 01099 01100 /* Remove zero-topics and trailing '/' from SUB top */ 01101 len = proc_forward_slash(buf, len); 01102 ack[i] = QFL_VALUE; 01103 if(false == is_valid_SUB_top(buf, len)) 01104 continue; 01105 01106 buf[len] = '\0'; /* Dirty trick, cheeky one */ 01107 01108 ack[i] = ('#' == buf[len - 1])? 01109 proc_sub_ml_wildcard(buf, len, qos, usr_cl) : 01110 proc_sub_sl_or_no_wc(buf, qos, usr_cl); 01111 01112 DBG_INFO("SUB Topic%-2d %s is ACK'ed w/ 0x%02x\n\r", 01113 i + 1, buf, ack[i]); 01114 } 01115 01116 return true; /* Send SUB-ACK and do not close network */ 01117 } 01118 01119 static 01120 bool proc_sub_msg_rx_locked(void *usr_cl, const struct utf8_strqos *qos_topics, 01121 uint32_t n_topics, uint16_t msg_id, uint8_t *ack) 01122 { 01123 return proc_sub_msg_rx(usr_cl, qos_topics, n_topics, msg_id, ack); 01124 } 01125 01126 static void leaf_un_sub(struct topic_node *leaf, void *usr_cl) 01127 { 01128 uint8_t j = 0; 01129 uint32_t map = cl_bmap_get(usr_cl); 01130 01131 for(j = 0; j < 3; j++) { 01132 /* Client: clear QOS of existing sub, if any */ 01133 if(0 == (leaf->cl_map[j] & map)) 01134 continue; 01135 01136 leaf->cl_map[j] &= ~map; 01137 cl_sub_count_del(usr_cl); 01138 01139 try_node_delete(leaf); 01140 01141 break; 01142 } 01143 } 01144 01145 static bool proc_un_sub_msg(void *usr_cl, const struct utf8_string *topics, 01146 uint32_t n_topics, uint16_t msg_id) 01147 { 01148 uint32_t i = 0; 01149 01150 for(i = 0; i < n_topics; i++) { 01151 const struct utf8_string *topic = topics + i; 01152 struct topic_node *leaf = NULL; 01153 uint16_t len = 
topic->length; 01154 01155 /* The maximum length of 'work_buf' is same as that of RX buffer 01156 in the PKT-LIB. Therefore, the WBUF_LEN is not being checked 01157 against the length of the topic (a constituent of RX buffer). 01158 */ 01159 strncpy(work_buf, topic->buffer, topic->length); 01160 work_buf[len] = '\0'; 01161 01162 if('#' == work_buf[len - 1]) { /* Multi-level Wildcard */ 01163 work_buf[len - 1] = '\0'; 01164 if(len > 1) 01165 work_buf[len - 2] = '\0'; 01166 01167 leaf = leaf_node_find(work_buf); 01168 if(leaf) 01169 leaf_un_sub(leaf, usr_cl); 01170 01171 if(len > 1) 01172 work_buf[len - 2] = '/'; 01173 work_buf[len - 1] = '#'; 01174 } 01175 01176 leaf = leaf_node_find(work_buf); 01177 if(leaf) 01178 leaf_un_sub(leaf, usr_cl); 01179 } 01180 01181 return true; /* Do not close network */ 01182 } 01183 01184 static 01185 bool proc_un_sub_msg_locked(void *usr_cl, const struct utf8_string *topics, 01186 uint32_t n_topics, uint16_t msg_id) 01187 { 01188 return proc_un_sub_msg(usr_cl, topics, n_topics, msg_id); 01189 } 01190 01191 static void 01192 leaf_msg_send(const struct topic_node *leaf, const struct utf8_string *topic, 01193 const uint8_t *data_buf, uint32_t data_len, bool dup, enum mqtt_qos qos, 01194 bool retain) 01195 { 01196 uint8_t qid = 0, fh_fgs = 0; 01197 01198 for(qid = 0; qid < 3; qid++) { 01199 uint8_t map = leaf->cl_map[qid]; 01200 fh_fgs = MAKE_FH_FLAGS(dup, MIN(qid, QOS_VALUE(qos)), retain); 01201 01202 if(map) 01203 pub_msg_send(topic, data_buf, data_len, fh_fgs, map); 01204 } 01205 01206 if(enrolls_plugin(leaf)) 01207 plugin_publish(leaf->pg_map, topic, data_buf, data_len, 01208 dup, qos, retain); 01209 01210 return; 01211 } 01212 01213 static void node_data_set(struct topic_node *node, uint8_t *data, 01214 uint32_t dlen, uint8_t qid, bool retain) 01215 { 01216 node->my_data = data; 01217 node->my_dlen = dlen; 01218 01219 node_qid_set(node, qid); 01220 node_retain_set(node, retain); 01221 01222 return; 01223 } 01224 01225 static bool 
node_data_update(struct topic_node *node, bool drop_qid0, 01226 const uint8_t *data_buf, uint32_t data_len, 01227 uint8_t qid, bool retain) 01228 { 01229 #define NODE_DATA_RESET_PARAMS NULL, 0, 0, false 01230 01231 /* Assumes that caller has provided either reset or valid params */ 01232 01233 uint8_t *data = NULL; 01234 01235 if(node->my_dlen) 01236 my_free(node->my_data); 01237 01238 /* Watch out for assignment in 'if' statement - avoid such smarts */ 01239 if((drop_qid0 && (0 == qid)) || (data_buf && !(data = (uint8_t*)my_malloc(data_len)))) { 01240 node_data_set(node, NODE_DATA_RESET_PARAMS); 01241 } else { 01242 01243 if(data) 01244 buf_wr_nbytes(data, data_buf, data_len); 01245 01246 node_data_set(node, data, data_len, qid, retain); 01247 } 01248 01249 return ((!!data) ^ (!!data_len))? false : true; 01250 } 01251 01252 static inline bool is_wildcard_char(char c) 01253 { 01254 return (('+' == c) || ('#' == c))? true : false; 01255 } 01256 01257 static int32_t pub_topic_read(const struct utf8_string *topic, char *buf, uint32_t len) 01258 { 01259 uint32_t i = 0; 01260 uint32_t toplen = topic->length; 01261 01262 if(len < (toplen + 1)) 01263 return -1; 01264 01265 for(i = 0; i < toplen; i++) { 01266 char c = topic->buffer[i]; 01267 if(is_wildcard_char(c)) 01268 return -1; /* Invalid: wildcard in PUB topic */ 01269 01270 if('\0' == c) 01271 return -1; /* Invalid: NUL char in PUB topic */ 01272 01273 buf[i] = c; 01274 } 01275 01276 buf[i] = '\0'; 01277 01278 return i; 01279 } 01280 01281 static 01282 void proc_sub_tree_topPUB(const char *topPUB, const struct utf8_string *topic, 01283 const uint8_t *data_buf, uint32_t data_len, enum mqtt_qos qos, 01284 bool retain) 01285 { 01286 struct topic_node *leaf = NULL; 01287 uint32_t stack_ref = stack_idx; 01288 01289 if(NULL != root_node) 01290 stack_add(root_node, (uint32_t)topPUB, 0 /* Not used */); 01291 01292 while(stack_ref < stack_idx) { 01293 struct _node_stack *stack = stack_pop(); 01294 01295 /* Find leaf node of 
SUB that matches the PUB topic */ 01296 leaf = SUB_leaf_search((char*)stack->val1, stack->node); 01297 if(leaf) 01298 leaf_msg_send(leaf, topic, data_buf, data_len, 01299 false, qos, retain); 01300 } 01301 } 01302 01303 static bool _proc_pub_msg_rx(void *usr_cl, const struct utf8_string *topic, 01304 const uint8_t *data_buf, uint32_t data_len, uint8_t msg_id, 01305 enum mqtt_qos qos, bool retain) 01306 { 01307 int32_t err = -1; 01308 01309 /* Prior to msg processing, chk for topic or buffer errors */ 01310 if((pub_topic_read(topic, work_buf, WBUF_LEN) <= 0) || 01311 (proc_forward_slash(work_buf, topic->length) <= 0)) 01312 goto _proc_pub_msg_rx_exit; 01313 01314 /* If a valid MSG ID is specified for a QOS2 pkt, track it */ 01315 err = -2; 01316 if((msg_id) && 01317 (MQTT_QOS2 == qos) && 01318 (false == cl_mgmt_qos2_pub_rx_update(usr_cl, msg_id))) 01319 goto _proc_pub_msg_rx_exit; 01320 01321 /* Forward data to all subscribers of PUB topic in server */ 01322 proc_sub_tree_topPUB(work_buf, topic, data_buf, 01323 data_len, qos, false); 01324 01325 err = 0; 01326 if(retain) { 01327 struct topic_node *leaf = topic_node_create(work_buf); 01328 if((NULL == leaf) || 01329 (false == node_data_update(leaf, true, data_buf, data_len, 01330 QOS_VALUE(qos), retain))) 01331 err = -3; /* Resources no more available */ 01332 01333 if(leaf) 01334 try_node_delete(leaf); 01335 } 01336 01337 _proc_pub_msg_rx_exit: 01338 DBG_INFO("Processing of PUB message from %s (0x%08x) has %s (%d)\n\r", 01339 usr_cl? "client" : "plugin", usr_cl? cl_bmap_get(usr_cl) : 0, 01340 err? "failed" : "succeeded", err); 01341 01342 return (err < 0)? 
false : true; 01343 } 01344 01345 static 01346 bool proc_pub_msg_rx_locked(void *usr_cl, const struct utf8_string *topic, 01347 const uint8_t *data_buf, uint32_t data_len, uint16_t msg_id, 01348 bool dup, enum mqtt_qos qos, bool retain) 01349 { 01350 return _proc_pub_msg_rx(usr_cl, topic, data_buf, data_len, 01351 msg_id, qos, retain); 01352 } 01353 01354 static int32_t utf8_str_rd(const struct utf8_string *utf8, char *buf, uint32_t len) 01355 { 01356 if((NULL == utf8) || (utf8->length > (len - 1))) 01357 return -1; 01358 01359 buf_wr_nbytes((uint8_t*)buf, (uint8_t*)utf8->buffer, utf8->length); 01360 buf[utf8->length] = '\0'; 01361 01362 return utf8->length; 01363 } 01364 01365 #define CONN_FLAGS_WQID_GET(conn_flags) \ 01366 ((conn_flags >> 3) & QID_VMASK) /* WILL QOS VAL */ 01367 01368 static uint16_t proc_connect_rx(void *ctx_cl, uint8_t conn_flags, 01369 struct utf8_string * const *utf8_vec, void **usr_cl) 01370 { 01371 struct topic_node *leaf = NULL; 01372 struct utf8_string *utf8 = NULL; 01373 void *app_cl = NULL; 01374 uint16_t utf8_len = 0; 01375 uint16_t rv = plugin_connect(MQC_UTF8_CLIENTID(utf8_vec), 01376 MQC_UTF8_USERNAME(utf8_vec), 01377 MQC_UTF8_PASSWORD(utf8_vec), 01378 &app_cl); 01379 if(rv) 01380 goto proc_connect_rx_exit1; /* Admin did not permit connection */ 01381 01382 rv = CONNACK_RC_SVR_UNAVBL; /* Server (resource) unavailable */ 01383 01384 utf8 = MQC_UTF8_WILL_TOP(utf8_vec); 01385 if(utf8 && utf8->length) { 01386 utf8_str_rd(utf8, work_buf, WBUF_LEN); 01387 01388 leaf = topic_node_create(work_buf); 01389 if(NULL == leaf) 01390 goto proc_connect_rx_exit2; 01391 01392 if(false == node_data_update(leaf, false, 01393 (uint8_t*)MQC_WILL_MSG_BUF(utf8_vec), 01394 MQC_WILL_MSG_LEN(utf8_vec), 01395 CONN_FLAGS_WQID_GET(conn_flags), 01396 conn_flags & WILL_RETAIN_VAL)) 01397 goto proc_connect_rx_exit3; 01398 } 01399 01400 utf8 = MQC_UTF8_CLIENTID(utf8_vec); 01401 if(utf8) 01402 utf8_len = utf8_str_rd(utf8, work_buf, WBUF_LEN); 01403 01404 rv = 
cl_connect_rx(ctx_cl, (conn_flags & CLEAN_START_VAL)? true : false, 01405 utf8_len? work_buf : NULL, app_cl, leaf, usr_cl); 01406 if(CONNACK_RC_REQ_ACCEPT == (rv & 0xFF)) { 01407 if(leaf) 01408 leaf->will_cl = *usr_cl; 01409 01410 return rv; /* Connection successful */ 01411 } 01412 01413 if(leaf) 01414 node_data_update(leaf, true, NODE_DATA_RESET_PARAMS); 01415 01416 proc_connect_rx_exit3: try_node_delete(leaf); 01417 proc_connect_rx_exit2: plugin_disconn(app_cl, true); 01418 proc_connect_rx_exit1: 01419 return rv; 01420 } 01421 01422 static 01423 uint16_t proc_connect_rx_locked(void *ctx_cl, uint8_t conn_flags, 01424 struct utf8_string * const *utf8_vec, void **usr_cl) 01425 { 01426 return proc_connect_rx(ctx_cl, conn_flags, utf8_vec, usr_cl); 01427 } 01428 01429 static void session_hier_delete(struct topic_node *node, void *usr_cl) 01430 { 01431 struct topic_node *prev = NULL; 01432 uint32_t cl_map = cl_bmap_get(usr_cl); 01433 01434 while(node) { 01435 int32_t i = 0; 01436 for(i = 0; i < 3; i++) { 01437 if(node->cl_map[i] & cl_map) { 01438 node->cl_map[i] &= ~cl_map; 01439 cl_sub_count_del(usr_cl); 01440 /* Client/Topic/QID 1-to-1 map */ 01441 break; 01442 } 01443 } 01444 01445 if(node->dn_nhbr) 01446 stack_add(node->dn_nhbr, 0, 0); 01447 01448 prev = node; 01449 node = node->dn_hier; 01450 } 01451 01452 if(prev) 01453 try_node_delete(prev); 01454 } 01455 01456 void session_tree_delete(void *usr_cl) 01457 { 01458 uint32_t stack_ref = stack_idx; 01459 01460 if(NULL != root_node) 01461 stack_add(root_node, 0, 0); 01462 01463 while(stack_ref < stack_idx) { 01464 struct _node_stack *stack = stack_pop(); 01465 session_hier_delete(stack->node, usr_cl); 01466 } 01467 } 01468 01469 static void proc_client_will(struct topic_node *leaf) 01470 { 01471 uint32_t wbuf_len = WBUF_LEN - 1; /* Make space for '\0' in wbuf */ 01472 uint32_t offset = wbuf_len; 01473 struct utf8_string topic; 01474 struct topic_node *node; 01475 01476 work_buf[offset] = '\0'; /* Ensures wbuf is NUL 
terminated */ 01477 node = leaf; 01478 01479 /* Prepare a topic string by walking back from leaf to root */ 01480 do { 01481 if(offset < node->toplen) 01482 return; 01483 01484 offset -= node->toplen; 01485 strncpy(work_buf + offset, node->subtop, node->toplen); 01486 01487 while(node->up_nhbr) 01488 node = node->up_nhbr; 01489 01490 node = node->up_hier; 01491 01492 } while(node); 01493 01494 topic.buffer = work_buf + offset; 01495 topic.length = wbuf_len - offset; 01496 01497 #define MK_QOS_ENUM(qid) ((enum mqtt_qos)(qid & QID_VMASK)) 01498 01499 proc_sub_tree_topPUB((char*)topic.buffer, 01500 &topic, leaf->my_data, leaf->my_dlen, 01501 MK_QOS_ENUM(node_qid_get(leaf)), 01502 is_node_retain(leaf)); 01503 01504 } 01505 01506 static void on_cl_net_close(void *usr_cl, bool due2err) 01507 { 01508 struct topic_node *leaf = NULL; 01509 void *app_cl = NULL; 01510 01511 /* See if client has a WILL that it intends to broadcast */ 01512 leaf = (struct topic_node*) cl_will_hndl_get(usr_cl); 01513 if(NULL != leaf) { 01514 if(usr_cl != leaf->will_cl) 01515 return; /* Mismatch: should never happen */ 01516 01517 if(due2err) 01518 proc_client_will(leaf); /* pls broadcast */ 01519 01520 /* Network is closing, so cleanup WILL msg store */ 01521 node_data_update(leaf, true, NODE_DATA_RESET_PARAMS); 01522 leaf->will_cl = NULL; 01523 try_node_delete(leaf); 01524 } 01525 01526 /* If not needed for future, delete session info */ 01527 if(cl_can_session_delete(usr_cl)) 01528 session_tree_delete(usr_cl); 01529 01530 /* Inform app that client has been disconnected */ 01531 app_cl = cl_app_hndl_get(usr_cl); 01532 plugin_disconn(app_cl, due2err); 01533 01534 cl_on_net_close(usr_cl); 01535 } 01536 01537 static 01538 void on_cl_net_close_locked(void *usr_cl, bool due2err) 01539 { 01540 on_cl_net_close(usr_cl, due2err); 01541 return; 01542 } 01543 01544 static void on_connack_send(void *usr_cl, bool clean_session) 01545 { 01546 /* If asserted, then need to start w/ clean state */ 01547 
if(clean_session) 01548 session_tree_delete(usr_cl); 01549 01550 cl_on_connack_send(usr_cl, clean_session); 01551 01552 return; 01553 } 01554 01555 static 01556 void on_connack_send_locked(void *usr_cl, bool clean_session) 01557 { 01558 on_connack_send(usr_cl, clean_session); 01559 return; 01560 } 01561 01562 static 01563 bool proc_notify_ack_locked(void *usr_cl, uint8_t msg_type, uint16_t msg_id) 01564 { 01565 return cl_notify_ack(usr_cl, msg_type, msg_id); 01566 } 01567 01568 static int32_t proc_topic_enroll(uint8_t pg_id, const struct utf8_string *topic, 01569 enum mqtt_qos qos) 01570 { 01571 struct topic_node *leaf = NULL; 01572 uint16_t len = 0; 01573 01574 if((NULL == topic) || (NULL == topic->buffer) || (0 == topic->length)) 01575 return -1; 01576 01577 if(WBUF_LEN < (topic->length + 1)) 01578 return -2; 01579 01580 len = topic->length; 01581 strncpy(work_buf, topic->buffer, len); 01582 work_buf[len] = '\0'; 01583 01584 leaf = topic_node_create(work_buf); 01585 if(NULL == leaf) 01586 return -3; 01587 01588 PG_MAP_VAL_SETUP(leaf->pg_map, QOS_VALUE(qos), pg_id); 01589 01590 return 0; 01591 } 01592 01593 static 01594 int32_t proc_topic_enroll_locked(uint8_t pg_id, const struct utf8_string *topic, 01595 enum mqtt_qos qos) 01596 { 01597 int32_t rv = 0; 01598 01599 MUTEX_LOCKIN(); 01600 rv = proc_topic_enroll(pg_id, topic, qos); 01601 MUTEX_UNLOCK(); 01602 01603 return rv; 01604 } 01605 01606 static int32_t proc_topic_cancel(uint8_t pg_id, const struct utf8_string *topic) 01607 { 01608 struct topic_node *leaf = NULL; 01609 uint16_t len = 0; 01610 01611 if(NULL == topic) 01612 return -1; 01613 01614 if(WBUF_LEN < (topic->length + 1)) 01615 return -2; 01616 01617 len = topic->length; 01618 strncpy(work_buf, topic->buffer, len); 01619 work_buf[len] = '\0'; 01620 01621 leaf = leaf_node_find(work_buf); 01622 if(NULL == leaf) 01623 return -2; 01624 01625 PG_MAP_VAL_RESET(leaf->pg_map, pg_id); 01626 01627 try_node_delete(leaf); 01628 01629 return 0; 01630 } 01631 01632 
static 01633 int32_t proc_topic_cancel_locked(uint8_t pg_id, const struct utf8_string *topic) 01634 { 01635 int32_t rv = 0; 01636 01637 MUTEX_LOCKIN(); 01638 rv = proc_topic_cancel(pg_id, topic); 01639 MUTEX_UNLOCK(); 01640 01641 return rv; 01642 } 01643 01644 static 01645 int32_t proc_app_pub_send_locked(const struct utf8_string *topic, const uint8_t *data_buf, 01646 uint32_t data_len, enum mqtt_qos qos, bool retain) 01647 { 01648 bool rv; 01649 01650 MUTEX_LOCKIN(); 01651 /* Received from application, process topic for distribution */ 01652 rv = _proc_pub_msg_rx(NULL, topic, data_buf, data_len, 01653 0x00, qos, retain); 01654 MUTEX_UNLOCK(); 01655 01656 return rv? (int32_t)data_len : -1; 01657 } 01658 01659 int32_t mqtt_server_init(const struct mqtt_server_lib_cfg *lib_cfg, 01660 const struct mqtt_server_app_cfg *app_cfg) 01661 { 01662 /* If mutex is specified, then the following set of callbacks 01663 are invoked in the locked state - enumerated by 'locked' */ 01664 struct mqtt_server_msg_cbs pkts_cbs = {proc_connect_rx_locked, 01665 proc_sub_msg_rx_locked, 01666 proc_un_sub_msg_locked, 01667 proc_pub_msg_rx_locked, 01668 proc_notify_ack_locked, 01669 on_cl_net_close_locked, 01670 on_connack_send_locked}; 01671 01672 struct plugin_core_msg_cbs core_cbs = {proc_topic_enroll_locked, 01673 proc_topic_cancel_locked, 01674 proc_app_pub_send_locked}; 01675 01676 util_params_set(lib_cfg->debug_printf, 01677 lib_cfg->mutex, 01678 lib_cfg->mutex_lockin, 01679 lib_cfg->mutex_unlock); 01680 01681 USR_INFO("Version: Server LIB %s, Common LIB %s.\n\r", 01682 MQTT_SERVER_VERSTR, MQTT_COMMON_VERSTR); 01683 01684 topic_node_init(); 01685 01686 cl_mgmt_init(); 01687 01688 plugin_init(&core_cbs); 01689 01690 mqtt_server_lib_init(lib_cfg, &pkts_cbs); 01691 01692 return 0; 01693 } 01694 01695 }//namespace mbed_mqtt
Generated on Wed Jul 13 2022 09:55:39 by Doxygen 1.7.2