Microsoft Azure IoTHub client libraries

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp f767zi_mqtt ... more

This library implements the Microsoft Azure IoTHub client library. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
89:a2ed767a532e
Parent:
88:248736be106e
Child:
91:bbf806070c5f
--- a/iothub_client_core_ll.c	Mon Jun 11 15:39:23 2018 -0700
+++ b/iothub_client_core_ll.c	Tue Jun 26 19:14:28 2018 -0700
@@ -12,6 +12,9 @@
 #include "azure_c_shared_utility/tickcounter.h"
 #include "azure_c_shared_utility/constbuffer.h"
 #include "azure_c_shared_utility/platform.h"
+#include "azure_c_shared_utility/singlylinkedlist.h" 
+#include "azure_c_shared_utility/shared_util_options.h"
+#include "azure_c_shared_utility/agenttime.h"
 
 #include "iothub_client_core_ll.h"
 #include "internal/iothub_client_authorization.h"
@@ -20,6 +23,7 @@
 #include "iothub_client_options.h"
 #include "iothub_client_version.h"
 #include "internal/iothub_client_diagnostic.h"
+#include <stdint.h>
 #include "internal/iothubtransport.h"
 
 #ifndef DONT_USE_UPLOADTOBLOB
@@ -61,6 +65,15 @@
     void* userContextCallback;
 }IOTHUB_METHOD_CALLBACK_DATA;
 
+typedef struct IOTHUB_EVENT_CALLBACK_TAG
+{
+    STRING_HANDLE inputName;
+    IOTHUB_CLIENT_MESSAGE_CALLBACK_ASYNC callbackAsync;
+    IOTHUB_CLIENT_MESSAGE_CALLBACK_ASYNC_EX callbackAsyncEx;
+    void* userContextCallback;
+    void* userContextCallbackEx;
+}IOTHUB_EVENT_CALLBACK;
+
 typedef struct IOTHUB_MESSAGE_CALLBACK_DATA_TAG
 {
     CALLBACK_TYPE type;
@@ -98,6 +111,7 @@
     IOTHUB_AUTHORIZATION_HANDLE authorization_module;
     STRING_HANDLE product_info;
     IOTHUB_DIAGNOSTIC_SETTING_DATA diagnostic_setting;
+    SINGLYLINKEDLIST_HANDLE event_callbacks;  // List of IOTHUB_EVENT_CALLBACK's
 }IOTHUB_CLIENT_CORE_LL_HANDLE_DATA;
 
 static const char HOSTNAME_TOKEN[] = "HostName";
@@ -106,7 +120,8 @@
 static const char X509_TOKEN_ONLY_ACCEPTABLE_VALUE[] = "true";
 static const char DEVICEKEY_TOKEN[] = "SharedAccessKey";
 static const char DEVICESAS_TOKEN[] = "SharedAccessSignature";
-static const char PROTOCOL_GATEWAY_HOST[] = "GatewayHostName";
+static const char PROTOCOL_GATEWAY_HOST_TOKEN[] = "GatewayHostName";
+static const char MODULE_ID_TOKEN[] = "ModuleId";
 static const char PROVISIONING_TOKEN[] = "UseProvisioning";
 static const char PROVISIONING_ACCEPTABLE_VALUE[] = "true";
 
@@ -130,6 +145,8 @@
     handleData->IoTHubTransport_Subscribe_DeviceMethod = protocol->IoTHubTransport_Subscribe_DeviceMethod;
     handleData->IoTHubTransport_Unsubscribe_DeviceMethod = protocol->IoTHubTransport_Unsubscribe_DeviceMethod;
     handleData->IoTHubTransport_DeviceMethod_Response = protocol->IoTHubTransport_DeviceMethod_Response;
+    handleData->IoTHubTransport_Subscribe_InputQueue = protocol->IoTHubTransport_Subscribe_InputQueue;
+    handleData->IoTHubTransport_Unsubscribe_InputQueue = protocol->IoTHubTransport_Unsubscribe_InputQueue;
 }
 
 static void device_twin_data_destroy(IOTHUB_DEVICE_TWIN* client_item)
@@ -194,7 +211,7 @@
     return result;
 }
 
