TI's MQTT demo with FreeRTOS (CM4F)

Dependencies:   mbed

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers client_mgmt.cpp Source File

client_mgmt.cpp

00001 /******************************************************************************
00002 *
00003 *   Copyright (C) 2014 Texas Instruments Incorporated
00004 *
00005 *   All rights reserved. Property of Texas Instruments Incorporated.
00006 *   Restricted rights to use, duplicate or disclose this code are
00007 *   granted through contract.
00008 *
00009 *   The program may not be used without the written permission of
00010 *   Texas Instruments Incorporated or against the terms and conditions
00011 *   stipulated in the agreement under which this program has been supplied,
00012 *   and under no circumstances can it be used with non-TI connectivity device.
00013 *
00014 ******************************************************************************/
00015 
00016 #include "server_util.h"
00017 #include "client_mgmt.h"
00018 #include "server_pkts.h"
00019 
00020 namespace mbed_mqtt {
00021 
/* Size of the per-client identifier buffer; CFG_SR_MAX_CL_ID_SIZE overrides
   the default when the build provides it. */
#ifdef CFG_SR_MAX_CL_ID_SIZE
#define MAX_CLIENT_ID_LEN CFG_SR_MAX_CL_ID_SIZE
#else
#define MAX_CLIENT_ID_LEN 64
#endif

/* Number of client slots. Every client owns one bit of a 32-bit client map,
   which is why the count is capped at 32. */
#ifdef CFG_SR_MAX_NUM_CLIENT
#if (CFG_SR_MAX_NUM_CLIENT > 32)
#error "CFG_SR_MAX_NUM_CLIENT must not exceed 32"
#endif
#define MAX_CLIENTS       CFG_SR_MAX_NUM_CLIENT
#else
#define MAX_CLIENTS       16 /* Must not exceed 32 */
#endif
00037 
00038 static struct client_usr {
00039 
00040         void  *ctx;  /* Client net */
00041         void  *app;  /* Client app */
00042 
00043         void  *will; /* Will topic */
00044         uint32_t   index;
00045         uint32_t   n_sub;
00046 
00047 #define MQ_CONNECT_FLAG  0x00000001
00048 #define CLEAN_SESS_FLAG  0x00000002
00049 #define ASSIGNMENT_FLAG  0x00000004   /* No state or connected */
00050 
00051         uint32_t   flags;
00052 
00053         char    client_id[MAX_CLIENT_ID_LEN];
00054 
00055         struct pub_qos2_cq qos2_rx_cq;
00056         struct pub_qos2_cq qos2_tx_cq;
00057 
00058 } users[MAX_CLIENTS];
00059 
00060 // TBD consistency between inline and macro functions
00061 
00062 static inline bool is_connected(struct client_usr *cl_usr)
00063 {
00064         return (cl_usr->flags & MQ_CONNECT_FLAG)? true : false;
00065 }
00066 
00067 static inline bool has_clean_session(struct client_usr *cl_usr)
00068 {
00069         return (cl_usr->flags & CLEAN_SESS_FLAG)? true : false;
00070 }
00071 
00072 static inline bool is_assigned(struct client_usr *cl_usr)
00073 {
00074         return (cl_usr->flags & ASSIGNMENT_FLAG)? true : false;
00075 }
00076 
00077 static inline void set_connect_state(struct client_usr *cl_usr,
00078                                      bool connected)
00079 {
00080         if(connected)
00081                 cl_usr->flags |=  MQ_CONNECT_FLAG;
00082         else 
00083                 cl_usr->flags &= ~MQ_CONNECT_FLAG;
00084 }
00085 
00086 static inline void set_clean_session(struct client_usr *cl_usr,
00087                                      bool clean_session)
00088 {
00089         if(clean_session)
00090                 cl_usr->flags |=  CLEAN_SESS_FLAG;
00091         else
00092                 cl_usr->flags &= ~CLEAN_SESS_FLAG;
00093 }
00094 
00095 static inline void set_assigned(struct client_usr *cl_usr,
00096                                 bool assignment)
00097 {
00098         if(assignment)
00099                 cl_usr->flags |=  ASSIGNMENT_FLAG;
00100         else
00101                 cl_usr->flags &= ~ASSIGNMENT_FLAG;
00102 }
00103 
00104 static void cl_usr_reset(struct client_usr *cl_usr)
00105 {
00106         cl_usr->ctx  = NULL;
00107         cl_usr->app  = NULL;
00108         cl_usr->will = NULL;
00109 
00110         cl_usr->n_sub = 0;
00111         cl_usr->flags = 0;
00112         
00113         cl_usr->client_id[0] = '\0';
00114 
00115         qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
00116         qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
00117 }
00118 
00119 static void cl_usr_init(void)
00120 {
00121         int32_t idx = 0;
00122         for(idx = 0; idx < MAX_CLIENTS; idx++) {
00123                 struct client_usr *cl_usr = users + idx;
00124                 cl_usr->index =  idx;
00125 
00126                 cl_usr_reset(cl_usr);
00127         }
00128 }
00129 
00130 static void cl_usr_free(struct client_usr *cl_usr)
00131 {
00132         cl_usr_reset(cl_usr);
00133         cl_usr->flags &= ~(MQ_CONNECT_FLAG | 
00134                            CLEAN_SESS_FLAG |
00135                            ASSIGNMENT_FLAG);
00136 }
00137 
00138 void cl_sub_count_add(void *usr_cl)
00139 {
00140         struct client_usr *cl_usr = (struct client_usr*) usr_cl;
00141 
00142         if(is_connected(cl_usr)) {
00143                 cl_usr->n_sub++;
00144                 USR_INFO("%s has added a new sub, now total is %u\n\r",
00145                          cl_usr->client_id, cl_usr->n_sub);
00146 
00147         }
00148 
00149         return;
00150 }
00151 
00152 void cl_sub_count_del(void *usr_cl)
00153 {
00154         struct client_usr *cl_usr = (struct client_usr*) usr_cl;
00155 
00156         if(is_connected(cl_usr)) {
00157                 cl_usr->n_sub--;
00158                 USR_INFO("%s has deleted a sub, now total is %u\n\r",
00159                          cl_usr->client_id, cl_usr->n_sub);
00160         }
00161 
00162         return;
00163 }
00164 
00165 /*----------------------------------------------------------------------------
00166  * QoS2 PUB RX Message handling mechanism and associated house-keeping
00167  *--------------------------------------------------------------------------*/
00168 static inline bool qos2_pub_rx_logup(struct client_usr *cl_usr, uint16_t msg_id)
00169 {
00170         return qos2_pub_cq_logup(&cl_usr->qos2_rx_cq, msg_id);
00171 }
00172 
00173 static inline bool ack2_msg_id_logup(struct client_usr *cl_usr, uint16_t msg_id)
00174 {
00175         return qos2_pub_cq_logup(&cl_usr->qos2_tx_cq, msg_id);
00176 }
00177 
00178 static inline bool qos2_pub_rx_unlog(struct client_usr *cl_usr, uint16_t msg_id)
00179 {
00180         return qos2_pub_cq_unlog(&cl_usr->qos2_rx_cq, msg_id);
00181 }
00182 
00183 static inline bool ack2_msg_id_unlog(struct client_usr *cl_usr, uint16_t msg_id)
00184 {
00185         return qos2_pub_cq_unlog(&cl_usr->qos2_tx_cq, msg_id);
00186 }
00187 
00188 static inline bool qos2_pub_rx_is_done(struct client_usr *cl_usr, uint16_t msg_id)
00189 {
00190         return qos2_pub_cq_check(&cl_usr->qos2_rx_cq, msg_id);
00191 }
00192 
00193 bool cl_mgmt_qos2_pub_rx_update(void *usr_cl, uint16_t msg_id)
00194 {
00195         struct client_usr *cl_usr = (struct client_usr*) usr_cl;
00196 
00197         return cl_usr && (qos2_pub_rx_is_done(cl_usr, msg_id) ||
00198                           qos2_pub_rx_logup(cl_usr, msg_id));
00199 }
00200 
00201 static void ack2_msg_id_dispatch(struct client_usr *cl_usr)
00202 {
00203         struct pub_qos2_cq *tx_cq = &cl_usr->qos2_tx_cq;
00204         uint8_t rd_idx = tx_cq->rd_idx;
00205         uint8_t n_free = tx_cq->n_free;
00206         uint8_t i = 0;
00207 
00208         for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free); i++) {
00209                 if(mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL, MQTT_QOS1,
00210                                     true, tx_cq->id_vec[i]) <= 0)
00211                         break;
00212         }
00213 
00214         return;
00215 }
00216 
00217 static void ack2_msg_id_purge(struct client_usr *cl_usr)
00218 {
00219         qos2_pub_cq_reset(&cl_usr->qos2_tx_cq);
00220         qos2_pub_cq_reset(&cl_usr->qos2_rx_cq);
00221 }
00222 
00223 /*
00224 
00225 */
00226 static struct mqtt_ack_wlist  wl_qos_ack1 = {NULL, NULL};
00227 static struct mqtt_ack_wlist *qos_ack1_wl = &wl_qos_ack1;
00228 
00229 /*
00230 
00231 */
00232 static struct mqtt_ack_wlist  wl_mqp_sess = {NULL, NULL};
00233 static struct mqtt_ack_wlist *sess_mqp_wl = &wl_mqp_sess;
00234 
00235 #define MQP_CL_MAP_GET(mqp)         (mqp->private_)
00236 #define MQP_CL_MAP_SET(mqp, cl_map) (mqp->private_ |=  cl_map)
00237 #define MQP_CL_MAP_CLR(mqp, cl_map) (mqp->private_ &= ~cl_map)
00238 
00239 #define CL_BMAP(cl_usr)             (1 << cl_usr->index)
00240 
00241 static inline
00242 int32_t _pub_dispatch(struct client_usr *cl_usr, struct mqtt_packet *mqp,
00243                  bool dup)
00244 {
00245         /* Error, if any, is handled in 'cl_net_close()' ....*/
00246         return mqtt_server_pub_dispatch(cl_usr->ctx, mqp, dup);
00247 }
00248 
00249 static void ack1_wl_mqp_dispatch(struct client_usr *cl_usr)
00250 {
00251         struct mqtt_packet *mqp = NULL;
00252 
00253         for(mqp = qos_ack1_wl->head; NULL != mqp; mqp = mqp->next)
00254                 if(MQP_CL_MAP_GET(mqp) & CL_BMAP(cl_usr))
00255                         _pub_dispatch(cl_usr, mqp, true);
00256 }
00257 
#if  0
/* Disabled: look up a client slot by its index value; NULL if none matches. */
static struct client_usr *find_cl_usr(uint32_t index)
{
        int32_t pos = 0;

