Azure IoT / azure_uamqp_c

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Files at this revision

API Documentation at this revision

Comitter:
AzureIoTClient
Date:
Sat Oct 21 20:12:19 2017 +0000
Parent:
33:08b53020ff0d
Child:
35:d0bed2404ee9
Commit message:
1.1.26

Changed in this revision

amqp_management.c Show annotated file Show diff for this revision Revisions of this file
async_operation.c Show annotated file Show diff for this revision Revisions of this file
azure_uamqp_c/amqp_management.h Show annotated file Show diff for this revision Revisions of this file
azure_uamqp_c/amqp_types.h Show annotated file Show diff for this revision Revisions of this file
azure_uamqp_c/async_operation.h Show annotated file Show diff for this revision Revisions of this file
azure_uamqp_c/cbs.h Show annotated file Show diff for this revision Revisions of this file
azure_uamqp_c/connection.h Show annotated file Show diff for this revision Revisions of this file
azure_uamqp_c/link.h Show annotated file Show diff for this revision Revisions of this file
azure_uamqp_c/message_receiver.h Show annotated file Show diff for this revision Revisions of this file
azure_uamqp_c/message_sender.h Show annotated file Show diff for this revision Revisions of this file
azure_uamqp_c/session.h Show annotated file Show diff for this revision Revisions of this file
link.c Show annotated file Show diff for this revision Revisions of this file
message_receiver.c Show annotated file Show diff for this revision Revisions of this file
message_sender.c Show annotated file Show diff for this revision Revisions of this file
messaging.c Show annotated file Show diff for this revision Revisions of this file
--- a/amqp_management.c	Mon Sep 25 13:38:40 2017 -0700
+++ b/amqp_management.c	Sat Oct 21 20:12:19 2017 +0000
@@ -1071,9 +1071,9 @@
                                 else
                                 {
                                     /* Codes_SRS_AMQP_MANAGEMENT_01_088: [ `amqp_management_execute_operation_async` shall send the message by calling `messagesender_send`. ]*/
-                                    if (messagesender_send(amqp_management->message_sender, cloned_message, NULL, NULL) != 0)
+                                    if (messagesender_send_async(amqp_management->message_sender, cloned_message, NULL, NULL, 0) == NULL)
                                     {
-                                        /* Codes_SRS_AMQP_MANAGEMENT_01_089: [ If `messagesender_send` fails, `amqp_management_execute_operation_async` shall fail and return a non-zero value. ]*/
+                                        /* Codes_SRS_AMQP_MANAGEMENT_01_089: [ If `messagesender_send_async` fails, `amqp_management_execute_operation_async` shall fail and return a non-zero value. ]*/
                                         LogError("Could not send request message");
                                         (void)singlylinkedlist_remove(amqp_management->pending_operations, added_item);
                                         free(pending_operation_message);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/async_operation.c	Sat Oct 21 20:12:19 2017 +0000
@@ -0,0 +1,85 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stddef.h>
+#include <string.h>
+#include "azure_c_shared_utility/optimize_size.h"
+#include "azure_c_shared_utility/gballoc.h"
+#include "azure_c_shared_utility/xlogging.h"
+#include "azure_uamqp_c/async_operation.h"
+
+typedef struct ASYNC_OPERATION_INSTANCE_TAG
+{
+    ASYNC_OPERATION_CANCEL_HANDLER_FUNC async_operation_cancel_handler;
+} ASYNC_OPERATION_INSTANCE;
+
+ASYNC_OPERATION_HANDLE async_operation_create(ASYNC_OPERATION_CANCEL_HANDLER_FUNC async_operation_cancel_handler, size_t context_size)
+{
+    ASYNC_OPERATION_INSTANCE* async_operation;
+
+    if (async_operation_cancel_handler == NULL)
+    {
+        /* Codes_SRS_ASYNC_OPERATION_01_002: [ If `async_operation_cancel_handler` is NULL, `async_operation_create` shall fail and return NULL.]*/
+        LogError("Cannot allocate memory for async operation");
+        async_operation = NULL;
+    }
+    else if (context_size < sizeof(ASYNC_OPERATION_INSTANCE))
+    {
+        /* Codes_SRS_ASYNC_OPERATION_01_003: [ If `context_size` is less than the size of the `async_operation_cancel_handler` argument, `async_operation_create` shall fail and return NULL.]*/
+        LogError("Context size too small");
+        async_operation = NULL;
+    }
+    else
+    {
+        async_operation = (ASYNC_OPERATION_INSTANCE*)malloc(context_size);
+        if (async_operation == NULL)
+        {
+            /* Codes_SRS_ASYNC_OPERATION_01_004: [ If allocating memory for the new asynchronous operation instance fails, `async_operation_create` shall fail and return NULL.]*/
+            LogError("Cannot allocate memory for async operation");
+        }
+        else
+        {
+            /* Codes_SRS_ASYNC_OPERATION_01_001: [ `async_operation_create` shall return a non-NULL handle to a newly created asynchronous operation instance.]*/
+            async_operation->async_operation_cancel_handler = async_operation_cancel_handler;
+        }
+    }
+
+    return async_operation;
+}
+
+void async_operation_destroy(ASYNC_OPERATION_HANDLE async_operation)
+{
+    if (async_operation == NULL)
+    {
+        /* Codes_SRS_ASYNC_OPERATION_01_006: [ If `async_operation` is NULL, `async_operation_destroy` shall do nothing.]*/
+        LogError("NULL async_operation");
+    }
+    else
+    {
+        /* Codes_SRS_ASYNC_OPERATION_01_005: [ `async_operation_destroy` shall free all recources associated with the asyncronous operation instance.]*/
+        free(async_operation);
+    }
+}
+
+int async_operation_cancel(ASYNC_OPERATION_HANDLE async_operation)
+{
+    int result;
+
+    if (async_operation == NULL)
+    {
+        LogError("NULL async_operation");
+        result = __FAILURE__;
+    }
+    else
+    {
+        /* Codes_SRS_ASYNC_OPERATION_01_007: [ `async_operation_cancel` shall cancel the operation by calling the cancel handler function passed to `async_operation_create`.]*/
+        async_operation->async_operation_cancel_handler(async_operation);
+
+        /* Codes_SRS_ASYNC_OPERATION_01_008: [ On success `async_operation_cancel` shall return 0.]*/
+        result = 0;
+    }
+
+    return result;
+}
--- a/azure_uamqp_c/amqp_management.h	Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/amqp_management.h	Sat Oct 21 20:12:19 2017 +0000
@@ -5,6 +5,7 @@
 #define AMQP_MANAGEMENT_H
 
 #include <stdbool.h>
+#include "azure_c_shared_utility/macro_utils.h"
 #include "azure_uamqp_c/session.h"
 #include "azure_uamqp_c/message.h"
 
@@ -14,20 +15,20 @@
 
 #include "azure_c_shared_utility/umock_c_prod.h"
 
-    typedef enum AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT_TAG
-    {
-        AMQP_MANAGEMENT_EXECUTE_OPERATION_OK,
-        AMQP_MANAGEMENT_EXECUTE_OPERATION_ERROR,
-        AMQP_MANAGEMENT_EXECUTE_OPERATION_FAILED_BAD_STATUS,
-        AMQP_MANAGEMENT_EXECUTE_OPERATION_INSTANCE_CLOSED
-    } AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT;
+#define AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT_VALUES \
+    AMQP_MANAGEMENT_EXECUTE_OPERATION_OK, \
+    AMQP_MANAGEMENT_EXECUTE_OPERATION_ERROR, \
+    AMQP_MANAGEMENT_EXECUTE_OPERATION_FAILED_BAD_STATUS, \
+    AMQP_MANAGEMENT_EXECUTE_OPERATION_INSTANCE_CLOSED
 
-    typedef enum AMQP_MANAGEMENT_OPEN_RESULT_TAG
-    {
-        AMQP_MANAGEMENT_OPEN_OK,
-        AMQP_MANAGEMENT_OPEN_ERROR,
-        AMQP_MANAGEMENT_OPEN_CANCELLED
-    } AMQP_MANAGEMENT_OPEN_RESULT;
+DEFINE_ENUM(AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT, AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT_VALUES)
+
+#define AMQP_MANAGEMENT_OPEN_RESULT_VALUES \
+    AMQP_MANAGEMENT_OPEN_OK, \
+    AMQP_MANAGEMENT_OPEN_ERROR, \
+    AMQP_MANAGEMENT_OPEN_CANCELLED
+
+DEFINE_ENUM(AMQP_MANAGEMENT_OPEN_RESULT, AMQP_MANAGEMENT_OPEN_RESULT_VALUES)
 
     typedef struct AMQP_MANAGEMENT_INSTANCE_TAG* AMQP_MANAGEMENT_HANDLE;
     typedef void(*ON_AMQP_MANAGEMENT_OPEN_COMPLETE)(void* context, AMQP_MANAGEMENT_OPEN_RESULT open_result);
--- a/azure_uamqp_c/amqp_types.h	Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/amqp_types.h	Sat Oct 21 20:12:19 2017 +0000
@@ -5,38 +5,39 @@
 #define ANQP_TYPES_H
 
 #include <stddef.h>
+#include "azure_c_shared_utility/macro_utils.h"
 
 #ifdef __cplusplus
 extern "C" {
 #endif /* __cplusplus */
 
-    typedef enum AMQP_TYPE_TAG
-    {
-        AMQP_TYPE_NULL,
-        AMQP_TYPE_BOOL,
-        AMQP_TYPE_UBYTE,
-        AMQP_TYPE_USHORT,
-        AMQP_TYPE_UINT,
-        AMQP_TYPE_ULONG,
-        AMQP_TYPE_BYTE,
-        AMQP_TYPE_SHORT,
-        AMQP_TYPE_INT,
-        AMQP_TYPE_LONG,
-        AMQP_TYPE_FLOAT,
-        AMQP_TYPE_DOUBLE,
-        AMQP_TYPE_CHAR,
-        AMQP_TYPE_TIMESTAMP,
-        AMQP_TYPE_UUID,
-        AMQP_TYPE_BINARY,
-        AMQP_TYPE_STRING,
-        AMQP_TYPE_SYMBOL,
-        AMQP_TYPE_LIST,
-        AMQP_TYPE_MAP,
-        AMQP_TYPE_ARRAY,
-        AMQP_TYPE_DESCRIBED,
-        AMQP_TYPE_COMPOSITE,
-        AMQP_TYPE_UNKNOWN
-    } AMQP_TYPE;
+#define AMQP_TYPE_VALUES \
+    AMQP_TYPE_NULL, \
+    AMQP_TYPE_BOOL, \
+    AMQP_TYPE_UBYTE, \
+    AMQP_TYPE_USHORT, \
+    AMQP_TYPE_UINT, \
+    AMQP_TYPE_ULONG, \
+    AMQP_TYPE_BYTE, \
+    AMQP_TYPE_SHORT, \
+    AMQP_TYPE_INT, \
+    AMQP_TYPE_LONG, \
+    AMQP_TYPE_FLOAT, \
+    AMQP_TYPE_DOUBLE, \
+    AMQP_TYPE_CHAR, \
+    AMQP_TYPE_TIMESTAMP, \
+    AMQP_TYPE_UUID, \
+    AMQP_TYPE_BINARY, \
+    AMQP_TYPE_STRING, \
+    AMQP_TYPE_SYMBOL, \
+    AMQP_TYPE_LIST, \
+    AMQP_TYPE_MAP, \
+    AMQP_TYPE_ARRAY, \
+    AMQP_TYPE_DESCRIBED, \
+    AMQP_TYPE_COMPOSITE, \
+    AMQP_TYPE_UNKNOWN
+
+DEFINE_ENUM(AMQP_TYPE, AMQP_TYPE_VALUES);
 
 #ifdef __cplusplus
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/azure_uamqp_c/async_operation.h	Sat Oct 21 20:12:19 2017 +0000
@@ -0,0 +1,43 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#ifndef ASYNC_OPERATION_H
+#define ASYNC_OPERATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#include <cstdint>
+#include <cstddef>
+#else
+#include <stdint.h>
+#include <stddef.h>
+#endif /* __cplusplus */
+
+#include "azure_c_shared_utility/umock_c_prod.h"
+
+typedef struct ASYNC_OPERATION_INSTANCE_TAG* ASYNC_OPERATION_HANDLE;
+
+typedef void(*ASYNC_OPERATION_CANCEL_HANDLER_FUNC)(ASYNC_OPERATION_HANDLE async_operation);
+
+#define DEFINE_ASYNC_OPERATION_CONTEXT(type) \
+typedef struct C3(ASYNC_OPERATION_CONTEXT_STRUCT_, type, _TAG) \
+{ \
+    ASYNC_OPERATION_CANCEL_HANDLER_FUNC async_operation_cancel_handler; \
+    type context; \
+} C2(ASYNC_OPERATION_CONTEXT_STRUCT_, type);
+
+#define GET_ASYNC_OPERATION_CONTEXT(type, async_operation) \
+    (type*)((unsigned char*)async_operation + offsetof(C2(ASYNC_OPERATION_CONTEXT_STRUCT_, type), context))
+
+#define CREATE_ASYNC_OPERATION(type, async_operation_cancel_handler) \
+    async_operation_create(async_operation_cancel_handler, sizeof(C2(ASYNC_OPERATION_CONTEXT_STRUCT_, type)))
+
+MOCKABLE_FUNCTION(, ASYNC_OPERATION_HANDLE, async_operation_create, ASYNC_OPERATION_CANCEL_HANDLER_FUNC, async_operation_cancel_handler, size_t, context_size);
+MOCKABLE_FUNCTION(, void, async_operation_destroy, ASYNC_OPERATION_HANDLE, async_operation);
+MOCKABLE_FUNCTION(, int, async_operation_cancel, ASYNC_OPERATION_HANDLE, async_operation);
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+
+#endif /* ASYNC_OPERATION_H */
--- a/azure_uamqp_c/cbs.h	Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/cbs.h	Sat Oct 21 20:12:19 2017 +0000
@@ -5,6 +5,8 @@
 #define CBS_H
 
 #include "azure_uamqp_c/session.h"
+#include "azure_c_shared_utility/macro_utils.h"
+#include "azure_c_shared_utility/umock_c_prod.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -12,22 +14,20 @@
 #include <stdbool.h>
 #endif /* __cplusplus */
 
-#include "azure_c_shared_utility/umock_c_prod.h"
+#define CBS_OPERATION_RESULT_VALUES \
+    CBS_OPERATION_RESULT_OK, \
+    CBS_OPERATION_RESULT_CBS_ERROR, \
+    CBS_OPERATION_RESULT_OPERATION_FAILED, \
+    CBS_OPERATION_RESULT_INSTANCE_CLOSED
 
-    typedef enum CBS_OPERATION_RESULT_TAG
-    {
-        CBS_OPERATION_RESULT_OK,
-        CBS_OPERATION_RESULT_CBS_ERROR,
-        CBS_OPERATION_RESULT_OPERATION_FAILED,
-        CBS_OPERATION_RESULT_INSTANCE_CLOSED
-    } CBS_OPERATION_RESULT;
+DEFINE_ENUM(CBS_OPERATION_RESULT, CBS_OPERATION_RESULT_VALUES)
 
-    typedef enum CBS_OPEN_COMPLETE_RESULT_TAG
-    {
-        CBS_OPEN_OK,
-        CBS_OPEN_ERROR,
-        CBS_OPEN_CANCELLED
-    } CBS_OPEN_COMPLETE_RESULT;
+#define CBS_OPEN_COMPLETE_RESULT_VALUES \
+    CBS_OPEN_OK, \
+    CBS_OPEN_ERROR, \
+    CBS_OPEN_CANCELLED
+
+DEFINE_ENUM(CBS_OPEN_COMPLETE_RESULT, CBS_OPEN_COMPLETE_RESULT_VALUES)
 
     typedef struct CBS_INSTANCE_TAG* CBS_HANDLE;
     typedef void(*ON_CBS_OPEN_COMPLETE)(void* context, CBS_OPEN_COMPLETE_RESULT open_complete_result);
--- a/azure_uamqp_c/connection.h	Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/connection.h	Sat Oct 21 20:12:19 2017 +0000
@@ -8,6 +8,7 @@
 #include <stdbool.h>
 #include <stdint.h>
 #include "azure_c_shared_utility/xio.h"
+#include "azure_c_shared_utility/umock_c_prod.h"
 #include "azure_uamqp_c/amqp_frame_codec.h"
 #include "azure_uamqp_c/amqp_definitions.h"
 
@@ -15,8 +16,6 @@
 extern "C" {
 #endif /* __cplusplus */
 
-#include "azure_c_shared_utility/umock_c_prod.h"
-
     typedef struct CONNECTION_INSTANCE_TAG* CONNECTION_HANDLE;
     typedef struct ENDPOINT_INSTANCE_TAG* ENDPOINT_HANDLE;
 
--- a/azure_uamqp_c/link.h	Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/link.h	Sat Oct 21 20:12:19 2017 +0000
@@ -5,6 +5,8 @@
 #define LINK_H
 
 #include <stddef.h>
+#include "azure_c_shared_utility/tickcounter.h"
+#include "azure_c_shared_utility/macro_utils.h"
 #include "azure_uamqp_c/session.h"
 #include "azure_uamqp_c/amqpvalue.h"
 #include "azure_uamqp_c/amqp_definitions.h"
@@ -17,28 +19,29 @@
 
 typedef struct LINK_INSTANCE_TAG* LINK_HANDLE;
 
-typedef enum LINK_STATE_TAG
-{
-    LINK_STATE_DETACHED,
-    LINK_STATE_HALF_ATTACHED_ATTACH_SENT,
-    LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED,
-    LINK_STATE_ATTACHED,
+#define LINK_STATE_VALUES \
+    LINK_STATE_DETACHED, \
+    LINK_STATE_HALF_ATTACHED_ATTACH_SENT, \
+    LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED, \
+    LINK_STATE_ATTACHED, \
     LINK_STATE_ERROR
-} LINK_STATE;
+
+DEFINE_ENUM(LINK_STATE, LINK_STATE_VALUES)
 
-typedef enum LINK_TRANSFER_RESULT_TAG
-{
-    LINK_TRANSFER_OK,
-    LINK_TRANSFER_ERROR,
+#define LINK_TRANSFER_RESULT_VALUES \
+    LINK_TRANSFER_OK, \
+    LINK_TRANSFER_ERROR, \
     LINK_TRANSFER_BUSY
-} LINK_TRANSFER_RESULT;
+
+DEFINE_ENUM(LINK_TRANSFER_RESULT, LINK_TRANSFER_RESULT_VALUES)
 
-typedef enum LINK_DELIVERY_SETTLE_REASON_TAG
-{
-    LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED,
-    LINK_DELIVERY_SETTLE_REASON_SETTLED,
-    LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED
-} LINK_DELIVERY_SETTLE_REASON;
+#define LINK_DELIVERY_SETTLE_REASON_VALUES \
+    LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED, \
+    LINK_DELIVERY_SETTLE_REASON_SETTLED, \
+    LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED, \
+    LINK_DELIVERY_SETTLE_REASON_TIMEOUT
+
+DEFINE_ENUM(LINK_DELIVERY_SETTLE_REASON, LINK_DELIVERY_SETTLE_REASON_VALUES)
 
 typedef void(*ON_DELIVERY_SETTLED)(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state);
 typedef AMQP_VALUE(*ON_TRANSFER_RECEIVED)(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes);
@@ -63,7 +66,8 @@
 MOCKABLE_FUNCTION(, int, link_send_disposition, LINK_HANDLE, link, delivery_number, message_number, AMQP_VALUE, delivery_state);
 MOCKABLE_FUNCTION(, int, link_attach, LINK_HANDLE, link, ON_TRANSFER_RECEIVED, on_transfer_received, ON_LINK_STATE_CHANGED, on_link_state_changed, ON_LINK_FLOW_ON, on_link_flow_on, void*, callback_context);
 MOCKABLE_FUNCTION(, int, link_detach, LINK_HANDLE, link, bool, close);
-MOCKABLE_FUNCTION(, LINK_TRANSFER_RESULT, link_transfer, LINK_HANDLE, handle, message_format, message_format, PAYLOAD*, payloads, size_t, payload_count, ON_DELIVERY_SETTLED, on_delivery_settled, void*, callback_context);
+MOCKABLE_FUNCTION(, LINK_TRANSFER_RESULT, link_transfer_async, LINK_HANDLE, handle, message_format, message_format, PAYLOAD*, payloads, size_t, payload_count, ON_DELIVERY_SETTLED, on_delivery_settled, void*, callback_context, tickcounter_ms_t, timeout);
+MOCKABLE_FUNCTION(, void, link_dowork, LINK_HANDLE, link);
 
 #ifdef __cplusplus
 }
--- a/azure_uamqp_c/message_receiver.h	Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/message_receiver.h	Sat Oct 21 20:12:19 2017 +0000
@@ -7,6 +7,8 @@
 #include "azure_uamqp_c/link.h"
 #include "azure_uamqp_c/message.h"
 #include "azure_uamqp_c/amqp_definitions.h"
+#include "azure_c_shared_utility/umock_c_prod.h"
+#include "azure_c_shared_utility/macro_utils.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -14,16 +16,14 @@
 #include <stdbool.h>
 #endif /* __cplusplus */
 
-#include "azure_c_shared_utility/umock_c_prod.h"
+#define MESSAGE_RECEIVER_STATE_VALUES \
+    MESSAGE_RECEIVER_STATE_IDLE, \
+    MESSAGE_RECEIVER_STATE_OPENING, \
+    MESSAGE_RECEIVER_STATE_OPEN, \
+    MESSAGE_RECEIVER_STATE_CLOSING, \
+    MESSAGE_RECEIVER_STATE_ERROR
 
-    typedef enum MESSAGE_RECEIVER_STATE_TAG
-    {
-        MESSAGE_RECEIVER_STATE_IDLE,
-        MESSAGE_RECEIVER_STATE_OPENING,
-        MESSAGE_RECEIVER_STATE_OPEN,
-        MESSAGE_RECEIVER_STATE_CLOSING,
-        MESSAGE_RECEIVER_STATE_ERROR
-    } MESSAGE_RECEIVER_STATE;
+DEFINE_ENUM(MESSAGE_RECEIVER_STATE, MESSAGE_RECEIVER_STATE_VALUES)
 
     typedef struct MESSAGE_RECEIVER_INSTANCE_TAG* MESSAGE_RECEIVER_HANDLE;
     typedef AMQP_VALUE (*ON_MESSAGE_RECEIVED)(const void* context, MESSAGE_HANDLE message);
--- a/azure_uamqp_c/message_sender.h	Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/message_sender.h	Sat Oct 21 20:12:19 2017 +0000
@@ -8,27 +8,31 @@
 #include "azure_uamqp_c/link.h"
 #include "azure_uamqp_c/message.h"
 #include "azure_c_shared_utility/xlogging.h"
+#include "azure_uamqp_c/async_operation.h"
+#include "azure_c_shared_utility/tickcounter.h"
+#include "azure_c_shared_utility/umock_c_prod.h"
+#include "azure_c_shared_utility/macro_utils.h"
 
 #ifdef __cplusplus
 extern "C" {
 #endif /* __cplusplus */
 
-#include "azure_c_shared_utility/umock_c_prod.h"
+#define MESSAGE_SEND_RESULT_VALUES \
+    MESSAGE_SEND_OK, \
+    MESSAGE_SEND_ERROR, \
+    MESSAGE_SEND_TIMEOUT, \
+    MESSAGE_SEND_CANCELLED
 
-    typedef enum MESSAGE_SEND_RESULT_TAG
-    {
-        MESSAGE_SEND_OK,
-        MESSAGE_SEND_ERROR
-    } MESSAGE_SEND_RESULT;
+DEFINE_ENUM(MESSAGE_SEND_RESULT, MESSAGE_SEND_RESULT_VALUES)
 
-    typedef enum MESSAGE_SENDER_STATE_TAG
-    {
-        MESSAGE_SENDER_STATE_IDLE,
-        MESSAGE_SENDER_STATE_OPENING,
-        MESSAGE_SENDER_STATE_OPEN,
-        MESSAGE_SENDER_STATE_CLOSING,
-        MESSAGE_SENDER_STATE_ERROR
-    } MESSAGE_SENDER_STATE;
+#define MESSAGE_SENDER_STATE_VALUES \
+    MESSAGE_SENDER_STATE_IDLE, \
+    MESSAGE_SENDER_STATE_OPENING, \
+    MESSAGE_SENDER_STATE_OPEN, \
+    MESSAGE_SENDER_STATE_CLOSING, \
+    MESSAGE_SENDER_STATE_ERROR
+
+DEFINE_ENUM(MESSAGE_SENDER_STATE, MESSAGE_SENDER_STATE_VALUES)
 
     typedef struct MESSAGE_SENDER_INSTANCE_TAG* MESSAGE_SENDER_HANDLE;
     typedef void(*ON_MESSAGE_SEND_COMPLETE)(void* context, MESSAGE_SEND_RESULT send_result);
@@ -38,7 +42,7 @@
     MOCKABLE_FUNCTION(, void, messagesender_destroy, MESSAGE_SENDER_HANDLE, message_sender);
     MOCKABLE_FUNCTION(, int, messagesender_open, MESSAGE_SENDER_HANDLE, message_sender);
     MOCKABLE_FUNCTION(, int, messagesender_close, MESSAGE_SENDER_HANDLE, message_sender);
-    MOCKABLE_FUNCTION(, int, messagesender_send, MESSAGE_SENDER_HANDLE, message_sender, MESSAGE_HANDLE, message, ON_MESSAGE_SEND_COMPLETE, on_message_send_complete, void*, callback_context);
+    MOCKABLE_FUNCTION(, ASYNC_OPERATION_HANDLE, messagesender_send_async, MESSAGE_SENDER_HANDLE, message_sender, MESSAGE_HANDLE, message, ON_MESSAGE_SEND_COMPLETE, on_message_send_complete, void*, callback_context, tickcounter_ms_t, timeout);
     MOCKABLE_FUNCTION(, void, messagesender_set_trace, MESSAGE_SENDER_HANDLE, message_sender, bool, traceOn);
 
 #ifdef __cplusplus
--- a/azure_uamqp_c/session.h	Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/session.h	Sat Oct 21 20:12:19 2017 +0000
@@ -8,34 +8,34 @@
 #include "azure_uamqp_c/amqpvalue.h"
 #include "azure_uamqp_c/amqp_frame_codec.h"
 #include "azure_uamqp_c/connection.h"
+#include "azure_c_shared_utility/umock_c_prod.h"
+#include "azure_c_shared_utility/macro_utils.h"
 
 #ifdef __cplusplus
 extern "C" {
 #endif /* __cplusplus */
 
-#include "azure_c_shared_utility/umock_c_prod.h"
-
-    typedef struct SESSION_INSTANCE_TAG* SESSION_HANDLE;
-    typedef struct LINK_ENDPOINT_INSTANCE_TAG* LINK_ENDPOINT_HANDLE;
+typedef struct SESSION_INSTANCE_TAG* SESSION_HANDLE;
+typedef struct LINK_ENDPOINT_INSTANCE_TAG* LINK_ENDPOINT_HANDLE;
 
-    typedef enum SESION_STATE_TAG
-    {
-        SESSION_STATE_UNMAPPED,
-        SESSION_STATE_BEGIN_SENT,
-        SESSION_STATE_BEGIN_RCVD,
-        SESSION_STATE_MAPPED,
-        SESSION_STATE_END_SENT,
-        SESSION_STATE_END_RCVD,
-        SESSION_STATE_DISCARDING,
-        SESSION_STATE_ERROR
-    } SESSION_STATE;
+#define SESSION_STATE_VALUES \
+    SESSION_STATE_UNMAPPED, \
+    SESSION_STATE_BEGIN_SENT, \
+    SESSION_STATE_BEGIN_RCVD, \
+    SESSION_STATE_MAPPED, \
+    SESSION_STATE_END_SENT, \
+    SESSION_STATE_END_RCVD, \
+    SESSION_STATE_DISCARDING, \
+    SESSION_STATE_ERROR
 
-    typedef enum SESSION_SEND_TRANSFER_RESULT_TAG
-    {
-        SESSION_SEND_TRANSFER_OK,
-        SESSION_SEND_TRANSFER_ERROR,
-        SESSION_SEND_TRANSFER_BUSY
-    } SESSION_SEND_TRANSFER_RESULT;
+DEFINE_ENUM(SESSION_STATE, SESSION_STATE_VALUES)
+
+#define SESSION_SEND_TRANSFER_RESULT_VALUES \
+    SESSION_SEND_TRANSFER_OK, \
+    SESSION_SEND_TRANSFER_ERROR, \
+    SESSION_SEND_TRANSFER_BUSY
+
+DEFINE_ENUM(SESSION_SEND_TRANSFER_RESULT, SESSION_SEND_TRANSFER_RESULT_VALUES)
 
     typedef void(*LINK_ENDPOINT_FRAME_RECEIVED_CALLBACK)(void* context, AMQP_VALUE performative, uint32_t frame_payload_size, const unsigned char* payload_bytes);
     typedef void(*ON_SESSION_STATE_CHANGED)(void* context, SESSION_STATE new_session_state, SESSION_STATE previous_session_state);
--- a/link.c	Mon Sep 25 13:38:40 2017 -0700
+++ b/link.c	Sat Oct 21 20:12:19 2017 +0000
@@ -9,6 +9,7 @@
 #include "azure_c_shared_utility/optimize_size.h"
 #include "azure_c_shared_utility/xlogging.h"
 #include "azure_c_shared_utility/singlylinkedlist.h"
+#include "azure_c_shared_utility/tickcounter.h"
 #include "azure_uamqp_c/link.h"
 #include "azure_uamqp_c/session.h"
 #include "azure_uamqp_c/amqpvalue.h"
@@ -23,6 +24,8 @@
     ON_DELIVERY_SETTLED on_delivery_settled;
     void* callback_context;
     void* link;
+    tickcounter_ms_t start_tick;
+    tickcounter_ms_t timeout;
 } DELIVERY_INSTANCE;
 
 typedef struct LINK_INSTANCE_TAG
@@ -55,6 +58,7 @@
     unsigned char* received_payload;
     uint32_t received_payload_size;
     delivery_number received_delivery_id;
+    TICK_COUNTER_HANDLE tick_counter;
 } LINK_INSTANCE;
 
 static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state)
@@ -639,37 +643,49 @@
         result->received_payload_size = 0;
         result->received_delivery_id = 0;
 
-        result->pending_deliveries = singlylinkedlist_create();
-        if (result->pending_deliveries == NULL)
+        result->tick_counter = tickcounter_create();
+        if (result->tick_counter == NULL)
         {
             free(result);
             result = NULL;
         }
         else
         {
-            size_t name_length = strlen(name);
-            result->name = (char*)malloc(name_length + 1);
-            if (result->name == NULL)
+            result->pending_deliveries = singlylinkedlist_create();
+            if (result->pending_deliveries == NULL)
             {
-                singlylinkedlist_destroy(result->pending_deliveries);
+                tickcounter_destroy(result->tick_counter);
                 free(result);
                 result = NULL;
             }
             else
             {
-                result->on_link_state_changed = NULL;
-                result->callback_context = NULL;
-                set_link_state(result, LINK_STATE_DETACHED);
-
-                (void)memcpy(result->name, name, name_length + 1);
-                result->link_endpoint = session_create_link_endpoint(session, name);
-                if (result->link_endpoint == NULL)
+                size_t name_length = strlen(name);
+                result->name = (char*)malloc(name_length + 1);
+                if (result->name == NULL)
                 {
+                    tickcounter_destroy(result->tick_counter);
                     singlylinkedlist_destroy(result->pending_deliveries);
-                    free(result->name);
                     free(result);
                     result = NULL;
                 }
+                else
+                {
+                    result->on_link_state_changed = NULL;
+                    result->callback_context = NULL;
+                    set_link_state(result, LINK_STATE_DETACHED);
+
+                    (void)memcpy(result->name, name, name_length + 1);
+                    result->link_endpoint = session_create_link_endpoint(session, name);
+                    if (result->link_endpoint == NULL)
+                    {
+                        tickcounter_destroy(result->tick_counter);
+                        singlylinkedlist_destroy(result->pending_deliveries);
+                        free(result->name);
+                        free(result);
+                        result = NULL;
+                    }
+                }
             }
         }
     }
@@ -709,28 +725,39 @@
             result->role = role_sender;
         }
 