-static IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* initialize_iothub_client(const IOTHUB_CLIENT_CONFIG* client_config, const IOTHUB_CLIENT_DEVICE_CONFIG* device_config, bool use_dev_auth)
+static IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* initialize_iothub_client(const IOTHUB_CLIENT_CONFIG* client_config, const IOTHUB_CLIENT_DEVICE_CONFIG* device_config, bool use_dev_auth, const char* module_id)
 {
     IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* result;
     srand((unsigned int)time(NULL));
@@ -222,7 +239,7 @@
             memset(result, 0, sizeof(IOTHUB_CLIENT_CORE_LL_HANDLE_DATA));
             if (use_dev_auth)
             {
-                if ((result->authorization_module = IoTHubClient_Auth_CreateFromDeviceAuth(client_config->deviceId)) == NULL)
+                if ((result->authorization_module = IoTHubClient_Auth_CreateFromDeviceAuth(client_config->deviceId, module_id)) == NULL)
                 {
                     LogError("Failed create authorization module");
                     free(result);
@@ -249,7 +266,7 @@
                 }
 
                 /* Codes_SRS_IOTHUBCLIENT_LL_07_029: [ IoTHubClientCore_LL_Create shall create the Auth module with the device_key, device_id, and/or deviceSasToken values ] */
-                if ((result->authorization_module = IoTHubClient_Auth_Create(device_key, device_id, sas_token)) == NULL)
+                if ((result->authorization_module = IoTHubClient_Auth_Create(device_key, device_id, sas_token, module_id)) == NULL)
                 {
                     LogError("Failed create authorization module");
                     free(result);
@@ -268,6 +285,7 @@
                     lowerLayerConfig.upperConfig = client_config;
                     lowerLayerConfig.waitingToSend = &(result->waitingToSend);
                     lowerLayerConfig.auth_module_handle = result->authorization_module;
+                    lowerLayerConfig.moduleId = module_id;
 
                     setTransportProtocol(result, (TRANSPORT_PROVIDER*)client_config->protocol());
                     if ((result->transportHandle = result->IoTHubTransport_Create(&lowerLayerConfig)) == NULL)
@@ -414,6 +432,7 @@
                         deviceConfig.deviceKey = config->deviceKey;
                         deviceConfig.deviceSasToken = config->deviceSasToken;
                         deviceConfig.authorization_module = result->authorization_module;
+                        deviceConfig.moduleId = module_id;
 
                         /*Codes_SRS_IOTHUBCLIENT_LL_17_008: [IoTHubClientCore_LL_Create shall call the transport _Register function with a populated structure of type IOTHUB_DEVICE_CONFIG and waitingToSend list.] */
                         if ((result->deviceHandle = result->IoTHubTransport_Register(result->transportHandle, &deviceConfig, result, &(result->waitingToSend))) == NULL)
@@ -522,6 +541,31 @@
     return result;
 }
 
+static void delete_event(IOTHUB_EVENT_CALLBACK* event_callback)
+{
+    STRING_delete(event_callback->inputName);
+    free(event_callback->userContextCallbackEx);
+    free(event_callback);
+}
+
+static void delete_event_callback(const void* item, const void* action_context, bool* continue_processing)
+{
+    (void)action_context;
+    delete_event((IOTHUB_EVENT_CALLBACK*)item);
+    *continue_processing = true;
+}
+
+static void delete_event_callback_list(IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData)
+{
+    if (handleData->event_callbacks != NULL)
+    {
+        singlylinkedlist_foreach(handleData->event_callbacks, delete_event_callback, NULL);
+        singlylinkedlist_destroy(handleData->event_callbacks);
+        handleData->event_callbacks = NULL;
+    }
+}
+
+
 IOTHUB_CLIENT_CORE_LL_HANDLE IoTHubClientCore_LL_CreateFromDeviceAuth(const char* iothub_uri, const char* device_id, IOTHUB_CLIENT_TRANSPORT_PROVIDER protocol)
 {
     IOTHUB_CLIENT_CORE_LL_HANDLE result;
@@ -598,7 +642,7 @@
             }
             else
             {
-                IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData = initialize_iothub_client(config, NULL, true);
+                IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData = initialize_iothub_client(config, NULL, true, NULL);
                 if (handleData == NULL)
                 {
                     LogError("initialize iothub client");
@@ -659,14 +703,10 @@
             STRING_HANDLE deviceKeyString = NULL;
             STRING_HANDLE deviceSasTokenString = NULL;
             STRING_HANDLE protocolGateway = NULL;
-
-            config->protocol = protocol;
+            STRING_HANDLE moduleId = NULL;
 
-            config->iotHubName = NULL;
-            config->iotHubSuffix = NULL;
-            config->deviceId = NULL;
-            config->deviceKey = NULL;
-            config->deviceSasToken = NULL;
+            memset(config, 0, sizeof(*config));
+            config->protocol = protocol;
 
             /* Codes_SRS_IOTHUBCLIENT_LL_04_002: [If it does not, it shall pass the protocolGatewayHostName NULL.] */
             config->protocolGatewayHostName = NULL;
@@ -820,7 +860,7 @@
                             }
 
                             /* Codes_SRS_IOTHUBCLIENT_LL_04_001: [IoTHubClientCore_LL_CreateFromConnectionString shall verify the existence of key/value pair GatewayHostName. If it does exist it shall pass the value to IoTHubClientCore_LL_Create API.] */
-                            else if (strcmp(s_token, PROTOCOL_GATEWAY_HOST) == 0)
+                            else if (strcmp(s_token, PROTOCOL_GATEWAY_HOST_TOKEN) == 0)
                             {
                                 protocolGateway = STRING_clone(valueString);
                                 if (protocolGateway != NULL)
@@ -833,6 +873,16 @@
                                     break;
                                 }
                             }
+                            /*Codes_SRS_IOTHUBCLIENT_LL_31_126: [IoTHubClient_LL_CreateFromConnectionString shall optionally parse ModuleId, if present.] */
+                            else if (strcmp(s_token, MODULE_ID_TOKEN) == 0)
+                            {
+                                moduleId = STRING_clone(valueString);
+                                if (moduleId == NULL)
+                                {
+                                    LogError("Failure cloning moduleId string");
+                                    break;
+                                }
+                            }
                         }
                     }
                 }
