Microsoft Azure IoTHub client MQTT transport

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
20:594780be216d
Parent:
19:f87dfe76bc70
Child:
21:167f8fcf6fb1
--- a/iothubtransport_mqtt_common.c	Fri Mar 10 11:47:21 2017 -0800
+++ b/iothubtransport_mqtt_common.c	Fri Mar 24 16:35:20 2017 -0700
@@ -22,6 +22,7 @@
 #include "azure_c_shared_utility/platform.h"
 
 #include "azure_c_shared_utility/string_tokenizer.h"
+#include "azure_c_shared_utility/shared_util_options.h"
 #include "iothub_client_version.h"
 
 #include "iothubtransport_mqtt_common.h"
@@ -141,6 +142,13 @@
     } CREDENTIAL_VALUE;
 } MQTT_TRANSPORT_CREDENTIALS;
 
+typedef enum MQTT_CLIENT_STATUS_TAG
+{
+	MQTT_CLIENT_STATUS_NOT_CONNECTED,
+	MQTT_CLIENT_STATUS_CONNECTING,
+	MQTT_CLIENT_STATUS_CONNECTED
+} MQTT_CLIENT_STATUS;
+
 typedef struct MQTTTRANSPORT_HANDLE_DATA_TAG
 {
     // Topic control
@@ -184,7 +192,7 @@
 
     // Connection state control
     bool isRegistered;
-    bool isConnected;
+    MQTT_CLIENT_STATUS mqttClientStatus;
     bool isDestroyCalled;
     bool device_twin_get_sent;
     bool isRecoverableError;
@@ -208,6 +216,11 @@
 
     //Retry Logic
     RETRY_LOGIC* retryLogic;
+
+    char* http_proxy_hostname;
+    int http_proxy_port;
+    char* http_proxy_username;
+    char* http_proxy_password;
 } MQTTTRANSPORT_HANDLE_DATA, *PMQTTTRANSPORT_HANDLE_DATA;
 
 typedef struct MQTT_DEVICE_TWIN_ITEM_TAG
@@ -237,6 +250,27 @@
     STRING_HANDLE request_id;
 } DEVICE_METHOD_INFO;
 
+static void free_proxy_data(MQTTTRANSPORT_HANDLE_DATA* mqtt_transport_instance)
+{
+    if (mqtt_transport_instance->http_proxy_hostname != NULL)
+    {
+        free(mqtt_transport_instance->http_proxy_hostname);
+        mqtt_transport_instance->http_proxy_hostname = NULL;
+    }
+
+    if (mqtt_transport_instance->http_proxy_username != NULL)
+    {
+        free(mqtt_transport_instance->http_proxy_username);
+        mqtt_transport_instance->http_proxy_username = NULL;
+    }
+
+    if (mqtt_transport_instance->http_proxy_password != NULL)
+    {
+        free(mqtt_transport_instance->http_proxy_password);
+        mqtt_transport_instance->http_proxy_password = NULL;
+    }
+}
+
 static int RetryPolicy_Exponential_BackOff_With_Jitter(bool *permit, size_t* delay, void* retryContextCallback)
 {
     int result;
@@ -1355,6 +1389,7 @@
                         // The connect packet has been acked
                         transport_data->currPacketState = CONNACK_TYPE;
                         transport_data->isRecoverableError = true;
+                        transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTED;
                         StopRetryTimer(transport_data->retryLogic);
                         IoTHubClient_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_AUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK);
                     }
@@ -1375,7 +1410,7 @@
                         }
                         LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieve_mqtt_return_codes(connack->returnCode) );
                         (void)mqtt_client_disconnect(transport_data->mqttClient);
-                        transport_data->isConnected = false;
+                        transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
                         transport_data->currPacketState = PACKET_TYPE_ERROR;
                     }
                 }
@@ -1416,7 +1451,7 @@
             case MQTT_CLIENT_ON_DISCONNECT:
             {
                 // Close the client so we can reconnect again
-                transport_data->isConnected = false;
+                transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
                 transport_data->currPacketState = DISCONNECT_TYPE;
                 break;
             }
