Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: uamqp_messaging.c
- Revision:
- 46:c688c75b63b9
- Parent:
- 43:3da2d93bb955
- Child:
- 50:f3a92c6c6534
--- a/uamqp_messaging.c Fri Nov 17 13:56:41 2017 -0800 +++ b/uamqp_messaging.c Fri Dec 15 14:08:19 2017 -0800 @@ -219,8 +219,90 @@ return result; } +// Adds fault injection properties to an AMQP message. +static int add_fault_injection_properties(MESSAGE_HANDLE message_batch_container, const char* const* property_keys, const char* const* property_values, size_t property_count) +{ + int result; + AMQP_VALUE uamqp_map; + + if ((uamqp_map = amqpvalue_create_map()) == NULL) + { + LogError("Failed to create uAMQP map for the properties."); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + + for (size_t i = 0; result == RESULT_OK && i < property_count; i++) + { + AMQP_VALUE map_key_value = NULL; + AMQP_VALUE map_value_value = NULL; + + if ((map_key_value = amqpvalue_create_string(property_keys[i])) == NULL) + { + LogError("Failed to create uAMQP property key name."); + result = __FAILURE__; + } + else if ((map_value_value = amqpvalue_create_string(property_values[i])) == NULL) + { + LogError("Failed to create uAMQP property key value."); + result = __FAILURE__; + } + else if (amqpvalue_set_map_value(uamqp_map, map_key_value, map_value_value) != 0) + { + LogError("Failed to set key/value into the the uAMQP property map."); + result = __FAILURE__; + } + + if (map_key_value != NULL) + amqpvalue_destroy(map_key_value); + + if (map_value_value != NULL) + amqpvalue_destroy(map_value_value); + } + + if (result == RESULT_OK) + { + if (message_set_application_properties(message_batch_container, uamqp_map) != 0) + { + LogError("Failed to transfer the message properties to the uAMQP message."); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } + } + amqpvalue_destroy(uamqp_map); + } + + return result; +} + +// To test AMQP fault injection, we currently must have the error properties be specified on the batch_container +// (not one of the messages sent in this container). As the SDK layer does not support options for configuring +// this envelope (this is AMQP/batching specific), we will instead intercept fault messages and apply to the container. +static int override_fault_injection_properties_if_needed(MESSAGE_HANDLE message_batch_container, const char* const* property_keys, const char* const* property_values, size_t property_count, bool *override_for_fault_injection) +{ + int result; + + if ((property_count == 0) || (strcmp(property_keys[0], "AzIoTHub_FaultOperationType") != 0)) + { + *override_for_fault_injection = false; + result = RESULT_OK; + } + else + { + *override_for_fault_injection = true; + result = add_fault_injection_properties(message_batch_container, property_keys, property_values, property_count); + } + + return result; +} + // Codes_SRS_UAMQP_MESSAGING_31_117: [Get application message properties associated with the IOTHUB_MESSAGE_HANDLE to encode, returning the properties and their encoded length.] -static int create_application_properties_to_encode(IOTHUB_MESSAGE_HANDLE messageHandle, AMQP_VALUE *application_properties, size_t *application_properties_length) +static int create_application_properties_to_encode(MESSAGE_HANDLE message_batch_container, IOTHUB_MESSAGE_HANDLE messageHandle, AMQP_VALUE *application_properties, size_t *application_properties_length) { MAP_HANDLE properties_map; const char* const* property_keys; @@ -249,52 +331,56 @@ } else { - result = RESULT_OK; + bool override_for_fault_injection = false; + result = override_fault_injection_properties_if_needed(message_batch_container, property_keys, property_values, property_count, &override_for_fault_injection); - for (i = 0; i < property_count; i++) + if (override_for_fault_injection == false) { - AMQP_VALUE map_property_key; - AMQP_VALUE map_property_value; - - if ((map_property_key = amqpvalue_create_string(property_keys[i])) == NULL) + for (i = 0; i < property_count; i++) { - LogError("Failed amqpvalue_create_string for key"); - result = __FAILURE__; - break; - } + AMQP_VALUE map_property_key; + AMQP_VALUE map_property_value; + + if ((map_property_key = amqpvalue_create_string(property_keys[i])) == NULL) + { + LogError("Failed amqpvalue_create_string for key"); + result = __FAILURE__; + break; + } - if ((map_property_value = amqpvalue_create_string(property_values[i])) == NULL) - { - LogError("Failed amqpvalue_create_string for value"); + if ((map_property_value = amqpvalue_create_string(property_values[i])) == NULL) + { + LogError("Failed amqpvalue_create_string for value"); + amqpvalue_destroy(map_property_key); + result = __FAILURE__; + break; + } + + if (amqpvalue_set_map_value(uamqp_properties_map, map_property_key, map_property_value) != 0) + { + LogError("Failed amqpvalue_set_map_value"); + amqpvalue_destroy(map_property_key); + amqpvalue_destroy(map_property_value); + result = __FAILURE__; + break; + } + amqpvalue_destroy(map_property_key); - result = __FAILURE__; - break; + amqpvalue_destroy(map_property_value); } - if (amqpvalue_set_map_value(uamqp_properties_map, map_property_key, map_property_value) != 0) + if (RESULT_OK == result) { - LogError("Failed amqpvalue_set_map_value"); - amqpvalue_destroy(map_property_key); - amqpvalue_destroy(map_property_value); - result = __FAILURE__; - break; - } - - amqpvalue_destroy(map_property_key); - amqpvalue_destroy(map_property_value); - } - - if (RESULT_OK == result) - { - if ((*application_properties = amqpvalue_create_application_properties(uamqp_properties_map)) == NULL) - { - LogError("Failed amqpvalue_create_application_properties"); - result = __FAILURE__; - } - else if (amqpvalue_get_encoded_size(*application_properties, application_properties_length) != 0) - { - LogError("Failed amqpvalue_get_encoded_size"); - result = __FAILURE__; + if ((*application_properties = amqpvalue_create_application_properties(uamqp_properties_map)) == NULL) + { + LogError("Failed amqpvalue_create_application_properties"); + result = __FAILURE__; + } + else if (amqpvalue_get_encoded_size(*application_properties, application_properties_length) != 0) + { + LogError("Failed amqpvalue_get_encoded_size"); + result = __FAILURE__; + } } } } @@ -483,7 +569,7 @@ // Codes_SRS_UAMQP_MESSAGING_31_120: [Create a blob that contains AMQP encoding of IOTHUB_MESSAGE_HANDLE.] // Codes_SRS_UAMQP_MESSAGING_31_121: [Any errors during `message_create_uamqp_encoding_from_iothub_message` stop processing on this message.] -int message_create_uamqp_encoding_from_iothub_message(IOTHUB_MESSAGE_HANDLE message_handle, BINARY_DATA* body_binary_data) +int message_create_uamqp_encoding_from_iothub_message(MESSAGE_HANDLE message_batch_container, IOTHUB_MESSAGE_HANDLE message_handle, BINARY_DATA* body_binary_data) { int result; @@ -504,7 +590,7 @@ LogError("create_message_properties_to_encode() failed"); result = __FAILURE__; } - else if (create_application_properties_to_encode(message_handle, &application_properties, &application_properties_length) != RESULT_OK) + else if (create_application_properties_to_encode(message_batch_container, message_handle, &application_properties, &application_properties_length) != RESULT_OK) { LogError("create_application_properties_to_encode() failed"); result = __FAILURE__;