A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
34:6be9c2058664
Parent:
33:08b53020ff0d
Child:
35:d0bed2404ee9
--- a/message_sender.c	Mon Sep 25 13:38:40 2017 -0700
+++ b/message_sender.c	Sat Oct 21 20:12:19 2017 +0000
@@ -7,8 +7,10 @@
 #include "azure_c_shared_utility/optimize_size.h"
 #include "azure_c_shared_utility/gballoc.h"
 #include "azure_c_shared_utility/xlogging.h"
+#include "azure_c_shared_utility/tickcounter.h"
 #include "azure_uamqp_c/message_sender.h"
 #include "azure_uamqp_c/amqpvalue_to_string.h"
+#include "azure_uamqp_c/async_operation.h"
 
 typedef enum MESSAGE_SEND_STATE_TAG
 {
@@ -30,62 +32,66 @@
     void* context;
     MESSAGE_SENDER_HANDLE message_sender;
     MESSAGE_SEND_STATE message_send_state;
+    tickcounter_ms_t timeout;
 } MESSAGE_WITH_CALLBACK;
 
+DEFINE_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK);
+
 typedef struct MESSAGE_SENDER_INSTANCE_TAG
 {
     LINK_HANDLE link;
     size_t message_count;
-    MESSAGE_WITH_CALLBACK** messages;
+    ASYNC_OPERATION_HANDLE* messages;
     MESSAGE_SENDER_STATE message_sender_state;
     ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
     void* on_message_sender_state_changed_context;
     unsigned int is_trace_on : 1;
 } MESSAGE_SENDER_INSTANCE;
 
-static void remove_pending_message_by_index(MESSAGE_SENDER_INSTANCE* message_sender_instance, size_t index)
+static void remove_pending_message_by_index(MESSAGE_SENDER_HANDLE message_sender, size_t index)
 {
-    MESSAGE_WITH_CALLBACK** new_messages;
+    ASYNC_OPERATION_HANDLE* new_messages;
+    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[index]);
 
-    if (message_sender_instance->messages[index]->message != NULL)
+    if (message_with_callback->message != NULL)
     {
-        message_destroy(message_sender_instance->messages[index]->message);
-        message_sender_instance->messages[index]->message = NULL;
+        message_destroy(message_with_callback->message);
+        message_with_callback->message = NULL;
     }
 
-    free(message_sender_instance->messages[index]);
+    async_operation_destroy(message_sender->messages[index]);
 
-    if (message_sender_instance->message_count - index > 1)
+    if (message_sender->message_count - index > 1)
     {
-        (void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1));
+        (void)memmove(&message_sender->messages[index], &message_sender->messages[index + 1], sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count - index - 1));
     }
 
-    message_sender_instance->message_count--;
+    message_sender->message_count--;
 
-    if (message_sender_instance->message_count > 0)
+    if (message_sender->message_count > 0)
     {
-        new_messages = (MESSAGE_WITH_CALLBACK**)realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count));
+        new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count));
         if (new_messages != NULL)
         {
-            message_sender_instance->messages = new_messages;
+            message_sender->messages = new_messages;
         }
     }
     else
     {
-        free(message_sender_instance->messages);
-        message_sender_instance->messages = NULL;
+        free(message_sender->messages);
+        message_sender->messages = NULL;
     }
 }
 
-static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback)
+static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send)
 {
     size_t i;
 
-    for (i = 0; i < message_sender_instance->message_count; i++)
+    for (i = 0; i < message_sender->message_count; i++)
     {
-        if (message_sender_instance->messages[i] == message_with_callback)
+        if (message_sender->messages[i] == pending_send)
         {
-            remove_pending_message_by_index(message_sender_instance, i);
+            remove_pending_message_by_index(message_sender, i);
             break;
         }
     }
@@ -93,8 +99,9 @@
 
 static void on_delivery_settled(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state)
 {
-    MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
-    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
+    ASYNC_OPERATION_HANDLE pending_send = (ASYNC_OPERATION_HANDLE)context;
+    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
+    MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
     (void)delivery_no;
 
     if (message_with_callback->on_message_send_complete != NULL)
@@ -128,6 +135,9 @@
         case LINK_DELIVERY_SETTLE_REASON_SETTLED:
             message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
             break;
+        case LINK_DELIVERY_SETTLE_REASON_TIMEOUT:
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_TIMEOUT);
+            break;
         case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED:
         default:
             message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
@@ -135,7 +145,7 @@
         }
     }
 
