Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: mbed-os-example-blinky-gr-lychee GR-Boads_Camera_sample GR-Boards_Audio_Recoder GR-Boads_Camera_DisplayApp ... more
lwip_mqtt.c
00001 /** 00002 * @file 00003 * MQTT client 00004 * 00005 * @defgroup mqtt MQTT client 00006 * @ingroup apps 00007 * @verbinclude mqtt_client.txt 00008 */ 00009 00010 /* 00011 * Copyright (c) 2016 Erik Andersson <erian747@gmail.com> 00012 * All rights reserved. 00013 * 00014 * Redistribution and use in source and binary forms, with or without modification, 00015 * are permitted provided that the following conditions are met: 00016 * 00017 * 1. Redistributions of source code must retain the above copyright notice, 00018 * this list of conditions and the following disclaimer. 00019 * 2. Redistributions in binary form must reproduce the above copyright notice, 00020 * this list of conditions and the following disclaimer in the documentation 00021 * and/or other materials provided with the distribution. 00022 * 3. The name of the author may not be used to endorse or promote products 00023 * derived from this software without specific prior written permission. 00024 * 00025 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 00026 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 00027 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT 00028 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 00029 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 00030 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00031 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00032 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING 00033 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY 00034 * OF SUCH DAMAGE. 
00035 * 00036 * This file is part of the lwIP TCP/IP stack 00037 * 00038 * Author: Erik Andersson <erian747@gmail.com> 00039 * 00040 * 00041 * @todo: 00042 * - Handle large outgoing payloads for PUBLISH messages 00043 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics) 00044 * - Add support for legacy MQTT protocol version 00045 * 00046 * Please coordinate changes and requests with Erik Andersson 00047 * Erik Andersson <erian747@gmail.com> 00048 * 00049 */ 00050 #include <string.h> 00051 #include "lwip/timeouts.h" 00052 #include "lwip/ip_addr.h" 00053 #include "lwip/mem.h" 00054 #include "lwip/err.h" 00055 #include "lwip/pbuf.h" 00056 #include "lwip/tcp.h" 00057 #include "lwip/apps/mqtt.h" 00058 00059 /** 00060 * MQTT_DEBUG: Default is off. 00061 */ 00062 #if !defined MQTT_DEBUG || defined __DOXYGEN__ 00063 #define MQTT_DEBUG LWIP_DBG_OFF 00064 #endif 00065 00066 #define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE) 00067 #define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE) 00068 #define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING) 00069 #define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE) 00070 #define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS) 00071 00072 static void mqtt_cyclic_timer(void *arg); 00073 00074 /** 00075 * MQTT client connection states 00076 */ 00077 enum { 00078 TCP_DISCONNECTED, 00079 TCP_CONNECTING, 00080 MQTT_CONNECTING, 00081 MQTT_CONNECTED 00082 }; 00083 00084 /** 00085 * MQTT control message types 00086 */ 00087 enum mqtt_message_type { 00088 MQTT_MSG_TYPE_CONNECT = 1, 00089 MQTT_MSG_TYPE_CONNACK = 2, 00090 MQTT_MSG_TYPE_PUBLISH = 3, 00091 MQTT_MSG_TYPE_PUBACK = 4, 00092 MQTT_MSG_TYPE_PUBREC = 5, 00093 MQTT_MSG_TYPE_PUBREL = 6, 00094 MQTT_MSG_TYPE_PUBCOMP = 7, 00095 MQTT_MSG_TYPE_SUBSCRIBE = 8, 00096 MQTT_MSG_TYPE_SUBACK = 9, 00097 MQTT_MSG_TYPE_UNSUBSCRIBE = 10, 00098 MQTT_MSG_TYPE_UNSUBACK = 11, 00099 MQTT_MSG_TYPE_PINGREQ = 
12, 00100 MQTT_MSG_TYPE_PINGRESP = 13, 00101 MQTT_MSG_TYPE_DISCONNECT = 14 00102 }; 00103 00104 /** Helpers to extract control packet type and qos from first byte in fixed header */ 00105 #define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4) 00106 #define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1) 00107 00108 /** 00109 * MQTT connect flags, only used in CONNECT message 00110 */ 00111 enum mqtt_connect_flag { 00112 MQTT_CONNECT_FLAG_USERNAME = 1 << 7, 00113 MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, 00114 MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, 00115 MQTT_CONNECT_FLAG_WILL = 1 << 2, 00116 MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 00117 }; 00118 00119 00120 #if defined(LWIP_DEBUG) 00121 static const char * const mqtt_message_type_str[15] = 00122 { 00123 "UNDEFINED", 00124 "CONNECT", 00125 "CONNACK", 00126 "PUBLISH", 00127 "PUBACK", 00128 "PUBREC", 00129 "PUBREL", 00130 "PUBCOMP", 00131 "SUBSCRIBE", 00132 "SUBACK", 00133 "UNSUBSCRIBE", 00134 "UNSUBACK", 00135 "PINGREQ", 00136 "PINGRESP", 00137 "DISCONNECT" 00138 }; 00139 00140 /** 00141 * Message type value to string 00142 * @param msg_type see enum mqtt_message_type 00143 * 00144 * @return Control message type text string 00145 */ 00146 static const char * 00147 mqtt_msg_type_to_str(u8_t msg_type) 00148 { 00149 if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) { 00150 msg_type = 0; 00151 } 00152 return mqtt_message_type_str[msg_type]; 00153 } 00154 00155 #endif 00156 00157 00158 /** 00159 * Generate MQTT packet identifier 00160 * @param client MQTT client 00161 * @return New packet identifier, range 1 to 65535 00162 */ 00163 static u16_t 00164 msg_generate_packet_id(mqtt_client_t *client) 00165 { 00166 client->pkt_id_seq++; 00167 if (client->pkt_id_seq == 0) { 00168 client->pkt_id_seq++; 00169 } 00170 return client->pkt_id_seq; 00171 } 00172 00173 /*--------------------------------------------------------------------------------------------------------------------- */ 
00174 /* Output ring buffer */ 00175 00176 00177 #define MQTT_RINGBUF_IDX_MASK ((MQTT_OUTPUT_RINGBUF_SIZE) - 1) 00178 00179 /** Add single item to ring buffer */ 00180 #define mqtt_ringbuf_put(rb, item) ((rb)->buf)[(rb)->put++ & MQTT_RINGBUF_IDX_MASK] = (item) 00181 00182 /** Return number of bytes in ring buffer */ 00183 #define mqtt_ringbuf_len(rb) ((u16_t)((rb)->put - (rb)->get)) 00184 00185 /** Return number of bytes free in ring buffer */ 00186 #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb)) 00187 00188 /** Return number of bytes possible to read without wrapping around */ 00189 #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - ((rb)->get & MQTT_RINGBUF_IDX_MASK))) 00190 00191 /** Return pointer to ring buffer get position */ 00192 #define mqtt_ringbuf_get_ptr(rb) (&(rb)->buf[(rb)->get & MQTT_RINGBUF_IDX_MASK]) 00193 00194 #define mqtt_ringbuf_advance_get_idx(rb, len) ((rb)->get += (len)) 00195 00196 00197 /** 00198 * Try send as many bytes as possible from output ring buffer 00199 * @param rb Output ring buffer 00200 * @param tpcb TCP connection handle 00201 */ 00202 static void 00203 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb) 00204 { 00205 err_t err; 00206 u8_t wrap = 0; 00207 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb); 00208 u16_t send_len = tcp_sndbuf(tpcb); 00209 LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL); 00210 00211 if (send_len == 0 || ringbuf_lin_len == 0) { 00212 return; 00213 } 00214 00215 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n", 00216 send_len, ringbuf_lin_len, ((rb)->get & MQTT_RINGBUF_IDX_MASK), ((rb)->put & MQTT_RINGBUF_IDX_MASK))); 00217 00218 if (send_len > ringbuf_lin_len) { 00219 /* Space in TCP output buffer is larger than available in ring buffer linear portion */ 00220 send_len = ringbuf_lin_len; 00221 /* Wrap around if 
more data in ring buffer after linear portion */ 00222 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len); 00223 } 00224 err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0)); 00225 if ((err == ERR_OK) && wrap) { 00226 mqtt_ringbuf_advance_get_idx(rb, send_len); 00227 /* Use the lesser one of ring buffer linear length and TCP send buffer size */ 00228 send_len = LWIP_MIN(tcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb)); 00229 err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY); 00230 } 00231 00232 if (err == ERR_OK) { 00233 mqtt_ringbuf_advance_get_idx(rb, send_len); 00234 /* Flush */ 00235 tcp_output(tpcb); 00236 } else { 00237 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err))); 00238 } 00239 } 00240 00241 00242 00243 /*--------------------------------------------------------------------------------------------------------------------- */ 00244 /* Request queue */ 00245 00246 /** 00247 * Create request item 00248 * @param r_objs Pointer to request objects 00249 * @param pkt_id Packet identifier of request 00250 * @param cb Packet callback to call when requests lifetime ends 00251 * @param arg Parameter following callback 00252 * @return Request or NULL if failed to create 00253 */ 00254 static struct mqtt_request_t * 00255 mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg) 00256 { 00257 struct mqtt_request_t *r = NULL; 00258 u8_t n; 00259 LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL); 00260 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT && r == NULL; n++) { 00261 /* Item point to itself if not in use */ 00262 if (r_objs[n].next == &r_objs[n]) { 00263 r = &r_objs[n]; 00264 r->next = NULL; 00265 r->cb = cb; 00266 r->arg = arg; 00267 r->pkt_id = pkt_id; 00268 } 00269 } 00270 return r; 00271 } 00272 00273 00274 /** 00275 * Append request to pending request queue 
00276 * @param tail Pointer to request queue tail pointer 00277 * @param r Request to append 00278 */ 00279 static void 00280 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r) 00281 { 00282 struct mqtt_request_t *head = NULL; 00283 s16_t time_before = 0; 00284 struct mqtt_request_t *iter = *tail; 00285 00286 LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL); 00287 00288 /* Iterate trough queue to find head, and count total timeout time */ 00289 for (iter = *tail; iter != NULL; iter = iter->next) { 00290 time_before += iter->timeout_diff; 00291 head = iter; 00292 } 00293 00294 LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT); 00295 r->timeout_diff = MQTT_REQ_TIMEOUT - time_before; 00296 if (head == NULL) { 00297 *tail = r; 00298 } else { 00299 head->next = r; 00300 } 00301 } 00302 00303 00304 /** 00305 * Delete request item 00306 * @param r Request item to delete 00307 */ 00308 static void 00309 mqtt_delete_request(struct mqtt_request_t *r) 00310 { 00311 if (r != NULL) { 00312 r->next = r; 00313 } 00314 } 00315 00316 /** 00317 * Remove a request item with a specific packet identifier from request queue 00318 * @param tail Pointer to request queue tail pointer 00319 * @param pkt_id Packet identifier of request to take 00320 * @return Request item if found, NULL if not 00321 */ 00322 static struct mqtt_request_t * 00323 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id) 00324 { 00325 struct mqtt_request_t *iter = NULL, *prev = NULL; 00326 LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL); 00327 /* Search all request for pkt_id */ 00328 for (iter = *tail; iter != NULL; iter = iter->next) { 00329 if (iter->pkt_id == pkt_id) { 00330 break; 00331 } 00332 prev = iter; 00333 } 00334 00335 /* If request was found */ 00336 if (iter != NULL) { 00337 /* unchain */ 00338 if (prev == NULL) { 00339 *tail= iter->next; 00340 } else { 00341 prev->next = iter->next; 00342 } 
00343 /* If exists, add remaining timeout time for the request to next */ 00344 if (iter->next != NULL) { 00345 iter->next->timeout_diff += iter->timeout_diff; 00346 } 00347 iter->next = NULL; 00348 } 00349 return iter; 00350 } 00351 00352 /** 00353 * Handle requests timeout 00354 * @param tail Pointer to request queue tail pointer 00355 * @param t Time since last call in seconds 00356 */ 00357 static void 00358 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t) 00359 { 00360 struct mqtt_request_t *r = *tail; 00361 LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL); 00362 while (t > 0 && r != NULL) { 00363 if (t >= r->timeout_diff) { 00364 t -= (u8_t)r->timeout_diff; 00365 /* Unchain */ 00366 *tail = r->next; 00367 /* Notify upper layer about timeout */ 00368 if (r->cb != NULL) { 00369 r->cb(r->arg, ERR_TIMEOUT); 00370 } 00371 mqtt_delete_request(r); 00372 /* Tail might be be modified in callback, so re-read it in every iteration */ 00373 r = *(struct mqtt_request_t * const volatile *)tail; 00374 } else { 00375 r->timeout_diff -= t; 00376 t = 0; 00377 } 00378 } 00379 } 00380 00381 /** 00382 * Free all request items 00383 * @param tail Pointer to request queue tail pointer 00384 */ 00385 static void 00386 mqtt_clear_requests(struct mqtt_request_t **tail) 00387 { 00388 struct mqtt_request_t *iter, *next; 00389 LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL); 00390 for (iter = *tail; iter != NULL; iter = next) { 00391 next = iter->next; 00392 mqtt_delete_request(iter); 00393 } 00394 *tail = NULL; 00395 } 00396 /** 00397 * Initialize all request items 00398 * @param r_objs Pointer to request objects 00399 */ 00400 static void 00401 mqtt_init_requests(struct mqtt_request_t *r_objs) 00402 { 00403 u8_t n; 00404 LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL); 00405 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) { 00406 /* Item pointing to itself indicates unused */ 00407 r_objs[n].next = &r_objs[n]; 00408 } 
00409 } 00410 00411 /*--------------------------------------------------------------------------------------------------------------------- */ 00412 /* Output message build helpers */ 00413 00414 00415 static void 00416 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value) 00417 { 00418 mqtt_ringbuf_put(rb, value); 00419 } 00420 00421 static 00422 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value) 00423 { 00424 mqtt_ringbuf_put(rb, value >> 8); 00425 mqtt_ringbuf_put(rb, value & 0xff); 00426 } 00427 00428 static void 00429 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length) 00430 { 00431 u16_t n; 00432 for (n = 0; n < length; n++) { 00433 mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]); 00434 } 00435 } 00436 00437 static void 00438 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length) 00439 { 00440 u16_t n; 00441 mqtt_ringbuf_put(rb, length >> 8); 00442 mqtt_ringbuf_put(rb, length & 0xff); 00443 for (n = 0; n < length; n++) { 00444 mqtt_ringbuf_put(rb, str[n]); 00445 } 00446 } 00447 00448 /** 00449 * Append fixed header 00450 * @param rb Output ring buffer 00451 * @param msg_type see enum mqtt_message_type 00452 * @param dup MQTT DUP flag 00453 * @param qos MQTT QoS field 00454 * @param retain MQTT retain flag 00455 * @param r_length Remaining length after fixed header 00456 */ 00457 00458 static void 00459 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup, 00460 u8_t qos, u8_t retain, u16_t r_length) 00461 { 00462 /* Start with control byte */ 00463 mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1))); 00464 /* Encode remaining length field */ 00465 do { 00466 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 
0x80 : 0)); 00467 r_length >>= 7; 00468 } while (r_length > 0); 00469 } 00470 00471 00472 /** 00473 * Check output buffer space 00474 * @param rb Output ring buffer 00475 * @param r_length Remaining length after fixed header 00476 * @return 1 if message will fit, 0 if not enough buffer space 00477 */ 00478 static u8_t 00479 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length) 00480 { 00481 /* Start with length of type byte + remaining length */ 00482 u16_t total_len = 1 + r_length; 00483 00484 LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL); 00485 00486 /* Calculate number of required bytes to contain the remaining bytes field and add to total*/ 00487 do { 00488 total_len++; 00489 r_length >>= 7; 00490 } while (r_length > 0); 00491 00492 return (total_len <= mqtt_ringbuf_free(rb)); 00493 } 00494 00495 00496 /** 00497 * Close connection to server 00498 * @param client MQTT client 00499 * @param reason Reason for disconnection 00500 */ 00501 static void 00502 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason) 00503 { 00504 LWIP_ASSERT("mqtt_close: client != NULL", client != NULL); 00505 00506 /* Bring down TCP connection if not already done */ 00507 if (client->conn != NULL) { 00508 err_t res; 00509 tcp_recv(client->conn, NULL); 00510 tcp_err(client->conn, NULL); 00511 tcp_sent(client->conn, NULL); 00512 res = tcp_close(client->conn); 00513 if (res != ERR_OK) { 00514 tcp_abort(client->conn); 00515 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_close: Close err=%s\n", lwip_strerr(res))); 00516 } 00517 client->conn = NULL; 00518 } 00519 00520 /* Remove all pending requests */ 00521 mqtt_clear_requests(&client->pend_req_queue); 00522 /* Stop cyclic timer */ 00523 sys_untimeout(mqtt_cyclic_timer, client); 00524 00525 /* Notify upper layer of disconnection if changed state */ 00526 if (client->conn_state != TCP_DISCONNECTED) { 00527 00528 client->conn_state = TCP_DISCONNECTED; 00529 if (client->connect_cb != NULL) { 00530 
client->connect_cb(client, client->connect_arg, reason); 00531 } 00532 } 00533 } 00534 00535 00536 /** 00537 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states 00538 * @param arg MQTT client 00539 */ 00540 static void 00541 mqtt_cyclic_timer(void *arg) 00542 { 00543 u8_t restart_timer = 1; 00544 mqtt_client_t *client = (mqtt_client_t *)arg; 00545 LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL); 00546 00547 if (client->conn_state == MQTT_CONNECTING) { 00548 client->cyclic_tick++; 00549 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) { 00550 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: CONNECT attempt to server timed out\n")); 00551 /* Disconnect TCP */ 00552 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 00553 restart_timer = 0; 00554 } 00555 } else if (client->conn_state == MQTT_CONNECTED) { 00556 /* Handle timeout for pending requests */ 00557 mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL); 00558 00559 /* keep_alive > 0 means keep alive functionality shall be used */ 00560 if (client->keep_alive > 0) { 00561 00562 client->server_watchdog++; 00563 /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */ 00564 if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive/2)) { 00565 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Server incoming keep-alive timeout\n")); 00566 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 00567 restart_timer = 0; 00568 } 00569 00570 /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */ 00571 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) { 00572 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: Sending keep-alive message to server\n")); 00573 if (mqtt_output_check_space(&client->output, 0) != 0) { 00574 
mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0); 00575 client->cyclic_tick = 0; 00576 } 00577 } else { 00578 client->cyclic_tick++; 00579 } 00580 } 00581 } else { 00582 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state)); 00583 restart_timer = 0; 00584 } 00585 if (restart_timer) { 00586 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, arg); 00587 } 00588 } 00589 00590 00591 /** 00592 * Send PUBACK, PUBREC or PUBREL response message 00593 * @param client MQTT client 00594 * @param msg PUBACK, PUBREC or PUBREL 00595 * @param pkt_id Packet identifier 00596 * @param qos QoS value 00597 * @return ERR_OK if successful, ERR_MEM if out of memory 00598 */ 00599 static err_t 00600 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos) 00601 { 00602 err_t err = ERR_OK; 00603 if (mqtt_output_check_space(&client->output, 2)) { 00604 mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2); 00605 mqtt_output_append_u16(&client->output, pkt_id); 00606 mqtt_output_send(&client->output, client->conn); 00607 } else { 00608 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n", 00609 mqtt_msg_type_to_str(msg), pkt_id)); 00610 err = ERR_MEM; 00611 } 00612 return err; 00613 } 00614 00615 /** 00616 * Subscribe response from server 00617 * @param r Matching request 00618 * @param result Result code from server 00619 */ 00620 static void 00621 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result) 00622 { 00623 if (r->cb != NULL) { 00624 r->cb(r->arg, result < 3 ? 
ERR_OK : ERR_ABRT); 00625 } 00626 } 00627 00628 00629 /** 00630 * Complete MQTT message received or buffer full 00631 * @param client MQTT client 00632 * @param fixed_hdr_idx header index 00633 * @param length length received part 00634 * @param remaining_length Remaining length of complete message 00635 */ 00636 static mqtt_connection_status_t 00637 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length) 00638 { 00639 mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED; 00640 00641 u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx; 00642 00643 /* Control packet type */ 00644 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]); 00645 u16_t pkt_id = 0; 00646 00647 if (pkt_type == MQTT_MSG_TYPE_CONNACK) { 00648 if (client->conn_state == MQTT_CONNECTING) { 00649 /* Get result code from CONNACK */ 00650 res = (mqtt_connection_status_t)var_hdr_payload[1]; 00651 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res)); 00652 if (res == MQTT_CONNECT_ACCEPTED) { 00653 /* Reset cyclic_tick when changing to connected state */ 00654 client->cyclic_tick = 0; 00655 client->conn_state = MQTT_CONNECTED; 00656 /* Notify upper layer */ 00657 if (client->connect_cb != 0) { 00658 client->connect_cb(client, client->connect_arg, res); 00659 } 00660 } 00661 } else { 00662 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Received CONNACK in connected state\n")); 00663 } 00664 } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) { 00665 LWIP_DEBUGF(MQTT_DEBUG_TRACE,( "mqtt_message_received: Received PINGRESP from server\n")); 00666 00667 } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) { 00668 u16_t payload_offset = 0; 00669 u16_t payload_length = length; 00670 u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]); 00671 00672 if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) { 00673 /* Should have topic and pkt id*/ 00674 uint8_t *topic; 00675 uint16_t after_topic; 00676 u8_t bkp; 00677 u16_t 
topic_len = var_hdr_payload[0]; 00678 topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]); 00679 00680 topic = var_hdr_payload + 2; 00681 after_topic = 2 + topic_len; 00682 /* Check length, add one byte even for QoS 0 so that zero termination will fit */ 00683 if ((after_topic + qos ? 2 : 1) > length) { 00684 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n")); 00685 goto out_disconnect; 00686 } 00687 00688 /* id for QoS 1 and 2 */ 00689 if (qos > 0) { 00690 client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1]; 00691 after_topic += 2; 00692 } else { 00693 client->inpub_pkt_id = 0; 00694 } 00695 /* Take backup of byte after topic */ 00696 bkp = topic[topic_len]; 00697 /* Zero terminate string */ 00698 topic[topic_len] = 0; 00699 /* Payload data remaining in receive buffer */ 00700 payload_length = length - after_topic; 00701 payload_offset = after_topic; 00702 00703 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n", 00704 qos, topic, remaining_length + payload_length)); 00705 if (client->pub_cb != NULL) { 00706 client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length); 00707 } 00708 /* Restore byte after topic */ 00709 topic[topic_len] = bkp; 00710 } 00711 if (payload_length > 0 || remaining_length == 0) { 00712 client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0); 00713 /* Reply if QoS > 0 */ 00714 if (remaining_length == 0 && qos > 0) { 00715 /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */ 00716 u8_t resp_msg = (qos == 1) ? 
MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC; 00717 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n", 00718 mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id)); 00719 pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0); 00720 } 00721 } 00722 } else { 00723 /* Get packet identifier */ 00724 pkt_id = (u16_t)var_hdr_payload[0] << 8; 00725 pkt_id |= (u16_t)var_hdr_payload[1]; 00726 if (pkt_id == 0) { 00727 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Got message with illegal packet identifier: 0\n")); 00728 goto out_disconnect; 00729 } 00730 if (pkt_type == MQTT_MSG_TYPE_PUBREC) { 00731 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n",pkt_id)); 00732 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1); 00733 00734 } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) { 00735 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n",pkt_id)); 00736 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0); 00737 00738 } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK || 00739 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) { 00740 struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id); 00741 if (r != NULL) { 00742 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 00743 if (pkt_type == MQTT_MSG_TYPE_SUBACK) { 00744 if (length < 3) { 00745 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: To small SUBACK packet\n")); 00746 goto out_disconnect; 00747 } else { 00748 mqtt_incomming_suback(r, var_hdr_payload[2]); 00749 } 00750 } else if (r->cb != NULL) { 00751 r->cb(r->arg, ERR_OK); 00752 } 00753 mqtt_delete_request(r); 00754 } else { 00755 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", 
mqtt_msg_type_to_str(pkt_type), pkt_id)); 00756 } 00757 } else { 00758 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received unknown message type: %d\n", pkt_type)); 00759 goto out_disconnect; 00760 } 00761 } 00762 return res; 00763 out_disconnect: 00764 return MQTT_CONNECT_DISCONNECTED; 00765 } 00766 00767 00768 /** 00769 * MQTT incoming message parser 00770 * @param client MQTT client 00771 * @param p PBUF chain of received data 00772 * @return Connection status 00773 */ 00774 static mqtt_connection_status_t 00775 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) 00776 { 00777 u16_t in_offset = 0; 00778 u32_t msg_rem_len = 0; 00779 u8_t fixed_hdr_idx = 0; 00780 u8_t b = 0; 00781 00782 while (p->tot_len > in_offset) { 00783 if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) { 00784 00785 if (fixed_hdr_idx < client->msg_idx) { 00786 b = client->rx_buffer[fixed_hdr_idx]; 00787 } else { 00788 b = pbuf_get_at(p, in_offset++); 00789 client->rx_buffer[client->msg_idx++] = b; 00790 } 00791 fixed_hdr_idx++; 00792 00793 if (fixed_hdr_idx >= 2) { 00794 msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7); 00795 if ((b & 0x80) == 0) { 00796 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len)); 00797 if (msg_rem_len == 0) { 00798 /* Complete message with no extra headers of payload received */ 00799 mqtt_message_received(client, fixed_hdr_idx, 0, 0); 00800 client->msg_idx = 0; 00801 fixed_hdr_idx = 0; 00802 } else { 00803 /* Bytes remaining in message */ 00804 msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx; 00805 } 00806 } 00807 } 00808 } else { 00809 u16_t cpy_len, cpy_start, buffer_space; 00810 00811 cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx; 00812 00813 /* Allow to copy the lesser one of available length in input data or bytes remaining in message */ 00814 cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), 
msg_rem_len); 00815 00816 /* Limit to available space in buffer */ 00817 buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start; 00818 if (cpy_len > buffer_space) { 00819 cpy_len = buffer_space; 00820 } 00821 pbuf_copy_partial(p, client->rx_buffer+cpy_start, cpy_len, in_offset); 00822 00823 /* Advance get and put indexes */ 00824 client->msg_idx += cpy_len; 00825 in_offset += cpy_len; 00826 msg_rem_len -= cpy_len; 00827 00828 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len)); 00829 if (msg_rem_len == 0 || cpy_len == buffer_space) { 00830 /* Whole message received or buffer is full */ 00831 mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len); 00832 if (res != MQTT_CONNECT_ACCEPTED) { 00833 return res; 00834 } 00835 if (msg_rem_len == 0) { 00836 /* Reset parser state */ 00837 client->msg_idx = 0; 00838 /* msg_tot_len = 0; */ 00839 fixed_hdr_idx = 0; 00840 } 00841 } 00842 } 00843 } 00844 return MQTT_CONNECT_ACCEPTED; 00845 } 00846 00847 00848 /** 00849 * TCP received callback function. 
@see tcp_recv_fn 00850 * @param arg MQTT client 00851 * @param p PBUF chain of received data 00852 * @param err Passed as return value if not ERR_OK 00853 * @return ERR_OK or err passed into callback 00854 */ 00855 static err_t 00856 mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err) 00857 { 00858 mqtt_client_t *client = (mqtt_client_t *)arg; 00859 LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL); 00860 LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb); 00861 00862 if (p == NULL) { 00863 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n")); 00864 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 00865 } else { 00866 mqtt_connection_status_t res; 00867 if (err != ERR_OK) { 00868 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_recv_cb: Recv err=%d\n", err)); 00869 pbuf_free(p); 00870 return err; 00871 } 00872 00873 /* Tell remote that data has been received */ 00874 tcp_recved(pcb, p->tot_len); 00875 res = mqtt_parse_incoming(client, p); 00876 pbuf_free(p); 00877 00878 if (res != MQTT_CONNECT_ACCEPTED) { 00879 mqtt_close(client, res); 00880 } 00881 /* If keep alive functionality is used */ 00882 if (client->keep_alive != 0) { 00883 /* Reset server alive watchdog */ 00884 client->server_watchdog = 0; 00885 } 00886 00887 } 00888 return ERR_OK; 00889 } 00890 00891 00892 /** 00893 * TCP data sent callback function. 
@see tcp_sent_fn 00894 * @param arg MQTT client 00895 * @param tpcb TCP connection handle 00896 * @param len Number of bytes sent 00897 * @return ERR_OK 00898 */ 00899 static err_t 00900 mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len) 00901 { 00902 mqtt_client_t *client = (mqtt_client_t *)arg; 00903 00904 LWIP_UNUSED_ARG(tpcb); 00905 LWIP_UNUSED_ARG(len); 00906 00907 if (client->conn_state == MQTT_CONNECTED) { 00908 struct mqtt_request_t *r; 00909 00910 /* Reset keep-alive send timer and server watchdog */ 00911 client->cyclic_tick = 0; 00912 client->server_watchdog = 0; 00913 /* QoS 0 publish has no response from server, so call its callbacks here */ 00914 while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) { 00915 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n")); 00916 if (r->cb != NULL) { 00917 r->cb(r->arg, ERR_OK); 00918 } 00919 mqtt_delete_request(r); 00920 } 00921 /* Try send any remaining buffers from output queue */ 00922 mqtt_output_send(&client->output, client->conn); 00923 } 00924 return ERR_OK; 00925 } 00926 00927 /** 00928 * TCP error callback function. @see tcp_err_fn 00929 * @param arg MQTT client 00930 * @param err Error encountered 00931 */ 00932 static void 00933 mqtt_tcp_err_cb(void *arg, err_t err) 00934 { 00935 mqtt_client_t *client = (mqtt_client_t *)arg; 00936 LWIP_UNUSED_ARG(err); /* only used for debug output */ 00937 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg)); 00938 LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL); 00939 /* Set conn to null before calling close as pcb is already deallocated*/ 00940 client->conn = 0; 00941 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 00942 } 00943 00944 /** 00945 * TCP poll callback function. 
@see tcp_poll_fn 00946 * @param arg MQTT client 00947 * @param tpcb TCP connection handle 00948 * @return err ERR_OK 00949 */ 00950 static err_t 00951 mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb) 00952 { 00953 mqtt_client_t *client = (mqtt_client_t *)arg; 00954 if (client->conn_state == MQTT_CONNECTED) { 00955 /* Try send any remaining buffers from output queue */ 00956 mqtt_output_send(&client->output, tpcb); 00957 } 00958 return ERR_OK; 00959 } 00960 00961 /** 00962 * TCP connect callback function. @see tcp_connected_fn 00963 * @param arg MQTT client 00964 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error 00965 * @return ERR_OK 00966 */ 00967 static err_t 00968 mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err) 00969 { 00970 mqtt_client_t* client = (mqtt_client_t *)arg; 00971 00972 if (err != ERR_OK) { 00973 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err)); 00974 return err; 00975 } 00976 00977 /* Initiate receiver state */ 00978 client->msg_idx = 0; 00979 00980 /* Setup TCP callbacks */ 00981 tcp_recv(tpcb, mqtt_tcp_recv_cb); 00982 tcp_sent(tpcb, mqtt_tcp_sent_cb); 00983 tcp_poll(tpcb, mqtt_tcp_poll_cb, 2); 00984 00985 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_connect_cb: TCP connection established to server\n")); 00986 /* Enter MQTT connect state */ 00987 client->conn_state = MQTT_CONNECTING; 00988 00989 /* Start cyclic timer */ 00990 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, client); 00991 client->cyclic_tick = 0; 00992 00993 /* Start transmission from output queue, connect message is the first one out*/ 00994 mqtt_output_send(&client->output, client->conn); 00995 00996 return ERR_OK; 00997 } 00998 00999 01000 01001 /*---------------------------------------------------------------------------------------------------- */ 01002 /* Public API */ 01003 01004 01005 /** 01006 * @ingroup mqtt 01007 * MQTT publish function. 
01008 * @param client MQTT client 01009 * @param topic Publish topic string 01010 * @param payload Data to publish (NULL is allowed) 01011 * @param payload_length: Length of payload (0 is allowed) 01012 * @param qos Quality of service, 0 1 or 2 01013 * @param retain MQTT retain flag 01014 * @param cb Callback to call when publish is complete or has timed out 01015 * @param arg User supplied argument to publish callback 01016 * @return ERR_OK if successful 01017 * ERR_CONN if client is disconnected 01018 * ERR_MEM if short on memory 01019 */ 01020 err_t 01021 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain, 01022 mqtt_request_cb_t cb, void *arg) 01023 { 01024 struct mqtt_request_t *r; 01025 u16_t pkt_id; 01026 size_t topic_strlen; 01027 size_t total_len; 01028 u16_t topic_len; 01029 u16_t remaining_length; 01030 01031 LWIP_ASSERT("mqtt_publish: client != NULL", client); 01032 LWIP_ASSERT("mqtt_publish: topic != NULL", topic); 01033 LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN); 01034 01035 topic_strlen = strlen(topic); 01036 LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 01037 topic_len = (u16_t)topic_strlen; 01038 total_len = 2 + topic_len + payload_length; 01039 LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 01040 remaining_length = (u16_t)total_len; 01041 01042 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic)); 01043 01044 if (qos > 0) { 01045 remaining_length += 2; 01046 /* Generate pkt_id id for QoS1 and 2 */ 01047 pkt_id = msg_generate_packet_id(client); 01048 } else { 01049 /* Use reserved value pkt_id 0 for QoS 0 in request handle */ 01050 pkt_id = 0; 01051 } 01052 01053 r = mqtt_create_request(client->req_list, pkt_id, cb, arg); 01054 if (r == NULL) { 01055 return 
ERR_MEM; 01056 } 01057 01058 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 01059 mqtt_delete_request(r); 01060 return ERR_MEM; 01061 } 01062 /* Append fixed header */ 01063 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length); 01064 01065 /* Append Topic */ 01066 mqtt_output_append_string(&client->output, topic, topic_len); 01067 01068 /* Append packet if for QoS 1 and 2*/ 01069 if (qos > 0) { 01070 mqtt_output_append_u16(&client->output, pkt_id); 01071 } 01072 01073 /* Append optional publish payload */ 01074 if ((payload != NULL) && (payload_length > 0)) { 01075 mqtt_output_append_buf(&client->output, payload, payload_length); 01076 } 01077 01078 mqtt_append_request(&client->pend_req_queue, r); 01079 mqtt_output_send(&client->output, client->conn); 01080 return ERR_OK; 01081 } 01082 01083 01084 /** 01085 * @ingroup mqtt 01086 * MQTT subscribe/unsubscribe function. 01087 * @param client MQTT client 01088 * @param topic topic to subscribe to 01089 * @param qos Quality of service, 0 1 or 2 (only used for subscribe) 01090 * @param cb Callback to call when subscribe/unsubscribe reponse is received 01091 * @param arg User supplied argument to publish callback 01092 * @param sub 1 for subscribe, 0 for unsubscribe 01093 * @return ERR_OK if successful, @see err_t enum for other results 01094 */ 01095 err_t 01096 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub) 01097 { 01098 size_t topic_strlen; 01099 size_t total_len; 01100 u16_t topic_len; 01101 u16_t remaining_length; 01102 u16_t pkt_id; 01103 struct mqtt_request_t *r; 01104 01105 LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client); 01106 LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic); 01107 01108 topic_strlen = strlen(topic); 01109 LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 01110 topic_len = (u16_t)topic_strlen; 01111 
/* Topic string, pkt_id, qos for subscribe */ 01112 total_len = topic_len + 2 + 2 + (sub != 0); 01113 LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 01114 remaining_length = (u16_t)total_len; 01115 01116 LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3); 01117 if (client->conn_state == TCP_DISCONNECTED) { 01118 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n")); 01119 return ERR_CONN; 01120 } 01121 01122 pkt_id = msg_generate_packet_id(client); 01123 r = mqtt_create_request(client->req_list, pkt_id, cb, arg); 01124 if (r == NULL) { 01125 return ERR_MEM; 01126 } 01127 01128 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 01129 mqtt_delete_request(r); 01130 return ERR_MEM; 01131 } 01132 01133 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id)); 01134 01135 mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length); 01136 /* Packet id */ 01137 mqtt_output_append_u16(&client->output, pkt_id); 01138 /* Topic */ 01139 mqtt_output_append_string(&client->output, topic, topic_len); 01140 /* QoS */ 01141 if (sub != 0) { 01142 mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2)); 01143 } 01144 01145 mqtt_append_request(&client->pend_req_queue, r); 01146 mqtt_output_send(&client->output, client->conn); 01147 return ERR_OK; 01148 } 01149 01150 01151 /** 01152 * @ingroup mqtt 01153 * Set callback to handle incoming publish requests from server 01154 * @param client MQTT client 01155 * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload 01156 * @param data_cb Callback for each fragment of payload that arrives 01157 * @param arg User supplied argument to both callbacks 01158 */ 01159 void 01160 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb, 01161 
mqtt_incoming_data_cb_t data_cb, void *arg) 01162 { 01163 LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL); 01164 client->data_cb = data_cb; 01165 client->pub_cb = pub_cb; 01166 client->inpub_arg = arg; 01167 } 01168 01169 /** 01170 * @ingroup mqtt 01171 * Create a new MQTT client instance 01172 * @return Pointer to instance on success, NULL otherwise 01173 */ 01174 mqtt_client_t * 01175 mqtt_client_new(void) 01176 { 01177 mqtt_client_t *client = (mqtt_client_t *)mem_malloc(sizeof(mqtt_client_t)); 01178 if (client != NULL) { 01179 memset(client, 0, sizeof(mqtt_client_t)); 01180 } 01181 return client; 01182 } 01183 01184 01185 /** 01186 * @ingroup mqtt 01187 * Connect to MQTT server 01188 * @param client MQTT client 01189 * @param ip_addr Server IP 01190 * @param port Server port 01191 * @param cb Connection state change callback 01192 * @param arg User supplied argument to connection callback 01193 * @param client_info Client identification and connection options 01194 * @return ERR_OK if successful, @see err_t enum for other results 01195 */ 01196 err_t 01197 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg, 01198 const struct mqtt_connect_client_info_t *client_info) 01199 { 01200 err_t err; 01201 size_t len; 01202 u16_t client_id_length; 01203 /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */ 01204 u16_t remaining_length = 2 + 4 + 1 + 1 + 2; 01205 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0; 01206 01207 LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL); 01208 LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL); 01209 LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL); 01210 LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL); 01211 01212 if (client->conn_state != TCP_DISCONNECTED) { 01213 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: 
Already connected\n")); 01214 return ERR_ISCONN; 01215 } 01216 01217 /* Wipe clean */ 01218 memset(client, 0, sizeof(mqtt_client_t)); 01219 client->connect_arg = arg; 01220 client->connect_cb = cb; 01221 client->keep_alive = client_info->keep_alive; 01222 mqtt_init_requests(client->req_list); 01223 01224 /* Build connect message */ 01225 if (client_info->will_topic != NULL && client_info->will_msg != NULL) { 01226 flags |= MQTT_CONNECT_FLAG_WILL; 01227 flags |= (client_info->will_qos & 3) << 3; 01228 if (client_info->will_retain) { 01229 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; 01230 } 01231 len = strlen(client_info->will_topic); 01232 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL); 01233 will_topic_len = (u8_t)len; 01234 len = strlen(client_info->will_msg); 01235 LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL); 01236 will_msg_len = (u8_t)len; 01237 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len; 01238 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 01239 remaining_length = (u16_t)len; 01240 } 01241 01242 /* Don't complicate things, always connect using clean session */ 01243 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; 01244 01245 len = strlen(client_info->client_id); 01246 LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL); 01247 client_id_length = (u16_t)len; 01248 len = remaining_length + 2 + client_id_length; 01249 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 01250 remaining_length = (u16_t)len; 01251 01252 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 01253 return ERR_MEM; 01254 } 01255 01256 client->conn = tcp_new(); 01257 if (client->conn == NULL) { 01258 return ERR_MEM; 01259 } 01260 01261 /* Set arg pointer for callbacks */ 01262 tcp_arg(client->conn, client); 01263 
/* Any local address, pick random local port number */ 01264 err = tcp_bind(client->conn, IP_ADDR_ANY, 0); 01265 if (err != ERR_OK) { 01266 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err)); 01267 goto tcp_fail; 01268 } 01269 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port)); 01270 01271 /* Connect to server */ 01272 err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb); 01273 if (err != ERR_OK) { 01274 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err)); 01275 goto tcp_fail; 01276 } 01277 /* Set error callback */ 01278 tcp_err(client->conn, mqtt_tcp_err_cb); 01279 client->conn_state = TCP_CONNECTING; 01280 01281 /* Append fixed header */ 01282 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length); 01283 /* Append Protocol string */ 01284 mqtt_output_append_string(&client->output, "MQTT", 4); 01285 /* Append Protocol level */ 01286 mqtt_output_append_u8(&client->output, 4); 01287 /* Append connect flags */ 01288 mqtt_output_append_u8(&client->output, flags); 01289 /* Append keep-alive */ 01290 mqtt_output_append_u16(&client->output, client_info->keep_alive); 01291 /* Append client id */ 01292 mqtt_output_append_string(&client->output, client_info->client_id, client_id_length); 01293 /* Append will message if used */ 01294 if (will_topic_len > 0) { 01295 mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len); 01296 mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len); 01297 } 01298 return ERR_OK; 01299 01300 tcp_fail: 01301 tcp_abort(client->conn); 01302 client->conn = NULL; 01303 return err; 01304 } 01305 01306 01307 /** 01308 * @ingroup mqtt 01309 * Disconnect from MQTT server 01310 * @param client MQTT client 01311 */ 01312 void 01313 mqtt_disconnect(mqtt_client_t *client) 01314 { 
01315 LWIP_ASSERT("mqtt_disconnect: client != NULL", client); 01316 /* If connection in not already closed */ 01317 if (client->conn_state != TCP_DISCONNECTED) { 01318 /* Set conn_state before calling mqtt_close to prevent callback from being called */ 01319 client->conn_state = TCP_DISCONNECTED; 01320 mqtt_close(client, (mqtt_connection_status_t)0); 01321 } 01322 } 01323 01324 /** 01325 * @ingroup mqtt 01326 * Check connection with server 01327 * @param client MQTT client 01328 * @return 1 if connected to server, 0 otherwise 01329 */ 01330 u8_t 01331 mqtt_client_is_connected(mqtt_client_t *client) 01332 { 01333 LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client); 01334 return client->conn_state == MQTT_CONNECTED; 01335 }
Generated on Tue Jul 12 2022 11:02:27 by
1.7.2