Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
10:75c5e0d8537d
Parent:
9:8abcd13782f4
Child:
11:62d7b956e76e
--- a/iothubtransportamqp.c	Tue Feb 16 14:24:48 2016 -0800
+++ b/iothubtransportamqp.c	Fri Mar 11 16:59:34 2016 -0800
@@ -5,11 +5,15 @@
 #ifdef _CRTDBG_MAP_ALLOC
 #include <crtdbg.h>
 #endif
+#include <stdint.h>
+#include <time.h>
+#include <limits.h>
 #include "gballoc.h"
 
 #include "cbs.h"
 #include "link.h"
 #include "message.h"
+#include "amqpvalue.h"
 #include "message_receiver.h"
 #include "message_sender.h"
 #include "messaging.h"
@@ -35,7 +39,6 @@
 #define RESULT_FAILURE 1
 #define RESULT_TIMEOUT 2
 
-#define TIME_MAX SIZE_MAX
 #define RFC1035_MAX_FQDN_LENGTH 255
 #define DEFAULT_IOTHUB_AMQP_PORT 5671
 #define DEFAULT_SAS_TOKEN_LIFETIME_MS 3600000
@@ -43,7 +46,7 @@
 #define DEFAULT_MESSAGE_SEND_TIMEOUT_MS 300000
 #define CBS_AUDIENCE "servicebus.windows.net:sastoken"
 #define DEFAULT_CONTAINER_ID "default_container_id"
-#define DEFAULT_INCOMING_WINDOW_SIZE SIZE_MAX
+#define DEFAULT_INCOMING_WINDOW_SIZE UINT_MAX
 #define DEFAULT_OUTGOING_WINDOW_SIZE 100
 #define MESSAGE_RECEIVER_LINK_NAME "receiver-link"
 #define MESSAGE_RECEIVER_TARGET_ADDRESS "ingress-rx"
@@ -63,71 +66,73 @@
 
 typedef struct AMQP_TRANSPORT_STATE_TAG
 {
-	// FQDN of the IoT Hub.
-	STRING_HANDLE iotHubHostFqdn;
-	// AMQP port of the IoT Hub.
-	int iotHubPort;
-	// Certificates to be used by the TLS I/O.
-	char* trusted_certificates;
-	// Key associated to the device to be used.
-	STRING_HANDLE deviceKey;
-	// Address to which the transport will connect to and send events.
-	STRING_HANDLE targetAddress;
-	// Address to which the transport will connect to and receive messages from.
-	STRING_HANDLE messageReceiveAddress;
-	// A component of the SAS token. Currently this must be an empty string.
-	STRING_HANDLE sasTokenKeyName;
-	// Internal parameter that identifies the current logical device within the service.
-	STRING_HANDLE devicesPath;
-	// How long a SAS token created by the transport is valid, in milliseconds.
-	size_t sas_token_lifetime;
-	// Maximum period of time for the transport to wait before refreshing the SAS token it created previously, in milliseconds.
-	size_t sas_token_refresh_time;
-	// Maximum time the transport waits for  uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds.
-	size_t cbs_request_timeout;
-	// Maximum time the transport waits for an event to be sent before marking it a failure, in milliseconds.
-	size_t message_send_timeout;
-	// Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds.
-	size_t connection_timeout;
-	// Saved reference to the IoTHub LL Client.
-	IOTHUB_CLIENT_LL_HANDLE iothub_client_handle;
-	
-	// TSL I/O transport.
-	XIO_HANDLE tls_io;
-	// Pointer to the function that creates the TLS I/O (internal use only).
-	TLS_IO_TRANSPORT_PROVIDER tls_io_transport_provider;
-	// AMQP SASL I/O transport created on top of the TLS I/O layer.
-	XIO_HANDLE sasl_io;
-	// AMQP SASL I/O mechanism to be used.
-	SASL_MECHANISM_HANDLE sasl_mechanism;
-	// AMQP connection.
-	CONNECTION_HANDLE connection;
-	// Current AMQP connection state;
-	AMQP_MANAGEMENT_STATE connection_state;
-	// Last time the AMQP connection establishment was initiated.
-	size_t connection_establish_time;
-	// AMQP session.
-	SESSION_HANDLE session;
-	// AMQP link used by the event sender.
-	LINK_HANDLE sender_link;
-	// uAMQP event sender.
-	MESSAGE_SENDER_HANDLE message_sender;
-	// Internal flag that controls if messages should be received or not.
-	bool receive_messages;
-	// AMQP link used by the message receiver.
-	LINK_HANDLE receiver_link;
-	// uAMQP message receiver.
-	MESSAGE_RECEIVER_HANDLE message_receiver;
-	// List with events still pending to be sent. It is provided by the upper layer.
-	PDLIST_ENTRY waitingToSend;
-	// Internal list with the items currently being processed/sent through uAMQP.
-	DLIST_ENTRY inProgress;
-	// Connection instance with the Azure IoT CBS.
-	CBS_HANDLE cbs;
-	// Current state of the CBS connection.
-	CBS_STATE cbs_state;
-	// Time when the current SAS token was created, in seconds since epoch.
-	size_t current_sas_token_create_time;
+    // FQDN of the IoT Hub.
+    STRING_HANDLE iotHubHostFqdn;
+    // AMQP port of the IoT Hub.
+    int iotHubPort;
+    // Certificates to be used by the TLS I/O.
+    char* trusted_certificates;
+    // Key associated to the device to be used.
+    STRING_HANDLE deviceKey;
+    // Address to which the transport will connect to and send events.
+    STRING_HANDLE targetAddress;
+    // Address to which the transport will connect to and receive messages from.
+    STRING_HANDLE messageReceiveAddress;
+    // A component of the SAS token. Currently this must be an empty string.
+    STRING_HANDLE sasTokenKeyName;
+    // Internal parameter that identifies the current logical device within the service.
+    STRING_HANDLE devicesPath;
+    // How long a SAS token created by the transport is valid, in milliseconds.
+    size_t sas_token_lifetime;
+    // Maximum period of time for the transport to wait before refreshing the SAS token it created previously, in milliseconds.
+    size_t sas_token_refresh_time;
+    // Maximum time the transport waits for  uAMQP cbs_put_token() to complete before marking it a failure, in milliseconds.
+    size_t cbs_request_timeout;
+    // Maximum time the transport waits for an event to be sent before marking it a failure, in milliseconds.
+    size_t message_send_timeout;
+    // Maximum time for the connection establishment/retry logic should wait for a connection to succeed, in milliseconds.
+    size_t connection_timeout;
+    // Saved reference to the IoTHub LL Client.
+    IOTHUB_CLIENT_LL_HANDLE iothub_client_handle;
+    
+    // TSL I/O transport.
+    XIO_HANDLE tls_io;
+    // Pointer to the function that creates the TLS I/O (internal use only).
+    TLS_IO_TRANSPORT_PROVIDER tls_io_transport_provider;
+    // AMQP SASL I/O transport created on top of the TLS I/O layer.
+    XIO_HANDLE sasl_io;
+    // AMQP SASL I/O mechanism to be used.
+    SASL_MECHANISM_HANDLE sasl_mechanism;
+    // AMQP connection.
+    CONNECTION_HANDLE connection;
+    // Current AMQP connection state;
+    AMQP_MANAGEMENT_STATE connection_state;
+    // Last time the AMQP connection establishment was initiated.
+    size_t connection_establish_time;
+    // AMQP session.
+    SESSION_HANDLE session;
+    // AMQP link used by the event sender.
+    LINK_HANDLE sender_link;
+    // uAMQP event sender.
+    MESSAGE_SENDER_HANDLE message_sender;
+    // Internal flag that controls if messages should be received or not.
+    bool receive_messages;
+    // AMQP link used by the message receiver.
+    LINK_HANDLE receiver_link;
+    // uAMQP message receiver.
+    MESSAGE_RECEIVER_HANDLE message_receiver;
+    // List with events still pending to be sent. It is provided by the upper layer.
+    PDLIST_ENTRY waitingToSend;
+    // Internal list with the items currently being processed/sent through uAMQP.
+    DLIST_ENTRY inProgress;
+    // Connection instance with the Azure IoT CBS.
+    CBS_HANDLE cbs;
+    // Current state of the CBS connection.
+    CBS_STATE cbs_state;
+    // Time when the current SAS token was created, in seconds since epoch.
+    size_t current_sas_token_create_time;
+    // Mark if device is registered in transport (only one device per transport).
+    bool isRegistered;
 } AMQP_TRANSPORT_INSTANCE;
 
 // This structure is used to track an event being sent and the time it was sent.
