Microsoft Azure IoTHub client MQTT transport

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
2:9db4da50abaa
Parent:
1:f2e563755d91
Child:
3:40f482ed0be8
--- a/iothubtransportmqtt.c	Fri Apr 08 13:24:20 2016 -0700
+++ b/iothubtransportmqtt.c	Sun Apr 24 16:40:02 2016 -0700
@@ -22,12 +22,14 @@
 #include "azure_c_shared_utility/tlsio.h"
 #include "azure_c_shared_utility/platform.h"
 
+#include "azure_c_shared_utility/string_tokenizer.h"
 #include "iothub_client_version.h"
 
 #include <stdarg.h>
 #include <stdio.h>
 
 #define SAS_TOKEN_DEFAULT_LIFETIME  3600
+#define SAS_REFRESH_MULTIPLIER      .8
 #define EPOCH_TIME_T_VALUE          0
 #define DEFAULT_MQTT_KEEPALIVE      4*60 // 4 min
 #define DEFAULT_PORT_NUMBER         8883
@@ -37,6 +39,29 @@
 #define SAS_TOKEN_DEFAULT_LEN       10
 #define RESEND_TIMEOUT_VALUE_MIN    1*60
 #define MAX_SEND_RECOUNT_LIMIT      2
+#define DEFAULT_CONNECTION_INTERVAL 30
+#define FAILED_CONN_BACKOFF_VALUE   5
+
+static const char* DEVICE_MSG_TOPIC = "devices/%s/messages/devicebound/#";
+static const char* DEVICE_DEVICE_TOPIC = "devices/%s/messages/events/";
+static const char* PROPERTY_SEPARATOR = "&";
+
+typedef struct SYSTEM_PROPERTY_INFO_TAG
+{
+    const char* propName;
+    size_t propLength;
+} SYSTEM_PROPERTY_INFO;
+
+static SYSTEM_PROPERTY_INFO sysPropList[] = {
+    { "%24.exp", 7 },
+    { "%24.mid", 7 },
+    { "%24.uid", 7 },
+    { "%24.to", 6 },
+    { "%24.cid", 7 },
+    { "devices/", 8 },
+    { "iothub-operation", 16 },
+    { "iothub-ack", 10 }
+};
 
 TICK_COUNTER_HANDLE g_msgTickCounter;
 
@@ -69,6 +94,9 @@
     CONTROL_PACKET_TYPE currPacketState;
     XIO_HANDLE xioTransport;
     int keepAliveValue;
+    uint64_t mqtt_connect_time;
+    size_t connectFailCount;
+    uint64_t connectTick;
 } MQTTTRANSPORT_HANDLE_DATA, *PMQTTTRANSPORT_HANDLE_DATA;
 
 typedef struct MQTT_MESSAGE_DETAILS_LIST_TAG
@@ -102,28 +130,190 @@
     IoTHubClient_LL_SendComplete(transportState->llClientHandle, &messageCompleted, batchResult);
 }
 
