Sergey Pastor / 1

Dependents:   Nucleo

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers mqtt_client.c Source File

mqtt_client.c

Go to the documentation of this file.
00001 /**
00002  * @file mqtt_client.c
00003  * @brief MQTT client
00004  *
00005  * @section License
00006  *
00007  * Copyright (C) 2010-2017 Oryx Embedded SARL. All rights reserved.
00008  *
00009  * This file is part of CycloneTCP Open.
00010  *
00011  * This program is free software; you can redistribute it and/or
00012  * modify it under the terms of the GNU General Public License
00013  * as published by the Free Software Foundation; either version 2
00014  * of the License, or (at your option) any later version.
00015  *
00016  * This program is distributed in the hope that it will be useful,
00017  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  * GNU General Public License for more details.
00020  *
00021  * You should have received a copy of the GNU General Public License
00022  * along with this program; if not, write to the Free Software Foundation,
00023  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
00024  *
00025  * @author Oryx Embedded SARL (www.oryx-embedded.com)
00026  * @version 1.7.6
00027  **/
00028 
00029 //Switch to the appropriate trace level
00030 #define TRACE_LEVEL MQTT_TRACE_LEVEL
00031 
00032 //Dependencies
00033 #include "core/net.h"
00034 #include "mqtt/mqtt_client.h"
00035 #include "mqtt/mqtt_client_packet.h"
00036 #include "mqtt/mqtt_client_transport.h"
00037 #include "mqtt/mqtt_client_misc.h"
00038 #include "debug.h"
00039 
00040 //Check TCP/IP stack configuration
00041 #if (MQTT_CLIENT_SUPPORT == ENABLED)
00042 
00043 
00044 /**
00045  * @brief Initialize MQTT client context
00046  * @param[in] context Pointer to the MQTT client context
00047  **/
00048 
00049 void mqttClientInit(MqttClientContext *context)
00050 {
00051    //Sanity check
00052    if(context != NULL)
00053    {
00054       //Clear MQTT client context
00055       memset(context, 0, sizeof(MqttClientContext));
00056 
00057       //Default protocol version
00058       context->settings.protocolLevel = MQTT_PROTOCOL_LEVEL_3_1_1;
00059       //Default transport protocol
00060       context->settings.transportProtocol = MQTT_TRANSPORT_PROTOCOL_TCP;
00061       //Default keep-alive time interval
00062       context->settings.keepAlive = MQTT_CLIENT_DEFAULT_KEEP_ALIVE;
00063       //Default communication timeout
00064       context->settings.timeout = MQTT_CLIENT_DEFAULT_TIMEOUT;
00065 
00066 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
00067       //Default resource name (for WebSocket connections only)
00068       strcpy(context->settings.uri, "/");
00069 #endif
00070 
00071       //Initialize state machine
00072       context->state = MQTT_CLIENT_STATE_CLOSED;
00073       //Initialize packet identifier
00074       context->packetId = 0;
00075    }
00076 }
00077 
00078 
00079 /**
00080  * @brief Initialize callback structure
00081  * @param[in] callbacks Pointer to a structure that contains callback functions
00082  * @return Error code
00083  **/
00084 
00085 void mqttClientInitCallbacks(MqttClientCallbacks *callbacks)
00086 {
00087    //Initialize callback structure
00088    memset(callbacks, 0, sizeof(MqttClientCallbacks));
00089 }
00090 
00091 
00092 /**
00093  * @brief Register MQTT client callbacks
00094  * @param[in] context Pointer to the MQTT client context
00095  * @param[in] callbacks Pointer to a structure that contains callback functions
00096  * @return Error code
00097  **/
00098 
00099 error_t mqttClientRegisterCallbacks(MqttClientContext *context,
00100    const MqttClientCallbacks *callbacks)
00101 {
00102    //Make sure the MQTT client context is valid
00103    if(context == NULL)
00104       return ERROR_INVALID_PARAMETER;
00105 
00106    //Attach callback functions
00107    context->callbacks = *callbacks;
00108 
00109    //Successful processing
00110    return NO_ERROR;
00111 }
00112 
00113 
00114 /**
00115  * @brief Set the MQTT protocol version to be used
00116  * @param[in] context Pointer to the MQTT client context
00117  * @param[in] protocolLevel MQTT protocol level (3.1 or 3.1.1)
00118  * @return Error code
00119  **/
00120 
00121 error_t mqttClientSetProtocolLevel(MqttClientContext *context,
00122    MqttProtocolLevel protocolLevel)
00123 {
00124    //Make sure the MQTT client context is valid
00125    if(context == NULL)
00126       return ERROR_INVALID_PARAMETER;
00127 
00128    //Save the MQTT protocol version to be used
00129    context->settings.protocolLevel = protocolLevel;
00130 
00131    //Successful processing
00132    return NO_ERROR;
00133 }
00134 
00135 
00136 /**
00137  * @brief Set the transport protocol to be used
00138  * @param[in] context Pointer to the MQTT client context
00139  * @param[in] transportProtocol Transport protocol to be used (TCP, TLS,
00140  *   WebSocket, or secure WebSocket)
00141  * @return Error code
00142  **/
00143 
00144 error_t mqttClientSetTransportProtocol(MqttClientContext *context,
00145    MqttTransportProtocol transportProtocol)
00146 {
00147    //Make sure the MQTT client context is valid
00148    if(context == NULL)
00149       return ERROR_INVALID_PARAMETER;
00150 
00151    //Save the transport protocol to be used
00152    context->settings.transportProtocol = transportProtocol;
00153 
00154    //Successful processing
00155    return NO_ERROR;
00156 }
00157 
00158 
00159 /**
00160  * @brief Set keep-alive value
00161  * @param[in] context Pointer to the MQTT client context
00162  * @param[in] keepAlive Maximum time interval that is permitted to elapse
00163  *   between the point at which the client finishes transmitting one control
00164  *   packet and the point it starts sending the next
00165  * @return Error code
00166  **/
00167 
00168 error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
00169 {
00170    //Make sure the MQTT client context is valid
00171    if(context == NULL)
00172       return ERROR_INVALID_PARAMETER;
00173 
00174    //Save keep-alive value
00175    context->settings.keepAlive = keepAlive;
00176 
00177    //Successful processing
00178    return NO_ERROR;
00179 }
00180 
00181 
00182 /**
00183  * @brief Set communication timeout
00184  * @param[in] context Pointer to the MQTT client context
00185  * @param[in] timeout Timeout value, in seconds
00186  * @return Error code
00187  **/
00188 
00189 error_t mqttClientSetTimeout(MqttClientContext *context, uint16_t timeout)
00190 {
00191    //Make sure the MQTT client context is valid
00192    if(context == NULL)
00193       return ERROR_INVALID_PARAMETER;
00194 
00195    //Save timeout value
00196    context->settings.timeout = timeout;
00197 
00198    //Successful processing
00199    return NO_ERROR;
00200 }
00201 
00202 
00203 /**
00204  * @brief Set the hostname of the resource being requested
00205  * @param[in] context Pointer to the MQTT client context
00206  * @param[in] host NULL-terminated string containing the hostname
00207  * @return Error code
00208  **/
00209 
00210 error_t mqttClientSetHost(MqttClientContext *context, const char_t *host)
00211 {
00212    //Check parameters
00213    if(context == NULL || host == NULL)
00214       return ERROR_INVALID_PARAMETER;
00215 
00216    //Make sure the length of the hostname is acceptable
00217    if(strlen(host) > MQTT_CLIENT_MAX_HOST_LEN)
00218       return ERROR_INVALID_LENGTH;
00219 
00220 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
00221    //Save hostname (for WebSocket connections only)
00222    strcpy(context->settings.host, host);
00223 #endif
00224 
00225    //Successful processing
00226    return NO_ERROR;
00227 }
00228 
00229 
00230 /**
00231  * @brief Set the name of the resource being requested
00232  * @param[in] context Pointer to the MQTT client context
00233  * @param[in] uri NULL-terminated string containing the URI
00234  * @return Error code
00235  **/
00236 
00237 error_t mqttClientSetUri(MqttClientContext *context, const char_t *uri)
00238 {
00239    //Check parameters
00240    if(context == NULL || uri == NULL)
00241       return ERROR_INVALID_PARAMETER;
00242 
00243    //Make sure the length of the resource name is acceptable
00244    if(strlen(uri) > MQTT_CLIENT_MAX_URI_LEN)
00245       return ERROR_INVALID_LENGTH;
00246 
00247 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
00248    //Save resource name (for WebSocket connections only)
00249    strcpy(context->settings.uri, uri);
00250 #endif
00251 
00252    //Successful processing
00253    return NO_ERROR;
00254 }
00255 
00256 
00257 /**
00258  * @brief Set client identifier
00259  * @param[in] context Pointer to the MQTT client context
00260  * @param[in] clientId NULL-terminated string containing the client identifier
00261  * @return Error code
00262  **/
00263 
00264 error_t mqttClientSetIdentifier(MqttClientContext *context,
00265    const char_t *clientId)
00266 {
00267    //Check parameters
00268    if(context == NULL || clientId == NULL)
00269       return ERROR_INVALID_PARAMETER;
00270 
00271    //Make sure the length of the client identifier is acceptable
00272    if(strlen(clientId) > MQTT_CLIENT_MAX_ID_LEN)
00273       return ERROR_INVALID_LENGTH;
00274 
00275    //Save client identifier
00276    strcpy(context->settings.clientId, clientId);
00277 
00278    //Successful processing
00279    return NO_ERROR;
00280 }
00281 
00282 
00283 /**
00284  * @brief Set authentication information
00285  * @param[in] context Pointer to the MQTT client context
00286  * @param[in] username NULL-terminated string containing the user name to be used
00287  * @param[in] password NULL-terminated string containing the password to be used
00288  * @return Error code
00289  **/
00290 
00291 error_t mqttClientSetAuthInfo(MqttClientContext *context,
00292    const char_t *username, const char_t *password)
00293 {
00294    //Check parameters
00295    if(context == NULL || username == NULL || password == NULL)
00296       return ERROR_INVALID_PARAMETER;
00297 
00298    //Make sure the length of the user name is acceptable
00299    if(strlen(username) > MQTT_CLIENT_MAX_USERNAME_LEN)
00300       return ERROR_INVALID_LENGTH;
00301 
00302    //Save user name
00303    strcpy(context->settings.username, username);
00304 
00305    //Make sure the length of the password is acceptable
00306    if(strlen(password) > MQTT_CLIENT_MAX_PASSWORD_LEN)
00307       return ERROR_INVALID_LENGTH;
00308 
00309    //Save password
00310    strcpy(context->settings.password, password);
00311 
00312    //Successful processing
00313    return NO_ERROR;
00314 }
00315 
00316 
00317 /**
00318  * @brief Specify the Will message
00319  * @param[in] context Pointer to the MQTT client context
00320  * @param[in] topic Will topic name
00321  * @param[in] message Will message
00322  * @param[in] length Length of the Will message
00323  * @param[in] qos QoS level to be used when publishing the Will message
00324  * @param[in] retain This flag specifies if the Will message is to be retained
00325  * @return Error code
00326  **/
00327 
00328 error_t mqttClientSetWillMessage(MqttClientContext *context, const char_t *topic,
00329    const void *message, size_t length, MqttQosLevel qos, bool_t retain)
00330 {
00331    MqttClientWillMessage *willMessage;
00332 
00333    //Check parameters
00334    if(context == NULL || topic == NULL)
00335       return ERROR_INVALID_PARAMETER;
00336 
00337    //Make sure the length of the Will topic is acceptable
00338    if(strlen(topic) > MQTT_CLIENT_MAX_WILL_TOPIC_LEN)
00339       return ERROR_INVALID_LENGTH;
00340 
00341    //Point to the Will message
00342    willMessage = &context->settings.willMessage;
00343 
00344    //Save Will topic
00345    strcpy(willMessage->topic, topic);
00346 
00347    //Any message payload
00348    if(length > 0)
00349    {
00350       //Sanity check
00351       if(message == NULL)
00352          return ERROR_INVALID_PARAMETER;
00353 
00354       //Make sure the length of the Will message payload is acceptable
00355       if(strlen(message) > MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN)
00356          return ERROR_INVALID_LENGTH;
00357 
00358       //Save Will message payload
00359       memcpy(willMessage->payload, message, length);
00360    }
00361 
00362    //Length of the Will message payload
00363    willMessage->length = length;
00364    //QoS level to be used when publishing the Will message
00365    willMessage->qos = qos;
00366    //This flag specifies if the Will message is to be retained
00367    willMessage->retain = retain;
00368 
00369    //Successful processing
00370    return NO_ERROR;
00371 }
00372 
00373 
00374 /**
00375  * @brief Bind the MQTT client to a particular network interface
00376  * @param[in] context Pointer to the MQTT client context
00377  * @param[in] interface Network interface to be used
00378  * @return Error code
00379  **/
00380 
00381 error_t mqttClientBindToInterface(MqttClientContext *context,
00382    NetInterface *interface)
00383 {
00384    //Make sure the MQTT client context is valid
00385    if(context == NULL)
00386       return ERROR_INVALID_PARAMETER;
00387 
00388    //Explicitly associate the MQTT client with the specified interface
00389    context->interface = interface;
00390 
00391    //Successful processing
00392    return NO_ERROR;
00393 }
00394 
00395 
00396 /**
00397  * @brief Establish connection with the MQTT server
00398  * @param[in] context Pointer to the MQTT client context
00399  * @param[in] serverIpAddr IP address of the MQTT server to connect to
00400  * @param[in] serverPort TCP port number that will be used to establish the
00401  *   connection
00402  * @param[in] cleanSession If this flag is set, then the client and server
00403  *   must discard any previous session and start a new one
00404  * @return Error code
00405  **/
00406 
00407 error_t mqttClientConnect(MqttClientContext *context,
00408    const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
00409 {
00410    error_t error;
00411 
00412    //Check parameters
00413    if(context == NULL || serverIpAddr == NULL)
00414       return ERROR_INVALID_PARAMETER;
00415 
00416    //Initialize status code
00417    error = NO_ERROR;
00418 
00419    //Establish network connection
00420    while(context->state != MQTT_CLIENT_STATE_IDLE)
00421    {
00422       //Check current state
00423       if(context->state == MQTT_CLIENT_STATE_CLOSED)
00424       {
00425          //Open network connection
00426          error = mqttClientOpenConnection(context);
00427 
00428          //Check status code
00429          if(!error)
00430          {
00431             //Debug message
00432             TRACE_INFO("MQTT: Connecting to server %s port %" PRIu16 "...\r\n",
00433                ipAddrToString(serverIpAddr, NULL), serverPort);
00434 
00435             //The network connection is open
00436             mqttClientChangeState(context, MQTT_CLIENT_STATE_CONNECTING);
00437          }
00438          else
00439          {
00440             //Clean up side effects
00441             mqttClientCloseConnection(context);
00442          }
00443       }
00444       else if(context->state == MQTT_CLIENT_STATE_CONNECTING)
00445       {
00446          //Establish network connection
00447          error = mqttClientEstablishConnection(context,
00448             serverIpAddr, serverPort);
00449 
00450          //Check status code
00451          if(!error)
00452          {
00453             //Debug message
00454             TRACE_INFO("MQTT: Connected to server\r\n");
00455 
00456             //The network connection is established
00457             mqttClientChangeState(context, MQTT_CLIENT_STATE_CONNECTED);
00458          }
00459       }
00460       else if(context->state == MQTT_CLIENT_STATE_CONNECTED)
00461       {
00462          //Format CONNECT packet
00463          error = mqttClientFormatConnect(context, cleanSession);
00464 
00465          //Check status code
00466          if(!error)
00467          {
00468             //Debug message
00469             TRACE_INFO("MQTT: Sending CONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
00470             TRACE_DEBUG_ARRAY("  ", context->packet, context->packetLen);
00471 
00472             //Save the type of the MQTT packet to be sent
00473             context->packetType = MQTT_PACKET_TYPE_CONNECT;
00474             //Point to the beginning of the packet
00475             context->packetPos = 0;
00476 
00477             //Send CONNECT packet
00478             mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
00479          }
00480       }
00481       else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
00482       {
00483          //Send more data
00484          error = mqttClientProcessEvents(context, context->settings.timeout);
00485       }
00486       else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
00487       {
00488          //Wait for CONNACK packet
00489          error = mqttClientProcessEvents(context, context->settings.timeout);
00490       }
00491       else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
00492       {
00493          //Receive more data
00494          error = mqttClientProcessEvents(context, context->settings.timeout);
00495       }
00496       else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
00497       {
00498          //Reset packet type
00499          context->packetType = MQTT_PACKET_TYPE_INVALID;
00500          //A CONNACK packet has been received
00501          mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
00502       }
00503       else
00504       {
00505          //Invalid state
00506          error = ERROR_NOT_CONNECTED;
00507       }
00508 
00509       //Any error to report?
00510       if(error)
00511       {
00512 #if (NET_RTOS_SUPPORT == DISABLED)
00513          //Timeout error?
00514          if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
00515             break;
00516 #endif
00517          //Close connection
00518          mqttClientCloseConnection(context);
00519          //The connection is closed
00520          mqttClientChangeState(context, MQTT_CLIENT_STATE_CLOSED);
00521          //Exit immediately
00522          break;
00523       }
00524    }
00525 
00526    //Return status code
00527    return error;
00528 }
00529 
00530 
00531 /**
00532  * @brief Publish message
00533  * @param[in] context Pointer to the MQTT client context
00534  * @param[in] topic Topic name
00535  * @param[in] message Message payload
00536  * @param[in] length Length of the message payload
00537  * @param[in] qos QoS level to be used when publishing the message
00538  * @param[in] retain This flag specifies if the message is to be retained
00539  * @param[out] packetId Packet identifier used to send the PUBLISH packet
00540  * @return Error code
00541  **/
00542 
00543 error_t mqttClientPublish(MqttClientContext *context,
00544    const char_t *topic, const void *message, size_t length,
00545    MqttQosLevel qos, bool_t retain, uint16_t *packetId)
00546 {
00547    error_t error;
00548 
00549    //Make sure the MQTT client context is valid
00550    if(context == NULL)
00551       return ERROR_INVALID_PARAMETER;
00552 
00553    //Initialize status code
00554    error = NO_ERROR;
00555 
00556    //Send PUBLISH packet and wait for PUBACK/PUBCOMP packet to be received
00557    do
00558    {
00559       //Check current state
00560       if(context->state == MQTT_CLIENT_STATE_IDLE)
00561       {
00562          //Format PUBLISH packet
00563          error = mqttClientFormatPublish(context, topic, message, length, qos, retain);
00564 
00565          //Check status code
00566          if(!error)
00567          {
00568             //Save the packet identifier used to send the PUBLISH packet
00569             if(packetId != NULL)
00570                *packetId = context->packetId;
00571 
00572             //Debug message
00573             TRACE_INFO("MQTT: Sending PUBLISH packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
00574             TRACE_DEBUG_ARRAY("  ", context->packet, context->packetLen);
00575 
00576             //Save the type of the MQTT packet to be sent
00577             context->packetType = MQTT_PACKET_TYPE_PUBLISH;
00578             //Point to the beginning of the packet
00579             context->packetPos = 0;
00580 
00581             //Send PUBLISH packet
00582             mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
00583          }
00584       }
00585       else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
00586       {
00587          //Send more data
00588          error = mqttClientProcessEvents(context, context->settings.timeout);
00589       }
00590       else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
00591       {
00592          //The last parameter is optional
00593          if(packetId != NULL)
00594          {
00595             //Reset packet type
00596             context->packetType = MQTT_PACKET_TYPE_INVALID;
00597             //Do not wait for PUBACK/PUBCOMP packet
00598             mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
00599          }
00600          else
00601          {
00602             //Check QoS level
00603             if(qos == MQTT_QOS_LEVEL_0)
00604             {
00605                //Reset packet type
00606                context->packetType = MQTT_PACKET_TYPE_INVALID;
00607                //No response is sent by the receiver and no retry is performed by the sender
00608                mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
00609             }
00610             else
00611             {
00612                //Wait for PUBACK/PUBCOMP packet
00613                error = mqttClientProcessEvents(context, context->settings.timeout);
00614             }
00615          }
00616       }
00617       else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
00618       {
00619          //Receive more data
00620          error = mqttClientProcessEvents(context, context->settings.timeout);
00621       }
00622       else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
00623       {
00624          //Reset packet type
00625          context->packetType = MQTT_PACKET_TYPE_INVALID;
00626          //A PUBACK/PUBCOMP packet has been received
00627          mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
00628       }
00629       else
00630       {
00631          //Invalid state
00632          error = ERROR_NOT_CONNECTED;
00633       }
00634 
00635       //Any error to report?
00636       if(error)
00637          break;
00638 
00639       //Evaluate the loop condition
00640    } while(context->state != MQTT_CLIENT_STATE_IDLE);
00641 
00642    //Return status code
00643    return error;
00644 }
00645 
00646 
00647 /**
00648  * @brief Subscribe to topics
00649  * @param[in] context Pointer to the MQTT client context
00650  * @param[in] topic Topic filter
00651  * @param[in] qos Maximum QoS level at which the server can send application
00652  *   messages to the client
00653  * @param[out] packetId Packet identifier used to send the SUBSCRIBE packet
00654  * @return Error code
00655  **/
00656 
00657 error_t mqttClientSubscribe(MqttClientContext *context,
00658    const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
00659 {
00660    error_t error;
00661 
00662    //Make sure the MQTT client context is valid
00663    if(context == NULL)
00664       return ERROR_INVALID_PARAMETER;
00665 
00666    //Initialize status code
00667    error = NO_ERROR;
00668 
00669    //Send SUBSCRIBE packet and wait for SUBACK packet to be received
00670    do
00671    {
00672       //Check current state
00673       if(context->state == MQTT_CLIENT_STATE_IDLE)
00674       {
00675          //Format SUBSCRIBE packet
00676          error = mqttClientFormatSubscribe(context, topic, qos);
00677 
00678          //Check status code
00679          if(!error)
00680          {
00681             //Save the packet identifier used to send the SUBSCRIBE packet
00682             if(packetId != NULL)
00683                *packetId = context->packetId;
00684 
00685             //Debug message
00686             TRACE_INFO("MQTT: Sending SUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
00687             TRACE_DEBUG_ARRAY("  ", context->packet, context->packetLen);
00688 
00689             //Save the type of the MQTT packet to be sent
00690             context->packetType = MQTT_PACKET_TYPE_SUBSCRIBE;
00691             //Point to the beginning of the packet
00692             context->packetPos = 0;
00693 
00694             //Send SUBSCRIBE packet
00695             mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
00696          }
00697       }
00698       else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
00699       {
00700          //Send more data
00701          error = mqttClientProcessEvents(context, context->settings.timeout);
00702       }
00703       else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
00704       {
00705          //The last parameter is optional
00706          if(packetId != NULL)
00707          {
00708             //Reset packet type
00709             context->packetType = MQTT_PACKET_TYPE_INVALID;
00710             //Do not wait for SUBACK packet
00711             mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
00712          }
00713          else
00714          {
00715             //Wait for SUBACK packet
00716             error = mqttClientProcessEvents(context, context->settings.timeout);
00717          }
00718       }
00719       else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
00720       {
00721          //Receive more data
00722          error = mqttClientProcessEvents(context, context->settings.timeout);
00723       }
00724       else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
00725       {
00726          //Reset packet type
00727          context->packetType = MQTT_PACKET_TYPE_INVALID;
00728          //A SUBACK packet has been received
00729          mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
00730       }
00731       else
00732       {
00733          //Invalid state
00734          error = ERROR_NOT_CONNECTED;
00735       }
00736 
00737       //Any error to report?
00738       if(error)
00739          break;
00740 
00741       //Evaluate the loop condition
00742    } while(context->state != MQTT_CLIENT_STATE_IDLE);
00743 
00744    //Return status code
00745    return error;
00746 }
00747 
00748 
00749 /**
00750  * @brief Unsubscribe from topics
00751  * @param[in] context Pointer to the MQTT client context
00752  * @param[in] topic Topic filter
00753  * @param[out] packetId Packet identifier used to send the UNSUBSCRIBE packet
00754  * @return Error code
00755  **/
00756 
00757 error_t mqttClientUnsubscribe(MqttClientContext *context,
00758    const char_t *topic, uint16_t *packetId)
00759 {
00760    error_t error;
00761 
00762    //Make sure the MQTT client context is valid
00763    if(context == NULL)
00764       return ERROR_INVALID_PARAMETER;
00765 
00766    //Initialize status code
00767    error = NO_ERROR;
00768 
00769    //Send UNSUBSCRIBE packet and wait for UNSUBACK packet to be received
00770    do
00771    {
00772       //Check current state
00773       if(context->state == MQTT_CLIENT_STATE_IDLE)
00774       {
00775          //Format UNSUBSCRIBE packet
00776          error = mqttClientFormatUnsubscribe(context, topic);
00777 
00778          //Check status code
00779          if(!error)
00780          {
00781             //Save the packet identifier used to send the UNSUBSCRIBE packet
00782             if(packetId != NULL)
00783                *packetId = context->packetId;
00784 
00785             //Debug message
00786             TRACE_INFO("MQTT: Sending UNSUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
00787             TRACE_DEBUG_ARRAY("  ", context->packet, context->packetLen);
00788 
00789             //Save the type of the MQTT packet to be sent
00790             context->packetType = MQTT_PACKET_TYPE_UNSUBSCRIBE;
00791             //Point to the beginning of the packet
00792             context->packetPos = 0;
00793 
00794             //Send UNSUBSCRIBE packet
00795             mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
00796          }
00797       }
00798       else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
00799       {
00800          //Send more data
00801          error = mqttClientProcessEvents(context, context->settings.timeout);
00802       }
00803       else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
00804       {
00805          //The last parameter is optional
00806          if(packetId != NULL)
00807          {
00808             //Reset packet type
00809             context->packetType = MQTT_PACKET_TYPE_INVALID;
00810             //Do not wait for UNSUBACK packet
00811             mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
00812          }
00813          else
00814          {
00815             //Wait for UNSUBACK packet
00816             error = mqttClientProcessEvents(context, context->settings.timeout);
00817          }
00818       }
00819       else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
00820       {
00821          //Receive more data
00822          error = mqttClientProcessEvents(context, context->settings.timeout);
00823       }
00824       else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
00825       {
00826          //Reset packet type
00827          context->packetType = MQTT_PACKET_TYPE_INVALID;
00828          //An UNSUBACK packet has been received
00829          mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
00830       }
00831       else
00832       {
00833          //Invalid state
00834          error = ERROR_NOT_CONNECTED;
00835       }
00836 
00837       //Any error to report?
00838       if(error)
00839          break;
00840 
00841       //Evaluate the loop condition
00842    } while(context->state != MQTT_CLIENT_STATE_IDLE);
00843 
00844    //Return status code
00845    return error;
00846 }
00847 
00848 
00849 /**
00850  * @brief Send ping request
00851  * @param[in] context Pointer to the MQTT client context
00852  * @param[out] rtt Round-trip time (optional parameter)
00853  * @return Error code
00854  **/
00855 
00856 error_t mqttClientPing(MqttClientContext *context, systime_t *rtt)
00857 {
00858    error_t error;
00859 
00860    //Make sure the MQTT client context is valid
00861    if(context == NULL)
00862       return ERROR_INVALID_PARAMETER;
00863 
00864    //Initialize status code
00865    error = NO_ERROR;
00866 
00867    //Send PINGREQ packet and wait for PINGRESP packet to be received
00868    do
00869    {
00870       //Check current state
00871       if(context->state == MQTT_CLIENT_STATE_IDLE)
00872       {
00873          //Format PINGREQ packet
00874          error = mqttClientFormatPingReq(context);
00875 
00876          //Check status code
00877          if(!error)
00878          {
00879             //Debug message
00880             TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
00881             TRACE_DEBUG_ARRAY("  ", context->packet, context->packetLen);
00882 
00883             //Save the type of the MQTT packet to be sent
00884             context->packetType = MQTT_PACKET_TYPE_PINGREQ;
00885             //Point to the beginning of the packet
00886             context->packetPos = 0;
00887 
00888             //Send PINGREQ packet
00889             mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
00890 
00891             //Save the time at which the request was sent
00892             if(rtt != NULL)
00893                context->pingTimestamp = osGetSystemTime();
00894          }
00895       }
00896       else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
00897       {
00898          //Send more data
00899          error = mqttClientProcessEvents(context, context->settings.timeout);
00900       }
00901       else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
00902       {
00903          //The last parameter is optional
00904          if(rtt != NULL)
00905          {
00906             //Wait for PINGRESP packet
00907             error = mqttClientProcessEvents(context, context->settings.timeout);
00908          }
00909          else
00910          {
00911             //Reset packet type
00912             context->packetType = MQTT_PACKET_TYPE_INVALID;
00913             //Do not wait for PINGRESP packet
00914             mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
00915          }
00916       }
00917       else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
00918       {
00919          //Receive more data
00920          error = mqttClientProcessEvents(context, context->settings.timeout);
00921       }
00922       else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
00923       {
00924          //The last parameter is optional
00925          if(rtt != NULL)
00926          {
00927             //Compute round-trip time
00928             *rtt = osGetSystemTime() - context->pingTimestamp;
00929          }
00930 
00931          //Reset packet type
00932          context->packetType = MQTT_PACKET_TYPE_INVALID;
00933          //A PINGRESP packet has been received
00934          mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
00935       }
00936       else
00937       {
00938          //Invalid state
00939          error = ERROR_NOT_CONNECTED;
00940       }
00941 
00942       //Any error to report?
00943       if(error)
00944          break;
00945 
00946       //Evaluate the loop condition
00947    } while(context->state != MQTT_CLIENT_STATE_IDLE);
00948 
00949    //Return status code
00950    return error;
00951 }
00952 
00953 
00954 /**
00955  * @brief Gracefully disconnect from the MQTT server
00956  * @param[in] context Pointer to the MQTT client context
00957  * @return Error code
00958  **/
00959 
00960 error_t mqttClientDisconnect(MqttClientContext *context)
00961 {
00962    error_t error;
00963 
00964    //Make sure the MQTT client context is valid
00965    if(context == NULL)
00966       return ERROR_INVALID_PARAMETER;
00967 
00968    //Initialize status code
00969    error = NO_ERROR;
00970 
00971    //Send DISCONNECT packet and shutdown network connection
00972    while(context->state != MQTT_CLIENT_STATE_DISCONNECTED)
00973    {
00974       //Check current state
00975       if(context->state == MQTT_CLIENT_STATE_IDLE)
00976       {
00977          //Format DISCONNECT packet
00978          error = mqttClientFormatDisconnect(context);
00979 
00980          //Check status code
00981          if(!error)
00982          {
00983             //Debug message
00984             TRACE_INFO("MQTT: Sending DISCONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
00985             TRACE_DEBUG_ARRAY("  ", context->packet, context->packetLen);
00986 
00987             //Save the type of the MQTT packet to be sent
00988             context->packetType = MQTT_PACKET_TYPE_DISCONNECT;
00989             //Point to the beginning of the packet
00990             context->packetPos = 0;
00991 
00992             //Send DISCONNECT packet
00993             mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
00994          }
00995       }
00996       else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
00997       {
00998          //Send more data
00999          error = mqttClientProcessEvents(context, context->settings.timeout);
01000       }
01001       else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
01002       {
01003          //Debug message
01004          TRACE_INFO("MQTT: Shutting down connection...\r\n");
01005 
01006          //After sending a DISCONNECT packet the client must not send any
01007          //more control packets on that network connection
01008          mqttClientChangeState(context, MQTT_CLIENT_STATE_DISCONNECTING);
01009       }
01010       else if(context->state == MQTT_CLIENT_STATE_DISCONNECTING)
01011       {
01012          //Properly dispose the network connection
01013          error = mqttClientShutdownConnection(context);
01014 
01015          //Check status code
01016          if(!error)
01017          {
01018             //The MQTT client is disconnected
01019             mqttClientChangeState(context, MQTT_CLIENT_STATE_DISCONNECTED);
01020          }
01021       }
01022       else
01023       {
01024          //Invalid state
01025          error = ERROR_NOT_CONNECTED;
01026       }
01027 
01028       //Any error to report?
01029       if(error)
01030          break;
01031    }
01032 
01033    //Return status code
01034    return error;
01035 }
01036 
01037 
01038 /**
01039  * @brief Close the connection with the MQTT server
01040  * @param[in] context Pointer to the MQTT client context
01041  * @return Error code
01042  **/
01043 
01044 error_t mqttClientClose(MqttClientContext *context)
01045 {
01046    //Make sure the MQTT client context is valid
01047    if(context == NULL)
01048       return ERROR_INVALID_PARAMETER;
01049 
01050    //Close connection
01051    mqttClientCloseConnection(context);
01052    //The connection is closed
01053    mqttClientChangeState(context, MQTT_CLIENT_STATE_CLOSED);
01054 
01055    //Network connection successfully closed
01056    return NO_ERROR;
01057 }
01058 
01059 
01060 /**
01061  * @brief Process MQTT client events
01062  * @param[in] context Pointer to the MQTT client context
01063  * @param[in] timeout Maximum time to wait before returning
01064  * @return Error code
01065  **/
01066 
01067 error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
01068 {
01069    error_t error;
01070    size_t n;
01071 
01072    //It is the responsibility of the client to ensure that the interval
01073    //between control packets being sent does not exceed the keep-alive value
01074    error = mqttClientCheckKeepAlive(context);
01075 
01076    //Check status code
01077    if(!error)
01078    {
01079       //Check current state
01080       if(context->state == MQTT_CLIENT_STATE_IDLE ||
01081          context->state == MQTT_CLIENT_STATE_PACKET_SENT)
01082       {
01083          //Wait for incoming data
01084          error = mqttClientWaitForData(context, timeout);
01085 
01086          //Check status code
01087          if(!error)
01088          {
01089             //Initialize context
01090             context->packet = context->buffer;
01091             context->packetPos = 0;
01092             context->packetLen = 0;
01093             context->remainingLen = 0;
01094 
01095             //Start receiving the packet
01096             mqttClientChangeState(context, MQTT_CLIENT_STATE_RECEIVING_PACKET);
01097          }
01098       }
01099       else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
01100       {
01101          //Receive the incoming packet
01102          error = mqttClientReceivePacket(context);
01103 
01104          //Check status code
01105          if(!error)
01106          {
01107             //Process MQTT control packet
01108             error = mqttClientProcessPacket(context);
01109 
01110             //Update MQTT client state
01111             if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
01112             {
01113                if(context->packetType == MQTT_PACKET_TYPE_INVALID)
01114                   mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
01115                else
01116                   mqttClientChangeState(context, MQTT_CLIENT_STATE_PACKET_SENT);
01117             }
01118          }
01119       }
01120       else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
01121       {
01122          //Any remaining data to be sent?
01123          if(context->packetPos < context->packetLen)
01124          {
01125             //Send more data
01126             error = mqttClientSendData(context, context->packet + context->packetPos,
01127                context->packetLen - context->packetPos, &n, 0);
01128 
01129             //Advance data pointer
01130             context->packetPos += n;
01131          }
01132          else
01133          {
01134             //Save the time at which the message was sent
01135             context->keepAliveTimestamp = osGetSystemTime();
01136 
01137             //Update MQTT client state
01138             if(context->packetType == MQTT_PACKET_TYPE_INVALID)
01139                mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
01140             else
01141                mqttClientChangeState(context, MQTT_CLIENT_STATE_PACKET_SENT);
01142          }
01143       }
01144    }
01145 
01146    //Return status code
01147    return error;
01148 }
01149 
01150 #endif
01151