@@ -863,7 +913,7 @@
                 else
                 {
                     /* Codes_SRS_IOTHUBCLIENT_LL_12_011: [IoTHubClientCore_LL_CreateFromConnectionString shall call into the IoTHubClientCore_LL_Create API with the current structure and returns with the return value of it] */
-                    result = initialize_iothub_client(config, NULL, use_provisioning);
+                    result = initialize_iothub_client(config, NULL, use_provisioning, STRING_c_str(moduleId));
                     if (result == NULL)
                     {
                         LogError("IoTHubClientCore_LL_Create failed");
@@ -892,6 +942,8 @@
                 STRING_delete(connString);
             if (protocolGateway != NULL)
                 STRING_delete(protocolGateway);
+            if (moduleId != NULL)
+                STRING_delete(moduleId);
 
             if (tokenizer1 != NULL)
                 STRING_TOKENIZER_destroy(tokenizer1);
@@ -902,7 +954,7 @@
     return result;
 }
 
-IOTHUB_CLIENT_CORE_LL_HANDLE IoTHubClientCore_LL_Create(const IOTHUB_CLIENT_CONFIG* config)
+IOTHUB_CLIENT_CORE_LL_HANDLE IoTHubClientCore_LL_CreateImpl(const IOTHUB_CLIENT_CONFIG* config, const char* module_id, bool use_dev_auth)
 {
     IOTHUB_CLIENT_CORE_LL_HANDLE result;
     /*Codes_SRS_IOTHUBCLIENT_LL_02_001: [IoTHubClientCore_LL_Create shall return NULL if config parameter is NULL or protocol field is NULL.]*/
@@ -916,7 +968,7 @@
     }
     else
     {
-        IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData = initialize_iothub_client(config, NULL, false);
+        IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData = initialize_iothub_client(config, NULL, use_dev_auth, module_id);
         if (handleData == NULL)
         {
             LogError("initialize iothub client");
@@ -931,6 +983,48 @@
     return result;
 }
 
+IOTHUB_CLIENT_CORE_LL_HANDLE IoTHubClientCore_LL_Create(const IOTHUB_CLIENT_CONFIG* config)
+{
+    return IoTHubClientCore_LL_CreateImpl(config, NULL, false);
+}
+
+#ifdef USE_EDGE_MODULES
+IOTHUB_CLIENT_CORE_LL_HANDLE IoTHubClientCore_LL_CreateFromEnvironment(const IOTHUB_CLIENT_CONFIG* config, const char* module_id)
+{
+    IOTHUB_CLIENT_CORE_LL_HANDLE result;
+
+    if (module_id == NULL)
+    {
+        LogError("module_id cannot be NULL");
+        result = NULL;
+    }
+    else if ((result = IoTHubClientCore_LL_CreateImpl(config, module_id, true)) != NULL)
+    {
+        // Because the Edge Hub almost always use self-signed certificates, we need
+        // to query it for the the certificate its using so we can trust it.
+        char* trustedCertificate = IoTHubClient_Auth_Get_TrustBundle(result->authorization_module);
+        IOTHUB_CLIENT_RESULT setTrustResult;
+
+        if (trustedCertificate == NULL)
+        {
+            LogError("IoTHubClient_Auth_Get_TrustBundle failed");
+            IoTHubClientCore_LL_Destroy(result);
+            result = NULL;
+        }
+        else if ((setTrustResult = IoTHubClientCore_LL_SetOption(result, OPTION_TRUSTED_CERT, trustedCertificate)) != IOTHUB_CLIENT_OK) 
+        {
+            LogError("IoTHubClientCore_LL_SetOption failed, err = %d", setTrustResult);
+            IoTHubClientCore_LL_Destroy(result);
+            result = NULL;
+        }
+
+        free(trustedCertificate);
+    }
+    return result;
+}
+#endif
+
+
 IOTHUB_CLIENT_CORE_LL_HANDLE IoTHubClientCore_LL_CreateWithTransport(const IOTHUB_CLIENT_DEVICE_CONFIG * config)
 {
     IOTHUB_CLIENT_CORE_LL_HANDLE result;
@@ -948,7 +1042,7 @@
     }
     else
     {
-        result = initialize_iothub_client(NULL, config, false);
+        result = initialize_iothub_client(NULL, config, false, NULL);
     }
     return result;
 }
@@ -992,6 +1086,9 @@
             device_twin_data_destroy(temp);
         }
 
+        /* Codes_SRS_IOTHUBCLIENT_LL_31_141: [ IoTHubClient_LL_Destroy shall iterate registered callbacks for input queues and destroy any remaining items. ] */
+        delete_event_callback_list(handleData);
+
         /*Codes_SRS_IOTHUBCLIENT_LL_17_011: [IoTHubClientCore_LL_Destroy  shall free the resources allocated by IoTHubClient (if any).] */
         IoTHubClient_Auth_Destroy(handleData->authorization_module);
         tickcounter_destroy(handleData->tickCounter);
