Microsoft Azure IoTHub client MQTT transport

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
14:4dc2b011be33
Parent:
13:606465879c57
Child:
15:81c95c94c8d7
--- a/iothubtransport_mqtt_common.c	Wed Dec 14 15:59:37 2016 -0800
+++ b/iothubtransport_mqtt_common.c	Sun Jan 08 11:11:42 2017 -0800
@@ -49,6 +49,8 @@
 #define ERROR_TIME_FOR_RETRY_SECS   5
 #define WAIT_TIME_SECS              (ERROR_TIME_FOR_RETRY_SECS - 1)
 
+#define TOUPPER(c)      (((c>='a') && (c<='z'))?c-'a'+'A':c)
+
 static const char TOPIC_DEVICE_TWIN_PREFIX[] = "$iothub/twin";
 static const char TOPIC_DEVICE_METHOD_PREFIX[] = "$iothub/methods";
 
@@ -66,7 +68,7 @@
 static const char* PROPERTY_SEPARATOR = "&";
 static const char* REPORTED_PROPERTIES_TOPIC = "$iothub/twin/PATCH/properties/reported/?$rid=%"PRIu16;
 static const char* GET_PROPERTIES_TOPIC = "$iothub/twin/GET/?$rid=%"PRIu16;
-static const char* DEVICE_METHOD_RESPONSE_TOPIC = "$iothub/methods/res/%d/?$rid=%"PRIu16;
+static const char* DEVICE_METHOD_RESPONSE_TOPIC = "$iothub/methods/res/%d/?$rid=%s";
 
 static const char* REQUEST_ID_PROPERTY = "?$rid=";
 
@@ -231,6 +233,11 @@
     DLIST_ENTRY entry;
 } MQTT_MESSAGE_DETAILS_LIST, *PMQTT_MESSAGE_DETAILS_LIST;
 
+typedef struct DEVICE_METHOD_INFO_TAG
+{
+    STRING_HANDLE request_id;
+} DEVICE_METHOD_INFO;
+
 static int RetryPolicy_Exponential_BackOff_With_Jitter(bool *permit, size_t* delay, void* retryContextCallback)
 {
     int result;
@@ -558,7 +565,7 @@
     }
 }
 
