A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
22:524bded3f7a8
Parent:
21:f9c433d8e6ca
Child:
23:1111ee8bcba4
--- a/amqp_management.c	Fri Mar 24 16:35:45 2017 -0700
+++ b/amqp_management.c	Thu Apr 06 14:11:27 2017 -0700
@@ -24,11 +24,19 @@
 {
     MESSAGE_HANDLE message;
     OPERATION_STATE operation_state;
-    ON_OPERATION_COMPLETE on_operation_complete;
+    ON_AMQP_MANAGEMENT_EXECUTE_OPERATION_COMPLETE on_execute_operation_complete;
     void* callback_context;
     unsigned long message_id;
 } OPERATION_MESSAGE_INSTANCE;
 
+typedef enum AMQP_MANAGEMENT_STATE_TAG
+{
+    AMQP_MANAGEMENT_STATE_IDLE,
+    AMQP_MANAGEMENT_STATE_OPENING,
+    AMQP_MANAGEMENT_STATE_OPEN,
+    AMQP_MANAGEMENT_STATE_ERROR
+} AMQP_MANAGEMENT_STATE;
+
 typedef struct AMQP_MANAGEMENT_INSTANCE_TAG
 {
     SESSION_HANDLE session;
@@ -39,25 +47,15 @@
     OPERATION_MESSAGE_INSTANCE** operation_messages;
     size_t operation_message_count;
     unsigned long next_message_id;
-    ON_AMQP_MANAGEMENT_STATE_CHANGED on_amqp_management_state_changed;
-    void* callback_context;
+    ON_AMQP_MANAGEMENT_OPEN_COMPLETE on_amqp_management_open_complete;
+    void* on_amqp_management_open_complete_context;
+    ON_AMQP_MANAGEMENT_ERROR on_amqp_management_error;
+    void* on_amqp_management_error_context;
     AMQP_MANAGEMENT_STATE amqp_management_state;
-    AMQP_MANAGEMENT_STATE previous_amqp_management_state;
     int sender_connected : 1;
     int receiver_connected : 1;
 } AMQP_MANAGEMENT_INSTANCE;
 
