Xin Zhang / azure-iot-c-sdk-f767zi

Dependents:   samplemqtt

Revision:
0:f7f1f0d76dd6
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/uamqp/src/message_sender.c	Thu Aug 23 06:52:14 2018 +0000
@@ -0,0 +1,948 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#include <stdlib.h>
+#include <stdbool.h>
+#include <string.h>
+#include "azure_c_shared_utility/optimize_size.h"
+#include "azure_c_shared_utility/gballoc.h"
+#include "azure_c_shared_utility/xlogging.h"
+#include "azure_c_shared_utility/tickcounter.h"
+#include "azure_uamqp_c/link.h"
+#include "azure_uamqp_c/message.h"
+#include "azure_uamqp_c/message_sender.h"
+#include "azure_uamqp_c/amqpvalue_to_string.h"
+#include "azure_uamqp_c/async_operation.h"
+#include "azure_uamqp_c/amqp_definitions.h"
+
+typedef enum MESSAGE_SEND_STATE_TAG
+{
+    MESSAGE_SEND_STATE_NOT_SENT,
+    MESSAGE_SEND_STATE_PENDING
+} MESSAGE_SEND_STATE;
+
+typedef enum SEND_ONE_MESSAGE_RESULT_TAG
+{
+    SEND_ONE_MESSAGE_OK,
+    SEND_ONE_MESSAGE_ERROR,
+    SEND_ONE_MESSAGE_BUSY
+} SEND_ONE_MESSAGE_RESULT;
+
+typedef struct MESSAGE_WITH_CALLBACK_TAG
+{
+    MESSAGE_HANDLE message;
+    ON_MESSAGE_SEND_COMPLETE on_message_send_complete;
+    void* context;
+    MESSAGE_SENDER_HANDLE message_sender;
+    MESSAGE_SEND_STATE message_send_state;
+    tickcounter_ms_t timeout;
+} MESSAGE_WITH_CALLBACK;
+
+DEFINE_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK);
+
+typedef struct MESSAGE_SENDER_INSTANCE_TAG
+{
+    LINK_HANDLE link;
+    size_t message_count;
+    ASYNC_OPERATION_HANDLE* messages;
+    MESSAGE_SENDER_STATE message_sender_state;
+    ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
+    void* on_message_sender_state_changed_context;
+    unsigned int is_trace_on : 1;
+} MESSAGE_SENDER_INSTANCE;
+
+static void remove_pending_message_by_index(MESSAGE_SENDER_HANDLE message_sender, size_t index)
+{
+    ASYNC_OPERATION_HANDLE* new_messages;
+    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[index]);
+
+    if (message_with_callback->message != NULL)
+    {
+        message_destroy(message_with_callback->message);
+        message_with_callback->message = NULL;
+    }
+
+    async_operation_destroy(message_sender->messages[index]);
+
+    if (message_sender->message_count - index > 1)
+    {
+        (void)memmove(&message_sender->messages[index], &message_sender->messages[index + 1], sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count - index - 1));
+    }
+
+    message_sender->message_count--;
+
+    if (message_sender->message_count > 0)
+    {
+        new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count));
+        if (new_messages != NULL)
+        {
+            message_sender->messages = new_messages;
+        }
+    }
+    else
+    {
+        free(message_sender->messages);
+        message_sender->messages = NULL;
+    }
+}
+
+static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send)
+{
+    size_t i;
+
+    for (i = 0; i < message_sender->message_count; i++)
+    {
+        if (message_sender->messages[i] == pending_send)
+        {
+            remove_pending_message_by_index(message_sender, i);
+            break;
+        }
+    }
+}
+
+static void on_delivery_settled(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state)
+{
+    ASYNC_OPERATION_HANDLE pending_send = (ASYNC_OPERATION_HANDLE)context;
+    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
+    MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
+    (void)delivery_no;
+
+    if (message_with_callback->on_message_send_complete != NULL)
+    {
+        switch (reason)
+        {
+        case LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED:
+            if (delivery_state == NULL)
+            {
+                LogError("delivery state not provided");
+            }
+            else
+            {
+                AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(delivery_state);
+                AMQP_VALUE described = amqpvalue_get_inplace_described_value(delivery_state);
+
+                if (descriptor == NULL)
+                {
+                    LogError("Error getting descriptor for delivery state");
+                }
+                else if (is_accepted_type_by_descriptor(descriptor))
+                {
+                    message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK, described);
+                }
+                else
+                {
+                    message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, described);
+                }
+            }
+
+            break;
+        case LINK_DELIVERY_SETTLE_REASON_SETTLED:
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK, NULL);
+            break;
+        case LINK_DELIVERY_SETTLE_REASON_TIMEOUT:
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_TIMEOUT, NULL);
+            break;
+        case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED:
+        default:
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, NULL);
+            break;
+        }
+    }
+
+    remove_pending_message(message_sender, pending_send);
+}
+
+static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
+{
+    PAYLOAD* payload = (PAYLOAD*)context;
+    (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
+    payload->length += length;
+    return 0;
+}
+
+static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender, const char* name, AMQP_VALUE value)
+{
+#ifdef NO_LOGGING
+    (void)message_sender;
+    (void)name;
+    (void)value;
+#else
+    if (xlogging_get_log_function() != NULL && message_sender->is_trace_on == 1)
+    {
+        char* value_as_string = NULL;
+        LOG(AZ_LOG_TRACE, 0, "%s", P_OR_NULL(name));
+        LOG(AZ_LOG_TRACE, 0, "%s", ((value_as_string = amqpvalue_to_string(value)), P_OR_NULL(value_as_string)));
+        if (value_as_string != NULL)
+        {
+            free(value_as_string);
+        }
+    }
+#endif
+}
+
+static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send, MESSAGE_HANDLE message)
+{
+    SEND_ONE_MESSAGE_RESULT result;
+
+    size_t encoded_size;
+    size_t total_encoded_size = 0;
+    MESSAGE_BODY_TYPE message_body_type;
+    message_format message_format;
+
+    if ((message_get_body_type(message, &message_body_type) != 0) ||
+        (message_get_message_format(message, &message_format) != 0))
+    {
+        LogError("Failure getting message body type and/or message format");
+        result = SEND_ONE_MESSAGE_ERROR;
+    }
+    else
+    {
+        // header
+        HEADER_HANDLE header = NULL;
+        AMQP_VALUE header_amqp_value = NULL;
+        PROPERTIES_HANDLE properties = NULL;
+        AMQP_VALUE properties_amqp_value = NULL;
+        AMQP_VALUE application_properties = NULL;
+        AMQP_VALUE application_properties_value = NULL;
+        AMQP_VALUE body_amqp_value = NULL;
+        size_t body_data_count = 0;
+        AMQP_VALUE msg_annotations = NULL;
+        bool is_error = false;
+
+        // message header
+        if ((message_get_header(message, &header) == 0) &&
+            (header != NULL))
+        {
+            header_amqp_value = amqpvalue_create_header(header);
+            if (header_amqp_value == NULL)
+            {
+                LogError("Cannot create header AMQP value");
+                is_error = true;
+            }
+            else
+            {
+                if (amqpvalue_get_encoded_size(header_amqp_value, &encoded_size) != 0)
+                {
+                    LogError("Cannot obtain header encoded size");
+                    is_error = true;
+                }
+                else
+                {
+                    total_encoded_size += encoded_size;
+                }
+            }
+
+        }
+
+        // message annotations
+        if ((!is_error) &&
+            (message_get_message_annotations(message, &msg_annotations) == 0) &&
+            (msg_annotations != NULL))
+        {
+            if (amqpvalue_get_encoded_size(msg_annotations, &encoded_size) != 0)
+            {
+                LogError("Cannot obtain message annotations encoded size");
+                is_error = true;
+            }
+            else
+            {
+                total_encoded_size += encoded_size;
+            }
+        }
+
+        // properties
+        if ((!is_error) &&
+            (message_get_properties(message, &properties) == 0) &&
+            (properties != NULL))
+        {
+            properties_amqp_value = amqpvalue_create_properties(properties);
+            if (properties_amqp_value == NULL)
+            {
+                LogError("Cannot create message properties AMQP value");
+                is_error = true;
+            }
+            else
+            {
+                if (amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size) != 0)
+                {
+                    LogError("Cannot obtain message properties encoded size");
+                    is_error = true;
+                }
+                else
+                {
+                    total_encoded_size += encoded_size;
+                }
+            }
+        }
+
+        // application properties
+        if ((!is_error) &&
+            (message_get_application_properties(message, &application_properties) == 0) &&
+            (application_properties != NULL))
+        {
+            application_properties_value = amqpvalue_create_application_properties(application_properties);
+            if (application_properties_value == NULL)
+            {
+                LogError("Cannot create application properties AMQP value");
+                is_error = true;
+            }
+            else
+            {
+                if (amqpvalue_get_encoded_size(application_properties_value, &encoded_size) != 0)
+                {
+                    LogError("Cannot obtain application properties encoded size");
+                    is_error = true;
+                }
+                else
+                {
+                    total_encoded_size += encoded_size;
+                }
+            }
+        }
+
+        if (is_error)
+        {
+            result = SEND_ONE_MESSAGE_ERROR;
+        }
+        else
+        {
+            result = SEND_ONE_MESSAGE_OK;
+
+            // body - amqp data
+            switch (message_body_type)
+            {
+            default:
+                LogError("Unknown body type");
+                result = SEND_ONE_MESSAGE_ERROR;
+                break;
+
+            case MESSAGE_BODY_TYPE_VALUE:
+            {
+                AMQP_VALUE message_body_amqp_value;
+                if (message_get_body_amqp_value_in_place(message, &message_body_amqp_value) != 0)
+                {
+                    LogError("Cannot obtain AMQP value from body");
+                    result = SEND_ONE_MESSAGE_ERROR;
+                }
+                else
+                {
+                    body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
+                    if (body_amqp_value == NULL)
+                    {
+                        LogError("Cannot create body AMQP value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+                    else
+                    {
+                        if (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0)
+                        {
+                            LogError("Cannot get body AMQP value encoded size");
+                            result = SEND_ONE_MESSAGE_ERROR;
+                        }
+                        else
+                        {
+                            total_encoded_size += encoded_size;
+                        }
+                    }
+                }
+
+                break;
+            }
+
+            case MESSAGE_BODY_TYPE_DATA:
+            {
+                BINARY_DATA binary_data;
+                size_t i;
+
+                if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
+                {
+                    LogError("Cannot get body AMQP data count");
+                    result = SEND_ONE_MESSAGE_ERROR;
+                }
+                else
+                {
+                    for (i = 0; i < body_data_count; i++)
+                    {
+                        if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
+                        {
+                            LogError("Cannot get body AMQP data %u", (unsigned int)i);
+                            result = SEND_ONE_MESSAGE_ERROR;
+                        }
+                        else
+                        {
+                            AMQP_VALUE body_amqp_data;
+                            amqp_binary binary_value;
+                            binary_value.bytes = binary_data.bytes;
+                            binary_value.length = (uint32_t)binary_data.length;
+                            body_amqp_data = amqpvalue_create_data(binary_value);
+                            if (body_amqp_data == NULL)
+                            {
+                                LogError("Cannot create body AMQP data");
+                                result = SEND_ONE_MESSAGE_ERROR;
+                            }
+                            else
+                            {
+                                if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
+                                {
+                                    LogError("Cannot get body AMQP data encoded size");
+                                    result = SEND_ONE_MESSAGE_ERROR;
+                                }
+                                else
+                                {
+                                    total_encoded_size += encoded_size;
+                                }
+
+                                amqpvalue_destroy(body_amqp_data);
+                            }
+                        }
+                    }
+                }
+                break;
+            }
+            }
+
+            if (result == 0)
+            {
+                void* data_bytes = malloc(total_encoded_size);
+                PAYLOAD payload;
+                payload.bytes = (const unsigned char*)data_bytes;
+                payload.length = 0;
+                result = SEND_ONE_MESSAGE_OK;
+
+                if (header != NULL)
+                {
+                    if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
+                    {
+                        LogError("Cannot encode header value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+
+                    log_message_chunk(message_sender, "Header:", header_amqp_value);
+                }
+
+                if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
+                {
+                    if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
+                    {
+                        LogError("Cannot encode message annotations value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+
+                    log_message_chunk(message_sender, "Message Annotations:", msg_annotations);
+                }
+
+                if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
+                {
+                    if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
+                    {
+                        LogError("Cannot encode message properties value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+
+                    log_message_chunk(message_sender, "Properties:", properties_amqp_value);
+                }
+
+                if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
+                {
+                    if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
+                    {
+                        LogError("Cannot encode application properties value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+
+                    log_message_chunk(message_sender, "Application properties:", application_properties_value);
+                }
+
+                if (result == SEND_ONE_MESSAGE_OK)
+                {
+                    switch (message_body_type)
+                    {
+                    default:
+                        LogError("Unknown message type");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                        break;
+
+                    case MESSAGE_BODY_TYPE_VALUE:
+                    {
+                        if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
+                        {
+                            LogError("Cannot encode body AMQP value");
+                            result = SEND_ONE_MESSAGE_ERROR;
+                        }
+
+                        log_message_chunk(message_sender, "Body - amqp value:", body_amqp_value);
+                        break;
+                    }
+                    case MESSAGE_BODY_TYPE_DATA:
+                    {
+                        BINARY_DATA binary_data;
+                        size_t i;
+
+                        for (i = 0; i < body_data_count; i++)
+                        {
+                            if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
+                            {
+                                LogError("Cannot get AMQP data %u", (unsigned int)i);
+                                result = SEND_ONE_MESSAGE_ERROR;
+                            }
+                            else
+                            {
+                                AMQP_VALUE body_amqp_data;
+                                amqp_binary binary_value;
+                                binary_value.bytes = binary_data.bytes;
+                                binary_value.length = (uint32_t)binary_data.length;
+                                body_amqp_data = amqpvalue_create_data(binary_value);
+                                if (body_amqp_data == NULL)
+                                {
+                                    LogError("Cannot create body AMQP data %u", (unsigned int)i);
+                                    result = SEND_ONE_MESSAGE_ERROR;
+                                }
+                                else
+                                {
+                                    if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
+                                    {
+                                        LogError("Cannot encode body AMQP data %u", (unsigned int)i);
+                                        result = SEND_ONE_MESSAGE_ERROR;
+                                        break;
+                                    }
+
+                                    amqpvalue_destroy(body_amqp_data);
+                                }
+                            }
+                        }
+                        break;
+                    }
+                    }
+                }
+
+                if (result == SEND_ONE_MESSAGE_OK)
+                {
+                    ASYNC_OPERATION_HANDLE transfer_async_operation;
+                    LINK_TRANSFER_RESULT link_transfer_error;
+                    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
+                    message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
+                    
+                    transfer_async_operation = link_transfer_async(message_sender->link, message_format, &payload, 1, on_delivery_settled, pending_send, &link_transfer_error, message_with_callback->timeout);
+                    if (transfer_async_operation == NULL)
+                    {
+                        if (link_transfer_error == LINK_TRANSFER_BUSY)
+                        {
+                            message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+                            result = SEND_ONE_MESSAGE_BUSY;
+                        }
+                        else
+                        {
+                            LogError("Error in link transfer");
+                            result = SEND_ONE_MESSAGE_ERROR;
+                        }
+                    }
+                    else
+                    {
+                        result = SEND_ONE_MESSAGE_OK;
+                    }
+                }
+
+                free(data_bytes);
+
+                if (body_amqp_value != NULL)
+                {
+                    amqpvalue_destroy(body_amqp_value);
+                }
+            }
+        }
+
+        if (header != NULL)
+        {
+            header_destroy(header);
+        }
+
+        if (header_amqp_value != NULL)
+        {
+            amqpvalue_destroy(header_amqp_value);
+        }
+
+        if (msg_annotations != NULL)
+        {
+            annotations_destroy(msg_annotations);
+        }
+
+        if (application_properties != NULL)
+        {
+            amqpvalue_destroy(application_properties);
+        }
+
+        if (application_properties_value != NULL)
+        {
+            amqpvalue_destroy(application_properties_value);
+        }
+
+        if (properties_amqp_value != NULL)
+        {
+            amqpvalue_destroy(properties_amqp_value);
+        }
+
+        if (properties != NULL)
+        {
+            properties_destroy(properties);
+        }
+    }
+
+    return result;
+}
+
+static void send_all_pending_messages(MESSAGE_SENDER_HANDLE message_sender)
+{
+    size_t i;
+
+    for (i = 0; i < message_sender->message_count; i++)
+    {
+        MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
+        if (message_with_callback->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
+        {
+            switch (send_one_message(message_sender, message_sender->messages[i], message_with_callback->message))
+            {
+            default:
+                LogError("Invalid send one message result");
+                break;
+
+            case SEND_ONE_MESSAGE_ERROR:
+            {
+                ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_with_callback->on_message_send_complete;
+                void* context = message_with_callback->context;
+                remove_pending_message_by_index(message_sender, i);
+
+                if (on_message_send_complete != NULL)
+                {
+                    on_message_send_complete(context, MESSAGE_SEND_ERROR, NULL);
+                }
+
+                i = message_sender->message_count;
+                break;
+            }
+            case SEND_ONE_MESSAGE_BUSY:
+                i = message_sender->message_count + 1;
+                break;
+
+            case SEND_ONE_MESSAGE_OK:
+                break;
+            }
+
+            i--;
+        }
+    }
+}
+
+static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender, MESSAGE_SENDER_STATE new_state)
+{
+    MESSAGE_SENDER_STATE previous_state = message_sender->message_sender_state;
+    message_sender->message_sender_state = new_state;
+    if (message_sender->on_message_sender_state_changed != NULL)
+    {
+        message_sender->on_message_sender_state_changed(message_sender->on_message_sender_state_changed_context, new_state, previous_state);
+    }
+}
+
+static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender)
+{
+    size_t i;
+
+    for (i = 0; i < message_sender->message_count; i++)
+    {
+        MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
+        if (message_with_callback->on_message_send_complete != NULL)
+        {
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, NULL);
+        }
+
+        if (message_with_callback->message != NULL)
+        {
+            message_destroy(message_with_callback->message);
+        }
+        async_operation_destroy(message_sender->messages[i]);
+    }
+
+    if (message_sender->messages != NULL)
+    {
+        message_sender->message_count = 0;
+
+        free(message_sender->messages);
+        message_sender->messages = NULL;
+    }
+}
+
+static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
+{
+    MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)context;
+    (void)previous_link_state;
+
+    switch (new_link_state)
+    {
+    default:
+        break;
+
+    case LINK_STATE_ATTACHED:
+        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
+        {
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPEN);
+        }
+        break;
+    case LINK_STATE_DETACHED:
+        if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
+            (message_sender->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
+        {
+            /* switch to closing so that no more requests should be accepted */
+            indicate_all_messages_as_error(message_sender);
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_IDLE);
+        }
+        else if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
+        {
+            /* Any other transition must be an error */
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+        }
+        break;
+    case LINK_STATE_ERROR:
+        if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
+        {
+            indicate_all_messages_as_error(message_sender);
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+        }
+        break;
+    }
+}
+
+static void on_link_flow_on(void* context)
+{
+    MESSAGE_SENDER_HANDLE message_sender = (MESSAGE_SENDER_INSTANCE*)context;
+    send_all_pending_messages(message_sender);
+}
+
+MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context)
+{
+    MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE));
+    if (message_sender == NULL)
+    {
+        LogError("Failed allocating message sender");
+    }
+    else
+    {
+        message_sender->messages = NULL;
+        message_sender->message_count = 0;
+        message_sender->link = link;
+        message_sender->on_message_sender_state_changed = on_message_sender_state_changed;
+        message_sender->on_message_sender_state_changed_context = context;
+        message_sender->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
+        message_sender->is_trace_on = 0;
+    }
+
+    return message_sender;
+}
+
+void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
+{
+    if (message_sender == NULL)
+    {
+        LogError("NULL message_sender");
+    }
+    else
+    {
+        indicate_all_messages_as_error(message_sender);
+        (void)messagesender_close(message_sender);
+
+        free(message_sender);
+    }
+}
+
+int messagesender_open(MESSAGE_SENDER_HANDLE message_sender)
+{
+    int result;
+
+    if (message_sender == NULL)
+    {
+        LogError("NULL message_sender");
+        result = __FAILURE__;
+    }
+    else
+    {
+        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
+        {
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPENING);
+            if (link_attach(message_sender->link, NULL, on_link_state_changed, on_link_flow_on, message_sender) != 0)
+            {
+                LogError("attach link failed");
+                result = __FAILURE__;
+                set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+            }
+            else
+            {
+                result = 0;
+            }
+        }
+        else
+        {
+            result = 0;
+        }
+    }
+
+    return result;
+}
+
+int messagesender_close(MESSAGE_SENDER_HANDLE message_sender)
+{
+    int result;
+
+    if (message_sender == NULL)
+    {
+        LogError("NULL message_sender");
+        result = __FAILURE__;
+    }
+    else
+    {
+        if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
+            (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
+        {
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_CLOSING);
+            if (link_detach(message_sender->link, true, NULL, NULL, NULL) != 0)
+            {
+                LogError("Detaching link failed");
+                result = __FAILURE__;
+                set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+            }
+            else
+            {
+                result = 0;
+            }
+        }
+        else
+        {
+            result = 0;
+        }
+    }
+
+    return result;
+}
+
+static void messagesender_send_cancel_handler(ASYNC_OPERATION_HANDLE send_operation)
+{
+    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, send_operation);
+    if (message_with_callback->on_message_send_complete != NULL)
+    {
+        message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_CANCELLED, NULL);
+    }
+
+    remove_pending_message(message_with_callback->message_sender, send_operation);
+}
+
+ASYNC_OPERATION_HANDLE messagesender_send_async(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context, tickcounter_ms_t timeout)
+{
+    ASYNC_OPERATION_HANDLE result;
+
+    if ((message_sender == NULL) ||
+        (message == NULL))
+    {
+        LogError("Bad parameters: message_sender = %p, message = %p");
+        result = NULL;
+    }
+    else
+    {
+        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
+        {
+            LogError("Message sender in ERROR state");
+            result = NULL;
+        }
+        else
+        {
+            result = CREATE_ASYNC_OPERATION(MESSAGE_WITH_CALLBACK, messagesender_send_cancel_handler);
+            if (result == NULL)
+            {
+                LogError("Failed allocating context for send");
+            }
+            else
+            {
+                MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, result);
+                ASYNC_OPERATION_HANDLE* new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count + 1));
+                if (new_messages == NULL)
+                {
+                    LogError("Failed allocating memory for pending sends");
+                    async_operation_destroy(result);
+                    result = NULL;
+                }
+                else
+                {
+                    message_with_callback->timeout = timeout;
+                    message_sender->messages = new_messages;
+                    if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
+                    {
+                        message_with_callback->message = message_clone(message);
+                        if (message_with_callback->message == NULL)
+                        {
+                            LogError("Cannot clone message for placing it in the pending sends list");
+                            async_operation_destroy(result);
+                            result = NULL;
+                        }
+
+                        message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+                    }
+                    else
+                    {
+                        message_with_callback->message = NULL;
+                        message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
+                    }
+
+                    if (result != NULL)
+                    {
+                        message_with_callback->on_message_send_complete = on_message_send_complete;
+                        message_with_callback->context = callback_context;
+                        message_with_callback->message_sender = message_sender;
+
+                        message_sender->messages[message_sender->message_count] = result;
+                        message_sender->message_count++;
+
+                        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
+                        {
+                            switch (send_one_message(message_sender, result, message))
+                            {
+                            default:
+                            case SEND_ONE_MESSAGE_ERROR:
+                                LogError("Error sending message");
+                                remove_pending_message_by_index(message_sender, message_sender->message_count - 1);
+                                result = NULL;
+                                break;
+
+                            case SEND_ONE_MESSAGE_BUSY:
+                                message_with_callback->message = message_clone(message);
+                                if (message_with_callback->message == NULL)
+                                {
+                                    LogError("Error cloning message for placing it in the pending sends list");
+                                    async_operation_destroy(result);
+                                    result = NULL;
+                                }
+                                else
+                                {
+                                    message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+                                }
+                                break;
+
+                            case SEND_ONE_MESSAGE_OK:
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    return result;
+}
+
+void messagesender_set_trace(MESSAGE_SENDER_HANDLE message_sender, bool traceOn)
+{
+    if (message_sender == NULL)
+    {
+        LogError("NULL message_sender");
+    }
+    else
+    {
+        message_sender->is_trace_on = traceOn ? 1 : 0;
+    }
+}