Daiki Kato / mbed-os-lychee

Dependents:   mbed-os-example-blinky-gr-lychee GR-Boads_Camera_sample GR-Boards_Audio_Recoder GR-Boads_Camera_DisplayApp ... more

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers lwip_mqtt.c Source File

lwip_mqtt.c

Go to the documentation of this file.
00001 /**
00002  * @file
00003  * MQTT client
00004  *
00005  * @defgroup mqtt MQTT client
00006  * @ingroup apps
00007  * @verbinclude mqtt_client.txt
00008  */
00009 
00010 /*
00011  * Copyright (c) 2016 Erik Andersson <erian747@gmail.com>
00012  * All rights reserved.
00013  *
00014  * Redistribution and use in source and binary forms, with or without modification,
00015  * are permitted provided that the following conditions are met:
00016  *
00017  * 1. Redistributions of source code must retain the above copyright notice,
00018  *    this list of conditions and the following disclaimer.
00019  * 2. Redistributions in binary form must reproduce the above copyright notice,
00020  *    this list of conditions and the following disclaimer in the documentation
00021  *    and/or other materials provided with the distribution.
00022  * 3. The name of the author may not be used to endorse or promote products
00023  *    derived from this software without specific prior written permission.
00024  *
00025  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
00026  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
00027  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
00028  * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00029  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
00030  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00031  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00032  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
00033  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
00034  * OF SUCH DAMAGE.
00035  *
00036  * This file is part of the lwIP TCP/IP stack
00037  *
00038  * Author: Erik Andersson <erian747@gmail.com>
00039  *
00040  *
00041  * @todo:
00042  * - Handle large outgoing payloads for PUBLISH messages
00043  * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
00044  * - Add support for legacy MQTT protocol version
00045  *
00046  * Please coordinate changes and requests with Erik Andersson
00047  * Erik Andersson <erian747@gmail.com>
00048  *
00049  */
00050 #include <string.h>
00051 #include "lwip/timeouts.h"
00052 #include "lwip/ip_addr.h"
00053 #include "lwip/mem.h"
00054 #include "lwip/err.h"
00055 #include "lwip/pbuf.h"
00056 #include "lwip/tcp.h"
00057 #include "lwip/apps/mqtt.h"
00058 
/**
 * MQTT_DEBUG: Debug flags for this module.
 * Falls back to LWIP_DBG_OFF when the build has not defined it.
 */
#if !defined(MQTT_DEBUG) || defined(__DOXYGEN__)
#define MQTT_DEBUG                  LWIP_DBG_OFF
#endif

/* Debug categories passed to LWIP_DEBUGF throughout this file */
#define MQTT_DEBUG_TRACE        (MQTT_DEBUG | LWIP_DBG_TRACE)
#define MQTT_DEBUG_STATE        (MQTT_DEBUG | LWIP_DBG_STATE)
#define MQTT_DEBUG_WARN         (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
#define MQTT_DEBUG_WARN_STATE   (MQTT_DEBUG_WARN | LWIP_DBG_STATE)
#define MQTT_DEBUG_SERIOUS      (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)

/* Declared up front because mqtt_close() cancels this timer before its definition */
static void mqtt_cyclic_timer(void *arg);
00073 
/**
 * Connection states of the MQTT client (stored in client->conn_state)
 */
enum {
  TCP_DISCONNECTED = 0,
  TCP_CONNECTING   = 1,
  MQTT_CONNECTING  = 2,
  MQTT_CONNECTED   = 3
};
00083 
/**
 * MQTT control packet types.
 * Values are consecutive, starting at 1 for CONNECT and ending at 14 for DISCONNECT.
 */
enum mqtt_message_type {
  MQTT_MSG_TYPE_CONNECT = 1,
  MQTT_MSG_TYPE_CONNACK,
  MQTT_MSG_TYPE_PUBLISH,
  MQTT_MSG_TYPE_PUBACK,
  MQTT_MSG_TYPE_PUBREC,
  MQTT_MSG_TYPE_PUBREL,
  MQTT_MSG_TYPE_PUBCOMP,
  MQTT_MSG_TYPE_SUBSCRIBE,
  MQTT_MSG_TYPE_SUBACK,
  MQTT_MSG_TYPE_UNSUBSCRIBE,
  MQTT_MSG_TYPE_UNSUBACK,
  MQTT_MSG_TYPE_PINGREQ,
  MQTT_MSG_TYPE_PINGRESP,
  MQTT_MSG_TYPE_DISCONNECT
};
00103 
/**
 * Helpers to extract control packet type (upper nibble) and QoS (bits 1..2)
 * from the first byte of the fixed header.
 * The argument is parenthesized so that compound expressions such as `a | b`
 * are masked as a whole.
 */
#define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0xf0) >> 4)
#define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0x6) >> 1)
00107 
/**
 * Bit masks for the connect flags byte; only used in the CONNECT message
 */