-        result->pending_deliveries = singlylinkedlist_create();
-        if (result->pending_deliveries == NULL)
+        result->tick_counter = tickcounter_create();
+        if (result->tick_counter == NULL)
         {
             free(result);
             result = NULL;
         }
         else
         {
-            size_t name_length = strlen(name);
-            result->name = (char*)malloc(name_length + 1);
-            if (result->name == NULL)
+            result->pending_deliveries = singlylinkedlist_create();
+            if (result->pending_deliveries == NULL)
             {
-                singlylinkedlist_destroy(result->pending_deliveries);
+                tickcounter_destroy(result->tick_counter);
                 free(result);
                 result = NULL;
             }
             else
             {
-                (void)memcpy(result->name, name, name_length + 1);
-                result->on_link_state_changed = NULL;
-                result->callback_context = NULL;
-                result->link_endpoint = link_endpoint;
+                size_t name_length = strlen(name);
+                result->name = (char*)malloc(name_length + 1);
+                if (result->name == NULL)
+                {
+                    tickcounter_destroy(result->tick_counter);
+                    singlylinkedlist_destroy(result->pending_deliveries);
+                    free(result);
+                    result = NULL;
+                }
+                else
+                {
+                    (void)memcpy(result->name, name, name_length + 1);
+                    result->on_link_state_changed = NULL;
+                    result->callback_context = NULL;
+                    result->link_endpoint = link_endpoint;
+                }
             }
         }
     }