@@ -145,29 +150,33 @@
 static STRING_HANDLE concat3Params(const char* prefix, const char* infix, const char* suffix)
 {
     STRING_HANDLE result = NULL;
-    char* concat = NULL;
+    char* concat;
     size_t totalLength = strlen(prefix) + strlen(infix) + strlen(suffix) + 1; // One extra for \0.
 
-    if ((concat = (char*)malloc(sizeof(char) * totalLength)) != NULL)
+    if ((concat = (char*)malloc(totalLength)) != NULL)
     {
-        if (snprintf(concat, totalLength, "%s%s%s", prefix, infix, suffix) > 0)
-        {
-            result = STRING_construct(concat);
-        }
+        (void)strcpy(concat, prefix);
+        (void)strcat(concat, infix);
+        (void)strcat(concat, suffix);
+        result = STRING_construct(concat);
         free(concat);
     }
+    else
+    {
+        result = NULL;
+    }
 
     return result;
 }
 
-static size_t getSecondsSinceEpoch()
+static size_t getSecondsSinceEpoch(void)
 {
-    return (size_t)(difftime(get_time(NULL), (time_t)0) + 0);
+    return (size_t)(difftime(get_time(NULL), (time_t)0));
 }
 
 static EVENT_TRACKER* trackEvent(IOTHUB_MESSAGE_LIST* message, AMQP_TRANSPORT_INSTANCE* transport_state)
 {
-    EVENT_TRACKER* event_tracker = NULL;
+    EVENT_TRACKER* event_tracker;
 
     if ((event_tracker = (EVENT_TRACKER*)malloc(sizeof(EVENT_TRACKER))) != NULL)
     {
@@ -190,13 +199,17 @@
 
 static IOTHUB_MESSAGE_LIST* getNextEventToSend(AMQP_TRANSPORT_INSTANCE* transport_state)
 {
-    IOTHUB_MESSAGE_LIST* message = NULL;
+    IOTHUB_MESSAGE_LIST* message;
 
     if (!DList_IsListEmpty(transport_state->waitingToSend))
     {
         PDLIST_ENTRY list_entry = transport_state->waitingToSend->Flink;
         message = containingRecord(list_entry, IOTHUB_MESSAGE_LIST, entry);
     }
+    else
+    {
+        message = NULL;
+    }
 
     return message;
 }
@@ -231,46 +244,43 @@
     }
 }
 
