Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
46:c688c75b63b9
Parent:
45:c09fac270e60
Child:
47:8a238e75a0f7
--- a/iothubtransport_amqp_common.c	Fri Nov 17 13:56:41 2017 -0800
+++ b/iothubtransport_amqp_common.c	Fri Dec 15 14:08:19 2017 -0800
@@ -58,14 +58,27 @@
     AMQP_TRANSPORT_AUTHENTICATION_MODE_X509
 } AMQP_TRANSPORT_AUTHENTICATION_MODE;
 
+/*
+Definition of transport states:
 
-#define AMQP_TRANSPORT_STATE_STRINGS              \
-    AMQP_TRANSPORT_STATE_NOT_CONNECTED,           \
-    AMQP_TRANSPORT_STATE_CONNECTING,              \
-    AMQP_TRANSPORT_STATE_CONNECTED,               \
-    AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED,   \
-    AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION,  \
-    AMQP_TRANSPORT_STATE_RECONNECTING,            \
+AMQP_TRANSPORT_STATE_NOT_CONNECTED:                    Initial state when the transport is created. 
+AMQP_TRANSPORT_STATE_CONNECTING:                       First connection ever.
+AMQP_TRANSPORT_STATE_CONNECTED:                        Transition from AMQP_TRANSPORT_STATE_CONNECTING or AMQP_TRANSPORT_STATE_RECONNECTING. 
+AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED:            When a failure occurred and the transport identifies a reconnection is needed.
+AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION:           Transition from AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED after all prep is done (transient instances are destroyed, devices are stopped).
+AMQP_TRANSPORT_STATE_RECONNECTING:                     Transition from AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION.
+AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES:    State reached if the maximum number/length of reconnections has been reached.
+AMQP_TRANSPORT_STATE_BEING_DESTROYED:                  State set if IoTHubTransport_AMQP_Common_Destroy function is invoked.
+*/
+
+#define AMQP_TRANSPORT_STATE_STRINGS                    \
+    AMQP_TRANSPORT_STATE_NOT_CONNECTED,                 \
+    AMQP_TRANSPORT_STATE_CONNECTING,                    \
+    AMQP_TRANSPORT_STATE_CONNECTED,                     \
+    AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED,         \
+    AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION,        \
+    AMQP_TRANSPORT_STATE_RECONNECTING,                  \
+    AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES, \
     AMQP_TRANSPORT_STATE_BEING_DESTROYED
 
 DEFINE_LOCAL_ENUM(AMQP_TRANSPORT_STATE, AMQP_TRANSPORT_STATE_STRINGS);
@@ -227,6 +240,17 @@
     return result;
 }
 
+static void raise_connection_status_callback_retry_expired(const void* item, const void* action_context, bool* continue_processing)
+{
+    (void)action_context;
+
+    AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)item;
+
+    IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED);
+
+    *continue_processing = true;
+}
+
 // @brief
 //     Saves the new state, if it is different than the previous one.
 static void on_device_state_changed_callback(void* context, DEVICE_STATE previous_state, DEVICE_STATE new_state)
@@ -251,7 +275,17 @@
         // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_121: [If `new_state` is DEVICE_STATE_STOPPED, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_OK]
         else if (new_state == DEVICE_STATE_STOPPED)
         {
-            IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK);
+            if (registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_CONNECTED ||
+                registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_BEING_DESTROYED)
+            {
+                IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK);
+            }
+            else if (registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED ||
+                registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION ||
+                registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTING)
+            {
+                IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_NETWORK);
+            }
         }
         // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_122: [If `new_state` is DEVICE_STATE_ERROR_AUTH, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL]
         else if (new_state == DEVICE_STATE_ERROR_AUTH)
@@ -1493,11 +1527,15 @@
         AMQP_TRANSPORT_INSTANCE* transport_instance = (AMQP_TRANSPORT_INSTANCE*)handle;
         LIST_ITEM_HANDLE list_item;
 
+        if (transport_instance->state == AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES)
+        {
+            // Nothing to be done.
+        }
         // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_017: [If `instance->state` is `RECONNECTION_REQUIRED`, IoTHubTransport_AMQP_Common_DoWork shall attempt to trigger the connection-retry logic and return]
