A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
6:641a9672db08
Parent:
5:ae49385aff34
Child:
7:9e9ab3b0efef
--- a/message_sender.c	Fri Jul 01 10:42:48 2016 -0700
+++ b/message_sender.c	Fri Jul 29 15:58:39 2016 -0700
@@ -5,6 +5,7 @@
 #ifdef _CRTDBG_MAP_ALLOC
 #include <crtdbg.h>
 #endif
+#include <stdbool.h>
 #include <string.h>
 #include "azure_c_shared_utility/xlogging.h"
 #include "azure_uamqp_c/message_sender.h"
@@ -13,207 +14,209 @@
 
 typedef enum MESSAGE_SEND_STATE_TAG
 {
-	MESSAGE_SEND_STATE_NOT_SENT,
-	MESSAGE_SEND_STATE_PENDING
+    MESSAGE_SEND_STATE_NOT_SENT,
+    MESSAGE_SEND_STATE_PENDING
 } MESSAGE_SEND_STATE;
 
 typedef enum SEND_ONE_MESSAGE_RESULT_TAG
 {
-	SEND_ONE_MESSAGE_OK,
-	SEND_ONE_MESSAGE_ERROR,
-	SEND_ONE_MESSAGE_BUSY
+    SEND_ONE_MESSAGE_OK,
+    SEND_ONE_MESSAGE_ERROR,
+    SEND_ONE_MESSAGE_BUSY
 } SEND_ONE_MESSAGE_RESULT;
 
 typedef struct MESSAGE_WITH_CALLBACK_TAG
 {
-	MESSAGE_HANDLE message;
-	ON_MESSAGE_SEND_COMPLETE on_message_send_complete;
-	void* context;
-	MESSAGE_SENDER_HANDLE message_sender;
-	MESSAGE_SEND_STATE message_send_state;
+    MESSAGE_HANDLE message;
+    ON_MESSAGE_SEND_COMPLETE on_message_send_complete;
+    void* context;
+    MESSAGE_SENDER_HANDLE message_sender;
+    MESSAGE_SEND_STATE message_send_state;
 } MESSAGE_WITH_CALLBACK;
 
 typedef struct MESSAGE_SENDER_INSTANCE_TAG
 {
-	LINK_HANDLE link;
-	size_t message_count;
-	MESSAGE_WITH_CALLBACK** messages;
-	MESSAGE_SENDER_STATE message_sender_state;
-	ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
-	void* on_message_sender_state_changed_context;
+    LINK_HANDLE link;
+    size_t message_count;
+    MESSAGE_WITH_CALLBACK** messages;
+    MESSAGE_SENDER_STATE message_sender_state;
+    ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
+    void* on_message_sender_state_changed_context;
+    unsigned int is_trace_on : 1;
 } MESSAGE_SENDER_INSTANCE;
 
 static void remove_pending_message_by_index(MESSAGE_SENDER_INSTANCE* message_sender_instance, size_t index)
 {
-	MESSAGE_WITH_CALLBACK** new_messages;
+    MESSAGE_WITH_CALLBACK** new_messages;
 
-	if (message_sender_instance->messages[index]->message != NULL)
-	{
-		message_destroy(message_sender_instance->messages[index]->message);
-		message_sender_instance->messages[index]->message = NULL;
-	}
+    if (message_sender_instance->messages[index]->message != NULL)
+    {
+        message_destroy(message_sender_instance->messages[index]->message);
+        message_sender_instance->messages[index]->message = NULL;
+    }
 
-	amqpalloc_free(message_sender_instance->messages[index]);
+    amqpalloc_free(message_sender_instance->messages[index]);
 
-	if (message_sender_instance->message_count - index > 1)
-	{
-		(void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1));
-	}
+    if (message_sender_instance->message_count - index > 1)
+    {
+        (void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1));
+    }
 
-	message_sender_instance->message_count--;
+    message_sender_instance->message_count--;
 
