Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
39:e98d5df6dc74
Parent:
36:f78f9a56869e
Child:
41:71c01aa3df1a
--- a/iothubtransport_amqp_device.c	Fri Jul 28 09:51:42 2017 -0700
+++ b/iothubtransport_amqp_device.c	Fri Aug 11 14:01:56 2017 -0700
@@ -2,7 +2,6 @@
 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
 
 #include <stdlib.h>
-#include "iothubtransport_amqp_telemetry_messenger.h"
 #include "azure_c_shared_utility/optimize_size.h"
 #include "azure_c_shared_utility/gballoc.h"
 #include "azure_c_shared_utility/agenttime.h" 
@@ -10,6 +9,16 @@
 #include "azure_c_shared_utility/strings.h"
 #include "iothubtransport_amqp_cbs_auth.h"
 #include "iothubtransport_amqp_device.h"
+#include "iothubtransport_amqp_telemetry_messenger.h"
+#include "iothubtransport_amqp_twin_messenger.h"
+
+DEFINE_ENUM_STRINGS(DEVICE_STATE, DEVICE_STATE_VALUES);
+DEFINE_ENUM_STRINGS(DEVICE_AUTH_MODE, DEVICE_AUTH_MODE_VALUES);
+DEFINE_ENUM_STRINGS(DEVICE_SEND_STATUS, DEVICE_SEND_STATUS_VALUES);
+DEFINE_ENUM_STRINGS(D2C_EVENT_SEND_RESULT, D2C_EVENT_SEND_RESULT_VALUES);
+DEFINE_ENUM_STRINGS(DEVICE_MESSAGE_DISPOSITION_RESULT, DEVICE_MESSAGE_DISPOSITION_RESULT_VALUES);
+DEFINE_ENUM_STRINGS(DEVICE_TWIN_UPDATE_RESULT, DEVICE_TWIN_UPDATE_RESULT_STRINGS);
+DEFINE_ENUM_STRINGS(DEVICE_TWIN_UPDATE_TYPE, DEVICE_TWIN_UPDATE_TYPE_STRINGS)
 
 #define RESULT_OK                                  0
 #define INDEFINITE_TIME                            ((time_t)-1)
@@ -40,6 +49,13 @@
 
     ON_DEVICE_C2D_MESSAGE_RECEIVED on_message_received_callback;
     void* on_message_received_context;
+
+	TWIN_MESSENGER_HANDLE twin_messenger_handle;
+	TWIN_MESSENGER_STATE twin_msgr_state;
+	time_t twin_msgr_state_last_changed_time;
+	size_t twin_msgr_state_change_timeout_secs;
+	DEVICE_TWIN_UPDATE_RECEIVED_CALLBACK on_device_twin_update_received_callback;
+	void* on_device_twin_update_received_context;
 } DEVICE_INSTANCE;
 
 typedef struct DEVICE_SEND_EVENT_TASK_TAG
@@ -48,6 +64,12 @@
     void* on_event_send_complete_context;
 } DEVICE_SEND_EVENT_TASK;
 
+typedef struct DEVICE_SEND_TWIN_UPDATE_CONTEXT_TAG
+{
+	DEVICE_SEND_TWIN_UPDATE_COMPLETE_CALLBACK on_send_twin_update_complete_callback;
+	void* context;
+} DEVICE_SEND_TWIN_UPDATE_CONTEXT;
+
 // Internal state control
 static void update_state(DEVICE_INSTANCE* instance, DEVICE_STATE new_state)
 {
@@ -99,7 +121,9 @@
     return result;
 }
 
-// Callback Handlers
+
+//---------- Callback Handlers ----------//
+
 static D2C_EVENT_SEND_RESULT get_d2c_event_send_result_from(TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT result)
 {
     D2C_EVENT_SEND_RESULT d2c_esr;
@@ -207,6 +231,95 @@
     }
 }
 