+static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, const char* eventTopic)
+{
+    STRING_HANDLE result = STRING_construct(eventTopic);
+    const char* const* propertyKeys;
+    const char* const* propertyValues;
+    size_t propertyCount;
+
+    // Construct Properties
+    MAP_HANDLE properties_map = IoTHubMessage_Properties(iothub_message_handle);
+    if (properties_map != NULL)
+    {
+        if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK)
+        {
+            LogError("Failed to get the internals of the property map.");
+            STRING_delete(result);
+            result = NULL;
+        }
+        else
+        {
+            if (propertyCount != 0)
+            {
+                for (size_t index = 0; index < propertyCount && result != NULL; index++)
+                {
+                    size_t len = strlen(propertyKeys[index]) + strlen(propertyValues[index]) + 2;
+                    char* propValues = malloc(len+1);
+                    if (propValues == NULL)
+                    {
+                        STRING_delete(result);
+                        result = NULL;
+                    }
+                    else
+                    {
+                        sprintf(propValues, "%s=%s%s", propertyKeys[index], propertyValues[index], propertyCount - 1 == index ? "" : "&");
+                        if (STRING_concat(result, propValues) != 0)
+                        {
+                            STRING_delete(result);
+                            result = NULL;
+                        }
+                        free(propValues);
+                    }
+                }
+            }
+        }
+    }
+    return result;
+}
+
 static int publishMqttMessage(PMQTTTRANSPORT_HANDLE_DATA transportState, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len)
 {
     int result;
-    MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create(transportState->packetId++, STRING_c_str(transportState->mqttEventTopic), DELIVER_AT_LEAST_ONCE, payload, len);
-    if (mqttMsg == NULL)
+    STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transportState->mqttEventTopic));
+    if (msgTopic == NULL)
     {
         result = __LINE__;
     }
     else
     {
-        if (mqtt_client_publish(transportState->mqttClient, mqttMsg) != 0)
+        MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create(transportState->packetId++, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len);
+        if (mqttMsg == NULL)
         {
             result = __LINE__;
         }
         else
         {
-            mqttMsgEntry->retryCount++;
-            (void)tickcounter_get_current_ms(g_msgTickCounter, &mqttMsgEntry->msgPublishTime);
-            result = 0;
+            if (mqtt_client_publish(transportState->mqttClient, mqttMsg) != 0)
+            {
+                result = __LINE__;
+            }
+            else
+            {
+                mqttMsgEntry->retryCount++;
+                (void)tickcounter_get_current_ms(g_msgTickCounter, &mqttMsgEntry->msgPublishTime);
+                result = 0;
+            }
+            mqttmessage_destroy(mqttMsg);
+        }
+        STRING_delete(msgTopic);
+    }
+    return result;
+}
+
+static bool isSystemProperty(const char* tokenData)
+{
+    bool result = false;
+    size_t propCount = sizeof(sysPropList)/sizeof(sysPropList[0]);
+    for (size_t index = 0; index < propCount; index++)
+    {
+        if (memcmp(tokenData, sysPropList[index].propName, sysPropList[index].propLength) == 0)
+        {
+            result = true;
+            break;
+        }
+    }
+    return result;
+}
+
+static int extractMqttProperties(IOTHUB_MESSAGE_HANDLE IoTHubMessage, MQTT_MESSAGE_HANDLE msgHandle)
+{
+    int result;
+    STRING_HANDLE mqttTopic = STRING_construct(mqttmessage_getTopicName(msgHandle));
+
+    STRING_TOKENIZER_HANDLE token = STRING_TOKENIZER_create(mqttTopic);
+    if (token != NULL)
+    {
+        MAP_HANDLE propertyMap = IoTHubMessage_Properties(IoTHubMessage);
+        if (propertyMap == NULL)
+        {
+            LogError("Failure to retrieve IoTHubMessage_properties.");
+            result = __LINE__;
         }
-        mqttmessage_destroy(mqttMsg);
+        else
+        {
+            STRING_HANDLE output = STRING_new();
+            if (output == NULL)
+            {
+                LogError("Failure to allocate STRING_new.");
+                result = __LINE__;
+            }
+            else
+            {
+                result = 0;
+                while (STRING_TOKENIZER_get_next_token(token, output, PROPERTY_SEPARATOR) == 0 && result == 0)
+                {
+                    const char* tokenData = STRING_c_str(output);
+                    size_t tokenLen = strlen(tokenData);
+                    if (tokenData == NULL || tokenLen == 0)
+                    {
+                        break;
+                    }
+                    else
+                    {
+                        if (!isSystemProperty(tokenData) )
+                        {
+                            const char* iterator = tokenData;
+                            while (iterator != NULL && *iterator != '\0' && result == 0)
+                            {
+                                if (*iterator == '=')
+                                {
+                                    size_t nameLen = iterator - tokenData;
+                                    char* propName = malloc(nameLen + 1);
+
+                                    size_t valLen = tokenLen - (nameLen + 1) + 1;
+                                    char* propValue = malloc(valLen + 1);
+
+                                    if (propName == NULL || propValue == NULL)
+                                    {
+                                        result = __LINE__;
+                                    }
+                                    else
+                                    {
+                                        strncpy(propName, tokenData, nameLen);
+                                        propName[nameLen] = '\0';
+
+                                        strncpy(propValue, iterator + 1, valLen);
+                                        propValue[valLen] = '\0';
+
+                                        if (Map_AddOrUpdate(propertyMap, propName, propValue) != MAP_OK)
+                                        {
+                                            LogError("Map_AddOrUpdate failed.");
+                                            result = __LINE__;
+                                        }
+                                    }
+                                    free(propName);
+                                    free(propValue);
+
+                                    break;
+                                }
+                                iterator++;
+                            }
+                        }
+                    }
+                }
+                STRING_delete(output);
+            }
+        }
+        STRING_TOKENIZER_destroy(token);
     }
+    else
+    {
+        LogError("Unable to create Tokenizer object.");
+        result = __LINE__;
+    }
+    STRING_delete(mqttTopic);
+
     return result;
 }
 
