Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: TYBLE16_simple_data_logger TYBLE16_MP3_Air
lwip_mqtt.c
00001 /** 00002 * @file 00003 * MQTT client 00004 * 00005 * @defgroup mqtt MQTT client 00006 * @ingroup apps 00007 * @verbinclude mqtt_client.txt 00008 */ 00009 00010 /* 00011 * Copyright (c) 2016 Erik Andersson <erian747@gmail.com> 00012 * All rights reserved. 00013 * 00014 * Redistribution and use in source and binary forms, with or without modification, 00015 * are permitted provided that the following conditions are met: 00016 * 00017 * 1. Redistributions of source code must retain the above copyright notice, 00018 * this list of conditions and the following disclaimer. 00019 * 2. Redistributions in binary form must reproduce the above copyright notice, 00020 * this list of conditions and the following disclaimer in the documentation 00021 * and/or other materials provided with the distribution. 00022 * 3. The name of the author may not be used to endorse or promote products 00023 * derived from this software without specific prior written permission. 00024 * 00025 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 00026 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 00027 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT 00028 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00029 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 00030 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00031 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00032 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING 00033 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY 00034 * OF SUCH DAMAGE. 00035 * 00036 * This file is part of the lwIP TCP/IP stack 00037 * 00038 * Author: Erik Andersson <erian747@gmail.com> 00039 * 00040 * 00041 * @todo: 00042 * - Handle large outgoing payloads for PUBLISH messages 00043 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics) 00044 * - Add support for legacy MQTT protocol version 00045 * 00046 * Please coordinate changes and requests with Erik Andersson 00047 * Erik Andersson <erian747@gmail.com> 00048 * 00049 */ 00050 #include "lwip/apps/mqtt.h" 00051 #include "lwip/apps/mqtt_priv.h" 00052 #include "lwip/timeouts.h" 00053 #include "lwip/ip_addr.h" 00054 #include "lwip/mem.h" 00055 #include "lwip/err.h" 00056 #include "lwip/pbuf.h" 00057 #include "lwip/altcp.h" 00058 #include "lwip/altcp_tcp.h" 00059 #include "lwip/altcp_tls.h" 00060 #include <string.h> 00061 00062 #if LWIP_TCP && LWIP_CALLBACK_API 00063 00064 /** 00065 * MQTT_DEBUG: Default is off. 00066 */ 00067 #if !defined MQTT_DEBUG || defined __DOXYGEN__ 00068 #define MQTT_DEBUG LWIP_DBG_OFF 00069 #endif 00070 00071 #define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE) 00072 #define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE) 00073 #define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING) 00074 #define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE) 00075 #define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS) 00076 00077 00078 00079 /** 00080 * MQTT client connection states 00081 */ 00082 enum { 00083 TCP_DISCONNECTED, 00084 TCP_CONNECTING, 00085 MQTT_CONNECTING, 00086 MQTT_CONNECTED 00087 }; 00088 00089 /** 00090 * MQTT control message types 00091 */ 00092 enum mqtt_message_type { 00093 MQTT_MSG_TYPE_CONNECT = 1, 00094 MQTT_MSG_TYPE_CONNACK = 2, 00095 MQTT_MSG_TYPE_PUBLISH = 3, 00096 MQTT_MSG_TYPE_PUBACK = 4, 00097 MQTT_MSG_TYPE_PUBREC = 5, 00098 MQTT_MSG_TYPE_PUBREL = 6, 00099 MQTT_MSG_TYPE_PUBCOMP = 7, 00100 MQTT_MSG_TYPE_SUBSCRIBE = 8, 00101 MQTT_MSG_TYPE_SUBACK = 9, 00102 MQTT_MSG_TYPE_UNSUBSCRIBE = 10, 00103 MQTT_MSG_TYPE_UNSUBACK = 11, 00104 MQTT_MSG_TYPE_PINGREQ = 12, 00105 MQTT_MSG_TYPE_PINGRESP = 13, 00106 MQTT_MSG_TYPE_DISCONNECT = 14 00107 }; 00108 00109 /** Helpers to extract control packet type and qos from first byte in fixed header */ 00110 #define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4) 00111 #define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1) 00112 00113 /** 00114 * MQTT connect flags, only used in CONNECT message 00115 */ 00116 enum mqtt_connect_flag { 00117 MQTT_CONNECT_FLAG_USERNAME = 1 << 7, 00118 MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, 00119 MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, 00120 MQTT_CONNECT_FLAG_WILL = 1 << 2, 00121 MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 00122 }; 00123 00124 00125 static void mqtt_cyclic_timer(void *arg); 00126 00127 #if defined(LWIP_DEBUG) 00128 static const char *const mqtt_message_type_str[15] = { 00129 "UNDEFINED", 00130 "CONNECT", 00131 "CONNACK", 00132 "PUBLISH", 00133 "PUBACK", 00134 "PUBREC", 00135 "PUBREL", 00136 "PUBCOMP", 00137 "SUBSCRIBE", 00138 "SUBACK", 00139 "UNSUBSCRIBE", 00140 "UNSUBACK", 00141 "PINGREQ", 00142 "PINGRESP", 00143 "DISCONNECT" 00144 }; 00145 00146 /** 00147 * Message type value to string 00148 * @param msg_type see enum mqtt_message_type 00149 * 00150 * @return Control message type text string 00151 */ 00152 static const char * 00153 mqtt_msg_type_to_str(u8_t msg_type) 00154 { 00155 if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) { 00156 msg_type = 0; 00157 } 00158 return mqtt_message_type_str[msg_type]; 00159 } 00160 00161 #endif 00162 00163 00164 /** 00165 * Generate MQTT packet identifier 00166 * @param client MQTT client 00167 * @return New packet identifier, range 1 to 65535 00168 */ 00169 static u16_t 00170 msg_generate_packet_id(mqtt_client_t *client) 00171 { 00172 client->pkt_id_seq++; 00173 if (client->pkt_id_seq == 0) { 00174 client->pkt_id_seq++; 00175 } 00176 return client->pkt_id_seq; 00177 } 00178 00179 /*--------------------------------------------------------------------------------------------------------------------- */ 00180 /* Output ring buffer */ 00181 00182 /** Add single item to ring buffer */ 00183 static void 00184 mqtt_ringbuf_put(struct mqtt_ringbuf_t *rb, u8_t item) 00185 { 00186 rb->buf[rb->put] = item; 00187 rb->put++; 00188 if (rb->put >= MQTT_OUTPUT_RINGBUF_SIZE) { 00189 rb->put = 0; 00190 } 00191 } 00192 00193 /** Return pointer to ring buffer get position */ 00194 static u8_t * 00195 mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t *rb) 00196 { 00197 return &rb->buf[rb->get]; 00198 } 00199 00200 static void 00201 mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t *rb, u16_t len) 00202 { 00203 LWIP_ASSERT("mqtt_ringbuf_advance_get_idx: len < MQTT_OUTPUT_RINGBUF_SIZE", len < MQTT_OUTPUT_RINGBUF_SIZE); 00204 00205 rb->get += len; 00206 if (rb->get >= MQTT_OUTPUT_RINGBUF_SIZE) { 00207 rb->get = rb->get - MQTT_OUTPUT_RINGBUF_SIZE; 00208 } 00209 } 00210 00211 /** Return number of bytes in ring buffer */ 00212 static u16_t 00213 mqtt_ringbuf_len(struct mqtt_ringbuf_t *rb) 00214 { 00215 u32_t len = rb->put - rb->get; 00216 if (len > 0xFFFF) { 00217 len += MQTT_OUTPUT_RINGBUF_SIZE; 00218 } 00219 return (u16_t)len; 00220 } 00221 00222 /** Return number of bytes free in ring buffer */ 00223 #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb)) 00224 00225 /** Return number of bytes possible to read without wrapping around */ 00226 #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - (rb)->get)) 00227 00228 /** 00229 * Try send as many bytes as possible from output ring buffer 00230 * @param rb Output ring buffer 00231 * @param tpcb TCP connection handle 00232 */ 00233 static void 00234 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct altcp_pcb *tpcb) 00235 { 00236 err_t err; 00237 u8_t wrap = 0; 00238 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb); 00239 u16_t send_len = altcp_sndbuf (tpcb); 00240 LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL); 00241 00242 if (send_len == 0 || ringbuf_lin_len == 0) { 00243 return; 00244 } 00245 00246 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n", 00247 send_len, ringbuf_lin_len, rb->get, rb->put)); 00248 00249 if (send_len > ringbuf_lin_len) { 00250 /* Space in TCP output buffer is larger than available in ring buffer linear portion */ 00251 send_len = ringbuf_lin_len; 00252 /* Wrap around if more data in ring buffer after linear portion */ 00253 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len); 00254 } 00255 err = altcp_write (tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0)); 00256 if ((err == ERR_OK) && wrap) { 00257 mqtt_ringbuf_advance_get_idx(rb, send_len); 00258 /* Use the lesser one of ring buffer linear length and TCP send buffer size */ 00259 send_len = LWIP_MIN(altcp_sndbuf (tpcb), mqtt_ringbuf_linear_read_length(rb)); 00260 err = altcp_write (tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY); 00261 } 00262 00263 if (err == ERR_OK) { 00264 mqtt_ringbuf_advance_get_idx(rb, send_len); 00265 /* Flush */ 00266 altcp_output (tpcb); 00267 } else { 00268 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err))); 00269 } 00270 } 00271 00272 00273 00274 /*--------------------------------------------------------------------------------------------------------------------- */ 00275 /* Request queue */ 00276 00277 /** 00278 * Create request item 00279 * @param r_objs Pointer to request objects 00280 * @param r_objs_len Number of array entries 00281 * @param pkt_id Packet identifier of request 00282 * @param cb Packet callback to call when requests lifetime ends 00283 * @param arg Parameter following callback 00284 * @return Request or NULL if failed to create 00285 */ 00286 static struct mqtt_request_t * 00287 mqtt_create_request(struct mqtt_request_t *r_objs, size_t r_objs_len, u16_t pkt_id, mqtt_request_cb_t cb, void *arg) 00288 { 00289 struct mqtt_request_t *r = NULL; 00290 u8_t n; 00291 LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL); 00292 for (n = 0; n < r_objs_len; n++) { 00293 /* Item point to itself if not in use */ 00294 if (r_objs[n].next == &r_objs[n]) { 00295 r = &r_objs[n]; 00296 r->next = NULL; 00297 r->cb = cb; 00298 r->arg = arg; 00299 r->pkt_id = pkt_id; 00300 break; 00301 } 00302 } 00303 return r; 00304 } 00305 00306 00307 /** 00308 * Append request to pending request queue 00309 * @param tail Pointer to request queue tail pointer 00310 * @param r Request to append 00311 */ 00312 static void 00313 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r) 00314 { 00315 struct mqtt_request_t *head = NULL; 00316 s16_t time_before = 0; 00317 struct mqtt_request_t *iter; 00318 00319 LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL); 00320 00321 /* Iterate trough queue to find head, and count total timeout time */ 00322 for (iter = *tail; iter != NULL; iter = iter->next) { 00323 time_before += iter->timeout_diff; 00324 head = iter; 00325 } 00326 00327 LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT); 00328 r->timeout_diff = MQTT_REQ_TIMEOUT - time_before; 00329 if (head == NULL) { 00330 *tail = r; 00331 } else { 00332 head->next = r; 00333 } 00334 } 00335 00336 00337 /** 00338 * Delete request item 00339 * @param r Request item to delete 00340 */ 00341 static void 00342 mqtt_delete_request(struct mqtt_request_t *r) 00343 { 00344 if (r != NULL) { 00345 r->next = r; 00346 } 00347 } 00348 00349 /** 00350 * Remove a request item with a specific packet identifier from request queue 00351 * @param tail Pointer to request queue tail pointer 00352 * @param pkt_id Packet identifier of request to take 00353 * @return Request item if found, NULL if not 00354 */ 00355 static struct mqtt_request_t * 00356 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id) 00357 { 00358 struct mqtt_request_t *iter = NULL, *prev = NULL; 00359 LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL); 00360 /* Search all request for pkt_id */ 00361 for (iter = *tail; iter != NULL; iter = iter->next) { 00362 if (iter->pkt_id == pkt_id) { 00363 break; 00364 } 00365 prev = iter; 00366 } 00367 00368 /* If request was found */ 00369 if (iter != NULL) { 00370 /* unchain */ 00371 if (prev == NULL) { 00372 *tail = iter->next; 00373 } else { 00374 prev->next = iter->next; 00375 } 00376 /* If exists, add remaining timeout time for the request to next */ 00377 if (iter->next != NULL) { 00378 iter->next->timeout_diff += iter->timeout_diff; 00379 } 00380 iter->next = NULL; 00381 } 00382 return iter; 00383 } 00384 00385 /** 00386 * Handle requests timeout 00387 * @param tail Pointer to request queue tail pointer 00388 * @param t Time since last call in seconds 00389 */ 00390 static void 00391 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t) 00392 { 00393 struct mqtt_request_t *r; 00394 LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL); 00395 r = *tail; 00396 while (t > 0 && r != NULL) { 00397 if (t >= r->timeout_diff) { 00398 t -= (u8_t)r->timeout_diff; 00399 /* Unchain */ 00400 *tail = r->next; 00401 /* Notify upper layer about timeout */ 00402 if (r->cb != NULL) { 00403 r->cb(r->arg, ERR_TIMEOUT); 00404 } 00405 mqtt_delete_request(r); 00406 /* Tail might be be modified in callback, so re-read it in every iteration */ 00407 r = *(struct mqtt_request_t *const volatile *)tail; 00408 } else { 00409 r->timeout_diff -= t; 00410 t = 0; 00411 } 00412 } 00413 } 00414 00415 /** 00416 * Free all request items 00417 * @param tail Pointer to request queue tail pointer 00418 */ 00419 static void 00420 mqtt_clear_requests(struct mqtt_request_t **tail) 00421 { 00422 struct mqtt_request_t *iter, *next; 00423 LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL); 00424 for (iter = *tail; iter != NULL; iter = next) { 00425 next = iter->next; 00426 mqtt_delete_request(iter); 00427 } 00428 *tail = NULL; 00429 } 00430 /** 00431 * Initialize all request items 00432 * @param r_objs Pointer to request objects 00433 * @param r_objs_len Number of array entries 00434 */ 00435 static void 00436 mqtt_init_requests(struct mqtt_request_t *r_objs, size_t r_objs_len) 00437 { 00438 u8_t n; 00439 LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL); 00440 for (n = 0; n < r_objs_len; n++) { 00441 /* Item pointing to itself indicates unused */ 00442 r_objs[n].next = &r_objs[n]; 00443 } 00444 } 00445 00446 /*--------------------------------------------------------------------------------------------------------------------- */ 00447 /* Output message build helpers */ 00448 00449 00450 static void 00451 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value) 00452 { 00453 mqtt_ringbuf_put(rb, value); 00454 } 00455 00456 static 00457 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value) 00458 { 00459 mqtt_ringbuf_put(rb, value >> 8); 00460 mqtt_ringbuf_put(rb, value & 0xff); 00461 } 00462 00463 static void 00464 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length) 00465 { 00466 u16_t n; 00467 for (n = 0; n < length; n++) { 00468 mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]); 00469 } 00470 } 00471 00472 static void 00473 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length) 00474 { 00475 u16_t n; 00476 mqtt_ringbuf_put(rb, length >> 8); 00477 mqtt_ringbuf_put(rb, length & 0xff); 00478 for (n = 0; n < length; n++) { 00479 mqtt_ringbuf_put(rb, str[n]); 00480 } 00481 } 00482 00483 /** 00484 * Append fixed header 00485 * @param rb Output ring buffer 00486 * @param msg_type see enum mqtt_message_type 00487 * @param fdup MQTT DUP flag 00488 * @param fqos MQTT QoS field 00489 * @param fretain MQTT retain flag 00490 * @param r_length Remaining length after fixed header 00491 */ 00492 00493 static void 00494 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t fdup, 00495 u8_t fqos, u8_t fretain, u16_t r_length) 00496 { 00497 /* Start with control byte */ 00498 mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((fdup & 1) << 3) | ((fqos & 3) << 1) | (fretain & 1))); 00499 /* Encode remaining length field */ 00500 do { 00501 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0)); 00502 r_length >>= 7; 00503 } while (r_length > 0); 00504 } 00505 00506 00507 /** 00508 * Check output buffer space 00509 * @param rb Output ring buffer 00510 * @param r_length Remaining length after fixed header 00511 * @return 1 if message will fit, 0 if not enough buffer space 00512 */ 00513 static u8_t 00514 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length) 00515 { 00516 /* Start with length of type byte + remaining length */ 00517 u16_t total_len = 1 + r_length; 00518 00519 LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL); 00520 00521 /* Calculate number of required bytes to contain the remaining bytes field and add to total*/ 00522 do { 00523 total_len++; 00524 r_length >>= 7; 00525 } while (r_length > 0); 00526 00527 return (total_len <= mqtt_ringbuf_free(rb)); 00528 } 00529 00530 00531 /** 00532 * Close connection to server 00533 * @param client MQTT client 00534 * @param reason Reason for disconnection 00535 */ 00536 static void 00537 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason) 00538 { 00539 LWIP_ASSERT("mqtt_close: client != NULL", client != NULL); 00540 00541 /* Bring down TCP connection if not already done */ 00542 if (client->conn != NULL) { 00543 err_t res; 00544 altcp_recv (client->conn, NULL); 00545 altcp_err (client->conn, NULL); 00546 altcp_sent (client->conn, NULL); 00547 res = altcp_close (client->conn); 00548 if (res != ERR_OK) { 00549 altcp_abort (client->conn); 00550 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_close: Close err=%s\n", lwip_strerr(res))); 00551 } 00552 client->conn = NULL; 00553 } 00554 00555 /* Remove all pending requests */ 00556 mqtt_clear_requests(&client->pend_req_queue); 00557 /* Stop cyclic timer */ 00558 sys_untimeout(mqtt_cyclic_timer, client); 00559 00560 /* Notify upper layer of disconnection if changed state */ 00561 if (client->conn_state != TCP_DISCONNECTED) { 00562 00563 client->conn_state = TCP_DISCONNECTED; 00564 if (client->connect_cb != NULL) { 00565 client->connect_cb(client, client->connect_arg, reason); 00566 } 00567 } 00568 } 00569 00570 00571 /** 00572 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states 00573 * @param arg MQTT client 00574 */ 00575 static void 00576 mqtt_cyclic_timer(void *arg) 00577 { 00578 u8_t restart_timer = 1; 00579 mqtt_client_t *client = (mqtt_client_t *)arg; 00580 LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL); 00581 00582 if (client->conn_state == MQTT_CONNECTING) { 00583 client->cyclic_tick++; 00584 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) { 00585 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: CONNECT attempt to server timed out\n")); 00586 /* Disconnect TCP */ 00587 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 00588 restart_timer = 0; 00589 } 00590 } else if (client->conn_state == MQTT_CONNECTED) { 00591 /* Handle timeout for pending requests */ 00592 mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL); 00593 00594 /* keep_alive > 0 means keep alive functionality shall be used */ 00595 if (client->keep_alive > 0) { 00596 00597 client->server_watchdog++; 00598 /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */ 00599 if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive / 2)) { 00600 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Server incoming keep-alive timeout\n")); 00601 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 00602 restart_timer = 0; 00603 } 00604 00605 /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */ 00606 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) { 00607 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: Sending keep-alive message to server\n")); 00608 if (mqtt_output_check_space(&client->output, 0) != 0) { 00609 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0); 00610 client->cyclic_tick = 0; 00611 } 00612 } else { 00613 client->cyclic_tick++; 00614 } 00615 } 00616 } else { 00617 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state)); 00618 restart_timer = 0; 00619 } 00620 if (restart_timer) { 00621 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, arg); 00622 } 00623 } 00624 00625 00626 /** 00627 * Send PUBACK, PUBREC or PUBREL response message 00628 * @param client MQTT client 00629 * @param msg PUBACK, PUBREC or PUBREL 00630 * @param pkt_id Packet identifier 00631 * @param qos QoS value 00632 * @return ERR_OK if successful, ERR_MEM if out of memory 00633 */ 00634 static err_t 00635 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos) 00636 { 00637 err_t err = ERR_OK; 00638 if (mqtt_output_check_space(&client->output, 2)) { 00639 mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2); 00640 mqtt_output_append_u16(&client->output, pkt_id); 00641 mqtt_output_send(&client->output, client->conn); 00642 } else { 00643 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n", 00644 mqtt_msg_type_to_str(msg), pkt_id)); 00645 err = ERR_MEM; 00646 } 00647 return err; 00648 } 00649 00650 /** 00651 * Subscribe response from server 00652 * @param r Matching request 00653 * @param result Result code from server 00654 */ 00655 static void 00656 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result) 00657 { 00658 if (r->cb != NULL) { 00659 r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT); 00660 } 00661 } 00662 00663 00664 /** 00665 * Complete MQTT message received or buffer full 00666 * @param client MQTT client 00667 * @param fixed_hdr_idx header index 00668 * @param length length received part 00669 * @param remaining_length Remaining length of complete message 00670 */ 00671 static mqtt_connection_status_t 00672 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length) 00673 { 00674 mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED; 00675 00676 u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx; 00677 size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_idx; 00678 00679 /* Control packet type */ 00680 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]); 00681 u16_t pkt_id = 0; 00682 00683 LWIP_ASSERT("client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN", client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN); 00684 LWIP_ASSERT("fixed_hdr_idx <= client->msg_idx", fixed_hdr_idx <= client->msg_idx); 00685 LWIP_ERROR("buffer length mismatch", fixed_hdr_idx + length <= MQTT_VAR_HEADER_BUFFER_LEN, 00686 return MQTT_CONNECT_DISCONNECTED); 00687 00688 if (pkt_type == MQTT_MSG_TYPE_CONNACK) { 00689 if (client->conn_state == MQTT_CONNECTING) { 00690 if (length < 2) { 00691 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short CONNACK message\n")); 00692 goto out_disconnect; 00693 } 00694 /* Get result code from CONNACK */ 00695 res = (mqtt_connection_status_t)var_hdr_payload[1]; 00696 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: Connect response code %d\n", res)); 00697 if (res == MQTT_CONNECT_ACCEPTED) { 00698 /* Reset cyclic_tick when changing to connected state */ 00699 client->cyclic_tick = 0; 00700 client->conn_state = MQTT_CONNECTED; 00701 /* Notify upper layer */ 00702 if (client->connect_cb != 0) { 00703 client->connect_cb(client, client->connect_arg, res); 00704 } 00705 } 00706 } else { 00707 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Received CONNACK in connected state\n")); 00708 } 00709 } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) { 00710 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ( "mqtt_message_received: Received PINGRESP from server\n")); 00711 00712 } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) { 00713 u16_t payload_offset = 0; 00714 u16_t payload_length = length; 00715 u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]); 00716 00717 if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) { 00718 /* Should have topic and pkt id*/ 00719 u8_t *topic; 00720 u16_t after_topic; 00721 u8_t bkp; 00722 u16_t topic_len; 00723 u16_t qos_len = (qos ? 2U : 0U); 00724 if (length < 2 + qos_len) { 00725 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet\n")); 00726 goto out_disconnect; 00727 } 00728 topic_len = var_hdr_payload[0]; 00729 topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]); 00730 if ((topic_len > length - (2 + qos_len)) || 00731 (topic_len > var_hdr_payload_bufsize - (2 + qos_len))) { 00732 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (topic)\n")); 00733 goto out_disconnect; 00734 } 00735 00736 topic = var_hdr_payload + 2; 00737 after_topic = 2 + topic_len; 00738 /* Check buffer length, add one byte even for QoS 0 so that zero termination will fit */ 00739 if ((after_topic + (qos ? 2U : 1U)) > var_hdr_payload_bufsize) { 00740 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n")); 00741 goto out_disconnect; 00742 } 00743 00744 /* id for QoS 1 and 2 */ 00745 if (qos > 0) { 00746 if (length < after_topic + 2U) { 00747 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (after_topic)\n")); 00748 goto out_disconnect; 00749 } 00750 client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1]; 00751 after_topic += 2; 00752 } else { 00753 client->inpub_pkt_id = 0; 00754 } 00755 /* Take backup of byte after topic */ 00756 bkp = topic[topic_len]; 00757 /* Zero terminate string */ 00758 topic[topic_len] = 0; 00759 /* Payload data remaining in receive buffer */ 00760 payload_length = length - after_topic; 00761 payload_offset = after_topic; 00762 00763 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %"U32_F"\n", 00764 qos, topic, remaining_length + payload_length)); 00765 if (client->pub_cb != NULL) { 00766 client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length); 00767 } 00768 /* Restore byte after topic */ 00769 topic[topic_len] = bkp; 00770 } 00771 if (payload_length > 0 || remaining_length == 0) { 00772 if (length < (size_t)(payload_offset + payload_length)) { 00773 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short packet (payload)\n")); 00774 goto out_disconnect; 00775 } 00776 client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0); 00777 /* Reply if QoS > 0 */ 00778 if (remaining_length == 0 && qos > 0) { 00779 /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */ 00780 u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC; 00781 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n", 00782 mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id)); 00783 pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0); 00784 } 00785 } 00786 } else { 00787 /* Get packet identifier */ 00788 pkt_id = (u16_t)var_hdr_payload[0] << 8; 00789 pkt_id |= (u16_t)var_hdr_payload[1]; 00790 if (pkt_id == 0) { 00791 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Got message with illegal packet identifier: 0\n")); 00792 goto out_disconnect; 00793 } 00794 if (pkt_type == MQTT_MSG_TYPE_PUBREC) { 00795 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n", pkt_id)); 00796 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1); 00797 00798 } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) { 00799 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n", pkt_id)); 00800 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0); 00801 00802 } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK || 00803 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) { 00804 struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id); 00805 if (r != NULL) { 00806 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 00807 if (pkt_type == MQTT_MSG_TYPE_SUBACK) { 00808 if (length < 3) { 00809 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: To small SUBACK packet\n")); 00810 goto out_disconnect; 00811 } else { 00812 mqtt_incomming_suback(r, var_hdr_payload[2]); 00813 } 00814 } else if (r->cb != NULL) { 00815 r->cb(r->arg, ERR_OK); 00816 } 00817 mqtt_delete_request(r); 00818 } else { 00819 LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 00820 } 00821 } else { 00822 LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received unknown message type: %d\n", pkt_type)); 00823 goto out_disconnect; 00824 } 00825 } 00826 return res; 00827 out_disconnect: 00828 return MQTT_CONNECT_DISCONNECTED; 00829 } 00830 00831 00832 /** 00833 * MQTT incoming message parser 00834 * @param client MQTT client 00835 * @param p PBUF chain of received data 00836 * @return Connection status 00837 */ 00838 static mqtt_connection_status_t 00839 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) 00840 { 00841 u16_t in_offset = 0; 00842 u32_t msg_rem_len = 0; 00843 u8_t fixed_hdr_idx = 0; 00844 u8_t b = 0; 00845 00846 while (p->tot_len > in_offset) { 00847 /* We ALWAYS parse the header here first. Even if the header was not 00848 included in this segment, we re-parse it here by buffering it in 00849 client->rx_buffer. client->msg_idx keeps track of this. */ 00850 if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) { 00851 00852 if (fixed_hdr_idx < client->msg_idx) { 00853 /* parse header from old pbuf (buffered in client->rx_buffer) */ 00854 b = client->rx_buffer[fixed_hdr_idx]; 00855 } else { 00856 /* parse header from this pbuf and save it in client->rx_buffer in case 00857 it comes in segmented */ 00858 b = pbuf_get_at(p, in_offset++); 00859 client->rx_buffer[client->msg_idx++] = b; 00860 } 00861 fixed_hdr_idx++; 00862 00863 if (fixed_hdr_idx >= 2) { 00864 /* fixed header contains at least 2 bytes but can contain more, depending on 00865 'remaining length'. All bytes but the last of this have 0x80 set to 00866 indicate more bytes are coming. */ 00867 msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7); 00868 if ((b & 0x80) == 0) { 00869 /* fixed header is done */ 00870 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len)); 00871 if (msg_rem_len == 0) { 00872 /* Complete message with no extra headers of payload received */ 00873 mqtt_message_received(client, fixed_hdr_idx, 0, 0); 00874 client->msg_idx = 0; 00875 fixed_hdr_idx = 0; 00876 } else { 00877 /* Bytes remaining in message (changes remaining length if this is 00878 not the first segment of this message) */ 00879 msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx; 00880 } 00881 } 00882 } 00883 } else { 00884 /* Fixed header has been parsed, parse variable header */ 00885 u16_t cpy_len, cpy_start, buffer_space; 00886 00887 cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx; 00888 00889 /* Allow to copy the lesser one of available length in input data or bytes remaining in message */ 00890 cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len); 00891 00892 /* Limit to available space in buffer */ 00893 buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start; 00894 if (cpy_len > buffer_space) { 00895 cpy_len = buffer_space; 00896 } 00897 pbuf_copy_partial(p, client->rx_buffer + cpy_start, cpy_len, in_offset); 00898 00899 /* Advance get and put indexes */ 00900 client->msg_idx += cpy_len; 00901 in_offset += cpy_len; 00902 msg_rem_len -= cpy_len; 00903 00904 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len)); 00905 if ((msg_rem_len == 0) || (cpy_len == buffer_space)) { 00906 /* Whole message received or buffer is full */ 00907 mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len); 00908 if (res != MQTT_CONNECT_ACCEPTED) { 00909 return res; 00910 } 00911 if (msg_rem_len == 0) { 00912 /* Reset parser state */ 00913 client->msg_idx = 0; 00914 /* msg_tot_len = 0; */ 00915 fixed_hdr_idx = 0; 00916 } 00917 } 00918 } 00919 } 00920 return MQTT_CONNECT_ACCEPTED; 00921 } 00922 00923 00924 /** 00925 * TCP received callback function. @see tcp_recv_fn 00926 * @param arg MQTT client 00927 * @param p PBUF chain of received data 00928 * @param err Passed as return value if not ERR_OK 00929 * @return ERR_OK or err passed into callback 00930 */ 00931 static err_t 00932 mqtt_tcp_recv_cb(void *arg, struct altcp_pcb *pcb, struct pbuf *p, err_t err) 00933 { 00934 mqtt_client_t *client = (mqtt_client_t *)arg; 00935 LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL); 00936 LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb); 00937 00938 if (p == NULL) { 00939 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n")); 00940 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 00941 } else { 00942 mqtt_connection_status_t res; 00943 if (err != ERR_OK) { 00944 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_recv_cb: Recv err=%d\n", err)); 00945 pbuf_free(p); 00946 return err; 00947 } 00948 00949 /* Tell remote that data has been received */ 00950 altcp_recved (pcb, p->tot_len); 00951 res = mqtt_parse_incoming(client, p); 00952 pbuf_free(p); 00953 00954 if (res != MQTT_CONNECT_ACCEPTED) { 00955 mqtt_close(client, res); 00956 } 00957 /* If keep alive functionality is used */ 00958 if (client->keep_alive != 0) { 00959 /* Reset server alive watchdog */ 00960 client->server_watchdog = 0; 00961 } 00962 00963 } 00964 return ERR_OK; 00965 } 00966 00967 00968 /** 00969 * TCP data sent callback function. @see tcp_sent_fn 00970 * @param arg MQTT client 00971 * @param tpcb TCP connection handle 00972 * @param len Number of bytes sent 00973 * @return ERR_OK 00974 */ 00975 static err_t 00976 mqtt_tcp_sent_cb(void *arg, struct altcp_pcb *tpcb, u16_t len) 00977 { 00978 mqtt_client_t *client = (mqtt_client_t *)arg; 00979 00980 LWIP_UNUSED_ARG(tpcb); 00981 LWIP_UNUSED_ARG(len); 00982 00983 if (client->conn_state == MQTT_CONNECTED) { 00984 struct mqtt_request_t *r; 00985 00986 /* Reset keep-alive send timer and server watchdog */ 00987 client->cyclic_tick = 0; 00988 client->server_watchdog = 0; 00989 /* QoS 0 publish has no response from server, so call its callbacks here */ 00990 while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) { 00991 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n")); 00992 if (r->cb != NULL) { 00993 r->cb(r->arg, ERR_OK); 00994 } 00995 mqtt_delete_request(r); 00996 } 00997 /* Try send any remaining buffers from output queue */ 00998 mqtt_output_send(&client->output, client->conn); 00999 } 01000 return ERR_OK; 01001 } 01002 01003 /** 01004 * TCP error callback function. @see tcp_err_fn 01005 * @param arg MQTT client 01006 * @param err Error encountered 01007 */ 01008 static void 01009 mqtt_tcp_err_cb(void *arg, err_t err) 01010 { 01011 mqtt_client_t *client = (mqtt_client_t *)arg; 01012 LWIP_UNUSED_ARG(err); /* only used for debug output */ 01013 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg)); 01014 LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL); 01015 /* Set conn to null before calling close as pcb is already deallocated*/ 01016 client->conn = 0; 01017 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 01018 } 01019 01020 /** 01021 * TCP poll callback function. @see tcp_poll_fn 01022 * @param arg MQTT client 01023 * @param tpcb TCP connection handle 01024 * @return err ERR_OK 01025 */ 01026 static err_t 01027 mqtt_tcp_poll_cb(void *arg, struct altcp_pcb *tpcb) 01028 { 01029 mqtt_client_t *client = (mqtt_client_t *)arg; 01030 if (client->conn_state == MQTT_CONNECTED) { 01031 /* Try send any remaining buffers from output queue */ 01032 mqtt_output_send(&client->output, tpcb); 01033 } 01034 return ERR_OK; 01035 } 01036 01037 /** 01038 * TCP connect callback function. @see tcp_connected_fn 01039 * @param arg MQTT client 01040 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error 01041 * @return ERR_OK 01042 */ 01043 static err_t 01044 mqtt_tcp_connect_cb(void *arg, struct altcp_pcb *tpcb, err_t err) 01045 { 01046 mqtt_client_t *client = (mqtt_client_t *)arg; 01047 01048 if (err != ERR_OK) { 01049 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_connect_cb: TCP connect error %d\n", err)); 01050 return err; 01051 } 01052 01053 /* Initiate receiver state */ 01054 client->msg_idx = 0; 01055 01056 /* Setup TCP callbacks */ 01057 altcp_recv (tpcb, mqtt_tcp_recv_cb); 01058 altcp_sent (tpcb, mqtt_tcp_sent_cb); 01059 altcp_poll (tpcb, mqtt_tcp_poll_cb, 2); 01060 01061 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_connect_cb: TCP connection established to server\n")); 01062 /* Enter MQTT connect state */ 01063 client->conn_state = MQTT_CONNECTING; 01064 01065 /* Start cyclic timer */ 01066 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, client); 01067 client->cyclic_tick = 0; 01068 01069 /* Start transmission from output queue, connect message is the first one out*/ 01070 mqtt_output_send(&client->output, client->conn); 01071 01072 return ERR_OK; 01073 } 01074 01075 01076 01077 /*---------------------------------------------------------------------------------------------------- */ 01078 /* Public API */ 01079 01080 01081 /** 01082 * @ingroup mqtt 01083 * MQTT publish function. 01084 * @param client MQTT client 01085 * @param topic Publish topic string 01086 * @param payload Data to publish (NULL is allowed) 01087 * @param payload_length Length of payload (0 is allowed) 01088 * @param qos Quality of service, 0 1 or 2 01089 * @param retain MQTT retain flag 01090 * @param cb Callback to call when publish is complete or has timed out 01091 * @param arg User supplied argument to publish callback 01092 * @return ERR_OK if successful 01093 * ERR_CONN if client is disconnected 01094 * ERR_MEM if short on memory 01095 */ 01096 err_t 01097 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain, 01098 mqtt_request_cb_t cb, void *arg) 01099 { 01100 struct mqtt_request_t *r; 01101 u16_t pkt_id; 01102 size_t topic_strlen; 01103 size_t total_len; 01104 u16_t topic_len; 01105 u16_t remaining_length; 01106 01107 LWIP_ASSERT_CORE_LOCKED(); 01108 LWIP_ASSERT("mqtt_publish: client != NULL", client); 01109 LWIP_ASSERT("mqtt_publish: topic != NULL", topic); 01110 LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN); 01111 01112 topic_strlen = strlen(topic); 01113 LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 01114 topic_len = (u16_t)topic_strlen; 01115 total_len = 2 + topic_len + payload_length; 01116 01117 if (qos > 0) { 01118 total_len += 2; 01119 /* Generate pkt_id id for QoS1 and 2 */ 01120 pkt_id = msg_generate_packet_id(client); 01121 } else { 01122 /* Use reserved value pkt_id 0 for QoS 0 in request handle */ 01123 pkt_id = 0; 01124 } 01125 LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 01126 remaining_length = (u16_t)total_len; 01127 01128 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic)); 01129 01130 r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg); 01131 if (r == NULL) { 01132 return ERR_MEM; 01133 } 01134 01135 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 01136 mqtt_delete_request(r); 01137 return ERR_MEM; 01138 } 01139 /* Append fixed header */ 01140 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length); 01141 01142 /* Append Topic */ 01143 mqtt_output_append_string(&client->output, topic, topic_len); 01144 01145 /* Append packet if for QoS 1 and 2*/ 01146 if (qos > 0) { 01147 mqtt_output_append_u16(&client->output, pkt_id); 01148 } 01149 01150 /* Append optional publish payload */ 01151 if ((payload != NULL) && (payload_length > 0)) { 01152 mqtt_output_append_buf(&client->output, payload, payload_length); 01153 } 01154 01155 mqtt_append_request(&client->pend_req_queue, r); 01156 mqtt_output_send(&client->output, client->conn); 01157 return ERR_OK; 01158 } 01159 01160 01161 /** 01162 * @ingroup mqtt 01163 * MQTT subscribe/unsubscribe function. 01164 * @param client MQTT client 01165 * @param topic topic to subscribe to 01166 * @param qos Quality of service, 0 1 or 2 (only used for subscribe) 01167 * @param cb Callback to call when subscribe/unsubscribe reponse is received 01168 * @param arg User supplied argument to publish callback 01169 * @param sub 1 for subscribe, 0 for unsubscribe 01170 * @return ERR_OK if successful, @see err_t enum for other results 01171 */ 01172 err_t 01173 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub) 01174 { 01175 size_t topic_strlen; 01176 size_t total_len; 01177 u16_t topic_len; 01178 u16_t remaining_length; 01179 u16_t pkt_id; 01180 struct mqtt_request_t *r; 01181 01182 LWIP_ASSERT_CORE_LOCKED(); 01183 LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client); 01184 LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic); 01185 01186 topic_strlen = strlen(topic); 01187 LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 01188 topic_len = (u16_t)topic_strlen; 01189 /* Topic string, pkt_id, qos for subscribe */ 01190 total_len = topic_len + 2 + 2 + (sub != 0); 01191 LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 01192 remaining_length = (u16_t)total_len; 01193 01194 LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3); 01195 if (client->conn_state == TCP_DISCONNECTED) { 01196 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n")); 01197 return ERR_CONN; 01198 } 01199 01200 pkt_id = msg_generate_packet_id(client); 01201 r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg); 01202 if (r == NULL) { 01203 return ERR_MEM; 01204 } 01205 01206 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 01207 mqtt_delete_request(r); 01208 return ERR_MEM; 01209 } 01210 01211 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id)); 01212 01213 mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length); 01214 /* Packet id */ 01215 mqtt_output_append_u16(&client->output, pkt_id); 01216 /* Topic */ 01217 mqtt_output_append_string(&client->output, topic, topic_len); 01218 /* QoS */ 01219 if (sub != 0) { 01220 mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2)); 01221 } 01222 01223 mqtt_append_request(&client->pend_req_queue, r); 01224 mqtt_output_send(&client->output, client->conn); 01225 return ERR_OK; 01226 } 01227 01228 01229 /** 01230 * @ingroup mqtt 01231 * Set callback to handle incoming publish requests from server 01232 * @param client MQTT client 01233 * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload 01234 * @param data_cb Callback for each fragment of payload that arrives 01235 * @param arg User supplied argument to both callbacks 01236 */ 01237 void 01238 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb, 01239 mqtt_incoming_data_cb_t data_cb, void *arg) 01240 { 01241 LWIP_ASSERT_CORE_LOCKED(); 01242 LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL); 01243 client->data_cb = data_cb; 01244 client->pub_cb = pub_cb; 01245 client->inpub_arg = arg; 01246 } 01247 01248 /** 01249 * @ingroup mqtt 01250 * Create a new MQTT client instance 01251 * @return Pointer to instance on success, NULL otherwise 01252 */ 01253 mqtt_client_t * 01254 mqtt_client_new(void) 01255 { 01256 LWIP_ASSERT_CORE_LOCKED(); 01257 return (mqtt_client_t *)mem_calloc(1, sizeof(mqtt_client_t)); 01258 } 01259 01260 /** 01261 * @ingroup mqtt 01262 * Free MQTT client instance 01263 * @param client Pointer to instance to be freed 01264 */ 01265 void 01266 mqtt_client_free(mqtt_client_t *client) 01267 { 01268 mem_free(client); 01269 } 01270 01271 /** 01272 * @ingroup mqtt 01273 * Connect to MQTT server 01274 * @param client MQTT client 01275 * @param ip_addr Server IP 01276 * @param port Server port 01277 * @param cb Connection state change callback 01278 * @param arg User supplied argument to connection callback 01279 * @param client_info Client identification and connection options 01280 * @return ERR_OK if successful, @see err_t enum for other results 01281 */ 01282 err_t 01283 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg, 01284 const struct mqtt_connect_client_info_t *client_info) 01285 { 01286 err_t err; 01287 size_t len; 01288 u16_t client_id_length; 01289 /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */ 01290 u16_t remaining_length = 2 + 4 + 1 + 1 + 2; 01291 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0; 01292 u16_t client_user_len = 0, client_pass_len = 0; 01293 01294 LWIP_ASSERT_CORE_LOCKED(); 01295 LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL); 01296 LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL); 01297 LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL); 01298 LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL); 01299 01300 if (client->conn_state != TCP_DISCONNECTED) { 01301 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Already connected\n")); 01302 return ERR_ISCONN; 01303 } 01304 01305 /* Wipe clean */ 01306 memset(client, 0, sizeof(mqtt_client_t)); 01307 client->connect_arg = arg; 01308 client->connect_cb = cb; 01309 client->keep_alive = client_info->keep_alive; 01310 mqtt_init_requests(client->req_list, LWIP_ARRAYSIZE(client->req_list)); 01311 01312 /* Build connect message */ 01313 if (client_info->will_topic != NULL && client_info->will_msg != NULL) { 01314 flags |= MQTT_CONNECT_FLAG_WILL; 01315 flags |= (client_info->will_qos & 3) << 3; 01316 if (client_info->will_retain) { 01317 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; 01318 } 01319 len = strlen(client_info->will_topic); 01320 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL); 01321 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL); 01322 will_topic_len = (u8_t)len; 01323 len = strlen(client_info->will_msg); 01324 LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL); 01325 will_msg_len = (u8_t)len; 01326 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len; 01327 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 01328 remaining_length = (u16_t)len; 01329 } 01330 if (client_info->client_user != NULL) { 01331 flags |= MQTT_CONNECT_FLAG_USERNAME; 01332 len = strlen(client_info->client_user); 01333 LWIP_ERROR("mqtt_client_connect: client_info->client_user length overflow", len <= 0xFFFF, return ERR_VAL); 01334 LWIP_ERROR("mqtt_client_connect: client_info->client_user length must be > 0", len > 0, return ERR_VAL); 01335 client_user_len = (u16_t)len; 01336 len = remaining_length + 2 + client_user_len; 01337 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 01338 remaining_length = (u16_t)len; 01339 } 01340 if (client_info->client_pass != NULL) { 01341 flags |= MQTT_CONNECT_FLAG_PASSWORD; 01342 len = strlen(client_info->client_pass); 01343 LWIP_ERROR("mqtt_client_connect: client_info->client_pass length overflow", len <= 0xFFFF, return ERR_VAL); 01344 LWIP_ERROR("mqtt_client_connect: client_info->client_pass length must be > 0", len > 0, return ERR_VAL); 01345 client_pass_len = (u16_t)len; 01346 len = remaining_length + 2 + client_pass_len; 01347 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 01348 remaining_length = (u16_t)len; 01349 } 01350 01351 /* Don't complicate things, always connect using clean session */ 01352 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; 01353 01354 len = strlen(client_info->client_id); 01355 LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL); 01356 client_id_length = (u16_t)len; 01357 len = remaining_length + 2 + client_id_length; 01358 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 01359 remaining_length = (u16_t)len; 01360 01361 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 01362 return ERR_MEM; 01363 } 01364 01365 #if LWIP_ALTCP && LWIP_ALTCP_TLS 01366 if (client_info->tls_config) { 01367 client->conn = altcp_tls_new(client_info->tls_config, IP_GET_TYPE(ip_addr)); 01368 } else 01369 #endif 01370 { 01371 client->conn = altcp_tcp_new_ip_type(IP_GET_TYPE(ip_addr)); 01372 } 01373 if (client->conn == NULL) { 01374 return ERR_MEM; 01375 } 01376 01377 /* Set arg pointer for callbacks */ 01378 altcp_arg (client->conn, client); 01379 /* Any local address, pick random local port number */ 01380 err = altcp_bind (client->conn, IP_ADDR_ANY, 0); 01381 if (err != ERR_OK) { 01382 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Error binding to local ip/port, %d\n", err)); 01383 goto tcp_fail; 01384 } 01385 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port)); 01386 01387 /* Connect to server */ 01388 err = altcp_connect (client->conn, ip_addr, port, mqtt_tcp_connect_cb); 01389 if (err != ERR_OK) { 01390 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err)); 01391 goto tcp_fail; 01392 } 01393 /* Set error callback */ 01394 altcp_err (client->conn, mqtt_tcp_err_cb); 01395 client->conn_state = TCP_CONNECTING; 01396 01397 /* Append fixed header */ 01398 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length); 01399 /* Append Protocol string */ 01400 mqtt_output_append_string(&client->output, "MQTT", 4); 01401 /* Append Protocol level */ 01402 mqtt_output_append_u8(&client->output, 4); 01403 /* Append connect flags */ 01404 mqtt_output_append_u8(&client->output, flags); 01405 /* Append keep-alive */ 01406 mqtt_output_append_u16(&client->output, client_info->keep_alive); 01407 /* Append client id */ 01408 mqtt_output_append_string(&client->output, client_info->client_id, client_id_length); 01409 /* Append will message if used */ 01410 if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) { 01411 mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len); 01412 mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len); 01413 } 01414 /* Append user name if given */ 01415 if ((flags & MQTT_CONNECT_FLAG_USERNAME) != 0) { 01416 mqtt_output_append_string(&client->output, client_info->client_user, client_user_len); 01417 } 01418 /* Append password if given */ 01419 if ((flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) { 01420 mqtt_output_append_string(&client->output, client_info->client_pass, client_pass_len); 01421 } 01422 return ERR_OK; 01423 01424 tcp_fail: 01425 altcp_abort (client->conn); 01426 client->conn = NULL; 01427 return err; 01428 } 01429 01430 01431 /** 01432 * @ingroup mqtt 01433 * Disconnect from MQTT server 01434 * @param client MQTT client 01435 */ 01436 void 01437 mqtt_disconnect(mqtt_client_t *client) 01438 { 01439 LWIP_ASSERT_CORE_LOCKED(); 01440 LWIP_ASSERT("mqtt_disconnect: client != NULL", client); 01441 /* If connection in not already closed */ 01442 if (client->conn_state != TCP_DISCONNECTED) { 01443 /* Set conn_state before calling mqtt_close to prevent callback from being called */ 01444 client->conn_state = TCP_DISCONNECTED; 01445 mqtt_close(client, (mqtt_connection_status_t)0); 01446 } 01447 } 01448 01449 /** 01450 * @ingroup mqtt 01451 * Check connection with server 01452 * @param client MQTT client 01453 * @return 1 if connected to server, 0 otherwise 01454 */ 01455 u8_t 01456 mqtt_client_is_connected(mqtt_client_t *client) 01457 { 01458 LWIP_ASSERT_CORE_LOCKED(); 01459 LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client); 01460 return client->conn_state == MQTT_CONNECTED; 01461 } 01462 01463 #endif /* LWIP_TCP && LWIP_CALLBACK_API */
Generated on Tue Jul 12 2022 13:54:29 by