-    remove_pending_message(message_sender_instance, message_with_callback);
+    remove_pending_message(message_sender, pending_send);
 }
 
 static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
@@ -146,18 +156,18 @@
     return 0;
 }
 
-static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender_instance, const char* name, AMQP_VALUE value)
+static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender, const char* name, AMQP_VALUE value)
 {
 #ifdef NO_LOGGING
-    UNUSED(message_sender_instance);
+    UNUSED(message_sender);
     UNUSED(name);
     UNUSED(value);
 #else
-    if (xlogging_get_log_function() != NULL && message_sender_instance->is_trace_on == 1)
+    if (xlogging_get_log_function() != NULL && message_sender->is_trace_on == 1)
     {
         char* value_as_string = NULL;
-        LOG(AZ_LOG_TRACE, 0, "%s", name);
-        LOG(AZ_LOG_TRACE, 0, "%s", (value_as_string = amqpvalue_to_string(value)));
+        LOG(AZ_LOG_TRACE, 0, "%s", P_OR_NULL(name));
+        LOG(AZ_LOG_TRACE, 0, "%s", P_OR_NULL((value_as_string = amqpvalue_to_string(value))));
         if (value_as_string != NULL)
         {
             free(value_as_string);
@@ -166,7 +176,7 @@
 #endif
 }
 
-static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback, MESSAGE_HANDLE message)
+static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send, MESSAGE_HANDLE message)
 {
     SEND_ONE_MESSAGE_RESULT result;
 
@@ -193,51 +203,112 @@
         AMQP_VALUE body_amqp_value = NULL;
         size_t body_data_count = 0;
         AMQP_VALUE msg_annotations = NULL;
+        bool is_error = false;
 
-        message_get_header(message, &header);
-        header_amqp_value = amqpvalue_create_header(header);
-        if (header != NULL)
+        // message header
+        if ((message_get_header(message, &header) == 0) &&
+            (header != NULL))
         {
-            amqpvalue_get_encoded_size(header_amqp_value, &encoded_size);
-            total_encoded_size += encoded_size;
+            header_amqp_value = amqpvalue_create_header(header);
+            if (header_amqp_value == NULL)
+            {
+                LogError("Cannot create header AMQP value");
+                is_error = true;
+            }
+            else
+            {
+                if (amqpvalue_get_encoded_size(header_amqp_value, &encoded_size) != 0)
+                {
+                    LogError("Cannot obtain header encoded size");
+                    is_error = true;
+                }
+                else
+                {
+                    total_encoded_size += encoded_size;
+                }
+            }
+
         }
 
         // message annotations
-        if ((message_get_message_annotations(message, &msg_annotations) == 0) &&
+        if ((!is_error) &&
+            (message_get_message_annotations(message, &msg_annotations) == 0) &&
             (msg_annotations != NULL))
         {
-            amqpvalue_get_encoded_size(msg_annotations, &encoded_size);
-            total_encoded_size += encoded_size;
+            if (amqpvalue_get_encoded_size(msg_annotations, &encoded_size) != 0)
+            {
+                LogError("Cannot obtain message annotations encoded size");
+                is_error = true;
+            }
+            else
+            {
+                total_encoded_size += encoded_size;
+            }
         }
 
         // properties
-        message_get_properties(message, &properties);
-        properties_amqp_value = amqpvalue_create_properties(properties);
-        if (properties != NULL)
+        if ((!is_error) &&
+            (message_get_properties(message, &properties) == 0) &&
+            (properties != NULL))
         {
-            amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size);
-            total_encoded_size += encoded_size;
+            properties_amqp_value = amqpvalue_create_properties(properties);
+            if (properties_amqp_value == NULL)
+            {
+                LogError("Cannot create message properties AMQP value");
+                is_error = true;
+            }
+            else
+            {
+                if (amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size) != 0)
+                {
+                    LogError("Cannot obtain message properties encoded size");
+                    is_error = true;
+                }
+                else
+                {
+                    total_encoded_size += encoded_size;
+                }
+            }
         }
 
         // application properties
