Microsoft Azure IoTHub client AMQP transport
Dependents: sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more
This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks
iothubtransport_amqp_telemetry_messenger.c@53:e21e1e88460f, 2018-06-11 (annotated)
- Committer:
- AzureIoTClient
- Date:
- Mon Jun 11 15:38:09 2018 -0700
- Revision:
- 53:e21e1e88460f
- Parent:
- 51:269e65571b39
- Child:
- 54:830550fef7ea
1.2.5
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
AzureIoTClient | 36:f78f9a56869e | 1 | // Copyright (c) Microsoft. All rights reserved. |
AzureIoTClient | 36:f78f9a56869e | 2 | // Licensed under the MIT license. See LICENSE file in the project root for full license information. |
AzureIoTClient | 36:f78f9a56869e | 3 | |
AzureIoTClient | 36:f78f9a56869e | 4 | #include <stdlib.h> |
AzureIoTClient | 36:f78f9a56869e | 5 | #include <stdbool.h> |
AzureIoTClient | 36:f78f9a56869e | 6 | #include "azure_c_shared_utility/optimize_size.h" |
AzureIoTClient | 36:f78f9a56869e | 7 | #include "azure_c_shared_utility/crt_abstractions.h" |
AzureIoTClient | 36:f78f9a56869e | 8 | #include "azure_c_shared_utility/gballoc.h" |
AzureIoTClient | 36:f78f9a56869e | 9 | #include "azure_c_shared_utility/agenttime.h" |
AzureIoTClient | 36:f78f9a56869e | 10 | #include "azure_c_shared_utility/xlogging.h" |
AzureIoTClient | 36:f78f9a56869e | 11 | #include "azure_c_shared_utility/uniqueid.h" |
AzureIoTClient | 36:f78f9a56869e | 12 | #include "azure_c_shared_utility/singlylinkedlist.h" |
AzureIoTClient | 36:f78f9a56869e | 13 | #include "azure_uamqp_c/link.h" |
AzureIoTClient | 36:f78f9a56869e | 14 | #include "azure_uamqp_c/messaging.h" |
AzureIoTClient | 36:f78f9a56869e | 15 | #include "azure_uamqp_c/message_sender.h" |
AzureIoTClient | 36:f78f9a56869e | 16 | #include "azure_uamqp_c/message_receiver.h" |
AzureIoTClient | 53:e21e1e88460f | 17 | #include "internal/uamqp_messaging.h" |
AzureIoTClient | 53:e21e1e88460f | 18 | #include "internal/iothub_client_private.h" |
AzureIoTClient | 36:f78f9a56869e | 19 | #include "iothub_client_version.h" |
AzureIoTClient | 53:e21e1e88460f | 20 | #include "internal/iothubtransport_amqp_telemetry_messenger.h" |
AzureIoTClient | 36:f78f9a56869e | 21 | |
AzureIoTClient | 36:f78f9a56869e | 22 | #define RESULT_OK 0 |
AzureIoTClient | 36:f78f9a56869e | 23 | #define INDEFINITE_TIME ((time_t)(-1)) |
AzureIoTClient | 36:f78f9a56869e | 24 | |
AzureIoTClient | 36:f78f9a56869e | 25 | #define IOTHUB_DEVICES_PATH_FMT "%s/devices/%s" |
AzureIoTClient | 36:f78f9a56869e | 26 | #define IOTHUB_EVENT_SEND_ADDRESS_FMT "amqps://%s/messages/events" |
AzureIoTClient | 36:f78f9a56869e | 27 | #define IOTHUB_MESSAGE_RECEIVE_ADDRESS_FMT "amqps://%s/messages/devicebound" |
AzureIoTClient | 36:f78f9a56869e | 28 | #define MESSAGE_SENDER_LINK_NAME_PREFIX "link-snd" |
AzureIoTClient | 36:f78f9a56869e | 29 | #define MESSAGE_SENDER_MAX_LINK_SIZE UINT64_MAX |
AzureIoTClient | 36:f78f9a56869e | 30 | #define MESSAGE_RECEIVER_LINK_NAME_PREFIX "link-rcv" |
AzureIoTClient | 36:f78f9a56869e | 31 | #define MESSAGE_RECEIVER_MAX_LINK_SIZE 65536 |
AzureIoTClient | 36:f78f9a56869e | 32 | #define DEFAULT_EVENT_SEND_RETRY_LIMIT 10 |
AzureIoTClient | 36:f78f9a56869e | 33 | #define DEFAULT_EVENT_SEND_TIMEOUT_SECS 600 |
AzureIoTClient | 36:f78f9a56869e | 34 | #define MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS 300 |
AzureIoTClient | 36:f78f9a56869e | 35 | #define MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS 300 |
AzureIoTClient | 36:f78f9a56869e | 36 | #define UNIQUE_ID_BUFFER_SIZE 37 |
AzureIoTClient | 36:f78f9a56869e | 37 | #define STRING_NULL_TERMINATOR '\0' |
AzureIoTClient | 41:71c01aa3df1a | 38 | |
AzureIoTClient | 41:71c01aa3df1a | 39 | #define AMQP_BATCHING_FORMAT_CODE 0x80013700 |
AzureIoTClient | 36:f78f9a56869e | 40 | |
AzureIoTClient | 36:f78f9a56869e | 41 | typedef struct TELEMETRY_MESSENGER_INSTANCE_TAG |
AzureIoTClient | 36:f78f9a56869e | 42 | { |
AzureIoTClient | 41:71c01aa3df1a | 43 | STRING_HANDLE device_id; |
AzureIoTClient | 36:f78f9a56869e | 44 | STRING_HANDLE product_info; |
AzureIoTClient | 41:71c01aa3df1a | 45 | STRING_HANDLE iothub_host_fqdn; |
AzureIoTClient | 41:71c01aa3df1a | 46 | SINGLYLINKEDLIST_HANDLE waiting_to_send; // List of MESSENGER_SEND_EVENT_CALLER_INFORMATION's |
AzureIoTClient | 41:71c01aa3df1a | 47 | SINGLYLINKEDLIST_HANDLE in_progress_list; // List of MESSENGER_SEND_EVENT_TASK's |
AzureIoTClient | 41:71c01aa3df1a | 48 | TELEMETRY_MESSENGER_STATE state; |
AzureIoTClient | 41:71c01aa3df1a | 49 | |
AzureIoTClient | 41:71c01aa3df1a | 50 | ON_TELEMETRY_MESSENGER_STATE_CHANGED_CALLBACK on_state_changed_callback; |
AzureIoTClient | 41:71c01aa3df1a | 51 | void* on_state_changed_context; |
AzureIoTClient | 36:f78f9a56869e | 52 | |
AzureIoTClient | 41:71c01aa3df1a | 53 | bool receive_messages; |
AzureIoTClient | 41:71c01aa3df1a | 54 | ON_TELEMETRY_MESSENGER_MESSAGE_RECEIVED on_message_received_callback; |
AzureIoTClient | 41:71c01aa3df1a | 55 | void* on_message_received_context; |
AzureIoTClient | 36:f78f9a56869e | 56 | |
AzureIoTClient | 41:71c01aa3df1a | 57 | SESSION_HANDLE session_handle; |
AzureIoTClient | 41:71c01aa3df1a | 58 | LINK_HANDLE sender_link; |
AzureIoTClient | 41:71c01aa3df1a | 59 | MESSAGE_SENDER_HANDLE message_sender; |
AzureIoTClient | 41:71c01aa3df1a | 60 | MESSAGE_SENDER_STATE message_sender_current_state; |
AzureIoTClient | 41:71c01aa3df1a | 61 | MESSAGE_SENDER_STATE message_sender_previous_state; |
AzureIoTClient | 41:71c01aa3df1a | 62 | LINK_HANDLE receiver_link; |
AzureIoTClient | 41:71c01aa3df1a | 63 | MESSAGE_RECEIVER_HANDLE message_receiver; |
AzureIoTClient | 41:71c01aa3df1a | 64 | MESSAGE_RECEIVER_STATE message_receiver_current_state; |
AzureIoTClient | 41:71c01aa3df1a | 65 | MESSAGE_RECEIVER_STATE message_receiver_previous_state; |
AzureIoTClient | 36:f78f9a56869e | 66 | |
AzureIoTClient | 41:71c01aa3df1a | 67 | size_t event_send_retry_limit; |
AzureIoTClient | 41:71c01aa3df1a | 68 | size_t event_send_error_count; |
AzureIoTClient | 41:71c01aa3df1a | 69 | size_t event_send_timeout_secs; |
AzureIoTClient | 41:71c01aa3df1a | 70 | time_t last_message_sender_state_change_time; |
AzureIoTClient | 41:71c01aa3df1a | 71 | time_t last_message_receiver_state_change_time; |
AzureIoTClient | 36:f78f9a56869e | 72 | } TELEMETRY_MESSENGER_INSTANCE; |
AzureIoTClient | 36:f78f9a56869e | 73 | |
AzureIoTClient | 41:71c01aa3df1a | 74 | // MESSENGER_SEND_EVENT_CALLER_INFORMATION corresponds to a message sent from the API, including |
AzureIoTClient | 41:71c01aa3df1a | 75 | // the message and callback information to alert caller about what happened. |
AzureIoTClient | 41:71c01aa3df1a | 76 | typedef struct MESSENGER_SEND_EVENT_CALLER_INFORMATION_TAG |
AzureIoTClient | 41:71c01aa3df1a | 77 | { |
AzureIoTClient | 41:71c01aa3df1a | 78 | IOTHUB_MESSAGE_LIST* message; |
AzureIoTClient | 41:71c01aa3df1a | 79 | ON_TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE on_event_send_complete_callback; |
AzureIoTClient | 41:71c01aa3df1a | 80 | void* context; |
AzureIoTClient | 41:71c01aa3df1a | 81 | } MESSENGER_SEND_EVENT_CALLER_INFORMATION; |
AzureIoTClient | 41:71c01aa3df1a | 82 | |
AzureIoTClient | 41:71c01aa3df1a | 83 | // MESSENGER_SEND_EVENT_TASK interfaces with underlying uAMQP layer. It receives the callback |
AzureIoTClient | 41:71c01aa3df1a | 84 | // from this lower layer which is used to pass the results back to the API via the callback_list. |
AzureIoTClient | 36:f78f9a56869e | 85 | typedef struct MESSENGER_SEND_EVENT_TASK_TAG |
AzureIoTClient | 36:f78f9a56869e | 86 | { |
AzureIoTClient | 41:71c01aa3df1a | 87 | SINGLYLINKEDLIST_HANDLE callback_list; // List of MESSENGER_SEND_EVENT_CALLER_INFORMATION's |
AzureIoTClient | 41:71c01aa3df1a | 88 | time_t send_time; |
AzureIoTClient | 41:71c01aa3df1a | 89 | TELEMETRY_MESSENGER_INSTANCE *messenger; |
AzureIoTClient | 41:71c01aa3df1a | 90 | bool is_timed_out; |
AzureIoTClient | 36:f78f9a56869e | 91 | } MESSENGER_SEND_EVENT_TASK; |
AzureIoTClient | 36:f78f9a56869e | 92 | |
AzureIoTClient | 41:71c01aa3df1a | 93 | |
AzureIoTClient | 36:f78f9a56869e | 94 | // @brief |
AzureIoTClient | 36:f78f9a56869e | 95 | // Evaluates if the ammount of time since start_time is greater or lesser than timeout_in_secs. |
AzureIoTClient | 36:f78f9a56869e | 96 | // @param is_timed_out |
AzureIoTClient | 36:f78f9a56869e | 97 | // Set to 1 if a timeout has been reached, 0 otherwise. Not set if any failure occurs. |
AzureIoTClient | 36:f78f9a56869e | 98 | // @returns |
AzureIoTClient | 36:f78f9a56869e | 99 | // 0 if no failures occur, non-zero otherwise. |
AzureIoTClient | 36:f78f9a56869e | 100 | static int is_timeout_reached(time_t start_time, size_t timeout_in_secs, int *is_timed_out) |
AzureIoTClient | 36:f78f9a56869e | 101 | { |
AzureIoTClient | 41:71c01aa3df1a | 102 | int result; |
AzureIoTClient | 36:f78f9a56869e | 103 | |
AzureIoTClient | 41:71c01aa3df1a | 104 | if (start_time == INDEFINITE_TIME) |
AzureIoTClient | 41:71c01aa3df1a | 105 | { |
AzureIoTClient | 41:71c01aa3df1a | 106 | LogError("Failed to verify timeout (start_time is INDEFINITE)"); |
AzureIoTClient | 41:71c01aa3df1a | 107 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 108 | } |
AzureIoTClient | 41:71c01aa3df1a | 109 | else |
AzureIoTClient | 41:71c01aa3df1a | 110 | { |
AzureIoTClient | 41:71c01aa3df1a | 111 | time_t current_time; |
AzureIoTClient | 36:f78f9a56869e | 112 | |
AzureIoTClient | 41:71c01aa3df1a | 113 | if ((current_time = get_time(NULL)) == INDEFINITE_TIME) |
AzureIoTClient | 41:71c01aa3df1a | 114 | { |
AzureIoTClient | 41:71c01aa3df1a | 115 | LogError("Failed to verify timeout (get_time failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 116 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 117 | } |
AzureIoTClient | 41:71c01aa3df1a | 118 | else |
AzureIoTClient | 41:71c01aa3df1a | 119 | { |
AzureIoTClient | 41:71c01aa3df1a | 120 | if (get_difftime(current_time, start_time) >= timeout_in_secs) |
AzureIoTClient | 41:71c01aa3df1a | 121 | { |
AzureIoTClient | 41:71c01aa3df1a | 122 | *is_timed_out = 1; |
AzureIoTClient | 41:71c01aa3df1a | 123 | } |
AzureIoTClient | 41:71c01aa3df1a | 124 | else |
AzureIoTClient | 41:71c01aa3df1a | 125 | { |
AzureIoTClient | 41:71c01aa3df1a | 126 | *is_timed_out = 0; |
AzureIoTClient | 41:71c01aa3df1a | 127 | } |
AzureIoTClient | 36:f78f9a56869e | 128 | |
AzureIoTClient | 41:71c01aa3df1a | 129 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 130 | } |
AzureIoTClient | 41:71c01aa3df1a | 131 | } |
AzureIoTClient | 36:f78f9a56869e | 132 | |
AzureIoTClient | 41:71c01aa3df1a | 133 | return result; |
AzureIoTClient | 36:f78f9a56869e | 134 | } |
AzureIoTClient | 36:f78f9a56869e | 135 | |
AzureIoTClient | 36:f78f9a56869e | 136 | static STRING_HANDLE create_devices_path(STRING_HANDLE iothub_host_fqdn, STRING_HANDLE device_id) |
AzureIoTClient | 36:f78f9a56869e | 137 | { |
AzureIoTClient | 41:71c01aa3df1a | 138 | STRING_HANDLE devices_path; |
AzureIoTClient | 36:f78f9a56869e | 139 | |
AzureIoTClient | 43:3da2d93bb955 | 140 | if ((devices_path = STRING_new()) == NULL) |
AzureIoTClient | 43:3da2d93bb955 | 141 | { |
AzureIoTClient | 43:3da2d93bb955 | 142 | LogError("Failed creating devices_path (STRING_new failed)"); |
AzureIoTClient | 43:3da2d93bb955 | 143 | } |
AzureIoTClient | 43:3da2d93bb955 | 144 | else |
AzureIoTClient | 41:71c01aa3df1a | 145 | { |
AzureIoTClient | 43:3da2d93bb955 | 146 | const char* iothub_host_fqdn_char_ptr = STRING_c_str(iothub_host_fqdn); |
AzureIoTClient | 43:3da2d93bb955 | 147 | const char* device_id_char_ptr = STRING_c_str(device_id); |
AzureIoTClient | 43:3da2d93bb955 | 148 | if (STRING_sprintf(devices_path, IOTHUB_DEVICES_PATH_FMT, iothub_host_fqdn_char_ptr, device_id_char_ptr) != RESULT_OK) |
AzureIoTClient | 43:3da2d93bb955 | 149 | { |
AzureIoTClient | 43:3da2d93bb955 | 150 | STRING_delete(devices_path); |
AzureIoTClient | 43:3da2d93bb955 | 151 | devices_path = NULL; |
AzureIoTClient | 43:3da2d93bb955 | 152 | LogError("Failed creating devices_path (STRING_sprintf failed)"); |
AzureIoTClient | 43:3da2d93bb955 | 153 | } |
AzureIoTClient | 41:71c01aa3df1a | 154 | } |
AzureIoTClient | 36:f78f9a56869e | 155 | |
AzureIoTClient | 41:71c01aa3df1a | 156 | return devices_path; |
AzureIoTClient | 36:f78f9a56869e | 157 | } |
AzureIoTClient | 36:f78f9a56869e | 158 | |
AzureIoTClient | 36:f78f9a56869e | 159 | static STRING_HANDLE create_event_send_address(STRING_HANDLE devices_path) |
AzureIoTClient | 36:f78f9a56869e | 160 | { |
AzureIoTClient | 41:71c01aa3df1a | 161 | STRING_HANDLE event_send_address; |
AzureIoTClient | 36:f78f9a56869e | 162 | |
AzureIoTClient | 41:71c01aa3df1a | 163 | if ((event_send_address = STRING_new()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 164 | { |
AzureIoTClient | 41:71c01aa3df1a | 165 | LogError("Failed creating the event_send_address (STRING_new failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 166 | } |
AzureIoTClient | 41:71c01aa3df1a | 167 | else |
AzureIoTClient | 41:71c01aa3df1a | 168 | { |
AzureIoTClient | 41:71c01aa3df1a | 169 | const char* devices_path_char_ptr = STRING_c_str(devices_path); |
AzureIoTClient | 41:71c01aa3df1a | 170 | if (STRING_sprintf(event_send_address, IOTHUB_EVENT_SEND_ADDRESS_FMT, devices_path_char_ptr) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 171 | { |
AzureIoTClient | 41:71c01aa3df1a | 172 | STRING_delete(event_send_address); |
AzureIoTClient | 41:71c01aa3df1a | 173 | event_send_address = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 174 | LogError("Failed creating the event_send_address (STRING_sprintf failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 175 | } |
AzureIoTClient | 41:71c01aa3df1a | 176 | } |
AzureIoTClient | 36:f78f9a56869e | 177 | |
AzureIoTClient | 41:71c01aa3df1a | 178 | return event_send_address; |
AzureIoTClient | 36:f78f9a56869e | 179 | } |
AzureIoTClient | 36:f78f9a56869e | 180 | |
AzureIoTClient | 36:f78f9a56869e | 181 | static STRING_HANDLE create_event_sender_source_name(STRING_HANDLE link_name) |
AzureIoTClient | 36:f78f9a56869e | 182 | { |
AzureIoTClient | 41:71c01aa3df1a | 183 | STRING_HANDLE source_name; |
AzureIoTClient | 41:71c01aa3df1a | 184 | |
AzureIoTClient | 41:71c01aa3df1a | 185 | if ((source_name = STRING_new()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 186 | { |
AzureIoTClient | 41:71c01aa3df1a | 187 | LogError("Failed creating the source_name (STRING_new failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 188 | } |
AzureIoTClient | 41:71c01aa3df1a | 189 | else |
AzureIoTClient | 41:71c01aa3df1a | 190 | { |
AzureIoTClient | 41:71c01aa3df1a | 191 | const char* link_name_char_ptr = STRING_c_str(link_name); |
AzureIoTClient | 41:71c01aa3df1a | 192 | if (STRING_sprintf(source_name, "%s-source", link_name_char_ptr) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 193 | { |
AzureIoTClient | 41:71c01aa3df1a | 194 | STRING_delete(source_name); |
AzureIoTClient | 41:71c01aa3df1a | 195 | source_name = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 196 | LogError("Failed creating the source_name (STRING_sprintf failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 197 | } |
AzureIoTClient | 41:71c01aa3df1a | 198 | } |
AzureIoTClient | 36:f78f9a56869e | 199 | |
AzureIoTClient | 41:71c01aa3df1a | 200 | return source_name; |
AzureIoTClient | 36:f78f9a56869e | 201 | } |
AzureIoTClient | 36:f78f9a56869e | 202 | |
AzureIoTClient | 36:f78f9a56869e | 203 | static STRING_HANDLE create_message_receive_address(STRING_HANDLE devices_path) |
AzureIoTClient | 36:f78f9a56869e | 204 | { |
AzureIoTClient | 41:71c01aa3df1a | 205 | STRING_HANDLE message_receive_address; |
AzureIoTClient | 36:f78f9a56869e | 206 | |
AzureIoTClient | 41:71c01aa3df1a | 207 | if ((message_receive_address = STRING_new()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 208 | { |
AzureIoTClient | 41:71c01aa3df1a | 209 | LogError("Failed creating the message_receive_address (STRING_new failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 210 | } |
AzureIoTClient | 41:71c01aa3df1a | 211 | else |
AzureIoTClient | 41:71c01aa3df1a | 212 | { |
AzureIoTClient | 41:71c01aa3df1a | 213 | const char* devices_path_char_ptr = STRING_c_str(devices_path); |
AzureIoTClient | 41:71c01aa3df1a | 214 | if (STRING_sprintf(message_receive_address, IOTHUB_MESSAGE_RECEIVE_ADDRESS_FMT, devices_path_char_ptr) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 215 | { |
AzureIoTClient | 41:71c01aa3df1a | 216 | STRING_delete(message_receive_address); |
AzureIoTClient | 41:71c01aa3df1a | 217 | message_receive_address = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 218 | LogError("Failed creating the message_receive_address (STRING_sprintf failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 219 | } |
AzureIoTClient | 41:71c01aa3df1a | 220 | } |
AzureIoTClient | 36:f78f9a56869e | 221 | |
AzureIoTClient | 41:71c01aa3df1a | 222 | return message_receive_address; |
AzureIoTClient | 36:f78f9a56869e | 223 | } |
AzureIoTClient | 36:f78f9a56869e | 224 | |
AzureIoTClient | 36:f78f9a56869e | 225 | static STRING_HANDLE create_message_receiver_target_name(STRING_HANDLE link_name) |
AzureIoTClient | 36:f78f9a56869e | 226 | { |
AzureIoTClient | 41:71c01aa3df1a | 227 | STRING_HANDLE target_name; |
AzureIoTClient | 36:f78f9a56869e | 228 | |
AzureIoTClient | 41:71c01aa3df1a | 229 | if ((target_name = STRING_new()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 230 | { |
AzureIoTClient | 41:71c01aa3df1a | 231 | LogError("Failed creating the target_name (STRING_new failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 232 | } |
AzureIoTClient | 41:71c01aa3df1a | 233 | else |
AzureIoTClient | 41:71c01aa3df1a | 234 | { |
AzureIoTClient | 41:71c01aa3df1a | 235 | const char* link_name_char_ptr = STRING_c_str(link_name); |
AzureIoTClient | 41:71c01aa3df1a | 236 | if (STRING_sprintf(target_name, "%s-target", link_name_char_ptr) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 237 | { |
AzureIoTClient | 41:71c01aa3df1a | 238 | STRING_delete(target_name); |
AzureIoTClient | 41:71c01aa3df1a | 239 | target_name = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 240 | LogError("Failed creating the target_name (STRING_sprintf failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 241 | } |
AzureIoTClient | 41:71c01aa3df1a | 242 | } |
AzureIoTClient | 36:f78f9a56869e | 243 | |
AzureIoTClient | 41:71c01aa3df1a | 244 | return target_name; |
AzureIoTClient | 36:f78f9a56869e | 245 | } |
AzureIoTClient | 36:f78f9a56869e | 246 | |
AzureIoTClient | 36:f78f9a56869e | 247 | static STRING_HANDLE create_link_name(const char* prefix, const char* infix) |
AzureIoTClient | 36:f78f9a56869e | 248 | { |
AzureIoTClient | 41:71c01aa3df1a | 249 | char* unique_id; |
AzureIoTClient | 41:71c01aa3df1a | 250 | STRING_HANDLE tag = NULL; |
AzureIoTClient | 36:f78f9a56869e | 251 | |
AzureIoTClient | 41:71c01aa3df1a | 252 | if ((unique_id = (char*)malloc(sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 253 | { |
AzureIoTClient | 41:71c01aa3df1a | 254 | LogError("Failed generating an unique tag (malloc failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 255 | } |
AzureIoTClient | 41:71c01aa3df1a | 256 | else |
AzureIoTClient | 41:71c01aa3df1a | 257 | { |
AzureIoTClient | 36:f78f9a56869e | 258 | memset(unique_id, 0, sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1); |
AzureIoTClient | 36:f78f9a56869e | 259 | |
AzureIoTClient | 41:71c01aa3df1a | 260 | if (UniqueId_Generate(unique_id, UNIQUE_ID_BUFFER_SIZE) != UNIQUEID_OK) |
AzureIoTClient | 41:71c01aa3df1a | 261 | { |
AzureIoTClient | 41:71c01aa3df1a | 262 | LogError("Failed generating an unique tag (UniqueId_Generate failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 263 | } |
AzureIoTClient | 41:71c01aa3df1a | 264 | else if ((tag = STRING_new()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 265 | { |
AzureIoTClient | 41:71c01aa3df1a | 266 | LogError("Failed generating an unique tag (STRING_new failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 267 | } |
AzureIoTClient | 41:71c01aa3df1a | 268 | else if (STRING_sprintf(tag, "%s-%s-%s", prefix, infix, unique_id) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 269 | { |
AzureIoTClient | 41:71c01aa3df1a | 270 | STRING_delete(tag); |
AzureIoTClient | 41:71c01aa3df1a | 271 | tag = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 272 | LogError("Failed generating an unique tag (STRING_sprintf failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 273 | } |
AzureIoTClient | 36:f78f9a56869e | 274 | |
AzureIoTClient | 41:71c01aa3df1a | 275 | free(unique_id); |
AzureIoTClient | 41:71c01aa3df1a | 276 | } |
AzureIoTClient | 36:f78f9a56869e | 277 | |
AzureIoTClient | 41:71c01aa3df1a | 278 | return tag; |
AzureIoTClient | 36:f78f9a56869e | 279 | } |
AzureIoTClient | 36:f78f9a56869e | 280 | |
AzureIoTClient | 36:f78f9a56869e | 281 | static void update_messenger_state(TELEMETRY_MESSENGER_INSTANCE* instance, TELEMETRY_MESSENGER_STATE new_state) |
AzureIoTClient | 36:f78f9a56869e | 282 | { |
AzureIoTClient | 41:71c01aa3df1a | 283 | if (new_state != instance->state) |
AzureIoTClient | 41:71c01aa3df1a | 284 | { |
AzureIoTClient | 41:71c01aa3df1a | 285 | TELEMETRY_MESSENGER_STATE previous_state = instance->state; |
AzureIoTClient | 41:71c01aa3df1a | 286 | instance->state = new_state; |
AzureIoTClient | 36:f78f9a56869e | 287 | |
AzureIoTClient | 41:71c01aa3df1a | 288 | if (instance->on_state_changed_callback != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 289 | { |
AzureIoTClient | 41:71c01aa3df1a | 290 | instance->on_state_changed_callback(instance->on_state_changed_context, previous_state, new_state); |
AzureIoTClient | 41:71c01aa3df1a | 291 | } |
AzureIoTClient | 41:71c01aa3df1a | 292 | } |
AzureIoTClient | 36:f78f9a56869e | 293 | } |
AzureIoTClient | 36:f78f9a56869e | 294 | |
AzureIoTClient | 36:f78f9a56869e | 295 | static void attach_device_client_type_to_link(LINK_HANDLE link, STRING_HANDLE product_info) |
AzureIoTClient | 36:f78f9a56869e | 296 | { |
AzureIoTClient | 41:71c01aa3df1a | 297 | fields attach_properties; |
AzureIoTClient | 41:71c01aa3df1a | 298 | AMQP_VALUE device_client_type_key_name; |
AzureIoTClient | 41:71c01aa3df1a | 299 | AMQP_VALUE device_client_type_value; |
AzureIoTClient | 41:71c01aa3df1a | 300 | int result; |
AzureIoTClient | 36:f78f9a56869e | 301 | |
AzureIoTClient | 41:71c01aa3df1a | 302 | if ((attach_properties = amqpvalue_create_map()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 303 | { |
AzureIoTClient | 41:71c01aa3df1a | 304 | LogError("Failed to create the map for device client type."); |
AzureIoTClient | 41:71c01aa3df1a | 305 | } |
AzureIoTClient | 41:71c01aa3df1a | 306 | else |
AzureIoTClient | 41:71c01aa3df1a | 307 | { |
AzureIoTClient | 41:71c01aa3df1a | 308 | if ((device_client_type_key_name = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 309 | { |
AzureIoTClient | 41:71c01aa3df1a | 310 | LogError("Failed to create the key name for the device client type."); |
AzureIoTClient | 41:71c01aa3df1a | 311 | } |
AzureIoTClient | 41:71c01aa3df1a | 312 | else |
AzureIoTClient | 41:71c01aa3df1a | 313 | { |
AzureIoTClient | 41:71c01aa3df1a | 314 | if ((device_client_type_value = amqpvalue_create_string(STRING_c_str(product_info))) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 315 | { |
AzureIoTClient | 41:71c01aa3df1a | 316 | LogError("Failed to create the key value for the device client type."); |
AzureIoTClient | 41:71c01aa3df1a | 317 | } |
AzureIoTClient | 41:71c01aa3df1a | 318 | else |
AzureIoTClient | 41:71c01aa3df1a | 319 | { |
AzureIoTClient | 41:71c01aa3df1a | 320 | if ((result = amqpvalue_set_map_value(attach_properties, device_client_type_key_name, device_client_type_value)) != 0) |
AzureIoTClient | 41:71c01aa3df1a | 321 | { |
AzureIoTClient | 41:71c01aa3df1a | 322 | LogError("Failed to set the property map for the device client type (error code is: %d)", result); |
AzureIoTClient | 41:71c01aa3df1a | 323 | } |
AzureIoTClient | 41:71c01aa3df1a | 324 | else if ((result = link_set_attach_properties(link, attach_properties)) != 0) |
AzureIoTClient | 41:71c01aa3df1a | 325 | { |
AzureIoTClient | 41:71c01aa3df1a | 326 | LogError("Unable to attach the device client type to the link properties (error code is: %d)", result); |
AzureIoTClient | 41:71c01aa3df1a | 327 | } |
AzureIoTClient | 36:f78f9a56869e | 328 | |
AzureIoTClient | 41:71c01aa3df1a | 329 | amqpvalue_destroy(device_client_type_value); |
AzureIoTClient | 41:71c01aa3df1a | 330 | } |
AzureIoTClient | 36:f78f9a56869e | 331 | |
AzureIoTClient | 41:71c01aa3df1a | 332 | amqpvalue_destroy(device_client_type_key_name); |
AzureIoTClient | 41:71c01aa3df1a | 333 | } |
AzureIoTClient | 36:f78f9a56869e | 334 | |
AzureIoTClient | 41:71c01aa3df1a | 335 | amqpvalue_destroy(attach_properties); |
AzureIoTClient | 41:71c01aa3df1a | 336 | } |
AzureIoTClient | 36:f78f9a56869e | 337 | } |
AzureIoTClient | 36:f78f9a56869e | 338 | |
AzureIoTClient | 36:f78f9a56869e | 339 | static void destroy_event_sender(TELEMETRY_MESSENGER_INSTANCE* instance) |
AzureIoTClient | 36:f78f9a56869e | 340 | { |
AzureIoTClient | 41:71c01aa3df1a | 341 | if (instance->message_sender != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 342 | { |
AzureIoTClient | 41:71c01aa3df1a | 343 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_060: [`instance->message_sender` shall be destroyed using messagesender_destroy()] |
AzureIoTClient | 41:71c01aa3df1a | 344 | messagesender_destroy(instance->message_sender); |
AzureIoTClient | 41:71c01aa3df1a | 345 | instance->message_sender = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 346 | } |
AzureIoTClient | 36:f78f9a56869e | 347 | |
AzureIoTClient | 45:c09fac270e60 | 348 | instance->message_sender_current_state = MESSAGE_SENDER_STATE_IDLE; |
AzureIoTClient | 45:c09fac270e60 | 349 | instance->message_sender_previous_state = MESSAGE_SENDER_STATE_IDLE; |
AzureIoTClient | 45:c09fac270e60 | 350 | instance->last_message_sender_state_change_time = INDEFINITE_TIME; |
AzureIoTClient | 45:c09fac270e60 | 351 | |
AzureIoTClient | 41:71c01aa3df1a | 352 | if (instance->sender_link != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 353 | { |
AzureIoTClient | 41:71c01aa3df1a | 354 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_063: [`instance->sender_link` shall be destroyed using link_destroy()] |
AzureIoTClient | 41:71c01aa3df1a | 355 | link_destroy(instance->sender_link); |
AzureIoTClient | 41:71c01aa3df1a | 356 | instance->sender_link = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 357 | } |
AzureIoTClient | 36:f78f9a56869e | 358 | } |
AzureIoTClient | 36:f78f9a56869e | 359 | |
AzureIoTClient | 36:f78f9a56869e | 360 | static void on_event_sender_state_changed_callback(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state) |
AzureIoTClient | 36:f78f9a56869e | 361 | { |
AzureIoTClient | 41:71c01aa3df1a | 362 | if (context == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 363 | { |
AzureIoTClient | 41:71c01aa3df1a | 364 | LogError("on_event_sender_state_changed_callback was invoked with a NULL context; although unexpected, this failure will be ignored"); |
AzureIoTClient | 41:71c01aa3df1a | 365 | } |
AzureIoTClient | 41:71c01aa3df1a | 366 | else |
AzureIoTClient | 41:71c01aa3df1a | 367 | { |
AzureIoTClient | 41:71c01aa3df1a | 368 | if (new_state != previous_state) |
AzureIoTClient | 41:71c01aa3df1a | 369 | { |
AzureIoTClient | 41:71c01aa3df1a | 370 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)context; |
AzureIoTClient | 41:71c01aa3df1a | 371 | instance->message_sender_current_state = new_state; |
AzureIoTClient | 41:71c01aa3df1a | 372 | instance->message_sender_previous_state = previous_state; |
AzureIoTClient | 41:71c01aa3df1a | 373 | instance->last_message_sender_state_change_time = get_time(NULL); |
AzureIoTClient | 41:71c01aa3df1a | 374 | } |
AzureIoTClient | 41:71c01aa3df1a | 375 | } |
AzureIoTClient | 36:f78f9a56869e | 376 | } |
AzureIoTClient | 36:f78f9a56869e | 377 | |
AzureIoTClient | 36:f78f9a56869e | 378 | static int create_event_sender(TELEMETRY_MESSENGER_INSTANCE* instance) |
AzureIoTClient | 36:f78f9a56869e | 379 | { |
AzureIoTClient | 41:71c01aa3df1a | 380 | int result; |
AzureIoTClient | 36:f78f9a56869e | 381 | |
AzureIoTClient | 41:71c01aa3df1a | 382 | STRING_HANDLE link_name = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 383 | STRING_HANDLE source_name = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 384 | AMQP_VALUE source = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 385 | AMQP_VALUE target = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 386 | STRING_HANDLE devices_path = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 387 | STRING_HANDLE event_send_address = NULL; |
AzureIoTClient | 36:f78f9a56869e | 388 | |
AzureIoTClient | 41:71c01aa3df1a | 389 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_033: [A variable, named `devices_path`, shall be created concatenating `instance->iothub_host_fqdn`, "/devices/" and `instance->device_id`] |
AzureIoTClient | 41:71c01aa3df1a | 390 | if ((devices_path = create_devices_path(instance->iothub_host_fqdn, instance->device_id)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 391 | { |
AzureIoTClient | 41:71c01aa3df1a | 392 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_034: [If `devices_path` fails to be created, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 393 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 394 | LogError("Failed creating the message sender (failed creating the 'devices_path')"); |
AzureIoTClient | 41:71c01aa3df1a | 395 | } |
AzureIoTClient | 41:71c01aa3df1a | 396 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_035: [A variable, named `event_send_address`, shall be created concatenating "amqps://", `devices_path` and "/messages/events"] |
AzureIoTClient | 41:71c01aa3df1a | 397 | else if ((event_send_address = create_event_send_address(devices_path)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 398 | { |
AzureIoTClient | 41:71c01aa3df1a | 399 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_036: [If `event_send_address` fails to be created, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 400 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 401 | LogError("Failed creating the message sender (failed creating the 'event_send_address')"); |
AzureIoTClient | 41:71c01aa3df1a | 402 | } |
AzureIoTClient | 41:71c01aa3df1a | 403 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_037: [A `link_name` variable shall be created using an unique string label per AMQP session] |
AzureIoTClient | 41:71c01aa3df1a | 404 | else if ((link_name = create_link_name(MESSAGE_SENDER_LINK_NAME_PREFIX, STRING_c_str(instance->device_id))) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 405 | { |
AzureIoTClient | 41:71c01aa3df1a | 406 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_038: [If `link_name` fails to be created, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 407 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 408 | LogError("Failed creating the message sender (failed creating an unique link name)"); |
AzureIoTClient | 41:71c01aa3df1a | 409 | } |
AzureIoTClient | 41:71c01aa3df1a | 410 | else if ((source_name = create_event_sender_source_name(link_name)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 411 | { |
AzureIoTClient | 41:71c01aa3df1a | 412 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 413 | LogError("Failed creating the message sender (failed creating an unique source name)"); |
AzureIoTClient | 41:71c01aa3df1a | 414 | } |
AzureIoTClient | 41:71c01aa3df1a | 415 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_039: [A `source` variable shall be created with messaging_create_source() using an unique string label per AMQP session] |
AzureIoTClient | 41:71c01aa3df1a | 416 | else if ((source = messaging_create_source(STRING_c_str(source_name))) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 417 | { |
AzureIoTClient | 41:71c01aa3df1a | 418 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_040: [If `source` fails to be created, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 419 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 420 | LogError("Failed creating the message sender (messaging_create_source failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 421 | } |
AzureIoTClient | 41:71c01aa3df1a | 422 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_041: [A `target` variable shall be created with messaging_create_target() using `event_send_address`] |
AzureIoTClient | 41:71c01aa3df1a | 423 | else if ((target = messaging_create_target(STRING_c_str(event_send_address))) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 424 | { |
AzureIoTClient | 41:71c01aa3df1a | 425 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_042: [If `target` fails to be created, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 426 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 427 | LogError("Failed creating the message sender (messaging_create_target failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 428 | } |
AzureIoTClient | 41:71c01aa3df1a | 429 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_043: [`instance->sender_link` shall be set using link_create(), passing `instance->session_handle`, `link_name`, "role_sender", `source` and `target` as parameters] |
AzureIoTClient | 41:71c01aa3df1a | 430 | else if ((instance->sender_link = link_create(instance->session_handle, STRING_c_str(link_name), role_sender, source, target)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 431 | { |
AzureIoTClient | 41:71c01aa3df1a | 432 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_044: [If link_create() fails, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 433 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 434 | LogError("Failed creating the message sender (link_create failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 435 | } |
AzureIoTClient | 41:71c01aa3df1a | 436 | else |
AzureIoTClient | 41:71c01aa3df1a | 437 | { |
AzureIoTClient | 41:71c01aa3df1a | 438 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_047: [`instance->sender_link` maximum message size shall be set to UINT64_MAX using link_set_max_message_size()] |
AzureIoTClient | 41:71c01aa3df1a | 439 | if (link_set_max_message_size(instance->sender_link, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 440 | { |
AzureIoTClient | 41:71c01aa3df1a | 441 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_048: [If link_set_max_message_size() fails, it shall be logged and ignored.] |
AzureIoTClient | 41:71c01aa3df1a | 442 | LogError("Failed setting message sender link max message size."); |
AzureIoTClient | 41:71c01aa3df1a | 443 | } |
AzureIoTClient | 36:f78f9a56869e | 444 | |
AzureIoTClient | 41:71c01aa3df1a | 445 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_049: [`instance->sender_link` should have a property "com.microsoft:client-version" set as `CLIENT_DEVICE_TYPE_PREFIX/IOTHUB_SDK_VERSION`, using amqpvalue_set_map_value() and link_set_attach_properties()] |
AzureIoTClient | 41:71c01aa3df1a | 446 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_050: [If amqpvalue_set_map_value() or link_set_attach_properties() fail, the failure shall be ignored] |
AzureIoTClient | 41:71c01aa3df1a | 447 | attach_device_client_type_to_link(instance->sender_link, instance->product_info); |
AzureIoTClient | 36:f78f9a56869e | 448 | |
AzureIoTClient | 41:71c01aa3df1a | 449 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_051: [`instance->message_sender` shall be created using messagesender_create(), passing the `instance->sender_link` and `on_event_sender_state_changed_callback`] |
AzureIoTClient | 41:71c01aa3df1a | 450 | if ((instance->message_sender = messagesender_create(instance->sender_link, on_event_sender_state_changed_callback, (void*)instance)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 451 | { |
AzureIoTClient | 41:71c01aa3df1a | 452 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_052: [If messagesender_create() fails, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 45:c09fac270e60 | 453 | LogError("Failed creating the message sender (messagesender_create failed)"); |
AzureIoTClient | 45:c09fac270e60 | 454 | destroy_event_sender(instance); |
AzureIoTClient | 41:71c01aa3df1a | 455 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 456 | } |
AzureIoTClient | 41:71c01aa3df1a | 457 | else |
AzureIoTClient | 41:71c01aa3df1a | 458 | { |
AzureIoTClient | 41:71c01aa3df1a | 459 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_053: [`instance->message_sender` shall be opened using messagesender_open()] |
AzureIoTClient | 41:71c01aa3df1a | 460 | if (messagesender_open(instance->message_sender) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 461 | { |
AzureIoTClient | 41:71c01aa3df1a | 462 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_054: [If messagesender_open() fails, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 45:c09fac270e60 | 463 | LogError("Failed opening the AMQP message sender."); |
AzureIoTClient | 45:c09fac270e60 | 464 | destroy_event_sender(instance); |
AzureIoTClient | 41:71c01aa3df1a | 465 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 466 | } |
AzureIoTClient | 41:71c01aa3df1a | 467 | else |
AzureIoTClient | 41:71c01aa3df1a | 468 | { |
AzureIoTClient | 41:71c01aa3df1a | 469 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 470 | } |
AzureIoTClient | 41:71c01aa3df1a | 471 | } |
AzureIoTClient | 41:71c01aa3df1a | 472 | } |
AzureIoTClient | 36:f78f9a56869e | 473 | |
AzureIoTClient | 41:71c01aa3df1a | 474 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_055: [Before returning, telemetry_messenger_do_work() shall release all the temporary memory it has allocated] |
AzureIoTClient | 41:71c01aa3df1a | 475 | if (link_name != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 476 | STRING_delete(link_name); |
AzureIoTClient | 41:71c01aa3df1a | 477 | if (source_name != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 478 | STRING_delete(source_name); |
AzureIoTClient | 41:71c01aa3df1a | 479 | if (source != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 480 | amqpvalue_destroy(source); |
AzureIoTClient | 41:71c01aa3df1a | 481 | if (target != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 482 | amqpvalue_destroy(target); |
AzureIoTClient | 41:71c01aa3df1a | 483 | if (devices_path != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 484 | STRING_delete(devices_path); |
AzureIoTClient | 41:71c01aa3df1a | 485 | if (event_send_address != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 486 | STRING_delete(event_send_address); |
AzureIoTClient | 36:f78f9a56869e | 487 | |
AzureIoTClient | 41:71c01aa3df1a | 488 | return result; |
AzureIoTClient | 36:f78f9a56869e | 489 | } |
AzureIoTClient | 36:f78f9a56869e | 490 | |
AzureIoTClient | 36:f78f9a56869e | 491 | static void destroy_message_receiver(TELEMETRY_MESSENGER_INSTANCE* instance) |
AzureIoTClient | 36:f78f9a56869e | 492 | { |
AzureIoTClient | 41:71c01aa3df1a | 493 | if (instance->message_receiver != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 494 | { |
AzureIoTClient | 41:71c01aa3df1a | 495 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_061: [`instance->message_receiver` shall be closed using messagereceiver_close()] |
AzureIoTClient | 41:71c01aa3df1a | 496 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_093: [`instance->message_receiver` shall be closed using messagereceiver_close()] |
AzureIoTClient | 41:71c01aa3df1a | 497 | if (messagereceiver_close(instance->message_receiver) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 498 | { |
AzureIoTClient | 41:71c01aa3df1a | 499 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_094: [If messagereceiver_close() fails, it shall be logged and ignored] |
AzureIoTClient | 41:71c01aa3df1a | 500 | LogError("Failed closing the AMQP message receiver (this failure will be ignored)."); |
AzureIoTClient | 41:71c01aa3df1a | 501 | } |
AzureIoTClient | 36:f78f9a56869e | 502 | |
AzureIoTClient | 41:71c01aa3df1a | 503 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_062: [`instance->message_receiver` shall be destroyed using messagereceiver_destroy()] |
AzureIoTClient | 41:71c01aa3df1a | 504 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_095: [`instance->message_receiver` shall be destroyed using messagereceiver_destroy()] |
AzureIoTClient | 41:71c01aa3df1a | 505 | messagereceiver_destroy(instance->message_receiver); |
AzureIoTClient | 36:f78f9a56869e | 506 | |
AzureIoTClient | 41:71c01aa3df1a | 507 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_096: [`instance->message_receiver` shall be set to NULL] |
AzureIoTClient | 41:71c01aa3df1a | 508 | instance->message_receiver = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 509 | } |
AzureIoTClient | 36:f78f9a56869e | 510 | |
AzureIoTClient | 45:c09fac270e60 | 511 | instance->message_receiver_current_state = MESSAGE_RECEIVER_STATE_IDLE; |
AzureIoTClient | 45:c09fac270e60 | 512 | instance->message_receiver_previous_state = MESSAGE_RECEIVER_STATE_IDLE; |
AzureIoTClient | 45:c09fac270e60 | 513 | instance->last_message_receiver_state_change_time = INDEFINITE_TIME; |
AzureIoTClient | 45:c09fac270e60 | 514 | |
AzureIoTClient | 41:71c01aa3df1a | 515 | if (instance->receiver_link != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 516 | { |
AzureIoTClient | 41:71c01aa3df1a | 517 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_064: [`instance->receiver_link` shall be destroyed using link_destroy()] |
AzureIoTClient | 41:71c01aa3df1a | 518 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_097: [`instance->receiver_link` shall be destroyed using link_destroy()] |
AzureIoTClient | 41:71c01aa3df1a | 519 | link_destroy(instance->receiver_link); |
AzureIoTClient | 41:71c01aa3df1a | 520 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_098: [`instance->receiver_link` shall be set to NULL] |
AzureIoTClient | 41:71c01aa3df1a | 521 | instance->receiver_link = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 522 | } |
AzureIoTClient | 36:f78f9a56869e | 523 | } |
AzureIoTClient | 36:f78f9a56869e | 524 | |
AzureIoTClient | 36:f78f9a56869e | 525 | static void on_message_receiver_state_changed_callback(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state) |
AzureIoTClient | 36:f78f9a56869e | 526 | { |
AzureIoTClient | 41:71c01aa3df1a | 527 | if (context == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 528 | { |
AzureIoTClient | 41:71c01aa3df1a | 529 | LogError("on_message_receiver_state_changed_callback was invoked with a NULL context; although unexpected, this failure will be ignored"); |
AzureIoTClient | 41:71c01aa3df1a | 530 | } |
AzureIoTClient | 41:71c01aa3df1a | 531 | else |
AzureIoTClient | 41:71c01aa3df1a | 532 | { |
AzureIoTClient | 41:71c01aa3df1a | 533 | if (new_state != previous_state) |
AzureIoTClient | 41:71c01aa3df1a | 534 | { |
AzureIoTClient | 41:71c01aa3df1a | 535 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)context; |
AzureIoTClient | 41:71c01aa3df1a | 536 | instance->message_receiver_current_state = new_state; |
AzureIoTClient | 41:71c01aa3df1a | 537 | instance->message_receiver_previous_state = previous_state; |
AzureIoTClient | 41:71c01aa3df1a | 538 | instance->last_message_receiver_state_change_time = get_time(NULL); |
AzureIoTClient | 41:71c01aa3df1a | 539 | } |
AzureIoTClient | 41:71c01aa3df1a | 540 | } |
AzureIoTClient | 36:f78f9a56869e | 541 | } |
AzureIoTClient | 36:f78f9a56869e | 542 | |
AzureIoTClient | 36:f78f9a56869e | 543 | static TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* create_message_disposition_info(TELEMETRY_MESSENGER_INSTANCE* messenger) |
AzureIoTClient | 36:f78f9a56869e | 544 | { |
AzureIoTClient | 41:71c01aa3df1a | 545 | TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* result; |
AzureIoTClient | 36:f78f9a56869e | 546 | |
AzureIoTClient | 41:71c01aa3df1a | 547 | if ((result = (TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO*)malloc(sizeof(TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO))) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 548 | { |
AzureIoTClient | 41:71c01aa3df1a | 549 | LogError("Failed creating TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO container (malloc failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 550 | result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 551 | } |
AzureIoTClient | 41:71c01aa3df1a | 552 | else |
AzureIoTClient | 41:71c01aa3df1a | 553 | { |
AzureIoTClient | 41:71c01aa3df1a | 554 | delivery_number message_id; |
AzureIoTClient | 36:f78f9a56869e | 555 | |
AzureIoTClient | 41:71c01aa3df1a | 556 | if (messagereceiver_get_received_message_id(messenger->message_receiver, &message_id) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 557 | { |
AzureIoTClient | 41:71c01aa3df1a | 558 | LogError("Failed creating TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO container (messagereceiver_get_received_message_id failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 559 | free(result); |
AzureIoTClient | 41:71c01aa3df1a | 560 | result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 561 | } |
AzureIoTClient | 41:71c01aa3df1a | 562 | else |
AzureIoTClient | 41:71c01aa3df1a | 563 | { |
AzureIoTClient | 41:71c01aa3df1a | 564 | const char* link_name; |
AzureIoTClient | 36:f78f9a56869e | 565 | |
AzureIoTClient | 41:71c01aa3df1a | 566 | if (messagereceiver_get_link_name(messenger->message_receiver, &link_name) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 567 | { |
AzureIoTClient | 41:71c01aa3df1a | 568 | LogError("Failed creating TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO container (messagereceiver_get_link_name failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 569 | free(result); |
AzureIoTClient | 41:71c01aa3df1a | 570 | result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 571 | } |
AzureIoTClient | 41:71c01aa3df1a | 572 | else if (mallocAndStrcpy_s(&result->source, link_name) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 573 | { |
AzureIoTClient | 41:71c01aa3df1a | 574 | LogError("Failed creating TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO container (failed copying link name)"); |
AzureIoTClient | 41:71c01aa3df1a | 575 | free(result); |
AzureIoTClient | 41:71c01aa3df1a | 576 | result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 577 | } |
AzureIoTClient | 41:71c01aa3df1a | 578 | else |
AzureIoTClient | 41:71c01aa3df1a | 579 | { |
AzureIoTClient | 41:71c01aa3df1a | 580 | result->message_id = message_id; |
AzureIoTClient | 41:71c01aa3df1a | 581 | } |
AzureIoTClient | 41:71c01aa3df1a | 582 | } |
AzureIoTClient | 41:71c01aa3df1a | 583 | } |
AzureIoTClient | 36:f78f9a56869e | 584 | |
AzureIoTClient | 41:71c01aa3df1a | 585 | return result; |
AzureIoTClient | 36:f78f9a56869e | 586 | } |
AzureIoTClient | 36:f78f9a56869e | 587 | |
AzureIoTClient | 36:f78f9a56869e | 588 | static void destroy_message_disposition_info(TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info) |
AzureIoTClient | 36:f78f9a56869e | 589 | { |
AzureIoTClient | 41:71c01aa3df1a | 590 | free(disposition_info->source); |
AzureIoTClient | 41:71c01aa3df1a | 591 | free(disposition_info); |
AzureIoTClient | 36:f78f9a56869e | 592 | } |
AzureIoTClient | 36:f78f9a56869e | 593 | |
AzureIoTClient | 36:f78f9a56869e | 594 | static AMQP_VALUE create_uamqp_disposition_result_from(TELEMETRY_MESSENGER_DISPOSITION_RESULT disposition_result) |
AzureIoTClient | 36:f78f9a56869e | 595 | { |
AzureIoTClient | 41:71c01aa3df1a | 596 | AMQP_VALUE uamqp_disposition_result; |
AzureIoTClient | 36:f78f9a56869e | 597 | |
AzureIoTClient | 41:71c01aa3df1a | 598 | if (disposition_result == TELEMETRY_MESSENGER_DISPOSITION_RESULT_NONE) |
AzureIoTClient | 41:71c01aa3df1a | 599 | { |
AzureIoTClient | 41:71c01aa3df1a | 600 | uamqp_disposition_result = NULL; // intentionally not sending an answer. |
AzureIoTClient | 41:71c01aa3df1a | 601 | } |
AzureIoTClient | 41:71c01aa3df1a | 602 | else if (disposition_result == TELEMETRY_MESSENGER_DISPOSITION_RESULT_ACCEPTED) |
AzureIoTClient | 41:71c01aa3df1a | 603 | { |
AzureIoTClient | 41:71c01aa3df1a | 604 | uamqp_disposition_result = messaging_delivery_accepted(); |
AzureIoTClient | 41:71c01aa3df1a | 605 | } |
AzureIoTClient | 41:71c01aa3df1a | 606 | else if (disposition_result == TELEMETRY_MESSENGER_DISPOSITION_RESULT_RELEASED) |
AzureIoTClient | 41:71c01aa3df1a | 607 | { |
AzureIoTClient | 41:71c01aa3df1a | 608 | uamqp_disposition_result = messaging_delivery_released(); |
AzureIoTClient | 41:71c01aa3df1a | 609 | } |
AzureIoTClient | 41:71c01aa3df1a | 610 | else if (disposition_result == TELEMETRY_MESSENGER_DISPOSITION_RESULT_REJECTED) |
AzureIoTClient | 41:71c01aa3df1a | 611 | { |
AzureIoTClient | 41:71c01aa3df1a | 612 | uamqp_disposition_result = messaging_delivery_rejected("Rejected by application", "Rejected by application"); |
AzureIoTClient | 41:71c01aa3df1a | 613 | } |
AzureIoTClient | 41:71c01aa3df1a | 614 | else |
AzureIoTClient | 41:71c01aa3df1a | 615 | { |
AzureIoTClient | 41:71c01aa3df1a | 616 | LogError("Failed creating a disposition result for messagereceiver (result %d is not supported)", disposition_result); |
AzureIoTClient | 41:71c01aa3df1a | 617 | uamqp_disposition_result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 618 | } |
AzureIoTClient | 36:f78f9a56869e | 619 | |
AzureIoTClient | 41:71c01aa3df1a | 620 | return uamqp_disposition_result; |
AzureIoTClient | 36:f78f9a56869e | 621 | } |
AzureIoTClient | 36:f78f9a56869e | 622 | |
AzureIoTClient | 36:f78f9a56869e | 623 | static AMQP_VALUE on_message_received_internal_callback(const void* context, MESSAGE_HANDLE message) |
AzureIoTClient | 36:f78f9a56869e | 624 | { |
AzureIoTClient | 41:71c01aa3df1a | 625 | AMQP_VALUE result; |
AzureIoTClient | 41:71c01aa3df1a | 626 | int api_call_result; |
AzureIoTClient | 41:71c01aa3df1a | 627 | IOTHUB_MESSAGE_HANDLE iothub_message; |
AzureIoTClient | 36:f78f9a56869e | 628 | |
AzureIoTClient | 41:71c01aa3df1a | 629 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_121: [An IOTHUB_MESSAGE_HANDLE shall be obtained from MESSAGE_HANDLE using message_create_IoTHubMessage_from_uamqp_message()] |
AzureIoTClient | 41:71c01aa3df1a | 630 | if ((api_call_result = message_create_IoTHubMessage_from_uamqp_message(message, &iothub_message)) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 631 | { |
AzureIoTClient | 41:71c01aa3df1a | 632 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_122: [If message_create_IoTHubMessage_from_uamqp_message() fails, on_message_received_internal_callback shall return the result of messaging_delivery_rejected()] |
AzureIoTClient | 41:71c01aa3df1a | 633 | result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed reading AMQP message"); |
AzureIoTClient | 36:f78f9a56869e | 634 | |
AzureIoTClient | 41:71c01aa3df1a | 635 | LogError("on_message_received_internal_callback failed (message_create_IoTHubMessage_from_uamqp_message; error = %d).", api_call_result); |
AzureIoTClient | 41:71c01aa3df1a | 636 | } |
AzureIoTClient | 41:71c01aa3df1a | 637 | else |
AzureIoTClient | 41:71c01aa3df1a | 638 | { |
AzureIoTClient | 41:71c01aa3df1a | 639 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)context; |
AzureIoTClient | 41:71c01aa3df1a | 640 | TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* message_disposition_info; |
AzureIoTClient | 36:f78f9a56869e | 641 | |
AzureIoTClient | 41:71c01aa3df1a | 642 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_186: [A TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO instance shall be created containing the source link name and message delivery ID] |
AzureIoTClient | 41:71c01aa3df1a | 643 | if ((message_disposition_info = create_message_disposition_info(instance)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 644 | { |
AzureIoTClient | 41:71c01aa3df1a | 645 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_187: [**If the TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO instance fails to be created, on_message_received_internal_callback shall return messaging_delivery_released()] |
AzureIoTClient | 41:71c01aa3df1a | 646 | LogError("on_message_received_internal_callback failed (failed creating TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO)."); |
AzureIoTClient | 41:71c01aa3df1a | 647 | result = messaging_delivery_released(); |
AzureIoTClient | 41:71c01aa3df1a | 648 | } |
AzureIoTClient | 41:71c01aa3df1a | 649 | else |
AzureIoTClient | 41:71c01aa3df1a | 650 | { |
AzureIoTClient | 41:71c01aa3df1a | 651 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_123: [`instance->on_message_received_callback` shall be invoked passing the IOTHUB_MESSAGE_HANDLE and TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO instance] |
AzureIoTClient | 41:71c01aa3df1a | 652 | TELEMETRY_MESSENGER_DISPOSITION_RESULT disposition_result = instance->on_message_received_callback(iothub_message, message_disposition_info, instance->on_message_received_context); |
AzureIoTClient | 36:f78f9a56869e | 653 | |
AzureIoTClient | 41:71c01aa3df1a | 654 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_188: [The memory allocated for the TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO instance shall be released] |
AzureIoTClient | 41:71c01aa3df1a | 655 | destroy_message_disposition_info(message_disposition_info); |
AzureIoTClient | 36:f78f9a56869e | 656 | |
AzureIoTClient | 41:71c01aa3df1a | 657 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_125: [If `instance->on_message_received_callback` returns TELEMETRY_MESSENGER_DISPOSITION_RESULT_ACCEPTED, on_message_received_internal_callback shall return the result of messaging_delivery_accepted()] |
AzureIoTClient | 41:71c01aa3df1a | 658 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_126: [If `instance->on_message_received_callback` returns TELEMETRY_MESSENGER_DISPOSITION_RESULT_RELEASED, on_message_received_internal_callback shall return the result of messaging_delivery_released()] |
AzureIoTClient | 41:71c01aa3df1a | 659 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_127: [If `instance->on_message_received_callback` returns TELEMETRY_MESSENGER_DISPOSITION_RESULT_REJECTED, on_message_received_internal_callback shall return the result of messaging_delivery_rejected()] |
AzureIoTClient | 41:71c01aa3df1a | 660 | result = create_uamqp_disposition_result_from(disposition_result); |
AzureIoTClient | 41:71c01aa3df1a | 661 | } |
AzureIoTClient | 41:71c01aa3df1a | 662 | } |
AzureIoTClient | 36:f78f9a56869e | 663 | |
AzureIoTClient | 41:71c01aa3df1a | 664 | return result; |
AzureIoTClient | 36:f78f9a56869e | 665 | } |
AzureIoTClient | 36:f78f9a56869e | 666 | |
AzureIoTClient | 36:f78f9a56869e | 667 | static int create_message_receiver(TELEMETRY_MESSENGER_INSTANCE* instance) |
AzureIoTClient | 36:f78f9a56869e | 668 | { |
AzureIoTClient | 41:71c01aa3df1a | 669 | int result; |
AzureIoTClient | 36:f78f9a56869e | 670 | |
AzureIoTClient | 41:71c01aa3df1a | 671 | STRING_HANDLE devices_path = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 672 | STRING_HANDLE message_receive_address = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 673 | STRING_HANDLE link_name = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 674 | STRING_HANDLE target_name = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 675 | AMQP_VALUE source = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 676 | AMQP_VALUE target = NULL; |
AzureIoTClient | 36:f78f9a56869e | 677 | |
AzureIoTClient | 41:71c01aa3df1a | 678 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_068: [A variable, named `devices_path`, shall be created concatenating `instance->iothub_host_fqdn`, "/devices/" and `instance->device_id`] |
AzureIoTClient | 41:71c01aa3df1a | 679 | if ((devices_path = create_devices_path(instance->iothub_host_fqdn, instance->device_id)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 680 | { |
AzureIoTClient | 41:71c01aa3df1a | 681 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_069: [If `devices_path` fails to be created, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 682 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 683 | LogError("Failed creating the message receiver (failed creating the 'devices_path')"); |
AzureIoTClient | 41:71c01aa3df1a | 684 | } |
AzureIoTClient | 41:71c01aa3df1a | 685 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_070: [A variable, named `message_receive_address`, shall be created concatenating "amqps://", `devices_path` and "/messages/devicebound"] |
AzureIoTClient | 41:71c01aa3df1a | 686 | else if ((message_receive_address = create_message_receive_address(devices_path)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 687 | { |
AzureIoTClient | 41:71c01aa3df1a | 688 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_071: [If `message_receive_address` fails to be created, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 689 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 690 | LogError("Failed creating the message receiver (failed creating the 'message_receive_address')"); |
AzureIoTClient | 41:71c01aa3df1a | 691 | } |
AzureIoTClient | 41:71c01aa3df1a | 692 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_072: [A `link_name` variable shall be created using an unique string label per AMQP session] |
AzureIoTClient | 41:71c01aa3df1a | 693 | else if ((link_name = create_link_name(MESSAGE_RECEIVER_LINK_NAME_PREFIX, STRING_c_str(instance->device_id))) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 694 | { |
AzureIoTClient | 41:71c01aa3df1a | 695 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_073: [If `link_name` fails to be created, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 696 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 697 | LogError("Failed creating the message receiver (failed creating an unique link name)"); |
AzureIoTClient | 41:71c01aa3df1a | 698 | } |
AzureIoTClient | 41:71c01aa3df1a | 699 | else if ((target_name = create_message_receiver_target_name(link_name)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 700 | { |
AzureIoTClient | 41:71c01aa3df1a | 701 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 702 | LogError("Failed creating the message receiver (failed creating an unique target name)"); |
AzureIoTClient | 41:71c01aa3df1a | 703 | } |
AzureIoTClient | 41:71c01aa3df1a | 704 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_074: [A `target` variable shall be created with messaging_create_target() using an unique string label per AMQP session] |
AzureIoTClient | 41:71c01aa3df1a | 705 | else if ((target = messaging_create_target(STRING_c_str(target_name))) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 706 | { |
AzureIoTClient | 41:71c01aa3df1a | 707 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_075: [If `target` fails to be created, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 708 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 709 | LogError("Failed creating the message receiver (messaging_create_target failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 710 | } |
AzureIoTClient | 41:71c01aa3df1a | 711 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_076: [A `source` variable shall be created with messaging_create_source() using `message_receive_address`] |
AzureIoTClient | 41:71c01aa3df1a | 712 | else if ((source = messaging_create_source(STRING_c_str(message_receive_address))) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 713 | { |
AzureIoTClient | 41:71c01aa3df1a | 714 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_077: [If `source` fails to be created, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 715 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 716 | LogError("Failed creating the message receiver (messaging_create_source failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 717 | } |
AzureIoTClient | 41:71c01aa3df1a | 718 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_078: [`instance->receiver_link` shall be set using link_create(), passing `instance->session_handle`, `link_name`, "role_receiver", `source` and `target` as parameters] |
AzureIoTClient | 41:71c01aa3df1a | 719 | else if ((instance->receiver_link = link_create(instance->session_handle, STRING_c_str(link_name), role_receiver, source, target)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 720 | { |
AzureIoTClient | 41:71c01aa3df1a | 721 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_079: [If link_create() fails, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 722 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 723 | LogError("Failed creating the message receiver (link_create failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 724 | } |
AzureIoTClient | 41:71c01aa3df1a | 725 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_080: [`instance->receiver_link` settle mode shall be set to "receiver_settle_mode_first" using link_set_rcv_settle_mode(), ] |
AzureIoTClient | 41:71c01aa3df1a | 726 | else if (link_set_rcv_settle_mode(instance->receiver_link, receiver_settle_mode_first) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 727 | { |
AzureIoTClient | 41:71c01aa3df1a | 728 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_081: [If link_set_rcv_settle_mode() fails, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 729 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 730 | LogError("Failed creating the message receiver (link_set_rcv_settle_mode failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 731 | } |
AzureIoTClient | 41:71c01aa3df1a | 732 | else |
AzureIoTClient | 41:71c01aa3df1a | 733 | { |
AzureIoTClient | 41:71c01aa3df1a | 734 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_082: [`instance->receiver_link` maximum message size shall be set to 65536 using link_set_max_message_size()] |
AzureIoTClient | 41:71c01aa3df1a | 735 | if (link_set_max_message_size(instance->receiver_link, MESSAGE_RECEIVER_MAX_LINK_SIZE) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 736 | { |
AzureIoTClient | 41:71c01aa3df1a | 737 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_083: [If link_set_max_message_size() fails, it shall be logged and ignored.] |
AzureIoTClient | 41:71c01aa3df1a | 738 | LogError("Failed setting message receiver link max message size."); |
AzureIoTClient | 41:71c01aa3df1a | 739 | } |
AzureIoTClient | 36:f78f9a56869e | 740 | |
AzureIoTClient | 41:71c01aa3df1a | 741 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_084: [`instance->receiver_link` should have a property "com.microsoft:client-version" set as `CLIENT_DEVICE_TYPE_PREFIX/IOTHUB_SDK_VERSION`, using amqpvalue_set_map_value() and link_set_attach_properties()] |
AzureIoTClient | 41:71c01aa3df1a | 742 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_085: [If amqpvalue_set_map_value() or link_set_attach_properties() fail, the failure shall be ignored] |
AzureIoTClient | 41:71c01aa3df1a | 743 | attach_device_client_type_to_link(instance->receiver_link, instance->product_info); |
AzureIoTClient | 36:f78f9a56869e | 744 | |
AzureIoTClient | 41:71c01aa3df1a | 745 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_086: [`instance->message_receiver` shall be created using messagereceiver_create(), passing the `instance->receiver_link` and `on_messagereceiver_state_changed_callback`] |
AzureIoTClient | 41:71c01aa3df1a | 746 | if ((instance->message_receiver = messagereceiver_create(instance->receiver_link, on_message_receiver_state_changed_callback, (void*)instance)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 747 | { |
AzureIoTClient | 41:71c01aa3df1a | 748 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_087: [If messagereceiver_create() fails, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 45:c09fac270e60 | 749 | LogError("Failed creating the message receiver (messagereceiver_create failed)"); |
AzureIoTClient | 45:c09fac270e60 | 750 | destroy_message_receiver(instance); |
AzureIoTClient | 41:71c01aa3df1a | 751 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 752 | } |
AzureIoTClient | 41:71c01aa3df1a | 753 | else |
AzureIoTClient | 41:71c01aa3df1a | 754 | { |
AzureIoTClient | 41:71c01aa3df1a | 755 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_088: [`instance->message_receiver` shall be opened using messagereceiver_open(), passing `on_message_received_internal_callback`] |
AzureIoTClient | 41:71c01aa3df1a | 756 | if (messagereceiver_open(instance->message_receiver, on_message_received_internal_callback, (void*)instance) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 757 | { |
AzureIoTClient | 41:71c01aa3df1a | 758 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_089: [If messagereceiver_open() fails, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 45:c09fac270e60 | 759 | LogError("Failed opening the AMQP message receiver."); |
AzureIoTClient | 45:c09fac270e60 | 760 | destroy_message_receiver(instance); |
AzureIoTClient | 41:71c01aa3df1a | 761 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 762 | } |
AzureIoTClient | 41:71c01aa3df1a | 763 | else |
AzureIoTClient | 41:71c01aa3df1a | 764 | { |
AzureIoTClient | 41:71c01aa3df1a | 765 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 766 | } |
AzureIoTClient | 41:71c01aa3df1a | 767 | } |
AzureIoTClient | 41:71c01aa3df1a | 768 | } |
AzureIoTClient | 36:f78f9a56869e | 769 | |
AzureIoTClient | 41:71c01aa3df1a | 770 | if (devices_path != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 771 | STRING_delete(devices_path); |
AzureIoTClient | 41:71c01aa3df1a | 772 | if (message_receive_address != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 773 | STRING_delete(message_receive_address); |
AzureIoTClient | 41:71c01aa3df1a | 774 | if (link_name != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 775 | STRING_delete(link_name); |
AzureIoTClient | 41:71c01aa3df1a | 776 | if (target_name != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 777 | STRING_delete(target_name); |
AzureIoTClient | 41:71c01aa3df1a | 778 | if (source != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 779 | amqpvalue_destroy(source); |
AzureIoTClient | 41:71c01aa3df1a | 780 | if (target != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 781 | amqpvalue_destroy(target); |
AzureIoTClient | 36:f78f9a56869e | 782 | |
AzureIoTClient | 41:71c01aa3df1a | 783 | return result; |
AzureIoTClient | 36:f78f9a56869e | 784 | } |
AzureIoTClient | 36:f78f9a56869e | 785 | |
AzureIoTClient | 36:f78f9a56869e | 786 | static int move_event_to_in_progress_list(MESSENGER_SEND_EVENT_TASK* task) |
AzureIoTClient | 36:f78f9a56869e | 787 | { |
AzureIoTClient | 41:71c01aa3df1a | 788 | int result; |
AzureIoTClient | 36:f78f9a56869e | 789 | |
AzureIoTClient | 41:71c01aa3df1a | 790 | if (singlylinkedlist_add(task->messenger->in_progress_list, (void*)task) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 791 | { |
AzureIoTClient | 41:71c01aa3df1a | 792 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 793 | LogError("Failed moving event to in_progress list (singlylinkedlist_add failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 794 | } |
AzureIoTClient | 41:71c01aa3df1a | 795 | else |
AzureIoTClient | 41:71c01aa3df1a | 796 | { |
AzureIoTClient | 41:71c01aa3df1a | 797 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 798 | } |
AzureIoTClient | 36:f78f9a56869e | 799 | |
AzureIoTClient | 41:71c01aa3df1a | 800 | return result; |
AzureIoTClient | 36:f78f9a56869e | 801 | } |
AzureIoTClient | 36:f78f9a56869e | 802 | |
AzureIoTClient | 36:f78f9a56869e | 803 | static bool find_MESSENGER_SEND_EVENT_TASK_on_list(LIST_ITEM_HANDLE list_item, const void* match_context) |
AzureIoTClient | 36:f78f9a56869e | 804 | { |
AzureIoTClient | 41:71c01aa3df1a | 805 | return (list_item != NULL && singlylinkedlist_item_get_value(list_item) == match_context); |
AzureIoTClient | 36:f78f9a56869e | 806 | } |
AzureIoTClient | 36:f78f9a56869e | 807 | |
AzureIoTClient | 36:f78f9a56869e | 808 | static void remove_event_from_in_progress_list(MESSENGER_SEND_EVENT_TASK *task) |
AzureIoTClient | 36:f78f9a56869e | 809 | { |
AzureIoTClient | 41:71c01aa3df1a | 810 | LIST_ITEM_HANDLE list_item = singlylinkedlist_find(task->messenger->in_progress_list, find_MESSENGER_SEND_EVENT_TASK_on_list, (void*)task); |
AzureIoTClient | 36:f78f9a56869e | 811 | |
AzureIoTClient | 41:71c01aa3df1a | 812 | if (list_item != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 813 | { |
AzureIoTClient | 41:71c01aa3df1a | 814 | if (singlylinkedlist_remove(task->messenger->in_progress_list, list_item) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 815 | { |
AzureIoTClient | 41:71c01aa3df1a | 816 | LogError("Failed removing event from in_progress list (singlylinkedlist_remove failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 817 | } |
AzureIoTClient | 41:71c01aa3df1a | 818 | } |
AzureIoTClient | 36:f78f9a56869e | 819 | } |
AzureIoTClient | 36:f78f9a56869e | 820 | |
AzureIoTClient | 51:269e65571b39 | 821 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_201: [Freeing a `task` will free callback items associated with it and free the data itself] |
AzureIoTClient | 51:269e65571b39 | 822 | static void free_task(MESSENGER_SEND_EVENT_TASK* task) |
AzureIoTClient | 51:269e65571b39 | 823 | { |
AzureIoTClient | 51:269e65571b39 | 824 | LIST_ITEM_HANDLE list_node; |
AzureIoTClient | 51:269e65571b39 | 825 | |
AzureIoTClient | 51:269e65571b39 | 826 | if (NULL != task->callback_list) |
AzureIoTClient | 51:269e65571b39 | 827 | { |
AzureIoTClient | 51:269e65571b39 | 828 | while ((list_node = singlylinkedlist_get_head_item(task->callback_list)) != NULL) |
AzureIoTClient | 51:269e65571b39 | 829 | { |
AzureIoTClient | 51:269e65571b39 | 830 | MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_info = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)singlylinkedlist_item_get_value(list_node); |
AzureIoTClient | 51:269e65571b39 | 831 | (void)singlylinkedlist_remove(task->callback_list, list_node); |
AzureIoTClient | 51:269e65571b39 | 832 | free(caller_info); |
AzureIoTClient | 51:269e65571b39 | 833 | } |
AzureIoTClient | 51:269e65571b39 | 834 | singlylinkedlist_destroy(task->callback_list); |
AzureIoTClient | 51:269e65571b39 | 835 | } |
AzureIoTClient | 51:269e65571b39 | 836 | |
AzureIoTClient | 51:269e65571b39 | 837 | free(task); |
AzureIoTClient | 51:269e65571b39 | 838 | } |
AzureIoTClient | 51:269e65571b39 | 839 | |
AzureIoTClient | 36:f78f9a56869e | 840 | static int copy_events_to_list(SINGLYLINKEDLIST_HANDLE from_list, SINGLYLINKEDLIST_HANDLE to_list) |
AzureIoTClient | 36:f78f9a56869e | 841 | { |
AzureIoTClient | 41:71c01aa3df1a | 842 | int result; |
AzureIoTClient | 41:71c01aa3df1a | 843 | LIST_ITEM_HANDLE list_item; |
AzureIoTClient | 41:71c01aa3df1a | 844 | |
AzureIoTClient | 41:71c01aa3df1a | 845 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 846 | list_item = singlylinkedlist_get_head_item(from_list); |
AzureIoTClient | 41:71c01aa3df1a | 847 | |
AzureIoTClient | 41:71c01aa3df1a | 848 | while (list_item != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 849 | { |
AzureIoTClient | 41:71c01aa3df1a | 850 | MESSENGER_SEND_EVENT_TASK *task = (MESSENGER_SEND_EVENT_TASK*)singlylinkedlist_item_get_value(list_item); |
AzureIoTClient | 36:f78f9a56869e | 851 | |
AzureIoTClient | 41:71c01aa3df1a | 852 | if (singlylinkedlist_add(to_list, task) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 853 | { |
AzureIoTClient | 41:71c01aa3df1a | 854 | LogError("Failed copying event to destination list (singlylinkedlist_add failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 855 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 856 | break; |
AzureIoTClient | 41:71c01aa3df1a | 857 | } |
AzureIoTClient | 41:71c01aa3df1a | 858 | else |
AzureIoTClient | 41:71c01aa3df1a | 859 | { |
AzureIoTClient | 41:71c01aa3df1a | 860 | list_item = singlylinkedlist_get_next_item(list_item); |
AzureIoTClient | 41:71c01aa3df1a | 861 | } |
AzureIoTClient | 41:71c01aa3df1a | 862 | } |
AzureIoTClient | 36:f78f9a56869e | 863 | |
AzureIoTClient | 41:71c01aa3df1a | 864 | return result; |
AzureIoTClient | 41:71c01aa3df1a | 865 | } |
AzureIoTClient | 41:71c01aa3df1a | 866 | |
AzureIoTClient | 41:71c01aa3df1a | 867 | static int copy_events_from_in_progress_to_waiting_list(TELEMETRY_MESSENGER_INSTANCE* instance, SINGLYLINKEDLIST_HANDLE to_list) |
AzureIoTClient | 41:71c01aa3df1a | 868 | { |
AzureIoTClient | 41:71c01aa3df1a | 869 | int result; |
AzureIoTClient | 41:71c01aa3df1a | 870 | LIST_ITEM_HANDLE list_task_item; |
AzureIoTClient | 51:269e65571b39 | 871 | LIST_ITEM_HANDLE list_task_item_next; |
AzureIoTClient | 36:f78f9a56869e | 872 | |
AzureIoTClient | 41:71c01aa3df1a | 873 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 874 | list_task_item = singlylinkedlist_get_head_item(instance->in_progress_list); |
AzureIoTClient | 36:f78f9a56869e | 875 | |
AzureIoTClient | 41:71c01aa3df1a | 876 | while (list_task_item != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 877 | { |
AzureIoTClient | 41:71c01aa3df1a | 878 | MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)singlylinkedlist_item_get_value(list_task_item); |
AzureIoTClient | 41:71c01aa3df1a | 879 | |
AzureIoTClient | 41:71c01aa3df1a | 880 | LIST_ITEM_HANDLE list_caller_item; |
AzureIoTClient | 41:71c01aa3df1a | 881 | |
AzureIoTClient | 41:71c01aa3df1a | 882 | list_caller_item = singlylinkedlist_get_head_item(task->callback_list); |
AzureIoTClient | 41:71c01aa3df1a | 883 | |
AzureIoTClient | 41:71c01aa3df1a | 884 | while (list_caller_item != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 885 | { |
AzureIoTClient | 41:71c01aa3df1a | 886 | MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_information = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)singlylinkedlist_item_get_value(list_caller_item); |
AzureIoTClient | 41:71c01aa3df1a | 887 | |
AzureIoTClient | 41:71c01aa3df1a | 888 | if (singlylinkedlist_add(to_list, caller_information) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 889 | { |
AzureIoTClient | 41:71c01aa3df1a | 890 | LogError("Failed copying event to destination list (singlylinkedlist_add failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 891 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 892 | break; |
AzureIoTClient | 41:71c01aa3df1a | 893 | } |
AzureIoTClient | 41:71c01aa3df1a | 894 | |
AzureIoTClient | 41:71c01aa3df1a | 895 | list_caller_item = singlylinkedlist_get_next_item(list_caller_item); |
AzureIoTClient | 41:71c01aa3df1a | 896 | } |
AzureIoTClient | 41:71c01aa3df1a | 897 | |
AzureIoTClient | 51:269e65571b39 | 898 | list_task_item_next = singlylinkedlist_get_next_item(list_task_item); |
AzureIoTClient | 51:269e65571b39 | 899 | |
AzureIoTClient | 51:269e65571b39 | 900 | singlylinkedlist_destroy(task->callback_list); |
AzureIoTClient | 51:269e65571b39 | 901 | task->callback_list = NULL; |
AzureIoTClient | 51:269e65571b39 | 902 | |
AzureIoTClient | 51:269e65571b39 | 903 | free_task(task); |
AzureIoTClient | 51:269e65571b39 | 904 | singlylinkedlist_remove(instance->in_progress_list, list_task_item); |
AzureIoTClient | 51:269e65571b39 | 905 | list_task_item = list_task_item_next; |
AzureIoTClient | 41:71c01aa3df1a | 906 | } |
AzureIoTClient | 41:71c01aa3df1a | 907 | |
AzureIoTClient | 41:71c01aa3df1a | 908 | return result; |
AzureIoTClient | 36:f78f9a56869e | 909 | } |
AzureIoTClient | 36:f78f9a56869e | 910 | |
AzureIoTClient | 41:71c01aa3df1a | 911 | |
AzureIoTClient | 36:f78f9a56869e | 912 | static int move_events_to_wait_to_send_list(TELEMETRY_MESSENGER_INSTANCE* instance) |
AzureIoTClient | 36:f78f9a56869e | 913 | { |
AzureIoTClient | 41:71c01aa3df1a | 914 | int result; |
AzureIoTClient | 41:71c01aa3df1a | 915 | LIST_ITEM_HANDLE list_item; |
AzureIoTClient | 41:71c01aa3df1a | 916 | |
AzureIoTClient | 41:71c01aa3df1a | 917 | if ((list_item = singlylinkedlist_get_head_item(instance->in_progress_list)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 918 | { |
AzureIoTClient | 41:71c01aa3df1a | 919 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 920 | } |
AzureIoTClient | 41:71c01aa3df1a | 921 | else |
AzureIoTClient | 41:71c01aa3df1a | 922 | { |
AzureIoTClient | 41:71c01aa3df1a | 923 | SINGLYLINKEDLIST_HANDLE new_wait_to_send_list; |
AzureIoTClient | 36:f78f9a56869e | 924 | |
AzureIoTClient | 41:71c01aa3df1a | 925 | if ((new_wait_to_send_list = singlylinkedlist_create()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 926 | { |
AzureIoTClient | 41:71c01aa3df1a | 927 | LogError("Failed moving events back to wait_to_send list (singlylinkedlist_create failed to create new wait_to_send_list)"); |
AzureIoTClient | 41:71c01aa3df1a | 928 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 929 | } |
AzureIoTClient | 41:71c01aa3df1a | 930 | else |
AzureIoTClient | 41:71c01aa3df1a | 931 | { |
AzureIoTClient | 41:71c01aa3df1a | 932 | SINGLYLINKEDLIST_HANDLE new_in_progress_list; |
AzureIoTClient | 41:71c01aa3df1a | 933 | |
AzureIoTClient | 41:71c01aa3df1a | 934 | if (copy_events_from_in_progress_to_waiting_list(instance, new_wait_to_send_list) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 935 | { |
AzureIoTClient | 41:71c01aa3df1a | 936 | LogError("Failed moving events back to wait_to_send list (failed adding in_progress_list items to new_wait_to_send_list)"); |
AzureIoTClient | 41:71c01aa3df1a | 937 | singlylinkedlist_destroy(new_wait_to_send_list); |
AzureIoTClient | 41:71c01aa3df1a | 938 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 939 | } |
AzureIoTClient | 41:71c01aa3df1a | 940 | else if (copy_events_to_list(instance->waiting_to_send, new_wait_to_send_list) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 941 | { |
AzureIoTClient | 41:71c01aa3df1a | 942 | LogError("Failed moving events back to wait_to_send list (failed adding wait_to_send items to new_wait_to_send_list)"); |
AzureIoTClient | 41:71c01aa3df1a | 943 | singlylinkedlist_destroy(new_wait_to_send_list); |
AzureIoTClient | 41:71c01aa3df1a | 944 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 945 | } |
AzureIoTClient | 41:71c01aa3df1a | 946 | else if ((new_in_progress_list = singlylinkedlist_create()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 947 | { |
AzureIoTClient | 41:71c01aa3df1a | 948 | LogError("Failed moving events back to wait_to_send list (singlylinkedlist_create failed to create new in_progress_list)"); |
AzureIoTClient | 41:71c01aa3df1a | 949 | singlylinkedlist_destroy(new_wait_to_send_list); |
AzureIoTClient | 41:71c01aa3df1a | 950 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 951 | } |
AzureIoTClient | 41:71c01aa3df1a | 952 | else |
AzureIoTClient | 41:71c01aa3df1a | 953 | { |
AzureIoTClient | 41:71c01aa3df1a | 954 | singlylinkedlist_destroy(instance->waiting_to_send); |
AzureIoTClient | 41:71c01aa3df1a | 955 | singlylinkedlist_destroy(instance->in_progress_list); |
AzureIoTClient | 41:71c01aa3df1a | 956 | instance->waiting_to_send = new_wait_to_send_list; |
AzureIoTClient | 41:71c01aa3df1a | 957 | instance->in_progress_list = new_in_progress_list; |
AzureIoTClient | 41:71c01aa3df1a | 958 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 959 | } |
AzureIoTClient | 41:71c01aa3df1a | 960 | } |
AzureIoTClient | 41:71c01aa3df1a | 961 | } |
AzureIoTClient | 41:71c01aa3df1a | 962 | |
AzureIoTClient | 41:71c01aa3df1a | 963 | return result; |
AzureIoTClient | 41:71c01aa3df1a | 964 | } |
AzureIoTClient | 41:71c01aa3df1a | 965 | |
AzureIoTClient | 41:71c01aa3df1a | 966 | static MESSENGER_SEND_EVENT_TASK* create_task(TELEMETRY_MESSENGER_INSTANCE *messenger) |
AzureIoTClient | 41:71c01aa3df1a | 967 | { |
AzureIoTClient | 41:71c01aa3df1a | 968 | MESSENGER_SEND_EVENT_TASK* task = NULL; |
AzureIoTClient | 36:f78f9a56869e | 969 | |
AzureIoTClient | 41:71c01aa3df1a | 970 | if (NULL == (task = (MESSENGER_SEND_EVENT_TASK *)malloc(sizeof(MESSENGER_SEND_EVENT_TASK)))) |
AzureIoTClient | 41:71c01aa3df1a | 971 | { |
AzureIoTClient | 41:71c01aa3df1a | 972 | LogError("malloc of MESSENGER_SEND_EVENT_TASK failed"); |
AzureIoTClient | 41:71c01aa3df1a | 973 | } |
AzureIoTClient | 41:71c01aa3df1a | 974 | else |
AzureIoTClient | 41:71c01aa3df1a | 975 | { |
AzureIoTClient | 41:71c01aa3df1a | 976 | memset(task, 0, sizeof(*task )); |
AzureIoTClient | 41:71c01aa3df1a | 977 | task->messenger = messenger; |
AzureIoTClient | 41:71c01aa3df1a | 978 | task->send_time = INDEFINITE_TIME; |
AzureIoTClient | 41:71c01aa3df1a | 979 | if (NULL == (task->callback_list = singlylinkedlist_create())) |
AzureIoTClient | 41:71c01aa3df1a | 980 | { |
AzureIoTClient | 41:71c01aa3df1a | 981 | LogError("singlylinkedlist_create failed to create callback_list"); |
AzureIoTClient | 41:71c01aa3df1a | 982 | free_task(task); |
AzureIoTClient | 41:71c01aa3df1a | 983 | task = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 984 | } |
AzureIoTClient | 41:71c01aa3df1a | 985 | else if (RESULT_OK != move_event_to_in_progress_list(task)) |
AzureIoTClient | 41:71c01aa3df1a | 986 | { |
AzureIoTClient | 41:71c01aa3df1a | 987 | LogError("move_event_to_in_progress_list failed"); |
AzureIoTClient | 41:71c01aa3df1a | 988 | free_task(task); |
AzureIoTClient | 41:71c01aa3df1a | 989 | task = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 990 | } |
AzureIoTClient | 41:71c01aa3df1a | 991 | } |
AzureIoTClient | 41:71c01aa3df1a | 992 | return task; |
AzureIoTClient | 41:71c01aa3df1a | 993 | } |
AzureIoTClient | 41:71c01aa3df1a | 994 | |
AzureIoTClient | 41:71c01aa3df1a | 995 | |
AzureIoTClient | 41:71c01aa3df1a | 996 | |
AzureIoTClient | 41:71c01aa3df1a | 997 | static void invoke_callback(const void* item, const void* action_context, bool* continue_processing) |
AzureIoTClient | 41:71c01aa3df1a | 998 | { |
AzureIoTClient | 41:71c01aa3df1a | 999 | MESSENGER_SEND_EVENT_CALLER_INFORMATION *caller_info = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)item; |
AzureIoTClient | 41:71c01aa3df1a | 1000 | |
AzureIoTClient | 41:71c01aa3df1a | 1001 | if (NULL != caller_info->on_event_send_complete_callback) |
AzureIoTClient | 41:71c01aa3df1a | 1002 | { |
AzureIoTClient | 41:71c01aa3df1a | 1003 | #pragma warning(push) |
AzureIoTClient | 41:71c01aa3df1a | 1004 | #pragma warning(disable:4305) // Allow typecasting to smaller type on 64 bit systems, since we control ultimate caller. |
AzureIoTClient | 41:71c01aa3df1a | 1005 | TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT messenger_send_result = (TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT)action_context; |
AzureIoTClient | 41:71c01aa3df1a | 1006 | #pragma warning(pop) |
AzureIoTClient | 41:71c01aa3df1a | 1007 | caller_info->on_event_send_complete_callback(caller_info->message, messenger_send_result, caller_info->context); |
AzureIoTClient | 41:71c01aa3df1a | 1008 | } |
AzureIoTClient | 41:71c01aa3df1a | 1009 | *continue_processing = true; |
AzureIoTClient | 36:f78f9a56869e | 1010 | } |
AzureIoTClient | 36:f78f9a56869e | 1011 | |
AzureIoTClient | 36:f78f9a56869e | 1012 | static void internal_on_event_send_complete_callback(void* context, MESSAGE_SEND_RESULT send_result) |
AzureIoTClient | 36:f78f9a56869e | 1013 | { |
AzureIoTClient | 41:71c01aa3df1a | 1014 | if (context != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1015 | { |
AzureIoTClient | 41:71c01aa3df1a | 1016 | MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)context; |
AzureIoTClient | 41:71c01aa3df1a | 1017 | |
AzureIoTClient | 41:71c01aa3df1a | 1018 | if (task->messenger->message_sender_current_state != MESSAGE_SENDER_STATE_ERROR) |
AzureIoTClient | 41:71c01aa3df1a | 1019 | { |
AzureIoTClient | 41:71c01aa3df1a | 1020 | if (task->is_timed_out == false) |
AzureIoTClient | 41:71c01aa3df1a | 1021 | { |
AzureIoTClient | 41:71c01aa3df1a | 1022 | TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT messenger_send_result; |
AzureIoTClient | 36:f78f9a56869e | 1023 | |
AzureIoTClient | 41:71c01aa3df1a | 1024 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_189: [If no failure occurs, `on_event_send_complete_callback` shall be invoked with result TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_OK for all callers associated with this task] |
AzureIoTClient | 41:71c01aa3df1a | 1025 | if (send_result == MESSAGE_SEND_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1026 | { |
AzureIoTClient | 41:71c01aa3df1a | 1027 | messenger_send_result = TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 1028 | } |
AzureIoTClient | 41:71c01aa3df1a | 1029 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_190: [If a failure occured, `on_event_send_complete_callback` shall be invoked with result TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING for all callers associated with this task] |
AzureIoTClient | 41:71c01aa3df1a | 1030 | else |
AzureIoTClient | 41:71c01aa3df1a | 1031 | { |
AzureIoTClient | 41:71c01aa3df1a | 1032 | messenger_send_result = TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING; |
AzureIoTClient | 41:71c01aa3df1a | 1033 | } |
AzureIoTClient | 41:71c01aa3df1a | 1034 | |
AzureIoTClient | 41:71c01aa3df1a | 1035 | // Initially typecast to a size_t to avoid 64 bit compiler warnings on casting of void* to larger type. |
AzureIoTClient | 41:71c01aa3df1a | 1036 | singlylinkedlist_foreach(task->callback_list, invoke_callback, (void*)((size_t)messenger_send_result)); |
AzureIoTClient | 41:71c01aa3df1a | 1037 | } |
AzureIoTClient | 41:71c01aa3df1a | 1038 | else |
AzureIoTClient | 41:71c01aa3df1a | 1039 | { |
AzureIoTClient | 41:71c01aa3df1a | 1040 | LogInfo("messenger on_event_send_complete_callback invoked for timed out event %p; not firing upper layer callback.", task); |
AzureIoTClient | 41:71c01aa3df1a | 1041 | } |
AzureIoTClient | 36:f78f9a56869e | 1042 | |
AzureIoTClient | 41:71c01aa3df1a | 1043 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_128: [`task` shall be removed from `instance->in_progress_list`] |
AzureIoTClient | 41:71c01aa3df1a | 1044 | remove_event_from_in_progress_list(task); |
AzureIoTClient | 41:71c01aa3df1a | 1045 | |
AzureIoTClient | 41:71c01aa3df1a | 1046 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_130: [`task` shall be destroyed()] |
AzureIoTClient | 41:71c01aa3df1a | 1047 | free_task(task); |
AzureIoTClient | 41:71c01aa3df1a | 1048 | } |
AzureIoTClient | 41:71c01aa3df1a | 1049 | } |
AzureIoTClient | 41:71c01aa3df1a | 1050 | } |
AzureIoTClient | 41:71c01aa3df1a | 1051 | |
AzureIoTClient | 41:71c01aa3df1a | 1052 | static MESSENGER_SEND_EVENT_CALLER_INFORMATION* get_next_caller_message_to_send(TELEMETRY_MESSENGER_INSTANCE* instance) |
AzureIoTClient | 41:71c01aa3df1a | 1053 | { |
AzureIoTClient | 41:71c01aa3df1a | 1054 | MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_info; |
AzureIoTClient | 41:71c01aa3df1a | 1055 | LIST_ITEM_HANDLE list_item; |
AzureIoTClient | 36:f78f9a56869e | 1056 | |
AzureIoTClient | 41:71c01aa3df1a | 1057 | if ((list_item = singlylinkedlist_get_head_item(instance->waiting_to_send)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1058 | { |
AzureIoTClient | 41:71c01aa3df1a | 1059 | caller_info = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 1060 | } |
AzureIoTClient | 41:71c01aa3df1a | 1061 | else |
AzureIoTClient | 41:71c01aa3df1a | 1062 | { |
AzureIoTClient | 41:71c01aa3df1a | 1063 | caller_info = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)singlylinkedlist_item_get_value(list_item); |
AzureIoTClient | 36:f78f9a56869e | 1064 | |
AzureIoTClient | 41:71c01aa3df1a | 1065 | if (singlylinkedlist_remove(instance->waiting_to_send, list_item) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1066 | { |
AzureIoTClient | 41:71c01aa3df1a | 1067 | LogError("Failed removing item from waiting_to_send list (singlylinkedlist_remove failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 1068 | } |
AzureIoTClient | 41:71c01aa3df1a | 1069 | } |
AzureIoTClient | 36:f78f9a56869e | 1070 | |
AzureIoTClient | 41:71c01aa3df1a | 1071 | return caller_info; |
AzureIoTClient | 36:f78f9a56869e | 1072 | } |
AzureIoTClient | 36:f78f9a56869e | 1073 | |
// Working state for one batch while pending events are being sent.
// It is zeroed and filled by create_send_pending_events_state() and zeroed again
// by send_batched_message_and_reset_state() once the batch has been handed off.
typedef struct SEND_PENDING_EVENTS_STATE_TAG
{
    // Task created with create_task() for this batch; passed as the context of
    // the send-complete callback.
    MESSENGER_SEND_EVENT_TASK* task;
    // AMQP message created with message_create() and set to AMQP_BATCHING_FORMAT_CODE.
    MESSAGE_HANDLE message_batch_container;
    // NOTE(review): presumably the number of payload bytes accumulated in the
    // current batch; it is not written by the helpers visible here - confirm
    // against send_pending_events().
    uint64_t bytes_pending;
} SEND_PENDING_EVENTS_STATE;
AzureIoTClient | 41:71c01aa3df1a | 1080 | |
AzureIoTClient | 41:71c01aa3df1a | 1081 | |
AzureIoTClient | 41:71c01aa3df1a | 1082 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_191: [Creates an AMQP message, sets it to be batch mode, and creates an associated task for its callbacks. Errors cause the send events loop to break.] |
AzureIoTClient | 41:71c01aa3df1a | 1083 | static int create_send_pending_events_state(TELEMETRY_MESSENGER_INSTANCE* instance, SEND_PENDING_EVENTS_STATE *send_pending_events_state) |
AzureIoTClient | 36:f78f9a56869e | 1084 | { |
AzureIoTClient | 41:71c01aa3df1a | 1085 | int result; |
AzureIoTClient | 41:71c01aa3df1a | 1086 | memset(send_pending_events_state, 0, sizeof(*send_pending_events_state)); |
AzureIoTClient | 41:71c01aa3df1a | 1087 | |
AzureIoTClient | 41:71c01aa3df1a | 1088 | if ((send_pending_events_state->message_batch_container = message_create()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1089 | { |
AzureIoTClient | 41:71c01aa3df1a | 1090 | LogError("messageBatchContainer = message_create() failed"); |
AzureIoTClient | 41:71c01aa3df1a | 1091 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1092 | } |
AzureIoTClient | 41:71c01aa3df1a | 1093 | else if (message_set_message_format(send_pending_events_state->message_batch_container, AMQP_BATCHING_FORMAT_CODE) != 0) |
AzureIoTClient | 41:71c01aa3df1a | 1094 | { |
AzureIoTClient | 46:c688c75b63b9 | 1095 | LogError("Failed setting the message format to batching format"); |
AzureIoTClient | 46:c688c75b63b9 | 1096 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1097 | } |
AzureIoTClient | 41:71c01aa3df1a | 1098 | else if ((send_pending_events_state->task = create_task(instance)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1099 | { |
AzureIoTClient | 41:71c01aa3df1a | 1100 | LogError("create_task() failed"); |
AzureIoTClient | 41:71c01aa3df1a | 1101 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1102 | } |
AzureIoTClient | 41:71c01aa3df1a | 1103 | else |
AzureIoTClient | 41:71c01aa3df1a | 1104 | { |
AzureIoTClient | 41:71c01aa3df1a | 1105 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 1106 | } |
AzureIoTClient | 41:71c01aa3df1a | 1107 | |
AzureIoTClient | 41:71c01aa3df1a | 1108 | return result; |
AzureIoTClient | 41:71c01aa3df1a | 1109 | } |
AzureIoTClient | 41:71c01aa3df1a | 1110 | |
AzureIoTClient | 41:71c01aa3df1a | 1111 | static void invoke_callback_on_error(MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT messenger_event_send_complete_result) |
AzureIoTClient | 41:71c01aa3df1a | 1112 | { |
AzureIoTClient | 41:71c01aa3df1a | 1113 | caller_info->on_event_send_complete_callback(caller_info->message, messenger_event_send_complete_result, (void*)caller_info->context); |
AzureIoTClient | 41:71c01aa3df1a | 1114 | } |
AzureIoTClient | 36:f78f9a56869e | 1115 | |
AzureIoTClient | 41:71c01aa3df1a | 1116 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_194: [When message is ready to send, invoke AMQP's messagesender_send and free temporary values associated with this batch.] |
AzureIoTClient | 41:71c01aa3df1a | 1117 | static int send_batched_message_and_reset_state(TELEMETRY_MESSENGER_INSTANCE* instance, SEND_PENDING_EVENTS_STATE *send_pending_events_state) |
AzureIoTClient | 41:71c01aa3df1a | 1118 | { |
AzureIoTClient | 41:71c01aa3df1a | 1119 | int result; |
AzureIoTClient | 41:71c01aa3df1a | 1120 | |
AzureIoTClient | 43:3da2d93bb955 | 1121 | if (messagesender_send_async(instance->message_sender, send_pending_events_state->message_batch_container, internal_on_event_send_complete_callback, send_pending_events_state->task, 0) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1122 | { |
AzureIoTClient | 41:71c01aa3df1a | 1123 | LogError("messagesender_send failed"); |
AzureIoTClient | 41:71c01aa3df1a | 1124 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1125 | } |
AzureIoTClient | 41:71c01aa3df1a | 1126 | else |
AzureIoTClient | 41:71c01aa3df1a | 1127 | { |
AzureIoTClient | 41:71c01aa3df1a | 1128 | send_pending_events_state->task->send_time = get_time(NULL); |
AzureIoTClient | 41:71c01aa3df1a | 1129 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 1130 | } |
AzureIoTClient | 41:71c01aa3df1a | 1131 | |
AzureIoTClient | 41:71c01aa3df1a | 1132 | message_destroy(send_pending_events_state->message_batch_container); |
AzureIoTClient | 41:71c01aa3df1a | 1133 | memset(send_pending_events_state, 0, sizeof(*send_pending_events_state)); |
AzureIoTClient | 41:71c01aa3df1a | 1134 | |
AzureIoTClient | 41:71c01aa3df1a | 1135 | return result; |
AzureIoTClient | 41:71c01aa3df1a | 1136 | } |
AzureIoTClient | 36:f78f9a56869e | 1137 | |
AzureIoTClient | 41:71c01aa3df1a | 1138 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_196: [Determine the maximum message size we can send over this link from AMQP, then remove AMQP_BATCHING_RESERVE_SIZE (1024) bytes as reserve buffer.] |
AzureIoTClient | 41:71c01aa3df1a | 1139 | static int get_max_message_size_for_batching(TELEMETRY_MESSENGER_INSTANCE* instance, uint64_t* max_messagesize) |
AzureIoTClient | 41:71c01aa3df1a | 1140 | { |
AzureIoTClient | 41:71c01aa3df1a | 1141 | int result; |
AzureIoTClient | 36:f78f9a56869e | 1142 | |
AzureIoTClient | 41:71c01aa3df1a | 1143 | if (link_get_peer_max_message_size(instance->sender_link, max_messagesize) != 0) |
AzureIoTClient | 41:71c01aa3df1a | 1144 | { |
AzureIoTClient | 41:71c01aa3df1a | 1145 | LogError("link_get_peer_max_message_size failed"); |
AzureIoTClient | 41:71c01aa3df1a | 1146 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1147 | } |
AzureIoTClient | 41:71c01aa3df1a | 1148 | // Reserve AMQP_BATCHING_RESERVE_SIZE bytes for AMQP overhead of the "main" message itself. |
AzureIoTClient | 41:71c01aa3df1a | 1149 | else if (*max_messagesize <= AMQP_BATCHING_RESERVE_SIZE) |
AzureIoTClient | 41:71c01aa3df1a | 1150 | { |
AzureIoTClient | 41:71c01aa3df1a | 1151 | LogError("link_get_peer_max_message_size (%d) is less than the reserve size (%d)", max_messagesize, AMQP_BATCHING_RESERVE_SIZE); |
AzureIoTClient | 41:71c01aa3df1a | 1152 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1153 | } |
AzureIoTClient | 41:71c01aa3df1a | 1154 | else |
AzureIoTClient | 41:71c01aa3df1a | 1155 | { |
AzureIoTClient | 41:71c01aa3df1a | 1156 | *max_messagesize -= AMQP_BATCHING_RESERVE_SIZE; |
AzureIoTClient | 41:71c01aa3df1a | 1157 | result = 0; |
AzureIoTClient | 41:71c01aa3df1a | 1158 | } |
AzureIoTClient | 41:71c01aa3df1a | 1159 | |
AzureIoTClient | 41:71c01aa3df1a | 1160 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1161 | } |
AzureIoTClient | 36:f78f9a56869e | 1162 | |
AzureIoTClient | 36:f78f9a56869e | 1163 | static int send_pending_events(TELEMETRY_MESSENGER_INSTANCE* instance) |
AzureIoTClient | 36:f78f9a56869e | 1164 | { |
AzureIoTClient | 41:71c01aa3df1a | 1165 | int result = RESULT_OK; |
AzureIoTClient | 36:f78f9a56869e | 1166 | |
AzureIoTClient | 41:71c01aa3df1a | 1167 | MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_info; |
AzureIoTClient | 41:71c01aa3df1a | 1168 | BINARY_DATA body_binary_data; |
AzureIoTClient | 36:f78f9a56869e | 1169 | |
AzureIoTClient | 41:71c01aa3df1a | 1170 | SEND_PENDING_EVENTS_STATE send_pending_events_state; |
AzureIoTClient | 41:71c01aa3df1a | 1171 | memset(&send_pending_events_state, 0, sizeof(send_pending_events_state)); |
AzureIoTClient | 41:71c01aa3df1a | 1172 | memset(&body_binary_data, 0, sizeof(body_binary_data)); |
AzureIoTClient | 36:f78f9a56869e | 1173 | |
AzureIoTClient | 41:71c01aa3df1a | 1174 | uint64_t max_messagesize = 0; |
AzureIoTClient | 36:f78f9a56869e | 1175 | |
AzureIoTClient | 41:71c01aa3df1a | 1176 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_192: [Enumerate through all messages waiting to send, building up AMQP message to send and sending when size will be greater than link max size.] |
AzureIoTClient | 41:71c01aa3df1a | 1177 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_198: [While processing pending messages, errors shall result in user callback being invoked.] |
AzureIoTClient | 41:71c01aa3df1a | 1178 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_199: [Errors specific to a message (e.g. failure to encode) are NOT fatal but we'll keep processing. More general errors (e.g. out of memory) will stop processing.] |
AzureIoTClient | 41:71c01aa3df1a | 1179 | while ((caller_info = get_next_caller_message_to_send(instance)) != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1180 | { |
AzureIoTClient | 41:71c01aa3df1a | 1181 | if (body_binary_data.bytes != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1182 | { |
AzureIoTClient | 41:71c01aa3df1a | 1183 | // Free here as this may have been set during previous run through loop. |
AzureIoTClient | 41:71c01aa3df1a | 1184 | free((unsigned char*)body_binary_data.bytes); |
AzureIoTClient | 41:71c01aa3df1a | 1185 | } |
AzureIoTClient | 41:71c01aa3df1a | 1186 | memset(&body_binary_data, 0, sizeof(body_binary_data)); |
AzureIoTClient | 41:71c01aa3df1a | 1187 | |
AzureIoTClient | 41:71c01aa3df1a | 1188 | if ((0 == max_messagesize) && (get_max_message_size_for_batching(instance, &max_messagesize)) != 0) |
AzureIoTClient | 41:71c01aa3df1a | 1189 | { |
AzureIoTClient | 41:71c01aa3df1a | 1190 | LogError("get_max_message_size_for_batching failed"); |
AzureIoTClient | 41:71c01aa3df1a | 1191 | invoke_callback_on_error(caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING); |
AzureIoTClient | 41:71c01aa3df1a | 1192 | free(caller_info); |
AzureIoTClient | 41:71c01aa3df1a | 1193 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1194 | break; |
AzureIoTClient | 41:71c01aa3df1a | 1195 | } |
AzureIoTClient | 41:71c01aa3df1a | 1196 | else if ((send_pending_events_state.task == 0) && (create_send_pending_events_state(instance, &send_pending_events_state) != 0)) |
AzureIoTClient | 41:71c01aa3df1a | 1197 | { |
AzureIoTClient | 41:71c01aa3df1a | 1198 | LogError("create_send_pending_events_state failed"); |
AzureIoTClient | 41:71c01aa3df1a | 1199 | invoke_callback_on_error(caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING); |
AzureIoTClient | 41:71c01aa3df1a | 1200 | free(caller_info); |
AzureIoTClient | 41:71c01aa3df1a | 1201 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1202 | break; |
AzureIoTClient | 41:71c01aa3df1a | 1203 | } |
AzureIoTClient | 41:71c01aa3df1a | 1204 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_200: [Retrieve an AMQP encoded representation of this message for later appending to main batched message. On error, invoke callback but continue send loop; this is NOT a fatal error.] |
AzureIoTClient | 46:c688c75b63b9 | 1205 | else if (message_create_uamqp_encoding_from_iothub_message(send_pending_events_state.message_batch_container, caller_info->message->messageHandle, &body_binary_data) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1206 | { |
AzureIoTClient | 41:71c01aa3df1a | 1207 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_201: [If message_create_uamqp_encoding_from_iothub_message fails, invoke callback with TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_CANNOT_PARSE] |
AzureIoTClient | 41:71c01aa3df1a | 1208 | LogError("message_create_uamqp_encoding_from_iothub_message() failed. Will continue to try to process messages, result"); |
AzureIoTClient | 41:71c01aa3df1a | 1209 | invoke_callback_on_error(caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_CANNOT_PARSE); |
AzureIoTClient | 41:71c01aa3df1a | 1210 | free(caller_info); |
AzureIoTClient | 41:71c01aa3df1a | 1211 | continue; |
AzureIoTClient | 41:71c01aa3df1a | 1212 | } |
AzureIoTClient | 41:71c01aa3df1a | 1213 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_197: [If a single message is greater than our maximum AMQP send size, ignore the message. Invoke the callback but continue send loop; this is NOT a fatal error.] |
AzureIoTClient | 41:71c01aa3df1a | 1214 | else if (body_binary_data.length > (int)max_messagesize) |
AzureIoTClient | 41:71c01aa3df1a | 1215 | { |
AzureIoTClient | 41:71c01aa3df1a | 1216 | LogError("a single message will encode to be %d bytes, larger than max we will send the link %lld. Will continue to try to process messages", body_binary_data.length, max_messagesize); |
AzureIoTClient | 41:71c01aa3df1a | 1217 | invoke_callback_on_error(caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING); |
AzureIoTClient | 41:71c01aa3df1a | 1218 | free(caller_info); |
AzureIoTClient | 41:71c01aa3df1a | 1219 | continue; |
AzureIoTClient | 41:71c01aa3df1a | 1220 | } |
AzureIoTClient | 41:71c01aa3df1a | 1221 | else if (singlylinkedlist_add(send_pending_events_state.task->callback_list, (void*)caller_info) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1222 | { |
AzureIoTClient | 41:71c01aa3df1a | 1223 | LogError("singlylinkedlist_add failed"); |
AzureIoTClient | 41:71c01aa3df1a | 1224 | invoke_callback_on_error(caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING); |
AzureIoTClient | 41:71c01aa3df1a | 1225 | free(caller_info); |
AzureIoTClient | 41:71c01aa3df1a | 1226 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1227 | break; |
AzureIoTClient | 41:71c01aa3df1a | 1228 | } |
AzureIoTClient | 36:f78f9a56869e | 1229 | |
AzureIoTClient | 41:71c01aa3df1a | 1230 | // Once we've added the caller_info to the callback_list, don't directly 'invoke_callback_on_error' anymore directly. |
AzureIoTClient | 41:71c01aa3df1a | 1231 | // The task is responsible for running through its callers for callbacks, even for errors in this function. |
AzureIoTClient | 41:71c01aa3df1a | 1232 | // Similarly, responsibility for freeing this memory falls on the 'task' cleanup also. |
AzureIoTClient | 36:f78f9a56869e | 1233 | |
AzureIoTClient | 41:71c01aa3df1a | 1234 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_193: [If (length of current user AMQP message) + (length of user messages pending for this batched message) + (1KB reserve buffer) > maximum link send, send pending messages and create new batched message.] |
AzureIoTClient | 41:71c01aa3df1a | 1235 | if (body_binary_data.length + send_pending_events_state.bytes_pending > max_messagesize) |
AzureIoTClient | 41:71c01aa3df1a | 1236 | { |
AzureIoTClient | 41:71c01aa3df1a | 1237 | // If we tried to add the current message, we would overflow. Send what we've queued immediately. |
AzureIoTClient | 41:71c01aa3df1a | 1238 | if (send_batched_message_and_reset_state(instance, &send_pending_events_state) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1239 | { |
AzureIoTClient | 41:71c01aa3df1a | 1240 | LogError("send_batched_message_and_reset_state failed"); |
AzureIoTClient | 41:71c01aa3df1a | 1241 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1242 | break; |
AzureIoTClient | 41:71c01aa3df1a | 1243 | } |
AzureIoTClient | 36:f78f9a56869e | 1244 | |
AzureIoTClient | 41:71c01aa3df1a | 1245 | // Now that we've given off the task to uAMQP layer, allocate a new task. |
AzureIoTClient | 41:71c01aa3df1a | 1246 | if (create_send_pending_events_state(instance, &send_pending_events_state) != 0) |
AzureIoTClient | 41:71c01aa3df1a | 1247 | { |
AzureIoTClient | 41:71c01aa3df1a | 1248 | LogError("create_send_pending_events_state failed, result"); |
AzureIoTClient | 41:71c01aa3df1a | 1249 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1250 | break; |
AzureIoTClient | 41:71c01aa3df1a | 1251 | } |
AzureIoTClient | 41:71c01aa3df1a | 1252 | } |
AzureIoTClient | 36:f78f9a56869e | 1253 | |
AzureIoTClient | 41:71c01aa3df1a | 1254 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_195: [Append the current message's encoded data to the batched message tracked by uAMQP layer.] |
AzureIoTClient | 41:71c01aa3df1a | 1255 | if (message_add_body_amqp_data(send_pending_events_state.message_batch_container, body_binary_data) != 0) |
AzureIoTClient | 41:71c01aa3df1a | 1256 | { |
AzureIoTClient | 41:71c01aa3df1a | 1257 | LogError("message_add_body_amqp_data failed"); |
AzureIoTClient | 41:71c01aa3df1a | 1258 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1259 | break; |
AzureIoTClient | 41:71c01aa3df1a | 1260 | } |
AzureIoTClient | 36:f78f9a56869e | 1261 | |
AzureIoTClient | 41:71c01aa3df1a | 1262 | send_pending_events_state.bytes_pending += body_binary_data.length; |
AzureIoTClient | 41:71c01aa3df1a | 1263 | } |
AzureIoTClient | 41:71c01aa3df1a | 1264 | |
AzureIoTClient | 41:71c01aa3df1a | 1265 | if ((result == 0) && (send_pending_events_state.bytes_pending != 0)) |
AzureIoTClient | 41:71c01aa3df1a | 1266 | { |
AzureIoTClient | 41:71c01aa3df1a | 1267 | if (send_batched_message_and_reset_state(instance, &send_pending_events_state) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1268 | { |
AzureIoTClient | 41:71c01aa3df1a | 1269 | LogError("send_batched_message_and_reset_state failed"); |
AzureIoTClient | 41:71c01aa3df1a | 1270 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1271 | } |
AzureIoTClient | 41:71c01aa3df1a | 1272 | } |
AzureIoTClient | 36:f78f9a56869e | 1273 | |
AzureIoTClient | 41:71c01aa3df1a | 1274 | if (body_binary_data.bytes != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1275 | { |
AzureIoTClient | 41:71c01aa3df1a | 1276 | free((unsigned char*)body_binary_data.bytes); |
AzureIoTClient | 41:71c01aa3df1a | 1277 | } |
AzureIoTClient | 36:f78f9a56869e | 1278 | |
AzureIoTClient | 41:71c01aa3df1a | 1279 | // A non-NULL task indicates error, since otherwise send_batched_message_and_reset_state would've sent off messages and reset send_pending_events_state |
AzureIoTClient | 41:71c01aa3df1a | 1280 | if (send_pending_events_state.task != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1281 | { |
AzureIoTClient | 41:71c01aa3df1a | 1282 | singlylinkedlist_foreach(send_pending_events_state.task->callback_list, invoke_callback, (void*)TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING); |
AzureIoTClient | 41:71c01aa3df1a | 1283 | remove_event_from_in_progress_list(send_pending_events_state.task); |
AzureIoTClient | 41:71c01aa3df1a | 1284 | free_task(send_pending_events_state.task); |
AzureIoTClient | 41:71c01aa3df1a | 1285 | } |
AzureIoTClient | 41:71c01aa3df1a | 1286 | |
AzureIoTClient | 41:71c01aa3df1a | 1287 | if (send_pending_events_state.message_batch_container != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1288 | { |
AzureIoTClient | 41:71c01aa3df1a | 1289 | message_destroy(send_pending_events_state.message_batch_container); |
AzureIoTClient | 41:71c01aa3df1a | 1290 | } |
AzureIoTClient | 41:71c01aa3df1a | 1291 | |
AzureIoTClient | 41:71c01aa3df1a | 1292 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1293 | } |
AzureIoTClient | 36:f78f9a56869e | 1294 | |
AzureIoTClient | 36:f78f9a56869e | 1295 | // @brief |
AzureIoTClient | 36:f78f9a56869e | 1296 | // Goes through each task in in_progress_list and checks if the events timed out to be sent. |
AzureIoTClient | 36:f78f9a56869e | 1297 | // @remarks |
AzureIoTClient | 36:f78f9a56869e | 1298 | // If an event is timed out, it is marked as such but not removed, and the upper layer callback is invoked. |
AzureIoTClient | 36:f78f9a56869e | 1299 | // @returns |
AzureIoTClient | 36:f78f9a56869e | 1300 | // 0 if no failures occur, non-zero otherwise. |
AzureIoTClient | 36:f78f9a56869e | 1301 | static int process_event_send_timeouts(TELEMETRY_MESSENGER_INSTANCE* instance) |
AzureIoTClient | 36:f78f9a56869e | 1302 | { |
AzureIoTClient | 41:71c01aa3df1a | 1303 | int result = RESULT_OK; |
AzureIoTClient | 36:f78f9a56869e | 1304 | |
AzureIoTClient | 41:71c01aa3df1a | 1305 | if (instance->event_send_timeout_secs > 0) |
AzureIoTClient | 41:71c01aa3df1a | 1306 | { |
AzureIoTClient | 41:71c01aa3df1a | 1307 | LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(instance->in_progress_list); |
AzureIoTClient | 36:f78f9a56869e | 1308 | |
AzureIoTClient | 41:71c01aa3df1a | 1309 | while (list_item != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1310 | { |
AzureIoTClient | 41:71c01aa3df1a | 1311 | MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)singlylinkedlist_item_get_value(list_item); |
AzureIoTClient | 36:f78f9a56869e | 1312 | |
AzureIoTClient | 41:71c01aa3df1a | 1313 | if (task->is_timed_out == false) |
AzureIoTClient | 41:71c01aa3df1a | 1314 | { |
AzureIoTClient | 41:71c01aa3df1a | 1315 | int is_timed_out; |
AzureIoTClient | 36:f78f9a56869e | 1316 | |
AzureIoTClient | 41:71c01aa3df1a | 1317 | if (is_timeout_reached(task->send_time, instance->event_send_timeout_secs, &is_timed_out) == RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1318 | { |
AzureIoTClient | 41:71c01aa3df1a | 1319 | if (is_timed_out) |
AzureIoTClient | 41:71c01aa3df1a | 1320 | { |
AzureIoTClient | 41:71c01aa3df1a | 1321 | task->is_timed_out = true; |
AzureIoTClient | 41:71c01aa3df1a | 1322 | singlylinkedlist_foreach(task->callback_list, invoke_callback, (void*)TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_TIMEOUT); |
AzureIoTClient | 41:71c01aa3df1a | 1323 | } |
AzureIoTClient | 41:71c01aa3df1a | 1324 | } |
AzureIoTClient | 41:71c01aa3df1a | 1325 | else |
AzureIoTClient | 41:71c01aa3df1a | 1326 | { |
AzureIoTClient | 41:71c01aa3df1a | 1327 | LogError("messenger failed to evaluate event send timeout of event %d", task); |
AzureIoTClient | 41:71c01aa3df1a | 1328 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1329 | } |
AzureIoTClient | 41:71c01aa3df1a | 1330 | } |
AzureIoTClient | 36:f78f9a56869e | 1331 | |
AzureIoTClient | 41:71c01aa3df1a | 1332 | list_item = singlylinkedlist_get_next_item(list_item); |
AzureIoTClient | 41:71c01aa3df1a | 1333 | } |
AzureIoTClient | 41:71c01aa3df1a | 1334 | } |
AzureIoTClient | 36:f78f9a56869e | 1335 | |
AzureIoTClient | 41:71c01aa3df1a | 1336 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1337 | } |
AzureIoTClient | 36:f78f9a56869e | 1338 | |
AzureIoTClient | 36:f78f9a56869e | 1339 | // @brief |
AzureIoTClient | 36:f78f9a56869e | 1340 | // Removes all the timed out events from the in_progress_list, without invoking callbacks or detroying the messages. |
AzureIoTClient | 36:f78f9a56869e | 1341 | static void remove_timed_out_events(TELEMETRY_MESSENGER_INSTANCE* instance) |
AzureIoTClient | 36:f78f9a56869e | 1342 | { |
AzureIoTClient | 41:71c01aa3df1a | 1343 | LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(instance->in_progress_list); |
AzureIoTClient | 36:f78f9a56869e | 1344 | |
AzureIoTClient | 41:71c01aa3df1a | 1345 | while (list_item != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1346 | { |
AzureIoTClient | 41:71c01aa3df1a | 1347 | MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)singlylinkedlist_item_get_value(list_item); |
AzureIoTClient | 36:f78f9a56869e | 1348 | |
AzureIoTClient | 41:71c01aa3df1a | 1349 | if (task->is_timed_out == true) |
AzureIoTClient | 41:71c01aa3df1a | 1350 | { |
AzureIoTClient | 41:71c01aa3df1a | 1351 | remove_event_from_in_progress_list(task); |
AzureIoTClient | 36:f78f9a56869e | 1352 | |
AzureIoTClient | 41:71c01aa3df1a | 1353 | free_task(task); |
AzureIoTClient | 41:71c01aa3df1a | 1354 | } |
AzureIoTClient | 36:f78f9a56869e | 1355 | |
AzureIoTClient | 41:71c01aa3df1a | 1356 | list_item = singlylinkedlist_get_next_item(list_item); |
AzureIoTClient | 41:71c01aa3df1a | 1357 | } |
AzureIoTClient | 36:f78f9a56869e | 1358 | } |
AzureIoTClient | 36:f78f9a56869e | 1359 | |
AzureIoTClient | 36:f78f9a56869e | 1360 | |
AzureIoTClient | 36:f78f9a56869e | 1361 | // ---------- Set/Retrieve Options Helpers ----------// |
AzureIoTClient | 36:f78f9a56869e | 1362 | |
AzureIoTClient | 36:f78f9a56869e | 1363 | static void* telemetry_messenger_clone_option(const char* name, const void* value) |
AzureIoTClient | 36:f78f9a56869e | 1364 | { |
AzureIoTClient | 41:71c01aa3df1a | 1365 | void* result; |
AzureIoTClient | 36:f78f9a56869e | 1366 | |
AzureIoTClient | 41:71c01aa3df1a | 1367 | if (name == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1368 | { |
AzureIoTClient | 41:71c01aa3df1a | 1369 | LogError("Failed to clone messenger option (name is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1370 | result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 1371 | } |
AzureIoTClient | 41:71c01aa3df1a | 1372 | else if (value == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1373 | { |
AzureIoTClient | 41:71c01aa3df1a | 1374 | LogError("Failed to clone messenger option (value is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1375 | result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 1376 | } |
AzureIoTClient | 41:71c01aa3df1a | 1377 | else |
AzureIoTClient | 41:71c01aa3df1a | 1378 | { |
AzureIoTClient | 51:269e65571b39 | 1379 | if (strcmp(TELEMETRY_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, name) == 0 || |
AzureIoTClient | 51:269e65571b39 | 1380 | strcmp(TELEMETRY_MESSENGER_OPTION_SAVED_OPTIONS, name) == 0) |
AzureIoTClient | 41:71c01aa3df1a | 1381 | { |
AzureIoTClient | 41:71c01aa3df1a | 1382 | result = (void*)value; |
AzureIoTClient | 41:71c01aa3df1a | 1383 | } |
AzureIoTClient | 41:71c01aa3df1a | 1384 | else |
AzureIoTClient | 41:71c01aa3df1a | 1385 | { |
AzureIoTClient | 41:71c01aa3df1a | 1386 | LogError("Failed to clone messenger option (option with name '%s' is not suppported)", name); |
AzureIoTClient | 41:71c01aa3df1a | 1387 | result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 1388 | } |
AzureIoTClient | 41:71c01aa3df1a | 1389 | } |
AzureIoTClient | 36:f78f9a56869e | 1390 | |
AzureIoTClient | 41:71c01aa3df1a | 1391 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1392 | } |
AzureIoTClient | 36:f78f9a56869e | 1393 | |
// @brief
//     Destroy callback for the messenger options.
// @remarks
//     Only validates its arguments (logging when either is NULL); nothing is released for any option.
static void telemetry_messenger_destroy_option(const char* name, const void* value)
{
    if (name == NULL)
    {
        LogError("Failed to destroy messenger option (name is NULL)");
        return;
    }

    if (value == NULL)
    {
        LogError("Failed to destroy messenger option (value is NULL)");
        return;
    }

    // Nothing to be done for the supported options.
}
AzureIoTClient | 36:f78f9a56869e | 1409 | |
AzureIoTClient | 36:f78f9a56869e | 1410 | |
AzureIoTClient | 36:f78f9a56869e | 1411 | // Public API: |
AzureIoTClient | 36:f78f9a56869e | 1412 | |
AzureIoTClient | 36:f78f9a56869e | 1413 | int telemetry_messenger_subscribe_for_messages(TELEMETRY_MESSENGER_HANDLE messenger_handle, ON_TELEMETRY_MESSENGER_MESSAGE_RECEIVED on_message_received_callback, void* context) |
AzureIoTClient | 36:f78f9a56869e | 1414 | { |
AzureIoTClient | 41:71c01aa3df1a | 1415 | int result; |
AzureIoTClient | 36:f78f9a56869e | 1416 | |
AzureIoTClient | 41:71c01aa3df1a | 1417 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_016: [If `messenger_handle` is NULL, telemetry_messenger_subscribe_for_messages() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1418 | if (messenger_handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1419 | { |
AzureIoTClient | 41:71c01aa3df1a | 1420 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1421 | LogError("telemetry_messenger_subscribe_for_messages failed (messenger_handle is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1422 | } |
AzureIoTClient | 41:71c01aa3df1a | 1423 | else |
AzureIoTClient | 41:71c01aa3df1a | 1424 | { |
AzureIoTClient | 41:71c01aa3df1a | 1425 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 36:f78f9a56869e | 1426 | |
AzureIoTClient | 41:71c01aa3df1a | 1427 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_017: [If `instance->receive_messages` is already true, telemetry_messenger_subscribe_for_messages() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1428 | if (instance->receive_messages) |
AzureIoTClient | 41:71c01aa3df1a | 1429 | { |
AzureIoTClient | 41:71c01aa3df1a | 1430 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1431 | LogError("telemetry_messenger_subscribe_for_messages failed (messenger already subscribed)"); |
AzureIoTClient | 41:71c01aa3df1a | 1432 | } |
AzureIoTClient | 41:71c01aa3df1a | 1433 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_018: [If `on_message_received_callback` is NULL, telemetry_messenger_subscribe_for_messages() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1434 | else if (on_message_received_callback == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1435 | { |
AzureIoTClient | 41:71c01aa3df1a | 1436 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1437 | LogError("telemetry_messenger_subscribe_for_messages failed (on_message_received_callback is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1438 | } |
AzureIoTClient | 41:71c01aa3df1a | 1439 | else |
AzureIoTClient | 41:71c01aa3df1a | 1440 | { |
AzureIoTClient | 41:71c01aa3df1a | 1441 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_019: [`on_message_received_callback` shall be saved on `instance->on_message_received_callback`] |
AzureIoTClient | 41:71c01aa3df1a | 1442 | instance->on_message_received_callback = on_message_received_callback; |
AzureIoTClient | 36:f78f9a56869e | 1443 | |
AzureIoTClient | 41:71c01aa3df1a | 1444 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_020: [`context` shall be saved on `instance->on_message_received_context`] |
AzureIoTClient | 41:71c01aa3df1a | 1445 | instance->on_message_received_context = context; |
AzureIoTClient | 36:f78f9a56869e | 1446 | |
AzureIoTClient | 41:71c01aa3df1a | 1447 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_021: [telemetry_messenger_subscribe_for_messages() shall set `instance->receive_messages` to true] |
AzureIoTClient | 41:71c01aa3df1a | 1448 | instance->receive_messages = true; |
AzureIoTClient | 36:f78f9a56869e | 1449 | |
AzureIoTClient | 41:71c01aa3df1a | 1450 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_022: [If no failures occurr, telemetry_messenger_subscribe_for_messages() shall return 0] |
AzureIoTClient | 41:71c01aa3df1a | 1451 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 1452 | } |
AzureIoTClient | 41:71c01aa3df1a | 1453 | } |
AzureIoTClient | 36:f78f9a56869e | 1454 | |
AzureIoTClient | 41:71c01aa3df1a | 1455 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1456 | } |
AzureIoTClient | 36:f78f9a56869e | 1457 | |
AzureIoTClient | 36:f78f9a56869e | 1458 | int telemetry_messenger_unsubscribe_for_messages(TELEMETRY_MESSENGER_HANDLE messenger_handle) |
AzureIoTClient | 36:f78f9a56869e | 1459 | { |
AzureIoTClient | 41:71c01aa3df1a | 1460 | int result; |
AzureIoTClient | 36:f78f9a56869e | 1461 | |
AzureIoTClient | 41:71c01aa3df1a | 1462 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_023: [If `messenger_handle` is NULL, telemetry_messenger_unsubscribe_for_messages() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1463 | if (messenger_handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1464 | { |
AzureIoTClient | 41:71c01aa3df1a | 1465 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1466 | LogError("telemetry_messenger_unsubscribe_for_messages failed (messenger_handle is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1467 | } |
AzureIoTClient | 41:71c01aa3df1a | 1468 | else |
AzureIoTClient | 41:71c01aa3df1a | 1469 | { |
AzureIoTClient | 41:71c01aa3df1a | 1470 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 36:f78f9a56869e | 1471 | |
AzureIoTClient | 41:71c01aa3df1a | 1472 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_024: [If `instance->receive_messages` is already false, telemetry_messenger_unsubscribe_for_messages() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1473 | if (instance->receive_messages == false) |
AzureIoTClient | 41:71c01aa3df1a | 1474 | { |
AzureIoTClient | 41:71c01aa3df1a | 1475 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1476 | LogError("telemetry_messenger_unsubscribe_for_messages failed (messenger is not subscribed)"); |
AzureIoTClient | 41:71c01aa3df1a | 1477 | } |
AzureIoTClient | 41:71c01aa3df1a | 1478 | else |
AzureIoTClient | 41:71c01aa3df1a | 1479 | { |
AzureIoTClient | 41:71c01aa3df1a | 1480 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_025: [telemetry_messenger_unsubscribe_for_messages() shall set `instance->receive_messages` to false] |
AzureIoTClient | 41:71c01aa3df1a | 1481 | instance->receive_messages = false; |
AzureIoTClient | 41:71c01aa3df1a | 1482 | |
AzureIoTClient | 41:71c01aa3df1a | 1483 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_026: [telemetry_messenger_unsubscribe_for_messages() shall set `instance->on_message_received_callback` to NULL] |
AzureIoTClient | 41:71c01aa3df1a | 1484 | instance->on_message_received_callback = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 1485 | |
AzureIoTClient | 41:71c01aa3df1a | 1486 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_027: [telemetry_messenger_unsubscribe_for_messages() shall set `instance->on_message_received_context` to NULL] |
AzureIoTClient | 41:71c01aa3df1a | 1487 | instance->on_message_received_context = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 1488 | |
AzureIoTClient | 41:71c01aa3df1a | 1489 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_028: [If no failures occurr, telemetry_messenger_unsubscribe_for_messages() shall return 0] |
AzureIoTClient | 41:71c01aa3df1a | 1490 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 1491 | } |
AzureIoTClient | 41:71c01aa3df1a | 1492 | } |
AzureIoTClient | 36:f78f9a56869e | 1493 | |
AzureIoTClient | 41:71c01aa3df1a | 1494 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1495 | } |
AzureIoTClient | 36:f78f9a56869e | 1496 | |
AzureIoTClient | 36:f78f9a56869e | 1497 | int telemetry_messenger_send_message_disposition(TELEMETRY_MESSENGER_HANDLE messenger_handle, TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info, TELEMETRY_MESSENGER_DISPOSITION_RESULT disposition_result) |
AzureIoTClient | 36:f78f9a56869e | 1498 | { |
AzureIoTClient | 41:71c01aa3df1a | 1499 | int result; |
AzureIoTClient | 36:f78f9a56869e | 1500 | |
AzureIoTClient | 41:71c01aa3df1a | 1501 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_179: [If `messenger_handle` or `disposition_info` are NULL, telemetry_messenger_send_message_disposition() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1502 | if (messenger_handle == NULL || disposition_info == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1503 | { |
AzureIoTClient | 41:71c01aa3df1a | 1504 | LogError("Failed sending message disposition (either messenger_handle (%p) or disposition_info (%p) are NULL)", messenger_handle, disposition_info); |
AzureIoTClient | 41:71c01aa3df1a | 1505 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1506 | } |
AzureIoTClient | 41:71c01aa3df1a | 1507 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_180: [If `disposition_info->source` is NULL, telemetry_messenger_send_message_disposition() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1508 | else if (disposition_info->source == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1509 | { |
AzureIoTClient | 41:71c01aa3df1a | 1510 | LogError("Failed sending message disposition (disposition_info->source is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1511 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1512 | } |
AzureIoTClient | 41:71c01aa3df1a | 1513 | else |
AzureIoTClient | 41:71c01aa3df1a | 1514 | { |
AzureIoTClient | 41:71c01aa3df1a | 1515 | TELEMETRY_MESSENGER_INSTANCE* messenger = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 36:f78f9a56869e | 1516 | |
AzureIoTClient | 41:71c01aa3df1a | 1517 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_189: [If `messenger_handle->message_receiver` is NULL, telemetry_messenger_send_message_disposition() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1518 | if (messenger->message_receiver == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1519 | { |
AzureIoTClient | 41:71c01aa3df1a | 1520 | LogError("Failed sending message disposition (message_receiver is not created; check if it is subscribed)"); |
AzureIoTClient | 41:71c01aa3df1a | 1521 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1522 | } |
AzureIoTClient | 41:71c01aa3df1a | 1523 | else |
AzureIoTClient | 41:71c01aa3df1a | 1524 | { |
AzureIoTClient | 41:71c01aa3df1a | 1525 | AMQP_VALUE uamqp_disposition_result; |
AzureIoTClient | 36:f78f9a56869e | 1526 | |
AzureIoTClient | 41:71c01aa3df1a | 1527 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_181: [An AMQP_VALUE disposition result shall be created corresponding to the `disposition_result` provided] |
AzureIoTClient | 41:71c01aa3df1a | 1528 | if ((uamqp_disposition_result = create_uamqp_disposition_result_from(disposition_result)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1529 | { |
AzureIoTClient | 41:71c01aa3df1a | 1530 | LogError("Failed sending message disposition (disposition result %d is not supported)", disposition_result); |
AzureIoTClient | 41:71c01aa3df1a | 1531 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1532 | } |
AzureIoTClient | 41:71c01aa3df1a | 1533 | else |
AzureIoTClient | 41:71c01aa3df1a | 1534 | { |
AzureIoTClient | 41:71c01aa3df1a | 1535 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_182: [`messagereceiver_send_message_disposition()` shall be invoked passing `disposition_info->source`, `disposition_info->message_id` and the corresponding AMQP_VALUE disposition result] |
AzureIoTClient | 41:71c01aa3df1a | 1536 | if (messagereceiver_send_message_disposition(messenger->message_receiver, disposition_info->source, disposition_info->message_id, uamqp_disposition_result) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1537 | { |
AzureIoTClient | 41:71c01aa3df1a | 1538 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_183: [If `messagereceiver_send_message_disposition()` fails, telemetry_messenger_send_message_disposition() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1539 | LogError("Failed sending message disposition (messagereceiver_send_message_disposition failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 1540 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1541 | } |
AzureIoTClient | 41:71c01aa3df1a | 1542 | else |
AzureIoTClient | 41:71c01aa3df1a | 1543 | { |
AzureIoTClient | 41:71c01aa3df1a | 1544 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_185: [If no failures occurr, telemetry_messenger_send_message_disposition() shall return 0] |
AzureIoTClient | 41:71c01aa3df1a | 1545 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 1546 | } |
AzureIoTClient | 36:f78f9a56869e | 1547 | |
AzureIoTClient | 41:71c01aa3df1a | 1548 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_184: [telemetry_messenger_send_message_disposition() shall destroy the AMQP_VALUE disposition result] |
AzureIoTClient | 41:71c01aa3df1a | 1549 | amqpvalue_destroy(uamqp_disposition_result); |
AzureIoTClient | 41:71c01aa3df1a | 1550 | } |
AzureIoTClient | 41:71c01aa3df1a | 1551 | } |
AzureIoTClient | 41:71c01aa3df1a | 1552 | } |
AzureIoTClient | 36:f78f9a56869e | 1553 | |
AzureIoTClient | 41:71c01aa3df1a | 1554 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1555 | } |
AzureIoTClient | 36:f78f9a56869e | 1556 | |
AzureIoTClient | 36:f78f9a56869e | 1557 | int telemetry_messenger_send_async(TELEMETRY_MESSENGER_HANDLE messenger_handle, IOTHUB_MESSAGE_LIST* message, ON_TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE on_messenger_event_send_complete_callback, void* context) |
AzureIoTClient | 36:f78f9a56869e | 1558 | { |
AzureIoTClient | 41:71c01aa3df1a | 1559 | int result; |
AzureIoTClient | 36:f78f9a56869e | 1560 | |
AzureIoTClient | 41:71c01aa3df1a | 1561 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_134: [If `messenger_handle` is NULL, telemetry_messenger_send_async() shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 1562 | if (messenger_handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1563 | { |
AzureIoTClient | 41:71c01aa3df1a | 1564 | LogError("Failed sending event (messenger_handle is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1565 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1566 | } |
AzureIoTClient | 41:71c01aa3df1a | 1567 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_135: [If `message` is NULL, telemetry_messenger_send_async() shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 1568 | else if (message == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1569 | { |
AzureIoTClient | 41:71c01aa3df1a | 1570 | LogError("Failed sending event (message is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1571 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1572 | } |
AzureIoTClient | 41:71c01aa3df1a | 1573 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_136: [If `on_event_send_complete_callback` is NULL, telemetry_messenger_send_async() shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 1574 | else if (on_messenger_event_send_complete_callback == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1575 | { |
AzureIoTClient | 41:71c01aa3df1a | 1576 | LogError("Failed sending event (on_event_send_complete_callback is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1577 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1578 | } |
AzureIoTClient | 41:71c01aa3df1a | 1579 | else |
AzureIoTClient | 41:71c01aa3df1a | 1580 | { |
AzureIoTClient | 41:71c01aa3df1a | 1581 | MESSENGER_SEND_EVENT_CALLER_INFORMATION *caller_info; |
AzureIoTClient | 41:71c01aa3df1a | 1582 | TELEMETRY_MESSENGER_INSTANCE *instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 36:f78f9a56869e | 1583 | |
AzureIoTClient | 41:71c01aa3df1a | 1584 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_137: [telemetry_messenger_send_async() shall allocate memory for a MESSENGER_SEND_EVENT_CALLER_INFORMATION structure] |
AzureIoTClient | 41:71c01aa3df1a | 1585 | if ((caller_info = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)malloc(sizeof(MESSENGER_SEND_EVENT_CALLER_INFORMATION))) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1586 | { |
AzureIoTClient | 41:71c01aa3df1a | 1587 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_138: [If malloc() fails, telemetry_messenger_send_async() shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 1588 | LogError("Failed sending event (failed to create struct for task; malloc failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 1589 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1590 | } |
AzureIoTClient | 41:71c01aa3df1a | 1591 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_100: [`task` shall be added to `instance->waiting_to_send` using singlylinkedlist_add()] |
AzureIoTClient | 41:71c01aa3df1a | 1592 | else if (singlylinkedlist_add(instance->waiting_to_send, caller_info) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1593 | { |
AzureIoTClient | 41:71c01aa3df1a | 1594 | LogError("Failed sending event (singlylinkedlist_add failed)"); |
AzureIoTClient | 36:f78f9a56869e | 1595 | |
AzureIoTClient | 41:71c01aa3df1a | 1596 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_139: [If singlylinkedlist_add() fails, telemetry_messenger_send_async() shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 1597 | result = __FAILURE__; |
AzureIoTClient | 36:f78f9a56869e | 1598 | |
AzureIoTClient | 41:71c01aa3df1a | 1599 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_142: [If any failure occurs, telemetry_messenger_send_async() shall free any memory it has allocated] |
AzureIoTClient | 41:71c01aa3df1a | 1600 | free(caller_info); |
AzureIoTClient | 41:71c01aa3df1a | 1601 | } |
AzureIoTClient | 41:71c01aa3df1a | 1602 | else |
AzureIoTClient | 41:71c01aa3df1a | 1603 | { |
AzureIoTClient | 41:71c01aa3df1a | 1604 | memset(caller_info, 0, sizeof(MESSENGER_SEND_EVENT_CALLER_INFORMATION)); |
AzureIoTClient | 41:71c01aa3df1a | 1605 | caller_info->message = message; |
AzureIoTClient | 41:71c01aa3df1a | 1606 | caller_info->on_event_send_complete_callback = on_messenger_event_send_complete_callback; |
AzureIoTClient | 41:71c01aa3df1a | 1607 | caller_info->context = context; |
AzureIoTClient | 41:71c01aa3df1a | 1608 | |
AzureIoTClient | 41:71c01aa3df1a | 1609 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_143: [If no failures occur, telemetry_messenger_send_async() shall return zero] |
AzureIoTClient | 41:71c01aa3df1a | 1610 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 1611 | } |
AzureIoTClient | 41:71c01aa3df1a | 1612 | } |
AzureIoTClient | 36:f78f9a56869e | 1613 | |
AzureIoTClient | 41:71c01aa3df1a | 1614 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1615 | } |
AzureIoTClient | 36:f78f9a56869e | 1616 | |
AzureIoTClient | 36:f78f9a56869e | 1617 | int telemetry_messenger_get_send_status(TELEMETRY_MESSENGER_HANDLE messenger_handle, TELEMETRY_MESSENGER_SEND_STATUS* send_status) |
AzureIoTClient | 36:f78f9a56869e | 1618 | { |
AzureIoTClient | 41:71c01aa3df1a | 1619 | int result; |
AzureIoTClient | 36:f78f9a56869e | 1620 | |
AzureIoTClient | 41:71c01aa3df1a | 1621 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_144: [If `messenger_handle` is NULL, telemetry_messenger_get_send_status() shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 1622 | if (messenger_handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1623 | { |
AzureIoTClient | 41:71c01aa3df1a | 1624 | LogError("telemetry_messenger_get_send_status failed (messenger_handle is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1625 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1626 | } |
AzureIoTClient | 41:71c01aa3df1a | 1627 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_145: [If `send_status` is NULL, telemetry_messenger_get_send_status() shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 1628 | else if (send_status == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1629 | { |
AzureIoTClient | 41:71c01aa3df1a | 1630 | LogError("telemetry_messenger_get_send_status failed (send_status is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1631 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1632 | } |
AzureIoTClient | 41:71c01aa3df1a | 1633 | else |
AzureIoTClient | 41:71c01aa3df1a | 1634 | { |
AzureIoTClient | 41:71c01aa3df1a | 1635 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 41:71c01aa3df1a | 1636 | LIST_ITEM_HANDLE wts_list_head = singlylinkedlist_get_head_item(instance->waiting_to_send); |
AzureIoTClient | 41:71c01aa3df1a | 1637 | LIST_ITEM_HANDLE ip_list_head = singlylinkedlist_get_head_item(instance->in_progress_list); |
AzureIoTClient | 36:f78f9a56869e | 1638 | |
AzureIoTClient | 41:71c01aa3df1a | 1639 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_147: [If `instance->in_progress_list` and `instance->wait_to_send_list` are empty, send_status shall be set to TELEMETRY_MESSENGER_SEND_STATUS_IDLE] |
AzureIoTClient | 41:71c01aa3df1a | 1640 | if (wts_list_head == NULL && ip_list_head == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1641 | { |
AzureIoTClient | 41:71c01aa3df1a | 1642 | *send_status = TELEMETRY_MESSENGER_SEND_STATUS_IDLE; |
AzureIoTClient | 41:71c01aa3df1a | 1643 | } |
AzureIoTClient | 41:71c01aa3df1a | 1644 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_148: [Otherwise, send_status shall be set to TELEMETRY_MESSENGER_SEND_STATUS_BUSY] |
AzureIoTClient | 41:71c01aa3df1a | 1645 | else |
AzureIoTClient | 41:71c01aa3df1a | 1646 | { |
AzureIoTClient | 41:71c01aa3df1a | 1647 | *send_status = TELEMETRY_MESSENGER_SEND_STATUS_BUSY; |
AzureIoTClient | 41:71c01aa3df1a | 1648 | } |
AzureIoTClient | 36:f78f9a56869e | 1649 | |
AzureIoTClient | 41:71c01aa3df1a | 1650 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_149: [If no failures occur, telemetry_messenger_get_send_status() shall return 0] |
AzureIoTClient | 41:71c01aa3df1a | 1651 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 1652 | } |
AzureIoTClient | 36:f78f9a56869e | 1653 | |
AzureIoTClient | 41:71c01aa3df1a | 1654 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1655 | } |
AzureIoTClient | 36:f78f9a56869e | 1656 | |
AzureIoTClient | 36:f78f9a56869e | 1657 | int telemetry_messenger_start(TELEMETRY_MESSENGER_HANDLE messenger_handle, SESSION_HANDLE session_handle) |
AzureIoTClient | 36:f78f9a56869e | 1658 | { |
AzureIoTClient | 41:71c01aa3df1a | 1659 | int result; |
AzureIoTClient | 36:f78f9a56869e | 1660 | |
AzureIoTClient | 41:71c01aa3df1a | 1661 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_029: [If `messenger_handle` is NULL, telemetry_messenger_start() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1662 | if (messenger_handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1663 | { |
AzureIoTClient | 41:71c01aa3df1a | 1664 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1665 | LogError("telemetry_messenger_start failed (messenger_handle is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1666 | } |
AzureIoTClient | 41:71c01aa3df1a | 1667 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_030: [If `session_handle` is NULL, telemetry_messenger_start() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1668 | else if (session_handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1669 | { |
AzureIoTClient | 41:71c01aa3df1a | 1670 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1671 | LogError("telemetry_messenger_start failed (session_handle is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1672 | } |
AzureIoTClient | 41:71c01aa3df1a | 1673 | else |
AzureIoTClient | 41:71c01aa3df1a | 1674 | { |
AzureIoTClient | 41:71c01aa3df1a | 1675 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 36:f78f9a56869e | 1676 | |
AzureIoTClient | 41:71c01aa3df1a | 1677 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_031: [If `instance->state` is not TELEMETRY_MESSENGER_STATE_STOPPED, telemetry_messenger_start() shall fail and return __FAILURE__] |
AzureIoTClient | 41:71c01aa3df1a | 1678 | if (instance->state != TELEMETRY_MESSENGER_STATE_STOPPED) |
AzureIoTClient | 41:71c01aa3df1a | 1679 | { |
AzureIoTClient | 41:71c01aa3df1a | 1680 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1681 | LogError("telemetry_messenger_start failed (current state is %d; expected TELEMETRY_MESSENGER_STATE_STOPPED)", instance->state); |
AzureIoTClient | 41:71c01aa3df1a | 1682 | } |
AzureIoTClient | 41:71c01aa3df1a | 1683 | else |
AzureIoTClient | 41:71c01aa3df1a | 1684 | { |
AzureIoTClient | 41:71c01aa3df1a | 1685 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_032: [`session_handle` shall be saved on `instance->session_handle`] |
AzureIoTClient | 41:71c01aa3df1a | 1686 | instance->session_handle = session_handle; |
AzureIoTClient | 36:f78f9a56869e | 1687 | |
AzureIoTClient | 41:71c01aa3df1a | 1688 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_115: [If no failures occurr, `instance->state` shall be set to TELEMETRY_MESSENGER_STATE_STARTING, and `instance->on_state_changed_callback` invoked if provided] |
AzureIoTClient | 41:71c01aa3df1a | 1689 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_STARTING); |
AzureIoTClient | 36:f78f9a56869e | 1690 | |
AzureIoTClient | 41:71c01aa3df1a | 1691 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_056: [If no failures occurr, telemetry_messenger_start() shall return 0] |
AzureIoTClient | 41:71c01aa3df1a | 1692 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 1693 | } |
AzureIoTClient | 41:71c01aa3df1a | 1694 | } |
AzureIoTClient | 36:f78f9a56869e | 1695 | |
AzureIoTClient | 41:71c01aa3df1a | 1696 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1697 | } |
AzureIoTClient | 36:f78f9a56869e | 1698 | |
AzureIoTClient | 36:f78f9a56869e | 1699 | int telemetry_messenger_stop(TELEMETRY_MESSENGER_HANDLE messenger_handle) |
AzureIoTClient | 36:f78f9a56869e | 1700 | { |
AzureIoTClient | 41:71c01aa3df1a | 1701 | int result; |
AzureIoTClient | 36:f78f9a56869e | 1702 | |
AzureIoTClient | 41:71c01aa3df1a | 1703 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_057: [If `messenger_handle` is NULL, telemetry_messenger_stop() shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 1704 | if (messenger_handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1705 | { |
AzureIoTClient | 41:71c01aa3df1a | 1706 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1707 | LogError("telemetry_messenger_stop failed (messenger_handle is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1708 | } |
AzureIoTClient | 41:71c01aa3df1a | 1709 | else |
AzureIoTClient | 41:71c01aa3df1a | 1710 | { |
AzureIoTClient | 41:71c01aa3df1a | 1711 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 36:f78f9a56869e | 1712 | |
AzureIoTClient | 41:71c01aa3df1a | 1713 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_058: [If `instance->state` is TELEMETRY_MESSENGER_STATE_STOPPED, telemetry_messenger_stop() shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 1714 | if (instance->state == TELEMETRY_MESSENGER_STATE_STOPPED) |
AzureIoTClient | 41:71c01aa3df1a | 1715 | { |
AzureIoTClient | 41:71c01aa3df1a | 1716 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1717 | LogError("telemetry_messenger_stop failed (messenger is already stopped)"); |
AzureIoTClient | 41:71c01aa3df1a | 1718 | } |
AzureIoTClient | 41:71c01aa3df1a | 1719 | else |
AzureIoTClient | 41:71c01aa3df1a | 1720 | { |
AzureIoTClient | 41:71c01aa3df1a | 1721 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_STOPPING); |
AzureIoTClient | 36:f78f9a56869e | 1722 | |
AzureIoTClient | 41:71c01aa3df1a | 1723 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_152: [telemetry_messenger_stop() shall close and destroy `instance->message_sender` and `instance->message_receiver`] |
AzureIoTClient | 41:71c01aa3df1a | 1724 | destroy_event_sender(instance); |
AzureIoTClient | 41:71c01aa3df1a | 1725 | destroy_message_receiver(instance); |
AzureIoTClient | 36:f78f9a56869e | 1726 | |
AzureIoTClient | 41:71c01aa3df1a | 1727 | remove_timed_out_events(instance); |
AzureIoTClient | 36:f78f9a56869e | 1728 | |
AzureIoTClient | 41:71c01aa3df1a | 1729 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_162: [telemetry_messenger_stop() shall move all items from `instance->in_progress_list` to the beginning of `instance->wait_to_send_list`] |
AzureIoTClient | 41:71c01aa3df1a | 1730 | if (move_events_to_wait_to_send_list(instance) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1731 | { |
AzureIoTClient | 41:71c01aa3df1a | 1732 | LogError("Messenger failed to move events in progress back to wait_to_send list"); |
AzureIoTClient | 41:71c01aa3df1a | 1733 | |
AzureIoTClient | 41:71c01aa3df1a | 1734 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_163: [If not all items from `instance->in_progress_list` can be moved back to `instance->wait_to_send_list`, `instance->state` shall be set to TELEMETRY_MESSENGER_STATE_ERROR, and `instance->on_state_changed_callback` invoked] |
AzureIoTClient | 41:71c01aa3df1a | 1735 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1736 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 1737 | } |
AzureIoTClient | 41:71c01aa3df1a | 1738 | else |
AzureIoTClient | 41:71c01aa3df1a | 1739 | { |
AzureIoTClient | 41:71c01aa3df1a | 1740 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_164: [If all items get successfuly moved back to `instance->wait_to_send_list`, `instance->state` shall be set to TELEMETRY_MESSENGER_STATE_STOPPED, and `instance->on_state_changed_callback` invoked] |
AzureIoTClient | 41:71c01aa3df1a | 1741 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_STOPPED); |
AzureIoTClient | 41:71c01aa3df1a | 1742 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 1743 | } |
AzureIoTClient | 41:71c01aa3df1a | 1744 | } |
AzureIoTClient | 41:71c01aa3df1a | 1745 | } |
AzureIoTClient | 36:f78f9a56869e | 1746 | |
AzureIoTClient | 41:71c01aa3df1a | 1747 | return result; |
AzureIoTClient | 36:f78f9a56869e | 1748 | } |
AzureIoTClient | 36:f78f9a56869e | 1749 | |
AzureIoTClient | 36:f78f9a56869e | 1750 | // @brief |
AzureIoTClient | 36:f78f9a56869e | 1751 | // Sets the messenger module state based on the state changes from messagesender and messagereceiver |
AzureIoTClient | 36:f78f9a56869e | 1752 | static void process_state_changes(TELEMETRY_MESSENGER_INSTANCE* instance) |
AzureIoTClient | 36:f78f9a56869e | 1753 | { |
AzureIoTClient | 41:71c01aa3df1a | 1754 | // Note: messagesender and messagereceiver are still not created or already destroyed |
AzureIoTClient | 41:71c01aa3df1a | 1755 | // when state is TELEMETRY_MESSENGER_STATE_STOPPED, so no checking is needed there. |
AzureIoTClient | 36:f78f9a56869e | 1756 | |
AzureIoTClient | 41:71c01aa3df1a | 1757 | if (instance->state == TELEMETRY_MESSENGER_STATE_STARTED) |
AzureIoTClient | 41:71c01aa3df1a | 1758 | { |
AzureIoTClient | 41:71c01aa3df1a | 1759 | if (instance->message_sender_current_state != MESSAGE_SENDER_STATE_OPEN) |
AzureIoTClient | 41:71c01aa3df1a | 1760 | { |
AzureIoTClient | 41:71c01aa3df1a | 1761 | LogError("messagesender reported unexpected state %d while messenger was started", instance->message_sender_current_state); |
AzureIoTClient | 41:71c01aa3df1a | 1762 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1763 | } |
AzureIoTClient | 41:71c01aa3df1a | 1764 | else if (instance->message_receiver != NULL && instance->message_receiver_current_state != MESSAGE_RECEIVER_STATE_OPEN) |
AzureIoTClient | 41:71c01aa3df1a | 1765 | { |
AzureIoTClient | 41:71c01aa3df1a | 1766 | if (instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_OPENING) |
AzureIoTClient | 41:71c01aa3df1a | 1767 | { |
AzureIoTClient | 41:71c01aa3df1a | 1768 | int is_timed_out; |
AzureIoTClient | 41:71c01aa3df1a | 1769 | if (is_timeout_reached(instance->last_message_receiver_state_change_time, MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS, &is_timed_out) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1770 | { |
AzureIoTClient | 41:71c01aa3df1a | 1771 | LogError("messenger got an error (failed to verify messagereceiver start timeout)"); |
AzureIoTClient | 41:71c01aa3df1a | 1772 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1773 | } |
AzureIoTClient | 41:71c01aa3df1a | 1774 | else if (is_timed_out == 1) |
AzureIoTClient | 41:71c01aa3df1a | 1775 | { |
AzureIoTClient | 41:71c01aa3df1a | 1776 | LogError("messenger got an error (messagereceiver failed to start within expected timeout (%d secs))", MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS); |
AzureIoTClient | 41:71c01aa3df1a | 1777 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1778 | } |
AzureIoTClient | 41:71c01aa3df1a | 1779 | } |
AzureIoTClient | 41:71c01aa3df1a | 1780 | else if (instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_ERROR || |
AzureIoTClient | 41:71c01aa3df1a | 1781 | instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_IDLE) |
AzureIoTClient | 41:71c01aa3df1a | 1782 | { |
AzureIoTClient | 41:71c01aa3df1a | 1783 | LogError("messagereceiver reported unexpected state %d while messenger is starting", instance->message_receiver_current_state); |
AzureIoTClient | 41:71c01aa3df1a | 1784 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1785 | } |
AzureIoTClient | 41:71c01aa3df1a | 1786 | } |
AzureIoTClient | 41:71c01aa3df1a | 1787 | } |
AzureIoTClient | 41:71c01aa3df1a | 1788 | else |
AzureIoTClient | 41:71c01aa3df1a | 1789 | { |
AzureIoTClient | 41:71c01aa3df1a | 1790 | if (instance->state == TELEMETRY_MESSENGER_STATE_STARTING) |
AzureIoTClient | 41:71c01aa3df1a | 1791 | { |
AzureIoTClient | 41:71c01aa3df1a | 1792 | if (instance->message_sender_current_state == MESSAGE_SENDER_STATE_OPEN) |
AzureIoTClient | 41:71c01aa3df1a | 1793 | { |
AzureIoTClient | 41:71c01aa3df1a | 1794 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_STARTED); |
AzureIoTClient | 41:71c01aa3df1a | 1795 | } |
AzureIoTClient | 41:71c01aa3df1a | 1796 | else if (instance->message_sender_current_state == MESSAGE_SENDER_STATE_OPENING) |
AzureIoTClient | 41:71c01aa3df1a | 1797 | { |
AzureIoTClient | 41:71c01aa3df1a | 1798 | int is_timed_out; |
AzureIoTClient | 41:71c01aa3df1a | 1799 | if (is_timeout_reached(instance->last_message_sender_state_change_time, MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS, &is_timed_out) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1800 | { |
AzureIoTClient | 41:71c01aa3df1a | 1801 | LogError("messenger failed to start (failed to verify messagesender start timeout)"); |
AzureIoTClient | 41:71c01aa3df1a | 1802 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1803 | } |
AzureIoTClient | 41:71c01aa3df1a | 1804 | else if (is_timed_out == 1) |
AzureIoTClient | 41:71c01aa3df1a | 1805 | { |
AzureIoTClient | 41:71c01aa3df1a | 1806 | LogError("messenger failed to start (messagesender failed to start within expected timeout (%d secs))", MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS); |
AzureIoTClient | 41:71c01aa3df1a | 1807 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1808 | } |
AzureIoTClient | 41:71c01aa3df1a | 1809 | } |
AzureIoTClient | 41:71c01aa3df1a | 1810 | // For this module, the only valid scenario where messagesender state is IDLE is if |
AzureIoTClient | 41:71c01aa3df1a | 1811 | // the messagesender hasn't been created yet or already destroyed. |
AzureIoTClient | 41:71c01aa3df1a | 1812 | else if ((instance->message_sender_current_state == MESSAGE_SENDER_STATE_ERROR) || |
AzureIoTClient | 41:71c01aa3df1a | 1813 | (instance->message_sender_current_state == MESSAGE_SENDER_STATE_CLOSING) || |
AzureIoTClient | 41:71c01aa3df1a | 1814 | (instance->message_sender_current_state == MESSAGE_SENDER_STATE_IDLE && instance->message_sender != NULL)) |
AzureIoTClient | 41:71c01aa3df1a | 1815 | { |
AzureIoTClient | 41:71c01aa3df1a | 1816 | LogError("messagesender reported unexpected state %d while messenger is starting", instance->message_sender_current_state); |
AzureIoTClient | 41:71c01aa3df1a | 1817 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1818 | } |
AzureIoTClient | 41:71c01aa3df1a | 1819 | } |
AzureIoTClient | 41:71c01aa3df1a | 1820 | // message sender and receiver are stopped/destroyed synchronously, so no need for state control. |
AzureIoTClient | 41:71c01aa3df1a | 1821 | } |
AzureIoTClient | 36:f78f9a56869e | 1822 | } |
AzureIoTClient | 36:f78f9a56869e | 1823 | |
AzureIoTClient | 36:f78f9a56869e | 1824 | void telemetry_messenger_do_work(TELEMETRY_MESSENGER_HANDLE messenger_handle) |
AzureIoTClient | 36:f78f9a56869e | 1825 | { |
AzureIoTClient | 41:71c01aa3df1a | 1826 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_065: [If `messenger_handle` is NULL, telemetry_messenger_do_work() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 1827 | if (messenger_handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1828 | { |
AzureIoTClient | 41:71c01aa3df1a | 1829 | LogError("telemetry_messenger_do_work failed (messenger_handle is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1830 | } |
AzureIoTClient | 41:71c01aa3df1a | 1831 | else |
AzureIoTClient | 41:71c01aa3df1a | 1832 | { |
AzureIoTClient | 41:71c01aa3df1a | 1833 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 36:f78f9a56869e | 1834 | |
AzureIoTClient | 41:71c01aa3df1a | 1835 | process_state_changes(instance); |
AzureIoTClient | 36:f78f9a56869e | 1836 | |
AzureIoTClient | 41:71c01aa3df1a | 1837 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_151: [If `instance->state` is TELEMETRY_MESSENGER_STATE_STARTING, telemetry_messenger_do_work() shall create and open `instance->message_sender`] |
AzureIoTClient | 41:71c01aa3df1a | 1838 | if (instance->state == TELEMETRY_MESSENGER_STATE_STARTING) |
AzureIoTClient | 41:71c01aa3df1a | 1839 | { |
AzureIoTClient | 41:71c01aa3df1a | 1840 | if (instance->message_sender == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1841 | { |
AzureIoTClient | 41:71c01aa3df1a | 1842 | if (create_event_sender(instance) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1843 | { |
AzureIoTClient | 41:71c01aa3df1a | 1844 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1845 | } |
AzureIoTClient | 41:71c01aa3df1a | 1846 | } |
AzureIoTClient | 41:71c01aa3df1a | 1847 | } |
AzureIoTClient | 41:71c01aa3df1a | 1848 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_066: [If `instance->state` is not TELEMETRY_MESSENGER_STATE_STARTED, telemetry_messenger_do_work() shall return] |
AzureIoTClient | 41:71c01aa3df1a | 1849 | else if (instance->state == TELEMETRY_MESSENGER_STATE_STARTED) |
AzureIoTClient | 41:71c01aa3df1a | 1850 | { |
AzureIoTClient | 41:71c01aa3df1a | 1851 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_067: [If `instance->receive_messages` is true and `instance->message_receiver` is NULL, a message_receiver shall be created] |
AzureIoTClient | 41:71c01aa3df1a | 1852 | if (instance->receive_messages == true && |
AzureIoTClient | 41:71c01aa3df1a | 1853 | instance->message_receiver == NULL && |
AzureIoTClient | 41:71c01aa3df1a | 1854 | create_message_receiver(instance) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1855 | { |
AzureIoTClient | 41:71c01aa3df1a | 1856 | LogError("telemetry_messenger_do_work warning (failed creating the message receiver [%s])", STRING_c_str(instance->device_id)); |
AzureIoTClient | 41:71c01aa3df1a | 1857 | } |
AzureIoTClient | 41:71c01aa3df1a | 1858 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_092: [If `instance->receive_messages` is false and `instance->message_receiver` is not NULL, it shall be destroyed] |
AzureIoTClient | 41:71c01aa3df1a | 1859 | else if (instance->receive_messages == false && instance->message_receiver != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1860 | { |
AzureIoTClient | 41:71c01aa3df1a | 1861 | destroy_message_receiver(instance); |
AzureIoTClient | 41:71c01aa3df1a | 1862 | } |
AzureIoTClient | 36:f78f9a56869e | 1863 | |
AzureIoTClient | 41:71c01aa3df1a | 1864 | if (process_event_send_timeouts(instance) != RESULT_OK) |
AzureIoTClient | 41:71c01aa3df1a | 1865 | { |
AzureIoTClient | 41:71c01aa3df1a | 1866 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1867 | } |
AzureIoTClient | 41:71c01aa3df1a | 1868 | else if (send_pending_events(instance) != RESULT_OK && instance->event_send_retry_limit > 0) |
AzureIoTClient | 41:71c01aa3df1a | 1869 | { |
AzureIoTClient | 41:71c01aa3df1a | 1870 | instance->event_send_error_count++; |
AzureIoTClient | 36:f78f9a56869e | 1871 | |
AzureIoTClient | 41:71c01aa3df1a | 1872 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_161: [If telemetry_messenger_do_work() fail sending events for `instance->event_send_retry_limit` times in a row, it shall invoke `instance->on_state_changed_callback`, if provided, with error code TELEMETRY_MESSENGER_STATE_ERROR] |
AzureIoTClient | 41:71c01aa3df1a | 1873 | if (instance->event_send_error_count >= instance->event_send_retry_limit) |
AzureIoTClient | 41:71c01aa3df1a | 1874 | { |
AzureIoTClient | 41:71c01aa3df1a | 1875 | LogError("telemetry_messenger_do_work failed (failed sending events; reached max number of consecutive attempts)"); |
AzureIoTClient | 41:71c01aa3df1a | 1876 | update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR); |
AzureIoTClient | 41:71c01aa3df1a | 1877 | } |
AzureIoTClient | 41:71c01aa3df1a | 1878 | } |
AzureIoTClient | 41:71c01aa3df1a | 1879 | else |
AzureIoTClient | 41:71c01aa3df1a | 1880 | { |
AzureIoTClient | 41:71c01aa3df1a | 1881 | instance->event_send_error_count = 0; |
AzureIoTClient | 41:71c01aa3df1a | 1882 | } |
AzureIoTClient | 41:71c01aa3df1a | 1883 | } |
AzureIoTClient | 41:71c01aa3df1a | 1884 | } |
AzureIoTClient | 36:f78f9a56869e | 1885 | } |
AzureIoTClient | 36:f78f9a56869e | 1886 | |
AzureIoTClient | 36:f78f9a56869e | 1887 | void telemetry_messenger_destroy(TELEMETRY_MESSENGER_HANDLE messenger_handle) |
AzureIoTClient | 36:f78f9a56869e | 1888 | { |
AzureIoTClient | 41:71c01aa3df1a | 1889 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_109: [If `messenger_handle` is NULL, telemetry_messenger_destroy() shall fail and return] |
AzureIoTClient | 41:71c01aa3df1a | 1890 | if (messenger_handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1891 | { |
AzureIoTClient | 41:71c01aa3df1a | 1892 | LogError("telemetry_messenger_destroy failed (messenger_handle is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1893 | } |
AzureIoTClient | 41:71c01aa3df1a | 1894 | else |
AzureIoTClient | 41:71c01aa3df1a | 1895 | { |
AzureIoTClient | 41:71c01aa3df1a | 1896 | LIST_ITEM_HANDLE list_node; |
AzureIoTClient | 41:71c01aa3df1a | 1897 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 36:f78f9a56869e | 1898 | |
AzureIoTClient | 41:71c01aa3df1a | 1899 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_110: [If the `instance->state` is not TELEMETRY_MESSENGER_STATE_STOPPED, telemetry_messenger_destroy() shall invoke telemetry_messenger_stop()] |
AzureIoTClient | 41:71c01aa3df1a | 1900 | if (instance->state != TELEMETRY_MESSENGER_STATE_STOPPED) |
AzureIoTClient | 41:71c01aa3df1a | 1901 | { |
AzureIoTClient | 41:71c01aa3df1a | 1902 | (void)telemetry_messenger_stop(messenger_handle); |
AzureIoTClient | 41:71c01aa3df1a | 1903 | } |
AzureIoTClient | 36:f78f9a56869e | 1904 | |
AzureIoTClient | 41:71c01aa3df1a | 1905 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_111: [All elements of `instance->in_progress_list` and `instance->wait_to_send_list` shall be removed, invoking `task->on_event_send_complete_callback` for each with EVENT_SEND_COMPLETE_RESULT_MESSENGER_DESTROYED] |
AzureIoTClient | 36:f78f9a56869e | 1906 | |
AzureIoTClient | 41:71c01aa3df1a | 1907 | // Note: yes telemetry_messenger_stop() tried to move all events from in_progress_list to wait_to_send_list, |
AzureIoTClient | 41:71c01aa3df1a | 1908 | // but we need to iterate through in case any events failed to be moved. |
AzureIoTClient | 41:71c01aa3df1a | 1909 | while ((list_node = singlylinkedlist_get_head_item(instance->in_progress_list)) != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1910 | { |
AzureIoTClient | 41:71c01aa3df1a | 1911 | MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)singlylinkedlist_item_get_value(list_node); |
AzureIoTClient | 36:f78f9a56869e | 1912 | |
AzureIoTClient | 41:71c01aa3df1a | 1913 | (void)singlylinkedlist_remove(instance->in_progress_list, list_node); |
AzureIoTClient | 36:f78f9a56869e | 1914 | |
AzureIoTClient | 41:71c01aa3df1a | 1915 | if (task != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1916 | { |
AzureIoTClient | 41:71c01aa3df1a | 1917 | singlylinkedlist_foreach(task->callback_list, invoke_callback, (void*)TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_MESSENGER_DESTROYED); |
AzureIoTClient | 41:71c01aa3df1a | 1918 | free_task(task); |
AzureIoTClient | 41:71c01aa3df1a | 1919 | } |
AzureIoTClient | 41:71c01aa3df1a | 1920 | } |
AzureIoTClient | 36:f78f9a56869e | 1921 | |
AzureIoTClient | 41:71c01aa3df1a | 1922 | while ((list_node = singlylinkedlist_get_head_item(instance->waiting_to_send)) != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1923 | { |
AzureIoTClient | 41:71c01aa3df1a | 1924 | MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_info = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)singlylinkedlist_item_get_value(list_node); |
AzureIoTClient | 36:f78f9a56869e | 1925 | |
AzureIoTClient | 41:71c01aa3df1a | 1926 | (void)singlylinkedlist_remove(instance->waiting_to_send, list_node); |
AzureIoTClient | 36:f78f9a56869e | 1927 | |
AzureIoTClient | 41:71c01aa3df1a | 1928 | if (caller_info != NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1929 | { |
AzureIoTClient | 41:71c01aa3df1a | 1930 | caller_info->on_event_send_complete_callback(caller_info->message, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_MESSENGER_DESTROYED, (void*)caller_info->context); |
AzureIoTClient | 41:71c01aa3df1a | 1931 | free(caller_info); |
AzureIoTClient | 41:71c01aa3df1a | 1932 | } |
AzureIoTClient | 41:71c01aa3df1a | 1933 | } |
AzureIoTClient | 36:f78f9a56869e | 1934 | |
AzureIoTClient | 41:71c01aa3df1a | 1935 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_150: [`instance->in_progress_list` and `instance->wait_to_send_list` shall be destroyed using singlylinkedlist_destroy()] |
AzureIoTClient | 41:71c01aa3df1a | 1936 | singlylinkedlist_destroy(instance->waiting_to_send); |
AzureIoTClient | 41:71c01aa3df1a | 1937 | singlylinkedlist_destroy(instance->in_progress_list); |
AzureIoTClient | 36:f78f9a56869e | 1938 | |
AzureIoTClient | 41:71c01aa3df1a | 1939 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_112: [`instance->iothub_host_fqdn` shall be destroyed using STRING_delete()] |
AzureIoTClient | 41:71c01aa3df1a | 1940 | STRING_delete(instance->iothub_host_fqdn); |
AzureIoTClient | 41:71c01aa3df1a | 1941 | |
AzureIoTClient | 41:71c01aa3df1a | 1942 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_113: [`instance->device_id` shall be destroyed using STRING_delete()] |
AzureIoTClient | 41:71c01aa3df1a | 1943 | STRING_delete(instance->device_id); |
AzureIoTClient | 36:f78f9a56869e | 1944 | |
AzureIoTClient | 36:f78f9a56869e | 1945 | STRING_delete(instance->product_info); |
AzureIoTClient | 36:f78f9a56869e | 1946 | |
AzureIoTClient | 41:71c01aa3df1a | 1947 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_114: [telemetry_messenger_destroy() shall destroy `instance` with free()] |
AzureIoTClient | 41:71c01aa3df1a | 1948 | (void)free(instance); |
AzureIoTClient | 41:71c01aa3df1a | 1949 | } |
AzureIoTClient | 36:f78f9a56869e | 1950 | } |
AzureIoTClient | 36:f78f9a56869e | 1951 | |
AzureIoTClient | 36:f78f9a56869e | 1952 | TELEMETRY_MESSENGER_HANDLE telemetry_messenger_create(const TELEMETRY_MESSENGER_CONFIG* messenger_config, const char* product_info) |
AzureIoTClient | 36:f78f9a56869e | 1953 | { |
AzureIoTClient | 41:71c01aa3df1a | 1954 | TELEMETRY_MESSENGER_HANDLE handle; |
AzureIoTClient | 36:f78f9a56869e | 1955 | |
AzureIoTClient | 41:71c01aa3df1a | 1956 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_001: [If parameter `messenger_config` is NULL, telemetry_messenger_create() shall return NULL] |
AzureIoTClient | 41:71c01aa3df1a | 1957 | if (messenger_config == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1958 | { |
AzureIoTClient | 41:71c01aa3df1a | 1959 | handle = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 1960 | LogError("telemetry_messenger_create failed (messenger_config is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1961 | } |
AzureIoTClient | 41:71c01aa3df1a | 1962 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_002: [If `messenger_config->device_id` is NULL, telemetry_messenger_create() shall return NULL] |
AzureIoTClient | 41:71c01aa3df1a | 1963 | else if (messenger_config->device_id == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1964 | { |
AzureIoTClient | 41:71c01aa3df1a | 1965 | handle = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 1966 | LogError("telemetry_messenger_create failed (messenger_config->device_id is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1967 | } |
AzureIoTClient | 41:71c01aa3df1a | 1968 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_003: [If `messenger_config->iothub_host_fqdn` is NULL, telemetry_messenger_create() shall return NULL] |
AzureIoTClient | 41:71c01aa3df1a | 1969 | else if (messenger_config->iothub_host_fqdn == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1970 | { |
AzureIoTClient | 41:71c01aa3df1a | 1971 | handle = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 1972 | LogError("telemetry_messenger_create failed (messenger_config->iothub_host_fqdn is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1973 | } |
AzureIoTClient | 41:71c01aa3df1a | 1974 | else |
AzureIoTClient | 41:71c01aa3df1a | 1975 | { |
AzureIoTClient | 41:71c01aa3df1a | 1976 | TELEMETRY_MESSENGER_INSTANCE* instance; |
AzureIoTClient | 36:f78f9a56869e | 1977 | |
AzureIoTClient | 41:71c01aa3df1a | 1978 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_006: [telemetry_messenger_create() shall allocate memory for the messenger instance structure (aka `instance`)] |
AzureIoTClient | 41:71c01aa3df1a | 1979 | if ((instance = (TELEMETRY_MESSENGER_INSTANCE*)malloc(sizeof(TELEMETRY_MESSENGER_INSTANCE))) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 1980 | { |
AzureIoTClient | 41:71c01aa3df1a | 1981 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_007: [If malloc() fails, telemetry_messenger_create() shall fail and return NULL] |
AzureIoTClient | 41:71c01aa3df1a | 1982 | handle = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 1983 | LogError("telemetry_messenger_create failed (messenger_config->wait_to_send_list is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 1984 | } |
AzureIoTClient | 41:71c01aa3df1a | 1985 | else |
AzureIoTClient | 41:71c01aa3df1a | 1986 | { |
AzureIoTClient | 41:71c01aa3df1a | 1987 | memset(instance, 0, sizeof(TELEMETRY_MESSENGER_INSTANCE)); |
AzureIoTClient | 41:71c01aa3df1a | 1988 | instance->state = TELEMETRY_MESSENGER_STATE_STOPPED; |
AzureIoTClient | 41:71c01aa3df1a | 1989 | instance->message_sender_current_state = MESSAGE_SENDER_STATE_IDLE; |
AzureIoTClient | 41:71c01aa3df1a | 1990 | instance->message_sender_previous_state = MESSAGE_SENDER_STATE_IDLE; |
AzureIoTClient | 41:71c01aa3df1a | 1991 | instance->message_receiver_current_state = MESSAGE_RECEIVER_STATE_IDLE; |
AzureIoTClient | 41:71c01aa3df1a | 1992 | instance->message_receiver_previous_state = MESSAGE_RECEIVER_STATE_IDLE; |
AzureIoTClient | 41:71c01aa3df1a | 1993 | instance->event_send_retry_limit = DEFAULT_EVENT_SEND_RETRY_LIMIT; |
AzureIoTClient | 41:71c01aa3df1a | 1994 | instance->event_send_timeout_secs = DEFAULT_EVENT_SEND_TIMEOUT_SECS; |
AzureIoTClient | 41:71c01aa3df1a | 1995 | instance->last_message_sender_state_change_time = INDEFINITE_TIME; |
AzureIoTClient | 41:71c01aa3df1a | 1996 | instance->last_message_receiver_state_change_time = INDEFINITE_TIME; |
AzureIoTClient | 36:f78f9a56869e | 1997 | |
AzureIoTClient | 41:71c01aa3df1a | 1998 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_008: [telemetry_messenger_create() shall save a copy of `messenger_config->device_id` into `instance->device_id`] |
AzureIoTClient | 41:71c01aa3df1a | 1999 | if ((instance->device_id = STRING_construct(messenger_config->device_id)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 2000 | { |
AzureIoTClient | 41:71c01aa3df1a | 2001 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_009: [If STRING_construct() fails, telemetry_messenger_create() shall fail and return NULL] |
AzureIoTClient | 41:71c01aa3df1a | 2002 | handle = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 2003 | LogError("telemetry_messenger_create failed (device_id could not be copied; STRING_construct failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 2004 | } |
AzureIoTClient | 36:f78f9a56869e | 2005 | else if ((instance->product_info = STRING_construct(product_info)) == NULL) |
AzureIoTClient | 36:f78f9a56869e | 2006 | { |
AzureIoTClient | 36:f78f9a56869e | 2007 | handle = NULL; |
AzureIoTClient | 36:f78f9a56869e | 2008 | LogError("telemetry_messenger_create failed (product_info could not be copied; STRING_construct failed)"); |
AzureIoTClient | 36:f78f9a56869e | 2009 | } |
AzureIoTClient | 41:71c01aa3df1a | 2010 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_010: [telemetry_messenger_create() shall save a copy of `messenger_config->iothub_host_fqdn` into `instance->iothub_host_fqdn`] |
AzureIoTClient | 41:71c01aa3df1a | 2011 | else if ((instance->iothub_host_fqdn = STRING_construct(messenger_config->iothub_host_fqdn)) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 2012 | { |
AzureIoTClient | 41:71c01aa3df1a | 2013 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_011: [If STRING_construct() fails, telemetry_messenger_create() shall fail and return NULL] |
AzureIoTClient | 41:71c01aa3df1a | 2014 | handle = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 2015 | LogError("telemetry_messenger_create failed (iothub_host_fqdn could not be copied; STRING_construct failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 2016 | } |
AzureIoTClient | 41:71c01aa3df1a | 2017 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_165: [`instance->wait_to_send_list` shall be set using singlylinkedlist_create()] |
AzureIoTClient | 41:71c01aa3df1a | 2018 | else if ((instance->waiting_to_send = singlylinkedlist_create()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 2019 | { |
AzureIoTClient | 41:71c01aa3df1a | 2020 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_166: [If singlylinkedlist_create() fails, telemetry_messenger_create() shall fail and return NULL] |
AzureIoTClient | 41:71c01aa3df1a | 2021 | handle = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 2022 | LogError("telemetry_messenger_create failed (singlylinkedlist_create failed to create wait_to_send_list)"); |
AzureIoTClient | 41:71c01aa3df1a | 2023 | } |
AzureIoTClient | 41:71c01aa3df1a | 2024 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [`instance->in_progress_list` shall be set using singlylinkedlist_create()] |
AzureIoTClient | 41:71c01aa3df1a | 2025 | else if ((instance->in_progress_list = singlylinkedlist_create()) == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 2026 | { |
AzureIoTClient | 41:71c01aa3df1a | 2027 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_133: [If singlylinkedlist_create() fails, telemetry_messenger_create() shall fail and return NULL] |
AzureIoTClient | 41:71c01aa3df1a | 2028 | handle = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 2029 | LogError("telemetry_messenger_create failed (singlylinkedlist_create failed to create in_progress_list)"); |
AzureIoTClient | 41:71c01aa3df1a | 2030 | } |
AzureIoTClient | 41:71c01aa3df1a | 2031 | else |
AzureIoTClient | 41:71c01aa3df1a | 2032 | { |
AzureIoTClient | 41:71c01aa3df1a | 2033 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_013: [`messenger_config->on_state_changed_callback` shall be saved into `instance->on_state_changed_callback`] |
AzureIoTClient | 41:71c01aa3df1a | 2034 | instance->on_state_changed_callback = messenger_config->on_state_changed_callback; |
AzureIoTClient | 36:f78f9a56869e | 2035 | |
AzureIoTClient | 41:71c01aa3df1a | 2036 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_014: [`messenger_config->on_state_changed_context` shall be saved into `instance->on_state_changed_context`] |
AzureIoTClient | 41:71c01aa3df1a | 2037 | instance->on_state_changed_context = messenger_config->on_state_changed_context; |
AzureIoTClient | 36:f78f9a56869e | 2038 | |
AzureIoTClient | 41:71c01aa3df1a | 2039 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_015: [If no failures occurr, telemetry_messenger_create() shall return a handle to `instance`] |
AzureIoTClient | 41:71c01aa3df1a | 2040 | handle = (TELEMETRY_MESSENGER_HANDLE)instance; |
AzureIoTClient | 41:71c01aa3df1a | 2041 | } |
AzureIoTClient | 41:71c01aa3df1a | 2042 | } |
AzureIoTClient | 36:f78f9a56869e | 2043 | |
AzureIoTClient | 41:71c01aa3df1a | 2044 | if (handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 2045 | { |
AzureIoTClient | 41:71c01aa3df1a | 2046 | telemetry_messenger_destroy((TELEMETRY_MESSENGER_HANDLE)instance); |
AzureIoTClient | 41:71c01aa3df1a | 2047 | } |
AzureIoTClient | 41:71c01aa3df1a | 2048 | } |
AzureIoTClient | 36:f78f9a56869e | 2049 | |
AzureIoTClient | 41:71c01aa3df1a | 2050 | return handle; |
AzureIoTClient | 36:f78f9a56869e | 2051 | } |
AzureIoTClient | 36:f78f9a56869e | 2052 | |
AzureIoTClient | 36:f78f9a56869e | 2053 | int telemetry_messenger_set_option(TELEMETRY_MESSENGER_HANDLE messenger_handle, const char* name, void* value) |
AzureIoTClient | 36:f78f9a56869e | 2054 | { |
AzureIoTClient | 41:71c01aa3df1a | 2055 | int result; |
AzureIoTClient | 36:f78f9a56869e | 2056 | |
AzureIoTClient | 41:71c01aa3df1a | 2057 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_167: [If `messenger_handle` or `name` or `value` is NULL, telemetry_messenger_set_option shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 2058 | if (messenger_handle == NULL || name == NULL || value == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 2059 | { |
AzureIoTClient | 41:71c01aa3df1a | 2060 | LogError("telemetry_messenger_set_option failed (one of the followin are NULL: messenger_handle=%p, name=%p, value=%p)", |
AzureIoTClient | 41:71c01aa3df1a | 2061 | messenger_handle, name, value); |
AzureIoTClient | 41:71c01aa3df1a | 2062 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 2063 | } |
AzureIoTClient | 41:71c01aa3df1a | 2064 | else |
AzureIoTClient | 41:71c01aa3df1a | 2065 | { |
AzureIoTClient | 41:71c01aa3df1a | 2066 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 36:f78f9a56869e | 2067 | |
AzureIoTClient | 51:269e65571b39 | 2068 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_168: [If name matches TELEMETRY_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, `value` shall be saved on `instance->event_send_timeout_secs`] |
AzureIoTClient | 51:269e65571b39 | 2069 | if (strcmp(TELEMETRY_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, name) == 0) |
AzureIoTClient | 41:71c01aa3df1a | 2070 | { |
AzureIoTClient | 41:71c01aa3df1a | 2071 | instance->event_send_timeout_secs = *((size_t*)value); |
AzureIoTClient | 41:71c01aa3df1a | 2072 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 2073 | } |
AzureIoTClient | 51:269e65571b39 | 2074 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_169: [If name matches TELEMETRY_MESSENGER_OPTION_SAVED_OPTIONS, `value` shall be applied using OptionHandler_FeedOptions] |
AzureIoTClient | 51:269e65571b39 | 2075 | else if (strcmp(TELEMETRY_MESSENGER_OPTION_SAVED_OPTIONS, name) == 0) |
AzureIoTClient | 41:71c01aa3df1a | 2076 | { |
AzureIoTClient | 41:71c01aa3df1a | 2077 | if (OptionHandler_FeedOptions((OPTIONHANDLER_HANDLE)value, messenger_handle) != OPTIONHANDLER_OK) |
AzureIoTClient | 41:71c01aa3df1a | 2078 | { |
AzureIoTClient | 41:71c01aa3df1a | 2079 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_170: [If OptionHandler_FeedOptions fails, telemetry_messenger_set_option shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 2080 | LogError("telemetry_messenger_set_option failed (OptionHandler_FeedOptions failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 2081 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 2082 | } |
AzureIoTClient | 41:71c01aa3df1a | 2083 | else |
AzureIoTClient | 41:71c01aa3df1a | 2084 | { |
AzureIoTClient | 41:71c01aa3df1a | 2085 | result = RESULT_OK; |
AzureIoTClient | 41:71c01aa3df1a | 2086 | } |
AzureIoTClient | 41:71c01aa3df1a | 2087 | } |
AzureIoTClient | 41:71c01aa3df1a | 2088 | else |
AzureIoTClient | 41:71c01aa3df1a | 2089 | { |
AzureIoTClient | 41:71c01aa3df1a | 2090 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_171: [If name does not match any supported option, authentication_set_option shall fail and return a non-zero value] |
AzureIoTClient | 41:71c01aa3df1a | 2091 | LogError("telemetry_messenger_set_option failed (option with name '%s' is not suppported)", name); |
AzureIoTClient | 41:71c01aa3df1a | 2092 | result = __FAILURE__; |
AzureIoTClient | 41:71c01aa3df1a | 2093 | } |
AzureIoTClient | 41:71c01aa3df1a | 2094 | } |
AzureIoTClient | 36:f78f9a56869e | 2095 | |
AzureIoTClient | 41:71c01aa3df1a | 2096 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_172: [If no errors occur, telemetry_messenger_set_option shall return 0] |
AzureIoTClient | 41:71c01aa3df1a | 2097 | return result; |
AzureIoTClient | 36:f78f9a56869e | 2098 | } |
AzureIoTClient | 36:f78f9a56869e | 2099 | |
AzureIoTClient | 36:f78f9a56869e | 2100 | OPTIONHANDLER_HANDLE telemetry_messenger_retrieve_options(TELEMETRY_MESSENGER_HANDLE messenger_handle) |
AzureIoTClient | 36:f78f9a56869e | 2101 | { |
AzureIoTClient | 41:71c01aa3df1a | 2102 | OPTIONHANDLER_HANDLE result; |
AzureIoTClient | 36:f78f9a56869e | 2103 | |
AzureIoTClient | 41:71c01aa3df1a | 2104 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_173: [If `messenger_handle` is NULL, telemetry_messenger_retrieve_options shall fail and return NULL] |
AzureIoTClient | 41:71c01aa3df1a | 2105 | if (messenger_handle == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 2106 | { |
AzureIoTClient | 41:71c01aa3df1a | 2107 | LogError("Failed to retrieve options from messenger instance (messenger_handle is NULL)"); |
AzureIoTClient | 41:71c01aa3df1a | 2108 | result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 2109 | } |
AzureIoTClient | 41:71c01aa3df1a | 2110 | else |
AzureIoTClient | 41:71c01aa3df1a | 2111 | { |
AzureIoTClient | 41:71c01aa3df1a | 2112 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_174: [An OPTIONHANDLER_HANDLE instance shall be created using OptionHandler_Create] |
AzureIoTClient | 41:71c01aa3df1a | 2113 | OPTIONHANDLER_HANDLE options = OptionHandler_Create(telemetry_messenger_clone_option, telemetry_messenger_destroy_option, (pfSetOption)telemetry_messenger_set_option); |
AzureIoTClient | 36:f78f9a56869e | 2114 | |
AzureIoTClient | 41:71c01aa3df1a | 2115 | if (options == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 2116 | { |
AzureIoTClient | 41:71c01aa3df1a | 2117 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_175: [If an OPTIONHANDLER_HANDLE instance fails to be created, telemetry_messenger_retrieve_options shall fail and return NULL] |
AzureIoTClient | 41:71c01aa3df1a | 2118 | LogError("Failed to retrieve options from messenger instance (OptionHandler_Create failed)"); |
AzureIoTClient | 41:71c01aa3df1a | 2119 | result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 2120 | } |
AzureIoTClient | 41:71c01aa3df1a | 2121 | else |
AzureIoTClient | 41:71c01aa3df1a | 2122 | { |
AzureIoTClient | 41:71c01aa3df1a | 2123 | TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle; |
AzureIoTClient | 36:f78f9a56869e | 2124 | |
AzureIoTClient | 41:71c01aa3df1a | 2125 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_176: [Each option of `instance` shall be added to the OPTIONHANDLER_HANDLE instance using OptionHandler_AddOption] |
AzureIoTClient | 41:71c01aa3df1a | 2126 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_177: [If OptionHandler_AddOption fails, telemetry_messenger_retrieve_options shall fail and return NULL] |
AzureIoTClient | 51:269e65571b39 | 2127 | if (OptionHandler_AddOption(options, TELEMETRY_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, (void*)&instance->event_send_timeout_secs) != OPTIONHANDLER_OK) |
AzureIoTClient | 41:71c01aa3df1a | 2128 | { |
AzureIoTClient | 51:269e65571b39 | 2129 | LogError("Failed to retrieve options from messenger instance (OptionHandler_Create failed for option '%s')", TELEMETRY_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS); |
AzureIoTClient | 41:71c01aa3df1a | 2130 | result = NULL; |
AzureIoTClient | 41:71c01aa3df1a | 2131 | } |
AzureIoTClient | 41:71c01aa3df1a | 2132 | else |
AzureIoTClient | 41:71c01aa3df1a | 2133 | { |
AzureIoTClient | 41:71c01aa3df1a | 2134 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_179: [If no failures occur, telemetry_messenger_retrieve_options shall return the OPTIONHANDLER_HANDLE instance] |
AzureIoTClient | 41:71c01aa3df1a | 2135 | result = options; |
AzureIoTClient | 41:71c01aa3df1a | 2136 | } |
AzureIoTClient | 36:f78f9a56869e | 2137 | |
AzureIoTClient | 41:71c01aa3df1a | 2138 | if (result == NULL) |
AzureIoTClient | 41:71c01aa3df1a | 2139 | { |
AzureIoTClient | 41:71c01aa3df1a | 2140 | // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_178: [If telemetry_messenger_retrieve_options fails, any allocated memory shall be freed] |
AzureIoTClient | 41:71c01aa3df1a | 2141 | OptionHandler_Destroy(options); |
AzureIoTClient | 41:71c01aa3df1a | 2142 | } |
AzureIoTClient | 41:71c01aa3df1a | 2143 | } |
AzureIoTClient | 41:71c01aa3df1a | 2144 | } |
AzureIoTClient | 36:f78f9a56869e | 2145 | |
AzureIoTClient | 41:71c01aa3df1a | 2146 | return result; |
AzureIoTClient | 36:f78f9a56869e | 2147 | } |