+static DEVICE_TWIN_UPDATE_RESULT get_device_twin_update_result_from(TWIN_REPORT_STATE_RESULT result)
+{
+	DEVICE_TWIN_UPDATE_RESULT device_result;
+
+	switch (result)
+	{
+	case TWIN_REPORT_STATE_RESULT_SUCCESS:
+		device_result = DEVICE_TWIN_UPDATE_RESULT_OK;
+		break;
+	case TWIN_REPORT_STATE_RESULT_ERROR:
+		device_result = DEVICE_TWIN_UPDATE_RESULT_ERROR;
+		break;
+	default:
+		device_result = DEVICE_TWIN_UPDATE_RESULT_ERROR;
+	};
+
+	return device_result;
+}
+
+static void on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT result, TWIN_REPORT_STATE_REASON reason, int status_code, const void* context)
+{
+	(void)reason;
+	
+	if (context == NULL)
+	{
+		LogError("Invalid argument (context is NULL)");
+	}
+	else
+	{
+		DEVICE_SEND_TWIN_UPDATE_CONTEXT* twin_ctx = (DEVICE_SEND_TWIN_UPDATE_CONTEXT*)context;
+		
+		// Codes_SRS_DEVICE_09_141: [on_send_twin_update_complete_callback (if provided by user) shall be invoked passing the corresponding device result and `status_code`]
+		if (twin_ctx->on_send_twin_update_complete_callback != NULL)
+		{
+			DEVICE_TWIN_UPDATE_RESULT device_result;
+
+			device_result = get_device_twin_update_result_from(result);
+
+			twin_ctx->on_send_twin_update_complete_callback(device_result, status_code, twin_ctx->context);
+		}
+
+		// Codes_SRS_DEVICE_09_142: [Memory allocated for `context` shall be released]
+		free(twin_ctx);
+	}
+}
+
+static void on_twin_state_update_callback(TWIN_UPDATE_TYPE update_type, const char* payload, size_t size, const void* context)
+{
+	if (payload == NULL || context == NULL)
+	{
+		LogError("Invalid argument (context=%p, payload=%p)", context, payload);
+	}
+	else
+	{
+		DEVICE_INSTANCE* instance = (DEVICE_INSTANCE*)context;
+
+		DEVICE_TWIN_UPDATE_TYPE device_update_type;
+
+		if (update_type == TWIN_UPDATE_TYPE_COMPLETE)
+		{
+			device_update_type = DEVICE_TWIN_UPDATE_TYPE_COMPLETE;
+		}
+		else
+		{
+			device_update_type = DEVICE_TWIN_UPDATE_TYPE_PARTIAL;
+		}
+
+		// Codes_SRS_DEVICE_09_151: [on_device_twin_update_received_callback (provided by user) shall be invoked passing the corresponding update type, `payload` and `size`]
+		instance->on_device_twin_update_received_callback(device_update_type, (const unsigned char*)payload, size, instance->on_device_twin_update_received_context);
+	}
+}
+
+static void on_twin_messenger_state_changed_callback(void* context, TWIN_MESSENGER_STATE previous_state, TWIN_MESSENGER_STATE new_state)
+{
+	if (context != NULL && new_state != previous_state)
+	{
+		DEVICE_INSTANCE* instance = (DEVICE_INSTANCE*)context;
+		instance->twin_msgr_state = new_state;
+
+		if ((instance->twin_msgr_state_last_changed_time = get_time(NULL)) == INDEFINITE_TIME)
+		{
+			LogError("Failed setting time of last twin messenger state changed event");
+		}
+	}
+}
+
+
+//---------- Message Dispostion ----------//
+
 static DEVICE_MESSAGE_DISPOSITION_INFO* create_device_message_disposition_info_from(TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* messenger_disposition_info)
 {
     DEVICE_MESSAGE_DISPOSITION_INFO* device_disposition_info;
@@ -339,7 +452,9 @@
     return msgr_disposition_result;
 }
 
-// Configuration Helpers
+
+//---------- Configuration Helpers ----------//
+
 static void destroy_device_config(DEVICE_CONFIG* config)
 {
     if (config != NULL)
@@ -417,6 +532,11 @@
             telemetry_messenger_destroy(instance->messenger_handle);
         }
 
+		if (instance->twin_messenger_handle != NULL)
+		{
+			twin_messenger_destroy(instance->twin_messenger_handle);
+		}
+
         if (instance->authentication_handle != NULL)
         {
             authentication_destroy(instance->authentication_handle);
@@ -447,7 +567,7 @@
     return result;
 }
 
-static int create_messenger_instance(DEVICE_INSTANCE* instance, const char* pi)
+static int create_telemetry_messenger_instance(DEVICE_INSTANCE* instance, const char* pi)
 {
     int result;
 
@@ -470,7 +590,32 @@
     return result;
 }
 