-	if (message_sender_instance->message_count > 0)
-	{
-		new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count));
-		if (new_messages != NULL)
-		{
-			message_sender_instance->messages = new_messages;
-		}
-	}
-	else
-	{
-		amqpalloc_free(message_sender_instance->messages);
-		message_sender_instance->messages = NULL;
-	}
+    if (message_sender_instance->message_count > 0)
+    {
+        new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count));
+        if (new_messages != NULL)
+        {
+            message_sender_instance->messages = new_messages;
+        }
+    }
+    else
+    {
+        amqpalloc_free(message_sender_instance->messages);
+        message_sender_instance->messages = NULL;
+    }
 }
 
 static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback)
 {
-	size_t i;
+    size_t i;
 
-	for (i = 0; i < message_sender_instance->message_count; i++)
-	{
-		if (message_sender_instance->messages[i] == message_with_callback)
-		{
-			remove_pending_message_by_index(message_sender_instance, i);
-			break;
-		}
-	}
+    for (i = 0; i < message_sender_instance->message_count; i++)
+    {
+        if (message_sender_instance->messages[i] == message_with_callback)
+        {
+            remove_pending_message_by_index(message_sender_instance, i);
+            break;
+        }
+    }
 }
 
 static void on_delivery_settled(void* context, delivery_number delivery_no)
 {
-	MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
-	MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
+    MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
+    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
+    (void)delivery_no;
 
-	if (message_with_callback->on_message_send_complete != NULL)
-	{
-		message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
-	}
+    if (message_with_callback->on_message_send_complete != NULL)
+    {
+        message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
+    }
 
-	remove_pending_message(message_sender_instance, message_with_callback);
+    remove_pending_message(message_sender_instance, message_with_callback);
 }
 
 static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
 {
-	PAYLOAD* payload = (PAYLOAD*)context;
-	(void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
-	payload->length += length;
-	return 0;
+    PAYLOAD* payload = (PAYLOAD*)context;
+    (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
+    payload->length += length;
+    return 0;
 }
 
 static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender_instance, const char* name, AMQP_VALUE value)
 {
-	if (xlogging_get_log_function() != NULL)
-	{
-		char* value_as_string = NULL;
-		LOG(LOG_TRACE, 0, "%s", name);
-		LOG(LOG_TRACE, 0, "%s", (value_as_string = amqpvalue_to_string(value)));
-		if (value_as_string != NULL)
-		{
-			amqpalloc_free(value_as_string);
-		}
-	}
+    if (xlogging_get_log_function() != NULL && message_sender_instance->is_trace_on == 1)
+    {
+        char* value_as_string = NULL;
+        LOG(LOG_TRACE, 0, "%s", name);
+        LOG(LOG_TRACE, 0, "%s", (value_as_string = amqpvalue_to_string(value)));
+        if (value_as_string != NULL)
+        {
+            amqpalloc_free(value_as_string);
+        }
+    }
 }
 
 static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback, MESSAGE_HANDLE message)
 {
-	SEND_ONE_MESSAGE_RESULT result;
+    SEND_ONE_MESSAGE_RESULT result;
 
-	size_t encoded_size;
-	size_t total_encoded_size = 0;
-	MESSAGE_BODY_TYPE message_body_type;
+    size_t encoded_size;
+    size_t total_encoded_size = 0;
+    MESSAGE_BODY_TYPE message_body_type;
     message_format message_format;
 
-	if ((message_get_body_type(message, &message_body_type) != 0) ||
+    if ((message_get_body_type(message, &message_body_type) != 0) ||
         (message_get_message_format(message, &message_format) != 0))
-	{
-		result = SEND_ONE_MESSAGE_ERROR;
-	}
-	else
-	{
-		// header
-		HEADER_HANDLE header;
-		AMQP_VALUE header_amqp_value;
-		PROPERTIES_HANDLE properties;
-		AMQP_VALUE properties_amqp_value;
-		AMQP_VALUE application_properties;
-		AMQP_VALUE application_properties_value;
-		AMQP_VALUE body_amqp_value = NULL;
-        size_t body_data_count;
+    {
+        result = SEND_ONE_MESSAGE_ERROR;
+    }
+    else
+    {
+        // header
+        HEADER_HANDLE header;
+        AMQP_VALUE header_amqp_value;
+        PROPERTIES_HANDLE properties;
+        AMQP_VALUE properties_amqp_value;
+        AMQP_VALUE application_properties;
+        AMQP_VALUE application_properties_value;
+        AMQP_VALUE body_amqp_value = NULL;
+        size_t body_data_count = 0;
 
-		message_get_header(message, &header);
-		header_amqp_value = amqpvalue_create_header(header);
-		if (header != NULL)
-		{
-			amqpvalue_get_encoded_size(header_amqp_value, &encoded_size);
-			total_encoded_size += encoded_size;
-		}
+        message_get_header(message, &header);
+        header_amqp_value = amqpvalue_create_header(header);
+        if (header != NULL)
+        {
+            amqpvalue_get_encoded_size(header_amqp_value, &encoded_size);
+            total_encoded_size += encoded_size;
+        }
 
-		// properties
-		message_get_properties(message, &properties);
-		properties_amqp_value = amqpvalue_create_properties(properties);
-		if (properties != NULL)
-		{
-			amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size);
-			total_encoded_size += encoded_size;
-		}
+        // properties
+        message_get_properties(message, &properties);
+        properties_amqp_value = amqpvalue_create_properties(properties);
+        if (properties != NULL)
+        {
+            amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size);
+            total_encoded_size += encoded_size;
+        }
 
-		// application properties
-		message_get_application_properties(message, &application_properties);
-		application_properties_value = amqpvalue_create_application_properties(application_properties);
-		if (application_properties != NULL)
-		{
-			amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
-			total_encoded_size += encoded_size;
-		}
+        // application properties
+        message_get_application_properties(message, &application_properties);
+        application_properties_value = amqpvalue_create_application_properties(application_properties);
+        if (application_properties != NULL)
+        {
+            amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
+            total_encoded_size += encoded_size;
+        }
 
-		result = SEND_ONE_MESSAGE_OK;
+        result = SEND_ONE_MESSAGE_OK;
 
-		// body - amqp data
-		switch (message_body_type)
-		{
-			default:
-				result = SEND_ONE_MESSAGE_ERROR;
-				break;
+        // body - amqp data
+        switch (message_body_type)
+        {
+            default:
+                result = SEND_ONE_MESSAGE_ERROR;
+                break;
 
-			case MESSAGE_BODY_TYPE_VALUE:
-			{
-				AMQP_VALUE message_body_amqp_value;
-				if (message_get_inplace_body_amqp_value(message, &message_body_amqp_value) != 0)
-				{
-					result = SEND_ONE_MESSAGE_ERROR;
-				}
-				else
-				{
-					body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
-					if ((body_amqp_value == NULL) ||
-						(amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0))
-					{
-						result = SEND_ONE_MESSAGE_ERROR;
-					}
-					else
-					{
-						total_encoded_size += encoded_size;
-					}
-				}
+            case MESSAGE_BODY_TYPE_VALUE:
+            {
+                AMQP_VALUE message_body_amqp_value;
+                if (message_get_inplace_body_amqp_value(message, &message_body_amqp_value) != 0)
+                {
+                    result = SEND_ONE_MESSAGE_ERROR;
+                }
+                else
+                {
+                    body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
+                    if ((body_amqp_value == NULL) ||
+                        (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0))
+                    {
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+                    else
+                    {
+                        total_encoded_size += encoded_size;
+                    }
+                }
 
-				break;
-			}
+                break;
+            }
 
-			case MESSAGE_BODY_TYPE_DATA:
-			{
-				BINARY_DATA binary_data;
+            case MESSAGE_BODY_TYPE_DATA:
+            {
+                BINARY_DATA binary_data;
                 size_t i;
 
                 if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
@@ -230,7 +233,9 @@
                         }
                         else
                         {
-                            amqp_binary binary_value = { binary_data.bytes, binary_data.length };
+                            amqp_binary binary_value;
+                            binary_value.bytes = binary_data.bytes;
+                            binary_value.length = (uint32_t)binary_data.length;
                             AMQP_VALUE body_amqp_data = amqpvalue_create_data(binary_value);
                             if (body_amqp_data == NULL)
                             {
@@ -252,62 +257,64 @@
                         }
                     }
                 }
-				break;
-			}
-		}
+                break;
+            }
+        }
 
-		if (result == 0)
-		{
-			void* data_bytes = amqpalloc_malloc(total_encoded_size);
-			PAYLOAD payload = { data_bytes, 0 };
-			result = SEND_ONE_MESSAGE_OK;
+        if (result == 0)
+        {
+            void* data_bytes = amqpalloc_malloc(total_encoded_size);
+            PAYLOAD payload;
+            payload.bytes = data_bytes;
+            payload.length = 0;
+            result = SEND_ONE_MESSAGE_OK;
 
-			if (header != NULL)
-			{
-				if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
-				{
-					result = SEND_ONE_MESSAGE_ERROR;
-				}
+            if (header != NULL)
+            {
+                if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
+                {
+                    result = SEND_ONE_MESSAGE_ERROR;
+                }
 
-				log_message_chunk(message_sender_instance, "Header:", header_amqp_value);
-			}
+                log_message_chunk(message_sender_instance, "Header:", header_amqp_value);
+            }
 
-			if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
-			{
-				if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
-				{
-					result = SEND_ONE_MESSAGE_ERROR;
-				}
+            if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
+            {
+                if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
+                {
+                    result = SEND_ONE_MESSAGE_ERROR;
+                }
 
-				log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value);
-			}
+                log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value);
+            }
 
-			if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
-			{
-				if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
-				{
-					result = SEND_ONE_MESSAGE_ERROR;
-				}
+            if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
+            {
+                if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
+                {
+                    result = SEND_ONE_MESSAGE_ERROR;
+                }
 
-				log_message_chunk(message_sender_instance, "Application properties:", application_properties_value);
-			}
+                log_message_chunk(message_sender_instance, "Application properties:", application_properties_value);
+            }
 
-			if (result == SEND_ONE_MESSAGE_OK)
-			{
-				switch (message_body_type)
-				{
-				case MESSAGE_BODY_TYPE_VALUE:
-				{
-					if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
-					{
-						result = SEND_ONE_MESSAGE_ERROR;
-					}
+            if (result == SEND_ONE_MESSAGE_OK)
+            {
+                switch (message_body_type)
+                {
+                case MESSAGE_BODY_TYPE_VALUE:
+                {
+                    if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
+                    {
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
 
-					log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value);
-					break;
-				}
-				case MESSAGE_BODY_TYPE_DATA:
-				{
+                    log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value);
+                    break;
+                }
+                case MESSAGE_BODY_TYPE_DATA:
+                {
                     BINARY_DATA binary_data;
                     size_t i;
 
@@ -319,7 +326,9 @@
                         }
                         else
                         {
-                            amqp_binary binary_value = { binary_data.bytes, binary_data.length };
+                            amqp_binary binary_value;
+                            binary_value.bytes = binary_data.bytes;
+                            binary_value.length = (uint32_t)binary_data.length;
                             AMQP_VALUE body_amqp_data = amqpvalue_create_data(binary_value);
                             if (body_amqp_data == NULL)
                             {
@@ -337,96 +346,96 @@
                             }
                         }
                     }
-					break;
-				}
-				}
-			}
+                    break;
+                }
+                }
+            }
 
