Part of TI's mqtt

Dependents:   mqtt_V1 cc3100_Test_mqtt_CM3

Committer:
dflet
Date:
Sat Jun 06 13:28:41 2015 +0000
Revision:
0:087b5655778d
Part of mqtt_V1

Who changed what in which revision?

User | Revision | Line number | New contents of line
dflet 0:087b5655778d 1 /******************************************************************************
dflet 0:087b5655778d 2 *
dflet 0:087b5655778d 3 * Copyright (C) 2014 Texas Instruments Incorporated
dflet 0:087b5655778d 4 *
dflet 0:087b5655778d 5 * All rights reserved. Property of Texas Instruments Incorporated.
dflet 0:087b5655778d 6 * Restricted rights to use, duplicate or disclose this code are
dflet 0:087b5655778d 7 * granted through contract.
dflet 0:087b5655778d 8 *
dflet 0:087b5655778d 9 * The program may not be used without the written permission of
dflet 0:087b5655778d 10 * Texas Instruments Incorporated or against the terms and conditions
dflet 0:087b5655778d 11 * stipulated in the agreement under which this program has been supplied,
dflet 0:087b5655778d 12 * and under no circumstances can it be used with non-TI connectivity device.
dflet 0:087b5655778d 13 *
dflet 0:087b5655778d 14 ******************************************************************************/
dflet 0:087b5655778d 15
dflet 0:087b5655778d 16 /*
dflet 0:087b5655778d 17 mqtt_client.c
dflet 0:087b5655778d 18
dflet 0:087b5655778d 19 The module provides implementation to the public interface of the MQTT
dflet 0:087b5655778d 20 Client Library.
dflet 0:087b5655778d 21 */
dflet 0:087b5655778d 22 #include "FreeRTOS.h"
dflet 0:087b5655778d 23 #include "mqtt_client.h"
dflet 0:087b5655778d 24 #include "semphr.h"
dflet 0:087b5655778d 25
dflet 0:087b5655778d 26 #include "cli_uart.h"
dflet 0:087b5655778d 27
/* Capacity, in bytes, of the shared print buffer declared below. */
#define PRINT_BUF_LEN 128

/* Scratch buffer used to format text before handing it to Uart_Write().
   Only declared here; the definition lives outside this file. */