+static int create_twin_messenger(DEVICE_INSTANCE* instance)
+{
+	int result;
+	TWIN_MESSENGER_CONFIG twin_msgr_config;
+
+	twin_msgr_config.client_version = instance->config->product_info;
+	twin_msgr_config.device_id = instance->config->device_id;
+	twin_msgr_config.iothub_host_fqdn = instance->config->iothub_host_fqdn;
+	twin_msgr_config.on_state_changed_callback = on_twin_messenger_state_changed_callback;
+	twin_msgr_config.on_state_changed_context = (void*)instance;
+
+	if ((instance->twin_messenger_handle = twin_messenger_create(&twin_msgr_config)) == NULL)
+	{
+		result = __FAILURE__;
+	}
+	else
+	{
+		result = RESULT_OK;
+	}
+
+	return result;
+}
+
+
 // ---------- Set/Retrieve Options Helpers ----------//
+
 static void* device_clone_option(const char* name, const void* value)
 {
     void* result;
@@ -520,7 +665,9 @@
     }
 }
 
-// Public APIs:
+
+//---------- Public APIs ----------//
+
 DEVICE_HANDLE device_create(DEVICE_CONFIG *config)
 {
     DEVICE_INSTANCE *instance;
@@ -574,21 +721,31 @@
             result = __FAILURE__;
         }
         // Codes_SRS_DEVICE_09_008: [`instance->messenger_handle` shall be set using telemetry_messenger_create()]
-        else if (create_messenger_instance(instance, config->product_info) != RESULT_OK)
+        else if (create_telemetry_messenger_instance(instance, config->product_info) != RESULT_OK)
         {
             // Codes_SRS_DEVICE_09_009: [If the TELEMETRY_MESSENGER_HANDLE fails to be created, device_create shall fail and return NULL]
             LogError("Failed creating the device instance for device '%s' (failed creating the messenger instance)", instance->config->device_id);
             result = __FAILURE__;
         }
+		// Codes_SRS_DEVICE_09_122: [`instance->twin_messenger_handle` shall be set using twin_messenger_create()]
+		else if (create_twin_messenger(instance) != RESULT_OK)
+		{
+			// Codes_SRS_DEVICE_09_123: [If the TWIN_MESSENGER_HANDLE fails to be created, device_create shall fail and return NULL]
+			LogError("Failed creating the twin messenger for device '%s'", instance->config->device_id);
+			result = __FAILURE__;
+		}
         else
         {
             instance->auth_state = AUTHENTICATION_STATE_STOPPED;
-            instance->msgr_state = TELEMETRY_MESSENGER_STATE_STOPPED;
+			instance->msgr_state = TELEMETRY_MESSENGER_STATE_STOPPED;
+			instance->twin_msgr_state = TWIN_MESSENGER_STATE_STOPPED;
             instance->state = DEVICE_STATE_STOPPED;
             instance->auth_state_last_changed_time = INDEFINITE_TIME;
             instance->auth_state_change_timeout_secs = DEFAULT_AUTH_STATE_CHANGED_TIMEOUT_SECS;
             instance->msgr_state_last_changed_time = INDEFINITE_TIME;
             instance->msgr_state_change_timeout_secs = DEFAULT_MSGR_STATE_CHANGED_TIMEOUT_SECS;
+			instance->twin_msgr_state_last_changed_time = INDEFINITE_TIME;
+			instance->twin_msgr_state_change_timeout_secs = DEFAULT_MSGR_STATE_CHANGED_TIMEOUT_SECS;
 
             result = RESULT_OK;
         }
