Azure IoT / iothub_client

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp f767zi_mqtt ... more

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers iothubtransport.c Source File

iothubtransport.c

00001 // Copyright (c) Microsoft. All rights reserved.
00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
00003 
00004 #include <stdlib.h>
00005 #include <signal.h>
00006 #include <stddef.h>
00007 #include "azure_c_shared_utility/gballoc.h"
00008 #include "azure_c_shared_utility/crt_abstractions.h"
00009 #include "internal/iothubtransport.h"
00010 #include "iothub_client_core.h"
00011 #include "internal/iothub_client_private.h"
00012 #include "azure_c_shared_utility/threadapi.h"
00013 #include "azure_c_shared_utility/lock.h"
00014 #include "azure_c_shared_utility/xlogging.h"
00015 #include "azure_c_shared_utility/vector.h"
00016 
00017 #include "internal/iothubtransport.h"
00018 #include "internal/iothub_client_private.h"
00019 #include "iothub_transport_ll.h"
00020 #include "iothub_client_core.h"
00021 
00022 typedef struct TRANSPORT_HANDLE_DATA_TAG
00023 {
00024     TRANSPORT_LL_HANDLE transportLLHandle;
00025     THREAD_HANDLE workerThreadHandle;
00026     LOCK_HANDLE lockHandle;
00027     sig_atomic_t stopThread;
00028     TRANSPORT_PROVIDER_FIELDS;
00029     VECTOR_HANDLE clients;
00030     LOCK_HANDLE clientsLockHandle;
00031     IOTHUB_CLIENT_MULTIPLEXED_DO_WORK clientDoWork;
00032 } TRANSPORT_HANDLE_DATA;
00033 
00034 /* Used for Unit test */
00035 const size_t IoTHubTransport_ThreadTerminationOffset = offsetof(TRANSPORT_HANDLE_DATA, stopThread);
00036 
00037 TRANSPORT_HANDLE IoTHubTransport_Create(IOTHUB_CLIENT_TRANSPORT_PROVIDER protocol, const char* iotHubName, const char* iotHubSuffix)
00038 {
00039     TRANSPORT_HANDLE_DATA *result;
00040 
00041     if (protocol == NULL || iotHubName == NULL || iotHubSuffix == NULL)
00042     {
00043         /*Codes_SRS_IOTHUBTRANSPORT_17_002: [ If protocol is NULL, this function shall return NULL. ]*/
00044         /*Codes_SRS_IOTHUBTRANSPORT_17_003: [ If iotHubName is NULL, this function shall return NULL. ]*/
00045         /*Codes_SRS_IOTHUBTRANSPORT_17_004: [ If iotHubSuffix is NULL, this function shall return NULL. ]*/
00046         LogError("Invalid NULL argument, protocol [%p], name [%p], suffix [%p].", protocol, iotHubName, iotHubSuffix);
00047         result = NULL;
00048     }
00049     else
00050     {
00051         /*Codes_SRS_IOTHUBTRANSPORT_17_032: [ IoTHubTransport_Create shall allocate memory for the transport data. ]*/
00052         result = (TRANSPORT_HANDLE_DATA*)malloc(sizeof(TRANSPORT_HANDLE_DATA));
00053         if (result == NULL)
00054         {
00055             /*Codes_SRS_IOTHUBTRANSPORT_17_040: [ If memory allocation fails, IoTHubTransport_Create shall return NULL. ]*/
00056             LogError("Transport handle was not allocated.");
00057         }
00058         else
00059         {
00060             TRANSPORT_PROVIDER * transportProtocol = (TRANSPORT_PROVIDER*)(protocol());
00061             IOTHUB_CLIENT_CONFIG upperConfig;
00062             upperConfig.deviceId = NULL;
00063             upperConfig.deviceKey = NULL;
00064             upperConfig.iotHubName = iotHubName;
00065             upperConfig.iotHubSuffix = iotHubSuffix;
00066             upperConfig.protocol = protocol;
00067             upperConfig.protocolGatewayHostName = NULL;
00068 
00069             IOTHUBTRANSPORT_CONFIG transportLLConfig;
00070             memset(&transportLLConfig, 0, sizeof(IOTHUBTRANSPORT_CONFIG));
00071             transportLLConfig.upperConfig = &upperConfig;
00072             transportLLConfig.waitingToSend = NULL;
00073 
00074             /*Codes_SRS_IOTHUBTRANSPORT_17_005: [ IoTHubTransport_Create shall create the lower layer transport by calling the protocol's IoTHubTransport_Create function. ]*/
00075             result->transportLLHandle = transportProtocol->IoTHubTransport_Create(&transportLLConfig);
00076             if (result->transportLLHandle == NULL)
00077             {
00078                 /*Codes_SRS_IOTHUBTRANSPORT_17_006: [ If the creation of the transport fails, IoTHubTransport_Create shall return NULL. ]*/
00079                 LogError("Lower Layer transport not created.");
00080                 free(result);
00081                 result = NULL;
00082             }
00083             else
00084             {
00085                 /*Codes_SRS_IOTHUBTRANSPORT_17_007: [ IoTHubTransport_Create shall create the transport lock by Calling Lock_Init. ]*/
00086                 result->lockHandle = Lock_Init();
00087                 if (result->lockHandle == NULL)
00088                 {
00089                     /*Codes_SRS_IOTHUBTRANSPORT_17_008: [ If the lock creation fails, IoTHubTransport_Create shall return NULL. ]*/
00090                     LogError("transport Lock not created.");
00091                     transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
00092                     free(result);
00093                     result = NULL;
00094                 }
00095                 else if ((result->clientsLockHandle = Lock_Init()) == NULL)
00096                 {
00097                     LogError("clients Lock not created.");
00098                     Lock_Deinit(result->lockHandle);
00099                     transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
00100                     free(result);
00101                     result = NULL;
00102                 }
00103                 else
00104                 {
00105                     /*Codes_SRS_IOTHUBTRANSPORT_17_038: [ IoTHubTransport_Create shall call VECTOR_Create to make a list of IOTHUB_CLIENT_CORE_HANDLE using this transport. ]*/
00106                     result->clients = VECTOR_create(sizeof(IOTHUB_CLIENT_CORE_HANDLE));
00107                     if (result->clients == NULL)
00108                     {
00109                         /*Codes_SRS_IOTHUBTRANSPORT_17_039: [ If the Vector creation fails, IoTHubTransport_Create shall return NULL. ]*/
00110                         /*Codes_SRS_IOTHUBTRANSPORT_17_009: [ IoTHubTransport_Create shall clean up any resources it creates if the function does not succeed. ]*/
00111                         LogError("clients list not created.");
00112                         Lock_Deinit(result->clientsLockHandle);
00113                         Lock_Deinit(result->lockHandle);
00114                         transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
00115                         free(result);
00116                         result = NULL;
00117                     }
00118                     else
00119                     {
00120                         /*Codes_SRS_IOTHUBTRANSPORT_17_001: [ IoTHubTransport_Create shall return a non-NULL handle on success.]*/
00121                         result->stopThread = 1;
00122                         result->clientDoWork = NULL;
00123                         result->workerThreadHandle = NULL; /* create thread when work needs to be done */
00124                         result->IoTHubTransport_GetHostname = transportProtocol->IoTHubTransport_GetHostname;
00125                         result->IoTHubTransport_SetOption = transportProtocol->IoTHubTransport_SetOption;
00126                         result->IoTHubTransport_Create = transportProtocol->IoTHubTransport_Create;
00127                         result->IoTHubTransport_Destroy = transportProtocol->IoTHubTransport_Destroy;
00128                         result->IoTHubTransport_Register = transportProtocol->IoTHubTransport_Register;
00129                         result->IoTHubTransport_Unregister = transportProtocol->IoTHubTransport_Unregister;
00130                         result->IoTHubTransport_Subscribe = transportProtocol->IoTHubTransport_Subscribe;
00131                         result->IoTHubTransport_Unsubscribe = transportProtocol->IoTHubTransport_Unsubscribe;
00132                         result->IoTHubTransport_DoWork = transportProtocol->IoTHubTransport_DoWork;
00133                         result->IoTHubTransport_SetRetryPolicy = transportProtocol->IoTHubTransport_SetRetryPolicy;
00134                         result->IoTHubTransport_GetSendStatus = transportProtocol->IoTHubTransport_GetSendStatus;
00135                     }
00136                 }
00137             }
00138         }
00139     }
00140 
00141     return result;
00142 }
00143 
00144 static void multiplexed_client_do_work(TRANSPORT_HANDLE_DATA* transportData)
00145 {
00146     if (Lock(transportData->clientsLockHandle) != LOCK_OK)
00147     {
00148         LogError("failed to lock for multiplexed_client_do_work");
00149     }
00150     else
00151     {
00152         size_t numberOfClients;
00153         size_t iterator;
00154 
00155         numberOfClients = VECTOR_size(transportData->clients);
00156         for (iterator = 0; iterator < numberOfClients; iterator++)
00157         {
00158             IOTHUB_CLIENT_CORE_HANDLE* clientHandle = (IOTHUB_CLIENT_CORE_HANDLE*)VECTOR_element(transportData->clients, iterator);
00159 
00160             if (clientHandle != NULL)
00161             {
00162                 transportData->clientDoWork(*clientHandle);
00163             }
00164         }
00165 
00166         if (Unlock(transportData->clientsLockHandle) != LOCK_OK)
00167         {
00168             LogError("failed to unlock on multiplexed_client_do_work");
00169         }
00170     }
00171 }
00172 
00173 static int transport_worker_thread(void* threadArgument)
00174 {
00175     TRANSPORT_HANDLE_DATA* transportData = (TRANSPORT_HANDLE_DATA*)threadArgument;
00176 
00177     while (1)
00178     {
00179         /*Codes_SRS_IOTHUBTRANSPORT_17_030: [ All calls to lower layer transport DoWork shall be protected by the lock created in IoTHubTransport_Create. ]*/
00180         if (Lock(transportData->lockHandle) == LOCK_OK)
00181         {
00182             /*Codes_SRS_IOTHUBTRANSPORT_17_031: [ If acquiring the lock fails, lower layer transport DoWork shall not be called. ]*/
00183             if (transportData->stopThread)
00184             {
00185                 /*Codes_SRS_IOTHUBTRANSPORT_17_028: [ The thread shall exit when IoTHubTransport_EndWorkerThread has been called for each clientHandle which invoked IoTHubTransport_StartWorkerThread. ]*/
00186                 (void)Unlock(transportData->lockHandle);
00187                 break;
00188             }
00189             else
00190             {
00191                 (transportData->IoTHubTransport_DoWork)(transportData->transportLLHandle, NULL);
00192 
00193                 (void)Unlock(transportData->lockHandle);
00194             }
00195         }
00196 
00197         multiplexed_client_do_work(transportData);
00198 
00199         /*Codes_SRS_IOTHUBTRANSPORT_17_029: [ The thread shall call lower layer transport DoWork every 1 ms. ]*/
00200         ThreadAPI_Sleep(1);
00201     }
00202 
00203     ThreadAPI_Exit(0);
00204     return 0;
00205 }
00206 
00207 static bool find_by_handle(const void* element, const void* value)
00208 {
00209     /* data stored at element is device handle */
00210     const IOTHUB_CLIENT_CORE_HANDLE * guess = (const IOTHUB_CLIENT_CORE_HANDLE *)element;
00211     const IOTHUB_CLIENT_CORE_HANDLE match = (const IOTHUB_CLIENT_CORE_HANDLE)value;
00212     return (*guess == match);
00213 }
00214 
00215 static IOTHUB_CLIENT_RESULT start_worker_if_needed(TRANSPORT_HANDLE_DATA * transportData, IOTHUB_CLIENT_CORE_HANDLE clientHandle)
00216 {
00217     IOTHUB_CLIENT_RESULT result;
00218     if (transportData->workerThreadHandle == NULL)
00219     {
00220         /*Codes_SRS_IOTHUBTRANSPORT_17_018: [ If the worker thread does not exist, IoTHubTransport_StartWorkerThread shall start the thread using ThreadAPI_Create. ]*/
00221         transportData->stopThread = 0;
00222         if (ThreadAPI_Create(&transportData->workerThreadHandle, transport_worker_thread, transportData) != THREADAPI_OK)
00223         {
00224             transportData->workerThreadHandle = NULL;
00225         }
00226     }
00227     if (transportData->workerThreadHandle != NULL)
00228     {
00229         if (Lock(transportData->clientsLockHandle) != LOCK_OK)
00230         {
00231             LogError("failed to lock for start_worker_if_needed");
00232             result = IOTHUB_CLIENT_ERROR;
00233         }
00234         else
00235         {
00236             /*Codes_SRS_IOTHUBTRANSPORT_17_020: [ IoTHubTransport_StartWorkerThread shall search for IoTHubClient clientHandle in the list of IoTHubClient handles. ]*/
00237             bool addToList = ((VECTOR_size(transportData->clients) == 0) || (VECTOR_find_if(transportData->clients, find_by_handle, clientHandle) == NULL));
00238             if (addToList)
00239             {
00240                 /*Codes_SRS_IOTHUBTRANSPORT_17_021: [ If handle is not found, then clientHandle shall be added to the list. ]*/
00241                 if (VECTOR_push_back(transportData->clients, &clientHandle, 1) != 0)
00242                 {
00243                     LogError("Failed adding device to list (VECTOR_push_back failed)");
00244                     /*Codes_SRS_IOTHUBTRANSPORT_17_042: [ If Adding to the client list fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. ]*/
00245                     result = IOTHUB_CLIENT_ERROR;
00246                 }
00247                 else
00248                 {
00249                     result = IOTHUB_CLIENT_OK;
00250                 }
00251             }
00252             else
00253             {
00254                 result = IOTHUB_CLIENT_OK;
00255             }
00256 
00257             if (Unlock(transportData->clientsLockHandle) != LOCK_OK)
00258             {
00259                 LogError("failed to unlock on start_worker_if_needed");
00260             }
00261         }
00262     }
00263     else
00264     {
00265         result = IOTHUB_CLIENT_ERROR;
00266     }
00267     return result;
00268 }
00269 
00270 static void stop_worker_thread(TRANSPORT_HANDLE_DATA * transportData)
00271 {
00272     /*Codes_SRS_IOTHUBTRANSPORT_17_043: [** IoTHubTransport_SignalEndWorkerThread shall signal the worker thread to end.*/
00273     transportData->stopThread = 1;
00274 }
00275 
00276 static void wait_worker_thread(TRANSPORT_HANDLE_DATA * transportData)
00277 {
00278     if (transportData->workerThreadHandle != NULL)
00279     {
00280         int res;
00281         /*Codes_SRS_IOTHUBTRANSPORT_17_027: [ If handle list is empty, IoTHubTransport_EndWorkerThread shall be joined. ]*/
00282         if (ThreadAPI_Join(transportData->workerThreadHandle, &res) != THREADAPI_OK)
00283         {
00284             LogError("ThreadAPI_Join failed");
00285         }
00286         else
00287         {
00288             transportData->workerThreadHandle = NULL;
00289         }
00290     }
00291 }
00292 
00293 static bool signal_end_worker_thread(TRANSPORT_HANDLE_DATA * transportData, IOTHUB_CLIENT_CORE_HANDLE clientHandle)
00294 {
00295     bool okToJoin;
00296 
00297     if (Lock(transportData->clientsLockHandle) != LOCK_OK)
00298     {
00299         LogError("failed to lock for signal_end_worker_thread");
00300         okToJoin = false;
00301     }
00302     else
00303     {
00304         void* element = VECTOR_find_if(transportData->clients, find_by_handle, clientHandle);
00305         if (element != NULL)
00306         {
00307             /*Codes_SRS_IOTHUBTRANSPORT_17_026: [ IoTHubTransport_EndWorkerThread shall remove clientHandlehandle from handle list. ]*/
00308             VECTOR_erase(transportData->clients, element, 1);
00309         }
00310         /*Codes_SRS_IOTHUBTRANSPORT_17_025: [ If the worker thread does not exist, then IoTHubTransport_EndWorkerThread shall return. ]*/
00311         if (transportData->workerThreadHandle != NULL)
00312         {
00313             if (VECTOR_size(transportData->clients) == 0)
00314             {
00315                 stop_worker_thread(transportData);
00316                 okToJoin = true;
00317             }
00318             else
00319             {
00320                 okToJoin = false;
00321             }
00322         }
00323         else
00324         {
00325             okToJoin = false;
00326         }
00327 
00328         if (Unlock(transportData->clientsLockHandle) != LOCK_OK)
00329         {
00330             LogError("failed to unlock on signal_end_worker_thread");
00331         }
00332     }
00333     return okToJoin;
00334 }
00335 
00336 void IoTHubTransport_Destroy(TRANSPORT_HANDLE transportHandle)
00337 {
00338     /*Codes_SRS_IOTHUBTRANSPORT_17_011: [ IoTHubTransport_Destroy shall do nothing if transportHandle is NULL. ]*/
00339     if (transportHandle != NULL)
00340     {
00341         TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
00342         /*Codes_SRS_IOTHUBTRANSPORT_17_033: [ IoTHubTransport_Destroy shall lock the transport lock. ]*/
00343         if (Lock(transportData->lockHandle) != LOCK_OK)
00344         {
00345             LogError("Unable to lock - will still attempt to end thread without thread safety");
00346             stop_worker_thread(transportData);
00347         }
00348         else
00349         {
00350             stop_worker_thread(transportData);
00351             (void)Unlock(transportData->lockHandle);
00352         }
00353         wait_worker_thread(transportData);
00354         /*Codes_SRS_IOTHUBTRANSPORT_17_010: [ IoTHubTransport_Destroy shall free all resources. ]*/
00355         Lock_Deinit(transportData->lockHandle);
00356         (transportData->IoTHubTransport_Destroy)(transportData->transportLLHandle);
00357         VECTOR_destroy(transportData->clients);
00358         Lock_Deinit(transportData->clientsLockHandle);
00359         free(transportHandle);
00360     }
00361 }
00362 
00363 LOCK_HANDLE IoTHubTransport_GetLock(TRANSPORT_HANDLE transportHandle)
00364 {
00365     LOCK_HANDLE lock;
00366     if (transportHandle == NULL)
00367     {
00368         /*Codes_SRS_IOTHUBTRANSPORT_17_013: [ If transportHandle is NULL, IoTHubTransport_GetLock shall return NULL. ]*/
00369         lock = NULL;
00370     }
00371     else
00372     {
00373         /*Codes_SRS_IOTHUBTRANSPORT_17_012: [ IoTHubTransport_GetLock shall return a handle to the transport lock. ]*/
00374         TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
00375         lock = transportData->lockHandle;
00376     }
00377     return lock;
00378 }
00379 
00380 TRANSPORT_LL_HANDLE IoTHubTransport_GetLLTransport(TRANSPORT_HANDLE transportHandle)
00381 {
00382     TRANSPORT_LL_HANDLE llTransport;
00383     if (transportHandle == NULL)
00384     {
00385         /*Codes_SRS_IOTHUBTRANSPORT_17_015: [ If transportHandle is NULL, IoTHubTransport_GetLLTransport shall return NULL. ]*/
00386         llTransport = NULL;
00387     }
00388     else
00389     {
00390         /*Codes_SRS_IOTHUBTRANSPORT_17_014: [ IoTHubTransport_GetLLTransport shall return a handle to the lower layer transport. ]*/
00391         TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
00392         llTransport = transportData->transportLLHandle;
00393     }
00394     return llTransport;
00395 }
00396 
00397 IOTHUB_CLIENT_RESULT IoTHubTransport_StartWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_CORE_HANDLE clientHandle, IOTHUB_CLIENT_MULTIPLEXED_DO_WORK muxDoWork)
00398 {
00399     IOTHUB_CLIENT_RESULT result;
00400     if (transportHandle == NULL || clientHandle == NULL)
00401     {
00402         /*Codes_SRS_IOTHUBTRANSPORT_17_016: [ If transportHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
00403         /*Codes_SRS_IOTHUBTRANSPORT_17_017: [ If clientHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
00404         result = IOTHUB_CLIENT_INVALID_ARG;
00405     }
00406     else
00407     {
00408         TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
00409 
00410         if (transportData->clientDoWork == NULL)
00411         {
00412             transportData->clientDoWork = muxDoWork;
00413         }
00414 
00415         if ((result = start_worker_if_needed(transportData, clientHandle)) != IOTHUB_CLIENT_OK)
00416         {
00417             /*Codes_SRS_IOTHUBTRANSPORT_17_019: [ If thread creation fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. */
00418             LogError("Unable to start thread safely");
00419         }
00420         else
00421         {
00422             /*Codes_SRS_IOTHUBTRANSPORT_17_022: [ Upon success, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_OK. ]*/
00423             result = IOTHUB_CLIENT_OK;
00424         }
00425     }
00426     return result;
00427 }
00428 
00429 bool IoTHubTransport_SignalEndWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_CORE_HANDLE clientHandle)
00430 {
00431     bool okToJoin;
00432     /*Codes_SRS_IOTHUBTRANSPORT_17_023: [ If transportHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/
00433     /*Codes_SRS_IOTHUBTRANSPORT_17_024: [ If clientHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/
00434     if (!(transportHandle == NULL || clientHandle == NULL))
00435     {
00436         TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
00437         okToJoin = signal_end_worker_thread(transportData, clientHandle);
00438     }
00439     else
00440     {
00441         okToJoin = false;
00442     }
00443     return okToJoin;
00444 }
00445 
00446 void IoTHubTransport_JoinWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_CORE_HANDLE clientHandle)
00447 {
00448     /*Codes_SRS_IOTHUBTRANSPORT_17_044: [ If transportHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/
00449     /*Codes_SRS_IOTHUBTRANSPORT_17_045: [ If clientHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/
00450     if (!(transportHandle == NULL || clientHandle == NULL))
00451     {
00452         TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
00453         /*Codes_SRS_IOTHUBTRANSPORT_17_027: [ The worker thread shall be joined. ]*/
00454         wait_worker_thread(transportData);
00455     }
00456 }