-static void on_message_send_complete(const void* context, MESSAGE_SEND_RESULT send_result)
+static void on_message_send_complete(void* context, MESSAGE_SEND_RESULT send_result)
 {
-    if (context != NULL)
+    EVENT_TRACKER* event_tracker = (EVENT_TRACKER*)context;
+
+    IOTHUB_CLIENT_RESULT iot_hub_send_result;
+
+    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_142: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_OK if the result received is MESSAGE_SEND_OK] 
+    if (send_result == MESSAGE_SEND_OK)
     {
-        EVENT_TRACKER* event_tracker = (EVENT_TRACKER*)context;
-
-        IOTHUB_CLIENT_RESULT iot_hub_send_result;
+        iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK;
+    }
+    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_143: [The callback 'on_message_send_complete' shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR]
+    else if (send_result == MESSAGE_SEND_ERROR)
+    {
+        iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR;
+    }
 
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_142: [The callback ‘on_message_send_complete’ shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_OK if the result received is MESSAGE_SEND_OK] 
-        if (send_result == MESSAGE_SEND_OK)
-        {
-            iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_OK;
-        }
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_143: [The callback ‘on_message_send_complete’ shall pass to the upper layer callback an IOTHUB_CLIENT_CONFIRMATION_ERROR if the result received is MESSAGE_SEND_ERROR]
-        else if (send_result == MESSAGE_SEND_ERROR)
-        {
-            iot_hub_send_result = IOTHUB_CLIENT_CONFIRMATION_ERROR;
-        }
+    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_102: [The callback 'on_message_send_complete' shall invoke the upper layer callback for message received if provided] 
+    if (event_tracker->message->callback != NULL)
+    {
+        event_tracker->message->callback(iot_hub_send_result, event_tracker->message->context);
+    }
 
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_102: [The callback ‘on_message_send_complete’ shall invoke the upper layer callback for message received if provided] 
-        if (event_tracker->message->callback != NULL)
-        {
-            event_tracker->message->callback(iot_hub_send_result, event_tracker->message->context);
-        }
+    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_151: [The callback 'on_message_send_complete' shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()]
+    IoTHubMessage_Destroy(event_tracker->message->messageHandle);
+    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback 'on_message_send_complete' shall destroy the IOTHUB_MESSAGE_LIST instance]
+    free(event_tracker->message);
 
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_151: [The callback ‘on_message_send_complete’ shall destroy the message handle (IOTHUB_MESSAGE_HANDLE) using IoTHubMessage_Destroy()]
-        IoTHubMessage_Destroy(event_tracker->message->messageHandle);
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_152: [The callback ‘on_message_send_complete’ shall destroy the IOTHUB_MESSAGE_LIST instance]
-        free(event_tracker->message);
-
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback ‘on_message_send_complete’ shall remove the target message from the in-progress list after the upper layer callback] 
-        if (isEventInInProgressList(event_tracker))
-        {
-            removeEventFromInProgressList(event_tracker);
-            destroyEventTracker(event_tracker);
-        }
+    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_100: [The callback 'on_message_send_complete' shall remove the target message from the in-progress list after the upper layer callback] 
+    if (isEventInInProgressList(event_tracker))
+    {
+        removeEventFromInProgressList(event_tracker);
+        destroyEventTracker(event_tracker);
     }
 }
 
-static void on_put_token_complete(const void* context, CBS_OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description)
+static void on_put_token_complete(void* context, CBS_OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description)
 {
     AMQP_TRANSPORT_INSTANCE* transportState = (AMQP_TRANSPORT_INSTANCE*)context;
 
@@ -280,16 +290,14 @@
     }
 }
 
-AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message)
+static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message)
 {
     AMQP_VALUE result = NULL;
-    IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result = IOTHUBMESSAGE_REJECTED;
+    IOTHUBMESSAGE_DISPOSITION_RESULT disposition_result;
 
-    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback ‘on_message_received’ shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters] 
-    IOTHUB_CLIENT_LL_HANDLE iothub_client_handle = (IOTHUB_CLIENT_LL_HANDLE)context;
+    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_104: [The callback 'on_message_received' shall invoke IoTHubClient_LL_MessageCallback() passing the client and the incoming message handles as parameters] 
     IOTHUB_MESSAGE_HANDLE iothub_message = NULL;
     MESSAGE_BODY_TYPE body_type;
-
     
     if (message_get_body_type(message, &body_type) != 0)
     {
@@ -313,23 +321,24 @@
     
     if (iothub_message == NULL)
     {
+        disposition_result = IOTHUBMESSAGE_REJECTED;
         LogError("Transport failed processing the message received.\r\n");
     }
     else
     {
         disposition_result = IoTHubClient_LL_MessageCallback((IOTHUB_CLIENT_LL_HANDLE)context, iothub_message);
 
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback ‘on_message_received’ shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED] 
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_105: [The callback 'on_message_received' shall return the result of messaging_delivery_accepted() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ACCEPTED] 
         if (disposition_result == IOTHUBMESSAGE_ACCEPTED)
         {
             result = messaging_delivery_accepted();
         }
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback ‘on_message_received’ shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED] 
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_106: [The callback 'on_message_received' shall return the result of messaging_delivery_released() if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_ABANDONED] 
         else if (disposition_result == IOTHUBMESSAGE_ABANDONED)
         {
             result = messaging_delivery_released();
         }
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback ‘on_message_received’ shall return the result of messaging_delivery_rejected(“Rejected by application”, “Rejected by application”) if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED] 
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_107: [The callback 'on_message_received' shall return the result of messaging_delivery_rejected("Rejected by application", "Rejected by application") if the IoTHubClient_LL_MessageCallback() returns IOTHUBMESSAGE_REJECTED] 
         else if (disposition_result == IOTHUBMESSAGE_REJECTED)
         {
             result = messaging_delivery_rejected("Rejected by application", "Rejected by application");
@@ -339,61 +348,60 @@
     return result;
 }
 
-XIO_HANDLE getTLSIOTransport(const char* fqdn, int port, const char* certificates)
+static XIO_HANDLE getTLSIOTransport(const char* fqdn, int port, const char* certificates)
 {
-	TLSIO_CONFIG tls_io_config = { fqdn, port };
-	const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio();
-	return xio_create(io_interface_description, &tls_io_config, NULL);
+    TLSIO_CONFIG tls_io_config = { fqdn, port };
+    const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio();
+    return xio_create(io_interface_description, &tls_io_config, NULL);
 }
 
 static void destroyConnection(AMQP_TRANSPORT_INSTANCE* transport_state)
 {
-    if (transport_state != NULL)
+    if (transport_state->cbs != NULL)
     {
-        if (transport_state->cbs != NULL)
-        {
-            cbs_destroy(transport_state->cbs);
-            transport_state->cbs = NULL;
-        }
+        cbs_destroy(transport_state->cbs);
+        transport_state->cbs = NULL;
+    }
 
-        if (transport_state->session != NULL)
-        {
-            session_destroy(transport_state->session);
-            transport_state->session = NULL;
-        }
+    if (transport_state->session != NULL)
+    {
+        session_destroy(transport_state->session);
+        transport_state->session = NULL;
+    }
 
-        if (transport_state->connection != NULL)
-        {
-            connection_destroy(transport_state->connection);
-            transport_state->connection = NULL;
-        }
+    if (transport_state->connection != NULL)
+    {
+        connection_destroy(transport_state->connection);
+        transport_state->connection = NULL;
+    }
 
-        if (transport_state->sasl_io != NULL)
-        {
-            xio_destroy(transport_state->sasl_io);
-            transport_state->sasl_io = NULL;
-        }
+    if (transport_state->sasl_io != NULL)
+    {
+        xio_destroy(transport_state->sasl_io);
+        transport_state->sasl_io = NULL;
+    }
 
-        if (transport_state->sasl_mechanism != NULL)
-        {
-            saslmechanism_destroy(transport_state->sasl_mechanism);
-            transport_state->sasl_mechanism = NULL;
-        }
+    if (transport_state->sasl_mechanism != NULL)
+    {
+        saslmechanism_destroy(transport_state->sasl_mechanism);
+        transport_state->sasl_mechanism = NULL;
+    }
 
-		if (transport_state->tls_io != NULL)
-		{
-			// Codes_SRS_IOTHUBTRANSPORTAMQP_09_034: [IoTHubTransportAMQP_Destroy shall destroy the AMQP TLS I/O transport.]
-			xio_destroy(transport_state->tls_io);
-			transport_state->tls_io = NULL;
-		}
-	}
+    if (transport_state->tls_io != NULL)
+    {
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_034: [IoTHubTransportAMQP_Destroy shall destroy the AMQP TLS I/O transport.]
+        xio_destroy(transport_state->tls_io);
+        transport_state->tls_io = NULL;
+    }
 }
 
 static void on_amqp_management_state_changed(void* context, AMQP_MANAGEMENT_STATE new_amqp_management_state, AMQP_MANAGEMENT_STATE previous_amqp_management_state)
 {
-    if (context != NULL)
+    (void)previous_amqp_management_state;
+    AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
+    
+    if (transport_state != NULL)
     {
-        AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
         transport_state->connection_state = new_amqp_management_state;
     }
 }