@@ -1441,6 +1476,11 @@
                 IoTHubClient_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_NETWORK);
                 break;
             }
+            case MQTT_CLIENT_COMMUNICATION_ERROR:
+            {
+                IoTHubClient_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR);
+                break;
+            }
             case MQTT_CLIENT_NO_PING_RESPONSE:
             {
                 LogError("Mqtt Ping Response was not encountered.  Reconnecting device...");
@@ -1448,13 +1488,13 @@
             }
             case MQTT_CLIENT_PARSE_ERROR:
             case MQTT_CLIENT_MEMORY_ERROR:
-            case MQTT_CLIENT_COMMUNICATION_ERROR:
             case MQTT_CLIENT_UNKNOWN_ERROR: 
             {
                 LogError("INTERNAL ERROR: unexpected error value received %s", ENUM_TO_STRING(MQTT_CLIENT_EVENT_ERROR, error));
+                break;
             }
         }
-        transport_data->isConnected = false;
+        transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
         transport_data->currPacketState = PACKET_TYPE_ERROR;
         transport_data->device_twin_get_sent = false;
         if (transport_data->topic_MqttMessage != NULL)
@@ -1588,7 +1628,16 @@
     {
         // construct address
         const char* hostAddress = STRING_c_str(transport_data->hostAddress);
-        transport_data->xioTransport = transport_data->get_io_transport(hostAddress);
+        MQTT_TRANSPORT_PROXY_OPTIONS mqtt_proxy_options;
+
+        /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_011: [ If no `proxy_data` option has been set, NULL shall be passed as the argument `mqtt_transport_proxy_options` when calling the function `get_io_transport` passed in `IoTHubTransport_MQTT_Common__Create`. ]*/
+        mqtt_proxy_options.host_address = transport_data->http_proxy_hostname;
+        mqtt_proxy_options.port = transport_data->http_proxy_port;
+        mqtt_proxy_options.username = transport_data->http_proxy_username;
+        mqtt_proxy_options.password = transport_data->http_proxy_password;
+
+        /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_010: [ If the `proxy_data` option has been set, the proxy options shall be filled in the argument `mqtt_transport_proxy_options` when calling the function `get_io_transport` passed in `IoTHubTransport_MQTT_Common__Create` to obtain the underlying IO handle. ]*/
+        transport_data->xioTransport = transport_data->get_io_transport(hostAddress, (transport_data->http_proxy_hostname == NULL) ? NULL : &mqtt_proxy_options);
         if (transport_data->xioTransport == NULL)
         {
             LogError("Unable to create the lower level TLS layer.");
@@ -1688,6 +1737,16 @@
     return result;
 }
 
+static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data)
+{
+	(void)mqtt_client_disconnect(transport_data->mqttClient);
+	xio_destroy(transport_data->xioTransport);
+	transport_data->xioTransport = NULL;
+
+	transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
+	transport_data->currPacketState = DISCONNECT_TYPE;
+}
+
 static int InitializeConnection(PMQTTTRANSPORT_HANDLE_DATA transport_data)
 {
     int result = 0;
@@ -1695,9 +1754,9 @@
     // Make sure we're not destroying the object
     if (!transport_data->isDestroyCalled)
     {
-        // If we are not isConnected then check to see if we need 
+        // If we are MQTT_CLIENT_STATUS_NOT_CONNECTED then check to see if we need 
         // to back off the connecting to the server
-        if (!transport_data->isConnected && transport_data->isRecoverableError && CanRetry(transport_data->retryLogic))
+        if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_NOT_CONNECTED && transport_data->isRecoverableError && CanRetry(transport_data->retryLogic))
         {
             if (tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->connectTick) != 0)
             {
@@ -1713,16 +1772,31 @@
                 }
                 else
                 {
+                    transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTING;
                     transport_data->connectFailCount = 0;
-                    transport_data->isConnected = true;
                     result = 0;
                 }
             }
         }
