Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp f767zi_mqtt ... more
iothubtransport.c
00001 // Copyright (c) Microsoft. All rights reserved. 00002 // Licensed under the MIT license. See LICENSE file in the project root for full license information. 00003 00004 #include <stdlib.h> 00005 #include <signal.h> 00006 #include <stddef.h> 00007 #include "azure_c_shared_utility/gballoc.h" 00008 #include "azure_c_shared_utility/crt_abstractions.h" 00009 #include "internal/iothubtransport.h" 00010 #include "iothub_client_core.h" 00011 #include "internal/iothub_client_private.h" 00012 #include "azure_c_shared_utility/threadapi.h" 00013 #include "azure_c_shared_utility/lock.h" 00014 #include "azure_c_shared_utility/xlogging.h" 00015 #include "azure_c_shared_utility/vector.h" 00016 00017 #include "internal/iothubtransport.h" 00018 #include "internal/iothub_client_private.h" 00019 #include "iothub_transport_ll.h" 00020 #include "iothub_client_core.h" 00021 00022 typedef struct TRANSPORT_HANDLE_DATA_TAG 00023 { 00024 TRANSPORT_LL_HANDLE transportLLHandle; 00025 THREAD_HANDLE workerThreadHandle; 00026 LOCK_HANDLE lockHandle; 00027 sig_atomic_t stopThread; 00028 TRANSPORT_PROVIDER_FIELDS; 00029 VECTOR_HANDLE clients; 00030 LOCK_HANDLE clientsLockHandle; 00031 IOTHUB_CLIENT_MULTIPLEXED_DO_WORK clientDoWork; 00032 } TRANSPORT_HANDLE_DATA; 00033 00034 /* Used for Unit test */ 00035 const size_t IoTHubTransport_ThreadTerminationOffset = offsetof(TRANSPORT_HANDLE_DATA, stopThread); 00036 00037 TRANSPORT_HANDLE IoTHubTransport_Create(IOTHUB_CLIENT_TRANSPORT_PROVIDER protocol, const char* iotHubName, const char* iotHubSuffix) 00038 { 00039 TRANSPORT_HANDLE_DATA *result; 00040 00041 if (protocol == NULL || iotHubName == NULL || iotHubSuffix == NULL) 00042 { 00043 /*Codes_SRS_IOTHUBTRANSPORT_17_002: [ If protocol is NULL, this function shall return NULL. ]*/ 00044 /*Codes_SRS_IOTHUBTRANSPORT_17_003: [ If iotHubName is NULL, this function shall return NULL. ]*/ 00045 /*Codes_SRS_IOTHUBTRANSPORT_17_004: [ If iotHubSuffix is NULL, this function shall return NULL. 
]*/ 00046 LogError("Invalid NULL argument, protocol [%p], name [%p], suffix [%p].", protocol, iotHubName, iotHubSuffix); 00047 result = NULL; 00048 } 00049 else 00050 { 00051 /*Codes_SRS_IOTHUBTRANSPORT_17_032: [ IoTHubTransport_Create shall allocate memory for the transport data. ]*/ 00052 result = (TRANSPORT_HANDLE_DATA*)malloc(sizeof(TRANSPORT_HANDLE_DATA)); 00053 if (result == NULL) 00054 { 00055 /*Codes_SRS_IOTHUBTRANSPORT_17_040: [ If memory allocation fails, IoTHubTransport_Create shall return NULL. ]*/ 00056 LogError("Transport handle was not allocated."); 00057 } 00058 else 00059 { 00060 TRANSPORT_PROVIDER * transportProtocol = (TRANSPORT_PROVIDER*)(protocol()); 00061 IOTHUB_CLIENT_CONFIG upperConfig; 00062 upperConfig.deviceId = NULL; 00063 upperConfig.deviceKey = NULL; 00064 upperConfig.iotHubName = iotHubName; 00065 upperConfig.iotHubSuffix = iotHubSuffix; 00066 upperConfig.protocol = protocol; 00067 upperConfig.protocolGatewayHostName = NULL; 00068 00069 IOTHUBTRANSPORT_CONFIG transportLLConfig; 00070 memset(&transportLLConfig, 0, sizeof(IOTHUBTRANSPORT_CONFIG)); 00071 transportLLConfig.upperConfig = &upperConfig; 00072 transportLLConfig.waitingToSend = NULL; 00073 00074 /*Codes_SRS_IOTHUBTRANSPORT_17_005: [ IoTHubTransport_Create shall create the lower layer transport by calling the protocol's IoTHubTransport_Create function. ]*/ 00075 result->transportLLHandle = transportProtocol->IoTHubTransport_Create(&transportLLConfig); 00076 if (result->transportLLHandle == NULL) 00077 { 00078 /*Codes_SRS_IOTHUBTRANSPORT_17_006: [ If the creation of the transport fails, IoTHubTransport_Create shall return NULL. ]*/ 00079 LogError("Lower Layer transport not created."); 00080 free(result); 00081 result = NULL; 00082 } 00083 else 00084 { 00085 /*Codes_SRS_IOTHUBTRANSPORT_17_007: [ IoTHubTransport_Create shall create the transport lock by Calling Lock_Init. 
]*/ 00086 result->lockHandle = Lock_Init(); 00087 if (result->lockHandle == NULL) 00088 { 00089 /*Codes_SRS_IOTHUBTRANSPORT_17_008: [ If the lock creation fails, IoTHubTransport_Create shall return NULL. ]*/ 00090 LogError("transport Lock not created."); 00091 transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle); 00092 free(result); 00093 result = NULL; 00094 } 00095 else if ((result->clientsLockHandle = Lock_Init()) == NULL) 00096 { 00097 LogError("clients Lock not created."); 00098 Lock_Deinit(result->lockHandle); 00099 transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle); 00100 free(result); 00101 result = NULL; 00102 } 00103 else 00104 { 00105 /*Codes_SRS_IOTHUBTRANSPORT_17_038: [ IoTHubTransport_Create shall call VECTOR_Create to make a list of IOTHUB_CLIENT_CORE_HANDLE using this transport. ]*/ 00106 result->clients = VECTOR_create(sizeof(IOTHUB_CLIENT_CORE_HANDLE)); 00107 if (result->clients == NULL) 00108 { 00109 /*Codes_SRS_IOTHUBTRANSPORT_17_039: [ If the Vector creation fails, IoTHubTransport_Create shall return NULL. ]*/ 00110 /*Codes_SRS_IOTHUBTRANSPORT_17_009: [ IoTHubTransport_Create shall clean up any resources it creates if the function does not succeed. 
]*/ 00111 LogError("clients list not created."); 00112 Lock_Deinit(result->clientsLockHandle); 00113 Lock_Deinit(result->lockHandle); 00114 transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle); 00115 free(result); 00116 result = NULL; 00117 } 00118 else 00119 { 00120 /*Codes_SRS_IOTHUBTRANSPORT_17_001: [ IoTHubTransport_Create shall return a non-NULL handle on success.]*/ 00121 result->stopThread = 1; 00122 result->clientDoWork = NULL; 00123 result->workerThreadHandle = NULL; /* create thread when work needs to be done */ 00124 result->IoTHubTransport_GetHostname = transportProtocol->IoTHubTransport_GetHostname; 00125 result->IoTHubTransport_SetOption = transportProtocol->IoTHubTransport_SetOption; 00126 result->IoTHubTransport_Create = transportProtocol->IoTHubTransport_Create; 00127 result->IoTHubTransport_Destroy = transportProtocol->IoTHubTransport_Destroy; 00128 result->IoTHubTransport_Register = transportProtocol->IoTHubTransport_Register; 00129 result->IoTHubTransport_Unregister = transportProtocol->IoTHubTransport_Unregister; 00130 result->IoTHubTransport_Subscribe = transportProtocol->IoTHubTransport_Subscribe; 00131 result->IoTHubTransport_Unsubscribe = transportProtocol->IoTHubTransport_Unsubscribe; 00132 result->IoTHubTransport_DoWork = transportProtocol->IoTHubTransport_DoWork; 00133 result->IoTHubTransport_SetRetryPolicy = transportProtocol->IoTHubTransport_SetRetryPolicy; 00134 result->IoTHubTransport_GetSendStatus = transportProtocol->IoTHubTransport_GetSendStatus; 00135 } 00136 } 00137 } 00138 } 00139 } 00140 00141 return result; 00142 } 00143 00144 static void multiplexed_client_do_work(TRANSPORT_HANDLE_DATA* transportData) 00145 { 00146 if (Lock(transportData->clientsLockHandle) != LOCK_OK) 00147 { 00148 LogError("failed to lock for multiplexed_client_do_work"); 00149 } 00150 else 00151 { 00152 size_t numberOfClients; 00153 size_t iterator; 00154 00155 numberOfClients = VECTOR_size(transportData->clients); 00156 for (iterator = 
0; iterator < numberOfClients; iterator++) 00157 { 00158 IOTHUB_CLIENT_CORE_HANDLE* clientHandle = (IOTHUB_CLIENT_CORE_HANDLE*)VECTOR_element(transportData->clients, iterator); 00159 00160 if (clientHandle != NULL) 00161 { 00162 transportData->clientDoWork(*clientHandle); 00163 } 00164 } 00165 00166 if (Unlock(transportData->clientsLockHandle) != LOCK_OK) 00167 { 00168 LogError("failed to unlock on multiplexed_client_do_work"); 00169 } 00170 } 00171 } 00172 00173 static int transport_worker_thread(void* threadArgument) 00174 { 00175 TRANSPORT_HANDLE_DATA* transportData = (TRANSPORT_HANDLE_DATA*)threadArgument; 00176 00177 while (1) 00178 { 00179 /*Codes_SRS_IOTHUBTRANSPORT_17_030: [ All calls to lower layer transport DoWork shall be protected by the lock created in IoTHubTransport_Create. ]*/ 00180 if (Lock(transportData->lockHandle) == LOCK_OK) 00181 { 00182 /*Codes_SRS_IOTHUBTRANSPORT_17_031: [ If acquiring the lock fails, lower layer transport DoWork shall not be called. ]*/ 00183 if (transportData->stopThread) 00184 { 00185 /*Codes_SRS_IOTHUBTRANSPORT_17_028: [ The thread shall exit when IoTHubTransport_EndWorkerThread has been called for each clientHandle which invoked IoTHubTransport_StartWorkerThread. ]*/ 00186 (void)Unlock(transportData->lockHandle); 00187 break; 00188 } 00189 else 00190 { 00191 (transportData->IoTHubTransport_DoWork)(transportData->transportLLHandle, NULL); 00192 00193 (void)Unlock(transportData->lockHandle); 00194 } 00195 } 00196 00197 multiplexed_client_do_work(transportData); 00198 00199 /*Codes_SRS_IOTHUBTRANSPORT_17_029: [ The thread shall call lower layer transport DoWork every 1 ms. 
]*/ 00200 ThreadAPI_Sleep(1); 00201 } 00202 00203 ThreadAPI_Exit(0); 00204 return 0; 00205 } 00206 00207 static bool find_by_handle(const void* element, const void* value) 00208 { 00209 /* data stored at element is device handle */ 00210 const IOTHUB_CLIENT_CORE_HANDLE * guess = (const IOTHUB_CLIENT_CORE_HANDLE *)element; 00211 const IOTHUB_CLIENT_CORE_HANDLE match = (const IOTHUB_CLIENT_CORE_HANDLE)value; 00212 return (*guess == match); 00213 } 00214 00215 static IOTHUB_CLIENT_RESULT start_worker_if_needed(TRANSPORT_HANDLE_DATA * transportData, IOTHUB_CLIENT_CORE_HANDLE clientHandle) 00216 { 00217 IOTHUB_CLIENT_RESULT result; 00218 if (transportData->workerThreadHandle == NULL) 00219 { 00220 /*Codes_SRS_IOTHUBTRANSPORT_17_018: [ If the worker thread does not exist, IoTHubTransport_StartWorkerThread shall start the thread using ThreadAPI_Create. ]*/ 00221 transportData->stopThread = 0; 00222 if (ThreadAPI_Create(&transportData->workerThreadHandle, transport_worker_thread, transportData) != THREADAPI_OK) 00223 { 00224 transportData->workerThreadHandle = NULL; 00225 } 00226 } 00227 if (transportData->workerThreadHandle != NULL) 00228 { 00229 if (Lock(transportData->clientsLockHandle) != LOCK_OK) 00230 { 00231 LogError("failed to lock for start_worker_if_needed"); 00232 result = IOTHUB_CLIENT_ERROR; 00233 } 00234 else 00235 { 00236 /*Codes_SRS_IOTHUBTRANSPORT_17_020: [ IoTHubTransport_StartWorkerThread shall search for IoTHubClient clientHandle in the list of IoTHubClient handles. ]*/ 00237 bool addToList = ((VECTOR_size(transportData->clients) == 0) || (VECTOR_find_if(transportData->clients, find_by_handle, clientHandle) == NULL)); 00238 if (addToList) 00239 { 00240 /*Codes_SRS_IOTHUBTRANSPORT_17_021: [ If handle is not found, then clientHandle shall be added to the list. 
]*/ 00241 if (VECTOR_push_back(transportData->clients, &clientHandle, 1) != 0) 00242 { 00243 LogError("Failed adding device to list (VECTOR_push_back failed)"); 00244 /*Codes_SRS_IOTHUBTRANSPORT_17_042: [ If Adding to the client list fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. ]*/ 00245 result = IOTHUB_CLIENT_ERROR; 00246 } 00247 else 00248 { 00249 result = IOTHUB_CLIENT_OK; 00250 } 00251 } 00252 else 00253 { 00254 result = IOTHUB_CLIENT_OK; 00255 } 00256 00257 if (Unlock(transportData->clientsLockHandle) != LOCK_OK) 00258 { 00259 LogError("failed to unlock on start_worker_if_needed"); 00260 } 00261 } 00262 } 00263 else 00264 { 00265 result = IOTHUB_CLIENT_ERROR; 00266 } 00267 return result; 00268 } 00269 00270 static void stop_worker_thread(TRANSPORT_HANDLE_DATA * transportData) 00271 { 00272 /*Codes_SRS_IOTHUBTRANSPORT_17_043: [** IoTHubTransport_SignalEndWorkerThread shall signal the worker thread to end.*/ 00273 transportData->stopThread = 1; 00274 } 00275 00276 static void wait_worker_thread(TRANSPORT_HANDLE_DATA * transportData) 00277 { 00278 if (transportData->workerThreadHandle != NULL) 00279 { 00280 int res; 00281 /*Codes_SRS_IOTHUBTRANSPORT_17_027: [ If handle list is empty, IoTHubTransport_EndWorkerThread shall be joined. 
]*/ 00282 if (ThreadAPI_Join(transportData->workerThreadHandle, &res) != THREADAPI_OK) 00283 { 00284 LogError("ThreadAPI_Join failed"); 00285 } 00286 else 00287 { 00288 transportData->workerThreadHandle = NULL; 00289 } 00290 } 00291 } 00292 00293 static bool signal_end_worker_thread(TRANSPORT_HANDLE_DATA * transportData, IOTHUB_CLIENT_CORE_HANDLE clientHandle) 00294 { 00295 bool okToJoin; 00296 00297 if (Lock(transportData->clientsLockHandle) != LOCK_OK) 00298 { 00299 LogError("failed to lock for signal_end_worker_thread"); 00300 okToJoin = false; 00301 } 00302 else 00303 { 00304 void* element = VECTOR_find_if(transportData->clients, find_by_handle, clientHandle); 00305 if (element != NULL) 00306 { 00307 /*Codes_SRS_IOTHUBTRANSPORT_17_026: [ IoTHubTransport_EndWorkerThread shall remove clientHandlehandle from handle list. ]*/ 00308 VECTOR_erase(transportData->clients, element, 1); 00309 } 00310 /*Codes_SRS_IOTHUBTRANSPORT_17_025: [ If the worker thread does not exist, then IoTHubTransport_EndWorkerThread shall return. ]*/ 00311 if (transportData->workerThreadHandle != NULL) 00312 { 00313 if (VECTOR_size(transportData->clients) == 0) 00314 { 00315 stop_worker_thread(transportData); 00316 okToJoin = true; 00317 } 00318 else 00319 { 00320 okToJoin = false; 00321 } 00322 } 00323 else 00324 { 00325 okToJoin = false; 00326 } 00327 00328 if (Unlock(transportData->clientsLockHandle) != LOCK_OK) 00329 { 00330 LogError("failed to unlock on signal_end_worker_thread"); 00331 } 00332 } 00333 return okToJoin; 00334 } 00335 00336 void IoTHubTransport_Destroy(TRANSPORT_HANDLE transportHandle) 00337 { 00338 /*Codes_SRS_IOTHUBTRANSPORT_17_011: [ IoTHubTransport_Destroy shall do nothing if transportHandle is NULL. ]*/ 00339 if (transportHandle != NULL) 00340 { 00341 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; 00342 /*Codes_SRS_IOTHUBTRANSPORT_17_033: [ IoTHubTransport_Destroy shall lock the transport lock. 
]*/ 00343 if (Lock(transportData->lockHandle) != LOCK_OK) 00344 { 00345 LogError("Unable to lock - will still attempt to end thread without thread safety"); 00346 stop_worker_thread(transportData); 00347 } 00348 else 00349 { 00350 stop_worker_thread(transportData); 00351 (void)Unlock(transportData->lockHandle); 00352 } 00353 wait_worker_thread(transportData); 00354 /*Codes_SRS_IOTHUBTRANSPORT_17_010: [ IoTHubTransport_Destroy shall free all resources. ]*/ 00355 Lock_Deinit(transportData->lockHandle); 00356 (transportData->IoTHubTransport_Destroy)(transportData->transportLLHandle); 00357 VECTOR_destroy(transportData->clients); 00358 Lock_Deinit(transportData->clientsLockHandle); 00359 free(transportHandle); 00360 } 00361 } 00362 00363 LOCK_HANDLE IoTHubTransport_GetLock(TRANSPORT_HANDLE transportHandle) 00364 { 00365 LOCK_HANDLE lock; 00366 if (transportHandle == NULL) 00367 { 00368 /*Codes_SRS_IOTHUBTRANSPORT_17_013: [ If transportHandle is NULL, IoTHubTransport_GetLock shall return NULL. ]*/ 00369 lock = NULL; 00370 } 00371 else 00372 { 00373 /*Codes_SRS_IOTHUBTRANSPORT_17_012: [ IoTHubTransport_GetLock shall return a handle to the transport lock. ]*/ 00374 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; 00375 lock = transportData->lockHandle; 00376 } 00377 return lock; 00378 } 00379 00380 TRANSPORT_LL_HANDLE IoTHubTransport_GetLLTransport(TRANSPORT_HANDLE transportHandle) 00381 { 00382 TRANSPORT_LL_HANDLE llTransport; 00383 if (transportHandle == NULL) 00384 { 00385 /*Codes_SRS_IOTHUBTRANSPORT_17_015: [ If transportHandle is NULL, IoTHubTransport_GetLLTransport shall return NULL. ]*/ 00386 llTransport = NULL; 00387 } 00388 else 00389 { 00390 /*Codes_SRS_IOTHUBTRANSPORT_17_014: [ IoTHubTransport_GetLLTransport shall return a handle to the lower layer transport. 
]*/ 00391 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; 00392 llTransport = transportData->transportLLHandle; 00393 } 00394 return llTransport; 00395 } 00396 00397 IOTHUB_CLIENT_RESULT IoTHubTransport_StartWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_CORE_HANDLE clientHandle, IOTHUB_CLIENT_MULTIPLEXED_DO_WORK muxDoWork) 00398 { 00399 IOTHUB_CLIENT_RESULT result; 00400 if (transportHandle == NULL || clientHandle == NULL) 00401 { 00402 /*Codes_SRS_IOTHUBTRANSPORT_17_016: [ If transportHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/ 00403 /*Codes_SRS_IOTHUBTRANSPORT_17_017: [ If clientHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/ 00404 result = IOTHUB_CLIENT_INVALID_ARG; 00405 } 00406 else 00407 { 00408 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; 00409 00410 if (transportData->clientDoWork == NULL) 00411 { 00412 transportData->clientDoWork = muxDoWork; 00413 } 00414 00415 if ((result = start_worker_if_needed(transportData, clientHandle)) != IOTHUB_CLIENT_OK) 00416 { 00417 /*Codes_SRS_IOTHUBTRANSPORT_17_019: [ If thread creation fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. */ 00418 LogError("Unable to start thread safely"); 00419 } 00420 else 00421 { 00422 /*Codes_SRS_IOTHUBTRANSPORT_17_022: [ Upon success, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_OK. ]*/ 00423 result = IOTHUB_CLIENT_OK; 00424 } 00425 } 00426 return result; 00427 } 00428 00429 bool IoTHubTransport_SignalEndWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_CORE_HANDLE clientHandle) 00430 { 00431 bool okToJoin; 00432 /*Codes_SRS_IOTHUBTRANSPORT_17_023: [ If transportHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/ 00433 /*Codes_SRS_IOTHUBTRANSPORT_17_024: [ If clientHandle is NULL, IoTHubTransport_EndWorkerThread shall return. 
]*/ 00434 if (!(transportHandle == NULL || clientHandle == NULL)) 00435 { 00436 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; 00437 okToJoin = signal_end_worker_thread(transportData, clientHandle); 00438 } 00439 else 00440 { 00441 okToJoin = false; 00442 } 00443 return okToJoin; 00444 } 00445 00446 void IoTHubTransport_JoinWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_CORE_HANDLE clientHandle) 00447 { 00448 /*Codes_SRS_IOTHUBTRANSPORT_17_044: [ If transportHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/ 00449 /*Codes_SRS_IOTHUBTRANSPORT_17_045: [ If clientHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/ 00450 if (!(transportHandle == NULL || clientHandle == NULL)) 00451 { 00452 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; 00453 /*Codes_SRS_IOTHUBTRANSPORT_17_027: [ The worker thread shall be joined. ]*/ 00454 wait_worker_thread(transportData); 00455 } 00456 }
Generated on Tue Jul 12 2022 14:17:26 by
