Azure IoT / iothub_amqp_transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

Files at this revision

API Documentation at this revision

Comitter:
AzureIoTClient
Date:
Fri Dec 15 14:08:19 2017 -0800
Parent:
45:c09fac270e60
Child:
47:8a238e75a0f7
Commit message:
1.1.29

Changed in this revision

iothubtransport_amqp_common.c Show annotated file Show diff for this revision Revisions of this file
iothubtransport_amqp_connection.c Show annotated file Show diff for this revision Revisions of this file
iothubtransport_amqp_telemetry_messenger.c Show annotated file Show diff for this revision Revisions of this file
uamqp_messaging.c Show annotated file Show diff for this revision Revisions of this file
uamqp_messaging.h Show annotated file Show diff for this revision Revisions of this file
--- a/iothubtransport_amqp_common.c	Fri Nov 17 13:56:41 2017 -0800
+++ b/iothubtransport_amqp_common.c	Fri Dec 15 14:08:19 2017 -0800
@@ -58,14 +58,27 @@
     AMQP_TRANSPORT_AUTHENTICATION_MODE_X509
 } AMQP_TRANSPORT_AUTHENTICATION_MODE;
 
+/*
+Definition of transport states:
 
-#define AMQP_TRANSPORT_STATE_STRINGS              \
-    AMQP_TRANSPORT_STATE_NOT_CONNECTED,           \
-    AMQP_TRANSPORT_STATE_CONNECTING,              \
-    AMQP_TRANSPORT_STATE_CONNECTED,               \
-    AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED,   \
-    AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION,  \
-    AMQP_TRANSPORT_STATE_RECONNECTING,            \
+AMQP_TRANSPORT_STATE_NOT_CONNECTED:                    Initial state when the transport is created. 
+AMQP_TRANSPORT_STATE_CONNECTING:                       First connection ever.
+AMQP_TRANSPORT_STATE_CONNECTED:                        Transition from AMQP_TRANSPORT_STATE_CONNECTING or AMQP_TRANSPORT_STATE_RECONNECTING. 
+AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED:            When a failure occurred and the transport identifies a reconnection is needed.
+AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION:           Transition from AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED after all prep is done (transient instances are destroyed, devices are stopped).
+AMQP_TRANSPORT_STATE_RECONNECTING:                     Transition from AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION.
+AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES:    State reached if the maximum number/length of reconnections has been reached.
+AMQP_TRANSPORT_STATE_BEING_DESTROYED:                  State set if IoTHubTransport_AMQP_Common_Destroy function is invoked.
+*/
+
+#define AMQP_TRANSPORT_STATE_STRINGS                    \
+    AMQP_TRANSPORT_STATE_NOT_CONNECTED,                 \
+    AMQP_TRANSPORT_STATE_CONNECTING,                    \
+    AMQP_TRANSPORT_STATE_CONNECTED,                     \
+    AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED,         \
+    AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION,        \
+    AMQP_TRANSPORT_STATE_RECONNECTING,                  \
+    AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES, \
     AMQP_TRANSPORT_STATE_BEING_DESTROYED
 
 DEFINE_LOCAL_ENUM(AMQP_TRANSPORT_STATE, AMQP_TRANSPORT_STATE_STRINGS);
@@ -227,6 +240,17 @@
     return result;
 }
 
+static void raise_connection_status_callback_retry_expired(const void* item, const void* action_context, bool* continue_processing)
+{
+    (void)action_context;
+
+    AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)item;
+
+    IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED);
+
+    *continue_processing = true;
+}
+
 // @brief
 //     Saves the new state, if it is different than the previous one.
 static void on_device_state_changed_callback(void* context, DEVICE_STATE previous_state, DEVICE_STATE new_state)
@@ -251,7 +275,17 @@
         // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_121: [If `new_state` is DEVICE_STATE_STOPPED, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_OK]
         else if (new_state == DEVICE_STATE_STOPPED)
         {
-            IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK);
+            if (registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_CONNECTED ||
+                registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_BEING_DESTROYED)
+            {
+                IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK);
+            }
+            else if (registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED ||
+                registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION ||
+                registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTING)
+            {
+                IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_NETWORK);
+            }
         }
         // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_122: [If `new_state` is DEVICE_STATE_ERROR_AUTH, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL]
         else if (new_state == DEVICE_STATE_ERROR_AUTH)