@@ -689,7 +846,17 @@
                 telemetry_messenger_stop(instance->messenger_handle) != RESULT_OK)
             {
                 // Codes_SRS_DEVICE_09_028: [If messenger_stop fails, the `instance` state shall be updated to DEVICE_STATE_ERROR_MSG and the function shall return non-zero result]
-                LogError("Failed stopping device '%s' (messenger_stop failed)", instance->config->device_id);
+                LogError("Failed stopping device '%s' (telemetry_messenger_stop failed)", instance->config->device_id);
+                result = __FAILURE__;
+                update_state(instance, DEVICE_STATE_ERROR_MSG);
+            }
+			// Codes_SRS_DEVICE_09_131: [If `instance->twin_messenger_handle` state is not TWIN_MESSENGER_STATE_STOPPED, twin_messenger_stop shall be invoked]
+            else if (instance->twin_msgr_state != TWIN_MESSENGER_STATE_STOPPED &&
+                instance->twin_msgr_state != TWIN_MESSENGER_STATE_STOPPING &&
+                twin_messenger_stop(instance->twin_messenger_handle) != RESULT_OK)
+            {
+				// Codes_SRS_DEVICE_09_132: [If twin_messenger_stop fails, the `instance` state shall be updated to DEVICE_STATE_ERROR_MSG and the function shall return non-zero result]
+				LogError("Failed stopping device '%s' (twin_messenger_stop failed)", instance->config->device_id);
                 result = __FAILURE__;
                 update_state(instance, DEVICE_STATE_ERROR_MSG);
             }
@@ -778,8 +945,11 @@
             }
 
             // Codes_SRS_DEVICE_09_040: [Messenger shall not be started if using CBS authentication and authentication start has not completed yet]
-            if (instance->config->authentication_mode == DEVICE_AUTH_MODE_X509 || instance->auth_state == AUTHENTICATION_STATE_STARTED)
+			// Codes_SRS_DEVICE_09_124: [TWIN Messenger shall not be started if using CBS authentication and authentication start has not completed yet]
+			if (instance->config->authentication_mode == DEVICE_AUTH_MODE_X509 || instance->auth_state == AUTHENTICATION_STATE_STARTED)
             {
+				size_t number_of_messengers_started = 0;
+
                 // Codes_SRS_DEVICE_09_041: [If messenger state is TELEMETRY_MESSENGER_STATE_STOPPED, messenger_start shall be invoked]
                 if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_STOPPED)
                 {
@@ -819,8 +989,55 @@
                 // Codes_SRS_DEVICE_09_046: [If messenger state is TELEMETRY_MESSENGER_STATE_STARTED, the device state shall be updated to DEVICE_STATE_STARTED]
                 else if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_STARTED)
                 {
-                    update_state(instance, DEVICE_STATE_STARTED);
+					number_of_messengers_started++;
                 }
+
+				// Codes_SRS_DEVICE_09_125: [If TWIN messenger state is TWIN_MESSENGER_STATE_STOPPED, twin_messenger_start shall be invoked]
+				if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STOPPED)
+				{
+					if (twin_messenger_start(instance->twin_messenger_handle, instance->session_handle) != RESULT_OK)
+					{
+						// Codes_SRS_DEVICE_09_126: [If twin_messenger_start fails, the device state shall be updated to DEVICE_STATE_ERROR_MSG]
+						LogError("Device '%s' twin messenger failed to be started (messenger_start failed)", instance->config->device_id);
+
+						update_state(instance, DEVICE_STATE_ERROR_MSG);
+					}
+				}
+				else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STARTING)
+				{
+					int is_timed_out;
+					if (is_timeout_reached(instance->twin_msgr_state_last_changed_time, instance->twin_msgr_state_change_timeout_secs, &is_timed_out) != RESULT_OK)
+					{
+						LogError("Device '%s' failed verifying the timeout for twin messenger start (is_timeout_reached failed)", instance->config->device_id);
+
+						update_state(instance, DEVICE_STATE_ERROR_MSG);
+					}
+					// Codes_SRS_DEVICE_09_127: [If TWIN messenger state is TWIN_MESSENGER_STATE_STARTING, the device shall track the time since last event change and timeout if needed]
+					else if (is_timed_out == 1)
+					{
+						// Codes_SRS_DEVICE_09_128: [If twin_messenger_start times out, the device state shall be updated to DEVICE_STATE_ERROR_MSG]
+						LogError("Device '%s' twin messenger did not complete starting within expected timeout (%d)", instance->config->device_id, instance->twin_msgr_state_change_timeout_secs);
+
+						update_state(instance, DEVICE_STATE_ERROR_MSG);
+					}
+				}
+				// Codes_SRS_DEVICE_09_129: [If TWIN messenger state is TWIN_MESSENGER_STATE_ERROR, the device state shall be updated to DEVICE_STATE_ERROR_MSG]
+				else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_ERROR)
+				{
+					LogError("Device '%s' twin messenger failed to be started (messenger got into error state)", instance->config->device_id);
+
+					update_state(instance, DEVICE_STATE_ERROR_MSG);
+				}
+				// Codes_SRS_DEVICE_09_130: [If TWIN messenger state is TWIN_MESSENGER_STATE_STARTED, the device state shall be updated to DEVICE_STATE_STARTED]
+				else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STARTED)
+				{
+					number_of_messengers_started++;
+				}
+
+				if (number_of_messengers_started == 2)
+				{
+					update_state(instance, DEVICE_STATE_STARTED);
+				}
             }
         }
         else if (instance->state == DEVICE_STATE_STARTED)