@@ -404,13 +412,13 @@
 
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_110: [IoTHubTransportAMQP_DoWork shall create the TLS IO using transport_state->io_transport_provider callback function] 
     if (transport_state->tls_io == NULL &&
-		(transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort, transport_state->trusted_certificates)) == NULL)
+        (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort, transport_state->trusted_certificates)) == NULL)
     {
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_136: [If transport_state->io_transport_provider_callback fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
         result = RESULT_FAILURE;
-		LogError("Failed to obtain a TLS I/O transport layer.\r\n");
+        LogError("Failed to obtain a TLS I/O transport layer.\r\n");
     }
-    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQP’s saslmechanism_create() API] 
+    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQP's saslmechanism_create() API] 
     else if ((transport_state->sasl_mechanism = saslmechanism_create(saslmssbcbs_get_interface(), NULL)) == NULL)
     {
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_057: [If saslmechanism_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
@@ -443,13 +451,13 @@
         }
         else
         {
-            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter ‘AMQP incoming window’] 
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter 'AMQP incoming window'] 
             if (session_set_incoming_window(transport_state->session, (uint32_t)DEFAULT_INCOMING_WINDOW_SIZE) != 0)
             {
                 LogError("Failed to set the AMQP incoming window size.\r\n");
             }
 