@@ -1493,11 +1527,15 @@
         AMQP_TRANSPORT_INSTANCE* transport_instance = (AMQP_TRANSPORT_INSTANCE*)handle;
         LIST_ITEM_HANDLE list_item;
 
+        if (transport_instance->state == AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES)
+        {
+            // Nothing to be done.
+        }
         // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_017: [If `instance->state` is `RECONNECTION_REQUIRED`, IoTHubTransport_AMQP_Common_DoWork shall attempt to trigger the connection-retry logic and return]
-        if (transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED)
+        else if (transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED)
         {
             RETRY_ACTION retry_action;
-        
+
             // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_126: [The connection retry shall be attempted only if retry_control_should_retry() returns RETRY_ACTION_NOW, or if it fails]
             if (retry_control_should_retry(transport_instance->connection_retry_control, &retry_action) != RESULT_OK)
             {
@@ -1509,58 +1547,67 @@
             {
                 prepare_for_connection_retry(transport_instance);
             }
-        }
-        // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_018: [If there are no devices registered on the transport, IoTHubTransport_AMQP_Common_DoWork shall skip do_work for devices]
-        else if ((list_item = singlylinkedlist_get_head_item(transport_instance->registered_devices)) != NULL)
-        {
-            // We need to check if there are devices, otherwise the amqp_connection won't be able to be created since
-            // there is not a preferred authentication mode set yet on the transport.
+            else if (retry_action == RETRY_ACTION_STOP_RETRYING)
+            {
+                update_state(transport_instance, AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES);
 
-            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_019: [If `instance->amqp_connection` is NULL, it shall be established]
-            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_12_003: [AMQP connection will be configured using the `c2d_keep_alive_freq_secs` value from SetOption ]
-            if (transport_instance->amqp_connection == NULL && establish_amqp_connection(transport_instance) != RESULT_OK)
-            {
-                LogError("AMQP transport failed to establish connection with service.");
-
-                update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
+                (void)singlylinkedlist_foreach(transport_instance->registered_devices, raise_connection_status_callback_retry_expired, NULL);
             }
-            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_020: [If the amqp_connection is OPENED, the transport shall iterate through each registered device and perform a device-specific do_work on each]
-            else if (transport_instance->amqp_connection_state == AMQP_CONNECTION_STATE_OPENED)
+        }
+        else
+        {
+            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_018: [If there are no devices registered on the transport, IoTHubTransport_AMQP_Common_DoWork shall skip do_work for devices]
+            if ((list_item = singlylinkedlist_get_head_item(transport_instance->registered_devices)) != NULL)
             {
-                while (list_item != NULL)
-                {
-                    AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device;
+                // We need to check if there are devices, otherwise the amqp_connection won't be able to be created since
+                // there is not a preferred authentication mode set yet on the transport.
 
-                    if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL)
-                    {
-                        LogError("Transport had an unexpected failure during DoWork (failed to fetch a registered_devices list item value)");
-                    }
-                    else if (registered_device->number_of_send_event_complete_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+                // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_019: [If `instance->amqp_connection` is NULL, it shall be established]
+                // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_12_003: [AMQP connection will be configured using the `c2d_keep_alive_freq_secs` value from SetOption ]
+                if (transport_instance->amqp_connection == NULL && establish_amqp_connection(transport_instance) != RESULT_OK)
+                {
+                    LogError("AMQP transport failed to establish connection with service.");
+
+                    update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
+                }
+                // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_020: [If the amqp_connection is OPENED, the transport shall iterate through each registered device and perform a device-specific do_work on each]
+                else if (transport_instance->amqp_connection_state == AMQP_CONNECTION_STATE_OPENED)
+                {
+                    while (list_item != NULL)
                     {
-                        LogError("Device '%s' reported a critical failure (events completed sending with failures); connection retry will be triggered.", STRING_c_str(registered_device->device_id));
+                        AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device;
 
-                        update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
-                    }
-                    else if (IoTHubTransport_AMQP_Common_Device_DoWork(registered_device) != RESULT_OK)
-                    {
-                        // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_021: [If DoWork fails for the registered device for more than MAX_NUMBER_OF_DEVICE_FAILURES, connection retry shall be triggered]
-                        if (registered_device->number_of_previous_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+                        if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL)
                         {
-                            LogError("Device '%s' reported a critical failure; connection retry will be triggered.", STRING_c_str(registered_device->device_id));
+                            LogError("Transport had an unexpected failure during DoWork (failed to fetch a registered_devices list item value)");
+                        }
+                        else if (registered_device->number_of_send_event_complete_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+                        {
+                            LogError("Device '%s' reported a critical failure (events completed sending with failures); connection retry will be triggered.", STRING_c_str(registered_device->device_id));
 
                             update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
                         }
-                    }
+                        else if (IoTHubTransport_AMQP_Common_Device_DoWork(registered_device) != RESULT_OK)
+                        {
+                            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_021: [If DoWork fails for the registered device for more than MAX_NUMBER_OF_DEVICE_FAILURES, connection retry shall be triggered]
+                            if (registered_device->number_of_previous_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+                            {
+                                LogError("Device '%s' reported a critical failure; connection retry will be triggered.", STRING_c_str(registered_device->device_id));
 
-                    list_item = singlylinkedlist_get_next_item(list_item);
+                                update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
+                            }
+                        }
+
+                        list_item = singlylinkedlist_get_next_item(list_item);
+                    }
                 }
             }
-        }
 
-        // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_022: [If `instance->amqp_connection` is not NULL, amqp_connection_do_work shall be invoked]
-        if (transport_instance->amqp_connection != NULL)
-        {
-            amqp_connection_do_work(transport_instance->amqp_connection);
+            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_022: [If `instance->amqp_connection` is not NULL, amqp_connection_do_work shall be invoked]
+            if (transport_instance->amqp_connection != NULL)
+            {
+                amqp_connection_do_work(transport_instance->amqp_connection);
+            }
         }
     }
 }
@@ -2297,6 +2344,11 @@
 
             LogInfo("Retry policy set (%d, timeout = %d)", retryPolicy, retryTimeoutLimitInSeconds);
 
+            if (transport_instance->state == AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES)
+            {
+                transport_instance->state = AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED;
+            }
+
             // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_128: [If no errors occur, `IoTHubTransport_AMQP_Common_SetRetryPolicy` shall return zero.]
             result = RESULT_OK;
         }
--- a/iothubtransport_amqp_connection.c	Fri Nov 17 13:56:41 2017 -0800
+++ b/iothubtransport_amqp_connection.c	Fri Dec 15 14:08:19 2017 -0800
@@ -29,6 +29,8 @@
     SESSION_HANDLE session_handle;
     XIO_HANDLE sasl_io;
     SASL_MECHANISM_HANDLE sasl_mechanism;
+    bool has_cbs;
+    bool has_sasl_mechanism;
     bool is_trace_on;
     AMQP_CONNECTION_STATE current_state;
     ON_AMQP_CONNECTION_STATE_CHANGED on_state_changed_callback;
@@ -114,12 +116,12 @@
     AMQP_CONNECTION_INSTANCE* instance = (AMQP_CONNECTION_INSTANCE*)context;
 
     // Codes_SRS_IOTHUBTRANSPORT_AMQP_CONNECTION_09_063: [If `on_connection_state_changed` is called back, `instance->on_state_changed_callback` shall be invoked, if defined]
-    if (instance->cbs_handle == NULL || instance->sasl_io == NULL)
+    if (new_connection_state == CONNECTION_STATE_START)
     {
         // connection is using x509 authentication.
         // At this point uamqp's connection only raises CONNECTION_STATE_START when using X509 auth.
         // So that should be all we expect to consider the amqp_connection_handle opened.
-        if (new_connection_state == CONNECTION_STATE_START)
+        if (instance->has_cbs == false || instance->has_sasl_mechanism == false)
         {
             update_state(instance, AMQP_CONNECTION_STATE_OPENED);
         }
@@ -381,6 +383,8 @@
                 instance->on_state_changed_callback = config->on_state_changed_callback;
                 // Codes_SRS_IOTHUBTRANSPORT_AMQP_CONNECTION_09_061: [`config->on_state_changed_context` shall be saved on `instance->on_state_changed_context`]
                 instance->on_state_changed_context = config->on_state_changed_context;
+                instance->has_sasl_mechanism = config->create_sasl_io;
+                instance->has_cbs = config->create_cbs_connection;
 
                 instance->c2d_keep_alive_freq_secs = (uint32_t)config->c2d_keep_alive_freq_secs;
 
@@ -410,8 +414,6 @@
                 }
                 else
                 {
-                    
-
                     // Codes_SRS_IOTHUBTRANSPORT_AMQP_CONNECTION_09_034: [If no failures occur, amqp_connection_create() shall return the handle to the connection state]
                     result = (AMQP_CONNECTION_HANDLE)instance;
                 }
--- a/iothubtransport_amqp_telemetry_messenger.c	Fri Nov 17 13:56:41 2017 -0800
+++ b/iothubtransport_amqp_telemetry_messenger.c	Fri Dec 15 14:08:19 2017 -0800
@@ -1104,8 +1104,8 @@
     }
     else if (message_set_message_format(send_pending_events_state->message_batch_container, AMQP_BATCHING_FORMAT_CODE) != 0)
     {
-        LogError("Failed setting the message format to batching format");
-        result = __FAILURE__;
+         LogError("Failed setting the message format to batching format");
+         result = __FAILURE__;
     }
     else if ((send_pending_events_state->task = create_task(instance)) == NULL)
     {
@@ -1214,7 +1214,7 @@
             break;
         }
         // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_200: [Retrieve an AMQP encoded representation of this message for later appending to main batched message.  On error, invoke callback but continue send loop; this is NOT a fatal error.]