@@ -743,6 +770,7 @@
     if (link != NULL)
     {
         remove_all_pending_deliveries((LINK_INSTANCE*)link, false);
+        tickcounter_destroy(link->tick_counter);
 
         link->on_link_state_changed = NULL;
         (void)link_detach(link, true);
@@ -1063,7 +1091,7 @@
     return result;
 }
 
-LINK_TRANSFER_RESULT link_transfer(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context)
+LINK_TRANSFER_RESULT link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, tickcounter_ms_t timeout)
 {
     LINK_TRANSFER_RESULT result;
 
@@ -1133,41 +1161,50 @@
                         }
                         else
                         {
-                            LIST_ITEM_HANDLE delivery_instance_list_item;
-                            pending_delivery->on_delivery_settled = on_delivery_settled;
-                            pending_delivery->callback_context = callback_context;
-                            pending_delivery->link = link;
-                            delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery);
-
-                            if (delivery_instance_list_item == NULL)
+                            if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0)
                             {
                                 free(pending_delivery);
                                 result = LINK_TRANSFER_ERROR;
                             }
                             else
                             {
-                                /* here we should feed data to the transfer frame */
-                                switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
+                                LIST_ITEM_HANDLE delivery_instance_list_item;
+                                pending_delivery->timeout = timeout;
+                                pending_delivery->on_delivery_settled = on_delivery_settled;
+                                pending_delivery->callback_context = callback_context;
+                                pending_delivery->link = link;
+                                delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery);
+
+                                if (delivery_instance_list_item == NULL)
                                 {
-                                default:
-                                case SESSION_SEND_TRANSFER_ERROR:
-                                    singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
                                     free(pending_delivery);
                                     result = LINK_TRANSFER_ERROR;
-                                    break;
+                                }
+                                else
+                                {
+                                    /* here we should feed data to the transfer frame */
+                                    switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
+                                    {
+                                    default:
+                                    case SESSION_SEND_TRANSFER_ERROR:
+                                        singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
+                                        free(pending_delivery);
+                                        result = LINK_TRANSFER_ERROR;
+                                        break;
 
-                                case SESSION_SEND_TRANSFER_BUSY:
-                                    /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
-                                    singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
-                                    free(pending_delivery);
-                                    result = LINK_TRANSFER_BUSY;
-                                    break;
+                                    case SESSION_SEND_TRANSFER_BUSY:
+                                        /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
+                                        singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
+                                        free(pending_delivery);
+                                        result = LINK_TRANSFER_BUSY;
+                                        break;
 
-                                case SESSION_SEND_TRANSFER_OK:
-                                    link->delivery_count = delivery_count;
-                                    link->link_credit--;
-                                    result = LINK_TRANSFER_OK;
-                                    break;
+                                    case SESSION_SEND_TRANSFER_OK:
+                                        link->delivery_count = delivery_count;
+                                        link->link_credit--;
+                                        result = LINK_TRANSFER_OK;
+                                        break;
+                                    }
                                 }
                             }
                         }
