Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
Revision 46:c688c75b63b9, committed 2017-12-15
- Comitter:
- AzureIoTClient
- Date:
- Fri Dec 15 14:08:19 2017 -0800
- Parent:
- 45:c09fac270e60
- Child:
- 47:8a238e75a0f7
- Commit message:
- 1.1.29
Changed in this revision
--- a/iothubtransport_amqp_common.c Fri Nov 17 13:56:41 2017 -0800
+++ b/iothubtransport_amqp_common.c Fri Dec 15 14:08:19 2017 -0800
@@ -58,14 +58,27 @@
AMQP_TRANSPORT_AUTHENTICATION_MODE_X509
} AMQP_TRANSPORT_AUTHENTICATION_MODE;
+/*
+Definition of transport states:
-#define AMQP_TRANSPORT_STATE_STRINGS \
- AMQP_TRANSPORT_STATE_NOT_CONNECTED, \
- AMQP_TRANSPORT_STATE_CONNECTING, \
- AMQP_TRANSPORT_STATE_CONNECTED, \
- AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED, \
- AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION, \
- AMQP_TRANSPORT_STATE_RECONNECTING, \
+AMQP_TRANSPORT_STATE_NOT_CONNECTED: Initial state when the transport is created.
+AMQP_TRANSPORT_STATE_CONNECTING: First connection ever.
+AMQP_TRANSPORT_STATE_CONNECTED: Transition from AMQP_TRANSPORT_STATE_CONNECTING or AMQP_TRANSPORT_STATE_RECONNECTING.
+AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED: When a failure occurred and the transport identifies a reconnection is needed.
+AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION: Transition from AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED after all prep is done (transient instances are destroyed, devices are stopped).
+AMQP_TRANSPORT_STATE_RECONNECTING: Transition from AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION.
+AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES: State reached if the maximum number/length of reconnections has been reached.
+AMQP_TRANSPORT_STATE_BEING_DESTROYED: State set if IoTHubTransport_AMQP_Common_Destroy function is invoked.
+*/
+
+#define AMQP_TRANSPORT_STATE_STRINGS \
+ AMQP_TRANSPORT_STATE_NOT_CONNECTED, \
+ AMQP_TRANSPORT_STATE_CONNECTING, \
+ AMQP_TRANSPORT_STATE_CONNECTED, \
+ AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED, \
+ AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION, \
+ AMQP_TRANSPORT_STATE_RECONNECTING, \
+ AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES, \
AMQP_TRANSPORT_STATE_BEING_DESTROYED
DEFINE_LOCAL_ENUM(AMQP_TRANSPORT_STATE, AMQP_TRANSPORT_STATE_STRINGS);
@@ -227,6 +240,17 @@
return result;
}
+static void raise_connection_status_callback_retry_expired(const void* item, const void* action_context, bool* continue_processing)
+{
+ (void)action_context;
+
+ AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)item;
+
+ IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED);
+
+ *continue_processing = true;
+}
+
// @brief
// Saves the new state, if it is different than the previous one.
static void on_device_state_changed_callback(void* context, DEVICE_STATE previous_state, DEVICE_STATE new_state)
@@ -251,7 +275,17 @@
// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_121: [If `new_state` is DEVICE_STATE_STOPPED, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_OK]
else if (new_state == DEVICE_STATE_STOPPED)
{
- IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK);
+ if (registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_CONNECTED ||
+ registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_BEING_DESTROYED)
+ {
+ IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK);
+ }
+ else if (registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED ||
+ registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION ||
+ registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTING)
+ {
+ IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_NETWORK);
+ }
}
// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_122: [If `new_state` is DEVICE_STATE_ERROR_AUTH, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL]
else if (new_state == DEVICE_STATE_ERROR_AUTH)
@@ -1493,11 +1527,15 @@
AMQP_TRANSPORT_INSTANCE* transport_instance = (AMQP_TRANSPORT_INSTANCE*)handle;
LIST_ITEM_HANDLE list_item;
+ if (transport_instance->state == AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES)
+ {
+ // Nothing to be done.
+ }
// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_017: [If `instance->state` is `RECONNECTION_REQUIRED`, IoTHubTransport_AMQP_Common_DoWork shall attempt to trigger the connection-retry logic and return]
- if (transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED)
+ else if (transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED)
{
RETRY_ACTION retry_action;
-
+
// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_126: [The connection retry shall be attempted only if retry_control_should_retry() returns RETRY_ACTION_NOW, or if it fails]
if (retry_control_should_retry(transport_instance->connection_retry_control, &retry_action) != RESULT_OK)
{
@@ -1509,58 +1547,67 @@
{
prepare_for_connection_retry(transport_instance);
}
- }
- // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_018: [If there are no devices registered on the transport, IoTHubTransport_AMQP_Common_DoWork shall skip do_work for devices]
- else if ((list_item = singlylinkedlist_get_head_item(transport_instance->registered_devices)) != NULL)
- {
- // We need to check if there are devices, otherwise the amqp_connection won't be able to be created since
- // there is not a preferred authentication mode set yet on the transport.
+ else if (retry_action == RETRY_ACTION_STOP_RETRYING)
+ {
+ update_state(transport_instance, AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES);
- // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_019: [If `instance->amqp_connection` is NULL, it shall be established]
- // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_12_003: [AMQP connection will be configured using the `c2d_keep_alive_freq_secs` value from SetOption ]
- if (transport_instance->amqp_connection == NULL && establish_amqp_connection(transport_instance) != RESULT_OK)
- {
- LogError("AMQP transport failed to establish connection with service.");
-
- update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
+ (void)singlylinkedlist_foreach(transport_instance->registered_devices, raise_connection_status_callback_retry_expired, NULL);
}
- // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_020: [If the amqp_connection is OPENED, the transport shall iterate through each registered device and perform a device-specific do_work on each]
- else if (transport_instance->amqp_connection_state == AMQP_CONNECTION_STATE_OPENED)
+ }
+ else
+ {
+ // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_018: [If there are no devices registered on the transport, IoTHubTransport_AMQP_Common_DoWork shall skip do_work for devices]
+ if ((list_item = singlylinkedlist_get_head_item(transport_instance->registered_devices)) != NULL)
{
- while (list_item != NULL)
- {
- AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device;
+ // We need to check if there are devices, otherwise the amqp_connection won't be able to be created since
+ // there is not a preferred authentication mode set yet on the transport.
- if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL)
- {
- LogError("Transport had an unexpected failure during DoWork (failed to fetch a registered_devices list item value)");
- }
- else if (registered_device->number_of_send_event_complete_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+ // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_019: [If `instance->amqp_connection` is NULL, it shall be established]
+ // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_12_003: [AMQP connection will be configured using the `c2d_keep_alive_freq_secs` value from SetOption ]
+ if (transport_instance->amqp_connection == NULL && establish_amqp_connection(transport_instance) != RESULT_OK)
+ {
+ LogError("AMQP transport failed to establish connection with service.");
+
+ update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
+ }
+ // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_020: [If the amqp_connection is OPENED, the transport shall iterate through each registered device and perform a device-specific do_work on each]
+ else if (transport_instance->amqp_connection_state == AMQP_CONNECTION_STATE_OPENED)
+ {
+ while (list_item != NULL)
{
- LogError("Device '%s' reported a critical failure (events completed sending with failures); connection retry will be triggered.", STRING_c_str(registered_device->device_id));
+ AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device;
- update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
- }
- else if (IoTHubTransport_AMQP_Common_Device_DoWork(registered_device) != RESULT_OK)
- {
- // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_021: [If DoWork fails for the registered device for more than MAX_NUMBER_OF_DEVICE_FAILURES, connection retry shall be triggered]
- if (registered_device->number_of_previous_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+ if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL)
{
- LogError("Device '%s' reported a critical failure; connection retry will be triggered.", STRING_c_str(registered_device->device_id));
+ LogError("Transport had an unexpected failure during DoWork (failed to fetch a registered_devices list item value)");
+ }
+ else if (registered_device->number_of_send_event_complete_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+ {
+ LogError("Device '%s' reported a critical failure (events completed sending with failures); connection retry will be triggered.", STRING_c_str(registered_device->device_id));
update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
}
- }
+ else if (IoTHubTransport_AMQP_Common_Device_DoWork(registered_device) != RESULT_OK)
+ {
+ // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_021: [If DoWork fails for the registered device for more than MAX_NUMBER_OF_DEVICE_FAILURES, connection retry shall be triggered]
+ if (registered_device->number_of_previous_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+ {
+ LogError("Device '%s' reported a critical failure; connection retry will be triggered.", STRING_c_str(registered_device->device_id));
- list_item = singlylinkedlist_get_next_item(list_item);
+ update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
+ }
+ }
+
+ list_item = singlylinkedlist_get_next_item(list_item);
+ }
}
}
- }
- // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_022: [If `instance->amqp_connection` is not NULL, amqp_connection_do_work shall be invoked]
- if (transport_instance->amqp_connection != NULL)
- {
- amqp_connection_do_work(transport_instance->amqp_connection);
+ // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_022: [If `instance->amqp_connection` is not NULL, amqp_connection_do_work shall be invoked]
+ if (transport_instance->amqp_connection != NULL)
+ {
+ amqp_connection_do_work(transport_instance->amqp_connection);
+ }
}
}
}
@@ -2297,6 +2344,11 @@
LogInfo("Retry policy set (%d, timeout = %d)", retryPolicy, retryTimeoutLimitInSeconds);
+ if (transport_instance->state == AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES)
+ {
+ transport_instance->state = AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED;
+ }
+
// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_128: [If no errors occur, `IoTHubTransport_AMQP_Common_SetRetryPolicy` shall return zero.]
result = RESULT_OK;
}
--- a/iothubtransport_amqp_connection.c Fri Nov 17 13:56:41 2017 -0800
+++ b/iothubtransport_amqp_connection.c Fri Dec 15 14:08:19 2017 -0800
@@ -29,6 +29,8 @@
SESSION_HANDLE session_handle;
XIO_HANDLE sasl_io;
SASL_MECHANISM_HANDLE sasl_mechanism;
+ bool has_cbs;
+ bool has_sasl_mechanism;
bool is_trace_on;
AMQP_CONNECTION_STATE current_state;
ON_AMQP_CONNECTION_STATE_CHANGED on_state_changed_callback;
@@ -114,12 +116,12 @@
AMQP_CONNECTION_INSTANCE* instance = (AMQP_CONNECTION_INSTANCE*)context;
// Codes_SRS_IOTHUBTRANSPORT_AMQP_CONNECTION_09_063: [If `on_connection_state_changed` is called back, `instance->on_state_changed_callback` shall be invoked, if defined]
- if (instance->cbs_handle == NULL || instance->sasl_io == NULL)
+ if (new_connection_state == CONNECTION_STATE_START)
{
// connection is using x509 authentication.
// At this point uamqp's connection only raises CONNECTION_STATE_START when using X509 auth.
// So that should be all we expect to consider the amqp_connection_handle opened.
- if (new_connection_state == CONNECTION_STATE_START)
+ if (instance->has_cbs == false || instance->has_sasl_mechanism == false)
{
update_state(instance, AMQP_CONNECTION_STATE_OPENED);
}
@@ -381,6 +383,8 @@
instance->on_state_changed_callback = config->on_state_changed_callback;
// Codes_SRS_IOTHUBTRANSPORT_AMQP_CONNECTION_09_061: [`config->on_state_changed_context` shall be saved on `instance->on_state_changed_context`]
instance->on_state_changed_context = config->on_state_changed_context;
+ instance->has_sasl_mechanism = config->create_sasl_io;
+ instance->has_cbs = config->create_cbs_connection;
instance->c2d_keep_alive_freq_secs = (uint32_t)config->c2d_keep_alive_freq_secs;
@@ -410,8 +414,6 @@
}
else
{
-
-
// Codes_SRS_IOTHUBTRANSPORT_AMQP_CONNECTION_09_034: [If no failures occur, amqp_connection_create() shall return the handle to the connection state]
result = (AMQP_CONNECTION_HANDLE)instance;
}
--- a/iothubtransport_amqp_telemetry_messenger.c Fri Nov 17 13:56:41 2017 -0800
+++ b/iothubtransport_amqp_telemetry_messenger.c Fri Dec 15 14:08:19 2017 -0800
@@ -1104,8 +1104,8 @@
}
else if (message_set_message_format(send_pending_events_state->message_batch_container, AMQP_BATCHING_FORMAT_CODE) != 0)
{
- LogError("Failed setting the message format to batching format");
- result = __FAILURE__;
+ LogError("Failed setting the message format to batching format");
+ result = __FAILURE__;
}
else if ((send_pending_events_state->task = create_task(instance)) == NULL)
{
@@ -1214,7 +1214,7 @@
break;
}
// Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_200: [Retrieve an AMQP encoded representation of this message for later appending to main batched message. On error, invoke callback but continue send loop; this is NOT a fatal error.]
- else if (message_create_uamqp_encoding_from_iothub_message(caller_info->message->messageHandle, &body_binary_data) != RESULT_OK)
+ else if (message_create_uamqp_encoding_from_iothub_message(send_pending_events_state.message_batch_container, caller_info->message->messageHandle, &body_binary_data) != RESULT_OK)
{
// Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_201: [If message_create_uamqp_encoding_from_iothub_message fails, invoke callback with TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_CANNOT_PARSE]
LogError("message_create_uamqp_encoding_from_iothub_message() failed. Will continue to try to process messages, result");
--- a/uamqp_messaging.c Fri Nov 17 13:56:41 2017 -0800
+++ b/uamqp_messaging.c Fri Dec 15 14:08:19 2017 -0800
@@ -219,8 +219,90 @@
return result;
}
+// Adds fault injection properties to an AMQP message.
+static int add_fault_injection_properties(MESSAGE_HANDLE message_batch_container, const char* const* property_keys, const char* const* property_values, size_t property_count)
+{
+ int result;
+ AMQP_VALUE uamqp_map;
+
+ if ((uamqp_map = amqpvalue_create_map()) == NULL)
+ {
+ LogError("Failed to create uAMQP map for the properties.");
+ result = __FAILURE__;
+ }
+ else
+ {
+ result = RESULT_OK;
+
+ for (size_t i = 0; result == RESULT_OK && i < property_count; i++)
+ {
+ AMQP_VALUE map_key_value = NULL;
+ AMQP_VALUE map_value_value = NULL;
+
+ if ((map_key_value = amqpvalue_create_string(property_keys[i])) == NULL)
+ {
+ LogError("Failed to create uAMQP property key name.");
+ result = __FAILURE__;
+ }
+ else if ((map_value_value = amqpvalue_create_string(property_values[i])) == NULL)
+ {
+ LogError("Failed to create uAMQP property key value.");
+ result = __FAILURE__;
+ }
+ else if (amqpvalue_set_map_value(uamqp_map, map_key_value, map_value_value) != 0)
+ {
+ LogError("Failed to set key/value into the the uAMQP property map.");
+ result = __FAILURE__;
+ }
+
+ if (map_key_value != NULL)
+ amqpvalue_destroy(map_key_value);
+
+ if (map_value_value != NULL)
+ amqpvalue_destroy(map_value_value);
+ }
+
+ if (result == RESULT_OK)
+ {
+ if (message_set_application_properties(message_batch_container, uamqp_map) != 0)
+ {
+ LogError("Failed to transfer the message properties to the uAMQP message.");
+ result = __FAILURE__;
+ }
+ else
+ {
+ result = RESULT_OK;
+ }
+ }
+ amqpvalue_destroy(uamqp_map);
+ }
+
+ return result;
+}
+
+// To test AMQP fault injection, we currently must have the error properties be specified on the batch_container
+// (not one of the messages sent in this container). As the SDK layer does not support options for configuring
+// this envelope (this is AMQP/batching specific), we will instead intercept fault messages and apply to the container.
+static int override_fault_injection_properties_if_needed(MESSAGE_HANDLE message_batch_container, const char* const* property_keys, const char* const* property_values, size_t property_count, bool *override_for_fault_injection)
+{
+ int result;
+
+ if ((property_count == 0) || (strcmp(property_keys[0], "AzIoTHub_FaultOperationType") != 0))
+ {
+ *override_for_fault_injection = false;
+ result = RESULT_OK;
+ }
+ else
+ {
+ *override_for_fault_injection = true;
+ result = add_fault_injection_properties(message_batch_container, property_keys, property_values, property_count);
+ }
+
+ return result;
+}
+
// Codes_SRS_UAMQP_MESSAGING_31_117: [Get application message properties associated with the IOTHUB_MESSAGE_HANDLE to encode, returning the properties and their encoded length.]
-static int create_application_properties_to_encode(IOTHUB_MESSAGE_HANDLE messageHandle, AMQP_VALUE *application_properties, size_t *application_properties_length)
+static int create_application_properties_to_encode(MESSAGE_HANDLE message_batch_container, IOTHUB_MESSAGE_HANDLE messageHandle, AMQP_VALUE *application_properties, size_t *application_properties_length)
{
MAP_HANDLE properties_map;
const char* const* property_keys;
@@ -249,52 +331,56 @@
}
else
{
- result = RESULT_OK;
+ bool override_for_fault_injection = false;
+ result = override_fault_injection_properties_if_needed(message_batch_container, property_keys, property_values, property_count, &override_for_fault_injection);
- for (i = 0; i < property_count; i++)
+ if (override_for_fault_injection == false)
{
- AMQP_VALUE map_property_key;
- AMQP_VALUE map_property_value;
-
- if ((map_property_key = amqpvalue_create_string(property_keys[i])) == NULL)
+ for (i = 0; i < property_count; i++)
{
- LogError("Failed amqpvalue_create_string for key");
- result = __FAILURE__;
- break;
- }
+ AMQP_VALUE map_property_key;
+ AMQP_VALUE map_property_value;
+
+ if ((map_property_key = amqpvalue_create_string(property_keys[i])) == NULL)
+ {
+ LogError("Failed amqpvalue_create_string for key");
+ result = __FAILURE__;
+ break;
+ }
- if ((map_property_value = amqpvalue_create_string(property_values[i])) == NULL)
- {
- LogError("Failed amqpvalue_create_string for value");
+ if ((map_property_value = amqpvalue_create_string(property_values[i])) == NULL)
+ {
+ LogError("Failed amqpvalue_create_string for value");
+ amqpvalue_destroy(map_property_key);
+ result = __FAILURE__;
+ break;
+ }
+
+ if (amqpvalue_set_map_value(uamqp_properties_map, map_property_key, map_property_value) != 0)
+ {
+ LogError("Failed amqpvalue_set_map_value");
+ amqpvalue_destroy(map_property_key);
+ amqpvalue_destroy(map_property_value);
+ result = __FAILURE__;
+ break;
+ }
+
amqpvalue_destroy(map_property_key);
- result = __FAILURE__;
- break;
+ amqpvalue_destroy(map_property_value);
}
- if (amqpvalue_set_map_value(uamqp_properties_map, map_property_key, map_property_value) != 0)
+ if (RESULT_OK == result)
{
- LogError("Failed amqpvalue_set_map_value");
- amqpvalue_destroy(map_property_key);
- amqpvalue_destroy(map_property_value);
- result = __FAILURE__;
- break;
- }
-
- amqpvalue_destroy(map_property_key);
- amqpvalue_destroy(map_property_value);
- }
-
- if (RESULT_OK == result)
- {
- if ((*application_properties = amqpvalue_create_application_properties(uamqp_properties_map)) == NULL)
- {
- LogError("Failed amqpvalue_create_application_properties");
- result = __FAILURE__;
- }
- else if (amqpvalue_get_encoded_size(*application_properties, application_properties_length) != 0)
- {
- LogError("Failed amqpvalue_get_encoded_size");
- result = __FAILURE__;
+ if ((*application_properties = amqpvalue_create_application_properties(uamqp_properties_map)) == NULL)
+ {
+ LogError("Failed amqpvalue_create_application_properties");
+ result = __FAILURE__;
+ }
+ else if (amqpvalue_get_encoded_size(*application_properties, application_properties_length) != 0)
+ {
+ LogError("Failed amqpvalue_get_encoded_size");
+ result = __FAILURE__;
+ }
}
}
}
@@ -483,7 +569,7 @@
// Codes_SRS_UAMQP_MESSAGING_31_120: [Create a blob that contains AMQP encoding of IOTHUB_MESSAGE_HANDLE.]
// Codes_SRS_UAMQP_MESSAGING_31_121: [Any errors during `message_create_uamqp_encoding_from_iothub_message` stop processing on this message.]
-int message_create_uamqp_encoding_from_iothub_message(IOTHUB_MESSAGE_HANDLE message_handle, BINARY_DATA* body_binary_data)
+int message_create_uamqp_encoding_from_iothub_message(MESSAGE_HANDLE message_batch_container, IOTHUB_MESSAGE_HANDLE message_handle, BINARY_DATA* body_binary_data)
{
int result;
@@ -504,7 +590,7 @@
LogError("create_message_properties_to_encode() failed");
result = __FAILURE__;
}
- else if (create_application_properties_to_encode(message_handle, &application_properties, &application_properties_length) != RESULT_OK)
+ else if (create_application_properties_to_encode(message_batch_container, message_handle, &application_properties, &application_properties_length) != RESULT_OK)
{
LogError("create_application_properties_to_encode() failed");
result = __FAILURE__;
--- a/uamqp_messaging.h Fri Nov 17 13:56:41 2017 -0800 +++ b/uamqp_messaging.h Fri Dec 15 14:08:19 2017 -0800 @@ -14,7 +14,7 @@ #endif MOCKABLE_FUNCTION(, int, message_create_IoTHubMessage_from_uamqp_message, MESSAGE_HANDLE, uamqp_message, IOTHUB_MESSAGE_HANDLE*, iothubclient_message); - MOCKABLE_FUNCTION(, int, message_create_uamqp_encoding_from_iothub_message, IOTHUB_MESSAGE_HANDLE, message_handle, BINARY_DATA*, body_binary_data); + MOCKABLE_FUNCTION(, int, message_create_uamqp_encoding_from_iothub_message, MESSAGE_HANDLE, message_batch_container, IOTHUB_MESSAGE_HANDLE, message_handle, BINARY_DATA*, body_binary_data); #ifdef __cplusplus }