-static int retrieve_device_method_rid_info(const char* resp_topic, STRING_HANDLE method_name, uint16_t* request_id)
+static int retrieve_device_method_rid_info(const char* resp_topic, STRING_HANDLE method_name, STRING_HANDLE request_id)
 {
     int result;
     STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(resp_topic);
@@ -566,7 +573,6 @@
     {
         LogError("Failed creating token from device twin topic.");
         result = __LINE__;
-        *request_id = 0;
     }
     else
     {
@@ -575,7 +581,6 @@
         {
             LogError("Failed allocating new string .");
             result = __LINE__;
-            *request_id = 0;
         }
         else
         {
@@ -588,8 +593,8 @@
                 {
                     if (STRING_concat_with_STRING(method_name, token_value) != 0)
                     {
+                        LogError("Failed STRING_concat_with_STRING.");
                         result = __LINE__;
-                        *request_id = 0;
                         break;
                     }
                 }
@@ -600,8 +605,15 @@
                         const char* request_id_value = STRING_c_str(token_value);
                         if (memcmp(request_id_value, REQUEST_ID_PROPERTY, request_id_length) == 0)
                         {
-                            *request_id = (uint16_t)atol(request_id_value+request_id_length);
-                            result = 0;
+                            if (STRING_concat(request_id, request_id_value+request_id_length) != 0)
+                            {
+                                LogError("Failed STRING_concat failed.");
+                                result = __LINE__;
+                            }
+                            else
+                            {
+                                result = 0;
+                            }
                             break;
                         }
                     }
@@ -710,7 +722,7 @@
                     {
                         break;
                     }
-                    else if (toupper(TOPIC_DEVICE_TWIN_PREFIX[index]) != toupper(topic_resp[index]))
+                    else if (TOUPPER(TOPIC_DEVICE_TWIN_PREFIX[index]) != TOUPPER(topic_resp[index]))
                     {
                         search_device_twin = false;
                     }
@@ -728,7 +740,7 @@
                     {
                         break;
                     }
-                    else if (toupper(TOPIC_DEVICE_METHOD_PREFIX[index]) != toupper(topic_resp[index]))
+                    else if (TOUPPER(TOPIC_DEVICE_METHOD_PREFIX[index]) != TOUPPER(topic_resp[index]))
                     {
                         search_device_method = false;
                     }
@@ -835,12 +847,12 @@
     return result;
 }
 
-static int publish_device_method_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, uint16_t request_id, BUFFER_HANDLE response)
+static int publish_device_method_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, STRING_HANDLE request_id, const unsigned char* response, size_t response_size)
 {
     int result;
     uint16_t packet_id = get_next_packet_id(transport_data);
 
-    STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, request_id);
+    STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, STRING_c_str(request_id) );
     if (msg_topic == NULL)
     {
         LogError("Failed constructing message topic.");
@@ -848,7 +860,7 @@
     }
     else
     {
-        MQTT_MESSAGE_HANDLE mqtt_get_msg = mqttmessage_create(packet_id, STRING_c_str(msg_topic), DELIVER_AT_MOST_ONCE, BUFFER_u_char(response), BUFFER_length(response));
+        MQTT_MESSAGE_HANDLE mqtt_get_msg = mqttmessage_create(packet_id, STRING_c_str(msg_topic), DELIVER_AT_MOST_ONCE, response, response_size);
         if (mqtt_get_msg == NULL)
         {
             LogError("Failed constructing mqtt message.");
@@ -1088,8 +1100,10 @@
 
 static void mqtt_notification_callback(MQTT_MESSAGE_HANDLE msgHandle, void* callbackCtx)
 {
+    /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_051: [ If msgHandle or callbackCtx is NULL, mqtt_notification_callback shall do nothing. ] */
     if (msgHandle != NULL && callbackCtx != NULL)
     {
+        /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_052: [ mqtt_notification_callback shall extract the topic Name from the MQTT_MESSAGE_HANDLE. ] */
         const char* topic_resp = mqttmessage_getTopicName(msgHandle);
         if (topic_resp == NULL)
         {
@@ -1129,10 +1143,12 @@
                                 (void)DList_RemoveEntryList(dev_twin_item);
                                 if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES)
                                 {
+                                    /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ If type is IOTHUB_TYPE_DEVICE_TWIN, then on success if msg_type is RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClient_LL_RetrievePropertyComplete... ] */
                                     IoTHubClient_LL_RetrievePropertyComplete(transportData->llClientHandle, DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length);
                                 }
                                 else
                                 {
+                                    /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ if device_twin_msg_type is not RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClient_LL_ReportedStateComplete ] */
                                     IoTHubClient_LL_ReportedStateComplete(transportData->llClientHandle, msg_entry->iothub_msg_id, status_code);
                                 }
                                 free(msg_entry);
@@ -1145,7 +1161,6 @@
             }
             else if (type == IOTHUB_TYPE_DEVICE_METHODS)
             {
-                uint16_t request_id;
                 STRING_HANDLE method_name = STRING_new();
                 if (method_name == NULL)
                 {
@@ -1153,24 +1168,36 @@
                 }
                 else
                 {
-                    BUFFER_HANDLE result_buffer;
-                    if (retrieve_device_method_rid_info(topic_resp, method_name, &request_id) != 0)
+                    DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO) );
+                    if (dev_method_info == NULL)
                     {
-                        LogError("Failure: retrieve device topic info");
-                    }
-                    else if ((result_buffer = BUFFER_new()) == NULL)
-                    {
-                        LogError("Failure: retrieve device topic info");
+                        LogError("Failure: allocating DEVICE_METHOD_INFO object");
                     }
                     else
                     {
-                        const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
-                        int status_code = IoTHubClient_LL_DeviceMethodComplete(transportData->llClientHandle, STRING_c_str(method_name), payload->message, payload->length, result_buffer);
-                        if (publish_device_method_message(transportData, status_code, request_id, result_buffer) != 0)
+                        dev_method_info->request_id = STRING_new();
+                        if (dev_method_info->request_id == NULL)
+                        {
+                            LogError("Failure constructing request_id string");
+                            free(dev_method_info);
+                        }
+                        else if (retrieve_device_method_rid_info(topic_resp, method_name, dev_method_info->request_id) != 0)
                         {
-                            LogError("Failure: publishing device method response");
+                            LogError("Failure: retrieve device topic info");
+                            STRING_delete(dev_method_info->request_id);
+                            free(dev_method_info);
                         }
-                        BUFFER_delete(result_buffer);
+                        else
+                        {
+                            /* CodesSRS_IOTHUB_MQTT_TRANSPORT_07_053: [ If type is IOTHUB_TYPE_DEVICE_METHODS, then on success mqtt_notification_callback shall call IoTHubClient_LL_DeviceMethodComplete. ] */
+                            const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
+                            if (IoTHubClient_LL_DeviceMethodComplete(transportData->llClientHandle, STRING_c_str(method_name), payload->message, payload->length, (void*)dev_method_info) != 0)
+                            {
+                                LogError("Failure: IoTHubClient_LL_DeviceMethodComplete");
+                                STRING_delete(dev_method_info->request_id);
+                                free(dev_method_info);
+                            }
+                        }
                     }
                     STRING_delete(method_name);
                 }
