TI's MQTT Demo with freertos CM4F

Dependencies:   mbed

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers sl_mqtt_client.cpp Source File

sl_mqtt_client.cpp

00001 /******************************************************************************
00002 *
00003 *   Copyright (C) 2014 Texas Instruments Incorporated
00004 *
00005 *   All rights reserved. Property of Texas Instruments Incorporated.
00006 *   Restricted rights to use, duplicate or disclose this code are
00007 *   granted through contract.
00008 *
00009 *   The program may not be used without the written permission of
00010 *   Texas Instruments Incorporated or against the terms and conditions
00011 *   stipulated in the agreement under which this program has been supplied,
00012 *   and under no circumstances can it be used with non-TI connectivity device.
00013 *
00014 ******************************************************************************/
00015 
00016 #include "sl_mqtt_client.h"
00017 #include "cc31xx_sl_net.h"
00018 //#include "cc3100_nonos.h"
00019 #include "myBoardInit.h"
00020 #include "cli_uart.h"
00021 #include "osi.h"
00022 
00023 using namespace mbed_cc3100;
00024 
00025 #if (THIS_BOARD == Seeed_Arch_Max)
00026 cc3100 _cc3100module(PB_9, PB_8, PD_12, PD_13, PD_11, SPI(PB_5, PB_4, PB_3));//Seeed_Arch_Max  irq, nHib, cs, mosi, miso, sck
00027 #elif (THIS_BOARD == EA_MBED_LPC4088)
00028 class cc3100 _cc3100module(p14, p15, p9, p10, p8, SPI(p5, p6, p7));//LPC4088  irq, nHib, cs, mosi, miso, sck
00029 #elif (THIS_BOARD == LPCXpresso4337)
00030 class cc3100 _cc3100module(P2_2, P3_5, P1_2, SPI(P1_4, P1_3, PF_4));//LPCXpresso4337  irq, nHib, cs, mosi, miso, sck
00031 #endif
00032 
00033 namespace mbed_mqtt {
00034 
/* Length of each packet buffer in the pool; override with CFG_SL_CL_BUF_LEN. */
#ifdef CFG_SL_CL_BUF_LEN
#define BUF_LEN          CFG_SL_CL_BUF_LEN
#else
#define BUF_LEN          1024
#endif

/* Number of packet buffers in the pool; override with CFG_SL_CL_MAX_MQP. */
#ifdef CFG_SL_CL_MAX_MQP
#define MAX_MQP          CFG_SL_CL_MAX_MQP
#else
#define MAX_MQP          2
#endif

/* Stack size handed to osi_TaskCreate for the receive task;
   override with CFG_SL_CL_STACK. */
#ifdef CFG_SL_CL_STACK
#define OSI_STACK_SIZE CFG_SL_CL_STACK
#else
#define OSI_STACK_SIZE 2048
#endif
00052 //*****************************************************************************
00053 // global variables used
00054 //*****************************************************************************
00055 
00056 
00057 struct sl_client_ctx {
00058         /* Client information details */
00059         char *client_id;
00060         char *usr_name;
00061         char *usr_pwd;
00062         SlMqttWill_t mqtt_will;
00063 
00064         /* Client management variables */
00065         bool in_use;
00066         char awaited_ack;
00067         uint16_t conn_ack;
00068 
00069         /*Sync Object used to ensure single inflight message -
00070           used in blocking mode to wait on ack*/
00071         _SlSyncObj_t ack_sync_obj;
00072         /* Suback QoS pointer to store passed by application */
00073         char *suback_qos;
00074 
00075         /* Application information */
00076         void* app_hndl;
00077         SlMqttClientCbs_t app_cbs;
00078 
00079         /* Library information */
00080         void *cli_hndl;
00081         bool blocking_send;
00082 };
00083 
/* Number of client contexts (simultaneous server connections) this wrapper
   can hold. Override with CFG_CL_MQTT_CTXS.
   The original guard tested CFG_CL_MQTT_CTXS but expanded the differently
   spelled CFG_MQTT_CL_CTXS, so defining the guarded name produced an
   undefined identifier; both now use the guarded name. */
#ifndef CFG_CL_MQTT_CTXS
#define MAX_SIMULTANEOUS_SERVER_CONN 4
#else
#define MAX_SIMULTANEOUS_SERVER_CONN CFG_CL_MQTT_CTXS
#endif
00089 
00090 
00091 static struct sl_client_ctx sl_cli_ctx[MAX_SIMULTANEOUS_SERVER_CONN];
00092 
00093 /* Lock Object to be passed to the MQTT lib */
00094 _SlLockObj_t mqtt_lib_lockobj;
00095 
00096 /* Creating a pool of MQTT coonstructs that can be used by the MQTT Lib
00097      mqp_vec =>pointer to a pool of the mqtt packet constructs
00098       buf_vec => the buffer area which is attached with each of mqp_vec*/
00099 DEFINE_MQP_BUF_VEC(MAX_MQP, mqp_vec, BUF_LEN, buf_vec);
00100 
00101 /*Task Priority and Response Time*/
00102 uint32_t g_wait_secs;
00103 
00104 bool g_multi_srvr_conn = false;
00105 
00106 /* Receive task handle */
00107 OsiTaskHandle g_rx_task_hndl;
00108 
00109 /* Synchronization object used between the Rx and App task */
00110 _SlSyncObj_t g_rx_tx_sync_obj;
00111 
/* Block (without timeout) until the application task posts g_rx_tx_sync_obj. */
#define RX_TX_SIGNAL_WAIT() \
        (_cc3100module._nonos.sl_SyncObjWait(&g_rx_tx_sync_obj, SL_OS_WAIT_FOREVER))
/* Post g_rx_tx_sync_obj to release the waiter. */
#define RX_TX_SIGNAL_POST() \
        (_cc3100module._nonos.sl_SyncObjSignal(&g_rx_tx_sync_obj))
00114 
00115 /* MQTT Quality of Service */
00116 static const enum mqtt_qos qos[] ={
00117         MQTT_QOS0,
00118         MQTT_QOS1,
00119         MQTT_QOS2
00120 };
00121 
00122 /* Network Services specific to cc3200*/
00123 struct device_net_services net={&comm_open,&tcp_send,&tcp_recv,&send_dest,&recv_from,&comm_close,
00124                                         &tcp_listen,&tcp_accept,&tcp_select,&rtc_secs};
00125         
/* Accessors for the flag bits carried in the fixed-header byte (fh_byte1) of
   a PUBLISH packet. `mqp` is a pointer expression; it is parenthesized in the
   expansions so that arguments such as `&pkt` or `p + 1` bind correctly. */

/* Retain flag: bit 0 of the fixed-header byte. */
#define MQP_PUB_RETAIN(mqp)    ((((mqp)->fh_byte1) & 0x01) ? true : false)

/* Duplicate flag: bit 3 of the fixed-header byte. */
#define MQP_PUB_DUP(mqp)       (((((mqp)->fh_byte1) >> 3) & 0x01) ? true : false)

/* QoS value: bits 1-2 of the fixed-header byte. */
#define MQP_PUB_QOS(mqp)       ((((mqp)->fh_byte1) >> 1) & 0x03)
00134 
/* Block (without timeout) on a context's ACK sync object. */
#define ACK_RX_SIGNAL_WAIT(sync_obj_ptr) \
        _cc3100module._nonos.sl_SyncObjWait(sync_obj_ptr, SL_OS_WAIT_FOREVER)
/* Post a context's ACK sync object to release the waiting API call. */
#define ACK_RX_SIGNAL_POST(sync_obj_ptr) \
        _cc3100module._nonos.sl_SyncObjSignal(sync_obj_ptr)
00137 
/* Point the string descriptor `utf_s` at the NUL-terminated string `str`
   (no copy is made) and record its length. `str` is expanded twice, so it
   must be free of side effects. Wrapped in do { } while (0) so the macro
   behaves as a single statement, including in an unbraced if/else. */
#define STR2UTF_CONV(utf_s, str) \
        do { \
                (utf_s).buffer = (char *)(str); \
                (utf_s).length = strlen(str); \
        } while (0)
00139 
/* Text payloads reported to the application.
   MQTT_ACK accompanies ACK notifications delivered through the event
   callback in non-blocking mode. */
#define MQTT_ACK   "Ack Received from server"
/* NOTE(review): MQTT_ERROR is not referenced in the visible part of this
   file - confirm it is still needed. */
#define MQTT_ERROR "Connection Lost with broker"
00143 
00144 //*****************************************************************************
00145 // process_notify_ack_cb 
00146 //*****************************************************************************
00147 static void 
00148 process_notify_ack_cb(void *app, uint8_t msg_type, uint16_t msg_id, uint8_t *buf, uint32_t len)
00149 {
00150 
00151         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
00152         int32_t loopcnt;
00153 
00154         switch(msg_type) {
00155 
00156         case MQTT_CONNACK:
00157                 client_ctx->conn_ack = ((buf[0] << 8) | buf[1]);
00158                 client_ctx->awaited_ack = 0;
00159                 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
00160         break;
00161         case MQTT_SUBACK:
00162                 if(true == client_ctx->blocking_send) {
00163                         for(loopcnt = 0; loopcnt < len; loopcnt++) {
00164                                 client_ctx->suback_qos[loopcnt] = buf[loopcnt];
00165                         }
00166                                 
00167                         if(client_ctx->awaited_ack == msg_type) {
00168                                 client_ctx->awaited_ack = 0;
00169                                 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
00170                         }
00171                 } else {
00172                         client_ctx->app_cbs.sl_ExtLib_MqttEvent(
00173                                 client_ctx->app_hndl,
00174                                 SL_MQTT_CL_EVT_SUBACK, buf, len);
00175                 }
00176         break;
00177         /* Error returns --- TBD */
00178         default:
00179                 if(true == client_ctx->blocking_send) {
00180                         if(client_ctx->awaited_ack == msg_type) {
00181                                 client_ctx->awaited_ack = 0;
00182                                 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
00183                         }
00184                 } else {
00185                         client_ctx->app_cbs.sl_ExtLib_MqttEvent(
00186                                 client_ctx->app_hndl,
00187                                 msg_type, MQTT_ACK, strlen(MQTT_ACK));
00188                 }
00189         break;
00190 
00191         }
00192        
00193         return ;
00194 }
00195 
00196 //*****************************************************************************
00197 // process_publish_rx_cb 
00198 //*****************************************************************************
00199 static bool 
00200 process_publish_rx_cb(void *app, bool dup, enum mqtt_qos qos, bool retain, 
00201                    struct mqtt_packet *mqp)
00202 {
00203         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
00204         /*If incoming message is a publish message from broker */
00205         /* Invokes the event handler with topic,topic length,payload and 
00206            payload length values*/
00207         client_ctx->app_cbs.sl_ExtLib_MqttRecv(client_ctx->app_hndl,
00208                                (char const*)MQP_PUB_TOP_BUF(mqp),
00209                                MQP_PUB_TOP_LEN(mqp),MQP_PUB_PAY_BUF(mqp),
00210                                MQP_PUB_PAY_LEN(mqp), dup,
00211                                MQP_PUB_QOS(mqp), retain);
00212         return true;
00213 }
00214 
00215 //*****************************************************************************
00216 // process_disconn_cb
00217 //*****************************************************************************
00218 static void process_disconn_cb(void *app, int32_t cause)
00219 {
00220         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
00221 
00222         if(client_ctx->awaited_ack != 0) {
00223                 client_ctx->awaited_ack = MQTT_DISCONNECT;
00224                 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
00225         } else {
00226                 if(false == client_ctx->blocking_send) {
00227                         /* Invoke the disconnect callback */
00228                         client_ctx->app_cbs.sl_ExtLib_MqttDisconn(
00229                                                client_ctx->app_hndl);
00230                 }
00231         }
00232         return;
00233 }
00234 
00235 
00236 //*****************************************************************************
00237 // Receive Task. Invokes in the context of a task 
00238 //*****************************************************************************
00239 void 
00240 VMqttRecvTask(void *pArgs)  
00241 {
00242 
00243         int32_t retval;
00244         do {
00245                 if(false == g_multi_srvr_conn) {
00246                         /* wait on broker connection */
00247                         RX_TX_SIGNAL_WAIT();
00248                         while(1) {
00249                                 /* Forcing to use the index 0 always - caution */
00250                                 retval = mqtt_client_ctx_run(
00251                                                 sl_cli_ctx[0].cli_hndl, 
00252                                                 g_wait_secs);
00253                                 if((retval < 0) && 
00254                                         (retval != MQP_ERR_TIMEOUT)) {
00255                                         break;
00256                                 }
00257                         }
00258                 } else {
00259                         mqtt_client_run(g_wait_secs);
00260                 }
00261         }while(1);
00262 }
00263 
00264 static struct sl_client_ctx *get_available_clictx_mem()
00265 {
00266         int32_t loopcnt, max_store;
00267 
00268         if(false == g_multi_srvr_conn) {
00269                 max_store = 1;
00270         } else {
00271                 max_store = MAX_SIMULTANEOUS_SERVER_CONN;
00272         }
00273 
00274         for(loopcnt = 0; loopcnt < max_store; loopcnt++) {
00275                 if(false == sl_cli_ctx[loopcnt].in_use) {
00276                         sl_cli_ctx[loopcnt].in_use = true;
00277                         return(&(sl_cli_ctx[loopcnt]));
00278                 }
00279         }
00280 
00281         return NULL;
00282 }
00283 
00284 //*****************************************************************************
00285 // sl_ExtLib_MqttClientCtxCreate
00286 //*****************************************************************************
00287 void *sl_ExtLib_MqttClientCtxCreate(const SlMqttClientCtxCfg_t *ctx_cfg,
00288                                       const SlMqttClientCbs_t *msg_cbs,
00289                                       void *app_hndl)
00290 {
00291 
00292         struct sl_client_ctx *client_ctx_ptr;
00293         struct mqtt_client_ctx_cfg lib_ctx_cfg;
00294         struct mqtt_client_ctx_cbs lib_cli_cbs;
00295         struct secure_conn lib_nw_security;
00296         int32_t retval;
00297        
00298         /* Get a client context storage area */
00299         client_ctx_ptr = get_available_clictx_mem();
00300         if(client_ctx_ptr == NULL) {
00301                 return NULL;
00302         }
00303         
00304         /* Create the sync object to signal on arrival of ACK packet */
00305         _cc3100module._nonos.sl_SyncObjCreate(&client_ctx_ptr->ack_sync_obj, "AckSyncObject");
00306         _cc3100module._nonos.sl_SyncObjWait(&client_ctx_ptr->ack_sync_obj, SL_OS_NO_WAIT);
00307         
00308         /* Store the application handle */
00309         client_ctx_ptr->app_hndl = app_hndl;
00310         /* Initialize the ACK awaited */
00311         client_ctx_ptr->awaited_ack = 0;
00312         
00313         /* Store the application callbacks, to be invoked later */
00314         client_ctx_ptr->app_cbs.sl_ExtLib_MqttRecv = \
00315                                         msg_cbs->sl_ExtLib_MqttRecv;
00316         client_ctx_ptr->app_cbs.sl_ExtLib_MqttEvent = \
00317                                         msg_cbs->sl_ExtLib_MqttEvent;
00318         client_ctx_ptr->app_cbs.sl_ExtLib_MqttDisconn = \
00319                                         msg_cbs->sl_ExtLib_MqttDisconn;
00320 
00321         /* Initialize the client lib */
00322         lib_ctx_cfg.config_opts = ctx_cfg->mqtt_mode31;
00323         
00324         if(true == g_multi_srvr_conn) {
00325                 lib_ctx_cfg.config_opts |=
00326                         (MQTT_CFG_APP_HAS_RTSK | MQTT_CFG_MK_GROUP_CTX);
00327         } else {
00328                 lib_ctx_cfg.config_opts |= MQTT_CFG_APP_HAS_RTSK;
00329         }
00330         
00331         /* get the network connection options */
00332         client_ctx_ptr->blocking_send = ctx_cfg->blocking_send;
00333         lib_ctx_cfg.nwconn_opts = ctx_cfg->server_info.netconn_flags;
00334         lib_ctx_cfg.nwconn_opts |= DEV_NETCONN_OPT_TCP;
00335         lib_ctx_cfg.server_addr = (char*)ctx_cfg->server_info.server_addr;
00336         lib_ctx_cfg.port_number = ctx_cfg->server_info.port_number;
00337         /*initialize secure socket parameters */
00338         if(ctx_cfg->server_info.netconn_flags & SL_MQTT_NETCONN_SEC) {
00339                 lib_ctx_cfg.nw_security = &lib_nw_security;
00340                 /* initialize secure socket parameters */
00341                 lib_ctx_cfg.nw_security->cipher = 
00342                         (void*)&(ctx_cfg->server_info.cipher);
00343                 lib_ctx_cfg.nw_security->method = 
00344                         (void*)&(ctx_cfg->server_info.method);
00345                 lib_ctx_cfg.nw_security->n_file = 
00346                         ctx_cfg->server_info.n_files;
00347                 lib_ctx_cfg.nw_security->files = 
00348                         (char**)ctx_cfg->server_info.secure_files;
00349         }
00350         else {
00351                 lib_ctx_cfg.nw_security=NULL;
00352         }
00353 
00354         lib_cli_cbs.publish_rx = process_publish_rx_cb;
00355         lib_cli_cbs.ack_notify = process_notify_ack_cb;
00356         lib_cli_cbs.disconn_cb = process_disconn_cb;
00357         
00358         retval = mqtt_client_ctx_create(&lib_ctx_cfg,
00359                                         &lib_cli_cbs, client_ctx_ptr,
00360                                         &client_ctx_ptr->cli_hndl);
00361         
00362         if(retval < 0) {
00363                 Uart_Write((uint8_t*)"mqtt_client_ctx_create failed\r\n");
00364                 client_ctx_ptr->in_use = false;
00365                 return NULL;
00366         }
00367         
00368         return (void*)client_ctx_ptr;
00369 }
00370 
00371 //*****************************************************************************
00372 // sl_ExtLib_MqttClientCtxDelete
00373 //*****************************************************************************
00374 int32_t sl_ExtLib_MqttClientCtxDelete(void *cli_ctx)
00375 {
00376         struct sl_client_ctx *client_ctx=(struct sl_client_ctx *)cli_ctx;
00377         int32_t retval;
00378 
00379         retval = mqtt_client_ctx_delete(client_ctx->cli_hndl);
00380         if(retval >= 0) {
00381                 /* Check for more paramaters --- TBD */
00382                 _cc3100module._nonos.sl_SyncObjDelete(&client_ctx->ack_sync_obj);
00383                 /* Free up the context */
00384                 memset(client_ctx, 0, sizeof(struct sl_client_ctx));
00385         }
00386 
00387         return (retval < 0)? -1: 0;
00388 }
00389 
00390 void
00391 mutex_lockup(void *mqtt_lib_lock)
00392 {
00393         _cc3100module._nonos.sl_LockObjLock((_SlLockObj_t*)mqtt_lib_lock, SL_OS_WAIT_FOREVER);
00394 }
00395 
00396 void
00397 mutex_unlock(void *mqtt_lib_lock)
00398 {
00399         _cc3100module._nonos.sl_LockObjUnlock((_SlLockObj_t*)mqtt_lib_lock);
00400 }
00401 
00402 //*****************************************************************************
00403 // sl_ExtLib_MqttClientInit
00404 //*****************************************************************************
00405 int32_t sl_ExtLib_MqttClientInit(const SlMqttClientLibCfg_t  *cfg)
00406 {
00407 
00408         struct mqtt_client_lib_cfg lib_cfg;
00409         /* Initialize the control variables */
00410         memset(sl_cli_ctx, 0, sizeof(sl_cli_ctx));
00411 
00412         /* Setup the MQTT client lib configurations */
00413         lib_cfg.loopback_port = cfg->loopback_port;
00414         /* Initializing the sync object */
00415         g_rx_tx_sync_obj = 0;
00416 
00417         if(cfg->loopback_port) {
00418                 g_multi_srvr_conn = true;
00419                 lib_cfg.grp_uses_cbfn = true;   /* Does use the group callback func */
00420         } else {
00421                 g_multi_srvr_conn = false;
00422                 lib_cfg.grp_uses_cbfn = false;   /* Doesnt use the group callback func */
00423                 /* Create the sync object between Rx and App tasks */
00424                 _cc3100module._nonos.sl_SyncObjCreate(&g_rx_tx_sync_obj, "RxTxSyncObject");
00425                 _cc3100module._nonos.sl_SyncObjWait(&g_rx_tx_sync_obj, SL_OS_NO_WAIT);
00426         }
00427         
00428         g_wait_secs = cfg->resp_time;
00429         /* Setup the mutex operations */
00430         _cc3100module._nonos.sl_LockObjCreate(&mqtt_lib_lockobj,"MQTT Lock");
00431         lib_cfg.mutex = (void *)&mqtt_lib_lockobj;
00432         lib_cfg.mutex_lockin = mutex_lockup;
00433         lib_cfg.mutex_unlock = mutex_unlock;
00434         /* hooking DBG print function */
00435         lib_cfg.debug_printf = cfg->dbg_print;
00436         /* Initialize client library */
00437         if(mqtt_client_lib_init(&lib_cfg) < 0) {
00438                 Uart_Write((uint8_t*)"mqtt_client_lib_init failed\r\n");
00439                 return -1;
00440         }
00441 
00442         /* provide MQTT Lib information to create a pool of MQTT constructs. */
00443         mqtt_client_buffers_register(MAX_MQP,mqp_vec,BUF_LEN,&buf_vec[0][0]);
00444         /* Register network services speicific to CC3200 */
00445         mqtt_client_net_svc_register(&net);
00446         /* start the receive task */
00447         osi_TaskCreate( VMqttRecvTask, 
00448                 (const signed char *) "MQTTRecv",
00449                 OSI_STACK_SIZE,
00450                 NULL, 
00451                 cfg->rx_tsk_priority, &g_rx_task_hndl );
00452                 
00453         return 0;
00454 }
00455 
00456 //*****************************************************************************
00457 // sl_ExtLib_MqttClientExit
00458 //*****************************************************************************
00459 int32_t 
00460 sl_ExtLib_MqttClientExit()
00461 {
00462         int32_t retval;
00463 
00464         /* Deinitialize the MQTT client lib */
00465         retval = mqtt_client_lib_exit();
00466         if(retval >= 0) {
00467                 if(mqtt_lib_lockobj != NULL)
00468                 {
00469                         /* Delete the MQTT lib lock object */
00470                         _cc3100module._nonos.sl_LockObjDelete(&mqtt_lib_lockobj);
00471                 }
00472                 if(g_rx_task_hndl != NULL)
00473                 {
00474                         /* Delete the Rx Task */
00475                         osi_TaskDelete(&g_rx_task_hndl);
00476                 }
00477                 
00478                 if(g_rx_tx_sync_obj != NULL)
00479                 {
00480                         /* Delete the Rx-Tx task sync object */
00481                         _cc3100module._nonos.sl_SyncObjDelete(&g_rx_tx_sync_obj);
00482                 }
00483                 
00484                 g_rx_task_hndl = NULL;
00485                 g_rx_tx_sync_obj = NULL;
00486                 mqtt_lib_lockobj = NULL;
00487         }
00488         
00489         return (retval < 0)? -1: 0;
00490 }
00491 
00492 //*****************************************************************************
00493 // sl_ExtLib_MqttClientSet
00494 //*****************************************************************************
00495 int32_t 
00496 sl_ExtLib_MqttClientSet(void *app, int32_t param, const void *value, uint32_t len)
00497 {
00498 
00499         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
00500         switch(param)
00501         {
00502         case SL_MQTT_PARAM_CLIENT_ID:
00503                 /* Save the reference to the Client ID */
00504                 client_ctx->client_id =(char*)value;
00505                 break;
00506 
00507         case SL_MQTT_PARAM_USER_NAME:
00508                 /* Save the reference to the Username */
00509                 client_ctx->usr_name =(char*)value;
00510                 break;
00511 
00512         case SL_MQTT_PARAM_PASS_WORD:
00513                 /* Save the reference to the password */
00514                 client_ctx->usr_pwd =(char*)value;
00515                 break;
00516 
00517         case SL_MQTT_PARAM_WILL_PARAM:
00518                 /* Save the reference to will parameters*/
00519                 client_ctx->mqtt_will = *((SlMqttWill_t*)value);
00520                 break;
00521         default:
00522                 break;
00523         }
00524 
00525         return 0;
00526    
00527 }
00528 
00529 //*****************************************************************************
00530 // sl_ExtLib_MqttClientConnect
00531 //*****************************************************************************
00532 int32_t
00533 sl_ExtLib_MqttClientConnect(void *cli_ctx, bool clean, uint16_t keep_alive_time)
00534 { 
00535 
00536     
00537         int32_t ret = -1;
00538         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
00539 
00540         /*utf8 strings into which client info will be stored*/
00541         struct utf8_string client_id, username, usr_pwd, will_topic, will_msg;
00542         struct utf8_string *usrname = NULL, *usrpasswd = NULL, *clientid=NULL;
00543 
00544         /* Provide Client ID,user name and password into MQTT Library */
00545         if(client_ctx->client_id != NULL) {
00546             STR2UTF_CONV(client_id, client_ctx->client_id);
00547             clientid = &client_id;
00548         }
00549         
00550         if(client_ctx->usr_name != NULL) {
00551                 STR2UTF_CONV(username, client_ctx->usr_name);
00552                 usrname = &username;
00553         }
00554         if(client_ctx->usr_pwd != NULL) {
00555                 STR2UTF_CONV(usr_pwd, client_ctx->usr_pwd);
00556                 usrpasswd = &usr_pwd;
00557         }
00558         ret = mqtt_client_ctx_info_register(client_ctx->cli_hndl,
00559                                 clientid, usrname, usrpasswd);
00560         if(ret < 0) {
00561                 goto mqtt_connect_exit1;
00562         }
00563         
00564         /* Register a will message, if specified, into MQTT Library */
00565         if(NULL != client_ctx->mqtt_will.will_topic ) {
00566                 STR2UTF_CONV(will_topic, client_ctx->mqtt_will.will_topic);
00567                 STR2UTF_CONV(will_msg, client_ctx->mqtt_will.will_msg);
00568                 ret = mqtt_client_ctx_will_register(client_ctx->cli_hndl,
00569                                         &will_topic, &will_msg,
00570                                         qos[client_ctx->mqtt_will.will_qos],
00571                                         client_ctx->mqtt_will.retain);
00572                 if(ret < 0) {
00573                         Uart_Write((uint8_t*)"mqtt_client_ctx_will_register fail\r\n");
00574                         goto mqtt_connect_exit1;
00575                 }
00576         }
00577         
00578         client_ctx->awaited_ack = MQTT_CONNACK;
00579         client_ctx->conn_ack = 0;
00580     
00581         /* Connect to the server */
00582         ret = mqtt_connect_msg_send(client_ctx->cli_hndl, clean, keep_alive_time);
00583         /*Network or Socket Error happens*/
00584         if(ret < 0) {
00585                 Uart_Write((uint8_t*)"mqtt_connect_msg_send failed \r\n");
00586                 goto mqtt_connect_exit1;
00587         }
00588 
00589         if(false == g_multi_srvr_conn) {
00590                 /* Unblock the receive task here */        
00591                 RX_TX_SIGNAL_POST();
00592         }
00593         
00594         /* Wait for a CONNACK here */
00595         ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
00596         
00597         if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
00598                 Uart_Write((uint8_t*)"sl_ExtLib_MqttClientConnect fail\r\n");        
00599                 ret = -1;
00600         } else {
00601                 ret = (int32_t)client_ctx->conn_ack;
00602         }
00603 
00604 mqtt_connect_exit1:
00605 
00606         client_ctx->awaited_ack = 0;
00607         client_ctx->conn_ack = 0;
00608         return (ret < 0)? -1: ret;
00609 }
00610 
00611 //*****************************************************************************
00612 // sl_ExtLib_MqttClientDisconnect
00613 //*****************************************************************************
00614 int32_t
00615 sl_ExtLib_MqttClientDisconnect(void *cli_ctx)
00616 {
00617         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
00618         int32_t retval;
00619 
00620         if(!(mqtt_client_is_connected(client_ctx->cli_hndl)))
00621                 return -1;
00622 
00623         client_ctx->awaited_ack = MQTT_DISCONNECT;
00624 
00625         /* send the disconnect command. */
00626         retval = mqtt_disconn_send(client_ctx->cli_hndl);
00627         
00628         if(retval >= 0) {
00629                 /* wait on Rx task to acknowledge */
00630                 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
00631         }
00632         client_ctx->awaited_ack = 0;
00633 
00634         return (retval < 0)? -1: 0;
00635 }
00636 
00637 
00638 //*****************************************************************************
00639 // sl_ExtLib_MqttClientSub
00640 //*****************************************************************************
00641 int32_t 
00642 sl_ExtLib_MqttClientSub(void *cli_ctx, char* const *topics,
00643                         uint8_t *qos_level, int32_t count)
00644 {
00645 #define MAX_SIMULTANEOUS_SUB_TOPICS 4
00646 
00647         int32_t ret = -1, i;
00648 
00649         struct utf8_strqos qos_topics[MAX_SIMULTANEOUS_SUB_TOPICS];
00650         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
00651 
00652         if(!(mqtt_client_is_connected(client_ctx->cli_hndl)) || (count > MAX_SIMULTANEOUS_SUB_TOPICS)) {
00653                 goto mqtt_sub_exit1;  /* client not connected*/
00654         }
00655 
00656         for(i = 0; i < count; i++) {
00657                 qos_topics[i].buffer = topics[i];
00658                 qos_topics[i].qosreq = qos[qos_level[i]];
00659                 qos_topics[i].length = strlen(topics[i]);
00660         }
00661 
00662         /* Set up all variables to receive an ACK for the message to be sent*/
00663         if(true == client_ctx->blocking_send) {      
00664                 /* Update the qos_level to be in-out parameter */
00665                 client_ctx->suback_qos = (char*)qos_level;
00666                 client_ctx->awaited_ack = MQTT_SUBACK;
00667         }
00668 
00669         /* Send the subscription MQTT message */
00670         ret = mqtt_sub_msg_send(client_ctx->cli_hndl, qos_topics, count);
00671         if(ret < 0) {
00672                 goto mqtt_sub_exit1;
00673         }
00674 
00675         if(true == client_ctx->blocking_send) {
00676                 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
00677                 if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
00678                         ret = -1;
00679                 }
00680         }
00681 
00682 mqtt_sub_exit1:
00683         client_ctx->awaited_ack = 0;
00684         return (ret < 0)? -1: 0;
00685 }
00686 
00687 
00688 //*****************************************************************************
00689 // sl_ExtLib_MqttClientUnsub
00690 //*****************************************************************************
00691 int32_t
00692 sl_ExtLib_MqttClientUnsub(void *cli_ctx, char* const *topics, int32_t count)
00693 {
00694 #define MAX_SIMULTANEOUS_UNSUB_TOPICS 4
00695 
00696         int32_t ret = -1, i;
00697         struct utf8_string unsub_topics[MAX_SIMULTANEOUS_UNSUB_TOPICS];
00698         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
00699 
00700         if(!(mqtt_client_is_connected(client_ctx->cli_hndl)) || (count > MAX_SIMULTANEOUS_UNSUB_TOPICS)) {
00701                 goto mqtt_unsub_exit1; /* Check client status */
00702         }
00703 
00704         for(i = 0; i < count; i++) {
00705                 STR2UTF_CONV(unsub_topics[i], topics[i]);
00706         }
00707 
00708         /* Set up all variables to receive an ACK for the message to be sent*/
00709         if(true == client_ctx->blocking_send) {      
00710                 client_ctx->awaited_ack = MQTT_UNSUBACK;
00711         }
00712 
00713         /* Send the unsubscription MQTT message */
00714         ret = mqtt_unsub_msg_send(client_ctx->cli_hndl, unsub_topics, count);
00715         if(ret < 0) {
00716                 goto mqtt_unsub_exit1;
00717         }
00718 
00719         if(true == client_ctx->blocking_send) {
00720                 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
00721                 if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
00722                         ret = -1;
00723                 }
00724         }
00725 
00726 mqtt_unsub_exit1:
00727         client_ctx->awaited_ack = 0;
00728         return (ret < 0)? -1: 0;
00729 }
00730 //*****************************************************************************
00731 // sl_ExtLib_MqttClientSend
00732 //*****************************************************************************
00733 int32_t 
00734 sl_ExtLib_MqttClientSend(void *cli_ctx, const char *topic,
00735                           const void *data, int32_t len, 
00736                           char qos_level, bool retain)
00737 {
00738 
00739         int32_t ret = -1;
00740         struct utf8_string topic_utf8;
00741         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
00742 
00743         /* Check whether Client is connected or not? */
00744         if(!(mqtt_client_is_connected(client_ctx->cli_hndl))) {
00745                 Uart_Write((uint8_t*)"mqtt client is not connected\n\r");
00746                 return ret;
00747         }
00748 
00749         STR2UTF_CONV(topic_utf8, topic);
00750 
00751         /*Set up all variables to receive an ACK for the message to be sent*/
00752         if(MQTT_QOS1 == qos[qos_level]) {
00753                 client_ctx->awaited_ack = MQTT_PUBACK;
00754         } else if(MQTT_QOS2 == qos[qos_level]) {
00755                 client_ctx->awaited_ack = MQTT_PUBCOMP;
00756         }
00757         /*publish the message*/
00758         ret = mqtt_client_pub_msg_send(client_ctx->cli_hndl, &topic_utf8,
00759                                         (const uint8_t*)data, len,
00760                                         qos[qos_level], retain);
00761         if(ret < 0) {
00762                 Uart_Write((uint8_t*)"mqtt_client_pub_msg_send failed\n\r");
00763                 goto mqtt_pub_exit1;
00764         }
00765 
00766         if(MQTT_QOS0 != qos[qos_level]) {
00767                 if(true == client_ctx->blocking_send) {
00768                         ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
00769                         if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
00770                                 Uart_Write((uint8_t*)"MQTT_DISCONNECT\n\r");
00771                                 ret = -1;
00772                         }
00773                 }
00774         }
00775 
00776 mqtt_pub_exit1:
00777         client_ctx->awaited_ack = 0;
00778         return (ret < 0)? -1: 0;
00779 }
00780 
00781 }//namespace mbed_mqtt 
00782 
00783