-			if (result == SEND_ONE_MESSAGE_OK)
-			{
-				message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
-				switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback))
-				{
-				default:
-				case LINK_TRANSFER_ERROR:
-					if (message_with_callback->on_message_send_complete != NULL)
-					{
-						message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
-					}
+            if (result == SEND_ONE_MESSAGE_OK)
+            {
+                message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
+                switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback))
+                {
+                default:
+                case LINK_TRANSFER_ERROR:
+                    if (message_with_callback->on_message_send_complete != NULL)
+                    {
+                        message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
+                    }
 
-					result = SEND_ONE_MESSAGE_ERROR;
-					break;
+                    result = SEND_ONE_MESSAGE_ERROR;
+                    break;
 
-				case LINK_TRANSFER_BUSY:
-					message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
-					result = SEND_ONE_MESSAGE_BUSY;
-					break;
+                case LINK_TRANSFER_BUSY:
+                    message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+                    result = SEND_ONE_MESSAGE_BUSY;
+                    break;
 
-				case LINK_TRANSFER_OK:
-					result = SEND_ONE_MESSAGE_OK;
-					break;
-				}
-			}
+                case LINK_TRANSFER_OK:
+                    result = SEND_ONE_MESSAGE_OK;
+                    break;
+                }
+            }
 
