Part of TI's mqtt

Dependents:   mqtt_V1 cc3100_Test_mqtt_CM3

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers mqtt_client.cpp Source File

mqtt_client.cpp

00001 /******************************************************************************
00002 *
00003 *   Copyright (C) 2014 Texas Instruments Incorporated
00004 *
00005 *   All rights reserved. Property of Texas Instruments Incorporated.
00006 *   Restricted rights to use, duplicate or disclose this code are
00007 *   granted through contract.
00008 *
00009 *   The program may not be used without the written permission of
00010 *   Texas Instruments Incorporated or against the terms and conditions
00011 *   stipulated in the agreement under which this program has been supplied,
00012 *   and under no circumstances can it be used with non-TI connectivity device.
00013 *
00014 ******************************************************************************/
00015 
00016 /*
00017    mqtt_client.c
00018 
00019    The module provides implementation to the public interface of the MQTT
00020    Client Library.
00021 */
00022 #include "FreeRTOS.h"
00023 #include "mqtt_client.h"
00024 #include "semphr.h"
00025 
00026 #include "cli_uart.h"
00027 
00028 #define PRINT_BUF_LEN    128
00029 extern int8_t print_buf[PRINT_BUF_LEN];
00030 
00031 namespace mbed_mqtt
00032 {
00033 
00034 //void createMutex(void);
00035 
/* Optional application-supplied lock: an opaque handle plus the two hooks
   that operate on it. All three start out unset. */
static void *mutex                  = NULL;
static void (*mutex_lockin)(void*)  = NULL;
static void (*mutex_unlock)(void*)  = NULL;

/* Invoke the corresponding hook only if one has been installed. Note that
   both expansions already end in a semicolon. */
#define MUTEX_LOCKIN() if(mutex_lockin) mutex_lockin(mutex);
#define MUTEX_UNLOCK() if(mutex_unlock) mutex_unlock(mutex);

/* Diagnostics: 'debug_printf' is the print routine used by both macros
   below; DBG_INFO is additionally gated by 'aux_dbg_enbl'. Neither macro
   checks 'debug_printf' for NULL. */
static bool aux_dbg_enbl = true;
int32_t (*debug_printf)(const char *fmt, ...) = NULL;

#define USR_INFO debug_printf
#define DBG_INFO(I, ...) if(aux_dbg_enbl) debug_printf(I, ##__VA_ARGS__)

/* Network services table through which all socket operations are made */
static const struct device_net_services *net_ops = NULL;
00050 
/* Last message identifier handed out. Starting from 0xFFFF and stepping by
   two, the first identifier issued is 0x0001 (16-bit wrap-around). */
static uint16_t msg_id = 0xFFFF;

/* Advance the identifier by two and return the new value. */
static inline uint16_t assign_new_msg_id()
{
    msg_id = (uint16_t)(msg_id + 2);
    return msg_id;
}
00056 
00057 SemaphoreHandle_t xSemaphore = NULL;
00058 void createMutex()
00059 {
00060 
00061     xSemaphore = xSemaphoreCreateMutex();
00062 }
00063 
00064 /*-----------------------------------------------------------------------------
00065  * Data structure for managing the client and its nuances
00066  *---------------------------------------------------------------------------*/
00067 
/* Book-keeping for a transmission that the network layer may accept only
   in pieces (restrictive TCP segments). The library then sends the data
   partially and incrementally; in such deployments there can be only one
   in-flight message.
*/
#define VH_MSG_LEN 4

struct tx_part_pkt {

    /* MQP whose TX is active; NULL when no MQP is involved */
    struct mqtt_packet *tx_mqp;

    /* Source of the bytes being sent; which member applies depends on
       whether 'tx_mqp' is set */
    union {
        uint8_t        vh_msg[VH_MSG_LEN]; /* Local copy, used if MQP = NULL */
        const uint8_t *buffer;             /* Refers to data in the MQP      */
    };

    uint32_t length;                       /* Length of the entire data      */
    uint32_t offset;                       /* Position of next byte to send  */
};
00087 
00088 static bool tx_part_setup(struct tx_part_pkt *tx_part, const uint8_t *buffer,
00089                           uint32_t length, struct mqtt_packet *tx_mqp)
00090 {
00091 
00092     if(tx_mqp) {
00093         tx_part->buffer = buffer;
00094     } else {
00095         if(VH_MSG_LEN < length)
00096             return false;
00097 
00098         buf_wr_nbytes(tx_part->vh_msg, buffer, length);
00099     }
00100 
00101     tx_part->length = length;
00102     tx_part->tx_mqp = tx_mqp;
00103 
00104     return true;
00105 }
00106 
00107 static void tx_part_reset(struct tx_part_pkt *tx_part)
00108 {
00109 
00110     struct mqtt_packet *tx_mqp = tx_part->tx_mqp;
00111     if(tx_mqp) {
00112         mqp_free(tx_mqp);
00113     }
00114     tx_part->vh_msg[0] = 0x00;
00115     tx_part->tx_mqp    = NULL;
00116     tx_part->length    = 0;
00117     tx_part->offset    = 0;
00118 
00119     return;
00120 }
00121 
00122 static const uint8_t *tx_part_buf_p(struct tx_part_pkt *tx_part)
00123 {
00124     struct mqtt_packet *tx_mqp = tx_part->tx_mqp;
00125     uint32_t offset = tx_part->offset;
00126 
00127     return  tx_mqp?
00128             tx_part->buffer + offset :
00129             tx_part->vh_msg + offset;
00130 }
00131 
00132 static void tx_part_addup(struct tx_part_pkt *tx_part, uint32_t offset)
00133 {
00134     tx_part->offset += offset;
00135 }
00136 
/* Accessors for a 'struct tx_part_pkt *'.
   TX_PART_BUFFER: address of the next byte to send.
   TX_PART_BUF_SZ: number of bytes still to be sent (length - offset).
   TX_PART_IN_USE: non-zero while bytes remain to be sent.
   The argument is parenthesized so that any pointer expression (for
   example '&pkt') can be passed, not only a plain identifier. */
#define TX_PART_BUFFER(tx_part)  tx_part_buf_p(tx_part)
#define TX_PART_BUF_SZ(tx_part) ((tx_part)->length - (tx_part)->offset)
#define TX_PART_IN_USE(tx_part)  TX_PART_BUF_SZ(tx_part)
00140 
/* Initialization state of the client library as a whole */
enum module_state {
    WAIT_INIT_STATE = 0x00,
    INIT_DONE_STATE = 0x01
};

static enum module_state cl_lib_state = WAIT_INIT_STATE;
00148 
static uint16_t loopb_portid = 0;     /* Port used with the loopback send */
static bool     grp_has_cbfn = false;

/* Context flags that mirror the library configuration options */
#define USE_PROTO_V31_FLAG         MQTT_CFG_PROTOCOL_V31
#define APP_RECV_TASK_FLAG         MQTT_CFG_APP_HAS_RTSK
#define GROUP_CONTEXT_FLAG         MQTT_CFG_MK_GROUP_CTX

/* Context flags that track run-time state, one bit each */
#define CLEAN_SESSION_FLAG         0x00010000
#define CONNACK_AWAIT_FLAG         0x00020000
#define NOW_CONNECTED_FLAG         0x00040000
#define KA_PINGER_RSP_FLAG         0x00080000
#define USER_PING_RSP_FLAG         0x00100000
#define NETWORK_CLOSE_FLAG         0x00200000
#define DO_CONNACK_TO_FLAG         0x00400000

/* Singly linked lists of client contexts */
static struct client_ctx *free_ctxs = NULL;   /* CTX construct available */
static struct client_ctx *used_ctxs = NULL;   /* Relevant only for group */
static struct client_ctx *conn_ctxs = NULL;   /* Relevant only for group */
00167 
00168 static void cl_ctx_freeup(struct client_ctx *cl_ctx)
00169 {
00170     cl_ctx->next = free_ctxs;
00171     free_ctxs = cl_ctx;
00172 
00173     return;
00174 }
00175 
/* Flag tests on a 'struct client_ctx *'. Each yields a non-zero value if
   the named flag is set in 'flags'. The argument is parenthesized so that
   any pointer expression can be passed safely. */
#define IS_PROTO_VER31(cl_ctx) ((cl_ctx)->flags & USE_PROTO_V31_FLAG)
#define AWAITS_CONNACK(cl_ctx) ((cl_ctx)->flags & CONNACK_AWAIT_FLAG)
#define HAS_CONNECTION(cl_ctx) ((cl_ctx)->flags & NOW_CONNECTED_FLAG)
#define AWAITS_KA_PING(cl_ctx) ((cl_ctx)->flags & KA_PINGER_RSP_FLAG)
#define AWAITS_PINGRSP(cl_ctx) ((cl_ctx)->flags & USER_PING_RSP_FLAG)
#define IS_CLN_SESSION(cl_ctx) ((cl_ctx)->flags & CLEAN_SESSION_FLAG)
#define RECV_TASK_AVBL(cl_ctx) ((cl_ctx)->flags & APP_RECV_TASK_FLAG)
#define A_GROUP_MEMBER(cl_ctx) ((cl_ctx)->flags & GROUP_CONTEXT_FLAG)
#define NEED_NET_CLOSE(cl_ctx) ((cl_ctx)->flags & NETWORK_CLOSE_FLAG)
#define CFG_CONNACK_TO(cl_ctx) ((cl_ctx)->flags & DO_CONNACK_TO_FLAG)
00186 
/* Number of client descriptors: taken from the build configuration when
   CFG_CL_MQTT_CTXS is defined, otherwise 4. */
#ifdef CFG_CL_MQTT_CTXS
#define MAX_NWCONN CFG_CL_MQTT_CTXS
#else
#define MAX_NWCONN 4
#endif
00192 
00193 static struct client_desc {
00194 
00195     /* ALERT:  "context" must be first elem in this structure, do not move */
00196     struct client_ctx           context;
00197 
00198 #define CLIENT(cl_ctx) ((struct client_desc*) cl_ctx)
00199 #define CL_CTX(client) ((struct client_ctx*)  client)
00200 
00201     /* Order/Sequence: Client ID, Will Topic, Will Msg, Username, Password */
00202     const struct utf8_string    *conn_pl_utf8s[5]; /* Ref: CONNECT Payload */
00203     uint8_t                           will_opts;
00204 
00205     /* Wait-List for Level 1 ACK(s), which are PUBACK, PUBREC, UN/SUBACK */
00206     struct mqtt_ack_wlist        qos_ack1_wl;
00207 
00208     /* Circular queue to track QOS2 PUBLISH packets from the server. They
00209        are tracked for the duration of PUBLISH-RX to PUBREL-RX.
00210     */
00211     struct pub_qos2_cq           qos2_rx_cq;
00212 
00213     /* Circular queue to track QOS2 PUBLISH packets from the client. They
00214        are tracked for the duration of PUBREC-RX to PUBOMP-RX.
00215     */
00216     struct pub_qos2_cq           qos2_tx_cq;
00217 
00218     struct mqtt_client_ctx_cbs   app_ctx_cbs; /* Callback funcs from app */
00219 #define CTX_CBS_PTR(cl_ctx) &(CLIENT(cl_ctx)->app_ctx_cbs)
00220 
00221     struct tx_part_pkt           tx_part;/* Reference to partial TX PKT */
00222     struct mqtt_packet          *rx_mqp; /* Reference to partial RX PKT */
00223     void                        *app;
00224 
00225     uint32_t                          nwconn_opts;
00226     char                          *server_addr;
00227     uint16_t                          port_number;
00228     struct secure_conn           nw_security;
00229 
00230 } clients[MAX_NWCONN];
00231 
00232 static void client_reset(struct client_desc *client)
00233 {
00234 
00235     struct mqtt_client_ctx_cbs *ctx_cbs = &client->app_ctx_cbs;
00236     int32_t i = 0;
00237 
00238     cl_ctx_reset(CL_CTX(client));
00239 
00240     for(i = 0; i < 5; i++) {
00241         client->conn_pl_utf8s[i] = NULL;
00242     }
00243     client->will_opts                = 0;
00244 
00245     mqp_ack_wlist_purge(&client->qos_ack1_wl);
00246 
00247     qos2_pub_cq_reset(&client->qos2_rx_cq);
00248     qos2_pub_cq_reset(&client->qos2_tx_cq);
00249 
00250     ctx_cbs->publish_rx              = NULL;
00251     ctx_cbs->ack_notify              = NULL;
00252     ctx_cbs->disconn_cb              = NULL;
00253 
00254     tx_part_reset(&client->tx_part);
00255     client->rx_mqp                   = NULL;
00256     client->app                      = NULL;
00257 
00258     client->nwconn_opts              = 0;
00259     client->server_addr              = NULL;
00260     client->port_number              = 0;
00261 
00262     secure_conn_struct_init(&client->nw_security);
00263 
00264     return;
00265 }
00266 
00267 static void client_desc_init(void)
00268 {
00269     int32_t i = 0;
00270     for(i = 0; i < MAX_NWCONN; i++) {
00271         struct client_desc   *client = clients + i;
00272         struct client_ctx *cl_ctx = CL_CTX(client);
00273 
00274         /* Initialize certain fields to defaults */
00275         client->qos_ack1_wl.head = NULL;
00276         client->qos_ack1_wl.tail = NULL;
00277         client->tx_part.tx_mqp   = NULL;
00278 
00279         client_reset(client); /* Reset remaining */
00280 
00281         cl_ctx->next = free_ctxs;
00282         free_ctxs    = cl_ctx;
00283     }
00284 
00285     return;
00286 }
00287 
00288 static void mqp_free_locked(struct mqtt_packet *mqp)
00289 {
00290 
00291 //        MUTEX_LOCKIN();
00292 
00293 //        MUTEX_UNLOCK();
00294 
00295     if( xSemaphore != NULL ) {
00296         // See if we can obtain the semaphore.  If the semaphore is not available
00297         // wait 10 ticks to see if it becomes free.
00298         if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
00299             // We were able to obtain the semaphore and can now access the
00300             // shared resource.
00301             mqp_free(mqp);
00302 
00303             // We have finished accessing the shared resource.  Release the
00304             // semaphore.
00305             xSemaphoreGive( xSemaphore );
00306 
00307         } else {
00308             // We could not obtain the semaphore and can therefore not access
00309             // the shared resource safely.
00310             Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
00311         }
00312     }
00313 }
00314 
00315 /*----------------------------------------------------------------------------
00316  * Fix-up to prevent certain good and non-callback MQP being reported to app
00317  *----------------------------------------------------------------------------
00318  */
00319 /* cor --> clear on read. */
00320 static bool mqp_do_not_report_cor(struct mqtt_packet *mqp)
00321 {
00322     bool rv = (1 == mqp->private_)? true : false;
00323     mqp->private_ = 0;
00324     return rv;
00325 }
00326 
/* Read and clear the do-not-report mark of an RX MQP */
#define MQP_RX_DO_NOT_RPT_COR(mqp)     mqp_do_not_report_cor(mqp)

