TI's MQTT Demo with freertos CM4F
Embed:
(wiki syntax)
Show/hide line numbers
client_mgmt.cpp
00001 /****************************************************************************** 00002 * 00003 * Copyright (C) 2014 Texas Instruments Incorporated 00004 * 00005 * All rights reserved. Property of Texas Instruments Incorporated. 00006 * Restricted rights to use, duplicate or disclose this code are 00007 * granted through contract. 00008 * 00009 * The program may not be used without the written permission of 00010 * Texas Instruments Incorporated or against the terms and conditions 00011 * stipulated in the agreement under which this program has been supplied, 00012 * and under no circumstances can it be used with non-TI connectivity device. 00013 * 00014 ******************************************************************************/ 00015 00016 #include "server_util.h" 00017 #include "client_mgmt.h" 00018 #include "server_pkts.h" 00019 00020 namespace mbed_mqtt { 00021 00022 #ifndef CFG_SR_MAX_CL_ID_SIZE 00023 #define MAX_CLIENT_ID_LEN 64 00024 #else 00025 #define MAX_CLIENT_ID_LEN CFG_SR_MAX_CL_ID_SIZE 00026 #endif 00027 00028 #ifndef CFG_SR_MAX_NUM_CLIENT 00029 #define MAX_CLIENTS 16 /* Must not exceed 32 */ 00030 #else 00031 #define MAX_CLIENTS CFG_SR_MAX_NUM_CLIENT 00032 00033 #if (CFG_SR_MAX_NUM_CLIENT > 32) 00034 #error "CFG_SR_MAX_NUM_CLIENT must not exceed 32" 00035 #endif 00036 #endif 00037 00038 static struct client_usr { 00039 00040 void *ctx; /* Client net */ 00041 void *app; /* Client app */ 00042 00043 void *will; /* Will topic */ 00044 uint32_t index; 00045 uint32_t n_sub; 00046 00047 #define MQ_CONNECT_FLAG 0x00000001 00048 #define CLEAN_SESS_FLAG 0x00000002 00049 #define ASSIGNMENT_FLAG 0x00000004 /* No state or connected */ 00050 00051 uint32_t flags; 00052 00053 char client_id[MAX_CLIENT_ID_LEN]; 00054 00055 struct pub_qos2_cq qos2_rx_cq; 00056 struct pub_qos2_cq qos2_tx_cq; 00057 00058 } users[MAX_CLIENTS]; 00059 00060 // TBD consistency between inline and macro functions 00061 00062 static inline bool is_connected(struct client_usr *cl_usr) 00063 { 00064 return (cl_usr->flags & MQ_CONNECT_FLAG)? true : false; 00065 } 00066 00067 static inline bool has_clean_session(struct client_usr *cl_usr) 00068 { 00069 return (cl_usr->flags & CLEAN_SESS_FLAG)? true : false; 00070 } 00071 00072 static inline bool is_assigned(struct client_usr *cl_usr) 00073 { 00074 return (cl_usr->flags & ASSIGNMENT_FLAG)? true : false; 00075 } 00076 00077 static inline void set_connect_state(struct client_usr *cl_usr, 00078 bool connected) 00079 { 00080 if(connected) 00081 cl_usr->flags |= MQ_CONNECT_FLAG; 00082 else 00083 cl_usr->flags &= ~MQ_CONNECT_FLAG; 00084 } 00085 00086 static inline void set_clean_session(struct client_usr *cl_usr, 00087 bool clean_session) 00088 { 00089 if(clean_session) 00090 cl_usr->flags |= CLEAN_SESS_FLAG; 00091 else 00092 cl_usr->flags &= ~CLEAN_SESS_FLAG; 00093 } 00094 00095 static inline void set_assigned(struct client_usr *cl_usr, 00096 bool assignment) 00097 { 00098 if(assignment) 00099 cl_usr->flags |= ASSIGNMENT_FLAG; 00100 else 00101 cl_usr->flags &= ~ASSIGNMENT_FLAG; 00102 } 00103 00104 static void cl_usr_reset(struct client_usr *cl_usr) 00105 { 00106 cl_usr->ctx = NULL; 00107 cl_usr->app = NULL; 00108 cl_usr->will = NULL; 00109 00110 cl_usr->n_sub = 0; 00111 cl_usr->flags = 0; 00112 00113 cl_usr->client_id[0] = '\0'; 00114 00115 qos2_pub_cq_reset(&cl_usr->qos2_rx_cq); 00116 qos2_pub_cq_reset(&cl_usr->qos2_tx_cq); 00117 } 00118 00119 static void cl_usr_init(void) 00120 { 00121 int32_t idx = 0; 00122 for(idx = 0; idx < MAX_CLIENTS; idx++) { 00123 struct client_usr *cl_usr = users + idx; 00124 cl_usr->index = idx; 00125 00126 cl_usr_reset(cl_usr); 00127 } 00128 } 00129 00130 static void cl_usr_free(struct client_usr *cl_usr) 00131 { 00132 cl_usr_reset(cl_usr); 00133 cl_usr->flags &= ~(MQ_CONNECT_FLAG | 00134 CLEAN_SESS_FLAG | 00135 ASSIGNMENT_FLAG); 00136 } 00137 00138 void cl_sub_count_add(void *usr_cl) 00139 { 00140 struct client_usr *cl_usr = (struct client_usr*) usr_cl; 00141 00142 if(is_connected(cl_usr)) { 00143 cl_usr->n_sub++; 00144 USR_INFO("%s has added a new sub, now total is %u\n\r", 00145 cl_usr->client_id, cl_usr->n_sub); 00146 00147 } 00148 00149 return; 00150 } 00151 00152 void cl_sub_count_del(void *usr_cl) 00153 { 00154 struct client_usr *cl_usr = (struct client_usr*) usr_cl; 00155 00156 if(is_connected(cl_usr)) { 00157 cl_usr->n_sub--; 00158 USR_INFO("%s has deleted a sub, now total is %u\n\r", 00159 cl_usr->client_id, cl_usr->n_sub); 00160 } 00161 00162 return; 00163 } 00164 00165 /*---------------------------------------------------------------------------- 00166 * QoS2 PUB RX Message handling mechanism and associated house-keeping 00167 *--------------------------------------------------------------------------*/ 00168 static inline bool qos2_pub_rx_logup(struct client_usr *cl_usr, uint16_t msg_id) 00169 { 00170 return qos2_pub_cq_logup(&cl_usr->qos2_rx_cq, msg_id); 00171 } 00172 00173 static inline bool ack2_msg_id_logup(struct client_usr *cl_usr, uint16_t msg_id) 00174 { 00175 return qos2_pub_cq_logup(&cl_usr->qos2_tx_cq, msg_id); 00176 } 00177 00178 static inline bool qos2_pub_rx_unlog(struct client_usr *cl_usr, uint16_t msg_id) 00179 { 00180 return qos2_pub_cq_unlog(&cl_usr->qos2_rx_cq, msg_id); 00181 } 00182 00183 static inline bool ack2_msg_id_unlog(struct client_usr *cl_usr, uint16_t msg_id) 00184 { 00185 return qos2_pub_cq_unlog(&cl_usr->qos2_tx_cq, msg_id); 00186 } 00187 00188 static inline bool qos2_pub_rx_is_done(struct client_usr *cl_usr, uint16_t msg_id) 00189 { 00190 return qos2_pub_cq_check(&cl_usr->qos2_rx_cq, msg_id); 00191 } 00192 00193 bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id) 00194 { 00195 struct client_usr *cl_usr = (struct client_usr*) usr_cl; 00196 00197 return cl_usr && (qos2_pub_rx_is_done(cl_usr, msg_id) || 00198 qos2_pub_rx_logup(cl_usr, msg_id)); 00199 } 00200 00201 static void ack2_msg_id_dispatch(struct client_usr *cl_usr) 00202 { 00203 struct pub_qos2_cq *tx_cq = &cl_usr->qos2_tx_cq; 00204 uint8_t rd_idx = tx_cq->rd_idx; 00205 uint8_t n_free = tx_cq->n_free; 00206 uint8_t i = 0; 00207 00208 for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free); i++) { 00209 if(mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, MQTT_QOS1, 00210 true, tx_cq->id_vec[i]) <= 0) 00211 break; 00212 } 00213 00214 return; 00215 } 00216 00217 static void ack2_msg_id_purge(struct client_usr *cl_usr) 00218 { 00219 qos2_pub_cq_reset(&cl_usr->qos2_tx_cq); 00220 qos2_pub_cq_reset(&cl_usr->qos2_rx_cq); 00221 } 00222 00223 /* 00224 00225 */ 00226 static struct mqtt_ack_wlist wl_qos_ack1 = {NULL, NULL}; 00227 static struct mqtt_ack_wlist *qos_ack1_wl = &wl_qos_ack1; 00228 00229 /* 00230 00231 */ 00232 static struct mqtt_ack_wlist wl_mqp_sess = {NULL, NULL}; 00233 static struct mqtt_ack_wlist *sess_mqp_wl = &wl_mqp_sess; 00234 00235 #define MQP_CL_MAP_GET(mqp) (mqp->private_) 00236 #define MQP_CL_MAP_SET(mqp, cl_map) (mqp->private_ |= cl_map) 00237 #define MQP_CL_MAP_CLR(mqp, cl_map) (mqp->private_ &= ~cl_map) 00238 00239 #define CL_BMAP(cl_usr) (1 << cl_usr->index) 00240 00241 static inline 00242 int32_t _pub_dispatch(struct client_usr *cl_usr, struct mqtt_packet *mqp, 00243 bool dup) 00244 { 00245 /* Error, if any, is handled in 'cl_net_close()' ....*/ 00246 return mqtt_server_pub_dispatch(cl_usr->ctx, mqp, dup); 00247 } 00248 00249 static void ack1_wl_mqp_dispatch(struct client_usr *cl_usr) 00250 { 00251 struct mqtt_packet *mqp = NULL; 00252 00253 for(mqp = qos_ack1_wl->head; NULL != mqp; mqp = mqp->next) 00254 if(MQP_CL_MAP_GET(mqp) & CL_BMAP(cl_usr)) 00255 _pub_dispatch(cl_usr, mqp, true); 00256 } 00257 00258 #if 0 00259 static struct client_usr *find_cl_usr(uint32_t index) 00260 { 00261 struct client_usr *cl_usr = users + 0; 00262 00263 int32_t i = 0; 00264 for(i = 0; i < MAX_CLIENTS; i++, cl_usr++) { 00265 if(index == cl_usr->index) 00266 break; 00267 } 00268 00269 return (MAX_CLIENTS == i)? NULL : cl_usr; 00270 } 00271 #endif 00272 00273 #define IS_CL_USR_FREE(cl_usr) (( 0 == cl_usr->flags) && \ 00274 ('\0' == cl_usr->client_id[0])) 00275 00276 #define IS_CL_INACTIVE(cl_usr) (( 0 == cl_usr->flags) && \ 00277 ('\0' != cl_usr->client_id[0])) 00278 00279 #define NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr) \ 00280 ((((MQTT_QOS1 == qos) && has_clean_session(cl_usr)) || \ 00281 ((MQTT_QOS0 == qos)))? \ 00282 false : true) 00283 00284 static inline uint32_t _cl_bmap_get(void *usr_cl) 00285 { 00286 struct client_usr *cl_usr = (struct client_usr*) usr_cl; 00287 00288 return IS_CL_USR_FREE(cl_usr)? 0 : CL_BMAP(cl_usr); 00289 } 00290 00291 uint32_t cl_bmap_get(void *usr_cl) 00292 { 00293 return usr_cl? _cl_bmap_get(usr_cl) : 0; 00294 } 00295 00296 void *cl_app_hndl_get(void *usr_cl) 00297 { 00298 struct client_usr *cl_usr = (struct client_usr*) usr_cl; 00299 00300 return (cl_usr && is_connected((client_usr*) usr_cl))? cl_usr->app : NULL; 00301 } 00302 00303 void *cl_will_hndl_get(void *usr_cl) 00304 { 00305 struct client_usr *cl_usr = (struct client_usr*) usr_cl; 00306 00307 return (cl_usr && is_connected((client_usr*)usr_cl))? cl_usr->will : NULL; 00308 } 00309 00310 static void pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp) 00311 { 00312 //uint32_t n_bits = sizeof(uint32_t) << 3; /* Multiply by 8 */ 00313 uint32_t sp_map = 0; /* client Map for sessions present */ 00314 enum mqtt_qos qos = ENUM_QOS(mqp->fh_byte1);/* QOS */ 00315 const uint32_t cl_ref = cl_map; /* Keep ref to original */ 00316 00317 int32_t i = 0; 00318 for(i = 0; i < MAX_CLIENTS; i++) { 00319 if(cl_map & (1 << i)) { 00320 struct client_usr *cl_usr = users + i; //find_cl_usr(i); 00321 if(is_connected(cl_usr)) 00322 if((_pub_dispatch(cl_usr, mqp, false) > 0) && 00323 NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr)) 00324 continue;/* Processing done; next CL */ 00325 00326 /* CL: unconnected / PUB Err / QOS1 PKT (clean sess) */ 00327 cl_map &= ~(1 << i); 00328 00329 if(IS_CL_INACTIVE(cl_usr)) 00330 sp_map |= (1 << i); /* CL: Maintain session */ 00331 } 00332 } 00333 00334 if(sp_map) { 00335 struct mqtt_packet *cpy = mqp_server_copy(mqp); /* Make copy */ 00336 if(cpy) { 00337 MQP_CL_MAP_SET(cpy, sp_map); 00338 mqp_ack_wlist_append(sess_mqp_wl, cpy); 00339 } else 00340 sp_map = 0; 00341 } 00342 00343 if(cl_map) { /* Wait List Publish */ 00344 MQP_CL_MAP_SET(mqp, cl_map); 00345 mqp_ack_wlist_append(qos_ack1_wl, mqp); 00346 } else 00347 mqp_free(mqp); /* PUB MQP now has no more use; must be freed */ 00348 00349 USR_INFO("PUBLISH: CL Map(0x%08x): For Ack 0x%08x, Inactive 0x%08x\n\r", 00350 cl_ref, cl_map, sp_map); 00351 00352 return; 00353 } 00354 00355 void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp) 00356 { 00357 pub_dispatch(cl_map, mqp); 00358 return; 00359 } 00360 00361 /* Move to mqtt_common.h and remove it from mqtt_client.h */ 00362 static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp) 00363 { 00364 mqp_free(mqp); 00365 return MQP_ERR_PKT_LEN; 00366 } 00367 00368 /* Move this to a common file */ 00369 int32_t cl_pub_msg_send(void *usr_cl, 00370 const struct utf8_string *topic, const uint8_t *data_buf, 00371 uint32_t data_len, enum mqtt_qos qos, bool retain) 00372 { 00373 struct mqtt_packet *mqp = NULL; 00374 00375 if((NULL == topic) || 00376 ((data_len > 0) && (NULL == data_buf)) || 00377 (NULL == usr_cl)) 00378 return MQP_ERR_FNPARAM; 00379 00380 mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len); 00381 if(NULL == mqp) 00382 return MQP_ERR_PKT_AVL; 00383 00384 if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) || 00385 (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) 00386 return len_err_free_mqp(mqp); 00387 00388 mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain)); 00389 00390 pub_dispatch(_cl_bmap_get(usr_cl), mqp); 00391 00392 return MQP_CONTENT_LEN(mqp); 00393 } 00394 00395 static void wl_remove(struct mqtt_ack_wlist *list, 00396 struct mqtt_packet *prev, 00397 struct mqtt_packet *elem) 00398 { 00399 if(prev) 00400 prev->next = elem->next; 00401 else 00402 list->head = elem->next; 00403 00404 if(NULL == list->head) 00405 list->tail = NULL; 00406 00407 if(list->tail == elem) 00408 list->tail = prev; 00409 00410 return; 00411 } 00412 00413 static void sess_wl_mqp_dispatch(struct client_usr *cl_usr) 00414 { 00415 struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL; 00416 uint32_t cl_map = CL_BMAP(cl_usr); 00417 00418 for(mqp = sess_mqp_wl->head; NULL != mqp; prev = mqp, mqp = next) { 00419 struct mqtt_packet *cpy = NULL; 00420 00421 if(0 == (MQP_CL_MAP_GET(mqp) & cl_map)) 00422 continue; /* MQP & CL: no association */ 00423 00424 /* MQP has associated client(s) - process it */ 00425 00426 /* Dissociate this client from MQP */ 00427 MQP_CL_MAP_CLR(mqp, cl_map); 00428 next = mqp->next; /* House keeping */ 00429 00430 if(0 == MQP_CL_MAP_GET(mqp)) { 00431 /* MQP w/ no clients, remove from WL */ 00432 wl_remove(sess_mqp_wl, prev, mqp); 00433 00434 /* Ensure correct arrangement for WL */ 00435 cpy = mqp; mqp = prev; 00436 } else { 00437 /* MQP is associated w/ other clients */ 00438 cpy = mqp_server_copy(mqp); 00439 if(NULL == cpy) 00440 continue; 00441 } 00442 00443 /* Got packet from session, dispatch it to CL */ 00444 pub_dispatch(cl_map, cpy); 00445 } 00446 00447 return; 00448 } 00449 00450 static 00451 struct mqtt_packet *wl_mqp_unmap_find(struct mqtt_ack_wlist *wl, 00452 struct client_usr *cl_usr, uint16_t msg_id, 00453 struct mqtt_packet **prev) 00454 { 00455 struct mqtt_packet *mqp = NULL; 00456 00457 *prev = NULL; 00458 for(mqp = wl->head; NULL != mqp; *prev = mqp, mqp = mqp->next) { 00459 if(msg_id == mqp->msg_id) { 00460 /* Found a MQP whose msg_id matches with input */ 00461 MQP_CL_MAP_CLR(mqp, CL_BMAP(cl_usr)); 00462 return mqp; 00463 } 00464 } 00465 00466 return NULL; 00467 } 00468 00469 static bool wl_rmfree_try(struct mqtt_ack_wlist *wl, struct mqtt_packet *prev, 00470 struct mqtt_packet *mqp) 00471 { 00472 if(0 == MQP_CL_MAP_GET(mqp)) { 00473 wl_remove(wl, prev, mqp); 00474 mqp_free(mqp); 00475 00476 return true; 00477 } 00478 00479 return false; 00480 } 00481 00482 static bool ack1_unmap_rmfree_try(struct client_usr *cl_usr, uint16_t msg_id) 00483 { 00484 struct mqtt_packet *prev = NULL, *mqp = NULL; 00485 00486 if(false == has_clean_session(cl_usr)) { 00487 mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev); 00488 if(mqp) 00489 wl_rmfree_try(qos_ack1_wl, prev, mqp); 00490 } 00491 00492 return true; 00493 } 00494 00495 static 00496 void wl_purge(struct mqtt_ack_wlist *wl, struct client_usr *cl_usr) 00497 { 00498 struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL; 00499 uint32_t bmap = CL_BMAP(cl_usr); 00500 00501 for(mqp = wl->head; NULL != mqp; prev = mqp, mqp = next) { 00502 next = mqp->next; 00503 00504 /* Ideally, check should be done to see if cl_usr and mqp are 00505 associated. If yes, then the bit should be cleared. At the 00506 moment, blind clearing of the bit has been implemented and 00507 it has no side effects. 00508 */ 00509 MQP_CL_MAP_CLR(mqp, bmap); 00510 00511 /* MQP with no clients has no more use, so try deleting MQP */ 00512 if(wl_rmfree_try(wl, prev, mqp)) 00513 mqp = prev; /* MQP deleted, prep for next iteration */ 00514 } 00515 00516 return; 00517 } 00518 00519 static void ack1_wl_mqp_purge(struct client_usr *cl_usr) 00520 { 00521 wl_purge(qos_ack1_wl, cl_usr); 00522 } 00523 00524 static void sess_wl_mqp_purge(struct client_usr *cl_usr) 00525 { 00526 wl_purge(sess_mqp_wl, cl_usr); 00527 } 00528 00529 static void session_resume(struct client_usr *cl_usr) 00530 { 00531 ack1_wl_mqp_dispatch(cl_usr); 00532 ack2_msg_id_dispatch(cl_usr); 00533 sess_wl_mqp_dispatch(cl_usr); 00534 } 00535 00536 static void session_delete(struct client_usr *cl_usr) 00537 { 00538 ack1_wl_mqp_purge(cl_usr); 00539 ack2_msg_id_purge(cl_usr); 00540 sess_wl_mqp_purge(cl_usr); 00541 } 00542 00543 static bool has_session_data(struct client_usr *cl_usr) 00544 { 00545 uint32_t map = CL_BMAP(cl_usr); 00546 struct mqtt_packet *elem; 00547 00548 if(cl_usr->n_sub) 00549 return true; 00550 00551 elem = qos_ack1_wl->head; 00552 while(elem) { 00553 if(MQP_CL_MAP_GET(elem) & map) 00554 return true; 00555 00556 elem = elem->next; 00557 } 00558 00559 elem = sess_mqp_wl->head; 00560 while(elem) { 00561 if(MQP_CL_MAP_GET(elem) & map) 00562 return true; 00563 00564 elem = elem->next; 00565 } 00566 00567 return false; 00568 } 00569 00570 bool cl_can_session_delete(void *usr_cl) 00571 { 00572 struct client_usr *cl_usr = (struct client_usr*) usr_cl; 00573 00574 return cl_usr? (has_clean_session(cl_usr) || 00575 !has_session_data(cl_usr)) : false; 00576 } 00577 00578 void cl_on_net_close(void *usr_cl) 00579 { 00580 struct client_usr *cl_usr = (struct client_usr*) usr_cl; 00581 00582 if(is_assigned(cl_usr)) { 00583 if(false == has_session_data(cl_usr)) 00584 cl_usr_free(cl_usr); 00585 } else if(has_clean_session(cl_usr)) { 00586 session_delete(cl_usr); 00587 cl_usr_free(cl_usr); 00588 } else { 00589 set_connect_state(cl_usr, false); 00590 cl_usr->ctx = NULL; 00591 cl_usr->app = NULL; 00592 } 00593 00594 return; 00595 } 00596 00597 static bool proc_pub_rel_rx(struct client_usr *cl_usr, uint16_t msg_id) 00598 { 00599 mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBCOMP, 00600 MQTT_QOS0, true, msg_id); 00601 00602 if(qos2_pub_rx_is_done(cl_usr, msg_id)) 00603 qos2_pub_rx_unlog(cl_usr, msg_id); 00604 00605 return true; 00606 } 00607 00608 static bool proc_pub_rec_rx(struct client_usr *cl_usr, uint16_t msg_id) 00609 { 00610 struct mqtt_packet *prev = NULL, *mqp = NULL; 00611 00612 mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev); 00613 if(mqp && ack2_msg_id_logup(cl_usr, msg_id)) { 00614 00615 wl_rmfree_try(qos_ack1_wl, prev, mqp); 00616 00617 mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, 00618 MQTT_QOS1, true, msg_id); 00619 00620 return true; 00621 } 00622 00623 return false; 00624 } 00625 00626 bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id) 00627 { 00628 struct client_usr *cl_usr = (struct client_usr*) usr_cl; 00629 bool rv = false; 00630 00631 if(NULL == cl_usr) 00632 return rv; 00633 00634 switch(msg_type) { 00635 00636 case MQTT_PUBACK: 00637 rv = ack1_unmap_rmfree_try(cl_usr, msg_id); 00638 break; 00639 00640 case MQTT_PUBREC: 00641 rv = proc_pub_rec_rx(cl_usr, msg_id); 00642 break; 00643 00644 case MQTT_PUBREL: 00645 rv = proc_pub_rel_rx(cl_usr, msg_id); 00646 break; 00647 00648 case MQTT_PUBCOMP: 00649 rv = ack2_msg_id_unlog(cl_usr, msg_id); 00650 break; 00651 00652 default: 00653 break; 00654 } 00655 00656 return rv; 00657 } 00658 00659 static void assign_cl_index_as_id(struct client_usr *cl_usr) 00660 { 00661 /* TBD: need a better implementation */ 00662 char *client_id = cl_usr->client_id; 00663 00664 client_id[0] = 'S'; 00665 client_id[1] = 'e'; 00666 client_id[2] = 'l'; 00667 client_id[3] = 'f'; 00668 client_id[4] = '-'; 00669 client_id[5] = ((cl_usr->index & 0xf0) >> 4) + 0x30; 00670 client_id[6] = ((cl_usr->index & 0x0f)) + 0x30; 00671 client_id[7] = '\0'; 00672 00673 /* Make sure that above array size is with in MAX_CLIENT_ID_LEN */ 00674 00675 return; 00676 } 00677 00678 static struct client_usr *assign_cl_usr(char *client_id, uint8_t *vh_buf) 00679 { 00680 struct client_usr *cl_usr, *fr_usr = NULL; 00681 int32_t idx = 0; 00682 for(idx = 0; idx < MAX_CLIENTS; idx++) { 00683 cl_usr = users + idx; 00684 if((NULL == fr_usr) && IS_CL_USR_FREE(cl_usr)) 00685 fr_usr = cl_usr; /* Note 1st free cl_usr */ 00686 00687 if((NULL == client_id) && (NULL != fr_usr)) { 00688 /* Free cl_usr is present to create CL-ID */ 00689 break; 00690 00691 } else if((NULL != client_id) && 00692 (0 == strncmp(cl_usr->client_id, client_id, 00693 MAX_CLIENT_ID_LEN))) { 00694 /* Found a client obj with exact ID match */ 00695 if(is_connected(cl_usr)) { 00696 /* Error: CL-ID is already active */ 00697 vh_buf[1] = CONNACK_RC_CLI_REJECT; 00698 cl_usr = NULL; 00699 } 00700 break; 00701 } 00702 } 00703 00704 if(idx == MAX_CLIENTS) { /* Did not find a match */ 00705 cl_usr = fr_usr; 00706 if(NULL == cl_usr) 00707 /* Server Resource unavailable */ 00708 vh_buf[1] = CONNACK_RC_SVR_UNAVBL; 00709 } 00710 00711 if(NULL != cl_usr) { 00712 if(NULL == client_id) 00713 assign_cl_index_as_id(cl_usr); /* Get ID */ 00714 else if(IS_CL_USR_FREE(cl_usr)) 00715 strncpy(cl_usr->client_id, client_id, 00716 MAX_CLIENT_ID_LEN); 00717 00718 set_assigned(cl_usr, true); 00719 } 00720 00721 USR_INFO("Assignment of cl_usr %s (detail: index %d and name %s) \n\r", 00722 cl_usr? "Succeeded" : "Failed", cl_usr? (int32_t)cl_usr->index : -1, 00723 cl_usr? cl_usr->client_id : ""); 00724 00725 return cl_usr; 00726 } 00727 00728 uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id, 00729 void *app_cl, void *will, void **usr_cl) 00730 { 00731 uint8_t vh_buf[] = {0x00, CONNACK_RC_REQ_ACCEPT}; 00732 struct client_usr *cl_usr; 00733 00734 if(client_id && ('\0' == client_id[0])) 00735 /* Shouldn't happen: CLIENT ID with a NUL only string */ 00736 return CONNACK_RC_CLI_REJECT; 00737 00738 cl_usr = assign_cl_usr(client_id, vh_buf); 00739 if(NULL == cl_usr) 00740 return vh_buf[1]; /* Use vh_buf from assign_cl_usr() */ 00741 00742 if((false == clean_session) && has_session_data(cl_usr)) 00743 vh_buf[0] = 0x01; /* Set Session Present Flag */ 00744 00745 #if 0 00746 if(0x00 == vh_buf[1]) 00747 *usr_cl = (void*)cl_usr; 00748 #endif 00749 *usr_cl = (void*)cl_usr; 00750 00751 cl_usr->ctx = ctx_cl; 00752 cl_usr->app = app_cl; 00753 cl_usr->will = will; 00754 00755 return ((vh_buf[0] << 8) | vh_buf[1]); 00756 } 00757 00758 void cl_on_connack_send(void *usr_cl, bool clean_session) 00759 { 00760 struct client_usr *cl_usr = (struct client_usr*) usr_cl; 00761 00762 if(false == is_assigned(cl_usr)) { 00763 USR_INFO("Fatal: CONNACK for unassigned cl_usr Id %u, abort\n\r", 00764 cl_usr->index); 00765 return; 00766 } 00767 00768 set_assigned(cl_usr, false); 00769 set_clean_session(cl_usr, clean_session); 00770 set_connect_state(cl_usr, true); 00771 00772 if(clean_session) { 00773 session_delete(cl_usr); 00774 } else { 00775 session_resume(cl_usr); 00776 } 00777 00778 DBG_INFO("Server is now connected to client %s\n\r", cl_usr->client_id); 00779 00780 return; 00781 } 00782 00783 int32_t cl_mgmt_init(void) 00784 { 00785 cl_usr_init(); 00786 00787 return 0; 00788 } 00789 00790 }//namespace mbed_mqtt
Generated on Wed Jul 13 2022 09:55:38 by
1.7.2