@@ -1192,6 +1219,7 @@
                     }
                     else
                     {
+                        /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_056: [ If type is IOTHUB_TYPE_TELEMETRY, then on success mqtt_notification_callback shall call IoTHubClient_LL_MessageCallback. ] */
                         if (IoTHubClient_LL_MessageCallback(transportData->llClientHandle, IoTHubMessage) != IOTHUBMESSAGE_ACCEPTED)
                         {
                             LogError("Event not accepted by our client.");
@@ -2156,6 +2184,44 @@
     }
 }
 
+int IoTHubTransport_MQTT_Common_DeviceMethod_Response(IOTHUB_DEVICE_HANDLE handle, METHOD_HANDLE methodId, const unsigned char* response, size_t respSize, int status)
+{
+    int result;
+    MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
+    if (transport_data != NULL)
+    {
+        /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_042: [ IoTHubTransport_MQTT_Common_DeviceMethod_Response shall publish an mqtt message for the device method response. ] */
+        DEVICE_METHOD_INFO* dev_method_info = (DEVICE_METHOD_INFO*)methodId;
+        if (dev_method_info == NULL)
+        {
+            LogError("Failure: DEVICE_METHOD_INFO was NULL");
+            result = __LINE__;
+        }
+        else
+        {
+            if (publish_device_method_message(transport_data, status, dev_method_info->request_id, response, respSize) != 0)
+            {
+                /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_051: [ If any error is encountered, IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
+                LogError("Failure: publishing device method response");
+                result = __LINE__;
+            }
+            else
+            {
+                result = 0;
+            }
+            STRING_delete(dev_method_info->request_id);
+            free(dev_method_info);
+        }
+    }
+    else
+    {
+        /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_041: [ If the parameter handle is NULL than IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
+        result = __LINE__;
+        LogError("Failure: invalid IOTHUB_DEVICE_HANDLE parameter specified");
+    }
+    return result;
+}
+
 int IoTHubTransport_MQTT_Common_Subscribe(IOTHUB_DEVICE_HANDLE handle)
 {
     int result;
@@ -2577,9 +2643,9 @@
     return result;
 }
 
-// Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_005: [ IoTHubTransport_MQTT_Common_Unregister shall return. ]
 void IoTHubTransport_MQTT_Common_Unregister(IOTHUB_DEVICE_HANDLE deviceHandle)
 {
+    // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_005: [ If deviceHandle is NULL `IoTHubTransport_MQTT_Common_Unregister` shall do nothing. ]
     if (deviceHandle != NULL)
     {
         MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)deviceHandle;