-
-        if (transport_data->isConnected)
+		// Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_001: [ IoTHubTransport_MQTT_Common_DoWork shall trigger reconnection if the mqtt_client_connect does not complete within `keepalive` seconds]
+        else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTING)
         {
-            // We are isConnected and not being closed, so does SAS need to reconnect?
+            tickcounter_ms_t current_time;
+            if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
+            {
+                LogError("failed verifying MQTT_CLIENT_STATUS_CONNECTING timeout");
+                result = __FAILURE__;
+            }
+            else if ((current_time - transport_data->mqtt_connect_time) / 1000 > transport_data->keepAliveValue) 
+            {
+                LogError("mqtt_client timed out waiting for CONNACK");
+                DisconnectFromClient(transport_data);
+                result = 0;
+            }
+        }
+        else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTED)
+        {
+            // We are connected and not being closed, so does SAS need to reconnect?
             tickcounter_ms_t current_time;
             if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
             {
@@ -1735,7 +1809,7 @@
                 {
                     (void)mqtt_client_disconnect(transport_data->mqttClient);
                     IoTHubClient_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN);
-                    transport_data->isConnected = false;
+                    transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
                     transport_data->currPacketState = UNKNOWN_TYPE;
                     transport_data->device_twin_get_sent = false;
                     if (transport_data->topic_MqttMessage != NULL)
@@ -1937,7 +2011,7 @@
                     DList_InitializeListHead(&(state->ack_waiting_queue));
                     state->isDestroyCalled = false;
                     state->isRegistered = false;
-                    state->isConnected = false;
+                    state->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
                     state->device_twin_get_sent = false;
                     state->isRecoverableError = true;
                     state->packetId = 1;
@@ -2025,6 +2099,9 @@
         if (result != NULL)
         {
             result->get_io_transport = get_io_transport;
+            result->http_proxy_hostname = NULL;
+            result->http_proxy_username = NULL;
+            result->http_proxy_password = NULL;
         }
     }
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_009: [If any error is encountered then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
@@ -2032,16 +2109,6 @@
     return result;
 }
 
-static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data)
-{
-    (void)mqtt_client_disconnect(transport_data->mqttClient);
-    xio_destroy(transport_data->xioTransport);
-    transport_data->xioTransport = NULL;
-
-    transport_data->isConnected = false;
-    transport_data->currPacketState = DISCONNECT_TYPE;
-}
-
 void IoTHubTransport_MQTT_Common_Destroy(TRANSPORT_LL_HANDLE handle)
 {
     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_012: [IoTHubTransport_MQTT_Common_Destroy shall do nothing if parameter handle is NULL.] */
@@ -2095,6 +2162,8 @@
 
         tickcounter_destroy(transport_data->msgTickCounter);
         DestroyRetryLogic(transport_data->retryLogic);
+        /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_012: [ `IoTHubTransport_MQTT_Common_Destroy` shall free the stored proxy options. ]*/
+        free_proxy_data(transport_data);
         free(transport_data);
     }
 }
@@ -2555,7 +2624,7 @@
                     currentListEntry = savedFromCurrentListEntry.Flink;
                 }
             }
-            /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_030: [IoTHubTransport_MQTT_Common_DoWork shall call mqtt_client_dowork everytime it is called if it is isConnected.] */
+            /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_030: [IoTHubTransport_MQTT_Common_DoWork shall call mqtt_client_dowork everytime it is called if it is connected.] */
             mqtt_client_dowork(transport_data->mqttClient);
         }
     }
@@ -2600,7 +2669,7 @@
         )
     {
         result = IOTHUB_CLIENT_INVALID_ARG;
-        LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption.");
+        LogError("invalid parameter (NULL) passed to IoTHubTransport_MQTT_Common_SetOption.");
     }
     else
     {
@@ -2626,9 +2695,9 @@
             if (*keepAliveOption != transport_data->keepAliveValue)
             {
                 transport_data->keepAliveValue = (uint16_t)(*keepAliveOption);
-                if (transport_data->isConnected)
+                if (transport_data->mqttClientStatus != MQTT_CLIENT_STATUS_NOT_CONNECTED)
                 {
-                    /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_038: [If the client is isConnected when the keepalive is set then IoTHubTransport_MQTT_Common_SetOption shall disconnect and reconnect with the specified keepalive value.] */
+                    /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_038: [If the client is connected when the keepalive is set then IoTHubTransport_MQTT_Common_SetOption shall disconnect and reconnect with the specified keepalive value.] */
                     DisconnectFromClient(transport_data);
                 }
             }
@@ -2646,6 +2715,78 @@
             LogError("x509privatekey specified, but authentication method is not x509");
             result = IOTHUB_CLIENT_INVALID_ARG;
         }