@@ -135,14 +325,16 @@
         IOTHUB_MESSAGE_HANDLE IoTHubMessage = IoTHubMessage_CreateFromByteArray(appPayload->message, appPayload->length);
         if (IoTHubMessage == NULL)
         {
-            LogError("IotHub Message creation has failed.\r\n");
+            LogError("IotHub Message creation has failed.");
         }
         else
         {
+            // Will need to update this when the service has messages that can be rejected
+            (void)extractMqttProperties(IoTHubMessage, msgHandle);
             PMQTTTRANSPORT_HANDLE_DATA transportData = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
             if (IoTHubClient_LL_MessageCallback(transportData->llClientHandle, IoTHubMessage) != IOTHUBMESSAGE_ACCEPTED)
             {
-                LogError("Event not accepted by our client.\r\n");
+                LogError("Event not accepted by our client.");
             }
             IoTHubMessage_Destroy(IoTHubMessage);
         }
@@ -194,7 +386,7 @@
                     }
                     else
                     {
-                        LogError("Connection not accepted, return code: %d.\r\n", connack->returnCode);
+                        LogError("Connection not accepted, return code: %d.", connack->returnCode);
                         (void)mqtt_client_disconnect(transportData->mqttClient);
                         transportData->connected = false;
                         transportData->currPacketState = PACKET_TYPE_ERROR;
@@ -202,7 +394,7 @@
                 }
                 else
                 {
-                    LogError("MQTT_CLIENT_ON_CONNACK CONNACK parameter is NULL.\r\n");
+                    LogError("MQTT_CLIENT_ON_CONNACK CONNACK parameter is NULL.");
                 }
                 break;
             }
@@ -218,7 +410,7 @@
                     }
                     else
                     {
-                        LogError("QOS count was not expected: %d.\r\n", (int)suback->qosCount);
+                        LogError("QOS count was not expected: %d.", (int)suback->qosCount);
                     }
                 }
                 break;
@@ -300,7 +492,7 @@
     {
         if (IoTHubMessage_GetByteArray(messageHandle, &result, length) != IOTHUB_MESSAGE_OK)
         {
-            LogError("Failure result from IoTHubMessage_GetByteArray\r\n");
+            LogError("Failure result from IoTHubMessage_GetByteArray");
             result = NULL;
             *length = 0;
         }
@@ -310,7 +502,7 @@
         result = (const unsigned char*)IoTHubMessage_GetString(messageHandle);
         if (result == NULL)
         {
-            LogError("Failure result from IoTHubMessage_GetString\r\n");
+            LogError("Failure result from IoTHubMessage_GetString");
             result = NULL;
             *length = 0;
         }
@@ -360,7 +552,7 @@
     }
     else
     {
-        (void)sprintf(eventTopic, "devices/%s/messages/events/", deviceId);
+        (void)sprintf(eventTopic, DEVICE_DEVICE_TOPIC, deviceId);
         result = STRING_construct(eventTopic);
         free(eventTopic);
     }
