Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
lwip_mqtt.c
00001 /** 00002 * @file 00003 * MQTT client 00004 * 00005 * @defgroup mqtt MQTT client 00006 * @ingroup apps 00007 * @verbinclude mqtt_client.txt 00008 */ 00009 00010 /* 00011 * Copyright (c) 2016 Erik Andersson <erian747@gmail.com> 00012 * All rights reserved. 00013 * 00014 * Redistribution and use in source and binary forms, with or without modification, 00015 * are permitted provided that the following conditions are met: 00016 * 00017 * 1. Redistributions of source code must retain the above copyright notice, 00018 * this list of conditions and the following disclaimer. 00019 * 2. Redistributions in binary form must reproduce the above copyright notice, 00020 * this list of conditions and the following disclaimer in the documentation 00021 * and/or other materials provided with the distribution. 00022 * 3. The name of the author may not be used to endorse or promote products 00023 * derived from this software without specific prior written permission. 00024 * 00025 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 00026 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 00027 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT 00028 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00029 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 00030 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00031 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00032 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING 00033 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY 00034 * OF SUCH DAMAGE. 00035 * 00036 * This file is part of the lwIP TCP/IP stack 00037 * 00038 * Author: Erik Andersson <erian747@gmail.com> 00039 * 00040 * 00041 * @todo: 00042 * - Handle large outgoing payloads for PUBLISH messages 00043 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics) 00044 * - Add support for legacy MQTT protocol version 00045 * 00046 * Please coordinate changes and requests with Erik Andersson 00047 * Erik Andersson <erian747@gmail.com> 00048 * 00049 */ 00050 #include "lwip/apps/mqtt.h" 00051 #include "lwip/timeouts.h" 00052 #include "lwip/ip_addr.h" 00053 #include "lwip/mem.h" 00054 #include "lwip/err.h" 00055 #include "lwip/pbuf.h" 00056 #include "lwip/tcp.h" 00057 #include <string.h> 00058 00059 #if LWIP_TCP && LWIP_CALLBACK_API 00060 00061 /** 00062 * MQTT_DEBUG: Default is off. 00063 */ 00064 #if !defined MQTT_DEBUG || defined __DOXYGEN__ 00065 #define MQTT_DEBUG LWIP_DBG_OFF 00066 #endif 00067 00068 #define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE) 00069 #define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE) 00070 #define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING) 00071 #define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE) 00072 #define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS) 00073 00074 static void mqtt_cyclic_timer(void *arg); 00075 00076 /** 00077 * MQTT client connection states 00078 */ 00079 enum { 00080 TCP_DISCONNECTED, 00081 TCP_CONNECTING, 00082 MQTT_CONNECTING, 00083 MQTT_CONNECTED 00084 }; 00085 00086 /** 00087 * MQTT control message types 00088 */ 00089 enum mqtt_message_type { 00090 MQTT_MSG_TYPE_CONNECT = 1, 00091 MQTT_MSG_TYPE_CONNACK = 2, 00092 MQTT_MSG_TYPE_PUBLISH = 3, 00093 MQTT_MSG_TYPE_PUBACK = 4, 00094 MQTT_MSG_TYPE_PUBREC = 5, 00095 MQTT_MSG_TYPE_PUBREL = 6, 00096 MQTT_MSG_TYPE_PUBCOMP = 7, 00097 MQTT_MSG_TYPE_SUBSCRIBE = 8, 00098 MQTT_MSG_TYPE_SUBACK = 9, 00099 MQTT_MSG_TYPE_UNSUBSCRIBE = 10, 00100 MQTT_MSG_TYPE_UNSUBACK = 11, 00101 MQTT_MSG_TYPE_PINGREQ = 12, 00102 MQTT_MSG_TYPE_PINGRESP = 13, 00103 MQTT_MSG_TYPE_DISCONNECT = 14 00104 }; 00105 00106 /** Helpers to extract control packet type and qos from first byte in fixed header */ 00107 #define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4) 00108 #define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1) 00109 00110 /** 00111 * MQTT connect flags, only used in CONNECT message 00112 */ 00113 enum mqtt_connect_flag { 00114 MQTT_CONNECT_FLAG_USERNAME = 1 << 7, 00115 MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, 00116 MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, 00117 MQTT_CONNECT_FLAG_WILL = 1 << 2, 00118 MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 00119 }; 00120 00121 00122 #if defined(LWIP_DEBUG) 00123 static const char * const mqtt_message_type_str[15] = 00124 { 00125 "UNDEFINED", 00126 "CONNECT", 00127 "CONNACK", 00128 "PUBLISH", 00129 "PUBACK", 00130 "PUBREC", 00131 "PUBREL", 00132 "PUBCOMP", 00133 "SUBSCRIBE", 00134 "SUBACK", 00135 "UNSUBSCRIBE", 00136 "UNSUBACK", 00137 "PINGREQ", 00138 "PINGRESP", 00139 "DISCONNECT" 00140 }; 00141 00142 /** 00143 * Message type value to string 00144 * @param msg_type see enum mqtt_message_type 00145 * 00146 * @return Control message type text string 00147 */ 00148 static const char * 00149 mqtt_msg_type_to_str(u8_t msg_type) 00150 { 00151 if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) { 00152 msg_type = 0; 00153 } 00154 return mqtt_message_type_str[msg_type]; 00155 } 00156 00157 #endif 00158 00159 00160 /** 00161 * Generate MQTT packet identifier 00162 * @param client MQTT client 00163 * @return New packet identifier, range 1 to 65535 00164 */ 00165 static u16_t 00166 msg_generate_packet_id(mqtt_client_t *client) 00167 { 00168 client->pkt_id_seq++; 00169 if (client->pkt_id_seq == 0) { 00170 client->pkt_id_seq++; 00171 } 00172 return client->pkt_id_seq; 00173 } 00174 00175 /*--------------------------------------------------------------------------------------------------------------------- */ 00176 /* Output ring buffer */ 00177 00178 00179 #define MQTT_RINGBUF_IDX_MASK ((MQTT_OUTPUT_RINGBUF_SIZE) - 1) 00180 00181 /** Add single item to ring buffer */ 00182 #define mqtt_ringbuf_put(rb, item) ((rb)->buf)[(rb)->put++ & MQTT_RINGBUF_IDX_MASK] = (item) 00183 00184 /** Return number of bytes in ring buffer */ 00185 #define mqtt_ringbuf_len(rb) ((u16_t)((rb)->put - (rb)->get)) 00186 00187 /** Return number of bytes free in ring buffer */ 00188 #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb)) 00189 00190 /** Return number of bytes possible to read without wrapping around */ 00191 #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - ((rb)->get & MQTT_RINGBUF_IDX_MASK))) 00192 00193 /** Return pointer to ring buffer get position */ 00194 #define mqtt_ringbuf_get_ptr(rb) (&(rb)->buf[(rb)->get & MQTT_RINGBUF_IDX_MASK]) 00195 00196 #define mqtt_ringbuf_advance_get_idx(rb, len) ((rb)->get += (len)) 00197 00198 00199 /** 00200 * Try send as many bytes as possible from output ring buffer 00201 * @param rb Output ring buffer 00202 * @param tpcb TCP connection handle 00203 */ 00204 static void 00205 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb) 00206 { 00207 err_t err; 00208 u8_t wrap = 0; 00209 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb); 00210 u16_t send_len = tcp_sndbuf(tpcb); 00211 LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL); 00212 00213 if (send_len == 0 || ringbuf_lin_len == 0) { 00214 return; 00215 } 00216 00217 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n", 00218 send_len, ringbuf_lin_len, ((rb)->get & MQTT_RINGBUF_IDX_MASK), ((rb)->put & MQTT_RINGBUF_IDX_MASK))); 00219 00220 if (send_len > ringbuf_lin_len) { 00221 /* Space in TCP output buffer is larger than available in ring buffer linear portion */ 00222 send_len = ringbuf_lin_len; 00223 /* Wrap around if more data in ring buffer after linear portion */ 00224 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len); 00225 } 00226 err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0)); 00227 if ((err == ERR_OK) && wrap) { 00228 mqtt_ringbuf_advance_get_idx(rb, send_len); 00229 /* Use the lesser one of ring buffer linear length and TCP send buffer size */ 00230 send_len = LWIP_MIN(tcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb)); 00231 err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY); 00232 } 00233 00234 if (err == ERR_OK) { 00235 mqtt_ringbuf_advance_get_idx(rb, send_len); 00236 /* Flush */ 00237 tcp_output(tpcb); 00238 } else { 00239 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err))); 00240 } 00241 } 00242 00243 00244 00245 /*--------------------------------------------------------------------------------------------------------------------- */ 00246 /* Request queue */ 00247 00248 /** 00249 * Create request item 00250 * @param r_objs Pointer to request objects 00251 * @param pkt_id Packet identifier of request 00252 * @param cb Packet callback to call when requests lifetime ends 00253 * @param arg Parameter following callback 00254 * @return Request or NULL if failed to create 00255 */ 00256 static struct mqtt_request_t * 00257 mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg) 00258 { 00259 struct mqtt_request_t *r = NULL; 00260 u8_t n; 00261 LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL); 00262 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) { 00263 /* Item point to itself if not in use */ 00264 if (r_objs[n].next == &r_objs[n]) { 00265 r = &r_objs[n]; 00266 r->next = NULL; 00267 r->cb = cb; 00268 r->arg = arg; 00269 r->pkt_id = pkt_id; 00270 break; 00271 } 00272 } 00273 return r; 00274 } 00275 00276 00277 /** 00278 * Append request to pending request queue 00279 * @param tail Pointer to request queue tail pointer 00280 * @param r Request to append 00281 */ 00282 static void 00283 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r) 00284 { 00285 struct mqtt_request_t *head = NULL; 00286 s16_t time_before = 0; 00287 struct mqtt_request_t *iter; 00288 00289 LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL); 00290 00291 /* Iterate trough queue to find head, and count total timeout time */ 00292 for (iter = *tail; iter != NULL; iter = iter->next) { 00293 time_before += iter->timeout_diff; 00294 head = iter; 00295 } 00296 00297 LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT); 00298 r->timeout_diff = MQTT_REQ_TIMEOUT - time_before; 00299 if (head == NULL) { 00300 *tail = r; 00301 } else { 00302 head->next = r; 00303 } 00304 } 00305 00306 00307 /** 00308 * Delete request item 00309 * @param r Request item to delete 00310 */ 00311 static void 00312 mqtt_delete_request(struct mqtt_request_t *r) 00313 { 00314 if (r != NULL) { 00315 r->next = r; 00316 } 00317 } 00318 00319 /** 00320 * Remove a request item with a specific packet identifier from request queue 00321 * @param tail Pointer to request queue tail pointer 00322 * @param pkt_id Packet identifier of request to take 00323 * @return Request item if found, NULL if not 00324 */ 00325 static struct mqtt_request_t * 00326 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id) 00327 { 00328 struct mqtt_request_t *iter = NULL, *prev = NULL; 00329 LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL); 00330 /* Search all request for pkt_id */ 00331 for (iter = *tail; iter != NULL; iter = iter->next) { 00332 if (iter->pkt_id == pkt_id) { 00333 break; 00334 } 00335 prev = iter; 00336 } 00337 00338 /* If request was found */ 00339 if (iter != NULL) { 00340 /* unchain */ 00341 if (prev == NULL) { 00342 *tail= iter->next; 00343 } else { 00344 prev->next = iter->next; 00345 } 00346 /* If exists, add remaining timeout time for the request to next */ 00347 if (iter->next != NULL) { 00348 iter->next->timeout_diff += iter->timeout_diff; 00349 } 00350 iter->next = NULL; 00351 } 00352 return iter; 00353 } 00354 00355 /** 00356 * Handle requests timeout 00357 * @param tail Pointer to request queue tail pointer 00358 * @param t Time since last call in seconds 00359 */ 00360 static void 00361 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t) 00362 { 00363 struct mqtt_request_t *r = *tail; 00364 LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL); 00365 while (t > 0 && r != NULL) { 00366 if (t >= r->timeout_diff) { 00367 t -= (u8_t)r->timeout_diff; 00368 /* Unchain */ 00369 *tail = r->next; 00370 /* Notify upper layer about timeout */ 00371 if (r->cb != NULL) { 00372 r->cb(r->arg, ERR_TIMEOUT); 00373 } 00374 mqtt_delete_request(r); 00375 /* Tail might be be modified in callback, so re-read it in every iteration */ 00376 r = *(struct mqtt_request_t * const volatile *)tail; 00377 } else { 00378 r->timeout_diff -= t; 00379 t = 0; 00380 } 00381 } 00382 } 00383 00384 /** 00385 * Free all request items 00386 * @param tail Pointer to request queue tail pointer 00387 */ 00388 static void 00389 mqtt_clear_requests(struct mqtt_request_t **tail) 00390 { 00391 struct mqtt_request_t *iter, *next; 00392 LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL); 00393 for (iter = *tail; iter != NULL; iter = next) { 00394 next = iter->next; 00395 mqtt_delete_request(iter); 00396 } 00397 *tail = NULL; 00398 } 00399 /** 00400 * Initialize all request items 00401 * @param r_objs Pointer to request objects 00402 */ 00403 static void 00404 mqtt_init_requests(struct mqtt_request_t *r_objs) 00405 { 00406 u8_t n; 00407 LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL); 00408 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) { 00409 /* Item pointing to itself indicates unused */ 00410 r_objs[n].next = &r_objs[n]; 00411 } 00412 } 00413 00414 /*--------------------------------------------------------------------------------------------------------------------- */ 00415 /* Output message build helpers */ 00416 00417 00418 static void 00419 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value) 00420 { 00421 mqtt_ringbuf_put(rb, value); 00422 } 00423 00424 static 00425 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value) 00426 { 00427 mqtt_ringbuf_put(rb, value >> 8); 00428 mqtt_ringbuf_put(rb, value & 0xff); 00429 } 00430 00431 static void 00432 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length) 00433 { 00434 u16_t n; 00435 for (n = 0; n < length; n++) { 00436 mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]); 00437 } 00438 } 00439 00440 static void 00441 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length) 00442 { 00443 u16_t n; 00444 mqtt_ringbuf_put(rb, length >> 8); 00445 mqtt_ringbuf_put(rb, length & 0xff); 00446 for (n = 0; n < length; n++) { 00447 mqtt_ringbuf_put(rb, str[n]); 00448 } 00449 } 00450 00451 /** 00452 * Append fixed header 00453 * @param rb Output ring buffer 00454 * @param msg_type see enum mqtt_message_type 00455 * @param dup MQTT DUP flag 00456 * @param qos MQTT QoS field 00457 * @param retain MQTT retain flag 00458 * @param r_length Remaining length after fixed header 00459 */ 00460 00461 static void 00462 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup, 00463 u8_t qos, u8_t retain, u16_t r_length) 00464 { 00465 /* Start with control byte */ 00466 mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1))); 00467 /* Encode remaining length field */ 00468 do { 00469 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0)); 00470 r_length >>= 7; 00471 } while (r_length > 0); 00472 } 00473 00474 00475 /** 00476 * Check output buffer space 00477 * @param rb Output ring buffer 00478 * @param r_length Remaining length after fixed header 00479 * @return 1 if message will fit, 0 if not enough buffer space 00480 */ 00481 static u8_t 00482 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length) 00483 { 00484 /* Start with length of type byte + remaining length */ 00485 u16_t total_len = 1 + r_length; 00486 00487 LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL); 00488 00489 /* Calculate number of required bytes to contain the remaining bytes field and add to total*/ 00490 do { 00491 total_len++; 00492 r_length >>= 7; 00493 } while (r_length > 0); 00494 00495 return (total_len <= mqtt_ringbuf_free(rb)); 00496 } 00497 00498 00499 /** 00500 * Close connection to server 00501 * @param client MQTT client 00502 * @param reason Reason for disconnection 00503 */ 00504 static void 00505 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason) 00506 { 00507 LWIP_ASSERT("mqtt_close: client != NULL", client != NULL); 00508 00509 /* Bring down TCP connection if not already done */ 00510 if (client->conn != NULL) { 00511 err_t res; 00512 tcp_recv(client->conn, NULL); 00513 tcp_err(client->conn, NULL); 00514 tcp_sent(client->conn, NULL); 00515 res = tcp_close(client->conn); 00516 if (res != ERR_OK) { 00517 tcp_abort(client->conn); 00518 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_close: Close err=%s\n", lwip_strerr(res))); 00519 } 00520 client->conn = NULL; 00521 } 00522 00523 /* Remove all pending requests */ 00524 mqtt_clear_requests(&client->pend_req_queue); 00525 /* Stop cyclic timer */ 00526 sys_untimeout(mqtt_cyclic_timer, client); 00527 00528 /* Notify upper layer of disconnection if changed state */ 00529 if (client->conn_state != TCP_DISCONNECTED) { 00530 00531 client->conn_state = TCP_DISCONNECTED; 00532 if (client->connect_cb != NULL) { 00533 client->connect_cb(client, client->connect_arg, reason); 00534 } 00535 } 00536 } 00537 00538 00539 /** 00540 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states 00541 * @param arg MQTT client 00542 */ 00543 static void 00544 mqtt_cyclic_timer(void *arg) 00545 { 00546 u8_t restart_timer = 1; 00547 mqtt_client_t *client = (mqtt_client_t *)arg; 00548 LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL); 00549 00550 if (client->conn_state == MQTT_CONNECTING) { 00551 client->cyclic_tick++; 00552 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) { 00553 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: CONNECT attempt to server timed out\n")); 00554 /* Disconnect TCP */ 00555 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 00556 restart_timer = 0; 00557 } 00558 } else if (client->conn_state == MQTT_CONNECTED) { 00559 /* Handle timeout for pending requests */ 00560 mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL); 00561 00562 /* keep_alive > 0 means keep alive functionality shall be used */ 00563 if (client->keep_alive > 0) { 00564 00565 client->server_watchdog++; 00566 /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */ 00567 if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive/2)) { 00568 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Server incoming keep-alive timeout\n")); 00569 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 00570 restart_timer = 0; 00571 } 00572 00573 /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */ 00574 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) { 00575 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: Sending keep-alive message to server\n")); 00576 if (mqtt_output_check_space(&client->output, 0) != 0) { 00577 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0); 00578 client->cyclic_tick = 0; 00579 } 00580 } else { 00581 client->cyclic_tick++; 00582 } 00583 } 00584 } else { 00585 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state)); 00586 restart_timer = 0; 00587 } 00588 if (restart_timer) { 00589 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, arg); 00590 } 00591 } 00592 00593 00594 /** 00595 * Send PUBACK, PUBREC or PUBREL response message 00596 * @param client MQTT client 00597 * @param msg PUBACK, PUBREC or PUBREL 00598 * @param pkt_id Packet identifier 00599 * @param qos QoS value 00600 * @return ERR_OK if successful, ERR_MEM if out of memory 00601 */ 00602 static err_t 00603 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos) 00604 { 00605 err_t err = ERR_OK; 00606 if (mqtt_output_check_space(&client->output, 2)) { 00607 mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2); 00608 mqtt_output_append_u16(&client->output, pkt_id); 00609 mqtt_output_send(&client->output, client->conn); 00610 } else { 00611 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n", 00612 mqtt_msg_type_to_str(msg), pkt_id)); 00613 err = ERR_MEM; 00614 } 00615 return err; 00616 } 00617 00618 /** 00619 * Subscribe response from server 00620 * @param r Matching request 00621 * @param result Result code from server 00622 */ 00623 static void 00624 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result) 00625 { 00626 if (r->cb != NULL) { 00627 r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT); 00628 } 00629 } 00630 00631 00632 /** 00633 * Complete MQTT message received or buffer full 00634 * @param client MQTT client 00635 * @param fixed_hdr_idx header index 00636 * @param length length received part 00637 * @param remaining_length Remaining length of complete message 00638 */ 00639 static mqtt_connection_status_t 00640 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length) 00641 { 00642 mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED; 00643 00644 u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx; 00645 00646 /* Control packet type */ 00647 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]); 00648 u16_t pkt_id = 0; 00649 00650 if (pkt_type == MQTT_MSG_TYPE_CONNACK) { 00651 if (client->conn_state == MQTT_CONNECTING) { 00652 /* Get result code from CONNACK */ 00653 res = (mqtt_connection_status_t)var_hdr_payload[1]; 00654 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res)); 00655 if (res == MQTT_CONNECT_ACCEPTED) { 00656 /* Reset cyclic_tick when changing to connected state */ 00657 client->cyclic_tick = 0; 00658 client->conn_state = MQTT_CONNECTED; 00659 /* Notify upper layer */ 00660 if (client->connect_cb != 0) { 00661 client->connect_cb(client, client->connect_arg, res); 00662 } 00663 } 00664 } else { 00665 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Received CONNACK in connected state\n")); 00666 } 00667 } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) { 00668 LWIP_DEBUGF(MQTT_DEBUG_TRACE,( "mqtt_message_received: Received PINGRESP from server\n")); 00669 00670 } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) { 00671 u16_t payload_offset = 0; 00672 u16_t payload_length = length; 00673 u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]); 00674 00675 if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) { 00676 /* Should have topic and pkt id*/ 00677 uint8_t *topic; 00678 uint16_t after_topic; 00679 u8_t bkp; 00680 u16_t topic_len = var_hdr_payload[0]; 00681 topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]); 00682 00683 topic = var_hdr_payload + 2; 00684 after_topic = 2 + topic_len; 00685 /* Check length, add one byte even for QoS 0 so that zero termination will fit */ 00686 if ((after_topic + (qos? 2 : 1)) > length) { 00687 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n")); 00688 goto out_disconnect; 00689 } 00690 00691 /* id for QoS 1 and 2 */ 00692 if (qos > 0) { 00693 client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1]; 00694 after_topic += 2; 00695 } else { 00696 client->inpub_pkt_id = 0; 00697 } 00698 /* Take backup of byte after topic */ 00699 bkp = topic[topic_len]; 00700 /* Zero terminate string */ 00701 topic[topic_len] = 0; 00702 /* Payload data remaining in receive buffer */ 00703 payload_length = length - after_topic; 00704 payload_offset = after_topic; 00705 00706 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n", 00707 qos, topic, remaining_length + payload_length)); 00708 if (client->pub_cb != NULL) { 00709 client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length); 00710 } 00711 /* Restore byte after topic */ 00712 topic[topic_len] = bkp; 00713 } 00714 if (payload_length > 0 || remaining_length == 0) { 00715 client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0); 00716 /* Reply if QoS > 0 */ 00717 if (remaining_length == 0 && qos > 0) { 00718 /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */ 00719 u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC; 00720 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n", 00721 mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id)); 00722 pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0); 00723 } 00724 } 00725 } else { 00726 /* Get packet identifier */ 00727 pkt_id = (u16_t)var_hdr_payload[0] << 8; 00728 pkt_id |= (u16_t)var_hdr_payload[1]; 00729 if (pkt_id == 0) { 00730 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Got message with illegal packet identifier: 0\n")); 00731 goto out_disconnect; 00732 } 00733 if (pkt_type == MQTT_MSG_TYPE_PUBREC) { 00734 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n",pkt_id)); 00735 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1); 00736 00737 } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) { 00738 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n",pkt_id)); 00739 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0); 00740 00741 } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK || 00742 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) { 00743 struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id); 00744 if (r != NULL) { 00745 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 00746 if (pkt_type == MQTT_MSG_TYPE_SUBACK) { 00747 if (length < 3) { 00748 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: To small SUBACK packet\n")); 00749 goto out_disconnect; 00750 } else { 00751 mqtt_incomming_suback(r, var_hdr_payload[2]); 00752 } 00753 } else if (r->cb != NULL) { 00754 r->cb(r->arg, ERR_OK); 00755 } 00756 mqtt_delete_request(r); 00757 } else { 00758 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 00759 } 00760 } else { 00761 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received unknown message type: %d\n", pkt_type)); 00762 goto out_disconnect; 00763 } 00764 } 00765 return res; 00766 out_disconnect: 00767 return MQTT_CONNECT_DISCONNECTED; 00768 } 00769 00770 00771 /** 00772 * MQTT incoming message parser 00773 * @param client MQTT client 00774 * @param p PBUF chain of received data 00775 * @return Connection status 00776 */ 00777 static mqtt_connection_status_t 00778 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) 00779 { 00780 u16_t in_offset = 0; 00781 u32_t msg_rem_len = 0; 00782 u8_t fixed_hdr_idx = 0; 00783 u8_t b = 0; 00784 00785 while (p->tot_len > in_offset) { 00786 if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) { 00787 00788 if (fixed_hdr_idx < client->msg_idx) { 00789 b = client->rx_buffer[fixed_hdr_idx]; 00790 } else { 00791 b = pbuf_get_at(p, in_offset++); 00792 client->rx_buffer[client->msg_idx++] = b; 00793 } 00794 fixed_hdr_idx++; 00795 00796 if (fixed_hdr_idx >= 2) { 00797 msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7); 00798 if ((b & 0x80) == 0) { 00799 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len)); 00800 if (msg_rem_len == 0) { 00801 /* Complete message with no extra headers of payload received */ 00802 mqtt_message_received(client, fixed_hdr_idx, 0, 0); 00803 client->msg_idx = 0; 00804 fixed_hdr_idx = 0; 00805 } else { 00806 /* Bytes remaining in message */ 00807 msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx; 00808 } 00809 } 00810 } 00811 } else { 00812 u16_t cpy_len, cpy_start, buffer_space; 00813 00814 cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx; 00815 00816 /* Allow to copy the lesser one of available length in input data or bytes remaining in message */ 00817 cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len); 00818 00819 /* Limit to available space in buffer */ 00820 buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start; 00821 if (cpy_len > buffer_space) { 00822 cpy_len = buffer_space; 00823 } 00824 pbuf_copy_partial(p, client->rx_buffer+cpy_start, cpy_len, in_offset); 00825 00826 /* Advance get and put indexes */ 00827 client->msg_idx += cpy_len; 00828 in_offset += cpy_len; 00829 msg_rem_len -= cpy_len; 00830 00831 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len)); 00832 if (msg_rem_len == 0 || cpy_len == buffer_space) { 00833 /* Whole message received or buffer is full */ 00834 mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len); 00835 if (res != MQTT_CONNECT_ACCEPTED) { 00836 return res; 00837 } 00838 if (msg_rem_len == 0) { 00839 /* Reset parser state */ 00840 client->msg_idx = 0; 00841 /* msg_tot_len = 0; */ 00842 fixed_hdr_idx = 0; 00843 } 00844 } 00845 } 00846 } 00847 return MQTT_CONNECT_ACCEPTED; 00848 } 00849 00850 00851 /** 00852 * TCP received callback function. @see tcp_recv_fn 00853 * @param arg MQTT client 00854 * @param p PBUF chain of received data 00855 * @param err Passed as return value if not ERR_OK 00856 * @return ERR_OK or err passed into callback 00857 */ 00858 static err_t 00859 mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err) 00860 { 00861 mqtt_client_t *client = (mqtt_client_t *)arg; 00862 LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL); 00863 LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb); 00864 00865 if (p == NULL) { 00866 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n")); 00867 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 00868 } else { 00869 mqtt_connection_status_t res; 00870 if (err != ERR_OK) { 00871 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_recv_cb: Recv err=%d\n", err)); 00872 pbuf_free(p); 00873 return err; 00874 } 00875 00876 /* Tell remote that data has been received */ 00877 tcp_recved(pcb, p->tot_len); 00878 res = mqtt_parse_incoming(client, p); 00879 pbuf_free(p); 00880 00881 if (res != MQTT_CONNECT_ACCEPTED) { 00882 mqtt_close(client, res); 00883 } 00884 /* If keep alive functionality is used */ 00885 if (client->keep_alive != 0) { 00886 /* Reset server alive watchdog */ 00887 client->server_watchdog = 0; 00888 } 00889 00890 } 00891 return ERR_OK; 00892 } 00893 00894 00895 /** 00896 * TCP data sent callback function. @see tcp_sent_fn 00897 * @param arg MQTT client 00898 * @param tpcb TCP connection handle 00899 * @param len Number of bytes sent 00900 * @return ERR_OK 00901 */ 00902 static err_t 00903 mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len) 00904 { 00905 mqtt_client_t *client = (mqtt_client_t *)arg; 00906 00907 LWIP_UNUSED_ARG(tpcb); 00908 LWIP_UNUSED_ARG(len); 00909 00910 if (client->conn_state == MQTT_CONNECTED) { 00911 struct mqtt_request_t *r; 00912 00913 /* Reset keep-alive send timer and server watchdog */ 00914 client->cyclic_tick = 0; 00915 client->server_watchdog = 0; 00916 /* QoS 0 publish has no response from server, so call its callbacks here */ 00917 while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) { 00918 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n")); 00919 if (r->cb != NULL) { 00920 r->cb(r->arg, ERR_OK); 00921 } 00922 mqtt_delete_request(r); 00923 } 00924 /* Try send any remaining buffers from output queue */ 00925 mqtt_output_send(&client->output, client->conn); 00926 } 00927 return ERR_OK; 00928 } 00929 00930 /** 00931 * TCP error callback function. @see tcp_err_fn 00932 * @param arg MQTT client 00933 * @param err Error encountered 00934 */ 00935 static void 00936 mqtt_tcp_err_cb(void *arg, err_t err) 00937 { 00938 mqtt_client_t *client = (mqtt_client_t *)arg; 00939 LWIP_UNUSED_ARG(err); /* only used for debug output */ 00940 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg)); 00941 LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL); 00942 /* Set conn to null before calling close as pcb is already deallocated*/ 00943 client->conn = 0; 00944 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 00945 } 00946 00947 /** 00948 * TCP poll callback function. @see tcp_poll_fn 00949 * @param arg MQTT client 00950 * @param tpcb TCP connection handle 00951 * @return err ERR_OK 00952 */ 00953 static err_t 00954 mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb) 00955 { 00956 mqtt_client_t *client = (mqtt_client_t *)arg; 00957 if (client->conn_state == MQTT_CONNECTED) { 00958 /* Try send any remaining buffers from output queue */ 00959 mqtt_output_send(&client->output, tpcb); 00960 } 00961 return ERR_OK; 00962 } 00963 00964 /** 00965 * TCP connect callback function. @see tcp_connected_fn 00966 * @param arg MQTT client 00967 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error 00968 * @return ERR_OK 00969 */ 00970 static err_t 00971 mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err) 00972 { 00973 mqtt_client_t* client = (mqtt_client_t *)arg; 00974 00975 if (err != ERR_OK) { 00976 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err)); 00977 return err; 00978 } 00979 00980 /* Initiate receiver state */ 00981 client->msg_idx = 0; 00982 00983 /* Setup TCP callbacks */ 00984 tcp_recv(tpcb, mqtt_tcp_recv_cb); 00985 tcp_sent(tpcb, mqtt_tcp_sent_cb); 00986 tcp_poll(tpcb, mqtt_tcp_poll_cb, 2); 00987 00988 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_connect_cb: TCP connection established to server\n")); 00989 /* Enter MQTT connect state */ 00990 client->conn_state = MQTT_CONNECTING; 00991 00992 /* Start cyclic timer */ 00993 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, client); 00994 client->cyclic_tick = 0; 00995 00996 /* Start transmission from output queue, connect message is the first one out*/ 00997 mqtt_output_send(&client->output, client->conn); 00998 00999 return ERR_OK; 01000 } 01001 01002 01003 01004 /*---------------------------------------------------------------------------------------------------- */ 01005 /* Public API */ 01006 01007 01008 /** 01009 * @ingroup mqtt 01010 * MQTT publish function. 01011 * @param client MQTT client 01012 * @param topic Publish topic string 01013 * @param payload Data to publish (NULL is allowed) 01014 * @param payload_length: Length of payload (0 is allowed) 01015 * @param qos Quality of service, 0 1 or 2 01016 * @param retain MQTT retain flag 01017 * @param cb Callback to call when publish is complete or has timed out 01018 * @param arg User supplied argument to publish callback 01019 * @return ERR_OK if successful 01020 * ERR_CONN if client is disconnected 01021 * ERR_MEM if short on memory 01022 */ 01023 err_t 01024 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain, 01025 mqtt_request_cb_t cb, void *arg) 01026 { 01027 struct mqtt_request_t *r; 01028 u16_t pkt_id; 01029 size_t topic_strlen; 01030 size_t total_len; 01031 u16_t topic_len; 01032 u16_t remaining_length; 01033 01034 LWIP_ASSERT("mqtt_publish: client != NULL", client); 01035 LWIP_ASSERT("mqtt_publish: topic != NULL", topic); 01036 LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN); 01037 01038 topic_strlen = strlen(topic); 01039 LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 01040 topic_len = (u16_t)topic_strlen; 01041 total_len = 2 + topic_len + payload_length; 01042 LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 01043 remaining_length = (u16_t)total_len; 01044 01045 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic)); 01046 01047 if (qos > 0) { 01048 remaining_length += 2; 01049 /* Generate pkt_id id for QoS1 and 2 */ 01050 pkt_id = msg_generate_packet_id(client); 01051 } else { 01052 /* Use reserved value pkt_id 0 for QoS 0 in request handle */ 01053 pkt_id = 0; 01054 } 01055 01056 r = mqtt_create_request(client->req_list, pkt_id, cb, arg); 01057 if (r == NULL) { 01058 return ERR_MEM; 01059 } 01060 01061 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 01062 mqtt_delete_request(r); 01063 return ERR_MEM; 01064 } 01065 /* Append fixed header */ 01066 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length); 01067 01068 /* Append Topic */ 01069 mqtt_output_append_string(&client->output, topic, topic_len); 01070 01071 /* Append packet if for QoS 1 and 2*/ 01072 if (qos > 0) { 01073 mqtt_output_append_u16(&client->output, pkt_id); 01074 } 01075 01076 /* Append optional publish payload */ 01077 if ((payload != NULL) && (payload_length > 0)) { 01078 mqtt_output_append_buf(&client->output, payload, payload_length); 01079 } 01080 01081 mqtt_append_request(&client->pend_req_queue, r); 01082 mqtt_output_send(&client->output, client->conn); 01083 return ERR_OK; 01084 } 01085 01086 01087 /** 01088 * @ingroup mqtt 01089 * MQTT subscribe/unsubscribe function. 01090 * @param client MQTT client 01091 * @param topic topic to subscribe to 01092 * @param qos Quality of service, 0 1 or 2 (only used for subscribe) 01093 * @param cb Callback to call when subscribe/unsubscribe reponse is received 01094 * @param arg User supplied argument to publish callback 01095 * @param sub 1 for subscribe, 0 for unsubscribe 01096 * @return ERR_OK if successful, @see err_t enum for other results 01097 */ 01098 err_t 01099 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub) 01100 { 01101 size_t topic_strlen; 01102 size_t total_len; 01103 u16_t topic_len; 01104 u16_t remaining_length; 01105 u16_t pkt_id; 01106 struct mqtt_request_t *r; 01107 01108 LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client); 01109 LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic); 01110 01111 topic_strlen = strlen(topic); 01112 LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 01113 topic_len = (u16_t)topic_strlen; 01114 /* Topic string, pkt_id, qos for subscribe */ 01115 total_len = topic_len + 2 + 2 + (sub != 0); 01116 LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 01117 remaining_length = (u16_t)total_len; 01118 01119 LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3); 01120 if (client->conn_state == TCP_DISCONNECTED) { 01121 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n")); 01122 return ERR_CONN; 01123 } 01124 01125 pkt_id = msg_generate_packet_id(client); 01126 r = mqtt_create_request(client->req_list, pkt_id, cb, arg); 01127 if (r == NULL) { 01128 return ERR_MEM; 01129 } 01130 01131 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 01132 mqtt_delete_request(r); 01133 return ERR_MEM; 01134 } 01135 01136 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id)); 01137 01138 mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length); 01139 /* Packet id */ 01140 mqtt_output_append_u16(&client->output, pkt_id); 01141 /* Topic */ 01142 mqtt_output_append_string(&client->output, topic, topic_len); 01143 /* QoS */ 01144 if (sub != 0) { 01145 mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2)); 01146 } 01147 01148 mqtt_append_request(&client->pend_req_queue, r); 01149 mqtt_output_send(&client->output, client->conn); 01150 return ERR_OK; 01151 } 01152 01153 01154 /** 01155 * @ingroup mqtt 01156 * Set callback to handle incoming publish requests from server 01157 * @param client MQTT client 01158 * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload 01159 * @param data_cb Callback for each fragment of payload that arrives 01160 * @param arg User supplied argument to both callbacks 01161 */ 01162 void 01163 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb, 01164 mqtt_incoming_data_cb_t data_cb, void *arg) 01165 { 01166 LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL); 01167 client->data_cb = data_cb; 01168 client->pub_cb = pub_cb; 01169 client->inpub_arg = arg; 01170 } 01171 01172 /** 01173 * @ingroup mqtt 01174 * Create a new MQTT client instance 01175 * @return Pointer to instance on success, NULL otherwise 01176 */ 01177 mqtt_client_t * 01178 mqtt_client_new(void) 01179 { 01180 mqtt_client_t *client = (mqtt_client_t *)mem_malloc(sizeof(mqtt_client_t)); 01181 if (client != NULL) { 01182 memset(client, 0, sizeof(mqtt_client_t)); 01183 } 01184 return client; 01185 } 01186 01187 01188 /** 01189 * @ingroup mqtt 01190 * Connect to MQTT server 01191 * @param client MQTT client 01192 * @param ip_addr Server IP 01193 * @param port Server port 01194 * @param cb Connection state change callback 01195 * @param arg User supplied argument to connection callback 01196 * @param client_info Client identification and connection options 01197 * @return ERR_OK if successful, @see err_t enum for other results 01198 */ 01199 err_t 01200 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg, 01201 const struct mqtt_connect_client_info_t *client_info) 01202 { 01203 err_t err; 01204 size_t len; 01205 u16_t client_id_length; 01206 /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */ 01207 u16_t remaining_length = 2 + 4 + 1 + 1 + 2; 01208 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0; 01209 01210 LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL); 01211 LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL); 01212 LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL); 01213 LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL); 01214 01215 if (client->conn_state != TCP_DISCONNECTED) { 01216 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n")); 01217 return ERR_ISCONN; 01218 } 01219 01220 /* Wipe clean */ 01221 memset(client, 0, sizeof(mqtt_client_t)); 01222 client->connect_arg = arg; 01223 client->connect_cb = cb; 01224 client->keep_alive = client_info->keep_alive; 01225 mqtt_init_requests(client->req_list); 01226 01227 /* Build connect message */ 01228 if (client_info->will_topic != NULL && client_info->will_msg != NULL) { 01229 flags |= MQTT_CONNECT_FLAG_WILL; 01230 flags |= (client_info->will_qos & 3) << 3; 01231 if (client_info->will_retain) { 01232 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; 01233 } 01234 len = strlen(client_info->will_topic); 01235 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL); 01236 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL); 01237 will_topic_len = (u8_t)len; 01238 len = strlen(client_info->will_msg); 01239 LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL); 01240 will_msg_len = (u8_t)len; 01241 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len; 01242 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 01243 remaining_length = (u16_t)len; 01244 } 01245 01246 /* Don't complicate things, always connect using clean session */ 01247 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; 01248 01249 len = strlen(client_info->client_id); 01250 LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL); 01251 client_id_length = (u16_t)len; 01252 len = remaining_length + 2 + client_id_length; 01253 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 01254 remaining_length = (u16_t)len; 01255 01256 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 01257 return ERR_MEM; 01258 } 01259 01260 client->conn = tcp_new(); 01261 if (client->conn == NULL) { 01262 return ERR_MEM; 01263 } 01264 01265 /* Set arg pointer for callbacks */ 01266 tcp_arg(client->conn, client); 01267 /* Any local address, pick random local port number */ 01268 err = tcp_bind(client->conn, IP_ADDR_ANY, 0); 01269 if (err != ERR_OK) { 01270 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err)); 01271 goto tcp_fail; 01272 } 01273 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port)); 01274 01275 /* Connect to server */ 01276 err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb); 01277 if (err != ERR_OK) { 01278 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err)); 01279 goto tcp_fail; 01280 } 01281 /* Set error callback */ 01282 tcp_err(client->conn, mqtt_tcp_err_cb); 01283 client->conn_state = TCP_CONNECTING; 01284 01285 /* Append fixed header */ 01286 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length); 01287 /* Append Protocol string */ 01288 mqtt_output_append_string(&client->output, "MQTT", 4); 01289 /* Append Protocol level */ 01290 mqtt_output_append_u8(&client->output, 4); 01291 /* Append connect flags */ 01292 mqtt_output_append_u8(&client->output, flags); 01293 /* Append keep-alive */ 01294 mqtt_output_append_u16(&client->output, client_info->keep_alive); 01295 /* Append client id */ 01296 mqtt_output_append_string(&client->output, client_info->client_id, client_id_length); 01297 /* Append will message if used */ 01298 if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) { 01299 mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len); 01300 mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len); 01301 } 01302 return ERR_OK; 01303 01304 tcp_fail: 01305 tcp_abort(client->conn); 01306 client->conn = NULL; 01307 return err; 01308 } 01309 01310 01311 /** 01312 * @ingroup mqtt 01313 * Disconnect from MQTT server 01314 * @param client MQTT client 01315 */ 01316 void 01317 mqtt_disconnect(mqtt_client_t *client) 01318 { 01319 LWIP_ASSERT("mqtt_disconnect: client != NULL", client); 01320 /* If connection in not already closed */ 01321 if (client->conn_state != TCP_DISCONNECTED) { 01322 /* Set conn_state before calling mqtt_close to prevent callback from being called */ 01323 client->conn_state = TCP_DISCONNECTED; 01324 mqtt_close(client, (mqtt_connection_status_t)0); 01325 } 01326 } 01327 01328 /** 01329 * @ingroup mqtt 01330 * Check connection with server 01331 * @param client MQTT client 01332 * @return 1 if connected to server, 0 otherwise 01333 */ 01334 u8_t 01335 mqtt_client_is_connected(mqtt_client_t *client) 01336 { 01337 LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client); 01338 return client->conn_state == MQTT_CONNECTED; 01339 } 01340 01341 #endif /* LWIP_TCP && LWIP_CALLBACK_API */
Generated on Tue Aug 9 2022 00:37:11 by
 1.7.2
 1.7.2