-        message_get_application_properties(message, &application_properties);
-        if (application_properties != NULL)
+        if ((!is_error) &&
+            (message_get_application_properties(message, &application_properties) == 0) &&
+            (application_properties != NULL))
         {
             application_properties_value = amqpvalue_create_application_properties(application_properties);
-            amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
-            total_encoded_size += encoded_size;
+            if (application_properties_value == NULL)
+            {
+                LogError("Cannot create application properties AMQP value");
+                is_error = true;
+            }
+            else
+            {
+                if (amqpvalue_get_encoded_size(application_properties_value, &encoded_size) != 0)
+                {
+                    LogError("Cannot obtain application properties encoded size");
+                    is_error = true;
+                }
+                else
+                {
+                    total_encoded_size += encoded_size;
+                }
+            }
+        }
+
+        if (is_error)
+        {
+            result = SEND_ONE_MESSAGE_ERROR;
         }
         else
         {
-            application_properties_value = NULL;
-        }
-
-        result = SEND_ONE_MESSAGE_OK;
+            result = SEND_ONE_MESSAGE_OK;
 
-        // body - amqp data
-        switch (message_body_type)
-        {
+            // body - amqp data
+            switch (message_body_type)
+            {
             default:
+                LogError("Unknown body type");
                 result = SEND_ONE_MESSAGE_ERROR;
                 break;
 
@@ -246,19 +317,28 @@
                 AMQP_VALUE message_body_amqp_value;
                 if (message_get_body_amqp_value_in_place(message, &message_body_amqp_value) != 0)
                 {
+                    LogError("Cannot obtain AMQP value from body");
                     result = SEND_ONE_MESSAGE_ERROR;
                 }
                 else
                 {
                     body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
-                    if ((body_amqp_value == NULL) ||
-                        (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0))
+                    if (body_amqp_value == NULL)
                     {
+                        LogError("Cannot create body AMQP value");
                         result = SEND_ONE_MESSAGE_ERROR;
                     }
                     else
                     {
-                        total_encoded_size += encoded_size;
+                        if (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0)
+                        {
+                            LogError("Cannot get body AMQP value encoded size");
+                            result = SEND_ONE_MESSAGE_ERROR;
+                        }
+                        else
+                        {
+                            total_encoded_size += encoded_size;
+                        }
                     }
                 }
 
@@ -272,6 +352,7 @@
 
                 if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
                 {
+                    LogError("Cannot get body AMQP data count");
                     result = SEND_ONE_MESSAGE_ERROR;
                 }
                 else
@@ -280,6 +361,7 @@
                     {
                         if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
                         {
+                            LogError("Cannot get body AMQP data %u", (unsigned int)i);
                             result = SEND_ONE_MESSAGE_ERROR;
                         }
                         else
@@ -291,12 +373,14 @@
                             body_amqp_data = amqpvalue_create_data(binary_value);
                             if (body_amqp_data == NULL)
                             {
+                                LogError("Cannot create body AMQP data");
                                 result = SEND_ONE_MESSAGE_ERROR;
                             }
                             else
                             {
                                 if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
                                 {
+                                    LogError("Cannot get body AMQP data encoded size");
                                     result = SEND_ONE_MESSAGE_ERROR;
                                 }
                                 else
@@ -311,139 +395,151 @@
                 }
                 break;
             }
-        }
-
-        if (result == 0)
-        {
-            void* data_bytes = malloc(total_encoded_size);
-            PAYLOAD payload;
-            payload.bytes = (const unsigned char*)data_bytes;
-            payload.length = 0;
-            result = SEND_ONE_MESSAGE_OK;
-
-            if (header != NULL)
-            {
-                if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
-                {
-                    result = SEND_ONE_MESSAGE_ERROR;
-                }
-
-                log_message_chunk(message_sender_instance, "Header:", header_amqp_value);
-            }
-
-            if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
-            {
-                if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
-                {
-                    result = SEND_ONE_MESSAGE_ERROR;
-                }
-
-                log_message_chunk(message_sender_instance, "Message Annotations:", msg_annotations);
             }
 
-            if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
+            if (result == 0)
             {
-                if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
+                void* data_bytes = malloc(total_encoded_size);
+                PAYLOAD payload;
+                payload.bytes = (const unsigned char*)data_bytes;
+                payload.length = 0;
+                result = SEND_ONE_MESSAGE_OK;
+
+                if (header != NULL)
                 {
-                    result = SEND_ONE_MESSAGE_ERROR;
+                    if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
+                    {
+                        LogError("Cannot encode header value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+
+                    log_message_chunk(message_sender, "Header:", header_amqp_value);
                 }
 
-                log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value);
-            }
+                if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
+                {
+                    if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
+                    {
+                        LogError("Cannot encode message annotations value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
 
-            if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
-            {
-                if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
-                {
-                    result = SEND_ONE_MESSAGE_ERROR;
+                    log_message_chunk(message_sender, "Message Annotations:", msg_annotations);
                 }
 
-                log_message_chunk(message_sender_instance, "Application properties:", application_properties_value);
-            }
-
-            if (result == SEND_ONE_MESSAGE_OK)
-            {
-                switch (message_body_type)
+                if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
                 {
-                default:
-                    result = SEND_ONE_MESSAGE_ERROR;
-                    break;
-
-                case MESSAGE_BODY_TYPE_VALUE:
-                {
-                    if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
+                    if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
                     {
+                        LogError("Cannot encode message properties value");
                         result = SEND_ONE_MESSAGE_ERROR;
                     }
 
-                    log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value);
-                    break;
+                    log_message_chunk(message_sender, "Properties:", properties_amqp_value);
                 }
-                case MESSAGE_BODY_TYPE_DATA:
+
+                if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
                 {
-                    BINARY_DATA binary_data;
-                    size_t i;
+                    if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
+                    {
+                        LogError("Cannot encode application properties value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+
+                    log_message_chunk(message_sender, "Application properties:", application_properties_value);
+                }
 
-                    for (i = 0; i < body_data_count; i++)
+                if (result == SEND_ONE_MESSAGE_OK)
+                {
+                    switch (message_body_type)
                     {
-                        if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
+                    default:
+                        LogError("Unknown message type");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                        break;
+
+                    case MESSAGE_BODY_TYPE_VALUE:
+                    {
+                        if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
                         {
+                            LogError("Cannot encode body AMQP value");
                             result = SEND_ONE_MESSAGE_ERROR;
                         }
-                        else
+
+                        log_message_chunk(message_sender, "Body - amqp value:", body_amqp_value);
+                        break;
+                    }
+                    case MESSAGE_BODY_TYPE_DATA:
+                    {
+                        BINARY_DATA binary_data;
+                        size_t i;
+
+                        for (i = 0; i < body_data_count; i++)
                         {
-                            AMQP_VALUE body_amqp_data;
-                            amqp_binary binary_value;
-                            binary_value.bytes = binary_data.bytes;
-                            binary_value.length = (uint32_t)binary_data.length;
-                            body_amqp_data = amqpvalue_create_data(binary_value);
-                            if (body_amqp_data == NULL)
+                            if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
                             {
+                                LogError("Cannot get AMQP data %u", (unsigned int)i);
                                 result = SEND_ONE_MESSAGE_ERROR;
                             }
                             else
                             {
-                                if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
+                                AMQP_VALUE body_amqp_data;
+                                amqp_binary binary_value;
+                                binary_value.bytes = binary_data.bytes;
+                                binary_value.length = (uint32_t)binary_data.length;
+                                body_amqp_data = amqpvalue_create_data(binary_value);
+                                if (body_amqp_data == NULL)
                                 {
+                                    LogError("Cannot create body AMQP data %u", (unsigned int)i);
                                     result = SEND_ONE_MESSAGE_ERROR;
-                                    break;
                                 }
+                                else
+                                {
+                                    if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
+                                    {
+                                        LogError("Cannot encode body AMQP data %u", (unsigned int)i);
+                                        result = SEND_ONE_MESSAGE_ERROR;
+                                        break;
+                                    }
 
-                                amqpvalue_destroy(body_amqp_data);
+                                    amqpvalue_destroy(body_amqp_data);
+                                }
                             }
                         }
+                        break;
                     }
-                    break;
+                    }
                 }
-                }
-            }
 
-            if (result == SEND_ONE_MESSAGE_OK)
-            {
-                message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
-                switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback))
+                if (result == SEND_ONE_MESSAGE_OK)
                 {
-                default:
-                case LINK_TRANSFER_ERROR:
-                    result = SEND_ONE_MESSAGE_ERROR;
-                    break;
+                    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
+                    message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
+                    switch (link_transfer_async(message_sender->link, message_format, &payload, 1, on_delivery_settled, pending_send, message_with_callback->timeout))
+                    {
+                    default:
+                    case LINK_TRANSFER_ERROR:
+                        LogError("Error in link transfer");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                        break;
 
-                case LINK_TRANSFER_BUSY:
-                    message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
-                    result = SEND_ONE_MESSAGE_BUSY;
-                    break;
+                    case LINK_TRANSFER_BUSY:
+                        message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+                        result = SEND_ONE_MESSAGE_BUSY;
+                        break;
 
-                case LINK_TRANSFER_OK:
-                    result = SEND_ONE_MESSAGE_OK;
-                    break;
+                    case LINK_TRANSFER_OK:
+                        result = SEND_ONE_MESSAGE_OK;
+                        break;
+                    }
                 }
-            }
 
-            free(data_bytes);
+                free(data_bytes);
 
-            if (body_amqp_value != NULL)
-            {
-                amqpvalue_destroy(body_amqp_value);
+                if (body_amqp_value != NULL)
+                {
+                    amqpvalue_destroy(body_amqp_value);
+                }
             }
         }
 
@@ -466,14 +562,17 @@
         {
             amqpvalue_destroy(application_properties);
         }
+
         if (application_properties_value != NULL)
         {
             amqpvalue_destroy(application_properties_value);
         }
+
         if (properties_amqp_value != NULL)
         {
             amqpvalue_destroy(properties_amqp_value);
         }
+
         if (properties != NULL)
         {
             properties_destroy(properties);
@@ -483,33 +582,37 @@
     return result;
 }
 
-static void send_all_pending_messages(MESSAGE_SENDER_INSTANCE* message_sender_instance)
+static void send_all_pending_messages(MESSAGE_SENDER_HANDLE message_sender)
 {
     size_t i;
 
-    for (i = 0; i < message_sender_instance->message_count; i++)
+    for (i = 0; i < message_sender->message_count; i++)
     {
-        if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
+        MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
+        if (message_with_callback->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
         {
-            switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message))
+            switch (send_one_message(message_sender, message_sender->messages[i], message_with_callback->message))
             {
             default:
+                LogError("Invalid send one message result");
+                break;
+
             case SEND_ONE_MESSAGE_ERROR:
             {
-                ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete;
-                void* context = message_sender_instance->messages[i]->context;
-                remove_pending_message_by_index(message_sender_instance, i);
+                ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_with_callback->on_message_send_complete;
+                void* context = message_with_callback->context;
+                remove_pending_message_by_index(message_sender, i);
 
                 if (on_message_send_complete != NULL)
                 {
                     on_message_send_complete(context, MESSAGE_SEND_ERROR);
                 }
 
-                i = message_sender_instance->message_count;
+                i = message_sender->message_count;
                 break;
             }
             case SEND_ONE_MESSAGE_BUSY:
-                i = message_sender_instance->message_count + 1;
+                i = message_sender->message_count + 1;
                 break;
 
             case SEND_ONE_MESSAGE_OK:
@@ -521,46 +624,47 @@
     }
 }
 
-static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_SENDER_STATE new_state)
+static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender, MESSAGE_SENDER_STATE new_state)
 {
-    MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state;
-    message_sender_instance->message_sender_state = new_state;
-    if (message_sender_instance->on_message_sender_state_changed != NULL)
+    MESSAGE_SENDER_STATE previous_state = message_sender->message_sender_state;
+    message_sender->message_sender_state = new_state;
+    if (message_sender->on_message_sender_state_changed != NULL)
     {
-        message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state);
+        message_sender->on_message_sender_state_changed(message_sender->on_message_sender_state_changed_context, new_state, previous_state);
     }
 }
 
-static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender_instance)
+static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender)
 {
     size_t i;
 
-    for (i = 0; i < message_sender_instance->message_count; i++)
+    for (i = 0; i < message_sender->message_count; i++)
     {
-        if (message_sender_instance->messages[i]->on_message_send_complete != NULL)
+        MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
+        if (message_with_callback->on_message_send_complete != NULL)
         {
-            message_sender_instance->messages[i]->on_message_send_complete(message_sender_instance->messages[i]->context, MESSAGE_SEND_ERROR);
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
         }
 
-        if (message_sender_instance->messages[i]->message != NULL)
+        if (message_with_callback->message != NULL)
         {
-            message_destroy(message_sender_instance->messages[i]->message);
+            message_destroy(message_with_callback->message);
         }
-        free(message_sender_instance->messages[i]);
+        async_operation_destroy(message_sender->messages[i]);
     }
 
-    if (message_sender_instance->messages != NULL)
+    if (message_sender->messages != NULL)
     {
-        message_sender_instance->message_count = 0;
+        message_sender->message_count = 0;
 
-        free(message_sender_instance->messages);
-        message_sender_instance->messages = NULL;
+        free(message_sender->messages);
+        message_sender->messages = NULL;
     }
 }
 
 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
 {
-    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
+    MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)context;
     (void)previous_link_state;
 
     switch (new_link_state)
