Microsoft Azure IoTHub client libraries

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp f767zi_mqtt ... more

This library implements the Microsoft Azure IoTHub client library. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
61:8b85a4e797cf
Parent:
60:41648c4e7036
Child:
62:5a4cdacf5090
--- a/iothub_client.c	Fri Feb 24 14:00:43 2017 -0800
+++ b/iothub_client.c	Fri Mar 10 11:47:36 2017 -0800
@@ -11,6 +11,7 @@
 #include "azure_c_shared_utility/crt_abstractions.h"
 #include "iothub_client.h"
 #include "iothub_client_ll.h"
+#include "iothub_client_private.h"
 #include "iothubtransport.h"
 #include "azure_c_shared_utility/threadapi.h"
 #include "azure_c_shared_utility/lock.h"
@@ -37,8 +38,10 @@
     IOTHUB_CLIENT_REPORTED_STATE_CALLBACK reported_state_callback;
     IOTHUB_CLIENT_CONNECTION_STATUS_CALLBACK connection_status_callback;
     IOTHUB_CLIENT_INBOUND_DEVICE_METHOD_CALLBACK device_method_callback;
+    IOTHUB_CLIENT_MESSAGE_CALLBACK_ASYNC message_callback;
     struct IOTHUB_QUEUE_CONTEXT_TAG* devicetwin_user_context;
     struct IOTHUB_QUEUE_CONTEXT_TAG* connection_status_user_context;
+    struct IOTHUB_QUEUE_CONTEXT_TAG* message_user_context;
 } IOTHUB_CLIENT_INSTANCE;
 
 #ifndef DONT_USE_UPLOADTOBLOB
@@ -61,7 +64,8 @@
     CALLBACK_TYPE_EVENT_CONFIRM,    \
     CALLBACK_TYPE_REPORTED_STATE,   \
     CALLBACK_TYPE_CONNECTION_STATUS, \
-    CALLBACK_TYPE_DEVICE_METHOD
+    CALLBACK_TYPE_DEVICE_METHOD,     \
+    CALLBACK_TYPE_MESSAGE
 
 DEFINE_ENUM(USER_CALLBACK_TYPE, USER_CALLBACK_TYPE_VALUES)
 DEFINE_ENUM_STRINGS(USER_CALLBACK_TYPE, USER_CALLBACK_TYPE_VALUES)
@@ -107,6 +111,7 @@
         REPORTED_STATE_CALLBACK_INFO reported_state_cb_info;
         CONNECTION_STATUS_CALLBACK_INFO connection_status_cb_info;
         METHOD_CALLBACK_INFO method_cb_info;
+        MESSAGE_CALLBACK_INFO* message_cb_info;
     } iothub_callback;
 } USER_CALLBACK_INFO;
 
@@ -168,11 +173,32 @@
 }
 #endif
 
-static IOTHUBMESSAGE_DISPOSITION_RESULT iothub_ll_message_callback(IOTHUB_MESSAGE_HANDLE message, void* userContextCallback)
+static bool iothub_ll_message_callback(MESSAGE_CALLBACK_INFO* messageData, void* userContextCallback)
 {
-    (void)message;
-    (void)userContextCallback;
-    return IOTHUBMESSAGE_ABANDONED;
+    bool result;
+    IOTHUB_QUEUE_CONTEXT* queue_context = (IOTHUB_QUEUE_CONTEXT*)userContextCallback;
+    if (queue_context == NULL)
+    {
+        LogError("invalid parameter userContextCallback(NULL)");
+        result = false;
+    }
+    else
+    {
+        USER_CALLBACK_INFO queue_cb_info;
+        queue_cb_info.type = CALLBACK_TYPE_MESSAGE;
+        queue_cb_info.userContextCallback = queue_context->userContextCallback;
+        queue_cb_info.iothub_callback.message_cb_info = messageData;
+        if (VECTOR_push_back(queue_context->iotHubClientHandle->saved_user_callback_list, &queue_cb_info, 1) == 0)
+        {
+            result = true;
+        }
+        else
+        {
+            LogError("message callback vector push failed.");
+            result = false;
+        }
+    }
+    return result;
 }
 
 static int iothub_ll_inbound_device_method_callback(const char* method_name, const unsigned char* payload, size_t size, METHOD_HANDLE method_id, void* userContextCallback)