@@ -1496,70 +1593,191 @@
     }
 }
 
+static bool invoke_message_callback(IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData, MESSAGE_CALLBACK_INFO* messageData)
+{
+    bool result;
+    /* Codes_SRS_IOTHUBCLIENT_LL_09_004: [IoTHubClient_LL_GetLastMessageReceiveTime shall return lastMessageReceiveTime in localtime] */
+    handleData->lastMessageReceiveTime = get_time(NULL);
+    
+    switch (handleData->messageCallback.type)
+    {
+        case CALLBACK_TYPE_NONE:
+        {
+            /*Codes_SRS_IOTHUBCLIENT_LL_02_032: [If the client is not subscribed to receive messages then IoTHubClient_LL_MessageCallback shall return false.] */
+            LogError("Invalid workflow - not currently set up to accept messages");
+            result = false;
+            break;
+        }
+        case CALLBACK_TYPE_SYNC:
+        {
+            /*Codes_SRS_IOTHUBCLIENT_LL_02_030: [If messageCallbackType is LEGACY then IoTHubClient_LL_MessageCallback shall invoke the last callback function (the parameter messageCallback to IoTHubClient_LL_SetMessageCallback) passing the message and the passed userContextCallback.]*/
+            IOTHUBMESSAGE_DISPOSITION_RESULT cb_result = handleData->messageCallback.callbackSync(messageData->messageHandle, handleData->messageCallback.userContextCallback);
+
+            /*Codes_SRS_IOTHUBCLIENT_LL_10_007: [If messageCallbackType is LEGACY then IoTHubClient_LL_MessageCallback shall send the message disposition as returned by the client to the underlying layer.] */
+            if (handleData->IoTHubTransport_SendMessageDisposition(messageData, cb_result) != IOTHUB_CLIENT_OK)
+            {
+                LogError("IoTHubTransport_SendMessageDisposition failed");
+            }
+            result = true;
+            break;
+        }
+        case CALLBACK_TYPE_ASYNC:
+        {
+            /* Codes_SRS_IOTHUBCLIENT_LL_10_009: [If messageCallbackType is ASYNC then IoTHubClient_LL_MessageCallback shall return what messageCallbacEx returns.] */
+            result = handleData->messageCallback.callbackAsync(messageData, handleData->messageCallback.userContextCallback);
+            if (!result)
+            {
+                LogError("messageCallbackEx failed");
+            }
+            break;
+        }
+        default:
+        {
+            LogError("Invalid state");
+            result = false;
+            break;
+        }
+    }
+
+    return result;
+}
+
 bool IoTHubClientCore_LL_MessageCallback(IOTHUB_CLIENT_CORE_LL_HANDLE handle, MESSAGE_CALLBACK_INFO* messageData)
 {
     bool result;
     if ((handle == NULL) || messageData == NULL)
     {
-        /*Codes_SRS_IOTHUBCLIENT_LL_02_029: [If parameter handle is NULL then IoTHubClientCore_LL_MessageCallback shall return IOTHUBMESSAGE_ABANDONED.] */
+        /*Codes_SRS_IOTHUBCLIENT_LL_02_029: [If parameter handle is NULL then IoTHubClient_LL_MessageCallback shall return IOTHUBMESSAGE_ABANDONED.] */
         LogError("invalid argument: handle(%p), messageData(%p)", handle, messageData);
         result = false;
     }
     else if (messageData->messageHandle == NULL)
     {
-        /*Codes_SRS_IOTHUBCLIENT_LL_10_004: [If messageHandle field of paramger messageData is NULL then IoTHubClientCore_LL_MessageCallback shall return IOTHUBMESSAGE_ABANDONED.] */
+        /*Codes_SRS_IOTHUBCLIENT_LL_10_004: [If messageHandle field of paramger messageData is NULL then IoTHubClient_LL_MessageCallback shall return IOTHUBMESSAGE_ABANDONED.] */
         LogError("invalid argument messageData->messageHandle(NULL)");
         result = false;
     }
     else
     {
         IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData = (IOTHUB_CLIENT_CORE_LL_HANDLE_DATA*)handle;
+        return invoke_message_callback(handleData, messageData);
+    }
+    return result;
+}
 
