Rtos API example

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers lwip_mqtt.c Source File

lwip_mqtt.c

Go to the documentation of this file.
00001 /**
00002  * @file
00003  * MQTT client
00004  *
00005  * @defgroup mqtt MQTT client
00006  * @ingroup apps
00007  * @verbinclude mqtt_client.txt
00008  */
00009 
00010 /*
00011  * Copyright (c) 2016 Erik Andersson <erian747@gmail.com>
00012  * All rights reserved.
00013  *
00014  * Redistribution and use in source and binary forms, with or without modification,
00015  * are permitted provided that the following conditions are met:
00016  *
00017  * 1. Redistributions of source code must retain the above copyright notice,
00018  *    this list of conditions and the following disclaimer.
00019  * 2. Redistributions in binary form must reproduce the above copyright notice,
00020  *    this list of conditions and the following disclaimer in the documentation
00021  *    and/or other materials provided with the distribution.
00022  * 3. The name of the author may not be used to endorse or promote products
00023  *    derived from this software without specific prior written permission.
00024  *
00025  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
00026  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
00027  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
00028  * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
00029  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
00030  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00031  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00032  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
00033  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
00034  * OF SUCH DAMAGE.
00035  *
00036  * This file is part of the lwIP TCP/IP stack
00037  *
00038  * Author: Erik Andersson <erian747@gmail.com>
00039  *
00040  *
00041  * @todo:
00042  * - Handle large outgoing payloads for PUBLISH messages
00043  * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
00044  * - Add support for legacy MQTT protocol version
00045  *
00046  * Please coordinate changes and requests with Erik Andersson
00047  * Erik Andersson <erian747@gmail.com>
00048  *
00049  */
00050 #include "lwip/apps/mqtt.h"
00051 #include "lwip/timeouts.h"
00052 #include "lwip/ip_addr.h"
00053 #include "lwip/mem.h"
00054 #include "lwip/err.h"
00055 #include "lwip/pbuf.h"
00056 #include "lwip/tcp.h"
00057 #include <string.h>
00058 
00059 #if LWIP_TCP && LWIP_CALLBACK_API
00060 
/**
 * MQTT_DEBUG: Default is off.
 * Only defined here when the build has not already provided a value.
 */
#if !defined MQTT_DEBUG || defined __DOXYGEN__
#define MQTT_DEBUG                  LWIP_DBG_OFF
#endif

/* Combinations of MQTT_DEBUG with lwIP debug type/level flags, used as the
 * first argument of LWIP_DEBUGF throughout this file */
#define MQTT_DEBUG_TRACE        (MQTT_DEBUG | LWIP_DBG_TRACE)
#define MQTT_DEBUG_STATE        (MQTT_DEBUG | LWIP_DBG_STATE)
#define MQTT_DEBUG_WARN         (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
#define MQTT_DEBUG_WARN_STATE   (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
#define MQTT_DEBUG_SERIOUS      (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)

/* Forward declaration: mqtt_close() references the timer function before it is defined */
static void mqtt_cyclic_timer(void *arg);
00075 
/**
 * MQTT client connection states
 * (values stored in and compared against client->conn_state)
 */
enum {
  /* No connection; mqtt_close() leaves the client in this state */
  TCP_DISCONNECTED,
  /* NOTE(review): presumably TCP connection establishment in progress - confirm against the connect code */
  TCP_CONNECTING,
  /* Waiting for CONNACK; the cyclic timer enforces the connect timeout in this state */
  MQTT_CONNECTING,
  /* Entered when a CONNACK with result MQTT_CONNECT_ACCEPTED is received */
  MQTT_CONNECTED
};
00085 
/**
 * MQTT control message types
 *
 * The value is carried in the upper four bits of the first fixed header byte
 * (see MQTT_CTL_PACKET_TYPE and mqtt_output_append_fixed_header) and is also
 * used as index into mqtt_message_type_str when LWIP_DEBUG is defined.
 */
enum mqtt_message_type {
  MQTT_MSG_TYPE_CONNECT = 1,
  MQTT_MSG_TYPE_CONNACK = 2,
  MQTT_MSG_TYPE_PUBLISH = 3,
  MQTT_MSG_TYPE_PUBACK = 4,
  MQTT_MSG_TYPE_PUBREC = 5,
  MQTT_MSG_TYPE_PUBREL = 6,
  MQTT_MSG_TYPE_PUBCOMP = 7,
  MQTT_MSG_TYPE_SUBSCRIBE = 8,
  MQTT_MSG_TYPE_SUBACK = 9,
  MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
  MQTT_MSG_TYPE_UNSUBACK = 11,
  MQTT_MSG_TYPE_PINGREQ = 12,
  MQTT_MSG_TYPE_PINGRESP = 13,
  MQTT_MSG_TYPE_DISCONNECT = 14
};
00105 
/**
 * Helpers to extract control packet type and qos from first byte in fixed header.
 * The argument is parenthesized so that expression arguments (e.g. `a | b`)
 * are masked as a whole instead of being split by operator precedence.
 */
#define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0xf0) >> 4)
#define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0x6) >> 1)
00109 
/**
 * MQTT connect flags, only used in CONNECT message.
 * Each enumerator is a single-bit mask, so flags can be combined with bitwise OR.
 */
