Update revision to use TI's mqtt and Freertos.

Dependencies:   mbed client server

Fork of cc3100_Test_mqtt_CM3 by David Fletcher

Revision:
3:a8c249046181
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mqtt_V1/sl/sl_mqtt_client.cpp	Thu Sep 03 14:02:37 2015 +0000
@@ -0,0 +1,791 @@
+/******************************************************************************
+*
+*   Copyright (C) 2014 Texas Instruments Incorporated
+*
+*   All rights reserved. Property of Texas Instruments Incorporated.
+*   Restricted rights to use, duplicate or disclose this code are
+*   granted through contract.
+*
+*   The program may not be used without the written permission of
+*   Texas Instruments Incorporated or against the terms and conditions
+*   stipulated in the agreement under which this program has been supplied,
+*   and under no circumstances can it be used with non-TI connectivity device.
+*
+******************************************************************************/
+
+#include "sl_mqtt_client.h"
+#include "cc31xx_sl_net.h"
+//#include "cc3100_nonos.h"
+#include "myBoardInit.h"
+#include "cli_uart.h"
+#include "osi.h"
+
+using namespace mbed_cc3100;
+
+#if (THIS_BOARD == MBED_BOARD_LPC1768)
+//cc3100 _cc3100module(p9, p10, p8, SPI(p11, p12, p13));//LPC1768  irq, nHib, cs, mosi, miso, sck
+cc3100 _cc3100module(p16, p17, p9, p10, p8, SPI(p5, p6, p7));//LPC1768  irq, nHib, cs, mosi, miso, sck
+#elif (THIS_BOARD == Seeed_Arch_Max)
+class cc3100 _cc3100module(PE_5, PE_4, PE_6, SPI(PB_5, PB_4, PB_3));
+#elif (THIS_BOARD == ST_MBED_NUCLEOF103)
+class cc3100 _cc3100module(PA_9, PC_7, PB_6, SPI(PA_7, PA_6, PA_5));//nucleoF103  irq, nHib, cs, mosi, miso, sck
+#elif (THIS_BOARD == ST_MBED_NUCLEOF411)
+class cc3100 _cc3100module(PA_9, PC_7, PB_6, SPI(PA_7, PA_6, PA_5));//nucleoF411  irq, nHib, cs, mosi, miso, sck
+#elif (THIS_BOARD == ST_MBED_NUCLEOF401)
+class cc3100 _cc3100module(PA_8, PA_9, PC_7, PB_6, SPI(PA_7, PA_6, PA_5));//nucleoF401  irq, nHib, cs, mosi, miso, sck
+#elif (THIS_BOARD == EA_MBED_LPC4088)
+class cc3100 _cc3100module(p14, p15, p9, p10, p8, SPI(p5, p6, p7));//LPC4088  irq, nHib, cs, mosi, miso, sck
+#elif (THIS_BOARD == LPCXpresso4337)
+class cc3100 _cc3100module(P2_12, P2_9, P2_2, P3_5, P1_2, SPI(P1_4, P1_3, PF_4));//LPCXpresso4337  irq, nHib, cs, mosi, miso, sck
+#endif
+
+namespace mbed_mqtt {
+
+#ifndef CFG_SL_CL_BUF_LEN
+#define BUF_LEN          1024 /*Buffer length*/
+#else
+#define BUF_LEN          CFG_SL_CL_BUF_LEN
+#endif
+
+#ifndef CFG_SL_CL_MAX_MQP
+#define MAX_MQP          2 /* # of buffers */
+#else
+#define MAX_MQP          CFG_SL_CL_MAX_MQP
+#endif
+
+#ifndef CFG_SL_CL_STACK
+#define OSI_STACK_SIZE 2048
+#else
+#define OSI_STACK_SIZE CFG_SL_CL_STACK
+#endif
+//*****************************************************************************
+// global variables used
+//*****************************************************************************
+
+
+struct sl_client_ctx {
+        /* Client information details */
+        char *client_id;
+        char *usr_name;
+        char *usr_pwd;
+        SlMqttWill_t mqtt_will;
+
+        /* Client management variables */
+        bool in_use;
+        char awaited_ack;
+        uint16_t conn_ack;
+
+        /*Sync Object used to ensure single inflight message -
+          used in blocking mode to wait on ack*/
+        _SlSyncObj_t ack_sync_obj;
+        /* Suback QoS pointer to store passed by application */
+        char *suback_qos;
+
+        /* Application information */
+        void* app_hndl;
+        SlMqttClientCbs_t app_cbs;
+
+        /* Library information */
+        void *cli_hndl;
+        bool blocking_send;
+};
+
+#ifndef CFG_CL_MQTT_CTXS
+#define MAX_SIMULTANEOUS_SERVER_CONN 4
+#else
+#define MAX_SIMULTANEOUS_SERVER_CONN CFG_MQTT_CL_CTXS
+#endif
+
+
+static struct sl_client_ctx sl_cli_ctx[MAX_SIMULTANEOUS_SERVER_CONN];
+
+/* Lock Object to be passed to the MQTT lib */
+_SlLockObj_t mqtt_lib_lockobj;
+
+/* Creating a pool of MQTT coonstructs that can be used by the MQTT Lib
+     mqp_vec =>pointer to a pool of the mqtt packet constructs
+      buf_vec => the buffer area which is attached with each of mqp_vec*/
+DEFINE_MQP_BUF_VEC(MAX_MQP, mqp_vec, BUF_LEN, buf_vec);
+
+/*Task Priority and Response Time*/
+uint32_t g_wait_secs;
+
+bool g_multi_srvr_conn = false;
+
+/* Receive task handle */
+OsiTaskHandle g_rx_task_hndl;
+
+/* Synchronization object used between the Rx and App task */
+_SlSyncObj_t g_rx_tx_sync_obj;
+
+#define RX_TX_SIGNAL_WAIT()     _cc3100module._nonos.sl_SyncObjWait(&g_rx_tx_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_WAIT_FOREVER)
+#define RX_TX_SIGNAL_POST()     _cc3100module._nonos.sl_SyncObjSignal(&g_rx_tx_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE)
+
+/* MQTT Quality of Service */
+static const enum mqtt_qos qos[] ={
+        MQTT_QOS0,
+        MQTT_QOS1,
+        MQTT_QOS2
+};
+
+/* Network Services specific to cc3200*/
+struct device_net_services net={&comm_open,&tcp_send,&tcp_recv,&send_dest,&recv_from,&comm_close,
+                                        &tcp_listen,&tcp_accept,&tcp_select,&rtc_secs};
+        
+/* Defining Retain Flag. Getting retain flag bit from fixed header*/
+#define MQP_PUB_RETAIN(mqp)    ( ((mqp->fh_byte1 ) & 0x01)?true:false )
+
+/*Defining Duplicate flag. Getting Duplicate flag bit from fixed header*/
+#define MQP_PUB_DUP(mqp)        (((mqp->fh_byte1>>3) & 0x01)?true:false )
+
+/*Defining QOS value. Getting QOS value from fixed header*/
+#define MQP_PUB_QOS(mqp)        ((mqp->fh_byte1 >>1)  & 0x03)
+
+#define ACK_RX_SIGNAL_WAIT(ack_sync_obj)     _cc3100module._nonos.sl_SyncObjWait(ack_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_WAIT_FOREVER)    
+#define ACK_RX_SIGNAL_POST(ack_sync_obj)     _cc3100module._nonos.sl_SyncObjSignal(ack_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE)
+
+#define STR2UTF_CONV(utf_s, str) {utf_s.buffer = (char *)str; utf_s.length = strlen(str);}
+
+/*Defining Event Messages*/
+#define MQTT_ACK "Ack Received from server"
+#define MQTT_ERROR "Connection Lost with broker"
+
+//*****************************************************************************
+// process_notify_ack_cb 
+//*****************************************************************************
+static void 
+process_notify_ack_cb(void *app, uint8_t msg_type, uint16_t msg_id, uint8_t *buf, uint32_t len)
+{
+
+        struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
+        int32_t loopcnt;
+
+        switch(msg_type) {
+
+        case MQTT_CONNACK:
+                client_ctx->conn_ack = ((buf[0] << 8) | buf[1]);
+                client_ctx->awaited_ack = 0;
+                ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
+        break;
+        case MQTT_SUBACK:
+                if(true == client_ctx->blocking_send) {
+                        for(loopcnt = 0; loopcnt < len; loopcnt++) {
+                                client_ctx->suback_qos[loopcnt] = buf[loopcnt];
+                        }
+                                
+                        if(client_ctx->awaited_ack == msg_type) {
+                                client_ctx->awaited_ack = 0;
+                                ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
+                        }
+                } else {
+                        client_ctx->app_cbs.sl_ExtLib_MqttEvent(
+                                client_ctx->app_hndl,
+                                SL_MQTT_CL_EVT_SUBACK, buf, len);
+                }
+        break;
+        /* Error returns --- TBD */
+        default:
+                if(true == client_ctx->blocking_send) {
+                        if(client_ctx->awaited_ack == msg_type) {
+                                client_ctx->awaited_ack = 0;
+                                ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
+                        }
+                } else {
+                        client_ctx->app_cbs.sl_ExtLib_MqttEvent(
+                                client_ctx->app_hndl,
+                                msg_type, MQTT_ACK, strlen(MQTT_ACK));
+                }
+        break;
+
+        }
+       
+        return ;
+}
+
+//*****************************************************************************
+// process_publish_rx_cb 
+//*****************************************************************************
+static bool 
+process_publish_rx_cb(void *app, bool dup, enum mqtt_qos qos, bool retain, 
+                   struct mqtt_packet *mqp)
+{
+        struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
+        /*If incoming message is a publish message from broker */
+        /* Invokes the event handler with topic,topic length,payload and 
+           payload length values*/
+        client_ctx->app_cbs.sl_ExtLib_MqttRecv(client_ctx->app_hndl,
+                               (char const*)MQP_PUB_TOP_BUF(mqp),
+                               MQP_PUB_TOP_LEN(mqp),MQP_PUB_PAY_BUF(mqp),
+                               MQP_PUB_PAY_LEN(mqp), dup,
+                               MQP_PUB_QOS(mqp), retain);
+        return true;
+}
+
+//*****************************************************************************
+// process_disconn_cb
+//*****************************************************************************
+static void process_disconn_cb(void *app, int32_t cause)
+{
+        struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
+
+        if(client_ctx->awaited_ack != 0) {
+                client_ctx->awaited_ack = MQTT_DISCONNECT;
+                ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
+        } else {
+                if(false == client_ctx->blocking_send) {
+                        /* Invoke the disconnect callback */
+                        client_ctx->app_cbs.sl_ExtLib_MqttDisconn(
+                                               client_ctx->app_hndl);
+                }
+        }
+        return;
+}
+
+
+//*****************************************************************************
+// Receive Task. Invokes in the context of a task 
+//*****************************************************************************
+void 
+VMqttRecvTask(void *pArgs)  
+{
+
+        int32_t retval;
+        do {
+                if(false == g_multi_srvr_conn) {
+                        /* wait on broker connection */
+                        RX_TX_SIGNAL_WAIT();
+                        while(1) {
+                                /* Forcing to use the index 0 always - caution */
+                                retval = mqtt_client_ctx_run(
+                                                sl_cli_ctx[0].cli_hndl, 
+                                                g_wait_secs);
+                                if((retval < 0) && 
+                                        (retval != MQP_ERR_TIMEOUT)) {
+                                        break;
+                                }
+                        }
+                } else {
+                        mqtt_client_run(g_wait_secs);
+                }
+        }while(1);
+}
+
+static struct sl_client_ctx *get_available_clictx_mem()
+{
+        int32_t loopcnt, max_store;
+
+        if(false == g_multi_srvr_conn) {
+                max_store = 1;
+        } else {
+                max_store = MAX_SIMULTANEOUS_SERVER_CONN;
+        }
+
+        for(loopcnt = 0; loopcnt < max_store; loopcnt++) {
+                if(false == sl_cli_ctx[loopcnt].in_use) {
+                        sl_cli_ctx[loopcnt].in_use = true;
+                        return(&(sl_cli_ctx[loopcnt]));
+                }
+        }
+
+        return NULL;
+}
+
+//*****************************************************************************
+// sl_ExtLib_MqttClientCtxCreate
+//*****************************************************************************
+void *sl_ExtLib_MqttClientCtxCreate(const SlMqttClientCtxCfg_t *ctx_cfg,
+                                      const SlMqttClientCbs_t *msg_cbs,
+                                      void *app_hndl)
+{
+
+        struct sl_client_ctx *client_ctx_ptr;
+        struct mqtt_client_ctx_cfg lib_ctx_cfg;
+        struct mqtt_client_ctx_cbs lib_cli_cbs;
+        struct secure_conn lib_nw_security;
+        int32_t retval;
+       
+        /* Get a client context storage area */
+        client_ctx_ptr = get_available_clictx_mem();
+        if(client_ctx_ptr == NULL) {
+                return NULL;
+        }
+        
+        /* Create the sync object to signal on arrival of ACK packet */
+        _cc3100module._nonos.sl_SyncObjCreate(&client_ctx_ptr->ack_sync_obj, "AckSyncObject");
+        _cc3100module._nonos.sl_SyncObjWait(&client_ctx_ptr->ack_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_NO_WAIT);
+        
+        /* Store the application handle */
+        client_ctx_ptr->app_hndl = app_hndl;
+        /* Initialize the ACK awaited */
+        client_ctx_ptr->awaited_ack = 0;
+        
+        /* Store the application callbacks, to be invoked later */
+        client_ctx_ptr->app_cbs.sl_ExtLib_MqttRecv = \
+                                        msg_cbs->sl_ExtLib_MqttRecv;
+        client_ctx_ptr->app_cbs.sl_ExtLib_MqttEvent = \
+                                        msg_cbs->sl_ExtLib_MqttEvent;
+        client_ctx_ptr->app_cbs.sl_ExtLib_MqttDisconn = \
+                                        msg_cbs->sl_ExtLib_MqttDisconn;
+
+        /* Initialize the client lib */
+        lib_ctx_cfg.config_opts = ctx_cfg->mqtt_mode31;
+        
+        if(true == g_multi_srvr_conn) {
+                lib_ctx_cfg.config_opts |=
+                        (MQTT_CFG_APP_HAS_RTSK | MQTT_CFG_MK_GROUP_CTX);
+        } else {
+                lib_ctx_cfg.config_opts |= MQTT_CFG_APP_HAS_RTSK;
+        }
+        
+        /* get the network connection options */
+        client_ctx_ptr->blocking_send = ctx_cfg->blocking_send;
+        lib_ctx_cfg.nwconn_opts = ctx_cfg->server_info.netconn_flags;
+        lib_ctx_cfg.nwconn_opts |= DEV_NETCONN_OPT_TCP;
+        lib_ctx_cfg.server_addr = (char*)ctx_cfg->server_info.server_addr;
+        lib_ctx_cfg.port_number = ctx_cfg->server_info.port_number;
+        /*initialize secure socket parameters */
+        if(ctx_cfg->server_info.netconn_flags & SL_MQTT_NETCONN_SEC) {
+        		lib_ctx_cfg.nw_security = &lib_nw_security;
+                /* initialize secure socket parameters */
+                lib_ctx_cfg.nw_security->cipher = 
+                        (void*)&(ctx_cfg->server_info.cipher);
+                lib_ctx_cfg.nw_security->method = 
+                        (void*)&(ctx_cfg->server_info.method);
+                lib_ctx_cfg.nw_security->n_file = 
+                        ctx_cfg->server_info.n_files;
+                lib_ctx_cfg.nw_security->files = 
+                        (char**)ctx_cfg->server_info.secure_files;
+        }
+        else {
+                lib_ctx_cfg.nw_security=NULL;
+        }
+
+        lib_cli_cbs.publish_rx = process_publish_rx_cb;
+        lib_cli_cbs.ack_notify = process_notify_ack_cb;
+        lib_cli_cbs.disconn_cb = process_disconn_cb;
+        
+        retval = mqtt_client_ctx_create(&lib_ctx_cfg,
+                                        &lib_cli_cbs, client_ctx_ptr,
+                                        &client_ctx_ptr->cli_hndl);
+        
+        if(retval < 0) {
+                Uart_Write((uint8_t*)"mqtt_client_ctx_create failed\r\n");
+                client_ctx_ptr->in_use = false;
+                return NULL;
+        }
+        
+        return (void*)client_ctx_ptr;
+}
+
+//*****************************************************************************
+// sl_ExtLib_MqttClientCtxDelete
+//*****************************************************************************
+int32_t sl_ExtLib_MqttClientCtxDelete(void *cli_ctx)
+{
+        struct sl_client_ctx *client_ctx=(struct sl_client_ctx *)cli_ctx;
+        int32_t retval;
+
+        retval = mqtt_client_ctx_delete(client_ctx->cli_hndl);
+        if(retval >= 0) {
+                /* Check for more paramaters --- TBD */
+                _cc3100module._nonos.sl_SyncObjDelete(&client_ctx->ack_sync_obj, 0);
+                /* Free up the context */
+                memset(client_ctx, 0, sizeof(struct sl_client_ctx));
+        }
+
+        return (retval < 0)? -1: 0;
+}
+
+void
+mutex_lockup(void *mqtt_lib_lock)
+{
+        _cc3100module._nonos.sl_LockObjLock((_SlLockObj_t*)mqtt_lib_lock, SL_OS_WAIT_FOREVER, NON_OS_LOCK_OBJ_LOCK_VALUE, SL_OS_WAIT_FOREVER);
+}
+
+void
+mutex_unlock(void *mqtt_lib_lock)
+{
+        _cc3100module._nonos.sl_LockObjUnlock((_SlLockObj_t*)mqtt_lib_lock, NON_OS_LOCK_OBJ_UNLOCK_VALUE);
+}
+
+//*****************************************************************************
+// sl_ExtLib_MqttClientInit
+//*****************************************************************************
+int32_t sl_ExtLib_MqttClientInit(const SlMqttClientLibCfg_t  *cfg)
+{
+
+        struct mqtt_client_lib_cfg lib_cfg;
+        /* Initialize the control variables */
+        memset(sl_cli_ctx, 0, sizeof(sl_cli_ctx));
+
+        /* Setup the MQTT client lib configurations */
+        lib_cfg.loopback_port = cfg->loopback_port;
+        /* Initializing the sync object */
+        g_rx_tx_sync_obj = 0;
+
+        if(cfg->loopback_port) {
+                g_multi_srvr_conn = true;
+                lib_cfg.grp_uses_cbfn = true;   /* Does use the group callback func */
+        } else {
+                g_multi_srvr_conn = false;
+                lib_cfg.grp_uses_cbfn = false;   /* Doesnt use the group callback func */
+                /* Create the sync object between Rx and App tasks */
+                _cc3100module._nonos.sl_SyncObjCreate(&g_rx_tx_sync_obj, "RxTxSyncObject");
+                _cc3100module._nonos.sl_SyncObjWait(&g_rx_tx_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_NO_WAIT);
+        }
+        
+        g_wait_secs = cfg->resp_time;
+        /* Setup the mutex operations */
+        _cc3100module._nonos.sl_LockObjCreate(&mqtt_lib_lockobj,"MQTT Lock");
+        lib_cfg.mutex = (void *)&mqtt_lib_lockobj;
+        lib_cfg.mutex_lockin = mutex_lockup;
+        lib_cfg.mutex_unlock = mutex_unlock;
+        /* hooking DBG print function */
+        lib_cfg.debug_printf = cfg->dbg_print;
+        /* Initialize client library */
+        if(mqtt_client_lib_init(&lib_cfg) < 0) {
+                Uart_Write((uint8_t*)"mqtt_client_lib_init failed\r\n");
+                return -1;
+        }
+
+        /* provide MQTT Lib information to create a pool of MQTT constructs. */
+        mqtt_client_buffers_register(MAX_MQP,mqp_vec,BUF_LEN,&buf_vec[0][0]);
+        /* Register network services speicific to CC3200 */
+        mqtt_client_net_svc_register(&net);
+        /* start the receive task */
+        osi_TaskCreate( VMqttRecvTask, 
+                (const signed char *) "MQTTRecv",
+                OSI_STACK_SIZE,
+                NULL, 
+                cfg->rx_tsk_priority, &g_rx_task_hndl );
+                
+        return 0;
+}
+
+//*****************************************************************************
+// sl_ExtLib_MqttClientExit
+//*****************************************************************************
+int32_t 
+sl_ExtLib_MqttClientExit()
+{
+        int32_t retval;
+
+        /* Deinitialize the MQTT client lib */
+        retval = mqtt_client_lib_exit();
+        if(retval >= 0) {
+                if(mqtt_lib_lockobj != NULL)
+                {
+                        /* Delete the MQTT lib lock object */
+                        _cc3100module._nonos.sl_LockObjDelete(&mqtt_lib_lockobj, 0);
+                }
+                if(g_rx_task_hndl != NULL)
+                {
+                        /* Delete the Rx Task */
+                        osi_TaskDelete(&g_rx_task_hndl);
+                }
+                
+                if(g_rx_tx_sync_obj != NULL)
+                {
+                        /* Delete the Rx-Tx task sync object */
+                        _cc3100module._nonos.sl_SyncObjDelete(&g_rx_tx_sync_obj, 0);
+                }
+                
+                g_rx_task_hndl = NULL;
+                g_rx_tx_sync_obj = NULL;
+                mqtt_lib_lockobj = NULL;
+        }
+        
+        return (retval < 0)? -1: 0;
+}
+
+//*****************************************************************************
+// sl_ExtLib_MqttClientSet
+//*****************************************************************************
+int32_t 
+sl_ExtLib_MqttClientSet(void *app, int32_t param, const void *value, uint32_t len)
+{
+
+        struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
+        switch(param)
+        {
+        case SL_MQTT_PARAM_CLIENT_ID:
+                /* Save the reference to the Client ID */
+                client_ctx->client_id =(char*)value;
+                break;
+
+        case SL_MQTT_PARAM_USER_NAME:
+                /* Save the reference to the Username */
+                client_ctx->usr_name =(char*)value;
+                break;
+
+        case SL_MQTT_PARAM_PASS_WORD:
+                /* Save the reference to the password */
+                client_ctx->usr_pwd =(char*)value;
+                break;
+
+        case SL_MQTT_PARAM_WILL_PARAM:
+                /* Save the reference to will parameters*/
+                client_ctx->mqtt_will = *((SlMqttWill_t*)value);
+                break;
+        default:
+                break;
+        }
+
+        return 0;
+   
+}
+
+//*****************************************************************************
+// sl_ExtLib_MqttClientConnect
+//*****************************************************************************
+int32_t
+sl_ExtLib_MqttClientConnect(void *cli_ctx, bool clean, uint16_t keep_alive_time)
+{ 
+
+    
+        int32_t ret = -1;
+        struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
+
+        /*utf8 strings into which client info will be stored*/
+        struct utf8_string client_id, username, usr_pwd, will_topic, will_msg;
+        struct utf8_string *usrname = NULL, *usrpasswd = NULL, *clientid=NULL;
+
+        /* Provide Client ID,user name and password into MQTT Library */
+        if(client_ctx->client_id != NULL) {
+            STR2UTF_CONV(client_id, client_ctx->client_id);
+            clientid = &client_id;
+        }
+        
+        if(client_ctx->usr_name != NULL) {
+                STR2UTF_CONV(username, client_ctx->usr_name);
+                usrname = &username;
+        }
+        if(client_ctx->usr_pwd != NULL) {
+                STR2UTF_CONV(usr_pwd, client_ctx->usr_pwd);
+                usrpasswd = &usr_pwd;
+        }
+        ret = mqtt_client_ctx_info_register(client_ctx->cli_hndl,
+                                clientid, usrname, usrpasswd);
+        if(ret < 0) {
+                goto mqtt_connect_exit1;
+        }
+        
+        /* Register a will message, if specified, into MQTT Library */
+        if(NULL != client_ctx->mqtt_will.will_topic ) {
+                STR2UTF_CONV(will_topic, client_ctx->mqtt_will.will_topic);
+                STR2UTF_CONV(will_msg, client_ctx->mqtt_will.will_msg);
+                ret = mqtt_client_ctx_will_register(client_ctx->cli_hndl,
+                                        &will_topic, &will_msg,
+                                        qos[client_ctx->mqtt_will.will_qos],
+                                        client_ctx->mqtt_will.retain);
+                if(ret < 0) {
+                        Uart_Write((uint8_t*)"mqtt_client_ctx_will_register fail\r\n");
+                        goto mqtt_connect_exit1;
+                }
+        }
+        
+        client_ctx->awaited_ack = MQTT_CONNACK;
+        client_ctx->conn_ack = 0;
+    
+        /* Connect to the server */
+        ret = mqtt_connect_msg_send(client_ctx->cli_hndl, clean, keep_alive_time);
+        /*Network or Socket Error happens*/
+        if(ret < 0) {
+                Uart_Write((uint8_t*)"mqtt_connect_msg_send failed \r\n");
+                goto mqtt_connect_exit1;
+        }
+
+        if(false == g_multi_srvr_conn) {
+                /* Unblock the receive task here */        
+                RX_TX_SIGNAL_POST();
+        }
+        
+        /* Wait for a CONNACK here */
+        ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
+        
+        if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
+                Uart_Write((uint8_t*)"sl_ExtLib_MqttClientConnect fail\r\n");        
+                ret = -1;
+        } else {
+                ret = (int32_t)client_ctx->conn_ack;
+        }
+
+mqtt_connect_exit1:
+
+        client_ctx->awaited_ack = 0;
+        client_ctx->conn_ack = 0;
+        return (ret < 0)? -1: ret;
+}
+
+//*****************************************************************************
+// sl_ExtLib_MqttClientDisconnect
+//*****************************************************************************
+int32_t
+sl_ExtLib_MqttClientDisconnect(void *cli_ctx)
+{
+        struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
+        int32_t retval;
+
+        if(!(mqtt_client_is_connected(client_ctx->cli_hndl)))
+                return -1;
+
+        client_ctx->awaited_ack = MQTT_DISCONNECT;
+
+        /* send the disconnect command. */
+        retval = mqtt_disconn_send(client_ctx->cli_hndl);
+        
+        if(retval >= 0) {
+                /* wait on Rx task to acknowledge */
+                ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
+        }
+        client_ctx->awaited_ack = 0;
+
+        return (retval < 0)? -1: 0;
+}
+
+
+//*****************************************************************************
+// sl_ExtLib_MqttClientSub
+//*****************************************************************************
+int32_t 
+sl_ExtLib_MqttClientSub(void *cli_ctx, char* const *topics,
+                        uint8_t *qos_level, int32_t count)
+{
+#define MAX_SIMULTANEOUS_SUB_TOPICS 4
+
+        int32_t ret = -1, i;
+
+        struct utf8_strqos qos_topics[MAX_SIMULTANEOUS_SUB_TOPICS];
+        struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
+
+        if(!(mqtt_client_is_connected(client_ctx->cli_hndl)) || (count > MAX_SIMULTANEOUS_SUB_TOPICS)) {
+                goto mqtt_sub_exit1;  /* client not connected*/
+        }
+
+        for(i = 0; i < count; i++) {
+                qos_topics[i].buffer = topics[i];
+                qos_topics[i].qosreq = qos[qos_level[i]];
+                qos_topics[i].length = strlen(topics[i]);
+        }
+
+        /* Set up all variables to receive an ACK for the message to be sent*/
+        if(true == client_ctx->blocking_send) {      
+                /* Update the qos_level to be in-out parameter */
+                client_ctx->suback_qos = (char*)qos_level;
+                client_ctx->awaited_ack = MQTT_SUBACK;
+        }
+
+        /* Send the subscription MQTT message */
+        ret = mqtt_sub_msg_send(client_ctx->cli_hndl, qos_topics, count);
+        if(ret < 0) {
+                goto mqtt_sub_exit1;
+        }
+
+        if(true == client_ctx->blocking_send) {
+                ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
+                if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
+                        ret = -1;
+                }
+        }
+
+mqtt_sub_exit1:
+        client_ctx->awaited_ack = 0;
+        return (ret < 0)? -1: 0;
+}
+
+
+//*****************************************************************************
+// sl_ExtLib_MqttClientUnsub
+//*****************************************************************************
+int32_t
+sl_ExtLib_MqttClientUnsub(void *cli_ctx, char* const *topics, int32_t count)
+{
+#define MAX_SIMULTANEOUS_UNSUB_TOPICS 4
+
+        int32_t ret = -1, i;
+        struct utf8_string unsub_topics[MAX_SIMULTANEOUS_UNSUB_TOPICS];
+        struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
+
+        if(!(mqtt_client_is_connected(client_ctx->cli_hndl)) || (count > MAX_SIMULTANEOUS_UNSUB_TOPICS)) {
+                goto mqtt_unsub_exit1; /* Check client status */
+        }
+
+        for(i = 0; i < count; i++) {
+                STR2UTF_CONV(unsub_topics[i], topics[i]);
+        }
+
+        /* Set up all variables to receive an ACK for the message to be sent*/
+        if(true == client_ctx->blocking_send) {      
+                client_ctx->awaited_ack = MQTT_UNSUBACK;
+        }
+
+        /* Send the unsubscription MQTT message */
+        ret = mqtt_unsub_msg_send(client_ctx->cli_hndl, unsub_topics, count);
+        if(ret < 0) {
+                goto mqtt_unsub_exit1;
+        }
+
+        if(true == client_ctx->blocking_send) {
+                ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
+                if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
+                        ret = -1;
+                }
+        }
+
+mqtt_unsub_exit1:
+        client_ctx->awaited_ack = 0;
+        return (ret < 0)? -1: 0;
+}
+//*****************************************************************************
+// sl_ExtLib_MqttClientSend
+//*****************************************************************************
+int32_t 
+sl_ExtLib_MqttClientSend(void *cli_ctx, const char *topic,
+                          const void *data, int32_t len, 
+                          char qos_level, bool retain)
+{
+
+        int32_t ret = -1;
+        struct utf8_string topic_utf8;
+        struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
+
+        /* Check whether Client is connected or not? */
+        if(!(mqtt_client_is_connected(client_ctx->cli_hndl))) {
+                Uart_Write((uint8_t*)"mqtt client is not connected\n\r");
+                return ret;
+        }
+
+        STR2UTF_CONV(topic_utf8, topic);
+
+        /*Set up all variables to receive an ACK for the message to be sent*/
+        if(MQTT_QOS1 == qos[qos_level]) {
+                client_ctx->awaited_ack = MQTT_PUBACK;
+        } else if(MQTT_QOS2 == qos[qos_level]) {
+                client_ctx->awaited_ack = MQTT_PUBCOMP;
+        }
+        /*publish the message*/
+        ret = mqtt_client_pub_msg_send(client_ctx->cli_hndl, &topic_utf8,
+                                        (const uint8_t*)data, len,
+                                        qos[qos_level], retain);
+        if(ret < 0) {
+                Uart_Write((uint8_t*)"mqtt_client_pub_msg_send failed\n\r");
+                goto mqtt_pub_exit1;
+        }
+
+        if(MQTT_QOS0 != qos[qos_level]) {
+                if(true == client_ctx->blocking_send) {
+                        ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
+                        if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
+                                Uart_Write((uint8_t*)"MQTT_DISCONNECT\n\r");
+                                ret = -1;
+                        }
+                }
+        }
+
+mqtt_pub_exit1:
+        client_ctx->awaited_ack = 0;
+        return (ret < 0)? -1: 0;
+}
+
+}//namespace mbed_mqtt 
+