Updated revision that uses TI's MQTT library and FreeRTOS.
Dependencies: mbed client server
Fork of cc3100_Test_mqtt_CM3 by
sl_mqtt_client.cpp
00001 /****************************************************************************** 00002 * 00003 * Copyright (C) 2014 Texas Instruments Incorporated 00004 * 00005 * All rights reserved. Property of Texas Instruments Incorporated. 00006 * Restricted rights to use, duplicate or disclose this code are 00007 * granted through contract. 00008 * 00009 * The program may not be used without the written permission of 00010 * Texas Instruments Incorporated or against the terms and conditions 00011 * stipulated in the agreement under which this program has been supplied, 00012 * and under no circumstances can it be used with non-TI connectivity device. 00013 * 00014 ******************************************************************************/ 00015 00016 #include "sl_mqtt_client.h" 00017 #include "cc31xx_sl_net.h" 00018 //#include "cc3100_nonos.h" 00019 #include "myBoardInit.h" 00020 #include "cli_uart.h" 00021 #include "osi.h" 00022 00023 using namespace mbed_cc3100; 00024 00025 #if (THIS_BOARD == MBED_BOARD_LPC1768) 00026 //cc3100 _cc3100module(p9, p10, p8, SPI(p11, p12, p13));//LPC1768 irq, nHib, cs, mosi, miso, sck 00027 cc3100 _cc3100module(p16, p17, p9, p10, p8, SPI(p5, p6, p7));//LPC1768 irq, nHib, cs, mosi, miso, sck 00028 #elif (THIS_BOARD == Seeed_Arch_Max) 00029 class cc3100 _cc3100module(PE_5, PE_4, PE_6, SPI(PB_5, PB_4, PB_3)); 00030 #elif (THIS_BOARD == ST_MBED_NUCLEOF103) 00031 class cc3100 _cc3100module(PA_9, PC_7, PB_6, SPI(PA_7, PA_6, PA_5));//nucleoF103 irq, nHib, cs, mosi, miso, sck 00032 #elif (THIS_BOARD == ST_MBED_NUCLEOF411) 00033 class cc3100 _cc3100module(PA_9, PC_7, PB_6, SPI(PA_7, PA_6, PA_5));//nucleoF411 irq, nHib, cs, mosi, miso, sck 00034 #elif (THIS_BOARD == ST_MBED_NUCLEOF401) 00035 class cc3100 _cc3100module(PA_8, PA_9, PC_7, PB_6, SPI(PA_7, PA_6, PA_5));//nucleoF401 irq, nHib, cs, mosi, miso, sck 00036 #elif (THIS_BOARD == EA_MBED_LPC4088) 00037 class cc3100 _cc3100module(p14, p15, p9, p10, p8, SPI(p5, p6, p7));//LPC4088 irq, nHib, cs, 
mosi, miso, sck 00038 #elif (THIS_BOARD == LPCXpresso4337) 00039 class cc3100 _cc3100module(P2_12, P2_9, P2_2, P3_5, P1_2, SPI(P1_4, P1_3, PF_4));//LPCXpresso4337 irq, nHib, cs, mosi, miso, sck 00040 #endif 00041 00042 namespace mbed_mqtt { 00043 00044 #ifndef CFG_SL_CL_BUF_LEN 00045 #define BUF_LEN 1024 /*Buffer length*/ 00046 #else 00047 #define BUF_LEN CFG_SL_CL_BUF_LEN 00048 #endif 00049 00050 #ifndef CFG_SL_CL_MAX_MQP 00051 #define MAX_MQP 2 /* # of buffers */ 00052 #else 00053 #define MAX_MQP CFG_SL_CL_MAX_MQP 00054 #endif 00055 00056 #ifndef CFG_SL_CL_STACK 00057 #define OSI_STACK_SIZE 2048 00058 #else 00059 #define OSI_STACK_SIZE CFG_SL_CL_STACK 00060 #endif 00061 //***************************************************************************** 00062 // global variables used 00063 //***************************************************************************** 00064 00065 00066 struct sl_client_ctx { 00067 /* Client information details */ 00068 char *client_id; 00069 char *usr_name; 00070 char *usr_pwd; 00071 SlMqttWill_t mqtt_will; 00072 00073 /* Client management variables */ 00074 bool in_use; 00075 char awaited_ack; 00076 uint16_t conn_ack; 00077 00078 /*Sync Object used to ensure single inflight message - 00079 used in blocking mode to wait on ack*/ 00080 _SlSyncObj_t ack_sync_obj; 00081 /* Suback QoS pointer to store passed by application */ 00082 char *suback_qos; 00083 00084 /* Application information */ 00085 void* app_hndl; 00086 SlMqttClientCbs_t app_cbs; 00087 00088 /* Library information */ 00089 void *cli_hndl; 00090 bool blocking_send; 00091 }; 00092 00093 #ifndef CFG_CL_MQTT_CTXS 00094 #define MAX_SIMULTANEOUS_SERVER_CONN 4 00095 #else 00096 #define MAX_SIMULTANEOUS_SERVER_CONN CFG_MQTT_CL_CTXS 00097 #endif 00098 00099 00100 static struct sl_client_ctx sl_cli_ctx[MAX_SIMULTANEOUS_SERVER_CONN]; 00101 00102 /* Lock Object to be passed to the MQTT lib */ 00103 _SlLockObj_t mqtt_lib_lockobj; 00104 00105 /* Creating a pool of MQTT coonstructs that 
can be used by the MQTT Lib 00106 mqp_vec =>pointer to a pool of the mqtt packet constructs 00107 buf_vec => the buffer area which is attached with each of mqp_vec*/ 00108 DEFINE_MQP_BUF_VEC(MAX_MQP, mqp_vec, BUF_LEN, buf_vec); 00109 00110 /*Task Priority and Response Time*/ 00111 uint32_t g_wait_secs; 00112 00113 bool g_multi_srvr_conn = false; 00114 00115 /* Receive task handle */ 00116 OsiTaskHandle g_rx_task_hndl; 00117 00118 /* Synchronization object used between the Rx and App task */ 00119 _SlSyncObj_t g_rx_tx_sync_obj; 00120 00121 #define RX_TX_SIGNAL_WAIT() _cc3100module._nonos.sl_SyncObjWait(&g_rx_tx_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_WAIT_FOREVER) 00122 #define RX_TX_SIGNAL_POST() _cc3100module._nonos.sl_SyncObjSignal(&g_rx_tx_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE) 00123 00124 /* MQTT Quality of Service */ 00125 static const enum mqtt_qos qos[] ={ 00126 MQTT_QOS0, 00127 MQTT_QOS1, 00128 MQTT_QOS2 00129 }; 00130 00131 /* Network Services specific to cc3200*/ 00132 struct device_net_services net={&comm_open,&tcp_send,&tcp_recv,&send_dest,&recv_from,&comm_close, 00133 &tcp_listen,&tcp_accept,&tcp_select,&rtc_secs}; 00134 00135 /* Defining Retain Flag. Getting retain flag bit from fixed header*/ 00136 #define MQP_PUB_RETAIN(mqp) ( ((mqp->fh_byte1 ) & 0x01)?true:false ) 00137 00138 /*Defining Duplicate flag. Getting Duplicate flag bit from fixed header*/ 00139 #define MQP_PUB_DUP(mqp) (((mqp->fh_byte1>>3) & 0x01)?true:false ) 00140 00141 /*Defining QOS value. 
Getting QOS value from fixed header*/ 00142 #define MQP_PUB_QOS(mqp) ((mqp->fh_byte1 >>1) & 0x03) 00143 00144 #define ACK_RX_SIGNAL_WAIT(ack_sync_obj) _cc3100module._nonos.sl_SyncObjWait(ack_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_WAIT_FOREVER) 00145 #define ACK_RX_SIGNAL_POST(ack_sync_obj) _cc3100module._nonos.sl_SyncObjSignal(ack_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE) 00146 00147 #define STR2UTF_CONV(utf_s, str) {utf_s.buffer = (char *)str; utf_s.length = strlen(str);} 00148 00149 /*Defining Event Messages*/ 00150 #define MQTT_ACK "Ack Received from server" 00151 #define MQTT_ERROR "Connection Lost with broker" 00152 00153 //***************************************************************************** 00154 // process_notify_ack_cb 00155 //***************************************************************************** 00156 static void 00157 process_notify_ack_cb(void *app, uint8_t msg_type, uint16_t msg_id, uint8_t *buf, uint32_t len) 00158 { 00159 00160 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app; 00161 int32_t loopcnt; 00162 00163 switch(msg_type) { 00164 00165 case MQTT_CONNACK: 00166 client_ctx->conn_ack = ((buf[0] << 8) | buf[1]); 00167 client_ctx->awaited_ack = 0; 00168 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj); 00169 break; 00170 case MQTT_SUBACK: 00171 if(true == client_ctx->blocking_send) { 00172 for(loopcnt = 0; loopcnt < len; loopcnt++) { 00173 client_ctx->suback_qos[loopcnt] = buf[loopcnt]; 00174 } 00175 00176 if(client_ctx->awaited_ack == msg_type) { 00177 client_ctx->awaited_ack = 0; 00178 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj); 00179 } 00180 } else { 00181 client_ctx->app_cbs.sl_ExtLib_MqttEvent( 00182 client_ctx->app_hndl, 00183 SL_MQTT_CL_EVT_SUBACK, buf, len); 00184 } 00185 break; 00186 /* Error returns --- TBD */ 00187 default: 00188 if(true == client_ctx->blocking_send) { 00189 if(client_ctx->awaited_ack == msg_type) { 00190 client_ctx->awaited_ack = 0; 00191 
ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj); 00192 } 00193 } else { 00194 client_ctx->app_cbs.sl_ExtLib_MqttEvent( 00195 client_ctx->app_hndl, 00196 msg_type, MQTT_ACK, strlen(MQTT_ACK)); 00197 } 00198 break; 00199 00200 } 00201 00202 return ; 00203 } 00204 00205 //***************************************************************************** 00206 // process_publish_rx_cb 00207 //***************************************************************************** 00208 static bool 00209 process_publish_rx_cb(void *app, bool dup, enum mqtt_qos qos, bool retain, 00210 struct mqtt_packet *mqp) 00211 { 00212 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app; 00213 /*If incoming message is a publish message from broker */ 00214 /* Invokes the event handler with topic,topic length,payload and 00215 payload length values*/ 00216 client_ctx->app_cbs.sl_ExtLib_MqttRecv(client_ctx->app_hndl, 00217 (char const*)MQP_PUB_TOP_BUF(mqp), 00218 MQP_PUB_TOP_LEN(mqp),MQP_PUB_PAY_BUF(mqp), 00219 MQP_PUB_PAY_LEN(mqp), dup, 00220 MQP_PUB_QOS(mqp), retain); 00221 return true; 00222 } 00223 00224 //***************************************************************************** 00225 // process_disconn_cb 00226 //***************************************************************************** 00227 static void process_disconn_cb(void *app, int32_t cause) 00228 { 00229 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app; 00230 00231 if(client_ctx->awaited_ack != 0) { 00232 client_ctx->awaited_ack = MQTT_DISCONNECT; 00233 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj); 00234 } else { 00235 if(false == client_ctx->blocking_send) { 00236 /* Invoke the disconnect callback */ 00237 client_ctx->app_cbs.sl_ExtLib_MqttDisconn( 00238 client_ctx->app_hndl); 00239 } 00240 } 00241 return; 00242 } 00243 00244 00245 //***************************************************************************** 00246 // Receive Task. 
Invokes in the context of a task 00247 //***************************************************************************** 00248 void 00249 VMqttRecvTask(void *pArgs) 00250 { 00251 00252 int32_t retval; 00253 do { 00254 if(false == g_multi_srvr_conn) { 00255 /* wait on broker connection */ 00256 RX_TX_SIGNAL_WAIT(); 00257 while(1) { 00258 /* Forcing to use the index 0 always - caution */ 00259 retval = mqtt_client_ctx_run( 00260 sl_cli_ctx[0].cli_hndl, 00261 g_wait_secs); 00262 if((retval < 0) && 00263 (retval != MQP_ERR_TIMEOUT)) { 00264 break; 00265 } 00266 } 00267 } else { 00268 mqtt_client_run(g_wait_secs); 00269 } 00270 }while(1); 00271 } 00272 00273 static struct sl_client_ctx *get_available_clictx_mem() 00274 { 00275 int32_t loopcnt, max_store; 00276 00277 if(false == g_multi_srvr_conn) { 00278 max_store = 1; 00279 } else { 00280 max_store = MAX_SIMULTANEOUS_SERVER_CONN; 00281 } 00282 00283 for(loopcnt = 0; loopcnt < max_store; loopcnt++) { 00284 if(false == sl_cli_ctx[loopcnt].in_use) { 00285 sl_cli_ctx[loopcnt].in_use = true; 00286 return(&(sl_cli_ctx[loopcnt])); 00287 } 00288 } 00289 00290 return NULL; 00291 } 00292 00293 //***************************************************************************** 00294 // sl_ExtLib_MqttClientCtxCreate 00295 //***************************************************************************** 00296 void *sl_ExtLib_MqttClientCtxCreate(const SlMqttClientCtxCfg_t *ctx_cfg, 00297 const SlMqttClientCbs_t *msg_cbs, 00298 void *app_hndl) 00299 { 00300 00301 struct sl_client_ctx *client_ctx_ptr; 00302 struct mqtt_client_ctx_cfg lib_ctx_cfg; 00303 struct mqtt_client_ctx_cbs lib_cli_cbs; 00304 struct secure_conn lib_nw_security; 00305 int32_t retval; 00306 00307 /* Get a client context storage area */ 00308 client_ctx_ptr = get_available_clictx_mem(); 00309 if(client_ctx_ptr == NULL) { 00310 return NULL; 00311 } 00312 00313 /* Create the sync object to signal on arrival of ACK packet */ 00314 
_cc3100module._nonos.sl_SyncObjCreate(&client_ctx_ptr->ack_sync_obj, "AckSyncObject"); 00315 _cc3100module._nonos.sl_SyncObjWait(&client_ctx_ptr->ack_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_NO_WAIT); 00316 00317 /* Store the application handle */ 00318 client_ctx_ptr->app_hndl = app_hndl; 00319 /* Initialize the ACK awaited */ 00320 client_ctx_ptr->awaited_ack = 0; 00321 00322 /* Store the application callbacks, to be invoked later */ 00323 client_ctx_ptr->app_cbs.sl_ExtLib_MqttRecv = \ 00324 msg_cbs->sl_ExtLib_MqttRecv; 00325 client_ctx_ptr->app_cbs.sl_ExtLib_MqttEvent = \ 00326 msg_cbs->sl_ExtLib_MqttEvent; 00327 client_ctx_ptr->app_cbs.sl_ExtLib_MqttDisconn = \ 00328 msg_cbs->sl_ExtLib_MqttDisconn; 00329 00330 /* Initialize the client lib */ 00331 lib_ctx_cfg.config_opts = ctx_cfg->mqtt_mode31; 00332 00333 if(true == g_multi_srvr_conn) { 00334 lib_ctx_cfg.config_opts |= 00335 (MQTT_CFG_APP_HAS_RTSK | MQTT_CFG_MK_GROUP_CTX); 00336 } else { 00337 lib_ctx_cfg.config_opts |= MQTT_CFG_APP_HAS_RTSK; 00338 } 00339 00340 /* get the network connection options */ 00341 client_ctx_ptr->blocking_send = ctx_cfg->blocking_send; 00342 lib_ctx_cfg.nwconn_opts = ctx_cfg->server_info.netconn_flags; 00343 lib_ctx_cfg.nwconn_opts |= DEV_NETCONN_OPT_TCP; 00344 lib_ctx_cfg.server_addr = (char*)ctx_cfg->server_info.server_addr; 00345 lib_ctx_cfg.port_number = ctx_cfg->server_info.port_number; 00346 /*initialize secure socket parameters */ 00347 if(ctx_cfg->server_info.netconn_flags & SL_MQTT_NETCONN_SEC) { 00348 lib_ctx_cfg.nw_security = &lib_nw_security; 00349 /* initialize secure socket parameters */ 00350 lib_ctx_cfg.nw_security->cipher = 00351 (void*)&(ctx_cfg->server_info.cipher); 00352 lib_ctx_cfg.nw_security->method = 00353 (void*)&(ctx_cfg->server_info.method); 00354 lib_ctx_cfg.nw_security->n_file = 00355 ctx_cfg->server_info.n_files; 00356 lib_ctx_cfg.nw_security->files = 00357 (char**)ctx_cfg->server_info.secure_files; 00358 } 00359 else { 
00360 lib_ctx_cfg.nw_security=NULL; 00361 } 00362 00363 lib_cli_cbs.publish_rx = process_publish_rx_cb; 00364 lib_cli_cbs.ack_notify = process_notify_ack_cb; 00365 lib_cli_cbs.disconn_cb = process_disconn_cb; 00366 00367 retval = mqtt_client_ctx_create(&lib_ctx_cfg, 00368 &lib_cli_cbs, client_ctx_ptr, 00369 &client_ctx_ptr->cli_hndl); 00370 00371 if(retval < 0) { 00372 Uart_Write((uint8_t*)"mqtt_client_ctx_create failed\r\n"); 00373 client_ctx_ptr->in_use = false; 00374 return NULL; 00375 } 00376 00377 return (void*)client_ctx_ptr; 00378 } 00379 00380 //***************************************************************************** 00381 // sl_ExtLib_MqttClientCtxDelete 00382 //***************************************************************************** 00383 int32_t sl_ExtLib_MqttClientCtxDelete(void *cli_ctx) 00384 { 00385 struct sl_client_ctx *client_ctx=(struct sl_client_ctx *)cli_ctx; 00386 int32_t retval; 00387 00388 retval = mqtt_client_ctx_delete(client_ctx->cli_hndl); 00389 if(retval >= 0) { 00390 /* Check for more paramaters --- TBD */ 00391 _cc3100module._nonos.sl_SyncObjDelete(&client_ctx->ack_sync_obj, 0); 00392 /* Free up the context */ 00393 memset(client_ctx, 0, sizeof(struct sl_client_ctx)); 00394 } 00395 00396 return (retval < 0)? 
-1: 0; 00397 } 00398 00399 void 00400 mutex_lockup(void *mqtt_lib_lock) 00401 { 00402 _cc3100module._nonos.sl_LockObjLock((_SlLockObj_t*)mqtt_lib_lock, SL_OS_WAIT_FOREVER, NON_OS_LOCK_OBJ_LOCK_VALUE, SL_OS_WAIT_FOREVER); 00403 } 00404 00405 void 00406 mutex_unlock(void *mqtt_lib_lock) 00407 { 00408 _cc3100module._nonos.sl_LockObjUnlock((_SlLockObj_t*)mqtt_lib_lock, NON_OS_LOCK_OBJ_UNLOCK_VALUE); 00409 } 00410 00411 //***************************************************************************** 00412 // sl_ExtLib_MqttClientInit 00413 //***************************************************************************** 00414 int32_t sl_ExtLib_MqttClientInit(const SlMqttClientLibCfg_t *cfg) 00415 { 00416 00417 struct mqtt_client_lib_cfg lib_cfg; 00418 /* Initialize the control variables */ 00419 memset(sl_cli_ctx, 0, sizeof(sl_cli_ctx)); 00420 00421 /* Setup the MQTT client lib configurations */ 00422 lib_cfg.loopback_port = cfg->loopback_port; 00423 /* Initializing the sync object */ 00424 g_rx_tx_sync_obj = 0; 00425 00426 if(cfg->loopback_port) { 00427 g_multi_srvr_conn = true; 00428 lib_cfg.grp_uses_cbfn = true; /* Does use the group callback func */ 00429 } else { 00430 g_multi_srvr_conn = false; 00431 lib_cfg.grp_uses_cbfn = false; /* Doesnt use the group callback func */ 00432 /* Create the sync object between Rx and App tasks */ 00433 _cc3100module._nonos.sl_SyncObjCreate(&g_rx_tx_sync_obj, "RxTxSyncObject"); 00434 _cc3100module._nonos.sl_SyncObjWait(&g_rx_tx_sync_obj, NON_OS_SYNC_OBJ_SIGNAL_VALUE, NON_OS_SYNC_OBJ_CLEAR_VALUE, SL_OS_NO_WAIT); 00435 } 00436 00437 g_wait_secs = cfg->resp_time; 00438 /* Setup the mutex operations */ 00439 _cc3100module._nonos.sl_LockObjCreate(&mqtt_lib_lockobj,"MQTT Lock"); 00440 lib_cfg.mutex = (void *)&mqtt_lib_lockobj; 00441 lib_cfg.mutex_lockin = mutex_lockup; 00442 lib_cfg.mutex_unlock = mutex_unlock; 00443 /* hooking DBG print function */ 00444 lib_cfg.debug_printf = cfg->dbg_print; 00445 /* Initialize client library */ 00446 
if(mqtt_client_lib_init(&lib_cfg) < 0) { 00447 Uart_Write((uint8_t*)"mqtt_client_lib_init failed\r\n"); 00448 return -1; 00449 } 00450 00451 /* provide MQTT Lib information to create a pool of MQTT constructs. */ 00452 mqtt_client_buffers_register(MAX_MQP,mqp_vec,BUF_LEN,&buf_vec[0][0]); 00453 /* Register network services speicific to CC3200 */ 00454 mqtt_client_net_svc_register(&net); 00455 /* start the receive task */ 00456 osi_TaskCreate( VMqttRecvTask, 00457 (const signed char *) "MQTTRecv", 00458 OSI_STACK_SIZE, 00459 NULL, 00460 cfg->rx_tsk_priority, &g_rx_task_hndl ); 00461 00462 return 0; 00463 } 00464 00465 //***************************************************************************** 00466 // sl_ExtLib_MqttClientExit 00467 //***************************************************************************** 00468 int32_t 00469 sl_ExtLib_MqttClientExit() 00470 { 00471 int32_t retval; 00472 00473 /* Deinitialize the MQTT client lib */ 00474 retval = mqtt_client_lib_exit(); 00475 if(retval >= 0) { 00476 if(mqtt_lib_lockobj != NULL) 00477 { 00478 /* Delete the MQTT lib lock object */ 00479 _cc3100module._nonos.sl_LockObjDelete(&mqtt_lib_lockobj, 0); 00480 } 00481 if(g_rx_task_hndl != NULL) 00482 { 00483 /* Delete the Rx Task */ 00484 osi_TaskDelete(&g_rx_task_hndl); 00485 } 00486 00487 if(g_rx_tx_sync_obj != NULL) 00488 { 00489 /* Delete the Rx-Tx task sync object */ 00490 _cc3100module._nonos.sl_SyncObjDelete(&g_rx_tx_sync_obj, 0); 00491 } 00492 00493 g_rx_task_hndl = NULL; 00494 g_rx_tx_sync_obj = NULL; 00495 mqtt_lib_lockobj = NULL; 00496 } 00497 00498 return (retval < 0)? 
-1: 0; 00499 } 00500 00501 //***************************************************************************** 00502 // sl_ExtLib_MqttClientSet 00503 //***************************************************************************** 00504 int32_t 00505 sl_ExtLib_MqttClientSet(void *app, int32_t param, const void *value, uint32_t len) 00506 { 00507 00508 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app; 00509 switch(param) 00510 { 00511 case SL_MQTT_PARAM_CLIENT_ID: 00512 /* Save the reference to the Client ID */ 00513 client_ctx->client_id =(char*)value; 00514 break; 00515 00516 case SL_MQTT_PARAM_USER_NAME: 00517 /* Save the reference to the Username */ 00518 client_ctx->usr_name =(char*)value; 00519 break; 00520 00521 case SL_MQTT_PARAM_PASS_WORD: 00522 /* Save the reference to the password */ 00523 client_ctx->usr_pwd =(char*)value; 00524 break; 00525 00526 case SL_MQTT_PARAM_WILL_PARAM: 00527 /* Save the reference to will parameters*/ 00528 client_ctx->mqtt_will = *((SlMqttWill_t*)value); 00529 break; 00530 default: 00531 break; 00532 } 00533 00534 return 0; 00535 00536 } 00537 00538 //***************************************************************************** 00539 // sl_ExtLib_MqttClientConnect 00540 //***************************************************************************** 00541 int32_t 00542 sl_ExtLib_MqttClientConnect(void *cli_ctx, bool clean, uint16_t keep_alive_time) 00543 { 00544 00545 00546 int32_t ret = -1; 00547 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx; 00548 00549 /*utf8 strings into which client info will be stored*/ 00550 struct utf8_string client_id, username, usr_pwd, will_topic, will_msg; 00551 struct utf8_string *usrname = NULL, *usrpasswd = NULL, *clientid=NULL; 00552 00553 /* Provide Client ID,user name and password into MQTT Library */ 00554 if(client_ctx->client_id != NULL) { 00555 STR2UTF_CONV(client_id, client_ctx->client_id); 00556 clientid = &client_id; 00557 } 00558 00559 
if(client_ctx->usr_name != NULL) { 00560 STR2UTF_CONV(username, client_ctx->usr_name); 00561 usrname = &username; 00562 } 00563 if(client_ctx->usr_pwd != NULL) { 00564 STR2UTF_CONV(usr_pwd, client_ctx->usr_pwd); 00565 usrpasswd = &usr_pwd; 00566 } 00567 ret = mqtt_client_ctx_info_register(client_ctx->cli_hndl, 00568 clientid, usrname, usrpasswd); 00569 if(ret < 0) { 00570 goto mqtt_connect_exit1; 00571 } 00572 00573 /* Register a will message, if specified, into MQTT Library */ 00574 if(NULL != client_ctx->mqtt_will.will_topic ) { 00575 STR2UTF_CONV(will_topic, client_ctx->mqtt_will.will_topic); 00576 STR2UTF_CONV(will_msg, client_ctx->mqtt_will.will_msg); 00577 ret = mqtt_client_ctx_will_register(client_ctx->cli_hndl, 00578 &will_topic, &will_msg, 00579 qos[client_ctx->mqtt_will.will_qos], 00580 client_ctx->mqtt_will.retain); 00581 if(ret < 0) { 00582 Uart_Write((uint8_t*)"mqtt_client_ctx_will_register fail\r\n"); 00583 goto mqtt_connect_exit1; 00584 } 00585 } 00586 00587 client_ctx->awaited_ack = MQTT_CONNACK; 00588 client_ctx->conn_ack = 0; 00589 00590 /* Connect to the server */ 00591 ret = mqtt_connect_msg_send(client_ctx->cli_hndl, clean, keep_alive_time); 00592 /*Network or Socket Error happens*/ 00593 if(ret < 0) { 00594 Uart_Write((uint8_t*)"mqtt_connect_msg_send failed \r\n"); 00595 goto mqtt_connect_exit1; 00596 } 00597 00598 if(false == g_multi_srvr_conn) { 00599 /* Unblock the receive task here */ 00600 RX_TX_SIGNAL_POST(); 00601 } 00602 00603 /* Wait for a CONNACK here */ 00604 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj); 00605 00606 if(MQTT_DISCONNECT == client_ctx->awaited_ack) { 00607 Uart_Write((uint8_t*)"sl_ExtLib_MqttClientConnect fail\r\n"); 00608 ret = -1; 00609 } else { 00610 ret = (int32_t)client_ctx->conn_ack; 00611 } 00612 00613 mqtt_connect_exit1: 00614 00615 client_ctx->awaited_ack = 0; 00616 client_ctx->conn_ack = 0; 00617 return (ret < 0)? 
-1: ret; 00618 } 00619 00620 //***************************************************************************** 00621 // sl_ExtLib_MqttClientDisconnect 00622 //***************************************************************************** 00623 int32_t 00624 sl_ExtLib_MqttClientDisconnect(void *cli_ctx) 00625 { 00626 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx; 00627 int32_t retval; 00628 00629 if(!(mqtt_client_is_connected(client_ctx->cli_hndl))) 00630 return -1; 00631 00632 client_ctx->awaited_ack = MQTT_DISCONNECT; 00633 00634 /* send the disconnect command. */ 00635 retval = mqtt_disconn_send(client_ctx->cli_hndl); 00636 00637 if(retval >= 0) { 00638 /* wait on Rx task to acknowledge */ 00639 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj); 00640 } 00641 client_ctx->awaited_ack = 0; 00642 00643 return (retval < 0)? -1: 0; 00644 } 00645 00646 00647 //***************************************************************************** 00648 // sl_ExtLib_MqttClientSub 00649 //***************************************************************************** 00650 int32_t 00651 sl_ExtLib_MqttClientSub(void *cli_ctx, char* const *topics, 00652 uint8_t *qos_level, int32_t count) 00653 { 00654 #define MAX_SIMULTANEOUS_SUB_TOPICS 4 00655 00656 int32_t ret = -1, i; 00657 00658 struct utf8_strqos qos_topics[MAX_SIMULTANEOUS_SUB_TOPICS]; 00659 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx; 00660 00661 if(!(mqtt_client_is_connected(client_ctx->cli_hndl)) || (count > MAX_SIMULTANEOUS_SUB_TOPICS)) { 00662 goto mqtt_sub_exit1; /* client not connected*/ 00663 } 00664 00665 for(i = 0; i < count; i++) { 00666 qos_topics[i].buffer = topics[i]; 00667 qos_topics[i].qosreq = qos[qos_level[i]]; 00668 qos_topics[i].length = strlen(topics[i]); 00669 } 00670 00671 /* Set up all variables to receive an ACK for the message to be sent*/ 00672 if(true == client_ctx->blocking_send) { 00673 /* Update the qos_level to be in-out parameter */ 00674 
client_ctx->suback_qos = (char*)qos_level; 00675 client_ctx->awaited_ack = MQTT_SUBACK; 00676 } 00677 00678 /* Send the subscription MQTT message */ 00679 ret = mqtt_sub_msg_send(client_ctx->cli_hndl, qos_topics, count); 00680 if(ret < 0) { 00681 goto mqtt_sub_exit1; 00682 } 00683 00684 if(true == client_ctx->blocking_send) { 00685 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj); 00686 if(MQTT_DISCONNECT == client_ctx->awaited_ack) { 00687 ret = -1; 00688 } 00689 } 00690 00691 mqtt_sub_exit1: 00692 client_ctx->awaited_ack = 0; 00693 return (ret < 0)? -1: 0; 00694 } 00695 00696 00697 //***************************************************************************** 00698 // sl_ExtLib_MqttClientUnsub 00699 //***************************************************************************** 00700 int32_t 00701 sl_ExtLib_MqttClientUnsub(void *cli_ctx, char* const *topics, int32_t count) 00702 { 00703 #define MAX_SIMULTANEOUS_UNSUB_TOPICS 4 00704 00705 int32_t ret = -1, i; 00706 struct utf8_string unsub_topics[MAX_SIMULTANEOUS_UNSUB_TOPICS]; 00707 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx; 00708 00709 if(!(mqtt_client_is_connected(client_ctx->cli_hndl)) || (count > MAX_SIMULTANEOUS_UNSUB_TOPICS)) { 00710 goto mqtt_unsub_exit1; /* Check client status */ 00711 } 00712 00713 for(i = 0; i < count; i++) { 00714 STR2UTF_CONV(unsub_topics[i], topics[i]); 00715 } 00716 00717 /* Set up all variables to receive an ACK for the message to be sent*/ 00718 if(true == client_ctx->blocking_send) { 00719 client_ctx->awaited_ack = MQTT_UNSUBACK; 00720 } 00721 00722 /* Send the unsubscription MQTT message */ 00723 ret = mqtt_unsub_msg_send(client_ctx->cli_hndl, unsub_topics, count); 00724 if(ret < 0) { 00725 goto mqtt_unsub_exit1; 00726 } 00727 00728 if(true == client_ctx->blocking_send) { 00729 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj); 00730 if(MQTT_DISCONNECT == client_ctx->awaited_ack) { 00731 ret = -1; 00732 } 00733 } 00734 00735 mqtt_unsub_exit1: 00736 
client_ctx->awaited_ack = 0; 00737 return (ret < 0)? -1: 0; 00738 } 00739 //***************************************************************************** 00740 // sl_ExtLib_MqttClientSend 00741 //***************************************************************************** 00742 int32_t 00743 sl_ExtLib_MqttClientSend(void *cli_ctx, const char *topic, 00744 const void *data, int32_t len, 00745 char qos_level, bool retain) 00746 { 00747 00748 int32_t ret = -1; 00749 struct utf8_string topic_utf8; 00750 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx; 00751 00752 /* Check whether Client is connected or not? */ 00753 if(!(mqtt_client_is_connected(client_ctx->cli_hndl))) { 00754 Uart_Write((uint8_t*)"mqtt client is not connected\n\r"); 00755 return ret; 00756 } 00757 00758 STR2UTF_CONV(topic_utf8, topic); 00759 00760 /*Set up all variables to receive an ACK for the message to be sent*/ 00761 if(MQTT_QOS1 == qos[qos_level]) { 00762 client_ctx->awaited_ack = MQTT_PUBACK; 00763 } else if(MQTT_QOS2 == qos[qos_level]) { 00764 client_ctx->awaited_ack = MQTT_PUBCOMP; 00765 } 00766 /*publish the message*/ 00767 ret = mqtt_client_pub_msg_send(client_ctx->cli_hndl, &topic_utf8, 00768 (const uint8_t*)data, len, 00769 qos[qos_level], retain); 00770 if(ret < 0) { 00771 Uart_Write((uint8_t*)"mqtt_client_pub_msg_send failed\n\r"); 00772 goto mqtt_pub_exit1; 00773 } 00774 00775 if(MQTT_QOS0 != qos[qos_level]) { 00776 if(true == client_ctx->blocking_send) { 00777 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj); 00778 if(MQTT_DISCONNECT == client_ctx->awaited_ack) { 00779 Uart_Write((uint8_t*)"MQTT_DISCONNECT\n\r"); 00780 ret = -1; 00781 } 00782 } 00783 } 00784 00785 mqtt_pub_exit1: 00786 client_ctx->awaited_ack = 0; 00787 return (ret < 0)? -1: 0; 00788 } 00789 00790 }//namespace mbed_mqtt 00791
Generated on Tue Jul 12 2022 18:55:10 by 1.7.2