Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
31:adadaef857c1
Parent:
30:20a85b733111
Child:
32:babfd49032ff
--- a/iothubtransport_amqp_common.c	Fri Mar 10 11:46:55 2017 -0800
+++ b/iothubtransport_amqp_common.c	Fri Mar 24 16:35:00 2017 -0700
@@ -3,6 +3,7 @@
 
 #include <stdlib.h>
 #include <stdint.h>
+#include <stdbool.h>
 #include <time.h>
 #include <limits.h>
 #include "azure_c_shared_utility/optimize_size.h"
@@ -17,6 +18,8 @@
 #include "azure_c_shared_utility/urlencode.h"
 #include "azure_c_shared_utility/tlsio.h"
 #include "azure_c_shared_utility/optionhandler.h"
+#include "azure_c_shared_utility/shared_util_options.h"
+#include "azure_c_shared_utility/macro_utils.h"
 
 #include "azure_uamqp_c/cbs.h"
 #include "azure_uamqp_c/session.h"
@@ -60,11 +63,17 @@
     AMQP_GET_IO_TRANSPORT underlying_io_transport_provider;             // Pointer to the function that creates the TLS I/O (internal use only).
 	AMQP_CONNECTION_HANDLE amqp_connection;                             // Base amqp connection with service.
 	AMQP_CONNECTION_STATE amqp_connection_state;                        // Current state of the amqp_connection.
+	bool is_connection_shutting_down;                                   // Indicates whether the amqp_connection is being closed/destroyed intentionally or not.
 	AMQP_TRANSPORT_AUTHENTICATION_MODE preferred_authentication_mode;   // Used to avoid registered devices using different authentication modes.
 	SINGLYLINKEDLIST_HANDLE registered_devices;                         // List of devices currently registered in this transport.
     bool is_trace_on;                                                   // Turns logging on and off.
     OPTIONHANDLER_HANDLE saved_tls_options;                             // Here are the options from the xio layer if any is saved.
 	bool is_connection_retry_required;                                  // Flag that controls whether the connection should be restablished or not.
+
+    char* http_proxy_hostname;
+    int http_proxy_port;
+    char* http_proxy_username;
+    char* http_proxy_password;
 	
 	size_t option_sas_token_lifetime_secs;                              // Device-specific option.
 	size_t option_sas_token_refresh_time_secs;                          // Device-specific option.
@@ -103,6 +112,27 @@
 
 // ---------- General Helpers ---------- //
 
+static void free_proxy_data(AMQP_TRANSPORT_INSTANCE* amqp_transport_instance)
+{
+    if (amqp_transport_instance->http_proxy_hostname != NULL)
+    {
+        free(amqp_transport_instance->http_proxy_hostname);
+        amqp_transport_instance->http_proxy_hostname = NULL;
+    }
+
+    if (amqp_transport_instance->http_proxy_username != NULL)
+    {
+        free(amqp_transport_instance->http_proxy_username);
+        amqp_transport_instance->http_proxy_username = NULL;
+    }
+
+    if (amqp_transport_instance->http_proxy_password != NULL)
+    {
+        free(amqp_transport_instance->http_proxy_password);
+        amqp_transport_instance->http_proxy_password = NULL;
+    }
+}
+
 // @brief
 //     Evaluates if the ammount of time since start_time is greater or lesser than timeout_in_secs.
 // @param is_timed_out
@@ -200,6 +230,27 @@
 		registered_device->device_state = new_state;
 		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_063: [If `registered_device->time_of_last_state_change` shall be set using get_time()]
 		registered_device->time_of_last_state_change = get_time(NULL);
+
+		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_120: [If `new_state` is DEVICE_STATE_STARTED, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_AUTHENTICATED and IOTHUB_CLIENT_CONNECTION_OK]
+		if (new_state == DEVICE_STATE_STARTED)
+		{
+			IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_AUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK);
+		}
+		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_121: [If `new_state` is DEVICE_STATE_STOPPED, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_OK]
+		else if (new_state == DEVICE_STATE_STOPPED)
+		{
+			IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK);
+		}
+		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_122: [If `new_state` is DEVICE_STATE_ERROR_AUTH, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL]
+		else if (new_state == DEVICE_STATE_ERROR_AUTH)
+		{
+			IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL);
+		}
+		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_123: [If `new_state` is DEVICE_STATE_ERROR_AUTH_TIMEOUT or DEVICE_STATE_ERROR_MSG, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR]
+		else if (new_state == DEVICE_STATE_ERROR_AUTH_TIMEOUT || new_state == DEVICE_STATE_ERROR_MSG)
+		{
+			IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR);
+		}
 	}
 }
 