extern int8_t print_buf[PRINT_BUF_LEN];
dflet 0:087b5655778d 30
dflet 0:087b5655778d 31 namespace mbed_mqtt
dflet 0:087b5655778d 32 {
dflet 0:087b5655778d 33
dflet 0:087b5655778d 34 //void createMutex(void);
dflet 0:087b5655778d 35
/* Optional locking hooks: an opaque lock object plus the two routines that
   acquire and release it. All three stay NULL until something assigns them. */
static void  *mutex                = NULL;
static void (*mutex_lockin)(void*) = NULL;
static void (*mutex_unlock)(void*) = NULL;

/* Invoke the hook only when one is present. The expansion deliberately keeps
   its trailing ';' so existing call sites behave exactly as before. */
#define MUTEX_LOCKIN() if(NULL != mutex_lockin) mutex_lockin(mutex);
#define MUTEX_UNLOCK() if(NULL != mutex_unlock) mutex_unlock(mutex);
dflet 0:087b5655778d 42
/* Run-time switch for the auxiliary (verbose) debug trace. */
static bool aux_dbg_enbl = true;

/* printf-style output routine used for all library traces. It is NULL until
   assigned, so it must be set before any USR_INFO() trace is emitted. */
int32_t (*debug_printf)(const char *fmt, ...) = NULL;

/* User-level trace: a direct alias of debug_printf (no NULL guard). */
#define USR_INFO debug_printf

/* Verbose trace: emitted only when the auxiliary switch is on AND an output
   routine has been installed. Wrapped in do/while(0) so the macro behaves as
   one statement and cannot capture a following 'else'. */
#define DBG_INFO(I, ...)                                \
    do {                                                \
        if(aux_dbg_enbl && (NULL != debug_printf))      \
            debug_printf(I, ##__VA_ARGS__);             \
    } while(0)

/* Network services table used for all socket operations in this file. */
static const struct device_net_services *net_ops = NULL;
dflet 0:087b5655778d 50
/* Last message identifier handed out. Starting at 0xFFFF makes the first
   identifier 0x0001 once the 16-bit counter wraps. */
static uint16_t msg_id = 0xFFFF;

/* Advance the counter by two (with 16-bit wrap-around) and return the new
   value. */
static inline uint16_t assign_new_msg_id()
{
    msg_id = (uint16_t)(msg_id + 2);
    return msg_id;
}
dflet 0:087b5655778d 56
dflet 0:087b5655778d 57 SemaphoreHandle_t xSemaphore = NULL;
dflet 0:087b5655778d 58 void createMutex()
dflet 0:087b5655778d 59 {
dflet 0:087b5655778d 60
dflet 0:087b5655778d 61 xSemaphore = xSemaphoreCreateMutex();
dflet 0:087b5655778d 62 }
dflet 0:087b5655778d 63
dflet 0:087b5655778d 64 /*-----------------------------------------------------------------------------
dflet 0:087b5655778d 65 * Data structure for managing the client and its nuances
dflet 0:087b5655778d 66 *---------------------------------------------------------------------------*/
dflet 0:087b5655778d 67
/* Capacity of the in-place buffer used for messages sent without an MQP. */
#define VH_MSG_LEN 4

/* Book-keeping for a transmission that may have to be pushed to the network
   in several pieces (networks with restrictive TCP segments). For such
   deployments there can be only one in-flight message, hence one record
   per client.
*/
struct tx_part_pkt {

    /* MQP being transmitted, or NULL when no MQP backs this transmission */
    struct mqtt_packet *tx_mqp;

    /* Source of the bytes: which member is valid depends on 'tx_mqp' */
    union {
        uint8_t        vh_msg[VH_MSG_LEN]; /* Used when tx_mqp is NULL   */
        const uint8_t *buffer;             /* Used when tx_mqp is set    */
    };

    uint32_t length;  /* Total number of bytes to send          */
    uint32_t offset;  /* Index of the next byte still to send   */
};
dflet 0:087b5655778d 87
dflet 0:087b5655778d 88 static bool tx_part_setup(struct tx_part_pkt *tx_part, const uint8_t *buffer,
dflet 0:087b5655778d 89 uint32_t length, struct mqtt_packet *tx_mqp)
dflet 0:087b5655778d 90 {
dflet 0:087b5655778d 91
dflet 0:087b5655778d 92 if(tx_mqp) {
dflet 0:087b5655778d 93 tx_part->buffer = buffer;
dflet 0:087b5655778d 94 } else {
dflet 0:087b5655778d 95 if(VH_MSG_LEN < length)
dflet 0:087b5655778d 96 return false;
dflet 0:087b5655778d 97
dflet 0:087b5655778d 98 buf_wr_nbytes(tx_part->vh_msg, buffer, length);
dflet 0:087b5655778d 99 }
dflet 0:087b5655778d 100
dflet 0:087b5655778d 101 tx_part->length = length;
dflet 0:087b5655778d 102 tx_part->tx_mqp = tx_mqp;
dflet 0:087b5655778d 103
dflet 0:087b5655778d 104 return true;
dflet 0:087b5655778d 105 }
dflet 0:087b5655778d 106
dflet 0:087b5655778d 107 static void tx_part_reset(struct tx_part_pkt *tx_part)
dflet 0:087b5655778d 108 {
dflet 0:087b5655778d 109
dflet 0:087b5655778d 110 struct mqtt_packet *tx_mqp = tx_part->tx_mqp;
dflet 0:087b5655778d 111 if(tx_mqp) {
dflet 0:087b5655778d 112 mqp_free(tx_mqp);
dflet 0:087b5655778d 113 }
dflet 0:087b5655778d 114 tx_part->vh_msg[0] = 0x00;
dflet 0:087b5655778d 115 tx_part->tx_mqp = NULL;
dflet 0:087b5655778d 116 tx_part->length = 0;
dflet 0:087b5655778d 117 tx_part->offset = 0;
dflet 0:087b5655778d 118
dflet 0:087b5655778d 119 return;
dflet 0:087b5655778d 120 }
dflet 0:087b5655778d 121
dflet 0:087b5655778d 122 static const uint8_t *tx_part_buf_p(struct tx_part_pkt *tx_part)
dflet 0:087b5655778d 123 {
dflet 0:087b5655778d 124 struct mqtt_packet *tx_mqp = tx_part->tx_mqp;
dflet 0:087b5655778d 125 uint32_t offset = tx_part->offset;
dflet 0:087b5655778d 126
dflet 0:087b5655778d 127 return tx_mqp?
dflet 0:087b5655778d 128 tx_part->buffer + offset :
dflet 0:087b5655778d 129 tx_part->vh_msg + offset;
dflet 0:087b5655778d 130 }
dflet 0:087b5655778d 131
dflet 0:087b5655778d 132 static void tx_part_addup(struct tx_part_pkt *tx_part, uint32_t offset)
dflet 0:087b5655778d 133 {
dflet 0:087b5655778d 134 tx_part->offset += offset;
dflet 0:087b5655778d 135 }
dflet 0:087b5655778d 136
/* Accessors for a 'struct tx_part_pkt *'. The argument is parenthesised so
   that expressions such as 'base + 1' can be passed safely. */

/* Pointer to the next byte to transmit */
#define TX_PART_BUFFER(tx_part) tx_part_buf_p(tx_part)

/* Number of bytes still to transmit */
#define TX_PART_BUF_SZ(tx_part) ((tx_part)->length - (tx_part)->offset)

/* Non-zero while a transmission is pending, i.e. bytes remain to be sent */
#define TX_PART_IN_USE(tx_part) TX_PART_BUF_SZ(tx_part)
dflet 0:087b5655778d 140
/* Life-cycle of the library as a whole. */
enum module_state {
    WAIT_INIT_STATE,          /* Starting value of cl_lib_state */
    INIT_DONE_STATE = 0x01,
};

/* Current library state. */
static enum module_state cl_lib_state = WAIT_INIT_STATE;

/* Port used for the loop-back datagram, see loopb_trigger(). */
static uint16_t loopb_portid = 0;

/* Flag named for a group-level callback; it is not read in this part of the
   file. */
static bool grp_has_cbfn = false;
dflet 0:087b5655778d 151
/* Context flags that mirror configuration options supplied by the caller. */
#define USE_PROTO_V31_FLAG MQTT_CFG_PROTOCOL_V31
#define APP_RECV_TASK_FLAG MQTT_CFG_APP_HAS_RTSK
#define GROUP_CONTEXT_FLAG MQTT_CFG_MK_GROUP_CTX

/* Context flags private to this file: one bit each, bits 16 through 22. */
#define CLEAN_SESSION_FLAG (1 << 16)
#define CONNACK_AWAIT_FLAG (1 << 17)
#define NOW_CONNECTED_FLAG (1 << 18)
#define KA_PINGER_RSP_FLAG (1 << 19)
#define USER_PING_RSP_FLAG (1 << 20)
#define NETWORK_CLOSE_FLAG (1 << 21)
#define DO_CONNACK_TO_FLAG (1 << 22)
dflet 0:087b5655778d 163
dflet 0:087b5655778d 164 static struct client_ctx *free_ctxs = NULL; /* CTX construct available */
dflet 0:087b5655778d 165 static struct client_ctx *used_ctxs = NULL; /* Relevant only for group */
dflet 0:087b5655778d 166 static struct client_ctx *conn_ctxs = NULL; /* Relevant only for group */
dflet 0:087b5655778d 167
dflet 0:087b5655778d 168 static void cl_ctx_freeup(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 169 {
dflet 0:087b5655778d 170 cl_ctx->next = free_ctxs;
dflet 0:087b5655778d 171 free_ctxs = cl_ctx;
dflet 0:087b5655778d 172
dflet 0:087b5655778d 173 return;
dflet 0:087b5655778d 174 }
dflet 0:087b5655778d 175
/* Flag tests on a 'struct client_ctx *'. Each yields a non-zero value when
   the corresponding bit is set in the context's flags. The argument is
   parenthesised so any pointer expression can be passed safely. */
#define IS_PROTO_VER31(cl_ctx) ((cl_ctx)->flags & USE_PROTO_V31_FLAG)
#define AWAITS_CONNACK(cl_ctx) ((cl_ctx)->flags & CONNACK_AWAIT_FLAG)
#define HAS_CONNECTION(cl_ctx) ((cl_ctx)->flags & NOW_CONNECTED_FLAG)
#define AWAITS_KA_PING(cl_ctx) ((cl_ctx)->flags & KA_PINGER_RSP_FLAG)
#define AWAITS_PINGRSP(cl_ctx) ((cl_ctx)->flags & USER_PING_RSP_FLAG)
#define IS_CLN_SESSION(cl_ctx) ((cl_ctx)->flags & CLEAN_SESSION_FLAG)
#define RECV_TASK_AVBL(cl_ctx) ((cl_ctx)->flags & APP_RECV_TASK_FLAG)
#define A_GROUP_MEMBER(cl_ctx) ((cl_ctx)->flags & GROUP_CONTEXT_FLAG)
#define NEED_NET_CLOSE(cl_ctx) ((cl_ctx)->flags & NETWORK_CLOSE_FLAG)
#define CFG_CONNACK_TO(cl_ctx) ((cl_ctx)->flags & DO_CONNACK_TO_FLAG)
dflet 0:087b5655778d 186
/* Number of client descriptors kept by this file: taken from the build-time
   setting CFG_CL_MQTT_CTXS when it is defined, otherwise 4. */
#ifdef CFG_CL_MQTT_CTXS
#define MAX_NWCONN CFG_CL_MQTT_CTXS
#else
#define MAX_NWCONN 4
#endif
dflet 0:087b5655778d 192
dflet 0:087b5655778d 193 static struct client_desc {
dflet 0:087b5655778d 194
dflet 0:087b5655778d 195 /* ALERT: "context" must be first elem in this structure, do not move */
dflet 0:087b5655778d 196 struct client_ctx context;
dflet 0:087b5655778d 197
dflet 0:087b5655778d 198 #define CLIENT(cl_ctx) ((struct client_desc*) cl_ctx)
dflet 0:087b5655778d 199 #define CL_CTX(client) ((struct client_ctx*) client)
dflet 0:087b5655778d 200
dflet 0:087b5655778d 201 /* Order/Sequence: Client ID, Will Topic, Will Msg, Username, Password */
dflet 0:087b5655778d 202 const struct utf8_string *conn_pl_utf8s[5]; /* Ref: CONNECT Payload */
dflet 0:087b5655778d 203 uint8_t will_opts;
dflet 0:087b5655778d 204
dflet 0:087b5655778d 205 /* Wait-List for Level 1 ACK(s), which are PUBACK, PUBREC, UN/SUBACK */
dflet 0:087b5655778d 206 struct mqtt_ack_wlist qos_ack1_wl;
dflet 0:087b5655778d 207
dflet 0:087b5655778d 208 /* Circular queue to track QOS2 PUBLISH packets from the server. They
dflet 0:087b5655778d 209 are tracked for the duration of PUBLISH-RX to PUBREL-RX.
dflet 0:087b5655778d 210 */
dflet 0:087b5655778d 211 struct pub_qos2_cq qos2_rx_cq;
dflet 0:087b5655778d 212
dflet 0:087b5655778d 213 /* Circular queue to track QOS2 PUBLISH packets from the client. They
dflet 0:087b5655778d 214 are tracked for the duration of PUBREC-RX to PUBOMP-RX.
dflet 0:087b5655778d 215 */
dflet 0:087b5655778d 216 struct pub_qos2_cq qos2_tx_cq;
dflet 0:087b5655778d 217
dflet 0:087b5655778d 218 struct mqtt_client_ctx_cbs app_ctx_cbs; /* Callback funcs from app */
dflet 0:087b5655778d 219 #define CTX_CBS_PTR(cl_ctx) &(CLIENT(cl_ctx)->app_ctx_cbs)
dflet 0:087b5655778d 220
dflet 0:087b5655778d 221 struct tx_part_pkt tx_part;/* Reference to partial TX PKT */
dflet 0:087b5655778d 222 struct mqtt_packet *rx_mqp; /* Reference to partial RX PKT */
dflet 0:087b5655778d 223 void *app;
dflet 0:087b5655778d 224
dflet 0:087b5655778d 225 uint32_t nwconn_opts;
dflet 0:087b5655778d 226 char *server_addr;
dflet 0:087b5655778d 227 uint16_t port_number;
dflet 0:087b5655778d 228 struct secure_conn nw_security;
dflet 0:087b5655778d 229
dflet 0:087b5655778d 230 } clients[MAX_NWCONN];
dflet 0:087b5655778d 231
dflet 0:087b5655778d 232 static void client_reset(struct client_desc *client)
dflet 0:087b5655778d 233 {
dflet 0:087b5655778d 234
dflet 0:087b5655778d 235 struct mqtt_client_ctx_cbs *ctx_cbs = &client->app_ctx_cbs;
dflet 0:087b5655778d 236 int32_t i = 0;
dflet 0:087b5655778d 237
dflet 0:087b5655778d 238 cl_ctx_reset(CL_CTX(client));
dflet 0:087b5655778d 239
dflet 0:087b5655778d 240 for(i = 0; i < 5; i++) {
dflet 0:087b5655778d 241 client->conn_pl_utf8s[i] = NULL;
dflet 0:087b5655778d 242 }
dflet 0:087b5655778d 243 client->will_opts = 0;
dflet 0:087b5655778d 244
dflet 0:087b5655778d 245 mqp_ack_wlist_purge(&client->qos_ack1_wl);
dflet 0:087b5655778d 246
dflet 0:087b5655778d 247 qos2_pub_cq_reset(&client->qos2_rx_cq);
dflet 0:087b5655778d 248 qos2_pub_cq_reset(&client->qos2_tx_cq);
dflet 0:087b5655778d 249
dflet 0:087b5655778d 250 ctx_cbs->publish_rx = NULL;
dflet 0:087b5655778d 251 ctx_cbs->ack_notify = NULL;
dflet 0:087b5655778d 252 ctx_cbs->disconn_cb = NULL;
dflet 0:087b5655778d 253
dflet 0:087b5655778d 254 tx_part_reset(&client->tx_part);
dflet 0:087b5655778d 255 client->rx_mqp = NULL;
dflet 0:087b5655778d 256 client->app = NULL;
dflet 0:087b5655778d 257
dflet 0:087b5655778d 258 client->nwconn_opts = 0;
dflet 0:087b5655778d 259 client->server_addr = NULL;
dflet 0:087b5655778d 260 client->port_number = 0;
dflet 0:087b5655778d 261
dflet 0:087b5655778d 262 secure_conn_struct_init(&client->nw_security);
dflet 0:087b5655778d 263
dflet 0:087b5655778d 264 return;
dflet 0:087b5655778d 265 }
dflet 0:087b5655778d 266
dflet 0:087b5655778d 267 static void client_desc_init(void)
dflet 0:087b5655778d 268 {
dflet 0:087b5655778d 269 int32_t i = 0;
dflet 0:087b5655778d 270 for(i = 0; i < MAX_NWCONN; i++) {
dflet 0:087b5655778d 271 struct client_desc *client = clients + i;
dflet 0:087b5655778d 272 struct client_ctx *cl_ctx = CL_CTX(client);
dflet 0:087b5655778d 273
dflet 0:087b5655778d 274 /* Initialize certain fields to defaults */
dflet 0:087b5655778d 275 client->qos_ack1_wl.head = NULL;
dflet 0:087b5655778d 276 client->qos_ack1_wl.tail = NULL;
dflet 0:087b5655778d 277 client->tx_part.tx_mqp = NULL;
dflet 0:087b5655778d 278
dflet 0:087b5655778d 279 client_reset(client); /* Reset remaining */
dflet 0:087b5655778d 280
dflet 0:087b5655778d 281 cl_ctx->next = free_ctxs;
dflet 0:087b5655778d 282 free_ctxs = cl_ctx;
dflet 0:087b5655778d 283 }
dflet 0:087b5655778d 284
dflet 0:087b5655778d 285 return;
dflet 0:087b5655778d 286 }
dflet 0:087b5655778d 287
dflet 0:087b5655778d 288 static void mqp_free_locked(struct mqtt_packet *mqp)
dflet 0:087b5655778d 289 {
dflet 0:087b5655778d 290
dflet 0:087b5655778d 291 // MUTEX_LOCKIN();
dflet 0:087b5655778d 292
dflet 0:087b5655778d 293 // MUTEX_UNLOCK();
dflet 0:087b5655778d 294
dflet 0:087b5655778d 295 if( xSemaphore != NULL ) {
dflet 0:087b5655778d 296 // See if we can obtain the semaphore. If the semaphore is not available
dflet 0:087b5655778d 297 // wait 10 ticks to see if it becomes free.
dflet 0:087b5655778d 298 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
dflet 0:087b5655778d 299 // We were able to obtain the semaphore and can now access the
dflet 0:087b5655778d 300 // shared resource.
dflet 0:087b5655778d 301 mqp_free(mqp);
dflet 0:087b5655778d 302
dflet 0:087b5655778d 303 // We have finished accessing the shared resource. Release the
dflet 0:087b5655778d 304 // semaphore.
dflet 0:087b5655778d 305 xSemaphoreGive( xSemaphore );
dflet 0:087b5655778d 306
dflet 0:087b5655778d 307 } else {
dflet 0:087b5655778d 308 // We could not obtain the semaphore and can therefore not access
dflet 0:087b5655778d 309 // the shared resource safely.
dflet 0:087b5655778d 310 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
dflet 0:087b5655778d 311 }
dflet 0:087b5655778d 312 }
dflet 0:087b5655778d 313 }
dflet 0:087b5655778d 314
dflet 0:087b5655778d 315 /*----------------------------------------------------------------------------
dflet 0:087b5655778d 316 * Fix-up to prevent certain good and non-callback MQP being reported to app
dflet 0:087b5655778d 317 *----------------------------------------------------------------------------
dflet 0:087b5655778d 318 */
dflet 0:087b5655778d 319 /* cor --> clear on read. */
dflet 0:087b5655778d 320 static bool mqp_do_not_report_cor(struct mqtt_packet *mqp)
dflet 0:087b5655778d 321 {
dflet 0:087b5655778d 322 bool rv = (1 == mqp->private_)? true : false;
dflet 0:087b5655778d 323 mqp->private_ = 0;
dflet 0:087b5655778d 324 return rv;
dflet 0:087b5655778d 325 }
dflet 0:087b5655778d 326
/* Read-and-clear the "do not report" marker of an RX packet */
#define MQP_RX_DO_NOT_RPT_COR(mqp) mqp_do_not_report_cor(mqp)

/* Only if MQP has good RX data i.e. this macro shouldn't be used for bad RX */
#define MQP_RX_DO_NOT_RPT_SET(mqp) ((mqp)->private_ = 1)

/* For TX packets the same 'private_' field holds a running length count.
   Arguments are parenthesised so that compound expressions are safe. */
#define MQP_TX_DONE_LEN_ADD(mqp, add) ((mqp)->private_ += (add))
#define MQP_TX_DONE_LEN_GET(mqp)      ((mqp)->private_)
dflet 0:087b5655778d 334
dflet 0:087b5655778d 335 /*---------------------------Fix-Up Ends ------------------------------------*/
dflet 0:087b5655778d 336
dflet 0:087b5655778d 337 static int32_t loopb_net = -1;
dflet 0:087b5655778d 338 static const uint8_t LOOP_DATA[] = {0x00, 0x01};
dflet 0:087b5655778d 339 #define LOOP_DLEN sizeof(LOOP_DATA)
dflet 0:087b5655778d 340
dflet 0:087b5655778d 341 static int32_t loopb_trigger(void)
dflet 0:087b5655778d 342 {
dflet 0:087b5655778d 343
dflet 0:087b5655778d 344
dflet 0:087b5655778d 345 uint8_t ip_addr[] = {127,0,0,1};
dflet 0:087b5655778d 346
dflet 0:087b5655778d 347 return (-1 != loopb_net)?
dflet 0:087b5655778d 348 net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN,
dflet 0:087b5655778d 349 loopb_portid, ip_addr, 4) : MQP_ERR_LIBQUIT;
dflet 0:087b5655778d 350 }
dflet 0:087b5655778d 351
dflet 0:087b5655778d 352 static void session_311fix(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 353 {
dflet 0:087b5655778d 354 struct mqtt_ack_wlist *wl = &CLIENT(cl_ctx)->qos_ack1_wl;
dflet 0:087b5655778d 355 struct mqtt_packet *elem = wl->head;
dflet 0:087b5655778d 356
dflet 0:087b5655778d 357 while(elem) {
dflet 0:087b5655778d 358 struct mqtt_packet *next = elem->next;
dflet 0:087b5655778d 359 if(MQTT_PUBLISH != elem->msg_type)
dflet 0:087b5655778d 360 mqp_ack_wlist_remove(wl, elem->msg_id);
dflet 0:087b5655778d 361
dflet 0:087b5655778d 362 elem = next;
dflet 0:087b5655778d 363 }
dflet 0:087b5655778d 364
dflet 0:087b5655778d 365 return;
dflet 0:087b5655778d 366 }
dflet 0:087b5655778d 367
dflet 0:087b5655778d 368 static void session_delete(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 369 {
dflet 0:087b5655778d 370 struct client_desc *client = CLIENT(cl_ctx);
dflet 0:087b5655778d 371
dflet 0:087b5655778d 372 DBG_INFO("C: Cleaning session for net %d\n\r", cl_ctx->net);
dflet 0:087b5655778d 373
dflet 0:087b5655778d 374 qos2_pub_cq_reset(&client->qos2_rx_cq);
dflet 0:087b5655778d 375 qos2_pub_cq_reset(&client->qos2_tx_cq);
dflet 0:087b5655778d 376
dflet 0:087b5655778d 377 mqp_ack_wlist_purge(&client->qos_ack1_wl);
dflet 0:087b5655778d 378
dflet 0:087b5655778d 379 return;
dflet 0:087b5655778d 380 }
dflet 0:087b5655778d 381
dflet 0:087b5655778d 382 /*------------------------------------------------------------------------------
dflet 0:087b5655778d 383 * Routine to manage error conditions in client - close the network connection
dflet 0:087b5655778d 384 *----------------------------------------------------------------------------*/
dflet 0:087b5655778d 385 static void do_net_close(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 386 {
dflet 0:087b5655778d 387 int32_t net = cl_ctx->net;
dflet 0:087b5655778d 388
dflet 0:087b5655778d 389 if(-1 == net)
dflet 0:087b5655778d 390 return; /* network closed already, must not happen */
dflet 0:087b5655778d 391
dflet 0:087b5655778d 392 if(IS_CLN_SESSION(cl_ctx)) {
dflet 0:087b5655778d 393 session_delete(cl_ctx);
dflet 0:087b5655778d 394 } else if(!IS_PROTO_VER31(cl_ctx)) {
dflet 0:087b5655778d 395 /* Version 3.1.1 doesn't need SUB and UNSUB re-send */
dflet 0:087b5655778d 396 session_311fix(cl_ctx);
dflet 0:087b5655778d 397 }
dflet 0:087b5655778d 398
dflet 0:087b5655778d 399 tx_part_reset(&CLIENT(cl_ctx)->tx_part); /* Part TX, if any */
dflet 0:087b5655778d 400
dflet 0:087b5655778d 401 cl_ctx->flags &= ~(CONNACK_AWAIT_FLAG | NOW_CONNECTED_FLAG |
dflet 0:087b5655778d 402 KA_PINGER_RSP_FLAG | USER_PING_RSP_FLAG |
dflet 0:087b5655778d 403 NETWORK_CLOSE_FLAG | DO_CONNACK_TO_FLAG);
dflet 0:087b5655778d 404
dflet 0:087b5655778d 405 cl_ctx->net = -1;
dflet 0:087b5655778d 406 net_ops->close(net);
dflet 0:087b5655778d 407
dflet 0:087b5655778d 408 USR_INFO("C: Net %d now closed\n\r", net);
dflet 0:087b5655778d 409
dflet 0:087b5655778d 410 return;
dflet 0:087b5655778d 411 }
dflet 0:087b5655778d 412
dflet 0:087b5655778d 413 static void do_net_close_rx(struct client_ctx *cl_ctx, int32_t cause)
dflet 0:087b5655778d 414 {
dflet 0:087b5655778d 415 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
dflet 0:087b5655778d 416
dflet 0:087b5655778d 417 DBG_INFO("C: RX closing Net %d [%d]\n\r", cl_ctx->net, cause);
dflet 0:087b5655778d 418
dflet 0:087b5655778d 419 do_net_close(cl_ctx);
dflet 0:087b5655778d 420 if(ctx_cbs->disconn_cb)
dflet 0:087b5655778d 421 ctx_cbs->disconn_cb(CLIENT(cl_ctx)->app, cause);
dflet 0:087b5655778d 422
dflet 0:087b5655778d 423 if(A_GROUP_MEMBER(cl_ctx))
dflet 0:087b5655778d 424 cl_ctx_remove(&used_ctxs, cl_ctx);
dflet 0:087b5655778d 425
dflet 0:087b5655778d 426 return;
dflet 0:087b5655778d 427 }
dflet 0:087b5655778d 428
dflet 0:087b5655778d 429 static void do_net_close_tx(struct client_ctx *cl_ctx, char *cause)
dflet 0:087b5655778d 430 {
dflet 0:087b5655778d 431 DBG_INFO("C: TX closing Net %d [%s]\n\r", cl_ctx->net, cause);
dflet 0:087b5655778d 432
dflet 0:087b5655778d 433 if(RECV_TASK_AVBL(cl_ctx)) {
dflet 0:087b5655778d 434 cl_ctx->flags |= NETWORK_CLOSE_FLAG;
dflet 0:087b5655778d 435 if(A_GROUP_MEMBER(cl_ctx))
dflet 0:087b5655778d 436 loopb_trigger();
dflet 0:087b5655778d 437 } else {
dflet 0:087b5655778d 438 struct mqtt_packet *rx_mqp = CLIENT(cl_ctx)->rx_mqp;
dflet 0:087b5655778d 439
dflet 0:087b5655778d 440 do_net_close(cl_ctx); /* No RX Task, close now */
dflet 0:087b5655778d 441
dflet 0:087b5655778d 442 /* Release partial MQP, if any, for a CTX w/ CB */
dflet 0:087b5655778d 443 if((NULL != rx_mqp) && (NULL != rx_mqp->free))
dflet 0:087b5655778d 444 mqp_free(rx_mqp);
dflet 0:087b5655778d 445
dflet 0:087b5655778d 446 CLIENT(cl_ctx)->rx_mqp = NULL;
dflet 0:087b5655778d 447 }
dflet 0:087b5655778d 448
dflet 0:087b5655778d 449 return;
dflet 0:087b5655778d 450 }
dflet 0:087b5655778d 451
dflet 0:087b5655778d 452 /*----------------------------------------------------------------------------
dflet 0:087b5655778d 453 * QoS2 PUB RX Message handling mechanism and associated house-keeping
dflet 0:087b5655778d 454 *--------------------------------------------------------------------------*/
dflet 0:087b5655778d 455 static bool qos2_pub_rx_logup(struct client_ctx *cl_ctx, uint16_t msg_id)
dflet 0:087b5655778d 456 {
dflet 0:087b5655778d 457 return qos2_pub_cq_logup(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id);
dflet 0:087b5655778d 458 }
dflet 0:087b5655778d 459
dflet 0:087b5655778d 460 static bool ack2_msg_id_logup(struct client_ctx *cl_ctx, uint16_t msg_id)
dflet 0:087b5655778d 461 {
dflet 0:087b5655778d 462 return qos2_pub_cq_logup(&CLIENT(cl_ctx)->qos2_tx_cq, msg_id);
dflet 0:087b5655778d 463 }
dflet 0:087b5655778d 464
dflet 0:087b5655778d 465 static bool qos2_pub_rx_unlog(struct client_ctx *cl_ctx, uint16_t msg_id)
dflet 0:087b5655778d 466 {
dflet 0:087b5655778d 467 return qos2_pub_cq_unlog(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id);
dflet 0:087b5655778d 468 }
dflet 0:087b5655778d 469
dflet 0:087b5655778d 470 static bool ack2_msg_id_unlog(struct client_ctx *cl_ctx, uint16_t msg_id)
dflet 0:087b5655778d 471 {
dflet 0:087b5655778d 472 struct client_desc *client = CLIENT(cl_ctx);
dflet 0:087b5655778d 473 if(qos2_pub_cq_unlog(&client->qos2_tx_cq, msg_id)) {
dflet 0:087b5655778d 474 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
dflet 0:087b5655778d 475 if(ctx_cbs->ack_notify)
dflet 0:087b5655778d 476 ctx_cbs->ack_notify(client->app, MQTT_PUBCOMP,
dflet 0:087b5655778d 477 msg_id, NULL, 0);
dflet 0:087b5655778d 478 return true;
dflet 0:087b5655778d 479 }
dflet 0:087b5655778d 480
dflet 0:087b5655778d 481 return false;
dflet 0:087b5655778d 482 }
dflet 0:087b5655778d 483
dflet 0:087b5655778d 484 static bool qos2_pub_rx_is_done(struct client_ctx *cl_ctx, uint16_t msg_id)
dflet 0:087b5655778d 485 {
dflet 0:087b5655778d 486 return qos2_pub_cq_check(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id);
dflet 0:087b5655778d 487 }
dflet 0:087b5655778d 488
dflet 0:087b5655778d 489 static bool awaits_pkts(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 490 {
dflet 0:087b5655778d 491
dflet 0:087b5655778d 492 struct client_desc *client = CLIENT(cl_ctx);
dflet 0:087b5655778d 493
dflet 0:087b5655778d 494 return client->qos_ack1_wl.head ||
dflet 0:087b5655778d 495 qos2_pub_cq_count(&client->qos2_rx_cq) ||
dflet 0:087b5655778d 496 qos2_pub_cq_count(&client->qos2_tx_cq)?
dflet 0:087b5655778d 497 true : false;
dflet 0:087b5655778d 498 }
dflet 0:087b5655778d 499
dflet 0:087b5655778d 500 static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp)
dflet 0:087b5655778d 501 {
dflet 0:087b5655778d 502 mqp_free(mqp);
dflet 0:087b5655778d 503 return MQP_ERR_PKT_LEN;
dflet 0:087b5655778d 504 }
dflet 0:087b5655778d 505
dflet 0:087b5655778d 506 static int32_t is_valid_utf8_string(const struct utf8_string *utf8)
dflet 0:087b5655778d 507 {
dflet 0:087b5655778d 508 /* Valid topic should be > 0 byte and must hosted in usable buffer */
dflet 0:087b5655778d 509 return ((utf8->length > 0) && (NULL != utf8->buffer))? true : false;
dflet 0:087b5655778d 510 }
dflet 0:087b5655778d 511
dflet 0:087b5655778d 512 #define RET_IF_INVALID_UTF8(utf8) \
dflet 0:087b5655778d 513 if(false == is_valid_utf8_string(utf8)) \
dflet 0:087b5655778d 514 return -1;
dflet 0:087b5655778d 515
dflet 0:087b5655778d 516 static bool is_connected(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 517 {
dflet 0:087b5655778d 518
dflet 0:087b5655778d 519 return (HAS_CONNECTION(cl_ctx) && !NEED_NET_CLOSE(cl_ctx))?
dflet 0:087b5655778d 520 true : false;
dflet 0:087b5655778d 521 }
dflet 0:087b5655778d 522
dflet 0:087b5655778d 523 uint16_t mqtt_client_new_msg_id()
dflet 0:087b5655778d 524 {
dflet 0:087b5655778d 525 return assign_new_msg_id();
dflet 0:087b5655778d 526 }
dflet 0:087b5655778d 527
dflet 0:087b5655778d 528 bool mqtt_client_is_connected(void *ctx)
dflet 0:087b5655778d 529 {
dflet 0:087b5655778d 530 return is_connected(CL_CTX(ctx));
dflet 0:087b5655778d 531 }
dflet 0:087b5655778d 532
dflet 0:087b5655778d 533 /*----------------------------------------------------------------------------
dflet 0:087b5655778d 534 * MQTT TX Routines
dflet 0:087b5655778d 535 *--------------------------------------------------------------------------*/
dflet 0:087b5655778d 536 static void used_ctxs_TO_sort(struct client_ctx *cl_ctx_TO)
dflet 0:087b5655778d 537 {
dflet 0:087b5655778d 538 cl_ctx_remove(&used_ctxs, cl_ctx_TO);
dflet 0:087b5655778d 539 cl_ctx_timeout_insert(&used_ctxs, cl_ctx_TO);
dflet 0:087b5655778d 540 }
dflet 0:087b5655778d 541
dflet 0:087b5655778d 542 static inline int32_t net_send(int32_t net, const uint8_t *buf, uint32_t len, void *ctx)
dflet 0:087b5655778d 543 {
dflet 0:087b5655778d 544
dflet 0:087b5655778d 545 int32_t rv = net_ops->send(net, buf, len, ctx);
dflet 0:087b5655778d 546 if(rv <= 0) {
dflet 0:087b5655778d 547 memset(print_buf, 0x00, PRINT_BUF_LEN);
dflet 0:087b5655778d 548 sprintf((char*) print_buf, "MQP_ERR_NETWORK %i\r\n", MQP_ERR_NETWORK);
dflet 0:087b5655778d 549 Uart_Write((uint8_t *) print_buf);
dflet 0:087b5655778d 550 rv = MQP_ERR_NETWORK;
dflet 0:087b5655778d 551 }
dflet 0:087b5655778d 552 return rv;
dflet 0:087b5655778d 553 }
dflet 0:087b5655778d 554
#if 0
/* DISABLED: earlier single-shot send routine, kept for reference only.
 * cl_ctx_part_send() / cl_ctx_seg1_send() are the routines in use.
 *
 * The net_send() call below has been brought in line with the current
 * four-argument signature so the block compiles if it is ever re-enabled.
 */
static int32_t cl_ctx_send(struct client_ctx *cl_ctx, const uint8_t *buf, uint32_t len,
                           bool is_conn_msg)
{
    int32_t rv = MQP_ERR_NOTCONN;

    /* For CONNECT msg, a client context mustn't be already connected.
       For others msgs, a client context must be connected to server */
    if(false == (is_conn_msg ^ is_connected(cl_ctx)))
        goto cl_ctx_send_exit1;

    rv = net_send(cl_ctx->net, buf, len, (void*)cl_ctx);
    if(rv > 0) { /* A good send, do follow-up */
        cl_ctx_timeout_update(cl_ctx, net_ops->time());
        if(A_GROUP_MEMBER(cl_ctx) && HAS_CONNECTION(cl_ctx)) {
            /* With update to timeout,
               a sorting is impending */
            used_ctxs_TO_sort(cl_ctx);
        }

        goto cl_ctx_send_exit1; /* A Good Send */
    }

    do_net_close_tx(cl_ctx, "snd-err");

cl_ctx_send_exit1:
    USR_INFO("C: FH-B1 0x%02x, len %u bytes, to net %d: %s\n\r",
             *buf, len, cl_ctx->net, (rv > 0)? "Sent" : "Fail");
    return rv;
}
#endif
dflet 0:087b5655778d 587
dflet 0:087b5655778d 588 static int32_t cl_ctx_part_send(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 589 {
dflet 0:087b5655778d 590
dflet 0:087b5655778d 591 struct tx_part_pkt *tx_part = &CLIENT(cl_ctx)->tx_part;
dflet 0:087b5655778d 592 const uint8_t *buf = TX_PART_BUFFER(tx_part);
dflet 0:087b5655778d 593 uint32_t len = TX_PART_BUF_SZ(tx_part);
dflet 0:087b5655778d 594 uint32_t ofs = tx_part->offset;
dflet 0:087b5655778d 595 uint8_t B1 = *buf;
dflet 0:087b5655778d 596
dflet 0:087b5655778d 597 int32_t rv = net_send(cl_ctx->net, buf, len, (void*)cl_ctx);
dflet 0:087b5655778d 598 if(rv > 0) { /* Follow-up for a good send */
dflet 0:087b5655778d 599 if(HAS_CONNECTION(cl_ctx)) {
dflet 0:087b5655778d 600 /* Update TX timeout, if 'CTX' is connected */
dflet 0:087b5655778d 601 cl_ctx_timeout_update(cl_ctx, net_ops->time());
dflet 0:087b5655778d 602
dflet 0:087b5655778d 603 /* After update, 'CTX'(s) sorting is a must */
dflet 0:087b5655778d 604 if(A_GROUP_MEMBER(cl_ctx))
dflet 0:087b5655778d 605 used_ctxs_TO_sort(cl_ctx);
dflet 0:087b5655778d 606 }
dflet 0:087b5655778d 607
dflet 0:087b5655778d 608 if(rv != len)
dflet 0:087b5655778d 609 /* Partial data was sent */
dflet 0:087b5655778d 610 tx_part_addup(tx_part, rv);
dflet 0:087b5655778d 611 else
dflet 0:087b5655778d 612 tx_part_reset(tx_part);
dflet 0:087b5655778d 613
dflet 0:087b5655778d 614 goto cl_ctx_send_exit1; /* A Good Send */
dflet 0:087b5655778d 615 }
dflet 0:087b5655778d 616
dflet 0:087b5655778d 617 do_net_close_tx(cl_ctx, "snd-err");
dflet 0:087b5655778d 618
dflet 0:087b5655778d 619 cl_ctx_send_exit1:
dflet 0:087b5655778d 620 USR_INFO("C: %s 0x%02x to net %d, %s (%d Bytes) [@ %u]\n\r",
dflet 0:087b5655778d 621 ofs? "PartN" : "FH-B1", B1, cl_ctx->net,
dflet 0:087b5655778d 622 (rv > 0)? "Sent" : "Fail", rv, net_ops->time());
dflet 0:087b5655778d 623
dflet 0:087b5655778d 624 return rv;
dflet 0:087b5655778d 625 }
dflet 0:087b5655778d 626
dflet 0:087b5655778d 627 static int32_t cl_ctx_seg1_send(struct client_ctx *cl_ctx, const uint8_t *buf, uint32_t len,
dflet 0:087b5655778d 628 bool is_conn_msg, struct mqtt_packet *tx_mqp)
dflet 0:087b5655778d 629 {
dflet 0:087b5655778d 630
dflet 0:087b5655778d 631 struct tx_part_pkt *tx_part = &CLIENT(cl_ctx)->tx_part;
dflet 0:087b5655778d 632
dflet 0:087b5655778d 633 /* For CONNECT msg, a client context mustn't be already connected.
dflet 0:087b5655778d 634 For others msgs, a client context must be conected to server */
dflet 0:087b5655778d 635 if(false == (is_conn_msg ^ is_connected(cl_ctx)))
dflet 0:087b5655778d 636 return MQP_ERR_NOTCONN;
dflet 0:087b5655778d 637
dflet 0:087b5655778d 638 if(TX_PART_IN_USE(tx_part))
dflet 0:087b5655778d 639 return MQP_ERR_BADCALL;
dflet 0:087b5655778d 640
dflet 0:087b5655778d 641 tx_part_setup(tx_part, buf, len, tx_mqp);
dflet 0:087b5655778d 642
dflet 0:087b5655778d 643 return cl_ctx_part_send(cl_ctx);
dflet 0:087b5655778d 644 }
dflet 0:087b5655778d 645
dflet 0:087b5655778d 646 int32_t mqtt_client_send_progress(void *ctx)
dflet 0:087b5655778d 647 {
dflet 0:087b5655778d 648 struct client_ctx *cl_ctx = CL_CTX(ctx);
dflet 0:087b5655778d 649 struct tx_part_pkt *tx_part = NULL;
dflet 0:087b5655778d 650 int32_t rv = MQP_ERR_BADCALL;
dflet 0:087b5655778d 651
dflet 0:087b5655778d 652 if(NULL == ctx)
dflet 0:087b5655778d 653 return MQP_ERR_FNPARAM;
dflet 0:087b5655778d 654
dflet 0:087b5655778d 655 tx_part = &CLIENT(cl_ctx)->tx_part;
dflet 0:087b5655778d 656
dflet 0:087b5655778d 657 if(!TX_PART_IN_USE(tx_part))
dflet 0:087b5655778d 658 return rv;
dflet 0:087b5655778d 659
dflet 0:087b5655778d 660 rv = cl_ctx_part_send(cl_ctx);
dflet 0:087b5655778d 661 if(rv > 0)
dflet 0:087b5655778d 662 rv = TX_PART_BUF_SZ(tx_part);
dflet 0:087b5655778d 663
dflet 0:087b5655778d 664 return rv;
dflet 0:087b5655778d 665 }
dflet 0:087b5655778d 666
dflet 0:087b5655778d 667 static int32_t wr_connect_pl(struct client_ctx *cl_ctx, uint8_t *buf,
dflet 0:087b5655778d 668 uint32_t fsz, uint8_t *conn_opts)
dflet 0:087b5655778d 669 {
dflet 0:087b5655778d 670
dflet 0:087b5655778d 671 /* UTF8 usage selection order: Client, W-Topic, W-Msg, User-Name, Pwd */
dflet 0:087b5655778d 672 uint8_t utf8_sel[] = {0x00, WILL_CONFIG_VAL, 0x00,
dflet 0:087b5655778d 673 USER_NAME_OPVAL, PASS_WORD_OPVAL
dflet 0:087b5655778d 674 };
dflet 0:087b5655778d 675 struct client_desc *client = CLIENT(cl_ctx);
dflet 0:087b5655778d 676 uint8_t *ref = buf;
dflet 0:087b5655778d 677
dflet 0:087b5655778d 678 int32_t i = 0;
dflet 0:087b5655778d 679
dflet 0:087b5655778d 680 for(i = 0; i < 5; i++) { /* TBD 5 --> macro */
dflet 0:087b5655778d 681 uint16_t len = 2;
dflet 0:087b5655778d 682 const struct utf8_string *utf8 = client->conn_pl_utf8s[i];
dflet 0:087b5655778d 683 if(NULL == utf8) {
dflet 0:087b5655778d 684 /* UTF8 absent: Client ID (i = 0) and Will MSG (i = 2)
dflet 0:087b5655778d 685 set zero byte length in the CONNECT message */
dflet 0:087b5655778d 686 if(0 != i)
dflet 0:087b5655778d 687 if(!((2 == i) && (*conn_opts & WILL_CONFIG_VAL)))
dflet 0:087b5655778d 688 continue; /* Others, just pass */
dflet 0:087b5655778d 689 } else {
dflet 0:087b5655778d 690 len += utf8->length;
dflet 0:087b5655778d 691 }
dflet 0:087b5655778d 692
dflet 0:087b5655778d 693 if(fsz < (buf - ref + len)) { /* TBD end = ref + fsz */
dflet 0:087b5655778d 694 Uart_Write((uint8_t*)"Payload: no space left fail\r\n");
dflet 0:087b5655778d 695 return MQP_ERR_PKT_LEN; /* Payload: no space left */
dflet 0:087b5655778d 696 }
dflet 0:087b5655778d 697 if(2 == len) {
dflet 0:087b5655778d 698 buf += buf_wr_nbo_2B(buf, 0); /* WR 0 byte length */
dflet 0:087b5655778d 699 } else {
dflet 0:087b5655778d 700 buf += mqp_buf_wr_utf8(buf, utf8);
dflet 0:087b5655778d 701 }
dflet 0:087b5655778d 702 *conn_opts |= utf8_sel[i]; /* Enable message flags */
dflet 0:087b5655778d 703 }
dflet 0:087b5655778d 704
dflet 0:087b5655778d 705 return buf - ref;
dflet 0:087b5655778d 706 }
dflet 0:087b5655778d 707
dflet 0:087b5655778d 708
/* Protocol information for the supported versions. Each array holds a
   2 byte length, the characters of the protocol name and one trailing
   byte (0x03 for "MQIsdp", 0x04 for "MQTT"). */
static uint8_t mqtt310[] = {
    0x00, 0x06,                     /* Number of name characters */
    'M', 'Q', 'I', 's', 'd', 'p',
    0x03
};
static uint8_t mqtt311[] = {
    0x00, 0x04,                     /* Number of name characters */
    'M', 'Q', 'T', 'T',
    0x04
};
dflet 0:087b5655778d 712
dflet 0:087b5655778d 713 static inline uint16_t get_connect_vh_len(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 714 {
dflet 0:087b5655778d 715
dflet 0:087b5655778d 716 return (IS_PROTO_VER31(cl_ctx)? sizeof(mqtt310) : sizeof(mqtt311))
dflet 0:087b5655778d 717 + 3;
dflet 0:087b5655778d 718 }
dflet 0:087b5655778d 719
dflet 0:087b5655778d 720 static int32_t wr_connect_vh(struct client_ctx *cl_ctx, uint8_t *buf,
dflet 0:087b5655778d 721 uint16_t ka_secs, uint8_t conn_opts)
dflet 0:087b5655778d 722 {
dflet 0:087b5655778d 723
dflet 0:087b5655778d 724 uint8_t *ref = buf;
dflet 0:087b5655778d 725
dflet 0:087b5655778d 726 if(IS_PROTO_VER31(cl_ctx))
dflet 0:087b5655778d 727 buf += buf_wr_nbytes(buf, mqtt310, sizeof(mqtt310));
dflet 0:087b5655778d 728 else
dflet 0:087b5655778d 729 buf += buf_wr_nbytes(buf, mqtt311, sizeof(mqtt311));
dflet 0:087b5655778d 730
dflet 0:087b5655778d 731 *buf++ = conn_opts;
dflet 0:087b5655778d 732 buf += buf_wr_nbo_2B(buf, ka_secs);
dflet 0:087b5655778d 733
dflet 0:087b5655778d 734 return buf - ref;
dflet 0:087b5655778d 735 }
dflet 0:087b5655778d 736
dflet 0:087b5655778d 737 static int32_t net_connect(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 738 {
dflet 0:087b5655778d 739
dflet 0:087b5655778d 740 struct client_desc *client = CLIENT(cl_ctx);
dflet 0:087b5655778d 741
dflet 0:087b5655778d 742 if(NEED_NET_CLOSE(cl_ctx)) {
dflet 0:087b5655778d 743 Uart_Write((uint8_t*)"Return MQP_ERR_NOT_DEF\r\n");
dflet 0:087b5655778d 744 return MQP_ERR_NOT_DEF;
dflet 0:087b5655778d 745 }
dflet 0:087b5655778d 746 if(NULL == net_ops) {
dflet 0:087b5655778d 747 Uart_Write((uint8_t*)"Return MQP_ERR_NET_OPS\r\n");
dflet 0:087b5655778d 748 return MQP_ERR_NET_OPS;
dflet 0:087b5655778d 749 }
dflet 0:087b5655778d 750 cl_ctx->net = net_ops->open(client->nwconn_opts | DEV_NETCONN_OPT_TCP,
dflet 0:087b5655778d 751 client->server_addr,
dflet 0:087b5655778d 752 client->port_number,
dflet 0:087b5655778d 753 &client->nw_security);
dflet 0:087b5655778d 754
dflet 0:087b5655778d 755 return (-1 == cl_ctx->net)? MQP_ERR_NETWORK : 0;
dflet 0:087b5655778d 756 }
dflet 0:087b5655778d 757
dflet 0:087b5655778d 758 static
dflet 0:087b5655778d 759 int32_t cl_ctx_conn_state_try_locked(struct client_ctx *cl_ctx, const uint8_t *buf,
dflet 0:087b5655778d 760 uint32_t len, uint16_t ka_secs, bool clean_session,
dflet 0:087b5655778d 761 struct mqtt_packet *tx_mqp)
dflet 0:087b5655778d 762 {
dflet 0:087b5655778d 763
dflet 0:087b5655778d 764 int32_t rv = 0;
dflet 0:087b5655778d 765
dflet 0:087b5655778d 766 // MUTEX_LOCKIN();
dflet 0:087b5655778d 767 if( xSemaphore != NULL ) {
dflet 0:087b5655778d 768 // See if we can obtain the semaphore. If the semaphore is not available
dflet 0:087b5655778d 769 // wait 10 ticks to see if it becomes free.
dflet 0:087b5655778d 770 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 100 ) == pdTRUE ) {
dflet 0:087b5655778d 771 // We were able to obtain the semaphore and can now access the
dflet 0:087b5655778d 772 // shared resource.
dflet 0:087b5655778d 773
dflet 0:087b5655778d 774 rv = net_connect(cl_ctx);
dflet 0:087b5655778d 775 if(rv < 0) {
dflet 0:087b5655778d 776 Uart_Write((uint8_t*)"net_connect failed\r\n");
dflet 0:087b5655778d 777 goto cl_ctx_conn_state_try_locked_exit1;
dflet 0:087b5655778d 778 }
dflet 0:087b5655778d 779 /* Ensure LIB is initialized & CTX isn't awaiting CONNACK */
dflet 0:087b5655778d 780 rv = MQP_ERR_BADCALL;
dflet 0:087b5655778d 781 if(false == ((INIT_DONE_STATE != cl_lib_state) || (AWAITS_CONNACK(cl_ctx)))) {
dflet 0:087b5655778d 782 rv = cl_ctx_seg1_send(cl_ctx, buf, len, true, tx_mqp);
dflet 0:087b5655778d 783 }
dflet 0:087b5655778d 784 if(rv < 0) {
dflet 0:087b5655778d 785 Uart_Write((uint8_t*)"cl_ctx_seg1_send failed\r\n");
dflet 0:087b5655778d 786 goto cl_ctx_conn_state_try_locked_exit1; /* Fail */
dflet 0:087b5655778d 787 }
dflet 0:087b5655778d 788 /* Successfully sent CONNECT msg - let's do housekeeping */
dflet 0:087b5655778d 789 cl_ctx->timeout = net_ops->time();/* Fixup: CONN TX Time */
dflet 0:087b5655778d 790 cl_ctx->flags |= DO_CONNACK_TO_FLAG | CONNACK_AWAIT_FLAG;
dflet 0:087b5655778d 791 cl_ctx->flags |= clean_session? CLEAN_SESSION_FLAG : 0;
dflet 0:087b5655778d 792
dflet 0:087b5655778d 793 cl_ctx->ka_secs = ka_secs;
dflet 0:087b5655778d 794
dflet 0:087b5655778d 795 if(A_GROUP_MEMBER(cl_ctx)) {
dflet 0:087b5655778d 796 cl_ctx->next = conn_ctxs;
dflet 0:087b5655778d 797 conn_ctxs = cl_ctx;
dflet 0:087b5655778d 798
dflet 0:087b5655778d 799 /* First entry in 'conn_ctxs': schedule a move to
dflet 0:087b5655778d 800 'used_conn' (for house-keeping and tracking) */
dflet 0:087b5655778d 801 if(NULL == cl_ctx->next) {
dflet 0:087b5655778d 802 rv = loopb_trigger();
dflet 0:087b5655778d 803 }
dflet 0:087b5655778d 804 }
dflet 0:087b5655778d 805
dflet 0:087b5655778d 806
dflet 0:087b5655778d 807 // We have finished accessing the shared resource. Release the
dflet 0:087b5655778d 808 // semaphore.
dflet 0:087b5655778d 809 //xSemaphoreGive( xSemaphore );
dflet 0:087b5655778d 810 } else {
dflet 0:087b5655778d 811 // We could not obtain the semaphore and can therefore not access
dflet 0:087b5655778d 812 // the shared resource safely.
dflet 0:087b5655778d 813 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
dflet 0:087b5655778d 814 }
dflet 0:087b5655778d 815 }
dflet 0:087b5655778d 816
dflet 0:087b5655778d 817 cl_ctx_conn_state_try_locked_exit1:
dflet 0:087b5655778d 818 // MUTEX_UNLOCK();
dflet 0:087b5655778d 819 xSemaphoreGive(xSemaphore);
dflet 0:087b5655778d 820
dflet 0:087b5655778d 821 return rv;
dflet 0:087b5655778d 822 }
dflet 0:087b5655778d 823
dflet 0:087b5655778d 824 static
dflet 0:087b5655778d 825 int32_t connect_msg_send(struct client_ctx *cl_ctx, bool clean_session, uint16_t ka_secs)
dflet 0:087b5655778d 826 {
dflet 0:087b5655778d 827
dflet 0:087b5655778d 828 struct mqtt_packet *mqp = mqp_client_send_alloc(MQTT_CONNECT);
dflet 0:087b5655778d 829 uint8_t *buf, *ref, conn_opts = clean_session? CLEAN_START_VAL : 0;
dflet 0:087b5655778d 830 int32_t rv = MQP_ERR_PKT_LEN;
dflet 0:087b5655778d 831 uint32_t fsz; /* Free buffer size in PKT */
dflet 0:087b5655778d 832 uint16_t vhl = get_connect_vh_len(cl_ctx);
dflet 0:087b5655778d 833
dflet 0:087b5655778d 834 if(NULL == mqp) {
dflet 0:087b5655778d 835 Uart_Write((uint8_t*)"MQP_ERR_PKT_AVL\r\n");
dflet 0:087b5655778d 836 return MQP_ERR_PKT_AVL;
dflet 0:087b5655778d 837 }
dflet 0:087b5655778d 838 fsz = MQP_FREEBUF_LEN(mqp);
dflet 0:087b5655778d 839 if(fsz < vhl) {
dflet 0:087b5655778d 840 Uart_Write((uint8_t*)"No space for VAR HDR\r\n");
dflet 0:087b5655778d 841 goto connect_msg_send_exit1; /* No space for VAR HDR */
dflet 0:087b5655778d 842 }
dflet 0:087b5655778d 843 mqp->vh_len = vhl; /* Reserve buffer for variable header */
dflet 0:087b5655778d 844 buf = ref = MQP_PAYLOAD_BUF(mqp);/* Get started to incorporate payload */
dflet 0:087b5655778d 845
dflet 0:087b5655778d 846 rv = wr_connect_pl(cl_ctx, buf, fsz - vhl, &conn_opts);/* Payload data */
dflet 0:087b5655778d 847 if(rv < 0) {
dflet 0:087b5655778d 848 memset(print_buf, 0x00, PRINT_BUF_LEN);
dflet 0:087b5655778d 849 sprintf((char*) print_buf, "Payload WR failed %i\r\n",rv);
dflet 0:087b5655778d 850 Uart_Write((uint8_t *) print_buf);
dflet 0:087b5655778d 851 goto connect_msg_send_exit1; /* Payload WR failed */
dflet 0:087b5655778d 852 }
dflet 0:087b5655778d 853 buf += rv;
dflet 0:087b5655778d 854 mqp->pl_len = buf - ref;
dflet 0:087b5655778d 855
dflet 0:087b5655778d 856 wr_connect_vh(cl_ctx, ref - vhl, ka_secs,
dflet 0:087b5655778d 857 CLIENT(cl_ctx)->will_opts | conn_opts); /* Var Header */
dflet 0:087b5655778d 858
dflet 0:087b5655778d 859 mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, MQTT_QOS0, false));/* Fix Header */
dflet 0:087b5655778d 860 ref = MQP_FHEADER_BUF(mqp);
dflet 0:087b5655778d 861
dflet 0:087b5655778d 862 /* Following routine frees up MQP - whether error or not */
dflet 0:087b5655778d 863 return cl_ctx_conn_state_try_locked(cl_ctx, ref, buf - ref,
dflet 0:087b5655778d 864 ka_secs, clean_session,
dflet 0:087b5655778d 865 mqp);
dflet 0:087b5655778d 866 connect_msg_send_exit1:
dflet 0:087b5655778d 867
dflet 0:087b5655778d 868 if(mqp) {
dflet 0:087b5655778d 869 mqp_free_locked(mqp);
dflet 0:087b5655778d 870 }
dflet 0:087b5655778d 871 return rv;
dflet 0:087b5655778d 872 }
dflet 0:087b5655778d 873
dflet 0:087b5655778d 874 int32_t mqtt_connect_msg_send(void *ctx, bool clean_session, uint16_t ka_secs)
dflet 0:087b5655778d 875 {
dflet 0:087b5655778d 876
dflet 0:087b5655778d 877 return ctx?
dflet 0:087b5655778d 878 connect_msg_send(CL_CTX(ctx), clean_session, ka_secs) : -1;
dflet 0:087b5655778d 879 }
dflet 0:087b5655778d 880
dflet 0:087b5655778d 881 /*
dflet 0:087b5655778d 882 To be used for the following messages: PUBLISH, SUBSCRIBE, UNSUBSCRIBE
dflet 0:087b5655778d 883 Dispatches msg to broker over socket. Frees-up MQP, in case, MSG has QoS0 or
dflet 0:087b5655778d 884 if client-lib allocated MQP encounters an error in dispatch.
dflet 0:087b5655778d 885 Returns, on success, number of bytes transfered, otherwise -1
dflet 0:087b5655778d 886 */
dflet 0:087b5655778d 887 static int32_t _msg_dispatch(struct client_ctx *cl_ctx, struct mqtt_packet *mqp,
dflet 0:087b5655778d 888 enum mqtt_qos qos, bool retain)
dflet 0:087b5655778d 889 {
dflet 0:087b5655778d 890
dflet 0:087b5655778d 891 bool not_qos0 = (MQTT_QOS0 != qos)? true : false;
dflet 0:087b5655778d 892 uint16_t msg_id = mqp->msg_id;
dflet 0:087b5655778d 893 int32_t rv = MQP_ERR_NETWORK;
dflet 0:087b5655778d 894
dflet 0:087b5655778d 895 mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain));
dflet 0:087b5655778d 896
dflet 0:087b5655778d 897 // MUTEX_LOCKIN();
dflet 0:087b5655778d 898 if( xSemaphore != NULL ) {
dflet 0:087b5655778d 899 // See if we can obtain the semaphore. If the semaphore is not available
dflet 0:087b5655778d 900 // wait 10 ticks to see if it becomes free.
dflet 0:087b5655778d 901 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 200 ) == pdTRUE ) {
dflet 0:087b5655778d 902 // We were able to obtain the semaphore and can now access the
dflet 0:087b5655778d 903 // shared resource.
dflet 0:087b5655778d 904
dflet 0:087b5655778d 905 if(not_qos0) {
dflet 0:087b5655778d 906 mqp->n_refs++; /* Need to enlist, do not free-up MQP */
dflet 0:087b5655778d 907 }
dflet 0:087b5655778d 908 /* Tries to free-up MQP either on error or if full pkt is sent */
dflet 0:087b5655778d 909 rv = cl_ctx_seg1_send(cl_ctx, MQP_FHEADER_BUF(mqp),
dflet 0:087b5655778d 910 MQP_CONTENT_LEN(mqp), false,
dflet 0:087b5655778d 911 mqp);
dflet 0:087b5655778d 912
dflet 0:087b5655778d 913 /* At this point, error or not, QoS0 MQP would have been freed */
dflet 0:087b5655778d 914
dflet 0:087b5655778d 915 if((rv <= 0) && not_qos0) {
dflet 0:087b5655778d 916 mqp_free(mqp); /* Err: Explicitly free-up non QoS0 MQP */
dflet 0:087b5655778d 917 Uart_Write((uint8_t*)"cl_ctx_seg1_send failed\r\n");
dflet 0:087b5655778d 918 goto _msg_dispatch_exit1;
dflet 0:087b5655778d 919 }
dflet 0:087b5655778d 920
dflet 0:087b5655778d 921 rv = msg_id; /* Make progress for a good send to the server */
dflet 0:087b5655778d 922
dflet 0:087b5655778d 923 if(not_qos0) { /* Enlist non QOS0 MQP to await ACK from server */
dflet 0:087b5655778d 924 mqp_ack_wlist_append(&CLIENT(cl_ctx)->qos_ack1_wl, mqp);
dflet 0:087b5655778d 925 }
dflet 0:087b5655778d 926 // We have finished accessing the shared resource. Release the
dflet 0:087b5655778d 927 // semaphore.
dflet 0:087b5655778d 928 //xSemaphoreGive( xSemaphore );
dflet 0:087b5655778d 929 } else {
dflet 0:087b5655778d 930 // We could not obtain the semaphore and can therefore not access
dflet 0:087b5655778d 931 // the shared resource safely.
dflet 0:087b5655778d 932 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
dflet 0:087b5655778d 933 }
dflet 0:087b5655778d 934 }
dflet 0:087b5655778d 935
dflet 0:087b5655778d 936 _msg_dispatch_exit1:
dflet 0:087b5655778d 937
dflet 0:087b5655778d 938 // MUTEX_UNLOCK();
dflet 0:087b5655778d 939 xSemaphoreGive(xSemaphore);
dflet 0:087b5655778d 940
dflet 0:087b5655778d 941 return rv;
dflet 0:087b5655778d 942 }
dflet 0:087b5655778d 943
dflet 0:087b5655778d 944 static
dflet 0:087b5655778d 945 int32_t msg_dispatch_no_free(struct client_ctx *cl_ctx, struct mqtt_packet *mqp,
dflet 0:087b5655778d 946 enum mqtt_qos qos, bool retain)
dflet 0:087b5655778d 947 {
dflet 0:087b5655778d 948 if((NULL == mqp) || (NULL == cl_ctx))
dflet 0:087b5655778d 949 return MQP_ERR_FNPARAM;
dflet 0:087b5655778d 950
dflet 0:087b5655778d 951 mqp->n_refs++; /* Ensures caller that MQP is not freed-up */
dflet 0:087b5655778d 952
dflet 0:087b5655778d 953 return _msg_dispatch(cl_ctx, mqp, qos, retain);
dflet 0:087b5655778d 954 }
dflet 0:087b5655778d 955
dflet 0:087b5655778d 956 int32_t mqtt_client_pub_msg_send(void *ctx, const struct utf8_string *topic,
dflet 0:087b5655778d 957 const uint8_t *data_buf, uint32_t data_len,
dflet 0:087b5655778d 958 enum mqtt_qos qos, bool retain)
dflet 0:087b5655778d 959 {
dflet 0:087b5655778d 960
dflet 0:087b5655778d 961 struct mqtt_packet *mqp = NULL;
dflet 0:087b5655778d 962
dflet 0:087b5655778d 963 if((NULL == ctx) ||
dflet 0:087b5655778d 964 (NULL == topic) ||
dflet 0:087b5655778d 965 ((data_len > 0) && (NULL == data_buf))) {
dflet 0:087b5655778d 966 Uart_Write((uint8_t*)"MQP_ERR_FNPARAM\n\r");
dflet 0:087b5655778d 967 return MQP_ERR_FNPARAM;
dflet 0:087b5655778d 968 }
dflet 0:087b5655778d 969 if(false == is_valid_utf8_string(topic)) {
dflet 0:087b5655778d 970 Uart_Write((uint8_t*)"MQP_ERR_CONTENT\n\r");
dflet 0:087b5655778d 971 return MQP_ERR_CONTENT;
dflet 0:087b5655778d 972 }
dflet 0:087b5655778d 973 mqp = mqp_client_send_alloc(MQTT_PUBLISH);
dflet 0:087b5655778d 974 if(NULL == mqp) {
dflet 0:087b5655778d 975 Uart_Write((uint8_t*)"MQP_ERR_PKT_AVL\n\r");
dflet 0:087b5655778d 976 return MQP_ERR_PKT_AVL;
dflet 0:087b5655778d 977 }
dflet 0:087b5655778d 978 if((0 > mqp_pub_append_topic(mqp, topic, qos? assign_new_msg_id(): 0)) ||
dflet 0:087b5655778d 979 (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) {
dflet 0:087b5655778d 980 Uart_Write((uint8_t*)"len_err\n\r");
dflet 0:087b5655778d 981 return len_err_free_mqp(mqp);
dflet 0:087b5655778d 982 }
dflet 0:087b5655778d 983
dflet 0:087b5655778d 984 return _msg_dispatch(CL_CTX(ctx), mqp, qos, retain);
dflet 0:087b5655778d 985 }
dflet 0:087b5655778d 986
dflet 0:087b5655778d 987 int32_t mqtt_client_pub_dispatch(void *ctx, struct mqtt_packet *mqp,
dflet 0:087b5655778d 988 enum mqtt_qos qos, bool retain)
dflet 0:087b5655778d 989 {
dflet 0:087b5655778d 990 return msg_dispatch_no_free(CL_CTX(ctx), mqp, qos, retain);
dflet 0:087b5655778d 991 }
dflet 0:087b5655778d 992
dflet 0:087b5655778d 993 static int32_t tail_incorp_msg_id(struct mqtt_packet *mqp)
dflet 0:087b5655778d 994 {
dflet 0:087b5655778d 995 uint8_t *buf = MQP_FHEADER_BUF(mqp) + mqp->vh_len;
dflet 0:087b5655778d 996
dflet 0:087b5655778d 997 if(0 == mqp->msg_id) {
dflet 0:087b5655778d 998 mqp->msg_id = assign_new_msg_id();
dflet 0:087b5655778d 999 buf += buf_wr_nbo_2B(buf, mqp->msg_id);
dflet 0:087b5655778d 1000 mqp->vh_len += 2;
dflet 0:087b5655778d 1001
dflet 0:087b5655778d 1002 return 2;
dflet 0:087b5655778d 1003 }
dflet 0:087b5655778d 1004
dflet 0:087b5655778d 1005 return 0;
dflet 0:087b5655778d 1006 }
dflet 0:087b5655778d 1007
dflet 0:087b5655778d 1008 static int32_t buf_utf8_wr_try(uint8_t *buf, uint32_t fsz, const struct utf8_string *topic,
dflet 0:087b5655778d 1009 uint8_t qid)
dflet 0:087b5655778d 1010 {
dflet 0:087b5655778d 1011 uint8_t *ref = buf;
dflet 0:087b5655778d 1012
dflet 0:087b5655778d 1013 if(fsz < (topic->length + 2 + (QFL_VALUE == qid)? 0 : 1))
dflet 0:087b5655778d 1014 return MQP_ERR_PKT_LEN; /* No buf */
dflet 0:087b5655778d 1015
dflet 0:087b5655778d 1016 if(false == is_valid_utf8_string(topic))
dflet 0:087b5655778d 1017 return MQP_ERR_CONTENT;/* Invalid */
dflet 0:087b5655778d 1018
dflet 0:087b5655778d 1019 buf += mqp_buf_wr_utf8(buf, topic);
dflet 0:087b5655778d 1020 if(QFL_VALUE != qid)
dflet 0:087b5655778d 1021 *buf++ = qid;
dflet 0:087b5655778d 1022
dflet 0:087b5655778d 1023 return buf - ref;
dflet 0:087b5655778d 1024 }
dflet 0:087b5655778d 1025
dflet 0:087b5655778d 1026 static int32_t utf8_array_send(struct client_ctx *cl_ctx,
dflet 0:087b5655778d 1027 const struct utf8_strqos *subsc_vec,
dflet 0:087b5655778d 1028 const struct utf8_string *unsub_vec,
dflet 0:087b5655778d 1029 uint32_t n_elem)
dflet 0:087b5655778d 1030 {
dflet 0:087b5655778d 1031 struct mqtt_packet *mqp;
dflet 0:087b5655778d 1032 uint8_t *ref, *buf, *end;
dflet 0:087b5655778d 1033 uint32_t i;
dflet 0:087b5655778d 1034
dflet 0:087b5655778d 1035 if((NULL == cl_ctx) || !((!!subsc_vec) ^ (!!unsub_vec)) || (0 == n_elem))
dflet 0:087b5655778d 1036 return MQP_ERR_FNPARAM;
dflet 0:087b5655778d 1037
dflet 0:087b5655778d 1038 mqp = mqp_client_send_alloc(subsc_vec?
dflet 0:087b5655778d 1039 MQTT_SUBSCRIBE : MQTT_UNSUBSCRIBE);
dflet 0:087b5655778d 1040 if(NULL == mqp)
dflet 0:087b5655778d 1041 return MQP_ERR_PKT_AVL;
dflet 0:087b5655778d 1042
dflet 0:087b5655778d 1043 buf = MQP_VHEADER_BUF(mqp);
dflet 0:087b5655778d 1044 end = MQP_FREEBUF_LEN(mqp) + buf; /* End of free buffer */
dflet 0:087b5655778d 1045 if((end - buf) < 2)
dflet 0:087b5655778d 1046 return len_err_free_mqp(mqp);/* MSG-ID: no space */
dflet 0:087b5655778d 1047
dflet 0:087b5655778d 1048 buf += tail_incorp_msg_id(mqp);
dflet 0:087b5655778d 1049 ref = buf;
dflet 0:087b5655778d 1050
dflet 0:087b5655778d 1051 for(i = 0; i < n_elem; i++) {
dflet 0:087b5655778d 1052 const struct utf8_string *topic;
dflet 0:087b5655778d 1053 struct utf8_string topreq;
dflet 0:087b5655778d 1054 int32_t rv;
dflet 0:087b5655778d 1055
dflet 0:087b5655778d 1056 if(subsc_vec) {
dflet 0:087b5655778d 1057 topreq.length = subsc_vec[i].length;
dflet 0:087b5655778d 1058 topreq.buffer = subsc_vec[i].buffer;
dflet 0:087b5655778d 1059 topic = &topreq;
dflet 0:087b5655778d 1060 } else
dflet 0:087b5655778d 1061 topic = unsub_vec + i;
dflet 0:087b5655778d 1062
dflet 0:087b5655778d 1063 rv = buf_utf8_wr_try(buf, end - buf, topic, subsc_vec?
dflet 0:087b5655778d 1064 (uint8_t)subsc_vec[i].qosreq : QFL_VALUE);
dflet 0:087b5655778d 1065 if(rv < 0) {
dflet 0:087b5655778d 1066 mqp_free(mqp);
dflet 0:087b5655778d 1067 return rv;
dflet 0:087b5655778d 1068 }
dflet 0:087b5655778d 1069
dflet 0:087b5655778d 1070 buf += rv;
dflet 0:087b5655778d 1071 }
dflet 0:087b5655778d 1072
dflet 0:087b5655778d 1073 mqp->pl_len = buf - ref; /* Total length of topics data */
dflet 0:087b5655778d 1074
dflet 0:087b5655778d 1075 return _msg_dispatch(cl_ctx, mqp, MQTT_QOS1, false);
dflet 0:087b5655778d 1076 }
dflet 0:087b5655778d 1077
dflet 0:087b5655778d 1078 int32_t mqtt_sub_msg_send(void *ctx, const struct utf8_strqos *qos_topics, uint32_t count)
dflet 0:087b5655778d 1079 {
dflet 0:087b5655778d 1080 return utf8_array_send(CL_CTX(ctx), qos_topics, NULL, count);
dflet 0:087b5655778d 1081 }
dflet 0:087b5655778d 1082
dflet 0:087b5655778d 1083 int32_t mqtt_sub_dispatch(void *ctx, struct mqtt_packet *mqp)
dflet 0:087b5655778d 1084 {
dflet 0:087b5655778d 1085 return msg_dispatch_no_free(CL_CTX(ctx), mqp, MQTT_QOS1, false);
dflet 0:087b5655778d 1086 }
dflet 0:087b5655778d 1087
dflet 0:087b5655778d 1088 int32_t mqtt_unsub_msg_send(void *ctx, const struct utf8_string *topics, uint32_t count)
dflet 0:087b5655778d 1089 {
dflet 0:087b5655778d 1090 return utf8_array_send(CL_CTX(ctx), NULL, topics, count);
dflet 0:087b5655778d 1091 }
dflet 0:087b5655778d 1092
dflet 0:087b5655778d 1093 int32_t mqtt_unsub_dispatch(void *ctx, struct mqtt_packet *mqp)
dflet 0:087b5655778d 1094 {
dflet 0:087b5655778d 1095 return msg_dispatch_no_free(CL_CTX(ctx), mqp, MQTT_QOS1, false);
dflet 0:087b5655778d 1096 }
dflet 0:087b5655778d 1097
dflet 0:087b5655778d 1098 /* Note: in this revision of implementation, vh_msg_send() is being invoked
dflet 0:087b5655778d 1099 from a locked RX context. Should this situation change, so should the
dflet 0:087b5655778d 1100 'locking' considerations in the routine. */
dflet 0:087b5655778d 1101 static int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type,
dflet 0:087b5655778d 1102 enum mqtt_qos qos, bool has_vh,
dflet 0:087b5655778d 1103 uint16_t vh_data)
dflet 0:087b5655778d 1104 {
dflet 0:087b5655778d 1105 uint8_t buf[4];
dflet 0:087b5655778d 1106 uint32_t len = 2;
dflet 0:087b5655778d 1107
dflet 0:087b5655778d 1108 buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false));
dflet 0:087b5655778d 1109 buf[1] = has_vh ? 2 : 0;
dflet 0:087b5655778d 1110
dflet 0:087b5655778d 1111 if(has_vh)
dflet 0:087b5655778d 1112 len += buf_wr_nbo_2B(buf + 2, vh_data);
dflet 0:087b5655778d 1113
dflet 0:087b5655778d 1114 return cl_ctx_seg1_send(cl_ctx, buf, len, false, NULL);
dflet 0:087b5655778d 1115 }
dflet 0:087b5655778d 1116
dflet 0:087b5655778d 1117 static int32_t pingreq_send(struct client_ctx *cl_ctx, uint32_t rsp_flag)
dflet 0:087b5655778d 1118 {
dflet 0:087b5655778d 1119
dflet 0:087b5655778d 1120 int32_t rv = 0;
dflet 0:087b5655778d 1121 uint8_t buf[2];
dflet 0:087b5655778d 1122
dflet 0:087b5655778d 1123 buf[0] = MAKE_FH_BYTE1(MQTT_PINGREQ,
dflet 0:087b5655778d 1124 MAKE_FH_FLAGS(false, MQTT_QOS0, false));
dflet 0:087b5655778d 1125 buf[1] = 0;
dflet 0:087b5655778d 1126
dflet 0:087b5655778d 1127 /* Note: in case of error in network send, cl_ctx_send() may
dflet 0:087b5655778d 1128 try to terminate connection with server. */
dflet 0:087b5655778d 1129 rv = cl_ctx_seg1_send(cl_ctx, buf, 2, false, NULL);
dflet 0:087b5655778d 1130 if(rv > 0)
dflet 0:087b5655778d 1131 cl_ctx->flags |= rsp_flag;
dflet 0:087b5655778d 1132
dflet 0:087b5655778d 1133 return rv;
dflet 0:087b5655778d 1134 }
dflet 0:087b5655778d 1135
dflet 0:087b5655778d 1136 int32_t mqtt_pingreq_send(void *ctx)
dflet 0:087b5655778d 1137 {
dflet 0:087b5655778d 1138 Uart_Write((uint8_t*)"mqtt_pingreq_send\r\n");
dflet 0:087b5655778d 1139 int32_t rv = 0;
dflet 0:087b5655778d 1140
dflet 0:087b5655778d 1141 // MUTEX_LOCKIN();
dflet 0:087b5655778d 1142 if( xSemaphore != NULL ) {
dflet 0:087b5655778d 1143 // See if we can obtain the semaphore. If the semaphore is not available
dflet 0:087b5655778d 1144 // wait 10 ticks to see if it becomes free.
dflet 0:087b5655778d 1145 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
dflet 0:087b5655778d 1146 // We were able to obtain the semaphore and can now access the
dflet 0:087b5655778d 1147 // shared resource.
dflet 0:087b5655778d 1148 rv = pingreq_send(CL_CTX(ctx), USER_PING_RSP_FLAG);
dflet 0:087b5655778d 1149
dflet 0:087b5655778d 1150 // We have finished accessing the shared resource. Release the
dflet 0:087b5655778d 1151 // semaphore.
dflet 0:087b5655778d 1152 xSemaphoreGive( xSemaphore );
dflet 0:087b5655778d 1153
dflet 0:087b5655778d 1154 } else {
dflet 0:087b5655778d 1155 // We could not obtain the semaphore and can therefore not access
dflet 0:087b5655778d 1156 // the shared resource safely.
dflet 0:087b5655778d 1157 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
dflet 0:087b5655778d 1158 }
dflet 0:087b5655778d 1159 }
dflet 0:087b5655778d 1160
dflet 0:087b5655778d 1161 // MUTEX_UNLOCK();
dflet 0:087b5655778d 1162 return rv;
dflet 0:087b5655778d 1163 }
dflet 0:087b5655778d 1164
dflet 0:087b5655778d 1165 int32_t mqtt_disconn_send(void *ctx)
dflet 0:087b5655778d 1166 {
dflet 0:087b5655778d 1167 Uart_Write((uint8_t*)"mqtt_disconn_send\r\n");
dflet 0:087b5655778d 1168 uint8_t buf[2];
dflet 0:087b5655778d 1169
dflet 0:087b5655778d 1170 buf[0] = MAKE_FH_BYTE1(MQTT_DISCONNECT,
dflet 0:087b5655778d 1171 MAKE_FH_FLAGS(false, MQTT_QOS0, false));
dflet 0:087b5655778d 1172 buf[1] = 0;
dflet 0:087b5655778d 1173
dflet 0:087b5655778d 1174 // MUTEX_LOCKIN();
dflet 0:087b5655778d 1175 if( xSemaphore != NULL ) {
dflet 0:087b5655778d 1176 // See if we can obtain the semaphore. If the semaphore is not available
dflet 0:087b5655778d 1177 // wait 10 ticks to see if it becomes free.
dflet 0:087b5655778d 1178 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
dflet 0:087b5655778d 1179 // We were able to obtain the semaphore and can now access the
dflet 0:087b5655778d 1180 // shared resource.
dflet 0:087b5655778d 1181 /* Note: in case of error in network send, cl_ctx_send() may
dflet 0:087b5655778d 1182 try to terminate connection with server. */
dflet 0:087b5655778d 1183 if(cl_ctx_seg1_send(CL_CTX(ctx), buf, 2, false, NULL) > 0) {
dflet 0:087b5655778d 1184 /* Terminate connection on application's request */
dflet 0:087b5655778d 1185 do_net_close_tx(CL_CTX(ctx), "DISCONN");
dflet 0:087b5655778d 1186 }
dflet 0:087b5655778d 1187
dflet 0:087b5655778d 1188 // We have finished accessing the shared resource. Release the
dflet 0:087b5655778d 1189 // semaphore.
dflet 0:087b5655778d 1190 xSemaphoreGive( xSemaphore );
dflet 0:087b5655778d 1191
dflet 0:087b5655778d 1192 } else {
dflet 0:087b5655778d 1193 // We could not obtain the semaphore and can therefore not access
dflet 0:087b5655778d 1194 // the shared resource safely.
dflet 0:087b5655778d 1195 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
dflet 0:087b5655778d 1196 }
dflet 0:087b5655778d 1197 }
dflet 0:087b5655778d 1198
dflet 0:087b5655778d 1199 // MUTEX_UNLOCK();
dflet 0:087b5655778d 1200 return 0;
dflet 0:087b5655778d 1201 }
dflet 0:087b5655778d 1202
dflet 0:087b5655778d 1203 /*------------------------------------------------------------------------------
dflet 0:087b5655778d 1204 * MQTT RX Routines
dflet 0:087b5655778d 1205 *------------------------------------------------------------------------------
dflet 0:087b5655778d 1206 */
dflet 0:087b5655778d 1207 static bool ack1_wl_mqp_dispatch(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 1208 {
dflet 0:087b5655778d 1209 struct mqtt_ack_wlist *wlist = &CLIENT(cl_ctx)->qos_ack1_wl;
dflet 0:087b5655778d 1210 struct mqtt_packet *mqp = NULL;
dflet 0:087b5655778d 1211 bool rv = true;
dflet 0:087b5655778d 1212
dflet 0:087b5655778d 1213 for(mqp = wlist->head; mqp && (true == rv); mqp = mqp->next) {
dflet 0:087b5655778d 1214 uint8_t *buf = MQP_FHEADER_BUF(mqp);
dflet 0:087b5655778d 1215 mqp->fh_byte1 = *buf |= DUP_FLAG_VAL(true);
dflet 0:087b5655778d 1216
dflet 0:087b5655778d 1217 mqp->n_refs++; /* Ensures MQP is not freed by following */
dflet 0:087b5655778d 1218
dflet 0:087b5655778d 1219 /* Error or not, following routine tries to free up MQP */
dflet 0:087b5655778d 1220 if(cl_ctx_seg1_send(cl_ctx, buf, MQP_CONTENT_LEN(mqp),
dflet 0:087b5655778d 1221 false, mqp) <= 0)
dflet 0:087b5655778d 1222 rv = false;
dflet 0:087b5655778d 1223 }
dflet 0:087b5655778d 1224
dflet 0:087b5655778d 1225 return rv;
dflet 0:087b5655778d 1226 }
dflet 0:087b5655778d 1227
dflet 0:087b5655778d 1228 /* TBD candidate for common */
dflet 0:087b5655778d 1229 static bool ack2_msg_id_dispatch(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 1230 {
dflet 0:087b5655778d 1231 struct pub_qos2_cq *tx_cq = &CLIENT(cl_ctx)->qos2_tx_cq;
dflet 0:087b5655778d 1232 uint8_t rd_idx = tx_cq->rd_idx;
dflet 0:087b5655778d 1233 uint8_t n_free = tx_cq->n_free;
dflet 0:087b5655778d 1234 bool rv = true;
dflet 0:087b5655778d 1235 uint8_t i = 0;
dflet 0:087b5655778d 1236
dflet 0:087b5655778d 1237 for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free) && (true == rv); i++) {
dflet 0:087b5655778d 1238 if(vh_msg_send(cl_ctx, MQTT_PUBREL, MQTT_QOS1,
dflet 0:087b5655778d 1239 true, tx_cq->id_vec[i]) <= 0)
dflet 0:087b5655778d 1240 rv = false;
dflet 0:087b5655778d 1241 }
dflet 0:087b5655778d 1242
dflet 0:087b5655778d 1243 return rv;
dflet 0:087b5655778d 1244 }
dflet 0:087b5655778d 1245
dflet 0:087b5655778d 1246 static void session_resume(struct client_ctx *cl_ctx)
dflet 0:087b5655778d 1247 {
dflet 0:087b5655778d 1248 DBG_INFO("C: Re-send ACK awaited QoS1/2 msgs to net %d\n\r",
dflet 0:087b5655778d 1249 cl_ctx->net);
dflet 0:087b5655778d 1250
dflet 0:087b5655778d 1251 if(ack1_wl_mqp_dispatch(cl_ctx))
dflet 0:087b5655778d 1252 ack2_msg_id_dispatch(cl_ctx);
dflet 0:087b5655778d 1253
dflet 0:087b5655778d 1254 return;
dflet 0:087b5655778d 1255 }
dflet 0:087b5655778d 1256
dflet 0:087b5655778d 1257 static bool ack1_wl_rmfree(struct mqtt_ack_wlist *wl, uint16_t msg_id)
dflet 0:087b5655778d 1258 {
dflet 0:087b5655778d 1259 struct mqtt_packet *mqp = mqp_ack_wlist_remove(wl, msg_id);
dflet 0:087b5655778d 1260 if(NULL != mqp) {
dflet 0:087b5655778d 1261 mqp_free(mqp);
dflet 0:087b5655778d 1262 return true;
dflet 0:087b5655778d 1263 }
dflet 0:087b5655778d 1264
dflet 0:087b5655778d 1265 USR_INFO("Err: Unexpected ACK w/ ID 0x%04x\n\r", msg_id);
dflet 0:087b5655778d 1266
dflet 0:087b5655778d 1267 return false;
dflet 0:087b5655778d 1268 }
dflet 0:087b5655778d 1269
dflet 0:087b5655778d 1270 static bool _proc_pub_rec_rx(struct client_ctx *cl_ctx, uint16_t msg_id)
dflet 0:087b5655778d 1271 {
dflet 0:087b5655778d 1272 /* Follow-up messages for QOS2 PUB must be transacted in the
dflet 0:087b5655778d 1273 same order as the initial sequence of QOS2 PUB dispatches.
dflet 0:087b5655778d 1274 Therefore, checking the first entry should be OK
dflet 0:087b5655778d 1275 */
dflet 0:087b5655778d 1276 struct mqtt_packet *mqp = CLIENT(cl_ctx)->qos_ack1_wl.head;
dflet 0:087b5655778d 1277
dflet 0:087b5655778d 1278 if((msg_id == mqp->msg_id) && ack2_msg_id_logup(cl_ctx, msg_id)) {
dflet 0:087b5655778d 1279
dflet 0:087b5655778d 1280 ack1_wl_rmfree(&CLIENT(cl_ctx)->qos_ack1_wl, msg_id);
dflet 0:087b5655778d 1281
dflet 0:087b5655778d 1282 vh_msg_send(cl_ctx, MQTT_PUBREL, MQTT_QOS1,
dflet 0:087b5655778d 1283 true, msg_id);
dflet 0:087b5655778d 1284
dflet 0:087b5655778d 1285 return true;
dflet 0:087b5655778d 1286 }
dflet 0:087b5655778d 1287
dflet 0:087b5655778d 1288 return false; /* Unexpected PUBREC or QOS2 store exceeded */
dflet 0:087b5655778d 1289 }
dflet 0:087b5655778d 1290
dflet 0:087b5655778d 1291 static bool _proc_pub_rel_rx(struct client_ctx *cl_ctx, uint16_t msg_id)
dflet 0:087b5655778d 1292 {
dflet 0:087b5655778d 1293 /* For a PUB-REL RX, send PUBCOMP to let server make progress */
dflet 0:087b5655778d 1294 vh_msg_send(cl_ctx, MQTT_PUBCOMP, MQTT_QOS0, true, msg_id);
dflet 0:087b5655778d 1295
dflet 0:087b5655778d 1296 if(qos2_pub_rx_is_done(cl_ctx, msg_id))
dflet 0:087b5655778d 1297 qos2_pub_rx_unlog(cl_ctx, msg_id); /* Expunge record */
dflet 0:087b5655778d 1298
dflet 0:087b5655778d 1299 return true;
dflet 0:087b5655778d 1300 }
dflet 0:087b5655778d 1301
dflet 0:087b5655778d 1302 /*
dflet 0:087b5655778d 1303 Process ACK Message from Broker.
dflet 0:087b5655778d 1304 Returns true on success, otherwise false.
dflet 0:087b5655778d 1305 Used for: PUBACK, SUBACK and UNSUBACK
dflet 0:087b5655778d 1306 */
dflet 0:087b5655778d 1307 static
dflet 0:087b5655778d 1308 bool _proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:087b5655778d 1309 {
dflet 0:087b5655778d 1310 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
dflet 0:087b5655778d 1311 struct client_desc *client = CLIENT(cl_ctx);
dflet 0:087b5655778d 1312 uint16_t msg_id = mqp_raw->msg_id;
dflet 0:087b5655778d 1313 uint32_t len = mqp_raw->pl_len;
dflet 0:087b5655778d 1314
dflet 0:087b5655778d 1315 /* Caters to SUB-ACK, UNSUB-ACK and PUB-ACK Messages */
dflet 0:087b5655778d 1316 if(false == ack1_wl_rmfree(&client->qos_ack1_wl, msg_id))
dflet 0:087b5655778d 1317 return false; /* Err: MSG_ID was not awaited */
dflet 0:087b5655778d 1318
dflet 0:087b5655778d 1319 if(ctx_cbs->ack_notify)
dflet 0:087b5655778d 1320 ctx_cbs->ack_notify(client->app, mqp_raw->msg_type, msg_id,
dflet 0:087b5655778d 1321 len? MQP_PAYLOAD_BUF(mqp_raw): NULL,
dflet 0:087b5655778d 1322 len);
dflet 0:087b5655778d 1323 return true;
dflet 0:087b5655778d 1324 }
dflet 0:087b5655778d 1325
dflet 0:087b5655778d 1326 static
dflet 0:087b5655778d 1327 bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:087b5655778d 1328 {
dflet 0:087b5655778d 1329 uint8_t msg_type = mqp_raw->msg_type;
dflet 0:087b5655778d 1330 bool rv = false;
dflet 0:087b5655778d 1331 uint16_t msg_id = 0;
dflet 0:087b5655778d 1332
dflet 0:087b5655778d 1333 if(false == mqp_proc_msg_id_ack_rx(mqp_raw, MQTT_SUBACK == msg_type))
dflet 0:087b5655778d 1334 return rv; /* Problem in contents received from server */
dflet 0:087b5655778d 1335
dflet 0:087b5655778d 1336 msg_id = mqp_raw->msg_id;
dflet 0:087b5655778d 1337
dflet 0:087b5655778d 1338 if(MQTT_PUBREC == msg_type) {
dflet 0:087b5655778d 1339 rv = _proc_pub_rec_rx(cl_ctx, msg_id);
dflet 0:087b5655778d 1340 if(rv)
dflet 0:087b5655778d 1341 MQP_RX_DO_NOT_RPT_SET(mqp_raw); /* Don't report to App */
dflet 0:087b5655778d 1342
dflet 0:087b5655778d 1343 } else if(MQTT_PUBREL == msg_type) {
dflet 0:087b5655778d 1344 rv = _proc_pub_rel_rx(cl_ctx, msg_id);
dflet 0:087b5655778d 1345 if(rv)
dflet 0:087b5655778d 1346 MQP_RX_DO_NOT_RPT_SET(mqp_raw); /* Don't report to App */
dflet 0:087b5655778d 1347 } else if(MQTT_PUBCOMP == msg_type) {
dflet 0:087b5655778d 1348 rv = ack2_msg_id_unlog(cl_ctx, msg_id);
dflet 0:087b5655778d 1349 } else {
dflet 0:087b5655778d 1350 rv = _proc_ack_msg_rx(cl_ctx, mqp_raw);
dflet 0:087b5655778d 1351 }
dflet 0:087b5655778d 1352
dflet 0:087b5655778d 1353 return rv;
dflet 0:087b5655778d 1354 }
dflet 0:087b5655778d 1355
dflet 0:087b5655778d 1356 static
dflet 0:087b5655778d 1357 bool proc_pub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:087b5655778d 1358 {
dflet 0:087b5655778d 1359 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
dflet 0:087b5655778d 1360 bool good_pub = mqp_proc_pub_rx(mqp_raw);
dflet 0:087b5655778d 1361 uint8_t B = mqp_raw->fh_byte1;
dflet 0:087b5655778d 1362 enum mqtt_qos qos = ENUM_QOS(B);
dflet 0:087b5655778d 1363 uint16_t msg_id = 0;
dflet 0:087b5655778d 1364
dflet 0:087b5655778d 1365 if(false == good_pub)
dflet 0:087b5655778d 1366 return false; /* Didn't get nicely composed PUB Packet */
dflet 0:087b5655778d 1367
dflet 0:087b5655778d 1368 msg_id = mqp_raw->msg_id;
dflet 0:087b5655778d 1369
dflet 0:087b5655778d 1370 /* Irrespective of the result of the ACK through vh_msg_send(),
dflet 0:087b5655778d 1371 the implementation has chosen to process the good PUB packet.
dflet 0:087b5655778d 1372 Any error will be handled in next iteration of rx processing.
dflet 0:087b5655778d 1373 */
dflet 0:087b5655778d 1374 if(MQTT_QOS1 == qos)
dflet 0:087b5655778d 1375 vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id);
dflet 0:087b5655778d 1376
dflet 0:087b5655778d 1377 if(MQTT_QOS2 == qos) {
dflet 0:087b5655778d 1378 /* Ensuring "only once" philosophy for MQTT QoS2 PUBs */
dflet 0:087b5655778d 1379 if(qos2_pub_rx_is_done(cl_ctx, msg_id)) {
dflet 0:087b5655778d 1380 /* Already delivered. Drop it & do not report */
dflet 0:087b5655778d 1381 MQP_RX_DO_NOT_RPT_SET(mqp_raw);
dflet 0:087b5655778d 1382 return true; /* No more follow-up; all's good */
dflet 0:087b5655778d 1383 }
dflet 0:087b5655778d 1384
dflet 0:087b5655778d 1385 if(false == qos2_pub_rx_logup(cl_ctx, msg_id))
dflet 0:087b5655778d 1386 return false; /* Failed to record New RX PUB */
dflet 0:087b5655778d 1387
dflet 0:087b5655778d 1388 vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id);
dflet 0:087b5655778d 1389 }
dflet 0:087b5655778d 1390
dflet 0:087b5655778d 1391 /* QoS obliations completed, present PUBLISH RX packet to app */
dflet 0:087b5655778d 1392 if(ctx_cbs->publish_rx) {
dflet 0:087b5655778d 1393 /* App has chosen the callback method to receive PKT */
dflet 0:087b5655778d 1394 mqp_raw->n_refs++; /* Make app owner of this packet */
dflet 0:087b5655778d 1395 if(ctx_cbs->publish_rx(CLIENT(cl_ctx)->app, BOOL_DUP(B),
dflet 0:087b5655778d 1396 qos, BOOL_RETAIN(B), mqp_raw)) {
dflet 0:087b5655778d 1397 /* App has no use of PKT any more, so free it */
dflet 0:087b5655778d 1398 mqp_raw->n_refs--; /* Take back ownership */
dflet 0:087b5655778d 1399 }
dflet 0:087b5655778d 1400 }
dflet 0:087b5655778d 1401
dflet 0:087b5655778d 1402 return true;
dflet 0:087b5655778d 1403 }
dflet 0:087b5655778d 1404
dflet 0:087b5655778d 1405 static
dflet 0:087b5655778d 1406 bool proc_connack_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:087b5655778d 1407 {
dflet 0:087b5655778d 1408 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
dflet 0:087b5655778d 1409 uint8_t *buf = MQP_VHEADER_BUF(mqp_raw);
dflet 0:087b5655778d 1410
dflet 0:087b5655778d 1411 mqp_raw->vh_len += 2;
dflet 0:087b5655778d 1412 mqp_raw->pl_len -= 2;
dflet 0:087b5655778d 1413
dflet 0:087b5655778d 1414 if(0 != mqp_raw->pl_len)
dflet 0:087b5655778d 1415 return false; /* There is no payload in message */
dflet 0:087b5655778d 1416
dflet 0:087b5655778d 1417 cl_ctx->flags &= ~(DO_CONNACK_TO_FLAG | CONNACK_AWAIT_FLAG);
dflet 0:087b5655778d 1418
dflet 0:087b5655778d 1419 if(VHB_CONNACK_RC(buf))
dflet 0:087b5655778d 1420 /* Server has refused the connection, inform the app */
dflet 0:087b5655778d 1421 goto proc_connack_rx_exit1;
dflet 0:087b5655778d 1422
dflet 0:087b5655778d 1423 cl_ctx->flags |= NOW_CONNECTED_FLAG;
dflet 0:087b5655778d 1424 cl_ctx_timeout_update(cl_ctx, net_ops->time()); /* start KA */
dflet 0:087b5655778d 1425
dflet 0:087b5655778d 1426 if(IS_CLN_SESSION(cl_ctx))
dflet 0:087b5655778d 1427 session_delete(cl_ctx);
dflet 0:087b5655778d 1428 else
dflet 0:087b5655778d 1429 session_resume(cl_ctx);
dflet 0:087b5655778d 1430
dflet 0:087b5655778d 1431 proc_connack_rx_exit1:
dflet 0:087b5655778d 1432 if(ctx_cbs->ack_notify)
dflet 0:087b5655778d 1433 ctx_cbs->ack_notify(CLIENT(cl_ctx)->app, mqp_raw->msg_type,
dflet 0:087b5655778d 1434 0, buf, 2);
dflet 0:087b5655778d 1435
dflet 0:087b5655778d 1436 return true;
dflet 0:087b5655778d 1437 }
dflet 0:087b5655778d 1438
dflet 0:087b5655778d 1439 static
dflet 0:087b5655778d 1440 bool proc_pingrsp_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:087b5655778d 1441 {
dflet 0:087b5655778d 1442 struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx);
dflet 0:087b5655778d 1443
dflet 0:087b5655778d 1444 if(0 != mqp_raw->pl_len)
dflet 0:087b5655778d 1445 return false;
dflet 0:087b5655778d 1446
dflet 0:087b5655778d 1447 if(AWAITS_KA_PING(cl_ctx)) {
dflet 0:087b5655778d 1448 cl_ctx->flags &= ~KA_PINGER_RSP_FLAG;
dflet 0:087b5655778d 1449 return true;
dflet 0:087b5655778d 1450 }
dflet 0:087b5655778d 1451
dflet 0:087b5655778d 1452 if(AWAITS_PINGRSP(cl_ctx)) {
dflet 0:087b5655778d 1453 cl_ctx->flags &= ~USER_PING_RSP_FLAG;
dflet 0:087b5655778d 1454 if(ctx_cbs->ack_notify)
dflet 0:087b5655778d 1455 ctx_cbs->ack_notify(CLIENT(cl_ctx)->app,
dflet 0:087b5655778d 1456 mqp_raw->msg_type,
dflet 0:087b5655778d 1457 0, NULL, 0);
dflet 0:087b5655778d 1458 return true;
dflet 0:087b5655778d 1459 }
dflet 0:087b5655778d 1460
dflet 0:087b5655778d 1461 return false;
dflet 0:087b5655778d 1462 }
dflet 0:087b5655778d 1463
dflet 0:087b5655778d 1464 static
dflet 0:087b5655778d 1465 bool conn_sent_state_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:087b5655778d 1466 {
dflet 0:087b5655778d 1467 bool rv = false;
dflet 0:087b5655778d 1468
dflet 0:087b5655778d 1469 switch(mqp_raw->msg_type) {
dflet 0:087b5655778d 1470
dflet 0:087b5655778d 1471 case MQTT_CONNACK:
dflet 0:087b5655778d 1472 /* Changes client_ctx->flags to CONNECTED */
dflet 0:087b5655778d 1473 rv = proc_connack_rx(cl_ctx, mqp_raw);
dflet 0:087b5655778d 1474 break;
dflet 0:087b5655778d 1475
dflet 0:087b5655778d 1476 default:
dflet 0:087b5655778d 1477 break;
dflet 0:087b5655778d 1478 }
dflet 0:087b5655778d 1479
dflet 0:087b5655778d 1480 return rv;
dflet 0:087b5655778d 1481 }
dflet 0:087b5655778d 1482
dflet 0:087b5655778d 1483 static
dflet 0:087b5655778d 1484 bool connected_state_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw)
dflet 0:087b5655778d 1485 {
dflet 0:087b5655778d 1486 bool rv = false;
dflet 0:087b5655778d 1487
dflet 0:087b5655778d 1488 switch(mqp_raw->msg_type) {
dflet 0:087b5655778d 1489
dflet 0:087b5655778d 1490 case MQTT_SUBACK:
dflet 0:087b5655778d 1491 case MQTT_PUBACK:
dflet 0:087b5655778d 1492 case MQTT_PUBREC:
dflet 0:087b5655778d 1493 case MQTT_PUBREL:
dflet 0:087b5655778d 1494 case MQTT_PUBCOMP:
dflet 0:087b5655778d 1495 case MQTT_UNSUBACK:
dflet 0:087b5655778d 1496 rv = proc_ack_msg_rx(cl_ctx, mqp_raw);
dflet 0:087b5655778d 1497 break;
dflet 0:087b5655778d 1498
dflet 0:087b5655778d 1499 case MQTT_PINGRSP:
dflet 0:087b5655778d 1500 rv = proc_pingrsp_rx(cl_ctx, mqp_raw);
dflet 0:087b5655778d 1501 break;
dflet 0:087b5655778d 1502
dflet 0:087b5655778d 1503 case MQTT_PUBLISH:
dflet 0:087b5655778d 1504 rv = proc_pub_msg_rx(cl_ctx, mqp_raw);
dflet 0:087b5655778d 1505 break;
dflet 0:087b5655778d 1506
dflet 0:087b5655778d 1507 case MQTT_CONNACK: /* not expected */
dflet 0:087b5655778d 1508 default:
dflet 0:087b5655778d 1509 break;
dflet 0:087b5655778d 1510 }
dflet 0:087b5655778d 1511
dflet 0:087b5655778d 1512 return rv;
dflet 0:087b5655778d 1513 }
dflet 0:087b5655778d 1514
dflet 0:087b5655778d 1515 static bool process_recv(struct client_ctx *cl_ctx,
dflet 0:087b5655778d 1516 struct mqtt_packet *mqp_raw)
dflet 0:087b5655778d 1517 {
dflet 0:087b5655778d 1518 bool rv;
dflet 0:087b5655778d 1519
dflet 0:087b5655778d 1520 USR_INFO("C: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d [@ %u]\n\r",
dflet 0:087b5655778d 1521 mqp_raw->fh_byte1, cl_ctx->net, net_ops->time());
dflet 0:087b5655778d 1522
dflet 0:087b5655778d 1523 /* Working Principle: Only RX processing errors should be
dflet 0:087b5655778d 1524 reported as 'false'. Status of TX as a follow-up to RX
dflet 0:087b5655778d 1525 messages need not be reported by the xyz_rx() routines.
dflet 0:087b5655778d 1526 Error observed in TX is either dealt in next iteration
dflet 0:087b5655778d 1527 of RX loop (in case, there is a dedicated RX task for
dflet 0:087b5655778d 1528 the CTX) or in TX routine itself (in case, there is no
dflet 0:087b5655778d 1529 dedicated RX task for the CTX).
dflet 0:087b5655778d 1530 */
dflet 0:087b5655778d 1531 rv = AWAITS_CONNACK(cl_ctx)?
dflet 0:087b5655778d 1532 conn_sent_state_rx(cl_ctx, mqp_raw) :
dflet 0:087b5655778d 1533 connected_state_rx(cl_ctx, mqp_raw);
dflet 0:087b5655778d 1534
dflet 0:087b5655778d 1535 DBG_INFO("C: Msg w/ ID 0x%04x, processing status: %s\n\r",
dflet 0:087b5655778d 1536 mqp_raw->msg_id, rv? "Good" : "Fail");
dflet 0:087b5655778d 1537
dflet 0:087b5655778d 1538 return rv;
dflet 0:087b5655778d 1539 }
dflet 0:087b5655778d 1540
dflet 0:087b5655778d 1541 static int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, void *ctx)
dflet 0:087b5655778d 1542 {
dflet 0:087b5655778d 1543 bool timed_out = false;
dflet 0:087b5655778d 1544 int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, &timed_out, ctx);
dflet 0:087b5655778d 1545 if(rv <= 0) {
dflet 0:087b5655778d 1546 USR_INFO("C: Net %d, Raw Error %d, Time Out: %c\n\r",
dflet 0:087b5655778d 1547 net, rv, timed_out? 'Y' : 'N');
dflet 0:087b5655778d 1548
dflet 0:087b5655778d 1549 if(timed_out)
dflet 0:087b5655778d 1550 rv = MQP_ERR_TIMEOUT;
dflet 0:087b5655778d 1551 }
dflet 0:087b5655778d 1552
dflet 0:087b5655778d 1553 return rv;
dflet 0:087b5655778d 1554 }
dflet 0:087b5655778d 1555
/*
   MQTT 3.1.1 implementation
   -------------------------

   Keep Alive Time is the maximum interval within which a client should send
   a packet to the broker. If there are either no packets to be sent to the
   broker or no retries left, then the client is expected to send a PINGREQ
   within the Keep Alive Time. The broker should respond by sending a PINGRSP
   within a reasonable time of 'wait_secs'. If the Keep Alive Time is set as 0,
   then the client is not expected to be disconnected due to inactivity of
   MQTT messages. The value of 'wait_secs' is assumed to be much smaller than
   the (non-zero) 'ka_secs'.
*/
dflet 0:087b5655778d 1568 static void conn2used_ctxs(uint32_t wait_secs)
dflet 0:087b5655778d 1569 {
dflet 0:087b5655778d 1570
dflet 0:087b5655778d 1571 while(conn_ctxs) {
dflet 0:087b5655778d 1572 struct client_ctx *cl_ctx = conn_ctxs;
dflet 0:087b5655778d 1573 conn_ctxs = conn_ctxs->next;
dflet 0:087b5655778d 1574
dflet 0:087b5655778d 1575 cl_ctx_timeout_insert(&used_ctxs, cl_ctx);
dflet 0:087b5655778d 1576 }
dflet 0:087b5655778d 1577 }
dflet 0:087b5655778d 1578
dflet 0:087b5655778d 1579 static int32_t single_ctx_ka_sequence(struct client_ctx *cl_ctx, uint32_t wait_secs)
dflet 0:087b5655778d 1580 {
dflet 0:087b5655778d 1581
dflet 0:087b5655778d 1582 uint32_t now_secs = net_ops->time();
dflet 0:087b5655778d 1583
dflet 0:087b5655778d 1584 if(AWAITS_CONNACK(cl_ctx) && CFG_CONNACK_TO(cl_ctx)) {
dflet 0:087b5655778d 1585 cl_ctx->timeout += wait_secs; /* Set CONNACK timeout value */
dflet 0:087b5655778d 1586 cl_ctx->flags &= ~DO_CONNACK_TO_FLAG;
dflet 0:087b5655778d 1587 }
dflet 0:087b5655778d 1588
dflet 0:087b5655778d 1589 if(cl_ctx->timeout > now_secs) {
dflet 0:087b5655778d 1590 return 1; /* Still have time for next message transaction */
dflet 0:087b5655778d 1591 }
dflet 0:087b5655778d 1592 if(is_connected(cl_ctx)) {
dflet 0:087b5655778d 1593 /* Timeout has happened. Check for PINGRESP if PINGREQ done.
dflet 0:087b5655778d 1594 Otherwise, send PINGREQ (Are You there?) to the server. */
dflet 0:087b5655778d 1595 if(AWAITS_KA_PING(cl_ctx)) {
dflet 0:087b5655778d 1596 goto single_ctx_ka_sequence_exit1; /* No PINGRESP */
dflet 0:087b5655778d 1597 }
dflet 0:087b5655778d 1598 return pingreq_send(cl_ctx, KA_PINGER_RSP_FLAG); /* Hello! */
dflet 0:087b5655778d 1599 }
dflet 0:087b5655778d 1600
dflet 0:087b5655778d 1601 single_ctx_ka_sequence_exit1:
dflet 0:087b5655778d 1602
dflet 0:087b5655778d 1603 USR_INFO("C: Net %d, no RX MSG in reasonable time\n\r", cl_ctx->net);
dflet 0:087b5655778d 1604 return -1;
dflet 0:087b5655778d 1605 }
dflet 0:087b5655778d 1606
dflet 0:087b5655778d 1607 static uint32_t single_ctx_adj_wait_secs_get(struct client_ctx *cl_ctx, uint32_t wait_secs)
dflet 0:087b5655778d 1608 {
dflet 0:087b5655778d 1609
dflet 0:087b5655778d 1610 return (KA_TIMEOUT_NONE != cl_ctx->timeout)?
dflet 0:087b5655778d 1611 MIN(cl_ctx->timeout - net_ops->time(), wait_secs) : wait_secs;
dflet 0:087b5655778d 1612 }
dflet 0:087b5655778d 1613
dflet 0:087b5655778d 1614 static int32_t single_ctx_rx_prep(struct client_ctx *cl_ctx, uint32_t *secs2wait)
dflet 0:087b5655778d 1615 {
dflet 0:087b5655778d 1616
dflet 0:087b5655778d 1617 int32_t rv;
dflet 0:087b5655778d 1618
dflet 0:087b5655778d 1619 if(-1 == cl_ctx->net)
dflet 0:087b5655778d 1620 return MQP_ERR_NOTCONN; /* Likely for a ctx w/o a receive task */
dflet 0:087b5655778d 1621
dflet 0:087b5655778d 1622 if(NEED_NET_CLOSE(cl_ctx))
dflet 0:087b5655778d 1623 rv = MQP_ERR_NOTCONN;
dflet 0:087b5655778d 1624 else if(0 > single_ctx_ka_sequence(cl_ctx, *secs2wait))
dflet 0:087b5655778d 1625 rv = MQP_ERR_NETWORK;
dflet 0:087b5655778d 1626 else {
dflet 0:087b5655778d 1627 *secs2wait = single_ctx_adj_wait_secs_get(cl_ctx, *secs2wait);
dflet 0:087b5655778d 1628 return 1;
dflet 0:087b5655778d 1629 }
dflet 0:087b5655778d 1630
dflet 0:087b5655778d 1631 do_net_close_rx(cl_ctx, rv);
dflet 0:087b5655778d 1632 return rv;
dflet 0:087b5655778d 1633 }
dflet 0:087b5655778d 1634
dflet 0:087b5655778d 1635 static
dflet 0:087b5655778d 1636 int32_t proc_ctx_data_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp,
dflet 0:087b5655778d 1637 uint32_t wait_secs, void **app)
dflet 0:087b5655778d 1638 {
dflet 0:087b5655778d 1639
dflet 0:087b5655778d 1640 int32_t rv = MQP_ERR_NOTCONN;
dflet 0:087b5655778d 1641 int32_t net = cl_ctx->net;
dflet 0:087b5655778d 1642
dflet 0:087b5655778d 1643 *app = cl_ctx->usr;
dflet 0:087b5655778d 1644
dflet 0:087b5655778d 1645 rv = net_recv(net, mqp, wait_secs, (void*)cl_ctx);
dflet 0:087b5655778d 1646
dflet 0:087b5655778d 1647 // MUTEX_LOCKIN();
dflet 0:087b5655778d 1648 if( xSemaphore != NULL ) {
dflet 0:087b5655778d 1649 // See if we can obtain the semaphore. If the semaphore is not available
dflet 0:087b5655778d 1650 // wait 10 ticks to see if it becomes free.
dflet 0:087b5655778d 1651 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
dflet 0:087b5655778d 1652 // We were able to obtain the semaphore and can now access the
dflet 0:087b5655778d 1653 // shared resource.
dflet 0:087b5655778d 1654 if(rv > 0) {
dflet 0:087b5655778d 1655 if(false == process_recv(cl_ctx, mqp)) {
dflet 0:087b5655778d 1656 Uart_Write((uint8_t*)"MQP_ERR_CONTENT\r\n");
dflet 0:087b5655778d 1657 rv = MQP_ERR_CONTENT;
dflet 0:087b5655778d 1658 }
dflet 0:087b5655778d 1659 }
dflet 0:087b5655778d 1660 /* RX: close the network connection to the server for this context, if
dflet 0:087b5655778d 1661 (a) there is a processing / protocol error other than time-out
dflet 0:087b5655778d 1662 (b) A good MQTT CONNACK has a return code - connection refused
dflet 0:087b5655778d 1663 */
dflet 0:087b5655778d 1664 if(((rv < 0) && (rv != MQP_ERR_TIMEOUT)) ||
dflet 0:087b5655778d 1665 ((MQTT_CONNACK == mqp->msg_type) &&
dflet 0:087b5655778d 1666 MQP_CONNACK_RC(mqp))) {
dflet 0:087b5655778d 1667 do_net_close_rx(cl_ctx, rv);
dflet 0:087b5655778d 1668 }
dflet 0:087b5655778d 1669
dflet 0:087b5655778d 1670 // We have finished accessing the shared resource. Release the
dflet 0:087b5655778d 1671 // semaphore.
dflet 0:087b5655778d 1672 xSemaphoreGive( xSemaphore );
dflet 0:087b5655778d 1673
dflet 0:087b5655778d 1674 } else {
dflet 0:087b5655778d 1675 // We could not obtain the semaphore and can therefore not access
dflet 0:087b5655778d 1676 // the shared resource safely.
dflet 0:087b5655778d 1677 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
dflet 0:087b5655778d 1678 }
dflet 0:087b5655778d 1679 }
dflet 0:087b5655778d 1680
dflet 0:087b5655778d 1681 // MUTEX_UNLOCK();
dflet 0:087b5655778d 1682 return rv;
dflet 0:087b5655778d 1683 }
dflet 0:087b5655778d 1684
dflet 0:087b5655778d 1685 static int32_t mqp_setup_proc_ctx_data_recv(struct client_ctx *cl_ctx,
dflet 0:087b5655778d 1686 struct mqtt_packet *mqp,
dflet 0:087b5655778d 1687 uint32_t wait_secs, void **app)
dflet 0:087b5655778d 1688 {
dflet 0:087b5655778d 1689 struct mqtt_packet *rx_mqp = CLIENT(cl_ctx)->rx_mqp;
dflet 0:087b5655778d 1690 int32_t rv;
dflet 0:087b5655778d 1691
dflet 0:087b5655778d 1692 if(NULL != mqp) {
dflet 0:087b5655778d 1693 /* Input MQP must be same as MQP for partial RX, if any */
dflet 0:087b5655778d 1694 if(rx_mqp) {
dflet 0:087b5655778d 1695 if(mqp != rx_mqp)
dflet 0:087b5655778d 1696 return MQP_ERR_FNPARAM;
dflet 0:087b5655778d 1697 } else
dflet 0:087b5655778d 1698 mqp_reset(mqp);
dflet 0:087b5655778d 1699 }
dflet 0:087b5655778d 1700
dflet 0:087b5655778d 1701 if(NULL == mqp) {
dflet 0:087b5655778d 1702 mqp = rx_mqp? rx_mqp : mqp_client_recv_alloc(0);
dflet 0:087b5655778d 1703 if(NULL == mqp)
dflet 0:087b5655778d 1704 return MQP_ERR_PKT_AVL;
dflet 0:087b5655778d 1705 }
dflet 0:087b5655778d 1706
dflet 0:087b5655778d 1707 rv = proc_ctx_data_recv(cl_ctx, mqp, wait_secs, app);
dflet 0:087b5655778d 1708 if(rv == MQP_ERR_TIMEOUT) {
dflet 0:087b5655778d 1709 CLIENT(cl_ctx)->rx_mqp = mqp; /* Save partial RX MQP */
dflet 0:087b5655778d 1710 } else {
dflet 0:087b5655778d 1711 /* Control reaches here due to either an error in RX or the
dflet 0:087b5655778d 1712 completion of RX. In both the cases, the MQP needs to be
dflet 0:087b5655778d 1713 detached and processed. For completion of RX:
dflet 0:087b5655778d 1714 callback mode: Application has used up MQP data; free it
dflet 0:087b5655778d 1715 Non-callback mode: Application will now use complete MQP
dflet 0:087b5655778d 1716 */
dflet 0:087b5655778d 1717 CLIENT(cl_ctx)->rx_mqp = NULL;
dflet 0:087b5655778d 1718 if(mqp->free)
dflet 0:087b5655778d 1719 mqp_free_locked(mqp); /* For only callback mode */
dflet 0:087b5655778d 1720 }
dflet 0:087b5655778d 1721
dflet 0:087b5655778d 1722 return rv;
dflet 0:087b5655778d 1723 }
dflet 0:087b5655778d 1724
dflet 0:087b5655778d 1725 static int32_t cl_ctx_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp,
dflet 0:087b5655778d 1726 uint32_t wait_secs)
dflet 0:087b5655778d 1727 {
dflet 0:087b5655778d 1728
dflet 0:087b5655778d 1729 void *app = NULL;
dflet 0:087b5655778d 1730 int32_t rv = 0;
dflet 0:087b5655778d 1731
dflet 0:087b5655778d 1732 do {
dflet 0:087b5655778d 1733 if(mqp && (NULL == CLIENT(cl_ctx)->rx_mqp))
dflet 0:087b5655778d 1734 mqp_reset(mqp);
dflet 0:087b5655778d 1735
dflet 0:087b5655778d 1736 rv = single_ctx_rx_prep(cl_ctx, &wait_secs);
dflet 0:087b5655778d 1737 if(rv > 0)
dflet 0:087b5655778d 1738 rv = mqp_setup_proc_ctx_data_recv(cl_ctx, mqp,
dflet 0:087b5655778d 1739 wait_secs,
dflet 0:087b5655778d 1740 &app);
dflet 0:087b5655778d 1741
dflet 0:087b5655778d 1742 /* 'mqp' must be valid, if rv > 0 but including further
dflet 0:087b5655778d 1743 & additional check for sake of static cod eanalysis.*/
dflet 0:087b5655778d 1744 } while((rv > 0) && mqp && MQP_RX_DO_NOT_RPT_COR(mqp));
dflet 0:087b5655778d 1745
dflet 0:087b5655778d 1746 return rv;
dflet 0:087b5655778d 1747 }
dflet 0:087b5655778d 1748
dflet 0:087b5655778d 1749 int32_t mqtt_client_ctx_await_msg(void *ctx, uint8_t msg_type, struct mqtt_packet *mqp,
dflet 0:087b5655778d 1750 uint32_t wait_secs)
dflet 0:087b5655778d 1751 {
dflet 0:087b5655778d 1752 struct client_ctx *cl_ctx = CL_CTX(ctx);
dflet 0:087b5655778d 1753 int32_t rv = -1;
dflet 0:087b5655778d 1754
dflet 0:087b5655778d 1755 if((NULL == cl_ctx) || (NULL == mqp))
dflet 0:087b5655778d 1756 return MQP_ERR_FNPARAM;
dflet 0:087b5655778d 1757
dflet 0:087b5655778d 1758 do {
dflet 0:087b5655778d 1759 rv = cl_ctx_recv(cl_ctx, mqp, wait_secs);
dflet 0:087b5655778d 1760
dflet 0:087b5655778d 1761 } while((rv > 0) &&
dflet 0:087b5655778d 1762 (0 != msg_type) && (msg_type != mqp->msg_type));
dflet 0:087b5655778d 1763
dflet 0:087b5655778d 1764 return rv;
dflet 0:087b5655778d 1765 }
dflet 0:087b5655778d 1766
dflet 0:087b5655778d 1767 int32_t mqtt_client_ctx_run(void *ctx, uint32_t wait_secs)
dflet 0:087b5655778d 1768 {
dflet 0:087b5655778d 1769
dflet 0:087b5655778d 1770 int32_t rv;
dflet 0:087b5655778d 1771
dflet 0:087b5655778d 1772 if(NULL == ctx)
dflet 0:087b5655778d 1773 return MQP_ERR_FNPARAM;
dflet 0:087b5655778d 1774
dflet 0:087b5655778d 1775 do {
dflet 0:087b5655778d 1776 rv = cl_ctx_recv(CL_CTX(ctx), NULL, wait_secs);
dflet 0:087b5655778d 1777
dflet 0:087b5655778d 1778 } while(rv > 0);
dflet 0:087b5655778d 1779
dflet 0:087b5655778d 1780 return rv;
dflet 0:087b5655778d 1781 }
dflet 0:087b5655778d 1782
dflet 0:087b5655778d 1783 static struct client_ctx *group_ctxs_ka_sequence(uint32_t wait_secs) {
dflet 0:087b5655778d 1784
dflet 0:087b5655778d 1785 struct client_ctx *cl_ctx = used_ctxs;
dflet 0:087b5655778d 1786
dflet 0:087b5655778d 1787 while(cl_ctx) {
dflet 0:087b5655778d 1788 struct client_ctx *next = cl_ctx->next;
dflet 0:087b5655778d 1789 if(single_ctx_rx_prep(cl_ctx, &wait_secs) < 0) {
dflet 0:087b5655778d 1790 /* 'CTX' no more eligible for operation
dflet 0:087b5655778d 1791 and has been removed from used_list */
dflet 0:087b5655778d 1792 if(false == grp_has_cbfn)
dflet 0:087b5655778d 1793 return cl_ctx;
dflet 0:087b5655778d 1794 }
dflet 0:087b5655778d 1795
dflet 0:087b5655778d 1796 cl_ctx = next;
dflet 0:087b5655778d 1797 }
dflet 0:087b5655778d 1798
dflet 0:087b5655778d 1799 return NULL;
dflet 0:087b5655778d 1800 }
dflet 0:087b5655778d 1801
dflet 0:087b5655778d 1802 #define IO_MON_NO_TIMEOUT (0xFFFFFFFF)
dflet 0:087b5655778d 1803
dflet 0:087b5655778d 1804 static uint32_t group_ctxs_adj_wait_secs_get(uint32_t wait_secs)
dflet 0:087b5655778d 1805 {
dflet 0:087b5655778d 1806
dflet 0:087b5655778d 1807 return used_ctxs?
dflet 0:087b5655778d 1808 single_ctx_adj_wait_secs_get(used_ctxs, wait_secs) : wait_secs;
dflet 0:087b5655778d 1809 }
dflet 0:087b5655778d 1810
dflet 0:087b5655778d 1811 static int32_t recv_hvec[MAX_NWCONN + 1 + 1]; /* GROUP LISTEN PORT + VEC END */
dflet 0:087b5655778d 1812 static int32_t send_hvec = -1;
dflet 0:087b5655778d 1813 static int32_t rsvd_hvec = -1;
dflet 0:087b5655778d 1814
dflet 0:087b5655778d 1815 /* Caller must ensure atomic enviroment for execution of this routine */
dflet 0:087b5655778d 1816 static void recv_hvec_load(int32_t *hvec_recv, uint32_t size, struct client_ctx *list)
dflet 0:087b5655778d 1817 {
dflet 0:087b5655778d 1818
dflet 0:087b5655778d 1819 int32_t i = 0;
dflet 0:087b5655778d 1820
dflet 0:087b5655778d 1821 for(i = 0; (i < size) && (NULL != list); i++, list = list->next)
dflet 0:087b5655778d 1822 hvec_recv[i] = list->net;
dflet 0:087b5655778d 1823
dflet 0:087b5655778d 1824 hvec_recv[i] = -1;
dflet 0:087b5655778d 1825
dflet 0:087b5655778d 1826 return;
dflet 0:087b5655778d 1827 }
dflet 0:087b5655778d 1828
dflet 0:087b5655778d 1829 static int32_t group_ctxs_rx_prep(uint32_t wait_secs, void **app)
dflet 0:087b5655778d 1830 {
dflet 0:087b5655778d 1831
dflet 0:087b5655778d 1832 /* CHK 'used ctx'(s) have live connection w/ server. If not, drop it */
dflet 0:087b5655778d 1833 struct client_ctx *ctx_kaTO = group_ctxs_ka_sequence(wait_secs);
dflet 0:087b5655778d 1834 int32_t n_hnds;
dflet 0:087b5655778d 1835
dflet 0:087b5655778d 1836 if(ctx_kaTO) {
dflet 0:087b5655778d 1837 *app = CLIENT(ctx_kaTO)->app;
dflet 0:087b5655778d 1838 return MQP_ERR_NETWORK;
dflet 0:087b5655778d 1839 }
dflet 0:087b5655778d 1840
dflet 0:087b5655778d 1841 conn2used_ctxs(wait_secs); /* Now, add new 'ctx'(s) to 'used ctxs' */
dflet 0:087b5655778d 1842
dflet 0:087b5655778d 1843 recv_hvec[0] = loopb_net;
dflet 0:087b5655778d 1844 recv_hvec_load(&recv_hvec[1], MAX_NWCONN + 1, used_ctxs);
dflet 0:087b5655778d 1845
dflet 0:087b5655778d 1846 wait_secs = group_ctxs_adj_wait_secs_get(wait_secs);
dflet 0:087b5655778d 1847
dflet 0:087b5655778d 1848 n_hnds = net_ops->io_mon(recv_hvec, &send_hvec,
dflet 0:087b5655778d 1849 &rsvd_hvec, wait_secs);
dflet 0:087b5655778d 1850 if(0 == n_hnds)
dflet 0:087b5655778d 1851 n_hnds = MQP_ERR_TIMEOUT;
dflet 0:087b5655778d 1852 else if(n_hnds < 0)
dflet 0:087b5655778d 1853 n_hnds = MQP_ERR_LIBQUIT;
dflet 0:087b5655778d 1854
dflet 0:087b5655778d 1855 return n_hnds;
dflet 0:087b5655778d 1856 }
dflet 0:087b5655778d 1857
dflet 0:087b5655778d 1858 static int32_t proc_loopback_recv(int32_t net)
dflet 0:087b5655778d 1859 {
dflet 0:087b5655778d 1860
dflet 0:087b5655778d 1861 int32_t rv = 0;
dflet 0:087b5655778d 1862 uint8_t buf[LOOP_DLEN];
dflet 0:087b5655778d 1863
dflet 0:087b5655778d 1864 /* Thanks for waking-up thread, but ain't got much to do now */
dflet 0:087b5655778d 1865 rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0);
dflet 0:087b5655778d 1866 if(rv <= 0) {
dflet 0:087b5655778d 1867 memset(print_buf, 0x00, PRINT_BUF_LEN);
dflet 0:087b5655778d 1868 sprintf((char*) print_buf, "MQP_ERR_LIBQUIT %i\r\n",MQP_ERR_LIBQUIT);
dflet 0:087b5655778d 1869 Uart_Write((uint8_t *) print_buf);
dflet 0:087b5655778d 1870 net_ops->close(net);
dflet 0:087b5655778d 1871 return MQP_ERR_LIBQUIT;
dflet 0:087b5655778d 1872 }
dflet 0:087b5655778d 1873
dflet 0:087b5655778d 1874 return rv;
dflet 0:087b5655778d 1875 }
dflet 0:087b5655778d 1876
dflet 0:087b5655778d 1877 static struct client_ctx *net_cl_ctx_find(int32_t net) {
dflet 0:087b5655778d 1878 struct client_ctx *cl_ctx = used_ctxs;
dflet 0:087b5655778d 1879
dflet 0:087b5655778d 1880 while(cl_ctx && (net != cl_ctx->net))
dflet 0:087b5655778d 1881 cl_ctx = cl_ctx->next;
dflet 0:087b5655778d 1882
dflet 0:087b5655778d 1883 return cl_ctx;
dflet 0:087b5655778d 1884 }
dflet 0:087b5655778d 1885
dflet 0:087b5655778d 1886 static int32_t proc_net_data_recv(int32_t net, struct mqtt_packet *mqp, void **app)
dflet 0:087b5655778d 1887 {
dflet 0:087b5655778d 1888
dflet 0:087b5655778d 1889 /* Note: used_ctxs are always managed by a single RX task */
dflet 0:087b5655778d 1890 struct client_ctx *cl_ctx = net_cl_ctx_find(net);
dflet 0:087b5655778d 1891 int32_t rv = MQP_ERR_NOTCONN;
dflet 0:087b5655778d 1892
dflet 0:087b5655778d 1893 if(NULL == cl_ctx) {
dflet 0:087b5655778d 1894 return rv; /* TX removed it interim, mustn't happen */
dflet 0:087b5655778d 1895 }
dflet 0:087b5655778d 1896 return mqp_setup_proc_ctx_data_recv(cl_ctx, mqp, 1, app);
dflet 0:087b5655778d 1897 }
dflet 0:087b5655778d 1898
dflet 0:087b5655778d 1899 static int32_t cl_recv(struct mqtt_packet *mqp, uint32_t wait_secs, void **app)
dflet 0:087b5655778d 1900 {
dflet 0:087b5655778d 1901
dflet 0:087b5655778d 1902 int32_t rv = MQP_ERR_NETWORK;
dflet 0:087b5655778d 1903 int32_t n_hnds = 0, idx = 0;
dflet 0:087b5655778d 1904
dflet 0:087b5655778d 1905 rv = group_ctxs_rx_prep(wait_secs, app);
dflet 0:087b5655778d 1906 if(rv > 0)
dflet 0:087b5655778d 1907 n_hnds = rv;
dflet 0:087b5655778d 1908
dflet 0:087b5655778d 1909 for(idx = 0; (idx < n_hnds) && (rv > 0); idx++) {
dflet 0:087b5655778d 1910 int32_t net = recv_hvec[idx];
dflet 0:087b5655778d 1911 if(loopb_net == net)
dflet 0:087b5655778d 1912 rv = proc_loopback_recv(net); /* UDP Packet */
dflet 0:087b5655778d 1913 else {
dflet 0:087b5655778d 1914 rv = proc_net_data_recv(net, mqp, app);
dflet 0:087b5655778d 1915 if(false == grp_has_cbfn)
dflet 0:087b5655778d 1916 break; /* 'CTX': inform application */
dflet 0:087b5655778d 1917 }
dflet 0:087b5655778d 1918 }
dflet 0:087b5655778d 1919
dflet 0:087b5655778d 1920 return rv;
dflet 0:087b5655778d 1921 }
dflet 0:087b5655778d 1922
dflet 0:087b5655778d 1923 static int32_t grp_net_setup_create()
dflet 0:087b5655778d 1924 {
dflet 0:087b5655778d 1925
dflet 0:087b5655778d 1926 if(0 == loopb_portid) {
dflet 0:087b5655778d 1927 return MQP_ERR_NOT_DEF;
dflet 0:087b5655778d 1928 }
dflet 0:087b5655778d 1929 if(NULL == net_ops) {
dflet 0:087b5655778d 1930 return MQP_ERR_NET_OPS;
dflet 0:087b5655778d 1931 }
dflet 0:087b5655778d 1932 if(-1 == loopb_net) {
dflet 0:087b5655778d 1933 loopb_net = net_ops->open(DEV_NETCONN_OPT_UDP, NULL, loopb_portid, NULL);
dflet 0:087b5655778d 1934
dflet 0:087b5655778d 1935 if(-1 == loopb_net) {
dflet 0:087b5655778d 1936 return MQP_ERR_LIBQUIT;
dflet 0:087b5655778d 1937 }
dflet 0:087b5655778d 1938 }
dflet 0:087b5655778d 1939
dflet 0:087b5655778d 1940 return 1;
dflet 0:087b5655778d 1941 }
dflet 0:087b5655778d 1942
dflet 0:087b5655778d 1943 int32_t mqtt_client_await_msg(struct mqtt_packet *mqp, uint32_t wait_secs, void **app)
dflet 0:087b5655778d 1944 {
dflet 0:087b5655778d 1945
dflet 0:087b5655778d 1946 int32_t rv = MQP_ERR_NOTCONN;
dflet 0:087b5655778d 1947 *app = NULL;
dflet 0:087b5655778d 1948
dflet 0:087b5655778d 1949 if(NULL == mqp)
dflet 0:087b5655778d 1950 return MQP_ERR_FNPARAM; /* Didn't get a valid MQP */
dflet 0:087b5655778d 1951
dflet 0:087b5655778d 1952 if(true == grp_has_cbfn)
dflet 0:087b5655778d 1953 return MQP_ERR_BADCALL; /* Err: LIB has CB config */
dflet 0:087b5655778d 1954
dflet 0:087b5655778d 1955 rv = grp_net_setup_create();
dflet 0:087b5655778d 1956 if(rv <= 0)
dflet 0:087b5655778d 1957 return rv;
dflet 0:087b5655778d 1958
dflet 0:087b5655778d 1959 do {
dflet 0:087b5655778d 1960 rv = cl_recv(mqp, wait_secs, app);
dflet 0:087b5655778d 1961
dflet 0:087b5655778d 1962 } while((rv > 0) && MQP_RX_DO_NOT_RPT_COR(mqp));
dflet 0:087b5655778d 1963
dflet 0:087b5655778d 1964 return rv;
dflet 0:087b5655778d 1965 }
dflet 0:087b5655778d 1966
dflet 0:087b5655778d 1967 int32_t mqtt_client_run(uint32_t wait_secs)
dflet 0:087b5655778d 1968 {
dflet 0:087b5655778d 1969
dflet 0:087b5655778d 1970 void *app = NULL;
dflet 0:087b5655778d 1971 int32_t rv = -1;
dflet 0:087b5655778d 1972
dflet 0:087b5655778d 1973 if(false == grp_has_cbfn) {
dflet 0:087b5655778d 1974 return MQP_ERR_BADCALL; /* Err: LIB has no CB config */
dflet 0:087b5655778d 1975 }
dflet 0:087b5655778d 1976 rv = grp_net_setup_create();
dflet 0:087b5655778d 1977 if(rv <= 0) {
dflet 0:087b5655778d 1978 return rv;
dflet 0:087b5655778d 1979 }
dflet 0:087b5655778d 1980 do {
dflet 0:087b5655778d 1981 rv = cl_recv(NULL, wait_secs, &app);
dflet 0:087b5655778d 1982
dflet 0:087b5655778d 1983 } while((rv > 0) ||
dflet 0:087b5655778d 1984 /* 'ctx' related errors are handled by the callbacks */
dflet 0:087b5655778d 1985 ((rv != MQP_ERR_LIBQUIT) && (rv != MQP_ERR_TIMEOUT)));
dflet 0:087b5655778d 1986
dflet 0:087b5655778d 1987 return rv;
dflet 0:087b5655778d 1988 }
dflet 0:087b5655778d 1989
dflet 0:087b5655778d 1990 /*------------------------------------------------------------------------------
dflet 0:087b5655778d 1991 * Buffer Pool and management, other registrations and initialization.
dflet 0:087b5655778d 1992 *------------------------------------------------------------------------------
dflet 0:087b5655778d 1993 */
dflet 0:087b5655778d 1994 static struct mqtt_packet *free_list = NULL;
dflet 0:087b5655778d 1995
dflet 0:087b5655778d 1996 static struct mqtt_packet *mqp_alloc_atomic(void) {
dflet 0:087b5655778d 1997
dflet 0:087b5655778d 1998 struct mqtt_packet *mqp = NULL;
dflet 0:087b5655778d 1999
dflet 0:087b5655778d 2000 // MUTEX_LOCKIN();
dflet 0:087b5655778d 2001 if( xSemaphore != NULL ) {
dflet 0:087b5655778d 2002 // See if we can obtain the semaphore. If the semaphore is not available
dflet 0:087b5655778d 2003 // wait 10 ticks to see if it becomes free.
dflet 0:087b5655778d 2004 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
dflet 0:087b5655778d 2005 // We were able to obtain the semaphore and can now access the
dflet 0:087b5655778d 2006 // shared resource.
dflet 0:087b5655778d 2007 mqp = free_list;
dflet 0:087b5655778d 2008 if(mqp) {
dflet 0:087b5655778d 2009 free_list = mqp->next;
dflet 0:087b5655778d 2010 }
dflet 0:087b5655778d 2011
dflet 0:087b5655778d 2012 // We have finished accessing the shared resource. Release the
dflet 0:087b5655778d 2013 // semaphore.
dflet 0:087b5655778d 2014 xSemaphoreGive( xSemaphore );
dflet 0:087b5655778d 2015
dflet 0:087b5655778d 2016 } else {
dflet 0:087b5655778d 2017 // We could not obtain the semaphore and can therefore not access
dflet 0:087b5655778d 2018 // the shared resource safely.
dflet 0:087b5655778d 2019 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
dflet 0:087b5655778d 2020 }
dflet 0:087b5655778d 2021 }
dflet 0:087b5655778d 2022
dflet 0:087b5655778d 2023 // MUTEX_UNLOCK();
dflet 0:087b5655778d 2024 return mqp;
dflet 0:087b5655778d 2025 }
dflet 0:087b5655778d 2026
dflet 0:087b5655778d 2027 struct mqtt_packet *mqp_client_alloc(uint8_t msg_type, uint8_t offset) {
dflet 0:087b5655778d 2028
dflet 0:087b5655778d 2029 struct mqtt_packet *mqp = mqp_alloc_atomic();
dflet 0:087b5655778d 2030 if(NULL == mqp) {
dflet 0:087b5655778d 2031 USR_INFO("MQP alloc failed - msg type 0x%02x\n\r", msg_type);
dflet 0:087b5655778d 2032 return NULL;
dflet 0:087b5655778d 2033 }
dflet 0:087b5655778d 2034
dflet 0:087b5655778d 2035 mqp_init(mqp, offset);
dflet 0:087b5655778d 2036 mqp->msg_type = msg_type;
dflet 0:087b5655778d 2037
dflet 0:087b5655778d 2038 return mqp;
dflet 0:087b5655778d 2039 }
dflet 0:087b5655778d 2040
dflet 0:087b5655778d 2041 /* Do not use this routine with-in this file. */
dflet 0:087b5655778d 2042 static void free_mqp(struct mqtt_packet *mqp)
dflet 0:087b5655778d 2043 {
dflet 0:087b5655778d 2044 /* Must be used in a locked state */
dflet 0:087b5655778d 2045 mqp->next = free_list;
dflet 0:087b5655778d 2046 free_list = mqp;
dflet 0:087b5655778d 2047 }
dflet 0:087b5655778d 2048
dflet 0:087b5655778d 2049 int32_t mqtt_client_buffers_register(uint32_t num_mqp, struct mqtt_packet *mqp_vec,
dflet 0:087b5655778d 2050 uint32_t buf_len, uint8_t *buf_vec)
dflet 0:087b5655778d 2051 {
dflet 0:087b5655778d 2052 uint32_t i, j;
dflet 0:087b5655778d 2053
dflet 0:087b5655778d 2054 if((0 == num_mqp) || (0 == buf_len) || free_list)
dflet 0:087b5655778d 2055 return -1;
dflet 0:087b5655778d 2056
dflet 0:087b5655778d 2057 for(i = 0, j = 0; i < num_mqp; i++, j += buf_len) {
dflet 0:087b5655778d 2058 struct mqtt_packet *mqp = mqp_vec + i;
dflet 0:087b5655778d 2059
dflet 0:087b5655778d 2060 mqp->buffer = buf_vec + j;
dflet 0:087b5655778d 2061 mqp->maxlen = buf_len;
dflet 0:087b5655778d 2062
dflet 0:087b5655778d 2063 mqp->free = free_mqp;
dflet 0:087b5655778d 2064 mqp->next = free_list;
dflet 0:087b5655778d 2065 free_list = mqp;
dflet 0:087b5655778d 2066 }
dflet 0:087b5655778d 2067
dflet 0:087b5655778d 2068 return 0;
dflet 0:087b5655778d 2069 }
dflet 0:087b5655778d 2070
dflet 0:087b5655778d 2071 int32_t mqtt_client_ctx_will_register(void *ctx,
dflet 0:087b5655778d 2072 const struct utf8_string *will_top,
dflet 0:087b5655778d 2073 const struct utf8_string *will_msg,
dflet 0:087b5655778d 2074 enum mqtt_qos will_qos, bool retain)
dflet 0:087b5655778d 2075 {
dflet 0:087b5655778d 2076 uint8_t B = 0;
dflet 0:087b5655778d 2077
dflet 0:087b5655778d 2078 if((NULL == ctx) || ((NULL == will_top) && (NULL != will_msg)))
dflet 0:087b5655778d 2079 return -1; /* Bad Combo */
dflet 0:087b5655778d 2080
dflet 0:087b5655778d 2081 if(NULL != will_top) {
dflet 0:087b5655778d 2082 RET_IF_INVALID_UTF8(will_top);
dflet 0:087b5655778d 2083
dflet 0:087b5655778d 2084 B = QOS_VALUE(will_qos) << 3;
dflet 0:087b5655778d 2085 if(retain)
dflet 0:087b5655778d 2086 B |= WILL_RETAIN_VAL;
dflet 0:087b5655778d 2087
dflet 0:087b5655778d 2088 if(NULL != will_msg)
dflet 0:087b5655778d 2089 RET_IF_INVALID_UTF8(will_msg);
dflet 0:087b5655778d 2090 }
dflet 0:087b5655778d 2091
dflet 0:087b5655778d 2092 CLIENT(ctx)->conn_pl_utf8s[1] = will_top;
dflet 0:087b5655778d 2093 CLIENT(ctx)->conn_pl_utf8s[2] = will_msg;
dflet 0:087b5655778d 2094
dflet 0:087b5655778d 2095 CLIENT(ctx)->will_opts = B;
dflet 0:087b5655778d 2096
dflet 0:087b5655778d 2097 return 0;
dflet 0:087b5655778d 2098 }
dflet 0:087b5655778d 2099
dflet 0:087b5655778d 2100 int32_t mqtt_client_ctx_info_register(void *ctx,
dflet 0:087b5655778d 2101 const struct utf8_string *client_id,
dflet 0:087b5655778d 2102 const struct utf8_string *user_name,
dflet 0:087b5655778d 2103 const struct utf8_string *pass_word)
dflet 0:087b5655778d 2104 {
dflet 0:087b5655778d 2105 const struct utf8_string *users_pwd = NULL;
dflet 0:087b5655778d 2106
dflet 0:087b5655778d 2107 if(NULL == ctx)
dflet 0:087b5655778d 2108 return -1;
dflet 0:087b5655778d 2109
dflet 0:087b5655778d 2110 if(NULL != client_id)
dflet 0:087b5655778d 2111 RET_IF_INVALID_UTF8(client_id);
dflet 0:087b5655778d 2112
dflet 0:087b5655778d 2113 if(NULL != user_name) {
dflet 0:087b5655778d 2114 RET_IF_INVALID_UTF8(user_name);
dflet 0:087b5655778d 2115
dflet 0:087b5655778d 2116 if(NULL != pass_word)
dflet 0:087b5655778d 2117 RET_IF_INVALID_UTF8(pass_word);
dflet 0:087b5655778d 2118
dflet 0:087b5655778d 2119 users_pwd = pass_word;
dflet 0:087b5655778d 2120 }
dflet 0:087b5655778d 2121
dflet 0:087b5655778d 2122 CLIENT(ctx)->conn_pl_utf8s[0] = client_id;
dflet 0:087b5655778d 2123 CLIENT(ctx)->conn_pl_utf8s[3] = user_name;
dflet 0:087b5655778d 2124 CLIENT(ctx)->conn_pl_utf8s[4] = users_pwd;
dflet 0:087b5655778d 2125
dflet 0:087b5655778d 2126 return 0;
dflet 0:087b5655778d 2127 }
dflet 0:087b5655778d 2128
dflet 0:087b5655778d 2129 int32_t mqtt_client_net_svc_register(const struct device_net_services *net)
dflet 0:087b5655778d 2130 {
dflet 0:087b5655778d 2131 if(net && net->open && net->send && net->recv &&
dflet 0:087b5655778d 2132 net->send_dest && net->recv_from && net->close
dflet 0:087b5655778d 2133 && net->io_mon && net->time) {
dflet 0:087b5655778d 2134 net_ops = net;
dflet 0:087b5655778d 2135 return 0;
dflet 0:087b5655778d 2136 }
dflet 0:087b5655778d 2137
dflet 0:087b5655778d 2138 return -1;
dflet 0:087b5655778d 2139 }
dflet 0:087b5655778d 2140
dflet 0:087b5655778d 2141 static void cl_ctx_setup(struct client_ctx *cl_ctx, /* WR Object */
dflet 0:087b5655778d 2142 const struct mqtt_client_ctx_cfg *ctx_cfg,
dflet 0:087b5655778d 2143 const struct mqtt_client_ctx_cbs *ctx_cbs,
dflet 0:087b5655778d 2144 void *app)
dflet 0:087b5655778d 2145 {
dflet 0:087b5655778d 2146
dflet 0:087b5655778d 2147 struct client_desc *client = CLIENT(cl_ctx);
dflet 0:087b5655778d 2148
dflet 0:087b5655778d 2149 cl_ctx->flags = ctx_cfg->config_opts;
dflet 0:087b5655778d 2150
dflet 0:087b5655778d 2151 client->nwconn_opts = ctx_cfg->nwconn_opts;
dflet 0:087b5655778d 2152 client->server_addr = ctx_cfg->server_addr;
dflet 0:087b5655778d 2153 client->port_number = ctx_cfg->port_number;
dflet 0:087b5655778d 2154
dflet 0:087b5655778d 2155 client->app = app;
dflet 0:087b5655778d 2156
dflet 0:087b5655778d 2157
dflet 0:087b5655778d 2158 if(NULL != ctx_cfg->nw_security)
dflet 0:087b5655778d 2159 buf_wr_nbytes((uint8_t*)&client->nw_security,
dflet 0:087b5655778d 2160 (uint8_t*)ctx_cfg->nw_security,
dflet 0:087b5655778d 2161 sizeof(struct secure_conn));
dflet 0:087b5655778d 2162
dflet 0:087b5655778d 2163 if(NULL != ctx_cbs) {
dflet 0:087b5655778d 2164 // set callback flag
dflet 0:087b5655778d 2165 struct mqtt_client_ctx_cbs *cbs_ctx = CTX_CBS_PTR(client);
dflet 0:087b5655778d 2166 cbs_ctx->publish_rx = ctx_cbs->publish_rx;
dflet 0:087b5655778d 2167 cbs_ctx->ack_notify = ctx_cbs->ack_notify;
dflet 0:087b5655778d 2168 cbs_ctx->disconn_cb = ctx_cbs->disconn_cb;
dflet 0:087b5655778d 2169 }
dflet 0:087b5655778d 2170
dflet 0:087b5655778d 2171 return;
dflet 0:087b5655778d 2172 }
dflet 0:087b5655778d 2173
dflet 0:087b5655778d 2174 int32_t mqtt_client_ctx_create(const struct mqtt_client_ctx_cfg *ctx_cfg,
dflet 0:087b5655778d 2175 const struct mqtt_client_ctx_cbs *ctx_cbs,
dflet 0:087b5655778d 2176 void *app, void **ctx)
dflet 0:087b5655778d 2177 {
dflet 0:087b5655778d 2178
dflet 0:087b5655778d 2179 struct client_ctx *cl_ctx = NULL;
dflet 0:087b5655778d 2180
dflet 0:087b5655778d 2181 if((NULL == ctx_cfg) ||
dflet 0:087b5655778d 2182 (NULL == ctx_cfg->server_addr) ||
dflet 0:087b5655778d 2183 (0 == ctx_cfg->port_number)) {
dflet 0:087b5655778d 2184 return -1;
dflet 0:087b5655778d 2185 }
dflet 0:087b5655778d 2186 if(ctx_cfg->config_opts & MQTT_CFG_MK_GROUP_CTX) {
dflet 0:087b5655778d 2187 if(grp_has_cbfn ^ (!!ctx_cbs)) {
dflet 0:087b5655778d 2188 return -1;
dflet 0:087b5655778d 2189 }
dflet 0:087b5655778d 2190 }
dflet 0:087b5655778d 2191
dflet 0:087b5655778d 2192 // MUTEX_LOCKIN();
dflet 0:087b5655778d 2193 if( xSemaphore != NULL ) {
dflet 0:087b5655778d 2194 // See if we can obtain the semaphore. If the semaphore is not available
dflet 0:087b5655778d 2195 // wait 10 ticks to see if it becomes free.
dflet 0:087b5655778d 2196 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
dflet 0:087b5655778d 2197 // We were able to obtain the semaphore and can now access the
dflet 0:087b5655778d 2198 // shared resource.
dflet 0:087b5655778d 2199 if(free_ctxs) {
dflet 0:087b5655778d 2200 cl_ctx = free_ctxs;
dflet 0:087b5655778d 2201 free_ctxs = cl_ctx->next;
dflet 0:087b5655778d 2202 cl_ctx->next = NULL;
dflet 0:087b5655778d 2203 }
dflet 0:087b5655778d 2204
dflet 0:087b5655778d 2205 // We have finished accessing the shared resource. Release the
dflet 0:087b5655778d 2206 // semaphore.
dflet 0:087b5655778d 2207 xSemaphoreGive( xSemaphore );
dflet 0:087b5655778d 2208
dflet 0:087b5655778d 2209 } else {
dflet 0:087b5655778d 2210 // We could not obtain the semaphore and can therefore not access
dflet 0:087b5655778d 2211 // the shared resource safely.
dflet 0:087b5655778d 2212 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
dflet 0:087b5655778d 2213 }
dflet 0:087b5655778d 2214 }
dflet 0:087b5655778d 2215
dflet 0:087b5655778d 2216 // MUTEX_UNLOCK();
dflet 0:087b5655778d 2217
dflet 0:087b5655778d 2218 if(cl_ctx) {
dflet 0:087b5655778d 2219 cl_ctx_setup(cl_ctx, ctx_cfg, ctx_cbs, app);
dflet 0:087b5655778d 2220 *ctx = (void*) cl_ctx;
dflet 0:087b5655778d 2221 return 0;
dflet 0:087b5655778d 2222 }
dflet 0:087b5655778d 2223
dflet 0:087b5655778d 2224 return -1;
dflet 0:087b5655778d 2225 }
dflet 0:087b5655778d 2226
dflet 0:087b5655778d 2227 int32_t mqtt_client_ctx_delete(void *ctx)
dflet 0:087b5655778d 2228 {
dflet 0:087b5655778d 2229
dflet 0:087b5655778d 2230 struct client_ctx *cl_ctx = (struct client_ctx*) ctx;
dflet 0:087b5655778d 2231 int32_t rv = -1; /* Not sure about deletion as yet */
dflet 0:087b5655778d 2232
dflet 0:087b5655778d 2233 // MUTEX_LOCKIN();
dflet 0:087b5655778d 2234 if( xSemaphore != NULL ) {
dflet 0:087b5655778d 2235 // See if we can obtain the semaphore. If the semaphore is not available
dflet 0:087b5655778d 2236 // wait 10 ticks to see if it becomes free.
dflet 0:087b5655778d 2237 if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) {
dflet 0:087b5655778d 2238 // We were able to obtain the semaphore and can now access the
dflet 0:087b5655778d 2239 // shared resource.
dflet 0:087b5655778d 2240 if((NULL == cl_ctx) ||
dflet 0:087b5655778d 2241 (-1 != cl_ctx->net) ||
dflet 0:087b5655778d 2242 (awaits_pkts(cl_ctx))) {
dflet 0:087b5655778d 2243 goto mqtt_client_ctx_delete_exit1;
dflet 0:087b5655778d 2244 }
dflet 0:087b5655778d 2245 rv = 0; /* OK to delete ctx */
dflet 0:087b5655778d 2246 client_reset(CLIENT(cl_ctx));
dflet 0:087b5655778d 2247 cl_ctx_freeup(cl_ctx);
dflet 0:087b5655778d 2248
dflet 0:087b5655778d 2249 // We have finished accessing the shared resource. Release the
dflet 0:087b5655778d 2250 // semaphore.
dflet 0:087b5655778d 2251 //xSemaphoreGive( xSemaphore );
dflet 0:087b5655778d 2252 } else {
dflet 0:087b5655778d 2253 // We could not obtain the semaphore and can therefore not access
dflet 0:087b5655778d 2254 // the shared resource safely.
dflet 0:087b5655778d 2255 Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n");
dflet 0:087b5655778d 2256 }
dflet 0:087b5655778d 2257 }
dflet 0:087b5655778d 2258
dflet 0:087b5655778d 2259 mqtt_client_ctx_delete_exit1:
dflet 0:087b5655778d 2260 // MUTEX_UNLOCK();
dflet 0:087b5655778d 2261 xSemaphoreGive(xSemaphore);
dflet 0:087b5655778d 2262
dflet 0:087b5655778d 2263 return rv;
dflet 0:087b5655778d 2264 }
dflet 0:087b5655778d 2265
dflet 0:087b5655778d 2266 int32_t mqtt_client_lib_init(const struct mqtt_client_lib_cfg *lib_cfg)
dflet 0:087b5655778d 2267 {
dflet 0:087b5655778d 2268 if((NULL == lib_cfg) || (NULL == lib_cfg->debug_printf))
dflet 0:087b5655778d 2269 return -1;
dflet 0:087b5655778d 2270
dflet 0:087b5655778d 2271 debug_printf = lib_cfg->debug_printf; /* Facilitate debug */
dflet 0:087b5655778d 2272
dflet 0:087b5655778d 2273 if(INIT_DONE_STATE == cl_lib_state) {
dflet 0:087b5655778d 2274 Uart_Write((uint8_t*)"C: Error trying to re-initialize \n\r");
dflet 0:087b5655778d 2275 USR_INFO("C: Error trying to re-initialize \n\r");
dflet 0:087b5655778d 2276 return -1;
dflet 0:087b5655778d 2277 }
dflet 0:087b5655778d 2278
dflet 0:087b5655778d 2279 USR_INFO("Version: Client LIB %s, Common LIB %s.\n\r",
dflet 0:087b5655778d 2280 MQTT_CLIENT_VERSTR, MQTT_COMMON_VERSTR);
dflet 0:087b5655778d 2281
dflet 0:087b5655778d 2282 client_desc_init();
dflet 0:087b5655778d 2283
dflet 0:087b5655778d 2284 cl_lib_state = INIT_DONE_STATE;
dflet 0:087b5655778d 2285
dflet 0:087b5655778d 2286 loopb_portid = lib_cfg->loopback_port;
dflet 0:087b5655778d 2287 grp_has_cbfn = lib_cfg->grp_uses_cbfn;
dflet 0:087b5655778d 2288
dflet 0:087b5655778d 2289 mutex = lib_cfg->mutex;
dflet 0:087b5655778d 2290 mutex_lockin = lib_cfg->mutex_lockin;
dflet 0:087b5655778d 2291 mutex_unlock = lib_cfg->mutex_unlock;
dflet 0:087b5655778d 2292
dflet 0:087b5655778d 2293 aux_dbg_enbl = lib_cfg->aux_debug_en;
dflet 0:087b5655778d 2294
dflet 0:087b5655778d 2295 return 0;
dflet 0:087b5655778d 2296 }
dflet 0:087b5655778d 2297
dflet 0:087b5655778d 2298 int32_t mqtt_client_lib_exit()
dflet 0:087b5655778d 2299 {
dflet 0:087b5655778d 2300 struct client_ctx *cl_ctx = free_ctxs;
dflet 0:087b5655778d 2301 int32_t count = 0;
dflet 0:087b5655778d 2302
dflet 0:087b5655778d 2303 while(cl_ctx) {
dflet 0:087b5655778d 2304 cl_ctx = cl_ctx->next;
dflet 0:087b5655778d 2305 count++;
dflet 0:087b5655778d 2306 }
dflet 0:087b5655778d 2307
dflet 0:087b5655778d 2308 if(MAX_NWCONN == count) {
dflet 0:087b5655778d 2309 cl_lib_state = WAIT_INIT_STATE;
dflet 0:087b5655778d 2310 free_ctxs = NULL;
dflet 0:087b5655778d 2311 return 0;
dflet 0:087b5655778d 2312 }
dflet 0:087b5655778d 2313
dflet 0:087b5655778d 2314 return -1;
dflet 0:087b5655778d 2315 }
dflet 0:087b5655778d 2316
dflet 0:087b5655778d 2317 }//namespace mbed_mqtt