Simple sample that demonstrates reading the FXOS8700CQ accelerometer, convert the data to JSON and send to an Azure IoT Hub.
Dependencies: azure_umqtt_c iothub_mqtt_transport mbed-rtos mbed wolfSSL Socket lwip-eth lwip-sys lwip
Diff: iothub_client/iothubtransport.c
- Revision:
- 3:c0556ff7b8e3
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iothub_client/iothubtransport.c Thu Dec 08 00:11:40 2016 +0000 @@ -0,0 +1,384 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +#include <stdlib.h> +#ifdef _CRTDBG_MAP_ALLOC +#include <crtdbg.h> +#endif + +#include "azure_c_shared_utility/gballoc.h" + +#include <stdlib.h> +#include <signal.h> +#include <stddef.h> +#include "azure_c_shared_utility/crt_abstractions.h" +#include "iothubtransport.h" +#include "iothub_client.h" +#include "iothub_client_private.h" +#include "azure_c_shared_utility/threadapi.h" +#include "azure_c_shared_utility/lock.h" +#include "azure_c_shared_utility/xlogging.h" +#include "azure_c_shared_utility/vector.h" + +typedef struct TRANSPORT_HANDLE_DATA_TAG +{ + TRANSPORT_LL_HANDLE transportLLHandle; + THREAD_HANDLE workerThreadHandle; + LOCK_HANDLE lockHandle; + sig_atomic_t stopThread; + TRANSPORT_PROVIDER_FIELDS; + VECTOR_HANDLE clients; +} TRANSPORT_HANDLE_DATA; + +/* Used for Unit test */ +const size_t IoTHubTransport_ThreadTerminationOffset = offsetof(TRANSPORT_HANDLE_DATA, stopThread); + +TRANSPORT_HANDLE IoTHubTransport_Create(IOTHUB_CLIENT_TRANSPORT_PROVIDER protocol, const char* iotHubName, const char* iotHubSuffix) +{ + TRANSPORT_HANDLE_DATA *result; + + if (protocol == NULL || iotHubName == NULL || iotHubSuffix == NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_002: [ If protocol is NULL, this function shall return NULL. ]*/ + /*Codes_SRS_IOTHUBTRANSPORT_17_003: [ If iotHubName is NULL, this function shall return NULL. ]*/ + /*Codes_SRS_IOTHUBTRANSPORT_17_004: [ If iotHubSuffix is NULL, this function shall return NULL. ]*/ + LogError("Invalid NULL argument, protocol [%p], name [%p], suffix [%p].", protocol, iotHubName, iotHubSuffix); + result = NULL; + } + else + { + /*Codes_SRS_IOTHUBTRANSPORT_17_032: [ IoTHubTransport_Create shall allocate memory for the transport data. ]*/ + result = (TRANSPORT_HANDLE_DATA*)malloc(sizeof(TRANSPORT_HANDLE_DATA)); + if (result == NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_040: [ If memory allocation fails, IoTHubTransport_Create shall return NULL. ]*/ + LogError("Transport handle was not allocated."); + } + else + { + TRANSPORT_PROVIDER * transportProtocol = (TRANSPORT_PROVIDER*)(protocol()); + IOTHUB_CLIENT_CONFIG upperConfig; + upperConfig.deviceId = NULL; + upperConfig.deviceKey = NULL; + upperConfig.iotHubName = iotHubName; + upperConfig.iotHubSuffix = iotHubSuffix; + upperConfig.protocol = protocol; + upperConfig.protocolGatewayHostName = NULL; + + IOTHUBTRANSPORT_CONFIG transportLLConfig; + transportLLConfig.upperConfig = &upperConfig; + transportLLConfig.waitingToSend = NULL; + + /*Codes_SRS_IOTHUBTRANSPORT_17_005: [ IoTHubTransport_Create shall create the lower layer transport by calling the protocol's IoTHubTransport_Create function. ]*/ + result->transportLLHandle = transportProtocol->IoTHubTransport_Create(&transportLLConfig); + if (result->transportLLHandle == NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_006: [ If the creation of the transport fails, IoTHubTransport_Create shall return NULL. ]*/ + LogError("Lower Layer transport not created."); + free(result); + result = NULL; + } + else + { + /*Codes_SRS_IOTHUBTRANSPORT_17_007: [ IoTHubTransport_Create shall create the transport lock by Calling Lock_Init. ]*/ + result->lockHandle = Lock_Init(); + if (result->lockHandle == NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_008: [ If the lock creation fails, IoTHubTransport_Create shall return NULL. ]*/ + LogError("transport Lock not created."); + transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle); + free(result); + result = NULL; + } + else + { + /*Codes_SRS_IOTHUBTRANSPORT_17_038: [ IoTHubTransport_Create shall call VECTOR_Create to make a list of IOTHUB_CLIENT_HANDLE using this transport. ]*/ + result->clients = VECTOR_create(sizeof(IOTHUB_CLIENT_HANDLE)); + if (result->clients == NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_039: [ If the Vector creation fails, IoTHubTransport_Create shall return NULL. ]*/ + /*Codes_SRS_IOTHUBTRANSPORT_17_009: [ IoTHubTransport_Create shall clean up any resources it creates if the function does not succeed. ]*/ + LogError("clients list not created."); + Lock_Deinit(result->lockHandle); + transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle); + free(result); + result = NULL; + } + else + { + /*Codes_SRS_IOTHUBTRANSPORT_17_001: [ IoTHubTransport_Create shall return a non-NULL handle on success.]*/ + result->stopThread = 1; + result->workerThreadHandle = NULL; /* create thread when work needs to be done */ + result->IoTHubTransport_GetHostname = transportProtocol->IoTHubTransport_GetHostname; + result->IoTHubTransport_SetOption = transportProtocol->IoTHubTransport_SetOption; + result->IoTHubTransport_Create = transportProtocol->IoTHubTransport_Create; + result->IoTHubTransport_Destroy = transportProtocol->IoTHubTransport_Destroy; + result->IoTHubTransport_Register = transportProtocol->IoTHubTransport_Register; + result->IoTHubTransport_Unregister = transportProtocol->IoTHubTransport_Unregister; + result->IoTHubTransport_Subscribe = transportProtocol->IoTHubTransport_Subscribe; + result->IoTHubTransport_Unsubscribe = transportProtocol->IoTHubTransport_Unsubscribe; + result->IoTHubTransport_DoWork = transportProtocol->IoTHubTransport_DoWork; + result->IoTHubTransport_SetRetryPolicy = transportProtocol->IoTHubTransport_SetRetryPolicy; + result->IoTHubTransport_GetSendStatus = transportProtocol->IoTHubTransport_GetSendStatus; + } + } + } + } + } + + return result; +} + +static int transport_worker_thread(void* threadArgument) +{ + printf("transport_worker_thread\r\n"); + TRANSPORT_HANDLE_DATA* transportData = (TRANSPORT_HANDLE_DATA*)threadArgument; + + while (1) + { + printf("Acquiring lock\r\n"); + /*Codes_SRS_IOTHUBTRANSPORT_17_030: [ All calls to lower layer transport DoWork shall be protected by the lock created in IoTHubTransport_Create. ]*/ + if (Lock(transportData->lockHandle) == LOCK_OK) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_031: [ If acquiring the lock fails, lower layer transport DoWork shall not be called. ]*/ + printf("stopThread=%d\r\n", transportData->stopThread); + if (transportData->stopThread) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_028: [ The thread shall exit when IoTHubTransport_EndWorkerThread has been called for each clientHandle which invoked IoTHubTransport_StartWorkerThread. ]*/ + (void)Unlock(transportData->lockHandle); + break; + } + else + { + (transportData->IoTHubTransport_DoWork)(transportData->transportLLHandle, NULL); + (void)Unlock(transportData->lockHandle); + } + } + /*Codes_SRS_IOTHUBTRANSPORT_17_029: [ The thread shall call lower layer transport DoWork every 1 ms. ]*/ + ThreadAPI_Sleep(1); + } + + return 0; +} + +static bool find_by_handle(const void* element, const void* value) +{ + /* data stored at element is device handle */ + const IOTHUB_CLIENT_HANDLE * guess = (const IOTHUB_CLIENT_HANDLE *)element; + const IOTHUB_CLIENT_HANDLE match = (const IOTHUB_CLIENT_HANDLE)value; + return (*guess == match); +} + +static IOTHUB_CLIENT_RESULT start_worker_if_needed(TRANSPORT_HANDLE_DATA * transportData, IOTHUB_CLIENT_HANDLE clientHandle) +{ + printf("start_worker_thread_if_needed\r\n"); + IOTHUB_CLIENT_RESULT result; + if (transportData->workerThreadHandle == NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_018: [ If the worker thread does not exist, IoTHubTransport_StartWorkerThread shall start the thread using ThreadAPI_Create. ]*/ + transportData->stopThread = 0; + if (ThreadAPI_Create(&transportData->workerThreadHandle, transport_worker_thread, transportData) != THREADAPI_OK) + { + transportData->workerThreadHandle = NULL; + } + } + if (transportData->workerThreadHandle != NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_020: [ IoTHubTransport_StartWorkerThread shall search for IoTHubClient clientHandle in the list of IoTHubClient handles. ]*/ + bool addToList = ((VECTOR_size(transportData->clients) == 0) || (VECTOR_find_if(transportData->clients, find_by_handle, clientHandle) == NULL)); + if (addToList) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_021: [ If handle is not found, then clientHandle shall be added to the list. ]*/ + if (VECTOR_push_back(transportData->clients, &clientHandle, 1) != 0) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_042: [ If Adding to the client list fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. ]*/ + result = IOTHUB_CLIENT_ERROR; + } + else + { + result = IOTHUB_CLIENT_OK; + } + } + else + { + result = IOTHUB_CLIENT_OK; + } + } + else + { + result = IOTHUB_CLIENT_ERROR; + } + return result; +} + +static void stop_worker_thread(TRANSPORT_HANDLE_DATA * transportData) +{ + printf("stop_worker_thread\r\n"); + /*Codes_SRS_IOTHUBTRANSPORT_17_043: [** IoTHubTransport_SignalEndWorkerThread shall signal the worker thread to end.*/ + transportData->stopThread = 1; +} + +static void wait_worker_thread(TRANSPORT_HANDLE_DATA * transportData) +{ + printf("wait_worker_thread\r\n"); + if (transportData->workerThreadHandle != NULL) + { + int res; + /*Codes_SRS_IOTHUBTRANSPORT_17_027: [ If handle list is empty, IoTHubTransport_EndWorkerThread shall be joined. ]*/ + if (ThreadAPI_Join(transportData->workerThreadHandle, &res) != THREADAPI_OK) + { + LogError("ThreadAPI_Join failed"); + } + else + { + transportData->workerThreadHandle = NULL; + } + } +} + +static bool signal_end_worker_thread(TRANSPORT_HANDLE_DATA * transportData, IOTHUB_CLIENT_HANDLE clientHandle) +{ + printf("signal_end_worker_thread\r\n"); + bool okToJoin; + void* element = VECTOR_find_if(transportData->clients, find_by_handle, clientHandle); + if (element != NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_026: [ IoTHubTransport_EndWorkerThread shall remove clientHandlehandle from handle list. ]*/ + VECTOR_erase(transportData->clients, element, 1); + } + /*Codes_SRS_IOTHUBTRANSPORT_17_025: [ If the worker thread does not exist, then IoTHubTransport_EndWorkerThread shall return. ]*/ + if (transportData->workerThreadHandle != NULL) + { + if (VECTOR_size(transportData->clients) == 0) + { + stop_worker_thread(transportData); + okToJoin = true; + } + else + { + okToJoin = false; + } + } + else + { + okToJoin = false; + } + return okToJoin; +} + +void IoTHubTransport_Destroy(TRANSPORT_HANDLE transportHandle) +{ + /*Codes_SRS_IOTHUBTRANSPORT_17_011: [ IoTHubTransport_Destroy shall do nothing if transportHandle is NULL. ]*/ + if (transportHandle != NULL) + { + TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; + /*Codes_SRS_IOTHUBTRANSPORT_17_033: [ IoTHubTransport_Destroy shall lock the transport lock. ]*/ + if (Lock(transportData->lockHandle) != LOCK_OK) + { + LogError("Unable to lock - will still attempt to end thread without thread safety"); + stop_worker_thread(transportData); + } + else + { + stop_worker_thread(transportData); + (void)Unlock(transportData->lockHandle); + } + wait_worker_thread(transportData); + /*Codes_SRS_IOTHUBTRANSPORT_17_010: [ IoTHubTransport_Destroy shall free all resources. ]*/ + Lock_Deinit(transportData->lockHandle); + (transportData->IoTHubTransport_Destroy)(transportData->transportLLHandle); + VECTOR_destroy(transportData->clients); + free(transportHandle); + } +} + +LOCK_HANDLE IoTHubTransport_GetLock(TRANSPORT_HANDLE transportHandle) +{ + LOCK_HANDLE lock; + if (transportHandle == NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_013: [ If transportHandle is NULL, IoTHubTransport_GetLock shall return NULL. ]*/ + lock = NULL; + } + else + { + /*Codes_SRS_IOTHUBTRANSPORT_17_012: [ IoTHubTransport_GetLock shall return a handle to the transport lock. ]*/ + TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; + lock = transportData->lockHandle; + } + return lock; +} + +TRANSPORT_LL_HANDLE IoTHubTransport_GetLLTransport(TRANSPORT_HANDLE transportHandle) +{ + TRANSPORT_LL_HANDLE llTransport; + if (transportHandle == NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_015: [ If transportHandle is NULL, IoTHubTransport_GetLLTransport shall return NULL. ]*/ + llTransport = NULL; + } + else + { + /*Codes_SRS_IOTHUBTRANSPORT_17_014: [ IoTHubTransport_GetLLTransport shall return a handle to the lower layer transport. ]*/ + TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; + llTransport = transportData->transportLLHandle; + } + return llTransport; +} + +IOTHUB_CLIENT_RESULT IoTHubTransport_StartWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_HANDLE clientHandle) +{ + printf("In IoTHubTransport_StartWorkerThread\r\n"); + IOTHUB_CLIENT_RESULT result; + if (transportHandle == NULL || clientHandle == NULL) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_016: [ If transportHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/ + /*Codes_SRS_IOTHUBTRANSPORT_17_017: [ If clientHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/ + result = IOTHUB_CLIENT_INVALID_ARG; + } + else + { + TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; + + if ((result = start_worker_if_needed(transportData, clientHandle)) != IOTHUB_CLIENT_OK) + { + /*Codes_SRS_IOTHUBTRANSPORT_17_019: [ If thread creation fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. */ + LogError("Unable to start thread safely"); + } + else + { + /*Codes_SRS_IOTHUBTRANSPORT_17_022: [ Upon success, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_OK. ]*/ + result = IOTHUB_CLIENT_OK; + } + } + return result; +} + +bool IoTHubTransport_SignalEndWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_HANDLE clientHandle) +{ + bool okToJoin; + /*Codes_SRS_IOTHUBTRANSPORT_17_023: [ If transportHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/ + /*Codes_SRS_IOTHUBTRANSPORT_17_024: [ If clientHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/ + if (!(transportHandle == NULL || clientHandle == NULL)) + { + TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; + okToJoin = signal_end_worker_thread(transportData, clientHandle); + } + else + { + okToJoin = false; + } + return okToJoin; +} + +void IoTHubTransport_JoinWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_HANDLE clientHandle) +{ + /*Codes_SRS_IOTHUBTRANSPORT_17_044: [ If transportHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/ + /*Codes_SRS_IOTHUBTRANSPORT_17_045: [ If clientHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/ + if (!(transportHandle == NULL || clientHandle == NULL)) + { + TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle; + /*Codes_SRS_IOTHUBTRANSPORT_17_027: [ The worker thread shall be joined. ]*/ + wait_worker_thread(transportData); + } +}