@@ -326,96 +352,89 @@
     }
 }
 
-static void dispatch_user_callbacks(IOTHUB_CLIENT_INSTANCE* iotHubClientInstance)
+static void dispatch_user_callbacks(IOTHUB_CLIENT_INSTANCE* iotHubClientInstance, VECTOR_HANDLE call_backs)
 {
-    if (Lock(iotHubClientInstance->LockHandle) == LOCK_OK)
+    size_t callbacks_length = VECTOR_size(call_backs);
+    size_t index;
+    for (index = 0; index < callbacks_length; index++)
     {
-        size_t callbacks_length = VECTOR_size(iotHubClientInstance->saved_user_callback_list);
-        size_t index = 0;
-        for (index = 0; index < callbacks_length; index++)
+        USER_CALLBACK_INFO* queued_cb = (USER_CALLBACK_INFO*)VECTOR_element(call_backs, index);
+        if (queued_cb == NULL)
+        {
+            LogError("VECTOR_element at index %zd is NULL.", index);
+        }
+        else
         {
-            USER_CALLBACK_INFO* queued_cb = (USER_CALLBACK_INFO*)VECTOR_element(iotHubClientInstance->saved_user_callback_list, index);
-            if (queued_cb != NULL)
+            switch (queued_cb->type)
             {
-                switch (queued_cb->type)
-                {
-                    case CALLBACK_TYPE_DEVICE_TWIN:
-                        if (iotHubClientInstance->desired_state_callback)
+                case CALLBACK_TYPE_DEVICE_TWIN:
+                    if (iotHubClientInstance->desired_state_callback)
+                    {
+                        iotHubClientInstance->desired_state_callback(queued_cb->iothub_callback.dev_twin_cb_info.update_state, queued_cb->iothub_callback.dev_twin_cb_info.payLoad, queued_cb->iothub_callback.dev_twin_cb_info.size, queued_cb->userContextCallback);
+                    }
+                    if (queued_cb->iothub_callback.dev_twin_cb_info.payLoad)
+                    {
+                        free(queued_cb->iothub_callback.dev_twin_cb_info.payLoad);
+                    }
+                    break;
+                case CALLBACK_TYPE_EVENT_CONFIRM:
+                    if (iotHubClientInstance->event_confirm_callback)
+                    {
+                        iotHubClientInstance->event_confirm_callback(queued_cb->iothub_callback.event_confirm_cb_info.confirm_result, queued_cb->userContextCallback);
+                    }
+                    break;
+                case CALLBACK_TYPE_REPORTED_STATE:
+                    if (iotHubClientInstance->reported_state_callback)
+                    {
+                        iotHubClientInstance->reported_state_callback(queued_cb->iothub_callback.reported_state_cb_info.status_code, queued_cb->userContextCallback);
+                    }
+                    break;
+                case CALLBACK_TYPE_CONNECTION_STATUS:
+                    if (iotHubClientInstance->connection_status_callback)
+                    {
+                        iotHubClientInstance->connection_status_callback(queued_cb->iothub_callback.connection_status_cb_info.connection_status, queued_cb->iothub_callback.connection_status_cb_info.status_reason, queued_cb->userContextCallback);
+                    }
+                    break;
+                case CALLBACK_TYPE_DEVICE_METHOD:
+                    if (iotHubClientInstance->device_method_callback)
+                    {
+                        const char* method_name = STRING_c_str(queued_cb->iothub_callback.method_cb_info.method_name);
+                        const unsigned char* payload = BUFFER_u_char(queued_cb->iothub_callback.method_cb_info.payload);
+                        size_t payload_len = BUFFER_length(queued_cb->iothub_callback.method_cb_info.payload);
+                        iotHubClientInstance->device_method_callback(method_name, payload, payload_len, queued_cb->iothub_callback.method_cb_info.method_id, queued_cb->userContextCallback);
+
+                        BUFFER_delete(queued_cb->iothub_callback.method_cb_info.payload);
+                        STRING_delete(queued_cb->iothub_callback.method_cb_info.method_name);
+                    }
+                    break;
+                case CALLBACK_TYPE_MESSAGE:
+                    if (iotHubClientInstance->message_callback)
+                    {
+                        IOTHUBMESSAGE_DISPOSITION_RESULT disposition = iotHubClientInstance->message_callback(queued_cb->iothub_callback.message_cb_info->messageHandle, queued_cb->userContextCallback);
+                        IOTHUB_CLIENT_HANDLE handle = iotHubClientInstance->message_user_context->iotHubClientHandle;
+
+                        if (Lock(handle->LockHandle) == LOCK_OK)
                         {
-                            (void)Unlock(iotHubClientInstance->LockHandle);
-                            iotHubClientInstance->desired_state_callback(queued_cb->iothub_callback.dev_twin_cb_info.update_state, queued_cb->iothub_callback.dev_twin_cb_info.payLoad, queued_cb->iothub_callback.dev_twin_cb_info.size, queued_cb->userContextCallback);
-                            if (Lock(iotHubClientInstance->LockHandle) != LOCK_OK)
+                            IOTHUB_CLIENT_RESULT result = IoTHubClient_LL_SendMessageDisposition(handle->IoTHubClientLLHandle, queued_cb->iothub_callback.message_cb_info, disposition);
+                            if (result != IOTHUB_CLIENT_OK)
                             {
-                                LogError("Failed locking desired callback");
+                                LogError("IoTHubClient_LL_Send_Message_Disposition failed");
                             }
-                        }
-                        if (queued_cb->iothub_callback.dev_twin_cb_info.payLoad)
-                        {
-                            free(queued_cb->iothub_callback.dev_twin_cb_info.payLoad);
-                        }
-                        break;
-                    case CALLBACK_TYPE_EVENT_CONFIRM:
-                        if (iotHubClientInstance->event_confirm_callback)
-                        {
-                            (void)Unlock(iotHubClientInstance->LockHandle);
-                            iotHubClientInstance->event_confirm_callback(queued_cb->iothub_callback.event_confirm_cb_info.confirm_result, queued_cb->userContextCallback);
-                            if (Lock(iotHubClientInstance->LockHandle) != LOCK_OK)
-                            {
-                                LogError("Failed locking after event confirm callback");
-                            }
+                            (void)Unlock(handle->LockHandle);
                         }
-                        break;
-                    case CALLBACK_TYPE_REPORTED_STATE:
-                        if (iotHubClientInstance->reported_state_callback)
+                        else
                         {
-                            (void)Unlock(iotHubClientInstance->LockHandle);
-                            iotHubClientInstance->reported_state_callback(queued_cb->iothub_callback.reported_state_cb_info.status_code, queued_cb->userContextCallback);
-                            if (Lock(iotHubClientInstance->LockHandle) != LOCK_OK)
-                            {
-                                LogError("Failed locking after reported state callback");
-                            }
-                        }
-                        break;
-                    case CALLBACK_TYPE_CONNECTION_STATUS:
-                        if (iotHubClientInstance->connection_status_callback)
-                        {
-                            (void)Unlock(iotHubClientInstance->LockHandle);
-                            iotHubClientInstance->connection_status_callback(queued_cb->iothub_callback.connection_status_cb_info.connection_status, queued_cb->iothub_callback.connection_status_cb_info.status_reason, queued_cb->userContextCallback);
-                            if (Lock(iotHubClientInstance->LockHandle) != LOCK_OK)
-                            {
-                                LogError("Failed locking after connection status callback");
-                            }
+                            LogError("Lock failed");
                         }
-                        break;
-                    case CALLBACK_TYPE_DEVICE_METHOD:
-                        if (iotHubClientInstance->device_method_callback)
-                        {
-                            (void)Unlock(iotHubClientInstance->LockHandle);
-                            const char* method_name = STRING_c_str(queued_cb->iothub_callback.method_cb_info.method_name);
-                            const unsigned char* payload = BUFFER_u_char(queued_cb->iothub_callback.method_cb_info.payload);
-                            size_t payload_len = BUFFER_length(queued_cb->iothub_callback.method_cb_info.payload);
-                            iotHubClientInstance->device_method_callback(method_name, payload, payload_len, queued_cb->iothub_callback.method_cb_info.method_id, queued_cb->userContextCallback);
-                            if (Lock(iotHubClientInstance->LockHandle) != LOCK_OK)
-                            {
-                                LogError("Failed locking after connection status callback");
-                            }
-                            BUFFER_delete(queued_cb->iothub_callback.method_cb_info.payload);
-                            STRING_delete(queued_cb->iothub_callback.method_cb_info.method_name);
-                        }
-                        break;
-                    default:
-                        LogError("Invalid callback type '%s'", ENUM_TO_STRING(USER_CALLBACK_TYPE, queued_cb->type));
-                        break;
-                }
+                    }
+                    break;
+                default:
+                    LogError("Invalid callback type '%s'", ENUM_TO_STRING(USER_CALLBACK_TYPE, queued_cb->type));
+                    break;
             }
         }
-        VECTOR_clear(iotHubClientInstance->saved_user_callback_list);
-        (void)Unlock(iotHubClientInstance->LockHandle);
     }
-    else
-    {
-        LogError("Unable to aquire lock");
-    }
+    VECTOR_destroy(call_backs);
 }
 
 static int ScheduleWork_Thread(void* threadArgument)