-			amqpalloc_free(data_bytes);
+            amqpalloc_free(data_bytes);
 
-			if (body_amqp_value != NULL)
-			{
-				amqpvalue_destroy(body_amqp_value);
-			}
+            if (body_amqp_value != NULL)
+            {
+                amqpvalue_destroy(body_amqp_value);
+            }
 
-			amqpvalue_destroy(application_properties);
-			amqpvalue_destroy(application_properties_value);
-			amqpvalue_destroy(properties_amqp_value);
-			properties_destroy(properties);
-		}
-	}
+            amqpvalue_destroy(application_properties);
+            amqpvalue_destroy(application_properties_value);
+            amqpvalue_destroy(properties_amqp_value);
+            properties_destroy(properties);
+        }
+    }
 
-	return result;
+    return result;
 }
 
 static void send_all_pending_messages(MESSAGE_SENDER_INSTANCE* message_sender_instance)
 {
-	size_t i;
+    size_t i;
 
-	for (i = 0; i < message_sender_instance->message_count; i++)
-	{
-		if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
-		{
-			switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message))
-			{
-			default:
-			case SEND_ONE_MESSAGE_ERROR:
-			{
-				ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete;
-				void* context = message_sender_instance->messages[i]->context;
-				remove_pending_message_by_index(message_sender_instance, i);
+    for (i = 0; i < message_sender_instance->message_count; i++)
+    {
+        if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
+        {
+            switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message))
+            {
+            default:
+            case SEND_ONE_MESSAGE_ERROR:
+            {
+                ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete;
+                void* context = message_sender_instance->messages[i]->context;
+                remove_pending_message_by_index(message_sender_instance, i);
 
-				on_message_send_complete(context, MESSAGE_SEND_ERROR);
-				i = message_sender_instance->message_count;
-				break;
-			}
-			case SEND_ONE_MESSAGE_BUSY:
-				i = message_sender_instance->message_count + 1;
-				break;
+                on_message_send_complete(context, MESSAGE_SEND_ERROR);
+                i = message_sender_instance->message_count;
+                break;
+            }
+            case SEND_ONE_MESSAGE_BUSY:
+                i = message_sender_instance->message_count + 1;
+                break;
 
-			case SEND_ONE_MESSAGE_OK:
-				break;
-			}
+            case SEND_ONE_MESSAGE_OK:
+                break;
+            }
 
-			i--;
-		}
-	}
+            i--;
+        }
+    }
 }
 
 static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_SENDER_STATE new_state)
 {
-	MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state;
-	message_sender_instance->message_sender_state = new_state;
-	if (message_sender_instance->on_message_sender_state_changed != NULL)
-	{
-		message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state);
-	}
+    MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state;
+    message_sender_instance->message_sender_state = new_state;
+    if (message_sender_instance->on_message_sender_state_changed != NULL)
+    {
+        message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state);
+    }
 }
 
 static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender_instance)
