Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
21:32a1746384ba
Parent:
20:8dec76e7ba34
Child:
22:8b70cf813f25
--- a/iothubtransportamqp.c	Fri Jul 29 15:52:42 2016 -0700
+++ b/iothubtransportamqp.c	Fri Aug 12 10:03:28 2016 -0700
@@ -29,6 +29,7 @@
 #include "azure_uamqp_c/saslclientio.h"
 
 #include "iothub_client_ll.h"
+#include "iothub_client_options.h"
 #include "iothub_client_private.h"
 #include "iothubtransportamqp.h"
 #include "iothub_client_version.h"
@@ -263,8 +264,90 @@
     }
 }
 
+/*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_193: [**IoTHubTransportAMQP_DoWork shall set the AMQP the message-id and correlation-id if found to the UAMQP message before passing down**]***/
+static int addPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message)
+{
+    int result = RESULT_OK;
+    const char* messageId;
+    const char* correlationId;
+    PROPERTIES_HANDLE uamqp_message_properties;
+    int api_call_result;
 
-static int addPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message)
+    /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_194: [**Uamqp message properties shall be retrieved using message_get_properties to update message-id/Correlation-Id **]***/
+    if ((api_call_result = message_get_properties(uamqp_message, &uamqp_message_properties)) != 0)
+    {
+        LogError("Failed to get properties map from uAMQP message (error code %d).", api_call_result);
+        result = __LINE__;
+    }
+    /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_195: [**If UAMQP message properties were not present then new properties shall be created using properties_create()**]***/
+    else if (uamqp_message_properties == NULL &&
+        (uamqp_message_properties = properties_create()) == NULL)
+    {
+        LogError("Failed to create properties map for uAMQP message (error code %d).", api_call_result);
+        result = __LINE__;
+    }
+    else
+    {
+        /***Codes_SRS_IOTHUBTRANSPORTAMQP_25_200: [**As message - id is optional field, if it is not set by the client, processing shall ignore and continue normally**] * */
+        /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_196: [**Message-id from the IotHub Client shall be read using IoTHubMessage_GetMessageId()**]***/
+        if ((messageId = IoTHubMessage_GetMessageId(iothub_message_handle)) != NULL)
+        {
+            AMQP_VALUE uamqp_message_id;
+            /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_197: [**Uamqp message id shall be created using amqpvalue_create_string()**]***/
+            if ((uamqp_message_id = amqpvalue_create_string(messageId)) == NULL)
+            {
+                LogError("Failed to create an AMQP_VALUE for the messageId property value.");
+                result = __LINE__;
+            }
+            else
+            {
+                /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_198: [**Message id would be set to Uamqp using properties_set_message_id()**]***/
+                if ((api_call_result = properties_set_message_id(uamqp_message_properties, uamqp_message_id)) != 0)
+                {
+                    LogInfo("Failed to set value of uAMQP message 'message-id' property (%d).", api_call_result);
+                    result = __LINE__;
+                }
+                /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_199: [**Uamqp value used for message id shall be destroyed using amqpvalue_destroy() upon completion of its use**]***/
+                amqpvalue_destroy(uamqp_message_id);
+            }
+        }
+        /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_205: [**As Correlation-id is optional field, if it is not set by the client, processing shall ignore and continue normally**]***/
+        /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_201: [**Correlation-id from the IotHub Client shall be read using IoTHubMessage_GetCorrelationId()**]***/
+        if ((correlationId = IoTHubMessage_GetCorrelationId(iothub_message_handle)) != NULL)
+        {
+            AMQP_VALUE uamqp_correlation_id;
+            /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_202: [**Uamqp value for Correlation id shall be created using amqpvalue_create_string()**]***/
+            if ((uamqp_correlation_id = amqpvalue_create_string(correlationId)) == NULL)
+            {
+                LogError("Failed to create an AMQP_VALUE for the messageId property value.");
+                result = __LINE__;
+            }
+            else
+            {
+                /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_203: [**Correlation id would be set to Uamqp using properties_set_correlation_id()**]***/
+                if ((api_call_result = properties_set_correlation_id(uamqp_message_properties, uamqp_correlation_id)) != 0)
+                {
+                    LogInfo("Failed to set value of uAMQP message 'message-id' property (%d).", api_call_result);
+                    result = __LINE__;
+                }
+                /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_204: [**Uamqp value used for Correlation id shall be destroyed using amqpvalue_destroy() upon completion of its use**]***/
+                amqpvalue_destroy(uamqp_correlation_id);
+            }
+        }
+        /*Codes_**SRS_IOTHUBTRANSPORTAMQP_25_206: [**Modified Uamqp properties shall be set using message_set_properties()**]***/
+        if ((api_call_result = message_set_properties(uamqp_message, uamqp_message_properties)) != 0)
+        {
+            LogError("Failed to set properties map on uAMQP message (error code %d).", api_call_result);
+            result = __LINE__;
+        }
+    }
+
+    properties_destroy(uamqp_message_properties);
+
+    return result;
+}
+
+static int addApplicationPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message)
 {
     int result;
     MAP_HANDLE properties_map;
@@ -857,7 +940,7 @@
                             transport_state->connection_establish_time = getSecondsSinceEpoch();
                             transport_state->cbs.cbs_state = CBS_STATE_IDLE;
                             connection_set_trace(transport_state->connection, transport_state->is_trace_on);
-                            (void)xio_setoption(transport_state->cbs.sasl_io, "logtrace", &transport_state->is_trace_on);
+                            (void)xio_setoption(transport_state->cbs.sasl_io, OPTION_LOG_TRACE, &transport_state->is_trace_on);
                             result = RESULT_OK;
                         }
                     }