enum mqtt_connect_flag {
  MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
  MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
  MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
  MQTT_CONNECT_FLAG_WILL = 1 << 2,
  MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
};
00120 
00121 
00122 #if defined(LWIP_DEBUG)
/* Printable names of the control message types, indexed by the numeric value
 * of enum mqtt_message_type; index 0 is the fallback for unknown types
 * (see mqtt_msg_type_to_str). */
static const char * const mqtt_message_type_str[15] =
{
  "UNDEFINED",
  "CONNECT",
  "CONNACK",
  "PUBLISH",
  "PUBACK",
  "PUBREC",
  "PUBREL",
  "PUBCOMP",
  "SUBSCRIBE",
  "SUBACK",
  "UNSUBSCRIBE",
  "UNSUBACK",
  "PINGREQ",
  "PINGRESP",
  "DISCONNECT"
};
00141 
00142 /**
00143  * Message type value to string
00144  * @param msg_type see enum mqtt_message_type
00145  * 
00146  * @return Control message type text string
00147  */
00148 static const char *
00149 mqtt_msg_type_to_str(u8_t msg_type)
00150 {
00151   if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) {
00152     msg_type = 0;
00153   }
00154   return mqtt_message_type_str[msg_type];
00155 }
00156 
00157 #endif
00158 
00159 
00160 /**
00161  * Generate MQTT packet identifier
00162  * @param client MQTT client
00163  * @return New packet identifier, range 1 to 65535
00164  */
00165 static u16_t
00166 msg_generate_packet_id(mqtt_client_t *client)
00167 {
00168   client->pkt_id_seq++;
00169   if (client->pkt_id_seq == 0) {
00170     client->pkt_id_seq++;
00171   }
00172   return client->pkt_id_seq;
00173 }
00174 
00175 /*--------------------------------------------------------------------------------------------------------------------- */
00176 /* Output ring buffer */
00177 
00178 
/** Mask turning the free-running put/get counters into buffer offsets.
 *  Masking only wraps correctly when MQTT_OUTPUT_RINGBUF_SIZE is a power of two. */
#define MQTT_RINGBUF_IDX_MASK ((MQTT_OUTPUT_RINGBUF_SIZE) - 1)

/** Add single item to ring buffer (no free-space check is done here) */
#define mqtt_ringbuf_put(rb, item) (((rb)->buf)[(rb)->put++ & MQTT_RINGBUF_IDX_MASK] = (item))

/** Return number of bytes in ring buffer */
#define mqtt_ringbuf_len(rb) ((u16_t)((rb)->put - (rb)->get))

/** Return number of bytes free in ring buffer */
#define mqtt_ringbuf_free(rb) ((MQTT_OUTPUT_RINGBUF_SIZE) - mqtt_ringbuf_len(rb))

/** Return number of bytes possible to read without wrapping around */
#define mqtt_ringbuf_linear_read_length(rb) (LWIP_MIN(mqtt_ringbuf_len(rb), ((MQTT_OUTPUT_RINGBUF_SIZE) - ((rb)->get & MQTT_RINGBUF_IDX_MASK))))

/** Return pointer to ring buffer get position */
#define mqtt_ringbuf_get_ptr(rb) (&(rb)->buf[(rb)->get & MQTT_RINGBUF_IDX_MASK])

