Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
12:841a4c36bd36
Parent:
11:62d7b956e76e
Child:
13:a4af7c301e02
--- a/iothubtransportamqp.c	Fri Mar 25 15:59:44 2016 -0700
+++ b/iothubtransportamqp.c	Fri Apr 08 13:23:54 2016 -0700
@@ -8,27 +8,25 @@
 #include <stdint.h>
 #include <time.h>
 #include <limits.h>
-#include "gballoc.h"
+#include "azure_c_shared_utility/gballoc.h"
+#include "azure_c_shared_utility/crt_abstractions.h"
+#include "azure_c_shared_utility/doublylinkedlist.h"
+#include "azure_c_shared_utility/iot_logging.h"
+#include "azure_c_shared_utility/platform.h"
+#include "azure_c_shared_utility/sastoken.h"
+#include "azure_c_shared_utility/strings.h"
+#include "azure_c_shared_utility/urlencode.h"
+#include "azure_c_shared_utility/tlsio.h"
 
-#include "cbs.h"
-#include "link.h"
-#include "message.h"
-#include "amqpvalue.h"
-#include "message_receiver.h"
-#include "message_sender.h"
-#include "messaging.h"
-#include "sasl_mssbcbs.h"
-#include "saslclientio.h"
-
-#include "crt_abstractions.h"
-#include "doublylinkedlist.h"
-#include "iot_logging.h"
-#include "platform.h"
-#include "sastoken.h"
-#include "strings.h"
-#include "urlencode.h"
-
-#include "tlsio.h"
+#include "azure_uamqp_c/cbs.h"
+#include "azure_uamqp_c/link.h"
+#include "azure_uamqp_c/message.h"
+#include "azure_uamqp_c/amqpvalue.h"
+#include "azure_uamqp_c/message_receiver.h"
+#include "azure_uamqp_c/message_sender.h"
+#include "azure_uamqp_c/messaging.h"
+#include "azure_uamqp_c/sasl_mssbcbs.h"
+#include "azure_uamqp_c/saslclientio.h"
 
 #include "iothub_client_ll.h"
 #include "iothub_client_private.h"
@@ -43,7 +41,6 @@
 #define DEFAULT_IOTHUB_AMQP_PORT 5671
 #define DEFAULT_SAS_TOKEN_LIFETIME_MS 3600000
 #define DEFAULT_CBS_REQUEST_TIMEOUT_MS 30000
-#define DEFAULT_MESSAGE_SEND_TIMEOUT_MS 300000
 #define CBS_AUDIENCE "servicebus.windows.net:sastoken"
 #define DEFAULT_CONTAINER_ID "default_container_id"
 #define DEFAULT_INCOMING_WINDOW_SIZE UINT_MAX
@@ -86,8 +83,6 @@
     size_t sas_token_refresh_time;
     // Maximum time the transport waits for  uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds.
     size_t cbs_request_timeout;
-    // Maximum time the transport waits for an event to be sent before marking it a failure, in milliseconds.
-    size_t message_send_timeout;
     // Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds.
     size_t connection_timeout;
     // Saved reference to the IoTHub LL Client.
@@ -133,14 +128,6 @@
     bool isRegistered;
 } AMQP_TRANSPORT_INSTANCE;
 
-// This structure is used to track an event being sent and the time it was sent.
-typedef struct EVENT_TRACKER_TAG
-{
-    IOTHUB_CLIENT_LL_HANDLE iothub_client_handle;
-    IOTHUB_MESSAGE_LIST* message;
-    time_t time_sent;
-    DLIST_ENTRY entry;
-} EVENT_TRACKER;
 
 
 // Auxiliary functions
@@ -172,27 +159,10 @@
     return (size_t)(difftime(get_time(NULL), (time_t)0));
 }
 
