Microsoft Azure IoTHub client MQTT transport

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
41:410450f16a9f
Parent:
40:cb03d6a6f46d
--- a/iothubtransport_mqtt_common.c	Tue Jun 26 19:14:02 2018 -0700
+++ b/iothubtransport_mqtt_common.c	Tue Sep 11 11:12:42 2018 -0700
@@ -11,6 +11,7 @@
 #include "azure_c_shared_utility/doublylinkedlist.h"
 #include "azure_c_shared_utility/crt_abstractions.h"
 #include "azure_c_shared_utility/agenttime.h"
+#include "azure_c_shared_utility/threadapi.h"
 
 #include "iothub_client_core_ll.h"
 #include "iothub_client_options.h"
@@ -52,6 +53,7 @@
 
 #define DEFAULT_RETRY_POLICY                IOTHUB_CLIENT_RETRY_EXPONENTIAL_BACKOFF_WITH_JITTER
 #define DEFAULT_RETRY_TIMEOUT_IN_SECONDS    0
+#define MAX_DISCONNECT_VALUE                50
 
 static const char TOPIC_DEVICE_TWIN_PREFIX[] = "$iothub/twin";
 static const char TOPIC_DEVICE_METHOD_PREFIX[] = "$iothub/methods";
@@ -165,6 +167,7 @@
     STRING_HANDLE module_id;
     STRING_HANDLE devicesAndModulesPath;
     int portNum;
+    bool conn_attempted;
 
     MQTT_GET_IO_TRANSPORT get_io_transport;
 
@@ -179,18 +182,20 @@
     // Upper layer
     IOTHUB_CLIENT_CORE_LL_HANDLE llClientHandle;
 
-    // Protocol 
+    // Protocol
     MQTT_CLIENT_HANDLE mqttClient;
     XIO_HANDLE xioTransport;
 
     // Session - connection
     uint16_t packetId;
+    uint16_t twin_resp_packet_id;
 
     // Connection state control
     bool isRegistered;
     MQTT_CLIENT_STATUS mqttClientStatus;
     bool isDestroyCalled;
     bool device_twin_get_sent;
+    bool twin_resp_sub_recv;
     bool isRecoverableError;
     uint16_t keepAliveValue;
     uint16_t connect_timeout_in_sec;
@@ -300,7 +305,7 @@
     set_saved_tls_options(transport_data, NULL);
 
     tickcounter_destroy(transport_data->msgTickCounter);
-    
+
     free_proxy_data(transport_data);
 
     STRING_delete(transport_data->devicesAndModulesPath);
@@ -314,7 +319,7 @@
     STRING_delete(transport_data->topic_NotifyState);
     STRING_delete(transport_data->topic_DeviceMethods);
     STRING_delete(transport_data->topic_InputQueue);
-    
+
     free(transport_data);
 }
 
@@ -357,7 +362,7 @@
 
 static uint16_t get_next_packet_id(PMQTTTRANSPORT_HANDLE_DATA transport_data)
 {
-    if (transport_data->packetId+1 >= USHRT_MAX)
+    if (transport_data->packetId + 1 >= USHRT_MAX)
     {
         transport_data->packetId = 1;
     }
@@ -368,6 +373,7 @@
     return transport_data->packetId;
 }
 
+#ifndef NO_LOGGING
 static const char* retrieve_mqtt_return_codes(CONNECT_RETURN_CODE rtn_code)
 {
     switch (rtn_code)
@@ -389,6 +395,7 @@
             return "Unknown";
     }
 }
