Updated revision that uses TI's MQTT library and FreeRTOS.

Dependencies:   mbed client server

Fork of cc3100_Test_mqtt_CM3 by David Fletcher

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers sl_mqtt_client.cpp Source File

sl_mqtt_client.cpp

00001 /******************************************************************************
00002 *
00003 *   Copyright (C) 2014 Texas Instruments Incorporated
00004 *
00005 *   All rights reserved. Property of Texas Instruments Incorporated.
00006 *   Restricted rights to use, duplicate or disclose this code are
00007 *   granted through contract.
00008 *
00009 *   The program may not be used without the written permission of
00010 *   Texas Instruments Incorporated or against the terms and conditions
00011 *   stipulated in the agreement under which this program has been supplied,
00012 *   and under no circumstances can it be used with non-TI connectivity device.
00013 *
00014 ******************************************************************************/
00015 
00016 #include "sl_mqtt_client.h"
00017 #include "cc31xx_sl_net.h"
00018 //#include "cc3100_nonos.h"
00019 #include "myBoardInit.h"
00020 #include "cli_uart.h"
00021 #include "osi.h"
00022 
00023 using namespace mbed_cc3100;
00024 
/* Global CC3100 driver instance used by every sync/lock helper in this file.
 * The constructor arguments are chosen at compile time from THIS_BOARD; the
 * board variants pass either three or five pin names ahead of the SPI object.
 * NOTE(review): the trailing "irq, nHib, cs, mosi, miso, sck" remarks name
 * only three pins before the SPI pins; the meaning of the two extra leading
 * pins on the five-pin variants is not visible here - confirm against the
 * cc3100 class constructor. */
