Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransport_amqp_common.c
- Revision:
- 41:71c01aa3df1a
- Parent:
- 39:e98d5df6dc74
- Child:
- 42:c2eaa912a28c
--- a/iothubtransport_amqp_common.c Fri Aug 25 11:22:00 2017 -0700 +++ b/iothubtransport_amqp_common.c Mon Sep 11 09:22:16 2017 -0700 @@ -132,8 +132,8 @@ typedef struct AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT_TAG { - uint32_t item_id; - IOTHUB_CLIENT_LL_HANDLE client_handle; + uint32_t item_id; + IOTHUB_CLIENT_LL_HANDLE client_handle; } AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT; @@ -181,10 +181,7 @@ static void update_state(AMQP_TRANSPORT_INSTANCE* transport_instance, AMQP_TRANSPORT_STATE new_state) { - AMQP_TRANSPORT_STATE previous_state = transport_instance->state; transport_instance->state = new_state; - - LogInfo("Transport state changed from %s to %s", ENUM_TO_STRING(AMQP_TRANSPORT_STATE, previous_state), ENUM_TO_STRING(AMQP_TRANSPORT_STATE, new_state)); } static void reset_retry_control(AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device) @@ -216,6 +213,25 @@ free(trdev_inst); } +static DEVICE_AUTH_MODE get_authentication_mode(const IOTHUB_DEVICE_CONFIG* device) +{ + DEVICE_AUTH_MODE result; + + if (device->deviceKey != NULL || device->deviceSasToken != NULL) + { + result = DEVICE_AUTH_MODE_CBS; + } + else if (IoTHubClient_Auth_Get_Credential_Type(device->authorization_module) == IOTHUB_CREDENTIAL_TYPE_DEVICE_AUTH) + { + result = DEVICE_AUTH_MODE_CBS; + } + else + { + result = DEVICE_AUTH_MODE_X509; + } + return result; +} + // @brief // Saves the new state, if it is different than the previous one. static void on_device_state_changed_callback(void* context, DEVICE_STATE previous_state, DEVICE_STATE new_state) @@ -303,17 +319,17 @@ static size_t get_number_of_registered_devices(AMQP_TRANSPORT_INSTANCE* transport) { - size_t result = 0; + size_t result = 0; - LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices); + LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices); - while (list_item != NULL) - { - result++; - list_item = singlylinkedlist_get_next_item(list_item); - } + while (list_item != NULL) + { + result++; + list_item = singlylinkedlist_get_next_item(list_item); + } - return result; + return result; } @@ -501,43 +517,43 @@ static void on_device_send_twin_update_complete_callback(DEVICE_TWIN_UPDATE_RESULT result, int status_code, void* context) { - (void)result; + (void)result; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_151: [If `context` is NULL, the callback shall return] - if (context == NULL) - { - LogError("Invalid argument (context is NULL)"); - } - else - { - AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT* dev_twin_ctx = (AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT*)context; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_151: [If `context` is NULL, the callback shall return] + if (context == NULL) + { + LogError("Invalid argument (context is NULL)"); + } + else + { + AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT* dev_twin_ctx = (AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT*)context; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_152: [`IoTHubClient_LL_ReportedStateComplete` shall be invoked passing `status_code` and `context` details] - IoTHubClient_LL_ReportedStateComplete(dev_twin_ctx->client_handle, dev_twin_ctx->item_id, status_code); + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_152: [`IoTHubClient_LL_ReportedStateComplete` shall be invoked passing `status_code` and `context` details] + IoTHubClient_LL_ReportedStateComplete(dev_twin_ctx->client_handle, dev_twin_ctx->item_id, status_code); - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_153: [The memory allocated for `context` shall be released] - free(dev_twin_ctx); - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_153: [The memory allocated for `context` shall be released] + free(dev_twin_ctx); + } } static void on_device_twin_update_received_callback(DEVICE_TWIN_UPDATE_TYPE update_type, const unsigned char* message, size_t length, void* context) { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_137: [If `context` is NULL, the callback shall return.] - if (context == NULL) - { - LogError("Invalid argument (context is NULL)"); - } - else - { - AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_137: [If `context` is NULL, the callback shall return.] + if (context == NULL) + { + LogError("Invalid argument (context is NULL)"); + } + else + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)context; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_138: [If `update_type` is DEVICE_TWIN_UPDATE_TYPE_PARTIAL IoTHubClient_LL_RetrievePropertyComplete shall be invoked passing `context` as handle, `DEVICE_TWIN_UPDATE_PARTIAL`, `payload` and `size`.] - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_139: [If `update_type` is DEVICE_TWIN_UPDATE_TYPE_COMPLETE IoTHubClient_LL_RetrievePropertyComplete shall be invoked passing `context` as handle, `DEVICE_TWIN_UPDATE_COMPLETE`, `payload` and `size`.] - IoTHubClient_LL_RetrievePropertyComplete( - registered_device->iothub_client_handle, - (update_type == DEVICE_TWIN_UPDATE_TYPE_COMPLETE ? DEVICE_TWIN_UPDATE_COMPLETE : DEVICE_TWIN_UPDATE_PARTIAL), - message, length); - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_138: [If `update_type` is DEVICE_TWIN_UPDATE_TYPE_PARTIAL IoTHubClient_LL_RetrievePropertyComplete shall be invoked passing `context` as handle, `DEVICE_TWIN_UPDATE_PARTIAL`, `payload` and `size`.] + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_139: [If `update_type` is DEVICE_TWIN_UPDATE_TYPE_COMPLETE IoTHubClient_LL_RetrievePropertyComplete shall be invoked passing `context` as handle, `DEVICE_TWIN_UPDATE_COMPLETE`, `payload` and `size`.] + IoTHubClient_LL_RetrievePropertyComplete( + registered_device->iothub_client_handle, + (update_type == DEVICE_TWIN_UPDATE_TYPE_COMPLETE ? DEVICE_TWIN_UPDATE_COMPLETE : DEVICE_TWIN_UPDATE_PARTIAL), + message, length); + } } @@ -656,6 +672,25 @@ } else { + // If this is the DPS x509 ECC certificate + if (IoTHubClient_Auth_Get_Credential_Type(transport_instance->authorization_module) == IOTHUB_CREDENTIAL_TYPE_X509_ECC) + { + // Set the xio_handle + if (IoTHubClient_Auth_Set_xio_Certificate(transport_instance->authorization_module, *xio_handle) != 0) + { + LogError("Unable to create the lower level TLS layer."); + result = __FAILURE__; + } + else + { + result = 0; + } + } + else + { + result = 0; + } + if (restore_underlying_io_transport_options(transport_instance, *xio_handle) != RESULT_OK) { /*pessimistically hope TLS will fail, be recreated and options re-given*/ @@ -1400,59 +1435,59 @@ IOTHUB_PROCESS_ITEM_RESULT IoTHubTransport_AMQP_Common_ProcessItem(TRANSPORT_LL_HANDLE handle, IOTHUB_IDENTITY_TYPE item_type, IOTHUB_IDENTITY_INFO* iothub_item) { - IOTHUB_PROCESS_ITEM_RESULT result; + IOTHUB_PROCESS_ITEM_RESULT result; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_144: [If `handle` or `iothub_item` are NULL, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.] - if (handle == NULL || iothub_item == NULL) - { - LogError("Invalid argument (handle=%p, iothub_item=%p)", handle, iothub_item); - result = IOTHUB_PROCESS_ERROR; - } - else - { - if (item_type == IOTHUB_TYPE_DEVICE_TWIN) - { - AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT* dev_twin_ctx; - - if ((dev_twin_ctx = (AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT*)malloc(sizeof(AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT))) == NULL) - { - LogError("Failed allocating context for TWIN message"); - result = IOTHUB_PROCESS_ERROR; - } - else - { - AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)iothub_item->device_twin->device_handle; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_144: [If `handle` or `iothub_item` are NULL, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.] + if (handle == NULL || iothub_item == NULL) + { + LogError("Invalid argument (handle=%p, iothub_item=%p)", handle, iothub_item); + result = IOTHUB_PROCESS_ERROR; + } + else + { + if (item_type == IOTHUB_TYPE_DEVICE_TWIN) + { + AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT* dev_twin_ctx; + + if ((dev_twin_ctx = (AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT*)malloc(sizeof(AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT))) == NULL) + { + LogError("Failed allocating context for TWIN message"); + result = IOTHUB_PROCESS_ERROR; + } + else + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)iothub_item->device_twin->device_handle; - dev_twin_ctx->client_handle = iothub_item->device_twin->client_handle; - dev_twin_ctx->item_id = iothub_item->device_twin->item_id; + dev_twin_ctx->client_handle = iothub_item->device_twin->client_handle; + dev_twin_ctx->item_id = iothub_item->device_twin->item_id; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_146: [device_send_twin_update_async() shall be invoked passing `iothub_item->device_twin->report_data_handle` and `on_device_send_twin_update_complete_callback`] - if (device_send_twin_update_async( - registered_device->device_handle, - iothub_item->device_twin->report_data_handle, - on_device_send_twin_update_complete_callback, (void*)dev_twin_ctx) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_147: [If device_send_twin_update_async() fails, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.] - LogError("Failed sending TWIN update"); - free(dev_twin_ctx); - result = IOTHUB_PROCESS_ERROR; - } - else - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_150: [If no errors occur, `IoTHubTransport_AMQP_Common_ProcessItem` shall return IOTHUB_PROCESS_OK.] - result = IOTHUB_PROCESS_OK; - } - } - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_145: [If `item_type` is not IOTHUB_TYPE_DEVICE_TWIN, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.] - else - { - LogError("Item type not supported (%d)", item_type); - result = IOTHUB_PROCESS_ERROR; - } - } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_146: [device_send_twin_update_async() shall be invoked passing `iothub_item->device_twin->report_data_handle` and `on_device_send_twin_update_complete_callback`] + if (device_send_twin_update_async( + registered_device->device_handle, + iothub_item->device_twin->report_data_handle, + on_device_send_twin_update_complete_callback, (void*)dev_twin_ctx) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_147: [If device_send_twin_update_async() fails, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.] + LogError("Failed sending TWIN update"); + free(dev_twin_ctx); + result = IOTHUB_PROCESS_ERROR; + } + else + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_150: [If no errors occur, `IoTHubTransport_AMQP_Common_ProcessItem` shall return IOTHUB_PROCESS_OK.] + result = IOTHUB_PROCESS_OK; + } + } + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_145: [If `item_type` is not IOTHUB_TYPE_DEVICE_TWIN, `IoTHubTransport_AMQP_Common_ProcessItem` shall fail and return IOTHUB_PROCESS_ERROR.] + else + { + LogError("Item type not supported (%d)", item_type); + result = IOTHUB_PROCESS_ERROR; + } + } - return result; + return result; } void IoTHubTransport_AMQP_Common_DoWork(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle) @@ -1604,97 +1639,97 @@ int IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin(IOTHUB_DEVICE_HANDLE handle) { - int result; + int result; - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_131: [If `handle` is NULL, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall fail and return non-zero.] - if (handle == NULL) - { - LogError("Invalid argument (handle is NULL"); - result = __FAILURE__; - } - else - { - AMQP_TRANSPORT_INSTANCE* transport = (AMQP_TRANSPORT_INSTANCE*)handle; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_131: [If `handle` is NULL, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall fail and return non-zero.] + if (handle == NULL) + { + LogError("Invalid argument (handle is NULL"); + result = __FAILURE__; + } + else + { + AMQP_TRANSPORT_INSTANCE* transport = (AMQP_TRANSPORT_INSTANCE*)handle; - if (get_number_of_registered_devices(transport) != 1) - { - LogError("Device Twin not supported on device multiplexing scenario"); - result = __FAILURE__; - } - else - { - LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices); + if (get_number_of_registered_devices(transport) != 1) + { + LogError("Device Twin not supported on device multiplexing scenario"); + result = __FAILURE__; + } + else + { + LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices); - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_136: [If no errors occur, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall return zero.] - result = RESULT_OK; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_136: [If no errors occur, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall return zero.] + result = RESULT_OK; - while (list_item != NULL) - { - AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device; + while (list_item != NULL) + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device; - if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL) - { - LogError("Failed retrieving registered device information"); - result = __FAILURE__; - break; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_134: [device_subscribe_for_twin_updates() shall be invoked for the registered device, passing `on_device_twin_update_received_callback`] - else if (device_subscribe_for_twin_updates(registered_device->device_handle, on_device_twin_update_received_callback, (void*)registered_device) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_135: [If device_subscribe_for_twin_updates() fails, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall fail and return non-zero.] - LogError("Failed subscribing for device Twin updates"); - result = __FAILURE__; - break; - } + if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL) + { + LogError("Failed retrieving registered device information"); + result = __FAILURE__; + break; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_134: [device_subscribe_for_twin_updates() shall be invoked for the registered device, passing `on_device_twin_update_received_callback`] + else if (device_subscribe_for_twin_updates(registered_device->device_handle, on_device_twin_update_received_callback, (void*)registered_device) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_135: [If device_subscribe_for_twin_updates() fails, `IoTHubTransport_AMQP_Common_Subscribe_DeviceTwin` shall fail and return non-zero.] + LogError("Failed subscribing for device Twin updates"); + result = __FAILURE__; + break; + } - list_item = singlylinkedlist_get_next_item(list_item); - } - } - } + list_item = singlylinkedlist_get_next_item(list_item); + } + } + } - return result; + return result; } void IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin(IOTHUB_DEVICE_HANDLE handle) { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_140: [If `handle` is NULL, `IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin` shall return.] - if (handle == NULL) - { - LogError("Invalid argument (handle is NULL"); - } - else - { - AMQP_TRANSPORT_INSTANCE* transport = (AMQP_TRANSPORT_INSTANCE*)handle; + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_140: [If `handle` is NULL, `IoTHubTransport_AMQP_Common_Unsubscribe_DeviceTwin` shall return.] + if (handle == NULL) + { + LogError("Invalid argument (handle is NULL"); + } + else + { + AMQP_TRANSPORT_INSTANCE* transport = (AMQP_TRANSPORT_INSTANCE*)handle; - if (get_number_of_registered_devices(transport) != 1) - { - LogError("Device Twin not supported on device multiplexing scenario"); - } - else - { - LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices); + if (get_number_of_registered_devices(transport) != 1) + { + LogError("Device Twin not supported on device multiplexing scenario"); + } + else + { + LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(transport->registered_devices); - while (list_item != NULL) - { - AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device; + while (list_item != NULL) + { + AMQP_TRANSPORT_DEVICE_INSTANCE* registered_device; - if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL) - { - LogError("Failed retrieving registered device information"); - break; - } - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_142: [device_unsubscribe_for_twin_updates() shall be invoked for the registered device] - else if (device_unsubscribe_for_twin_updates(registered_device->device_handle) != RESULT_OK) - { - // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_143: [If `device_unsubscribe_for_twin_updates` fails, the error shall be ignored] - LogError("Failed unsubscribing for device Twin updates"); - break; - } + if ((registered_device = (AMQP_TRANSPORT_DEVICE_INSTANCE*)singlylinkedlist_item_get_value(list_item)) == NULL) + { + LogError("Failed retrieving registered device information"); + break; + } + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_142: [device_unsubscribe_for_twin_updates() shall be invoked for the registered device] + else if (device_unsubscribe_for_twin_updates(registered_device->device_handle) != RESULT_OK) + { + // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_143: [If `device_unsubscribe_for_twin_updates` fails, the error shall be ignored] + LogError("Failed unsubscribing for device Twin updates"); + break; + } - list_item = singlylinkedlist_get_next_item(list_item); - } - } - } + list_item = singlylinkedlist_get_next_item(list_item); + } + } + } } int IoTHubTransport_AMQP_Common_Subscribe_DeviceMethod(IOTHUB_DEVICE_HANDLE handle) @@ -2124,7 +2159,7 @@ device_config.authorization_module = device->authorization_module; // Codes_SRS_IOTHUBTRANSPORT_AMQP_COMMON_09_072: [The configuration for device_create shall be set according to the authentication preferred by IOTHUB_DEVICE_CONFIG] - device_config.authentication_mode = (device->deviceKey != NULL || device->deviceSasToken != NULL ? DEVICE_AUTH_MODE_CBS : DEVICE_AUTH_MODE_X509); + device_config.authentication_mode = get_authentication_mode(device); device_config.on_state_changed_callback = on_device_state_changed_callback; device_config.on_state_changed_context = amqp_device_instance; device_config.product_info = local_product_info;