Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransport_amqp_common.c
- Revision:
- 30:20a85b733111
- Parent:
- 29:7e8852b14e3b
- Child:
- 31:adadaef857c1
--- a/iothubtransport_amqp_common.c Fri Feb 24 14:00:00 2017 -0800 +++ b/iothubtransport_amqp_common.c Fri Mar 10 11:46:55 2017 -0800 @@ -9,438 +9,353 @@ #include "azure_c_shared_utility/agenttime.h" #include "azure_c_shared_utility/gballoc.h" #include "azure_c_shared_utility/crt_abstractions.h" +#include "azure_c_shared_utility/singlylinkedlist.h" #include "azure_c_shared_utility/doublylinkedlist.h" #include "azure_c_shared_utility/xlogging.h" #include "azure_c_shared_utility/platform.h" #include "azure_c_shared_utility/strings.h" #include "azure_c_shared_utility/urlencode.h" #include "azure_c_shared_utility/tlsio.h" -#include "azure_c_shared_utility/vector.h" +#include "azure_c_shared_utility/optionhandler.h" #include "azure_uamqp_c/cbs.h" -#include "azure_uamqp_c/link.h" +#include "azure_uamqp_c/session.h" #include "azure_uamqp_c/message.h" -#include "azure_uamqp_c/amqpvalue.h" -#include "azure_uamqp_c/message_receiver.h" -#include "azure_uamqp_c/message_sender.h" #include "azure_uamqp_c/messaging.h" -#include "azure_uamqp_c/sasl_mssbcbs.h" -#include "azure_uamqp_c/saslclientio.h" -#include "uamqp_messaging.h" #include "iothub_client_ll.h" #include "iothub_client_options.h" #include "iothub_client_private.h" -#include "iothubtransportamqp_auth.h" #ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ #include "iothubtransportamqp_methods.h" #endif #include "iothubtransport_amqp_common.h" +#include "iothubtransport_amqp_connection.h" +#include "iothubtransport_amqp_device.h" #include "iothub_client_version.h" -#define RESULT_OK 0 +#define RESULT_OK 0 +#define INDEFINITE_TIME ((time_t)(-1)) +#define DEFAULT_CBS_REQUEST_TIMEOUT_SECS 30 +#define DEFAULT_DEVICE_STATE_CHANGE_TIMEOUT_SECS 60 +#define DEFAULT_EVENT_SEND_TIMEOUT_SECS 300 +#define DEFAULT_SAS_TOKEN_LIFETIME_SECS 3600 +#define DEFAULT_SAS_TOKEN_REFRESH_TIME_SECS 1800 +#define MAX_NUMBER_OF_DEVICE_FAILURES 5 -#define INDEFINITE_TIME ((time_t)(-1)) -#define RFC1035_MAX_FQDN_LENGTH 255 -#define DEFAULT_SAS_TOKEN_LIFETIME_MS 3600000 -#define DEFAULT_CBS_REQUEST_TIMEOUT_MS 30000 -#define DEFAULT_CONTAINER_ID "default_container_id" -#define DEFAULT_INCOMING_WINDOW_SIZE UINT_MAX -#define DEFAULT_OUTGOING_WINDOW_SIZE 100 -#define MESSAGE_RECEIVER_LINK_NAME_TAG "receiver" -#define MESSAGE_RECEIVER_TARGET_ADDRESS "target" -#define MESSAGE_RECEIVER_MAX_LINK_SIZE 65536 -#define MESSAGE_SENDER_LINK_NAME_TAG "sender" -#define MESSAGE_SENDER_SOURCE_NAME_TAG "source" -#define MESSAGE_SENDER_MAX_LINK_SIZE UINT64_MAX -typedef enum RESULT_TAG -{ - RESULT_SUCCESS, - RESULT_INVALID_ARGUMENT, - RESULT_TIME_OUT, - RESULT_RETRYABLE_ERROR, - RESULT_CRITICAL_ERROR -} RESULT; +// ---------- Data Definitions ---------- // -typedef struct AMQP_TRANSPORT_STATE_TAG +typedef enum AMQP_TRANSPORT_AUTHENTICATION_MODE_TAG { - // FQDN of the IoT Hub. - STRING_HANDLE iotHubHostFqdn; - - // TSL I/O transport. - XIO_HANDLE tls_io; - // Pointer to the function that creates the TLS I/O (internal use only). - AMQP_GET_IO_TRANSPORT underlying_io_transport_provider; - // AMQP connection. - CONNECTION_HANDLE connection; - // AMQP session. - SESSION_HANDLE session; - // All things CBS (and only CBS) - AMQP_TRANSPORT_CBS_CONNECTION cbs_connection; + AMQP_TRANSPORT_AUTHENTICATION_MODE_NOT_SET, + AMQP_TRANSPORT_AUTHENTICATION_MODE_CBS, + AMQP_TRANSPORT_AUTHENTICATION_MODE_X509 +} AMQP_TRANSPORT_AUTHENTICATION_MODE; - // Current AMQP connection state; - AMQP_MANAGEMENT_STATE connection_state; - - AMQP_TRANSPORT_CREDENTIAL_TYPE preferred_credential_type; - // List of registered devices. - VECTOR_HANDLE registered_devices; - // Turns logging on and off - bool is_trace_on; - // Used to generate unique AMQP link names - int link_count; - - /*here are the options from the xio layer if any is saved*/ - OPTIONHANDLER_HANDLE xioOptions; +typedef struct AMQP_TRANSPORT_INSTANCE_TAG +{ + STRING_HANDLE iothub_host_fqdn; // FQDN of the IoT Hub. + XIO_HANDLE tls_io; // TSL I/O transport. + AMQP_GET_IO_TRANSPORT underlying_io_transport_provider; // Pointer to the function that creates the TLS I/O (internal use only). + AMQP_CONNECTION_HANDLE amqp_connection; // Base amqp connection with service. + AMQP_CONNECTION_STATE amqp_connection_state; // Current state of the amqp_connection. + AMQP_TRANSPORT_AUTHENTICATION_MODE preferred_authentication_mode; // Used to avoid registered devices using different authentication modes. + SINGLYLINKEDLIST_HANDLE registered_devices; // List of devices currently registered in this transport. + bool is_trace_on; // Turns logging on and off. + OPTIONHANDLER_HANDLE saved_tls_options; // Here are the options from the xio layer if any is saved. + bool is_connection_retry_required; // Flag that controls whether the connection should be restablished or not. + + size_t option_sas_token_lifetime_secs; // Device-specific option. + size_t option_sas_token_refresh_time_secs; // Device-specific option. + size_t option_cbs_request_timeout_secs; // Device-specific option. + size_t option_send_event_timeout_secs; // Device-specific option. } AMQP_TRANSPORT_INSTANCE; -typedef struct AMQP_TRANSPORT_DEVICE_STATE_TAG +typedef struct AMQP_TRANSPORT_DEVICE_INSTANCE_TAG { - // Identity of the device. - STRING_HANDLE deviceId; - // contains the credentials to be used - AUTHENTICATION_STATE_HANDLE authentication; - - // Address to which the transport will connect to and send events. - STRING_HANDLE targetAddress; - // Address to which the transport will connect to and receive messages from. - STRING_HANDLE messageReceiveAddress; - // Internal parameter that identifies the current logical device within the service. - STRING_HANDLE devicesPath; - // Saved reference to the IoTHub LL Client. - IOTHUB_CLIENT_LL_HANDLE iothub_client_handle; - // Saved reference to the transport the device is registered on. - AMQP_TRANSPORT_INSTANCE* transport_state; - // AMQP link used by the event sender. - LINK_HANDLE sender_link; - // uAMQP event sender. - MESSAGE_SENDER_HANDLE message_sender; - // State of the message sender. - MESSAGE_SENDER_STATE message_sender_state; - // Internal flag that controls if messages should be received or not. - bool receive_messages; - // AMQP link used by the message receiver. - LINK_HANDLE receiver_link; - // uAMQP message receiver. - MESSAGE_RECEIVER_HANDLE message_receiver; - // Message receiver state. - MESSAGE_RECEIVER_STATE message_receiver_state; - // List with events still pending to be sent. It is provided by the upper layer. - PDLIST_ENTRY waitingToSend; - // Internal list with the items currently being processed/sent through uAMQP. - DLIST_ENTRY inProgress; + STRING_HANDLE device_id; // Identity of the device. + DEVICE_HANDLE device_handle; // Logic unit that performs authentication, messaging, etc. + IOTHUB_CLIENT_LL_HANDLE iothub_client_handle; // Saved reference to the IoTHub LL Client. + AMQP_TRANSPORT_INSTANCE* transport_instance; // Saved reference to the transport the device is registered on. + PDLIST_ENTRY waiting_to_send; // List of events waiting to be sent to the iot hub (i.e., haven't been processed by the transport yet). + DEVICE_STATE device_state; // Current state of the device_handle instance. + size_t number_of_previous_failures; // Number of times the device has failed in sequence; this value is reset to 0 if device succeeds to authenticate, send and/or recv messages. + size_t number_of_send_event_complete_failures; // Number of times on_event_send_complete was called in row with an error. + time_t time_of_last_state_change; // Time the device_handle last changed state; used to track timeouts of device_start_async and device_stop. + unsigned int max_state_change_timeout_secs; // Maximum number of seconds allowed for device_handle to complete start and stop state changes. #ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ // the methods portion - IOTHUBTRANSPORT_AMQP_METHODS_HANDLE methods_handle; + IOTHUBTRANSPORT_AMQP_METHODS_HANDLE methods_handle; // Handle to instance of module that deals with device methods for AMQP. // is subscription for methods needed? - bool subscribe_methods_needed; + bool subscribe_methods_needed; // Indicates if should subscribe for device methods. // is the transport subscribed for methods? - bool subscribed_for_methods; + bool subscribed_for_methods; // Indicates if device is subscribed for device methods. #endif -} AMQP_TRANSPORT_DEVICE_STATE; +} AMQP_TRANSPORT_DEVICE_INSTANCE; +typedef struct MESSAGE_DISPOSITION_CONTEXT_TAG +{ + AMQP_TRANSPORT_DEVICE_INSTANCE* device_state; + char* link_name; + delivery_number message_id; +} MESSAGE_DISPOSITION_CONTEXT; -// Auxiliary functions +// ---------- General Helpers ---------- // -static STRING_HANDLE concat3Params(const char* prefix, const char* infix, const char* suffix) +// @brief +// Evaluates if the ammount of time since start_time is greater or lesser than timeout_in_secs. +// @param is_timed_out +// Set to true if a timeout has been reached, false otherwise. Not set if any failure occurs. +// @returns +// 0 if no failures occur, non-zero otherwise. +static int is_timeout_reached(time_t start_time, unsigned int timeout_in_secs, bool *is_timed_out) { - STRING_HANDLE result = NULL; - char* concat; - size_t totalLength = strlen(prefix) + strlen(infix) + strlen(suffix) + 1; // One extra for \0. + int result; + + if (start_time == INDEFINITE_TIME) + { + LogError("Failed to verify timeout (start_time is INDEFINITE)"); + result = __FAILURE__; + } + else + { + time_t current_time; - if ((concat = (char*)malloc(totalLength)) != NULL) - { - (void)strcpy(concat, prefix); - (void)strcat(concat, infix); - (void)strcat(concat, suffix); - result = STRING_construct(concat); - free(concat); - } - else - { - result = NULL; - } + if ((current_time = get_time(NULL)) == INDEFINITE_TIME) + { + LogError("Failed to verify timeout (get_time failed)"); + result = __FAILURE__; + } + else + { + if (get_difftime(current_time, start_time) >= timeout_in_secs) + { + *is_timed_out = 1; + } + else + { + *is_timed_out = 0; + } - return result; + result = RESULT_OK; + } + } + + return result; } -static int getSecondsSinceEpoch(size_t* seconds) +static STRING_HANDLE get_target_iothub_fqdn(const IOTHUBTRANSPORT_CONFIG* config) { - int result; - time_t current_time; - - if ((current_time = get_time(NULL)) == INDEFINITE_TIME) - { - LogError("Failed getting the current local time (get_time() failed)"); - result = __FAILURE__; - } - else - { - *seconds = (size_t)get_difftime(current_time, (time_t)0); - - result = RESULT_OK; - } - - return result; + STRING_HANDLE fqdn; + + if (config->upperConfig->protocolGatewayHostName == NULL) + { + if ((fqdn = STRING_construct_sprintf("%s.%s", config->upperConfig->iotHubName, config->upperConfig->iotHubSuffix)) == NULL) + { + LogError("Failed to copy iotHubName and iotHubSuffix (STRING_construct_sprintf failed)"); + } + } + else if ((fqdn = STRING_construct(config->upperConfig->protocolGatewayHostName)) == NULL) + { + LogError("Failed to copy protocolGatewayHostName (STRING_construct failed)"); + } + + return fqdn; } -static STRING_HANDLE create_link_name(const char* deviceId, const char* tag, int index) -{ - STRING_HANDLE name = NULL; - char name_str[1024]; - if (sprintf(name_str, "link-%s-%s-%i", deviceId, tag, index) <= 0) - { - LogError("create_link_name failed (sprintf failed)"); - } - else if ((name = STRING_construct(name_str)) == NULL) - { - LogError("create_link_name failed (STRING_construct failed)"); - } - - return name; -} - -static STRING_HANDLE create_link_source_name(STRING_HANDLE link_name) -{ - STRING_HANDLE name = NULL; - char name_str[1024]; +// ---------- Register/Unregister Helpers ---------- // - if (sprintf(name_str, "%s-source", STRING_c_str(link_name)) <= 0) - { - LogError("create_link_source_name failed (sprintf failed)"); - } - else if ((name = STRING_construct(name_str)) == NULL) - { - LogError("create_link_source_name failed (STRING_construct failed)"); - } - - return name; -} - -static STRING_HANDLE create_link_target_name(STRING_HANDLE link_name) +static void internal_destroy_amqp_device_instance(AMQP_TRANSPORT_DEVICE_INSTANCE *trdev_inst) { - STRING_HANDLE name = NULL; - char name_str[1024]; +#ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ + if (trdev_inst->methods_handle != NULL) + { + iothubtransportamqp_methods_destroy(trdev_inst->methods_handle); + } +#endif + if (trdev_inst->device_handle != NULL) + { + device_destroy(trdev_inst->device_handle); + } - if (sprintf(name_str, "%s-target", STRING_c_str(link_name)) <= 0) - { - LogError("create_link_target_name failed (sprintf failed)"); - } - else if ((name = STRING_construct(name_str)) == NULL) - { - LogError("create_link_target_name failed (STRING_construct failed)"); - } + if (trdev_inst->device_id != NULL) + { + STRING_delete(trdev_inst->device_id); + } - return name; + free(trdev_inst); } -// Auxiliary function to be used on VECTOR_find_if() -static bool findDeviceById(const void* element, const void* value) -{ - const AMQP_TRANSPORT_DEVICE_STATE* device_state = *(const AMQP_TRANSPORT_DEVICE_STATE **)element; - const char* deviceId = (const char *)value; - - return (strcmp(STRING_c_str(device_state->deviceId), deviceId) == 0); -} - -static void trackEventInProgress(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_DEVICE_STATE* device_state) -{ - DList_RemoveEntryList(&message->entry); - DList_InsertTailList(&device_state->inProgress, &message->entry); -} - -static IOTHUB_MESSAGE_LIST* getNextEventToSend(AMQP_TRANSPORT_DEVICE_STATE* device_state) +// @brief +// Saves the new state, if it is different than the previous one. +static void on_device_state_changed_callback(void* context, DEVICE_STATE previous_state, DEVICE_STATE new_state) { - IOTHUB_MESSAGE_LIST* message; - - if (!DList_IsListEmpty(device_state->waitingToSend)) - { - PDLIST_ENTRY list_entry = device_state->waitingToSend->Flink; - message = containingRecord(list_entry, IOTHUB_MESSAGE_LIST, entry); - } - else - { - message = NULL; - } - - return message; -} - -static int isEventInInProgressList(IOTHUB_MESSAGE_LIST* message) -{ - return !DList_IsListEmpty(&message->entry); -} - -static void removeEventFromInProgressList(IOTHUB_MESSAGE_LIST* message) -{ - DList_RemoveEntryList(&message->entry); - DList_InitializeListHead(&message->entry); + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_061: [If `new_state` is the same as `previous_state`, on_device_state_changed_callback shall return] + if (context != NULL && new_state != previous_state) + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_062: [If `new_state` shall be saved into the `registered_device` instance] + registered_device->device_state = new_state; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_063: [If `registered_device->time_of_last_state_change` shall be set using get_time()] + registered_device->time_of_last_state_change = get_time(NULL); + } } -static void rollEventBackToWaitList(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_DEVICE_STATE* device_state) -{ - removeEventFromInProgressList(message); - DList_InsertTailList(device_state->waitingToSend, &message->entry); -} - -static void rollEventsBackToWaitList(AMQP_TRANSPORT_DEVICE_STATE* device_state) +// @brief Auxiliary function to be used to find a device in the registered_devices list. +// @returns true if the device ids match, false otherwise. +static bool find_device_by_id_callback(LIST_ITEM_HANDLE list_item, const void* match_context) { - PDLIST_ENTRY entry = device_state->inProgress.Blink; + bool result; + + if (match_context == NULL) + { + result = false; + } + else + { + AMQP_TRANSPORT_DEVICE_INSTANCE* device_instance = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item); - while (entry != &device_state->inProgress) - { - IOTHUB_MESSAGE_LIST* message = containingRecord(entry, IOTHUB_MESSAGE_LIST, entry); - entry = entry->Blink; - rollEventBackToWaitList(message, device_state); - } + if (device_instance == NULL || + device_instance->device_id == NULL || + STRING_c_str(device_instance->device_id) != match_context) + { + result = false; + } + else + { + result = true; + } + } + + return result; } -static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result) +// @brief Verifies if a device is already registered within the transport that owns the list of registered devices. +// @remarks Returns the correspoding LIST_ITEM_HANDLE in registered_devices, if found. +// @returns true if the device is already in the list, false otherwise. +static bool is_device_registered_ex(SINGLYLINKEDLIST_HANDLE registered_devices, const char* device_id, LIST_ITEM_HANDLE *list_item) { - IOTHUB_MESSAGE_LIST* message = (IOTHUB_MESSAGE_LIST*)context; - - IOTHUB_CLIENT_CONFIRMATION_RESULT iot_hub_send_result; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_142: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_OK if the result received is MESSAGE_SEND_OK] - if (send_result == MESSAGE_SEND_OK) - { - iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_143: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR] - else - { - iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR; - } + return ((*list_item = singlylinkedlist_find(registered_devices, find_device_by_id_callback, device_id)) != NULL ? 1 : 0); +} - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_102: [The callback 'on_message_send_complete' shall invoke the upper layer callback for message received if provided] - if (message->callback != NULL) - { - message->callback(iot_hub_send_result, message->context); - } - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_100: [The callback 'on_message_send_complete' shall remove the target message from the in-progress list after the upper layer callback] - if (isEventInInProgressList(message)) - { - removeEventFromInProgressList(message); - } - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_151: [The callback 'on_message_send_complete' shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()] - IoTHubMessage_Destroy(message->messageHandle); - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_152: [The callback 'on_message_send_complete' shall destroy the IOTHUB_MESSAGE_LIST instance] - free(message); +// @brief Verifies if a device is already registered within the transport that owns the list of registered devices. +// @returns true if the device is already in the list, false otherwise. +static bool is_device_registered(AMQP_TRANSPORT_DEVICE_INSTANCE* amqp_device_instance) +{ + LIST_ITEM_HANDLE list_item; + const char* device_id = STRING_c_str(amqp_device_instance->device_id); + return is_device_registered_ex(amqp_device_instance->transport_instance->registered_devices, device_id, &list_item); } -static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message) + +// ---------- Callbacks ---------- // + +static MESSAGE_CALLBACK_INFO* MESSAGE_CALLBACK_INFO_Create(IOTHUB_MESSAGE_HANDLE message, DEVICE_MESSAGE_DISPOSITION_INFO* disposition_info, AMQP_TRANSPORT_DEVICE_INSTANCE* device_state) { - AMQP_VALUE result = NULL; - int api_call_result; - IOTHUB_MESSAGE_HANDLE iothub_message = NULL; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_195: [The callback 'on_message_received' shall shall get a IOTHUB_MESSAGE_HANDLE instance out of the uamqp's MESSAGE_HANDLE instance by using IoTHubMessage_CreateFromUamqpMessage()] - if ((api_call_result = IoTHubMessage_CreateFromUamqpMessage(message, &iothub_message)) != RESULT_OK) - { - LogError("Transport failed processing the message received (error = %d).", api_call_result); + MESSAGE_CALLBACK_INFO* result; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_196: [If IoTHubMessage_CreateFromUamqpMessage fails, the callback 'on_message_received' shall reject the incoming message by calling messaging_delivery_rejected() and return.] - result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed reading AMQP message"); - } - else - { - IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_104: [The callback 'on_message_received' shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters] - disposition_result = IoTHubClient_LL_MessageCallback((IOTHUB_CLIENT_LL_HANDLE)context, iothub_message); + if ((result = (MESSAGE_CALLBACK_INFO*)malloc(sizeof(MESSAGE_CALLBACK_INFO))) == NULL) + { + LogError("Failed creating MESSAGE_CALLBACK_INFO (malloc failed)"); + } + else + { + MESSAGE_DISPOSITION_CONTEXT* tc; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_197: [The callback 'on_message_received' shall destroy the IOTHUB_MESSAGE_HANDLE instance after invoking IoTHubClient_LL_MessageCallback().] - IoTHubMessage_Destroy(iothub_message); + if ((tc = (MESSAGE_DISPOSITION_CONTEXT*)malloc(sizeof(MESSAGE_DISPOSITION_CONTEXT))) == NULL) + { + LogError("Failed creating MESSAGE_DISPOSITION_CONTEXT (malloc failed)"); + free(result); + result = NULL; + } + else + { + if (mallocAndStrcpy_s(&(tc->link_name), disposition_info->source) == 0) + { + tc->device_state = device_state; + tc->message_id = disposition_info->message_id; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_105: [The callback 'on_message_received' shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED] - if (disposition_result == IOTHUBMESSAGE_ACCEPTED) - { - result = messaging_delivery_accepted(); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_106: [The callback 'on_message_received' shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED] - else if (disposition_result == IOTHUBMESSAGE_ABANDONED) - { - result = messaging_delivery_released(); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_107: [The callback 'on_message_received' shall return the result of messaging_delivery_rejected("Rejected by application", "Rejected by application") if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED] - else if (disposition_result == IOTHUBMESSAGE_REJECTED) - { - result = messaging_delivery_rejected("Rejected by application", "Rejected by application"); - } - } + result->messageHandle = message; + result->transportContext = tc; + } + else + { + LogError("Failed creating MESSAGE_CALLBACK_INFO (mallocAndStrcyp_s failed)"); + free(tc); + free(result); + result = NULL; + } + } + } - return result; + return result; +} + +static void MESSAGE_CALLBACK_INFO_Destroy(MESSAGE_CALLBACK_INFO* message_callback_info) +{ + free(message_callback_info->transportContext->link_name); + free(message_callback_info->transportContext); + free(message_callback_info); } -static void destroyConnection(AMQP_TRANSPORT_INSTANCE* transport_state) +static DEVICE_MESSAGE_DISPOSITION_RESULT get_device_disposition_result_from(IOTHUBMESSAGE_DISPOSITION_RESULT iothubclient_disposition_result) { - if (transport_state->cbs_connection.cbs_handle != NULL) - { - cbs_destroy(transport_state->cbs_connection.cbs_handle); - transport_state->cbs_connection.cbs_handle = NULL; - } - - if (transport_state->session != NULL) - { - session_destroy(transport_state->session); - transport_state->session = NULL; - } - - if (transport_state->connection != NULL) - { - connection_destroy(transport_state->connection); - transport_state->connection = NULL; - } + DEVICE_MESSAGE_DISPOSITION_RESULT device_disposition_result; - if (transport_state->cbs_connection.sasl_io != NULL) - { - xio_destroy(transport_state->cbs_connection.sasl_io); - transport_state->cbs_connection.sasl_io = NULL; - } - - if (transport_state->cbs_connection.sasl_mechanism != NULL) - { - saslmechanism_destroy(transport_state->cbs_connection.sasl_mechanism); - transport_state->cbs_connection.sasl_mechanism = NULL; - } + if (iothubclient_disposition_result == IOTHUBMESSAGE_ACCEPTED) + { + device_disposition_result = DEVICE_MESSAGE_DISPOSITION_RESULT_ACCEPTED; + } + else if (iothubclient_disposition_result == IOTHUBMESSAGE_ABANDONED) + { + device_disposition_result = DEVICE_MESSAGE_DISPOSITION_RESULT_RELEASED; + } + else if (iothubclient_disposition_result == IOTHUBMESSAGE_REJECTED) + { + device_disposition_result = DEVICE_MESSAGE_DISPOSITION_RESULT_REJECTED; + } + else + { + LogError("Failed getting corresponding DEVICE_MESSAGE_DISPOSITION_RESULT for IOTHUBMESSAGE_DISPOSITION_RESULT (%d is not supported)", iothubclient_disposition_result); + device_disposition_result = DEVICE_MESSAGE_DISPOSITION_RESULT_RELEASED; + } - if (transport_state->tls_io != NULL) - { - /*before destroying, we shall save its options for later use*/ - transport_state->xioOptions = xio_retrieveoptions(transport_state->tls_io); - if (transport_state->xioOptions == NULL) - { - LogError("unable to retrieve xio_retrieveoptions"); - } - - xio_destroy(transport_state->tls_io); - transport_state->tls_io = NULL; - } + return device_disposition_result; } -static void on_amqp_management_state_changed(void* context, AMQP_MANAGEMENT_STATE new_amqp_management_state, AMQP_MANAGEMENT_STATE previous_amqp_management_state) +static DEVICE_MESSAGE_DISPOSITION_RESULT on_message_received(IOTHUB_MESSAGE_HANDLE message, DEVICE_MESSAGE_DISPOSITION_INFO* disposition_info, void* context) { - (void)previous_amqp_management_state; - AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; + AMQP_TRANSPORT_DEVICE_INSTANCE* amqp_device_instance = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context; + DEVICE_MESSAGE_DISPOSITION_RESULT device_disposition_result; + MESSAGE_CALLBACK_INFO* message_data; - if (transport_state != NULL) - { - transport_state->connection_state = new_amqp_management_state; - } + if ((message_data = MESSAGE_CALLBACK_INFO_Create(message, disposition_info, amqp_device_instance)) == NULL) + { + LogError("Failed processing message received (failed to assemble callback info)"); + device_disposition_result = DEVICE_MESSAGE_DISPOSITION_RESULT_RELEASED; + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_089: [IoTHubClient_LL_MessageCallback() shall be invoked passing the client and the incoming message handles as parameters] + if (IoTHubClient_LL_MessageCallback(amqp_device_instance->iothub_client_handle, message_data) != true) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_090: [If IoTHubClient_LL_MessageCallback() fails, on_message_received_callback shall return DEVICE_MESSAGE_DISPOSITION_RESULT_RELEASED] + LogError("Failed processing message received (IoTHubClient_LL_MessageCallback failed)"); + IoTHubMessage_Destroy(message); + device_disposition_result = DEVICE_MESSAGE_DISPOSITION_RESULT_RELEASED; + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_091: [If IoTHubClient_LL_MessageCallback() succeeds, on_message_received_callback shall return DEVICE_MESSAGE_DISPOSITION_RESULT_NONE] + device_disposition_result = DEVICE_MESSAGE_DISPOSITION_RESULT_NONE; + } + } + + return device_disposition_result; } -static void on_connection_io_error(void* context) -{ - AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; - - if (transport_state != NULL) - { - transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR; - } -} #ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ static void on_methods_error(void* context) @@ -452,14 +367,14 @@ static void on_methods_unsubscribed(void* context) { /* Codess_SRS_IOTHUBTRANSPORT_AMQP_METHODS_12_001: [ `on_methods_unsubscribed` calls iothubtransportamqp_methods_unsubscribe. ]*/ - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)context; + AMQP_TRANSPORT_DEVICE_INSTANCE* device_state = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context; IoTHubTransport_AMQP_Common_Unsubscribe_DeviceMethod(device_state); } static int on_method_request_received(void* context, const char* method_name, const unsigned char* request, size_t request_size, IOTHUBTRANSPORT_AMQP_METHOD_HANDLE method_handle) { int result; - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)context; + AMQP_TRANSPORT_DEVICE_INSTANCE* device_state = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context; /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_017: [ `on_methods_request_received` shall call the `IoTHubClient_LL_DeviceMethodComplete` passing the method name, request buffer and size and the newly created BUFFER handle. ]*/ /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_022: [ The status code shall be the return value of the call to `IoTHubClient_LL_DeviceMethodComplete`. ]*/ @@ -475,7 +390,7 @@ return result; } -static int subscribe_methods(AMQP_TRANSPORT_DEVICE_STATE* deviceState) +static int subscribe_methods(AMQP_TRANSPORT_DEVICE_INSTANCE* deviceState) { int result; @@ -485,9 +400,16 @@ } else { + SESSION_HANDLE session_handle; + + if ((amqp_connection_get_session_handle(deviceState->transport_instance->amqp_connection, &session_handle)) != RESULT_OK) + { + LogError("Device '%s' failed subscribing for methods (failed getting session handle)", STRING_c_str(deviceState->device_id)); + result = __FAILURE__; + } /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_024: [ If the device authentication status is AUTHENTICATION_STATUS_OK and `IoTHubTransport_AMQP_Common_Subscribe_DeviceMethod` was called to register for methods, `IoTHubTransport_AMQP_Common_DoWork` shall call `iothubtransportamqp_methods_subscribe`. ]*/ /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_027: [ The current session handle shall be passed to `iothubtransportamqp_methods_subscribe`. ]*/ - if (iothubtransportamqp_methods_subscribe(deviceState->methods_handle, deviceState->transport_state->session, on_methods_error, deviceState, on_method_request_received, deviceState, on_methods_unsubscribed, deviceState) != 0) + else if (iothubtransportamqp_methods_subscribe(deviceState->methods_handle, session_handle, on_methods_error, deviceState, on_method_request_received, deviceState, on_methods_unsubscribed, deviceState) != 0) { LogError("Cannot subscribe for methods"); result = __FAILURE__; @@ -503,785 +425,777 @@ } #endif -static void set_session_options(SESSION_HANDLE session) + +// ---------- Underlying TLS I/O Helpers ---------- // + +// @brief +// Retrieves the options of the current underlying TLS I/O instance and saves in the transport instance. +// @remarks +// This is used when the new underlying I/O transport (TLS I/O, or WebSockets, etc) needs to be recreated, +// and the options previously set must persist. +// +// If no TLS I/O instance was created yet, results in failure. +// @returns +// 0 if succeeds, non-zero otherwise. +static int save_underlying_io_transport_options(AMQP_TRANSPORT_INSTANCE* transport_instance) { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_065: [IoTHubTransport_AMQP_Common_DoWork shall apply a default value of UINT_MAX for the parameter 'AMQP incoming window'] - if (session_set_incoming_window(session, (uint32_t)DEFAULT_INCOMING_WINDOW_SIZE) != 0) - { - LogError("Failed to set the AMQP incoming window size."); - } + int result; + + if (transport_instance->tls_io == NULL) + { + LogError("failed saving underlying I/O transport options (tls_io instance is NULL)"); + result = __FAILURE__; + } + else + { + OPTIONHANDLER_HANDLE fresh_options; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_115: [IoTHubTransport_AMQP_Common_DoWork shall apply a default value of 100 for the parameter 'AMQP outgoing window'] - if (session_set_outgoing_window(session, DEFAULT_OUTGOING_WINDOW_SIZE) != 0) - { - LogError("Failed to set the AMQP outgoing window size."); - } + if ((fresh_options = xio_retrieveoptions(transport_instance->tls_io)) == NULL) + { + LogError("failed saving underlying I/O transport options (tls_io instance is NULL)"); + result = __FAILURE__; + } + else + { + OPTIONHANDLER_HANDLE previous_options = transport_instance->saved_tls_options; + transport_instance->saved_tls_options = fresh_options; + + if (previous_options != NULL) + { + OptionHandler_Destroy(previous_options); + } + + result = RESULT_OK; + } + } + + return result; +} + +static void destroy_underlying_io_transport_options(AMQP_TRANSPORT_INSTANCE* transport_instance) +{ + if (transport_instance->saved_tls_options != NULL) + { + OptionHandler_Destroy(transport_instance->saved_tls_options); + transport_instance->saved_tls_options = NULL; + } } -static int establishConnection(AMQP_TRANSPORT_INSTANCE* transport_state) +// @brief +// Applies TLS I/O options if previously saved to a new TLS I/O instance. +// @returns +// 0 if succeeds, non-zero otherwise. +static int restore_underlying_io_transport_options(AMQP_TRANSPORT_INSTANCE* transport_instance, XIO_HANDLE xio_handle) +{ + int result; + + if (transport_instance->saved_tls_options == NULL) + { + result = RESULT_OK; + } + else + { + if (OptionHandler_FeedOptions(transport_instance->saved_tls_options, xio_handle) != OPTIONHANDLER_OK) + { + LogError("Failed feeding existing options to new TLS instance."); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } + } + + return result; +} + +// @brief Destroys the XIO_HANDLE obtained with underlying_io_transport_provider(), saving its options beforehand. +static void destroy_underlying_io_transport(AMQP_TRANSPORT_INSTANCE* transport_instance) +{ + if (transport_instance->tls_io != NULL) + { + xio_destroy(transport_instance->tls_io); + transport_instance->tls_io = NULL; + } +} + +// @brief Invokes underlying_io_transport_provider() and retrieves a new XIO_HANDLE to use for I/O (TLS, or websockets, or w/e is supported). +// @param xio_handle: if successfull, set with the new XIO_HANDLE acquired; not changed otherwise. +// @returns 0 if successfull, non-zero otherwise. +static int get_new_underlying_io_transport(AMQP_TRANSPORT_INSTANCE* transport_instance, XIO_HANDLE *xio_handle) +{ + int result; + + if ((*xio_handle = transport_instance->underlying_io_transport_provider(STRING_c_str(transport_instance->iothub_host_fqdn))) == NULL) + { + LogError("Failed to obtain a TLS I/O transport layer (underlying_io_transport_provider() failed)"); + result = __FAILURE__; + } + else + { + if (restore_underlying_io_transport_options(transport_instance, *xio_handle) != RESULT_OK) + { + /*pessimistically hope TLS will fail, be recreated and options re-given*/ + LogError("Failed to apply options previous saved to new underlying I/O transport instance."); + } + + result = RESULT_OK; + } + + return result; +} + + +// ---------- AMQP connection establishment/tear-down, connectry retry ---------- // + +static void on_amqp_connection_state_changed(const void* context, AMQP_CONNECTION_STATE previous_state, AMQP_CONNECTION_STATE new_state) +{ + if (context != NULL && new_state != previous_state) + { + AMQP_TRANSPORT_INSTANCE* transport_instance = (AMQP_TRANSPORT_INSTANCE*)context; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_059: [`new_state` shall be saved in to the transport instance] + transport_instance->amqp_connection_state = new_state; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_060: [If `new_state` is AMQP_CONNECTION_STATE_ERROR, the connection shall be flagged as faulty (so the connection retry logic can be triggered)] + if (new_state == AMQP_CONNECTION_STATE_ERROR) + { + LogError("Transport received an ERROR from the amqp_connection (state changed %d->%d); it will be flagged for connection retry.", previous_state, new_state); + + transport_instance->is_connection_retry_required = true; + } + } +} + +static int establish_amqp_connection(AMQP_TRANSPORT_INSTANCE* transport_instance) { int result; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_110: [IoTHubTransport_AMQP_Common_DoWork shall create the TLS IO using transport_state->io_transport_provider callback function] - if (transport_state->tls_io == NULL && - (transport_state->tls_io = transport_state->underlying_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn))) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_136: [If transport_state->io_transport_provider_callback fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately] - result = __FAILURE__; - LogError("Failed to obtain a TLS I/O transport layer."); - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_239: [IoTHubTransport_AMQP_Common_DoWork shall apply any TLS I/O saved options to the new TLS instance using OptionHandler_FeedOptions] - if (transport_state->xioOptions != NULL) - { - if (OptionHandler_FeedOptions(transport_state->xioOptions, transport_state->tls_io) != 0) - { - LogError("unable to replay options to TLS"); /*pessimistically hope TLS will fail, be recreated and options re-given*/ - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_240: [If OptionHandler_FeedOptions succeeds, IoTHubTransport_AMQP_Common_DoWork shall destroy any TLS options saved on the transport state] - OptionHandler_Destroy(transport_state->xioOptions); - transport_state->xioOptions = NULL; - } - } - - switch (transport_state->preferred_credential_type) - { - case (DEVICE_KEY): - case (DEVICE_SAS_TOKEN): - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_056: [IoTHubTransport_AMQP_Common_DoWork shall create the SASL mechanism using AMQP's saslmechanism_create() API] - if ((transport_state->cbs_connection.sasl_mechanism = saslmechanism_create(saslmssbcbs_get_interface(), NULL)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_057: [If saslmechanism_create() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately] - result = __FAILURE__; - LogError("Failed to create a SASL mechanism."); - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_060: [IoTHubTransport_AMQP_Common_DoWork shall create the SASL I / O layer using the xio_create() C Shared Utility API] - SASLCLIENTIO_CONFIG sasl_client_config; - sasl_client_config.sasl_mechanism = transport_state->cbs_connection.sasl_mechanism; - sasl_client_config.underlying_io = transport_state->tls_io; - if ((transport_state->cbs_connection.sasl_io = xio_create(saslclientio_get_interface_description(), &sasl_client_config)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_061: [If xio_create() fails creating the SASL I/O layer, IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately] - result = __FAILURE__; - LogError("Failed to create a SASL I/O layer."); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_062: [IoTHubTransport_AMQP_Common_DoWork shall create the connection with the IoT service using connection_create2() AMQP API, passing the SASL I/O layer, IoT Hub FQDN and container ID as parameters (pass NULL for callbacks)] - else if ((transport_state->connection = connection_create2(transport_state->cbs_connection.sasl_io, STRING_c_str(transport_state->iotHubHostFqdn), DEFAULT_CONTAINER_ID, NULL, NULL, NULL, NULL, on_connection_io_error, (void*)transport_state)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_063: [If connection_create2() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately.] - result = __FAILURE__; - LogError("Failed to create the AMQP connection."); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_137: [IoTHubTransport_AMQP_Common_DoWork shall create the AMQP session session_create() AMQP API, passing the connection instance as parameter] - else if ((transport_state->session = session_create(transport_state->connection, NULL, NULL)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_138 : [If session_create() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately] - result = __FAILURE__; - LogError("Failed to create the AMQP session."); - } - else - { - set_session_options(transport_state->session); + if (transport_instance->preferred_authentication_mode == AMQP_TRANSPORT_AUTHENTICATION_MODE_NOT_SET) + { + LogError("Failed establishing connection (transport doesn't have a preferred authentication mode set; unexpected!)."); + result = __FAILURE__; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_023: [If `instance->tls_io` is NULL, it shall be set invoking instance->underlying_io_transport_provider()] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_025: [When `instance->tls_io` is created, it shall be set with `instance->saved_tls_options` using OptionHandler_FeedOptions()] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_111: [If OptionHandler_FeedOptions() fails, it shall be ignored] + else if (transport_instance->tls_io == NULL && + get_new_underlying_io_transport(transport_instance, &transport_instance->tls_io) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_024: [If instance->underlying_io_transport_provider() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return] + LogError("Failed establishing connection (failed to obtain a TLS I/O transport layer)."); + result = __FAILURE__; + } + else + { + AMQP_CONNECTION_CONFIG amqp_connection_config; + amqp_connection_config.iothub_host_fqdn = STRING_c_str(transport_instance->iothub_host_fqdn); + amqp_connection_config.underlying_io_transport = transport_instance->tls_io; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_029: [`instance->is_trace_on` shall be set into `AMQP_CONNECTION_CONFIG->is_trace_on`] + amqp_connection_config.is_trace_on = transport_instance->is_trace_on; + amqp_connection_config.on_state_changed_callback = on_amqp_connection_state_changed; + amqp_connection_config.on_state_changed_context = transport_instance; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_066: [IoTHubTransport_AMQP_Common_DoWork shall establish the CBS connection using the cbs_create() AMQP API] - if ((transport_state->cbs_connection.cbs_handle = cbs_create(transport_state->session, on_amqp_management_state_changed, NULL)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_067: [If cbs_create() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately] - result = __FAILURE__; - LogError("Failed to create the CBS connection."); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_139: [IoTHubTransport_AMQP_Common_DoWork shall open the CBS connection using the cbs_open() AMQP API] - else if (cbs_open(transport_state->cbs_connection.cbs_handle) != 0) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_140: [If cbs_open() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately] - result = __FAILURE__; - LogError("Failed to open the connection with CBS."); - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_199: [The value of the option `logtrace` saved by the transport instance shall be applied to each new connection instance using connection_set_trace().] - connection_set_trace(transport_state->connection, transport_state->is_trace_on); - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_200: [The value of the option `logtrace` saved by the transport instance shall be applied to each new SASL_IO instance using xio_setoption().] - (void)xio_setoption(transport_state->cbs_connection.sasl_io, OPTION_LOG_TRACE, &transport_state->is_trace_on); - result = RESULT_OK; - } - } - } - break; - } - case(X509): - { - /*Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_006: [ IoTHubTransport_AMQP_Common_DoWork shall not establish a CBS connection. ]*/ - /*Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_005: [ IoTHubTransport_AMQP_Common_DoWork shall create the connection with the IoT service using connection_create2() AMQP API, passing the TLS I/O layer, IoT Hub FQDN and container ID as parameters (pass NULL for callbacks) ]*/ - if ((transport_state->connection = connection_create2(transport_state->tls_io, STRING_c_str(transport_state->iotHubHostFqdn), DEFAULT_CONTAINER_ID, NULL, NULL, NULL, NULL, on_connection_io_error, (void*)transport_state)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_063: [If connection_create2() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately.] - result = __FAILURE__; - LogError("Failed to create the AMQP connection."); - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_137: [IoTHubTransport_AMQP_Common_DoWork shall create the AMQP session session_create() AMQP API, passing the connection instance as parameter] - if ((transport_state->session = session_create(transport_state->connection, NULL, NULL)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_138 : [If session_create() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately] - result = __FAILURE__; - LogError("Failed to create the AMQP session."); - } - else - { - set_session_options(transport_state->session); - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_199: [The value of the option `logtrace` saved by the transport instance shall be applied to each new connection instance using connection_set_trace().] - connection_set_trace(transport_state->connection, transport_state->is_trace_on); - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_201: [The value of the option `logtrace` saved by the transport instance shall be applied to each new TLS_IO instance using xio_setoption().] - (void)xio_setoption(transport_state->tls_io, OPTION_LOG_TRACE, &transport_state->is_trace_on); - result = RESULT_OK; - } - } - break; - } - default: - { - LogError("internal error: unexpected enum value for transport_state->credential.credentialType = %d", transport_state->preferred_credential_type); - result = __FAILURE__; - break; - } - }/*switch*/ - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_027: [If `transport->preferred_authentication_method` is CBS, AMQP_CONNECTION_CONFIG shall be set with `create_sasl_io` = true and `create_cbs_connection` = true] + if (transport_instance->preferred_authentication_mode == AMQP_TRANSPORT_AUTHENTICATION_MODE_CBS) + { + amqp_connection_config.create_sasl_io = true; + amqp_connection_config.create_cbs_connection = true; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_028: [If `transport->preferred_credential_method` is X509, AMQP_CONNECTION_CONFIG shall be set with `create_sasl_io` = false and `create_cbs_connection` = false] + else if (transport_instance->preferred_authentication_mode == AMQP_TRANSPORT_AUTHENTICATION_MODE_X509) + { + amqp_connection_config.create_sasl_io = false; + amqp_connection_config.create_cbs_connection = false; + } + // If new AMQP_TRANSPORT_AUTHENTICATION_MODE values are added, they need to be covered here. - if (result != RESULT_OK) - { - destroyConnection(transport_state); - } + transport_instance->amqp_connection_state = AMQP_CONNECTION_STATE_CLOSED; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_026: [If `transport->connection` is NULL, it shall be created using amqp_connection_create()] + if ((transport_instance->amqp_connection = amqp_connection_create(&amqp_connection_config)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_030: [If amqp_connection_create() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return] + LogError("Failed establishing connection (failed to create the amqp_connection instance)."); + result = __FAILURE__; + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_110: [If amqp_connection_create() succeeds, IoTHubTransport_AMQP_Common_DoWork shall proceed to invoke amqp_connection_do_work] + result = RESULT_OK; + } + } return result; } -static void attachDeviceClientTypeToLink(LINK_HANDLE link) +static void prepare_device_for_connection_retry(AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device) { - fields attach_properties; - AMQP_VALUE deviceClientTypeKeyName; - AMQP_VALUE deviceClientTypeValue; - int result; - - // - // Attempt to add the device client type string to the attach properties. - // If this doesn't happen, well, this isn't that important. We can operate - // without this property. It's worth noting that even though we are going - // on, the reasons any of these operations fail don't bode well for the - // actual upcoming attach. - // - - if ((attach_properties = amqpvalue_create_map()) == NULL) - { - LogError("Failed to create the map for device client type."); - } - else - { - if ((deviceClientTypeKeyName = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL) - { - LogError("Failed to create the key name for the device client type."); - } - else - { - if ((deviceClientTypeValue = amqpvalue_create_string(CLIENT_DEVICE_TYPE_PREFIX CLIENT_DEVICE_BACKSLASH IOTHUB_SDK_VERSION)) == NULL) - { - LogError("Failed to create the key value for the device client type."); - } - else - { - if ((result = amqpvalue_set_map_value(attach_properties, deviceClientTypeKeyName, deviceClientTypeValue)) != 0) - { - LogError("Failed to set the property map for the device client type. Error code is: %d", result); - } - else if ((result = link_set_attach_properties(link, attach_properties)) != 0) - { - LogError("Unable to attach the device client type to the link properties. Error code is: %d", result); - } +#ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_032: [ Each `instance->registered_devices` shall unsubscribe from receiving C2D method requests by calling `iothubtransportamqp_methods_unsubscribe`] + iothubtransportamqp_methods_unsubscribe(registered_device->methods_handle); + registered_device->subscribed_for_methods = 0; +#endif - amqpvalue_destroy(deviceClientTypeValue); - } - - amqpvalue_destroy(deviceClientTypeKeyName); - } - - amqpvalue_destroy(attach_properties); - } -} - -static void destroyEventSender(AMQP_TRANSPORT_DEVICE_STATE* device_state) -{ - if (device_state->message_sender != NULL) - { - messagesender_destroy(device_state->message_sender); - device_state->message_sender = NULL; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_031: [device_stop() shall be invoked on all `instance->registered_devices` that are not already stopped] + if (registered_device->device_state != DEVICE_STATE_STOPPED) + { + if (device_stop(registered_device->device_handle) != RESULT_OK) + { + LogError("Failed preparing device '%s' for connection retry (device_stop failed)", STRING_c_str(registered_device->device_id)); + } + } - link_destroy(device_state->sender_link); - device_state->sender_link = NULL; - } -} - -static void on_event_sender_state_changed(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state) -{ - if (context != NULL) - { - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)context; - - if (device_state->transport_state->is_trace_on) - { - LogInfo("Event sender state changed [%s, %d->%d]", STRING_c_str(device_state->deviceId), previous_state, new_state); - } - - device_state->message_sender_state = new_state; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_192: [If a message sender instance changes its state to MESSAGE_SENDER_STATE_ERROR (first transition only) the connection retry logic shall be triggered] - if (new_state != previous_state && new_state == MESSAGE_SENDER_STATE_ERROR) - { - device_state->transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR; - } - } + registered_device->number_of_previous_failures = 0; + registered_device->number_of_send_event_complete_failures = 0; } -static int createEventSender(AMQP_TRANSPORT_DEVICE_STATE* device_state) +static void prepare_for_connection_retry(AMQP_TRANSPORT_INSTANCE* transport_instance) { - int result; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_034: [`instance->tls_io` options shall be saved on `instance->saved_tls_options` using xio_retrieveoptions()] + if (save_underlying_io_transport_options(transport_instance) != RESULT_OK) + { + LogError("Failed saving TLS I/O options while preparing for connection retry; failure will be ignored"); + } - STRING_HANDLE link_name = NULL; - STRING_HANDLE source_name = NULL; - AMQP_VALUE source = NULL; - AMQP_VALUE target = NULL; + LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport_instance->registered_devices); + + while (list_item != NULL) + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item); - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_251: [Every new message_sender AMQP link shall be created using unique link and source names per device, per connection] - if ((link_name = create_link_name(STRING_c_str(device_state->deviceId), MESSAGE_SENDER_LINK_NAME_TAG, device_state->transport_state->link_count++)) == NULL) - { - LogError("Failed creating a name for the AMQP message sender link."); - result = __FAILURE__; - } - else if ((source_name = create_link_source_name(link_name)) == NULL) - { - LogError("Failed creating a name for the AMQP message sender source."); - result = __FAILURE__; - } - else if ((source = messaging_create_source(STRING_c_str(source_name))) == NULL) - { - LogError("Failed creating AMQP messaging source attribute."); - result = __FAILURE__; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_252: [The message_sender AMQP link shall be created using the `target` address created according to SRS_IOTHUBTRANSPORTAMQP_09_014] - else if ((target = messaging_create_target(STRING_c_str(device_state->targetAddress))) == NULL) - { - LogError("Failed creating AMQP messaging target attribute."); - result = __FAILURE__; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_068: [IoTHubTransport_AMQP_Common_DoWork shall create the AMQP link using link_create(), with role as 'role_sender'] - else if ((device_state->sender_link = link_create(device_state->transport_state->session, STRING_c_str(link_name), role_sender, source, target)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_069: [If IoTHubTransport_AMQP_Common_DoWork fails to create the AMQP link for sending messages, the function shall fail and return immediately, flagging the connection to be re-stablished] - LogError("Failed creating AMQP link for message sender."); - result = __FAILURE__; - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_119: [IoTHubTransport_AMQP_Common_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size'] - if (link_set_max_message_size(device_state->sender_link, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK) - { - LogError("Failed setting AMQP link max message size."); - } + if (registered_device == NULL) + { + LogError("Failed preparing device for connection retry (singlylinkedlist_item_get_value failed)"); + } + else + { + prepare_device_for_connection_retry(registered_device); + } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_249: [The message sender link should have a property set with the type and version of the IoT Hub client application, set as `CLIENT_DEVICE_TYPE_PREFIX/IOTHUB_SDK_VERSION`] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_250: [If the message sender link fails to have the client type and version set on its properties, the failure shall be ignored] - attachDeviceClientTypeToLink(device_state->sender_link); + list_item = singlylinkedlist_get_next_item(list_item); + } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_070: [IoTHubTransport_AMQP_Common_DoWork shall create the AMQP message sender using messagesender_create() AMQP API] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_191: [IoTHubTransport_AMQP_Common_DoWork shall create each AMQP message sender tracking its state changes with a callback function] - if ((device_state->message_sender = messagesender_create(device_state->sender_link, on_event_sender_state_changed, (void*)device_state)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_071: [IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately if the AMQP message sender instance fails to be created, flagging the connection to be re-established] - LogError("Could not allocate AMQP message sender"); - result = __FAILURE__; - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_072: [IoTHubTransport_AMQP_Common_DoWork shall open the AMQP message sender using messagesender_open() AMQP API] - if (messagesender_open(device_state->message_sender) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_073: [IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately if the AMQP message sender instance fails to be opened, flagging the connection to be re-established] - LogError("Failed opening the AMQP message sender."); - result = __FAILURE__; - } - else - { - result = RESULT_OK; - } - } - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_033: [`instance->connection` shall be destroyed using amqp_connection_destroy()] + amqp_connection_destroy(transport_instance->amqp_connection); + transport_instance->amqp_connection = NULL; + transport_instance->amqp_connection_state = AMQP_CONNECTION_STATE_CLOSED; - if (link_name != NULL) - STRING_delete(link_name); - if (source_name != NULL) - STRING_delete(source_name); - if (source != NULL) - amqpvalue_destroy(source); - if (target != NULL) - amqpvalue_destroy(target); - - return result; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_035: [`instance->tls_io` shall be destroyed using xio_destroy()] + destroy_underlying_io_transport(transport_instance); } -static int destroyMessageReceiver(AMQP_TRANSPORT_DEVICE_STATE* device_state) + +// @brief Verifies if the crendentials used by the device match the requirements and authentication mode currently supported by the transport. +// @returns true if credentials are good, false otherwise. +static bool is_device_credential_acceptable(const IOTHUB_DEVICE_CONFIG* device_config, AMQP_TRANSPORT_AUTHENTICATION_MODE preferred_authentication_mode) { - int result; + bool result; - if (device_state->message_receiver == NULL) + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_03_003: [IoTHubTransport_AMQP_Common_Register shall return NULL if both deviceKey and deviceSasToken are not NULL.] + if ((device_config->deviceSasToken != NULL) && (device_config->deviceKey != NULL)) + { + LogError("Credential of device '%s' is not acceptable (must provide EITHER deviceSasToken OR deviceKey)", device_config->deviceId); + result = false; + } + else if (preferred_authentication_mode == AMQP_TRANSPORT_AUTHENTICATION_MODE_NOT_SET) { - result = RESULT_OK; + result = true; + } + else if (preferred_authentication_mode == AMQP_TRANSPORT_AUTHENTICATION_MODE_X509 && (device_config->deviceKey != NULL || device_config->deviceSasToken != NULL)) + { + LogError("Credential of device '%s' is not acceptable (transport is using X509 certificate authentication, but device config contains deviceKey or sasToken)", device_config->deviceId); + result = false; + } + else if (preferred_authentication_mode != AMQP_TRANSPORT_AUTHENTICATION_MODE_X509 && (device_config->deviceKey == NULL && device_config->deviceSasToken == NULL)) + { + LogError("Credential of device '%s' is not acceptable (transport is using CBS authentication, but device config does not contain deviceKey nor sasToken)", device_config->deviceId); + result = false; } else { - if (messagereceiver_close(device_state->message_receiver) != RESULT_OK) - { - LogError("Failed closing the AMQP message receiver."); - result = __FAILURE__; - } - else - { - messagereceiver_destroy(device_state->message_receiver); - - device_state->message_receiver = NULL; - - link_destroy(device_state->receiver_link); - - device_state->receiver_link = NULL; - - result = RESULT_OK; - } - } - - return result; -} - -static void on_message_receiver_state_changed(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state) -{ - if (context != NULL) - { - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)context; - - if (device_state->transport_state->is_trace_on) - { - LogInfo("Message receiver state changed [%s; %d->%d]", STRING_c_str(device_state->deviceId), previous_state, new_state); - } - - device_state->message_receiver_state = new_state; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_190: [If a message_receiver instance changes its state to MESSAGE_RECEIVER_STATE_ERROR (first transition only) the connection retry logic shall be triggered] - if (new_state != previous_state && new_state == MESSAGE_RECEIVER_STATE_ERROR) - { - device_state->transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR; - } - } -} - -static int createMessageReceiver(AMQP_TRANSPORT_DEVICE_STATE* device_state) -{ - int result; - - STRING_HANDLE link_name = NULL; - STRING_HANDLE target_name = NULL; - AMQP_VALUE source = NULL; - AMQP_VALUE target = NULL; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_246: [Every new message_receiver AMQP link shall be created using unique link and target names per device, per connection] - if ((link_name = create_link_name(STRING_c_str(device_state->deviceId), MESSAGE_RECEIVER_LINK_NAME_TAG, device_state->transport_state->link_count++)) == NULL) - { - LogError("Failed creating a name for the AMQP message receiver link."); - result = __FAILURE__; - } - else if ((target_name = create_link_target_name(link_name)) == NULL) - { - LogError("Failed creating a name for the AMQP message receiver target."); - result = __FAILURE__; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_253: [The message_receiver AMQP link shall be created using the `source` address created according to SRS_IOTHUBTRANSPORTAMQP_09_053] - else if ((source = messaging_create_source(STRING_c_str(device_state->messageReceiveAddress))) == NULL) - { - LogError("Failed creating AMQP message receiver source attribute."); - result = __FAILURE__; - } - else if ((target = messaging_create_target(STRING_c_str(target_name))) == NULL) - { - LogError("Failed creating AMQP message receiver target attribute."); - result = __FAILURE__; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_074: [IoTHubTransport_AMQP_Common_DoWork shall create the AMQP link using link_create(), with role as 'role_receiver'] - else if ((device_state->receiver_link = link_create(device_state->transport_state->session, STRING_c_str(link_name), role_receiver, source, target)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_075: [If IoTHubTransport_AMQP_Common_DoWork fails to create the AMQP link for receiving messages, the function shall fail and return immediately, flagging the connection to be re-stablished] - LogError("Failed creating AMQP link for message receiver."); - result = __FAILURE__; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_076: [IoTHubTransport_AMQP_Common_DoWork shall set the receiver link settle mode as receiver_settle_mode_first] - else if (link_set_rcv_settle_mode(device_state->receiver_link, receiver_settle_mode_first) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_141: [If IoTHubTransport_AMQP_Common_DoWork fails to set the settle mode on the AMQP link for receiving messages, the function shall fail and return immediately, flagging the connection to be re-stablished] - LogError("Failed setting AMQP link settle mode for message receiver."); - result = __FAILURE__; - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_119: [IoTHubTransport_AMQP_Common_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size'] - if (link_set_max_message_size(device_state->receiver_link, MESSAGE_RECEIVER_MAX_LINK_SIZE) != RESULT_OK) - { - LogError("Failed setting AMQP link max message size for message receiver."); - } - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_247: [The message receiver link should have a property set with the type and version of the IoT Hub client application, set as `CLIENT_DEVICE_TYPE_PREFIX/IOTHUB_SDK_VERSION`] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_248: [If the message receiver link fails to have the client type and version set on its properties, the failure shall be ignored] - attachDeviceClientTypeToLink(device_state->receiver_link); - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_077: [IoTHubTransport_AMQP_Common_DoWork shall create the AMQP message receiver using messagereceiver_create() AMQP API] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_189: [IoTHubTransport_AMQP_Common_DoWork shall create each AMQP message_receiver tracking its state changes with a callback function] - if ((device_state->message_receiver = messagereceiver_create(device_state->receiver_link, on_message_receiver_state_changed, (void*)device_state)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_078: [IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be created, flagging the connection to be re-established] - LogError("Could not allocate AMQP message receiver."); - result = __FAILURE__; - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_079: [IoTHubTransport_AMQP_Common_DoWork shall open the AMQP message receiver using messagereceiver_open() AMQP API, passing a callback function for handling C2D incoming messages] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_123: [IoTHubTransport_AMQP_Common_DoWork shall create each AMQP message_receiver passing the 'on_message_received' as the callback function] - if (messagereceiver_open(device_state->message_receiver, on_message_received, (const void*)device_state->iothub_client_handle) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_080: [IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be opened, flagging the connection to be re-established] - LogError("Failed opening the AMQP message receiver."); - result = __FAILURE__; - } - else - { - result = RESULT_OK; - } - } - } - - if (link_name != NULL) - STRING_delete(link_name); - if (target_name != NULL) - STRING_delete(target_name); - if (source != NULL) - amqpvalue_destroy(source); - if (target != NULL) - amqpvalue_destroy(target); - - return result; -} - -static int sendPendingEvents(AMQP_TRANSPORT_DEVICE_STATE* device_state) -{ - int result = RESULT_OK; - IOTHUB_MESSAGE_LIST* message; - - while ((message = getNextEventToSend(device_state)) != NULL) - { - result = __FAILURE__; - - MESSAGE_HANDLE amqp_message = NULL; - bool is_message_error = false; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_086: [IoTHubTransport_AMQP_Common_DoWork shall move queued events to an "in-progress" list right before processing them for sending] - trackEventInProgress(message, device_state); - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_193: [IoTHubTransport_AMQP_Common_DoWork shall get a MESSAGE_HANDLE instance out of the event's IOTHUB_MESSAGE_HANDLE instance by using message_create_from_iothub_message().] - if ((result = message_create_from_iothub_message(message->messageHandle, &amqp_message)) != RESULT_OK) - { - LogError("Failed creating AMQP message (error=%d).", result); - result = __FAILURE__; - is_message_error = true; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_097: [IoTHubTransport_AMQP_Common_DoWork shall pass the MESSAGE_HANDLE intance to uAMQP for sending (along with on_message_send_complete callback) using messagesender_send()] - else if (messagesender_send(device_state->message_sender, amqp_message, on_message_send_complete, message) != RESULT_OK) - { - LogError("Failed sending the AMQP message."); - result = __FAILURE__; - } - else - { - result = RESULT_OK; - } - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_194: [IoTHubTransport_AMQP_Common_DoWork shall destroy the MESSAGE_HANDLE instance after messagesender_send() is invoked.] - if (amqp_message != NULL) - { - // It can be destroyed because AMQP keeps a clone of the message. - message_destroy(amqp_message); - } - - if (result != RESULT_OK) - { - if (is_message_error) - { - on_message_send_complete(message, MESSAGE_SEND_ERROR); - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_111: [If message_create_from_iothub_message() fails, IoTHubTransport_AMQP_Common_DoWork notify the failure, roll back the event to waitToSend list and return] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_113: [If messagesender_send() fails, IoTHubTransport_AMQP_Common_DoWork notify the failure, roll back the event to waitToSend list and return] - rollEventBackToWaitList(message, device_state); - break; - } - } - } - - return result; -} - -static void prepareDeviceForConnectionRetry(AMQP_TRANSPORT_DEVICE_STATE* device_state) -{ - if (authentication_reset(device_state->authentication) != RESULT_OK) - { - LogError("Failed resetting the authenticatication state of device %s", STRING_c_str(device_state->deviceId)); - } - -#ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ - iothubtransportamqp_methods_unsubscribe(device_state->methods_handle); - device_state->subscribed_for_methods = false; -#endif - - destroyMessageReceiver(device_state); - destroyEventSender(device_state); - rollEventsBackToWaitList(device_state); -} - -static void prepareForConnectionRetry(AMQP_TRANSPORT_INSTANCE* transport_state) -{ - size_t number_of_registered_devices = VECTOR_size(transport_state->registered_devices); - - for (size_t i = 0; i < number_of_registered_devices; i++) - { - AMQP_TRANSPORT_DEVICE_STATE* device_state = *(AMQP_TRANSPORT_DEVICE_STATE**)VECTOR_element(transport_state->registered_devices, i); - - prepareDeviceForConnectionRetry(device_state); - } - - destroyConnection(transport_state); - transport_state->connection_state = AMQP_MANAGEMENT_STATE_IDLE; -} - -static int is_credential_compatible(const IOTHUB_DEVICE_CONFIG* device_config, AMQP_TRANSPORT_CREDENTIAL_TYPE preferred_authentication_type) -{ - int result; - - if (preferred_authentication_type == CREDENTIAL_NOT_BUILD) - { - result = RESULT_OK; - } - else if (preferred_authentication_type == X509 && (device_config->deviceKey != NULL || device_config->deviceSasToken != NULL)) - { - LogError("Incompatible credentials: transport is using X509 certificate authentication, but device config contains deviceKey and/or sasToken"); - result = __FAILURE__; - } - else if (preferred_authentication_type != X509 && (device_config->deviceKey == NULL && device_config->deviceSasToken == NULL)) - { - LogError("Incompatible credentials: transport is using CBS authentication, but device config does not contain deviceKey nor sasToken"); - result = __FAILURE__; - } - else - { - result = RESULT_OK; + result = true; } return result; } -// API functions + +//---------- DoWork Helpers ----------// + +static IOTHUB_MESSAGE_LIST* get_next_event_to_send(AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device) +{ + IOTHUB_MESSAGE_LIST* message; + + if (!DList_IsListEmpty(registered_device->waiting_to_send)) + { + PDLIST_ENTRY list_entry = registered_device->waiting_to_send->Flink; + message = containingRecord(list_entry, IOTHUB_MESSAGE_LIST, entry); + (void)DList_RemoveEntryList(list_entry); + } + else + { + message = NULL; + } + + return message; +} + +// @brief "Parses" the D2C_EVENT_SEND_RESULT (from iothubtransport_amqp_device module) into a IOTHUB_CLIENT_CONFIRMATION_RESULT. +static IOTHUB_CLIENT_CONFIRMATION_RESULT get_iothub_client_confirmation_result_from(D2C_EVENT_SEND_RESULT result) +{ + IOTHUB_CLIENT_CONFIRMATION_RESULT iothub_send_result; + + switch (result) + { + case D2C_EVENT_SEND_COMPLETE_RESULT_OK: + iothub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK; + break; + case D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_CANNOT_PARSE: + case D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING: + iothub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR; + break; + case D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_TIMEOUT: + iothub_send_result = IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT; + break; + case D2C_EVENT_SEND_COMPLETE_RESULT_DEVICE_DESTROYED: + iothub_send_result = IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY; + break; + case D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_UNKNOWN: + default: + iothub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR; + break; + } + + return iothub_send_result; +} + +// @brief +// Callback function for device_send_event_async. +static void on_event_send_complete(IOTHUB_MESSAGE_LIST* message, D2C_EVENT_SEND_RESULT result, void* context) +{ + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context; + + if (result != D2C_EVENT_SEND_COMPLETE_RESULT_OK && result != D2C_EVENT_SEND_COMPLETE_RESULT_DEVICE_DESTROYED) + { + registered_device->number_of_send_event_complete_failures++; + } + else + { + registered_device->number_of_send_event_complete_failures = 0; + } + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_056: [If `message->callback` is not NULL, it shall invoked with the `iothub_send_result`] + if (message->callback != NULL) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_050: [If result is D2C_EVENT_SEND_COMPLETE_RESULT_OK, `iothub_send_result` shall be set using IOTHUB_CLIENT_CONFIRMATION_OK] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_051: [If result is D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_CANNOT_PARSE, `iothub_send_result` shall be set using IOTHUB_CLIENT_CONFIRMATION_ERROR] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_052: [If result is D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING, `iothub_send_result` shall be set using IOTHUB_CLIENT_CONFIRMATION_ERROR] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_053: [If result is D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_TIMEOUT, `iothub_send_result` shall be set using IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_054: [If result is D2C_EVENT_SEND_COMPLETE_RESULT_DEVICE_DESTROYED, `iothub_send_result` shall be set using IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_055: [If result is D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_UNKNOWN, `iothub_send_result` shall be set using IOTHUB_CLIENT_CONFIRMATION_ERROR] + IOTHUB_CLIENT_CONFIRMATION_RESULT iothub_send_result = get_iothub_client_confirmation_result_from(result); + + message->callback(iothub_send_result, message->context); + } + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_057: [`message->messageHandle` shall be destroyed using IoTHubMessage_Destroy] + IoTHubMessage_Destroy(message->messageHandle); + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_058: [`message` shall be destroyed using free] + free(message); +} + +// @brief +// Gets events from wait to send list and sends to service in the order they were added. +// @returns +// 0 if all events could be sent to the next layer successfully, non-zero otherwise. +static int send_pending_events(AMQP_TRANSPORT_DEVICE_INSTANCE* device_state) +{ + int result; + IOTHUB_MESSAGE_LIST* message; + + result = RESULT_OK; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_047: [If the registered device is started, each event on `registered_device->wait_to_send_list` shall be removed from the list and sent using device_send_event_async()] + while ((message = get_next_event_to_send(device_state)) != NULL) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_048: [device_send_event_async() shall be invoked passing `on_event_send_complete`] + if (device_send_event_async(device_state->device_handle, message, on_event_send_complete, device_state) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_049: [If device_send_event_async() fails, `on_event_send_complete` shall be invoked passing EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING and return] + LogError("Device '%s' failed to send message (device_send_event_async failed)", STRING_c_str(device_state->device_id)); + result = __FAILURE__; + + on_event_send_complete(message, D2C_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING, device_state); + break; + } + } + + return result; +} + +// @brief +// Auxiliary function for the public DoWork API, performing DoWork activities (authenticate, messaging) for a specific device. +// @requires +// The transport to have a valid instance of AMQP_CONNECTION (from which to obtain SESSION_HANDLE and CBS_HANDLE) +// @returns +// 0 if no errors occur, non-zero otherwise. +static int IoTHubTransport_AMQP_Common_Device_DoWork(AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device) +{ + int result; + + if (registered_device->device_state != DEVICE_STATE_STARTED) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_036: [If the device state is DEVICE_STATE_STOPPED, it shall be started] + if (registered_device->device_state == DEVICE_STATE_STOPPED) + { + SESSION_HANDLE session_handle; + CBS_HANDLE cbs_handle = NULL; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_039: [amqp_connection_get_session_handle() shall be invoked on `instance->connection`] + if (amqp_connection_get_session_handle(registered_device->transport_instance->amqp_connection, &session_handle) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_040: [If amqp_connection_get_session_handle() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return] + LogError("Failed performing DoWork for device '%s' (failed to get the amqp_connection session_handle)", STRING_c_str(registered_device->device_id)); + result = __FAILURE__; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_037: [If transport is using CBS authentication, amqp_connection_get_cbs_handle() shall be invoked on `instance->connection`] + else if (registered_device->transport_instance->preferred_authentication_mode == AMQP_TRANSPORT_AUTHENTICATION_MODE_CBS && + amqp_connection_get_cbs_handle(registered_device->transport_instance->amqp_connection, &cbs_handle) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_038: [If amqp_connection_get_cbs_handle() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and return] + LogError("Failed performing DoWork for device '%s' (failed to get the amqp_connection cbs_handle)", STRING_c_str(registered_device->device_id)); + result = __FAILURE__; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_041: [The device handle shall be started using device_start_async()] + else if (device_start_async(registered_device->device_handle, session_handle, cbs_handle) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_042: [If device_start_async() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and skip to the next registered device] + LogError("Failed performing DoWork for device '%s' (failed to start device)", STRING_c_str(registered_device->device_id)); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } + } + else if (registered_device->device_state == DEVICE_STATE_STARTING || + registered_device->device_state == DEVICE_STATE_STOPPING) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_043: [If the device handle is in state DEVICE_STATE_STARTING or DEVICE_STATE_STOPPING, it shall be checked for state change timeout] + bool is_timed_out; + if (is_timeout_reached(registered_device->time_of_last_state_change, registered_device->max_state_change_timeout_secs, &is_timed_out) != RESULT_OK) + { + LogError("Failed performing DoWork for device '%s' (failed tracking timeout of device %d state)", STRING_c_str(registered_device->device_id), registered_device->device_state); + registered_device->device_state = DEVICE_STATE_ERROR_AUTH; // if time could not be calculated, the worst must be assumed. + result = __FAILURE__; + } + else if (is_timed_out) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_044: [If the device times out in state DEVICE_STATE_STARTING or DEVICE_STATE_STOPPING, the registered device shall be marked with failure] + LogError("Failed performing DoWork for device '%s' (device failed to start or stop within expected timeout)", STRING_c_str(registered_device->device_id)); + registered_device->device_state = DEVICE_STATE_ERROR_AUTH; // this will cause device to be stopped bellow on the next call to this function. + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } + } + else // i.e., DEVICE_STATE_ERROR_AUTH || DEVICE_STATE_ERROR_AUTH_TIMEOUT || DEVICE_STATE_ERROR_MSG + { + LogError("Failed performing DoWork for device '%s' (device reported state %d; number of previous failures: %d)", + STRING_c_str(registered_device->device_id), registered_device->device_state, registered_device->number_of_previous_failures); + + registered_device->number_of_previous_failures++; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_046: [If the device has failed for MAX_NUMBER_OF_DEVICE_FAILURES in a row, it shall trigger a connection retry on the transport] + if (registered_device->number_of_previous_failures >= MAX_NUMBER_OF_DEVICE_FAILURES) + { + result = __FAILURE__; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_045: [If the registered device has a failure, it shall be stopped using device_stop()] + else if (device_stop(registered_device->device_handle) != RESULT_OK) + { + LogError("Failed to stop reset device '%s' (device_stop failed)", STRING_c_str(registered_device->device_id)); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } + } + } +#ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_031: [ Once the device is authenticated, `iothubtransportamqp_methods_subscribe` shall be invoked (subsequent DoWork calls shall not call it if already subscribed). ] + else if (registered_device->subscribe_methods_needed && + !registered_device->subscribed_for_methods && + subscribe_methods(registered_device) != RESULT_OK) + { + LogError("Failed performing DoWork for device '%s' (failed registering for device methods)", STRING_c_str(registered_device->device_id)); + registered_device->number_of_previous_failures++; + result = __FAILURE__; + } +#endif + else + { + if (send_pending_events(registered_device) != RESULT_OK) + { + LogError("Failed performing DoWork for device '%s' (failed sending pending events)", STRING_c_str(registered_device->device_id)); + registered_device->number_of_previous_failures++; + result = __FAILURE__; + } + else + { + registered_device->number_of_previous_failures = 0; + result = RESULT_OK; + } + } + + // No harm in invoking this as API will simply exit if the state is not "started". + device_do_work(registered_device->device_handle); + + return result; +} + + +//---------- SetOption-ish Helpers ----------// + +// @brief +// Gets all the device-specific options and replicates them into this new registered device. +// @returns +// 0 if the function succeeds, non-zero otherwise. +static int replicate_device_options_to(AMQP_TRANSPORT_DEVICE_INSTANCE* dev_instance, DEVICE_AUTH_MODE auth_mode) +{ + int result; + + if (device_set_option( + dev_instance->device_handle, + DEVICE_OPTION_EVENT_SEND_TIMEOUT_SECS, + &dev_instance->transport_instance->option_send_event_timeout_secs) != RESULT_OK) + { + LogError("Failed to apply option DEVICE_OPTION_EVENT_SEND_TIMEOUT_SECS to device '%s' (device_set_option failed)", STRING_c_str(dev_instance->device_id)); + result = __FAILURE__; + } + else if (auth_mode == DEVICE_AUTH_MODE_CBS) + { + if (device_set_option( + dev_instance->device_handle, + DEVICE_OPTION_CBS_REQUEST_TIMEOUT_SECS, + &dev_instance->transport_instance->option_cbs_request_timeout_secs) != RESULT_OK) + { + LogError("Failed to apply option DEVICE_OPTION_CBS_REQUEST_TIMEOUT_SECS to device '%s' (device_set_option failed)", STRING_c_str(dev_instance->device_id)); + result = __FAILURE__; + } + else if (device_set_option( + dev_instance->device_handle, + DEVICE_OPTION_SAS_TOKEN_LIFETIME_SECS, + &dev_instance->transport_instance->option_sas_token_lifetime_secs) != RESULT_OK) + { + LogError("Failed to apply option DEVICE_OPTION_SAS_TOKEN_LIFETIME_SECS to device '%s' (device_set_option failed)", STRING_c_str(dev_instance->device_id)); + result = __FAILURE__; + } + else if (device_set_option( + dev_instance->device_handle, + DEVICE_OPTION_SAS_TOKEN_REFRESH_TIME_SECS, + &dev_instance->transport_instance->option_sas_token_refresh_time_secs) != RESULT_OK) + { + LogError("Failed to apply option DEVICE_OPTION_SAS_TOKEN_REFRESH_TIME_SECS to device '%s' (device_set_option failed)", STRING_c_str(dev_instance->device_id)); + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } + } + else + { + result = RESULT_OK; + } + + return result; +} + +// @brief +// Translates from the option names supported by iothubtransport_amqp_common to the ones supported by iothubtransport_amqp_device. +static const char* get_device_option_name_from(const char* iothubclient_option_name) +{ + const char* device_option_name; + + if (strcmp(OPTION_SAS_TOKEN_LIFETIME, iothubclient_option_name) == 0) + { + device_option_name = DEVICE_OPTION_SAS_TOKEN_LIFETIME_SECS; + } + else if (strcmp(OPTION_SAS_TOKEN_REFRESH_TIME, iothubclient_option_name) == 0) + { + device_option_name = DEVICE_OPTION_SAS_TOKEN_REFRESH_TIME_SECS; + } + else if (strcmp(OPTION_CBS_REQUEST_TIMEOUT, iothubclient_option_name) == 0) + { + device_option_name = DEVICE_OPTION_CBS_REQUEST_TIMEOUT_SECS; + } + else if (strcmp(OPTION_EVENT_SEND_TIMEOUT_SECS, iothubclient_option_name) == 0) + { + device_option_name = DEVICE_OPTION_EVENT_SEND_TIMEOUT_SECS; + } + else + { + device_option_name = NULL; + } + + return device_option_name; +} + +// @brief +// Auxiliary function invoked by IoTHubTransport_AMQP_Common_SetOption to set an option on every registered device. +// @returns +// 0 if it succeeds, non-zero otherwise. +static int IoTHubTransport_AMQP_Common_Device_SetOption(TRANSPORT_LL_HANDLE handle, const char* option, void* value) +{ + int result; + const char* device_option; + + if ((device_option = get_device_option_name_from(option)) == NULL) + { + LogError("failed setting option '%s' to registered device (could not match name to options supported by device)", option); + result = __FAILURE__; + } + else + { + AMQP_TRANSPORT_INSTANCE* instance = (AMQP_TRANSPORT_INSTANCE*)handle; + result = RESULT_OK; + + LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(instance->registered_devices); + + while (list_item != NULL) + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device; + + if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL) + { + LogError("failed setting option '%s' to registered device (singlylinkedlist_item_get_value failed)", option); + result = __FAILURE__; + break; + } + else if (device_set_option(registered_device->device_handle, device_option, value) != RESULT_OK) + { + LogError("failed setting option '%s' to registered device '%s' (device_set_option failed)", + option, STRING_c_str(registered_device->device_id)); + result = __FAILURE__; + break; + } + + list_item = singlylinkedlist_get_next_item(list_item); + } + } + + return result; +} + +static void internal_destroy_instance(AMQP_TRANSPORT_INSTANCE* instance) +{ + if (instance != NULL) + { + if (instance->registered_devices != NULL) + { + LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(instance->registered_devices); + + while (list_item != NULL) + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item); + list_item = singlylinkedlist_get_next_item(list_item); + IoTHubTransport_AMQP_Common_Unregister(registered_device); + } + + singlylinkedlist_destroy(instance->registered_devices); + } + + if (instance->amqp_connection != NULL) + { + amqp_connection_destroy(instance->amqp_connection); + } + + destroy_underlying_io_transport(instance); + destroy_underlying_io_transport_options(instance); + + STRING_delete(instance->iothub_host_fqdn); + + free(instance); + } +} + + +// ---------- API functions ---------- // TRANSPORT_LL_HANDLE IoTHubTransport_AMQP_Common_Create(const IOTHUBTRANSPORT_CONFIG* config, AMQP_GET_IO_TRANSPORT get_io_transport) { - AMQP_TRANSPORT_INSTANCE* transport_state = NULL; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_005: [If parameter config or config->upperConfig are NULL then IoTHubTransport_AMQP_Common_Create shall fail and return NULL.] - if (config == NULL || config->upperConfig == NULL) - { - LogError("IoTHub AMQP client transport null configuration parameter."); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_006: [IoTHubTransport_AMQP_Common_Create shall fail and return NULL if any fields of the config->upperConfig structure are NULL.] - else if (config->upperConfig->protocol == NULL) - { - LogError("Invalid configuration (NULL protocol detected)"); - } - else if (config->upperConfig->iotHubName == NULL) - { - LogError("Invalid configuration (NULL iotHubName detected)"); - } - else if (config->upperConfig->iotHubSuffix == NULL) - { - LogError("Invalid configuration (NULL iotHubSuffix detected)"); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_134: [IoTHubTransport_AMQP_Common_Create shall fail and return NULL if the combined length of config->iotHubName and config->iotHubSuffix exceeds 254 bytes (RFC1035)] - else if ((strlen(config->upperConfig->iotHubName) + strlen(config->upperConfig->iotHubSuffix)) > (RFC1035_MAX_FQDN_LENGTH - 1)) - { - LogError("The lengths of iotHubName and iotHubSuffix together exceed the maximum FQDN length allowed (RFC 1035)"); - } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_254: [If parameter `get_io_transport` is NULL then IoTHubTransport_AMQP_Common_Create shall fail and return NULL.] - else if (get_io_transport == NULL) - { - LogError("Invalid configuration (get_io_transport is NULL)"); - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_009: [IoTHubTransport_AMQP_Common_Create shall fail and return NULL if memory allocation of the transport's internal state structure fails.] - if ((transport_state = (AMQP_TRANSPORT_INSTANCE*)malloc(sizeof(AMQP_TRANSPORT_INSTANCE))) == NULL) - { - LogError("Could not allocate AMQP transport state"); - } - else - { - bool cleanup_required = false; + TRANSPORT_LL_HANDLE result; - transport_state->iotHubHostFqdn = NULL; - transport_state->connection = NULL; - transport_state->connection_state = AMQP_MANAGEMENT_STATE_IDLE; - transport_state->session = NULL; - transport_state->tls_io = NULL; - transport_state->underlying_io_transport_provider = get_io_transport; - transport_state->is_trace_on = false; - - transport_state->cbs_connection.cbs_handle = NULL; - transport_state->cbs_connection.sasl_io = NULL; - transport_state->cbs_connection.sasl_mechanism = NULL; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_020: [IoTHubTransport_AMQP_Common_Create shall set parameter device_state->sas_token_lifetime with the default value of 3600000 (milliseconds).] - transport_state->cbs_connection.sas_token_lifetime = DEFAULT_SAS_TOKEN_LIFETIME_MS; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_128: [IoTHubTransport_AMQP_Common_Create shall set parameter device_state->sas_token_refresh_time with the default value of sas_token_lifetime/2 (milliseconds).] - transport_state->cbs_connection.sas_token_refresh_time = transport_state->cbs_connection.sas_token_lifetime / 2; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_129 : [IoTHubTransport_AMQP_Common_Create shall set parameter device_state->cbs_request_timeout with the default value of 30000 (milliseconds).] - transport_state->cbs_connection.cbs_request_timeout = DEFAULT_CBS_REQUEST_TIMEOUT_MS; - - transport_state->preferred_credential_type = CREDENTIAL_NOT_BUILD; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_001: [If `config` or `config->upperConfig` or `get_io_transport` are NULL then IoTHubTransport_AMQP_Common_Create shall fail and return NULL.] + if (config == NULL || config->upperConfig == NULL || get_io_transport == NULL) + { + LogError("IoTHub AMQP client transport null configuration parameter (config=%p, get_io_transport=%p).", config, get_io_transport); + result = NULL; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_002: [IoTHubTransport_AMQP_Common_Create shall fail and return NULL if `config->upperConfig->protocol` is NULL] + else if (config->upperConfig->protocol == NULL) + { + LogError("Failed to create the AMQP transport common instance (NULL parameter received: protocol=%p, iotHubName=%p, iotHubSuffix=%p)", + config->upperConfig->protocol, config->upperConfig->iotHubName, config->upperConfig->iotHubSuffix); + result = NULL; + } + else + { + AMQP_TRANSPORT_INSTANCE* instance; - transport_state->xioOptions = NULL; - transport_state->link_count = 0; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_010: [If config->upperConfig->protocolGatewayHostName is NULL, IoTHubTransport_AMQP_Common_Create shall create an immutable string, referred to as iotHubHostFqdn, from the following pieces: config->iotHubName + "." + config->iotHubSuffix.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_20_001: [If config->upperConfig->protocolGatewayHostName is not NULL, IoTHubTransport_AMQP_Common_Create shall use it as iotHubHostFqdn] - if ((transport_state->iotHubHostFqdn = (config->upperConfig->protocolGatewayHostName != NULL ? STRING_construct(config->upperConfig->protocolGatewayHostName) : concat3Params(config->upperConfig->iotHubName, ".", config->upperConfig->iotHubSuffix))) == NULL) - { - LogError("Failed to set transport_state->iotHubHostFqdn."); - cleanup_required = true; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_218: [IoTHubTransport_AMQP_Common_Create shall initialize the transport state registered device list with a VECTOR instance.] - else if ((transport_state->registered_devices = VECTOR_create(sizeof(IOTHUB_DEVICE_HANDLE))) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_219: [If VECTOR_create fails, IoTHubTransport_AMQP_Common_Create shall fail and return.] - LogError("Failed to initialize the internal list of registered devices"); - cleanup_required = true; - } - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_236: [If IoTHubTransport_AMQP_Common_Create fails it shall free any memory it allocated (iotHubHostFqdn, transport state).] - if (cleanup_required) - { - if (transport_state->iotHubHostFqdn != NULL) - STRING_delete(transport_state->iotHubHostFqdn); + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_003: [Memory shall be allocated for the transport's internal state structure (`instance`)] + if ((instance = (AMQP_TRANSPORT_INSTANCE*)malloc(sizeof(AMQP_TRANSPORT_INSTANCE))) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_004: [If malloc() fails, IoTHubTransport_AMQP_Common_Create shall fail and return NULL] + LogError("Could not allocate AMQP transport state (malloc failed)"); + result = NULL; + } + else + { + memset(instance, 0, sizeof(AMQP_TRANSPORT_INSTANCE)); - free(transport_state); - transport_state = NULL; - } - } - } - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_023: [If IoTHubTransport_AMQP_Common_Create succeeds it shall return a non-NULL pointer to the structure that represents the transport.] - return transport_state; -} - -static RESULT device_DoWork(AMQP_TRANSPORT_DEVICE_STATE* device_state) -{ - RESULT result = RESULT_SUCCESS; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_243: [IoTHubTransport_AMQP_Common_DoWork shall retrieve the authenticatication status of the device using deviceauthentication_get_status()] - AUTHENTICATION_STATUS auth_status = authentication_get_status(device_state->authentication); - - switch (auth_status) - { - case AUTHENTICATION_STATUS_IDLE: - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_243: [If the device authentication status is AUTHENTICATION_STATUS_IDLE, IoTHubTransport_AMQP_Common_DoWork shall authenticate it using authentication_authenticate()] - if (authentication_authenticate(device_state->authentication) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_146: [If authentication_authenticate() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and process the next device] - LogError("Failed authenticating AMQP connection [%s]", STRING_c_str(device_state->deviceId)); - result = RESULT_RETRYABLE_ERROR; - } - break; - case AUTHENTICATION_STATUS_REFRESH_REQUIRED: - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_081: [If the device authentication status is AUTHENTICATION_STATUS_REFRESH_REQUIRED, IoTHubTransport_AMQP_Common_DoWork shall refresh it using authentication_refresh()] - if (authentication_refresh(device_state->authentication) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_082: [**If authentication_refresh() fails, IoTHubTransport_AMQP_Common_DoWork shall fail and process the next device] - LogError("AMQP transport failed to refresh authentication [%s]", STRING_c_str(device_state->deviceId)); - result = RESULT_RETRYABLE_ERROR; - } - break; - case AUTHENTICATION_STATUS_OK: -#ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ - /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_031: [ `iothubtransportamqp_methods_subscribe` shall only be called once (subsequent DoWork calls shall not call it if already subscribed). ]*/ - if ((device_state->subscribe_methods_needed) && - (subscribe_methods(device_state) != 0)) - { - LogError("Failed subscribing for methods"); - } -#endif + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_005: [If `config->upperConfig->protocolGatewayHostName` is NULL, `instance->iothub_target_fqdn` shall be set as `config->upperConfig->iotHubName` + "." + `config->upperConfig->iotHubSuffix`] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_006: [If `config->upperConfig->protocolGatewayHostName` is not NULL, `instance->iothub_target_fqdn` shall be set with a copy of it] + if ((instance->iothub_host_fqdn = get_target_iothub_fqdn(config)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_007: [If `instance->iothub_target_fqdn` fails to be set, IoTHubTransport_AMQP_Common_Create shall fail and return NULL] + LogError("Failed to obtain the iothub target fqdn."); + result = NULL; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_008: [`instance->registered_devices` shall be set using singlylinkedlist_create()] + else if ((instance->registered_devices = singlylinkedlist_create()) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_009: [If singlylinkedlist_create() fails, IoTHubTransport_AMQP_Common_Create shall fail and return NULL] + LogError("Failed to initialize the internal list of registered devices (singlylinkedlist_create failed)"); + result = NULL; + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_010: [`get_io_transport` shall be saved on `instance->underlying_io_transport_provider`] + instance->underlying_io_transport_provider = get_io_transport; + instance->preferred_authentication_mode = AMQP_TRANSPORT_AUTHENTICATION_MODE_NOT_SET; + instance->option_sas_token_lifetime_secs = DEFAULT_SAS_TOKEN_LIFETIME_SECS; + instance->option_sas_token_refresh_time_secs = DEFAULT_SAS_TOKEN_REFRESH_TIME_SECS; + instance->option_cbs_request_timeout_secs = DEFAULT_CBS_REQUEST_TIMEOUT_SECS; + instance->option_send_event_timeout_secs = DEFAULT_EVENT_SEND_TIMEOUT_SECS; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_012: [If IoTHubTransport_AMQP_Common_Create succeeds it shall return a pointer to `instance`.] + result = (TRANSPORT_LL_HANDLE)instance; + } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_145: [If the device authentication status is AUTHENTICATION_STATUS_OK, IoTHubTransport_AMQP_Common_DoWork shall proceed to sending events, registering for messages] - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_121: [IoTHubTransport_AMQP_Common_DoWork shall create an AMQP message_receiver if transport_state->message_receive is NULL and transport_state->receive_messages is true] - if (device_state->receive_messages == true && - device_state->message_receiver == NULL && - createMessageReceiver(device_state) != RESULT_OK) - { - LogError("Failed creating AMQP transport message receiver [%s]", STRING_c_str(device_state->deviceId)); - result = RESULT_CRITICAL_ERROR; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_122: [IoTHubTransport_AMQP_Common_DoWork shall destroy the transport_state->message_receiver (and set it to NULL) if it exists and transport_state->receive_messages is false] - else if (device_state->receive_messages == false && - device_state->message_receiver != NULL && - destroyMessageReceiver(device_state) != RESULT_OK) - { - LogError("Failed destroying AMQP transport message receiver [%s]", STRING_c_str(device_state->deviceId)); - } - - if (device_state->message_sender == NULL && - createEventSender(device_state) != RESULT_OK) - { - LogError("Failed creating AMQP transport event sender [%s]", STRING_c_str(device_state->deviceId)); - result = RESULT_CRITICAL_ERROR; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_245: [IoTHubTransport_AMQP_Common_DoWork shall skip sending events if the state of the message_sender is not MESSAGE_SENDER_STATE_OPEN] - else if (device_state->message_sender_state == MESSAGE_SENDER_STATE_OPEN && - sendPendingEvents(device_state) != RESULT_OK) - { - LogError("AMQP transport failed sending events [%s]", STRING_c_str(device_state->deviceId)); - result = RESULT_CRITICAL_ERROR; - } - break; - case AUTHENTICATION_STATUS_FAILURE: - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_083: [If the device authentication status is AUTHENTICATION_STATUS_FAILURE, IoTHubTransport_AMQP_Common_DoWork shall fail and process the next device] - LogError("Authentication failed [%s]", STRING_c_str(device_state->deviceId)); - result = RESULT_CRITICAL_ERROR; - break; - case AUTHENTICATION_STATUS_TIMEOUT: - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_084: [If the device authentication status is AUTHENTICATION_STATUS_TIMEOUT, IoTHubTransport_AMQP_Common_DoWork shall fail and process the next device] - LogError("Authentication timed-out [%s]", STRING_c_str(device_state->deviceId)); - result = RESULT_CRITICAL_ERROR; - break; - default: - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_244: [If the device authentication status is AUTHENTICATION_STATUS_IN_PROGRESS, IoTHubTransport_AMQP_Common_DoWork shall skip and process the next device] - break; - } + if (result == NULL) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_011: [If IoTHubTransport_AMQP_Common_Create fails it shall free any memory it allocated] + internal_destroy_instance(instance); + } + } + } return result; } @@ -1297,60 +1211,75 @@ void IoTHubTransport_AMQP_Common_DoWork(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle) { - (void)iotHubClientHandle; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_051: [IoTHubTransport_AMQP_Common_DoWork shall fail and return immediately if the transport handle parameter is NULL] + (void)iotHubClientHandle; // unused as of now. + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_016: [If `handle` is NULL, IoTHubTransport_AMQP_Common_DoWork shall return without doing any work] if (handle == NULL) { LogError("IoTHubClient DoWork failed: transport handle parameter is NULL."); } else { - bool trigger_connection_retry = false; - AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; - size_t number_of_registered_devices = VECTOR_size(transport_state->registered_devices); + AMQP_TRANSPORT_INSTANCE* transport_instance = (AMQP_TRANSPORT_INSTANCE*)handle; + LIST_ITEM_HANDLE list_item; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_017: [If `instance->is_connection_retry_required` is true, IoTHubTransport_AMQP_Common_DoWork shall trigger the connection-retry logic and return] + if (transport_instance->is_connection_retry_required) + { + LogError("An error occured on AMQP connection. The connection will be restablished."); + + prepare_for_connection_retry(transport_instance); + + transport_instance->is_connection_retry_required = false; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_018: [If there are no devices registered on the transport, IoTHubTransport_AMQP_Common_DoWork shall skip do_work for devices] + else if ((list_item = singlylinkedlist_get_head_item(transport_instance->registered_devices)) != NULL) + { + // We need to check if there are devices, otherwise the amqp_connection won't be able to be created since + // there is not a preferred authentication mode set yet on the transport. - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_237: [IoTHubTransport_AMQP_Common_DoWork shall return immediately if there are no devices registered on the transport] - if (number_of_registered_devices > 0) + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_019: [If `instance->amqp_connection` is NULL, it shall be established] + if (transport_instance->amqp_connection == NULL && establish_amqp_connection(transport_instance) != RESULT_OK) + { + LogError("AMQP transport failed to establish connection with service."); + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_020: [If the amqp_connection is OPENED, the transport shall iterate through each registered device and perform a device-specific do_work on each] + else if (transport_instance->amqp_connection_state == AMQP_CONNECTION_STATE_OPENED) + { + while (list_item != NULL) + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device; + + if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL) + { + LogError("Transport had an unexpected failure during DoWork (failed to fetch a registered_devices list item value)"); + } + else if (registered_device->number_of_send_event_complete_failures >= MAX_NUMBER_OF_DEVICE_FAILURES) + { + LogError("Device '%s' reported a critical failure (events completed sending with failures); connection retry will be triggered.", STRING_c_str(registered_device->device_id)); + + transport_instance->is_connection_retry_required = true; + } + else if (IoTHubTransport_AMQP_Common_Device_DoWork(registered_device) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_021: [If DoWork fails for the registered device for more than MAX_NUMBER_OF_DEVICE_FAILURES, connection retry shall be triggered] + if (registered_device->number_of_previous_failures >= MAX_NUMBER_OF_DEVICE_FAILURES) + { + LogError("Device '%s' reported a critical failure; connection retry will be triggered.", STRING_c_str(registered_device->device_id)); + + transport_instance->is_connection_retry_required = true; + } + } + + list_item = singlylinkedlist_get_next_item(list_item); + } + } + } + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_022: [If `instance->amqp_connection` is not NULL, amqp_connection_do_work shall be invoked] + if (transport_instance->amqp_connection != NULL) { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_238: [If the transport state has a faulty connection state, IoTHubTransport_AMQP_Common_DoWork shall trigger the connection-retry logic] - if (transport_state->connection != NULL && - transport_state->connection_state == AMQP_MANAGEMENT_STATE_ERROR) - { - LogError("An error occured on AMQP connection. The connection will be restablished."); - trigger_connection_retry = true; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_055: [If the transport handle has a NULL connection, IoTHubTransport_AMQP_Common_DoWork shall instantiate and initialize the AMQP components and establish the connection] - else if (transport_state->connection == NULL && - establishConnection(transport_state) != RESULT_OK) - { - LogError("AMQP transport failed to establish connection with service."); - trigger_connection_retry = true; - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_241: [IoTHubTransport_AMQP_Common_DoWork shall iterate through all its registered devices to process authentication, events to be sent, messages to be received] - for (size_t i = 0; i < number_of_registered_devices; i++) - { - AMQP_TRANSPORT_DEVICE_STATE* device_state = *(AMQP_TRANSPORT_DEVICE_STATE**)VECTOR_element(transport_state->registered_devices, i); - - RESULT actionable_result = device_DoWork(device_state); - - if (actionable_result == RESULT_CRITICAL_ERROR) - { - trigger_connection_retry = true; - } - } - } - - if (trigger_connection_retry) - { - prepareForConnectionRetry(transport_state); - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_103: [IoTHubTransport_AMQP_Common_DoWork shall invoke connection_dowork() on AMQP for triggering sending and receiving messages] - connection_dowork(transport_state->connection); - } + amqp_connection_do_work(transport_instance->amqp_connection); } } } @@ -1359,7 +1288,7 @@ { int result; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_037: [IoTHubTransport_AMQP_Common_Subscribe shall fail if the transport handle parameter received is NULL.] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_084: [If `handle` is NULL, IoTHubTransport_AMQP_Common_Subscribe shall return a non-zero result] if (handle == NULL) { LogError("Invalid handle to IoTHubClient AMQP transport device handle."); @@ -1367,10 +1296,26 @@ } else { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_038: [IoTHubTransport_AMQP_Common_Subscribe shall set transport_handle->receive_messages to true and return success code.] - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)handle; - device_state->receive_messages = true; - result = 0; + AMQP_TRANSPORT_DEVICE_INSTANCE* amqp_device_instance = (AMQP_TRANSPORT_DEVICE_INSTANCE*)handle; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_085: [If `amqp_device_instance` is not registered, IoTHubTransport_AMQP_Common_Subscribe shall return a non-zero result] + if (!is_device_registered(amqp_device_instance)) + { + LogError("Device '%s' failed subscribing to cloud-to-device messages (device is not registered)", STRING_c_str(amqp_device_instance->device_id)); + result = __FAILURE__; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_086: [device_subscribe_message() shall be invoked passing `on_message_received_callback`] + else if (device_subscribe_message(amqp_device_instance->device_handle, on_message_received, amqp_device_instance) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_087: [If device_subscribe_message() fails, IoTHubTransport_AMQP_Common_Subscribe shall return a non-zero result] + LogError("Device '%s' failed subscribing to cloud-to-device messages (device_subscribe_message failed)", STRING_c_str(amqp_device_instance->device_id)); + result = __FAILURE__; + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_088: [If no failures occur, IoTHubTransport_AMQP_Common_Subscribe shall return 0] + result = RESULT_OK; + } } return result; @@ -1378,16 +1323,25 @@ void IoTHubTransport_AMQP_Common_Unsubscribe(IOTHUB_DEVICE_HANDLE handle) { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_039: [IoTHubTransport_AMQP_Common_Unsubscribe shall fail if the transport handle parameter received is NULL.] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_093: [If `handle` is NULL, IoTHubTransport_AMQP_Common_Subscribe shall return] if (handle == NULL) { LogError("Invalid handle to IoTHubClient AMQP transport device handle."); } else { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_040: [IoTHubTransport_AMQP_Common_Unsubscribe shall set transport_handle->receive_messages to false.] - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)handle; - device_state->receive_messages = false; + AMQP_TRANSPORT_DEVICE_INSTANCE* amqp_device_instance = (AMQP_TRANSPORT_DEVICE_INSTANCE*)handle; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_094: [If `amqp_device_instance` is not registered, IoTHubTransport_AMQP_Common_Subscribe shall return] + if (!is_device_registered(amqp_device_instance)) + { + LogError("Device '%s' failed unsubscribing to cloud-to-device messages (device is not registered)", STRING_c_str(amqp_device_instance->device_id)); + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_095: [device_unsubscribe_message() shall be invoked passing `amqp_device_instance->device_handle`] + else if (device_unsubscribe_message(amqp_device_instance->device_handle) != RESULT_OK) + { + LogError("Device '%s' failed unsubscribing to cloud-to-device messages (device_unsubscribe_message failed)", STRING_c_str(amqp_device_instance->device_id)); + } } } @@ -1420,7 +1374,7 @@ else { #ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)handle; + AMQP_TRANSPORT_DEVICE_INSTANCE* device_state = (AMQP_TRANSPORT_DEVICE_INSTANCE*)handle; /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_026: [ `IoTHubTransport_AMQP_Common_Subscribe_DeviceMethod` shall remember that a subscribe is to be performed in the next call to DoWork and on success it shall return 0. ]*/ /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_005: [ If the transport is already subscribed to receive C2D method requests, `IoTHubTransport_AMQP_Common_Subscribe_DeviceMethod` shall perform no additional action and return 0. ]*/ device_state->subscribe_methods_needed = true; @@ -1445,14 +1399,15 @@ else { #ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)handle; + AMQP_TRANSPORT_DEVICE_INSTANCE* device_state = (AMQP_TRANSPORT_DEVICE_INSTANCE*)handle; /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_008: [ If the transport is not subscribed to receive C2D method requests then `IoTHubTransport_AMQP_Common_Unsubscribe_DeviceMethod` shall do nothing. ]*/ if (device_state->subscribe_methods_needed) { /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_007: [ `IoTHubTransport_AMQP_Common_Unsubscribe_DeviceMethod` shall unsubscribe from receiving C2D method requests by calling `iothubtransportamqp_methods_unsubscribe`. ]*/ - device_state->subscribed_for_methods = false; - iothubtransportamqp_methods_unsubscribe(device_state->methods_handle); + device_state->subscribed_for_methods = false; + device_state->subscribe_methods_needed = false; + iothubtransportamqp_methods_unsubscribe(device_state->methods_handle); } #else LogError("Not implemented"); @@ -1467,7 +1422,7 @@ (void)status_response; (void)methodId; int result; - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)handle; + AMQP_TRANSPORT_DEVICE_INSTANCE* device_state = (AMQP_TRANSPORT_DEVICE_INSTANCE*)handle; if (device_state != NULL) { #ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ @@ -1499,34 +1454,40 @@ { IOTHUB_CLIENT_RESULT result; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_041: [IoTHubTransport_AMQP_Common_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] - if (handle == NULL) + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_096: [If `handle` or `iotHubClientStatus` are NULL, IoTHubTransport_AMQP_Common_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG] + if (handle == NULL || iotHubClientStatus == NULL) { result = IOTHUB_CLIENT_INVALID_ARG; - LogError("Invalid handle to IoTHubClient AMQP transport instance."); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_041: [IoTHubTransport_AMQP_Common_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] - else if (iotHubClientStatus == NULL) - { - result = IOTHUB_CLIENT_INVALID_ARG; - LogError("Invalid pointer to output parameter IOTHUB_CLIENT_STATUS."); + LogError("Failed retrieving the device send status (either handle (%p) or iotHubClientStatus (%p) are NULL)", handle, iotHubClientStatus); } else { - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)handle; + AMQP_TRANSPORT_DEVICE_INSTANCE* amqp_device_state = (AMQP_TRANSPORT_DEVICE_INSTANCE*)handle; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_043: [IoTHubTransport_AMQP_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent.] - if (!DList_IsListEmpty(device_state->waitingToSend) || !DList_IsListEmpty(&(device_state->inProgress))) - { - *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_042: [IoTHubTransport_AMQP_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent.] - else - { - *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE; - } + DEVICE_SEND_STATUS device_send_status; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_097: [IoTHubTransport_AMQP_Common_GetSendStatus shall invoke device_get_send_status()] + if (device_get_send_status(amqp_device_state->device_handle, &device_send_status) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_098: [If device_get_send_status() fails, IoTHubTransport_AMQP_Common_GetSendStatus shall return IOTHUB_CLIENT_ERROR] + LogError("Failed retrieving the device send status (device_get_send_status failed)"); + result = IOTHUB_CLIENT_ERROR; + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_099: [If device_get_send_status() returns DEVICE_SEND_STATUS_BUSY, IoTHubTransport_AMQP_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY] + if (device_send_status == DEVICE_SEND_STATUS_BUSY) + { + *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_100: [If device_get_send_status() returns DEVICE_SEND_STATUS_IDLE, IoTHubTransport_AMQP_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE] + else // DEVICE_SEND_STATUS_IDLE + { + *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE; + } - result = IOTHUB_CLIENT_OK; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_109: [If no failures occur, IoTHubTransport_AMQP_Common_GetSendStatus shall return IOTHUB_CLIENT_OK] + result = IOTHUB_CLIENT_OK; + } } return result; @@ -1536,137 +1497,122 @@ { IOTHUB_CLIENT_RESULT result; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_044: [If handle parameter is NULL then IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_045: [If parameter optionName is NULL then IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_046: [If parameter value is NULL then IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] - if ( - (handle == NULL) || - (option == NULL) || - (value == NULL) - ) + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_101: [If `handle`, `option` or `value` are NULL then IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] + if ((handle == NULL) || (option == NULL) || (value == NULL)) { - result = IOTHUB_CLIENT_INVALID_ARG; - LogError("Invalid parameter (NULL) passed to AMQP transport SetOption()"); + LogError("Invalid parameter (NULL) passed to AMQP transport SetOption (handle=%p, options=%p, value=%p)", handle, option, value); + result = IOTHUB_CLIENT_INVALID_ARG; } else { - AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] - if (strcmp(OPTION_SAS_TOKEN_LIFETIME, option) == 0) - { - transport_state->cbs_connection.sas_token_lifetime = *((size_t*)value); - result = IOTHUB_CLIENT_OK; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK] - else if (strcmp(OPTION_SAS_TOKEN_REFRESH_TIME, option) == 0) - { - transport_state->cbs_connection.sas_token_refresh_time = *((size_t*)value); - result = IOTHUB_CLIENT_OK; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK] - else if (strcmp(OPTION_CBS_REQUEST_TIMEOUT, option) == 0) - { - transport_state->cbs_connection.cbs_request_timeout = *((size_t*)value); - result = IOTHUB_CLIENT_OK; - } - else if (strcmp(OPTION_LOG_TRACE, option) == 0) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_198: [If `optionName` is `logtrace`, IoTHubTransport_AMQP_Common_SetOption shall save the value on the transport instance.] - transport_state->is_trace_on = *((bool*)value); + AMQP_TRANSPORT_INSTANCE* transport_instance = (AMQP_TRANSPORT_INSTANCE*)handle; + bool is_device_specific_option; - if (transport_state->connection != NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_202: [If `optionName` is `logtrace`, IoTHubTransport_AMQP_Common_SetOption shall apply it using connection_set_trace() to current connection instance if it exists and return IOTHUB_CLIENT_OK.] - connection_set_trace(transport_state->connection, transport_state->is_trace_on); - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_102: [If `option` is a device-specific option, it shall be saved and applied to each registered device using device_set_option()] + if (strcmp(OPTION_SAS_TOKEN_LIFETIME, option) == 0) + { + is_device_specific_option = true; + transport_instance->option_sas_token_lifetime_secs = *(size_t*)value; + } + else if (strcmp(OPTION_SAS_TOKEN_REFRESH_TIME, option) == 0) + { + is_device_specific_option = true; + transport_instance->option_sas_token_refresh_time_secs = *(size_t*)value; + } + else if (strcmp(OPTION_CBS_REQUEST_TIMEOUT, option) == 0) + { + is_device_specific_option = true; + transport_instance->option_cbs_request_timeout_secs = *(size_t*)value; + } + else if (strcmp(OPTION_EVENT_SEND_TIMEOUT_SECS, option) == 0) + { + is_device_specific_option = true; + transport_instance->option_send_event_timeout_secs = *(size_t*)value; + } + else + { + is_device_specific_option = false; + } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_203: [If `optionName` is `logtrace`, IoTHubTransport_AMQP_Common_SetOption shall apply it using xio_setoption() to current SASL IO instance if it exists.] - if (transport_state->cbs_connection.sasl_io != NULL && - xio_setoption(transport_state->cbs_connection.sasl_io, OPTION_LOG_TRACE, &transport_state->is_trace_on) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_204: [If xio_setoption() fails, IoTHubTransport_AMQP_Common_SetOption shall fail and return IOTHUB_CLIENT_ERROR.] - LogError("IoTHubTransport_AMQP_Common_SetOption failed (xio_setoption failed to set logging on SASL IO)"); - result = IOTHUB_CLIENT_ERROR; - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_205: [If xio_setoption() succeeds, IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_OK.] - result = IOTHUB_CLIENT_OK; - } - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_047: [If the option name does not match one of the options handled by this module, IoTHubTransport_AMQP_Common_SetOption shall pass the value and name to the XIO using xio_setoption().] - else - { - result = IOTHUB_CLIENT_OK; + if (is_device_specific_option) + { + if (IoTHubTransport_AMQP_Common_Device_SetOption(handle, option, (void*)value) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_103: [If device_set_option() fails, IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_ERROR] + LogError("transport failed setting option '%s' (failed setting option on one or more registered devices)", option); + result = IOTHUB_CLIENT_ERROR; + } + else + { + result = IOTHUB_CLIENT_OK; + } + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_104: [If `option` is `logtrace`, `value` shall be saved and applied to `instance->connection` using amqp_connection_set_logging()] + else if (strcmp(OPTION_LOG_TRACE, option) == 0) + { + transport_instance->is_trace_on = *((bool*)value); - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_007: [ If optionName is x509certificate and the authentication method is not x509 then IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ] - if (strcmp(OPTION_X509_CERT, option) == 0) - { - if (transport_state->preferred_credential_type == CREDENTIAL_NOT_BUILD) - { - transport_state->preferred_credential_type = X509; - } - else if (transport_state->preferred_credential_type != X509) - { - LogError("x509certificate specified, but authentication method is not x509"); - result = IOTHUB_CLIENT_INVALID_ARG; - } - } - /*Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_008: [ If optionName is x509privatekey and the authentication method is not x509 then IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ]*/ - else if (strcmp(OPTION_X509_PRIVATE_KEY, option) == 0) - { - if (transport_state->preferred_credential_type == CREDENTIAL_NOT_BUILD) - { - transport_state->preferred_credential_type = X509; - } - else if (transport_state->preferred_credential_type != X509) - { - LogError("x509privatekey specified, but authentication method is not x509"); - result = IOTHUB_CLIENT_INVALID_ARG; - } - } + if (transport_instance->amqp_connection != NULL && + amqp_connection_set_logging(transport_instance->amqp_connection, transport_instance->is_trace_on) != RESULT_OK) + { + LogError("transport failed setting option '%s' (amqp_connection_set_logging failed)", option); + result = IOTHUB_CLIENT_ERROR; + } + else + { + result = IOTHUB_CLIENT_OK; + } + } + else + { + result = IOTHUB_CLIENT_OK; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_007: [ If `option` is `x509certificate` and the transport preferred authentication method is not x509 then IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_008: [ If `option` is `x509privatekey` and the transport preferred authentication method is not x509 then IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG. ] + if (strcmp(OPTION_X509_CERT, option) == 0 || strcmp(OPTION_X509_PRIVATE_KEY, option) == 0) + { + if (transport_instance->preferred_authentication_mode == AMQP_TRANSPORT_AUTHENTICATION_MODE_NOT_SET) + { + transport_instance->preferred_authentication_mode = AMQP_TRANSPORT_AUTHENTICATION_MODE_X509; + } + else if (transport_instance->preferred_authentication_mode != AMQP_TRANSPORT_AUTHENTICATION_MODE_X509) + { + LogError("transport failed setting option '%s' (preferred authentication method is not x509)", option); + result = IOTHUB_CLIENT_INVALID_ARG; + } + } - if (result != IOTHUB_CLIENT_INVALID_ARG) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_206: [If the TLS IO does not exist, IoTHubTransport_AMQP_Common_SetOption shall create it and save it on the transport instance.] - if (transport_state->tls_io == NULL && - (transport_state->tls_io = transport_state->underlying_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn))) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_207: [If IoTHubTransport_AMQP_Common_SetOption fails creating the TLS IO instance, it shall fail and return IOTHUB_CLIENT_ERROR.] - result = IOTHUB_CLIENT_ERROR; - LogError("IoTHubTransport_AMQP_Common_SetOption failed (failed to obtain a TLS I/O transport layer)."); - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_208: [When a new TLS IO instance is created, IoTHubTransport_AMQP_Common_SetOption shall apply the TLS I/O Options with OptionHandler_FeedOptions() if it is has any saved.] - if (transport_state->xioOptions != NULL) - { - if (OptionHandler_FeedOptions(transport_state->xioOptions, transport_state->tls_io) != 0) - { - LogError("IoTHubTransport_AMQP_Common_SetOption failed (unable to replay options to TLS)"); - } - else - { - OptionHandler_Destroy(transport_state->xioOptions); - transport_state->xioOptions = NULL; - } - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_105: [If `option` does not match one of the options handled by this module, it shall be passed to `instance->tls_io` using xio_setoption()] + if (result != IOTHUB_CLIENT_INVALID_ARG) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_106: [If `instance->tls_io` is NULL, it shall be set invoking instance->underlying_io_transport_provider()] + if (transport_instance->tls_io == NULL && + get_new_underlying_io_transport(transport_instance, &transport_instance->tls_io) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_107: [If instance->underlying_io_transport_provider() fails, IoTHubTransport_AMQP_Common_SetOption shall fail and return IOTHUB_CLIENT_ERROR] + LogError("transport failed setting option '%s' (failed to obtain a TLS I/O transport).", option); + result = IOTHUB_CLIENT_ERROR; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_108: [When `instance->tls_io` is created, IoTHubTransport_AMQP_Common_SetOption shall apply `instance->saved_tls_options` with OptionHandler_FeedOptions()] + else if (xio_setoption(transport_instance->tls_io, option, value) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_03_001: [If xio_setoption fails, IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_ERROR.] + LogError("transport failed setting option '%s' (xio_setoption failed)", option); + result = IOTHUB_CLIENT_ERROR; + } + else + { + if (save_underlying_io_transport_options(transport_instance) != RESULT_OK) + { + LogError("IoTHubTransport_AMQP_Common_SetOption failed to save underlying I/O options; failure will be ignored"); + } - if (xio_setoption(transport_state->tls_io, option, value) != RESULT_OK) - { - /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_03_001: [If xio_setoption fails, IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_ERROR.] */ - result = IOTHUB_CLIENT_ERROR; - LogError("Invalid option (%s) passed to IoTHubTransport_AMQP_Common_SetOption", option); - } - else - { - result = IOTHUB_CLIENT_OK; - } - } - } - } - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_03_001: [If no failures occur, IoTHubTransport_AMQP_Common_SetOption shall return IOTHUB_CLIENT_OK.] + result = IOTHUB_CLIENT_OK; + } + } + } + } return result; } @@ -1677,178 +1623,145 @@ UNUSED(iotHubClientHandle); #endif - IOTHUB_DEVICE_HANDLE result = NULL; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_17_001: [IoTHubTransport_AMQP_Common_Register shall return NULL if device, or waitingToSend are NULL.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_17_005: [IoTHubTransport_AMQP_Common_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.] - if ((handle == NULL) || (device == NULL) || (waitingToSend == NULL)) + IOTHUB_DEVICE_HANDLE result; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_17_005: [If `handle`, `device`, `iotHubClientHandle` or `waitingToSend` is NULL, IoTHubTransport_AMQP_Common_Register shall return NULL] + if ((handle == NULL) || (device == NULL) || (waitingToSend == NULL) || (iotHubClientHandle == NULL)) { - LogError("invalid parameter TRANSPORT_LL_HANDLE handle=%p, const IOTHUB_DEVICE_CONFIG* device=%p, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle=%p, PDLIST_ENTRY waitingToSend=%p", + LogError("invalid parameter TRANSPORT_LL_HANDLE handle=%p, const IOTHUB_DEVICE_CONFIG* device=%p, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle=%p, PDLIST_ENTRY waiting_to_send=%p", handle, device, iotHubClientHandle, waitingToSend); + result = NULL; } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_220: [IoTHubTransport_AMQP_Common_Register shall fail and return NULL if the IOTHUB_CLIENT_LL_HANDLE is NULL.] - else if (iotHubClientHandle == NULL) - { - LogError("IoTHubTransport_AMQP_Common_Register failed (invalid parameter; iotHubClientHandle list is NULL)"); - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_03_002: [IoTHubTransport_AMQP_Common_Register shall return NULL if `device->deviceId` is NULL.] + else if (device->deviceId == NULL) + { + LogError("Transport failed to register device (device_id provided is NULL)"); + result = NULL; + } else { - AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; + LIST_ITEM_HANDLE list_item; + AMQP_TRANSPORT_INSTANCE* transport_instance = (AMQP_TRANSPORT_INSTANCE*)handle; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_03_002: [IoTHubTransport_AMQP_Common_Register shall return NULL if deviceId is NULL.] - if (device->deviceId == NULL) + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_064: [If the device is already registered, IoTHubTransport_AMQP_Common_Register shall fail and return NULL.] + if (is_device_registered_ex(transport_instance->registered_devices, device->deviceId, &list_item)) + { + LogError("IoTHubTransport_AMQP_Common_Register failed (device '%s' already registered on this transport instance)", device->deviceId); + result = NULL; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_065: [IoTHubTransport_AMQP_Common_Register shall fail and return NULL if the device is not using an authentication mode compatible with the currently used by the transport.] + else if (!is_device_credential_acceptable(device, transport_instance->preferred_authentication_mode)) { - LogError("IoTHubTransport_AMQP_Common_Register failed (deviceId provided is NULL)"); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_03_003: [IoTHubTransport_AMQP_Common_Register shall return NULL if both deviceKey and deviceSasToken are not NULL.] - else if ((device->deviceSasToken != NULL) && (device->deviceKey != NULL)) - { - LogError("IoTHubTransport_AMQP_Common_Register failed (invalid IOTHUB_DEVICE_CONFIG; must provide EITHER 'deviceSasToken' OR 'deviceKey')"); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_221: [IoTHubTransport_AMQP_Common_Register shall fail and return NULL if the device is not using an authentication mode compatible with the currently used by the transport.] - else if (is_credential_compatible(device, transport_state->preferred_credential_type) != RESULT_OK) - { - LogError("IoTHubTransport_AMQP_Common_Register failed (transport does not support mixed authentication methods)"); - } + LogError("Transport failed to register device '%s' (device credential was not accepted)", device->deviceId); + result = NULL; + } else { - AMQP_TRANSPORT_DEVICE_STATE* device_state; + AMQP_TRANSPORT_DEVICE_INSTANCE* amqp_device_instance; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_222: [If a device matching the deviceId provided is already registered, IoTHubTransport_AMQP_Common_Register shall fail and return NULL.] - if (VECTOR_find_if(transport_state->registered_devices, findDeviceById, device->deviceId) != NULL) + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_066: [IoTHubTransport_AMQP_Common_Register shall allocate an instance of AMQP_TRANSPORT_DEVICE_INSTANCE to store the state of the new registered device.] + if ((amqp_device_instance = (AMQP_TRANSPORT_DEVICE_INSTANCE*)malloc(sizeof(AMQP_TRANSPORT_DEVICE_INSTANCE))) == NULL) { - LogError("IoTHubTransport_AMQP_Common_Register failed (device '%s' already registered on this transport instance)", device->deviceId); - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_223: [IoTHubTransport_AMQP_Common_Register shall allocate an instance of AMQP_TRANSPORT_DEVICE_STATE to store the state of the new registered device.] - else if ((device_state = (AMQP_TRANSPORT_DEVICE_STATE*)malloc(sizeof(AMQP_TRANSPORT_DEVICE_STATE))) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_224: [If malloc fails to allocate memory for AMQP_TRANSPORT_DEVICE_STATE, IoTHubTransport_AMQP_Common_Register shall fail and return NULL.] - LogError("IoTHubTransport_AMQP_Common_Register failed (malloc failed)"); - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_067: [If malloc fails, IoTHubTransport_AMQP_Common_Register shall fail and return NULL.] + LogError("Transport failed to register device '%s' (failed to create the device state instance; malloc failed)", device->deviceId); + result = NULL; + } else { - bool cleanup_required; - const char* deviceId = device->deviceId; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_225: [IoTHubTransport_AMQP_Common_Register shall save the handle references to the IoTHubClient, transport, waitingToSend list on the device state.] - device_state->iothub_client_handle = iotHubClientHandle; - device_state->transport_state = transport_state; - - device_state->waitingToSend = waitingToSend; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_226: [IoTHubTransport_AMQP_Common_Register shall initialize the device state inProgress list using DList_InitializeListHead().] - DList_InitializeListHead(&device_state->inProgress); - - device_state->deviceId = NULL; - device_state->authentication = NULL; - device_state->devicesPath = NULL; - device_state->messageReceiveAddress = NULL; - device_state->targetAddress = NULL; - - device_state->receive_messages = false; - device_state->message_receiver = NULL; - device_state->message_sender = NULL; - device_state->message_sender_state = MESSAGE_SENDER_STATE_IDLE; - device_state->receiver_link = NULL; - device_state->sender_link = NULL; + memset(amqp_device_instance, 0, sizeof(AMQP_TRANSPORT_DEVICE_INSTANCE)); + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_068: [IoTHubTransport_AMQP_Common_Register shall save the handle references to the IoTHubClient, transport, waitingToSend list on `amqp_device_instance`.] + amqp_device_instance->iothub_client_handle = iotHubClientHandle; + amqp_device_instance->transport_instance = transport_instance; + amqp_device_instance->waiting_to_send = waitingToSend; + amqp_device_instance->device_state = DEVICE_STATE_STOPPED; + amqp_device_instance->max_state_change_timeout_secs = DEFAULT_DEVICE_STATE_CHANGE_TIMEOUT_SECS; + #ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ - device_state->subscribe_methods_needed = false; - device_state->subscribed_for_methods = false; + amqp_device_instance->subscribe_methods_needed = false; + amqp_device_instance->subscribed_for_methods = false; #endif - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_227: [IoTHubTransport_AMQP_Common_Register shall store a copy of config->deviceId into device_state->deviceId.] - if ((device_state->deviceId = STRING_construct(deviceId)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_228: [If STRING_construct fails to copy config->deviceId, IoTHubTransport_AMQP_Common_Register shall fail and return NULL.] - LogError("IoTHubTransport_AMQP_Common_Register failed to copy the deviceId."); - cleanup_required = true; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_012: [IoTHubTransport_AMQP_Common_Register shall create an immutable string, referred to as devicesPath, from the following parts: host_fqdn + "/devices/" + deviceId.] - else if ((device_state->devicesPath = concat3Params(STRING_c_str(transport_state->iotHubHostFqdn), "/devices/", STRING_c_str(device_state->deviceId))) == NULL) + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_069: [A copy of `config->deviceId` shall be saved into `device_state->device_id`] + if ((amqp_device_instance->device_id = STRING_construct(device->deviceId)) == NULL) { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_013: [If creating devicesPath fails for any reason then IoTHubTransport_AMQP_Common_Register shall fail and return NULL.] - LogError("IoTHubTransport_AMQP_Common_Register failed to construct the devicesPath."); - cleanup_required = true; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_014: [IoTHubTransport_AMQP_Common_Register shall create an immutable string, referred to as targetAddress, from the following parts: "amqps://" + devicesPath + "/messages/events".] - else if ((device_state->targetAddress = concat3Params("amqps://", STRING_c_str(device_state->devicesPath), "/messages/events")) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_015: [If creating the targetAddress fails for any reason then IoTHubTransport_AMQP_Common_Register shall fail and return NULL.] - LogError("IoTHubTransport_AMQP_Common_Register failed to construct the targetAddress."); - cleanup_required = true; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_053: [IoTHubTransport_AMQP_Common_Register shall define the source address for receiving messages as "amqps://" + devicesPath + "/messages/devicebound", stored in the transport handle as messageReceiveAddress] - else if ((device_state->messageReceiveAddress = concat3Params("amqps://", STRING_c_str(device_state->devicesPath), "/messages/devicebound")) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_054: [If creating the messageReceiveAddress fails for any reason then IoTHubTransport_AMQP_Common_Register shall fail and return NULL.] - LogError("IoTHubTransport_AMQP_Common_Register failed to construct the messageReceiveAddress."); - cleanup_required = true; - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_070: [If STRING_construct() fails, IoTHubTransport_AMQP_Common_Register shall fail and return NULL] + LogError("Transport failed to register device '%s' (failed to copy the deviceId)", device->deviceId); + result = NULL; + } else { + DEVICE_CONFIG device_config; + memset(&device_config, 0, sizeof(DEVICE_CONFIG)); + device_config.device_id = (char*)device->deviceId; + device_config.iothub_host_fqdn = (char*)STRING_c_str(transport_instance->iothub_host_fqdn); + device_config.device_primary_key = (char*)device->deviceKey; + device_config.device_sas_token = (char*)device->deviceSasToken; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_072: [The configuration for device_create shall be set according to the authentication preferred by IOTHUB_DEVICE_CONFIG] + device_config.authentication_mode = (device->deviceKey != NULL || device->deviceSasToken != NULL ? DEVICE_AUTH_MODE_CBS : DEVICE_AUTH_MODE_X509); + device_config.on_state_changed_callback = on_device_state_changed_callback; + device_config.on_state_changed_context = amqp_device_instance; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_071: [`amqp_device_instance->device_handle` shall be set using device_create()] + if ((amqp_device_instance->device_handle = device_create(&device_config)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_073: [If device_create() fails, IoTHubTransport_AMQP_Common_Register shall fail and return NULL] + LogError("Transport failed to register device '%s' (failed to create the DEVICE_HANDLE instance)", device->deviceId); + result = NULL; + } + else + { + bool is_first_device_being_registered = (singlylinkedlist_get_head_item(transport_instance->registered_devices) == NULL); + #ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ - /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_010: [ `IoTHubTransport_AMQP_Common_Create` shall create a new iothubtransportamqp_methods instance by calling `iothubtransportamqp_methods_create` while passing to it the the fully qualified domain name and the device Id. ]*/ - device_state->methods_handle = iothubtransportamqp_methods_create(STRING_c_str(transport_state->iotHubHostFqdn), deviceId); - if (device_state->methods_handle == NULL) - { - /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_011: [ If `iothubtransportamqp_methods_create` fails, `IoTHubTransport_AMQP_Common_Create` shall fail and return NULL. ]*/ - LogError("Cannot create the methods module"); - cleanup_required = true; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_231: [IoTHubTransport_AMQP_Common_Register shall add the device to transport_state->registered_devices using VECTOR_push_back().] - else + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_010: [ `IoTHubTransport_AMQP_Common_Create` shall create a new iothubtransportamqp_methods instance by calling `iothubtransportamqp_methods_create` while passing to it the the fully qualified domain name and the device Id. ]*/ + amqp_device_instance->methods_handle = iothubtransportamqp_methods_create(STRING_c_str(transport_instance->iothub_host_fqdn), device->deviceId); + if (amqp_device_instance->methods_handle == NULL) + { + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_011: [ If `iothubtransportamqp_methods_create` fails, `IoTHubTransport_AMQP_Common_Create` shall fail and return NULL. ]*/ + LogError("Transport failed to register device '%s' (Cannot create the methods module)", device->deviceId); + result = NULL; + } + else #endif - if (VECTOR_push_back(transport_state->registered_devices, &device_state, 1) != 0) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_232: [If VECTOR_push_back() fails to add the new registered device, IoTHubTransport_AMQP_Common_Register shall clean the memory it allocated, fail and return NULL.] - LogError("IoTHubTransport_AMQP_Common_Register failed to add the new device to its list of registered devices (VECTOR_push_back failed)."); - cleanup_required = true; - } - else - { - AUTHENTICATION_CONFIG auth_config; - auth_config.device_id = deviceId; - auth_config.device_key = device->deviceKey; - auth_config.device_sas_token = device->deviceSasToken; - auth_config.cbs_connection = &device_state->transport_state->cbs_connection; - auth_config.iot_hub_host_fqdn = STRING_c_str(device_state->transport_state->iotHubHostFqdn); + if (replicate_device_options_to(amqp_device_instance, device_config.authentication_mode) != RESULT_OK) + { + LogError("Transport failed to register device '%s' (failed to replicate options)", device->deviceId); + result = NULL; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_074: [IoTHubTransport_AMQP_Common_Register shall add the `amqp_device_instance` to `instance->registered_devices`] + else if (singlylinkedlist_add(transport_instance->registered_devices, amqp_device_instance) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_075: [If it fails to add `amqp_device_instance`, IoTHubTransport_AMQP_Common_Register shall fail and return NULL] + LogError("Transport failed to register device '%s' (singlylinkedlist_add failed)", device->deviceId); + result = NULL; + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_076: [If the device is the first being registered on the transport, IoTHubTransport_AMQP_Common_Register shall save its authentication mode as the transport preferred authentication mode] + if (transport_instance->preferred_authentication_mode == AMQP_TRANSPORT_AUTHENTICATION_MODE_NOT_SET && + is_first_device_being_registered) + { + if (device_config.authentication_mode == DEVICE_AUTH_MODE_CBS) + { + transport_instance->preferred_authentication_mode = AMQP_TRANSPORT_AUTHENTICATION_MODE_CBS; + } + else + { + transport_instance->preferred_authentication_mode = AMQP_TRANSPORT_AUTHENTICATION_MODE_X509; + } + } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_229: [IoTHubTransport_AMQP_Common_Register shall create an authentication state for the device using authentication_create() and store it on the device state.] - if ((device_state->authentication = authentication_create(&auth_config)) == NULL) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_230: [If authentication_create() fails, IoTHubTransport_AMQP_Common_Register shall fail and return NULL.] - LogError("IoTHubTransport_AMQP_Common_Register failed to create an authentication state for the device."); - cleanup_required = true; - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_234: [If the device is the first being registered on the transport, IoTHubTransport_AMQP_Common_Register shall save its authentication mode as the transport preferred authentication mode.] - if (VECTOR_size(transport_state->registered_devices) == 1) - { - transport_state->preferred_credential_type = authentication_get_credential(device_state->authentication)->type; - } - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_233: [IoTHubTransport_AMQP_Common_Register shall return its internal device representation as a IOTHUB_DEVICE_HANDLE.] - result = (IOTHUB_DEVICE_HANDLE)device_state; - cleanup_required = false; - } - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_078: [IoTHubTransport_AMQP_Common_Register shall return a handle to `amqp_device_instance` as a IOTHUB_DEVICE_HANDLE] + result = (IOTHUB_DEVICE_HANDLE)amqp_device_instance; + } + } } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_233: [If IoTHubTransport_AMQP_Common_Register fails, it shall free all memory it alloacated (destroy deviceId, authentication state, targetAddress, messageReceiveAddress, devicesPath, device state).] - if (cleanup_required) + if (result == NULL) { - if (device_state->deviceId != NULL) - STRING_delete(device_state->deviceId); - if (device_state->authentication != NULL) - authentication_destroy(device_state->authentication); - if (device_state->targetAddress != NULL) - STRING_delete(device_state->targetAddress); - if (device_state->messageReceiveAddress != NULL) - STRING_delete(device_state->messageReceiveAddress); - if (device_state->devicesPath != NULL) - STRING_delete(device_state->devicesPath); - - free(device_state); - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_077: [If IoTHubTransport_AMQP_Common_Register fails, it shall free all memory it allocated] + internal_destroy_amqp_device_instance(amqp_device_instance); + } } } } @@ -1858,101 +1771,62 @@ void IoTHubTransport_AMQP_Common_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle) { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_214: [IoTHubTransport_AMQP_Common_Unregister should fail and return if the IOTHUB_DEVICE_HANDLE parameter provided is NULL.] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_079: [if `deviceHandle` provided is NULL, IoTHubTransport_AMQP_Common_Unregister shall return.] if (deviceHandle == NULL) { - LogError("IoTHubTransport_AMQP_Common_Unregister failed (deviceHandle is NULL)."); + LogError("Failed to unregister device (deviceHandle is NULL)."); } else { - AMQP_TRANSPORT_DEVICE_STATE* device_state = (AMQP_TRANSPORT_DEVICE_STATE*)deviceHandle; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_215: [IoTHubTransport_AMQP_Common_Unregister should fail and return if the IOTHUB_DEVICE_HANDLE parameter provided has a NULL reference to its transport instance.] - if (device_state->transport_state == NULL) - { - LogError("IoTHubTransport_AMQP_Common_Unregister failed (deviceHandle does not have a transport state associated to)."); - } - else - { - IOTHUB_DEVICE_HANDLE* registered_device_state; - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_216: [IoTHubTransport_AMQP_Common_Unregister should fail and return if the device is not registered with this transport.] - if ((registered_device_state = VECTOR_find_if(device_state->transport_state->registered_devices, findDeviceById, STRING_c_str(device_state->deviceId))) == NULL) - { - LogError("IoTHubTransport_AMQP_Common_Unregister failed (device '%s' is not registered on this transport instance)", STRING_c_str(device_state->deviceId)); - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_024: [IoTHubTransport_AMQP_Common_Unregister shall destroy the AMQP message_sender.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_029: [IoTHubTransport_AMQP_Common_Unregister shall destroy the AMQP message_sender link.] - destroyEventSender(device_state); + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)deviceHandle; + const char* device_id; + LIST_ITEM_HANDLE list_item; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_025: [IoTHubTransport_AMQP_Common_Unregister shall destroy the AMQP message_receiver.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_211: [IoTHubTransport_AMQP_Common_Unregister shall destroy the AMQP message_receiver link.] - destroyMessageReceiver(device_state); - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_036: [IoTHubTransport_AMQP_Common_Unregister shall return the remaining items in inProgress to waitingToSend list.] - rollEventsBackToWaitList(device_state); + if ((device_id = STRING_c_str(registered_device->device_id)) == NULL) + { + LogError("Failed to unregister device (failed to get device id char ptr)"); + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_080: [if `deviceHandle` has a NULL reference to its transport instance, IoTHubTransport_AMQP_Common_Unregister shall return.] + else if (registered_device->transport_instance == NULL) + { + LogError("Failed to unregister device '%s' (deviceHandle does not have a transport state associated to).", device_id); + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_081: [If the device is not registered with this transport, IoTHubTransport_AMQP_Common_Unregister shall return] + else if (!is_device_registered_ex(registered_device->transport_instance->registered_devices, device_id, &list_item)) + { + LogError("Failed to unregister device '%s' (device is not registered within this transport).", device_id); + } + else + { + // Removing it first so the race hazzard is reduced between this function and DoWork. Best would be to use locks. + if (singlylinkedlist_remove(registered_device->transport_instance->registered_devices, list_item) != RESULT_OK) + { + LogError("Failed to unregister device '%s' (singlylinkedlist_remove failed).", device_id); + } + else + { + // TODO: Q: should we go through waiting_to_send list and raise on_event_send_complete with BECAUSE_DESTROY ? - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_035: [IoTHubTransport_AMQP_Common_Unregister shall delete its internally-set parameters (targetAddress, messageReceiveAddress, devicesPath, deviceId).] - STRING_delete(device_state->targetAddress); - STRING_delete(device_state->messageReceiveAddress); - STRING_delete(device_state->devicesPath); - STRING_delete(device_state->deviceId); - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_217: [IoTHubTransport_AMQP_Common_Unregister shall destroy the authentication state of the device using authentication_destroy.] - authentication_destroy(device_state->authentication); - -#ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ - /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_012: [IoTHubTransport_AMQP_Common_Unregister shall destroy the C2D methods handler by calling iothubtransportamqp_methods_destroy.]*/ - iothubtransportamqp_methods_destroy(device_state->methods_handle); -#endif - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_218: [IoTHubTransport_AMQP_Common_Unregister shall remove the device from its list of registered devices using VECTOR_erase().] - VECTOR_erase(device_state->transport_state->registered_devices, registered_device_state, 1); - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_219: [IoTHubTransport_AMQP_Common_Unregister shall destroy the IOTHUB_DEVICE_HANDLE instance provided.] - free(device_state); - } - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_012: [IoTHubTransport_AMQP_Common_Unregister shall destroy the C2D methods handler by calling iothubtransportamqp_methods_destroy] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_083: [IoTHubTransport_AMQP_Common_Unregister shall free all the memory allocated for the `device_instance`] + internal_destroy_amqp_device_instance(registered_device); + } + } } } void IoTHubTransport_AMQP_Common_Destroy(TRANSPORT_LL_HANDLE handle) { - if (handle != NULL) - { - AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; - - size_t numberOfRegisteredDevices = VECTOR_size(transport_state->registered_devices); - - for (size_t i = 0; i < numberOfRegisteredDevices; i++) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_209: [IoTHubTransport_AMQP_Common_Destroy shall invoke IoTHubTransport_AMQP_Common_Unregister on each of its registered devices.] - IoTHubTransport_AMQP_Common_Unregister(*(AMQP_TRANSPORT_DEVICE_STATE**)VECTOR_element(transport_state->registered_devices, i)); - } - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_210: [IoTHubTransport_AMQP_Common_Destroy shall its list of registered devices using VECTOR_destroy().] - VECTOR_destroy(transport_state->registered_devices); - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_027: [IoTHubTransport_AMQP_Common_Destroy shall destroy the AMQP cbs instance] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_030: [IoTHubTransport_AMQP_Common_Destroy shall destroy the AMQP session.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_031: [IoTHubTransport_AMQP_Common_Destroy shall destroy the AMQP connection.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_032: [IoTHubTransport_AMQP_Common_Destroy shall destroy the AMQP SASL I / O transport.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_033: [IoTHubTransport_AMQP_Common_Destroy shall destroy the AMQP SASL mechanism.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_034: [IoTHubTransport_AMQP_Common_Destroy shall destroy the AMQP TLS I/O transport.] - destroyConnection(transport_state); - - // CodeS_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_212: [IoTHubTransport_AMQP_Common_Destroy shall destroy the IoTHub FQDN value saved on the transport instance] - STRING_delete(transport_state->iotHubHostFqdn); - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_213: [IoTHubTransport_AMQP_Common_Destroy shall destroy any TLS I/O options saved on the transport instance using OptionHandler_Destroy()] - if (transport_state->xioOptions != NULL) - { - OptionHandler_Destroy(transport_state->xioOptions); - } - - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_150: [IoTHubTransport_AMQP_Common_Destroy shall destroy the transport instance] - free(transport_state); + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_013: [If `handle` is NULL, IoTHubTransport_AMQP_Common_Destroy shall return immediatelly] + if (handle == NULL) + { + LogError("Failed to destroy AMQP transport instance (handle is NULL)"); + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_014: [IoTHubTransport_AMQP_Common_Destroy shall invoke IoTHubTransport_AMQP_Common_Unregister on each of its registered devices.] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_015: [All members of `instance` (including tls_io) shall be destroyed and its memory released] + internal_destroy_instance((AMQP_TRANSPORT_INSTANCE*)handle); } } @@ -1972,16 +1846,103 @@ STRING_HANDLE IoTHubTransport_AMQP_Common_GetHostname(TRANSPORT_LL_HANDLE handle) { STRING_HANDLE result; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_001: [If parameter handle is NULL then IoTHubTransport_AMQP_Common_GetHostname shall return NULL.] - if (handle == NULL) + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_001: [If `handle` is NULL, `IoTHubTransport_AMQP_Common_GetHostname` shall return NULL.] + if (handle == NULL) { - result = NULL; + LogError("Cannot provide the target host name (transport handle is NULL)."); + + result = NULL; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_002: [IoTHubTransport_AMQP_Common_GetHostname shall return a copy of `instance->iothub_target_fqdn`.] + else if ((result = STRING_clone(((AMQP_TRANSPORT_INSTANCE*)(handle))->iothub_host_fqdn)) == NULL) + { + LogError("Cannot provide the target host name (STRING_clone failed)."); + } + + return result; +} + +static DEVICE_MESSAGE_DISPOSITION_INFO* create_device_message_disposition_info_from(MESSAGE_CALLBACK_INFO* message_data) +{ + DEVICE_MESSAGE_DISPOSITION_INFO* result; + + if ((result = (DEVICE_MESSAGE_DISPOSITION_INFO*)malloc(sizeof(DEVICE_MESSAGE_DISPOSITION_INFO))) == NULL) + { + LogError("Failed creating DEVICE_MESSAGE_DISPOSITION_INFO (malloc failed)"); + } + else if (mallocAndStrcpy_s(&result->source, message_data->transportContext->link_name) != RESULT_OK) + { + LogError("Failed creating DEVICE_MESSAGE_DISPOSITION_INFO (mallocAndStrcpy_s failed)"); + free(result); + result = NULL; + } + else + { + result->message_id = message_data->transportContext->message_id; + } + + return result; +} + +static void destroy_device_message_disposition_info(DEVICE_MESSAGE_DISPOSITION_INFO* device_message_disposition_info) +{ + free(device_message_disposition_info->source); + free(device_message_disposition_info); +} + +IOTHUB_CLIENT_RESULT IoTHubTransport_AMQP_Common_SendMessageDisposition(MESSAGE_CALLBACK_INFO* message_data, IOTHUBMESSAGE_DISPOSITION_RESULT disposition) +{ + IOTHUB_CLIENT_RESULT result; + if (message_data == NULL) + { + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_10_001: [If messageData is NULL, IoTHubTransport_AMQP_Common_SendMessageDisposition shall fail and return IOTHUB_CLIENT_INVALID_ARG.] */ + LogError("Failed sending message disposition (message_data is NULL)"); + result = IOTHUB_CLIENT_INVALID_ARG; } else { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_002: [Otherwise IoTHubTransport_AMQP_Common_GetHostname shall return the target IoT Hub FQDN as a STRING_HANDLE.] - result = ((AMQP_TRANSPORT_INSTANCE*)(handle))->iotHubHostFqdn; + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_10_002: [If any of the messageData fields are NULL, IoTHubTransport_AMQP_Common_SendMessageDisposition shall fail and return IOTHUB_CLIENT_INVALID_ARG.] */ + if (message_data->messageHandle == NULL || message_data->transportContext == NULL) + { + LogError("Failed sending message disposition (message_data->messageHandle (%p) or message_data->transportContext (%p) are NULL)", message_data->messageHandle, message_data->transportContext); + result = IOTHUB_CLIENT_INVALID_ARG; + } + else + { + DEVICE_MESSAGE_DISPOSITION_INFO* device_message_disposition_info; + + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_10_004: [IoTHubTransport_AMQP_Common_SendMessageDisposition shall convert the given IOTHUBMESSAGE_DISPOSITION_RESULT to the equivalent AMQP_VALUE and will return the result of calling messagereceiver_send_message_disposition. ] */ + DEVICE_MESSAGE_DISPOSITION_RESULT device_disposition_result = get_device_disposition_result_from(disposition); + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_112: [A DEVICE_MESSAGE_DISPOSITION_INFO instance shall be created with a copy of the `link_name` and `message_id` contained in `message_data`] + if ((device_message_disposition_info = create_device_message_disposition_info_from(message_data)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_113: [If the DEVICE_MESSAGE_DISPOSITION_INFO fails to be created, `IoTHubTransport_AMQP_Common_SendMessageDisposition()` shall fail and return IOTHUB_CLIENT_ERROR] + LogError("Device '%s' failed sending message disposition (failed creating DEVICE_MESSAGE_DISPOSITION_RESULT)", STRING_c_str(message_data->transportContext->device_state->device_id)); + result = IOTHUB_CLIENT_ERROR; + } + else + { + /* Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_10_003: [IoTHubTransport_AMQP_Common_SendMessageDisposition shall fail and return IOTHUB_CLIENT_ERROR if the POST message fails, otherwise return IOTHUB_CLIENT_OK.] */ + if (device_send_message_disposition(message_data->transportContext->device_state->device_handle, device_message_disposition_info, device_disposition_result) != RESULT_OK) + { + LogError("Device '%s' failed sending message disposition (device_send_message_disposition failed)", STRING_c_str(message_data->transportContext->device_state->device_id)); + result = IOTHUB_CLIENT_ERROR; + } + else + { + IoTHubMessage_Destroy(message_data->messageHandle); + MESSAGE_CALLBACK_INFO_Destroy(message_data); + result = IOTHUB_CLIENT_OK; + } + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_114: [`IoTHubTransport_AMQP_Common_SendMessageDisposition()` shall destroy the DEVICE_MESSAGE_DISPOSITION_INFO instance] + destroy_device_message_disposition_info(device_message_disposition_info); + } + } } + return result; }