A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
12:b30dacf113f2
Parent:
8:20dc883fb313
Child:
13:9abd748f4e78
--- a/link.c	Thu Sep 22 18:16:30 2016 -0700
+++ b/link.c	Thu Oct 20 17:07:44 2016 -0700
@@ -15,7 +15,7 @@
 #include "azure_uamqp_c/amqpalloc.h"
 #include "azure_uamqp_c/amqp_frame_codec.h"
 #include "azure_c_shared_utility/xlogging.h"
-#include "azure_c_shared_utility/list.h"
+#include "azure_c_shared_utility/singlylinkedlist.h"
 
 #define DEFAULT_LINK_CREDIT 10000
 
@@ -37,7 +37,7 @@
 	handle handle;
 	LINK_ENDPOINT_HANDLE link_endpoint;
 	char* name;
-	LIST_HANDLE pending_deliveries;
+	SINGLYLINKEDLIST_HANDLE pending_deliveries;
 	sequence_no delivery_count;
 	role role;
 	ON_LINK_STATE_CHANGED on_link_state_changed;
@@ -51,7 +51,11 @@
 	uint32_t link_credit;
 	uint32_t available;
     fields attach_properties;
-	int is_underlying_session_begun : 1;
+    bool is_underlying_session_begun;
+    bool is_closed;
+    unsigned char* received_payload;
+    uint32_t received_payload_size;
+    delivery_number received_delivery_id;
 } LINK_INSTANCE;
 
 static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state)
@@ -135,7 +139,7 @@
 	return result;
 }
 
-static int send_detach_frame(LINK_INSTANCE* link_instance, ERROR_HANDLE error_handle)
+static int send_detach(LINK_INSTANCE* link_instance, bool close, ERROR_HANDLE error_handle)
 {
 	int result;
 	DETACH_HANDLE detach_performative;
@@ -152,7 +156,12 @@
 		{
 			result = __LINE__;
 		}
-		else
+        else if (close &&
+            (detach_set_closed(detach_performative, true) != 0))
+        {
+            result = __LINE__;
+        }
+        else
 		{
 			if (session_send_detach(link_instance->link_endpoint, detach_performative) != 0)
 			{
@@ -160,6 +169,12 @@
 			}
 			else
 			{
+                if (close)
+                {
+                    /* Declare link to be closed */
+                    link_instance->is_closed = true;
+                }
+
 				result = 0;
 			}
 		}
@@ -170,6 +185,55 @@
 	return result;
 }
 