-            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter ‘AMQP outgoing window’] 
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter 'AMQP outgoing window'] 
             if (session_set_outgoing_window(transport_state->session, DEFAULT_OUTGOING_WINDOW_SIZE) != 0)
             {
                 LogError("Failed to set the AMQP outgoing window size.\r\n");
@@ -463,7 +471,7 @@
                 LogError("Failed to create the CBS connection.\r\n");
             }
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_139: [IoTHubTransportAMQP_DoWork shall open the CBS connection using the cbs_open() AMQP API] 
-            else if ((cbs_open(transport_state->cbs)) != 0)
+            else if (cbs_open(transport_state->cbs) != 0)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_140: [If cbs_open() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
                 result = RESULT_FAILURE;
@@ -488,11 +496,11 @@
 
 static int startAuthentication(AMQP_TRANSPORT_INSTANCE* transport_state)
 {
-    int result = RESULT_FAILURE;
+    int result;
 
     size_t sas_token_create_time = getSecondsSinceEpoch(); // I.e.: NOW, in seconds since epoch.
     
-    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to ‘sas_token_lifetime’ milliseconds from the time of creation]
+    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_083: [Each new SAS token created by the transport shall be valid for up to 'sas_token_lifetime' milliseconds from the time of creation]
     size_t new_expiry_time = sas_token_create_time + (transport_state->sas_token_lifetime / 1000);
 
     STRING_HANDLE newSASToken = SASToken_Create(transport_state->deviceKey, transport_state->devicesPath, transport_state->sasTokenKeyName, new_expiry_time);
@@ -500,10 +508,12 @@
     if (newSASToken == NULL)
     {
         LogError("Could not generate a new SAS token for the CBS\r\n");
+        result = RESULT_FAILURE;
     }
     else if (cbs_put_token(transport_state->cbs, CBS_AUDIENCE, STRING_c_str(transport_state->devicesPath), STRING_c_str(newSASToken), on_put_token_complete, transport_state) != RESULT_OK)
     {
         LogError("Failed applying new SAS token to CBS\r\n");
+        result = RESULT_FAILURE;
     }
     else
     {
@@ -514,7 +524,9 @@
 
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_145: [Each new SAS token created shall be deleted from memory immediately after sending it to CBS]
     if (newSASToken != NULL)
+    {
         STRING_delete(newSASToken);
+    }
 
     return result;
 }
@@ -533,10 +545,10 @@
     {
         EVENT_TRACKER* event_tracker = containingRecord(entry, EVENT_TRACKER, entry);
 
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_085: [IoTHubTransportAMQP_DoWork shall attempt to send all the queued messages for up to ‘message_send_timeout’ milliseconds] 
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_085: [IoTHubTransportAMQP_DoWork shall attempt to send all the queued messages for up to 'message_send_timeout' milliseconds] 
         if (difftime(current_time, event_tracker->time_sent) * 1000 >= transport_state->message_send_timeout)
         {
-            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_120: [If a ‘message_send_timeout’ occurs the timed out events removed from the inProgress and the upper layer notified of the send error] 
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_120: [If a 'message_send_timeout' occurs the timed out events removed from the inProgress and the upper layer notified of the send error] 
             on_message_send_complete(event_tracker, MESSAGE_SEND_ERROR);
         }
 
@@ -546,9 +558,9 @@
 
 static void attachDeviceClientTypeToLink(LINK_HANDLE link)
 {
-    fields attach_properties = NULL;
-    AMQP_VALUE deviceClientTypeKeyName = NULL;
-    AMQP_VALUE deviceClientTypeValue = NULL;
+    fields attach_properties;
+    AMQP_VALUE deviceClientTypeKeyName;
+    AMQP_VALUE deviceClientTypeValue;
     int result;
 
     //
@@ -565,46 +577,49 @@
     {
         LogError("Failed to create the map for device client type.\r\n");
     }
-    else if ((deviceClientTypeKeyName = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL)
-    {
-        LogError("Failed to create the key name for the device client type.\r\n");
-    }
-    else if ((deviceClientTypeValue = amqpvalue_create_string(CLIENT_DEVICE_TYPE_PREFIX CLIENT_DEVICE_BACKSLASH IOTHUB_SDK_VERSION)) == NULL)
-    {
-        LogError("Failed to create the key value for the device client type.\r\n");
-    }
-    else if ((result = amqpvalue_set_map_value(attach_properties, deviceClientTypeKeyName, deviceClientTypeValue)) != 0)
+    else
     {
-        LogError("Failed to set the property map for the device client type.  Error code is: %d\r\n", result);
+        if ((deviceClientTypeKeyName = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL)
+        {
+            LogError("Failed to create the key name for the device client type.\r\n");
+        }
+        else
+        {
+            if ((deviceClientTypeValue = amqpvalue_create_string(CLIENT_DEVICE_TYPE_PREFIX CLIENT_DEVICE_BACKSLASH IOTHUB_SDK_VERSION)) == NULL)
+            {
+                LogError("Failed to create the key value for the device client type.\r\n");
+            }
+            else
+            {
+                if ((result = amqpvalue_set_map_value(attach_properties, deviceClientTypeKeyName, deviceClientTypeValue)) != 0)
+                {
+                    LogError("Failed to set the property map for the device client type.  Error code is: %d\r\n", result);
+                }
+                else if ((result = link_set_attach_properties(link, attach_properties)) != 0)
+                {
+                    LogError("Unable to attach the device client type to the link properties. Error code is: %d\r\n", result);
+                }
+
+                amqpvalue_destroy(deviceClientTypeValue);
+            }
+
+            amqpvalue_destroy(deviceClientTypeKeyName);
+        }
+
+        amqpvalue_destroy(attach_properties);
     }
-    else if ((result = link_set_attach_properties(link, attach_properties)) != 0)
-    {
-        LogError("Unable to attach the device client type to the link properties. Error code is: %d\r\n", result);
-}
-    amqpvalue_destroy(attach_properties);
-    amqpvalue_destroy(deviceClientTypeKeyName);
-    amqpvalue_destroy(deviceClientTypeValue);
-
 }
 
-static int destroyEventSender(AMQP_TRANSPORT_INSTANCE* transport_state)
+static void destroyEventSender(AMQP_TRANSPORT_INSTANCE* transport_state)
 {
-    int result = RESULT_FAILURE;
-
     if (transport_state->message_sender != NULL)
     {
         messagesender_destroy(transport_state->message_sender);
-
         transport_state->message_sender = NULL;
 
         link_destroy(transport_state->sender_link);
-
         transport_state->sender_link = NULL;
-
-        result = RESULT_OK;
     }
-
-    return result;
 }
 
 static int createEventSender(AMQP_TRANSPORT_INSTANCE* transport_state)
@@ -616,7 +631,7 @@
         AMQP_VALUE source = NULL;
         AMQP_VALUE target = NULL;
 
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_068: [IoTHubTransportAMQP_DoWork shall create the AMQP link for sending messages using ‘source’ as “ingress”, target as the IoT hub FQDN, link name as “sender-link” and role as ‘role_sender’] 
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_068: [IoTHubTransportAMQP_DoWork shall create the AMQP link for sending messages using 'source' as "ingress", target as the IoT hub FQDN, link name as "sender-link" and role as 'role_sender'] 
         if ((source = messaging_create_source(MESSAGE_SENDER_SOURCE_ADDRESS)) == NULL)
         {
             LogError("Failed creating AMQP messaging source attribute.\r\n");
@@ -632,7 +647,7 @@
         }
         else
         {
-            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter ‘Link MAX message size’]
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size']
             if (link_set_max_message_size(transport_state->sender_link, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK)
             {
                 LogError("Failed setting AMQP link max message size.\r\n");
@@ -704,7 +719,7 @@
         AMQP_VALUE source = NULL;
         AMQP_VALUE target = NULL;
 
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_074: [IoTHubTransportAMQP_DoWork shall create the AMQP link for receiving messages using ‘source’ as messageReceiveAddress, target as the “ingress-rx”, link name as “receiver-link” and role as ‘role_receiver’] 
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_074: [IoTHubTransportAMQP_DoWork shall create the AMQP link for receiving messages using 'source' as messageReceiveAddress, target as the "ingress-rx", link name as "receiver-link" and role as 'role_receiver'] 
         if ((source = messaging_create_source(STRING_c_str(transport_state->messageReceiveAddress))) == NULL)
         {
             LogError("Failed creating AMQP message receiver source attribute.\r\n");
@@ -726,7 +741,7 @@
         }
         else
         {
-            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter ‘Link MAX message size’]
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size']
             if (link_set_max_message_size(transport_state->receiver_link, MESSAGE_RECEIVER_MAX_LINK_SIZE) != RESULT_OK)
             {
                 LogError("Failed setting AMQP link max message size for message receiver.\r\n");
@@ -743,7 +758,7 @@
             else
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_079: [IoTHubTransportAMQP_DoWork shall open the AMQP message receiver using messagereceiver_open() AMQP API, passing a callback function for handling C2D incoming messages] 
-                // Codes_SRS_IOTHUBTRANSPORTAMQP_09_123: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver passing the ‘on_message_received’ as the callback function] 
+                // Codes_SRS_IOTHUBTRANSPORTAMQP_09_123: [IoTHubTransportAMQP_DoWork shall create each AMQP message_receiver passing the 'on_message_received' as the callback function] 
                 if (messagereceiver_open(transport_state->message_receiver, on_message_received, (const void*)iothub_client_handle) != RESULT_OK)
                 {
                     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_080: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be opened, flagging the connection to be re-established] 
@@ -765,23 +780,129 @@
     return result;
 }
 
+static int addPropertiesTouAMQPMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, MESSAGE_HANDLE uamqp_message)
+{
+    int result;
+    MAP_HANDLE properties_map;
+    const char* const* propertyKeys;
+    const char* const* propertyValues;
+    size_t propertyCount;
+
+    /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_007: [The IoTHub message properties shall be obtained by calling IoTHubMessage_Properties.] */
+    properties_map = IoTHubMessage_Properties(iothub_message_handle);
+    if (properties_map == NULL)
+    {
+        /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+        LogError("Failed to get property map from IoTHub message.\r\n");
+        result = __LINE__;
+    }
+    /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_015: [The actual keys and values, as well as the number of properties shall be obtained by calling Map_GetInternals on the handle obtained from IoTHubMessage_Properties.] */
+    else if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK)
+    {
+        /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+        LogError("Failed to get the internals of the property map.\r\n");
+        result = __LINE__;
+    }
+    else
+    {
+        /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_016: [If the number of properties is 0, no uAMQP map shall be created and no application properties shall be set on the uAMQP message.] */
+        if (propertyCount != 0)
+        {
+            size_t i;
+            /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_009: [The uAMQP map shall be created by calling amqpvalue_create_map.] */
+            AMQP_VALUE uamqp_map = amqpvalue_create_map();
+            if (uamqp_map == NULL)
+            {
+                /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+                LogError("Failed to create uAMQP map for the properties.\r\n");
+                result = __LINE__;
+            }
+            else
+            {
+                for (i = 0; i < propertyCount; i++)
+                {
+                    /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_010: [A key uAMQP value shall be created by using amqpvalue_create_string.] */
+                    AMQP_VALUE map_key_value = amqpvalue_create_string(propertyKeys[i]);
+                    if (map_key_value == NULL)
+                    {
+                        /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+                        LogError("Failed to create uAMQP property key value.\r\n");
+                        break;
+                    }
+
+                    /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_011: [A value uAMQP value shall be created by using amqpvalue_create_string.] */
+                    AMQP_VALUE map_value_value = amqpvalue_create_string(propertyValues[i]);
+                    if (map_value_value == NULL)
+                    {
+                        amqpvalue_destroy(map_key_value);
+                        /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+                        LogError("Failed to create uAMQP property key value.\r\n");
+                        break;
+                    }
+
+                    /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_008: [All properties shall be transferred to a uAMQP map.] */
+                    /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_012: [The key/value pair for the property shall be set into the uAMQP property map by calling amqpvalue_map_set_value.] */
+                    if (amqpvalue_set_map_value(uamqp_map, map_key_value, map_value_value) != 0)
+                    {
+                        amqpvalue_destroy(map_key_value);
+                        amqpvalue_destroy(map_value_value);
+                        /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+                        LogError("Failed to create uAMQP property key value.\r\n");
+                        break;
+                    }
+
+                    amqpvalue_destroy(map_key_value);
+                    amqpvalue_destroy(map_value_value);
+                }
+
+                if (i < propertyCount)
+                {
+                    result = __LINE__;
+                }
+                else
+                {
+                    /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_013: [After all properties have been filled in the uAMQP map, the uAMQP properties map shall be set on the uAMQP message by calling message_set_application_properties.] */
+                    if (message_set_application_properties(uamqp_message, uamqp_map) != 0)
+                    {
+                        /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+                        LogError("Failed to transfer the message properties to the uAMQP message.\r\n");
+                        result = __LINE__;
+                    }
+                    else
+                    {
+                        result = 0;
+                    }
+                }
+
+                amqpvalue_destroy(uamqp_map);
+            }
+        }
+        else
+        {
+            result = 0;
+        }
+    }
+
+    return result;
+}
+
 static int sendPendingEvents(AMQP_TRANSPORT_INSTANCE* transport_state)
 {
     int result = RESULT_OK;
-    IOTHUB_MESSAGE_LIST* message = NULL;
+    IOTHUB_MESSAGE_LIST* message;
 
     while ((message = getNextEventToSend(transport_state)) != NULL)
     {
         result = RESULT_FAILURE;
         
         IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message->messageHandle);
-        const unsigned char* messageContent = NULL;
-        size_t messageContentSize = 0;
+        const unsigned char* messageContent;
+        size_t messageContentSize;
         MESSAGE_HANDLE amqp_message;
         bool is_message_error = false;
         EVENT_TRACKER* event_tracker;
 
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an “in-progress” list right before processing them for sending]
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_086: [IoTHubTransportAMQP_DoWork shall move queued events to an "in-progress" list right before processing them for sending]
         if ((event_tracker = trackEvent(message, transport_state)) == NULL)
         {
             LogError("Failed tracking the event to be sent.\r\n");
@@ -796,7 +917,7 @@
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_089: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain its char* representation using IoTHubMessage_GetString()] 
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_090: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain the size of its char* representation using strlen()] 
         else if (contentType == IOTHUBMESSAGE_STRING &&
-            ((messageContent = IoTHubMessage_GetString(message->messageHandle)) == NULL || (messageContentSize = strlen(messageContent)) <= 0))
+            ((messageContent = IoTHubMessage_GetString(message->messageHandle)) == NULL))
         {
             LogError("Failed getting the STRING representation of the event content to be sent.\r\n");
             is_message_error = true;
@@ -807,30 +928,46 @@
             LogError("Cannot send events with content type IOTHUBMESSAGE_UNKNOWN.\r\n");
             is_message_error = true;
         }
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_093: [IoTHubTransportAMQP_DoWork shall create an amqp message using message_create() AMQP API] 
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_093: [IoTHubTransportAMQP_DoWork shall create an amqp message using message_create() uAMQP API] 
         else if ((amqp_message = message_create()) == NULL)
         {
             LogError("Failed allocating the AMQP message for sending the event.\r\n");
         }
         else
         {
-            BINARY_DATA binary_data = { messageContent, messageContentSize };
+            BINARY_DATA binary_data;
+
+            if (contentType == IOTHUBMESSAGE_STRING)
+            {
+                messageContentSize = strlen(messageContent);
+            }
+
+            binary_data.bytes = messageContent;
+            binary_data.length = messageContentSize;
             
-            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() AMQP API] 
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() uAMQP API] 
             if (message_add_body_amqp_data(amqp_message, binary_data) != RESULT_OK)
             {
                 LogError("Failed setting the body of the AMQP message.\r\n");
             }
             else
             {
-                // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] 
-                if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, event_tracker) != RESULT_OK)
+                if (addPropertiesTouAMQPMessage(message->messageHandle, amqp_message) != 0)
                 {
-                    LogError("Failed sending the AMQP message.\r\n");
+                    /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
+                    is_message_error = true;
                 }
                 else
                 {
-                    result = RESULT_OK;
+                    // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] 
+                    if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, event_tracker) != RESULT_OK)
+                    {
+                        LogError("Failed sending the AMQP message.\r\n");
+                    }
+                    else
+                    {
+                        result = RESULT_OK;
+                    }
                 }
             }
         }
