Microsoft Azure IoTHub client MQTT transport
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: iothubtransport_mqtt_common.c
- Revision:
- 24:4096249decf1
- Parent:
- 23:84f4c36da8c1
- Child:
- 25:a35763780a87
--- a/iothubtransport_mqtt_common.c Mon May 22 10:34:57 2017 -0700 +++ b/iothubtransport_mqtt_common.c Fri Jun 30 10:40:56 2017 -0700 @@ -209,6 +209,7 @@ int http_proxy_port; char* http_proxy_username; char* http_proxy_password; + bool isProductInfoSet; } MQTTTRANSPORT_HANDLE_DATA, *PMQTTTRANSPORT_HANDLE_DATA; typedef struct MQTT_DEVICE_TWIN_ITEM_TAG @@ -508,7 +509,7 @@ { retryLogic->delayFromLastConnectToRetry = delay; - LogInfo("Evaluated delay %d at %d attempt to retry\n", delay, retryLogic->retrycount); + LogInfo("Evaluated delay time %d sec. Retry attempt count %d\n", delay, retryLogic->retrycount); // If the retry policy is telling us to connect right away ( <= ERROR_TIME_FOR_RETRY_SECS), // or if enough time has elapsed, then we retry. @@ -1719,20 +1720,41 @@ if (result == 0) { - void* product_info; - STRING_HANDLE clone; - if ((IoTHubClient_LL_GetOption(transport_data->llClientHandle, OPTION_PRODUCT_INFO, &product_info) == IOTHUB_CLIENT_ERROR) || (product_info == NULL)) + if (!transport_data->isProductInfoSet) { - clone = STRING_construct_sprintf("%s%%2F%s", CLIENT_DEVICE_TYPE_PREFIX, IOTHUB_SDK_VERSION); - } - else - { - clone = URL_Encode(product_info); - } - if (clone != NULL) - { - (void)STRING_concat_with_STRING(transport_data->configPassedThroughUsername, clone); - STRING_delete(clone); + // This requires the iothubClientHandle, which sadly the MQTT transport only gets on DoWork, so this code still needs to remain here. + // The correct place for this would be in the Create method, but we don't get the client handle there. + // Also, when device multiplexing is used, the customer creates the transport directly and explicitly, when the client is still not created. + // This will be a major hurdle when we add device multiplexing to MQTT transport. + + void* product_info; + STRING_HANDLE clone; + if ((IoTHubClient_LL_GetOption(transport_data->llClientHandle, OPTION_PRODUCT_INFO, &product_info) == IOTHUB_CLIENT_ERROR) || (product_info == NULL)) + { + clone = STRING_construct_sprintf("%s%%2F%s", CLIENT_DEVICE_TYPE_PREFIX, IOTHUB_SDK_VERSION); + } + else + { + clone = URL_Encode(product_info); + } + + if (clone == NULL) + { + LogError("Failed obtaining the product info"); + } + else + { + if (STRING_concat_with_STRING(transport_data->configPassedThroughUsername, clone) != 0) + { + LogError("Failed concatenating the product info"); + } + else + { + transport_data->isProductInfoSet = true; + } + + STRING_delete(clone); + } } MQTT_CLIENT_OPTIONS options = { 0 }; @@ -1751,7 +1773,7 @@ { if (mqtt_client_connect(transport_data->mqttClient, transport_data->xioTransport, &options) != 0) { - LogError("failure connecting to address %s:%d.", STRING_c_str(transport_data->hostAddress), transport_data->portNum); + LogError("failure connecting to address %s.", STRING_c_str(transport_data->hostAddress)); result = __FAILURE__; } else @@ -1991,6 +2013,7 @@ state->retryLogic = NULL; srand((unsigned int)get_time(NULL)); state->authorization_module = auth_module; + state->isProductInfoSet = false; } } } @@ -2497,21 +2520,36 @@ PDLIST_ENTRY currentListEntry = transport_data->telemetry_waitingForAck.Flink; while (currentListEntry != &transport_data->telemetry_waitingForAck) { + tickcounter_ms_t current_ms; MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = containingRecord(currentListEntry, MQTT_MESSAGE_DETAILS_LIST, entry); DLIST_ENTRY nextListEntry; nextListEntry.Flink = currentListEntry->Flink; - tickcounter_ms_t current_ms; (void)tickcounter_get_current_ms(transport_data->msgTickCounter, ¤t_ms); /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_033: [IoTHubTransport_MQTT_Common_DoWork shall iterate through the Waiting Acknowledge messages looking for any message that has been waiting longer than 2 min.]*/ if (((current_ms - mqttMsgEntry->msgPublishTime) / 1000) > RESEND_TIMEOUT_VALUE_MIN) { - /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_034: [If IoTHubTransport_MQTT_Common_DoWork has resent the message two times then it shall fail the message] */ + /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_034: [If IoTHubTransport_MQTT_Common_DoWork has resent the message two times then it shall fail the message and reconnect to IoTHub ... ] */ if (mqttMsgEntry->retryCount >= MAX_SEND_RECOUNT_LIMIT) { + PDLIST_ENTRY current_entry; (void)DList_RemoveEntryList(currentListEntry); sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT); free(mqttMsgEntry); + + transport_data->currPacketState = PACKET_TYPE_ERROR; + transport_data->device_twin_get_sent = false; + DisconnectFromClient(transport_data); + + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_057: [ ... then go through all the rest of the waiting messages and reset the retryCount on the message. ]*/ + current_entry = transport_data->telemetry_waitingForAck.Flink; + while (current_entry != &transport_data->telemetry_waitingForAck) + { + MQTT_MESSAGE_DETAILS_LIST* msg_reset_entry; + msg_reset_entry = containingRecord(current_entry, MQTT_MESSAGE_DETAILS_LIST, entry); + msg_reset_entry->retryCount = 0; + current_entry = current_entry->Flink; + } } else {