+static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, role role)
+{
+    int result;
+    ATTACH_HANDLE attach = attach_create(name, handle, role);
+
+    if (attach == NULL)
+    {
+        result = __LINE__;
+    }
+    else
+    {
+        result = 0;
+
+        link->delivery_count = link->initial_delivery_count;
+
+        attach_set_snd_settle_mode(attach, link->snd_settle_mode);
+        attach_set_rcv_settle_mode(attach, link->rcv_settle_mode);
+        attach_set_role(attach, role);
+        attach_set_source(attach, link->source);
+        attach_set_target(attach, link->target);
+        attach_set_properties(attach, link->attach_properties);
+
+        if (role == role_sender)
+        {
+            if (attach_set_initial_delivery_count(attach, link->delivery_count) != 0)
+            {
+                result = __LINE__;
+            }
+        }
+
+        if (result == 0)
+        {
+            if ((attach_set_max_message_size(attach, link->max_message_size) != 0) ||
+                (session_send_attach(link->link_endpoint, attach) != 0))
+            {
+                result = __LINE__;
+            }
+            else
+            {
+                result = 0;
+            }
+        }
+
+        attach_destroy(attach);
+    }
+
+    return result;
+}
+
 static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes)
 {
 	LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context;
@@ -244,7 +308,7 @@
 			if (amqpvalue_get_transfer(performative, &transfer_handle) == 0)
 			{
 				AMQP_VALUE delivery_state;
-				delivery_number received_delivery_id;
+                bool more;
 
 				link_instance->link_credit--;
 				link_instance->delivery_count++;
@@ -254,24 +318,80 @@
 					send_flow(link_instance);
 				}
 
-				if (transfer_get_delivery_id(transfer_handle, &received_delivery_id) != 0)
-				{
-					/* error */
-				}
-				else
-				{
-					delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, payload_size, payload_bytes);
+                if (transfer_get_more(transfer_handle, &more) != 0)
+                {
+                    LogError("Could not get the more field from the transfer performative");
+                }
+                else
+                {
+                    bool is_error = false;
+
+                    if (transfer_get_delivery_id(transfer_handle, &link_instance->received_delivery_id) != 0)
+                    {
+                        /* is this not a continuation transfer? */
+                        if (link_instance->received_payload_size == 0)
+                        {
+                            LogError("Could not get the delivery Id from the transfer performative");
+                            is_error = true;
+                        }
+                    }
+                    
+                    if (!is_error)
+                    {
+                        /* If this is a continuation transfer or if this is the first chunk of a multi frame transfer */
+                        if ((link_instance->received_payload_size > 0) || more)
+                        {
+                            unsigned char* new_received_payload = (unsigned char*)realloc(link_instance->received_payload, link_instance->received_payload_size + payload_size);
+                            if (new_received_payload == NULL)
+                            {
+                                LogError("Could not allocate memory for the received payload");
+                            }
+                            else
+                            {
+                                link_instance->received_payload = new_received_payload;
+                                (void)memcpy(link_instance->received_payload + link_instance->received_payload_size, payload_bytes, payload_size);
+                                link_instance->received_payload_size += payload_size;
+                            }
+                        }
 
-					if (send_disposition(link_instance, received_delivery_id, delivery_state) != 0)
-					{
-						/* error */
-					}
+                        if (!more)
+                        {
+                            const unsigned char* indicate_payload_bytes;
+                            uint32_t indicate_payload_size;
+
+                            /* if no previously stored chunks then simply report the current payload */
+                            if (link_instance->received_payload_size > 0)
+                            {
+                                indicate_payload_size = link_instance->received_payload_size;
+                                indicate_payload_bytes = link_instance->received_payload;
+                            }
+                            else
+                            {
+                                indicate_payload_size = payload_size;
+                                indicate_payload_bytes = payload_bytes;
+                            }
+
+                            delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, indicate_payload_size, indicate_payload_bytes);
 
-					if (delivery_state != NULL)
-					{
-						amqpvalue_destroy(delivery_state);
-					}
-				}
+                            if (link_instance->received_payload_size > 0)
+                            {
+                                free(link_instance->received_payload);
+                                link_instance->received_payload = NULL;
+                                link_instance->received_payload_size = 0;
+                            }
+
+                            if (send_disposition(link_instance, link_instance->received_delivery_id, delivery_state) != 0)
+                            {
+                                LogError("Cannot send disposition frame");
+                            }
+
+                            if (delivery_state != NULL)
+                            {
+                                amqpvalue_destroy(delivery_state);
+                            }
+                        }
+                    }
+                }
 
 				transfer_destroy(transfer_handle);
 			}