@@ -441,7 +460,16 @@
 #ifndef DONT_USE_UPLOADTOBLOB
                 garbageCollectorImpl(iotHubClientInstance);
 #endif
+                VECTOR_HANDLE call_backs = VECTOR_move(iotHubClientInstance->saved_user_callback_list);
                 (void)Unlock(iotHubClientInstance->LockHandle);
+                if (call_backs == NULL)
+                {
+                    LogError("VECTOR_move failed");
+                }
+                else
+                {
+                    dispatch_user_callbacks(iotHubClientInstance, call_backs);
+                }
             }
         }
         else
@@ -449,7 +477,6 @@
             /*Codes_SRS_IOTHUBCLIENT_01_040: [If acquiring the lock fails, IoTHubClient_LL_DoWork shall not be called.]*/
             /*no code, shall retry*/
         }
-        dispatch_user_callbacks(iotHubClientInstance);
         (void)ThreadAPI_Sleep(1);
     }
 
@@ -466,6 +493,7 @@
             iotHubClientInstance->StopThread = 0;
             if (ThreadAPI_Create(&iotHubClientInstance->ThreadHandle, ScheduleWork_Thread, iotHubClientInstance) != THREADAPI_OK)
             {
+                LogError("ThreadAPI_Create failed");
                 iotHubClientInstance->ThreadHandle = NULL;
                 result = IOTHUB_CLIENT_ERROR;
             }