#if (THIS_BOARD == MBED_BOARD_LPC1768)
//cc3100 _cc3100module(p9, p10, p8, SPI(p11, p12, p13));//LPC1768  irq, nHib, cs, mosi, miso, sck
cc3100 _cc3100module(p16, p17, p9, p10, p8, SPI(p5, p6, p7));//LPC1768  irq, nHib, cs, mosi, miso, sck
#elif (THIS_BOARD == Seeed_Arch_Max)
class cc3100 _cc3100module(PE_5, PE_4, PE_6, SPI(PB_5, PB_4, PB_3));
#elif (THIS_BOARD == ST_MBED_NUCLEOF103)
class cc3100 _cc3100module(PA_9, PC_7, PB_6, SPI(PA_7, PA_6, PA_5));//nucleoF103  irq, nHib, cs, mosi, miso, sck
#elif (THIS_BOARD == ST_MBED_NUCLEOF411)
class cc3100 _cc3100module(PA_9, PC_7, PB_6, SPI(PA_7, PA_6, PA_5));//nucleoF411  irq, nHib, cs, mosi, miso, sck
#elif (THIS_BOARD == ST_MBED_NUCLEOF401)
class cc3100 _cc3100module(PA_8, PA_9, PC_7, PB_6, SPI(PA_7, PA_6, PA_5));//nucleoF401  irq, nHib, cs, mosi, miso, sck
#elif (THIS_BOARD == EA_MBED_LPC4088)
class cc3100 _cc3100module(p14, p15, p9, p10, p8, SPI(p5, p6, p7));//LPC4088  irq, nHib, cs, mosi, miso, sck
#elif (THIS_BOARD == LPCXpresso4337)
class cc3100 _cc3100module(P2_12, P2_9, P2_2, P3_5, P1_2, SPI(P1_4, P1_3, PF_4));//LPCXpresso4337  irq, nHib, cs, mosi, miso, sck
#endif
00041 
00042 namespace mbed_mqtt {
00043 
/* Build-time tunables. Each one takes its value from the matching CFG_SL_CL_*
 * macro when the build defines it and otherwise falls back to the default
 * shown here. */

/* Size, in bytes, of each packet buffer handed to the MQTT library. */
#ifndef CFG_SL_CL_BUF_LEN
#define BUF_LEN          1024 /*Buffer length*/
#else
#define BUF_LEN          CFG_SL_CL_BUF_LEN
#endif

/* Number of packet buffers in the pool registered with the MQTT library. */
#ifndef CFG_SL_CL_MAX_MQP
#define MAX_MQP          2 /* # of buffers */
#else
#define MAX_MQP          CFG_SL_CL_MAX_MQP
#endif

/* Stack size passed to osi_TaskCreate() for the receive task. */
#ifndef CFG_SL_CL_STACK
#define OSI_STACK_SIZE 2048
#else
#define OSI_STACK_SIZE CFG_SL_CL_STACK
#endif
00061 //*****************************************************************************
00062 // global variables used
00063 //*****************************************************************************
00064 
00065 
/* Per-connection bookkeeping kept by this wrapper. A pointer to one of these
 * is what the sl_ExtLib_MqttClient* functions receive as their opaque
 * cli_ctx / app argument. */
struct sl_client_ctx {
        /* Client information details */
        /* These three are stored by reference (see sl_ExtLib_MqttClientSet);
           the strings are not copied. */
        char *client_id;
        char *usr_name;
        char *usr_pwd;
        /* Will parameters, copied by value in sl_ExtLib_MqttClientSet */
        SlMqttWill_t mqtt_will;

        /* Client management variables */
        /* Slot is claimed; set by get_available_clictx_mem() */
        bool in_use;
        /* MQTT message type the API caller currently waits for; 0 when idle,
           MQTT_DISCONNECT when the wait was ended by a disconnect */
        char awaited_ack;
        /* First two bytes of the last CONNACK buffer, combined big-endian */
        uint16_t conn_ack;

        /*Sync Object used to ensure single inflight message -
          used in blocking mode to wait on ack*/
        _SlSyncObj_t ack_sync_obj;
        /* Suback QoS pointer to store passed by application */
        char *suback_qos;

        /* Application information */
        /* Opaque handle handed back to the application callbacks */
        void* app_hndl;
        SlMqttClientCbs_t app_cbs;

        /* Library information */
        /* Handle produced by mqtt_client_ctx_create() */
        void *cli_hndl;
        /* true: API calls wait for the ACK; false: ACKs go to the event callback */
        bool blocking_send;
};
00092 
/* Number of client contexts kept in sl_cli_ctx[]. Defaults to 4 and can be
 * overridden by defining CFG_CL_MQTT_CTXS at build time. The override branch
 * previously expanded a differently spelled macro (CFG_MQTT_CL_CTXS) than the
 * one the guard tests, so defining the guarded macro produced an undefined
 * identifier instead of the requested value. */
#ifndef CFG_CL_MQTT_CTXS
#define MAX_SIMULTANEOUS_SERVER_CONN 4
#else
#define MAX_SIMULTANEOUS_SERVER_CONN CFG_CL_MQTT_CTXS
#endif
00098 
00099 
/* Storage for every client context this wrapper can hand out. */
static struct sl_client_ctx sl_cli_ctx[MAX_SIMULTANEOUS_SERVER_CONN];

/* Lock Object to be passed to the MQTT lib */
_SlLockObj_t mqtt_lib_lockobj;

/* Creating a pool of MQTT constructs that can be used by the MQTT Lib
     mqp_vec => pointer to a pool of the mqtt packet constructs
     buf_vec => the buffer area which is attached with each of mqp_vec */
DEFINE_MQP_BUF_VEC(MAX_MQP, mqp_vec, BUF_LEN, buf_vec);

/* Wait time handed to the library run calls in the receive task; loaded from
   cfg->resp_time in sl_ExtLib_MqttClientInit() */
uint32_t g_wait_secs;

/* true when the library was initialised with a non-zero loopback port */
bool g_multi_srvr_conn = false;

/* Receive task handle */
OsiTaskHandle g_rx_task_hndl;

/* Synchronization object used between the Rx and App task */
_SlSyncObj_t g_rx_tx_sync_obj;

/* Block on / signal g_rx_tx_sync_obj; the wait uses SL_OS_WAIT_FOREVER. */
#define RX_TX_SIGNAL_WAIT()     _cc3100module._nonos.sl_SyncObjWait(&g_rx_tx_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_WAIT_FOREVER)
#define RX_TX_SIGNAL_POST()     _cc3100module._nonos.sl_SyncObjSignal(&g_rx_tx_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE)
00123 
/* MQTT Quality of Service */
/* Lookup table: API-level QoS number (0..2) -> library enum value. Callers
   index it directly, so the index must stay within 0..2. */
static const enum mqtt_qos qos[] ={
        MQTT_QOS0,
        MQTT_QOS1,
        MQTT_QOS2
};

/* Network service entry points registered with the MQTT library through
   mqtt_client_net_svc_register().
   NOTE(review): the original remark called these "specific to cc3200" while
   this file drives a cc3100 object - confirm which device is meant. */
struct device_net_services net={&comm_open,&tcp_send,&tcp_recv,&send_dest,&recv_from,&comm_close,
                                        &tcp_listen,&tcp_accept,&tcp_select,&rtc_secs};
00134         
/* Helper macros. Every macro argument is parenthesised so that expressions
 * such as `&pkt` or `ctx + 1` can be passed safely, and the statement-like
 * macro is wrapped in do { } while(0) so it behaves as one statement inside
 * if/else. */

/* Retain flag: bit 0 of the fixed-header byte. */
#define MQP_PUB_RETAIN(mqp)    (((((mqp)->fh_byte1)) & 0x01) ? true : false)

/* Duplicate flag: bit 3 of the fixed-header byte. */
#define MQP_PUB_DUP(mqp)        (((((mqp)->fh_byte1) >> 3) & 0x01) ? true : false)

/* QoS value: bits 1..2 of the fixed-header byte. */
#define MQP_PUB_QOS(mqp)        ((((mqp)->fh_byte1) >> 1) & 0x03)

/* Block on / signal a per-context ACK sync object; the wait uses
   SL_OS_WAIT_FOREVER. */
#define ACK_RX_SIGNAL_WAIT(ack_sync_obj)     _cc3100module._nonos.sl_SyncObjWait((ack_sync_obj), NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_WAIT_FOREVER)
#define ACK_RX_SIGNAL_POST(ack_sync_obj)     _cc3100module._nonos.sl_SyncObjSignal((ack_sync_obj), NON_OS_SYNC_OBJ_SIGNAL_VALUE)

/* Point a utf8 string descriptor at a NUL-terminated C string (no copy).
   `str` is evaluated twice and must not be NULL. */
#define STR2UTF_CONV(utf_s, str) do { (utf_s).buffer = (char *)(str); (utf_s).length = strlen(str); } while(0)

/*Defining Event Messages*/
#define MQTT_ACK "Ack Received from server"
#define MQTT_ERROR "Connection Lost with broker"
00152 
00153 //*****************************************************************************
00154 // process_notify_ack_cb 
00155 //*****************************************************************************
00156 static void 
00157 process_notify_ack_cb(void *app, uint8_t msg_type, uint16_t msg_id, uint8_t *buf, uint32_t len)
00158 {
00159 
00160         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
00161         int32_t loopcnt;
00162 
00163         switch(msg_type) {
00164 
00165         case MQTT_CONNACK:
00166                 client_ctx->conn_ack = ((buf[0] << 8) | buf[1]);
00167                 client_ctx->awaited_ack = 0;
00168                 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
00169         break;
00170         case MQTT_SUBACK:
00171                 if(true == client_ctx->blocking_send) {
00172                         for(loopcnt = 0; loopcnt < len; loopcnt++) {
00173                                 client_ctx->suback_qos[loopcnt] = buf[loopcnt];
00174                         }
00175                                 
00176                         if(client_ctx->awaited_ack == msg_type) {
00177                                 client_ctx->awaited_ack = 0;
00178                                 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
00179                         }
00180                 } else {
00181                         client_ctx->app_cbs.sl_ExtLib_MqttEvent(
00182                                 client_ctx->app_hndl,
00183                                 SL_MQTT_CL_EVT_SUBACK, buf, len);
00184                 }
00185         break;
00186         /* Error returns --- TBD */
00187         default:
00188                 if(true == client_ctx->blocking_send) {
00189                         if(client_ctx->awaited_ack == msg_type) {
00190                                 client_ctx->awaited_ack = 0;
00191                                 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
00192                         }
00193                 } else {
00194                         client_ctx->app_cbs.sl_ExtLib_MqttEvent(
00195                                 client_ctx->app_hndl,
00196                                 msg_type, MQTT_ACK, strlen(MQTT_ACK));
00197                 }
00198         break;
00199 
00200         }
00201        
00202         return ;
00203 }
00204 
00205 //*****************************************************************************
00206 // process_publish_rx_cb 
00207 //*****************************************************************************
00208 static bool 
00209 process_publish_rx_cb(void *app, bool dup, enum mqtt_qos qos, bool retain, 
00210                    struct mqtt_packet *mqp)
00211 {
00212         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
00213         /*If incoming message is a publish message from broker */
00214         /* Invokes the event handler with topic,topic length,payload and 
00215            payload length values*/
00216         client_ctx->app_cbs.sl_ExtLib_MqttRecv(client_ctx->app_hndl,
00217                                (char const*)MQP_PUB_TOP_BUF(mqp),
00218                                MQP_PUB_TOP_LEN(mqp),MQP_PUB_PAY_BUF(mqp),
00219                                MQP_PUB_PAY_LEN(mqp), dup,
00220                                MQP_PUB_QOS(mqp), retain);
00221         return true;
00222 }
00223 
00224 //*****************************************************************************
00225 // process_disconn_cb
00226 //*****************************************************************************
00227 static void process_disconn_cb(void *app, int32_t cause)
00228 {
00229         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
00230 
00231         if(client_ctx->awaited_ack != 0) {
00232                 client_ctx->awaited_ack = MQTT_DISCONNECT;
00233                 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj);
00234         } else {
00235                 if(false == client_ctx->blocking_send) {
00236                         /* Invoke the disconnect callback */
00237                         client_ctx->app_cbs.sl_ExtLib_MqttDisconn(
00238                                                client_ctx->app_hndl);
00239                 }
00240         }
00241         return;
00242 }
00243 
00244 
00245 //*****************************************************************************
00246 // Receive Task. Invokes in the context of a task 
00247 //*****************************************************************************
00248 void 
00249 VMqttRecvTask(void *pArgs)  
00250 {
00251 
00252         int32_t retval;
00253         do {
00254                 if(false == g_multi_srvr_conn) {
00255                         /* wait on broker connection */
00256                         RX_TX_SIGNAL_WAIT();
00257                         while(1) {
00258                                 /* Forcing to use the index 0 always - caution */
00259                                 retval = mqtt_client_ctx_run(
00260                                                 sl_cli_ctx[0].cli_hndl, 
00261                                                 g_wait_secs);
00262                                 if((retval < 0) && 
00263                                         (retval != MQP_ERR_TIMEOUT)) {
00264                                         break;
00265                                 }
00266                         }
00267                 } else {
00268                         mqtt_client_run(g_wait_secs);
00269                 }
00270         }while(1);
00271 }
00272 
00273 static struct sl_client_ctx *get_available_clictx_mem()
00274 {
00275         int32_t loopcnt, max_store;
00276 
00277         if(false == g_multi_srvr_conn) {
00278                 max_store = 1;
00279         } else {
00280                 max_store = MAX_SIMULTANEOUS_SERVER_CONN;
00281         }
00282 
00283         for(loopcnt = 0; loopcnt < max_store; loopcnt++) {
00284                 if(false == sl_cli_ctx[loopcnt].in_use) {
00285                         sl_cli_ctx[loopcnt].in_use = true;
00286                         return(&(sl_cli_ctx[loopcnt]));
00287                 }
00288         }
00289 
00290         return NULL;
00291 }
00292 
00293 //*****************************************************************************
00294 // sl_ExtLib_MqttClientCtxCreate
00295 //*****************************************************************************
00296 void *sl_ExtLib_MqttClientCtxCreate(const SlMqttClientCtxCfg_t *ctx_cfg,
00297                                       const SlMqttClientCbs_t *msg_cbs,
00298                                       void *app_hndl)
00299 {
00300 
00301         struct sl_client_ctx *client_ctx_ptr;
00302         struct mqtt_client_ctx_cfg lib_ctx_cfg;
00303         struct mqtt_client_ctx_cbs lib_cli_cbs;
00304         struct secure_conn lib_nw_security;
00305         int32_t retval;
00306        
00307         /* Get a client context storage area */
00308         client_ctx_ptr = get_available_clictx_mem();
00309         if(client_ctx_ptr == NULL) {
00310                 return NULL;
00311         }
00312         
00313         /* Create the sync object to signal on arrival of ACK packet */
00314         _cc3100module._nonos.sl_SyncObjCreate(&client_ctx_ptr->ack_sync_obj, "AckSyncObject");
00315         _cc3100module._nonos.sl_SyncObjWait(&client_ctx_ptr->ack_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_NO_WAIT);
00316         
00317         /* Store the application handle */
00318         client_ctx_ptr->app_hndl = app_hndl;
00319         /* Initialize the ACK awaited */
00320         client_ctx_ptr->awaited_ack = 0;
00321         
00322         /* Store the application callbacks, to be invoked later */
00323         client_ctx_ptr->app_cbs.sl_ExtLib_MqttRecv = \
00324                                         msg_cbs->sl_ExtLib_MqttRecv;
00325         client_ctx_ptr->app_cbs.sl_ExtLib_MqttEvent = \
00326                                         msg_cbs->sl_ExtLib_MqttEvent;
00327         client_ctx_ptr->app_cbs.sl_ExtLib_MqttDisconn = \
00328                                         msg_cbs->sl_ExtLib_MqttDisconn;
00329 
00330         /* Initialize the client lib */
00331         lib_ctx_cfg.config_opts = ctx_cfg->mqtt_mode31;
00332         
00333         if(true == g_multi_srvr_conn) {
00334                 lib_ctx_cfg.config_opts |=
00335                         (MQTT_CFG_APP_HAS_RTSK | MQTT_CFG_MK_GROUP_CTX);
00336         } else {
00337                 lib_ctx_cfg.config_opts |= MQTT_CFG_APP_HAS_RTSK;
00338         }
00339         
00340         /* get the network connection options */
00341         client_ctx_ptr->blocking_send = ctx_cfg->blocking_send;
00342         lib_ctx_cfg.nwconn_opts = ctx_cfg->server_info.netconn_flags;
00343         lib_ctx_cfg.nwconn_opts |= DEV_NETCONN_OPT_TCP;
00344         lib_ctx_cfg.server_addr = (char*)ctx_cfg->server_info.server_addr;
00345         lib_ctx_cfg.port_number = ctx_cfg->server_info.port_number;
00346         /*initialize secure socket parameters */
00347         if(ctx_cfg->server_info.netconn_flags & SL_MQTT_NETCONN_SEC) {
00348                 lib_ctx_cfg.nw_security = &lib_nw_security;
00349                 /* initialize secure socket parameters */
00350                 lib_ctx_cfg.nw_security->cipher = 
00351                         (void*)&(ctx_cfg->server_info.cipher);
00352                 lib_ctx_cfg.nw_security->method = 
00353                         (void*)&(ctx_cfg->server_info.method);
00354                 lib_ctx_cfg.nw_security->n_file = 
00355                         ctx_cfg->server_info.n_files;
00356                 lib_ctx_cfg.nw_security->files = 
00357                         (char**)ctx_cfg->server_info.secure_files;
00358         }
00359         else {
00360                 lib_ctx_cfg.nw_security=NULL;
00361         }
00362 
00363         lib_cli_cbs.publish_rx = process_publish_rx_cb;
00364         lib_cli_cbs.ack_notify = process_notify_ack_cb;
00365         lib_cli_cbs.disconn_cb = process_disconn_cb;
00366         
00367         retval = mqtt_client_ctx_create(&lib_ctx_cfg,
00368                                         &lib_cli_cbs, client_ctx_ptr,
00369                                         &client_ctx_ptr->cli_hndl);
00370         
00371         if(retval < 0) {
00372                 Uart_Write((uint8_t*)"mqtt_client_ctx_create failed\r\n");
00373                 client_ctx_ptr->in_use = false;
00374                 return NULL;
00375         }
00376         
00377         return (void*)client_ctx_ptr;
00378 }
00379 
00380 //*****************************************************************************
00381 // sl_ExtLib_MqttClientCtxDelete
00382 //*****************************************************************************
00383 int32_t sl_ExtLib_MqttClientCtxDelete(void *cli_ctx)
00384 {
00385         struct sl_client_ctx *client_ctx=(struct sl_client_ctx *)cli_ctx;
00386         int32_t retval;
00387 
00388         retval = mqtt_client_ctx_delete(client_ctx->cli_hndl);
00389         if(retval >= 0) {
00390                 /* Check for more paramaters --- TBD */
00391                 _cc3100module._nonos.sl_SyncObjDelete(&client_ctx->ack_sync_obj, 0);
00392                 /* Free up the context */
00393                 memset(client_ctx, 0, sizeof(struct sl_client_ctx));
00394         }
00395 
00396         return (retval < 0)? -1: 0;
00397 }
00398 
/* Lock callback given to the MQTT library (installed as lib_cfg.mutex_lockin).
   mqtt_lib_lock is the pointer registered as lib_cfg.mutex; the call waits
   with SL_OS_WAIT_FOREVER. */
