A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
34:6be9c2058664
Parent:
29:4a11413cf217
Child:
36:8e1d94b0a70c
--- a/link.c	Mon Sep 25 13:38:40 2017 -0700
+++ b/link.c	Sat Oct 21 20:12:19 2017 +0000
@@ -9,6 +9,7 @@
 #include "azure_c_shared_utility/optimize_size.h"
 #include "azure_c_shared_utility/xlogging.h"
 #include "azure_c_shared_utility/singlylinkedlist.h"
+#include "azure_c_shared_utility/tickcounter.h"
 #include "azure_uamqp_c/link.h"
 #include "azure_uamqp_c/session.h"
 #include "azure_uamqp_c/amqpvalue.h"
@@ -23,6 +24,8 @@
     ON_DELIVERY_SETTLED on_delivery_settled;
     void* callback_context;
     void* link;
+    tickcounter_ms_t start_tick;
+    tickcounter_ms_t timeout;
 } DELIVERY_INSTANCE;
 
 typedef struct LINK_INSTANCE_TAG
@@ -55,6 +58,7 @@
     unsigned char* received_payload;
     uint32_t received_payload_size;
     delivery_number received_delivery_id;
+    TICK_COUNTER_HANDLE tick_counter;
 } LINK_INSTANCE;
 
 static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state)
@@ -639,37 +643,49 @@
         result->received_payload_size = 0;
         result->received_delivery_id = 0;
 
-        result->pending_deliveries = singlylinkedlist_create();
-        if (result->pending_deliveries == NULL)
+        result->tick_counter = tickcounter_create();
+        if (result->tick_counter == NULL)
         {
             free(result);
             result = NULL;
         }
         else
         {
-            size_t name_length = strlen(name);
-            result->name = (char*)malloc(name_length + 1);
-            if (result->name == NULL)
+            result->pending_deliveries = singlylinkedlist_create();
+            if (result->pending_deliveries == NULL)
             {
-                singlylinkedlist_destroy(result->pending_deliveries);
+                tickcounter_destroy(result->tick_counter);
                 free(result);
                 result = NULL;
             }
             else
             {
-                result->on_link_state_changed = NULL;
-                result->callback_context = NULL;
-                set_link_state(result, LINK_STATE_DETACHED);
-
-                (void)memcpy(result->name, name, name_length + 1);
-                result->link_endpoint = session_create_link_endpoint(session, name);
-                if (result->link_endpoint == NULL)
+                size_t name_length = strlen(name);
+                result->name = (char*)malloc(name_length + 1);
+                if (result->name == NULL)
                 {
+                    tickcounter_destroy(result->tick_counter);
                     singlylinkedlist_destroy(result->pending_deliveries);
-                    free(result->name);
                     free(result);
                     result = NULL;
                 }
+                else
+                {
+                    result->on_link_state_changed = NULL;
+                    result->callback_context = NULL;
+                    set_link_state(result, LINK_STATE_DETACHED);
+
+                    (void)memcpy(result->name, name, name_length + 1);
+                    result->link_endpoint = session_create_link_endpoint(session, name);
+                    if (result->link_endpoint == NULL)
+                    {
+                        tickcounter_destroy(result->tick_counter);
+                        singlylinkedlist_destroy(result->pending_deliveries);
+                        free(result->name);
+                        free(result);
+                        result = NULL;
+                    }
+                }
             }
         }
     }
@@ -709,28 +725,39 @@
             result->role = role_sender;
         }
 
-        result->pending_deliveries = singlylinkedlist_create();
-        if (result->pending_deliveries == NULL)
+        result->tick_counter = tickcounter_create();
+        if (result->tick_counter == NULL)
         {
             free(result);
             result = NULL;
         }
         else
         {
-            size_t name_length = strlen(name);
-            result->name = (char*)malloc(name_length + 1);
-            if (result->name == NULL)
+            result->pending_deliveries = singlylinkedlist_create();
+            if (result->pending_deliveries == NULL)
             {
-                singlylinkedlist_destroy(result->pending_deliveries);
+                tickcounter_destroy(result->tick_counter);
                 free(result);
                 result = NULL;
             }
             else
             {
-                (void)memcpy(result->name, name, name_length + 1);
-                result->on_link_state_changed = NULL;
-                result->callback_context = NULL;
-                result->link_endpoint = link_endpoint;
+                size_t name_length = strlen(name);
+                result->name = (char*)malloc(name_length + 1);
+                if (result->name == NULL)
+                {
+                    tickcounter_destroy(result->tick_counter);
+                    singlylinkedlist_destroy(result->pending_deliveries);
+                    free(result);
+                    result = NULL;
+                }
+                else
+                {
+                    (void)memcpy(result->name, name, name_length + 1);
+                    result->on_link_state_changed = NULL;
+                    result->callback_context = NULL;
+                    result->link_endpoint = link_endpoint;
+                }
             }
         }
     }
