Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Committer:
AzureIoTClient
Date:
Thu Oct 04 09:14:47 2018 -0700
Revision:
57:56ac1346c70d
Parent:
56:8704100b3b54
1.2.10

Who changed what in which revision?

User | Revision | Line number | New contents of line
AzureIoTClient 36:f78f9a56869e 1 // Copyright (c) Microsoft. All rights reserved.
AzureIoTClient 36:f78f9a56869e 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
AzureIoTClient 36:f78f9a56869e 3
AzureIoTClient 36:f78f9a56869e 4 #include <stdlib.h>
AzureIoTClient 36:f78f9a56869e 5 #include <stdbool.h>
AzureIoTClient 36:f78f9a56869e 6 #include "azure_c_shared_utility/optimize_size.h"
AzureIoTClient 36:f78f9a56869e 7 #include "azure_c_shared_utility/crt_abstractions.h"
AzureIoTClient 36:f78f9a56869e 8 #include "azure_c_shared_utility/gballoc.h"
AzureIoTClient 56:8704100b3b54 9 #include "azure_c_shared_utility/agenttime.h"
AzureIoTClient 36:f78f9a56869e 10 #include "azure_c_shared_utility/xlogging.h"
AzureIoTClient 36:f78f9a56869e 11 #include "azure_c_shared_utility/uniqueid.h"
AzureIoTClient 36:f78f9a56869e 12 #include "azure_c_shared_utility/singlylinkedlist.h"
AzureIoTClient 36:f78f9a56869e 13 #include "azure_uamqp_c/link.h"
AzureIoTClient 36:f78f9a56869e 14 #include "azure_uamqp_c/messaging.h"
AzureIoTClient 36:f78f9a56869e 15 #include "azure_uamqp_c/message_sender.h"
AzureIoTClient 36:f78f9a56869e 16 #include "azure_uamqp_c/message_receiver.h"
AzureIoTClient 53:e21e1e88460f 17 #include "internal/uamqp_messaging.h"
AzureIoTClient 53:e21e1e88460f 18 #include "internal/iothub_client_private.h"
AzureIoTClient 36:f78f9a56869e 19 #include "iothub_client_version.h"
AzureIoTClient 53:e21e1e88460f 20 #include "internal/iothubtransport_amqp_telemetry_messenger.h"
AzureIoTClient 36:f78f9a56869e 21
// Success code that the return values of helper calls in this file are compared against.
#define RESULT_OK 0
// Sentinel time_t value used in this file to mean "no valid time available".
#define INDEFINITE_TIME ((time_t)(-1))

// printf-style templates for the device (and optional module) path: host, device id[, module id].
#define IOTHUB_DEVICES_PATH_FMT "%s/devices/%s"
#define IOTHUB_DEVICES_MODULE_PATH_FMT "%s/devices/%s/modules/%s"
// printf-style templates for the AMQP addresses; the single argument is the devices/modules path.
#define IOTHUB_EVENT_SEND_ADDRESS_FMT "amqps://%s/messages/events"
#define IOTHUB_MESSAGE_RECEIVE_ADDRESS_FMT "amqps://%s/messages/devicebound"

// Link naming and sizing for the message sender.
#define MESSAGE_SENDER_LINK_NAME_PREFIX "link-snd"
#define MESSAGE_SENDER_MAX_LINK_SIZE UINT64_MAX
// Link naming and sizing for the message receiver.
#define MESSAGE_RECEIVER_LINK_NAME_PREFIX "link-rcv"
#define MESSAGE_RECEIVER_MAX_LINK_SIZE 65536

// Defaults and limits, in attempts or seconds as the names indicate.
// NOTE(review): their points of use are outside the visible part of this file.
#define DEFAULT_EVENT_SEND_RETRY_LIMIT 10
#define DEFAULT_EVENT_SEND_TIMEOUT_SECS 600
#define MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS 300
#define MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS 300

// Size passed to UniqueId_Generate() when building link names.
#define UNIQUE_ID_BUFFER_SIZE 37
#define STRING_NULL_TERMINATOR '\0'

