Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
Diff: iothubtransport_amqp_device.c
- Revision:
- 39:e98d5df6dc74
- Parent:
- 36:f78f9a56869e
- Child:
- 41:71c01aa3df1a
--- a/iothubtransport_amqp_device.c Fri Jul 28 09:51:42 2017 -0700 +++ b/iothubtransport_amqp_device.c Fri Aug 11 14:01:56 2017 -0700 @@ -2,7 +2,6 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. #include <stdlib.h> -#include "iothubtransport_amqp_telemetry_messenger.h" #include "azure_c_shared_utility/optimize_size.h" #include "azure_c_shared_utility/gballoc.h" #include "azure_c_shared_utility/agenttime.h" @@ -10,6 +9,16 @@ #include "azure_c_shared_utility/strings.h" #include "iothubtransport_amqp_cbs_auth.h" #include "iothubtransport_amqp_device.h" +#include "iothubtransport_amqp_telemetry_messenger.h" +#include "iothubtransport_amqp_twin_messenger.h" + +DEFINE_ENUM_STRINGS(DEVICE_STATE, DEVICE_STATE_VALUES); +DEFINE_ENUM_STRINGS(DEVICE_AUTH_MODE, DEVICE_AUTH_MODE_VALUES); +DEFINE_ENUM_STRINGS(DEVICE_SEND_STATUS, DEVICE_SEND_STATUS_VALUES); +DEFINE_ENUM_STRINGS(D2C_EVENT_SEND_RESULT, D2C_EVENT_SEND_RESULT_VALUES); +DEFINE_ENUM_STRINGS(DEVICE_MESSAGE_DISPOSITION_RESULT, DEVICE_MESSAGE_DISPOSITION_RESULT_VALUES); +DEFINE_ENUM_STRINGS(DEVICE_TWIN_UPDATE_RESULT, DEVICE_TWIN_UPDATE_RESULT_STRINGS); +DEFINE_ENUM_STRINGS(DEVICE_TWIN_UPDATE_TYPE, DEVICE_TWIN_UPDATE_TYPE_STRINGS) #define RESULT_OK 0 #define INDEFINITE_TIME ((time_t)-1) @@ -40,6 +49,13 @@ ON_DEVICE_C2D_MESSAGE_RECEIVED on_message_received_callback; void* on_message_received_context; + + TWIN_MESSENGER_HANDLE twin_messenger_handle; + TWIN_MESSENGER_STATE twin_msgr_state; + time_t twin_msgr_state_last_changed_time; + size_t twin_msgr_state_change_timeout_secs; + DEVICE_TWIN_UPDATE_RECEIVED_CALLBACK on_device_twin_update_received_callback; + void* on_device_twin_update_received_context; } DEVICE_INSTANCE; typedef struct DEVICE_SEND_EVENT_TASK_TAG @@ -48,6 +64,12 @@ void* on_event_send_complete_context; } DEVICE_SEND_EVENT_TASK; +typedef struct DEVICE_SEND_TWIN_UPDATE_CONTEXT_TAG +{ + DEVICE_SEND_TWIN_UPDATE_COMPLETE_CALLBACK on_send_twin_update_complete_callback; + void* context; +} DEVICE_SEND_TWIN_UPDATE_CONTEXT; + // Internal state control static void update_state(DEVICE_INSTANCE* instance, DEVICE_STATE new_state) { @@ -99,7 +121,9 @@ return result; } -// Callback Handlers + +//---------- Callback Handlers ----------// + static D2C_EVENT_SEND_RESULT get_d2c_event_send_result_from(TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT result) { D2C_EVENT_SEND_RESULT d2c_esr; @@ -207,6 +231,95 @@ } } +static DEVICE_TWIN_UPDATE_RESULT get_device_twin_update_result_from(TWIN_REPORT_STATE_RESULT result) +{ + DEVICE_TWIN_UPDATE_RESULT device_result; + + switch (result) + { + case TWIN_REPORT_STATE_RESULT_SUCCESS: + device_result = DEVICE_TWIN_UPDATE_RESULT_OK; + break; + case TWIN_REPORT_STATE_RESULT_ERROR: + device_result = DEVICE_TWIN_UPDATE_RESULT_ERROR; + break; + default: + device_result = DEVICE_TWIN_UPDATE_RESULT_ERROR; + }; + + return device_result; +} + +static void on_report_state_complete_callback(TWIN_REPORT_STATE_RESULT result, TWIN_REPORT_STATE_REASON reason, int status_code, const void* context) +{ + (void)reason; + + if (context == NULL) + { + LogError("Invalid argument (context is NULL)"); + } + else + { + DEVICE_SEND_TWIN_UPDATE_CONTEXT* twin_ctx = (DEVICE_SEND_TWIN_UPDATE_CONTEXT*)context; + + // Codes_SRS_DEVICE_09_141: [on_send_twin_update_complete_callback (if provided by user) shall be invoked passing the corresponding device result and `status_code`] + if (twin_ctx->on_send_twin_update_complete_callback != NULL) + { + DEVICE_TWIN_UPDATE_RESULT device_result; + + device_result = get_device_twin_update_result_from(result); + + twin_ctx->on_send_twin_update_complete_callback(device_result, status_code, twin_ctx->context); + } + + // Codes_SRS_DEVICE_09_142: [Memory allocated for `context` shall be released] + free(twin_ctx); + } +} + +static void on_twin_state_update_callback(TWIN_UPDATE_TYPE update_type, const char* payload, size_t size, const void* context) +{ + if (payload == NULL || context == NULL) + { + LogError("Invalid argument (context=%p, payload=%p)", context, payload); + } + else + { + DEVICE_INSTANCE* instance = (DEVICE_INSTANCE*)context; + + DEVICE_TWIN_UPDATE_TYPE device_update_type; + + if (update_type == TWIN_UPDATE_TYPE_COMPLETE) + { + device_update_type = DEVICE_TWIN_UPDATE_TYPE_COMPLETE; + } + else + { + device_update_type = DEVICE_TWIN_UPDATE_TYPE_PARTIAL; + } + + // Codes_SRS_DEVICE_09_151: [on_device_twin_update_received_callback (provided by user) shall be invoked passing the corresponding update type, `payload` and `size`] + instance->on_device_twin_update_received_callback(device_update_type, (const unsigned char*)payload, size, instance->on_device_twin_update_received_context); + } +} + +static void on_twin_messenger_state_changed_callback(void* context, TWIN_MESSENGER_STATE previous_state, TWIN_MESSENGER_STATE new_state) +{ + if (context != NULL && new_state != previous_state) + { + DEVICE_INSTANCE* instance = (DEVICE_INSTANCE*)context; + instance->twin_msgr_state = new_state; + + if ((instance->twin_msgr_state_last_changed_time = get_time(NULL)) == INDEFINITE_TIME) + { + LogError("Failed setting time of last twin messenger state changed event"); + } + } +} + + +//---------- Message Dispostion ----------// + static DEVICE_MESSAGE_DISPOSITION_INFO* create_device_message_disposition_info_from(TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* messenger_disposition_info) { DEVICE_MESSAGE_DISPOSITION_INFO* device_disposition_info; @@ -339,7 +452,9 @@ return msgr_disposition_result; } -// Configuration Helpers + +//---------- Configuration Helpers ----------// + static void destroy_device_config(DEVICE_CONFIG* config) { if (config != NULL) @@ -417,6 +532,11 @@ telemetry_messenger_destroy(instance->messenger_handle); } + if (instance->twin_messenger_handle != NULL) + { + twin_messenger_destroy(instance->twin_messenger_handle); + } + if (instance->authentication_handle != NULL) { authentication_destroy(instance->authentication_handle); @@ -447,7 +567,7 @@ return result; } -static int create_messenger_instance(DEVICE_INSTANCE* instance, const char* pi) +static int create_telemetry_messenger_instance(DEVICE_INSTANCE* instance, const char* pi) { int result; @@ -470,7 +590,32 @@ return result; } +static int create_twin_messenger(DEVICE_INSTANCE* instance) +{ + int result; + TWIN_MESSENGER_CONFIG twin_msgr_config; + + twin_msgr_config.client_version = instance->config->product_info; + twin_msgr_config.device_id = instance->config->device_id; + twin_msgr_config.iothub_host_fqdn = instance->config->iothub_host_fqdn; + twin_msgr_config.on_state_changed_callback = on_twin_messenger_state_changed_callback; + twin_msgr_config.on_state_changed_context = (void*)instance; + + if ((instance->twin_messenger_handle = twin_messenger_create(&twin_msgr_config)) == NULL) + { + result = __FAILURE__; + } + else + { + result = RESULT_OK; + } + + return result; +} + + // ---------- Set/Retrieve Options Helpers ----------// + static void* device_clone_option(const char* name, const void* value) { void* result; @@ -520,7 +665,9 @@ } } -// Public APIs: + +//---------- Public APIs ----------// + DEVICE_HANDLE device_create(DEVICE_CONFIG *config) { DEVICE_INSTANCE *instance; @@ -574,21 +721,31 @@ result = __FAILURE__; } // Codes_SRS_DEVICE_09_008: [`instance->messenger_handle` shall be set using telemetry_messenger_create()] - else if (create_messenger_instance(instance, config->product_info) != RESULT_OK) + else if (create_telemetry_messenger_instance(instance, config->product_info) != RESULT_OK) { // Codes_SRS_DEVICE_09_009: [If the TELEMETRY_MESSENGER_HANDLE fails to be created, device_create shall fail and return NULL] LogError("Failed creating the device instance for device '%s' (failed creating the messenger instance)", instance->config->device_id); result = __FAILURE__; } + // Codes_SRS_DEVICE_09_122: [`instance->twin_messenger_handle` shall be set using twin_messenger_create()] + else if (create_twin_messenger(instance) != RESULT_OK) + { + // Codes_SRS_DEVICE_09_123: [If the TWIN_MESSENGER_HANDLE fails to be created, device_create shall fail and return NULL] + LogError("Failed creating the twin messenger for device '%s'", instance->config->device_id); + result = __FAILURE__; + } else { instance->auth_state = AUTHENTICATION_STATE_STOPPED; - instance->msgr_state = TELEMETRY_MESSENGER_STATE_STOPPED; + instance->msgr_state = TELEMETRY_MESSENGER_STATE_STOPPED; + instance->twin_msgr_state = TWIN_MESSENGER_STATE_STOPPED; instance->state = DEVICE_STATE_STOPPED; instance->auth_state_last_changed_time = INDEFINITE_TIME; instance->auth_state_change_timeout_secs = DEFAULT_AUTH_STATE_CHANGED_TIMEOUT_SECS; instance->msgr_state_last_changed_time = INDEFINITE_TIME; instance->msgr_state_change_timeout_secs = DEFAULT_MSGR_STATE_CHANGED_TIMEOUT_SECS; + instance->twin_msgr_state_last_changed_time = INDEFINITE_TIME; + instance->twin_msgr_state_change_timeout_secs = DEFAULT_MSGR_STATE_CHANGED_TIMEOUT_SECS; result = RESULT_OK; } @@ -689,7 +846,17 @@ telemetry_messenger_stop(instance->messenger_handle) != RESULT_OK) { // Codes_SRS_DEVICE_09_028: [If messenger_stop fails, the `instance` state shall be updated to DEVICE_STATE_ERROR_MSG and the function shall return non-zero result] - LogError("Failed stopping device '%s' (messenger_stop failed)", instance->config->device_id); + LogError("Failed stopping device '%s' (telemetry_messenger_stop failed)", instance->config->device_id); + result = __FAILURE__; + update_state(instance, DEVICE_STATE_ERROR_MSG); + } + // Codes_SRS_DEVICE_09_131: [If `instance->twin_messenger_handle` state is not TWIN_MESSENGER_STATE_STOPPED, twin_messenger_stop shall be invoked] + else if (instance->twin_msgr_state != TWIN_MESSENGER_STATE_STOPPED && + instance->twin_msgr_state != TWIN_MESSENGER_STATE_STOPPING && + twin_messenger_stop(instance->twin_messenger_handle) != RESULT_OK) + { + // Codes_SRS_DEVICE_09_132: [If twin_messenger_stop fails, the `instance` state shall be updated to DEVICE_STATE_ERROR_MSG and the function shall return non-zero result] + LogError("Failed stopping device '%s' (twin_messenger_stop failed)", instance->config->device_id); result = __FAILURE__; update_state(instance, DEVICE_STATE_ERROR_MSG); } @@ -778,8 +945,11 @@ } // Codes_SRS_DEVICE_09_040: [Messenger shall not be started if using CBS authentication and authentication start has not completed yet] - if (instance->config->authentication_mode == DEVICE_AUTH_MODE_X509 || instance->auth_state == AUTHENTICATION_STATE_STARTED) + // Codes_SRS_DEVICE_09_124: [TWIN Messenger shall not be started if using CBS authentication and authentication start has not completed yet] + if (instance->config->authentication_mode == DEVICE_AUTH_MODE_X509 || instance->auth_state == AUTHENTICATION_STATE_STARTED) { + size_t number_of_messengers_started = 0; + // Codes_SRS_DEVICE_09_041: [If messenger state is TELEMETRY_MESSENGER_STATE_STOPPED, messenger_start shall be invoked] if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_STOPPED) { @@ -819,8 +989,55 @@ // Codes_SRS_DEVICE_09_046: [If messenger state is TELEMETRY_MESSENGER_STATE_STARTED, the device state shall be updated to DEVICE_STATE_STARTED] else if (instance->msgr_state == TELEMETRY_MESSENGER_STATE_STARTED) { - update_state(instance, DEVICE_STATE_STARTED); + number_of_messengers_started++; } + + // Codes_SRS_DEVICE_09_125: [If TWIN messenger state is TWIN_MESSENGER_STATE_STOPPED, twin_messenger_start shall be invoked] + if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STOPPED) + { + if (twin_messenger_start(instance->twin_messenger_handle, instance->session_handle) != RESULT_OK) + { + // Codes_SRS_DEVICE_09_126: [If twin_messenger_start fails, the device state shall be updated to DEVICE_STATE_ERROR_MSG] + LogError("Device '%s' twin messenger failed to be started (messenger_start failed)", instance->config->device_id); + + update_state(instance, DEVICE_STATE_ERROR_MSG); + } + } + else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STARTING) + { + int is_timed_out; + if (is_timeout_reached(instance->twin_msgr_state_last_changed_time, instance->twin_msgr_state_change_timeout_secs, &is_timed_out) != RESULT_OK) + { + LogError("Device '%s' failed verifying the timeout for twin messenger start (is_timeout_reached failed)", instance->config->device_id); + + update_state(instance, DEVICE_STATE_ERROR_MSG); + } + // Codes_SRS_DEVICE_09_127: [If TWIN messenger state is TWIN_MESSENGER_STATE_STARTING, the device shall track the time since last event change and timeout if needed] + else if (is_timed_out == 1) + { + // Codes_SRS_DEVICE_09_128: [If twin_messenger_start times out, the device state shall be updated to DEVICE_STATE_ERROR_MSG] + LogError("Device '%s' twin messenger did not complete starting within expected timeout (%d)", instance->config->device_id, instance->twin_msgr_state_change_timeout_secs); + + update_state(instance, DEVICE_STATE_ERROR_MSG); + } + } + // Codes_SRS_DEVICE_09_129: [If TWIN messenger state is TWIN_MESSENGER_STATE_ERROR, the device state shall be updated to DEVICE_STATE_ERROR_MSG] + else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_ERROR) + { + LogError("Device '%s' twin messenger failed to be started (messenger got into error state)", instance->config->device_id); + + update_state(instance, DEVICE_STATE_ERROR_MSG); + } + // Codes_SRS_DEVICE_09_130: [If TWIN messenger state is TWIN_MESSENGER_STATE_STARTED, the device state shall be updated to DEVICE_STATE_STARTED] + else if (instance->twin_msgr_state == TWIN_MESSENGER_STATE_STARTED) + { + number_of_messengers_started++; + } + + if (number_of_messengers_started == 2) + { + update_state(instance, DEVICE_STATE_STARTED); + } } } else if (instance->state == DEVICE_STATE_STARTED) @@ -855,6 +1072,13 @@ LogError("Device '%s' is started but messenger reported unexpected state %d", instance->config->device_id, instance->msgr_state); update_state(instance, DEVICE_STATE_ERROR_MSG); } + + // Codes_SRS_DEVICE_09_133: [If TWIN messenger state is not TWIN_MESSENGER_STATE_STARTED, the device state shall be updated to DEVICE_STATE_ERROR_MSG] + if (instance->twin_msgr_state != TWIN_MESSENGER_STATE_STARTED) + { + LogError("Device '%s' is started but TWIN messenger reported unexpected state %d", instance->config->device_id, instance->msgr_state); + update_state(instance, DEVICE_STATE_ERROR_MSG); + } } } @@ -870,9 +1094,15 @@ if (instance->msgr_state != TELEMETRY_MESSENGER_STATE_STOPPED && instance->msgr_state != TELEMETRY_MESSENGER_STATE_ERROR) { - // Codes_SRS_DEVICE_09_050: [If `instance->messenger_handle` state is not STOPPED or ERROR, authentication_do_work shall be invoked] + // Codes_SRS_DEVICE_09_050: [If `instance->messenger_handle` state is not STOPPED or ERROR, telemetry_messenger_do_work shall be invoked] telemetry_messenger_do_work(instance->messenger_handle); } + + if (instance->twin_msgr_state != TWIN_MESSENGER_STATE_STOPPED && instance->twin_msgr_state != TWIN_MESSENGER_STATE_ERROR) + { + // Codes_SRS_DEVICE_09_134: [If `instance->twin_messenger_handle` state is not STOPPED or ERROR, twin_messenger_do_work shall be invoked] + twin_messenger_do_work(instance->twin_messenger_handle); + } } } @@ -1315,3 +1545,122 @@ return result; } + +int device_send_twin_update_async(DEVICE_HANDLE handle, CONSTBUFFER_HANDLE data, DEVICE_SEND_TWIN_UPDATE_COMPLETE_CALLBACK on_send_twin_update_complete_callback, void* context) +{ + int result; + + // Codes_SRS_DEVICE_09_135: [If `handle` or `data` are NULL, device_send_twin_update_async shall return a non-zero result] + if (handle == NULL || data == NULL) + { + LogError("Invalid argument (handle=%p, data=%p)", handle, data); + result = __FAILURE__; + } + else + { + DEVICE_INSTANCE* instance = (DEVICE_INSTANCE*)handle; + DEVICE_SEND_TWIN_UPDATE_CONTEXT* twin_ctx; + + // Codes_SRS_DEVICE_09_136: [A structure (`twin_ctx`) shall be created to track the send state of the twin report] + if ((twin_ctx = (DEVICE_SEND_TWIN_UPDATE_CONTEXT*)malloc(sizeof(DEVICE_SEND_TWIN_UPDATE_CONTEXT))) == NULL) + { + // Codes_SRS_DEVICE_09_137: [If `twin_ctx` fails to be created, device_send_twin_update_async shall return a non-zero value] + LogError("Cannot send twin update (failed creating TWIN context)"); + result = __FAILURE__; + } + else + { + twin_ctx->on_send_twin_update_complete_callback = on_send_twin_update_complete_callback; + twin_ctx->context = context; + + // Codes_SRS_DEVICE_09_138: [The twin report shall be sent using twin_messenger_report_state_async, passing `on_report_state_complete_callback` and `twin_ctx`] + if (twin_messenger_report_state_async(instance->twin_messenger_handle, data, on_report_state_complete_callback, (const void*)twin_ctx) != 0) + { + // Codes_SRS_DEVICE_09_139: [If twin_messenger_report_state_async fails, device_send_twin_update_async shall return a non-zero value] + LogError("Cannot send twin update (failed creating TWIN messenger)"); + free(twin_ctx); + result = __FAILURE__; + } + else + { + // Codes_SRS_DEVICE_09_140: [If no failures occur, device_send_twin_update_async shall return 0] + result = RESULT_OK; + } + } + } + + return result; +} + +int device_subscribe_for_twin_updates(DEVICE_HANDLE handle, DEVICE_TWIN_UPDATE_RECEIVED_CALLBACK on_device_twin_update_received_callback, void* context) +{ + int result; + + // Codes_SRS_DEVICE_09_143: [If `handle` or `on_device_twin_update_received_callback` are NULL, device_subscribe_for_twin_updates shall return a non-zero result] + if (handle == NULL || on_device_twin_update_received_callback == NULL) + { + LogError("Invalid argument (handle=%p, on_device_twin_update_received_callback=%p)", handle, on_device_twin_update_received_callback); + result = __FAILURE__; + } + else + { + DEVICE_INSTANCE* instance = (DEVICE_INSTANCE*)handle; + + DEVICE_TWIN_UPDATE_RECEIVED_CALLBACK previous_callback = instance->on_device_twin_update_received_callback; + void* previous_context = instance->on_device_twin_update_received_context; + + instance->on_device_twin_update_received_callback = on_device_twin_update_received_callback; + instance->on_device_twin_update_received_context = context; + + // Codes_SRS_DEVICE_09_144: [twin_messenger_subscribe shall be invoked passing `on_twin_state_update_callback`] + if (twin_messenger_subscribe(instance->twin_messenger_handle, on_twin_state_update_callback, (void*)instance) != 0) + { + // Codes_SRS_DEVICE_09_145: [If twin_messenger_subscribe fails, device_subscribe_for_twin_updates shall return a non-zero value] + LogError("Failed subscribing for device twin updates"); + instance->on_device_twin_update_received_callback = previous_callback; + instance->on_device_twin_update_received_context = previous_context; + result = __FAILURE__; + } + else + { + // Codes_SRS_DEVICE_09_146: [If no failures occur, device_subscribe_for_twin_updates shall return 0] + result = RESULT_OK; + } + } + + return result; +} + +int device_unsubscribe_for_twin_updates(DEVICE_HANDLE handle) +{ + int result; + + + // Codes_SRS_DEVICE_09_147: [If `handle` is NULL, device_unsubscribe_for_twin_updates shall return a non-zero result] + if (handle == NULL) + { + LogError("Invalid argument (handle is NULL)"); + result = __FAILURE__; + } + else + { + DEVICE_INSTANCE* instance = (DEVICE_INSTANCE*)handle; + + // Codes_SRS_DEVICE_09_148: [twin_messenger_unsubscribe shall be invoked passing `on_twin_state_update_callback`] + if (twin_messenger_unsubscribe(instance->twin_messenger_handle) != 0) + { + // Codes_SRS_DEVICE_09_149: [If twin_messenger_unsubscribe fails, device_unsubscribe_for_twin_updates shall return a non-zero value] + LogError("Failed unsubscribing for device twin updates"); + result = __FAILURE__; + } + else + { + instance->on_device_twin_update_received_callback = NULL; + instance->on_device_twin_update_received_context = NULL; + // Codes_SRS_DEVICE_09_150: [If no failures occur, device_unsubscribe_for_twin_updates shall return 0] + result = RESULT_OK; + } + } + + return result; +} \ No newline at end of file