+        else if (strcmp(OPTION_HTTP_PROXY, option) == 0)
+        {
+            /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_001: [ If `option` is `proxy_data`, `value` shall be used as an `HTTP_PROXY_OPTIONS*`. ]*/
+            HTTP_PROXY_OPTIONS* proxy_options = (HTTP_PROXY_OPTIONS*)value;
+
+            if (transport_data->xioTransport != NULL)
+            {
+                /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_007: [ If the underlying IO has already been created, then `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
+                LogError("Cannot set proxy option once the underlying IO is created");
+                result = IOTHUB_CLIENT_ERROR;
+            }
+            else if (proxy_options->host_address == NULL)
+            {
+                /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_003: [ If `host_address` is NULL, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/
+                LogError("NULL host_address in proxy options");
+                result = IOTHUB_CLIENT_INVALID_ARG;
+            }
+            /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_006: [ If only one of `username` and `password` is NULL, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/
+            else if (((proxy_options->username == NULL) || (proxy_options->password == NULL)) &&
+                (proxy_options->username != proxy_options->password))
+            {
+                LogError("Only one of username and password for proxy settings was NULL");
+                result = IOTHUB_CLIENT_INVALID_ARG;
+            }
+            else
+            {
+                char* copied_proxy_hostname = NULL;
+                char* copied_proxy_username = NULL;
+                char* copied_proxy_password = NULL;
+
+                /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_002: [ The fields `host_address`, `port`, `username` and `password` shall be saved for later used (needed when creating the underlying IO to be used by the transport). ]*/
+                transport_data->http_proxy_port = proxy_options->port;
+                if (mallocAndStrcpy_s(&copied_proxy_hostname, proxy_options->host_address) != 0)
+                {
+                    /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
+                    LogError("Cannot copy HTTP proxy hostname");
+                    result = IOTHUB_CLIENT_ERROR;
+                }
+                /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_005: [ `username` and `password` shall be allowed to be NULL. ]*/
+                else if ((proxy_options->username != NULL) && (mallocAndStrcpy_s(&copied_proxy_username, proxy_options->username) != 0))
+                {
+                    /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
+                    free(copied_proxy_hostname);
+                    LogError("Cannot copy HTTP proxy username");
+                    result = IOTHUB_CLIENT_ERROR;
+                }
+                /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_005: [ `username` and `password` shall be allowed to be NULL. ]*/
+                else if ((proxy_options->password != NULL) && (mallocAndStrcpy_s(&copied_proxy_password, proxy_options->password) != 0))
+                {
+                    /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
+                    if (copied_proxy_username != NULL)
+                    {
+                        free(copied_proxy_username);
+                    }
+                    free(copied_proxy_hostname);
+                    LogError("Cannot copy HTTP proxy password");
+                    result = IOTHUB_CLIENT_ERROR;
+                }
+                else
+                {
+                    /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_009: [ When setting the proxy options succeeds any previously saved proxy options shall be freed. ]*/
+                    free_proxy_data(transport_data);
+
+                    transport_data->http_proxy_hostname = copied_proxy_hostname;
+                    transport_data->http_proxy_username = copied_proxy_username;
+                    transport_data->http_proxy_password = copied_proxy_password;
+
+                    /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_008: [ If setting the `proxy_data` option suceeds, `IoTHubTransport_MQTT_Common_SetOption` shall return `IOTHUB_CLIENT_OK` ]*/
+                    result = IOTHUB_CLIENT_OK;
+                }
+            }
+        }
         else
         {
             /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_032: [IoTHubTransport_MQTT_Common_SetOption shall pass down the option to xio_setoption if the option parameter is not a known option string for the MQTT transport.] */