@@ -855,6 +1072,13 @@
                     LogError("Device '%s' is started but messenger reported unexpected state %d", instance->config->device_id, instance->msgr_state);
                     update_state(instance, DEVICE_STATE_ERROR_MSG);
                 }
+
+				// Codes_SRS_DEVICE_09_133: [If TWIN messenger state is not TWIN_MESSENGER_STATE_STARTED, the device state shall be updated to DEVICE_STATE_ERROR_MSG]
+				if (instance->twin_msgr_state != TWIN_MESSENGER_STATE_STARTED)
+				{
+					LogError("Device '%s' is started but TWIN messenger reported unexpected state %d", instance->config->device_id, instance->msgr_state);
+					update_state(instance, DEVICE_STATE_ERROR_MSG);
+				}
             }
         }
 
@@ -870,9 +1094,15 @@
 
         if (instance->msgr_state != TELEMETRY_MESSENGER_STATE_STOPPED && instance->msgr_state != TELEMETRY_MESSENGER_STATE_ERROR)
         {
-            // Codes_SRS_DEVICE_09_050: [If `instance->messenger_handle` state is not STOPPED or ERROR, authentication_do_work shall be invoked]
+            // Codes_SRS_DEVICE_09_050: [If `instance->messenger_handle` state is not STOPPED or ERROR, telemetry_messenger_do_work shall be invoked]
             telemetry_messenger_do_work(instance->messenger_handle);
         }
+
+		if (instance->twin_msgr_state != TWIN_MESSENGER_STATE_STOPPED && instance->twin_msgr_state != TWIN_MESSENGER_STATE_ERROR)
+		{
+			// Codes_SRS_DEVICE_09_134: [If `instance->twin_messenger_handle` state is not STOPPED or ERROR, twin_messenger_do_work shall be invoked]
+			twin_messenger_do_work(instance->twin_messenger_handle);
+		}
     }
 }
 
@@ -1315,3 +1545,122 @@
 
     return result;
 }
