A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
41:0e723f9cbd89
Parent:
40:f0ceafa8d570
Child:
43:4c1e4e94cdd3
--- a/amqp_management.c	Mon Mar 05 17:41:28 2018 -0800
+++ b/amqp_management.c	Tue Mar 20 10:29:38 2018 -0700
@@ -27,12 +27,14 @@
     ON_AMQP_MANAGEMENT_EXECUTE_OPERATION_COMPLETE on_execute_operation_complete;
     void* callback_context;
     uint64_t message_id;
+    AMQP_MANAGEMENT_HANDLE amqp_management;
 } OPERATION_MESSAGE_INSTANCE;
 
 typedef enum AMQP_MANAGEMENT_STATE_TAG
 {
     AMQP_MANAGEMENT_STATE_IDLE,
     AMQP_MANAGEMENT_STATE_OPENING,
+    AMQP_MANAGEMENT_STATE_CLOSING,
     AMQP_MANAGEMENT_STATE_OPEN,
     AMQP_MANAGEMENT_STATE_ERROR
 } AMQP_MANAGEMENT_STATE;
@@ -330,6 +332,46 @@
     return result;
 }
 
+static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result)
+{
+    if (context == NULL)
+    {
+        /* Codes_SRS_AMQP_MANAGEMENT_01_167: [ When `on_message_send_complete` is called with a NULL context it shall return. ]*/
+        LogError("NULL context");
+    }
+    else
+    {
+        if (send_result == MESSAGE_SEND_OK)
+        {
+            /* Codes_SRS_AMQP_MANAGEMENT_01_170: [ If `send_result` is `MESSAGE_SEND_OK`, `on_message_send_complete` shall return. ]*/
+        }
+        else
+        {
+            /* Codes_SRS_AMQP_MANAGEMENT_01_172: [ If `send_result` is different then `MESSAGE_SEND_OK`: ]*/
+            /* Codes_SRS_AMQP_MANAGEMENT_01_168: [ - `context` shall be used as a LIST_ITEM_HANDLE containing the pending operation. ]*/
+            LIST_ITEM_HANDLE pending_operation_list_item_handle = (LIST_ITEM_HANDLE)context;
+
+            /* Codes_SRS_AMQP_MANAGEMENT_01_169: [ - `on_message_send_complete` shall obtain the pending operation by calling `singlylinkedlist_item_get_value`. ]*/
+            OPERATION_MESSAGE_INSTANCE* pending_operation_message = (OPERATION_MESSAGE_INSTANCE*)singlylinkedlist_item_get_value(pending_operation_list_item_handle);
+            AMQP_MANAGEMENT_HANDLE amqp_management = pending_operation_message->amqp_management;
+
+            /* Codes_SRS_AMQP_MANAGEMENT_01_171: [ - `on_message_send_complete` shall removed the pending operation from the pending operations list. ]*/
+            if (singlylinkedlist_remove(amqp_management->pending_operations, pending_operation_list_item_handle) != 0)
+            {
+                /* Tests_SRS_AMQP_MANAGEMENT_01_174: [ If any error occurs in removing the pending operation from the list `on_amqp_management_error` callback shall be invoked while passing the `on_amqp_management_error_context` as argument. ]*/
+                amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
+                LogError("Cannot remove pending operation");
+            }
+            else
+            {
+                /* Codes_SRS_AMQP_MANAGEMENT_01_173: [ - The callback associated with the pending operation shall be called with `AMQP_MANAGEMENT_EXECUTE_OPERATION_ERROR`. ]*/
+                pending_operation_message->on_execute_operation_complete(pending_operation_message->callback_context, AMQP_MANAGEMENT_EXECUTE_OPERATION_ERROR, 0, NULL, NULL);
+                free(pending_operation_message);
+            }
+        }
+    }
+}
+
 static void on_message_sender_state_changed(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state)
 {
     if (context == NULL)
@@ -400,6 +442,26 @@
                 }
                 break;
             }
