Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
39:e98d5df6dc74
Parent:
38:6435c6e3d55c
Child:
41:71c01aa3df1a
--- a/iothubtransport_amqp_common.c	Fri Jul 28 09:51:42 2017 -0700
+++ b/iothubtransport_amqp_common.c	Fri Aug 11 14:01:56 2017 -0700
@@ -59,7 +59,7 @@
     AMQP_TRANSPORT_AUTHENTICATION_MODE_CBS,
     AMQP_TRANSPORT_AUTHENTICATION_MODE_X509
 } AMQP_TRANSPORT_AUTHENTICATION_MODE;
-                                         \
+
 
 #define AMQP_TRANSPORT_STATE_STRINGS              \
     AMQP_TRANSPORT_STATE_NOT_CONNECTED,           \
@@ -130,6 +130,13 @@
     delivery_number message_id;
 } MESSAGE_DISPOSITION_CONTEXT;
 
+typedef struct AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT_TAG
+{
+	uint32_t item_id;
+	IOTHUB_CLIENT_LL_HANDLE client_handle;
+} AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT;
+
+
 // ---------- General Helpers ---------- //
 
 static void free_proxy_data(AMQP_TRANSPORT_INSTANCE* amqp_transport_instance)
@@ -294,6 +301,21 @@
     return is_device_registered_ex(amqp_device_instance->transport_instance->registered_devices, device_id, &list_item);
 }
 
+static size_t get_number_of_registered_devices(AMQP_TRANSPORT_INSTANCE* transport)
+{
+	size_t result = 0;
+
+	LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices);
+
+	while (list_item != NULL)
+	{
+		result++;
+		list_item = singlylinkedlist_get_next_item(list_item);
+	}
+
+	return result;
+}
+
 
 // ---------- Callbacks ---------- //
 
@@ -407,7 +429,6 @@
     return device_disposition_result;
 }
 
-
 #ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */
 static void on_methods_error(void* context)
 {
@@ -478,6 +499,47 @@
 }
 #endif
 
+static void on_device_send_twin_update_complete_callback(DEVICE_TWIN_UPDATE_RESULT result, int status_code, void* context)
+{
+	(void)result;
+
+	// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_151: [If `context` is NULL, the callback shall return]
+	if (context == NULL)
+	{
+		LogError("Invalid argument (context is NULL)");
+	}
+	else
+	{
+		AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT* dev_twin_ctx = (AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT*)context;
+
+		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_152: [`IoTHubClient_LL_ReportedStateComplete` shall be invoked passing `status_code` and `context` details]
+		IoTHubClient_LL_ReportedStateComplete(dev_twin_ctx->client_handle, dev_twin_ctx->item_id, status_code);
+
+		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_153: [The memory allocated for `context` shall be released]
+		free(dev_twin_ctx);
+	}
+}
+
+static void on_device_twin_update_received_callback(DEVICE_TWIN_UPDATE_TYPE update_type, const unsigned char* message, size_t length, void* context)
+{
+	// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_137: [If `context` is NULL, the callback shall return.]
+	if (context == NULL)
+	{
+		LogError("Invalid argument (context is NULL)");
+	}
+	else
+	{
+		AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context;
+
+		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_138: [If `update_type` is DEVICE_TWIN_UPDATE_TYPE_PARTIAL IoTHubClient_LL_RetrievePropertyComplete shall be invoked passing `context` as handle, `DEVICE_TWIN_UPDATE_PARTIAL`, `payload` and `size`.]
+		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_139: [If `update_type` is DEVICE_TWIN_UPDATE_TYPE_COMPLETE IoTHubClient_LL_RetrievePropertyComplete shall be invoked passing `context` as handle, `DEVICE_TWIN_UPDATE_COMPLETE`, `payload` and `size`.]
+		IoTHubClient_LL_RetrievePropertyComplete(
+			registered_device->iothub_client_handle, 
+			(update_type == DEVICE_TWIN_UPDATE_TYPE_COMPLETE ? DEVICE_TWIN_UPDATE_COMPLETE : DEVICE_TWIN_UPDATE_PARTIAL), 
+			message, length);
+	}
+}
+
 
 // ---------- Underlying TLS I/O Helpers ---------- //
 