@@ -310,11 +430,11 @@
 
                 if (settled)
                 {
-                    LIST_ITEM_HANDLE pending_delivery = list_get_head_item(link_instance->pending_deliveries);
+                    LIST_ITEM_HANDLE pending_delivery = singlylinkedlist_get_head_item(link_instance->pending_deliveries);
                     while (pending_delivery != NULL)
                     {
-                        LIST_ITEM_HANDLE next_pending_delivery = list_get_next_item(pending_delivery);
-                        DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)list_item_get_value(pending_delivery);
+                        LIST_ITEM_HANDLE next_pending_delivery = singlylinkedlist_get_next_item(pending_delivery);
+                        DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(pending_delivery);
                         if (delivery_instance == NULL)
                         {
                             /* error */
@@ -333,7 +453,7 @@
                                 {
                                     delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, delivery_state);
                                     amqpalloc_free(delivery_instance);
-                                    if (list_remove(link_instance->pending_deliveries, pending_delivery) != 0)
+                                    if (singlylinkedlist_remove(link_instance->pending_deliveries, pending_delivery) != 0)
                                     {
                                         /* error */
                                         break;
@@ -360,12 +480,10 @@
 	{
         DETACH_HANDLE detach;
 
-        /* Respond with ack */
-        (void)send_detach_frame(link_instance, NULL);
-
         /* Set link state appropriately based on whether we received detach condition */
         if (amqpvalue_get_detach(performative, &detach) == 0)
         {
+            bool closed = false;
             ERROR_HANDLE error;
             if (detach_get_error(detach, &error) == 0)
             {
@@ -373,90 +491,36 @@
 
                 set_link_state(link_instance, LINK_STATE_ERROR);
             }
-            else
+            else 
             {
+                (void)detach_get_closed(detach, &closed);
+
                 set_link_state(link_instance, LINK_STATE_DETACHED);
             }
+
+            /* Received a detach while attached */
+            if (link_instance->previous_link_state == LINK_STATE_ATTACHED)
+            {
+                /* Respond with ack */
+                (void)send_detach(link_instance, closed, NULL);
+            }
+
+            /* Received a closing detach after we sent a non-closing detach. */
+            else if (closed &&
+                (link_instance->previous_link_state == LINK_STATE_HALF_ATTACHED) &&
+                !link_instance->is_closed)
+            {
+
+                /* In this case, we MUST signal that we closed by reattaching and then sending a closing detach.*/
+                (void)send_attach(link_instance, link_instance->name, 0, link_instance->role);
+                (void)send_detach(link_instance, true, NULL);
+            }
+
+            detach_destroy(detach);
         }
     }
 }
 
-static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, role role)
-{
-	int result;
-	ATTACH_HANDLE attach = attach_create(name, handle, role);
-
-	if (attach == NULL)
-	{
-		result = __LINE__;
-	}
-	else
-	{
-		result = 0;
-
-		link->delivery_count = link->initial_delivery_count;
-
-		attach_set_snd_settle_mode(attach, link->snd_settle_mode);
-		attach_set_rcv_settle_mode(attach, link->rcv_settle_mode);
-		attach_set_role(attach, role);
-		attach_set_source(attach, link->source);
-		attach_set_target(attach, link->target);
-        attach_set_properties(attach, link->attach_properties);
-
-		if (role == role_sender)
-		{
-			if (attach_set_initial_delivery_count(attach, link->delivery_count) != 0)
-			{
-				result = __LINE__;
-			}
-		}
-
-		if (result == 0)
-		{
-			if ((attach_set_max_message_size(attach, link->max_message_size) != 0) ||
-				(session_send_attach(link->link_endpoint, attach) != 0))
-			{
-				result = __LINE__;
-			}
-			else
-			{
-				result = 0;
-			}
-		}
-
-		attach_destroy(attach);
-	}
-
-	return result;
-}
-
-static int send_detach(LINK_INSTANCE* link_instance, ERROR_HANDLE error)
-{
-	int result;
-
-	DETACH_HANDLE detach = detach_create(0);
-	if (detach == NULL)
-	{
-		result = __LINE__;
-	}
-	else
-	{
-		if (((error != NULL) && (detach_set_error(detach, error) != 0)) ||
-			 (session_send_detach(link_instance->link_endpoint, detach) != 0))
-		{
-			result = __LINE__;
-		}
-		else
-		{
-			result = 0;
-		}
-
-		detach_destroy(detach);
-	}
-
-	return result;
-}
-
 static void on_session_state_changed(void* context, SESSION_STATE new_session_state, SESSION_STATE previous_session_state)
 {
 	LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context;
@@ -464,7 +528,7 @@
 
 	if (new_session_state == SESSION_STATE_MAPPED)
 	{
-		if (link_instance->link_state == LINK_STATE_DETACHED)
+		if ((link_instance->link_state == LINK_STATE_DETACHED) && (!link_instance->is_closed))
 		{
 			if (send_attach(link_instance, link_instance->name, 0, link_instance->role) == 0)
 			{
@@ -494,14 +558,14 @@
 static void on_send_complete(void* context, IO_SEND_RESULT send_result)
 {
 	LIST_ITEM_HANDLE delivery_instance_list_item = (LIST_ITEM_HANDLE)context;
-	DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)list_item_get_value(delivery_instance_list_item);
+	DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(delivery_instance_list_item);
 	LINK_INSTANCE* link_instance = (LINK_INSTANCE*)delivery_instance->link;
     (void)send_result;
 	if (link_instance->snd_settle_mode == sender_settle_mode_settled)
 	{
 		delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, NULL);
 		amqpalloc_free(delivery_instance);
-		(void)list_remove(link_instance->pending_deliveries, delivery_instance_list_item);
+		(void)singlylinkedlist_remove(link_instance->pending_deliveries, delivery_instance_list_item);
 	}
 }
 
@@ -522,10 +586,14 @@
 		result->delivery_count = 0;
 		result->initial_delivery_count = 0;
 		result->max_message_size = 0;
-		result->is_underlying_session_begun = 0;
+		result->is_underlying_session_begun = false;
+        result->is_closed = false;
         result->attach_properties = NULL;
+        result->received_payload = NULL;
+        result->received_payload_size = 0;
+        result->received_delivery_id = 0;
 
-		result->pending_deliveries = list_create();
+		result->pending_deliveries = singlylinkedlist_create();
 		if (result->pending_deliveries == NULL)
 		{
 			amqpalloc_free(result);
@@ -536,7 +604,7 @@
 			result->name = amqpalloc_malloc(strlen(name) + 1);
 			if (result->name == NULL)
 			{
-				list_destroy(result->pending_deliveries);
+				singlylinkedlist_destroy(result->pending_deliveries);
 				amqpalloc_free(result);
 				result = NULL;
 			}
@@ -550,7 +618,7 @@
 				result->link_endpoint = session_create_link_endpoint(session, name);
 				if (result->link_endpoint == NULL)
 				{
-					list_destroy(result->pending_deliveries);
+					singlylinkedlist_destroy(result->pending_deliveries);
 					amqpalloc_free(result->name);
 					amqpalloc_free(result);
 					result = NULL;
@@ -576,8 +644,12 @@
 		result->delivery_count = 0;
 		result->initial_delivery_count = 0;
 		result->max_message_size = 0;
-		result->is_underlying_session_begun = 0;
+		result->is_underlying_session_begun = false;
+        result->is_closed = false;
         result->attach_properties = NULL;
+        result->received_payload = NULL;
+        result->received_payload_size = 0;
+        result->received_delivery_id = 0;
         result->source = amqpvalue_clone(target);
 		result->target = amqpvalue_clone(source);
 		if (role == role_sender)
@@ -589,7 +661,7 @@
 			result->role = role_sender;
 		}
 
-		result->pending_deliveries = list_create();
+		result->pending_deliveries = singlylinkedlist_create();
 		if (result->pending_deliveries == NULL)
 		{
 			amqpalloc_free(result);
@@ -600,7 +672,7 @@
 			result->name = amqpalloc_malloc(strlen(name) + 1);
 			if (result->name == NULL)
 			{
-				list_destroy(result->pending_deliveries);
+				singlylinkedlist_destroy(result->pending_deliveries);
 				amqpalloc_free(result);
 				result = NULL;
 			}
@@ -621,19 +693,18 @@
 {
 	if (link != NULL)
 	{
-		link->on_link_state_changed = NULL;
-		link_detach(link);
-
-		session_destroy_link_endpoint(link->link_endpoint);
+        link->on_link_state_changed = NULL;
+        (void)link_detach(link, true);
+        session_destroy_link_endpoint(link->link_endpoint);
 		amqpvalue_destroy(link->source);
 		amqpvalue_destroy(link->target);
 		if (link->pending_deliveries != NULL)
 		{
-			LIST_ITEM_HANDLE item = list_get_head_item(link->pending_deliveries);
+			LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries);
 			while (item != NULL)
 			{
-				LIST_ITEM_HANDLE next_item = list_get_next_item(item);
-				DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)list_item_get_value(item);
+				LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item);
+				DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item);
 				if (delivery_instance != NULL)
 				{
 					amqpalloc_free(delivery_instance);
@@ -642,7 +713,7 @@
 				item = next_item;
 			}
 
-			list_destroy(link->pending_deliveries);
+			singlylinkedlist_destroy(link->pending_deliveries);
 		}
 
 		if (link->name != NULL)
@@ -655,6 +726,11 @@
 			amqpvalue_destroy(link->attach_properties);
         }
 
+        if (link->received_payload != NULL)
+        {
+            free(link->received_payload);
+        }
+
 		amqpalloc_free(link);
 	}
 }
@@ -828,7 +904,8 @@
 {
 	int result;
 
-	if (link == NULL)
+	if ((link == NULL) ||
+        (link->is_closed))
 	{
 		result = __LINE__;
 	}
@@ -847,7 +924,7 @@
 			}
 			else
 			{
-				link->is_underlying_session_begun = 1;
+				link->is_underlying_session_begun = true;
 
 				if (session_start_link_endpoint(link->link_endpoint, link_frame_received, on_session_state_changed, on_session_flow_on, link) != 0)
 				{
@@ -855,6 +932,8 @@
 				}
 				else
 				{
+                    link->received_payload_size = 0;
+
 					result = 0;
 				}
 			}
@@ -868,40 +947,57 @@
 	return result;
 }
 