@@ -899,7 +982,7 @@
 
                         transport_state->connection_establish_time = getSecondsSinceEpoch();
                         connection_set_trace(transport_state->connection, transport_state->is_trace_on);
-                        (void)xio_setoption(transport_state->tls_io, "logtrace", &transport_state->is_trace_on);
+                        (void)xio_setoption(transport_state->tls_io, OPTION_LOG_TRACE, &transport_state->is_trace_on);
                         result = RESULT_OK;
                     }
                 }
@@ -1085,21 +1168,21 @@
 
 void on_event_sender_state_changed(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state)
 {
-	if (context != NULL)
-	{
-		AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
+    if (context != NULL)
+    {
+        AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
 
-		if (transport_state->is_trace_on)
-		{
-			LogInfo("Event sender state changed [%d->%d]", previous_state, new_state);
-		}
+        if (transport_state->is_trace_on)
+        {
+            LogInfo("Event sender state changed [%d->%d]", previous_state, new_state);
+        }
 
-		// Codes_SRS_IOTHUBTRANSPORTAMQP_09_192: [If a message sender instance changes its state to MESSAGE_SENDER_STATE_ERROR (first transition only) the connection retry logic shall be triggered]
-		if (new_state != previous_state && new_state == MESSAGE_SENDER_STATE_ERROR)
-		{
-			transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR;
-		}
-	}
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_192: [If a message sender instance changes its state to MESSAGE_SENDER_STATE_ERROR (first transition only) the connection retry logic shall be triggered]
+        if (new_state != previous_state && new_state == MESSAGE_SENDER_STATE_ERROR)
+        {
+            transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR;
+        }
+    }
 }
 
 static int createEventSender(AMQP_TRANSPORT_INSTANCE* transport_state)
@@ -1136,7 +1219,7 @@
             attachDeviceClientTypeToLink(transport_state->sender_link);
 
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_070: [IoTHubTransportAMQP_DoWork shall create the AMQP message sender using messagesender_create() AMQP API] 
-			// Codes_SRS_IOTHUBTRANSPORTAMQP_09_191: [IoTHubTransportAMQP_DoWork shall create each AMQP message sender tracking its state changes with a callback function]
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_191: [IoTHubTransportAMQP_DoWork shall create each AMQP message sender tracking its state changes with a callback function]
             if ((transport_state->message_sender = messagesender_create(transport_state->sender_link, on_event_sender_state_changed, (void*)transport_state)) == NULL)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_071: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message sender instance fails to be created, flagging the connection to be re-established] 