@@ -1236,3 +1273,40 @@
     }
     return result;
 }
+
+void link_dowork(LINK_HANDLE link)
+{
+    if (link != NULL)
+    {
+        tickcounter_ms_t current_tick;
+
+        if (tickcounter_get_current_ms(link->tick_counter, &current_tick) != 0)
+        {
+            /* error */
+        }
+        else
+        {
+            // go through all and find timed out deliveries
+            LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries);
+            while (item != NULL)
+            {
+                LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item);
+                DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item);
+
+                if ((delivery_instance->timeout != 0) &&
+                    (current_tick - delivery_instance->start_tick >= delivery_instance->timeout))
+                {
+                    if (delivery_instance->on_delivery_settled != NULL)
+                    {
+                        delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_TIMEOUT, NULL);
+                    }
+
+                    (void)singlylinkedlist_remove(link->pending_deliveries, item);
+                    free(delivery_instance);
+                }
+
+                item = next_item;
+            }
+        }
+    }
+}
--- a/message_receiver.c	Mon Sep 25 13:38:40 2017 -0700
+++ b/message_receiver.c	Sat Oct 21 20:12:19 2017 +0000
@@ -22,27 +22,28 @@
     bool decode_error;
 } MESSAGE_RECEIVER_INSTANCE;
 
-static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver_instance, MESSAGE_RECEIVER_STATE new_state)
+static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver, MESSAGE_RECEIVER_STATE new_state)
 {
-    MESSAGE_RECEIVER_STATE previous_state = message_receiver_instance->message_receiver_state;
-    message_receiver_instance->message_receiver_state = new_state;
-    if (message_receiver_instance->on_message_receiver_state_changed != NULL)
+    MESSAGE_RECEIVER_STATE previous_state = message_receiver->message_receiver_state;
+    message_receiver->message_receiver_state = new_state;
+    if (message_receiver->on_message_receiver_state_changed != NULL)
     {
-        message_receiver_instance->on_message_receiver_state_changed(message_receiver_instance->on_message_receiver_state_changed_context, new_state, previous_state);
+        message_receiver->on_message_receiver_state_changed(message_receiver->on_message_receiver_state_changed_context, new_state, previous_state);
     }
 }
 
 static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value)
 {
-    MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
-    MESSAGE_HANDLE decoded_message = message_receiver_instance->decoded_message;
+    MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
+    MESSAGE_HANDLE decoded_message = message_receiver->decoded_message;
     AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value);
 
     if (is_application_properties_type_by_descriptor(descriptor))
     {
         if (message_set_application_properties(decoded_message, decoded_value) != 0)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error setting application properties on received message");
+            message_receiver->decode_error = true;
         }
     }
     else if (is_properties_type_by_descriptor(descriptor))
@@ -50,13 +51,15 @@
         PROPERTIES_HANDLE properties;
         if (amqpvalue_get_properties(decoded_value, &properties) != 0)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message properties");
+            message_receiver->decode_error = true;
         }
         else
         {
             if (message_set_properties(decoded_message, properties) != 0)
             {
-                message_receiver_instance->decode_error = true;
+                LogError("Error setting message properties on received message");
+                message_receiver->decode_error = true;
             }
 
             properties_destroy(properties);
@@ -65,19 +68,35 @@
     else if (is_delivery_annotations_type_by_descriptor(descriptor))
     {
         annotations delivery_annotations = amqpvalue_get_inplace_described_value(decoded_value);
-        if ((delivery_annotations == NULL) ||
-            (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0))
+        if (delivery_annotations == NULL)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting delivery annotations");
+            message_receiver->decode_error = true;
+        }
+        else
+        {
+            if (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0)
+            {
+                LogError("Error setting delivery annotations on received message");
+                message_receiver->decode_error = true;
+            }
         }
     }
     else if (is_message_annotations_type_by_descriptor(descriptor))
     {
         annotations message_annotations = amqpvalue_get_inplace_described_value(decoded_value);
-        if ((message_annotations == NULL) ||
-            (message_set_message_annotations(decoded_message, message_annotations) != 0))
+        if (message_annotations == NULL)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message annotations");
+            message_receiver->decode_error = true;
+        }
+        else
+        {
+            if (message_set_message_annotations(decoded_message, message_annotations) != 0)
+            {
+                LogError("Error setting message annotations on received message");
+                message_receiver->decode_error = true;
+            }
         }
     }
     else if (is_header_type_by_descriptor(descriptor))
@@ -85,13 +104,15 @@
         HEADER_HANDLE header;
         if (amqpvalue_get_header(decoded_value, &header) != 0)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message header");
+            message_receiver->decode_error = true;
         }
         else
         {
             if (message_set_header(decoded_message, header) != 0)
             {
-                message_receiver_instance->decode_error = true;
+                LogError("Error setting message header on received message");
+                message_receiver->decode_error = true;
             }
 
             header_destroy(header);
@@ -100,57 +121,97 @@
     else if (is_footer_type_by_descriptor(descriptor))
     {
         annotations footer = amqpvalue_get_inplace_described_value(decoded_value);
-        if ((footer == NULL) ||
-            (message_set_footer(decoded_message, footer) != 0))
+        if (footer == NULL)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message footer");
+            message_receiver->decode_error = true;
+        }
+        else
+        {
+            if (message_set_footer(decoded_message, footer) != 0)
+            {
+                LogError("Error setting message footer on received message");
+                message_receiver->decode_error = true;
+            }
         }
     }
     else if (is_amqp_value_type_by_descriptor(descriptor))
     {
         MESSAGE_BODY_TYPE body_type;
-        message_get_body_type(decoded_message, &body_type);
-        if (body_type != MESSAGE_BODY_TYPE_NONE)
+        if (message_get_body_type(decoded_message, &body_type) != 0)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message body type");
+            message_receiver->decode_error = true;
         }
         else
         {
-            AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value);
-            if ((body_amqp_value == NULL) ||
-                (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0))
+            if (body_type != MESSAGE_BODY_TYPE_NONE)
+            {
+                LogError("Body already set on received message");
+                message_receiver->decode_error = true;
+            }
+            else
             {
-                message_receiver_instance->decode_error = true;
+                AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value);
+                if (body_amqp_value == NULL)
+                {
+                    LogError("Error getting body AMQP value");
+                    message_receiver->decode_error = true;
+                }
+                else
+                {
+                    if (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0)
+                    {
+                        LogError("Error setting body AMQP value on received message");
+                        message_receiver->decode_error = true;
+                    }
+                }
             }
         }
     }
     else if (is_data_type_by_descriptor(descriptor))
     {
         MESSAGE_BODY_TYPE body_type;
-        message_get_body_type(decoded_message, &body_type);
-        if ((body_type != MESSAGE_BODY_TYPE_NONE) &&
-            (body_type != MESSAGE_BODY_TYPE_DATA))
+        if (message_get_body_type(decoded_message, &body_type) != 0)
         {
-            message_receiver_instance->decode_error = true;
+            LogError("Error getting message body type");
+            message_receiver->decode_error = true;
         }
         else
         {
-            AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value);
-            data data_value;
-
-            if ((body_data_value == NULL) ||
-                (amqpvalue_get_data(body_data_value, &data_value) != 0))
+            if ((body_type != MESSAGE_BODY_TYPE_NONE) &&
+                (body_type != MESSAGE_BODY_TYPE_DATA))
             {
-                message_receiver_instance->decode_error = true;
+                LogError("Message body type already set to something different than AMQP DATA");
+                message_receiver->decode_error = true;
             }
             else
             {
-                BINARY_DATA binary_data;
-                binary_data.bytes = (const unsigned char*)data_value.bytes;
-                binary_data.length = data_value.length;
-                if (message_add_body_amqp_data(decoded_message, binary_data) != 0)
+                AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value);
+                if (body_data_value == NULL)
+                {
+                    LogError("Error getting body DATA value");
+                    message_receiver->decode_error = true;
+                }
+                else
                 {
-                    message_receiver_instance->decode_error = true;
+                    data data_value;
+                    if (amqpvalue_get_data(body_data_value, &data_value) != 0)
+                    {
+                        LogError("Error getting body DATA AMQP value");
+                        message_receiver->decode_error = true;
+                    }
+                    else
+                    {
+                        BINARY_DATA binary_data;
+                        binary_data.bytes = (const unsigned char*)data_value.bytes;
+                        binary_data.length = data_value.length;
+                        if (message_add_body_amqp_data(decoded_message, binary_data) != 0)
+                        {
+                            LogError("Error adding body DATA to received message");
+                            message_receiver->decode_error = true;
+                        }
+                    }
                 }
             }
         }