+            /* Codes_SRS_AMQP_MANAGEMENT_09_001: [ For the current state of AMQP management being `CLOSING`: ]*/
+            case AMQP_MANAGEMENT_STATE_CLOSING:
+            {
+                switch (new_state)
+                {
+                default:
+                    /* Codes_SRS_AMQP_MANAGEMENT_09_002: [ - If `new_state` is `MESSAGE_SENDER_STATE_OPEN`, `MESSAGE_SENDER_STATE_OPENING`, `MESSAGE_SENDER_STATE_ERROR` the `on_amqp_management_error` callback shall be invoked while passing the `on_amqp_management_error_context` as argument. ]*/
+                case MESSAGE_SENDER_STATE_OPEN:
+                case MESSAGE_SENDER_STATE_OPENING:
+                case MESSAGE_SENDER_STATE_ERROR:
+                    amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_ERROR;
+                    amqp_management_instance->on_amqp_management_error(amqp_management_instance->on_amqp_management_error_context);
+                    break;
+                case MESSAGE_SENDER_STATE_IDLE:
+                case MESSAGE_SENDER_STATE_CLOSING:
+                    /* Codes_SRS_AMQP_MANAGEMENT_09_003: [ - If `new_state` is `MESSAGE_SENDER_STATE_CLOSING` or `MESSAGE_SENDER_STATE_IDLE`, `on_message_sender_state_changed` shall do nothing. ]*/
+                    break;
+                }
+                break;
+            }
             /* Codes_SRS_AMQP_MANAGEMENT_01_146: [ For the current state of AMQP management being `ERROR`: ]*/
             case AMQP_MANAGEMENT_STATE_ERROR:
                 /* Codes_SRS_AMQP_MANAGEMENT_01_147: [ - All state transitions shall be ignored. ]*/
@@ -958,6 +1020,17 @@
     }
     else
     {
+        AMQP_MANAGEMENT_STATE previous_state = amqp_management->amqp_management_state;
+
+        amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_CLOSING;
+
+        if (previous_state == AMQP_MANAGEMENT_STATE_OPENING)
+        {
+            /* Codes_SRS_AMQP_MANAGEMENT_01_048: [ `amqp_management_close` on an AMQP management instance that is OPENING shall trigger the `on_amqp_management_open_complete` callback with `AMQP_MANAGEMENT_OPEN_CANCELLED`, while also passing the context passed in `amqp_management_open_async`. ]*/
+            amqp_management->on_amqp_management_open_complete(amqp_management->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_CANCELLED);
+        }
+
+
         /* Codes_SRS_AMQP_MANAGEMENT_01_045: [ `amqp_management_close` shall close the AMQP management instance. ]*/
         /* Codes_SRS_AMQP_MANAGEMENT_01_050: [ `amqp_management_close` shall close the message sender by calling `messagesender_close`. ]*/
         if (messagesender_close(amqp_management->message_sender) != 0)
@@ -998,12 +1071,6 @@
                 list_item_handle = singlylinkedlist_get_head_item(amqp_management->pending_operations);
             }
 
-            if (amqp_management->amqp_management_state == AMQP_MANAGEMENT_STATE_OPENING)
-            {
-                /* Codes_SRS_AMQP_MANAGEMENT_01_048: [ `amqp_management_close` on an AMQP management instance that is OPENING shall trigger the `on_amqp_management_open_complete` callback with `AMQP_MANAGEMENT_OPEN_CANCELLED`, while also passing the context passed in `amqp_management_open_async`. ]*/
-                amqp_management->on_amqp_management_open_complete(amqp_management->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_CANCELLED);
-            }
-
             amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE;
 
             /* Codes_SRS_AMQP_MANAGEMENT_01_046: [ On success it shall return 0. ]*/
@@ -1126,6 +1193,7 @@
                                 pending_operation_message->callback_context = on_execute_operation_complete_context;
                                 pending_operation_message->on_execute_operation_complete = on_execute_operation_complete;
                                 pending_operation_message->message_id = amqp_management->next_message_id;
+                                pending_operation_message->amqp_management = amqp_management;
 
                                 /* Codes_SRS_AMQP_MANAGEMENT_01_091: [ Once the request message has been sent, an entry shall be stored in the pending operations list by calling `singlylinkedlist_add`. ]*/
                                 added_item = singlylinkedlist_add(amqp_management->pending_operations, pending_operation_message);
@@ -1138,8 +1206,9 @@
                                 }
                                 else
                                 {
-                                    /* Codes_SRS_AMQP_MANAGEMENT_01_088: [ `amqp_management_execute_operation_async` shall send the message by calling `messagesender_send`. ]*/
-                                    if (messagesender_send_async(amqp_management->message_sender, cloned_message, NULL, NULL, 0) == NULL)
+                                    /* Codes_SRS_AMQP_MANAGEMENT_01_088: [ `amqp_management_execute_operation_async` shall send the message by calling `messagesender_send_async`. ]*/
+                                    /* Codes_SRS_AMQP_MANAGEMENT_01_166: [ The `on_message_send_complete` callback shall be passed to the `messagesender_send_async` call. ]*/
+                                    if (messagesender_send_async(amqp_management->message_sender, cloned_message, on_message_send_complete, added_item, 0) == NULL)
                                     {
                                         /* Codes_SRS_AMQP_MANAGEMENT_01_089: [ If `messagesender_send_async` fails, `amqp_management_execute_operation_async` shall fail and return a non-zero value. ]*/
                                         LogError("Could not send request message");