-        else if (message_create_uamqp_encoding_from_iothub_message(caller_info->message->messageHandle, &body_binary_data) != RESULT_OK)
+        else if (message_create_uamqp_encoding_from_iothub_message(send_pending_events_state.message_batch_container, caller_info->message->messageHandle, &body_binary_data) != RESULT_OK)
         {
             // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_201: [If message_create_uamqp_encoding_from_iothub_message fails, invoke callback with TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_CANNOT_PARSE]
             LogError("message_create_uamqp_encoding_from_iothub_message() failed.  Will continue to try to process messages, result");
--- a/uamqp_messaging.c	Fri Nov 17 13:56:41 2017 -0800
+++ b/uamqp_messaging.c	Fri Dec 15 14:08:19 2017 -0800
@@ -219,8 +219,90 @@
     return result;
 }
 
+// Adds fault injection properties to an AMQP message.
+static int add_fault_injection_properties(MESSAGE_HANDLE message_batch_container, const char* const* property_keys, const char* const* property_values, size_t property_count)
+{
+    int result;
+    AMQP_VALUE uamqp_map;
+
+    if ((uamqp_map = amqpvalue_create_map()) == NULL)
+    {
+        LogError("Failed to create uAMQP map for the properties.");
+        result = __FAILURE__;
+    }
+    else
+    {
+        result = RESULT_OK;
+
+        for (size_t i = 0; result == RESULT_OK && i < property_count; i++)
+        {
+            AMQP_VALUE map_key_value = NULL;
+            AMQP_VALUE map_value_value = NULL;
+
+            if ((map_key_value = amqpvalue_create_string(property_keys[i])) == NULL)
+            {
+                LogError("Failed to create uAMQP property key name.");
+                result = __FAILURE__;
+            }
+            else if ((map_value_value = amqpvalue_create_string(property_values[i])) == NULL)
+            {
+                LogError("Failed to create uAMQP property key value.");
+                result = __FAILURE__;
+            }
+            else if (amqpvalue_set_map_value(uamqp_map, map_key_value, map_value_value) != 0)
+            {
+                LogError("Failed to set key/value into the the uAMQP property map.");
+                result = __FAILURE__;
+            }
+
+            if (map_key_value != NULL)
+                amqpvalue_destroy(map_key_value);
+
+            if (map_value_value != NULL)
+                amqpvalue_destroy(map_value_value);
+        }
+
+        if (result == RESULT_OK)
+        {
+            if (message_set_application_properties(message_batch_container, uamqp_map) != 0)
+            {
+                LogError("Failed to transfer the message properties to the uAMQP message.");
+                result = __FAILURE__;
+            }
+            else
+            {
+                result = RESULT_OK;
+            }
+        }
+        amqpvalue_destroy(uamqp_map);
+    }
+
+    return result;
+}
+
+// To test AMQP fault injection, we currently must have the error properties be specified on the batch_container
+// (not one of the messages sent in this container).  As the SDK layer does not support options for configuring
+// this envelope (this is AMQP/batching specific), we will instead intercept fault messages and apply to the container.
+static int override_fault_injection_properties_if_needed(MESSAGE_HANDLE message_batch_container,  const char* const* property_keys, const char* const* property_values, size_t property_count, bool *override_for_fault_injection)
+{
+    int result;
+    
+    if ((property_count == 0) || (strcmp(property_keys[0], "AzIoTHub_FaultOperationType") != 0))
+    {
+        *override_for_fault_injection = false;
+        result = RESULT_OK;
+    }
+    else
+    {
+        *override_for_fault_injection = true;
+        result = add_fault_injection_properties(message_batch_container,  property_keys, property_values, property_count);
+    }
+
+    return result;
+}
+
 // Codes_SRS_UAMQP_MESSAGING_31_117: [Get application message properties associated with the IOTHUB_MESSAGE_HANDLE to encode, returning the properties and their encoded length.]
