Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
13:a4af7c301e02
Parent:
12:841a4c36bd36
Child:
14:8e8e42008807
diff -r 841a4c36bd36 -r a4af7c301e02 iothubtransportamqp.c
--- a/iothubtransportamqp.c	Fri Apr 08 13:23:54 2016 -0700
+++ b/iothubtransportamqp.c	Sun Apr 24 16:37:38 2016 -0700
@@ -268,7 +268,7 @@
 
     if (message_get_body_type(message, &body_type) != 0)
     {
-        LogError("Failed to get the type of the message received by the transport.\r\n");
+        LogError("Failed to get the type of the message received by the transport.");
     }
     else
     {
@@ -277,7 +277,7 @@
             BINARY_DATA binary_data;
             if (message_get_body_amqp_data(message, 0, &binary_data) != 0)
             {
-                LogError("Failed to get the body of the message received by the transport.\r\n");
+                LogError("Failed to get the body of the message received by the transport.");
             }
             else
             {
@@ -289,7 +289,7 @@
     if (iothub_message == NULL)
     {
         disposition_result = IOTHUBMESSAGE_REJECTED;
-        LogError("Transport failed processing the message received.\r\n");
+        LogError("Transport failed processing the message received.");
     }
     else
     {
@@ -374,6 +374,16 @@
     }
 }
 
+static void on_connection_io_error(void* context)
+{
+	AMQP_TRANSPORT_INSTANCE* transport_state = (AMQP_TRANSPORT_INSTANCE*)context;
+
+	if (transport_state != NULL)
+	{
+		transport_state->connection_state = AMQP_MANAGEMENT_STATE_ERROR;
+	}
+}
+
 static int establishConnection(AMQP_TRANSPORT_INSTANCE* transport_state)
 {
     int result;
@@ -384,14 +394,14 @@
     {
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_136: [If transport_state->io_transport_provider_callback fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
         result = RESULT_FAILURE;
-        LogError("Failed to obtain a TLS I/O transport layer.\r\n");
+        LogError("Failed to obtain a TLS I/O transport layer.");
     }
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_056: [IoTHubTransportAMQP_DoWork shall create the SASL mechanism using AMQP's saslmechanism_create() API] 
     else if ((transport_state->sasl_mechanism = saslmechanism_create(saslmssbcbs_get_interface(), NULL)) == NULL)
     {
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_057: [If saslmechanism_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
         result = RESULT_FAILURE;
-        LogError("Failed to create a SASL mechanism.\r\n");
+        LogError("Failed to create a SASL mechanism.");
     }
     else
     {
@@ -401,34 +411,34 @@
         {
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_061: [If xio_create() fails creating the SASL I/O layer, IoTHubTransportAMQP_DoWork shall fail and return immediately] 
             result = RESULT_FAILURE;
-            LogError("Failed to create a SASL I/O layer.\r\n");
+            LogError("Failed to create a SASL I/O layer.");
         }
-        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_062: [IoTHubTransportAMQP_DoWork shall create the connection with the IoT service using connection_create() AMQP API, passing the SASL I/O layer, IoT Hub FQDN and container ID as parameters (pass NULL for callbacks)] 
-        else if ((transport_state->connection = connection_create(transport_state->sasl_io, STRING_c_str(transport_state->iotHubHostFqdn), DEFAULT_CONTAINER_ID, NULL, NULL)) == NULL)
+        // Codes_SRS_IOTHUBTRANSPORTAMQP_09_062: [IoTHubTransportAMQP_DoWork shall create the connection with the IoT service using connection_create2() AMQP API, passing the SASL I/O layer, IoT Hub FQDN and container ID as parameters (pass NULL for callbacks)] 
+		else if ((transport_state->connection = connection_create2(transport_state->sasl_io, STRING_c_str(transport_state->iotHubHostFqdn), DEFAULT_CONTAINER_ID, NULL, NULL, NULL, NULL, on_connection_io_error, (void*)transport_state, NULL)) == NULL)
         {
-            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_063: [If connection_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately.] 
+            // Codes_SRS_IOTHUBTRANSPORTAMQP_09_063: [If connection_create2() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately.] 
             result = RESULT_FAILURE;
-            LogError("Failed to create the AMQP connection.\r\n");
+            LogError("Failed to create the AMQP connection.");
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_137: [IoTHubTransportAMQP_DoWork shall create the AMQP session session_create() AMQP API, passing the connection instance as parameter]
         else if ((transport_state->session = session_create(transport_state->connection, NULL, NULL)) == NULL)
         {
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_138 : [If session_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
             result = RESULT_FAILURE;
-            LogError("Failed to create the AMQP session.\r\n");
+            LogError("Failed to create the AMQP session.");
         }
         else
         {
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_065: [IoTHubTransportAMQP_DoWork shall apply a default value of UINT_MAX for the parameter 'AMQP incoming window'] 
             if (session_set_incoming_window(transport_state->session, (uint32_t)DEFAULT_INCOMING_WINDOW_SIZE) != 0)
             {
-                LogError("Failed to set the AMQP incoming window size.\r\n");
+                LogError("Failed to set the AMQP incoming window size.");
             }
 
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_115: [IoTHubTransportAMQP_DoWork shall apply a default value of 100 for the parameter 'AMQP outgoing window'] 
             if (session_set_outgoing_window(transport_state->session, DEFAULT_OUTGOING_WINDOW_SIZE) != 0)
             {
-                LogError("Failed to set the AMQP outgoing window size.\r\n");
+                LogError("Failed to set the AMQP outgoing window size.");
             }
 
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_066: [IoTHubTransportAMQP_DoWork shall establish the CBS connection using the cbs_create() AMQP API] 
@@ -436,14 +446,14 @@
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_067: [If cbs_create() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately] 
                 result = RESULT_FAILURE;
-                LogError("Failed to create the CBS connection.\r\n");
+                LogError("Failed to create the CBS connection.");
             }
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_139: [IoTHubTransportAMQP_DoWork shall open the CBS connection using the cbs_open() AMQP API] 
             else if (cbs_open(transport_state->cbs) != 0)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_140: [If cbs_open() fails, IoTHubTransportAMQP_DoWork shall fail and return immediately]
                 result = RESULT_FAILURE;
-                LogError("Failed to open the connection with CBS.\r\n");
+                LogError("Failed to open the connection with CBS.");
             }
             else
             {
@@ -475,12 +485,12 @@
 
     if (newSASToken == NULL)
     {
-        LogError("Could not generate a new SAS token for the CBS\r\n");
+        LogError("Could not generate a new SAS token for the CBS");
         result = RESULT_FAILURE;
     }
     else if (cbs_put_token(transport_state->cbs, CBS_AUDIENCE, STRING_c_str(transport_state->devicesPath), STRING_c_str(newSASToken), on_put_token_complete, transport_state) != RESULT_OK)
     {
-        LogError("Failed applying new SAS token to CBS\r\n");
+        LogError("Failed applying new SAS token to CBS");
         result = RESULT_FAILURE;
     }
     else
@@ -523,29 +533,29 @@
 
     if ((attach_properties = amqpvalue_create_map()) == NULL)
     {
-        LogError("Failed to create the map for device client type.\r\n");
+        LogError("Failed to create the map for device client type.");
     }
     else
     {
         if ((deviceClientTypeKeyName = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL)
         {
-            LogError("Failed to create the key name for the device client type.\r\n");
+            LogError("Failed to create the key name for the device client type.");
         }
         else
         {
             if ((deviceClientTypeValue = amqpvalue_create_string(CLIENT_DEVICE_TYPE_PREFIX CLIENT_DEVICE_BACKSLASH IOTHUB_SDK_VERSION)) == NULL)
             {
-                LogError("Failed to create the key value for the device client type.\r\n");
+                LogError("Failed to create the key value for the device client type.");
             }
             else
             {
                 if ((result = amqpvalue_set_map_value(attach_properties, deviceClientTypeKeyName, deviceClientTypeValue)) != 0)
                 {
-                    LogError("Failed to set the property map for the device client type.  Error code is: %d\r\n", result);
+                    LogError("Failed to set the property map for the device client type.  Error code is: %d", result);
                 }
                 else if ((result = link_set_attach_properties(link, attach_properties)) != 0)
                 {
-                    LogError("Unable to attach the device client type to the link properties. Error code is: %d\r\n", result);
+                    LogError("Unable to attach the device client type to the link properties. Error code is: %d", result);
                 }
 
                 amqpvalue_destroy(deviceClientTypeValue);
@@ -570,6 +580,11 @@
     }
 }
 
+void on_event_sender_state_changed(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state)
+{
+	LogInfo("Event sender state changed [%d->%d]", previous_state, new_state);
+}
+
 static int createEventSender(AMQP_TRANSPORT_INSTANCE* transport_state)
 {
     int result = RESULT_FAILURE;
@@ -582,32 +597,32 @@
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_068: [IoTHubTransportAMQP_DoWork shall create the AMQP link for sending messages using 'source' as "ingress", target as the IoT hub FQDN, link name as "sender-link" and role as 'role_sender'] 
         if ((source = messaging_create_source(MESSAGE_SENDER_SOURCE_ADDRESS)) == NULL)
         {
-            LogError("Failed creating AMQP messaging source attribute.\r\n");
+            LogError("Failed creating AMQP messaging source attribute.");
         }
         else if ((target = messaging_create_target(STRING_c_str(transport_state->targetAddress))) == NULL)
         {
-            LogError("Failed creating AMQP messaging target attribute.\r\n");
+            LogError("Failed creating AMQP messaging target attribute.");
         }
         else if ((transport_state->sender_link = link_create(transport_state->session, MESSAGE_SENDER_LINK_NAME, role_sender, source, target)) == NULL)
         {
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_069: [If IoTHubTransportAMQP_DoWork fails to create the AMQP link for sending messages, the function shall fail and return immediately, flagging the connection to be re-stablished] 
-            LogError("Failed creating AMQP link for message sender.\r\n");
+            LogError("Failed creating AMQP link for message sender.");
         }
         else
         {
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size']
             if (link_set_max_message_size(transport_state->sender_link, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK)
             {
-                LogError("Failed setting AMQP link max message size.\r\n");
+                LogError("Failed setting AMQP link max message size.");
             }
 
             attachDeviceClientTypeToLink(transport_state->sender_link);
 
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_070: [IoTHubTransportAMQP_DoWork shall create the AMQP message sender using messagesender_create() AMQP API] 
-            if ((transport_state->message_sender = messagesender_create(transport_state->sender_link, NULL, NULL, NULL)) == NULL)
+            if ((transport_state->message_sender = messagesender_create(transport_state->sender_link, on_event_sender_state_changed, (void*)transport_state, NULL)) == NULL)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_071: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message sender instance fails to be created, flagging the connection to be re-established] 
-                LogError("Could not allocate AMQP message sender\r\n");
+                LogError("Could not allocate AMQP message sender");
             }
             else
             {
@@ -615,7 +630,7 @@
                 if (messagesender_open(transport_state->message_sender) != RESULT_OK)
                 {
                     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_073: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message sender instance fails to be opened, flagging the connection to be re-established] 
-                    LogError("Failed opening the AMQP message sender.\r\n");
+                    LogError("Failed opening the AMQP message sender.");
                 }
                 else
                 {
@@ -641,7 +656,7 @@
     {
         if (messagereceiver_close(transport_state->message_receiver) != RESULT_OK)
         {
-            LogError("Failed closing the AMQP message receiver.\r\n");
+            LogError("Failed closing the AMQP message receiver.");
         }
 
         messagereceiver_destroy(transport_state->message_receiver);
@@ -670,29 +685,29 @@
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_074: [IoTHubTransportAMQP_DoWork shall create the AMQP link for receiving messages using 'source' as messageReceiveAddress, target as the "ingress-rx", link name as "receiver-link" and role as 'role_receiver'] 
         if ((source = messaging_create_source(STRING_c_str(transport_state->messageReceiveAddress))) == NULL)
         {
-            LogError("Failed creating AMQP message receiver source attribute.\r\n");
+            LogError("Failed creating AMQP message receiver source attribute.");
         }
         else if ((target = messaging_create_target(MESSAGE_RECEIVER_TARGET_ADDRESS)) == NULL)
         {
-            LogError("Failed creating AMQP message receiver target attribute.\r\n");
+            LogError("Failed creating AMQP message receiver target attribute.");
         }
         else if ((transport_state->receiver_link = link_create(transport_state->session, MESSAGE_RECEIVER_LINK_NAME, role_receiver, source, target)) == NULL)
         {
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_075: [If IoTHubTransportAMQP_DoWork fails to create the AMQP link for receiving messages, the function shall fail and return immediately, flagging the connection to be re-stablished] 
-            LogError("Failed creating AMQP link for message receiver.\r\n");
+            LogError("Failed creating AMQP link for message receiver.");
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_076: [IoTHubTransportAMQP_DoWork shall set the receiver link settle mode as receiver_settle_mode_first] 
         else if (link_set_rcv_settle_mode(transport_state->receiver_link, receiver_settle_mode_first) != RESULT_OK)
         {
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_141: [If IoTHubTransportAMQP_DoWork fails to set the settle mode on the AMQP link for receiving messages, the function shall fail and return immediately, flagging the connection to be re-stablished]
-            LogError("Failed setting AMQP link settle mode for message receiver.\r\n");
+            LogError("Failed setting AMQP link settle mode for message receiver.");
         }
         else
         {
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_119: [IoTHubTransportAMQP_DoWork shall apply a default value of 65536 for the parameter 'Link MAX message size']
             if (link_set_max_message_size(transport_state->receiver_link, MESSAGE_RECEIVER_MAX_LINK_SIZE) != RESULT_OK)
             {
-                LogError("Failed setting AMQP link max message size for message receiver.\r\n");
+                LogError("Failed setting AMQP link max message size for message receiver.");
             }
 
             attachDeviceClientTypeToLink(transport_state->receiver_link);
@@ -701,7 +716,7 @@
             if ((transport_state->message_receiver = messagereceiver_create(transport_state->receiver_link, NULL, NULL)) == NULL)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_078: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be created, flagging the connection to be re-established] 
-                LogError("Could not allocate AMQP message receiver.\r\n");
+                LogError("Could not allocate AMQP message receiver.");
             }
             else
             {
@@ -710,7 +725,7 @@
                 if (messagereceiver_open(transport_state->message_receiver, on_message_received, (const void*)iothub_client_handle) != RESULT_OK)
                 {
                     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_080: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the AMQP message receiver instance fails to be opened, flagging the connection to be re-established] 
-                    LogError("Failed opening the AMQP message receiver.\r\n");
+                    LogError("Failed opening the AMQP message receiver.");
                 }
                 else
                 {
@@ -741,14 +756,14 @@
     if (properties_map == NULL)
     {
         /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
-        LogError("Failed to get property map from IoTHub message.\r\n");
+        LogError("Failed to get property map from IoTHub message.");
         result = __LINE__;
     }
     /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_015: [The actual keys and values, as well as the number of properties shall be obtained by calling Map_GetInternals on the handle obtained from IoTHubMessage_Properties.] */
     else if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK)
     {
         /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
-        LogError("Failed to get the internals of the property map.\r\n");
+        LogError("Failed to get the internals of the property map.");
         result = __LINE__;
     }
     else
@@ -762,7 +777,7 @@
             if (uamqp_map == NULL)
             {
                 /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
-                LogError("Failed to create uAMQP map for the properties.\r\n");
+                LogError("Failed to create uAMQP map for the properties.");
                 result = __LINE__;
             }
             else
@@ -774,7 +789,7 @@
                     if (map_key_value == NULL)
                     {
                         /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
-                        LogError("Failed to create uAMQP property key value.\r\n");
+                        LogError("Failed to create uAMQP property key value.");
                         break;
                     }
 
@@ -784,7 +799,7 @@
                     {
                         amqpvalue_destroy(map_key_value);
                         /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
-                        LogError("Failed to create uAMQP property key value.\r\n");
+                        LogError("Failed to create uAMQP property key value.");
                         break;
                     }
 
@@ -795,7 +810,7 @@
                         amqpvalue_destroy(map_key_value);
                         amqpvalue_destroy(map_value_value);
                         /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
-                        LogError("Failed to create uAMQP property key value.\r\n");
+                        LogError("Failed to create uAMQP property key value.");
                         break;
                     }
 
@@ -813,7 +828,7 @@
                     if (message_set_application_properties(uamqp_message, uamqp_map) != 0)
                     {
                         /* Codes_SRS_IOTHUBTRANSPORTUAMQP_01_014: [If any of the APIs fails while building the property map and setting it on the uAMQP message, IoTHubTransportAMQP_DoWork shall notify the failure by invoking the upper layer message send callback with IOTHUB_CLIENT_CONFIRMATION_ERROR.] */
-                        LogError("Failed to transfer the message properties to the uAMQP message.\r\n");
+                        LogError("Failed to transfer the message properties to the uAMQP message.");
                         result = __LINE__;
                     }
                     else
@@ -856,7 +871,7 @@
         if (contentType == IOTHUBMESSAGE_BYTEARRAY &&
             IoTHubMessage_GetByteArray(message->messageHandle, &messageContent, &messageContentSize) != IOTHUB_MESSAGE_OK)
         {
-            LogError("Failed getting the BYTE array representation of the event content to be sent.\r\n");
+            LogError("Failed getting the BYTE array representation of the event content to be sent.");
             is_message_error = true;
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_089: [If the event contains a message of type IOTHUBMESSAGE_STRING, IoTHubTransportAMQP_DoWork shall obtain its char* representation using IoTHubMessage_GetString()] 
@@ -864,19 +879,19 @@
         else if (contentType == IOTHUBMESSAGE_STRING &&
             ((messageContent = IoTHubMessage_GetString(message->messageHandle)) == NULL))
         {
-            LogError("Failed getting the STRING representation of the event content to be sent.\r\n");
+            LogError("Failed getting the STRING representation of the event content to be sent.");
             is_message_error = true;
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_092: [If the event contains a message of type IOTHUBMESSAGE_UNKNOWN, IoTHubTransportAMQP_DoWork shall remove the event from the in-progress list and invoke the upper layer callback reporting the error] 
         else if (contentType == IOTHUBMESSAGE_UNKNOWN)
         {
-            LogError("Cannot send events with content type IOTHUBMESSAGE_UNKNOWN.\r\n");
+            LogError("Cannot send events with content type IOTHUBMESSAGE_UNKNOWN.");
             is_message_error = true;
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_093: [IoTHubTransportAMQP_DoWork shall create an amqp message using message_create() uAMQP API] 
         else if ((amqp_message = message_create()) == NULL)
         {
-            LogError("Failed allocating the AMQP message for sending the event.\r\n");
+            LogError("Failed allocating the AMQP message for sending the event.");
         }
         else
         {
@@ -893,7 +908,7 @@
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_095: [IoTHubTransportAMQP_DoWork shall set the AMQP message body using message_add_body_amqp_data() uAMQP API] 
             if (message_add_body_amqp_data(amqp_message, binary_data) != RESULT_OK)
             {
-                LogError("Failed setting the body of the AMQP message.\r\n");
+                LogError("Failed setting the body of the AMQP message.");
             }
             else
             {
@@ -907,7 +922,7 @@
                     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_097: [IoTHubTransportAMQP_DoWork shall pass the encoded AMQP message to AMQP for sending (along with on_message_send_complete callback) using messagesender_send()] 
                     if (messagesender_send(transport_state->message_sender, amqp_message, on_message_send_complete, message) != RESULT_OK)
                     {
-                        LogError("Failed sending the AMQP message.\r\n");
+                        LogError("Failed sending the AMQP message.");
                     }
                     else
                     {
@@ -952,7 +967,10 @@
 
 static void prepareForConnectionRetry(AMQP_TRANSPORT_INSTANCE* transport_state)
 {
+	destroyMessageReceiver(transport_state);
+	destroyEventSender(transport_state);
     destroyConnection(transport_state);
+	transport_state->connection_state = AMQP_MANAGEMENT_STATE_IDLE;
     rollEventsBackToWaitList(transport_state);
 }
 
@@ -968,32 +986,32 @@
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_005: [If parameter config (or its fields) is NULL then IoTHubTransportAMQP_Create shall fail and return NULL.] 
     if (config == NULL || config->upperConfig == NULL || config->waitingToSend == NULL)
     {
-        LogError("IoTHub AMQP client transport null configuration parameter.\r\n");
+        LogError("IoTHub AMQP client transport null configuration parameter.");
     }
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_006: [IoTHubTransportAMQP_Create shall fail and return NULL if any fields of the config structure are NULL.]
     else if (config->upperConfig->protocol == NULL)
     {
-        LogError("Invalid configuration (NULL protocol detected)\r\n");
+        LogError("Invalid configuration (NULL protocol detected)");
     }
     else if (config->upperConfig->deviceId == NULL)
     {
-        LogError("Invalid configuration (NULL deviceId detected)\r\n");
+        LogError("Invalid configuration (NULL deviceId detected)");
     }
     else if (config->upperConfig->deviceKey == NULL)
     {
-        LogError("Invalid configuration (NULL deviceKey detected)\r\n");
+        LogError("Invalid configuration (NULL deviceKey detected)");
     }
     else if (config->upperConfig->iotHubName == NULL)
     {
-        LogError("Invalid configuration (NULL iotHubName detected)\r\n");
+        LogError("Invalid configuration (NULL iotHubName detected)");
     }
     else if (config->upperConfig->iotHubSuffix == NULL)
     {
-        LogError("Invalid configuration (NULL iotHubSuffix detected)\r\n");
+        LogError("Invalid configuration (NULL iotHubSuffix detected)");
     }
     else if (!config->waitingToSend)
     {
-        LogError("Invalid configuration (NULL waitingToSend list detected)\r\n");
+        LogError("Invalid configuration (NULL waitingToSend list detected)");
     }
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_008: [IoTHubTransportAMQP_Create shall fail and return NULL if any config field of type string is zero length.] 
     else if ((deviceIdLength = strlen(config->upperConfig->deviceId)) == 0 ||
@@ -1001,17 +1019,17 @@
         (strlen(config->upperConfig->iotHubName) == 0) ||
         (strlen(config->upperConfig->iotHubSuffix) == 0))
     {
-        LogError("Zero-length config parameter (deviceId, deviceKey, iotHubName or iotHubSuffix)\r\n");
+        LogError("Zero-length config parameter (deviceId, deviceKey, iotHubName or iotHubSuffix)");
     }
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_007: [IoTHubTransportAMQP_Create shall fail and return NULL if the deviceId length is greater than 128.]
     else if (deviceIdLength > 128U)
     {
-        LogError("deviceId is too long\r\n");
+        LogError("deviceId is too long");
     }
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_134: [IoTHubTransportAMQP_Create shall fail and return NULL if the combined length of config->iotHubName and config->iotHubSuffix exceeds 254 bytes (RFC1035)]
     else if ((strlen(config->upperConfig->iotHubName) + strlen(config->upperConfig->iotHubSuffix)) > (RFC1035_MAX_FQDN_LENGTH - 1))
     {
-        LogError("The lengths of iotHubName and iotHubSuffix together exceed the maximum FQDN length allowed (RFC 1035)\r\n");
+        LogError("The lengths of iotHubName and iotHubSuffix together exceed the maximum FQDN length allowed (RFC 1035)");
     }
     else
     {
@@ -1020,7 +1038,7 @@
 
         if (transport_state == NULL)
         {
-            LogError("Could not allocate AMQP transport state\r\n");
+            LogError("Could not allocate AMQP transport state");
         }
         else
         {
@@ -1058,35 +1076,35 @@
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_010: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as iotHubHostFqdn, from the following pieces: config->iotHubName + "." + config->iotHubSuffix.] 
             if ((transport_state->iotHubHostFqdn = concat3Params(config->upperConfig->iotHubName, ".", config->upperConfig->iotHubSuffix)) == NULL)
             {
-                LogError("Failed to set transport_state->iotHubHostFqdn.\r\n");
+                LogError("Failed to set transport_state->iotHubHostFqdn.");
                 cleanup_required = true;
             }
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_012: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as devicesPath, from the following parts: host_fqdn + "/devices/" + deviceId.] 
             else if ((transport_state->devicesPath = concat3Params(STRING_c_str(transport_state->iotHubHostFqdn), "/devices/", config->upperConfig->deviceId)) == NULL)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_013: [If creating devicesPath fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.] 
-                LogError("Failed to allocate transport_state->devicesPath.\r\n");
+                LogError("Failed to allocate transport_state->devicesPath.");
                 cleanup_required = true;
             }
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_014: [IoTHubTransportAMQP_Create shall create an immutable string, referred to as targetAddress, from the following parts: "amqps://" + devicesPath + "/messages/events".]
             else if ((transport_state->targetAddress = concat3Params("amqps://", STRING_c_str(transport_state->devicesPath), "/messages/events")) == NULL)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_015: [If creating the targetAddress fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.] 
-                LogError("Failed to allocate transport_state->targetAddress.\r\n");
+                LogError("Failed to allocate transport_state->targetAddress.");
                 cleanup_required = true;
             }
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_053: [IoTHubTransportAMQP_Create shall define the source address for receiving messages as "amqps://" + devicesPath + "/messages/devicebound", stored in the transport handle as messageReceiveAddress]
             else if ((transport_state->messageReceiveAddress = concat3Params("amqps://", STRING_c_str(transport_state->devicesPath), "/messages/devicebound")) == NULL)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_054: [If creating the messageReceiveAddress fails for any reason then IoTHubTransportAMQP_Create shall fail and return NULL.]
-                LogError("Failed to allocate transport_state->messageReceiveAddress.\r\n");
+                LogError("Failed to allocate transport_state->messageReceiveAddress.");
                 cleanup_required = true;
             }
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_016: [IoTHubTransportAMQP_Create shall initialize handle->sasTokenKeyName with a zero-length STRING_HANDLE instance.] 
             else if ((transport_state->sasTokenKeyName = STRING_new()) == NULL)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_017: [If IoTHubTransportAMQP_Create fails to initialize handle->sasTokenKeyName with a zero-length STRING the function shall fail and return NULL.] 
-                LogError("Failed to allocate transport_state->sasTokenKeyName.\r\n");
+                LogError("Failed to allocate transport_state->sasTokenKeyName.");
                 cleanup_required = true;
             }
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_018: [IoTHubTransportAMQP_Create shall store a copy of config->deviceKey (passed by upper layer) into the transport's own deviceKey field] 
@@ -1094,7 +1112,7 @@
                 STRING_copy(transport_state->deviceKey, config->upperConfig->deviceKey) != 0)
             {
                 // Codes_SRS_IOTHUBTRANSPORTAMQP_09_019: [If IoTHubTransportAMQP_Create fails to copy config->deviceKey, the function shall fail and return NULL.]
-                LogError("Failed to allocate transport_state->deviceKey.\r\n");
+                LogError("Failed to allocate transport_state->deviceKey.");
                 cleanup_required = true;
             }
             else
@@ -1176,12 +1194,12 @@
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_051: [IoTHubTransportAMQP_DoWork shall fail and return immediately if the transport handle parameter is NULL] 
     if (handle == NULL)
     {
-        LogError("IoTHubClient DoWork failed: transport handle parameter is NULL.\r\n");
+        LogError("IoTHubClient DoWork failed: transport handle parameter is NULL.");
     }
     // Codes_[IoTHubTransportAMQP_DoWork shall fail and return immediately if the client handle parameter is NULL] 
     else if (iotHubClientHandle == NULL)
     {
-        LogError("IoTHubClient DoWork failed: client handle parameter is NULL.\r\n");
+        LogError("IoTHubClient DoWork failed: client handle parameter is NULL.");
     }
     else
     {
@@ -1191,11 +1209,17 @@
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_147: [IoTHubTransportAMQP_DoWork shall save a reference to the client handle in transport_state->iothub_client_handle]
         transport_state->iothub_client_handle = iotHubClientHandle;
 
+		if (transport_state->connection != NULL &&
+			transport_state->connection_state == AMQP_MANAGEMENT_STATE_ERROR)
+		{
+			LogError("An error occured on AMQP connection. The connection will be restablished.");
+			trigger_connection_retry = true;
+		}
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_055: [If the transport handle has a NULL connection, IoTHubTransportAMQP_DoWork shall instantiate and initialize the AMQP components and establish the connection] 
-        if (transport_state->connection == NULL &&
+        else if (transport_state->connection == NULL &&
             establishConnection(transport_state) != RESULT_OK)
         {
-            LogError("AMQP transport failed to establish connection with service.\r\n");
+            LogError("AMQP transport failed to establish connection with service.");
             trigger_connection_retry = true;
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_081: [IoTHubTransportAMQP_DoWork shall put a new SAS token if the one has not been out already, or if the previous one failed to be put due to timeout of cbs_put_token().]
@@ -1204,14 +1228,14 @@
             startAuthentication(transport_state) != RESULT_OK)
         {
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_146: [If the SAS token fails to be sent to CBS (cbs_put_token), IoTHubTransportAMQP_DoWork shall fail and exit immediately]
-            LogError("Failed authenticating AMQP connection within CBS.\r\n");
+            LogError("Failed authenticating AMQP connection within CBS.");
             trigger_connection_retry = true;
         }
         // Codes_SRS_IOTHUBTRANSPORTAMQP_09_084: [IoTHubTransportAMQP_DoWork shall wait for 'cbs_request_timeout' milliseconds for the cbs_put_token() to complete before failing due to timeout]
         else if (transport_state->cbs_state == CBS_STATE_AUTH_IN_PROGRESS &&
             verifyAuthenticationTimeout(transport_state) == RESULT_TIMEOUT)
         {
-            LogError("AMQP transport authentication timed out.\r\n");
+            LogError("AMQP transport authentication timed out.");
             trigger_connection_retry = true;
         }
         else if (transport_state->cbs_state == CBS_STATE_AUTHENTICATED)
@@ -1221,7 +1245,7 @@
                 transport_state->message_receiver == NULL &&
                 createMessageReceiver(transport_state, iotHubClientHandle) != RESULT_OK)
             {
-                LogError("Failed creating AMQP transport message receiver.\r\n");
+                LogError("Failed creating AMQP transport message receiver.");
                 trigger_connection_retry = true;
             }
             // Codes_SRS_IOTHUBTRANSPORTAMQP_09_122: [IoTHubTransportAMQP_DoWork shall destroy the transport_state->message_receiver (and set it to NULL) if it exists and transport_state->receive_messages is false] 
@@ -1229,18 +1253,18 @@
                 transport_state->message_receiver != NULL &&
                 destroyMessageReceiver(transport_state) != RESULT_OK)
             {
-                LogError("Failed destroying AMQP transport message receiver.\r\n");
+                LogError("Failed destroying AMQP transport message receiver.");
             }
 
             if (transport_state->message_sender == NULL &&
                 createEventSender(transport_state) != RESULT_OK)
             {
-                LogError("Failed creating AMQP transport event sender.\r\n");
+                LogError("Failed creating AMQP transport event sender.");
                 trigger_connection_retry = true;
             }
             else if (sendPendingEvents(transport_state) != RESULT_OK)
             {
-                LogError("AMQP transport failed sending events.\r\n");
+                LogError("AMQP transport failed sending events.");
             }
         }
 
@@ -1263,7 +1287,7 @@
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_037: [IoTHubTransportAMQP_Subscribe shall fail if the transport handle parameter received is NULL.] 
     if (handle == NULL)
     {
-        LogError("Invalid handle to IoTHubClient AMQP transport.\r\n");
+        LogError("Invalid handle to IoTHubClient AMQP transport.");
         result = __LINE__;
     }
     else
@@ -1282,7 +1306,7 @@
     // Codes_SRS_IOTHUBTRANSPORTAMQP_09_039: [IoTHubTransportAMQP_Unsubscribe shall fail if the transport handle parameter received is NULL.] 
     if (handle == NULL)
     {
-        LogError("Invalid handle to IoTHubClient AMQP transport.\r\n");
+        LogError("Invalid handle to IoTHubClient AMQP transport.");
     }
     else
     {
@@ -1300,12 +1324,12 @@
     if (handle == NULL)
     {
         result = IOTHUB_CLIENT_INVALID_ARG;
-        LogError("Invalid handle to IoTHubClient AMQP transport instance.\r\n");
+        LogError("Invalid handle to IoTHubClient AMQP transport instance.");
     }
     else if (iotHubClientStatus == NULL)
     {
         result = IOTHUB_CLIENT_INVALID_ARG;
-        LogError("Invalid pointer to output parameter IOTHUB_CLIENT_STATUS.\r\n");
+        LogError("Invalid pointer to output parameter IOTHUB_CLIENT_STATUS.");
     }
     else
     {
@@ -1342,7 +1366,7 @@
         )
     {
         result = IOTHUB_CLIENT_INVALID_ARG;
-        LogError("Invalid parameter (NULL) passed to AMQP transport SetOption()\r\n");
+        LogError("Invalid parameter (NULL) passed to AMQP transport SetOption()");
     }
     else
     {
@@ -1373,7 +1397,7 @@
                 (transport_state->tls_io = transport_state->tls_io_transport_provider(STRING_c_str(transport_state->iotHubHostFqdn), transport_state->iotHubPort)) == NULL)
             {
                 result = IOTHUB_CLIENT_ERROR;
-                LogError("Failed to obtain a TLS I/O transport layer.\r\n");
+                LogError("Failed to obtain a TLS I/O transport layer.");
             }
             else
             {
@@ -1385,7 +1409,7 @@
                 else
                 {
                     result = IOTHUB_CLIENT_ERROR;
-                    LogError("Invalid option (%s) passed to uAMQP transport SetOption()\r\n", option);
+                    LogError("Invalid option (%s) passed to uAMQP transport SetOption()", option);
                 }
             }
         }