+#endif // NO_LOGGING
 
 static int retrieve_device_method_rid_info(const char* resp_topic, STRING_HANDLE method_name, STRING_HANDLE request_id)
 {
@@ -430,7 +437,7 @@
                         const char* request_id_value = STRING_c_str(token_value);
                         if (memcmp(request_id_value, REQUEST_ID_PROPERTY, request_id_length) == 0)
                         {
-                            if (STRING_concat(request_id, request_id_value+request_id_length) != 0)
+                            if (STRING_concat(request_id, request_id_value + request_id_length) != 0)
                             {
                                 LogError("Failed STRING_concat failed.");
                                 result = __FAILURE__;
@@ -527,7 +534,7 @@
     {
         result = 0;
 
-        while(n-- && result == 0)
+        while (n-- && result == 0)
         {
             if (*s1 == 0) result = -1;
             else if (*s2 == 0) result = 1;
@@ -548,16 +555,16 @@
 static IOTHUB_IDENTITY_TYPE retrieve_topic_type(const char* topic_resp, const char* input_queue)
 {
     IOTHUB_IDENTITY_TYPE type;
-    if (InternStrnicmp(topic_resp, TOPIC_DEVICE_TWIN_PREFIX, sizeof(TOPIC_DEVICE_TWIN_PREFIX)-1) == 0)
+    if (InternStrnicmp(topic_resp, TOPIC_DEVICE_TWIN_PREFIX, sizeof(TOPIC_DEVICE_TWIN_PREFIX) - 1) == 0)
     {
         type = IOTHUB_TYPE_DEVICE_TWIN;
     }
-    else if (InternStrnicmp(topic_resp, TOPIC_DEVICE_METHOD_PREFIX, sizeof(TOPIC_DEVICE_METHOD_PREFIX)-1) == 0)
+    else if (InternStrnicmp(topic_resp, TOPIC_DEVICE_METHOD_PREFIX, sizeof(TOPIC_DEVICE_METHOD_PREFIX) - 1) == 0)
     {
         type = IOTHUB_TYPE_DEVICE_METHODS;
     }
     // input_queue contains additional "#" from subscribe, which we strip off on comparing incoming.
-    else if ((input_queue != NULL) && InternStrnicmp(topic_resp, input_queue, strlen(input_queue)-1) == 0)
+    else if ((input_queue != NULL) && InternStrnicmp(topic_resp, input_queue, strlen(input_queue) - 1) == 0)
     {
         type = IOTHUB_TYPE_EVENT_QUEUE;
     }
@@ -871,7 +878,7 @@
     int result;
     uint16_t packet_id = get_next_packet_id(transport_data);
 
-    STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, STRING_c_str(request_id) );
+    STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, STRING_c_str(request_id));
     if (msg_topic == NULL)
     {
         LogError("Failed constructing message topic.");
@@ -1005,10 +1012,53 @@
     return result;
 }
 
+static void changeStateToSubscribeIfAllowed(PMQTTTRANSPORT_HANDLE_DATA transport_data)
+{
+    if (transport_data->currPacketState != CONNACK_TYPE &&
+        transport_data->currPacketState != CONNECT_TYPE &&
+        transport_data->currPacketState != DISCONNECT_TYPE &&
+        transport_data->currPacketState != PACKET_TYPE_ERROR)
+    {
+        transport_data->currPacketState = SUBSCRIBE_TYPE;
+    }
+}
+
+static int subscribeToNotifyStateIfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
+{
+    int result;
+
+    if (transport_data->topic_NotifyState == NULL)
+    {
+        transport_data->topic_NotifyState = STRING_construct(TOPIC_NOTIFICATION_STATE);
+        if (transport_data->topic_NotifyState == NULL)
+        {
+            LogError("Failure: unable constructing notify state topic");
+            result = __FAILURE__;
+        }
+        else
+        {
+            transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
+            result = 0;
+        }
+    }
+    else
+    {
+        result = 0;
+    }
+
+    if (result == 0)
+    {
+        changeStateToSubscribeIfAllowed(transport_data);
+    }
+
+    return result;
+}
+
+
 static bool isSystemProperty(const char* tokenData)
 {
     bool result = false;
-    size_t propCount = sizeof(sysPropList)/sizeof(sysPropList[0]);
+    size_t propCount = sizeof(sysPropList) / sizeof(sysPropList[0]);
     size_t index = 0;
     for (index = 0; index < propCount; index++)
     {
@@ -1069,7 +1119,7 @@
             result = __FAILURE__;
         }
         STRING_TOKENIZER_destroy(token_handle);
-     }
+    }
 
     return result;
 }
@@ -1078,7 +1128,7 @@
 {
     // Not finding a system property to map to isn't an error.
     int result = 0;
-    
+
     if (nameLen > 4)
     {
         if (strcmp((const char*)&propName[nameLen - 4], CONNECTION_DEVICE_ID) == 0)
@@ -1213,10 +1263,10 @@
                                         }
                                         else
                                         {
-                                            strncpy(propName, tokenData, nameLen);
+                                            memcpy(propName, tokenData, nameLen);
                                             propName[nameLen] = '\0';
 
-                                            strncpy(propValue, iterator + 1, valLen);
+                                            memcpy(propValue, iterator + 1, valLen);
                                             propValue[valLen] = '\0';
 
                                             if (urldecode)
@@ -1270,10 +1320,10 @@
                                         }
                                         else
                                         {
-                                            strncpy(propName, tokenData, nameLen);
+                                            memcpy(propName, tokenData, nameLen);
                                             propName[nameLen] = '\0';
 
-                                            strncpy(propValue, iterator + 1, valLen);
+                                            memcpy(propValue, iterator + 1, valLen);
                                             propValue[valLen] = '\0';
 
                                             if (urldecode)
@@ -1374,6 +1424,8 @@
                                 {
                                     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ If type is IOTHUB_TYPE_DEVICE_TWIN, then on success if msg_type is RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClientCore_LL_RetrievePropertyComplete... ] */
                                     IoTHubClientCore_LL_RetrievePropertyComplete(transportData->llClientHandle, DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length);
+                                    // Only after receiving device twin request should we start listening for patches.
+                                    (void)subscribeToNotifyStateIfNeeded(transportData);
                                 }
                                 else
                                 {
@@ -1397,7 +1449,7 @@
                 }
                 else
                 {
-                    DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO) );
+                    DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO));
                     if (dev_method_info == NULL)
                     {
                         LogError("Failure: allocating DEVICE_METHOD_INFO object");
@@ -1564,7 +1616,7 @@
                         {
                             transport_data->isRecoverableError = false;
                         }
-                        LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieve_mqtt_return_codes(connack->returnCode) );
+                        LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieve_mqtt_return_codes(connack->returnCode));
                         transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_PENDING_CLOSE;
                         transport_data->currPacketState = PACKET_TYPE_ERROR;
                     }