@@ -569,30 +673,30 @@
         break;
 
     case LINK_STATE_ATTACHED:
-        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
+        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
         {
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN);
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPEN);
         }
         break;
     case LINK_STATE_DETACHED:
-        if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
-            (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
+        if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
+            (message_sender->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
         {
             /* User initiated transition, we should be good */
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
-            indicate_all_messages_as_error(message_sender_instance);
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_IDLE);
+            indicate_all_messages_as_error(message_sender);
         }
-        else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
+        else if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
         {
             /* Any other transition must be an error */
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
         }
         break;
     case LINK_STATE_ERROR:
-        if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
+        if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
         {
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
-            indicate_all_messages_as_error(message_sender_instance);
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+            indicate_all_messages_as_error(message_sender);
         }
         break;
     }
@@ -600,36 +704,41 @@
 
 static void on_link_flow_on(void* context)
 {
-    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
-    send_all_pending_messages(message_sender_instance);
+    MESSAGE_SENDER_HANDLE message_sender = (MESSAGE_SENDER_INSTANCE*)context;
+    send_all_pending_messages(message_sender);
 }
 
 MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context)
 {
-    MESSAGE_SENDER_INSTANCE* result = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE));
-    if (result != NULL)
+    MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE));
+    if (message_sender == NULL)
+    {
+        LogError("Failed allocating message sender");
+    }
+    else
     {
-        result->messages = NULL;
-        result->message_count = 0;
-        result->link = link;
-        result->on_message_sender_state_changed = on_message_sender_state_changed;
-        result->on_message_sender_state_changed_context = context;
-        result->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
-        result->is_trace_on = 0;
+        message_sender->messages = NULL;
+        message_sender->message_count = 0;
+        message_sender->link = link;
+        message_sender->on_message_sender_state_changed = on_message_sender_state_changed;
+        message_sender->on_message_sender_state_changed_context = context;
+        message_sender->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
+        message_sender->is_trace_on = 0;
     }
 