@@ -366,9 +417,11 @@
 
 static void on_methods_unsubscribed(void* context)
 {
-    /* Codess_SRS_IOTHUBTRANSPORT_AMQP_METHODS_12_001: [ `on_methods_unsubscribed` calls iothubtransportamqp_methods_unsubscribe. ]*/
+    /* Codes_SRS_IOTHUBTRANSPORT_AMQP_METHODS_12_001: [ `on_methods_unsubscribed` calls iothubtransportamqp_methods_unsubscribe. ]*/
     AMQP_TRANSPORT_DEVICE_INSTANCE* device_state = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context;
-    IoTHubTransport_AMQP_Common_Unsubscribe_DeviceMethod(device_state);
+
+	iothubtransportamqp_methods_unsubscribe(device_state->methods_handle);
+	device_state->subscribed_for_methods = false;
 }
 
 static int on_method_request_received(void* context, const char* method_name, const unsigned char* request, size_t request_size, IOTHUBTRANSPORT_AMQP_METHOD_HANDLE method_handle)
@@ -525,8 +578,16 @@
 static int get_new_underlying_io_transport(AMQP_TRANSPORT_INSTANCE* transport_instance, XIO_HANDLE *xio_handle)
 {
 	int result;
+    AMQP_TRANSPORT_PROXY_OPTIONS amqp_transport_proxy_options;
 
-	if ((*xio_handle = transport_instance->underlying_io_transport_provider(STRING_c_str(transport_instance->iothub_host_fqdn))) == NULL)
+    /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_041: [ If the `proxy_data` option has been set, the proxy options shall be filled in the argument `amqp_transport_proxy_options` when calling the function `underlying_io_transport_provider()` to obtain the underlying IO handle. ]*/
+    amqp_transport_proxy_options.host_address = transport_instance->http_proxy_hostname;
+    amqp_transport_proxy_options.port = transport_instance->http_proxy_port;
+    amqp_transport_proxy_options.username = transport_instance->http_proxy_username;
+    amqp_transport_proxy_options.password = transport_instance->http_proxy_password;
+
+    /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_042: [ If no `proxy_data` option has been set, NULL shall be passed as the argument `amqp_transport_proxy_options` when calling the function `underlying_io_transport_provider()`. ]*/
+    if ((*xio_handle = transport_instance->underlying_io_transport_provider(STRING_c_str(transport_instance->iothub_host_fqdn), amqp_transport_proxy_options.host_address == NULL ? NULL : &amqp_transport_proxy_options)) == NULL)
 	{
 		LogError("Failed to obtain a TLS I/O transport layer (underlying_io_transport_provider() failed)");
 		result = __FAILURE__;
@@ -560,7 +621,14 @@
 		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_060: [If `new_state` is AMQP_CONNECTION_STATE_ERROR, the connection shall be flagged as faulty (so the connection retry logic can be triggered)]
 		if (new_state == AMQP_CONNECTION_STATE_ERROR)
 		{
-			LogError("Transport received an ERROR from the amqp_connection (state changed %d->%d); it will be flagged for connection retry.", previous_state, new_state);
+			LogError("Transport received an ERROR from the amqp_connection (state changed %s -> %s); it will be flagged for connection retry.", ENUM_TO_STRING(AMQP_CONNECTION_STATE, previous_state), ENUM_TO_STRING(AMQP_CONNECTION_STATE, new_state));
+
+			transport_instance->is_connection_retry_required = true;
+		}
+		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_115: [If the AMQP connection is closed by the service side, the connection retry logic shall be triggered]
+		else if (new_state == AMQP_CONNECTION_STATE_CLOSED && previous_state == AMQP_CONNECTION_STATE_OPENED && !transport_instance->is_connection_shutting_down)
+		{
+			LogError("amqp_connection was closed unexpectedly; connection retry will be triggered.");
 
 			transport_instance->is_connection_retry_required = true;
 		}
@@ -677,9 +745,11 @@
 	}
 
 	// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_033: [`instance->connection` shall be destroyed using amqp_connection_destroy()]
+	transport_instance->is_connection_shutting_down = true;
 	amqp_connection_destroy(transport_instance->amqp_connection);
 	transport_instance->amqp_connection = NULL;
     transport_instance->amqp_connection_state = AMQP_CONNECTION_STATE_CLOSED;
+	transport_instance->is_connection_shutting_down = false;
 
 	// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_035: [`instance->tls_io` shall be destroyed using xio_destroy()]
 	destroy_underlying_io_transport(transport_instance);
@@ -1113,6 +1183,8 @@
 
 		if (instance->amqp_connection != NULL)
 		{
+			instance->is_connection_shutting_down = true;
+			
 			amqp_connection_destroy(instance->amqp_connection);
 		}
 
@@ -1121,6 +1193,9 @@
 
 		STRING_delete(instance->iothub_host_fqdn);
 
+        /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_001: [ `IoTHubTransport_AMQP_Common_Destroy` shall free the stored proxy options. ]*/
+        free_proxy_data(instance);
+
 		free(instance);
 	}
 }