-static int create_application_properties_to_encode(IOTHUB_MESSAGE_HANDLE messageHandle, AMQP_VALUE *application_properties, size_t *application_properties_length)
+static int create_application_properties_to_encode(MESSAGE_HANDLE message_batch_container, IOTHUB_MESSAGE_HANDLE messageHandle, AMQP_VALUE *application_properties, size_t *application_properties_length)
 {
     MAP_HANDLE properties_map;
     const char* const* property_keys;
@@ -249,52 +331,56 @@
         }
         else
         {
-            result = RESULT_OK;
+            bool override_for_fault_injection = false;
+            result = override_fault_injection_properties_if_needed(message_batch_container, property_keys, property_values, property_count, &override_for_fault_injection);
 
-            for (i = 0; i < property_count; i++)
+            if (override_for_fault_injection == false)
             {
-                AMQP_VALUE map_property_key;
-                AMQP_VALUE map_property_value;
-
-                if ((map_property_key = amqpvalue_create_string(property_keys[i])) == NULL)
+                for (i = 0; i < property_count; i++)
                 {
-                    LogError("Failed amqpvalue_create_string for key");
-                    result = __FAILURE__;
-                    break;
-                }
+                    AMQP_VALUE map_property_key;
+                    AMQP_VALUE map_property_value;
+
+                    if ((map_property_key = amqpvalue_create_string(property_keys[i])) == NULL)
+                    {
+                        LogError("Failed amqpvalue_create_string for key");
+                        result = __FAILURE__;
+                        break;
+                    }
 
-                if ((map_property_value = amqpvalue_create_string(property_values[i])) == NULL)
-                {
-                    LogError("Failed amqpvalue_create_string for value");
+                    if ((map_property_value = amqpvalue_create_string(property_values[i])) == NULL)
+                    {
+                        LogError("Failed amqpvalue_create_string for value");
+                        amqpvalue_destroy(map_property_key);
+                        result = __FAILURE__;
+                        break;
+                    }
+
+                    if (amqpvalue_set_map_value(uamqp_properties_map, map_property_key, map_property_value) != 0)
+                    {
+                        LogError("Failed amqpvalue_set_map_value");
+                        amqpvalue_destroy(map_property_key);
+                        amqpvalue_destroy(map_property_value);
+                        result = __FAILURE__;
+                        break;
+                    }
+
                     amqpvalue_destroy(map_property_key);
-                    result = __FAILURE__;
-                    break;
+                    amqpvalue_destroy(map_property_value);
                 }
 
-                if (amqpvalue_set_map_value(uamqp_properties_map, map_property_key, map_property_value) != 0)
+                if (RESULT_OK == result)
                 {
-                    LogError("Failed amqpvalue_set_map_value");
-                    amqpvalue_destroy(map_property_key);
-                    amqpvalue_destroy(map_property_value);
-                    result = __FAILURE__;
-                    break;
-                }
-
-                amqpvalue_destroy(map_property_key);
-                amqpvalue_destroy(map_property_value);
-            }
-
-            if (RESULT_OK == result)
-            {
-                if ((*application_properties = amqpvalue_create_application_properties(uamqp_properties_map)) == NULL)
-                {
-                    LogError("Failed amqpvalue_create_application_properties");
-                    result = __FAILURE__;
-                }
-                else if (amqpvalue_get_encoded_size(*application_properties, application_properties_length) != 0)
-                {
-                    LogError("Failed amqpvalue_get_encoded_size");
-                    result = __FAILURE__;
+                    if ((*application_properties = amqpvalue_create_application_properties(uamqp_properties_map)) == NULL)
+                    {
+                        LogError("Failed amqpvalue_create_application_properties");
+                        result = __FAILURE__;
+                    }
+                    else if (amqpvalue_get_encoded_size(*application_properties, application_properties_length) != 0)
+                    {
+                        LogError("Failed amqpvalue_get_encoded_size");
+                        result = __FAILURE__;
+                    }
                 }
             }
         }
