A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
23:1111ee8bcba4
Parent:
21:f9c433d8e6ca
Child:
24:2c59c2d43ebf
--- a/message_sender.c	Thu Apr 06 14:11:27 2017 -0700
+++ b/message_sender.c	Fri Apr 21 14:50:32 2017 -0700
@@ -91,7 +91,7 @@
     }
 }
 
-static void on_delivery_settled(void* context, delivery_number delivery_no, AMQP_VALUE delivery_state)
+static void on_delivery_settled(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state)
 {
     MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
     MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
@@ -99,26 +99,39 @@
 
     if (message_with_callback->on_message_send_complete != NULL)
     {
-        AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(delivery_state);
-        if ((descriptor == NULL) && (delivery_state != NULL))
-        {
-            LogError("Error getting descriptor for delivery state");
-        }
-        else
+        switch (reason)
         {
-            MESSAGE_SEND_RESULT message_send_result;
-
-            if ((delivery_state == NULL) ||
-                (is_accepted_type_by_descriptor(descriptor)))
+        case LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED:
+            if (delivery_state == NULL)
             {
-                message_send_result = MESSAGE_SEND_OK;
+                LogError("delivery state not provided");
             }
             else
             {
-                message_send_result = MESSAGE_SEND_ERROR;
+                AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(delivery_state);
+
+                if (descriptor == NULL)
+                {
+                    LogError("Error getting descriptor for delivery state");
+                }
+                else if (is_accepted_type_by_descriptor(descriptor))
+                {
+                    message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
+                }
+                else
+                {
+                    message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
+                }
             }
 
-            message_with_callback->on_message_send_complete(message_with_callback->context, message_send_result);
+            break;
+        case LINK_DELIVERY_SETTLE_REASON_SETTLED:
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
+            break;
+        case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED:
+        default:
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
+            break;
         }
     }
 
@@ -207,12 +220,16 @@
 
         // application properties
         message_get_application_properties(message, &application_properties);
-        application_properties_value = amqpvalue_create_application_properties(application_properties);
         if (application_properties != NULL)
         {
+            application_properties_value = amqpvalue_create_application_properties(application_properties);
             amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
             total_encoded_size += encoded_size;
         }
+        else
+        {
+            application_properties_value = NULL;
+        }
 
         result = SEND_ONE_MESSAGE_OK;
 
@@ -298,7 +315,7 @@
         {
             void* data_bytes = malloc(total_encoded_size);
             PAYLOAD payload;
-            payload.bytes = data_bytes;
+            payload.bytes = (const unsigned char*)data_bytes;
             payload.length = 0;
             result = SEND_ONE_MESSAGE_OK;
 
@@ -427,10 +444,22 @@
             }
         }
 
-        amqpvalue_destroy(application_properties);
-        amqpvalue_destroy(application_properties_value);
-        amqpvalue_destroy(properties_amqp_value);
-        properties_destroy(properties);
+        if (application_properties != NULL)
+        {
+            amqpvalue_destroy(application_properties);
+        }
+        if (application_properties_value != NULL)
+        {
+            amqpvalue_destroy(application_properties_value);
+        }
+        if (properties_amqp_value != NULL)
+        {
+            amqpvalue_destroy(properties_amqp_value);
+        }
+        if (properties != NULL)
+        {
+            properties_destroy(properties);
+        }
     }
 
     return result;
@@ -453,10 +482,10 @@
                 void* context = message_sender_instance->messages[i]->context;
                 remove_pending_message_by_index(message_sender_instance, i);
 
-				if (on_message_send_complete != NULL)
-				{
-					on_message_send_complete(context, MESSAGE_SEND_ERROR);
-				}
+                if (on_message_send_complete != NULL)
+                {
+                    on_message_send_complete(context, MESSAGE_SEND_ERROR);
+                }
 
                 i = message_sender_instance->message_count;
                 break;
@@ -529,8 +558,8 @@
             (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
         {
             /* User initiated transition, we should be good */
+            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
             indicate_all_messages_as_error(message_sender_instance);
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
         }
         else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
         {
@@ -541,8 +570,8 @@
     case LINK_STATE_ERROR:
         if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
         {
+            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
             indicate_all_messages_as_error(message_sender_instance);
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
         }
         break;
     }
@@ -556,7 +585,7 @@
 
 MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context)
 {
-    MESSAGE_SENDER_INSTANCE* result = malloc(sizeof(MESSAGE_SENDER_INSTANCE));
+    MESSAGE_SENDER_INSTANCE* result = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE));
     if (result != NULL)
     {
         result->messages = NULL;
@@ -722,7 +751,7 @@
                             {
                             default:
                             case SEND_ONE_MESSAGE_ERROR:
-							
+                            
                                 remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
                                 result = __FAILURE__;
                                 break;