Dependents:   Nucleo

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers mqtt_client_transport.c Source File

mqtt_client_transport.c

Go to the documentation of this file.
00001 /**
00002  * @file mqtt_client_transport.c
00003  * @brief Transport protocol abstraction layer
00004  *
00005  * @section License
00006  *
00007  * Copyright (C) 2010-2017 Oryx Embedded SARL. All rights reserved.
00008  *
00009  * This file is part of CycloneSSL Open.
00010  *
00011  * This program is free software; you can redistribute it and/or
00012  * modify it under the terms of the GNU General Public License
00013  * as published by the Free Software Foundation; either version 2
00014  * of the License, or (at your option) any later version.
00015  *
00016  * This program is distributed in the hope that it will be useful,
00017  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  * GNU General Public License for more details.
00020  *
00021  * You should have received a copy of the GNU General Public License
00022  * along with this program; if not, write to the Free Software Foundation,
00023  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
00024  *
00025  * @author Oryx Embedded SARL (www.oryx-embedded.com)
00026  * @version 1.7.6
00027  **/
00028 
00029 //Switch to the appropriate trace level
00030 #define TRACE_LEVEL MQTT_TRACE_LEVEL
00031 
00032 //Dependencies
00033 #include "core/net.h"
00034 #include "core/tcp_misc.h"
00035 #include "mqtt/mqtt_client.h"
00036 #include "mqtt/mqtt_client_packet.h"
00037 #include "mqtt/mqtt_client_transport.h"
00038 #include "mqtt/mqtt_client_misc.h"
00039 #include "debug.h"
00040 
00041 //Check TCP/IP stack configuration
00042 #if (MQTT_CLIENT_SUPPORT == ENABLED)
00043 
00044 
00045 /**
00046  * @brief Open network connection
00047  * @param[in] context Pointer to the MQTT client context
00048  * @return Error code
00049  **/
00050 
00051 error_t mqttClientOpenConnection(MqttClientContext *context)
00052 {
00053    error_t error;
00054 
00055    //TCP transport protocol?
00056    if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP)
00057    {
00058       //Open a TCP socket
00059       context->socket = socketOpen(SOCKET_TYPE_STREAM, SOCKET_IP_PROTO_TCP);
00060 
00061       //Valid socket handle?
00062       if(context->socket != NULL)
00063       {
00064          //Associate the socket with the relevant interface
00065          error = socketBindToInterface(context->socket, context->interface);
00066       }
00067       else
00068       {
00069          //Report an error
00070          error = ERROR_OPEN_FAILED;
00071       }
00072    }
00073 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
00074    //TLS transport protocol?
00075    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS)
00076    {
00077       //Open a TCP socket
00078       context->socket = socketOpen(SOCKET_TYPE_STREAM, SOCKET_IP_PROTO_TCP);
00079 
00080       //Valid socket handle?
00081       if(context->socket != NULL)
00082       {
00083          //Associate the socket with the relevant interface
00084          error = socketBindToInterface(context->socket, context->interface);
00085 
00086          //Check status code
00087          if(!error)
00088          {
00089             //Allocate SSL/TLS context
00090             context->tlsContext = tlsInit();
00091 
00092             //Valid SSL/TLS handle?
00093             if(context->tlsContext != NULL)
00094             {
00095                //Select client operation mode
00096                error = tlsSetConnectionEnd(context->tlsContext,
00097                   TLS_CONNECTION_END_CLIENT);
00098 
00099                //Check status code
00100                if(!error)
00101                {
00102                   //Bind TLS to the relevant socket
00103                   error = tlsSetSocket(context->tlsContext, context->socket);
00104                }
00105 
00106                //Check status code
00107                if(!error)
00108                {
00109                   //Restore SSL/TLS session, if any
00110                   if(context->tlsSession.idLength > 0)
00111                   {
00112                      //Restore SSL/TLS session
00113                      error = tlsRestoreSession(context->tlsContext,
00114                         &context->tlsSession);
00115                   }
00116                }
00117 
00118                //Check status code
00119                if(!error)
00120                {
00121                   //Invoke user-defined callback, if any
00122                   if(context->callbacks.tlsInitCallback != NULL)
00123                   {
00124                      //Perform SSL/TLS related initialization
00125                      error = context->callbacks.tlsInitCallback(context,
00126                         context->tlsContext);
00127                   }
00128                }
00129             }
00130             else
00131             {
00132                //Report an error
00133                error = ERROR_OPEN_FAILED;
00134             }
00135          }
00136       }
00137       else
00138       {
00139          //Report an error
00140          error = ERROR_OPEN_FAILED;
00141       }
00142    }
00143 #endif
00144 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
00145    //WebSocket transport protocol?
00146    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS)
00147    {
00148       //Open a WebSocket
00149       context->webSocket = webSocketOpen();
00150 
00151       //Valid WebSocket handle?
00152       if(context->webSocket != NULL)
00153       {
00154          //Associate the WebSocket with the relevant interface
00155          error = webSocketBindToInterface(context->webSocket,
00156             context->interface);
00157       }
00158       else
00159       {
00160          //Report an error
00161          error = ERROR_OPEN_FAILED;
00162       }
00163    }
00164    //Secure WebSocket transport protocol?
00165    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS)
00166    {
00167       //Open a WebSocket
00168       context->webSocket = webSocketOpen();
00169 
00170       //Valid WebSocket handle?
00171       if(context->webSocket != NULL)
00172       {
00173          //Associate the WebSocket with the relevant interface
00174          error = webSocketBindToInterface(context->webSocket,
00175             context->interface);
00176 
00177          //Check status code
00178          if(!error)
00179          {
00180             //Register SSL/TLS initialization callback
00181             error = webSocketRegisterTlsInitCallback(context->webSocket,
00182                (WebSocketTlsInitCallback) context->callbacks.tlsInitCallback);
00183          }
00184       }
00185       else
00186       {
00187          //Report an error
00188          error = ERROR_OPEN_FAILED;
00189       }
00190    }
00191 #endif
00192    //Unknown transport protocol?
00193    else
00194    {
00195       //Report an error
00196       error = ERROR_INVALID_PROTOCOL;
00197    }
00198 
00199    //Return status code
00200    return error;
00201 }
00202 
00203 
00204 /**
00205  * @brief Establish network connection
00206  * @param[in] context Pointer to the MQTT client context
00207  * @param[in] serverIpAddr IP address of the MQTT server to connect to
00208  * @param[in] serverPort TCP port number that will be used to establish the
00209  *   connection
00210  * @return Error code
00211  **/
00212 
00213 error_t mqttClientEstablishConnection(MqttClientContext *context,
00214    const IpAddr *serverIpAddr, uint16_t serverPort)
00215 {
00216    error_t error;
00217 
00218    //TCP transport protocol?
00219    if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP)
00220    {
00221       //Set timeout
00222       error = socketSetTimeout(context->socket, context->settings.timeout);
00223 
00224       //Check status code
00225       if(!error)
00226       {
00227          //Connect to the MQTT server using TCP
00228          error = socketConnect(context->socket, serverIpAddr, serverPort);
00229       }
00230    }
00231 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
00232    //TLS transport protocol?
00233    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS)
00234    {
00235       //Set timeout
00236       error = socketSetTimeout(context->socket, context->settings.timeout);
00237 
00238       //Check status code
00239       if(!error)
00240       {
00241          //Connect to the MQTT server using TCP
00242          error = socketConnect(context->socket, serverIpAddr, serverPort);
00243       }
00244 
00245       //Check status code
00246       if(!error)
00247       {
00248          //Establish a SSL/TLS session
00249          error = tlsConnect(context->tlsContext);
00250       }
00251    }
00252 #endif
00253 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
00254    //WebSocket transport protocol?
00255    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS ||
00256       context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS)
00257    {
00258       //Set timeout
00259       error = webSocketSetTimeout(context->webSocket, context->settings.timeout);
00260 
00261       //Check status code
00262       if(!error)
00263       {
00264          //Set the hostname of the remote server
00265          error = webSocketSetHost(context->webSocket, context->settings.host);
00266       }
00267 
00268       //Check status code
00269       if(!error)
00270       {
00271          //The client MUST include "mqtt" in the list of WebSocket
00272          //sub-protocols it offers
00273          error = webSocketSetSubProtocol(context->webSocket, "mqtt");
00274       }
00275 
00276       //Check status code
00277       if(!error)
00278       {
00279          //Connect to the MQTT server using WebSocket
00280          error = webSocketConnect(context->webSocket, serverIpAddr,
00281             serverPort, context->settings.uri);
00282       }
00283    }
00284 #endif
00285    //Unknown transport protocol?
00286    else
00287    {
00288       //Report an error
00289       error = ERROR_INVALID_PROTOCOL;
00290    }
00291 
00292    //Return status code
00293    return error;
00294 }
00295 
00296 
00297 /**
00298  * @brief Shutdown network connection
00299  * @param[in] context Pointer to the MQTT client context
00300  * @return Error code
00301  **/
00302 
00303 error_t mqttClientShutdownConnection(MqttClientContext *context)
00304 {
00305    error_t error;
00306 
00307    //TCP transport protocol?
00308    if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP)
00309    {
00310       //Set timeout
00311       error = socketSetTimeout(context->socket, context->settings.timeout);
00312 
00313       //Check status code
00314       if(!error)
00315       {
00316          //Shutdown TCP connection
00317          error = socketShutdown(context->socket, SOCKET_SD_BOTH);
00318       }
00319    }
00320 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
00321    //TLS transport protocol?
00322    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS)
00323    {
00324       //Set timeout
00325       error = socketSetTimeout(context->socket, context->settings.timeout);
00326 
00327       //Check status code
00328       if(!error)
00329       {
00330          //Shutdown SSL/TLS session
00331          error = tlsShutdown(context->tlsContext);
00332       }
00333 
00334       //Check status code
00335       if(!error)
00336       {
00337          //Shutdown TCP connection
00338          error = socketShutdown(context->socket, SOCKET_SD_BOTH);
00339       }
00340    }
00341 #endif
00342 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
00343    //WebSocket transport protocol?
00344    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS ||
00345       context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS)
00346    {
00347       //Set timeout
00348       error = webSocketSetTimeout(context->webSocket, context->settings.timeout);
00349 
00350       //Check status code
00351       if(!error)
00352       {
00353          //Connect to the MQTT server using WebSocket
00354          error = webSocketShutdown(context->webSocket);
00355       }
00356    }
00357 #endif
00358    //Unknown transport protocol?
00359    else
00360    {
00361       //Report an error
00362       error = ERROR_INVALID_PROTOCOL;
00363    }
00364 
00365    //Return status code
00366    return error;
00367 }
00368 
00369 
00370 /**
00371  * @brief Close network connection
00372  * @param[in] context Pointer to the MQTT client context
00373  **/
00374 
00375 void mqttClientCloseConnection(MqttClientContext *context)
00376 {
00377    //TCP transport protocol?
00378    if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP)
00379    {
00380       //Close TCP connection
00381       if(context->socket != NULL)
00382       {
00383          socketClose(context->socket);
00384          context->socket = NULL;
00385       }
00386    }
00387 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
00388    //TLS transport protocol?
00389    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS)
00390    {
00391       //Release SSL context
00392       if(context->tlsContext != NULL)
00393       {
00394          tlsFree(context->tlsContext);
00395          context->tlsContext = NULL;
00396       }
00397 
00398       //Close TCP connection
00399       if(context->socket != NULL)
00400       {
00401          socketClose(context->socket);
00402          context->socket = NULL;
00403       }
00404    }
00405 #endif
00406 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
00407    //WebSocket transport protocol?
00408    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS ||
00409       context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS)
00410    {
00411       //Close WebSocket connection
00412       if(context->webSocket != NULL)
00413       {
00414          webSocketClose(context->webSocket);
00415          context->webSocket = NULL;
00416       }
00417    }
00418 #endif
00419 }
00420 
00421 
00422 /**
00423  * @brief Send data using the relevant transport protocol
00424  * @param[in] context Pointer to the MQTT client context
00425  * @param[in] data Pointer to a buffer containing the data to be transmitted
00426  * @param[in] length Number of bytes to be transmitted
00427  * @param[out] written Actual number of bytes written (optional parameter)
00428  * @param[in] flags Set of flags that influences the behavior of this function
00429  * @return Error code
00430  **/
00431 
00432 error_t mqttClientSendData(MqttClientContext *context,
00433    const void *data, size_t length, size_t *written, uint_t flags)
00434 {
00435    error_t error;
00436 
00437    //TCP transport protocol?
00438    if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP)
00439    {
00440       //Set timeout
00441       error = socketSetTimeout(context->socket, context->settings.timeout);
00442 
00443       //Check status code
00444       if(!error)
00445       {
00446          //Transmit data
00447          error = socketSend(context->socket, data, length, written, flags);
00448       }
00449    }
00450 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
00451    //TLS transport protocol?
00452    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS)
00453    {
00454       //Set timeout
00455       error = socketSetTimeout(context->socket, context->settings.timeout);
00456 
00457       //Check status code
00458       if(!error)
00459       {
00460          //Transmit data
00461          error = tlsWrite(context->tlsContext, data, length, written, flags);
00462       }
00463    }
00464 #endif
00465 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
00466    //WebSocket transport protocol?
00467    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS ||
00468       context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS)
00469    {
00470       //Set timeout
00471       error = webSocketSetTimeout(context->webSocket, context->settings.timeout);
00472 
00473       //Check status code
00474       if(!error)
00475       {
00476          //MQTT control packets must be sent in WebSocket binary data frames
00477          error = webSocketSend(context->webSocket, data, length,
00478             WS_FRAME_TYPE_BINARY, written);
00479       }
00480    }
00481 #endif
00482    //Unknown transport protocol?
00483    else
00484    {
00485       //Report an error
00486       error = ERROR_INVALID_PROTOCOL;
00487    }
00488 
00489    //Return status code
00490    return error;
00491 }
00492 
00493 
00494 /**
00495  * @brief Receive data using the relevant transport protocol
00496  * @param[in] context Pointer to the MQTT client context
00497  * @param[out] data Buffer into which received data will be placed
00498  * @param[in] size Maximum number of bytes that can be received
00499  * @param[out] received Number of bytes that have been received
00500  * @param[in] flags Set of flags that influences the behavior of this function
00501  * @return Error code
00502  **/
00503 
00504 error_t mqttClientReceiveData(MqttClientContext *context,
00505    void *data, size_t size, size_t *received, uint_t flags)
00506 {
00507    error_t error;
00508 
00509    //No data has been read yet
00510    *received = 0;
00511 
00512    //TCP transport protocol?
00513    if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP)
00514    {
00515       //Set timeout
00516       error = socketSetTimeout(context->socket, context->settings.timeout);
00517 
00518       //Check status code
00519       if(!error)
00520       {
00521          //Receive data
00522          error = socketReceive(context->socket, data, size, received, flags);
00523       }
00524    }
00525 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
00526    //TLS transport protocol?
00527    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS)
00528    {
00529       //Set timeout
00530       error = socketSetTimeout(context->socket, context->settings.timeout);
00531 
00532       //Check status code
00533       if(!error)
00534       {
00535          //Receive data
00536          error = tlsRead(context->tlsContext, data, size, received, flags);
00537       }
00538    }
00539 #endif
00540 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
00541    //WebSocket transport protocol?
00542    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS ||
00543       context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS)
00544    {
00545       WebSocketFrameType type;
00546 
00547       //Set timeout
00548       error = webSocketSetTimeout(context->webSocket, context->settings.timeout);
00549 
00550       //Check status code
00551       if(!error)
00552       {
00553          //Receive data
00554          error = webSocketReceive(context->webSocket, data, size, &type, received);
00555       }
00556 
00557       //Check status code
00558       if(!error)
00559       {
00560          //MQTT control packets must be sent in WebSocket binary data frames. If
00561          //any other type of data frame is received the recipient must close the
00562          //network connection
00563          if(type != WS_FRAME_TYPE_BINARY && type != WS_FRAME_TYPE_CONTINUATION)
00564             error = ERROR_INVALID_TYPE;
00565       }
00566    }
00567 #endif
00568    //Unknown transport protocol?
00569    else
00570    {
00571       //Report an error
00572       error = ERROR_INVALID_PROTOCOL;
00573    }
00574 
00575    //Return status code
00576    return error;
00577 }
00578 
00579 
00580 /**
00581  * @brief Wait for incoming data
00582  * @param[in] context Pointer to the MQTT client context
00583  * @param[in] timeout Maximum time to wait before returning
00584  * @return Error code
00585  **/
00586 
00587 error_t mqttClientWaitForData(MqttClientContext *context, systime_t timeout)
00588 {
00589    uint_t event;
00590 
00591    //TCP transport protocol?
00592    if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TCP)
00593    {
00594       //Get exclusive access
00595       osAcquireMutex(&netMutex);
00596       //Wait for some data to be available for reading
00597       event = tcpWaitForEvents(context->socket, SOCKET_EVENT_RX_READY, timeout);
00598       //Release exclusive access
00599       osReleaseMutex(&netMutex);
00600    }
00601 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
00602    //TLS transport protocol?
00603    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_TLS)
00604    {
00605       //Sanity check
00606       if(context->tlsContext == NULL)
00607          return ERROR_FAILURE;
00608 
00609       //Check whether some data is pending in the receive buffer
00610       if(context->tlsContext->rxBufferLen > 0)
00611       {
00612          //No need to poll the underlying socket for incoming traffic...
00613          event = SOCKET_EVENT_RX_READY;
00614       }
00615       else
00616       {
00617          //Get exclusive access
00618          osAcquireMutex(&netMutex);
00619          //Wait for some data to be available for reading
00620          event = tcpWaitForEvents(context->socket, SOCKET_EVENT_RX_READY, timeout);
00621          //Release exclusive access
00622          osReleaseMutex(&netMutex);
00623       }
00624    }
00625 #endif
00626 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
00627    //WebSocket transport protocol?
00628    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WS)
00629    {
00630       //Sanity check
00631       if(context->webSocket == NULL)
00632          return ERROR_FAILURE;
00633 
00634       //Get exclusive access
00635       osAcquireMutex(&netMutex);
00636       //Wait for some data to be available for reading
00637       event = tcpWaitForEvents(context->webSocket->socket, SOCKET_EVENT_RX_READY, timeout);
00638       //Release exclusive access
00639       osReleaseMutex(&netMutex);
00640    }
00641 #endif
00642 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED && WEB_SOCKET_TLS_SUPPORT)
00643    //Secure WebSocket transport protocol?
00644    else if(context->settings.transportProtocol == MQTT_TRANSPORT_PROTOCOL_WSS)
00645    {
00646       //Sanity check
00647       if(context->webSocket == NULL || context->webSocket->tlsContext == NULL)
00648          return ERROR_FAILURE;
00649 
00650       //Check whether some data is pending in the receive buffer
00651       if(context->webSocket->tlsContext->rxBufferLen > 0)
00652       {
00653          //No need to poll the underlying socket for incoming traffic...
00654          event = SOCKET_EVENT_RX_READY;
00655       }
00656       else
00657       {
00658          //Get exclusive access
00659          osAcquireMutex(&netMutex);
00660          //Wait for some data to be available for reading
00661          event = tcpWaitForEvents(context->webSocket->socket, SOCKET_EVENT_RX_READY, timeout);
00662          //Release exclusive access
00663          osReleaseMutex(&netMutex);
00664       }
00665    }
00666 #endif
00667    //Unknown transport protocol?
00668    else
00669    {
00670       //Report an error
00671       return ERROR_INVALID_PROTOCOL;
00672    }
00673 
00674    //Check whether some data is available for reading
00675    if(event == SOCKET_EVENT_RX_READY)
00676       return NO_ERROR;
00677    else
00678       return ERROR_TIMEOUT;
00679 }
00680 
00681 #endif
00682