Part of TI's mqtt
Dependents: mqtt_V1 cc3100_Test_mqtt_CM3
Revision 0:087b5655778d, committed 2015-06-06
- Comitter:
- dflet
- Date:
- Sat Jun 06 13:28:41 2015 +0000
- Commit message:
- Part of mtqq_V1
Changed in this revision
mqtt_client.cpp | Show annotated file Show diff for this revision Revisions of this file |
mqtt_client.h | Show annotated file Show diff for this revision Revisions of this file |
diff -r 000000000000 -r 087b5655778d mqtt_client.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mqtt_client.cpp Sat Jun 06 13:28:41 2015 +0000 @@ -0,0 +1,2317 @@ +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +/* + mqtt_client.c + + The module provides implementation to the public interface of the MQTT + Client Library. +*/ +#include "FreeRTOS.h" +#include "mqtt_client.h" +#include "semphr.h" + +#include "cli_uart.h" + +#define PRINT_BUF_LEN 128 +extern int8_t print_buf[PRINT_BUF_LEN]; + +namespace mbed_mqtt +{ + +//void createMutex(void); + +static void *mutex = NULL; +static void (*mutex_lockin)(void*) = NULL; +static void (*mutex_unlock)(void*) = NULL; + +#define MUTEX_LOCKIN() if(mutex_lockin) mutex_lockin(mutex); +#define MUTEX_UNLOCK() if(mutex_unlock) mutex_unlock(mutex); + +static bool aux_dbg_enbl = true; +int32_t (*debug_printf)(const char *fmt, ...) = NULL; + +#define USR_INFO debug_printf +#define DBG_INFO(I, ...) if(aux_dbg_enbl) debug_printf(I, ##__VA_ARGS__) + +static const struct device_net_services *net_ops = NULL; + +static uint16_t msg_id = 0xFFFF; +static inline uint16_t assign_new_msg_id() +{ + return msg_id += 2; +} + +SemaphoreHandle_t xSemaphore = NULL; +void createMutex() +{ + + xSemaphore = xSemaphoreCreateMutex(); +} + +/*----------------------------------------------------------------------------- + * Data structure for managing the client and its nuances + *---------------------------------------------------------------------------*/ + +/* Construct to manage TX for network that requires LIB to send a partial and + incremental data to support restrictive TCP segments. Specifically, for the + deployments, in which the network layer supports small segments, there can + be only one in-flight message. +*/ +struct tx_part_pkt { + + /* Refers to MQP, if TX for it is active, otherwise, set it to NULL */ + struct mqtt_packet *tx_mqp; + + union { +#define VH_MSG_LEN 4 + uint8_t vh_msg[VH_MSG_LEN]; /* VH msg for MQP = NULL */ + const uint8_t *buffer; /* Refers to data in MQP */ + }; + + uint32_t length; /* Length of entire data */ + uint32_t offset; /* Next data for sending */ +}; + +static bool tx_part_setup(struct tx_part_pkt *tx_part, const uint8_t *buffer, + uint32_t length, struct mqtt_packet *tx_mqp) +{ + + if(tx_mqp) { + tx_part->buffer = buffer; + } else { + if(VH_MSG_LEN < length) + return false; + + buf_wr_nbytes(tx_part->vh_msg, buffer, length); + } + + tx_part->length = length; + tx_part->tx_mqp = tx_mqp; + + return true; +} + +static void tx_part_reset(struct tx_part_pkt *tx_part) +{ + + struct mqtt_packet *tx_mqp = tx_part->tx_mqp; + if(tx_mqp) { + mqp_free(tx_mqp); + } + tx_part->vh_msg[0] = 0x00; + tx_part->tx_mqp = NULL; + tx_part->length = 0; + tx_part->offset = 0; + + return; +} + +static const uint8_t *tx_part_buf_p(struct tx_part_pkt *tx_part) +{ + struct mqtt_packet *tx_mqp = tx_part->tx_mqp; + uint32_t offset = tx_part->offset; + + return tx_mqp? + tx_part->buffer + offset : + tx_part->vh_msg + offset; +} + +static void tx_part_addup(struct tx_part_pkt *tx_part, uint32_t offset) +{ + tx_part->offset += offset; +} + +#define TX_PART_BUFFER(tx_part) tx_part_buf_p(tx_part) +#define TX_PART_BUF_SZ(tx_part) (tx_part->length - tx_part->offset) +#define TX_PART_IN_USE(tx_part) TX_PART_BUF_SZ(tx_part) + +enum module_state { + + WAIT_INIT_STATE, + INIT_DONE_STATE = 0x01, +}; + +static enum module_state cl_lib_state = WAIT_INIT_STATE; + +static uint16_t loopb_portid = 0; +static bool grp_has_cbfn = false; + +#define USE_PROTO_V31_FLAG MQTT_CFG_PROTOCOL_V31 +#define APP_RECV_TASK_FLAG MQTT_CFG_APP_HAS_RTSK +#define GROUP_CONTEXT_FLAG MQTT_CFG_MK_GROUP_CTX + +#define CLEAN_SESSION_FLAG 0x00010000 +#define CONNACK_AWAIT_FLAG 0x00020000 +#define NOW_CONNECTED_FLAG 0x00040000 +#define KA_PINGER_RSP_FLAG 0x00080000 +#define USER_PING_RSP_FLAG 0x00100000 +#define NETWORK_CLOSE_FLAG 0x00200000 +#define DO_CONNACK_TO_FLAG 0x00400000 + +static struct client_ctx *free_ctxs = NULL; /* CTX construct available */ +static struct client_ctx *used_ctxs = NULL; /* Relevant only for group */ +static struct client_ctx *conn_ctxs = NULL; /* Relevant only for group */ + +static void cl_ctx_freeup(struct client_ctx *cl_ctx) +{ + cl_ctx->next = free_ctxs; + free_ctxs = cl_ctx; + + return; +} + +#define IS_PROTO_VER31(cl_ctx) (cl_ctx->flags & USE_PROTO_V31_FLAG) +#define AWAITS_CONNACK(cl_ctx) (cl_ctx->flags & CONNACK_AWAIT_FLAG) +#define HAS_CONNECTION(cl_ctx) (cl_ctx->flags & NOW_CONNECTED_FLAG) +#define AWAITS_KA_PING(cl_ctx) (cl_ctx->flags & KA_PINGER_RSP_FLAG) +#define AWAITS_PINGRSP(cl_ctx) (cl_ctx->flags & USER_PING_RSP_FLAG) +#define IS_CLN_SESSION(cl_ctx) (cl_ctx->flags & CLEAN_SESSION_FLAG) +#define RECV_TASK_AVBL(cl_ctx) (cl_ctx->flags & APP_RECV_TASK_FLAG) +#define A_GROUP_MEMBER(cl_ctx) (cl_ctx->flags & GROUP_CONTEXT_FLAG) +#define NEED_NET_CLOSE(cl_ctx) (cl_ctx->flags & NETWORK_CLOSE_FLAG) +#define CFG_CONNACK_TO(cl_ctx) (cl_ctx->flags & DO_CONNACK_TO_FLAG) + +#ifndef CFG_CL_MQTT_CTXS +#define MAX_NWCONN 4 +#else +#define MAX_NWCONN CFG_CL_MQTT_CTXS +#endif + +static struct client_desc { + + /* ALERT: "context" must be first elem in this structure, do not move */ + struct client_ctx context; + +#define CLIENT(cl_ctx) ((struct client_desc*) cl_ctx) +#define CL_CTX(client) ((struct client_ctx*) client) + + /* Order/Sequence: Client ID, Will Topic, Will Msg, Username, Password */ + const struct utf8_string *conn_pl_utf8s[5]; /* Ref: CONNECT Payload */ + uint8_t will_opts; + + /* Wait-List for Level 1 ACK(s), which are PUBACK, PUBREC, UN/SUBACK */ + struct mqtt_ack_wlist qos_ack1_wl; + + /* Circular queue to track QOS2 PUBLISH packets from the server. They + are tracked for the duration of PUBLISH-RX to PUBREL-RX. + */ + struct pub_qos2_cq qos2_rx_cq; + + /* Circular queue to track QOS2 PUBLISH packets from the client. They + are tracked for the duration of PUBREC-RX to PUBOMP-RX. + */ + struct pub_qos2_cq qos2_tx_cq; + + struct mqtt_client_ctx_cbs app_ctx_cbs; /* Callback funcs from app */ +#define CTX_CBS_PTR(cl_ctx) &(CLIENT(cl_ctx)->app_ctx_cbs) + + struct tx_part_pkt tx_part;/* Reference to partial TX PKT */ + struct mqtt_packet *rx_mqp; /* Reference to partial RX PKT */ + void *app; + + uint32_t nwconn_opts; + char *server_addr; + uint16_t port_number; + struct secure_conn nw_security; + +} clients[MAX_NWCONN]; + +static void client_reset(struct client_desc *client) +{ + + struct mqtt_client_ctx_cbs *ctx_cbs = &client->app_ctx_cbs; + int32_t i = 0; + + cl_ctx_reset(CL_CTX(client)); + + for(i = 0; i < 5; i++) { + client->conn_pl_utf8s[i] = NULL; + } + client->will_opts = 0; + + mqp_ack_wlist_purge(&client->qos_ack1_wl); + + qos2_pub_cq_reset(&client->qos2_rx_cq); + qos2_pub_cq_reset(&client->qos2_tx_cq); + + ctx_cbs->publish_rx = NULL; + ctx_cbs->ack_notify = NULL; + ctx_cbs->disconn_cb = NULL; + + tx_part_reset(&client->tx_part); + client->rx_mqp = NULL; + client->app = NULL; + + client->nwconn_opts = 0; + client->server_addr = NULL; + client->port_number = 0; + + secure_conn_struct_init(&client->nw_security); + + return; +} + +static void client_desc_init(void) +{ + int32_t i = 0; + for(i = 0; i < MAX_NWCONN; i++) { + struct client_desc *client = clients + i; + struct client_ctx *cl_ctx = CL_CTX(client); + + /* Initialize certain fields to defaults */ + client->qos_ack1_wl.head = NULL; + client->qos_ack1_wl.tail = NULL; + client->tx_part.tx_mqp = NULL; + + client_reset(client); /* Reset remaining */ + + cl_ctx->next = free_ctxs; + free_ctxs = cl_ctx; + } + + return; +} + +static void mqp_free_locked(struct mqtt_packet *mqp) +{ + +// MUTEX_LOCKIN(); + +// MUTEX_UNLOCK(); + + if( xSemaphore != NULL ) { + // See if we can obtain the semaphore. If the semaphore is not available + // wait 10 ticks to see if it becomes free. + if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { + // We were able to obtain the semaphore and can now access the + // shared resource. + mqp_free(mqp); + + // We have finished accessing the shared resource. Release the + // semaphore. + xSemaphoreGive( xSemaphore ); + + } else { + // We could not obtain the semaphore and can therefore not access + // the shared resource safely. + Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); + } + } +} + +/*---------------------------------------------------------------------------- + * Fix-up to prevent certain good and non-callback MQP being reported to app + *---------------------------------------------------------------------------- + */ +/* cor --> clear on read. */ +static bool mqp_do_not_report_cor(struct mqtt_packet *mqp) +{ + bool rv = (1 == mqp->private_)? true : false; + mqp->private_ = 0; + return rv; +} + +#define MQP_RX_DO_NOT_RPT_COR(mqp) mqp_do_not_report_cor(mqp) + +/* Only if MQP has good RX data i.e. this macro shouldn't be used for bad RX */ +#define MQP_RX_DO_NOT_RPT_SET(mqp) (mqp->private_ = 1) + +#define MQP_TX_DONE_LEN_ADD(mqp, add) (mqp->private_ += add) +#define MQP_TX_DONE_LEN_GET(mqp) (mqp->private_) + +/*---------------------------Fix-Up Ends ------------------------------------*/ + +static int32_t loopb_net = -1; +static const uint8_t LOOP_DATA[] = {0x00, 0x01}; +#define LOOP_DLEN sizeof(LOOP_DATA) + +static int32_t loopb_trigger(void) +{ + + + uint8_t ip_addr[] = {127,0,0,1}; + + return (-1 != loopb_net)? + net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN, + loopb_portid, ip_addr, 4) : MQP_ERR_LIBQUIT; +} + +static void session_311fix(struct client_ctx *cl_ctx) +{ + struct mqtt_ack_wlist *wl = &CLIENT(cl_ctx)->qos_ack1_wl; + struct mqtt_packet *elem = wl->head; + + while(elem) { + struct mqtt_packet *next = elem->next; + if(MQTT_PUBLISH != elem->msg_type) + mqp_ack_wlist_remove(wl, elem->msg_id); + + elem = next; + } + + return; +} + +static void session_delete(struct client_ctx *cl_ctx) +{ + struct client_desc *client = CLIENT(cl_ctx); + + DBG_INFO("C: Cleaning session for net %d\n\r", cl_ctx->net); + + qos2_pub_cq_reset(&client->qos2_rx_cq); + qos2_pub_cq_reset(&client->qos2_tx_cq); + + mqp_ack_wlist_purge(&client->qos_ack1_wl); + + return; +} + +/*------------------------------------------------------------------------------ + * Routine to manage error conditions in client - close the network connection + *----------------------------------------------------------------------------*/ +static void do_net_close(struct client_ctx *cl_ctx) +{ + int32_t net = cl_ctx->net; + + if(-1 == net) + return; /* network closed already, must not happen */ + + if(IS_CLN_SESSION(cl_ctx)) { + session_delete(cl_ctx); + } else if(!IS_PROTO_VER31(cl_ctx)) { + /* Version 3.1.1 doesn't need SUB and UNSUB re-send */ + session_311fix(cl_ctx); + } + + tx_part_reset(&CLIENT(cl_ctx)->tx_part); /* Part TX, if any */ + + cl_ctx->flags &= ~(CONNACK_AWAIT_FLAG | NOW_CONNECTED_FLAG | + KA_PINGER_RSP_FLAG | USER_PING_RSP_FLAG | + NETWORK_CLOSE_FLAG | DO_CONNACK_TO_FLAG); + + cl_ctx->net = -1; + net_ops->close(net); + + USR_INFO("C: Net %d now closed\n\r", net); + + return; +} + +static void do_net_close_rx(struct client_ctx *cl_ctx, int32_t cause) +{ + struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); + + DBG_INFO("C: RX closing Net %d [%d]\n\r", cl_ctx->net, cause); + + do_net_close(cl_ctx); + if(ctx_cbs->disconn_cb) + ctx_cbs->disconn_cb(CLIENT(cl_ctx)->app, cause); + + if(A_GROUP_MEMBER(cl_ctx)) + cl_ctx_remove(&used_ctxs, cl_ctx); + + return; +} + +static void do_net_close_tx(struct client_ctx *cl_ctx, char *cause) +{ + DBG_INFO("C: TX closing Net %d [%s]\n\r", cl_ctx->net, cause); + + if(RECV_TASK_AVBL(cl_ctx)) { + cl_ctx->flags |= NETWORK_CLOSE_FLAG; + if(A_GROUP_MEMBER(cl_ctx)) + loopb_trigger(); + } else { + struct mqtt_packet *rx_mqp = CLIENT(cl_ctx)->rx_mqp; + + do_net_close(cl_ctx); /* No RX Task, close now */ + + /* Release partial MQP, if any, for a CTX w/ CB */ + if((NULL != rx_mqp) && (NULL != rx_mqp->free)) + mqp_free(rx_mqp); + + CLIENT(cl_ctx)->rx_mqp = NULL; + } + + return; +} + +/*---------------------------------------------------------------------------- + * QoS2 PUB RX Message handling mechanism and associated house-keeping + *--------------------------------------------------------------------------*/ +static bool qos2_pub_rx_logup(struct client_ctx *cl_ctx, uint16_t msg_id) +{ + return qos2_pub_cq_logup(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id); +} + +static bool ack2_msg_id_logup(struct client_ctx *cl_ctx, uint16_t msg_id) +{ + return qos2_pub_cq_logup(&CLIENT(cl_ctx)->qos2_tx_cq, msg_id); +} + +static bool qos2_pub_rx_unlog(struct client_ctx *cl_ctx, uint16_t msg_id) +{ + return qos2_pub_cq_unlog(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id); +} + +static bool ack2_msg_id_unlog(struct client_ctx *cl_ctx, uint16_t msg_id) +{ + struct client_desc *client = CLIENT(cl_ctx); + if(qos2_pub_cq_unlog(&client->qos2_tx_cq, msg_id)) { + struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); + if(ctx_cbs->ack_notify) + ctx_cbs->ack_notify(client->app, MQTT_PUBCOMP, + msg_id, NULL, 0); + return true; + } + + return false; +} + +static bool qos2_pub_rx_is_done(struct client_ctx *cl_ctx, uint16_t msg_id) +{ + return qos2_pub_cq_check(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id); +} + +static bool awaits_pkts(struct client_ctx *cl_ctx) +{ + + struct client_desc *client = CLIENT(cl_ctx); + + return client->qos_ack1_wl.head || + qos2_pub_cq_count(&client->qos2_rx_cq) || + qos2_pub_cq_count(&client->qos2_tx_cq)? + true : false; +} + +static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp) +{ + mqp_free(mqp); + return MQP_ERR_PKT_LEN; +} + +static int32_t is_valid_utf8_string(const struct utf8_string *utf8) +{ + /* Valid topic should be > 0 byte and must hosted in usable buffer */ + return ((utf8->length > 0) && (NULL != utf8->buffer))? true : false; +} + +#define RET_IF_INVALID_UTF8(utf8) \ + if(false == is_valid_utf8_string(utf8)) \ + return -1; + +static bool is_connected(struct client_ctx *cl_ctx) +{ + + return (HAS_CONNECTION(cl_ctx) && !NEED_NET_CLOSE(cl_ctx))? + true : false; +} + +uint16_t mqtt_client_new_msg_id() +{ + return assign_new_msg_id(); +} + +bool mqtt_client_is_connected(void *ctx) +{ + return is_connected(CL_CTX(ctx)); +} + +/*---------------------------------------------------------------------------- + * MQTT TX Routines + *--------------------------------------------------------------------------*/ +static void used_ctxs_TO_sort(struct client_ctx *cl_ctx_TO) +{ + cl_ctx_remove(&used_ctxs, cl_ctx_TO); + cl_ctx_timeout_insert(&used_ctxs, cl_ctx_TO); +} + +static inline int32_t net_send(int32_t net, const uint8_t *buf, uint32_t len, void *ctx) +{ + + int32_t rv = net_ops->send(net, buf, len, ctx); + if(rv <= 0) { + memset(print_buf, 0x00, PRINT_BUF_LEN); + sprintf((char*) print_buf, "MQP_ERR_NETWORK %i\r\n", MQP_ERR_NETWORK); + Uart_Write((uint8_t *) print_buf); + rv = MQP_ERR_NETWORK; + } + return rv; +} + +#if 0 +static int32_t cl_ctx_send(struct client_ctx *cl_ctx, const uint8_t *buf, uint32_t len, + bool is_conn_msg) +{ + + int32_t rv = MQP_ERR_NOTCONN; + + /* For CONNECT msg, a client context mustn't be already connected. + For others msgs, a client context must be conected to server */ + if(false == (is_conn_msg ^ is_connected(cl_ctx))) + goto cl_ctx_send_exit1; + + rv = net_send(cl_ctx->net, buf, len); + if(rv > 0) { /* A good send, do follow-up */ + cl_ctx_timeout_update(cl_ctx, net_ops->time()); + if(A_GROUP_MEMBER(cl_ctx) && HAS_CONNECTION(cl_ctx)) { + /* With update to timeout, + a sorting is impending */ + used_ctxs_TO_sort(cl_ctx); + } + + goto cl_ctx_send_exit1; /* A Good Send */ + } + + do_net_close_tx(cl_ctx, "snd-err"); + +cl_ctx_send_exit1: + USR_INFO("C: FH-B1 0x%02x, len %u bytes, to net %d: %s\n\r", + *buf, len, cl_ctx->net, (rv > 0)? "Sent" : "Fail"); + return rv; +} +#endif + +static int32_t cl_ctx_part_send(struct client_ctx *cl_ctx) +{ + + struct tx_part_pkt *tx_part = &CLIENT(cl_ctx)->tx_part; + const uint8_t *buf = TX_PART_BUFFER(tx_part); + uint32_t len = TX_PART_BUF_SZ(tx_part); + uint32_t ofs = tx_part->offset; + uint8_t B1 = *buf; + + int32_t rv = net_send(cl_ctx->net, buf, len, (void*)cl_ctx); + if(rv > 0) { /* Follow-up for a good send */ + if(HAS_CONNECTION(cl_ctx)) { + /* Update TX timeout, if 'CTX' is connected */ + cl_ctx_timeout_update(cl_ctx, net_ops->time()); + + /* After update, 'CTX'(s) sorting is a must */ + if(A_GROUP_MEMBER(cl_ctx)) + used_ctxs_TO_sort(cl_ctx); + } + + if(rv != len) + /* Partial data was sent */ + tx_part_addup(tx_part, rv); + else + tx_part_reset(tx_part); + + goto cl_ctx_send_exit1; /* A Good Send */ + } + + do_net_close_tx(cl_ctx, "snd-err"); + +cl_ctx_send_exit1: + USR_INFO("C: %s 0x%02x to net %d, %s (%d Bytes) [@ %u]\n\r", + ofs? "PartN" : "FH-B1", B1, cl_ctx->net, + (rv > 0)? "Sent" : "Fail", rv, net_ops->time()); + + return rv; +} + +static int32_t cl_ctx_seg1_send(struct client_ctx *cl_ctx, const uint8_t *buf, uint32_t len, + bool is_conn_msg, struct mqtt_packet *tx_mqp) +{ + + struct tx_part_pkt *tx_part = &CLIENT(cl_ctx)->tx_part; + + /* For CONNECT msg, a client context mustn't be already connected. + For others msgs, a client context must be conected to server */ + if(false == (is_conn_msg ^ is_connected(cl_ctx))) + return MQP_ERR_NOTCONN; + + if(TX_PART_IN_USE(tx_part)) + return MQP_ERR_BADCALL; + + tx_part_setup(tx_part, buf, len, tx_mqp); + + return cl_ctx_part_send(cl_ctx); +} + +int32_t mqtt_client_send_progress(void *ctx) +{ + struct client_ctx *cl_ctx = CL_CTX(ctx); + struct tx_part_pkt *tx_part = NULL; + int32_t rv = MQP_ERR_BADCALL; + + if(NULL == ctx) + return MQP_ERR_FNPARAM; + + tx_part = &CLIENT(cl_ctx)->tx_part; + + if(!TX_PART_IN_USE(tx_part)) + return rv; + + rv = cl_ctx_part_send(cl_ctx); + if(rv > 0) + rv = TX_PART_BUF_SZ(tx_part); + + return rv; +} + +static int32_t wr_connect_pl(struct client_ctx *cl_ctx, uint8_t *buf, + uint32_t fsz, uint8_t *conn_opts) +{ + + /* UTF8 usage selection order: Client, W-Topic, W-Msg, User-Name, Pwd */ + uint8_t utf8_sel[] = {0x00, WILL_CONFIG_VAL, 0x00, + USER_NAME_OPVAL, PASS_WORD_OPVAL + }; + struct client_desc *client = CLIENT(cl_ctx); + uint8_t *ref = buf; + + int32_t i = 0; + + for(i = 0; i < 5; i++) { /* TBD 5 --> macro */ + uint16_t len = 2; + const struct utf8_string *utf8 = client->conn_pl_utf8s[i]; + if(NULL == utf8) { + /* UTF8 absent: Client ID (i = 0) and Will MSG (i = 2) + set zero byte length in the CONNECT message */ + if(0 != i) + if(!((2 == i) && (*conn_opts & WILL_CONFIG_VAL))) + continue; /* Others, just pass */ + } else { + len += utf8->length; + } + + if(fsz < (buf - ref + len)) { /* TBD end = ref + fsz */ + Uart_Write((uint8_t*)"Payload: no space left fail\r\n"); + return MQP_ERR_PKT_LEN; /* Payload: no space left */ + } + if(2 == len) { + buf += buf_wr_nbo_2B(buf, 0); /* WR 0 byte length */ + } else { + buf += mqp_buf_wr_utf8(buf, utf8); + } + *conn_opts |= utf8_sel[i]; /* Enable message flags */ + } + + return buf - ref; +} + + +/* Define protocol information for the supported versions */ +static uint8_t mqtt310[] = {0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', 0x03}; +static uint8_t mqtt311[] = {0x00, 0x04, 'M', 'Q', 'T', 'T', 0x04}; + +static inline uint16_t get_connect_vh_len(struct client_ctx *cl_ctx) +{ + + return (IS_PROTO_VER31(cl_ctx)? sizeof(mqtt310) : sizeof(mqtt311)) + + 3; +} + +static int32_t wr_connect_vh(struct client_ctx *cl_ctx, uint8_t *buf, + uint16_t ka_secs, uint8_t conn_opts) +{ + + uint8_t *ref = buf; + + if(IS_PROTO_VER31(cl_ctx)) + buf += buf_wr_nbytes(buf, mqtt310, sizeof(mqtt310)); + else + buf += buf_wr_nbytes(buf, mqtt311, sizeof(mqtt311)); + + *buf++ = conn_opts; + buf += buf_wr_nbo_2B(buf, ka_secs); + + return buf - ref; +} + +static int32_t net_connect(struct client_ctx *cl_ctx) +{ + + struct client_desc *client = CLIENT(cl_ctx); + + if(NEED_NET_CLOSE(cl_ctx)) { + Uart_Write((uint8_t*)"Return MQP_ERR_NOT_DEF\r\n"); + return MQP_ERR_NOT_DEF; + } + if(NULL == net_ops) { + Uart_Write((uint8_t*)"Return MQP_ERR_NET_OPS\r\n"); + return MQP_ERR_NET_OPS; + } + cl_ctx->net = net_ops->open(client->nwconn_opts | DEV_NETCONN_OPT_TCP, + client->server_addr, + client->port_number, + &client->nw_security); + + return (-1 == cl_ctx->net)? MQP_ERR_NETWORK : 0; +} + +static +int32_t cl_ctx_conn_state_try_locked(struct client_ctx *cl_ctx, const uint8_t *buf, + uint32_t len, uint16_t ka_secs, bool clean_session, + struct mqtt_packet *tx_mqp) +{ + + int32_t rv = 0; + +// MUTEX_LOCKIN(); + if( xSemaphore != NULL ) { + // See if we can obtain the semaphore. If the semaphore is not available + // wait 10 ticks to see if it becomes free. + if( xSemaphoreTake( xSemaphore, ( TickType_t ) 100 ) == pdTRUE ) { + // We were able to obtain the semaphore and can now access the + // shared resource. + + rv = net_connect(cl_ctx); + if(rv < 0) { + Uart_Write((uint8_t*)"net_connect failed\r\n"); + goto cl_ctx_conn_state_try_locked_exit1; + } + /* Ensure LIB is initialized & CTX isn't awaiting CONNACK */ + rv = MQP_ERR_BADCALL; + if(false == ((INIT_DONE_STATE != cl_lib_state) || (AWAITS_CONNACK(cl_ctx)))) { + rv = cl_ctx_seg1_send(cl_ctx, buf, len, true, tx_mqp); + } + if(rv < 0) { + Uart_Write((uint8_t*)"cl_ctx_seg1_send failed\r\n"); + goto cl_ctx_conn_state_try_locked_exit1; /* Fail */ + } + /* Successfully sent CONNECT msg - let's do housekeeping */ + cl_ctx->timeout = net_ops->time();/* Fixup: CONN TX Time */ + cl_ctx->flags |= DO_CONNACK_TO_FLAG | CONNACK_AWAIT_FLAG; + cl_ctx->flags |= clean_session? CLEAN_SESSION_FLAG : 0; + + cl_ctx->ka_secs = ka_secs; + + if(A_GROUP_MEMBER(cl_ctx)) { + cl_ctx->next = conn_ctxs; + conn_ctxs = cl_ctx; + + /* First entry in 'conn_ctxs': schedule a move to + 'used_conn' (for house-keeping and tracking) */ + if(NULL == cl_ctx->next) { + rv = loopb_trigger(); + } + } + + + // We have finished accessing the shared resource. Release the + // semaphore. + //xSemaphoreGive( xSemaphore ); + } else { + // We could not obtain the semaphore and can therefore not access + // the shared resource safely. + Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); + } + } + +cl_ctx_conn_state_try_locked_exit1: +// MUTEX_UNLOCK(); + xSemaphoreGive(xSemaphore); + + return rv; +} + +static +int32_t connect_msg_send(struct client_ctx *cl_ctx, bool clean_session, uint16_t ka_secs) +{ + + struct mqtt_packet *mqp = mqp_client_send_alloc(MQTT_CONNECT); + uint8_t *buf, *ref, conn_opts = clean_session? CLEAN_START_VAL : 0; + int32_t rv = MQP_ERR_PKT_LEN; + uint32_t fsz; /* Free buffer size in PKT */ + uint16_t vhl = get_connect_vh_len(cl_ctx); + + if(NULL == mqp) { + Uart_Write((uint8_t*)"MQP_ERR_PKT_AVL\r\n"); + return MQP_ERR_PKT_AVL; + } + fsz = MQP_FREEBUF_LEN(mqp); + if(fsz < vhl) { + Uart_Write((uint8_t*)"No space for VAR HDR\r\n"); + goto connect_msg_send_exit1; /* No space for VAR HDR */ + } + mqp->vh_len = vhl; /* Reserve buffer for variable header */ + buf = ref = MQP_PAYLOAD_BUF(mqp);/* Get started to incorporate payload */ + + rv = wr_connect_pl(cl_ctx, buf, fsz - vhl, &conn_opts);/* Payload data */ + if(rv < 0) { + memset(print_buf, 0x00, PRINT_BUF_LEN); + sprintf((char*) print_buf, "Payload WR failed %i\r\n",rv); + Uart_Write((uint8_t *) print_buf); + goto connect_msg_send_exit1; /* Payload WR failed */ + } + buf += rv; + mqp->pl_len = buf - ref; + + wr_connect_vh(cl_ctx, ref - vhl, ka_secs, + CLIENT(cl_ctx)->will_opts | conn_opts); /* Var Header */ + + mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, MQTT_QOS0, false));/* Fix Header */ + ref = MQP_FHEADER_BUF(mqp); + + /* Following routine frees up MQP - whether error or not */ + return cl_ctx_conn_state_try_locked(cl_ctx, ref, buf - ref, + ka_secs, clean_session, + mqp); +connect_msg_send_exit1: + + if(mqp) { + mqp_free_locked(mqp); + } + return rv; +} + +int32_t mqtt_connect_msg_send(void *ctx, bool clean_session, uint16_t ka_secs) +{ + + return ctx? + connect_msg_send(CL_CTX(ctx), clean_session, ka_secs) : -1; +} + +/* + To be used for the following messages: PUBLISH, SUBSCRIBE, UNSUBSCRIBE + Dispatches msg to broker over socket. Frees-up MQP, in case, MSG has QoS0 or + if client-lib allocated MQP encounters an error in dispatch. + Returns, on success, number of bytes transfered, otherwise -1 +*/ +static int32_t _msg_dispatch(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, + enum mqtt_qos qos, bool retain) +{ + + bool not_qos0 = (MQTT_QOS0 != qos)? true : false; + uint16_t msg_id = mqp->msg_id; + int32_t rv = MQP_ERR_NETWORK; + + mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain)); + +// MUTEX_LOCKIN(); + if( xSemaphore != NULL ) { + // See if we can obtain the semaphore. If the semaphore is not available + // wait 10 ticks to see if it becomes free. + if( xSemaphoreTake( xSemaphore, ( TickType_t ) 200 ) == pdTRUE ) { + // We were able to obtain the semaphore and can now access the + // shared resource. + + if(not_qos0) { + mqp->n_refs++; /* Need to enlist, do not free-up MQP */ + } + /* Tries to free-up MQP either on error or if full pkt is sent */ + rv = cl_ctx_seg1_send(cl_ctx, MQP_FHEADER_BUF(mqp), + MQP_CONTENT_LEN(mqp), false, + mqp); + + /* At this point, error or not, QoS0 MQP would have been freed */ + + if((rv <= 0) && not_qos0) { + mqp_free(mqp); /* Err: Explicitly free-up non QoS0 MQP */ + Uart_Write((uint8_t*)"cl_ctx_seg1_send failed\r\n"); + goto _msg_dispatch_exit1; + } + + rv = msg_id; /* Make progress for a good send to the server */ + + if(not_qos0) { /* Enlist non QOS0 MQP to await ACK from server */ + mqp_ack_wlist_append(&CLIENT(cl_ctx)->qos_ack1_wl, mqp); + } + // We have finished accessing the shared resource. Release the + // semaphore. + //xSemaphoreGive( xSemaphore ); + } else { + // We could not obtain the semaphore and can therefore not access + // the shared resource safely. + Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); + } + } + +_msg_dispatch_exit1: + +// MUTEX_UNLOCK(); + xSemaphoreGive(xSemaphore); + + return rv; +} + +static +int32_t msg_dispatch_no_free(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, + enum mqtt_qos qos, bool retain) +{ + if((NULL == mqp) || (NULL == cl_ctx)) + return MQP_ERR_FNPARAM; + + mqp->n_refs++; /* Ensures caller that MQP is not freed-up */ + + return _msg_dispatch(cl_ctx, mqp, qos, retain); +} + +int32_t mqtt_client_pub_msg_send(void *ctx, const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, + enum mqtt_qos qos, bool retain) +{ + + struct mqtt_packet *mqp = NULL; + + if((NULL == ctx) || + (NULL == topic) || + ((data_len > 0) && (NULL == data_buf))) { + Uart_Write((uint8_t*)"MQP_ERR_FNPARAM\n\r"); + return MQP_ERR_FNPARAM; + } + if(false == is_valid_utf8_string(topic)) { + Uart_Write((uint8_t*)"MQP_ERR_CONTENT\n\r"); + return MQP_ERR_CONTENT; + } + mqp = mqp_client_send_alloc(MQTT_PUBLISH); + if(NULL == mqp) { + Uart_Write((uint8_t*)"MQP_ERR_PKT_AVL\n\r"); + return MQP_ERR_PKT_AVL; + } + if((0 > mqp_pub_append_topic(mqp, topic, qos? assign_new_msg_id(): 0)) || + (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) { + Uart_Write((uint8_t*)"len_err\n\r"); + return len_err_free_mqp(mqp); + } + + return _msg_dispatch(CL_CTX(ctx), mqp, qos, retain); +} + +int32_t mqtt_client_pub_dispatch(void *ctx, struct mqtt_packet *mqp, + enum mqtt_qos qos, bool retain) +{ + return msg_dispatch_no_free(CL_CTX(ctx), mqp, qos, retain); +} + +static int32_t tail_incorp_msg_id(struct mqtt_packet *mqp) +{ + uint8_t *buf = MQP_FHEADER_BUF(mqp) + mqp->vh_len; + + if(0 == mqp->msg_id) { + mqp->msg_id = assign_new_msg_id(); + buf += buf_wr_nbo_2B(buf, mqp->msg_id); + mqp->vh_len += 2; + + return 2; + } + + return 0; +} + +static int32_t buf_utf8_wr_try(uint8_t *buf, uint32_t fsz, const struct utf8_string *topic, + uint8_t qid) +{ + uint8_t *ref = buf; + + if(fsz < (topic->length + 2 + (QFL_VALUE == qid)? 0 : 1)) + return MQP_ERR_PKT_LEN; /* No buf */ + + if(false == is_valid_utf8_string(topic)) + return MQP_ERR_CONTENT;/* Invalid */ + + buf += mqp_buf_wr_utf8(buf, topic); + if(QFL_VALUE != qid) + *buf++ = qid; + + return buf - ref; +} + +static int32_t utf8_array_send(struct client_ctx *cl_ctx, + const struct utf8_strqos *subsc_vec, + const struct utf8_string *unsub_vec, + uint32_t n_elem) +{ + struct mqtt_packet *mqp; + uint8_t *ref, *buf, *end; + uint32_t i; + + if((NULL == cl_ctx) || !((!!subsc_vec) ^ (!!unsub_vec)) || (0 == n_elem)) + return MQP_ERR_FNPARAM; + + mqp = mqp_client_send_alloc(subsc_vec? + MQTT_SUBSCRIBE : MQTT_UNSUBSCRIBE); + if(NULL == mqp) + return MQP_ERR_PKT_AVL; + + buf = MQP_VHEADER_BUF(mqp); + end = MQP_FREEBUF_LEN(mqp) + buf; /* End of free buffer */ + if((end - buf) < 2) + return len_err_free_mqp(mqp);/* MSG-ID: no space */ + + buf += tail_incorp_msg_id(mqp); + ref = buf; + + for(i = 0; i < n_elem; i++) { + const struct utf8_string *topic; + struct utf8_string topreq; + int32_t rv; + + if(subsc_vec) { + topreq.length = subsc_vec[i].length; + topreq.buffer = subsc_vec[i].buffer; + topic = &topreq; + } else + topic = unsub_vec + i; + + rv = buf_utf8_wr_try(buf, end - buf, topic, subsc_vec? + (uint8_t)subsc_vec[i].qosreq : QFL_VALUE); + if(rv < 0) { + mqp_free(mqp); + return rv; + } + + buf += rv; + } + + mqp->pl_len = buf - ref; /* Total length of topics data */ + + return _msg_dispatch(cl_ctx, mqp, MQTT_QOS1, false); +} + +int32_t mqtt_sub_msg_send(void *ctx, const struct utf8_strqos *qos_topics, uint32_t count) +{ + return utf8_array_send(CL_CTX(ctx), qos_topics, NULL, count); +} + +int32_t mqtt_sub_dispatch(void *ctx, struct mqtt_packet *mqp) +{ + return msg_dispatch_no_free(CL_CTX(ctx), mqp, MQTT_QOS1, false); +} + +int32_t mqtt_unsub_msg_send(void *ctx, const struct utf8_string *topics, uint32_t count) +{ + return utf8_array_send(CL_CTX(ctx), NULL, topics, count); +} + +int32_t mqtt_unsub_dispatch(void *ctx, struct mqtt_packet *mqp) +{ + return msg_dispatch_no_free(CL_CTX(ctx), mqp, MQTT_QOS1, false); +} + +/* Note: in this revision of implementation, vh_msg_send() is being invoked + from a locked RX context. Should this situation change, so should the + 'locking' considerations in the routine. */ +static int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type, + enum mqtt_qos qos, bool has_vh, + uint16_t vh_data) +{ + uint8_t buf[4]; + uint32_t len = 2; + + buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false)); + buf[1] = has_vh ? 2 : 0; + + if(has_vh) + len += buf_wr_nbo_2B(buf + 2, vh_data); + + return cl_ctx_seg1_send(cl_ctx, buf, len, false, NULL); +} + +static int32_t pingreq_send(struct client_ctx *cl_ctx, uint32_t rsp_flag) +{ + + int32_t rv = 0; + uint8_t buf[2]; + + buf[0] = MAKE_FH_BYTE1(MQTT_PINGREQ, + MAKE_FH_FLAGS(false, MQTT_QOS0, false)); + buf[1] = 0; + + /* Note: in case of error in network send, cl_ctx_send() may + try to terminate connection with server. */ + rv = cl_ctx_seg1_send(cl_ctx, buf, 2, false, NULL); + if(rv > 0) + cl_ctx->flags |= rsp_flag; + + return rv; +} + +int32_t mqtt_pingreq_send(void *ctx) +{ + Uart_Write((uint8_t*)"mqtt_pingreq_send\r\n"); + int32_t rv = 0; + +// MUTEX_LOCKIN(); + if( xSemaphore != NULL ) { + // See if we can obtain the semaphore. If the semaphore is not available + // wait 10 ticks to see if it becomes free. + if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { + // We were able to obtain the semaphore and can now access the + // shared resource. + rv = pingreq_send(CL_CTX(ctx), USER_PING_RSP_FLAG); + + // We have finished accessing the shared resource. Release the + // semaphore. + xSemaphoreGive( xSemaphore ); + + } else { + // We could not obtain the semaphore and can therefore not access + // the shared resource safely. + Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); + } + } + +// MUTEX_UNLOCK(); + return rv; +} + +int32_t mqtt_disconn_send(void *ctx) +{ + Uart_Write((uint8_t*)"mqtt_disconn_send\r\n"); + uint8_t buf[2]; + + buf[0] = MAKE_FH_BYTE1(MQTT_DISCONNECT, + MAKE_FH_FLAGS(false, MQTT_QOS0, false)); + buf[1] = 0; + +// MUTEX_LOCKIN(); + if( xSemaphore != NULL ) { + // See if we can obtain the semaphore. If the semaphore is not available + // wait 10 ticks to see if it becomes free. + if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { + // We were able to obtain the semaphore and can now access the + // shared resource. + /* Note: in case of error in network send, cl_ctx_send() may + try to terminate connection with server. */ + if(cl_ctx_seg1_send(CL_CTX(ctx), buf, 2, false, NULL) > 0) { + /* Terminate connection on application's request */ + do_net_close_tx(CL_CTX(ctx), "DISCONN"); + } + + // We have finished accessing the shared resource. Release the + // semaphore. + xSemaphoreGive( xSemaphore ); + + } else { + // We could not obtain the semaphore and can therefore not access + // the shared resource safely. + Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); + } + } + +// MUTEX_UNLOCK(); + return 0; +} + +/*------------------------------------------------------------------------------ + * MQTT RX Routines + *------------------------------------------------------------------------------ + */ +static bool ack1_wl_mqp_dispatch(struct client_ctx *cl_ctx) +{ + struct mqtt_ack_wlist *wlist = &CLIENT(cl_ctx)->qos_ack1_wl; + struct mqtt_packet *mqp = NULL; + bool rv = true; + + for(mqp = wlist->head; mqp && (true == rv); mqp = mqp->next) { + uint8_t *buf = MQP_FHEADER_BUF(mqp); + mqp->fh_byte1 = *buf |= DUP_FLAG_VAL(true); + + mqp->n_refs++; /* Ensures MQP is not freed by following */ + + /* Error or not, following routine tries to free up MQP */ + if(cl_ctx_seg1_send(cl_ctx, buf, MQP_CONTENT_LEN(mqp), + false, mqp) <= 0) + rv = false; + } + + return rv; +} + +/* TBD candidate for common */ +static bool ack2_msg_id_dispatch(struct client_ctx *cl_ctx) +{ + struct pub_qos2_cq *tx_cq = &CLIENT(cl_ctx)->qos2_tx_cq; + uint8_t rd_idx = tx_cq->rd_idx; + uint8_t n_free = tx_cq->n_free; + bool rv = true; + uint8_t i = 0; + + for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free) && (true == rv); i++) { + if(vh_msg_send(cl_ctx, MQTT_PUBREL, MQTT_QOS1, + true, tx_cq->id_vec[i]) <= 0) + rv = false; + } + + return rv; +} + +static void session_resume(struct client_ctx *cl_ctx) +{ + DBG_INFO("C: Re-send ACK awaited QoS1/2 msgs to net %d\n\r", + cl_ctx->net); + + if(ack1_wl_mqp_dispatch(cl_ctx)) + ack2_msg_id_dispatch(cl_ctx); + + return; +} + +static bool ack1_wl_rmfree(struct mqtt_ack_wlist *wl, uint16_t msg_id) +{ + struct mqtt_packet *mqp = mqp_ack_wlist_remove(wl, msg_id); + if(NULL != mqp) { + mqp_free(mqp); + return true; + } + + USR_INFO("Err: Unexpected ACK w/ ID 0x%04x\n\r", msg_id); + + return false; +} + +static bool _proc_pub_rec_rx(struct client_ctx *cl_ctx, uint16_t msg_id) +{ + /* Follow-up messages for QOS2 PUB must be transacted in the + same order as the initial sequence of QOS2 PUB dispatches. + Therefore, checking the first entry should be OK + */ + struct mqtt_packet *mqp = CLIENT(cl_ctx)->qos_ack1_wl.head; + + if((msg_id == mqp->msg_id) && ack2_msg_id_logup(cl_ctx, msg_id)) { + + ack1_wl_rmfree(&CLIENT(cl_ctx)->qos_ack1_wl, msg_id); + + vh_msg_send(cl_ctx, MQTT_PUBREL, MQTT_QOS1, + true, msg_id); + + return true; + } + + return false; /* Unexpected PUBREC or QOS2 store exceeded */ +} + +static bool _proc_pub_rel_rx(struct client_ctx *cl_ctx, uint16_t msg_id) +{ + /* For a PUB-REL RX, send PUBCOMP to let server make progress */ + vh_msg_send(cl_ctx, MQTT_PUBCOMP, MQTT_QOS0, true, msg_id); + + if(qos2_pub_rx_is_done(cl_ctx, msg_id)) + qos2_pub_rx_unlog(cl_ctx, msg_id); /* Expunge record */ + + return true; +} + +/* + Process ACK Message from Broker. + Returns true on success, otherwise false. + Used for: PUBACK, SUBACK and UNSUBACK +*/ +static +bool _proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); + struct client_desc *client = CLIENT(cl_ctx); + uint16_t msg_id = mqp_raw->msg_id; + uint32_t len = mqp_raw->pl_len; + + /* Caters to SUB-ACK, UNSUB-ACK and PUB-ACK Messages */ + if(false == ack1_wl_rmfree(&client->qos_ack1_wl, msg_id)) + return false; /* Err: MSG_ID was not awaited */ + + if(ctx_cbs->ack_notify) + ctx_cbs->ack_notify(client->app, mqp_raw->msg_type, msg_id, + len? MQP_PAYLOAD_BUF(mqp_raw): NULL, + len); + return true; +} + +static +bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + uint8_t msg_type = mqp_raw->msg_type; + bool rv = false; + uint16_t msg_id = 0; + + if(false == mqp_proc_msg_id_ack_rx(mqp_raw, MQTT_SUBACK == msg_type)) + return rv; /* Problem in contents received from server */ + + msg_id = mqp_raw->msg_id; + + if(MQTT_PUBREC == msg_type) { + rv = _proc_pub_rec_rx(cl_ctx, msg_id); + if(rv) + MQP_RX_DO_NOT_RPT_SET(mqp_raw); /* Don't report to App */ + + } else if(MQTT_PUBREL == msg_type) { + rv = _proc_pub_rel_rx(cl_ctx, msg_id); + if(rv) + MQP_RX_DO_NOT_RPT_SET(mqp_raw); /* Don't report to App */ + } else if(MQTT_PUBCOMP == msg_type) { + rv = ack2_msg_id_unlog(cl_ctx, msg_id); + } else { + rv = _proc_ack_msg_rx(cl_ctx, mqp_raw); + } + + return rv; +} + +static +bool proc_pub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); + bool good_pub = mqp_proc_pub_rx(mqp_raw); + uint8_t B = mqp_raw->fh_byte1; + enum mqtt_qos qos = ENUM_QOS(B); + uint16_t msg_id = 0; + + if(false == good_pub) + return false; /* Didn't get nicely composed PUB Packet */ + + msg_id = mqp_raw->msg_id; + + /* Irrespective of the result of the ACK through vh_msg_send(), + the implementation has chosen to process the good PUB packet. + Any error will be handled in next iteration of rx processing. + */ + if(MQTT_QOS1 == qos) + vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id); + + if(MQTT_QOS2 == qos) { + /* Ensuring "only once" philosophy for MQTT QoS2 PUBs */ + if(qos2_pub_rx_is_done(cl_ctx, msg_id)) { + /* Already delivered. Drop it & do not report */ + MQP_RX_DO_NOT_RPT_SET(mqp_raw); + return true; /* No more follow-up; all's good */ + } + + if(false == qos2_pub_rx_logup(cl_ctx, msg_id)) + return false; /* Failed to record New RX PUB */ + + vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id); + } + + /* QoS obliations completed, present PUBLISH RX packet to app */ + if(ctx_cbs->publish_rx) { + /* App has chosen the callback method to receive PKT */ + mqp_raw->n_refs++; /* Make app owner of this packet */ + if(ctx_cbs->publish_rx(CLIENT(cl_ctx)->app, BOOL_DUP(B), + qos, BOOL_RETAIN(B), mqp_raw)) { + /* App has no use of PKT any more, so free it */ + mqp_raw->n_refs--; /* Take back ownership */ + } + } + + return true; +} + +static +bool proc_connack_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); + uint8_t *buf = MQP_VHEADER_BUF(mqp_raw); + + mqp_raw->vh_len += 2; + mqp_raw->pl_len -= 2; + + if(0 != mqp_raw->pl_len) + return false; /* There is no payload in message */ + + cl_ctx->flags &= ~(DO_CONNACK_TO_FLAG | CONNACK_AWAIT_FLAG); + + if(VHB_CONNACK_RC(buf)) + /* Server has refused the connection, inform the app */ + goto proc_connack_rx_exit1; + + cl_ctx->flags |= NOW_CONNECTED_FLAG; + cl_ctx_timeout_update(cl_ctx, net_ops->time()); /* start KA */ + + if(IS_CLN_SESSION(cl_ctx)) + session_delete(cl_ctx); + else + session_resume(cl_ctx); + +proc_connack_rx_exit1: + if(ctx_cbs->ack_notify) + ctx_cbs->ack_notify(CLIENT(cl_ctx)->app, mqp_raw->msg_type, + 0, buf, 2); + + return true; +} + +static +bool proc_pingrsp_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); + + if(0 != mqp_raw->pl_len) + return false; + + if(AWAITS_KA_PING(cl_ctx)) { + cl_ctx->flags &= ~KA_PINGER_RSP_FLAG; + return true; + } + + if(AWAITS_PINGRSP(cl_ctx)) { + cl_ctx->flags &= ~USER_PING_RSP_FLAG; + if(ctx_cbs->ack_notify) + ctx_cbs->ack_notify(CLIENT(cl_ctx)->app, + mqp_raw->msg_type, + 0, NULL, 0); + return true; + } + + return false; +} + +static +bool conn_sent_state_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + bool rv = false; + + switch(mqp_raw->msg_type) { + + case MQTT_CONNACK: + /* Changes client_ctx->flags to CONNECTED */ + rv = proc_connack_rx(cl_ctx, mqp_raw); + break; + + default: + break; + } + + return rv; +} + +static +bool connected_state_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) +{ + bool rv = false; + + switch(mqp_raw->msg_type) { + + case MQTT_SUBACK: + case MQTT_PUBACK: + case MQTT_PUBREC: + case MQTT_PUBREL: + case MQTT_PUBCOMP: + case MQTT_UNSUBACK: + rv = proc_ack_msg_rx(cl_ctx, mqp_raw); + break; + + case MQTT_PINGRSP: + rv = proc_pingrsp_rx(cl_ctx, mqp_raw); + break; + + case MQTT_PUBLISH: + rv = proc_pub_msg_rx(cl_ctx, mqp_raw); + break; + + case MQTT_CONNACK: /* not expected */ + default: + break; + } + + return rv; +} + +static bool process_recv(struct client_ctx *cl_ctx, + struct mqtt_packet *mqp_raw) +{ + bool rv; + + USR_INFO("C: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d [@ %u]\n\r", + mqp_raw->fh_byte1, cl_ctx->net, net_ops->time()); + + /* Working Principle: Only RX processing errors should be + reported as 'false'. Status of TX as a follow-up to RX + messages need not be reported by the xyz_rx() routines. + Error observed in TX is either dealt in next iteration + of RX loop (in case, there is a dedicated RX task for + the CTX) or in TX routine itself (in case, there is no + dedicated RX task for the CTX). + */ + rv = AWAITS_CONNACK(cl_ctx)? + conn_sent_state_rx(cl_ctx, mqp_raw) : + connected_state_rx(cl_ctx, mqp_raw); + + DBG_INFO("C: Msg w/ ID 0x%04x, processing status: %s\n\r", + mqp_raw->msg_id, rv? "Good" : "Fail"); + + return rv; +} + +static int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, void *ctx) +{ + bool timed_out = false; + int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, &timed_out, ctx); + if(rv <= 0) { + USR_INFO("C: Net %d, Raw Error %d, Time Out: %c\n\r", + net, rv, timed_out? 'Y' : 'N'); + + if(timed_out) + rv = MQP_ERR_TIMEOUT; + } + + return rv; +} + +/* + MQTT 3.1.1 implementation + ------------------------- + + Keep Alive Time is maxmimum interval within which a client should send a + packet to broker. If there are either no packets to be sent to broker or + no retries left, then client is expected to a send a PINGREQ within Keep + Alive Time. Broker should respond by sending PINGRSP with-in reasonable + time of 'wait_secs'. If Keep Alive Time is set as 0, then client is not + expected to be disconnected due to in-activity of MQTT messages. Value + of 'wait_secs' is assumed to be quite smaller than (non-zero) 'ka_secs'. +*/ +static void conn2used_ctxs(uint32_t wait_secs) +{ + + while(conn_ctxs) { + struct client_ctx *cl_ctx = conn_ctxs; + conn_ctxs = conn_ctxs->next; + + cl_ctx_timeout_insert(&used_ctxs, cl_ctx); + } +} + +static int32_t single_ctx_ka_sequence(struct client_ctx *cl_ctx, uint32_t wait_secs) +{ + + uint32_t now_secs = net_ops->time(); + + if(AWAITS_CONNACK(cl_ctx) && CFG_CONNACK_TO(cl_ctx)) { + cl_ctx->timeout += wait_secs; /* Set CONNACK timeout value */ + cl_ctx->flags &= ~DO_CONNACK_TO_FLAG; + } + + if(cl_ctx->timeout > now_secs) { + return 1; /* Still have time for next message transaction */ + } + if(is_connected(cl_ctx)) { + /* Timeout has happened. Check for PINGRESP if PINGREQ done. + Otherwise, send PINGREQ (Are You there?) to the server. */ + if(AWAITS_KA_PING(cl_ctx)) { + goto single_ctx_ka_sequence_exit1; /* No PINGRESP */ + } + return pingreq_send(cl_ctx, KA_PINGER_RSP_FLAG); /* Hello! */ + } + +single_ctx_ka_sequence_exit1: + + USR_INFO("C: Net %d, no RX MSG in reasonable time\n\r", cl_ctx->net); + return -1; +} + +static uint32_t single_ctx_adj_wait_secs_get(struct client_ctx *cl_ctx, uint32_t wait_secs) +{ + + return (KA_TIMEOUT_NONE != cl_ctx->timeout)? + MIN(cl_ctx->timeout - net_ops->time(), wait_secs) : wait_secs; +} + +static int32_t single_ctx_rx_prep(struct client_ctx *cl_ctx, uint32_t *secs2wait) +{ + + int32_t rv; + + if(-1 == cl_ctx->net) + return MQP_ERR_NOTCONN; /* Likely for a ctx w/o a receive task */ + + if(NEED_NET_CLOSE(cl_ctx)) + rv = MQP_ERR_NOTCONN; + else if(0 > single_ctx_ka_sequence(cl_ctx, *secs2wait)) + rv = MQP_ERR_NETWORK; + else { + *secs2wait = single_ctx_adj_wait_secs_get(cl_ctx, *secs2wait); + return 1; + } + + do_net_close_rx(cl_ctx, rv); + return rv; +} + +static +int32_t proc_ctx_data_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, + uint32_t wait_secs, void **app) +{ + + int32_t rv = MQP_ERR_NOTCONN; + int32_t net = cl_ctx->net; + + *app = cl_ctx->usr; + + rv = net_recv(net, mqp, wait_secs, (void*)cl_ctx); + +// MUTEX_LOCKIN(); + if( xSemaphore != NULL ) { + // See if we can obtain the semaphore. If the semaphore is not available + // wait 10 ticks to see if it becomes free. + if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { + // We were able to obtain the semaphore and can now access the + // shared resource. + if(rv > 0) { + if(false == process_recv(cl_ctx, mqp)) { + Uart_Write((uint8_t*)"MQP_ERR_CONTENT\r\n"); + rv = MQP_ERR_CONTENT; + } + } + /* RX: close the network connection to the server for this context, if + (a) there is a processing / protocol error other than time-out + (b) A good MQTT CONNACK has a return code - connection refused + */ + if(((rv < 0) && (rv != MQP_ERR_TIMEOUT)) || + ((MQTT_CONNACK == mqp->msg_type) && + MQP_CONNACK_RC(mqp))) { + do_net_close_rx(cl_ctx, rv); + } + + // We have finished accessing the shared resource. Release the + // semaphore. + xSemaphoreGive( xSemaphore ); + + } else { + // We could not obtain the semaphore and can therefore not access + // the shared resource safely. + Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); + } + } + +// MUTEX_UNLOCK(); + return rv; +} + +static int32_t mqp_setup_proc_ctx_data_recv(struct client_ctx *cl_ctx, + struct mqtt_packet *mqp, + uint32_t wait_secs, void **app) +{ + struct mqtt_packet *rx_mqp = CLIENT(cl_ctx)->rx_mqp; + int32_t rv; + + if(NULL != mqp) { + /* Input MQP must be same as MQP for partial RX, if any */ + if(rx_mqp) { + if(mqp != rx_mqp) + return MQP_ERR_FNPARAM; + } else + mqp_reset(mqp); + } + + if(NULL == mqp) { + mqp = rx_mqp? rx_mqp : mqp_client_recv_alloc(0); + if(NULL == mqp) + return MQP_ERR_PKT_AVL; + } + + rv = proc_ctx_data_recv(cl_ctx, mqp, wait_secs, app); + if(rv == MQP_ERR_TIMEOUT) { + CLIENT(cl_ctx)->rx_mqp = mqp; /* Save partial RX MQP */ + } else { + /* Control reaches here due to either an error in RX or the + completion of RX. In both the cases, the MQP needs to be + detached and processed. For completion of RX: + callback mode: Application has used up MQP data; free it + Non-callback mode: Application will now use complete MQP + */ + CLIENT(cl_ctx)->rx_mqp = NULL; + if(mqp->free) + mqp_free_locked(mqp); /* For only callback mode */ + } + + return rv; +} + +static int32_t cl_ctx_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, + uint32_t wait_secs) +{ + + void *app = NULL; + int32_t rv = 0; + + do { + if(mqp && (NULL == CLIENT(cl_ctx)->rx_mqp)) + mqp_reset(mqp); + + rv = single_ctx_rx_prep(cl_ctx, &wait_secs); + if(rv > 0) + rv = mqp_setup_proc_ctx_data_recv(cl_ctx, mqp, + wait_secs, + &app); + + /* 'mqp' must be valid, if rv > 0 but including further + & additional check for sake of static cod eanalysis.*/ + } while((rv > 0) && mqp && MQP_RX_DO_NOT_RPT_COR(mqp)); + + return rv; +} + +int32_t mqtt_client_ctx_await_msg(void *ctx, uint8_t msg_type, struct mqtt_packet *mqp, + uint32_t wait_secs) +{ + struct client_ctx *cl_ctx = CL_CTX(ctx); + int32_t rv = -1; + + if((NULL == cl_ctx) || (NULL == mqp)) + return MQP_ERR_FNPARAM; + + do { + rv = cl_ctx_recv(cl_ctx, mqp, wait_secs); + + } while((rv > 0) && + (0 != msg_type) && (msg_type != mqp->msg_type)); + + return rv; +} + +int32_t mqtt_client_ctx_run(void *ctx, uint32_t wait_secs) +{ + + int32_t rv; + + if(NULL == ctx) + return MQP_ERR_FNPARAM; + + do { + rv = cl_ctx_recv(CL_CTX(ctx), NULL, wait_secs); + + } while(rv > 0); + + return rv; +} + +static struct client_ctx *group_ctxs_ka_sequence(uint32_t wait_secs) { + + struct client_ctx *cl_ctx = used_ctxs; + + while(cl_ctx) { + struct client_ctx *next = cl_ctx->next; + if(single_ctx_rx_prep(cl_ctx, &wait_secs) < 0) { + /* 'CTX' no more eligible for operation + and has been removed from used_list */ + if(false == grp_has_cbfn) + return cl_ctx; + } + + cl_ctx = next; + } + + return NULL; +} + +#define IO_MON_NO_TIMEOUT (0xFFFFFFFF) + +static uint32_t group_ctxs_adj_wait_secs_get(uint32_t wait_secs) +{ + + return used_ctxs? + single_ctx_adj_wait_secs_get(used_ctxs, wait_secs) : wait_secs; +} + +static int32_t recv_hvec[MAX_NWCONN + 1 + 1]; /* GROUP LISTEN PORT + VEC END */ +static int32_t send_hvec = -1; +static int32_t rsvd_hvec = -1; + +/* Caller must ensure atomic enviroment for execution of this routine */ +static void recv_hvec_load(int32_t *hvec_recv, uint32_t size, struct client_ctx *list) +{ + + int32_t i = 0; + + for(i = 0; (i < size) && (NULL != list); i++, list = list->next) + hvec_recv[i] = list->net; + + hvec_recv[i] = -1; + + return; +} + +static int32_t group_ctxs_rx_prep(uint32_t wait_secs, void **app) +{ + + /* CHK 'used ctx'(s) have live connection w/ server. If not, drop it */ + struct client_ctx *ctx_kaTO = group_ctxs_ka_sequence(wait_secs); + int32_t n_hnds; + + if(ctx_kaTO) { + *app = CLIENT(ctx_kaTO)->app; + return MQP_ERR_NETWORK; + } + + conn2used_ctxs(wait_secs); /* Now, add new 'ctx'(s) to 'used ctxs' */ + + recv_hvec[0] = loopb_net; + recv_hvec_load(&recv_hvec[1], MAX_NWCONN + 1, used_ctxs); + + wait_secs = group_ctxs_adj_wait_secs_get(wait_secs); + + n_hnds = net_ops->io_mon(recv_hvec, &send_hvec, + &rsvd_hvec, wait_secs); + if(0 == n_hnds) + n_hnds = MQP_ERR_TIMEOUT; + else if(n_hnds < 0) + n_hnds = MQP_ERR_LIBQUIT; + + return n_hnds; +} + +static int32_t proc_loopback_recv(int32_t net) +{ + + int32_t rv = 0; + uint8_t buf[LOOP_DLEN]; + + /* Thanks for waking-up thread, but ain't got much to do now */ + rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0); + if(rv <= 0) { + memset(print_buf, 0x00, PRINT_BUF_LEN); + sprintf((char*) print_buf, "MQP_ERR_LIBQUIT %i\r\n",MQP_ERR_LIBQUIT); + Uart_Write((uint8_t *) print_buf); + net_ops->close(net); + return MQP_ERR_LIBQUIT; + } + + return rv; +} + +static struct client_ctx *net_cl_ctx_find(int32_t net) { + struct client_ctx *cl_ctx = used_ctxs; + + while(cl_ctx && (net != cl_ctx->net)) + cl_ctx = cl_ctx->next; + + return cl_ctx; +} + +static int32_t proc_net_data_recv(int32_t net, struct mqtt_packet *mqp, void **app) +{ + + /* Note: used_ctxs are always managed by a single RX task */ + struct client_ctx *cl_ctx = net_cl_ctx_find(net); + int32_t rv = MQP_ERR_NOTCONN; + + if(NULL == cl_ctx) { + return rv; /* TX removed it interim, mustn't happen */ + } + return mqp_setup_proc_ctx_data_recv(cl_ctx, mqp, 1, app); +} + +static int32_t cl_recv(struct mqtt_packet *mqp, uint32_t wait_secs, void **app) +{ + + int32_t rv = MQP_ERR_NETWORK; + int32_t n_hnds = 0, idx = 0; + + rv = group_ctxs_rx_prep(wait_secs, app); + if(rv > 0) + n_hnds = rv; + + for(idx = 0; (idx < n_hnds) && (rv > 0); idx++) { + int32_t net = recv_hvec[idx]; + if(loopb_net == net) + rv = proc_loopback_recv(net); /* UDP Packet */ + else { + rv = proc_net_data_recv(net, mqp, app); + if(false == grp_has_cbfn) + break; /* 'CTX': inform application */ + } + } + + return rv; +} + +static int32_t grp_net_setup_create() +{ + + if(0 == loopb_portid) { + return MQP_ERR_NOT_DEF; + } + if(NULL == net_ops) { + return MQP_ERR_NET_OPS; + } + if(-1 == loopb_net) { + loopb_net = net_ops->open(DEV_NETCONN_OPT_UDP, NULL, loopb_portid, NULL); + + if(-1 == loopb_net) { + return MQP_ERR_LIBQUIT; + } + } + + return 1; +} + +int32_t mqtt_client_await_msg(struct mqtt_packet *mqp, uint32_t wait_secs, void **app) +{ + + int32_t rv = MQP_ERR_NOTCONN; + *app = NULL; + + if(NULL == mqp) + return MQP_ERR_FNPARAM; /* Didn't get a valid MQP */ + + if(true == grp_has_cbfn) + return MQP_ERR_BADCALL; /* Err: LIB has CB config */ + + rv = grp_net_setup_create(); + if(rv <= 0) + return rv; + + do { + rv = cl_recv(mqp, wait_secs, app); + + } while((rv > 0) && MQP_RX_DO_NOT_RPT_COR(mqp)); + + return rv; +} + +int32_t mqtt_client_run(uint32_t wait_secs) +{ + + void *app = NULL; + int32_t rv = -1; + + if(false == grp_has_cbfn) { + return MQP_ERR_BADCALL; /* Err: LIB has no CB config */ + } + rv = grp_net_setup_create(); + if(rv <= 0) { + return rv; + } + do { + rv = cl_recv(NULL, wait_secs, &app); + + } while((rv > 0) || + /* 'ctx' related errors are handled by the callbacks */ + ((rv != MQP_ERR_LIBQUIT) && (rv != MQP_ERR_TIMEOUT))); + + return rv; +} + +/*------------------------------------------------------------------------------ + * Buffer Pool and management, other registrations and initialization. + *------------------------------------------------------------------------------ + */ +static struct mqtt_packet *free_list = NULL; + +static struct mqtt_packet *mqp_alloc_atomic(void) { + + struct mqtt_packet *mqp = NULL; + +// MUTEX_LOCKIN(); + if( xSemaphore != NULL ) { + // See if we can obtain the semaphore. If the semaphore is not available + // wait 10 ticks to see if it becomes free. + if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { + // We were able to obtain the semaphore and can now access the + // shared resource. + mqp = free_list; + if(mqp) { + free_list = mqp->next; + } + + // We have finished accessing the shared resource. Release the + // semaphore. + xSemaphoreGive( xSemaphore ); + + } else { + // We could not obtain the semaphore and can therefore not access + // the shared resource safely. + Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); + } + } + +// MUTEX_UNLOCK(); + return mqp; +} + +struct mqtt_packet *mqp_client_alloc(uint8_t msg_type, uint8_t offset) { + + struct mqtt_packet *mqp = mqp_alloc_atomic(); + if(NULL == mqp) { + USR_INFO("MQP alloc failed - msg type 0x%02x\n\r", msg_type); + return NULL; + } + + mqp_init(mqp, offset); + mqp->msg_type = msg_type; + + return mqp; +} + +/* Do not use this routine with-in this file. */ +static void free_mqp(struct mqtt_packet *mqp) +{ + /* Must be used in a locked state */ + mqp->next = free_list; + free_list = mqp; +} + +int32_t mqtt_client_buffers_register(uint32_t num_mqp, struct mqtt_packet *mqp_vec, + uint32_t buf_len, uint8_t *buf_vec) +{ + uint32_t i, j; + + if((0 == num_mqp) || (0 == buf_len) || free_list) + return -1; + + for(i = 0, j = 0; i < num_mqp; i++, j += buf_len) { + struct mqtt_packet *mqp = mqp_vec + i; + + mqp->buffer = buf_vec + j; + mqp->maxlen = buf_len; + + mqp->free = free_mqp; + mqp->next = free_list; + free_list = mqp; + } + + return 0; +} + +int32_t mqtt_client_ctx_will_register(void *ctx, + const struct utf8_string *will_top, + const struct utf8_string *will_msg, + enum mqtt_qos will_qos, bool retain) +{ + uint8_t B = 0; + + if((NULL == ctx) || ((NULL == will_top) && (NULL != will_msg))) + return -1; /* Bad Combo */ + + if(NULL != will_top) { + RET_IF_INVALID_UTF8(will_top); + + B = QOS_VALUE(will_qos) << 3; + if(retain) + B |= WILL_RETAIN_VAL; + + if(NULL != will_msg) + RET_IF_INVALID_UTF8(will_msg); + } + + CLIENT(ctx)->conn_pl_utf8s[1] = will_top; + CLIENT(ctx)->conn_pl_utf8s[2] = will_msg; + + CLIENT(ctx)->will_opts = B; + + return 0; +} + +int32_t mqtt_client_ctx_info_register(void *ctx, + const struct utf8_string *client_id, + const struct utf8_string *user_name, + const struct utf8_string *pass_word) +{ + const struct utf8_string *users_pwd = NULL; + + if(NULL == ctx) + return -1; + + if(NULL != client_id) + RET_IF_INVALID_UTF8(client_id); + + if(NULL != user_name) { + RET_IF_INVALID_UTF8(user_name); + + if(NULL != pass_word) + RET_IF_INVALID_UTF8(pass_word); + + users_pwd = pass_word; + } + + CLIENT(ctx)->conn_pl_utf8s[0] = client_id; + CLIENT(ctx)->conn_pl_utf8s[3] = user_name; + CLIENT(ctx)->conn_pl_utf8s[4] = users_pwd; + + return 0; +} + +int32_t mqtt_client_net_svc_register(const struct device_net_services *net) +{ + if(net && net->open && net->send && net->recv && + net->send_dest && net->recv_from && net->close + && net->io_mon && net->time) { + net_ops = net; + return 0; + } + + return -1; +} + +static void cl_ctx_setup(struct client_ctx *cl_ctx, /* WR Object */ + const struct mqtt_client_ctx_cfg *ctx_cfg, + const struct mqtt_client_ctx_cbs *ctx_cbs, + void *app) +{ + + struct client_desc *client = CLIENT(cl_ctx); + + cl_ctx->flags = ctx_cfg->config_opts; + + client->nwconn_opts = ctx_cfg->nwconn_opts; + client->server_addr = ctx_cfg->server_addr; + client->port_number = ctx_cfg->port_number; + + client->app = app; + + + if(NULL != ctx_cfg->nw_security) + buf_wr_nbytes((uint8_t*)&client->nw_security, + (uint8_t*)ctx_cfg->nw_security, + sizeof(struct secure_conn)); + + if(NULL != ctx_cbs) { + // set callback flag + struct mqtt_client_ctx_cbs *cbs_ctx = CTX_CBS_PTR(client); + cbs_ctx->publish_rx = ctx_cbs->publish_rx; + cbs_ctx->ack_notify = ctx_cbs->ack_notify; + cbs_ctx->disconn_cb = ctx_cbs->disconn_cb; + } + + return; +} + +int32_t mqtt_client_ctx_create(const struct mqtt_client_ctx_cfg *ctx_cfg, + const struct mqtt_client_ctx_cbs *ctx_cbs, + void *app, void **ctx) +{ + + struct client_ctx *cl_ctx = NULL; + + if((NULL == ctx_cfg) || + (NULL == ctx_cfg->server_addr) || + (0 == ctx_cfg->port_number)) { + return -1; + } + if(ctx_cfg->config_opts & MQTT_CFG_MK_GROUP_CTX) { + if(grp_has_cbfn ^ (!!ctx_cbs)) { + return -1; + } + } + +// MUTEX_LOCKIN(); + if( xSemaphore != NULL ) { + // See if we can obtain the semaphore. If the semaphore is not available + // wait 10 ticks to see if it becomes free. + if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { + // We were able to obtain the semaphore and can now access the + // shared resource. + if(free_ctxs) { + cl_ctx = free_ctxs; + free_ctxs = cl_ctx->next; + cl_ctx->next = NULL; + } + + // We have finished accessing the shared resource. Release the + // semaphore. + xSemaphoreGive( xSemaphore ); + + } else { + // We could not obtain the semaphore and can therefore not access + // the shared resource safely. + Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); + } + } + +// MUTEX_UNLOCK(); + + if(cl_ctx) { + cl_ctx_setup(cl_ctx, ctx_cfg, ctx_cbs, app); + *ctx = (void*) cl_ctx; + return 0; + } + + return -1; +} + +int32_t mqtt_client_ctx_delete(void *ctx) +{ + + struct client_ctx *cl_ctx = (struct client_ctx*) ctx; + int32_t rv = -1; /* Not sure about deletion as yet */ + +// MUTEX_LOCKIN(); + if( xSemaphore != NULL ) { + // See if we can obtain the semaphore. If the semaphore is not available + // wait 10 ticks to see if it becomes free. + if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { + // We were able to obtain the semaphore and can now access the + // shared resource. + if((NULL == cl_ctx) || + (-1 != cl_ctx->net) || + (awaits_pkts(cl_ctx))) { + goto mqtt_client_ctx_delete_exit1; + } + rv = 0; /* OK to delete ctx */ + client_reset(CLIENT(cl_ctx)); + cl_ctx_freeup(cl_ctx); + + // We have finished accessing the shared resource. Release the + // semaphore. + //xSemaphoreGive( xSemaphore ); + } else { + // We could not obtain the semaphore and can therefore not access + // the shared resource safely. + Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); + } + } + +mqtt_client_ctx_delete_exit1: +// MUTEX_UNLOCK(); + xSemaphoreGive(xSemaphore); + + return rv; +} + +int32_t mqtt_client_lib_init(const struct mqtt_client_lib_cfg *lib_cfg) +{ + if((NULL == lib_cfg) || (NULL == lib_cfg->debug_printf)) + return -1; + + debug_printf = lib_cfg->debug_printf; /* Facilitate debug */ + + if(INIT_DONE_STATE == cl_lib_state) { + Uart_Write((uint8_t*)"C: Error trying to re-initialize \n\r"); + USR_INFO("C: Error trying to re-initialize \n\r"); + return -1; + } + + USR_INFO("Version: Client LIB %s, Common LIB %s.\n\r", + MQTT_CLIENT_VERSTR, MQTT_COMMON_VERSTR); + + client_desc_init(); + + cl_lib_state = INIT_DONE_STATE; + + loopb_portid = lib_cfg->loopback_port; + grp_has_cbfn = lib_cfg->grp_uses_cbfn; + + mutex = lib_cfg->mutex; + mutex_lockin = lib_cfg->mutex_lockin; + mutex_unlock = lib_cfg->mutex_unlock; + + aux_dbg_enbl = lib_cfg->aux_debug_en; + + return 0; +} + +int32_t mqtt_client_lib_exit() +{ + struct client_ctx *cl_ctx = free_ctxs; + int32_t count = 0; + + while(cl_ctx) { + cl_ctx = cl_ctx->next; + count++; + } + + if(MAX_NWCONN == count) { + cl_lib_state = WAIT_INIT_STATE; + free_ctxs = NULL; + return 0; + } + + return -1; +} + +}//namespace mbed_mqtt
diff -r 000000000000 -r 087b5655778d mqtt_client.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mqtt_client.h Sat Jun 06 13:28:41 2015 +0000 @@ -0,0 +1,945 @@ + +/****************************************************************************** +* +* Copyright (C) 2014 Texas Instruments Incorporated +* +* All rights reserved. Property of Texas Instruments Incorporated. +* Restricted rights to use, duplicate or disclose this code are +* granted through contract. +* +* The program may not be used without the written permission of +* Texas Instruments Incorporated or against the terms and conditions +* stipulated in the agreement under which this program has been supplied, +* and under no circumstances can it be used with non-TI connectivity device. +* +******************************************************************************/ + +/* + mqtt_client.h + + This module enumerates the public interfaces / API of the MQTT Client + Library. +*/ + + +#ifndef __MQTT_CLIENT_H__ +#define __MQTT_CLIENT_H__ + +/**@file mqtt_client.h + This C library provisions the interface / API(s) for the MQTT Client. + + This is a light-weight library to enable the services of the MQTT protocol + for one or more client applications (that would typically run on a resource + constrained system). The key consideration in the design of small footprint + library has been the abstraction of the low level details of the message + transactions with the server and yet, provide rudimentary API(s), such that, + the capabilities and features of the protocol are availalbe to be utilized + by existing and new applications in an un-restrictive manner. + + The library is targeted to conform to MQTT 3.1.1 specification. + + The MQTT Client library is a highly portable software and implies a very + limited set of dependencies on a platform. Importantly, these limited + dependencies are the commonly used features used in the embedded and the + networking world, and can be easily adapted to the target platforms. + + The services of the library are multi-task safe. Platform specific atomicity + constructs are used, through abstractions, by the library to maintain data + coherency and synchronization. In addition, the library can be configured to + support several in-flight messages. + + The client library supports multiple and simultaneous MQTT connections to one + or more servers. In this client LIB, the reference to an individual connection + in conjunction with the associated configuration parameters has been termed as + a 'context'. Therefore, the client library supports multiple 'contexts'. An + application that intends to make use of the client library must set-up or + create a 'context' prior to establishing a connection with the server. The + application can choose to destroy the 'context' after the connection with the + server has been terminated. The client LIB can only support a finite set of + 'contexts' and the number can be configured by using a compiler line option / + flag <b> -DCFG_CL_MQTT_CTXS </b>. + + Once, the 'context' is set-up, the application can send and receive the MQTT + packets to / from the server. Among several features supported by the client + LIB, the 'context' manages the keep-alive mechanism by automatically sending + PING request to the server, if there has been no other packet send to the + server with the keep-alive duration. + + @note Any future extensions & development must follow the following guidelines. + A new API or an extension to the existing API + a) must be rudimentary + b) must not imply a rule or policy (including a state machine) + b) must ensure simple design and implementation + +*/ + +#include "mqtt_common.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +namespace mbed_mqtt { + +/*------------------------------------------------------------------------------ + * MQTT Client Messaging Routines / Services + *------------------------------------------------------------------------------ + */ + +/** @defgroup client_api The Client Library API(s) + @{ +*/ + +#define MQTT_CLIENT_VERSTR "1.0.3" /**< Version of Client LIB */ + +void createMutex(void); + +static int32_t grp_net_setup_create(void); + +/** Provides a new MSG Identifier for a packet dispatch to server + + @return MSG / PKT Transaction identifier +*/ +uint16_t mqtt_client_new_msg_id(void); + +/** Ascertain whether connection / session with the server is active or not. + Prior to sending out any information any message to server, the application + can use this routine to check the status of the connection. If connection + does not exist, then client should first CONNECT to the broker. + + A connection to server could have been closed unsolicitedly either due to + keep alive time-out or due to error in RX / TX transactions. + + @note this API does not refer to network layer connection + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + + @return true if connection is active otherwise false. +*/ +bool mqtt_client_is_connected(void *ctx); + +/** Send the CONNECT message to the server (and don't wait for CONNACK). + This routine accomplishes multiple sequences. As a first step, it tries + to establish a network connection with the server. Then, it populates + an internaly allocated packet buffer with all the previously provided + payload data information, prepares the requisite headers and finally, + dispatches the constructed message to the server. + + Prior to invoking this service, the client application should provision the + intended payload contents of the CONNECT message by using the API(s) @ref + mqtt_client_ctx_info_register and @ref mqtt_client_ctx_will_register. And + information about the server of interest must be provided in the client LIB + 'context' creation (@ref mqtt_client_ctx_create). + + The client application must invoke an appropriate receive routine to know + about the corresponding response CONNACK from the server. The client LIB will + close the network connection to the server, if the server happens to refuse + the CONNECT request. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] clean_session asserted to delete references to previous session + at both server and client + @param[in] ka_secs Keep Alive Time + @return number of bytes sent or LIB defined errors (@ref lib_err_group) +*/ +int32_t mqtt_connect_msg_send(void *ctx, bool clean_session, uint16_t ka_secs); + +/** Send a PUBLISH message to the server (don't wait for PUBACK / PUBREC). + This routine creates a PUBLISH message in an internally allocated packet + buffer by embedding the 'topic' and 'data' contents, then prepares the + packet header and finally, dispatches the message to the server. + + After the packet has been sent to the server, if the associated QoS of the + dispatched packet is ether level 1 or 2, the client LIB 'context' will then + store the packet until the time, a corresponding PUB-ACK (for QoS1) or + PUB-REC (QoS2) message is received from the server. + + If the client LIB 'context' has been configured to assert 'clean session', + then the references to all the stored and unacknowledged PUBLISH messages + are dropped at the time of MQTT disconnection (or network disconnection). + Otherwise, these unacknowledged packets continue to be availalbe for the + next iteration of the MQTT connection. However, if the client application + asserts the 'clean session' parameter in the next iteration of the CONNECT + operation, then references to all the stored PUBLISH messages, if any, are + dropped. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] topic UTF8 based Topic Name for which data is being published. + @param[in] data_buf The binary data that is being published for the topic. + @param[in] data_len The length of the binary data. + @param[in] qos quality of service of the message + @param[in] retain should the server retain the message. + @return on success, a transaction message id otherwise, LIB defined errors + (@ref lib_err_group) +*/ +int32_t mqtt_client_pub_msg_send(void *ctx, + const struct utf8_string *topic, + const uint8_t *data_buf, uint32_t data_len, + enum mqtt_qos qos, bool retain); + +/** Dispatch application constructed PUBLISH message to the server. + Prior to sending the message to the server, this routine will prepare a fixed + header to account for the size of the contents and the flags that have been + indicated by the caller. + + After the packet has been sent to the server, if the associated QoS of the + dispatched packet is ether level 1 or 2, the client LIB 'context' will then + store the packet until the time, a corresponding PUB-ACK (for QoS1) or + PUB-REC (QoS2) message is received from the server. + + If the client LIB 'context' has been configured to assert 'clean session', + then the references to all the stored and unacknowledged PUBLISH messages + are dropped at the time of MQTT disconnection (or network disconnection). + Otherwise, these unacknowledged packets continue to be availalbe for the + next iteration of the MQTT connection. However, if the client application + asserts the 'clean session' parameter in the next iteration of the CONNECT + operation, then references to all the stored PUBLISH messages, if any, are + dropped. + + The caller must populate the payload information with topic and data before + invoking this service. + + This service facilitates direct writing of topic and (real-time) payload data + into the buffer, thereby, avoiding power consuming and wasteful intermediate + data copies. + + In case, the routine returns an error, the caller is responsbile for freeing + up or re-using the packet buffer. For all other cases, the client library + will manage the return of the packet buffer to the pool. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] mqp app created PUBLISH message without the fixed header + @param[in] qos QoS with which the message needs to send to server + @param[in] retain Asserted if the message is to be retained by server. + @return on success, the transaction Message ID, otherwise LIB defined errors + (@ref lib_err_group) +*/ +int32_t mqtt_client_pub_dispatch(void *ctx, struct mqtt_packet *mqp, + enum mqtt_qos qos, bool retain); + +/** Send a SUBSCRIBE message to the server (and don't wait for SUBACK). + This routine creates a SUBSCRIBE message in an internally allocated packet + buffer by embedding the 'qos_topics', then prepares the message header and + finally, dispatches the packet to the server. + + After the packet has been dispatched to the server, the library will store + the packet until the time, a corresponding SUB-ACK has been received from + the server. This mechanism enables the client LIB 'context' to trace the + sequence of the message-ID and / or resend the SUB packets to the server. + + The client LIB 'context', if configured to operate in the MQTT 3.1.1 mode + will drop or remove the un-acknowledged SUB messages at the time of the + termination of the network connection. + + In the MQTT 3.1 mode, the client LIB 'context' will remove the unacknowledged + SUB messages at the time of the termination of the network connection, if the + 'clean session' has been asserted. In case, the 'clean session' has not been + asserted, the stored SUB messages will continue to be available for the next + iteration of the MQTT connection. However, if the client application asserts + the 'clean session' parameter in the next iteration of the CONNECT operation, + then references to all the stored SUBSCRIBE messages, if any, are dropped. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] qos_topics an array of topic along-with its qos + @param[in] count the number of elements in the array + @return on success, the transaction Message ID, otherwise Lib defined errors + (@ref lib_err_group) +*/ +int32_t mqtt_sub_msg_send(void *ctx, const struct utf8_strqos *qos_topics, uint32_t count); + +/** Dispatch application constructed SUSBSCRIBE message to the server. + Prior to sending the message to the server, this routine will prepare a fixed + header to account for the size of the size of the contents. + + After the packet has been dispatched to the server, the library will store + the packet until the time, a corresponding SUB-ACK has been received from + the server. This mechanism enables the client LIB 'context' to trace the + sequence of the message-ID and / or resend the SUB packets to the server. + + The client LIB 'context', if configured to operate in the MQTT 3.1.1 mode + will drop or remove the un-acknowledged SUB messages at the time of the + termination of the network connection. + + In the MQTT 3.1 mode, the client LIB 'context' will remove the unacknowledged + SUB messages at the time of the termination of the network connection, if the + 'clean session' has been asserted. In case, the 'clean session' has not been + asserted, the stored SUB messages will continue to be available for the next + iteration of the MQTT connection. However, if the client application asserts + the 'clean session' parameter in the next iteration of the CONNECT operation, + then references to all the stored SUBSCRIBE messages, if any, are dropped. + + The caller must populate the payload information of topic along with qos + before invoking this service. + + This service facilitates direct writing of topic and (real-time) payload data + into the buffer, thereby, avoiding power consuming and wasteful intermediate + data copies. + + In case, the routine returns an error, the caller is responsbile for freeing + up or re-using the packet buffer. For all other cases, the client library + will manage the return of the packet buffer to the pool. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] mqp app created SUBSCRIBE message without the fixed header. + @return on success, the transaction Message ID, otherwise Lib defined errors + (@ref lib_err_group) +*/ +int32_t mqtt_sub_dispatch(void *ctx, struct mqtt_packet *mqp); + +/** Send an UNSUBSCRIBE message to the server (and don't wait for UNSUBACK). + This routine creates an UNSUBSCRIBE message in an internally allocated packet + buffer by embedding the 'topics', then prepares the message header and + finally, dispatches the packet to the server. + + After the packet has been dispatched to the server, the library will store + the packet until the time, a corresponding UNSUB-ACK has been received from + the server. This mechanism enables the client LIB 'context' to trace the + sequence of the message-ID and / or resend the UNSUB packets to the server. + + The client LIB 'context', if configured to operate in the MQTT 3.1.1 mode + will drop or remove the un-acknowledged SUB messages at the time of the + termination of the network connection. + + In the MQTT 3.1 mode, the client LIB 'context' will remove the unacknowledged + UNSUB messages at the time of the termination of the network connection, if + the 'clean session' has been asserted. In case, the 'clean session' has not + been asserted, the stored UNSUB messages will continue to be available for + the next iteration of the MQTT connection. However, if the client application + asserts the 'clean session' parameter in the next iteration of the CONNECT + operation, then references to all the stored UNSUBSCRIBE messages, if any, + are dropped. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] topics an array of topic to unsubscribe + @param[in] count the number of elements in the array + @return on success, the transaction Message ID, otherwise Lib defined errors + (@ref lib_err_group) +*/ +int32_t mqtt_unsub_msg_send(void *ctx, const struct utf8_string *topics, uint32_t count); + +/** Dispatch application constructed UNSUSBSCRIBE message to the server. + Prior to sending the message to the server, this routine will prepare a fixed + header to account for the size of the size of the contents. + + After the packet has been dispatched to the server, the library will store + the packet until the time, a corresponding UNSUB-ACK has been received from + the server. This mechanism enables the client LIB 'context' to trace the + sequence of the message-ID and / or resend the UNSUB packets to the server. + + The client LIB 'context', if configured to operate in the MQTT 3.1.1 mode + will drop or remove the un-acknowledged SUB messages at the time of the + termination of the network connection. + + In the MQTT 3.1 mode, the client LIB 'context' will remove the unacknowledged + UNSUB messages at the time of the termination of the network connection, if + the 'clean session' has been asserted. In case, the 'clean session' has not + been asserted, the stored UNSUB messages will continue to be available for + the next iteration of the MQTT connection. However, if the client application + asserts the 'clean session' parameter in the next iteration of the CONNECT + operation, then references to all the stored UNSUBSCRIBE messages, if any, + are dropped. + + The caller must populate the payload information of topics before invoking + this service. + + This service facilitates direct writing of topic and (real-time) payload data + into the buffer, thereby, avoiding power consuming and wasteful intermediate + data copies. + + In case, the routine returns an error, the caller is responsbile for freeing + up or re-using the packet buffer. For all other cases, the client library + will manage the return of the packet buffer to the pool. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] Packet Buffer that holds UNSUBSCRIBE message without a fixed header + @return on success, the transaction Message ID, otherwise LIB defined errors + (@ref lib_err_group) +*/ +int32_t mqtt_unsub_dispatch(void *ctx, struct mqtt_packet *mqp); + +/** Send a PINGREQ message to the server (and don't wait for PINGRSP). + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @return number of bytes sent or Lib define errors (@ref lib_err_group) +*/ + +int32_t mqtt_pingreq_send(void *ctx); + +/** Send a DISCONNECT message to the server. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @return number of bytes sent or Lib define errors (@ref lib_err_group) +*/ +int32_t mqtt_disconn_send(void *ctx); + + +/** Send remaining data or contents of the scheduled message to the server. + This routine tries to send the remaining data in an active transfer of a + message to the server. This service is valid, only if the network layer of + the platform is not able to send out the entire message in one TCP packet + and has to "back-off" or "give up the control" before it can schedule or + dispatch the next packet. In such a scenario, the network layer sends the + first part (segment) of the scheduled message in the mqtt_xxx_send() API and + the subsequent parts or the segments are sent using this routine. + + This routine is not applicable to the platforms or the scenarios, where the + implementation of the platform can segment the MQTT message in a manner to + schedule consercutive or back-to-back blocking socket transactions. + Specifically, this API must be used by an application, only if the network + layer can indicate in an asynchronous manner, its readiness to send the next + packet to the server. And the mechanism to indicate readiness of the network + layer for the next send transaction is out of band and out of scope for the + Client LIB and depends on the platform. + + A platform that resorts to partial send of a message and has to back-off from + transmission implies the following the considerations on to the application. + (a) The next segment / part of the currently active MQTT packet can be sent + or scheduled only after receiving the indication, to do so, from the network + layer. + (b) The next or new MQTT message (or its first segment) can be scheduled for + transmission only after receiving the indication for completion of handling + of the last segment of currently active message. + + @note The application developer should refer to the platform specific network + implementation for details. + + The routine returns the number of remaining bytes in the message to be sent. + However, as described earlier, the application is expected to wait for an + indication about the readiness of the network layer prior to sending or + scheduling another segment, if so available, to the server. Now, the new + segment can be a next part of the currently active message or it can be the + first segment of a new message. A return value of zero means that there is + no more data left in the scheduled message to be sent to the server and the + application should wait for an appropriate event to indicate the transmission + of the last segment. + + In case of an error, the transfer of the remaining segments or parts of the + scheduled message is aborted. Depending on the configuration of the 'clean + session' in-conjunction with the revision of the MQTT protocol, the active + message would be stored for re-transmission, MQTT connection is established + again. To store a message for re-transmission, at least one segment of the + message must have been successfully sent to the server. + + @note This API must be used by the application only if the platform has the + capability to indicate the completion of the sending of an active segment. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @return the number of bytes remaining to be sent in the message. Otherwise, + LIB defined errors (@ref lib_err_group) +*/ +int32_t mqtt_client_send_progress(void *ctx); + +/** Block on the 'context' to receive a message type with-in specified wait time. + This service is valid only for the configuration, where the application has + not provided the callbacks to the client LIB 'context'. The caller must + provide a packet buffer of adequate size to hold the expected message from + the server. + + The wait time implies the maximum intermediate duration between the reception + of two successive messages from the server. If no message is received before + the expiry of the wait time, the routine returns. However, the routine would + continue to block, in case, messages are being received within the successive + period of wait time and these messages are not the one that client is waiting + for. + + For platforms that can receive a MQTT message over multiple TCP packets or in + segments, it is neccessary for the application to provide the same packet + packet buffer 'mqp' across all iterations of this routine that returns with a + value of MQP_ERR_TIMEOUT. Such an arrangement enables the implementation to + iteratively build from consecutive multiple RX network packets / segements, a + MQTT message into the packet buffer 'mqp. The application can always choose + to assign a new 'mqp' once, a complete MQTT message has been received. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] msg_type message type to receive. A value of 0 would imply that + caller is ready to receive the next message, whatsoever, from the server. + @param[out] mqp packet buffer to hold the message received from the server. + @param[in] wait_secs maximum Time to wait for a message from the server. + @return On success, the number of bytes received for 'msg_type' from server, + otherwise LIB defined error values (@ref lib_err_group) +*/ +int32_t mqtt_client_ctx_await_msg(void *ctx, uint8_t msg_type, struct mqtt_packet *mqp, + uint32_t wait_secs); + +/** Helper function to receive any message from the server. + Refer to mqtt_client_ctx_await_msg for the details. + @see mqtt_client_ctx_await_msg +*/ +static inline +int32_t mqtt_client_ctx_recv(void *ctx, struct mqtt_packet *mqp, uint32_t wait_secs) +{ + /* Receive next and any MQTT Message from the broker */ + return mqtt_client_ctx_await_msg(ctx, 0x00, mqp, wait_secs); +} + +/** Run the context for the specificed wait time. + This service is valid only for the configuration, where the application has + populated the callbacks that can be invoked by the client LIB 'context'. + + This routine yields the control back to the application after the duration + of the wait time. Such an arrangement enable the application to make overall + progress to meet its intended functionality. + + The wait time implies the maximum intermediate duration between the reception + of two successive messages from the server. If no message is received before + the expiry of the wait time, the routine returns. However, the routine would + continue to block, in case, messages are being received within the successive + period of the wait time. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] wait_secs maximum time to wait for a message from the server + + @return MQP_ERR_NOTCONN if MQTT connection is closed by the application, + MQP_ERR_TIMEOUT if there was no MQTT transaction in the interval of wait time + and other values (@ref lib_err_group) +*/ +int32_t mqtt_client_ctx_run(void *ctx, uint32_t wait_secs); + +/** Block to receive any message for the grouped contexts within specified time. + This service is valid only for the set-up, where the applicatiion has not + configured the grouped contexts in the callback mode. The caller must provide + a packet buffer of adequate size to hold the expected message from the server. + + The wait time implies the maximum intermediate duration between the reception + of two successive messages from the server. If no message is received before + the expiry of the wait time, the routine returns. However, the routine would + continue to block, in case, messages are being received within the successive + period of wait time. + + In this revision of the LIB, the underlying implementation will be provide a + valid value in the 'app' field for the all returned values, including errors, + other than the cases of 'MQP_ERR_TIMEOUT' and 'MQP_ERR_LIBQUIT'. The value in + 'app', when valid refers to a previously configured handle to an application + and idenitifies the 'context' for which the routine has been returned. + + @param[out] mqp packet buffer to hold the message received from the server. + @param[in] wait_secs Maximum Time to wait for a message from the server. + @param[out] app place holder to indicate application handle for the packet. + @return On success, the number of bytes received for 'msg_type' from server, + otherwise LIB defined error values (@ref lib_err_group) + + @note if the value of MQP_ERR_LIBQUIT is returned, then system must be + restarted. +*/ +int32_t mqtt_client_await_msg(struct mqtt_packet *mqp, uint32_t wait_secs, void **app); + +/** Run the LIB for the specified wait time. + This service is valid only for the set-up of grouped contexts, where the + application has populated the callbacks that can be invoked by the LIB. + + This routine yields the control back to the application after the specified + duration of wait time. Such an arrangement enable the application to + make overall progress to meet it intended functionality. + + The wait time implies the maximum intermediate duration between the reception + of two successive messages from the server. If no message is received before + the expiry of the wait time, the routine returns. However, the routine would + continue to block, in case, messages are being received within the successive + period of wait time. + + @param[in] wait_secs maximum time to wait for a message from the server + + @return on connection close by client app, number of bytes received for the + last msg from broker, otherwise LIB defined error values. + + @note if the value of MQP_ERR_LIBQUIT is returned, then system must be + restarted. +*/ +int32_t mqtt_client_run(uint32_t wait_secs); + +/*------------------------------------------------------------------------------ + * MQTT Client Library: Packet Buffer Pool and its management + *------------------------------------------------------------------------------ + */ + +/** Allocates a free MQTT Packet Buffer. + The pool that will be used by the library to allocate a free MQP buffer + must be configured (i.e. registered) a-priori by the app. + + The parameter 'offset' is used to specify the number of bytes that are + reserved for the header in the buffer + + @param[in] msg_type Message Type for which MQP buffer is being assigned. + @param[in] offset Number of bytes to be reserved for MQTT headers. + @return A NULL on error, otherwise a reference to a valid packet holder. + + @see mqtt_client_register_buffers +*/ +struct mqtt_packet *mqp_client_alloc(uint8_t msg_type, uint8_t offset); + +/** Helper function to allocate a MQTT Packet Buffer for a message to be + dispatched to server. + + @see to mqp_client_alloc( ) for details. +*/ +static inline struct mqtt_packet *mqp_client_send_alloc(uint8_t msg_type) +{ + return mqp_client_alloc(msg_type, MAX_FH_LEN); +} + +/** Helper function to allocate a MQTT Packet Buffer for a message to be + received from the server. + + @see to mqp_client_alloc( ) for details. +*/ +static inline struct mqtt_packet *mqp_client_recv_alloc(uint8_t msg_type) +{ + return mqp_client_alloc(msg_type, 0); +} + +/** Create a pool of MQTT Packet Buffers for the client library. + This routine creates a pool of free MQTT Packet Buffers by attaching a buffer + (buf) to a packet holder (mqp). The count of mqp elements and buf elements in + the routine are same. And the size of the buffer in constant across all the + elements. + + The MQTT Packet Buffer pool should support (a) certain number of in-flight and + stored packets that await ACK(s) from the server (b) certain number of packets + from server whose processing would be deferred by the client app (to another + context) (c) a packet to create a CONNECT message to re-establish transaction + with the server. + + A meaningful size of the pool is very much application specific and depends + on the target functionality. For example, an application that intends to have + only one in-flight message to the server would need atmost three MQP buffers + (1 for TX (for Qos1 or 2 store), 1 for RX and 1 for CONNECT message). If the + application sends only QoS0 messages to the server, then the number of MQP + buffers would reduce to two (i.e. 1 Tx to support CONNECT / PUB out and 1 RX) + + @param[in] num_mqp Number or count of elements in mqp_vec and buf_vec. + @param[in] mqp_vec An array of MQTT Packet Holder without a buffer. + @param[in] buf_len The size or length of the buffer element in the 'buf_vec' + @param[in] buf_vec An array of buffers. + @retun 0 on success otherwise -1 on error. + + @note The parameters mqp_vec and buf_vec should be peristent entities. + + @see mqtt_client_await_msg + @see mqtt_client_run +*/ +int32_t mqtt_client_buffers_register(uint32_t num_mqp, struct mqtt_packet *mqp_vec, + uint32_t buf_len, uint8_t *buf_vec); + +/*------------------------------------------------------------------------------ + * MQTT Client Library: Register application, platform information and services. + *------------------------------------------------------------------------------ + */ + +/** Register application info and its credentials with the client library. + This routine registers information for all the specificed parameters, + therefore, an upate to single element would imply re-specification of + the other paramters, as well. + + @note Contents embedded in the parameters is not copied by the routine, + and instead a reference to the listed constructs is retained. Therefore, + the app must enable the parameter contents for persistency. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] client_id MQTT UTF8 identifier of the client. If set to NULL, + then the client will be treated as zero length entity. + @param[in] user_name MQTT UTF8 user name for the client. If not used, + set it to NULL. If used, then it can't be of zero length. + @param[in] pass_word MQTT UTF8 pass word for the client. If not used, set + it to NULL, If used, then it can't be of zero length. + @return 0 on success otherwise -1 + + User name without a pass word is a valid configuration. A pass word won't + be processed if it is not associated with a valid user name. +*/ +int32_t mqtt_client_ctx_info_register(void *ctx, + const struct utf8_string *client_id, + const struct utf8_string *user_name, + const struct utf8_string *pass_word); + +/** Register WILL information of the client application. + This routine registers information for all the specificed parameters, + therefore, an update to single element would imply re-specification + of the other paramters, as well. + + @note Contents embedded in the parameters is not copied by the routine, + and instead a reference to the listed constructs is retained. Therefore, + the app must enable the parameter contents for persistency. + + @param[in] ctx handle to the underlying network context in the LIB + @see mqtt_client_ctx_create + @param[in] will_top UTF8 WILL Topic on which WILL message is to be published. + @param[in] will_msg UTF8 WILL message. + @param[in] will_qos QOS for the WILL message + @param[in] retain asserted to indicate that published WILL must be retained + @return 0 on success otherwise -1. + + Both will_top and will_msg should be either present or should be NULL. + will_qos and retain are relevant only for a valid Topic and Message combo. +*/ +int32_t mqtt_client_ctx_will_register(void *ctx, + const struct utf8_string *will_top, + const struct utf8_string *will_msg, + enum mqtt_qos will_qos, bool retain); + +/** Abstraction for the device specific network services. + Network services for communication with the server + + @param[in] net refers to network services supported by the platform + @return on success, 0, otherwise -1 + + @ref net_ops_group + @note all entries in net must be supported by the platform. +*/ +int32_t mqtt_client_net_svc_register(const struct device_net_services *net); + +/** Helper functions & macros to derive 16 bit CONNACK Return Code from broker. + */ +#define VHB_CONNACK_RC(vh_buf) (vh_buf[1]) /**< CONNACK VH:: Return Code */ +#define MQP_CONNACK_RC(mqp) (mqp->buffer[3])/**< CONNACK MQP:: Return Code */ + +#define VHB_CONNACK_SP(vh_buf) (vh_buf[0] & 0x1)/**< CONNACK VH:: Session Bit */ +#define MQP_CONNACK_SP(mqp) (mqp->buffer[2] & 0x1) /**< CONNACK MQP:: \ + Session Bit */ + +#define VHB_CONNACK_VH16(vh_buf)((vh_buf[0] << 8) | vh_buf[1]) +#define MQP_CONNACK_VH16(mqp) ((mqp->buffer[2] << 8) | mqp->buffer[3]) + +/** Callbacks to be invoked by MQTT Client library onto Client application */ +struct mqtt_client_ctx_cbs { + + /** Provides a PUBLISH message from server to the client application. + The application can utilize the associated set of helper macros to + get references to the topic and the data information contained in + the message. @ref rxpub_help_group + + Depending upon the QoS level of the message, the MQTT client library + shall dispatch the correponding acknowlegement (PUBACK or PUBREC) to + the server, thereby, relieving application of this support. + + If the application completes the processing of the packet within the + implementation of this callback routine, then a value of 'true' must + be returned to the client LIB 'context'. The library, in this case, + does not handover the packet to application and instead, frees it up + on return from this routine. + + If the application intends to defer the processing of the PUBLISH + message to a different execution task, then it must takeover the + owernship of the packet by returning a value of 'false' to the client + LIB 'context. In this arrangement, the client LIB 'context' hands + over the packet to the application. The responsibility of storing, + managing and eventually freeing up the packet back to the pool, now, + lies with the app. + + @param[in] app application to which this PUBLISH message is targeted + @see mqtt_client_ctx_create + @param[in] dup asserted to indicate a DUPLICATE PUBLISH + @param[in] qos quality of Service of the PUBLISH message + @param[in] retain Asserted to indicate message at new subscription + @param[in] mqp Packet Buffer that holds the PUBLISH message + + @return true to indicate that processing of the packet has been + completed and it can freed-up and returned back to the pool by + the library. Otherwise, false. + */ + bool (*publish_rx)(void *app, + bool dup, enum mqtt_qos qos, bool retain, + struct mqtt_packet *mqp); + + /** Notifies the client application about an ACK or a response from the + server. Following are the messages that are notified by the client + LIB to the application. + + CONNACK, PINGRSP, PUBACK, PUBCOMP, SUBACK, UNSUBACK + + @param[in] app application to which this ACK message is targeted + @see mqtt_client_ctx_create + @param[in] msg_type Type of the MQTT messsage + @param[in] msg_id transaction identity of the message + @param[in] buf refers to contents of message and depends on msg_type + @param[in] len length of the buf + @return none + + @note The size of the buf parameter i.e len is non-zero for the + SUBACK and CONNACK messages. For SUBACK the buf carries an array of + QOS responses provided by the server. For CONNACK, the buf carries + variable header contents. Helper macro VHB_CONNACK_RC( ) and + VHB_CONNACK_SP( ) can be used to access contents provided by the + server. For all other messages, the value of len parameter is zero. + + @note The parameter msg_id is not relevant for the messages CONNACK + and PINGRSP and is set to zero. + */ + void (*ack_notify)(void *app, uint8_t msg_type, uint16_t msg_id, uint8_t *buf, uint32_t len); + + /** Notifies the client application about the termination of connection + with the server. After servicing this callback, the application can + destroy associated context if it no longer required + + @param[in] app application whose connection got terminated + @see mqtt_client_ctx_create + @param[in] cause reason that created disconnection + (@ref lib_err_group) + */ + void (*disconn_cb)(void *app, int32_t cause); +}; + +struct mqtt_client_ctx_cfg { + + /** @defgroup mqtt_ctx_cfg_opt_grp Options for application to config CTX + @{ + */ +#define MQTT_CFG_PROTOCOL_V31 0x0001 /**< Assert for ver3.1, default is 3.1.1 */ +#define MQTT_CFG_APP_HAS_RTSK 0x0002 /**< Assert if APP has dedicated RX Task */ +#define MQTT_CFG_MK_GROUP_CTX 0x0004 /**< Assert if task manages > 1 contexts */ + /** @} */ + + uint16_t config_opts; /**< Context config Opt, @ref mqtt_ctx_cfg_opt_grp */ + + /** @defgroup mqtt_netconn_opt_grp Options for App to configure network + @{ + */ +#define MQTT_NETCONN_OPT_IP6 DEV_NETCONN_OPT_IP6 /**<@ref dev_netconn_opt_grp */ +#define MQTT_NETCONN_OPT_URL DEV_NETCONN_OPT_URL /**<@ref dev_netconn_opt_grp */ +#define MQTT_NETCONN_OPT_SEC DEV_NETCONN_OPT_SEC /**<@ref dev_netconn_opt_grp */ + /** @} */ + + uint32_t nwconn_opts; /**< Network Options, @ref mqtt_netconn_opt_grp */ + + char *server_addr; /**< Reference to '\0' terminated address string */ + uint16_t port_number; /**< Network Listening Port number of the server */ + struct secure_conn *nw_security; /**< Refer to @ref mqtt_netsec_grp */ +}; + +/** Create a Network Connection Context. + This routine sets-up the parameters that are required to operate and manage + the network connection with a MQTT server / broker. As part of the creation + of a context, the implementation also records the handle, if provided, by + the application. In addition, the invoker of the routine must facilitate a + place holder to enable the client LIB to provision the reference to the + 'context', so created. + + Specifically, this routine associates or ties-up, in an one-to-one manner, + the caller provided handle 'app' and the client LIB provisioned handle 'ctx'. + The parameter 'app' is returned by the client LIB in certain other routines + to indicate the underlying 'context' with which network transaction or event + is associated. Similarly, the caller must specify the context handle 'ctx' + for which the services are being invoked. + + A user or a task prior to utilizing the services of the library to schedule + MQTT transactions must create a 'context'. A client LIB 'context' can be + operated in two modes: (a) sync-wait or explicit receive mode and (b) the + callback mode. Provisioning or absence of the callback parameter in this + routine defines the mode of operation of the 'context'. + + Explicit receive mode is analogous to the paradigm of the socket programming + in which an application utilize the recv() function call. It is anticipated + that applications which would make use of limited set of MQTT messages may + find this mode of operation useful. Applications which intend to operate the + 'context' in this mode must not provision any callbacks. + + On the other hand, certain applications, may prefer an asynchronous mode of + operation and would want the client LIB 'context' to raise callbacks into + the application, as and when, packets arrive from the server. And such + applications must provide the callback routines. + + @param[in] ctx_cfg configuration information for the Network Context. + @param[in] ctx_cbs callback routines. Must be set to NULL, if the application + intends to operate the context in the sync-wait / explicit receive mode. + @param[in] app handle to application. Returned by LIB in other routines + to refer to the underlying context. + @param[out] ctx reference to the context created and is provisioned by the + implementation. (Valid only if routine returns a success) + + @return 0 on success otherwise -1. +*/ +int32_t mqtt_client_ctx_create(const struct mqtt_client_ctx_cfg *ctx_cfg, + const struct mqtt_client_ctx_cbs *ctx_cbs, + void *app, void **ctx); + +/** Delete a Network Connection Context. + This routines destroys the previously created network 'context' and releases + resources that would be assigned for maintaining the information about the + 'context'. + + A user or a task prior to deleting the 'context' must ensure that there is no + active MQTT connection on this context. + + @param[in] ctx handle to network context to be deleted. The context must + have been previously created. + + @return 0 on success otherwise -1 +*/ + +int32_t mqtt_client_ctx_delete(void *ctx); + +/** Contruct / Data to initialize MQTT Client Library */ +struct mqtt_client_lib_cfg { + + /** If an application has more than one contexts (i.e. grouped contexts) + to manage in a single task, then a non-zero value must be provided. + Otherwise, this parameter must be set to zero. + */ + uint16_t loopback_port; + bool grp_uses_cbfn; /**< Assert if grouped contexts use call-backs */ + + /** For a multi-task environment, provide a handle to platform mutex */ + void *mutex; + void (*mutex_lockin)(void *mutex); /**< Take platform mutex function */ + void (*mutex_unlock)(void *mutex); /**< Give platform mutex function */ + + int32_t (*debug_printf)(const char *format, ...); /**< Debug, mandatory */ + bool aux_debug_en; /**< Assert to indicate additional debug info */ +}; + +/** Initialize the MQTT client library. + This routine initializes all the common constructs that are required to + manage the multiple network connetions. The client LIB must be initialized + prior to invoking of any other routine or service. + + @note This routine must be invoked only once in an run of the system. + + Depending upon the deployment needs, this routine can be invoked either as + part of the platform initialization sequence or as part of the application. + Deployments that have more than one application utilizing the services of + the client LIB should try to invoke the routine from the initialization + sequence of the platform. + + In addition, if an application has to manage more than one network + connections (i.e. in other words, if the application has to handle + a group of connections), then certain configuration must be set in + the LIB @see struct mqtt_client_lib_cfg + + @note There must be only one group of network connetions in the system. + + @param[in] cfg Configuration information for the MQTT client Library. + + @return 0 on success otherwise -1. +*/ + +int32_t mqtt_client_lib_init(const struct mqtt_client_lib_cfg *cfg); + +/** Exit the MQTT client library. + @return 0 on success otherwise -1. +*/ +int32_t mqtt_client_lib_exit(void); + +/** @} */ /* End group client_api */ + +}//namespace mbed_mqtt + +#ifdef __cplusplus +} +#endif + +#endif +