Microsoft Azure IoTHub client MQTT transport
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: iothubtransport_mqtt_common.c
- Revision:
- 37:e6a799428f3d
- Parent:
- 36:3b9944257dd5
- Child:
- 38:369a06de92aa
--- a/iothubtransport_mqtt_common.c Mon Mar 05 17:40:20 2018 -0800 +++ b/iothubtransport_mqtt_common.c Mon Apr 16 14:25:25 2018 -0700 @@ -196,6 +196,7 @@ // Telemetry specific DLIST_ENTRY telemetry_waitingForAck; + bool auto_url_encode_decode; // Controls frequency of reconnection logic. RETRY_CONTROL_HANDLE retry_control_handle; @@ -578,172 +579,231 @@ IoTHubClient_LL_SendComplete(transport_data->llClientHandle, &messageCompleted, confirmResult); } -static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, const char* eventTopic) +static int addUserPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode) { - STRING_HANDLE result = STRING_construct(eventTopic); + int result = 0; const char* const* propertyKeys; const char* const* propertyValues; size_t propertyCount; - size_t index = 0; - - // Construct Properties + size_t index = *index_ptr; MAP_HANDLE properties_map = IoTHubMessage_Properties(iothub_message_handle); if (properties_map != NULL) { if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK) { LogError("Failed to get the internals of the property map."); - STRING_delete(result); - result = NULL; + result = __FAILURE__; } else { if (propertyCount != 0) { - for (index = 0; index < propertyCount && result != NULL; index++) + for (index = 0; index < propertyCount && result == 0; index++) { - if (STRING_sprintf(result, "%s=%s%s", propertyKeys[index], propertyValues[index], propertyCount - 1 == index ? "" : PROPERTY_SEPARATOR) != 0) + if (urlencode) { - LogError("Failed construting property string."); - STRING_delete(result); - result = NULL; + STRING_HANDLE property_key = URL_EncodeString(propertyKeys[index]); + STRING_HANDLE property_value = URL_EncodeString(propertyValues[index]); + if ((property_key == NULL) || (property_value == NULL)) + { + LogError("Failed URL Encoding properties"); + result = __FAILURE__; + } + else if (STRING_sprintf(topic_string, "%s=%s%s", STRING_c_str(property_key), STRING_c_str(property_value), propertyCount - 1 == index ? "" : PROPERTY_SEPARATOR) != 0) + { + LogError("Failed constructing property string."); + result = __FAILURE__; + } + STRING_delete(property_key); + STRING_delete(property_value); + } + else + { + if (STRING_sprintf(topic_string, "%s=%s%s", propertyKeys[index], propertyValues[index], propertyCount - 1 == index ? "" : PROPERTY_SEPARATOR) != 0) + { + LogError("Failed constructing property string."); + result = __FAILURE__; + } } } } } } + *index_ptr = index; + return result; +} + +static int addSystemPropertyToTopicString(STRING_HANDLE topic_string, size_t index, const char* property_key, const char* property_value, bool urlencode) +{ + int result = 0; + + if (urlencode) + { + STRING_HANDLE encoded_property_value = URL_EncodeString(property_value); + if (encoded_property_value == NULL) + { + LogError("Failed URL encoding %s.", property_key); + result = __FAILURE__; + } + else if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, property_key, STRING_c_str(encoded_property_value)) != 0) + { + LogError("Failed setting %s.", property_key); + result = __FAILURE__; + } + STRING_delete(encoded_property_value); + } + else + { + if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, property_key, property_value) != 0) + { + LogError("Failed setting %s.", property_key); + result = __FAILURE__; + } + } + return result; +} + +static int addSystemPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode) +{ + (void)urlencode; + int result = 0; + size_t index = *index_ptr; /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_052: [ IoTHubTransport_MQTT_Common_DoWork shall check for the CorrelationId property and if found add the value as a system property in the format of $.cid=<id> ] */ - if (result != NULL) + const char* correlation_id = IoTHubMessage_GetCorrelationId(iothub_message_handle); + if (correlation_id != NULL) { - const char* correlation_id = IoTHubMessage_GetCorrelationId(iothub_message_handle); - if (correlation_id != NULL) - { - if (STRING_sprintf(result, "%s%%24.cid=%s", index == 0 ? "" : PROPERTY_SEPARATOR, correlation_id) != 0) - { - LogError("Failed setting correlation id."); - STRING_delete(result); - result = NULL; - } - index++; - } + result = addSystemPropertyToTopicString(topic_string, index, CORRELATION_ID_PROPERTY, correlation_id, urlencode); + index++; } - /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_053: [ IoTHubTransport_MQTT_Common_DoWork shall check for the MessageId property and if found add the value as a system property in the format of $.mid=<id> ] */ - if (result != NULL) + if (result == 0) { const char* msg_id = IoTHubMessage_GetMessageId(iothub_message_handle); if (msg_id != NULL) { - if (STRING_sprintf(result, "%s%%24.mid=%s", index == 0 ? "" : PROPERTY_SEPARATOR, msg_id) != 0) - { - LogError("Failed setting message id."); - STRING_delete(result); - result = NULL; - } + result = addSystemPropertyToTopicString(topic_string, index, MESSAGE_ID_PROPERTY, msg_id, urlencode); index++; } } - // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_010: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the ContentType property and if found add the `value` as a system property in the format of `$.ct=<value>` ] - if (result != NULL) + if (result == 0) { const char* content_type = IoTHubMessage_GetContentTypeSystemProperty(iothub_message_handle); if (content_type != NULL) { - if (STRING_sprintf(result, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, CONTENT_TYPE_PROPERTY, content_type) != 0) - { - LogError("Failed setting content type."); - STRING_delete(result); - result = NULL; - } + result = addSystemPropertyToTopicString(topic_string, index, CONTENT_TYPE_PROPERTY, content_type, urlencode); index++; } } - // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_011: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the ContentEncoding property and if found add the `value` as a system property in the format of `$.ce=<value>` ] - if (result != NULL) + if (result == 0) { const char* content_encoding = IoTHubMessage_GetContentEncodingSystemProperty(iothub_message_handle); if (content_encoding != NULL) { - if (STRING_sprintf(result, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, CONTENT_ENCODING_PROPERTY, content_encoding) != 0) - { - LogError("Failed setting content encoding."); - STRING_delete(result); - result = NULL; - } + result = addSystemPropertyToTopicString(topic_string, index, CONTENT_ENCODING_PROPERTY, content_encoding, urlencode); index++; } } + *index_ptr = index; + return result; +} + +static int addDiagnosticPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr) +{ + int result = 0; + size_t index = *index_ptr; // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_014: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the diagnostic properties including diagid and diagCreationTimeUtc and if found both add them as system property in the format of `$.diagid` and `$.diagctx` respectively] - if (result != NULL) + const IOTHUB_MESSAGE_DIAGNOSTIC_PROPERTY_DATA* diagnosticData = IoTHubMessage_GetDiagnosticPropertyData(iothub_message_handle); + if (diagnosticData != NULL) { - const IOTHUB_MESSAGE_DIAGNOSTIC_PROPERTY_DATA* diagnosticData = IoTHubMessage_GetDiagnosticPropertyData(iothub_message_handle); - if (diagnosticData != NULL) + const char* diag_id = diagnosticData->diagnosticId; + const char* creation_time_utc = diagnosticData->diagnosticCreationTimeUtc; + //diagid and creationtimeutc must be present/unpresent simultaneously + if (diag_id != NULL && creation_time_utc != NULL) { - const char* diag_id = diagnosticData->diagnosticId; - const char* creation_time_utc = diagnosticData->diagnosticCreationTimeUtc; - //diagid and creationtimeutc must be present/unpresent simultaneously - if (diag_id != NULL && creation_time_utc != NULL) + if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, DIAGNOSTIC_ID_PROPERTY, diag_id) != 0) { - if (STRING_sprintf(result, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, DIAGNOSTIC_ID_PROPERTY, diag_id) != 0) + LogError("Failed setting diagnostic id"); + result = __FAILURE__; + } + index++; + + if (result == 0) + { + //construct diagnostic context, it should be urlencode(key1=value1,key2=value2) + STRING_HANDLE diagContextHandle = STRING_construct_sprintf("%s=%s", DIAGNOSTIC_CONTEXT_CREATION_TIME_UTC_PROPERTY, creation_time_utc); + if (diagContextHandle == NULL) { - LogError("Failed setting diagnostic id"); - STRING_delete(result); - result = NULL; + LogError("Failed constructing diagnostic context"); + result = __FAILURE__; } - index++; - - if (result != NULL) + else { - //construct diagnostic context, it should be urlencode(key1=value1,key2=value2) - STRING_HANDLE diagContextHandle = STRING_construct_sprintf("%s=%s", DIAGNOSTIC_CONTEXT_CREATION_TIME_UTC_PROPERTY, creation_time_utc); - if (diagContextHandle == NULL) + //Add other diagnostic context properties here if have more + STRING_HANDLE encodedContextValueHandle = URL_Encode(diagContextHandle); + const char* encodedContextValueString = NULL; + if (encodedContextValueHandle != NULL && + (encodedContextValueString = STRING_c_str(encodedContextValueHandle)) != NULL) { - LogError("Failed constructing diagnostic context"); - STRING_delete(result); - result = NULL; + if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, DIAGNOSTIC_CONTEXT_PROPERTY, encodedContextValueString) != 0) + { + LogError("Failed setting diagnostic context"); + result = __FAILURE__; + } + STRING_delete(encodedContextValueHandle); + encodedContextValueHandle = NULL; } else { - //Add other diagnostic context properties here if have more - STRING_HANDLE encodedContextValueHandle = URL_Encode(diagContextHandle); - const char* encodedContextValueString = NULL; - if (encodedContextValueHandle != NULL && - (encodedContextValueString = STRING_c_str(encodedContextValueHandle)) != NULL) - { - if (STRING_sprintf(result, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, DIAGNOSTIC_CONTEXT_PROPERTY, encodedContextValueString) != 0) - { - LogError("Failed setting diagnostic context"); - STRING_delete(result); - result = NULL; - } - STRING_delete(encodedContextValueHandle); - encodedContextValueHandle = NULL; - } - else - { - LogError("Failed encoding diagnostic context value"); - STRING_delete(result); - result = NULL; - } - STRING_delete(diagContextHandle); - diagContextHandle = NULL; - index++; + LogError("Failed encoding diagnostic context value"); + result = __FAILURE__; } + STRING_delete(diagContextHandle); + diagContextHandle = NULL; + index++; } } - else if (diag_id != NULL || creation_time_utc != NULL) - { - // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_015: [ `IoTHubTransport_MQTT_Common_DoWork` shall check whether diagid and diagCreationTimeUtc be present simultaneously, treat as error if not] - LogError("diagid and diagcreationtimeutc must be present simultaneously."); - STRING_delete(result); - result = NULL; - } + } + else if (diag_id != NULL || creation_time_utc != NULL) + { + // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_015: [ `IoTHubTransport_MQTT_Common_DoWork` shall check whether diagid and diagCreationTimeUtc be present simultaneously, treat as error if not] + LogError("diagid and diagcreationtimeutc must be present simultaneously."); + result = __FAILURE__; } } + return result; +} + + +static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, const char* eventTopic, bool urlencode) +{ + size_t index = 0; + STRING_HANDLE result = STRING_construct(eventTopic); + if (result == NULL) + { + LogError("Failed to create event topic string handle"); + } + else if (addUserPropertiesTouMqttMessage(iothub_message_handle, result, &index, urlencode) != 0) + { + LogError("Failed adding Properties to uMQTT Message"); + STRING_delete(result); + result = NULL; + } + else if (addSystemPropertiesTouMqttMessage(iothub_message_handle, result, &index, urlencode) != 0) + { + LogError("Failed adding System Properties to uMQTT Message"); + STRING_delete(result); + result = NULL; + } + else if (addDiagnosticPropertiesTouMqttMessage(iothub_message_handle, result, &index) != 0) + { + LogError("Failed adding Diagnostic Properties to uMQTT Message"); + STRING_delete(result); + result = NULL; + } return result; } @@ -751,7 +811,7 @@ static int publish_mqtt_telemetry_msg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len) { int result; - STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transport_data->topic_MqttEvent)); + STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transport_data->topic_MqttEvent), transport_data->auto_url_encode_decode); if (msgTopic == NULL) { LogError("Failed adding properties to mqtt message"); @@ -997,7 +1057,7 @@ return result; } -static int extractMqttProperties(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name) +static int extractMqttProperties(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name, bool urldecode) { int result; STRING_HANDLE mqttTopic = STRING_construct(topic_name); @@ -1065,10 +1125,28 @@ strncpy(propValue, iterator + 1, valLen); propValue[valLen] = '\0'; - if (setMqttMessagePropertyIfPossible(IoTHubMessage, propName, propValue, nameLen) != 0) + if (urldecode) { - LogError("Unable to set message property"); - result = __FAILURE__; + STRING_HANDLE propValue_decoded; + if ((propValue_decoded = URL_DecodeString(propValue)) == NULL) + { + LogError("Failed to URL decode property value"); + result = __FAILURE__; + } + else if (setMqttMessagePropertyIfPossible(IoTHubMessage, propName, STRING_c_str(propValue_decoded), nameLen) != 0) + { + LogError("Unable to set message property"); + result = __FAILURE__; + } + STRING_delete(propValue_decoded); + } + else + { + if (setMqttMessagePropertyIfPossible(IoTHubMessage, propName, propValue, nameLen) != 0) + { + LogError("Unable to set message property"); + result = __FAILURE__; + } } } free(propName); @@ -1079,7 +1157,7 @@ iterator++; } } - else + else //User Properties { const char* iterator = tokenData; while (iterator != NULL && *iterator != '\0' && result == 0) @@ -1104,10 +1182,30 @@ strncpy(propValue, iterator + 1, valLen); propValue[valLen] = '\0'; - if (Map_AddOrUpdate(propertyMap, propName, propValue) != MAP_OK) + if (urldecode) { - LogError("Map_AddOrUpdate failed."); - result = __FAILURE__; + STRING_HANDLE propName_decoded = URL_DecodeString(propName); + STRING_HANDLE propValue_decoded = URL_DecodeString(propValue); + if (propName_decoded == NULL || propValue_decoded == NULL) + { + LogError("Failed to URL decode property"); + result = __FAILURE__; + } + else if (Map_AddOrUpdate(propertyMap, STRING_c_str(propName_decoded), STRING_c_str(propValue_decoded)) != MAP_OK) + { + LogError("Map_AddOrUpdate failed."); + result = __FAILURE__; + } + STRING_delete(propName_decoded); + STRING_delete(propValue_decoded); + } + else + { + if (Map_AddOrUpdate(propertyMap, propName, propValue) != MAP_OK) + { + LogError("Map_AddOrUpdate failed."); + result = __FAILURE__; + } } } free(propName); @@ -1250,7 +1348,7 @@ else { // Will need to update this when the service has messages that can be rejected - if (extractMqttProperties(IoTHubMessage, topic_resp) != 0) + if (extractMqttProperties(IoTHubMessage, topic_resp, transportData->auto_url_encode_decode) != 0) { LogError("failure extracting mqtt properties."); } @@ -1411,6 +1509,21 @@ } } +// Prior to creating a new connection, if we have an existing xioTransport we need to clear +// it now or else cached settings (especially TLS when communicating with HTTP proxies) will +// break reconnection attempt. +static void ResetConnectionIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data) +{ + if (transport_data->xioTransport != NULL) + { + OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport); + set_saved_tls_options(transport_data, options); + + xio_destroy(transport_data->xioTransport); + transport_data->xioTransport = NULL; + } +} + static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data) { if (!transport_data->isDestroyCalled) @@ -1418,6 +1531,7 @@ OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport); set_saved_tls_options(transport_data, options); } + (void)mqtt_client_disconnect(transport_data->mqttClient, NULL, NULL); xio_destroy(transport_data->xioTransport); transport_data->xioTransport = NULL; @@ -1612,7 +1726,21 @@ } else { - if (IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module) == IOTHUB_CREDENTIAL_TYPE_X509_ECC) + if (transport_data->saved_tls_options != NULL) + { + if (OptionHandler_FeedOptions(transport_data->saved_tls_options, transport_data->xioTransport) != OPTIONHANDLER_OK) + { + LogError("Failed feeding existing options to new TLS instance."); + result = __FAILURE__; + } + else + { + // The tlsio has the options, so our copy can be deleted + set_saved_tls_options(transport_data, NULL); + result = 0; + } + } + else if (IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module) == IOTHUB_CREDENTIAL_TYPE_X509_ECC) { if (IoTHubClient_Auth_Set_xio_Certificate(transport_data->authorization_module, transport_data->xioTransport) != 0) { @@ -1628,28 +1756,6 @@ { result = 0; } - - if (result == 0) - { - if (transport_data->saved_tls_options != NULL) - { - if (OptionHandler_FeedOptions(transport_data->saved_tls_options, transport_data->xioTransport) != OPTIONHANDLER_OK) - { - LogError("Failed feeding existing options to new TLS instance."); - result = __FAILURE__; - } - else - { - // The tlsio has the options, so our copy can be deleted - set_saved_tls_options(transport_data, NULL); - result = 0; - } - } - else - { - result = 0; - } - } } } else @@ -1828,6 +1934,8 @@ } else { + ResetConnectionIfNecessary(transport_data); + if (SendMqttConnectMsg(transport_data) != 0) { transport_data->connectFailCount++; @@ -2014,6 +2122,7 @@ state->authorization_module = auth_module; state->isProductInfoSet = false; state->option_sas_token_lifetime_secs = SAS_TOKEN_DEFAULT_LIFETIME; + state->auto_url_encode_decode = false; } } } @@ -2674,6 +2783,11 @@ mqtt_client_set_trace(transport_data->mqttClient, transport_data->log_trace, transport_data->raw_trace); result = IOTHUB_CLIENT_OK; } + else if (strcmp(OPTION_AUTO_URL_ENCODE_DECODE, option) == 0) + { + transport_data->auto_url_encode_decode = *((bool*)value); + result = IOTHUB_CLIENT_OK; + } /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_052: [ If the option parameter is set to "sas_token_lifetime" then the value shall be a size_t_ptr and the value will determine the mqtt sas token lifetime.] */ else if (strcmp(OPTION_SAS_TOKEN_LIFETIME, option) == 0) {