/* Only if MQP has good RX data i.e. this macro shouldn't be used for bad RX */
#define MQP_RX_DO_NOT_RPT_SET(mqp)    ((mqp)->private_ = 1)

/* For a TX MQP the same 'private_' field accumulates the length sent.
   Arguments are parenthesized so that expressions can be passed safely. */
#define MQP_TX_DONE_LEN_ADD(mqp, add) ((mqp)->private_ += (add))
#define MQP_TX_DONE_LEN_GET(mqp)      ((mqp)->private_)
00334 
00335 /*---------------------------Fix-Up Ends ------------------------------------*/
00336 
00337 static int32_t loopb_net        = -1;
00338 static const uint8_t LOOP_DATA[] = {0x00, 0x01};
00339 #define LOOP_DLEN sizeof(LOOP_DATA)
00340 
00341 static int32_t loopb_trigger(void)
00342 {
00343 
00344 
00345     uint8_t ip_addr[] = {127,0,0,1};
00346 
00347     return  (-1 != loopb_net)?
00348             net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN,
00349                                loopb_portid, ip_addr, 4) : MQP_ERR_LIBQUIT;
00350 }
00351 
00352 static void session_311fix(struct client_ctx *cl_ctx)
00353 {
00354     struct mqtt_ack_wlist *wl = &CLIENT(cl_ctx)->qos_ack1_wl;
00355     struct mqtt_packet  *elem = wl->head;
00356 
00357     while(elem) {
00358         struct mqtt_packet *next = elem->next;
00359         if(MQTT_PUBLISH != elem->msg_type)
00360             mqp_ack_wlist_remove(wl, elem->msg_id);
00361 
00362         elem = next;
00363     }
00364 
00365     return;
00366 }
00367 
00368 static void session_delete(struct client_ctx *cl_ctx)
00369 {
00370     struct client_desc *client = CLIENT(cl_ctx);
00371 
00372     DBG_INFO("C: Cleaning session for net %d\n\r", cl_ctx->net);
00373 
00374     qos2_pub_cq_reset(&client->qos2_rx_cq);
00375     qos2_pub_cq_reset(&client->qos2_tx_cq);
00376 
00377     mqp_ack_wlist_purge(&client->qos_ack1_wl);
00378 
00379     return;
00380 }
00381 
00382 /*------------------------------------------------------------------------------
00383  * Routine to manage error conditions in client - close the network connection
00384  *----------------------------------------------------------------------------*/
00385 static void do_net_close(struct client_ctx *cl_ctx)
00386 {
00387     int32_t net = cl_ctx->net;
00388 
00389     if(-1 == net)
00390         return;  /* network closed already, must not happen */
00391 
00392     if(IS_CLN_SESSION(cl_ctx)) {
00393         session_delete(cl_ctx);
00394     } else if(!IS_PROTO_VER31(cl_ctx)) {
00395         /* Version 3.1.1 doesn't need SUB and UNSUB re-send */
00396         session_311fix(cl_ctx);
00397     }
00398 
00399     tx_part_reset(&CLIENT(cl_ctx)->tx_part); /* Part TX, if any */
00400 
00401     cl_ctx->flags &= ~(CONNACK_AWAIT_FLAG | NOW_CONNECTED_FLAG |
00402                        KA_PINGER_RSP_FLAG | USER_PING_RSP_FLAG |
00403                        NETWORK_CLOSE_FLAG | DO_CONNACK_TO_FLAG);
00404 
00405     cl_ctx->net = -1;
00406     net_ops->close(net);
00407 
00408     USR_INFO("C: Net %d now closed\n\r", net);
00409 
00410     return;
00411 }
00412 
00413 static void do_net_close_rx(struct client_ctx *cl_ctx, int32_t cause)
00414 {
00415     struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
00416 
00417     DBG_INFO("C: RX closing Net %d [%d]\n\r", cl_ctx->net, cause);
00418 
00419     do_net_close(cl_ctx);
00420     if(ctx_cbs->disconn_cb)
00421         ctx_cbs->disconn_cb(CLIENT(cl_ctx)->app, cause);
00422 
00423     if(A_GROUP_MEMBER(cl_ctx))
00424         cl_ctx_remove(&used_ctxs, cl_ctx);
00425 
00426     return;
00427 }
00428 
00429 static void do_net_close_tx(struct client_ctx *cl_ctx, char *cause)
00430 {
00431     DBG_INFO("C: TX closing Net %d [%s]\n\r", cl_ctx->net, cause);
00432 
00433     if(RECV_TASK_AVBL(cl_ctx)) {
00434         cl_ctx->flags |= NETWORK_CLOSE_FLAG;
00435         if(A_GROUP_MEMBER(cl_ctx))
00436             loopb_trigger();
00437     } else {
00438         struct mqtt_packet *rx_mqp = CLIENT(cl_ctx)->rx_mqp;
00439 
00440         do_net_close(cl_ctx);  /* No RX Task, close now */
00441 
00442         /* Release partial MQP, if any, for a CTX w/ CB */
00443         if((NULL != rx_mqp) && (NULL != rx_mqp->free))
00444             mqp_free(rx_mqp);
00445 
00446         CLIENT(cl_ctx)->rx_mqp = NULL;
00447     }
00448 
00449     return;
00450 }
00451 
00452 /*----------------------------------------------------------------------------
00453  * QoS2 PUB RX Message handling mechanism and associated house-keeping
00454  *--------------------------------------------------------------------------*/
00455 static bool qos2_pub_rx_logup(struct client_ctx *cl_ctx, uint16_t msg_id)
00456 {
00457     return qos2_pub_cq_logup(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id);
00458 }
00459 
00460 static bool ack2_msg_id_logup(struct client_ctx *cl_ctx, uint16_t msg_id)
00461 {
00462     return qos2_pub_cq_logup(&CLIENT(cl_ctx)->qos2_tx_cq, msg_id);
00463 }
00464 
00465 static bool qos2_pub_rx_unlog(struct client_ctx *cl_ctx, uint16_t msg_id)
00466 {
00467     return qos2_pub_cq_unlog(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id);
00468 }
00469 
00470 static bool ack2_msg_id_unlog(struct client_ctx *cl_ctx, uint16_t msg_id)
00471 {
00472     struct client_desc *client = CLIENT(cl_ctx);
00473     if(qos2_pub_cq_unlog(&client->qos2_tx_cq, msg_id)) {
00474         struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
00475         if(ctx_cbs->ack_notify)
00476             ctx_cbs->ack_notify(client->app, MQTT_PUBCOMP,
00477                                 msg_id, NULL, 0);
00478         return true;
00479     }
00480 
00481     return false;
00482 }
00483 
00484 static bool qos2_pub_rx_is_done(struct client_ctx *cl_ctx, uint16_t msg_id)
00485 {
00486     return qos2_pub_cq_check(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id);
00487 }
00488 
00489 static bool awaits_pkts(struct client_ctx *cl_ctx)
00490 {
00491 
00492     struct client_desc *client = CLIENT(cl_ctx);
00493 
00494     return  client->qos_ack1_wl.head                ||
00495             qos2_pub_cq_count(&client->qos2_rx_cq)  ||
00496             qos2_pub_cq_count(&client->qos2_tx_cq)?
00497             true : false;
00498 }
00499 
00500 static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp)
00501 {
00502     mqp_free(mqp);
00503     return MQP_ERR_PKT_LEN;
00504 }
00505 
00506 static int32_t is_valid_utf8_string(const struct utf8_string *utf8)
00507 {
00508     /* Valid topic should be > 0 byte and must hosted in usable buffer */
00509     return ((utf8->length > 0) && (NULL != utf8->buffer))? true : false;
00510 }
00511 
00512 #define RET_IF_INVALID_UTF8(utf8)               \
00513         if(false == is_valid_utf8_string(utf8)) \
00514                 return -1;
00515 
00516 static bool is_connected(struct client_ctx *cl_ctx)
00517 {
00518 
00519     return (HAS_CONNECTION(cl_ctx) && !NEED_NET_CLOSE(cl_ctx))?
00520            true : false;
00521 }
00522 
00523 uint16_t mqtt_client_new_msg_id()
00524 {
00525     return assign_new_msg_id();
00526 }
00527 
00528 bool mqtt_client_is_connected(void *ctx)
00529 {
00530     return is_connected(CL_CTX(ctx));
00531 }
00532 
00533 /*----------------------------------------------------------------------------
00534  * MQTT TX Routines
00535  *--------------------------------------------------------------------------*/
00536 static void used_ctxs_TO_sort(struct client_ctx *cl_ctx_TO)
00537 {
00538     cl_ctx_remove(&used_ctxs, cl_ctx_TO);
00539     cl_ctx_timeout_insert(&used_ctxs, cl_ctx_TO);
00540 }
00541 
00542 static inline int32_t net_send(int32_t net, const uint8_t *buf, uint32_t len, void *ctx)
00543 {
00544 
00545     int32_t rv = net_ops->send(net, buf, len, ctx);
00546     if(rv <= 0) {
00547         memset(print_buf, 0x00, PRINT_BUF_LEN);
00548         sprintf((char*) print_buf, "MQP_ERR_NETWORK %i\r\n", MQP_ERR_NETWORK);
00549         Uart_Write((uint8_t *) print_buf);
00550         rv = MQP_ERR_NETWORK;
00551     }
00552     return rv;
00553 }
00554 
#if 0
/* DISABLED: whole-buffer send routine, compiled out. The live code sends
   through cl_ctx_seg1_send() / cl_ctx_part_send() instead. Kept for
   reference only; the net_send() call below has been brought in line
   with the current four-argument net_send() so that the block would
   still build if it were ever re-enabled. */
