A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
43:4c1e4e94cdd3
Parent:
41:0e723f9cbd89
Child:
44:9dd558f13109
--- a/link.c	Fri May 04 13:24:52 2018 -0700
+++ b/link.c	Mon Jun 11 15:39:52 2018 -0700
@@ -29,6 +29,12 @@
     tickcounter_ms_t timeout;
 } DELIVERY_INSTANCE;
 
+typedef struct ON_LINK_DETACH_EVENT_SUBSCRIPTION_TAG
+{
+    ON_LINK_DETACH_RECEIVED on_link_detach_received;
+    void* context;
+} ON_LINK_DETACH_EVENT_SUBSCRIPTION;
+
 typedef struct LINK_INSTANCE_TAG
 {
     SESSION_HANDLE session;
@@ -61,6 +67,7 @@
     uint32_t received_payload_size;
     delivery_number received_delivery_id;
     TICK_COUNTER_HANDLE tick_counter;
+    ON_LINK_DETACH_EVENT_SUBSCRIPTION on_link_detach_received_event_subscription;
 } LINK_INSTANCE;
 
 DEFINE_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE);
@@ -595,7 +602,6 @@
                     LogError("Failed sending detach frame");
                 }
             }
-
             /* Received a closing detach after we sent a non-closing detach. */
             else if (closed &&
                 ((link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_SENT) || (link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED)) &&
@@ -616,10 +622,16 @@
 
             if (detach_get_error(detach, &error) == 0)
             {
-                error_destroy(error);
-
                 remove_all_pending_deliveries(link_instance, true);
                 set_link_state(link_instance, LINK_STATE_ERROR);
+
+                // signal link detach received in order to handle cases like redirect
+                if (link_instance->on_link_detach_received_event_subscription.on_link_detach_received != NULL)
+                {
+                    link_instance->on_link_detach_received_event_subscription.on_link_detach_received(link_instance->on_link_detach_received_event_subscription.context, error);
+                }
+
+                error_destroy(error);
             }
             else 
             {
@@ -714,6 +726,8 @@
         result->received_payload = NULL;
         result->received_payload_size = 0;
         result->received_delivery_id = 0;
+        result->on_link_detach_received_event_subscription.on_link_detach_received = NULL;
+        result->on_link_detach_received_event_subscription.context = NULL;
 
         result->tick_counter = tickcounter_create();
         if (result->tick_counter == NULL)
@@ -797,6 +811,8 @@
         result->received_delivery_id = 0;
         result->source = amqpvalue_clone(target);
         result->target = amqpvalue_clone(source);
+        result->on_link_detach_received_event_subscription.on_link_detach_received = NULL;
+        result->on_link_detach_received_event_subscription.context = NULL;
 
         if (role == role_sender)
         {
@@ -862,7 +878,7 @@
         tickcounter_destroy(link->tick_counter);
 
         link->on_link_state_changed = NULL;
-        (void)link_detach(link, true);
+        (void)link_detach(link, true, NULL, NULL, NULL);
         session_destroy_link_endpoint(link->link_endpoint);
         amqpvalue_destroy(link->source);
         amqpvalue_destroy(link->target);
@@ -1162,10 +1178,14 @@
     return result;
 }
 
-int link_detach(LINK_HANDLE link, bool close)
+int link_detach(LINK_HANDLE link, bool close, const char* error_condition, const char* error_description, AMQP_VALUE info)
 {
     int result;
 
+    (void)error_condition;
+    (void)error_description;
+    (void)info;
+
     if (link == NULL)
     {
         LogError("NULL link");
@@ -1177,12 +1197,45 @@
     }
     else
     {
+        ERROR_HANDLE error;
+
+        if (error_condition != NULL)
+        {
+            error = error_create(error_condition);
+            if (error == NULL)
+            {
+                LogInfo("Cannot create error for detach, detaching without error anyhow");
+            }
+            else
+            {
+                if (error_description != NULL)
+                {
+                    if (error_set_description(error, error_description) != 0)
+                    {
+                        LogInfo("Cannot set error description on detach error, detaching anyhow");
+                    }
+                }
+
+                if (info != NULL)
+                {
+                    if (error_set_info(error, info) != 0)
+                    {
+                        LogInfo("Cannot set info map on detach error, detaching anyhow");
+                    }
+                }
+            }
+        }
+        else
+        {
+            error = NULL;
+        }
+
         switch (link->link_state)
         {
         case LINK_STATE_HALF_ATTACHED_ATTACH_SENT:
         case LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED:
             /* Sending detach when remote is not yet attached */
-            if (send_detach(link, close, NULL) != 0)
+            if (send_detach(link, close, error) != 0)
             {
                 LogError("Sending detach frame failed");
                 result = __FAILURE__;
@@ -1196,7 +1249,7 @@
 
         case LINK_STATE_ATTACHED:
             /* Send detach and wait for remote to respond */
-            if (send_detach(link, close, NULL) != 0)
+            if (send_detach(link, close, error) != 0)
             {
                 LogError("Sending detach frame failed");
                 result = __FAILURE__;
@@ -1219,6 +1272,11 @@
             result = __FAILURE__;
             break;
         }
+
+        if (error != NULL)
+        {
+            error_destroy(error);
+        }
     }
 
     return result;
@@ -1550,3 +1608,46 @@
         }
     }
 }
+
+ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE link_subscribe_on_link_detach_received(LINK_HANDLE link, ON_LINK_DETACH_RECEIVED on_link_detach_received, void* context)
+{
+    ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE result;
+
+    if ((link == NULL) ||
+        (on_link_detach_received == NULL))
+    {
+        LogError("Invalid arguments: link = %p, on_link_detach_received = %p, context = %p",
+            link, on_link_detach_received, context);
+        result = NULL;
+    }
+    else
+    {
+        if (link->on_link_detach_received_event_subscription.on_link_detach_received != NULL)
+        {
+            LogError("Already subscribed for on_link_detach_received events");
+            result = NULL;
+        }
+        else
+        {
+            link->on_link_detach_received_event_subscription.on_link_detach_received = on_link_detach_received;
+            link->on_link_detach_received_event_subscription.context = context;
+
+            result = &link->on_link_detach_received_event_subscription;
+        }
+    }
+
+    return result;
+}
+
+void link_unsubscribe_on_link_detach_received(ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE event_subscription)
+{
+    if (event_subscription == NULL)
+    {
+        LogError("NULL event_subscription");
+    }
+    else
+    {
+        event_subscription->on_link_detach_received = NULL;
+        event_subscription->context = NULL;
+    }
+}