Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransportamqp.c
- Revision:
- 7:07bc440836b3
- Parent:
- 5:8d58d20699dd
- Child:
- 8:450043aa3c5c
--- a/iothubtransportamqp.c Fri Nov 13 16:07:57 2015 -0800 +++ b/iothubtransportamqp.c Thu Feb 04 11:39:49 2016 -0800 @@ -7,1730 +7,1230 @@ #endif #include "gballoc.h" -#include "proton/message.h" -#include "proton/messenger.h" +#include "cbs.h" +#include "link.h" +#include "message.h" +#include "message_receiver.h" +#include "message_sender.h" +#include "messaging.h" +#include "sasl_mssbcbs.h" +#include "saslclientio.h" +#include "crt_abstractions.h" +#include "doublylinkedlist.h" #include "iot_logging.h" +#include "platform.h" +#include "sastoken.h" #include "strings.h" +#include "tlsio_openssl.h" +#include "tlsio_schannel.h" +#include "tlsio_wolfssl.h" #include "urlencode.h" -#include "doublylinkedlist.h" -#include "crt_abstractions.h" -#include "sastoken.h" + +#include "tlsio.h" #include "iothub_client_ll.h" #include "iothub_client_private.h" #include "iothubtransportamqp.h" -#include "iothub_client_amqp_internal.h" +#include "iothub_client_version.h" + +#define RESULT_OK 0 +#define RESULT_FAILURE 1 +#define RESULT_TIMEOUT 2 + +#define TIME_MAX SIZE_MAX +#define RFC1035_MAX_FQDN_LENGTH 255 +#define DEFAULT_IOTHUB_AMQP_PORT 5671 +#define DEFAULT_SAS_TOKEN_LIFETIME_MS 3600000 +#define DEFAULT_CBS_REQUEST_TIMEOUT_MS 30000 +#define DEFAULT_MESSAGE_SEND_TIMEOUT_MS 300000 +#define CBS_AUDIENCE "servicebus.windows.net:sastoken" +#define DEFAULT_CONTAINER_ID "default_container_id" +#define DEFAULT_INCOMING_WINDOW_SIZE SIZE_MAX +#define DEFAULT_OUTGOING_WINDOW_SIZE 100 +#define MESSAGE_RECEIVER_LINK_NAME "receiver-link" +#define MESSAGE_RECEIVER_TARGET_ADDRESS "ingress-rx" +#define MESSAGE_RECEIVER_MAX_LINK_SIZE 65536 +#define MESSAGE_SENDER_LINK_NAME "sender-link" +#define MESSAGE_SENDER_SOURCE_ADDRESS "ingress" +#define MESSAGE_SENDER_MAX_LINK_SIZE 65536 + +typedef XIO_HANDLE(*TLS_IO_TRANSPORT_PROVIDER)(const char* fqdn, int port, const char* certificates); + +typedef enum CBS_STATE_TAG +{ + CBS_STATE_IDLE, + CBS_STATE_AUTH_IN_PROGRESS, + CBS_STATE_AUTHENTICATED +} CBS_STATE; -static const size_t NUMBER_OF_MESSENGER_STOP_TRIES = PROTON_MESSENGER_STOP_TRIES; -static const size_t MAXIMUM_EVENT_LENGTH = PROTON_MAXIMUM_EVENT_LENGTH; +typedef struct AMQP_TRANSPORT_STATE_TAG +{ + // FQDN of the IoT Hub. + STRING_HANDLE iotHubHostFqdn; + // AMQP port of the IoT Hub. + int iotHubPort; + // Certificates to be used by the TLS I/O. + char* trusted_certificates; + // Key associated to the device to be used. + STRING_HANDLE deviceKey; + // Address to which the transport will connect to and send events. + STRING_HANDLE targetAddress; + // Address to which the transport will connect to and receive messages from. + STRING_HANDLE messageReceiveAddress; + // A component of the SAS token. Currently this must be an empty string. + STRING_HANDLE sasTokenKeyName; + // Internal parameter that identifies the current logical device within the service. + STRING_HANDLE devicesPath; + // How long a SAS token created by the transport is valid, in milliseconds. + size_t sas_token_lifetime; + // Maximum period of time for the transport to wait before refreshing the SAS token it created previously, in milliseconds. + size_t sas_token_refresh_time; + // Maximum time the transport waits for uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds. + size_t cbs_request_timeout; + // Maximum time the transport waits for an event to be sent before marking it a failure, in milliseconds. + size_t message_send_timeout; + // Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds. + size_t connection_timeout; + // Saved reference to the IoTHub LL Client. + IOTHUB_CLIENT_LL_HANDLE iothub_client_handle; + + // TSL I/O transport. + XIO_HANDLE tls_io; + // Pointer to the function that creates the TLS I/O (internal use only). + TLS_IO_TRANSPORT_PROVIDER tls_io_transport_provider; + // AMQP SASL I/O transport created on top of the TLS I/O layer. + XIO_HANDLE sasl_io; + // AMQP SASL I/O mechanism to be used. + SASL_MECHANISM_HANDLE sasl_mechanism; + // AMQP connection. + CONNECTION_HANDLE connection; + // Current AMQP connection state; + AMQP_MANAGEMENT_STATE connection_state; + // Last time the AMQP connection establishment was initiated. + size_t connection_establish_time; + // AMQP session. + SESSION_HANDLE session; + // AMQP link used by the event sender. + LINK_HANDLE sender_link; + // uAMQP event sender. + MESSAGE_SENDER_HANDLE message_sender; + // Internal flag that controls if messages should be received or not. + bool receive_messages; + // AMQP link used by the message receiver. + LINK_HANDLE receiver_link; + // uAMQP message receiver. + MESSAGE_RECEIVER_HANDLE message_receiver; + // List with events still pending to be sent. It is provided by the upper layer. + PDLIST_ENTRY waitingToSend; + // Internal list with the items currently being processed/sent through uAMQP. + DLIST_ENTRY inProgress; + // Connection instance with the Azure IoT CBS. + CBS_HANDLE cbs; + // Current state of the CBS connection. + CBS_STATE cbs_state; + // Time when the current SAS token was created, in seconds since epoch. + size_t current_sas_token_create_time; +} AMQP_TRANSPORT_INSTANCE; + +// This structure is used to track an event being sent and the time it was sent. +typedef struct EVENT_TRACKER_TAG +{ + IOTHUB_CLIENT_LL_HANDLE iothub_client_handle; + IOTHUB_MESSAGE_LIST* message; + time_t time_sent; + DLIST_ENTRY entry; +} EVENT_TRACKER; -#define amqp_batch_result_values \ - amqp_batch_nowork, \ - amqp_batch_batch, \ - amqp_batch_nobatch, \ - amqp_batch_error \ - -DEFINE_ENUM(amqp_batch_result, amqp_batch_result_values); - -static const int IO_PROCESSING_YIELD_IN_MILLISECONDS = PROTON_PROCESSING_YIELD_IN_MILLISECONDS; - -static void processReceives(TRANSPORT_HANDLE handle); - -typedef struct MESSENGER_CONTAINER_TAG -{ - // - // This links all of these records onto the messageCorral in the transport state. - DLIST_ENTRY entry; - - // - // This is a messenger that couldn't be stopped. (You have to admire its initiative.) We'll keep trying to stop it when - // we have a spare moment. - pn_messenger_t* messengerThatDidNotStop; -} MESSENGER_CONTAINER, *PMESSENGER_CONTAINER; +// Auxiliary functions -// -// Transport specific structure used to contain everything needed to send an event item. -// -typedef struct AMQP_WORK_ITEM_TAG -{ - // - // At what time should we give up on sending this message. - size_t expiry; - - // - // Used by the transport to determine if the IO operation is complete. - pn_tracker_t tracker; - - // - // This acts as a listhead for all of the messages that make up one transfer frame. - // If no batching is being done then this list will have at most one item. If batching - // is being done then this could be lengthy! - DLIST_ENTRY eventMessages; - - // - // This is used to hold this work item either on the work in progress list or on the available work item list. - DLIST_ENTRY link; -} AMQP_WORK_ITEM, *PAMQP_WORK_ITEM; - -static void rollbackEvent(PAMQP_TRANSPORT_STATE transportState); - -static bool checkForErrorsThenWork(PAMQP_TRANSPORT_STATE transportState) +static STRING_HANDLE concat3Params(const char* prefix, const char* infix, const char* suffix) { - pn_error_t* errorStruct; - bool result; - errorStruct = pn_messenger_error(transportState->messenger); - if (pn_error_code(errorStruct) != 0) - { - LogError("An error was detected in the networking manager: %d\r\n", pn_error_code(errorStruct)); - transportState->messengerInitialized = false; - result = false; - } - else - { - int protonResult; - protonResult = pn_messenger_work(transportState->messenger, IO_PROCESSING_YIELD_IN_MILLISECONDS); - if ((protonResult < 0) && (protonResult != PN_TIMEOUT)) - { - LogError("A networking error occured. Proton error code: %d\r\n", protonResult); - transportState->messengerInitialized = false; - result = false; - } - else - { - result = true; - } - } - return result; -} + STRING_HANDLE result = NULL; + char* concat = NULL; + size_t totalLength = strlen(prefix) + strlen(infix) + strlen(suffix) + 1; // One extra for \0. -static int setRecvMessageIds(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg) -{ - int result; - - // Function can not fail - pn_atom_t msgIdInfo = pn_message_get_id(msg); - if (msgIdInfo.type != PN_NULL) + if ((concat = (char*)malloc(sizeof(char) * totalLength)) != NULL) { - if (msgIdInfo.type != PN_STRING) - { - LogError("Message Id Failure: Invalid message id Type %d\r\n", msgIdInfo.type); - result = __LINE__; - } - else + if (snprintf(concat, totalLength, "%s%s%s", prefix, infix, suffix) > 0) { - if (IoTHubMessage_SetMessageId(ioTMessage, msgIdInfo.u.as_bytes.start) != IOTHUB_MESSAGE_OK) - { - result = __LINE__; - } - else - { - result = 0; - } + result = STRING_construct(concat); } - } - else - { - // NULL values mean it's not set - result = 0; + free(concat); } - if (result == 0) - { - pn_atom_t corrIdInfo = pn_message_get_correlation_id(msg); - if (corrIdInfo.type != PN_NULL) - { - if (corrIdInfo.type != PN_STRING) - { - LogError("Message Id Failure: Invalid correlation id Type %d\r\n", corrIdInfo.type); - result = __LINE__; - } - else - { - if (IoTHubMessage_SetCorrelationId(ioTMessage, corrIdInfo.u.as_bytes.start) != IOTHUB_MESSAGE_OK) - { - result = __LINE__; - } - else - { - result = 0; - } - } - } - else - { - // NULL values mean it's not set - result = 0; - } - } - return result; -} - -static int setSendMessageIds(PDLIST_ENTRY messagesEntry, pn_message_t* msg) -{ - int result; - - IOTHUB_MESSAGE_LIST* currentMessage = containingRecord(messagesEntry->Flink, IOTHUB_MESSAGE_LIST, entry); - const char* messageId = IoTHubMessage_GetMessageId(currentMessage->messageHandle); - if (messageId != NULL) - { - pn_bytes_t dataBytes = pn_bytes(strlen(messageId), messageId); - pn_atom_t pnMsgId; - pnMsgId.type = PN_STRING; - pnMsgId.u.as_bytes = dataBytes; - if (pn_message_set_id(msg, pnMsgId) == 0) - { - result = 0; - } - else - { - LogError("Failure setting pn_message_set_id.\r\n"); - result = __LINE__; - } - } - else - { - result = 0; - } - - if (result == 0) - { - const char* correlationId = IoTHubMessage_GetCorrelationId(currentMessage->messageHandle); - if (correlationId != NULL) - { - pn_bytes_t dataBytes = pn_bytes(strlen(correlationId), correlationId); - pn_atom_t pnCorrelationId; - pnCorrelationId.type = PN_STRING; - pnCorrelationId.u.as_bytes = dataBytes; - if (pn_message_set_correlation_id(msg, pnCorrelationId) == 0) - { - result = 0; - } - else - { - LogError("Failure setting pn_message_set_correlation_id.\r\n"); - result = __LINE__; - } - } - else - { - result = 0; - } - } - return result; -} - -/* Code_SRS_IOTHUBTRANSPORTTAMQP_07_001: [All IoTHubMessage_Properties shall be enumerated and entered in the pn_message_properties Map.] */ -static int setProperties(PDLIST_ENTRY messagesMakingUpBatch, pn_message_t* msg) -{ - int result = 0; - size_t index; - - pn_data_t* propData; - const char*const* keys; - const char*const* values; - size_t propertyCount = 0; - - IOTHUB_MESSAGE_LIST* currentMessage = containingRecord(messagesMakingUpBatch->Flink, IOTHUB_MESSAGE_LIST, entry); - MAP_HANDLE mapProperties = IoTHubMessage_Properties(currentMessage->messageHandle); - if (mapProperties == NULL) - { - LogError("Failure IoTHubMessage_Properties.\r\n"); - result = __LINE__; - } - else if (Map_GetInternals(mapProperties, &keys, &values, &propertyCount) != MAP_OK) - { - LogError("Failure retrieving Map_GetInternals.\r\n"); - result = __LINE__; - } - else if (propertyCount > 0) - { - if ( (propData = pn_message_properties(msg) ) == NULL) - { - LogError("Failure pn_message_properties.\r\n"); - result = __LINE__; - } - else if (pn_data_put_map(propData) != 0) - { - LogError("Failure pn_data_put_map.\r\n"); - result = __LINE__; - } - else if (pn_data_enter(propData) == false ) - { - LogError("Failure pn_data_enter.\r\n"); - result = __LINE__; - } - else - { - for (index = 0; index < propertyCount; index++) - { - if (pn_data_put_symbol(propData, pn_bytes(strlen(keys[index]), keys[index])) != 0) - { - LogError("Failure pn_data_put_symbol.\r\n"); - break; - } - else if (pn_data_put_string(propData, pn_bytes(strlen(values[index]), values[index])) != 0) - { - LogError("Failure pn_data_put_string.\r\n"); - break; - } - } - if (index != propertyCount) - { - result = __LINE__; - } - else if (!pn_data_exit(propData) ) - { - LogError("Failure pn_data_exit.\r\n"); - result = __LINE__; - } - else - { - result = 0; - } - } - } - return result; -} - -static int getMapString(pn_data_t* propData, pn_bytes_t* mapValue) -{ - int result; - if (pn_data_next(propData) == true) - { - if (pn_data_type(propData) == PN_STRING) - { - *mapValue = pn_data_get_string(propData); - result = 0; - } - else - { - LogError("Failure pn_data_type.\r\n"); - result = __LINE__; - } - } - else - { - LogError("Failure pn_data_next.\r\n"); - result = __LINE__; - } - return result; -} - -static int getMapInt(pn_data_t* propData, int32_t* mapValue) -{ - int result; - if (pn_data_next(propData) == true) - { - if (pn_data_type(propData) == PN_INT) - { - *mapValue = pn_data_get_int(propData); - result = 0; - } - else - { - LogError("Failure pn_data_type.\r\n"); - result = __LINE__; - } - } - else - { - LogError("Failure pn_data_next.\r\n"); - result = __LINE__; - } return result; } -/* Code_SRS_IOTHUBTRANSPORTTAMQP_07_002: [On messages the properties shall be retrieved from the message using pn_message_properties and will be entered in the IoTHubMessage_Properties Map.] */ -static int cloneProperties(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg) +static size_t getSecondsSinceEpoch() { - int result = 0; - size_t index = 0; - pn_data_t* propData = pn_message_properties(msg); - if (propData == NULL) - { - LogError("Failure pn_message_properties.\r\n"); - result = __LINE__; - } - else if (pn_data_next(propData) == false ) - { - // This should fail if there is no next sibling which means that there - // are no property and it should continue - result = 0; - } - else - { - size_t propertyCount = pn_data_get_map(propData)/2; - if (propertyCount > 0) - { - if (pn_data_enter(propData) == false) - { - LogError("Failure pn_data_enter.\r\n"); - result = __LINE__; - } - else - { - // Get the Properties from the Message - MAP_HANDLE properties = IoTHubMessage_Properties(ioTMessage); - if (properties == NULL) - { - result = __LINE__; - } - else - { - for (index = 0; index < propertyCount; index++) - { - pn_bytes_t propValue; - pn_bytes_t propName; - if (getMapString(propData, &propValue) != 0) - { - result = __LINE__; - break; - } - if (getMapString(propData, &propName) != 0) - { - result = __LINE__; - break; - } - - if (Map_AddOrUpdate(properties, propValue.start, propName.start) != MAP_OK) - { - LogError("Failure Map_AddOrUpdate.\r\n"); - result = __LINE__; - break; - } - } - if (index != propertyCount) - { - result = __LINE__; - } - else if (!pn_data_exit(propData) ) - { - LogError("Failure pn_data_exit.\r\n"); - result = __LINE__; - } - else - { - result = 0; - } - } - } - } - } - return result; + return (size_t)(difftime(get_time(NULL), (time_t)0) + 0); } -static void putSecondListAfterHeadOfFirst(PDLIST_ENTRY first, PDLIST_ENTRY second) +static EVENT_TRACKER* trackEvent(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state) { - DList_AppendTailList(first->Flink, second); - DList_RemoveEntryList(second); - DList_InitializeListHead(second); // Just to be tidy. + EVENT_TRACKER* event_tracker = NULL; + + if ((event_tracker = (EVENT_TRACKER*)malloc(sizeof(EVENT_TRACKER))) != NULL) + { + event_tracker->iothub_client_handle = transport_state->iothub_client_handle; + event_tracker->message = message; + event_tracker->time_sent = get_time(NULL); + DList_InitializeListHead(&event_tracker->entry); + + DList_RemoveEntryList(&message->entry); + DList_InsertTailList(&transport_state->inProgress, &event_tracker->entry); + } + + return event_tracker; +} + +static void destroyEventTracker(EVENT_TRACKER* event_tracker) +{ + free(event_tracker); } -static bool stopMessenger(pn_messenger_t* messenger) +static IOTHUB_MESSAGE_LIST* getNextEventToSend(AMQP_TRANSPORT_INSTANCE* transport_state) { - bool succeeded; - int result; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_021: [Invoke pn_messenger_stop on the messenger.]*/ - result = pn_messenger_stop(messenger); - if (result == 0) + IOTHUB_MESSAGE_LIST* message = NULL; + + if (!DList_IsListEmpty(transport_state->waitingToSend)) { - succeeded = true; - } - else if (result != PN_INPROGRESS) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_023: [If pn_messenger_stop returns any result other than PN_INPROGRESS then place the messenger on a list for attempting to stop later.]*/ - succeeded = false; + PDLIST_ENTRY list_entry = transport_state->waitingToSend->Flink; + message = containingRecord(list_entry, IOTHUB_MESSAGE_LIST, entry); } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_024: [If pn_messenger_stop returns PN_INPROGRESS invoke pn_messenger_stopped a fixed number of time, seeking a return of true.]*/ - size_t numberOfTrys; - succeeded = false; - for (numberOfTrys = NUMBER_OF_MESSENGER_STOP_TRIES; numberOfTrys > 0; numberOfTrys--) - { - pn_messenger_work(messenger,100); - if (pn_messenger_stopped(messenger)) - { - succeeded = true; - break; - } - } - } - return succeeded; + + return message; +} + +static int isEventInInProgressList(EVENT_TRACKER* event_tracker) +{ + return !DList_IsListEmpty(&event_tracker->entry); } -static void herdTheMessengers(PAMQP_TRANSPORT_STATE state, bool logMessengerStopFailures) +static void removeEventFromInProgressList(EVENT_TRACKER* event_tracker) { - PDLIST_ENTRY currentMessengerLink = state->messengerCorral.Flink; - - // - // We walk the list of messenger containers. We try to stop each one in turn. - // - // If we are able to stop the contained messenger, we will free the actual messenger. We will then free the messenger container. - // - while (currentMessengerLink != &state->messengerCorral) - { - PDLIST_ENTRY nextLink = currentMessengerLink->Flink; - pn_messenger_t* oldMessenger = containingRecord(currentMessengerLink, MESSENGER_CONTAINER, entry)->messengerThatDidNotStop; - if (stopMessenger(oldMessenger)) - { - (void)(DList_RemoveEntryList(currentMessengerLink)); - free(containingRecord(currentMessengerLink, MESSENGER_CONTAINER, entry)); - pn_messenger_free(oldMessenger); - } - else - { - if (logMessengerStopFailures == true) - { - LogError("Unable to free the messenger at address: %p\r\n", oldMessenger); - } - } - currentMessengerLink = nextLink; - } + DList_RemoveEntryList(&event_tracker->entry); + DList_InitializeListHead(&event_tracker->entry); } -static void disposeOfOldMessenger(PAMQP_TRANSPORT_STATE state) +static void rollEventBackToWaitList(EVENT_TRACKER* event_tracker, AMQP_TRANSPORT_INSTANCE* transport_state) { - if (state->messenger) + removeEventFromInProgressList(event_tracker); + DList_InsertTailList(transport_state->waitingToSend, &event_tracker->message->entry); + destroyEventTracker(event_tracker); +} + +static void rollEventsBackToWaitList(AMQP_TRANSPORT_INSTANCE* transport_state) +{ + PDLIST_ENTRY entry = transport_state->inProgress.Blink; + + while (entry != &transport_state->inProgress) { - if (stopMessenger(state->messenger)) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_022: [If pn_messenger_stop returns 0 then invoke pn_messenger_free on the messenger.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_025: [If pn_messenger_stopped returns true, then call pn_messenger_free.]*/ - // - // Yay. It successfully stopped. - // - pn_messenger_free(state->messenger); - } - else - { - // - // If we couldn't destroy it, then put it on a list and try to do it later in our "spare" time. - // - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_023: [If pn_messenger_stop returns any result other than PN_INPROGRESS then place the messenger on a list for attempting to stop later.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_026: [If pn_messenger_stopped never returns true, then place the messenger on a list for attempting to stop later.]*/ - PMESSENGER_CONTAINER newContainer = malloc(sizeof(MESSENGER_CONTAINER)); - if (newContainer == NULL) - { - // - // Darn! This is not good. These containers are not very big. If we can't allocate one, things are pretty bad. - // However, it doesn't make sense to actually free this messenger. It hasn't been stopped and we can NOT be sure who - // is referencing the memory that it has allocated. The safest thing to do in this very bad situation is to drop - // it on the floor. It's a leak, but a leak is better than anything else. There is a pretty good likelyhood that - // the app is exiting soon. The dropped memory will be cleaned up by exit. - ; - } - else - { - newContainer->messengerThatDidNotStop = state->messenger; - DList_InsertTailList(&state->messengerCorral, &newContainer->entry); - } - } - state->messenger = NULL; + EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry); + entry = entry->Blink; + rollEventBackToWaitList(event_tracker, transport_state); } } -static void clientTransportAMQP_Destroy(TRANSPORT_HANDLE handle) +static void on_message_send_complete(const void* context, MESSAGE_SEND_RESULT send_result) { - PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; - - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_010: [clientTransportAMQP_Destroy shall do nothing if the handle is NULL.]*/ - if (transportState) + if (context != NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_018: [clientTransportAMQP_Destroy shall free all the resources currently in use.]*/ - // - // Go through all of the active work items. Complete their associated messages. - // - while (!DList_IsListEmpty(&transportState->workInProgress)) - { - PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transportState->workInProgress); - PAMQP_WORK_ITEM currentItem = containingRecord(currentEntry, AMQP_WORK_ITEM, link); - IoTHubClient_LL_SendComplete(transportState->savedClientHandle, ¤tItem->eventMessages, IOTHUB_BATCHSTATE_FAILED); - DList_InsertTailList(&transportState->availableWorkItems, ¤tItem->link); - } + EVENT_TRACKER* event_tracker = (EVENT_TRACKER*)context; - // - // Get rid of work items that aren't being used, and that should be everything given the above loop! - // - while (!DList_IsListEmpty(&transportState->availableWorkItems)) + IOTHUB_CLIENT_RESULT iot_hub_send_result; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_142: [The callback on_message_send_complete shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_OK if the result received is MESSAGE_SEND_OK] + if (send_result == MESSAGE_SEND_OK) { - PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transportState->availableWorkItems); - free(containingRecord(currentEntry, AMQP_WORK_ITEM, link)); + iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK; } - - disposeOfOldMessenger(transportState); - - // - // herdTheMessengers will NOT free a messenger that couldn't be stopped. Therefore the following - // function could result in a memory leak of old messengers. This is by FAR less of a problem then - // having memory returned in to pool that something else might still be referring to. (To get all folksy - // here, don't put a sick chicken back in the hen house.) - // - - herdTheMessengers(transportState, true); - if (transportState->message) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_143: [The callback on_message_send_complete shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR] + else if (send_result == MESSAGE_SEND_ERROR) { - pn_message_free(transportState->message); - } - STRING_delete(transportState->messageAddress); - STRING_delete(transportState->eventAddress); - STRING_delete(transportState->urledDeviceId); - STRING_delete(transportState->devicesPortionPath); - STRING_delete(transportState->cbsAddress); - STRING_delete(transportState->deviceKey); - STRING_delete(transportState->zeroLengthString); - if (transportState->trustedCertificates != NULL) - { - free(transportState->trustedCertificates); + iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR; } - free(transportState); - } -} - -static int putToken(PAMQP_TRANSPORT_STATE transportState, STRING_HANDLE token) -{ - int result = 0; - - pn_data_t *properties; - pn_atom_t messageId; - - messageId.u.as_ulong = transportState->sendTokenMessageId; - messageId.type = PN_ULONG; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_130: [putToken clears the single messages that is used for all communication via proton.]*/ - pn_message_clear(transportState->message); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_131: [putToken obtains the properties of the message.]*/ - properties = pn_message_properties(transportState->message); - if (properties == NULL) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_132: [If this fails then putToken fails.]*/ - LogError("unable to pn_message_properties\r\n"); - result = __LINE__; - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_133: [putToken constructs a proton map which describes the put token operation. If this construction fails then putToken fails.]*/ - else if (!((pn_data_put_map(properties) == 0) && - (pn_data_enter(properties) == true) && - (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_OPERATIONS_KEY), PROTON_MAP_OPERATIONS_KEY)) == 0) && // key - (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_PUT_TOKEN_OPERATION), PROTON_MAP_PUT_TOKEN_OPERATION)) == 0) && // value - (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_TYPE_KEY), PROTON_MAP_TYPE_KEY)) == 0) && // key - (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_TOKEN_TYPE), PROTON_MAP_TOKEN_TYPE)) == 0) && // value - (pn_data_put_string(properties, pn_bytes(strlen(PROTON_MAP_NAME_KEY), PROTON_MAP_NAME_KEY)) == 0) && // key - (pn_data_put_string(properties, pn_bytes(STRING_length(transportState->devicesPortionPath), STRING_c_str(transportState->devicesPortionPath))) == 0) && // value - (pn_data_exit(properties)) - )) - { - LogError("unable to build CBS put token map\r\n"); - result = __LINE__; - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_134: [putToken sets the address (CBS endpoint) that this put token operation is targeted to.]*/ - else if (pn_message_set_reply_to(transportState->message, CBS_REPLY_TO) != 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_135: [If this fails then putToken fails.]*/ - LogError("unable to pn_message_set_reply_to\r\n"); - result = __LINE__; - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_136: [putToken sets the reply to address for the put token operation.]*/ - else if (pn_message_set_address(transportState->message, STRING_c_str(transportState->cbsAddress)) != 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_137: [If this fails then putToken fails.] */ - LogError("unable to pn_message_set_address\r\n"); - result = __LINE__; - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_138: [putToken sets the message to not be inferred.]*/ - else if (pn_message_set_inferred(transportState->message, false) != 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_139: [If this fails then putToken fails.]*/ - LogError("unable to pn_message_set_inferred\r\n"); - result = __LINE__; - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_140: [putToken sets the messageId of the message to the expiry of the token.]*/ - else if (pn_message_set_id(transportState->message, messageId) != 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_141: [If this fails then putToken fails.]*/ - LogError("Unable to set the message id on the transfer of the token to the CBS\r\n"); - result = __LINE__; - } - else - { - pn_data_t* body; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_142: [putToken obtains the body of the message.]*/ - body = pn_message_body(transportState->message); - if (body == NULL) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_143: [If this fails then putToken fails.]*/ - LogError("Unable to pn_message_body\r\n"); - result = __LINE__; - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_144: [putToken places the actual SAS token into the body of the message.]*/ - else if (pn_data_put_string(body, pn_bytes(STRING_length(token), STRING_c_str(token))) != 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_145: [If this fails then putToken fails.]*/ - LogError("Unable to pn_data_put_string\r\n"); - result = __LINE__; - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_146: [putToken invokes pn_messenger_put.]*/ - else if (pn_messenger_put(transportState->messenger, transportState->message) != 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_147: [If this fails then putToken fails.]*/ - LogError("Unable to pn_messenger_put\r\n"); - result = __LINE__; - transportState->messengerInitialized = false; - } - else + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_102: [The callback on_message_send_complete shall invoke the upper layer callback for message received if provided] + if (event_tracker->message->callback != NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_148: [A proton tracker is obtained.]*/ - pn_tracker_t tracker = pn_messenger_outgoing_tracker(transportState->messenger); - time_t beginningOfWaitLoop = get_time(NULL); - time_t currentTime; - result = __LINE__; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_149: [putToken will loop invoking pn_messenger_work waiting for the message to reach a terminal delivery state.]*/ - do - { - int protonResult; - if (checkForErrorsThenWork(transportState) == false) - { - break; - } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_150: [If a terminal delivery state is not reached then putToken fails.]*/ - if ((protonResult = pn_messenger_status(transportState->messenger, tracker)) != PN_STATUS_PENDING) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_151: [On reaching a terminal delivery state, if the message status is NOT accepted then putToken fails.]*/ - if (protonResult != PN_STATUS_ACCEPTED) - { - LogError("Error sending the token: %s\r\n", pn_error_text(pn_messenger_error(transportState->messenger))); - result = __LINE__; - transportState->messengerInitialized = false; - } - else - { - result = 0; - } - break; - } - } - currentTime = get_time(NULL); - } while (difftime(currentTime, beginningOfWaitLoop) < transportState->cbsRequestAcceptTime); + event_tracker->message->callback(iot_hub_send_result, event_tracker->message->context); + } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_152: [The tracker is settled.]*/ - pn_messenger_settle(transportState->messenger, tracker, 0); - if (result == 0) - { - // - // Successfully sent the token. Wait around for a reply. - // - transportState->waitingForPutTokenReply = true; - beginningOfWaitLoop = get_time(NULL); - result = __LINE__; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_153: [putToken will again loop waiting for a reply.]*/ - do - { - processReceives(transportState); - if (transportState->waitingForPutTokenReply == true) - { - if (checkForErrorsThenWork(transportState) == false) - { - break; - } - } - else - { - result = 0; - break; - } - currentTime = get_time(NULL); - } while (difftime(currentTime, beginningOfWaitLoop) < transportState->cbsReplyTime); - } - else - { - LogError("An network error occured while waiting for a CBS reply.\r\n"); - transportState->messengerInitialized = false; - } - } - } - pn_message_clear(transportState->message); - return result; -} - -static void renewIfNecessaryTheCBS(PAMQP_TRANSPORT_STATE transportState) -{ - - // - // There is an expiry stored in the state. It has the last expiry value used to define a - // sas token. If we are within 20 minutes OR we're PAST that expiration generate a new - // expiry time and create a new sas token and "put" it to the cbs. - // + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_151: [The callback on_message_send_complete shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()] + IoTHubMessage_Destroy(event_tracker->message->messageHandle); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback on_message_send_complete shall destroy the IOTHUB_MESSAGE_LIST instance] + free(event_tracker->message); - size_t secondsSinceEpoch; - int differenceWithLastExpiry; - - transportState->putTokenWasSuccessful = false; - // - // Get the number of seconds since the epoch. - // - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_163: [Invocation of renewIfNecessaryTheCBS will first obtain the current number of seconds since the epoch.*/ - secondsSinceEpoch = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE)+0); // adding zero because a compiler feels it's necessary. - differenceWithLastExpiry = transportState->lastExpiryUsed - secondsSinceEpoch; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_164: [If the difference between lastExpiryUsed and the current value is less than refresh seconds then renewIfNecessaryTheCBS will attempt to renew the SAS.]*/ - if ((differenceWithLastExpiry <= 0) || - (((size_t)differenceWithLastExpiry) < transportState->sasRefreshLine)) // Within refresh minutes (or past if negative) of the expiration - { - // - // We already have the seconds since the epoch. Add an hour to it and generate the new SAS. - // - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_165: [If SAS renewal is to occur then lifetime seconds is added to the current number of seconds.]*/ - size_t possibleNewExpiry = secondsSinceEpoch + transportState->sasTokenLifetime; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_169: [SASToken_Create is invoked.] */ - STRING_HANDLE newSASToken = SASToken_Create(transportState->deviceKey, transportState->devicesPortionPath, transportState->zeroLengthString, possibleNewExpiry); - if (newSASToken == NULL) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback on_message_send_complete shall remove the target message from the in-progress list after the upper layer callback] + if (isEventInInProgressList(event_tracker)) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_170: [If SASToken_Create returns NULL then renewal is considered a failure and we move on to the next phase of DoWork.]*/ - LogError("Could not generate a new SAS token for the CBS\r\n"); - } - else - { - // - // We are going to save off in the transport state this POSSIBLE expiry value. Sending token and response processing code - // will use it as the message id and correlation id for the request/response. This will work - // because the code that sends the token will wait for AT LEAST one second for a valid response. If a valid response comes - // back in that period of time then even if for some bizarre reason we need to renew again in less than a second, we know - // there won't be a spurious response (because we just got the resonse!). - // - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_166: [This value is saved in the state as sendTokenMessageId.]*/ - transportState->sendTokenMessageId = possibleNewExpiry; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_171: [renewIfNecessaryTheCBS will invoke a local static function putToken.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_172: [If putToken fails then the renewal is considered a failure and we move on to the next phase of DoWork.]*/ - if (putToken(transportState, newSASToken) == 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_173: [Note that putToken actually sets a state value which will indicate whether the CBS accepted the token and responded with success.]*/ - if (transportState->putTokenWasSuccessful == true) - { - transportState->lastExpiryUsed = possibleNewExpiry; - } - } - STRING_delete(newSASToken); + removeEventFromInProgressList(event_tracker); + destroyEventTracker(event_tracker); } } } -static bool protonMessengerInit(PAMQP_TRANSPORT_STATE amqpState) +static void on_put_token_complete(const void* context, CBS_OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description) { - if (!amqpState->messengerInitialized) + AMQP_TRANSPORT_INSTANCE* transportState = (AMQP_TRANSPORT_INSTANCE*)context; + + if (operation_result == OPERATION_RESULT_OK) { - rollbackEvent(amqpState); - disposeOfOldMessenger(amqpState); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_027: [DoWork shall invoke pn_messenger to create a new messenger.]*/ - if ((amqpState->messenger = pn_messenger(NULL)) == NULL) + transportState->cbs_state = CBS_STATE_AUTHENTICATED; + } +} + +AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message) +{ + AMQP_VALUE result = NULL; + IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result = IOTHUBMESSAGE_REJECTED; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback on_message_received shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters] + IOTHUB_CLIENT_LL_HANDLE iothub_client_handle = (IOTHUB_CLIENT_LL_HANDLE)context; + IOTHUB_MESSAGE_HANDLE iothub_message = NULL; + MESSAGE_BODY_TYPE body_type; + + + if (message_get_body_type(message, &body_type) != 0) + { + LogError("Failed to get the type of the message received by the transport.\r\n"); + } + else + { + if (body_type == MESSAGE_BODY_TYPE_DATA) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_028: [If pn_messenger returns NULL then the messenger initialization fails.]*/ - LogError("The messenger is not able to be created.\r\n"); + BINARY_DATA binary_data; + if (message_get_body_amqp_data(message, 0, &binary_data) != 0) + { + LogError("Failed to get the body of the message received by the transport.\r\n"); + } + else + { + iothub_message = IoTHubMessage_CreateFromByteArray(binary_data.bytes, binary_data.length); + } } - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_001: [If the option "TrustedCerts" has been set (length greater than 0), the trusted certificates string shall be passed to Proton by a call to pn_messenger_set_trusted_certificates.] */ - else if ((amqpState->trustedCertificates != NULL) && - (strlen(amqpState->trustedCertificates) > 0) && - (pn_messenger_set_trusted_certificates(amqpState->messenger, amqpState->trustedCertificates) != 0)) + } + + if (iothub_message == NULL) + { + LogError("Transport failed processing the message received.\r\n"); + } + else + { + disposition_result = IoTHubClient_LL_MessageCallback((IOTHUB_CLIENT_LL_HANDLE)context, iothub_message); + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback on_message_received shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED] + if (disposition_result == IOTHUBMESSAGE_ACCEPTED) { - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_002: [If pn_messenger_set_trusted_certificates fails, then messenger initialization shall fail.] */ - LogError("unable to pass certificate information via pn_messenger_set_trusted_certificates\r\n"); + result = messaging_delivery_accepted(); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback on_message_received shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED] + else if (disposition_result == IOTHUBMESSAGE_ABANDONED) + { + result = messaging_delivery_released(); } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_029: [pn_messenger_start shall be invoked.]*/ - else if (pn_messenger_start(amqpState->messenger) != 0) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback on_message_received shall return the result of messaging_delivery_rejected(Rejected by application, Rejected by application) if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED] + else if (disposition_result == IOTHUBMESSAGE_REJECTED) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_030: [If pn_messenger_start returns a non-zero value then messenger initialization fails.]*/ - LogError("unable to pn_messenger_start\r\n"); + result = messaging_delivery_rejected("Rejected by application", "Rejected by application"); } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_031: [pn_messenger_set_blocking shall be invoked.]*/ - else if (pn_messenger_set_blocking(amqpState->messenger, false) != 0) + } + + return result; +} + +XIO_HANDLE getTLSIOTransport(const char* fqdn, int port, const char* certificates) +{ + TLSIO_CONFIG tls_io_config = { fqdn, port }; + const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio(); + return xio_create(io_interface_description, &tls_io_config, NULL); +} + +static void destroyConnection(AMQP_TRANSPORT_INSTANCE* transport_state) +{ + if (transport_state != NULL) + { + if (transport_state->cbs != NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_032: [If pn_messenger_set_blocking returns a non-zero value then messenger initialization fails.]*/ - LogError("unable to pn_messenger_set_blocking\r\n"); + cbs_destroy(transport_state->cbs); + transport_state->cbs = NULL; } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_033: [pn_messenger_set_snd_settle_mode shall be invoked.]*/ - else if (pn_messenger_set_snd_settle_mode(amqpState->messenger, PN_SND_UNSETTLED) != 0) + + if (transport_state->session != NULL) + { + session_destroy(transport_state->session); + transport_state->session = NULL; + } + + if (transport_state->connection != NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_034: [If pn_messenger_set_snd_settle_mode returns a non-zero value then messenger initialization fails.]*/ - LogError("unable to set the send settle mode\r\n"); + connection_destroy(transport_state->connection); + transport_state->connection = NULL; } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_035: [pn_messenger_set_rcv_settle_mode shall be invoked.]*/ - else if (pn_messenger_set_rcv_settle_mode(amqpState->messenger, PN_RCV_FIRST) != 0) + + if (transport_state->sasl_io != NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_036: [If pn_messenger_set_rcv_settle_mode returns a non-zero value then messenger initialization fails.]*/ - LogError("unable to set the receive settle mode\r\n"); + xio_destroy(transport_state->sasl_io); + transport_state->sasl_io = NULL; + } + + if (transport_state->sasl_mechanism != NULL) + { + saslmechanism_destroy(transport_state->sasl_mechanism); + transport_state->sasl_mechanism = NULL; } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_037: [pn_messenger_set_outgoing_window shall be invoked.]*/ - else if (pn_messenger_set_outgoing_window(amqpState->messenger, PROTON_OUTGOING_WINDOW_SIZE) != 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_038: [If pn_messenger_set_outgoing_window returns a non-zero value then messenger initialization fails.]*/ - LogError("unable to pn_messenger_set_outgoing_window\r\n"); - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_039: [pn_messenger_set_incoming_window shall be invoked.]*/ - else if (pn_messenger_set_incoming_window(amqpState->messenger, PROTON_INCOMING_WINDOW_SIZE) != 0) + + if (transport_state->tls_io != NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_034: [IoTHubTransportAMQP_Destroy shall destroy the AMQP TLS I/O transport.] + xio_destroy(transport_state->tls_io); + transport_state->tls_io = NULL; + } + } +} + +static void on_amqp_management_state_changed(void* context, AMQP_MANAGEMENT_STATE new_amqp_management_state, AMQP_MANAGEMENT_STATE previous_amqp_management_state) +{ + if (context != NULL) + { + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context; + transport_state->connection_state = new_amqp_management_state; + } +} + +static int establishConnection(AMQP_TRANSPORT_INSTANCE* transport_state) +{ + int result; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_110: [IoTHubTransportAMQP_DoWork shall create the TLS IO using transport_state->io_transport_provider callback function] + if (transport_state->tls_io == NULL && + (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort, transport_state->trusted_certificates)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_136: [If transport_state->io_transport_provider_callback fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to obtain a TLS I/O transport layer.\r\n"); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQPs saslmechanism_create() API] + else if ((transport_state->sasl_mechanism = saslmechanism_create(saslmssbcbs_get_interface(), NULL)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_057: [If saslmechanism_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to create a SASL mechanism.\r\n"); + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_060: [IoTHubTransportAMQP_DoWork shall create the SASL I / O layer using the xio_create() C Shared Utility API] + SASLCLIENTIO_CONFIG sasl_client_config = { transport_state->tls_io, transport_state->sasl_mechanism }; + if ((transport_state->sasl_io = xio_create(saslclientio_get_interface_description(), &sasl_client_config, NULL)) == NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_040: [If pn_messenger_set_incoming_window returns a non-zero value then messenger initialization fails.]*/ - LogError("unable to pn_messenger_set_incoming_window\r\n"); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_061: [If xio_create() fails creating the SASL I/O layer, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to create a SASL I/O layer.\r\n"); } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_156: [pn_messenger_subscribe will be invoked on the cbs address.]*/ - else if ((amqpState->cbsSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->cbsAddress))) == NULL) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_062: [IoTHubTransportAMQP_DoWork shall create the connection with the IoT service using connection_create() AMQP API, passing the SASL I/O layer, IoT Hub FQDN and container ID as parameters (pass NULL for callbacks)] + else if ((transport_state->connection = connection_create(transport_state->sasl_io, STRING_c_str(transport_state->iotHubHostFqdn), DEFAULT_CONTAINER_ID, NULL, NULL)) == NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_157: [If the pn_messenger_subscribe returns a non-zero value then messenger initialization fails.]*/ - LogError("unable to create a subscription to the cbs address\r\n"); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_063: [If connection_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately.] + result = RESULT_FAILURE; + LogError("Failed to create the AMQP connection.\r\n"); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_137: [IoTHubTransportAMQP_DoWork shall create the AMQP session session_create() AMQP API, passing the connection instance as parameter] + else if ((transport_state->session = session_create(transport_state->connection, NULL, NULL)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_138 : [If session_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to create the AMQP session.\r\n"); } else { - //We've got the subscription. Now we attempt to connect to the cbs. - /*Codes_During messenger initialization a state variable known as lastExpiryUsed will be set to zero.*/ - amqpState->lastExpiryUsed = 0; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_159: [renewIfNecessaryCBS will be invoked.]*/ - renewIfNecessaryTheCBS(amqpState); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_160: [If after renewIfNecessaryCBS is invoked, the state variable putTokenWasSuccessful is false then the messenger initialization fails.]*/ - if (amqpState->putTokenWasSuccessful == true) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter AMQP incoming window] + if (session_set_incoming_window(transport_state->session, (uint32_t)DEFAULT_INCOMING_WINDOW_SIZE) != 0) + { + LogError("Failed to set the AMQP incoming window size.\r\n"); + } + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter AMQP outgoing window] + if (session_set_outgoing_window(transport_state->session, DEFAULT_OUTGOING_WINDOW_SIZE) != 0) + { + LogError("Failed to set the AMQP outgoing window size.\r\n"); + } + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_066: [IoTHubTransportAMQP_DoWork shall establish the CBS connection using the cbs_create() AMQP API] + if ((transport_state->cbs = cbs_create(transport_state->session, on_amqp_management_state_changed, NULL)) == NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_041: [pn_messenger_subscribe shall be invoked.]*/ - if ((amqpState->messageSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->messageAddress))) == NULL) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_042: [If pn_messenger_subscribe returns a non-zero value then messenger initialization fails.]*/ - LogError("unable to create a subscription to the message address\r\n"); - } - else - { - amqpState->messengerInitialized = true; - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_067: [If cbs_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to create the CBS connection.\r\n"); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_139: [IoTHubTransportAMQP_DoWork shall open the CBS connection using the cbs_open() AMQP API] + else if ((cbs_open(transport_state->cbs)) != 0) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_140: [If cbs_open() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] + result = RESULT_FAILURE; + LogError("Failed to open the connection with CBS.\r\n"); + } + else + { + transport_state->connection_establish_time = getSecondsSinceEpoch(); + transport_state->cbs_state = CBS_STATE_IDLE; + result = RESULT_OK; } } } - return amqpState->messengerInitialized; + + if (result == RESULT_FAILURE) + { + destroyConnection(transport_state); + } + + return result; } -static amqp_batch_result prepareBatch(size_t maximumPayload, PDLIST_ENTRY waitingToSend, PDLIST_ENTRY messagesMakingUpBatch, size_t* payloadSize, const unsigned char** payload) +static int startAuthentication(AMQP_TRANSPORT_INSTANCE* transport_state) { - amqp_batch_result result; - if (DList_IsListEmpty(waitingToSend)) + int result = RESULT_FAILURE; + + size_t sas_token_create_time = getSecondsSinceEpoch(); // I.e.: NOW, in seconds since epoch. + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to sas_token_lifetime milliseconds from the time of creation] + size_t new_expiry_time = sas_token_create_time + (transport_state->sas_token_lifetime / 1000); + + STRING_HANDLE newSASToken = SASToken_Create(transport_state->deviceKey, transport_state->devicesPath, transport_state->sasTokenKeyName, new_expiry_time); + + if (newSASToken == NULL) { - result = amqp_batch_nowork; + LogError("Could not generate a new SAS token for the CBS\r\n"); + } + else if (cbs_put_token(transport_state->cbs, CBS_AUDIENCE, STRING_c_str(transport_state->devicesPath), STRING_c_str(newSASToken), on_put_token_complete, transport_state) != RESULT_OK) + { + LogError("Failed applying new SAS token to CBS\r\n"); } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_074: [If there is a waitingToSend entry remove it from the list.]*/ - IOTHUB_MESSAGE_LIST* currentMessage; - IOTHUBMESSAGE_CONTENT_TYPE contentType; - size_t messageLength; - const unsigned char* messagePayload; - DList_InitializeListHead(messagesMakingUpBatch); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_086: [If the pn_messenger_put succeeds then DoWork will save off a tracker obtained by invoking pn_messenger_outgoing_tracker and save it the head of the availableWorkItems. The eventMessages in the AMQP_WORK_ITEM head will also contain the IOTHUB_MESSAGE_LIST.]*/ - DList_InsertTailList(messagesMakingUpBatch, DList_RemoveHeadList(waitingToSend)); - currentMessage = containingRecord(messagesMakingUpBatch->Flink, IOTHUB_MESSAGE_LIST, entry); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_075: [Get its size and payload address by calling IoTHubMessage_GetByteArray.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_008: [If the message type is IOTHUBMESSAGE_STRING then get the data by using IoTHubMesage_GetString and set the size to the length of the string.] */ - contentType = IoTHubMessage_GetContentType(currentMessage->messageHandle); - if (!( - ((contentType == IOTHUBMESSAGE_BYTEARRAY) && (IoTHubMessage_GetByteArray(currentMessage->messageHandle, &messagePayload, &messageLength) == IOTHUB_MESSAGE_OK)) || - ((contentType == IOTHUBMESSAGE_STRING) && ( - messagePayload = (const unsigned char*)IoTHubMessage_GetString(currentMessage->messageHandle), - (messageLength = (messagePayload == NULL) ? 0 : strlen((const char*)messagePayload)), - messagePayload != NULL) - ) - )) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_076: [If that fails then fail that event.]*/ - LogError("Failure in getting message content. Type had decimal value = %d\r\n", contentType); - result = amqp_batch_error; - } - else if (messageLength > maximumPayload) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_077: [If the size of the payload is greater than the maximum (currently 256*1024) fail that event.]*/ - LogError("Batch length is too large.\r\n"); - result = amqp_batch_error; - } - else - { - *payloadSize = messageLength; - *payload = messagePayload; - result = amqp_batch_nobatch; - } + transport_state->cbs_state = CBS_STATE_AUTH_IN_PROGRESS; + transport_state->current_sas_token_create_time = sas_token_create_time; + result = RESULT_OK; } + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_145: [Each new SAS token created shall be deleted from memory immediately after sending it to CBS] + if (newSASToken != NULL) + STRING_delete(newSASToken); + return result; } -static bool prepareSimpleProtonMessage(pn_message_t* message, STRING_HANDLE endpoint, size_t payloadLength, const unsigned char* payload) +static int verifyAuthenticationTimeout(AMQP_TRANSPORT_INSTANCE* transport_state) { - bool result = false; - pn_data_t * body; + return ((getSecondsSinceEpoch() - transport_state->current_sas_token_create_time) * 1000 >= transport_state->cbs_request_timeout) ? RESULT_TIMEOUT : RESULT_OK; +} - pn_message_clear(message); - if (pn_message_set_address(message, STRING_c_str(endpoint)) != 0) - { - LogError("Unable to set amqp address for proton message.\r\n"); - } - else if (pn_message_set_inferred(message, true) != 0) +static void handleEventSendTimeouts(AMQP_TRANSPORT_INSTANCE* transport_state) +{ + time_t current_time = get_time(NULL); + PDLIST_ENTRY entry = transport_state->inProgress.Flink; + + while (entry != &transport_state->inProgress) { - LogError("Unable to set inferred to true for proton message.\r\n"); + EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry); + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_085: [IoTHubTransportAMQP_DoWork shall attempt to send all the queued messages for up to message_send_timeout milliseconds] + if (difftime(current_time, event_tracker->time_sent) * 1000 >= transport_state->message_send_timeout) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_120: [If a message_send_timeout occurs the timed out events removed from the inProgress and the upper layer notified of the send error] + on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR); + } + + entry = entry->Flink; } - else if ((body = pn_message_body(message)) == NULL) +} + +static void attachDeviceClientTypeToLink(LINK_HANDLE link) +{ + fields attach_properties = NULL; + AMQP_VALUE deviceClientTypeKeyName = NULL; + AMQP_VALUE deviceClientTypeValue = NULL; + int result; + + // + // Attempt to add the device client type string to the attach properties. + // If this doesn't happen, well, this isn't that important. We can operate + // without this property. It's worth noting that even though we are going + // on, the reasons any of these operations fail don't bode well for the + // actual upcoming attach. + // + + // Codes_SRS_IOTHUBTRANSPORTAMQP_06_187: [If IotHubTransportAMQP_DoWork fails to create an attach properties map and assign that map to the link the function will STILL proceed with the attempt to create the message sender.] + + if ((attach_properties = amqpvalue_create_map()) == NULL) { - LogError("Unable to set proton message body.\r\n"); + LogError("Failed to create the map for device client type.\r\n"); } - else if (pn_data_put_binary(body, pn_bytes(payloadLength, (const char*)payload)) != 0) + else if ((deviceClientTypeKeyName = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL) + { + LogError("Failed to create the key name for the device client type.\r\n"); + } + else if ((deviceClientTypeValue = amqpvalue_create_string(CLIENT_DEVICE_TYPE_PREFIX CLIENT_DEVICE_BACKSLASH IOTHUB_SDK_VERSION)) == NULL) { - LogError("Unable to set binary data for message body.\r\n"); + LogError("Failed to create the key value for the device client type.\r\n"); + } + else if ((result = amqpvalue_set_map_value(attach_properties, deviceClientTypeKeyName, deviceClientTypeValue)) != 0) + { + LogError("Failed to set the property map for the device client type. Error code is: %d\r\n", result); } - else + else if ((result = link_set_attach_properties(link, attach_properties)) != 0) { - result = true; + LogError("Unable to attach the device client type to the link properties. Error code is: %d\r\n", result); +} + amqpvalue_destroy(attach_properties); + amqpvalue_destroy(deviceClientTypeKeyName); + amqpvalue_destroy(deviceClientTypeValue); + +} + +static int destroyEventSender(AMQP_TRANSPORT_INSTANCE* transport_state) +{ + int result = RESULT_FAILURE; + + if (transport_state->message_sender != NULL) + { + messagesender_destroy(transport_state->message_sender); + + transport_state->message_sender = NULL; + + link_destroy(transport_state->sender_link); + + transport_state->sender_link = NULL; + + result = RESULT_OK; } return result; } -static void rollbackEvent(PAMQP_TRANSPORT_STATE transportState) +static int createEventSender(AMQP_TRANSPORT_INSTANCE* transport_state) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_175: [The list of work in progress is tested to see if it is empty.]*/ - while (!DList_IsListEmpty(&transportState->workInProgress)) + int result = RESULT_FAILURE; + + if (transport_state->message_sender == NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_176: [If it is not empty then the list is traversed.]*/ - int settleResult; - int protonResult; - PDLIST_ENTRY newestEntry = transportState->workInProgress.Blink; - PAMQP_WORK_ITEM currentWork = containingRecord(newestEntry, AMQP_WORK_ITEM, link); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_179: [The status of the work item is acquired.]*/ - protonResult = pn_messenger_status(transportState->messenger, currentWork->tracker); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_180: [The item is settled.]*/ - if ((settleResult = pn_messenger_settle(transportState->messenger, currentWork->tracker, 0)) != 0) + AMQP_VALUE source = NULL; + AMQP_VALUE target = NULL; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_068: [IoTHubTransportAMQP_DoWork shall create the AMQP link for sending messages using source as ingress, target as the IoT hub FQDN, link name as sender-link and role as role_sender] + if ((source = messaging_create_source(MESSAGE_SENDER_SOURCE_ADDRESS)) == NULL) { - LogError("Attempt to settle a tracker produced an error: %d\r\n", settleResult); + LogError("Failed creating AMQP messaging source attribute.\r\n"); } - if ((protonResult == PN_STATUS_ACCEPTED) || - (protonResult == PN_STATUS_REJECTED) || - (protonResult == PN_STATUS_RELEASED)) + else if ((target = messaging_create_target(STRING_c_str(transport_state->targetAddress))) == NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_182: [Items that have been settled by the service will have their IOTHUB_MESSAGE_LIST completed.]*/ - IoTHubClient_LL_SendComplete(transportState->savedClientHandle, ¤tWork->eventMessages, (protonResult == PN_STATUS_ACCEPTED) ? IOTHUB_BATCHSTATE_SUCCESS : IOTHUB_BATCHSTATE_FAILED); + LogError("Failed creating AMQP messaging target attribute.\r\n"); + } + else if ((transport_state->sender_link = link_create(transport_state->session, MESSAGE_SENDER_LINK_NAME, role_sender, source, target)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_069: [If IoTHubTransportAMQP_DoWork fails to create the AMQP link for sending messages, the function shall fail and return immediately, flagging the connection to be re-stablished] + LogError("Failed creating AMQP link for message sender.\r\n"); } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_178: [Items that are not settled by the service will have their IOTHUB_MESSAGE_LIST removed from the work item and placed back on the waiting to send.]*/ - putSecondListAfterHeadOfFirst(transportState->waitingToSend->Flink, ¤tWork->eventMessages); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter Link MAX message size] + if (link_set_max_message_size(transport_state->sender_link, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK) + { + LogError("Failed setting AMQP link max message size.\r\n"); + } + + attachDeviceClientTypeToLink(transport_state->sender_link); + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_070: [IoTHubTransportAMQP_DoWork shall create the AMQP message sender using messagesender_create() AMQP API] + if ((transport_state->message_sender = messagesender_create(transport_state->sender_link, NULL, NULL, NULL)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_071: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message sender instance fails to be created, flagging the connection to be re-established] + LogError("Could not allocate AMQP message sender\r\n"); + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_072: [IoTHubTransportAMQP_DoWork shall open the AMQP message sender using messagesender_open() AMQP API] + if (messagesender_open(transport_state->message_sender) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_073: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message sender instance fails to be opened, flagging the connection to be re-established] + LogError("Failed opening the AMQP message sender.\r\n"); + } + else + { + result = RESULT_OK; + } + } } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_181: [The work item is removed from the in progress list and placed on the available list.]*/ - DList_RemoveEntryList(newestEntry); - DList_InsertTailList(&transportState->availableWorkItems, newestEntry); + + if (source != NULL) + amqpvalue_destroy(source); + if (target != NULL) + amqpvalue_destroy(target); } + + return result; } -static void reclaimEventResources(PAMQP_TRANSPORT_STATE transportState) + +static int destroyMessageReceiver(AMQP_TRANSPORT_INSTANCE* transport_state) { - PDLIST_ENTRY currentListEntry; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_064: [Obtain the head of the workInProgress]*/ - currentListEntry = transportState->workInProgress.Flink; - while (currentListEntry != &transportState->workInProgress) + int result = RESULT_FAILURE; + + if (transport_state->message_receiver != NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_065: [While head != listhead reclaimEventResources will do as follows.] */ - int protonResult; - PAMQP_WORK_ITEM currentWork = containingRecord(currentListEntry, AMQP_WORK_ITEM, link); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_066: [Check the current status via pn_messenger_status via the tracker stored in the AMQP_WORK_ITEM.]*/ - if ((protonResult = pn_messenger_status(transportState->messenger, currentWork->tracker)) == PN_STATUS_PENDING) + if (messagereceiver_close(transport_state->message_receiver) != RESULT_OK) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_183: [if the amount of time that the work item has been pending exceeds a timeout stored in the work item, the messenger will be marked as not initialized.]*/ - size_t secondsSinceTheEpoch = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE) + 0); - if (currentWork->expiry < secondsSinceTheEpoch) - { - LogError("A event operation timed out.\r\n"); - transportState->messengerInitialized = false; - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_067: [reclaimEventResources shall go onto the next work item if the proton status is PN_STATUS_PENDING.]*/ - currentListEntry = currentListEntry->Flink; + LogError("Failed closing the AMQP message receiver.\r\n"); + } + + messagereceiver_destroy(transport_state->message_receiver); + + transport_state->message_receiver = NULL; + + link_destroy(transport_state->receiver_link); + + transport_state->receiver_link = NULL; + + result = RESULT_OK; + } + + return result; +} + +static int createMessageReceiver(AMQP_TRANSPORT_INSTANCE* transport_state, IOTHUB_CLIENT_LL_HANDLE iothub_client_handle) +{ + int result = RESULT_FAILURE; + + if (transport_state->message_sender == NULL) + { + AMQP_VALUE source = NULL; + AMQP_VALUE target = NULL; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_074: [IoTHubTransportAMQP_DoWork shall create the AMQP link for receiving messages using source as messageReceiveAddress, target as the ingress-rx, link name as receiver-link and role as role_receiver] + if ((source = messaging_create_source(STRING_c_str(transport_state->messageReceiveAddress))) == NULL) + { + LogError("Failed creating AMQP message receiver source attribute.\r\n"); + } + else if ((target = messaging_create_target(MESSAGE_RECEIVER_TARGET_ADDRESS)) == NULL) + { + LogError("Failed creating AMQP message receiver target attribute.\r\n"); + } + else if ((transport_state->receiver_link = link_create(transport_state->session, MESSAGE_RECEIVER_LINK_NAME, role_receiver, source, target)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_075: [If IoTHubTransportAMQP_DoWork fails to create the AMQP link for receiving messages, the function shall fail and return immediately, flagging the connection to be re-stablished] + LogError("Failed creating AMQP link for message receiver.\r\n"); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_076: [IoTHubTransportAMQP_DoWork shall set the receiver link settle mode as receiver_settle_mode_first] + else if (link_set_rcv_settle_mode(transport_state->receiver_link, receiver_settle_mode_first) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_141: [If IoTHubTransportAMQP_DoWork fails to set the settle mode on the AMQP link for receiving messages, the function shall fail and return immediately, flagging the connection to be re-stablished] + LogError("Failed setting AMQP link settle mode for message receiver.\r\n"); } else { - DLIST_ENTRY savedFromCurrentListEntry; - int settleResult; - // Yay! It's finished. Take it off the in progress list. - savedFromCurrentListEntry.Flink = currentListEntry->Flink; // We need to save this because it will be stomped on when we remove it from the work in progress. + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter Link MAX message size] + if (link_set_max_message_size(transport_state->receiver_link, MESSAGE_RECEIVER_MAX_LINK_SIZE) != RESULT_OK) + { + LogError("Failed setting AMQP link max message size for message receiver.\r\n"); + } + + attachDeviceClientTypeToLink(transport_state->receiver_link); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_068: [Otherwise reclaimEventResources will use the result of IOTHUB_BATCHSTATE_SUCCESS if the proton status was PN_STATUS_ACCEPTED.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_069: [Otherwise reclaimEventResources will use the status of IOTHUB_BATCHSTATE_FAILED if the proton status was NOT PN_STATUS_ACCEPTED.] */ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_070: [reclaimEventResources will invoke IotHubClient_SendBatchComplete with the IOTHUB_CLIENT_LL_HANDLE, supplied to reclaimEventResources, also pass the eventMessages stored in the AMQP_WORK_ITEM, and finally pass the above IOTHUB_BATCH status.]*/ - IoTHubClient_LL_SendComplete(transportState->savedClientHandle, ¤tWork->eventMessages, (protonResult == PN_STATUS_ACCEPTED) ? IOTHUB_BATCHSTATE_SUCCESS : IOTHUB_BATCHSTATE_FAILED); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_090: [reclaimEventResources will release the tracker by invoking pn_messenger_settle.]*/ - if ((settleResult = pn_messenger_settle(transportState->messenger, currentWork->tracker, 0)) != 0) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_077: [IoTHubTransportAMQP_DoWork shall create the AMQP message receiver using messagereceiver_create() AMQP API] + if ((transport_state->message_receiver = messagereceiver_create(transport_state->receiver_link, NULL, NULL)) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_078: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be created, flagging the connection to be re-established] + LogError("Could not allocate AMQP message receiver.\r\n"); + } + else { - LogError("Attempt to settle a tracker produced an error: %d\r\n", settleResult); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_079: [IoTHubTransportAMQP_DoWork shall open the AMQP message receiver using messagereceiver_open() AMQP API, passing a callback function for handling C2D incoming messages] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_123: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver passing the on_message_received as the callback function] + if (messagereceiver_open(transport_state->message_receiver, on_message_received, (const void*)iothub_client_handle) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_080: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be opened, flagging the connection to be re-established] + LogError("Failed opening the AMQP message receiver.\r\n"); + } + else + { + result = RESULT_OK; + } } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_071: [reclaimEventResources will take the current work item from the workInProgress list and insert it on the tail of the availableWorkItems list.]*/ - DList_RemoveEntryList(currentListEntry); - DList_InsertTailList(&transportState->availableWorkItems, currentListEntry); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_072: [reclaimEventResources will continue to iterate over the workInProgress list until it returns to the list head.]*/ - currentListEntry = savedFromCurrentListEntry.Flink; } + + if (source != NULL) + amqpvalue_destroy(source); + if (target != NULL) + amqpvalue_destroy(target); } + + return result; } -static void sendEvent(PAMQP_TRANSPORT_STATE transportState) +static int sendPendingEvents(AMQP_TRANSPORT_INSTANCE* transport_state) { - while (!DList_IsListEmpty(&transportState->availableWorkItems)) + int result = RESULT_OK; + IOTHUB_MESSAGE_LIST* message = NULL; + + while ((message = getNextEventToSend(transport_state)) != NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_073: [While the availableWorkItems is NOT empty loop.] */ - PAMQP_WORK_ITEM headOfAvailable = containingRecord(transportState->availableWorkItems.Flink, AMQP_WORK_ITEM, link); - size_t payloadSize; - const unsigned char* payloadAddress; - amqp_batch_result amqpBatchResult; - amqpBatchResult = prepareBatch(MAXIMUM_EVENT_LENGTH, transportState->waitingToSend, &headOfAvailable->eventMessages, &payloadSize, &payloadAddress); - if (amqpBatchResult == amqp_batch_nowork) + result = RESULT_FAILURE; + + IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message->messageHandle); + const unsigned char* messageContent = NULL; + size_t messageContentSize = 0; + MESSAGE_HANDLE amqp_message; + bool is_message_error = false; + EVENT_TRACKER* event_tracker; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an in-progress list right before processing them for sending] + if ((event_tracker = trackEvent(message, transport_state)) == NULL) + { + LogError("Failed tracking the event to be sent.\r\n"); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_087: [If the event contains a message of type IOTHUBMESSAGE_BYTEARRAY, IoTHubTransportAMQP_DoWork shall obtain its char* representation and size using IoTHubMessage_GetByteArray()] + else if (contentType == IOTHUBMESSAGE_BYTEARRAY && + IoTHubMessage_GetByteArray(message->messageHandle, &messageContent, &messageContentSize) != IOTHUB_MESSAGE_OK) { - // no work to do. Stop trying. - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_089: [If there is nothing on the list then go on to the message.]*/ - break; + LogError("Failed getting the BYTE array representation of the event content to be sent.\r\n"); + is_message_error = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_089: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain its char* representation using IoTHubMessage_GetString()] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_090: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain the size of its char* representation using strlen()] + else if (contentType == IOTHUBMESSAGE_STRING && + ((messageContent = IoTHubMessage_GetString(message->messageHandle)) == NULL || (messageContentSize = strlen(messageContent)) <= 0)) + { + LogError("Failed getting the STRING representation of the event content to be sent.\r\n"); + is_message_error = true; } - else if (amqpBatchResult == amqp_batch_error) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_092: [If the event contains a message of type IOTHUBMESSAGE_UNKNOWN, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] + else if (contentType == IOTHUBMESSAGE_UNKNOWN) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_078: [If the event failed call invoke IotHubClient_SendBatchComplete as above utilizing the eventMessages list of the head of the availableWorkItems and a status of IOTHUB_BATCHSTATE_FAILED.]*/ - IoTHubClient_LL_SendComplete(transportState->savedClientHandle, &headOfAvailable->eventMessages, IOTHUB_BATCHSTATE_FAILED); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_079: [Continue this send loop.]*/ + LogError("Cannot send events with content type IOTHUBMESSAGE_UNKNOWN.\r\n"); + is_message_error = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_093: [IoTHubTransportAMQP_DoWork shall create an amqp message using message_create() AMQP API] + else if ((amqp_message = message_create()) == NULL) + { + LogError("Failed allocating the AMQP message for sending the event.\r\n"); } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_080: [If the event did not fail then prepare a proton message using the following proton API: pn_message_set_address, pn_message_set_inferred, pn_message_body and pn_data_put_binary with the above saved size and address values.]*/ - if (prepareSimpleProtonMessage(transportState->message, transportState->eventAddress, payloadSize, payloadAddress) == false) + BINARY_DATA binary_data = { messageContent, messageContentSize }; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() AMQP API] + if (message_add_body_amqp_data(amqp_message, binary_data) != RESULT_OK) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/ - putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/ - break; + LogError("Failed setting the body of the AMQP message.\r\n"); } - else if (setProperties(&headOfAvailable->eventMessages, transportState->message) != 0) + else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/ - putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/ - break; + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] + if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, (const void*)event_tracker) != RESULT_OK) + { + LogError("Failed sending the AMQP message.\r\n"); + } + else + { + result = RESULT_OK; + } } - else if (setSendMessageIds(&headOfAvailable->eventMessages, transportState->message)) + } + + if (amqp_message != NULL) + { + // It can be destroyed because AMQP keeps a clone of the message. + message_destroy(amqp_message); + } + + if (result != RESULT_OK) + { + if (event_tracker == NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/ - putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/ break; } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_083: [If the proton API is successful then invoke pn_messenger_put.]*/ - int protonResult; - protonResult = pn_messenger_put(transportState->messenger, transportState->message); - if (protonResult != 0) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_088: [If IoTHubMessage_GetByteArray() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_091: [If IoTHubMessage_GetString() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] + if (is_message_error) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_084: [If that fails, sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/ - putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_085: [sendEvent will then break out of the send loop.]*/ - transportState->messengerInitialized = false; - break; - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_184: [The work item will also contain a timeout that shall denote how long the transport will wait before initiating error recovery.]*/ - headOfAvailable->expiry = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE) + transportState->eventTimeout); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_086: [If the pn_messenger_put succeeds then sendEvent will save off a tracker obtained by invoking pn_messenger_outgoing_tracker and save it the head of the availableWorkItems. The eventMessages in the AMQP_WORK_ITEM head will also contain the IOTHUB_MESSAGE_LIST.]*/ - headOfAvailable->tracker = pn_messenger_outgoing_tracker(transportState->messenger); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_087: [The head of the availableWorkItems will be removed and placed on the workInProgress list.]*/ - DList_InsertTailList(&transportState->workInProgress, DList_RemoveHeadList(&transportState->availableWorkItems)); - } - } - } -} -static void processCBSReply(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker) -{ - bool goodMessage = false; - pn_atom_t correlationId; - - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_112: [The correlationId message property is obtained via a call to pn_message_get_correlation_id.]*/ - correlationId = pn_message_get_correlation_id(transportState->message); - - // - // If it isn't the one we're looking for, simply accept it, because we don't want to see it and we can't do anything with it. - // - - if (correlationId.u.as_ulong != transportState->sendTokenMessageId) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_113: [The id is compared to the state variable sendTokenMessageId. If there is no match, an error is logged, the message is abandoned and processCBSReply returns.]*/ - LogError("An old reply from a CBS renewal drifed in. Moving on\r\n"); - } - else - { - // - // This is the response we're looking for! - // - - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_114: [Obtain the properties of the message by invoking pn_message_properties.]*/ - pn_data_t* propertyData = pn_message_properties(transportState->message); - - transportState->waitingForPutTokenReply = false; - if (propertyData == NULL) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_115: [If NULL then abandon the message and return.]*/ - LogError("Invalid response back from the CBS - no application properties\r\n"); - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_116: [Advance to the first property of the message via pn_data_next.]*/ - else if (pn_data_next(propertyData) == false) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_117: [If this fails, abandon and return.]*/ - LogError("Invalid response back from the CBS - application properties malformed\r\n"); - } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_118: [Invoke pn_data_get_map to obtain the number of properties.]*/ - size_t numberOfProperties = pn_data_get_map(propertyData) / 2; - if (numberOfProperties < 1) // We MUST have at least the status property - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_119: [If the number of properties is 0 then abandon and return.]*/ - LogError("Invalid response back from the CBS - invalid number of properties: %zu\r\n", pn_data_get_map(propertyData)); - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_120: [Enter the property map by invoking pn_data_enter.]*/ - else if (pn_data_enter(propertyData) == false) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_121: [If that fails then abandon and return.]*/ - LogError("Invalid response back from the CBS - unable to access the sent properties\r\n"); - } - else - { - pn_bytes_t propertyName; - int32_t resultStatus; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_122: [Get the property name.]*/ - if (getMapString(propertyData, &propertyName) != 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_123: [If that fails then abandon and return.]*/ - LogError("Unable to acquire the status value from the cbs request\r\n"); - } - else if (strcmp(propertyName.start, PROTON_MAP_STATUS_CODE) != 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_124: [If the property name is not "status-code" than abandon and return.]*/ - LogError("Unknown application property returned: %s\r\n", propertyName.start); - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_125: [Get the actual status code.]*/ - else if (getMapInt(propertyData, &resultStatus) != 0) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_126: [If that fails then abandon and return.]*/ - LogError("Unable to acquire the actual status of the CBS put token\r\n"); - } - else if (resultStatus != 200) - { - pn_bytes_t statusDescription; - // - // Yeah, it's not what we want to hear but it's not as though anything was wrong with - // the message. - // - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_127: [If the status code is not equal to 200 then attempt to acquire the explanation from the properties.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_128: [Accept the message and return.]*/ - goodMessage = true; - LogError("The CBS put token operation failed: %d\r\n", resultStatus); - if (numberOfProperties >= 2) - { - // - // Well we have an error. See if the next property is the status description. - // If it is, then log the value as further explanation - // - if (getMapString(propertyData, &propertyName) == 0) - { - if (strcmp(propertyName.start, PROTON_MAP_STATUS_DESCRIPTION) == 0) - { - if (getMapString(propertyData, &statusDescription) == 0) - { - LogError("The CBS put token operation failed due to: %s\r\n", statusDescription.start); - } - } - } - } + on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR); } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_129: [If the status code was 200 then set the state variable putTokenWasSuccessful, accept the message and return.]*/ - goodMessage = true; - transportState->putTokenWasSuccessful = true; + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_111: [If message_create() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_112: [If message_add_body_amqp_data() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_113: [If messagesender_send() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] + rollEventBackToWaitList(event_tracker, transport_state); + break; } } } } - if (goodMessage == true) - { - pn_messenger_accept(transportState->messenger, tracker, 0); - } - else - { - pn_messenger_release(transportState->messenger, tracker, 0); - } - pn_messenger_settle(transportState->messenger, tracker, 0); + + return result; +} + +static bool isSasTokenRefreshRequired(AMQP_TRANSPORT_INSTANCE* transport_state) +{ + return ((getSecondsSinceEpoch() - transport_state->current_sas_token_create_time) >= (transport_state->sas_token_refresh_time / 1000)) ? true : false; +} + +static void prepareForConnectionRetry(AMQP_TRANSPORT_INSTANCE* transport_state) +{ + destroyConnection(transport_state); + rollEventsBackToWaitList(transport_state); } -static void processMessage(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker) + +// API functions + +static TRANSPORT_HANDLE IoTHubTransportAMQP_Create(const IOTHUBTRANSPORT_CONFIG* config) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_111: [If the state variable DoWork_PullMessage is false then the message shall be rejected and processMessage shall return.]*/ - if (transportState->DoWork_PullMessages == false) - { - // - // Nothing to do with the message. We reject it. In that way it can come back later. - // - pn_messenger_reject(transportState->messenger, tracker, 0); - pn_messenger_settle(transportState->messenger, tracker, 0); - } - else - { - pn_data_t *body; + AMQP_TRANSPORT_INSTANCE* transport_state = NULL; + bool cleanup_required = false; + int deviceIdLength = -1; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_050: [processMessage will acquire the body of the message by invoking pn_message_body.]*/ - if ((body = pn_message_body(transportState->message)) == NULL) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_051: [If the body is NULL then the message will be abandoned.]*/ - LogError("Failed to get the proton message body\r\n"); - pn_messenger_release(transportState->messenger, tracker, 0); - pn_messenger_settle(transportState->messenger, tracker, 0); - } - else - { - pn_bytes_t sizeAndData; - bool keepProcessingThisMessage = true; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_054: [processMessage will check for a null (note the lowercase) body by invoking pn_data_next.]*/ - if (pn_data_next(body) == false) - { - // - // We have a null body. This is reasonable. It should also be pointed out that - // pn_data_next call shouldn't be thought of as checking for the existence of something. - // It should be thought of as actually MOVING through a tree structure in the message. - // - sizeAndData.size = 0; - sizeAndData.start = NULL; - } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_055: [If the body is NOT null then processMessage will check that the body is of type PN_BINARY.]*/ - if (PN_BINARY != pn_data_type(body)) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_056: [processMessage will abandon the message with body types that are NOT PN_BINARY, the IOTHUB_MESSAGE_HANDLE will be destroyed and the loop will be continued.]*/ - LogError("Message received via AMQP was not PN_BINARY\r\n"); - sizeAndData.size = 0; // Compilers get upset. - sizeAndData.start = NULL; - pn_messenger_release(transportState->messenger, tracker, 0); - pn_messenger_settle(transportState->messenger, tracker, 0); - keepProcessingThisMessage = false; - } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_057: [The length and a pointer to the payload will be acquired and saved by invoking pn_data_get_binary.]*/ - sizeAndData = pn_data_get_binary(body); - } - } - if (keepProcessingThisMessage == true) - { - IOTHUB_MESSAGE_HANDLE ioTMessage; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_058: [The IOTHUB_MESSAGE_HANDLE will be set by invoking IotHubMessage_SetData with the previously saved length and pointer to payload.]*/ - if ((ioTMessage = IoTHubMessage_CreateFromByteArray((const unsigned char*)sizeAndData.start, sizeAndData.size)) == NULL) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_059: [processMessage will abandon the message if IotHubMessage_SetData returns any status other than IOTHUB_MESSAGE_OK.]*/ - LogError("Failed to set data for IoT hub message\r\n"); - pn_messenger_release(transportState->messenger, tracker, 0); - pn_messenger_settle(transportState->messenger, tracker, 0); - } - else if (cloneProperties(ioTMessage, transportState->message) != 0) - { - LogError("Failed to clone properties for IoT hub message\r\n"); - pn_messenger_release(transportState->messenger, tracker, 0); - pn_messenger_settle(transportState->messenger, tracker, 0); - } - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_07_003: [If the pn_message_get_id value is not NULL then the value will be set by calling IotHubMessage_SetMessageId.] */ - else if (setRecvMessageIds(ioTMessage, transportState->message) != 0) - { - LogError("Failed to set MessageId for IoT hub message\r\n"); - pn_messenger_release(transportState->messenger, tracker, 0); - pn_messenger_settle(transportState->messenger, tracker, 0); - } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_060: [If IOTHUB_MESSAGE_OK had been returned IoTHubClient_LL_MessageCallback will be invoked with the IOTHUB_MESSAGE_HANDLE.]*/ - IOTHUBMESSAGE_DISPOSITION_RESULT upperLayerDisposition = IoTHubClient_LL_MessageCallback(transportState->savedClientHandle, ioTMessage); - - if (upperLayerDisposition == IOTHUBMESSAGE_ACCEPTED) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_006: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ACCEPTED value then the message will be accepted.] */ - pn_messenger_accept(transportState->messenger, tracker, 0); - } - else if (upperLayerDisposition == IOTHUBMESSAGE_ABANDONED) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_007: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ABANDONED value then the message will be abandoned.] */ - pn_messenger_release(transportState->messenger, tracker, 0); - } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_061: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_REJECTED value then the message will be rejected.]*/ - pn_messenger_reject(transportState->messenger, tracker, 0); - } - pn_messenger_settle(transportState->messenger, tracker, 0); - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_063: [processMessage will destroy the IOTHUB_MESSAGE_HANDLE by invoking IoTHubMessage_Destroy.]*/ - IoTHubMessage_Destroy(ioTMessage); - } - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_005: [If parameter config (or its fields) is NULL then IoTHubTransportAMQP_Create shall fail and return NULL.] + if (config == NULL || config->upperConfig == NULL || config->waitingToSend == NULL) + { + LogError("IoTHub AMQP client transport null configuration parameter.\r\n"); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_006: [IoTHubTransportAMQP_Create shall fail and return NULL if any fields of the config structure are NULL.] + else if (config->upperConfig->protocol == NULL) + { + LogError("Invalid configuration (NULL protocol detected)\r\n"); + } + else if (config->upperConfig->deviceId == NULL) + { + LogError("Invalid configuration (NULL deviceId detected)\r\n"); + } + else if (config->upperConfig->deviceKey == NULL) + { + LogError("Invalid configuration (NULL deviceKey detected)\r\n"); + } + else if (config->upperConfig->iotHubName == NULL) + { + LogError("Invalid configuration (NULL iotHubName detected)\r\n"); } -} - -static void processReceives(TRANSPORT_HANDLE handle) -{ - PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; - int receiveResult; - - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_043: [If DoWork_PullMessage is false then DoWork will cancel any preceding messages by invoking pn_messenger_recv.]*/ - - if ((transportState->DoWork_PullMessages == false) && (transportState->waitingForPutTokenReply == false)) + else if (config->upperConfig->iotHubSuffix == NULL) + { + LogError("Invalid configuration (NULL iotHubSuffix detected)\r\n"); + } + else if (!config->waitingToSend) { - if ((receiveResult = pn_messenger_recv(transportState->messenger, 0)) != 0) - { - if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT)) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/ - LogError("Error attempting to receive messages: %d\r\n", receiveResult); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/ - transportState->messengerInitialized = false; - } - } + LogError("Invalid configuration (NULL waitingToSend list detected)\r\n"); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_008: [IoTHubTransportAMQP_Create shall fail and return NULL if any config field of type string is zero length.] + else if ((deviceIdLength = strlen(config->upperConfig->deviceId)) == 0 || + (strlen(config->upperConfig->deviceKey) == 0) || + (strlen(config->upperConfig->iotHubName) == 0) || + (strlen(config->upperConfig->iotHubSuffix) == 0)) + { + LogError("Zero-length config parameter (deviceId, deviceKey, iotHubName or iotHubSuffix)\r\n"); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_007: [IoTHubTransportAMQP_Create shall fail and return NULL if the deviceId length is greater than 128.] + else if (deviceIdLength > 128U) + { + LogError("deviceId is too long\r\n"); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_134: [IoTHubTransportAMQP_Create shall fail and return NULL if the combined length of config->iotHubName and config->iotHubSuffix exceeds 254 bytes (RFC1035)] + else if ((strlen(config->upperConfig->iotHubName) + strlen(config->upperConfig->iotHubSuffix)) > (RFC1035_MAX_FQDN_LENGTH - 1)) + { + LogError("The lengths of iotHubName and iotHubSuffix together exceed the maximum FQDN length allowed (RFC 1035)\r\n"); } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_044: [If the DoWork_PullMessage or the waitingForPutTokenReply flag is true then processReceives will pull messages with pn_messenger_recv.]*/ - if ((receiveResult = pn_messenger_recv(transportState->messenger, PROTON_INCOMING_WINDOW_SIZE)) != 0) - { - if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT)) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/ - LogError("Error attempting to receive messages: %d\r\n", receiveResult); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/ - transportState->messengerInitialized = false; - } - } - else - { - // - // If any are present we need to dispose of messages in the incoming queue. - // - size_t depthOfReceiveQueue; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_045: [processReceives will check the number of messages received by invoking pn_messenger_incoming.]*/ - for (depthOfReceiveQueue = pn_messenger_incoming(transportState->messenger); depthOfReceiveQueue > 0; depthOfReceiveQueue--) - { - int result; - pn_message_clear(transportState->message); - // - // Ok something is there. First thing is to get it. - // - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_047: [processReceives will retrieve a message with pn_messenger_get.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_048: [If pn_messenger_get fails we log and break the receive loop.]*/ - if ((result = pn_messenger_get(transportState->messenger, transportState->message)) != 0) - { - // - // Well that's pretty odd. We shouldn't have gotten here if the message queue was empty! - // - LogError("Message reception queue unexpectedly empty! Code: %d\r\n", result); - break; - } - else - { - // - // We'll need a tracker to dispose of the message. We need the subscription to determine who should process it. - // - pn_subscription_t* theMessageSource; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_049: [processReceives shall acquire a tracker by invoking pn_messenger_incoming_tracker.]*/ - pn_tracker_t tracker = pn_messenger_incoming_tracker(transportState->messenger); - - // - // Currently we can receive two types of messages. We can recieve status messages from the $CBS that indicates the success - // or failure of renewing the SAS for the device. - // - // Otherwise we can recieve a simple message. We can get a pointer to the subscription from the tracker. We can compare - // that pointer to the subscriptions that we stored when we initialized the messenger. We then dispatch appropriately. - // - - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_106: [processReceives acquires the subscription of the message by invoking pn_messneger_incoming_subscription.] This value is henceforth known as theMessageSource.*/ - theMessageSource = pn_messenger_incoming_subscription(transportState->messenger); - if (theMessageSource == transportState->messageSubscription) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_108: [If theMessageSource is equal to the state variable cbsSubscription invoke processCBSReply.]*/ - processMessage(transportState, tracker); - } - else if (theMessageSource == transportState->cbsSubscription) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_108: [If theMessageSource is equal to the state variable cbsSubscription invoke processCBSReply.]*/ - processCBSReply(transportState, tracker); - } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_109: [Otherwise, log an error and abandon the message so that it never returns.]*/ - LogError("Message received from an unknown sender\r\n"); - pn_messenger_release(transportState->messenger, tracker, 0); - pn_messenger_settle(transportState->messenger, tracker, 0); - } - } - } - pn_message_clear(transportState->message); - } - } -} - -static void clientTransportAMQP_DoWork(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle) -{ - PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; - - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_019: [clientTransportAMQP_Dowork shall do no work if handle is NULL.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_020: [clientTransportAMQP_Dowork shall do no work if iotHubClientHandle is NULL.]*/ - if (transportState && iotHubClientHandle) - { - transportState->savedClientHandle = iotHubClientHandle; - if (protonMessengerInit(transportState)) - { - renewIfNecessaryTheCBS(transportState); - if (transportState->messengerInitialized == true) reclaimEventResources(transportState); - if (transportState->messengerInitialized == true) sendEvent(transportState); - if (transportState->messengerInitialized == true) processReceives(transportState); - - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_088: [pn_messenger_work shall be invoked following all the above actions.]*/ - if (transportState->messengerInitialized == true) checkForErrorsThenWork(transportState); - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_009: [IoTHubTransportAMQP_Create shall fail and return NULL if memory allocation of the transports internal state structure fails.] + transport_state = (AMQP_TRANSPORT_INSTANCE*)malloc(sizeof(AMQP_TRANSPORT_INSTANCE)); - // - // Try to get rid of any old messengers that we were NOT able to stop. - // - herdTheMessengers(transportState, false); - } - return; -} - -static bool createUrledDeviceId(PAMQP_TRANSPORT_STATE state, const IOTHUBTRANSPORT_CONFIG* config) -{ - return ((state->urledDeviceId = URL_EncodeString(config->upperConfig->deviceId)) == NULL) ? (false) : (true); -} -static bool createDevicesPortionPath(PAMQP_TRANSPORT_STATE state, const char* host) -{ - return (((state->devicesPortionPath = STRING_construct(host)) == NULL) || - (STRING_concat(state->devicesPortionPath, "/devices/") != 0) || - (STRING_concat_with_STRING(state->devicesPortionPath, state->urledDeviceId) != 0)) ? (false) : (true); -} -static bool createEventAddress(PAMQP_TRANSPORT_STATE state) -{ - return (((state->eventAddress = STRING_construct(AMQPS_SCHEME)) == NULL) || - (STRING_concat_with_STRING(state->eventAddress, state->devicesPortionPath) != 0) || - (STRING_concat(state->eventAddress, EVENT_ENDPOINT) != 0)) ? (false) : (true); -} - -static bool createMessageAddress(PAMQP_TRANSPORT_STATE state) -{ - return (((state->messageAddress = STRING_construct(AMQPS_SCHEME)) == NULL) || - (STRING_concat_with_STRING(state->messageAddress, state->devicesPortionPath) != 0) || - (STRING_concat(state->messageAddress, MESSAGE_ENDPOINT) != 0)) ? (false) : (true); -} - -static bool createCbsAddress(PAMQP_TRANSPORT_STATE state, const char* host) -{ - return (((state->cbsAddress = STRING_construct(AMQPS_SCHEME)) == NULL) || - (STRING_concat(state->cbsAddress,host) != 0) || - (STRING_concat(state->cbsAddress, CBS_ENDPOINT) != 0)) ? (false) : (true); -} - -static STRING_HANDLE createHost(const IOTHUBTRANSPORT_CONFIG* config) -{ - STRING_HANDLE result = NULL; - if (((result = STRING_construct(config->upperConfig->iotHubName)) == NULL) || - (STRING_concat(result, ".") != 0) || - (STRING_concat(result, config->upperConfig->iotHubSuffix) != 0)) - { - if (result != NULL) - { - STRING_delete(result); - result = NULL; - } - } - return result; -} - -static TRANSPORT_HANDLE clientTransportAMQP_Create(const IOTHUBTRANSPORT_CONFIG* config) -{ - PAMQP_TRANSPORT_STATE amqpState = NULL; - if (!config) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_001: [If parameter config is NULL then clientTransportAMQP_Create shall fail and return NULL.]*/ - LogError("IoTHub amqp client tranport null configuration parameter.\r\n"); - } - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_002: [clientTransportAMQP_Create shall fail and return NULL if any other fields of the config structure are NULL.]*/ - else if (config->upperConfig->protocol == NULL) - { - LogError("invalid configuration (NULL protocol detected)\r\n"); - } - else if (config->upperConfig->deviceId == NULL) - { - LogError("invalid configuration (NULL deviceId detected)\r\n"); - } - else if (config->upperConfig->deviceKey == NULL) - { - LogError("invalid configuration (NULL deviceKey detected)\r\n"); - } - else if (config->upperConfig->iotHubName == NULL) - { - LogError("invalid configuration (NULL iotHubName detected)\r\n"); - } - else if (config->upperConfig->iotHubSuffix == NULL) - { - LogError("invalid configuration (NULL iotHubSuffix detected)\r\n"); - } - else if (!config->waitingToSend) - { - LogError("invalid configuration (NULL waitingToSend list detected)\r\n"); - } - else if (strlen(config->upperConfig->deviceId) > 128U) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_016: [clientTransportAMQP_Create shall fail and return NULL if the deviceId length is greater than 128.]*/ - LogError("deviceName is too long\r\n"); - } - else if ((strlen(config->upperConfig->deviceId) == 0) || - (strlen(config->upperConfig->deviceKey) == 0) || - (strlen(config->upperConfig->iotHubName) == 0) || - (strlen(config->upperConfig->iotHubSuffix) == 0)) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_017: [clientTransportAMQP_Create shall fail and return NULL if any string config field is zero length.]*/ - LogError("zero length config parameter\r\n"); - } - else - { - amqpState = malloc(sizeof(AMQP_TRANSPORT_STATE)); - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/ - if (amqpState == NULL) + if (transport_state == NULL) { LogError("Could not allocate AMQP transport state\r\n"); } else { - amqpState->messageAddress = NULL; - amqpState->eventAddress = NULL; - amqpState->urledDeviceId = NULL; - amqpState->devicesPortionPath = NULL; - amqpState->cbsAddress = NULL; - amqpState->deviceKey = NULL; - amqpState->savedClientHandle = NULL; - amqpState->messenger = NULL; - amqpState->messageSubscription = NULL; - amqpState->cbsSubscription = NULL; - amqpState->zeroLengthString = NULL; - amqpState->messengerInitialized = false; - amqpState->waitingForPutTokenReply = false; - amqpState->cbsRequestAcceptTime = CBS_DEFAULT_REQUEST_ACCEPT_TIME; - amqpState->cbsReplyTime = CBS_DEFAULT_REPLY_TIME; - amqpState->sasTokenLifetime = SAS_TOKEN_DEFAULT_LIFETIME; - amqpState->sasRefreshLine = SAS_TOKEN_DEFAULT_REFRESH_LINE; - amqpState->eventTimeout = EVENT_TIMEOUT_DEFAULT; - amqpState->waitingToSend = config->waitingToSend; - DList_InitializeListHead(&amqpState->workInProgress); - DList_InitializeListHead(&amqpState->availableWorkItems); - DList_InitializeListHead(&amqpState->messengerCorral); - amqpState->DoWork_PullMessages = false; - amqpState->trustedCertificates = NULL; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/ - if ((amqpState->message = pn_message()) == NULL) + transport_state->iotHubHostFqdn = NULL; + transport_state->iotHubPort = DEFAULT_IOTHUB_AMQP_PORT; + transport_state->trusted_certificates = NULL; + transport_state->deviceKey = NULL; + transport_state->devicesPath = NULL; + transport_state->messageReceiveAddress = NULL; + transport_state->sasTokenKeyName = NULL; + transport_state->targetAddress = NULL; + transport_state->waitingToSend = config->waitingToSend; + + transport_state->cbs = NULL; + transport_state->cbs_state = CBS_STATE_IDLE; + transport_state->current_sas_token_create_time = 0; + transport_state->connection = NULL; + transport_state->connection_state = AMQP_MANAGEMENT_STATE_IDLE; + transport_state->connection_establish_time = 0; + transport_state->iothub_client_handle = NULL; + transport_state->receive_messages = false; + transport_state->message_receiver = NULL; + transport_state->message_sender = NULL; + transport_state->receiver_link = NULL; + transport_state->sasl_io = NULL; + transport_state->sasl_mechanism = NULL; + transport_state->sender_link = NULL; + transport_state->session = NULL; + transport_state->tls_io = NULL; + transport_state->tls_io_transport_provider = getTLSIOTransport; + + transport_state->waitingToSend = config->waitingToSend; + DList_InitializeListHead(&transport_state->inProgress); + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_010: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as iotHubHostFqdn, from the following pieces: config->iotHubName + "." + config->iotHubSuffix.] + if ((transport_state->iotHubHostFqdn = concat3Params(config->upperConfig->iotHubName, ".", config->upperConfig->iotHubSuffix)) == NULL) { - LogError("Unable to preallocate the proton message!\r\n"); - clientTransportAMQP_Destroy(amqpState); - amqpState = NULL; + LogError("Failed to set transport_state->iotHubHostFqdn.\r\n"); + cleanup_required = true; } - else + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_012: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as devicesPath, from the following parts: host_fqdn + /devices/ + deviceId.] + else if ((transport_state->devicesPath = concat3Params(STRING_c_str(transport_state->iotHubHostFqdn), "/devices/", config->upperConfig->deviceId)) == NULL) { - size_t workItemsToPreallocate; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/ - for (workItemsToPreallocate = PROTON_EVENT_OUTGOING_WINDOW_SIZE; workItemsToPreallocate > 0; workItemsToPreallocate--) - { - PAMQP_WORK_ITEM currentItem = malloc(sizeof(AMQP_WORK_ITEM)); - if (!currentItem) - { - // - // Not a particularly good sign. Fail the create. - // - LogError("Unable to preallocate work item!\r\n"); - clientTransportAMQP_Destroy(amqpState); - amqpState = NULL; - break; - } - else - { - DList_InsertTailList(&amqpState->availableWorkItems, ¤tItem->link); - } - } - if (amqpState != NULL) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_095: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as host, from the following pieces: "config->iotHubName + "." + config->iotHubSuffix.]*/ - STRING_HANDLE host = createHost(config); - if (host != NULL) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_091: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as urledDeviceId, by url encoding the config->deviceId.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_093: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as devicePortionOfPath, from the following pieces: host + "/devices/" + urledDeviceId.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_097: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as eventAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/events".]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_099: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as messageAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/devicebound".]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_101: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as cbsAddress, from the following pieces: "amqps://" + host + "/$cbs".]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_103: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as deviceKey, from the config->deviceKey.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_186: [An empty string shall be created. If this fails then clientTransportAMQP shall fail and return NULL.]*/ - if ((!createUrledDeviceId(amqpState, config)) || - (!createDevicesPortionPath(amqpState, STRING_c_str(host))) || - (!createEventAddress(amqpState)) || - (!createMessageAddress(amqpState)) || - (!createCbsAddress(amqpState, STRING_c_str(host))) || - ((amqpState->deviceKey = STRING_construct(config->upperConfig->deviceKey)) == NULL) || - ((amqpState->zeroLengthString = STRING_new()) == NULL)) - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_092: [If creating the urledDeviceId fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_094: [If creating the devicePortionOfPath fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_098: [If creating the eventAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_100: [If creating the messageAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_102: [If creating the cbsAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_104: [If creating the deviceKey fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ - clientTransportAMQP_Destroy(amqpState); - amqpState = NULL; - } - STRING_delete(host); - } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_096: [If creating the schemeAndHost fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/ - clientTransportAMQP_Destroy(amqpState); - amqpState = NULL; - } - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_013: [If creating devicesPath fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.] + LogError("Failed to allocate transport_state->devicesPath.\r\n"); + cleanup_required = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_014: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as targetAddress, from the following parts: "amqps://" + devicesPath + "/messages/events".] + else if ((transport_state->targetAddress = concat3Params("amqps://", STRING_c_str(transport_state->devicesPath), "/messages/events")) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_015: [If creating the targetAddress fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.] + LogError("Failed to allocate transport_state->targetAddress.\r\n"); + cleanup_required = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_053: [IoTHubTransportAMQP_Create shall define the source address for receiving messages as "amqps://" + devicesPath + "/messages/devicebound", stored in the transport handle as messageReceiveAddress] + else if ((transport_state->messageReceiveAddress = concat3Params("amqps://", STRING_c_str(transport_state->devicesPath), "/messages/devicebound")) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_054: [If creating the messageReceiveAddress fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.] + LogError("Failed to allocate transport_state->messageReceiveAddress.\r\n"); + cleanup_required = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_016: [IoTHubTransportAMQP_Create shall initialize handle->sasTokenKeyName with a zero-length STRING_HANDLE instance.] + else if ((transport_state->sasTokenKeyName = STRING_new()) == NULL) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_017: [If IoTHubTransportAMQP_Create fails to initialize handle->sasTokenKeyName with a zero-length STRING the function shall fail and return NULL.] + LogError("Failed to allocate transport_state->sasTokenKeyName.\r\n"); + cleanup_required = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transports own deviceKey field] + else if ((transport_state->deviceKey = STRING_new()) == NULL || + STRING_copy(transport_state->deviceKey, config->upperConfig->deviceKey) != 0) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_019: [If IoTHubTransportAMQP_Create fails to copy config->deviceKey, the function shall fail and return NULL.] + LogError("Failed to allocate transport_state->deviceKey.\r\n"); + cleanup_required = true; + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_020: [IoTHubTransportAMQP_Create shall set parameter transport_state->sas_token_lifetime with the default value of 3600000 (milliseconds).] + transport_state->sas_token_lifetime = DEFAULT_SAS_TOKEN_LIFETIME_MS; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_128: [IoTHubTransportAMQP_Create shall set parameter transport_state->sas_token_refresh_time with the default value of sas_token_lifetime/2 (milliseconds).] + transport_state->sas_token_refresh_time = transport_state->sas_token_lifetime / 2; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_129 : [IoTHubTransportAMQP_Create shall set parameter transport_state->cbs_request_timeout with the default value of 30000 (milliseconds).] + transport_state->cbs_request_timeout = DEFAULT_CBS_REQUEST_TIMEOUT_MS; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_130 : [IoTHubTransportAMQP_Create shall set parameter transport_state->message_send_timeout with the default value of 300000 (milliseconds).] + transport_state->message_send_timeout = DEFAULT_MESSAGE_SEND_TIMEOUT_MS; } } } - return amqpState; + + if (cleanup_required) + { + if (transport_state->deviceKey != NULL) + STRING_delete(transport_state->deviceKey); + if (transport_state->sasTokenKeyName != NULL) + STRING_delete(transport_state->sasTokenKeyName); + if (transport_state->targetAddress != NULL) + STRING_delete(transport_state->targetAddress); + if (transport_state->messageReceiveAddress != NULL) + STRING_delete(transport_state->messageReceiveAddress); + if (transport_state->devicesPath != NULL) + STRING_delete(transport_state->devicesPath); + if (transport_state->iotHubHostFqdn != NULL) + STRING_delete(transport_state->iotHubHostFqdn); + + free(transport_state); + transport_state = NULL; + } + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_023: [If IoTHubTransportAMQP_Create succeeds it shall return a non-NULL pointer to the structure that represents the transport.] + return transport_state; } -static int clientTransportAMQP_Subscribe(TRANSPORT_HANDLE handle) +static void IoTHubTransportAMQP_Destroy(TRANSPORT_HANDLE handle) { - int result; + if (handle != NULL) + { + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_024: [IoTHubTransportAMQP_Destroy shall destroy the AMQP message_sender.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_029 : [IoTHubTransportAMQP_Destroy shall destroy the AMQP link.] + destroyEventSender(transport_state); + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_025: [IoTHubTransportAMQP_Destroy shall destroy the AMQP message_receiver.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_029 : [IoTHubTransportAMQP_Destroy shall destroy the AMQP link.] + destroyMessageReceiver(transport_state); + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_027 : [IoTHubTransportAMQP_Destroy shall destroy the AMQP cbs instance] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_030 : [IoTHubTransportAMQP_Destroy shall destroy the AMQP session.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_031 : [IoTHubTransportAMQP_Destroy shall destroy the AMQP connection.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_032 : [IoTHubTransportAMQP_Destroy shall destroy the AMQP SASL I / O transport.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_033 : [IoTHubTransportAMQP_Destroy shall destroy the AMQP SASL mechanism.] + destroyConnection(transport_state); + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_035 : [IoTHubTransportAMQP_Destroy shall delete its internally - set parameters(deviceKey, targetAddress, devicesPath, sasTokenKeyName).] + STRING_delete(transport_state->targetAddress); + STRING_delete(transport_state->messageReceiveAddress); + STRING_delete(transport_state->sasTokenKeyName); + STRING_delete(transport_state->deviceKey); + STRING_delete(transport_state->devicesPath); + STRING_delete(transport_state->iotHubHostFqdn); + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_036 : [IoTHubTransportAMQP_Destroy shall return the remaining items in inProgress to waitingToSend list.] + rollEventsBackToWaitList(transport_state); + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_150: [IoTHubTransportAMQP_Destroy shall destroy the transport instance] + free(transport_state); + } +} + +static void IoTHubTransportAMQP_DoWork(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle) +{ + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_051: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the transport handle parameter is NULL] if (handle == NULL) { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_013: [If the parameter handle is NULL then clientTransportAMQP_Subscribe shall fail and return a non-zero value.]*/ - LogError("Invalid handle to IoTHubClient amqp subscribe.\r\n"); - result = 1; + LogError("IoTHubClient DoWork failed: transport handle parameter is NULL.\r\n"); + } + // Codes_[IoTHubTransportAMQP_DoWork shall fail and return immediately if the client handle parameter is NULL] + else if (iotHubClientHandle == NULL) + { + LogError("IoTHubClient DoWork failed: client handle parameter is NULL.\r\n"); } else { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_014: [clientTransportAMQP_Subscribe shall succeed and return a value of zero.]*/ - PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; - transportState->DoWork_PullMessages = true; + bool trigger_connection_retry = false; + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_147: [IoTHubTransportAMQP_DoWork shall save a reference to the client handle in transport_state->iothub_client_handle] + transport_state->iothub_client_handle = iotHubClientHandle; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_055: [If the transport handle has a NULL connection, IoTHubTransportAMQP_DoWork shall instantiate and initialize the AMQP components and establish the connection] + if (transport_state->connection == NULL && + establishConnection(transport_state) != RESULT_OK) + { + LogError("AMQP transport failed to establish connection with service.\r\n"); + trigger_connection_retry = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_081: [IoTHubTransportAMQP_DoWork shall put a new SAS token if the one has not been out already, or if the previous one failed to be put due to timeout of cbs_put_token().] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than sas_token_refresh_time milliseconds] + else if ((transport_state->cbs_state == CBS_STATE_IDLE || isSasTokenRefreshRequired(transport_state)) && + startAuthentication(transport_state) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_146: [If the SAS token fails to be sent to CBS (cbs_put_token), IoTHubTransportAMQP_DoWork shall fail and exit immediately] + LogError("Failed authenticating AMQP connection within CBS.\r\n"); + trigger_connection_retry = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for cbs_request_timeout milliseconds for the cbs_put_token() to complete before failing due to timeout] + else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS && + verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT) + { + LogError("AMQP transport authentication timed out.\r\n"); + trigger_connection_retry = true; + } + else if (transport_state->cbs_state == CBS_STATE_AUTHENTICATED) + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_121: [IoTHubTransportAMQP_DoWork shall create an AMQP message_receiver if transport_state->message_receive is NULL and transport_state->receive_messages is true] + if (transport_state->receive_messages == true && + transport_state->message_receiver == NULL && + createMessageReceiver(transport_state, iotHubClientHandle) != RESULT_OK) + { + LogError("Failed creating AMQP transport message receiver.\r\n"); + trigger_connection_retry = true; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_122: [IoTHubTransportAMQP_DoWork shall destroy the transport_state->message_receiver (and set it to NULL) if it exists and transport_state->receive_messages is false] + else if (transport_state->receive_messages == false && + transport_state->message_receiver != NULL && + destroyMessageReceiver(transport_state) != RESULT_OK) + { + LogError("Failed destroying AMQP transport message receiver.\r\n"); + } + + if (transport_state->message_sender == NULL && + createEventSender(transport_state) != RESULT_OK) + { + LogError("Failed creating AMQP transport event sender.\r\n"); + trigger_connection_retry = true; + } + else if (sendPendingEvents(transport_state) != RESULT_OK) + { + LogError("AMQP transport failed sending events.\r\n"); + trigger_connection_retry = true; + } + } + + if (trigger_connection_retry) + { + prepareForConnectionRetry(transport_state); + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_103: [IoTHubTransportAMQP_DoWork shall invoke connection_dowork() on AMQP for triggering sending and receiving messages] + connection_dowork(transport_state->connection); + } + + handleEventSendTimeouts(transport_state); + } +} + +static int IoTHubTransportAMQP_Subscribe(TRANSPORT_HANDLE handle) +{ + int result; + + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_037: [IoTHubTransportAMQP_Subscribe shall fail if the transport handle parameter received is NULL.] + if (handle == NULL) + { + LogError("Invalid handle to IoTHubClient AMQP transport.\r\n"); + result = __LINE__; + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_038: [IoTHubTransportAMQP_Subscribe shall set transport_handle->receive_messages to true and return success code.] + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; + transport_state->receive_messages = true; result = 0; } + return result; } -static void clientTransportAMQP_Unsubscribe(TRANSPORT_HANDLE handle) +static void IoTHubTransportAMQP_Unsubscribe(TRANSPORT_HANDLE handle) { - PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_015: [clientTransportAMQP_Unsubscribe shall do nothing if handle is NULL.]*/ - if (transportState) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_039: [IoTHubTransportAMQP_Unsubscribe shall fail if the transport handle parameter received is NULL.] + if (handle == NULL) { - transportState->DoWork_PullMessages = false; + LogError("Invalid handle to IoTHubClient AMQP transport.\r\n"); + } + else + { + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_040: [IoTHubTransportAMQP_Unsubscribe shall set transport_handle->receive_messages to false and return success code.] + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; + transport_state->receive_messages = false; } } -static IOTHUB_CLIENT_RESULT clientTransportAMQP_GetSendStatus(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus) +static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_GetSendStatus(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus) { IOTHUB_CLIENT_RESULT result; - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_001: [clientTransportAMQP_GetSendStatus IoTHubTransportHttp_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter] */ + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_041: [IoTHubTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] if (handle == NULL) { result = IOTHUB_CLIENT_INVALID_ARG; @@ -1743,14 +1243,14 @@ } else { - PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle; + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_002: [clientTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent] */ - if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->workInProgress))) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_043: [IoTHubTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent.] + if (!DList_IsListEmpty(transport_state->waitingToSend) || !DList_IsListEmpty(&(transport_state->inProgress))) { *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY; } - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_09_003: [clientTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent] */ + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_042: [IoTHubTransportAMQP_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent.] else { *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE; @@ -1762,13 +1262,13 @@ return result; } -static IOTHUB_CLIENT_RESULT clientTransportAMQP_SetOption(TRANSPORT_HANDLE handle, const char* option, const void* value) +static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_SetOption(TRANSPORT_HANDLE handle, const char* option, const void* value) { IOTHUB_CLIENT_RESULT result; - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_001: [If handle parameter is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_002: [If parameter optionName is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */ - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_003: [If parameter value is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */ + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_044: [If handle parameter is NULL then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_045: [If parameter optionName is NULL then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_046: [If parameter value is NULL then IoTHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] if ( (handle == NULL) || (option == NULL) || @@ -1776,50 +1276,63 @@ ) { result = IOTHUB_CLIENT_INVALID_ARG; - LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption\r\n"); + LogError("Invalid parameter (NULL) passed to AMQP transport SetOption()\r\n"); } else { - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_02_005: [clientTransportAMQP_SetOption shall set the option "optionName" to *value.] */ - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_003: ["TrustedCerts"] */ - if (strcmp("TrustedCerts", option) == 0) - { - PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle; - char* newTrsutedCertificates; + AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle; - if (mallocAndStrcpy_s(&newTrsutedCertificates, (const char*)value) != 0) - { - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_006: [If any other error occurs while setting the option, clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */ - result = IOTHUB_CLIENT_ERROR; - } - else - { - /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_004: [Sets a string that should be used as trusted certificates by the transport, freeing any previous TrustedCerts option value.] */ - if (handleData->trustedCertificates != NULL) - { - free(handleData->trustedCertificates); - } - - handleData->trustedCertificates = newTrsutedCertificates; - result = IOTHUB_CLIENT_OK; - } - } - else - { - /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_004: [If optionName is not an option supported then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.]*/ - result = IOTHUB_CLIENT_INVALID_ARG; - } - } + if (strcmp("trusted_certificates", option) == 0) + { + transport_state->trusted_certificates = (char*)value; + result = IOTHUB_CLIENT_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] + else if (strcmp("sas_token_lifetime", option) == 0) + { + transport_state->sas_token_lifetime = *((size_t*)value); + result = IOTHUB_CLIENT_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK] + else if (strcmp("sas_token_refresh_time", option) == 0) + { + transport_state->sas_token_refresh_time = *((size_t*)value); + result = IOTHUB_CLIENT_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK] + else if (strcmp("cbs_request_timeout", option) == 0) + { + transport_state->cbs_request_timeout = *((size_t*)value); + result = IOTHUB_CLIENT_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_149: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "message_send_timeout", returning IOTHUB_CLIENT_OK] + else if (strcmp("message_send_timeout", option) == 0) + { + transport_state->message_send_timeout = *((size_t*)value); + result = IOTHUB_CLIENT_OK; + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If optionName is not an option supported then IotHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] + else + { + result = IOTHUB_CLIENT_INVALID_ARG; + LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option); + } + } return result; } -static TRANSPORT_PROVIDER myfunc = { - clientTransportAMQP_SetOption, - clientTransportAMQP_Create, clientTransportAMQP_Destroy, clientTransportAMQP_Subscribe, clientTransportAMQP_Unsubscribe, clientTransportAMQP_DoWork, clientTransportAMQP_GetSendStatus +static TRANSPORT_PROVIDER thisTransportProvider = { + IoTHubTransportAMQP_SetOption, + IoTHubTransportAMQP_Create, + IoTHubTransportAMQP_Destroy, + IoTHubTransportAMQP_Subscribe, + IoTHubTransportAMQP_Unsubscribe, + IoTHubTransportAMQP_DoWork, + IoTHubTransportAMQP_GetSendStatus }; extern const void* AMQP_Protocol(void) { - return &myfunc; + return &thisTransportProvider; }