David Fletcher
/
cc3100_Test_mqtt_CM4F
TI's MQTT Demo with freertos CM4F
Embed:
(wiki syntax)
Show/hide line numbers
sl_mqtt_client.cpp
00001 /****************************************************************************** 00002 * 00003 * Copyright (C) 2014 Texas Instruments Incorporated 00004 * 00005 * All rights reserved. Property of Texas Instruments Incorporated. 00006 * Restricted rights to use, duplicate or disclose this code are 00007 * granted through contract. 00008 * 00009 * The program may not be used without the written permission of 00010 * Texas Instruments Incorporated or against the terms and conditions 00011 * stipulated in the agreement under which this program has been supplied, 00012 * and under no circumstances can it be used with non-TI connectivity device. 00013 * 00014 ******************************************************************************/ 00015 00016 #include "sl_mqtt_client.h" 00017 #include "cc31xx_sl_net.h" 00018 //#include "cc3100_nonos.h" 00019 #include "myBoardInit.h" 00020 #include "cli_uart.h" 00021 #include "osi.h" 00022 00023 using namespace mbed_cc3100; 00024 00025 #if (THIS_BOARD == Seeed_Arch_Max) 00026 cc3100 _cc3100module(PB_9, PB_8, PD_12, PD_13, PD_11, SPI(PB_5, PB_4, PB_3));//Seeed_Arch_Max irq, nHib, cs, mosi, miso, sck 00027 #elif (THIS_BOARD == EA_MBED_LPC4088) 00028 class cc3100 _cc3100module(p14, p15, p9, p10, p8, SPI(p5, p6, p7));//LPC4088 irq, nHib, cs, mosi, miso, sck 00029 #elif (THIS_BOARD == LPCXpresso4337) 00030 class cc3100 _cc3100module(P2_2, P3_5, P1_2, SPI(P1_4, P1_3, PF_4));//LPCXpresso4337 irq, nHib, cs, mosi, miso, sck 00031 #endif 00032 00033 namespace mbed_mqtt { 00034 00035 #ifndef CFG_SL_CL_BUF_LEN 00036 #define BUF_LEN 1024 /*Buffer length*/ 00037 #else 00038 #define BUF_LEN CFG_SL_CL_BUF_LEN 00039 #endif 00040 00041 #ifndef CFG_SL_CL_MAX_MQP 00042 #define MAX_MQP 2 /* # of buffers */ 00043 #else 00044 #define MAX_MQP CFG_SL_CL_MAX_MQP 00045 #endif 00046 00047 #ifndef CFG_SL_CL_STACK 00048 #define OSI_STACK_SIZE 2048 00049 #else 00050 #define OSI_STACK_SIZE CFG_SL_CL_STACK 00051 #endif 00052 //***************************************************************************** 00053 // global variables used 00054 //***************************************************************************** 00055 00056 00057 struct sl_client_ctx { 00058 /* Client information details */ 00059 char *client_id; 00060 char *usr_name; 00061 char *usr_pwd; 00062 SlMqttWill_t mqtt_will; 00063 00064 /* Client management variables */ 00065 bool in_use; 00066 char awaited_ack; 00067 uint16_t conn_ack; 00068 00069 /*Sync Object used to ensure single inflight message - 00070 used in blocking mode to wait on ack*/ 00071 _SlSyncObj_t ack_sync_obj; 00072 /* Suback QoS pointer to store passed by application */ 00073 char *suback_qos; 00074 00075 /* Application information */ 00076 void* app_hndl; 00077 SlMqttClientCbs_t app_cbs; 00078 00079 /* Library information */ 00080 void *cli_hndl; 00081 bool blocking_send; 00082 }; 00083 00084 #ifndef CFG_CL_MQTT_CTXS 00085 #define MAX_SIMULTANEOUS_SERVER_CONN 4 00086 #else 00087 #define MAX_SIMULTANEOUS_SERVER_CONN CFG_MQTT_CL_CTXS 00088 #endif 00089 00090 00091 static struct sl_client_ctx sl_cli_ctx[MAX_SIMULTANEOUS_SERVER_CONN]; 00092 00093 /* Lock Object to be passed to the MQTT lib */ 00094 _SlLockObj_t mqtt_lib_lockobj; 00095 00096 /* Creating a pool of MQTT coonstructs that can be used by the MQTT Lib 00097 mqp_vec =>pointer to a pool of the mqtt packet constructs 00098 buf_vec => the buffer area which is attached with each of mqp_vec*/ 00099 DEFINE_MQP_BUF_VEC(MAX_MQP, mqp_vec, BUF_LEN, buf_vec); 00100 00101 /*Task Priority and Response Time*/ 00102 uint32_t g_wait_secs; 00103 00104 bool g_multi_srvr_conn = false; 00105 00106 /* Receive task handle */ 00107 OsiTaskHandle g_rx_task_hndl; 00108 00109 /* Synchronization object used between the Rx and App task */ 00110 _SlSyncObj_t g_rx_tx_sync_obj; 00111 00112 #define RX_TX_SIGNAL_WAIT() _cc3100module._nonos.sl_SyncObjWait(&g_rx_tx_sync_obj, SL_OS_WAIT_FOREVER) 00113 #define RX_TX_SIGNAL_POST() _cc3100module._nonos.sl_SyncObjSignal(&g_rx_tx_sync_obj) 00114 00115 /* MQTT Quality of Service */ 00116 static const enum mqtt_qos qos[] ={ 00117 MQTT_QOS0, 00118 MQTT_QOS1, 00119 MQTT_QOS2 00120 }; 00121 00122 /* Network Services specific to cc3200*/ 00123 struct device_net_services net={&comm_open,&tcp_send,&tcp_recv,&send_dest,&recv_from,&comm_close, 00124 &tcp_listen,&tcp_accept,&tcp_select,&rtc_secs}; 00125 00126 /* Defining Retain Flag. Getting retain flag bit from fixed header*/ 00127 #define MQP_PUB_RETAIN(mqp) ( ((mqp->fh_byte1 ) & 0x01)?true:false ) 00128 00129 /*Defining Duplicate flag. Getting Duplicate flag bit from fixed header*/ 00130 #define MQP_PUB_DUP(mqp) (((mqp->fh_byte1>>3) & 0x01)?true:false ) 00131 00132 /*Defining QOS value. Getting QOS value from fixed header*/ 00133 #define MQP_PUB_QOS(mqp) ((mqp->fh_byte1 >>1) & 0x03) 00134 00135 #define ACK_RX_SIGNAL_WAIT(ack_sync_obj) _cc3100module._nonos.sl_SyncObjWait(ack_sync_obj, SL_OS_WAIT_FOREVER) 00136 #define ACK_RX_SIGNAL_POST(ack_sync_obj) _cc3100module._nonos.sl_SyncObjSignal(ack_sync_obj) 00137 00138 #define STR2UTF_CONV(utf_s, str) {utf_s.buffer = (char *)str; utf_s.length = strlen(str);} 00139 00140 /*Defining Event Messages*/ 00141 #define MQTT_ACK "Ack Received from server" 00142 #define MQTT_ERROR "Connection Lost with broker" 00143 00144 //***************************************************************************** 00145 // process_notify_ack_cb 00146 //***************************************************************************** 00147 static void 00148 process_notify_ack_cb(void *app, uint8_t msg_type, uint16_t msg_id, uint8_t *buf, uint32_t len) 00149 { 00150 00151 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app; 00152 int32_t loopcnt; 00153 00154 switch(msg_type) { 00155 00156 case MQTT_CONNACK: 00157 client_ctx->conn_ack = ((buf[0] << 8) | buf[1]); 00158 client_ctx->awaited_ack = 0; 00159 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj); 00160 break; 00161 case MQTT_SUBACK: 00162 if(true == client_ctx->blocking_send) { 00163 for(loopcnt = 0; loopcnt < len; loopcnt++) { 00164 client_ctx->suback_qos[loopcnt] = buf[loopcnt]; 00165 } 00166 00167 if(client_ctx->awaited_ack == msg_type) { 00168 client_ctx->awaited_ack = 0; 00169 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj); 00170 } 00171 } else { 00172 client_ctx->app_cbs.sl_ExtLib_MqttEvent( 00173 client_ctx->app_hndl, 00174 SL_MQTT_CL_EVT_SUBACK, buf, len); 00175 } 00176 break; 00177 /* Error returns --- TBD */ 00178 default: 00179 if(true == client_ctx->blocking_send) { 00180 if(client_ctx->awaited_ack == msg_type) { 00181 client_ctx->awaited_ack = 0; 00182 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj); 00183 } 00184 } else { 00185 client_ctx->app_cbs.sl_ExtLib_MqttEvent( 00186 client_ctx->app_hndl, 00187 msg_type, MQTT_ACK, strlen(MQTT_ACK)); 00188 } 00189 break; 00190 00191 } 00192 00193 return ; 00194 } 00195 00196 //***************************************************************************** 00197 // process_publish_rx_cb 00198 //***************************************************************************** 00199 static bool 00200 process_publish_rx_cb(void *app, bool dup, enum mqtt_qos qos, bool retain, 00201 struct mqtt_packet *mqp) 00202 { 00203 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app; 00204 /*If incoming message is a publish message from broker */ 00205 /* Invokes the event handler with topic,topic length,payload and 00206 payload length values*/ 00207 client_ctx->app_cbs.sl_ExtLib_MqttRecv(client_ctx->app_hndl, 00208 (char const*)MQP_PUB_TOP_BUF(mqp), 00209 MQP_PUB_TOP_LEN(mqp),MQP_PUB_PAY_BUF(mqp), 00210 MQP_PUB_PAY_LEN(mqp), dup, 00211 MQP_PUB_QOS(mqp), retain); 00212 return true; 00213 } 00214 00215 //***************************************************************************** 00216 // process_disconn_cb 00217 //***************************************************************************** 00218 static void process_disconn_cb(void *app, int32_t cause) 00219 { 00220 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app; 00221 00222 if(client_ctx->awaited_ack != 0) { 00223 client_ctx->awaited_ack = MQTT_DISCONNECT; 00224 ACK_RX_SIGNAL_POST(&client_ctx->ack_sync_obj); 00225 } else { 00226 if(false == client_ctx->blocking_send) { 00227 /* Invoke the disconnect callback */ 00228 client_ctx->app_cbs.sl_ExtLib_MqttDisconn( 00229 client_ctx->app_hndl); 00230 } 00231 } 00232 return; 00233 } 00234 00235 00236 //***************************************************************************** 00237 // Receive Task. Invokes in the context of a task 00238 //***************************************************************************** 00239 void 00240 VMqttRecvTask(void *pArgs) 00241 { 00242 00243 int32_t retval; 00244 do { 00245 if(false == g_multi_srvr_conn) { 00246 /* wait on broker connection */ 00247 RX_TX_SIGNAL_WAIT(); 00248 while(1) { 00249 /* Forcing to use the index 0 always - caution */ 00250 retval = mqtt_client_ctx_run( 00251 sl_cli_ctx[0].cli_hndl, 00252 g_wait_secs); 00253 if((retval < 0) && 00254 (retval != MQP_ERR_TIMEOUT)) { 00255 break; 00256 } 00257 } 00258 } else { 00259 mqtt_client_run(g_wait_secs); 00260 } 00261 }while(1); 00262 } 00263 00264 static struct sl_client_ctx *get_available_clictx_mem() 00265 { 00266 int32_t loopcnt, max_store; 00267 00268 if(false == g_multi_srvr_conn) { 00269 max_store = 1; 00270 } else { 00271 max_store = MAX_SIMULTANEOUS_SERVER_CONN; 00272 } 00273 00274 for(loopcnt = 0; loopcnt < max_store; loopcnt++) { 00275 if(false == sl_cli_ctx[loopcnt].in_use) { 00276 sl_cli_ctx[loopcnt].in_use = true; 00277 return(&(sl_cli_ctx[loopcnt])); 00278 } 00279 } 00280 00281 return NULL; 00282 } 00283 00284 //***************************************************************************** 00285 // sl_ExtLib_MqttClientCtxCreate 00286 //***************************************************************************** 00287 void *sl_ExtLib_MqttClientCtxCreate(const SlMqttClientCtxCfg_t *ctx_cfg, 00288 const SlMqttClientCbs_t *msg_cbs, 00289 void *app_hndl) 00290 { 00291 00292 struct sl_client_ctx *client_ctx_ptr; 00293 struct mqtt_client_ctx_cfg lib_ctx_cfg; 00294 struct mqtt_client_ctx_cbs lib_cli_cbs; 00295 struct secure_conn lib_nw_security; 00296 int32_t retval; 00297 00298 /* Get a client context storage area */ 00299 client_ctx_ptr = get_available_clictx_mem(); 00300 if(client_ctx_ptr == NULL) { 00301 return NULL; 00302 } 00303 00304 /* Create the sync object to signal on arrival of ACK packet */ 00305 _cc3100module._nonos.sl_SyncObjCreate(&client_ctx_ptr->ack_sync_obj, "AckSyncObject"); 00306 _cc3100module._nonos.sl_SyncObjWait(&client_ctx_ptr->ack_sync_obj, SL_OS_NO_WAIT); 00307 00308 /* Store the application handle */ 00309 client_ctx_ptr->app_hndl = app_hndl; 00310 /* Initialize the ACK awaited */ 00311 client_ctx_ptr->awaited_ack = 0; 00312 00313 /* Store the application callbacks, to be invoked later */ 00314 client_ctx_ptr->app_cbs.sl_ExtLib_MqttRecv = \ 00315 msg_cbs->sl_ExtLib_MqttRecv; 00316 client_ctx_ptr->app_cbs.sl_ExtLib_MqttEvent = \ 00317 msg_cbs->sl_ExtLib_MqttEvent; 00318 client_ctx_ptr->app_cbs.sl_ExtLib_MqttDisconn = \ 00319 msg_cbs->sl_ExtLib_MqttDisconn; 00320 00321 /* Initialize the client lib */ 00322 lib_ctx_cfg.config_opts = ctx_cfg->mqtt_mode31; 00323 00324 if(true == g_multi_srvr_conn) { 00325 lib_ctx_cfg.config_opts |= 00326 (MQTT_CFG_APP_HAS_RTSK | MQTT_CFG_MK_GROUP_CTX); 00327 } else { 00328 lib_ctx_cfg.config_opts |= MQTT_CFG_APP_HAS_RTSK; 00329 } 00330 00331 /* get the network connection options */ 00332 client_ctx_ptr->blocking_send = ctx_cfg->blocking_send; 00333 lib_ctx_cfg.nwconn_opts = ctx_cfg->server_info.netconn_flags; 00334 lib_ctx_cfg.nwconn_opts |= DEV_NETCONN_OPT_TCP; 00335 lib_ctx_cfg.server_addr = (char*)ctx_cfg->server_info.server_addr; 00336 lib_ctx_cfg.port_number = ctx_cfg->server_info.port_number; 00337 /*initialize secure socket parameters */ 00338 if(ctx_cfg->server_info.netconn_flags & SL_MQTT_NETCONN_SEC) { 00339 lib_ctx_cfg.nw_security = &lib_nw_security; 00340 /* initialize secure socket parameters */ 00341 lib_ctx_cfg.nw_security->cipher = 00342 (void*)&(ctx_cfg->server_info.cipher); 00343 lib_ctx_cfg.nw_security->method = 00344 (void*)&(ctx_cfg->server_info.method); 00345 lib_ctx_cfg.nw_security->n_file = 00346 ctx_cfg->server_info.n_files; 00347 lib_ctx_cfg.nw_security->files = 00348 (char**)ctx_cfg->server_info.secure_files; 00349 } 00350 else { 00351 lib_ctx_cfg.nw_security=NULL; 00352 } 00353 00354 lib_cli_cbs.publish_rx = process_publish_rx_cb; 00355 lib_cli_cbs.ack_notify = process_notify_ack_cb; 00356 lib_cli_cbs.disconn_cb = process_disconn_cb; 00357 00358 retval = mqtt_client_ctx_create(&lib_ctx_cfg, 00359 &lib_cli_cbs, client_ctx_ptr, 00360 &client_ctx_ptr->cli_hndl); 00361 00362 if(retval < 0) { 00363 Uart_Write((uint8_t*)"mqtt_client_ctx_create failed\r\n"); 00364 client_ctx_ptr->in_use = false; 00365 return NULL; 00366 } 00367 00368 return (void*)client_ctx_ptr; 00369 } 00370 00371 //***************************************************************************** 00372 // sl_ExtLib_MqttClientCtxDelete 00373 //***************************************************************************** 00374 int32_t sl_ExtLib_MqttClientCtxDelete(void *cli_ctx) 00375 { 00376 struct sl_client_ctx *client_ctx=(struct sl_client_ctx *)cli_ctx; 00377 int32_t retval; 00378 00379 retval = mqtt_client_ctx_delete(client_ctx->cli_hndl); 00380 if(retval >= 0) { 00381 /* Check for more paramaters --- TBD */ 00382 _cc3100module._nonos.sl_SyncObjDelete(&client_ctx->ack_sync_obj); 00383 /* Free up the context */ 00384 memset(client_ctx, 0, sizeof(struct sl_client_ctx)); 00385 } 00386 00387 return (retval < 0)? -1: 0; 00388 } 00389 00390 void 00391 mutex_lockup(void *mqtt_lib_lock) 00392 { 00393 _cc3100module._nonos.sl_LockObjLock((_SlLockObj_t*)mqtt_lib_lock, SL_OS_WAIT_FOREVER); 00394 } 00395 00396 void 00397 mutex_unlock(void *mqtt_lib_lock) 00398 { 00399 _cc3100module._nonos.sl_LockObjUnlock((_SlLockObj_t*)mqtt_lib_lock); 00400 } 00401 00402 //***************************************************************************** 00403 // sl_ExtLib_MqttClientInit 00404 //***************************************************************************** 00405 int32_t sl_ExtLib_MqttClientInit(const SlMqttClientLibCfg_t *cfg) 00406 { 00407 00408 struct mqtt_client_lib_cfg lib_cfg; 00409 /* Initialize the control variables */ 00410 memset(sl_cli_ctx, 0, sizeof(sl_cli_ctx)); 00411 00412 /* Setup the MQTT client lib configurations */ 00413 lib_cfg.loopback_port = cfg->loopback_port; 00414 /* Initializing the sync object */ 00415 g_rx_tx_sync_obj = 0; 00416 00417 if(cfg->loopback_port) { 00418 g_multi_srvr_conn = true; 00419 lib_cfg.grp_uses_cbfn = true; /* Does use the group callback func */ 00420 } else { 00421 g_multi_srvr_conn = false; 00422 lib_cfg.grp_uses_cbfn = false; /* Doesnt use the group callback func */ 00423 /* Create the sync object between Rx and App tasks */ 00424 _cc3100module._nonos.sl_SyncObjCreate(&g_rx_tx_sync_obj, "RxTxSyncObject"); 00425 _cc3100module._nonos.sl_SyncObjWait(&g_rx_tx_sync_obj, SL_OS_NO_WAIT); 00426 } 00427 00428 g_wait_secs = cfg->resp_time; 00429 /* Setup the mutex operations */ 00430 _cc3100module._nonos.sl_LockObjCreate(&mqtt_lib_lockobj,"MQTT Lock"); 00431 lib_cfg.mutex = (void *)&mqtt_lib_lockobj; 00432 lib_cfg.mutex_lockin = mutex_lockup; 00433 lib_cfg.mutex_unlock = mutex_unlock; 00434 /* hooking DBG print function */ 00435 lib_cfg.debug_printf = cfg->dbg_print; 00436 /* Initialize client library */ 00437 if(mqtt_client_lib_init(&lib_cfg) < 0) { 00438 Uart_Write((uint8_t*)"mqtt_client_lib_init failed\r\n"); 00439 return -1; 00440 } 00441 00442 /* provide MQTT Lib information to create a pool of MQTT constructs. */ 00443 mqtt_client_buffers_register(MAX_MQP,mqp_vec,BUF_LEN,&buf_vec[0][0]); 00444 /* Register network services speicific to CC3200 */ 00445 mqtt_client_net_svc_register(&net); 00446 /* start the receive task */ 00447 osi_TaskCreate( VMqttRecvTask, 00448 (const signed char *) "MQTTRecv", 00449 OSI_STACK_SIZE, 00450 NULL, 00451 cfg->rx_tsk_priority, &g_rx_task_hndl ); 00452 00453 return 0; 00454 } 00455 00456 //***************************************************************************** 00457 // sl_ExtLib_MqttClientExit 00458 //***************************************************************************** 00459 int32_t 00460 sl_ExtLib_MqttClientExit() 00461 { 00462 int32_t retval; 00463 00464 /* Deinitialize the MQTT client lib */ 00465 retval = mqtt_client_lib_exit(); 00466 if(retval >= 0) { 00467 if(mqtt_lib_lockobj != NULL) 00468 { 00469 /* Delete the MQTT lib lock object */ 00470 _cc3100module._nonos.sl_LockObjDelete(&mqtt_lib_lockobj); 00471 } 00472 if(g_rx_task_hndl != NULL) 00473 { 00474 /* Delete the Rx Task */ 00475 osi_TaskDelete(&g_rx_task_hndl); 00476 } 00477 00478 if(g_rx_tx_sync_obj != NULL) 00479 { 00480 /* Delete the Rx-Tx task sync object */ 00481 _cc3100module._nonos.sl_SyncObjDelete(&g_rx_tx_sync_obj); 00482 } 00483 00484 g_rx_task_hndl = NULL; 00485 g_rx_tx_sync_obj = NULL; 00486 mqtt_lib_lockobj = NULL; 00487 } 00488 00489 return (retval < 0)? -1: 0; 00490 } 00491 00492 //***************************************************************************** 00493 // sl_ExtLib_MqttClientSet 00494 //***************************************************************************** 00495 int32_t 00496 sl_ExtLib_MqttClientSet(void *app, int32_t param, const void *value, uint32_t len) 00497 { 00498 00499 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)app; 00500 switch(param) 00501 { 00502 case SL_MQTT_PARAM_CLIENT_ID: 00503 /* Save the reference to the Client ID */ 00504 client_ctx->client_id =(char*)value; 00505 break; 00506 00507 case SL_MQTT_PARAM_USER_NAME: 00508 /* Save the reference to the Username */ 00509 client_ctx->usr_name =(char*)value; 00510 break; 00511 00512 case SL_MQTT_PARAM_PASS_WORD: 00513 /* Save the reference to the password */ 00514 client_ctx->usr_pwd =(char*)value; 00515 break; 00516 00517 case SL_MQTT_PARAM_WILL_PARAM: 00518 /* Save the reference to will parameters*/ 00519 client_ctx->mqtt_will = *((SlMqttWill_t*)value); 00520 break; 00521 default: 00522 break; 00523 } 00524 00525 return 0; 00526 00527 } 00528 00529 //***************************************************************************** 00530 // sl_ExtLib_MqttClientConnect 00531 //***************************************************************************** 00532 int32_t 00533 sl_ExtLib_MqttClientConnect(void *cli_ctx, bool clean, uint16_t keep_alive_time) 00534 { 00535 00536 00537 int32_t ret = -1; 00538 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx; 00539 00540 /*utf8 strings into which client info will be stored*/ 00541 struct utf8_string client_id, username, usr_pwd, will_topic, will_msg; 00542 struct utf8_string *usrname = NULL, *usrpasswd = NULL, *clientid=NULL; 00543 00544 /* Provide Client ID,user name and password into MQTT Library */ 00545 if(client_ctx->client_id != NULL) { 00546 STR2UTF_CONV(client_id, client_ctx->client_id); 00547 clientid = &client_id; 00548 } 00549 00550 if(client_ctx->usr_name != NULL) { 00551 STR2UTF_CONV(username, client_ctx->usr_name); 00552 usrname = &username; 00553 } 00554 if(client_ctx->usr_pwd != NULL) { 00555 STR2UTF_CONV(usr_pwd, client_ctx->usr_pwd); 00556 usrpasswd = &usr_pwd; 00557 } 00558 ret = mqtt_client_ctx_info_register(client_ctx->cli_hndl, 00559 clientid, usrname, usrpasswd); 00560 if(ret < 0) { 00561 goto mqtt_connect_exit1; 00562 } 00563 00564 /* Register a will message, if specified, into MQTT Library */ 00565 if(NULL != client_ctx->mqtt_will.will_topic ) { 00566 STR2UTF_CONV(will_topic, client_ctx->mqtt_will.will_topic); 00567 STR2UTF_CONV(will_msg, client_ctx->mqtt_will.will_msg); 00568 ret = mqtt_client_ctx_will_register(client_ctx->cli_hndl, 00569 &will_topic, &will_msg, 00570 qos[client_ctx->mqtt_will.will_qos], 00571 client_ctx->mqtt_will.retain); 00572 if(ret < 0) { 00573 Uart_Write((uint8_t*)"mqtt_client_ctx_will_register fail\r\n"); 00574 goto mqtt_connect_exit1; 00575 } 00576 } 00577 00578 client_ctx->awaited_ack = MQTT_CONNACK; 00579 client_ctx->conn_ack = 0; 00580 00581 /* Connect to the server */ 00582 ret = mqtt_connect_msg_send(client_ctx->cli_hndl, clean, keep_alive_time); 00583 /*Network or Socket Error happens*/ 00584 if(ret < 0) { 00585 Uart_Write((uint8_t*)"mqtt_connect_msg_send failed \r\n"); 00586 goto mqtt_connect_exit1; 00587 } 00588 00589 if(false == g_multi_srvr_conn) { 00590 /* Unblock the receive task here */ 00591 RX_TX_SIGNAL_POST(); 00592 } 00593 00594 /* Wait for a CONNACK here */ 00595 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj); 00596 00597 if(MQTT_DISCONNECT == client_ctx->awaited_ack) { 00598 Uart_Write((uint8_t*)"sl_ExtLib_MqttClientConnect fail\r\n"); 00599 ret = -1; 00600 } else { 00601 ret = (int32_t)client_ctx->conn_ack; 00602 } 00603 00604 mqtt_connect_exit1: 00605 00606 client_ctx->awaited_ack = 0; 00607 client_ctx->conn_ack = 0; 00608 return (ret < 0)? -1: ret; 00609 } 00610 00611 //***************************************************************************** 00612 // sl_ExtLib_MqttClientDisconnect 00613 //***************************************************************************** 00614 int32_t 00615 sl_ExtLib_MqttClientDisconnect(void *cli_ctx) 00616 { 00617 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx; 00618 int32_t retval; 00619 00620 if(!(mqtt_client_is_connected(client_ctx->cli_hndl))) 00621 return -1; 00622 00623 client_ctx->awaited_ack = MQTT_DISCONNECT; 00624 00625 /* send the disconnect command. */ 00626 retval = mqtt_disconn_send(client_ctx->cli_hndl); 00627 00628 if(retval >= 0) { 00629 /* wait on Rx task to acknowledge */ 00630 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj); 00631 } 00632 client_ctx->awaited_ack = 0; 00633 00634 return (retval < 0)? -1: 0; 00635 } 00636 00637 00638 //***************************************************************************** 00639 // sl_ExtLib_MqttClientSub 00640 //***************************************************************************** 00641 int32_t 00642 sl_ExtLib_MqttClientSub(void *cli_ctx, char* const *topics, 00643 uint8_t *qos_level, int32_t count) 00644 { 00645 #define MAX_SIMULTANEOUS_SUB_TOPICS 4 00646 00647 int32_t ret = -1, i; 00648 00649 struct utf8_strqos qos_topics[MAX_SIMULTANEOUS_SUB_TOPICS]; 00650 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx; 00651 00652 if(!(mqtt_client_is_connected(client_ctx->cli_hndl)) || (count > MAX_SIMULTANEOUS_SUB_TOPICS)) { 00653 goto mqtt_sub_exit1; /* client not connected*/ 00654 } 00655 00656 for(i = 0; i < count; i++) { 00657 qos_topics[i].buffer = topics[i]; 00658 qos_topics[i].qosreq = qos[qos_level[i]]; 00659 qos_topics[i].length = strlen(topics[i]); 00660 } 00661 00662 /* Set up all variables to receive an ACK for the message to be sent*/ 00663 if(true == client_ctx->blocking_send) { 00664 /* Update the qos_level to be in-out parameter */ 00665 client_ctx->suback_qos = (char*)qos_level; 00666 client_ctx->awaited_ack = MQTT_SUBACK; 00667 } 00668 00669 /* Send the subscription MQTT message */ 00670 ret = mqtt_sub_msg_send(client_ctx->cli_hndl, qos_topics, count); 00671 if(ret < 0) { 00672 goto mqtt_sub_exit1; 00673 } 00674 00675 if(true == client_ctx->blocking_send) { 00676 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj); 00677 if(MQTT_DISCONNECT == client_ctx->awaited_ack) { 00678 ret = -1; 00679 } 00680 } 00681 00682 mqtt_sub_exit1: 00683 client_ctx->awaited_ack = 0; 00684 return (ret < 0)? -1: 0; 00685 } 00686 00687 00688 //***************************************************************************** 00689 // sl_ExtLib_MqttClientUnsub 00690 //***************************************************************************** 00691 int32_t 00692 sl_ExtLib_MqttClientUnsub(void *cli_ctx, char* const *topics, int32_t count) 00693 { 00694 #define MAX_SIMULTANEOUS_UNSUB_TOPICS 4 00695 00696 int32_t ret = -1, i; 00697 struct utf8_string unsub_topics[MAX_SIMULTANEOUS_UNSUB_TOPICS]; 00698 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx; 00699 00700 if(!(mqtt_client_is_connected(client_ctx->cli_hndl)) || (count > MAX_SIMULTANEOUS_UNSUB_TOPICS)) { 00701 goto mqtt_unsub_exit1; /* Check client status */ 00702 } 00703 00704 for(i = 0; i < count; i++) { 00705 STR2UTF_CONV(unsub_topics[i], topics[i]); 00706 } 00707 00708 /* Set up all variables to receive an ACK for the message to be sent*/ 00709 if(true == client_ctx->blocking_send) { 00710 client_ctx->awaited_ack = MQTT_UNSUBACK; 00711 } 00712 00713 /* Send the unsubscription MQTT message */ 00714 ret = mqtt_unsub_msg_send(client_ctx->cli_hndl, unsub_topics, count); 00715 if(ret < 0) { 00716 goto mqtt_unsub_exit1; 00717 } 00718 00719 if(true == client_ctx->blocking_send) { 00720 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj); 00721 if(MQTT_DISCONNECT == client_ctx->awaited_ack) { 00722 ret = -1; 00723 } 00724 } 00725 00726 mqtt_unsub_exit1: 00727 client_ctx->awaited_ack = 0; 00728 return (ret < 0)? -1: 0; 00729 } 00730 //***************************************************************************** 00731 // sl_ExtLib_MqttClientSend 00732 //***************************************************************************** 00733 int32_t 00734 sl_ExtLib_MqttClientSend(void *cli_ctx, const char *topic, 00735 const void *data, int32_t len, 00736 char qos_level, bool retain) 00737 { 00738 00739 int32_t ret = -1; 00740 struct utf8_string topic_utf8; 00741 struct sl_client_ctx *client_ctx = (struct sl_client_ctx *)cli_ctx; 00742 00743 /* Check whether Client is connected or not? */ 00744 if(!(mqtt_client_is_connected(client_ctx->cli_hndl))) { 00745 Uart_Write((uint8_t*)"mqtt client is not connected\n\r"); 00746 return ret; 00747 } 00748 00749 STR2UTF_CONV(topic_utf8, topic); 00750 00751 /*Set up all variables to receive an ACK for the message to be sent*/ 00752 if(MQTT_QOS1 == qos[qos_level]) { 00753 client_ctx->awaited_ack = MQTT_PUBACK; 00754 } else if(MQTT_QOS2 == qos[qos_level]) { 00755 client_ctx->awaited_ack = MQTT_PUBCOMP; 00756 } 00757 /*publish the message*/ 00758 ret = mqtt_client_pub_msg_send(client_ctx->cli_hndl, &topic_utf8, 00759 (const uint8_t*)data, len, 00760 qos[qos_level], retain); 00761 if(ret < 0) { 00762 Uart_Write((uint8_t*)"mqtt_client_pub_msg_send failed\n\r"); 00763 goto mqtt_pub_exit1; 00764 } 00765 00766 if(MQTT_QOS0 != qos[qos_level]) { 00767 if(true == client_ctx->blocking_send) { 00768 ACK_RX_SIGNAL_WAIT(&client_ctx->ack_sync_obj); 00769 if(MQTT_DISCONNECT == client_ctx->awaited_ack) { 00770 Uart_Write((uint8_t*)"MQTT_DISCONNECT\n\r"); 00771 ret = -1; 00772 } 00773 } 00774 } 00775 00776 mqtt_pub_exit1: 00777 client_ctx->awaited_ack = 0; 00778 return (ret < 0)? -1: 0; 00779 } 00780 00781 }//namespace mbed_mqtt 00782 00783
Generated on Wed Jul 13 2022 09:55:39 by 1.7.2