-static void amqpmanagement_set_state(AMQP_MANAGEMENT_INSTANCE* amqp_management_instance, AMQP_MANAGEMENT_STATE amqp_management_state)
-{
-    amqp_management_instance->previous_amqp_management_state = amqp_management_instance->amqp_management_state;
-    amqp_management_instance->amqp_management_state = amqp_management_state;
-
-    if (amqp_management_instance->on_amqp_management_state_changed != NULL)
-    {
-        amqp_management_instance->on_amqp_management_state_changed(amqp_management_instance->callback_context, amqp_management_instance->amqp_management_state, amqp_management_instance->previous_amqp_management_state);
-    }
-}
-
 static void remove_operation_message_by_index(AMQP_MANAGEMENT_INSTANCE* amqp_management_instance, size_t index)
 {
     message_destroy(amqp_management_instance->operation_messages[index]->message);
@@ -166,7 +164,7 @@
                                         if (amqp_management_instance->operation_messages[i]->operation_state == OPERATION_STATE_AWAIT_REPLY)
                                         {
                                             AMQP_VALUE expected_message_id = amqpvalue_create_ulong(amqp_management_instance->operation_messages[i]->message_id);
-                                            OPERATION_RESULT operation_result;
+                                            AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT execute_operation_result;
 
                                             if (expected_message_id == NULL)
                                             {
@@ -179,14 +177,14 @@
                                                     /* 202 is not mentioned in the draft in any way, this is a workaround for an EH bug for now */
                                                     if ((status_code != 200) && (status_code != 202))
                                                     {
-                                                        operation_result = OPERATION_RESULT_OPERATION_FAILED;
+                                                        execute_operation_result = AMQP_MANAGEMENT_EXECUTE_OPERATION_FAILED_BAD_STATUS;
                                                     }
                                                     else
                                                     {
-                                                        operation_result = OPERATION_RESULT_OK;
+                                                        execute_operation_result = AMQP_MANAGEMENT_EXECUTE_OPERATION_OK;
                                                     }
 
-                                                    amqp_management_instance->operation_messages[i]->on_operation_complete(amqp_management_instance->operation_messages[i]->callback_context, operation_result, status_code, status_description);
+                                                    amqp_management_instance->operation_messages[i]->on_execute_operation_complete(amqp_management_instance->operation_messages[i]->callback_context, execute_operation_result, status_code, status_description);
 
                                                     remove_operation_message_by_index(amqp_management_instance, i);
 
@@ -281,14 +279,52 @@
         break;
 
     case MESSAGE_SENDER_STATE_OPEN:
-        amqp_management_instance->sender_connected = 1;
-        (void)send_operation_messages(amqp_management_instance);
+        switch (amqp_management_instance->amqp_management_state)
+        {
+        default:
+        case AMQP_MANAGEMENT_STATE_ERROR:
+            /* do nothing */
+            break;
+
+        case AMQP_MANAGEMENT_STATE_OPENING:
+            amqp_management_instance->sender_connected = -1;
+            if (amqp_management_instance->receiver_connected != 0)
+            {
+                (void)send_operation_messages(amqp_management_instance);
+                amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_OPEN;
+                amqp_management_instance->on_amqp_management_open_complete(amqp_management_instance->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_OK);
+            }
+            break;
+
+        case AMQP_MANAGEMENT_STATE_OPEN:
+            amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_ERROR;
+            amqp_management_instance->on_amqp_management_error(amqp_management_instance->on_amqp_management_error_context);
+            break;
+        }
+
         break;
 
     case MESSAGE_SENDER_STATE_CLOSING:
+    case MESSAGE_SENDER_STATE_IDLE:
     case MESSAGE_SENDER_STATE_ERROR:
         amqp_management_instance->sender_connected = 0;
-        amqpmanagement_set_state(amqp_management_instance, AMQP_MANAGEMENT_STATE_ERROR);
+        switch (amqp_management_instance->amqp_management_state)
+        {
+        default:
+        case AMQP_MANAGEMENT_STATE_ERROR:
+            /* do nothing */
+            break;
+
+        case AMQP_MANAGEMENT_STATE_OPENING:
+            amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE;
+            amqp_management_instance->on_amqp_management_open_complete(amqp_management_instance->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_ERROR);
+            break;
+
+        case AMQP_MANAGEMENT_STATE_OPEN:
+            amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_ERROR;
+            amqp_management_instance->on_amqp_management_error(amqp_management_instance->on_amqp_management_error_context);
+            break;
+        }
         break;
     }
 }
@@ -303,14 +339,53 @@
         break;
 
     case MESSAGE_RECEIVER_STATE_OPEN:
-        amqp_management_instance->receiver_connected = 1;
-        (void)send_operation_messages(amqp_management_instance);
+        switch (amqp_management_instance->amqp_management_state)
+        {
+        default:
+        case AMQP_MANAGEMENT_STATE_ERROR:
+            /* do nothing */
+            break;
+
+        case AMQP_MANAGEMENT_STATE_OPENING:
+            amqp_management_instance->receiver_connected = -1;
+
+            if (amqp_management_instance->sender_connected != 0)
+            {
+                (void)send_operation_messages(amqp_management_instance);
+                amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_OPEN;
+                amqp_management_instance->on_amqp_management_open_complete(amqp_management_instance->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_OK);
+            }
+            break;
+
+        case AMQP_MANAGEMENT_STATE_OPEN:
+            amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_ERROR;
+            amqp_management_instance->on_amqp_management_error(amqp_management_instance->on_amqp_management_error_context);
+            break;
+        }
+
         break;
 
     case MESSAGE_RECEIVER_STATE_CLOSING:
+    case MESSAGE_RECEIVER_STATE_IDLE:
     case MESSAGE_RECEIVER_STATE_ERROR:
-        amqp_management_instance->receiver_connected = 0;
-        amqpmanagement_set_state(amqp_management_instance, AMQP_MANAGEMENT_STATE_ERROR);
+        amqp_management_instance->sender_connected = 0;
+        switch (amqp_management_instance->amqp_management_state)
+        {
+        default:
+        case AMQP_MANAGEMENT_STATE_ERROR:
+            /* do nothing */
+            break;
+
+        case AMQP_MANAGEMENT_STATE_OPENING:
+            amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE;
+            amqp_management_instance->on_amqp_management_open_complete(amqp_management_instance->on_amqp_management_open_complete_context, AMQP_MANAGEMENT_OPEN_ERROR);
+            break;
+
+        case AMQP_MANAGEMENT_STATE_OPEN:
+            amqp_management_instance->amqp_management_state = AMQP_MANAGEMENT_STATE_ERROR;
+            amqp_management_instance->on_amqp_management_error(amqp_management_instance->on_amqp_management_error_context);
+            break;
+        }
         break;
     }
 }
@@ -400,7 +475,7 @@
     return result;
 }
 
-AMQP_MANAGEMENT_HANDLE amqpmanagement_create(SESSION_HANDLE session, const char* management_node, ON_AMQP_MANAGEMENT_STATE_CHANGED on_amqp_management_state_changed, void* callback_context)
+AMQP_MANAGEMENT_HANDLE amqp_management_create(SESSION_HANDLE session, const char* management_node)
 {
     AMQP_MANAGEMENT_INSTANCE* result;
 
@@ -418,8 +493,10 @@
             result->receiver_connected = 0;
             result->operation_message_count = 0;
             result->operation_messages = NULL;
-            result->on_amqp_management_state_changed = on_amqp_management_state_changed;
-            result->callback_context = callback_context;
+            result->on_amqp_management_open_complete = NULL;
+            result->on_amqp_management_open_complete_context = NULL;
+            result->on_amqp_management_error = NULL;
+            result->on_amqp_management_error_context = NULL;
             result->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE;
 
             AMQP_VALUE source = messaging_create_source(management_node);
@@ -534,11 +611,11 @@
     return result;
 }
 
