Webserver+3d print

Dependents:   Nucleo

Committer:
Sergunb
Date:
Sat Feb 04 18:15:49 2017 +0000
Revision:
0:8918a71cdbe9
nothing else

Who changed what in which revision?

User | Revision | Line number | New contents of line
Sergunb 0:8918a71cdbe9 1 /**
Sergunb 0:8918a71cdbe9 2 * @file mqtt_client.c
Sergunb 0:8918a71cdbe9 3 * @brief MQTT client
Sergunb 0:8918a71cdbe9 4 *
Sergunb 0:8918a71cdbe9 5 * @section License
Sergunb 0:8918a71cdbe9 6 *
Sergunb 0:8918a71cdbe9 7 * Copyright (C) 2010-2017 Oryx Embedded SARL. All rights reserved.
Sergunb 0:8918a71cdbe9 8 *
Sergunb 0:8918a71cdbe9 9 * This file is part of CycloneTCP Open.
Sergunb 0:8918a71cdbe9 10 *
Sergunb 0:8918a71cdbe9 11 * This program is free software; you can redistribute it and/or
Sergunb 0:8918a71cdbe9 12 * modify it under the terms of the GNU General Public License
Sergunb 0:8918a71cdbe9 13 * as published by the Free Software Foundation; either version 2
Sergunb 0:8918a71cdbe9 14 * of the License, or (at your option) any later version.
Sergunb 0:8918a71cdbe9 15 *
Sergunb 0:8918a71cdbe9 16 * This program is distributed in the hope that it will be useful,
Sergunb 0:8918a71cdbe9 17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Sergunb 0:8918a71cdbe9 18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Sergunb 0:8918a71cdbe9 19 * GNU General Public License for more details.
Sergunb 0:8918a71cdbe9 20 *
Sergunb 0:8918a71cdbe9 21 * You should have received a copy of the GNU General Public License
Sergunb 0:8918a71cdbe9 22 * along with this program; if not, write to the Free Software Foundation,
Sergunb 0:8918a71cdbe9 23 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Sergunb 0:8918a71cdbe9 24 *
Sergunb 0:8918a71cdbe9 25 * @author Oryx Embedded SARL (www.oryx-embedded.com)
Sergunb 0:8918a71cdbe9 26 * @version 1.7.6
Sergunb 0:8918a71cdbe9 27 **/
Sergunb 0:8918a71cdbe9 28
Sergunb 0:8918a71cdbe9 29 //Switch to the appropriate trace level
Sergunb 0:8918a71cdbe9 30 #define TRACE_LEVEL MQTT_TRACE_LEVEL
Sergunb 0:8918a71cdbe9 31
Sergunb 0:8918a71cdbe9 32 //Dependencies
Sergunb 0:8918a71cdbe9 33 #include "core/net.h"
Sergunb 0:8918a71cdbe9 34 #include "mqtt/mqtt_client.h"
Sergunb 0:8918a71cdbe9 35 #include "mqtt/mqtt_client_packet.h"
Sergunb 0:8918a71cdbe9 36 #include "mqtt/mqtt_client_transport.h"
Sergunb 0:8918a71cdbe9 37 #include "mqtt/mqtt_client_misc.h"
Sergunb 0:8918a71cdbe9 38 #include "debug.h"
Sergunb 0:8918a71cdbe9 39
Sergunb 0:8918a71cdbe9 40 //Check TCP/IP stack configuration
Sergunb 0:8918a71cdbe9 41 #if (MQTT_CLIENT_SUPPORT == ENABLED)
Sergunb 0:8918a71cdbe9 42
Sergunb 0:8918a71cdbe9 43
Sergunb 0:8918a71cdbe9 44 /**
Sergunb 0:8918a71cdbe9 45 * @brief Initialize MQTT client context
Sergunb 0:8918a71cdbe9 46 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 47 **/
Sergunb 0:8918a71cdbe9 48
Sergunb 0:8918a71cdbe9 49 void mqttClientInit(MqttClientContext *context)
Sergunb 0:8918a71cdbe9 50 {
Sergunb 0:8918a71cdbe9 51 //Sanity check
Sergunb 0:8918a71cdbe9 52 if(context != NULL)
Sergunb 0:8918a71cdbe9 53 {
Sergunb 0:8918a71cdbe9 54 //Clear MQTT client context
Sergunb 0:8918a71cdbe9 55 memset(context, 0, sizeof(MqttClientContext));
Sergunb 0:8918a71cdbe9 56
Sergunb 0:8918a71cdbe9 57 //Default protocol version
Sergunb 0:8918a71cdbe9 58 context->settings.protocolLevel = MQTT_PROTOCOL_LEVEL_3_1_1;
Sergunb 0:8918a71cdbe9 59 //Default transport protocol
Sergunb 0:8918a71cdbe9 60 context->settings.transportProtocol = MQTT_TRANSPORT_PROTOCOL_TCP;
Sergunb 0:8918a71cdbe9 61 //Default keep-alive time interval
Sergunb 0:8918a71cdbe9 62 context->settings.keepAlive = MQTT_CLIENT_DEFAULT_KEEP_ALIVE;
Sergunb 0:8918a71cdbe9 63 //Default communication timeout
Sergunb 0:8918a71cdbe9 64 context->settings.timeout = MQTT_CLIENT_DEFAULT_TIMEOUT;
Sergunb 0:8918a71cdbe9 65
Sergunb 0:8918a71cdbe9 66 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
Sergunb 0:8918a71cdbe9 67 //Default resource name (for WebSocket connections only)
Sergunb 0:8918a71cdbe9 68 strcpy(context->settings.uri, "/");
Sergunb 0:8918a71cdbe9 69 #endif
Sergunb 0:8918a71cdbe9 70
Sergunb 0:8918a71cdbe9 71 //Initialize state machine
Sergunb 0:8918a71cdbe9 72 context->state = MQTT_CLIENT_STATE_CLOSED;
Sergunb 0:8918a71cdbe9 73 //Initialize packet identifier
Sergunb 0:8918a71cdbe9 74 context->packetId = 0;
Sergunb 0:8918a71cdbe9 75 }
Sergunb 0:8918a71cdbe9 76 }
Sergunb 0:8918a71cdbe9 77
Sergunb 0:8918a71cdbe9 78
Sergunb 0:8918a71cdbe9 79 /**
Sergunb 0:8918a71cdbe9 80 * @brief Initialize callback structure
Sergunb 0:8918a71cdbe9 81 * @param[in] callbacks Pointer to a structure that contains callback functions
Sergunb 0:8918a71cdbe9 82 * @return Error code
Sergunb 0:8918a71cdbe9 83 **/
Sergunb 0:8918a71cdbe9 84
Sergunb 0:8918a71cdbe9 85 void mqttClientInitCallbacks(MqttClientCallbacks *callbacks)
Sergunb 0:8918a71cdbe9 86 {
Sergunb 0:8918a71cdbe9 87 //Initialize callback structure
Sergunb 0:8918a71cdbe9 88 memset(callbacks, 0, sizeof(MqttClientCallbacks));
Sergunb 0:8918a71cdbe9 89 }
Sergunb 0:8918a71cdbe9 90
Sergunb 0:8918a71cdbe9 91
Sergunb 0:8918a71cdbe9 92 /**
Sergunb 0:8918a71cdbe9 93 * @brief Register MQTT client callbacks
Sergunb 0:8918a71cdbe9 94 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 95 * @param[in] callbacks Pointer to a structure that contains callback functions
Sergunb 0:8918a71cdbe9 96 * @return Error code
Sergunb 0:8918a71cdbe9 97 **/
Sergunb 0:8918a71cdbe9 98
Sergunb 0:8918a71cdbe9 99 error_t mqttClientRegisterCallbacks(MqttClientContext *context,
Sergunb 0:8918a71cdbe9 100 const MqttClientCallbacks *callbacks)
Sergunb 0:8918a71cdbe9 101 {
Sergunb 0:8918a71cdbe9 102 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 103 if(context == NULL)
Sergunb 0:8918a71cdbe9 104 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 105
Sergunb 0:8918a71cdbe9 106 //Attach callback functions
Sergunb 0:8918a71cdbe9 107 context->callbacks = *callbacks;
Sergunb 0:8918a71cdbe9 108
Sergunb 0:8918a71cdbe9 109 //Successful processing
Sergunb 0:8918a71cdbe9 110 return NO_ERROR;
Sergunb 0:8918a71cdbe9 111 }
Sergunb 0:8918a71cdbe9 112
Sergunb 0:8918a71cdbe9 113
Sergunb 0:8918a71cdbe9 114 /**
Sergunb 0:8918a71cdbe9 115 * @brief Set the MQTT protocol version to be used
Sergunb 0:8918a71cdbe9 116 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 117 * @param[in] protocolLevel MQTT protocol level (3.1 or 3.1.1)
Sergunb 0:8918a71cdbe9 118 * @return Error code
Sergunb 0:8918a71cdbe9 119 **/
Sergunb 0:8918a71cdbe9 120
Sergunb 0:8918a71cdbe9 121 error_t mqttClientSetProtocolLevel(MqttClientContext *context,
Sergunb 0:8918a71cdbe9 122 MqttProtocolLevel protocolLevel)
Sergunb 0:8918a71cdbe9 123 {
Sergunb 0:8918a71cdbe9 124 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 125 if(context == NULL)
Sergunb 0:8918a71cdbe9 126 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 127
Sergunb 0:8918a71cdbe9 128 //Save the MQTT protocol version to be used
Sergunb 0:8918a71cdbe9 129 context->settings.protocolLevel = protocolLevel;
Sergunb 0:8918a71cdbe9 130
Sergunb 0:8918a71cdbe9 131 //Successful processing
Sergunb 0:8918a71cdbe9 132 return NO_ERROR;
Sergunb 0:8918a71cdbe9 133 }
Sergunb 0:8918a71cdbe9 134
Sergunb 0:8918a71cdbe9 135
Sergunb 0:8918a71cdbe9 136 /**
Sergunb 0:8918a71cdbe9 137 * @brief Set the transport protocol to be used
Sergunb 0:8918a71cdbe9 138 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 139 * @param[in] transportProtocol Transport protocol to be used (TCP, TLS,
Sergunb 0:8918a71cdbe9 140 * WebSocket, or secure WebSocket)
Sergunb 0:8918a71cdbe9 141 * @return Error code
Sergunb 0:8918a71cdbe9 142 **/
Sergunb 0:8918a71cdbe9 143
Sergunb 0:8918a71cdbe9 144 error_t mqttClientSetTransportProtocol(MqttClientContext *context,
Sergunb 0:8918a71cdbe9 145 MqttTransportProtocol transportProtocol)
Sergunb 0:8918a71cdbe9 146 {
Sergunb 0:8918a71cdbe9 147 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 148 if(context == NULL)
Sergunb 0:8918a71cdbe9 149 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 150
Sergunb 0:8918a71cdbe9 151 //Save the transport protocol to be used
Sergunb 0:8918a71cdbe9 152 context->settings.transportProtocol = transportProtocol;
Sergunb 0:8918a71cdbe9 153
Sergunb 0:8918a71cdbe9 154 //Successful processing
Sergunb 0:8918a71cdbe9 155 return NO_ERROR;
Sergunb 0:8918a71cdbe9 156 }
Sergunb 0:8918a71cdbe9 157
Sergunb 0:8918a71cdbe9 158
Sergunb 0:8918a71cdbe9 159 /**
Sergunb 0:8918a71cdbe9 160 * @brief Set keep-alive value
Sergunb 0:8918a71cdbe9 161 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 162 * @param[in] keepAlive Maximum time interval that is permitted to elapse
Sergunb 0:8918a71cdbe9 163 * between the point at which the client finishes transmitting one control
Sergunb 0:8918a71cdbe9 164 * packet and the point it starts sending the next
Sergunb 0:8918a71cdbe9 165 * @return Error code
Sergunb 0:8918a71cdbe9 166 **/
Sergunb 0:8918a71cdbe9 167
Sergunb 0:8918a71cdbe9 168 error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
Sergunb 0:8918a71cdbe9 169 {
Sergunb 0:8918a71cdbe9 170 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 171 if(context == NULL)
Sergunb 0:8918a71cdbe9 172 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 173
Sergunb 0:8918a71cdbe9 174 //Save keep-alive value
Sergunb 0:8918a71cdbe9 175 context->settings.keepAlive = keepAlive;
Sergunb 0:8918a71cdbe9 176
Sergunb 0:8918a71cdbe9 177 //Successful processing
Sergunb 0:8918a71cdbe9 178 return NO_ERROR;
Sergunb 0:8918a71cdbe9 179 }
Sergunb 0:8918a71cdbe9 180
Sergunb 0:8918a71cdbe9 181
Sergunb 0:8918a71cdbe9 182 /**
Sergunb 0:8918a71cdbe9 183 * @brief Set communication timeout
Sergunb 0:8918a71cdbe9 184 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 185 * @param[in] timeout Timeout value, in seconds
Sergunb 0:8918a71cdbe9 186 * @return Error code
Sergunb 0:8918a71cdbe9 187 **/
Sergunb 0:8918a71cdbe9 188
Sergunb 0:8918a71cdbe9 189 error_t mqttClientSetTimeout(MqttClientContext *context, uint16_t timeout)
Sergunb 0:8918a71cdbe9 190 {
Sergunb 0:8918a71cdbe9 191 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 192 if(context == NULL)
Sergunb 0:8918a71cdbe9 193 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 194
Sergunb 0:8918a71cdbe9 195 //Save timeout value
Sergunb 0:8918a71cdbe9 196 context->settings.timeout = timeout;
Sergunb 0:8918a71cdbe9 197
Sergunb 0:8918a71cdbe9 198 //Successful processing
Sergunb 0:8918a71cdbe9 199 return NO_ERROR;
Sergunb 0:8918a71cdbe9 200 }
Sergunb 0:8918a71cdbe9 201
Sergunb 0:8918a71cdbe9 202
Sergunb 0:8918a71cdbe9 203 /**
Sergunb 0:8918a71cdbe9 204 * @brief Set the hostname of the resource being requested
Sergunb 0:8918a71cdbe9 205 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 206 * @param[in] host NULL-terminated string containing the hostname
Sergunb 0:8918a71cdbe9 207 * @return Error code
Sergunb 0:8918a71cdbe9 208 **/
Sergunb 0:8918a71cdbe9 209
Sergunb 0:8918a71cdbe9 210 error_t mqttClientSetHost(MqttClientContext *context, const char_t *host)
Sergunb 0:8918a71cdbe9 211 {
Sergunb 0:8918a71cdbe9 212 //Check parameters
Sergunb 0:8918a71cdbe9 213 if(context == NULL || host == NULL)
Sergunb 0:8918a71cdbe9 214 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 215
Sergunb 0:8918a71cdbe9 216 //Make sure the length of the hostname is acceptable
Sergunb 0:8918a71cdbe9 217 if(strlen(host) > MQTT_CLIENT_MAX_HOST_LEN)
Sergunb 0:8918a71cdbe9 218 return ERROR_INVALID_LENGTH;
Sergunb 0:8918a71cdbe9 219
Sergunb 0:8918a71cdbe9 220 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
Sergunb 0:8918a71cdbe9 221 //Save hostname (for WebSocket connections only)
Sergunb 0:8918a71cdbe9 222 strcpy(context->settings.host, host);
Sergunb 0:8918a71cdbe9 223 #endif
Sergunb 0:8918a71cdbe9 224
Sergunb 0:8918a71cdbe9 225 //Successful processing
Sergunb 0:8918a71cdbe9 226 return NO_ERROR;
Sergunb 0:8918a71cdbe9 227 }
Sergunb 0:8918a71cdbe9 228
Sergunb 0:8918a71cdbe9 229
Sergunb 0:8918a71cdbe9 230 /**
Sergunb 0:8918a71cdbe9 231 * @brief Set the name of the resource being requested
Sergunb 0:8918a71cdbe9 232 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 233 * @param[in] uri NULL-terminated string containing the URI
Sergunb 0:8918a71cdbe9 234 * @return Error code
Sergunb 0:8918a71cdbe9 235 **/
Sergunb 0:8918a71cdbe9 236
Sergunb 0:8918a71cdbe9 237 error_t mqttClientSetUri(MqttClientContext *context, const char_t *uri)
Sergunb 0:8918a71cdbe9 238 {
Sergunb 0:8918a71cdbe9 239 //Check parameters
Sergunb 0:8918a71cdbe9 240 if(context == NULL || uri == NULL)
Sergunb 0:8918a71cdbe9 241 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 242
Sergunb 0:8918a71cdbe9 243 //Make sure the length of the resource name is acceptable
Sergunb 0:8918a71cdbe9 244 if(strlen(uri) > MQTT_CLIENT_MAX_URI_LEN)
Sergunb 0:8918a71cdbe9 245 return ERROR_INVALID_LENGTH;
Sergunb 0:8918a71cdbe9 246
Sergunb 0:8918a71cdbe9 247 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
Sergunb 0:8918a71cdbe9 248 //Save resource name (for WebSocket connections only)
Sergunb 0:8918a71cdbe9 249 strcpy(context->settings.uri, uri);
Sergunb 0:8918a71cdbe9 250 #endif
Sergunb 0:8918a71cdbe9 251
Sergunb 0:8918a71cdbe9 252 //Successful processing
Sergunb 0:8918a71cdbe9 253 return NO_ERROR;
Sergunb 0:8918a71cdbe9 254 }
Sergunb 0:8918a71cdbe9 255
Sergunb 0:8918a71cdbe9 256
Sergunb 0:8918a71cdbe9 257 /**
Sergunb 0:8918a71cdbe9 258 * @brief Set client identifier
Sergunb 0:8918a71cdbe9 259 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 260 * @param[in] clientId NULL-terminated string containing the client identifier
Sergunb 0:8918a71cdbe9 261 * @return Error code
Sergunb 0:8918a71cdbe9 262 **/
Sergunb 0:8918a71cdbe9 263
Sergunb 0:8918a71cdbe9 264 error_t mqttClientSetIdentifier(MqttClientContext *context,
Sergunb 0:8918a71cdbe9 265 const char_t *clientId)
Sergunb 0:8918a71cdbe9 266 {
Sergunb 0:8918a71cdbe9 267 //Check parameters
Sergunb 0:8918a71cdbe9 268 if(context == NULL || clientId == NULL)
Sergunb 0:8918a71cdbe9 269 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 270
Sergunb 0:8918a71cdbe9 271 //Make sure the length of the client identifier is acceptable
Sergunb 0:8918a71cdbe9 272 if(strlen(clientId) > MQTT_CLIENT_MAX_ID_LEN)
Sergunb 0:8918a71cdbe9 273 return ERROR_INVALID_LENGTH;
Sergunb 0:8918a71cdbe9 274
Sergunb 0:8918a71cdbe9 275 //Save client identifier
Sergunb 0:8918a71cdbe9 276 strcpy(context->settings.clientId, clientId);
Sergunb 0:8918a71cdbe9 277
Sergunb 0:8918a71cdbe9 278 //Successful processing
Sergunb 0:8918a71cdbe9 279 return NO_ERROR;
Sergunb 0:8918a71cdbe9 280 }
Sergunb 0:8918a71cdbe9 281
Sergunb 0:8918a71cdbe9 282
Sergunb 0:8918a71cdbe9 283 /**
Sergunb 0:8918a71cdbe9 284 * @brief Set authentication information
Sergunb 0:8918a71cdbe9 285 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 286 * @param[in] username NULL-terminated string containing the user name to be used
Sergunb 0:8918a71cdbe9 287 * @param[in] password NULL-terminated string containing the password to be used
Sergunb 0:8918a71cdbe9 288 * @return Error code
Sergunb 0:8918a71cdbe9 289 **/
Sergunb 0:8918a71cdbe9 290
Sergunb 0:8918a71cdbe9 291 error_t mqttClientSetAuthInfo(MqttClientContext *context,
Sergunb 0:8918a71cdbe9 292 const char_t *username, const char_t *password)
Sergunb 0:8918a71cdbe9 293 {
Sergunb 0:8918a71cdbe9 294 //Check parameters
Sergunb 0:8918a71cdbe9 295 if(context == NULL || username == NULL || password == NULL)
Sergunb 0:8918a71cdbe9 296 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 297
Sergunb 0:8918a71cdbe9 298 //Make sure the length of the user name is acceptable
Sergunb 0:8918a71cdbe9 299 if(strlen(username) > MQTT_CLIENT_MAX_USERNAME_LEN)
Sergunb 0:8918a71cdbe9 300 return ERROR_INVALID_LENGTH;
Sergunb 0:8918a71cdbe9 301
Sergunb 0:8918a71cdbe9 302 //Save user name
Sergunb 0:8918a71cdbe9 303 strcpy(context->settings.username, username);
Sergunb 0:8918a71cdbe9 304
Sergunb 0:8918a71cdbe9 305 //Make sure the length of the password is acceptable
Sergunb 0:8918a71cdbe9 306 if(strlen(password) > MQTT_CLIENT_MAX_PASSWORD_LEN)
Sergunb 0:8918a71cdbe9 307 return ERROR_INVALID_LENGTH;
Sergunb 0:8918a71cdbe9 308
Sergunb 0:8918a71cdbe9 309 //Save password
Sergunb 0:8918a71cdbe9 310 strcpy(context->settings.password, password);
Sergunb 0:8918a71cdbe9 311
Sergunb 0:8918a71cdbe9 312 //Successful processing
Sergunb 0:8918a71cdbe9 313 return NO_ERROR;
Sergunb 0:8918a71cdbe9 314 }
Sergunb 0:8918a71cdbe9 315
Sergunb 0:8918a71cdbe9 316
Sergunb 0:8918a71cdbe9 317 /**
Sergunb 0:8918a71cdbe9 318 * @brief Specify the Will message
Sergunb 0:8918a71cdbe9 319 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 320 * @param[in] topic Will topic name
Sergunb 0:8918a71cdbe9 321 * @param[in] message Will message
Sergunb 0:8918a71cdbe9 322 * @param[in] length Length of the Will message
Sergunb 0:8918a71cdbe9 323 * @param[in] qos QoS level to be used when publishing the Will message
Sergunb 0:8918a71cdbe9 324 * @param[in] retain This flag specifies if the Will message is to be retained
Sergunb 0:8918a71cdbe9 325 * @return Error code
Sergunb 0:8918a71cdbe9 326 **/
Sergunb 0:8918a71cdbe9 327
Sergunb 0:8918a71cdbe9 328 error_t mqttClientSetWillMessage(MqttClientContext *context, const char_t *topic,
Sergunb 0:8918a71cdbe9 329 const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Sergunb 0:8918a71cdbe9 330 {
Sergunb 0:8918a71cdbe9 331 MqttClientWillMessage *willMessage;
Sergunb 0:8918a71cdbe9 332
Sergunb 0:8918a71cdbe9 333 //Check parameters
Sergunb 0:8918a71cdbe9 334 if(context == NULL || topic == NULL)
Sergunb 0:8918a71cdbe9 335 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 336
Sergunb 0:8918a71cdbe9 337 //Make sure the length of the Will topic is acceptable
Sergunb 0:8918a71cdbe9 338 if(strlen(topic) > MQTT_CLIENT_MAX_WILL_TOPIC_LEN)
Sergunb 0:8918a71cdbe9 339 return ERROR_INVALID_LENGTH;
Sergunb 0:8918a71cdbe9 340
Sergunb 0:8918a71cdbe9 341 //Point to the Will message
Sergunb 0:8918a71cdbe9 342 willMessage = &context->settings.willMessage;
Sergunb 0:8918a71cdbe9 343
Sergunb 0:8918a71cdbe9 344 //Save Will topic
Sergunb 0:8918a71cdbe9 345 strcpy(willMessage->topic, topic);
Sergunb 0:8918a71cdbe9 346
Sergunb 0:8918a71cdbe9 347 //Any message payload
Sergunb 0:8918a71cdbe9 348 if(length > 0)
Sergunb 0:8918a71cdbe9 349 {
Sergunb 0:8918a71cdbe9 350 //Sanity check
Sergunb 0:8918a71cdbe9 351 if(message == NULL)
Sergunb 0:8918a71cdbe9 352 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 353
Sergunb 0:8918a71cdbe9 354 //Make sure the length of the Will message payload is acceptable
Sergunb 0:8918a71cdbe9 355 if(strlen(message) > MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN)
Sergunb 0:8918a71cdbe9 356 return ERROR_INVALID_LENGTH;
Sergunb 0:8918a71cdbe9 357
Sergunb 0:8918a71cdbe9 358 //Save Will message payload
Sergunb 0:8918a71cdbe9 359 memcpy(willMessage->payload, message, length);
Sergunb 0:8918a71cdbe9 360 }
Sergunb 0:8918a71cdbe9 361
Sergunb 0:8918a71cdbe9 362 //Length of the Will message payload
Sergunb 0:8918a71cdbe9 363 willMessage->length = length;
Sergunb 0:8918a71cdbe9 364 //QoS level to be used when publishing the Will message
Sergunb 0:8918a71cdbe9 365 willMessage->qos = qos;
Sergunb 0:8918a71cdbe9 366 //This flag specifies if the Will message is to be retained
Sergunb 0:8918a71cdbe9 367 willMessage->retain = retain;
Sergunb 0:8918a71cdbe9 368
Sergunb 0:8918a71cdbe9 369 //Successful processing
Sergunb 0:8918a71cdbe9 370 return NO_ERROR;
Sergunb 0:8918a71cdbe9 371 }
Sergunb 0:8918a71cdbe9 372
Sergunb 0:8918a71cdbe9 373
Sergunb 0:8918a71cdbe9 374 /**
Sergunb 0:8918a71cdbe9 375 * @brief Bind the MQTT client to a particular network interface
Sergunb 0:8918a71cdbe9 376 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 377 * @param[in] interface Network interface to be used
Sergunb 0:8918a71cdbe9 378 * @return Error code
Sergunb 0:8918a71cdbe9 379 **/
Sergunb 0:8918a71cdbe9 380
Sergunb 0:8918a71cdbe9 381 error_t mqttClientBindToInterface(MqttClientContext *context,
Sergunb 0:8918a71cdbe9 382 NetInterface *interface)
Sergunb 0:8918a71cdbe9 383 {
Sergunb 0:8918a71cdbe9 384 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 385 if(context == NULL)
Sergunb 0:8918a71cdbe9 386 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 387
Sergunb 0:8918a71cdbe9 388 //Explicitly associate the MQTT client with the specified interface
Sergunb 0:8918a71cdbe9 389 context->interface = interface;
Sergunb 0:8918a71cdbe9 390
Sergunb 0:8918a71cdbe9 391 //Successful processing
Sergunb 0:8918a71cdbe9 392 return NO_ERROR;
Sergunb 0:8918a71cdbe9 393 }
Sergunb 0:8918a71cdbe9 394
Sergunb 0:8918a71cdbe9 395
Sergunb 0:8918a71cdbe9 396 /**
Sergunb 0:8918a71cdbe9 397 * @brief Establish connection with the MQTT server
Sergunb 0:8918a71cdbe9 398 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 399 * @param[in] serverIpAddr IP address of the MQTT server to connect to
Sergunb 0:8918a71cdbe9 400 * @param[in] serverPort TCP port number that will be used to establish the
Sergunb 0:8918a71cdbe9 401 * connection
Sergunb 0:8918a71cdbe9 402 * @param[in] cleanSession If this flag is set, then the client and server
Sergunb 0:8918a71cdbe9 403 * must discard any previous session and start a new one
Sergunb 0:8918a71cdbe9 404 * @return Error code
Sergunb 0:8918a71cdbe9 405 **/
Sergunb 0:8918a71cdbe9 406
Sergunb 0:8918a71cdbe9 407 error_t mqttClientConnect(MqttClientContext *context,
Sergunb 0:8918a71cdbe9 408 const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
Sergunb 0:8918a71cdbe9 409 {
Sergunb 0:8918a71cdbe9 410 error_t error;
Sergunb 0:8918a71cdbe9 411
Sergunb 0:8918a71cdbe9 412 //Check parameters
Sergunb 0:8918a71cdbe9 413 if(context == NULL || serverIpAddr == NULL)
Sergunb 0:8918a71cdbe9 414 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 415
Sergunb 0:8918a71cdbe9 416 //Initialize status code
Sergunb 0:8918a71cdbe9 417 error = NO_ERROR;
Sergunb 0:8918a71cdbe9 418
Sergunb 0:8918a71cdbe9 419 //Establish network connection
Sergunb 0:8918a71cdbe9 420 while(context->state != MQTT_CLIENT_STATE_IDLE)
Sergunb 0:8918a71cdbe9 421 {
Sergunb 0:8918a71cdbe9 422 //Check current state
Sergunb 0:8918a71cdbe9 423 if(context->state == MQTT_CLIENT_STATE_CLOSED)
Sergunb 0:8918a71cdbe9 424 {
Sergunb 0:8918a71cdbe9 425 //Open network connection
Sergunb 0:8918a71cdbe9 426 error = mqttClientOpenConnection(context);
Sergunb 0:8918a71cdbe9 427
Sergunb 0:8918a71cdbe9 428 //Check status code
Sergunb 0:8918a71cdbe9 429 if(!error)
Sergunb 0:8918a71cdbe9 430 {
Sergunb 0:8918a71cdbe9 431 //Debug message
Sergunb 0:8918a71cdbe9 432 TRACE_INFO("MQTT: Connecting to server %s port %" PRIu16 "...\r\n",
Sergunb 0:8918a71cdbe9 433 ipAddrToString(serverIpAddr, NULL), serverPort);
Sergunb 0:8918a71cdbe9 434
Sergunb 0:8918a71cdbe9 435 //The network connection is open
Sergunb 0:8918a71cdbe9 436 mqttClientChangeState(context, MQTT_CLIENT_STATE_CONNECTING);
Sergunb 0:8918a71cdbe9 437 }
Sergunb 0:8918a71cdbe9 438 else
Sergunb 0:8918a71cdbe9 439 {
Sergunb 0:8918a71cdbe9 440 //Clean up side effects
Sergunb 0:8918a71cdbe9 441 mqttClientCloseConnection(context);
Sergunb 0:8918a71cdbe9 442 }
Sergunb 0:8918a71cdbe9 443 }
Sergunb 0:8918a71cdbe9 444 else if(context->state == MQTT_CLIENT_STATE_CONNECTING)
Sergunb 0:8918a71cdbe9 445 {
Sergunb 0:8918a71cdbe9 446 //Establish network connection
Sergunb 0:8918a71cdbe9 447 error = mqttClientEstablishConnection(context,
Sergunb 0:8918a71cdbe9 448 serverIpAddr, serverPort);
Sergunb 0:8918a71cdbe9 449
Sergunb 0:8918a71cdbe9 450 //Check status code
Sergunb 0:8918a71cdbe9 451 if(!error)
Sergunb 0:8918a71cdbe9 452 {
Sergunb 0:8918a71cdbe9 453 //Debug message
Sergunb 0:8918a71cdbe9 454 TRACE_INFO("MQTT: Connected to server\r\n");
Sergunb 0:8918a71cdbe9 455
Sergunb 0:8918a71cdbe9 456 //The network connection is established
Sergunb 0:8918a71cdbe9 457 mqttClientChangeState(context, MQTT_CLIENT_STATE_CONNECTED);
Sergunb 0:8918a71cdbe9 458 }
Sergunb 0:8918a71cdbe9 459 }
Sergunb 0:8918a71cdbe9 460 else if(context->state == MQTT_CLIENT_STATE_CONNECTED)
Sergunb 0:8918a71cdbe9 461 {
Sergunb 0:8918a71cdbe9 462 //Format CONNECT packet
Sergunb 0:8918a71cdbe9 463 error = mqttClientFormatConnect(context, cleanSession);
Sergunb 0:8918a71cdbe9 464
Sergunb 0:8918a71cdbe9 465 //Check status code
Sergunb 0:8918a71cdbe9 466 if(!error)
Sergunb 0:8918a71cdbe9 467 {
Sergunb 0:8918a71cdbe9 468 //Debug message
Sergunb 0:8918a71cdbe9 469 TRACE_INFO("MQTT: Sending CONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
Sergunb 0:8918a71cdbe9 470 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
Sergunb 0:8918a71cdbe9 471
Sergunb 0:8918a71cdbe9 472 //Save the type of the MQTT packet to be sent
Sergunb 0:8918a71cdbe9 473 context->packetType = MQTT_PACKET_TYPE_CONNECT;
Sergunb 0:8918a71cdbe9 474 //Point to the beginning of the packet
Sergunb 0:8918a71cdbe9 475 context->packetPos = 0;
Sergunb 0:8918a71cdbe9 476
Sergunb 0:8918a71cdbe9 477 //Send CONNECT packet
Sergunb 0:8918a71cdbe9 478 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
Sergunb 0:8918a71cdbe9 479 }
Sergunb 0:8918a71cdbe9 480 }
Sergunb 0:8918a71cdbe9 481 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
Sergunb 0:8918a71cdbe9 482 {
Sergunb 0:8918a71cdbe9 483 //Send more data
Sergunb 0:8918a71cdbe9 484 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 485 }
Sergunb 0:8918a71cdbe9 486 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
Sergunb 0:8918a71cdbe9 487 {
Sergunb 0:8918a71cdbe9 488 //Wait for CONNACK packet
Sergunb 0:8918a71cdbe9 489 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 490 }
Sergunb 0:8918a71cdbe9 491 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
Sergunb 0:8918a71cdbe9 492 {
Sergunb 0:8918a71cdbe9 493 //Receive more data
Sergunb 0:8918a71cdbe9 494 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 495 }
Sergunb 0:8918a71cdbe9 496 else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
Sergunb 0:8918a71cdbe9 497 {
Sergunb 0:8918a71cdbe9 498 //Reset packet type
Sergunb 0:8918a71cdbe9 499 context->packetType = MQTT_PACKET_TYPE_INVALID;
Sergunb 0:8918a71cdbe9 500 //A CONNACK packet has been received
Sergunb 0:8918a71cdbe9 501 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 502 }
Sergunb 0:8918a71cdbe9 503 else
Sergunb 0:8918a71cdbe9 504 {
Sergunb 0:8918a71cdbe9 505 //Invalid state
Sergunb 0:8918a71cdbe9 506 error = ERROR_NOT_CONNECTED;
Sergunb 0:8918a71cdbe9 507 }
Sergunb 0:8918a71cdbe9 508
Sergunb 0:8918a71cdbe9 509 //Any error to report?
Sergunb 0:8918a71cdbe9 510 if(error)
Sergunb 0:8918a71cdbe9 511 {
Sergunb 0:8918a71cdbe9 512 #if (NET_RTOS_SUPPORT == DISABLED)
Sergunb 0:8918a71cdbe9 513 //Timeout error?
Sergunb 0:8918a71cdbe9 514 if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
Sergunb 0:8918a71cdbe9 515 break;
Sergunb 0:8918a71cdbe9 516 #endif
Sergunb 0:8918a71cdbe9 517 //Close connection
Sergunb 0:8918a71cdbe9 518 mqttClientCloseConnection(context);
Sergunb 0:8918a71cdbe9 519 //The connection is closed
Sergunb 0:8918a71cdbe9 520 mqttClientChangeState(context, MQTT_CLIENT_STATE_CLOSED);
Sergunb 0:8918a71cdbe9 521 //Exit immediately
Sergunb 0:8918a71cdbe9 522 break;
Sergunb 0:8918a71cdbe9 523 }
Sergunb 0:8918a71cdbe9 524 }
Sergunb 0:8918a71cdbe9 525
Sergunb 0:8918a71cdbe9 526 //Return status code
Sergunb 0:8918a71cdbe9 527 return error;
Sergunb 0:8918a71cdbe9 528 }
Sergunb 0:8918a71cdbe9 529
Sergunb 0:8918a71cdbe9 530
Sergunb 0:8918a71cdbe9 531 /**
Sergunb 0:8918a71cdbe9 532 * @brief Publish message
Sergunb 0:8918a71cdbe9 533 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 534 * @param[in] topic Topic name
Sergunb 0:8918a71cdbe9 535 * @param[in] message Message payload
Sergunb 0:8918a71cdbe9 536 * @param[in] length Length of the message payload
Sergunb 0:8918a71cdbe9 537 * @param[in] qos QoS level to be used when publishing the message
Sergunb 0:8918a71cdbe9 538 * @param[in] retain This flag specifies if the message is to be retained
Sergunb 0:8918a71cdbe9 539 * @param[out] packetId Packet identifier used to send the PUBLISH packet
Sergunb 0:8918a71cdbe9 540 * @return Error code
Sergunb 0:8918a71cdbe9 541 **/
Sergunb 0:8918a71cdbe9 542
Sergunb 0:8918a71cdbe9 543 error_t mqttClientPublish(MqttClientContext *context,
Sergunb 0:8918a71cdbe9 544 const char_t *topic, const void *message, size_t length,
Sergunb 0:8918a71cdbe9 545 MqttQosLevel qos, bool_t retain, uint16_t *packetId)
Sergunb 0:8918a71cdbe9 546 {
Sergunb 0:8918a71cdbe9 547 error_t error;
Sergunb 0:8918a71cdbe9 548
Sergunb 0:8918a71cdbe9 549 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 550 if(context == NULL)
Sergunb 0:8918a71cdbe9 551 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 552
Sergunb 0:8918a71cdbe9 553 //Initialize status code
Sergunb 0:8918a71cdbe9 554 error = NO_ERROR;
Sergunb 0:8918a71cdbe9 555
Sergunb 0:8918a71cdbe9 556 //Send PUBLISH packet and wait for PUBACK/PUBCOMP packet to be received
Sergunb 0:8918a71cdbe9 557 do
Sergunb 0:8918a71cdbe9 558 {
Sergunb 0:8918a71cdbe9 559 //Check current state
Sergunb 0:8918a71cdbe9 560 if(context->state == MQTT_CLIENT_STATE_IDLE)
Sergunb 0:8918a71cdbe9 561 {
Sergunb 0:8918a71cdbe9 562 //Format PUBLISH packet
Sergunb 0:8918a71cdbe9 563 error = mqttClientFormatPublish(context, topic, message, length, qos, retain);
Sergunb 0:8918a71cdbe9 564
Sergunb 0:8918a71cdbe9 565 //Check status code
Sergunb 0:8918a71cdbe9 566 if(!error)
Sergunb 0:8918a71cdbe9 567 {
Sergunb 0:8918a71cdbe9 568 //Save the packet identifier used to send the PUBLISH packet
Sergunb 0:8918a71cdbe9 569 if(packetId != NULL)
Sergunb 0:8918a71cdbe9 570 *packetId = context->packetId;
Sergunb 0:8918a71cdbe9 571
Sergunb 0:8918a71cdbe9 572 //Debug message
Sergunb 0:8918a71cdbe9 573 TRACE_INFO("MQTT: Sending PUBLISH packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
Sergunb 0:8918a71cdbe9 574 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
Sergunb 0:8918a71cdbe9 575
Sergunb 0:8918a71cdbe9 576 //Save the type of the MQTT packet to be sent
Sergunb 0:8918a71cdbe9 577 context->packetType = MQTT_PACKET_TYPE_PUBLISH;
Sergunb 0:8918a71cdbe9 578 //Point to the beginning of the packet
Sergunb 0:8918a71cdbe9 579 context->packetPos = 0;
Sergunb 0:8918a71cdbe9 580
Sergunb 0:8918a71cdbe9 581 //Send PUBLISH packet
Sergunb 0:8918a71cdbe9 582 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
Sergunb 0:8918a71cdbe9 583 }
Sergunb 0:8918a71cdbe9 584 }
Sergunb 0:8918a71cdbe9 585 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
Sergunb 0:8918a71cdbe9 586 {
Sergunb 0:8918a71cdbe9 587 //Send more data
Sergunb 0:8918a71cdbe9 588 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 589 }
Sergunb 0:8918a71cdbe9 590 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
Sergunb 0:8918a71cdbe9 591 {
Sergunb 0:8918a71cdbe9 592 //The last parameter is optional
Sergunb 0:8918a71cdbe9 593 if(packetId != NULL)
Sergunb 0:8918a71cdbe9 594 {
Sergunb 0:8918a71cdbe9 595 //Reset packet type
Sergunb 0:8918a71cdbe9 596 context->packetType = MQTT_PACKET_TYPE_INVALID;
Sergunb 0:8918a71cdbe9 597 //Do not wait for PUBACK/PUBCOMP packet
Sergunb 0:8918a71cdbe9 598 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 599 }
Sergunb 0:8918a71cdbe9 600 else
Sergunb 0:8918a71cdbe9 601 {
Sergunb 0:8918a71cdbe9 602 //Check QoS level
Sergunb 0:8918a71cdbe9 603 if(qos == MQTT_QOS_LEVEL_0)
Sergunb 0:8918a71cdbe9 604 {
Sergunb 0:8918a71cdbe9 605 //Reset packet type
Sergunb 0:8918a71cdbe9 606 context->packetType = MQTT_PACKET_TYPE_INVALID;
Sergunb 0:8918a71cdbe9 607 //No response is sent by the receiver and no retry is performed by the sender
Sergunb 0:8918a71cdbe9 608 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 609 }
Sergunb 0:8918a71cdbe9 610 else
Sergunb 0:8918a71cdbe9 611 {
Sergunb 0:8918a71cdbe9 612 //Wait for PUBACK/PUBCOMP packet
Sergunb 0:8918a71cdbe9 613 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 614 }
Sergunb 0:8918a71cdbe9 615 }
Sergunb 0:8918a71cdbe9 616 }
Sergunb 0:8918a71cdbe9 617 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
Sergunb 0:8918a71cdbe9 618 {
Sergunb 0:8918a71cdbe9 619 //Receive more data
Sergunb 0:8918a71cdbe9 620 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 621 }
Sergunb 0:8918a71cdbe9 622 else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
Sergunb 0:8918a71cdbe9 623 {
Sergunb 0:8918a71cdbe9 624 //Reset packet type
Sergunb 0:8918a71cdbe9 625 context->packetType = MQTT_PACKET_TYPE_INVALID;
Sergunb 0:8918a71cdbe9 626 //A PUBACK/PUBCOMP packet has been received
Sergunb 0:8918a71cdbe9 627 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 628 }
Sergunb 0:8918a71cdbe9 629 else
Sergunb 0:8918a71cdbe9 630 {
Sergunb 0:8918a71cdbe9 631 //Invalid state
Sergunb 0:8918a71cdbe9 632 error = ERROR_NOT_CONNECTED;
Sergunb 0:8918a71cdbe9 633 }
Sergunb 0:8918a71cdbe9 634
Sergunb 0:8918a71cdbe9 635 //Any error to report?
Sergunb 0:8918a71cdbe9 636 if(error)
Sergunb 0:8918a71cdbe9 637 break;
Sergunb 0:8918a71cdbe9 638
Sergunb 0:8918a71cdbe9 639 //Evaluate the loop condition
Sergunb 0:8918a71cdbe9 640 } while(context->state != MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 641
Sergunb 0:8918a71cdbe9 642 //Return status code
Sergunb 0:8918a71cdbe9 643 return error;
Sergunb 0:8918a71cdbe9 644 }
Sergunb 0:8918a71cdbe9 645
Sergunb 0:8918a71cdbe9 646
Sergunb 0:8918a71cdbe9 647 /**
Sergunb 0:8918a71cdbe9 648 * @brief Subscribe to topics
Sergunb 0:8918a71cdbe9 649 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 650 * @param[in] topic Topic filter
Sergunb 0:8918a71cdbe9 651 * @param[in] qos Maximum QoS level at which the server can send application
Sergunb 0:8918a71cdbe9 652 * messages to the client
Sergunb 0:8918a71cdbe9 653 * @param[out] packetId Packet identifier used to send the SUBSCRIBE packet
Sergunb 0:8918a71cdbe9 654 * @return Error code
Sergunb 0:8918a71cdbe9 655 **/
Sergunb 0:8918a71cdbe9 656
Sergunb 0:8918a71cdbe9 657 error_t mqttClientSubscribe(MqttClientContext *context,
Sergunb 0:8918a71cdbe9 658 const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
Sergunb 0:8918a71cdbe9 659 {
Sergunb 0:8918a71cdbe9 660 error_t error;
Sergunb 0:8918a71cdbe9 661
Sergunb 0:8918a71cdbe9 662 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 663 if(context == NULL)
Sergunb 0:8918a71cdbe9 664 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 665
Sergunb 0:8918a71cdbe9 666 //Initialize status code
Sergunb 0:8918a71cdbe9 667 error = NO_ERROR;
Sergunb 0:8918a71cdbe9 668
Sergunb 0:8918a71cdbe9 669 //Send SUBSCRIBE packet and wait for SUBACK packet to be received
Sergunb 0:8918a71cdbe9 670 do
Sergunb 0:8918a71cdbe9 671 {
Sergunb 0:8918a71cdbe9 672 //Check current state
Sergunb 0:8918a71cdbe9 673 if(context->state == MQTT_CLIENT_STATE_IDLE)
Sergunb 0:8918a71cdbe9 674 {
Sergunb 0:8918a71cdbe9 675 //Format SUBSCRIBE packet
Sergunb 0:8918a71cdbe9 676 error = mqttClientFormatSubscribe(context, topic, qos);
Sergunb 0:8918a71cdbe9 677
Sergunb 0:8918a71cdbe9 678 //Check status code
Sergunb 0:8918a71cdbe9 679 if(!error)
Sergunb 0:8918a71cdbe9 680 {
Sergunb 0:8918a71cdbe9 681 //Save the packet identifier used to send the SUBSCRIBE packet
Sergunb 0:8918a71cdbe9 682 if(packetId != NULL)
Sergunb 0:8918a71cdbe9 683 *packetId = context->packetId;
Sergunb 0:8918a71cdbe9 684
Sergunb 0:8918a71cdbe9 685 //Debug message
Sergunb 0:8918a71cdbe9 686 TRACE_INFO("MQTT: Sending SUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
Sergunb 0:8918a71cdbe9 687 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
Sergunb 0:8918a71cdbe9 688
Sergunb 0:8918a71cdbe9 689 //Save the type of the MQTT packet to be sent
Sergunb 0:8918a71cdbe9 690 context->packetType = MQTT_PACKET_TYPE_SUBSCRIBE;
Sergunb 0:8918a71cdbe9 691 //Point to the beginning of the packet
Sergunb 0:8918a71cdbe9 692 context->packetPos = 0;
Sergunb 0:8918a71cdbe9 693
Sergunb 0:8918a71cdbe9 694 //Send SUBSCRIBE packet
Sergunb 0:8918a71cdbe9 695 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
Sergunb 0:8918a71cdbe9 696 }
Sergunb 0:8918a71cdbe9 697 }
Sergunb 0:8918a71cdbe9 698 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
Sergunb 0:8918a71cdbe9 699 {
Sergunb 0:8918a71cdbe9 700 //Send more data
Sergunb 0:8918a71cdbe9 701 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 702 }
Sergunb 0:8918a71cdbe9 703 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
Sergunb 0:8918a71cdbe9 704 {
Sergunb 0:8918a71cdbe9 705 //The last parameter is optional
Sergunb 0:8918a71cdbe9 706 if(packetId != NULL)
Sergunb 0:8918a71cdbe9 707 {
Sergunb 0:8918a71cdbe9 708 //Reset packet type
Sergunb 0:8918a71cdbe9 709 context->packetType = MQTT_PACKET_TYPE_INVALID;
Sergunb 0:8918a71cdbe9 710 //Do not wait for SUBACK packet
Sergunb 0:8918a71cdbe9 711 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 712 }
Sergunb 0:8918a71cdbe9 713 else
Sergunb 0:8918a71cdbe9 714 {
Sergunb 0:8918a71cdbe9 715 //Wait for SUBACK packet
Sergunb 0:8918a71cdbe9 716 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 717 }
Sergunb 0:8918a71cdbe9 718 }
Sergunb 0:8918a71cdbe9 719 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
Sergunb 0:8918a71cdbe9 720 {
Sergunb 0:8918a71cdbe9 721 //Receive more data
Sergunb 0:8918a71cdbe9 722 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 723 }
Sergunb 0:8918a71cdbe9 724 else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
Sergunb 0:8918a71cdbe9 725 {
Sergunb 0:8918a71cdbe9 726 //Reset packet type
Sergunb 0:8918a71cdbe9 727 context->packetType = MQTT_PACKET_TYPE_INVALID;
Sergunb 0:8918a71cdbe9 728 //A SUBACK packet has been received
Sergunb 0:8918a71cdbe9 729 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 730 }
Sergunb 0:8918a71cdbe9 731 else
Sergunb 0:8918a71cdbe9 732 {
Sergunb 0:8918a71cdbe9 733 //Invalid state
Sergunb 0:8918a71cdbe9 734 error = ERROR_NOT_CONNECTED;
Sergunb 0:8918a71cdbe9 735 }
Sergunb 0:8918a71cdbe9 736
Sergunb 0:8918a71cdbe9 737 //Any error to report?
Sergunb 0:8918a71cdbe9 738 if(error)
Sergunb 0:8918a71cdbe9 739 break;
Sergunb 0:8918a71cdbe9 740
Sergunb 0:8918a71cdbe9 741 //Evaluate the loop condition
Sergunb 0:8918a71cdbe9 742 } while(context->state != MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 743
Sergunb 0:8918a71cdbe9 744 //Return status code
Sergunb 0:8918a71cdbe9 745 return error;
Sergunb 0:8918a71cdbe9 746 }
Sergunb 0:8918a71cdbe9 747
Sergunb 0:8918a71cdbe9 748
Sergunb 0:8918a71cdbe9 749 /**
Sergunb 0:8918a71cdbe9 750 * @brief Unsubscribe from topics
Sergunb 0:8918a71cdbe9 751 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 752 * @param[in] topic Topic filter
Sergunb 0:8918a71cdbe9 753 * @param[out] packetId Packet identifier used to send the UNSUBSCRIBE packet
Sergunb 0:8918a71cdbe9 754 * @return Error code
Sergunb 0:8918a71cdbe9 755 **/
Sergunb 0:8918a71cdbe9 756
Sergunb 0:8918a71cdbe9 757 error_t mqttClientUnsubscribe(MqttClientContext *context,
Sergunb 0:8918a71cdbe9 758 const char_t *topic, uint16_t *packetId)
Sergunb 0:8918a71cdbe9 759 {
Sergunb 0:8918a71cdbe9 760 error_t error;
Sergunb 0:8918a71cdbe9 761
Sergunb 0:8918a71cdbe9 762 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 763 if(context == NULL)
Sergunb 0:8918a71cdbe9 764 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 765
Sergunb 0:8918a71cdbe9 766 //Initialize status code
Sergunb 0:8918a71cdbe9 767 error = NO_ERROR;
Sergunb 0:8918a71cdbe9 768
Sergunb 0:8918a71cdbe9 769 //Send UNSUBSCRIBE packet and wait for UNSUBACK packet to be received
Sergunb 0:8918a71cdbe9 770 do
Sergunb 0:8918a71cdbe9 771 {
Sergunb 0:8918a71cdbe9 772 //Check current state
Sergunb 0:8918a71cdbe9 773 if(context->state == MQTT_CLIENT_STATE_IDLE)
Sergunb 0:8918a71cdbe9 774 {
Sergunb 0:8918a71cdbe9 775 //Format UNSUBSCRIBE packet
Sergunb 0:8918a71cdbe9 776 error = mqttClientFormatUnsubscribe(context, topic);
Sergunb 0:8918a71cdbe9 777
Sergunb 0:8918a71cdbe9 778 //Check status code
Sergunb 0:8918a71cdbe9 779 if(!error)
Sergunb 0:8918a71cdbe9 780 {
Sergunb 0:8918a71cdbe9 781 //Save the packet identifier used to send the UNSUBSCRIBE packet
Sergunb 0:8918a71cdbe9 782 if(packetId != NULL)
Sergunb 0:8918a71cdbe9 783 *packetId = context->packetId;
Sergunb 0:8918a71cdbe9 784
Sergunb 0:8918a71cdbe9 785 //Debug message
Sergunb 0:8918a71cdbe9 786 TRACE_INFO("MQTT: Sending UNSUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
Sergunb 0:8918a71cdbe9 787 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
Sergunb 0:8918a71cdbe9 788
Sergunb 0:8918a71cdbe9 789 //Save the type of the MQTT packet to be sent
Sergunb 0:8918a71cdbe9 790 context->packetType = MQTT_PACKET_TYPE_UNSUBSCRIBE;
Sergunb 0:8918a71cdbe9 791 //Point to the beginning of the packet
Sergunb 0:8918a71cdbe9 792 context->packetPos = 0;
Sergunb 0:8918a71cdbe9 793
Sergunb 0:8918a71cdbe9 794 //Send UNSUBSCRIBE packet
Sergunb 0:8918a71cdbe9 795 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
Sergunb 0:8918a71cdbe9 796 }
Sergunb 0:8918a71cdbe9 797 }
Sergunb 0:8918a71cdbe9 798 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
Sergunb 0:8918a71cdbe9 799 {
Sergunb 0:8918a71cdbe9 800 //Send more data
Sergunb 0:8918a71cdbe9 801 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 802 }
Sergunb 0:8918a71cdbe9 803 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
Sergunb 0:8918a71cdbe9 804 {
Sergunb 0:8918a71cdbe9 805 //The last parameter is optional
Sergunb 0:8918a71cdbe9 806 if(packetId != NULL)
Sergunb 0:8918a71cdbe9 807 {
Sergunb 0:8918a71cdbe9 808 //Reset packet type
Sergunb 0:8918a71cdbe9 809 context->packetType = MQTT_PACKET_TYPE_INVALID;
Sergunb 0:8918a71cdbe9 810 //Do not wait for UNSUBACK packet
Sergunb 0:8918a71cdbe9 811 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 812 }
Sergunb 0:8918a71cdbe9 813 else
Sergunb 0:8918a71cdbe9 814 {
Sergunb 0:8918a71cdbe9 815 //Wait for UNSUBACK packet
Sergunb 0:8918a71cdbe9 816 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 817 }
Sergunb 0:8918a71cdbe9 818 }
Sergunb 0:8918a71cdbe9 819 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
Sergunb 0:8918a71cdbe9 820 {
Sergunb 0:8918a71cdbe9 821 //Receive more data
Sergunb 0:8918a71cdbe9 822 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 823 }
Sergunb 0:8918a71cdbe9 824 else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
Sergunb 0:8918a71cdbe9 825 {
Sergunb 0:8918a71cdbe9 826 //Reset packet type
Sergunb 0:8918a71cdbe9 827 context->packetType = MQTT_PACKET_TYPE_INVALID;
Sergunb 0:8918a71cdbe9 828 //An UNSUBACK packet has been received
Sergunb 0:8918a71cdbe9 829 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 830 }
Sergunb 0:8918a71cdbe9 831 else
Sergunb 0:8918a71cdbe9 832 {
Sergunb 0:8918a71cdbe9 833 //Invalid state
Sergunb 0:8918a71cdbe9 834 error = ERROR_NOT_CONNECTED;
Sergunb 0:8918a71cdbe9 835 }
Sergunb 0:8918a71cdbe9 836
Sergunb 0:8918a71cdbe9 837 //Any error to report?
Sergunb 0:8918a71cdbe9 838 if(error)
Sergunb 0:8918a71cdbe9 839 break;
Sergunb 0:8918a71cdbe9 840
Sergunb 0:8918a71cdbe9 841 //Evaluate the loop condition
Sergunb 0:8918a71cdbe9 842 } while(context->state != MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 843
Sergunb 0:8918a71cdbe9 844 //Return status code
Sergunb 0:8918a71cdbe9 845 return error;
Sergunb 0:8918a71cdbe9 846 }
Sergunb 0:8918a71cdbe9 847
Sergunb 0:8918a71cdbe9 848
Sergunb 0:8918a71cdbe9 849 /**
Sergunb 0:8918a71cdbe9 850 * @brief Send ping request
Sergunb 0:8918a71cdbe9 851 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 852 * @param[out] rtt Round-trip time (optional parameter)
Sergunb 0:8918a71cdbe9 853 * @return Error code
Sergunb 0:8918a71cdbe9 854 **/
Sergunb 0:8918a71cdbe9 855
Sergunb 0:8918a71cdbe9 856 error_t mqttClientPing(MqttClientContext *context, systime_t *rtt)
Sergunb 0:8918a71cdbe9 857 {
Sergunb 0:8918a71cdbe9 858 error_t error;
Sergunb 0:8918a71cdbe9 859
Sergunb 0:8918a71cdbe9 860 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 861 if(context == NULL)
Sergunb 0:8918a71cdbe9 862 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 863
Sergunb 0:8918a71cdbe9 864 //Initialize status code
Sergunb 0:8918a71cdbe9 865 error = NO_ERROR;
Sergunb 0:8918a71cdbe9 866
Sergunb 0:8918a71cdbe9 867 //Send PINGREQ packet and wait for PINGRESP packet to be received
Sergunb 0:8918a71cdbe9 868 do
Sergunb 0:8918a71cdbe9 869 {
Sergunb 0:8918a71cdbe9 870 //Check current state
Sergunb 0:8918a71cdbe9 871 if(context->state == MQTT_CLIENT_STATE_IDLE)
Sergunb 0:8918a71cdbe9 872 {
Sergunb 0:8918a71cdbe9 873 //Format PINGREQ packet
Sergunb 0:8918a71cdbe9 874 error = mqttClientFormatPingReq(context);
Sergunb 0:8918a71cdbe9 875
Sergunb 0:8918a71cdbe9 876 //Check status code
Sergunb 0:8918a71cdbe9 877 if(!error)
Sergunb 0:8918a71cdbe9 878 {
Sergunb 0:8918a71cdbe9 879 //Debug message
Sergunb 0:8918a71cdbe9 880 TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
Sergunb 0:8918a71cdbe9 881 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
Sergunb 0:8918a71cdbe9 882
Sergunb 0:8918a71cdbe9 883 //Save the type of the MQTT packet to be sent
Sergunb 0:8918a71cdbe9 884 context->packetType = MQTT_PACKET_TYPE_PINGREQ;
Sergunb 0:8918a71cdbe9 885 //Point to the beginning of the packet
Sergunb 0:8918a71cdbe9 886 context->packetPos = 0;
Sergunb 0:8918a71cdbe9 887
Sergunb 0:8918a71cdbe9 888 //Send PINGREQ packet
Sergunb 0:8918a71cdbe9 889 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
Sergunb 0:8918a71cdbe9 890
Sergunb 0:8918a71cdbe9 891 //Save the time at which the request was sent
Sergunb 0:8918a71cdbe9 892 if(rtt != NULL)
Sergunb 0:8918a71cdbe9 893 context->pingTimestamp = osGetSystemTime();
Sergunb 0:8918a71cdbe9 894 }
Sergunb 0:8918a71cdbe9 895 }
Sergunb 0:8918a71cdbe9 896 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
Sergunb 0:8918a71cdbe9 897 {
Sergunb 0:8918a71cdbe9 898 //Send more data
Sergunb 0:8918a71cdbe9 899 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 900 }
Sergunb 0:8918a71cdbe9 901 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
Sergunb 0:8918a71cdbe9 902 {
Sergunb 0:8918a71cdbe9 903 //The last parameter is optional
Sergunb 0:8918a71cdbe9 904 if(rtt != NULL)
Sergunb 0:8918a71cdbe9 905 {
Sergunb 0:8918a71cdbe9 906 //Wait for PINGRESP packet
Sergunb 0:8918a71cdbe9 907 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 908 }
Sergunb 0:8918a71cdbe9 909 else
Sergunb 0:8918a71cdbe9 910 {
Sergunb 0:8918a71cdbe9 911 //Reset packet type
Sergunb 0:8918a71cdbe9 912 context->packetType = MQTT_PACKET_TYPE_INVALID;
Sergunb 0:8918a71cdbe9 913 //Do not wait for PINGRESP packet
Sergunb 0:8918a71cdbe9 914 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 915 }
Sergunb 0:8918a71cdbe9 916 }
Sergunb 0:8918a71cdbe9 917 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
Sergunb 0:8918a71cdbe9 918 {
Sergunb 0:8918a71cdbe9 919 //Receive more data
Sergunb 0:8918a71cdbe9 920 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 921 }
Sergunb 0:8918a71cdbe9 922 else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
Sergunb 0:8918a71cdbe9 923 {
Sergunb 0:8918a71cdbe9 924 //The last parameter is optional
Sergunb 0:8918a71cdbe9 925 if(rtt != NULL)
Sergunb 0:8918a71cdbe9 926 {
Sergunb 0:8918a71cdbe9 927 //Compute round-trip time
Sergunb 0:8918a71cdbe9 928 *rtt = osGetSystemTime() - context->pingTimestamp;
Sergunb 0:8918a71cdbe9 929 }
Sergunb 0:8918a71cdbe9 930
Sergunb 0:8918a71cdbe9 931 //Reset packet type
Sergunb 0:8918a71cdbe9 932 context->packetType = MQTT_PACKET_TYPE_INVALID;
Sergunb 0:8918a71cdbe9 933 //A PINGRESP packet has been received
Sergunb 0:8918a71cdbe9 934 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 935 }
Sergunb 0:8918a71cdbe9 936 else
Sergunb 0:8918a71cdbe9 937 {
Sergunb 0:8918a71cdbe9 938 //Invalid state
Sergunb 0:8918a71cdbe9 939 error = ERROR_NOT_CONNECTED;
Sergunb 0:8918a71cdbe9 940 }
Sergunb 0:8918a71cdbe9 941
Sergunb 0:8918a71cdbe9 942 //Any error to report?
Sergunb 0:8918a71cdbe9 943 if(error)
Sergunb 0:8918a71cdbe9 944 break;
Sergunb 0:8918a71cdbe9 945
Sergunb 0:8918a71cdbe9 946 //Evaluate the loop condition
Sergunb 0:8918a71cdbe9 947 } while(context->state != MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 948
Sergunb 0:8918a71cdbe9 949 //Return status code
Sergunb 0:8918a71cdbe9 950 return error;
Sergunb 0:8918a71cdbe9 951 }
Sergunb 0:8918a71cdbe9 952
Sergunb 0:8918a71cdbe9 953
Sergunb 0:8918a71cdbe9 954 /**
Sergunb 0:8918a71cdbe9 955 * @brief Gracefully disconnect from the MQTT server
Sergunb 0:8918a71cdbe9 956 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 957 * @return Error code
Sergunb 0:8918a71cdbe9 958 **/
Sergunb 0:8918a71cdbe9 959
Sergunb 0:8918a71cdbe9 960 error_t mqttClientDisconnect(MqttClientContext *context)
Sergunb 0:8918a71cdbe9 961 {
Sergunb 0:8918a71cdbe9 962 error_t error;
Sergunb 0:8918a71cdbe9 963
Sergunb 0:8918a71cdbe9 964 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 965 if(context == NULL)
Sergunb 0:8918a71cdbe9 966 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 967
Sergunb 0:8918a71cdbe9 968 //Initialize status code
Sergunb 0:8918a71cdbe9 969 error = NO_ERROR;
Sergunb 0:8918a71cdbe9 970
Sergunb 0:8918a71cdbe9 971 //Send DISCONNECT packet and shutdown network connection
Sergunb 0:8918a71cdbe9 972 while(context->state != MQTT_CLIENT_STATE_DISCONNECTED)
Sergunb 0:8918a71cdbe9 973 {
Sergunb 0:8918a71cdbe9 974 //Check current state
Sergunb 0:8918a71cdbe9 975 if(context->state == MQTT_CLIENT_STATE_IDLE)
Sergunb 0:8918a71cdbe9 976 {
Sergunb 0:8918a71cdbe9 977 //Format DISCONNECT packet
Sergunb 0:8918a71cdbe9 978 error = mqttClientFormatDisconnect(context);
Sergunb 0:8918a71cdbe9 979
Sergunb 0:8918a71cdbe9 980 //Check status code
Sergunb 0:8918a71cdbe9 981 if(!error)
Sergunb 0:8918a71cdbe9 982 {
Sergunb 0:8918a71cdbe9 983 //Debug message
Sergunb 0:8918a71cdbe9 984 TRACE_INFO("MQTT: Sending DISCONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
Sergunb 0:8918a71cdbe9 985 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
Sergunb 0:8918a71cdbe9 986
Sergunb 0:8918a71cdbe9 987 //Save the type of the MQTT packet to be sent
Sergunb 0:8918a71cdbe9 988 context->packetType = MQTT_PACKET_TYPE_DISCONNECT;
Sergunb 0:8918a71cdbe9 989 //Point to the beginning of the packet
Sergunb 0:8918a71cdbe9 990 context->packetPos = 0;
Sergunb 0:8918a71cdbe9 991
Sergunb 0:8918a71cdbe9 992 //Send DISCONNECT packet
Sergunb 0:8918a71cdbe9 993 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET);
Sergunb 0:8918a71cdbe9 994 }
Sergunb 0:8918a71cdbe9 995 }
Sergunb 0:8918a71cdbe9 996 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
Sergunb 0:8918a71cdbe9 997 {
Sergunb 0:8918a71cdbe9 998 //Send more data
Sergunb 0:8918a71cdbe9 999 error = mqttClientProcessEvents(context, context->settings.timeout);
Sergunb 0:8918a71cdbe9 1000 }
Sergunb 0:8918a71cdbe9 1001 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
Sergunb 0:8918a71cdbe9 1002 {
Sergunb 0:8918a71cdbe9 1003 //Debug message
Sergunb 0:8918a71cdbe9 1004 TRACE_INFO("MQTT: Shutting down connection...\r\n");
Sergunb 0:8918a71cdbe9 1005
Sergunb 0:8918a71cdbe9 1006 //After sending a DISCONNECT packet the client must not send any
Sergunb 0:8918a71cdbe9 1007 //more control packets on that network connection
Sergunb 0:8918a71cdbe9 1008 mqttClientChangeState(context, MQTT_CLIENT_STATE_DISCONNECTING);
Sergunb 0:8918a71cdbe9 1009 }
Sergunb 0:8918a71cdbe9 1010 else if(context->state == MQTT_CLIENT_STATE_DISCONNECTING)
Sergunb 0:8918a71cdbe9 1011 {
Sergunb 0:8918a71cdbe9 1012 //Properly dispose the network connection
Sergunb 0:8918a71cdbe9 1013 error = mqttClientShutdownConnection(context);
Sergunb 0:8918a71cdbe9 1014
Sergunb 0:8918a71cdbe9 1015 //Check status code
Sergunb 0:8918a71cdbe9 1016 if(!error)
Sergunb 0:8918a71cdbe9 1017 {
Sergunb 0:8918a71cdbe9 1018 //The MQTT client is disconnected
Sergunb 0:8918a71cdbe9 1019 mqttClientChangeState(context, MQTT_CLIENT_STATE_DISCONNECTED);
Sergunb 0:8918a71cdbe9 1020 }
Sergunb 0:8918a71cdbe9 1021 }
Sergunb 0:8918a71cdbe9 1022 else
Sergunb 0:8918a71cdbe9 1023 {
Sergunb 0:8918a71cdbe9 1024 //Invalid state
Sergunb 0:8918a71cdbe9 1025 error = ERROR_NOT_CONNECTED;
Sergunb 0:8918a71cdbe9 1026 }
Sergunb 0:8918a71cdbe9 1027
Sergunb 0:8918a71cdbe9 1028 //Any error to report?
Sergunb 0:8918a71cdbe9 1029 if(error)
Sergunb 0:8918a71cdbe9 1030 break;
Sergunb 0:8918a71cdbe9 1031 }
Sergunb 0:8918a71cdbe9 1032
Sergunb 0:8918a71cdbe9 1033 //Return status code
Sergunb 0:8918a71cdbe9 1034 return error;
Sergunb 0:8918a71cdbe9 1035 }
Sergunb 0:8918a71cdbe9 1036
Sergunb 0:8918a71cdbe9 1037
Sergunb 0:8918a71cdbe9 1038 /**
Sergunb 0:8918a71cdbe9 1039 * @brief Close the connection with the MQTT server
Sergunb 0:8918a71cdbe9 1040 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 1041 * @return Error code
Sergunb 0:8918a71cdbe9 1042 **/
Sergunb 0:8918a71cdbe9 1043
Sergunb 0:8918a71cdbe9 1044 error_t mqttClientClose(MqttClientContext *context)
Sergunb 0:8918a71cdbe9 1045 {
Sergunb 0:8918a71cdbe9 1046 //Make sure the MQTT client context is valid
Sergunb 0:8918a71cdbe9 1047 if(context == NULL)
Sergunb 0:8918a71cdbe9 1048 return ERROR_INVALID_PARAMETER;
Sergunb 0:8918a71cdbe9 1049
Sergunb 0:8918a71cdbe9 1050 //Close connection
Sergunb 0:8918a71cdbe9 1051 mqttClientCloseConnection(context);
Sergunb 0:8918a71cdbe9 1052 //The connection is closed
Sergunb 0:8918a71cdbe9 1053 mqttClientChangeState(context, MQTT_CLIENT_STATE_CLOSED);
Sergunb 0:8918a71cdbe9 1054
Sergunb 0:8918a71cdbe9 1055 //Network connection successfully closed
Sergunb 0:8918a71cdbe9 1056 return NO_ERROR;
Sergunb 0:8918a71cdbe9 1057 }
Sergunb 0:8918a71cdbe9 1058
Sergunb 0:8918a71cdbe9 1059
Sergunb 0:8918a71cdbe9 1060 /**
Sergunb 0:8918a71cdbe9 1061 * @brief Process MQTT client events
Sergunb 0:8918a71cdbe9 1062 * @param[in] context Pointer to the MQTT client context
Sergunb 0:8918a71cdbe9 1063 * @param[in] timeout Maximum time to wait before returning
Sergunb 0:8918a71cdbe9 1064 * @return Error code
Sergunb 0:8918a71cdbe9 1065 **/
Sergunb 0:8918a71cdbe9 1066
Sergunb 0:8918a71cdbe9 1067 error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
Sergunb 0:8918a71cdbe9 1068 {
Sergunb 0:8918a71cdbe9 1069 error_t error;
Sergunb 0:8918a71cdbe9 1070 size_t n;
Sergunb 0:8918a71cdbe9 1071
Sergunb 0:8918a71cdbe9 1072 //It is the responsibility of the client to ensure that the interval
Sergunb 0:8918a71cdbe9 1073 //between control packets being sent does not exceed the keep-alive value
Sergunb 0:8918a71cdbe9 1074 error = mqttClientCheckKeepAlive(context);
Sergunb 0:8918a71cdbe9 1075
Sergunb 0:8918a71cdbe9 1076 //Check status code
Sergunb 0:8918a71cdbe9 1077 if(!error)
Sergunb 0:8918a71cdbe9 1078 {
Sergunb 0:8918a71cdbe9 1079 //Check current state
Sergunb 0:8918a71cdbe9 1080 if(context->state == MQTT_CLIENT_STATE_IDLE ||
Sergunb 0:8918a71cdbe9 1081 context->state == MQTT_CLIENT_STATE_PACKET_SENT)
Sergunb 0:8918a71cdbe9 1082 {
Sergunb 0:8918a71cdbe9 1083 //Wait for incoming data
Sergunb 0:8918a71cdbe9 1084 error = mqttClientWaitForData(context, timeout);
Sergunb 0:8918a71cdbe9 1085
Sergunb 0:8918a71cdbe9 1086 //Check status code
Sergunb 0:8918a71cdbe9 1087 if(!error)
Sergunb 0:8918a71cdbe9 1088 {
Sergunb 0:8918a71cdbe9 1089 //Initialize context
Sergunb 0:8918a71cdbe9 1090 context->packet = context->buffer;
Sergunb 0:8918a71cdbe9 1091 context->packetPos = 0;
Sergunb 0:8918a71cdbe9 1092 context->packetLen = 0;
Sergunb 0:8918a71cdbe9 1093 context->remainingLen = 0;
Sergunb 0:8918a71cdbe9 1094
Sergunb 0:8918a71cdbe9 1095 //Start receiving the packet
Sergunb 0:8918a71cdbe9 1096 mqttClientChangeState(context, MQTT_CLIENT_STATE_RECEIVING_PACKET);
Sergunb 0:8918a71cdbe9 1097 }
Sergunb 0:8918a71cdbe9 1098 }
Sergunb 0:8918a71cdbe9 1099 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
Sergunb 0:8918a71cdbe9 1100 {
Sergunb 0:8918a71cdbe9 1101 //Receive the incoming packet
Sergunb 0:8918a71cdbe9 1102 error = mqttClientReceivePacket(context);
Sergunb 0:8918a71cdbe9 1103
Sergunb 0:8918a71cdbe9 1104 //Check status code
Sergunb 0:8918a71cdbe9 1105 if(!error)
Sergunb 0:8918a71cdbe9 1106 {
Sergunb 0:8918a71cdbe9 1107 //Process MQTT control packet
Sergunb 0:8918a71cdbe9 1108 error = mqttClientProcessPacket(context);
Sergunb 0:8918a71cdbe9 1109
Sergunb 0:8918a71cdbe9 1110 //Update MQTT client state
Sergunb 0:8918a71cdbe9 1111 if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
Sergunb 0:8918a71cdbe9 1112 {
Sergunb 0:8918a71cdbe9 1113 if(context->packetType == MQTT_PACKET_TYPE_INVALID)
Sergunb 0:8918a71cdbe9 1114 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 1115 else
Sergunb 0:8918a71cdbe9 1116 mqttClientChangeState(context, MQTT_CLIENT_STATE_PACKET_SENT);
Sergunb 0:8918a71cdbe9 1117 }
Sergunb 0:8918a71cdbe9 1118 }
Sergunb 0:8918a71cdbe9 1119 }
Sergunb 0:8918a71cdbe9 1120 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
Sergunb 0:8918a71cdbe9 1121 {
Sergunb 0:8918a71cdbe9 1122 //Any remaining data to be sent?
Sergunb 0:8918a71cdbe9 1123 if(context->packetPos < context->packetLen)
Sergunb 0:8918a71cdbe9 1124 {
Sergunb 0:8918a71cdbe9 1125 //Send more data
Sergunb 0:8918a71cdbe9 1126 error = mqttClientSendData(context, context->packet + context->packetPos,
Sergunb 0:8918a71cdbe9 1127 context->packetLen - context->packetPos, &n, 0);
Sergunb 0:8918a71cdbe9 1128
Sergunb 0:8918a71cdbe9 1129 //Advance data pointer
Sergunb 0:8918a71cdbe9 1130 context->packetPos += n;
Sergunb 0:8918a71cdbe9 1131 }
Sergunb 0:8918a71cdbe9 1132 else
Sergunb 0:8918a71cdbe9 1133 {
Sergunb 0:8918a71cdbe9 1134 //Save the time at which the message was sent
Sergunb 0:8918a71cdbe9 1135 context->keepAliveTimestamp = osGetSystemTime();
Sergunb 0:8918a71cdbe9 1136
Sergunb 0:8918a71cdbe9 1137 //Update MQTT client state
Sergunb 0:8918a71cdbe9 1138 if(context->packetType == MQTT_PACKET_TYPE_INVALID)
Sergunb 0:8918a71cdbe9 1139 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE);
Sergunb 0:8918a71cdbe9 1140 else
Sergunb 0:8918a71cdbe9 1141 mqttClientChangeState(context, MQTT_CLIENT_STATE_PACKET_SENT);
Sergunb 0:8918a71cdbe9 1142 }
Sergunb 0:8918a71cdbe9 1143 }
Sergunb 0:8918a71cdbe9 1144 }
Sergunb 0:8918a71cdbe9 1145
Sergunb 0:8918a71cdbe9 1146 //Return status code
Sergunb 0:8918a71cdbe9 1147 return error;
Sergunb 0:8918a71cdbe9 1148 }
Sergunb 0:8918a71cdbe9 1149
Sergunb 0:8918a71cdbe9 1150 #endif
Sergunb 0:8918a71cdbe9 1151