@@ -888,7 +1025,7 @@
 {
     AMQP_TRANSPORT_INSTANCE* transport_state = NULL;
     bool cleanup_required = false;
-    int deviceIdLength = -1;
+    size_t deviceIdLength;
 
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_005: [If parameter config (or its fields) is NULL then IoTHubTransportAMQP_Create shall fail and return NULL.] 
     if (config == NULL || config->upperConfig == NULL || config->waitingToSend == NULL)
@@ -940,7 +1077,7 @@
     }
     else
     {
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_009: [IoTHubTransportAMQP_Create shall fail and return NULL if memory allocation of the transport’s internal state structure fails.]
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_009: [IoTHubTransportAMQP_Create shall fail and return NULL if memory allocation of the transport's internal state structure fails.]
         transport_state = (AMQP_TRANSPORT_INSTANCE*)malloc(sizeof(AMQP_TRANSPORT_INSTANCE));
 
         if (transport_state == NULL)
@@ -951,7 +1088,7 @@
         {
             transport_state->iotHubHostFqdn = NULL;
             transport_state->iotHubPort = DEFAULT_IOTHUB_AMQP_PORT;
-			transport_state->trusted_certificates = NULL;
+            transport_state->trusted_certificates = NULL;
             transport_state->deviceKey = NULL;
             transport_state->devicesPath = NULL;
             transport_state->messageReceiveAddress = NULL;
@@ -975,7 +1112,8 @@
             transport_state->sender_link = NULL;
             transport_state->session = NULL;
             transport_state->tls_io = NULL;
-			transport_state->tls_io_transport_provider = getTLSIOTransport;
+            transport_state->tls_io_transport_provider = getTLSIOTransport;
+            transport_state->isRegistered = false;
 
             transport_state->waitingToSend = config->waitingToSend;
             DList_InitializeListHead(&transport_state->inProgress);
@@ -986,7 +1124,7 @@
                 LogError("Failed to set transport_state->iotHubHostFqdn.\r\n");
                 cleanup_required = true;
             }
-            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_012: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as devicesPath, from the following parts: host_fqdn + “/devices/” + deviceId.] 
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_012: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as devicesPath, from the following parts: host_fqdn + "/devices/" + deviceId.] 
             else if ((transport_state->devicesPath = concat3Params(STRING_c_str(transport_state->iotHubHostFqdn), "/devices/", config->upperConfig->deviceId)) == NULL)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_013: [If creating devicesPath fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.] 