-static EVENT_TRACKER* trackEvent(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state)
+static void trackEventInProgress(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state)
 {
-    EVENT_TRACKER* event_tracker;
-
-    if ((event_tracker = (EVENT_TRACKER*)malloc(sizeof(EVENT_TRACKER))) != NULL)
-    {
-        event_tracker->iothub_client_handle = transport_state->iothub_client_handle;
-        event_tracker->message = message;
-        event_tracker->time_sent = get_time(NULL);
-        DList_InitializeListHead(&event_tracker->entry);
-
-        DList_RemoveEntryList(&message->entry);
-        DList_InsertTailList(&transport_state->inProgress, &event_tracker->entry);
-    }
-
-    return event_tracker;
-}
-
-static void destroyEventTracker(EVENT_TRACKER* event_tracker)
-{
-    free(event_tracker);
+    DList_RemoveEntryList(&message->entry);
+    DList_InsertTailList(&transport_state->inProgress, &message->entry);
 }
 
 static IOTHUB_MESSAGE_LIST* getNextEventToSend(AMQP_TRANSPORT_INSTANCE* transport_state)
@@ -212,22 +182,21 @@
     return message;
 }
 
-static int isEventInInProgressList(EVENT_TRACKER* event_tracker)
+static int isEventInInProgressList(IOTHUB_MESSAGE_LIST* message)
 {
-    return !DList_IsListEmpty(&event_tracker->entry);
+    return !DList_IsListEmpty(&message->entry);
 }
 
-static void removeEventFromInProgressList(EVENT_TRACKER* event_tracker)
+static void removeEventFromInProgressList(IOTHUB_MESSAGE_LIST* message)
 {
-    DList_RemoveEntryList(&event_tracker->entry);
-    DList_InitializeListHead(&event_tracker->entry);
+    DList_RemoveEntryList(&message->entry);
+    DList_InitializeListHead(&message->entry);
 }
 
-static void rollEventBackToWaitList(EVENT_TRACKER* event_tracker, AMQP_TRANSPORT_INSTANCE* transport_state)
+static void rollEventBackToWaitList(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state)
 {
-    removeEventFromInProgressList(event_tracker);
-    DList_InsertTailList(transport_state->waitingToSend, &event_tracker->message->entry);
-    destroyEventTracker(event_tracker);
+    removeEventFromInProgressList(message);
+	DList_InsertTailList(transport_state->waitingToSend, &message->entry);
 }
 
 static void rollEventsBackToWaitList(AMQP_TRANSPORT_INSTANCE* transport_state)
@@ -236,15 +205,15 @@
 
     while (entry != &transport_state->inProgress)
     {
-        EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry);
+		IOTHUB_MESSAGE_LIST* message = containingRecord(entry, IOTHUB_MESSAGE_LIST, entry);
         entry = entry->Blink;
-        rollEventBackToWaitList(event_tracker, transport_state);
+        rollEventBackToWaitList(message, transport_state);
     }
 }
 
 static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result)
 {
-    EVENT_TRACKER* event_tracker = (EVENT_TRACKER*)context;
+	IOTHUB_MESSAGE_LIST* message = (IOTHUB_MESSAGE_LIST*)context;
 
     IOTHUB_CLIENT_RESULT iot_hub_send_result;
 
@@ -254,35 +223,35 @@
         iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK;
     }
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_143: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR]
-    else if (send_result == MESSAGE_SEND_ERROR)
+    else
     {
         iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR;
     }
 
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_102: [The callback 'on_message_send_complete' shall invoke the upper layer callback for message received if provided] 
-    if (event_tracker->message->callback != NULL)
+    if (message->callback != NULL)
     {
-        event_tracker->message->callback(iot_hub_send_result, event_tracker->message->context);
+        message->callback(iot_hub_send_result, message->context);
     }
 
+	// Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback 'on_message_send_complete' shall remove the target message from the in-progress list after the upper layer callback] 
+	if (isEventInInProgressList(message))
+	{
+		removeEventFromInProgressList(message);
+	}
+
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_151: [The callback 'on_message_send_complete' shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()]
-    IoTHubMessage_Destroy(event_tracker->message->messageHandle);
-    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback 'on_message_send_complete' shall destroy the IOTHUB_MESSAGE_LIST instance]
-    free(event_tracker->message);
+    IoTHubMessage_Destroy(message->messageHandle);
 