enum mqtt_connect_flag {
  MQTT_CONNECT_FLAG_USERNAME      = 0x80, /* bit 7 */
  MQTT_CONNECT_FLAG_PASSWORD      = 0x40, /* bit 6 */
  MQTT_CONNECT_FLAG_WILL_RETAIN   = 0x20, /* bit 5 */
  MQTT_CONNECT_FLAG_WILL          = 0x04, /* bit 2 */
  MQTT_CONNECT_FLAG_CLEAN_SESSION = 0x02  /* bit 1 */
};
00118 
00119 
00120 #if defined(LWIP_DEBUG)
/* Printable names indexed by control packet type value; index 0 is the fallback entry */
static const char * const mqtt_message_type_str[15] =
{
  "UNDEFINED",                       /*  0 */
  "CONNECT",     "CONNACK",          /*  1,  2 */
  "PUBLISH",     "PUBACK",           /*  3,  4 */
  "PUBREC",      "PUBREL",           /*  5,  6 */
  "PUBCOMP",                         /*  7 */
  "SUBSCRIBE",   "SUBACK",           /*  8,  9 */
  "UNSUBSCRIBE", "UNSUBACK",         /* 10, 11 */
  "PINGREQ",     "PINGRESP",         /* 12, 13 */
  "DISCONNECT"                       /* 14 */
};
00139 
00140 /**
00141  * Message type value to string
00142  * @param msg_type see enum mqtt_message_type
00143  * 
00144  * @return Control message type text string
00145  */
00146 static const char *
00147 mqtt_msg_type_to_str(u8_t msg_type)
00148 {
00149   if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) {
00150     msg_type = 0;
00151   }
00152   return mqtt_message_type_str[msg_type];
00153 }
00154 
00155 #endif
00156 
00157 
00158 /**
00159  * Generate MQTT packet identifier
00160  * @param client MQTT client
00161  * @return New packet identifier, range 1 to 65535
00162  */
00163 static u16_t
00164 msg_generate_packet_id(mqtt_client_t *client)
00165 {
00166   client->pkt_id_seq++;
00167   if (client->pkt_id_seq == 0) {
00168     client->pkt_id_seq++;
00169   }
00170   return client->pkt_id_seq;
00171 }
00172 
00173 /*--------------------------------------------------------------------------------------------------------------------- */
00174 /* Output ring buffer */
00175 
00176 
/*
 * The put/get counters are free-running; they are reduced to a buffer index by
 * masking, which presumes MQTT_OUTPUT_RINGBUF_SIZE is a power of two.
 * Every expansion is fully parenthesized so the macros can be used inside
 * larger expressions.
 */
#define MQTT_RINGBUF_IDX_MASK ((MQTT_OUTPUT_RINGBUF_SIZE) - 1)

/** Add single item to ring buffer (no free-space check is performed here) */
#define mqtt_ringbuf_put(rb, item) (((rb)->buf)[(rb)->put++ & MQTT_RINGBUF_IDX_MASK] = (item))

/** Return number of bytes in ring buffer */
#define mqtt_ringbuf_len(rb) ((u16_t)((rb)->put - (rb)->get))

/** Return number of bytes free in ring buffer */
#define mqtt_ringbuf_free(rb) ((MQTT_OUTPUT_RINGBUF_SIZE) - mqtt_ringbuf_len(rb))

/** Return number of bytes possible to read without wrapping around */
#define mqtt_ringbuf_linear_read_length(rb) (LWIP_MIN(mqtt_ringbuf_len(rb), ((MQTT_OUTPUT_RINGBUF_SIZE) - ((rb)->get & MQTT_RINGBUF_IDX_MASK))))

/** Return pointer to ring buffer get position */
#define mqtt_ringbuf_get_ptr(rb) (&(rb)->buf[(rb)->get & MQTT_RINGBUF_IDX_MASK])

