Microsoft Azure IoTHub client libraries
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp f767zi_mqtt ... more
This library implements the Microsoft Azure IoTHub client library. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothub_client.c
- Revision:
- 61:8b85a4e797cf
- Parent:
- 60:41648c4e7036
- Child:
- 62:5a4cdacf5090
--- a/iothub_client.c Fri Feb 24 14:00:43 2017 -0800 +++ b/iothub_client.c Fri Mar 10 11:47:36 2017 -0800 @@ -11,6 +11,7 @@ #include "azure_c_shared_utility/crt_abstractions.h" #include "iothub_client.h" #include "iothub_client_ll.h" +#include "iothub_client_private.h" #include "iothubtransport.h" #include "azure_c_shared_utility/threadapi.h" #include "azure_c_shared_utility/lock.h" @@ -37,8 +38,10 @@ IOTHUB_CLIENT_REPORTED_STATE_CALLBACK reported_state_callback; IOTHUB_CLIENT_CONNECTION_STATUS_CALLBACK connection_status_callback; IOTHUB_CLIENT_INBOUND_DEVICE_METHOD_CALLBACK device_method_callback; + IOTHUB_CLIENT_MESSAGE_CALLBACK_ASYNC message_callback; struct IOTHUB_QUEUE_CONTEXT_TAG* devicetwin_user_context; struct IOTHUB_QUEUE_CONTEXT_TAG* connection_status_user_context; + struct IOTHUB_QUEUE_CONTEXT_TAG* message_user_context; } IOTHUB_CLIENT_INSTANCE; #ifndef DONT_USE_UPLOADTOBLOB @@ -61,7 +64,8 @@ CALLBACK_TYPE_EVENT_CONFIRM, \ CALLBACK_TYPE_REPORTED_STATE, \ CALLBACK_TYPE_CONNECTION_STATUS, \ - CALLBACK_TYPE_DEVICE_METHOD + CALLBACK_TYPE_DEVICE_METHOD, \ + CALLBACK_TYPE_MESSAGE DEFINE_ENUM(USER_CALLBACK_TYPE, USER_CALLBACK_TYPE_VALUES) DEFINE_ENUM_STRINGS(USER_CALLBACK_TYPE, USER_CALLBACK_TYPE_VALUES) @@ -107,6 +111,7 @@ REPORTED_STATE_CALLBACK_INFO reported_state_cb_info; CONNECTION_STATUS_CALLBACK_INFO connection_status_cb_info; METHOD_CALLBACK_INFO method_cb_info; + MESSAGE_CALLBACK_INFO* message_cb_info; } iothub_callback; } USER_CALLBACK_INFO; @@ -168,11 +173,32 @@ } #endif -static IOTHUBMESSAGE_DISPOSITION_RESULT iothub_ll_message_callback(IOTHUB_MESSAGE_HANDLE message, void* userContextCallback) +static bool iothub_ll_message_callback(MESSAGE_CALLBACK_INFO* messageData, void* userContextCallback) { - (void)message; - (void)userContextCallback; - return IOTHUBMESSAGE_ABANDONED; + bool result; + IOTHUB_QUEUE_CONTEXT* queue_context = (IOTHUB_QUEUE_CONTEXT*)userContextCallback; + if (queue_context == NULL) + { + LogError("invalid parameter userContextCallback(NULL)"); + result = false; + } + else + { + USER_CALLBACK_INFO queue_cb_info; + queue_cb_info.type = CALLBACK_TYPE_MESSAGE; + queue_cb_info.userContextCallback = queue_context->userContextCallback; + queue_cb_info.iothub_callback.message_cb_info = messageData; + if (VECTOR_push_back(queue_context->iotHubClientHandle->saved_user_callback_list, &queue_cb_info, 1) == 0) + { + result = true; + } + else + { + LogError("message callback vector push failed."); + result = false; + } + } + return result; } static int iothub_ll_inbound_device_method_callback(const char* method_name, const unsigned char* payload, size_t size, METHOD_HANDLE method_id, void* userContextCallback) @@ -326,96 +352,89 @@ } } -static void dispatch_user_callbacks(IOTHUB_CLIENT_INSTANCE* iotHubClientInstance) +static void dispatch_user_callbacks(IOTHUB_CLIENT_INSTANCE* iotHubClientInstance, VECTOR_HANDLE call_backs) { - if (Lock(iotHubClientInstance->LockHandle) == LOCK_OK) + size_t callbacks_length = VECTOR_size(call_backs); + size_t index; + for (index = 0; index < callbacks_length; index++) { - size_t callbacks_length = VECTOR_size(iotHubClientInstance->saved_user_callback_list); - size_t index = 0; - for (index = 0; index < callbacks_length; index++) + USER_CALLBACK_INFO* queued_cb = (USER_CALLBACK_INFO*)VECTOR_element(call_backs, index); + if (queued_cb == NULL) + { + LogError("VECTOR_element at index %zd is NULL.", index); + } + else { - USER_CALLBACK_INFO* queued_cb = (USER_CALLBACK_INFO*)VECTOR_element(iotHubClientInstance->saved_user_callback_list, index); - if (queued_cb != NULL) + switch (queued_cb->type) { - switch (queued_cb->type) - { - case CALLBACK_TYPE_DEVICE_TWIN: - if (iotHubClientInstance->desired_state_callback) + case CALLBACK_TYPE_DEVICE_TWIN: + if (iotHubClientInstance->desired_state_callback) + { + iotHubClientInstance->desired_state_callback(queued_cb->iothub_callback.dev_twin_cb_info.update_state, queued_cb->iothub_callback.dev_twin_cb_info.payLoad, queued_cb->iothub_callback.dev_twin_cb_info.size, queued_cb->userContextCallback); + } + if (queued_cb->iothub_callback.dev_twin_cb_info.payLoad) + { + free(queued_cb->iothub_callback.dev_twin_cb_info.payLoad); + } + break; + case CALLBACK_TYPE_EVENT_CONFIRM: + if (iotHubClientInstance->event_confirm_callback) + { + iotHubClientInstance->event_confirm_callback(queued_cb->iothub_callback.event_confirm_cb_info.confirm_result, queued_cb->userContextCallback); + } + break; + case CALLBACK_TYPE_REPORTED_STATE: + if (iotHubClientInstance->reported_state_callback) + { + iotHubClientInstance->reported_state_callback(queued_cb->iothub_callback.reported_state_cb_info.status_code, queued_cb->userContextCallback); + } + break; + case CALLBACK_TYPE_CONNECTION_STATUS: + if (iotHubClientInstance->connection_status_callback) + { + iotHubClientInstance->connection_status_callback(queued_cb->iothub_callback.connection_status_cb_info.connection_status, queued_cb->iothub_callback.connection_status_cb_info.status_reason, queued_cb->userContextCallback); + } + break; + case CALLBACK_TYPE_DEVICE_METHOD: + if (iotHubClientInstance->device_method_callback) + { + const char* method_name = STRING_c_str(queued_cb->iothub_callback.method_cb_info.method_name); + const unsigned char* payload = BUFFER_u_char(queued_cb->iothub_callback.method_cb_info.payload); + size_t payload_len = BUFFER_length(queued_cb->iothub_callback.method_cb_info.payload); + iotHubClientInstance->device_method_callback(method_name, payload, payload_len, queued_cb->iothub_callback.method_cb_info.method_id, queued_cb->userContextCallback); + + BUFFER_delete(queued_cb->iothub_callback.method_cb_info.payload); + STRING_delete(queued_cb->iothub_callback.method_cb_info.method_name); + } + break; + case CALLBACK_TYPE_MESSAGE: + if (iotHubClientInstance->message_callback) + { + IOTHUBMESSAGE_DISPOSITION_RESULT disposition = iotHubClientInstance->message_callback(queued_cb->iothub_callback.message_cb_info->messageHandle, queued_cb->userContextCallback); + IOTHUB_CLIENT_HANDLE handle = iotHubClientInstance->message_user_context->iotHubClientHandle; + + if (Lock(handle->LockHandle) == LOCK_OK) { - (void)Unlock(iotHubClientInstance->LockHandle); - iotHubClientInstance->desired_state_callback(queued_cb->iothub_callback.dev_twin_cb_info.update_state, queued_cb->iothub_callback.dev_twin_cb_info.payLoad, queued_cb->iothub_callback.dev_twin_cb_info.size, queued_cb->userContextCallback); - if (Lock(iotHubClientInstance->LockHandle) != LOCK_OK) + IOTHUB_CLIENT_RESULT result = IoTHubClient_LL_SendMessageDisposition(handle->IoTHubClientLLHandle, queued_cb->iothub_callback.message_cb_info, disposition); + if (result != IOTHUB_CLIENT_OK) { - LogError("Failed locking desired callback"); + LogError("IoTHubClient_LL_Send_Message_Disposition failed"); } - } - if (queued_cb->iothub_callback.dev_twin_cb_info.payLoad) - { - free(queued_cb->iothub_callback.dev_twin_cb_info.payLoad); - } - break; - case CALLBACK_TYPE_EVENT_CONFIRM: - if (iotHubClientInstance->event_confirm_callback) - { - (void)Unlock(iotHubClientInstance->LockHandle); - iotHubClientInstance->event_confirm_callback(queued_cb->iothub_callback.event_confirm_cb_info.confirm_result, queued_cb->userContextCallback); - if (Lock(iotHubClientInstance->LockHandle) != LOCK_OK) - { - LogError("Failed locking after event confirm callback"); - } + (void)Unlock(handle->LockHandle); } - break; - case CALLBACK_TYPE_REPORTED_STATE: - if (iotHubClientInstance->reported_state_callback) + else { - (void)Unlock(iotHubClientInstance->LockHandle); - iotHubClientInstance->reported_state_callback(queued_cb->iothub_callback.reported_state_cb_info.status_code, queued_cb->userContextCallback); - if (Lock(iotHubClientInstance->LockHandle) != LOCK_OK) - { - LogError("Failed locking after reported state callback"); - } - } - break; - case CALLBACK_TYPE_CONNECTION_STATUS: - if (iotHubClientInstance->connection_status_callback) - { - (void)Unlock(iotHubClientInstance->LockHandle); - iotHubClientInstance->connection_status_callback(queued_cb->iothub_callback.connection_status_cb_info.connection_status, queued_cb->iothub_callback.connection_status_cb_info.status_reason, queued_cb->userContextCallback); - if (Lock(iotHubClientInstance->LockHandle) != LOCK_OK) - { - LogError("Failed locking after connection status callback"); - } + LogError("Lock failed"); } - break; - case CALLBACK_TYPE_DEVICE_METHOD: - if (iotHubClientInstance->device_method_callback) - { - (void)Unlock(iotHubClientInstance->LockHandle); - const char* method_name = STRING_c_str(queued_cb->iothub_callback.method_cb_info.method_name); - const unsigned char* payload = BUFFER_u_char(queued_cb->iothub_callback.method_cb_info.payload); - size_t payload_len = BUFFER_length(queued_cb->iothub_callback.method_cb_info.payload); - iotHubClientInstance->device_method_callback(method_name, payload, payload_len, queued_cb->iothub_callback.method_cb_info.method_id, queued_cb->userContextCallback); - if (Lock(iotHubClientInstance->LockHandle) != LOCK_OK) - { - LogError("Failed locking after connection status callback"); - } - BUFFER_delete(queued_cb->iothub_callback.method_cb_info.payload); - STRING_delete(queued_cb->iothub_callback.method_cb_info.method_name); - } - break; - default: - LogError("Invalid callback type '%s'", ENUM_TO_STRING(USER_CALLBACK_TYPE, queued_cb->type)); - break; - } + } + break; + default: + LogError("Invalid callback type '%s'", ENUM_TO_STRING(USER_CALLBACK_TYPE, queued_cb->type)); + break; } } - VECTOR_clear(iotHubClientInstance->saved_user_callback_list); - (void)Unlock(iotHubClientInstance->LockHandle); } - else - { - LogError("Unable to aquire lock"); - } + VECTOR_destroy(call_backs); } static int ScheduleWork_Thread(void* threadArgument) @@ -441,7 +460,16 @@ #ifndef DONT_USE_UPLOADTOBLOB garbageCollectorImpl(iotHubClientInstance); #endif + VECTOR_HANDLE call_backs = VECTOR_move(iotHubClientInstance->saved_user_callback_list); (void)Unlock(iotHubClientInstance->LockHandle); + if (call_backs == NULL) + { + LogError("VECTOR_move failed"); + } + else + { + dispatch_user_callbacks(iotHubClientInstance, call_backs); + } } } else @@ -449,7 +477,6 @@ /*Codes_SRS_IOTHUBCLIENT_01_040: [If acquiring the lock fails, IoTHubClient_LL_DoWork shall not be called.]*/ /*no code, shall retry*/ } - dispatch_user_callbacks(iotHubClientInstance); (void)ThreadAPI_Sleep(1); } @@ -466,6 +493,7 @@ iotHubClientInstance->StopThread = 0; if (ThreadAPI_Create(&iotHubClientInstance->ThreadHandle, ScheduleWork_Thread, iotHubClientInstance) != THREADAPI_OK) { + LogError("ThreadAPI_Create failed"); iotHubClientInstance->ThreadHandle = NULL; result = IOTHUB_CLIENT_ERROR; } @@ -626,6 +654,8 @@ result->devicetwin_user_context = NULL; result->connection_status_callback = NULL; result->connection_status_user_context = NULL; + result->message_callback = NULL; + result->message_user_context = NULL; } } } @@ -806,6 +836,10 @@ { free(iotHubClientInstance->connection_status_user_context); } + if (iotHubClientInstance->message_user_context != NULL) + { + free(iotHubClientInstance->message_user_context); + } free(iotHubClientInstance); } } @@ -941,6 +975,11 @@ } else { + if (iotHubClientInstance->created_with_transport_handle == 0) + { + iotHubClientInstance->message_callback = messageCallback; + } + /* Codes_SRS_IOTHUBCLIENT_01_014: [IoTHubClient_SetMessageCallback shall start the worker thread if it was not previously started.] */ if ((result = StartWorkerThreadIfNeeded(iotHubClientInstance)) != IOTHUB_CLIENT_OK) { @@ -950,8 +989,42 @@ } else { - /* Codes_SRS_IOTHUBCLIENT_01_017: [IoTHubClient_SetMessageCallback shall call IoTHubClient_LL_SetMessageCallback, while passing the IoTHubClient_LL handle created by IoTHubClient_Create and the parameters messageCallback and userContextCallback.] */ - result = IoTHubClient_LL_SetMessageCallback(iotHubClientInstance->IoTHubClientLLHandle, messageCallback, userContextCallback); + if (iotHubClientInstance->message_user_context != NULL) + { + free(iotHubClientInstance->message_user_context); + } + if (messageCallback == NULL) + { + result = IoTHubClient_LL_SetMessageCallbackEx(iotHubClientInstance->IoTHubClientLLHandle, NULL, iotHubClientInstance->message_user_context); + } + else if (iotHubClientInstance->created_with_transport_handle != 0) + { + result = IoTHubClient_LL_SetMessageCallback(iotHubClientInstance->IoTHubClientLLHandle, messageCallback, userContextCallback); + } + else + { + iotHubClientInstance->message_user_context = (IOTHUB_QUEUE_CONTEXT*)malloc(sizeof(IOTHUB_QUEUE_CONTEXT)); + if (iotHubClientInstance->message_user_context == NULL) + { + result = IOTHUB_CLIENT_ERROR; + LogError("Failed allocating QUEUE_CONTEXT"); + } + else + { + iotHubClientInstance->message_user_context->iotHubClientHandle = iotHubClientHandle; + iotHubClientInstance->message_user_context->userContextCallback = userContextCallback; + + /* Codes_SRS_IOTHUBCLIENT_01_017: [IoTHubClient_SetMessageCallback shall call IoTHubClient_LL_SetMessageCallbackEx, while passing the IoTHubClient_LL handle created by IoTHubClient_Create and the local iothub_ll_message_callback wrapper of messageCallback and userContextCallback.] */ + /* Codes_SRS_IOTHUBCLIENT_01_018: [When IoTHubClient_LL_SetMessageCallbackEx is called, IoTHubClient_SetMessageCallback shall return the result of IoTHubClient_LL_SetMessageCallbackEx.] */ + result = IoTHubClient_LL_SetMessageCallbackEx(iotHubClientInstance->IoTHubClientLLHandle, iothub_ll_message_callback, iotHubClientInstance->message_user_context); + if (result != IOTHUB_CLIENT_OK) + { + LogError("IoTHubClient_LL_SetMessageCallback failed"); + free(iotHubClientInstance->message_user_context); + iotHubClientInstance->message_user_context = NULL; + } + } + } } /* Codes_SRS_IOTHUBCLIENT_01_027: [IoTHubClient_SetMessageCallback shall be made thread-safe by using the lock created in IoTHubClient_Create.] */ @@ -1026,6 +1099,7 @@ { LogError("IoTHubClient_LL_SetConnectionStatusCallback failed"); free(iotHubClientInstance->connection_status_user_context); + iotHubClientInstance->connection_status_user_context = NULL; } } } @@ -1263,6 +1337,7 @@ { LogError("IoTHubClient_LL_SetDeviceTwinCallback failed"); free(iotHubClientInstance->devicetwin_user_context); + iotHubClientInstance->devicetwin_user_context = NULL; } } }