Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransport_amqp_common.c
- Revision:
- 39:e98d5df6dc74
- Parent:
- 38:6435c6e3d55c
- Child:
- 41:71c01aa3df1a
--- a/iothubtransport_amqp_common.c Fri Jul 28 09:51:42 2017 -0700 +++ b/iothubtransport_amqp_common.c Fri Aug 11 14:01:56 2017 -0700 @@ -59,7 +59,7 @@ AMQP_TRANSPORT_AUTHENTICATION_MODE_CBS, AMQP_TRANSPORT_AUTHENTICATION_MODE_X509 } AMQP_TRANSPORT_AUTHENTICATION_MODE; - \ + #define AMQP_TRANSPORT_STATE_STRINGS \ AMQP_TRANSPORT_STATE_NOT_CONNECTED, \ @@ -130,6 +130,13 @@ delivery_number message_id; } MESSAGE_DISPOSITION_CONTEXT; +typedef struct AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT_TAG +{ + uint32_t item_id; + IOTHUB_CLIENT_LL_HANDLE client_handle; +} AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT; + + // ---------- General Helpers ---------- // static void free_proxy_data(AMQP_TRANSPORT_INSTANCE* amqp_transport_instance) @@ -294,6 +301,21 @@ return is_device_registered_ex(amqp_device_instance->transport_instance->registered_devices, device_id, &list_item); } +static size_t get_number_of_registered_devices(AMQP_TRANSPORT_INSTANCE* transport) +{ + size_t result = 0; + + LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices); + + while (list_item != NULL) + { + result++; + list_item = singlylinkedlist_get_next_item(list_item); + } + + return result; +} + // ---------- Callbacks ---------- // @@ -407,7 +429,6 @@ return device_disposition_result; } - #ifdef WIP_C2D_METHODS_AMQP /* This feature is WIP, do not use yet */ static void on_methods_error(void* context) { @@ -478,6 +499,47 @@ } #endif +static void on_device_send_twin_update_complete_callback(DEVICE_TWIN_UPDATE_RESULT result, int status_code, void* context) +{ + (void)result; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_151: [If `context` is NULL, the callback shall return] + if (context == NULL) + { + LogError("Invalid argument (context is NULL)"); + } + else + { + AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT* dev_twin_ctx = (AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT*)context; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_152: [`IoTHubClient_LL_ReportedStateComplete` shall be invoked passing `status_code` and `context` details] + IoTHubClient_LL_ReportedStateComplete(dev_twin_ctx->client_handle, dev_twin_ctx->item_id, status_code); + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_153: [The memory allocated for `context` shall be released] + free(dev_twin_ctx); + } +} + +static void on_device_twin_update_received_callback(DEVICE_TWIN_UPDATE_TYPE update_type, const unsigned char* message, size_t length, void* context) +{ + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_137: [If `context` is NULL, the callback shall return.] + if (context == NULL) + { + LogError("Invalid argument (context is NULL)"); + } + else + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_138: [If `update_type` is DEVICE_TWIN_UPDATE_TYPE_PARTIAL IoTHubClient_LL_RetrievePropertyComplete shall be invoked passing `context` as handle, `DEVICE_TWIN_UPDATE_PARTIAL`, `payload` and `size`.] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_139: [If `update_type` is DEVICE_TWIN_UPDATE_TYPE_COMPLETE IoTHubClient_LL_RetrievePropertyComplete shall be invoked passing `context` as handle, `DEVICE_TWIN_UPDATE_COMPLETE`, `payload` and `size`.] + IoTHubClient_LL_RetrievePropertyComplete( + registered_device->iothub_client_handle, + (update_type == DEVICE_TWIN_UPDATE_TYPE_COMPLETE ? DEVICE_TWIN_UPDATE_COMPLETE : DEVICE_TWIN_UPDATE_PARTIAL), + message, length); + } +} + // ---------- Underlying TLS I/O Helpers ---------- // @@ -1338,11 +1400,59 @@ IOTHUB_PROCESS_ITEM_RESULT IoTHubTransport_AMQP_Common_ProcessItem(TRANSPORT_LL_HANDLE handle, IOTHUB_IDENTITY_TYPE item_type, IOTHUB_IDENTITY_INFO* iothub_item) { - (void)handle; - (void)item_type; - (void)iothub_item; - LogError("Currently Not Supported."); - return IOTHUB_PROCESS_ERROR; + IOTHUB_PROCESS_ITEM_RESULT result; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_144: [If `handle` or `iothub_item` are NULL, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.] + if (handle == NULL || iothub_item == NULL) + { + LogError("Invalid argument (handle=%p, iothub_item=%p)", handle, iothub_item); + result = IOTHUB_PROCESS_ERROR; + } + else + { + if (item_type == IOTHUB_TYPE_DEVICE_TWIN) + { + AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT* dev_twin_ctx; + + if ((dev_twin_ctx = (AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT*)malloc(sizeof(AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT))) == NULL) + { + LogError("Failed allocating context for TWIN message"); + result = IOTHUB_PROCESS_ERROR; + } + else + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)iothub_item->device_twin->device_handle; + + dev_twin_ctx->client_handle = iothub_item->device_twin->client_handle; + dev_twin_ctx->item_id = iothub_item->device_twin->item_id; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_146: [device_send_twin_update_async() shall be invoked passing `iothub_item->device_twin->report_data_handle` and `on_device_send_twin_update_complete_callback`] + if (device_send_twin_update_async( + registered_device->device_handle, + iothub_item->device_twin->report_data_handle, + on_device_send_twin_update_complete_callback, (void*)dev_twin_ctx) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_147: [If device_send_twin_update_async() fails, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.] + LogError("Failed sending TWIN update"); + free(dev_twin_ctx); + result = IOTHUB_PROCESS_ERROR; + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_150: [If no errors occur, `IoTHubTransport_AMQP_Common_ProcessItem` shall return IOTHUB_PROCESS_OK.] + result = IOTHUB_PROCESS_OK; + } + } + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_145: [If `item_type` is not IOTHUB_TYPE_DEVICE_TWIN, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.] + else + { + LogError("Item type not supported (%d)", item_type); + result = IOTHUB_PROCESS_ERROR; + } + } + + return result; } void IoTHubTransport_AMQP_Common_DoWork(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle) @@ -1494,18 +1604,97 @@ int IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin(IOTHUB_DEVICE_HANDLE handle) { - (void)handle; - /*Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_009: [ IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin shall return a non-zero value. ]*/ - int result = __FAILURE__; - LogError("IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin Not supported"); - return result; + int result; + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_131: [If `handle` is NULL, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall fail and return non-zero.] + if (handle == NULL) + { + LogError("Invalid argument (handle is NULL"); + result = __FAILURE__; + } + else + { + AMQP_TRANSPORT_INSTANCE* transport = (AMQP_TRANSPORT_INSTANCE*)handle; + + if (get_number_of_registered_devices(transport) != 1) + { + LogError("Device Twin not supported on device multiplexing scenario"); + result = __FAILURE__; + } + else + { + LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices); + + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_136: [If no errors occur, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall return zero.] + result = RESULT_OK; + + while (list_item != NULL) + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device; + + if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL) + { + LogError("Failed retrieving registered device information"); + result = __FAILURE__; + break; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_134: [device_subscribe_for_twin_updates() shall be invoked for the registered device, passing `on_device_twin_update_received_callback`] + else if (device_subscribe_for_twin_updates(registered_device->device_handle, on_device_twin_update_received_callback, (void*)registered_device) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_135: [If device_subscribe_for_twin_updates() fails, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall fail and return non-zero.] + LogError("Failed subscribing for device Twin updates"); + result = __FAILURE__; + break; + } + + list_item = singlylinkedlist_get_next_item(list_item); + } + } + } + + return result; } void IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin(IOTHUB_DEVICE_HANDLE handle) { - (void)handle; - /*Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_02_010: [ IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin shall return. ]*/ - LogError("IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin Not supported"); + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_140: [If `handle` is NULL, `IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin` shall return.] + if (handle == NULL) + { + LogError("Invalid argument (handle is NULL"); + } + else + { + AMQP_TRANSPORT_INSTANCE* transport = (AMQP_TRANSPORT_INSTANCE*)handle; + + if (get_number_of_registered_devices(transport) != 1) + { + LogError("Device Twin not supported on device multiplexing scenario"); + } + else + { + LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices); + + while (list_item != NULL) + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device; + + if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL) + { + LogError("Failed retrieving registered device information"); + break; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_142: [device_unsubscribe_for_twin_updates() shall be invoked for the registered device] + else if (device_unsubscribe_for_twin_updates(registered_device->device_handle) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_143: [If `device_unsubscribe_for_twin_updates` fails, the error shall be ignored] + LogError("Failed unsubscribing for device Twin updates"); + break; + } + + list_item = singlylinkedlist_get_next_item(list_item); + } + } + } } int IoTHubTransport_AMQP_Common_Subscribe_DeviceMethod(IOTHUB_DEVICE_HANDLE handle) @@ -2045,8 +2234,6 @@ } else { - // TODO: Q: should we go through waiting_to_send list and raise on_event_send_complete with BECAUSE_DESTROY ? - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_01_012: [IoTHubTransport_AMQP_Common_Unregister shall destroy the C2D methods handler by calling iothubtransportamqp_methods_destroy] // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_083: [IoTHubTransport_AMQP_Common_Unregister shall free all the memory allocated for the `device_instance`] internal_destroy_amqp_device_instance(registered_device);