Part of TI's mqtt
Dependents: mqtt_V1 cc3100_Test_mqtt_CM3
mqtt_client.cpp
00001 /****************************************************************************** 00002 * 00003 * Copyright (C) 2014 Texas Instruments Incorporated 00004 * 00005 * All rights reserved. Property of Texas Instruments Incorporated. 00006 * Restricted rights to use, duplicate or disclose this code are 00007 * granted through contract. 00008 * 00009 * The program may not be used without the written permission of 00010 * Texas Instruments Incorporated or against the terms and conditions 00011 * stipulated in the agreement under which this program has been supplied, 00012 * and under no circumstances can it be used with non-TI connectivity device. 00013 * 00014 ******************************************************************************/ 00015 00016 /* 00017 mqtt_client.c 00018 00019 The module provides implementation to the public interface of the MQTT 00020 Client Library. 00021 */ 00022 #include "FreeRTOS.h" 00023 #include "mqtt_client.h" 00024 #include "semphr.h" 00025 00026 #include "cli_uart.h" 00027 00028 #define PRINT_BUF_LEN 128 00029 extern int8_t print_buf[PRINT_BUF_LEN]; 00030 00031 namespace mbed_mqtt 00032 { 00033 00034 //void createMutex(void); 00035 00036 static void *mutex = NULL; 00037 static void (*mutex_lockin)(void*) = NULL; 00038 static void (*mutex_unlock)(void*) = NULL; 00039 00040 #define MUTEX_LOCKIN() if(mutex_lockin) mutex_lockin(mutex); 00041 #define MUTEX_UNLOCK() if(mutex_unlock) mutex_unlock(mutex); 00042 00043 static bool aux_dbg_enbl = true; 00044 int32_t (*debug_printf)(const char *fmt, ...) = NULL; 00045 00046 #define USR_INFO debug_printf 00047 #define DBG_INFO(I, ...) if(aux_dbg_enbl) debug_printf(I, ##__VA_ARGS__) 00048 00049 static const struct device_net_services *net_ops = NULL; 00050 00051 static uint16_t msg_id = 0xFFFF; 00052 static inline uint16_t assign_new_msg_id() 00053 { 00054 return msg_id += 2; 00055 } 00056 00057 SemaphoreHandle_t xSemaphore = NULL; 00058 void createMutex() 00059 { 00060 00061 xSemaphore = xSemaphoreCreateMutex(); 00062 } 00063 00064 /*----------------------------------------------------------------------------- 00065 * Data structure for managing the client and its nuances 00066 *---------------------------------------------------------------------------*/ 00067 00068 /* Construct to manage TX for network that requires LIB to send a partial and 00069 incremental data to support restrictive TCP segments. Specifically, for the 00070 deployments, in which the network layer supports small segments, there can 00071 be only one in-flight message. 00072 */ 00073 struct tx_part_pkt { 00074 00075 /* Refers to MQP, if TX for it is active, otherwise, set it to NULL */ 00076 struct mqtt_packet *tx_mqp; 00077 00078 union { 00079 #define VH_MSG_LEN 4 00080 uint8_t vh_msg[VH_MSG_LEN]; /* VH msg for MQP = NULL */ 00081 const uint8_t *buffer; /* Refers to data in MQP */ 00082 }; 00083 00084 uint32_t length; /* Length of entire data */ 00085 uint32_t offset; /* Next data for sending */ 00086 }; 00087 00088 static bool tx_part_setup(struct tx_part_pkt *tx_part, const uint8_t *buffer, 00089 uint32_t length, struct mqtt_packet *tx_mqp) 00090 { 00091 00092 if(tx_mqp) { 00093 tx_part->buffer = buffer; 00094 } else { 00095 if(VH_MSG_LEN < length) 00096 return false; 00097 00098 buf_wr_nbytes(tx_part->vh_msg, buffer, length); 00099 } 00100 00101 tx_part->length = length; 00102 tx_part->tx_mqp = tx_mqp; 00103 00104 return true; 00105 } 00106 00107 static void tx_part_reset(struct tx_part_pkt *tx_part) 00108 { 00109 00110 struct mqtt_packet *tx_mqp = tx_part->tx_mqp; 00111 if(tx_mqp) { 00112 mqp_free(tx_mqp); 00113 } 00114 tx_part->vh_msg[0] = 0x00; 00115 tx_part->tx_mqp = NULL; 00116 tx_part->length = 0; 00117 tx_part->offset = 0; 00118 00119 return; 00120 } 00121 00122 static const uint8_t *tx_part_buf_p(struct tx_part_pkt *tx_part) 00123 { 00124 struct mqtt_packet *tx_mqp = tx_part->tx_mqp; 00125 uint32_t offset = tx_part->offset; 00126 00127 return tx_mqp? 00128 tx_part->buffer + offset : 00129 tx_part->vh_msg + offset; 00130 } 00131 00132 static void tx_part_addup(struct tx_part_pkt *tx_part, uint32_t offset) 00133 { 00134 tx_part->offset += offset; 00135 } 00136 00137 #define TX_PART_BUFFER(tx_part) tx_part_buf_p(tx_part) 00138 #define TX_PART_BUF_SZ(tx_part) (tx_part->length - tx_part->offset) 00139 #define TX_PART_IN_USE(tx_part) TX_PART_BUF_SZ(tx_part) 00140 00141 enum module_state { 00142 00143 WAIT_INIT_STATE, 00144 INIT_DONE_STATE = 0x01, 00145 }; 00146 00147 static enum module_state cl_lib_state = WAIT_INIT_STATE; 00148 00149 static uint16_t loopb_portid = 0; 00150 static bool grp_has_cbfn = false; 00151 00152 #define USE_PROTO_V31_FLAG MQTT_CFG_PROTOCOL_V31 00153 #define APP_RECV_TASK_FLAG MQTT_CFG_APP_HAS_RTSK 00154 #define GROUP_CONTEXT_FLAG MQTT_CFG_MK_GROUP_CTX 00155 00156 #define CLEAN_SESSION_FLAG 0x00010000 00157 #define CONNACK_AWAIT_FLAG 0x00020000 00158 #define NOW_CONNECTED_FLAG 0x00040000 00159 #define KA_PINGER_RSP_FLAG 0x00080000 00160 #define USER_PING_RSP_FLAG 0x00100000 00161 #define NETWORK_CLOSE_FLAG 0x00200000 00162 #define DO_CONNACK_TO_FLAG 0x00400000 00163 00164 static struct client_ctx *free_ctxs = NULL; /* CTX construct available */ 00165 static struct client_ctx *used_ctxs = NULL; /* Relevant only for group */ 00166 static struct client_ctx *conn_ctxs = NULL; /* Relevant only for group */ 00167 00168 static void cl_ctx_freeup(struct client_ctx *cl_ctx) 00169 { 00170 cl_ctx->next = free_ctxs; 00171 free_ctxs = cl_ctx; 00172 00173 return; 00174 } 00175 00176 #define IS_PROTO_VER31(cl_ctx) (cl_ctx->flags & USE_PROTO_V31_FLAG) 00177 #define AWAITS_CONNACK(cl_ctx) (cl_ctx->flags & CONNACK_AWAIT_FLAG) 00178 #define HAS_CONNECTION(cl_ctx) (cl_ctx->flags & NOW_CONNECTED_FLAG) 00179 #define AWAITS_KA_PING(cl_ctx) (cl_ctx->flags & KA_PINGER_RSP_FLAG) 00180 #define AWAITS_PINGRSP(cl_ctx) (cl_ctx->flags & USER_PING_RSP_FLAG) 00181 #define IS_CLN_SESSION(cl_ctx) (cl_ctx->flags & CLEAN_SESSION_FLAG) 00182 #define RECV_TASK_AVBL(cl_ctx) (cl_ctx->flags & APP_RECV_TASK_FLAG) 00183 #define A_GROUP_MEMBER(cl_ctx) (cl_ctx->flags & GROUP_CONTEXT_FLAG) 00184 #define NEED_NET_CLOSE(cl_ctx) (cl_ctx->flags & NETWORK_CLOSE_FLAG) 00185 #define CFG_CONNACK_TO(cl_ctx) (cl_ctx->flags & DO_CONNACK_TO_FLAG) 00186 00187 #ifndef CFG_CL_MQTT_CTXS 00188 #define MAX_NWCONN 4 00189 #else 00190 #define MAX_NWCONN CFG_CL_MQTT_CTXS 00191 #endif 00192 00193 static struct client_desc { 00194 00195 /* ALERT: "context" must be first elem in this structure, do not move */ 00196 struct client_ctx context; 00197 00198 #define CLIENT(cl_ctx) ((struct client_desc*) cl_ctx) 00199 #define CL_CTX(client) ((struct client_ctx*) client) 00200 00201 /* Order/Sequence: Client ID, Will Topic, Will Msg, Username, Password */ 00202 const struct utf8_string *conn_pl_utf8s[5]; /* Ref: CONNECT Payload */ 00203 uint8_t will_opts; 00204 00205 /* Wait-List for Level 1 ACK(s), which are PUBACK, PUBREC, UN/SUBACK */ 00206 struct mqtt_ack_wlist qos_ack1_wl; 00207 00208 /* Circular queue to track QOS2 PUBLISH packets from the server. They 00209 are tracked for the duration of PUBLISH-RX to PUBREL-RX. 00210 */ 00211 struct pub_qos2_cq qos2_rx_cq; 00212 00213 /* Circular queue to track QOS2 PUBLISH packets from the client. They 00214 are tracked for the duration of PUBREC-RX to PUBOMP-RX. 00215 */ 00216 struct pub_qos2_cq qos2_tx_cq; 00217 00218 struct mqtt_client_ctx_cbs app_ctx_cbs; /* Callback funcs from app */ 00219 #define CTX_CBS_PTR(cl_ctx) &(CLIENT(cl_ctx)->app_ctx_cbs) 00220 00221 struct tx_part_pkt tx_part;/* Reference to partial TX PKT */ 00222 struct mqtt_packet *rx_mqp; /* Reference to partial RX PKT */ 00223 void *app; 00224 00225 uint32_t nwconn_opts; 00226 char *server_addr; 00227 uint16_t port_number; 00228 struct secure_conn nw_security; 00229 00230 } clients[MAX_NWCONN]; 00231 00232 static void client_reset(struct client_desc *client) 00233 { 00234 00235 struct mqtt_client_ctx_cbs *ctx_cbs = &client->app_ctx_cbs; 00236 int32_t i = 0; 00237 00238 cl_ctx_reset(CL_CTX(client)); 00239 00240 for(i = 0; i < 5; i++) { 00241 client->conn_pl_utf8s[i] = NULL; 00242 } 00243 client->will_opts = 0; 00244 00245 mqp_ack_wlist_purge(&client->qos_ack1_wl); 00246 00247 qos2_pub_cq_reset(&client->qos2_rx_cq); 00248 qos2_pub_cq_reset(&client->qos2_tx_cq); 00249 00250 ctx_cbs->publish_rx = NULL; 00251 ctx_cbs->ack_notify = NULL; 00252 ctx_cbs->disconn_cb = NULL; 00253 00254 tx_part_reset(&client->tx_part); 00255 client->rx_mqp = NULL; 00256 client->app = NULL; 00257 00258 client->nwconn_opts = 0; 00259 client->server_addr = NULL; 00260 client->port_number = 0; 00261 00262 secure_conn_struct_init(&client->nw_security); 00263 00264 return; 00265 } 00266 00267 static void client_desc_init(void) 00268 { 00269 int32_t i = 0; 00270 for(i = 0; i < MAX_NWCONN; i++) { 00271 struct client_desc *client = clients + i; 00272 struct client_ctx *cl_ctx = CL_CTX(client); 00273 00274 /* Initialize certain fields to defaults */ 00275 client->qos_ack1_wl.head = NULL; 00276 client->qos_ack1_wl.tail = NULL; 00277 client->tx_part.tx_mqp = NULL; 00278 00279 client_reset(client); /* Reset remaining */ 00280 00281 cl_ctx->next = free_ctxs; 00282 free_ctxs = cl_ctx; 00283 } 00284 00285 return; 00286 } 00287 00288 static void mqp_free_locked(struct mqtt_packet *mqp) 00289 { 00290 00291 // MUTEX_LOCKIN(); 00292 00293 // MUTEX_UNLOCK(); 00294 00295 if( xSemaphore != NULL ) { 00296 // See if we can obtain the semaphore. If the semaphore is not available 00297 // wait 10 ticks to see if it becomes free. 00298 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { 00299 // We were able to obtain the semaphore and can now access the 00300 // shared resource. 00301 mqp_free(mqp); 00302 00303 // We have finished accessing the shared resource. Release the 00304 // semaphore. 00305 xSemaphoreGive( xSemaphore ); 00306 00307 } else { 00308 // We could not obtain the semaphore and can therefore not access 00309 // the shared resource safely. 00310 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); 00311 } 00312 } 00313 } 00314 00315 /*---------------------------------------------------------------------------- 00316 * Fix-up to prevent certain good and non-callback MQP being reported to app 00317 *---------------------------------------------------------------------------- 00318 */ 00319 /* cor --> clear on read. */ 00320 static bool mqp_do_not_report_cor(struct mqtt_packet *mqp) 00321 { 00322 bool rv = (1 == mqp->private_)? true : false; 00323 mqp->private_ = 0; 00324 return rv; 00325 } 00326 00327 #define MQP_RX_DO_NOT_RPT_COR(mqp) mqp_do_not_report_cor(mqp) 00328 00329 /* Only if MQP has good RX data i.e. this macro shouldn't be used for bad RX */ 00330 #define MQP_RX_DO_NOT_RPT_SET(mqp) (mqp->private_ = 1) 00331 00332 #define MQP_TX_DONE_LEN_ADD(mqp, add) (mqp->private_ += add) 00333 #define MQP_TX_DONE_LEN_GET(mqp) (mqp->private_) 00334 00335 /*---------------------------Fix-Up Ends ------------------------------------*/ 00336 00337 static int32_t loopb_net = -1; 00338 static const uint8_t LOOP_DATA[] = {0x00, 0x01}; 00339 #define LOOP_DLEN sizeof(LOOP_DATA) 00340 00341 static int32_t loopb_trigger(void) 00342 { 00343 00344 00345 uint8_t ip_addr[] = {127,0,0,1}; 00346 00347 return (-1 != loopb_net)? 00348 net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN, 00349 loopb_portid, ip_addr, 4) : MQP_ERR_LIBQUIT; 00350 } 00351 00352 static void session_311fix(struct client_ctx *cl_ctx) 00353 { 00354 struct mqtt_ack_wlist *wl = &CLIENT(cl_ctx)->qos_ack1_wl; 00355 struct mqtt_packet *elem = wl->head; 00356 00357 while(elem) { 00358 struct mqtt_packet *next = elem->next; 00359 if(MQTT_PUBLISH != elem->msg_type) 00360 mqp_ack_wlist_remove(wl, elem->msg_id); 00361 00362 elem = next; 00363 } 00364 00365 return; 00366 } 00367 00368 static void session_delete(struct client_ctx *cl_ctx) 00369 { 00370 struct client_desc *client = CLIENT(cl_ctx); 00371 00372 DBG_INFO("C: Cleaning session for net %d\n\r", cl_ctx->net); 00373 00374 qos2_pub_cq_reset(&client->qos2_rx_cq); 00375 qos2_pub_cq_reset(&client->qos2_tx_cq); 00376 00377 mqp_ack_wlist_purge(&client->qos_ack1_wl); 00378 00379 return; 00380 } 00381 00382 /*------------------------------------------------------------------------------ 00383 * Routine to manage error conditions in client - close the network connection 00384 *----------------------------------------------------------------------------*/ 00385 static void do_net_close(struct client_ctx *cl_ctx) 00386 { 00387 int32_t net = cl_ctx->net; 00388 00389 if(-1 == net) 00390 return; /* network closed already, must not happen */ 00391 00392 if(IS_CLN_SESSION(cl_ctx)) { 00393 session_delete(cl_ctx); 00394 } else if(!IS_PROTO_VER31(cl_ctx)) { 00395 /* Version 3.1.1 doesn't need SUB and UNSUB re-send */ 00396 session_311fix(cl_ctx); 00397 } 00398 00399 tx_part_reset(&CLIENT(cl_ctx)->tx_part); /* Part TX, if any */ 00400 00401 cl_ctx->flags &= ~(CONNACK_AWAIT_FLAG | NOW_CONNECTED_FLAG | 00402 KA_PINGER_RSP_FLAG | USER_PING_RSP_FLAG | 00403 NETWORK_CLOSE_FLAG | DO_CONNACK_TO_FLAG); 00404 00405 cl_ctx->net = -1; 00406 net_ops->close(net); 00407 00408 USR_INFO("C: Net %d now closed\n\r", net); 00409 00410 return; 00411 } 00412 00413 static void do_net_close_rx(struct client_ctx *cl_ctx, int32_t cause) 00414 { 00415 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); 00416 00417 DBG_INFO("C: RX closing Net %d [%d]\n\r", cl_ctx->net, cause); 00418 00419 do_net_close(cl_ctx); 00420 if(ctx_cbs->disconn_cb) 00421 ctx_cbs->disconn_cb(CLIENT(cl_ctx)->app, cause); 00422 00423 if(A_GROUP_MEMBER(cl_ctx)) 00424 cl_ctx_remove(&used_ctxs, cl_ctx); 00425 00426 return; 00427 } 00428 00429 static void do_net_close_tx(struct client_ctx *cl_ctx, char *cause) 00430 { 00431 DBG_INFO("C: TX closing Net %d [%s]\n\r", cl_ctx->net, cause); 00432 00433 if(RECV_TASK_AVBL(cl_ctx)) { 00434 cl_ctx->flags |= NETWORK_CLOSE_FLAG; 00435 if(A_GROUP_MEMBER(cl_ctx)) 00436 loopb_trigger(); 00437 } else { 00438 struct mqtt_packet *rx_mqp = CLIENT(cl_ctx)->rx_mqp; 00439 00440 do_net_close(cl_ctx); /* No RX Task, close now */ 00441 00442 /* Release partial MQP, if any, for a CTX w/ CB */ 00443 if((NULL != rx_mqp) && (NULL != rx_mqp->free)) 00444 mqp_free(rx_mqp); 00445 00446 CLIENT(cl_ctx)->rx_mqp = NULL; 00447 } 00448 00449 return; 00450 } 00451 00452 /*---------------------------------------------------------------------------- 00453 * QoS2 PUB RX Message handling mechanism and associated house-keeping 00454 *--------------------------------------------------------------------------*/ 00455 static bool qos2_pub_rx_logup(struct client_ctx *cl_ctx, uint16_t msg_id) 00456 { 00457 return qos2_pub_cq_logup(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id); 00458 } 00459 00460 static bool ack2_msg_id_logup(struct client_ctx *cl_ctx, uint16_t msg_id) 00461 { 00462 return qos2_pub_cq_logup(&CLIENT(cl_ctx)->qos2_tx_cq, msg_id); 00463 } 00464 00465 static bool qos2_pub_rx_unlog(struct client_ctx *cl_ctx, uint16_t msg_id) 00466 { 00467 return qos2_pub_cq_unlog(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id); 00468 } 00469 00470 static bool ack2_msg_id_unlog(struct client_ctx *cl_ctx, uint16_t msg_id) 00471 { 00472 struct client_desc *client = CLIENT(cl_ctx); 00473 if(qos2_pub_cq_unlog(&client->qos2_tx_cq, msg_id)) { 00474 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); 00475 if(ctx_cbs->ack_notify) 00476 ctx_cbs->ack_notify(client->app, MQTT_PUBCOMP, 00477 msg_id, NULL, 0); 00478 return true; 00479 } 00480 00481 return false; 00482 } 00483 00484 static bool qos2_pub_rx_is_done(struct client_ctx *cl_ctx, uint16_t msg_id) 00485 { 00486 return qos2_pub_cq_check(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id); 00487 } 00488 00489 static bool awaits_pkts(struct client_ctx *cl_ctx) 00490 { 00491 00492 struct client_desc *client = CLIENT(cl_ctx); 00493 00494 return client->qos_ack1_wl.head || 00495 qos2_pub_cq_count(&client->qos2_rx_cq) || 00496 qos2_pub_cq_count(&client->qos2_tx_cq)? 00497 true : false; 00498 } 00499 00500 static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp) 00501 { 00502 mqp_free(mqp); 00503 return MQP_ERR_PKT_LEN; 00504 } 00505 00506 static int32_t is_valid_utf8_string(const struct utf8_string *utf8) 00507 { 00508 /* Valid topic should be > 0 byte and must hosted in usable buffer */ 00509 return ((utf8->length > 0) && (NULL != utf8->buffer))? true : false; 00510 } 00511 00512 #define RET_IF_INVALID_UTF8(utf8) \ 00513 if(false == is_valid_utf8_string(utf8)) \ 00514 return -1; 00515 00516 static bool is_connected(struct client_ctx *cl_ctx) 00517 { 00518 00519 return (HAS_CONNECTION(cl_ctx) && !NEED_NET_CLOSE(cl_ctx))? 00520 true : false; 00521 } 00522 00523 uint16_t mqtt_client_new_msg_id() 00524 { 00525 return assign_new_msg_id(); 00526 } 00527 00528 bool mqtt_client_is_connected(void *ctx) 00529 { 00530 return is_connected(CL_CTX(ctx)); 00531 } 00532 00533 /*---------------------------------------------------------------------------- 00534 * MQTT TX Routines 00535 *--------------------------------------------------------------------------*/ 00536 static void used_ctxs_TO_sort(struct client_ctx *cl_ctx_TO) 00537 { 00538 cl_ctx_remove(&used_ctxs, cl_ctx_TO); 00539 cl_ctx_timeout_insert(&used_ctxs, cl_ctx_TO); 00540 } 00541 00542 static inline int32_t net_send(int32_t net, const uint8_t *buf, uint32_t len, void *ctx) 00543 { 00544 00545 int32_t rv = net_ops->send(net, buf, len, ctx); 00546 if(rv <= 0) { 00547 memset(print_buf, 0x00, PRINT_BUF_LEN); 00548 sprintf((char*) print_buf, "MQP_ERR_NETWORK %i\r\n", MQP_ERR_NETWORK); 00549 Uart_Write((uint8_t *) print_buf); 00550 rv = MQP_ERR_NETWORK; 00551 } 00552 return rv; 00553 } 00554 00555 #if 0 00556 static int32_t cl_ctx_send(struct client_ctx *cl_ctx, const uint8_t *buf, uint32_t len, 00557 bool is_conn_msg) 00558 { 00559 00560 int32_t rv = MQP_ERR_NOTCONN; 00561 00562 /* For CONNECT msg, a client context mustn't be already connected. 00563 For others msgs, a client context must be conected to server */ 00564 if(false == (is_conn_msg ^ is_connected(cl_ctx))) 00565 goto cl_ctx_send_exit1; 00566 00567 rv = net_send(cl_ctx->net, buf, len); 00568 if(rv > 0) { /* A good send, do follow-up */ 00569 cl_ctx_timeout_update(cl_ctx, net_ops->time()); 00570 if(A_GROUP_MEMBER(cl_ctx) && HAS_CONNECTION(cl_ctx)) { 00571 /* With update to timeout, 00572 a sorting is impending */ 00573 used_ctxs_TO_sort(cl_ctx); 00574 } 00575 00576 goto cl_ctx_send_exit1; /* A Good Send */ 00577 } 00578 00579 do_net_close_tx(cl_ctx, "snd-err"); 00580 00581 cl_ctx_send_exit1: 00582 USR_INFO("C: FH-B1 0x%02x, len %u bytes, to net %d: %s\n\r", 00583 *buf, len, cl_ctx->net, (rv > 0)? "Sent" : "Fail"); 00584 return rv; 00585 } 00586 #endif 00587 00588 static int32_t cl_ctx_part_send(struct client_ctx *cl_ctx) 00589 { 00590 00591 struct tx_part_pkt *tx_part = &CLIENT(cl_ctx)->tx_part; 00592 const uint8_t *buf = TX_PART_BUFFER(tx_part); 00593 uint32_t len = TX_PART_BUF_SZ(tx_part); 00594 uint32_t ofs = tx_part->offset; 00595 uint8_t B1 = *buf; 00596 00597 int32_t rv = net_send(cl_ctx->net, buf, len, (void*)cl_ctx); 00598 if(rv > 0) { /* Follow-up for a good send */ 00599 if(HAS_CONNECTION(cl_ctx)) { 00600 /* Update TX timeout, if 'CTX' is connected */ 00601 cl_ctx_timeout_update(cl_ctx, net_ops->time()); 00602 00603 /* After update, 'CTX'(s) sorting is a must */ 00604 if(A_GROUP_MEMBER(cl_ctx)) 00605 used_ctxs_TO_sort(cl_ctx); 00606 } 00607 00608 if(rv != len) 00609 /* Partial data was sent */ 00610 tx_part_addup(tx_part, rv); 00611 else 00612 tx_part_reset(tx_part); 00613 00614 goto cl_ctx_send_exit1; /* A Good Send */ 00615 } 00616 00617 do_net_close_tx(cl_ctx, "snd-err"); 00618 00619 cl_ctx_send_exit1: 00620 USR_INFO("C: %s 0x%02x to net %d, %s (%d Bytes) [@ %u]\n\r", 00621 ofs? "PartN" : "FH-B1", B1, cl_ctx->net, 00622 (rv > 0)? "Sent" : "Fail", rv, net_ops->time()); 00623 00624 return rv; 00625 } 00626 00627 static int32_t cl_ctx_seg1_send(struct client_ctx *cl_ctx, const uint8_t *buf, uint32_t len, 00628 bool is_conn_msg, struct mqtt_packet *tx_mqp) 00629 { 00630 00631 struct tx_part_pkt *tx_part = &CLIENT(cl_ctx)->tx_part; 00632 00633 /* For CONNECT msg, a client context mustn't be already connected. 00634 For others msgs, a client context must be conected to server */ 00635 if(false == (is_conn_msg ^ is_connected(cl_ctx))) 00636 return MQP_ERR_NOTCONN; 00637 00638 if(TX_PART_IN_USE(tx_part)) 00639 return MQP_ERR_BADCALL; 00640 00641 tx_part_setup(tx_part, buf, len, tx_mqp); 00642 00643 return cl_ctx_part_send(cl_ctx); 00644 } 00645 00646 int32_t mqtt_client_send_progress(void *ctx) 00647 { 00648 struct client_ctx *cl_ctx = CL_CTX(ctx); 00649 struct tx_part_pkt *tx_part = NULL; 00650 int32_t rv = MQP_ERR_BADCALL; 00651 00652 if(NULL == ctx) 00653 return MQP_ERR_FNPARAM; 00654 00655 tx_part = &CLIENT(cl_ctx)->tx_part; 00656 00657 if(!TX_PART_IN_USE(tx_part)) 00658 return rv; 00659 00660 rv = cl_ctx_part_send(cl_ctx); 00661 if(rv > 0) 00662 rv = TX_PART_BUF_SZ(tx_part); 00663 00664 return rv; 00665 } 00666 00667 static int32_t wr_connect_pl(struct client_ctx *cl_ctx, uint8_t *buf, 00668 uint32_t fsz, uint8_t *conn_opts) 00669 { 00670 00671 /* UTF8 usage selection order: Client, W-Topic, W-Msg, User-Name, Pwd */ 00672 uint8_t utf8_sel[] = {0x00, WILL_CONFIG_VAL, 0x00, 00673 USER_NAME_OPVAL, PASS_WORD_OPVAL 00674 }; 00675 struct client_desc *client = CLIENT(cl_ctx); 00676 uint8_t *ref = buf; 00677 00678 int32_t i = 0; 00679 00680 for(i = 0; i < 5; i++) { /* TBD 5 --> macro */ 00681 uint16_t len = 2; 00682 const struct utf8_string *utf8 = client->conn_pl_utf8s[i]; 00683 if(NULL == utf8) { 00684 /* UTF8 absent: Client ID (i = 0) and Will MSG (i = 2) 00685 set zero byte length in the CONNECT message */ 00686 if(0 != i) 00687 if(!((2 == i) && (*conn_opts & WILL_CONFIG_VAL))) 00688 continue; /* Others, just pass */ 00689 } else { 00690 len += utf8->length; 00691 } 00692 00693 if(fsz < (buf - ref + len)) { /* TBD end = ref + fsz */ 00694 Uart_Write((uint8_t*)"Payload: no space left fail\r\n"); 00695 return MQP_ERR_PKT_LEN; /* Payload: no space left */ 00696 } 00697 if(2 == len) { 00698 buf += buf_wr_nbo_2B(buf, 0); /* WR 0 byte length */ 00699 } else { 00700 buf += mqp_buf_wr_utf8(buf, utf8); 00701 } 00702 *conn_opts |= utf8_sel[i]; /* Enable message flags */ 00703 } 00704 00705 return buf - ref; 00706 } 00707 00708 00709 /* Define protocol information for the supported versions */ 00710 static uint8_t mqtt310[] = {0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', 0x03}; 00711 static uint8_t mqtt311[] = {0x00, 0x04, 'M', 'Q', 'T', 'T', 0x04}; 00712 00713 static inline uint16_t get_connect_vh_len(struct client_ctx *cl_ctx) 00714 { 00715 00716 return (IS_PROTO_VER31(cl_ctx)? sizeof(mqtt310) : sizeof(mqtt311)) 00717 + 3; 00718 } 00719 00720 static int32_t wr_connect_vh(struct client_ctx *cl_ctx, uint8_t *buf, 00721 uint16_t ka_secs, uint8_t conn_opts) 00722 { 00723 00724 uint8_t *ref = buf; 00725 00726 if(IS_PROTO_VER31(cl_ctx)) 00727 buf += buf_wr_nbytes(buf, mqtt310, sizeof(mqtt310)); 00728 else 00729 buf += buf_wr_nbytes(buf, mqtt311, sizeof(mqtt311)); 00730 00731 *buf++ = conn_opts; 00732 buf += buf_wr_nbo_2B(buf, ka_secs); 00733 00734 return buf - ref; 00735 } 00736 00737 static int32_t net_connect(struct client_ctx *cl_ctx) 00738 { 00739 00740 struct client_desc *client = CLIENT(cl_ctx); 00741 00742 if(NEED_NET_CLOSE(cl_ctx)) { 00743 Uart_Write((uint8_t*)"Return MQP_ERR_NOT_DEF\r\n"); 00744 return MQP_ERR_NOT_DEF; 00745 } 00746 if(NULL == net_ops) { 00747 Uart_Write((uint8_t*)"Return MQP_ERR_NET_OPS\r\n"); 00748 return MQP_ERR_NET_OPS; 00749 } 00750 cl_ctx->net = net_ops->open(client->nwconn_opts | DEV_NETCONN_OPT_TCP, 00751 client->server_addr, 00752 client->port_number, 00753 &client->nw_security); 00754 00755 return (-1 == cl_ctx->net)? MQP_ERR_NETWORK : 0; 00756 } 00757 00758 static 00759 int32_t cl_ctx_conn_state_try_locked(struct client_ctx *cl_ctx, const uint8_t *buf, 00760 uint32_t len, uint16_t ka_secs, bool clean_session, 00761 struct mqtt_packet *tx_mqp) 00762 { 00763 00764 int32_t rv = 0; 00765 00766 // MUTEX_LOCKIN(); 00767 if( xSemaphore != NULL ) { 00768 // See if we can obtain the semaphore. If the semaphore is not available 00769 // wait 10 ticks to see if it becomes free. 00770 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 100 ) == pdTRUE ) { 00771 // We were able to obtain the semaphore and can now access the 00772 // shared resource. 00773 00774 rv = net_connect(cl_ctx); 00775 if(rv < 0) { 00776 Uart_Write((uint8_t*)"net_connect failed\r\n"); 00777 goto cl_ctx_conn_state_try_locked_exit1; 00778 } 00779 /* Ensure LIB is initialized & CTX isn't awaiting CONNACK */ 00780 rv = MQP_ERR_BADCALL; 00781 if(false == ((INIT_DONE_STATE != cl_lib_state) || (AWAITS_CONNACK(cl_ctx)))) { 00782 rv = cl_ctx_seg1_send(cl_ctx, buf, len, true, tx_mqp); 00783 } 00784 if(rv < 0) { 00785 Uart_Write((uint8_t*)"cl_ctx_seg1_send failed\r\n"); 00786 goto cl_ctx_conn_state_try_locked_exit1; /* Fail */ 00787 } 00788 /* Successfully sent CONNECT msg - let's do housekeeping */ 00789 cl_ctx->timeout = net_ops->time();/* Fixup: CONN TX Time */ 00790 cl_ctx->flags |= DO_CONNACK_TO_FLAG | CONNACK_AWAIT_FLAG; 00791 cl_ctx->flags |= clean_session? CLEAN_SESSION_FLAG : 0; 00792 00793 cl_ctx->ka_secs = ka_secs; 00794 00795 if(A_GROUP_MEMBER(cl_ctx)) { 00796 cl_ctx->next = conn_ctxs; 00797 conn_ctxs = cl_ctx; 00798 00799 /* First entry in 'conn_ctxs': schedule a move to 00800 'used_conn' (for house-keeping and tracking) */ 00801 if(NULL == cl_ctx->next) { 00802 rv = loopb_trigger(); 00803 } 00804 } 00805 00806 00807 // We have finished accessing the shared resource. Release the 00808 // semaphore. 00809 //xSemaphoreGive( xSemaphore ); 00810 } else { 00811 // We could not obtain the semaphore and can therefore not access 00812 // the shared resource safely. 00813 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); 00814 } 00815 } 00816 00817 cl_ctx_conn_state_try_locked_exit1: 00818 // MUTEX_UNLOCK(); 00819 xSemaphoreGive(xSemaphore); 00820 00821 return rv; 00822 } 00823 00824 static 00825 int32_t connect_msg_send(struct client_ctx *cl_ctx, bool clean_session, uint16_t ka_secs) 00826 { 00827 00828 struct mqtt_packet *mqp = mqp_client_send_alloc(MQTT_CONNECT); 00829 uint8_t *buf, *ref, conn_opts = clean_session? CLEAN_START_VAL : 0; 00830 int32_t rv = MQP_ERR_PKT_LEN; 00831 uint32_t fsz; /* Free buffer size in PKT */ 00832 uint16_t vhl = get_connect_vh_len(cl_ctx); 00833 00834 if(NULL == mqp) { 00835 Uart_Write((uint8_t*)"MQP_ERR_PKT_AVL\r\n"); 00836 return MQP_ERR_PKT_AVL; 00837 } 00838 fsz = MQP_FREEBUF_LEN(mqp); 00839 if(fsz < vhl) { 00840 Uart_Write((uint8_t*)"No space for VAR HDR\r\n"); 00841 goto connect_msg_send_exit1; /* No space for VAR HDR */ 00842 } 00843 mqp->vh_len = vhl; /* Reserve buffer for variable header */ 00844 buf = ref = MQP_PAYLOAD_BUF(mqp);/* Get started to incorporate payload */ 00845 00846 rv = wr_connect_pl(cl_ctx, buf, fsz - vhl, &conn_opts);/* Payload data */ 00847 if(rv < 0) { 00848 memset(print_buf, 0x00, PRINT_BUF_LEN); 00849 sprintf((char*) print_buf, "Payload WR failed %i\r\n",rv); 00850 Uart_Write((uint8_t *) print_buf); 00851 goto connect_msg_send_exit1; /* Payload WR failed */ 00852 } 00853 buf += rv; 00854 mqp->pl_len = buf - ref; 00855 00856 wr_connect_vh(cl_ctx, ref - vhl, ka_secs, 00857 CLIENT(cl_ctx)->will_opts | conn_opts); /* Var Header */ 00858 00859 mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, MQTT_QOS0, false));/* Fix Header */ 00860 ref = MQP_FHEADER_BUF(mqp); 00861 00862 /* Following routine frees up MQP - whether error or not */ 00863 return cl_ctx_conn_state_try_locked(cl_ctx, ref, buf - ref, 00864 ka_secs, clean_session, 00865 mqp); 00866 connect_msg_send_exit1: 00867 00868 if(mqp) { 00869 mqp_free_locked(mqp); 00870 } 00871 return rv; 00872 } 00873 00874 int32_t mqtt_connect_msg_send(void *ctx, bool clean_session, uint16_t ka_secs) 00875 { 00876 00877 return ctx? 00878 connect_msg_send(CL_CTX(ctx), clean_session, ka_secs) : -1; 00879 } 00880 00881 /* 00882 To be used for the following messages: PUBLISH, SUBSCRIBE, UNSUBSCRIBE 00883 Dispatches msg to broker over socket. Frees-up MQP, in case, MSG has QoS0 or 00884 if client-lib allocated MQP encounters an error in dispatch. 00885 Returns, on success, number of bytes transfered, otherwise -1 00886 */ 00887 static int32_t _msg_dispatch(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, 00888 enum mqtt_qos qos, bool retain) 00889 { 00890 00891 bool not_qos0 = (MQTT_QOS0 != qos)? true : false; 00892 uint16_t msg_id = mqp->msg_id; 00893 int32_t rv = MQP_ERR_NETWORK; 00894 00895 mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain)); 00896 00897 // MUTEX_LOCKIN(); 00898 if( xSemaphore != NULL ) { 00899 // See if we can obtain the semaphore. If the semaphore is not available 00900 // wait 10 ticks to see if it becomes free. 00901 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 200 ) == pdTRUE ) { 00902 // We were able to obtain the semaphore and can now access the 00903 // shared resource. 00904 00905 if(not_qos0) { 00906 mqp->n_refs++; /* Need to enlist, do not free-up MQP */ 00907 } 00908 /* Tries to free-up MQP either on error or if full pkt is sent */ 00909 rv = cl_ctx_seg1_send(cl_ctx, MQP_FHEADER_BUF(mqp), 00910 MQP_CONTENT_LEN(mqp), false, 00911 mqp); 00912 00913 /* At this point, error or not, QoS0 MQP would have been freed */ 00914 00915 if((rv <= 0) && not_qos0) { 00916 mqp_free(mqp); /* Err: Explicitly free-up non QoS0 MQP */ 00917 Uart_Write((uint8_t*)"cl_ctx_seg1_send failed\r\n"); 00918 goto _msg_dispatch_exit1; 00919 } 00920 00921 rv = msg_id; /* Make progress for a good send to the server */ 00922 00923 if(not_qos0) { /* Enlist non QOS0 MQP to await ACK from server */ 00924 mqp_ack_wlist_append(&CLIENT(cl_ctx)->qos_ack1_wl, mqp); 00925 } 00926 // We have finished accessing the shared resource. Release the 00927 // semaphore. 00928 //xSemaphoreGive( xSemaphore ); 00929 } else { 00930 // We could not obtain the semaphore and can therefore not access 00931 // the shared resource safely. 00932 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); 00933 } 00934 } 00935 00936 _msg_dispatch_exit1: 00937 00938 // MUTEX_UNLOCK(); 00939 xSemaphoreGive(xSemaphore); 00940 00941 return rv; 00942 } 00943 00944 static 00945 int32_t msg_dispatch_no_free(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, 00946 enum mqtt_qos qos, bool retain) 00947 { 00948 if((NULL == mqp) || (NULL == cl_ctx)) 00949 return MQP_ERR_FNPARAM; 00950 00951 mqp->n_refs++; /* Ensures caller that MQP is not freed-up */ 00952 00953 return _msg_dispatch(cl_ctx, mqp, qos, retain); 00954 } 00955 00956 int32_t mqtt_client_pub_msg_send(void *ctx, const struct utf8_string *topic, 00957 const uint8_t *data_buf, uint32_t data_len, 00958 enum mqtt_qos qos, bool retain) 00959 { 00960 00961 struct mqtt_packet *mqp = NULL; 00962 00963 if((NULL == ctx) || 00964 (NULL == topic) || 00965 ((data_len > 0) && (NULL == data_buf))) { 00966 Uart_Write((uint8_t*)"MQP_ERR_FNPARAM\n\r"); 00967 return MQP_ERR_FNPARAM; 00968 } 00969 if(false == is_valid_utf8_string(topic)) { 00970 Uart_Write((uint8_t*)"MQP_ERR_CONTENT\n\r"); 00971 return MQP_ERR_CONTENT; 00972 } 00973 mqp = mqp_client_send_alloc(MQTT_PUBLISH); 00974 if(NULL == mqp) { 00975 Uart_Write((uint8_t*)"MQP_ERR_PKT_AVL\n\r"); 00976 return MQP_ERR_PKT_AVL; 00977 } 00978 if((0 > mqp_pub_append_topic(mqp, topic, qos? assign_new_msg_id(): 0)) || 00979 (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) { 00980 Uart_Write((uint8_t*)"len_err\n\r"); 00981 return len_err_free_mqp(mqp); 00982 } 00983 00984 return _msg_dispatch(CL_CTX(ctx), mqp, qos, retain); 00985 } 00986 00987 int32_t mqtt_client_pub_dispatch(void *ctx, struct mqtt_packet *mqp, 00988 enum mqtt_qos qos, bool retain) 00989 { 00990 return msg_dispatch_no_free(CL_CTX(ctx), mqp, qos, retain); 00991 } 00992 00993 static int32_t tail_incorp_msg_id(struct mqtt_packet *mqp) 00994 { 00995 uint8_t *buf = MQP_FHEADER_BUF(mqp) + mqp->vh_len; 00996 00997 if(0 == mqp->msg_id) { 00998 mqp->msg_id = assign_new_msg_id(); 00999 buf += buf_wr_nbo_2B(buf, mqp->msg_id); 01000 mqp->vh_len += 2; 01001 01002 return 2; 01003 } 01004 01005 return 0; 01006 } 01007 01008 static int32_t buf_utf8_wr_try(uint8_t *buf, uint32_t fsz, const struct utf8_string *topic, 01009 uint8_t qid) 01010 { 01011 uint8_t *ref = buf; 01012 01013 if(fsz < (topic->length + 2 + (QFL_VALUE == qid)? 0 : 1)) 01014 return MQP_ERR_PKT_LEN; /* No buf */ 01015 01016 if(false == is_valid_utf8_string(topic)) 01017 return MQP_ERR_CONTENT;/* Invalid */ 01018 01019 buf += mqp_buf_wr_utf8(buf, topic); 01020 if(QFL_VALUE != qid) 01021 *buf++ = qid; 01022 01023 return buf - ref; 01024 } 01025 01026 static int32_t utf8_array_send(struct client_ctx *cl_ctx, 01027 const struct utf8_strqos *subsc_vec, 01028 const struct utf8_string *unsub_vec, 01029 uint32_t n_elem) 01030 { 01031 struct mqtt_packet *mqp; 01032 uint8_t *ref, *buf, *end; 01033 uint32_t i; 01034 01035 if((NULL == cl_ctx) || !((!!subsc_vec) ^ (!!unsub_vec)) || (0 == n_elem)) 01036 return MQP_ERR_FNPARAM; 01037 01038 mqp = mqp_client_send_alloc(subsc_vec? 01039 MQTT_SUBSCRIBE : MQTT_UNSUBSCRIBE); 01040 if(NULL == mqp) 01041 return MQP_ERR_PKT_AVL; 01042 01043 buf = MQP_VHEADER_BUF(mqp); 01044 end = MQP_FREEBUF_LEN(mqp) + buf; /* End of free buffer */ 01045 if((end - buf) < 2) 01046 return len_err_free_mqp(mqp);/* MSG-ID: no space */ 01047 01048 buf += tail_incorp_msg_id(mqp); 01049 ref = buf; 01050 01051 for(i = 0; i < n_elem; i++) { 01052 const struct utf8_string *topic; 01053 struct utf8_string topreq; 01054 int32_t rv; 01055 01056 if(subsc_vec) { 01057 topreq.length = subsc_vec[i].length; 01058 topreq.buffer = subsc_vec[i].buffer; 01059 topic = &topreq; 01060 } else 01061 topic = unsub_vec + i; 01062 01063 rv = buf_utf8_wr_try(buf, end - buf, topic, subsc_vec? 01064 (uint8_t)subsc_vec[i].qosreq : QFL_VALUE); 01065 if(rv < 0) { 01066 mqp_free(mqp); 01067 return rv; 01068 } 01069 01070 buf += rv; 01071 } 01072 01073 mqp->pl_len = buf - ref; /* Total length of topics data */ 01074 01075 return _msg_dispatch(cl_ctx, mqp, MQTT_QOS1, false); 01076 } 01077 01078 int32_t mqtt_sub_msg_send(void *ctx, const struct utf8_strqos *qos_topics, uint32_t count) 01079 { 01080 return utf8_array_send(CL_CTX(ctx), qos_topics, NULL, count); 01081 } 01082 01083 int32_t mqtt_sub_dispatch(void *ctx, struct mqtt_packet *mqp) 01084 { 01085 return msg_dispatch_no_free(CL_CTX(ctx), mqp, MQTT_QOS1, false); 01086 } 01087 01088 int32_t mqtt_unsub_msg_send(void *ctx, const struct utf8_string *topics, uint32_t count) 01089 { 01090 return utf8_array_send(CL_CTX(ctx), NULL, topics, count); 01091 } 01092 01093 int32_t mqtt_unsub_dispatch(void *ctx, struct mqtt_packet *mqp) 01094 { 01095 return msg_dispatch_no_free(CL_CTX(ctx), mqp, MQTT_QOS1, false); 01096 } 01097 01098 /* Note: in this revision of implementation, vh_msg_send() is being invoked 01099 from a locked RX context. Should this situation change, so should the 01100 'locking' considerations in the routine. */ 01101 static int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type, 01102 enum mqtt_qos qos, bool has_vh, 01103 uint16_t vh_data) 01104 { 01105 uint8_t buf[4]; 01106 uint32_t len = 2; 01107 01108 buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false)); 01109 buf[1] = has_vh ? 2 : 0; 01110 01111 if(has_vh) 01112 len += buf_wr_nbo_2B(buf + 2, vh_data); 01113 01114 return cl_ctx_seg1_send(cl_ctx, buf, len, false, NULL); 01115 } 01116 01117 static int32_t pingreq_send(struct client_ctx *cl_ctx, uint32_t rsp_flag) 01118 { 01119 01120 int32_t rv = 0; 01121 uint8_t buf[2]; 01122 01123 buf[0] = MAKE_FH_BYTE1(MQTT_PINGREQ, 01124 MAKE_FH_FLAGS(false, MQTT_QOS0, false)); 01125 buf[1] = 0; 01126 01127 /* Note: in case of error in network send, cl_ctx_send() may 01128 try to terminate connection with server. */ 01129 rv = cl_ctx_seg1_send(cl_ctx, buf, 2, false, NULL); 01130 if(rv > 0) 01131 cl_ctx->flags |= rsp_flag; 01132 01133 return rv; 01134 } 01135 01136 int32_t mqtt_pingreq_send(void *ctx) 01137 { 01138 Uart_Write((uint8_t*)"mqtt_pingreq_send\r\n"); 01139 int32_t rv = 0; 01140 01141 // MUTEX_LOCKIN(); 01142 if( xSemaphore != NULL ) { 01143 // See if we can obtain the semaphore. If the semaphore is not available 01144 // wait 10 ticks to see if it becomes free. 01145 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { 01146 // We were able to obtain the semaphore and can now access the 01147 // shared resource. 01148 rv = pingreq_send(CL_CTX(ctx), USER_PING_RSP_FLAG); 01149 01150 // We have finished accessing the shared resource. Release the 01151 // semaphore. 01152 xSemaphoreGive( xSemaphore ); 01153 01154 } else { 01155 // We could not obtain the semaphore and can therefore not access 01156 // the shared resource safely. 01157 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); 01158 } 01159 } 01160 01161 // MUTEX_UNLOCK(); 01162 return rv; 01163 } 01164 01165 int32_t mqtt_disconn_send(void *ctx) 01166 { 01167 Uart_Write((uint8_t*)"mqtt_disconn_send\r\n"); 01168 uint8_t buf[2]; 01169 01170 buf[0] = MAKE_FH_BYTE1(MQTT_DISCONNECT, 01171 MAKE_FH_FLAGS(false, MQTT_QOS0, false)); 01172 buf[1] = 0; 01173 01174 // MUTEX_LOCKIN(); 01175 if( xSemaphore != NULL ) { 01176 // See if we can obtain the semaphore. If the semaphore is not available 01177 // wait 10 ticks to see if it becomes free. 01178 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { 01179 // We were able to obtain the semaphore and can now access the 01180 // shared resource. 01181 /* Note: in case of error in network send, cl_ctx_send() may 01182 try to terminate connection with server. */ 01183 if(cl_ctx_seg1_send(CL_CTX(ctx), buf, 2, false, NULL) > 0) { 01184 /* Terminate connection on application's request */ 01185 do_net_close_tx(CL_CTX(ctx), "DISCONN"); 01186 } 01187 01188 // We have finished accessing the shared resource. Release the 01189 // semaphore. 01190 xSemaphoreGive( xSemaphore ); 01191 01192 } else { 01193 // We could not obtain the semaphore and can therefore not access 01194 // the shared resource safely. 01195 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); 01196 } 01197 } 01198 01199 // MUTEX_UNLOCK(); 01200 return 0; 01201 } 01202 01203 /*------------------------------------------------------------------------------ 01204 * MQTT RX Routines 01205 *------------------------------------------------------------------------------ 01206 */ 01207 static bool ack1_wl_mqp_dispatch(struct client_ctx *cl_ctx) 01208 { 01209 struct mqtt_ack_wlist *wlist = &CLIENT(cl_ctx)->qos_ack1_wl; 01210 struct mqtt_packet *mqp = NULL; 01211 bool rv = true; 01212 01213 for(mqp = wlist->head; mqp && (true == rv); mqp = mqp->next) { 01214 uint8_t *buf = MQP_FHEADER_BUF(mqp); 01215 mqp->fh_byte1 = *buf |= DUP_FLAG_VAL(true); 01216 01217 mqp->n_refs++; /* Ensures MQP is not freed by following */ 01218 01219 /* Error or not, following routine tries to free up MQP */ 01220 if(cl_ctx_seg1_send(cl_ctx, buf, MQP_CONTENT_LEN(mqp), 01221 false, mqp) <= 0) 01222 rv = false; 01223 } 01224 01225 return rv; 01226 } 01227 01228 /* TBD candidate for common */ 01229 static bool ack2_msg_id_dispatch(struct client_ctx *cl_ctx) 01230 { 01231 struct pub_qos2_cq *tx_cq = &CLIENT(cl_ctx)->qos2_tx_cq; 01232 uint8_t rd_idx = tx_cq->rd_idx; 01233 uint8_t n_free = tx_cq->n_free; 01234 bool rv = true; 01235 uint8_t i = 0; 01236 01237 for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free) && (true == rv); i++) { 01238 if(vh_msg_send(cl_ctx, MQTT_PUBREL, MQTT_QOS1, 01239 true, tx_cq->id_vec[i]) <= 0) 01240 rv = false; 01241 } 01242 01243 return rv; 01244 } 01245 01246 static void session_resume(struct client_ctx *cl_ctx) 01247 { 01248 DBG_INFO("C: Re-send ACK awaited QoS1/2 msgs to net %d\n\r", 01249 cl_ctx->net); 01250 01251 if(ack1_wl_mqp_dispatch(cl_ctx)) 01252 ack2_msg_id_dispatch(cl_ctx); 01253 01254 return; 01255 } 01256 01257 static bool ack1_wl_rmfree(struct mqtt_ack_wlist *wl, uint16_t msg_id) 01258 { 01259 struct mqtt_packet *mqp = mqp_ack_wlist_remove(wl, msg_id); 01260 if(NULL != mqp) { 01261 mqp_free(mqp); 01262 return true; 01263 } 01264 01265 USR_INFO("Err: Unexpected ACK w/ ID 0x%04x\n\r", msg_id); 01266 01267 return false; 01268 } 01269 01270 static bool _proc_pub_rec_rx(struct client_ctx *cl_ctx, uint16_t msg_id) 01271 { 01272 /* Follow-up messages for QOS2 PUB must be transacted in the 01273 same order as the initial sequence of QOS2 PUB dispatches. 01274 Therefore, checking the first entry should be OK 01275 */ 01276 struct mqtt_packet *mqp = CLIENT(cl_ctx)->qos_ack1_wl.head; 01277 01278 if((msg_id == mqp->msg_id) && ack2_msg_id_logup(cl_ctx, msg_id)) { 01279 01280 ack1_wl_rmfree(&CLIENT(cl_ctx)->qos_ack1_wl, msg_id); 01281 01282 vh_msg_send(cl_ctx, MQTT_PUBREL, MQTT_QOS1, 01283 true, msg_id); 01284 01285 return true; 01286 } 01287 01288 return false; /* Unexpected PUBREC or QOS2 store exceeded */ 01289 } 01290 01291 static bool _proc_pub_rel_rx(struct client_ctx *cl_ctx, uint16_t msg_id) 01292 { 01293 /* For a PUB-REL RX, send PUBCOMP to let server make progress */ 01294 vh_msg_send(cl_ctx, MQTT_PUBCOMP, MQTT_QOS0, true, msg_id); 01295 01296 if(qos2_pub_rx_is_done(cl_ctx, msg_id)) 01297 qos2_pub_rx_unlog(cl_ctx, msg_id); /* Expunge record */ 01298 01299 return true; 01300 } 01301 01302 /* 01303 Process ACK Message from Broker. 01304 Returns true on success, otherwise false. 01305 Used for: PUBACK, SUBACK and UNSUBACK 01306 */ 01307 static 01308 bool _proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 01309 { 01310 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); 01311 struct client_desc *client = CLIENT(cl_ctx); 01312 uint16_t msg_id = mqp_raw->msg_id; 01313 uint32_t len = mqp_raw->pl_len; 01314 01315 /* Caters to SUB-ACK, UNSUB-ACK and PUB-ACK Messages */ 01316 if(false == ack1_wl_rmfree(&client->qos_ack1_wl, msg_id)) 01317 return false; /* Err: MSG_ID was not awaited */ 01318 01319 if(ctx_cbs->ack_notify) 01320 ctx_cbs->ack_notify(client->app, mqp_raw->msg_type, msg_id, 01321 len? MQP_PAYLOAD_BUF(mqp_raw): NULL, 01322 len); 01323 return true; 01324 } 01325 01326 static 01327 bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 01328 { 01329 uint8_t msg_type = mqp_raw->msg_type; 01330 bool rv = false; 01331 uint16_t msg_id = 0; 01332 01333 if(false == mqp_proc_msg_id_ack_rx(mqp_raw, MQTT_SUBACK == msg_type)) 01334 return rv; /* Problem in contents received from server */ 01335 01336 msg_id = mqp_raw->msg_id; 01337 01338 if(MQTT_PUBREC == msg_type) { 01339 rv = _proc_pub_rec_rx(cl_ctx, msg_id); 01340 if(rv) 01341 MQP_RX_DO_NOT_RPT_SET(mqp_raw); /* Don't report to App */ 01342 01343 } else if(MQTT_PUBREL == msg_type) { 01344 rv = _proc_pub_rel_rx(cl_ctx, msg_id); 01345 if(rv) 01346 MQP_RX_DO_NOT_RPT_SET(mqp_raw); /* Don't report to App */ 01347 } else if(MQTT_PUBCOMP == msg_type) { 01348 rv = ack2_msg_id_unlog(cl_ctx, msg_id); 01349 } else { 01350 rv = _proc_ack_msg_rx(cl_ctx, mqp_raw); 01351 } 01352 01353 return rv; 01354 } 01355 01356 static 01357 bool proc_pub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 01358 { 01359 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); 01360 bool good_pub = mqp_proc_pub_rx(mqp_raw); 01361 uint8_t B = mqp_raw->fh_byte1; 01362 enum mqtt_qos qos = ENUM_QOS(B); 01363 uint16_t msg_id = 0; 01364 01365 if(false == good_pub) 01366 return false; /* Didn't get nicely composed PUB Packet */ 01367 01368 msg_id = mqp_raw->msg_id; 01369 01370 /* Irrespective of the result of the ACK through vh_msg_send(), 01371 the implementation has chosen to process the good PUB packet. 01372 Any error will be handled in next iteration of rx processing. 01373 */ 01374 if(MQTT_QOS1 == qos) 01375 vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id); 01376 01377 if(MQTT_QOS2 == qos) { 01378 /* Ensuring "only once" philosophy for MQTT QoS2 PUBs */ 01379 if(qos2_pub_rx_is_done(cl_ctx, msg_id)) { 01380 /* Already delivered. Drop it & do not report */ 01381 MQP_RX_DO_NOT_RPT_SET(mqp_raw); 01382 return true; /* No more follow-up; all's good */ 01383 } 01384 01385 if(false == qos2_pub_rx_logup(cl_ctx, msg_id)) 01386 return false; /* Failed to record New RX PUB */ 01387 01388 vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id); 01389 } 01390 01391 /* QoS obliations completed, present PUBLISH RX packet to app */ 01392 if(ctx_cbs->publish_rx) { 01393 /* App has chosen the callback method to receive PKT */ 01394 mqp_raw->n_refs++; /* Make app owner of this packet */ 01395 if(ctx_cbs->publish_rx(CLIENT(cl_ctx)->app, BOOL_DUP(B), 01396 qos, BOOL_RETAIN(B), mqp_raw)) { 01397 /* App has no use of PKT any more, so free it */ 01398 mqp_raw->n_refs--; /* Take back ownership */ 01399 } 01400 } 01401 01402 return true; 01403 } 01404 01405 static 01406 bool proc_connack_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 01407 { 01408 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); 01409 uint8_t *buf = MQP_VHEADER_BUF(mqp_raw); 01410 01411 mqp_raw->vh_len += 2; 01412 mqp_raw->pl_len -= 2; 01413 01414 if(0 != mqp_raw->pl_len) 01415 return false; /* There is no payload in message */ 01416 01417 cl_ctx->flags &= ~(DO_CONNACK_TO_FLAG | CONNACK_AWAIT_FLAG); 01418 01419 if(VHB_CONNACK_RC(buf)) 01420 /* Server has refused the connection, inform the app */ 01421 goto proc_connack_rx_exit1; 01422 01423 cl_ctx->flags |= NOW_CONNECTED_FLAG; 01424 cl_ctx_timeout_update(cl_ctx, net_ops->time()); /* start KA */ 01425 01426 if(IS_CLN_SESSION(cl_ctx)) 01427 session_delete(cl_ctx); 01428 else 01429 session_resume(cl_ctx); 01430 01431 proc_connack_rx_exit1: 01432 if(ctx_cbs->ack_notify) 01433 ctx_cbs->ack_notify(CLIENT(cl_ctx)->app, mqp_raw->msg_type, 01434 0, buf, 2); 01435 01436 return true; 01437 } 01438 01439 static 01440 bool proc_pingrsp_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 01441 { 01442 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); 01443 01444 if(0 != mqp_raw->pl_len) 01445 return false; 01446 01447 if(AWAITS_KA_PING(cl_ctx)) { 01448 cl_ctx->flags &= ~KA_PINGER_RSP_FLAG; 01449 return true; 01450 } 01451 01452 if(AWAITS_PINGRSP(cl_ctx)) { 01453 cl_ctx->flags &= ~USER_PING_RSP_FLAG; 01454 if(ctx_cbs->ack_notify) 01455 ctx_cbs->ack_notify(CLIENT(cl_ctx)->app, 01456 mqp_raw->msg_type, 01457 0, NULL, 0); 01458 return true; 01459 } 01460 01461 return false; 01462 } 01463 01464 static 01465 bool conn_sent_state_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 01466 { 01467 bool rv = false; 01468 01469 switch(mqp_raw->msg_type) { 01470 01471 case MQTT_CONNACK: 01472 /* Changes client_ctx->flags to CONNECTED */ 01473 rv = proc_connack_rx(cl_ctx, mqp_raw); 01474 break; 01475 01476 default: 01477 break; 01478 } 01479 01480 return rv; 01481 } 01482 01483 static 01484 bool connected_state_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) 01485 { 01486 bool rv = false; 01487 01488 switch(mqp_raw->msg_type) { 01489 01490 case MQTT_SUBACK: 01491 case MQTT_PUBACK: 01492 case MQTT_PUBREC: 01493 case MQTT_PUBREL: 01494 case MQTT_PUBCOMP: 01495 case MQTT_UNSUBACK: 01496 rv = proc_ack_msg_rx(cl_ctx, mqp_raw); 01497 break; 01498 01499 case MQTT_PINGRSP: 01500 rv = proc_pingrsp_rx(cl_ctx, mqp_raw); 01501 break; 01502 01503 case MQTT_PUBLISH: 01504 rv = proc_pub_msg_rx(cl_ctx, mqp_raw); 01505 break; 01506 01507 case MQTT_CONNACK: /* not expected */ 01508 default: 01509 break; 01510 } 01511 01512 return rv; 01513 } 01514 01515 static bool process_recv(struct client_ctx *cl_ctx, 01516 struct mqtt_packet *mqp_raw) 01517 { 01518 bool rv; 01519 01520 USR_INFO("C: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d [@ %u]\n\r", 01521 mqp_raw->fh_byte1, cl_ctx->net, net_ops->time()); 01522 01523 /* Working Principle: Only RX processing errors should be 01524 reported as 'false'. Status of TX as a follow-up to RX 01525 messages need not be reported by the xyz_rx() routines. 01526 Error observed in TX is either dealt in next iteration 01527 of RX loop (in case, there is a dedicated RX task for 01528 the CTX) or in TX routine itself (in case, there is no 01529 dedicated RX task for the CTX). 01530 */ 01531 rv = AWAITS_CONNACK(cl_ctx)? 01532 conn_sent_state_rx(cl_ctx, mqp_raw) : 01533 connected_state_rx(cl_ctx, mqp_raw); 01534 01535 DBG_INFO("C: Msg w/ ID 0x%04x, processing status: %s\n\r", 01536 mqp_raw->msg_id, rv? "Good" : "Fail"); 01537 01538 return rv; 01539 } 01540 01541 static int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, void *ctx) 01542 { 01543 bool timed_out = false; 01544 int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, &timed_out, ctx); 01545 if(rv <= 0) { 01546 USR_INFO("C: Net %d, Raw Error %d, Time Out: %c\n\r", 01547 net, rv, timed_out? 'Y' : 'N'); 01548 01549 if(timed_out) 01550 rv = MQP_ERR_TIMEOUT; 01551 } 01552 01553 return rv; 01554 } 01555 01556 /* 01557 MQTT 3.1.1 implementation 01558 ------------------------- 01559 01560 Keep Alive Time is maxmimum interval within which a client should send a 01561 packet to broker. If there are either no packets to be sent to broker or 01562 no retries left, then client is expected to a send a PINGREQ within Keep 01563 Alive Time. Broker should respond by sending PINGRSP with-in reasonable 01564 time of 'wait_secs'. If Keep Alive Time is set as 0, then client is not 01565 expected to be disconnected due to in-activity of MQTT messages. Value 01566 of 'wait_secs' is assumed to be quite smaller than (non-zero) 'ka_secs'. 01567 */ 01568 static void conn2used_ctxs(uint32_t wait_secs) 01569 { 01570 01571 while(conn_ctxs) { 01572 struct client_ctx *cl_ctx = conn_ctxs; 01573 conn_ctxs = conn_ctxs->next; 01574 01575 cl_ctx_timeout_insert(&used_ctxs, cl_ctx); 01576 } 01577 } 01578 01579 static int32_t single_ctx_ka_sequence(struct client_ctx *cl_ctx, uint32_t wait_secs) 01580 { 01581 01582 uint32_t now_secs = net_ops->time(); 01583 01584 if(AWAITS_CONNACK(cl_ctx) && CFG_CONNACK_TO(cl_ctx)) { 01585 cl_ctx->timeout += wait_secs; /* Set CONNACK timeout value */ 01586 cl_ctx->flags &= ~DO_CONNACK_TO_FLAG; 01587 } 01588 01589 if(cl_ctx->timeout > now_secs) { 01590 return 1; /* Still have time for next message transaction */ 01591 } 01592 if(is_connected(cl_ctx)) { 01593 /* Timeout has happened. Check for PINGRESP if PINGREQ done. 01594 Otherwise, send PINGREQ (Are You there?) to the server. */ 01595 if(AWAITS_KA_PING(cl_ctx)) { 01596 goto single_ctx_ka_sequence_exit1; /* No PINGRESP */ 01597 } 01598 return pingreq_send(cl_ctx, KA_PINGER_RSP_FLAG); /* Hello! */ 01599 } 01600 01601 single_ctx_ka_sequence_exit1: 01602 01603 USR_INFO("C: Net %d, no RX MSG in reasonable time\n\r", cl_ctx->net); 01604 return -1; 01605 } 01606 01607 static uint32_t single_ctx_adj_wait_secs_get(struct client_ctx *cl_ctx, uint32_t wait_secs) 01608 { 01609 01610 return (KA_TIMEOUT_NONE != cl_ctx->timeout)? 01611 MIN(cl_ctx->timeout - net_ops->time(), wait_secs) : wait_secs; 01612 } 01613 01614 static int32_t single_ctx_rx_prep(struct client_ctx *cl_ctx, uint32_t *secs2wait) 01615 { 01616 01617 int32_t rv; 01618 01619 if(-1 == cl_ctx->net) 01620 return MQP_ERR_NOTCONN; /* Likely for a ctx w/o a receive task */ 01621 01622 if(NEED_NET_CLOSE(cl_ctx)) 01623 rv = MQP_ERR_NOTCONN; 01624 else if(0 > single_ctx_ka_sequence(cl_ctx, *secs2wait)) 01625 rv = MQP_ERR_NETWORK; 01626 else { 01627 *secs2wait = single_ctx_adj_wait_secs_get(cl_ctx, *secs2wait); 01628 return 1; 01629 } 01630 01631 do_net_close_rx(cl_ctx, rv); 01632 return rv; 01633 } 01634 01635 static 01636 int32_t proc_ctx_data_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, 01637 uint32_t wait_secs, void **app) 01638 { 01639 01640 int32_t rv = MQP_ERR_NOTCONN; 01641 int32_t net = cl_ctx->net; 01642 01643 *app = cl_ctx->usr; 01644 01645 rv = net_recv(net, mqp, wait_secs, (void*)cl_ctx); 01646 01647 // MUTEX_LOCKIN(); 01648 if( xSemaphore != NULL ) { 01649 // See if we can obtain the semaphore. If the semaphore is not available 01650 // wait 10 ticks to see if it becomes free. 01651 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { 01652 // We were able to obtain the semaphore and can now access the 01653 // shared resource. 01654 if(rv > 0) { 01655 if(false == process_recv(cl_ctx, mqp)) { 01656 Uart_Write((uint8_t*)"MQP_ERR_CONTENT\r\n"); 01657 rv = MQP_ERR_CONTENT; 01658 } 01659 } 01660 /* RX: close the network connection to the server for this context, if 01661 (a) there is a processing / protocol error other than time-out 01662 (b) A good MQTT CONNACK has a return code - connection refused 01663 */ 01664 if(((rv < 0) && (rv != MQP_ERR_TIMEOUT)) || 01665 ((MQTT_CONNACK == mqp->msg_type) && 01666 MQP_CONNACK_RC(mqp))) { 01667 do_net_close_rx(cl_ctx, rv); 01668 } 01669 01670 // We have finished accessing the shared resource. Release the 01671 // semaphore. 01672 xSemaphoreGive( xSemaphore ); 01673 01674 } else { 01675 // We could not obtain the semaphore and can therefore not access 01676 // the shared resource safely. 01677 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); 01678 } 01679 } 01680 01681 // MUTEX_UNLOCK(); 01682 return rv; 01683 } 01684 01685 static int32_t mqp_setup_proc_ctx_data_recv(struct client_ctx *cl_ctx, 01686 struct mqtt_packet *mqp, 01687 uint32_t wait_secs, void **app) 01688 { 01689 struct mqtt_packet *rx_mqp = CLIENT(cl_ctx)->rx_mqp; 01690 int32_t rv; 01691 01692 if(NULL != mqp) { 01693 /* Input MQP must be same as MQP for partial RX, if any */ 01694 if(rx_mqp) { 01695 if(mqp != rx_mqp) 01696 return MQP_ERR_FNPARAM; 01697 } else 01698 mqp_reset(mqp); 01699 } 01700 01701 if(NULL == mqp) { 01702 mqp = rx_mqp? rx_mqp : mqp_client_recv_alloc(0); 01703 if(NULL == mqp) 01704 return MQP_ERR_PKT_AVL; 01705 } 01706 01707 rv = proc_ctx_data_recv(cl_ctx, mqp, wait_secs, app); 01708 if(rv == MQP_ERR_TIMEOUT) { 01709 CLIENT(cl_ctx)->rx_mqp = mqp; /* Save partial RX MQP */ 01710 } else { 01711 /* Control reaches here due to either an error in RX or the 01712 completion of RX. In both the cases, the MQP needs to be 01713 detached and processed. For completion of RX: 01714 callback mode: Application has used up MQP data; free it 01715 Non-callback mode: Application will now use complete MQP 01716 */ 01717 CLIENT(cl_ctx)->rx_mqp = NULL; 01718 if(mqp->free) 01719 mqp_free_locked(mqp); /* For only callback mode */ 01720 } 01721 01722 return rv; 01723 } 01724 01725 static int32_t cl_ctx_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, 01726 uint32_t wait_secs) 01727 { 01728 01729 void *app = NULL; 01730 int32_t rv = 0; 01731 01732 do { 01733 if(mqp && (NULL == CLIENT(cl_ctx)->rx_mqp)) 01734 mqp_reset(mqp); 01735 01736 rv = single_ctx_rx_prep(cl_ctx, &wait_secs); 01737 if(rv > 0) 01738 rv = mqp_setup_proc_ctx_data_recv(cl_ctx, mqp, 01739 wait_secs, 01740 &app); 01741 01742 /* 'mqp' must be valid, if rv > 0 but including further 01743 & additional check for sake of static cod eanalysis.*/ 01744 } while((rv > 0) && mqp && MQP_RX_DO_NOT_RPT_COR(mqp)); 01745 01746 return rv; 01747 } 01748 01749 int32_t mqtt_client_ctx_await_msg(void *ctx, uint8_t msg_type, struct mqtt_packet *mqp, 01750 uint32_t wait_secs) 01751 { 01752 struct client_ctx *cl_ctx = CL_CTX(ctx); 01753 int32_t rv = -1; 01754 01755 if((NULL == cl_ctx) || (NULL == mqp)) 01756 return MQP_ERR_FNPARAM; 01757 01758 do { 01759 rv = cl_ctx_recv(cl_ctx, mqp, wait_secs); 01760 01761 } while((rv > 0) && 01762 (0 != msg_type) && (msg_type != mqp->msg_type)); 01763 01764 return rv; 01765 } 01766 01767 int32_t mqtt_client_ctx_run(void *ctx, uint32_t wait_secs) 01768 { 01769 01770 int32_t rv; 01771 01772 if(NULL == ctx) 01773 return MQP_ERR_FNPARAM; 01774 01775 do { 01776 rv = cl_ctx_recv(CL_CTX(ctx), NULL, wait_secs); 01777 01778 } while(rv > 0); 01779 01780 return rv; 01781 } 01782 01783 static struct client_ctx *group_ctxs_ka_sequence(uint32_t wait_secs) { 01784 01785 struct client_ctx *cl_ctx = used_ctxs; 01786 01787 while(cl_ctx) { 01788 struct client_ctx *next = cl_ctx->next; 01789 if(single_ctx_rx_prep(cl_ctx, &wait_secs) < 0) { 01790 /* 'CTX' no more eligible for operation 01791 and has been removed from used_list */ 01792 if(false == grp_has_cbfn) 01793 return cl_ctx; 01794 } 01795 01796 cl_ctx = next; 01797 } 01798 01799 return NULL; 01800 } 01801 01802 #define IO_MON_NO_TIMEOUT (0xFFFFFFFF) 01803 01804 static uint32_t group_ctxs_adj_wait_secs_get(uint32_t wait_secs) 01805 { 01806 01807 return used_ctxs? 01808 single_ctx_adj_wait_secs_get(used_ctxs, wait_secs) : wait_secs; 01809 } 01810 01811 static int32_t recv_hvec[MAX_NWCONN + 1 + 1]; /* GROUP LISTEN PORT + VEC END */ 01812 static int32_t send_hvec = -1; 01813 static int32_t rsvd_hvec = -1; 01814 01815 /* Caller must ensure atomic enviroment for execution of this routine */ 01816 static void recv_hvec_load(int32_t *hvec_recv, uint32_t size, struct client_ctx *list) 01817 { 01818 01819 int32_t i = 0; 01820 01821 for(i = 0; (i < size) && (NULL != list); i++, list = list->next) 01822 hvec_recv[i] = list->net; 01823 01824 hvec_recv[i] = -1; 01825 01826 return; 01827 } 01828 01829 static int32_t group_ctxs_rx_prep(uint32_t wait_secs, void **app) 01830 { 01831 01832 /* CHK 'used ctx'(s) have live connection w/ server. If not, drop it */ 01833 struct client_ctx *ctx_kaTO = group_ctxs_ka_sequence(wait_secs); 01834 int32_t n_hnds; 01835 01836 if(ctx_kaTO) { 01837 *app = CLIENT(ctx_kaTO)->app; 01838 return MQP_ERR_NETWORK; 01839 } 01840 01841 conn2used_ctxs(wait_secs); /* Now, add new 'ctx'(s) to 'used ctxs' */ 01842 01843 recv_hvec[0] = loopb_net; 01844 recv_hvec_load(&recv_hvec[1], MAX_NWCONN + 1, used_ctxs); 01845 01846 wait_secs = group_ctxs_adj_wait_secs_get(wait_secs); 01847 01848 n_hnds = net_ops->io_mon(recv_hvec, &send_hvec, 01849 &rsvd_hvec, wait_secs); 01850 if(0 == n_hnds) 01851 n_hnds = MQP_ERR_TIMEOUT; 01852 else if(n_hnds < 0) 01853 n_hnds = MQP_ERR_LIBQUIT; 01854 01855 return n_hnds; 01856 } 01857 01858 static int32_t proc_loopback_recv(int32_t net) 01859 { 01860 01861 int32_t rv = 0; 01862 uint8_t buf[LOOP_DLEN]; 01863 01864 /* Thanks for waking-up thread, but ain't got much to do now */ 01865 rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0); 01866 if(rv <= 0) { 01867 memset(print_buf, 0x00, PRINT_BUF_LEN); 01868 sprintf((char*) print_buf, "MQP_ERR_LIBQUIT %i\r\n",MQP_ERR_LIBQUIT); 01869 Uart_Write((uint8_t *) print_buf); 01870 net_ops->close(net); 01871 return MQP_ERR_LIBQUIT; 01872 } 01873 01874 return rv; 01875 } 01876 01877 static struct client_ctx *net_cl_ctx_find(int32_t net) { 01878 struct client_ctx *cl_ctx = used_ctxs; 01879 01880 while(cl_ctx && (net != cl_ctx->net)) 01881 cl_ctx = cl_ctx->next; 01882 01883 return cl_ctx; 01884 } 01885 01886 static int32_t proc_net_data_recv(int32_t net, struct mqtt_packet *mqp, void **app) 01887 { 01888 01889 /* Note: used_ctxs are always managed by a single RX task */ 01890 struct client_ctx *cl_ctx = net_cl_ctx_find(net); 01891 int32_t rv = MQP_ERR_NOTCONN; 01892 01893 if(NULL == cl_ctx) { 01894 return rv; /* TX removed it interim, mustn't happen */ 01895 } 01896 return mqp_setup_proc_ctx_data_recv(cl_ctx, mqp, 1, app); 01897 } 01898 01899 static int32_t cl_recv(struct mqtt_packet *mqp, uint32_t wait_secs, void **app) 01900 { 01901 01902 int32_t rv = MQP_ERR_NETWORK; 01903 int32_t n_hnds = 0, idx = 0; 01904 01905 rv = group_ctxs_rx_prep(wait_secs, app); 01906 if(rv > 0) 01907 n_hnds = rv; 01908 01909 for(idx = 0; (idx < n_hnds) && (rv > 0); idx++) { 01910 int32_t net = recv_hvec[idx]; 01911 if(loopb_net == net) 01912 rv = proc_loopback_recv(net); /* UDP Packet */ 01913 else { 01914 rv = proc_net_data_recv(net, mqp, app); 01915 if(false == grp_has_cbfn) 01916 break; /* 'CTX': inform application */ 01917 } 01918 } 01919 01920 return rv; 01921 } 01922 01923 static int32_t grp_net_setup_create() 01924 { 01925 01926 if(0 == loopb_portid) { 01927 return MQP_ERR_NOT_DEF; 01928 } 01929 if(NULL == net_ops) { 01930 return MQP_ERR_NET_OPS; 01931 } 01932 if(-1 == loopb_net) { 01933 loopb_net = net_ops->open(DEV_NETCONN_OPT_UDP, NULL, loopb_portid, NULL); 01934 01935 if(-1 == loopb_net) { 01936 return MQP_ERR_LIBQUIT; 01937 } 01938 } 01939 01940 return 1; 01941 } 01942 01943 int32_t mqtt_client_await_msg(struct mqtt_packet *mqp, uint32_t wait_secs, void **app) 01944 { 01945 01946 int32_t rv = MQP_ERR_NOTCONN; 01947 *app = NULL; 01948 01949 if(NULL == mqp) 01950 return MQP_ERR_FNPARAM; /* Didn't get a valid MQP */ 01951 01952 if(true == grp_has_cbfn) 01953 return MQP_ERR_BADCALL; /* Err: LIB has CB config */ 01954 01955 rv = grp_net_setup_create(); 01956 if(rv <= 0) 01957 return rv; 01958 01959 do { 01960 rv = cl_recv(mqp, wait_secs, app); 01961 01962 } while((rv > 0) && MQP_RX_DO_NOT_RPT_COR(mqp)); 01963 01964 return rv; 01965 } 01966 01967 int32_t mqtt_client_run(uint32_t wait_secs) 01968 { 01969 01970 void *app = NULL; 01971 int32_t rv = -1; 01972 01973 if(false == grp_has_cbfn) { 01974 return MQP_ERR_BADCALL; /* Err: LIB has no CB config */ 01975 } 01976 rv = grp_net_setup_create(); 01977 if(rv <= 0) { 01978 return rv; 01979 } 01980 do { 01981 rv = cl_recv(NULL, wait_secs, &app); 01982 01983 } while((rv > 0) || 01984 /* 'ctx' related errors are handled by the callbacks */ 01985 ((rv != MQP_ERR_LIBQUIT) && (rv != MQP_ERR_TIMEOUT))); 01986 01987 return rv; 01988 } 01989 01990 /*------------------------------------------------------------------------------ 01991 * Buffer Pool and management, other registrations and initialization. 01992 *------------------------------------------------------------------------------ 01993 */ 01994 static struct mqtt_packet *free_list = NULL; 01995 01996 static struct mqtt_packet *mqp_alloc_atomic(void) { 01997 01998 struct mqtt_packet *mqp = NULL; 01999 02000 // MUTEX_LOCKIN(); 02001 if( xSemaphore != NULL ) { 02002 // See if we can obtain the semaphore. If the semaphore is not available 02003 // wait 10 ticks to see if it becomes free. 02004 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { 02005 // We were able to obtain the semaphore and can now access the 02006 // shared resource. 02007 mqp = free_list; 02008 if(mqp) { 02009 free_list = mqp->next; 02010 } 02011 02012 // We have finished accessing the shared resource. Release the 02013 // semaphore. 02014 xSemaphoreGive( xSemaphore ); 02015 02016 } else { 02017 // We could not obtain the semaphore and can therefore not access 02018 // the shared resource safely. 02019 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); 02020 } 02021 } 02022 02023 // MUTEX_UNLOCK(); 02024 return mqp; 02025 } 02026 02027 struct mqtt_packet *mqp_client_alloc(uint8_t msg_type, uint8_t offset) { 02028 02029 struct mqtt_packet *mqp = mqp_alloc_atomic(); 02030 if(NULL == mqp) { 02031 USR_INFO("MQP alloc failed - msg type 0x%02x\n\r", msg_type); 02032 return NULL; 02033 } 02034 02035 mqp_init(mqp, offset); 02036 mqp->msg_type = msg_type; 02037 02038 return mqp; 02039 } 02040 02041 /* Do not use this routine with-in this file. */ 02042 static void free_mqp(struct mqtt_packet *mqp) 02043 { 02044 /* Must be used in a locked state */ 02045 mqp->next = free_list; 02046 free_list = mqp; 02047 } 02048 02049 int32_t mqtt_client_buffers_register(uint32_t num_mqp, struct mqtt_packet *mqp_vec, 02050 uint32_t buf_len, uint8_t *buf_vec) 02051 { 02052 uint32_t i, j; 02053 02054 if((0 == num_mqp) || (0 == buf_len) || free_list) 02055 return -1; 02056 02057 for(i = 0, j = 0; i < num_mqp; i++, j += buf_len) { 02058 struct mqtt_packet *mqp = mqp_vec + i; 02059 02060 mqp->buffer = buf_vec + j; 02061 mqp->maxlen = buf_len; 02062 02063 mqp->free = free_mqp; 02064 mqp->next = free_list; 02065 free_list = mqp; 02066 } 02067 02068 return 0; 02069 } 02070 02071 int32_t mqtt_client_ctx_will_register(void *ctx, 02072 const struct utf8_string *will_top, 02073 const struct utf8_string *will_msg, 02074 enum mqtt_qos will_qos, bool retain) 02075 { 02076 uint8_t B = 0; 02077 02078 if((NULL == ctx) || ((NULL == will_top) && (NULL != will_msg))) 02079 return -1; /* Bad Combo */ 02080 02081 if(NULL != will_top) { 02082 RET_IF_INVALID_UTF8(will_top); 02083 02084 B = QOS_VALUE(will_qos) << 3; 02085 if(retain) 02086 B |= WILL_RETAIN_VAL; 02087 02088 if(NULL != will_msg) 02089 RET_IF_INVALID_UTF8(will_msg); 02090 } 02091 02092 CLIENT(ctx)->conn_pl_utf8s[1] = will_top; 02093 CLIENT(ctx)->conn_pl_utf8s[2] = will_msg; 02094 02095 CLIENT(ctx)->will_opts = B; 02096 02097 return 0; 02098 } 02099 02100 int32_t mqtt_client_ctx_info_register(void *ctx, 02101 const struct utf8_string *client_id, 02102 const struct utf8_string *user_name, 02103 const struct utf8_string *pass_word) 02104 { 02105 const struct utf8_string *users_pwd = NULL; 02106 02107 if(NULL == ctx) 02108 return -1; 02109 02110 if(NULL != client_id) 02111 RET_IF_INVALID_UTF8(client_id); 02112 02113 if(NULL != user_name) { 02114 RET_IF_INVALID_UTF8(user_name); 02115 02116 if(NULL != pass_word) 02117 RET_IF_INVALID_UTF8(pass_word); 02118 02119 users_pwd = pass_word; 02120 } 02121 02122 CLIENT(ctx)->conn_pl_utf8s[0] = client_id; 02123 CLIENT(ctx)->conn_pl_utf8s[3] = user_name; 02124 CLIENT(ctx)->conn_pl_utf8s[4] = users_pwd; 02125 02126 return 0; 02127 } 02128 02129 int32_t mqtt_client_net_svc_register(const struct device_net_services *net) 02130 { 02131 if(net && net->open && net->send && net->recv && 02132 net->send_dest && net->recv_from && net->close 02133 && net->io_mon && net->time) { 02134 net_ops = net; 02135 return 0; 02136 } 02137 02138 return -1; 02139 } 02140 02141 static void cl_ctx_setup(struct client_ctx *cl_ctx, /* WR Object */ 02142 const struct mqtt_client_ctx_cfg *ctx_cfg, 02143 const struct mqtt_client_ctx_cbs *ctx_cbs, 02144 void *app) 02145 { 02146 02147 struct client_desc *client = CLIENT(cl_ctx); 02148 02149 cl_ctx->flags = ctx_cfg->config_opts; 02150 02151 client->nwconn_opts = ctx_cfg->nwconn_opts; 02152 client->server_addr = ctx_cfg->server_addr; 02153 client->port_number = ctx_cfg->port_number; 02154 02155 client->app = app; 02156 02157 02158 if(NULL != ctx_cfg->nw_security) 02159 buf_wr_nbytes((uint8_t*)&client->nw_security, 02160 (uint8_t*)ctx_cfg->nw_security, 02161 sizeof(struct secure_conn)); 02162 02163 if(NULL != ctx_cbs) { 02164 // set callback flag 02165 struct mqtt_client_ctx_cbs *cbs_ctx = CTX_CBS_PTR(client); 02166 cbs_ctx->publish_rx = ctx_cbs->publish_rx; 02167 cbs_ctx->ack_notify = ctx_cbs->ack_notify; 02168 cbs_ctx->disconn_cb = ctx_cbs->disconn_cb; 02169 } 02170 02171 return; 02172 } 02173 02174 int32_t mqtt_client_ctx_create(const struct mqtt_client_ctx_cfg *ctx_cfg, 02175 const struct mqtt_client_ctx_cbs *ctx_cbs, 02176 void *app, void **ctx) 02177 { 02178 02179 struct client_ctx *cl_ctx = NULL; 02180 02181 if((NULL == ctx_cfg) || 02182 (NULL == ctx_cfg->server_addr) || 02183 (0 == ctx_cfg->port_number)) { 02184 return -1; 02185 } 02186 if(ctx_cfg->config_opts & MQTT_CFG_MK_GROUP_CTX) { 02187 if(grp_has_cbfn ^ (!!ctx_cbs)) { 02188 return -1; 02189 } 02190 } 02191 02192 // MUTEX_LOCKIN(); 02193 if( xSemaphore != NULL ) { 02194 // See if we can obtain the semaphore. If the semaphore is not available 02195 // wait 10 ticks to see if it becomes free. 02196 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { 02197 // We were able to obtain the semaphore and can now access the 02198 // shared resource. 02199 if(free_ctxs) { 02200 cl_ctx = free_ctxs; 02201 free_ctxs = cl_ctx->next; 02202 cl_ctx->next = NULL; 02203 } 02204 02205 // We have finished accessing the shared resource. Release the 02206 // semaphore. 02207 xSemaphoreGive( xSemaphore ); 02208 02209 } else { 02210 // We could not obtain the semaphore and can therefore not access 02211 // the shared resource safely. 02212 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); 02213 } 02214 } 02215 02216 // MUTEX_UNLOCK(); 02217 02218 if(cl_ctx) { 02219 cl_ctx_setup(cl_ctx, ctx_cfg, ctx_cbs, app); 02220 *ctx = (void*) cl_ctx; 02221 return 0; 02222 } 02223 02224 return -1; 02225 } 02226 02227 int32_t mqtt_client_ctx_delete(void *ctx) 02228 { 02229 02230 struct client_ctx *cl_ctx = (struct client_ctx*) ctx; 02231 int32_t rv = -1; /* Not sure about deletion as yet */ 02232 02233 // MUTEX_LOCKIN(); 02234 if( xSemaphore != NULL ) { 02235 // See if we can obtain the semaphore. If the semaphore is not available 02236 // wait 10 ticks to see if it becomes free. 02237 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { 02238 // We were able to obtain the semaphore and can now access the 02239 // shared resource. 02240 if((NULL == cl_ctx) || 02241 (-1 != cl_ctx->net) || 02242 (awaits_pkts(cl_ctx))) { 02243 goto mqtt_client_ctx_delete_exit1; 02244 } 02245 rv = 0; /* OK to delete ctx */ 02246 client_reset(CLIENT(cl_ctx)); 02247 cl_ctx_freeup(cl_ctx); 02248 02249 // We have finished accessing the shared resource. Release the 02250 // semaphore. 02251 //xSemaphoreGive( xSemaphore ); 02252 } else { 02253 // We could not obtain the semaphore and can therefore not access 02254 // the shared resource safely. 02255 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); 02256 } 02257 } 02258 02259 mqtt_client_ctx_delete_exit1: 02260 // MUTEX_UNLOCK(); 02261 xSemaphoreGive(xSemaphore); 02262 02263 return rv; 02264 } 02265 02266 int32_t mqtt_client_lib_init(const struct mqtt_client_lib_cfg *lib_cfg) 02267 { 02268 if((NULL == lib_cfg) || (NULL == lib_cfg->debug_printf)) 02269 return -1; 02270 02271 debug_printf = lib_cfg->debug_printf; /* Facilitate debug */ 02272 02273 if(INIT_DONE_STATE == cl_lib_state) { 02274 Uart_Write((uint8_t*)"C: Error trying to re-initialize \n\r"); 02275 USR_INFO("C: Error trying to re-initialize \n\r"); 02276 return -1; 02277 } 02278 02279 USR_INFO("Version: Client LIB %s, Common LIB %s.\n\r", 02280 MQTT_CLIENT_VERSTR, MQTT_COMMON_VERSTR); 02281 02282 client_desc_init(); 02283 02284 cl_lib_state = INIT_DONE_STATE; 02285 02286 loopb_portid = lib_cfg->loopback_port; 02287 grp_has_cbfn = lib_cfg->grp_uses_cbfn; 02288 02289 mutex = lib_cfg->mutex; 02290 mutex_lockin = lib_cfg->mutex_lockin; 02291 mutex_unlock = lib_cfg->mutex_unlock; 02292 02293 aux_dbg_enbl = lib_cfg->aux_debug_en; 02294 02295 return 0; 02296 } 02297 02298 int32_t mqtt_client_lib_exit() 02299 { 02300 struct client_ctx *cl_ctx = free_ctxs; 02301 int32_t count = 0; 02302 02303 while(cl_ctx) { 02304 cl_ctx = cl_ctx->next; 02305 count++; 02306 } 02307 02308 if(MAX_NWCONN == count) { 02309 cl_lib_state = WAIT_INIT_STATE; 02310 free_ctxs = NULL; 02311 return 0; 02312 } 02313 02314 return -1; 02315 } 02316 02317 }//namespace mbed_mqtt
Generated on Fri Jul 15 2022 07:38:03 by 1.7.2