Microsoft Azure IoTHub client MQTT transport
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: iothubtransportmqtt.c
- Revision:
- 2:9db4da50abaa
- Parent:
- 1:f2e563755d91
- Child:
- 3:40f482ed0be8
diff -r f2e563755d91 -r 9db4da50abaa iothubtransportmqtt.c --- a/iothubtransportmqtt.c Fri Apr 08 13:24:20 2016 -0700 +++ b/iothubtransportmqtt.c Sun Apr 24 16:40:02 2016 -0700 @@ -22,12 +22,14 @@ #include "azure_c_shared_utility/tlsio.h" #include "azure_c_shared_utility/platform.h" +#include "azure_c_shared_utility/string_tokenizer.h" #include "iothub_client_version.h" #include <stdarg.h> #include <stdio.h> #define SAS_TOKEN_DEFAULT_LIFETIME 3600 +#define SAS_REFRESH_MULTIPLIER .8 #define EPOCH_TIME_T_VALUE 0 #define DEFAULT_MQTT_KEEPALIVE 4*60 // 4 min #define DEFAULT_PORT_NUMBER 8883 @@ -37,6 +39,29 @@ #define SAS_TOKEN_DEFAULT_LEN 10 #define RESEND_TIMEOUT_VALUE_MIN 1*60 #define MAX_SEND_RECOUNT_LIMIT 2 +#define DEFAULT_CONNECTION_INTERVAL 30 +#define FAILED_CONN_BACKOFF_VALUE 5 + +static const char* DEVICE_MSG_TOPIC = "devices/%s/messages/devicebound/#"; +static const char* DEVICE_DEVICE_TOPIC = "devices/%s/messages/events/"; +static const char* PROPERTY_SEPARATOR = "&"; + +typedef struct SYSTEM_PROPERTY_INFO_TAG +{ + const char* propName; + size_t propLength; +} SYSTEM_PROPERTY_INFO; + +static SYSTEM_PROPERTY_INFO sysPropList[] = { + { "%24.exp", 7 }, + { "%24.mid", 7 }, + { "%24.uid", 7 }, + { "%24.to", 6 }, + { "%24.cid", 7 }, + { "devices/", 8 }, + { "iothub-operation", 16 }, + { "iothub-ack", 10 } +}; TICK_COUNTER_HANDLE g_msgTickCounter; @@ -69,6 +94,9 @@ CONTROL_PACKET_TYPE currPacketState; XIO_HANDLE xioTransport; int keepAliveValue; + uint64_t mqtt_connect_time; + size_t connectFailCount; + uint64_t connectTick; } MQTTTRANSPORT_HANDLE_DATA, *PMQTTTRANSPORT_HANDLE_DATA; typedef struct MQTT_MESSAGE_DETAILS_LIST_TAG @@ -102,28 +130,190 @@ IoTHubClient_LL_SendComplete(transportState->llClientHandle, &messageCompleted, batchResult); } +static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, const char* eventTopic) +{ + STRING_HANDLE result = STRING_construct(eventTopic); + const char* const* propertyKeys; + const char* const* propertyValues; + size_t propertyCount; + + // Construct Properties + MAP_HANDLE properties_map = IoTHubMessage_Properties(iothub_message_handle); + if (properties_map != NULL) + { + if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK) + { + LogError("Failed to get the internals of the property map."); + STRING_delete(result); + result = NULL; + } + else + { + if (propertyCount != 0) + { + for (size_t index = 0; index < propertyCount && result != NULL; index++) + { + size_t len = strlen(propertyKeys[index]) + strlen(propertyValues[index]) + 2; + char* propValues = malloc(len+1); + if (propValues == NULL) + { + STRING_delete(result); + result = NULL; + } + else + { + sprintf(propValues, "%s=%s%s", propertyKeys[index], propertyValues[index], propertyCount - 1 == index ? "" : "&"); + if (STRING_concat(result, propValues) != 0) + { + STRING_delete(result); + result = NULL; + } + free(propValues); + } + } + } + } + } + return result; +} + static int publishMqttMessage(PMQTTTRANSPORT_HANDLE_DATA transportState, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len) { int result; - MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create(transportState->packetId++, STRING_c_str(transportState->mqttEventTopic), DELIVER_AT_LEAST_ONCE, payload, len); - if (mqttMsg == NULL) + STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transportState->mqttEventTopic)); + if (msgTopic == NULL) { result = __LINE__; } else { - if (mqtt_client_publish(transportState->mqttClient, mqttMsg) != 0) + MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create(transportState->packetId++, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len); + if (mqttMsg == NULL) { result = __LINE__; } else { - mqttMsgEntry->retryCount++; - (void)tickcounter_get_current_ms(g_msgTickCounter, &mqttMsgEntry->msgPublishTime); - result = 0; + if (mqtt_client_publish(transportState->mqttClient, mqttMsg) != 0) + { + result = __LINE__; + } + else + { + mqttMsgEntry->retryCount++; + (void)tickcounter_get_current_ms(g_msgTickCounter, &mqttMsgEntry->msgPublishTime); + result = 0; + } + mqttmessage_destroy(mqttMsg); + } + STRING_delete(msgTopic); + } + return result; +} + +static bool isSystemProperty(const char* tokenData) +{ + bool result = false; + size_t propCount = sizeof(sysPropList)/sizeof(sysPropList[0]); + for (size_t index = 0; index < propCount; index++) + { + if (memcmp(tokenData, sysPropList[index].propName, sysPropList[index].propLength) == 0) + { + result = true; + break; + } + } + return result; +} + +static int extractMqttProperties(IOTHUB_MESSAGE_HANDLE IoTHubMessage, MQTT_MESSAGE_HANDLE msgHandle) +{ + int result; + STRING_HANDLE mqttTopic = STRING_construct(mqttmessage_getTopicName(msgHandle)); + + STRING_TOKENIZER_HANDLE token = STRING_TOKENIZER_create(mqttTopic); + if (token != NULL) + { + MAP_HANDLE propertyMap = IoTHubMessage_Properties(IoTHubMessage); + if (propertyMap == NULL) + { + LogError("Failure to retrieve IoTHubMessage_properties."); + result = __LINE__; } - mqttmessage_destroy(mqttMsg); + else + { + STRING_HANDLE output = STRING_new(); + if (output == NULL) + { + LogError("Failure to allocate STRING_new."); + result = __LINE__; + } + else + { + result = 0; + while (STRING_TOKENIZER_get_next_token(token, output, PROPERTY_SEPARATOR) == 0 && result == 0) + { + const char* tokenData = STRING_c_str(output); + size_t tokenLen = strlen(tokenData); + if (tokenData == NULL || tokenLen == 0) + { + break; + } + else + { + if (!isSystemProperty(tokenData) ) + { + const char* iterator = tokenData; + while (iterator != NULL && *iterator != '\0' && result == 0) + { + if (*iterator == '=') + { + size_t nameLen = iterator - tokenData; + char* propName = malloc(nameLen + 1); + + size_t valLen = tokenLen - (nameLen + 1) + 1; + char* propValue = malloc(valLen + 1); + + if (propName == NULL || propValue == NULL) + { + result = __LINE__; + } + else + { + strncpy(propName, tokenData, nameLen); + propName[nameLen] = '\0'; + + strncpy(propValue, iterator + 1, valLen); + propValue[valLen] = '\0'; + + if (Map_AddOrUpdate(propertyMap, propName, propValue) != MAP_OK) + { + LogError("Map_AddOrUpdate failed."); + result = __LINE__; + } + } + free(propName); + free(propValue); + + break; + } + iterator++; + } + } + } + } + STRING_delete(output); + } + } + STRING_TOKENIZER_destroy(token); } + else + { + LogError("Unable to create Tokenizer object."); + result = __LINE__; + } + STRING_delete(mqttTopic); + return result; } @@ -135,14 +325,16 @@ IOTHUB_MESSAGE_HANDLE IoTHubMessage = IoTHubMessage_CreateFromByteArray(appPayload->message, appPayload->length); if (IoTHubMessage == NULL) { - LogError("IotHub Message creation has failed.\r\n"); + LogError("IotHub Message creation has failed."); } else { + // Will need to update this when the service has messages that can be rejected + (void)extractMqttProperties(IoTHubMessage, msgHandle); PMQTTTRANSPORT_HANDLE_DATA transportData = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx; if (IoTHubClient_LL_MessageCallback(transportData->llClientHandle, IoTHubMessage) != IOTHUBMESSAGE_ACCEPTED) { - LogError("Event not accepted by our client.\r\n"); + LogError("Event not accepted by our client."); } IoTHubMessage_Destroy(IoTHubMessage); } @@ -194,7 +386,7 @@ } else { - LogError("Connection not accepted, return code: %d.\r\n", connack->returnCode); + LogError("Connection not accepted, return code: %d.", connack->returnCode); (void)mqtt_client_disconnect(transportData->mqttClient); transportData->connected = false; transportData->currPacketState = PACKET_TYPE_ERROR; @@ -202,7 +394,7 @@ } else { - LogError("MQTT_CLIENT_ON_CONNACK CONNACK parameter is NULL.\r\n"); + LogError("MQTT_CLIENT_ON_CONNACK CONNACK parameter is NULL."); } break; } @@ -218,7 +410,7 @@ } else { - LogError("QOS count was not expected: %d.\r\n", (int)suback->qosCount); + LogError("QOS count was not expected: %d.", (int)suback->qosCount); } } break; @@ -300,7 +492,7 @@ { if (IoTHubMessage_GetByteArray(messageHandle, &result, length) != IOTHUB_MESSAGE_OK) { - LogError("Failure result from IoTHubMessage_GetByteArray\r\n"); + LogError("Failure result from IoTHubMessage_GetByteArray"); result = NULL; *length = 0; } @@ -310,7 +502,7 @@ result = (const unsigned char*)IoTHubMessage_GetString(messageHandle); if (result == NULL) { - LogError("Failure result from IoTHubMessage_GetString\r\n"); + LogError("Failure result from IoTHubMessage_GetString"); result = NULL; *length = 0; } @@ -360,7 +552,7 @@ } else { - (void)sprintf(eventTopic, "devices/%s/messages/events/", deviceId); + (void)sprintf(eventTopic, DEVICE_DEVICE_TOPIC, deviceId); result = STRING_construct(eventTopic); free(eventTopic); } @@ -379,7 +571,7 @@ } else { - (void)sprintf(messageTopic, "devices/%s/messages/devicebound/#", deviceId); + (void)sprintf(messageTopic, DEVICE_MSG_TOPIC, deviceId); result = STRING_construct(messageTopic); free(messageTopic); } @@ -407,7 +599,7 @@ transportState->xioTransport = getIoTransportProvider(hostName, transportState->portNum); if (transportState->xioTransport == NULL) { - LogError("Unable to create the lower level TLS layer.\r\n"); + LogError("Unable to create the lower level TLS layer."); result = __LINE__; } else @@ -421,9 +613,10 @@ } return result; } + static int SendMqttConnectMsg(PMQTTTRANSPORT_HANDLE_DATA transportState) { - int result = 0; + int result; // Construct SAS token size_t secSinceEpoch = (size_t)(difftime(get_time(NULL), EPOCH_TIME_T_VALUE) + 0); @@ -447,18 +640,25 @@ options.useCleanSession = false; options.qualityOfServiceValue = DELIVER_AT_LEAST_ONCE; - if ((result = GetTransportProviderIfNecessary(transportState)) == 0) + if (GetTransportProviderIfNecessary(transportState) == 0) { if (mqtt_client_connect(transportState->mqttClient, transportState->xioTransport, &options) != 0) { - LogError("failure connecting to address %s:%d.\r\n", STRING_c_str(transportState->hostAddress), transportState->portNum); + LogError("failure connecting to address %s:%d.", STRING_c_str(transportState->hostAddress), transportState->portNum); + xio_destroy(transportState->xioTransport); + transportState->xioTransport = NULL; result = __LINE__; } else { + (void)tickcounter_get_current_ms(g_msgTickCounter, &transportState->mqtt_connect_time); result = 0; } } + else + { + result = __LINE__; + } } STRING_delete(emptyKeyName); STRING_delete(sasToken); @@ -468,19 +668,62 @@ static int InitializeConnection(PMQTTTRANSPORT_HANDLE_DATA transportState) { int result = 0; - if (!transportState->connected && !transportState->destroyCalled) + + // Make sure we're not destroying the object + if (!transportState->destroyCalled) { + // If we are not connected then check to see if we need + // to back off the connecting to the server + if (!transportState->connected) + { + // Default makeConnection as true if something goes wrong we'll make the connection + bool makeConnection = true; + // If we've failed for FAILED_CONN_BACKOFF_VALUE straight times them let's slow down connection + // to the service + if (transportState->connectFailCount > FAILED_CONN_BACKOFF_VALUE) + { + uint64_t currentTick; + if (tickcounter_get_current_ms(g_msgTickCounter, ¤tTick) == 0) + { + if ( ((currentTick - transportState->connectTick)/1000) <= DEFAULT_CONNECTION_INTERVAL) + { + result = __LINE__; + makeConnection = false; + } + } + } + + if (makeConnection) + { + (void)tickcounter_get_current_ms(g_msgTickCounter, &transportState->connectTick); if (SendMqttConnectMsg(transportState) != 0) { - transportState->connected = false; + transportState->connectFailCount++; result = __LINE__; } else { + transportState->connectFailCount = 0; transportState->connected = true; result = 0; } } + } + + if (transportState->connected) + { + // We are connected and not being closed, so does SAS need to reconnect? + uint64_t current_time; + (void)tickcounter_get_current_ms(g_msgTickCounter, ¤t_time); + if ((current_time - transportState->mqtt_connect_time) / 1000 > (SAS_TOKEN_DEFAULT_LIFETIME*SAS_REFRESH_MULTIPLIER)) + { + (void)mqtt_client_disconnect(transportState->mqttClient); + transportState->subscribed = false; + transportState->connected = false; + transportState->currPacketState = UNKNOWN_TYPE; + } + } + } return result; } @@ -508,7 +751,7 @@ PMQTTTRANSPORT_HANDLE_DATA state = (PMQTTTRANSPORT_HANDLE_DATA)malloc(sizeof(MQTTTRANSPORT_HANDLE_DATA)); if (state == NULL) { - LogError("Could not create MQTT transport state. Memory allocation failed.\r\n"); + LogError("Could not create MQTT transport state. Memory allocation failed."); } else if ((state->device_id = STRING_construct(upperConfig->deviceId)) == NULL) { @@ -517,14 +760,14 @@ } else if ((state->device_key = STRING_construct(upperConfig->deviceKey)) == NULL) { - LogError("Could not create device key for MQTT\r\n"); + LogError("Could not create device key for MQTT"); STRING_delete(state->device_id); free(state); state = NULL; } else if ( (state->sasTokenSr = ConstructSasToken(upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId) ) == NULL) { - LogError("Could not create Sas Token Sr String.\r\n"); + LogError("Could not create Sas Token Sr String."); STRING_delete(state->device_key); STRING_delete(state->device_id); free(state); @@ -532,7 +775,7 @@ } else if ( (state->mqttEventTopic = ConstructEventTopic(upperConfig->deviceId) ) == NULL) { - LogError("Could not create mqttEventTopic for MQTT\r\n"); + LogError("Could not create mqttEventTopic for MQTT"); STRING_delete(state->sasTokenSr); STRING_delete(state->device_key); STRING_delete(state->device_id); @@ -541,7 +784,7 @@ } else if ((state->mqttMessageTopic = ConstructMessageTopic(upperConfig->deviceId) ) == NULL) { - LogError("Could not create mqttMessageTopic for MQTT\r\n"); + LogError("Could not create mqttMessageTopic for MQTT"); STRING_delete(state->mqttEventTopic); STRING_delete(state->sasTokenSr); STRING_delete(state->device_key); @@ -605,6 +848,8 @@ state->waitingToSend = waitingToSend; state->currPacketState = CONNECT_TYPE; state->keepAliveValue = DEFAULT_MQTT_KEEPALIVE; + state->connectFailCount = 0; + state->connectTick = 0; } } } @@ -619,7 +864,7 @@ /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_001: [If parameter config is NULL then IoTHubTransportMqtt_Create shall return NULL.] */ if (config == NULL) { - LogError("Invalid Argument: Config Parameter is NULL.\r\n"); + LogError("Invalid Argument: Config Parameter is NULL."); result = NULL; } /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig or waitingToSend are NULL then IoTHubTransportMqtt_Create shall return NULL.] */ @@ -627,31 +872,31 @@ else if (config->upperConfig == NULL || config->upperConfig->protocol == NULL || config->upperConfig->deviceId == NULL || config->upperConfig->deviceKey == NULL || config->upperConfig->iotHubName == NULL || config->upperConfig->iotHubSuffix == NULL) { - LogError("Invalid Argument: upperConfig structure contains an invalid parameter\r\n"); + LogError("Invalid Argument: upperConfig structure contains an invalid parameter"); result = NULL; } /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig or waitingToSend are NULL then IoTHubTransportMqtt_Create shall return NULL.] */ else if (config->waitingToSend == NULL) { - LogError("Invalid Argument: waitingToSend is NULL)\r\n"); + LogError("Invalid Argument: waitingToSend is NULL)"); result = NULL; } /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_006: [If the upperConfig's variables deviceId is an empty strings or length is greater then 128 then IoTHubTransportMqtt_Create shall return NULL.] */ else if ( ( (deviceIdSize = strlen(config->upperConfig->deviceId)) > 128U) || (deviceIdSize == 0) ) { - LogError("Invalid Argument: DeviceId is of an invalid size\r\n"); + LogError("Invalid Argument: DeviceId is of an invalid size"); result = NULL; } /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, deviceKey, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransportMqtt_Create shall return NULL.] */ else if (strlen(config->upperConfig->deviceKey) == 0) { - LogError("Invalid Argument: deviceKey is empty\r\n"); + LogError("Invalid Argument: deviceKey is empty"); result = NULL; } /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, deviceKey, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransportMqtt_Create shall return NULL.] */ else if (strlen(config->upperConfig->iotHubName) == 0) { - LogError("Invalid Argument: iotHubName is empty\r\n"); + LogError("Invalid Argument: iotHubName is empty"); result = NULL; } else @@ -677,6 +922,7 @@ (void)mqtt_client_disconnect(transportState->mqttClient); xio_destroy(transportState->xioTransport); + transportState->xioTransport = NULL; transportState->connected = false; transportState->currPacketState = DISCONNECT_TYPE; @@ -722,7 +968,7 @@ if (transportState == NULL) { /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_015: [If parameter handle is NULL than IoTHubTransportMqtt_Subscribe shall return a non-zero value.] */ - LogError("Invalid handle parameter. NULL.\r\n"); + LogError("Invalid handle parameter. NULL."); result = __LINE__; } else @@ -756,7 +1002,7 @@ } else { - LogError("Invalid argument to unsubscribe (NULL). \r\n"); + LogError("Invalid argument to unsubscribe (NULL)."); } } @@ -810,7 +1056,7 @@ const unsigned char* messagePayload = RetrieveMessagePayload(mqttMsgEntry->iotHubMessageEntry->messageHandle, &messageLength); if (messageLength == 0 || messagePayload == NULL) { - LogError("Failure from creating Message IoTHubMessage_GetData\r\n"); + LogError("Failure from creating Message IoTHubMessage_GetData"); } else { @@ -839,7 +1085,7 @@ const unsigned char* messagePayload = RetrieveMessagePayload(iothubMsgList->messageHandle, &messageLength); if (messageLength == 0 || messagePayload == NULL) { - LogError("Failure result from IoTHubMessage_GetData\r\n"); + LogError("Failure result from IoTHubMessage_GetData"); } else { @@ -847,7 +1093,7 @@ MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = (MQTT_MESSAGE_DETAILS_LIST*)malloc(sizeof(MQTT_MESSAGE_DETAILS_LIST)); if (mqttMsgEntry == NULL) { - LogError("Allocation Error: Failure allocating MQTT Message Detail List.\r\n"); + LogError("Allocation Error: Failure allocating MQTT Message Detail List."); } else { @@ -884,7 +1130,7 @@ if (handle == NULL || iotHubClientStatus == NULL) { /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_023: [IoTHubTransportMqtt_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] */ - LogError("invalid arument. \r\n"); + LogError("invalid arument."); result = IOTHUB_CLIENT_INVALID_ARG; } else @@ -916,7 +1162,7 @@ ) { result = IOTHUB_CLIENT_INVALID_ARG; - LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption\r\n"); + LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption."); } else {