@@ -1014,7 +1152,7 @@
                 LogError("Failed to allocate transport_state->sasTokenKeyName.\r\n");
                 cleanup_required = true;
             }
-            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transport’s own deviceKey field] 
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transport's own deviceKey field] 
             else if ((transport_state->deviceKey = STRING_new()) == NULL ||
                     STRING_copy(transport_state->deviceKey, config->upperConfig->deviceKey) != 0)
             {
@@ -1127,7 +1265,7 @@
             trigger_connection_retry = true;
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_081: [IoTHubTransportAMQP_DoWork shall put a new SAS token if the one has not been out already, or if the previous one failed to be put due to timeout of cbs_put_token().]
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than ‘sas_token_refresh_time’ milliseconds]
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_082: [IoTHubTransportAMQP_DoWork shall refresh the SAS token if the current token has been used for more than 'sas_token_refresh_time' milliseconds]
         else if ((transport_state->cbs_state == CBS_STATE_IDLE || isSasTokenRefreshRequired(transport_state)) &&
                 startAuthentication(transport_state) != RESULT_OK)
         {
@@ -1135,7 +1273,7 @@
             LogError("Failed authenticating AMQP connection within CBS.\r\n");
             trigger_connection_retry = true;
         }
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for ‘cbs_request_timeout’ milliseconds for the cbs_put_token() to complete before failing due to timeout]
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for 'cbs_request_timeout' milliseconds for the cbs_put_token() to complete before failing due to timeout]
         else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS &&  
                 verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT)
         {
@@ -1169,7 +1307,6 @@
             else if (sendPendingEvents(transport_state) != RESULT_OK)
             {
                 LogError("AMQP transport failed sending events.\r\n");
-                trigger_connection_retry = true;
             }
         }
 
@@ -1187,7 +1324,7 @@
     }
 }
 
-static int IoTHubTransportAMQP_Subscribe(TRANSPORT_HANDLE handle)
+static int IoTHubTransportAMQP_Subscribe(IOTHUB_DEVICE_HANDLE handle)
 {
     int result;
     
@@ -1208,7 +1345,7 @@
     return result;
 }
 
-static void IoTHubTransportAMQP_Unsubscribe(TRANSPORT_HANDLE handle)
+static void IoTHubTransportAMQP_Unsubscribe(IOTHUB_DEVICE_HANDLE handle)
 {
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_039: [IoTHubTransportAMQP_Unsubscribe shall fail if the transport handle parameter received is NULL.] 
     if (handle == NULL)
@@ -1223,7 +1360,7 @@
     }
 }
 