@@ -1193,21 +1276,21 @@
 
 void on_message_receiver_state_changed(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state)
 {
-	if (context != NULL)
-	{
-		AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
+    if (context != NULL)
+    {
+        AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
 
-		if (transport_state->is_trace_on)
-		{
-			LogInfo("Message receiver state changed [%d->%d]", previous_state, new_state);
-		}
+        if (transport_state->is_trace_on)
+        {
+            LogInfo("Message receiver state changed [%d->%d]", previous_state, new_state);
+        }
 
-		// Codes_SRS_IOTHUBTRANSPORTAMQP_09_190: [If a message_receiver instance changes its state to MESSAGE_RECEIVER_STATE_ERROR (first transition only) the connection retry logic shall be triggered]
-		if (new_state != previous_state && new_state == MESSAGE_RECEIVER_STATE_ERROR)
-		{
-			transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR;
-		}
-	}
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_190: [If a message_receiver instance changes its state to MESSAGE_RECEIVER_STATE_ERROR (first transition only) the connection retry logic shall be triggered]
+        if (new_state != previous_state && new_state == MESSAGE_RECEIVER_STATE_ERROR)
+        {
+            transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR;
+        }
+    }
 }
 
 static int createMessageReceiver(AMQP_TRANSPORT_INSTANCE* transport_state, IOTHUB_CLIENT_LL_HANDLE iothub_client_handle)
@@ -1250,7 +1333,7 @@
             attachDeviceClientTypeToLink(transport_state->receiver_link);
 
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_077: [IoTHubTransportAMQP_DoWork shall create the AMQP message receiver using messagereceiver_create() AMQP API] 
-			// Codes_SRS_IOTHUBTRANSPORTAMQP_09_189: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver tracking its state changes with a callback function]
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_189: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver tracking its state changes with a callback function]
             if ((transport_state->message_receiver = messagereceiver_create(transport_state->receiver_link, on_message_receiver_state_changed, (void*)transport_state)) == NULL)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_078: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be created, flagging the connection to be re-established] 
@@ -1349,6 +1432,11 @@
                     /* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
                     is_message_error = true;
                 }
+				else if (addApplicationPropertiesTouAMQPMessage(message->messageHandle, amqp_message) != 0)
+				{
+					/* Codes_SRS_IOTHUBTRANSPORTAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+					is_message_error = true;
+				}
                 else
                 {
                     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] 
@@ -1959,24 +2047,24 @@
         AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle;
 
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] 
-        if (strcmp("sas_token_lifetime", option) == 0)
+        if (strcmp(OPTION_SAS_TOKEN_LIFETIME, option) == 0)
         {
             transport_state->cbs.sas_token_lifetime = *((size_t*)value);
             result = IOTHUB_CLIENT_OK;
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK] 
-        else if (strcmp("sas_token_refresh_time", option) == 0)
+        else if (strcmp(OPTION_SAS_TOKEN_REFRESH_TIME, option) == 0)
         {
             transport_state->cbs.sas_token_refresh_time = *((size_t*)value);
             result = IOTHUB_CLIENT_OK;
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK] 
-        else if (strcmp("cbs_request_timeout", option) == 0)
+        else if (strcmp(OPTION_CBS_REQUEST_TIMEOUT, option) == 0)
         {
             transport_state->cbs.cbs_request_timeout = *((size_t*)value);
             result = IOTHUB_CLIENT_OK;
         }
-        else if (strcmp("logtrace", option) == 0)
+        else if (strcmp(OPTION_LOG_TRACE, option) == 0)
         {
             transport_state->is_trace_on = *((bool*)value);
             if (transport_state->connection != NULL)
@@ -1986,13 +2074,13 @@
             result = IOTHUB_CLIENT_OK;
         }
         /*Codes_SRS_IOTHUBTRANSPORTAMQP_02_007: [ If optionName is x509certificate and the authentication method is not x509 then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
-        else if ((strcmp("x509certificate", option) == 0) && (transport_state->credential.credentialType != X509))
+        else if ((strcmp(OPTION_X509_CERT, option) == 0) && (transport_state->credential.credentialType != X509))
         {
             LogError("x509certificate specified, but authentication method is not x509");
             result = IOTHUB_CLIENT_INVALID_ARG;
         }
         /*Codes_SRS_IOTHUBTRANSPORTAMQP_02_008: [ If optionName is x509privatekey and the authentication method is not x509 then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
-        else if ((strcmp("x509privatekey", option) == 0) && (transport_state->credential.credentialType != X509))
+        else if ((strcmp(OPTION_X509_PRIVATE_KEY, option) == 0) && (transport_state->credential.credentialType != X509))
         {
             LogError("x509privatekey specified, but authentication method is not x509");
             result = IOTHUB_CLIENT_INVALID_ARG;