Microsoft Azure IoTHub client MQTT transport

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
6:16875b609849
Parent:
5:73603e7a6542
Child:
7:7fdd306e6224
--- a/iothubtransportmqtt.c	Fri Jul 01 10:42:25 2016 -0700
+++ b/iothubtransportmqtt.c	Mon Jul 18 16:44:47 2016 -0700
@@ -28,6 +28,8 @@
 #include <stdarg.h>
 #include <stdio.h>
 
+#include <limits.h>
+
 #define SAS_TOKEN_DEFAULT_LIFETIME  3600
 #define SAS_REFRESH_MULTIPLIER      .8
 #define EPOCH_TIME_T_VALUE          0
@@ -94,7 +96,7 @@
     IOTHUB_CLIENT_LL_HANDLE llClientHandle;
     CONTROL_PACKET_TYPE currPacketState;
     XIO_HANDLE xioTransport;
-    int keepAliveValue;
+    uint16_t keepAliveValue;
     uint64_t mqtt_connect_time;
     size_t connectFailCount;
     uint64_t connectTick;
@@ -110,6 +112,19 @@
     DLIST_ENTRY entry;
 } MQTT_MESSAGE_DETAILS_LIST, *PMQTT_MESSAGE_DETAILS_LIST;
 
+static uint16_t get_next_packet_id(PMQTTTRANSPORT_HANDLE_DATA transportState)
+{
+    if (transportState->packetId+1 >= USHRT_MAX)
+    {
+        transportState->packetId = 1;
+    }
+    else
+    {
+        transportState->packetId++;
+    }
+    return transportState->packetId;
+}
+
 static void sendMsgComplete(IOTHUB_MESSAGE_LIST* iothubMsgList, PMQTTTRANSPORT_HANDLE_DATA transportState, IOTHUB_CLIENT_CONFIRMATION_RESULT confirmResult)
 {
     DLIST_ENTRY messageCompleted;
@@ -168,6 +183,7 @@
 static int publishMqttMessage(PMQTTTRANSPORT_HANDLE_DATA transportState, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len)
 {
     int result;
+    mqttMsgEntry->msgPacketId = get_next_packet_id(transportState);
     STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transportState->mqttEventTopic));
     if (msgTopic == NULL)
     {
@@ -175,7 +191,7 @@
     }
     else
     {
-        MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create(transportState->packetId++, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len);
+        MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create(mqttMsgEntry->msgPacketId, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len);
         if (mqttMsg == NULL)
         {
             result = __LINE__;
@@ -431,9 +447,11 @@
 
 const XIO_HANDLE getIoTransportProvider(const char* fqdn, int port)
 {
-    TLSIO_CONFIG tls_io_config = { fqdn, port };
+    TLSIO_CONFIG tls_io_config;
     const IO_INTERFACE_DESCRIPTION* io_interface_description = platform_get_default_tlsio();
-    return (void*)xio_create(io_interface_description, &tls_io_config);
+    tls_io_config.hostname = fqdn;
+    tls_io_config.port = port;
+    return xio_create(io_interface_description, &tls_io_config);
 }
 
 static int SubscribeToMqttProtocol(PMQTTTRANSPORT_HANDLE_DATA transportState)
@@ -442,11 +460,13 @@
 
     if (transportState->receiveMessages && !transportState->subscribed)
     {
-        SUBSCRIBE_PAYLOAD subscribe[] = {
-            { STRING_c_str(transportState->mqttMessageTopic), DELIVER_AT_LEAST_ONCE }
-        };
+        SUBSCRIBE_PAYLOAD subscribe[1];
+        
+        subscribe[0].subscribeTopic = STRING_c_str(transportState->mqttMessageTopic);
+        subscribe[0].qosReturn = DELIVER_AT_LEAST_ONCE;
+
         /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransportMqtt_Subscribe shall call mqtt_client_subscribe to subscribe to the Message Topic.] */
-        if (mqtt_client_subscribe(transportState->mqttClient, transportState->packetId++, subscribe, 1) != 0)
+        if (mqtt_client_subscribe(transportState->mqttClient, get_next_packet_id(transportState), subscribe, 1) != 0)
         {
             /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_017: [Upon failure IoTHubTransportMqtt_Subscribe shall return a non-zero value.] */
             result = __LINE__;
@@ -469,6 +489,7 @@
         {
             transportState->currPacketState = PUBLISH_TYPE;
         }
+        result = 0;
     }
     return result;
 }
@@ -1019,8 +1040,9 @@
     if (transportState != NULL && transportState->subscribed)
     {
         /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_020: [IoTHubTransportMqtt_Unsubscribe shall call mqtt_client_unsubscribe to unsubscribe the mqtt message topic.] */
-        const char* unsubscribe[] = { STRING_c_str(transportState->mqttMessageTopic) };
-        (void)mqtt_client_unsubscribe(transportState->mqttClient, transportState->packetId++, unsubscribe, 1);
+        const char* unsubscribe[1];
+        unsubscribe[0] = STRING_c_str(transportState->mqttMessageTopic);
+        (void)mqtt_client_unsubscribe(transportState->mqttClient, get_next_packet_id(transportState), unsubscribe, 1);
         transportState->subscribed = false;
         transportState->receiveMessages = false;
     }
@@ -1122,9 +1144,7 @@
                         else
                         {
                             mqttMsgEntry->retryCount = 0;
-                            mqttMsgEntry->msgPacketId = transportState->packetId;
                             mqttMsgEntry->iotHubMessageEntry = iothubMsgList;
-
                             if (publishMqttMessage(transportState, mqttMsgEntry, messagePayload, messageLength) != 0)
                             {
                                 (void)(DList_RemoveEntryList(currentListEntry));
@@ -1205,7 +1225,7 @@
             /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_037 : [If the option parameter is set to supplied int_ptr keepalive is the same value as the existing keepalive then IoTHubTransportMqtt_SetOption shall do nothing.] */
             if (*keepAliveOption != transportState->keepAliveValue)
             {
-                transportState->keepAliveValue = *keepAliveOption;
+                transportState->keepAliveValue = (uint16_t)(*keepAliveOption);
                 if (transportState->connected)
                 {
                     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_038: [If the client is connected when the keepalive is set then IoTHubTransportMqtt_SetOption shall disconnect and reconnect with the specified keepalive value.] */
@@ -1241,6 +1261,8 @@
 static IOTHUB_DEVICE_HANDLE IoTHubTransportMqtt_Register(TRANSPORT_LL_HANDLE handle, const IOTHUB_DEVICE_CONFIG* device, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle, PDLIST_ENTRY waitingToSend)
 {
     IOTHUB_DEVICE_HANDLE result;
+    (void)iotHubClientHandle;
+
     // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_001: [ IoTHubTransportMqtt_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.]
     // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_002: [ IoTHubTransportMqtt_Register shall return NULL if device or waitingToSend are NULL.]
     if ((handle == NULL) || (device == NULL) || (waitingToSend == NULL))