Microsoft Azure IoTHub client MQTT transport
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: iothubtransport_mqtt_common.c
- Revision:
- 13:606465879c57
- Parent:
- 12:658ca6865de2
- Child:
- 14:4dc2b011be33
--- a/iothubtransport_mqtt_common.c Wed Nov 16 21:37:42 2016 -0800 +++ b/iothubtransport_mqtt_common.c Wed Dec 14 15:59:37 2016 -0800 @@ -95,8 +95,6 @@ { "iothub-ack", 10 } }; -static TICK_COUNTER_HANDLE g_msgTickCounter; - typedef enum DEVICE_TWIN_MSG_TYPE_TAG { REPORTED_STATE, @@ -190,11 +188,12 @@ bool device_twin_get_sent; bool isRecoverableError; uint16_t keepAliveValue; - uint64_t mqtt_connect_time; + tickcounter_ms_t mqtt_connect_time; size_t connectFailCount; - uint64_t connectTick; + tickcounter_ms_t connectTick; bool log_trace; bool raw_trace; + TICK_COUNTER_HANDLE msgTickCounter; // Internal lists for message tracking PDLIST_ENTRY waitingToSend; @@ -212,7 +211,7 @@ typedef struct MQTT_DEVICE_TWIN_ITEM_TAG { - uint64_t msgPublishTime; + tickcounter_ms_t msgPublishTime; size_t retryCount; IOTHUB_IDENTITY_TYPE iothub_type; uint16_t packet_id; @@ -224,7 +223,7 @@ typedef struct MQTT_MESSAGE_DETAILS_LIST_TAG { - uint64_t msgPublishTime; + tickcounter_ms_t msgPublishTime; size_t retryCount; IOTHUB_MESSAGE_LIST* iotHubMessageEntry; void* context; @@ -812,7 +811,7 @@ } else { - if (tickcounter_get_current_ms(g_msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0) + if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0) { LogError("Failed retrieving tickcounter info"); result = __LINE__; @@ -950,7 +949,7 @@ } else { - if (tickcounter_get_current_ms(g_msgTickCounter, &mqtt_info->msgPublishTime) != 0) + if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgPublishTime) != 0) { LogError("Failed retrieving tickcounter info"); result = __LINE__; @@ -1341,24 +1340,24 @@ } transport_data->isConnected = false; transport_data->currPacketState = PACKET_TYPE_ERROR; - transport_data->device_twin_get_sent = false; + transport_data->device_twin_get_sent = false; if (transport_data->topic_MqttMessage != NULL) { - transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC; - } - if (transport_data->topic_GetState != NULL) - { - transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC; - } - if (transport_data->topic_NotifyState != NULL) - { - transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC; - } - if (transport_data->topic_DeviceMethods != NULL) - { - transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC; + transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC; + } + if (transport_data->topic_GetState != NULL) + { + transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC; } - } + if (transport_data->topic_NotifyState != NULL) + { + transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC; + } + if (transport_data->topic_DeviceMethods != NULL) + { + transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC; + } + } else { LogError("Failure: mqtt called back with null context."); @@ -1557,7 +1556,7 @@ } else { - (void)tickcounter_get_current_ms(g_msgTickCounter, &transport_data->mqtt_connect_time); + (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->mqtt_connect_time); result = 0; } } @@ -1584,7 +1583,7 @@ // to back off the connecting to the server if (!transport_data->isConnected && transport_data->isRecoverableError && CanRetry(transport_data->retryLogic)) { - if (tickcounter_get_current_ms(g_msgTickCounter, &transport_data->connectTick) != 0) + if (tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->connectTick) != 0) { transport_data->connectFailCount++; result = __LINE__; @@ -1608,8 +1607,8 @@ if (transport_data->isConnected) { // We are isConnected and not being closed, so does SAS need to reconnect? - uint64_t current_time; - if (tickcounter_get_current_ms(g_msgTickCounter, ¤t_time) != 0) + tickcounter_ms_t current_time; + if (tickcounter_get_current_ms(transport_data->msgTickCounter, ¤t_time) != 0) { transport_data->connectFailCount++; result = __LINE__; @@ -1705,9 +1704,16 @@ { LogError("Could not create MQTT transport state. Memory allocation failed."); } + else if ((state->msgTickCounter = tickcounter_create()) == NULL) + { + LogError("Invalid Argument: iotHubName is empty"); + free(state); + state = NULL; + } else if ((state->device_id = STRING_construct(upperConfig->deviceId)) == NULL) { LogError("failure constructing device_id."); + tickcounter_destroy(state->msgTickCounter); free(state); state = NULL; } @@ -1716,6 +1722,7 @@ if (construct_credential_information(upperConfig, state) != 0) { STRING_delete(state->device_id); + tickcounter_destroy(state->msgTickCounter); free(state); state = NULL; } @@ -1732,6 +1739,7 @@ STRING_delete(state->transport_creds.CREDENTIAL_VALUE.deviceSasToken); } STRING_delete(state->device_id); + tickcounter_destroy(state->msgTickCounter); free(state); state = NULL; } @@ -1752,6 +1760,7 @@ } STRING_delete(state->topic_MqttEvent); STRING_delete(state->device_id); + tickcounter_destroy(state->msgTickCounter); free(state); state = NULL; } @@ -1782,6 +1791,7 @@ mqtt_client_deinit(state->mqttClient); STRING_delete(state->topic_MqttEvent); STRING_delete(state->device_id); + tickcounter_destroy(state->msgTickCounter); free(state); state = NULL; } @@ -1800,6 +1810,7 @@ STRING_delete(state->hostAddress); STRING_delete(state->topic_MqttEvent); STRING_delete(state->device_id); + tickcounter_destroy(state->msgTickCounter); free(state); state = NULL; } @@ -1892,19 +1903,10 @@ LogError("Invalid Argument: iotHubName is empty"); result = NULL; } - else if ((g_msgTickCounter = tickcounter_create()) == NULL) - { - LogError("Invalid Argument: iotHubName is empty"); - result = NULL; - } else { result = InitializeTransportHandleData(config->upperConfig, config->waitingToSend); - if (result == NULL) - { - tickcounter_destroy(g_msgTickCounter); - } - else + if (result != NULL) { result->get_io_transport = get_io_transport; } @@ -1975,7 +1977,7 @@ STRING_delete(transport_data->topic_NotifyState); STRING_delete(transport_data->topic_DeviceMethods); - tickcounter_destroy(g_msgTickCounter); + tickcounter_destroy(transport_data->msgTickCounter); DestroyRetryLogic(transport_data->retryLogic); free(transport_data); } @@ -2321,8 +2323,8 @@ DLIST_ENTRY nextListEntry; nextListEntry.Flink = currentListEntry->Flink; - uint64_t current_ms; - (void)tickcounter_get_current_ms(g_msgTickCounter, ¤t_ms); + tickcounter_ms_t current_ms; + (void)tickcounter_get_current_ms(transport_data->msgTickCounter, ¤t_ms); /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_033: [IoTHubTransport_MQTT_Common_DoWork shall iterate through the Waiting Acknowledge messages looking for any message that has been waiting longer than 2 min.]*/ if (((current_ms - mqttMsgEntry->msgPublishTime) / 1000) > RESEND_TIMEOUT_VALUE_MIN) {