@@ -160,40 +221,44 @@
 static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes)
 {
     AMQP_VALUE result = NULL;
+    MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
 
-    MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
     (void)transfer;
-    if (message_receiver_instance->on_message_received != NULL)
+    if (message_receiver->on_message_received != NULL)
     {
         MESSAGE_HANDLE message = message_create();
         if (message == NULL)
         {
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+            LogError("Cannot create message");
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
         }
         else
         {
-            AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver_instance);
+            AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
             if (amqpvalue_decoder == NULL)
             {
-                set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+                LogError("Cannot create AMQP value decoder");
+                set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
             }
             else
             {
-                message_receiver_instance->decoded_message = message;
-                message_receiver_instance->decode_error = false;
+                message_receiver->decoded_message = message;
+                message_receiver->decode_error = false;
                 if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0)
                 {
-                    set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+                    LogError("Cannot decode bytes");
+                    set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
                 }
                 else
                 {
-                    if (message_receiver_instance->decode_error)
+                    if (message_receiver->decode_error)
                     {
-                        set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+                        LogError("Error decoding message");
+                        set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
                     }
                     else
                     {
-                        result = message_receiver_instance->on_message_received(message_receiver_instance->callback_context, message);
+                        result = message_receiver->on_message_received(message_receiver->callback_context, message);
                     }
                 }
 
@@ -209,7 +274,7 @@
 
 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
 {
-    MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
+    MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
     (void)previous_link_state;
 
     switch (new_link_state)
@@ -218,28 +283,28 @@
         break;
 
     case LINK_STATE_ATTACHED:
-        if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING)
+        if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING)
         {
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPEN);
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPEN);
         }
         break;
     case LINK_STATE_DETACHED:
-        if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) ||
-            (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING))
+        if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) ||
+            (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING))
         {
             /* User initiated transition, we should be good */
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_IDLE);
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_IDLE);
         }
-        else if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE)
+        else if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE)
         {
             /* Any other transition must be an error */
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
         }
         break;
     case LINK_STATE_ERROR:
-        if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR)
+        if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR)
         {
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
         }
         break;
     }
@@ -247,21 +312,29 @@
 
 MESSAGE_RECEIVER_HANDLE messagereceiver_create(LINK_HANDLE link, ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed, void* context)
 {
-    MESSAGE_RECEIVER_INSTANCE* result = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE));
-    if (result != NULL)
+    MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE));
+    if (message_receiver == NULL)
     {
-        result->link = link;
-        result->on_message_receiver_state_changed = on_message_receiver_state_changed;
-        result->on_message_receiver_state_changed_context = context;
-        result->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE;
+        LogError("Error creating message receiver");
+    }
+    else
+    {
+        message_receiver->link = link;
+        message_receiver->on_message_receiver_state_changed = on_message_receiver_state_changed;
+        message_receiver->on_message_receiver_state_changed_context = context;
+        message_receiver->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE;
     }
 
-    return result;
+    return message_receiver;
 }
 
 void messagereceiver_destroy(MESSAGE_RECEIVER_HANDLE message_receiver)
 {
-    if (message_receiver != NULL)
+    if (message_receiver == NULL)
+    {
+        LogError("NULL message_receiver");
+    }
+    else
     {
         (void)messagereceiver_close(message_receiver);
         free(message_receiver);
@@ -274,24 +347,24 @@
 
     if (message_receiver == NULL)
     {
+        LogError("NULL message_receiver");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-
-        if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE)
+        if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE)
         {
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPENING);
-            if (link_attach(message_receiver_instance->link, on_transfer_received, on_link_state_changed, NULL, message_receiver_instance) != 0)
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPENING);
+            if (link_attach(message_receiver->link, on_transfer_received, on_link_state_changed, NULL, message_receiver) != 0)
             {
+                LogError("Link attach failed");
                 result = __FAILURE__;
-                set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+                set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
             }
             else
             {
-                message_receiver_instance->on_message_received = on_message_received;
-                message_receiver_instance->callback_context = callback_context;
+                message_receiver->on_message_received = on_message_received;
+                message_receiver->callback_context = callback_context;
 
                 result = 0;
             }
@@ -311,21 +384,21 @@
 
     if (message_receiver == NULL)
     {
+        LogError("NULL message_receiver");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-
-        if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) ||
-            (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN))
+        if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) ||
+            (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN))
         {
-            set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_CLOSING);
+            set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_CLOSING);
 
-            if (link_detach(message_receiver_instance->link, true) != 0)
+            if (link_detach(message_receiver->link, true) != 0)
             {
+                LogError("link detach failed");
                 result = __FAILURE__;
-                set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+                set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
             }
             else
             {
@@ -347,13 +420,14 @@
 
     if (message_receiver == NULL)
     {
+        LogError("NULL message_receiver");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-        if (link_get_name(message_receiver_instance->link, link_name) != 0)
+        if (link_get_name(message_receiver->link, link_name) != 0)
         {
+            LogError("Getting link name failed");
             result = __FAILURE__;
         }
         else
@@ -371,13 +445,14 @@
 
     if (message_receiver == NULL)
     {
+        LogError("NULL message_receiver");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-        if (link_get_received_message_id(message_receiver_instance->link, message_id) != 0)
+        if (link_get_received_message_id(message_receiver->link, message_id) != 0)
         {
+            LogError("Failed getting received message Id");
             result = __FAILURE__;
         }
         else
@@ -395,32 +470,36 @@
 
     if (message_receiver == NULL)
     {
+        LogError("NULL message_receiver");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-        if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_OPEN)
+        if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_OPEN)
         {
+            LogError("Message received not open");
             result = __FAILURE__;
         }
         else
         {
             const char* my_name;
-            if (link_get_name(message_receiver_instance->link, &my_name) != 0)
+            if (link_get_name(message_receiver->link, &my_name) != 0)
             {
+                LogError("Failed getting link name");
                 result = __FAILURE__;
             }
             else
             {
                 if (strcmp(link_name, my_name) != 0)
                 {
+                    LogError("Link name does not match");
                     result = __FAILURE__;
                 }
                 else
                 {
-                    if (link_send_disposition(message_receiver_instance->link, message_number, delivery_state) != 0)
+                    if (link_send_disposition(message_receiver->link, message_number, delivery_state) != 0)
                     {
+                        LogError("Seding disposition failed");
                         result = __FAILURE__;
                     }
                     else
@@ -437,7 +516,13 @@
 
 void messagereceiver_set_trace(MESSAGE_RECEIVER_HANDLE message_receiver, bool trace_on)
 {
-    /* No tracing is yet implemented for message receiver */
-    (void)message_receiver;
-    (void)trace_on;
+    if (message_receiver == NULL)
+    {
+        LogError("NULL message_receiver");
+    }
+    else
+    {
+        /* No tracing is yet implemented for message receiver */
+        (void)trace_on;
+    }
 }
--- a/message_sender.c	Mon Sep 25 13:38:40 2017 -0700
+++ b/message_sender.c	Sat Oct 21 20:12:19 2017 +0000
@@ -7,8 +7,10 @@
 #include "azure_c_shared_utility/optimize_size.h"
 #include "azure_c_shared_utility/gballoc.h"
 #include "azure_c_shared_utility/xlogging.h"
+#include "azure_c_shared_utility/tickcounter.h"
 #include "azure_uamqp_c/message_sender.h"
 #include "azure_uamqp_c/amqpvalue_to_string.h"
+#include "azure_uamqp_c/async_operation.h"
 
 typedef enum MESSAGE_SEND_STATE_TAG
 {
@@ -30,62 +32,66 @@
     void* context;
     MESSAGE_SENDER_HANDLE message_sender;
     MESSAGE_SEND_STATE message_send_state;
+    tickcounter_ms_t timeout;
 } MESSAGE_WITH_CALLBACK;
 
+DEFINE_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK);
+
 typedef struct MESSAGE_SENDER_INSTANCE_TAG
 {
     LINK_HANDLE link;
     size_t message_count;
-    MESSAGE_WITH_CALLBACK** messages;
+    ASYNC_OPERATION_HANDLE* messages;
     MESSAGE_SENDER_STATE message_sender_state;
     ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
     void* on_message_sender_state_changed_context;
     unsigned int is_trace_on : 1;
 } MESSAGE_SENDER_INSTANCE;
 
-static void remove_pending_message_by_index(MESSAGE_SENDER_INSTANCE* message_sender_instance, size_t index)
+static void remove_pending_message_by_index(MESSAGE_SENDER_HANDLE message_sender, size_t index)
 {
-    MESSAGE_WITH_CALLBACK** new_messages;
+    ASYNC_OPERATION_HANDLE* new_messages;
+    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[index]);
 
-    if (message_sender_instance->messages[index]->message != NULL)
+    if (message_with_callback->message != NULL)
     {
-        message_destroy(message_sender_instance->messages[index]->message);
-        message_sender_instance->messages[index]->message = NULL;
+        message_destroy(message_with_callback->message);
+        message_with_callback->message = NULL;
     }
 
-    free(message_sender_instance->messages[index]);
+    async_operation_destroy(message_sender->messages[index]);
 
-    if (message_sender_instance->message_count - index > 1)
+    if (message_sender->message_count - index > 1)
     {
-        (void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1));
+        (void)memmove(&message_sender->messages[index], &message_sender->messages[index + 1], sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count - index - 1));
     }
 
-    message_sender_instance->message_count--;
+    message_sender->message_count--;
 
-    if (message_sender_instance->message_count > 0)
+    if (message_sender->message_count > 0)
     {
-        new_messages = (MESSAGE_WITH_CALLBACK**)realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count));
+        new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count));
         if (new_messages != NULL)
         {
-            message_sender_instance->messages = new_messages;
+            message_sender->messages = new_messages;
         }
     }
     else
     {
-        free(message_sender_instance->messages);
-        message_sender_instance->messages = NULL;
+        free(message_sender->messages);
+        message_sender->messages = NULL;
     }
 }
 
