A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
36:8e1d94b0a70c
Parent:
34:6be9c2058664
Child:
37:c923ba7f6cf9
--- a/link.c	Fri Nov 03 13:18:57 2017 -0700
+++ b/link.c	Fri Nov 17 13:58:00 2017 -0800
@@ -15,6 +15,7 @@
 #include "azure_uamqp_c/amqpvalue.h"
 #include "azure_uamqp_c/amqp_definitions.h"
 #include "azure_uamqp_c/amqp_frame_codec.h"
+#include "azure_uamqp_c/async_operation.h"
 
 #define DEFAULT_LINK_CREDIT 10000
 
@@ -61,6 +62,8 @@
     TICK_COUNTER_HANDLE tick_counter;
 } LINK_INSTANCE;
 
+DEFINE_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE);
+
 static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state)
 {
     link_instance->previous_link_state = link_instance->link_state;
@@ -80,14 +83,16 @@
         while (item != NULL)
         {
             LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item);
-            DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item);
-            if (delivery_instance != NULL)
+            ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(item);
+            if (pending_delivery_operation != NULL)
             {
+                DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation);
                 if (indicate_settled && (delivery_instance->on_delivery_settled != NULL))
                 {
                     delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED, NULL);
                 }
-                free(delivery_instance);
+
+                async_operation_destroy(pending_delivery_operation);
             }
 
             item = next_item;
@@ -478,14 +483,16 @@
                     while (pending_delivery != NULL)
                     {
                         LIST_ITEM_HANDLE next_pending_delivery = singlylinkedlist_get_next_item(pending_delivery);
-                        DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(pending_delivery);
-                        if (delivery_instance == NULL)
+                        ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(pending_delivery);
+                        if (pending_delivery_operation == NULL)
                         {
                             /* error */
                             break;
                         }
                         else
                         {
+                            DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation);
+
                             if ((delivery_instance->delivery_id >= first) && (delivery_instance->delivery_id <= last))
                             {
                                 AMQP_VALUE delivery_state;
@@ -496,7 +503,7 @@
                                 else
                                 {
                                     delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED, delivery_state);
-                                    free(delivery_instance);
+                                    async_operation_destroy(pending_delivery_operation);
                                     if (singlylinkedlist_remove(link_instance->pending_deliveries, pending_delivery) != 0)
                                     {
                                         /* error */
@@ -585,13 +592,13 @@
     }
     else if (new_session_state == SESSION_STATE_DISCARDING)
     {
-                remove_all_pending_deliveries(link_instance, true);
-                set_link_state(link_instance, LINK_STATE_DETACHED);
+        remove_all_pending_deliveries(link_instance, true);
+        set_link_state(link_instance, LINK_STATE_DETACHED);
     }
     else if (new_session_state == SESSION_STATE_ERROR)
     {
-                remove_all_pending_deliveries(link_instance, true);
-                set_link_state(link_instance, LINK_STATE_ERROR);
+        remove_all_pending_deliveries(link_instance, true);
+        set_link_state(link_instance, LINK_STATE_ERROR);
     }
 }
 
@@ -607,14 +614,15 @@
 static void on_send_complete(void* context, IO_SEND_RESULT send_result)
 {
     LIST_ITEM_HANDLE delivery_instance_list_item = (LIST_ITEM_HANDLE)context;
-    DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(delivery_instance_list_item);
-    LINK_INSTANCE* link_instance = (LINK_INSTANCE*)delivery_instance->link;
+    ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(delivery_instance_list_item);
+    DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation);
+    LINK_HANDLE link = (LINK_HANDLE)delivery_instance->link;
     (void)send_result;
-    if (link_instance->snd_settle_mode == sender_settle_mode_settled)
+    if (link->snd_settle_mode == sender_settle_mode_settled)
     {
         delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_SETTLED, NULL);
-        free(delivery_instance);
-        (void)singlylinkedlist_remove(link_instance->pending_deliveries, delivery_instance_list_item);
+        async_operation_destroy(pending_delivery_operation);
+        (void)singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
     }
 }
 
@@ -1091,129 +1099,186 @@
     return result;
 }
 
-LINK_TRANSFER_RESULT link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, tickcounter_ms_t timeout)
+static bool remove_pending_delivery_condition_function(const void* item, const void* match_context, bool* continue_processing)
 {
-    LINK_TRANSFER_RESULT result;
+    bool result;
+
+    if (item == match_context)
+    {
+        result = true;
+        *continue_processing = false;
+    }
+    else
+    {
+        result = false;
+        *continue_processing = true;
+    }
+
+    return result;
+}
 