#define AMQP_BATCHING_FORMAT_CODE 0x80013700
AzureIoTClient 56:8704100b3b54 41
AzureIoTClient 36:f78f9a56869e 42 typedef struct TELEMETRY_MESSENGER_INSTANCE_TAG
AzureIoTClient 36:f78f9a56869e 43 {
AzureIoTClient 41:71c01aa3df1a 44 STRING_HANDLE device_id;
AzureIoTClient 54:830550fef7ea 45 STRING_HANDLE module_id;
AzureIoTClient 36:f78f9a56869e 46 STRING_HANDLE product_info;
AzureIoTClient 41:71c01aa3df1a 47 STRING_HANDLE iothub_host_fqdn;
AzureIoTClient 41:71c01aa3df1a 48 SINGLYLINKEDLIST_HANDLE waiting_to_send; // List of MESSENGER_SEND_EVENT_CALLER_INFORMATION's
AzureIoTClient 41:71c01aa3df1a 49 SINGLYLINKEDLIST_HANDLE in_progress_list; // List of MESSENGER_SEND_EVENT_TASK's
AzureIoTClient 41:71c01aa3df1a 50 TELEMETRY_MESSENGER_STATE state;
AzureIoTClient 56:8704100b3b54 51
AzureIoTClient 41:71c01aa3df1a 52 ON_TELEMETRY_MESSENGER_STATE_CHANGED_CALLBACK on_state_changed_callback;
AzureIoTClient 41:71c01aa3df1a 53 void* on_state_changed_context;
AzureIoTClient 36:f78f9a56869e 54
AzureIoTClient 41:71c01aa3df1a 55 bool receive_messages;
AzureIoTClient 41:71c01aa3df1a 56 ON_TELEMETRY_MESSENGER_MESSAGE_RECEIVED on_message_received_callback;
AzureIoTClient 41:71c01aa3df1a 57 void* on_message_received_context;
AzureIoTClient 36:f78f9a56869e 58
AzureIoTClient 41:71c01aa3df1a 59 SESSION_HANDLE session_handle;
AzureIoTClient 41:71c01aa3df1a 60 LINK_HANDLE sender_link;
AzureIoTClient 41:71c01aa3df1a 61 MESSAGE_SENDER_HANDLE message_sender;
AzureIoTClient 41:71c01aa3df1a 62 MESSAGE_SENDER_STATE message_sender_current_state;
AzureIoTClient 41:71c01aa3df1a 63 MESSAGE_SENDER_STATE message_sender_previous_state;
AzureIoTClient 41:71c01aa3df1a 64 LINK_HANDLE receiver_link;
AzureIoTClient 41:71c01aa3df1a 65 MESSAGE_RECEIVER_HANDLE message_receiver;
AzureIoTClient 41:71c01aa3df1a 66 MESSAGE_RECEIVER_STATE message_receiver_current_state;
AzureIoTClient 41:71c01aa3df1a 67 MESSAGE_RECEIVER_STATE message_receiver_previous_state;
AzureIoTClient 36:f78f9a56869e 68
AzureIoTClient 41:71c01aa3df1a 69 size_t event_send_retry_limit;
AzureIoTClient 41:71c01aa3df1a 70 size_t event_send_error_count;
AzureIoTClient 41:71c01aa3df1a 71 size_t event_send_timeout_secs;
AzureIoTClient 41:71c01aa3df1a 72 time_t last_message_sender_state_change_time;
AzureIoTClient 41:71c01aa3df1a 73 time_t last_message_receiver_state_change_time;
AzureIoTClient 36:f78f9a56869e 74 } TELEMETRY_MESSENGER_INSTANCE;
AzureIoTClient 36:f78f9a56869e 75
AzureIoTClient 41:71c01aa3df1a 76 // MESSENGER_SEND_EVENT_CALLER_INFORMATION corresponds to a message sent from the API, including
AzureIoTClient 41:71c01aa3df1a 77 // the message and callback information to alert caller about what happened.
AzureIoTClient 41:71c01aa3df1a 78 typedef struct MESSENGER_SEND_EVENT_CALLER_INFORMATION_TAG
AzureIoTClient 41:71c01aa3df1a 79 {
AzureIoTClient 41:71c01aa3df1a 80 IOTHUB_MESSAGE_LIST* message;
AzureIoTClient 41:71c01aa3df1a 81 ON_TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE on_event_send_complete_callback;
AzureIoTClient 41:71c01aa3df1a 82 void* context;
AzureIoTClient 41:71c01aa3df1a 83 } MESSENGER_SEND_EVENT_CALLER_INFORMATION;
AzureIoTClient 41:71c01aa3df1a 84
AzureIoTClient 41:71c01aa3df1a 85 // MESSENGER_SEND_EVENT_TASK interfaces with underlying uAMQP layer. It receives the callback
AzureIoTClient 41:71c01aa3df1a 86 // from this lower layer which is used to pass the results back to the API via the callback_list.
AzureIoTClient 36:f78f9a56869e 87 typedef struct MESSENGER_SEND_EVENT_TASK_TAG
AzureIoTClient 36:f78f9a56869e 88 {
AzureIoTClient 41:71c01aa3df1a 89 SINGLYLINKEDLIST_HANDLE callback_list; // List of MESSENGER_SEND_EVENT_CALLER_INFORMATION's
AzureIoTClient 41:71c01aa3df1a 90 time_t send_time;
AzureIoTClient 41:71c01aa3df1a 91 TELEMETRY_MESSENGER_INSTANCE *messenger;
AzureIoTClient 41:71c01aa3df1a 92 bool is_timed_out;
AzureIoTClient 36:f78f9a56869e 93 } MESSENGER_SEND_EVENT_TASK;
AzureIoTClient 36:f78f9a56869e 94
AzureIoTClient 41:71c01aa3df1a 95
AzureIoTClient 36:f78f9a56869e 96 // @brief
AzureIoTClient 36:f78f9a56869e 97 // Evaluates if the ammount of time since start_time is greater or lesser than timeout_in_secs.
AzureIoTClient 36:f78f9a56869e 98 // @param is_timed_out
AzureIoTClient 36:f78f9a56869e 99 // Set to 1 if a timeout has been reached, 0 otherwise. Not set if any failure occurs.
AzureIoTClient 36:f78f9a56869e 100 // @returns
AzureIoTClient 36:f78f9a56869e 101 // 0 if no failures occur, non-zero otherwise.
AzureIoTClient 36:f78f9a56869e 102 static int is_timeout_reached(time_t start_time, size_t timeout_in_secs, int *is_timed_out)
AzureIoTClient 36:f78f9a56869e 103 {
AzureIoTClient 41:71c01aa3df1a 104 int result;
AzureIoTClient 36:f78f9a56869e 105
AzureIoTClient 41:71c01aa3df1a 106 if (start_time == INDEFINITE_TIME)
AzureIoTClient 41:71c01aa3df1a 107 {
AzureIoTClient 41:71c01aa3df1a 108 LogError("Failed to verify timeout (start_time is INDEFINITE)");
AzureIoTClient 41:71c01aa3df1a 109 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 110 }
AzureIoTClient 41:71c01aa3df1a 111 else
AzureIoTClient 41:71c01aa3df1a 112 {
AzureIoTClient 41:71c01aa3df1a 113 time_t current_time;
AzureIoTClient 36:f78f9a56869e 114
AzureIoTClient 41:71c01aa3df1a 115 if ((current_time = get_time(NULL)) == INDEFINITE_TIME)
AzureIoTClient 41:71c01aa3df1a 116 {
AzureIoTClient 41:71c01aa3df1a 117 LogError("Failed to verify timeout (get_time failed)");
AzureIoTClient 41:71c01aa3df1a 118 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 119 }
AzureIoTClient 41:71c01aa3df1a 120 else
AzureIoTClient 41:71c01aa3df1a 121 {
AzureIoTClient 41:71c01aa3df1a 122 if (get_difftime(current_time, start_time) >= timeout_in_secs)
AzureIoTClient 41:71c01aa3df1a 123 {
AzureIoTClient 41:71c01aa3df1a 124 *is_timed_out = 1;
AzureIoTClient 41:71c01aa3df1a 125 }
AzureIoTClient 41:71c01aa3df1a 126 else
AzureIoTClient 41:71c01aa3df1a 127 {
AzureIoTClient 41:71c01aa3df1a 128 *is_timed_out = 0;
AzureIoTClient 41:71c01aa3df1a 129 }
AzureIoTClient 36:f78f9a56869e 130
AzureIoTClient 41:71c01aa3df1a 131 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 132 }
AzureIoTClient 41:71c01aa3df1a 133 }
AzureIoTClient 36:f78f9a56869e 134
AzureIoTClient 41:71c01aa3df1a 135 return result;
AzureIoTClient 36:f78f9a56869e 136 }
AzureIoTClient 36:f78f9a56869e 137
AzureIoTClient 54:830550fef7ea 138 static STRING_HANDLE create_devices_and_modules_path(STRING_HANDLE iothub_host_fqdn, STRING_HANDLE device_id, STRING_HANDLE module_id)
AzureIoTClient 36:f78f9a56869e 139 {
AzureIoTClient 54:830550fef7ea 140 STRING_HANDLE devices_and_modules_path;
AzureIoTClient 36:f78f9a56869e 141
AzureIoTClient 54:830550fef7ea 142 if ((devices_and_modules_path = STRING_new()) == NULL)
AzureIoTClient 43:3da2d93bb955 143 {
AzureIoTClient 54:830550fef7ea 144 LogError("Failed creating devices_and_modules_path (STRING_new failed)");
AzureIoTClient 43:3da2d93bb955 145 }
AzureIoTClient 43:3da2d93bb955 146 else
AzureIoTClient 41:71c01aa3df1a 147 {
AzureIoTClient 43:3da2d93bb955 148 const char* iothub_host_fqdn_char_ptr = STRING_c_str(iothub_host_fqdn);
AzureIoTClient 43:3da2d93bb955 149 const char* device_id_char_ptr = STRING_c_str(device_id);
AzureIoTClient 54:830550fef7ea 150 const char* module_id_char_ptr = STRING_c_str(module_id);
AzureIoTClient 54:830550fef7ea 151
AzureIoTClient 54:830550fef7ea 152 if (module_id_char_ptr != NULL)
AzureIoTClient 43:3da2d93bb955 153 {
AzureIoTClient 54:830550fef7ea 154 if (STRING_sprintf(devices_and_modules_path, IOTHUB_DEVICES_MODULE_PATH_FMT, iothub_host_fqdn_char_ptr, device_id_char_ptr, module_id_char_ptr) != RESULT_OK)
AzureIoTClient 54:830550fef7ea 155 {
AzureIoTClient 54:830550fef7ea 156 STRING_delete(devices_and_modules_path);
AzureIoTClient 54:830550fef7ea 157 devices_and_modules_path = NULL;
AzureIoTClient 54:830550fef7ea 158 LogError("Failed creating devices_and_modules_path (STRING_sprintf failed)");
AzureIoTClient 54:830550fef7ea 159 }
AzureIoTClient 54:830550fef7ea 160 }
AzureIoTClient 54:830550fef7ea 161 else
AzureIoTClient 54:830550fef7ea 162 {
AzureIoTClient 54:830550fef7ea 163 if (STRING_sprintf(devices_and_modules_path, IOTHUB_DEVICES_PATH_FMT, iothub_host_fqdn_char_ptr, device_id_char_ptr) != RESULT_OK)
AzureIoTClient 54:830550fef7ea 164 {
AzureIoTClient 54:830550fef7ea 165 STRING_delete(devices_and_modules_path);
AzureIoTClient 54:830550fef7ea 166 devices_and_modules_path = NULL;
AzureIoTClient 54:830550fef7ea 167 LogError("Failed creating devices_and_modules_path (STRING_sprintf failed)");
AzureIoTClient 54:830550fef7ea 168 }
AzureIoTClient 43:3da2d93bb955 169 }
AzureIoTClient 41:71c01aa3df1a 170 }
AzureIoTClient 36:f78f9a56869e 171
AzureIoTClient 54:830550fef7ea 172 return devices_and_modules_path;
AzureIoTClient 36:f78f9a56869e 173 }
AzureIoTClient 36:f78f9a56869e 174
AzureIoTClient 54:830550fef7ea 175 static STRING_HANDLE create_event_send_address(STRING_HANDLE devices_and_modules_path)
AzureIoTClient 36:f78f9a56869e 176 {
AzureIoTClient 41:71c01aa3df1a 177 STRING_HANDLE event_send_address;
AzureIoTClient 36:f78f9a56869e 178
AzureIoTClient 41:71c01aa3df1a 179 if ((event_send_address = STRING_new()) == NULL)
AzureIoTClient 41:71c01aa3df1a 180 {
AzureIoTClient 41:71c01aa3df1a 181 LogError("Failed creating the event_send_address (STRING_new failed)");
AzureIoTClient 41:71c01aa3df1a 182 }
AzureIoTClient 41:71c01aa3df1a 183 else
AzureIoTClient 41:71c01aa3df1a 184 {
AzureIoTClient 54:830550fef7ea 185 const char* devices_and_modules_path_char_ptr = STRING_c_str(devices_and_modules_path);
AzureIoTClient 54:830550fef7ea 186 if (STRING_sprintf(event_send_address, IOTHUB_EVENT_SEND_ADDRESS_FMT, devices_and_modules_path_char_ptr) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 187 {
AzureIoTClient 41:71c01aa3df1a 188 STRING_delete(event_send_address);
AzureIoTClient 41:71c01aa3df1a 189 event_send_address = NULL;
AzureIoTClient 41:71c01aa3df1a 190 LogError("Failed creating the event_send_address (STRING_sprintf failed)");
AzureIoTClient 41:71c01aa3df1a 191 }
AzureIoTClient 41:71c01aa3df1a 192 }
AzureIoTClient 36:f78f9a56869e 193
AzureIoTClient 41:71c01aa3df1a 194 return event_send_address;
AzureIoTClient 36:f78f9a56869e 195 }
AzureIoTClient 36:f78f9a56869e 196
AzureIoTClient 36:f78f9a56869e 197 static STRING_HANDLE create_event_sender_source_name(STRING_HANDLE link_name)
AzureIoTClient 36:f78f9a56869e 198 {
AzureIoTClient 41:71c01aa3df1a 199 STRING_HANDLE source_name;
AzureIoTClient 56:8704100b3b54 200
AzureIoTClient 41:71c01aa3df1a 201 if ((source_name = STRING_new()) == NULL)
AzureIoTClient 41:71c01aa3df1a 202 {
AzureIoTClient 41:71c01aa3df1a 203 LogError("Failed creating the source_name (STRING_new failed)");
AzureIoTClient 41:71c01aa3df1a 204 }
AzureIoTClient 41:71c01aa3df1a 205 else
AzureIoTClient 41:71c01aa3df1a 206 {
AzureIoTClient 41:71c01aa3df1a 207 const char* link_name_char_ptr = STRING_c_str(link_name);
AzureIoTClient 41:71c01aa3df1a 208 if (STRING_sprintf(source_name, "%s-source", link_name_char_ptr) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 209 {
AzureIoTClient 41:71c01aa3df1a 210 STRING_delete(source_name);
AzureIoTClient 41:71c01aa3df1a 211 source_name = NULL;
AzureIoTClient 41:71c01aa3df1a 212 LogError("Failed creating the source_name (STRING_sprintf failed)");
AzureIoTClient 41:71c01aa3df1a 213 }
AzureIoTClient 41:71c01aa3df1a 214 }
AzureIoTClient 36:f78f9a56869e 215
AzureIoTClient 41:71c01aa3df1a 216 return source_name;
AzureIoTClient 36:f78f9a56869e 217 }
AzureIoTClient 36:f78f9a56869e 218
AzureIoTClient 54:830550fef7ea 219 static STRING_HANDLE create_message_receive_address(STRING_HANDLE devices_and_modules_path)
AzureIoTClient 36:f78f9a56869e 220 {
AzureIoTClient 41:71c01aa3df1a 221 STRING_HANDLE message_receive_address;
AzureIoTClient 36:f78f9a56869e 222
AzureIoTClient 41:71c01aa3df1a 223 if ((message_receive_address = STRING_new()) == NULL)
AzureIoTClient 41:71c01aa3df1a 224 {
AzureIoTClient 41:71c01aa3df1a 225 LogError("Failed creating the message_receive_address (STRING_new failed)");
AzureIoTClient 41:71c01aa3df1a 226 }
AzureIoTClient 41:71c01aa3df1a 227 else
AzureIoTClient 41:71c01aa3df1a 228 {
AzureIoTClient 54:830550fef7ea 229 const char* devices_and_modules_path_char_ptr = STRING_c_str(devices_and_modules_path);
AzureIoTClient 54:830550fef7ea 230 if (STRING_sprintf(message_receive_address, IOTHUB_MESSAGE_RECEIVE_ADDRESS_FMT, devices_and_modules_path_char_ptr) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 231 {
AzureIoTClient 41:71c01aa3df1a 232 STRING_delete(message_receive_address);
AzureIoTClient 41:71c01aa3df1a 233 message_receive_address = NULL;
AzureIoTClient 41:71c01aa3df1a 234 LogError("Failed creating the message_receive_address (STRING_sprintf failed)");
AzureIoTClient 41:71c01aa3df1a 235 }
AzureIoTClient 41:71c01aa3df1a 236 }
AzureIoTClient 36:f78f9a56869e 237
AzureIoTClient 41:71c01aa3df1a 238 return message_receive_address;
AzureIoTClient 36:f78f9a56869e 239 }
AzureIoTClient 36:f78f9a56869e 240
AzureIoTClient 36:f78f9a56869e 241 static STRING_HANDLE create_message_receiver_target_name(STRING_HANDLE link_name)
AzureIoTClient 36:f78f9a56869e 242 {
AzureIoTClient 41:71c01aa3df1a 243 STRING_HANDLE target_name;
AzureIoTClient 36:f78f9a56869e 244
AzureIoTClient 41:71c01aa3df1a 245 if ((target_name = STRING_new()) == NULL)
AzureIoTClient 41:71c01aa3df1a 246 {
AzureIoTClient 41:71c01aa3df1a 247 LogError("Failed creating the target_name (STRING_new failed)");
AzureIoTClient 41:71c01aa3df1a 248 }
AzureIoTClient 41:71c01aa3df1a 249 else
AzureIoTClient 41:71c01aa3df1a 250 {
AzureIoTClient 41:71c01aa3df1a 251 const char* link_name_char_ptr = STRING_c_str(link_name);
AzureIoTClient 41:71c01aa3df1a 252 if (STRING_sprintf(target_name, "%s-target", link_name_char_ptr) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 253 {
AzureIoTClient 41:71c01aa3df1a 254 STRING_delete(target_name);
AzureIoTClient 41:71c01aa3df1a 255 target_name = NULL;
AzureIoTClient 41:71c01aa3df1a 256 LogError("Failed creating the target_name (STRING_sprintf failed)");
AzureIoTClient 41:71c01aa3df1a 257 }
AzureIoTClient 41:71c01aa3df1a 258 }
AzureIoTClient 36:f78f9a56869e 259
AzureIoTClient 41:71c01aa3df1a 260 return target_name;
AzureIoTClient 36:f78f9a56869e 261 }
AzureIoTClient 36:f78f9a56869e 262
AzureIoTClient 36:f78f9a56869e 263 static STRING_HANDLE create_link_name(const char* prefix, const char* infix)
AzureIoTClient 36:f78f9a56869e 264 {
AzureIoTClient 41:71c01aa3df1a 265 char* unique_id;
AzureIoTClient 41:71c01aa3df1a 266 STRING_HANDLE tag = NULL;
AzureIoTClient 36:f78f9a56869e 267
AzureIoTClient 41:71c01aa3df1a 268 if ((unique_id = (char*)malloc(sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1)) == NULL)
AzureIoTClient 41:71c01aa3df1a 269 {
AzureIoTClient 41:71c01aa3df1a 270 LogError("Failed generating an unique tag (malloc failed)");
AzureIoTClient 41:71c01aa3df1a 271 }
AzureIoTClient 41:71c01aa3df1a 272 else
AzureIoTClient 41:71c01aa3df1a 273 {
AzureIoTClient 36:f78f9a56869e 274 memset(unique_id, 0, sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1);
AzureIoTClient 36:f78f9a56869e 275
AzureIoTClient 41:71c01aa3df1a 276 if (UniqueId_Generate(unique_id, UNIQUE_ID_BUFFER_SIZE) != UNIQUEID_OK)
AzureIoTClient 41:71c01aa3df1a 277 {
AzureIoTClient 41:71c01aa3df1a 278 LogError("Failed generating an unique tag (UniqueId_Generate failed)");
AzureIoTClient 41:71c01aa3df1a 279 }
AzureIoTClient 41:71c01aa3df1a 280 else if ((tag = STRING_new()) == NULL)
AzureIoTClient 41:71c01aa3df1a 281 {
AzureIoTClient 41:71c01aa3df1a 282 LogError("Failed generating an unique tag (STRING_new failed)");
AzureIoTClient 41:71c01aa3df1a 283 }
AzureIoTClient 41:71c01aa3df1a 284 else if (STRING_sprintf(tag, "%s-%s-%s", prefix, infix, unique_id) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 285 {
AzureIoTClient 41:71c01aa3df1a 286 STRING_delete(tag);
AzureIoTClient 41:71c01aa3df1a 287 tag = NULL;
AzureIoTClient 41:71c01aa3df1a 288 LogError("Failed generating an unique tag (STRING_sprintf failed)");
AzureIoTClient 41:71c01aa3df1a 289 }
AzureIoTClient 36:f78f9a56869e 290
AzureIoTClient 41:71c01aa3df1a 291 free(unique_id);
AzureIoTClient 41:71c01aa3df1a 292 }
AzureIoTClient 36:f78f9a56869e 293
AzureIoTClient 41:71c01aa3df1a 294 return tag;
AzureIoTClient 36:f78f9a56869e 295 }
AzureIoTClient 36:f78f9a56869e 296
AzureIoTClient 36:f78f9a56869e 297 static void update_messenger_state(TELEMETRY_MESSENGER_INSTANCE* instance, TELEMETRY_MESSENGER_STATE new_state)
AzureIoTClient 36:f78f9a56869e 298 {
AzureIoTClient 41:71c01aa3df1a 299 if (new_state != instance->state)
AzureIoTClient 41:71c01aa3df1a 300 {
AzureIoTClient 41:71c01aa3df1a 301 TELEMETRY_MESSENGER_STATE previous_state = instance->state;
AzureIoTClient 41:71c01aa3df1a 302 instance->state = new_state;
AzureIoTClient 36:f78f9a56869e 303
AzureIoTClient 41:71c01aa3df1a 304 if (instance->on_state_changed_callback != NULL)
AzureIoTClient 41:71c01aa3df1a 305 {
AzureIoTClient 41:71c01aa3df1a 306 instance->on_state_changed_callback(instance->on_state_changed_context, previous_state, new_state);
AzureIoTClient 41:71c01aa3df1a 307 }
AzureIoTClient 41:71c01aa3df1a 308 }
AzureIoTClient 36:f78f9a56869e 309 }
AzureIoTClient 36:f78f9a56869e 310
AzureIoTClient 36:f78f9a56869e 311 static void attach_device_client_type_to_link(LINK_HANDLE link, STRING_HANDLE product_info)
AzureIoTClient 36:f78f9a56869e 312 {
AzureIoTClient 41:71c01aa3df1a 313 fields attach_properties;
AzureIoTClient 41:71c01aa3df1a 314 AMQP_VALUE device_client_type_key_name;
AzureIoTClient 41:71c01aa3df1a 315 AMQP_VALUE device_client_type_value;
AzureIoTClient 36:f78f9a56869e 316
AzureIoTClient 41:71c01aa3df1a 317 if ((attach_properties = amqpvalue_create_map()) == NULL)
AzureIoTClient 41:71c01aa3df1a 318 {
AzureIoTClient 41:71c01aa3df1a 319 LogError("Failed to create the map for device client type.");
AzureIoTClient 41:71c01aa3df1a 320 }
AzureIoTClient 41:71c01aa3df1a 321 else
AzureIoTClient 41:71c01aa3df1a 322 {
AzureIoTClient 41:71c01aa3df1a 323 if ((device_client_type_key_name = amqpvalue_create_symbol("com.microsoft:client-version")) == NULL)
AzureIoTClient 41:71c01aa3df1a 324 {
AzureIoTClient 41:71c01aa3df1a 325 LogError("Failed to create the key name for the device client type.");
AzureIoTClient 41:71c01aa3df1a 326 }
AzureIoTClient 41:71c01aa3df1a 327 else
AzureIoTClient 41:71c01aa3df1a 328 {
AzureIoTClient 41:71c01aa3df1a 329 if ((device_client_type_value = amqpvalue_create_string(STRING_c_str(product_info))) == NULL)
AzureIoTClient 41:71c01aa3df1a 330 {
AzureIoTClient 41:71c01aa3df1a 331 LogError("Failed to create the key value for the device client type.");
AzureIoTClient 41:71c01aa3df1a 332 }
AzureIoTClient 41:71c01aa3df1a 333 else
AzureIoTClient 41:71c01aa3df1a 334 {
AzureIoTClient 56:8704100b3b54 335 if (amqpvalue_set_map_value(attach_properties, device_client_type_key_name, device_client_type_value) != 0)
AzureIoTClient 41:71c01aa3df1a 336 {
AzureIoTClient 56:8704100b3b54 337 LogError("Failed to set the property map for the device client type");
AzureIoTClient 41:71c01aa3df1a 338 }
AzureIoTClient 56:8704100b3b54 339 else if (link_set_attach_properties(link, attach_properties) != 0)
AzureIoTClient 41:71c01aa3df1a 340 {
AzureIoTClient 56:8704100b3b54 341 LogError("Unable to attach the device client type to the link properties");
AzureIoTClient 41:71c01aa3df1a 342 }
AzureIoTClient 36:f78f9a56869e 343
AzureIoTClient 41:71c01aa3df1a 344 amqpvalue_destroy(device_client_type_value);
AzureIoTClient 41:71c01aa3df1a 345 }
AzureIoTClient 36:f78f9a56869e 346
AzureIoTClient 41:71c01aa3df1a 347 amqpvalue_destroy(device_client_type_key_name);
AzureIoTClient 41:71c01aa3df1a 348 }
AzureIoTClient 36:f78f9a56869e 349
AzureIoTClient 41:71c01aa3df1a 350 amqpvalue_destroy(attach_properties);
AzureIoTClient 41:71c01aa3df1a 351 }
AzureIoTClient 36:f78f9a56869e 352 }
AzureIoTClient 36:f78f9a56869e 353
AzureIoTClient 36:f78f9a56869e 354 static void destroy_event_sender(TELEMETRY_MESSENGER_INSTANCE* instance)
AzureIoTClient 36:f78f9a56869e 355 {
AzureIoTClient 41:71c01aa3df1a 356 if (instance->message_sender != NULL)
AzureIoTClient 41:71c01aa3df1a 357 {
AzureIoTClient 41:71c01aa3df1a 358 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_060: [`instance->message_sender` shall be destroyed using messagesender_destroy()]
AzureIoTClient 41:71c01aa3df1a 359 messagesender_destroy(instance->message_sender);
AzureIoTClient 41:71c01aa3df1a 360 instance->message_sender = NULL;
AzureIoTClient 41:71c01aa3df1a 361 }
AzureIoTClient 36:f78f9a56869e 362
AzureIoTClient 45:c09fac270e60 363 instance->message_sender_current_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 45:c09fac270e60 364 instance->message_sender_previous_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 45:c09fac270e60 365 instance->last_message_sender_state_change_time = INDEFINITE_TIME;
AzureIoTClient 45:c09fac270e60 366
AzureIoTClient 41:71c01aa3df1a 367 if (instance->sender_link != NULL)
AzureIoTClient 41:71c01aa3df1a 368 {
AzureIoTClient 41:71c01aa3df1a 369 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_063: [`instance->sender_link` shall be destroyed using link_destroy()]
AzureIoTClient 41:71c01aa3df1a 370 link_destroy(instance->sender_link);
AzureIoTClient 41:71c01aa3df1a 371 instance->sender_link = NULL;
AzureIoTClient 41:71c01aa3df1a 372 }
AzureIoTClient 36:f78f9a56869e 373 }
AzureIoTClient 36:f78f9a56869e 374
AzureIoTClient 36:f78f9a56869e 375 static void on_event_sender_state_changed_callback(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state)
AzureIoTClient 36:f78f9a56869e 376 {
AzureIoTClient 41:71c01aa3df1a 377 if (context == NULL)
AzureIoTClient 41:71c01aa3df1a 378 {
AzureIoTClient 41:71c01aa3df1a 379 LogError("on_event_sender_state_changed_callback was invoked with a NULL context; although unexpected, this failure will be ignored");
AzureIoTClient 41:71c01aa3df1a 380 }
AzureIoTClient 41:71c01aa3df1a 381 else
AzureIoTClient 41:71c01aa3df1a 382 {
AzureIoTClient 41:71c01aa3df1a 383 if (new_state != previous_state)
AzureIoTClient 41:71c01aa3df1a 384 {
AzureIoTClient 41:71c01aa3df1a 385 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)context;
AzureIoTClient 41:71c01aa3df1a 386 instance->message_sender_current_state = new_state;
AzureIoTClient 41:71c01aa3df1a 387 instance->message_sender_previous_state = previous_state;
AzureIoTClient 41:71c01aa3df1a 388 instance->last_message_sender_state_change_time = get_time(NULL);
AzureIoTClient 41:71c01aa3df1a 389 }
AzureIoTClient 41:71c01aa3df1a 390 }
AzureIoTClient 36:f78f9a56869e 391 }
AzureIoTClient 36:f78f9a56869e 392
AzureIoTClient 36:f78f9a56869e 393 static int create_event_sender(TELEMETRY_MESSENGER_INSTANCE* instance)
AzureIoTClient 36:f78f9a56869e 394 {
AzureIoTClient 41:71c01aa3df1a 395 int result;
AzureIoTClient 36:f78f9a56869e 396
AzureIoTClient 41:71c01aa3df1a 397 STRING_HANDLE link_name = NULL;
AzureIoTClient 41:71c01aa3df1a 398 STRING_HANDLE source_name = NULL;
AzureIoTClient 41:71c01aa3df1a 399 AMQP_VALUE source = NULL;
AzureIoTClient 41:71c01aa3df1a 400 AMQP_VALUE target = NULL;
AzureIoTClient 54:830550fef7ea 401 STRING_HANDLE devices_and_modules_path = NULL;
AzureIoTClient 41:71c01aa3df1a 402 STRING_HANDLE event_send_address = NULL;
AzureIoTClient 36:f78f9a56869e 403
AzureIoTClient 54:830550fef7ea 404 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_033: [A variable, named `devices_and_modules_path`, shall be created concatenating `instance->iothub_host_fqdn`, "/devices/" and `instance->device_id` (and "/modules/" and `instance->module_id` if modules are present)]
AzureIoTClient 54:830550fef7ea 405 if ((devices_and_modules_path = create_devices_and_modules_path(instance->iothub_host_fqdn, instance->device_id, instance->module_id)) == NULL)
AzureIoTClient 41:71c01aa3df1a 406 {
AzureIoTClient 54:830550fef7ea 407 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_034: [If `devices_and_modules_path` fails to be created, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 408 result = __FAILURE__;
AzureIoTClient 54:830550fef7ea 409 LogError("Failed creating the message sender (failed creating the 'devices_and_modules_path')");
AzureIoTClient 41:71c01aa3df1a 410 }
AzureIoTClient 54:830550fef7ea 411 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_035: [A variable, named `event_send_address`, shall be created concatenating "amqps://", `devices_and_modules_path` and "/messages/events"]
AzureIoTClient 54:830550fef7ea 412 else if ((event_send_address = create_event_send_address(devices_and_modules_path)) == NULL)
AzureIoTClient 41:71c01aa3df1a 413 {
AzureIoTClient 41:71c01aa3df1a 414 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_036: [If `event_send_address` fails to be created, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 415 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 416 LogError("Failed creating the message sender (failed creating the 'event_send_address')");
AzureIoTClient 41:71c01aa3df1a 417 }
AzureIoTClient 41:71c01aa3df1a 418 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_037: [A `link_name` variable shall be created using an unique string label per AMQP session]
AzureIoTClient 41:71c01aa3df1a 419 else if ((link_name = create_link_name(MESSAGE_SENDER_LINK_NAME_PREFIX, STRING_c_str(instance->device_id))) == NULL)
AzureIoTClient 41:71c01aa3df1a 420 {
AzureIoTClient 41:71c01aa3df1a 421 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_038: [If `link_name` fails to be created, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 422 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 423 LogError("Failed creating the message sender (failed creating an unique link name)");
AzureIoTClient 41:71c01aa3df1a 424 }
AzureIoTClient 41:71c01aa3df1a 425 else if ((source_name = create_event_sender_source_name(link_name)) == NULL)
AzureIoTClient 41:71c01aa3df1a 426 {
AzureIoTClient 41:71c01aa3df1a 427 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 428 LogError("Failed creating the message sender (failed creating an unique source name)");
AzureIoTClient 41:71c01aa3df1a 429 }
AzureIoTClient 41:71c01aa3df1a 430 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_039: [A `source` variable shall be created with messaging_create_source() using an unique string label per AMQP session]
AzureIoTClient 41:71c01aa3df1a 431 else if ((source = messaging_create_source(STRING_c_str(source_name))) == NULL)
AzureIoTClient 41:71c01aa3df1a 432 {
AzureIoTClient 41:71c01aa3df1a 433 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_040: [If `source` fails to be created, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 434 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 435 LogError("Failed creating the message sender (messaging_create_source failed)");
AzureIoTClient 41:71c01aa3df1a 436 }
AzureIoTClient 41:71c01aa3df1a 437 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_041: [A `target` variable shall be created with messaging_create_target() using `event_send_address`]
AzureIoTClient 41:71c01aa3df1a 438 else if ((target = messaging_create_target(STRING_c_str(event_send_address))) == NULL)
AzureIoTClient 41:71c01aa3df1a 439 {
AzureIoTClient 41:71c01aa3df1a 440 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_042: [If `target` fails to be created, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 441 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 442 LogError("Failed creating the message sender (messaging_create_target failed)");
AzureIoTClient 41:71c01aa3df1a 443 }
AzureIoTClient 41:71c01aa3df1a 444 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_043: [`instance->sender_link` shall be set using link_create(), passing `instance->session_handle`, `link_name`, "role_sender", `source` and `target` as parameters]
AzureIoTClient 41:71c01aa3df1a 445 else if ((instance->sender_link = link_create(instance->session_handle, STRING_c_str(link_name), role_sender, source, target)) == NULL)
AzureIoTClient 41:71c01aa3df1a 446 {
AzureIoTClient 41:71c01aa3df1a 447 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_044: [If link_create() fails, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 448 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 449 LogError("Failed creating the message sender (link_create failed)");
AzureIoTClient 41:71c01aa3df1a 450 }
AzureIoTClient 41:71c01aa3df1a 451 else
AzureIoTClient 41:71c01aa3df1a 452 {
AzureIoTClient 41:71c01aa3df1a 453 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_047: [`instance->sender_link` maximum message size shall be set to UINT64_MAX using link_set_max_message_size()]
AzureIoTClient 41:71c01aa3df1a 454 if (link_set_max_message_size(instance->sender_link, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 455 {
AzureIoTClient 41:71c01aa3df1a 456 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_048: [If link_set_max_message_size() fails, it shall be logged and ignored.]
AzureIoTClient 41:71c01aa3df1a 457 LogError("Failed setting message sender link max message size.");
AzureIoTClient 41:71c01aa3df1a 458 }
AzureIoTClient 36:f78f9a56869e 459
AzureIoTClient 41:71c01aa3df1a 460 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_049: [`instance->sender_link` should have a property "com.microsoft:client-version" set as `CLIENT_DEVICE_TYPE_PREFIX/IOTHUB_SDK_VERSION`, using amqpvalue_set_map_value() and link_set_attach_properties()]
AzureIoTClient 41:71c01aa3df1a 461 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_050: [If amqpvalue_set_map_value() or link_set_attach_properties() fail, the failure shall be ignored]
AzureIoTClient 41:71c01aa3df1a 462 attach_device_client_type_to_link(instance->sender_link, instance->product_info);
AzureIoTClient 36:f78f9a56869e 463
AzureIoTClient 41:71c01aa3df1a 464 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_051: [`instance->message_sender` shall be created using messagesender_create(), passing the `instance->sender_link` and `on_event_sender_state_changed_callback`]
AzureIoTClient 41:71c01aa3df1a 465 if ((instance->message_sender = messagesender_create(instance->sender_link, on_event_sender_state_changed_callback, (void*)instance)) == NULL)
AzureIoTClient 41:71c01aa3df1a 466 {
AzureIoTClient 41:71c01aa3df1a 467 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_052: [If messagesender_create() fails, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 45:c09fac270e60 468 LogError("Failed creating the message sender (messagesender_create failed)");
AzureIoTClient 45:c09fac270e60 469 destroy_event_sender(instance);
AzureIoTClient 41:71c01aa3df1a 470 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 471 }
AzureIoTClient 41:71c01aa3df1a 472 else
AzureIoTClient 41:71c01aa3df1a 473 {
AzureIoTClient 41:71c01aa3df1a 474 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_053: [`instance->message_sender` shall be opened using messagesender_open()]
AzureIoTClient 41:71c01aa3df1a 475 if (messagesender_open(instance->message_sender) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 476 {
AzureIoTClient 41:71c01aa3df1a 477 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_054: [If messagesender_open() fails, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 45:c09fac270e60 478 LogError("Failed opening the AMQP message sender.");
AzureIoTClient 45:c09fac270e60 479 destroy_event_sender(instance);
AzureIoTClient 41:71c01aa3df1a 480 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 481 }
AzureIoTClient 41:71c01aa3df1a 482 else
AzureIoTClient 41:71c01aa3df1a 483 {
AzureIoTClient 41:71c01aa3df1a 484 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 485 }
AzureIoTClient 41:71c01aa3df1a 486 }
AzureIoTClient 41:71c01aa3df1a 487 }
AzureIoTClient 36:f78f9a56869e 488
AzureIoTClient 41:71c01aa3df1a 489 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_055: [Before returning, telemetry_messenger_do_work() shall release all the temporary memory it has allocated]
AzureIoTClient 41:71c01aa3df1a 490 if (link_name != NULL)
AzureIoTClient 41:71c01aa3df1a 491 STRING_delete(link_name);
AzureIoTClient 41:71c01aa3df1a 492 if (source_name != NULL)
AzureIoTClient 41:71c01aa3df1a 493 STRING_delete(source_name);
AzureIoTClient 41:71c01aa3df1a 494 if (source != NULL)
AzureIoTClient 41:71c01aa3df1a 495 amqpvalue_destroy(source);
AzureIoTClient 41:71c01aa3df1a 496 if (target != NULL)
AzureIoTClient 41:71c01aa3df1a 497 amqpvalue_destroy(target);
AzureIoTClient 54:830550fef7ea 498 if (devices_and_modules_path != NULL)
AzureIoTClient 54:830550fef7ea 499 STRING_delete(devices_and_modules_path);
AzureIoTClient 41:71c01aa3df1a 500 if (event_send_address != NULL)
AzureIoTClient 41:71c01aa3df1a 501 STRING_delete(event_send_address);
AzureIoTClient 36:f78f9a56869e 502
AzureIoTClient 41:71c01aa3df1a 503 return result;
AzureIoTClient 36:f78f9a56869e 504 }
AzureIoTClient 36:f78f9a56869e 505
AzureIoTClient 36:f78f9a56869e 506 static void destroy_message_receiver(TELEMETRY_MESSENGER_INSTANCE* instance)
AzureIoTClient 36:f78f9a56869e 507 {
AzureIoTClient 41:71c01aa3df1a 508 if (instance->message_receiver != NULL)
AzureIoTClient 41:71c01aa3df1a 509 {
AzureIoTClient 41:71c01aa3df1a 510 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_061: [`instance->message_receiver` shall be closed using messagereceiver_close()]
AzureIoTClient 41:71c01aa3df1a 511 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_093: [`instance->message_receiver` shall be closed using messagereceiver_close()]
AzureIoTClient 41:71c01aa3df1a 512 if (messagereceiver_close(instance->message_receiver) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 513 {
AzureIoTClient 41:71c01aa3df1a 514 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_094: [If messagereceiver_close() fails, it shall be logged and ignored]
AzureIoTClient 41:71c01aa3df1a 515 LogError("Failed closing the AMQP message receiver (this failure will be ignored).");
AzureIoTClient 41:71c01aa3df1a 516 }
AzureIoTClient 36:f78f9a56869e 517
AzureIoTClient 41:71c01aa3df1a 518 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_062: [`instance->message_receiver` shall be destroyed using messagereceiver_destroy()]
AzureIoTClient 41:71c01aa3df1a 519 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_095: [`instance->message_receiver` shall be destroyed using messagereceiver_destroy()]
AzureIoTClient 41:71c01aa3df1a 520 messagereceiver_destroy(instance->message_receiver);
AzureIoTClient 36:f78f9a56869e 521
AzureIoTClient 41:71c01aa3df1a 522 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_096: [`instance->message_receiver` shall be set to NULL]
AzureIoTClient 41:71c01aa3df1a 523 instance->message_receiver = NULL;
AzureIoTClient 41:71c01aa3df1a 524 }
AzureIoTClient 36:f78f9a56869e 525
AzureIoTClient 45:c09fac270e60 526 instance->message_receiver_current_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 45:c09fac270e60 527 instance->message_receiver_previous_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 45:c09fac270e60 528 instance->last_message_receiver_state_change_time = INDEFINITE_TIME;
AzureIoTClient 45:c09fac270e60 529
AzureIoTClient 41:71c01aa3df1a 530 if (instance->receiver_link != NULL)
AzureIoTClient 41:71c01aa3df1a 531 {
AzureIoTClient 41:71c01aa3df1a 532 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_064: [`instance->receiver_link` shall be destroyed using link_destroy()]
AzureIoTClient 41:71c01aa3df1a 533 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_097: [`instance->receiver_link` shall be destroyed using link_destroy()]
AzureIoTClient 41:71c01aa3df1a 534 link_destroy(instance->receiver_link);
AzureIoTClient 41:71c01aa3df1a 535 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_098: [`instance->receiver_link` shall be set to NULL]
AzureIoTClient 41:71c01aa3df1a 536 instance->receiver_link = NULL;
AzureIoTClient 41:71c01aa3df1a 537 }
AzureIoTClient 36:f78f9a56869e 538 }
AzureIoTClient 36:f78f9a56869e 539
AzureIoTClient 36:f78f9a56869e 540 static void on_message_receiver_state_changed_callback(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state)
AzureIoTClient 36:f78f9a56869e 541 {
AzureIoTClient 41:71c01aa3df1a 542 if (context == NULL)
AzureIoTClient 41:71c01aa3df1a 543 {
AzureIoTClient 41:71c01aa3df1a 544 LogError("on_message_receiver_state_changed_callback was invoked with a NULL context; although unexpected, this failure will be ignored");
AzureIoTClient 41:71c01aa3df1a 545 }
AzureIoTClient 41:71c01aa3df1a 546 else
AzureIoTClient 41:71c01aa3df1a 547 {
AzureIoTClient 41:71c01aa3df1a 548 if (new_state != previous_state)
AzureIoTClient 41:71c01aa3df1a 549 {
AzureIoTClient 41:71c01aa3df1a 550 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)context;
AzureIoTClient 41:71c01aa3df1a 551 instance->message_receiver_current_state = new_state;
AzureIoTClient 41:71c01aa3df1a 552 instance->message_receiver_previous_state = previous_state;
AzureIoTClient 41:71c01aa3df1a 553 instance->last_message_receiver_state_change_time = get_time(NULL);
AzureIoTClient 41:71c01aa3df1a 554 }
AzureIoTClient 41:71c01aa3df1a 555 }
AzureIoTClient 36:f78f9a56869e 556 }
AzureIoTClient 36:f78f9a56869e 557
AzureIoTClient 36:f78f9a56869e 558 static TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* create_message_disposition_info(TELEMETRY_MESSENGER_INSTANCE* messenger)
AzureIoTClient 36:f78f9a56869e 559 {
AzureIoTClient 41:71c01aa3df1a 560 TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* result;
AzureIoTClient 36:f78f9a56869e 561
AzureIoTClient 41:71c01aa3df1a 562 if ((result = (TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO*)malloc(sizeof(TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO))) == NULL)
AzureIoTClient 41:71c01aa3df1a 563 {
AzureIoTClient 41:71c01aa3df1a 564 LogError("Failed creating TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO container (malloc failed)");
AzureIoTClient 41:71c01aa3df1a 565 result = NULL;
AzureIoTClient 41:71c01aa3df1a 566 }
AzureIoTClient 41:71c01aa3df1a 567 else
AzureIoTClient 41:71c01aa3df1a 568 {
AzureIoTClient 41:71c01aa3df1a 569 delivery_number message_id;
AzureIoTClient 36:f78f9a56869e 570
AzureIoTClient 41:71c01aa3df1a 571 if (messagereceiver_get_received_message_id(messenger->message_receiver, &message_id) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 572 {
AzureIoTClient 41:71c01aa3df1a 573 LogError("Failed creating TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO container (messagereceiver_get_received_message_id failed)");
AzureIoTClient 41:71c01aa3df1a 574 free(result);
AzureIoTClient 41:71c01aa3df1a 575 result = NULL;
AzureIoTClient 41:71c01aa3df1a 576 }
AzureIoTClient 41:71c01aa3df1a 577 else
AzureIoTClient 41:71c01aa3df1a 578 {
AzureIoTClient 41:71c01aa3df1a 579 const char* link_name;
AzureIoTClient 36:f78f9a56869e 580
AzureIoTClient 41:71c01aa3df1a 581 if (messagereceiver_get_link_name(messenger->message_receiver, &link_name) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 582 {
AzureIoTClient 41:71c01aa3df1a 583 LogError("Failed creating TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO container (messagereceiver_get_link_name failed)");
AzureIoTClient 41:71c01aa3df1a 584 free(result);
AzureIoTClient 41:71c01aa3df1a 585 result = NULL;
AzureIoTClient 41:71c01aa3df1a 586 }
AzureIoTClient 41:71c01aa3df1a 587 else if (mallocAndStrcpy_s(&result->source, link_name) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 588 {
AzureIoTClient 41:71c01aa3df1a 589 LogError("Failed creating TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO container (failed copying link name)");
AzureIoTClient 41:71c01aa3df1a 590 free(result);
AzureIoTClient 41:71c01aa3df1a 591 result = NULL;
AzureIoTClient 41:71c01aa3df1a 592 }
AzureIoTClient 41:71c01aa3df1a 593 else
AzureIoTClient 41:71c01aa3df1a 594 {
AzureIoTClient 41:71c01aa3df1a 595 result->message_id = message_id;
AzureIoTClient 41:71c01aa3df1a 596 }
AzureIoTClient 41:71c01aa3df1a 597 }
AzureIoTClient 41:71c01aa3df1a 598 }
AzureIoTClient 36:f78f9a56869e 599
AzureIoTClient 41:71c01aa3df1a 600 return result;
AzureIoTClient 36:f78f9a56869e 601 }
AzureIoTClient 36:f78f9a56869e 602
AzureIoTClient 36:f78f9a56869e 603 static void destroy_message_disposition_info(TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info)
AzureIoTClient 36:f78f9a56869e 604 {
AzureIoTClient 41:71c01aa3df1a 605 free(disposition_info->source);
AzureIoTClient 41:71c01aa3df1a 606 free(disposition_info);
AzureIoTClient 36:f78f9a56869e 607 }
AzureIoTClient 36:f78f9a56869e 608
AzureIoTClient 36:f78f9a56869e 609 static AMQP_VALUE create_uamqp_disposition_result_from(TELEMETRY_MESSENGER_DISPOSITION_RESULT disposition_result)
AzureIoTClient 36:f78f9a56869e 610 {
AzureIoTClient 41:71c01aa3df1a 611 AMQP_VALUE uamqp_disposition_result;
AzureIoTClient 36:f78f9a56869e 612
AzureIoTClient 41:71c01aa3df1a 613 if (disposition_result == TELEMETRY_MESSENGER_DISPOSITION_RESULT_NONE)
AzureIoTClient 41:71c01aa3df1a 614 {
AzureIoTClient 41:71c01aa3df1a 615 uamqp_disposition_result = NULL; // intentionally not sending an answer.
AzureIoTClient 41:71c01aa3df1a 616 }
AzureIoTClient 41:71c01aa3df1a 617 else if (disposition_result == TELEMETRY_MESSENGER_DISPOSITION_RESULT_ACCEPTED)
AzureIoTClient 41:71c01aa3df1a 618 {
AzureIoTClient 41:71c01aa3df1a 619 uamqp_disposition_result = messaging_delivery_accepted();
AzureIoTClient 41:71c01aa3df1a 620 }
AzureIoTClient 41:71c01aa3df1a 621 else if (disposition_result == TELEMETRY_MESSENGER_DISPOSITION_RESULT_RELEASED)
AzureIoTClient 41:71c01aa3df1a 622 {
AzureIoTClient 41:71c01aa3df1a 623 uamqp_disposition_result = messaging_delivery_released();
AzureIoTClient 41:71c01aa3df1a 624 }
AzureIoTClient 41:71c01aa3df1a 625 else if (disposition_result == TELEMETRY_MESSENGER_DISPOSITION_RESULT_REJECTED)
AzureIoTClient 41:71c01aa3df1a 626 {
AzureIoTClient 41:71c01aa3df1a 627 uamqp_disposition_result = messaging_delivery_rejected("Rejected by application", "Rejected by application");
AzureIoTClient 41:71c01aa3df1a 628 }
AzureIoTClient 41:71c01aa3df1a 629 else
AzureIoTClient 41:71c01aa3df1a 630 {
AzureIoTClient 41:71c01aa3df1a 631 LogError("Failed creating a disposition result for messagereceiver (result %d is not supported)", disposition_result);
AzureIoTClient 41:71c01aa3df1a 632 uamqp_disposition_result = NULL;
AzureIoTClient 41:71c01aa3df1a 633 }
AzureIoTClient 36:f78f9a56869e 634
AzureIoTClient 41:71c01aa3df1a 635 return uamqp_disposition_result;
AzureIoTClient 36:f78f9a56869e 636 }
AzureIoTClient 36:f78f9a56869e 637
AzureIoTClient 36:f78f9a56869e 638 static AMQP_VALUE on_message_received_internal_callback(const void* context, MESSAGE_HANDLE message)
AzureIoTClient 36:f78f9a56869e 639 {
AzureIoTClient 41:71c01aa3df1a 640 AMQP_VALUE result;
AzureIoTClient 41:71c01aa3df1a 641 IOTHUB_MESSAGE_HANDLE iothub_message;
AzureIoTClient 36:f78f9a56869e 642
AzureIoTClient 41:71c01aa3df1a 643 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_121: [An IOTHUB_MESSAGE_HANDLE shall be obtained from MESSAGE_HANDLE using message_create_IoTHubMessage_from_uamqp_message()]
AzureIoTClient 56:8704100b3b54 644 if (message_create_IoTHubMessage_from_uamqp_message(message, &iothub_message) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 645 {
AzureIoTClient 41:71c01aa3df1a 646 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_122: [If message_create_IoTHubMessage_from_uamqp_message() fails, on_message_received_internal_callback shall return the result of messaging_delivery_rejected()]
AzureIoTClient 41:71c01aa3df1a 647 result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed reading AMQP message");
AzureIoTClient 36:f78f9a56869e 648
AzureIoTClient 56:8704100b3b54 649 LogError("on_message_received_internal_callback failed (message_create_IoTHubMessage_from_uamqp_message).");
AzureIoTClient 41:71c01aa3df1a 650 }
AzureIoTClient 41:71c01aa3df1a 651 else
AzureIoTClient 41:71c01aa3df1a 652 {
AzureIoTClient 41:71c01aa3df1a 653 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)context;
AzureIoTClient 41:71c01aa3df1a 654 TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* message_disposition_info;
AzureIoTClient 36:f78f9a56869e 655
AzureIoTClient 41:71c01aa3df1a 656 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_186: [A TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO instance shall be created containing the source link name and message delivery ID]
AzureIoTClient 41:71c01aa3df1a 657 if ((message_disposition_info = create_message_disposition_info(instance)) == NULL)
AzureIoTClient 41:71c01aa3df1a 658 {
AzureIoTClient 41:71c01aa3df1a 659 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_187: [**If the TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO instance fails to be created, on_message_received_internal_callback shall return messaging_delivery_released()]
AzureIoTClient 41:71c01aa3df1a 660 LogError("on_message_received_internal_callback failed (failed creating TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO).");
AzureIoTClient 41:71c01aa3df1a 661 result = messaging_delivery_released();
AzureIoTClient 41:71c01aa3df1a 662 }
AzureIoTClient 41:71c01aa3df1a 663 else
AzureIoTClient 41:71c01aa3df1a 664 {
AzureIoTClient 41:71c01aa3df1a 665 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_123: [`instance->on_message_received_callback` shall be invoked passing the IOTHUB_MESSAGE_HANDLE and TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO instance]
AzureIoTClient 41:71c01aa3df1a 666 TELEMETRY_MESSENGER_DISPOSITION_RESULT disposition_result = instance->on_message_received_callback(iothub_message, message_disposition_info, instance->on_message_received_context);
AzureIoTClient 36:f78f9a56869e 667
AzureIoTClient 41:71c01aa3df1a 668 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_188: [The memory allocated for the TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO instance shall be released]
AzureIoTClient 41:71c01aa3df1a 669 destroy_message_disposition_info(message_disposition_info);
AzureIoTClient 36:f78f9a56869e 670
AzureIoTClient 41:71c01aa3df1a 671 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_125: [If `instance->on_message_received_callback` returns TELEMETRY_MESSENGER_DISPOSITION_RESULT_ACCEPTED, on_message_received_internal_callback shall return the result of messaging_delivery_accepted()]
AzureIoTClient 41:71c01aa3df1a 672 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_126: [If `instance->on_message_received_callback` returns TELEMETRY_MESSENGER_DISPOSITION_RESULT_RELEASED, on_message_received_internal_callback shall return the result of messaging_delivery_released()]
AzureIoTClient 41:71c01aa3df1a 673 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_127: [If `instance->on_message_received_callback` returns TELEMETRY_MESSENGER_DISPOSITION_RESULT_REJECTED, on_message_received_internal_callback shall return the result of messaging_delivery_rejected()]
AzureIoTClient 41:71c01aa3df1a 674 result = create_uamqp_disposition_result_from(disposition_result);
AzureIoTClient 41:71c01aa3df1a 675 }
AzureIoTClient 41:71c01aa3df1a 676 }
AzureIoTClient 36:f78f9a56869e 677
AzureIoTClient 41:71c01aa3df1a 678 return result;
AzureIoTClient 36:f78f9a56869e 679 }
AzureIoTClient 36:f78f9a56869e 680
AzureIoTClient 36:f78f9a56869e 681 static int create_message_receiver(TELEMETRY_MESSENGER_INSTANCE* instance)
AzureIoTClient 36:f78f9a56869e 682 {
AzureIoTClient 41:71c01aa3df1a 683 int result;
AzureIoTClient 36:f78f9a56869e 684
AzureIoTClient 54:830550fef7ea 685 STRING_HANDLE devices_and_modules_path = NULL;
AzureIoTClient 41:71c01aa3df1a 686 STRING_HANDLE message_receive_address = NULL;
AzureIoTClient 41:71c01aa3df1a 687 STRING_HANDLE link_name = NULL;
AzureIoTClient 41:71c01aa3df1a 688 STRING_HANDLE target_name = NULL;
AzureIoTClient 41:71c01aa3df1a 689 AMQP_VALUE source = NULL;
AzureIoTClient 41:71c01aa3df1a 690 AMQP_VALUE target = NULL;
AzureIoTClient 36:f78f9a56869e 691
AzureIoTClient 54:830550fef7ea 692 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_068: [A variable, named `devices_and_modules_path`, shall be created concatenating `instance->iothub_host_fqdn`, "/devices/" and `instance->device_id` (and "/modules/" and `instance->module_id` if modules are present)]
AzureIoTClient 54:830550fef7ea 693 if ((devices_and_modules_path = create_devices_and_modules_path(instance->iothub_host_fqdn, instance->device_id, instance->module_id)) == NULL)
AzureIoTClient 41:71c01aa3df1a 694 {
AzureIoTClient 54:830550fef7ea 695 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_069: [If `devices_and_modules_path` fails to be created, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 696 result = __FAILURE__;
AzureIoTClient 54:830550fef7ea 697 LogError("Failed creating the message receiver (failed creating the 'devices_and_modules_path')");
AzureIoTClient 41:71c01aa3df1a 698 }
AzureIoTClient 54:830550fef7ea 699 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_070: [A variable, named `message_receive_address`, shall be created concatenating "amqps://", `devices_and_modules_path` and "/messages/devicebound"]
AzureIoTClient 54:830550fef7ea 700 else if ((message_receive_address = create_message_receive_address(devices_and_modules_path)) == NULL)
AzureIoTClient 41:71c01aa3df1a 701 {
AzureIoTClient 41:71c01aa3df1a 702 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_071: [If `message_receive_address` fails to be created, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 703 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 704 LogError("Failed creating the message receiver (failed creating the 'message_receive_address')");
AzureIoTClient 41:71c01aa3df1a 705 }
AzureIoTClient 41:71c01aa3df1a 706 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_072: [A `link_name` variable shall be created using an unique string label per AMQP session]
AzureIoTClient 41:71c01aa3df1a 707 else if ((link_name = create_link_name(MESSAGE_RECEIVER_LINK_NAME_PREFIX, STRING_c_str(instance->device_id))) == NULL)
AzureIoTClient 41:71c01aa3df1a 708 {
AzureIoTClient 41:71c01aa3df1a 709 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_073: [If `link_name` fails to be created, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 710 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 711 LogError("Failed creating the message receiver (failed creating an unique link name)");
AzureIoTClient 41:71c01aa3df1a 712 }
AzureIoTClient 41:71c01aa3df1a 713 else if ((target_name = create_message_receiver_target_name(link_name)) == NULL)
AzureIoTClient 41:71c01aa3df1a 714 {
AzureIoTClient 41:71c01aa3df1a 715 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 716 LogError("Failed creating the message receiver (failed creating an unique target name)");
AzureIoTClient 41:71c01aa3df1a 717 }
AzureIoTClient 41:71c01aa3df1a 718 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_074: [A `target` variable shall be created with messaging_create_target() using an unique string label per AMQP session]
AzureIoTClient 41:71c01aa3df1a 719 else if ((target = messaging_create_target(STRING_c_str(target_name))) == NULL)
AzureIoTClient 41:71c01aa3df1a 720 {
AzureIoTClient 41:71c01aa3df1a 721 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_075: [If `target` fails to be created, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 722 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 723 LogError("Failed creating the message receiver (messaging_create_target failed)");
AzureIoTClient 41:71c01aa3df1a 724 }
AzureIoTClient 41:71c01aa3df1a 725 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_076: [A `source` variable shall be created with messaging_create_source() using `message_receive_address`]
AzureIoTClient 41:71c01aa3df1a 726 else if ((source = messaging_create_source(STRING_c_str(message_receive_address))) == NULL)
AzureIoTClient 41:71c01aa3df1a 727 {
AzureIoTClient 41:71c01aa3df1a 728 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_077: [If `source` fails to be created, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 729 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 730 LogError("Failed creating the message receiver (messaging_create_source failed)");
AzureIoTClient 41:71c01aa3df1a 731 }
AzureIoTClient 41:71c01aa3df1a 732 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_078: [`instance->receiver_link` shall be set using link_create(), passing `instance->session_handle`, `link_name`, "role_receiver", `source` and `target` as parameters]
AzureIoTClient 41:71c01aa3df1a 733 else if ((instance->receiver_link = link_create(instance->session_handle, STRING_c_str(link_name), role_receiver, source, target)) == NULL)
AzureIoTClient 41:71c01aa3df1a 734 {
AzureIoTClient 41:71c01aa3df1a 735 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_079: [If link_create() fails, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 736 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 737 LogError("Failed creating the message receiver (link_create failed)");
AzureIoTClient 41:71c01aa3df1a 738 }
AzureIoTClient 41:71c01aa3df1a 739 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_080: [`instance->receiver_link` settle mode shall be set to "receiver_settle_mode_first" using link_set_rcv_settle_mode(), ]
AzureIoTClient 41:71c01aa3df1a 740 else if (link_set_rcv_settle_mode(instance->receiver_link, receiver_settle_mode_first) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 741 {
AzureIoTClient 41:71c01aa3df1a 742 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_081: [If link_set_rcv_settle_mode() fails, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 743 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 744 LogError("Failed creating the message receiver (link_set_rcv_settle_mode failed)");
AzureIoTClient 41:71c01aa3df1a 745 }
AzureIoTClient 41:71c01aa3df1a 746 else
AzureIoTClient 41:71c01aa3df1a 747 {
AzureIoTClient 41:71c01aa3df1a 748 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_082: [`instance->receiver_link` maximum message size shall be set to 65536 using link_set_max_message_size()]
AzureIoTClient 41:71c01aa3df1a 749 if (link_set_max_message_size(instance->receiver_link, MESSAGE_RECEIVER_MAX_LINK_SIZE) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 750 {
AzureIoTClient 41:71c01aa3df1a 751 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_083: [If link_set_max_message_size() fails, it shall be logged and ignored.]
AzureIoTClient 41:71c01aa3df1a 752 LogError("Failed setting message receiver link max message size.");
AzureIoTClient 41:71c01aa3df1a 753 }
AzureIoTClient 36:f78f9a56869e 754
AzureIoTClient 41:71c01aa3df1a 755 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_084: [`instance->receiver_link` should have a property "com.microsoft:client-version" set as `CLIENT_DEVICE_TYPE_PREFIX/IOTHUB_SDK_VERSION`, using amqpvalue_set_map_value() and link_set_attach_properties()]
AzureIoTClient 41:71c01aa3df1a 756 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_085: [If amqpvalue_set_map_value() or link_set_attach_properties() fail, the failure shall be ignored]
AzureIoTClient 41:71c01aa3df1a 757 attach_device_client_type_to_link(instance->receiver_link, instance->product_info);
AzureIoTClient 36:f78f9a56869e 758
AzureIoTClient 41:71c01aa3df1a 759 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_086: [`instance->message_receiver` shall be created using messagereceiver_create(), passing the `instance->receiver_link` and `on_messagereceiver_state_changed_callback`]
AzureIoTClient 41:71c01aa3df1a 760 if ((instance->message_receiver = messagereceiver_create(instance->receiver_link, on_message_receiver_state_changed_callback, (void*)instance)) == NULL)
AzureIoTClient 41:71c01aa3df1a 761 {
AzureIoTClient 41:71c01aa3df1a 762 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_087: [If messagereceiver_create() fails, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 45:c09fac270e60 763 LogError("Failed creating the message receiver (messagereceiver_create failed)");
AzureIoTClient 45:c09fac270e60 764 destroy_message_receiver(instance);
AzureIoTClient 41:71c01aa3df1a 765 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 766 }
AzureIoTClient 41:71c01aa3df1a 767 else
AzureIoTClient 41:71c01aa3df1a 768 {
AzureIoTClient 41:71c01aa3df1a 769 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_088: [`instance->message_receiver` shall be opened using messagereceiver_open(), passing `on_message_received_internal_callback`]
AzureIoTClient 41:71c01aa3df1a 770 if (messagereceiver_open(instance->message_receiver, on_message_received_internal_callback, (void*)instance) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 771 {
AzureIoTClient 41:71c01aa3df1a 772 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_089: [If messagereceiver_open() fails, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 45:c09fac270e60 773 LogError("Failed opening the AMQP message receiver.");
AzureIoTClient 45:c09fac270e60 774 destroy_message_receiver(instance);
AzureIoTClient 41:71c01aa3df1a 775 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 776 }
AzureIoTClient 41:71c01aa3df1a 777 else
AzureIoTClient 41:71c01aa3df1a 778 {
AzureIoTClient 41:71c01aa3df1a 779 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 780 }
AzureIoTClient 41:71c01aa3df1a 781 }
AzureIoTClient 41:71c01aa3df1a 782 }
AzureIoTClient 36:f78f9a56869e 783
AzureIoTClient 54:830550fef7ea 784 if (devices_and_modules_path != NULL)
AzureIoTClient 54:830550fef7ea 785 STRING_delete(devices_and_modules_path);
AzureIoTClient 41:71c01aa3df1a 786 if (message_receive_address != NULL)
AzureIoTClient 41:71c01aa3df1a 787 STRING_delete(message_receive_address);
AzureIoTClient 41:71c01aa3df1a 788 if (link_name != NULL)
AzureIoTClient 41:71c01aa3df1a 789 STRING_delete(link_name);
AzureIoTClient 41:71c01aa3df1a 790 if (target_name != NULL)
AzureIoTClient 41:71c01aa3df1a 791 STRING_delete(target_name);
AzureIoTClient 41:71c01aa3df1a 792 if (source != NULL)
AzureIoTClient 41:71c01aa3df1a 793 amqpvalue_destroy(source);
AzureIoTClient 41:71c01aa3df1a 794 if (target != NULL)
AzureIoTClient 41:71c01aa3df1a 795 amqpvalue_destroy(target);
AzureIoTClient 36:f78f9a56869e 796
AzureIoTClient 41:71c01aa3df1a 797 return result;
AzureIoTClient 36:f78f9a56869e 798 }
AzureIoTClient 36:f78f9a56869e 799
AzureIoTClient 36:f78f9a56869e 800 static int move_event_to_in_progress_list(MESSENGER_SEND_EVENT_TASK* task)
AzureIoTClient 36:f78f9a56869e 801 {
AzureIoTClient 56:8704100b3b54 802 int result;
AzureIoTClient 36:f78f9a56869e 803
AzureIoTClient 41:71c01aa3df1a 804 if (singlylinkedlist_add(task->messenger->in_progress_list, (void*)task) == NULL)
AzureIoTClient 41:71c01aa3df1a 805 {
AzureIoTClient 41:71c01aa3df1a 806 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 807 LogError("Failed moving event to in_progress list (singlylinkedlist_add failed)");
AzureIoTClient 41:71c01aa3df1a 808 }
AzureIoTClient 41:71c01aa3df1a 809 else
AzureIoTClient 41:71c01aa3df1a 810 {
AzureIoTClient 41:71c01aa3df1a 811 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 812 }
AzureIoTClient 36:f78f9a56869e 813
AzureIoTClient 41:71c01aa3df1a 814 return result;
AzureIoTClient 36:f78f9a56869e 815 }
AzureIoTClient 36:f78f9a56869e 816
AzureIoTClient 36:f78f9a56869e 817 static bool find_MESSENGER_SEND_EVENT_TASK_on_list(LIST_ITEM_HANDLE list_item, const void* match_context)
AzureIoTClient 36:f78f9a56869e 818 {
AzureIoTClient 41:71c01aa3df1a 819 return (list_item != NULL && singlylinkedlist_item_get_value(list_item) == match_context);
AzureIoTClient 36:f78f9a56869e 820 }
AzureIoTClient 36:f78f9a56869e 821
AzureIoTClient 36:f78f9a56869e 822 static void remove_event_from_in_progress_list(MESSENGER_SEND_EVENT_TASK *task)
AzureIoTClient 36:f78f9a56869e 823 {
AzureIoTClient 41:71c01aa3df1a 824 LIST_ITEM_HANDLE list_item = singlylinkedlist_find(task->messenger->in_progress_list, find_MESSENGER_SEND_EVENT_TASK_on_list, (void*)task);
AzureIoTClient 36:f78f9a56869e 825
AzureIoTClient 41:71c01aa3df1a 826 if (list_item != NULL)
AzureIoTClient 41:71c01aa3df1a 827 {
AzureIoTClient 41:71c01aa3df1a 828 if (singlylinkedlist_remove(task->messenger->in_progress_list, list_item) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 829 {
AzureIoTClient 41:71c01aa3df1a 830 LogError("Failed removing event from in_progress list (singlylinkedlist_remove failed)");
AzureIoTClient 41:71c01aa3df1a 831 }
AzureIoTClient 41:71c01aa3df1a 832 }
AzureIoTClient 36:f78f9a56869e 833 }
AzureIoTClient 36:f78f9a56869e 834
AzureIoTClient 51:269e65571b39 835 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_201: [Freeing a `task` will free callback items associated with it and free the data itself]
AzureIoTClient 51:269e65571b39 836 static void free_task(MESSENGER_SEND_EVENT_TASK* task)
AzureIoTClient 51:269e65571b39 837 {
AzureIoTClient 51:269e65571b39 838 LIST_ITEM_HANDLE list_node;
AzureIoTClient 51:269e65571b39 839
AzureIoTClient 51:269e65571b39 840 if (NULL != task->callback_list)
AzureIoTClient 51:269e65571b39 841 {
AzureIoTClient 51:269e65571b39 842 while ((list_node = singlylinkedlist_get_head_item(task->callback_list)) != NULL)
AzureIoTClient 51:269e65571b39 843 {
AzureIoTClient 51:269e65571b39 844 MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_info = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)singlylinkedlist_item_get_value(list_node);
AzureIoTClient 51:269e65571b39 845 (void)singlylinkedlist_remove(task->callback_list, list_node);
AzureIoTClient 51:269e65571b39 846 free(caller_info);
AzureIoTClient 51:269e65571b39 847 }
AzureIoTClient 51:269e65571b39 848 singlylinkedlist_destroy(task->callback_list);
AzureIoTClient 51:269e65571b39 849 }
AzureIoTClient 51:269e65571b39 850
AzureIoTClient 51:269e65571b39 851 free(task);
AzureIoTClient 51:269e65571b39 852 }
AzureIoTClient 51:269e65571b39 853
AzureIoTClient 36:f78f9a56869e 854 static int copy_events_to_list(SINGLYLINKEDLIST_HANDLE from_list, SINGLYLINKEDLIST_HANDLE to_list)
AzureIoTClient 36:f78f9a56869e 855 {
AzureIoTClient 41:71c01aa3df1a 856 int result;
AzureIoTClient 41:71c01aa3df1a 857 LIST_ITEM_HANDLE list_item;
AzureIoTClient 41:71c01aa3df1a 858
AzureIoTClient 41:71c01aa3df1a 859 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 860 list_item = singlylinkedlist_get_head_item(from_list);
AzureIoTClient 41:71c01aa3df1a 861
AzureIoTClient 41:71c01aa3df1a 862 while (list_item != NULL)
AzureIoTClient 41:71c01aa3df1a 863 {
AzureIoTClient 41:71c01aa3df1a 864 MESSENGER_SEND_EVENT_TASK *task = (MESSENGER_SEND_EVENT_TASK*)singlylinkedlist_item_get_value(list_item);
AzureIoTClient 36:f78f9a56869e 865
AzureIoTClient 41:71c01aa3df1a 866 if (singlylinkedlist_add(to_list, task) == NULL)
AzureIoTClient 41:71c01aa3df1a 867 {
AzureIoTClient 41:71c01aa3df1a 868 LogError("Failed copying event to destination list (singlylinkedlist_add failed)");
AzureIoTClient 41:71c01aa3df1a 869 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 870 break;
AzureIoTClient 41:71c01aa3df1a 871 }
AzureIoTClient 41:71c01aa3df1a 872 else
AzureIoTClient 41:71c01aa3df1a 873 {
AzureIoTClient 41:71c01aa3df1a 874 list_item = singlylinkedlist_get_next_item(list_item);
AzureIoTClient 41:71c01aa3df1a 875 }
AzureIoTClient 41:71c01aa3df1a 876 }
AzureIoTClient 36:f78f9a56869e 877
AzureIoTClient 41:71c01aa3df1a 878 return result;
AzureIoTClient 41:71c01aa3df1a 879 }
AzureIoTClient 41:71c01aa3df1a 880
AzureIoTClient 41:71c01aa3df1a 881 static int copy_events_from_in_progress_to_waiting_list(TELEMETRY_MESSENGER_INSTANCE* instance, SINGLYLINKEDLIST_HANDLE to_list)
AzureIoTClient 41:71c01aa3df1a 882 {
AzureIoTClient 41:71c01aa3df1a 883 int result;
AzureIoTClient 41:71c01aa3df1a 884 LIST_ITEM_HANDLE list_task_item;
AzureIoTClient 51:269e65571b39 885 LIST_ITEM_HANDLE list_task_item_next;
AzureIoTClient 36:f78f9a56869e 886
AzureIoTClient 41:71c01aa3df1a 887 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 888 list_task_item = singlylinkedlist_get_head_item(instance->in_progress_list);
AzureIoTClient 36:f78f9a56869e 889
AzureIoTClient 41:71c01aa3df1a 890 while (list_task_item != NULL)
AzureIoTClient 41:71c01aa3df1a 891 {
AzureIoTClient 41:71c01aa3df1a 892 MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)singlylinkedlist_item_get_value(list_task_item);
AzureIoTClient 56:8704100b3b54 893
AzureIoTClient 41:71c01aa3df1a 894 LIST_ITEM_HANDLE list_caller_item;
AzureIoTClient 56:8704100b3b54 895
AzureIoTClient 41:71c01aa3df1a 896 list_caller_item = singlylinkedlist_get_head_item(task->callback_list);
AzureIoTClient 56:8704100b3b54 897
AzureIoTClient 41:71c01aa3df1a 898 while (list_caller_item != NULL)
AzureIoTClient 41:71c01aa3df1a 899 {
AzureIoTClient 41:71c01aa3df1a 900 MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_information = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)singlylinkedlist_item_get_value(list_caller_item);
AzureIoTClient 56:8704100b3b54 901
AzureIoTClient 41:71c01aa3df1a 902 if (singlylinkedlist_add(to_list, caller_information) == NULL)
AzureIoTClient 41:71c01aa3df1a 903 {
AzureIoTClient 41:71c01aa3df1a 904 LogError("Failed copying event to destination list (singlylinkedlist_add failed)");
AzureIoTClient 41:71c01aa3df1a 905 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 906 break;
AzureIoTClient 41:71c01aa3df1a 907 }
AzureIoTClient 41:71c01aa3df1a 908
AzureIoTClient 41:71c01aa3df1a 909 list_caller_item = singlylinkedlist_get_next_item(list_caller_item);
AzureIoTClient 41:71c01aa3df1a 910 }
AzureIoTClient 56:8704100b3b54 911
AzureIoTClient 51:269e65571b39 912 list_task_item_next = singlylinkedlist_get_next_item(list_task_item);
AzureIoTClient 51:269e65571b39 913
AzureIoTClient 51:269e65571b39 914 singlylinkedlist_destroy(task->callback_list);
AzureIoTClient 51:269e65571b39 915 task->callback_list = NULL;
AzureIoTClient 51:269e65571b39 916
AzureIoTClient 51:269e65571b39 917 free_task(task);
AzureIoTClient 51:269e65571b39 918 singlylinkedlist_remove(instance->in_progress_list, list_task_item);
AzureIoTClient 51:269e65571b39 919 list_task_item = list_task_item_next;
AzureIoTClient 41:71c01aa3df1a 920 }
AzureIoTClient 41:71c01aa3df1a 921
AzureIoTClient 41:71c01aa3df1a 922 return result;
AzureIoTClient 36:f78f9a56869e 923 }
AzureIoTClient 36:f78f9a56869e 924
AzureIoTClient 41:71c01aa3df1a 925
AzureIoTClient 36:f78f9a56869e 926 static int move_events_to_wait_to_send_list(TELEMETRY_MESSENGER_INSTANCE* instance)
AzureIoTClient 36:f78f9a56869e 927 {
AzureIoTClient 41:71c01aa3df1a 928 int result;
AzureIoTClient 41:71c01aa3df1a 929 LIST_ITEM_HANDLE list_item;
AzureIoTClient 41:71c01aa3df1a 930
AzureIoTClient 41:71c01aa3df1a 931 if ((list_item = singlylinkedlist_get_head_item(instance->in_progress_list)) == NULL)
AzureIoTClient 41:71c01aa3df1a 932 {
AzureIoTClient 41:71c01aa3df1a 933 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 934 }
AzureIoTClient 41:71c01aa3df1a 935 else
AzureIoTClient 41:71c01aa3df1a 936 {
AzureIoTClient 41:71c01aa3df1a 937 SINGLYLINKEDLIST_HANDLE new_wait_to_send_list;
AzureIoTClient 36:f78f9a56869e 938
AzureIoTClient 41:71c01aa3df1a 939 if ((new_wait_to_send_list = singlylinkedlist_create()) == NULL)
AzureIoTClient 41:71c01aa3df1a 940 {
AzureIoTClient 41:71c01aa3df1a 941 LogError("Failed moving events back to wait_to_send list (singlylinkedlist_create failed to create new wait_to_send_list)");
AzureIoTClient 41:71c01aa3df1a 942 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 943 }
AzureIoTClient 41:71c01aa3df1a 944 else
AzureIoTClient 41:71c01aa3df1a 945 {
AzureIoTClient 41:71c01aa3df1a 946 SINGLYLINKEDLIST_HANDLE new_in_progress_list;
AzureIoTClient 56:8704100b3b54 947
AzureIoTClient 41:71c01aa3df1a 948 if (copy_events_from_in_progress_to_waiting_list(instance, new_wait_to_send_list) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 949 {
AzureIoTClient 41:71c01aa3df1a 950 LogError("Failed moving events back to wait_to_send list (failed adding in_progress_list items to new_wait_to_send_list)");
AzureIoTClient 41:71c01aa3df1a 951 singlylinkedlist_destroy(new_wait_to_send_list);
AzureIoTClient 41:71c01aa3df1a 952 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 953 }
AzureIoTClient 41:71c01aa3df1a 954 else if (copy_events_to_list(instance->waiting_to_send, new_wait_to_send_list) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 955 {
AzureIoTClient 41:71c01aa3df1a 956 LogError("Failed moving events back to wait_to_send list (failed adding wait_to_send items to new_wait_to_send_list)");
AzureIoTClient 41:71c01aa3df1a 957 singlylinkedlist_destroy(new_wait_to_send_list);
AzureIoTClient 41:71c01aa3df1a 958 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 959 }
AzureIoTClient 41:71c01aa3df1a 960 else if ((new_in_progress_list = singlylinkedlist_create()) == NULL)
AzureIoTClient 41:71c01aa3df1a 961 {
AzureIoTClient 41:71c01aa3df1a 962 LogError("Failed moving events back to wait_to_send list (singlylinkedlist_create failed to create new in_progress_list)");
AzureIoTClient 41:71c01aa3df1a 963 singlylinkedlist_destroy(new_wait_to_send_list);
AzureIoTClient 41:71c01aa3df1a 964 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 965 }
AzureIoTClient 56:8704100b3b54 966 else
AzureIoTClient 41:71c01aa3df1a 967 {
AzureIoTClient 41:71c01aa3df1a 968 singlylinkedlist_destroy(instance->waiting_to_send);
AzureIoTClient 41:71c01aa3df1a 969 singlylinkedlist_destroy(instance->in_progress_list);
AzureIoTClient 41:71c01aa3df1a 970 instance->waiting_to_send = new_wait_to_send_list;
AzureIoTClient 41:71c01aa3df1a 971 instance->in_progress_list = new_in_progress_list;
AzureIoTClient 41:71c01aa3df1a 972 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 973 }
AzureIoTClient 41:71c01aa3df1a 974 }
AzureIoTClient 41:71c01aa3df1a 975 }
AzureIoTClient 41:71c01aa3df1a 976
AzureIoTClient 41:71c01aa3df1a 977 return result;
AzureIoTClient 41:71c01aa3df1a 978 }
AzureIoTClient 41:71c01aa3df1a 979
AzureIoTClient 41:71c01aa3df1a 980 static MESSENGER_SEND_EVENT_TASK* create_task(TELEMETRY_MESSENGER_INSTANCE *messenger)
AzureIoTClient 41:71c01aa3df1a 981 {
AzureIoTClient 41:71c01aa3df1a 982 MESSENGER_SEND_EVENT_TASK* task = NULL;
AzureIoTClient 36:f78f9a56869e 983
AzureIoTClient 41:71c01aa3df1a 984 if (NULL == (task = (MESSENGER_SEND_EVENT_TASK *)malloc(sizeof(MESSENGER_SEND_EVENT_TASK))))
AzureIoTClient 41:71c01aa3df1a 985 {
AzureIoTClient 41:71c01aa3df1a 986 LogError("malloc of MESSENGER_SEND_EVENT_TASK failed");
AzureIoTClient 41:71c01aa3df1a 987 }
AzureIoTClient 41:71c01aa3df1a 988 else
AzureIoTClient 41:71c01aa3df1a 989 {
AzureIoTClient 41:71c01aa3df1a 990 memset(task, 0, sizeof(*task ));
AzureIoTClient 41:71c01aa3df1a 991 task->messenger = messenger;
AzureIoTClient 41:71c01aa3df1a 992 task->send_time = INDEFINITE_TIME;
AzureIoTClient 41:71c01aa3df1a 993 if (NULL == (task->callback_list = singlylinkedlist_create()))
AzureIoTClient 41:71c01aa3df1a 994 {
AzureIoTClient 41:71c01aa3df1a 995 LogError("singlylinkedlist_create failed to create callback_list");
AzureIoTClient 41:71c01aa3df1a 996 free_task(task);
AzureIoTClient 41:71c01aa3df1a 997 task = NULL;
AzureIoTClient 41:71c01aa3df1a 998 }
AzureIoTClient 41:71c01aa3df1a 999 else if (RESULT_OK != move_event_to_in_progress_list(task))
AzureIoTClient 41:71c01aa3df1a 1000 {
AzureIoTClient 41:71c01aa3df1a 1001 LogError("move_event_to_in_progress_list failed");
AzureIoTClient 41:71c01aa3df1a 1002 free_task(task);
AzureIoTClient 41:71c01aa3df1a 1003 task = NULL;
AzureIoTClient 41:71c01aa3df1a 1004 }
AzureIoTClient 41:71c01aa3df1a 1005 }
AzureIoTClient 41:71c01aa3df1a 1006 return task;
AzureIoTClient 41:71c01aa3df1a 1007 }
AzureIoTClient 41:71c01aa3df1a 1008
AzureIoTClient 41:71c01aa3df1a 1009
AzureIoTClient 41:71c01aa3df1a 1010
AzureIoTClient 41:71c01aa3df1a 1011 static void invoke_callback(const void* item, const void* action_context, bool* continue_processing)
AzureIoTClient 41:71c01aa3df1a 1012 {
AzureIoTClient 41:71c01aa3df1a 1013 MESSENGER_SEND_EVENT_CALLER_INFORMATION *caller_info = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)item;
AzureIoTClient 41:71c01aa3df1a 1014
AzureIoTClient 41:71c01aa3df1a 1015 if (NULL != caller_info->on_event_send_complete_callback)
AzureIoTClient 41:71c01aa3df1a 1016 {
AzureIoTClient 41:71c01aa3df1a 1017 #pragma warning(push)
AzureIoTClient 41:71c01aa3df1a 1018 #pragma warning(disable:4305) // Allow typecasting to smaller type on 64 bit systems, since we control ultimate caller.
AzureIoTClient 41:71c01aa3df1a 1019 TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT messenger_send_result = (TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT)action_context;
AzureIoTClient 41:71c01aa3df1a 1020 #pragma warning(pop)
AzureIoTClient 41:71c01aa3df1a 1021 caller_info->on_event_send_complete_callback(caller_info->message, messenger_send_result, caller_info->context);
AzureIoTClient 41:71c01aa3df1a 1022 }
AzureIoTClient 41:71c01aa3df1a 1023 *continue_processing = true;
AzureIoTClient 36:f78f9a56869e 1024 }
AzureIoTClient 36:f78f9a56869e 1025
AzureIoTClient 56:8704100b3b54 1026 static void internal_on_event_send_complete_callback(void* context, MESSAGE_SEND_RESULT send_result, AMQP_VALUE delivery_state)
AzureIoTClient 56:8704100b3b54 1027 {
AzureIoTClient 56:8704100b3b54 1028 (void)delivery_state;
AzureIoTClient 41:71c01aa3df1a 1029 if (context != NULL)
AzureIoTClient 41:71c01aa3df1a 1030 {
AzureIoTClient 41:71c01aa3df1a 1031 MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)context;
AzureIoTClient 41:71c01aa3df1a 1032
AzureIoTClient 41:71c01aa3df1a 1033 if (task->messenger->message_sender_current_state != MESSAGE_SENDER_STATE_ERROR)
AzureIoTClient 41:71c01aa3df1a 1034 {
AzureIoTClient 41:71c01aa3df1a 1035 if (task->is_timed_out == false)
AzureIoTClient 41:71c01aa3df1a 1036 {
AzureIoTClient 41:71c01aa3df1a 1037 TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT messenger_send_result;
AzureIoTClient 36:f78f9a56869e 1038
AzureIoTClient 41:71c01aa3df1a 1039 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_189: [If no failure occurs, `on_event_send_complete_callback` shall be invoked with result TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_OK for all callers associated with this task]
AzureIoTClient 41:71c01aa3df1a 1040 if (send_result == MESSAGE_SEND_OK)
AzureIoTClient 41:71c01aa3df1a 1041 {
AzureIoTClient 41:71c01aa3df1a 1042 messenger_send_result = TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 1043 }
AzureIoTClient 41:71c01aa3df1a 1044 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_190: [If a failure occured, `on_event_send_complete_callback` shall be invoked with result TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING for all callers associated with this task]
AzureIoTClient 41:71c01aa3df1a 1045 else
AzureIoTClient 41:71c01aa3df1a 1046 {
AzureIoTClient 41:71c01aa3df1a 1047 messenger_send_result = TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING;
AzureIoTClient 41:71c01aa3df1a 1048 }
AzureIoTClient 41:71c01aa3df1a 1049
AzureIoTClient 41:71c01aa3df1a 1050 // Initially typecast to a size_t to avoid 64 bit compiler warnings on casting of void* to larger type.
AzureIoTClient 41:71c01aa3df1a 1051 singlylinkedlist_foreach(task->callback_list, invoke_callback, (void*)((size_t)messenger_send_result));
AzureIoTClient 41:71c01aa3df1a 1052 }
AzureIoTClient 41:71c01aa3df1a 1053 else
AzureIoTClient 41:71c01aa3df1a 1054 {
AzureIoTClient 41:71c01aa3df1a 1055 LogInfo("messenger on_event_send_complete_callback invoked for timed out event %p; not firing upper layer callback.", task);
AzureIoTClient 41:71c01aa3df1a 1056 }
AzureIoTClient 36:f78f9a56869e 1057
AzureIoTClient 56:8704100b3b54 1058 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_128: [`task` shall be removed from `instance->in_progress_list`]
AzureIoTClient 41:71c01aa3df1a 1059 remove_event_from_in_progress_list(task);
AzureIoTClient 41:71c01aa3df1a 1060
AzureIoTClient 41:71c01aa3df1a 1061 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_130: [`task` shall be destroyed()]
AzureIoTClient 41:71c01aa3df1a 1062 free_task(task);
AzureIoTClient 41:71c01aa3df1a 1063 }
AzureIoTClient 41:71c01aa3df1a 1064 }
AzureIoTClient 41:71c01aa3df1a 1065 }
AzureIoTClient 41:71c01aa3df1a 1066
AzureIoTClient 41:71c01aa3df1a 1067 static MESSENGER_SEND_EVENT_CALLER_INFORMATION* get_next_caller_message_to_send(TELEMETRY_MESSENGER_INSTANCE* instance)
AzureIoTClient 41:71c01aa3df1a 1068 {
AzureIoTClient 41:71c01aa3df1a 1069 MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_info;
AzureIoTClient 41:71c01aa3df1a 1070 LIST_ITEM_HANDLE list_item;
AzureIoTClient 36:f78f9a56869e 1071
AzureIoTClient 41:71c01aa3df1a 1072 if ((list_item = singlylinkedlist_get_head_item(instance->waiting_to_send)) == NULL)
AzureIoTClient 41:71c01aa3df1a 1073 {
AzureIoTClient 41:71c01aa3df1a 1074 caller_info = NULL;
AzureIoTClient 41:71c01aa3df1a 1075 }
AzureIoTClient 41:71c01aa3df1a 1076 else
AzureIoTClient 41:71c01aa3df1a 1077 {
AzureIoTClient 41:71c01aa3df1a 1078 caller_info = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)singlylinkedlist_item_get_value(list_item);
AzureIoTClient 36:f78f9a56869e 1079
AzureIoTClient 41:71c01aa3df1a 1080 if (singlylinkedlist_remove(instance->waiting_to_send, list_item) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1081 {
AzureIoTClient 41:71c01aa3df1a 1082 LogError("Failed removing item from waiting_to_send list (singlylinkedlist_remove failed)");
AzureIoTClient 41:71c01aa3df1a 1083 }
AzureIoTClient 41:71c01aa3df1a 1084 }
AzureIoTClient 36:f78f9a56869e 1085
AzureIoTClient 41:71c01aa3df1a 1086 return caller_info;
AzureIoTClient 36:f78f9a56869e 1087 }
AzureIoTClient 36:f78f9a56869e 1088
// Working state for the batch that send_pending_events is currently assembling.
// It is zeroed by create_send_pending_events_state before being filled in, and zeroed
// again by send_batched_message_and_reset_state once the batch has been handed off.
AzureIoTClient 41:71c01aa3df1a 1089 typedef struct SEND_PENDING_EVENTS_STATE_TAG
AzureIoTClient 41:71c01aa3df1a 1090 {
// Task passed as the completion context to messagesender_send_async; caller records are appended to its callback_list.
AzureIoTClient 41:71c01aa3df1a 1091 MESSENGER_SEND_EVENT_TASK* task;
// AMQP message created with message_create and set to AMQP_BATCHING_FORMAT_CODE; encoded bodies are appended to it.
AzureIoTClient 41:71c01aa3df1a 1092 MESSAGE_HANDLE message_batch_container;
// Running sum of the encoded body lengths appended to message_batch_container so far.
AzureIoTClient 41:71c01aa3df1a 1093 uint64_t bytes_pending;
AzureIoTClient 41:71c01aa3df1a 1094 } SEND_PENDING_EVENTS_STATE;
AzureIoTClient 41:71c01aa3df1a 1095
AzureIoTClient 41:71c01aa3df1a 1096
AzureIoTClient 56:8704100b3b54 1097 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_191: [Creates an AMQP message, sets it to be batch mode, and creates an associated task for its callbacks. Errors cause the send events loop to break.]
AzureIoTClient 41:71c01aa3df1a 1098 static int create_send_pending_events_state(TELEMETRY_MESSENGER_INSTANCE* instance, SEND_PENDING_EVENTS_STATE *send_pending_events_state)
AzureIoTClient 36:f78f9a56869e 1099 {
AzureIoTClient 41:71c01aa3df1a 1100 int result;
AzureIoTClient 41:71c01aa3df1a 1101 memset(send_pending_events_state, 0, sizeof(*send_pending_events_state));
AzureIoTClient 41:71c01aa3df1a 1102
AzureIoTClient 41:71c01aa3df1a 1103 if ((send_pending_events_state->message_batch_container = message_create()) == NULL)
AzureIoTClient 41:71c01aa3df1a 1104 {
AzureIoTClient 41:71c01aa3df1a 1105 LogError("messageBatchContainer = message_create() failed");
AzureIoTClient 41:71c01aa3df1a 1106 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1107 }
AzureIoTClient 41:71c01aa3df1a 1108 else if (message_set_message_format(send_pending_events_state->message_batch_container, AMQP_BATCHING_FORMAT_CODE) != 0)
AzureIoTClient 41:71c01aa3df1a 1109 {
AzureIoTClient 46:c688c75b63b9 1110 LogError("Failed setting the message format to batching format");
AzureIoTClient 46:c688c75b63b9 1111 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1112 }
AzureIoTClient 41:71c01aa3df1a 1113 else if ((send_pending_events_state->task = create_task(instance)) == NULL)
AzureIoTClient 41:71c01aa3df1a 1114 {
AzureIoTClient 41:71c01aa3df1a 1115 LogError("create_task() failed");
AzureIoTClient 41:71c01aa3df1a 1116 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1117 }
AzureIoTClient 41:71c01aa3df1a 1118 else
AzureIoTClient 41:71c01aa3df1a 1119 {
AzureIoTClient 41:71c01aa3df1a 1120 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 1121 }
AzureIoTClient 41:71c01aa3df1a 1122
AzureIoTClient 41:71c01aa3df1a 1123 return result;
AzureIoTClient 41:71c01aa3df1a 1124 }
AzureIoTClient 41:71c01aa3df1a 1125
AzureIoTClient 41:71c01aa3df1a 1126 static void invoke_callback_on_error(MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT messenger_event_send_complete_result)
AzureIoTClient 41:71c01aa3df1a 1127 {
AzureIoTClient 41:71c01aa3df1a 1128 caller_info->on_event_send_complete_callback(caller_info->message, messenger_event_send_complete_result, (void*)caller_info->context);
AzureIoTClient 41:71c01aa3df1a 1129 }
AzureIoTClient 36:f78f9a56869e 1130
AzureIoTClient 41:71c01aa3df1a 1131 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_194: [When message is ready to send, invoke AMQP's messagesender_send and free temporary values associated with this batch.]
AzureIoTClient 41:71c01aa3df1a 1132 static int send_batched_message_and_reset_state(TELEMETRY_MESSENGER_INSTANCE* instance, SEND_PENDING_EVENTS_STATE *send_pending_events_state)
AzureIoTClient 41:71c01aa3df1a 1133 {
AzureIoTClient 41:71c01aa3df1a 1134 int result;
AzureIoTClient 41:71c01aa3df1a 1135
AzureIoTClient 43:3da2d93bb955 1136 if (messagesender_send_async(instance->message_sender, send_pending_events_state->message_batch_container, internal_on_event_send_complete_callback, send_pending_events_state->task, 0) == NULL)
AzureIoTClient 41:71c01aa3df1a 1137 {
AzureIoTClient 41:71c01aa3df1a 1138 LogError("messagesender_send failed");
AzureIoTClient 41:71c01aa3df1a 1139 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1140 }
AzureIoTClient 41:71c01aa3df1a 1141 else
AzureIoTClient 41:71c01aa3df1a 1142 {
AzureIoTClient 41:71c01aa3df1a 1143 send_pending_events_state->task->send_time = get_time(NULL);
AzureIoTClient 41:71c01aa3df1a 1144 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 1145 }
AzureIoTClient 41:71c01aa3df1a 1146
AzureIoTClient 41:71c01aa3df1a 1147 message_destroy(send_pending_events_state->message_batch_container);
AzureIoTClient 41:71c01aa3df1a 1148 memset(send_pending_events_state, 0, sizeof(*send_pending_events_state));
AzureIoTClient 41:71c01aa3df1a 1149
AzureIoTClient 41:71c01aa3df1a 1150 return result;
AzureIoTClient 41:71c01aa3df1a 1151 }
AzureIoTClient 36:f78f9a56869e 1152
AzureIoTClient 56:8704100b3b54 1153 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_196: [Determine the maximum message size we can send over this link from AMQP, then remove AMQP_BATCHING_RESERVE_SIZE (1024) bytes as reserve buffer.]
AzureIoTClient 41:71c01aa3df1a 1154 static int get_max_message_size_for_batching(TELEMETRY_MESSENGER_INSTANCE* instance, uint64_t* max_messagesize)
AzureIoTClient 41:71c01aa3df1a 1155 {
AzureIoTClient 41:71c01aa3df1a 1156 int result;
AzureIoTClient 36:f78f9a56869e 1157
AzureIoTClient 41:71c01aa3df1a 1158 if (link_get_peer_max_message_size(instance->sender_link, max_messagesize) != 0)
AzureIoTClient 41:71c01aa3df1a 1159 {
AzureIoTClient 41:71c01aa3df1a 1160 LogError("link_get_peer_max_message_size failed");
AzureIoTClient 41:71c01aa3df1a 1161 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1162 }
AzureIoTClient 41:71c01aa3df1a 1163 // Reserve AMQP_BATCHING_RESERVE_SIZE bytes for AMQP overhead of the "main" message itself.
AzureIoTClient 41:71c01aa3df1a 1164 else if (*max_messagesize <= AMQP_BATCHING_RESERVE_SIZE)
AzureIoTClient 41:71c01aa3df1a 1165 {
AzureIoTClient 41:71c01aa3df1a 1166 LogError("link_get_peer_max_message_size (%d) is less than the reserve size (%d)", max_messagesize, AMQP_BATCHING_RESERVE_SIZE);
AzureIoTClient 41:71c01aa3df1a 1167 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1168 }
AzureIoTClient 41:71c01aa3df1a 1169 else
AzureIoTClient 41:71c01aa3df1a 1170 {
AzureIoTClient 41:71c01aa3df1a 1171 *max_messagesize -= AMQP_BATCHING_RESERVE_SIZE;
AzureIoTClient 41:71c01aa3df1a 1172 result = 0;
AzureIoTClient 41:71c01aa3df1a 1173 }
AzureIoTClient 41:71c01aa3df1a 1174
AzureIoTClient 41:71c01aa3df1a 1175 return result;
AzureIoTClient 36:f78f9a56869e 1176 }
AzureIoTClient 36:f78f9a56869e 1177
AzureIoTClient 36:f78f9a56869e 1178 static int send_pending_events(TELEMETRY_MESSENGER_INSTANCE* instance)
AzureIoTClient 36:f78f9a56869e 1179 {
AzureIoTClient 41:71c01aa3df1a 1180 int result = RESULT_OK;
AzureIoTClient 36:f78f9a56869e 1181
AzureIoTClient 41:71c01aa3df1a 1182 MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_info;
AzureIoTClient 41:71c01aa3df1a 1183 BINARY_DATA body_binary_data;
AzureIoTClient 36:f78f9a56869e 1184
AzureIoTClient 41:71c01aa3df1a 1185 SEND_PENDING_EVENTS_STATE send_pending_events_state;
AzureIoTClient 41:71c01aa3df1a 1186 memset(&send_pending_events_state, 0, sizeof(send_pending_events_state));
AzureIoTClient 41:71c01aa3df1a 1187 memset(&body_binary_data, 0, sizeof(body_binary_data));
AzureIoTClient 36:f78f9a56869e 1188
AzureIoTClient 41:71c01aa3df1a 1189 uint64_t max_messagesize = 0;
AzureIoTClient 36:f78f9a56869e 1190
AzureIoTClient 41:71c01aa3df1a 1191 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_192: [Enumerate through all messages waiting to send, building up AMQP message to send and sending when size will be greater than link max size.]
AzureIoTClient 56:8704100b3b54 1192 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_198: [While processing pending messages, errors shall result in user callback being invoked.]
AzureIoTClient 41:71c01aa3df1a 1193 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_199: [Errors specific to a message (e.g. failure to encode) are NOT fatal but we'll keep processing. More general errors (e.g. out of memory) will stop processing.]
AzureIoTClient 41:71c01aa3df1a 1194 while ((caller_info = get_next_caller_message_to_send(instance)) != NULL)
AzureIoTClient 41:71c01aa3df1a 1195 {
AzureIoTClient 41:71c01aa3df1a 1196 if (body_binary_data.bytes != NULL)
AzureIoTClient 41:71c01aa3df1a 1197 {
AzureIoTClient 41:71c01aa3df1a 1198 // Free here as this may have been set during previous run through loop.
AzureIoTClient 41:71c01aa3df1a 1199 free((unsigned char*)body_binary_data.bytes);
AzureIoTClient 41:71c01aa3df1a 1200 }
AzureIoTClient 41:71c01aa3df1a 1201 memset(&body_binary_data, 0, sizeof(body_binary_data));
AzureIoTClient 56:8704100b3b54 1202
AzureIoTClient 41:71c01aa3df1a 1203 if ((0 == max_messagesize) && (get_max_message_size_for_batching(instance, &max_messagesize)) != 0)
AzureIoTClient 41:71c01aa3df1a 1204 {
AzureIoTClient 41:71c01aa3df1a 1205 LogError("get_max_message_size_for_batching failed");
AzureIoTClient 41:71c01aa3df1a 1206 invoke_callback_on_error(caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING);
AzureIoTClient 41:71c01aa3df1a 1207 free(caller_info);
AzureIoTClient 41:71c01aa3df1a 1208 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1209 break;
AzureIoTClient 41:71c01aa3df1a 1210 }
AzureIoTClient 41:71c01aa3df1a 1211 else if ((send_pending_events_state.task == 0) && (create_send_pending_events_state(instance, &send_pending_events_state) != 0))
AzureIoTClient 41:71c01aa3df1a 1212 {
AzureIoTClient 41:71c01aa3df1a 1213 LogError("create_send_pending_events_state failed");
AzureIoTClient 41:71c01aa3df1a 1214 invoke_callback_on_error(caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING);
AzureIoTClient 41:71c01aa3df1a 1215 free(caller_info);
AzureIoTClient 41:71c01aa3df1a 1216 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1217 break;
AzureIoTClient 41:71c01aa3df1a 1218 }
AzureIoTClient 41:71c01aa3df1a 1219 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_200: [Retrieve an AMQP encoded representation of this message for later appending to main batched message. On error, invoke callback but continue send loop; this is NOT a fatal error.]
AzureIoTClient 46:c688c75b63b9 1220 else if (message_create_uamqp_encoding_from_iothub_message(send_pending_events_state.message_batch_container, caller_info->message->messageHandle, &body_binary_data) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1221 {
AzureIoTClient 41:71c01aa3df1a 1222 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_201: [If message_create_uamqp_encoding_from_iothub_message fails, invoke callback with TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_CANNOT_PARSE]
AzureIoTClient 41:71c01aa3df1a 1223 LogError("message_create_uamqp_encoding_from_iothub_message() failed. Will continue to try to process messages, result");
AzureIoTClient 41:71c01aa3df1a 1224 invoke_callback_on_error(caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_CANNOT_PARSE);
AzureIoTClient 41:71c01aa3df1a 1225 free(caller_info);
AzureIoTClient 41:71c01aa3df1a 1226 continue;
AzureIoTClient 41:71c01aa3df1a 1227 }
AzureIoTClient 41:71c01aa3df1a 1228 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_197: [If a single message is greater than our maximum AMQP send size, ignore the message. Invoke the callback but continue send loop; this is NOT a fatal error.]
AzureIoTClient 41:71c01aa3df1a 1229 else if (body_binary_data.length > (int)max_messagesize)
AzureIoTClient 41:71c01aa3df1a 1230 {
AzureIoTClient 41:71c01aa3df1a 1231 LogError("a single message will encode to be %d bytes, larger than max we will send the link %lld. Will continue to try to process messages", body_binary_data.length, max_messagesize);
AzureIoTClient 41:71c01aa3df1a 1232 invoke_callback_on_error(caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING);
AzureIoTClient 41:71c01aa3df1a 1233 free(caller_info);
AzureIoTClient 41:71c01aa3df1a 1234 continue;
AzureIoTClient 41:71c01aa3df1a 1235 }
AzureIoTClient 41:71c01aa3df1a 1236 else if (singlylinkedlist_add(send_pending_events_state.task->callback_list, (void*)caller_info) == NULL)
AzureIoTClient 41:71c01aa3df1a 1237 {
AzureIoTClient 41:71c01aa3df1a 1238 LogError("singlylinkedlist_add failed");
AzureIoTClient 41:71c01aa3df1a 1239 invoke_callback_on_error(caller_info, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING);
AzureIoTClient 41:71c01aa3df1a 1240 free(caller_info);
AzureIoTClient 41:71c01aa3df1a 1241 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1242 break;
AzureIoTClient 41:71c01aa3df1a 1243 }
AzureIoTClient 36:f78f9a56869e 1244
AzureIoTClient 41:71c01aa3df1a 1245 // Once we've added the caller_info to the callback_list, don't directly 'invoke_callback_on_error' anymore directly.
AzureIoTClient 41:71c01aa3df1a 1246 // The task is responsible for running through its callers for callbacks, even for errors in this function.
AzureIoTClient 41:71c01aa3df1a 1247 // Similarly, responsibility for freeing this memory falls on the 'task' cleanup also.
AzureIoTClient 36:f78f9a56869e 1248
AzureIoTClient 41:71c01aa3df1a 1249 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_193: [If (length of current user AMQP message) + (length of user messages pending for this batched message) + (1KB reserve buffer) > maximum link send, send pending messages and create new batched message.]
AzureIoTClient 41:71c01aa3df1a 1250 if (body_binary_data.length + send_pending_events_state.bytes_pending > max_messagesize)
AzureIoTClient 41:71c01aa3df1a 1251 {
AzureIoTClient 41:71c01aa3df1a 1252 // If we tried to add the current message, we would overflow. Send what we've queued immediately.
AzureIoTClient 41:71c01aa3df1a 1253 if (send_batched_message_and_reset_state(instance, &send_pending_events_state) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1254 {
AzureIoTClient 41:71c01aa3df1a 1255 LogError("send_batched_message_and_reset_state failed");
AzureIoTClient 41:71c01aa3df1a 1256 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1257 break;
AzureIoTClient 41:71c01aa3df1a 1258 }
AzureIoTClient 36:f78f9a56869e 1259
AzureIoTClient 41:71c01aa3df1a 1260 // Now that we've given off the task to uAMQP layer, allocate a new task.
AzureIoTClient 41:71c01aa3df1a 1261 if (create_send_pending_events_state(instance, &send_pending_events_state) != 0)
AzureIoTClient 41:71c01aa3df1a 1262 {
AzureIoTClient 41:71c01aa3df1a 1263 LogError("create_send_pending_events_state failed, result");
AzureIoTClient 41:71c01aa3df1a 1264 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1265 break;
AzureIoTClient 41:71c01aa3df1a 1266 }
AzureIoTClient 41:71c01aa3df1a 1267 }
AzureIoTClient 36:f78f9a56869e 1268
AzureIoTClient 41:71c01aa3df1a 1269 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_31_195: [Append the current message's encoded data to the batched message tracked by uAMQP layer.]
AzureIoTClient 41:71c01aa3df1a 1270 if (message_add_body_amqp_data(send_pending_events_state.message_batch_container, body_binary_data) != 0)
AzureIoTClient 41:71c01aa3df1a 1271 {
AzureIoTClient 41:71c01aa3df1a 1272 LogError("message_add_body_amqp_data failed");
AzureIoTClient 41:71c01aa3df1a 1273 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1274 break;
AzureIoTClient 41:71c01aa3df1a 1275 }
AzureIoTClient 36:f78f9a56869e 1276
AzureIoTClient 41:71c01aa3df1a 1277 send_pending_events_state.bytes_pending += body_binary_data.length;
AzureIoTClient 41:71c01aa3df1a 1278 }
AzureIoTClient 41:71c01aa3df1a 1279
AzureIoTClient 41:71c01aa3df1a 1280 if ((result == 0) && (send_pending_events_state.bytes_pending != 0))
AzureIoTClient 41:71c01aa3df1a 1281 {
AzureIoTClient 41:71c01aa3df1a 1282 if (send_batched_message_and_reset_state(instance, &send_pending_events_state) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1283 {
AzureIoTClient 41:71c01aa3df1a 1284 LogError("send_batched_message_and_reset_state failed");
AzureIoTClient 41:71c01aa3df1a 1285 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1286 }
AzureIoTClient 41:71c01aa3df1a 1287 }
AzureIoTClient 36:f78f9a56869e 1288
AzureIoTClient 41:71c01aa3df1a 1289 if (body_binary_data.bytes != NULL)
AzureIoTClient 41:71c01aa3df1a 1290 {
AzureIoTClient 41:71c01aa3df1a 1291 free((unsigned char*)body_binary_data.bytes);
AzureIoTClient 41:71c01aa3df1a 1292 }
AzureIoTClient 36:f78f9a56869e 1293
AzureIoTClient 41:71c01aa3df1a 1294 // A non-NULL task indicates error, since otherwise send_batched_message_and_reset_state would've sent off messages and reset send_pending_events_state
AzureIoTClient 41:71c01aa3df1a 1295 if (send_pending_events_state.task != NULL)
AzureIoTClient 41:71c01aa3df1a 1296 {
AzureIoTClient 41:71c01aa3df1a 1297 singlylinkedlist_foreach(send_pending_events_state.task->callback_list, invoke_callback, (void*)TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_FAIL_SENDING);
AzureIoTClient 41:71c01aa3df1a 1298 remove_event_from_in_progress_list(send_pending_events_state.task);
AzureIoTClient 41:71c01aa3df1a 1299 free_task(send_pending_events_state.task);
AzureIoTClient 41:71c01aa3df1a 1300 }
AzureIoTClient 41:71c01aa3df1a 1301
AzureIoTClient 41:71c01aa3df1a 1302 if (send_pending_events_state.message_batch_container != NULL)
AzureIoTClient 41:71c01aa3df1a 1303 {
AzureIoTClient 41:71c01aa3df1a 1304 message_destroy(send_pending_events_state.message_batch_container);
AzureIoTClient 41:71c01aa3df1a 1305 }
AzureIoTClient 41:71c01aa3df1a 1306
AzureIoTClient 41:71c01aa3df1a 1307 return result;
AzureIoTClient 36:f78f9a56869e 1308 }
AzureIoTClient 36:f78f9a56869e 1309
AzureIoTClient 36:f78f9a56869e 1310 // @brief
AzureIoTClient 36:f78f9a56869e 1311 // Goes through each task in in_progress_list and checks if the events timed out to be sent.
AzureIoTClient 36:f78f9a56869e 1312 // @remarks
AzureIoTClient 36:f78f9a56869e 1313 // If an event is timed out, it is marked as such but not removed, and the upper layer callback is invoked.
AzureIoTClient 36:f78f9a56869e 1314 // @returns
AzureIoTClient 36:f78f9a56869e 1315 // 0 if no failures occur, non-zero otherwise.
AzureIoTClient 36:f78f9a56869e 1316 static int process_event_send_timeouts(TELEMETRY_MESSENGER_INSTANCE* instance)
AzureIoTClient 36:f78f9a56869e 1317 {
AzureIoTClient 41:71c01aa3df1a 1318 int result = RESULT_OK;
AzureIoTClient 36:f78f9a56869e 1319
AzureIoTClient 41:71c01aa3df1a 1320 if (instance->event_send_timeout_secs > 0)
AzureIoTClient 41:71c01aa3df1a 1321 {
AzureIoTClient 41:71c01aa3df1a 1322 LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(instance->in_progress_list);
AzureIoTClient 36:f78f9a56869e 1323
AzureIoTClient 41:71c01aa3df1a 1324 while (list_item != NULL)
AzureIoTClient 41:71c01aa3df1a 1325 {
AzureIoTClient 41:71c01aa3df1a 1326 MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)singlylinkedlist_item_get_value(list_item);
AzureIoTClient 36:f78f9a56869e 1327
AzureIoTClient 41:71c01aa3df1a 1328 if (task->is_timed_out == false)
AzureIoTClient 41:71c01aa3df1a 1329 {
AzureIoTClient 41:71c01aa3df1a 1330 int is_timed_out;
AzureIoTClient 36:f78f9a56869e 1331
AzureIoTClient 41:71c01aa3df1a 1332 if (is_timeout_reached(task->send_time, instance->event_send_timeout_secs, &is_timed_out) == RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1333 {
AzureIoTClient 41:71c01aa3df1a 1334 if (is_timed_out)
AzureIoTClient 41:71c01aa3df1a 1335 {
AzureIoTClient 41:71c01aa3df1a 1336 task->is_timed_out = true;
AzureIoTClient 41:71c01aa3df1a 1337 singlylinkedlist_foreach(task->callback_list, invoke_callback, (void*)TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_ERROR_TIMEOUT);
AzureIoTClient 41:71c01aa3df1a 1338 }
AzureIoTClient 41:71c01aa3df1a 1339 }
AzureIoTClient 41:71c01aa3df1a 1340 else
AzureIoTClient 41:71c01aa3df1a 1341 {
AzureIoTClient 41:71c01aa3df1a 1342 LogError("messenger failed to evaluate event send timeout of event %d", task);
AzureIoTClient 41:71c01aa3df1a 1343 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1344 }
AzureIoTClient 41:71c01aa3df1a 1345 }
AzureIoTClient 36:f78f9a56869e 1346
AzureIoTClient 41:71c01aa3df1a 1347 list_item = singlylinkedlist_get_next_item(list_item);
AzureIoTClient 41:71c01aa3df1a 1348 }
AzureIoTClient 41:71c01aa3df1a 1349 }
AzureIoTClient 36:f78f9a56869e 1350
AzureIoTClient 41:71c01aa3df1a 1351 return result;
AzureIoTClient 36:f78f9a56869e 1352 }
AzureIoTClient 36:f78f9a56869e 1353
AzureIoTClient 36:f78f9a56869e 1354 // @brief
AzureIoTClient 36:f78f9a56869e 1355 // Removes all the timed out events from the in_progress_list, without invoking callbacks or detroying the messages.
AzureIoTClient 36:f78f9a56869e 1356 static void remove_timed_out_events(TELEMETRY_MESSENGER_INSTANCE* instance)
AzureIoTClient 36:f78f9a56869e 1357 {
AzureIoTClient 41:71c01aa3df1a 1358 LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(instance->in_progress_list);
AzureIoTClient 36:f78f9a56869e 1359
AzureIoTClient 41:71c01aa3df1a 1360 while (list_item != NULL)
AzureIoTClient 41:71c01aa3df1a 1361 {
AzureIoTClient 41:71c01aa3df1a 1362 MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)singlylinkedlist_item_get_value(list_item);
AzureIoTClient 36:f78f9a56869e 1363
AzureIoTClient 41:71c01aa3df1a 1364 if (task->is_timed_out == true)
AzureIoTClient 41:71c01aa3df1a 1365 {
AzureIoTClient 41:71c01aa3df1a 1366 remove_event_from_in_progress_list(task);
AzureIoTClient 36:f78f9a56869e 1367
AzureIoTClient 41:71c01aa3df1a 1368 free_task(task);
AzureIoTClient 41:71c01aa3df1a 1369 }
AzureIoTClient 36:f78f9a56869e 1370
AzureIoTClient 41:71c01aa3df1a 1371 list_item = singlylinkedlist_get_next_item(list_item);
AzureIoTClient 41:71c01aa3df1a 1372 }
AzureIoTClient 36:f78f9a56869e 1373 }
AzureIoTClient 36:f78f9a56869e 1374
AzureIoTClient 36:f78f9a56869e 1375
AzureIoTClient 36:f78f9a56869e 1376 // ---------- Set/Retrieve Options Helpers ----------//
AzureIoTClient 36:f78f9a56869e 1377
AzureIoTClient 36:f78f9a56869e 1378 static void* telemetry_messenger_clone_option(const char* name, const void* value)
AzureIoTClient 36:f78f9a56869e 1379 {
AzureIoTClient 41:71c01aa3df1a 1380 void* result;
AzureIoTClient 36:f78f9a56869e 1381
AzureIoTClient 41:71c01aa3df1a 1382 if (name == NULL)
AzureIoTClient 41:71c01aa3df1a 1383 {
AzureIoTClient 41:71c01aa3df1a 1384 LogError("Failed to clone messenger option (name is NULL)");
AzureIoTClient 41:71c01aa3df1a 1385 result = NULL;
AzureIoTClient 41:71c01aa3df1a 1386 }
AzureIoTClient 41:71c01aa3df1a 1387 else if (value == NULL)
AzureIoTClient 41:71c01aa3df1a 1388 {
AzureIoTClient 41:71c01aa3df1a 1389 LogError("Failed to clone messenger option (value is NULL)");
AzureIoTClient 41:71c01aa3df1a 1390 result = NULL;
AzureIoTClient 41:71c01aa3df1a 1391 }
AzureIoTClient 41:71c01aa3df1a 1392 else
AzureIoTClient 41:71c01aa3df1a 1393 {
AzureIoTClient 51:269e65571b39 1394 if (strcmp(TELEMETRY_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, name) == 0 ||
AzureIoTClient 51:269e65571b39 1395 strcmp(TELEMETRY_MESSENGER_OPTION_SAVED_OPTIONS, name) == 0)
AzureIoTClient 41:71c01aa3df1a 1396 {
AzureIoTClient 41:71c01aa3df1a 1397 result = (void*)value;
AzureIoTClient 41:71c01aa3df1a 1398 }
AzureIoTClient 41:71c01aa3df1a 1399 else
AzureIoTClient 41:71c01aa3df1a 1400 {
AzureIoTClient 41:71c01aa3df1a 1401 LogError("Failed to clone messenger option (option with name '%s' is not suppported)", name);
AzureIoTClient 41:71c01aa3df1a 1402 result = NULL;
AzureIoTClient 41:71c01aa3df1a 1403 }
AzureIoTClient 41:71c01aa3df1a 1404 }
AzureIoTClient 36:f78f9a56869e 1405
AzureIoTClient 41:71c01aa3df1a 1406 return result;
AzureIoTClient 36:f78f9a56869e 1407 }
AzureIoTClient 36:f78f9a56869e 1408
// Destroy callback for the messenger options.
// Only validates its arguments (logging when either is NULL); nothing is released for the supported options.
static void telemetry_messenger_destroy_option(const char* name, const void* value)
{
    if (name == NULL)
    {
        LogError("Failed to destroy messenger option (name is NULL)");
        return;
    }

    if (value == NULL)
    {
        LogError("Failed to destroy messenger option (value is NULL)");
        return;
    }

    // Nothing to be done for the supported options.
}
AzureIoTClient 36:f78f9a56869e 1424
AzureIoTClient 36:f78f9a56869e 1425
AzureIoTClient 36:f78f9a56869e 1426 // Public API:
AzureIoTClient 36:f78f9a56869e 1427
AzureIoTClient 36:f78f9a56869e 1428 int telemetry_messenger_subscribe_for_messages(TELEMETRY_MESSENGER_HANDLE messenger_handle, ON_TELEMETRY_MESSENGER_MESSAGE_RECEIVED on_message_received_callback, void* context)
AzureIoTClient 36:f78f9a56869e 1429 {
AzureIoTClient 41:71c01aa3df1a 1430 int result;
AzureIoTClient 36:f78f9a56869e 1431
AzureIoTClient 41:71c01aa3df1a 1432 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_016: [If `messenger_handle` is NULL, telemetry_messenger_subscribe_for_messages() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1433 if (messenger_handle == NULL)
AzureIoTClient 41:71c01aa3df1a 1434 {
AzureIoTClient 41:71c01aa3df1a 1435 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1436 LogError("telemetry_messenger_subscribe_for_messages failed (messenger_handle is NULL)");
AzureIoTClient 41:71c01aa3df1a 1437 }
AzureIoTClient 41:71c01aa3df1a 1438 else
AzureIoTClient 41:71c01aa3df1a 1439 {
AzureIoTClient 41:71c01aa3df1a 1440 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 36:f78f9a56869e 1441
AzureIoTClient 41:71c01aa3df1a 1442 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_017: [If `instance->receive_messages` is already true, telemetry_messenger_subscribe_for_messages() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1443 if (instance->receive_messages)
AzureIoTClient 41:71c01aa3df1a 1444 {
AzureIoTClient 41:71c01aa3df1a 1445 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1446 LogError("telemetry_messenger_subscribe_for_messages failed (messenger already subscribed)");
AzureIoTClient 41:71c01aa3df1a 1447 }
AzureIoTClient 41:71c01aa3df1a 1448 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_018: [If `on_message_received_callback` is NULL, telemetry_messenger_subscribe_for_messages() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1449 else if (on_message_received_callback == NULL)
AzureIoTClient 41:71c01aa3df1a 1450 {
AzureIoTClient 41:71c01aa3df1a 1451 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1452 LogError("telemetry_messenger_subscribe_for_messages failed (on_message_received_callback is NULL)");
AzureIoTClient 41:71c01aa3df1a 1453 }
AzureIoTClient 41:71c01aa3df1a 1454 else
AzureIoTClient 41:71c01aa3df1a 1455 {
AzureIoTClient 41:71c01aa3df1a 1456 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_019: [`on_message_received_callback` shall be saved on `instance->on_message_received_callback`]
AzureIoTClient 41:71c01aa3df1a 1457 instance->on_message_received_callback = on_message_received_callback;
AzureIoTClient 36:f78f9a56869e 1458
AzureIoTClient 41:71c01aa3df1a 1459 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_020: [`context` shall be saved on `instance->on_message_received_context`]
AzureIoTClient 41:71c01aa3df1a 1460 instance->on_message_received_context = context;
AzureIoTClient 36:f78f9a56869e 1461
AzureIoTClient 41:71c01aa3df1a 1462 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_021: [telemetry_messenger_subscribe_for_messages() shall set `instance->receive_messages` to true]
AzureIoTClient 41:71c01aa3df1a 1463 instance->receive_messages = true;
AzureIoTClient 36:f78f9a56869e 1464
AzureIoTClient 41:71c01aa3df1a 1465 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_022: [If no failures occurr, telemetry_messenger_subscribe_for_messages() shall return 0]
AzureIoTClient 41:71c01aa3df1a 1466 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 1467 }
AzureIoTClient 41:71c01aa3df1a 1468 }
AzureIoTClient 36:f78f9a56869e 1469
AzureIoTClient 41:71c01aa3df1a 1470 return result;
AzureIoTClient 36:f78f9a56869e 1471 }
AzureIoTClient 36:f78f9a56869e 1472
AzureIoTClient 36:f78f9a56869e 1473 int telemetry_messenger_unsubscribe_for_messages(TELEMETRY_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 36:f78f9a56869e 1474 {
AzureIoTClient 41:71c01aa3df1a 1475 int result;
AzureIoTClient 36:f78f9a56869e 1476
AzureIoTClient 41:71c01aa3df1a 1477 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_023: [If `messenger_handle` is NULL, telemetry_messenger_unsubscribe_for_messages() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1478 if (messenger_handle == NULL)
AzureIoTClient 41:71c01aa3df1a 1479 {
AzureIoTClient 41:71c01aa3df1a 1480 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1481 LogError("telemetry_messenger_unsubscribe_for_messages failed (messenger_handle is NULL)");
AzureIoTClient 41:71c01aa3df1a 1482 }
AzureIoTClient 41:71c01aa3df1a 1483 else
AzureIoTClient 41:71c01aa3df1a 1484 {
AzureIoTClient 41:71c01aa3df1a 1485 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 36:f78f9a56869e 1486
AzureIoTClient 41:71c01aa3df1a 1487 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_024: [If `instance->receive_messages` is already false, telemetry_messenger_unsubscribe_for_messages() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1488 if (instance->receive_messages == false)
AzureIoTClient 41:71c01aa3df1a 1489 {
AzureIoTClient 41:71c01aa3df1a 1490 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1491 LogError("telemetry_messenger_unsubscribe_for_messages failed (messenger is not subscribed)");
AzureIoTClient 41:71c01aa3df1a 1492 }
AzureIoTClient 41:71c01aa3df1a 1493 else
AzureIoTClient 41:71c01aa3df1a 1494 {
AzureIoTClient 41:71c01aa3df1a 1495 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_025: [telemetry_messenger_unsubscribe_for_messages() shall set `instance->receive_messages` to false]
AzureIoTClient 41:71c01aa3df1a 1496 instance->receive_messages = false;
AzureIoTClient 56:8704100b3b54 1497
AzureIoTClient 41:71c01aa3df1a 1498 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_026: [telemetry_messenger_unsubscribe_for_messages() shall set `instance->on_message_received_callback` to NULL]
AzureIoTClient 41:71c01aa3df1a 1499 instance->on_message_received_callback = NULL;
AzureIoTClient 56:8704100b3b54 1500
AzureIoTClient 41:71c01aa3df1a 1501 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_027: [telemetry_messenger_unsubscribe_for_messages() shall set `instance->on_message_received_context` to NULL]
AzureIoTClient 41:71c01aa3df1a 1502 instance->on_message_received_context = NULL;
AzureIoTClient 56:8704100b3b54 1503
AzureIoTClient 41:71c01aa3df1a 1504 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_028: [If no failures occurr, telemetry_messenger_unsubscribe_for_messages() shall return 0]
AzureIoTClient 41:71c01aa3df1a 1505 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 1506 }
AzureIoTClient 41:71c01aa3df1a 1507 }
AzureIoTClient 36:f78f9a56869e 1508
AzureIoTClient 41:71c01aa3df1a 1509 return result;
AzureIoTClient 36:f78f9a56869e 1510 }
AzureIoTClient 36:f78f9a56869e 1511
AzureIoTClient 36:f78f9a56869e 1512 int telemetry_messenger_send_message_disposition(TELEMETRY_MESSENGER_HANDLE messenger_handle, TELEMETRY_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info, TELEMETRY_MESSENGER_DISPOSITION_RESULT disposition_result)
AzureIoTClient 36:f78f9a56869e 1513 {
AzureIoTClient 41:71c01aa3df1a 1514 int result;
AzureIoTClient 36:f78f9a56869e 1515
AzureIoTClient 56:8704100b3b54 1516 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_179: [If `messenger_handle` or `disposition_info` are NULL, telemetry_messenger_send_message_disposition() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1517 if (messenger_handle == NULL || disposition_info == NULL)
AzureIoTClient 41:71c01aa3df1a 1518 {
AzureIoTClient 41:71c01aa3df1a 1519 LogError("Failed sending message disposition (either messenger_handle (%p) or disposition_info (%p) are NULL)", messenger_handle, disposition_info);
AzureIoTClient 41:71c01aa3df1a 1520 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1521 }
AzureIoTClient 56:8704100b3b54 1522 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_180: [If `disposition_info->source` is NULL, telemetry_messenger_send_message_disposition() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1523 else if (disposition_info->source == NULL)
AzureIoTClient 41:71c01aa3df1a 1524 {
AzureIoTClient 41:71c01aa3df1a 1525 LogError("Failed sending message disposition (disposition_info->source is NULL)");
AzureIoTClient 41:71c01aa3df1a 1526 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1527 }
AzureIoTClient 41:71c01aa3df1a 1528 else
AzureIoTClient 41:71c01aa3df1a 1529 {
AzureIoTClient 41:71c01aa3df1a 1530 TELEMETRY_MESSENGER_INSTANCE* messenger = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 36:f78f9a56869e 1531
AzureIoTClient 41:71c01aa3df1a 1532 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_189: [If `messenger_handle->message_receiver` is NULL, telemetry_messenger_send_message_disposition() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1533 if (messenger->message_receiver == NULL)
AzureIoTClient 41:71c01aa3df1a 1534 {
AzureIoTClient 41:71c01aa3df1a 1535 LogError("Failed sending message disposition (message_receiver is not created; check if it is subscribed)");
AzureIoTClient 41:71c01aa3df1a 1536 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1537 }
AzureIoTClient 41:71c01aa3df1a 1538 else
AzureIoTClient 41:71c01aa3df1a 1539 {
AzureIoTClient 41:71c01aa3df1a 1540 AMQP_VALUE uamqp_disposition_result;
AzureIoTClient 36:f78f9a56869e 1541
AzureIoTClient 41:71c01aa3df1a 1542 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_181: [An AMQP_VALUE disposition result shall be created corresponding to the `disposition_result` provided]
AzureIoTClient 41:71c01aa3df1a 1543 if ((uamqp_disposition_result = create_uamqp_disposition_result_from(disposition_result)) == NULL)
AzureIoTClient 41:71c01aa3df1a 1544 {
AzureIoTClient 41:71c01aa3df1a 1545 LogError("Failed sending message disposition (disposition result %d is not supported)", disposition_result);
AzureIoTClient 41:71c01aa3df1a 1546 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1547 }
AzureIoTClient 41:71c01aa3df1a 1548 else
AzureIoTClient 41:71c01aa3df1a 1549 {
AzureIoTClient 56:8704100b3b54 1550 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_182: [`messagereceiver_send_message_disposition()` shall be invoked passing `disposition_info->source`, `disposition_info->message_id` and the corresponding AMQP_VALUE disposition result]
AzureIoTClient 41:71c01aa3df1a 1551 if (messagereceiver_send_message_disposition(messenger->message_receiver, disposition_info->source, disposition_info->message_id, uamqp_disposition_result) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1552 {
AzureIoTClient 56:8704100b3b54 1553 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_183: [If `messagereceiver_send_message_disposition()` fails, telemetry_messenger_send_message_disposition() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1554 LogError("Failed sending message disposition (messagereceiver_send_message_disposition failed)");
AzureIoTClient 41:71c01aa3df1a 1555 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1556 }
AzureIoTClient 41:71c01aa3df1a 1557 else
AzureIoTClient 41:71c01aa3df1a 1558 {
AzureIoTClient 56:8704100b3b54 1559 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_185: [If no failures occurr, telemetry_messenger_send_message_disposition() shall return 0]
AzureIoTClient 41:71c01aa3df1a 1560 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 1561 }
AzureIoTClient 36:f78f9a56869e 1562
AzureIoTClient 41:71c01aa3df1a 1563 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_184: [telemetry_messenger_send_message_disposition() shall destroy the AMQP_VALUE disposition result]
AzureIoTClient 41:71c01aa3df1a 1564 amqpvalue_destroy(uamqp_disposition_result);
AzureIoTClient 41:71c01aa3df1a 1565 }
AzureIoTClient 41:71c01aa3df1a 1566 }
AzureIoTClient 41:71c01aa3df1a 1567 }
AzureIoTClient 36:f78f9a56869e 1568
AzureIoTClient 41:71c01aa3df1a 1569 return result;
AzureIoTClient 36:f78f9a56869e 1570 }
AzureIoTClient 36:f78f9a56869e 1571
AzureIoTClient 36:f78f9a56869e 1572 int telemetry_messenger_send_async(TELEMETRY_MESSENGER_HANDLE messenger_handle, IOTHUB_MESSAGE_LIST* message, ON_TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE on_messenger_event_send_complete_callback, void* context)
AzureIoTClient 36:f78f9a56869e 1573 {
AzureIoTClient 41:71c01aa3df1a 1574 int result;
AzureIoTClient 36:f78f9a56869e 1575
AzureIoTClient 56:8704100b3b54 1576 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_134: [If `messenger_handle` is NULL, telemetry_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 1577 if (messenger_handle == NULL)
AzureIoTClient 41:71c01aa3df1a 1578 {
AzureIoTClient 41:71c01aa3df1a 1579 LogError("Failed sending event (messenger_handle is NULL)");
AzureIoTClient 41:71c01aa3df1a 1580 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1581 }
AzureIoTClient 56:8704100b3b54 1582 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_135: [If `message` is NULL, telemetry_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 1583 else if (message == NULL)
AzureIoTClient 41:71c01aa3df1a 1584 {
AzureIoTClient 41:71c01aa3df1a 1585 LogError("Failed sending event (message is NULL)");
AzureIoTClient 41:71c01aa3df1a 1586 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1587 }
AzureIoTClient 56:8704100b3b54 1588 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_136: [If `on_event_send_complete_callback` is NULL, telemetry_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 1589 else if (on_messenger_event_send_complete_callback == NULL)
AzureIoTClient 41:71c01aa3df1a 1590 {
AzureIoTClient 41:71c01aa3df1a 1591 LogError("Failed sending event (on_event_send_complete_callback is NULL)");
AzureIoTClient 41:71c01aa3df1a 1592 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1593 }
AzureIoTClient 41:71c01aa3df1a 1594 else
AzureIoTClient 41:71c01aa3df1a 1595 {
AzureIoTClient 41:71c01aa3df1a 1596 MESSENGER_SEND_EVENT_CALLER_INFORMATION *caller_info;
AzureIoTClient 41:71c01aa3df1a 1597 TELEMETRY_MESSENGER_INSTANCE *instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 36:f78f9a56869e 1598
AzureIoTClient 56:8704100b3b54 1599 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_137: [telemetry_messenger_send_async() shall allocate memory for a MESSENGER_SEND_EVENT_CALLER_INFORMATION structure]
AzureIoTClient 41:71c01aa3df1a 1600 if ((caller_info = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)malloc(sizeof(MESSENGER_SEND_EVENT_CALLER_INFORMATION))) == NULL)
AzureIoTClient 41:71c01aa3df1a 1601 {
AzureIoTClient 41:71c01aa3df1a 1602 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_138: [If malloc() fails, telemetry_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 1603 LogError("Failed sending event (failed to create struct for task; malloc failed)");
AzureIoTClient 41:71c01aa3df1a 1604 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1605 }
AzureIoTClient 56:8704100b3b54 1606 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_100: [`task` shall be added to `instance->waiting_to_send` using singlylinkedlist_add()]
AzureIoTClient 41:71c01aa3df1a 1607 else if (singlylinkedlist_add(instance->waiting_to_send, caller_info) == NULL)
AzureIoTClient 41:71c01aa3df1a 1608 {
AzureIoTClient 41:71c01aa3df1a 1609 LogError("Failed sending event (singlylinkedlist_add failed)");
AzureIoTClient 36:f78f9a56869e 1610
AzureIoTClient 41:71c01aa3df1a 1611 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_139: [If singlylinkedlist_add() fails, telemetry_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 1612 result = __FAILURE__;
AzureIoTClient 36:f78f9a56869e 1613
AzureIoTClient 41:71c01aa3df1a 1614 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_142: [If any failure occurs, telemetry_messenger_send_async() shall free any memory it has allocated]
AzureIoTClient 41:71c01aa3df1a 1615 free(caller_info);
AzureIoTClient 41:71c01aa3df1a 1616 }
AzureIoTClient 41:71c01aa3df1a 1617 else
AzureIoTClient 41:71c01aa3df1a 1618 {
AzureIoTClient 41:71c01aa3df1a 1619 memset(caller_info, 0, sizeof(MESSENGER_SEND_EVENT_CALLER_INFORMATION));
AzureIoTClient 41:71c01aa3df1a 1620 caller_info->message = message;
AzureIoTClient 41:71c01aa3df1a 1621 caller_info->on_event_send_complete_callback = on_messenger_event_send_complete_callback;
AzureIoTClient 41:71c01aa3df1a 1622 caller_info->context = context;
AzureIoTClient 56:8704100b3b54 1623
AzureIoTClient 56:8704100b3b54 1624 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_143: [If no failures occur, telemetry_messenger_send_async() shall return zero]
AzureIoTClient 41:71c01aa3df1a 1625 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 1626 }
AzureIoTClient 41:71c01aa3df1a 1627 }
AzureIoTClient 36:f78f9a56869e 1628
AzureIoTClient 41:71c01aa3df1a 1629 return result;
AzureIoTClient 36:f78f9a56869e 1630 }
AzureIoTClient 36:f78f9a56869e 1631
AzureIoTClient 36:f78f9a56869e 1632 int telemetry_messenger_get_send_status(TELEMETRY_MESSENGER_HANDLE messenger_handle, TELEMETRY_MESSENGER_SEND_STATUS* send_status)
AzureIoTClient 36:f78f9a56869e 1633 {
AzureIoTClient 41:71c01aa3df1a 1634 int result;
AzureIoTClient 36:f78f9a56869e 1635
AzureIoTClient 56:8704100b3b54 1636 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_144: [If `messenger_handle` is NULL, telemetry_messenger_get_send_status() shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 1637 if (messenger_handle == NULL)
AzureIoTClient 41:71c01aa3df1a 1638 {
AzureIoTClient 41:71c01aa3df1a 1639 LogError("telemetry_messenger_get_send_status failed (messenger_handle is NULL)");
AzureIoTClient 41:71c01aa3df1a 1640 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1641 }
AzureIoTClient 56:8704100b3b54 1642 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_145: [If `send_status` is NULL, telemetry_messenger_get_send_status() shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 1643 else if (send_status == NULL)
AzureIoTClient 41:71c01aa3df1a 1644 {
AzureIoTClient 41:71c01aa3df1a 1645 LogError("telemetry_messenger_get_send_status failed (send_status is NULL)");
AzureIoTClient 41:71c01aa3df1a 1646 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1647 }
AzureIoTClient 41:71c01aa3df1a 1648 else
AzureIoTClient 41:71c01aa3df1a 1649 {
AzureIoTClient 41:71c01aa3df1a 1650 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 41:71c01aa3df1a 1651 LIST_ITEM_HANDLE wts_list_head = singlylinkedlist_get_head_item(instance->waiting_to_send);
AzureIoTClient 41:71c01aa3df1a 1652 LIST_ITEM_HANDLE ip_list_head = singlylinkedlist_get_head_item(instance->in_progress_list);
AzureIoTClient 36:f78f9a56869e 1653
AzureIoTClient 56:8704100b3b54 1654 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_147: [If `instance->in_progress_list` and `instance->wait_to_send_list` are empty, send_status shall be set to TELEMETRY_MESSENGER_SEND_STATUS_IDLE]
AzureIoTClient 41:71c01aa3df1a 1655 if (wts_list_head == NULL && ip_list_head == NULL)
AzureIoTClient 41:71c01aa3df1a 1656 {
AzureIoTClient 41:71c01aa3df1a 1657 *send_status = TELEMETRY_MESSENGER_SEND_STATUS_IDLE;
AzureIoTClient 41:71c01aa3df1a 1658 }
AzureIoTClient 56:8704100b3b54 1659 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_148: [Otherwise, send_status shall be set to TELEMETRY_MESSENGER_SEND_STATUS_BUSY]
AzureIoTClient 41:71c01aa3df1a 1660 else
AzureIoTClient 41:71c01aa3df1a 1661 {
AzureIoTClient 41:71c01aa3df1a 1662 *send_status = TELEMETRY_MESSENGER_SEND_STATUS_BUSY;
AzureIoTClient 41:71c01aa3df1a 1663 }
AzureIoTClient 36:f78f9a56869e 1664
AzureIoTClient 41:71c01aa3df1a 1665 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_149: [If no failures occur, telemetry_messenger_get_send_status() shall return 0]
AzureIoTClient 41:71c01aa3df1a 1666 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 1667 }
AzureIoTClient 36:f78f9a56869e 1668
AzureIoTClient 41:71c01aa3df1a 1669 return result;
AzureIoTClient 36:f78f9a56869e 1670 }
AzureIoTClient 36:f78f9a56869e 1671
AzureIoTClient 36:f78f9a56869e 1672 int telemetry_messenger_start(TELEMETRY_MESSENGER_HANDLE messenger_handle, SESSION_HANDLE session_handle)
AzureIoTClient 36:f78f9a56869e 1673 {
AzureIoTClient 41:71c01aa3df1a 1674 int result;
AzureIoTClient 36:f78f9a56869e 1675
AzureIoTClient 41:71c01aa3df1a 1676 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_029: [If `messenger_handle` is NULL, telemetry_messenger_start() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1677 if (messenger_handle == NULL)
AzureIoTClient 41:71c01aa3df1a 1678 {
AzureIoTClient 41:71c01aa3df1a 1679 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1680 LogError("telemetry_messenger_start failed (messenger_handle is NULL)");
AzureIoTClient 41:71c01aa3df1a 1681 }
AzureIoTClient 41:71c01aa3df1a 1682 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_030: [If `session_handle` is NULL, telemetry_messenger_start() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1683 else if (session_handle == NULL)
AzureIoTClient 41:71c01aa3df1a 1684 {
AzureIoTClient 41:71c01aa3df1a 1685 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1686 LogError("telemetry_messenger_start failed (session_handle is NULL)");
AzureIoTClient 41:71c01aa3df1a 1687 }
AzureIoTClient 41:71c01aa3df1a 1688 else
AzureIoTClient 41:71c01aa3df1a 1689 {
AzureIoTClient 41:71c01aa3df1a 1690 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 36:f78f9a56869e 1691
AzureIoTClient 41:71c01aa3df1a 1692 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_031: [If `instance->state` is not TELEMETRY_MESSENGER_STATE_STOPPED, telemetry_messenger_start() shall fail and return __FAILURE__]
AzureIoTClient 41:71c01aa3df1a 1693 if (instance->state != TELEMETRY_MESSENGER_STATE_STOPPED)
AzureIoTClient 41:71c01aa3df1a 1694 {
AzureIoTClient 41:71c01aa3df1a 1695 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1696 LogError("telemetry_messenger_start failed (current state is %d; expected TELEMETRY_MESSENGER_STATE_STOPPED)", instance->state);
AzureIoTClient 41:71c01aa3df1a 1697 }
AzureIoTClient 41:71c01aa3df1a 1698 else
AzureIoTClient 41:71c01aa3df1a 1699 {
AzureIoTClient 41:71c01aa3df1a 1700 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_032: [`session_handle` shall be saved on `instance->session_handle`]
AzureIoTClient 41:71c01aa3df1a 1701 instance->session_handle = session_handle;
AzureIoTClient 36:f78f9a56869e 1702
AzureIoTClient 41:71c01aa3df1a 1703 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_115: [If no failures occurr, `instance->state` shall be set to TELEMETRY_MESSENGER_STATE_STARTING, and `instance->on_state_changed_callback` invoked if provided]
AzureIoTClient 41:71c01aa3df1a 1704 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_STARTING);
AzureIoTClient 36:f78f9a56869e 1705
AzureIoTClient 41:71c01aa3df1a 1706 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_056: [If no failures occurr, telemetry_messenger_start() shall return 0]
AzureIoTClient 41:71c01aa3df1a 1707 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 1708 }
AzureIoTClient 41:71c01aa3df1a 1709 }
AzureIoTClient 36:f78f9a56869e 1710
AzureIoTClient 41:71c01aa3df1a 1711 return result;
AzureIoTClient 36:f78f9a56869e 1712 }
AzureIoTClient 36:f78f9a56869e 1713
AzureIoTClient 36:f78f9a56869e 1714 int telemetry_messenger_stop(TELEMETRY_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 36:f78f9a56869e 1715 {
AzureIoTClient 41:71c01aa3df1a 1716 int result;
AzureIoTClient 36:f78f9a56869e 1717
AzureIoTClient 41:71c01aa3df1a 1718 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_057: [If `messenger_handle` is NULL, telemetry_messenger_stop() shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 1719 if (messenger_handle == NULL)
AzureIoTClient 41:71c01aa3df1a 1720 {
AzureIoTClient 41:71c01aa3df1a 1721 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1722 LogError("telemetry_messenger_stop failed (messenger_handle is NULL)");
AzureIoTClient 41:71c01aa3df1a 1723 }
AzureIoTClient 41:71c01aa3df1a 1724 else
AzureIoTClient 41:71c01aa3df1a 1725 {
AzureIoTClient 41:71c01aa3df1a 1726 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 36:f78f9a56869e 1727
AzureIoTClient 41:71c01aa3df1a 1728 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_058: [If `instance->state` is TELEMETRY_MESSENGER_STATE_STOPPED, telemetry_messenger_stop() shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 1729 if (instance->state == TELEMETRY_MESSENGER_STATE_STOPPED)
AzureIoTClient 41:71c01aa3df1a 1730 {
AzureIoTClient 41:71c01aa3df1a 1731 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1732 LogError("telemetry_messenger_stop failed (messenger is already stopped)");
AzureIoTClient 41:71c01aa3df1a 1733 }
AzureIoTClient 41:71c01aa3df1a 1734 else
AzureIoTClient 41:71c01aa3df1a 1735 {
AzureIoTClient 41:71c01aa3df1a 1736 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_STOPPING);
AzureIoTClient 36:f78f9a56869e 1737
AzureIoTClient 56:8704100b3b54 1738 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_152: [telemetry_messenger_stop() shall close and destroy `instance->message_sender` and `instance->message_receiver`]
AzureIoTClient 41:71c01aa3df1a 1739 destroy_event_sender(instance);
AzureIoTClient 41:71c01aa3df1a 1740 destroy_message_receiver(instance);
AzureIoTClient 36:f78f9a56869e 1741
AzureIoTClient 41:71c01aa3df1a 1742 remove_timed_out_events(instance);
AzureIoTClient 36:f78f9a56869e 1743
AzureIoTClient 41:71c01aa3df1a 1744 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_162: [telemetry_messenger_stop() shall move all items from `instance->in_progress_list` to the beginning of `instance->wait_to_send_list`]
AzureIoTClient 41:71c01aa3df1a 1745 if (move_events_to_wait_to_send_list(instance) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1746 {
AzureIoTClient 41:71c01aa3df1a 1747 LogError("Messenger failed to move events in progress back to wait_to_send list");
AzureIoTClient 56:8704100b3b54 1748
AzureIoTClient 41:71c01aa3df1a 1749 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_163: [If not all items from `instance->in_progress_list` can be moved back to `instance->wait_to_send_list`, `instance->state` shall be set to TELEMETRY_MESSENGER_STATE_ERROR, and `instance->on_state_changed_callback` invoked]
AzureIoTClient 41:71c01aa3df1a 1750 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1751 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 1752 }
AzureIoTClient 41:71c01aa3df1a 1753 else
AzureIoTClient 41:71c01aa3df1a 1754 {
AzureIoTClient 41:71c01aa3df1a 1755 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_164: [If all items get successfuly moved back to `instance->wait_to_send_list`, `instance->state` shall be set to TELEMETRY_MESSENGER_STATE_STOPPED, and `instance->on_state_changed_callback` invoked]
AzureIoTClient 41:71c01aa3df1a 1756 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_STOPPED);
AzureIoTClient 41:71c01aa3df1a 1757 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 1758 }
AzureIoTClient 41:71c01aa3df1a 1759 }
AzureIoTClient 41:71c01aa3df1a 1760 }
AzureIoTClient 36:f78f9a56869e 1761
AzureIoTClient 41:71c01aa3df1a 1762 return result;
AzureIoTClient 36:f78f9a56869e 1763 }
AzureIoTClient 36:f78f9a56869e 1764
AzureIoTClient 36:f78f9a56869e 1765 // @brief
AzureIoTClient 36:f78f9a56869e 1766 // Sets the messenger module state based on the state changes from messagesender and messagereceiver
AzureIoTClient 36:f78f9a56869e 1767 static void process_state_changes(TELEMETRY_MESSENGER_INSTANCE* instance)
AzureIoTClient 36:f78f9a56869e 1768 {
AzureIoTClient 56:8704100b3b54 1769 // Note: messagesender and messagereceiver are still not created or already destroyed
AzureIoTClient 41:71c01aa3df1a 1770 // when state is TELEMETRY_MESSENGER_STATE_STOPPED, so no checking is needed there.
AzureIoTClient 36:f78f9a56869e 1771
AzureIoTClient 41:71c01aa3df1a 1772 if (instance->state == TELEMETRY_MESSENGER_STATE_STARTED)
AzureIoTClient 41:71c01aa3df1a 1773 {
AzureIoTClient 41:71c01aa3df1a 1774 if (instance->message_sender_current_state != MESSAGE_SENDER_STATE_OPEN)
AzureIoTClient 41:71c01aa3df1a 1775 {
AzureIoTClient 41:71c01aa3df1a 1776 LogError("messagesender reported unexpected state %d while messenger was started", instance->message_sender_current_state);
AzureIoTClient 41:71c01aa3df1a 1777 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1778 }
AzureIoTClient 41:71c01aa3df1a 1779 else if (instance->message_receiver != NULL && instance->message_receiver_current_state != MESSAGE_RECEIVER_STATE_OPEN)
AzureIoTClient 41:71c01aa3df1a 1780 {
AzureIoTClient 41:71c01aa3df1a 1781 if (instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_OPENING)
AzureIoTClient 41:71c01aa3df1a 1782 {
AzureIoTClient 41:71c01aa3df1a 1783 int is_timed_out;
AzureIoTClient 41:71c01aa3df1a 1784 if (is_timeout_reached(instance->last_message_receiver_state_change_time, MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS, &is_timed_out) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1785 {
AzureIoTClient 41:71c01aa3df1a 1786 LogError("messenger got an error (failed to verify messagereceiver start timeout)");
AzureIoTClient 41:71c01aa3df1a 1787 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1788 }
AzureIoTClient 41:71c01aa3df1a 1789 else if (is_timed_out == 1)
AzureIoTClient 41:71c01aa3df1a 1790 {
AzureIoTClient 41:71c01aa3df1a 1791 LogError("messenger got an error (messagereceiver failed to start within expected timeout (%d secs))", MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS);
AzureIoTClient 41:71c01aa3df1a 1792 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1793 }
AzureIoTClient 41:71c01aa3df1a 1794 }
AzureIoTClient 41:71c01aa3df1a 1795 else if (instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_ERROR ||
AzureIoTClient 41:71c01aa3df1a 1796 instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_IDLE)
AzureIoTClient 41:71c01aa3df1a 1797 {
AzureIoTClient 41:71c01aa3df1a 1798 LogError("messagereceiver reported unexpected state %d while messenger is starting", instance->message_receiver_current_state);
AzureIoTClient 41:71c01aa3df1a 1799 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1800 }
AzureIoTClient 41:71c01aa3df1a 1801 }
AzureIoTClient 41:71c01aa3df1a 1802 }
AzureIoTClient 41:71c01aa3df1a 1803 else
AzureIoTClient 41:71c01aa3df1a 1804 {
AzureIoTClient 41:71c01aa3df1a 1805 if (instance->state == TELEMETRY_MESSENGER_STATE_STARTING)
AzureIoTClient 41:71c01aa3df1a 1806 {
AzureIoTClient 41:71c01aa3df1a 1807 if (instance->message_sender_current_state == MESSAGE_SENDER_STATE_OPEN)
AzureIoTClient 41:71c01aa3df1a 1808 {
AzureIoTClient 41:71c01aa3df1a 1809 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_STARTED);
AzureIoTClient 41:71c01aa3df1a 1810 }
AzureIoTClient 41:71c01aa3df1a 1811 else if (instance->message_sender_current_state == MESSAGE_SENDER_STATE_OPENING)
AzureIoTClient 41:71c01aa3df1a 1812 {
AzureIoTClient 41:71c01aa3df1a 1813 int is_timed_out;
AzureIoTClient 41:71c01aa3df1a 1814 if (is_timeout_reached(instance->last_message_sender_state_change_time, MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS, &is_timed_out) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1815 {
AzureIoTClient 41:71c01aa3df1a 1816 LogError("messenger failed to start (failed to verify messagesender start timeout)");
AzureIoTClient 41:71c01aa3df1a 1817 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1818 }
AzureIoTClient 41:71c01aa3df1a 1819 else if (is_timed_out == 1)
AzureIoTClient 41:71c01aa3df1a 1820 {
AzureIoTClient 41:71c01aa3df1a 1821 LogError("messenger failed to start (messagesender failed to start within expected timeout (%d secs))", MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS);
AzureIoTClient 41:71c01aa3df1a 1822 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1823 }
AzureIoTClient 41:71c01aa3df1a 1824 }
AzureIoTClient 56:8704100b3b54 1825 // For this module, the only valid scenario where messagesender state is IDLE is if
AzureIoTClient 41:71c01aa3df1a 1826 // the messagesender hasn't been created yet or already destroyed.
AzureIoTClient 41:71c01aa3df1a 1827 else if ((instance->message_sender_current_state == MESSAGE_SENDER_STATE_ERROR) ||
AzureIoTClient 41:71c01aa3df1a 1828 (instance->message_sender_current_state == MESSAGE_SENDER_STATE_CLOSING) ||
AzureIoTClient 41:71c01aa3df1a 1829 (instance->message_sender_current_state == MESSAGE_SENDER_STATE_IDLE && instance->message_sender != NULL))
AzureIoTClient 41:71c01aa3df1a 1830 {
AzureIoTClient 41:71c01aa3df1a 1831 LogError("messagesender reported unexpected state %d while messenger is starting", instance->message_sender_current_state);
AzureIoTClient 41:71c01aa3df1a 1832 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1833 }
AzureIoTClient 41:71c01aa3df1a 1834 }
AzureIoTClient 41:71c01aa3df1a 1835 // message sender and receiver are stopped/destroyed synchronously, so no need for state control.
AzureIoTClient 41:71c01aa3df1a 1836 }
AzureIoTClient 36:f78f9a56869e 1837 }
AzureIoTClient 36:f78f9a56869e 1838
AzureIoTClient 36:f78f9a56869e 1839 void telemetry_messenger_do_work(TELEMETRY_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 36:f78f9a56869e 1840 {
AzureIoTClient 41:71c01aa3df1a 1841 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_065: [If `messenger_handle` is NULL, telemetry_messenger_do_work() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 1842 if (messenger_handle == NULL)
AzureIoTClient 41:71c01aa3df1a 1843 {
AzureIoTClient 41:71c01aa3df1a 1844 LogError("telemetry_messenger_do_work failed (messenger_handle is NULL)");
AzureIoTClient 41:71c01aa3df1a 1845 }
AzureIoTClient 41:71c01aa3df1a 1846 else
AzureIoTClient 41:71c01aa3df1a 1847 {
AzureIoTClient 41:71c01aa3df1a 1848 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 36:f78f9a56869e 1849
AzureIoTClient 41:71c01aa3df1a 1850 process_state_changes(instance);
AzureIoTClient 36:f78f9a56869e 1851
AzureIoTClient 41:71c01aa3df1a 1852 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_151: [If `instance->state` is TELEMETRY_MESSENGER_STATE_STARTING, telemetry_messenger_do_work() shall create and open `instance->message_sender`]
AzureIoTClient 41:71c01aa3df1a 1853 if (instance->state == TELEMETRY_MESSENGER_STATE_STARTING)
AzureIoTClient 41:71c01aa3df1a 1854 {
AzureIoTClient 41:71c01aa3df1a 1855 if (instance->message_sender == NULL)
AzureIoTClient 41:71c01aa3df1a 1856 {
AzureIoTClient 41:71c01aa3df1a 1857 if (create_event_sender(instance) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1858 {
AzureIoTClient 41:71c01aa3df1a 1859 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1860 }
AzureIoTClient 41:71c01aa3df1a 1861 }
AzureIoTClient 41:71c01aa3df1a 1862 }
AzureIoTClient 41:71c01aa3df1a 1863 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_066: [If `instance->state` is not TELEMETRY_MESSENGER_STATE_STARTED, telemetry_messenger_do_work() shall return]
AzureIoTClient 41:71c01aa3df1a 1864 else if (instance->state == TELEMETRY_MESSENGER_STATE_STARTED)
AzureIoTClient 41:71c01aa3df1a 1865 {
AzureIoTClient 41:71c01aa3df1a 1866 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_067: [If `instance->receive_messages` is true and `instance->message_receiver` is NULL, a message_receiver shall be created]
AzureIoTClient 41:71c01aa3df1a 1867 if (instance->receive_messages == true &&
AzureIoTClient 41:71c01aa3df1a 1868 instance->message_receiver == NULL &&
AzureIoTClient 41:71c01aa3df1a 1869 create_message_receiver(instance) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1870 {
AzureIoTClient 41:71c01aa3df1a 1871 LogError("telemetry_messenger_do_work warning (failed creating the message receiver [%s])", STRING_c_str(instance->device_id));
AzureIoTClient 41:71c01aa3df1a 1872 }
AzureIoTClient 41:71c01aa3df1a 1873 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_092: [If `instance->receive_messages` is false and `instance->message_receiver` is not NULL, it shall be destroyed]
AzureIoTClient 41:71c01aa3df1a 1874 else if (instance->receive_messages == false && instance->message_receiver != NULL)
AzureIoTClient 41:71c01aa3df1a 1875 {
AzureIoTClient 41:71c01aa3df1a 1876 destroy_message_receiver(instance);
AzureIoTClient 41:71c01aa3df1a 1877 }
AzureIoTClient 36:f78f9a56869e 1878
AzureIoTClient 41:71c01aa3df1a 1879 if (process_event_send_timeouts(instance) != RESULT_OK)
AzureIoTClient 41:71c01aa3df1a 1880 {
AzureIoTClient 41:71c01aa3df1a 1881 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1882 }
AzureIoTClient 41:71c01aa3df1a 1883 else if (send_pending_events(instance) != RESULT_OK && instance->event_send_retry_limit > 0)
AzureIoTClient 41:71c01aa3df1a 1884 {
AzureIoTClient 41:71c01aa3df1a 1885 instance->event_send_error_count++;
AzureIoTClient 36:f78f9a56869e 1886
AzureIoTClient 41:71c01aa3df1a 1887 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_161: [If telemetry_messenger_do_work() fail sending events for `instance->event_send_retry_limit` times in a row, it shall invoke `instance->on_state_changed_callback`, if provided, with error code TELEMETRY_MESSENGER_STATE_ERROR]
AzureIoTClient 41:71c01aa3df1a 1888 if (instance->event_send_error_count >= instance->event_send_retry_limit)
AzureIoTClient 41:71c01aa3df1a 1889 {
AzureIoTClient 41:71c01aa3df1a 1890 LogError("telemetry_messenger_do_work failed (failed sending events; reached max number of consecutive attempts)");
AzureIoTClient 41:71c01aa3df1a 1891 update_messenger_state(instance, TELEMETRY_MESSENGER_STATE_ERROR);
AzureIoTClient 41:71c01aa3df1a 1892 }
AzureIoTClient 41:71c01aa3df1a 1893 }
AzureIoTClient 41:71c01aa3df1a 1894 else
AzureIoTClient 41:71c01aa3df1a 1895 {
AzureIoTClient 41:71c01aa3df1a 1896 instance->event_send_error_count = 0;
AzureIoTClient 41:71c01aa3df1a 1897 }
AzureIoTClient 41:71c01aa3df1a 1898 }
AzureIoTClient 41:71c01aa3df1a 1899 }
AzureIoTClient 36:f78f9a56869e 1900 }
AzureIoTClient 36:f78f9a56869e 1901
AzureIoTClient 36:f78f9a56869e 1902 void telemetry_messenger_destroy(TELEMETRY_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 36:f78f9a56869e 1903 {
AzureIoTClient 41:71c01aa3df1a 1904 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_109: [If `messenger_handle` is NULL, telemetry_messenger_destroy() shall fail and return]
AzureIoTClient 41:71c01aa3df1a 1905 if (messenger_handle == NULL)
AzureIoTClient 41:71c01aa3df1a 1906 {
AzureIoTClient 41:71c01aa3df1a 1907 LogError("telemetry_messenger_destroy failed (messenger_handle is NULL)");
AzureIoTClient 41:71c01aa3df1a 1908 }
AzureIoTClient 41:71c01aa3df1a 1909 else
AzureIoTClient 41:71c01aa3df1a 1910 {
AzureIoTClient 41:71c01aa3df1a 1911 LIST_ITEM_HANDLE list_node;
AzureIoTClient 41:71c01aa3df1a 1912 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 36:f78f9a56869e 1913
AzureIoTClient 41:71c01aa3df1a 1914 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_110: [If the `instance->state` is not TELEMETRY_MESSENGER_STATE_STOPPED, telemetry_messenger_destroy() shall invoke telemetry_messenger_stop()]
AzureIoTClient 41:71c01aa3df1a 1915 if (instance->state != TELEMETRY_MESSENGER_STATE_STOPPED)
AzureIoTClient 41:71c01aa3df1a 1916 {
AzureIoTClient 41:71c01aa3df1a 1917 (void)telemetry_messenger_stop(messenger_handle);
AzureIoTClient 41:71c01aa3df1a 1918 }
AzureIoTClient 36:f78f9a56869e 1919
AzureIoTClient 41:71c01aa3df1a 1920 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_111: [All elements of `instance->in_progress_list` and `instance->wait_to_send_list` shall be removed, invoking `task->on_event_send_complete_callback` for each with EVENT_SEND_COMPLETE_RESULT_MESSENGER_DESTROYED]
AzureIoTClient 36:f78f9a56869e 1921
AzureIoTClient 56:8704100b3b54 1922 // Note: yes telemetry_messenger_stop() tried to move all events from in_progress_list to wait_to_send_list,
AzureIoTClient 41:71c01aa3df1a 1923 // but we need to iterate through in case any events failed to be moved.
AzureIoTClient 41:71c01aa3df1a 1924 while ((list_node = singlylinkedlist_get_head_item(instance->in_progress_list)) != NULL)
AzureIoTClient 41:71c01aa3df1a 1925 {
AzureIoTClient 41:71c01aa3df1a 1926 MESSENGER_SEND_EVENT_TASK* task = (MESSENGER_SEND_EVENT_TASK*)singlylinkedlist_item_get_value(list_node);
AzureIoTClient 36:f78f9a56869e 1927
AzureIoTClient 41:71c01aa3df1a 1928 (void)singlylinkedlist_remove(instance->in_progress_list, list_node);
AzureIoTClient 36:f78f9a56869e 1929
AzureIoTClient 41:71c01aa3df1a 1930 if (task != NULL)
AzureIoTClient 41:71c01aa3df1a 1931 {
AzureIoTClient 41:71c01aa3df1a 1932 singlylinkedlist_foreach(task->callback_list, invoke_callback, (void*)TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_MESSENGER_DESTROYED);
AzureIoTClient 41:71c01aa3df1a 1933 free_task(task);
AzureIoTClient 41:71c01aa3df1a 1934 }
AzureIoTClient 41:71c01aa3df1a 1935 }
AzureIoTClient 36:f78f9a56869e 1936
AzureIoTClient 41:71c01aa3df1a 1937 while ((list_node = singlylinkedlist_get_head_item(instance->waiting_to_send)) != NULL)
AzureIoTClient 41:71c01aa3df1a 1938 {
AzureIoTClient 41:71c01aa3df1a 1939 MESSENGER_SEND_EVENT_CALLER_INFORMATION* caller_info = (MESSENGER_SEND_EVENT_CALLER_INFORMATION*)singlylinkedlist_item_get_value(list_node);
AzureIoTClient 36:f78f9a56869e 1940
AzureIoTClient 41:71c01aa3df1a 1941 (void)singlylinkedlist_remove(instance->waiting_to_send, list_node);
AzureIoTClient 36:f78f9a56869e 1942
AzureIoTClient 41:71c01aa3df1a 1943 if (caller_info != NULL)
AzureIoTClient 41:71c01aa3df1a 1944 {
AzureIoTClient 41:71c01aa3df1a 1945 caller_info->on_event_send_complete_callback(caller_info->message, TELEMETRY_MESSENGER_EVENT_SEND_COMPLETE_RESULT_MESSENGER_DESTROYED, (void*)caller_info->context);
AzureIoTClient 41:71c01aa3df1a 1946 free(caller_info);
AzureIoTClient 41:71c01aa3df1a 1947 }
AzureIoTClient 41:71c01aa3df1a 1948 }
AzureIoTClient 36:f78f9a56869e 1949
AzureIoTClient 41:71c01aa3df1a 1950 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_150: [`instance->in_progress_list` and `instance->wait_to_send_list` shall be destroyed using singlylinkedlist_destroy()]
AzureIoTClient 41:71c01aa3df1a 1951 singlylinkedlist_destroy(instance->waiting_to_send);
AzureIoTClient 41:71c01aa3df1a 1952 singlylinkedlist_destroy(instance->in_progress_list);
AzureIoTClient 36:f78f9a56869e 1953
AzureIoTClient 41:71c01aa3df1a 1954 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_112: [`instance->iothub_host_fqdn` shall be destroyed using STRING_delete()]
AzureIoTClient 41:71c01aa3df1a 1955 STRING_delete(instance->iothub_host_fqdn);
AzureIoTClient 56:8704100b3b54 1956
AzureIoTClient 41:71c01aa3df1a 1957 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_113: [`instance->device_id` shall be destroyed using STRING_delete()]
AzureIoTClient 41:71c01aa3df1a 1958 STRING_delete(instance->device_id);
AzureIoTClient 36:f78f9a56869e 1959
AzureIoTClient 54:830550fef7ea 1960 STRING_delete(instance->module_id);
AzureIoTClient 54:830550fef7ea 1961
AzureIoTClient 36:f78f9a56869e 1962 STRING_delete(instance->product_info);
AzureIoTClient 36:f78f9a56869e 1963
AzureIoTClient 41:71c01aa3df1a 1964 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_114: [telemetry_messenger_destroy() shall destroy `instance` with free()]
AzureIoTClient 41:71c01aa3df1a 1965 (void)free(instance);
AzureIoTClient 41:71c01aa3df1a 1966 }
AzureIoTClient 36:f78f9a56869e 1967 }
AzureIoTClient 36:f78f9a56869e 1968
AzureIoTClient 36:f78f9a56869e 1969 TELEMETRY_MESSENGER_HANDLE telemetry_messenger_create(const TELEMETRY_MESSENGER_CONFIG* messenger_config, const char* product_info)
AzureIoTClient 36:f78f9a56869e 1970 {
AzureIoTClient 41:71c01aa3df1a 1971 TELEMETRY_MESSENGER_HANDLE handle;
AzureIoTClient 36:f78f9a56869e 1972
AzureIoTClient 41:71c01aa3df1a 1973 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_001: [If parameter `messenger_config` is NULL, telemetry_messenger_create() shall return NULL]
AzureIoTClient 41:71c01aa3df1a 1974 if (messenger_config == NULL)
AzureIoTClient 41:71c01aa3df1a 1975 {
AzureIoTClient 41:71c01aa3df1a 1976 handle = NULL;
AzureIoTClient 41:71c01aa3df1a 1977 LogError("telemetry_messenger_create failed (messenger_config is NULL)");
AzureIoTClient 41:71c01aa3df1a 1978 }
AzureIoTClient 41:71c01aa3df1a 1979 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_002: [If `messenger_config->device_id` is NULL, telemetry_messenger_create() shall return NULL]
AzureIoTClient 41:71c01aa3df1a 1980 else if (messenger_config->device_id == NULL)
AzureIoTClient 41:71c01aa3df1a 1981 {
AzureIoTClient 41:71c01aa3df1a 1982 handle = NULL;
AzureIoTClient 41:71c01aa3df1a 1983 LogError("telemetry_messenger_create failed (messenger_config->device_id is NULL)");
AzureIoTClient 41:71c01aa3df1a 1984 }
AzureIoTClient 41:71c01aa3df1a 1985 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_003: [If `messenger_config->iothub_host_fqdn` is NULL, telemetry_messenger_create() shall return NULL]
AzureIoTClient 41:71c01aa3df1a 1986 else if (messenger_config->iothub_host_fqdn == NULL)
AzureIoTClient 41:71c01aa3df1a 1987 {
AzureIoTClient 41:71c01aa3df1a 1988 handle = NULL;
AzureIoTClient 41:71c01aa3df1a 1989 LogError("telemetry_messenger_create failed (messenger_config->iothub_host_fqdn is NULL)");
AzureIoTClient 41:71c01aa3df1a 1990 }
AzureIoTClient 41:71c01aa3df1a 1991 else
AzureIoTClient 41:71c01aa3df1a 1992 {
AzureIoTClient 41:71c01aa3df1a 1993 TELEMETRY_MESSENGER_INSTANCE* instance;
AzureIoTClient 36:f78f9a56869e 1994
AzureIoTClient 41:71c01aa3df1a 1995 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_006: [telemetry_messenger_create() shall allocate memory for the messenger instance structure (aka `instance`)]
AzureIoTClient 41:71c01aa3df1a 1996 if ((instance = (TELEMETRY_MESSENGER_INSTANCE*)malloc(sizeof(TELEMETRY_MESSENGER_INSTANCE))) == NULL)
AzureIoTClient 41:71c01aa3df1a 1997 {
AzureIoTClient 41:71c01aa3df1a 1998 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_007: [If malloc() fails, telemetry_messenger_create() shall fail and return NULL]
AzureIoTClient 41:71c01aa3df1a 1999 handle = NULL;
AzureIoTClient 41:71c01aa3df1a 2000 LogError("telemetry_messenger_create failed (messenger_config->wait_to_send_list is NULL)");
AzureIoTClient 41:71c01aa3df1a 2001 }
AzureIoTClient 41:71c01aa3df1a 2002 else
AzureIoTClient 41:71c01aa3df1a 2003 {
AzureIoTClient 41:71c01aa3df1a 2004 memset(instance, 0, sizeof(TELEMETRY_MESSENGER_INSTANCE));
AzureIoTClient 41:71c01aa3df1a 2005 instance->state = TELEMETRY_MESSENGER_STATE_STOPPED;
AzureIoTClient 41:71c01aa3df1a 2006 instance->message_sender_current_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 41:71c01aa3df1a 2007 instance->message_sender_previous_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 41:71c01aa3df1a 2008 instance->message_receiver_current_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 41:71c01aa3df1a 2009 instance->message_receiver_previous_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 41:71c01aa3df1a 2010 instance->event_send_retry_limit = DEFAULT_EVENT_SEND_RETRY_LIMIT;
AzureIoTClient 41:71c01aa3df1a 2011 instance->event_send_timeout_secs = DEFAULT_EVENT_SEND_TIMEOUT_SECS;
AzureIoTClient 41:71c01aa3df1a 2012 instance->last_message_sender_state_change_time = INDEFINITE_TIME;
AzureIoTClient 41:71c01aa3df1a 2013 instance->last_message_receiver_state_change_time = INDEFINITE_TIME;
AzureIoTClient 36:f78f9a56869e 2014
AzureIoTClient 41:71c01aa3df1a 2015 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_008: [telemetry_messenger_create() shall save a copy of `messenger_config->device_id` into `instance->device_id`]
AzureIoTClient 41:71c01aa3df1a 2016 if ((instance->device_id = STRING_construct(messenger_config->device_id)) == NULL)
AzureIoTClient 41:71c01aa3df1a 2017 {
AzureIoTClient 41:71c01aa3df1a 2018 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_009: [If STRING_construct() fails, telemetry_messenger_create() shall fail and return NULL]
AzureIoTClient 41:71c01aa3df1a 2019 handle = NULL;
AzureIoTClient 41:71c01aa3df1a 2020 LogError("telemetry_messenger_create failed (device_id could not be copied; STRING_construct failed)");
AzureIoTClient 41:71c01aa3df1a 2021 }
AzureIoTClient 54:830550fef7ea 2022 else if ((messenger_config->module_id != NULL) && ((instance->module_id = STRING_construct(messenger_config->module_id)) == NULL))
AzureIoTClient 54:830550fef7ea 2023 {
AzureIoTClient 54:830550fef7ea 2024 handle = NULL;
AzureIoTClient 54:830550fef7ea 2025 LogError("telemetry_messenger_create failed (module_id could not be copied; STRING_construct failed)");
AzureIoTClient 54:830550fef7ea 2026 }
AzureIoTClient 36:f78f9a56869e 2027 else if ((instance->product_info = STRING_construct(product_info)) == NULL)
AzureIoTClient 36:f78f9a56869e 2028 {
AzureIoTClient 36:f78f9a56869e 2029 handle = NULL;
AzureIoTClient 36:f78f9a56869e 2030 LogError("telemetry_messenger_create failed (product_info could not be copied; STRING_construct failed)");
AzureIoTClient 36:f78f9a56869e 2031 }
AzureIoTClient 41:71c01aa3df1a 2032 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_010: [telemetry_messenger_create() shall save a copy of `messenger_config->iothub_host_fqdn` into `instance->iothub_host_fqdn`]
AzureIoTClient 41:71c01aa3df1a 2033 else if ((instance->iothub_host_fqdn = STRING_construct(messenger_config->iothub_host_fqdn)) == NULL)
AzureIoTClient 41:71c01aa3df1a 2034 {
AzureIoTClient 41:71c01aa3df1a 2035 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_011: [If STRING_construct() fails, telemetry_messenger_create() shall fail and return NULL]
AzureIoTClient 41:71c01aa3df1a 2036 handle = NULL;
AzureIoTClient 41:71c01aa3df1a 2037 LogError("telemetry_messenger_create failed (iothub_host_fqdn could not be copied; STRING_construct failed)");
AzureIoTClient 41:71c01aa3df1a 2038 }
AzureIoTClient 41:71c01aa3df1a 2039 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_165: [`instance->wait_to_send_list` shall be set using singlylinkedlist_create()]
AzureIoTClient 41:71c01aa3df1a 2040 else if ((instance->waiting_to_send = singlylinkedlist_create()) == NULL)
AzureIoTClient 41:71c01aa3df1a 2041 {
AzureIoTClient 56:8704100b3b54 2042 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_166: [If singlylinkedlist_create() fails, telemetry_messenger_create() shall fail and return NULL]
AzureIoTClient 41:71c01aa3df1a 2043 handle = NULL;
AzureIoTClient 41:71c01aa3df1a 2044 LogError("telemetry_messenger_create failed (singlylinkedlist_create failed to create wait_to_send_list)");
AzureIoTClient 41:71c01aa3df1a 2045 }
AzureIoTClient 56:8704100b3b54 2046 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [`instance->in_progress_list` shall be set using singlylinkedlist_create()]
AzureIoTClient 41:71c01aa3df1a 2047 else if ((instance->in_progress_list = singlylinkedlist_create()) == NULL)
AzureIoTClient 41:71c01aa3df1a 2048 {
AzureIoTClient 56:8704100b3b54 2049 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_133: [If singlylinkedlist_create() fails, telemetry_messenger_create() shall fail and return NULL]
AzureIoTClient 41:71c01aa3df1a 2050 handle = NULL;
AzureIoTClient 41:71c01aa3df1a 2051 LogError("telemetry_messenger_create failed (singlylinkedlist_create failed to create in_progress_list)");
AzureIoTClient 41:71c01aa3df1a 2052 }
AzureIoTClient 41:71c01aa3df1a 2053 else
AzureIoTClient 41:71c01aa3df1a 2054 {
AzureIoTClient 41:71c01aa3df1a 2055 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_013: [`messenger_config->on_state_changed_callback` shall be saved into `instance->on_state_changed_callback`]
AzureIoTClient 41:71c01aa3df1a 2056 instance->on_state_changed_callback = messenger_config->on_state_changed_callback;
AzureIoTClient 36:f78f9a56869e 2057
AzureIoTClient 41:71c01aa3df1a 2058 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_014: [`messenger_config->on_state_changed_context` shall be saved into `instance->on_state_changed_context`]
AzureIoTClient 41:71c01aa3df1a 2059 instance->on_state_changed_context = messenger_config->on_state_changed_context;
AzureIoTClient 36:f78f9a56869e 2060
AzureIoTClient 41:71c01aa3df1a 2061 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_015: [If no failures occurr, telemetry_messenger_create() shall return a handle to `instance`]
AzureIoTClient 41:71c01aa3df1a 2062 handle = (TELEMETRY_MESSENGER_HANDLE)instance;
AzureIoTClient 41:71c01aa3df1a 2063 }
AzureIoTClient 41:71c01aa3df1a 2064 }
AzureIoTClient 36:f78f9a56869e 2065
AzureIoTClient 41:71c01aa3df1a 2066 if (handle == NULL)
AzureIoTClient 41:71c01aa3df1a 2067 {
AzureIoTClient 41:71c01aa3df1a 2068 telemetry_messenger_destroy((TELEMETRY_MESSENGER_HANDLE)instance);
AzureIoTClient 41:71c01aa3df1a 2069 }
AzureIoTClient 41:71c01aa3df1a 2070 }
AzureIoTClient 36:f78f9a56869e 2071
AzureIoTClient 41:71c01aa3df1a 2072 return handle;
AzureIoTClient 36:f78f9a56869e 2073 }
AzureIoTClient 36:f78f9a56869e 2074
AzureIoTClient 36:f78f9a56869e 2075 int telemetry_messenger_set_option(TELEMETRY_MESSENGER_HANDLE messenger_handle, const char* name, void* value)
AzureIoTClient 36:f78f9a56869e 2076 {
AzureIoTClient 41:71c01aa3df1a 2077 int result;
AzureIoTClient 36:f78f9a56869e 2078
AzureIoTClient 41:71c01aa3df1a 2079 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_167: [If `messenger_handle` or `name` or `value` is NULL, telemetry_messenger_set_option shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 2080 if (messenger_handle == NULL || name == NULL || value == NULL)
AzureIoTClient 41:71c01aa3df1a 2081 {
AzureIoTClient 41:71c01aa3df1a 2082 LogError("telemetry_messenger_set_option failed (one of the followin are NULL: messenger_handle=%p, name=%p, value=%p)",
AzureIoTClient 41:71c01aa3df1a 2083 messenger_handle, name, value);
AzureIoTClient 41:71c01aa3df1a 2084 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 2085 }
AzureIoTClient 41:71c01aa3df1a 2086 else
AzureIoTClient 41:71c01aa3df1a 2087 {
AzureIoTClient 41:71c01aa3df1a 2088 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 36:f78f9a56869e 2089
AzureIoTClient 51:269e65571b39 2090 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_168: [If name matches TELEMETRY_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, `value` shall be saved on `instance->event_send_timeout_secs`]
AzureIoTClient 51:269e65571b39 2091 if (strcmp(TELEMETRY_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, name) == 0)
AzureIoTClient 41:71c01aa3df1a 2092 {
AzureIoTClient 41:71c01aa3df1a 2093 instance->event_send_timeout_secs = *((size_t*)value);
AzureIoTClient 41:71c01aa3df1a 2094 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 2095 }
AzureIoTClient 51:269e65571b39 2096 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_169: [If name matches TELEMETRY_MESSENGER_OPTION_SAVED_OPTIONS, `value` shall be applied using OptionHandler_FeedOptions]
AzureIoTClient 51:269e65571b39 2097 else if (strcmp(TELEMETRY_MESSENGER_OPTION_SAVED_OPTIONS, name) == 0)
AzureIoTClient 41:71c01aa3df1a 2098 {
AzureIoTClient 41:71c01aa3df1a 2099 if (OptionHandler_FeedOptions((OPTIONHANDLER_HANDLE)value, messenger_handle) != OPTIONHANDLER_OK)
AzureIoTClient 41:71c01aa3df1a 2100 {
AzureIoTClient 41:71c01aa3df1a 2101 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_170: [If OptionHandler_FeedOptions fails, telemetry_messenger_set_option shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 2102 LogError("telemetry_messenger_set_option failed (OptionHandler_FeedOptions failed)");
AzureIoTClient 41:71c01aa3df1a 2103 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 2104 }
AzureIoTClient 41:71c01aa3df1a 2105 else
AzureIoTClient 41:71c01aa3df1a 2106 {
AzureIoTClient 41:71c01aa3df1a 2107 result = RESULT_OK;
AzureIoTClient 41:71c01aa3df1a 2108 }
AzureIoTClient 41:71c01aa3df1a 2109 }
AzureIoTClient 41:71c01aa3df1a 2110 else
AzureIoTClient 41:71c01aa3df1a 2111 {
AzureIoTClient 41:71c01aa3df1a 2112 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_171: [If name does not match any supported option, authentication_set_option shall fail and return a non-zero value]
AzureIoTClient 41:71c01aa3df1a 2113 LogError("telemetry_messenger_set_option failed (option with name '%s' is not suppported)", name);
AzureIoTClient 41:71c01aa3df1a 2114 result = __FAILURE__;
AzureIoTClient 41:71c01aa3df1a 2115 }
AzureIoTClient 41:71c01aa3df1a 2116 }
AzureIoTClient 36:f78f9a56869e 2117
AzureIoTClient 41:71c01aa3df1a 2118 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_172: [If no errors occur, telemetry_messenger_set_option shall return 0]
AzureIoTClient 41:71c01aa3df1a 2119 return result;
AzureIoTClient 36:f78f9a56869e 2120 }
AzureIoTClient 36:f78f9a56869e 2121
AzureIoTClient 36:f78f9a56869e 2122 OPTIONHANDLER_HANDLE telemetry_messenger_retrieve_options(TELEMETRY_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 36:f78f9a56869e 2123 {
AzureIoTClient 41:71c01aa3df1a 2124 OPTIONHANDLER_HANDLE result;
AzureIoTClient 36:f78f9a56869e 2125
AzureIoTClient 41:71c01aa3df1a 2126 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_173: [If `messenger_handle` is NULL, telemetry_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 41:71c01aa3df1a 2127 if (messenger_handle == NULL)
AzureIoTClient 41:71c01aa3df1a 2128 {
AzureIoTClient 41:71c01aa3df1a 2129 LogError("Failed to retrieve options from messenger instance (messenger_handle is NULL)");
AzureIoTClient 41:71c01aa3df1a 2130 result = NULL;
AzureIoTClient 41:71c01aa3df1a 2131 }
AzureIoTClient 41:71c01aa3df1a 2132 else
AzureIoTClient 41:71c01aa3df1a 2133 {
AzureIoTClient 41:71c01aa3df1a 2134 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_174: [An OPTIONHANDLER_HANDLE instance shall be created using OptionHandler_Create]
AzureIoTClient 41:71c01aa3df1a 2135 OPTIONHANDLER_HANDLE options = OptionHandler_Create(telemetry_messenger_clone_option, telemetry_messenger_destroy_option, (pfSetOption)telemetry_messenger_set_option);
AzureIoTClient 36:f78f9a56869e 2136
AzureIoTClient 41:71c01aa3df1a 2137 if (options == NULL)
AzureIoTClient 41:71c01aa3df1a 2138 {
AzureIoTClient 41:71c01aa3df1a 2139 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_175: [If an OPTIONHANDLER_HANDLE instance fails to be created, telemetry_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 41:71c01aa3df1a 2140 LogError("Failed to retrieve options from messenger instance (OptionHandler_Create failed)");
AzureIoTClient 41:71c01aa3df1a 2141 result = NULL;
AzureIoTClient 41:71c01aa3df1a 2142 }
AzureIoTClient 41:71c01aa3df1a 2143 else
AzureIoTClient 41:71c01aa3df1a 2144 {
AzureIoTClient 41:71c01aa3df1a 2145 TELEMETRY_MESSENGER_INSTANCE* instance = (TELEMETRY_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 36:f78f9a56869e 2146
AzureIoTClient 41:71c01aa3df1a 2147 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_176: [Each option of `instance` shall be added to the OPTIONHANDLER_HANDLE instance using OptionHandler_AddOption]
AzureIoTClient 41:71c01aa3df1a 2148 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_177: [If OptionHandler_AddOption fails, telemetry_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 51:269e65571b39 2149 if (OptionHandler_AddOption(options, TELEMETRY_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, (void*)&instance->event_send_timeout_secs) != OPTIONHANDLER_OK)
AzureIoTClient 41:71c01aa3df1a 2150 {
AzureIoTClient 51:269e65571b39 2151 LogError("Failed to retrieve options from messenger instance (OptionHandler_Create failed for option '%s')", TELEMETRY_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS);
AzureIoTClient 41:71c01aa3df1a 2152 result = NULL;
AzureIoTClient 41:71c01aa3df1a 2153 }
AzureIoTClient 41:71c01aa3df1a 2154 else
AzureIoTClient 41:71c01aa3df1a 2155 {
AzureIoTClient 41:71c01aa3df1a 2156 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_179: [If no failures occur, telemetry_messenger_retrieve_options shall return the OPTIONHANDLER_HANDLE instance]
AzureIoTClient 41:71c01aa3df1a 2157 result = options;
AzureIoTClient 41:71c01aa3df1a 2158 }
AzureIoTClient 36:f78f9a56869e 2159
AzureIoTClient 41:71c01aa3df1a 2160 if (result == NULL)
AzureIoTClient 41:71c01aa3df1a 2161 {
AzureIoTClient 41:71c01aa3df1a 2162 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_178: [If telemetry_messenger_retrieve_options fails, any allocated memory shall be freed]
AzureIoTClient 41:71c01aa3df1a 2163 OptionHandler_Destroy(options);
AzureIoTClient 41:71c01aa3df1a 2164 }
AzureIoTClient 41:71c01aa3df1a 2165 }
AzureIoTClient 41:71c01aa3df1a 2166 }
AzureIoTClient 36:f78f9a56869e 2167
AzureIoTClient 41:71c01aa3df1a 2168 return result;
AzureIoTClient 36:f78f9a56869e 2169 }