-static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback)
+static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send)
 {
     size_t i;
 
-    for (i = 0; i < message_sender_instance->message_count; i++)
+    for (i = 0; i < message_sender->message_count; i++)
     {
-        if (message_sender_instance->messages[i] == message_with_callback)
+        if (message_sender->messages[i] == pending_send)
         {
-            remove_pending_message_by_index(message_sender_instance, i);
+            remove_pending_message_by_index(message_sender, i);
             break;
         }
     }
@@ -93,8 +99,9 @@
 
 static void on_delivery_settled(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state)
 {
-    MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
-    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
+    ASYNC_OPERATION_HANDLE pending_send = (ASYNC_OPERATION_HANDLE)context;
+    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
+    MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
     (void)delivery_no;
 
     if (message_with_callback->on_message_send_complete != NULL)
@@ -128,6 +135,9 @@
         case LINK_DELIVERY_SETTLE_REASON_SETTLED:
             message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
             break;
+        case LINK_DELIVERY_SETTLE_REASON_TIMEOUT:
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_TIMEOUT);
+            break;
         case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED:
         default:
             message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
@@ -135,7 +145,7 @@
         }
     }
 
-    remove_pending_message(message_sender_instance, message_with_callback);
+    remove_pending_message(message_sender, pending_send);
 }
 
 static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
@@ -146,18 +156,18 @@
     return 0;
 }
 
-static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender_instance, const char* name, AMQP_VALUE value)
+static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender, const char* name, AMQP_VALUE value)
 {
 #ifdef NO_LOGGING
-    UNUSED(message_sender_instance);
+    UNUSED(message_sender);
     UNUSED(name);
     UNUSED(value);
 #else
-    if (xlogging_get_log_function() != NULL && message_sender_instance->is_trace_on == 1)
+    if (xlogging_get_log_function() != NULL && message_sender->is_trace_on == 1)
     {
         char* value_as_string = NULL;
-        LOG(AZ_LOG_TRACE, 0, "%s", name);
-        LOG(AZ_LOG_TRACE, 0, "%s", (value_as_string = amqpvalue_to_string(value)));
+        LOG(AZ_LOG_TRACE, 0, "%s", P_OR_NULL(name));
+        LOG(AZ_LOG_TRACE, 0, "%s", P_OR_NULL((value_as_string = amqpvalue_to_string(value))));
         if (value_as_string != NULL)
         {
             free(value_as_string);
@@ -166,7 +176,7 @@
 #endif
 }
 
-static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback, MESSAGE_HANDLE message)
+static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send, MESSAGE_HANDLE message)
 {
     SEND_ONE_MESSAGE_RESULT result;
 
@@ -193,51 +203,112 @@
         AMQP_VALUE body_amqp_value = NULL;
         size_t body_data_count = 0;
         AMQP_VALUE msg_annotations = NULL;
+        bool is_error = false;
 
-        message_get_header(message, &header);
-        header_amqp_value = amqpvalue_create_header(header);
-        if (header != NULL)
+        // message header
+        if ((message_get_header(message, &header) == 0) &&
+            (header != NULL))
         {
-            amqpvalue_get_encoded_size(header_amqp_value, &encoded_size);
-            total_encoded_size += encoded_size;
+            header_amqp_value = amqpvalue_create_header(header);
+            if (header_amqp_value == NULL)
+            {
+                LogError("Cannot create header AMQP value");
+                is_error = true;
+            }
+            else
+            {
+                if (amqpvalue_get_encoded_size(header_amqp_value, &encoded_size) != 0)
+                {
+                    LogError("Cannot obtain header encoded size");
+                    is_error = true;
+                }
+                else
+                {
+                    total_encoded_size += encoded_size;
+                }
+            }
+
         }
 
         // message annotations
-        if ((message_get_message_annotations(message, &msg_annotations) == 0) &&
+        if ((!is_error) &&
+            (message_get_message_annotations(message, &msg_annotations) == 0) &&
             (msg_annotations != NULL))
         {
-            amqpvalue_get_encoded_size(msg_annotations, &encoded_size);
-            total_encoded_size += encoded_size;
+            if (amqpvalue_get_encoded_size(msg_annotations, &encoded_size) != 0)
+            {
+                LogError("Cannot obtain message annotations encoded size");
+                is_error = true;
+            }
+            else
+            {
+                total_encoded_size += encoded_size;
+            }
         }
 
         // properties
-        message_get_properties(message, &properties);
-        properties_amqp_value = amqpvalue_create_properties(properties);
-        if (properties != NULL)
+        if ((!is_error) &&
+            (message_get_properties(message, &properties) == 0) &&
+            (properties != NULL))
         {
-            amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size);
-            total_encoded_size += encoded_size;
+            properties_amqp_value = amqpvalue_create_properties(properties);
+            if (properties_amqp_value == NULL)
+            {
+                LogError("Cannot create message properties AMQP value");
+                is_error = true;
+            }
+            else
+            {
+                if (amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size) != 0)
+                {
+                    LogError("Cannot obtain message properties encoded size");
+                    is_error = true;
+                }
+                else
+                {
+                    total_encoded_size += encoded_size;
+                }
+            }
         }
 
         // application properties
-        message_get_application_properties(message, &application_properties);
-        if (application_properties != NULL)
+        if ((!is_error) &&
+            (message_get_application_properties(message, &application_properties) == 0) &&
+            (application_properties != NULL))
         {
             application_properties_value = amqpvalue_create_application_properties(application_properties);
-            amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
-            total_encoded_size += encoded_size;
+            if (application_properties_value == NULL)
+            {
+                LogError("Cannot create application properties AMQP value");
+                is_error = true;
+            }
+            else
+            {
+                if (amqpvalue_get_encoded_size(application_properties_value, &encoded_size) != 0)
+                {
+                    LogError("Cannot obtain application properties encoded size");
+                    is_error = true;
+                }
+                else
+                {
+                    total_encoded_size += encoded_size;
+                }
+            }
+        }
+
+        if (is_error)
+        {
+            result = SEND_ONE_MESSAGE_ERROR;
         }
         else
         {
-            application_properties_value = NULL;
-        }
-
-        result = SEND_ONE_MESSAGE_OK;
+            result = SEND_ONE_MESSAGE_OK;
 