@@ -1338,11 +1400,59 @@
 
 IOTHUB_PROCESS_ITEM_RESULT IoTHubTransport_AMQP_Common_ProcessItem(TRANSPORT_LL_HANDLE handle, IOTHUB_IDENTITY_TYPE item_type, IOTHUB_IDENTITY_INFO* iothub_item)
 {
-    (void)handle;
-    (void)item_type;
-    (void)iothub_item;
-    LogError("Currently Not Supported.");
-    return IOTHUB_PROCESS_ERROR;
+	IOTHUB_PROCESS_ITEM_RESULT result;
+
+	// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_144: [If `handle` or `iothub_item` are NULL, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.]
+	if (handle == NULL || iothub_item == NULL)
+	{
+		LogError("Invalid argument (handle=%p, iothub_item=%p)", handle, iothub_item);
+		result = IOTHUB_PROCESS_ERROR;
+	}
+	else 
+	{
+		if (item_type == IOTHUB_TYPE_DEVICE_TWIN)
+		{
+			AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT* dev_twin_ctx;
+			
+			if ((dev_twin_ctx = (AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT*)malloc(sizeof(AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT))) == NULL)
+			{
+				LogError("Failed allocating context for TWIN message");
+				result = IOTHUB_PROCESS_ERROR;
+			}
+			else
+			{
+				AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)iothub_item->device_twin->device_handle;
+
+				dev_twin_ctx->client_handle = iothub_item->device_twin->client_handle;
+				dev_twin_ctx->item_id = iothub_item->device_twin->item_id;
+
+				// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_146: [device_send_twin_update_async() shall be invoked passing `iothub_item->device_twin->report_data_handle` and `on_device_send_twin_update_complete_callback`]
+				if (device_send_twin_update_async(
+					registered_device->device_handle,
+					iothub_item->device_twin->report_data_handle,
+					on_device_send_twin_update_complete_callback, (void*)dev_twin_ctx) != RESULT_OK)
+				{
+					// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_147: [If device_send_twin_update_async() fails, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.]
+					LogError("Failed sending TWIN update");
+					free(dev_twin_ctx);
+					result = IOTHUB_PROCESS_ERROR;
+				}
+				else
+				{
+					// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_150: [If no errors occur, `IoTHubTransport_AMQP_Common_ProcessItem` shall return IOTHUB_PROCESS_OK.]
+					result = IOTHUB_PROCESS_OK;
+				}
+			}
+		}
+		// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_145: [If `item_type` is not IOTHUB_TYPE_DEVICE_TWIN, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.]
+		else
+		{
+			LogError("Item type not supported (%d)", item_type);
+			result = IOTHUB_PROCESS_ERROR;
+		}
+	}
+
+	return result;
 }
 
 void IoTHubTransport_AMQP_Common_DoWork(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle)
@@ -1494,18 +1604,97 @@
 
 int IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin(IOTHUB_DEVICE_HANDLE handle)
 {
-    (void)handle;
-    /*Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_009: [ IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin shall return a non-zero value. ]*/
-    int result = __FAILURE__;
-    LogError("IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin Not supported");
-    return result;
+	int result;
+
+	// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_131: [If `handle` is NULL, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall fail and return non-zero.]
+	if (handle == NULL)
+	{
+		LogError("Invalid argument (handle is NULL");
+		result = __FAILURE__;
+	}
+	else
+	{
+		AMQP_TRANSPORT_INSTANCE* transport = (AMQP_TRANSPORT_INSTANCE*)handle;
+
+		if (get_number_of_registered_devices(transport) != 1)
+		{
+			LogError("Device Twin not supported on device multiplexing scenario");
+			result = __FAILURE__;
+		}
+		else
+		{
+			LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices);
+
+			// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_136: [If no errors occur, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall return zero.]
+			result = RESULT_OK;
+
+			while (list_item != NULL)
+			{
+				AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device;
+
+				if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL)
+				{
+					LogError("Failed retrieving registered device information");
+					result = __FAILURE__;
+					break;
+				}
+				// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_134: [device_subscribe_for_twin_updates() shall be invoked for the registered device, passing `on_device_twin_update_received_callback`]
+				else if (device_subscribe_for_twin_updates(registered_device->device_handle, on_device_twin_update_received_callback, (void*)registered_device) != RESULT_OK)
+				{
+					// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_135: [If device_subscribe_for_twin_updates() fails, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall fail and return non-zero.]
+					LogError("Failed subscribing for device Twin updates");
+					result = __FAILURE__;
+					break;
+				}
+
+				list_item = singlylinkedlist_get_next_item(list_item);
+			}
+		}
+	}
+
+	return result;
 }
 
 void IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin(IOTHUB_DEVICE_HANDLE handle)
 {
-    (void)handle;
-    /*Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_010: [ IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin shall return. ]*/
-    LogError("IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin Not supported");
+	// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_140: [If `handle` is NULL, `IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin` shall return.]
+	if (handle == NULL)
+	{
+		LogError("Invalid argument (handle is NULL");
+	}
+	else
+	{
+		AMQP_TRANSPORT_INSTANCE* transport = (AMQP_TRANSPORT_INSTANCE*)handle;
+
+		if (get_number_of_registered_devices(transport) != 1)
+		{
+			LogError("Device Twin not supported on device multiplexing scenario");
+		}
+		else
+		{
+			LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices);
+
+			while (list_item != NULL)
+			{
+				AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device;
+
+				if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL)
+				{
+					LogError("Failed retrieving registered device information");
+					break;
+				}
+				// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_142: [device_unsubscribe_for_twin_updates() shall be invoked for the registered device]
+				else if (device_unsubscribe_for_twin_updates(registered_device->device_handle) != RESULT_OK)
+				{
+					// Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_143: [If `device_unsubscribe_for_twin_updates` fails, the error shall be ignored]
+					LogError("Failed unsubscribing for device Twin updates");
+					break;
+				}
+
+				list_item = singlylinkedlist_get_next_item(list_item);
+			}
+		}
+	}
 }
 
 int IoTHubTransport_AMQP_Common_Subscribe_DeviceMethod(IOTHUB_DEVICE_HANDLE handle)
@@ -2045,8 +2234,6 @@
             }
             else
             {
-                // TODO: Q: should we go through waiting_to_send list and raise on_event_send_complete with BECAUSE_DESTROY ?
-
                 // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_012: [IoTHubTransport_AMQP_Common_Unregister shall destroy the C2D methods handler by calling iothubtransportamqp_methods_destroy]
                 // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_083: [IoTHubTransport_AMQP_Common_Unregister shall free all the memory allocated for the `device_instance`]
                 internal_destroy_amqp_device_instance(registered_device);