Microsoft Azure IoTHub client MQTT transport

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
24:4096249decf1
Parent:
23:84f4c36da8c1
Child:
25:a35763780a87
--- a/iothubtransport_mqtt_common.c	Mon May 22 10:34:57 2017 -0700
+++ b/iothubtransport_mqtt_common.c	Fri Jun 30 10:40:56 2017 -0700
@@ -209,6 +209,7 @@
     int http_proxy_port;
     char* http_proxy_username;
     char* http_proxy_password;
+    bool isProductInfoSet;
 } MQTTTRANSPORT_HANDLE_DATA, *PMQTTTRANSPORT_HANDLE_DATA;
 
 typedef struct MQTT_DEVICE_TWIN_ITEM_TAG
@@ -508,7 +509,7 @@
                     {
                         retryLogic->delayFromLastConnectToRetry = delay;
 
-                        LogInfo("Evaluated delay %d at %d attempt to retry\n", delay, retryLogic->retrycount);
+                        LogInfo("Evaluated delay time %d sec.  Retry attempt count %d\n", delay, retryLogic->retrycount);
 
                         // If the retry policy is telling us to connect right away ( <= ERROR_TIME_FOR_RETRY_SECS),
                         // or if enough time has elapsed, then we retry.
@@ -1719,20 +1720,41 @@
 
     if (result == 0)
     {
-        void* product_info;
-        STRING_HANDLE clone;
-        if ((IoTHubClient_LL_GetOption(transport_data->llClientHandle, OPTION_PRODUCT_INFO, &product_info) == IOTHUB_CLIENT_ERROR) || (product_info == NULL))
+        if (!transport_data->isProductInfoSet)
         {
-            clone = STRING_construct_sprintf("%s%%2F%s", CLIENT_DEVICE_TYPE_PREFIX, IOTHUB_SDK_VERSION);
-        }
-        else
-        {
-            clone = URL_Encode(product_info);
-        }
-        if (clone != NULL)
-        {
-            (void)STRING_concat_with_STRING(transport_data->configPassedThroughUsername, clone);
-            STRING_delete(clone);
+            // This requires the iothubClientHandle, which sadly the MQTT transport only gets on DoWork, so this code still needs to remain here.
+            // The correct place for this would be in the Create method, but we don't get the client handle there.
+            // Also, when device multiplexing is used, the customer creates the transport directly and explicitly, when the client is still not created.
+            // This will be a major hurdle when we add device multiplexing to MQTT transport.
+
+            void* product_info;
+            STRING_HANDLE clone;
+            if ((IoTHubClient_LL_GetOption(transport_data->llClientHandle, OPTION_PRODUCT_INFO, &product_info) == IOTHUB_CLIENT_ERROR) || (product_info == NULL))
+            {
+                clone = STRING_construct_sprintf("%s%%2F%s", CLIENT_DEVICE_TYPE_PREFIX, IOTHUB_SDK_VERSION);
+            }
+            else
+            {
+                clone = URL_Encode(product_info);
+            }
+
+            if (clone == NULL)
+            {
+                LogError("Failed obtaining the product info");
+            }
+            else
+            {
+                if (STRING_concat_with_STRING(transport_data->configPassedThroughUsername, clone) != 0)
+                {
+                    LogError("Failed concatenating the product info");
+                }
+                else
+                {
+                    transport_data->isProductInfoSet = true;
+                }
+
+                STRING_delete(clone);
+            }
         }
 
         MQTT_CLIENT_OPTIONS options = { 0 };
@@ -1751,7 +1773,7 @@
         {
             if (mqtt_client_connect(transport_data->mqttClient, transport_data->xioTransport, &options) != 0)
             {
-                LogError("failure connecting to address %s:%d.", STRING_c_str(transport_data->hostAddress), transport_data->portNum);
+                LogError("failure connecting to address %s.", STRING_c_str(transport_data->hostAddress));
                 result = __FAILURE__;
             }
             else
@@ -1991,6 +2013,7 @@
                         state->retryLogic = NULL;
                         srand((unsigned int)get_time(NULL));
                         state->authorization_module = auth_module;
+                        state->isProductInfoSet = false;
                     }
                 }
             }
@@ -2497,21 +2520,36 @@
                 PDLIST_ENTRY currentListEntry = transport_data->telemetry_waitingForAck.Flink;
                 while (currentListEntry != &transport_data->telemetry_waitingForAck)
                 {
+                    tickcounter_ms_t current_ms;
                     MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = containingRecord(currentListEntry, MQTT_MESSAGE_DETAILS_LIST, entry);
                     DLIST_ENTRY nextListEntry;
                     nextListEntry.Flink = currentListEntry->Flink;
 
-                    tickcounter_ms_t current_ms;
                     (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms);
                     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_033: [IoTHubTransport_MQTT_Common_DoWork shall iterate through the Waiting Acknowledge messages looking for any message that has been waiting longer than 2 min.]*/
                     if (((current_ms - mqttMsgEntry->msgPublishTime) / 1000) > RESEND_TIMEOUT_VALUE_MIN)
                     {
-                        /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_034: [If IoTHubTransport_MQTT_Common_DoWork has resent the message two times then it shall fail the message] */
+                        /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_034: [If IoTHubTransport_MQTT_Common_DoWork has resent the message two times then it shall fail the message and reconnect to IoTHub ... ] */
                         if (mqttMsgEntry->retryCount >= MAX_SEND_RECOUNT_LIMIT)
                         {
+                            PDLIST_ENTRY current_entry;
                             (void)DList_RemoveEntryList(currentListEntry);
                             sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT);
                             free(mqttMsgEntry);
+
+                            transport_data->currPacketState = PACKET_TYPE_ERROR;
+                            transport_data->device_twin_get_sent = false;
+                            DisconnectFromClient(transport_data);
+
+                            /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_057: [ ... then go through all the rest of the waiting messages and reset the retryCount on the message. ]*/
+                            current_entry = transport_data->telemetry_waitingForAck.Flink;
+                            while (current_entry != &transport_data->telemetry_waitingForAck)
+                            {
+                                MQTT_MESSAGE_DETAILS_LIST* msg_reset_entry;
+                                msg_reset_entry = containingRecord(current_entry, MQTT_MESSAGE_DETAILS_LIST, entry);
+                                msg_reset_entry->retryCount = 0;
+                                current_entry = current_entry->Flink;
+                            }
                         }
                         else
                         {