void
mutex_lockup(void *mqtt_lib_lock)
{
        _cc3100module._nonos.sl_LockObjLock((_SlLockObj_t*)mqtt_lib_lock, SL_OS_WAIT_FOREVER, NON_OS_LOCK_OBJ_LOCK_VALUE, SL_OS_WAIT_FOREVER);
}
00404 
/* Unlock callback given to the MQTT library (installed as
   lib_cfg.mutex_unlock); counterpart of mutex_lockup(). */
void
mutex_unlock(void *mqtt_lib_lock)
{
        _cc3100module._nonos.sl_LockObjUnlock((_SlLockObj_t*)mqtt_lib_lock, NON_OS_LOCK_OBJ_UNLOCK_VALUE);
}
00410 
00411 //*****************************************************************************
00412 // sl_ExtLib_MqttClientInit
00413 //*****************************************************************************
00414 int32_t sl_ExtLib_MqttClientInit(const SlMqttClientLibCfg_t  *cfg)
00415 {
00416 
00417         struct mqtt_client_lib_cfg lib_cfg;
00418         /* Initialize the control variables */
00419         memset(sl_cli_ctx, 0, sizeof(sl_cli_ctx));
00420 
00421         /* Setup the MQTT client lib configurations */
00422         lib_cfg.loopback_port = cfg->loopback_port;
00423         /* Initializing the sync object */
00424         g_rx_tx_sync_obj = 0;
00425 
00426         if(cfg->loopback_port) {
00427                 g_multi_srvr_conn = true;
00428                 lib_cfg.grp_uses_cbfn = true;   /* Does use the group callback func */
00429         } else {
00430                 g_multi_srvr_conn = false;
00431                 lib_cfg.grp_uses_cbfn = false;   /* Doesnt use the group callback func */
00432                 /* Create the sync object between Rx and App tasks */
00433                 _cc3100module._nonos.sl_SyncObjCreate(&g_rx_tx_sync_obj, "RxTxSyncObject");
00434                 _cc3100module._nonos.sl_SyncObjWait(&g_rx_tx_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_NO_WAIT);
00435         }
00436         
00437         g_wait_secs = cfg->resp_time;
00438         /* Setup the mutex operations */
00439         _cc3100module._nonos.sl_LockObjCreate(&mqtt_lib_lockobj,"MQTT Lock");
00440         lib_cfg.mutex = (void *)&mqtt_lib_lockobj;
00441         lib_cfg.mutex_lockin = mutex_lockup;
00442         lib_cfg.mutex_unlock = mutex_unlock;
00443         /* hooking DBG print function */
00444         lib_cfg.debug_printf = cfg->dbg_print;
00445         /* Initialize client library */
00446         if(mqtt_client_lib_init(&lib_cfg) < 0) {
00447                 Uart_Write((uint8_t*)"mqtt_client_lib_init failed\r\n");
00448                 return -1;
00449         }
00450 
00451         /* provide MQTT Lib information to create a pool of MQTT constructs. */
00452         mqtt_client_buffers_register(MAX_MQP,mqp_vec,BUF_LEN,&buf_vec[0][0]);
00453         /* Register network services speicific to CC3200 */
00454         mqtt_client_net_svc_register(&net);
00455         /* start the receive task */
00456         osi_TaskCreate( VMqttRecvTask, 
00457                 (const signed char *) "MQTTRecv",
00458                 OSI_STACK_SIZE,
00459                 NULL, 
00460                 cfg->rx_tsk_priority, &g_rx_task_hndl );
00461                 
00462         return 0;
00463 }
00464 
00465 //*****************************************************************************
00466 // sl_ExtLib_MqttClientExit
00467 //*****************************************************************************
00468 int32_t 
00469 sl_ExtLib_MqttClientExit()
00470 {
00471         int32_t retval;
00472 
00473         /* Deinitialize the MQTT client lib */
00474         retval = mqtt_client_lib_exit();
00475         if(retval >= 0) {
00476                 if(mqtt_lib_lockobj != NULL)
00477                 {
00478                         /* Delete the MQTT lib lock object */
00479                         _cc3100module._nonos.sl_LockObjDelete(&mqtt_lib_lockobj, 0);
00480                 }
00481                 if(g_rx_task_hndl != NULL)
00482                 {
00483                         /* Delete the Rx Task */
00484                         osi_TaskDelete(&g_rx_task_hndl);
00485                 }
00486                 
00487                 if(g_rx_tx_sync_obj != NULL)
00488                 {
00489                         /* Delete the Rx-Tx task sync object */
00490                         _cc3100module._nonos.sl_SyncObjDelete(&g_rx_tx_sync_obj, 0);
00491                 }
00492                 
00493                 g_rx_task_hndl = NULL;
00494                 g_rx_tx_sync_obj = NULL;
00495                 mqtt_lib_lockobj = NULL;
00496         }
00497         
00498         return (retval < 0)? -1: 0;
00499 }
00500 
00501 //*****************************************************************************
00502 // sl_ExtLib_MqttClientSet
00503 //*****************************************************************************
00504 int32_t 
00505 sl_ExtLib_MqttClientSet(void *app, int32_t param, const void *value, uint32_t len)
00506 {
00507 
00508         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app;
00509         switch(param)
00510         {
00511         case SL_MQTT_PARAM_CLIENT_ID:
00512                 /* Save the reference to the Client ID */
00513                 client_ctx->client_id =(char*)value;
00514                 break;
00515 
00516         case SL_MQTT_PARAM_USER_NAME:
00517                 /* Save the reference to the Username */
00518                 client_ctx->usr_name =(char*)value;
00519                 break;
00520 
00521         case SL_MQTT_PARAM_PASS_WORD:
00522                 /* Save the reference to the password */
00523                 client_ctx->usr_pwd =(char*)value;
00524                 break;
00525 
00526         case SL_MQTT_PARAM_WILL_PARAM:
00527                 /* Save the reference to will parameters*/
00528                 client_ctx->mqtt_will = *((SlMqttWill_t*)value);
00529                 break;
00530         default:
00531                 break;
00532         }
00533 
00534         return 0;
00535    
00536 }
00537 
00538 //*****************************************************************************
00539 // sl_ExtLib_MqttClientConnect
00540 //*****************************************************************************
00541 int32_t
00542 sl_ExtLib_MqttClientConnect(void *cli_ctx, bool clean, uint16_t keep_alive_time)
00543 { 
00544 
00545     
00546         int32_t ret = -1;
00547         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
00548 
00549         /*utf8 strings into which client info will be stored*/
00550         struct utf8_string client_id, username, usr_pwd, will_topic, will_msg;
00551         struct utf8_string *usrname = NULL, *usrpasswd = NULL, *clientid=NULL;
00552 
00553         /* Provide Client ID,user name and password into MQTT Library */
00554         if(client_ctx->client_id != NULL) {
00555             STR2UTF_CONV(client_id, client_ctx->client_id);
00556             clientid = &client_id;
00557         }
00558         
00559         if(client_ctx->usr_name != NULL) {
00560                 STR2UTF_CONV(username, client_ctx->usr_name);
00561                 usrname = &username;
00562         }
00563         if(client_ctx->usr_pwd != NULL) {
00564                 STR2UTF_CONV(usr_pwd, client_ctx->usr_pwd);
00565                 usrpasswd = &usr_pwd;
00566         }
00567         ret = mqtt_client_ctx_info_register(client_ctx->cli_hndl,
00568                                 clientid, usrname, usrpasswd);
00569         if(ret < 0) {
00570                 goto mqtt_connect_exit1;
00571         }
00572         
00573         /* Register a will message, if specified, into MQTT Library */
00574         if(NULL != client_ctx->mqtt_will.will_topic ) {
00575                 STR2UTF_CONV(will_topic, client_ctx->mqtt_will.will_topic);
00576                 STR2UTF_CONV(will_msg, client_ctx->mqtt_will.will_msg);
00577                 ret = mqtt_client_ctx_will_register(client_ctx->cli_hndl,
00578                                         &will_topic, &will_msg,
00579                                         qos[client_ctx->mqtt_will.will_qos],
00580                                         client_ctx->mqtt_will.retain);
00581                 if(ret < 0) {
00582                         Uart_Write((uint8_t*)"mqtt_client_ctx_will_register fail\r\n");
00583                         goto mqtt_connect_exit1;
00584                 }
00585         }
00586         
00587         client_ctx->awaited_ack = MQTT_CONNACK;
00588         client_ctx->conn_ack = 0;
00589     
00590         /* Connect to the server */
00591         ret = mqtt_connect_msg_send(client_ctx->cli_hndl, clean, keep_alive_time);
00592         /*Network or Socket Error happens*/
00593         if(ret < 0) {
00594                 Uart_Write((uint8_t*)"mqtt_connect_msg_send failed \r\n");
00595                 goto mqtt_connect_exit1;
00596         }
00597 
00598         if(false == g_multi_srvr_conn) {
00599                 /* Unblock the receive task here */        
00600                 RX_TX_SIGNAL_POST();
00601         }
00602         
00603         /* Wait for a CONNACK here */
00604         ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
00605         
00606         if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
00607                 Uart_Write((uint8_t*)"sl_ExtLib_MqttClientConnect fail\r\n");        
00608                 ret = -1;
00609         } else {
00610                 ret = (int32_t)client_ctx->conn_ack;
00611         }
00612 
00613 mqtt_connect_exit1:
00614 
00615         client_ctx->awaited_ack = 0;
00616         client_ctx->conn_ack = 0;
00617         return (ret < 0)? -1: ret;
00618 }
00619 
00620 //*****************************************************************************
00621 // sl_ExtLib_MqttClientDisconnect
00622 //*****************************************************************************
00623 int32_t
00624 sl_ExtLib_MqttClientDisconnect(void *cli_ctx)
00625 {
00626         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
00627         int32_t retval;
00628 
00629         if(!(mqtt_client_is_connected(client_ctx->cli_hndl)))
00630                 return -1;
00631 
00632         client_ctx->awaited_ack = MQTT_DISCONNECT;
00633 
00634         /* send the disconnect command. */
00635         retval = mqtt_disconn_send(client_ctx->cli_hndl);
00636         
00637         if(retval >= 0) {
00638                 /* wait on Rx task to acknowledge */
00639                 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
00640         }
00641         client_ctx->awaited_ack = 0;
00642 
00643         return (retval < 0)? -1: 0;
00644 }
00645 
00646 
00647 //*****************************************************************************
00648 // sl_ExtLib_MqttClientSub
00649 //*****************************************************************************
00650 int32_t 
00651 sl_ExtLib_MqttClientSub(void *cli_ctx, char* const *topics,
00652                         uint8_t *qos_level, int32_t count)
00653 {
00654 #define MAX_SIMULTANEOUS_SUB_TOPICS 4
00655 
00656         int32_t ret = -1, i;
00657 
00658         struct utf8_strqos qos_topics[MAX_SIMULTANEOUS_SUB_TOPICS];
00659         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
00660 
00661         if(!(mqtt_client_is_connected(client_ctx->cli_hndl)) || (count > MAX_SIMULTANEOUS_SUB_TOPICS)) {
00662                 goto mqtt_sub_exit1;  /* client not connected*/
00663         }
00664 
00665         for(i = 0; i < count; i++) {
00666                 qos_topics[i].buffer = topics[i];
00667                 qos_topics[i].qosreq = qos[qos_level[i]];
00668                 qos_topics[i].length = strlen(topics[i]);
00669         }
00670 
00671         /* Set up all variables to receive an ACK for the message to be sent*/
00672         if(true == client_ctx->blocking_send) {      
00673                 /* Update the qos_level to be in-out parameter */
00674                 client_ctx->suback_qos = (char*)qos_level;
00675                 client_ctx->awaited_ack = MQTT_SUBACK;
00676         }
00677 
00678         /* Send the subscription MQTT message */
00679         ret = mqtt_sub_msg_send(client_ctx->cli_hndl, qos_topics, count);
00680         if(ret < 0) {
00681                 goto mqtt_sub_exit1;
00682         }
00683 
00684         if(true == client_ctx->blocking_send) {
00685                 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
00686                 if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
00687                         ret = -1;
00688                 }
00689         }
00690 
00691 mqtt_sub_exit1:
00692         client_ctx->awaited_ack = 0;
00693         return (ret < 0)? -1: 0;
00694 }
00695 
00696 
00697 //*****************************************************************************
00698 // sl_ExtLib_MqttClientUnsub
00699 //*****************************************************************************
00700 int32_t
00701 sl_ExtLib_MqttClientUnsub(void *cli_ctx, char* const *topics, int32_t count)
00702 {
00703 #define MAX_SIMULTANEOUS_UNSUB_TOPICS 4
00704 
00705         int32_t ret = -1, i;
00706         struct utf8_string unsub_topics[MAX_SIMULTANEOUS_UNSUB_TOPICS];
00707         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
00708 
00709         if(!(mqtt_client_is_connected(client_ctx->cli_hndl)) || (count > MAX_SIMULTANEOUS_UNSUB_TOPICS)) {
00710                 goto mqtt_unsub_exit1; /* Check client status */
00711         }
00712 
00713         for(i = 0; i < count; i++) {
00714                 STR2UTF_CONV(unsub_topics[i], topics[i]);
00715         }
00716 
00717         /* Set up all variables to receive an ACK for the message to be sent*/
00718         if(true == client_ctx->blocking_send) {      
00719                 client_ctx->awaited_ack = MQTT_UNSUBACK;
00720         }
00721 
00722         /* Send the unsubscription MQTT message */
00723         ret = mqtt_unsub_msg_send(client_ctx->cli_hndl, unsub_topics, count);
00724         if(ret < 0) {
00725                 goto mqtt_unsub_exit1;
00726         }
00727 
00728         if(true == client_ctx->blocking_send) {
00729                 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
00730                 if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
00731                         ret = -1;
00732                 }
00733         }
00734 
00735 mqtt_unsub_exit1:
00736         client_ctx->awaited_ack = 0;
00737         return (ret < 0)? -1: 0;
00738 }
00739 //*****************************************************************************
00740 // sl_ExtLib_MqttClientSend
00741 //*****************************************************************************
00742 int32_t 
00743 sl_ExtLib_MqttClientSend(void *cli_ctx, const char *topic,
00744                           const void *data, int32_t len, 
00745                           char qos_level, bool retain)
00746 {
00747 
00748         int32_t ret = -1;
00749         struct utf8_string topic_utf8;
00750         struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx;
00751 
00752         /* Check whether Client is connected or not? */
00753         if(!(mqtt_client_is_connected(client_ctx->cli_hndl))) {
00754                 Uart_Write((uint8_t*)"mqtt client is not connected\n\r");
00755                 return ret;
00756         }
00757 
00758         STR2UTF_CONV(topic_utf8, topic);
00759 
00760         /*Set up all variables to receive an ACK for the message to be sent*/
00761         if(MQTT_QOS1 == qos[qos_level]) {
00762                 client_ctx->awaited_ack = MQTT_PUBACK;
00763         } else if(MQTT_QOS2 == qos[qos_level]) {
00764                 client_ctx->awaited_ack = MQTT_PUBCOMP;
00765         }
00766         /*publish the message*/
00767         ret = mqtt_client_pub_msg_send(client_ctx->cli_hndl, &topic_utf8,
00768                                         (const uint8_t*)data, len,
00769                                         qos[qos_level], retain);
00770         if(ret < 0) {
00771                 Uart_Write((uint8_t*)"mqtt_client_pub_msg_send failed\n\r");
00772                 goto mqtt_pub_exit1;
00773         }
00774 
00775         if(MQTT_QOS0 != qos[qos_level]) {
00776                 if(true == client_ctx->blocking_send) {
00777                         ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj);
00778                         if(MQTT_DISCONNECT == client_ctx->awaited_ack) {
00779                                 Uart_Write((uint8_t*)"MQTT_DISCONNECT\n\r");
00780                                 ret = -1;
00781                         }
00782                 }
00783         }
00784 
00785 mqtt_pub_exit1:
00786         client_ctx->awaited_ack = 0;
00787         return (ret < 0)? -1: 0;
00788 }
00789 
00790 }//namespace mbed_mqtt 
00791