-        // body - amqp data
-        switch (message_body_type)
-        {
+            // body - amqp data
+            switch (message_body_type)
+            {
             default:
+                LogError("Unknown body type");
                 result = SEND_ONE_MESSAGE_ERROR;
                 break;
 
@@ -246,19 +317,28 @@
                 AMQP_VALUE message_body_amqp_value;
                 if (message_get_body_amqp_value_in_place(message, &message_body_amqp_value) != 0)
                 {
+                    LogError("Cannot obtain AMQP value from body");
                     result = SEND_ONE_MESSAGE_ERROR;
                 }
                 else
                 {
                     body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
-                    if ((body_amqp_value == NULL) ||
-                        (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0))
+                    if (body_amqp_value == NULL)
                     {
+                        LogError("Cannot create body AMQP value");
                         result = SEND_ONE_MESSAGE_ERROR;
                     }
                     else
                     {
-                        total_encoded_size += encoded_size;
+                        if (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0)
+                        {
+                            LogError("Cannot get body AMQP value encoded size");
+                            result = SEND_ONE_MESSAGE_ERROR;
+                        }
+                        else
+                        {
+                            total_encoded_size += encoded_size;
+                        }
                     }
                 }
 
@@ -272,6 +352,7 @@
 
                 if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
                 {
+                    LogError("Cannot get body AMQP data count");
                     result = SEND_ONE_MESSAGE_ERROR;
                 }
                 else
@@ -280,6 +361,7 @@
                     {
                         if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
                         {
+                            LogError("Cannot get body AMQP data %u", (unsigned int)i);
                             result = SEND_ONE_MESSAGE_ERROR;
                         }
                         else
@@ -291,12 +373,14 @@
                             body_amqp_data = amqpvalue_create_data(binary_value);
                             if (body_amqp_data == NULL)
                             {
+                                LogError("Cannot create body AMQP data");
                                 result = SEND_ONE_MESSAGE_ERROR;
                             }
                             else
                             {
                                 if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
                                 {
+                                    LogError("Cannot get body AMQP data encoded size");
                                     result = SEND_ONE_MESSAGE_ERROR;
                                 }
                                 else
@@ -311,139 +395,151 @@
                 }
                 break;
             }
-        }
-
-        if (result == 0)
-        {
-            void* data_bytes = malloc(total_encoded_size);
-            PAYLOAD payload;
-            payload.bytes = (const unsigned char*)data_bytes;
-            payload.length = 0;
-            result = SEND_ONE_MESSAGE_OK;
-
-            if (header != NULL)
-            {
-                if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
-                {
-                    result = SEND_ONE_MESSAGE_ERROR;
-                }
-
-                log_message_chunk(message_sender_instance, "Header:", header_amqp_value);
-            }
-
-            if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
-            {
-                if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
-                {
-                    result = SEND_ONE_MESSAGE_ERROR;
-                }
-
-                log_message_chunk(message_sender_instance, "Message Annotations:", msg_annotations);
             }
 
-            if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
+            if (result == 0)
             {
-                if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
+                void* data_bytes = malloc(total_encoded_size);
+                PAYLOAD payload;
+                payload.bytes = (const unsigned char*)data_bytes;
+                payload.length = 0;
+                result = SEND_ONE_MESSAGE_OK;
+
+                if (header != NULL)
                 {
-                    result = SEND_ONE_MESSAGE_ERROR;
+                    if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
+                    {
+                        LogError("Cannot encode header value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+
+                    log_message_chunk(message_sender, "Header:", header_amqp_value);
                 }
 
-                log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value);
-            }
+                if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
+                {
+                    if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
+                    {
+                        LogError("Cannot encode message annotations value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
 
-            if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
-            {
-                if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
-                {
-                    result = SEND_ONE_MESSAGE_ERROR;
+                    log_message_chunk(message_sender, "Message Annotations:", msg_annotations);
                 }
 
-                log_message_chunk(message_sender_instance, "Application properties:", application_properties_value);
-            }
-
-            if (result == SEND_ONE_MESSAGE_OK)
-            {
-                switch (message_body_type)
+                if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
                 {
-                default:
-                    result = SEND_ONE_MESSAGE_ERROR;
-                    break;
-
-                case MESSAGE_BODY_TYPE_VALUE:
-                {
-                    if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
+                    if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
                     {
+                        LogError("Cannot encode message properties value");
                         result = SEND_ONE_MESSAGE_ERROR;
                     }
 
-                    log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value);
-                    break;
+                    log_message_chunk(message_sender, "Properties:", properties_amqp_value);
                 }
-                case MESSAGE_BODY_TYPE_DATA:
+
+                if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
                 {
-                    BINARY_DATA binary_data;
-                    size_t i;
+                    if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
+                    {
+                        LogError("Cannot encode application properties value");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                    }
+
+                    log_message_chunk(message_sender, "Application properties:", application_properties_value);
+                }
 
-                    for (i = 0; i < body_data_count; i++)
+                if (result == SEND_ONE_MESSAGE_OK)
+                {
+                    switch (message_body_type)
                     {
-                        if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
+                    default:
+                        LogError("Unknown message type");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                        break;
+
+                    case MESSAGE_BODY_TYPE_VALUE:
+                    {
+                        if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
                         {
+                            LogError("Cannot encode body AMQP value");
                             result = SEND_ONE_MESSAGE_ERROR;
                         }
-                        else
+
+                        log_message_chunk(message_sender, "Body - amqp value:", body_amqp_value);
+                        break;
+                    }
+                    case MESSAGE_BODY_TYPE_DATA:
+                    {
+                        BINARY_DATA binary_data;
+                        size_t i;
+
+                        for (i = 0; i < body_data_count; i++)
                         {
-                            AMQP_VALUE body_amqp_data;
-                            amqp_binary binary_value;
-                            binary_value.bytes = binary_data.bytes;
-                            binary_value.length = (uint32_t)binary_data.length;
-                            body_amqp_data = amqpvalue_create_data(binary_value);
-                            if (body_amqp_data == NULL)
+                            if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
                             {
+                                LogError("Cannot get AMQP data %u", (unsigned int)i);
                                 result = SEND_ONE_MESSAGE_ERROR;
                             }
                             else
                             {
-                                if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
+                                AMQP_VALUE body_amqp_data;
+                                amqp_binary binary_value;
+                                binary_value.bytes = binary_data.bytes;
+                                binary_value.length = (uint32_t)binary_data.length;
+                                body_amqp_data = amqpvalue_create_data(binary_value);
+                                if (body_amqp_data == NULL)
                                 {
+                                    LogError("Cannot create body AMQP data %u", (unsigned int)i);
                                     result = SEND_ONE_MESSAGE_ERROR;
-                                    break;
                                 }
+                                else
+                                {
+                                    if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
+                                    {
+                                        LogError("Cannot encode body AMQP data %u", (unsigned int)i);
+                                        result = SEND_ONE_MESSAGE_ERROR;
+                                        break;
+                                    }
 
-                                amqpvalue_destroy(body_amqp_data);
+                                    amqpvalue_destroy(body_amqp_data);
+                                }
                             }
                         }
+                        break;
                     }
-                    break;
+                    }
                 }
-                }
-            }
 
-            if (result == SEND_ONE_MESSAGE_OK)
-            {
-                message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
-                switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback))
+                if (result == SEND_ONE_MESSAGE_OK)
                 {
-                default:
-                case LINK_TRANSFER_ERROR:
-                    result = SEND_ONE_MESSAGE_ERROR;
-                    break;
+                    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
+                    message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
+                    switch (link_transfer_async(message_sender->link, message_format, &payload, 1, on_delivery_settled, pending_send, message_with_callback->timeout))
+                    {
+                    default:
+                    case LINK_TRANSFER_ERROR:
+                        LogError("Error in link transfer");
+                        result = SEND_ONE_MESSAGE_ERROR;
+                        break;
 
-                case LINK_TRANSFER_BUSY:
-                    message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
-                    result = SEND_ONE_MESSAGE_BUSY;
-                    break;
+                    case LINK_TRANSFER_BUSY:
+                        message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+                        result = SEND_ONE_MESSAGE_BUSY;
+                        break;
 
-                case LINK_TRANSFER_OK:
-                    result = SEND_ONE_MESSAGE_OK;
-                    break;
+                    case LINK_TRANSFER_OK:
+                        result = SEND_ONE_MESSAGE_OK;
+                        break;
+                    }
                 }
-            }
 
-            free(data_bytes);
+                free(data_bytes);
 
-            if (body_amqp_value != NULL)
-            {
-                amqpvalue_destroy(body_amqp_value);
+                if (body_amqp_value != NULL)
+                {
+                    amqpvalue_destroy(body_amqp_value);
+                }
             }
         }
 
@@ -466,14 +562,17 @@
         {
             amqpvalue_destroy(application_properties);
         }
+
         if (application_properties_value != NULL)
         {
             amqpvalue_destroy(application_properties_value);
         }
+
         if (properties_amqp_value != NULL)
         {
             amqpvalue_destroy(properties_amqp_value);
         }
+
         if (properties != NULL)
         {
             properties_destroy(properties);
@@ -483,33 +582,37 @@
     return result;
 }
 
-static void send_all_pending_messages(MESSAGE_SENDER_INSTANCE* message_sender_instance)
+static void send_all_pending_messages(MESSAGE_SENDER_HANDLE message_sender)
 {
     size_t i;
 
-    for (i = 0; i < message_sender_instance->message_count; i++)
+    for (i = 0; i < message_sender->message_count; i++)
     {
-        if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
+        MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
+        if (message_with_callback->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
         {
-            switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message))
+            switch (send_one_message(message_sender, message_sender->messages[i], message_with_callback->message))
             {
             default:
+                LogError("Invalid send one message result");
+                break;
+
             case SEND_ONE_MESSAGE_ERROR:
             {
-                ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete;
-                void* context = message_sender_instance->messages[i]->context;
-                remove_pending_message_by_index(message_sender_instance, i);
+                ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_with_callback->on_message_send_complete;
+                void* context = message_with_callback->context;
+                remove_pending_message_by_index(message_sender, i);
 
                 if (on_message_send_complete != NULL)
                 {
                     on_message_send_complete(context, MESSAGE_SEND_ERROR);
                 }
 
-                i = message_sender_instance->message_count;
+                i = message_sender->message_count;
                 break;
             }
             case SEND_ONE_MESSAGE_BUSY:
-                i = message_sender_instance->message_count + 1;
+                i = message_sender->message_count + 1;
                 break;
 
             case SEND_ONE_MESSAGE_OK:
@@ -521,46 +624,47 @@
     }
 }
 
-static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_SENDER_STATE new_state)
+static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender, MESSAGE_SENDER_STATE new_state)
 {
-    MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state;
-    message_sender_instance->message_sender_state = new_state;
-    if (message_sender_instance->on_message_sender_state_changed != NULL)
+    MESSAGE_SENDER_STATE previous_state = message_sender->message_sender_state;
+    message_sender->message_sender_state = new_state;
+    if (message_sender->on_message_sender_state_changed != NULL)
     {
-        message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state);
+        message_sender->on_message_sender_state_changed(message_sender->on_message_sender_state_changed_context, new_state, previous_state);
     }
 }
 
-static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender_instance)
+static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender)
 {
     size_t i;
 
-    for (i = 0; i < message_sender_instance->message_count; i++)
+    for (i = 0; i < message_sender->message_count; i++)
     {
-        if (message_sender_instance->messages[i]->on_message_send_complete != NULL)
+        MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
+        if (message_with_callback->on_message_send_complete != NULL)
         {
-            message_sender_instance->messages[i]->on_message_send_complete(message_sender_instance->messages[i]->context, MESSAGE_SEND_ERROR);
+            message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
         }
 
-        if (message_sender_instance->messages[i]->message != NULL)
+        if (message_with_callback->message != NULL)
         {
-            message_destroy(message_sender_instance->messages[i]->message);
+            message_destroy(message_with_callback->message);
         }
-        free(message_sender_instance->messages[i]);
+        async_operation_destroy(message_sender->messages[i]);
     }
 
-    if (message_sender_instance->messages != NULL)
+    if (message_sender->messages != NULL)
     {
-        message_sender_instance->message_count = 0;
+        message_sender->message_count = 0;
 
-        free(message_sender_instance->messages);
-        message_sender_instance->messages = NULL;
+        free(message_sender->messages);
+        message_sender->messages = NULL;
     }
 }
 
 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
 {
-    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
+    MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)context;
     (void)previous_link_state;
 
     switch (new_link_state)
@@ -569,30 +673,30 @@
         break;
 
     case LINK_STATE_ATTACHED:
-        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
+        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
         {
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN);
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPEN);
         }
         break;
     case LINK_STATE_DETACHED:
-        if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
-            (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
+        if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
+            (message_sender->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
         {
             /* User initiated transition, we should be good */
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
-            indicate_all_messages_as_error(message_sender_instance);
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_IDLE);
+            indicate_all_messages_as_error(message_sender);
         }
-        else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
+        else if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
         {
             /* Any other transition must be an error */
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
         }
         break;
     case LINK_STATE_ERROR:
-        if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
+        if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
         {
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
-            indicate_all_messages_as_error(message_sender_instance);
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+            indicate_all_messages_as_error(message_sender);
         }
         break;
     }
@@ -600,36 +704,41 @@
 
 static void on_link_flow_on(void* context)
 {
-    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
-    send_all_pending_messages(message_sender_instance);
+    MESSAGE_SENDER_HANDLE message_sender = (MESSAGE_SENDER_INSTANCE*)context;
+    send_all_pending_messages(message_sender);
 }
 
 MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context)
 {
-    MESSAGE_SENDER_INSTANCE* result = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE));
-    if (result != NULL)
+    MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE));
+    if (message_sender == NULL)
+    {
+        LogError("Failed allocating message sender");
+    }
+    else
     {
-        result->messages = NULL;
-        result->message_count = 0;
-        result->link = link;
-        result->on_message_sender_state_changed = on_message_sender_state_changed;
-        result->on_message_sender_state_changed_context = context;
-        result->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
-        result->is_trace_on = 0;
+        message_sender->messages = NULL;
+        message_sender->message_count = 0;
+        message_sender->link = link;
+        message_sender->on_message_sender_state_changed = on_message_sender_state_changed;
+        message_sender->on_message_sender_state_changed_context = context;
+        message_sender->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
+        message_sender->is_trace_on = 0;
     }
 