@@ -379,7 +571,7 @@
     }
     else
     {
-        (void)sprintf(messageTopic, "devices/%s/messages/devicebound/#", deviceId);
+        (void)sprintf(messageTopic, DEVICE_MSG_TOPIC, deviceId);
         result = STRING_construct(messageTopic);
         free(messageTopic);
     }
@@ -407,7 +599,7 @@
         transportState->xioTransport = getIoTransportProvider(hostName, transportState->portNum);
         if (transportState->xioTransport == NULL)
         {
-            LogError("Unable to create the lower level TLS layer.\r\n");
+            LogError("Unable to create the lower level TLS layer.");
             result = __LINE__;
         }
         else
@@ -421,9 +613,10 @@
     }
     return result;
 }
+
 static int SendMqttConnectMsg(PMQTTTRANSPORT_HANDLE_DATA transportState)
 {
-    int result = 0;
+    int result;
 
     // Construct SAS token
     size_t secSinceEpoch = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE) + 0);
@@ -447,18 +640,25 @@
         options.useCleanSession = false;
         options.qualityOfServiceValue = DELIVER_AT_LEAST_ONCE;
 
-        if ((result = GetTransportProviderIfNecessary(transportState)) == 0)
+        if (GetTransportProviderIfNecessary(transportState) == 0)
         {
             if (mqtt_client_connect(transportState->mqttClient, transportState->xioTransport, &options) != 0)
             {
-                LogError("failure connecting to address %s:%d.\r\n", STRING_c_str(transportState->hostAddress), transportState->portNum);
+                LogError("failure connecting to address %s:%d.", STRING_c_str(transportState->hostAddress), transportState->portNum);
+                xio_destroy(transportState->xioTransport);
+                transportState->xioTransport = NULL;
                 result = __LINE__;
             }
             else
             {
+                (void)tickcounter_get_current_ms(g_msgTickCounter, &transportState->mqtt_connect_time);
                 result = 0;
             }
         }
+        else
+        {
+            result = __LINE__;
+    }
     }
     STRING_delete(emptyKeyName);
     STRING_delete(sasToken);
@@ -468,19 +668,62 @@
 static int InitializeConnection(PMQTTTRANSPORT_HANDLE_DATA transportState)
 {
     int result = 0;
-    if (!transportState->connected && !transportState->destroyCalled)
+
+    // Make sure we're not destroying the object
+    if (!transportState->destroyCalled)
     {
+        // If we are not connected then check to see if we need 
+        // to back off the connecting to the server
+        if (!transportState->connected)
+        {
+            // Default makeConnection as true if something goes wrong we'll make the connection
+            bool makeConnection = true;
+            // If we've failed for FAILED_CONN_BACKOFF_VALUE straight times them let's slow down connection
+            // to the service
+            if (transportState->connectFailCount > FAILED_CONN_BACKOFF_VALUE)
+            {
+                uint64_t currentTick;
+                if (tickcounter_get_current_ms(g_msgTickCounter, &currentTick) == 0)
+                {
+                    if ( ((currentTick - transportState->connectTick)/1000) <= DEFAULT_CONNECTION_INTERVAL)
+                    {
+                        result = __LINE__;
+                        makeConnection = false;
+                    }
+                }
+            }
+
+            if (makeConnection)
+    {
+                (void)tickcounter_get_current_ms(g_msgTickCounter, &transportState->connectTick);
         if (SendMqttConnectMsg(transportState) != 0)
         {
-            transportState->connected = false;
+                    transportState->connectFailCount++;
             result = __LINE__;
         }
         else
         {
+                    transportState->connectFailCount = 0;
             transportState->connected = true;
             result = 0;
         }
     }
+        }
+
+        if (transportState->connected)
+        {
+            // We are connected and not being closed, so does SAS need to reconnect?
+            uint64_t current_time;
+            (void)tickcounter_get_current_ms(g_msgTickCounter, &current_time);
+            if ((current_time - transportState->mqtt_connect_time) / 1000 > (SAS_TOKEN_DEFAULT_LIFETIME*SAS_REFRESH_MULTIPLIER))
+            {
+                (void)mqtt_client_disconnect(transportState->mqttClient);
+                transportState->subscribed = false;
+                transportState->connected = false;
+                transportState->currPacketState = UNKNOWN_TYPE;
+            }
+        }
+    }
     return result;
 }
 