static int32_t cl_ctx_send(struct client_ctx *cl_ctx, const uint8_t *buf, uint32_t len,
                           bool is_conn_msg)
{

    int32_t rv = MQP_ERR_NOTCONN;

    /* For CONNECT msg, a client context mustn't be already connected.
       For others msgs, a client context must be connected to server */
    if(false == (is_conn_msg ^ is_connected(cl_ctx)))
        goto cl_ctx_send_exit1;

    rv = net_send(cl_ctx->net, buf, len, (void*)cl_ctx);
    if(rv > 0) { /* A good send, do follow-up */
        cl_ctx_timeout_update(cl_ctx, net_ops->time());
        if(A_GROUP_MEMBER(cl_ctx) && HAS_CONNECTION(cl_ctx)) {
            /* With update to timeout,
               a sorting is impending */
            used_ctxs_TO_sort(cl_ctx);
        }

        goto cl_ctx_send_exit1; /* A Good Send */
    }

    do_net_close_tx(cl_ctx, "snd-err");

cl_ctx_send_exit1:
    USR_INFO("C: FH-B1 0x%02x, len %u bytes, to net %d: %s\n\r",
             *buf, len, cl_ctx->net, (rv > 0)? "Sent" : "Fail");
    return rv;
}
#endif
00587 
00588 static int32_t cl_ctx_part_send(struct client_ctx *cl_ctx)
00589 {
00590 
00591     struct tx_part_pkt *tx_part = &CLIENT(cl_ctx)->tx_part;
00592     const uint8_t *buf = TX_PART_BUFFER(tx_part);
00593     uint32_t len = TX_PART_BUF_SZ(tx_part);
00594     uint32_t ofs = tx_part->offset;
00595     uint8_t  B1  = *buf;
00596 
00597     int32_t rv = net_send(cl_ctx->net, buf, len, (void*)cl_ctx);
00598     if(rv > 0) { /* Follow-up for a good send */
00599         if(HAS_CONNECTION(cl_ctx)) {
00600             /* Update TX timeout, if 'CTX' is connected */
00601             cl_ctx_timeout_update(cl_ctx, net_ops->time());
00602 
00603             /* After update, 'CTX'(s) sorting is a must */
00604             if(A_GROUP_MEMBER(cl_ctx))
00605                 used_ctxs_TO_sort(cl_ctx);
00606         }
00607 
00608         if(rv != len)
00609             /* Partial data was sent */
00610             tx_part_addup(tx_part, rv);
00611         else
00612             tx_part_reset(tx_part);
00613 
00614         goto cl_ctx_send_exit1; /* A Good Send */
00615     }
00616 
00617     do_net_close_tx(cl_ctx, "snd-err");
00618 
00619 cl_ctx_send_exit1:
00620     USR_INFO("C: %s 0x%02x to net %d, %s (%d Bytes) [@ %u]\n\r",
00621              ofs? "PartN" : "FH-B1", B1, cl_ctx->net,
00622              (rv > 0)? "Sent" : "Fail", rv, net_ops->time());
00623 
00624     return rv;
00625 }
00626 
00627 static int32_t cl_ctx_seg1_send(struct client_ctx *cl_ctx, const uint8_t *buf, uint32_t len,
00628                                 bool is_conn_msg, struct mqtt_packet *tx_mqp)
00629 {
00630 
00631     struct tx_part_pkt *tx_part = &CLIENT(cl_ctx)->tx_part;
00632 
00633     /* For CONNECT msg, a client context mustn't be already connected.
00634        For others msgs, a client context must be conected to server */
00635     if(false == (is_conn_msg ^ is_connected(cl_ctx)))
00636         return MQP_ERR_NOTCONN;
00637 
00638     if(TX_PART_IN_USE(tx_part))
00639         return MQP_ERR_BADCALL;
00640 
00641     tx_part_setup(tx_part, buf, len, tx_mqp);
00642 
00643     return cl_ctx_part_send(cl_ctx);
00644 }
00645 
00646 int32_t mqtt_client_send_progress(void *ctx)
00647 {
00648     struct client_ctx *cl_ctx = CL_CTX(ctx);
00649     struct tx_part_pkt *tx_part = NULL;
00650     int32_t rv = MQP_ERR_BADCALL;
00651 
00652     if(NULL == ctx)
00653         return MQP_ERR_FNPARAM;
00654 
00655     tx_part = &CLIENT(cl_ctx)->tx_part;
00656 
00657     if(!TX_PART_IN_USE(tx_part))
00658         return rv;
00659 
00660     rv = cl_ctx_part_send(cl_ctx);
00661     if(rv > 0)
00662         rv = TX_PART_BUF_SZ(tx_part);
00663 
00664     return rv;
00665 }
00666 
00667 static int32_t wr_connect_pl(struct client_ctx *cl_ctx, uint8_t *buf,
00668                              uint32_t fsz, uint8_t *conn_opts)
00669 {
00670 
00671     /* UTF8 usage selection order: Client, W-Topic, W-Msg, User-Name, Pwd */
00672     uint8_t utf8_sel[] = {0x00, WILL_CONFIG_VAL, 0x00,
00673                           USER_NAME_OPVAL, PASS_WORD_OPVAL
00674                          };
00675     struct client_desc *client = CLIENT(cl_ctx);
00676     uint8_t *ref = buf;
00677 
00678     int32_t i = 0;
00679 
00680     for(i = 0; i < 5; i++) {                         /* TBD 5 --> macro */
00681         uint16_t len = 2;
00682         const struct utf8_string *utf8 = client->conn_pl_utf8s[i];
00683         if(NULL == utf8) {
00684             /* UTF8 absent: Client ID (i = 0) and Will MSG (i = 2)
00685                set zero byte length in the CONNECT message */
00686             if(0 != i)
00687                 if(!((2 == i) && (*conn_opts & WILL_CONFIG_VAL)))
00688                     continue;      /* Others, just pass */
00689         } else {
00690             len += utf8->length;
00691         }
00692 
00693         if(fsz < (buf - ref + len)) {         /* TBD end = ref + fsz */
00694             Uart_Write((uint8_t*)"Payload: no space left fail\r\n");
00695             return MQP_ERR_PKT_LEN;   /* Payload: no space left */
00696         }
00697         if(2 == len) {
00698             buf += buf_wr_nbo_2B(buf, 0);  /*  WR 0 byte length */
00699         } else {
00700             buf += mqp_buf_wr_utf8(buf, utf8);
00701         }
00702         *conn_opts |= utf8_sel[i];          /* Enable message flags */
00703     }
00704 
00705     return buf - ref;
00706 }
00707 
00708 
/* Protocol information for the supported versions: a two-byte length,
   the protocol name and, as last byte, the protocol version number.
   The tables are only ever read, hence 'const'. */