/** Mark len bytes as consumed by advancing the get counter */
#define mqtt_ringbuf_advance_get_idx(rb, len) ((rb)->get += (len))
00195 
00196 
00197 /**
00198  * Try send as many bytes as possible from output ring buffer
00199  * @param rb Output ring buffer
00200  * @param tpcb TCP connection handle
00201  */
00202 static void
00203 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
00204 {
00205   err_t err;
00206   u8_t wrap = 0;
00207   u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
00208   u16_t send_len = tcp_sndbuf(tpcb);
00209   LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
00210 
00211   if (send_len == 0 || ringbuf_lin_len == 0) {
00212     return;
00213   }
00214 
00215   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
00216                                 send_len, ringbuf_lin_len, ((rb)->get & MQTT_RINGBUF_IDX_MASK), ((rb)->put & MQTT_RINGBUF_IDX_MASK)));
00217 
00218   if (send_len > ringbuf_lin_len) {
00219     /* Space in TCP output buffer is larger than available in ring buffer linear portion */
00220     send_len = ringbuf_lin_len;
00221     /* Wrap around if more data in ring buffer after linear portion */
00222     wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
00223   }
00224   err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
00225   if ((err == ERR_OK) && wrap) {
00226     mqtt_ringbuf_advance_get_idx(rb, send_len);
00227     /* Use the lesser one of ring buffer linear length and TCP send buffer size */
00228     send_len = LWIP_MIN(tcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
00229     err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
00230   }
00231 
00232   if (err == ERR_OK) {
00233     mqtt_ringbuf_advance_get_idx(rb, send_len);
00234     /* Flush */
00235     tcp_output(tpcb);
00236   } else {
00237     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
00238   }
00239 }
00240 
00241 
00242 
00243 /*--------------------------------------------------------------------------------------------------------------------- */
00244 /* Request queue */
00245 
00246 /**
00247  * Create request item
00248  * @param r_objs Pointer to request objects
00249  * @param pkt_id Packet identifier of request
00250  * @param cb Packet callback to call when requests lifetime ends
00251  * @param arg Parameter following callback
00252  * @return Request or NULL if failed to create
00253  */
00254 static struct mqtt_request_t *
00255 mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
00256 {
00257   struct mqtt_request_t *r = NULL;
00258   u8_t n;
00259   LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
00260   for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT && r == NULL; n++) {
00261     /* Item point to itself if not in use */
00262     if (r_objs[n].next == &r_objs[n]) {
00263       r = &r_objs[n];
00264       r->next = NULL;
00265       r->cb = cb;
00266       r->arg = arg;
00267       r->pkt_id = pkt_id;
00268     }
00269   }
00270   return r;
00271 }
00272 
00273 
00274 /**
00275  * Append request to pending request queue
00276  * @param tail Pointer to request queue tail pointer
00277  * @param r Request to append
00278  */
00279 static void
00280 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
00281 {
00282   struct mqtt_request_t *head = NULL;
00283   s16_t time_before = 0;
00284   struct mqtt_request_t *iter = *tail;
00285 
00286   LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
00287 
00288   /* Iterate trough queue to find head, and count total timeout time */
00289   for (iter = *tail; iter != NULL; iter = iter->next) {
00290     time_before += iter->timeout_diff;
00291     head = iter;
00292   }
00293 
00294   LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
00295   r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
00296   if (head == NULL) {
00297     *tail = r;
00298   } else {
00299     head->next = r;
00300   }
00301 }
00302 
00303 
00304 /**
00305  * Delete request item
00306  * @param r Request item to delete
00307  */
00308 static void
00309 mqtt_delete_request(struct mqtt_request_t *r)
00310 {
00311   if (r != NULL) {
00312     r->next = r;
00313   }
00314 }
00315 
00316 /**
00317  * Remove a request item with a specific packet identifier from request queue
00318  * @param tail Pointer to request queue tail pointer
00319  * @param pkt_id Packet identifier of request to take
00320  * @return Request item if found, NULL if not
00321  */
00322 static struct mqtt_request_t *
00323 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
00324 {
00325   struct mqtt_request_t *iter = NULL, *prev = NULL;
00326   LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
00327   /* Search all request for pkt_id */
00328   for (iter = *tail; iter != NULL; iter = iter->next) {
00329     if (iter->pkt_id == pkt_id) {
00330       break;
00331     }
00332     prev = iter;
00333   }
00334 
00335   /* If request was found */
00336   if (iter != NULL) {
00337     /* unchain */
00338     if (prev == NULL) {
00339       *tail= iter->next;
00340     } else {
00341       prev->next = iter->next;
00342     }
00343     /* If exists, add remaining timeout time for the request to next */
00344     if (iter->next != NULL) {
00345       iter->next->timeout_diff += iter->timeout_diff;
00346     }
00347     iter->next = NULL;
00348   }
00349   return iter;
00350 }
00351 
00352 /**
00353  * Handle requests timeout
00354  * @param tail Pointer to request queue tail pointer
00355  * @param t Time since last call in seconds
00356  */
00357 static void
00358 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
00359 {
00360   struct mqtt_request_t *r = *tail;
00361   LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
00362   while (t > 0 && r != NULL) {
00363     if (t >= r->timeout_diff) {
00364       t -= (u8_t)r->timeout_diff;
00365       /* Unchain */
00366       *tail = r->next;
00367       /* Notify upper layer about timeout */
00368       if (r->cb != NULL) {
00369         r->cb(r->arg, ERR_TIMEOUT);
00370       }
00371       mqtt_delete_request(r);
00372       /* Tail might be be modified in callback, so re-read it in every iteration */
00373       r = *(struct mqtt_request_t * const volatile *)tail;
00374     } else {
00375       r->timeout_diff -= t;
00376       t = 0;
00377     }
00378   }
00379 }
00380 
00381 /**
00382  * Free all request items
00383  * @param tail Pointer to request queue tail pointer
00384  */
00385 static void
00386 mqtt_clear_requests(struct mqtt_request_t **tail)
00387 {
00388   struct mqtt_request_t *iter, *next;
00389   LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
00390   for (iter = *tail; iter != NULL; iter = next) {
00391     next = iter->next;
00392     mqtt_delete_request(iter);
00393   }
00394   *tail = NULL;
00395 }
00396 /**
00397  * Initialize all request items
00398  * @param r_objs Pointer to request objects
00399  */
00400 static void
00401 mqtt_init_requests(struct mqtt_request_t *r_objs)
00402 {
00403   u8_t n;
00404   LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
00405   for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
00406     /* Item pointing to itself indicates unused */
00407     r_objs[n].next = &r_objs[n];
00408   }
00409 }
00410 
00411 /*--------------------------------------------------------------------------------------------------------------------- */
00412 /* Output message build helpers */
00413 
00414 
00415 static void
00416 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
00417 {
00418   mqtt_ringbuf_put(rb, value);
00419 }
00420 
00421 static
00422 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
00423 {
00424   mqtt_ringbuf_put(rb, value >> 8);
00425   mqtt_ringbuf_put(rb, value & 0xff);
00426 }
00427 
00428 static void
00429 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
00430 {
00431   u16_t n;
00432   for (n = 0; n < length; n++) {
00433     mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
00434   }
00435 }
00436 
00437 static void
00438 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
00439 {
00440   u16_t n;
00441   mqtt_ringbuf_put(rb, length >> 8);
00442   mqtt_ringbuf_put(rb, length & 0xff);
00443   for (n = 0; n < length; n++) {
00444     mqtt_ringbuf_put(rb, str[n]);
00445   }
00446 }
00447 
00448 /**
00449  * Append fixed header
00450  * @param rb Output ring buffer
00451  * @param msg_type see enum mqtt_message_type
00452  * @param dup MQTT DUP flag
00453  * @param qos MQTT QoS field
00454  * @param retain MQTT retain flag
00455  * @param r_length Remaining length after fixed header
00456  */
00457 
00458 static void
00459 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup,
00460                  u8_t qos, u8_t retain, u16_t r_length)
00461 {
00462   /* Start with control byte */
00463   mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1)));
00464   /* Encode remaining length field */
00465   do {
00466     mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
00467     r_length >>= 7;
00468   } while (r_length > 0);
00469 }
00470 
00471 
00472 /**
00473  * Check output buffer space
00474  * @param rb Output ring buffer
00475  * @param r_length Remaining length after fixed header
00476  * @return 1 if message will fit, 0 if not enough buffer space
00477  */
00478 static u8_t
00479 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
00480 {
00481   /* Start with length of type byte + remaining length */
00482   u16_t total_len = 1 + r_length;
00483 
00484   LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
00485 
00486  /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
00487   do {
00488     total_len++;
00489     r_length >>= 7;
00490   } while (r_length > 0);
00491 
00492   return (total_len <= mqtt_ringbuf_free(rb));
00493 }
00494 
00495 
00496 /**
00497  * Close connection to server
00498  * @param client MQTT client
00499  * @param reason Reason for disconnection
00500  */
00501 static void
00502 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
00503 {
00504   LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
00505 
00506   /* Bring down TCP connection if not already done */
00507   if (client->conn != NULL) {
00508     err_t res;
00509     tcp_recv(client->conn, NULL);
00510     tcp_err(client->conn,  NULL);
00511     tcp_sent(client->conn, NULL);
00512     res = tcp_close(client->conn);
00513     if (res != ERR_OK) {
00514       tcp_abort(client->conn);
00515       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_close: Close err=%s\n", lwip_strerr(res)));
00516     }
00517     client->conn = NULL;
00518   }
00519 
00520   /* Remove all pending requests */
00521   mqtt_clear_requests(&client->pend_req_queue);
00522   /* Stop cyclic timer */
00523   sys_untimeout(mqtt_cyclic_timer, client);
00524 
00525   /* Notify upper layer of disconnection if changed state */
00526   if (client->conn_state != TCP_DISCONNECTED) {
00527 
00528     client->conn_state = TCP_DISCONNECTED;
00529     if (client->connect_cb != NULL) {
00530       client->connect_cb(client, client->connect_arg, reason);
00531     }
00532   }
00533 }
00534 
00535 
00536 /**
00537  * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
00538  * @param arg MQTT client
00539  */
00540 static void
00541 mqtt_cyclic_timer(void *arg)
00542 {
00543   u8_t restart_timer = 1;
00544   mqtt_client_t *client = (mqtt_client_t *)arg;
00545   LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
00546 
00547   if (client->conn_state == MQTT_CONNECTING) {
00548     client->cyclic_tick++;
00549     if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
00550       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
00551       /* Disconnect TCP */
00552       mqtt_close(client, MQTT_CONNECT_TIMEOUT);
00553       restart_timer = 0;
00554     }
00555   } else if (client->conn_state == MQTT_CONNECTED) {
00556     /* Handle timeout for pending requests */
00557     mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
00558 
00559     /* keep_alive > 0 means keep alive functionality shall be used */
00560     if (client->keep_alive > 0) {
00561 
00562       client->server_watchdog++;
00563       /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
00564       if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive/2)) {
00565         LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
00566         mqtt_close(client, MQTT_CONNECT_TIMEOUT);
00567         restart_timer = 0;
00568       }
00569 
00570       /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
00571       if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
00572         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
00573         if (mqtt_output_check_space(&client->output, 0) != 0) {
00574           mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
00575           client->cyclic_tick = 0;
00576         }
00577       } else {
00578         client->cyclic_tick++;
00579       }
00580     }
00581   } else {
00582     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
00583     restart_timer = 0;
00584   }
00585   if (restart_timer) {
00586     sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, arg);
00587   }
00588 }
00589 
00590 
00591 /**
00592  * Send PUBACK, PUBREC or PUBREL response message
00593  * @param client MQTT client
00594  * @param msg PUBACK, PUBREC or PUBREL
00595  * @param pkt_id Packet identifier
00596  * @param qos QoS value
00597  * @return ERR_OK if successful, ERR_MEM if out of memory
00598  */
00599 static err_t
00600 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
00601 {
00602   err_t err = ERR_OK;
00603   if (mqtt_output_check_space(&client->output, 2)) {
00604     mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
00605     mqtt_output_append_u16(&client->output, pkt_id);
00606     mqtt_output_send(&client->output, client->conn);
00607   } else {
00608     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
00609                                   mqtt_msg_type_to_str(msg), pkt_id));
00610     err = ERR_MEM;
00611   }
00612   return err;
00613 }
00614 
00615 /**
00616  * Subscribe response from server
00617  * @param r Matching request
00618  * @param result Result code from server
00619  */
00620 static void
00621 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
00622 {
00623   if (r->cb != NULL) {
00624     r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
00625   }
00626 }
00627 
00628 
00629 /**
00630  * Complete MQTT message received or buffer full
00631  * @param client MQTT client
00632  * @param fixed_hdr_idx header index
00633  * @param length length received part
00634  * @param remaining_length Remaining length of complete message
00635  */
00636 static mqtt_connection_status_t
00637 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length)
00638 {
00639   mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
00640 
00641   u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx;
00642 
00643   /* Control packet type */
00644   u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
00645   u16_t pkt_id = 0;
00646 
00647   if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
00648     if (client->conn_state == MQTT_CONNECTING) {
00649       /* Get result code from CONNACK */
00650       res = (mqtt_connection_status_t)var_hdr_payload[1];
00651       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res));
00652       if (res == MQTT_CONNECT_ACCEPTED) {
00653         /* Reset cyclic_tick when changing to connected state */
00654         client->cyclic_tick = 0;
00655         client->conn_state = MQTT_CONNECTED;
00656         /* Notify upper layer */
00657         if (client->connect_cb != 0) {
00658           client->connect_cb(client, client->connect_arg, res);
00659         }
00660       }
00661     } else {
00662       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Received CONNACK in connected state\n"));
00663     }
00664   } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
00665     LWIP_DEBUGF(MQTT_DEBUG_TRACE,( "mqtt_message_received: Received PINGRESP from server\n"));
00666 
00667   } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
00668     u16_t payload_offset = 0;
00669     u16_t payload_length = length;
00670     u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
00671 
00672     if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
00673       /* Should have topic and pkt id*/
00674       uint8_t *topic;
00675       uint16_t after_topic;
00676       u8_t bkp;
00677       u16_t topic_len = var_hdr_payload[0];
00678       topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
00679 
00680       topic = var_hdr_payload + 2;
00681       after_topic = 2 + topic_len;
00682       /* Check length, add one byte even for QoS 0 so that zero termination will fit */
00683       if ((after_topic + qos ? 2 : 1) > length) {
00684         LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
00685         goto out_disconnect;
00686       }
00687 
00688       /* id for QoS 1 and 2 */
00689       if (qos > 0) {
00690         client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
00691         after_topic += 2;
00692       } else {
00693         client->inpub_pkt_id = 0;
00694       }
00695       /* Take backup of byte after topic */
00696       bkp = topic[topic_len];
00697       /* Zero terminate string */
00698       topic[topic_len] = 0;
00699       /* Payload data remaining in receive buffer */
00700       payload_length = length - after_topic;
00701       payload_offset = after_topic;
00702 
00703       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n",
00704                                     qos, topic, remaining_length + payload_length));
00705       if (client->pub_cb != NULL) {
00706         client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
00707       }
00708       /* Restore byte after topic */
00709       topic[topic_len] = bkp;
00710     }
00711     if (payload_length > 0 || remaining_length == 0) {
00712       client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
00713       /* Reply if QoS > 0 */
00714       if (remaining_length == 0 && qos > 0) {
00715         /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
00716         u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
00717         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
00718                                       mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
00719         pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
00720       }
00721     }
00722   } else {
00723     /* Get packet identifier */
00724     pkt_id = (u16_t)var_hdr_payload[0] << 8;
00725     pkt_id |= (u16_t)var_hdr_payload[1];
00726     if (pkt_id == 0) {
00727       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
00728       goto out_disconnect;
00729     }
00730     if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
00731       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n",pkt_id));
00732       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
00733 
00734     } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
00735       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n",pkt_id));
00736       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
00737 
00738     } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
00739               pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
00740       struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
00741       if (r != NULL) {
00742         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
00743         if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
00744           if (length < 3) {
00745             LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: To small SUBACK packet\n"));
00746             goto out_disconnect;
00747           } else {
00748             mqtt_incomming_suback(r, var_hdr_payload[2]);
00749           }
00750         } else if (r->cb != NULL) {
00751           r->cb(r->arg, ERR_OK);
00752         }
00753         mqtt_delete_request(r);
00754       } else {
00755         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
00756       }
00757     } else {
00758       LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
00759       goto out_disconnect;
00760     }
00761   }
00762   return res;
00763 out_disconnect:
00764   return MQTT_CONNECT_DISCONNECTED;
00765 }
00766 
00767 
00768 /**
00769  * MQTT incoming message parser
00770  * @param client MQTT client
00771  * @param p PBUF chain of received data
00772  * @return Connection status
00773  */
00774 static mqtt_connection_status_t
00775 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
00776 {
00777   u16_t in_offset = 0;
00778   u32_t msg_rem_len = 0;
00779   u8_t fixed_hdr_idx = 0;
00780   u8_t b = 0;
00781 
00782   while (p->tot_len > in_offset) {
00783     if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) {
00784 
00785       if (fixed_hdr_idx < client->msg_idx) {
00786         b = client->rx_buffer[fixed_hdr_idx];
00787       } else {
00788         b = pbuf_get_at(p, in_offset++);
00789         client->rx_buffer[client->msg_idx++] = b;
00790       }
00791       fixed_hdr_idx++;
00792 
00793       if (fixed_hdr_idx >= 2) {
00794         msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7);
00795         if ((b & 0x80) == 0) {
00796           LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len));
00797           if (msg_rem_len == 0) {
00798             /* Complete message with no extra headers of payload received */
00799             mqtt_message_received(client, fixed_hdr_idx, 0, 0);
00800             client->msg_idx = 0;
00801             fixed_hdr_idx = 0;
00802           } else {
00803             /* Bytes remaining in message */
00804             msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx;
00805           }
00806         }
00807       }
00808     } else {
00809       u16_t cpy_len, cpy_start, buffer_space;
00810 
00811       cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
00812 
00813       /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
00814       cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
00815 
00816       /* Limit to available space in buffer */
00817       buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start;
00818       if (cpy_len > buffer_space) {
00819         cpy_len = buffer_space;
00820       }
00821       pbuf_copy_partial(p, client->rx_buffer+cpy_start, cpy_len, in_offset);
00822 
00823       /* Advance get and put indexes  */
00824       client->msg_idx += cpy_len;
00825       in_offset += cpy_len;
00826       msg_rem_len -= cpy_len;
00827 
00828       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len));
00829       if (msg_rem_len == 0 || cpy_len == buffer_space) {
00830         /* Whole message received or buffer is full */
00831         mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len);
00832         if (res != MQTT_CONNECT_ACCEPTED) {
00833           return res;
00834         }
00835         if (msg_rem_len == 0) {
00836           /* Reset parser state */
00837           client->msg_idx = 0;
00838           /* msg_tot_len = 0; */
00839           fixed_hdr_idx = 0;
00840         }
00841       }
00842     }
00843   }
00844   return MQTT_CONNECT_ACCEPTED;
00845 }
00846 
00847 
00848 /**
00849  * TCP received callback function. @see tcp_recv_fn
00850  * @param arg MQTT client
00851  * @param p PBUF chain of received data
00852  * @param err Passed as return value if not ERR_OK
00853  * @return ERR_OK or err passed into callback
00854  */
00855 static err_t
00856 mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
00857 {
00858   mqtt_client_t *client = (mqtt_client_t *)arg;
00859   LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
00860   LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
00861 
00862   if (p == NULL) {
00863     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
00864     mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
00865   } else {
00866     mqtt_connection_status_t res;
00867     if (err != ERR_OK) {
00868       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_recv_cb: Recv err=%d\n", err));
00869       pbuf_free(p);
00870       return err;
00871     }
00872 
00873     /* Tell remote that data has been received */
00874     tcp_recved(pcb, p->tot_len);
00875     res = mqtt_parse_incoming(client, p);
00876     pbuf_free(p);
00877 
00878     if (res != MQTT_CONNECT_ACCEPTED) {
00879       mqtt_close(client, res);
00880     }
00881     /* If keep alive functionality is used */
00882     if (client->keep_alive != 0) {
00883       /* Reset server alive watchdog */
00884       client->server_watchdog = 0;
00885     }
00886 
00887   }
00888   return ERR_OK;
00889 }
00890 
00891 
00892 /**
00893  * TCP data sent callback function. @see tcp_sent_fn
00894  * @param arg MQTT client
00895  * @param tpcb TCP connection handle
00896  * @param len Number of bytes sent
00897  * @return ERR_OK
00898  */
00899 static err_t
00900 mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len)
00901 {
00902   mqtt_client_t *client = (mqtt_client_t *)arg;
00903 
00904   LWIP_UNUSED_ARG(tpcb);
00905   LWIP_UNUSED_ARG(len);
00906 
00907   if (client->conn_state == MQTT_CONNECTED) {
00908     struct mqtt_request_t *r;
00909 
00910     /* Reset keep-alive send timer and server watchdog */
00911     client->cyclic_tick = 0;
00912     client->server_watchdog = 0;
00913     /* QoS 0 publish has no response from server, so call its callbacks here */
00914     while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
00915       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
00916       if (r->cb != NULL) {
00917         r->cb(r->arg, ERR_OK);
00918       }
00919       mqtt_delete_request(r);
00920     }
00921     /* Try send any remaining buffers from output queue */
00922     mqtt_output_send(&client->output, client->conn);
00923   }
00924   return ERR_OK;
00925 }
00926 
00927 /**
00928  * TCP error callback function. @see tcp_err_fn
00929  * @param arg MQTT client
00930  * @param err Error encountered
00931  */
00932 static void
00933 mqtt_tcp_err_cb(void *arg, err_t err)
00934 {
00935   mqtt_client_t *client = (mqtt_client_t *)arg;
00936   LWIP_UNUSED_ARG(err); /* only used for debug output */
00937   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
00938   LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
00939   /* Set conn to null before calling close as pcb is already deallocated*/
00940   client->conn = 0;
00941   mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
00942 }
00943 
00944 /**
00945  * TCP poll callback function. @see tcp_poll_fn
00946  * @param arg MQTT client
00947  * @param tpcb TCP connection handle
00948  * @return err ERR_OK
00949  */
00950 static err_t
00951 mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb)
00952 {
00953   mqtt_client_t *client = (mqtt_client_t *)arg;
00954   if (client->conn_state == MQTT_CONNECTED) {
00955     /* Try send any remaining buffers from output queue */
00956     mqtt_output_send(&client->output, tpcb);
00957   }
00958   return ERR_OK;
00959 }
00960 
00961 /**
00962  * TCP connect callback function. @see tcp_connected_fn
00963  * @param arg MQTT client
00964  * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
00965  * @return ERR_OK
00966  */
00967 static err_t
00968 mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
00969 {
00970   mqtt_client_t* client = (mqtt_client_t *)arg;
00971 
00972   if (err != ERR_OK) {
00973     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
00974     return err;
00975   }
00976 
00977   /* Initiate receiver state */
00978   client->msg_idx = 0;
00979 
00980   /* Setup TCP callbacks */
00981   tcp_recv(tpcb, mqtt_tcp_recv_cb);
00982   tcp_sent(tpcb, mqtt_tcp_sent_cb);
00983   tcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
00984 
00985   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_connect_cb: TCP connection established to server\n"));
00986   /* Enter MQTT connect state */
00987   client->conn_state = MQTT_CONNECTING;
00988 
00989   /* Start cyclic timer */
00990   sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, client);
00991   client->cyclic_tick = 0;
00992 
00993   /* Start transmission from output queue, connect message is the first one out*/
00994   mqtt_output_send(&client->output, client->conn);
00995 
00996   return ERR_OK;
00997 }
00998 
00999 
01000 
01001 /*---------------------------------------------------------------------------------------------------- */
01002 /* Public API */
01003 
01004 
01005 /**
01006  * @ingroup mqtt
01007  * MQTT publish function.
01008  * @param client MQTT client
01009  * @param topic Publish topic string
01010  * @param payload Data to publish (NULL is allowed)
01011  * @param payload_length: Length of payload (0 is allowed)
01012  * @param qos Quality of service, 0 1 or 2
01013  * @param retain MQTT retain flag
01014  * @param cb Callback to call when publish is complete or has timed out
01015  * @param arg User supplied argument to publish callback
01016  * @return ERR_OK if successful
01017  *         ERR_CONN if client is disconnected
01018  *         ERR_MEM if short on memory
01019  */
01020 err_t
01021 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
01022              mqtt_request_cb_t cb, void *arg)
01023 {
01024   struct mqtt_request_t *r;
01025   u16_t pkt_id;
01026   size_t topic_strlen;
01027   size_t total_len;
01028   u16_t topic_len;
01029   u16_t remaining_length;
01030 
01031   LWIP_ASSERT("mqtt_publish: client != NULL", client);
01032   LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
01033   LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
01034 
01035   topic_strlen = strlen(topic);
01036   LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
01037   topic_len = (u16_t)topic_strlen;
01038   total_len = 2 + topic_len + payload_length;
01039   LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
01040   remaining_length = (u16_t)total_len;
01041 
01042   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
01043 
01044   if (qos > 0) {
01045     remaining_length += 2;
01046     /* Generate pkt_id id for QoS1 and 2 */
01047     pkt_id = msg_generate_packet_id(client);
01048   } else {
01049     /* Use reserved value pkt_id 0 for QoS 0 in request handle */
01050     pkt_id = 0;
01051   }
01052 
01053   r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
01054   if (r == NULL) {
01055     return ERR_MEM;
01056   }
01057 
01058   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
01059     mqtt_delete_request(r);
01060     return ERR_MEM;
01061   }
01062   /* Append fixed header */
01063   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
01064 
01065   /* Append Topic */
01066   mqtt_output_append_string(&client->output, topic, topic_len);
01067 
01068   /* Append packet if for QoS 1 and 2*/
01069   if (qos > 0) {
01070     mqtt_output_append_u16(&client->output, pkt_id);
01071   }
01072 
01073   /* Append optional publish payload */
01074   if ((payload != NULL) && (payload_length > 0)) {
01075     mqtt_output_append_buf(&client->output, payload, payload_length);
01076   }
01077 
01078   mqtt_append_request(&client->pend_req_queue, r);
01079   mqtt_output_send(&client->output, client->conn);
01080   return ERR_OK;
01081 }
01082 
01083 
01084 /**
01085  * @ingroup mqtt
01086  * MQTT subscribe/unsubscribe function.
01087  * @param client MQTT client
01088  * @param topic topic to subscribe to
01089  * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
01090  * @param cb Callback to call when subscribe/unsubscribe reponse is received
01091  * @param arg User supplied argument to publish callback
01092  * @param sub 1 for subscribe, 0 for unsubscribe
01093  * @return ERR_OK if successful, @see err_t enum for other results
01094  */
01095 err_t
01096 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
01097 {
01098   size_t topic_strlen;
01099   size_t total_len;
01100   u16_t topic_len;
01101   u16_t remaining_length;
01102   u16_t pkt_id;
01103   struct mqtt_request_t *r;
01104 
01105   LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
01106   LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
01107 
01108   topic_strlen = strlen(topic);
01109   LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
01110   topic_len = (u16_t)topic_strlen;
01111   /* Topic string, pkt_id, qos for subscribe */
01112   total_len =  topic_len + 2 + 2 + (sub != 0);
01113   LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
01114   remaining_length = (u16_t)total_len;
01115 
01116   LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
01117   if (client->conn_state == TCP_DISCONNECTED) {
01118     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
01119     return ERR_CONN;
01120   }
01121 
01122   pkt_id = msg_generate_packet_id(client);
01123   r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
01124   if (r == NULL) {
01125     return ERR_MEM;
01126   }
01127 
01128   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
01129     mqtt_delete_request(r);
01130     return ERR_MEM;
01131   }
01132 
01133   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
01134 
01135   mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
01136   /* Packet id */
01137   mqtt_output_append_u16(&client->output, pkt_id);
01138   /* Topic */
01139   mqtt_output_append_string(&client->output, topic, topic_len);
01140   /* QoS */
01141   if (sub != 0) {
01142     mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
01143   }
01144 
01145   mqtt_append_request(&client->pend_req_queue, r);
01146   mqtt_output_send(&client->output, client->conn);
01147   return ERR_OK;
01148 }
01149 
01150 
01151 /**
01152  * @ingroup mqtt
01153  * Set callback to handle incoming publish requests from server
01154  * @param client MQTT client
01155  * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload
01156  * @param data_cb Callback for each fragment of payload that arrives
01157  * @param arg User supplied argument to both callbacks
01158  */
01159 void
01160 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
01161                              mqtt_incoming_data_cb_t data_cb, void *arg)
01162 {
01163   LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
01164   client->data_cb = data_cb;
01165   client->pub_cb = pub_cb;
01166   client->inpub_arg = arg;
01167 }
01168 
01169 /**
01170  * @ingroup mqtt
01171  * Create a new MQTT client instance
01172  * @return Pointer to instance on success, NULL otherwise
01173  */
01174 mqtt_client_t *
01175 mqtt_client_new(void)
01176 {
01177   mqtt_client_t *client = (mqtt_client_t *)mem_malloc(sizeof(mqtt_client_t));
01178   if (client != NULL) {
01179     memset(client, 0, sizeof(mqtt_client_t));
01180   }
01181   return client;
01182 }
01183 
01184 
01185 /**
01186  * @ingroup mqtt
01187  * Connect to MQTT server
01188  * @param client MQTT client
01189  * @param ip_addr Server IP
01190  * @param port Server port
01191  * @param cb Connection state change callback
01192  * @param arg User supplied argument to connection callback
01193  * @param client_info Client identification and connection options
01194  * @return ERR_OK if successful, @see err_t enum for other results
01195  */
01196 err_t
01197 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
01198                     const struct mqtt_connect_client_info_t *client_info)
01199 {
01200   err_t err;
01201   size_t len;
01202   u16_t client_id_length;
01203   /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
01204   u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
01205   u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
01206 
01207   LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
01208   LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
01209   LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
01210   LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
01211 
01212   if (client->conn_state != TCP_DISCONNECTED) {
01213     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n"));
01214     return ERR_ISCONN;
01215   }
01216 
01217   /* Wipe clean */
01218   memset(client, 0, sizeof(mqtt_client_t));
01219   client->connect_arg = arg;
01220   client->connect_cb = cb;
01221   client->keep_alive = client_info->keep_alive;
01222   mqtt_init_requests(client->req_list);
01223 
01224   /* Build connect message */
01225   if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
01226     flags |= MQTT_CONNECT_FLAG_WILL;
01227     flags |= (client_info->will_qos & 3) << 3;
01228     if (client_info->will_retain) {
01229       flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
01230     }
01231     len = strlen(client_info->will_topic);
01232     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
01233     will_topic_len = (u8_t)len;
01234     len = strlen(client_info->will_msg);
01235     LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
01236     will_msg_len = (u8_t)len;
01237     len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
01238     LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
01239     remaining_length = (u16_t)len;
01240   }
01241 
01242   /* Don't complicate things, always connect using clean session */
01243   flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
01244 
01245   len = strlen(client_info->client_id);
01246   LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
01247   client_id_length = (u16_t)len;
01248   len = remaining_length + 2 + client_id_length;
01249   LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
01250   remaining_length = (u16_t)len;
01251 
01252   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
01253     return ERR_MEM;
01254   }
01255 
01256   client->conn = tcp_new();
01257   if (client->conn == NULL) {
01258     return ERR_MEM;
01259   }
01260 
01261   /* Set arg pointer for callbacks */
01262   tcp_arg(client->conn, client);
01263   /* Any local address, pick random local port number */
01264   err = tcp_bind(client->conn, IP_ADDR_ANY, 0);
01265   if (err != ERR_OK) {
01266     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
01267     goto tcp_fail;
01268   }
01269   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
01270 
01271   /* Connect to server */
01272   err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
01273   if (err != ERR_OK) {
01274     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
01275     goto tcp_fail;
01276   }
01277   /* Set error callback */
01278   tcp_err(client->conn, mqtt_tcp_err_cb);
01279   client->conn_state = TCP_CONNECTING;
01280 
01281   /* Append fixed header */
01282   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
01283   /* Append Protocol string */
01284   mqtt_output_append_string(&client->output, "MQTT", 4);
01285   /* Append Protocol level */
01286   mqtt_output_append_u8(&client->output, 4);
01287   /* Append connect flags */
01288   mqtt_output_append_u8(&client->output, flags);
01289   /* Append keep-alive */
01290   mqtt_output_append_u16(&client->output, client_info->keep_alive);
01291   /* Append client id */
01292   mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
01293   /* Append will message if used */
01294   if (will_topic_len > 0) {
01295     mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
01296     mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
01297   }
01298   return ERR_OK;
01299 
01300 tcp_fail:
01301   tcp_abort(client->conn);
01302   client->conn = NULL;
01303   return err;
01304 }
01305 
01306 
01307 /**
01308  * @ingroup mqtt
01309  * Disconnect from MQTT server
01310  * @param client MQTT client
01311  */
01312 void
01313 mqtt_disconnect(mqtt_client_t *client)
01314 {
01315   LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
01316   /* If connection in not already closed */
01317   if (client->conn_state != TCP_DISCONNECTED) {
01318     /* Set conn_state before calling mqtt_close to prevent callback from being called */
01319     client->conn_state = TCP_DISCONNECTED;
01320     mqtt_close(client, (mqtt_connection_status_t)0);
01321   }
01322 }
01323 
01324 /**
01325  * @ingroup mqtt
01326  * Check connection with server
01327  * @param client MQTT client
01328  * @return 1 if connected to server, 0 otherwise
01329  */
01330 u8_t
01331 mqtt_client_is_connected(mqtt_client_t *client)
01332 {
01333   LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
01334   return client->conn_state == MQTT_CONNECTED;
01335 }