@@ -508,7 +751,7 @@
     PMQTTTRANSPORT_HANDLE_DATA state = (PMQTTTRANSPORT_HANDLE_DATA)malloc(sizeof(MQTTTRANSPORT_HANDLE_DATA));
     if (state == NULL)
     {
-        LogError("Could not create MQTT transport state. Memory allocation failed.\r\n");
+        LogError("Could not create MQTT transport state. Memory allocation failed.");
     }
     else if ((state->device_id = STRING_construct(upperConfig->deviceId)) == NULL)
     {
@@ -517,14 +760,14 @@
     }
     else if ((state->device_key = STRING_construct(upperConfig->deviceKey)) == NULL)
     {
-        LogError("Could not create device key for MQTT\r\n");
+        LogError("Could not create device key for MQTT");
         STRING_delete(state->device_id);
         free(state);
         state = NULL;
     }
     else if ( (state->sasTokenSr = ConstructSasToken(upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId) ) == NULL)
     {
-        LogError("Could not create Sas Token Sr String.\r\n");
+        LogError("Could not create Sas Token Sr String.");
         STRING_delete(state->device_key);
         STRING_delete(state->device_id);
         free(state);
@@ -532,7 +775,7 @@
     }
     else if ( (state->mqttEventTopic = ConstructEventTopic(upperConfig->deviceId) ) == NULL)
     {
-        LogError("Could not create mqttEventTopic for MQTT\r\n");
+        LogError("Could not create mqttEventTopic for MQTT");
         STRING_delete(state->sasTokenSr);
         STRING_delete(state->device_key);
         STRING_delete(state->device_id);
@@ -541,7 +784,7 @@
     }
     else if ((state->mqttMessageTopic = ConstructMessageTopic(upperConfig->deviceId) ) == NULL)
     {
-        LogError("Could not create mqttMessageTopic for MQTT\r\n");
+        LogError("Could not create mqttMessageTopic for MQTT");
         STRING_delete(state->mqttEventTopic);
         STRING_delete(state->sasTokenSr);
         STRING_delete(state->device_key);
@@ -605,6 +848,8 @@
                 state->waitingToSend = waitingToSend;
                 state->currPacketState = CONNECT_TYPE;
                 state->keepAliveValue = DEFAULT_MQTT_KEEPALIVE;
+                state->connectFailCount = 0;
+                state->connectTick = 0;
             }
         }
     }
