A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
34:6be9c2058664
Parent:
28:add19eb7defa
Child:
40:f0ceafa8d570
diff -r 08b53020ff0d -r 6be9c2058664 message_receiver.c
--- a/message_receiver.c	Mon Sep 25 13:38:40 2017 -0700
+++ b/message_receiver.c	Sat Oct 21 20:12:19 2017 +0000
@@ -22,27 +22,28 @@
     bool decode_error;
 } MESSAGE_RECEIVER_INSTANCE;
 
-static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver_instance, MESSAGE_RECEIVER_STATE new_state)
+static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver, MESSAGE_RECEIVER_STATE new_state)
 {
-    MESSAGE_RECEIVER_STATE previous_state = message_receiver_instance->message_receiver_state;
-    message_receiver_instance->message_receiver_state = new_state;
-    if (message_receiver_instance->on_message_receiver_state_changed != NULL)
+    MESSAGE_RECEIVER_STATE previous_state = message_receiver->message_receiver_state;
+    message_receiver->message_receiver_state = new_state;
+    if (message_receiver->on_message_receiver_state_changed != NULL)
     {
-        message_receiver_instance->on_message_receiver_state_changed(message_receiver_instance->on_message_receiver_state_changed_context, new_state, previous_state);
+        message_receiver->on_message_receiver_state_changed(message_receiver->on_message_receiver_state_changed_context, new_state, previous_state);
     }
 }
 
 static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value)
 {
-    MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
-    MESSAGE_HANDLE decoded_message = message_receiver_instance->decoded_message;
+    MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
+    MESSAGE_HANDLE decoded_message = message_receiver->decoded_message;
     AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value);
 
     if (is_application_properties_type_by_descriptor(descriptor))
     {
         if (message_set_application_properties(decoded_message, decoded_value) != 0)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error setting application properties on received message");
+            message_receiver->decode_error = true;
         }
     }
     else if (is_properties_type_by_descriptor(descriptor))
@@ -50,13 +51,15 @@
         PROPERTIES_HANDLE properties;
         if (amqpvalue_get_properties(decoded_value, &properties) != 0)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message properties");
+            message_receiver->decode_error = true;
         }
         else
         {
             if (message_set_properties(decoded_message, properties) != 0)
             {
-                message_receiver_instance->decode_error = true;
+                LogError("Error setting message properties on received message");
+                message_receiver->decode_error = true;
             }
 
             properties_destroy(properties);
@@ -65,19 +68,35 @@
     else if (is_delivery_annotations_type_by_descriptor(descriptor))
     {
         annotations delivery_annotations = amqpvalue_get_inplace_described_value(decoded_value);
-        if ((delivery_annotations == NULL) ||
-            (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0))
+        if (delivery_annotations == NULL)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting delivery annotations");
+            message_receiver->decode_error = true;
+        }
+        else
+        {
+            if (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0)
+            {
+                LogError("Error setting delivery annotations on received message");
+                message_receiver->decode_error = true;
+            }
         }
     }
     else if (is_message_annotations_type_by_descriptor(descriptor))
     {
         annotations message_annotations = amqpvalue_get_inplace_described_value(decoded_value);
-        if ((message_annotations == NULL) ||
-            (message_set_message_annotations(decoded_message, message_annotations) != 0))
+        if (message_annotations == NULL)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message annotations");
+            message_receiver->decode_error = true;
+        }
+        else
+        {
+            if (message_set_message_annotations(decoded_message, message_annotations) != 0)
+            {
+                LogError("Error setting message annotations on received message");
+                message_receiver->decode_error = true;
+            }
         }
     }
     else if (is_header_type_by_descriptor(descriptor))
@@ -85,13 +104,15 @@
         HEADER_HANDLE header;
         if (amqpvalue_get_header(decoded_value, &header) != 0)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message header");
