Microsoft Azure IoTHub client MQTT transport
Dependents: STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more
Diff: iothubtransportmqtt.c
- Revision:
- 6:16875b609849
- Parent:
- 5:73603e7a6542
- Child:
- 7:7fdd306e6224
--- a/iothubtransportmqtt.c Fri Jul 01 10:42:25 2016 -0700 +++ b/iothubtransportmqtt.c Mon Jul 18 16:44:47 2016 -0700 @@ -28,6 +28,8 @@ #include <stdarg.h> #include <stdio.h> +#include <limits.h> + #define SAS_TOKEN_DEFAULT_LIFETIME 3600 #define SAS_REFRESH_MULTIPLIER .8 #define EPOCH_TIME_T_VALUE 0 @@ -94,7 +96,7 @@ IOTHUB_CLIENT_LL_HANDLE llClientHandle; CONTROL_PACKET_TYPE currPacketState; XIO_HANDLE xioTransport; - int keepAliveValue; + uint16_t keepAliveValue; uint64_t mqtt_connect_time; size_t connectFailCount; uint64_t connectTick; @@ -110,6 +112,19 @@ DLIST_ENTRY entry; } MQTT_MESSAGE_DETAILS_LIST, *PMQTT_MESSAGE_DETAILS_LIST; +static uint16_t get_next_packet_id(PMQTTTRANSPORT_HANDLE_DATA transportState) +{ + if (transportState->packetId+1 >= USHRT_MAX) + { + transportState->packetId = 1; + } + else + { + transportState->packetId++; + } + return transportState->packetId; +} + static void sendMsgComplete(IOTHUB_MESSAGE_LIST* iothubMsgList, PMQTTTRANSPORT_HANDLE_DATA transportState, IOTHUB_CLIENT_CONFIRMATION_RESULT confirmResult) { DLIST_ENTRY messageCompleted; @@ -168,6 +183,7 @@ static int publishMqttMessage(PMQTTTRANSPORT_HANDLE_DATA transportState, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len) { int result; + mqttMsgEntry->msgPacketId = get_next_packet_id(transportState); STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transportState->mqttEventTopic)); if (msgTopic == NULL) { @@ -175,7 +191,7 @@ } else { - MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create(transportState->packetId++, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len); + MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create(mqttMsgEntry->msgPacketId, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len); if (mqttMsg == NULL) { result = __LINE__; @@ -431,9 +447,11 @@ const XIO_HANDLE getIoTransportProvider(const char* fqdn, int port) { - TLSIO_CONFIG tls_io_config = { fqdn, port }; + TLSIO_CONFIG tls_io_config; const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio(); - return (void*)xio_create(io_interface_description, &tls_io_config); + tls_io_config.hostname = fqdn; + tls_io_config.port = port; + return xio_create(io_interface_description, &tls_io_config); } static int SubscribeToMqttProtocol(PMQTTTRANSPORT_HANDLE_DATA transportState) @@ -442,11 +460,13 @@ if (transportState->receiveMessages && !transportState->subscribed) { - SUBSCRIBE_PAYLOAD subscribe[] = { - { STRING_c_str(transportState->mqttMessageTopic), DELIVER_AT_LEAST_ONCE } - }; + SUBSCRIBE_PAYLOAD subscribe[1]; + + subscribe[0].subscribeTopic = STRING_c_str(transportState->mqttMessageTopic); + subscribe[0].qosReturn = DELIVER_AT_LEAST_ONCE; + /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransportMqtt_Subscribe shall call mqtt_client_subscribe to subscribe to the Message Topic.] */ - if (mqtt_client_subscribe(transportState->mqttClient, transportState->packetId++, subscribe, 1) != 0) + if (mqtt_client_subscribe(transportState->mqttClient, get_next_packet_id(transportState), subscribe, 1) != 0) { /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_017: [Upon failure IoTHubTransportMqtt_Subscribe shall return a non-zero value.] */ result = __LINE__; @@ -469,6 +489,7 @@ { transportState->currPacketState = PUBLISH_TYPE; } + result = 0; } return result; } @@ -1019,8 +1040,9 @@ if (transportState != NULL && transportState->subscribed) { /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_020: [IoTHubTransportMqtt_Unsubscribe shall call mqtt_client_unsubscribe to unsubscribe the mqtt message topic.] */ - const char* unsubscribe[] = { STRING_c_str(transportState->mqttMessageTopic) }; - (void)mqtt_client_unsubscribe(transportState->mqttClient, transportState->packetId++, unsubscribe, 1); + const char* unsubscribe[1]; + unsubscribe[0] = STRING_c_str(transportState->mqttMessageTopic); + (void)mqtt_client_unsubscribe(transportState->mqttClient, get_next_packet_id(transportState), unsubscribe, 1); transportState->subscribed = false; transportState->receiveMessages = false; } @@ -1122,9 +1144,7 @@ else { mqttMsgEntry->retryCount = 0; - mqttMsgEntry->msgPacketId = transportState->packetId; mqttMsgEntry->iotHubMessageEntry = iothubMsgList; - if (publishMqttMessage(transportState, mqttMsgEntry, messagePayload, messageLength) != 0) { (void)(DList_RemoveEntryList(currentListEntry)); @@ -1205,7 +1225,7 @@ /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_037 : [If the option parameter is set to supplied int_ptr keepalive is the same value as the existing keepalive then IoTHubTransportMqtt_SetOption shall do nothing.] */ if (*keepAliveOption != transportState->keepAliveValue) { - transportState->keepAliveValue = *keepAliveOption; + transportState->keepAliveValue = (uint16_t)(*keepAliveOption); if (transportState->connected) { /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_038: [If the client is connected when the keepalive is set then IoTHubTransportMqtt_SetOption shall disconnect and reconnect with the specified keepalive value.] */ @@ -1241,6 +1261,8 @@ static IOTHUB_DEVICE_HANDLE IoTHubTransportMqtt_Register(TRANSPORT_LL_HANDLE handle, const IOTHUB_DEVICE_CONFIG* device, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend) { IOTHUB_DEVICE_HANDLE result; + (void)iotHubClientHandle; + // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_001: [ IoTHubTransportMqtt_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.] // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_002: [ IoTHubTransportMqtt_Register shall return NULL if device or waitingToSend are NULL.] if ((handle == NULL) || (device == NULL) || (waitingToSend == NULL))