-    return result;
+    return message_sender;
 }
 
 void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
 {
-    if (message_sender != NULL)
+    if (message_sender == NULL)
     {
-        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-
-        messagesender_close(message_sender_instance);
-
-        indicate_all_messages_as_error(message_sender_instance);
+        LogError("NULL message_sender");
+    }
+    else
+    {
+        (void)messagesender_close(message_sender);
+        indicate_all_messages_as_error(message_sender);
 
         free(message_sender);
     }
@@ -641,19 +750,19 @@
 
     if (message_sender == NULL)
     {
+        LogError("NULL message_sender");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-
-        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
+        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
         {
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING);
-            if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0)
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPENING);
+            if (link_attach(message_sender->link, NULL, on_link_state_changed, on_link_flow_on, message_sender) != 0)
             {
+                LogError("attach link failed");
                 result = __FAILURE__;
-                set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+                set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
             }
             else
             {
@@ -675,20 +784,20 @@
 
     if (message_sender == NULL)
     {
+        LogError("NULL message_sender");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-
-        if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
-            (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
+        if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
+            (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
         {
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING);
-            if (link_detach(message_sender_instance->link, true) != 0)
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_CLOSING);
+            if (link_detach(message_sender->link, true) != 0)
             {
+                LogError("Detaching link failed");
                 result = __FAILURE__;
-                set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+                set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
             }
             else
             {
@@ -704,49 +813,63 @@
     return result;
 }
 
-int messagesender_send(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context)
+static void messagesender_send_cancel_handler(ASYNC_OPERATION_HANDLE send_operation)
 {
-    int result;
+    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, send_operation);
+    if (message_with_callback->on_message_send_complete != NULL)
+    {
+        message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_CANCELLED);
+    }
+
+    remove_pending_message(message_with_callback->message_sender, send_operation);
+}
+
+ASYNC_OPERATION_HANDLE messagesender_send_async(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context, tickcounter_ms_t timeout)
+{
+    ASYNC_OPERATION_HANDLE result;
 
     if ((message_sender == NULL) ||
         (message == NULL))
     {
-        result = __FAILURE__;
+        LogError("Bad parameters: message_sender = %p, message = %p");
+        result = NULL;
     }
     else
     {
-        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
+        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
         {
-            result = __FAILURE__;
+            LogError("Message sender in ERROR state");
+            result = NULL;
         }
         else
         {
-            MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)malloc(sizeof(MESSAGE_WITH_CALLBACK));
-            if (message_with_callback == NULL)
+            result = CREATE_ASYNC_OPERATION(MESSAGE_WITH_CALLBACK, messagesender_send_cancel_handler);
+            if (result == NULL)
             {
-                result = __FAILURE__;
+                LogError("Failed allocating context for send");
             }
             else
             {
-                MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1));
+                MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, result);
+                ASYNC_OPERATION_HANDLE* new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count + 1));
                 if (new_messages == NULL)
                 {
-                    free(message_with_callback);
-                    result = __FAILURE__;
+                    LogError("Failed allocating memory for pending sends");
+                    async_operation_destroy(result);
+                    result = NULL;
                 }
                 else
                 {
-                    result = 0;
-
-                    message_sender_instance->messages = new_messages;
-                    if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
+                    message_with_callback->timeout = timeout;
+                    message_sender->messages = new_messages;
+                    if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
                     {
                         message_with_callback->message = message_clone(message);
                         if (message_with_callback->message == NULL)
                         {
-                            free(message_with_callback);
-                            result = __FAILURE__;
+                            LogError("Cannot clone message for placing it in the pending sends list");
+                            async_operation_destroy(result);
+                            result = NULL;
                         }
 
                         message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
@@ -757,42 +880,42 @@
                         message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
                     }
 
-                    if (result == 0)
+                    if (result != NULL)
                     {
                         message_with_callback->on_message_send_complete = on_message_send_complete;
                         message_with_callback->context = callback_context;
-                        message_with_callback->message_sender = message_sender_instance;
+                        message_with_callback->message_sender = message_sender;
 
-                        message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback;
-                        message_sender_instance->message_count++;
+                        message_sender->messages[message_sender->message_count] = result;
+                        message_sender->message_count++;
 
-                        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
+                        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
                         {
-                            switch (send_one_message(message_sender_instance, message_with_callback, message))
+                            switch (send_one_message(message_sender, result, message))
                             {
                             default:
                             case SEND_ONE_MESSAGE_ERROR:
-                            
-                                remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
-                                result = __FAILURE__;
+                                LogError("Error sending message");
+                                remove_pending_message_by_index(message_sender, message_sender->message_count - 1);
+                                async_operation_destroy(result);
+                                result = NULL;
                                 break;
 
                             case SEND_ONE_MESSAGE_BUSY:
                                 message_with_callback->message = message_clone(message);
                                 if (message_with_callback->message == NULL)
                                 {
-                                    free(message_with_callback);
-                                    result = __FAILURE__;
+                                    LogError("Error cloning message for placing it in the pending sends list");
+                                    async_operation_destroy(result);
+                                    result = NULL;
                                 }
                                 else
                                 {
                                     message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
-                                    result = 0;
                                 }
                                 break;
 
                             case SEND_ONE_MESSAGE_OK:
-                                result = 0;
                                 break;
                             }
                         }
@@ -801,14 +924,18 @@
             }
         }
     }
+
     return result;
 }
 
 void messagesender_set_trace(MESSAGE_SENDER_HANDLE message_sender, bool traceOn)
 {
-    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-    if (message_sender_instance != NULL)
+    if (message_sender == NULL)
     {
-        message_sender_instance->is_trace_on = traceOn ? 1 : 0;
+        LogError("NULL message_sender");
+    }
+    else
+    {
+        message_sender->is_trace_on = traceOn ? 1 : 0;
     }
 }