Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
46:c688c75b63b9
Parent:
43:3da2d93bb955
Child:
50:f3a92c6c6534
--- a/uamqp_messaging.c	Fri Nov 17 13:56:41 2017 -0800
+++ b/uamqp_messaging.c	Fri Dec 15 14:08:19 2017 -0800
@@ -219,8 +219,90 @@
     return result;
 }
 
+// Adds fault injection properties to an AMQP message.
+static int add_fault_injection_properties(MESSAGE_HANDLE message_batch_container, const char* const* property_keys, const char* const* property_values, size_t property_count)
+{
+    int result;
+    AMQP_VALUE uamqp_map;
+
+    if ((uamqp_map = amqpvalue_create_map()) == NULL)
+    {
+        LogError("Failed to create uAMQP map for the properties.");
+        result = __FAILURE__;
+    }
+    else
+    {
+        result = RESULT_OK;
+
+        for (size_t i = 0; result == RESULT_OK && i < property_count; i++)
+        {
+            AMQP_VALUE map_key_value = NULL;
+            AMQP_VALUE map_value_value = NULL;
+
+            if ((map_key_value = amqpvalue_create_string(property_keys[i])) == NULL)
+            {
+                LogError("Failed to create uAMQP property key name.");
+                result = __FAILURE__;
+            }
+            else if ((map_value_value = amqpvalue_create_string(property_values[i])) == NULL)
+            {
+                LogError("Failed to create uAMQP property key value.");
+                result = __FAILURE__;
+            }
+            else if (amqpvalue_set_map_value(uamqp_map, map_key_value, map_value_value) != 0)
+            {
+                LogError("Failed to set key/value into the the uAMQP property map.");
+                result = __FAILURE__;
+            }
+
+            if (map_key_value != NULL)
+                amqpvalue_destroy(map_key_value);
+
+            if (map_value_value != NULL)
+                amqpvalue_destroy(map_value_value);
+        }
+
+        if (result == RESULT_OK)
+        {
+            if (message_set_application_properties(message_batch_container, uamqp_map) != 0)
+            {
+                LogError("Failed to transfer the message properties to the uAMQP message.");
+                result = __FAILURE__;
+            }
+            else
+            {
+                result = RESULT_OK;
+            }
+        }
+        amqpvalue_destroy(uamqp_map);
+    }
+
+    return result;
+}
+
+// To test AMQP fault injection, we currently must have the error properties be specified on the batch_container
+// (not one of the messages sent in this container).  As the SDK layer does not support options for configuring
+// this envelope (this is AMQP/batching specific), we will instead intercept fault messages and apply to the container.
+static int override_fault_injection_properties_if_needed(MESSAGE_HANDLE message_batch_container,  const char* const* property_keys, const char* const* property_values, size_t property_count, bool *override_for_fault_injection)
+{
+    int result;
+    
+    if ((property_count == 0) || (strcmp(property_keys[0], "AzIoTHub_FaultOperationType") != 0))
+    {
+        *override_for_fault_injection = false;
+        result = RESULT_OK;
+    }
+    else
+    {
+        *override_for_fault_injection = true;
+        result = add_fault_injection_properties(message_batch_container,  property_keys, property_values, property_count);
+    }
+
+    return result;
+}
+
 // Codes_SRS_UAMQP_MESSAGING_31_117: [Get application message properties associated with the IOTHUB_MESSAGE_HANDLE to encode, returning the properties and their encoded length.]