-    if (link == NULL)
+static void link_transfer_cancel_handler(ASYNC_OPERATION_HANDLE link_transfer_operation)
+{
+    DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, link_transfer_operation);
+    if (pending_delivery->on_delivery_settled != NULL)
     {
-        result = LINK_TRANSFER_ERROR;
+        pending_delivery->on_delivery_settled(pending_delivery->callback_context, pending_delivery->delivery_id, LINK_DELIVERY_SETTLE_REASON_CANCELLED, NULL);
+    }
+    free(pending_delivery);
+    
+    (void)singlylinkedlist_remove_if(((LINK_HANDLE)pending_delivery->link)->pending_deliveries, remove_pending_delivery_condition_function, pending_delivery);
+}
+
+ASYNC_OPERATION_HANDLE link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, LINK_TRANSFER_RESULT* link_transfer_error, tickcounter_ms_t timeout)
+{
+    ASYNC_OPERATION_HANDLE result;
+
+    if ((link == NULL) ||
+        (link_transfer_error == NULL))
+    {
+        if (link_transfer_error != NULL)
+        {
+            *link_transfer_error = LINK_TRANSFER_ERROR;
+        }
+
+        result = NULL;
     }
     else
     {
         if ((link->role != role_sender) ||
             (link->link_state != LINK_STATE_ATTACHED))
         {
-            result = LINK_TRANSFER_ERROR;
+            *link_transfer_error = LINK_TRANSFER_ERROR;
+            result = NULL;
         }
         else if (link->link_credit == 0)
         {
-            result = LINK_TRANSFER_BUSY;
+            *link_transfer_error = LINK_TRANSFER_BUSY;
+            result = NULL;
         }
         else
         {
-            TRANSFER_HANDLE transfer = transfer_create(0);
-            if (transfer == NULL)
-            {
-                result = LINK_TRANSFER_ERROR;
-            }
-            else
+            result = CREATE_ASYNC_OPERATION(DELIVERY_INSTANCE, link_transfer_cancel_handler);
+            if (result != NULL)
             {
-                sequence_no delivery_count = link->delivery_count + 1;
-                unsigned char delivery_tag_bytes[sizeof(delivery_count)];
-                delivery_tag delivery_tag;
-                bool settled;
-
-                (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count));
-
-                delivery_tag.bytes = &delivery_tag_bytes;
-                delivery_tag.length = sizeof(delivery_tag_bytes);
-
-                if (link->snd_settle_mode == sender_settle_mode_unsettled)
+                TRANSFER_HANDLE transfer = transfer_create(0);
+                if (transfer == NULL)
                 {
-                    settled = false;
+                    *link_transfer_error = LINK_TRANSFER_ERROR;
+                    async_operation_destroy(result);
+                    result = NULL;
                 }
                 else
                 {
-                    settled = true;
-                }
+                    sequence_no delivery_count = link->delivery_count + 1;
+                    unsigned char delivery_tag_bytes[sizeof(delivery_count)];
+                    delivery_tag delivery_tag;
+                    bool settled;
+
+                    (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count));
+
+                    delivery_tag.bytes = &delivery_tag_bytes;
+                    delivery_tag.length = sizeof(delivery_tag_bytes);
 
-                if ((transfer_set_delivery_tag(transfer, delivery_tag) != 0) ||
-                    (transfer_set_message_format(transfer, message_format) != 0) ||
-                    (transfer_set_settled(transfer, settled) != 0))
-                {
-                    result = LINK_TRANSFER_ERROR;
-                }
-                else
-                {
-                    AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer);
+                    if (link->snd_settle_mode == sender_settle_mode_unsettled)
+                    {
+                        settled = false;
+                    }
+                    else
+                    {
+                        settled = true;
+                    }
 
-                    if (transfer_value == NULL)
+                    if ((transfer_set_delivery_tag(transfer, delivery_tag) != 0) ||
+                        (transfer_set_message_format(transfer, message_format) != 0) ||
+                        (transfer_set_settled(transfer, settled) != 0))
                     {
-                        result = LINK_TRANSFER_ERROR;
+                        *link_transfer_error = LINK_TRANSFER_ERROR;
+                        async_operation_destroy(result);
+                        result = NULL;
                     }
                     else
                     {
-                        DELIVERY_INSTANCE* pending_delivery = (DELIVERY_INSTANCE*)malloc(sizeof(DELIVERY_INSTANCE));
-                        if (pending_delivery == NULL)
+                        AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer);
+
+                        if (transfer_value == NULL)
                         {
-                            result = LINK_TRANSFER_ERROR;
+                            *link_transfer_error = LINK_TRANSFER_ERROR;
+                            async_operation_destroy(result);
+                            result = NULL;
                         }
                         else
                         {
-                            if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0)
+                            DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, result);
+                            if (pending_delivery == NULL)
                             {
-                                free(pending_delivery);
-                                result = LINK_TRANSFER_ERROR;
+                                *link_transfer_error = LINK_TRANSFER_ERROR;
+                                async_operation_destroy(result);
+                                result = NULL;
                             }
                             else
                             {
-                                LIST_ITEM_HANDLE delivery_instance_list_item;
-                                pending_delivery->timeout = timeout;
-                                pending_delivery->on_delivery_settled = on_delivery_settled;
-                                pending_delivery->callback_context = callback_context;
-                                pending_delivery->link = link;
-                                delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery);
-
-                                if (delivery_instance_list_item == NULL)
+                                if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0)
                                 {
                                     free(pending_delivery);
-                                    result = LINK_TRANSFER_ERROR;
+                                    *link_transfer_error = LINK_TRANSFER_ERROR;
+                                    async_operation_destroy(result);
+                                    result = NULL;
                                 }
                                 else
                                 {
-                                    /* here we should feed data to the transfer frame */
-                                    switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
-                                    {
-                                    default:
-                                    case SESSION_SEND_TRANSFER_ERROR:
-                                        singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
-                                        free(pending_delivery);
-                                        result = LINK_TRANSFER_ERROR;
-                                        break;
+                                    LIST_ITEM_HANDLE delivery_instance_list_item;
+                                    pending_delivery->timeout = timeout;
+                                    pending_delivery->on_delivery_settled = on_delivery_settled;
+                                    pending_delivery->callback_context = callback_context;
+                                    pending_delivery->link = link;
+                                    delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, result);
 
-                                    case SESSION_SEND_TRANSFER_BUSY:
-                                        /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
-                                        singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
+                                    if (delivery_instance_list_item == NULL)
+                                    {
                                         free(pending_delivery);
-                                        result = LINK_TRANSFER_BUSY;
-                                        break;
+                                        *link_transfer_error = LINK_TRANSFER_ERROR;
+                                        async_operation_destroy(result);
+                                        result = NULL;
+                                    }
+                                    else
+                                    {
+                                        /* here we should feed data to the transfer frame */
+                                        switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
+                                        {
+                                        default:
+                                        case SESSION_SEND_TRANSFER_ERROR:
+                                            singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
+                                            free(pending_delivery);
+                                            *link_transfer_error = LINK_TRANSFER_ERROR;
+                                            async_operation_destroy(result);
+                                            result = NULL;
+                                            break;
 
-                                    case SESSION_SEND_TRANSFER_OK:
-                                        link->delivery_count = delivery_count;
-                                        link->link_credit--;
-                                        result = LINK_TRANSFER_OK;
-                                        break;
+                                        case SESSION_SEND_TRANSFER_BUSY:
+                                            /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
+                                            singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
+                                            free(pending_delivery);
+                                            *link_transfer_error = LINK_TRANSFER_BUSY;
+                                            async_operation_destroy(result);
+                                            result = NULL;
+                                            break;
+
+                                        case SESSION_SEND_TRANSFER_OK:
+                                            link->delivery_count = delivery_count;
+                                            link->link_credit--;
+                                            break;
+                                        }
                                     }
                                 }
                             }
+
+                            amqpvalue_destroy(transfer_value);
                         }
-
-                        amqpvalue_destroy(transfer_value);
                     }
+
+                    transfer_destroy(transfer);
                 }
-
-                transfer_destroy(transfer);
             }
         }
     }
@@ -1291,7 +1356,8 @@
             while (item != NULL)
             {
                 LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item);
-                DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item);
+                ASYNC_OPERATION_HANDLE delivery_instance_async_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(item);
+                DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, delivery_instance_async_operation);
 
                 if ((delivery_instance->timeout != 0) &&
                     (current_tick - delivery_instance->start_tick >= delivery_instance->timeout))
@@ -1302,7 +1368,7 @@
                     }
 
                     (void)singlylinkedlist_remove(link->pending_deliveries, item);
-                    free(delivery_instance);
+                    async_operation_destroy(delivery_instance_async_operation);
                 }
 
                 item = next_item;