A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
46:01f7ca900e07
Parent:
45:83b4eda4891c
Child:
47:365a93fdb5bb
--- a/message_sender.c	Thu Jul 12 18:09:41 2018 -0700
+++ b/message_sender.c	Tue Sep 11 11:13:43 2018 -0700
@@ -119,6 +119,7 @@
             else
             {
                 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(delivery_state);
+                AMQP_VALUE described = amqpvalue_get_inplace_described_value(delivery_state);
 
                 if (descriptor == NULL)
                 {
@@ -126,24 +127,24 @@
                 }
                 else if (is_accepted_type_by_descriptor(descriptor))
                 {
-                    message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
+                    message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK, described);
                 }
                 else
                 {
-                    message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
+                    message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, described);
                 }
             }
 
             break;
         case LINK_DELIVERY_SETTLE_REASON_SETTLED:
-            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK, NULL);
             break;
         case LINK_DELIVERY_SETTLE_REASON_TIMEOUT:
-            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_TIMEOUT);
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_TIMEOUT, NULL);
             break;
         case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED:
         default:
-            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, NULL);
             break;
         }
     }
@@ -360,38 +361,46 @@
                 }
                 else
                 {
-                    for (i = 0; i < body_data_count; i++)
+                    if (body_data_count == 0)
                     {
-                        if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
-                        {
-                            LogError("Cannot get body AMQP data %u", (unsigned int)i);
-                            result = SEND_ONE_MESSAGE_ERROR;
-                        }
-                        else
+                        LogError("Body data count is zero");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+                    else
+                    {
+                        for (i = 0; i < body_data_count; i++)
                         {
-                            AMQP_VALUE body_amqp_data;
-                            amqp_binary binary_value;
-                            binary_value.bytes = binary_data.bytes;
-                            binary_value.length = (uint32_t)binary_data.length;
-                            body_amqp_data = amqpvalue_create_data(binary_value);
-                            if (body_amqp_data == NULL)
+                            if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
                             {
-                                LogError("Cannot create body AMQP data");
+                                LogError("Cannot get body AMQP data %u", (unsigned int)i);
                                 result = SEND_ONE_MESSAGE_ERROR;
                             }
                             else
                             {
-                                if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
+                                AMQP_VALUE body_amqp_data;
+                                amqp_binary binary_value;
+                                binary_value.bytes = binary_data.bytes;
+                                binary_value.length = (uint32_t)binary_data.length;
+                                body_amqp_data = amqpvalue_create_data(binary_value);
+                                if (body_amqp_data == NULL)
                                 {
-                                    LogError("Cannot get body AMQP data encoded size");
+                                    LogError("Cannot create body AMQP data");
                                     result = SEND_ONE_MESSAGE_ERROR;
                                 }
                                 else
                                 {
-                                    total_encoded_size += encoded_size;
+                                    if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
+                                    {
+                                        LogError("Cannot get body AMQP data encoded size");
+                                        result = SEND_ONE_MESSAGE_ERROR;
+                                    }
+                                    else
+                                    {
+                                        total_encoded_size += encoded_size;
+                                    }
+
+                                    amqpvalue_destroy(body_amqp_data);
                                 }
-
-                                amqpvalue_destroy(body_amqp_data);
                             }
                         }
                     }
@@ -520,7 +529,7 @@
                     LINK_TRANSFER_RESULT link_transfer_error;
                     MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
                     message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
-                    
+
                     transfer_async_operation = link_transfer_async(message_sender->link, message_format, &payload, 1, on_delivery_settled, pending_send, &link_transfer_error, message_with_callback->timeout);
                     if (transfer_async_operation == NULL)
                     {
@@ -612,7 +621,7 @@
 
                 if (on_message_send_complete != NULL)
                 {
-                    on_message_send_complete(context, MESSAGE_SEND_ERROR);
+                    on_message_send_complete(context, MESSAGE_SEND_ERROR, NULL);
                 }
 
                 i = message_sender->message_count;
@@ -650,7 +659,7 @@
         MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
         if (message_with_callback->on_message_send_complete != NULL)
         {
-            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, NULL);
         }
 
         if (message_with_callback->message != NULL)
@@ -689,9 +698,9 @@
         if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
             (message_sender->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
         {
-            /* User initiated transition, we should be good */
+            /* switch to closing so that no more requests should be accepted */
+            indicate_all_messages_as_error(message_sender);
             set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_IDLE);
-            indicate_all_messages_as_error(message_sender);
         }
         else if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
         {
@@ -702,8 +711,8 @@
     case LINK_STATE_ERROR:
         if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
         {
+            indicate_all_messages_as_error(message_sender);
             set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
-            indicate_all_messages_as_error(message_sender);
         }
         break;
     }
@@ -744,8 +753,8 @@
     }
     else
     {
+        indicate_all_messages_as_error(message_sender);
         (void)messagesender_close(message_sender);
-        indicate_all_messages_as_error(message_sender);
 
         free(message_sender);
     }
@@ -825,7 +834,7 @@
     MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, send_operation);
     if (message_with_callback->on_message_send_complete != NULL)
     {
-        message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_CANCELLED);
+        message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_CANCELLED, NULL);
     }
 
     remove_pending_message(message_with_callback->message_sender, send_operation);