+            message_receiver->decode_error = true;
         }
         else
         {
             if (message_set_header(decoded_message, header) != 0)
             {
-                message_receiver_instance->decode_error = true;
+                LogError("Error setting message header on received message");
+                message_receiver->decode_error = true;
             }
 
             header_destroy(header);
@@ -100,57 +121,97 @@
     else if (is_footer_type_by_descriptor(descriptor))
     {
         annotations footer = amqpvalue_get_inplace_described_value(decoded_value);
-        if ((footer == NULL) ||
-            (message_set_footer(decoded_message, footer) != 0))
+        if (footer == NULL)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message footer");
+            message_receiver->decode_error = true;
+        }
+        else
+        {
+            if (message_set_footer(decoded_message, footer) != 0)
+            {
+                LogError("Error setting message footer on received message");
+                message_receiver->decode_error = true;
+            }
         }
     }
     else if (is_amqp_value_type_by_descriptor(descriptor))
     {
         MESSAGE_BODY_TYPE body_type;
-        message_get_body_type(decoded_message, &body_type);
-        if (body_type != MESSAGE_BODY_TYPE_NONE)
+        if (message_get_body_type(decoded_message, &body_type) != 0)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message body type");
+            message_receiver->decode_error = true;
         }
         else
         {
-            AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value);
-            if ((body_amqp_value == NULL) ||
-                (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0))
+            if (body_type != MESSAGE_BODY_TYPE_NONE)
+            {
+                LogError("Body already set on received message");
+                message_receiver->decode_error = true;
+            }
+            else
             {
-                message_receiver_instance->decode_error = true;
+                AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value);
+                if (body_amqp_value == NULL)
+                {
+                    LogError("Error getting body AMQP value");
+                    message_receiver->decode_error = true;
+                }
+                else
+                {
+                    if (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0)
+                    {
+                        LogError("Error setting body AMQP value on received message");
+                        message_receiver->decode_error = true;
+                    }
+                }
             }
         }
     }
     else if (is_data_type_by_descriptor(descriptor))
     {
         MESSAGE_BODY_TYPE body_type;
-        message_get_body_type(decoded_message, &body_type);
-        if ((body_type != MESSAGE_BODY_TYPE_NONE) &&
-            (body_type != MESSAGE_BODY_TYPE_DATA))
+        if (message_get_body_type(decoded_message, &body_type) != 0)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message body type");
+            message_receiver->decode_error = true;
         }
         else
         {
-            AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value);
-            data data_value;
-
-            if ((body_data_value == NULL) ||
-                (amqpvalue_get_data(body_data_value, &data_value) != 0))
+            if ((body_type != MESSAGE_BODY_TYPE_NONE) &&
+                (body_type != MESSAGE_BODY_TYPE_DATA))
             {
-                message_receiver_instance->decode_error = true;
+                LogError("Message body type already set to something different than AMQP DATA");
+                message_receiver->decode_error = true;
             }
             else
             {
-                BINARY_DATA binary_data;
-                binary_data.bytes = (const unsigned char*)data_value.bytes;
-                binary_data.length = data_value.length;
-                if (message_add_body_amqp_data(decoded_message, binary_data) != 0)
+                AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value);
+                if (body_data_value == NULL)
+                {
+                    LogError("Error getting body DATA value");
+                    message_receiver->decode_error = true;
+                }
+                else
                 {
-                    message_receiver_instance->decode_error = true;
+                    data data_value;
+                    if (amqpvalue_get_data(body_data_value, &data_value) != 0)
+                    {
+                        LogError("Error getting body DATA AMQP value");
+                        message_receiver->decode_error = true;
+                    }
+                    else
+                    {
+                        BINARY_DATA binary_data;
+                        binary_data.bytes = (const unsigned char*)data_value.bytes;
+                        binary_data.length = data_value.length;
+                        if (message_add_body_amqp_data(decoded_message, binary_data) != 0)
+                        {
+                            LogError("Error adding body DATA to received message");
+                            message_receiver->decode_error = true;
+                        }
+                    }
                 }
             }
         }
@@ -160,40 +221,44 @@
 static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes)
 {
     AMQP_VALUE result = NULL;
+    MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
 
-    MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
     (void)transfer;
-    if (message_receiver_instance->on_message_received != NULL)
+    if (message_receiver->on_message_received != NULL)
     {
         MESSAGE_HANDLE message = message_create();
         if (message == NULL)
         {
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+            LogError("Cannot create message");
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
         }
         else
         {
-            AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver_instance);
+            AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
             if (amqpvalue_decoder == NULL)
             {
-                set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+                LogError("Cannot create AMQP value decoder");
+                set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
             }
             else
             {
-                message_receiver_instance->decoded_message = message;
-                message_receiver_instance->decode_error = false;
+                message_receiver->decoded_message = message;
+                message_receiver->decode_error = false;
                 if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0)
                 {
-                    set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+                    LogError("Cannot decode bytes");
+                    set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
                 }
                 else
                 {
-                    if (message_receiver_instance->decode_error)
+                    if (message_receiver->decode_error)
                     {
-                        set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+                        LogError("Error decoding message");
+                        set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
                     }
                     else
                     {
-                        result = message_receiver_instance->on_message_received(message_receiver_instance->callback_context, message);
+                        result = message_receiver->on_message_received(message_receiver->callback_context, message);
                     }
                 }
 
@@ -209,7 +274,7 @@
 
 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
 {
-    MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
+    MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
     (void)previous_link_state;
 
     switch (new_link_state)
@@ -218,28 +283,28 @@
         break;
 
     case LINK_STATE_ATTACHED:
-        if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING)
+        if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING)
         {
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPEN);
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPEN);
         }
         break;
     case LINK_STATE_DETACHED:
-        if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) ||
-            (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING))
+        if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) ||
+            (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING))
         {
             /* User initiated transition, we should be good */
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_IDLE);
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_IDLE);
         }
-        else if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE)
+        else if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE)
         {
             /* Any other transition must be an error */
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
         }
         break;
     case LINK_STATE_ERROR:
-        if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR)
+        if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR)
         {
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
         }
         break;
     }
@@ -247,21 +312,29 @@
 
 MESSAGE_RECEIVER_HANDLE messagereceiver_create(LINK_HANDLE link, ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed, void* context)
 {
-    MESSAGE_RECEIVER_INSTANCE* result = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE));
-    if (result != NULL)
+    MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE));
+    if (message_receiver == NULL)
     {
-        result->link = link;
-        result->on_message_receiver_state_changed = on_message_receiver_state_changed;
-        result->on_message_receiver_state_changed_context = context;
-        result->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE;
+        LogError("Error creating message receiver");
+    }
+    else
+    {
+        message_receiver->link = link;
+        message_receiver->on_message_receiver_state_changed = on_message_receiver_state_changed;
+        message_receiver->on_message_receiver_state_changed_context = context;
+        message_receiver->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE;
     }
 
-    return result;
+    return message_receiver;
 }
 
 void messagereceiver_destroy(MESSAGE_RECEIVER_HANDLE message_receiver)
 {
-    if (message_receiver != NULL)
+    if (message_receiver == NULL)
+    {
+        LogError("NULL message_receiver");
+    }
+    else
     {
         (void)messagereceiver_close(message_receiver);
         free(message_receiver);
@@ -274,24 +347,24 @@
 
     if (message_receiver == NULL)
     {
+        LogError("NULL message_receiver");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-
-        if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE)
+        if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE)
         {
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPENING);
-            if (link_attach(message_receiver_instance->link, on_transfer_received, on_link_state_changed, NULL, message_receiver_instance) != 0)
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPENING);
+            if (link_attach(message_receiver->link, on_transfer_received, on_link_state_changed, NULL, message_receiver) != 0)
             {
+                LogError("Link attach failed");
                 result = __FAILURE__;
-                set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+                set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
             }
             else
             {
-                message_receiver_instance->on_message_received = on_message_received;
-                message_receiver_instance->callback_context = callback_context;
+                message_receiver->on_message_received = on_message_received;
+                message_receiver->callback_context = callback_context;
 
                 result = 0;
             }
@@ -311,21 +384,21 @@
 
     if (message_receiver == NULL)
     {
+        LogError("NULL message_receiver");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-
-        if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) ||
-            (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN))
+        if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) ||
+            (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN))
         {
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_CLOSING);
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_CLOSING);
 
-            if (link_detach(message_receiver_instance->link, true) != 0)
+            if (link_detach(message_receiver->link, true) != 0)
             {
+                LogError("link detach failed");
                 result = __FAILURE__;
-                set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+                set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
             }
             else
             {
@@ -347,13 +420,14 @@
 
     if (message_receiver == NULL)
     {
+        LogError("NULL message_receiver");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-        if (link_get_name(message_receiver_instance->link, link_name) != 0)
+        if (link_get_name(message_receiver->link, link_name) != 0)
         {
+            LogError("Getting link name failed");
             result = __FAILURE__;
         }
         else
@@ -371,13 +445,14 @@
 
     if (message_receiver == NULL)
     {
+        LogError("NULL message_receiver");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-        if (link_get_received_message_id(message_receiver_instance->link, message_id) != 0)
+        if (link_get_received_message_id(message_receiver->link, message_id) != 0)
         {
+            LogError("Failed getting received message Id");
             result = __FAILURE__;
         }
         else
@@ -395,32 +470,36 @@
 
     if (message_receiver == NULL)
     {
+        LogError("NULL message_receiver");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-        if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_OPEN)
+        if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_OPEN)
         {
+            LogError("Message received not open");
             result = __FAILURE__;
         }
         else
         {
             const char* my_name;
-            if (link_get_name(message_receiver_instance->link, &my_name) != 0)
+            if (link_get_name(message_receiver->link, &my_name) != 0)
             {
+                LogError("Failed getting link name");
                 result = __FAILURE__;
             }
             else
             {
                 if (strcmp(link_name, my_name) != 0)
                 {
+                    LogError("Link name does not match");
                     result = __FAILURE__;
                 }
                 else
                 {
-                    if (link_send_disposition(message_receiver_instance->link, message_number, delivery_state) != 0)
+                    if (link_send_disposition(message_receiver->link, message_number, delivery_state) != 0)
                     {
+                        LogError("Seding disposition failed");
                         result = __FAILURE__;
                     }
                     else
@@ -437,7 +516,13 @@
 
 void messagereceiver_set_trace(MESSAGE_RECEIVER_HANDLE message_receiver, bool trace_on)
 {
-    /* No tracing is yet implemented for message receiver */
-    (void)message_receiver;
-    (void)trace_on;
+    if (message_receiver == NULL)
+    {
+        LogError("NULL message_receiver");
+    }
+    else
+    {
+        /* No tracing is yet implemented for message receiver */
+        (void)trace_on;
+    }
 }