-        /* Codes_SRS_IOTHUBCLIENT_LL_09_004: [IoTHubClientCore_LL_GetLastMessageReceiveTime shall return lastMessageReceiveTime in localtime] */
-        handleData->lastMessageReceiveTime = get_time(NULL);
-        switch (handleData->messageCallback.type)
+static bool is_event_equal(IOTHUB_EVENT_CALLBACK *event_callback, const char *input_name)
+{
+    bool result;
+    
+    if (event_callback != NULL)
+    {
+        const char* event_input_name = STRING_c_str(event_callback->inputName);
+        if ((event_input_name != NULL) && (input_name != NULL))
+        {
+            // Matched the input queue name of a named handler
+            result = (strcmp(event_input_name, input_name) == 0);
+        }
+        else if ((input_name == NULL) && (event_input_name == NULL))
+        {
+            // Matched the default handler
+            result = true;
+        }
+        else
         {
-            case CALLBACK_TYPE_NONE:
+            result = false;
+        }
+    }
+    else
+    {
+        result = false;
+    }
+    return result;
+}
+
+static bool is_event_equal_for_match(LIST_ITEM_HANDLE list_item, const void* match_context)
+{
+    return is_event_equal((IOTHUB_EVENT_CALLBACK*)singlylinkedlist_item_get_value(list_item), (const char*)match_context);
+}
+
+bool IoTHubClientCore_LL_MessageCallbackFromInput(IOTHUB_CLIENT_CORE_LL_HANDLE handle, MESSAGE_CALLBACK_INFO* messageData)
+{
+    bool result;
+    IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData = (IOTHUB_CLIENT_CORE_LL_HANDLE_DATA*)handle;
+
+    if ((handle == NULL) || messageData == NULL)
+    {
+        // Codes_SRS_IOTHUBCLIENT_LL_31_137: [ If either parameter `handle` or `messageData` is `NULL` then `IoTHubClient_LL_MessageCallbackFromInput` shall return `false`.** ]
+        LogError("invalid argument: handle(%p), messageData(%p)", handle, messageData);
+        result = false;
+    }
+    else if (messageData->messageHandle == NULL)
+    {
+        // Codes_SRS_IOTHUBCLIENT_LL_31_137: [ If either parameter `handle` or `messageData` is `NULL` then `IoTHubClient_LL_MessageCallbackFromInput` shall return `false`.** ]
+        LogError("invalid argument messageData->messageHandle(NULL)");
+        result = false;
+    }
+    else if (handleData->event_callbacks == NULL)
+    {
+        LogError("Callback from input called but no input specific callbacks registered");
+        result = false;
+    }
+    else
+    {
+        const char* inputName = IoTHubMessage_GetInputName(messageData->messageHandle);
+  
+        LIST_ITEM_HANDLE item_handle = NULL;
+    
+        item_handle = singlylinkedlist_find(handleData->event_callbacks, is_event_equal_for_match, (const void*)inputName);
+
+        if (item_handle == NULL)
+        {
+            // Codes_SRS_IOTHUBCLIENT_LL_31_138: [ If there is no registered handler for the inputName from `IoTHubMessage_GetInputName`, then `IoTHubClient_LL_MessageCallbackFromInput` shall attempt invoke the default handler handler.** ]
+            item_handle = singlylinkedlist_find(handleData->event_callbacks, is_event_equal_for_match, NULL);
+        }
+
+        if (item_handle == NULL)
+        {
+            LogError("Could not find callback (explicit or default) for input queue %s", inputName);
+            result = false;
+        }
+        else
+        {
+            IOTHUB_EVENT_CALLBACK* event_callback = (IOTHUB_EVENT_CALLBACK*)singlylinkedlist_item_get_value(item_handle);
+            if (NULL == event_callback)
             {
-                /*Codes_SRS_IOTHUBCLIENT_LL_02_032: [If the client is not subscribed to receive messages then IoTHubClientCore_LL_MessageCallback shall return false.] */
-                LogError("Invalid workflow - not currently set up to accept messages");
+                LogError("singlylinkedlist_item_get_value for event_callback failed");
                 result = false;
-                break;
             }
-            case CALLBACK_TYPE_SYNC:
+            else
             {
-                /*Codes_SRS_IOTHUBCLIENT_LL_02_030: [If messageCallbackType is LEGACY then IoTHubClientCore_LL_MessageCallback shall invoke the last callback function (the parameter messageCallback to IoTHubClientCore_LL_SetMessageCallback) passing the message and the passed userContextCallback.]*/
-                IOTHUBMESSAGE_DISPOSITION_RESULT cb_result = handleData->messageCallback.callbackSync(messageData->messageHandle, handleData->messageCallback.userContextCallback);
+                // Codes_SRS_IOTHUBCLIENT_LL_09_004: [IoTHubClient_LL_GetLastMessageReceiveTime shall return lastMessageReceiveTime in localtime]
+                handleData->lastMessageReceiveTime = get_time(NULL);
 
-                /*Codes_SRS_IOTHUBCLIENT_LL_10_007: [If messageCallbackType is LEGACY then IoTHubClientCore_LL_MessageCallback shall send the message disposition as returned by the client to the underlying layer.] */
-                if (handleData->IoTHubTransport_SendMessageDisposition(messageData, cb_result) != IOTHUB_CLIENT_OK)
+                if (event_callback->callbackAsyncEx != NULL)
                 {
-                    LogError("IoTHubTransport_SendMessageDisposition failed");
+                    // Codes_SRS_IOTHUBCLIENT_LL_31_139: [ `IoTHubClient_LL_MessageCallbackFromInput` shall the callback from the given inputName queue if it has been registered.** ]
+                    result = event_callback->callbackAsyncEx(messageData, event_callback->userContextCallbackEx);
                 }
-                result = true;
-                break;
-            }
-            case CALLBACK_TYPE_ASYNC:
-            {
-                /* Codes_SRS_IOTHUBCLIENT_LL_10_009: [If messageCallbackType is ASYNC then IoTHubClientCore_LL_MessageCallback shall return what messageCallbacEx returns.] */
-                result = handleData->messageCallback.callbackAsync(messageData, handleData->messageCallback.userContextCallback);
-                if (!result)
+                else
                 {
-                    LogError("messageCallbackEx failed");
+                    // Codes_SRS_IOTHUBCLIENT_LL_31_139: [ `IoTHubClient_LL_MessageCallbackFromInput` shall the callback from the given inputName queue if it has been registered.** ]
+                    IOTHUBMESSAGE_DISPOSITION_RESULT cb_result = event_callback->callbackAsync(messageData->messageHandle, event_callback->userContextCallback);
+                    
+                    // Codes_SRS_IOTHUBCLIENT_LL_31_140: [ `IoTHubClient_LL_MessageCallbackFromInput` shall send the message disposition as returned by the client to the underlying layer and return `true` if an input queue match is found.** ]
+                    if (handleData->IoTHubTransport_SendMessageDisposition(messageData, cb_result) != IOTHUB_CLIENT_OK)
+                    {
+                        LogError("IoTHubTransport_SendMessageDisposition failed");
+                    }
+                    result = true;
                 }
-                break;
-            }
-            default:
-            {
-                LogError("Invalid state");
-                result = false;
-                break;
             }
         }
     }
+
     return result;
 }
 
