Microsoft Azure IoTHub client MQTT transport

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
13:606465879c57
Parent:
12:658ca6865de2
Child:
14:4dc2b011be33
--- a/iothubtransport_mqtt_common.c	Wed Nov 16 21:37:42 2016 -0800
+++ b/iothubtransport_mqtt_common.c	Wed Dec 14 15:59:37 2016 -0800
@@ -95,8 +95,6 @@
     { "iothub-ack", 10 }
 };
 
-static TICK_COUNTER_HANDLE g_msgTickCounter;
-
 typedef enum DEVICE_TWIN_MSG_TYPE_TAG
 {
     REPORTED_STATE,
@@ -190,11 +188,12 @@
     bool device_twin_get_sent;
     bool isRecoverableError;
     uint16_t keepAliveValue;
-    uint64_t mqtt_connect_time;
+    tickcounter_ms_t mqtt_connect_time;
     size_t connectFailCount;
-    uint64_t connectTick;
+    tickcounter_ms_t connectTick;
     bool log_trace;
     bool raw_trace;
+    TICK_COUNTER_HANDLE msgTickCounter;
 
     // Internal lists for message tracking
     PDLIST_ENTRY waitingToSend;
@@ -212,7 +211,7 @@
 
 typedef struct MQTT_DEVICE_TWIN_ITEM_TAG
 {
-    uint64_t msgPublishTime;
+    tickcounter_ms_t msgPublishTime;
     size_t retryCount;
     IOTHUB_IDENTITY_TYPE iothub_type;
     uint16_t packet_id;
@@ -224,7 +223,7 @@
 
 typedef struct MQTT_MESSAGE_DETAILS_LIST_TAG
 {
-    uint64_t msgPublishTime;
+    tickcounter_ms_t msgPublishTime;
     size_t retryCount;
     IOTHUB_MESSAGE_LIST* iotHubMessageEntry;
     void* context;
@@ -812,7 +811,7 @@
         }
         else
         {
-            if (tickcounter_get_current_ms(g_msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0)
+            if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0)
             {
                 LogError("Failed retrieving tickcounter info");
                 result = __LINE__;
@@ -950,7 +949,7 @@
         }
         else
         {
-            if (tickcounter_get_current_ms(g_msgTickCounter, &mqtt_info->msgPublishTime) != 0)
+            if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgPublishTime) != 0)
             {
                 LogError("Failed retrieving tickcounter info");
                 result = __LINE__;
@@ -1341,24 +1340,24 @@
         }
         transport_data->isConnected = false;
         transport_data->currPacketState = PACKET_TYPE_ERROR;
-                transport_data->device_twin_get_sent = false;
+        transport_data->device_twin_get_sent = false;
         if (transport_data->topic_MqttMessage != NULL)
         {
-                transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
-                }
-                if (transport_data->topic_GetState != NULL)
-                {
-                    transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
-                }
-                if (transport_data->topic_NotifyState != NULL)
-                {
-                    transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
-                }
-                if (transport_data->topic_DeviceMethods != NULL)
-                {
-                    transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
+            transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
+        }
+        if (transport_data->topic_GetState != NULL)
+        {
+            transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
         }
-     }
+        if (transport_data->topic_NotifyState != NULL)
+        {
+            transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
+        }
+        if (transport_data->topic_DeviceMethods != NULL)
+        {
+            transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
+        }
+    }
     else
     {
         LogError("Failure: mqtt called back with null context.");
@@ -1557,7 +1556,7 @@
                 }
                 else
                 {
-                    (void)tickcounter_get_current_ms(g_msgTickCounter, &transport_data->mqtt_connect_time);
+                    (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->mqtt_connect_time);
                     result = 0;
                 }
             }