@@ -626,6 +654,8 @@
                     result->devicetwin_user_context = NULL;
                     result->connection_status_callback = NULL;
                     result->connection_status_user_context = NULL;
+                    result->message_callback = NULL;
+                    result->message_user_context = NULL;
                 }
             }
         }
@@ -806,6 +836,10 @@
         {
             free(iotHubClientInstance->connection_status_user_context);
         }
+        if (iotHubClientInstance->message_user_context != NULL)
+        {
+            free(iotHubClientInstance->message_user_context);
+        }
         free(iotHubClientInstance);
     }
 }
@@ -941,6 +975,11 @@
         }
         else
         {
+            if (iotHubClientInstance->created_with_transport_handle == 0)
+            {
+                iotHubClientInstance->message_callback = messageCallback;
+            }
+
             /* Codes_SRS_IOTHUBCLIENT_01_014: [IoTHubClient_SetMessageCallback shall start the worker thread if it was not previously started.] */
             if ((result = StartWorkerThreadIfNeeded(iotHubClientInstance)) != IOTHUB_CLIENT_OK)
             {
@@ -950,8 +989,42 @@
             }
             else
             {
-                /* Codes_SRS_IOTHUBCLIENT_01_017: [IoTHubClient_SetMessageCallback shall call IoTHubClient_LL_SetMessageCallback, while passing the IoTHubClient_LL handle created by IoTHubClient_Create and the parameters messageCallback and userContextCallback.] */
-                result = IoTHubClient_LL_SetMessageCallback(iotHubClientInstance->IoTHubClientLLHandle, messageCallback, userContextCallback);
+                if (iotHubClientInstance->message_user_context != NULL)
+                {
+                    free(iotHubClientInstance->message_user_context);
+                }
+                if (messageCallback == NULL)
+                {
+                    result = IoTHubClient_LL_SetMessageCallbackEx(iotHubClientInstance->IoTHubClientLLHandle, NULL, iotHubClientInstance->message_user_context);
+                }
+                else if (iotHubClientInstance->created_with_transport_handle != 0)
+                {
+                    result = IoTHubClient_LL_SetMessageCallback(iotHubClientInstance->IoTHubClientLLHandle, messageCallback, userContextCallback);
+                }
+                else
+                {
+                    iotHubClientInstance->message_user_context = (IOTHUB_QUEUE_CONTEXT*)malloc(sizeof(IOTHUB_QUEUE_CONTEXT));
+                    if (iotHubClientInstance->message_user_context == NULL)
+                    {
+                        result = IOTHUB_CLIENT_ERROR;
+                        LogError("Failed allocating QUEUE_CONTEXT");
+                    }
+                    else
+                    {
+                        iotHubClientInstance->message_user_context->iotHubClientHandle = iotHubClientHandle;
+                        iotHubClientInstance->message_user_context->userContextCallback = userContextCallback;
+
+                        /* Codes_SRS_IOTHUBCLIENT_01_017: [IoTHubClient_SetMessageCallback shall call IoTHubClient_LL_SetMessageCallbackEx, while passing the IoTHubClient_LL handle created by IoTHubClient_Create and the local iothub_ll_message_callback wrapper of messageCallback and userContextCallback.] */
+                        /* Codes_SRS_IOTHUBCLIENT_01_018: [When IoTHubClient_LL_SetMessageCallbackEx is called, IoTHubClient_SetMessageCallback shall return the result of IoTHubClient_LL_SetMessageCallbackEx.] */
+                        result = IoTHubClient_LL_SetMessageCallbackEx(iotHubClientInstance->IoTHubClientLLHandle, iothub_ll_message_callback, iotHubClientInstance->message_user_context);
+                        if (result != IOTHUB_CLIENT_OK)
+                        {
+                            LogError("IoTHubClient_LL_SetMessageCallback failed");
+                            free(iotHubClientInstance->message_user_context);
+                            iotHubClientInstance->message_user_context = NULL;
+                        }
+                    }
+                }
             }
 
             /* Codes_SRS_IOTHUBCLIENT_01_027: [IoTHubClient_SetMessageCallback shall be made thread-safe by using the lock created in IoTHubClient_Create.] */
@@ -1026,6 +1099,7 @@
                         {
                             LogError("IoTHubClient_LL_SetConnectionStatusCallback failed");
                             free(iotHubClientInstance->connection_status_user_context);
+                            iotHubClientInstance->connection_status_user_context = NULL;
                         }
                     }
                 }
@@ -1263,6 +1337,7 @@
                         {
                             LogError("IoTHubClient_LL_SetDeviceTwinCallback failed");
                             free(iotHubClientInstance->devicetwin_user_context);
+                            iotHubClientInstance->devicetwin_user_context = NULL;
                         }
                     }
                 }