-int link_detach(LINK_HANDLE link)
+int link_detach(LINK_HANDLE link, bool close)
 {
 	int result;
 
-	if (link == NULL)
-	{
+    if ((link == NULL) ||
+        (link->is_closed))
+    {
 		result = __LINE__;
 	}
 	else
 	{
-		if (link->link_state == LINK_STATE_ERROR)
-		{
-			result = __LINE__;
-		}
-		else if ((link->link_state == LINK_STATE_HALF_ATTACHED) ||
-			(link->link_state == LINK_STATE_ATTACHED))
-		{
-			if (send_detach(link, NULL) != 0)
-			{
-				result = __LINE__;
-			}
-			else
-			{
-				set_link_state(link, LINK_STATE_DETACHED);
-				link->on_link_state_changed = NULL;
-				result = 0;
-			}
-		}
-		else
-		{
-			set_link_state(link, LINK_STATE_DETACHED);
-			link->on_link_state_changed = NULL;
-			result = 0;
-		}
+        switch (link->link_state)
+        {
+
+        case LINK_STATE_HALF_ATTACHED:
+            /* Sending detach when remote is not yet attached */
+            if (send_detach(link, close, NULL) != 0)
+            {
+                result = __LINE__;
+            }
+            else
+            {
+                set_link_state(link, LINK_STATE_DETACHED);
+                result = 0;
+            }
+            break;
+
+        case LINK_STATE_ATTACHED:
+            /* Send detach and wait for remote to respond */
+            if (send_detach(link, close, NULL) != 0)
+            {
+                result = __LINE__;
+            }
+            else
+            {
+                set_link_state(link, LINK_STATE_HALF_ATTACHED);
+                result = 0;
+            }
+            break;
+
+        case LINK_STATE_DETACHED:
+            /* Already detached */
+            result = 0;
+            break;
+
+        default:
+        case LINK_STATE_ERROR:
+            /* Already detached and in error state */
+            result = __LINE__;
+            break;
+        }
 	}
 
 	return result;
@@ -981,7 +1077,7 @@
 							pending_delivery->on_delivery_settled = on_delivery_settled;
 							pending_delivery->callback_context = callback_context;
 							pending_delivery->link = link;
-							delivery_instance_list_item = list_add(link->pending_deliveries, pending_delivery);
+							delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery);
 
 							if (delivery_instance_list_item == NULL)
 							{
@@ -995,14 +1091,14 @@
 								{
 								default:
 								case SESSION_SEND_TRANSFER_ERROR:
-									list_remove(link->pending_deliveries, delivery_instance_list_item);
+									singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
 									amqpalloc_free(pending_delivery);
 									result = LINK_TRANSFER_ERROR;
 									break;
 
 								case SESSION_SEND_TRANSFER_BUSY:
 									/* Ensure we remove from list again since sender will attempt to transfer again on flow on */
-									list_remove(link->pending_deliveries, delivery_instance_list_item);
+									singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
 									amqpalloc_free(pending_delivery);
 									result = LINK_TRANSFER_BUSY;
 									break;