-        if (transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED)
+        else if (transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED)
         {
             RETRY_ACTION retry_action;
-        
+
             // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_126: [The connection retry shall be attempted only if retry_control_should_retry() returns RETRY_ACTION_NOW, or if it fails]
             if (retry_control_should_retry(transport_instance->connection_retry_control, &retry_action) != RESULT_OK)
             {
@@ -1509,58 +1547,67 @@
             {
                 prepare_for_connection_retry(transport_instance);
             }
-        }
-        // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_018: [If there are no devices registered on the transport, IoTHubTransport_AMQP_Common_DoWork shall skip do_work for devices]
-        else if ((list_item = singlylinkedlist_get_head_item(transport_instance->registered_devices)) != NULL)
-        {
-            // We need to check if there are devices, otherwise the amqp_connection won't be able to be created since
-            // there is not a preferred authentication mode set yet on the transport.
+            else if (retry_action == RETRY_ACTION_STOP_RETRYING)
+            {
+                update_state(transport_instance, AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES);
 
-            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_019: [If `instance->amqp_connection` is NULL, it shall be established]
-            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_12_003: [AMQP connection will be configured using the `c2d_keep_alive_freq_secs` value from SetOption ]
-            if (transport_instance->amqp_connection == NULL && establish_amqp_connection(transport_instance) != RESULT_OK)
-            {
-                LogError("AMQP transport failed to establish connection with service.");
-
-                update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
+                (void)singlylinkedlist_foreach(transport_instance->registered_devices, raise_connection_status_callback_retry_expired, NULL);
             }
-            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_020: [If the amqp_connection is OPENED, the transport shall iterate through each registered device and perform a device-specific do_work on each]
-            else if (transport_instance->amqp_connection_state == AMQP_CONNECTION_STATE_OPENED)
+        }
+        else
+        {
+            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_018: [If there are no devices registered on the transport, IoTHubTransport_AMQP_Common_DoWork shall skip do_work for devices]
+            if ((list_item = singlylinkedlist_get_head_item(transport_instance->registered_devices)) != NULL)
             {
-                while (list_item != NULL)
-                {
-                    AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device;
+                // We need to check if there are devices, otherwise the amqp_connection won't be able to be created since
+                // there is not a preferred authentication mode set yet on the transport.
 
-                    if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL)
-                    {
-                        LogError("Transport had an unexpected failure during DoWork (failed to fetch a registered_devices list item value)");
-                    }
-                    else if (registered_device->number_of_send_event_complete_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+                // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_019: [If `instance->amqp_connection` is NULL, it shall be established]
+                // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_12_003: [AMQP connection will be configured using the `c2d_keep_alive_freq_secs` value from SetOption ]
+                if (transport_instance->amqp_connection == NULL && establish_amqp_connection(transport_instance) != RESULT_OK)
+                {
+                    LogError("AMQP transport failed to establish connection with service.");
+
+                    update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
+                }
+                // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_020: [If the amqp_connection is OPENED, the transport shall iterate through each registered device and perform a device-specific do_work on each]
+                else if (transport_instance->amqp_connection_state == AMQP_CONNECTION_STATE_OPENED)
+                {
+                    while (list_item != NULL)
                     {
-                        LogError("Device '%s' reported a critical failure (events completed sending with failures); connection retry will be triggered.", STRING_c_str(registered_device->device_id));
+                        AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device;
 
-                        update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
-                    }
-                    else if (IoTHubTransport_AMQP_Common_Device_DoWork(registered_device) != RESULT_OK)
-                    {
-                        // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_021: [If DoWork fails for the registered device for more than MAX_NUMBER_OF_DEVICE_FAILURES, connection retry shall be triggered]
-                        if (registered_device->number_of_previous_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+                        if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL)
                         {
-                            LogError("Device '%s' reported a critical failure; connection retry will be triggered.", STRING_c_str(registered_device->device_id));
+                            LogError("Transport had an unexpected failure during DoWork (failed to fetch a registered_devices list item value)");
+                        }
+                        else if (registered_device->number_of_send_event_complete_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+                        {
+                            LogError("Device '%s' reported a critical failure (events completed sending with failures); connection retry will be triggered.", STRING_c_str(registered_device->device_id));
 
                             update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
                         }
-                    }
+                        else if (IoTHubTransport_AMQP_Common_Device_DoWork(registered_device) != RESULT_OK)
+                        {
+                            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_021: [If DoWork fails for the registered device for more than MAX_NUMBER_OF_DEVICE_FAILURES, connection retry shall be triggered]
+                            if (registered_device->number_of_previous_failures >= MAX_NUMBER_OF_DEVICE_FAILURES)
+                            {
+                                LogError("Device '%s' reported a critical failure; connection retry will be triggered.", STRING_c_str(registered_device->device_id));
 
-                    list_item = singlylinkedlist_get_next_item(list_item);
+                                update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED);
+                            }
+                        }
+
+                        list_item = singlylinkedlist_get_next_item(list_item);
+                    }
                 }
             }
-        }
 
-        // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_022: [If `instance->amqp_connection` is not NULL, amqp_connection_do_work shall be invoked]
-        if (transport_instance->amqp_connection != NULL)
-        {
-            amqp_connection_do_work(transport_instance->amqp_connection);
+            // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_022: [If `instance->amqp_connection` is not NULL, amqp_connection_do_work shall be invoked]
+            if (transport_instance->amqp_connection != NULL)
+            {
+                amqp_connection_do_work(transport_instance->amqp_connection);
+            }
         }
     }
 }
@@ -2297,6 +2344,11 @@
 
             LogInfo("Retry policy set (%d, timeout = %d)", retryPolicy, retryTimeoutLimitInSeconds);
 
+            if (transport_instance->state == AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES)
+            {
+                transport_instance->state = AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED;
+            }
+
             // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_128: [If no errors occur, `IoTHubTransport_AMQP_Common_SetRetryPolicy` shall return zero.]
             result = RESULT_OK;
         }