-static int create_application_properties_to_encode(IOTHUB_MESSAGE_HANDLE messageHandle, AMQP_VALUE *application_properties, size_t *application_properties_length)
+static int create_application_properties_to_encode(MESSAGE_HANDLE message_batch_container, IOTHUB_MESSAGE_HANDLE messageHandle, AMQP_VALUE *application_properties, size_t *application_properties_length)
 {
     MAP_HANDLE properties_map;
     const char* const* property_keys;
@@ -249,52 +331,56 @@
         }
         else
         {
-            result = RESULT_OK;
+            bool override_for_fault_injection = false;
+            result = override_fault_injection_properties_if_needed(message_batch_container, property_keys, property_values, property_count, &override_for_fault_injection);
 
-            for (i = 0; i < property_count; i++)
+            if (override_for_fault_injection == false)
             {
-                AMQP_VALUE map_property_key;
-                AMQP_VALUE map_property_value;
-
-                if ((map_property_key = amqpvalue_create_string(property_keys[i])) == NULL)
+                for (i = 0; i < property_count; i++)
                 {
-                    LogError("Failed amqpvalue_create_string for key");
-                    result = __FAILURE__;
-                    break;
-                }
+                    AMQP_VALUE map_property_key;
+                    AMQP_VALUE map_property_value;
+
+                    if ((map_property_key = amqpvalue_create_string(property_keys[i])) == NULL)
+                    {
+                        LogError("Failed amqpvalue_create_string for key");
+                        result = __FAILURE__;
+                        break;
+                    }
 
-                if ((map_property_value = amqpvalue_create_string(property_values[i])) == NULL)
-                {
-                    LogError("Failed amqpvalue_create_string for value");
+                    if ((map_property_value = amqpvalue_create_string(property_values[i])) == NULL)
+                    {
+                        LogError("Failed amqpvalue_create_string for value");
+                        amqpvalue_destroy(map_property_key);
+                        result = __FAILURE__;
+                        break;
+                    }
+
+                    if (amqpvalue_set_map_value(uamqp_properties_map, map_property_key, map_property_value) != 0)
+                    {
+                        LogError("Failed amqpvalue_set_map_value");
+                        amqpvalue_destroy(map_property_key);
+                        amqpvalue_destroy(map_property_value);
+                        result = __FAILURE__;
+                        break;
+                    }
+
                     amqpvalue_destroy(map_property_key);
-                    result = __FAILURE__;
-                    break;
+                    amqpvalue_destroy(map_property_value);
                 }
 
-                if (amqpvalue_set_map_value(uamqp_properties_map, map_property_key, map_property_value) != 0)
+                if (RESULT_OK == result)
                 {
-                    LogError("Failed amqpvalue_set_map_value");
-                    amqpvalue_destroy(map_property_key);
-                    amqpvalue_destroy(map_property_value);
-                    result = __FAILURE__;
-                    break;
-                }
-
-                amqpvalue_destroy(map_property_key);
-                amqpvalue_destroy(map_property_value);
-            }
-
-            if (RESULT_OK == result)
-            {
-                if ((*application_properties = amqpvalue_create_application_properties(uamqp_properties_map)) == NULL)
-                {
-                    LogError("Failed amqpvalue_create_application_properties");
-                    result = __FAILURE__;
-                }
-                else if (amqpvalue_get_encoded_size(*application_properties, application_properties_length) != 0)
-                {
-                    LogError("Failed amqpvalue_get_encoded_size");
-                    result = __FAILURE__;
+                    if ((*application_properties = amqpvalue_create_application_properties(uamqp_properties_map)) == NULL)
+                    {
+                        LogError("Failed amqpvalue_create_application_properties");
+                        result = __FAILURE__;
+                    }
+                    else if (amqpvalue_get_encoded_size(*application_properties, application_properties_length) != 0)
+                    {
+                        LogError("Failed amqpvalue_get_encoded_size");
+                        result = __FAILURE__;
+                    }
                 }
             }
         }
@@ -483,7 +569,7 @@
 
 // Codes_SRS_UAMQP_MESSAGING_31_120: [Create a blob that contains AMQP encoding of IOTHUB_MESSAGE_HANDLE.]
 // Codes_SRS_UAMQP_MESSAGING_31_121: [Any errors during `message_create_uamqp_encoding_from_iothub_message` stop processing on this message.]
-int message_create_uamqp_encoding_from_iothub_message(IOTHUB_MESSAGE_HANDLE message_handle, BINARY_DATA* body_binary_data)
+int message_create_uamqp_encoding_from_iothub_message(MESSAGE_HANDLE message_batch_container, IOTHUB_MESSAGE_HANDLE message_handle, BINARY_DATA* body_binary_data)
 {
     int result;
 
@@ -504,7 +590,7 @@
         LogError("create_message_properties_to_encode() failed");
         result = __FAILURE__;
     }
-    else if (create_application_properties_to_encode(message_handle, &application_properties, &application_properties_length) != RESULT_OK)
+    else if (create_application_properties_to_encode(message_batch_container, message_handle, &application_properties, &application_properties_length) != RESULT_OK)
     {
         LogError("create_application_properties_to_encode() failed");
         result = __FAILURE__;