@@ -1590,6 +1642,12 @@
                     }
                     // The connect packet has been acked
                     transport_data->currPacketState = SUBACK_TYPE;
+
+                    // Is this a twin message
+                    if (suback->packetId == transport_data->twin_resp_packet_id)
+                    {
+                        transport_data->twin_resp_sub_recv = true;
+                    }
                 }
                 else
                 {
@@ -1615,17 +1673,17 @@
             default:
             {
                 break;
-            } 
+            }
         }
     }
 }
 
-// Prior to creating a new connection, if we have an existing xioTransport we need to clear 
-// it now or else cached settings (especially TLS when communicating with HTTP proxies) will 
-// break reconnection attempt.  
+// Prior to creating a new connection, if we have an existing xioTransport that has been connected before
+// we need to clear it now or else cached settings (especially TLS when communicating with HTTP proxies)
+// will break reconnection attempt.
 static void ResetConnectionIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data)
 {
-    if (transport_data->xioTransport != NULL)
+    if (transport_data->xioTransport != NULL && transport_data->conn_attempted)
     {
         OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
         set_saved_tls_options(transport_data, options);
@@ -1635,6 +1693,15 @@
     }
 }
 
+static void mqtt_disconnect_cb(void* ctx)
+{
+    if (ctx != NULL)
+    {
+        int* disconn_recv = (int*)ctx;
+        *disconn_recv = 1;
+    }
+}
+
 static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data)
 {
     if (!transport_data->isDestroyCalled)
@@ -1642,8 +1709,19 @@
         OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
         set_saved_tls_options(transport_data, options);
     }
-
-    (void)mqtt_client_disconnect(transport_data->mqttClient, NULL, NULL);
+    // Ensure the disconnect message is sent
+    if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTED)
+    {
+        int disconn_recv = 0;
+        (void)mqtt_client_disconnect(transport_data->mqttClient, mqtt_disconnect_cb, &disconn_recv);
+        size_t disconnect_ctr = 0;
+        do
+        {
+            mqtt_client_dowork(transport_data->mqttClient);
+            disconnect_ctr++;
+            ThreadAPI_Sleep(50);
+        } while ((disconnect_ctr < MAX_DISCONNECT_VALUE) && (disconn_recv == 0));
+    }
     xio_destroy(transport_data->xioTransport);
     transport_data->xioTransport = NULL;
 
@@ -1677,7 +1755,7 @@
             }
             case MQTT_CLIENT_PARSE_ERROR:
             case MQTT_CLIENT_MEMORY_ERROR:
-            case MQTT_CLIENT_UNKNOWN_ERROR: 
+            case MQTT_CLIENT_UNKNOWN_ERROR:
             {
                 LogError("INTERNAL ERROR: unexpected error value received %s", ENUM_TO_STRING(MQTT_CLIENT_EVENT_ERROR, error));
                 break;
@@ -1722,6 +1800,7 @@
     {
         uint32_t topic_subscription = 0;
         size_t subscribe_count = 0;
+        uint16_t packet_id = get_next_packet_id(transport_data);
         SUBSCRIBE_PAYLOAD subscribe[SUBSCRIBE_TOPIC_COUNT];
         if ((transport_data->topic_MqttMessage != NULL) && (SUBSCRIBE_TELEMETRY_TOPIC & transport_data->topics_ToSubscribe))
         {
@@ -1736,6 +1815,7 @@
             subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
             topic_subscription |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
             subscribe_count++;
+            transport_data->twin_resp_packet_id = packet_id;
         }
         if ((transport_data->topic_NotifyState != NULL) && (SUBSCRIBE_NOTIFICATION_STATE_TOPIC & transport_data->topics_ToSubscribe))
         {
@@ -1762,7 +1842,7 @@
         if (subscribe_count != 0)
         {
             /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransport_MQTT_Common_Subscribe shall call mqtt_client_subscribe to subscribe to the Message Topic.] */
-            if (mqtt_client_subscribe(transport_data->mqttClient, get_next_packet_id(transport_data), subscribe, subscribe_count) != 0)
+            if (mqtt_client_subscribe(transport_data->mqttClient, packet_id, subscribe, subscribe_count) != 0)
             {
                 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_017: [Upon failure IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
                 LogError("Failure: mqtt_client_subscribe returned error.");
@@ -1787,37 +1867,41 @@
     }
 }
 
-static const unsigned char* RetrieveMessagePayload(IOTHUB_MESSAGE_HANDLE messageHandle, size_t* length)
+static bool RetrieveMessagePayload(IOTHUB_MESSAGE_HANDLE messageHandle, const unsigned char** payload, size_t* length)
 {
-    const unsigned char* result;
-
+    bool result;
     IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(messageHandle);
     if (contentType == IOTHUBMESSAGE_BYTEARRAY)
     {
-        if (IoTHubMessage_GetByteArray(messageHandle, &result, length) != IOTHUB_MESSAGE_OK)
+        if (IoTHubMessage_GetByteArray(messageHandle, &(*payload), length) != IOTHUB_MESSAGE_OK)
         {
             LogError("Failure result from IoTHubMessage_GetByteArray");
-            result = NULL;
+            result = false;
             *length = 0;
         }
+        else
+        {
+            result = true;
+        }
     }
     else if (contentType == IOTHUBMESSAGE_STRING)
     {
-        result = (const unsigned char*)IoTHubMessage_GetString(messageHandle);
-        if (result == NULL)
+        *payload = (const unsigned char*)IoTHubMessage_GetString(messageHandle);
+        if (*payload == NULL)
         {
             LogError("Failure result from IoTHubMessage_GetString");
-            result = NULL;
+            result = false;
             *length = 0;
         }
         else
         {
-            *length = strlen((const char*)result);
+            *length = strlen((const char*)*payload);
+            result = true;
         }
     }
     else
     {
-        result = NULL;
+        result = false;
         *length = 0;
     }
     return result;
@@ -1848,6 +1932,7 @@
         }
         else
         {
+            transport_data->conn_attempted = true;
             if (transport_data->saved_tls_options != NULL)
             {
                 if (OptionHandler_FeedOptions(transport_data->saved_tls_options, transport_data->xioTransport) != OPTIONHANDLER_OK)
@@ -2047,7 +2132,7 @@
             {
                 result = __FAILURE__;
             }
-                
+
             if (sasToken != NULL)
             {
                 free(sasToken);
@@ -2069,7 +2154,7 @@
         RETRY_ACTION retry_action = RETRY_ACTION_RETRY_LATER;
 
         // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_007: [ IoTHubTransport_MQTT_Common_DoWork shall try to reconnect according to the current retry policy set ]
-        if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_NOT_CONNECTED && transport_data->isRecoverableError && 
+        if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_NOT_CONNECTED && transport_data->isRecoverableError &&
             (retry_control_should_retry(transport_data->retry_control_handle, &retry_action) != 0 || retry_action == RETRY_ACTION_RETRY_NOW))
         {
             // Note: in case retry_control_should_retry fails, the reconnection shall be attempted anyway (defaulting to policy IOTHUB_CLIENT_RETRY_IMMEDIATE).
@@ -2126,14 +2211,9 @@
                 if ((current_time - transport_data->mqtt_connect_time) / 1000 > (transport_data->option_sas_token_lifetime_secs*SAS_REFRESH_MULTIPLIER))
                 {
                     /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_058: [ If the sas token has timed out IoTHubTransport_MQTT_Common_DoWork shall disconnect from the mqtt client and destroy the transport information and wait for reconnect. ] */
-                    OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
-                    set_saved_tls_options(transport_data, options);
-                    (void)mqtt_client_disconnect(transport_data->mqttClient, NULL, NULL);
-                    xio_destroy(transport_data->xioTransport);
-                    transport_data->xioTransport = NULL;
+                    DisconnectFromClient(transport_data);
 
                     IoTHubClientCore_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN);
-                    transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
                     transport_data->currPacketState = UNKNOWN_TYPE;
                     transport_data->device_twin_get_sent = false;
                     if (transport_data->topic_MqttMessage != NULL)
@@ -2208,7 +2288,7 @@
     }
     else
     {
-        memset(state, 0, sizeof(MQTTTRANSPORT_HANDLE_DATA) );
+        memset(state, 0, sizeof(MQTTTRANSPORT_HANDLE_DATA));
         if ((state->msgTickCounter = tickcounter_create()) == NULL)
         {
             LogError("Invalid Argument: iotHubName is empty");
@@ -2242,7 +2322,7 @@
         }
         else
         {
-            if ( (state->topic_MqttEvent = buildMqttEventString(upperConfig->deviceId, moduleId) ) == NULL)
+            if ((state->topic_MqttEvent = buildMqttEventString(upperConfig->deviceId, moduleId)) == NULL)
             {
                 LogError("Could not create topic_MqttEvent for MQTT");
                 free_transport_handle_data(state);
@@ -2262,13 +2342,13 @@
                     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_008: [If the upperConfig contains a valid protocolGatewayHostName value the this shall be used for the hostname, otherwise the hostname shall be constructed using the iothubname and iothubSuffix.] */
                     if (upperConfig->protocolGatewayHostName == NULL)
                     {
-                         state->hostAddress = STRING_construct_sprintf("%s.%s", upperConfig->iotHubName, upperConfig->iotHubSuffix);
+                        state->hostAddress = STRING_construct_sprintf("%s.%s", upperConfig->iotHubName, upperConfig->iotHubSuffix);
                     }
                     else
                     {
                         state->hostAddress = STRING_construct(upperConfig->protocolGatewayHostName);
                     }
-                
+
                     if (state->hostAddress == NULL)
                     {
                         LogError("failure constructing host address.");
@@ -2312,6 +2392,7 @@
                         state->isProductInfoSet = false;
                         state->option_sas_token_lifetime_secs = SAS_TOKEN_DEFAULT_LIFETIME;
                         state->auto_url_encode_decode = false;
+                        state->conn_attempted = false;
                     }
                 }
             }
@@ -2342,12 +2423,12 @@
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, both deviceKey and deviceSasToken, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_03_003: [If both deviceKey & deviceSasToken fields are NOT NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
     else if (config->upperConfig == NULL ||
-             config->upperConfig->protocol == NULL || 
-             config->upperConfig->deviceId == NULL || 
-             ((config->upperConfig->deviceKey != NULL) && (config->upperConfig->deviceSasToken != NULL)) ||
-             //is_key_validate(config) != 0 ||
-             config->upperConfig->iotHubName == NULL || 
-             config->upperConfig->iotHubSuffix == NULL)
+        config->upperConfig->protocol == NULL ||
+        config->upperConfig->deviceId == NULL ||
+        ((config->upperConfig->deviceKey != NULL) && (config->upperConfig->deviceSasToken != NULL)) ||
+        //is_key_validate(config) != 0 ||
+        config->upperConfig->iotHubName == NULL ||
+        config->upperConfig->iotHubSuffix == NULL)
     {
         LogError("Invalid Argument: upperConfig structure contains an invalid parameter");
         result = NULL;
@@ -2359,7 +2440,7 @@
         result = NULL;
     }
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_006: [If the upperConfig's variables deviceId is an empty strings or length is greater then 128 then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
-    else if ( ( (deviceIdSize = strlen(config->upperConfig->deviceId)) > 128U) || (deviceIdSize == 0) )
+    else if (((deviceIdSize = strlen(config->upperConfig->deviceId)) > 128U) || (deviceIdSize == 0))
     {
         LogError("Invalid Argument: DeviceId is of an invalid size");
         result = NULL;
@@ -2463,31 +2544,9 @@
         {
             result = 0;
         }
-        if (result == 0 && transport_data->topic_NotifyState == NULL)
-        {
-            /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_044: [`IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin` shall construct the get state topic string and the notify state topic string.] */
-            transport_data->topic_NotifyState = STRING_construct(TOPIC_NOTIFICATION_STATE);
-            if (transport_data->topic_NotifyState == NULL)
-            {
-                LogError("Failure: unable constructing notify state topic");
-                result = __FAILURE__;
-            }
-            else
-            {
-                /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_047: [On success IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall return 0.] */
-                transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
-                result = 0;
-            }
-        }
         if (result == 0)
         {
-            if (transport_data->currPacketState != CONNACK_TYPE &&
-                transport_data->currPacketState != CONNECT_TYPE &&
-                transport_data->currPacketState != DISCONNECT_TYPE &&
-                transport_data->currPacketState != PACKET_TYPE_ERROR)
-            {
-                transport_data->currPacketState = SUBSCRIBE_TYPE;
-            }
+            changeStateToSubscribeIfAllowed(transport_data);
         }
     }
     return result;
@@ -2560,13 +2619,7 @@
         {
             /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_005 : [IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall schedule the send of the subscription.]*/
             /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_007 : [On success IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return 0.]*/
-            if (transport_data->currPacketState != CONNACK_TYPE &&
-                transport_data->currPacketState != CONNECT_TYPE &&
-                transport_data->currPacketState != DISCONNECT_TYPE &&
-                transport_data->currPacketState != PACKET_TYPE_ERROR)
-            {
-                transport_data->currPacketState = SUBSCRIBE_TYPE;
-            }
+            changeStateToSubscribeIfAllowed(transport_data);
         }
     }
     return result;
@@ -2666,7 +2719,7 @@
     else
     {
         /* Code_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransport_MQTT_Common_Subscribe shall set a flag to enable mqtt_client_subscribe to be called to subscribe to the Message Topic.] */
-        transport_data->topic_MqttMessage = buildTopicMqttMessage(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id) );
+        transport_data->topic_MqttMessage = buildTopicMqttMessage(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id));
         if (transport_data->topic_MqttMessage == NULL)
         {
             LogError("Failure constructing Message Topic");
@@ -2676,13 +2729,7 @@
         {
             transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
             /* Code_SRS_IOTHUB_MQTT_TRANSPORT_07_035: [If current packet state is not CONNACT, DISCONNECT_TYPE, or PACKET_TYPE_ERROR then IoTHubTransport_MQTT_Common_Subscribe shall set the packet state to SUBSCRIBE_TYPE.]*/
-            if (transport_data->currPacketState != CONNACK_TYPE &&
-                transport_data->currPacketState != CONNECT_TYPE &&
-                transport_data->currPacketState != DISCONNECT_TYPE &&
-                transport_data->currPacketState != PACKET_TYPE_ERROR)
-            {
-                transport_data->currPacketState = SUBSCRIBE_TYPE;
-            }
+            changeStateToSubscribeIfAllowed(transport_data);
             result = 0;
         }
     }
@@ -2727,7 +2774,8 @@
 
         if (transport_data->currPacketState == PUBLISH_TYPE)
         {
-            if (item_type == IOTHUB_TYPE_DEVICE_TWIN)
+            // Ensure the reported property suback has been received
+            if (item_type == IOTHUB_TYPE_DEVICE_TWIN && transport_data->twin_resp_sub_recv)
             {
                 MQTT_DEVICE_TWIN_ITEM* mqtt_info = (MQTT_DEVICE_TWIN_ITEM*)malloc(sizeof(MQTT_DEVICE_TWIN_ITEM));
                 if (mqtt_info == NULL)
@@ -2741,7 +2789,7 @@
                     mqtt_info->iothub_type = item_type;
                     mqtt_info->iothub_msg_id = iothub_item->device_twin->item_id;
                     mqtt_info->retryCount = 0;
-                    
+
                     /* Codes_SRS_IOTHUBCLIENT_LL_07_005: [ If successful IoTHubTransport_MQTT_Common_ProcessItem shall add mqtt info structure acknowledgement queue. ] */
                     DList_InsertTailList(&transport_data->ack_waiting_queue, &mqtt_info->entry);
 
@@ -2855,10 +2903,12 @@
                         else
                         {
                             size_t messageLength;
-                            const unsigned char* messagePayload = RetrieveMessagePayload(mqttMsgEntry->iotHubMessageEntry->messageHandle, &messageLength);
-                            if (messageLength == 0 || messagePayload == NULL)
+                            const unsigned char* messagePayload = NULL;
+                            if (!RetrieveMessagePayload(mqttMsgEntry->iotHubMessageEntry->messageHandle, &messagePayload, &messageLength))
                             {
                                 LogError("Failure from creating Message IoTHubMessage_GetData");
+                                (void)DList_RemoveEntryList(currentListEntry);
+                                sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
                             }
                             else
                             {
@@ -2884,9 +2934,11 @@
 
                     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
                     size_t messageLength;
-                    const unsigned char* messagePayload = RetrieveMessagePayload(iothubMsgList->messageHandle, &messageLength);
-                    if (messageLength == 0 || messagePayload == NULL)
+                    const unsigned char* messagePayload = NULL;
+                    if (!RetrieveMessagePayload(iothubMsgList->messageHandle, &messagePayload, &messageLength))
                     {
+                        (void)(DList_RemoveEntryList(currentListEntry));
+                        sendMsgComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
                         LogError("Failure result from IoTHubMessage_GetData");
                     }
                     else
@@ -3108,7 +3160,7 @@
         else
         {
             /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_039: [If the option parameter is set to "x509certificate" then the value shall be a const char of the certificate to be used for x509.] */
-            if ( ((strcmp(OPTION_X509_CERT, option) == 0) || (strcmp(OPTION_X509_PRIVATE_KEY, option) == 0)) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509) )
+            if (((strcmp(OPTION_X509_CERT, option) == 0) || (strcmp(OPTION_X509_PRIVATE_KEY, option) == 0)) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509))
             {
                 IoTHubClient_Auth_Set_x509_Type(transport_data->authorization_module, true);
             }
@@ -3191,12 +3243,12 @@
                 LogError("IoTHubTransport_MQTT_Common_Register: deviceId does not match.");
                 result = NULL;
             }
-            else if (! check_module_ids_equal(STRING_c_str(transport_data->module_id), device->moduleId))
+            else if (!check_module_ids_equal(STRING_c_str(transport_data->module_id), device->moduleId))
             {
                 LogError("IoTHubTransport_MQTT_Common_Register: moduleId does not match.");
                 result = NULL;
             }
-            else if (IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module) == IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY && 
+            else if (IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module) == IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY &&
                 (strcmp(IoTHubClient_Auth_Get_DeviceKey(transport_data->authorization_module), device->deviceKey) != 0))
             {
                 LogError("IoTHubTransport_MQTT_Common_Register: deviceKey does not match.");
@@ -3306,14 +3358,7 @@
     {
         // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_067: [ IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall set a flag to enable mqtt_client_subscribe to be called to subscribe to the input queue Message Topic.]
         transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
-        if (transport_data->currPacketState != CONNACK_TYPE &&
-            transport_data->currPacketState != CONNECT_TYPE &&
-            transport_data->currPacketState != DISCONNECT_TYPE &&
-            transport_data->currPacketState != PACKET_TYPE_ERROR)
-        {
-            // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_068: [ If current packet state is not CONNACK, DISCONNECT_TYPE, or PACKET_TYPE_ERROR then IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall set the packet state to SUBSCRIBE_TYPE.]
-            transport_data->currPacketState = SUBSCRIBE_TYPE;
-        }
+        changeStateToSubscribeIfAllowed(transport_data);
         // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_070: [ On success IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall return 0.]
         result = 0;
     }