Microsoft Azure IoTHub client MQTT transport
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: iothubtransport_mqtt_common.c
- Revision:
- 20:594780be216d
- Parent:
- 19:f87dfe76bc70
- Child:
- 21:167f8fcf6fb1
--- a/iothubtransport_mqtt_common.c Fri Mar 10 11:47:21 2017 -0800 +++ b/iothubtransport_mqtt_common.c Fri Mar 24 16:35:20 2017 -0700 @@ -22,6 +22,7 @@ #include "azure_c_shared_utility/platform.h" #include "azure_c_shared_utility/string_tokenizer.h" +#include "azure_c_shared_utility/shared_util_options.h" #include "iothub_client_version.h" #include "iothubtransport_mqtt_common.h" @@ -141,6 +142,13 @@ } CREDENTIAL_VALUE; } MQTT_TRANSPORT_CREDENTIALS; +typedef enum MQTT_CLIENT_STATUS_TAG +{ + MQTT_CLIENT_STATUS_NOT_CONNECTED, + MQTT_CLIENT_STATUS_CONNECTING, + MQTT_CLIENT_STATUS_CONNECTED +} MQTT_CLIENT_STATUS; + typedef struct MQTTTRANSPORT_HANDLE_DATA_TAG { // Topic control @@ -184,7 +192,7 @@ // Connection state control bool isRegistered; - bool isConnected; + MQTT_CLIENT_STATUS mqttClientStatus; bool isDestroyCalled; bool device_twin_get_sent; bool isRecoverableError; @@ -208,6 +216,11 @@ //Retry Logic RETRY_LOGIC* retryLogic; + + char* http_proxy_hostname; + int http_proxy_port; + char* http_proxy_username; + char* http_proxy_password; } MQTTTRANSPORT_HANDLE_DATA, *PMQTTTRANSPORT_HANDLE_DATA; typedef struct MQTT_DEVICE_TWIN_ITEM_TAG @@ -237,6 +250,27 @@ STRING_HANDLE request_id; } DEVICE_METHOD_INFO; +static void free_proxy_data(MQTTTRANSPORT_HANDLE_DATA* mqtt_transport_instance) +{ + if (mqtt_transport_instance->http_proxy_hostname != NULL) + { + free(mqtt_transport_instance->http_proxy_hostname); + mqtt_transport_instance->http_proxy_hostname = NULL; + } + + if (mqtt_transport_instance->http_proxy_username != NULL) + { + free(mqtt_transport_instance->http_proxy_username); + mqtt_transport_instance->http_proxy_username = NULL; + } + + if (mqtt_transport_instance->http_proxy_password != NULL) + { + free(mqtt_transport_instance->http_proxy_password); + mqtt_transport_instance->http_proxy_password = NULL; + } +} + static int RetryPolicy_Exponential_BackOff_With_Jitter(bool *permit, size_t* delay, void* retryContextCallback) { int result; @@ -1355,6 +1389,7 @@ // The connect packet has been acked transport_data->currPacketState = CONNACK_TYPE; transport_data->isRecoverableError = true; + transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTED; StopRetryTimer(transport_data->retryLogic); IoTHubClient_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_AUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK); } @@ -1375,7 +1410,7 @@ } LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieve_mqtt_return_codes(connack->returnCode) ); (void)mqtt_client_disconnect(transport_data->mqttClient); - transport_data->isConnected = false; + transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED; transport_data->currPacketState = PACKET_TYPE_ERROR; } } @@ -1416,7 +1451,7 @@ case MQTT_CLIENT_ON_DISCONNECT: { // Close the client so we can reconnect again - transport_data->isConnected = false; + transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED; transport_data->currPacketState = DISCONNECT_TYPE; break; } @@ -1441,6 +1476,11 @@ IoTHubClient_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_NETWORK); break; } + case MQTT_CLIENT_COMMUNICATION_ERROR: + { + IoTHubClient_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR); + break; + } case MQTT_CLIENT_NO_PING_RESPONSE: { LogError("Mqtt Ping Response was not encountered. Reconnecting device..."); @@ -1448,13 +1488,13 @@ } case MQTT_CLIENT_PARSE_ERROR: case MQTT_CLIENT_MEMORY_ERROR: - case MQTT_CLIENT_COMMUNICATION_ERROR: case MQTT_CLIENT_UNKNOWN_ERROR: { LogError("INTERNAL ERROR: unexpected error value received %s", ENUM_TO_STRING(MQTT_CLIENT_EVENT_ERROR, error)); + break; } } - transport_data->isConnected = false; + transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED; transport_data->currPacketState = PACKET_TYPE_ERROR; transport_data->device_twin_get_sent = false; if (transport_data->topic_MqttMessage != NULL) @@ -1588,7 +1628,16 @@ { // construct address const char* hostAddress = STRING_c_str(transport_data->hostAddress); - transport_data->xioTransport = transport_data->get_io_transport(hostAddress); + MQTT_TRANSPORT_PROXY_OPTIONS mqtt_proxy_options; + + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_011: [ If no `proxy_data` option has been set, NULL shall be passed as the argument `mqtt_transport_proxy_options` when calling the function `get_io_transport` passed in `IoTHubTransport_MQTT_Common__Create`. ]*/ + mqtt_proxy_options.host_address = transport_data->http_proxy_hostname; + mqtt_proxy_options.port = transport_data->http_proxy_port; + mqtt_proxy_options.username = transport_data->http_proxy_username; + mqtt_proxy_options.password = transport_data->http_proxy_password; + + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_010: [ If the `proxy_data` option has been set, the proxy options shall be filled in the argument `mqtt_transport_proxy_options` when calling the function `get_io_transport` passed in `IoTHubTransport_MQTT_Common__Create` to obtain the underlying IO handle. ]*/ + transport_data->xioTransport = transport_data->get_io_transport(hostAddress, (transport_data->http_proxy_hostname == NULL) ? NULL : &mqtt_proxy_options); if (transport_data->xioTransport == NULL) { LogError("Unable to create the lower level TLS layer."); @@ -1688,6 +1737,16 @@ return result; } +static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data) +{ + (void)mqtt_client_disconnect(transport_data->mqttClient); + xio_destroy(transport_data->xioTransport); + transport_data->xioTransport = NULL; + + transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED; + transport_data->currPacketState = DISCONNECT_TYPE; +} + static int InitializeConnection(PMQTTTRANSPORT_HANDLE_DATA transport_data) { int result = 0; @@ -1695,9 +1754,9 @@ // Make sure we're not destroying the object if (!transport_data->isDestroyCalled) { - // If we are not isConnected then check to see if we need + // If we are MQTT_CLIENT_STATUS_NOT_CONNECTED then check to see if we need // to back off the connecting to the server - if (!transport_data->isConnected && transport_data->isRecoverableError && CanRetry(transport_data->retryLogic)) + if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_NOT_CONNECTED && transport_data->isRecoverableError && CanRetry(transport_data->retryLogic)) { if (tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->connectTick) != 0) { @@ -1713,16 +1772,31 @@ } else { + transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTING; transport_data->connectFailCount = 0; - transport_data->isConnected = true; result = 0; } } } - - if (transport_data->isConnected) + // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_001: [ IoTHubTransport_MQTT_Common_DoWork shall trigger reconnection if the mqtt_client_connect does not complete within `keepalive` seconds] + else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTING) { - // We are isConnected and not being closed, so does SAS need to reconnect? + tickcounter_ms_t current_time; + if (tickcounter_get_current_ms(transport_data->msgTickCounter, ¤t_time) != 0) + { + LogError("failed verifying MQTT_CLIENT_STATUS_CONNECTING timeout"); + result = __FAILURE__; + } + else if ((current_time - transport_data->mqtt_connect_time) / 1000 > transport_data->keepAliveValue) + { + LogError("mqtt_client timed out waiting for CONNACK"); + DisconnectFromClient(transport_data); + result = 0; + } + } + else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTED) + { + // We are connected and not being closed, so does SAS need to reconnect? tickcounter_ms_t current_time; if (tickcounter_get_current_ms(transport_data->msgTickCounter, ¤t_time) != 0) { @@ -1735,7 +1809,7 @@ { (void)mqtt_client_disconnect(transport_data->mqttClient); IoTHubClient_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN); - transport_data->isConnected = false; + transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED; transport_data->currPacketState = UNKNOWN_TYPE; transport_data->device_twin_get_sent = false; if (transport_data->topic_MqttMessage != NULL) @@ -1937,7 +2011,7 @@ DList_InitializeListHead(&(state->ack_waiting_queue)); state->isDestroyCalled = false; state->isRegistered = false; - state->isConnected = false; + state->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED; state->device_twin_get_sent = false; state->isRecoverableError = true; state->packetId = 1; @@ -2025,6 +2099,9 @@ if (result != NULL) { result->get_io_transport = get_io_transport; + result->http_proxy_hostname = NULL; + result->http_proxy_username = NULL; + result->http_proxy_password = NULL; } } /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_009: [If any error is encountered then IoTHubTransport_MQTT_Common_Create shall return NULL.] */ @@ -2032,16 +2109,6 @@ return result; } -static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data) -{ - (void)mqtt_client_disconnect(transport_data->mqttClient); - xio_destroy(transport_data->xioTransport); - transport_data->xioTransport = NULL; - - transport_data->isConnected = false; - transport_data->currPacketState = DISCONNECT_TYPE; -} - void IoTHubTransport_MQTT_Common_Destroy(TRANSPORT_LL_HANDLE handle) { /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_012: [IoTHubTransport_MQTT_Common_Destroy shall do nothing if parameter handle is NULL.] */ @@ -2095,6 +2162,8 @@ tickcounter_destroy(transport_data->msgTickCounter); DestroyRetryLogic(transport_data->retryLogic); + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_012: [ `IoTHubTransport_MQTT_Common_Destroy` shall free the stored proxy options. ]*/ + free_proxy_data(transport_data); free(transport_data); } } @@ -2555,7 +2624,7 @@ currentListEntry = savedFromCurrentListEntry.Flink; } } - /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_030: [IoTHubTransport_MQTT_Common_DoWork shall call mqtt_client_dowork everytime it is called if it is isConnected.] */ + /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_030: [IoTHubTransport_MQTT_Common_DoWork shall call mqtt_client_dowork everytime it is called if it is connected.] */ mqtt_client_dowork(transport_data->mqttClient); } } @@ -2600,7 +2669,7 @@ ) { result = IOTHUB_CLIENT_INVALID_ARG; - LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption."); + LogError("invalid parameter (NULL) passed to IoTHubTransport_MQTT_Common_SetOption."); } else { @@ -2626,9 +2695,9 @@ if (*keepAliveOption != transport_data->keepAliveValue) { transport_data->keepAliveValue = (uint16_t)(*keepAliveOption); - if (transport_data->isConnected) + if (transport_data->mqttClientStatus != MQTT_CLIENT_STATUS_NOT_CONNECTED) { - /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_038: [If the client is isConnected when the keepalive is set then IoTHubTransport_MQTT_Common_SetOption shall disconnect and reconnect with the specified keepalive value.] */ + /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_038: [If the client is connected when the keepalive is set then IoTHubTransport_MQTT_Common_SetOption shall disconnect and reconnect with the specified keepalive value.] */ DisconnectFromClient(transport_data); } } @@ -2646,6 +2715,78 @@ LogError("x509privatekey specified, but authentication method is not x509"); result = IOTHUB_CLIENT_INVALID_ARG; } + else if (strcmp(OPTION_HTTP_PROXY, option) == 0) + { + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_001: [ If `option` is `proxy_data`, `value` shall be used as an `HTTP_PROXY_OPTIONS*`. ]*/ + HTTP_PROXY_OPTIONS* proxy_options = (HTTP_PROXY_OPTIONS*)value; + + if (transport_data->xioTransport != NULL) + { + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_007: [ If the underlying IO has already been created, then `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/ + LogError("Cannot set proxy option once the underlying IO is created"); + result = IOTHUB_CLIENT_ERROR; + } + else if (proxy_options->host_address == NULL) + { + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_003: [ If `host_address` is NULL, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/ + LogError("NULL host_address in proxy options"); + result = IOTHUB_CLIENT_INVALID_ARG; + } + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_006: [ If only one of `username` and `password` is NULL, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/ + else if (((proxy_options->username == NULL) || (proxy_options->password == NULL)) && + (proxy_options->username != proxy_options->password)) + { + LogError("Only one of username and password for proxy settings was NULL"); + result = IOTHUB_CLIENT_INVALID_ARG; + } + else + { + char* copied_proxy_hostname = NULL; + char* copied_proxy_username = NULL; + char* copied_proxy_password = NULL; + + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_002: [ The fields `host_address`, `port`, `username` and `password` shall be saved for later used (needed when creating the underlying IO to be used by the transport). ]*/ + transport_data->http_proxy_port = proxy_options->port; + if (mallocAndStrcpy_s(&copied_proxy_hostname, proxy_options->host_address) != 0) + { + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/ + LogError("Cannot copy HTTP proxy hostname"); + result = IOTHUB_CLIENT_ERROR; + } + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_005: [ `username` and `password` shall be allowed to be NULL. ]*/ + else if ((proxy_options->username != NULL) && (mallocAndStrcpy_s(&copied_proxy_username, proxy_options->username) != 0)) + { + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/ + free(copied_proxy_hostname); + LogError("Cannot copy HTTP proxy username"); + result = IOTHUB_CLIENT_ERROR; + } + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_005: [ `username` and `password` shall be allowed to be NULL. ]*/ + else if ((proxy_options->password != NULL) && (mallocAndStrcpy_s(&copied_proxy_password, proxy_options->password) != 0)) + { + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/ + if (copied_proxy_username != NULL) + { + free(copied_proxy_username); + } + free(copied_proxy_hostname); + LogError("Cannot copy HTTP proxy password"); + result = IOTHUB_CLIENT_ERROR; + } + else + { + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_009: [ When setting the proxy options succeeds any previously saved proxy options shall be freed. ]*/ + free_proxy_data(transport_data); + + transport_data->http_proxy_hostname = copied_proxy_hostname; + transport_data->http_proxy_username = copied_proxy_username; + transport_data->http_proxy_password = copied_proxy_password; + + /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_008: [ If setting the `proxy_data` option suceeds, `IoTHubTransport_MQTT_Common_SetOption` shall return `IOTHUB_CLIENT_OK` ]*/ + result = IOTHUB_CLIENT_OK; + } + } + } else { /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_032: [IoTHubTransport_MQTT_Common_SetOption shall pass down the option to xio_setoption if the option parameter is not a known option string for the MQTT transport.] */