Microsoft Azure IoTHub client MQTT transport

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
18:ec8e5e97c6a4
Parent:
17:774695cb8554
Child:
19:f87dfe76bc70
diff -r 774695cb8554 -r ec8e5e97c6a4 iothubtransport_mqtt_common.c
--- a/iothubtransport_mqtt_common.c	Sat Jan 28 09:34:14 2017 -0800
+++ b/iothubtransport_mqtt_common.c	Fri Feb 24 14:00:29 2017 -0800
@@ -3,6 +3,7 @@
 
 #include <stdlib.h>
 #include <ctype.h>
+#include "azure_c_shared_utility/optimize_size.h"
 #include "azure_c_shared_utility/gballoc.h"
 
 #include "azure_c_shared_utility/xlogging.h"
@@ -66,6 +67,9 @@
 
 static const char* REQUEST_ID_PROPERTY = "?$rid=";
 
+static const char* MESSAGE_ID_PROPERTY = "mid";
+static const char* CORRELATION_ID_PROPERTY = "cid";
+
 #define UNSUBSCRIBE_FROM_TOPIC                  0x0000
 #define SUBSCRIBE_GET_REPORTED_STATE_TOPIC      0x0001
 #define SUBSCRIBE_NOTIFICATION_STATE_TOPIC      0x0002
@@ -264,7 +268,7 @@
     }
     else
     {
-        result = __LINE__;
+        result = __FAILURE__;
     }
     return result;
 }
@@ -384,7 +388,7 @@
     {
         /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_041: [**If any handle is NULL then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line.] */
         LogError("Invalid handle parameter. NULL.");
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
@@ -409,7 +413,7 @@
         {
             /*Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_044: [**If retry logic for specified parameters of retry policy and retryTimeoutLimitInSeconds cannot be created then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line]*/
             LogError("Retry Logic is not created");
-            result = __LINE__;
+            result = __FAILURE__;
         }
     }
     return result;