-void amqpmanagement_destroy(AMQP_MANAGEMENT_HANDLE amqp_management)
+void amqp_management_destroy(AMQP_MANAGEMENT_HANDLE amqp_management)
 {
     if (amqp_management != NULL)
     {
-        (void)amqpmanagement_close(amqp_management);
+        (void)amqp_management_close(amqp_management);
 
         if (amqp_management->operation_message_count > 0)
         {
@@ -560,7 +637,7 @@
     }
 }
 
-int amqpmanagement_open(AMQP_MANAGEMENT_HANDLE amqp_management)
+int amqp_management_open_async(AMQP_MANAGEMENT_HANDLE amqp_management, ON_AMQP_MANAGEMENT_OPEN_COMPLETE on_amqp_management_open_complete, void* on_amqp_management_open_complete_context, ON_AMQP_MANAGEMENT_ERROR on_amqp_management_error, void* on_amqp_management_error_context)
 {
     int result;
 
@@ -570,14 +647,22 @@
     }
     else
     {
+        amqp_management->on_amqp_management_open_complete = on_amqp_management_open_complete;
+        amqp_management->on_amqp_management_open_complete_context = on_amqp_management_open_complete_context;
+        amqp_management->on_amqp_management_error = on_amqp_management_error;
+        amqp_management->on_amqp_management_error_context = on_amqp_management_error_context;
+        amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_OPENING;
+
         if (messagereceiver_open(amqp_management->message_receiver, on_message_received, amqp_management) != 0)
         {
+            amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE;
             result = __FAILURE__;
         }
         else
         {
             if (messagesender_open(amqp_management->message_sender) != 0)
             {
+                amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE;
                 messagereceiver_close(amqp_management->message_receiver);
                 result = __FAILURE__;
             }
@@ -591,7 +676,7 @@
     return result;
 }
 
-int amqpmanagement_close(AMQP_MANAGEMENT_HANDLE amqp_management)
+int amqp_management_close(AMQP_MANAGEMENT_HANDLE amqp_management)
 {
     int result;
 
@@ -608,6 +693,7 @@
         }
         else
         {
+            amqp_management->amqp_management_state = AMQP_MANAGEMENT_STATE_IDLE;
             result = 0;
         }
     }
@@ -615,7 +701,7 @@
     return result;
 }
 
-int amqpmanagement_start_operation(AMQP_MANAGEMENT_HANDLE amqp_management, const char* operation, const char* type, const char* locales, MESSAGE_HANDLE message, ON_OPERATION_COMPLETE on_operation_complete, void* context)
+int amqp_management_execute_operation_async(AMQP_MANAGEMENT_HANDLE amqp_management, const char* operation, const char* type, const char* locales, MESSAGE_HANDLE message, ON_AMQP_MANAGEMENT_EXECUTE_OPERATION_COMPLETE on_execute_operation_complete, void* on_execute_operation_complete_context)
 {
     int result;
 
@@ -656,8 +742,8 @@
                     else
                     {
                         pending_operation_message->message = message_clone(message);
-                        pending_operation_message->callback_context = context;
-                        pending_operation_message->on_operation_complete = on_operation_complete;
+                        pending_operation_message->callback_context = on_execute_operation_complete_context;
+                        pending_operation_message->on_execute_operation_complete = on_execute_operation_complete;
                         pending_operation_message->operation_state = OPERATION_STATE_NOT_SENT;
                         pending_operation_message->message_id = amqp_management->next_message_id;
 
@@ -678,9 +764,9 @@
 
                             if (send_operation_messages(amqp_management) != 0)
                             {
-                                if (on_operation_complete != NULL)
+                                if (on_execute_operation_complete != NULL)
                                 {
-                                    on_operation_complete(context, OPERATION_RESULT_ERROR, 0, NULL);
+                                    on_execute_operation_complete(on_execute_operation_complete_context, AMQP_MANAGEMENT_EXECUTE_OPERATION_ERROR, 0, NULL);
                                 }
 
                                 result = __FAILURE__;
@@ -700,7 +786,7 @@
     return result;
 }
 
-void amqpmanagement_set_trace(AMQP_MANAGEMENT_HANDLE amqp_management, bool traceOn)
+void amqp_management_set_trace(AMQP_MANAGEMENT_HANDLE amqp_management, bool traceOn)
 {
     if (amqp_management != NULL)
     {