@@ -743,6 +770,7 @@
     if (link != NULL)
     {
         remove_all_pending_deliveries((LINK_INSTANCE*)link, false);
+        tickcounter_destroy(link->tick_counter);
 
         link->on_link_state_changed = NULL;
         (void)link_detach(link, true);
@@ -1063,7 +1091,7 @@
     return result;
 }
 
-LINK_TRANSFER_RESULT link_transfer(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context)
+LINK_TRANSFER_RESULT link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, tickcounter_ms_t timeout)
 {
     LINK_TRANSFER_RESULT result;
 
@@ -1133,41 +1161,50 @@
                         }
                         else
                         {
-                            LIST_ITEM_HANDLE delivery_instance_list_item;
-                            pending_delivery->on_delivery_settled = on_delivery_settled;
-                            pending_delivery->callback_context = callback_context;
-                            pending_delivery->link = link;
-                            delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery);
-
-                            if (delivery_instance_list_item == NULL)
+                            if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0)
                             {
                                 free(pending_delivery);
                                 result = LINK_TRANSFER_ERROR;
                             }
                             else
                             {
-                                /* here we should feed data to the transfer frame */
-                                switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
+                                LIST_ITEM_HANDLE delivery_instance_list_item;
+                                pending_delivery->timeout = timeout;
+                                pending_delivery->on_delivery_settled = on_delivery_settled;
+                                pending_delivery->callback_context = callback_context;
+                                pending_delivery->link = link;
+                                delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery);
+
+                                if (delivery_instance_list_item == NULL)
                                 {
-                                default:
-                                case SESSION_SEND_TRANSFER_ERROR:
-                                    singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
                                     free(pending_delivery);
                                     result = LINK_TRANSFER_ERROR;
-                                    break;
+                                }
+                                else
+                                {
+                                    /* here we should feed data to the transfer frame */
+                                    switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
+                                    {
+                                    default:
+                                    case SESSION_SEND_TRANSFER_ERROR:
+                                        singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
+                                        free(pending_delivery);
+                                        result = LINK_TRANSFER_ERROR;
+                                        break;
 
-                                case SESSION_SEND_TRANSFER_BUSY:
-                                    /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
-                                    singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
-                                    free(pending_delivery);
-                                    result = LINK_TRANSFER_BUSY;
-                                    break;
+                                    case SESSION_SEND_TRANSFER_BUSY:
+                                        /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
+                                        singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
+                                        free(pending_delivery);
+                                        result = LINK_TRANSFER_BUSY;
+                                        break;
 
-                                case SESSION_SEND_TRANSFER_OK:
-                                    link->delivery_count = delivery_count;
-                                    link->link_credit--;
-                                    result = LINK_TRANSFER_OK;
-                                    break;
+                                    case SESSION_SEND_TRANSFER_OK:
+                                        link->delivery_count = delivery_count;
+                                        link->link_credit--;
+                                        result = LINK_TRANSFER_OK;
+                                        break;
+                                    }
                                 }
                             }
                         }
@@ -1236,3 +1273,40 @@
     }
     return result;
 }
+
+void link_dowork(LINK_HANDLE link)
+{
+    if (link != NULL)
+    {
+        tickcounter_ms_t current_tick;
+
+        if (tickcounter_get_current_ms(link->tick_counter, &current_tick) != 0)
+        {
+            /* error */
+        }
+        else
+        {
+            // go through all and find timed out deliveries
+            LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries);
+            while (item != NULL)
+            {
+                LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item);
+                DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item);
+
+                if ((delivery_instance->timeout != 0) &&
+                    (current_tick - delivery_instance->start_tick >= delivery_instance->timeout))
+                {
+                    if (delivery_instance->on_delivery_settled != NULL)
+                    {
+                        delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_TIMEOUT, NULL);
+                    }
+
+                    (void)singlylinkedlist_remove(link->pending_deliveries, item);
+                    free(delivery_instance);
+                }
+
+                item = next_item;
+            }
+        }
+    }
+}