Microsoft Azure IoTHub client MQTT transport
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: iothubtransport_mqtt_common.c
- Revision:
- 17:774695cb8554
- Parent:
- 16:14640ee83e99
- Child:
- 18:ec8e5e97c6a4
--- a/iothubtransport_mqtt_common.c Tue Jan 24 15:23:25 2017 -0800 +++ b/iothubtransport_mqtt_common.c Sat Jan 28 09:34:14 2017 -0800 @@ -43,8 +43,7 @@ #define FAILED_CONN_BACKOFF_VALUE 5 #define STATUS_CODE_FAILURE_VALUE 500 #define STATUS_CODE_TIMEOUT_VALUE 408 -#define ERROR_TIME_FOR_RETRY_SECS 5 -#define WAIT_TIME_SECS (ERROR_TIME_FOR_RETRY_SECS - 1) +#define ERROR_TIME_FOR_RETRY_SECS 5 // We won't retry more than once every 5 seconds static const char TOPIC_DEVICE_TWIN_PREFIX[] = "$iothub/twin"; static const char TOPIC_DEVICE_METHOD_PREFIX[] = "$iothub/methods"; @@ -75,6 +74,8 @@ #define SUBSCRIBE_DEVICE_METHOD_TOPIC 0x0010 #define SUBSCRIBE_TOPIC_COUNT 4 +DEFINE_ENUM_STRINGS(MQTT_CLIENT_EVENT_ERROR, MQTT_CLIENT_EVENT_ERROR_VALUES) + typedef struct SYSTEM_PROPERTY_INFO_TAG { const char* propName; @@ -110,17 +111,16 @@ typedef struct RETRY_LOGIC_TAG { - IOTHUB_CLIENT_RETRY_POLICY retryPolicy; - size_t retryTimeoutLimitInSeconds; - RETRY_POLICY fnRetryPolicy; - time_t start; - time_t stop; - time_t lastConnect; - bool retryStarted; - bool retryExpired; - bool firstAttempt; - size_t retrycount; - size_t delayFromLastConnectToRetry; + IOTHUB_CLIENT_RETRY_POLICY retryPolicy; // Type of policy we're using + size_t retryTimeoutLimitInSeconds; // If we don't connect in this many seconds, give up even trying. + RETRY_POLICY fnRetryPolicy; // Pointer to the policy function + time_t start; // When did we start retrying? + time_t lastConnect; // When did we last try to connect? + bool retryStarted; // true if the retry timer is set and we're trying to retry + bool retryExpired; // true if we haven't connected in retryTimeoutLimitInSeconds seconds + bool firstAttempt; // true on init so we can connect the first time without waiting for any timeouts. + size_t retrycount; // How many times have we tried connecting? + size_t delayFromLastConnectToRetry; // last time delta betweewn retry attempts. } RETRY_LOGIC; typedef struct MQTT_TRANSPORT_CREDENTIALS_TAG @@ -309,12 +309,11 @@ break; } retryLogic->start = EPOCH_TIME_T_VALUE; - retryLogic->stop = EPOCH_TIME_T_VALUE; retryLogic->delayFromLastConnectToRetry = 0; retryLogic->lastConnect = EPOCH_TIME_T_VALUE; retryLogic->retryStarted = false; retryLogic->retryExpired = false; - retryLogic->firstAttempt = false; + retryLogic->firstAttempt = true; retryLogic->retrycount = 0; } else @@ -345,10 +344,8 @@ retryLogic->retryStarted = true; } - if (retryLogic->firstAttempt == false) - { - retryLogic->firstAttempt = true; - } + retryLogic->firstAttempt = false + ; } else { @@ -363,7 +360,6 @@ { if (retryLogic->retryStarted == true) { - retryLogic->stop = get_time(NULL); retryLogic->retryStarted = false; retryLogic->delayFromLastConnectToRetry = 0; retryLogic->lastConnect = EPOCH_TIME_T_VALUE; @@ -430,18 +426,19 @@ LogError("Retry Logic is not created, retrying forever"); result = true; } - else if (now < 0 || retryLogic->start < 0 || retryLogic->stop < 0) + else if (now < 0 || retryLogic->start < 0) { LogError("Time could not be retrieved, retrying forever"); result = true; } else if (retryLogic->retryExpired) { + // We've given up trying to retry. Don't do anything. result = false; } - else if (retryLogic->firstAttempt == false) + else if (retryLogic->firstAttempt) { - // try now + // This is the first time ever running through this code. We need to try connecting no matter what. StartRetryTimer(retryLogic); retryLogic->lastConnect = now; retryLogic->retrycount++; @@ -449,44 +446,49 @@ } else { + // Are we trying to retry? if (retryLogic->retryStarted) { + // How long since we last tried to connect? Store this in difftime. double diffTime = get_difftime(now, retryLogic->lastConnect); + + // Has it been less than 5 seconds since we tried last? If so, we have to + // be careful so we don't hit the server too quickly. if (diffTime <= ERROR_TIME_FOR_RETRY_SECS) { - // Just right time to retry - if(diffTime > WAIT_TIME_SECS) - { - retryLogic->lastConnect = now; - retryLogic->retrycount++; - result = true; - } - else - { - // As do_work can be called within as little as 1 ms, wait to avoid throtling server - result = false; - } + // As do_work can be called within as little as 1 ms, wait to avoid throtling server + result = false; } else if (diffTime < retryLogic->delayFromLastConnectToRetry) { + // delayFromLastConnectionToRetry is either 0 (the first time around) + // or it's the backoff delta from the last time through the loop. + // If we're less than that, don't even bother trying. It's // Too early to retry result = false; } else { // last retry time evaluated have crossed, determine when to try next + // In other words, it migth be time to retry, so we should validate with the retry policy function. bool permit = false; size_t delay; if (retryLogic->fnRetryPolicy != NULL && (retryLogic->fnRetryPolicy(&permit, &delay, retryLogic) == 0)) { + // Does the policy function want us to retry (permit == true), or are we still allowed to retry? + // (in other words, are we within retryTimeoutLimitInSeconds seconds since starting to retry?) + // If so, see if we _really_ want to retry. if ((permit == true) && ((retryLogic->retryTimeoutLimitInSeconds == 0) || retryLogic->retryTimeoutLimitInSeconds >= (delay + get_difftime(now, retryLogic->start)))) { retryLogic->delayFromLastConnectToRetry = delay; LogInfo("Evaluated delay %d at %d attempt to retry\n", delay, retryLogic->retrycount); - if (retryLogic->delayFromLastConnectToRetry <= ERROR_TIME_FOR_RETRY_SECS) + // If the retry policy is telling us to connect right away ( <= ERROR_TIME_FOR_RETRY_SECS), + // or if enough time has elapsed, then we retry. + if ((retryLogic->delayFromLastConnectToRetry <= ERROR_TIME_FOR_RETRY_SECS) || + (diffTime >= retryLogic->delayFromLastConnectToRetry)) { retryLogic->lastConnect = now; retryLogic->retrycount++; @@ -494,12 +496,13 @@ } else { + // To soon to retry according to policy. result = false; } } else { - // Retry expired + // Retry expired. Stop trying. LogError("Retry timeout expired after %d attempts", retryLogic->retrycount); retryLogic->retryExpired = true; StopRetryTimer(retryLogic); @@ -508,6 +511,7 @@ } else { + // We don't have a retry policy. Sorry, can't even guess. Don't bother even trying to retry. LogError("Cannot evaluate the next best time to retry"); result = false; } @@ -515,7 +519,9 @@ } else { - // start retry when connection breaks + // Since this function is only called when the connection is + // already broken, we can start doing the rety logic. We'll do the + // actual interval checking next time around this loop. StartRetryTimer(retryLogic); //wait for next do work to evaluate next best attempt result = false; @@ -1342,6 +1348,10 @@ transport_data->currPacketState = DISCONNECT_TYPE; break; } + case MQTT_CLIENT_ON_UNSUBSCRIBE_ACK: + { + break; + } } } } @@ -1364,6 +1374,13 @@ LogError("Mqtt Ping Response was not encountered. Reconnecting device..."); break; } + case MQTT_CLIENT_PARSE_ERROR: + case MQTT_CLIENT_MEMORY_ERROR: + case MQTT_CLIENT_COMMUNICATION_ERROR: + case MQTT_CLIENT_UNKNOWN_ERROR: + { + LogError("INTERNAL ERROR: unexpected error value received %s", ENUM_TO_STRING(MQTT_CLIENT_EVENT_ERROR, error)); + } } transport_data->isConnected = false; transport_data->currPacketState = PACKET_TYPE_ERROR;