-    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback 'on_message_send_complete' shall remove the target message from the in-progress list after the upper layer callback] 
-    if (isEventInInProgressList(event_tracker))
-    {
-        removeEventFromInProgressList(event_tracker);
-        destroyEventTracker(event_tracker);
-    }
+	// Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback 'on_message_send_complete' shall destroy the IOTHUB_MESSAGE_LIST instance]
+    free(message);
 }
 
 static void on_put_token_complete(void* context, CBS_OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description)
 {
     AMQP_TRANSPORT_INSTANCE* transportState = (AMQP_TRANSPORT_INSTANCE*)context;
 
-    if (operation_result == OPERATION_RESULT_OK)
+    if (operation_result == CBS_OPERATION_RESULT_OK)
     {
         transportState->cbs_state = CBS_STATE_AUTHENTICATED;
     }
@@ -535,26 +504,6 @@
     return ((getSecondsSinceEpoch() - transport_state->current_sas_token_create_time) * 1000 >= transport_state->cbs_request_timeout) ? RESULT_TIMEOUT : RESULT_OK;
 }
 
-static void handleEventSendTimeouts(AMQP_TRANSPORT_INSTANCE* transport_state)
-{
-    time_t current_time = get_time(NULL);
-    PDLIST_ENTRY entry = transport_state->inProgress.Flink;
-
-    while (entry != &transport_state->inProgress)
-    {
-        EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry);
-
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_085: [IoTHubTransportAMQP_DoWork shall attempt to send all the queued messages for up to 'message_send_timeout' milliseconds] 
-        if (difftime(current_time, event_tracker->time_sent) * 1000 >= transport_state->message_send_timeout)
-        {
-            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_120: [If a 'message_send_timeout' occurs the timed out events removed from the inProgress and the upper layer notified of the send error] 
-            on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR);
-        }
-
-        entry = entry->Flink;
-    }
-}
-
 static void attachDeviceClientTypeToLink(LINK_HANDLE link)
 {
     fields attach_properties;
@@ -897,17 +846,14 @@
         IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message->messageHandle);
         const unsigned char* messageContent;
         size_t messageContentSize;
-        MESSAGE_HANDLE amqp_message;
+        MESSAGE_HANDLE amqp_message = NULL;
         bool is_message_error = false;
-        EVENT_TRACKER* event_tracker;
 
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an "in-progress" list right before processing them for sending]
-        if ((event_tracker = trackEvent(message, transport_state)) == NULL)
-        {
-            LogError("Failed tracking the event to be sent.\r\n");
-        }
+		trackEventInProgress(message, transport_state);
+
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_087: [If the event contains a message of type IOTHUBMESSAGE_BYTEARRAY, IoTHubTransportAMQP_DoWork shall obtain its char* representation and size using IoTHubMessage_GetByteArray()] 
-        else if (contentType == IOTHUBMESSAGE_BYTEARRAY &&
+        if (contentType == IOTHUBMESSAGE_BYTEARRAY &&
             IoTHubMessage_GetByteArray(message->messageHandle, &messageContent, &messageContentSize) != IOTHUB_MESSAGE_OK)
         {
             LogError("Failed getting the BYTE array representation of the event content to be sent.\r\n");
@@ -959,7 +905,7 @@
                 else
                 {
                     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] 
-                    if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, event_tracker) != RESULT_OK)
+                    if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, message) != RESULT_OK)
                     {
                         LogError("Failed sending the AMQP message.\r\n");
                     }
@@ -979,26 +925,19 @@
 
         if (result != RESULT_OK)
         {
-            if (event_tracker == NULL)
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_088: [If IoTHubMessage_GetByteArray() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] 
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_091: [If IoTHubMessage_GetString() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] 
+            if (is_message_error)
             {
-                break;
+                on_message_send_complete(message, MESSAGE_SEND_ERROR);
             }
             else
             {
-                // Codes_SRS_IOTHUBTRANSPORTAMQP_09_088: [If IoTHubMessage_GetByteArray() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] 
-                // Codes_SRS_IOTHUBTRANSPORTAMQP_09_091: [If IoTHubMessage_GetString() fails, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] 
-                if (is_message_error)
-                {
-                    on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR);
-                }
-                else
-                {
-                    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_111: [If message_create() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
-                    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_112: [If message_add_body_amqp_data() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
-                    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_113: [If messagesender_send() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
-                    rollEventBackToWaitList(event_tracker, transport_state);
-                    break;
-                }
+                // Codes_SRS_IOTHUBTRANSPORTAMQP_09_111: [If message_create() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
+                // Codes_SRS_IOTHUBTRANSPORTAMQP_09_112: [If message_add_body_amqp_data() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
+                // Codes_SRS_IOTHUBTRANSPORTAMQP_09_113: [If messagesender_send() fails, IoTHubTransportAMQP_DoWork notify the failure, roll back the event to waitToSent list and return]
+                rollEventBackToWaitList(message, transport_state);
+                break;
             }
         }
     }
@@ -1168,9 +1107,6 @@
 
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_129 : [IoTHubTransportAMQP_Create shall set parameter transport_state->cbs_request_timeout with the default value of 30000 (milliseconds).]
                 transport_state->cbs_request_timeout = DEFAULT_CBS_REQUEST_TIMEOUT_MS;
-
-                // Codes_SRS_IOTHUBTRANSPORTAMQP_09_130 : [IoTHubTransportAMQP_Create shall set parameter transport_state->message_send_timeout with the default value of 300000 (milliseconds).]
-                transport_state->message_send_timeout = DEFAULT_MESSAGE_SEND_TIMEOUT_MS;
             }
         }
     }