@@ -455,17 +464,18 @@
 
 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
 {
-	MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
+    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
+    (void)previous_link_state;
 
-	switch (new_link_state)
-	{
-	case LINK_STATE_ATTACHED:
-		if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
-		{
-			set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN);
-		}
-		break;
-	case LINK_STATE_DETACHED:
+    switch (new_link_state)
+    {
+    case LINK_STATE_ATTACHED:
+        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
+        {
+            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN);
+        }
+        break;
+    case LINK_STATE_DETACHED:
         if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
             (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
         {
@@ -479,219 +489,227 @@
             set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
         }
         break;
-	case LINK_STATE_ERROR:
-		if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
-		{
+    case LINK_STATE_ERROR:
+        if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
+        {
             indicate_all_messages_as_error(message_sender_instance);
-			set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
-		}
-		break;
-	}
+            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+        }
+        break;
+    }
 }
 
 static void on_link_flow_on(void* context)
 {
-	MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
-	send_all_pending_messages(message_sender_instance);
+    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
+    send_all_pending_messages(message_sender_instance);
 }
 
 MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context)
 {
-	MESSAGE_SENDER_INSTANCE* result = amqpalloc_malloc(sizeof(MESSAGE_SENDER_INSTANCE));
-	if (result != NULL)
-	{
-		result->messages = NULL;
-		result->message_count = 0;
-		result->link = link;
-		result->on_message_sender_state_changed = on_message_sender_state_changed;
-		result->on_message_sender_state_changed_context = context;
-		result->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
-	}
+    MESSAGE_SENDER_INSTANCE* result = amqpalloc_malloc(sizeof(MESSAGE_SENDER_INSTANCE));
+    if (result != NULL)
+    {
+        result->messages = NULL;
+        result->message_count = 0;
+        result->link = link;
+        result->on_message_sender_state_changed = on_message_sender_state_changed;
+        result->on_message_sender_state_changed_context = context;
+        result->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
+        result->is_trace_on = 0;
+    }
 
-	return result;
+    return result;
 }
 
 void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
 {
-	if (message_sender != NULL)
-	{
-		MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
+    if (message_sender != NULL)
+    {
+        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
 
-		messagesender_close(message_sender_instance);
+        messagesender_close(message_sender_instance);
 
         indicate_all_messages_as_error(message_sender_instance);
 
-		amqpalloc_free(message_sender);
-	}
+        amqpalloc_free(message_sender);
+    }
 }
 
 int messagesender_open(MESSAGE_SENDER_HANDLE message_sender)
 {
-	int result;
+    int result;
 
-	if (message_sender == NULL)
-	{
-		result = __LINE__;
-	}
-	else
-	{
-		MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
+    if (message_sender == NULL)
+    {
+        result = __LINE__;
+    }
+    else
+    {
+        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
 
-		if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
-		{
-			set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING);
-			if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0)
-			{
-				result = __LINE__;
-				set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
-			}
-			else
-			{
-				result = 0;
-			}
-		}
-		else
-		{
-			result = 0;
-		}
-	}
+        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
+        {
+            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING);
+            if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0)
+            {
+                result = __LINE__;
+                set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+            }
+            else
+            {
+                result = 0;
+            }
+        }
+        else
+        {
+            result = 0;
+        }
+    }
 
