Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransport_amqp_common.c
- Revision:
- 46:c688c75b63b9
- Parent:
- 45:c09fac270e60
- Child:
- 47:8a238e75a0f7
diff -r c09fac270e60 -r c688c75b63b9 iothubtransport_amqp_common.c --- a/iothubtransport_amqp_common.c Fri Nov 17 13:56:41 2017 -0800 +++ b/iothubtransport_amqp_common.c Fri Dec 15 14:08:19 2017 -0800 @@ -58,14 +58,27 @@ AMQP_TRANSPORT_AUTHENTICATION_MODE_X509 } AMQP_TRANSPORT_AUTHENTICATION_MODE; +/* +Definition of transport states: -#define AMQP_TRANSPORT_STATE_STRINGS \ - AMQP_TRANSPORT_STATE_NOT_CONNECTED, \ - AMQP_TRANSPORT_STATE_CONNECTING, \ - AMQP_TRANSPORT_STATE_CONNECTED, \ - AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED, \ - AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION, \ - AMQP_TRANSPORT_STATE_RECONNECTING, \ +AMQP_TRANSPORT_STATE_NOT_CONNECTED: Initial state when the transport is created. +AMQP_TRANSPORT_STATE_CONNECTING: First connection ever. +AMQP_TRANSPORT_STATE_CONNECTED: Transition from AMQP_TRANSPORT_STATE_CONNECTING or AMQP_TRANSPORT_STATE_RECONNECTING. +AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED: When a failure occurred and the transport identifies a reconnection is needed. +AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION: Transition from AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED after all prep is done (transient instances are destroyed, devices are stopped). +AMQP_TRANSPORT_STATE_RECONNECTING: Transition from AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION. +AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES: State reached if the maximum number/length of reconnections has been reached. +AMQP_TRANSPORT_STATE_BEING_DESTROYED: State set if IoTHubTransport_AMQP_Common_Destroy function is invoked. +*/ + +#define AMQP_TRANSPORT_STATE_STRINGS \ + AMQP_TRANSPORT_STATE_NOT_CONNECTED, \ + AMQP_TRANSPORT_STATE_CONNECTING, \ + AMQP_TRANSPORT_STATE_CONNECTED, \ + AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED, \ + AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION, \ + AMQP_TRANSPORT_STATE_RECONNECTING, \ + AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES, \ AMQP_TRANSPORT_STATE_BEING_DESTROYED DEFINE_LOCAL_ENUM(AMQP_TRANSPORT_STATE, AMQP_TRANSPORT_STATE_STRINGS); @@ -227,6 +240,17 @@ return result; } +static void raise_connection_status_callback_retry_expired(const void* item, const void* action_context, bool* continue_processing) +{ + (void)action_context; + + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)item; + + IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED); + + *continue_processing = true; +} + // @brief // Saves the new state, if it is different than the previous one. static void on_device_state_changed_callback(void* context, DEVICE_STATE previous_state, DEVICE_STATE new_state) @@ -251,7 +275,17 @@ // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_121: [If `new_state` is DEVICE_STATE_STOPPED, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_OK] else if (new_state == DEVICE_STATE_STOPPED) { - IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK); + if (registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_CONNECTED || + registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_BEING_DESTROYED) + { + IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK); + } + else if (registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED || + registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_READY_FOR_RECONNECTION || + registered_device->transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTING) + { + IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_NETWORK); + } } // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_122: [If `new_state` is DEVICE_STATE_ERROR_AUTH, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL] else if (new_state == DEVICE_STATE_ERROR_AUTH) @@ -1493,11 +1527,15 @@ AMQP_TRANSPORT_INSTANCE* transport_instance = (AMQP_TRANSPORT_INSTANCE*)handle; LIST_ITEM_HANDLE list_item; + if (transport_instance->state == AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES) + { + // Nothing to be done. + } // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_017: [If `instance->state` is `RECONNECTION_REQUIRED`, IoTHubTransport_AMQP_Common_DoWork shall attempt to trigger the connection-retry logic and return] - if (transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED) + else if (transport_instance->state == AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED) { RETRY_ACTION retry_action; - + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_126: [The connection retry shall be attempted only if retry_control_should_retry() returns RETRY_ACTION_NOW, or if it fails] if (retry_control_should_retry(transport_instance->connection_retry_control, &retry_action) != RESULT_OK) { @@ -1509,58 +1547,67 @@ { prepare_for_connection_retry(transport_instance); } - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_018: [If there are no devices registered on the transport, IoTHubTransport_AMQP_Common_DoWork shall skip do_work for devices] - else if ((list_item = singlylinkedlist_get_head_item(transport_instance->registered_devices)) != NULL) - { - // We need to check if there are devices, otherwise the amqp_connection won't be able to be created since - // there is not a preferred authentication mode set yet on the transport. + else if (retry_action == RETRY_ACTION_STOP_RETRYING) + { + update_state(transport_instance, AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES); - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_019: [If `instance->amqp_connection` is NULL, it shall be established] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_12_003: [AMQP connection will be configured using the `c2d_keep_alive_freq_secs` value from SetOption ] - if (transport_instance->amqp_connection == NULL && establish_amqp_connection(transport_instance) != RESULT_OK) - { - LogError("AMQP transport failed to establish connection with service."); - - update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED); + (void)singlylinkedlist_foreach(transport_instance->registered_devices, raise_connection_status_callback_retry_expired, NULL); } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_020: [If the amqp_connection is OPENED, the transport shall iterate through each registered device and perform a device-specific do_work on each] - else if (transport_instance->amqp_connection_state == AMQP_CONNECTION_STATE_OPENED) + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_018: [If there are no devices registered on the transport, IoTHubTransport_AMQP_Common_DoWork shall skip do_work for devices] + if ((list_item = singlylinkedlist_get_head_item(transport_instance->registered_devices)) != NULL) { - while (list_item != NULL) - { - AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device; + // We need to check if there are devices, otherwise the amqp_connection won't be able to be created since + // there is not a preferred authentication mode set yet on the transport. - if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL) - { - LogError("Transport had an unexpected failure during DoWork (failed to fetch a registered_devices list item value)"); - } - else if (registered_device->number_of_send_event_complete_failures >= MAX_NUMBER_OF_DEVICE_FAILURES) + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_019: [If `instance->amqp_connection` is NULL, it shall be established] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_12_003: [AMQP connection will be configured using the `c2d_keep_alive_freq_secs` value from SetOption ] + if (transport_instance->amqp_connection == NULL && establish_amqp_connection(transport_instance) != RESULT_OK) + { + LogError("AMQP transport failed to establish connection with service."); + + update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED); + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_020: [If the amqp_connection is OPENED, the transport shall iterate through each registered device and perform a device-specific do_work on each] + else if (transport_instance->amqp_connection_state == AMQP_CONNECTION_STATE_OPENED) + { + while (list_item != NULL) { - LogError("Device '%s' reported a critical failure (events completed sending with failures); connection retry will be triggered.", STRING_c_str(registered_device->device_id)); + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device; - update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED); - } - else if (IoTHubTransport_AMQP_Common_Device_DoWork(registered_device) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_021: [If DoWork fails for the registered device for more than MAX_NUMBER_OF_DEVICE_FAILURES, connection retry shall be triggered] - if (registered_device->number_of_previous_failures >= MAX_NUMBER_OF_DEVICE_FAILURES) + if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL) { - LogError("Device '%s' reported a critical failure; connection retry will be triggered.", STRING_c_str(registered_device->device_id)); + LogError("Transport had an unexpected failure during DoWork (failed to fetch a registered_devices list item value)"); + } + else if (registered_device->number_of_send_event_complete_failures >= MAX_NUMBER_OF_DEVICE_FAILURES) + { + LogError("Device '%s' reported a critical failure (events completed sending with failures); connection retry will be triggered.", STRING_c_str(registered_device->device_id)); update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED); } - } + else if (IoTHubTransport_AMQP_Common_Device_DoWork(registered_device) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_021: [If DoWork fails for the registered device for more than MAX_NUMBER_OF_DEVICE_FAILURES, connection retry shall be triggered] + if (registered_device->number_of_previous_failures >= MAX_NUMBER_OF_DEVICE_FAILURES) + { + LogError("Device '%s' reported a critical failure; connection retry will be triggered.", STRING_c_str(registered_device->device_id)); - list_item = singlylinkedlist_get_next_item(list_item); + update_state(transport_instance, AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED); + } + } + + list_item = singlylinkedlist_get_next_item(list_item); + } } } - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_022: [If `instance->amqp_connection` is not NULL, amqp_connection_do_work shall be invoked] - if (transport_instance->amqp_connection != NULL) - { - amqp_connection_do_work(transport_instance->amqp_connection); + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_022: [If `instance->amqp_connection` is not NULL, amqp_connection_do_work shall be invoked] + if (transport_instance->amqp_connection != NULL) + { + amqp_connection_do_work(transport_instance->amqp_connection); + } } } } @@ -2297,6 +2344,11 @@ LogInfo("Retry policy set (%d, timeout = %d)", retryPolicy, retryTimeoutLimitInSeconds); + if (transport_instance->state == AMQP_TRANSPORT_STATE_NOT_CONNECTED_NO_MORE_RETRIES) + { + transport_instance->state = AMQP_TRANSPORT_STATE_RECONNECTION_REQUIRED; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_128: [If no errors occur, `IoTHubTransport_AMQP_Common_SetRetryPolicy` shall return zero.] result = RESULT_OK; }