@@ -1317,8 +1253,6 @@
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_103: [IoTHubTransportAMQP_DoWork shall invoke connection_dowork() on AMQP for triggering sending and receiving messages] 
             connection_dowork(transport_state->connection);
         }
-
-        handleEventSendTimeouts(transport_state);
     }
 }
 
@@ -1432,12 +1366,6 @@
             transport_state->cbs_request_timeout = *((size_t*)value);
             result = IOTHUB_CLIENT_OK;
         }
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_149: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "message_send_timeout", returning IOTHUB_CLIENT_OK]
-        else if (strcmp("message_send_timeout", option) == 0)
-        {
-            transport_state->message_send_timeout = *((size_t*)value);
-            result = IOTHUB_CLIENT_OK;
-        }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If the option name does not match one of the options handled by this module, then IoTHubTransportAMQP_SetOption shall get  the handle to the XIO and invoke the xio_setoption passing down the option name and value parameters.] 
         else
         {
@@ -1466,11 +1394,11 @@
     return result;
 }
 
-static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQ_Register(TRANSPORT_LL_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend)
+static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQP_Register(TRANSPORT_LL_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend)
 {
     IOTHUB_DEVICE_HANDLE result;
-    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQ_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.] 
-    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQ_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.]
+    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQP_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.] 
+    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQP_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.]
     if ((handle == NULL) || (deviceId == NULL) || (deviceKey == NULL) || (waitingToSend == NULL))
     {
         result = NULL;
@@ -1487,7 +1415,7 @@
         }
         else
         {
-            // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_002: [IoTHubTransportAMQ_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransportAMQP_Create.] 
+            // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_002: [IoTHubTransportAMQP_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransportAMQP_Create.] 
             if (strcmp(STRING_c_str(transport_state->devicesPath), STRING_c_str(devicesPath)) != 0)
             {
                 LogError("Attemping to add new device to AMQP transport, not allowed.");
@@ -1508,7 +1436,7 @@
                 else
                 {
                     transport_state->isRegistered = true;
-                    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQ_Register shall return the TRANSPORT_LL_HANDLE as the IOTHUB_DEVICE_HANDLE.] 
+                    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQP_Register shall return the TRANSPORT_LL_HANDLE as the IOTHUB_DEVICE_HANDLE.] 
                     result = (IOTHUB_DEVICE_HANDLE)handle;
                 }
             }
@@ -1519,8 +1447,8 @@
     return result;
 }
 
-// Codes_SRS_IOTHUBTRANSPORTUAMQP_17_004: [IoTHubTransportAMQ_Unregister shall return.] 
-static void IoTHubTransportAMQ_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle)
+// Codes_SRS_IOTHUBTRANSPORTUAMQP_17_004: [IoTHubTransportAMQP_Unregister shall return.] 
+static void IoTHubTransportAMQP_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle)
 {
     if (deviceHandle != NULL)
     {
@@ -1534,8 +1462,8 @@
     IoTHubTransportAMQP_SetOption,
     IoTHubTransportAMQP_Create,
     IoTHubTransportAMQP_Destroy,
-    IoTHubTransportAMQ_Register,
-    IoTHubTransportAMQ_Unregister,
+    IoTHubTransportAMQP_Register,
+    IoTHubTransportAMQP_Unregister,
     IoTHubTransportAMQP_Subscribe,
     IoTHubTransportAMQP_Unsubscribe,
     IoTHubTransportAMQP_DoWork,