Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransportamqp.c
- Revision:
- 12:841a4c36bd36
- Parent:
- 11:62d7b956e76e
- Child:
- 13:a4af7c301e02
--- a/iothubtransportamqp.c Fri Mar 25 15:59:44 2016 -0700 +++ b/iothubtransportamqp.c Fri Apr 08 13:23:54 2016 -0700 @@ -8,27 +8,25 @@ #include <stdint.h> #include <time.h> #include <limits.h> -#include "gballoc.h" +#include "azure_c_shared_utility/gballoc.h" +#include "azure_c_shared_utility/crt_abstractions.h" +#include "azure_c_shared_utility/doublylinkedlist.h" +#include "azure_c_shared_utility/iot_logging.h" +#include "azure_c_shared_utility/platform.h" +#include "azure_c_shared_utility/sastoken.h" +#include "azure_c_shared_utility/strings.h" +#include "azure_c_shared_utility/urlencode.h" +#include "azure_c_shared_utility/tlsio.h" -#include "cbs.h" -#include "link.h" -#include "message.h" -#include "amqpvalue.h" -#include "message_receiver.h" -#include "message_sender.h" -#include "messaging.h" -#include "sasl_mssbcbs.h" -#include "saslclientio.h" - -#include "crt_abstractions.h" -#include "doublylinkedlist.h" -#include "iot_logging.h" -#include "platform.h" -#include "sastoken.h" -#include "strings.h" -#include "urlencode.h" - -#include "tlsio.h" +#include "azure_uamqp_c/cbs.h" +#include "azure_uamqp_c/link.h" +#include "azure_uamqp_c/message.h" +#include "azure_uamqp_c/amqpvalue.h" +#include "azure_uamqp_c/message_receiver.h" +#include "azure_uamqp_c/message_sender.h" +#include "azure_uamqp_c/messaging.h" +#include "azure_uamqp_c/sasl_mssbcbs.h" +#include "azure_uamqp_c/saslclientio.h" #include "iothub_client_ll.h" #include "iothub_client_private.h" @@ -43,7 +41,6 @@ #define DEFAULT_IOTHUB_AMQP_PORT 5671 #define DEFAULT_SAS_TOKEN_LIFETIME_MS 3600000 #define DEFAULT_CBS_REQUEST_TIMEOUT_MS 30000 -#define DEFAULT_MESSAGE_SEND_TIMEOUT_MS 300000 #define CBS_AUDIENCE "servicebus.windows.net:sastoken" #define DEFAULT_CONTAINER_ID "default_container_id" #define DEFAULT_INCOMING_WINDOW_SIZE UINT_MAX @@ -86,8 +83,6 @@ size_t sas_token_refresh_time; // Maximum time the transport waits for uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds. size_t cbs_request_timeout; - // Maximum time the transport waits for an event to be sent before marking it a failure, in milliseconds. - size_t message_send_timeout; // Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds. size_t connection_timeout; // Saved reference to the IoTHub LL Client. @@ -133,14 +128,6 @@ bool isRegistered; } AMQP_TRANSPORT_INSTANCE; -// This structure is used to track an event being sent and the time it was sent. -typedef struct EVENT_TRACKER_TAG -{ - IOTHUB_CLIENT_LL_HANDLE iothub_client_handle; - IOTHUB_MESSAGE_LIST* message; - time_t time_sent; - DLIST_ENTRY entry; -} EVENT_TRACKER; // Auxiliary functions @@ -172,27 +159,10 @@ return (size_t)(difftime(get_time(NULL), (time_t)0)); } -static EVENT_TRACKER* trackEvent(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state) +static void trackEventInProgress(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state) { - EVENT_TRACKER* event_tracker; - - if ((event_tracker = (EVENT_TRACKER*)malloc(sizeof(EVENT_TRACKER))) != NULL) - { - event_tracker->iothub_client_handle = transport_state->iothub_client_handle; - event_tracker->message = message; - event_tracker->time_sent = get_time(NULL); - DList_InitializeListHead(&event_tracker->entry); - - DList_RemoveEntryList(&message->entry); - DList_InsertTailList(&transport_state->inProgress, &event_tracker->entry); - } - - return event_tracker; -} - -static void destroyEventTracker(EVENT_TRACKER* event_tracker) -{ - free(event_tracker); + DList_RemoveEntryList(&message->entry); + DList_InsertTailList(&transport_state->inProgress, &message->entry); } static IOTHUB_MESSAGE_LIST* getNextEventToSend(AMQP_TRANSPORT_INSTANCE* transport_state) @@ -212,22 +182,21 @@ return message; } -static int isEventInInProgressList(EVENT_TRACKER* event_tracker) +static int isEventInInProgressList(IOTHUB_MESSAGE_LIST* message) { - return !DList_IsListEmpty(&event_tracker->entry); + return !DList_IsListEmpty(&message->entry); } -static void removeEventFromInProgressList(EVENT_TRACKER* event_tracker) +static void removeEventFromInProgressList(IOTHUB_MESSAGE_LIST* message) { - DList_RemoveEntryList(&event_tracker->entry); - DList_InitializeListHead(&event_tracker->entry); + DList_RemoveEntryList(&message->entry); + DList_InitializeListHead(&message->entry); } -static void rollEventBackToWaitList(EVENT_TRACKER* event_tracker, AMQP_TRANSPORT_INSTANCE* transport_state) +static void rollEventBackToWaitList(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state) { - removeEventFromInProgressList(event_tracker); - DList_InsertTailList(transport_state->waitingToSend, &event_tracker->message->entry); - destroyEventTracker(event_tracker); + removeEventFromInProgressList(message); + DList_InsertTailList(transport_state->waitingToSend, &message->entry); } static void rollEventsBackToWaitList(AMQP_TRANSPORT_INSTANCE* transport_state) @@ -236,15 +205,15 @@ while (entry != &transport_state->inProgress) { - EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry); + IOTHUB_MESSAGE_LIST* message = containingRecord(entry, IOTHUB_MESSAGE_LIST, entry); entry = entry->Blink; - rollEventBackToWaitList(event_tracker, transport_state); + rollEventBackToWaitList(message, transport_state); } } static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result) { - EVENT_TRACKER* event_tracker = (EVENT_TRACKER*)context; + IOTHUB_MESSAGE_LIST* message = (IOTHUB_MESSAGE_LIST*)context; IOTHUB_CLIENT_RESULT iot_hub_send_result; @@ -254,35 +223,35 @@ iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_143: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR] - else if (send_result == MESSAGE_SEND_ERROR) + else { iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR; } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_102: [The callback 'on_message_send_complete' shall invoke the upper layer callback for message received if provided] - if (event_tracker->message->callback != NULL) + if (message->callback != NULL) { - event_tracker->message->callback(iot_hub_send_result, event_tracker->message->context); + message->callback(iot_hub_send_result, message->context); } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback 'on_message_send_complete' shall remove the target message from the in-progress list after the upper layer callback] + if (isEventInInProgressList(message)) + { + removeEventFromInProgressList(message); + } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_151: [The callback 'on_message_send_complete' shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()] - IoTHubMessage_Destroy(event_tracker->message->messageHandle); - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback 'on_message_send_complete' shall destroy the IOTHUB_MESSAGE_LIST instance] - free(event_tracker->message); + IoTHubMessage_Destroy(message->messageHandle); - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback 'on_message_send_complete' shall remove the target message from the in-progress list after the upper layer callback] - if (isEventInInProgressList(event_tracker)) - { - removeEventFromInProgressList(event_tracker); - destroyEventTracker(event_tracker); - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback 'on_message_send_complete' shall destroy the IOTHUB_MESSAGE_LIST instance] + free(message); } static void on_put_token_complete(void* context, CBS_OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description) { AMQP_TRANSPORT_INSTANCE* transportState = (AMQP_TRANSPORT_INSTANCE*)context; - if (operation_result == OPERATION_RESULT_OK) + if (operation_result == CBS_OPERATION_RESULT_OK) { transportState->cbs_state = CBS_STATE_AUTHENTICATED; } @@ -535,26 +504,6 @@ return ((getSecondsSinceEpoch() - transport_state->current_sas_token_create_time) * 1000 >= transport_state->cbs_request_timeout) ? RESULT_TIMEOUT : RESULT_OK; } -static void handleEventSendTimeouts(AMQP_TRANSPORT_INSTANCE* transport_state) -{ - time_t current_time = get_time(NULL); - PDLIST_ENTRY entry = transport_state->inProgress.Flink; - - while (entry != &transport_state->inProgress) - { - EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry); - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_085: [IoTHubTransportAMQP_DoWork shall attempt to send all the queued messages for up to 'message_send_timeout' milliseconds] - if (difftime(current_time, event_tracker->time_sent) * 1000 >= transport_state->message_send_timeout) - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_120: [If a 'message_send_timeout' occurs the timed out events removed from the inProgress and the upper layer notified of the send error] - on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR); - } - - entry = entry->Flink; - } -} - static void attachDeviceClientTypeToLink(LINK_HANDLE link) { fields attach_properties; @@ -897,17 +846,14 @@ IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message->messageHandle); const unsigned char* messageContent; size_t messageContentSize; - MESSAGE_HANDLE amqp_message; + MESSAGE_HANDLE amqp_message = NULL; bool is_message_error = false; - EVENT_TRACKER* event_tracker; // Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an "in-progress" list right before processing them for sending] - if ((event_tracker = trackEvent(message, transport_state)) == NULL) - { - LogError("Failed tracking the event to be sent.\r\n"); - } + trackEventInProgress(message, transport_state); + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_087: [If the event contains a message of type IOTHUBMESSAGE_BYTEARRAY, IoTHubTransportAMQP_DoWork shall obtain its char* representation and size using IoTHubMessage_GetByteArray()] - else if (contentType == IOTHUBMESSAGE_BYTEARRAY && + if (contentType == IOTHUBMESSAGE_BYTEARRAY && IoTHubMessage_GetByteArray(message->messageHandle, &messageContent, &messageContentSize) != IOTHUB_MESSAGE_OK) { LogError("Failed getting the BYTE array representation of the event content to be sent.\r\n"); @@ -959,7 +905,7 @@ else { // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] - if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, event_tracker) != RESULT_OK) + if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, message) != RESULT_OK) { LogError("Failed sending the AMQP message.\r\n"); } @@ -979,26 +925,19 @@ if (result != RESULT_OK) { - if (event_tracker == NULL) + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_088: [If IoTHubMessage_GetByteArray() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_091: [If IoTHubMessage_GetString() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] + if (is_message_error) { - break; + on_message_send_complete(message, MESSAGE_SEND_ERROR); } else { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_088: [If IoTHubMessage_GetByteArray() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_091: [If IoTHubMessage_GetString() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] - if (is_message_error) - { - on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR); - } - else - { - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_111: [If message_create() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_112: [If message_add_body_amqp_data() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_113: [If messagesender_send() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] - rollEventBackToWaitList(event_tracker, transport_state); - break; - } + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_111: [If message_create() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_112: [If message_add_body_amqp_data() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] + // Codes_SRS_IOTHUBTRANSPORTAMQP_09_113: [If messagesender_send() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return] + rollEventBackToWaitList(message, transport_state); + break; } } } @@ -1168,9 +1107,6 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_129 : [IoTHubTransportAMQP_Create shall set parameter transport_state->cbs_request_timeout with the default value of 30000 (milliseconds).] transport_state->cbs_request_timeout = DEFAULT_CBS_REQUEST_TIMEOUT_MS; - - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_130 : [IoTHubTransportAMQP_Create shall set parameter transport_state->message_send_timeout with the default value of 300000 (milliseconds).] - transport_state->message_send_timeout = DEFAULT_MESSAGE_SEND_TIMEOUT_MS; } } } @@ -1317,8 +1253,6 @@ // Codes_SRS_IOTHUBTRANSPORTAMQP_09_103: [IoTHubTransportAMQP_DoWork shall invoke connection_dowork() on AMQP for triggering sending and receiving messages] connection_dowork(transport_state->connection); } - - handleEventSendTimeouts(transport_state); } } @@ -1432,12 +1366,6 @@ transport_state->cbs_request_timeout = *((size_t*)value); result = IOTHUB_CLIENT_OK; } - // Codes_SRS_IOTHUBTRANSPORTAMQP_09_149: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "message_send_timeout", returning IOTHUB_CLIENT_OK] - else if (strcmp("message_send_timeout", option) == 0) - { - transport_state->message_send_timeout = *((size_t*)value); - result = IOTHUB_CLIENT_OK; - } // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If the option name does not match one of the options handled by this module, then IoTHubTransportAMQP_SetOption shall get the handle to the XIO and invoke the xio_setoption passing down the option name and value parameters.] else { @@ -1466,11 +1394,11 @@ return result; } -static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQ_Register(TRANSPORT_LL_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend) +static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQP_Register(TRANSPORT_LL_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend) { IOTHUB_DEVICE_HANDLE result; - // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQ_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.] - // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQ_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.] + // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQP_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.] + // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQP_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.] if ((handle == NULL) || (deviceId == NULL) || (deviceKey == NULL) || (waitingToSend == NULL)) { result = NULL; @@ -1487,7 +1415,7 @@ } else { - // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_002: [IoTHubTransportAMQ_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransportAMQP_Create.] + // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_002: [IoTHubTransportAMQP_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransportAMQP_Create.] if (strcmp(STRING_c_str(transport_state->devicesPath), STRING_c_str(devicesPath)) != 0) { LogError("Attemping to add new device to AMQP transport, not allowed."); @@ -1508,7 +1436,7 @@ else { transport_state->isRegistered = true; - // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQ_Register shall return the TRANSPORT_LL_HANDLE as the IOTHUB_DEVICE_HANDLE.] + // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQP_Register shall return the TRANSPORT_LL_HANDLE as the IOTHUB_DEVICE_HANDLE.] result = (IOTHUB_DEVICE_HANDLE)handle; } } @@ -1519,8 +1447,8 @@ return result; } -// Codes_SRS_IOTHUBTRANSPORTUAMQP_17_004: [IoTHubTransportAMQ_Unregister shall return.] -static void IoTHubTransportAMQ_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle) +// Codes_SRS_IOTHUBTRANSPORTUAMQP_17_004: [IoTHubTransportAMQP_Unregister shall return.] +static void IoTHubTransportAMQP_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle) { if (deviceHandle != NULL) { @@ -1534,8 +1462,8 @@ IoTHubTransportAMQP_SetOption, IoTHubTransportAMQP_Create, IoTHubTransportAMQP_Destroy, - IoTHubTransportAMQ_Register, - IoTHubTransportAMQ_Unregister, + IoTHubTransportAMQP_Register, + IoTHubTransportAMQP_Unregister, IoTHubTransportAMQP_Subscribe, IoTHubTransportAMQP_Unsubscribe, IoTHubTransportAMQP_DoWork,