Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransport_amqp_common.c
- Revision:
- 31:adadaef857c1
- Parent:
- 30:20a85b733111
- Child:
- 32:babfd49032ff
--- a/iothubtransport_amqp_common.c Fri Mar 10 11:46:55 2017 -0800 +++ b/iothubtransport_amqp_common.c Fri Mar 24 16:35:00 2017 -0700 @@ -3,6 +3,7 @@ #include <stdlib.h> #include <stdint.h> +#include <stdbool.h> #include <time.h> #include <limits.h> #include "azure_c_shared_utility/optimize_size.h" @@ -17,6 +18,8 @@ #include "azure_c_shared_utility/urlencode.h" #include "azure_c_shared_utility/tlsio.h" #include "azure_c_shared_utility/optionhandler.h" +#include "azure_c_shared_utility/shared_util_options.h" +#include "azure_c_shared_utility/macro_utils.h" #include "azure_uamqp_c/cbs.h" #include "azure_uamqp_c/session.h" @@ -60,11 +63,17 @@ AMQP_GET_IO_TRANSPORT underlying_io_transport_provider; // Pointer to the function that creates the TLS I/O (internal use only). AMQP_CONNECTION_HANDLE amqp_connection; // Base amqp connection with service. AMQP_CONNECTION_STATE amqp_connection_state; // Current state of the amqp_connection. + bool is_connection_shutting_down; // Indicates whether the amqp_connection is being closed/destroyed intentionally or not. AMQP_TRANSPORT_AUTHENTICATION_MODE preferred_authentication_mode; // Used to avoid registered devices using different authentication modes. SINGLYLINKEDLIST_HANDLE registered_devices; // List of devices currently registered in this transport. bool is_trace_on; // Turns logging on and off. OPTIONHANDLER_HANDLE saved_tls_options; // Here are the options from the xio layer if any is saved. bool is_connection_retry_required; // Flag that controls whether the connection should be restablished or not. + + char* http_proxy_hostname; + int http_proxy_port; + char* http_proxy_username; + char* http_proxy_password; size_t option_sas_token_lifetime_secs; // Device-specific option. size_t option_sas_token_refresh_time_secs; // Device-specific option. @@ -103,6 +112,27 @@ // ---------- General Helpers ---------- // +static void free_proxy_data(AMQP_TRANSPORT_INSTANCE* amqp_transport_instance) +{ + if (amqp_transport_instance->http_proxy_hostname != NULL) + { + free(amqp_transport_instance->http_proxy_hostname); + amqp_transport_instance->http_proxy_hostname = NULL; + } + + if (amqp_transport_instance->http_proxy_username != NULL) + { + free(amqp_transport_instance->http_proxy_username); + amqp_transport_instance->http_proxy_username = NULL; + } + + if (amqp_transport_instance->http_proxy_password != NULL) + { + free(amqp_transport_instance->http_proxy_password); + amqp_transport_instance->http_proxy_password = NULL; + } +} + // @brief // Evaluates if the ammount of time since start_time is greater or lesser than timeout_in_secs. // @param is_timed_out @@ -200,6 +230,27 @@ registered_device->device_state = new_state; // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_063: [If `registered_device->time_of_last_state_change` shall be set using get_time()] registered_device->time_of_last_state_change = get_time(NULL); + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_120: [If `new_state` is DEVICE_STATE_STARTED, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_AUTHENTICATED and IOTHUB_CLIENT_CONNECTION_OK] + if (new_state == DEVICE_STATE_STARTED) + { + IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_AUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK); + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_121: [If `new_state` is DEVICE_STATE_STOPPED, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_OK] + else if (new_state == DEVICE_STATE_STOPPED) + { + IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK); + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_122: [If `new_state` is DEVICE_STATE_ERROR_AUTH, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL] + else if (new_state == DEVICE_STATE_ERROR_AUTH) + { + IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL); + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_123: [If `new_state` is DEVICE_STATE_ERROR_AUTH_TIMEOUT or DEVICE_STATE_ERROR_MSG, IoTHubClient_LL_ConnectionStatusCallBack shall be invoked with IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED and IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR] + else if (new_state == DEVICE_STATE_ERROR_AUTH_TIMEOUT || new_state == DEVICE_STATE_ERROR_MSG) + { + IoTHubClient_LL_ConnectionStatusCallBack(registered_device->iothub_client_handle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR); + } } } @@ -366,9 +417,11 @@ static void on_methods_unsubscribed(void* context) { - /* Codess_SRS_IOTHUBTRANSPORT_AMQP_METHODS_12_001: [ `on_methods_unsubscribed` calls iothubtransportamqp_methods_unsubscribe. ]*/ + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_METHODS_12_001: [ `on_methods_unsubscribed` calls iothubtransportamqp_methods_unsubscribe. ]*/ AMQP_TRANSPORT_DEVICE_INSTANCE* device_state = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context; - IoTHubTransport_AMQP_Common_Unsubscribe_DeviceMethod(device_state); + + iothubtransportamqp_methods_unsubscribe(device_state->methods_handle); + device_state->subscribed_for_methods = false; } static int on_method_request_received(void* context, const char* method_name, const unsigned char* request, size_t request_size, IOTHUBTRANSPORT_AMQP_METHOD_HANDLE method_handle) @@ -525,8 +578,16 @@ static int get_new_underlying_io_transport(AMQP_TRANSPORT_INSTANCE* transport_instance, XIO_HANDLE *xio_handle) { int result; + AMQP_TRANSPORT_PROXY_OPTIONS amqp_transport_proxy_options; - if ((*xio_handle = transport_instance->underlying_io_transport_provider(STRING_c_str(transport_instance->iothub_host_fqdn))) == NULL) + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_041: [ If the `proxy_data` option has been set, the proxy options shall be filled in the argument `amqp_transport_proxy_options` when calling the function `underlying_io_transport_provider()` to obtain the underlying IO handle. ]*/ + amqp_transport_proxy_options.host_address = transport_instance->http_proxy_hostname; + amqp_transport_proxy_options.port = transport_instance->http_proxy_port; + amqp_transport_proxy_options.username = transport_instance->http_proxy_username; + amqp_transport_proxy_options.password = transport_instance->http_proxy_password; + + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_042: [ If no `proxy_data` option has been set, NULL shall be passed as the argument `amqp_transport_proxy_options` when calling the function `underlying_io_transport_provider()`. ]*/ + if ((*xio_handle = transport_instance->underlying_io_transport_provider(STRING_c_str(transport_instance->iothub_host_fqdn), amqp_transport_proxy_options.host_address == NULL ? NULL : &amqp_transport_proxy_options)) == NULL) { LogError("Failed to obtain a TLS I/O transport layer (underlying_io_transport_provider() failed)"); result = __FAILURE__; @@ -560,7 +621,14 @@ // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_060: [If `new_state` is AMQP_CONNECTION_STATE_ERROR, the connection shall be flagged as faulty (so the connection retry logic can be triggered)] if (new_state == AMQP_CONNECTION_STATE_ERROR) { - LogError("Transport received an ERROR from the amqp_connection (state changed %d->%d); it will be flagged for connection retry.", previous_state, new_state); + LogError("Transport received an ERROR from the amqp_connection (state changed %s -> %s); it will be flagged for connection retry.", ENUM_TO_STRING(AMQP_CONNECTION_STATE, previous_state), ENUM_TO_STRING(AMQP_CONNECTION_STATE, new_state)); + + transport_instance->is_connection_retry_required = true; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_115: [If the AMQP connection is closed by the service side, the connection retry logic shall be triggered] + else if (new_state == AMQP_CONNECTION_STATE_CLOSED && previous_state == AMQP_CONNECTION_STATE_OPENED && !transport_instance->is_connection_shutting_down) + { + LogError("amqp_connection was closed unexpectedly; connection retry will be triggered."); transport_instance->is_connection_retry_required = true; } @@ -677,9 +745,11 @@ } // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_033: [`instance->connection` shall be destroyed using amqp_connection_destroy()] + transport_instance->is_connection_shutting_down = true; amqp_connection_destroy(transport_instance->amqp_connection); transport_instance->amqp_connection = NULL; transport_instance->amqp_connection_state = AMQP_CONNECTION_STATE_CLOSED; + transport_instance->is_connection_shutting_down = false; // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_035: [`instance->tls_io` shall be destroyed using xio_destroy()] destroy_underlying_io_transport(transport_instance); @@ -1113,6 +1183,8 @@ if (instance->amqp_connection != NULL) { + instance->is_connection_shutting_down = true; + amqp_connection_destroy(instance->amqp_connection); } @@ -1121,6 +1193,9 @@ STRING_delete(instance->iothub_host_fqdn); + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_001: [ `IoTHubTransport_AMQP_Common_Destroy` shall free the stored proxy options. ]*/ + free_proxy_data(instance); + free(instance); } } @@ -1563,7 +1638,79 @@ result = IOTHUB_CLIENT_OK; } } - else + else if (strcmp(OPTION_HTTP_PROXY, option) == 0) + { + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_032: [ If `option` is `proxy_data`, `value` shall be used as an `HTTP_PROXY_OPTIONS*`. ]*/ + HTTP_PROXY_OPTIONS* proxy_options = (HTTP_PROXY_OPTIONS*)value; + + if (transport_instance->tls_io != NULL) + { + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_038: [ If the underlying IO has already been created, then `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/ + LogError("Cannot set proxy option once the underlying IO is created"); + result = IOTHUB_CLIENT_ERROR; + } + else if (proxy_options->host_address == NULL) + { + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_034: [ If `host_address` is NULL, `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/ + LogError("NULL host_address in proxy options"); + result = IOTHUB_CLIENT_INVALID_ARG; + } + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_037: [ If only one of `username` and `password` is NULL, `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/ + else if (((proxy_options->username == NULL) || (proxy_options->password == NULL)) && + (proxy_options->username != proxy_options->password)) + { + LogError("Only one of username and password for proxy settings was NULL"); + result = IOTHUB_CLIENT_INVALID_ARG; + } + else + { + char* copied_proxy_hostname = NULL; + char* copied_proxy_username = NULL; + char* copied_proxy_password = NULL; + + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_033: [ The fields `host_address`, `port`, `username` and `password` shall be saved for later used (needed when creating the underlying IO to be used by the transport). ]*/ + transport_instance->http_proxy_port = proxy_options->port; + if (mallocAndStrcpy_s(&copied_proxy_hostname, proxy_options->host_address) != 0) + { + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_035: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/ + LogError("Cannot copy HTTP proxy hostname"); + result = IOTHUB_CLIENT_ERROR; + } + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_036: [ `username` and `password` shall be allowed to be NULL. ]*/ + else if ((proxy_options->username != NULL) && (mallocAndStrcpy_s(&copied_proxy_username, proxy_options->username) != 0)) + { + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_035: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/ + free(copied_proxy_hostname); + LogError("Cannot copy HTTP proxy username"); + result = IOTHUB_CLIENT_ERROR; + } + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_036: [ `username` and `password` shall be allowed to be NULL. ]*/ + else if ((proxy_options->password != NULL) && (mallocAndStrcpy_s(&copied_proxy_password, proxy_options->password) != 0)) + { + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_035: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_AMQP_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/ + if (copied_proxy_username != NULL) + { + free(copied_proxy_username); + } + free(copied_proxy_hostname); + LogError("Cannot copy HTTP proxy password"); + result = IOTHUB_CLIENT_ERROR; + } + else + { + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_040: [ When setting the proxy options succeeds any previously saved proxy options shall be freed. ]*/ + free_proxy_data(transport_instance); + + transport_instance->http_proxy_hostname = copied_proxy_hostname; + transport_instance->http_proxy_username = copied_proxy_username; + transport_instance->http_proxy_password = copied_proxy_password; + + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_039: [ If setting the `proxy_data` option succeeds, `IoTHubTransport_AMQP_Common_SetOption` shall return `IOTHUB_CLIENT_OK` ]*/ + result = IOTHUB_CLIENT_OK; + } + } + } + else { result = IOTHUB_CLIENT_OK;