Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransportamqp.c
- Revision:
- 23:2c6779675a01
- Parent:
- 22:8b70cf813f25
- Child:
- 24:834cf977abcf
diff -r 8b70cf813f25 -r 2c6779675a01 iothubtransportamqp.c --- a/iothubtransportamqp.c Fri Aug 26 12:58:26 2016 -0700 +++ b/iothubtransportamqp.c Fri Sep 09 13:37:13 2016 -0700 @@ -28,6 +28,7 @@ #include "azure_uamqp_c/sasl_mssbcbs.h" #include "azure_uamqp_c/saslclientio.h" +#include "uamqp_messaging.h" #include "iothub_client_ll.h" #include "iothub_client_options.h" #include "iothub_client_private.h" @@ -281,365 +282,6 @@ } } -/*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_193: [**IoTHubTransportAMQP_DoWork shall set the AMQP the message-id and correlation-id if found to the UAMQP message before passing down**]***/ -static int addPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message) -{ - int result = RESULT_OK; - const char* messageId; - const char* correlationId; - PROPERTIES_HANDLE uamqp_message_properties; - int api_call_result; - - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_194: [**Uamqp message properties shall be retrieved using message_get_properties to update message-id/Correlation-Id **]***/ - if ((api_call_result = message_get_properties(uamqp_message, &uamqp_message_properties)) != 0) - { - LogError("Failed to get properties map from uAMQP message (error code %d).", api_call_result); - result = __LINE__; - } - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_195: [**If UAMQP message properties were not present then new properties shall be created using properties_create()**]***/ - else if (uamqp_message_properties == NULL && - (uamqp_message_properties = properties_create()) == NULL) - { - LogError("Failed to create properties map for uAMQP message (error code %d).", api_call_result); - result = __LINE__; - } - else - { - /***Codes_SRS_IOTHUBTRANSPORTAMQP_25_200: [**As message - id is optional field, if it is not set by the client, processing shall ignore and continue normally**] * */ - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_196: [**Message-id from the IotHub Client shall be read using IoTHubMessage_GetMessageId()**]***/ - if ((messageId = IoTHubMessage_GetMessageId(iothub_message_handle)) != NULL) - { - AMQP_VALUE uamqp_message_id; - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_197: [**Uamqp message id shall be created using amqpvalue_create_string()**]***/ - if ((uamqp_message_id = amqpvalue_create_string(messageId)) == NULL) - { - LogError("Failed to create an AMQP_VALUE for the messageId property value."); - result = __LINE__; - } - else - { - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_198: [**Message id would be set to Uamqp using properties_set_message_id()**]***/ - if ((api_call_result = properties_set_message_id(uamqp_message_properties, uamqp_message_id)) != 0) - { - LogInfo("Failed to set value of uAMQP message 'message-id' property (%d).", api_call_result); - result = __LINE__; - } - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_199: [**Uamqp value used for message id shall be destroyed using amqpvalue_destroy() upon completion of its use**]***/ - amqpvalue_destroy(uamqp_message_id); - } - } - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_205: [**As Correlation-id is optional field, if it is not set by the client, processing shall ignore and continue normally**]***/ - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_201: [**Correlation-id from the IotHub Client shall be read using IoTHubMessage_GetCorrelationId()**]***/ - if ((correlationId = IoTHubMessage_GetCorrelationId(iothub_message_handle)) != NULL) - { - AMQP_VALUE uamqp_correlation_id; - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_202: [**Uamqp value for Correlation id shall be created using amqpvalue_create_string()**]***/ - if ((uamqp_correlation_id = amqpvalue_create_string(correlationId)) == NULL) - { - LogError("Failed to create an AMQP_VALUE for the messageId property value."); - result = __LINE__; - } - else - { - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_203: [**Correlation id would be set to Uamqp using properties_set_correlation_id()**]***/ - if ((api_call_result = properties_set_correlation_id(uamqp_message_properties, uamqp_correlation_id)) != 0) - { - LogInfo("Failed to set value of uAMQP message 'message-id' property (%d).", api_call_result); - result = __LINE__; - } - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_204: [**Uamqp value used for Correlation id shall be destroyed using amqpvalue_destroy() upon completion of its use**]***/ - amqpvalue_destroy(uamqp_correlation_id); - } - } - /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_206: [**Modified Uamqp properties shall be set using message_set_properties()**]***/ - if ((api_call_result = message_set_properties(uamqp_message, uamqp_message_properties)) != 0) - { - LogError("Failed to set properties map on uAMQP message (error code %d).", api_call_result); - result = __LINE__; - } - } - - properties_destroy(uamqp_message_properties); - - return result; -} - -static int addApplicationPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message) -{ - int result; - MAP_HANDLE properties_map; - const char* const* propertyKeys; - const char* const* propertyValues; - size_t propertyCount; - - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_007: [The IoTHub message properties shall be obtained by calling IoTHubMessage_Properties.] */ - properties_map = IoTHubMessage_Properties(iothub_message_handle); - if (properties_map == NULL) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to get property map from IoTHub message."); - result = __LINE__; - } - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_015: [The actual keys and values, as well as the number of properties shall be obtained by calling Map_GetInternals on the handle obtained from IoTHubMessage_Properties.] */ - else if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to get the internals of the property map."); - result = __LINE__; - } - else - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_016: [If the number of properties is 0, no uAMQP map shall be created and no application properties shall be set on the uAMQP message.] */ - if (propertyCount != 0) - { - size_t i; - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_009: [The uAMQP map shall be created by calling amqpvalue_create_map.] */ - AMQP_VALUE uamqp_map = amqpvalue_create_map(); - if (uamqp_map == NULL) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to create uAMQP map for the properties."); - result = __LINE__; - } - else - { - for (i = 0; i < propertyCount; i++) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_010: [A key uAMQP value shall be created by using amqpvalue_create_string.] */ - AMQP_VALUE map_key_value = amqpvalue_create_string(propertyKeys[i]); - if (map_key_value == NULL) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to create uAMQP property key value."); - break; - } - - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_011: [A value uAMQP value shall be created by using amqpvalue_create_string.] */ - AMQP_VALUE map_value_value = amqpvalue_create_string(propertyValues[i]); - if (map_value_value == NULL) - { - amqpvalue_destroy(map_key_value); - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to create uAMQP property key value."); - break; - } - - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_008: [All properties shall be transferred to a uAMQP map.] */ - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_012: [The key/value pair for the property shall be set into the uAMQP property map by calling amqpvalue_map_set_value.] */ - if (amqpvalue_set_map_value(uamqp_map, map_key_value, map_value_value) != 0) - { - amqpvalue_destroy(map_key_value); - amqpvalue_destroy(map_value_value); - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to create uAMQP property key value."); - break; - } - - amqpvalue_destroy(map_key_value); - amqpvalue_destroy(map_value_value); - } - - if (i < propertyCount) - { - result = __LINE__; - } - else - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_013: [After all properties have been filled in the uAMQP map, the uAMQP properties map shall be set on the uAMQP message by calling message_set_application_properties.] */ - if (message_set_application_properties(uamqp_message, uamqp_map) != 0) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - LogError("Failed to transfer the message properties to the uAMQP message."); - result = __LINE__; - } - else - { - result = 0; - } - } - - amqpvalue_destroy(uamqp_map); - } - } - else - { - result = 0; - } - } - - return result; -} - -static int readPropertiesFromuAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message) -{ - int return_value; - PROPERTIES_HANDLE uamqp_message_properties; - AMQP_VALUE uamqp_message_property; - const char* uamqp_message_property_value; - int api_call_result; - - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_155: [uAMQP message properties shall be retrieved using message_get_properties.] */ - if ((api_call_result = message_get_properties(uamqp_message, &uamqp_message_properties)) != 0) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_156: [If message_get_properties fails, the error shall be notified and 'on_message_received' shall continue.] */ - LogError("Failed to get property properties map from uAMQP message (error code %d).", api_call_result); - return_value = __LINE__; - } - else - { - return_value = 0; // Properties 'message-id' and 'correlation-id' are optional according to the AMQP 1.0 spec. - - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_157: [The message-id property shall be read from the uAMQP message by calling properties_get_message_id.] */ - if ((api_call_result = properties_get_message_id(uamqp_message_properties, &uamqp_message_property)) != 0) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_158: [If properties_get_message_id fails, the error shall be notified and 'on_message_received' shall continue.] */ - LogInfo("Failed to get value of uAMQP message 'message-id' property (%d).", api_call_result); - return_value = __LINE__; - } - else if (amqpvalue_get_type(uamqp_message_property) != AMQP_TYPE_NULL) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_159: [The message-id value shall be retrieved from the AMQP_VALUE as char* by calling amqpvalue_get_string.] */ - if ((api_call_result = amqpvalue_get_string(uamqp_message_property, &uamqp_message_property_value)) != 0) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_160: [If amqpvalue_get_string fails, the error shall be notified and 'on_message_received' shall continue.] */ - LogError("Failed to get value of uAMQP message 'message-id' property (%d).", api_call_result); - return_value = __LINE__; - } - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_161: [The message-id property shall be set on the IOTHUB_MESSAGE_HANDLE by calling IoTHubMessage_SetMessageId, passing the value read from the uAMQP message.] */ - else if (IoTHubMessage_SetMessageId(iothub_message_handle, uamqp_message_property_value) != IOTHUB_MESSAGE_OK) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_162: [If IoTHubMessage_SetMessageId fails, the error shall be notified and 'on_message_received' shall continue.] */ - LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'message-id' property."); - return_value = __LINE__; - } - } - - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_163: [The correlation-id property shall be read from the uAMQP message by calling properties_get_correlation_id.] */ - if ((api_call_result = properties_get_correlation_id(uamqp_message_properties, &uamqp_message_property)) != 0) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_164: [If properties_get_correlation_id fails, the error shall be notified and 'on_message_received' shall continue.] */ - LogError("Failed to get value of uAMQP message 'correlation-id' property (%d).", api_call_result); - return_value = __LINE__; - } - else if (amqpvalue_get_type(uamqp_message_property) != AMQP_TYPE_NULL) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_165: [The correlation-id value shall be retrieved from the AMQP_VALUE as char* by calling amqpvalue_get_string.] */ - if ((api_call_result = amqpvalue_get_string(uamqp_message_property, &uamqp_message_property_value)) != 0) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_166: [If amqpvalue_get_string fails, the error shall be notified and 'on_message_received' shall continue.] */ - LogError("Failed to get value of uAMQP message 'correlation-id' property (%d).", api_call_result); - return_value = __LINE__; - } - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_167: [The correlation-id property shall be set on the IOTHUB_MESSAGE_HANDLE by calling IoTHubMessage_SetCorrelationId, passing the value read from the uAMQP message.] */ - else if (IoTHubMessage_SetCorrelationId(iothub_message_handle, uamqp_message_property_value) != IOTHUB_MESSAGE_OK) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_168: [If IoTHubMessage_SetCorrelationId fails, the error shall be notified and 'on_message_received' shall continue.] */ - LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'correlation-id' property."); - return_value = __LINE__; - } - } - properties_destroy(uamqp_message_properties); - } - - return return_value; -} - -static int readApplicationPropertiesFromuAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message) -{ - int result; - AMQP_VALUE uamqp_app_properties = NULL; - uint32_t property_count; - MAP_HANDLE iothub_message_properties_map; - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_170: [The IOTHUB_MESSAGE_HANDLE properties shall be retrieved using IoTHubMessage_Properties.] - if ((iothub_message_properties_map = IoTHubMessage_Properties(iothub_message_handle)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_186: [If IoTHubMessage_Properties fails, the error shall be notified and 'on_message_received' shall continue.] - LogError("Failed to get property map from IoTHub message."); - result = __LINE__; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_171: [uAMQP message application properties shall be retrieved using message_get_application_properties.] - else if ((result = message_get_application_properties(uamqp_message, &uamqp_app_properties)) != 0) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_172: [If message_get_application_properties fails, the error shall be notified and 'on_message_received' shall continue.] - LogError("Failed reading the incoming uAMQP message properties (return code %d).", result); - result = __LINE__; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_187: [If message_get_application_properties succeeds but returns a NULL application properties map (there are no properties), 'on_message_received' shall continue normally.] - else - { - if (uamqp_app_properties == NULL) - { - result = 0; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_173: [The actual uAMQP message application properties should be extracted from the result of message_get_application_properties using amqpvalue_get_inplace_described_value.] - else if ((uamqp_app_properties = amqpvalue_get_inplace_described_value(uamqp_app_properties)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_174: [If amqpvalue_get_inplace_described_value fails, the error shall be notified and 'on_message_received' shall continue.] - LogError("Failed getting the map of uAMQP message application properties (return code %d).", result); - result = __LINE__; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_175: [The number of items in the uAMQP message application properties shall be obtained using amqpvalue_get_map_pair_count.] - else if ((result = amqpvalue_get_map_pair_count(uamqp_app_properties, &property_count)) != 0) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_176: [If amqpvalue_get_map_pair_count fails, the error shall be notified and 'on_message_received' shall continue.] - LogError("Failed reading the number of values in the uAMQP property map (return code %d).", result); - result = __LINE__; - } - else - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_177: ['on_message_received' shall iterate through each uAMQP application property and add it on IOTHUB_MESSAGE_HANDLE properties.] - uint32_t i; - for (i = 0; i < property_count; i++) - { - AMQP_VALUE map_key_name = NULL; - AMQP_VALUE map_key_value = NULL; - const char *key_name; - const char* key_value; - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_178: [The uAMQP application property name and value shall be obtained using amqpvalue_get_map_key_value_pair.] - if ((result = amqpvalue_get_map_key_value_pair(uamqp_app_properties, i, &map_key_name, &map_key_value)) != 0) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_179: [If amqpvalue_get_map_key_value_pair fails, the error shall be notified and 'on_message_received' shall continue.] - LogError("Failed reading the key/value pair from the uAMQP property map (return code %d).", result); - result = __LINE__; - break; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_180: [The uAMQP application property name shall be extracted as string using amqpvalue_get_string.] - else if ((result = amqpvalue_get_string(map_key_name, &key_name)) != 0) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_181: [If amqpvalue_get_string fails, the error shall be notified and 'on_message_received' shall continue.] - LogError("Failed parsing the uAMQP property name (return code %d).", result); - result = __LINE__; - break; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_182: [The uAMQP application property value shall be extracted as string using amqpvalue_get_string.] - else if ((result = amqpvalue_get_string(map_key_value, &key_value)) != 0) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_183: [If amqpvalue_get_string fails, the error shall be notified and 'on_message_received' shall continue.] - LogError("Failed parsing the uAMQP property value (return code %d).", result); - result = __LINE__; - break; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_184: [The application property name and value shall be added to IOTHUB_MESSAGE_HANDLE properties using Map_AddOrUpdate.] - else if (Map_AddOrUpdate(iothub_message_properties_map, key_name, key_value) != MAP_OK) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_185: [If Map_AddOrUpdate fails, the error shall be notified and 'on_message_received' shall continue.] - LogError("Failed to add/update IoTHub message property map."); - result = __LINE__; - break; - } - amqpvalue_destroy(map_key_name); - amqpvalue_destroy(map_key_value); - - } - } - amqpvalue_destroy(uamqp_app_properties); - } - - return result; -} - static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result) { IOTHUB_MESSAGE_LIST* message = (IOTHUB_MESSAGE_LIST*)context; @@ -698,81 +340,44 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message) { AMQP_VALUE result = NULL; - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback 'on_message_received' shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters] + int api_call_result; IOTHUB_MESSAGE_HANDLE iothub_message = NULL; - MESSAGE_BODY_TYPE body_type; + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_195: [The callback 'on_message_received' shall shall get a IOTHUB_MESSAGE_HANDLE instance out of the uamqp's MESSAGE_HANDLE instance by using IoTHubMessage_CreateFromUamqpMessage()] + if ((api_call_result = IoTHubMessage_CreateFromUamqpMessage(message, &iothub_message)) != RESULT_OK) + { + LogError("Transport failed processing the message received (error = %d).", api_call_result); - if (message_get_body_type(message, &body_type) != 0) - { - LogError("Failed to get the type of the message received by the transport."); - } - else - { - if (body_type == MESSAGE_BODY_TYPE_DATA) - { - BINARY_DATA binary_data; - if (message_get_body_amqp_data(message, 0, &binary_data) != 0) - { - LogError("Failed to get the body of the message received by the transport."); - } - else - { - iothub_message = IoTHubMessage_CreateFromByteArray(binary_data.bytes, binary_data.length); - } - } - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_196: [If IoTHubMessage_CreateFromUamqpMessage fails, the callback 'on_message_received' shall reject the incoming message by calling messaging_delivery_rejected() and return.] + result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed reading AMQP message"); + } + else + { + IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result; - if (iothub_message == NULL) - { - LogError("Transport failed processing the message received."); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback 'on_message_received' shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters] + disposition_result = IoTHubClient_LL_MessageCallback((IOTHUB_CLIENT_LL_HANDLE)context, iothub_message); - result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed reading message body"); - } - else - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_153: [The callback 'on_message_received' shall read the message-id property from the uAMQP message and set it on the IoT Hub Message if the property is defined.] */ - /* Codes_SRS_IOTHUBTRANSPORTAMQP_09_154: [The callback 'on_message_received' shall read the correlation-id property from the uAMQP message and set it on the IoT Hub Message if the property is defined.] */ - if (readPropertiesFromuAMQPMessage(iothub_message, message) != 0) - { - LogError("Transport failed reading properties of the message received."); - } - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_169: [The callback 'on_message_received' shall read the application properties from the uAMQP message and set it on the IoT Hub Message if any are provided.] - if (readApplicationPropertiesFromuAMQPMessage(iothub_message, message) != 0) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_188: [If 'on_message_received' fails reading the application properties from the uAMQP message, it shall NOT call IoTHubClient_LL_MessageCallback and shall reject the message.] - LogError("Transport failed reading application properties of the message received."); - - result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed reading application properties"); - } - else - { - IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result; + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_197: [The callback 'on_message_received' shall destroy the IOTHUB_MESSAGE_HANDLE instance after invoking IoTHubClient_LL_MessageCallback().] + IoTHubMessage_Destroy(iothub_message); - disposition_result = IoTHubClient_LL_MessageCallback((IOTHUB_CLIENT_LL_HANDLE)context, iothub_message); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback 'on_message_received' shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED] + if (disposition_result == IOTHUBMESSAGE_ACCEPTED) + { + result = messaging_delivery_accepted(); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback 'on_message_received' shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED] + else if (disposition_result == IOTHUBMESSAGE_ABANDONED) + { + result = messaging_delivery_released(); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback 'on_message_received' shall return the result of messaging_delivery_rejected("Rejected by application", "Rejected by application") if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED] + else if (disposition_result == IOTHUBMESSAGE_REJECTED) + { + result = messaging_delivery_rejected("Rejected by application", "Rejected by application"); + } + } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback 'on_message_received' shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED] - if (disposition_result == IOTHUBMESSAGE_ACCEPTED) - { - result = messaging_delivery_accepted(); - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback 'on_message_received' shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED] - else if (disposition_result == IOTHUBMESSAGE_ABANDONED) - { - result = messaging_delivery_released(); - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback 'on_message_received' shall return the result of messaging_delivery_rejected("Rejected by application", "Rejected by application") if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED] - else if (disposition_result == IOTHUBMESSAGE_REJECTED) - { - result = messaging_delivery_rejected("Rejected by application", "Rejected by application"); - } - } - - IoTHubMessage_Destroy(iothub_message); - } - - return result; + return result; } static XIO_HANDLE getTLSIOTransport(const char* fqdn, int port) @@ -1424,85 +1029,31 @@ { result = RESULT_FAILURE; - IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message->messageHandle); - const unsigned char* messageContent = NULL; - size_t messageContentSize = 0; MESSAGE_HANDLE amqp_message = NULL; bool is_message_error = false; // Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an "in-progress" list right before processing them for sending] trackEventInProgress(message, transport_state); - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_087: [If the event contains a message of type IOTHUBMESSAGE_BYTEARRAY, IoTHubTransportAMQP_DoWork shall obtain its char* representation and size using IoTHubMessage_GetByteArray()] - if (contentType == IOTHUBMESSAGE_BYTEARRAY && - IoTHubMessage_GetByteArray(message->messageHandle, &messageContent, &messageContentSize) != IOTHUB_MESSAGE_OK) - { - LogError("Failed getting the BYTE array representation of the event content to be sent."); - is_message_error = true; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_089: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain its char* representation using IoTHubMessage_GetString()] - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_090: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain the size of its char* representation using strlen()] - else if (contentType == IOTHUBMESSAGE_STRING && - ((messageContent = (const unsigned char*)IoTHubMessage_GetString(message->messageHandle)) == NULL)) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_193: [IoTHubTransportAMQP_DoWork shall get a MESSAGE_HANDLE instance out of the event's IOTHUB_MESSAGE_HANDLE instance by using message_create_from_iothub_message().] + if ((result = message_create_from_iothub_message(message->messageHandle, &amqp_message)) != RESULT_OK) + { + LogError("Failed creating AMQP message (error=%d).", result); + result = __LINE__; + is_message_error = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the MESSAGE_HANDLE intance to uAMQP for sending (along with on_message_send_complete callback) using messagesender_send()] + else if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, message) != RESULT_OK) { - LogError("Failed getting the STRING representation of the event content to be sent."); - is_message_error = true; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_092: [If the event contains a message of type IOTHUBMESSAGE_UNKNOWN, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] - else if (contentType == IOTHUBMESSAGE_UNKNOWN) - { - LogError("Cannot send events with content type IOTHUBMESSAGE_UNKNOWN."); - is_message_error = true; - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_093: [IoTHubTransportAMQP_DoWork shall create an amqp message using message_create() uAMQP API] - else if ((amqp_message = message_create()) == NULL) - { - LogError("Failed allocating the AMQP message for sending the event."); + LogError("Failed sending the AMQP message."); + result = __LINE__; } else { - BINARY_DATA binary_data; - - if (contentType == IOTHUBMESSAGE_STRING) - { - messageContentSize = strlen((const char*)messageContent); - } - - binary_data.bytes = messageContent; - binary_data.length = messageContentSize; - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() uAMQP API] - if (message_add_body_amqp_data(amqp_message, binary_data) != RESULT_OK) - { - LogError("Failed setting the body of the AMQP message."); - } - else - { - if (addPropertiesTouAMQPMessage(message->messageHandle, amqp_message) != 0) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - is_message_error = true; - } - else if (addApplicationPropertiesTouAMQPMessage(message->messageHandle, amqp_message) != 0) - { - /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */ - is_message_error = true; - } - else - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] - if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, message) != RESULT_OK) - { - LogError("Failed sending the AMQP message."); - } - else - { - result = RESULT_OK; - } - } - } + result = RESULT_OK; } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_194: [IoTHubTransportAMQP_DoWork shall destroy the MESSAGE_HANDLE instance after messagesender_send() is invoked.] if (amqp_message != NULL) { // It can be destroyed because AMQP keeps a clone of the message. @@ -1519,9 +1070,9 @@ } else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_111: [If message_create() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_111: [If message_create_from_iothub_message() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSend list and return] // Codes_SRS_IOTHUBTRANSPORTAMQP_09_112: [If message_add_body_amqp_data() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_113: [If messagesender_send() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_113: [If messagesender_send() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSend list and return] rollEventBackToWaitList(message, transport_state); break; }