@@ -483,7 +569,7 @@
 
 // Codes_SRS_UAMQP_MESSAGING_31_120: [Create a blob that contains AMQP encoding of IOTHUB_MESSAGE_HANDLE.]
 // Codes_SRS_UAMQP_MESSAGING_31_121: [Any errors during `message_create_uamqp_encoding_from_iothub_message` stop processing on this message.]
-int message_create_uamqp_encoding_from_iothub_message(IOTHUB_MESSAGE_HANDLE message_handle, BINARY_DATA* body_binary_data)
+int message_create_uamqp_encoding_from_iothub_message(MESSAGE_HANDLE message_batch_container, IOTHUB_MESSAGE_HANDLE message_handle, BINARY_DATA* body_binary_data)
 {
     int result;
 
@@ -504,7 +590,7 @@
         LogError("create_message_properties_to_encode() failed");
         result = __FAILURE__;
     }
-    else if (create_application_properties_to_encode(message_handle, &application_properties, &application_properties_length) != RESULT_OK)
+    else if (create_application_properties_to_encode(message_batch_container, message_handle, &application_properties, &application_properties_length) != RESULT_OK)
     {
         LogError("create_application_properties_to_encode() failed");
         result = __FAILURE__;
--- a/uamqp_messaging.h	Fri Nov 17 13:56:41 2017 -0800
+++ b/uamqp_messaging.h	Fri Dec 15 14:08:19 2017 -0800
@@ -14,7 +14,7 @@
 #endif
 
 	MOCKABLE_FUNCTION(, int, message_create_IoTHubMessage_from_uamqp_message, MESSAGE_HANDLE, uamqp_message, IOTHUB_MESSAGE_HANDLE*, iothubclient_message);
-	MOCKABLE_FUNCTION(, int, message_create_uamqp_encoding_from_iothub_message, IOTHUB_MESSAGE_HANDLE, message_handle, BINARY_DATA*, body_binary_data);
+	MOCKABLE_FUNCTION(, int, message_create_uamqp_encoding_from_iothub_message, MESSAGE_HANDLE, message_batch_container, IOTHUB_MESSAGE_HANDLE, message_handle, BINARY_DATA*, body_binary_data);
 
 #ifdef __cplusplus
 }