Microsoft Azure IoTHub client MQTT transport
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: iothubtransport_mqtt_common.c
- Revision:
- 18:ec8e5e97c6a4
- Parent:
- 17:774695cb8554
- Child:
- 19:f87dfe76bc70
--- a/iothubtransport_mqtt_common.c Sat Jan 28 09:34:14 2017 -0800 +++ b/iothubtransport_mqtt_common.c Fri Feb 24 14:00:29 2017 -0800 @@ -3,6 +3,7 @@ #include <stdlib.h> #include <ctype.h> +#include "azure_c_shared_utility/optimize_size.h" #include "azure_c_shared_utility/gballoc.h" #include "azure_c_shared_utility/xlogging.h" @@ -66,6 +67,9 @@ static const char* REQUEST_ID_PROPERTY = "?$rid="; +static const char* MESSAGE_ID_PROPERTY = "mid"; +static const char* CORRELATION_ID_PROPERTY = "cid"; + #define UNSUBSCRIBE_FROM_TOPIC 0x0000 #define SUBSCRIBE_GET_REPORTED_STATE_TOPIC 0x0001 #define SUBSCRIBE_NOTIFICATION_STATE_TOPIC 0x0002 @@ -264,7 +268,7 @@ } else { - result = __LINE__; + result = __FAILURE__; } return result; } @@ -384,7 +388,7 @@ { /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_041: [**If any handle is NULL then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line.] */ LogError("Invalid handle parameter. NULL."); - result = __LINE__; + result = __FAILURE__; } else { @@ -409,7 +413,7 @@ { /*Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_044: [**If retry logic for specified parameters of retry policy and retryTimeoutLimitInSeconds cannot be created then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line]*/ LogError("Retry Logic is not created"); - result = __LINE__; + result = __FAILURE__; } } return result; @@ -573,7 +577,7 @@ if (token_handle == NULL) { LogError("Failed creating token from device twin topic."); - result = __LINE__; + result = __FAILURE__; } else { @@ -581,13 +585,13 @@ if ((token_value = STRING_new()) == NULL) { LogError("Failed allocating new string ."); - result = __LINE__; + result = __FAILURE__; } else { size_t token_index = 0; size_t request_id_length = strlen(REQUEST_ID_PROPERTY); - result = __LINE__; + result = __FAILURE__; while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0) { if (token_index == 3) @@ -595,7 +599,7 @@ if (STRING_concat_with_STRING(method_name, token_value) != 0) { LogError("Failed STRING_concat_with_STRING."); - result = __LINE__; + result = __FAILURE__; break; } } @@ -609,7 +613,7 @@ if (STRING_concat(request_id, request_id_value+request_id_length) != 0) { LogError("Failed STRING_concat failed."); - result = __LINE__; + result = __FAILURE__; } else { @@ -635,7 +639,7 @@ if (token_handle == NULL) { LogError("Failed creating token from device twin topic."); - result = __LINE__; + result = __FAILURE__; *status_code = 0; *request_id = 0; *patch_msg = false; @@ -646,14 +650,14 @@ if ((token_value = STRING_new()) == NULL) { LogError("Failed allocating new string ."); - result = __LINE__; + result = __FAILURE__; *status_code = 0; *request_id = 0; *patch_msg = false; } else { - result = __LINE__; + result = __FAILURE__; size_t token_count = 0; while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0) { @@ -815,27 +819,27 @@ STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transport_data->topic_MqttEvent)); if (msgTopic == NULL) { - result = __LINE__; + result = __FAILURE__; } else { MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create(mqttMsgEntry->packet_id, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len); if (mqttMsg == NULL) { - result = __LINE__; + result = __FAILURE__; } else { if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0) { LogError("Failed retrieving tickcounter info"); - result = __LINE__; + result = __FAILURE__; } else { if (mqtt_client_publish(transport_data->mqttClient, mqttMsg) != 0) { - result = __LINE__; + result = __FAILURE__; } else { @@ -859,7 +863,7 @@ if (msg_topic == NULL) { LogError("Failed constructing message topic."); - result = __LINE__; + result = __FAILURE__; } else { @@ -867,14 +871,14 @@ if (mqtt_get_msg == NULL) { LogError("Failed constructing mqtt message."); - result = __LINE__; + result = __FAILURE__; } else { if (mqtt_client_publish(transport_data->mqttClient, mqtt_get_msg) != 0) { LogError("Failed publishing to mqtt client."); - result = __LINE__; + result = __FAILURE__; } else { @@ -894,7 +898,7 @@ if (mqtt_info == NULL) { LogError("Failed allocating device twin data."); - result = __LINE__; + result = __FAILURE__; } else { @@ -910,7 +914,7 @@ { LogError("Failed constructing get Prop topic."); free(mqtt_info); - result = __LINE__; + result = __FAILURE__; } else { @@ -919,7 +923,7 @@ { LogError("Failed constructing mqtt message."); free(mqtt_info); - result = __LINE__; + result = __FAILURE__; } else { @@ -927,7 +931,7 @@ { LogError("Failed publishing to mqtt client."); free(mqtt_info); - result = __LINE__; + result = __FAILURE__; } else { @@ -951,7 +955,7 @@ if (msgTopic == NULL) { LogError("Failed constructing reported prop topic."); - result = __LINE__; + result = __FAILURE__; } else { @@ -960,21 +964,21 @@ if (mqtt_rpt_msg == NULL) { LogError("Failed creating mqtt message"); - result = __LINE__; + result = __FAILURE__; } else { if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgPublishTime) != 0) { LogError("Failed retrieving tickcounter info"); - result = __LINE__; + result = __FAILURE__; } else { if (mqtt_client_publish(transport_data->mqttClient, mqtt_rpt_msg) != 0) { LogError("Failed publishing mqtt message"); - result = __LINE__; + result = __FAILURE__; } else { @@ -1012,7 +1016,7 @@ if (mqttTopic == NULL) { LogError("Failure constructing string topic name."); - result = __LINE__; + result = __FAILURE__; } else { @@ -1023,7 +1027,7 @@ if (propertyMap == NULL) { LogError("Failure to retrieve IoTHubMessage_properties."); - result = __LINE__; + result = __FAILURE__; } else { @@ -1031,11 +1035,12 @@ if (output == NULL) { LogError("Failure to allocate STRING_new."); - result = __LINE__; + result = __FAILURE__; } else { result = 0; + while (STRING_TOKENIZER_get_next_token(token, output, PROPERTY_SEPARATOR) == 0 && result == 0) { const char* tokenData = STRING_c_str(output); @@ -1046,7 +1051,7 @@ } else { - if (!isSystemProperty(tokenData) ) + if (isSystemProperty(tokenData)) { const char* iterator = tokenData; while (iterator != NULL && *iterator != '\0' && result == 0) @@ -1061,7 +1066,61 @@ if (propName == NULL || propValue == NULL) { - result = __LINE__; + result = __FAILURE__; + } + else + { + strncpy(propName, tokenData, nameLen); + propName[nameLen] = '\0'; + + strncpy(propValue, iterator + 1, valLen); + propValue[valLen] = '\0'; + + if (nameLen > 3) + { + if (strcmp((const char*)&propName[nameLen - 3], MESSAGE_ID_PROPERTY) == 0) + { + if (IoTHubMessage_SetMessageId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK) + { + LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'messageId' property."); + result = __FAILURE__; + } + } + + if (strcmp((const char*)&propName[nameLen - 3], CORRELATION_ID_PROPERTY) == 0) + { + if (IoTHubMessage_SetCorrelationId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK) + { + LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'correlationId' property."); + result = __FAILURE__; + } + } + } + } + free(propName); + free(propValue); + + break; + } + iterator++; + } + } + else + { + const char* iterator = tokenData; + while (iterator != NULL && *iterator != '\0' && result == 0) + { + if (*iterator == '=') + { + size_t nameLen = iterator - tokenData; + char* propName = malloc(nameLen + 1); + + size_t valLen = tokenLen - (nameLen + 1) + 1; + char* propValue = malloc(valLen + 1); + + if (propName == NULL || propValue == NULL) + { + result = __FAILURE__; } else { @@ -1074,7 +1133,7 @@ if (Map_AddOrUpdate(propertyMap, propName, propValue) != MAP_OK) { LogError("Map_AddOrUpdate failed."); - result = __LINE__; + result = __FAILURE__; } } free(propName); @@ -1095,7 +1154,7 @@ else { LogError("Unable to create Tokenizer object."); - result = __LINE__; + result = __FAILURE__; } STRING_delete(mqttTopic); } @@ -1520,7 +1579,7 @@ if (transport_data->xioTransport == NULL) { LogError("Unable to create the lower level TLS layer."); - result = __LINE__; + result = __FAILURE__; } else { @@ -1542,7 +1601,7 @@ STRING_HANDLE emptyKeyName = STRING_new(); if (emptyKeyName == NULL) { - result = __LINE__; + result = __FAILURE__; } else { @@ -1557,7 +1616,7 @@ { IotHubClient_LL_ConnectionStatusCallBack(transport_data->llClientHandle, IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN); STRING_delete(sasToken); - result = __LINE__; + result = __FAILURE__; } break; case DEVICE_KEY: @@ -1596,7 +1655,7 @@ if (mqtt_client_connect(transport_data->mqttClient, transport_data->xioTransport, &options) != 0) { LogError("failure connecting to address %s:%d.", STRING_c_str(transport_data->hostAddress), transport_data->portNum); - result = __LINE__; + result = __FAILURE__; } else { @@ -1606,7 +1665,7 @@ } else { - result = __LINE__; + result = __FAILURE__; } STRING_delete(sasToken); @@ -1630,14 +1689,14 @@ if (tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->connectTick) != 0) { transport_data->connectFailCount++; - result = __LINE__; + result = __FAILURE__; } else { if (SendMqttConnectMsg(transport_data) != 0) { transport_data->connectFailCount++; - result = __LINE__; + result = __FAILURE__; } else { @@ -1655,7 +1714,7 @@ if (tickcounter_get_current_ms(transport_data->msgTickCounter, ¤t_time) != 0) { transport_data->connectFailCount++; - result = __LINE__; + result = __FAILURE__; } else { @@ -1704,12 +1763,12 @@ if (transport_data->transport_creds.CREDENTIAL_VALUE.deviceKey == NULL) { LogError("Could not create device key for MQTT"); - result = __LINE__; + result = __FAILURE__; } else if ((transport_data->devicesPath = STRING_construct_sprintf("%s.%s/devices/%s", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId)) == NULL) { STRING_delete(transport_data->transport_creds.CREDENTIAL_VALUE.deviceKey); - result = __LINE__; + result = __FAILURE__; } else { @@ -1722,7 +1781,7 @@ transport_data->transport_creds.CREDENTIAL_VALUE.deviceSasToken = STRING_construct(upperConfig->deviceSasToken); if (transport_data->transport_creds.CREDENTIAL_VALUE.deviceSasToken == NULL) { - result = __LINE__; + result = __FAILURE__; } else { @@ -2035,7 +2094,7 @@ { /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_042: [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */ LogError("Invalid handle parameter. NULL."); - result = __LINE__; + result = __FAILURE__; } else { @@ -2047,7 +2106,7 @@ { /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_046: [Upon failure IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall return a non-zero value.] */ LogError("Failure: unable constructing reported state topic"); - result = __LINE__; + result = __FAILURE__; } else { @@ -2067,7 +2126,7 @@ if (transport_data->topic_NotifyState == NULL) { LogError("Failure: unable constructing notify state topic"); - result = __LINE__; + result = __FAILURE__; } else { @@ -2126,7 +2185,7 @@ { /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_001 : [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return a non - zero value.]*/ LogError("Invalid handle parameter. NULL."); - result = __LINE__; + result = __FAILURE__; } else { @@ -2138,7 +2197,7 @@ { /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_006 : [Upon failure IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return a non - zero value.]*/ LogError("Failure: unable constructing device method subscribe topic"); - result = __LINE__; + result = __FAILURE__; } else { @@ -2211,7 +2270,7 @@ if (dev_method_info == NULL) { LogError("Failure: DEVICE_METHOD_INFO was NULL"); - result = __LINE__; + result = __FAILURE__; } else { @@ -2219,7 +2278,7 @@ { /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_051: [ If any error is encountered, IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */ LogError("Failure: publishing device method response"); - result = __LINE__; + result = __FAILURE__; } else { @@ -2232,7 +2291,7 @@ else { /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_041: [ If the parameter handle is NULL than IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */ - result = __LINE__; + result = __FAILURE__; LogError("Failure: invalid IOTHUB_DEVICE_HANDLE parameter specified"); } return result; @@ -2246,7 +2305,7 @@ { /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_015: [If parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */ LogError("Invalid handle parameter. NULL."); - result = __LINE__; + result = __FAILURE__; } else { @@ -2255,7 +2314,7 @@ if (transport_data->topic_MqttMessage == NULL) { LogError("Failure constructing Message Topic"); - result = __LINE__; + result = __FAILURE__; } else {