Microsoft Azure IoTHub client libraries
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp f767zi_mqtt ... more
This library implements the Microsoft Azure IoTHub client library. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothub_client_core.c
- Revision:
- 92:97148cf9aa2a
- Parent:
- 91:bbf806070c5f
diff -r bbf806070c5f -r 97148cf9aa2a iothub_client_core.c --- a/iothub_client_core.c Thu Jul 12 18:09:13 2018 -0700 +++ b/iothub_client_core.c Tue Sep 11 11:13:11 2018 -0700 @@ -1,7 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -#include <stdlib.h> +#include <stdlib.h> #include "azure_c_shared_utility/umock_c_prod.h" #include "azure_c_shared_utility/gballoc.h" @@ -29,9 +29,7 @@ THREAD_HANDLE ThreadHandle; LOCK_HANDLE LockHandle; sig_atomic_t StopThread; -#ifndef DONT_USE_UPLOADTOBLOB - SINGLYLINKEDLIST_HANDLE savedDataToBeCleaned; /*list containing UPLOADTOBLOB_SAVED_DATA*/ -#endif + SINGLYLINKEDLIST_HANDLE httpWorkerThreadInfoList; /*list containing HTTPWORKER_THREAD_INFO*/ int created_with_transport_handle; VECTOR_HANDLE saved_user_callback_list; IOTHUB_CLIENT_DEVICE_TWIN_CALLBACK desired_state_callback; @@ -47,7 +45,12 @@ struct IOTHUB_QUEUE_CONTEXT_TAG* method_user_context; } IOTHUB_CLIENT_CORE_INSTANCE; -#ifndef DONT_USE_UPLOADTOBLOB +typedef enum HTTPWORKER_THREAD_TYPE_TAG +{ + HTTPWORKER_THREAD_UPLOAD_TO_BLOB, + HTTPWORKER_THREAD_INVOKE_METHOD +} HTTPWORKER_THREAD_TYPE; + typedef struct UPLOADTOBLOB_SAVED_DATA_TAG { unsigned char* source; @@ -61,19 +64,29 @@ IOTHUB_CLIENT_FILE_UPLOAD_GET_DATA_CALLBACK_EX getDataCallbackEx; }UPLOADTOBLOB_MULTIBLOCK_SAVED_DATA; -typedef struct UPLOADTOBLOB_THREAD_INFO_TAG +typedef struct INVOKE_METHOD_SAVED_DATA_TAG { + const char* deviceId; + const char* moduleId; + const char* methodName; + const char* methodPayload; + unsigned int timeout; + IOTHUB_METHOD_INVOKE_CALLBACK methodInvokeCallback; +} INVOKE_METHOD_SAVED_DATA; + +typedef struct HTTPWORKER_THREAD_INFO_TAG +{ + HTTPWORKER_THREAD_TYPE workerThreadType; char* destinationFileName; - THREAD_HANDLE uploadingThreadHandle; + THREAD_HANDLE threadHandle; LOCK_HANDLE lockGarbage; - int canBeGarbageCollected; /*flag indicating that the UPLOADTOBLOB_SAVED_DATA structure can be freed because the thread deadling with it finished*/ + int canBeGarbageCollected; /*flag indicating that the structure can be freed because the thread deadling with it finished*/ IOTHUB_CLIENT_CORE_HANDLE iotHubClientHandle; void* context; UPLOADTOBLOB_SAVED_DATA uploadBlobSavedData; + INVOKE_METHOD_SAVED_DATA invokeMethodSavedData; UPLOADTOBLOB_MULTIBLOCK_SAVED_DATA uploadBlobMultiblockSavedData; -}UPLOADTOBLOB_THREAD_INFO; - -#endif +}HTTPWORKER_THREAD_INFO; #define USER_CALLBACK_TYPE_VALUES \ CALLBACK_TYPE_DEVICE_TWIN, \ @@ -165,12 +178,22 @@ CREATE_HUB_INSTANCE_FROM_DEVICE_AUTH } CREATE_HUB_INSTANCE_TYPE; -#ifndef DONT_USE_UPLOADTOBLOB -static void freeUploadToBlobThreadInfo(UPLOADTOBLOB_THREAD_INFO* threadInfo) +static void freeHttpWorkerThreadInfo(HTTPWORKER_THREAD_INFO* threadInfo) { Lock_Deinit(threadInfo->lockGarbage); - free(threadInfo->uploadBlobSavedData.source); - free(threadInfo->destinationFileName); + if (threadInfo->workerThreadType == HTTPWORKER_THREAD_UPLOAD_TO_BLOB) + { + free(threadInfo->uploadBlobSavedData.source); + free(threadInfo->destinationFileName); + } + else if (threadInfo->workerThreadType == HTTPWORKER_THREAD_INVOKE_METHOD) + { + free((char*)threadInfo->invokeMethodSavedData.deviceId); + free((char*)threadInfo->invokeMethodSavedData.moduleId); + free((char*)threadInfo->invokeMethodSavedData.methodName); + free((char*)threadInfo->invokeMethodSavedData.methodPayload); + } + free(threadInfo); } @@ -179,10 +202,10 @@ { /*see if any savedData structures can be disposed of*/ /*Codes_SRS_IOTHUBCLIENT_02_072: [ All threads marked as disposable (upon completion of a file upload) shall be joined and the data structures build for them shall be freed. ]*/ - LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(iotHubClientInstance->savedDataToBeCleaned); + LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(iotHubClientInstance->httpWorkerThreadInfoList); while (item != NULL) { - UPLOADTOBLOB_THREAD_INFO* threadInfo = (UPLOADTOBLOB_THREAD_INFO*)singlylinkedlist_item_get_value(item); + HTTPWORKER_THREAD_INFO* threadInfo = (HTTPWORKER_THREAD_INFO*)singlylinkedlist_item_get_value(item); LIST_ITEM_HANDLE old_item = item; item = singlylinkedlist_get_next_item(item); @@ -195,17 +218,17 @@ if (threadInfo->canBeGarbageCollected == 1) { int notUsed; - if (ThreadAPI_Join(threadInfo->uploadingThreadHandle, ¬Used) != THREADAPI_OK) + if (ThreadAPI_Join(threadInfo->threadHandle, ¬Used) != THREADAPI_OK) { LogError("unable to ThreadAPI_Join"); } - (void)singlylinkedlist_remove(iotHubClientInstance->savedDataToBeCleaned, old_item); + (void)singlylinkedlist_remove(iotHubClientInstance->httpWorkerThreadInfoList, old_item); if (Unlock(threadInfo->lockGarbage) != LOCK_OK) { LogError("unable to unlock after locking"); } - freeUploadToBlobThreadInfo(threadInfo); + freeHttpWorkerThreadInfo(threadInfo); } else { @@ -217,7 +240,7 @@ } } } -#endif + static bool iothub_ll_message_callback(MESSAGE_CALLBACK_INFO* messageData, void* userContextCallback) { @@ -599,7 +622,7 @@ } break; case CALLBACK_TYPE_MESSAGE: - if (message_callback) + if (message_callback && message_user_context_handle) { IOTHUBMESSAGE_DISPOSITION_RESULT disposition = message_callback(queued_cb->iothub_callback.message_cb_info->messageHandle, queued_cb->userContextCallback); @@ -653,9 +676,7 @@ { IOTHUB_CLIENT_CORE_INSTANCE* iotHubClientInstance = (IOTHUB_CLIENT_CORE_INSTANCE*)iotHubClientHandle; -#ifndef DONT_USE_UPLOADTOBLOB garbageCollectorImpl(iotHubClientInstance); -#endif if (Lock(iotHubClientInstance->LockHandle) == LOCK_OK) { VECTOR_HANDLE call_backs = VECTOR_move(iotHubClientInstance->saved_user_callback_list); @@ -696,9 +717,7 @@ /* Codes_SRS_IOTHUBCLIENT_01_039: [All calls to IoTHubClientCore_LL_DoWork shall be protected by the lock created in IotHubClient_Create.] */ IoTHubClientCore_LL_DoWork(iotHubClientInstance->IoTHubClientLLHandle); -#ifndef DONT_USE_UPLOADTOBLOB garbageCollectorImpl(iotHubClientInstance); -#endif VECTOR_HANDLE call_backs = VECTOR_move(iotHubClientInstance->saved_user_callback_list); (void)Unlock(iotHubClientInstance->LockHandle); if (call_backs == NULL) @@ -766,6 +785,8 @@ /* Codes_SRS_IOTHUBCLIENT_01_004: [If allocating memory for the new IoTHubClient instance fails, then IoTHubClient_Create shall return NULL.] */ if (result != NULL) { + memset((void *)result, 0, sizeof(IOTHUB_CLIENT_CORE_INSTANCE)); + /* Codes_SRS_IOTHUBCLIENT_01_029: [IoTHubClient_Create shall create a lock object to be used later for serializing IoTHubClient calls.] */ if ((result->saved_user_callback_list = VECTOR_create(sizeof(USER_CALLBACK_INFO))) == NULL) { @@ -775,9 +796,8 @@ } else { -#ifndef DONT_USE_UPLOADTOBLOB /*Codes_SRS_IOTHUBCLIENT_02_060: [ IoTHubClient_Create shall create a SINGLYLINKEDLIST_HANDLE containing THREAD_HANDLE (created by future calls to IoTHubClient_UploadToBlobAsync). ]*/ - if ((result->savedDataToBeCleaned = singlylinkedlist_create()) == NULL) + if ((result->httpWorkerThreadInfoList = singlylinkedlist_create()) == NULL) { /*Codes_SRS_IOTHUBCLIENT_02_061: [ If creating the SINGLYLINKEDLIST_HANDLE fails then IoTHubClient_Create shall fail and return NULL. ]*/ LogError("unable to singlylinkedlist_create"); @@ -786,7 +806,6 @@ result = NULL; } else -#endif { result->TransportHandle = transportHandle; result->created_with_transport_handle = 0; @@ -914,13 +933,11 @@ /* Codes_SRS_IOTHUBCLIENT_01_003: [If IoTHubClientCore_LL_Create fails, then IoTHubClient_Create shall return NULL.] */ /* Codes_SRS_IOTHUBCLIENT_01_031: [If IoTHubClient_Create fails, all resources allocated by it shall be freed.] */ /* Codes_SRS_IOTHUBCLIENT_17_006: [ If IoTHubTransport_GetLock fails, then IoTHubClient_CreateWithTransport shall return NULL. ]*/ - if (transportHandle == NULL) + if ((transportHandle == NULL) && (result->LockHandle != NULL)) { Lock_Deinit(result->LockHandle); } -#ifndef DONT_USE_UPLOADTOBLOB - singlylinkedlist_destroy(result->savedDataToBeCleaned); -#endif + singlylinkedlist_destroy(result->httpWorkerThreadInfoList); LogError("Failure creating iothub handle"); VECTOR_destroy(result->saved_user_callback_list); free(result); @@ -1025,7 +1042,7 @@ /* Codes_SRS_IOTHUBCLIENT_12_022: [** `IoTHubClient_CreateFromDeviceAuth` shall create a lock object to be used later for serializing IoTHubClient calls. **] */ /* Codes_SRS_IOTHUBCLIENT_12_023: [** If creating the lock fails, then IoTHubClient_CreateFromDeviceAuth shall return NULL. **] */ /* Codes_SRS_IOTHUBCLIENT_12_024: [** If IoTHubClient_CreateFromDeviceAuth fails, all resources allocated by it shall be freed. **] */ - /* Codes_SRS_IOTHUBCLIENT_12_025: [** `IoTHubClient_CreateFromDeviceAuth` shall instantiate a new `IoTHubClientCore_LL` instance by calling `IoTHubClientCore_LL_CreateFromDeviceAuth` and passing iothub_uri, device_id and protocol argument. **] */ + /* Codes_SRS_IOTHUBCLIENT_12_025: [** `IoTHubClient_CreateFromDeviceAuth` shall instantiate a new `IoTHubClientCore_LL` instance by calling `IoTHubClientCore_LL_CreateFromDeviceAuth` and passing iothub_uri, device_id and protocol argument. **] */ result = create_iothub_instance(CREATE_HUB_INSTANCE_FROM_DEVICE_AUTH, NULL, NULL, NULL, protocol, iothub_uri, device_id); } return result; @@ -1104,19 +1121,17 @@ LogError("unable to Lock - - will still proceed to try to end the thread without locking"); } -#ifndef DONT_USE_UPLOADTOBLOB /*Codes_SRS_IOTHUBCLIENT_02_069: [ IoTHubClient_Destroy shall free all data created by IoTHubClient_UploadToBlobAsync ]*/ /*wait for all uploading threads to finish*/ - while (singlylinkedlist_get_head_item(iotHubClientInstance->savedDataToBeCleaned) != NULL) + while (singlylinkedlist_get_head_item(iotHubClientInstance->httpWorkerThreadInfoList) != NULL) { garbageCollectorImpl(iotHubClientInstance); } - if (iotHubClientInstance->savedDataToBeCleaned != NULL) + if (iotHubClientInstance->httpWorkerThreadInfoList != NULL) { - singlylinkedlist_destroy(iotHubClientInstance->savedDataToBeCleaned); + singlylinkedlist_destroy(iotHubClientInstance->httpWorkerThreadInfoList); } -#endif /* Codes_SRS_IOTHUBCLIENT_01_006: [That includes destroying the IoTHubClientCore_LL instance by calling IoTHubClientCore_LL_Destroy.] */ IoTHubClientCore_LL_Destroy(iotHubClientInstance->IoTHubClientLLHandle); @@ -1963,31 +1978,38 @@ return result; } -#ifndef DONT_USE_UPLOADTOBLOB -static IOTHUB_CLIENT_RESULT startUploadToBlobWorkerThread(UPLOADTOBLOB_THREAD_INFO* threadInfo, THREAD_START_FUNC uploadThreadFunc) +#if !defined(DONT_USE_UPLOADTOBLOB) || defined(USE_EDGE_MODULES) +static IOTHUB_CLIENT_RESULT startHttpWorkerThread(IOTHUB_CLIENT_CORE_HANDLE iotHubClientHandle, HTTPWORKER_THREAD_INFO* threadInfo, THREAD_START_FUNC httpWorkerThreadFunc) { IOTHUB_CLIENT_RESULT result; LIST_ITEM_HANDLE item; - if (Lock(threadInfo->iotHubClientHandle->LockHandle) != LOCK_OK) + // StartWorkerThreadIfNeeded creates the "main" worker thread used for transports. Though its not used + // for these HTTP based worker threads (see ThreadAPI_Create call below) the main one is needed for garbage collection. + if ((result = StartWorkerThreadIfNeeded(iotHubClientHandle)) != IOTHUB_CLIENT_OK) + { + /*Codes_SRS_IOTHUBCLIENT_02_053: [ If copying to the structure or spawning the thread fails, then IoTHubClient_UploadToBlobAsync shall fail and return IOTHUB_CLIENT_ERROR. ]*/ + LogError("Could not start worker thread"); + } + else if (Lock(threadInfo->iotHubClientHandle->LockHandle) != LOCK_OK) { LogError("Lock failed"); result = IOTHUB_CLIENT_ERROR; } else { - if ((item = singlylinkedlist_add(threadInfo->iotHubClientHandle->savedDataToBeCleaned, threadInfo)) == NULL) + if ((item = singlylinkedlist_add(threadInfo->iotHubClientHandle->httpWorkerThreadInfoList, threadInfo)) == NULL) { LogError("Adding item to list failed"); result = IOTHUB_CLIENT_ERROR; } - else if (ThreadAPI_Create(&threadInfo->uploadingThreadHandle, uploadThreadFunc, threadInfo) != THREADAPI_OK) + else if (ThreadAPI_Create(&threadInfo->threadHandle, httpWorkerThreadFunc, threadInfo) != THREADAPI_OK) { /*Codes_SRS_IOTHUBCLIENT_02_053: [ If copying to the structure or spawning the thread fails, then IoTHubClient_UploadToBlobAsync shall fail and return IOTHUB_CLIENT_ERROR. ]*/ LogError("unable to ThreadAPI_Create"); // Remove the item from linked list here, while we're still under lock. Final garbage collector also does it under lock. - (void)singlylinkedlist_remove(threadInfo->iotHubClientHandle->savedDataToBeCleaned, item); + (void)singlylinkedlist_remove(threadInfo->iotHubClientHandle->httpWorkerThreadInfoList, item); result = IOTHUB_CLIENT_ERROR; } else @@ -2000,38 +2022,7 @@ return result; } -static UPLOADTOBLOB_THREAD_INFO* allocateUploadToBlob(const char* destinationFileName, IOTHUB_CLIENT_CORE_HANDLE iotHubClientHandle, void* context) -{ - UPLOADTOBLOB_THREAD_INFO* threadInfo = (UPLOADTOBLOB_THREAD_INFO*)malloc(sizeof(UPLOADTOBLOB_THREAD_INFO)); - if (threadInfo == NULL) - { - LogError("unable to allocate thread object"); - } - else - { - memset(threadInfo, 0, sizeof(UPLOADTOBLOB_THREAD_INFO)); - threadInfo->iotHubClientHandle = iotHubClientHandle; - threadInfo->context = context; - - if (mallocAndStrcpy_s(&threadInfo->destinationFileName, destinationFileName) != 0) - { - /*Codes_SRS_IOTHUBCLIENT_02_053: [ If copying to the structure or spawning the thread fails, then IoTHubClient_UploadToBlobAsync shall fail and return IOTHUB_CLIENT_ERROR. ]*/ - LogError("unable to mallocAndStrcpy_s"); - freeUploadToBlobThreadInfo(threadInfo); - threadInfo = NULL; - } - else if ((threadInfo->lockGarbage = Lock_Init()) == NULL) - { - LogError("unable to allocate a lock"); - freeUploadToBlobThreadInfo(threadInfo); - threadInfo = NULL; - } - } - - return threadInfo; -} - -static int markThreadReadyToBeGarbageCollected(UPLOADTOBLOB_THREAD_INFO* threadInfo) +static int markThreadReadyToBeGarbageCollected(HTTPWORKER_THREAD_INFO* threadInfo) { /*Codes_SRS_IOTHUBCLIENT_02_071: [ The thread shall mark itself as disposable. ]*/ if (Lock(threadInfo->lockGarbage) != LOCK_OK) @@ -2053,7 +2044,43 @@ return 0; } -static IOTHUB_CLIENT_RESULT initializeUploadToBlobData(UPLOADTOBLOB_THREAD_INFO* threadInfo, const unsigned char* source, size_t size, IOTHUB_CLIENT_FILE_UPLOAD_CALLBACK iotHubClientFileUploadCallback) +#endif // !defined(DONT_USE_UPLOADTOBLOB) || defined(USE_EDGE_MODULES) + +#if !defined(DONT_USE_UPLOADTOBLOB) +static HTTPWORKER_THREAD_INFO* allocateUploadToBlob(const char* destinationFileName, IOTHUB_CLIENT_CORE_HANDLE iotHubClientHandle, void* context) +{ + HTTPWORKER_THREAD_INFO* threadInfo = (HTTPWORKER_THREAD_INFO*)malloc(sizeof(HTTPWORKER_THREAD_INFO)); + if (threadInfo == NULL) + { + LogError("unable to allocate thread object"); + } + else + { + memset(threadInfo, 0, sizeof(HTTPWORKER_THREAD_INFO)); + threadInfo->workerThreadType = HTTPWORKER_THREAD_UPLOAD_TO_BLOB; + threadInfo->iotHubClientHandle = iotHubClientHandle; + threadInfo->context = context; + + if (mallocAndStrcpy_s(&threadInfo->destinationFileName, destinationFileName) != 0) + { + /*Codes_SRS_IOTHUBCLIENT_02_053: [ If copying to the structure or spawning the thread fails, then IoTHubClient_UploadToBlobAsync shall fail and return IOTHUB_CLIENT_ERROR. ]*/ + LogError("unable to mallocAndStrcpy_s"); + freeHttpWorkerThreadInfo(threadInfo); + threadInfo = NULL; + } + else if ((threadInfo->lockGarbage = Lock_Init()) == NULL) + { + LogError("unable to allocate a lock"); + freeHttpWorkerThreadInfo(threadInfo); + threadInfo = NULL; + } + } + + return threadInfo; +} + + +static IOTHUB_CLIENT_RESULT initializeUploadToBlobData(HTTPWORKER_THREAD_INFO* threadInfo, const unsigned char* source, size_t size, IOTHUB_CLIENT_FILE_UPLOAD_CALLBACK iotHubClientFileUploadCallback) { IOTHUB_CLIENT_RESULT result; @@ -2085,7 +2112,7 @@ static int uploadingThread(void *data) { IOTHUB_CLIENT_FILE_UPLOAD_RESULT upload_result; - UPLOADTOBLOB_THREAD_INFO* threadInfo = (UPLOADTOBLOB_THREAD_INFO*)data; + HTTPWORKER_THREAD_INFO* threadInfo = (HTTPWORKER_THREAD_INFO*)data; /*it so happens that IoTHubClientCore_LL_UploadToBlob is thread-safe because there's no saved state in the handle and there are no globals, so no need to protect it*/ /*not having it protected means multiple simultaneous uploads can happen*/ @@ -2134,7 +2161,7 @@ else { /*Codes_SRS_IOTHUBCLIENT_02_051: [IoTHubClient_UploadToBlobAsync shall copy the souce, size, iotHubClientFileUploadCallback, context into a structure.]*/ - UPLOADTOBLOB_THREAD_INFO *threadInfo = allocateUploadToBlob(destinationFileName, iotHubClientHandle, context); + HTTPWORKER_THREAD_INFO *threadInfo = allocateUploadToBlob(destinationFileName, iotHubClientHandle, context); if (threadInfo == NULL) { /*Codes_SRS_IOTHUBCLIENT_02_053: [ If copying to the structure or spawning the thread fails, then IoTHubClient_UploadToBlobAsync shall fail and return IOTHUB_CLIENT_ERROR. ]*/ @@ -2147,18 +2174,12 @@ LogError("unable to initialize upload blob info"); result = IOTHUB_CLIENT_ERROR; } - else if ((result = StartWorkerThreadIfNeeded(iotHubClientHandle)) != IOTHUB_CLIENT_OK) - { - /*Codes_SRS_IOTHUBCLIENT_02_053: [ If copying to the structure or spawning the thread fails, then IoTHubClient_UploadToBlobAsync shall fail and return IOTHUB_CLIENT_ERROR. ]*/ - LogError("Could not start worker thread"); - freeUploadToBlobThreadInfo(threadInfo); - } /*Codes_SRS_IOTHUBCLIENT_02_052: [ IoTHubClient_UploadToBlobAsync shall spawn a thread passing the structure build in SRS IOTHUBCLIENT 02 051 as thread data.]*/ - else if ((result = startUploadToBlobWorkerThread(threadInfo, uploadingThread)) != IOTHUB_CLIENT_OK) + else if ((result = startHttpWorkerThread(iotHubClientHandle, threadInfo, uploadingThread)) != IOTHUB_CLIENT_OK) { /*Codes_SRS_IOTHUBCLIENT_02_053: [ If copying to the structure or spawning the thread fails, then IoTHubClient_UploadToBlobAsync shall fail and return IOTHUB_CLIENT_ERROR. ]*/ LogError("unable to start upload thread"); - freeUploadToBlobThreadInfo(threadInfo); + freeHttpWorkerThreadInfo(threadInfo); } else { @@ -2171,7 +2192,7 @@ static int uploadMultipleBlock_thread(void* data) { - UPLOADTOBLOB_THREAD_INFO* threadInfo = (UPLOADTOBLOB_THREAD_INFO*)data; + HTTPWORKER_THREAD_INFO* threadInfo = (HTTPWORKER_THREAD_INFO*)data; IOTHUB_CLIENT_CORE_LL_HANDLE llHandle = threadInfo->iotHubClientHandle->IoTHubClientLLHandle; /*Codes_SRS_IOTHUBCLIENT_99_078: [ The thread shall call `IoTHubClientCore_LL_UploadMultipleBlocksToBlob` or `IoTHubClientCore_LL_UploadMultipleBlocksToBlobEx` passing the information packed in the structure. ]*/ @@ -2214,7 +2235,7 @@ else { /*Codes_SRS_IOTHUBCLIENT_99_075: [ `IoTHubClient_UploadMultipleBlocksToBlobAsync(Ex)` shall copy the `destinationFileName`, `getDataCallback`, `context` and `iotHubClientHandle` into a structure. ]*/ - UPLOADTOBLOB_THREAD_INFO *threadInfo = allocateUploadToBlob(destinationFileName, iotHubClientHandle, context); + HTTPWORKER_THREAD_INFO *threadInfo = allocateUploadToBlob(destinationFileName, iotHubClientHandle, context); if (threadInfo == NULL) { /*Codes_SRS_IOTHUBCLIENT_02_053: [ If copying to the structure or spawning the thread fails, then IoTHubClient_UploadToBlobAsync shall fail and return IOTHUB_CLIENT_ERROR. ]*/ @@ -2227,17 +2248,11 @@ threadInfo->uploadBlobMultiblockSavedData.getDataCallback = getDataCallback; threadInfo->uploadBlobMultiblockSavedData.getDataCallbackEx = getDataCallbackEx; - if ((result = StartWorkerThreadIfNeeded(iotHubClientHandle)) != IOTHUB_CLIENT_OK) - { - /*Codes_SRS_IOTHUBCLIENT_02_053: [ If copying to the structure or spawning the thread fails, then IoTHubClient_UploadToBlobAsync shall fail and return IOTHUB_CLIENT_ERROR. ]*/ - LogError("Could not start worker thread"); - freeUploadToBlobThreadInfo(threadInfo); - } - else if ((result = startUploadToBlobWorkerThread(threadInfo, uploadMultipleBlock_thread)) != IOTHUB_CLIENT_OK) + if ((result = startHttpWorkerThread(iotHubClientHandle, threadInfo, uploadMultipleBlock_thread)) != IOTHUB_CLIENT_OK) { /*Codes_SRS_IOTHUBCLIENT_02_053: [ If copying to the structure or spawning the thread fails, then IoTHubClient_UploadToBlobAsync shall fail and return IOTHUB_CLIENT_ERROR. ]*/ LogError("unable to start upload thread"); - freeUploadToBlobThreadInfo(threadInfo); + freeHttpWorkerThreadInfo(threadInfo); } else { @@ -2313,7 +2328,7 @@ inputMessageCallbackContext.iotHubClientHandle = iotHubClientHandle; inputMessageCallbackContext.eventHandlerCallback = eventHandlerCallback; inputMessageCallbackContext.userContextCallback = userContextCallback; - + result = IoTHubClientCore_LL_SetInputMessageCallbackEx(iotHubClientInstance->IoTHubClientLLHandle, inputName, iothub_ll_inputmessage_callback, (void*)&inputMessageCallbackContext, sizeof(inputMessageCallbackContext)); (void)Unlock(iotHubClientInstance->LockHandle); } @@ -2323,27 +2338,105 @@ return result; } -/* Temporary function until replacement during iothub_client refactor*/ #ifdef USE_EDGE_MODULES -IOTHUB_CLIENT_RESULT IoTHubClientCore_GenericMethodInvoke(IOTHUB_CLIENT_CORE_HANDLE iotHubClientHandle, const char* deviceId, const char* moduleId, const char* methodName, const char* methodPayload, unsigned int timeout, int* responseStatus, unsigned char** responsePayload, size_t* responsePayloadSize) + +HTTPWORKER_THREAD_INFO * allocateMethodInvoke(IOTHUB_CLIENT_CORE_HANDLE iotHubClientHandle, const char* deviceId, const char* moduleId, const char* methodName, const char* methodPayload, unsigned int timeout, IOTHUB_METHOD_INVOKE_CALLBACK methodInvokeCallback, void* context) { - IOTHUB_CLIENT_RESULT result; - if (iotHubClientHandle == NULL) + HTTPWORKER_THREAD_INFO* threadInfo = (HTTPWORKER_THREAD_INFO*)malloc(sizeof(HTTPWORKER_THREAD_INFO)); + if (threadInfo == NULL) { - LogError("Argument cannot be NULL"); - result = IOTHUB_CLIENT_INVALID_ARG; - } - else if (Lock(iotHubClientHandle->LockHandle) != LOCK_OK) - { - LogError("failed locking for dispatch_user_callbacks"); - result = IOTHUB_CLIENT_ERROR; + LogError("unable to allocate thread object"); } else { - result = IoTHubClientCore_LL_GenericMethodInvoke(iotHubClientHandle->IoTHubClientLLHandle, deviceId, moduleId, methodName, methodPayload, timeout, responseStatus, responsePayload, responsePayloadSize); - (void)Unlock(iotHubClientHandle->LockHandle); + memset(threadInfo, 0, sizeof(HTTPWORKER_THREAD_INFO)); + threadInfo->workerThreadType = HTTPWORKER_THREAD_INVOKE_METHOD; + threadInfo->iotHubClientHandle = iotHubClientHandle; + threadInfo->context = context; + + threadInfo->invokeMethodSavedData.timeout = timeout; + threadInfo->invokeMethodSavedData.methodInvokeCallback = methodInvokeCallback; + + if ((mallocAndStrcpy_s((char**)&threadInfo->invokeMethodSavedData.deviceId, deviceId) != 0) || + ((moduleId != NULL) && mallocAndStrcpy_s((char**)&threadInfo->invokeMethodSavedData.moduleId, moduleId) != 0) || + (mallocAndStrcpy_s((char**)&threadInfo->invokeMethodSavedData.methodName, methodName) != 0) || + (mallocAndStrcpy_s((char**)&threadInfo->invokeMethodSavedData.methodPayload, methodPayload) != 0)) + { + LogError("Allocating resources failed"); + freeHttpWorkerThreadInfo(threadInfo); + threadInfo = NULL; + } + else if ((threadInfo->lockGarbage = Lock_Init()) == NULL) + { + LogError("unable to allocate a lock"); + freeHttpWorkerThreadInfo(threadInfo); + threadInfo = NULL; + } } + return threadInfo; +} + +static int uploadMethodInvoke_thread(void* data) +{ + IOTHUB_CLIENT_RESULT result; + + HTTPWORKER_THREAD_INFO* threadInfo = (HTTPWORKER_THREAD_INFO*)data; + + int responseStatus; + unsigned char* responsePayload = NULL; + size_t responsePayloadSize; + + result = IoTHubClientCore_LL_GenericMethodInvoke(threadInfo->iotHubClientHandle->IoTHubClientLLHandle, + threadInfo->invokeMethodSavedData.deviceId, + threadInfo->invokeMethodSavedData.moduleId, + threadInfo->invokeMethodSavedData.methodName, + threadInfo->invokeMethodSavedData.methodPayload, + threadInfo->invokeMethodSavedData.timeout, + &responseStatus, + &responsePayload, + &responsePayloadSize); + + if (threadInfo->invokeMethodSavedData.methodInvokeCallback != NULL) + { + threadInfo->invokeMethodSavedData.methodInvokeCallback(result, responseStatus, responsePayload, responsePayloadSize, threadInfo->context); + } + + if (responsePayload != NULL) + { + free(responsePayload); + } + + (void)markThreadReadyToBeGarbageCollected(threadInfo); + return result; +} + + +IOTHUB_CLIENT_RESULT IoTHubClientCore_GenericMethodInvoke(IOTHUB_CLIENT_CORE_HANDLE iotHubClientHandle, const char* deviceId, const char* moduleId, const char* methodName, const char* methodPayload, unsigned int timeout, IOTHUB_METHOD_INVOKE_CALLBACK methodInvokeCallback, void* context) +{ + IOTHUB_CLIENT_RESULT result; + HTTPWORKER_THREAD_INFO *threadInfo; + + if ((iotHubClientHandle == NULL) || (deviceId == NULL) || (methodName == NULL) || (methodPayload == NULL)) + { + LogError("Invalid argument (iotHubClientHandle=%p, deviceId=%p, methodName=%p, methodPayload=%p)", iotHubClientHandle, deviceId, methodName, methodPayload); + result = IOTHUB_CLIENT_INVALID_ARG; + } + else if ((threadInfo = allocateMethodInvoke(iotHubClientHandle, deviceId, moduleId, methodName, methodPayload, timeout, methodInvokeCallback, context)) == NULL) + { + LogError("failed allocating method invoke thread info"); + result = IOTHUB_CLIENT_ERROR; + } + else if ((result = startHttpWorkerThread(iotHubClientHandle, threadInfo, uploadMethodInvoke_thread)) != IOTHUB_CLIENT_OK) + { + LogError("unable to start method invoke thread"); + freeHttpWorkerThreadInfo(threadInfo); + } + else + { + result = IOTHUB_CLIENT_OK; + } return result; } #endif /* USE_EDGE_MODULES */ +