Part of TI's mqtt
Dependents: mqtt_V1 cc3100_Test_mqtt_CM3
mqtt_client.cpp@0:087b5655778d, 2015-06-06 (annotated)
- Committer:
- dflet
- Date:
- Sat Jun 06 13:28:41 2015 +0000
- Revision:
- 0:087b5655778d
Part of mtqq_V1
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
dflet | 0:087b5655778d | 1 | /****************************************************************************** |
dflet | 0:087b5655778d | 2 | * |
dflet | 0:087b5655778d | 3 | * Copyright (C) 2014 Texas Instruments Incorporated |
dflet | 0:087b5655778d | 4 | * |
dflet | 0:087b5655778d | 5 | * All rights reserved. Property of Texas Instruments Incorporated. |
dflet | 0:087b5655778d | 6 | * Restricted rights to use, duplicate or disclose this code are |
dflet | 0:087b5655778d | 7 | * granted through contract. |
dflet | 0:087b5655778d | 8 | * |
dflet | 0:087b5655778d | 9 | * The program may not be used without the written permission of |
dflet | 0:087b5655778d | 10 | * Texas Instruments Incorporated or against the terms and conditions |
dflet | 0:087b5655778d | 11 | * stipulated in the agreement under which this program has been supplied, |
dflet | 0:087b5655778d | 12 | * and under no circumstances can it be used with non-TI connectivity device. |
dflet | 0:087b5655778d | 13 | * |
dflet | 0:087b5655778d | 14 | ******************************************************************************/ |
dflet | 0:087b5655778d | 15 | |
dflet | 0:087b5655778d | 16 | /* |
dflet | 0:087b5655778d | 17 | mqtt_client.c |
dflet | 0:087b5655778d | 18 | |
dflet | 0:087b5655778d | 19 | The module provides implementation to the public interface of the MQTT |
dflet | 0:087b5655778d | 20 | Client Library. |
dflet | 0:087b5655778d | 21 | */ |
dflet | 0:087b5655778d | 22 | #include "FreeRTOS.h" |
dflet | 0:087b5655778d | 23 | #include "mqtt_client.h" |
dflet | 0:087b5655778d | 24 | #include "semphr.h" |
dflet | 0:087b5655778d | 25 | |
dflet | 0:087b5655778d | 26 | #include "cli_uart.h" |
dflet | 0:087b5655778d | 27 | |
/* Capacity, in bytes, of the shared text buffer used to format UART messages */
#define PRINT_BUF_LEN 128
/* Shared text buffer; only declared here, its definition lives elsewhere */
extern int8_t print_buf[PRINT_BUF_LEN];
dflet | 0:087b5655778d | 30 | |
dflet | 0:087b5655778d | 31 | namespace mbed_mqtt |
dflet | 0:087b5655778d | 32 | { |
dflet | 0:087b5655778d | 33 | |
dflet | 0:087b5655778d | 34 | //void createMutex(void); |
dflet | 0:087b5655778d | 35 | |
/* Optional locking hooks: the lock / unlock routines and the opaque handle
   that is passed to them. A hook that is still NULL is simply not invoked. */
static void *mutex = NULL;
static void (*mutex_lockin)(void*) = NULL;
static void (*mutex_unlock)(void*) = NULL;

#define MUTEX_LOCKIN() if(mutex_lockin) mutex_lockin(mutex);
#define MUTEX_UNLOCK() if(mutex_unlock) mutex_unlock(mutex);

/* Run-time switch for the auxiliary (debug) traces */
static bool aux_dbg_enbl = true;
/* printf-style output routine; stays NULL until one is installed */
int32_t (*debug_printf)(const char *fmt, ...) = NULL;

/* Both logging macros expand to a single expression, so they are safe inside
   an un-braced if/else, and both do nothing while debug_printf is still NULL
   instead of calling through a NULL function pointer.
   USR_INFO yields the value returned by debug_printf, or 0 when it is unset. */
#define USR_INFO(...) (debug_printf ? debug_printf(__VA_ARGS__) : 0)
#define DBG_INFO(I, ...) \
    ((void)((aux_dbg_enbl && debug_printf) ? debug_printf(I, ##__VA_ARGS__) : 0))

/* Network services used by this module; NULL until assigned */
static const struct device_net_services *net_ops = NULL;
dflet | 0:087b5655778d | 50 | |
/* Last message id handed out. Seeding with 0xFFFF makes the first id returned
   0x0001, because the 16-bit counter wraps around on the first increment. */
static uint16_t msg_id = 0xFFFF;

/* Step the id counter forward by two and return the updated value. */
static inline uint16_t assign_new_msg_id()
{
    msg_id += 2;
    return msg_id;
}
dflet | 0:087b5655778d | 56 | |
dflet | 0:087b5655778d | 57 | SemaphoreHandle_t xSemaphore = NULL; |
dflet | 0:087b5655778d | 58 | void createMutex() |
dflet | 0:087b5655778d | 59 | { |
dflet | 0:087b5655778d | 60 | |
dflet | 0:087b5655778d | 61 | xSemaphore = xSemaphoreCreateMutex(); |
dflet | 0:087b5655778d | 62 | } |
dflet | 0:087b5655778d | 63 | |
dflet | 0:087b5655778d | 64 | /*----------------------------------------------------------------------------- |
dflet | 0:087b5655778d | 65 | * Data structure for managing the client and its nuances |
dflet | 0:087b5655778d | 66 | *---------------------------------------------------------------------------*/ |
dflet | 0:087b5655778d | 67 | |
dflet | 0:087b5655778d | 68 | /* Construct to manage TX for network that requires LIB to send a partial and |
dflet | 0:087b5655778d | 69 | incremental data to support restrictive TCP segments. Specifically, for the |
dflet | 0:087b5655778d | 70 | deployments, in which the network layer supports small segments, there can |
dflet | 0:087b5655778d | 71 | be only one in-flight message. |
dflet | 0:087b5655778d | 72 | */ |
/* Capacity of the in-place storage used when no MQP backs the transfer */
#define VH_MSG_LEN 4

/* Book-keeping for one message that is being sent in several pieces */
struct tx_part_pkt {

    /* MQP whose TX is in progress; NULL when no MQP is involved */
    struct mqtt_packet *tx_mqp;

    /* Source of the bytes: which member is live depends on tx_mqp */
    union {
        uint8_t vh_msg[VH_MSG_LEN];  /* Used when tx_mqp is NULL        */
        const uint8_t *buffer;       /* Used when tx_mqp is set: refers
                                        to data held by the MQP         */
    };

    uint32_t length;  /* Total number of bytes to send        */
    uint32_t offset;  /* Position of the next byte to be sent */
};
dflet | 0:087b5655778d | 87 | |
dflet | 0:087b5655778d | 88 | static bool tx_part_setup(struct tx_part_pkt *tx_part, const uint8_t *buffer, |
dflet | 0:087b5655778d | 89 | uint32_t length, struct mqtt_packet *tx_mqp) |
dflet | 0:087b5655778d | 90 | { |
dflet | 0:087b5655778d | 91 | |
dflet | 0:087b5655778d | 92 | if(tx_mqp) { |
dflet | 0:087b5655778d | 93 | tx_part->buffer = buffer; |
dflet | 0:087b5655778d | 94 | } else { |
dflet | 0:087b5655778d | 95 | if(VH_MSG_LEN < length) |
dflet | 0:087b5655778d | 96 | return false; |
dflet | 0:087b5655778d | 97 | |
dflet | 0:087b5655778d | 98 | buf_wr_nbytes(tx_part->vh_msg, buffer, length); |
dflet | 0:087b5655778d | 99 | } |
dflet | 0:087b5655778d | 100 | |
dflet | 0:087b5655778d | 101 | tx_part->length = length; |
dflet | 0:087b5655778d | 102 | tx_part->tx_mqp = tx_mqp; |
dflet | 0:087b5655778d | 103 | |
dflet | 0:087b5655778d | 104 | return true; |
dflet | 0:087b5655778d | 105 | } |
dflet | 0:087b5655778d | 106 | |
dflet | 0:087b5655778d | 107 | static void tx_part_reset(struct tx_part_pkt *tx_part) |
dflet | 0:087b5655778d | 108 | { |
dflet | 0:087b5655778d | 109 | |
dflet | 0:087b5655778d | 110 | struct mqtt_packet *tx_mqp = tx_part->tx_mqp; |
dflet | 0:087b5655778d | 111 | if(tx_mqp) { |
dflet | 0:087b5655778d | 112 | mqp_free(tx_mqp); |
dflet | 0:087b5655778d | 113 | } |
dflet | 0:087b5655778d | 114 | tx_part->vh_msg[0] = 0x00; |
dflet | 0:087b5655778d | 115 | tx_part->tx_mqp = NULL; |
dflet | 0:087b5655778d | 116 | tx_part->length = 0; |
dflet | 0:087b5655778d | 117 | tx_part->offset = 0; |
dflet | 0:087b5655778d | 118 | |
dflet | 0:087b5655778d | 119 | return; |
dflet | 0:087b5655778d | 120 | } |
dflet | 0:087b5655778d | 121 | |
dflet | 0:087b5655778d | 122 | static const uint8_t *tx_part_buf_p(struct tx_part_pkt *tx_part) |
dflet | 0:087b5655778d | 123 | { |
dflet | 0:087b5655778d | 124 | struct mqtt_packet *tx_mqp = tx_part->tx_mqp; |
dflet | 0:087b5655778d | 125 | uint32_t offset = tx_part->offset; |
dflet | 0:087b5655778d | 126 | |
dflet | 0:087b5655778d | 127 | return tx_mqp? |
dflet | 0:087b5655778d | 128 | tx_part->buffer + offset : |
dflet | 0:087b5655778d | 129 | tx_part->vh_msg + offset; |
dflet | 0:087b5655778d | 130 | } |
dflet | 0:087b5655778d | 131 | |
dflet | 0:087b5655778d | 132 | static void tx_part_addup(struct tx_part_pkt *tx_part, uint32_t offset) |
dflet | 0:087b5655778d | 133 | { |
dflet | 0:087b5655778d | 134 | tx_part->offset += offset; |
dflet | 0:087b5655778d | 135 | } |
dflet | 0:087b5655778d | 136 | |
/* Accessors for a partial-TX descriptor (argument: pointer to the descriptor).
   The argument is parenthesized so that expressions such as '&desc' expand
   correctly.
     TX_PART_BUFFER - address of the next byte to send
     TX_PART_BUF_SZ - number of bytes still to be sent (length - offset)
     TX_PART_IN_USE - non-zero while bytes remain to be sent */
#define TX_PART_BUFFER(tx_part) tx_part_buf_p(tx_part)
#define TX_PART_BUF_SZ(tx_part) ((tx_part)->length - (tx_part)->offset)
#define TX_PART_IN_USE(tx_part) TX_PART_BUF_SZ(tx_part)
dflet | 0:087b5655778d | 140 | |
/* Initialization state of the library */
enum module_state {

    WAIT_INIT_STATE,         /* First enumerator (value 0) */
    INIT_DONE_STATE = 0x01,
};

/* Current library state; starts out as WAIT_INIT_STATE */
static enum module_state cl_lib_state = WAIT_INIT_STATE;

/* Port id passed to net_ops->send_dest() by loopb_trigger() */
static uint16_t loopb_portid = 0;
/* NOTE(review): presumably records whether the group uses callbacks - not
   referenced in the visible part of the file, confirm against its users */
static bool grp_has_cbfn = false;
dflet | 0:087b5655778d | 151 | |
/* Bits kept in client_ctx 'flags'. The first three are aliases of the
   MQTT_CFG_* configuration values. */
#define USE_PROTO_V31_FLAG MQTT_CFG_PROTOCOL_V31
#define APP_RECV_TASK_FLAG MQTT_CFG_APP_HAS_RTSK
#define GROUP_CONTEXT_FLAG MQTT_CFG_MK_GROUP_CTX

/* Bits defined by this module; they occupy bits 16 to 22 */
#define CLEAN_SESSION_FLAG 0x00010000
#define CONNACK_AWAIT_FLAG 0x00020000
#define NOW_CONNECTED_FLAG 0x00040000
#define KA_PINGER_RSP_FLAG 0x00080000
#define USER_PING_RSP_FLAG 0x00100000
#define NETWORK_CLOSE_FLAG 0x00200000
#define DO_CONNACK_TO_FLAG 0x00400000
dflet | 0:087b5655778d | 164 | static struct client_ctx *free_ctxs = NULL; /* CTX construct available */ |
dflet | 0:087b5655778d | 165 | static struct client_ctx *used_ctxs = NULL; /* Relevant only for group */ |
dflet | 0:087b5655778d | 166 | static struct client_ctx *conn_ctxs = NULL; /* Relevant only for group */ |
dflet | 0:087b5655778d | 167 | |
dflet | 0:087b5655778d | 168 | static void cl_ctx_freeup(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 169 | { |
dflet | 0:087b5655778d | 170 | cl_ctx->next = free_ctxs; |
dflet | 0:087b5655778d | 171 | free_ctxs = cl_ctx; |
dflet | 0:087b5655778d | 172 | |
dflet | 0:087b5655778d | 173 | return; |
dflet | 0:087b5655778d | 174 | } |
dflet | 0:087b5655778d | 175 | |
/* Flag tests on a client context (argument: pointer to the context). Each
   yields a non-zero value when the corresponding bit is set in 'flags'.
   The argument is parenthesized so that any pointer expression, e.g. '&ctx',
   expands correctly. */
#define IS_PROTO_VER31(cl_ctx) ((cl_ctx)->flags & USE_PROTO_V31_FLAG)
#define AWAITS_CONNACK(cl_ctx) ((cl_ctx)->flags & CONNACK_AWAIT_FLAG)
#define HAS_CONNECTION(cl_ctx) ((cl_ctx)->flags & NOW_CONNECTED_FLAG)
#define AWAITS_KA_PING(cl_ctx) ((cl_ctx)->flags & KA_PINGER_RSP_FLAG)
#define AWAITS_PINGRSP(cl_ctx) ((cl_ctx)->flags & USER_PING_RSP_FLAG)
#define IS_CLN_SESSION(cl_ctx) ((cl_ctx)->flags & CLEAN_SESSION_FLAG)
#define RECV_TASK_AVBL(cl_ctx) ((cl_ctx)->flags & APP_RECV_TASK_FLAG)
#define A_GROUP_MEMBER(cl_ctx) ((cl_ctx)->flags & GROUP_CONTEXT_FLAG)
#define NEED_NET_CLOSE(cl_ctx) ((cl_ctx)->flags & NETWORK_CLOSE_FLAG)
#define CFG_CONNACK_TO(cl_ctx) ((cl_ctx)->flags & DO_CONNACK_TO_FLAG)
dflet | 0:087b5655778d | 186 | |
dflet | 0:087b5655778d | 187 | #ifndef CFG_CL_MQTT_CTXS |
dflet | 0:087b5655778d | 188 | #define MAX_NWCONN 4 |
dflet | 0:087b5655778d | 189 | #else |
dflet | 0:087b5655778d | 190 | #define MAX_NWCONN CFG_CL_MQTT_CTXS |
dflet | 0:087b5655778d | 191 | #endif |
dflet | 0:087b5655778d | 192 | |
dflet | 0:087b5655778d | 193 | static struct client_desc { |
dflet | 0:087b5655778d | 194 | |
dflet | 0:087b5655778d | 195 | /* ALERT: "context" must be first elem in this structure, do not move */ |
dflet | 0:087b5655778d | 196 | struct client_ctx context; |
dflet | 0:087b5655778d | 197 | |
dflet | 0:087b5655778d | 198 | #define CLIENT(cl_ctx) ((struct client_desc*) cl_ctx) |
dflet | 0:087b5655778d | 199 | #define CL_CTX(client) ((struct client_ctx*) client) |
dflet | 0:087b5655778d | 200 | |
dflet | 0:087b5655778d | 201 | /* Order/Sequence: Client ID, Will Topic, Will Msg, Username, Password */ |
dflet | 0:087b5655778d | 202 | const struct utf8_string *conn_pl_utf8s[5]; /* Ref: CONNECT Payload */ |
dflet | 0:087b5655778d | 203 | uint8_t will_opts; |
dflet | 0:087b5655778d | 204 | |
dflet | 0:087b5655778d | 205 | /* Wait-List for Level 1 ACK(s), which are PUBACK, PUBREC, UN/SUBACK */ |
dflet | 0:087b5655778d | 206 | struct mqtt_ack_wlist qos_ack1_wl; |
dflet | 0:087b5655778d | 207 | |
dflet | 0:087b5655778d | 208 | /* Circular queue to track QOS2 PUBLISH packets from the server. They |
dflet | 0:087b5655778d | 209 | are tracked for the duration of PUBLISH-RX to PUBREL-RX. |
dflet | 0:087b5655778d | 210 | */ |
dflet | 0:087b5655778d | 211 | struct pub_qos2_cq qos2_rx_cq; |
dflet | 0:087b5655778d | 212 | |
dflet | 0:087b5655778d | 213 | /* Circular queue to track QOS2 PUBLISH packets from the client. They |
dflet | 0:087b5655778d | 214 | are tracked for the duration of PUBREC-RX to PUBOMP-RX. |
dflet | 0:087b5655778d | 215 | */ |
dflet | 0:087b5655778d | 216 | struct pub_qos2_cq qos2_tx_cq; |
dflet | 0:087b5655778d | 217 | |
dflet | 0:087b5655778d | 218 | struct mqtt_client_ctx_cbs app_ctx_cbs; /* Callback funcs from app */ |
dflet | 0:087b5655778d | 219 | #define CTX_CBS_PTR(cl_ctx) &(CLIENT(cl_ctx)->app_ctx_cbs) |
dflet | 0:087b5655778d | 220 | |
dflet | 0:087b5655778d | 221 | struct tx_part_pkt tx_part;/* Reference to partial TX PKT */ |
dflet | 0:087b5655778d | 222 | struct mqtt_packet *rx_mqp; /* Reference to partial RX PKT */ |
dflet | 0:087b5655778d | 223 | void *app; |
dflet | 0:087b5655778d | 224 | |
dflet | 0:087b5655778d | 225 | uint32_t nwconn_opts; |
dflet | 0:087b5655778d | 226 | char *server_addr; |
dflet | 0:087b5655778d | 227 | uint16_t port_number; |
dflet | 0:087b5655778d | 228 | struct secure_conn nw_security; |
dflet | 0:087b5655778d | 229 | |
dflet | 0:087b5655778d | 230 | } clients[MAX_NWCONN]; |
dflet | 0:087b5655778d | 231 | |
dflet | 0:087b5655778d | 232 | static void client_reset(struct client_desc *client) |
dflet | 0:087b5655778d | 233 | { |
dflet | 0:087b5655778d | 234 | |
dflet | 0:087b5655778d | 235 | struct mqtt_client_ctx_cbs *ctx_cbs = &client->app_ctx_cbs; |
dflet | 0:087b5655778d | 236 | int32_t i = 0; |
dflet | 0:087b5655778d | 237 | |
dflet | 0:087b5655778d | 238 | cl_ctx_reset(CL_CTX(client)); |
dflet | 0:087b5655778d | 239 | |
dflet | 0:087b5655778d | 240 | for(i = 0; i < 5; i++) { |
dflet | 0:087b5655778d | 241 | client->conn_pl_utf8s[i] = NULL; |
dflet | 0:087b5655778d | 242 | } |
dflet | 0:087b5655778d | 243 | client->will_opts = 0; |
dflet | 0:087b5655778d | 244 | |
dflet | 0:087b5655778d | 245 | mqp_ack_wlist_purge(&client->qos_ack1_wl); |
dflet | 0:087b5655778d | 246 | |
dflet | 0:087b5655778d | 247 | qos2_pub_cq_reset(&client->qos2_rx_cq); |
dflet | 0:087b5655778d | 248 | qos2_pub_cq_reset(&client->qos2_tx_cq); |
dflet | 0:087b5655778d | 249 | |
dflet | 0:087b5655778d | 250 | ctx_cbs->publish_rx = NULL; |
dflet | 0:087b5655778d | 251 | ctx_cbs->ack_notify = NULL; |
dflet | 0:087b5655778d | 252 | ctx_cbs->disconn_cb = NULL; |
dflet | 0:087b5655778d | 253 | |
dflet | 0:087b5655778d | 254 | tx_part_reset(&client->tx_part); |
dflet | 0:087b5655778d | 255 | client->rx_mqp = NULL; |
dflet | 0:087b5655778d | 256 | client->app = NULL; |
dflet | 0:087b5655778d | 257 | |
dflet | 0:087b5655778d | 258 | client->nwconn_opts = 0; |
dflet | 0:087b5655778d | 259 | client->server_addr = NULL; |
dflet | 0:087b5655778d | 260 | client->port_number = 0; |
dflet | 0:087b5655778d | 261 | |
dflet | 0:087b5655778d | 262 | secure_conn_struct_init(&client->nw_security); |
dflet | 0:087b5655778d | 263 | |
dflet | 0:087b5655778d | 264 | return; |
dflet | 0:087b5655778d | 265 | } |
dflet | 0:087b5655778d | 266 | |
dflet | 0:087b5655778d | 267 | static void client_desc_init(void) |
dflet | 0:087b5655778d | 268 | { |
dflet | 0:087b5655778d | 269 | int32_t i = 0; |
dflet | 0:087b5655778d | 270 | for(i = 0; i < MAX_NWCONN; i++) { |
dflet | 0:087b5655778d | 271 | struct client_desc *client = clients + i; |
dflet | 0:087b5655778d | 272 | struct client_ctx *cl_ctx = CL_CTX(client); |
dflet | 0:087b5655778d | 273 | |
dflet | 0:087b5655778d | 274 | /* Initialize certain fields to defaults */ |
dflet | 0:087b5655778d | 275 | client->qos_ack1_wl.head = NULL; |
dflet | 0:087b5655778d | 276 | client->qos_ack1_wl.tail = NULL; |
dflet | 0:087b5655778d | 277 | client->tx_part.tx_mqp = NULL; |
dflet | 0:087b5655778d | 278 | |
dflet | 0:087b5655778d | 279 | client_reset(client); /* Reset remaining */ |
dflet | 0:087b5655778d | 280 | |
dflet | 0:087b5655778d | 281 | cl_ctx->next = free_ctxs; |
dflet | 0:087b5655778d | 282 | free_ctxs = cl_ctx; |
dflet | 0:087b5655778d | 283 | } |
dflet | 0:087b5655778d | 284 | |
dflet | 0:087b5655778d | 285 | return; |
dflet | 0:087b5655778d | 286 | } |
dflet | 0:087b5655778d | 287 | |
dflet | 0:087b5655778d | 288 | static void mqp_free_locked(struct mqtt_packet *mqp) |
dflet | 0:087b5655778d | 289 | { |
dflet | 0:087b5655778d | 290 | |
dflet | 0:087b5655778d | 291 | // MUTEX_LOCKIN(); |
dflet | 0:087b5655778d | 292 | |
dflet | 0:087b5655778d | 293 | // MUTEX_UNLOCK(); |
dflet | 0:087b5655778d | 294 | |
dflet | 0:087b5655778d | 295 | if( xSemaphore != NULL ) { |
dflet | 0:087b5655778d | 296 | // See if we can obtain the semaphore. If the semaphore is not available |
dflet | 0:087b5655778d | 297 | // wait 10 ticks to see if it becomes free. |
dflet | 0:087b5655778d | 298 | if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { |
dflet | 0:087b5655778d | 299 | // We were able to obtain the semaphore and can now access the |
dflet | 0:087b5655778d | 300 | // shared resource. |
dflet | 0:087b5655778d | 301 | mqp_free(mqp); |
dflet | 0:087b5655778d | 302 | |
dflet | 0:087b5655778d | 303 | // We have finished accessing the shared resource. Release the |
dflet | 0:087b5655778d | 304 | // semaphore. |
dflet | 0:087b5655778d | 305 | xSemaphoreGive( xSemaphore ); |
dflet | 0:087b5655778d | 306 | |
dflet | 0:087b5655778d | 307 | } else { |
dflet | 0:087b5655778d | 308 | // We could not obtain the semaphore and can therefore not access |
dflet | 0:087b5655778d | 309 | // the shared resource safely. |
dflet | 0:087b5655778d | 310 | Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); |
dflet | 0:087b5655778d | 311 | } |
dflet | 0:087b5655778d | 312 | } |
dflet | 0:087b5655778d | 313 | } |
dflet | 0:087b5655778d | 314 | |
dflet | 0:087b5655778d | 315 | /*---------------------------------------------------------------------------- |
dflet | 0:087b5655778d | 316 | * Fix-up to prevent certain good and non-callback MQP being reported to app |
dflet | 0:087b5655778d | 317 | *---------------------------------------------------------------------------- |
dflet | 0:087b5655778d | 318 | */ |
dflet | 0:087b5655778d | 319 | /* cor --> clear on read. */ |
dflet | 0:087b5655778d | 320 | static bool mqp_do_not_report_cor(struct mqtt_packet *mqp) |
dflet | 0:087b5655778d | 321 | { |
dflet | 0:087b5655778d | 322 | bool rv = (1 == mqp->private_)? true : false; |
dflet | 0:087b5655778d | 323 | mqp->private_ = 0; |
dflet | 0:087b5655778d | 324 | return rv; |
dflet | 0:087b5655778d | 325 | } |
dflet | 0:087b5655778d | 326 | |
/* Helpers around the 'private_' member of an MQP (argument: pointer to the
   MQP). Macro arguments are parenthesized so that expressions such as '&pkt'
   or 'a + b' expand correctly. */

/* Read and clear the "do not report" marker */
#define MQP_RX_DO_NOT_RPT_COR(mqp) mqp_do_not_report_cor(mqp)

/* Only if MQP has good RX data i.e. this macro shouldn't be used for bad RX */
#define MQP_RX_DO_NOT_RPT_SET(mqp) ((mqp)->private_ = 1)

/* TX side: 'private_' accumulates the number of bytes already sent */
#define MQP_TX_DONE_LEN_ADD(mqp, add) ((mqp)->private_ += (add))
#define MQP_TX_DONE_LEN_GET(mqp) ((mqp)->private_)
dflet | 0:087b5655778d | 334 | |
dflet | 0:087b5655778d | 335 | /*---------------------------Fix-Up Ends ------------------------------------*/ |
dflet | 0:087b5655778d | 336 | |
dflet | 0:087b5655778d | 337 | static int32_t loopb_net = -1; |
dflet | 0:087b5655778d | 338 | static const uint8_t LOOP_DATA[] = {0x00, 0x01}; |
dflet | 0:087b5655778d | 339 | #define LOOP_DLEN sizeof(LOOP_DATA) |
dflet | 0:087b5655778d | 340 | |
dflet | 0:087b5655778d | 341 | static int32_t loopb_trigger(void) |
dflet | 0:087b5655778d | 342 | { |
dflet | 0:087b5655778d | 343 | |
dflet | 0:087b5655778d | 344 | |
dflet | 0:087b5655778d | 345 | uint8_t ip_addr[] = {127,0,0,1}; |
dflet | 0:087b5655778d | 346 | |
dflet | 0:087b5655778d | 347 | return (-1 != loopb_net)? |
dflet | 0:087b5655778d | 348 | net_ops->send_dest(loopb_net, LOOP_DATA, LOOP_DLEN, |
dflet | 0:087b5655778d | 349 | loopb_portid, ip_addr, 4) : MQP_ERR_LIBQUIT; |
dflet | 0:087b5655778d | 350 | } |
dflet | 0:087b5655778d | 351 | |
dflet | 0:087b5655778d | 352 | static void session_311fix(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 353 | { |
dflet | 0:087b5655778d | 354 | struct mqtt_ack_wlist *wl = &CLIENT(cl_ctx)->qos_ack1_wl; |
dflet | 0:087b5655778d | 355 | struct mqtt_packet *elem = wl->head; |
dflet | 0:087b5655778d | 356 | |
dflet | 0:087b5655778d | 357 | while(elem) { |
dflet | 0:087b5655778d | 358 | struct mqtt_packet *next = elem->next; |
dflet | 0:087b5655778d | 359 | if(MQTT_PUBLISH != elem->msg_type) |
dflet | 0:087b5655778d | 360 | mqp_ack_wlist_remove(wl, elem->msg_id); |
dflet | 0:087b5655778d | 361 | |
dflet | 0:087b5655778d | 362 | elem = next; |
dflet | 0:087b5655778d | 363 | } |
dflet | 0:087b5655778d | 364 | |
dflet | 0:087b5655778d | 365 | return; |
dflet | 0:087b5655778d | 366 | } |
dflet | 0:087b5655778d | 367 | |
dflet | 0:087b5655778d | 368 | static void session_delete(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 369 | { |
dflet | 0:087b5655778d | 370 | struct client_desc *client = CLIENT(cl_ctx); |
dflet | 0:087b5655778d | 371 | |
dflet | 0:087b5655778d | 372 | DBG_INFO("C: Cleaning session for net %d\n\r", cl_ctx->net); |
dflet | 0:087b5655778d | 373 | |
dflet | 0:087b5655778d | 374 | qos2_pub_cq_reset(&client->qos2_rx_cq); |
dflet | 0:087b5655778d | 375 | qos2_pub_cq_reset(&client->qos2_tx_cq); |
dflet | 0:087b5655778d | 376 | |
dflet | 0:087b5655778d | 377 | mqp_ack_wlist_purge(&client->qos_ack1_wl); |
dflet | 0:087b5655778d | 378 | |
dflet | 0:087b5655778d | 379 | return; |
dflet | 0:087b5655778d | 380 | } |
dflet | 0:087b5655778d | 381 | |
dflet | 0:087b5655778d | 382 | /*------------------------------------------------------------------------------ |
dflet | 0:087b5655778d | 383 | * Routine to manage error conditions in client - close the network connection |
dflet | 0:087b5655778d | 384 | *----------------------------------------------------------------------------*/ |
dflet | 0:087b5655778d | 385 | static void do_net_close(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 386 | { |
dflet | 0:087b5655778d | 387 | int32_t net = cl_ctx->net; |
dflet | 0:087b5655778d | 388 | |
dflet | 0:087b5655778d | 389 | if(-1 == net) |
dflet | 0:087b5655778d | 390 | return; /* network closed already, must not happen */ |
dflet | 0:087b5655778d | 391 | |
dflet | 0:087b5655778d | 392 | if(IS_CLN_SESSION(cl_ctx)) { |
dflet | 0:087b5655778d | 393 | session_delete(cl_ctx); |
dflet | 0:087b5655778d | 394 | } else if(!IS_PROTO_VER31(cl_ctx)) { |
dflet | 0:087b5655778d | 395 | /* Version 3.1.1 doesn't need SUB and UNSUB re-send */ |
dflet | 0:087b5655778d | 396 | session_311fix(cl_ctx); |
dflet | 0:087b5655778d | 397 | } |
dflet | 0:087b5655778d | 398 | |
dflet | 0:087b5655778d | 399 | tx_part_reset(&CLIENT(cl_ctx)->tx_part); /* Part TX, if any */ |
dflet | 0:087b5655778d | 400 | |
dflet | 0:087b5655778d | 401 | cl_ctx->flags &= ~(CONNACK_AWAIT_FLAG | NOW_CONNECTED_FLAG | |
dflet | 0:087b5655778d | 402 | KA_PINGER_RSP_FLAG | USER_PING_RSP_FLAG | |
dflet | 0:087b5655778d | 403 | NETWORK_CLOSE_FLAG | DO_CONNACK_TO_FLAG); |
dflet | 0:087b5655778d | 404 | |
dflet | 0:087b5655778d | 405 | cl_ctx->net = -1; |
dflet | 0:087b5655778d | 406 | net_ops->close(net); |
dflet | 0:087b5655778d | 407 | |
dflet | 0:087b5655778d | 408 | USR_INFO("C: Net %d now closed\n\r", net); |
dflet | 0:087b5655778d | 409 | |
dflet | 0:087b5655778d | 410 | return; |
dflet | 0:087b5655778d | 411 | } |
dflet | 0:087b5655778d | 412 | |
dflet | 0:087b5655778d | 413 | static void do_net_close_rx(struct client_ctx *cl_ctx, int32_t cause) |
dflet | 0:087b5655778d | 414 | { |
dflet | 0:087b5655778d | 415 | struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); |
dflet | 0:087b5655778d | 416 | |
dflet | 0:087b5655778d | 417 | DBG_INFO("C: RX closing Net %d [%d]\n\r", cl_ctx->net, cause); |
dflet | 0:087b5655778d | 418 | |
dflet | 0:087b5655778d | 419 | do_net_close(cl_ctx); |
dflet | 0:087b5655778d | 420 | if(ctx_cbs->disconn_cb) |
dflet | 0:087b5655778d | 421 | ctx_cbs->disconn_cb(CLIENT(cl_ctx)->app, cause); |
dflet | 0:087b5655778d | 422 | |
dflet | 0:087b5655778d | 423 | if(A_GROUP_MEMBER(cl_ctx)) |
dflet | 0:087b5655778d | 424 | cl_ctx_remove(&used_ctxs, cl_ctx); |
dflet | 0:087b5655778d | 425 | |
dflet | 0:087b5655778d | 426 | return; |
dflet | 0:087b5655778d | 427 | } |
dflet | 0:087b5655778d | 428 | |
dflet | 0:087b5655778d | 429 | static void do_net_close_tx(struct client_ctx *cl_ctx, char *cause) |
dflet | 0:087b5655778d | 430 | { |
dflet | 0:087b5655778d | 431 | DBG_INFO("C: TX closing Net %d [%s]\n\r", cl_ctx->net, cause); |
dflet | 0:087b5655778d | 432 | |
dflet | 0:087b5655778d | 433 | if(RECV_TASK_AVBL(cl_ctx)) { |
dflet | 0:087b5655778d | 434 | cl_ctx->flags |= NETWORK_CLOSE_FLAG; |
dflet | 0:087b5655778d | 435 | if(A_GROUP_MEMBER(cl_ctx)) |
dflet | 0:087b5655778d | 436 | loopb_trigger(); |
dflet | 0:087b5655778d | 437 | } else { |
dflet | 0:087b5655778d | 438 | struct mqtt_packet *rx_mqp = CLIENT(cl_ctx)->rx_mqp; |
dflet | 0:087b5655778d | 439 | |
dflet | 0:087b5655778d | 440 | do_net_close(cl_ctx); /* No RX Task, close now */ |
dflet | 0:087b5655778d | 441 | |
dflet | 0:087b5655778d | 442 | /* Release partial MQP, if any, for a CTX w/ CB */ |
dflet | 0:087b5655778d | 443 | if((NULL != rx_mqp) && (NULL != rx_mqp->free)) |
dflet | 0:087b5655778d | 444 | mqp_free(rx_mqp); |
dflet | 0:087b5655778d | 445 | |
dflet | 0:087b5655778d | 446 | CLIENT(cl_ctx)->rx_mqp = NULL; |
dflet | 0:087b5655778d | 447 | } |
dflet | 0:087b5655778d | 448 | |
dflet | 0:087b5655778d | 449 | return; |
dflet | 0:087b5655778d | 450 | } |
dflet | 0:087b5655778d | 451 | |
dflet | 0:087b5655778d | 452 | /*---------------------------------------------------------------------------- |
dflet | 0:087b5655778d | 453 | * QoS2 PUB RX Message handling mechanism and associated house-keeping |
dflet | 0:087b5655778d | 454 | *--------------------------------------------------------------------------*/ |
dflet | 0:087b5655778d | 455 | static bool qos2_pub_rx_logup(struct client_ctx *cl_ctx, uint16_t msg_id) |
dflet | 0:087b5655778d | 456 | { |
dflet | 0:087b5655778d | 457 | return qos2_pub_cq_logup(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id); |
dflet | 0:087b5655778d | 458 | } |
dflet | 0:087b5655778d | 459 | |
dflet | 0:087b5655778d | 460 | static bool ack2_msg_id_logup(struct client_ctx *cl_ctx, uint16_t msg_id) |
dflet | 0:087b5655778d | 461 | { |
dflet | 0:087b5655778d | 462 | return qos2_pub_cq_logup(&CLIENT(cl_ctx)->qos2_tx_cq, msg_id); |
dflet | 0:087b5655778d | 463 | } |
dflet | 0:087b5655778d | 464 | |
dflet | 0:087b5655778d | 465 | static bool qos2_pub_rx_unlog(struct client_ctx *cl_ctx, uint16_t msg_id) |
dflet | 0:087b5655778d | 466 | { |
dflet | 0:087b5655778d | 467 | return qos2_pub_cq_unlog(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id); |
dflet | 0:087b5655778d | 468 | } |
dflet | 0:087b5655778d | 469 | |
dflet | 0:087b5655778d | 470 | static bool ack2_msg_id_unlog(struct client_ctx *cl_ctx, uint16_t msg_id) |
dflet | 0:087b5655778d | 471 | { |
dflet | 0:087b5655778d | 472 | struct client_desc *client = CLIENT(cl_ctx); |
dflet | 0:087b5655778d | 473 | if(qos2_pub_cq_unlog(&client->qos2_tx_cq, msg_id)) { |
dflet | 0:087b5655778d | 474 | struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); |
dflet | 0:087b5655778d | 475 | if(ctx_cbs->ack_notify) |
dflet | 0:087b5655778d | 476 | ctx_cbs->ack_notify(client->app, MQTT_PUBCOMP, |
dflet | 0:087b5655778d | 477 | msg_id, NULL, 0); |
dflet | 0:087b5655778d | 478 | return true; |
dflet | 0:087b5655778d | 479 | } |
dflet | 0:087b5655778d | 480 | |
dflet | 0:087b5655778d | 481 | return false; |
dflet | 0:087b5655778d | 482 | } |
dflet | 0:087b5655778d | 483 | |
dflet | 0:087b5655778d | 484 | static bool qos2_pub_rx_is_done(struct client_ctx *cl_ctx, uint16_t msg_id) |
dflet | 0:087b5655778d | 485 | { |
dflet | 0:087b5655778d | 486 | return qos2_pub_cq_check(&CLIENT(cl_ctx)->qos2_rx_cq, msg_id); |
dflet | 0:087b5655778d | 487 | } |
dflet | 0:087b5655778d | 488 | |
dflet | 0:087b5655778d | 489 | static bool awaits_pkts(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 490 | { |
dflet | 0:087b5655778d | 491 | |
dflet | 0:087b5655778d | 492 | struct client_desc *client = CLIENT(cl_ctx); |
dflet | 0:087b5655778d | 493 | |
dflet | 0:087b5655778d | 494 | return client->qos_ack1_wl.head || |
dflet | 0:087b5655778d | 495 | qos2_pub_cq_count(&client->qos2_rx_cq) || |
dflet | 0:087b5655778d | 496 | qos2_pub_cq_count(&client->qos2_tx_cq)? |
dflet | 0:087b5655778d | 497 | true : false; |
dflet | 0:087b5655778d | 498 | } |
dflet | 0:087b5655778d | 499 | |
dflet | 0:087b5655778d | 500 | static inline int32_t len_err_free_mqp(struct mqtt_packet *mqp) |
dflet | 0:087b5655778d | 501 | { |
dflet | 0:087b5655778d | 502 | mqp_free(mqp); |
dflet | 0:087b5655778d | 503 | return MQP_ERR_PKT_LEN; |
dflet | 0:087b5655778d | 504 | } |
dflet | 0:087b5655778d | 505 | |
dflet | 0:087b5655778d | 506 | static int32_t is_valid_utf8_string(const struct utf8_string *utf8) |
dflet | 0:087b5655778d | 507 | { |
dflet | 0:087b5655778d | 508 | /* Valid topic should be > 0 byte and must hosted in usable buffer */ |
dflet | 0:087b5655778d | 509 | return ((utf8->length > 0) && (NULL != utf8->buffer))? true : false; |
dflet | 0:087b5655778d | 510 | } |
dflet | 0:087b5655778d | 511 | |
dflet | 0:087b5655778d | 512 | #define RET_IF_INVALID_UTF8(utf8) \ |
dflet | 0:087b5655778d | 513 | if(false == is_valid_utf8_string(utf8)) \ |
dflet | 0:087b5655778d | 514 | return -1; |
dflet | 0:087b5655778d | 515 | |
dflet | 0:087b5655778d | 516 | static bool is_connected(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 517 | { |
dflet | 0:087b5655778d | 518 | |
dflet | 0:087b5655778d | 519 | return (HAS_CONNECTION(cl_ctx) && !NEED_NET_CLOSE(cl_ctx))? |
dflet | 0:087b5655778d | 520 | true : false; |
dflet | 0:087b5655778d | 521 | } |
dflet | 0:087b5655778d | 522 | |
dflet | 0:087b5655778d | 523 | uint16_t mqtt_client_new_msg_id() |
dflet | 0:087b5655778d | 524 | { |
dflet | 0:087b5655778d | 525 | return assign_new_msg_id(); |
dflet | 0:087b5655778d | 526 | } |
dflet | 0:087b5655778d | 527 | |
dflet | 0:087b5655778d | 528 | bool mqtt_client_is_connected(void *ctx) |
dflet | 0:087b5655778d | 529 | { |
dflet | 0:087b5655778d | 530 | return is_connected(CL_CTX(ctx)); |
dflet | 0:087b5655778d | 531 | } |
dflet | 0:087b5655778d | 532 | |
dflet | 0:087b5655778d | 533 | /*---------------------------------------------------------------------------- |
dflet | 0:087b5655778d | 534 | * MQTT TX Routines |
dflet | 0:087b5655778d | 535 | *--------------------------------------------------------------------------*/ |
dflet | 0:087b5655778d | 536 | static void used_ctxs_TO_sort(struct client_ctx *cl_ctx_TO) |
dflet | 0:087b5655778d | 537 | { |
dflet | 0:087b5655778d | 538 | cl_ctx_remove(&used_ctxs, cl_ctx_TO); |
dflet | 0:087b5655778d | 539 | cl_ctx_timeout_insert(&used_ctxs, cl_ctx_TO); |
dflet | 0:087b5655778d | 540 | } |
dflet | 0:087b5655778d | 541 | |
dflet | 0:087b5655778d | 542 | static inline int32_t net_send(int32_t net, const uint8_t *buf, uint32_t len, void *ctx) |
dflet | 0:087b5655778d | 543 | { |
dflet | 0:087b5655778d | 544 | |
dflet | 0:087b5655778d | 545 | int32_t rv = net_ops->send(net, buf, len, ctx); |
dflet | 0:087b5655778d | 546 | if(rv <= 0) { |
dflet | 0:087b5655778d | 547 | memset(print_buf, 0x00, PRINT_BUF_LEN); |
dflet | 0:087b5655778d | 548 | sprintf((char*) print_buf, "MQP_ERR_NETWORK %i\r\n", MQP_ERR_NETWORK); |
dflet | 0:087b5655778d | 549 | Uart_Write((uint8_t *) print_buf); |
dflet | 0:087b5655778d | 550 | rv = MQP_ERR_NETWORK; |
dflet | 0:087b5655778d | 551 | } |
dflet | 0:087b5655778d | 552 | return rv; |
dflet | 0:087b5655778d | 553 | } |
dflet | 0:087b5655778d | 554 | |
#if 0
/* Disabled (compiled out) single-shot TX routine.
   Kept consistent with the live code: net_send() takes the context as its
   fourth argument, exactly as cl_ctx_part_send() passes it.

   Returns the (positive) result of net_send() on a good send,
   MQP_ERR_NOTCONN when the connection state does not suit the message,
   or the error from net_send() after closing the network. */
static int32_t cl_ctx_send(struct client_ctx *cl_ctx, const uint8_t *buf, uint32_t len,
                           bool is_conn_msg)
{
    int32_t rv = MQP_ERR_NOTCONN;

    /* For a CONNECT msg, the client context must not be connected already.
       For all other msgs, the client context must be connected to server */
    if(false == (is_conn_msg ^ is_connected(cl_ctx)))
        goto cl_ctx_send_exit1;

    rv = net_send(cl_ctx->net, buf, len, (void*)cl_ctx);
    if(rv > 0) { /* A good send, do follow-up */
        cl_ctx_timeout_update(cl_ctx, net_ops->time());
        if(A_GROUP_MEMBER(cl_ctx) && HAS_CONNECTION(cl_ctx)) {
            /* With update to timeout,
               a sorting is impending */
            used_ctxs_TO_sort(cl_ctx);
        }

        goto cl_ctx_send_exit1; /* A Good Send */
    }

    do_net_close_tx(cl_ctx, "snd-err");

cl_ctx_send_exit1:
    USR_INFO("C: FH-B1 0x%02x, len %u bytes, to net %d: %s\n\r",
             *buf, len, cl_ctx->net, (rv > 0)? "Sent" : "Fail");
    return rv;
}
#endif
dflet | 0:087b5655778d | 587 | |
dflet | 0:087b5655778d | 588 | static int32_t cl_ctx_part_send(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 589 | { |
dflet | 0:087b5655778d | 590 | |
dflet | 0:087b5655778d | 591 | struct tx_part_pkt *tx_part = &CLIENT(cl_ctx)->tx_part; |
dflet | 0:087b5655778d | 592 | const uint8_t *buf = TX_PART_BUFFER(tx_part); |
dflet | 0:087b5655778d | 593 | uint32_t len = TX_PART_BUF_SZ(tx_part); |
dflet | 0:087b5655778d | 594 | uint32_t ofs = tx_part->offset; |
dflet | 0:087b5655778d | 595 | uint8_t B1 = *buf; |
dflet | 0:087b5655778d | 596 | |
dflet | 0:087b5655778d | 597 | int32_t rv = net_send(cl_ctx->net, buf, len, (void*)cl_ctx); |
dflet | 0:087b5655778d | 598 | if(rv > 0) { /* Follow-up for a good send */ |
dflet | 0:087b5655778d | 599 | if(HAS_CONNECTION(cl_ctx)) { |
dflet | 0:087b5655778d | 600 | /* Update TX timeout, if 'CTX' is connected */ |
dflet | 0:087b5655778d | 601 | cl_ctx_timeout_update(cl_ctx, net_ops->time()); |
dflet | 0:087b5655778d | 602 | |
dflet | 0:087b5655778d | 603 | /* After update, 'CTX'(s) sorting is a must */ |
dflet | 0:087b5655778d | 604 | if(A_GROUP_MEMBER(cl_ctx)) |
dflet | 0:087b5655778d | 605 | used_ctxs_TO_sort(cl_ctx); |
dflet | 0:087b5655778d | 606 | } |
dflet | 0:087b5655778d | 607 | |
dflet | 0:087b5655778d | 608 | if(rv != len) |
dflet | 0:087b5655778d | 609 | /* Partial data was sent */ |
dflet | 0:087b5655778d | 610 | tx_part_addup(tx_part, rv); |
dflet | 0:087b5655778d | 611 | else |
dflet | 0:087b5655778d | 612 | tx_part_reset(tx_part); |
dflet | 0:087b5655778d | 613 | |
dflet | 0:087b5655778d | 614 | goto cl_ctx_send_exit1; /* A Good Send */ |
dflet | 0:087b5655778d | 615 | } |
dflet | 0:087b5655778d | 616 | |
dflet | 0:087b5655778d | 617 | do_net_close_tx(cl_ctx, "snd-err"); |
dflet | 0:087b5655778d | 618 | |
dflet | 0:087b5655778d | 619 | cl_ctx_send_exit1: |
dflet | 0:087b5655778d | 620 | USR_INFO("C: %s 0x%02x to net %d, %s (%d Bytes) [@ %u]\n\r", |
dflet | 0:087b5655778d | 621 | ofs? "PartN" : "FH-B1", B1, cl_ctx->net, |
dflet | 0:087b5655778d | 622 | (rv > 0)? "Sent" : "Fail", rv, net_ops->time()); |
dflet | 0:087b5655778d | 623 | |
dflet | 0:087b5655778d | 624 | return rv; |
dflet | 0:087b5655778d | 625 | } |
dflet | 0:087b5655778d | 626 | |
dflet | 0:087b5655778d | 627 | static int32_t cl_ctx_seg1_send(struct client_ctx *cl_ctx, const uint8_t *buf, uint32_t len, |
dflet | 0:087b5655778d | 628 | bool is_conn_msg, struct mqtt_packet *tx_mqp) |
dflet | 0:087b5655778d | 629 | { |
dflet | 0:087b5655778d | 630 | |
dflet | 0:087b5655778d | 631 | struct tx_part_pkt *tx_part = &CLIENT(cl_ctx)->tx_part; |
dflet | 0:087b5655778d | 632 | |
dflet | 0:087b5655778d | 633 | /* For CONNECT msg, a client context mustn't be already connected. |
dflet | 0:087b5655778d | 634 | For others msgs, a client context must be conected to server */ |
dflet | 0:087b5655778d | 635 | if(false == (is_conn_msg ^ is_connected(cl_ctx))) |
dflet | 0:087b5655778d | 636 | return MQP_ERR_NOTCONN; |
dflet | 0:087b5655778d | 637 | |
dflet | 0:087b5655778d | 638 | if(TX_PART_IN_USE(tx_part)) |
dflet | 0:087b5655778d | 639 | return MQP_ERR_BADCALL; |
dflet | 0:087b5655778d | 640 | |
dflet | 0:087b5655778d | 641 | tx_part_setup(tx_part, buf, len, tx_mqp); |
dflet | 0:087b5655778d | 642 | |
dflet | 0:087b5655778d | 643 | return cl_ctx_part_send(cl_ctx); |
dflet | 0:087b5655778d | 644 | } |
dflet | 0:087b5655778d | 645 | |
dflet | 0:087b5655778d | 646 | int32_t mqtt_client_send_progress(void *ctx) |
dflet | 0:087b5655778d | 647 | { |
dflet | 0:087b5655778d | 648 | struct client_ctx *cl_ctx = CL_CTX(ctx); |
dflet | 0:087b5655778d | 649 | struct tx_part_pkt *tx_part = NULL; |
dflet | 0:087b5655778d | 650 | int32_t rv = MQP_ERR_BADCALL; |
dflet | 0:087b5655778d | 651 | |
dflet | 0:087b5655778d | 652 | if(NULL == ctx) |
dflet | 0:087b5655778d | 653 | return MQP_ERR_FNPARAM; |
dflet | 0:087b5655778d | 654 | |
dflet | 0:087b5655778d | 655 | tx_part = &CLIENT(cl_ctx)->tx_part; |
dflet | 0:087b5655778d | 656 | |
dflet | 0:087b5655778d | 657 | if(!TX_PART_IN_USE(tx_part)) |
dflet | 0:087b5655778d | 658 | return rv; |
dflet | 0:087b5655778d | 659 | |
dflet | 0:087b5655778d | 660 | rv = cl_ctx_part_send(cl_ctx); |
dflet | 0:087b5655778d | 661 | if(rv > 0) |
dflet | 0:087b5655778d | 662 | rv = TX_PART_BUF_SZ(tx_part); |
dflet | 0:087b5655778d | 663 | |
dflet | 0:087b5655778d | 664 | return rv; |
dflet | 0:087b5655778d | 665 | } |
dflet | 0:087b5655778d | 666 | |
dflet | 0:087b5655778d | 667 | static int32_t wr_connect_pl(struct client_ctx *cl_ctx, uint8_t *buf, |
dflet | 0:087b5655778d | 668 | uint32_t fsz, uint8_t *conn_opts) |
dflet | 0:087b5655778d | 669 | { |
dflet | 0:087b5655778d | 670 | |
dflet | 0:087b5655778d | 671 | /* UTF8 usage selection order: Client, W-Topic, W-Msg, User-Name, Pwd */ |
dflet | 0:087b5655778d | 672 | uint8_t utf8_sel[] = {0x00, WILL_CONFIG_VAL, 0x00, |
dflet | 0:087b5655778d | 673 | USER_NAME_OPVAL, PASS_WORD_OPVAL |
dflet | 0:087b5655778d | 674 | }; |
dflet | 0:087b5655778d | 675 | struct client_desc *client = CLIENT(cl_ctx); |
dflet | 0:087b5655778d | 676 | uint8_t *ref = buf; |
dflet | 0:087b5655778d | 677 | |
dflet | 0:087b5655778d | 678 | int32_t i = 0; |
dflet | 0:087b5655778d | 679 | |
dflet | 0:087b5655778d | 680 | for(i = 0; i < 5; i++) { /* TBD 5 --> macro */ |
dflet | 0:087b5655778d | 681 | uint16_t len = 2; |
dflet | 0:087b5655778d | 682 | const struct utf8_string *utf8 = client->conn_pl_utf8s[i]; |
dflet | 0:087b5655778d | 683 | if(NULL == utf8) { |
dflet | 0:087b5655778d | 684 | /* UTF8 absent: Client ID (i = 0) and Will MSG (i = 2) |
dflet | 0:087b5655778d | 685 | set zero byte length in the CONNECT message */ |
dflet | 0:087b5655778d | 686 | if(0 != i) |
dflet | 0:087b5655778d | 687 | if(!((2 == i) && (*conn_opts & WILL_CONFIG_VAL))) |
dflet | 0:087b5655778d | 688 | continue; /* Others, just pass */ |
dflet | 0:087b5655778d | 689 | } else { |
dflet | 0:087b5655778d | 690 | len += utf8->length; |
dflet | 0:087b5655778d | 691 | } |
dflet | 0:087b5655778d | 692 | |
dflet | 0:087b5655778d | 693 | if(fsz < (buf - ref + len)) { /* TBD end = ref + fsz */ |
dflet | 0:087b5655778d | 694 | Uart_Write((uint8_t*)"Payload: no space left fail\r\n"); |
dflet | 0:087b5655778d | 695 | return MQP_ERR_PKT_LEN; /* Payload: no space left */ |
dflet | 0:087b5655778d | 696 | } |
dflet | 0:087b5655778d | 697 | if(2 == len) { |
dflet | 0:087b5655778d | 698 | buf += buf_wr_nbo_2B(buf, 0); /* WR 0 byte length */ |
dflet | 0:087b5655778d | 699 | } else { |
dflet | 0:087b5655778d | 700 | buf += mqp_buf_wr_utf8(buf, utf8); |
dflet | 0:087b5655778d | 701 | } |
dflet | 0:087b5655778d | 702 | *conn_opts |= utf8_sel[i]; /* Enable message flags */ |
dflet | 0:087b5655778d | 703 | } |
dflet | 0:087b5655778d | 704 | |
dflet | 0:087b5655778d | 705 | return buf - ref; |
dflet | 0:087b5655778d | 706 | } |
dflet | 0:087b5655778d | 707 | |
dflet | 0:087b5655778d | 708 | |
/* Protocol information for the supported versions: a 2-byte length,
   the protocol name of that length, then one trailing version byte. */
static uint8_t mqtt310[] = {
    0x00, 0x06,                         /* Name length */
    'M', 'Q', 'I', 's', 'd', 'p',       /* Name        */
    0x03                                /* Version     */
};
static uint8_t mqtt311[] = {
    0x00, 0x04,                         /* Name length */
    'M', 'Q', 'T', 'T',                 /* Name        */
    0x04                                /* Version     */
};
dflet | 0:087b5655778d | 712 | |
dflet | 0:087b5655778d | 713 | static inline uint16_t get_connect_vh_len(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 714 | { |
dflet | 0:087b5655778d | 715 | |
dflet | 0:087b5655778d | 716 | return (IS_PROTO_VER31(cl_ctx)? sizeof(mqtt310) : sizeof(mqtt311)) |
dflet | 0:087b5655778d | 717 | + 3; |
dflet | 0:087b5655778d | 718 | } |
dflet | 0:087b5655778d | 719 | |
dflet | 0:087b5655778d | 720 | static int32_t wr_connect_vh(struct client_ctx *cl_ctx, uint8_t *buf, |
dflet | 0:087b5655778d | 721 | uint16_t ka_secs, uint8_t conn_opts) |
dflet | 0:087b5655778d | 722 | { |
dflet | 0:087b5655778d | 723 | |
dflet | 0:087b5655778d | 724 | uint8_t *ref = buf; |
dflet | 0:087b5655778d | 725 | |
dflet | 0:087b5655778d | 726 | if(IS_PROTO_VER31(cl_ctx)) |
dflet | 0:087b5655778d | 727 | buf += buf_wr_nbytes(buf, mqtt310, sizeof(mqtt310)); |
dflet | 0:087b5655778d | 728 | else |
dflet | 0:087b5655778d | 729 | buf += buf_wr_nbytes(buf, mqtt311, sizeof(mqtt311)); |
dflet | 0:087b5655778d | 730 | |
dflet | 0:087b5655778d | 731 | *buf++ = conn_opts; |
dflet | 0:087b5655778d | 732 | buf += buf_wr_nbo_2B(buf, ka_secs); |
dflet | 0:087b5655778d | 733 | |
dflet | 0:087b5655778d | 734 | return buf - ref; |
dflet | 0:087b5655778d | 735 | } |
dflet | 0:087b5655778d | 736 | |
dflet | 0:087b5655778d | 737 | static int32_t net_connect(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 738 | { |
dflet | 0:087b5655778d | 739 | |
dflet | 0:087b5655778d | 740 | struct client_desc *client = CLIENT(cl_ctx); |
dflet | 0:087b5655778d | 741 | |
dflet | 0:087b5655778d | 742 | if(NEED_NET_CLOSE(cl_ctx)) { |
dflet | 0:087b5655778d | 743 | Uart_Write((uint8_t*)"Return MQP_ERR_NOT_DEF\r\n"); |
dflet | 0:087b5655778d | 744 | return MQP_ERR_NOT_DEF; |
dflet | 0:087b5655778d | 745 | } |
dflet | 0:087b5655778d | 746 | if(NULL == net_ops) { |
dflet | 0:087b5655778d | 747 | Uart_Write((uint8_t*)"Return MQP_ERR_NET_OPS\r\n"); |
dflet | 0:087b5655778d | 748 | return MQP_ERR_NET_OPS; |
dflet | 0:087b5655778d | 749 | } |
dflet | 0:087b5655778d | 750 | cl_ctx->net = net_ops->open(client->nwconn_opts | DEV_NETCONN_OPT_TCP, |
dflet | 0:087b5655778d | 751 | client->server_addr, |
dflet | 0:087b5655778d | 752 | client->port_number, |
dflet | 0:087b5655778d | 753 | &client->nw_security); |
dflet | 0:087b5655778d | 754 | |
dflet | 0:087b5655778d | 755 | return (-1 == cl_ctx->net)? MQP_ERR_NETWORK : 0; |
dflet | 0:087b5655778d | 756 | } |
dflet | 0:087b5655778d | 757 | |
dflet | 0:087b5655778d | 758 | static |
dflet | 0:087b5655778d | 759 | int32_t cl_ctx_conn_state_try_locked(struct client_ctx *cl_ctx, const uint8_t *buf, |
dflet | 0:087b5655778d | 760 | uint32_t len, uint16_t ka_secs, bool clean_session, |
dflet | 0:087b5655778d | 761 | struct mqtt_packet *tx_mqp) |
dflet | 0:087b5655778d | 762 | { |
dflet | 0:087b5655778d | 763 | |
dflet | 0:087b5655778d | 764 | int32_t rv = 0; |
dflet | 0:087b5655778d | 765 | |
dflet | 0:087b5655778d | 766 | // MUTEX_LOCKIN(); |
dflet | 0:087b5655778d | 767 | if( xSemaphore != NULL ) { |
dflet | 0:087b5655778d | 768 | // See if we can obtain the semaphore. If the semaphore is not available |
dflet | 0:087b5655778d | 769 | // wait 10 ticks to see if it becomes free. |
dflet | 0:087b5655778d | 770 | if( xSemaphoreTake( xSemaphore, ( TickType_t ) 100 ) == pdTRUE ) { |
dflet | 0:087b5655778d | 771 | // We were able to obtain the semaphore and can now access the |
dflet | 0:087b5655778d | 772 | // shared resource. |
dflet | 0:087b5655778d | 773 | |
dflet | 0:087b5655778d | 774 | rv = net_connect(cl_ctx); |
dflet | 0:087b5655778d | 775 | if(rv < 0) { |
dflet | 0:087b5655778d | 776 | Uart_Write((uint8_t*)"net_connect failed\r\n"); |
dflet | 0:087b5655778d | 777 | goto cl_ctx_conn_state_try_locked_exit1; |
dflet | 0:087b5655778d | 778 | } |
dflet | 0:087b5655778d | 779 | /* Ensure LIB is initialized & CTX isn't awaiting CONNACK */ |
dflet | 0:087b5655778d | 780 | rv = MQP_ERR_BADCALL; |
dflet | 0:087b5655778d | 781 | if(false == ((INIT_DONE_STATE != cl_lib_state) || (AWAITS_CONNACK(cl_ctx)))) { |
dflet | 0:087b5655778d | 782 | rv = cl_ctx_seg1_send(cl_ctx, buf, len, true, tx_mqp); |
dflet | 0:087b5655778d | 783 | } |
dflet | 0:087b5655778d | 784 | if(rv < 0) { |
dflet | 0:087b5655778d | 785 | Uart_Write((uint8_t*)"cl_ctx_seg1_send failed\r\n"); |
dflet | 0:087b5655778d | 786 | goto cl_ctx_conn_state_try_locked_exit1; /* Fail */ |
dflet | 0:087b5655778d | 787 | } |
dflet | 0:087b5655778d | 788 | /* Successfully sent CONNECT msg - let's do housekeeping */ |
dflet | 0:087b5655778d | 789 | cl_ctx->timeout = net_ops->time();/* Fixup: CONN TX Time */ |
dflet | 0:087b5655778d | 790 | cl_ctx->flags |= DO_CONNACK_TO_FLAG | CONNACK_AWAIT_FLAG; |
dflet | 0:087b5655778d | 791 | cl_ctx->flags |= clean_session? CLEAN_SESSION_FLAG : 0; |
dflet | 0:087b5655778d | 792 | |
dflet | 0:087b5655778d | 793 | cl_ctx->ka_secs = ka_secs; |
dflet | 0:087b5655778d | 794 | |
dflet | 0:087b5655778d | 795 | if(A_GROUP_MEMBER(cl_ctx)) { |
dflet | 0:087b5655778d | 796 | cl_ctx->next = conn_ctxs; |
dflet | 0:087b5655778d | 797 | conn_ctxs = cl_ctx; |
dflet | 0:087b5655778d | 798 | |
dflet | 0:087b5655778d | 799 | /* First entry in 'conn_ctxs': schedule a move to |
dflet | 0:087b5655778d | 800 | 'used_conn' (for house-keeping and tracking) */ |
dflet | 0:087b5655778d | 801 | if(NULL == cl_ctx->next) { |
dflet | 0:087b5655778d | 802 | rv = loopb_trigger(); |
dflet | 0:087b5655778d | 803 | } |
dflet | 0:087b5655778d | 804 | } |
dflet | 0:087b5655778d | 805 | |
dflet | 0:087b5655778d | 806 | |
dflet | 0:087b5655778d | 807 | // We have finished accessing the shared resource. Release the |
dflet | 0:087b5655778d | 808 | // semaphore. |
dflet | 0:087b5655778d | 809 | //xSemaphoreGive( xSemaphore ); |
dflet | 0:087b5655778d | 810 | } else { |
dflet | 0:087b5655778d | 811 | // We could not obtain the semaphore and can therefore not access |
dflet | 0:087b5655778d | 812 | // the shared resource safely. |
dflet | 0:087b5655778d | 813 | Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); |
dflet | 0:087b5655778d | 814 | } |
dflet | 0:087b5655778d | 815 | } |
dflet | 0:087b5655778d | 816 | |
dflet | 0:087b5655778d | 817 | cl_ctx_conn_state_try_locked_exit1: |
dflet | 0:087b5655778d | 818 | // MUTEX_UNLOCK(); |
dflet | 0:087b5655778d | 819 | xSemaphoreGive(xSemaphore); |
dflet | 0:087b5655778d | 820 | |
dflet | 0:087b5655778d | 821 | return rv; |
dflet | 0:087b5655778d | 822 | } |
dflet | 0:087b5655778d | 823 | |
dflet | 0:087b5655778d | 824 | static |
dflet | 0:087b5655778d | 825 | int32_t connect_msg_send(struct client_ctx *cl_ctx, bool clean_session, uint16_t ka_secs) |
dflet | 0:087b5655778d | 826 | { |
dflet | 0:087b5655778d | 827 | |
dflet | 0:087b5655778d | 828 | struct mqtt_packet *mqp = mqp_client_send_alloc(MQTT_CONNECT); |
dflet | 0:087b5655778d | 829 | uint8_t *buf, *ref, conn_opts = clean_session? CLEAN_START_VAL : 0; |
dflet | 0:087b5655778d | 830 | int32_t rv = MQP_ERR_PKT_LEN; |
dflet | 0:087b5655778d | 831 | uint32_t fsz; /* Free buffer size in PKT */ |
dflet | 0:087b5655778d | 832 | uint16_t vhl = get_connect_vh_len(cl_ctx); |
dflet | 0:087b5655778d | 833 | |
dflet | 0:087b5655778d | 834 | if(NULL == mqp) { |
dflet | 0:087b5655778d | 835 | Uart_Write((uint8_t*)"MQP_ERR_PKT_AVL\r\n"); |
dflet | 0:087b5655778d | 836 | return MQP_ERR_PKT_AVL; |
dflet | 0:087b5655778d | 837 | } |
dflet | 0:087b5655778d | 838 | fsz = MQP_FREEBUF_LEN(mqp); |
dflet | 0:087b5655778d | 839 | if(fsz < vhl) { |
dflet | 0:087b5655778d | 840 | Uart_Write((uint8_t*)"No space for VAR HDR\r\n"); |
dflet | 0:087b5655778d | 841 | goto connect_msg_send_exit1; /* No space for VAR HDR */ |
dflet | 0:087b5655778d | 842 | } |
dflet | 0:087b5655778d | 843 | mqp->vh_len = vhl; /* Reserve buffer for variable header */ |
dflet | 0:087b5655778d | 844 | buf = ref = MQP_PAYLOAD_BUF(mqp);/* Get started to incorporate payload */ |
dflet | 0:087b5655778d | 845 | |
dflet | 0:087b5655778d | 846 | rv = wr_connect_pl(cl_ctx, buf, fsz - vhl, &conn_opts);/* Payload data */ |
dflet | 0:087b5655778d | 847 | if(rv < 0) { |
dflet | 0:087b5655778d | 848 | memset(print_buf, 0x00, PRINT_BUF_LEN); |
dflet | 0:087b5655778d | 849 | sprintf((char*) print_buf, "Payload WR failed %i\r\n",rv); |
dflet | 0:087b5655778d | 850 | Uart_Write((uint8_t *) print_buf); |
dflet | 0:087b5655778d | 851 | goto connect_msg_send_exit1; /* Payload WR failed */ |
dflet | 0:087b5655778d | 852 | } |
dflet | 0:087b5655778d | 853 | buf += rv; |
dflet | 0:087b5655778d | 854 | mqp->pl_len = buf - ref; |
dflet | 0:087b5655778d | 855 | |
dflet | 0:087b5655778d | 856 | wr_connect_vh(cl_ctx, ref - vhl, ka_secs, |
dflet | 0:087b5655778d | 857 | CLIENT(cl_ctx)->will_opts | conn_opts); /* Var Header */ |
dflet | 0:087b5655778d | 858 | |
dflet | 0:087b5655778d | 859 | mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, MQTT_QOS0, false));/* Fix Header */ |
dflet | 0:087b5655778d | 860 | ref = MQP_FHEADER_BUF(mqp); |
dflet | 0:087b5655778d | 861 | |
dflet | 0:087b5655778d | 862 | /* Following routine frees up MQP - whether error or not */ |
dflet | 0:087b5655778d | 863 | return cl_ctx_conn_state_try_locked(cl_ctx, ref, buf - ref, |
dflet | 0:087b5655778d | 864 | ka_secs, clean_session, |
dflet | 0:087b5655778d | 865 | mqp); |
dflet | 0:087b5655778d | 866 | connect_msg_send_exit1: |
dflet | 0:087b5655778d | 867 | |
dflet | 0:087b5655778d | 868 | if(mqp) { |
dflet | 0:087b5655778d | 869 | mqp_free_locked(mqp); |
dflet | 0:087b5655778d | 870 | } |
dflet | 0:087b5655778d | 871 | return rv; |
dflet | 0:087b5655778d | 872 | } |
dflet | 0:087b5655778d | 873 | |
/* Public entry point for sending a CONNECT message.
   Returns -1 for a NULL handle, otherwise the result of
   connect_msg_send() for the context behind the handle. */
int32_t mqtt_connect_msg_send(void *ctx, bool clean_session, uint16_t ka_secs)
{
    if(!ctx)
        return -1;

    return connect_msg_send(CL_CTX(ctx), clean_session, ka_secs);
}
dflet | 0:087b5655778d | 880 | |
dflet | 0:087b5655778d | 881 | /* |
dflet | 0:087b5655778d | 882 | To be used for the following messages: PUBLISH, SUBSCRIBE, UNSUBSCRIBE |
dflet | 0:087b5655778d | 883 | Dispatches msg to broker over socket. Frees-up MQP, in case, MSG has QoS0 or |
dflet | 0:087b5655778d | 884 | if client-lib allocated MQP encounters an error in dispatch. |
dflet | 0:087b5655778d | 885 | Returns, on success, number of bytes transfered, otherwise -1 |
dflet | 0:087b5655778d | 886 | */ |
dflet | 0:087b5655778d | 887 | static int32_t _msg_dispatch(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, |
dflet | 0:087b5655778d | 888 | enum mqtt_qos qos, bool retain) |
dflet | 0:087b5655778d | 889 | { |
dflet | 0:087b5655778d | 890 | |
dflet | 0:087b5655778d | 891 | bool not_qos0 = (MQTT_QOS0 != qos)? true : false; |
dflet | 0:087b5655778d | 892 | uint16_t msg_id = mqp->msg_id; |
dflet | 0:087b5655778d | 893 | int32_t rv = MQP_ERR_NETWORK; |
dflet | 0:087b5655778d | 894 | |
dflet | 0:087b5655778d | 895 | mqp_prep_fh(mqp, MAKE_FH_FLAGS(false, qos, retain)); |
dflet | 0:087b5655778d | 896 | |
dflet | 0:087b5655778d | 897 | // MUTEX_LOCKIN(); |
dflet | 0:087b5655778d | 898 | if( xSemaphore != NULL ) { |
dflet | 0:087b5655778d | 899 | // See if we can obtain the semaphore. If the semaphore is not available |
dflet | 0:087b5655778d | 900 | // wait 10 ticks to see if it becomes free. |
dflet | 0:087b5655778d | 901 | if( xSemaphoreTake( xSemaphore, ( TickType_t ) 200 ) == pdTRUE ) { |
dflet | 0:087b5655778d | 902 | // We were able to obtain the semaphore and can now access the |
dflet | 0:087b5655778d | 903 | // shared resource. |
dflet | 0:087b5655778d | 904 | |
dflet | 0:087b5655778d | 905 | if(not_qos0) { |
dflet | 0:087b5655778d | 906 | mqp->n_refs++; /* Need to enlist, do not free-up MQP */ |
dflet | 0:087b5655778d | 907 | } |
dflet | 0:087b5655778d | 908 | /* Tries to free-up MQP either on error or if full pkt is sent */ |
dflet | 0:087b5655778d | 909 | rv = cl_ctx_seg1_send(cl_ctx, MQP_FHEADER_BUF(mqp), |
dflet | 0:087b5655778d | 910 | MQP_CONTENT_LEN(mqp), false, |
dflet | 0:087b5655778d | 911 | mqp); |
dflet | 0:087b5655778d | 912 | |
dflet | 0:087b5655778d | 913 | /* At this point, error or not, QoS0 MQP would have been freed */ |
dflet | 0:087b5655778d | 914 | |
dflet | 0:087b5655778d | 915 | if((rv <= 0) && not_qos0) { |
dflet | 0:087b5655778d | 916 | mqp_free(mqp); /* Err: Explicitly free-up non QoS0 MQP */ |
dflet | 0:087b5655778d | 917 | Uart_Write((uint8_t*)"cl_ctx_seg1_send failed\r\n"); |
dflet | 0:087b5655778d | 918 | goto _msg_dispatch_exit1; |
dflet | 0:087b5655778d | 919 | } |
dflet | 0:087b5655778d | 920 | |
dflet | 0:087b5655778d | 921 | rv = msg_id; /* Make progress for a good send to the server */ |
dflet | 0:087b5655778d | 922 | |
dflet | 0:087b5655778d | 923 | if(not_qos0) { /* Enlist non QOS0 MQP to await ACK from server */ |
dflet | 0:087b5655778d | 924 | mqp_ack_wlist_append(&CLIENT(cl_ctx)->qos_ack1_wl, mqp); |
dflet | 0:087b5655778d | 925 | } |
dflet | 0:087b5655778d | 926 | // We have finished accessing the shared resource. Release the |
dflet | 0:087b5655778d | 927 | // semaphore. |
dflet | 0:087b5655778d | 928 | //xSemaphoreGive( xSemaphore ); |
dflet | 0:087b5655778d | 929 | } else { |
dflet | 0:087b5655778d | 930 | // We could not obtain the semaphore and can therefore not access |
dflet | 0:087b5655778d | 931 | // the shared resource safely. |
dflet | 0:087b5655778d | 932 | Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); |
dflet | 0:087b5655778d | 933 | } |
dflet | 0:087b5655778d | 934 | } |
dflet | 0:087b5655778d | 935 | |
dflet | 0:087b5655778d | 936 | _msg_dispatch_exit1: |
dflet | 0:087b5655778d | 937 | |
dflet | 0:087b5655778d | 938 | // MUTEX_UNLOCK(); |
dflet | 0:087b5655778d | 939 | xSemaphoreGive(xSemaphore); |
dflet | 0:087b5655778d | 940 | |
dflet | 0:087b5655778d | 941 | return rv; |
dflet | 0:087b5655778d | 942 | } |
dflet | 0:087b5655778d | 943 | |
dflet | 0:087b5655778d | 944 | static |
dflet | 0:087b5655778d | 945 | int32_t msg_dispatch_no_free(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, |
dflet | 0:087b5655778d | 946 | enum mqtt_qos qos, bool retain) |
dflet | 0:087b5655778d | 947 | { |
dflet | 0:087b5655778d | 948 | if((NULL == mqp) || (NULL == cl_ctx)) |
dflet | 0:087b5655778d | 949 | return MQP_ERR_FNPARAM; |
dflet | 0:087b5655778d | 950 | |
dflet | 0:087b5655778d | 951 | mqp->n_refs++; /* Ensures caller that MQP is not freed-up */ |
dflet | 0:087b5655778d | 952 | |
dflet | 0:087b5655778d | 953 | return _msg_dispatch(cl_ctx, mqp, qos, retain); |
dflet | 0:087b5655778d | 954 | } |
dflet | 0:087b5655778d | 955 | |
dflet | 0:087b5655778d | 956 | int32_t mqtt_client_pub_msg_send(void *ctx, const struct utf8_string *topic, |
dflet | 0:087b5655778d | 957 | const uint8_t *data_buf, uint32_t data_len, |
dflet | 0:087b5655778d | 958 | enum mqtt_qos qos, bool retain) |
dflet | 0:087b5655778d | 959 | { |
dflet | 0:087b5655778d | 960 | |
dflet | 0:087b5655778d | 961 | struct mqtt_packet *mqp = NULL; |
dflet | 0:087b5655778d | 962 | |
dflet | 0:087b5655778d | 963 | if((NULL == ctx) || |
dflet | 0:087b5655778d | 964 | (NULL == topic) || |
dflet | 0:087b5655778d | 965 | ((data_len > 0) && (NULL == data_buf))) { |
dflet | 0:087b5655778d | 966 | Uart_Write((uint8_t*)"MQP_ERR_FNPARAM\n\r"); |
dflet | 0:087b5655778d | 967 | return MQP_ERR_FNPARAM; |
dflet | 0:087b5655778d | 968 | } |
dflet | 0:087b5655778d | 969 | if(false == is_valid_utf8_string(topic)) { |
dflet | 0:087b5655778d | 970 | Uart_Write((uint8_t*)"MQP_ERR_CONTENT\n\r"); |
dflet | 0:087b5655778d | 971 | return MQP_ERR_CONTENT; |
dflet | 0:087b5655778d | 972 | } |
dflet | 0:087b5655778d | 973 | mqp = mqp_client_send_alloc(MQTT_PUBLISH); |
dflet | 0:087b5655778d | 974 | if(NULL == mqp) { |
dflet | 0:087b5655778d | 975 | Uart_Write((uint8_t*)"MQP_ERR_PKT_AVL\n\r"); |
dflet | 0:087b5655778d | 976 | return MQP_ERR_PKT_AVL; |
dflet | 0:087b5655778d | 977 | } |
dflet | 0:087b5655778d | 978 | if((0 > mqp_pub_append_topic(mqp, topic, qos? assign_new_msg_id(): 0)) || |
dflet | 0:087b5655778d | 979 | (data_len && (0 > mqp_pub_append_data(mqp, data_buf, data_len)))) { |
dflet | 0:087b5655778d | 980 | Uart_Write((uint8_t*)"len_err\n\r"); |
dflet | 0:087b5655778d | 981 | return len_err_free_mqp(mqp); |
dflet | 0:087b5655778d | 982 | } |
dflet | 0:087b5655778d | 983 | |
dflet | 0:087b5655778d | 984 | return _msg_dispatch(CL_CTX(ctx), mqp, qos, retain); |
dflet | 0:087b5655778d | 985 | } |
dflet | 0:087b5655778d | 986 | |
dflet | 0:087b5655778d | 987 | int32_t mqtt_client_pub_dispatch(void *ctx, struct mqtt_packet *mqp, |
dflet | 0:087b5655778d | 988 | enum mqtt_qos qos, bool retain) |
dflet | 0:087b5655778d | 989 | { |
dflet | 0:087b5655778d | 990 | return msg_dispatch_no_free(CL_CTX(ctx), mqp, qos, retain); |
dflet | 0:087b5655778d | 991 | } |
dflet | 0:087b5655778d | 992 | |
dflet | 0:087b5655778d | 993 | static int32_t tail_incorp_msg_id(struct mqtt_packet *mqp) |
dflet | 0:087b5655778d | 994 | { |
dflet | 0:087b5655778d | 995 | uint8_t *buf = MQP_FHEADER_BUF(mqp) + mqp->vh_len; |
dflet | 0:087b5655778d | 996 | |
dflet | 0:087b5655778d | 997 | if(0 == mqp->msg_id) { |
dflet | 0:087b5655778d | 998 | mqp->msg_id = assign_new_msg_id(); |
dflet | 0:087b5655778d | 999 | buf += buf_wr_nbo_2B(buf, mqp->msg_id); |
dflet | 0:087b5655778d | 1000 | mqp->vh_len += 2; |
dflet | 0:087b5655778d | 1001 | |
dflet | 0:087b5655778d | 1002 | return 2; |
dflet | 0:087b5655778d | 1003 | } |
dflet | 0:087b5655778d | 1004 | |
dflet | 0:087b5655778d | 1005 | return 0; |
dflet | 0:087b5655778d | 1006 | } |
dflet | 0:087b5655778d | 1007 | |
dflet | 0:087b5655778d | 1008 | static int32_t buf_utf8_wr_try(uint8_t *buf, uint32_t fsz, const struct utf8_string *topic, |
dflet | 0:087b5655778d | 1009 | uint8_t qid) |
dflet | 0:087b5655778d | 1010 | { |
dflet | 0:087b5655778d | 1011 | uint8_t *ref = buf; |
dflet | 0:087b5655778d | 1012 | |
dflet | 0:087b5655778d | 1013 | if(fsz < (topic->length + 2 + (QFL_VALUE == qid)? 0 : 1)) |
dflet | 0:087b5655778d | 1014 | return MQP_ERR_PKT_LEN; /* No buf */ |
dflet | 0:087b5655778d | 1015 | |
dflet | 0:087b5655778d | 1016 | if(false == is_valid_utf8_string(topic)) |
dflet | 0:087b5655778d | 1017 | return MQP_ERR_CONTENT;/* Invalid */ |
dflet | 0:087b5655778d | 1018 | |
dflet | 0:087b5655778d | 1019 | buf += mqp_buf_wr_utf8(buf, topic); |
dflet | 0:087b5655778d | 1020 | if(QFL_VALUE != qid) |
dflet | 0:087b5655778d | 1021 | *buf++ = qid; |
dflet | 0:087b5655778d | 1022 | |
dflet | 0:087b5655778d | 1023 | return buf - ref; |
dflet | 0:087b5655778d | 1024 | } |
dflet | 0:087b5655778d | 1025 | |
dflet | 0:087b5655778d | 1026 | static int32_t utf8_array_send(struct client_ctx *cl_ctx, |
dflet | 0:087b5655778d | 1027 | const struct utf8_strqos *subsc_vec, |
dflet | 0:087b5655778d | 1028 | const struct utf8_string *unsub_vec, |
dflet | 0:087b5655778d | 1029 | uint32_t n_elem) |
dflet | 0:087b5655778d | 1030 | { |
dflet | 0:087b5655778d | 1031 | struct mqtt_packet *mqp; |
dflet | 0:087b5655778d | 1032 | uint8_t *ref, *buf, *end; |
dflet | 0:087b5655778d | 1033 | uint32_t i; |
dflet | 0:087b5655778d | 1034 | |
dflet | 0:087b5655778d | 1035 | if((NULL == cl_ctx) || !((!!subsc_vec) ^ (!!unsub_vec)) || (0 == n_elem)) |
dflet | 0:087b5655778d | 1036 | return MQP_ERR_FNPARAM; |
dflet | 0:087b5655778d | 1037 | |
dflet | 0:087b5655778d | 1038 | mqp = mqp_client_send_alloc(subsc_vec? |
dflet | 0:087b5655778d | 1039 | MQTT_SUBSCRIBE : MQTT_UNSUBSCRIBE); |
dflet | 0:087b5655778d | 1040 | if(NULL == mqp) |
dflet | 0:087b5655778d | 1041 | return MQP_ERR_PKT_AVL; |
dflet | 0:087b5655778d | 1042 | |
dflet | 0:087b5655778d | 1043 | buf = MQP_VHEADER_BUF(mqp); |
dflet | 0:087b5655778d | 1044 | end = MQP_FREEBUF_LEN(mqp) + buf; /* End of free buffer */ |
dflet | 0:087b5655778d | 1045 | if((end - buf) < 2) |
dflet | 0:087b5655778d | 1046 | return len_err_free_mqp(mqp);/* MSG-ID: no space */ |
dflet | 0:087b5655778d | 1047 | |
dflet | 0:087b5655778d | 1048 | buf += tail_incorp_msg_id(mqp); |
dflet | 0:087b5655778d | 1049 | ref = buf; |
dflet | 0:087b5655778d | 1050 | |
dflet | 0:087b5655778d | 1051 | for(i = 0; i < n_elem; i++) { |
dflet | 0:087b5655778d | 1052 | const struct utf8_string *topic; |
dflet | 0:087b5655778d | 1053 | struct utf8_string topreq; |
dflet | 0:087b5655778d | 1054 | int32_t rv; |
dflet | 0:087b5655778d | 1055 | |
dflet | 0:087b5655778d | 1056 | if(subsc_vec) { |
dflet | 0:087b5655778d | 1057 | topreq.length = subsc_vec[i].length; |
dflet | 0:087b5655778d | 1058 | topreq.buffer = subsc_vec[i].buffer; |
dflet | 0:087b5655778d | 1059 | topic = &topreq; |
dflet | 0:087b5655778d | 1060 | } else |
dflet | 0:087b5655778d | 1061 | topic = unsub_vec + i; |
dflet | 0:087b5655778d | 1062 | |
dflet | 0:087b5655778d | 1063 | rv = buf_utf8_wr_try(buf, end - buf, topic, subsc_vec? |
dflet | 0:087b5655778d | 1064 | (uint8_t)subsc_vec[i].qosreq : QFL_VALUE); |
dflet | 0:087b5655778d | 1065 | if(rv < 0) { |
dflet | 0:087b5655778d | 1066 | mqp_free(mqp); |
dflet | 0:087b5655778d | 1067 | return rv; |
dflet | 0:087b5655778d | 1068 | } |
dflet | 0:087b5655778d | 1069 | |
dflet | 0:087b5655778d | 1070 | buf += rv; |
dflet | 0:087b5655778d | 1071 | } |
dflet | 0:087b5655778d | 1072 | |
dflet | 0:087b5655778d | 1073 | mqp->pl_len = buf - ref; /* Total length of topics data */ |
dflet | 0:087b5655778d | 1074 | |
dflet | 0:087b5655778d | 1075 | return _msg_dispatch(cl_ctx, mqp, MQTT_QOS1, false); |
dflet | 0:087b5655778d | 1076 | } |
dflet | 0:087b5655778d | 1077 | |
/* Public entry point: send a SUBSCRIBE for 'count' topics with their
   requested QoS, via utf8_array_send(). */
int32_t mqtt_sub_msg_send(void *ctx, const struct utf8_strqos *qos_topics, uint32_t count)
{
    struct client_ctx *cl_ctx = CL_CTX(ctx);

    return utf8_array_send(cl_ctx, qos_topics, NULL, count);
}
dflet | 0:087b5655778d | 1082 | |
dflet | 0:087b5655778d | 1083 | int32_t mqtt_sub_dispatch(void *ctx, struct mqtt_packet *mqp) |
dflet | 0:087b5655778d | 1084 | { |
dflet | 0:087b5655778d | 1085 | return msg_dispatch_no_free(CL_CTX(ctx), mqp, MQTT_QOS1, false); |
dflet | 0:087b5655778d | 1086 | } |
dflet | 0:087b5655778d | 1087 | |
/*
   Send the 'count' topic strings found in 'topics' to the server. The array
   is handed to utf8_array_send() as the un-subscribe vector, with no
   subscribe vector (NULL). Whatever utf8_array_send() returns is passed
   back to the caller unchanged.
*/
int32_t mqtt_unsub_msg_send(void *ctx, const struct utf8_string *topics, uint32_t count)
{
        struct client_ctx *cl_ctx = CL_CTX(ctx);

        return utf8_array_send(cl_ctx, NULL, topics, count);
}
dflet | 0:087b5655778d | 1092 | |
dflet | 0:087b5655778d | 1093 | int32_t mqtt_unsub_dispatch(void *ctx, struct mqtt_packet *mqp) |
dflet | 0:087b5655778d | 1094 | { |
dflet | 0:087b5655778d | 1095 | return msg_dispatch_no_free(CL_CTX(ctx), mqp, MQTT_QOS1, false); |
dflet | 0:087b5655778d | 1096 | } |
dflet | 0:087b5655778d | 1097 | |
dflet | 0:087b5655778d | 1098 | /* Note: in this revision of implementation, vh_msg_send() is being invoked |
dflet | 0:087b5655778d | 1099 | from a locked RX context. Should this situation change, so should the |
dflet | 0:087b5655778d | 1100 | 'locking' considerations in the routine. */ |
dflet | 0:087b5655778d | 1101 | static int32_t vh_msg_send(struct client_ctx *cl_ctx, uint8_t msg_type, |
dflet | 0:087b5655778d | 1102 | enum mqtt_qos qos, bool has_vh, |
dflet | 0:087b5655778d | 1103 | uint16_t vh_data) |
dflet | 0:087b5655778d | 1104 | { |
dflet | 0:087b5655778d | 1105 | uint8_t buf[4]; |
dflet | 0:087b5655778d | 1106 | uint32_t len = 2; |
dflet | 0:087b5655778d | 1107 | |
dflet | 0:087b5655778d | 1108 | buf[0] = MAKE_FH_BYTE1(msg_type, MAKE_FH_FLAGS(false, qos, false)); |
dflet | 0:087b5655778d | 1109 | buf[1] = has_vh ? 2 : 0; |
dflet | 0:087b5655778d | 1110 | |
dflet | 0:087b5655778d | 1111 | if(has_vh) |
dflet | 0:087b5655778d | 1112 | len += buf_wr_nbo_2B(buf + 2, vh_data); |
dflet | 0:087b5655778d | 1113 | |
dflet | 0:087b5655778d | 1114 | return cl_ctx_seg1_send(cl_ctx, buf, len, false, NULL); |
dflet | 0:087b5655778d | 1115 | } |
dflet | 0:087b5655778d | 1116 | |
dflet | 0:087b5655778d | 1117 | static int32_t pingreq_send(struct client_ctx *cl_ctx, uint32_t rsp_flag) |
dflet | 0:087b5655778d | 1118 | { |
dflet | 0:087b5655778d | 1119 | |
dflet | 0:087b5655778d | 1120 | int32_t rv = 0; |
dflet | 0:087b5655778d | 1121 | uint8_t buf[2]; |
dflet | 0:087b5655778d | 1122 | |
dflet | 0:087b5655778d | 1123 | buf[0] = MAKE_FH_BYTE1(MQTT_PINGREQ, |
dflet | 0:087b5655778d | 1124 | MAKE_FH_FLAGS(false, MQTT_QOS0, false)); |
dflet | 0:087b5655778d | 1125 | buf[1] = 0; |
dflet | 0:087b5655778d | 1126 | |
dflet | 0:087b5655778d | 1127 | /* Note: in case of error in network send, cl_ctx_send() may |
dflet | 0:087b5655778d | 1128 | try to terminate connection with server. */ |
dflet | 0:087b5655778d | 1129 | rv = cl_ctx_seg1_send(cl_ctx, buf, 2, false, NULL); |
dflet | 0:087b5655778d | 1130 | if(rv > 0) |
dflet | 0:087b5655778d | 1131 | cl_ctx->flags |= rsp_flag; |
dflet | 0:087b5655778d | 1132 | |
dflet | 0:087b5655778d | 1133 | return rv; |
dflet | 0:087b5655778d | 1134 | } |
dflet | 0:087b5655778d | 1135 | |
dflet | 0:087b5655778d | 1136 | int32_t mqtt_pingreq_send(void *ctx) |
dflet | 0:087b5655778d | 1137 | { |
dflet | 0:087b5655778d | 1138 | Uart_Write((uint8_t*)"mqtt_pingreq_send\r\n"); |
dflet | 0:087b5655778d | 1139 | int32_t rv = 0; |
dflet | 0:087b5655778d | 1140 | |
dflet | 0:087b5655778d | 1141 | // MUTEX_LOCKIN(); |
dflet | 0:087b5655778d | 1142 | if( xSemaphore != NULL ) { |
dflet | 0:087b5655778d | 1143 | // See if we can obtain the semaphore. If the semaphore is not available |
dflet | 0:087b5655778d | 1144 | // wait 10 ticks to see if it becomes free. |
dflet | 0:087b5655778d | 1145 | if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { |
dflet | 0:087b5655778d | 1146 | // We were able to obtain the semaphore and can now access the |
dflet | 0:087b5655778d | 1147 | // shared resource. |
dflet | 0:087b5655778d | 1148 | rv = pingreq_send(CL_CTX(ctx), USER_PING_RSP_FLAG); |
dflet | 0:087b5655778d | 1149 | |
dflet | 0:087b5655778d | 1150 | // We have finished accessing the shared resource. Release the |
dflet | 0:087b5655778d | 1151 | // semaphore. |
dflet | 0:087b5655778d | 1152 | xSemaphoreGive( xSemaphore ); |
dflet | 0:087b5655778d | 1153 | |
dflet | 0:087b5655778d | 1154 | } else { |
dflet | 0:087b5655778d | 1155 | // We could not obtain the semaphore and can therefore not access |
dflet | 0:087b5655778d | 1156 | // the shared resource safely. |
dflet | 0:087b5655778d | 1157 | Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); |
dflet | 0:087b5655778d | 1158 | } |
dflet | 0:087b5655778d | 1159 | } |
dflet | 0:087b5655778d | 1160 | |
dflet | 0:087b5655778d | 1161 | // MUTEX_UNLOCK(); |
dflet | 0:087b5655778d | 1162 | return rv; |
dflet | 0:087b5655778d | 1163 | } |
dflet | 0:087b5655778d | 1164 | |
dflet | 0:087b5655778d | 1165 | int32_t mqtt_disconn_send(void *ctx) |
dflet | 0:087b5655778d | 1166 | { |
dflet | 0:087b5655778d | 1167 | Uart_Write((uint8_t*)"mqtt_disconn_send\r\n"); |
dflet | 0:087b5655778d | 1168 | uint8_t buf[2]; |
dflet | 0:087b5655778d | 1169 | |
dflet | 0:087b5655778d | 1170 | buf[0] = MAKE_FH_BYTE1(MQTT_DISCONNECT, |
dflet | 0:087b5655778d | 1171 | MAKE_FH_FLAGS(false, MQTT_QOS0, false)); |
dflet | 0:087b5655778d | 1172 | buf[1] = 0; |
dflet | 0:087b5655778d | 1173 | |
dflet | 0:087b5655778d | 1174 | // MUTEX_LOCKIN(); |
dflet | 0:087b5655778d | 1175 | if( xSemaphore != NULL ) { |
dflet | 0:087b5655778d | 1176 | // See if we can obtain the semaphore. If the semaphore is not available |
dflet | 0:087b5655778d | 1177 | // wait 10 ticks to see if it becomes free. |
dflet | 0:087b5655778d | 1178 | if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { |
dflet | 0:087b5655778d | 1179 | // We were able to obtain the semaphore and can now access the |
dflet | 0:087b5655778d | 1180 | // shared resource. |
dflet | 0:087b5655778d | 1181 | /* Note: in case of error in network send, cl_ctx_send() may |
dflet | 0:087b5655778d | 1182 | try to terminate connection with server. */ |
dflet | 0:087b5655778d | 1183 | if(cl_ctx_seg1_send(CL_CTX(ctx), buf, 2, false, NULL) > 0) { |
dflet | 0:087b5655778d | 1184 | /* Terminate connection on application's request */ |
dflet | 0:087b5655778d | 1185 | do_net_close_tx(CL_CTX(ctx), "DISCONN"); |
dflet | 0:087b5655778d | 1186 | } |
dflet | 0:087b5655778d | 1187 | |
dflet | 0:087b5655778d | 1188 | // We have finished accessing the shared resource. Release the |
dflet | 0:087b5655778d | 1189 | // semaphore. |
dflet | 0:087b5655778d | 1190 | xSemaphoreGive( xSemaphore ); |
dflet | 0:087b5655778d | 1191 | |
dflet | 0:087b5655778d | 1192 | } else { |
dflet | 0:087b5655778d | 1193 | // We could not obtain the semaphore and can therefore not access |
dflet | 0:087b5655778d | 1194 | // the shared resource safely. |
dflet | 0:087b5655778d | 1195 | Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); |
dflet | 0:087b5655778d | 1196 | } |
dflet | 0:087b5655778d | 1197 | } |
dflet | 0:087b5655778d | 1198 | |
dflet | 0:087b5655778d | 1199 | // MUTEX_UNLOCK(); |
dflet | 0:087b5655778d | 1200 | return 0; |
dflet | 0:087b5655778d | 1201 | } |
dflet | 0:087b5655778d | 1202 | |
dflet | 0:087b5655778d | 1203 | /*------------------------------------------------------------------------------ |
dflet | 0:087b5655778d | 1204 | * MQTT RX Routines |
dflet | 0:087b5655778d | 1205 | *------------------------------------------------------------------------------ |
dflet | 0:087b5655778d | 1206 | */ |
dflet | 0:087b5655778d | 1207 | static bool ack1_wl_mqp_dispatch(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 1208 | { |
dflet | 0:087b5655778d | 1209 | struct mqtt_ack_wlist *wlist = &CLIENT(cl_ctx)->qos_ack1_wl; |
dflet | 0:087b5655778d | 1210 | struct mqtt_packet *mqp = NULL; |
dflet | 0:087b5655778d | 1211 | bool rv = true; |
dflet | 0:087b5655778d | 1212 | |
dflet | 0:087b5655778d | 1213 | for(mqp = wlist->head; mqp && (true == rv); mqp = mqp->next) { |
dflet | 0:087b5655778d | 1214 | uint8_t *buf = MQP_FHEADER_BUF(mqp); |
dflet | 0:087b5655778d | 1215 | mqp->fh_byte1 = *buf |= DUP_FLAG_VAL(true); |
dflet | 0:087b5655778d | 1216 | |
dflet | 0:087b5655778d | 1217 | mqp->n_refs++; /* Ensures MQP is not freed by following */ |
dflet | 0:087b5655778d | 1218 | |
dflet | 0:087b5655778d | 1219 | /* Error or not, following routine tries to free up MQP */ |
dflet | 0:087b5655778d | 1220 | if(cl_ctx_seg1_send(cl_ctx, buf, MQP_CONTENT_LEN(mqp), |
dflet | 0:087b5655778d | 1221 | false, mqp) <= 0) |
dflet | 0:087b5655778d | 1222 | rv = false; |
dflet | 0:087b5655778d | 1223 | } |
dflet | 0:087b5655778d | 1224 | |
dflet | 0:087b5655778d | 1225 | return rv; |
dflet | 0:087b5655778d | 1226 | } |
dflet | 0:087b5655778d | 1227 | |
dflet | 0:087b5655778d | 1228 | /* TBD candidate for common */ |
dflet | 0:087b5655778d | 1229 | static bool ack2_msg_id_dispatch(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 1230 | { |
dflet | 0:087b5655778d | 1231 | struct pub_qos2_cq *tx_cq = &CLIENT(cl_ctx)->qos2_tx_cq; |
dflet | 0:087b5655778d | 1232 | uint8_t rd_idx = tx_cq->rd_idx; |
dflet | 0:087b5655778d | 1233 | uint8_t n_free = tx_cq->n_free; |
dflet | 0:087b5655778d | 1234 | bool rv = true; |
dflet | 0:087b5655778d | 1235 | uint8_t i = 0; |
dflet | 0:087b5655778d | 1236 | |
dflet | 0:087b5655778d | 1237 | for(i = rd_idx; i < (MAX_PUBREL_INFLT - n_free) && (true == rv); i++) { |
dflet | 0:087b5655778d | 1238 | if(vh_msg_send(cl_ctx, MQTT_PUBREL, MQTT_QOS1, |
dflet | 0:087b5655778d | 1239 | true, tx_cq->id_vec[i]) <= 0) |
dflet | 0:087b5655778d | 1240 | rv = false; |
dflet | 0:087b5655778d | 1241 | } |
dflet | 0:087b5655778d | 1242 | |
dflet | 0:087b5655778d | 1243 | return rv; |
dflet | 0:087b5655778d | 1244 | } |
dflet | 0:087b5655778d | 1245 | |
dflet | 0:087b5655778d | 1246 | static void session_resume(struct client_ctx *cl_ctx) |
dflet | 0:087b5655778d | 1247 | { |
dflet | 0:087b5655778d | 1248 | DBG_INFO("C: Re-send ACK awaited QoS1/2 msgs to net %d\n\r", |
dflet | 0:087b5655778d | 1249 | cl_ctx->net); |
dflet | 0:087b5655778d | 1250 | |
dflet | 0:087b5655778d | 1251 | if(ack1_wl_mqp_dispatch(cl_ctx)) |
dflet | 0:087b5655778d | 1252 | ack2_msg_id_dispatch(cl_ctx); |
dflet | 0:087b5655778d | 1253 | |
dflet | 0:087b5655778d | 1254 | return; |
dflet | 0:087b5655778d | 1255 | } |
dflet | 0:087b5655778d | 1256 | |
/*
   Remove the packet identified by 'msg_id' from the wait-list 'wl' and
   free it. Returns true if such a packet was found, otherwise logs the
   unexpected ID and returns false.
*/
static bool ack1_wl_rmfree(struct mqtt_ack_wlist *wl, uint16_t msg_id)
{
        struct mqtt_packet *waiting = mqp_ack_wlist_remove(wl, msg_id);

        if(NULL == waiting) {
                USR_INFO("Err: Unexpected ACK w/ ID 0x%04x\n\r", msg_id);
                return false;
        }

        mqp_free(waiting);

        return true;
}
dflet | 0:087b5655778d | 1269 | |
dflet | 0:087b5655778d | 1270 | static bool _proc_pub_rec_rx(struct client_ctx *cl_ctx, uint16_t msg_id) |
dflet | 0:087b5655778d | 1271 | { |
dflet | 0:087b5655778d | 1272 | /* Follow-up messages for QOS2 PUB must be transacted in the |
dflet | 0:087b5655778d | 1273 | same order as the initial sequence of QOS2 PUB dispatches. |
dflet | 0:087b5655778d | 1274 | Therefore, checking the first entry should be OK |
dflet | 0:087b5655778d | 1275 | */ |
dflet | 0:087b5655778d | 1276 | struct mqtt_packet *mqp = CLIENT(cl_ctx)->qos_ack1_wl.head; |
dflet | 0:087b5655778d | 1277 | |
dflet | 0:087b5655778d | 1278 | if((msg_id == mqp->msg_id) && ack2_msg_id_logup(cl_ctx, msg_id)) { |
dflet | 0:087b5655778d | 1279 | |
dflet | 0:087b5655778d | 1280 | ack1_wl_rmfree(&CLIENT(cl_ctx)->qos_ack1_wl, msg_id); |
dflet | 0:087b5655778d | 1281 | |
dflet | 0:087b5655778d | 1282 | vh_msg_send(cl_ctx, MQTT_PUBREL, MQTT_QOS1, |
dflet | 0:087b5655778d | 1283 | true, msg_id); |
dflet | 0:087b5655778d | 1284 | |
dflet | 0:087b5655778d | 1285 | return true; |
dflet | 0:087b5655778d | 1286 | } |
dflet | 0:087b5655778d | 1287 | |
dflet | 0:087b5655778d | 1288 | return false; /* Unexpected PUBREC or QOS2 store exceeded */ |
dflet | 0:087b5655778d | 1289 | } |
dflet | 0:087b5655778d | 1290 | |
dflet | 0:087b5655778d | 1291 | static bool _proc_pub_rel_rx(struct client_ctx *cl_ctx, uint16_t msg_id) |
dflet | 0:087b5655778d | 1292 | { |
dflet | 0:087b5655778d | 1293 | /* For a PUB-REL RX, send PUBCOMP to let server make progress */ |
dflet | 0:087b5655778d | 1294 | vh_msg_send(cl_ctx, MQTT_PUBCOMP, MQTT_QOS0, true, msg_id); |
dflet | 0:087b5655778d | 1295 | |
dflet | 0:087b5655778d | 1296 | if(qos2_pub_rx_is_done(cl_ctx, msg_id)) |
dflet | 0:087b5655778d | 1297 | qos2_pub_rx_unlog(cl_ctx, msg_id); /* Expunge record */ |
dflet | 0:087b5655778d | 1298 | |
dflet | 0:087b5655778d | 1299 | return true; |
dflet | 0:087b5655778d | 1300 | } |
dflet | 0:087b5655778d | 1301 | |
dflet | 0:087b5655778d | 1302 | /* |
dflet | 0:087b5655778d | 1303 | Process ACK Message from Broker. |
dflet | 0:087b5655778d | 1304 | Returns true on success, otherwise false. |
dflet | 0:087b5655778d | 1305 | Used for: PUBACK, SUBACK and UNSUBACK |
dflet | 0:087b5655778d | 1306 | */ |
dflet | 0:087b5655778d | 1307 | static |
dflet | 0:087b5655778d | 1308 | bool _proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) |
dflet | 0:087b5655778d | 1309 | { |
dflet | 0:087b5655778d | 1310 | struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); |
dflet | 0:087b5655778d | 1311 | struct client_desc *client = CLIENT(cl_ctx); |
dflet | 0:087b5655778d | 1312 | uint16_t msg_id = mqp_raw->msg_id; |
dflet | 0:087b5655778d | 1313 | uint32_t len = mqp_raw->pl_len; |
dflet | 0:087b5655778d | 1314 | |
dflet | 0:087b5655778d | 1315 | /* Caters to SUB-ACK, UNSUB-ACK and PUB-ACK Messages */ |
dflet | 0:087b5655778d | 1316 | if(false == ack1_wl_rmfree(&client->qos_ack1_wl, msg_id)) |
dflet | 0:087b5655778d | 1317 | return false; /* Err: MSG_ID was not awaited */ |
dflet | 0:087b5655778d | 1318 | |
dflet | 0:087b5655778d | 1319 | if(ctx_cbs->ack_notify) |
dflet | 0:087b5655778d | 1320 | ctx_cbs->ack_notify(client->app, mqp_raw->msg_type, msg_id, |
dflet | 0:087b5655778d | 1321 | len? MQP_PAYLOAD_BUF(mqp_raw): NULL, |
dflet | 0:087b5655778d | 1322 | len); |
dflet | 0:087b5655778d | 1323 | return true; |
dflet | 0:087b5655778d | 1324 | } |
dflet | 0:087b5655778d | 1325 | |
dflet | 0:087b5655778d | 1326 | static |
dflet | 0:087b5655778d | 1327 | bool proc_ack_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) |
dflet | 0:087b5655778d | 1328 | { |
dflet | 0:087b5655778d | 1329 | uint8_t msg_type = mqp_raw->msg_type; |
dflet | 0:087b5655778d | 1330 | bool rv = false; |
dflet | 0:087b5655778d | 1331 | uint16_t msg_id = 0; |
dflet | 0:087b5655778d | 1332 | |
dflet | 0:087b5655778d | 1333 | if(false == mqp_proc_msg_id_ack_rx(mqp_raw, MQTT_SUBACK == msg_type)) |
dflet | 0:087b5655778d | 1334 | return rv; /* Problem in contents received from server */ |
dflet | 0:087b5655778d | 1335 | |
dflet | 0:087b5655778d | 1336 | msg_id = mqp_raw->msg_id; |
dflet | 0:087b5655778d | 1337 | |
dflet | 0:087b5655778d | 1338 | if(MQTT_PUBREC == msg_type) { |
dflet | 0:087b5655778d | 1339 | rv = _proc_pub_rec_rx(cl_ctx, msg_id); |
dflet | 0:087b5655778d | 1340 | if(rv) |
dflet | 0:087b5655778d | 1341 | MQP_RX_DO_NOT_RPT_SET(mqp_raw); /* Don't report to App */ |
dflet | 0:087b5655778d | 1342 | |
dflet | 0:087b5655778d | 1343 | } else if(MQTT_PUBREL == msg_type) { |
dflet | 0:087b5655778d | 1344 | rv = _proc_pub_rel_rx(cl_ctx, msg_id); |
dflet | 0:087b5655778d | 1345 | if(rv) |
dflet | 0:087b5655778d | 1346 | MQP_RX_DO_NOT_RPT_SET(mqp_raw); /* Don't report to App */ |
dflet | 0:087b5655778d | 1347 | } else if(MQTT_PUBCOMP == msg_type) { |
dflet | 0:087b5655778d | 1348 | rv = ack2_msg_id_unlog(cl_ctx, msg_id); |
dflet | 0:087b5655778d | 1349 | } else { |
dflet | 0:087b5655778d | 1350 | rv = _proc_ack_msg_rx(cl_ctx, mqp_raw); |
dflet | 0:087b5655778d | 1351 | } |
dflet | 0:087b5655778d | 1352 | |
dflet | 0:087b5655778d | 1353 | return rv; |
dflet | 0:087b5655778d | 1354 | } |
dflet | 0:087b5655778d | 1355 | |
dflet | 0:087b5655778d | 1356 | static |
dflet | 0:087b5655778d | 1357 | bool proc_pub_msg_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) |
dflet | 0:087b5655778d | 1358 | { |
dflet | 0:087b5655778d | 1359 | struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); |
dflet | 0:087b5655778d | 1360 | bool good_pub = mqp_proc_pub_rx(mqp_raw); |
dflet | 0:087b5655778d | 1361 | uint8_t B = mqp_raw->fh_byte1; |
dflet | 0:087b5655778d | 1362 | enum mqtt_qos qos = ENUM_QOS(B); |
dflet | 0:087b5655778d | 1363 | uint16_t msg_id = 0; |
dflet | 0:087b5655778d | 1364 | |
dflet | 0:087b5655778d | 1365 | if(false == good_pub) |
dflet | 0:087b5655778d | 1366 | return false; /* Didn't get nicely composed PUB Packet */ |
dflet | 0:087b5655778d | 1367 | |
dflet | 0:087b5655778d | 1368 | msg_id = mqp_raw->msg_id; |
dflet | 0:087b5655778d | 1369 | |
dflet | 0:087b5655778d | 1370 | /* Irrespective of the result of the ACK through vh_msg_send(), |
dflet | 0:087b5655778d | 1371 | the implementation has chosen to process the good PUB packet. |
dflet | 0:087b5655778d | 1372 | Any error will be handled in next iteration of rx processing. |
dflet | 0:087b5655778d | 1373 | */ |
dflet | 0:087b5655778d | 1374 | if(MQTT_QOS1 == qos) |
dflet | 0:087b5655778d | 1375 | vh_msg_send(cl_ctx, MQTT_PUBACK, MQTT_QOS0, true, msg_id); |
dflet | 0:087b5655778d | 1376 | |
dflet | 0:087b5655778d | 1377 | if(MQTT_QOS2 == qos) { |
dflet | 0:087b5655778d | 1378 | /* Ensuring "only once" philosophy for MQTT QoS2 PUBs */ |
dflet | 0:087b5655778d | 1379 | if(qos2_pub_rx_is_done(cl_ctx, msg_id)) { |
dflet | 0:087b5655778d | 1380 | /* Already delivered. Drop it & do not report */ |
dflet | 0:087b5655778d | 1381 | MQP_RX_DO_NOT_RPT_SET(mqp_raw); |
dflet | 0:087b5655778d | 1382 | return true; /* No more follow-up; all's good */ |
dflet | 0:087b5655778d | 1383 | } |
dflet | 0:087b5655778d | 1384 | |
dflet | 0:087b5655778d | 1385 | if(false == qos2_pub_rx_logup(cl_ctx, msg_id)) |
dflet | 0:087b5655778d | 1386 | return false; /* Failed to record New RX PUB */ |
dflet | 0:087b5655778d | 1387 | |
dflet | 0:087b5655778d | 1388 | vh_msg_send(cl_ctx, MQTT_PUBREC, MQTT_QOS0, true, msg_id); |
dflet | 0:087b5655778d | 1389 | } |
dflet | 0:087b5655778d | 1390 | |
dflet | 0:087b5655778d | 1391 | /* QoS obliations completed, present PUBLISH RX packet to app */ |
dflet | 0:087b5655778d | 1392 | if(ctx_cbs->publish_rx) { |
dflet | 0:087b5655778d | 1393 | /* App has chosen the callback method to receive PKT */ |
dflet | 0:087b5655778d | 1394 | mqp_raw->n_refs++; /* Make app owner of this packet */ |
dflet | 0:087b5655778d | 1395 | if(ctx_cbs->publish_rx(CLIENT(cl_ctx)->app, BOOL_DUP(B), |
dflet | 0:087b5655778d | 1396 | qos, BOOL_RETAIN(B), mqp_raw)) { |
dflet | 0:087b5655778d | 1397 | /* App has no use of PKT any more, so free it */ |
dflet | 0:087b5655778d | 1398 | mqp_raw->n_refs--; /* Take back ownership */ |
dflet | 0:087b5655778d | 1399 | } |
dflet | 0:087b5655778d | 1400 | } |
dflet | 0:087b5655778d | 1401 | |
dflet | 0:087b5655778d | 1402 | return true; |
dflet | 0:087b5655778d | 1403 | } |
dflet | 0:087b5655778d | 1404 | |
dflet | 0:087b5655778d | 1405 | static |
dflet | 0:087b5655778d | 1406 | bool proc_connack_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) |
dflet | 0:087b5655778d | 1407 | { |
dflet | 0:087b5655778d | 1408 | struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); |
dflet | 0:087b5655778d | 1409 | uint8_t *buf = MQP_VHEADER_BUF(mqp_raw); |
dflet | 0:087b5655778d | 1410 | |
dflet | 0:087b5655778d | 1411 | mqp_raw->vh_len += 2; |
dflet | 0:087b5655778d | 1412 | mqp_raw->pl_len -= 2; |
dflet | 0:087b5655778d | 1413 | |
dflet | 0:087b5655778d | 1414 | if(0 != mqp_raw->pl_len) |
dflet | 0:087b5655778d | 1415 | return false; /* There is no payload in message */ |
dflet | 0:087b5655778d | 1416 | |
dflet | 0:087b5655778d | 1417 | cl_ctx->flags &= ~(DO_CONNACK_TO_FLAG | CONNACK_AWAIT_FLAG); |
dflet | 0:087b5655778d | 1418 | |
dflet | 0:087b5655778d | 1419 | if(VHB_CONNACK_RC(buf)) |
dflet | 0:087b5655778d | 1420 | /* Server has refused the connection, inform the app */ |
dflet | 0:087b5655778d | 1421 | goto proc_connack_rx_exit1; |
dflet | 0:087b5655778d | 1422 | |
dflet | 0:087b5655778d | 1423 | cl_ctx->flags |= NOW_CONNECTED_FLAG; |
dflet | 0:087b5655778d | 1424 | cl_ctx_timeout_update(cl_ctx, net_ops->time()); /* start KA */ |
dflet | 0:087b5655778d | 1425 | |
dflet | 0:087b5655778d | 1426 | if(IS_CLN_SESSION(cl_ctx)) |
dflet | 0:087b5655778d | 1427 | session_delete(cl_ctx); |
dflet | 0:087b5655778d | 1428 | else |
dflet | 0:087b5655778d | 1429 | session_resume(cl_ctx); |
dflet | 0:087b5655778d | 1430 | |
dflet | 0:087b5655778d | 1431 | proc_connack_rx_exit1: |
dflet | 0:087b5655778d | 1432 | if(ctx_cbs->ack_notify) |
dflet | 0:087b5655778d | 1433 | ctx_cbs->ack_notify(CLIENT(cl_ctx)->app, mqp_raw->msg_type, |
dflet | 0:087b5655778d | 1434 | 0, buf, 2); |
dflet | 0:087b5655778d | 1435 | |
dflet | 0:087b5655778d | 1436 | return true; |
dflet | 0:087b5655778d | 1437 | } |
dflet | 0:087b5655778d | 1438 | |
dflet | 0:087b5655778d | 1439 | static |
dflet | 0:087b5655778d | 1440 | bool proc_pingrsp_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) |
dflet | 0:087b5655778d | 1441 | { |
dflet | 0:087b5655778d | 1442 | struct mqtt_client_ctx_cbs *ctx_cbs = CTX_CBS_PTR(cl_ctx); |
dflet | 0:087b5655778d | 1443 | |
dflet | 0:087b5655778d | 1444 | if(0 != mqp_raw->pl_len) |
dflet | 0:087b5655778d | 1445 | return false; |
dflet | 0:087b5655778d | 1446 | |
dflet | 0:087b5655778d | 1447 | if(AWAITS_KA_PING(cl_ctx)) { |
dflet | 0:087b5655778d | 1448 | cl_ctx->flags &= ~KA_PINGER_RSP_FLAG; |
dflet | 0:087b5655778d | 1449 | return true; |
dflet | 0:087b5655778d | 1450 | } |
dflet | 0:087b5655778d | 1451 | |
dflet | 0:087b5655778d | 1452 | if(AWAITS_PINGRSP(cl_ctx)) { |
dflet | 0:087b5655778d | 1453 | cl_ctx->flags &= ~USER_PING_RSP_FLAG; |
dflet | 0:087b5655778d | 1454 | if(ctx_cbs->ack_notify) |
dflet | 0:087b5655778d | 1455 | ctx_cbs->ack_notify(CLIENT(cl_ctx)->app, |
dflet | 0:087b5655778d | 1456 | mqp_raw->msg_type, |
dflet | 0:087b5655778d | 1457 | 0, NULL, 0); |
dflet | 0:087b5655778d | 1458 | return true; |
dflet | 0:087b5655778d | 1459 | } |
dflet | 0:087b5655778d | 1460 | |
dflet | 0:087b5655778d | 1461 | return false; |
dflet | 0:087b5655778d | 1462 | } |
dflet | 0:087b5655778d | 1463 | |
dflet | 0:087b5655778d | 1464 | static |
dflet | 0:087b5655778d | 1465 | bool conn_sent_state_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) |
dflet | 0:087b5655778d | 1466 | { |
dflet | 0:087b5655778d | 1467 | bool rv = false; |
dflet | 0:087b5655778d | 1468 | |
dflet | 0:087b5655778d | 1469 | switch(mqp_raw->msg_type) { |
dflet | 0:087b5655778d | 1470 | |
dflet | 0:087b5655778d | 1471 | case MQTT_CONNACK: |
dflet | 0:087b5655778d | 1472 | /* Changes client_ctx->flags to CONNECTED */ |
dflet | 0:087b5655778d | 1473 | rv = proc_connack_rx(cl_ctx, mqp_raw); |
dflet | 0:087b5655778d | 1474 | break; |
dflet | 0:087b5655778d | 1475 | |
dflet | 0:087b5655778d | 1476 | default: |
dflet | 0:087b5655778d | 1477 | break; |
dflet | 0:087b5655778d | 1478 | } |
dflet | 0:087b5655778d | 1479 | |
dflet | 0:087b5655778d | 1480 | return rv; |
dflet | 0:087b5655778d | 1481 | } |
dflet | 0:087b5655778d | 1482 | |
dflet | 0:087b5655778d | 1483 | static |
dflet | 0:087b5655778d | 1484 | bool connected_state_rx(struct client_ctx *cl_ctx, struct mqtt_packet *mqp_raw) |
dflet | 0:087b5655778d | 1485 | { |
dflet | 0:087b5655778d | 1486 | bool rv = false; |
dflet | 0:087b5655778d | 1487 | |
dflet | 0:087b5655778d | 1488 | switch(mqp_raw->msg_type) { |
dflet | 0:087b5655778d | 1489 | |
dflet | 0:087b5655778d | 1490 | case MQTT_SUBACK: |
dflet | 0:087b5655778d | 1491 | case MQTT_PUBACK: |
dflet | 0:087b5655778d | 1492 | case MQTT_PUBREC: |
dflet | 0:087b5655778d | 1493 | case MQTT_PUBREL: |
dflet | 0:087b5655778d | 1494 | case MQTT_PUBCOMP: |
dflet | 0:087b5655778d | 1495 | case MQTT_UNSUBACK: |
dflet | 0:087b5655778d | 1496 | rv = proc_ack_msg_rx(cl_ctx, mqp_raw); |
dflet | 0:087b5655778d | 1497 | break; |
dflet | 0:087b5655778d | 1498 | |
dflet | 0:087b5655778d | 1499 | case MQTT_PINGRSP: |
dflet | 0:087b5655778d | 1500 | rv = proc_pingrsp_rx(cl_ctx, mqp_raw); |
dflet | 0:087b5655778d | 1501 | break; |
dflet | 0:087b5655778d | 1502 | |
dflet | 0:087b5655778d | 1503 | case MQTT_PUBLISH: |
dflet | 0:087b5655778d | 1504 | rv = proc_pub_msg_rx(cl_ctx, mqp_raw); |
dflet | 0:087b5655778d | 1505 | break; |
dflet | 0:087b5655778d | 1506 | |
dflet | 0:087b5655778d | 1507 | case MQTT_CONNACK: /* not expected */ |
dflet | 0:087b5655778d | 1508 | default: |
dflet | 0:087b5655778d | 1509 | break; |
dflet | 0:087b5655778d | 1510 | } |
dflet | 0:087b5655778d | 1511 | |
dflet | 0:087b5655778d | 1512 | return rv; |
dflet | 0:087b5655778d | 1513 | } |
dflet | 0:087b5655778d | 1514 | |
dflet | 0:087b5655778d | 1515 | static bool process_recv(struct client_ctx *cl_ctx, |
dflet | 0:087b5655778d | 1516 | struct mqtt_packet *mqp_raw) |
dflet | 0:087b5655778d | 1517 | { |
dflet | 0:087b5655778d | 1518 | bool rv; |
dflet | 0:087b5655778d | 1519 | |
dflet | 0:087b5655778d | 1520 | USR_INFO("C: Rcvd msg Fix-Hdr (Byte1) 0x%02x from net %d [@ %u]\n\r", |
dflet | 0:087b5655778d | 1521 | mqp_raw->fh_byte1, cl_ctx->net, net_ops->time()); |
dflet | 0:087b5655778d | 1522 | |
dflet | 0:087b5655778d | 1523 | /* Working Principle: Only RX processing errors should be |
dflet | 0:087b5655778d | 1524 | reported as 'false'. Status of TX as a follow-up to RX |
dflet | 0:087b5655778d | 1525 | messages need not be reported by the xyz_rx() routines. |
dflet | 0:087b5655778d | 1526 | Error observed in TX is either dealt in next iteration |
dflet | 0:087b5655778d | 1527 | of RX loop (in case, there is a dedicated RX task for |
dflet | 0:087b5655778d | 1528 | the CTX) or in TX routine itself (in case, there is no |
dflet | 0:087b5655778d | 1529 | dedicated RX task for the CTX). |
dflet | 0:087b5655778d | 1530 | */ |
dflet | 0:087b5655778d | 1531 | rv = AWAITS_CONNACK(cl_ctx)? |
dflet | 0:087b5655778d | 1532 | conn_sent_state_rx(cl_ctx, mqp_raw) : |
dflet | 0:087b5655778d | 1533 | connected_state_rx(cl_ctx, mqp_raw); |
dflet | 0:087b5655778d | 1534 | |
dflet | 0:087b5655778d | 1535 | DBG_INFO("C: Msg w/ ID 0x%04x, processing status: %s\n\r", |
dflet | 0:087b5655778d | 1536 | mqp_raw->msg_id, rv? "Good" : "Fail"); |
dflet | 0:087b5655778d | 1537 | |
dflet | 0:087b5655778d | 1538 | return rv; |
dflet | 0:087b5655778d | 1539 | } |
dflet | 0:087b5655778d | 1540 | |
dflet | 0:087b5655778d | 1541 | static int32_t net_recv(int32_t net, struct mqtt_packet *mqp, uint32_t wait_secs, void *ctx) |
dflet | 0:087b5655778d | 1542 | { |
dflet | 0:087b5655778d | 1543 | bool timed_out = false; |
dflet | 0:087b5655778d | 1544 | int32_t rv = mqp_recv(net, net_ops, mqp, wait_secs, &timed_out, ctx); |
dflet | 0:087b5655778d | 1545 | if(rv <= 0) { |
dflet | 0:087b5655778d | 1546 | USR_INFO("C: Net %d, Raw Error %d, Time Out: %c\n\r", |
dflet | 0:087b5655778d | 1547 | net, rv, timed_out? 'Y' : 'N'); |
dflet | 0:087b5655778d | 1548 | |
dflet | 0:087b5655778d | 1549 | if(timed_out) |
dflet | 0:087b5655778d | 1550 | rv = MQP_ERR_TIMEOUT; |
dflet | 0:087b5655778d | 1551 | } |
dflet | 0:087b5655778d | 1552 | |
dflet | 0:087b5655778d | 1553 | return rv; |
dflet | 0:087b5655778d | 1554 | } |
dflet | 0:087b5655778d | 1555 | |
dflet | 0:087b5655778d | 1556 | /* |
dflet | 0:087b5655778d | 1557 | MQTT 3.1.1 implementation |
dflet | 0:087b5655778d | 1558 | ------------------------- |
dflet | 0:087b5655778d | 1559 | |
dflet | 0:087b5655778d | 1560 | Keep Alive Time is maxmimum interval within which a client should send a |
dflet | 0:087b5655778d | 1561 | packet to broker. If there are either no packets to be sent to broker or |
dflet | 0:087b5655778d | 1562 | no retries left, then client is expected to a send a PINGREQ within Keep |
dflet | 0:087b5655778d | 1563 | Alive Time. Broker should respond by sending PINGRSP with-in reasonable |
dflet | 0:087b5655778d | 1564 | time of 'wait_secs'. If Keep Alive Time is set as 0, then client is not |
dflet | 0:087b5655778d | 1565 | expected to be disconnected due to in-activity of MQTT messages. Value |
dflet | 0:087b5655778d | 1566 | of 'wait_secs' is assumed to be quite smaller than (non-zero) 'ka_secs'. |
dflet | 0:087b5655778d | 1567 | */ |
dflet | 0:087b5655778d | 1568 | static void conn2used_ctxs(uint32_t wait_secs) |
dflet | 0:087b5655778d | 1569 | { |
dflet | 0:087b5655778d | 1570 | |
dflet | 0:087b5655778d | 1571 | while(conn_ctxs) { |
dflet | 0:087b5655778d | 1572 | struct client_ctx *cl_ctx = conn_ctxs; |
dflet | 0:087b5655778d | 1573 | conn_ctxs = conn_ctxs->next; |
dflet | 0:087b5655778d | 1574 | |
dflet | 0:087b5655778d | 1575 | cl_ctx_timeout_insert(&used_ctxs, cl_ctx); |
dflet | 0:087b5655778d | 1576 | } |
dflet | 0:087b5655778d | 1577 | } |
dflet | 0:087b5655778d | 1578 | |
dflet | 0:087b5655778d | 1579 | static int32_t single_ctx_ka_sequence(struct client_ctx *cl_ctx, uint32_t wait_secs) |
dflet | 0:087b5655778d | 1580 | { |
dflet | 0:087b5655778d | 1581 | |
dflet | 0:087b5655778d | 1582 | uint32_t now_secs = net_ops->time(); |
dflet | 0:087b5655778d | 1583 | |
dflet | 0:087b5655778d | 1584 | if(AWAITS_CONNACK(cl_ctx) && CFG_CONNACK_TO(cl_ctx)) { |
dflet | 0:087b5655778d | 1585 | cl_ctx->timeout += wait_secs; /* Set CONNACK timeout value */ |
dflet | 0:087b5655778d | 1586 | cl_ctx->flags &= ~DO_CONNACK_TO_FLAG; |
dflet | 0:087b5655778d | 1587 | } |
dflet | 0:087b5655778d | 1588 | |
dflet | 0:087b5655778d | 1589 | if(cl_ctx->timeout > now_secs) { |
dflet | 0:087b5655778d | 1590 | return 1; /* Still have time for next message transaction */ |
dflet | 0:087b5655778d | 1591 | } |
dflet | 0:087b5655778d | 1592 | if(is_connected(cl_ctx)) { |
dflet | 0:087b5655778d | 1593 | /* Timeout has happened. Check for PINGRESP if PINGREQ done. |
dflet | 0:087b5655778d | 1594 | Otherwise, send PINGREQ (Are You there?) to the server. */ |
dflet | 0:087b5655778d | 1595 | if(AWAITS_KA_PING(cl_ctx)) { |
dflet | 0:087b5655778d | 1596 | goto single_ctx_ka_sequence_exit1; /* No PINGRESP */ |
dflet | 0:087b5655778d | 1597 | } |
dflet | 0:087b5655778d | 1598 | return pingreq_send(cl_ctx, KA_PINGER_RSP_FLAG); /* Hello! */ |
dflet | 0:087b5655778d | 1599 | } |
dflet | 0:087b5655778d | 1600 | |
dflet | 0:087b5655778d | 1601 | single_ctx_ka_sequence_exit1: |
dflet | 0:087b5655778d | 1602 | |
dflet | 0:087b5655778d | 1603 | USR_INFO("C: Net %d, no RX MSG in reasonable time\n\r", cl_ctx->net); |
dflet | 0:087b5655778d | 1604 | return -1; |
dflet | 0:087b5655778d | 1605 | } |
dflet | 0:087b5655778d | 1606 | |
dflet | 0:087b5655778d | 1607 | static uint32_t single_ctx_adj_wait_secs_get(struct client_ctx *cl_ctx, uint32_t wait_secs) |
dflet | 0:087b5655778d | 1608 | { |
dflet | 0:087b5655778d | 1609 | |
dflet | 0:087b5655778d | 1610 | return (KA_TIMEOUT_NONE != cl_ctx->timeout)? |
dflet | 0:087b5655778d | 1611 | MIN(cl_ctx->timeout - net_ops->time(), wait_secs) : wait_secs; |
dflet | 0:087b5655778d | 1612 | } |
dflet | 0:087b5655778d | 1613 | /* Prepares cl_ctx for a receive attempt. Returns 1 with *secs2wait adjusted by single_ctx_adj_wait_secs_get() when the context may be read; returns MQP_ERR_NOTCONN or MQP_ERR_NETWORK otherwise. */ |
dflet | 0:087b5655778d | 1614 | static int32_t single_ctx_rx_prep(struct client_ctx *cl_ctx, uint32_t *secs2wait) |
dflet | 0:087b5655778d | 1615 | { |
dflet | 0:087b5655778d | 1616 | /* A context without a net handle (-1) is rejected immediately, without calling do_net_close_rx(). */ |
dflet | 0:087b5655778d | 1617 | int32_t rv; |
dflet | 0:087b5655778d | 1618 | |
dflet | 0:087b5655778d | 1619 | if(-1 == cl_ctx->net) |
dflet | 0:087b5655778d | 1620 | return MQP_ERR_NOTCONN; /* Likely for a ctx w/o a receive task */ |
dflet | 0:087b5655778d | 1621 | /* NEED_NET_CLOSE() maps to NOTCONN; a negative single_ctx_ka_sequence() result maps to NETWORK. */ |
dflet | 0:087b5655778d | 1622 | if(NEED_NET_CLOSE(cl_ctx)) |
dflet | 0:087b5655778d | 1623 | rv = MQP_ERR_NOTCONN; |
dflet | 0:087b5655778d | 1624 | else if(0 > single_ctx_ka_sequence(cl_ctx, *secs2wait)) |
dflet | 0:087b5655778d | 1625 | rv = MQP_ERR_NETWORK; |
dflet | 0:087b5655778d | 1626 | else { |
dflet | 0:087b5655778d | 1627 | *secs2wait = single_ctx_adj_wait_secs_get(cl_ctx, *secs2wait); |
dflet | 0:087b5655778d | 1628 | return 1; |
dflet | 0:087b5655778d | 1629 | } |
dflet | 0:087b5655778d | 1630 | /* Only the two error paths reach this point: close the connection with the error code, then report it. */ |
dflet | 0:087b5655778d | 1631 | do_net_close_rx(cl_ctx, rv); |
dflet | 0:087b5655778d | 1632 | return rv; |
dflet | 0:087b5655778d | 1633 | } |
dflet | 0:087b5655778d | 1634 | /* Receives into mqp from the net handle of cl_ctx through net_recv(); then, while holding xSemaphore, passes a positive result to process_recv() and closes the connection on errors. Stores cl_ctx->usr in *app and returns the resulting code. */ |
dflet | 0:087b5655778d | 1635 | static |
dflet | 0:087b5655778d | 1636 | int32_t proc_ctx_data_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, |
dflet | 0:087b5655778d | 1637 | uint32_t wait_secs, void **app) |
dflet | 0:087b5655778d | 1638 | { |
dflet | 0:087b5655778d | 1639 | /* NOTE(review): when xSemaphore is NULL or is not obtained in time, the net_recv() result is returned as-is; neither process_recv() nor the close check has run - confirm this is intended. */ |
dflet | 0:087b5655778d | 1640 | int32_t rv = MQP_ERR_NOTCONN; |
dflet | 0:087b5655778d | 1641 | int32_t net = cl_ctx->net; |
dflet | 0:087b5655778d | 1642 | |
dflet | 0:087b5655778d | 1643 | *app = cl_ctx->usr; |
dflet | 0:087b5655778d | 1644 | /* The receive itself happens before the semaphore is taken. */ |
dflet | 0:087b5655778d | 1645 | rv = net_recv(net, mqp, wait_secs, (void*)cl_ctx); |
dflet | 0:087b5655778d | 1646 | |
dflet | 0:087b5655778d | 1647 | // MUTEX_LOCKIN(); |
dflet | 0:087b5655778d | 1648 | if( xSemaphore != NULL ) { |
dflet | 0:087b5655778d | 1649 | // See if we can obtain the semaphore. If the semaphore is not available |
dflet | 0:087b5655778d | 1650 | // wait up to 40 ticks to see if it becomes free. |
dflet | 0:087b5655778d | 1651 | if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { |
dflet | 0:087b5655778d | 1652 | // We were able to obtain the semaphore and can now access the |
dflet | 0:087b5655778d | 1653 | // shared resource. |
dflet | 0:087b5655778d | 1654 | if(rv > 0) { |
dflet | 0:087b5655778d | 1655 | if(false == process_recv(cl_ctx, mqp)) { |
dflet | 0:087b5655778d | 1656 | Uart_Write((uint8_t*)"MQP_ERR_CONTENT\r\n"); |
dflet | 0:087b5655778d | 1657 | rv = MQP_ERR_CONTENT; |
dflet | 0:087b5655778d | 1658 | } |
dflet | 0:087b5655778d | 1659 | } |
dflet | 0:087b5655778d | 1660 | /* RX: close the network connection to the server for this context, if |
dflet | 0:087b5655778d | 1661 | (a) there is a processing / protocol error other than time-out |
dflet | 0:087b5655778d | 1662 | (b) A good MQTT CONNACK has a return code - connection refused |
dflet | 0:087b5655778d | 1663 | */ |
dflet | 0:087b5655778d | 1664 | if(((rv < 0) && (rv != MQP_ERR_TIMEOUT)) || |
dflet | 0:087b5655778d | 1665 | ((MQTT_CONNACK == mqp->msg_type) && |
dflet | 0:087b5655778d | 1666 | MQP_CONNACK_RC(mqp))) { |
dflet | 0:087b5655778d | 1667 | do_net_close_rx(cl_ctx, rv); |
dflet | 0:087b5655778d | 1668 | } |
dflet | 0:087b5655778d | 1669 | |
dflet | 0:087b5655778d | 1670 | // We have finished accessing the shared resource. Release the |
dflet | 0:087b5655778d | 1671 | // semaphore. |
dflet | 0:087b5655778d | 1672 | xSemaphoreGive( xSemaphore ); |
dflet | 0:087b5655778d | 1673 | |
dflet | 0:087b5655778d | 1674 | } else { |
dflet | 0:087b5655778d | 1675 | // We could not obtain the semaphore and can therefore not access |
dflet | 0:087b5655778d | 1676 | // the shared resource safely. |
dflet | 0:087b5655778d | 1677 | Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); |
dflet | 0:087b5655778d | 1678 | } |
dflet | 0:087b5655778d | 1679 | } |
dflet | 0:087b5655778d | 1680 | |
dflet | 0:087b5655778d | 1681 | // MUTEX_UNLOCK(); |
dflet | 0:087b5655778d | 1682 | return rv; |
dflet | 0:087b5655778d | 1683 | } |
dflet | 0:087b5655778d | 1684 | /* Chooses the packet to receive into (the caller mqp, the saved partial-RX packet CLIENT(cl_ctx)->rx_mqp, or a newly allocated one), runs proc_ctx_data_recv() on it, then either keeps it for a later call (on MQP_ERR_TIMEOUT) or detaches it. */ |
dflet | 0:087b5655778d | 1685 | static int32_t mqp_setup_proc_ctx_data_recv(struct client_ctx *cl_ctx, |
dflet | 0:087b5655778d | 1686 | struct mqtt_packet *mqp, |
dflet | 0:087b5655778d | 1687 | uint32_t wait_secs, void **app) |
dflet | 0:087b5655778d | 1688 | { |
dflet | 0:087b5655778d | 1689 | struct mqtt_packet *rx_mqp = CLIENT(cl_ctx)->rx_mqp; |
dflet | 0:087b5655778d | 1690 | int32_t rv; |
dflet | 0:087b5655778d | 1691 | /* Returns MQP_ERR_FNPARAM when a caller packet differs from a pending partial-RX packet, MQP_ERR_PKT_AVL when no packet can be allocated, otherwise the proc_ctx_data_recv() result. */ |
dflet | 0:087b5655778d | 1692 | if(NULL != mqp) { |
dflet | 0:087b5655778d | 1693 | /* Input MQP must be same as MQP for partial RX, if any */ |
dflet | 0:087b5655778d | 1694 | if(rx_mqp) { |
dflet | 0:087b5655778d | 1695 | if(mqp != rx_mqp) |
dflet | 0:087b5655778d | 1696 | return MQP_ERR_FNPARAM; |
dflet | 0:087b5655778d | 1697 | } else |
dflet | 0:087b5655778d | 1698 | mqp_reset(mqp); |
dflet | 0:087b5655778d | 1699 | } |
dflet | 0:087b5655778d | 1700 | /* No caller packet: continue the pending partial RX if there is one, else allocate a packet. */ |
dflet | 0:087b5655778d | 1701 | if(NULL == mqp) { |
dflet | 0:087b5655778d | 1702 | mqp = rx_mqp? rx_mqp : mqp_client_recv_alloc(0); |
dflet | 0:087b5655778d | 1703 | if(NULL == mqp) |
dflet | 0:087b5655778d | 1704 | return MQP_ERR_PKT_AVL; |
dflet | 0:087b5655778d | 1705 | } |
dflet | 0:087b5655778d | 1706 | |
dflet | 0:087b5655778d | 1707 | rv = proc_ctx_data_recv(cl_ctx, mqp, wait_secs, app); |
dflet | 0:087b5655778d | 1708 | if(rv == MQP_ERR_TIMEOUT) { |
dflet | 0:087b5655778d | 1709 | CLIENT(cl_ctx)->rx_mqp = mqp; /* Save partial RX MQP */ |
dflet | 0:087b5655778d | 1710 | } else { |
dflet | 0:087b5655778d | 1711 | /* Control reaches here due to either an error in RX or the |
dflet | 0:087b5655778d | 1712 | completion of RX. In both the cases, the MQP needs to be |
dflet | 0:087b5655778d | 1713 | detached and processed. For completion of RX: |
dflet | 0:087b5655778d | 1714 | callback mode: Application has used up MQP data; free it |
dflet | 0:087b5655778d | 1715 | Non-callback mode: Application will now use complete MQP |
dflet | 0:087b5655778d | 1716 | */ |
dflet | 0:087b5655778d | 1717 | CLIENT(cl_ctx)->rx_mqp = NULL; |
dflet | 0:087b5655778d | 1718 | if(mqp->free) |
dflet | 0:087b5655778d | 1719 | mqp_free_locked(mqp); /* For only callback mode */ |
dflet | 0:087b5655778d | 1720 | } |
dflet | 0:087b5655778d | 1721 | |
dflet | 0:087b5655778d | 1722 | return rv; |
dflet | 0:087b5655778d | 1723 | } |
dflet | 0:087b5655778d | 1724 | /* Repeats the prepare + receive sequence on cl_ctx for as long as the result is positive, mqp is non-NULL and MQP_RX_DO_NOT_RPT_COR(mqp) holds; returns the last result. */ |
dflet | 0:087b5655778d | 1725 | static int32_t cl_ctx_recv(struct client_ctx *cl_ctx, struct mqtt_packet *mqp, |
dflet | 0:087b5655778d | 1726 | uint32_t wait_secs) |
dflet | 0:087b5655778d | 1727 | { |
dflet | 0:087b5655778d | 1728 | /* app is filled in by the receive helper but is not used further in this routine. */ |
dflet | 0:087b5655778d | 1729 | void *app = NULL; |
dflet | 0:087b5655778d | 1730 | int32_t rv = 0; |
dflet | 0:087b5655778d | 1731 | |
dflet | 0:087b5655778d | 1732 | do { |
dflet | 0:087b5655778d | 1733 | if(mqp && (NULL == CLIENT(cl_ctx)->rx_mqp)) |
dflet | 0:087b5655778d | 1734 | mqp_reset(mqp); |
dflet | 0:087b5655778d | 1735 | /* wait_secs is passed by address, so the prep step may shorten it for the receive that follows. */ |
dflet | 0:087b5655778d | 1736 | rv = single_ctx_rx_prep(cl_ctx, &wait_secs); |
dflet | 0:087b5655778d | 1737 | if(rv > 0) |
dflet | 0:087b5655778d | 1738 | rv = mqp_setup_proc_ctx_data_recv(cl_ctx, mqp, |
dflet | 0:087b5655778d | 1739 | wait_secs, |
dflet | 0:087b5655778d | 1740 | &app); |
dflet | 0:087b5655778d | 1741 | |
dflet | 0:087b5655778d | 1742 | /* 'mqp' must be valid, if rv > 0 but including further |
dflet | 0:087b5655778d | 1743 | & additional check for sake of static code analysis. */ |
dflet | 0:087b5655778d | 1744 | } while((rv > 0) && mqp && MQP_RX_DO_NOT_RPT_COR(mqp)); |
dflet | 0:087b5655778d | 1745 | |
dflet | 0:087b5655778d | 1746 | return rv; |
dflet | 0:087b5655778d | 1747 | } |
dflet | 0:087b5655778d | 1748 | /* Receives on context ctx into mqp until cl_ctx_recv() yields a non-positive result or, when msg_type is non-zero, a packet of that type has arrived. Returns MQP_ERR_FNPARAM if CL_CTX(ctx) or mqp is NULL, else the last cl_ctx_recv() result. */ |
dflet | 0:087b5655778d | 1749 | int32_t mqtt_client_ctx_await_msg(void *ctx, uint8_t msg_type, struct mqtt_packet *mqp, |
dflet | 0:087b5655778d | 1750 | uint32_t wait_secs) |
dflet | 0:087b5655778d | 1751 | { |
dflet | 0:087b5655778d | 1752 | struct client_ctx *cl_ctx = CL_CTX(ctx); |
dflet | 0:087b5655778d | 1753 | int32_t rv = -1; |
dflet | 0:087b5655778d | 1754 | |
dflet | 0:087b5655778d | 1755 | if((NULL == cl_ctx) || (NULL == mqp)) |
dflet | 0:087b5655778d | 1756 | return MQP_ERR_FNPARAM; |
dflet | 0:087b5655778d | 1757 | |
dflet | 0:087b5655778d | 1758 | do { |
dflet | 0:087b5655778d | 1759 | rv = cl_ctx_recv(cl_ctx, mqp, wait_secs); |
dflet | 0:087b5655778d | 1760 | /* A msg_type of 0 accepts any packet type, so the loop then ends after the first receive. */ |
dflet | 0:087b5655778d | 1761 | } while((rv > 0) && |
dflet | 0:087b5655778d | 1762 | (0 != msg_type) && (msg_type != mqp->msg_type)); |
dflet | 0:087b5655778d | 1763 | |
dflet | 0:087b5655778d | 1764 | return rv; |
dflet | 0:087b5655778d | 1765 | } |
dflet | 0:087b5655778d | 1766 | /* Receive loop for context ctx without a caller-supplied packet: keeps calling cl_ctx_recv() while it returns a positive value. Returns MQP_ERR_FNPARAM for a NULL ctx, else the first non-positive result. */ |
dflet | 0:087b5655778d | 1767 | int32_t mqtt_client_ctx_run(void *ctx, uint32_t wait_secs) |
dflet | 0:087b5655778d | 1768 | { |
dflet | 0:087b5655778d | 1769 | |
dflet | 0:087b5655778d | 1770 | int32_t rv; |
dflet | 0:087b5655778d | 1771 | |
dflet | 0:087b5655778d | 1772 | if(NULL == ctx) |
dflet | 0:087b5655778d | 1773 | return MQP_ERR_FNPARAM; |
dflet | 0:087b5655778d | 1774 | |
dflet | 0:087b5655778d | 1775 | do { |
dflet | 0:087b5655778d | 1776 | rv = cl_ctx_recv(CL_CTX(ctx), NULL, wait_secs); |
dflet | 0:087b5655778d | 1777 | |
dflet | 0:087b5655778d | 1778 | } while(rv > 0); |
dflet | 0:087b5655778d | 1779 | |
dflet | 0:087b5655778d | 1780 | return rv; |
dflet | 0:087b5655778d | 1781 | } |
dflet | 0:087b5655778d | 1782 | /* Walks the used_ctxs list and runs single_ctx_rx_prep() on every entry. Returns the first context whose prep failed when grp_has_cbfn is false; returns NULL in all other cases. */ |
dflet | 0:087b5655778d | 1783 | static struct client_ctx *group_ctxs_ka_sequence(uint32_t wait_secs) { |
dflet | 0:087b5655778d | 1784 | |
dflet | 0:087b5655778d | 1785 | struct client_ctx *cl_ctx = used_ctxs; |
dflet | 0:087b5655778d | 1786 | /* The successor is read before the prep call because a failing context is taken off the list (see the comment in the loop). */ |
dflet | 0:087b5655778d | 1787 | while(cl_ctx) { |
dflet | 0:087b5655778d | 1788 | struct client_ctx *next = cl_ctx->next; |
dflet | 0:087b5655778d | 1789 | if(single_ctx_rx_prep(cl_ctx, &wait_secs) < 0) { |
dflet | 0:087b5655778d | 1790 | /* 'CTX' no more eligible for operation |
dflet | 0:087b5655778d | 1791 | and has been removed from used_list */ |
dflet | 0:087b5655778d | 1792 | if(false == grp_has_cbfn) |
dflet | 0:087b5655778d | 1793 | return cl_ctx; |
dflet | 0:087b5655778d | 1794 | } |
dflet | 0:087b5655778d | 1795 | |
dflet | 0:087b5655778d | 1796 | cl_ctx = next; |
dflet | 0:087b5655778d | 1797 | } |
dflet | 0:087b5655778d | 1798 | |
dflet | 0:087b5655778d | 1799 | return NULL; |
dflet | 0:087b5655778d | 1800 | } |
dflet | 0:087b5655778d | 1801 | /* All-ones 32-bit value. Not referenced in the visible part of this file - presumably a wait-forever argument for io_mon(); TODO confirm. */ |
dflet | 0:087b5655778d | 1802 | #define IO_MON_NO_TIMEOUT (0xFFFFFFFF) |
dflet | 0:087b5655778d | 1803 | /* Group variant of the wait adjustment: applies single_ctx_adj_wait_secs_get() to the head of used_ctxs, or returns wait_secs unchanged when the list is empty. */ |
dflet | 0:087b5655778d | 1804 | static uint32_t group_ctxs_adj_wait_secs_get(uint32_t wait_secs) |
dflet | 0:087b5655778d | 1805 | { |
dflet | 0:087b5655778d | 1806 | /* NOTE(review): only the list head is consulted - presumably used_ctxs is kept ordered by earliest timeout; verify where contexts are inserted. */ |
dflet | 0:087b5655778d | 1807 | return used_ctxs? |
dflet | 0:087b5655778d | 1808 | single_ctx_adj_wait_secs_get(used_ctxs, wait_secs) : wait_secs; |
dflet | 0:087b5655778d | 1809 | } |
dflet | 0:087b5655778d | 1810 | /* Handle vectors handed to net_ops->io_mon() by group_ctxs_rx_prep(): recv_hvec[0] carries loopb_net and the following slots are filled by recv_hvec_load(). send_hvec and rsvd_hvec stay at -1 in the visible code. */ |
dflet | 0:087b5655778d | 1811 | static int32_t recv_hvec[MAX_NWCONN + 1 + 1]; /* GROUP LISTEN PORT + VEC END */ |
dflet | 0:087b5655778d | 1812 | static int32_t send_hvec = -1; |
dflet | 0:087b5655778d | 1813 | static int32_t rsvd_hvec = -1; |
dflet | 0:087b5655778d | 1814 | /* Copies the net handle of each context in list into hvec_recv[], stopping after size entries or at the end of the list, and then writes -1 as the end-of-vector marker. */ |
dflet | 0:087b5655778d | 1815 | /* Caller must ensure atomic environment for execution of this routine */ |
dflet | 0:087b5655778d | 1816 | static void recv_hvec_load(int32_t *hvec_recv, uint32_t size, struct client_ctx *list) |
dflet | 0:087b5655778d | 1817 | { |
dflet | 0:087b5655778d | 1818 | /* NOTE(review): when list holds size or more entries the -1 marker is written at hvec_recv[size], so the caller must provide size + 1 slots - confirm against the recv_hvec dimension. */ |
dflet | 0:087b5655778d | 1819 | int32_t i = 0; |
dflet | 0:087b5655778d | 1820 | /* NOTE(review): i is int32_t while size is uint32_t, so (i < size) is a mixed signed/unsigned comparison. */ |
dflet | 0:087b5655778d | 1821 | for(i = 0; (i < size) && (NULL != list); i++, list = list->next) |
dflet | 0:087b5655778d | 1822 | hvec_recv[i] = list->net; |
dflet | 0:087b5655778d | 1823 | |
dflet | 0:087b5655778d | 1824 | hvec_recv[i] = -1; |
dflet | 0:087b5655778d | 1825 | |
dflet | 0:087b5655778d | 1826 | return; |
dflet | 0:087b5655778d | 1827 | } |
dflet | 0:087b5655778d | 1828 | /* Group receive preparation: runs the keep-alive pass over used_ctxs, calls conn2used_ctxs(), rebuilds recv_hvec and waits in net_ops->io_mon(). Returns the io_mon() count when positive, MQP_ERR_TIMEOUT for 0, MQP_ERR_LIBQUIT for a negative count, or MQP_ERR_NETWORK (with *app set) when a context failed the keep-alive pass. */ |
dflet | 0:087b5655778d | 1829 | static int32_t group_ctxs_rx_prep(uint32_t wait_secs, void **app) |
dflet | 0:087b5655778d | 1830 | { |
dflet | 0:087b5655778d | 1831 | |
dflet | 0:087b5655778d | 1832 | /* CHK 'used ctx'(s) have live connection w/ server. If not, drop it */ |
dflet | 0:087b5655778d | 1833 | struct client_ctx *ctx_kaTO = group_ctxs_ka_sequence(wait_secs); |
dflet | 0:087b5655778d | 1834 | int32_t n_hnds; |
dflet | 0:087b5655778d | 1835 | |
dflet | 0:087b5655778d | 1836 | if(ctx_kaTO) { |
dflet | 0:087b5655778d | 1837 | *app = CLIENT(ctx_kaTO)->app; |
dflet | 0:087b5655778d | 1838 | return MQP_ERR_NETWORK; |
dflet | 0:087b5655778d | 1839 | } |
dflet | 0:087b5655778d | 1840 | |
dflet | 0:087b5655778d | 1841 | conn2used_ctxs(wait_secs); /* Now, add new 'ctx'(s) to 'used ctxs' */ |
dflet | 0:087b5655778d | 1842 | /* Slot 0 is the loopback handle; the context handles follow from slot 1. */ |
dflet | 0:087b5655778d | 1843 | recv_hvec[0] = loopb_net; |
dflet | 0:087b5655778d | 1844 | recv_hvec_load(&recv_hvec[1], MAX_NWCONN + 1, used_ctxs); |
dflet | 0:087b5655778d | 1845 | /* NOTE(review): recv_hvec_load() can write its -1 marker at index size; starting at slot 1 with size MAX_NWCONN + 1 that would be one past the array if used_ctxs ever held MAX_NWCONN + 1 entries - confirm the list is bounded by MAX_NWCONN. */ |
dflet | 0:087b5655778d | 1846 | wait_secs = group_ctxs_adj_wait_secs_get(wait_secs); |
dflet | 0:087b5655778d | 1847 | |
dflet | 0:087b5655778d | 1848 | n_hnds = net_ops->io_mon(recv_hvec, &send_hvec, |
dflet | 0:087b5655778d | 1849 | &rsvd_hvec, wait_secs); |
dflet | 0:087b5655778d | 1850 | if(0 == n_hnds) |
dflet | 0:087b5655778d | 1851 | n_hnds = MQP_ERR_TIMEOUT; |
dflet | 0:087b5655778d | 1852 | else if(n_hnds < 0) |
dflet | 0:087b5655778d | 1853 | n_hnds = MQP_ERR_LIBQUIT; |
dflet | 0:087b5655778d | 1854 | |
dflet | 0:087b5655778d | 1855 | return n_hnds; |
dflet | 0:087b5655778d | 1856 | } |
dflet | 0:087b5655778d | 1857 | /* Reads up to LOOP_DLEN bytes from the loopback handle net. Returns the net_ops->recv_from() result when positive; otherwise logs over UART, closes net and returns MQP_ERR_LIBQUIT. */ |
dflet | 0:087b5655778d | 1858 | static int32_t proc_loopback_recv(int32_t net) |
dflet | 0:087b5655778d | 1859 | { |
dflet | 0:087b5655778d | 1860 | /* The received bytes are discarded: buf is not read after the call. */ |
dflet | 0:087b5655778d | 1861 | int32_t rv = 0; |
dflet | 0:087b5655778d | 1862 | uint8_t buf[LOOP_DLEN]; |
dflet | 0:087b5655778d | 1863 | |
dflet | 0:087b5655778d | 1864 | /* Thanks for waking-up thread, but ain't got much to do now */ |
dflet | 0:087b5655778d | 1865 | rv = net_ops->recv_from(net, buf, LOOP_DLEN, NULL, NULL, 0); |
dflet | 0:087b5655778d | 1866 | if(rv <= 0) { |
dflet | 0:087b5655778d | 1867 | memset(print_buf, 0x00, PRINT_BUF_LEN); |
dflet | 0:087b5655778d | 1868 | sprintf((char*) print_buf, "MQP_ERR_LIBQUIT %i\r\n",MQP_ERR_LIBQUIT); |
dflet | 0:087b5655778d | 1869 | Uart_Write((uint8_t *) print_buf); |
dflet | 0:087b5655778d | 1870 | net_ops->close(net); |
dflet | 0:087b5655778d | 1871 | return MQP_ERR_LIBQUIT; |
dflet | 0:087b5655778d | 1872 | } |
dflet | 0:087b5655778d | 1873 | /* NOTE(review): the error path above closes the handle but does not reset loopb_net here; check whether a caller does so. */ |
dflet | 0:087b5655778d | 1874 | return rv; |
dflet | 0:087b5655778d | 1875 | } |
dflet | 0:087b5655778d | 1876 | /* Linear search of used_ctxs for the context whose net handle equals net; returns that context, or NULL when no entry matches. */ |
dflet | 0:087b5655778d | 1877 | static struct client_ctx *net_cl_ctx_find(int32_t net) { |
dflet | 0:087b5655778d | 1878 | struct client_ctx *cl_ctx = used_ctxs; |
dflet | 0:087b5655778d | 1879 | |
dflet | 0:087b5655778d | 1880 | while(cl_ctx && (net != cl_ctx->net)) |
dflet | 0:087b5655778d | 1881 | cl_ctx = cl_ctx->next; |
dflet | 0:087b5655778d | 1882 | |
dflet | 0:087b5655778d | 1883 | return cl_ctx; |
dflet | 0:087b5655778d | 1884 | } |
dflet | 0:087b5655778d | 1885 | /* Looks up the context that owns handle net and receives on it with a wait_secs of 1; returns MQP_ERR_NOTCONN when no context owns the handle, else the result of the receive. */ |
dflet | 0:087b5655778d | 1886 | static int32_t proc_net_data_recv(int32_t net, struct mqtt_packet *mqp, void **app) |
dflet | 0:087b5655778d | 1887 | { |
dflet | 0:087b5655778d | 1888 | |
dflet | 0:087b5655778d | 1889 | /* Note: used_ctxs are always managed by a single RX task */ |
dflet | 0:087b5655778d | 1890 | struct client_ctx *cl_ctx = net_cl_ctx_find(net); |
dflet | 0:087b5655778d | 1891 | int32_t rv = MQP_ERR_NOTCONN; |
dflet | 0:087b5655778d | 1892 | |
dflet | 0:087b5655778d | 1893 | if(NULL == cl_ctx) { |
dflet | 0:087b5655778d | 1894 | return rv; /* TX removed it interim, mustn't happen */ |
dflet | 0:087b5655778d | 1895 | } |
dflet | 0:087b5655778d | 1896 | return mqp_setup_proc_ctx_data_recv(cl_ctx, mqp, 1, app); |
dflet | 0:087b5655778d | 1897 | } |
dflet | 0:087b5655778d | 1898 | /* One group receive pass: waits in group_ctxs_rx_prep(), then services the first n_hnds entries of recv_hvec - the loopback handle through proc_loopback_recv(), any other handle through proc_net_data_recv(). Returns the last result obtained. */ |
dflet | 0:087b5655778d | 1899 | static int32_t cl_recv(struct mqtt_packet *mqp, uint32_t wait_secs, void **app) |
dflet | 0:087b5655778d | 1900 | { |
dflet | 0:087b5655778d | 1901 | /* NOTE(review): indexing recv_hvec[0 .. n_hnds-1] assumes io_mon() leaves the ready handles packed at the front of the vector - confirm with the net_ops implementation. */ |
dflet | 0:087b5655778d | 1902 | int32_t rv = MQP_ERR_NETWORK; |
dflet | 0:087b5655778d | 1903 | int32_t n_hnds = 0, idx = 0; |
dflet | 0:087b5655778d | 1904 | |
dflet | 0:087b5655778d | 1905 | rv = group_ctxs_rx_prep(wait_secs, app); |
dflet | 0:087b5655778d | 1906 | if(rv > 0) |
dflet | 0:087b5655778d | 1907 | n_hnds = rv; |
dflet | 0:087b5655778d | 1908 | /* The loop stops at the first non-positive result and, without group callbacks, right after the first context handle has been serviced. */ |
dflet | 0:087b5655778d | 1909 | for(idx = 0; (idx < n_hnds) && (rv > 0); idx++) { |
dflet | 0:087b5655778d | 1910 | int32_t net = recv_hvec[idx]; |
dflet | 0:087b5655778d | 1911 | if(loopb_net == net) |
dflet | 0:087b5655778d | 1912 | rv = proc_loopback_recv(net); /* UDP Packet */ |
dflet | 0:087b5655778d | 1913 | else { |
dflet | 0:087b5655778d | 1914 | rv = proc_net_data_recv(net, mqp, app); |
dflet | 0:087b5655778d | 1915 | if(false == grp_has_cbfn) |
dflet | 0:087b5655778d | 1916 | break; /* 'CTX': inform application */ |
dflet | 0:087b5655778d | 1917 | } |
dflet | 0:087b5655778d | 1918 | } |
dflet | 0:087b5655778d | 1919 | |
dflet | 0:087b5655778d | 1920 | return rv; |
dflet | 0:087b5655778d | 1921 | } |
dflet | 0:087b5655778d | 1922 | /* Ensures the group loopback UDP handle exists. Returns 1 on success, MQP_ERR_NOT_DEF when loopb_portid is 0, MQP_ERR_NET_OPS when net_ops is NULL, or MQP_ERR_LIBQUIT when the open call yields -1. */ |
dflet | 0:087b5655778d | 1923 | static int32_t grp_net_setup_create() |
dflet | 0:087b5655778d | 1924 | { |
dflet | 0:087b5655778d | 1925 | |
dflet | 0:087b5655778d | 1926 | if(0 == loopb_portid) { |
dflet | 0:087b5655778d | 1927 | return MQP_ERR_NOT_DEF; |
dflet | 0:087b5655778d | 1928 | } |
dflet | 0:087b5655778d | 1929 | if(NULL == net_ops) { |
dflet | 0:087b5655778d | 1930 | return MQP_ERR_NET_OPS; |
dflet | 0:087b5655778d | 1931 | } |
dflet | 0:087b5655778d | 1932 | if(-1 == loopb_net) { |
dflet | 0:087b5655778d | 1933 | loopb_net = net_ops->open(DEV_NETCONN_OPT_UDP, NULL, loopb_portid, NULL); |
dflet | 0:087b5655778d | 1934 | /* The handle is opened only once; later calls find loopb_net already set and skip the open. */ |
dflet | 0:087b5655778d | 1935 | if(-1 == loopb_net) { |
dflet | 0:087b5655778d | 1936 | return MQP_ERR_LIBQUIT; |
dflet | 0:087b5655778d | 1937 | } |
dflet | 0:087b5655778d | 1938 | } |
dflet | 0:087b5655778d | 1939 | |
dflet | 0:087b5655778d | 1940 | return 1; |
dflet | 0:087b5655778d | 1941 | } |
dflet | 0:087b5655778d | 1942 | /* Group receive for the non-callback configuration: calls cl_recv() with mqp, repeating while the result is positive and MQP_RX_DO_NOT_RPT_COR(mqp) holds. Returns MQP_ERR_FNPARAM for a NULL mqp, MQP_ERR_BADCALL when group callbacks are configured, a grp_net_setup_create() error, or the last cl_recv() result. */ |
dflet | 0:087b5655778d | 1943 | int32_t mqtt_client_await_msg(struct mqtt_packet *mqp, uint32_t wait_secs, void **app) |
dflet | 0:087b5655778d | 1944 | { |
dflet | 0:087b5655778d | 1945 | /* NOTE(review): app is dereferenced below before any argument check, so it must be non-NULL. */ |
dflet | 0:087b5655778d | 1946 | int32_t rv = MQP_ERR_NOTCONN; |
dflet | 0:087b5655778d | 1947 | *app = NULL; |
dflet | 0:087b5655778d | 1948 | |
dflet | 0:087b5655778d | 1949 | if(NULL == mqp) |
dflet | 0:087b5655778d | 1950 | return MQP_ERR_FNPARAM; /* Didn't get a valid MQP */ |
dflet | 0:087b5655778d | 1951 | |
dflet | 0:087b5655778d | 1952 | if(true == grp_has_cbfn) |
dflet | 0:087b5655778d | 1953 | return MQP_ERR_BADCALL; /* Err: LIB has CB config */ |
dflet | 0:087b5655778d | 1954 | |
dflet | 0:087b5655778d | 1955 | rv = grp_net_setup_create(); |
dflet | 0:087b5655778d | 1956 | if(rv <= 0) |
dflet | 0:087b5655778d | 1957 | return rv; |
dflet | 0:087b5655778d | 1958 | |
dflet | 0:087b5655778d | 1959 | do { |
dflet | 0:087b5655778d | 1960 | rv = cl_recv(mqp, wait_secs, app); |
dflet | 0:087b5655778d | 1961 | |
dflet | 0:087b5655778d | 1962 | } while((rv > 0) && MQP_RX_DO_NOT_RPT_COR(mqp)); |
dflet | 0:087b5655778d | 1963 | |
dflet | 0:087b5655778d | 1964 | return rv; |
dflet | 0:087b5655778d | 1965 | } |
dflet | 0:087b5655778d | 1966 | /* Group receive loop for the callback configuration: keeps calling cl_recv() until it returns a non-positive result equal to MQP_ERR_LIBQUIT or MQP_ERR_TIMEOUT, and returns that value. Returns MQP_ERR_BADCALL when no group callbacks are configured, or a grp_net_setup_create() error. */ |
dflet | 0:087b5655778d | 1967 | int32_t mqtt_client_run(uint32_t wait_secs) |
dflet | 0:087b5655778d | 1968 | { |
dflet | 0:087b5655778d | 1969 | |
dflet | 0:087b5655778d | 1970 | void *app = NULL; |
dflet | 0:087b5655778d | 1971 | int32_t rv = -1; |
dflet | 0:087b5655778d | 1972 | |
dflet | 0:087b5655778d | 1973 | if(false == grp_has_cbfn) { |
dflet | 0:087b5655778d | 1974 | return MQP_ERR_BADCALL; /* Err: LIB has no CB config */ |
dflet | 0:087b5655778d | 1975 | } |
dflet | 0:087b5655778d | 1976 | rv = grp_net_setup_create(); |
dflet | 0:087b5655778d | 1977 | if(rv <= 0) { |
dflet | 0:087b5655778d | 1978 | return rv; |
dflet | 0:087b5655778d | 1979 | } |
dflet | 0:087b5655778d | 1980 | do { |
dflet | 0:087b5655778d | 1981 | rv = cl_recv(NULL, wait_secs, &app); |
dflet | 0:087b5655778d | 1982 | |
dflet | 0:087b5655778d | 1983 | } while((rv > 0) || |
dflet | 0:087b5655778d | 1984 | /* 'ctx' related errors are handled by the callbacks */ |
dflet | 0:087b5655778d | 1985 | ((rv != MQP_ERR_LIBQUIT) && (rv != MQP_ERR_TIMEOUT))); |
dflet | 0:087b5655778d | 1986 | |
dflet | 0:087b5655778d | 1987 | return rv; |
dflet | 0:087b5655778d | 1988 | } |
dflet | 0:087b5655778d | 1989 | |
dflet | 0:087b5655778d | 1990 | /*------------------------------------------------------------------------------ |
dflet | 0:087b5655778d | 1991 | * Buffer Pool and management, other registrations and initialization. |
dflet | 0:087b5655778d | 1992 | *------------------------------------------------------------------------------ |
dflet | 0:087b5655778d | 1993 | */ |
dflet | 0:087b5655778d | 1994 | static struct mqtt_packet *free_list = NULL; /* Head of the singly linked list (via ->next) of free packets */ |
dflet | 0:087b5655778d | 1995 | /* Pops the head of free_list while holding xSemaphore. Returns NULL when the list is empty, when xSemaphore is NULL, or when the semaphore is not obtained within the 40-tick wait. */ |
dflet | 0:087b5655778d | 1996 | static struct mqtt_packet *mqp_alloc_atomic(void) { |
dflet | 0:087b5655778d | 1997 | |
dflet | 0:087b5655778d | 1998 | struct mqtt_packet *mqp = NULL; |
dflet | 0:087b5655778d | 1999 | |
dflet | 0:087b5655778d | 2000 | // MUTEX_LOCKIN(); |
dflet | 0:087b5655778d | 2001 | if( xSemaphore != NULL ) { |
dflet | 0:087b5655778d | 2002 | // See if we can obtain the semaphore. If the semaphore is not available |
dflet | 0:087b5655778d | 2003 | // wait up to 40 ticks to see if it becomes free. |
dflet | 0:087b5655778d | 2004 | if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { |
dflet | 0:087b5655778d | 2005 | // We were able to obtain the semaphore and can now access the |
dflet | 0:087b5655778d | 2006 | // shared resource. |
dflet | 0:087b5655778d | 2007 | mqp = free_list; |
dflet | 0:087b5655778d | 2008 | if(mqp) { |
dflet | 0:087b5655778d | 2009 | free_list = mqp->next; |
dflet | 0:087b5655778d | 2010 | } |
dflet | 0:087b5655778d | 2011 | |
dflet | 0:087b5655778d | 2012 | // We have finished accessing the shared resource. Release the |
dflet | 0:087b5655778d | 2013 | // semaphore. |
dflet | 0:087b5655778d | 2014 | xSemaphoreGive( xSemaphore ); |
dflet | 0:087b5655778d | 2015 | |
dflet | 0:087b5655778d | 2016 | } else { |
dflet | 0:087b5655778d | 2017 | // We could not obtain the semaphore and can therefore not access |
dflet | 0:087b5655778d | 2018 | // the shared resource safely. |
dflet | 0:087b5655778d | 2019 | Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); |
dflet | 0:087b5655778d | 2020 | } |
dflet | 0:087b5655778d | 2021 | } |
dflet | 0:087b5655778d | 2022 | |
dflet | 0:087b5655778d | 2023 | // MUTEX_UNLOCK(); |
dflet | 0:087b5655778d | 2024 | return mqp; |
dflet | 0:087b5655778d | 2025 | } |
dflet | 0:087b5655778d | 2026 | /* Takes a packet from the free pool, initialises it with mqp_init(mqp, offset) and sets its msg_type. Returns NULL (after a USR_INFO log) when no packet could be obtained. */ |
dflet | 0:087b5655778d | 2027 | struct mqtt_packet *mqp_client_alloc(uint8_t msg_type, uint8_t offset) { |
dflet | 0:087b5655778d | 2028 | |
dflet | 0:087b5655778d | 2029 | struct mqtt_packet *mqp = mqp_alloc_atomic(); |
dflet | 0:087b5655778d | 2030 | if(NULL == mqp) { |
dflet | 0:087b5655778d | 2031 | USR_INFO("MQP alloc failed - msg type 0x%02x\n\r", msg_type); |
dflet | 0:087b5655778d | 2032 | return NULL; |
dflet | 0:087b5655778d | 2033 | } |
dflet | 0:087b5655778d | 2034 | |
dflet | 0:087b5655778d | 2035 | mqp_init(mqp, offset); |
dflet | 0:087b5655778d | 2036 | mqp->msg_type = msg_type; |
dflet | 0:087b5655778d | 2037 | |
dflet | 0:087b5655778d | 2038 | return mqp; |
dflet | 0:087b5655778d | 2039 | } |
dflet | 0:087b5655778d | 2040 | /* Pushes mqp back onto the head of free_list; installed as the free hook of every packet by mqtt_client_buffers_register(). */ |
dflet | 0:087b5655778d | 2041 | /* Do not use this routine within this file. */ |
dflet | 0:087b5655778d | 2042 | static void free_mqp(struct mqtt_packet *mqp) |
dflet | 0:087b5655778d | 2043 | { |
dflet | 0:087b5655778d | 2044 | /* Must be used in a locked state */ |
dflet | 0:087b5655778d | 2045 | mqp->next = free_list; |
dflet | 0:087b5655778d | 2046 | free_list = mqp; |
dflet | 0:087b5655778d | 2047 | } |
dflet | 0:087b5655778d | 2048 | /* Builds the free packet pool: packet i of mqp_vec gets the buf_len-byte slice of buf_vec at offset i * buf_len and is pushed onto free_list. Returns 0 on success, -1 when num_mqp or buf_len is 0 or free_list is not empty. */ |
dflet | 0:087b5655778d | 2049 | int32_t mqtt_client_buffers_register(uint32_t num_mqp, struct mqtt_packet *mqp_vec, |
dflet | 0:087b5655778d | 2050 | uint32_t buf_len, uint8_t *buf_vec) |
dflet | 0:087b5655778d | 2051 | { |
dflet | 0:087b5655778d | 2052 | uint32_t i, j; |
dflet | 0:087b5655778d | 2053 | /* NOTE(review): mqp_vec and buf_vec are not checked for NULL, and buf_vec is assumed to hold at least num_mqp * buf_len bytes. */ |
dflet | 0:087b5655778d | 2054 | if((0 == num_mqp) || (0 == buf_len) || free_list) |
dflet | 0:087b5655778d | 2055 | return -1; |
dflet | 0:087b5655778d | 2056 | |
dflet | 0:087b5655778d | 2057 | for(i = 0, j = 0; i < num_mqp; i++, j += buf_len) { |
dflet | 0:087b5655778d | 2058 | struct mqtt_packet *mqp = mqp_vec + i; |
dflet | 0:087b5655778d | 2059 | |
dflet | 0:087b5655778d | 2060 | mqp->buffer = buf_vec + j; |
dflet | 0:087b5655778d | 2061 | mqp->maxlen = buf_len; |
dflet | 0:087b5655778d | 2062 | |
dflet | 0:087b5655778d | 2063 | mqp->free = free_mqp; |
dflet | 0:087b5655778d | 2064 | mqp->next = free_list; |
dflet | 0:087b5655778d | 2065 | free_list = mqp; |
dflet | 0:087b5655778d | 2066 | } |
dflet | 0:087b5655778d | 2067 | |
dflet | 0:087b5655778d | 2068 | return 0; |
dflet | 0:087b5655778d | 2069 | } |
dflet | 0:087b5655778d | 2070 | /* Records the Will topic, Will message and Will option bits (QOS_VALUE(will_qos) shifted left by 3, plus WILL_RETAIN_VAL when retain is set) in the client of ctx. Returns 0 on success, -1 for a NULL ctx or a message without a topic; RET_IF_INVALID_UTF8() may return early - its value is not visible here. */ |
dflet | 0:087b5655778d | 2071 | int32_t mqtt_client_ctx_will_register(void *ctx, |
dflet | 0:087b5655778d | 2072 | const struct utf8_string *will_top, |
dflet | 0:087b5655778d | 2073 | const struct utf8_string *will_msg, |
dflet | 0:087b5655778d | 2074 | enum mqtt_qos will_qos, bool retain) |
dflet | 0:087b5655778d | 2075 | { |
dflet | 0:087b5655778d | 2076 | uint8_t B = 0; |
dflet | 0:087b5655778d | 2077 | |
dflet | 0:087b5655778d | 2078 | if((NULL == ctx) || ((NULL == will_top) && (NULL != will_msg))) |
dflet | 0:087b5655778d | 2079 | return -1; /* Bad Combo */ |
dflet | 0:087b5655778d | 2080 | |
dflet | 0:087b5655778d | 2081 | if(NULL != will_top) { |
dflet | 0:087b5655778d | 2082 | RET_IF_INVALID_UTF8(will_top); |
dflet | 0:087b5655778d | 2083 | |
dflet | 0:087b5655778d | 2084 | B = QOS_VALUE(will_qos) << 3; |
dflet | 0:087b5655778d | 2085 | if(retain) |
dflet | 0:087b5655778d | 2086 | B |= WILL_RETAIN_VAL; |
dflet | 0:087b5655778d | 2087 | |
dflet | 0:087b5655778d | 2088 | if(NULL != will_msg) |
dflet | 0:087b5655778d | 2089 | RET_IF_INVALID_UTF8(will_msg); |
dflet | 0:087b5655778d | 2090 | } |
dflet | 0:087b5655778d | 2091 | /* With a NULL will_top both stored pointers are NULL and will_opts is 0, which clears any earlier Will settings. */ |
dflet | 0:087b5655778d | 2092 | CLIENT(ctx)->conn_pl_utf8s[1] = will_top; |
dflet | 0:087b5655778d | 2093 | CLIENT(ctx)->conn_pl_utf8s[2] = will_msg; |
dflet | 0:087b5655778d | 2094 | |
dflet | 0:087b5655778d | 2095 | CLIENT(ctx)->will_opts = B; |
dflet | 0:087b5655778d | 2096 | |
dflet | 0:087b5655778d | 2097 | return 0; |
dflet | 0:087b5655778d | 2098 | } |
dflet | 0:087b5655778d | 2099 | /* Stores the client id, user name and password strings in slots 0, 3 and 4 of the conn_pl_utf8s array of the client. Returns 0 on success, -1 for a NULL ctx; RET_IF_INVALID_UTF8() may return early - its value is not visible here. */ |
dflet | 0:087b5655778d | 2100 | int32_t mqtt_client_ctx_info_register(void *ctx, |
dflet | 0:087b5655778d | 2101 | const struct utf8_string *client_id, |
dflet | 0:087b5655778d | 2102 | const struct utf8_string *user_name, |
dflet | 0:087b5655778d | 2103 | const struct utf8_string *pass_word) |
dflet | 0:087b5655778d | 2104 | { |
dflet | 0:087b5655778d | 2105 | const struct utf8_string *users_pwd = NULL; |
dflet | 0:087b5655778d | 2106 | |
dflet | 0:087b5655778d | 2107 | if(NULL == ctx) |
dflet | 0:087b5655778d | 2108 | return -1; |
dflet | 0:087b5655778d | 2109 | |
dflet | 0:087b5655778d | 2110 | if(NULL != client_id) |
dflet | 0:087b5655778d | 2111 | RET_IF_INVALID_UTF8(client_id); |
dflet | 0:087b5655778d | 2112 | /* The password is kept only when a user name is supplied; otherwise slot 4 is stored as NULL. */ |
dflet | 0:087b5655778d | 2113 | if(NULL != user_name) { |
dflet | 0:087b5655778d | 2114 | RET_IF_INVALID_UTF8(user_name); |
dflet | 0:087b5655778d | 2115 | |
dflet | 0:087b5655778d | 2116 | if(NULL != pass_word) |
dflet | 0:087b5655778d | 2117 | RET_IF_INVALID_UTF8(pass_word); |
dflet | 0:087b5655778d | 2118 | |
dflet | 0:087b5655778d | 2119 | users_pwd = pass_word; |
dflet | 0:087b5655778d | 2120 | } |
dflet | 0:087b5655778d | 2121 | |
dflet | 0:087b5655778d | 2122 | CLIENT(ctx)->conn_pl_utf8s[0] = client_id; |
dflet | 0:087b5655778d | 2123 | CLIENT(ctx)->conn_pl_utf8s[3] = user_name; |
dflet | 0:087b5655778d | 2124 | CLIENT(ctx)->conn_pl_utf8s[4] = users_pwd; |
dflet | 0:087b5655778d | 2125 | |
dflet | 0:087b5655778d | 2126 | return 0; |
dflet | 0:087b5655778d | 2127 | } |
dflet | 0:087b5655778d | 2128 | /* Installs net as the network services table of the library (net_ops), provided the pointer and all of open, send, recv, send_dest, recv_from, close, io_mon and time are non-NULL. Returns 0 on success, -1 otherwise. */ |
dflet | 0:087b5655778d | 2129 | int32_t mqtt_client_net_svc_register(const struct device_net_services *net) |
dflet | 0:087b5655778d | 2130 | { |
dflet | 0:087b5655778d | 2131 | if(net && net->open && net->send && net->recv && |
dflet | 0:087b5655778d | 2132 | net->send_dest && net->recv_from && net->close |
dflet | 0:087b5655778d | 2133 | && net->io_mon && net->time) { |
dflet | 0:087b5655778d | 2134 | net_ops = net; |
dflet | 0:087b5655778d | 2135 | return 0; |
dflet | 0:087b5655778d | 2136 | } |
dflet | 0:087b5655778d | 2137 | |
dflet | 0:087b5655778d | 2138 | return -1; |
dflet | 0:087b5655778d | 2139 | } |
dflet | 0:087b5655778d | 2140 | /* Copies the configuration from ctx_cfg, the application handle app and, when given, the three callbacks of ctx_cbs into cl_ctx and its client descriptor. */ |
dflet | 0:087b5655778d | 2141 | static void cl_ctx_setup(struct client_ctx *cl_ctx, /* WR Object */ |
dflet | 0:087b5655778d | 2142 | const struct mqtt_client_ctx_cfg *ctx_cfg, |
dflet | 0:087b5655778d | 2143 | const struct mqtt_client_ctx_cbs *ctx_cbs, |
dflet | 0:087b5655778d | 2144 | void *app) |
dflet | 0:087b5655778d | 2145 | { |
dflet | 0:087b5655778d | 2146 | /* ctx_cfg is dereferenced unconditionally here; the caller visible in this file rejects a NULL ctx_cfg first. */ |
dflet | 0:087b5655778d | 2147 | struct client_desc *client = CLIENT(cl_ctx); |
dflet | 0:087b5655778d | 2148 | |
dflet | 0:087b5655778d | 2149 | cl_ctx->flags = ctx_cfg->config_opts; |
dflet | 0:087b5655778d | 2150 | |
dflet | 0:087b5655778d | 2151 | client->nwconn_opts = ctx_cfg->nwconn_opts; |
dflet | 0:087b5655778d | 2152 | client->server_addr = ctx_cfg->server_addr; |
dflet | 0:087b5655778d | 2153 | client->port_number = ctx_cfg->port_number; |
dflet | 0:087b5655778d | 2154 | |
dflet | 0:087b5655778d | 2155 | client->app = app; |
dflet | 0:087b5655778d | 2156 | |
dflet | 0:087b5655778d | 2157 | /* nw_security is copied by value (sizeof(struct secure_conn) bytes) only when one is provided. */ |
dflet | 0:087b5655778d | 2158 | if(NULL != ctx_cfg->nw_security) |
dflet | 0:087b5655778d | 2159 | buf_wr_nbytes((uint8_t*)&client->nw_security, |
dflet | 0:087b5655778d | 2160 | (uint8_t*)ctx_cfg->nw_security, |
dflet | 0:087b5655778d | 2161 | sizeof(struct secure_conn)); |
dflet | 0:087b5655778d | 2162 | |
dflet | 0:087b5655778d | 2163 | if(NULL != ctx_cbs) { |
dflet | 0:087b5655778d | 2164 | // copy the three callback pointers into the callback block of the client |
dflet | 0:087b5655778d | 2165 | struct mqtt_client_ctx_cbs *cbs_ctx = CTX_CBS_PTR(client); |
dflet | 0:087b5655778d | 2166 | cbs_ctx->publish_rx = ctx_cbs->publish_rx; |
dflet | 0:087b5655778d | 2167 | cbs_ctx->ack_notify = ctx_cbs->ack_notify; |
dflet | 0:087b5655778d | 2168 | cbs_ctx->disconn_cb = ctx_cbs->disconn_cb; |
dflet | 0:087b5655778d | 2169 | } |
dflet | 0:087b5655778d | 2170 | |
dflet | 0:087b5655778d | 2171 | return; |
dflet | 0:087b5655778d | 2172 | } |
dflet | 0:087b5655778d | 2173 | /* Takes a context from free_ctxs (under xSemaphore), configures it through cl_ctx_setup() and hands it back via *ctx. Returns 0 on success and -1 on a bad configuration, a group/callback mismatch, or when no context could be obtained. */ |
dflet | 0:087b5655778d | 2174 | int32_t mqtt_client_ctx_create(const struct mqtt_client_ctx_cfg *ctx_cfg, |
dflet | 0:087b5655778d | 2175 | const struct mqtt_client_ctx_cbs *ctx_cbs, |
dflet | 0:087b5655778d | 2176 | void *app, void **ctx) |
dflet | 0:087b5655778d | 2177 | { |
dflet | 0:087b5655778d | 2178 | /* NOTE(review): ctx is written on success without a NULL check. */ |
dflet | 0:087b5655778d | 2179 | struct client_ctx *cl_ctx = NULL; |
dflet | 0:087b5655778d | 2180 | |
dflet | 0:087b5655778d | 2181 | if((NULL == ctx_cfg) || |
dflet | 0:087b5655778d | 2182 | (NULL == ctx_cfg->server_addr) || |
dflet | 0:087b5655778d | 2183 | (0 == ctx_cfg->port_number)) { |
dflet | 0:087b5655778d | 2184 | return -1; |
dflet | 0:087b5655778d | 2185 | } |
dflet | 0:087b5655778d | 2186 | if(ctx_cfg->config_opts & MQTT_CFG_MK_GROUP_CTX) { |
dflet | 0:087b5655778d | 2187 | if(grp_has_cbfn ^ (!!ctx_cbs)) { |
dflet | 0:087b5655778d | 2188 | return -1; |
dflet | 0:087b5655778d | 2189 | } |
dflet | 0:087b5655778d | 2190 | } |
dflet | 0:087b5655778d | 2191 | /* For a group context the check above requires callbacks to be supplied exactly when grp_has_cbfn is set. */ |
dflet | 0:087b5655778d | 2192 | // MUTEX_LOCKIN(); |
dflet | 0:087b5655778d | 2193 | if( xSemaphore != NULL ) { |
dflet | 0:087b5655778d | 2194 | // See if we can obtain the semaphore. If the semaphore is not available |
dflet | 0:087b5655778d | 2195 | // wait up to 40 ticks to see if it becomes free. |
dflet | 0:087b5655778d | 2196 | if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { |
dflet | 0:087b5655778d | 2197 | // We were able to obtain the semaphore and can now access the |
dflet | 0:087b5655778d | 2198 | // shared resource. |
dflet | 0:087b5655778d | 2199 | if(free_ctxs) { |
dflet | 0:087b5655778d | 2200 | cl_ctx = free_ctxs; |
dflet | 0:087b5655778d | 2201 | free_ctxs = cl_ctx->next; |
dflet | 0:087b5655778d | 2202 | cl_ctx->next = NULL; |
dflet | 0:087b5655778d | 2203 | } |
dflet | 0:087b5655778d | 2204 | |
dflet | 0:087b5655778d | 2205 | // We have finished accessing the shared resource. Release the |
dflet | 0:087b5655778d | 2206 | // semaphore. |
dflet | 0:087b5655778d | 2207 | xSemaphoreGive( xSemaphore ); |
dflet | 0:087b5655778d | 2208 | |
dflet | 0:087b5655778d | 2209 | } else { |
dflet | 0:087b5655778d | 2210 | // We could not obtain the semaphore and can therefore not access |
dflet | 0:087b5655778d | 2211 | // the shared resource safely. |
dflet | 0:087b5655778d | 2212 | Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); |
dflet | 0:087b5655778d | 2213 | } |
dflet | 0:087b5655778d | 2214 | } |
dflet | 0:087b5655778d | 2215 | |
dflet | 0:087b5655778d | 2216 | // MUTEX_UNLOCK(); |
dflet | 0:087b5655778d | 2217 | /* cl_ctx is still NULL here when free_ctxs was empty, xSemaphore is NULL, or the semaphore was not obtained. */ |
dflet | 0:087b5655778d | 2218 | if(cl_ctx) { |
dflet | 0:087b5655778d | 2219 | cl_ctx_setup(cl_ctx, ctx_cfg, ctx_cbs, app); |
dflet | 0:087b5655778d | 2220 | *ctx = (void*) cl_ctx; |
dflet | 0:087b5655778d | 2221 | return 0; |
dflet | 0:087b5655778d | 2222 | } |
dflet | 0:087b5655778d | 2223 | |
dflet | 0:087b5655778d | 2224 | return -1; |
dflet | 0:087b5655778d | 2225 | } |
dflet | 0:087b5655778d | 2226 | |
dflet | 0:087b5655778d | 2227 | int32_t mqtt_client_ctx_delete(void *ctx) |
dflet | 0:087b5655778d | 2228 | { |
dflet | 0:087b5655778d | 2229 | |
dflet | 0:087b5655778d | 2230 | struct client_ctx *cl_ctx = (struct client_ctx*) ctx; |
dflet | 0:087b5655778d | 2231 | int32_t rv = -1; /* Not sure about deletion as yet */ |
dflet | 0:087b5655778d | 2232 | |
dflet | 0:087b5655778d | 2233 | // MUTEX_LOCKIN(); |
dflet | 0:087b5655778d | 2234 | if( xSemaphore != NULL ) { |
dflet | 0:087b5655778d | 2235 | // See if we can obtain the semaphore. If the semaphore is not available |
dflet | 0:087b5655778d | 2236 | // wait 10 ticks to see if it becomes free. |
dflet | 0:087b5655778d | 2237 | if( xSemaphoreTake( xSemaphore, ( TickType_t ) 40 ) == pdTRUE ) { |
dflet | 0:087b5655778d | 2238 | // We were able to obtain the semaphore and can now access the |
dflet | 0:087b5655778d | 2239 | // shared resource. |
dflet | 0:087b5655778d | 2240 | if((NULL == cl_ctx) || |
dflet | 0:087b5655778d | 2241 | (-1 != cl_ctx->net) || |
dflet | 0:087b5655778d | 2242 | (awaits_pkts(cl_ctx))) { |
dflet | 0:087b5655778d | 2243 | goto mqtt_client_ctx_delete_exit1; |
dflet | 0:087b5655778d | 2244 | } |
dflet | 0:087b5655778d | 2245 | rv = 0; /* OK to delete ctx */ |
dflet | 0:087b5655778d | 2246 | client_reset(CLIENT(cl_ctx)); |
dflet | 0:087b5655778d | 2247 | cl_ctx_freeup(cl_ctx); |
dflet | 0:087b5655778d | 2248 | |
dflet | 0:087b5655778d | 2249 | // We have finished accessing the shared resource. Release the |
dflet | 0:087b5655778d | 2250 | // semaphore. |
dflet | 0:087b5655778d | 2251 | //xSemaphoreGive( xSemaphore ); |
dflet | 0:087b5655778d | 2252 | } else { |
dflet | 0:087b5655778d | 2253 | // We could not obtain the semaphore and can therefore not access |
dflet | 0:087b5655778d | 2254 | // the shared resource safely. |
dflet | 0:087b5655778d | 2255 | Uart_Write((uint8_t*)"Unable to obtain the semaphore\r\n"); |
dflet | 0:087b5655778d | 2256 | } |
dflet | 0:087b5655778d | 2257 | } |
dflet | 0:087b5655778d | 2258 | |
dflet | 0:087b5655778d | 2259 | mqtt_client_ctx_delete_exit1: |
dflet | 0:087b5655778d | 2260 | // MUTEX_UNLOCK(); |
dflet | 0:087b5655778d | 2261 | xSemaphoreGive(xSemaphore); |
dflet | 0:087b5655778d | 2262 | |
dflet | 0:087b5655778d | 2263 | return rv; |
dflet | 0:087b5655778d | 2264 | } |
dflet | 0:087b5655778d | 2265 | |
dflet | 0:087b5655778d | 2266 | int32_t mqtt_client_lib_init(const struct mqtt_client_lib_cfg *lib_cfg) |
dflet | 0:087b5655778d | 2267 | { |
dflet | 0:087b5655778d | 2268 | if((NULL == lib_cfg) || (NULL == lib_cfg->debug_printf)) |
dflet | 0:087b5655778d | 2269 | return -1; |
dflet | 0:087b5655778d | 2270 | |
dflet | 0:087b5655778d | 2271 | debug_printf = lib_cfg->debug_printf; /* Facilitate debug */ |
dflet | 0:087b5655778d | 2272 | |
dflet | 0:087b5655778d | 2273 | if(INIT_DONE_STATE == cl_lib_state) { |
dflet | 0:087b5655778d | 2274 | Uart_Write((uint8_t*)"C: Error trying to re-initialize \n\r"); |
dflet | 0:087b5655778d | 2275 | USR_INFO("C: Error trying to re-initialize \n\r"); |
dflet | 0:087b5655778d | 2276 | return -1; |
dflet | 0:087b5655778d | 2277 | } |
dflet | 0:087b5655778d | 2278 | |
dflet | 0:087b5655778d | 2279 | USR_INFO("Version: Client LIB %s, Common LIB %s.\n\r", |
dflet | 0:087b5655778d | 2280 | MQTT_CLIENT_VERSTR, MQTT_COMMON_VERSTR); |
dflet | 0:087b5655778d | 2281 | |
dflet | 0:087b5655778d | 2282 | client_desc_init(); |
dflet | 0:087b5655778d | 2283 | |
dflet | 0:087b5655778d | 2284 | cl_lib_state = INIT_DONE_STATE; |
dflet | 0:087b5655778d | 2285 | |
dflet | 0:087b5655778d | 2286 | loopb_portid = lib_cfg->loopback_port; |
dflet | 0:087b5655778d | 2287 | grp_has_cbfn = lib_cfg->grp_uses_cbfn; |
dflet | 0:087b5655778d | 2288 | |
dflet | 0:087b5655778d | 2289 | mutex = lib_cfg->mutex; |
dflet | 0:087b5655778d | 2290 | mutex_lockin = lib_cfg->mutex_lockin; |
dflet | 0:087b5655778d | 2291 | mutex_unlock = lib_cfg->mutex_unlock; |
dflet | 0:087b5655778d | 2292 | |
dflet | 0:087b5655778d | 2293 | aux_dbg_enbl = lib_cfg->aux_debug_en; |
dflet | 0:087b5655778d | 2294 | |
dflet | 0:087b5655778d | 2295 | return 0; |
dflet | 0:087b5655778d | 2296 | } |
dflet | 0:087b5655778d | 2297 | |
dflet | 0:087b5655778d | 2298 | int32_t mqtt_client_lib_exit() |
dflet | 0:087b5655778d | 2299 | { |
dflet | 0:087b5655778d | 2300 | struct client_ctx *cl_ctx = free_ctxs; |
dflet | 0:087b5655778d | 2301 | int32_t count = 0; |
dflet | 0:087b5655778d | 2302 | |
dflet | 0:087b5655778d | 2303 | while(cl_ctx) { |
dflet | 0:087b5655778d | 2304 | cl_ctx = cl_ctx->next; |
dflet | 0:087b5655778d | 2305 | count++; |
dflet | 0:087b5655778d | 2306 | } |
dflet | 0:087b5655778d | 2307 | |
dflet | 0:087b5655778d | 2308 | if(MAX_NWCONN == count) { |
dflet | 0:087b5655778d | 2309 | cl_lib_state = WAIT_INIT_STATE; |
dflet | 0:087b5655778d | 2310 | free_ctxs = NULL; |
dflet | 0:087b5655778d | 2311 | return 0; |
dflet | 0:087b5655778d | 2312 | } |
dflet | 0:087b5655778d | 2313 | |
dflet | 0:087b5655778d | 2314 | return -1; |
dflet | 0:087b5655778d | 2315 | } |
dflet | 0:087b5655778d | 2316 | |
dflet | 0:087b5655778d | 2317 | }//namespace mbed_mqtt |