Kenji Arai / mbed-os_TYBLE16

Dependents:   TYBLE16_simple_data_logger TYBLE16_MP3_Air

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers lwip_mqtt.c Source File

lwip_mqtt.c

Go to the documentation of this file.
00001 /**
00002  * @file
00003  * MQTT client
00004  *
00005  * @defgroup mqtt MQTT client
00006  * @ingroup apps
00007  * @verbinclude mqtt_client.txt
00008  */
00009 
00010 /*
00011  * Copyright (c) 2016 Erik Andersson <erian747@gmail.com>
00012  * All rights reserved.
00013  *
00014  * Redistribution and use in source and binary forms, with or without modification,
00015  * are permitted provided that the following conditions are met:
00016  *
00017  * 1. Redistributions of source code must retain the above copyright notice,
00018  *    this list of conditions and the following disclaimer.
00019  * 2. Redistributions in binary form must reproduce the above copyright notice,
00020  *    this list of conditions and the following disclaimer in the documentation
00021  *    and/or other materials provided with the distribution.
00022  * 3. The name of the author may not be used to endorse or promote products
00023  *    derived from this software without specific prior written permission.
00024  *
00025  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
00026  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
00027  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
00028  * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00029  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
00030  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00031  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00032  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
00033  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
00034  * OF SUCH DAMAGE.
00035  *
00036  * This file is part of the lwIP TCP/IP stack
00037  *
00038  * Author: Erik Andersson <erian747@gmail.com>
00039  *
00040  *
00041  * @todo:
00042  * - Handle large outgoing payloads for PUBLISH messages
00043  * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
00044  * - Add support for legacy MQTT protocol version
00045  *
00046  * Please coordinate changes and requests with Erik Andersson
00047  * Erik Andersson <erian747@gmail.com>
00048  *
00049  */
00050 #include "lwip/apps/mqtt.h"
00051 #include "lwip/apps/mqtt_priv.h"
00052 #include "lwip/timeouts.h"
00053 #include "lwip/ip_addr.h"
00054 #include "lwip/mem.h"
00055 #include "lwip/err.h"
00056 #include "lwip/pbuf.h"
00057 #include "lwip/altcp.h"
00058 #include "lwip/altcp_tcp.h"
00059 #include "lwip/altcp_tls.h"
00060 #include <string.h>
00061 
00062 #if LWIP_TCP && LWIP_CALLBACK_API
00063 
00064 /**
00065  * MQTT_DEBUG: Default is off.
00066  */
00067 #if !defined MQTT_DEBUG || defined __DOXYGEN__
00068 #define MQTT_DEBUG                  LWIP_DBG_OFF
00069 #endif
00070 
00071 #define MQTT_DEBUG_TRACE        (MQTT_DEBUG | LWIP_DBG_TRACE)
00072 #define MQTT_DEBUG_STATE        (MQTT_DEBUG | LWIP_DBG_STATE)
00073 #define MQTT_DEBUG_WARN         (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
00074 #define MQTT_DEBUG_WARN_STATE   (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
00075 #define MQTT_DEBUG_SERIOUS      (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)
00076 
00077 
00078 
00079 /**
00080  * MQTT client connection states
00081  */
00082 enum {
00083   TCP_DISCONNECTED,
00084   TCP_CONNECTING,
00085   MQTT_CONNECTING,
00086   MQTT_CONNECTED
00087 };
00088 
00089 /**
00090  * MQTT control message types
00091  */
00092 enum mqtt_message_type {
00093   MQTT_MSG_TYPE_CONNECT = 1,
00094   MQTT_MSG_TYPE_CONNACK = 2,
00095   MQTT_MSG_TYPE_PUBLISH = 3,
00096   MQTT_MSG_TYPE_PUBACK = 4,
00097   MQTT_MSG_TYPE_PUBREC = 5,
00098   MQTT_MSG_TYPE_PUBREL = 6,
00099   MQTT_MSG_TYPE_PUBCOMP = 7,
00100   MQTT_MSG_TYPE_SUBSCRIBE = 8,
00101   MQTT_MSG_TYPE_SUBACK = 9,
00102   MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
00103   MQTT_MSG_TYPE_UNSUBACK = 11,
00104   MQTT_MSG_TYPE_PINGREQ = 12,
00105   MQTT_MSG_TYPE_PINGRESP = 13,
00106   MQTT_MSG_TYPE_DISCONNECT = 14
00107 };
00108 
00109 /** Helpers to extract control packet type and qos from first byte in fixed header */
00110 #define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4)
00111 #define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1)
00112 
00113 /**
00114  * MQTT connect flags, only used in CONNECT message
00115  */
00116 enum mqtt_connect_flag {
00117   MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
00118   MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
00119   MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
00120   MQTT_CONNECT_FLAG_WILL = 1 << 2,
00121   MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
00122 };
00123 
00124 
00125 static void mqtt_cyclic_timer(void *arg);
00126 
00127 #if defined(LWIP_DEBUG)
00128 static const char *const mqtt_message_type_str[15] = {
00129   "UNDEFINED",
00130   "CONNECT",
00131   "CONNACK",
00132   "PUBLISH",
00133   "PUBACK",
00134   "PUBREC",
00135   "PUBREL",
00136   "PUBCOMP",
00137   "SUBSCRIBE",
00138   "SUBACK",
00139   "UNSUBSCRIBE",
00140   "UNSUBACK",
00141   "PINGREQ",
00142   "PINGRESP",
00143   "DISCONNECT"
00144 };
00145 
00146 /**
00147  * Message type value to string
00148  * @param msg_type see enum mqtt_message_type
00149  *
00150  * @return Control message type text string
00151  */
00152 static const char *
00153 mqtt_msg_type_to_str(u8_t msg_type)
00154 {
00155   if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) {
00156     msg_type = 0;
00157   }
00158   return mqtt_message_type_str[msg_type];
00159 }
00160 
00161 #endif
00162 
00163 
00164 /**
00165  * Generate MQTT packet identifier
00166  * @param client MQTT client
00167  * @return New packet identifier, range 1 to 65535
00168  */
00169 static u16_t
00170 msg_generate_packet_id(mqtt_client_t *client)
00171 {
00172   client->pkt_id_seq++;
00173   if (client->pkt_id_seq == 0) {
00174     client->pkt_id_seq++;
00175   }
00176   return client->pkt_id_seq;
00177 }
00178 
00179 /*--------------------------------------------------------------------------------------------------------------------- */
00180 /* Output ring buffer */
00181 
00182 /** Add single item to ring buffer */
00183 static void
00184 mqtt_ringbuf_put(struct mqtt_ringbuf_t *rb, u8_t item)
00185 {
00186   rb->buf[rb->put] = item;
00187   rb->put++;
00188   if (rb->put >= MQTT_OUTPUT_RINGBUF_SIZE) {
00189     rb->put = 0;
00190   }
00191 }
00192 
00193 /** Return pointer to ring buffer get position */
00194 static u8_t *
00195 mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t *rb)
00196 {
00197   return &rb->buf[rb->get];
00198 }
00199 
00200 static void
00201 mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t *rb, u16_t len)
00202 {
00203   LWIP_ASSERT("mqtt_ringbuf_advance_get_idx: len < MQTT_OUTPUT_RINGBUF_SIZE", len < MQTT_OUTPUT_RINGBUF_SIZE);
00204 
00205   rb->get += len;
00206   if (rb->get >= MQTT_OUTPUT_RINGBUF_SIZE) {
00207     rb->get = rb->get - MQTT_OUTPUT_RINGBUF_SIZE;
00208   }
00209 }
00210 
00211 /** Return number of bytes in ring buffer */
00212 static u16_t
00213 mqtt_ringbuf_len(struct mqtt_ringbuf_t *rb)
00214 {
00215   u32_t len = rb->put - rb->get;
00216   if (len > 0xFFFF) {
00217     len += MQTT_OUTPUT_RINGBUF_SIZE;
00218   }
00219   return (u16_t)len;
00220 }
00221 
00222 /** Return number of bytes free in ring buffer */
00223 #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb))
00224 
00225 /** Return number of bytes possible to read without wrapping around */
00226 #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - (rb)->get))
00227 
00228 /**
00229  * Try send as many bytes as possible from output ring buffer
00230  * @param rb Output ring buffer
00231  * @param tpcb TCP connection handle
00232  */
00233 static void
00234 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct altcp_pcb *tpcb)
00235 {
00236   err_t err;
00237   u8_t wrap = 0;
00238   u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
00239   u16_t send_len = altcp_sndbuf (tpcb);
00240   LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
00241 
00242   if (send_len == 0 || ringbuf_lin_len == 0) {
00243     return;
00244   }
00245 
00246   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
00247                                  send_len, ringbuf_lin_len, rb->get, rb->put));
00248 
00249   if (send_len > ringbuf_lin_len) {
00250     /* Space in TCP output buffer is larger than available in ring buffer linear portion */
00251     send_len = ringbuf_lin_len;
00252     /* Wrap around if more data in ring buffer after linear portion */
00253     wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
00254   }
00255   err = altcp_write (tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
00256   if ((err == ERR_OK) && wrap) {
00257     mqtt_ringbuf_advance_get_idx(rb, send_len);
00258     /* Use the lesser one of ring buffer linear length and TCP send buffer size */
00259     send_len = LWIP_MIN(altcp_sndbuf (tpcb), mqtt_ringbuf_linear_read_length(rb));
00260     err = altcp_write (tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
00261   }
00262 
00263   if (err == ERR_OK) {
00264     mqtt_ringbuf_advance_get_idx(rb, send_len);
00265     /* Flush */
00266     altcp_output (tpcb);
00267   } else {
00268     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
00269   }
00270 }
00271 
00272 
00273 
00274 /*--------------------------------------------------------------------------------------------------------------------- */
00275 /* Request queue */
00276 
00277 /**
00278  * Create request item
00279  * @param r_objs Pointer to request objects
00280  * @param r_objs_len Number of array entries
00281  * @param pkt_id Packet identifier of request
00282  * @param cb Packet callback to call when requests lifetime ends
00283  * @param arg Parameter following callback
00284  * @return Request or NULL if failed to create
00285  */
00286 static struct mqtt_request_t *
00287 mqtt_create_request(struct mqtt_request_t *r_objs, size_t r_objs_len, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
00288 {
00289   struct mqtt_request_t *r = NULL;
00290   u8_t n;
00291   LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
00292   for (n = 0; n < r_objs_len; n++) {
00293     /* Item point to itself if not in use */
00294     if (r_objs[n].next == &r_objs[n]) {
00295       r = &r_objs[n];
00296       r->next = NULL;
00297       r->cb = cb;
00298       r->arg = arg;
00299       r->pkt_id = pkt_id;
00300       break;
00301     }
00302   }
00303   return r;
00304 }
00305 
00306 
00307 /**
00308  * Append request to pending request queue
00309  * @param tail Pointer to request queue tail pointer
00310  * @param r Request to append
00311  */
00312 static void
00313 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
00314 {
00315   struct mqtt_request_t *head = NULL;
00316   s16_t time_before = 0;
00317   struct mqtt_request_t *iter;
00318 
00319   LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
00320 
00321   /* Iterate trough queue to find head, and count total timeout time */
00322   for (iter = *tail; iter != NULL; iter = iter->next) {
00323     time_before += iter->timeout_diff;
00324     head = iter;
00325   }
00326 
00327   LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
00328   r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
00329   if (head == NULL) {
00330     *tail = r;
00331   } else {
00332     head->next = r;
00333   }
00334 }
00335 
00336 
00337 /**
00338  * Delete request item
00339  * @param r Request item to delete
00340  */
00341 static void
00342 mqtt_delete_request(struct mqtt_request_t *r)
00343 {
00344   if (r != NULL) {
00345     r->next = r;
00346   }
00347 }
00348 
00349 /**
00350  * Remove a request item with a specific packet identifier from request queue
00351  * @param tail Pointer to request queue tail pointer
00352  * @param pkt_id Packet identifier of request to take
00353  * @return Request item if found, NULL if not
00354  */
00355 static struct mqtt_request_t *
00356 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
00357 {
00358   struct mqtt_request_t *iter = NULL, *prev = NULL;
00359   LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
00360   /* Search all request for pkt_id */
00361   for (iter = *tail; iter != NULL; iter = iter->next) {
00362     if (iter->pkt_id == pkt_id) {
00363       break;
00364     }
00365     prev = iter;
00366   }
00367 
00368   /* If request was found */
00369   if (iter != NULL) {
00370     /* unchain */
00371     if (prev == NULL) {
00372       *tail = iter->next;
00373     } else {
00374       prev->next = iter->next;
00375     }
00376     /* If exists, add remaining timeout time for the request to next */
00377     if (iter->next != NULL) {
00378       iter->next->timeout_diff += iter->timeout_diff;
00379     }
00380     iter->next = NULL;
00381   }
00382   return iter;
00383 }
00384 
00385 /**
00386  * Handle requests timeout
00387  * @param tail Pointer to request queue tail pointer
00388  * @param t Time since last call in seconds
00389  */
00390 static void
00391 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
00392 {
00393   struct mqtt_request_t *r;
00394   LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
00395   r = *tail;
00396   while (t > 0 && r != NULL) {
00397     if (t >= r->timeout_diff) {
00398       t -= (u8_t)r->timeout_diff;
00399       /* Unchain */
00400       *tail = r->next;
00401       /* Notify upper layer about timeout */
00402       if (r->cb != NULL) {
00403         r->cb(r->arg, ERR_TIMEOUT);
00404       }
00405       mqtt_delete_request(r);
00406       /* Tail might be be modified in callback, so re-read it in every iteration */
00407       r = *(struct mqtt_request_t *const volatile *)tail;
00408     } else {
00409       r->timeout_diff -= t;
00410       t = 0;
00411     }
00412   }
00413 }
00414 
00415 /**
00416  * Free all request items
00417  * @param tail Pointer to request queue tail pointer
00418  */
00419 static void
00420 mqtt_clear_requests(struct mqtt_request_t **tail)
00421 {
00422   struct mqtt_request_t *iter, *next;
00423   LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
00424   for (iter = *tail; iter != NULL; iter = next) {
00425     next = iter->next;
00426     mqtt_delete_request(iter);
00427   }
00428   *tail = NULL;
00429 }
00430 /**
00431  * Initialize all request items
00432  * @param r_objs Pointer to request objects
00433  * @param r_objs_len Number of array entries
00434  */
00435 static void
00436 mqtt_init_requests(struct mqtt_request_t *r_objs, size_t r_objs_len)
00437 {
00438   u8_t n;
00439   LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
00440   for (n = 0; n < r_objs_len; n++) {
00441     /* Item pointing to itself indicates unused */
00442     r_objs[n].next = &r_objs[n];
00443   }
00444 }
00445 
00446 /*--------------------------------------------------------------------------------------------------------------------- */
00447 /* Output message build helpers */
00448 
00449 
00450 static void
00451 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
00452 {
00453   mqtt_ringbuf_put(rb, value);
00454 }
00455 
00456 static
00457 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
00458 {
00459   mqtt_ringbuf_put(rb, value >> 8);
00460   mqtt_ringbuf_put(rb, value & 0xff);
00461 }
00462 
00463 static void
00464 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
00465 {
00466   u16_t n;
00467   for (n = 0; n < length; n++) {
00468     mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
00469   }
00470 }
00471 
00472 static void
00473 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
00474 {
00475   u16_t n;
00476   mqtt_ringbuf_put(rb, length >> 8);
00477   mqtt_ringbuf_put(rb, length & 0xff);
00478   for (n = 0; n < length; n++) {
00479     mqtt_ringbuf_put(rb, str[n]);
00480   }
00481 }
00482 
00483 /**
00484  * Append fixed header
00485  * @param rb Output ring buffer
00486  * @param msg_type see enum mqtt_message_type
00487  * @param fdup MQTT DUP flag
00488  * @param fqos MQTT QoS field
00489  * @param fretain MQTT retain flag
00490  * @param r_length Remaining length after fixed header
00491  */
00492 
00493 static void
00494 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t fdup,
00495                                 u8_t fqos, u8_t fretain, u16_t r_length)
00496 {
00497   /* Start with control byte */
00498   mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((fdup & 1) << 3) | ((fqos & 3) << 1) | (fretain & 1)));
00499   /* Encode remaining length field */
00500   do {
00501     mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
00502     r_length >>= 7;
00503   } while (r_length > 0);
00504 }
00505 
00506 
00507 /**
00508  * Check output buffer space
00509  * @param rb Output ring buffer
00510  * @param r_length Remaining length after fixed header
00511  * @return 1 if message will fit, 0 if not enough buffer space
00512  */
00513 static u8_t
00514 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
00515 {
00516   /* Start with length of type byte + remaining length */
00517   u16_t total_len = 1 + r_length;
00518 
00519   LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
00520 
00521   /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
00522   do {
00523     total_len++;
00524     r_length >>= 7;
00525   } while (r_length > 0);
00526 
00527   return (total_len <= mqtt_ringbuf_free(rb));
00528 }
00529 
00530 
00531 /**
00532  * Close connection to server
00533  * @param client MQTT client
00534  * @param reason Reason for disconnection
00535  */
00536 static void
00537 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
00538 {
00539   LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
00540 
00541   /* Bring down TCP connection if not already done */
00542   if (client->conn != NULL) {
00543     err_t res;
00544     altcp_recv (client->conn, NULL);
00545     altcp_err (client->conn,  NULL);
00546     altcp_sent (client->conn, NULL);
00547     res = altcp_close (client->conn);
00548     if (res != ERR_OK) {
00549       altcp_abort (client->conn);
00550       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_close: Close err=%s\n", lwip_strerr(res)));
00551     }
00552     client->conn = NULL;
00553   }
00554 
00555   /* Remove all pending requests */
00556   mqtt_clear_requests(&client->pend_req_queue);
00557   /* Stop cyclic timer */
00558   sys_untimeout(mqtt_cyclic_timer, client);
00559 
00560   /* Notify upper layer of disconnection if changed state */
00561   if (client->conn_state != TCP_DISCONNECTED) {
00562 
00563     client->conn_state = TCP_DISCONNECTED;
00564     if (client->connect_cb != NULL) {
00565       client->connect_cb(client, client->connect_arg, reason);
00566     }
00567   }
00568 }
00569 
00570 
00571 /**
00572  * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
00573  * @param arg MQTT client
00574  */
00575 static void
00576 mqtt_cyclic_timer(void *arg)
00577 {
00578   u8_t restart_timer = 1;
00579   mqtt_client_t *client = (mqtt_client_t *)arg;
00580   LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
00581 
00582   if (client->conn_state == MQTT_CONNECTING) {
00583     client->cyclic_tick++;
00584     if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
00585       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
00586       /* Disconnect TCP */
00587       mqtt_close(client, MQTT_CONNECT_TIMEOUT);
00588       restart_timer = 0;
00589     }
00590   } else if (client->conn_state == MQTT_CONNECTED) {
00591     /* Handle timeout for pending requests */
00592     mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
00593 
00594     /* keep_alive > 0 means keep alive functionality shall be used */
00595     if (client->keep_alive > 0) {
00596 
00597       client->server_watchdog++;
00598       /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
00599       if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive / 2)) {
00600         LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
00601         mqtt_close(client, MQTT_CONNECT_TIMEOUT);
00602         restart_timer = 0;
00603       }
00604 
00605       /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
00606       if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
00607         LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
00608         if (mqtt_output_check_space(&client->output, 0) != 0) {
00609           mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
00610           client->cyclic_tick = 0;
00611         }
00612       } else {
00613         client->cyclic_tick++;
00614       }
00615     }
00616   } else {
00617     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
00618     restart_timer = 0;
00619   }
00620   if (restart_timer) {
00621     sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, arg);
00622   }
00623 }
00624 
00625 
00626 /**
00627  * Send PUBACK, PUBREC or PUBREL response message
00628  * @param client MQTT client
00629  * @param msg PUBACK, PUBREC or PUBREL
00630  * @param pkt_id Packet identifier
00631  * @param qos QoS value
00632  * @return ERR_OK if successful, ERR_MEM if out of memory
00633  */
00634 static err_t
00635 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
00636 {
00637   err_t err = ERR_OK;
00638   if (mqtt_output_check_space(&client->output, 2)) {
00639     mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
00640     mqtt_output_append_u16(&client->output, pkt_id);
00641     mqtt_output_send(&client->output, client->conn);
00642   } else {
00643     LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
00644                                    mqtt_msg_type_to_str(msg), pkt_id));
00645     err = ERR_MEM;
00646   }
00647   return err;
00648 }
00649 
00650 /**
00651  * Subscribe response from server
00652  * @param r Matching request
00653  * @param result Result code from server
00654  */
00655 static void
00656 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
00657 {
00658   if (r->cb != NULL) {
00659     r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
00660   }
00661 }
00662 
00663 
00664 /**
00665  * Complete MQTT message received or buffer full
00666  * @param client MQTT client
00667  * @param fixed_hdr_idx header index
00668  * @param length length received part
00669  * @param remaining_length Remaining length of complete message
00670  */
00671 static mqtt_connection_status_t
00672 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length)
00673 {
00674   mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
00675 
00676   u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx;
00677   size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_idx;
00678 
00679   /* Control packet type */
00680   u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
00681   u16_t pkt_id = 0;
00682 
00683   LWIP_ASSERT("client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN", client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN);
00684   LWIP_ASSERT("fixed_hdr_idx <= client->msg_idx", fixed_hdr_idx <= client->msg_idx);
00685   LWIP_ERROR("buffer length mismatch", fixed_hdr_idx + length <= MQTT_VAR_HEADER_BUFFER_LEN,
00686              return MQTT_CONNECT_DISCONNECTED);
00687 
00688   if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
00689     if (client->conn_state == MQTT_CONNECTING) {
00690       if (length < 2) {
00691         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short CONNACK message\n"));
00692         goto out_disconnect;
00693       }
00694       /* Get result code from CONNACK */
00695       res = (mqtt_connection_status_t)var_hdr_payload[1];
00696       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: Connect response code %d\n", res));
00697       if (res == MQTT_CONNECT_ACCEPTED) {
00698         /* Reset cyclic_tick when changing to connected state */
00699         client->cyclic_tick = 0;
00700         client->conn_state = MQTT_CONNECTED;
00701         /* Notify upper layer */
00702         if (client->connect_cb != 0) {
00703           client->connect_cb(client, client->connect_arg, res);
00704         }
00705       }
00706     } else {
00707       LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Received CONNACK in connected state\n"));
00708     }
00709   } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
00710     LWIP_DEBUGF(MQTT_DEBUG_TRACE, ( "mqtt_message_received: Received PINGRESP from server\n"));
00711 
00712   } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
00713     u16_t payload_offset = 0;
00714     u16_t payload_length = length;
00715     u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
00716 
00717     if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
00718       /* Should have topic and pkt id*/
00719       u8_t *topic;
00720       u16_t after_topic;
00721       u8_t bkp;
00722       u16_t topic_len;
00723       u16_t qos_len = (qos ? 2U : 0U);
00724       if (length < 2 + qos_len) {
00725         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet\n"));
00726         goto out_disconnect;
00727       }
00728       topic_len = var_hdr_payload[0];
00729       topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
00730       if ((topic_len > length - (2 + qos_len)) ||
00731           (topic_len > var_hdr_payload_bufsize - (2 + qos_len))) {
00732         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (topic)\n"));
00733         goto out_disconnect;
00734       }
00735 
00736       topic = var_hdr_payload + 2;
00737       after_topic = 2 + topic_len;
00738       /* Check buffer length, add one byte even for QoS 0 so that zero termination will fit */
00739       if ((after_topic + (qos ? 2U : 1U)) > var_hdr_payload_bufsize) {
00740         LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
00741         goto out_disconnect;
00742       }
00743 
00744       /* id for QoS 1 and 2 */
00745       if (qos > 0) {
00746         if (length < after_topic + 2U) {
00747           LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (after_topic)\n"));
00748           goto out_disconnect;
00749         }
00750         client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
00751         after_topic += 2;
00752       } else {
00753         client->inpub_pkt_id = 0;
00754       }
00755       /* Take backup of byte after topic */
00756       bkp = topic[topic_len];
00757       /* Zero terminate string */
00758       topic[topic_len] = 0;
00759       /* Payload data remaining in receive buffer */
00760       payload_length = length - after_topic;
00761       payload_offset = after_topic;
00762 
00763       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %"U32_F"\n",
00764                                      qos, topic, remaining_length + payload_length));
00765       if (client->pub_cb != NULL) {
00766         client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
00767       }
00768       /* Restore byte after topic */
00769       topic[topic_len] = bkp;
00770     }
00771     if (payload_length > 0 || remaining_length == 0) {
00772       if (length < (size_t)(payload_offset + payload_length)) {
00773         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short packet (payload)\n"));
00774         goto out_disconnect;
00775       }
00776       client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
00777       /* Reply if QoS > 0 */
00778       if (remaining_length == 0 && qos > 0) {
00779         /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
00780         u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
00781         LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
00782                                        mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
00783         pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
00784       }
00785     }
00786   } else {
00787     /* Get packet identifier */
00788     pkt_id = (u16_t)var_hdr_payload[0] << 8;
00789     pkt_id |= (u16_t)var_hdr_payload[1];
00790     if (pkt_id == 0) {
00791       LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
00792       goto out_disconnect;
00793     }
00794     if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
00795       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n", pkt_id));
00796       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
00797 
00798     } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
00799       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n", pkt_id));
00800       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
00801 
00802     } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
00803                pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
00804       struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
00805       if (r != NULL) {
00806         LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
00807         if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
00808           if (length < 3) {
00809             LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: To small SUBACK packet\n"));
00810             goto out_disconnect;
00811           } else {
00812             mqtt_incomming_suback(r, var_hdr_payload[2]);
00813           }
00814         } else if (r->cb != NULL) {
00815           r->cb(r->arg, ERR_OK);
00816         }
00817         mqtt_delete_request(r);
00818       } else {
00819         LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
00820       }
00821     } else {
00822       LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
00823       goto out_disconnect;
00824     }
00825   }
00826   return res;
00827 out_disconnect:
00828   return MQTT_CONNECT_DISCONNECTED;
00829 }
00830 
00831 
00832 /**
00833  * MQTT incoming message parser
00834  * @param client MQTT client
00835  * @param p PBUF chain of received data
00836  * @return Connection status
00837  */
00838 static mqtt_connection_status_t
00839 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
00840 {
00841   u16_t in_offset = 0;
00842   u32_t msg_rem_len = 0;
00843   u8_t fixed_hdr_idx = 0;
00844   u8_t b = 0;
00845 
00846   while (p->tot_len > in_offset) {
00847     /* We ALWAYS parse the header here first. Even if the header was not
00848        included in this segment, we re-parse it here by buffering it in
00849        client->rx_buffer. client->msg_idx keeps track of this. */
00850     if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) {
00851 
00852       if (fixed_hdr_idx < client->msg_idx) {
00853         /* parse header from old pbuf (buffered in client->rx_buffer) */
00854         b = client->rx_buffer[fixed_hdr_idx];
00855       } else {
00856         /* parse header from this pbuf and save it in client->rx_buffer in case
00857            it comes in segmented */
00858         b = pbuf_get_at(p, in_offset++);
00859         client->rx_buffer[client->msg_idx++] = b;
00860       }
00861       fixed_hdr_idx++;
00862 
00863       if (fixed_hdr_idx >= 2) {
00864         /* fixed header contains at least 2 bytes but can contain more, depending on
00865            'remaining length'. All bytes but the last of this have 0x80 set to
00866            indicate more bytes are coming. */
00867         msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7);
00868         if ((b & 0x80) == 0) {
00869           /* fixed header is done */
00870           LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len));
00871           if (msg_rem_len == 0) {
00872             /* Complete message with no extra headers of payload received */
00873             mqtt_message_received(client, fixed_hdr_idx, 0, 0);
00874             client->msg_idx = 0;
00875             fixed_hdr_idx = 0;
00876           } else {
00877             /* Bytes remaining in message (changes remaining length if this is
00878                not the first segment of this message) */
00879             msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx;
00880           }
00881         }
00882       }
00883     } else {
00884       /* Fixed header has been parsed, parse variable header */
00885       u16_t cpy_len, cpy_start, buffer_space;
00886 
00887       cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
00888 
00889       /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
00890       cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
00891 
00892       /* Limit to available space in buffer */
00893       buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start;
00894       if (cpy_len > buffer_space) {
00895         cpy_len = buffer_space;
00896       }
00897       pbuf_copy_partial(p, client->rx_buffer + cpy_start, cpy_len, in_offset);
00898 
00899       /* Advance get and put indexes  */
00900       client->msg_idx += cpy_len;
00901       in_offset += cpy_len;
00902       msg_rem_len -= cpy_len;
00903 
00904       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len));
00905       if ((msg_rem_len == 0) || (cpy_len == buffer_space)) {
00906         /* Whole message received or buffer is full */
00907         mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len);
00908         if (res != MQTT_CONNECT_ACCEPTED) {
00909           return res;
00910         }
00911         if (msg_rem_len == 0) {
00912           /* Reset parser state */
00913           client->msg_idx = 0;
00914           /* msg_tot_len = 0; */
00915           fixed_hdr_idx = 0;
00916         }
00917       }
00918     }
00919   }
00920   return MQTT_CONNECT_ACCEPTED;
00921 }
00922 
00923 
00924 /**
00925  * TCP received callback function. @see tcp_recv_fn
00926  * @param arg MQTT client
00927  * @param p PBUF chain of received data
00928  * @param err Passed as return value if not ERR_OK
00929  * @return ERR_OK or err passed into callback
00930  */
00931 static err_t
00932 mqtt_tcp_recv_cb(void *arg, struct altcp_pcb *pcb, struct pbuf *p, err_t err)
00933 {
00934   mqtt_client_t *client = (mqtt_client_t *)arg;
00935   LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
00936   LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
00937 
00938   if (p == NULL) {
00939     LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
00940     mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
00941   } else {
00942     mqtt_connection_status_t res;
00943     if (err != ERR_OK) {
00944       LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_recv_cb: Recv err=%d\n", err));
00945       pbuf_free(p);
00946       return err;
00947     }
00948 
00949     /* Tell remote that data has been received */
00950     altcp_recved (pcb, p->tot_len);
00951     res = mqtt_parse_incoming(client, p);
00952     pbuf_free(p);
00953 
00954     if (res != MQTT_CONNECT_ACCEPTED) {
00955       mqtt_close(client, res);
00956     }
00957     /* If keep alive functionality is used */
00958     if (client->keep_alive != 0) {
00959       /* Reset server alive watchdog */
00960       client->server_watchdog = 0;
00961     }
00962 
00963   }
00964   return ERR_OK;
00965 }
00966 
00967 
00968 /**
00969  * TCP data sent callback function. @see tcp_sent_fn
00970  * @param arg MQTT client
00971  * @param tpcb TCP connection handle
00972  * @param len Number of bytes sent
00973  * @return ERR_OK
00974  */
00975 static err_t
00976 mqtt_tcp_sent_cb(void *arg, struct altcp_pcb *tpcb, u16_t len)
00977 {
00978   mqtt_client_t *client = (mqtt_client_t *)arg;
00979 
00980   LWIP_UNUSED_ARG(tpcb);
00981   LWIP_UNUSED_ARG(len);
00982 
00983   if (client->conn_state == MQTT_CONNECTED) {
00984     struct mqtt_request_t *r;
00985 
00986     /* Reset keep-alive send timer and server watchdog */
00987     client->cyclic_tick = 0;
00988     client->server_watchdog = 0;
00989     /* QoS 0 publish has no response from server, so call its callbacks here */
00990     while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
00991       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
00992       if (r->cb != NULL) {
00993         r->cb(r->arg, ERR_OK);
00994       }
00995       mqtt_delete_request(r);
00996     }
00997     /* Try send any remaining buffers from output queue */
00998     mqtt_output_send(&client->output, client->conn);
00999   }
01000   return ERR_OK;
01001 }
01002 
01003 /**
01004  * TCP error callback function. @see tcp_err_fn
01005  * @param arg MQTT client
01006  * @param err Error encountered
01007  */
01008 static void
01009 mqtt_tcp_err_cb(void *arg, err_t err)
01010 {
01011   mqtt_client_t *client = (mqtt_client_t *)arg;
01012   LWIP_UNUSED_ARG(err); /* only used for debug output */
01013   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
01014   LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
01015   /* Set conn to null before calling close as pcb is already deallocated*/
01016   client->conn = 0;
01017   mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
01018 }
01019 
01020 /**
01021  * TCP poll callback function. @see tcp_poll_fn
01022  * @param arg MQTT client
01023  * @param tpcb TCP connection handle
01024  * @return err ERR_OK
01025  */
01026 static err_t
01027 mqtt_tcp_poll_cb(void *arg, struct altcp_pcb *tpcb)
01028 {
01029   mqtt_client_t *client = (mqtt_client_t *)arg;
01030   if (client->conn_state == MQTT_CONNECTED) {
01031     /* Try send any remaining buffers from output queue */
01032     mqtt_output_send(&client->output, tpcb);
01033   }
01034   return ERR_OK;
01035 }
01036 
01037 /**
01038  * TCP connect callback function. @see tcp_connected_fn
01039  * @param arg MQTT client
01040  * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
01041  * @return ERR_OK
01042  */
01043 static err_t
01044 mqtt_tcp_connect_cb(void *arg, struct altcp_pcb *tpcb, err_t err)
01045 {
01046   mqtt_client_t *client = (mqtt_client_t *)arg;
01047 
01048   if (err != ERR_OK) {
01049     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
01050     return err;
01051   }
01052 
01053   /* Initiate receiver state */
01054   client->msg_idx = 0;
01055 
01056   /* Setup TCP callbacks */
01057   altcp_recv (tpcb, mqtt_tcp_recv_cb);
01058   altcp_sent (tpcb, mqtt_tcp_sent_cb);
01059   altcp_poll (tpcb, mqtt_tcp_poll_cb, 2);
01060 
01061   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_connect_cb: TCP connection established to server\n"));
01062   /* Enter MQTT connect state */
01063   client->conn_state = MQTT_CONNECTING;
01064 
01065   /* Start cyclic timer */
01066   sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, client);
01067   client->cyclic_tick = 0;
01068 
01069   /* Start transmission from output queue, connect message is the first one out*/
01070   mqtt_output_send(&client->output, client->conn);
01071 
01072   return ERR_OK;
01073 }
01074 
01075 
01076 
01077 /*---------------------------------------------------------------------------------------------------- */
01078 /* Public API */
01079 
01080 
01081 /**
01082  * @ingroup mqtt
01083  * MQTT publish function.
01084  * @param client MQTT client
01085  * @param topic Publish topic string
01086  * @param payload Data to publish (NULL is allowed)
01087  * @param payload_length Length of payload (0 is allowed)
01088  * @param qos Quality of service, 0 1 or 2
01089  * @param retain MQTT retain flag
01090  * @param cb Callback to call when publish is complete or has timed out
01091  * @param arg User supplied argument to publish callback
01092  * @return ERR_OK if successful
01093  *         ERR_CONN if client is disconnected
01094  *         ERR_MEM if short on memory
01095  */
01096 err_t
01097 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
01098              mqtt_request_cb_t cb, void *arg)
01099 {
01100   struct mqtt_request_t *r;
01101   u16_t pkt_id;
01102   size_t topic_strlen;
01103   size_t total_len;
01104   u16_t topic_len;
01105   u16_t remaining_length;
01106 
01107   LWIP_ASSERT_CORE_LOCKED();
01108   LWIP_ASSERT("mqtt_publish: client != NULL", client);
01109   LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
01110   LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
01111 
01112   topic_strlen = strlen(topic);
01113   LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
01114   topic_len = (u16_t)topic_strlen;
01115   total_len = 2 + topic_len + payload_length;
01116 
01117   if (qos > 0) {
01118     total_len += 2;
01119     /* Generate pkt_id id for QoS1 and 2 */
01120     pkt_id = msg_generate_packet_id(client);
01121   } else {
01122     /* Use reserved value pkt_id 0 for QoS 0 in request handle */
01123     pkt_id = 0;
01124   }
01125   LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
01126   remaining_length = (u16_t)total_len;
01127 
01128   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
01129 
01130   r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg);
01131   if (r == NULL) {
01132     return ERR_MEM;
01133   }
01134 
01135   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
01136     mqtt_delete_request(r);
01137     return ERR_MEM;
01138   }
01139   /* Append fixed header */
01140   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
01141 
01142   /* Append Topic */
01143   mqtt_output_append_string(&client->output, topic, topic_len);
01144 
01145   /* Append packet if for QoS 1 and 2*/
01146   if (qos > 0) {
01147     mqtt_output_append_u16(&client->output, pkt_id);
01148   }
01149 
01150   /* Append optional publish payload */
01151   if ((payload != NULL) && (payload_length > 0)) {
01152     mqtt_output_append_buf(&client->output, payload, payload_length);
01153   }
01154 
01155   mqtt_append_request(&client->pend_req_queue, r);
01156   mqtt_output_send(&client->output, client->conn);
01157   return ERR_OK;
01158 }
01159 
01160 
01161 /**
01162  * @ingroup mqtt
01163  * MQTT subscribe/unsubscribe function.
01164  * @param client MQTT client
01165  * @param topic topic to subscribe to
01166  * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
01167  * @param cb Callback to call when subscribe/unsubscribe reponse is received
01168  * @param arg User supplied argument to publish callback
01169  * @param sub 1 for subscribe, 0 for unsubscribe
01170  * @return ERR_OK if successful, @see err_t enum for other results
01171  */
01172 err_t
01173 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
01174 {
01175   size_t topic_strlen;
01176   size_t total_len;
01177   u16_t topic_len;
01178   u16_t remaining_length;
01179   u16_t pkt_id;
01180   struct mqtt_request_t *r;
01181 
01182   LWIP_ASSERT_CORE_LOCKED();
01183   LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
01184   LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
01185 
01186   topic_strlen = strlen(topic);
01187   LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
01188   topic_len = (u16_t)topic_strlen;
01189   /* Topic string, pkt_id, qos for subscribe */
01190   total_len =  topic_len + 2 + 2 + (sub != 0);
01191   LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
01192   remaining_length = (u16_t)total_len;
01193 
01194   LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
01195   if (client->conn_state == TCP_DISCONNECTED) {
01196     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
01197     return ERR_CONN;
01198   }
01199 
01200   pkt_id = msg_generate_packet_id(client);
01201   r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg);
01202   if (r == NULL) {
01203     return ERR_MEM;
01204   }
01205 
01206   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
01207     mqtt_delete_request(r);
01208     return ERR_MEM;
01209   }
01210 
01211   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
01212 
01213   mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
01214   /* Packet id */
01215   mqtt_output_append_u16(&client->output, pkt_id);
01216   /* Topic */
01217   mqtt_output_append_string(&client->output, topic, topic_len);
01218   /* QoS */
01219   if (sub != 0) {
01220     mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
01221   }
01222 
01223   mqtt_append_request(&client->pend_req_queue, r);
01224   mqtt_output_send(&client->output, client->conn);
01225   return ERR_OK;
01226 }
01227 
01228 
01229 /**
01230  * @ingroup mqtt
01231  * Set callback to handle incoming publish requests from server
01232  * @param client MQTT client
01233  * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload
01234  * @param data_cb Callback for each fragment of payload that arrives
01235  * @param arg User supplied argument to both callbacks
01236  */
01237 void
01238 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
01239                         mqtt_incoming_data_cb_t data_cb, void *arg)
01240 {
01241   LWIP_ASSERT_CORE_LOCKED();
01242   LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
01243   client->data_cb = data_cb;
01244   client->pub_cb = pub_cb;
01245   client->inpub_arg = arg;
01246 }
01247 
01248 /**
01249  * @ingroup mqtt
01250  * Create a new MQTT client instance
01251  * @return Pointer to instance on success, NULL otherwise
01252  */
01253 mqtt_client_t *
01254 mqtt_client_new(void)
01255 {
01256   LWIP_ASSERT_CORE_LOCKED();
01257   return (mqtt_client_t *)mem_calloc(1, sizeof(mqtt_client_t));
01258 }
01259 
01260 /**
01261  * @ingroup mqtt
01262  * Free MQTT client instance
01263  * @param client Pointer to instance to be freed
01264  */
01265 void
01266 mqtt_client_free(mqtt_client_t *client)
01267 {
01268   mem_free(client);
01269 }
01270 
01271 /**
01272  * @ingroup mqtt
01273  * Connect to MQTT server
01274  * @param client MQTT client
01275  * @param ip_addr Server IP
01276  * @param port Server port
01277  * @param cb Connection state change callback
01278  * @param arg User supplied argument to connection callback
01279  * @param client_info Client identification and connection options
01280  * @return ERR_OK if successful, @see err_t enum for other results
01281  */
01282 err_t
01283 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
01284                     const struct mqtt_connect_client_info_t *client_info)
01285 {
01286   err_t err;
01287   size_t len;
01288   u16_t client_id_length;
01289   /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
01290   u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
01291   u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
01292   u16_t client_user_len = 0, client_pass_len = 0;
01293 
01294   LWIP_ASSERT_CORE_LOCKED();
01295   LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
01296   LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
01297   LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
01298   LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
01299 
01300   if (client->conn_state != TCP_DISCONNECTED) {
01301     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Already connected\n"));
01302     return ERR_ISCONN;
01303   }
01304 
01305   /* Wipe clean */
01306   memset(client, 0, sizeof(mqtt_client_t));
01307   client->connect_arg = arg;
01308   client->connect_cb = cb;
01309   client->keep_alive = client_info->keep_alive;
01310   mqtt_init_requests(client->req_list, LWIP_ARRAYSIZE(client->req_list));
01311 
01312   /* Build connect message */
01313   if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
01314     flags |= MQTT_CONNECT_FLAG_WILL;
01315     flags |= (client_info->will_qos & 3) << 3;
01316     if (client_info->will_retain) {
01317       flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
01318     }
01319     len = strlen(client_info->will_topic);
01320     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
01321     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
01322     will_topic_len = (u8_t)len;
01323     len = strlen(client_info->will_msg);
01324     LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
01325     will_msg_len = (u8_t)len;
01326     len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
01327     LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
01328     remaining_length = (u16_t)len;
01329   }
01330   if (client_info->client_user != NULL) {
01331     flags |= MQTT_CONNECT_FLAG_USERNAME;
01332     len = strlen(client_info->client_user);
01333     LWIP_ERROR("mqtt_client_connect: client_info->client_user length overflow", len <= 0xFFFF, return ERR_VAL);
01334     LWIP_ERROR("mqtt_client_connect: client_info->client_user length must be > 0", len > 0, return ERR_VAL);
01335     client_user_len = (u16_t)len;
01336     len = remaining_length + 2 + client_user_len;
01337     LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
01338     remaining_length = (u16_t)len;
01339   }
01340   if (client_info->client_pass != NULL) {
01341     flags |= MQTT_CONNECT_FLAG_PASSWORD;
01342     len = strlen(client_info->client_pass);
01343     LWIP_ERROR("mqtt_client_connect: client_info->client_pass length overflow", len <= 0xFFFF, return ERR_VAL);
01344     LWIP_ERROR("mqtt_client_connect: client_info->client_pass length must be > 0", len > 0, return ERR_VAL);
01345     client_pass_len = (u16_t)len;
01346     len = remaining_length + 2 + client_pass_len;
01347     LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
01348     remaining_length = (u16_t)len;
01349   }
01350 
01351   /* Don't complicate things, always connect using clean session */
01352   flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
01353 
01354   len = strlen(client_info->client_id);
01355   LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
01356   client_id_length = (u16_t)len;
01357   len = remaining_length + 2 + client_id_length;
01358   LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
01359   remaining_length = (u16_t)len;
01360 
01361   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
01362     return ERR_MEM;
01363   }
01364 
01365 #if LWIP_ALTCP && LWIP_ALTCP_TLS
01366   if (client_info->tls_config) {
01367     client->conn = altcp_tls_new(client_info->tls_config, IP_GET_TYPE(ip_addr));
01368   } else
01369 #endif
01370   {
01371     client->conn = altcp_tcp_new_ip_type(IP_GET_TYPE(ip_addr));
01372   }
01373   if (client->conn == NULL) {
01374     return ERR_MEM;
01375   }
01376 
01377   /* Set arg pointer for callbacks */
01378   altcp_arg (client->conn, client);
01379   /* Any local address, pick random local port number */
01380   err = altcp_bind (client->conn, IP_ADDR_ANY, 0);
01381   if (err != ERR_OK) {
01382     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
01383     goto tcp_fail;
01384   }
01385   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
01386 
01387   /* Connect to server */
01388   err = altcp_connect (client->conn, ip_addr, port, mqtt_tcp_connect_cb);
01389   if (err != ERR_OK) {
01390     LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
01391     goto tcp_fail;
01392   }
01393   /* Set error callback */
01394   altcp_err (client->conn, mqtt_tcp_err_cb);
01395   client->conn_state = TCP_CONNECTING;
01396 
01397   /* Append fixed header */
01398   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
01399   /* Append Protocol string */
01400   mqtt_output_append_string(&client->output, "MQTT", 4);
01401   /* Append Protocol level */
01402   mqtt_output_append_u8(&client->output, 4);
01403   /* Append connect flags */
01404   mqtt_output_append_u8(&client->output, flags);
01405   /* Append keep-alive */
01406   mqtt_output_append_u16(&client->output, client_info->keep_alive);
01407   /* Append client id */
01408   mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
01409   /* Append will message if used */
01410   if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
01411     mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
01412     mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
01413   }
01414   /* Append user name if given */
01415   if ((flags & MQTT_CONNECT_FLAG_USERNAME) != 0) {
01416     mqtt_output_append_string(&client->output, client_info->client_user, client_user_len);
01417   }
01418   /* Append password if given */
01419   if ((flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) {
01420     mqtt_output_append_string(&client->output, client_info->client_pass, client_pass_len);
01421   }
01422   return ERR_OK;
01423 
01424 tcp_fail:
01425   altcp_abort (client->conn);
01426   client->conn = NULL;
01427   return err;
01428 }
01429 
01430 
01431 /**
01432  * @ingroup mqtt
01433  * Disconnect from MQTT server
01434  * @param client MQTT client
01435  */
01436 void
01437 mqtt_disconnect(mqtt_client_t *client)
01438 {
01439   LWIP_ASSERT_CORE_LOCKED();
01440   LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
01441   /* If connection in not already closed */
01442   if (client->conn_state != TCP_DISCONNECTED) {
01443     /* Set conn_state before calling mqtt_close to prevent callback from being called */
01444     client->conn_state = TCP_DISCONNECTED;
01445     mqtt_close(client, (mqtt_connection_status_t)0);
01446   }
01447 }
01448 
01449 /**
01450  * @ingroup mqtt
01451  * Check connection with server
01452  * @param client MQTT client
01453  * @return 1 if connected to server, 0 otherwise
01454  */
01455 u8_t
01456 mqtt_client_is_connected(mqtt_client_t *client)
01457 {
01458   LWIP_ASSERT_CORE_LOCKED();
01459   LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
01460   return client->conn_state == MQTT_CONNECTED;
01461 }
01462 
01463 #endif /* LWIP_TCP && LWIP_CALLBACK_API */