@@ -1563,7 +1638,79 @@
 				result = IOTHUB_CLIENT_OK;
 			}
 		}
-		else
+        else if (strcmp(OPTION_HTTP_PROXY, option) == 0)
+        {
+            /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_032: [ If `option` is `proxy_data`, `value` shall be used as an `HTTP_PROXY_OPTIONS*`. ]*/
+            HTTP_PROXY_OPTIONS* proxy_options = (HTTP_PROXY_OPTIONS*)value;
+
+            if (transport_instance->tls_io != NULL)
+            {
+                /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_038: [ If the underlying IO has already been created, then `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
+                LogError("Cannot set proxy option once the underlying IO is created");
+                result = IOTHUB_CLIENT_ERROR;
+            }
+            else if (proxy_options->host_address == NULL)
+            {
+                /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_034: [ If `host_address` is NULL, `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/
+                LogError("NULL host_address in proxy options");
+                result = IOTHUB_CLIENT_INVALID_ARG;
+            }
+            /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_037: [ If only one of `username` and `password` is NULL, `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/
+            else if (((proxy_options->username == NULL) || (proxy_options->password == NULL)) &&
+                (proxy_options->username != proxy_options->password))
+            {
+                LogError("Only one of username and password for proxy settings was NULL");
+                result = IOTHUB_CLIENT_INVALID_ARG;
+            }
+            else
+            {
+                char* copied_proxy_hostname = NULL;
+                char* copied_proxy_username = NULL;
+                char* copied_proxy_password = NULL;
+
+                /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_033: [ The fields `host_address`, `port`, `username` and `password` shall be saved for later used (needed when creating the underlying IO to be used by the transport). ]*/
+                transport_instance->http_proxy_port = proxy_options->port;
+                if (mallocAndStrcpy_s(&copied_proxy_hostname, proxy_options->host_address) != 0)
+                {
+                    /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_035: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
+                    LogError("Cannot copy HTTP proxy hostname");
+                    result = IOTHUB_CLIENT_ERROR;
+                }
+                /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_036: [ `username` and `password` shall be allowed to be NULL. ]*/
+                else if ((proxy_options->username != NULL) && (mallocAndStrcpy_s(&copied_proxy_username, proxy_options->username) != 0))
+                {
+                    /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_035: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
+                    free(copied_proxy_hostname);
+                    LogError("Cannot copy HTTP proxy username");
+                    result = IOTHUB_CLIENT_ERROR;
+                }
+                /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_036: [ `username` and `password` shall be allowed to be NULL. ]*/
+                else if ((proxy_options->password != NULL) && (mallocAndStrcpy_s(&copied_proxy_password, proxy_options->password) != 0))
+                {
+                    /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_035: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
+                    if (copied_proxy_username != NULL)
+                    {
+                        free(copied_proxy_username);
+                    }
+                    free(copied_proxy_hostname);
+                    LogError("Cannot copy HTTP proxy password");
+                    result = IOTHUB_CLIENT_ERROR;
+                }
+                else
+                {
+                    /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_040: [ When setting the proxy options succeeds any previously saved proxy options shall be freed. ]*/
+                    free_proxy_data(transport_instance);
+
+                    transport_instance->http_proxy_hostname = copied_proxy_hostname;
+                    transport_instance->http_proxy_username = copied_proxy_username;
+                    transport_instance->http_proxy_password = copied_proxy_password;
+
+                    /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_039: [ If setting the `proxy_data` option succeeds, `IoTHubTransport_AMQP_Common_SetOption` shall return `IOTHUB_CLIENT_OK` ]*/
+                    result = IOTHUB_CLIENT_OK;
+                }
+            }
+        }
+        else
 		{
 			result = IOTHUB_CLIENT_OK;