-    return result;
+    return message_sender;
 }
 
 void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
 {
-    if (message_sender != NULL)
+    if (message_sender == NULL)
     {
-        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-
-        messagesender_close(message_sender_instance);
-
-        indicate_all_messages_as_error(message_sender_instance);
+        LogError("NULL message_sender");
+    }
+    else
+    {
+        (void)messagesender_close(message_sender);
+        indicate_all_messages_as_error(message_sender);
 
         free(message_sender);
     }
@@ -641,19 +750,19 @@
 
     if (message_sender == NULL)
     {
+        LogError("NULL message_sender");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-
-        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
+        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
         {
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING);
-            if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0)
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPENING);
+            if (link_attach(message_sender->link, NULL, on_link_state_changed, on_link_flow_on, message_sender) != 0)
             {
+                LogError("attach link failed");
                 result = __FAILURE__;
-                set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+                set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
             }
             else
             {
@@ -675,20 +784,20 @@
 
     if (message_sender == NULL)
     {
+        LogError("NULL message_sender");
         result = __FAILURE__;
     }
     else
     {
-        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-
-        if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
-            (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
+        if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
+            (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
         {
-            set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING);
-            if (link_detach(message_sender_instance->link, true) != 0)
+            set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_CLOSING);
+            if (link_detach(message_sender->link, true) != 0)
             {
+                LogError("Detaching link failed");
                 result = __FAILURE__;
-                set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+                set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
             }
             else
             {
@@ -704,49 +813,63 @@
     return result;
 }
 
-int messagesender_send(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context)
+static void messagesender_send_cancel_handler(ASYNC_OPERATION_HANDLE send_operation)
 {
-    int result;
+    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, send_operation);
+    if (message_with_callback->on_message_send_complete != NULL)
+    {
+        message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_CANCELLED);
+    }
+
+    remove_pending_message(message_with_callback->message_sender, send_operation);
+}
+
+ASYNC_OPERATION_HANDLE messagesender_send_async(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context, tickcounter_ms_t timeout)
+{
+    ASYNC_OPERATION_HANDLE result;
 
     if ((message_sender == NULL) ||
         (message == NULL))
     {
-        result = __FAILURE__;
+        LogError("Bad parameters: message_sender = %p, message = %p");
+        result = NULL;
     }
     else
     {
-        MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
+        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
         {
-            result = __FAILURE__;
+            LogError("Message sender in ERROR state");
+            result = NULL;
         }
         else
         {
-            MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)malloc(sizeof(MESSAGE_WITH_CALLBACK));
-            if (message_with_callback == NULL)
+            result = CREATE_ASYNC_OPERATION(MESSAGE_WITH_CALLBACK, messagesender_send_cancel_handler);
+            if (result == NULL)
             {
-                result = __FAILURE__;
+                LogError("Failed allocating context for send");
             }
             else
             {
-                MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1));
+                MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, result);
+                ASYNC_OPERATION_HANDLE* new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count + 1));
                 if (new_messages == NULL)
                 {
-                    free(message_with_callback);
-                    result = __FAILURE__;
+                    LogError("Failed allocating memory for pending sends");
+                    async_operation_destroy(result);
+                    result = NULL;
                 }
                 else
                 {
-                    result = 0;
-
-                    message_sender_instance->messages = new_messages;
-                    if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
+                    message_with_callback->timeout = timeout;
+                    message_sender->messages = new_messages;
+                    if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
                     {
                         message_with_callback->message = message_clone(message);
                         if (message_with_callback->message == NULL)
                         {
-                            free(message_with_callback);
-                            result = __FAILURE__;
+                            LogError("Cannot clone message for placing it in the pending sends list");
+                            async_operation_destroy(result);
+                            result = NULL;
                         }
 
                         message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
@@ -757,42 +880,42 @@
                         message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
                     }
 
-                    if (result == 0)
+                    if (result != NULL)
                     {
                         message_with_callback->on_message_send_complete = on_message_send_complete;
                         message_with_callback->context = callback_context;
-                        message_with_callback->message_sender = message_sender_instance;
+                        message_with_callback->message_sender = message_sender;
 
-                        message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback;
-                        message_sender_instance->message_count++;
+                        message_sender->messages[message_sender->message_count] = result;
+                        message_sender->message_count++;
 
-                        if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
+                        if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
                         {
-                            switch (send_one_message(message_sender_instance, message_with_callback, message))
+                            switch (send_one_message(message_sender, result, message))
                             {
                             default:
                             case SEND_ONE_MESSAGE_ERROR:
-                            
-                                remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
-                                result = __FAILURE__;
+                                LogError("Error sending message");
+                                remove_pending_message_by_index(message_sender, message_sender->message_count - 1);
+                                async_operation_destroy(result);
+                                result = NULL;
                                 break;
 
                             case SEND_ONE_MESSAGE_BUSY:
                                 message_with_callback->message = message_clone(message);
                                 if (message_with_callback->message == NULL)
                                 {
-                                    free(message_with_callback);
-                                    result = __FAILURE__;
+                                    LogError("Error cloning message for placing it in the pending sends list");
+                                    async_operation_destroy(result);
+                                    result = NULL;
                                 }
                                 else
                                 {
                                     message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
-                                    result = 0;
                                 }
                                 break;
 
                             case SEND_ONE_MESSAGE_OK:
-                                result = 0;
                                 break;
                             }
                         }
@@ -801,14 +924,18 @@
             }
         }
     }
+
     return result;
 }
 
 void messagesender_set_trace(MESSAGE_SENDER_HANDLE message_sender, bool traceOn)
 {
-    MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-    if (message_sender_instance != NULL)
+    if (message_sender == NULL)
     {
-        message_sender_instance->is_trace_on = traceOn ? 1 : 0;
+        LogError("NULL message_sender");
+    }
+    else
+    {
+        message_sender->is_trace_on = traceOn ? 1 : 0;
     }
 }
--- a/messaging.c	Mon Sep 25 13:38:40 2017 -0700
+++ b/messaging.c	Sat Oct 21 20:12:19 2017 +0000
@@ -2,7 +2,9 @@
 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
 
 #include <stdlib.h>
+#include <stdint.h>
 #include <stdbool.h>
+#include "azure_c_shared_utility/xlogging.h"
 #include "azure_uamqp_c/amqpvalue.h"
 #include "azure_uamqp_c/amqp_definitions.h"
 
@@ -10,8 +12,10 @@
 {
     AMQP_VALUE result;
     SOURCE_HANDLE source = source_create();
+
     if (source == NULL)
     {
+        LogError("NULL source");
         result = NULL;
     }
     else
@@ -19,17 +23,27 @@
         AMQP_VALUE address_value = amqpvalue_create_string(address);
         if (address_value == NULL)
         {
+            LogError("Cannot create address AMQP string");
             result = NULL;
         }
         else
         {
             if (source_set_address(source, address_value) != 0)
             {
+                LogError("Cannot set address on source");
                 result = NULL;
             }
             else
             {
                 result = amqpvalue_create_source(source);
+                if (result == NULL)
+                {
+                    LogError("Cannot create source");
+                }
+                else
+                {
+                    /* all ok */
+                }
             }
 
             amqpvalue_destroy(address_value);
@@ -45,8 +59,10 @@
 {
     AMQP_VALUE result;
     TARGET_HANDLE target = target_create();
+
     if (target == NULL)
     {
+        LogError("NULL target");
         result = NULL;
     }
     else
@@ -54,17 +70,27 @@
         AMQP_VALUE address_value = amqpvalue_create_string(address);
         if (address_value == NULL)
         {
+            LogError("Cannot create address AMQP string");
             result = NULL;
         }
         else
         {
             if (target_set_address(target, address_value) != 0)
             {
+                LogError("Cannot set address on target");
                 result = NULL;
             }
             else
             {
                 result = amqpvalue_create_target(target);
+                if (result == NULL)
+                {
+                    LogError("Cannot create target");
+                }
+                else
+                {
+                    /* all ok */
+                }
             }
 
             amqpvalue_destroy(address_value);
@@ -82,11 +108,21 @@
     RECEIVED_HANDLE received = received_create(section_number, section_offset);
     if (received == NULL)
     {
+        LogError("Cannot create RECEIVED delivery state handle");
         result = NULL;
     }
     else
     {
         result = amqpvalue_create_received(received);
+        if (result == NULL)
+        {
+            LogError("Cannot create RECEIVED delivery state AMQP value");
+        }
+        else
+        {
+            /* all ok */
+        }
+
         received_destroy(received);
     }
 
@@ -99,11 +135,21 @@
     ACCEPTED_HANDLE accepted = accepted_create();
     if (accepted == NULL)
     {
+        LogError("Cannot create ACCEPTED delivery state handle");
         result = NULL;
     }
     else
     {
         result = amqpvalue_create_accepted(accepted);
+        if (result == NULL)
+        {
+            LogError("Cannot create ACCEPTED delivery state AMQP value");
+        }
+        else
+        {
+            /* all ok */
+        }
+
         accepted_destroy(accepted);
     }
 
@@ -116,6 +162,7 @@
     REJECTED_HANDLE rejected = rejected_create();
     if (rejected == NULL)
     {
+        LogError("Cannot create REJECTED delivery state handle");
         result = NULL;
     }
     else
@@ -128,6 +175,7 @@
             error_handle = error_create(error_condition);
             if (error_handle == NULL)
             {
+                LogError("Cannot create error AMQP value for REJECTED state");
                 error_constructing = true;
             }
             else
@@ -135,12 +183,14 @@
                 if ((error_description != NULL) &&
                     (error_set_description(error_handle, error_description) != 0))
                 {
+                    LogError("Cannot set error description on error AMQP value for REJECTED state");
                     error_constructing = true;
                 }
                 else
                 {
                     if (rejected_set_error(rejected, error_handle) != 0)
                     {
+                        LogError("Cannot set error on REJECTED state handle");
                         error_constructing = true;
                     }
                 }
@@ -156,6 +206,14 @@
         else
         {
             result = amqpvalue_create_rejected(rejected);
+            if (result == NULL)
+            {
+                LogError("Cannot create REJECTED delivery state AMQP value");
+            }
+            else
+            {
+                /* all ok */
+            }
         }
 
         rejected_destroy(rejected);
@@ -170,11 +228,21 @@
     RELEASED_HANDLE released = released_create();
     if (released == NULL)
     {
+        LogError("Cannot create RELEASED delivery state handle");
         result = NULL;
     }
     else
     {
         result = amqpvalue_create_released(released);
+        if (result == NULL)
+        {
+            LogError("Cannot create RELEASED delivery state AMQP value");
+        }
+        else
+        {
+            /* all ok */
+        }
+
         released_destroy(released);
     }
 
@@ -187,19 +255,37 @@
     MODIFIED_HANDLE modified = modified_create();
     if (modified == NULL)
     {
+        LogError("Cannot create MODIFIED delivery state handle");
         result = NULL;
     }
     else
     {
-        if ((modified_set_delivery_failed(modified, delivery_failed) != 0) ||
-            (modified_set_undeliverable_here(modified, undeliverable_here) != 0) ||
-            ((message_annotations != NULL) && (modified_set_message_annotations(modified, message_annotations) != 0)))
+        if (modified_set_delivery_failed(modified, delivery_failed) != 0)
+        {
+            LogError("Cannot set delivery failed on MODIFIED delivery state");
+            result = NULL;
+        }
+        else if (modified_set_undeliverable_here(modified, undeliverable_here) != 0)
         {
+            LogError("Cannot set undeliverable here on MODIFIED delivery state");
+            result = NULL;
+        }
+        else if ((message_annotations != NULL) && (modified_set_message_annotations(modified, message_annotations) != 0))
+        {
+            LogError("Cannot set message annotations on MODIFIED delivery state");
             result = NULL;
         }
         else
         {
             result = amqpvalue_create_modified(modified);
+            if (result == NULL)
+            {
+                LogError("Cannot create MODIFIED delivery state AMQP value");
+            }
+            else
+            {
+                /* all ok */
+            }
         }
 
         modified_destroy(modified);