+
+int device_send_twin_update_async(DEVICE_HANDLE handle, CONSTBUFFER_HANDLE data, DEVICE_SEND_TWIN_UPDATE_COMPLETE_CALLBACK on_send_twin_update_complete_callback, void* context)
+{
+	int result;
+
+	// Codes_SRS_DEVICE_09_135: [If `handle` or `data` are NULL, device_send_twin_update_async shall return a non-zero result]
+	if (handle == NULL || data == NULL)
+	{
+		LogError("Invalid argument (handle=%p, data=%p)", handle, data);
+		result = __FAILURE__;
+	}
+	else
+	{
+		DEVICE_INSTANCE* instance = (DEVICE_INSTANCE*)handle;
+		DEVICE_SEND_TWIN_UPDATE_CONTEXT* twin_ctx;
+
+		// Codes_SRS_DEVICE_09_136: [A structure (`twin_ctx`) shall be created to track the send state of the twin report]
+		if ((twin_ctx = (DEVICE_SEND_TWIN_UPDATE_CONTEXT*)malloc(sizeof(DEVICE_SEND_TWIN_UPDATE_CONTEXT))) == NULL)
+		{
+			// Codes_SRS_DEVICE_09_137: [If `twin_ctx` fails to be created, device_send_twin_update_async shall return a non-zero value]
+			LogError("Cannot send twin update (failed creating TWIN context)");
+			result = __FAILURE__;
+		}
+		else
+		{
+			twin_ctx->on_send_twin_update_complete_callback = on_send_twin_update_complete_callback;
+			twin_ctx->context = context;
+
+			// Codes_SRS_DEVICE_09_138: [The twin report shall be sent using twin_messenger_report_state_async, passing `on_report_state_complete_callback` and `twin_ctx`]
+			if (twin_messenger_report_state_async(instance->twin_messenger_handle, data, on_report_state_complete_callback, (const void*)twin_ctx) != 0)
+			{
+				// Codes_SRS_DEVICE_09_139: [If twin_messenger_report_state_async fails, device_send_twin_update_async shall return a non-zero value]
+				LogError("Cannot send twin update (failed creating TWIN messenger)");
+				free(twin_ctx);
+				result = __FAILURE__;
+			}
+			else
+			{
+				// Codes_SRS_DEVICE_09_140: [If no failures occur, device_send_twin_update_async shall return 0]
+				result = RESULT_OK;
+			}
+		}
+	}
+
+	return result;
+}
+
+int device_subscribe_for_twin_updates(DEVICE_HANDLE handle, DEVICE_TWIN_UPDATE_RECEIVED_CALLBACK on_device_twin_update_received_callback, void* context)
+{
+	int result;
+
+	// Codes_SRS_DEVICE_09_143: [If `handle` or `on_device_twin_update_received_callback` are NULL, device_subscribe_for_twin_updates shall return a non-zero result]
+	if (handle == NULL || on_device_twin_update_received_callback == NULL)
+	{
+		LogError("Invalid argument (handle=%p, on_device_twin_update_received_callback=%p)", handle, on_device_twin_update_received_callback);
+		result = __FAILURE__;
+	}
+	else
+	{
+		DEVICE_INSTANCE* instance = (DEVICE_INSTANCE*)handle;
+
+		DEVICE_TWIN_UPDATE_RECEIVED_CALLBACK previous_callback = instance->on_device_twin_update_received_callback;
+		void* previous_context = instance->on_device_twin_update_received_context;
+
+		instance->on_device_twin_update_received_callback = on_device_twin_update_received_callback;
+		instance->on_device_twin_update_received_context = context;
+
+		// Codes_SRS_DEVICE_09_144: [twin_messenger_subscribe shall be invoked passing `on_twin_state_update_callback`]
+		if (twin_messenger_subscribe(instance->twin_messenger_handle, on_twin_state_update_callback, (void*)instance) != 0)
+		{
+			// Codes_SRS_DEVICE_09_145: [If twin_messenger_subscribe fails, device_subscribe_for_twin_updates shall return a non-zero value]
+			LogError("Failed subscribing for device twin updates");
+			instance->on_device_twin_update_received_callback = previous_callback;
+			instance->on_device_twin_update_received_context = previous_context;
+			result = __FAILURE__;
+		}
+		else
+		{
+			// Codes_SRS_DEVICE_09_146: [If no failures occur, device_subscribe_for_twin_updates shall return 0]
+			result = RESULT_OK;
+		}
+	}
+
+	return result;
+}
+
+int device_unsubscribe_for_twin_updates(DEVICE_HANDLE handle)
+{
+	int result;
+
+
+	// Codes_SRS_DEVICE_09_147: [If `handle` is NULL, device_unsubscribe_for_twin_updates shall return a non-zero result]
+	if (handle == NULL)
+	{
+		LogError("Invalid argument (handle is NULL)");
+		result = __FAILURE__;
+	}
+	else
+	{
+		DEVICE_INSTANCE* instance = (DEVICE_INSTANCE*)handle;
+
+		// Codes_SRS_DEVICE_09_148: [twin_messenger_unsubscribe shall be invoked passing `on_twin_state_update_callback`]
+		if (twin_messenger_unsubscribe(instance->twin_messenger_handle) != 0)
+		{
+			// Codes_SRS_DEVICE_09_149: [If twin_messenger_unsubscribe fails, device_unsubscribe_for_twin_updates shall return a non-zero value]
+			LogError("Failed unsubscribing for device twin updates");
+			result = __FAILURE__;
+		}
+		else
+		{
+			instance->on_device_twin_update_received_callback = NULL;
+			instance->on_device_twin_update_received_context = NULL;
+			// Codes_SRS_DEVICE_09_150: [If no failures occur, device_unsubscribe_for_twin_updates shall return 0]
+			result = RESULT_OK;
+		}
+	}
+
+	return result;
+}
\ No newline at end of file