@@ -619,7 +864,7 @@
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_001: [If parameter config is NULL then IoTHubTransportMqtt_Create shall return NULL.] */
     if (config == NULL)
     {
-        LogError("Invalid Argument: Config Parameter is NULL.\r\n");
+        LogError("Invalid Argument: Config Parameter is NULL.");
         result = NULL;
     }
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig or waitingToSend are NULL then IoTHubTransportMqtt_Create shall return NULL.] */
@@ -627,31 +872,31 @@
     else if (config->upperConfig == NULL || config->upperConfig->protocol == NULL || config->upperConfig->deviceId == NULL || config->upperConfig->deviceKey == NULL ||
         config->upperConfig->iotHubName == NULL || config->upperConfig->iotHubSuffix == NULL)
     {
-        LogError("Invalid Argument: upperConfig structure contains an invalid parameter\r\n");
+        LogError("Invalid Argument: upperConfig structure contains an invalid parameter");
         result = NULL;
     }
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig or waitingToSend are NULL then IoTHubTransportMqtt_Create shall return NULL.] */
     else if (config->waitingToSend == NULL)
     {
-        LogError("Invalid Argument: waitingToSend is NULL)\r\n");
+        LogError("Invalid Argument: waitingToSend is NULL)");
         result = NULL;
     }
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_006: [If the upperConfig's variables deviceId is an empty strings or length is greater then 128 then IoTHubTransportMqtt_Create shall return NULL.] */
     else if ( ( (deviceIdSize = strlen(config->upperConfig->deviceId)) > 128U) || (deviceIdSize == 0) )
     {
-        LogError("Invalid Argument: DeviceId is of an invalid size\r\n");
+        LogError("Invalid Argument: DeviceId is of an invalid size");
         result = NULL;
     }
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, deviceKey, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransportMqtt_Create shall return NULL.] */
     else if (strlen(config->upperConfig->deviceKey) == 0)
     {
-        LogError("Invalid Argument: deviceKey is empty\r\n");
+        LogError("Invalid Argument: deviceKey is empty");
         result = NULL;
     }
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, deviceKey, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransportMqtt_Create shall return NULL.] */
     else if (strlen(config->upperConfig->iotHubName) == 0)
     {
-        LogError("Invalid Argument: iotHubName is empty\r\n");
+        LogError("Invalid Argument: iotHubName is empty");
         result = NULL;
     }
     else
@@ -677,6 +922,7 @@
 
     (void)mqtt_client_disconnect(transportState->mqttClient);
     xio_destroy(transportState->xioTransport);
+    transportState->xioTransport = NULL;
 
     transportState->connected = false;
     transportState->currPacketState = DISCONNECT_TYPE;
@@ -722,7 +968,7 @@
     if (transportState == NULL)
     {
         /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_015: [If parameter handle is NULL than IoTHubTransportMqtt_Subscribe shall return a non-zero value.] */
-        LogError("Invalid handle parameter. NULL.\r\n");
+        LogError("Invalid handle parameter. NULL.");
         result = __LINE__;
     }
     else
@@ -756,7 +1002,7 @@
     }
     else
     {
-        LogError("Invalid argument to unsubscribe (NULL). \r\n");
+        LogError("Invalid argument to unsubscribe (NULL).");
     }
 }
 
@@ -810,7 +1056,7 @@
                             const unsigned char* messagePayload = RetrieveMessagePayload(mqttMsgEntry->iotHubMessageEntry->messageHandle, &messageLength);
                             if (messageLength == 0 || messagePayload == NULL)
                             {
-                                LogError("Failure from creating Message IoTHubMessage_GetData\r\n");
+                                LogError("Failure from creating Message IoTHubMessage_GetData");
                             }
                             else
                             {
@@ -839,7 +1085,7 @@
                     const unsigned char* messagePayload = RetrieveMessagePayload(iothubMsgList->messageHandle, &messageLength);
                     if (messageLength == 0 || messagePayload == NULL)
                     {
-                        LogError("Failure result from IoTHubMessage_GetData\r\n");
+                        LogError("Failure result from IoTHubMessage_GetData");
                     }
                     else
                     {
@@ -847,7 +1093,7 @@
                         MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = (MQTT_MESSAGE_DETAILS_LIST*)malloc(sizeof(MQTT_MESSAGE_DETAILS_LIST));
                         if (mqttMsgEntry == NULL)
                         {
-                            LogError("Allocation Error: Failure allocating MQTT Message Detail List.\r\n");
+                            LogError("Allocation Error: Failure allocating MQTT Message Detail List.");
                         }
                         else
                         {
@@ -884,7 +1130,7 @@
     if (handle == NULL || iotHubClientStatus == NULL)
     {
         /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_023: [IoTHubTransportMqtt_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] */
-        LogError("invalid arument. \r\n");
+        LogError("invalid arument.");
         result = IOTHUB_CLIENT_INVALID_ARG;
     }
     else
@@ -916,7 +1162,7 @@
         )
     {
         result = IOTHUB_CLIENT_INVALID_ARG;
-        LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption\r\n");
+        LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption.");
     }
     else
     {