@@ -573,7 +577,7 @@
     if (token_handle == NULL)
     {
         LogError("Failed creating token from device twin topic.");
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
@@ -581,13 +585,13 @@
         if ((token_value = STRING_new()) == NULL)
         {
             LogError("Failed allocating new string .");
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else
         {
             size_t token_index = 0;
             size_t request_id_length = strlen(REQUEST_ID_PROPERTY);
-            result = __LINE__;
+            result = __FAILURE__;
             while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
             {
                 if (token_index == 3)
@@ -595,7 +599,7 @@
                     if (STRING_concat_with_STRING(method_name, token_value) != 0)
                     {
                         LogError("Failed STRING_concat_with_STRING.");
-                        result = __LINE__;
+                        result = __FAILURE__;
                         break;
                     }
                 }
@@ -609,7 +613,7 @@
                             if (STRING_concat(request_id, request_id_value+request_id_length) != 0)
                             {
                                 LogError("Failed STRING_concat failed.");
-                                result = __LINE__;
+                                result = __FAILURE__;
                             }
                             else
                             {
@@ -635,7 +639,7 @@
     if (token_handle == NULL)
     {
         LogError("Failed creating token from device twin topic.");
-        result = __LINE__;
+        result = __FAILURE__;
         *status_code = 0;
         *request_id = 0;
         *patch_msg = false;
@@ -646,14 +650,14 @@
         if ((token_value = STRING_new()) == NULL)
         {
             LogError("Failed allocating new string .");
-            result = __LINE__;
+            result = __FAILURE__;
             *status_code = 0;
             *request_id = 0;
             *patch_msg = false;
         }
         else
         {
-            result = __LINE__;
+            result = __FAILURE__;
             size_t token_count = 0;
             while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
             {
@@ -815,27 +819,27 @@
     STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transport_data->topic_MqttEvent));
     if (msgTopic == NULL)
     {
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
         MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create(mqttMsgEntry->packet_id, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len);
         if (mqttMsg == NULL)
         {
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else
         {
             if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0)
             {
                 LogError("Failed retrieving tickcounter info");
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
                 if (mqtt_client_publish(transport_data->mqttClient, mqttMsg) != 0)
                 {
-                    result = __LINE__;
+                    result = __FAILURE__;
                 }
                 else
                 {
@@ -859,7 +863,7 @@
     if (msg_topic == NULL)
     {
         LogError("Failed constructing message topic.");
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
@@ -867,14 +871,14 @@
         if (mqtt_get_msg == NULL)
         {
             LogError("Failed constructing mqtt message.");
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else
         {
             if (mqtt_client_publish(transport_data->mqttClient, mqtt_get_msg) != 0)
             {
                 LogError("Failed publishing to mqtt client.");
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
@@ -894,7 +898,7 @@
     if (mqtt_info == NULL)
     {
         LogError("Failed allocating device twin data.");
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
@@ -910,7 +914,7 @@
         {
             LogError("Failed constructing get Prop topic.");
             free(mqtt_info);
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else
         {
@@ -919,7 +923,7 @@
             {
                 LogError("Failed constructing mqtt message.");
                 free(mqtt_info);
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
@@ -927,7 +931,7 @@
                 {
                     LogError("Failed publishing to mqtt client.");
                     free(mqtt_info);
-                    result = __LINE__;
+                    result = __FAILURE__;
                 }
                 else
                 {
@@ -951,7 +955,7 @@
     if (msgTopic == NULL)
     {
         LogError("Failed constructing reported prop topic.");
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
@@ -960,21 +964,21 @@
         if (mqtt_rpt_msg == NULL)
         {
             LogError("Failed creating mqtt message");
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else
         {
             if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgPublishTime) != 0)
             {
                 LogError("Failed retrieving tickcounter info");
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
                 if (mqtt_client_publish(transport_data->mqttClient, mqtt_rpt_msg) != 0)
                 {
                     LogError("Failed publishing mqtt message");
-                    result = __LINE__;
+                    result = __FAILURE__;
                 }
                 else
                 {
@@ -1012,7 +1016,7 @@
     if (mqttTopic == NULL)
     {
         LogError("Failure constructing string topic name.");
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
@@ -1023,7 +1027,7 @@
             if (propertyMap == NULL)
             {
                 LogError("Failure to retrieve IoTHubMessage_properties.");
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
@@ -1031,11 +1035,12 @@
                 if (output == NULL)
                 {
                     LogError("Failure to allocate STRING_new.");
-                    result = __LINE__;
+                    result = __FAILURE__;
                 }
                 else
                 {
                     result = 0;
+
                     while (STRING_TOKENIZER_get_next_token(token, output, PROPERTY_SEPARATOR) == 0 && result == 0)
                     {
                         const char* tokenData = STRING_c_str(output);
@@ -1046,7 +1051,7 @@
                         }
                         else
                         {
-                            if (!isSystemProperty(tokenData) )
+                            if (isSystemProperty(tokenData))
                             {
                                 const char* iterator = tokenData;
                                 while (iterator != NULL && *iterator != '\0' && result == 0)
@@ -1061,7 +1066,61 @@
 
                                         if (propName == NULL || propValue == NULL)
                                         {
-                                            result = __LINE__;
+                                            result = __FAILURE__;
+                                        }
+                                        else
+                                        {
+                                            strncpy(propName, tokenData, nameLen);
+                                            propName[nameLen] = '\0';
+
+                                            strncpy(propValue, iterator + 1, valLen);
+                                            propValue[valLen] = '\0';
+
+                                            if (nameLen > 3)
+                                            {
+                                                if (strcmp((const char*)&propName[nameLen - 3], MESSAGE_ID_PROPERTY) == 0)
+                                                {
+                                                    if (IoTHubMessage_SetMessageId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
+                                                    {
+                                                        LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'messageId' property.");
+                                                        result = __FAILURE__;
+                                                    }
+                                                }
+
+                                                if (strcmp((const char*)&propName[nameLen - 3], CORRELATION_ID_PROPERTY) == 0)
+                                                {
+                                                    if (IoTHubMessage_SetCorrelationId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
+                                                    {
+                                                        LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'correlationId' property.");
+                                                        result = __FAILURE__;
+                                                    }
+                                                }
+                                            }
+                                        }
+                                        free(propName);
+                                        free(propValue);
+
+                                        break;
+                                    }
+                                    iterator++;
+                                }
+                            }
+                            else
+                            {
+                                const char* iterator = tokenData;
+                                while (iterator != NULL && *iterator != '\0' && result == 0)
+                                {
+                                    if (*iterator == '=')
+                                    {
+                                        size_t nameLen = iterator - tokenData;
+                                        char* propName = malloc(nameLen + 1);
+
+                                        size_t valLen = tokenLen - (nameLen + 1) + 1;
+                                        char* propValue = malloc(valLen + 1);
+
+                                        if (propName == NULL || propValue == NULL)
+                                        {
+                                            result = __FAILURE__;
                                         }
                                         else
                                         {
@@ -1074,7 +1133,7 @@
                                             if (Map_AddOrUpdate(propertyMap, propName, propValue) != MAP_OK)
                                             {
                                                 LogError("Map_AddOrUpdate failed.");
-                                                result = __LINE__;
+                                                result = __FAILURE__;
                                             }
                                         }
                                         free(propName);
@@ -1095,7 +1154,7 @@
         else
         {
             LogError("Unable to create Tokenizer object.");
-            result = __LINE__;
+            result = __FAILURE__;
         }
         STRING_delete(mqttTopic);
     }
@@ -1520,7 +1579,7 @@
         if (transport_data->xioTransport == NULL)
         {
             LogError("Unable to create the lower level TLS layer.");
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else
         {
@@ -1542,7 +1601,7 @@
     STRING_HANDLE emptyKeyName = STRING_new();
     if (emptyKeyName == NULL)
     {
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
@@ -1557,7 +1616,7 @@
             {
                 IotHubClient_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN);
                 STRING_delete(sasToken);
-                result = __LINE__;
+                result = __FAILURE__;
             }
             break;
         case DEVICE_KEY:
@@ -1596,7 +1655,7 @@
                 if (mqtt_client_connect(transport_data->mqttClient, transport_data->xioTransport, &options) != 0)
                 {
                     LogError("failure connecting to address %s:%d.", STRING_c_str(transport_data->hostAddress), transport_data->portNum);
-                    result = __LINE__;
+                    result = __FAILURE__;
                 }
                 else
                 {
@@ -1606,7 +1665,7 @@
             }
             else
             {
-                result = __LINE__;
+                result = __FAILURE__;
             }
             
             STRING_delete(sasToken);
@@ -1630,14 +1689,14 @@
             if (tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->connectTick) != 0)
             {
                 transport_data->connectFailCount++;
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
                 if (SendMqttConnectMsg(transport_data) != 0)
                 {
                     transport_data->connectFailCount++;
-                    result = __LINE__;
+                    result = __FAILURE__;
                 }
                 else
                 {
@@ -1655,7 +1714,7 @@
             if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
             {
                 transport_data->connectFailCount++;
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
@@ -1704,12 +1763,12 @@
         if (transport_data->transport_creds.CREDENTIAL_VALUE.deviceKey == NULL)
         {
             LogError("Could not create device key for MQTT");
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else if ((transport_data->devicesPath = STRING_construct_sprintf("%s.%s/devices/%s", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId)) == NULL)
         {
             STRING_delete(transport_data->transport_creds.CREDENTIAL_VALUE.deviceKey);
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else
         {
@@ -1722,7 +1781,7 @@
         transport_data->transport_creds.CREDENTIAL_VALUE.deviceSasToken = STRING_construct(upperConfig->deviceSasToken);
         if (transport_data->transport_creds.CREDENTIAL_VALUE.deviceSasToken == NULL)
         {
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else
         {
@@ -2035,7 +2094,7 @@
     {
         /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_042: [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
         LogError("Invalid handle parameter. NULL.");
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
@@ -2047,7 +2106,7 @@
             {
                 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_046: [Upon failure IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall return a non-zero value.] */
                 LogError("Failure: unable constructing reported state topic");
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
@@ -2067,7 +2126,7 @@
             if (transport_data->topic_NotifyState == NULL)
             {
                 LogError("Failure: unable constructing notify state topic");
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
@@ -2126,7 +2185,7 @@
     {
         /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_001 : [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return a non - zero value.]*/
         LogError("Invalid handle parameter. NULL.");
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
@@ -2138,7 +2197,7 @@
             {
                 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_006 : [Upon failure IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return a non - zero value.]*/
                 LogError("Failure: unable constructing device method subscribe topic");
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
@@ -2211,7 +2270,7 @@
         if (dev_method_info == NULL)
         {
             LogError("Failure: DEVICE_METHOD_INFO was NULL");
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else
         {
@@ -2219,7 +2278,7 @@
             {
                 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_051: [ If any error is encountered, IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
                 LogError("Failure: publishing device method response");
-                result = __LINE__;
+                result = __FAILURE__;
             }
             else
             {
@@ -2232,7 +2291,7 @@
     else
     {
         /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_041: [ If the parameter handle is NULL than IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
-        result = __LINE__;
+        result = __FAILURE__;
         LogError("Failure: invalid IOTHUB_DEVICE_HANDLE parameter specified");
     }
     return result;
@@ -2246,7 +2305,7 @@
     {
         /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_015: [If parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
         LogError("Invalid handle parameter. NULL.");
-        result = __LINE__;
+        result = __FAILURE__;
     }
     else
     {
@@ -2255,7 +2314,7 @@
         if (transport_data->topic_MqttMessage == NULL)
         {
             LogError("Failure constructing Message Topic");
-            result = __LINE__;
+            result = __FAILURE__;
         }
         else
         {