-	return result;
+    return result;
 }
 
 int messagesender_close(MESSAGE_SENDER_HANDLE message_sender)
 {
-	int result;
+    int result;
 
-	if (message_sender == NULL)
-	{
-		result = __LINE__;
-	}
-	else
-	{
-		MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
+    if (message_sender == NULL)
+    {
+        result = __LINE__;
+    }
+    else
+    {
+        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
 
-		if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
-			(message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
-		{
-			set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING);
-			if (link_detach(message_sender_instance->link) != 0)
-			{
-				result = __LINE__;
-				set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
-			}
-			else
-			{
-				set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
-				result = 0;
-			}
-		}
-		else
-		{
-			result = 0;
-		}
-	}
+        if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
+            (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
+        {
+            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING);
+            if (link_detach(message_sender_instance->link) != 0)
+            {
+                result = __LINE__;
+                set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+            }
+            else
+            {
+                set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
+                result = 0;
+            }
+        }
+        else
+        {
+            result = 0;
+        }
+    }
 
-	return result;
+    return result;
 }
 
 int messagesender_send(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context)
 {
-	int result;
+    int result;
 
-	if ((message_sender == NULL) ||
-		(message == NULL))
-	{
-		result = __LINE__;
-	}
-	else
-	{
-		MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-		if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
-		{
-			result = __LINE__;
-		}
-		else
-		{
-			MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)amqpalloc_malloc(sizeof(MESSAGE_WITH_CALLBACK));
-			if (message_with_callback == NULL)
-			{
-				result = __LINE__;
-			}
-			else
-			{
-				MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1));
-				if (new_messages == NULL)
-				{
-					amqpalloc_free(message_with_callback);
-					result = __LINE__;
-				}
-				else
-				{
-					result = 0;
+    if ((message_sender == NULL) ||
+        (message == NULL))
+    {
+        result = __LINE__;
+    }
+    else
+    {
+        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
+        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
+        {
+            result = __LINE__;
+        }
+        else
+        {
+            MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)amqpalloc_malloc(sizeof(MESSAGE_WITH_CALLBACK));
+            if (message_with_callback == NULL)
+            {
+                result = __LINE__;
+            }
+            else
+            {
+                MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1));
+                if (new_messages == NULL)
+                {
+                    amqpalloc_free(message_with_callback);
+                    result = __LINE__;
+                }
+                else
+                {
+                    result = 0;
 
-					message_sender_instance->messages = new_messages;
-					if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
-					{
-						message_with_callback->message = message_clone(message);
-						if (message_with_callback->message == NULL)
-						{
-							amqpalloc_free(message_with_callback);
-							result = __LINE__;
-						}
+                    message_sender_instance->messages = new_messages;
+                    if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
+                    {
+                        message_with_callback->message = message_clone(message);
+                        if (message_with_callback->message == NULL)
+                        {
+                            amqpalloc_free(message_with_callback);
+                            result = __LINE__;
+                        }
+
+                        message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+                    }
+                    else
+                    {
+                        message_with_callback->message = NULL;
+                        message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
+                    }
 
-						message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
-					}
-					else
-					{
-						message_with_callback->message = NULL;
-						message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
-					}
+                    if (result == 0)
+                    {
+                        message_with_callback->on_message_send_complete = on_message_send_complete;
+                        message_with_callback->context = callback_context;
+                        message_with_callback->message_sender = message_sender_instance;
+
+                        message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback;
+                        message_sender_instance->message_count++;
 
-					if (result == 0)
-					{
-						message_with_callback->on_message_send_complete = on_message_send_complete;
-						message_with_callback->context = callback_context;
-						message_with_callback->message_sender = message_sender_instance;
-
-						message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback;
-						message_sender_instance->message_count++;
-
-						if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
-						{
-							switch (send_one_message(message_sender_instance, message_with_callback, message))
-							{
-							default:
-							case SEND_ONE_MESSAGE_ERROR:
-								remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
-								result = __LINE__;
-								break;
+                        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
+                        {
+                            switch (send_one_message(message_sender_instance, message_with_callback, message))
+                            {
+                            default:
+                            case SEND_ONE_MESSAGE_ERROR:
+                                remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
+                                result = __LINE__;
+                                break;
 
-							case SEND_ONE_MESSAGE_BUSY:
-								message_with_callback->message = message_clone(message);
-								if (message_with_callback->message == NULL)
-								{
-									amqpalloc_free(message_with_callback);
-									result = __LINE__;
-								}
-								else
-								{
-									message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
-									result = 0;
-								}
+                            case SEND_ONE_MESSAGE_BUSY:
+                                message_with_callback->message = message_clone(message);
+                                if (message_with_callback->message == NULL)
+                                {
+                                    amqpalloc_free(message_with_callback);
+                                    result = __LINE__;
+                                }
+                                else
+                                {
+                                    message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+                                    result = 0;
+                                }
+                                break;
 
-								break;
+                            case SEND_ONE_MESSAGE_OK:
+                                result = 0;
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+    return result;
+}
 
-							case SEND_ONE_MESSAGE_OK:
-								result = 0;
-								break;
-							}
-						}
-					}
-				}
-			}
-		}
-	}
-
-	return result;
+void messagesender_set_trace(MESSAGE_SENDER_HANDLE message_sender, bool traceOn)
+{
+    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
+    if (message_sender_instance != NULL)
+    {
+        message_sender_instance->is_trace_on = traceOn ? 1 : 0;
+    }
 }