Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
mqtt_client.c
00001 /** 00002 * @file mqtt_client.c 00003 * @brief MQTT client 00004 * 00005 * @section License 00006 * 00007 * Copyright (C) 2010-2017 Oryx Embedded SARL. All rights reserved. 00008 * 00009 * This file is part of CycloneTCP Open. 00010 * 00011 * This program is free software; you can redistribute it and/or 00012 * modify it under the terms of the GNU General Public License 00013 * as published by the Free Software Foundation; either version 2 00014 * of the License, or (at your option) any later version. 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU General Public License for more details. 00020 * 00021 * You should have received a copy of the GNU General Public License 00022 * along with this program; if not, write to the Free Software Foundation, 00023 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 00024 * 00025 * @author Oryx Embedded SARL (www.oryx-embedded.com) 00026 * @version 1.7.6 00027 **/ 00028 00029 //Switch to the appropriate trace level 00030 #define TRACE_LEVEL MQTT_TRACE_LEVEL 00031 00032 //Dependencies 00033 #include "core/net.h" 00034 #include "mqtt/mqtt_client.h" 00035 #include "mqtt/mqtt_client_packet.h" 00036 #include "mqtt/mqtt_client_transport.h" 00037 #include "mqtt/mqtt_client_misc.h" 00038 #include "debug.h" 00039 00040 //Check TCP/IP stack configuration 00041 #if (MQTT_CLIENT_SUPPORT == ENABLED) 00042 00043 00044 /** 00045 * @brief Initialize MQTT client context 00046 * @param[in] context Pointer to the MQTT client context 00047 **/ 00048 00049 void mqttClientInit(MqttClientContext *context) 00050 { 00051 //Sanity check 00052 if(context != NULL) 00053 { 00054 //Clear MQTT client context 00055 memset(context, 0, sizeof(MqttClientContext)); 00056 00057 //Default protocol version 00058 context->settings.protocolLevel = MQTT_PROTOCOL_LEVEL_3_1_1; 00059 //Default transport protocol 00060 context->settings.transportProtocol = MQTT_TRANSPORT_PROTOCOL_TCP; 00061 //Default keep-alive time interval 00062 context->settings.keepAlive = MQTT_CLIENT_DEFAULT_KEEP_ALIVE; 00063 //Default communication timeout 00064 context->settings.timeout = MQTT_CLIENT_DEFAULT_TIMEOUT; 00065 00066 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED) 00067 //Default resource name (for WebSocket connections only) 00068 strcpy(context->settings.uri, "/"); 00069 #endif 00070 00071 //Initialize state machine 00072 context->state = MQTT_CLIENT_STATE_CLOSED; 00073 //Initialize packet identifier 00074 context->packetId = 0; 00075 } 00076 } 00077 00078 00079 /** 00080 * @brief Initialize callback structure 00081 * @param[in] callbacks Pointer to a structure that contains callback functions 00082 * @return Error code 00083 **/ 00084 00085 void mqttClientInitCallbacks(MqttClientCallbacks *callbacks) 00086 { 00087 //Initialize callback structure 00088 memset(callbacks, 0, sizeof(MqttClientCallbacks)); 00089 } 00090 00091 00092 /** 00093 * @brief Register MQTT client callbacks 00094 * @param[in] context Pointer to the MQTT client context 00095 * @param[in] callbacks Pointer to a structure that contains callback functions 00096 * @return Error code 00097 **/ 00098 00099 error_t mqttClientRegisterCallbacks(MqttClientContext *context, 00100 const MqttClientCallbacks *callbacks) 00101 { 00102 //Make sure the MQTT client context is valid 00103 if(context == NULL) 00104 return ERROR_INVALID_PARAMETER; 00105 00106 //Attach callback functions 00107 context->callbacks = *callbacks; 00108 00109 //Successful processing 00110 return NO_ERROR; 00111 } 00112 00113 00114 /** 00115 * @brief Set the MQTT protocol version to be used 00116 * @param[in] context Pointer to the MQTT client context 00117 * @param[in] protocolLevel MQTT protocol level (3.1 or 3.1.1) 00118 * @return Error code 00119 **/ 00120 00121 error_t mqttClientSetProtocolLevel(MqttClientContext *context, 00122 MqttProtocolLevel protocolLevel) 00123 { 00124 //Make sure the MQTT client context is valid 00125 if(context == NULL) 00126 return ERROR_INVALID_PARAMETER; 00127 00128 //Save the MQTT protocol version to be used 00129 context->settings.protocolLevel = protocolLevel; 00130 00131 //Successful processing 00132 return NO_ERROR; 00133 } 00134 00135 00136 /** 00137 * @brief Set the transport protocol to be used 00138 * @param[in] context Pointer to the MQTT client context 00139 * @param[in] transportProtocol Transport protocol to be used (TCP, TLS, 00140 * WebSocket, or secure WebSocket) 00141 * @return Error code 00142 **/ 00143 00144 error_t mqttClientSetTransportProtocol(MqttClientContext *context, 00145 MqttTransportProtocol transportProtocol) 00146 { 00147 //Make sure the MQTT client context is valid 00148 if(context == NULL) 00149 return ERROR_INVALID_PARAMETER; 00150 00151 //Save the transport protocol to be used 00152 context->settings.transportProtocol = transportProtocol; 00153 00154 //Successful processing 00155 return NO_ERROR; 00156 } 00157 00158 00159 /** 00160 * @brief Set keep-alive value 00161 * @param[in] context Pointer to the MQTT client context 00162 * @param[in] keepAlive Maximum time interval that is permitted to elapse 00163 * between the point at which the client finishes transmitting one control 00164 * packet and the point it starts sending the next 00165 * @return Error code 00166 **/ 00167 00168 error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive) 00169 { 00170 //Make sure the MQTT client context is valid 00171 if(context == NULL) 00172 return ERROR_INVALID_PARAMETER; 00173 00174 //Save keep-alive value 00175 context->settings.keepAlive = keepAlive; 00176 00177 //Successful processing 00178 return NO_ERROR; 00179 } 00180 00181 00182 /** 00183 * @brief Set communication timeout 00184 * @param[in] context Pointer to the MQTT client context 00185 * @param[in] timeout Timeout value, in seconds 00186 * @return Error code 00187 **/ 00188 00189 error_t mqttClientSetTimeout(MqttClientContext *context, uint16_t timeout) 00190 { 00191 //Make sure the MQTT client context is valid 00192 if(context == NULL) 00193 return ERROR_INVALID_PARAMETER; 00194 00195 //Save timeout value 00196 context->settings.timeout = timeout; 00197 00198 //Successful processing 00199 return NO_ERROR; 00200 } 00201 00202 00203 /** 00204 * @brief Set the hostname of the resource being requested 00205 * @param[in] context Pointer to the MQTT client context 00206 * @param[in] host NULL-terminated string containing the hostname 00207 * @return Error code 00208 **/ 00209 00210 error_t mqttClientSetHost(MqttClientContext *context, const char_t *host) 00211 { 00212 //Check parameters 00213 if(context == NULL || host == NULL) 00214 return ERROR_INVALID_PARAMETER; 00215 00216 //Make sure the length of the hostname is acceptable 00217 if(strlen(host) > MQTT_CLIENT_MAX_HOST_LEN) 00218 return ERROR_INVALID_LENGTH; 00219 00220 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED) 00221 //Save hostname (for WebSocket connections only) 00222 strcpy(context->settings.host, host); 00223 #endif 00224 00225 //Successful processing 00226 return NO_ERROR; 00227 } 00228 00229 00230 /** 00231 * @brief Set the name of the resource being requested 00232 * @param[in] context Pointer to the MQTT client context 00233 * @param[in] uri NULL-terminated string containing the URI 00234 * @return Error code 00235 **/ 00236 00237 error_t mqttClientSetUri(MqttClientContext *context, const char_t *uri) 00238 { 00239 //Check parameters 00240 if(context == NULL || uri == NULL) 00241 return ERROR_INVALID_PARAMETER; 00242 00243 //Make sure the length of the resource name is acceptable 00244 if(strlen(uri) > MQTT_CLIENT_MAX_URI_LEN) 00245 return ERROR_INVALID_LENGTH; 00246 00247 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED) 00248 //Save resource name (for WebSocket connections only) 00249 strcpy(context->settings.uri, uri); 00250 #endif 00251 00252 //Successful processing 00253 return NO_ERROR; 00254 } 00255 00256 00257 /** 00258 * @brief Set client identifier 00259 * @param[in] context Pointer to the MQTT client context 00260 * @param[in] clientId NULL-terminated string containing the client identifier 00261 * @return Error code 00262 **/ 00263 00264 error_t mqttClientSetIdentifier(MqttClientContext *context, 00265 const char_t *clientId) 00266 { 00267 //Check parameters 00268 if(context == NULL || clientId == NULL) 00269 return ERROR_INVALID_PARAMETER; 00270 00271 //Make sure the length of the client identifier is acceptable 00272 if(strlen(clientId) > MQTT_CLIENT_MAX_ID_LEN) 00273 return ERROR_INVALID_LENGTH; 00274 00275 //Save client identifier 00276 strcpy(context->settings.clientId, clientId); 00277 00278 //Successful processing 00279 return NO_ERROR; 00280 } 00281 00282 00283 /** 00284 * @brief Set authentication information 00285 * @param[in] context Pointer to the MQTT client context 00286 * @param[in] username NULL-terminated string containing the user name to be used 00287 * @param[in] password NULL-terminated string containing the password to be used 00288 * @return Error code 00289 **/ 00290 00291 error_t mqttClientSetAuthInfo(MqttClientContext *context, 00292 const char_t *username, const char_t *password) 00293 { 00294 //Check parameters 00295 if(context == NULL || username == NULL || password == NULL) 00296 return ERROR_INVALID_PARAMETER; 00297 00298 //Make sure the length of the user name is acceptable 00299 if(strlen(username) > MQTT_CLIENT_MAX_USERNAME_LEN) 00300 return ERROR_INVALID_LENGTH; 00301 00302 //Save user name 00303 strcpy(context->settings.username, username); 00304 00305 //Make sure the length of the password is acceptable 00306 if(strlen(password) > MQTT_CLIENT_MAX_PASSWORD_LEN) 00307 return ERROR_INVALID_LENGTH; 00308 00309 //Save password 00310 strcpy(context->settings.password, password); 00311 00312 //Successful processing 00313 return NO_ERROR; 00314 } 00315 00316 00317 /** 00318 * @brief Specify the Will message 00319 * @param[in] context Pointer to the MQTT client context 00320 * @param[in] topic Will topic name 00321 * @param[in] message Will message 00322 * @param[in] length Length of the Will message 00323 * @param[in] qos QoS level to be used when publishing the Will message 00324 * @param[in] retain This flag specifies if the Will message is to be retained 00325 * @return Error code 00326 **/ 00327 00328 error_t mqttClientSetWillMessage(MqttClientContext *context, const char_t *topic, 00329 const void *message, size_t length, MqttQosLevel qos, bool_t retain) 00330 { 00331 MqttClientWillMessage *willMessage; 00332 00333 //Check parameters 00334 if(context == NULL || topic == NULL) 00335 return ERROR_INVALID_PARAMETER; 00336 00337 //Make sure the length of the Will topic is acceptable 00338 if(strlen(topic) > MQTT_CLIENT_MAX_WILL_TOPIC_LEN) 00339 return ERROR_INVALID_LENGTH; 00340 00341 //Point to the Will message 00342 willMessage = &context->settings.willMessage; 00343 00344 //Save Will topic 00345 strcpy(willMessage->topic, topic); 00346 00347 //Any message payload 00348 if(length > 0) 00349 { 00350 //Sanity check 00351 if(message == NULL) 00352 return ERROR_INVALID_PARAMETER; 00353 00354 //Make sure the length of the Will message payload is acceptable 00355 if(strlen(message) > MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN) 00356 return ERROR_INVALID_LENGTH; 00357 00358 //Save Will message payload 00359 memcpy(willMessage->payload, message, length); 00360 } 00361 00362 //Length of the Will message payload 00363 willMessage->length = length; 00364 //QoS level to be used when publishing the Will message 00365 willMessage->qos = qos; 00366 //This flag specifies if the Will message is to be retained 00367 willMessage->retain = retain; 00368 00369 //Successful processing 00370 return NO_ERROR; 00371 } 00372 00373 00374 /** 00375 * @brief Bind the MQTT client to a particular network interface 00376 * @param[in] context Pointer to the MQTT client context 00377 * @param[in] interface Network interface to be used 00378 * @return Error code 00379 **/ 00380 00381 error_t mqttClientBindToInterface(MqttClientContext *context, 00382 NetInterface *interface) 00383 { 00384 //Make sure the MQTT client context is valid 00385 if(context == NULL) 00386 return ERROR_INVALID_PARAMETER; 00387 00388 //Explicitly associate the MQTT client with the specified interface 00389 context->interface = interface; 00390 00391 //Successful processing 00392 return NO_ERROR; 00393 } 00394 00395 00396 /** 00397 * @brief Establish connection with the MQTT server 00398 * @param[in] context Pointer to the MQTT client context 00399 * @param[in] serverIpAddr IP address of the MQTT server to connect to 00400 * @param[in] serverPort TCP port number that will be used to establish the 00401 * connection 00402 * @param[in] cleanSession If this flag is set, then the client and server 00403 * must discard any previous session and start a new one 00404 * @return Error code 00405 **/ 00406 00407 error_t mqttClientConnect(MqttClientContext *context, 00408 const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession) 00409 { 00410 error_t error; 00411 00412 //Check parameters 00413 if(context == NULL || serverIpAddr == NULL) 00414 return ERROR_INVALID_PARAMETER; 00415 00416 //Initialize status code 00417 error = NO_ERROR; 00418 00419 //Establish network connection 00420 while(context->state != MQTT_CLIENT_STATE_IDLE) 00421 { 00422 //Check current state 00423 if(context->state == MQTT_CLIENT_STATE_CLOSED) 00424 { 00425 //Open network connection 00426 error = mqttClientOpenConnection(context); 00427 00428 //Check status code 00429 if(!error) 00430 { 00431 //Debug message 00432 TRACE_INFO("MQTT: Connecting to server %s port %" PRIu16 "...\r\n", 00433 ipAddrToString(serverIpAddr, NULL), serverPort); 00434 00435 //The network connection is open 00436 mqttClientChangeState(context, MQTT_CLIENT_STATE_CONNECTING); 00437 } 00438 else 00439 { 00440 //Clean up side effects 00441 mqttClientCloseConnection(context); 00442 } 00443 } 00444 else if(context->state == MQTT_CLIENT_STATE_CONNECTING) 00445 { 00446 //Establish network connection 00447 error = mqttClientEstablishConnection(context, 00448 serverIpAddr, serverPort); 00449 00450 //Check status code 00451 if(!error) 00452 { 00453 //Debug message 00454 TRACE_INFO("MQTT: Connected to server\r\n"); 00455 00456 //The network connection is established 00457 mqttClientChangeState(context, MQTT_CLIENT_STATE_CONNECTED); 00458 } 00459 } 00460 else if(context->state == MQTT_CLIENT_STATE_CONNECTED) 00461 { 00462 //Format CONNECT packet 00463 error = mqttClientFormatConnect(context, cleanSession); 00464 00465 //Check status code 00466 if(!error) 00467 { 00468 //Debug message 00469 TRACE_INFO("MQTT: Sending CONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen); 00470 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen); 00471 00472 //Save the type of the MQTT packet to be sent 00473 context->packetType = MQTT_PACKET_TYPE_CONNECT; 00474 //Point to the beginning of the packet 00475 context->packetPos = 0; 00476 00477 //Send CONNECT packet 00478 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET); 00479 } 00480 } 00481 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET) 00482 { 00483 //Send more data 00484 error = mqttClientProcessEvents(context, context->settings.timeout); 00485 } 00486 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT) 00487 { 00488 //Wait for CONNACK packet 00489 error = mqttClientProcessEvents(context, context->settings.timeout); 00490 } 00491 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET) 00492 { 00493 //Receive more data 00494 error = mqttClientProcessEvents(context, context->settings.timeout); 00495 } 00496 else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED) 00497 { 00498 //Reset packet type 00499 context->packetType = MQTT_PACKET_TYPE_INVALID; 00500 //A CONNACK packet has been received 00501 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 00502 } 00503 else 00504 { 00505 //Invalid state 00506 error = ERROR_NOT_CONNECTED; 00507 } 00508 00509 //Any error to report? 00510 if(error) 00511 { 00512 #if (NET_RTOS_SUPPORT == DISABLED) 00513 //Timeout error? 00514 if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT) 00515 break; 00516 #endif 00517 //Close connection 00518 mqttClientCloseConnection(context); 00519 //The connection is closed 00520 mqttClientChangeState(context, MQTT_CLIENT_STATE_CLOSED); 00521 //Exit immediately 00522 break; 00523 } 00524 } 00525 00526 //Return status code 00527 return error; 00528 } 00529 00530 00531 /** 00532 * @brief Publish message 00533 * @param[in] context Pointer to the MQTT client context 00534 * @param[in] topic Topic name 00535 * @param[in] message Message payload 00536 * @param[in] length Length of the message payload 00537 * @param[in] qos QoS level to be used when publishing the message 00538 * @param[in] retain This flag specifies if the message is to be retained 00539 * @param[out] packetId Packet identifier used to send the PUBLISH packet 00540 * @return Error code 00541 **/ 00542 00543 error_t mqttClientPublish(MqttClientContext *context, 00544 const char_t *topic, const void *message, size_t length, 00545 MqttQosLevel qos, bool_t retain, uint16_t *packetId) 00546 { 00547 error_t error; 00548 00549 //Make sure the MQTT client context is valid 00550 if(context == NULL) 00551 return ERROR_INVALID_PARAMETER; 00552 00553 //Initialize status code 00554 error = NO_ERROR; 00555 00556 //Send PUBLISH packet and wait for PUBACK/PUBCOMP packet to be received 00557 do 00558 { 00559 //Check current state 00560 if(context->state == MQTT_CLIENT_STATE_IDLE) 00561 { 00562 //Format PUBLISH packet 00563 error = mqttClientFormatPublish(context, topic, message, length, qos, retain); 00564 00565 //Check status code 00566 if(!error) 00567 { 00568 //Save the packet identifier used to send the PUBLISH packet 00569 if(packetId != NULL) 00570 *packetId = context->packetId; 00571 00572 //Debug message 00573 TRACE_INFO("MQTT: Sending PUBLISH packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen); 00574 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen); 00575 00576 //Save the type of the MQTT packet to be sent 00577 context->packetType = MQTT_PACKET_TYPE_PUBLISH; 00578 //Point to the beginning of the packet 00579 context->packetPos = 0; 00580 00581 //Send PUBLISH packet 00582 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET); 00583 } 00584 } 00585 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET) 00586 { 00587 //Send more data 00588 error = mqttClientProcessEvents(context, context->settings.timeout); 00589 } 00590 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT) 00591 { 00592 //The last parameter is optional 00593 if(packetId != NULL) 00594 { 00595 //Reset packet type 00596 context->packetType = MQTT_PACKET_TYPE_INVALID; 00597 //Do not wait for PUBACK/PUBCOMP packet 00598 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 00599 } 00600 else 00601 { 00602 //Check QoS level 00603 if(qos == MQTT_QOS_LEVEL_0) 00604 { 00605 //Reset packet type 00606 context->packetType = MQTT_PACKET_TYPE_INVALID; 00607 //No response is sent by the receiver and no retry is performed by the sender 00608 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 00609 } 00610 else 00611 { 00612 //Wait for PUBACK/PUBCOMP packet 00613 error = mqttClientProcessEvents(context, context->settings.timeout); 00614 } 00615 } 00616 } 00617 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET) 00618 { 00619 //Receive more data 00620 error = mqttClientProcessEvents(context, context->settings.timeout); 00621 } 00622 else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED) 00623 { 00624 //Reset packet type 00625 context->packetType = MQTT_PACKET_TYPE_INVALID; 00626 //A PUBACK/PUBCOMP packet has been received 00627 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 00628 } 00629 else 00630 { 00631 //Invalid state 00632 error = ERROR_NOT_CONNECTED; 00633 } 00634 00635 //Any error to report? 00636 if(error) 00637 break; 00638 00639 //Evaluate the loop condition 00640 } while(context->state != MQTT_CLIENT_STATE_IDLE); 00641 00642 //Return status code 00643 return error; 00644 } 00645 00646 00647 /** 00648 * @brief Subscribe to topics 00649 * @param[in] context Pointer to the MQTT client context 00650 * @param[in] topic Topic filter 00651 * @param[in] qos Maximum QoS level at which the server can send application 00652 * messages to the client 00653 * @param[out] packetId Packet identifier used to send the SUBSCRIBE packet 00654 * @return Error code 00655 **/ 00656 00657 error_t mqttClientSubscribe(MqttClientContext *context, 00658 const char_t *topic, MqttQosLevel qos, uint16_t *packetId) 00659 { 00660 error_t error; 00661 00662 //Make sure the MQTT client context is valid 00663 if(context == NULL) 00664 return ERROR_INVALID_PARAMETER; 00665 00666 //Initialize status code 00667 error = NO_ERROR; 00668 00669 //Send SUBSCRIBE packet and wait for SUBACK packet to be received 00670 do 00671 { 00672 //Check current state 00673 if(context->state == MQTT_CLIENT_STATE_IDLE) 00674 { 00675 //Format SUBSCRIBE packet 00676 error = mqttClientFormatSubscribe(context, topic, qos); 00677 00678 //Check status code 00679 if(!error) 00680 { 00681 //Save the packet identifier used to send the SUBSCRIBE packet 00682 if(packetId != NULL) 00683 *packetId = context->packetId; 00684 00685 //Debug message 00686 TRACE_INFO("MQTT: Sending SUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen); 00687 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen); 00688 00689 //Save the type of the MQTT packet to be sent 00690 context->packetType = MQTT_PACKET_TYPE_SUBSCRIBE; 00691 //Point to the beginning of the packet 00692 context->packetPos = 0; 00693 00694 //Send SUBSCRIBE packet 00695 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET); 00696 } 00697 } 00698 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET) 00699 { 00700 //Send more data 00701 error = mqttClientProcessEvents(context, context->settings.timeout); 00702 } 00703 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT) 00704 { 00705 //The last parameter is optional 00706 if(packetId != NULL) 00707 { 00708 //Reset packet type 00709 context->packetType = MQTT_PACKET_TYPE_INVALID; 00710 //Do not wait for SUBACK packet 00711 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 00712 } 00713 else 00714 { 00715 //Wait for SUBACK packet 00716 error = mqttClientProcessEvents(context, context->settings.timeout); 00717 } 00718 } 00719 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET) 00720 { 00721 //Receive more data 00722 error = mqttClientProcessEvents(context, context->settings.timeout); 00723 } 00724 else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED) 00725 { 00726 //Reset packet type 00727 context->packetType = MQTT_PACKET_TYPE_INVALID; 00728 //A SUBACK packet has been received 00729 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 00730 } 00731 else 00732 { 00733 //Invalid state 00734 error = ERROR_NOT_CONNECTED; 00735 } 00736 00737 //Any error to report? 00738 if(error) 00739 break; 00740 00741 //Evaluate the loop condition 00742 } while(context->state != MQTT_CLIENT_STATE_IDLE); 00743 00744 //Return status code 00745 return error; 00746 } 00747 00748 00749 /** 00750 * @brief Unsubscribe from topics 00751 * @param[in] context Pointer to the MQTT client context 00752 * @param[in] topic Topic filter 00753 * @param[out] packetId Packet identifier used to send the UNSUBSCRIBE packet 00754 * @return Error code 00755 **/ 00756 00757 error_t mqttClientUnsubscribe(MqttClientContext *context, 00758 const char_t *topic, uint16_t *packetId) 00759 { 00760 error_t error; 00761 00762 //Make sure the MQTT client context is valid 00763 if(context == NULL) 00764 return ERROR_INVALID_PARAMETER; 00765 00766 //Initialize status code 00767 error = NO_ERROR; 00768 00769 //Send UNSUBSCRIBE packet and wait for UNSUBACK packet to be received 00770 do 00771 { 00772 //Check current state 00773 if(context->state == MQTT_CLIENT_STATE_IDLE) 00774 { 00775 //Format UNSUBSCRIBE packet 00776 error = mqttClientFormatUnsubscribe(context, topic); 00777 00778 //Check status code 00779 if(!error) 00780 { 00781 //Save the packet identifier used to send the UNSUBSCRIBE packet 00782 if(packetId != NULL) 00783 *packetId = context->packetId; 00784 00785 //Debug message 00786 TRACE_INFO("MQTT: Sending UNSUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen); 00787 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen); 00788 00789 //Save the type of the MQTT packet to be sent 00790 context->packetType = MQTT_PACKET_TYPE_UNSUBSCRIBE; 00791 //Point to the beginning of the packet 00792 context->packetPos = 0; 00793 00794 //Send UNSUBSCRIBE packet 00795 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET); 00796 } 00797 } 00798 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET) 00799 { 00800 //Send more data 00801 error = mqttClientProcessEvents(context, context->settings.timeout); 00802 } 00803 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT) 00804 { 00805 //The last parameter is optional 00806 if(packetId != NULL) 00807 { 00808 //Reset packet type 00809 context->packetType = MQTT_PACKET_TYPE_INVALID; 00810 //Do not wait for UNSUBACK packet 00811 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 00812 } 00813 else 00814 { 00815 //Wait for UNSUBACK packet 00816 error = mqttClientProcessEvents(context, context->settings.timeout); 00817 } 00818 } 00819 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET) 00820 { 00821 //Receive more data 00822 error = mqttClientProcessEvents(context, context->settings.timeout); 00823 } 00824 else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED) 00825 { 00826 //Reset packet type 00827 context->packetType = MQTT_PACKET_TYPE_INVALID; 00828 //An UNSUBACK packet has been received 00829 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 00830 } 00831 else 00832 { 00833 //Invalid state 00834 error = ERROR_NOT_CONNECTED; 00835 } 00836 00837 //Any error to report? 00838 if(error) 00839 break; 00840 00841 //Evaluate the loop condition 00842 } while(context->state != MQTT_CLIENT_STATE_IDLE); 00843 00844 //Return status code 00845 return error; 00846 } 00847 00848 00849 /** 00850 * @brief Send ping request 00851 * @param[in] context Pointer to the MQTT client context 00852 * @param[out] rtt Round-trip time (optional parameter) 00853 * @return Error code 00854 **/ 00855 00856 error_t mqttClientPing(MqttClientContext *context, systime_t *rtt) 00857 { 00858 error_t error; 00859 00860 //Make sure the MQTT client context is valid 00861 if(context == NULL) 00862 return ERROR_INVALID_PARAMETER; 00863 00864 //Initialize status code 00865 error = NO_ERROR; 00866 00867 //Send PINGREQ packet and wait for PINGRESP packet to be received 00868 do 00869 { 00870 //Check current state 00871 if(context->state == MQTT_CLIENT_STATE_IDLE) 00872 { 00873 //Format PINGREQ packet 00874 error = mqttClientFormatPingReq(context); 00875 00876 //Check status code 00877 if(!error) 00878 { 00879 //Debug message 00880 TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen); 00881 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen); 00882 00883 //Save the type of the MQTT packet to be sent 00884 context->packetType = MQTT_PACKET_TYPE_PINGREQ; 00885 //Point to the beginning of the packet 00886 context->packetPos = 0; 00887 00888 //Send PINGREQ packet 00889 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET); 00890 00891 //Save the time at which the request was sent 00892 if(rtt != NULL) 00893 context->pingTimestamp = osGetSystemTime(); 00894 } 00895 } 00896 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET) 00897 { 00898 //Send more data 00899 error = mqttClientProcessEvents(context, context->settings.timeout); 00900 } 00901 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT) 00902 { 00903 //The last parameter is optional 00904 if(rtt != NULL) 00905 { 00906 //Wait for PINGRESP packet 00907 error = mqttClientProcessEvents(context, context->settings.timeout); 00908 } 00909 else 00910 { 00911 //Reset packet type 00912 context->packetType = MQTT_PACKET_TYPE_INVALID; 00913 //Do not wait for PINGRESP packet 00914 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 00915 } 00916 } 00917 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET) 00918 { 00919 //Receive more data 00920 error = mqttClientProcessEvents(context, context->settings.timeout); 00921 } 00922 else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED) 00923 { 00924 //The last parameter is optional 00925 if(rtt != NULL) 00926 { 00927 //Compute round-trip time 00928 *rtt = osGetSystemTime() - context->pingTimestamp; 00929 } 00930 00931 //Reset packet type 00932 context->packetType = MQTT_PACKET_TYPE_INVALID; 00933 //A PINGRESP packet has been received 00934 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 00935 } 00936 else 00937 { 00938 //Invalid state 00939 error = ERROR_NOT_CONNECTED; 00940 } 00941 00942 //Any error to report? 00943 if(error) 00944 break; 00945 00946 //Evaluate the loop condition 00947 } while(context->state != MQTT_CLIENT_STATE_IDLE); 00948 00949 //Return status code 00950 return error; 00951 } 00952 00953 00954 /** 00955 * @brief Gracefully disconnect from the MQTT server 00956 * @param[in] context Pointer to the MQTT client context 00957 * @return Error code 00958 **/ 00959 00960 error_t mqttClientDisconnect(MqttClientContext *context) 00961 { 00962 error_t error; 00963 00964 //Make sure the MQTT client context is valid 00965 if(context == NULL) 00966 return ERROR_INVALID_PARAMETER; 00967 00968 //Initialize status code 00969 error = NO_ERROR; 00970 00971 //Send DISCONNECT packet and shutdown network connection 00972 while(context->state != MQTT_CLIENT_STATE_DISCONNECTED) 00973 { 00974 //Check current state 00975 if(context->state == MQTT_CLIENT_STATE_IDLE) 00976 { 00977 //Format DISCONNECT packet 00978 error = mqttClientFormatDisconnect(context); 00979 00980 //Check status code 00981 if(!error) 00982 { 00983 //Debug message 00984 TRACE_INFO("MQTT: Sending DISCONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen); 00985 TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen); 00986 00987 //Save the type of the MQTT packet to be sent 00988 context->packetType = MQTT_PACKET_TYPE_DISCONNECT; 00989 //Point to the beginning of the packet 00990 context->packetPos = 0; 00991 00992 //Send DISCONNECT packet 00993 mqttClientChangeState(context, MQTT_CLIENT_STATE_SENDING_PACKET); 00994 } 00995 } 00996 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET) 00997 { 00998 //Send more data 00999 error = mqttClientProcessEvents(context, context->settings.timeout); 01000 } 01001 else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT) 01002 { 01003 //Debug message 01004 TRACE_INFO("MQTT: Shutting down connection...\r\n"); 01005 01006 //After sending a DISCONNECT packet the client must not send any 01007 //more control packets on that network connection 01008 mqttClientChangeState(context, MQTT_CLIENT_STATE_DISCONNECTING); 01009 } 01010 else if(context->state == MQTT_CLIENT_STATE_DISCONNECTING) 01011 { 01012 //Properly dispose the network connection 01013 error = mqttClientShutdownConnection(context); 01014 01015 //Check status code 01016 if(!error) 01017 { 01018 //The MQTT client is disconnected 01019 mqttClientChangeState(context, MQTT_CLIENT_STATE_DISCONNECTED); 01020 } 01021 } 01022 else 01023 { 01024 //Invalid state 01025 error = ERROR_NOT_CONNECTED; 01026 } 01027 01028 //Any error to report? 01029 if(error) 01030 break; 01031 } 01032 01033 //Return status code 01034 return error; 01035 } 01036 01037 01038 /** 01039 * @brief Close the connection with the MQTT server 01040 * @param[in] context Pointer to the MQTT client context 01041 * @return Error code 01042 **/ 01043 01044 error_t mqttClientClose(MqttClientContext *context) 01045 { 01046 //Make sure the MQTT client context is valid 01047 if(context == NULL) 01048 return ERROR_INVALID_PARAMETER; 01049 01050 //Close connection 01051 mqttClientCloseConnection(context); 01052 //The connection is closed 01053 mqttClientChangeState(context, MQTT_CLIENT_STATE_CLOSED); 01054 01055 //Network connection successfully closed 01056 return NO_ERROR; 01057 } 01058 01059 01060 /** 01061 * @brief Process MQTT client events 01062 * @param[in] context Pointer to the MQTT client context 01063 * @param[in] timeout Maximum time to wait before returning 01064 * @return Error code 01065 **/ 01066 01067 error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout) 01068 { 01069 error_t error; 01070 size_t n; 01071 01072 //It is the responsibility of the client to ensure that the interval 01073 //between control packets being sent does not exceed the keep-alive value 01074 error = mqttClientCheckKeepAlive(context); 01075 01076 //Check status code 01077 if(!error) 01078 { 01079 //Check current state 01080 if(context->state == MQTT_CLIENT_STATE_IDLE || 01081 context->state == MQTT_CLIENT_STATE_PACKET_SENT) 01082 { 01083 //Wait for incoming data 01084 error = mqttClientWaitForData(context, timeout); 01085 01086 //Check status code 01087 if(!error) 01088 { 01089 //Initialize context 01090 context->packet = context->buffer; 01091 context->packetPos = 0; 01092 context->packetLen = 0; 01093 context->remainingLen = 0; 01094 01095 //Start receiving the packet 01096 mqttClientChangeState(context, MQTT_CLIENT_STATE_RECEIVING_PACKET); 01097 } 01098 } 01099 else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET) 01100 { 01101 //Receive the incoming packet 01102 error = mqttClientReceivePacket(context); 01103 01104 //Check status code 01105 if(!error) 01106 { 01107 //Process MQTT control packet 01108 error = mqttClientProcessPacket(context); 01109 01110 //Update MQTT client state 01111 if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET) 01112 { 01113 if(context->packetType == MQTT_PACKET_TYPE_INVALID) 01114 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 01115 else 01116 mqttClientChangeState(context, MQTT_CLIENT_STATE_PACKET_SENT); 01117 } 01118 } 01119 } 01120 else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET) 01121 { 01122 //Any remaining data to be sent? 01123 if(context->packetPos < context->packetLen) 01124 { 01125 //Send more data 01126 error = mqttClientSendData(context, context->packet + context->packetPos, 01127 context->packetLen - context->packetPos, &n, 0); 01128 01129 //Advance data pointer 01130 context->packetPos += n; 01131 } 01132 else 01133 { 01134 //Save the time at which the message was sent 01135 context->keepAliveTimestamp = osGetSystemTime(); 01136 01137 //Update MQTT client state 01138 if(context->packetType == MQTT_PACKET_TYPE_INVALID) 01139 mqttClientChangeState(context, MQTT_CLIENT_STATE_IDLE); 01140 else 01141 mqttClientChangeState(context, MQTT_CLIENT_STATE_PACKET_SENT); 01142 } 01143 } 01144 } 01145 01146 //Return status code 01147 return error; 01148 } 01149 01150 #endif 01151
Generated on Tue Jul 12 2022 17:10:15 by
1.7.2