-static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_GetSendStatus(TRANSPORT_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus)
+static IOTHUB_CLIENT_RESULT IoTHubTransportAMQP_GetSendStatus(IOTHUB_DEVICE_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus)
 {
     IOTHUB_CLIENT_RESULT result;
 
@@ -1279,50 +1416,116 @@
     {
         AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle;
 
-		if (strcmp("trusted_certificates", option) == 0)
-		{
-			transport_state->trusted_certificates = (char*)value;
-			result = IOTHUB_CLIENT_OK;
-		}
-		// Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] 
-		else if (strcmp("sas_token_lifetime", option) == 0)
-		{
-			transport_state->sas_token_lifetime = *((size_t*)value);
-			result = IOTHUB_CLIENT_OK;
-		}
-		// Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK] 
-		else if (strcmp("sas_token_refresh_time", option) == 0)
-		{
-			transport_state->sas_token_refresh_time = *((size_t*)value);
-			result = IOTHUB_CLIENT_OK;
-		}
-		// Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK] 
-		else if (strcmp("cbs_request_timeout", option) == 0)
-		{
-			transport_state->cbs_request_timeout = *((size_t*)value);
-			result = IOTHUB_CLIENT_OK;
-		}
-		// Codes_SRS_IOTHUBTRANSPORTAMQP_09_149: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "message_send_timeout", returning IOTHUB_CLIENT_OK]
-		else if (strcmp("message_send_timeout", option) == 0)
-		{
-			transport_state->message_send_timeout = *((size_t*)value);
-			result = IOTHUB_CLIENT_OK;
-		}
-		// Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If optionName is not an option supported then IotHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] 
-		else
-		{
-			result = IOTHUB_CLIENT_INVALID_ARG;
-			LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option);
-		}
-	}
+        if (strcmp("trusted_certificates", option) == 0)
+        {
+            transport_state->trusted_certificates = (char*)value;
+            result = IOTHUB_CLIENT_OK;
+        }
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_048: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_lifetime", returning IOTHUB_CLIENT_OK] 
+        else if (strcmp("sas_token_lifetime", option) == 0)
+        {
+            transport_state->sas_token_lifetime = *((size_t*)value);
+            result = IOTHUB_CLIENT_OK;
+        }
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_049: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "sas_token_refresh_time", returning IOTHUB_CLIENT_OK] 
+        else if (strcmp("sas_token_refresh_time", option) == 0)
+        {
+            transport_state->sas_token_refresh_time = *((size_t*)value);
+            result = IOTHUB_CLIENT_OK;
+        }
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_148: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "cbs_request_timeout", returning IOTHUB_CLIENT_OK] 
+        else if (strcmp("cbs_request_timeout", option) == 0)
+        {
+            transport_state->cbs_request_timeout = *((size_t*)value);
+            result = IOTHUB_CLIENT_OK;
+        }
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_149: [IotHubTransportAMQP_SetOption shall save and apply the value if the option name is "message_send_timeout", returning IOTHUB_CLIENT_OK]
+        else if (strcmp("message_send_timeout", option) == 0)
+        {
+            transport_state->message_send_timeout = *((size_t*)value);
+            result = IOTHUB_CLIENT_OK;
+        }
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_047: [If optionName is not an option supported then IotHubTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] 
+        else
+        {
+            result = IOTHUB_CLIENT_INVALID_ARG;
+            LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option);
+        }
+    }
 
     return result;
 }
 
+static IOTHUB_DEVICE_HANDLE IoTHubTransportAMQ_Register(TRANSPORT_HANDLE handle, const char* deviceId, const char* deviceKey, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend)
+{
+    IOTHUB_DEVICE_HANDLE result;
+    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_001: [IoTHubTransportAMQ_Register shall return NULL if deviceId, deviceKey or waitingToSend are NULL.] 
+    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_005: [IoTHubTransportAMQ_Register shall return NULL if the TRANSPORT_HANDLE is NULL.]
+    if ((handle ==NULL) || (deviceId == NULL) || (deviceKey == NULL) || (waitingToSend == NULL))
+    {
+        result = NULL;
+    }
+    else
+    {
+        AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)handle;
+
+        STRING_HANDLE devicesPath = concat3Params(STRING_c_str(transport_state->iotHubHostFqdn), "/devices/", deviceId);
+        if (devicesPath == NULL)
+        {
+            LogError("Could not create a comparison string");
+            result = NULL;
+        }
+        else
+        {
+            // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_002: [IoTHubTransportAMQ_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransportAMQP_Create.] 
+            if (strcmp(STRING_c_str(transport_state->devicesPath), STRING_c_str(devicesPath)) != 0)
+            {
+                LogError("Attemping to add new device to AMQP transport, not allowed.");
+                result = NULL;
+            }
+            else if (strcmp(STRING_c_str(transport_state->deviceKey), deviceKey) != 0)
+            {
+                LogError("Attemping to add new device to AMQP transport, not allowed.");
+                result = NULL;
+            }
+            else
+            {
+                if (transport_state->isRegistered == true)
+                {
+                    LogError("Transport already has device registered by id: [%s]", deviceId);
+                    result = NULL;
+                }
+                else
+                {
+                    transport_state->isRegistered = true;
+                    // Codes_SRS_IOTHUBTRANSPORTUAMQP_17_003: [IoTHubTransportAMQ_Register shall return the TRANSPORT_HANDLE as the IOTHUB_DEVICE_HANDLE.] 
+                    result = (IOTHUB_DEVICE_HANDLE)handle;
+                }
+            }
+            STRING_delete(devicesPath);
+        }
+    }
+
+    return result;
+}
+
+// Codes_SRS_IOTHUBTRANSPORTUAMQP_17_004: [IoTHubTransportAMQ_Unregister shall return.] 
+static void IoTHubTransportAMQ_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle)
+{
+    if (deviceHandle != NULL)
+    {
+        AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)deviceHandle;
+
+        transport_state->isRegistered = false;
+    }
+}
+
 static TRANSPORT_PROVIDER thisTransportProvider = {
     IoTHubTransportAMQP_SetOption, 
     IoTHubTransportAMQP_Create, 
     IoTHubTransportAMQP_Destroy, 
+    IoTHubTransportAMQ_Register,
+    IoTHubTransportAMQ_Unregister,
     IoTHubTransportAMQP_Subscribe, 
     IoTHubTransportAMQP_Unsubscribe, 
     IoTHubTransportAMQP_DoWork,