static const uint8_t mqtt310[] = {0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', 0x03};
static const uint8_t mqtt311[] = {0x00, 0x04, 'M', 'Q', 'T', 'T', 0x04};
00712 
00713 static inline uint16_t get_connect_vh_len(struct client_ctx *cl_ctx)
00714 {
00715 
00716     return (IS_PROTO_VER31(cl_ctx)? sizeof(mqtt310) : sizeof(mqtt311))
00717            + 3;
00718 }
00719 
00720 static int32_t wr_connect_vh(struct client_ctx *cl_ctx, uint8_t *buf,
00721                              uint16_t ka_secs, uint8_t conn_opts)
00722 {
00723 
00724     uint8_t  *ref = buf;
00725 
00726     if(IS_PROTO_VER31(cl_ctx))
00727         buf += buf_wr_nbytes(buf, mqtt310, sizeof(mqtt310));
00728     else
00729         buf += buf_wr_nbytes(buf, mqtt311, sizeof(mqtt311));
00730 
00731     *buf++ = conn_opts;
00732     buf += buf_wr_nbo_2B(buf, ka_secs);
00733 
00734     return buf - ref;
00735 }
00736 
00737 static int32_t net_connect(struct client_ctx *cl_ctx)
00738 {
00739 
00740     struct client_desc *client = CLIENT(cl_ctx);
00741 
00742     if(NEED_NET_CLOSE(cl_ctx)) {
00743         Uart_Write((uint8_t*)"Return MQP_ERR_NOT_DEF\r\n");
00744         return MQP_ERR_NOT_DEF;
00745     }
00746     if(NULL == net_ops) {
00747         Uart_Write((uint8_t*)"Return MQP_ERR_NET_OPS\r\n");
00748         return MQP_ERR_NET_OPS;
00749     }
00750     cl_ctx->net = net_ops->open(client->nwconn_opts | DEV_NETCONN_OPT_TCP,
00751                                 client->server_addr,
00752                                 client->port_number,
00753                                 &client->nw_security);
00754 
00755     return (-1 == cl_ctx->net)? MQP_ERR_NETWORK : 0;
00756 }
00757 
00758 static
00759 int32_t cl_ctx_conn_state_try_locked(struct client_ctx *cl_ctx, const uint8_t *buf,
00760                                      uint32_t len, uint16_t ka_secs, bool clean_session,
00761                                      struct mqtt_packet *tx_mqp)
00762 {
00763 
00764     int32_t rv = 0;
00765 
00766 //        MUTEX_LOCKIN();
00767     if( xSemaphore != NULL ) {
00768         // See if we can obtain the semaphore.  If the semaphore is not available
00769         // wait 10 ticks to see if it becomes free.
00770         if( xSemaphoreTake( xSemaphore, ( TickType_t ) 100 ) == pdTRUE ) {
00771             // We were able to obtain the semaphore and can now access the
00772             // shared resource.
00773 
00774             rv = net_connect(cl_ctx);
00775             if(rv < 0) {
00776                 Uart_Write((uint8_t*)"net_connect failed\r\n");
00777                 goto cl_ctx_conn_state_try_locked_exit1;
00778             }
00779             /* Ensure LIB is initialized & CTX isn't awaiting CONNACK */
00780             rv = MQP_ERR_BADCALL;
00781             if(false == ((INIT_DONE_STATE != cl_lib_state) || (AWAITS_CONNACK(cl_ctx)))) {
00782                 rv = cl_ctx_seg1_send(cl_ctx, buf, len, true, tx_mqp);
00783             }
00784             if(rv < 0) {
00785                 Uart_Write((uint8_t*)"cl_ctx_seg1_send failed\r\n");
00786                 goto cl_ctx_conn_state_try_locked_exit1; /* Fail */
00787             }
00788             /* Successfully sent CONNECT msg - let's do housekeeping */
00789             cl_ctx->timeout = net_ops->time();/* Fixup: CONN TX Time */
00790             cl_ctx->flags  |= DO_CONNACK_TO_FLAG | CONNACK_AWAIT_FLAG;
00791             cl_ctx->flags  |= clean_session? CLEAN_SESSION_FLAG : 0;
00792 
00793             cl_ctx->ka_secs = ka_secs;
00794 
00795             if(A_GROUP_MEMBER(cl_ctx)) {
00796                 cl_ctx->next = conn_ctxs;
00797                 conn_ctxs    = cl_ctx;
00798 
00799                 /* First entry in 'conn_ctxs': schedule a move to
00800                    'used_conn' (for house-keeping and tracking) */
00801                 if(NULL == cl_ctx->next) {
00802                     rv = loopb_trigger();
00803                 }
00804             }
00805 
00806 
00807             // We have finished accessing the shared resource.  Release the
00808             // semaphore.
00809             //xSemaphoreGive( xSemaphore );
00810         } else {
00811             // We could not obtain the semaphore and can therefore not access
00812             // the shared resource safely.
00813             Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
00814         }
00815     }
00816 
00817 cl_ctx_conn_state_try_locked_exit1:
00818 //        MUTEX_UNLOCK();
00819     xSemaphoreGive(xSemaphore);
00820 
00821     return rv;
00822 }
00823 
00824 static
00825 int32_t connect_msg_send(struct client_ctx *cl_ctx, bool clean_session, uint16_t ka_secs)
00826 {
00827 
00828     struct mqtt_packet *mqp = mqp_client_send_alloc(MQTT_CONNECT);
00829     uint8_t *buf, *ref, conn_opts = clean_session? CLEAN_START_VAL : 0;
00830     int32_t rv = MQP_ERR_PKT_LEN;
00831     uint32_t fsz; /* Free buffer size in PKT */
00832     uint16_t vhl = get_connect_vh_len(cl_ctx);
00833 
00834     if(NULL == mqp) {
00835         Uart_Write((uint8_t*)"MQP_ERR_PKT_AVL\r\n");
00836         return MQP_ERR_PKT_AVL;
00837     }
00838     fsz = MQP_FREEBUF_LEN(mqp);
00839     if(fsz < vhl) {
00840         Uart_Write((uint8_t*)"No space for VAR HDR\r\n");
00841         goto connect_msg_send_exit1;           /* No space for VAR HDR */
00842     }
00843     mqp->vh_len = vhl;               /* Reserve buffer for variable header */
00844     buf = ref = MQP_PAYLOAD_BUF(mqp);/* Get started to incorporate payload */
00845 
00846     rv = wr_connect_pl(cl_ctx, buf, fsz - vhl, &conn_opts);/* Payload data */
00847     if(rv < 0) {
00848         memset(print_buf, 0x00, PRINT_BUF_LEN);
00849         sprintf((char*) print_buf, "Payload WR failed %i\r\n",rv);
00850         Uart_Write((uint8_t *) print_buf);
00851         goto connect_msg_send_exit1;              /* Payload WR failed */
00852     }
00853     buf += rv;
00854     mqp->pl_len = buf - ref;
00855 
00856     wr_connect_vh(cl_ctx, ref - vhl, ka_secs,
00857                   CLIENT(cl_ctx)->will_opts | conn_opts);    /* Var Header */
00858 
00859     mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, MQTT_QOS0, false));/* Fix Header */
00860     ref = MQP_FHEADER_BUF(mqp);
00861 
00862     /* Following routine frees up MQP - whether error or not */
00863     return cl_ctx_conn_state_try_locked(cl_ctx, ref, buf - ref,
00864                                         ka_secs, clean_session,
00865                                         mqp);
00866 connect_msg_send_exit1:
00867 
00868     if(mqp) {
00869         mqp_free_locked(mqp);
00870     }
00871     return rv;
00872 }
00873 
00874 int32_t mqtt_connect_msg_send(void *ctx, bool clean_session, uint16_t ka_secs)
00875 {
00876 
00877     return  ctx?
00878             connect_msg_send(CL_CTX(ctx), clean_session, ka_secs) : -1;
00879 }
00880 
00881 /*
00882    To be used for the following messages: PUBLISH, SUBSCRIBE, UNSUBSCRIBE
00883    Dispatches msg to broker over socket. Frees-up MQP, in case, MSG has QoS0 or
00884    if client-lib allocated MQP encounters an error in dispatch.
00885    Returns, on success, number of bytes transfered, otherwise -1
00886 */
00887 static int32_t _msg_dispatch(struct client_ctx *cl_ctx, struct mqtt_packet *mqp,
00888                              enum mqtt_qos qos, bool retain)
00889 {
00890 
00891     bool not_qos0 = (MQTT_QOS0 != qos)? true : false;
00892     uint16_t msg_id = mqp->msg_id;
00893     int32_t rv = MQP_ERR_NETWORK;
00894 
00895     mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain));
00896 
00897 //        MUTEX_LOCKIN();
00898     if( xSemaphore != NULL ) {
00899         // See if we can obtain the semaphore.  If the semaphore is not available
00900         // wait 10 ticks to see if it becomes free.
00901         if( xSemaphoreTake( xSemaphore, ( TickType_t ) 200 ) == pdTRUE ) {
00902             // We were able to obtain the semaphore and can now access the
00903             // shared resource.
00904 
00905             if(not_qos0) {
00906                 mqp->n_refs++;   /* Need to enlist, do not free-up MQP */
00907             }
00908             /* Tries to free-up MQP either on error or if full pkt is sent */
00909             rv = cl_ctx_seg1_send(cl_ctx, MQP_FHEADER_BUF(mqp),
00910                                   MQP_CONTENT_LEN(mqp),  false,
00911                                   mqp);
00912 
00913             /* At this point, error or not, QoS0 MQP would have been freed */
00914 
00915             if((rv <= 0) && not_qos0) {
00916                 mqp_free(mqp); /* Err: Explicitly free-up non QoS0 MQP */
00917                 Uart_Write((uint8_t*)"cl_ctx_seg1_send failed\r\n");
00918                 goto _msg_dispatch_exit1;
00919             }
00920 
00921             rv = msg_id;   /* Make progress for a good send to the server  */
00922 
00923             if(not_qos0) {  /* Enlist non QOS0 MQP to await ACK from server */
00924                 mqp_ack_wlist_append(&CLIENT(cl_ctx)->qos_ack1_wl, mqp);
00925             }
00926             // We have finished accessing the shared resource.  Release the
00927             // semaphore.
00928             //xSemaphoreGive( xSemaphore );
00929         } else {
00930             // We could not obtain the semaphore and can therefore not access
00931             // the shared resource safely.
00932             Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
00933         }
00934     }
00935 
00936 _msg_dispatch_exit1:
00937 
00938 //        MUTEX_UNLOCK();
00939     xSemaphoreGive(xSemaphore);
00940 
00941     return rv;
00942 }
00943 
00944 static
00945 int32_t msg_dispatch_no_free(struct client_ctx *cl_ctx, struct mqtt_packet *mqp,
00946                              enum mqtt_qos qos, bool retain)
00947 {
00948     if((NULL == mqp) || (NULL == cl_ctx))
00949         return MQP_ERR_FNPARAM;
00950 
00951     mqp->n_refs++; /* Ensures caller that MQP is not freed-up */
00952 
00953     return _msg_dispatch(cl_ctx, mqp, qos, retain);
00954 }
00955 
00956 int32_t mqtt_client_pub_msg_send(void *ctx, const struct utf8_string *topic,
00957                                  const uint8_t *data_buf, uint32_t data_len,
00958                                  enum mqtt_qos qos, bool retain)
00959 {
00960 
00961     struct mqtt_packet *mqp = NULL;
00962 
00963     if((NULL == ctx)   ||
00964             (NULL == topic) ||
00965             ((data_len > 0) && (NULL == data_buf))) {
00966         Uart_Write((uint8_t*)"MQP_ERR_FNPARAM\n\r");
00967         return MQP_ERR_FNPARAM;
00968     }
00969     if(false == is_valid_utf8_string(topic)) {
00970         Uart_Write((uint8_t*)"MQP_ERR_CONTENT\n\r");
00971         return MQP_ERR_CONTENT;
00972     }
00973     mqp = mqp_client_send_alloc(MQTT_PUBLISH);
00974     if(NULL == mqp) {
00975         Uart_Write((uint8_t*)"MQP_ERR_PKT_AVL\n\r");
00976         return MQP_ERR_PKT_AVL;
00977     }
00978     if((0 > mqp_pub_append_topic(mqp, topic, qos? assign_new_msg_id(): 0)) ||
00979             (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) {
00980         Uart_Write((uint8_t*)"len_err\n\r");
00981         return len_err_free_mqp(mqp);
00982     }
00983 
00984     return _msg_dispatch(CL_CTX(ctx), mqp, qos, retain);
00985 }
00986 
00987 int32_t mqtt_client_pub_dispatch(void *ctx, struct mqtt_packet *mqp,
00988                                  enum mqtt_qos qos, bool retain)
00989 {
00990     return msg_dispatch_no_free(CL_CTX(ctx), mqp, qos, retain);
00991 }
00992 
00993 static int32_t tail_incorp_msg_id(struct mqtt_packet *mqp)
00994 {
00995     uint8_t *buf = MQP_FHEADER_BUF(mqp) + mqp->vh_len;
00996 
00997     if(0 == mqp->msg_id) {
00998         mqp->msg_id  = assign_new_msg_id();
00999         buf += buf_wr_nbo_2B(buf, mqp->msg_id);
01000         mqp->vh_len += 2;
01001 
01002         return 2;
01003     }
01004 
01005     return 0;
01006 }
01007 
01008 static int32_t buf_utf8_wr_try(uint8_t *buf, uint32_t fsz, const struct utf8_string *topic,
01009                                uint8_t qid)
01010 {
01011     uint8_t *ref = buf;
01012 
01013     if(fsz < (topic->length + 2 + (QFL_VALUE == qid)? 0 : 1))
01014         return MQP_ERR_PKT_LEN; /* No buf */
01015 
01016     if(false == is_valid_utf8_string(topic))
01017         return MQP_ERR_CONTENT;/* Invalid */
01018 
01019     buf += mqp_buf_wr_utf8(buf, topic);
01020     if(QFL_VALUE != qid)
01021         *buf++ = qid;
01022 
01023     return buf - ref;
01024 }
01025 
01026 static int32_t utf8_array_send(struct client_ctx           *cl_ctx,
01027                                const struct utf8_strqos *subsc_vec,
01028                                const struct utf8_string *unsub_vec,
01029                                uint32_t n_elem)
01030 {
01031     struct mqtt_packet *mqp;
01032     uint8_t *ref, *buf, *end;
01033     uint32_t i;
01034 
01035     if((NULL == cl_ctx) || !((!!subsc_vec) ^ (!!unsub_vec)) || (0 == n_elem))
01036         return MQP_ERR_FNPARAM;
01037 
01038     mqp = mqp_client_send_alloc(subsc_vec?
01039                                 MQTT_SUBSCRIBE : MQTT_UNSUBSCRIBE);
01040     if(NULL == mqp)
01041         return MQP_ERR_PKT_AVL;
01042 
01043     buf = MQP_VHEADER_BUF(mqp);
01044     end = MQP_FREEBUF_LEN(mqp) + buf;  /* End of free buffer */
01045     if((end - buf) < 2)
01046         return len_err_free_mqp(mqp);/* MSG-ID: no space */
01047 
01048     buf += tail_incorp_msg_id(mqp);
01049     ref  = buf;
01050 
01051     for(i = 0; i < n_elem; i++) {
01052         const struct utf8_string *topic;
01053         struct utf8_string topreq;
01054         int32_t rv;
01055 
01056         if(subsc_vec) {
01057             topreq.length = subsc_vec[i].length;
01058             topreq.buffer = subsc_vec[i].buffer;
01059             topic = &topreq;
01060         } else
01061             topic = unsub_vec + i;
01062 
01063         rv = buf_utf8_wr_try(buf, end - buf, topic, subsc_vec?
01064                              (uint8_t)subsc_vec[i].qosreq : QFL_VALUE);
01065         if(rv < 0) {
01066             mqp_free(mqp);
01067             return rv;
01068         }
01069 
01070         buf += rv;
01071     }
01072 
01073     mqp->pl_len = buf - ref; /* Total length of topics data */
01074 
01075     return _msg_dispatch(cl_ctx, mqp, MQTT_QOS1, false);
01076 }
01077 
01078 int32_t mqtt_sub_msg_send(void *ctx, const struct utf8_strqos *qos_topics, uint32_t count)
01079 {
01080     return utf8_array_send(CL_CTX(ctx), qos_topics, NULL, count);
01081 }
01082 
01083 int32_t mqtt_sub_dispatch(void *ctx, struct mqtt_packet *mqp)
01084 {
01085     return msg_dispatch_no_free(CL_CTX(ctx), mqp, MQTT_QOS1, false);
01086 }
01087 
01088 int32_t mqtt_unsub_msg_send(void *ctx, const struct utf8_string *topics, uint32_t count)
01089 {
01090     return utf8_array_send(CL_CTX(ctx), NULL, topics, count);
01091 }
01092 
01093 int32_t mqtt_unsub_dispatch(void *ctx, struct mqtt_packet *mqp)
01094 {
01095     return msg_dispatch_no_free(CL_CTX(ctx), mqp, MQTT_QOS1, false);
01096 }
01097 
01098 /* Note: in this revision of implementation, vh_msg_send() is being invoked
01099    from a locked RX context. Should this situation change, so should the
01100    'locking' considerations in the routine. */
01101 static int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type,
01102                            enum mqtt_qos qos, bool has_vh,
01103                            uint16_t vh_data)
01104 {
01105     uint8_t  buf[4];
01106     uint32_t len = 2;
01107 
01108     buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false));
01109     buf[1] = has_vh ? 2 : 0;
01110 
01111     if(has_vh)
01112         len += buf_wr_nbo_2B(buf + 2, vh_data);
01113 
01114     return cl_ctx_seg1_send(cl_ctx, buf, len, false, NULL);
01115 }
01116 
01117 static int32_t pingreq_send(struct client_ctx *cl_ctx, uint32_t rsp_flag)
01118 {
01119 
01120     int32_t rv = 0;
01121     uint8_t buf[2];
01122 
01123     buf[0] = MAKE_FH_BYTE1(MQTT_PINGREQ,
01124                            MAKE_FH_FLAGS(false, MQTT_QOS0, false));
01125     buf[1] = 0;
01126 
01127     /* Note: in case of error in network send, cl_ctx_send() may
01128        try to terminate connection with server. */
01129     rv = cl_ctx_seg1_send(cl_ctx, buf, 2, false, NULL);
01130     if(rv > 0)
01131         cl_ctx->flags |= rsp_flag;
01132 
01133     return rv;
01134 }
01135 
01136 int32_t mqtt_pingreq_send(void *ctx)
01137 {
01138     Uart_Write((uint8_t*)"mqtt_pingreq_send\r\n");
01139     int32_t rv = 0;
01140 
01141 //        MUTEX_LOCKIN();
01142     if( xSemaphore != NULL ) {
01143         // See if we can obtain the semaphore.  If the semaphore is not available
01144         // wait 10 ticks to see if it becomes free.
01145         if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
01146             // We were able to obtain the semaphore and can now access the
01147             // shared resource.
01148             rv = pingreq_send(CL_CTX(ctx), USER_PING_RSP_FLAG);
01149 
01150             // We have finished accessing the shared resource.  Release the
01151             // semaphore.
01152             xSemaphoreGive( xSemaphore );
01153 
01154         } else {
01155             // We could not obtain the semaphore and can therefore not access
01156             // the shared resource safely.
01157             Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
01158         }
01159     }
01160 
01161 //        MUTEX_UNLOCK();
01162     return rv;
01163 }
01164 
01165 int32_t mqtt_disconn_send(void *ctx)
01166 {
01167     Uart_Write((uint8_t*)"mqtt_disconn_send\r\n");
01168     uint8_t buf[2];
01169 
01170     buf[0] = MAKE_FH_BYTE1(MQTT_DISCONNECT,
01171                            MAKE_FH_FLAGS(false, MQTT_QOS0, false));
01172     buf[1] = 0;
01173 
01174 //        MUTEX_LOCKIN();
01175     if( xSemaphore != NULL ) {
01176         // See if we can obtain the semaphore.  If the semaphore is not available
01177         // wait 10 ticks to see if it becomes free.
01178         if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
01179             // We were able to obtain the semaphore and can now access the
01180             // shared resource.
01181             /* Note: in case of error in network send, cl_ctx_send() may
01182             try to terminate connection with server. */
01183             if(cl_ctx_seg1_send(CL_CTX(ctx), buf, 2, false, NULL) > 0) {
01184                 /* Terminate connection on application's request */
01185                 do_net_close_tx(CL_CTX(ctx), "DISCONN");
01186             }
01187 
01188             // We have finished accessing the shared resource.  Release the
01189             // semaphore.
01190             xSemaphoreGive( xSemaphore );
01191 
01192         } else {
01193             // We could not obtain the semaphore and can therefore not access
01194             // the shared resource safely.
01195             Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
01196         }
01197     }
01198 
01199 //        MUTEX_UNLOCK();
01200     return 0;
01201 }
01202 
01203 /*------------------------------------------------------------------------------
01204  *  MQTT RX Routines
01205  *------------------------------------------------------------------------------
01206  */
01207 static bool ack1_wl_mqp_dispatch(struct client_ctx *cl_ctx)
01208 {
01209     struct mqtt_ack_wlist *wlist = &CLIENT(cl_ctx)->qos_ack1_wl;
01210     struct mqtt_packet *mqp = NULL;
01211     bool rv = true;
01212 
01213     for(mqp = wlist->head; mqp && (true == rv); mqp = mqp->next) {
01214         uint8_t *buf = MQP_FHEADER_BUF(mqp);
01215         mqp->fh_byte1 = *buf |= DUP_FLAG_VAL(true);
01216 
01217         mqp->n_refs++; /* Ensures MQP is not freed by following */
01218 
01219         /* Error or not, following routine tries to free up MQP */
01220         if(cl_ctx_seg1_send(cl_ctx, buf, MQP_CONTENT_LEN(mqp),
01221                             false, mqp) <= 0)
01222             rv = false;
01223     }
01224 
01225     return rv;
01226 }
01227 
01228 /* TBD candidate for common */
01229 static bool ack2_msg_id_dispatch(struct client_ctx *cl_ctx)
01230 {
01231     struct pub_qos2_cq *tx_cq = &CLIENT(cl_ctx)->qos2_tx_cq;
01232     uint8_t rd_idx = tx_cq->rd_idx;
01233     uint8_t n_free = tx_cq->n_free;
01234     bool rv = true;
01235     uint8_t i = 0;
01236 
01237     for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free) && (true == rv); i++) {
01238         if(vh_msg_send(cl_ctx, MQTT_PUBREL, MQTT_QOS1,
01239                        true, tx_cq->id_vec[i]) <= 0)
01240             rv = false;
01241     }
01242 
01243     return rv;
01244 }
01245 
01246 static void session_resume(struct client_ctx *cl_ctx)
01247 {
01248     DBG_INFO("C: Re-send ACK awaited QoS1/2 msgs to net %d\n\r",
01249              cl_ctx->net);
01250 
01251     if(ack1_wl_mqp_dispatch(cl_ctx))
01252         ack2_msg_id_dispatch(cl_ctx);
01253 
01254     return;
01255 }
01256 
01257 static bool ack1_wl_rmfree(struct mqtt_ack_wlist *wl, uint16_t msg_id)
01258 {
01259     struct mqtt_packet *mqp = mqp_ack_wlist_remove(wl, msg_id);
01260     if(NULL != mqp) {
01261         mqp_free(mqp);
01262         return true;
01263     }
01264 
01265     USR_INFO("Err: Unexpected ACK w/ ID 0x%04x\n\r", msg_id);
01266 
01267     return false;
01268 }
01269 
01270 static bool _proc_pub_rec_rx(struct client_ctx *cl_ctx, uint16_t msg_id)
01271 {
01272     /* Follow-up messages for QOS2 PUB must be transacted in the
01273        same order as the initial sequence of QOS2 PUB dispatches.
01274        Therefore, checking the first entry should be OK
01275     */
01276     struct mqtt_packet *mqp = CLIENT(cl_ctx)->qos_ack1_wl.head;
01277 
01278     if((msg_id == mqp->msg_id) && ack2_msg_id_logup(cl_ctx, msg_id)) {
01279 
01280         ack1_wl_rmfree(&CLIENT(cl_ctx)->qos_ack1_wl, msg_id);
01281 
01282         vh_msg_send(cl_ctx, MQTT_PUBREL, MQTT_QOS1,
01283                     true, msg_id);
01284 
01285         return true;
01286     }
01287 
01288     return false; /* Unexpected PUBREC or QOS2 store exceeded */
01289 }
01290 
01291 static bool _proc_pub_rel_rx(struct client_ctx *cl_ctx, uint16_t msg_id)
01292 {
01293     /* For a PUB-REL RX, send PUBCOMP to let server make progress */
01294     vh_msg_send(cl_ctx, MQTT_PUBCOMP, MQTT_QOS0, true, msg_id);
01295 
01296     if(qos2_pub_rx_is_done(cl_ctx, msg_id))
01297         qos2_pub_rx_unlog(cl_ctx, msg_id);  /* Expunge record */
01298 
01299     return true;
01300 }
01301 
01302 /*
01303    Process ACK Message from Broker.
01304    Returns true on success, otherwise false.
01305    Used for: PUBACK, SUBACK and UNSUBACK
01306 */
01307 static
01308 bool _proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
01309 {
01310     struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
01311     struct client_desc *client = CLIENT(cl_ctx);
01312     uint16_t msg_id = mqp_raw->msg_id;
01313     uint32_t len = mqp_raw->pl_len;
01314 
01315     /* Caters to SUB-ACK, UNSUB-ACK and PUB-ACK Messages */
01316     if(false == ack1_wl_rmfree(&client->qos_ack1_wl, msg_id))
01317         return false; /* Err: MSG_ID was not awaited */
01318 
01319     if(ctx_cbs->ack_notify)
01320         ctx_cbs->ack_notify(client->app, mqp_raw->msg_type, msg_id,
01321                             len? MQP_PAYLOAD_BUF(mqp_raw): NULL,
01322                             len);
01323     return true;
01324 }
01325 
01326 static
01327 bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
01328 {
01329     uint8_t  msg_type = mqp_raw->msg_type;
01330     bool rv = false;
01331     uint16_t msg_id = 0;
01332 
01333     if(false == mqp_proc_msg_id_ack_rx(mqp_raw, MQTT_SUBACK == msg_type))
01334         return rv; /* Problem in contents received from server */
01335 
01336     msg_id = mqp_raw->msg_id;
01337 
01338     if(MQTT_PUBREC == msg_type) {
01339         rv =  _proc_pub_rec_rx(cl_ctx, msg_id);
01340         if(rv)
01341             MQP_RX_DO_NOT_RPT_SET(mqp_raw); /* Don't report to App */
01342 
01343     } else if(MQTT_PUBREL == msg_type) {
01344         rv =  _proc_pub_rel_rx(cl_ctx, msg_id);
01345         if(rv)
01346             MQP_RX_DO_NOT_RPT_SET(mqp_raw); /* Don't report to App */
01347     } else if(MQTT_PUBCOMP == msg_type) {
01348         rv = ack2_msg_id_unlog(cl_ctx, msg_id);
01349     } else {
01350         rv = _proc_ack_msg_rx(cl_ctx, mqp_raw);
01351     }
01352 
01353     return rv;
01354 }
01355 
01356 static
01357 bool proc_pub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
01358 {
01359     struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
01360     bool good_pub = mqp_proc_pub_rx(mqp_raw);
01361     uint8_t B = mqp_raw->fh_byte1;
01362     enum mqtt_qos qos = ENUM_QOS(B);
01363     uint16_t msg_id = 0;
01364 
01365     if(false == good_pub)
01366         return false; /* Didn't get nicely composed PUB Packet */
01367 
01368     msg_id = mqp_raw->msg_id;
01369 
01370     /* Irrespective of the result of the ACK through vh_msg_send(),
01371        the implementation has chosen to process the good PUB packet.
01372        Any error will be handled in next iteration of rx processing.
01373     */
01374     if(MQTT_QOS1 == qos)
01375         vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id);
01376 
01377     if(MQTT_QOS2 == qos) {
01378         /* Ensuring "only once" philosophy for MQTT QoS2 PUBs */
01379         if(qos2_pub_rx_is_done(cl_ctx, msg_id)) {
01380             /* Already delivered. Drop it & do not report */
01381             MQP_RX_DO_NOT_RPT_SET(mqp_raw);
01382             return true; /* No more follow-up; all's good */
01383         }
01384 
01385         if(false == qos2_pub_rx_logup(cl_ctx, msg_id))
01386             return false;  /* Failed to record New RX PUB */
01387 
01388         vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id);
01389     }
01390 
01391     /* QoS obliations completed, present PUBLISH RX packet to app */
01392     if(ctx_cbs->publish_rx) {
01393         /* App has chosen the callback method to  receive PKT */
01394         mqp_raw->n_refs++;   /* Make app owner of this packet */
01395         if(ctx_cbs->publish_rx(CLIENT(cl_ctx)->app, BOOL_DUP(B),
01396                                qos, BOOL_RETAIN(B), mqp_raw)) {
01397             /* App has no use of PKT any more, so free it */
01398             mqp_raw->n_refs--;     /* Take back ownership */
01399         }
01400     }
01401 
01402     return true;
01403 }
01404 
01405 static
01406 bool proc_connack_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
01407 {
01408     struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
01409     uint8_t *buf = MQP_VHEADER_BUF(mqp_raw);
01410 
01411     mqp_raw->vh_len += 2;
01412     mqp_raw->pl_len -= 2;
01413 
01414     if(0 != mqp_raw->pl_len)
01415         return false;      /* There is no payload in message */
01416 
01417     cl_ctx->flags &= ~(DO_CONNACK_TO_FLAG | CONNACK_AWAIT_FLAG);
01418 
01419     if(VHB_CONNACK_RC(buf))
01420         /* Server has refused the connection, inform the app */
01421         goto proc_connack_rx_exit1;
01422 
01423     cl_ctx->flags |=  NOW_CONNECTED_FLAG;
01424     cl_ctx_timeout_update(cl_ctx, net_ops->time());  /* start KA */
01425 
01426     if(IS_CLN_SESSION(cl_ctx))
01427         session_delete(cl_ctx);
01428     else
01429         session_resume(cl_ctx);
01430 
01431 proc_connack_rx_exit1:
01432     if(ctx_cbs->ack_notify)
01433         ctx_cbs->ack_notify(CLIENT(cl_ctx)->app, mqp_raw->msg_type,
01434                             0, buf, 2);
01435 
01436     return true;
01437 }
01438 
01439 static
01440 bool proc_pingrsp_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
01441 {
01442     struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
01443 
01444     if(0 != mqp_raw->pl_len)
01445         return false;
01446 
01447     if(AWAITS_KA_PING(cl_ctx)) {
01448         cl_ctx->flags &= ~KA_PINGER_RSP_FLAG;
01449         return true;
01450     }
01451 
01452     if(AWAITS_PINGRSP(cl_ctx)) {
01453         cl_ctx->flags &= ~USER_PING_RSP_FLAG;
01454         if(ctx_cbs->ack_notify)
01455             ctx_cbs->ack_notify(CLIENT(cl_ctx)->app,
01456                                 mqp_raw->msg_type,
01457                                 0, NULL, 0);
01458         return true;
01459     }
01460 
01461     return false;
01462 }
01463 
01464 static
01465 bool conn_sent_state_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
01466 {
01467     bool rv = false;
01468 
01469     switch(mqp_raw->msg_type) {
01470 
01471         case MQTT_CONNACK:
01472             /* Changes client_ctx->flags to CONNECTED */
01473             rv = proc_connack_rx(cl_ctx, mqp_raw);
01474             break;
01475 
01476         default:
01477             break;
01478     }
01479 
01480     return rv;
01481 }
01482 
01483 static
01484 bool connected_state_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
01485 {
01486     bool rv = false;
01487 
01488     switch(mqp_raw->msg_type) {
01489 
01490         case MQTT_SUBACK:
01491         case MQTT_PUBACK:
01492         case MQTT_PUBREC:
01493         case MQTT_PUBREL:
01494         case MQTT_PUBCOMP:
01495         case MQTT_UNSUBACK:
01496             rv = proc_ack_msg_rx(cl_ctx, mqp_raw);
01497             break;
01498 
01499         case MQTT_PINGRSP:
01500             rv = proc_pingrsp_rx(cl_ctx, mqp_raw);
01501             break;
01502 
01503         case MQTT_PUBLISH:
01504             rv = proc_pub_msg_rx(cl_ctx, mqp_raw);
01505             break;
01506 
01507         case MQTT_CONNACK: /* not expected */
01508         default:
01509             break;
01510     }
01511 
01512     return rv;
01513 }
01514 
01515 static bool process_recv(struct client_ctx   *cl_ctx,
01516                          struct mqtt_packet *mqp_raw)
01517 {
01518     bool rv;
01519 
01520     USR_INFO("C: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d [@ %u]\n\r",
01521              mqp_raw->fh_byte1, cl_ctx->net, net_ops->time());
01522 
01523     /* Working Principle: Only RX processing errors should be
01524        reported as 'false'. Status of TX as a follow-up to RX
01525        messages need not be reported by the xyz_rx() routines.
01526        Error observed in TX is either dealt in next iteration
01527        of RX loop (in case, there is a dedicated RX task for
01528        the CTX) or in TX routine itself (in case, there is no
01529        dedicated RX task for the CTX).
01530     */
01531     rv =    AWAITS_CONNACK(cl_ctx)?
01532             conn_sent_state_rx(cl_ctx, mqp_raw) :
01533             connected_state_rx(cl_ctx, mqp_raw);
01534 
01535     DBG_INFO("C: Msg w/ ID 0x%04x, processing status: %s\n\r",
01536              mqp_raw->msg_id, rv? "Good" : "Fail");
01537 
01538     return rv;
01539 }
01540 
01541 static int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, void *ctx)
01542 {
01543     bool timed_out = false;
01544     int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, &timed_out, ctx);
01545     if(rv <= 0) {
01546         USR_INFO("C: Net %d, Raw Error %d, Time Out: %c\n\r",
01547                  net, rv, timed_out? 'Y' : 'N');
01548 
01549         if(timed_out)
01550             rv = MQP_ERR_TIMEOUT;
01551     }
01552 
01553     return rv;
01554 }
01555 
/*
   MQTT 3.1.1 implementation
   -------------------------

   Keep Alive Time is the maximum interval within which a client should send
   a packet to broker. If there are either no packets to be sent to broker or
   no retries left, then client is expected to send a PINGREQ within Keep
   Alive Time. Broker should respond by sending PINGRSP within a reasonable
   time of 'wait_secs'. If Keep Alive Time is set as 0, then client is not
   expected to be disconnected due to inactivity of MQTT messages. Value
   of 'wait_secs' is assumed to be much smaller than (non-zero) 'ka_secs'.
*/
01568 static void conn2used_ctxs(uint32_t wait_secs)
01569 {
01570 
01571     while(conn_ctxs) {
01572         struct client_ctx *cl_ctx = conn_ctxs;
01573         conn_ctxs = conn_ctxs->next;
01574 
01575         cl_ctx_timeout_insert(&used_ctxs, cl_ctx);
01576     }
01577 }
01578 
01579 static int32_t single_ctx_ka_sequence(struct client_ctx *cl_ctx, uint32_t wait_secs)
01580 {
01581 
01582     uint32_t now_secs = net_ops->time();
01583 
01584     if(AWAITS_CONNACK(cl_ctx) && CFG_CONNACK_TO(cl_ctx)) {
01585         cl_ctx->timeout += wait_secs; /* Set CONNACK timeout value */
01586         cl_ctx->flags   &= ~DO_CONNACK_TO_FLAG;
01587     }
01588 
01589     if(cl_ctx->timeout > now_secs) {
01590         return 1;  /* Still have time for next message transaction */
01591     }
01592     if(is_connected(cl_ctx)) {
01593         /* Timeout has happened. Check for PINGRESP if PINGREQ done.
01594            Otherwise, send PINGREQ (Are You there?) to the server. */
01595         if(AWAITS_KA_PING(cl_ctx)) {
01596             goto single_ctx_ka_sequence_exit1; /* No PINGRESP  */
01597         }
01598         return pingreq_send(cl_ctx, KA_PINGER_RSP_FLAG); /* Hello! */
01599     }
01600 
01601 single_ctx_ka_sequence_exit1:
01602 
01603     USR_INFO("C: Net %d, no RX MSG in reasonable time\n\r", cl_ctx->net);
01604     return -1;
01605 }
01606 
01607 static uint32_t single_ctx_adj_wait_secs_get(struct client_ctx *cl_ctx, uint32_t wait_secs)
01608 {
01609 
01610     return (KA_TIMEOUT_NONE != cl_ctx->timeout)?
01611            MIN(cl_ctx->timeout - net_ops->time(), wait_secs) : wait_secs;
01612 }
01613 
01614 static int32_t single_ctx_rx_prep(struct client_ctx *cl_ctx, uint32_t *secs2wait)
01615 {
01616 
01617     int32_t rv;
01618 
01619     if(-1 == cl_ctx->net)
01620         return MQP_ERR_NOTCONN; /* Likely for a ctx w/o a receive task */
01621 
01622     if(NEED_NET_CLOSE(cl_ctx))
01623         rv = MQP_ERR_NOTCONN;
01624     else if(0 > single_ctx_ka_sequence(cl_ctx, *secs2wait))
01625         rv = MQP_ERR_NETWORK;
01626     else {
01627         *secs2wait = single_ctx_adj_wait_secs_get(cl_ctx, *secs2wait);
01628         return 1;
01629     }
01630 
01631     do_net_close_rx(cl_ctx, rv);
01632     return rv;
01633 }
01634 
01635 static
01636 int32_t proc_ctx_data_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp,
01637                            uint32_t wait_secs, void **app)
01638 {
01639 
01640     int32_t rv  = MQP_ERR_NOTCONN;
01641     int32_t net = cl_ctx->net;
01642 
01643     *app = cl_ctx->usr;
01644 
01645     rv = net_recv(net, mqp, wait_secs, (void*)cl_ctx);
01646 
01647 //        MUTEX_LOCKIN();
01648     if( xSemaphore != NULL ) {
01649         // See if we can obtain the semaphore.  If the semaphore is not available
01650         // wait 10 ticks to see if it becomes free.
01651         if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
01652             // We were able to obtain the semaphore and can now access the
01653             // shared resource.
01654             if(rv > 0) {
01655                 if(false == process_recv(cl_ctx, mqp)) {
01656                     Uart_Write((uint8_t*)"MQP_ERR_CONTENT\r\n");
01657                     rv = MQP_ERR_CONTENT;
01658                 }
01659             }
01660             /* RX: close the network connection to the server for this context, if
01661                (a) there is a processing / protocol error other than time-out
01662                (b) A good MQTT CONNACK has a return code - connection refused
01663             */
01664             if(((rv < 0) && (rv != MQP_ERR_TIMEOUT)) ||
01665                     ((MQTT_CONNACK == mqp->msg_type) &&
01666                      MQP_CONNACK_RC(mqp))) {
01667                 do_net_close_rx(cl_ctx, rv);
01668             }
01669 
01670             // We have finished accessing the shared resource.  Release the
01671             // semaphore.
01672             xSemaphoreGive( xSemaphore );
01673 
01674         } else {
01675             // We could not obtain the semaphore and can therefore not access
01676             // the shared resource safely.
01677             Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
01678         }
01679     }
01680 
01681 //        MUTEX_UNLOCK();
01682     return rv;
01683 }
01684 
01685 static int32_t mqp_setup_proc_ctx_data_recv(struct client_ctx *cl_ctx,
01686         struct mqtt_packet *mqp,
01687         uint32_t wait_secs, void **app)
01688 {
01689     struct mqtt_packet *rx_mqp = CLIENT(cl_ctx)->rx_mqp;
01690     int32_t rv;
01691 
01692     if(NULL != mqp) {
01693         /* Input MQP must be same as MQP for partial RX, if any */
01694         if(rx_mqp) {
01695             if(mqp != rx_mqp)
01696                 return MQP_ERR_FNPARAM;
01697         } else
01698             mqp_reset(mqp);
01699     }
01700 
01701     if(NULL == mqp) {
01702         mqp = rx_mqp? rx_mqp : mqp_client_recv_alloc(0);
01703         if(NULL == mqp)
01704             return MQP_ERR_PKT_AVL;
01705     }
01706 
01707     rv = proc_ctx_data_recv(cl_ctx, mqp, wait_secs, app);
01708     if(rv == MQP_ERR_TIMEOUT) {
01709         CLIENT(cl_ctx)->rx_mqp = mqp;  /* Save partial RX MQP */
01710     } else {
01711         /* Control reaches here due to either an error in RX or the
01712            completion of RX. In both the cases, the MQP needs to be
01713            detached and processed. For completion of RX:
01714            callback mode: Application has used up MQP data; free it
01715            Non-callback mode: Application will now use complete MQP
01716         */
01717         CLIENT(cl_ctx)->rx_mqp = NULL;
01718         if(mqp->free)
01719             mqp_free_locked(mqp);  /* For only callback mode */
01720     }
01721 
01722     return rv;
01723 }
01724 
01725 static int32_t cl_ctx_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp,
01726                            uint32_t wait_secs)
01727 {
01728 
01729     void *app = NULL;
01730     int32_t rv = 0;
01731 
01732     do {
01733         if(mqp && (NULL == CLIENT(cl_ctx)->rx_mqp))
01734             mqp_reset(mqp);
01735 
01736         rv = single_ctx_rx_prep(cl_ctx, &wait_secs);
01737         if(rv > 0)
01738             rv = mqp_setup_proc_ctx_data_recv(cl_ctx, mqp,
01739                                               wait_secs,
01740                                               &app);
01741 
01742         /* 'mqp' must be valid, if rv > 0 but including further
01743            & additional check for sake of static cod eanalysis.*/
01744     } while((rv > 0) && mqp && MQP_RX_DO_NOT_RPT_COR(mqp));
01745 
01746     return rv;
01747 }
01748 
01749 int32_t mqtt_client_ctx_await_msg(void *ctx, uint8_t msg_type, struct mqtt_packet *mqp,
01750                                   uint32_t wait_secs)
01751 {
01752     struct client_ctx *cl_ctx = CL_CTX(ctx);
01753     int32_t rv = -1;
01754 
01755     if((NULL == cl_ctx) || (NULL == mqp))
01756         return MQP_ERR_FNPARAM;
01757 
01758     do {
01759         rv = cl_ctx_recv(cl_ctx, mqp, wait_secs);
01760 
01761     } while((rv > 0) &&
01762             (0 != msg_type) && (msg_type != mqp->msg_type));
01763 
01764     return rv;
01765 }
01766 
01767 int32_t mqtt_client_ctx_run(void *ctx, uint32_t wait_secs)
01768 {
01769 
01770     int32_t rv;
01771 
01772     if(NULL == ctx)
01773         return MQP_ERR_FNPARAM;
01774 
01775     do {
01776         rv = cl_ctx_recv(CL_CTX(ctx), NULL, wait_secs);
01777 
01778     } while(rv > 0);
01779 
01780     return rv;
01781 }
01782 
01783 static struct client_ctx *group_ctxs_ka_sequence(uint32_t wait_secs) {
01784 
01785     struct client_ctx *cl_ctx = used_ctxs;
01786 
01787     while(cl_ctx) {
01788         struct client_ctx *next = cl_ctx->next;
01789         if(single_ctx_rx_prep(cl_ctx, &wait_secs) < 0) {
01790             /* 'CTX' no more eligible for operation
01791                and has been removed from used_list */
01792             if(false == grp_has_cbfn)
01793                 return cl_ctx;
01794         }
01795 
01796         cl_ctx = next;
01797     }
01798 
01799     return NULL;
01800 }
01801 
01802 #define IO_MON_NO_TIMEOUT (0xFFFFFFFF)
01803 
01804 static uint32_t group_ctxs_adj_wait_secs_get(uint32_t wait_secs)
01805 {
01806 
01807     return  used_ctxs?
01808             single_ctx_adj_wait_secs_get(used_ctxs, wait_secs) : wait_secs;
01809 }
01810 
01811 static int32_t recv_hvec[MAX_NWCONN + 1 + 1]; /* GROUP LISTEN PORT + VEC END */
01812 static int32_t send_hvec  = -1;
01813 static int32_t rsvd_hvec  = -1;
01814 
01815 /* Caller must ensure atomic enviroment for execution of this routine */
01816 static void recv_hvec_load(int32_t *hvec_recv, uint32_t size, struct client_ctx *list)
01817 {
01818 
01819     int32_t i = 0;
01820 
01821     for(i = 0; (i < size) && (NULL != list); i++, list = list->next)
01822         hvec_recv[i] = list->net;
01823 
01824     hvec_recv[i] = -1;
01825 
01826     return;
01827 }
01828 
01829 static int32_t group_ctxs_rx_prep(uint32_t wait_secs, void **app)
01830 {
01831 
01832     /* CHK 'used ctx'(s) have live connection w/ server. If not, drop it */
01833     struct client_ctx *ctx_kaTO = group_ctxs_ka_sequence(wait_secs);
01834     int32_t n_hnds;
01835 
01836     if(ctx_kaTO) {
01837         *app = CLIENT(ctx_kaTO)->app;
01838         return MQP_ERR_NETWORK;
01839     }
01840 
01841     conn2used_ctxs(wait_secs);   /* Now, add new 'ctx'(s) to 'used ctxs' */
01842 
01843     recv_hvec[0] = loopb_net;
01844     recv_hvec_load(&recv_hvec[1], MAX_NWCONN + 1, used_ctxs);
01845 
01846     wait_secs = group_ctxs_adj_wait_secs_get(wait_secs);
01847 
01848     n_hnds = net_ops->io_mon(recv_hvec, &send_hvec,
01849                              &rsvd_hvec, wait_secs);
01850     if(0 == n_hnds)
01851         n_hnds = MQP_ERR_TIMEOUT;
01852     else if(n_hnds < 0)
01853         n_hnds = MQP_ERR_LIBQUIT;
01854 
01855     return n_hnds;
01856 }
01857 
01858 static int32_t proc_loopback_recv(int32_t net)
01859 {
01860 
01861     int32_t rv = 0;
01862     uint8_t buf[LOOP_DLEN];
01863 
01864     /* Thanks for waking-up thread, but ain't got much to do now */
01865     rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0);
01866     if(rv <= 0) {
01867         memset(print_buf, 0x00, PRINT_BUF_LEN);
01868         sprintf((char*) print_buf, "MQP_ERR_LIBQUIT %i\r\n",MQP_ERR_LIBQUIT);
01869         Uart_Write((uint8_t *) print_buf);
01870         net_ops->close(net);
01871         return MQP_ERR_LIBQUIT;
01872     }
01873 
01874     return rv;
01875 }
01876 
01877 static struct client_ctx *net_cl_ctx_find(int32_t net) {
01878     struct client_ctx *cl_ctx = used_ctxs;
01879 
01880     while(cl_ctx && (net != cl_ctx->net))
01881         cl_ctx = cl_ctx->next;
01882 
01883     return cl_ctx;
01884 }
01885 
01886 static int32_t proc_net_data_recv(int32_t net, struct mqtt_packet *mqp, void **app)
01887 {
01888 
01889     /* Note: used_ctxs are always managed by a single RX task */
01890     struct client_ctx  *cl_ctx = net_cl_ctx_find(net);
01891     int32_t rv = MQP_ERR_NOTCONN;
01892 
01893     if(NULL == cl_ctx) {
01894         return rv; /* TX removed it interim, mustn't happen */
01895     }
01896     return mqp_setup_proc_ctx_data_recv(cl_ctx, mqp, 1, app);
01897 }
01898 
01899 static int32_t cl_recv(struct mqtt_packet *mqp, uint32_t wait_secs, void **app)
01900 {
01901 
01902     int32_t rv = MQP_ERR_NETWORK;
01903     int32_t n_hnds = 0, idx = 0;
01904 
01905     rv = group_ctxs_rx_prep(wait_secs, app);
01906     if(rv > 0)
01907         n_hnds = rv;
01908 
01909     for(idx = 0; (idx < n_hnds) && (rv > 0); idx++) {
01910         int32_t net = recv_hvec[idx];
01911         if(loopb_net == net)
01912             rv = proc_loopback_recv(net); /* UDP Packet */
01913         else {
01914             rv = proc_net_data_recv(net, mqp, app);
01915             if(false == grp_has_cbfn)
01916                 break; /* 'CTX': inform application */
01917         }
01918     }
01919 
01920     return rv;
01921 }
01922 
01923 static int32_t grp_net_setup_create()
01924 {
01925 
01926     if(0 == loopb_portid) {
01927         return MQP_ERR_NOT_DEF;
01928     }
01929     if(NULL == net_ops) {
01930         return MQP_ERR_NET_OPS;
01931     }
01932     if(-1 == loopb_net) {
01933         loopb_net = net_ops->open(DEV_NETCONN_OPT_UDP, NULL, loopb_portid, NULL);
01934 
01935         if(-1 == loopb_net) {
01936             return MQP_ERR_LIBQUIT;
01937         }
01938     }
01939 
01940     return 1;
01941 }
01942 
01943 int32_t mqtt_client_await_msg(struct mqtt_packet *mqp, uint32_t wait_secs, void **app)
01944 {
01945 
01946     int32_t rv = MQP_ERR_NOTCONN;
01947     *app = NULL;
01948 
01949     if(NULL == mqp)
01950         return MQP_ERR_FNPARAM; /* Didn't get a valid MQP */
01951 
01952     if(true == grp_has_cbfn)
01953         return MQP_ERR_BADCALL; /* Err: LIB has CB config */
01954 
01955     rv = grp_net_setup_create();
01956     if(rv <= 0)
01957         return rv;
01958 
01959     do {
01960         rv = cl_recv(mqp, wait_secs, app);
01961 
01962     } while((rv > 0) && MQP_RX_DO_NOT_RPT_COR(mqp));
01963 
01964     return rv;
01965 }
01966 
01967 int32_t mqtt_client_run(uint32_t wait_secs)
01968 {
01969 
01970     void *app = NULL;
01971     int32_t rv = -1;
01972 
01973     if(false == grp_has_cbfn) {
01974         return MQP_ERR_BADCALL; /* Err: LIB has no CB config */
01975     }
01976     rv = grp_net_setup_create();
01977     if(rv <= 0) {
01978         return rv;
01979     }
01980     do {
01981         rv = cl_recv(NULL, wait_secs, &app);
01982 
01983     } while((rv > 0) ||
01984             /* 'ctx' related errors are handled by the callbacks */
01985             ((rv != MQP_ERR_LIBQUIT) && (rv != MQP_ERR_TIMEOUT)));
01986 
01987     return rv;
01988 }
01989 
01990 /*------------------------------------------------------------------------------
01991  * Buffer Pool and management, other registrations and initialization.
01992  *------------------------------------------------------------------------------
01993  */
01994 static struct mqtt_packet *free_list = NULL;
01995 
01996 static struct mqtt_packet *mqp_alloc_atomic(void) {
01997 
01998     struct mqtt_packet *mqp = NULL;
01999 
02000 //        MUTEX_LOCKIN();
02001     if( xSemaphore != NULL ) {
02002         // See if we can obtain the semaphore.  If the semaphore is not available
02003         // wait 10 ticks to see if it becomes free.
02004         if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
02005             // We were able to obtain the semaphore and can now access the
02006             // shared resource.
02007             mqp = free_list;
02008             if(mqp) {
02009                 free_list = mqp->next;
02010             }
02011 
02012             // We have finished accessing the shared resource.  Release the
02013             // semaphore.
02014             xSemaphoreGive( xSemaphore );
02015 
02016         } else {
02017             // We could not obtain the semaphore and can therefore not access
02018             // the shared resource safely.
02019             Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
02020         }
02021     }
02022 
02023 //        MUTEX_UNLOCK();
02024     return mqp;
02025 }
02026 
02027 struct mqtt_packet *mqp_client_alloc(uint8_t msg_type, uint8_t offset) {
02028 
02029     struct mqtt_packet *mqp = mqp_alloc_atomic();
02030     if(NULL == mqp) {
02031         USR_INFO("MQP alloc failed - msg type 0x%02x\n\r", msg_type);
02032         return NULL;
02033     }
02034 
02035     mqp_init(mqp, offset);
02036     mqp->msg_type = msg_type;
02037 
02038     return mqp;
02039 }
02040 
02041 /* Do not use this routine with-in this file. */
02042 static void free_mqp(struct mqtt_packet *mqp)
02043 {
02044     /* Must be used in a locked state */
02045     mqp->next = free_list;
02046     free_list = mqp;
02047 }
02048 
02049 int32_t mqtt_client_buffers_register(uint32_t num_mqp, struct mqtt_packet *mqp_vec,
02050                                      uint32_t buf_len, uint8_t *buf_vec)
02051 {
02052     uint32_t i, j;
02053 
02054     if((0 == num_mqp) || (0 == buf_len) || free_list)
02055         return -1;
02056 
02057     for(i = 0, j = 0; i < num_mqp; i++, j += buf_len) {
02058         struct mqtt_packet *mqp = mqp_vec + i;
02059 
02060         mqp->buffer = buf_vec + j;
02061         mqp->maxlen = buf_len;
02062 
02063         mqp->free   = free_mqp;
02064         mqp->next   = free_list;
02065         free_list   = mqp;
02066     }
02067 
02068     return 0;
02069 }
02070 
02071 int32_t mqtt_client_ctx_will_register(void *ctx,
02072                                       const struct utf8_string  *will_top,
02073                                       const struct utf8_string  *will_msg,
02074                                       enum mqtt_qos will_qos, bool retain)
02075 {
02076     uint8_t B = 0;
02077 
02078     if((NULL == ctx) || ((NULL == will_top) && (NULL != will_msg)))
02079         return -1;  /* Bad Combo */
02080 
02081     if(NULL != will_top) {
02082         RET_IF_INVALID_UTF8(will_top);
02083 
02084         B = QOS_VALUE(will_qos) << 3;
02085         if(retain)
02086             B |= WILL_RETAIN_VAL;
02087 
02088         if(NULL != will_msg)
02089             RET_IF_INVALID_UTF8(will_msg);
02090     }
02091 
02092     CLIENT(ctx)->conn_pl_utf8s[1] = will_top;
02093     CLIENT(ctx)->conn_pl_utf8s[2] = will_msg;
02094 
02095     CLIENT(ctx)->will_opts = B;
02096 
02097     return 0;
02098 }
02099 
02100 int32_t mqtt_client_ctx_info_register(void *ctx,
02101                                       const struct utf8_string *client_id,
02102                                       const struct utf8_string *user_name,
02103                                       const struct utf8_string *pass_word)
02104 {
02105     const struct utf8_string *users_pwd = NULL;
02106 
02107     if(NULL == ctx)
02108         return -1;
02109 
02110     if(NULL != client_id)
02111         RET_IF_INVALID_UTF8(client_id);
02112 
02113     if(NULL != user_name) {
02114         RET_IF_INVALID_UTF8(user_name);
02115 
02116         if(NULL != pass_word)
02117             RET_IF_INVALID_UTF8(pass_word);
02118 
02119         users_pwd = pass_word;
02120     }
02121 
02122     CLIENT(ctx)->conn_pl_utf8s[0] = client_id;
02123     CLIENT(ctx)->conn_pl_utf8s[3] = user_name;
02124     CLIENT(ctx)->conn_pl_utf8s[4] = users_pwd;
02125 
02126     return 0;
02127 }
02128 
02129 int32_t mqtt_client_net_svc_register(const struct device_net_services *net)
02130 {
02131     if(net && net->open && net->send && net->recv  &&
02132             net->send_dest && net->recv_from && net->close
02133             && net->io_mon && net->time) {
02134         net_ops = net;
02135         return 0;
02136     }
02137 
02138     return -1;
02139 }
02140 
02141 static void cl_ctx_setup(struct client_ctx *cl_ctx, /* WR Object */
02142                          const struct mqtt_client_ctx_cfg *ctx_cfg,
02143                          const struct mqtt_client_ctx_cbs *ctx_cbs,
02144                          void *app)
02145 {
02146 
02147     struct client_desc *client = CLIENT(cl_ctx);
02148 
02149     cl_ctx->flags       = ctx_cfg->config_opts;
02150 
02151     client->nwconn_opts = ctx_cfg->nwconn_opts;
02152     client->server_addr = ctx_cfg->server_addr;
02153     client->port_number = ctx_cfg->port_number;
02154 
02155     client->app         = app;
02156 
02157 
02158     if(NULL != ctx_cfg->nw_security)
02159         buf_wr_nbytes((uint8_t*)&client->nw_security,
02160                       (uint8_t*)ctx_cfg->nw_security,
02161                       sizeof(struct secure_conn));
02162 
02163     if(NULL != ctx_cbs) {
02164         // set callback flag
02165         struct mqtt_client_ctx_cbs *cbs_ctx = CTX_CBS_PTR(client);
02166         cbs_ctx->publish_rx = ctx_cbs->publish_rx;
02167         cbs_ctx->ack_notify = ctx_cbs->ack_notify;
02168         cbs_ctx->disconn_cb = ctx_cbs->disconn_cb;
02169     }
02170 
02171     return;
02172 }
02173 
02174 int32_t mqtt_client_ctx_create(const struct mqtt_client_ctx_cfg *ctx_cfg,
02175                                const struct mqtt_client_ctx_cbs *ctx_cbs,
02176                                void *app, void **ctx)
02177 {
02178 
02179     struct client_ctx *cl_ctx = NULL;
02180 
02181     if((NULL == ctx_cfg)                 ||
02182             (NULL == ctx_cfg->server_addr)    ||
02183             (0    == ctx_cfg->port_number)) {
02184         return -1;
02185     }
02186     if(ctx_cfg->config_opts & MQTT_CFG_MK_GROUP_CTX) {
02187         if(grp_has_cbfn ^ (!!ctx_cbs)) {
02188             return -1;
02189         }
02190     }
02191 
02192 //        MUTEX_LOCKIN();
02193     if( xSemaphore != NULL ) {
02194         // See if we can obtain the semaphore.  If the semaphore is not available
02195         // wait 10 ticks to see if it becomes free.
02196         if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
02197             // We were able to obtain the semaphore and can now access the
02198             // shared resource.
02199             if(free_ctxs) {
02200                 cl_ctx       = free_ctxs;
02201                 free_ctxs    = cl_ctx->next;
02202                 cl_ctx->next = NULL;
02203             }
02204 
02205             // We have finished accessing the shared resource.  Release the
02206             // semaphore.
02207             xSemaphoreGive( xSemaphore );
02208 
02209         } else {
02210             // We could not obtain the semaphore and can therefore not access
02211             // the shared resource safely.
02212             Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
02213         }
02214     }
02215 
02216 //        MUTEX_UNLOCK();
02217 
02218     if(cl_ctx) {
02219         cl_ctx_setup(cl_ctx, ctx_cfg, ctx_cbs, app);
02220         *ctx = (void*) cl_ctx;
02221         return 0;
02222     }
02223 
02224     return -1;
02225 }
02226 
02227 int32_t mqtt_client_ctx_delete(void *ctx)
02228 {
02229 
02230     struct client_ctx  *cl_ctx = (struct client_ctx*) ctx;
02231     int32_t rv = -1; /* Not sure about deletion as yet */
02232 
02233 //        MUTEX_LOCKIN();
02234     if( xSemaphore != NULL ) {
02235         // See if we can obtain the semaphore.  If the semaphore is not available
02236         // wait 10 ticks to see if it becomes free.
02237         if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
02238             // We were able to obtain the semaphore and can now access the
02239             // shared resource.
02240             if((NULL == cl_ctx)          ||
02241                     (-1   != cl_ctx->net)     ||
02242                     (awaits_pkts(cl_ctx))) {
02243                 goto mqtt_client_ctx_delete_exit1;
02244             }
02245             rv = 0; /* OK to delete ctx */
02246             client_reset(CLIENT(cl_ctx));
02247             cl_ctx_freeup(cl_ctx);
02248 
02249             // We have finished accessing the shared resource.  Release the
02250             // semaphore.
02251             //xSemaphoreGive( xSemaphore );
02252         } else {
02253             // We could not obtain the semaphore and can therefore not access
02254             // the shared resource safely.
02255             Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
02256         }
02257     }
02258 
02259 mqtt_client_ctx_delete_exit1:
02260 //        MUTEX_UNLOCK();
02261     xSemaphoreGive(xSemaphore);
02262 
02263     return rv;
02264 }
02265 
02266 int32_t mqtt_client_lib_init(const struct mqtt_client_lib_cfg *lib_cfg)
02267 {
02268     if((NULL == lib_cfg) || (NULL == lib_cfg->debug_printf))
02269         return -1;
02270 
02271     debug_printf = lib_cfg->debug_printf; /* Facilitate debug */
02272 
02273     if(INIT_DONE_STATE == cl_lib_state) {
02274         Uart_Write((uint8_t*)"C: Error trying to re-initialize \n\r");
02275         USR_INFO("C: Error trying to re-initialize \n\r");
02276         return -1;
02277     }
02278 
02279     USR_INFO("Version: Client LIB %s, Common LIB %s.\n\r",
02280              MQTT_CLIENT_VERSTR, MQTT_COMMON_VERSTR);
02281 
02282     client_desc_init();
02283 
02284     cl_lib_state = INIT_DONE_STATE;
02285 
02286     loopb_portid = lib_cfg->loopback_port;
02287     grp_has_cbfn = lib_cfg->grp_uses_cbfn;
02288 
02289     mutex        = lib_cfg->mutex;
02290     mutex_lockin = lib_cfg->mutex_lockin;
02291     mutex_unlock = lib_cfg->mutex_unlock;
02292 
02293     aux_dbg_enbl = lib_cfg->aux_debug_en;
02294 
02295     return 0;
02296 }
02297 
02298 int32_t mqtt_client_lib_exit()
02299 {
02300     struct client_ctx *cl_ctx = free_ctxs;
02301     int32_t count = 0;
02302 
02303     while(cl_ctx) {
02304         cl_ctx = cl_ctx->next;
02305         count++;
02306     }
02307 
02308     if(MAX_NWCONN == count) {
02309         cl_lib_state = WAIT_INIT_STATE;
02310         free_ctxs    = NULL;
02311         return 0;
02312     }
02313 
02314     return -1;
02315 }
02316 
02317 }//namespace mbed_mqtt