Embed:
(wiki syntax)
Show/hide line numbers
mqtt_client_transport.c
Go to the documentation of this file.
00001 /** 00002 * @file mqtt_client_transport.c 00003 * @brief Transport protocol abstraction layer 00004 * 00005 * @section License 00006 * 00007 * Copyright (C) 2010-2017 Oryx Embedded SARL. All rights reserved. 00008 * 00009 * This file is part of CycloneSSL Open. 00010 * 00011 * This program is free software; you can redistribute it and/or 00012 * modify it under the terms of the GNU General Public License 00013 * as published by the Free Software Foundation; either version 2 00014 * of the License, or (at your option) any later version. 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU General Public License for more details. 00020 * 00021 * You should have received a copy of the GNU General Public License 00022 * along with this program; if not, write to the Free Software Foundation, 00023 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 00024 * 00025 * @author Oryx Embedded SARL (www.oryx-embedded.com) 00026 * @version 1.7.6 00027 **/ 00028 00029 //Switch to the appropriate trace level 00030 #define TRACE_LEVEL MQTT_TRACE_LEVEL 00031 00032 //Dependencies 00033 #include "core/net.h" 00034 #include "core/tcp_misc.h" 00035 #include "mqtt/mqtt_client.h" 00036 #include "mqtt/mqtt_client_packet.h" 00037 #include "mqtt/mqtt_client_transport.h" 00038 #include "mqtt/mqtt_client_misc.h" 00039 #include "debug.h" 00040 00041 //Check TCP/IP stack configuration 00042 #if (MQTT_CLIENT_SUPPORT == ENABLED) 00043 00044 00045 /** 00046 * @brief Open network connection 00047 * @param[in] context Pointer to the MQTT client context 00048 * @return Error code 00049 **/ 00050 00051 error_t mqttClientOpenConnection(MqttClientContext *context) 00052 { 00053 error_t error; 00054 00055 //TCP transport protocol? 00056 if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP) 00057 { 00058 //Open a TCP socket 00059 context->socket = socketOpen(SOCKET_TYPE_STREAM, SOCKET_IP_PROTO_TCP); 00060 00061 //Valid socket handle? 00062 if(context->socket != NULL) 00063 { 00064 //Associate the socket with the relevant interface 00065 error = socketBindToInterface(context->socket, context->interface); 00066 } 00067 else 00068 { 00069 //Report an error 00070 error = ERROR_OPEN_FAILED; 00071 } 00072 } 00073 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED) 00074 //TLS transport protocol? 00075 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS) 00076 { 00077 //Open a TCP socket 00078 context->socket = socketOpen(SOCKET_TYPE_STREAM, SOCKET_IP_PROTO_TCP); 00079 00080 //Valid socket handle? 00081 if(context->socket != NULL) 00082 { 00083 //Associate the socket with the relevant interface 00084 error = socketBindToInterface(context->socket, context->interface); 00085 00086 //Check status code 00087 if(!error) 00088 { 00089 //Allocate SSL/TLS context 00090 context->tlsContext = tlsInit(); 00091 00092 //Valid SSL/TLS handle? 00093 if(context->tlsContext != NULL) 00094 { 00095 //Select client operation mode 00096 error = tlsSetConnectionEnd(context->tlsContext, 00097 TLS_CONNECTION_END_CLIENT); 00098 00099 //Check status code 00100 if(!error) 00101 { 00102 //Bind TLS to the relevant socket 00103 error = tlsSetSocket(context->tlsContext, context->socket); 00104 } 00105 00106 //Check status code 00107 if(!error) 00108 { 00109 //Restore SSL/TLS session, if any 00110 if(context->tlsSession.idLength > 0) 00111 { 00112 //Restore SSL/TLS session 00113 error = tlsRestoreSession(context->tlsContext, 00114 &context->tlsSession); 00115 } 00116 } 00117 00118 //Check status code 00119 if(!error) 00120 { 00121 //Invoke user-defined callback, if any 00122 if(context->callbacks.tlsInitCallback != NULL) 00123 { 00124 //Perform SSL/TLS related initialization 00125 error = context->callbacks.tlsInitCallback(context, 00126 context->tlsContext); 00127 } 00128 } 00129 } 00130 else 00131 { 00132 //Report an error 00133 error = ERROR_OPEN_FAILED; 00134 } 00135 } 00136 } 00137 else 00138 { 00139 //Report an error 00140 error = ERROR_OPEN_FAILED; 00141 } 00142 } 00143 #endif 00144 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED) 00145 //WebSocket transport protocol? 00146 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS) 00147 { 00148 //Open a WebSocket 00149 context->webSocket = webSocketOpen(); 00150 00151 //Valid WebSocket handle? 00152 if(context->webSocket != NULL) 00153 { 00154 //Associate the WebSocket with the relevant interface 00155 error = webSocketBindToInterface(context->webSocket, 00156 context->interface); 00157 } 00158 else 00159 { 00160 //Report an error 00161 error = ERROR_OPEN_FAILED; 00162 } 00163 } 00164 //Secure WebSocket transport protocol? 00165 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS) 00166 { 00167 //Open a WebSocket 00168 context->webSocket = webSocketOpen(); 00169 00170 //Valid WebSocket handle? 00171 if(context->webSocket != NULL) 00172 { 00173 //Associate the WebSocket with the relevant interface 00174 error = webSocketBindToInterface(context->webSocket, 00175 context->interface); 00176 00177 //Check status code 00178 if(!error) 00179 { 00180 //Register SSL/TLS initialization callback 00181 error = webSocketRegisterTlsInitCallback(context->webSocket, 00182 (WebSocketTlsInitCallback) context->callbacks.tlsInitCallback); 00183 } 00184 } 00185 else 00186 { 00187 //Report an error 00188 error = ERROR_OPEN_FAILED; 00189 } 00190 } 00191 #endif 00192 //Unknown transport protocol? 00193 else 00194 { 00195 //Report an error 00196 error = ERROR_INVALID_PROTOCOL; 00197 } 00198 00199 //Return status code 00200 return error; 00201 } 00202 00203 00204 /** 00205 * @brief Establish network connection 00206 * @param[in] context Pointer to the MQTT client context 00207 * @param[in] serverIpAddr IP address of the MQTT server to connect to 00208 * @param[in] serverPort TCP port number that will be used to establish the 00209 * connection 00210 * @return Error code 00211 **/ 00212 00213 error_t mqttClientEstablishConnection(MqttClientContext *context, 00214 const IpAddr *serverIpAddr, uint16_t serverPort) 00215 { 00216 error_t error; 00217 00218 //TCP transport protocol? 00219 if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP) 00220 { 00221 //Set timeout 00222 error = socketSetTimeout(context->socket, context->settings.timeout); 00223 00224 //Check status code 00225 if(!error) 00226 { 00227 //Connect to the MQTT server using TCP 00228 error = socketConnect(context->socket, serverIpAddr, serverPort); 00229 } 00230 } 00231 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED) 00232 //TLS transport protocol? 00233 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS) 00234 { 00235 //Set timeout 00236 error = socketSetTimeout(context->socket, context->settings.timeout); 00237 00238 //Check status code 00239 if(!error) 00240 { 00241 //Connect to the MQTT server using TCP 00242 error = socketConnect(context->socket, serverIpAddr, serverPort); 00243 } 00244 00245 //Check status code 00246 if(!error) 00247 { 00248 //Establish a SSL/TLS session 00249 error = tlsConnect(context->tlsContext); 00250 } 00251 } 00252 #endif 00253 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED) 00254 //WebSocket transport protocol? 00255 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS || 00256 context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS) 00257 { 00258 //Set timeout 00259 error = webSocketSetTimeout(context->webSocket, context->settings.timeout); 00260 00261 //Check status code 00262 if(!error) 00263 { 00264 //Set the hostname of the remote server 00265 error = webSocketSetHost(context->webSocket, context->settings.host); 00266 } 00267 00268 //Check status code 00269 if(!error) 00270 { 00271 //The client MUST include "mqtt" in the list of WebSocket 00272 //sub-protocols it offers 00273 error = webSocketSetSubProtocol(context->webSocket, "mqtt"); 00274 } 00275 00276 //Check status code 00277 if(!error) 00278 { 00279 //Connect to the MQTT server using WebSocket 00280 error = webSocketConnect(context->webSocket, serverIpAddr, 00281 serverPort, context->settings.uri); 00282 } 00283 } 00284 #endif 00285 //Unknown transport protocol? 00286 else 00287 { 00288 //Report an error 00289 error = ERROR_INVALID_PROTOCOL; 00290 } 00291 00292 //Return status code 00293 return error; 00294 } 00295 00296 00297 /** 00298 * @brief Shutdown network connection 00299 * @param[in] context Pointer to the MQTT client context 00300 * @return Error code 00301 **/ 00302 00303 error_t mqttClientShutdownConnection(MqttClientContext *context) 00304 { 00305 error_t error; 00306 00307 //TCP transport protocol? 00308 if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP) 00309 { 00310 //Set timeout 00311 error = socketSetTimeout(context->socket, context->settings.timeout); 00312 00313 //Check status code 00314 if(!error) 00315 { 00316 //Shutdown TCP connection 00317 error = socketShutdown(context->socket, SOCKET_SD_BOTH); 00318 } 00319 } 00320 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED) 00321 //TLS transport protocol? 00322 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS) 00323 { 00324 //Set timeout 00325 error = socketSetTimeout(context->socket, context->settings.timeout); 00326 00327 //Check status code 00328 if(!error) 00329 { 00330 //Shutdown SSL/TLS session 00331 error = tlsShutdown(context->tlsContext); 00332 } 00333 00334 //Check status code 00335 if(!error) 00336 { 00337 //Shutdown TCP connection 00338 error = socketShutdown(context->socket, SOCKET_SD_BOTH); 00339 } 00340 } 00341 #endif 00342 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED) 00343 //WebSocket transport protocol? 00344 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS || 00345 context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS) 00346 { 00347 //Set timeout 00348 error = webSocketSetTimeout(context->webSocket, context->settings.timeout); 00349 00350 //Check status code 00351 if(!error) 00352 { 00353 //Connect to the MQTT server using WebSocket 00354 error = webSocketShutdown(context->webSocket); 00355 } 00356 } 00357 #endif 00358 //Unknown transport protocol? 00359 else 00360 { 00361 //Report an error 00362 error = ERROR_INVALID_PROTOCOL; 00363 } 00364 00365 //Return status code 00366 return error; 00367 } 00368 00369 00370 /** 00371 * @brief Close network connection 00372 * @param[in] context Pointer to the MQTT client context 00373 **/ 00374 00375 void mqttClientCloseConnection(MqttClientContext *context) 00376 { 00377 //TCP transport protocol? 00378 if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP) 00379 { 00380 //Close TCP connection 00381 if(context->socket != NULL) 00382 { 00383 socketClose(context->socket); 00384 context->socket = NULL; 00385 } 00386 } 00387 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED) 00388 //TLS transport protocol? 00389 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS) 00390 { 00391 //Release SSL context 00392 if(context->tlsContext != NULL) 00393 { 00394 tlsFree(context->tlsContext); 00395 context->tlsContext = NULL; 00396 } 00397 00398 //Close TCP connection 00399 if(context->socket != NULL) 00400 { 00401 socketClose(context->socket); 00402 context->socket = NULL; 00403 } 00404 } 00405 #endif 00406 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED) 00407 //WebSocket transport protocol? 00408 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS || 00409 context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS) 00410 { 00411 //Close WebSocket connection 00412 if(context->webSocket != NULL) 00413 { 00414 webSocketClose(context->webSocket); 00415 context->webSocket = NULL; 00416 } 00417 } 00418 #endif 00419 } 00420 00421 00422 /** 00423 * @brief Send data using the relevant transport protocol 00424 * @param[in] context Pointer to the MQTT client context 00425 * @param[in] data Pointer to a buffer containing the data to be transmitted 00426 * @param[in] length Number of bytes to be transmitted 00427 * @param[out] written Actual number of bytes written (optional parameter) 00428 * @param[in] flags Set of flags that influences the behavior of this function 00429 * @return Error code 00430 **/ 00431 00432 error_t mqttClientSendData(MqttClientContext *context, 00433 const void *data, size_t length, size_t *written, uint_t flags) 00434 { 00435 error_t error; 00436 00437 //TCP transport protocol? 00438 if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP) 00439 { 00440 //Set timeout 00441 error = socketSetTimeout(context->socket, context->settings.timeout); 00442 00443 //Check status code 00444 if(!error) 00445 { 00446 //Transmit data 00447 error = socketSend(context->socket, data, length, written, flags); 00448 } 00449 } 00450 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED) 00451 //TLS transport protocol? 00452 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS) 00453 { 00454 //Set timeout 00455 error = socketSetTimeout(context->socket, context->settings.timeout); 00456 00457 //Check status code 00458 if(!error) 00459 { 00460 //Transmit data 00461 error = tlsWrite(context->tlsContext, data, length, written, flags); 00462 } 00463 } 00464 #endif 00465 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED) 00466 //WebSocket transport protocol? 00467 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS || 00468 context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS) 00469 { 00470 //Set timeout 00471 error = webSocketSetTimeout(context->webSocket, context->settings.timeout); 00472 00473 //Check status code 00474 if(!error) 00475 { 00476 //MQTT control packets must be sent in WebSocket binary data frames 00477 error = webSocketSend(context->webSocket, data, length, 00478 WS_FRAME_TYPE_BINARY, written); 00479 } 00480 } 00481 #endif 00482 //Unknown transport protocol? 00483 else 00484 { 00485 //Report an error 00486 error = ERROR_INVALID_PROTOCOL; 00487 } 00488 00489 //Return status code 00490 return error; 00491 } 00492 00493 00494 /** 00495 * @brief Receive data using the relevant transport protocol 00496 * @param[in] context Pointer to the MQTT client context 00497 * @param[out] data Buffer into which received data will be placed 00498 * @param[in] size Maximum number of bytes that can be received 00499 * @param[out] received Number of bytes that have been received 00500 * @param[in] flags Set of flags that influences the behavior of this function 00501 * @return Error code 00502 **/ 00503 00504 error_t mqttClientReceiveData(MqttClientContext *context, 00505 void *data, size_t size, size_t *received, uint_t flags) 00506 { 00507 error_t error; 00508 00509 //No data has been read yet 00510 *received = 0; 00511 00512 //TCP transport protocol? 00513 if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP) 00514 { 00515 //Set timeout 00516 error = socketSetTimeout(context->socket, context->settings.timeout); 00517 00518 //Check status code 00519 if(!error) 00520 { 00521 //Receive data 00522 error = socketReceive(context->socket, data, size, received, flags); 00523 } 00524 } 00525 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED) 00526 //TLS transport protocol? 00527 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS) 00528 { 00529 //Set timeout 00530 error = socketSetTimeout(context->socket, context->settings.timeout); 00531 00532 //Check status code 00533 if(!error) 00534 { 00535 //Receive data 00536 error = tlsRead(context->tlsContext, data, size, received, flags); 00537 } 00538 } 00539 #endif 00540 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED) 00541 //WebSocket transport protocol? 00542 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS || 00543 context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS) 00544 { 00545 WebSocketFrameType type; 00546 00547 //Set timeout 00548 error = webSocketSetTimeout(context->webSocket, context->settings.timeout); 00549 00550 //Check status code 00551 if(!error) 00552 { 00553 //Receive data 00554 error = webSocketReceive(context->webSocket, data, size, &type, received); 00555 } 00556 00557 //Check status code 00558 if(!error) 00559 { 00560 //MQTT control packets must be sent in WebSocket binary data frames. If 00561 //any other type of data frame is received the recipient must close the 00562 //network connection 00563 if(type != WS_FRAME_TYPE_BINARY && type != WS_FRAME_TYPE_CONTINUATION) 00564 error = ERROR_INVALID_TYPE; 00565 } 00566 } 00567 #endif 00568 //Unknown transport protocol? 00569 else 00570 { 00571 //Report an error 00572 error = ERROR_INVALID_PROTOCOL; 00573 } 00574 00575 //Return status code 00576 return error; 00577 } 00578 00579 00580 /** 00581 * @brief Wait for incoming data 00582 * @param[in] context Pointer to the MQTT client context 00583 * @param[in] timeout Maximum time to wait before returning 00584 * @return Error code 00585 **/ 00586 00587 error_t mqttClientWaitForData(MqttClientContext *context, systime_t timeout) 00588 { 00589 uint_t event; 00590 00591 //TCP transport protocol? 00592 if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP) 00593 { 00594 //Get exclusive access 00595 osAcquireMutex(&netMutex); 00596 //Wait for some data to be available for reading 00597 event = tcpWaitForEvents(context->socket, SOCKET_EVENT_RX_READY, timeout); 00598 //Release exclusive access 00599 osReleaseMutex(&netMutex); 00600 } 00601 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED) 00602 //TLS transport protocol? 00603 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS) 00604 { 00605 //Sanity check 00606 if(context->tlsContext == NULL) 00607 return ERROR_FAILURE; 00608 00609 //Check whether some data is pending in the receive buffer 00610 if(context->tlsContext->rxBufferLen > 0) 00611 { 00612 //No need to poll the underlying socket for incoming traffic... 00613 event = SOCKET_EVENT_RX_READY; 00614 } 00615 else 00616 { 00617 //Get exclusive access 00618 osAcquireMutex(&netMutex); 00619 //Wait for some data to be available for reading 00620 event = tcpWaitForEvents(context->socket, SOCKET_EVENT_RX_READY, timeout); 00621 //Release exclusive access 00622 osReleaseMutex(&netMutex); 00623 } 00624 } 00625 #endif 00626 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED) 00627 //WebSocket transport protocol? 00628 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS) 00629 { 00630 //Sanity check 00631 if(context->webSocket == NULL) 00632 return ERROR_FAILURE; 00633 00634 //Get exclusive access 00635 osAcquireMutex(&netMutex); 00636 //Wait for some data to be available for reading 00637 event = tcpWaitForEvents(context->webSocket->socket, SOCKET_EVENT_RX_READY, timeout); 00638 //Release exclusive access 00639 osReleaseMutex(&netMutex); 00640 } 00641 #endif 00642 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED && WEB_SOCKET_TLS_SUPPORT) 00643 //Secure WebSocket transport protocol? 00644 else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS) 00645 { 00646 //Sanity check 00647 if(context->webSocket == NULL || context->webSocket->tlsContext == NULL) 00648 return ERROR_FAILURE; 00649 00650 //Check whether some data is pending in the receive buffer 00651 if(context->webSocket->tlsContext->rxBufferLen > 0) 00652 { 00653 //No need to poll the underlying socket for incoming traffic... 00654 event = SOCKET_EVENT_RX_READY; 00655 } 00656 else 00657 { 00658 //Get exclusive access 00659 osAcquireMutex(&netMutex); 00660 //Wait for some data to be available for reading 00661 event = tcpWaitForEvents(context->webSocket->socket, SOCKET_EVENT_RX_READY, timeout); 00662 //Release exclusive access 00663 osReleaseMutex(&netMutex); 00664 } 00665 } 00666 #endif 00667 //Unknown transport protocol? 00668 else 00669 { 00670 //Report an error 00671 return ERROR_INVALID_PROTOCOL; 00672 } 00673 00674 //Check whether some data is available for reading 00675 if(event == SOCKET_EVENT_RX_READY) 00676 return NO_ERROR; 00677 else 00678 return ERROR_TIMEOUT; 00679 } 00680 00681 #endif 00682
Generated on Tue Jul 12 2022 17:10:15 by