+
 void IoTHubClientCore_LL_ConnectionStatusCallBack(IOTHUB_CLIENT_CORE_LL_HANDLE handle, IOTHUB_CLIENT_CONNECTION_STATUS status, IOTHUB_CLIENT_CONNECTION_STATUS_REASON reason)
 {
     /*Codes_SRS_IOTHUBCLIENT_LL_25_113: [If parameter connectionStatus is NULL or parameter handle is NULL then IoTHubClientCore_LL_ConnectionStatusCallBack shall return.]*/
@@ -2160,6 +2378,223 @@
     return result;
 }
 
+IOTHUB_CLIENT_RESULT IoTHubClientCore_LL_SendEventToOutputAsync(IOTHUB_CLIENT_CORE_LL_HANDLE iotHubClientHandle, IOTHUB_MESSAGE_HANDLE eventMessageHandle, const char* outputName, IOTHUB_CLIENT_EVENT_CONFIRMATION_CALLBACK eventConfirmationCallback, void* userContextCallback)
+{
+    IOTHUB_CLIENT_RESULT result;
+
+    if ((iotHubClientHandle == NULL) || (outputName == NULL) || (eventMessageHandle == NULL) || ((eventConfirmationCallback == NULL) && (userContextCallback != NULL)))
+    {
+        // Codes_SRS_IOTHUBCLIENT_LL_31_127: [ If `iotHubClientHandle`, `outputName`, or `eventConfirmationCallback` is `NULL`, `IoTHubClient_LL_SendEventToOutputAsync` shall return `IOTHUB_CLIENT_INVALID_ARG`. ]
+        LogError("Invalid argument (iotHubClientHandle=%p, outputName=%p, eventMessageHandle=%p)", iotHubClientHandle, outputName, eventMessageHandle);
+        result = IOTHUB_CLIENT_INVALID_ARG;
+    }
+    else
+    {
+        // Codes_SRS_IOTHUBCLIENT_LL_31_128: [ `IoTHubClient_LL_SendEventToOutputAsync` shall set the outputName of the message to send. ]
+        if (IoTHubMessage_SetOutputName(eventMessageHandle, outputName) != IOTHUB_MESSAGE_OK)
+        {
+            LogError("IoTHubMessage_SetOutputName failed");
+            result = IOTHUB_CLIENT_ERROR;
+        }
+        // Codes_SRS_IOTHUBCLIENT_LL_31_129: [ `IoTHubClient_LL_SendEventToOutputAsync` shall invoke `IoTHubClient_LL_SendEventAsync` to send the message. ]
+        else if ((result = IoTHubClientCore_LL_SendEventAsync(iotHubClientHandle, eventMessageHandle, eventConfirmationCallback, userContextCallback)) != IOTHUB_CLIENT_OK)
+        {
+            LogError("Call into IoTHubClient_LL_SendEventAsync failed, result=%d", result);
+        }
+    }
+
+    return result;
+}
+
+
+static IOTHUB_CLIENT_RESULT create_event_handler_callback(IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData, const char* inputName, IOTHUB_CLIENT_MESSAGE_CALLBACK_ASYNC callbackSync, IOTHUB_CLIENT_MESSAGE_CALLBACK_ASYNC_EX callbackSyncEx, void* userContextCallback, void* userContextCallbackEx, size_t userContextCallbackExLength)
+{
+    IOTHUB_CLIENT_RESULT result = IOTHUB_CLIENT_ERROR;
+    bool add_to_list = false;
+
+    if ((handleData->event_callbacks == NULL) && ((handleData->event_callbacks = singlylinkedlist_create()) == NULL))
+    {
+        LogError("Could not allocate linked list for callbacks");
+        result = IOTHUB_CLIENT_ERROR;
+    }
+    else
+    {
+        IOTHUB_EVENT_CALLBACK* event_callback = NULL;
+        LIST_ITEM_HANDLE item_handle = singlylinkedlist_find(handleData->event_callbacks, is_event_equal_for_match, (const void*)inputName);
+        if (item_handle == NULL)
+        {
+            // Codes_SRS_IOTHUBCLIENT_LL_31_134: [ `IoTHubClient_LL_SetInputMessageCallback` shall allocate a callback handle to associate callbacks from the transport => client if `inputName` isn't already present in the callback list. ]
+            event_callback = (IOTHUB_EVENT_CALLBACK*)malloc(sizeof(IOTHUB_EVENT_CALLBACK));
+            if (event_callback == NULL)
+            {
+                LogError("Could not allocate IOTHUB_EVENT_CALLBACK");
+                result = IOTHUB_CLIENT_ERROR;
+            }
+            else
+            {
+                memset(event_callback, 0, sizeof(*event_callback));
+                add_to_list = true;
+            }
+        }
+        else
+        {
+            // Codes_SRS_IOTHUBCLIENT_LL_31_135: [ `IoTHubClient_LL_SetInputMessageCallback` shall reuse the existing callback handle if `inputName` is already present in the callback list. ]
+            event_callback = (IOTHUB_EVENT_CALLBACK*)singlylinkedlist_item_get_value(item_handle);
+            if (event_callback == NULL)
+            {
+                LogError("singlylinkedlist_item_get_value failed looking up event callback");
+            }
+        }
+
+        if (event_callback != NULL)
+        {
+            if ((inputName != NULL) && (event_callback->inputName == NULL))
+            {
+                event_callback->inputName = STRING_construct(inputName);
+            }
+
+            if ((inputName == NULL) || (event_callback->inputName != NULL))
+            {
+                event_callback->callbackAsync = callbackSync;
+                event_callback->callbackAsyncEx = callbackSyncEx;
+
+                free(event_callback->userContextCallbackEx);
+                event_callback->userContextCallbackEx = NULL;
+
+                if (userContextCallbackEx == NULL)
+                {
+                    event_callback->userContextCallback = userContextCallback;
+                }
+
+                if ((userContextCallbackEx != NULL) && 
+                    (NULL == (event_callback->userContextCallbackEx = malloc(userContextCallbackExLength))))
+                {
+                    LogError("Unable to allocate userContextCallback");
+                    delete_event(event_callback);
+                    result = IOTHUB_CLIENT_ERROR;
+                }
+                else if ((add_to_list == true) && (NULL == singlylinkedlist_add(handleData->event_callbacks, event_callback)))
+                {
+                    LogError("Unable to add eventCallback to list");
+                    delete_event(event_callback);
+                    result = IOTHUB_CLIENT_ERROR;
+                }
+                else
+                {
+                    if (userContextCallbackEx != NULL)
+                    {
+                        // Codes_SRS_IOTHUBCLIENT_LL_31_141: [`IoTHubClient_LL_SetInputMessageCallbackEx` shall copy the data passed in extended context. ]
+                        memcpy(event_callback->userContextCallbackEx, userContextCallbackEx, userContextCallbackExLength);
+                    }
+                    result = IOTHUB_CLIENT_OK;
+                }
+            }
+            else
+            {
+                delete_event(event_callback);
+                result = IOTHUB_CLIENT_ERROR;
+            }
+        }
+    }
+
+    return result;
+}
+
+static IOTHUB_CLIENT_RESULT remove_event_unsubscribe_if_needed(IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData, const char* inputName)
+{
+    IOTHUB_CLIENT_RESULT result;
+
+    LIST_ITEM_HANDLE item_handle = singlylinkedlist_find(handleData->event_callbacks, is_event_equal_for_match, (const void*)inputName);
+    if (item_handle == NULL)
+    {
+        // Codes_SRS_IOTHUBCLIENT_LL_31_132: [ If `eventHandlerCallback` is NULL, `IoTHubClient_LL_SetInputMessageCallback` shall return `IOTHUB_CLIENT_ERROR` if the `inputName` is not present. ]
+        LogError("Input name %s was not present", inputName);
+        result = IOTHUB_CLIENT_ERROR;    
+    }
+    else
+    {
+        IOTHUB_EVENT_CALLBACK* event_callback = (IOTHUB_EVENT_CALLBACK*)singlylinkedlist_item_get_value(item_handle);
+        if (event_callback == NULL)
+        {
+            LogError("singlylinkedlist_item_get_value failed");
+            result = IOTHUB_CLIENT_ERROR;
+        }
+        else
+        {
+            delete_event(event_callback);
+            // Codes_SRS_IOTHUBCLIENT_LL_31_131: [ If `eventHandlerCallback` is NULL, `IoTHubClient_LL_SetInputMessageCallback` shall remove the `inputName` from its callback list if present. ]
+            if (singlylinkedlist_remove(handleData->event_callbacks, item_handle) != 0)
+            {
+                LogError("singlylinkedlist_remove failed");
+                result = IOTHUB_CLIENT_ERROR;
+            }
+            else
+            {
+                if (singlylinkedlist_get_head_item(handleData->event_callbacks) == NULL)
+                {
+                    // Codes_SRS_IOTHUBCLIENT_LL_31_133: [ If `eventHandlerCallback` is NULL, `IoTHubClient_LL_SetInputMessageCallback` shall invoke `IoTHubTransport_Unsubscribe_InputQueue` if this was the last input callback. ]
+                    handleData->IoTHubTransport_Unsubscribe_InputQueue(handleData);
+                }
+                result = IOTHUB_CLIENT_OK;
+            }
+        }
+    }
+
+    return result;
+}
+
+
+IOTHUB_CLIENT_RESULT IoTHubClientCore_LL_SetInputMessageCallbackImpl(IOTHUB_CLIENT_CORE_LL_HANDLE iotHubClientHandle, const char* inputName, IOTHUB_CLIENT_MESSAGE_CALLBACK_ASYNC eventHandlerCallback, IOTHUB_CLIENT_MESSAGE_CALLBACK_ASYNC_EX eventHandlerCallbackEx, void *userContextCallback, void *userContextCallbackEx, size_t userContextCallbackExLength)
+{
+    IOTHUB_CLIENT_RESULT result;
+
+    if (iotHubClientHandle == NULL)
+    {
+        // Codes_SRS_IOTHUBCLIENT_LL_31_130: [ If `iotHubClientHandle` or `inputName` is NULL, `IoTHubClient_LL_SetInputMessageCallback` shall return IOTHUB_CLIENT_INVALID_ARG. ]
+        LogError("Invalid argument - iotHubClientHandle=%p, inputName=%p", iotHubClientHandle, inputName);
+        result = IOTHUB_CLIENT_INVALID_ARG;
+    }
+    else
+    {
+        IOTHUB_CLIENT_CORE_LL_HANDLE_DATA* handleData = (IOTHUB_CLIENT_CORE_LL_HANDLE_DATA*)iotHubClientHandle;
+        if ((eventHandlerCallback == NULL) && (eventHandlerCallbackEx == NULL))
+        {
+            result = (IOTHUB_CLIENT_RESULT)remove_event_unsubscribe_if_needed(handleData, inputName);
+        }
+        else
+        {
+            bool registered_with_transport_handler = (handleData->event_callbacks != NULL) && (singlylinkedlist_get_head_item(handleData->event_callbacks) != NULL);
+            if ((result = (IOTHUB_CLIENT_RESULT)create_event_handler_callback(handleData, inputName, eventHandlerCallback, eventHandlerCallbackEx, userContextCallback, userContextCallbackEx, userContextCallbackExLength)) != IOTHUB_CLIENT_OK)
+            {
+                LogError("create_event_handler_callback call failed, error = %d", result);
+            }
+            // Codes_SRS_IOTHUBCLIENT_LL_31_136: [ `IoTHubClient_LL_SetInputMessageCallback` shall invoke `IoTHubTransport_Subscribe_InputQueue` if this is the first callback being registered. ]
+            else if (!registered_with_transport_handler && (handleData->IoTHubTransport_Subscribe_InputQueue(handleData->deviceHandle) != 0))
+            {
+                LogError("IoTHubTransport_Subscribe_InputQueue failed");
+                delete_event_callback_list(handleData);
+                result = IOTHUB_CLIENT_ERROR;
+            }
+            else
+            {
+                result = IOTHUB_CLIENT_OK;
+            }
+        }
+    }
+    return result;
+
+}
+
+IOTHUB_CLIENT_RESULT IoTHubClientCore_LL_SetInputMessageCallbackEx(IOTHUB_CLIENT_CORE_LL_HANDLE iotHubClientHandle, const char* inputName, IOTHUB_CLIENT_MESSAGE_CALLBACK_ASYNC_EX eventHandlerCallbackEx, void *userContextCallbackEx, size_t userContextCallbackExLength)
+{
+    return IoTHubClientCore_LL_SetInputMessageCallbackImpl(iotHubClientHandle, inputName, NULL, eventHandlerCallbackEx, NULL, userContextCallbackEx, userContextCallbackExLength);
+}
+
+IOTHUB_CLIENT_RESULT IoTHubClientCore_LL_SetInputMessageCallback(IOTHUB_CLIENT_CORE_LL_HANDLE iotHubClientHandle, const char* inputName, IOTHUB_CLIENT_MESSAGE_CALLBACK_ASYNC eventHandlerCallback, void* userContextCallback)
+{
+    return IoTHubClientCore_LL_SetInputMessageCallbackImpl(iotHubClientHandle, inputName, eventHandlerCallback, NULL, userContextCallback, NULL, 0);
+}
+
 
 
 #endif /* DONT_USE_UPLOADTOBLOB */