/** Consume len bytes from the ring buffer by advancing the get counter */
#define mqtt_ringbuf_advance_get_idx(rb, len) ((rb)->get += (len))
00197 
00198 
00199 /**
00200  * Try send as many bytes as possible from output ring buffer
00201  * @param rb Output ring buffer
00202  * @param tpcb TCP connection handle
00203  */
00204 static void
00205 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
00206 {
00207   err_t err;
00208   u8_t wrap = 0;
00209   u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
00210   u16_t send_len = tcp_sndbuf(tpcb);
00211   LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
00212 
00213   if (send_len == 0 || ringbuf_lin_len == 0) {
00214     return;
00215   }
00216 
00217   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
00218                                 send_len, ringbuf_lin_len, ((rb)->get & MQTT_RINGBUF_IDX_MASK), ((rb)->put & MQTT_RINGBUF_IDX_MASK)));
00219 
00220   if (send_len > ringbuf_lin_len) {
00221     /* Space in TCP output buffer is larger than available in ring buffer linear portion */
00222     send_len = ringbuf_lin_len;
00223     /* Wrap around if more data in ring buffer after linear portion */
00224     wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
00225   }
00226   err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
00227   if ((err == ERR_OK) && wrap) {
00228     mqtt_ringbuf_advance_get_idx(rb, send_len);
00229     /* Use the lesser one of ring buffer linear length and TCP send buffer size */
00230     send_len = LWIP_MIN(tcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
00231     err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
00232   }
00233 
00234   if (err == ERR_OK) {
00235     mqtt_ringbuf_advance_get_idx(rb, send_len);
00236     /* Flush */
00237     tcp_output(tpcb);
00238   } else {
00239     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
00240   }
00241 }
00242 
00243 
00244 
00245 /*--------------------------------------------------------------------------------------------------------------------- */
00246 /* Request queue */
00247 
00248 /**
00249  * Create request item
00250  * @param r_objs Pointer to request objects
00251  * @param pkt_id Packet identifier of request
00252  * @param cb Packet callback to call when requests lifetime ends
00253  * @param arg Parameter following callback
00254  * @return Request or NULL if failed to create
00255  */
00256 static struct mqtt_request_t *
00257 mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
00258 {
00259   struct mqtt_request_t *r = NULL;
00260   u8_t n;
00261   LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
00262   for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
00263     /* Item point to itself if not in use */
00264     if (r_objs[n].next == &r_objs[n]) {
00265       r = &r_objs[n];
00266       r->next = NULL;
00267       r->cb = cb;
00268       r->arg = arg;
00269       r->pkt_id = pkt_id;
00270       break;
00271     }
00272   }
00273   return r;
00274 }
00275 
00276 
00277 /**
00278  * Append request to pending request queue
00279  * @param tail Pointer to request queue tail pointer
00280  * @param r Request to append
00281  */
00282 static void
00283 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
00284 {
00285   struct mqtt_request_t *head = NULL;
00286   s16_t time_before = 0;
00287   struct mqtt_request_t *iter;
00288 
00289   LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
00290 
00291   /* Iterate trough queue to find head, and count total timeout time */
00292   for (iter = *tail; iter != NULL; iter = iter->next) {
00293     time_before += iter->timeout_diff;
00294     head = iter;
00295   }
00296 
00297   LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
00298   r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
00299   if (head == NULL) {
00300     *tail = r;
00301   } else {
00302     head->next = r;
00303   }
00304 }
00305 
00306 
00307 /**
00308  * Delete request item
00309  * @param r Request item to delete
00310  */
00311 static void
00312 mqtt_delete_request(struct mqtt_request_t *r)
00313 {
00314   if (r != NULL) {
00315     r->next = r;
00316   }
00317 }
00318 
00319 /**
00320  * Remove a request item with a specific packet identifier from request queue
00321  * @param tail Pointer to request queue tail pointer
00322  * @param pkt_id Packet identifier of request to take
00323  * @return Request item if found, NULL if not
00324  */
00325 static struct mqtt_request_t *
00326 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
00327 {
00328   struct mqtt_request_t *iter = NULL, *prev = NULL;
00329   LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
00330   /* Search all request for pkt_id */
00331   for (iter = *tail; iter != NULL; iter = iter->next) {
00332     if (iter->pkt_id == pkt_id) {
00333       break;
00334     }
00335     prev = iter;
00336   }
00337 
00338   /* If request was found */
00339   if (iter != NULL) {
00340     /* unchain */
00341     if (prev == NULL) {
00342       *tail= iter->next;
00343     } else {
00344       prev->next = iter->next;
00345     }
00346     /* If exists, add remaining timeout time for the request to next */
00347     if (iter->next != NULL) {
00348       iter->next->timeout_diff += iter->timeout_diff;
00349     }
00350     iter->next = NULL;
00351   }
00352   return iter;
00353 }
00354 
00355 /**
00356  * Handle requests timeout
00357  * @param tail Pointer to request queue tail pointer
00358  * @param t Time since last call in seconds
00359  */
00360 static void
00361 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
00362 {
00363   struct mqtt_request_t *r = *tail;
00364   LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
00365   while (t > 0 && r != NULL) {
00366     if (t >= r->timeout_diff) {
00367       t -= (u8_t)r->timeout_diff;
00368       /* Unchain */
00369       *tail = r->next;
00370       /* Notify upper layer about timeout */
00371       if (r->cb != NULL) {
00372         r->cb(r->arg, ERR_TIMEOUT);
00373       }
00374       mqtt_delete_request(r);
00375       /* Tail might be be modified in callback, so re-read it in every iteration */
00376       r = *(struct mqtt_request_t * const volatile *)tail;
00377     } else {
00378       r->timeout_diff -= t;
00379       t = 0;
00380     }
00381   }
00382 }
00383 
00384 /**
00385  * Free all request items
00386  * @param tail Pointer to request queue tail pointer
00387  */
00388 static void
00389 mqtt_clear_requests(struct mqtt_request_t **tail)
00390 {
00391   struct mqtt_request_t *iter, *next;
00392   LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
00393   for (iter = *tail; iter != NULL; iter = next) {
00394     next = iter->next;
00395     mqtt_delete_request(iter);
00396   }
00397   *tail = NULL;
00398 }
00399 /**
00400  * Initialize all request items
00401  * @param r_objs Pointer to request objects
00402  */
00403 static void
00404 mqtt_init_requests(struct mqtt_request_t *r_objs)
00405 {
00406   u8_t n;
00407   LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
00408   for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
00409     /* Item pointing to itself indicates unused */
00410     r_objs[n].next = &r_objs[n];
00411   }
00412 }
00413 
00414 /*--------------------------------------------------------------------------------------------------------------------- */
00415 /* Output message build helpers */
00416 
00417 
00418 static void
00419 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
00420 {
00421   mqtt_ringbuf_put(rb, value);
00422 }
00423 
00424 static
00425 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
00426 {
00427   mqtt_ringbuf_put(rb, value >> 8);
00428   mqtt_ringbuf_put(rb, value & 0xff);
00429 }
00430 
00431 static void
00432 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
00433 {
00434   u16_t n;
00435   for (n = 0; n < length; n++) {
00436     mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
00437   }
00438 }
00439 
00440 static void
00441 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
00442 {
00443   u16_t n;
00444   mqtt_ringbuf_put(rb, length >> 8);
00445   mqtt_ringbuf_put(rb, length & 0xff);
00446   for (n = 0; n < length; n++) {
00447     mqtt_ringbuf_put(rb, str[n]);
00448   }
00449 }
00450 
00451 /**
00452  * Append fixed header
00453  * @param rb Output ring buffer
00454  * @param msg_type see enum mqtt_message_type
00455  * @param dup MQTT DUP flag
00456  * @param qos MQTT QoS field
00457  * @param retain MQTT retain flag
00458  * @param r_length Remaining length after fixed header
00459  */
00460 
00461 static void
00462 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup,
00463                  u8_t qos, u8_t retain, u16_t r_length)
00464 {
00465   /* Start with control byte */
00466   mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1)));
00467   /* Encode remaining length field */
00468   do {
00469     mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
00470     r_length >>= 7;
00471   } while (r_length > 0);
00472 }
00473 
00474 
00475 /**
00476  * Check output buffer space
00477  * @param rb Output ring buffer
00478  * @param r_length Remaining length after fixed header
00479  * @return 1 if message will fit, 0 if not enough buffer space
00480  */
00481 static u8_t
00482 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
00483 {
00484   /* Start with length of type byte + remaining length */
00485   u16_t total_len = 1 + r_length;
00486 
00487   LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
00488 
00489  /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
00490   do {
00491     total_len++;
00492     r_length >>= 7;
00493   } while (r_length > 0);
00494 
00495   return (total_len <= mqtt_ringbuf_free(rb));
00496 }
00497 
00498 
00499 /**
00500  * Close connection to server
00501  * @param client MQTT client
00502  * @param reason Reason for disconnection
00503  */
00504 static void
00505 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
00506 {
00507   LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
00508 
00509   /* Bring down TCP connection if not already done */
00510   if (client->conn != NULL) {
00511     err_t res;
00512     tcp_recv(client->conn, NULL);
00513     tcp_err(client->conn,  NULL);
00514     tcp_sent(client->conn, NULL);
00515     res = tcp_close(client->conn);
00516     if (res != ERR_OK) {
00517       tcp_abort(client->conn);
00518       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_close: Close err=%s\n", lwip_strerr(res)));
00519     }
00520     client->conn = NULL;
00521   }
00522 
00523   /* Remove all pending requests */
00524   mqtt_clear_requests(&client->pend_req_queue);
00525   /* Stop cyclic timer */
00526   sys_untimeout(mqtt_cyclic_timer, client);
00527 
00528   /* Notify upper layer of disconnection if changed state */
00529   if (client->conn_state != TCP_DISCONNECTED) {
00530 
00531     client->conn_state = TCP_DISCONNECTED;
00532     if (client->connect_cb != NULL) {
00533       client->connect_cb(client, client->connect_arg, reason);
00534     }
00535   }
00536 }
00537 
00538 
00539 /**
00540  * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
00541  * @param arg MQTT client
00542  */
00543 static void
00544 mqtt_cyclic_timer(void *arg)
00545 {
00546   u8_t restart_timer = 1;
00547   mqtt_client_t *client = (mqtt_client_t *)arg;
00548   LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
00549 
00550   if (client->conn_state == MQTT_CONNECTING) {
00551     client->cyclic_tick++;
00552     if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
00553       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
00554       /* Disconnect TCP */
00555       mqtt_close(client, MQTT_CONNECT_TIMEOUT);
00556       restart_timer = 0;
00557     }
00558   } else if (client->conn_state == MQTT_CONNECTED) {
00559     /* Handle timeout for pending requests */
00560     mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
00561 
00562     /* keep_alive > 0 means keep alive functionality shall be used */
00563     if (client->keep_alive > 0) {
00564 
00565       client->server_watchdog++;
00566       /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
00567       if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive/2)) {
00568         LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
00569         mqtt_close(client, MQTT_CONNECT_TIMEOUT);
00570         restart_timer = 0;
00571       }
00572 
00573       /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
00574       if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
00575         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
00576         if (mqtt_output_check_space(&client->output, 0) != 0) {
00577           mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
00578           client->cyclic_tick = 0;
00579         }
00580       } else {
00581         client->cyclic_tick++;
00582       }
00583     }
00584   } else {
00585     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
00586     restart_timer = 0;
00587   }
00588   if (restart_timer) {
00589     sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, arg);
00590   }
00591 }
00592 
00593 
00594 /**
00595  * Send PUBACK, PUBREC or PUBREL response message
00596  * @param client MQTT client
00597  * @param msg PUBACK, PUBREC or PUBREL
00598  * @param pkt_id Packet identifier
00599  * @param qos QoS value
00600  * @return ERR_OK if successful, ERR_MEM if out of memory
00601  */
00602 static err_t
00603 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
00604 {
00605   err_t err = ERR_OK;
00606   if (mqtt_output_check_space(&client->output, 2)) {
00607     mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
00608     mqtt_output_append_u16(&client->output, pkt_id);
00609     mqtt_output_send(&client->output, client->conn);
00610   } else {
00611     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
00612                                   mqtt_msg_type_to_str(msg), pkt_id));
00613     err = ERR_MEM;
00614   }
00615   return err;
00616 }
00617 
00618 /**
00619  * Subscribe response from server
00620  * @param r Matching request
00621  * @param result Result code from server
00622  */
00623 static void
00624 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
00625 {
00626   if (r->cb != NULL) {
00627     r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
00628   }
00629 }
00630 
00631 
00632 /**
00633  * Complete MQTT message received or buffer full
00634  * @param client MQTT client
00635  * @param fixed_hdr_idx header index
00636  * @param length length received part
00637  * @param remaining_length Remaining length of complete message
00638  */
00639 static mqtt_connection_status_t
00640   mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length)
00641 {
00642   mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
00643 
00644   u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx;
00645 
00646   /* Control packet type */
00647   u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
00648   u16_t pkt_id = 0;
00649 
00650   if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
00651     if (client->conn_state == MQTT_CONNECTING) {
00652       /* Get result code from CONNACK */
00653       res = (mqtt_connection_status_t)var_hdr_payload[1];
00654       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res));
00655       if (res == MQTT_CONNECT_ACCEPTED) {
00656         /* Reset cyclic_tick when changing to connected state */
00657         client->cyclic_tick = 0;
00658         client->conn_state = MQTT_CONNECTED;
00659         /* Notify upper layer */
00660         if (client->connect_cb != 0) {
00661           client->connect_cb(client, client->connect_arg, res);
00662         }
00663       }
00664     } else {
00665       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Received CONNACK in connected state\n"));
00666     }
00667   } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
00668     LWIP_DEBUGF(MQTT_DEBUG_TRACE,( "mqtt_message_received: Received PINGRESP from server\n"));
00669 
00670   } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
00671     u16_t payload_offset = 0;
00672     u16_t payload_length = length;
00673     u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
00674 
00675     if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
00676       /* Should have topic and pkt id*/
00677       uint8_t *topic;
00678       uint16_t after_topic;
00679       u8_t bkp;
00680       u16_t topic_len = var_hdr_payload[0];
00681       topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
00682 
00683       topic = var_hdr_payload + 2;
00684       after_topic = 2 + topic_len;
00685       /* Check length, add one byte even for QoS 0 so that zero termination will fit */
00686       if ((after_topic + (qos? 2 : 1)) > length) {
00687         LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
00688         goto out_disconnect;
00689       }
00690 
00691       /* id for QoS 1 and 2 */
00692       if (qos > 0) {
00693         client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
00694         after_topic += 2;
00695       } else {
00696         client->inpub_pkt_id = 0;
00697       }
00698       /* Take backup of byte after topic */
00699       bkp = topic[topic_len];
00700       /* Zero terminate string */
00701       topic[topic_len] = 0;
00702       /* Payload data remaining in receive buffer */
00703       payload_length = length - after_topic;
00704       payload_offset = after_topic;
00705 
00706       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n",
00707                                     qos, topic, remaining_length + payload_length));
00708       if (client->pub_cb != NULL) {
00709         client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
00710       }
00711       /* Restore byte after topic */
00712       topic[topic_len] = bkp;
00713     }
00714     if (payload_length > 0 || remaining_length == 0) {
00715       client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
00716       /* Reply if QoS > 0 */
00717       if (remaining_length == 0 && qos > 0) {
00718         /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
00719         u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
00720         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
00721                                       mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
00722         pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
00723       }
00724     }
00725   } else {
00726     /* Get packet identifier */
00727     pkt_id = (u16_t)var_hdr_payload[0] << 8;
00728     pkt_id |= (u16_t)var_hdr_payload[1];
00729     if (pkt_id == 0) {
00730       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
00731       goto out_disconnect;
00732     }
00733     if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
00734       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n",pkt_id));
00735       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
00736 
00737     } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
00738       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n",pkt_id));
00739       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
00740 
00741     } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
00742               pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
00743       struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
00744       if (r != NULL) {
00745         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
00746         if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
00747           if (length < 3) {
00748             LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: To small SUBACK packet\n"));
00749             goto out_disconnect;
00750           } else {
00751             mqtt_incomming_suback(r, var_hdr_payload[2]);
00752           }
00753         } else if (r->cb != NULL) {
00754           r->cb(r->arg, ERR_OK);
00755         }
00756         mqtt_delete_request(r);
00757       } else {
00758         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
00759       }
00760     } else {
00761       LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
00762       goto out_disconnect;
00763     }
00764   }
00765   return res;
00766 out_disconnect:
00767   return MQTT_CONNECT_DISCONNECTED;
00768 }
00769 
00770 
00771 /**
00772  * MQTT incoming message parser
00773  * @param client MQTT client
00774  * @param p PBUF chain of received data
00775  * @return Connection status
00776  */
00777 static mqtt_connection_status_t
00778 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
00779 {
00780   u16_t in_offset = 0;
00781   u32_t msg_rem_len = 0;
00782   u8_t fixed_hdr_idx = 0;
00783   u8_t b = 0;
00784 
00785   while (p->tot_len > in_offset) {
00786     if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) {
00787 
00788       if (fixed_hdr_idx < client->msg_idx) {
00789         b = client->rx_buffer[fixed_hdr_idx];
00790       } else {
00791         b = pbuf_get_at(p, in_offset++);
00792         client->rx_buffer[client->msg_idx++] = b;
00793       }
00794       fixed_hdr_idx++;
00795 
00796       if (fixed_hdr_idx >= 2) {
00797         msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7);
00798         if ((b & 0x80) == 0) {
00799           LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len));
00800           if (msg_rem_len == 0) {
00801             /* Complete message with no extra headers of payload received */
00802             mqtt_message_received(client, fixed_hdr_idx, 0, 0);
00803             client->msg_idx = 0;
00804             fixed_hdr_idx = 0;
00805           } else {
00806             /* Bytes remaining in message */
00807             msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx;
00808           }
00809         }
00810       }
00811     } else {
00812       u16_t cpy_len, cpy_start, buffer_space;
00813 
00814       cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
00815 
00816       /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
00817       cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
00818 
00819       /* Limit to available space in buffer */
00820       buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start;
00821       if (cpy_len > buffer_space) {
00822         cpy_len = buffer_space;
00823       }
00824       pbuf_copy_partial(p, client->rx_buffer+cpy_start, cpy_len, in_offset);
00825 
00826       /* Advance get and put indexes  */
00827       client->msg_idx += cpy_len;
00828       in_offset += cpy_len;
00829       msg_rem_len -= cpy_len;
00830 
00831       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len));
00832       if (msg_rem_len == 0 || cpy_len == buffer_space) {
00833         /* Whole message received or buffer is full */
00834         mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len);
00835         if (res != MQTT_CONNECT_ACCEPTED) {
00836           return res;
00837         }
00838         if (msg_rem_len == 0) {
00839           /* Reset parser state */
00840           client->msg_idx = 0;
00841           /* msg_tot_len = 0; */
00842           fixed_hdr_idx = 0;
00843         }
00844       }
00845     }
00846   }
00847   return MQTT_CONNECT_ACCEPTED;
00848 }
00849 
00850 
00851 /**
00852  * TCP received callback function. @see tcp_recv_fn
00853  * @param arg MQTT client
00854  * @param p PBUF chain of received data
00855  * @param err Passed as return value if not ERR_OK
00856  * @return ERR_OK or err passed into callback
00857  */
00858 static err_t
00859 mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
00860 {
00861   mqtt_client_t *client = (mqtt_client_t *)arg;
00862   LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
00863   LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
00864 
00865   if (p == NULL) {
00866     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
00867     mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
00868   } else {
00869     mqtt_connection_status_t res;
00870     if (err != ERR_OK) {
00871       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_recv_cb: Recv err=%d\n", err));
00872       pbuf_free(p);
00873       return err;
00874     }
00875 
00876     /* Tell remote that data has been received */
00877     tcp_recved(pcb, p->tot_len);
00878     res = mqtt_parse_incoming(client, p);
00879     pbuf_free(p);
00880 
00881     if (res != MQTT_CONNECT_ACCEPTED) {
00882       mqtt_close(client, res);
00883     }
00884     /* If keep alive functionality is used */
00885     if (client->keep_alive != 0) {
00886       /* Reset server alive watchdog */
00887       client->server_watchdog = 0;
00888     }
00889 
00890   }
00891   return ERR_OK;
00892 }
00893 
00894 
00895 /**
00896  * TCP data sent callback function. @see tcp_sent_fn
00897  * @param arg MQTT client
00898  * @param tpcb TCP connection handle
00899  * @param len Number of bytes sent
00900  * @return ERR_OK
00901  */
00902 static err_t
00903 mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len)
00904 {
00905   mqtt_client_t *client = (mqtt_client_t *)arg;
00906 
00907   LWIP_UNUSED_ARG(tpcb);
00908   LWIP_UNUSED_ARG(len);
00909 
00910   if (client->conn_state == MQTT_CONNECTED) {
00911     struct mqtt_request_t *r;
00912 
00913     /* Reset keep-alive send timer and server watchdog */
00914     client->cyclic_tick = 0;
00915     client->server_watchdog = 0;
00916     /* QoS 0 publish has no response from server, so call its callbacks here */
00917     while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
00918       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
00919       if (r->cb != NULL) {
00920         r->cb(r->arg, ERR_OK);
00921       }
00922       mqtt_delete_request(r);
00923     }
00924     /* Try send any remaining buffers from output queue */
00925     mqtt_output_send(&client->output, client->conn);
00926   }
00927   return ERR_OK;
00928 }
00929 
00930 /**
00931  * TCP error callback function. @see tcp_err_fn
00932  * @param arg MQTT client
00933  * @param err Error encountered
00934  */
00935 static void
00936 mqtt_tcp_err_cb(void *arg, err_t err)
00937 {
00938   mqtt_client_t *client = (mqtt_client_t *)arg;
00939   LWIP_UNUSED_ARG(err); /* only used for debug output */
00940   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
00941   LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
00942   /* Set conn to null before calling close as pcb is already deallocated*/
00943   client->conn = 0;
00944   mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
00945 }
00946 
00947 /**
00948  * TCP poll callback function. @see tcp_poll_fn
00949  * @param arg MQTT client
00950  * @param tpcb TCP connection handle
00951  * @return err ERR_OK
00952  */
00953 static err_t
00954 mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb)
00955 {
00956   mqtt_client_t *client = (mqtt_client_t *)arg;
00957   if (client->conn_state == MQTT_CONNECTED) {
00958     /* Try send any remaining buffers from output queue */
00959     mqtt_output_send(&client->output, tpcb);
00960   }
00961   return ERR_OK;
00962 }
00963 
00964 /**
00965  * TCP connect callback function. @see tcp_connected_fn
00966  * @param arg MQTT client
00967  * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
00968  * @return ERR_OK
00969  */
00970 static err_t
00971 mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
00972 {
00973   mqtt_client_t* client = (mqtt_client_t *)arg;
00974 
00975   if (err != ERR_OK) {
00976     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
00977     return err;
00978   }
00979 
00980   /* Initiate receiver state */
00981   client->msg_idx = 0;
00982 
00983   /* Setup TCP callbacks */
00984   tcp_recv(tpcb, mqtt_tcp_recv_cb);
00985   tcp_sent(tpcb, mqtt_tcp_sent_cb);
00986   tcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
00987 
00988   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_connect_cb: TCP connection established to server\n"));
00989   /* Enter MQTT connect state */
00990   client->conn_state = MQTT_CONNECTING;
00991 
00992   /* Start cyclic timer */
00993   sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, client);
00994   client->cyclic_tick = 0;
00995 
00996   /* Start transmission from output queue, connect message is the first one out*/
00997   mqtt_output_send(&client->output, client->conn);
00998 
00999   return ERR_OK;
01000 }
01001 
01002 
01003 
01004 /*---------------------------------------------------------------------------------------------------- */
01005 /* Public API */
01006 
01007 
01008 /**
01009  * @ingroup mqtt
01010  * MQTT publish function.
01011  * @param client MQTT client
01012  * @param topic Publish topic string
01013  * @param payload Data to publish (NULL is allowed)
01014  * @param payload_length: Length of payload (0 is allowed)
01015  * @param qos Quality of service, 0 1 or 2
01016  * @param retain MQTT retain flag
01017  * @param cb Callback to call when publish is complete or has timed out
01018  * @param arg User supplied argument to publish callback
01019  * @return ERR_OK if successful
01020  *         ERR_CONN if client is disconnected
01021  *         ERR_MEM if short on memory
01022  */
01023 err_t
01024 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
01025              mqtt_request_cb_t cb, void *arg)
01026 {
01027   struct mqtt_request_t *r;
01028   u16_t pkt_id;
01029   size_t topic_strlen;
01030   size_t total_len;
01031   u16_t topic_len;
01032   u16_t remaining_length;
01033 
01034   LWIP_ASSERT("mqtt_publish: client != NULL", client);
01035   LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
01036   LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
01037 
01038   topic_strlen = strlen(topic);
01039   LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
01040   topic_len = (u16_t)topic_strlen;
01041   total_len = 2 + topic_len + payload_length;
01042   LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
01043   remaining_length = (u16_t)total_len;
01044 
01045   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
01046 
01047   if (qos > 0) {
01048     remaining_length += 2;
01049     /* Generate pkt_id id for QoS1 and 2 */
01050     pkt_id = msg_generate_packet_id(client);
01051   } else {
01052     /* Use reserved value pkt_id 0 for QoS 0 in request handle */
01053     pkt_id = 0;
01054   }
01055 
01056   r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
01057   if (r == NULL) {
01058     return ERR_MEM;
01059   }
01060 
01061   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
01062     mqtt_delete_request(r);
01063     return ERR_MEM;
01064   }
01065   /* Append fixed header */
01066   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
01067 
01068   /* Append Topic */
01069   mqtt_output_append_string(&client->output, topic, topic_len);
01070 
01071   /* Append packet if for QoS 1 and 2*/
01072   if (qos > 0) {
01073     mqtt_output_append_u16(&client->output, pkt_id);
01074   }
01075 
01076   /* Append optional publish payload */
01077   if ((payload != NULL) && (payload_length > 0)) {
01078     mqtt_output_append_buf(&client->output, payload, payload_length);
01079   }
01080 
01081   mqtt_append_request(&client->pend_req_queue, r);
01082   mqtt_output_send(&client->output, client->conn);
01083   return ERR_OK;
01084 }
01085 
01086 
01087 /**
01088  * @ingroup mqtt
01089  * MQTT subscribe/unsubscribe function.
01090  * @param client MQTT client
01091  * @param topic topic to subscribe to
01092  * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
01093  * @param cb Callback to call when subscribe/unsubscribe reponse is received
01094  * @param arg User supplied argument to publish callback
01095  * @param sub 1 for subscribe, 0 for unsubscribe
01096  * @return ERR_OK if successful, @see err_t enum for other results
01097  */
01098 err_t
01099 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
01100 {
01101   size_t topic_strlen;
01102   size_t total_len;
01103   u16_t topic_len;
01104   u16_t remaining_length;
01105   u16_t pkt_id;
01106   struct mqtt_request_t *r;
01107 
01108   LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
01109   LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
01110 
01111   topic_strlen = strlen(topic);
01112   LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
01113   topic_len = (u16_t)topic_strlen;
01114   /* Topic string, pkt_id, qos for subscribe */
01115   total_len =  topic_len + 2 + 2 + (sub != 0);
01116   LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
01117   remaining_length = (u16_t)total_len;
01118 
01119   LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
01120   if (client->conn_state == TCP_DISCONNECTED) {
01121     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
01122     return ERR_CONN;
01123   }
01124 
01125   pkt_id = msg_generate_packet_id(client);
01126   r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
01127   if (r == NULL) {
01128     return ERR_MEM;
01129   }
01130 
01131   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
01132     mqtt_delete_request(r);
01133     return ERR_MEM;
01134   }
01135 
01136   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
01137 
01138   mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
01139   /* Packet id */
01140   mqtt_output_append_u16(&client->output, pkt_id);
01141   /* Topic */
01142   mqtt_output_append_string(&client->output, topic, topic_len);
01143   /* QoS */
01144   if (sub != 0) {
01145     mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
01146   }
01147 
01148   mqtt_append_request(&client->pend_req_queue, r);
01149   mqtt_output_send(&client->output, client->conn);
01150   return ERR_OK;
01151 }
01152 
01153 
01154 /**
01155  * @ingroup mqtt
01156  * Set callback to handle incoming publish requests from server
01157  * @param client MQTT client
01158  * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload
01159  * @param data_cb Callback for each fragment of payload that arrives
01160  * @param arg User supplied argument to both callbacks
01161  */
01162 void
01163 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
01164                              mqtt_incoming_data_cb_t data_cb, void *arg)
01165 {
01166   LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
01167   client->data_cb = data_cb;
01168   client->pub_cb = pub_cb;
01169   client->inpub_arg = arg;
01170 }
01171 
01172 /**
01173  * @ingroup mqtt
01174  * Create a new MQTT client instance
01175  * @return Pointer to instance on success, NULL otherwise
01176  */
01177 mqtt_client_t *
01178 mqtt_client_new(void)
01179 {
01180   mqtt_client_t *client = (mqtt_client_t *)mem_malloc(sizeof(mqtt_client_t));
01181   if (client != NULL) {
01182     memset(client, 0, sizeof(mqtt_client_t));
01183   }
01184   return client;
01185 }
01186 
01187 
01188 /**
01189  * @ingroup mqtt
01190  * Connect to MQTT server
01191  * @param client MQTT client
01192  * @param ip_addr Server IP
01193  * @param port Server port
01194  * @param cb Connection state change callback
01195  * @param arg User supplied argument to connection callback
01196  * @param client_info Client identification and connection options
01197  * @return ERR_OK if successful, @see err_t enum for other results
01198  */
01199 err_t
01200 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
01201                     const struct mqtt_connect_client_info_t *client_info)
01202 {
01203   err_t err;
01204   size_t len;
01205   u16_t client_id_length;
01206   /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
01207   u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
01208   u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
01209 
01210   LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
01211   LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
01212   LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
01213   LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
01214 
01215   if (client->conn_state != TCP_DISCONNECTED) {
01216     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n"));
01217     return ERR_ISCONN;
01218   }
01219 
01220   /* Wipe clean */
01221   memset(client, 0, sizeof(mqtt_client_t));
01222   client->connect_arg = arg;
01223   client->connect_cb = cb;
01224   client->keep_alive = client_info->keep_alive;
01225   mqtt_init_requests(client->req_list);
01226 
01227   /* Build connect message */
01228   if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
01229     flags |= MQTT_CONNECT_FLAG_WILL;
01230     flags |= (client_info->will_qos & 3) << 3;
01231     if (client_info->will_retain) {
01232       flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
01233     }
01234     len = strlen(client_info->will_topic);
01235     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
01236     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
01237     will_topic_len = (u8_t)len;
01238     len = strlen(client_info->will_msg);
01239     LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
01240     will_msg_len = (u8_t)len;
01241     len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
01242     LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
01243     remaining_length = (u16_t)len;
01244   }
01245 
01246   /* Don't complicate things, always connect using clean session */
01247   flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
01248 
01249   len = strlen(client_info->client_id);
01250   LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
01251   client_id_length = (u16_t)len;
01252   len = remaining_length + 2 + client_id_length;
01253   LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
01254   remaining_length = (u16_t)len;
01255 
01256   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
01257     return ERR_MEM;
01258   }
01259 
01260   client->conn = tcp_new();
01261   if (client->conn == NULL) {
01262     return ERR_MEM;
01263   }
01264 
01265   /* Set arg pointer for callbacks */
01266   tcp_arg(client->conn, client);
01267   /* Any local address, pick random local port number */
01268   err = tcp_bind(client->conn, IP_ADDR_ANY, 0);
01269   if (err != ERR_OK) {
01270     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
01271     goto tcp_fail;
01272   }
01273   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
01274 
01275   /* Connect to server */
01276   err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
01277   if (err != ERR_OK) {
01278     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
01279     goto tcp_fail;
01280   }
01281   /* Set error callback */
01282   tcp_err(client->conn, mqtt_tcp_err_cb);
01283   client->conn_state = TCP_CONNECTING;
01284 
01285   /* Append fixed header */
01286   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
01287   /* Append Protocol string */
01288   mqtt_output_append_string(&client->output, "MQTT", 4);
01289   /* Append Protocol level */
01290   mqtt_output_append_u8(&client->output, 4);
01291   /* Append connect flags */
01292   mqtt_output_append_u8(&client->output, flags);
01293   /* Append keep-alive */
01294   mqtt_output_append_u16(&client->output, client_info->keep_alive);
01295   /* Append client id */
01296   mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
01297   /* Append will message if used */
01298   if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
01299     mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
01300     mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
01301   }
01302   return ERR_OK;
01303 
01304 tcp_fail:
01305   tcp_abort(client->conn);
01306   client->conn = NULL;
01307   return err;
01308 }
01309 
01310 
01311 /**
01312  * @ingroup mqtt
01313  * Disconnect from MQTT server
01314  * @param client MQTT client
01315  */
01316 void
01317 mqtt_disconnect(mqtt_client_t *client)
01318 {
01319   LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
01320   /* If connection in not already closed */
01321   if (client->conn_state != TCP_DISCONNECTED) {
01322     /* Set conn_state before calling mqtt_close to prevent callback from being called */
01323     client->conn_state = TCP_DISCONNECTED;
01324     mqtt_close(client, (mqtt_connection_status_t)0);
01325   }
01326 }
01327 
01328 /**
01329  * @ingroup mqtt
01330  * Check connection with server
01331  * @param client MQTT client
01332  * @return 1 if connected to server, 0 otherwise
01333  */
01334 u8_t
01335 mqtt_client_is_connected(mqtt_client_t *client)
01336 {
01337   LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
01338   return client->conn_state == MQTT_CONNECTED;
01339 }
01340 
01341 #endif /* LWIP_TCP && LWIP_CALLBACK_API */