        for(pos = 0; pos < MAX_CLIENTS; pos++) {
                if(index == users[pos].index)
                        return users + pos;
        }

        return NULL;
}
#endif
00272 
/* Slot is unused: no flags set and no client id stored. */
#define IS_CL_USR_FREE(cl_usr) ((  0  == (cl_usr)->flags)      &&       \
                                ('\0' == (cl_usr)->client_id[0]))

/* Slot keeps a client id (and so a session) but the client is neither
   connected nor in the middle of a connect. */
#define IS_CL_INACTIVE(cl_usr) ((  0  == (cl_usr)->flags)      &&       \
                                ('\0' != (cl_usr)->client_id[0]))

/* A dispatched PUBLISH goes to the ACK wait list unless it is QoS0, or it is
   QoS1 and the client uses a clean session. */
#define NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr)                          \
        ((((MQTT_QOS1 == (qos)) && has_clean_session(cl_usr)) ||        \
          ((MQTT_QOS0 == (qos))))?                                      \
         false : true)
00283 
00284 static inline uint32_t _cl_bmap_get(void *usr_cl)
00285 {
00286         struct client_usr *cl_usr = (struct client_usr*) usr_cl;
00287 
00288         return IS_CL_USR_FREE(cl_usr)? 0 : CL_BMAP(cl_usr);
00289 }
00290 
00291 uint32_t cl_bmap_get(void *usr_cl)
00292 {
00293         return usr_cl? _cl_bmap_get(usr_cl) : 0;
00294 }
00295 
00296 void *cl_app_hndl_get(void *usr_cl)
00297 {
00298         struct client_usr *cl_usr = (struct client_usr*) usr_cl;
00299 
00300         return (cl_usr && is_connected((client_usr*) usr_cl))? cl_usr->app : NULL;
00301 }
00302 
00303 void *cl_will_hndl_get(void *usr_cl)
00304 {
00305         struct client_usr *cl_usr = (struct client_usr*) usr_cl;
00306 
00307         return (cl_usr && is_connected((client_usr*)usr_cl))? cl_usr->will : NULL;
00308 }
00309 
00310 static void pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
00311 {
00312         //uint32_t n_bits = sizeof(uint32_t) << 3;    /* Multiply by 8 */
00313         uint32_t sp_map = 0; /* client Map for sessions present */
00314         enum mqtt_qos qos = ENUM_QOS(mqp->fh_byte1);/* QOS */
00315         const uint32_t cl_ref = cl_map; /* Keep ref to original */
00316 
00317         int32_t i = 0;
00318         for(i = 0; i < MAX_CLIENTS; i++) {
00319                 if(cl_map & (1 << i)) {
00320                         struct client_usr *cl_usr = users + i; //find_cl_usr(i);
00321                         if(is_connected(cl_usr))
00322                                 if((_pub_dispatch(cl_usr, mqp, false) > 0)  &&
00323                                    NEED_TO_WAIT_LIST_PUBLISH(qos, cl_usr))
00324                                         continue;/* Processing done; next CL */
00325 
00326                         /* CL: unconnected / PUB Err / QOS1 PKT (clean sess) */
00327                         cl_map &= ~(1 << i); 
00328 
00329                         if(IS_CL_INACTIVE(cl_usr)) 
00330                                 sp_map |= (1 << i);  /* CL: Maintain session */
00331                 }
00332         }
00333 
00334         if(sp_map) {
00335                 struct mqtt_packet *cpy = mqp_server_copy(mqp); /* Make copy */
00336                 if(cpy) {
00337                         MQP_CL_MAP_SET(cpy, sp_map);
00338                         mqp_ack_wlist_append(sess_mqp_wl, cpy);
00339                 } else 
00340                         sp_map = 0;
00341         }
00342 
00343         if(cl_map) {                                    /* Wait List Publish */
00344                 MQP_CL_MAP_SET(mqp, cl_map);
00345                 mqp_ack_wlist_append(qos_ack1_wl, mqp);
00346         } else
00347                 mqp_free(mqp); /* PUB MQP now has no more use; must be freed */
00348 
00349         USR_INFO("PUBLISH: CL Map(0x%08x): For Ack 0x%08x, Inactive 0x%08x\n\r",
00350                  cl_ref, cl_map, sp_map);
00351 
00352         return;
00353 }
00354 
00355 void cl_pub_dispatch(uint32_t cl_map, struct mqtt_packet *mqp)
00356 {
00357         pub_dispatch(cl_map, mqp);
00358         return;
00359 }
00360 
00361 /* Move to mqtt_common.h and remove it from mqtt_client.h */
00362 static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp)
00363 {
00364         mqp_free(mqp);
00365         return MQP_ERR_PKT_LEN;
00366 }
00367 
00368 /* Move this to a common file */
00369 int32_t cl_pub_msg_send(void *usr_cl,
00370                     const struct utf8_string *topic, const uint8_t *data_buf,
00371                     uint32_t data_len, enum mqtt_qos qos, bool retain)
00372 {
00373         struct mqtt_packet *mqp = NULL;
00374 
00375         if((NULL == topic)                        ||
00376            ((data_len > 0) && (NULL == data_buf)) ||
00377            (NULL == usr_cl))
00378                 return MQP_ERR_FNPARAM;
00379 
00380         mqp = mqp_server_alloc(MQTT_PUBLISH, 2 + topic->length + 2 + data_len);
00381         if(NULL == mqp)
00382                 return MQP_ERR_PKT_AVL;
00383 
00384         if((0 > mqp_pub_append_topic(mqp, topic, qos? mqp_new_id_server(): 0)) ||
00385            (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) 
00386                 return len_err_free_mqp(mqp);
00387  
00388         mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain));
00389 
00390         pub_dispatch(_cl_bmap_get(usr_cl), mqp);
00391 
00392         return MQP_CONTENT_LEN(mqp);
00393 }
00394 
00395 static void wl_remove(struct mqtt_ack_wlist *list,
00396                       struct mqtt_packet *prev,
00397                       struct mqtt_packet *elem)
00398 {
00399         if(prev)
00400                 prev->next = elem->next;
00401         else
00402                 list->head = elem->next;
00403 
00404         if(NULL == list->head)
00405                 list->tail = NULL;
00406 
00407         if(list->tail == elem)
00408                 list->tail = prev;
00409 
00410         return;
00411 }
00412 
00413 static void sess_wl_mqp_dispatch(struct client_usr *cl_usr)
00414 {
00415         struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
00416         uint32_t cl_map = CL_BMAP(cl_usr);
00417 
00418         for(mqp = sess_mqp_wl->head; NULL != mqp; prev = mqp, mqp = next) {
00419                 struct mqtt_packet *cpy = NULL;
00420 
00421                 if(0 == (MQP_CL_MAP_GET(mqp) & cl_map))
00422                         continue; /* MQP & CL: no association */
00423 
00424                 /* MQP has associated client(s) -  process it */
00425 
00426                 /* Dissociate this client from MQP */
00427                 MQP_CL_MAP_CLR(mqp, cl_map);
00428                 next = mqp->next; /* House keeping */
00429 
00430                 if(0 == MQP_CL_MAP_GET(mqp)) {
00431                         /* MQP w/ no clients,  remove from WL */
00432                         wl_remove(sess_mqp_wl, prev, mqp);
00433 
00434                         /* Ensure correct arrangement for WL  */
00435                         cpy = mqp; mqp = prev;
00436                 } else {
00437                         /* MQP is associated w/ other clients */
00438                         cpy = mqp_server_copy(mqp);
00439                         if(NULL == cpy)
00440                                 continue;
00441                 }
00442 
00443                 /* Got packet from session, dispatch it to CL */
00444                 pub_dispatch(cl_map, cpy);
00445         }
00446 
00447         return;
00448 }
00449 
00450 static
00451 struct mqtt_packet *wl_mqp_unmap_find(struct mqtt_ack_wlist *wl,
00452                                       struct client_usr *cl_usr, uint16_t msg_id,
00453                                       struct mqtt_packet **prev)
00454 {
00455         struct mqtt_packet *mqp = NULL;
00456 
00457         *prev = NULL;
00458         for(mqp = wl->head; NULL != mqp; *prev = mqp, mqp = mqp->next) {
00459                 if(msg_id == mqp->msg_id) {
00460                         /* Found a MQP whose msg_id matches with input */
00461                         MQP_CL_MAP_CLR(mqp, CL_BMAP(cl_usr));
00462                         return mqp;
00463                 }
00464         }
00465 
00466         return NULL;
00467 }
00468 
00469 static bool wl_rmfree_try(struct mqtt_ack_wlist *wl, struct mqtt_packet *prev,
00470                           struct mqtt_packet *mqp)
00471 {
00472         if(0 == MQP_CL_MAP_GET(mqp)) {
00473                 wl_remove(wl, prev, mqp);
00474                 mqp_free(mqp);
00475 
00476                 return true;
00477         }
00478 
00479         return false;
00480 }
00481 
00482 static bool ack1_unmap_rmfree_try(struct client_usr *cl_usr, uint16_t msg_id)
00483 {
00484         struct mqtt_packet *prev = NULL, *mqp = NULL;
00485 
00486         if(false == has_clean_session(cl_usr)) {
00487                 mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
00488                 if(mqp)
00489                         wl_rmfree_try(qos_ack1_wl, prev, mqp);
00490         }
00491 
00492         return true;
00493 }
00494 
00495 static 
00496 void wl_purge(struct mqtt_ack_wlist *wl, struct client_usr *cl_usr)
00497 {
00498         struct mqtt_packet *mqp = NULL, *prev = NULL, *next = NULL;
00499         uint32_t bmap = CL_BMAP(cl_usr);
00500 
00501         for(mqp = wl->head; NULL != mqp; prev = mqp, mqp = next) {
00502                 next = mqp->next;
00503 
00504                 /* Ideally, check should be done to see if cl_usr and mqp are
00505                    associated. If yes, then the bit should be cleared. At the
00506                    moment, blind clearing of the bit has been implemented and
00507                    it has no side effects.
00508                 */
00509                 MQP_CL_MAP_CLR(mqp, bmap);
00510 
00511                 /* MQP with no clients has no more use, so try deleting MQP */
00512                 if(wl_rmfree_try(wl, prev, mqp))
00513                         mqp = prev; /* MQP deleted, prep for next iteration */
00514         }
00515 
00516         return;
00517 }
00518 
00519 static void ack1_wl_mqp_purge(struct client_usr *cl_usr)
00520 {
00521         wl_purge(qos_ack1_wl, cl_usr);
00522 }
00523 
00524 static void sess_wl_mqp_purge(struct client_usr *cl_usr)
00525 {
00526         wl_purge(sess_mqp_wl, cl_usr);
00527 }
00528 
00529 static void session_resume(struct client_usr *cl_usr)
00530 {
00531         ack1_wl_mqp_dispatch(cl_usr);
00532         ack2_msg_id_dispatch(cl_usr);
00533         sess_wl_mqp_dispatch(cl_usr);
00534 }
00535 
00536 static void session_delete(struct client_usr *cl_usr)
00537 {
00538         ack1_wl_mqp_purge(cl_usr);
00539         ack2_msg_id_purge(cl_usr);
00540         sess_wl_mqp_purge(cl_usr);
00541 }
00542 
00543 static bool has_session_data(struct client_usr *cl_usr)
00544 {
00545         uint32_t map = CL_BMAP(cl_usr);
00546         struct mqtt_packet *elem;
00547 
00548         if(cl_usr->n_sub)
00549                 return true;
00550 
00551         elem = qos_ack1_wl->head;
00552         while(elem) {
00553                 if(MQP_CL_MAP_GET(elem) & map) 
00554                         return true;
00555 
00556                 elem = elem->next;
00557         }
00558 
00559         elem = sess_mqp_wl->head;
00560         while(elem) {
00561                 if(MQP_CL_MAP_GET(elem) & map) 
00562                         return true;
00563 
00564                 elem = elem->next;
00565         }
00566 
00567         return false;
00568 }
00569 
00570 bool cl_can_session_delete(void *usr_cl)
00571 {
00572         struct client_usr *cl_usr = (struct client_usr*) usr_cl;
00573 
00574         return cl_usr? (has_clean_session(cl_usr) ||
00575                         !has_session_data(cl_usr)) : false;
00576 }
00577 
00578 void cl_on_net_close(void *usr_cl)
00579 {
00580         struct client_usr *cl_usr = (struct client_usr*) usr_cl;
00581 
00582         if(is_assigned(cl_usr)) {
00583                 if(false == has_session_data(cl_usr))
00584                         cl_usr_free(cl_usr);
00585         } else if(has_clean_session(cl_usr)) {
00586                 session_delete(cl_usr);
00587                 cl_usr_free(cl_usr);
00588         } else {
00589                 set_connect_state(cl_usr, false);
00590                 cl_usr->ctx = NULL;
00591                 cl_usr->app = NULL;
00592         }
00593 
00594         return;
00595 }
00596 
00597 static bool proc_pub_rel_rx(struct client_usr *cl_usr, uint16_t msg_id)
00598 {
00599         mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBCOMP,
00600                          MQTT_QOS0, true, msg_id);
00601 
00602         if(qos2_pub_rx_is_done(cl_usr, msg_id))
00603                 qos2_pub_rx_unlog(cl_usr, msg_id);
00604 
00605         return true;
00606 }
00607 
00608 static bool proc_pub_rec_rx(struct client_usr *cl_usr, uint16_t msg_id)
00609 {
00610         struct mqtt_packet *prev = NULL, *mqp = NULL;
00611 
00612         mqp = wl_mqp_unmap_find(qos_ack1_wl, cl_usr, msg_id, &prev);
00613         if(mqp && ack2_msg_id_logup(cl_usr, msg_id)) {
00614 
00615                 wl_rmfree_try(qos_ack1_wl, prev, mqp);
00616 
00617                 mqtt_vh_msg_send(cl_usr->ctx, MQTT_PUBREL,
00618                                  MQTT_QOS1, true, msg_id);
00619 
00620                 return true;
00621         }
00622 
00623         return false;
00624 }
00625 
00626 bool cl_notify_ack(void *usr_cl, uint8_t msg_type, uint16_t msg_id)
00627 {
00628         struct client_usr *cl_usr = (struct client_usr*) usr_cl;
00629         bool rv = false;
00630 
00631         if(NULL == cl_usr)
00632                 return rv;
00633 
00634         switch(msg_type) {
00635 
00636         case MQTT_PUBACK:
00637                 rv = ack1_unmap_rmfree_try(cl_usr, msg_id);
00638                 break;
00639 
00640         case MQTT_PUBREC:
00641                 rv = proc_pub_rec_rx(cl_usr, msg_id);
00642                 break;
00643 
00644         case MQTT_PUBREL:
00645                 rv = proc_pub_rel_rx(cl_usr, msg_id);
00646                 break;
00647 
00648         case MQTT_PUBCOMP:
00649                 rv = ack2_msg_id_unlog(cl_usr, msg_id);
00650                 break;
00651 
00652         default:
00653                 break;
00654         }
00655 
00656         return rv;
00657 }
00658 
00659 static void assign_cl_index_as_id(struct client_usr *cl_usr)
00660 {
00661         /* TBD: need a better implementation */
00662         char *client_id = cl_usr->client_id;
00663 
00664         client_id[0] = 'S';
00665         client_id[1] = 'e';
00666         client_id[2] = 'l';
00667         client_id[3] = 'f';
00668         client_id[4] = '-';
00669         client_id[5] = ((cl_usr->index & 0xf0) >> 4) + 0x30;
00670         client_id[6] = ((cl_usr->index & 0x0f))      + 0x30;
00671         client_id[7] = '\0';
00672 
00673         /* Make sure that above array size is with in MAX_CLIENT_ID_LEN */
00674 
00675         return;
00676 }
00677 
00678 static struct client_usr *assign_cl_usr(char *client_id, uint8_t *vh_buf)
00679 {
00680         struct client_usr *cl_usr, *fr_usr = NULL;
00681         int32_t idx = 0;
00682         for(idx = 0; idx < MAX_CLIENTS; idx++) {
00683                 cl_usr = users + idx;
00684                 if((NULL == fr_usr) && IS_CL_USR_FREE(cl_usr))
00685                         fr_usr = cl_usr;  /* Note 1st free cl_usr */
00686 
00687                 if((NULL == client_id) && (NULL != fr_usr)) {
00688                         /* Free cl_usr is present to create CL-ID */
00689                         break;
00690 
00691                 } else if((NULL != client_id) &&
00692                           (0 == strncmp(cl_usr->client_id, client_id,
00693                                         MAX_CLIENT_ID_LEN))) {
00694                         /* Found a client obj with exact ID match */
00695                         if(is_connected(cl_usr)) {
00696                                 /* Error: CL-ID is already active */
00697                                 vh_buf[1] = CONNACK_RC_CLI_REJECT;
00698                                 cl_usr    = NULL;
00699                         }
00700                         break;
00701                 }
00702         }
00703 
00704         if(idx == MAX_CLIENTS) {          /* Did not find a match */
00705                 cl_usr   = fr_usr;
00706                 if(NULL == cl_usr)
00707                         /* Server Resource unavailable */
00708                         vh_buf[1] = CONNACK_RC_SVR_UNAVBL;
00709         }
00710 
00711         if(NULL != cl_usr) {
00712                 if(NULL == client_id)
00713                         assign_cl_index_as_id(cl_usr); /* Get ID */
00714                 else if(IS_CL_USR_FREE(cl_usr))
00715                         strncpy(cl_usr->client_id, client_id,
00716                                 MAX_CLIENT_ID_LEN);
00717 
00718                 set_assigned(cl_usr, true);
00719         }
00720 
00721         USR_INFO("Assignment of cl_usr %s (detail: index %d and name %s) \n\r",
00722                  cl_usr? "Succeeded" : "Failed", cl_usr? (int32_t)cl_usr->index : -1,
00723                  cl_usr? cl_usr->client_id : "");
00724 
00725         return cl_usr;
00726 }
00727 
00728 uint16_t cl_connect_rx(void *ctx_cl, bool clean_session, char *client_id,
00729                   void *app_cl, void *will, void **usr_cl)
00730 {
00731         uint8_t vh_buf[] = {0x00, CONNACK_RC_REQ_ACCEPT};
00732         struct client_usr *cl_usr;
00733 
00734         if(client_id && ('\0' == client_id[0]))
00735                 /* Shouldn't happen: CLIENT ID with a NUL only string */
00736                 return CONNACK_RC_CLI_REJECT;
00737 
00738         cl_usr = assign_cl_usr(client_id, vh_buf);
00739         if(NULL == cl_usr)
00740                 return vh_buf[1]; /* Use vh_buf from assign_cl_usr() */
00741 
00742         if((false == clean_session) && has_session_data(cl_usr))
00743                 vh_buf[0] = 0x01; /* Set Session Present Flag */
00744 
00745 #if  0
00746         if(0x00 == vh_buf[1])
00747                 *usr_cl = (void*)cl_usr;
00748 #endif
00749         *usr_cl = (void*)cl_usr;
00750 
00751         cl_usr->ctx  = ctx_cl;
00752         cl_usr->app  = app_cl;
00753         cl_usr->will = will;
00754 
00755         return ((vh_buf[0] << 8) | vh_buf[1]);
00756 }
00757 
00758 void cl_on_connack_send(void *usr_cl, bool clean_session)
00759 {
00760         struct client_usr *cl_usr = (struct client_usr*) usr_cl;
00761 
00762         if(false == is_assigned(cl_usr)) {
00763                 USR_INFO("Fatal: CONNACK for unassigned cl_usr Id %u, abort\n\r",
00764                          cl_usr->index);
00765                 return;
00766         }
00767 
00768         set_assigned(cl_usr, false);
00769         set_clean_session(cl_usr, clean_session);
00770         set_connect_state(cl_usr, true);
00771 
00772         if(clean_session) {
00773                 session_delete(cl_usr);
00774         } else {
00775                 session_resume(cl_usr);
00776         }
00777 
00778         DBG_INFO("Server is now connected to client %s\n\r", cl_usr->client_id);
00779 
00780         return;
00781 }
00782 
00783 int32_t cl_mgmt_init(void)
00784 {
00785         cl_usr_init();
00786 
00787         return 0;
00788 }
00789 
00790 }//namespace mbed_mqtt