@@ -1584,7 +1583,7 @@
         // to back off the connecting to the server
         if (!transport_data->isConnected && transport_data->isRecoverableError && CanRetry(transport_data->retryLogic))
         {
-            if (tickcounter_get_current_ms(g_msgTickCounter, &transport_data->connectTick) != 0)
+            if (tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->connectTick) != 0)
             {
                 transport_data->connectFailCount++;
                 result = __LINE__;
@@ -1608,8 +1607,8 @@
         if (transport_data->isConnected)
         {
             // We are isConnected and not being closed, so does SAS need to reconnect?
-            uint64_t current_time;
-            if (tickcounter_get_current_ms(g_msgTickCounter, &current_time) != 0)
+            tickcounter_ms_t current_time;
+            if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
             {
                 transport_data->connectFailCount++;
                 result = __LINE__;
@@ -1705,9 +1704,16 @@
     {
         LogError("Could not create MQTT transport state. Memory allocation failed.");
     }
+    else if ((state->msgTickCounter = tickcounter_create()) == NULL)
+    {
+        LogError("Invalid Argument: iotHubName is empty");
+        free(state);
+        state = NULL;
+    }
     else if ((state->device_id = STRING_construct(upperConfig->deviceId)) == NULL)
     {
         LogError("failure constructing device_id.");
+        tickcounter_destroy(state->msgTickCounter);
         free(state);
         state = NULL;
     }
@@ -1716,6 +1722,7 @@
         if (construct_credential_information(upperConfig, state) != 0)
         {
             STRING_delete(state->device_id);
+            tickcounter_destroy(state->msgTickCounter);
             free(state);
             state = NULL;
         }
@@ -1732,6 +1739,7 @@
                 STRING_delete(state->transport_creds.CREDENTIAL_VALUE.deviceSasToken);
             }
             STRING_delete(state->device_id);
+            tickcounter_destroy(state->msgTickCounter);
             free(state);
             state = NULL;
         }
@@ -1752,6 +1760,7 @@
                 }
                 STRING_delete(state->topic_MqttEvent);
                 STRING_delete(state->device_id);
+                tickcounter_destroy(state->msgTickCounter);
                 free(state);
                 state = NULL;
             }
@@ -1782,6 +1791,7 @@
                     mqtt_client_deinit(state->mqttClient);
                     STRING_delete(state->topic_MqttEvent);
                     STRING_delete(state->device_id);
+                    tickcounter_destroy(state->msgTickCounter);
                     free(state);
                     state = NULL;
                 }
@@ -1800,6 +1810,7 @@
                     STRING_delete(state->hostAddress);
                     STRING_delete(state->topic_MqttEvent);
                     STRING_delete(state->device_id);
+                    tickcounter_destroy(state->msgTickCounter);
                     free(state);
                     state = NULL;
                 }
@@ -1892,19 +1903,10 @@
         LogError("Invalid Argument: iotHubName is empty");
         result = NULL;
     }
-    else if ((g_msgTickCounter = tickcounter_create()) == NULL)
-    {
-        LogError("Invalid Argument: iotHubName is empty");
-        result = NULL;
-    }
     else
     {
         result = InitializeTransportHandleData(config->upperConfig, config->waitingToSend);
-        if (result == NULL)
-        {
-            tickcounter_destroy(g_msgTickCounter);
-        }
-        else
+        if (result != NULL)
         {
             result->get_io_transport = get_io_transport;
         }
@@ -1975,7 +1977,7 @@
         STRING_delete(transport_data->topic_NotifyState);
         STRING_delete(transport_data->topic_DeviceMethods);
 
-        tickcounter_destroy(g_msgTickCounter);
+        tickcounter_destroy(transport_data->msgTickCounter);
         DestroyRetryLogic(transport_data->retryLogic);
         free(transport_data);
     }
@@ -2321,8 +2323,8 @@
                     DLIST_ENTRY nextListEntry;
                     nextListEntry.Flink = currentListEntry->Flink;
 
-                    uint64_t current_ms;
-                    (void)tickcounter_get_current_ms(g_msgTickCounter, &current_ms);
+                    tickcounter_ms_t current_ms;
+                    (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms);
                     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_033: [IoTHubTransport_MQTT_Common_DoWork shall iterate through the Waiting Acknowledge messages looking for any message that has been waiting longer than 2 min.]*/
                     if (((current_ms - mqttMsgEntry->msgPublishTime) / 1000) > RESEND_TIMEOUT_VALUE_MIN)
                     {