Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Committer:
AzureIoTClient
Date:
Thu Oct 04 09:14:47 2018 -0700
Revision:
57:56ac1346c70d
Parent:
56:8704100b3b54
1.2.10

Who changed what in which revision?

User | Revision | Line number | New contents of line
AzureIoTClient 39:e98d5df6dc74 1 // Copyright (c) Microsoft. All rights reserved.
AzureIoTClient 39:e98d5df6dc74 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
AzureIoTClient 39:e98d5df6dc74 3
AzureIoTClient 39:e98d5df6dc74 4 #include <stdlib.h>
AzureIoTClient 39:e98d5df6dc74 5 #include <stdbool.h>
AzureIoTClient 39:e98d5df6dc74 6 #include "azure_c_shared_utility/optimize_size.h"
AzureIoTClient 39:e98d5df6dc74 7 #include "azure_c_shared_utility/crt_abstractions.h"
AzureIoTClient 39:e98d5df6dc74 8 #include "azure_c_shared_utility/gballoc.h"
AzureIoTClient 56:8704100b3b54 9 #include "azure_c_shared_utility/agenttime.h"
AzureIoTClient 39:e98d5df6dc74 10 #include "azure_c_shared_utility/xlogging.h"
AzureIoTClient 39:e98d5df6dc74 11 #include "azure_c_shared_utility/uniqueid.h"
AzureIoTClient 39:e98d5df6dc74 12 #include "azure_uamqp_c/link.h"
AzureIoTClient 39:e98d5df6dc74 13 #include "azure_uamqp_c/messaging.h"
AzureIoTClient 39:e98d5df6dc74 14 #include "azure_uamqp_c/message_sender.h"
AzureIoTClient 39:e98d5df6dc74 15 #include "azure_uamqp_c/message_receiver.h"
AzureIoTClient 53:e21e1e88460f 16 #include "internal/message_queue.h"
AzureIoTClient 53:e21e1e88460f 17 #include "internal/iothub_client_retry_control.h"
AzureIoTClient 53:e21e1e88460f 18 #include "internal/iothubtransport_amqp_messenger.h"
AzureIoTClient 39:e98d5df6dc74 19
AzureIoTClient 39:e98d5df6dc74 20 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_SEND_STATUS, AMQP_MESSENGER_SEND_STATUS_VALUES);
AzureIoTClient 39:e98d5df6dc74 21 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_SEND_RESULT, AMQP_MESSENGER_SEND_RESULT_VALUES);
AzureIoTClient 39:e98d5df6dc74 22 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_REASON, AMQP_MESSENGER_REASON_VALUES);
AzureIoTClient 39:e98d5df6dc74 23 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_DISPOSITION_RESULT, AMQP_MESSENGER_DISPOSITION_RESULT_VALUES);
AzureIoTClient 39:e98d5df6dc74 24 DEFINE_ENUM_STRINGS(AMQP_MESSENGER_STATE, AMQP_MESSENGER_STATE_VALUES);
AzureIoTClient 39:e98d5df6dc74 25
AzureIoTClient 39:e98d5df6dc74 26
AzureIoTClient 39:e98d5df6dc74 27 #define RESULT_OK 0
AzureIoTClient 39:e98d5df6dc74 28 #define INDEFINITE_TIME ((time_t)(-1))
AzureIoTClient 39:e98d5df6dc74 29
AzureIoTClient 39:e98d5df6dc74 30 // AMQP Link address format: "amqps://<iot hub fqdn>/devices/<device-id>/<suffix>"
AzureIoTClient 39:e98d5df6dc74 31 #define LINK_ADDRESS_FORMAT "amqps://%s/devices/%s/%s"
AzureIoTClient 54:830550fef7ea 32 #define LINK_ADDRESS_MODULE_FORMAT "amqps://%s/devices/%s/modules/%s/%s"
AzureIoTClient 56:8704100b3b54 33 #define SEND_LINK_NAME_PREFIX "link-snd"
AzureIoTClient 39:e98d5df6dc74 34 #define MESSAGE_SENDER_MAX_LINK_SIZE UINT64_MAX
AzureIoTClient 56:8704100b3b54 35 #define RECEIVE_LINK_NAME_PREFIX "link-rcv"
AzureIoTClient 39:e98d5df6dc74 36 #define MESSAGE_RECEIVER_MAX_LINK_SIZE 65536
AzureIoTClient 39:e98d5df6dc74 37 #define DEFAULT_EVENT_SEND_RETRY_LIMIT 0
AzureIoTClient 39:e98d5df6dc74 38 #define DEFAULT_EVENT_SEND_TIMEOUT_SECS 600
AzureIoTClient 39:e98d5df6dc74 39 #define DEFAULT_MAX_SEND_ERROR_COUNT 10
AzureIoTClient 39:e98d5df6dc74 40 #define MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS 300
AzureIoTClient 39:e98d5df6dc74 41 #define MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS 300
AzureIoTClient 39:e98d5df6dc74 42 #define UNIQUE_ID_BUFFER_SIZE 37
AzureIoTClient 39:e98d5df6dc74 43
AzureIoTClient 39:e98d5df6dc74 44 static const char* MESSENGER_SAVED_MQ_OPTIONS = "amqp_message_queue_options";
AzureIoTClient 39:e98d5df6dc74 45
AzureIoTClient 39:e98d5df6dc74 46 typedef struct AMQP_MESSENGER_INSTANCE_TAG
AzureIoTClient 39:e98d5df6dc74 47 {
AzureIoTClient 56:8704100b3b54 48 AMQP_MESSENGER_CONFIG* config;
AzureIoTClient 39:e98d5df6dc74 49
AzureIoTClient 56:8704100b3b54 50 bool receive_messages;
AzureIoTClient 56:8704100b3b54 51 ON_AMQP_MESSENGER_MESSAGE_RECEIVED on_message_received_callback;
AzureIoTClient 56:8704100b3b54 52 void* on_message_received_context;
AzureIoTClient 39:e98d5df6dc74 53
AzureIoTClient 56:8704100b3b54 54 MESSAGE_QUEUE_HANDLE send_queue;
AzureIoTClient 56:8704100b3b54 55 AMQP_MESSENGER_STATE state;
AzureIoTClient 39:e98d5df6dc74 56
AzureIoTClient 56:8704100b3b54 57 SESSION_HANDLE session_handle;
AzureIoTClient 39:e98d5df6dc74 58
AzureIoTClient 56:8704100b3b54 59 LINK_HANDLE sender_link;
AzureIoTClient 56:8704100b3b54 60 MESSAGE_SENDER_HANDLE message_sender;
AzureIoTClient 56:8704100b3b54 61 MESSAGE_SENDER_STATE message_sender_current_state;
AzureIoTClient 56:8704100b3b54 62 MESSAGE_SENDER_STATE message_sender_previous_state;
AzureIoTClient 39:e98d5df6dc74 63
AzureIoTClient 56:8704100b3b54 64 LINK_HANDLE receiver_link;
AzureIoTClient 56:8704100b3b54 65 MESSAGE_RECEIVER_HANDLE message_receiver;
AzureIoTClient 56:8704100b3b54 66 MESSAGE_RECEIVER_STATE message_receiver_current_state;
AzureIoTClient 56:8704100b3b54 67 MESSAGE_RECEIVER_STATE message_receiver_previous_state;
AzureIoTClient 39:e98d5df6dc74 68
AzureIoTClient 56:8704100b3b54 69 size_t send_error_count;
AzureIoTClient 56:8704100b3b54 70 size_t max_send_error_count;
AzureIoTClient 39:e98d5df6dc74 71
AzureIoTClient 56:8704100b3b54 72 time_t last_message_sender_state_change_time;
AzureIoTClient 56:8704100b3b54 73 time_t last_message_receiver_state_change_time;
AzureIoTClient 39:e98d5df6dc74 74 } AMQP_MESSENGER_INSTANCE;
AzureIoTClient 39:e98d5df6dc74 75
AzureIoTClient 39:e98d5df6dc74 76 typedef struct MESSAGE_SEND_CONTEXT_TAG
AzureIoTClient 39:e98d5df6dc74 77 {
AzureIoTClient 56:8704100b3b54 78 MESSAGE_HANDLE message;
AzureIoTClient 56:8704100b3b54 79 bool is_destroyed;
AzureIoTClient 39:e98d5df6dc74 80
AzureIoTClient 56:8704100b3b54 81 AMQP_MESSENGER_INSTANCE* messenger;
AzureIoTClient 39:e98d5df6dc74 82
AzureIoTClient 56:8704100b3b54 83 AMQP_MESSENGER_SEND_COMPLETE_CALLBACK on_send_complete_callback;
AzureIoTClient 56:8704100b3b54 84 void* user_context;
AzureIoTClient 39:e98d5df6dc74 85
AzureIoTClient 56:8704100b3b54 86 PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback;
AzureIoTClient 39:e98d5df6dc74 87 } MESSAGE_SEND_CONTEXT;
AzureIoTClient 39:e98d5df6dc74 88
AzureIoTClient 39:e98d5df6dc74 89
AzureIoTClient 39:e98d5df6dc74 90
AzureIoTClient 39:e98d5df6dc74 91 static MESSAGE_SEND_CONTEXT* create_message_send_context()
AzureIoTClient 39:e98d5df6dc74 92 {
AzureIoTClient 56:8704100b3b54 93 MESSAGE_SEND_CONTEXT* result;
AzureIoTClient 39:e98d5df6dc74 94
AzureIoTClient 56:8704100b3b54 95 if ((result = (MESSAGE_SEND_CONTEXT*)malloc(sizeof(MESSAGE_SEND_CONTEXT))) == NULL)
AzureIoTClient 56:8704100b3b54 96 {
AzureIoTClient 56:8704100b3b54 97 LogError("Failed creating the message send context");
AzureIoTClient 56:8704100b3b54 98 }
AzureIoTClient 56:8704100b3b54 99 else
AzureIoTClient 56:8704100b3b54 100 {
AzureIoTClient 56:8704100b3b54 101 memset(result, 0, sizeof(MESSAGE_SEND_CONTEXT));
AzureIoTClient 56:8704100b3b54 102 }
AzureIoTClient 39:e98d5df6dc74 103
AzureIoTClient 56:8704100b3b54 104 return result;
AzureIoTClient 39:e98d5df6dc74 105 }
AzureIoTClient 39:e98d5df6dc74 106
AzureIoTClient 39:e98d5df6dc74 107 static bool is_valid_configuration(const AMQP_MESSENGER_CONFIG* config)
AzureIoTClient 39:e98d5df6dc74 108 {
AzureIoTClient 56:8704100b3b54 109 bool result;
AzureIoTClient 39:e98d5df6dc74 110
AzureIoTClient 56:8704100b3b54 111 if (config == NULL)
AzureIoTClient 56:8704100b3b54 112 {
AzureIoTClient 56:8704100b3b54 113 LogError("Invalid configuration (NULL)");
AzureIoTClient 56:8704100b3b54 114 result = false;
AzureIoTClient 56:8704100b3b54 115 }
AzureIoTClient 56:8704100b3b54 116 else if (config->client_version == NULL ||
AzureIoTClient 56:8704100b3b54 117 config->device_id == NULL ||
AzureIoTClient 56:8704100b3b54 118 config->iothub_host_fqdn == NULL ||
AzureIoTClient 56:8704100b3b54 119 config->receive_link.source_suffix == NULL ||
AzureIoTClient 56:8704100b3b54 120 config->send_link.target_suffix == NULL)
AzureIoTClient 56:8704100b3b54 121 {
AzureIoTClient 56:8704100b3b54 122 LogError("Invalid configuration (client_version=%p, device_id=%p, iothub_host_fqdn=%p, receive_link (source_suffix=%p), send_link (target_suffix=%p))",
AzureIoTClient 56:8704100b3b54 123 config->client_version, config->device_id, config->iothub_host_fqdn,
AzureIoTClient 56:8704100b3b54 124 config->receive_link.source_suffix, config->send_link.target_suffix);
AzureIoTClient 56:8704100b3b54 125 result = false;
AzureIoTClient 56:8704100b3b54 126 }
AzureIoTClient 56:8704100b3b54 127 else
AzureIoTClient 56:8704100b3b54 128 {
AzureIoTClient 56:8704100b3b54 129 result = true;
AzureIoTClient 56:8704100b3b54 130 }
AzureIoTClient 39:e98d5df6dc74 131
AzureIoTClient 56:8704100b3b54 132 return result;
AzureIoTClient 39:e98d5df6dc74 133 }
AzureIoTClient 39:e98d5df6dc74 134
AzureIoTClient 39:e98d5df6dc74 135 static void destroy_link_configuration(AMQP_MESSENGER_LINK_CONFIG* link_config)
AzureIoTClient 39:e98d5df6dc74 136 {
AzureIoTClient 56:8704100b3b54 137 if (link_config->target_suffix != NULL)
AzureIoTClient 56:8704100b3b54 138 {
AzureIoTClient 56:8704100b3b54 139 free((void*)link_config->target_suffix);
AzureIoTClient 56:8704100b3b54 140 link_config->target_suffix = NULL;
AzureIoTClient 56:8704100b3b54 141 }
AzureIoTClient 39:e98d5df6dc74 142
AzureIoTClient 56:8704100b3b54 143 if (link_config->source_suffix != NULL)
AzureIoTClient 56:8704100b3b54 144 {
AzureIoTClient 56:8704100b3b54 145 free((void*)link_config->source_suffix);
AzureIoTClient 56:8704100b3b54 146 link_config->source_suffix = NULL;
AzureIoTClient 56:8704100b3b54 147 }
AzureIoTClient 39:e98d5df6dc74 148
AzureIoTClient 56:8704100b3b54 149 if (link_config->attach_properties != NULL)
AzureIoTClient 56:8704100b3b54 150 {
AzureIoTClient 56:8704100b3b54 151 Map_Destroy(link_config->attach_properties);
AzureIoTClient 56:8704100b3b54 152 link_config->attach_properties = NULL;
AzureIoTClient 56:8704100b3b54 153 }
AzureIoTClient 39:e98d5df6dc74 154 }
AzureIoTClient 39:e98d5df6dc74 155
AzureIoTClient 39:e98d5df6dc74 156 static void destroy_configuration(AMQP_MESSENGER_CONFIG* config)
AzureIoTClient 39:e98d5df6dc74 157 {
AzureIoTClient 56:8704100b3b54 158 if (config != NULL)
AzureIoTClient 56:8704100b3b54 159 {
AzureIoTClient 56:8704100b3b54 160 if (config->client_version != NULL)
AzureIoTClient 56:8704100b3b54 161 {
AzureIoTClient 56:8704100b3b54 162 free((void*)config->client_version);
AzureIoTClient 56:8704100b3b54 163 }
AzureIoTClient 39:e98d5df6dc74 164
AzureIoTClient 56:8704100b3b54 165 if (config->device_id != NULL)
AzureIoTClient 56:8704100b3b54 166 {
AzureIoTClient 56:8704100b3b54 167 free((void*)config->device_id);
AzureIoTClient 56:8704100b3b54 168 }
AzureIoTClient 39:e98d5df6dc74 169
AzureIoTClient 56:8704100b3b54 170 if (config->module_id != NULL)
AzureIoTClient 56:8704100b3b54 171 {
AzureIoTClient 56:8704100b3b54 172 free((void*)config->module_id);
AzureIoTClient 56:8704100b3b54 173 }
AzureIoTClient 54:830550fef7ea 174
AzureIoTClient 56:8704100b3b54 175 if (config->iothub_host_fqdn != NULL)
AzureIoTClient 56:8704100b3b54 176 {
AzureIoTClient 56:8704100b3b54 177 free((void*)config->iothub_host_fqdn);
AzureIoTClient 56:8704100b3b54 178 }
AzureIoTClient 39:e98d5df6dc74 179
AzureIoTClient 56:8704100b3b54 180 destroy_link_configuration(&config->send_link);
AzureIoTClient 56:8704100b3b54 181 destroy_link_configuration(&config->receive_link);
AzureIoTClient 39:e98d5df6dc74 182
AzureIoTClient 56:8704100b3b54 183 free(config);
AzureIoTClient 56:8704100b3b54 184 }
AzureIoTClient 39:e98d5df6dc74 185 }
AzureIoTClient 39:e98d5df6dc74 186
AzureIoTClient 39:e98d5df6dc74 187 static int clone_link_configuration(role link_role, AMQP_MESSENGER_LINK_CONFIG* dst_config, const AMQP_MESSENGER_LINK_CONFIG* src_config)
AzureIoTClient 39:e98d5df6dc74 188 {
AzureIoTClient 56:8704100b3b54 189 int result;
AzureIoTClient 39:e98d5df6dc74 190
AzureIoTClient 56:8704100b3b54 191 if (link_role == role_sender &&
AzureIoTClient 56:8704100b3b54 192 mallocAndStrcpy_s(&dst_config->target_suffix, src_config->target_suffix) != 0)
AzureIoTClient 56:8704100b3b54 193 {
AzureIoTClient 56:8704100b3b54 194 LogError("Failed copying send_link_target_suffix");
AzureIoTClient 56:8704100b3b54 195 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 196 }
AzureIoTClient 56:8704100b3b54 197 else if (link_role == role_receiver &&
AzureIoTClient 56:8704100b3b54 198 mallocAndStrcpy_s(&dst_config->source_suffix, src_config->source_suffix) != 0)
AzureIoTClient 56:8704100b3b54 199 {
AzureIoTClient 56:8704100b3b54 200 LogError("Failed copying receive_link_source_suffix");
AzureIoTClient 56:8704100b3b54 201 destroy_link_configuration(dst_config);
AzureIoTClient 56:8704100b3b54 202 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 203 }
AzureIoTClient 56:8704100b3b54 204 else if (src_config->attach_properties != NULL &&
AzureIoTClient 56:8704100b3b54 205 (dst_config->attach_properties = Map_Clone(src_config->attach_properties)) == NULL)
AzureIoTClient 56:8704100b3b54 206 {
AzureIoTClient 56:8704100b3b54 207 LogError("Failed copying link attach properties");
AzureIoTClient 56:8704100b3b54 208 destroy_link_configuration(dst_config);
AzureIoTClient 56:8704100b3b54 209 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 210 }
AzureIoTClient 56:8704100b3b54 211 else
AzureIoTClient 56:8704100b3b54 212 {
AzureIoTClient 56:8704100b3b54 213 dst_config->snd_settle_mode = src_config->snd_settle_mode;
AzureIoTClient 56:8704100b3b54 214 dst_config->rcv_settle_mode = src_config->rcv_settle_mode;
AzureIoTClient 39:e98d5df6dc74 215
AzureIoTClient 56:8704100b3b54 216 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 217 }
AzureIoTClient 39:e98d5df6dc74 218
AzureIoTClient 56:8704100b3b54 219 return result;
AzureIoTClient 39:e98d5df6dc74 220 }
AzureIoTClient 39:e98d5df6dc74 221
AzureIoTClient 39:e98d5df6dc74 222 static AMQP_MESSENGER_CONFIG* clone_configuration(const AMQP_MESSENGER_CONFIG* config)
AzureIoTClient 39:e98d5df6dc74 223 {
AzureIoTClient 56:8704100b3b54 224 AMQP_MESSENGER_CONFIG* result;
AzureIoTClient 39:e98d5df6dc74 225
AzureIoTClient 56:8704100b3b54 226 if ((result = (AMQP_MESSENGER_CONFIG*)malloc(sizeof(AMQP_MESSENGER_CONFIG))) == NULL)
AzureIoTClient 56:8704100b3b54 227 {
AzureIoTClient 56:8704100b3b54 228 LogError("Failed allocating AMQP_MESSENGER_CONFIG");
AzureIoTClient 56:8704100b3b54 229 }
AzureIoTClient 56:8704100b3b54 230 else
AzureIoTClient 56:8704100b3b54 231 {
AzureIoTClient 56:8704100b3b54 232 memset(result, 0, sizeof(AMQP_MESSENGER_CONFIG));
AzureIoTClient 39:e98d5df6dc74 233
AzureIoTClient 56:8704100b3b54 234 if (mallocAndStrcpy_s(&result->client_version, config->client_version) != 0)
AzureIoTClient 56:8704100b3b54 235 {
AzureIoTClient 56:8704100b3b54 236 LogError("Failed copying device_id");
AzureIoTClient 56:8704100b3b54 237 destroy_configuration(result);
AzureIoTClient 56:8704100b3b54 238 result = NULL;
AzureIoTClient 56:8704100b3b54 239 }
AzureIoTClient 56:8704100b3b54 240 else if (mallocAndStrcpy_s(&result->device_id, config->device_id) != 0)
AzureIoTClient 56:8704100b3b54 241 {
AzureIoTClient 56:8704100b3b54 242 LogError("Failed copying device_id");
AzureIoTClient 56:8704100b3b54 243 destroy_configuration(result);
AzureIoTClient 56:8704100b3b54 244 result = NULL;
AzureIoTClient 56:8704100b3b54 245 }
AzureIoTClient 56:8704100b3b54 246 else if ((config->module_id != NULL) && (mallocAndStrcpy_s(&result->module_id, config->module_id) != 0))
AzureIoTClient 56:8704100b3b54 247 {
AzureIoTClient 56:8704100b3b54 248 LogError("Failed copying module_id");
AzureIoTClient 56:8704100b3b54 249 destroy_configuration(result);
AzureIoTClient 56:8704100b3b54 250 result = NULL;
AzureIoTClient 56:8704100b3b54 251 }
AzureIoTClient 56:8704100b3b54 252 else if (mallocAndStrcpy_s(&result->iothub_host_fqdn, config->iothub_host_fqdn) != 0)
AzureIoTClient 56:8704100b3b54 253 {
AzureIoTClient 56:8704100b3b54 254 LogError("Failed copying iothub_host_fqdn");
AzureIoTClient 56:8704100b3b54 255 destroy_configuration(result);
AzureIoTClient 56:8704100b3b54 256 result = NULL;
AzureIoTClient 56:8704100b3b54 257 }
AzureIoTClient 56:8704100b3b54 258 else if (clone_link_configuration(role_sender, &result->send_link, &config->send_link) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 259 {
AzureIoTClient 56:8704100b3b54 260 LogError("Failed copying send link configuration");
AzureIoTClient 56:8704100b3b54 261 destroy_configuration(result);
AzureIoTClient 56:8704100b3b54 262 result = NULL;
AzureIoTClient 56:8704100b3b54 263 }
AzureIoTClient 56:8704100b3b54 264 else if (clone_link_configuration(role_receiver, &result->receive_link, &config->receive_link) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 265 {
AzureIoTClient 56:8704100b3b54 266 LogError("Failed copying receive link configuration");
AzureIoTClient 56:8704100b3b54 267 destroy_configuration(result);
AzureIoTClient 56:8704100b3b54 268 result = NULL;
AzureIoTClient 56:8704100b3b54 269 }
AzureIoTClient 56:8704100b3b54 270 else
AzureIoTClient 56:8704100b3b54 271 {
AzureIoTClient 56:8704100b3b54 272 result->on_state_changed_callback = config->on_state_changed_callback;
AzureIoTClient 56:8704100b3b54 273 result->on_state_changed_context = config->on_state_changed_context;
AzureIoTClient 56:8704100b3b54 274 result->on_subscription_changed_callback = config->on_subscription_changed_callback;
AzureIoTClient 56:8704100b3b54 275 result->on_subscription_changed_context = config->on_subscription_changed_context;
AzureIoTClient 56:8704100b3b54 276 }
AzureIoTClient 56:8704100b3b54 277 }
AzureIoTClient 39:e98d5df6dc74 278
AzureIoTClient 56:8704100b3b54 279 return result;
AzureIoTClient 39:e98d5df6dc74 280 }
AzureIoTClient 39:e98d5df6dc74 281
AzureIoTClient 39:e98d5df6dc74 282 static void destroy_message_send_context(MESSAGE_SEND_CONTEXT* context)
AzureIoTClient 39:e98d5df6dc74 283 {
AzureIoTClient 54:830550fef7ea 284 free(context);
AzureIoTClient 39:e98d5df6dc74 285 }
AzureIoTClient 39:e98d5df6dc74 286
AzureIoTClient 54:830550fef7ea 287 static STRING_HANDLE create_link_address(const char* host_fqdn, const char* device_id, const char* module_id, const char* address_suffix)
AzureIoTClient 39:e98d5df6dc74 288 {
AzureIoTClient 54:830550fef7ea 289 STRING_HANDLE link_address;
AzureIoTClient 39:e98d5df6dc74 290
AzureIoTClient 54:830550fef7ea 291 if ((link_address = STRING_new()) == NULL)
AzureIoTClient 54:830550fef7ea 292 {
AzureIoTClient 54:830550fef7ea 293 LogError("failed creating link_address (STRING_new failed)");
AzureIoTClient 54:830550fef7ea 294 }
AzureIoTClient 54:830550fef7ea 295 else
AzureIoTClient 54:830550fef7ea 296 {
AzureIoTClient 54:830550fef7ea 297 if (module_id != NULL)
AzureIoTClient 54:830550fef7ea 298 {
AzureIoTClient 54:830550fef7ea 299 if (STRING_sprintf(link_address, LINK_ADDRESS_MODULE_FORMAT, host_fqdn, device_id, module_id, address_suffix) != RESULT_OK)
AzureIoTClient 54:830550fef7ea 300 {
AzureIoTClient 54:830550fef7ea 301 LogError("Failed creating the link_address for a module (STRING_sprintf failed)");
AzureIoTClient 54:830550fef7ea 302 STRING_delete(link_address);
AzureIoTClient 54:830550fef7ea 303 link_address = NULL;
AzureIoTClient 54:830550fef7ea 304 }
AzureIoTClient 54:830550fef7ea 305 }
AzureIoTClient 54:830550fef7ea 306 else
AzureIoTClient 54:830550fef7ea 307 {
AzureIoTClient 54:830550fef7ea 308 if (STRING_sprintf(link_address, LINK_ADDRESS_FORMAT, host_fqdn, device_id, address_suffix) != RESULT_OK)
AzureIoTClient 54:830550fef7ea 309 {
AzureIoTClient 54:830550fef7ea 310 LogError("Failed creating the link_address (STRING_sprintf failed)");
AzureIoTClient 54:830550fef7ea 311 STRING_delete(link_address);
AzureIoTClient 54:830550fef7ea 312 link_address = NULL;
AzureIoTClient 54:830550fef7ea 313 }
AzureIoTClient 54:830550fef7ea 314 }
AzureIoTClient 54:830550fef7ea 315 }
AzureIoTClient 54:830550fef7ea 316 return link_address;
AzureIoTClient 39:e98d5df6dc74 317 }
AzureIoTClient 39:e98d5df6dc74 318
AzureIoTClient 39:e98d5df6dc74 319 static STRING_HANDLE create_link_terminus_name(STRING_HANDLE link_name, const char* suffix)
AzureIoTClient 39:e98d5df6dc74 320 {
AzureIoTClient 56:8704100b3b54 321 STRING_HANDLE terminus_name;
AzureIoTClient 39:e98d5df6dc74 322
AzureIoTClient 56:8704100b3b54 323 if ((terminus_name = STRING_new()) == NULL)
AzureIoTClient 56:8704100b3b54 324 {
AzureIoTClient 56:8704100b3b54 325 LogError("Failed creating the terminus name (STRING_new failed; %s)", suffix);
AzureIoTClient 56:8704100b3b54 326 }
AzureIoTClient 56:8704100b3b54 327 else
AzureIoTClient 56:8704100b3b54 328 {
AzureIoTClient 56:8704100b3b54 329 const char* link_name_char_ptr = STRING_c_str(link_name);
AzureIoTClient 39:e98d5df6dc74 330
AzureIoTClient 56:8704100b3b54 331 if (STRING_sprintf(terminus_name, "%s-%s", link_name_char_ptr, suffix) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 332 {
AzureIoTClient 56:8704100b3b54 333 STRING_delete(terminus_name);
AzureIoTClient 56:8704100b3b54 334 terminus_name = NULL;
AzureIoTClient 56:8704100b3b54 335 LogError("Failed creating the terminus name (STRING_sprintf failed; %s)", suffix);
AzureIoTClient 56:8704100b3b54 336 }
AzureIoTClient 56:8704100b3b54 337 }
AzureIoTClient 39:e98d5df6dc74 338
AzureIoTClient 56:8704100b3b54 339 return terminus_name;
AzureIoTClient 39:e98d5df6dc74 340 }
AzureIoTClient 39:e98d5df6dc74 341
AzureIoTClient 39:e98d5df6dc74 342 static STRING_HANDLE create_link_name(role link_role, const char* device_id)
AzureIoTClient 39:e98d5df6dc74 343 {
AzureIoTClient 56:8704100b3b54 344 char* unique_id;
AzureIoTClient 56:8704100b3b54 345 STRING_HANDLE result;
AzureIoTClient 39:e98d5df6dc74 346
AzureIoTClient 56:8704100b3b54 347 if ((unique_id = (char*)malloc(sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1)) == NULL)
AzureIoTClient 56:8704100b3b54 348 {
AzureIoTClient 56:8704100b3b54 349 LogError("Failed generating an unique tag (malloc failed)");
AzureIoTClient 56:8704100b3b54 350 result = NULL;
AzureIoTClient 56:8704100b3b54 351 }
AzureIoTClient 56:8704100b3b54 352 else
AzureIoTClient 56:8704100b3b54 353 {
AzureIoTClient 56:8704100b3b54 354 memset(unique_id, 0, sizeof(char) * UNIQUE_ID_BUFFER_SIZE + 1);
AzureIoTClient 39:e98d5df6dc74 355
AzureIoTClient 56:8704100b3b54 356 if (UniqueId_Generate(unique_id, UNIQUE_ID_BUFFER_SIZE) != UNIQUEID_OK)
AzureIoTClient 56:8704100b3b54 357 {
AzureIoTClient 56:8704100b3b54 358 LogError("Failed generating an unique tag (UniqueId_Generate failed)");
AzureIoTClient 56:8704100b3b54 359 result = NULL;
AzureIoTClient 56:8704100b3b54 360 }
AzureIoTClient 56:8704100b3b54 361 else if ((result = STRING_new()) == NULL)
AzureIoTClient 56:8704100b3b54 362 {
AzureIoTClient 56:8704100b3b54 363 LogError("Failed generating an unique tag (STRING_new failed)");
AzureIoTClient 56:8704100b3b54 364 }
AzureIoTClient 56:8704100b3b54 365 else if (STRING_sprintf(result, "%s-%s-%s", (link_role == role_sender ? SEND_LINK_NAME_PREFIX : RECEIVE_LINK_NAME_PREFIX), device_id, unique_id) != 0)
AzureIoTClient 56:8704100b3b54 366 {
AzureIoTClient 56:8704100b3b54 367 LogError("Failed generating an unique tag (STRING_sprintf failed)");
AzureIoTClient 56:8704100b3b54 368 STRING_delete(result);
AzureIoTClient 56:8704100b3b54 369 result = NULL;
AzureIoTClient 56:8704100b3b54 370 }
AzureIoTClient 39:e98d5df6dc74 371
AzureIoTClient 56:8704100b3b54 372 free(unique_id);
AzureIoTClient 56:8704100b3b54 373 }
AzureIoTClient 39:e98d5df6dc74 374
AzureIoTClient 56:8704100b3b54 375 return result;
AzureIoTClient 39:e98d5df6dc74 376 }
AzureIoTClient 39:e98d5df6dc74 377
AzureIoTClient 39:e98d5df6dc74 378 static void update_messenger_state(AMQP_MESSENGER_INSTANCE* instance, AMQP_MESSENGER_STATE new_state)
AzureIoTClient 39:e98d5df6dc74 379 {
AzureIoTClient 56:8704100b3b54 380 if (new_state != instance->state)
AzureIoTClient 56:8704100b3b54 381 {
AzureIoTClient 56:8704100b3b54 382 AMQP_MESSENGER_STATE previous_state = instance->state;
AzureIoTClient 56:8704100b3b54 383 instance->state = new_state;
AzureIoTClient 39:e98d5df6dc74 384
AzureIoTClient 56:8704100b3b54 385 if (instance->config != NULL && instance->config->on_state_changed_callback != NULL)
AzureIoTClient 56:8704100b3b54 386 {
AzureIoTClient 56:8704100b3b54 387 instance->config->on_state_changed_callback(instance->config->on_state_changed_context, previous_state, new_state);
AzureIoTClient 56:8704100b3b54 388 }
AzureIoTClient 56:8704100b3b54 389 }
AzureIoTClient 39:e98d5df6dc74 390 }
AzureIoTClient 39:e98d5df6dc74 391
AzureIoTClient 39:e98d5df6dc74 392 static int add_link_attach_properties(LINK_HANDLE link, MAP_HANDLE user_defined_properties)
AzureIoTClient 39:e98d5df6dc74 393 {
AzureIoTClient 56:8704100b3b54 394 int result;
AzureIoTClient 56:8704100b3b54 395 fields attach_properties;
AzureIoTClient 39:e98d5df6dc74 396
AzureIoTClient 56:8704100b3b54 397 if ((attach_properties = amqpvalue_create_map()) == NULL)
AzureIoTClient 56:8704100b3b54 398 {
AzureIoTClient 56:8704100b3b54 399 LogError("Failed to create the map for attach properties.");
AzureIoTClient 56:8704100b3b54 400 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 401 }
AzureIoTClient 56:8704100b3b54 402 else
AzureIoTClient 56:8704100b3b54 403 {
AzureIoTClient 56:8704100b3b54 404 const char* const* keys;
AzureIoTClient 56:8704100b3b54 405 const char* const* values;
AzureIoTClient 56:8704100b3b54 406 size_t count;
AzureIoTClient 39:e98d5df6dc74 407
AzureIoTClient 56:8704100b3b54 408 if (Map_GetInternals(user_defined_properties, &keys, &values, &count) != MAP_OK)
AzureIoTClient 56:8704100b3b54 409 {
AzureIoTClient 56:8704100b3b54 410 LogError("failed getting user defined properties details.");
AzureIoTClient 56:8704100b3b54 411 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 412 }
AzureIoTClient 56:8704100b3b54 413 else
AzureIoTClient 56:8704100b3b54 414 {
AzureIoTClient 56:8704100b3b54 415 size_t i;
AzureIoTClient 56:8704100b3b54 416 result = RESULT_OK;
AzureIoTClient 39:e98d5df6dc74 417
AzureIoTClient 56:8704100b3b54 418 for (i = 0; i < count && result == RESULT_OK; i++)
AzureIoTClient 56:8704100b3b54 419 {
AzureIoTClient 56:8704100b3b54 420 AMQP_VALUE key;
AzureIoTClient 56:8704100b3b54 421 AMQP_VALUE value;
AzureIoTClient 39:e98d5df6dc74 422
AzureIoTClient 56:8704100b3b54 423 if ((key = amqpvalue_create_symbol(keys[i])) == NULL)
AzureIoTClient 56:8704100b3b54 424 {
AzureIoTClient 56:8704100b3b54 425 LogError("Failed creating AMQP_VALUE For key %s.", keys[i]);
AzureIoTClient 56:8704100b3b54 426 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 427 }
AzureIoTClient 56:8704100b3b54 428 else
AzureIoTClient 56:8704100b3b54 429 {
AzureIoTClient 56:8704100b3b54 430 if ((value = amqpvalue_create_string(values[i])) == NULL)
AzureIoTClient 56:8704100b3b54 431 {
AzureIoTClient 56:8704100b3b54 432 LogError("Failed creating AMQP_VALUE For key %s value", keys[i]);
AzureIoTClient 56:8704100b3b54 433 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 434 }
AzureIoTClient 56:8704100b3b54 435 else
AzureIoTClient 56:8704100b3b54 436 {
AzureIoTClient 56:8704100b3b54 437 if (amqpvalue_set_map_value(attach_properties, key, value) != 0)
AzureIoTClient 56:8704100b3b54 438 {
AzureIoTClient 56:8704100b3b54 439 LogError("Failed adding property %s to map", keys[i]);
AzureIoTClient 56:8704100b3b54 440 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 441 }
AzureIoTClient 39:e98d5df6dc74 442
AzureIoTClient 56:8704100b3b54 443 amqpvalue_destroy(value);
AzureIoTClient 56:8704100b3b54 444 }
AzureIoTClient 39:e98d5df6dc74 445
AzureIoTClient 56:8704100b3b54 446 amqpvalue_destroy(key);
AzureIoTClient 56:8704100b3b54 447 }
AzureIoTClient 56:8704100b3b54 448 }
AzureIoTClient 39:e98d5df6dc74 449
AzureIoTClient 56:8704100b3b54 450 if (result == RESULT_OK)
AzureIoTClient 56:8704100b3b54 451 {
AzureIoTClient 56:8704100b3b54 452 if (link_set_attach_properties(link, attach_properties) != 0)
AzureIoTClient 56:8704100b3b54 453 {
AzureIoTClient 56:8704100b3b54 454 LogError("Failed attaching properties to link");
AzureIoTClient 56:8704100b3b54 455 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 456 }
AzureIoTClient 56:8704100b3b54 457 else
AzureIoTClient 56:8704100b3b54 458 {
AzureIoTClient 56:8704100b3b54 459 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 460 }
AzureIoTClient 56:8704100b3b54 461 }
AzureIoTClient 56:8704100b3b54 462 }
AzureIoTClient 39:e98d5df6dc74 463
AzureIoTClient 56:8704100b3b54 464 amqpvalue_destroy(attach_properties);
AzureIoTClient 56:8704100b3b54 465 }
AzureIoTClient 39:e98d5df6dc74 466
AzureIoTClient 56:8704100b3b54 467 return result;
AzureIoTClient 39:e98d5df6dc74 468 }
AzureIoTClient 39:e98d5df6dc74 469
AzureIoTClient 39:e98d5df6dc74 470 static int create_link_terminus(role link_role, STRING_HANDLE link_name, STRING_HANDLE link_address, AMQP_VALUE* source, AMQP_VALUE* target)
AzureIoTClient 39:e98d5df6dc74 471 {
AzureIoTClient 56:8704100b3b54 472 int result;
AzureIoTClient 56:8704100b3b54 473 STRING_HANDLE terminus_name;
AzureIoTClient 56:8704100b3b54 474 const char* source_name;
AzureIoTClient 56:8704100b3b54 475 const char* target_name;
AzureIoTClient 39:e98d5df6dc74 476
AzureIoTClient 56:8704100b3b54 477 if (link_role == role_sender)
AzureIoTClient 56:8704100b3b54 478 {
AzureIoTClient 56:8704100b3b54 479 if ((terminus_name = create_link_terminus_name(link_name, "source")) == NULL)
AzureIoTClient 56:8704100b3b54 480 {
AzureIoTClient 56:8704100b3b54 481 LogError("Failed creating terminus name");
AzureIoTClient 56:8704100b3b54 482 source_name = NULL;
AzureIoTClient 56:8704100b3b54 483 target_name = NULL;
AzureIoTClient 56:8704100b3b54 484 }
AzureIoTClient 56:8704100b3b54 485 else
AzureIoTClient 56:8704100b3b54 486 {
AzureIoTClient 56:8704100b3b54 487 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_077: [The AMQP link source shall be defined as "<link name>-source"]
AzureIoTClient 56:8704100b3b54 488 source_name = STRING_c_str(terminus_name);
AzureIoTClient 56:8704100b3b54 489 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_094: [The AMQP link source shall be defined as <link address>]
AzureIoTClient 56:8704100b3b54 490 target_name = STRING_c_str(link_address);
AzureIoTClient 56:8704100b3b54 491 }
AzureIoTClient 56:8704100b3b54 492 }
AzureIoTClient 56:8704100b3b54 493 else
AzureIoTClient 56:8704100b3b54 494 {
AzureIoTClient 56:8704100b3b54 495 if ((terminus_name = create_link_terminus_name(link_name, "target")) == NULL)
AzureIoTClient 56:8704100b3b54 496 {
AzureIoTClient 56:8704100b3b54 497 LogError("Failed creating terminus name");
AzureIoTClient 56:8704100b3b54 498 source_name = NULL;
AzureIoTClient 56:8704100b3b54 499 target_name = NULL;
AzureIoTClient 56:8704100b3b54 500 }
AzureIoTClient 56:8704100b3b54 501 else
AzureIoTClient 56:8704100b3b54 502 {
AzureIoTClient 56:8704100b3b54 503 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_078: [The AMQP link target shall be defined as <link address>]
AzureIoTClient 56:8704100b3b54 504 source_name = STRING_c_str(link_address);
AzureIoTClient 56:8704100b3b54 505 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_095: [The AMQP link target shall be defined as "<link name>-target"]
AzureIoTClient 56:8704100b3b54 506 target_name = STRING_c_str(terminus_name);
AzureIoTClient 56:8704100b3b54 507 }
AzureIoTClient 56:8704100b3b54 508 }
AzureIoTClient 39:e98d5df6dc74 509
AzureIoTClient 56:8704100b3b54 510 if (source_name == NULL || target_name == NULL)
AzureIoTClient 56:8704100b3b54 511 {
AzureIoTClient 56:8704100b3b54 512 LogError("Failed creating link source and/or target name (source=%p, target=%p)", source_name, target_name);
AzureIoTClient 56:8704100b3b54 513 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 514 }
AzureIoTClient 56:8704100b3b54 515 else
AzureIoTClient 56:8704100b3b54 516 {
AzureIoTClient 56:8704100b3b54 517 if ((*source = messaging_create_source(source_name)) == NULL)
AzureIoTClient 56:8704100b3b54 518 {
AzureIoTClient 56:8704100b3b54 519 LogError("Failed creating link source");
AzureIoTClient 56:8704100b3b54 520 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 521 }
AzureIoTClient 56:8704100b3b54 522 else
AzureIoTClient 56:8704100b3b54 523 {
AzureIoTClient 56:8704100b3b54 524 if ((*target = messaging_create_target(target_name)) == NULL)
AzureIoTClient 56:8704100b3b54 525 {
AzureIoTClient 56:8704100b3b54 526 LogError("Failed creating link target");
AzureIoTClient 56:8704100b3b54 527 amqpvalue_destroy(*source);
AzureIoTClient 56:8704100b3b54 528 *source = NULL;
AzureIoTClient 56:8704100b3b54 529 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 530 }
AzureIoTClient 56:8704100b3b54 531 else
AzureIoTClient 56:8704100b3b54 532 {
AzureIoTClient 56:8704100b3b54 533 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 534 }
AzureIoTClient 56:8704100b3b54 535 }
AzureIoTClient 56:8704100b3b54 536 }
AzureIoTClient 39:e98d5df6dc74 537
AzureIoTClient 39:e98d5df6dc74 538
AzureIoTClient 56:8704100b3b54 539 STRING_delete(terminus_name);
AzureIoTClient 39:e98d5df6dc74 540
AzureIoTClient 56:8704100b3b54 541 return result;
AzureIoTClient 39:e98d5df6dc74 542 }
AzureIoTClient 39:e98d5df6dc74 543
AzureIoTClient 54:830550fef7ea 544 static LINK_HANDLE create_link(role link_role, SESSION_HANDLE session_handle, AMQP_MESSENGER_LINK_CONFIG* link_config, const char* iothub_host_fqdn, const char* device_id, const char* module_id)
AzureIoTClient 39:e98d5df6dc74 545 {
AzureIoTClient 56:8704100b3b54 546 LINK_HANDLE result = NULL;
AzureIoTClient 56:8704100b3b54 547 STRING_HANDLE link_address;
AzureIoTClient 39:e98d5df6dc74 548
AzureIoTClient 56:8704100b3b54 549 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_075: [The AMQP link address shall be defined as "amqps://<`iothub_host_fqdn`>/devices/<`device_id`>/<`instance-config->send_link.source_suffix`>"]
AzureIoTClient 56:8704100b3b54 550 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_092: [The AMQP link address shall be defined as "amqps://<`iothub_host_fqdn`>/devices/<`device_id`>/<`instance-config->receive_link.target_suffix`>"]
AzureIoTClient 56:8704100b3b54 551 if ((link_address = create_link_address(iothub_host_fqdn, device_id, module_id, (link_role == role_sender ? link_config->target_suffix : link_config->source_suffix))) == NULL)
AzureIoTClient 56:8704100b3b54 552 {
AzureIoTClient 56:8704100b3b54 553 LogError("Failed creating the message sender (failed creating the 'link_address')");
AzureIoTClient 56:8704100b3b54 554 result = NULL;
AzureIoTClient 56:8704100b3b54 555 }
AzureIoTClient 56:8704100b3b54 556 else
AzureIoTClient 56:8704100b3b54 557 {
AzureIoTClient 56:8704100b3b54 558 STRING_HANDLE link_name;
AzureIoTClient 39:e98d5df6dc74 559
AzureIoTClient 56:8704100b3b54 560 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_076: [The AMQP link name shall be defined as "link-snd-<`device_id`>-<locally generated UUID>"]
AzureIoTClient 56:8704100b3b54 561 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_093: [The AMQP link name shall be defined as "link-rcv-<`device_id`>-<locally generated UUID>"]
AzureIoTClient 56:8704100b3b54 562 if ((link_name = create_link_name(link_role, device_id)) == NULL)
AzureIoTClient 56:8704100b3b54 563 {
AzureIoTClient 56:8704100b3b54 564 LogError("Failed creating the link name");
AzureIoTClient 56:8704100b3b54 565 result = NULL;
AzureIoTClient 56:8704100b3b54 566 }
AzureIoTClient 56:8704100b3b54 567 else
AzureIoTClient 56:8704100b3b54 568 {
AzureIoTClient 56:8704100b3b54 569 AMQP_VALUE source = NULL;
AzureIoTClient 56:8704100b3b54 570 AMQP_VALUE target = NULL;
AzureIoTClient 39:e98d5df6dc74 571
AzureIoTClient 56:8704100b3b54 572 if (create_link_terminus(link_role, link_name, link_address, &source, &target) == RESULT_OK)
AzureIoTClient 56:8704100b3b54 573 {
AzureIoTClient 56:8704100b3b54 574 if ((result = link_create(session_handle, STRING_c_str(link_name), link_role, source, target)) == NULL)
AzureIoTClient 56:8704100b3b54 575 {
AzureIoTClient 56:8704100b3b54 576 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_079: [If the link fails to be created, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 56:8704100b3b54 577 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_096: [If the link fails to be created, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 56:8704100b3b54 578 LogError("Failed creating the AMQP link");
AzureIoTClient 56:8704100b3b54 579 }
AzureIoTClient 56:8704100b3b54 580 else
AzureIoTClient 56:8704100b3b54 581 {
AzureIoTClient 56:8704100b3b54 582 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_082: [The AMQP link maximum message size shall be set to UINT64_MAX using link_set_max_message_size()]
AzureIoTClient 56:8704100b3b54 583 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_099: [The AMQP link maximum message size shall be set to UINT64_MAX using link_set_max_message_size()]
AzureIoTClient 56:8704100b3b54 584 if (link_set_max_message_size(result, MESSAGE_SENDER_MAX_LINK_SIZE) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 585 {
AzureIoTClient 56:8704100b3b54 586 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_083: [If link_set_max_message_size() fails, it shall be logged and ignored.]
AzureIoTClient 56:8704100b3b54 587 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_100: [If link_set_max_message_size() fails, it shall be logged and ignored.]
AzureIoTClient 56:8704100b3b54 588 LogError("Failed setting link max message size.");
AzureIoTClient 56:8704100b3b54 589 }
AzureIoTClient 39:e98d5df6dc74 590
AzureIoTClient 56:8704100b3b54 591 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_080: [The AMQP link shall have its ATTACH properties set using `instance->config->send_link.attach_properties`]
AzureIoTClient 56:8704100b3b54 592 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_097: [The AMQP link shall have its ATTACH properties set using `instance->config->receive_link.attach_properties`]
AzureIoTClient 56:8704100b3b54 593 if (link_config->attach_properties != NULL &&
AzureIoTClient 56:8704100b3b54 594 add_link_attach_properties(result, link_config->attach_properties) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 595 {
AzureIoTClient 56:8704100b3b54 596 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_081: [If the AMQP link attach properties fail to be set, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 56:8704100b3b54 597 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_098: [If the AMQP link attach properties fail to be set, amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 56:8704100b3b54 598 LogError("Failed setting link attach properties");
AzureIoTClient 56:8704100b3b54 599 link_destroy(result);
AzureIoTClient 56:8704100b3b54 600 result = NULL;
AzureIoTClient 56:8704100b3b54 601 }
AzureIoTClient 56:8704100b3b54 602 }
AzureIoTClient 39:e98d5df6dc74 603
AzureIoTClient 56:8704100b3b54 604 amqpvalue_destroy(source);
AzureIoTClient 56:8704100b3b54 605 amqpvalue_destroy(target);
AzureIoTClient 56:8704100b3b54 606 }
AzureIoTClient 39:e98d5df6dc74 607
AzureIoTClient 56:8704100b3b54 608 STRING_delete(link_name);
AzureIoTClient 56:8704100b3b54 609 }
AzureIoTClient 39:e98d5df6dc74 610
AzureIoTClient 56:8704100b3b54 611 STRING_delete(link_address);
AzureIoTClient 56:8704100b3b54 612 }
AzureIoTClient 39:e98d5df6dc74 613
AzureIoTClient 56:8704100b3b54 614 return result;
AzureIoTClient 39:e98d5df6dc74 615 }
AzureIoTClient 39:e98d5df6dc74 616
AzureIoTClient 39:e98d5df6dc74 617 static void destroy_message_sender(AMQP_MESSENGER_INSTANCE* instance)
AzureIoTClient 39:e98d5df6dc74 618 {
AzureIoTClient 56:8704100b3b54 619 if (instance->message_sender != NULL)
AzureIoTClient 56:8704100b3b54 620 {
AzureIoTClient 56:8704100b3b54 621 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_090: [`instance->message_sender` shall be destroyed using messagesender_destroy()]
AzureIoTClient 56:8704100b3b54 622 messagesender_destroy(instance->message_sender);
AzureIoTClient 56:8704100b3b54 623 instance->message_sender = NULL;
AzureIoTClient 56:8704100b3b54 624 }
AzureIoTClient 39:e98d5df6dc74 625
AzureIoTClient 56:8704100b3b54 626 instance->message_sender_current_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 56:8704100b3b54 627 instance->message_sender_previous_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 56:8704100b3b54 628 instance->last_message_sender_state_change_time = INDEFINITE_TIME;
AzureIoTClient 45:c09fac270e60 629
AzureIoTClient 56:8704100b3b54 630 if (instance->sender_link != NULL)
AzureIoTClient 56:8704100b3b54 631 {
AzureIoTClient 56:8704100b3b54 632 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_091: [`instance->sender_link` shall be destroyed using link_destroy()]
AzureIoTClient 56:8704100b3b54 633 link_destroy(instance->sender_link);
AzureIoTClient 56:8704100b3b54 634 instance->sender_link = NULL;
AzureIoTClient 56:8704100b3b54 635 }
AzureIoTClient 39:e98d5df6dc74 636 }
AzureIoTClient 39:e98d5df6dc74 637
AzureIoTClient 39:e98d5df6dc74 638 static void on_message_sender_state_changed_callback(void* context, MESSAGE_SENDER_STATE new_state, MESSAGE_SENDER_STATE previous_state)
AzureIoTClient 39:e98d5df6dc74 639 {
AzureIoTClient 56:8704100b3b54 640 if (context == NULL)
AzureIoTClient 56:8704100b3b54 641 {
AzureIoTClient 56:8704100b3b54 642 LogError("on_message_sender_state_changed_callback was invoked with a NULL context; although unexpected, this failure will be ignored");
AzureIoTClient 56:8704100b3b54 643 }
AzureIoTClient 56:8704100b3b54 644 else if (new_state != previous_state)
AzureIoTClient 56:8704100b3b54 645 {
AzureIoTClient 56:8704100b3b54 646 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)context;
AzureIoTClient 39:e98d5df6dc74 647
AzureIoTClient 56:8704100b3b54 648 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_088: [`new_state`, `previous_state` shall be saved into `instance->message_sender_previous_state` and `instance->message_sender_current_state`]
AzureIoTClient 56:8704100b3b54 649 instance->message_sender_current_state = new_state;
AzureIoTClient 56:8704100b3b54 650 instance->message_sender_previous_state = previous_state;
AzureIoTClient 56:8704100b3b54 651 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_089: [`instance->last_message_sender_state_change_time` shall be set using get_time()]
AzureIoTClient 56:8704100b3b54 652 instance->last_message_sender_state_change_time = get_time(NULL);
AzureIoTClient 56:8704100b3b54 653 }
AzureIoTClient 39:e98d5df6dc74 654 }
AzureIoTClient 39:e98d5df6dc74 655
AzureIoTClient 39:e98d5df6dc74 656 static int create_message_sender(AMQP_MESSENGER_INSTANCE* instance)
AzureIoTClient 39:e98d5df6dc74 657 {
AzureIoTClient 56:8704100b3b54 658 int result;
AzureIoTClient 39:e98d5df6dc74 659
AzureIoTClient 56:8704100b3b54 660 if ((instance->sender_link = create_link(role_sender,
AzureIoTClient 56:8704100b3b54 661 instance->session_handle, &instance->config->send_link, instance->config->iothub_host_fqdn, instance->config->device_id, instance->config->module_id)) == NULL)
AzureIoTClient 56:8704100b3b54 662 {
AzureIoTClient 56:8704100b3b54 663 LogError("Failed creating the message sender link");
AzureIoTClient 56:8704100b3b54 664 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 665 }
AzureIoTClient 56:8704100b3b54 666 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_084: [`instance->message_sender` shall be created using messagesender_create(), passing the `instance->sender_link` and `on_message_sender_state_changed_callback`]
AzureIoTClient 56:8704100b3b54 667 else if ((instance->message_sender = messagesender_create(instance->sender_link, on_message_sender_state_changed_callback, (void*)instance)) == NULL)
AzureIoTClient 56:8704100b3b54 668 {
AzureIoTClient 56:8704100b3b54 669 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_085: [If messagesender_create() fails, amqp_messenger_do_work() shall fail and return]
AzureIoTClient 56:8704100b3b54 670 LogError("Failed creating the message sender (messagesender_create failed)");
AzureIoTClient 56:8704100b3b54 671 destroy_message_sender(instance);
AzureIoTClient 56:8704100b3b54 672 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 673 }
AzureIoTClient 56:8704100b3b54 674 else
AzureIoTClient 56:8704100b3b54 675 {
AzureIoTClient 56:8704100b3b54 676 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_086: [`instance->message_sender` shall be opened using messagesender_open()]
AzureIoTClient 56:8704100b3b54 677 if (messagesender_open(instance->message_sender) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 678 {
AzureIoTClient 56:8704100b3b54 679 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_087: [If messagesender_open() fails, amqp_messenger_do_work() shall fail and return]
AzureIoTClient 56:8704100b3b54 680 LogError("Failed opening the AMQP message sender.");
AzureIoTClient 56:8704100b3b54 681 destroy_message_sender(instance);
AzureIoTClient 56:8704100b3b54 682 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 683 }
AzureIoTClient 56:8704100b3b54 684 else
AzureIoTClient 56:8704100b3b54 685 {
AzureIoTClient 56:8704100b3b54 686 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 687 }
AzureIoTClient 56:8704100b3b54 688 }
AzureIoTClient 39:e98d5df6dc74 689
AzureIoTClient 56:8704100b3b54 690 return result;
AzureIoTClient 39:e98d5df6dc74 691 }
AzureIoTClient 39:e98d5df6dc74 692
AzureIoTClient 39:e98d5df6dc74 693 static void destroy_message_receiver(AMQP_MESSENGER_INSTANCE* instance)
AzureIoTClient 39:e98d5df6dc74 694 {
AzureIoTClient 56:8704100b3b54 695 if (instance->message_receiver != NULL)
AzureIoTClient 56:8704100b3b54 696 {
AzureIoTClient 56:8704100b3b54 697 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_113: [`instance->message_receiver` shall be closed using messagereceiver_close()]
AzureIoTClient 56:8704100b3b54 698 if (messagereceiver_close(instance->message_receiver) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 699 {
AzureIoTClient 56:8704100b3b54 700 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_114: [If messagereceiver_close() fails, it shall be logged and ignored]
AzureIoTClient 56:8704100b3b54 701 LogError("Failed closing the AMQP message receiver (this failure will be ignored).");
AzureIoTClient 56:8704100b3b54 702 }
AzureIoTClient 39:e98d5df6dc74 703
AzureIoTClient 56:8704100b3b54 704 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_115: [`instance->message_receiver` shall be destroyed using messagereceiver_destroy()]
AzureIoTClient 56:8704100b3b54 705 messagereceiver_destroy(instance->message_receiver);
AzureIoTClient 39:e98d5df6dc74 706
AzureIoTClient 56:8704100b3b54 707 instance->message_receiver = NULL;
AzureIoTClient 56:8704100b3b54 708 }
AzureIoTClient 39:e98d5df6dc74 709
AzureIoTClient 56:8704100b3b54 710 instance->message_receiver_current_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 56:8704100b3b54 711 instance->message_receiver_previous_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 56:8704100b3b54 712 instance->last_message_receiver_state_change_time = INDEFINITE_TIME;
AzureIoTClient 45:c09fac270e60 713
AzureIoTClient 56:8704100b3b54 714 if (instance->receiver_link != NULL)
AzureIoTClient 56:8704100b3b54 715 {
AzureIoTClient 56:8704100b3b54 716 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_116: [`instance->receiver_link` shall be destroyed using link_destroy()]
AzureIoTClient 56:8704100b3b54 717 link_destroy(instance->receiver_link);
AzureIoTClient 56:8704100b3b54 718 instance->receiver_link = NULL;
AzureIoTClient 56:8704100b3b54 719 }
AzureIoTClient 39:e98d5df6dc74 720 }
AzureIoTClient 39:e98d5df6dc74 721
AzureIoTClient 39:e98d5df6dc74 722 static void on_message_receiver_state_changed_callback(const void* context, MESSAGE_RECEIVER_STATE new_state, MESSAGE_RECEIVER_STATE previous_state)
AzureIoTClient 39:e98d5df6dc74 723 {
AzureIoTClient 56:8704100b3b54 724 if (context == NULL)
AzureIoTClient 56:8704100b3b54 725 {
AzureIoTClient 56:8704100b3b54 726 LogError("on_message_receiver_state_changed_callback was invoked with a NULL context; although unexpected, this failure will be ignored");
AzureIoTClient 56:8704100b3b54 727 }
AzureIoTClient 56:8704100b3b54 728 else
AzureIoTClient 56:8704100b3b54 729 {
AzureIoTClient 56:8704100b3b54 730 if (new_state != previous_state)
AzureIoTClient 56:8704100b3b54 731 {
AzureIoTClient 56:8704100b3b54 732 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)context;
AzureIoTClient 39:e98d5df6dc74 733
AzureIoTClient 56:8704100b3b54 734 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_105: [`new_state`, `previous_state` shall be saved into `instance->message_receiver_previous_state` and `instance->message_receiver_current_state`]
AzureIoTClient 56:8704100b3b54 735 instance->message_receiver_current_state = new_state;
AzureIoTClient 56:8704100b3b54 736 instance->message_receiver_previous_state = previous_state;
AzureIoTClient 56:8704100b3b54 737 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_106: [`instance->last_message_receiver_state_change_time` shall be set using get_time()]
AzureIoTClient 56:8704100b3b54 738 instance->last_message_receiver_state_change_time = get_time(NULL);
AzureIoTClient 39:e98d5df6dc74 739
AzureIoTClient 56:8704100b3b54 740 if (new_state == MESSAGE_RECEIVER_STATE_OPEN)
AzureIoTClient 56:8704100b3b54 741 {
AzureIoTClient 56:8704100b3b54 742 if (instance->config->on_subscription_changed_callback != NULL)
AzureIoTClient 56:8704100b3b54 743 {
AzureIoTClient 56:8704100b3b54 744 instance->config->on_subscription_changed_callback(instance->config->on_subscription_changed_context, true);
AzureIoTClient 56:8704100b3b54 745 }
AzureIoTClient 56:8704100b3b54 746 }
AzureIoTClient 56:8704100b3b54 747 else if (previous_state == MESSAGE_RECEIVER_STATE_OPEN && new_state != MESSAGE_RECEIVER_STATE_OPEN)
AzureIoTClient 56:8704100b3b54 748 {
AzureIoTClient 56:8704100b3b54 749 if (instance->config->on_subscription_changed_callback != NULL)
AzureIoTClient 56:8704100b3b54 750 {
AzureIoTClient 56:8704100b3b54 751 instance->config->on_subscription_changed_callback(instance->config->on_subscription_changed_context, false);
AzureIoTClient 56:8704100b3b54 752 }
AzureIoTClient 56:8704100b3b54 753 }
AzureIoTClient 56:8704100b3b54 754 }
AzureIoTClient 56:8704100b3b54 755 }
AzureIoTClient 39:e98d5df6dc74 756 }
AzureIoTClient 39:e98d5df6dc74 757
AzureIoTClient 39:e98d5df6dc74 758 static AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* create_message_disposition_info(AMQP_MESSENGER_INSTANCE* messenger)
AzureIoTClient 39:e98d5df6dc74 759 {
AzureIoTClient 56:8704100b3b54 760 AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* result;
AzureIoTClient 39:e98d5df6dc74 761
AzureIoTClient 56:8704100b3b54 762 if ((result = (AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO*)malloc(sizeof(AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO))) == NULL)
AzureIoTClient 56:8704100b3b54 763 {
AzureIoTClient 56:8704100b3b54 764 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (malloc failed)");
AzureIoTClient 56:8704100b3b54 765 result = NULL;
AzureIoTClient 56:8704100b3b54 766 }
AzureIoTClient 56:8704100b3b54 767 else
AzureIoTClient 56:8704100b3b54 768 {
AzureIoTClient 56:8704100b3b54 769 delivery_number message_id;
AzureIoTClient 39:e98d5df6dc74 770
AzureIoTClient 56:8704100b3b54 771 if (messagereceiver_get_received_message_id(messenger->message_receiver, &message_id) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 772 {
AzureIoTClient 56:8704100b3b54 773 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (messagereceiver_get_received_message_id failed)");
AzureIoTClient 56:8704100b3b54 774 free(result);
AzureIoTClient 56:8704100b3b54 775 result = NULL;
AzureIoTClient 56:8704100b3b54 776 }
AzureIoTClient 56:8704100b3b54 777 else
AzureIoTClient 56:8704100b3b54 778 {
AzureIoTClient 56:8704100b3b54 779 const char* link_name;
AzureIoTClient 39:e98d5df6dc74 780
AzureIoTClient 56:8704100b3b54 781 if (messagereceiver_get_link_name(messenger->message_receiver, &link_name) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 782 {
AzureIoTClient 56:8704100b3b54 783 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (messagereceiver_get_link_name failed)");
AzureIoTClient 56:8704100b3b54 784 free(result);
AzureIoTClient 56:8704100b3b54 785 result = NULL;
AzureIoTClient 56:8704100b3b54 786 }
AzureIoTClient 56:8704100b3b54 787 else if (mallocAndStrcpy_s(&result->source, link_name) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 788 {
AzureIoTClient 56:8704100b3b54 789 LogError("Failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO container (failed copying link name)");
AzureIoTClient 56:8704100b3b54 790 free(result);
AzureIoTClient 56:8704100b3b54 791 result = NULL;
AzureIoTClient 56:8704100b3b54 792 }
AzureIoTClient 56:8704100b3b54 793 else
AzureIoTClient 56:8704100b3b54 794 {
AzureIoTClient 56:8704100b3b54 795 result->message_id = message_id;
AzureIoTClient 56:8704100b3b54 796 }
AzureIoTClient 56:8704100b3b54 797 }
AzureIoTClient 56:8704100b3b54 798 }
AzureIoTClient 39:e98d5df6dc74 799
AzureIoTClient 56:8704100b3b54 800 return result;
AzureIoTClient 39:e98d5df6dc74 801 }
AzureIoTClient 39:e98d5df6dc74 802
AzureIoTClient 39:e98d5df6dc74 803 static void destroy_message_disposition_info(AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info)
AzureIoTClient 39:e98d5df6dc74 804 {
AzureIoTClient 56:8704100b3b54 805 free(disposition_info->source);
AzureIoTClient 56:8704100b3b54 806 free(disposition_info);
AzureIoTClient 39:e98d5df6dc74 807 }
AzureIoTClient 39:e98d5df6dc74 808
AzureIoTClient 39:e98d5df6dc74 809 static AMQP_VALUE create_uamqp_disposition_result_from(AMQP_MESSENGER_DISPOSITION_RESULT disposition_result)
AzureIoTClient 39:e98d5df6dc74 810 {
AzureIoTClient 56:8704100b3b54 811 AMQP_VALUE uamqp_disposition_result;
AzureIoTClient 39:e98d5df6dc74 812
AzureIoTClient 56:8704100b3b54 813 if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_NONE)
AzureIoTClient 56:8704100b3b54 814 {
AzureIoTClient 56:8704100b3b54 815 uamqp_disposition_result = NULL; // intentionally not sending an answer.
AzureIoTClient 56:8704100b3b54 816 }
AzureIoTClient 56:8704100b3b54 817 else if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED)
AzureIoTClient 56:8704100b3b54 818 {
AzureIoTClient 56:8704100b3b54 819 uamqp_disposition_result = messaging_delivery_accepted();
AzureIoTClient 56:8704100b3b54 820 }
AzureIoTClient 56:8704100b3b54 821 else if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_RELEASED)
AzureIoTClient 56:8704100b3b54 822 {
AzureIoTClient 56:8704100b3b54 823 uamqp_disposition_result = messaging_delivery_released();
AzureIoTClient 56:8704100b3b54 824 }
AzureIoTClient 56:8704100b3b54 825 else // id est, if (disposition_result == AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED)
AzureIoTClient 56:8704100b3b54 826 {
AzureIoTClient 56:8704100b3b54 827 uamqp_disposition_result = messaging_delivery_rejected("Rejected by application", "Rejected by application");
AzureIoTClient 56:8704100b3b54 828 }
AzureIoTClient 39:e98d5df6dc74 829
AzureIoTClient 56:8704100b3b54 830 return uamqp_disposition_result;
AzureIoTClient 39:e98d5df6dc74 831 }
AzureIoTClient 39:e98d5df6dc74 832
AzureIoTClient 39:e98d5df6dc74 833 static AMQP_VALUE on_message_received_internal_callback(const void* context, MESSAGE_HANDLE message)
AzureIoTClient 39:e98d5df6dc74 834 {
AzureIoTClient 56:8704100b3b54 835 AMQP_VALUE result;
AzureIoTClient 56:8704100b3b54 836 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)context;
AzureIoTClient 56:8704100b3b54 837 AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* message_disposition_info;
AzureIoTClient 39:e98d5df6dc74 838
AzureIoTClient 56:8704100b3b54 839 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_107: [A AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO instance shall be created containing the source link name and message delivery ID]
AzureIoTClient 56:8704100b3b54 840 if ((message_disposition_info = create_message_disposition_info(instance)) == NULL)
AzureIoTClient 56:8704100b3b54 841 {
AzureIoTClient 56:8704100b3b54 842 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_108: [If the AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO instance fails to be created, on_message_received_internal_callback shall return the result of messaging_delivery_released()]
AzureIoTClient 56:8704100b3b54 843 LogError("on_message_received_internal_callback failed (failed creating AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO).");
AzureIoTClient 56:8704100b3b54 844 result = messaging_delivery_released();
AzureIoTClient 56:8704100b3b54 845 }
AzureIoTClient 56:8704100b3b54 846 else
AzureIoTClient 56:8704100b3b54 847 {
AzureIoTClient 56:8704100b3b54 848 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_109: [`instance->on_message_received_callback` shall be invoked passing the `message` and AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO instance]
AzureIoTClient 56:8704100b3b54 849 AMQP_MESSENGER_DISPOSITION_RESULT disposition_result = instance->on_message_received_callback(message, message_disposition_info, instance->on_message_received_context);
AzureIoTClient 39:e98d5df6dc74 850
AzureIoTClient 56:8704100b3b54 851 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_110: [If `instance->on_message_received_callback` returns AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED, on_message_received_internal_callback shall return the result of messaging_delivery_accepted()]
AzureIoTClient 56:8704100b3b54 852 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_111: [If `instance->on_message_received_callback` returns AMQP_MESSENGER_DISPOSITION_RESULT_RELEASED, on_message_received_internal_callback shall return the result of messaging_delivery_released()]
AzureIoTClient 56:8704100b3b54 853 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_112: [If `instance->on_message_received_callback` returns AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED, on_message_received_internal_callback shall return the result of messaging_delivery_rejected()]
AzureIoTClient 56:8704100b3b54 854 result = create_uamqp_disposition_result_from(disposition_result);
AzureIoTClient 56:8704100b3b54 855 }
AzureIoTClient 39:e98d5df6dc74 856
AzureIoTClient 56:8704100b3b54 857 return result;
AzureIoTClient 39:e98d5df6dc74 858 }
AzureIoTClient 39:e98d5df6dc74 859
AzureIoTClient 39:e98d5df6dc74 860 static int create_message_receiver(AMQP_MESSENGER_INSTANCE* instance)
AzureIoTClient 39:e98d5df6dc74 861 {
AzureIoTClient 56:8704100b3b54 862 int result;
AzureIoTClient 39:e98d5df6dc74 863
AzureIoTClient 56:8704100b3b54 864 if ((instance->receiver_link = create_link(role_receiver,
AzureIoTClient 56:8704100b3b54 865 instance->session_handle, &instance->config->receive_link, instance->config->iothub_host_fqdn, instance->config->device_id, instance->config->module_id)) == NULL)
AzureIoTClient 56:8704100b3b54 866 {
AzureIoTClient 56:8704100b3b54 867 LogError("Failed creating the message receiver link");
AzureIoTClient 56:8704100b3b54 868 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 869 }
AzureIoTClient 56:8704100b3b54 870 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_101: [`instance->message_receiver` shall be created using messagereceiver_create(), passing the `instance->receiver_link` and `on_message_receiver_state_changed_callback`]
AzureIoTClient 56:8704100b3b54 871 else if ((instance->message_receiver = messagereceiver_create(instance->receiver_link, on_message_receiver_state_changed_callback, (void*)instance)) == NULL)
AzureIoTClient 56:8704100b3b54 872 {
AzureIoTClient 56:8704100b3b54 873 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_102: [If messagereceiver_create() fails, amqp_messenger_do_work() shall fail and return]
AzureIoTClient 56:8704100b3b54 874 LogError("Failed creating the message receiver (messagereceiver_create failed)");
AzureIoTClient 56:8704100b3b54 875 destroy_message_receiver(instance);
AzureIoTClient 56:8704100b3b54 876 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 877 }
AzureIoTClient 56:8704100b3b54 878 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_103: [`instance->message_receiver` shall be opened using messagereceiver_open() passing `on_message_received_internal_callback`]
AzureIoTClient 56:8704100b3b54 879 else if (messagereceiver_open(instance->message_receiver, on_message_received_internal_callback, (void*)instance) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 880 {
AzureIoTClient 56:8704100b3b54 881 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_104: [If messagereceiver_open() fails, amqp_messenger_do_work() shall fail and return]
AzureIoTClient 56:8704100b3b54 882 LogError("Failed opening the AMQP message receiver.");
AzureIoTClient 56:8704100b3b54 883 destroy_message_receiver(instance);
AzureIoTClient 56:8704100b3b54 884 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 885 }
AzureIoTClient 56:8704100b3b54 886 else
AzureIoTClient 56:8704100b3b54 887 {
AzureIoTClient 56:8704100b3b54 888 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 889 }
AzureIoTClient 39:e98d5df6dc74 890
AzureIoTClient 56:8704100b3b54 891 return result;
AzureIoTClient 39:e98d5df6dc74 892 }
AzureIoTClient 39:e98d5df6dc74 893
AzureIoTClient 56:8704100b3b54 894 static void on_send_complete_callback(void* context, MESSAGE_SEND_RESULT send_result, AMQP_VALUE delivery_state)
AzureIoTClient 39:e98d5df6dc74 895 {
AzureIoTClient 56:8704100b3b54 896 (void)delivery_state;
AzureIoTClient 56:8704100b3b54 897 if (context != NULL)
AzureIoTClient 56:8704100b3b54 898 {
AzureIoTClient 56:8704100b3b54 899 MESSAGE_QUEUE_RESULT mq_result;
AzureIoTClient 56:8704100b3b54 900 MESSAGE_SEND_CONTEXT* msg_ctx = (MESSAGE_SEND_CONTEXT*)context;
AzureIoTClient 39:e98d5df6dc74 901
AzureIoTClient 56:8704100b3b54 902 if (send_result == MESSAGE_SEND_OK)
AzureIoTClient 56:8704100b3b54 903 {
AzureIoTClient 56:8704100b3b54 904 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_121: [If no failure occurs, `context->on_process_message_completed_callback` shall be invoked with result MESSAGE_QUEUE_SUCCESS]
AzureIoTClient 56:8704100b3b54 905 mq_result = MESSAGE_QUEUE_SUCCESS;
AzureIoTClient 56:8704100b3b54 906 }
AzureIoTClient 56:8704100b3b54 907 else
AzureIoTClient 56:8704100b3b54 908 {
AzureIoTClient 56:8704100b3b54 909 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_122: [If a failure occurred, `context->on_process_message_completed_callback` shall be invoked with result MESSAGE_QUEUE_ERROR]
AzureIoTClient 56:8704100b3b54 910 mq_result = MESSAGE_QUEUE_ERROR;
AzureIoTClient 56:8704100b3b54 911 }
AzureIoTClient 39:e98d5df6dc74 912
AzureIoTClient 56:8704100b3b54 913 msg_ctx->on_process_message_completed_callback(msg_ctx->messenger->send_queue, (MQ_MESSAGE_HANDLE)msg_ctx->message, mq_result, NULL);
AzureIoTClient 56:8704100b3b54 914 }
AzureIoTClient 39:e98d5df6dc74 915 }
AzureIoTClient 39:e98d5df6dc74 916
AzureIoTClient 39:e98d5df6dc74 917 static void on_process_message_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback, void* context)
AzureIoTClient 39:e98d5df6dc74 918 {
AzureIoTClient 56:8704100b3b54 919 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_117: [If any argument is NULL, `on_process_message_callback` shall return immediatelly]
AzureIoTClient 56:8704100b3b54 920 if (message_queue == NULL || message == NULL || on_process_message_completed_callback == NULL || context == NULL)
AzureIoTClient 56:8704100b3b54 921 {
AzureIoTClient 56:8704100b3b54 922 LogError("Invalid argument (message_queue=%p, message=%p, on_process_message_completed_callback=%p, context=%p)", message_queue, message, on_process_message_completed_callback, context);
AzureIoTClient 56:8704100b3b54 923 }
AzureIoTClient 56:8704100b3b54 924 else
AzureIoTClient 56:8704100b3b54 925 {
AzureIoTClient 56:8704100b3b54 926 MESSAGE_SEND_CONTEXT* message_context = (MESSAGE_SEND_CONTEXT*)context;
AzureIoTClient 56:8704100b3b54 927 message_context->on_process_message_completed_callback = on_process_message_completed_callback;
AzureIoTClient 39:e98d5df6dc74 928
AzureIoTClient 56:8704100b3b54 929 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_118: [The MESSAGE_HANDLE shall be submitted for sending using messagesender_send(), passing `on_send_complete_callback`]
AzureIoTClient 56:8704100b3b54 930 if (messagesender_send_async(message_context->messenger->message_sender, (MESSAGE_HANDLE)message, on_send_complete_callback, context, 0) == NULL)
AzureIoTClient 56:8704100b3b54 931 {
AzureIoTClient 56:8704100b3b54 932 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_119: [If messagesender_send() fails, `on_process_message_completed_callback` shall be invoked with result MESSAGE_QUEUE_ERROR]
AzureIoTClient 56:8704100b3b54 933 LogError("Failed sending AMQP message");
AzureIoTClient 56:8704100b3b54 934 on_process_message_completed_callback(message_queue, message, MESSAGE_QUEUE_ERROR, NULL);
AzureIoTClient 56:8704100b3b54 935 }
AzureIoTClient 39:e98d5df6dc74 936
AzureIoTClient 56:8704100b3b54 937 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_120: [The MESSAGE_HANDLE shall be destroyed using message_destroy() and marked as destroyed in the context provided]
AzureIoTClient 56:8704100b3b54 938 message_destroy((MESSAGE_HANDLE)message);
AzureIoTClient 56:8704100b3b54 939 message_context->is_destroyed = true;
AzureIoTClient 56:8704100b3b54 940 }
AzureIoTClient 39:e98d5df6dc74 941 }
AzureIoTClient 39:e98d5df6dc74 942
AzureIoTClient 39:e98d5df6dc74 943 static void on_message_processing_completed_callback(MQ_MESSAGE_HANDLE message, MESSAGE_QUEUE_RESULT result, USER_DEFINED_REASON reason, void* message_context)
AzureIoTClient 39:e98d5df6dc74 944 {
AzureIoTClient 56:8704100b3b54 945 (void)reason;
AzureIoTClient 39:e98d5df6dc74 946
AzureIoTClient 56:8704100b3b54 947 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_020: [If `messenger_context` is NULL, `on_message_processing_completed_callback` shall return immediately]
AzureIoTClient 56:8704100b3b54 948 if (message_context == NULL)
AzureIoTClient 56:8704100b3b54 949 {
AzureIoTClient 56:8704100b3b54 950 LogError("on_message_processing_completed_callback invoked with NULL context");
AzureIoTClient 56:8704100b3b54 951 }
AzureIoTClient 56:8704100b3b54 952 else
AzureIoTClient 56:8704100b3b54 953 {
AzureIoTClient 56:8704100b3b54 954 MESSAGE_SEND_CONTEXT* msg_ctx = (MESSAGE_SEND_CONTEXT*)message_context;
AzureIoTClient 56:8704100b3b54 955 AMQP_MESSENGER_SEND_RESULT messenger_send_result;
AzureIoTClient 56:8704100b3b54 956 AMQP_MESSENGER_REASON messenger_send_reason;
AzureIoTClient 39:e98d5df6dc74 957
AzureIoTClient 56:8704100b3b54 958 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_021: [If `result` is MESSAGE_QUEUE_SUCCESS, `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_SUCCESS and AMQP_MESSENGER_REASON_NONE]
AzureIoTClient 56:8704100b3b54 959 if (result == MESSAGE_QUEUE_SUCCESS)
AzureIoTClient 56:8704100b3b54 960 {
AzureIoTClient 56:8704100b3b54 961 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_SUCCESS;
AzureIoTClient 56:8704100b3b54 962 messenger_send_reason = AMQP_MESSENGER_REASON_NONE;
AzureIoTClient 39:e98d5df6dc74 963
AzureIoTClient 56:8704100b3b54 964 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_025: [If `result` is MESSAGE_QUEUE_SUCCESS or MESSAGE_QUEUE_CANCELLED, `instance->send_error_count` shall be set to 0]
AzureIoTClient 56:8704100b3b54 965 msg_ctx->messenger->send_error_count = 0;
AzureIoTClient 56:8704100b3b54 966 }
AzureIoTClient 56:8704100b3b54 967 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_022: [If `result` is MESSAGE_QUEUE_TIMEOUT, `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_ERROR and AMQP_MESSENGER_REASON_TIMEOUT]
AzureIoTClient 56:8704100b3b54 968 else if (result == MESSAGE_QUEUE_TIMEOUT)
AzureIoTClient 56:8704100b3b54 969 {
AzureIoTClient 56:8704100b3b54 970 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_ERROR;
AzureIoTClient 56:8704100b3b54 971 messenger_send_reason = AMQP_MESSENGER_REASON_TIMEOUT;
AzureIoTClient 39:e98d5df6dc74 972
AzureIoTClient 56:8704100b3b54 973 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_026: [Otherwise `instance->send_error_count` shall be incremented by 1]
AzureIoTClient 56:8704100b3b54 974 msg_ctx->messenger->send_error_count++;
AzureIoTClient 56:8704100b3b54 975 }
AzureIoTClient 56:8704100b3b54 976 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_023: [If `result` is MESSAGE_QUEUE_CANCELLED and the messenger is STOPPED, `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_CANCELLED and AMQP_MESSENGER_REASON_MESSENGER_DESTROYED]
AzureIoTClient 56:8704100b3b54 977 else if (result == MESSAGE_QUEUE_CANCELLED && msg_ctx->messenger->state == AMQP_MESSENGER_STATE_STOPPED)
AzureIoTClient 56:8704100b3b54 978 {
AzureIoTClient 56:8704100b3b54 979 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_CANCELLED;
AzureIoTClient 56:8704100b3b54 980 messenger_send_reason = AMQP_MESSENGER_REASON_MESSENGER_DESTROYED;
AzureIoTClient 39:e98d5df6dc74 981
AzureIoTClient 56:8704100b3b54 982 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_025: [If `result` is MESSAGE_QUEUE_SUCCESS or MESSAGE_QUEUE_CANCELLED, `instance->send_error_count` shall be set to 0]
AzureIoTClient 56:8704100b3b54 983 msg_ctx->messenger->send_error_count = 0;
AzureIoTClient 56:8704100b3b54 984 }
AzureIoTClient 56:8704100b3b54 985 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_024: [Otherwise `send_ctx->on_send_complete_callback` shall be invoked with AMQP_MESSENGER_SEND_RESULT_ERROR and AMQP_MESSENGER_REASON_FAIL_SENDING]
AzureIoTClient 56:8704100b3b54 986 else
AzureIoTClient 56:8704100b3b54 987 {
AzureIoTClient 56:8704100b3b54 988 messenger_send_result = AMQP_MESSENGER_SEND_RESULT_ERROR;
AzureIoTClient 56:8704100b3b54 989 messenger_send_reason = AMQP_MESSENGER_REASON_FAIL_SENDING;
AzureIoTClient 39:e98d5df6dc74 990
AzureIoTClient 56:8704100b3b54 991 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_026: [Otherwise `instance->send_error_count` shall be incremented by 1]
AzureIoTClient 56:8704100b3b54 992 msg_ctx->messenger->send_error_count++;
AzureIoTClient 56:8704100b3b54 993 }
AzureIoTClient 39:e98d5df6dc74 994
AzureIoTClient 56:8704100b3b54 995 if (msg_ctx->on_send_complete_callback != NULL)
AzureIoTClient 56:8704100b3b54 996 {
AzureIoTClient 56:8704100b3b54 997 msg_ctx->on_send_complete_callback(messenger_send_result, messenger_send_reason, msg_ctx->user_context);
AzureIoTClient 56:8704100b3b54 998 }
AzureIoTClient 39:e98d5df6dc74 999
AzureIoTClient 56:8704100b3b54 1000 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_027: [If `message` has not been destroyed, it shall be destroyed using message_destroy()]
AzureIoTClient 56:8704100b3b54 1001 if (!msg_ctx->is_destroyed)
AzureIoTClient 56:8704100b3b54 1002 {
AzureIoTClient 56:8704100b3b54 1003 message_destroy((MESSAGE_HANDLE)message);
AzureIoTClient 56:8704100b3b54 1004 }
AzureIoTClient 39:e98d5df6dc74 1005
AzureIoTClient 56:8704100b3b54 1006 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_028: [`send_ctx` shall be destroyed]
AzureIoTClient 56:8704100b3b54 1007 destroy_message_send_context(msg_ctx);
AzureIoTClient 56:8704100b3b54 1008 }
AzureIoTClient 39:e98d5df6dc74 1009 }
AzureIoTClient 39:e98d5df6dc74 1010
AzureIoTClient 39:e98d5df6dc74 1011
AzureIoTClient 39:e98d5df6dc74 1012 // ---------- Set/Retrieve Options Helpers ----------//
AzureIoTClient 39:e98d5df6dc74 1013
AzureIoTClient 39:e98d5df6dc74 1014 static void* amqp_messenger_clone_option(const char* name, const void* value)
AzureIoTClient 39:e98d5df6dc74 1015 {
AzureIoTClient 56:8704100b3b54 1016 void* result;
AzureIoTClient 39:e98d5df6dc74 1017
AzureIoTClient 56:8704100b3b54 1018 if (name == NULL || value == NULL)
AzureIoTClient 56:8704100b3b54 1019 {
AzureIoTClient 56:8704100b3b54 1020 LogError("invalid argument (name=%p, value=%p)", name, value);
AzureIoTClient 56:8704100b3b54 1021 result = NULL;
AzureIoTClient 56:8704100b3b54 1022 }
AzureIoTClient 56:8704100b3b54 1023 else
AzureIoTClient 56:8704100b3b54 1024 {
AzureIoTClient 56:8704100b3b54 1025 if (strcmp(MESSENGER_SAVED_MQ_OPTIONS, name) == 0)
AzureIoTClient 56:8704100b3b54 1026 {
AzureIoTClient 56:8704100b3b54 1027 if ((result = (void*)OptionHandler_Clone((OPTIONHANDLER_HANDLE)value)) == NULL)
AzureIoTClient 56:8704100b3b54 1028 {
AzureIoTClient 56:8704100b3b54 1029 LogError("failed cloning option '%s'", name);
AzureIoTClient 56:8704100b3b54 1030 }
AzureIoTClient 56:8704100b3b54 1031 }
AzureIoTClient 56:8704100b3b54 1032 else
AzureIoTClient 56:8704100b3b54 1033 {
AzureIoTClient 56:8704100b3b54 1034 LogError("Failed to clone messenger option (option with name '%s' is not suppported)", name);
AzureIoTClient 56:8704100b3b54 1035 result = NULL;
AzureIoTClient 56:8704100b3b54 1036 }
AzureIoTClient 56:8704100b3b54 1037 }
AzureIoTClient 39:e98d5df6dc74 1038
AzureIoTClient 56:8704100b3b54 1039 return result;
AzureIoTClient 39:e98d5df6dc74 1040 }
AzureIoTClient 39:e98d5df6dc74 1041
AzureIoTClient 39:e98d5df6dc74 1042 static void amqp_messenger_destroy_option(const char* name, const void* value)
AzureIoTClient 39:e98d5df6dc74 1043 {
AzureIoTClient 56:8704100b3b54 1044 if (name == NULL || value == NULL)
AzureIoTClient 56:8704100b3b54 1045 {
AzureIoTClient 56:8704100b3b54 1046 LogError("invalid argument (name=%p, value=%p)", name, value);
AzureIoTClient 56:8704100b3b54 1047 }
AzureIoTClient 56:8704100b3b54 1048 else if (strcmp(MESSENGER_SAVED_MQ_OPTIONS, name) == 0)
AzureIoTClient 56:8704100b3b54 1049 {
AzureIoTClient 56:8704100b3b54 1050 OptionHandler_Destroy((OPTIONHANDLER_HANDLE)value);
AzureIoTClient 56:8704100b3b54 1051 }
AzureIoTClient 56:8704100b3b54 1052 else
AzureIoTClient 56:8704100b3b54 1053 {
AzureIoTClient 56:8704100b3b54 1054 LogError("invalid argument (option '%s' is not suppported)", name);
AzureIoTClient 56:8704100b3b54 1055 }
AzureIoTClient 39:e98d5df6dc74 1056 }
AzureIoTClient 39:e98d5df6dc74 1057
AzureIoTClient 39:e98d5df6dc74 1058
AzureIoTClient 39:e98d5df6dc74 1059 // Public API:
AzureIoTClient 39:e98d5df6dc74 1060
AzureIoTClient 39:e98d5df6dc74 1061 int amqp_messenger_subscribe_for_messages(AMQP_MESSENGER_HANDLE messenger_handle, ON_AMQP_MESSENGER_MESSAGE_RECEIVED on_message_received_callback, void* context)
AzureIoTClient 39:e98d5df6dc74 1062 {
AzureIoTClient 56:8704100b3b54 1063 int result;
AzureIoTClient 56:8704100b3b54 1064
AzureIoTClient 56:8704100b3b54 1065 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_035: [If `messenger_handle` or `on_message_received_callback` are NULL, amqp_messenger_subscribe_for_messages() shall fail and return non-zero value]
AzureIoTClient 56:8704100b3b54 1066 if (messenger_handle == NULL || on_message_received_callback == NULL || context == NULL)
AzureIoTClient 56:8704100b3b54 1067 {
AzureIoTClient 56:8704100b3b54 1068 LogError("Invalid argument (messenger_handle=%p, on_message_received_callback=%p, context=%p)", messenger_handle, on_message_received_callback, context);
AzureIoTClient 56:8704100b3b54 1069 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1070 }
AzureIoTClient 56:8704100b3b54 1071 else
AzureIoTClient 56:8704100b3b54 1072 {
AzureIoTClient 56:8704100b3b54 1073 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1074
AzureIoTClient 56:8704100b3b54 1075 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_036: [`on_message_received_callback` and `context` shall be saved on `instance->on_message_received_callback`]
AzureIoTClient 56:8704100b3b54 1076 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_037: [amqp_messenger_subscribe_for_messages() shall set `instance->receive_messages` to true]
AzureIoTClient 56:8704100b3b54 1077 instance->on_message_received_callback = on_message_received_callback;
AzureIoTClient 56:8704100b3b54 1078 instance->on_message_received_context = context;
AzureIoTClient 56:8704100b3b54 1079 instance->receive_messages = true;
AzureIoTClient 39:e98d5df6dc74 1080
AzureIoTClient 56:8704100b3b54 1081 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_038: [If no failures occurr, amqp_messenger_subscribe_for_messages() shall return 0]
AzureIoTClient 56:8704100b3b54 1082 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 1083 }
AzureIoTClient 39:e98d5df6dc74 1084
AzureIoTClient 56:8704100b3b54 1085 return result;
AzureIoTClient 39:e98d5df6dc74 1086 }
AzureIoTClient 39:e98d5df6dc74 1087
AzureIoTClient 39:e98d5df6dc74 1088 int amqp_messenger_unsubscribe_for_messages(AMQP_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 39:e98d5df6dc74 1089 {
AzureIoTClient 56:8704100b3b54 1090 int result;
AzureIoTClient 39:e98d5df6dc74 1091
AzureIoTClient 56:8704100b3b54 1092 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_039: [If `messenger_handle` is NULL, amqp_messenger_unsubscribe_for_messages() shall fail and return non-zero value]
AzureIoTClient 56:8704100b3b54 1093 if (messenger_handle == NULL)
AzureIoTClient 56:8704100b3b54 1094 {
AzureIoTClient 56:8704100b3b54 1095 LogError("Invalid argument (messenger_handle is NULL)");
AzureIoTClient 56:8704100b3b54 1096 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1097 }
AzureIoTClient 56:8704100b3b54 1098 else
AzureIoTClient 56:8704100b3b54 1099 {
AzureIoTClient 56:8704100b3b54 1100 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1101
AzureIoTClient 56:8704100b3b54 1102 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_040: [`instance->receive_messages` shall be saved to false]
AzureIoTClient 56:8704100b3b54 1103 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_041: [`instance->on_message_received_callback` and `instance->on_message_received_context` shall be set to NULL]
AzureIoTClient 56:8704100b3b54 1104 instance->receive_messages = false;
AzureIoTClient 56:8704100b3b54 1105 instance->on_message_received_callback = NULL;
AzureIoTClient 56:8704100b3b54 1106 instance->on_message_received_context = NULL;
AzureIoTClient 39:e98d5df6dc74 1107
AzureIoTClient 56:8704100b3b54 1108 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_042: [If no failures occurr, amqp_messenger_unsubscribe_for_messages() shall return 0]
AzureIoTClient 56:8704100b3b54 1109 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 1110 }
AzureIoTClient 39:e98d5df6dc74 1111
AzureIoTClient 56:8704100b3b54 1112 return result;
AzureIoTClient 39:e98d5df6dc74 1113 }
AzureIoTClient 39:e98d5df6dc74 1114
AzureIoTClient 39:e98d5df6dc74 1115 int amqp_messenger_send_message_disposition(AMQP_MESSENGER_HANDLE messenger_handle, AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info, AMQP_MESSENGER_DISPOSITION_RESULT disposition_result)
AzureIoTClient 39:e98d5df6dc74 1116 {
AzureIoTClient 56:8704100b3b54 1117 int result;
AzureIoTClient 39:e98d5df6dc74 1118
AzureIoTClient 56:8704100b3b54 1119 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_043: [If `messenger_handle` or `disposition_info` are NULL, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
AzureIoTClient 56:8704100b3b54 1120 if (messenger_handle == NULL || disposition_info == NULL)
AzureIoTClient 56:8704100b3b54 1121 {
AzureIoTClient 56:8704100b3b54 1122 LogError("Invalid argument (messenger_handle=%p, disposition_info=%p)", messenger_handle, disposition_info);
AzureIoTClient 56:8704100b3b54 1123 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1124 }
AzureIoTClient 56:8704100b3b54 1125 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_044: [If `disposition_info->source` is NULL, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
AzureIoTClient 56:8704100b3b54 1126 else if (disposition_info->source == NULL)
AzureIoTClient 56:8704100b3b54 1127 {
AzureIoTClient 56:8704100b3b54 1128 LogError("Failed sending message disposition (disposition_info->source is NULL)");
AzureIoTClient 56:8704100b3b54 1129 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1130 }
AzureIoTClient 56:8704100b3b54 1131 else
AzureIoTClient 56:8704100b3b54 1132 {
AzureIoTClient 56:8704100b3b54 1133 AMQP_MESSENGER_INSTANCE* messenger = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1134
AzureIoTClient 56:8704100b3b54 1135 if (messenger->message_receiver == NULL)
AzureIoTClient 56:8704100b3b54 1136 {
AzureIoTClient 56:8704100b3b54 1137 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_045: [If `messenger_handle->message_receiver` is NULL, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
AzureIoTClient 56:8704100b3b54 1138 LogError("Failed sending message disposition (message_receiver is not created; check if it is subscribed)");
AzureIoTClient 56:8704100b3b54 1139 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1140 }
AzureIoTClient 56:8704100b3b54 1141 else
AzureIoTClient 56:8704100b3b54 1142 {
AzureIoTClient 56:8704100b3b54 1143 AMQP_VALUE uamqp_disposition_result;
AzureIoTClient 39:e98d5df6dc74 1144
AzureIoTClient 56:8704100b3b54 1145 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_046: [An AMQP_VALUE disposition result shall be created corresponding to the `disposition_result` provided]
AzureIoTClient 56:8704100b3b54 1146 if ((uamqp_disposition_result = create_uamqp_disposition_result_from(disposition_result)) == NULL)
AzureIoTClient 56:8704100b3b54 1147 {
AzureIoTClient 56:8704100b3b54 1148 LogError("Failed sending message disposition (disposition result %d is not supported)", disposition_result);
AzureIoTClient 56:8704100b3b54 1149 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1150 }
AzureIoTClient 56:8704100b3b54 1151 else
AzureIoTClient 56:8704100b3b54 1152 {
AzureIoTClient 56:8704100b3b54 1153 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_047: [`messagereceiver_send_message_disposition()` shall be invoked passing `disposition_info->source`, `disposition_info->message_id` and the AMQP_VALUE disposition result]
AzureIoTClient 56:8704100b3b54 1154 if (messagereceiver_send_message_disposition(messenger->message_receiver, disposition_info->source, disposition_info->message_id, uamqp_disposition_result) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 1155 {
AzureIoTClient 56:8704100b3b54 1156 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_048: [If `messagereceiver_send_message_disposition()` fails, amqp_messenger_send_message_disposition() shall fail and return non-zero value]
AzureIoTClient 56:8704100b3b54 1157 LogError("Failed sending message disposition (messagereceiver_send_message_disposition failed)");
AzureIoTClient 56:8704100b3b54 1158 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1159 }
AzureIoTClient 56:8704100b3b54 1160 else
AzureIoTClient 56:8704100b3b54 1161 {
AzureIoTClient 56:8704100b3b54 1162 destroy_message_disposition_info(disposition_info);
AzureIoTClient 39:e98d5df6dc74 1163
AzureIoTClient 56:8704100b3b54 1164 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_050: [If no failures occurr, amqp_messenger_send_message_disposition() shall return 0]
AzureIoTClient 56:8704100b3b54 1165 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 1166 }
AzureIoTClient 39:e98d5df6dc74 1167
AzureIoTClient 56:8704100b3b54 1168 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_049: [amqp_messenger_send_message_disposition() shall destroy the AMQP_VALUE disposition result]
AzureIoTClient 56:8704100b3b54 1169 amqpvalue_destroy(uamqp_disposition_result);
AzureIoTClient 56:8704100b3b54 1170 }
AzureIoTClient 56:8704100b3b54 1171 }
AzureIoTClient 56:8704100b3b54 1172 }
AzureIoTClient 39:e98d5df6dc74 1173
AzureIoTClient 56:8704100b3b54 1174 return result;
AzureIoTClient 39:e98d5df6dc74 1175 }
AzureIoTClient 39:e98d5df6dc74 1176
AzureIoTClient 39:e98d5df6dc74 1177 int amqp_messenger_send_async(AMQP_MESSENGER_HANDLE messenger_handle, MESSAGE_HANDLE message, AMQP_MESSENGER_SEND_COMPLETE_CALLBACK on_user_defined_send_complete_callback, void* user_context)
AzureIoTClient 39:e98d5df6dc74 1178 {
AzureIoTClient 56:8704100b3b54 1179 int result;
AzureIoTClient 39:e98d5df6dc74 1180
AzureIoTClient 56:8704100b3b54 1181 if (messenger_handle == NULL || message == NULL || on_user_defined_send_complete_callback == NULL)
AzureIoTClient 56:8704100b3b54 1182 {
AzureIoTClient 56:8704100b3b54 1183 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_010: [If `messenger_handle`, `message` or `on_event_send_complete_callback` are NULL, amqp_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 56:8704100b3b54 1184 LogError("Invalid argument (messenger_handle=%p, message=%p, on_user_defined_send_complete_callback=%p)", messenger_handle, message, on_user_defined_send_complete_callback);
AzureIoTClient 56:8704100b3b54 1185 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1186 }
AzureIoTClient 56:8704100b3b54 1187 else
AzureIoTClient 56:8704100b3b54 1188 {
AzureIoTClient 56:8704100b3b54 1189 MESSAGE_HANDLE cloned_message;
AzureIoTClient 39:e98d5df6dc74 1190
AzureIoTClient 56:8704100b3b54 1191 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_011: [`message` shall be cloned using message_clone()]
AzureIoTClient 56:8704100b3b54 1192 if ((cloned_message = message_clone(message)) == NULL)
AzureIoTClient 56:8704100b3b54 1193 {
AzureIoTClient 56:8704100b3b54 1194 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_012: [If message_clone() fails, amqp_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 56:8704100b3b54 1195 LogError("Failed cloning AMQP message");
AzureIoTClient 56:8704100b3b54 1196 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1197 }
AzureIoTClient 56:8704100b3b54 1198 else
AzureIoTClient 56:8704100b3b54 1199 {
AzureIoTClient 56:8704100b3b54 1200 MESSAGE_SEND_CONTEXT* message_context;
AzureIoTClient 56:8704100b3b54 1201 AMQP_MESSENGER_INSTANCE *instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1202
AzureIoTClient 56:8704100b3b54 1203 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_013: [amqp_messenger_send_async() shall allocate memory for a MESSAGE_SEND_CONTEXT structure (aka `send_ctx`)]
AzureIoTClient 56:8704100b3b54 1204 if ((message_context = create_message_send_context()) == NULL)
AzureIoTClient 56:8704100b3b54 1205 {
AzureIoTClient 56:8704100b3b54 1206 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_014: [If malloc() fails, amqp_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 56:8704100b3b54 1207 LogError("Failed creating context for sending message");
AzureIoTClient 56:8704100b3b54 1208 message_destroy(cloned_message);
AzureIoTClient 56:8704100b3b54 1209 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1210 }
AzureIoTClient 56:8704100b3b54 1211 else
AzureIoTClient 56:8704100b3b54 1212 {
AzureIoTClient 56:8704100b3b54 1213 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_015: [The cloned `message`, callback and context shall be saved in ``send_ctx`]
AzureIoTClient 56:8704100b3b54 1214 message_context->message = cloned_message;
AzureIoTClient 56:8704100b3b54 1215 message_context->messenger = instance;
AzureIoTClient 56:8704100b3b54 1216 message_context->on_send_complete_callback = on_user_defined_send_complete_callback;
AzureIoTClient 56:8704100b3b54 1217 message_context->user_context = user_context;
AzureIoTClient 39:e98d5df6dc74 1218
AzureIoTClient 56:8704100b3b54 1219 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_016: [`send_ctx` shall be added to `instance->send_queue` using message_queue_add(), passing `on_message_processing_completed_callback`]
AzureIoTClient 56:8704100b3b54 1220 if (message_queue_add(instance->send_queue, (MQ_MESSAGE_HANDLE)cloned_message, on_message_processing_completed_callback, (void*)message_context) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 1221 {
AzureIoTClient 56:8704100b3b54 1222 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_017: [If message_queue_add() fails, amqp_messenger_send_async() shall fail and return a non-zero value]
AzureIoTClient 56:8704100b3b54 1223 LogError("Failed adding message to send queue");
AzureIoTClient 56:8704100b3b54 1224 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_018: [If any failure occurs, amqp_messenger_send_async() shall free any memory it has allocated]
AzureIoTClient 56:8704100b3b54 1225 destroy_message_send_context(message_context);
AzureIoTClient 56:8704100b3b54 1226 message_destroy(cloned_message);
AzureIoTClient 56:8704100b3b54 1227 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1228 }
AzureIoTClient 56:8704100b3b54 1229 else
AzureIoTClient 56:8704100b3b54 1230 {
AzureIoTClient 56:8704100b3b54 1231 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_019: [If no failures occur, amqp_messenger_send_async() shall return zero]
AzureIoTClient 56:8704100b3b54 1232 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 1233 }
AzureIoTClient 56:8704100b3b54 1234 }
AzureIoTClient 56:8704100b3b54 1235 }
AzureIoTClient 56:8704100b3b54 1236 }
AzureIoTClient 39:e98d5df6dc74 1237
AzureIoTClient 56:8704100b3b54 1238 return result;
AzureIoTClient 39:e98d5df6dc74 1239 }
AzureIoTClient 39:e98d5df6dc74 1240
AzureIoTClient 39:e98d5df6dc74 1241 int amqp_messenger_get_send_status(AMQP_MESSENGER_HANDLE messenger_handle, AMQP_MESSENGER_SEND_STATUS* send_status)
AzureIoTClient 39:e98d5df6dc74 1242 {
AzureIoTClient 56:8704100b3b54 1243 int result;
AzureIoTClient 39:e98d5df6dc74 1244
AzureIoTClient 56:8704100b3b54 1245 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_029: [If `messenger_handle` or `send_status` are NULL, amqp_messenger_get_send_status() shall fail and return a non-zero value]
AzureIoTClient 56:8704100b3b54 1246 if (messenger_handle == NULL || send_status == NULL)
AzureIoTClient 56:8704100b3b54 1247 {
AzureIoTClient 56:8704100b3b54 1248 LogError("Invalid argument (messenger_handle=%p, send_status=%p)", messenger_handle, send_status);
AzureIoTClient 56:8704100b3b54 1249 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1250 }
AzureIoTClient 56:8704100b3b54 1251 else
AzureIoTClient 56:8704100b3b54 1252 {
AzureIoTClient 56:8704100b3b54 1253 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 56:8704100b3b54 1254 bool is_empty;
AzureIoTClient 39:e98d5df6dc74 1255
AzureIoTClient 56:8704100b3b54 1256 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_030: [message_queue_is_empty() shall be invoked for `instance->send_queue`]
AzureIoTClient 56:8704100b3b54 1257 if (message_queue_is_empty(instance->send_queue, &is_empty) != 0)
AzureIoTClient 56:8704100b3b54 1258 {
AzureIoTClient 56:8704100b3b54 1259 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_031: [If message_queue_is_empty() fails, amqp_messenger_get_send_status() shall fail and return a non-zero value]
AzureIoTClient 56:8704100b3b54 1260 LogError("Failed verifying if send queue is empty");
AzureIoTClient 56:8704100b3b54 1261 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1262 }
AzureIoTClient 56:8704100b3b54 1263 else
AzureIoTClient 56:8704100b3b54 1264 {
AzureIoTClient 56:8704100b3b54 1265 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_032: [If message_queue_is_empty() returns `true`, `send_status` shall be set to AMQP_MESSENGER_SEND_STATUS_IDLE]
AzureIoTClient 56:8704100b3b54 1266 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_033: [Otherwise, `send_status` shall be set to AMQP_MESSENGER_SEND_STATUS_BUSY]
AzureIoTClient 56:8704100b3b54 1267 *send_status = (is_empty ? AMQP_MESSENGER_SEND_STATUS_IDLE : AMQP_MESSENGER_SEND_STATUS_BUSY);
AzureIoTClient 56:8704100b3b54 1268 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_034: [If no failures occur, amqp_messenger_get_send_status() shall return 0]
AzureIoTClient 56:8704100b3b54 1269 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 1270 }
AzureIoTClient 56:8704100b3b54 1271 }
AzureIoTClient 39:e98d5df6dc74 1272
AzureIoTClient 56:8704100b3b54 1273 return result;
AzureIoTClient 39:e98d5df6dc74 1274 }
AzureIoTClient 39:e98d5df6dc74 1275
AzureIoTClient 39:e98d5df6dc74 1276 int amqp_messenger_start(AMQP_MESSENGER_HANDLE messenger_handle, SESSION_HANDLE session_handle)
AzureIoTClient 39:e98d5df6dc74 1277 {
AzureIoTClient 56:8704100b3b54 1278 int result;
AzureIoTClient 39:e98d5df6dc74 1279
AzureIoTClient 56:8704100b3b54 1280 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_051: [If `messenger_handle` or `session_handle` are NULL, amqp_messenger_start() shall fail and return non-zero value]
AzureIoTClient 56:8704100b3b54 1281 if (messenger_handle == NULL || session_handle == NULL)
AzureIoTClient 56:8704100b3b54 1282 {
AzureIoTClient 56:8704100b3b54 1283 LogError("Invalid argument (session_handle is NULL)");
AzureIoTClient 56:8704100b3b54 1284 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1285 }
AzureIoTClient 56:8704100b3b54 1286 else
AzureIoTClient 56:8704100b3b54 1287 {
AzureIoTClient 56:8704100b3b54 1288 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1289
AzureIoTClient 56:8704100b3b54 1290 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_052: [If `instance->state` is not AMQP_MESSENGER_STATE_STOPPED, amqp_messenger_start() shall fail and return non-zero value]
AzureIoTClient 56:8704100b3b54 1291 if (instance->state != AMQP_MESSENGER_STATE_STOPPED)
AzureIoTClient 56:8704100b3b54 1292 {
AzureIoTClient 56:8704100b3b54 1293 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1294 LogError("amqp_messenger_start failed (current state is %d; expected AMQP_MESSENGER_STATE_STOPPED)", instance->state);
AzureIoTClient 56:8704100b3b54 1295 }
AzureIoTClient 56:8704100b3b54 1296 else
AzureIoTClient 56:8704100b3b54 1297 {
AzureIoTClient 56:8704100b3b54 1298 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_053: [`session_handle` shall be saved on `instance->session_handle`]
AzureIoTClient 56:8704100b3b54 1299 instance->session_handle = session_handle;
AzureIoTClient 39:e98d5df6dc74 1300
AzureIoTClient 56:8704100b3b54 1301 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_054: [If no failures occurr, `instance->state` shall be set to AMQP_MESSENGER_STATE_STARTING, and `instance->on_state_changed_callback` invoked if provided]
AzureIoTClient 56:8704100b3b54 1302 update_messenger_state(instance, AMQP_MESSENGER_STATE_STARTING);
AzureIoTClient 39:e98d5df6dc74 1303
AzureIoTClient 56:8704100b3b54 1304 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_055: [If no failures occurr, amqp_messenger_start() shall return 0]
AzureIoTClient 56:8704100b3b54 1305 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 1306 }
AzureIoTClient 56:8704100b3b54 1307 }
AzureIoTClient 39:e98d5df6dc74 1308
AzureIoTClient 56:8704100b3b54 1309 return result;
AzureIoTClient 39:e98d5df6dc74 1310 }
AzureIoTClient 39:e98d5df6dc74 1311
AzureIoTClient 39:e98d5df6dc74 1312 int amqp_messenger_stop(AMQP_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 39:e98d5df6dc74 1313 {
AzureIoTClient 56:8704100b3b54 1314 int result;
AzureIoTClient 39:e98d5df6dc74 1315
AzureIoTClient 56:8704100b3b54 1316 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_056: [If `messenger_handle` is NULL, amqp_messenger_stop() shall fail and return non-zero value]
AzureIoTClient 56:8704100b3b54 1317 if (messenger_handle == NULL)
AzureIoTClient 56:8704100b3b54 1318 {
AzureIoTClient 56:8704100b3b54 1319 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1320 LogError("Invalid argument (messenger_handle is NULL)");
AzureIoTClient 56:8704100b3b54 1321 }
AzureIoTClient 56:8704100b3b54 1322 else
AzureIoTClient 56:8704100b3b54 1323 {
AzureIoTClient 56:8704100b3b54 1324 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1325
AzureIoTClient 56:8704100b3b54 1326 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_057: [If `instance->state` is AMQP_MESSENGER_STATE_STOPPED, amqp_messenger_stop() shall fail and return non-zero value]
AzureIoTClient 56:8704100b3b54 1327 if (instance->state == AMQP_MESSENGER_STATE_STOPPED)
AzureIoTClient 56:8704100b3b54 1328 {
AzureIoTClient 56:8704100b3b54 1329 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1330 LogError("amqp_messenger_stop failed (messenger is already stopped)");
AzureIoTClient 56:8704100b3b54 1331 }
AzureIoTClient 56:8704100b3b54 1332 else
AzureIoTClient 56:8704100b3b54 1333 {
AzureIoTClient 56:8704100b3b54 1334 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_058: [`instance->state` shall be set to AMQP_MESSENGER_STATE_STOPPING, and `instance->on_state_changed_callback` invoked if provided]
AzureIoTClient 56:8704100b3b54 1335 update_messenger_state(instance, AMQP_MESSENGER_STATE_STOPPING);
AzureIoTClient 39:e98d5df6dc74 1336
AzureIoTClient 56:8704100b3b54 1337 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_059: [`instance->message_sender` and `instance->message_receiver` shall be destroyed along with all its links]
AzureIoTClient 56:8704100b3b54 1338 destroy_message_sender(instance);
AzureIoTClient 56:8704100b3b54 1339 destroy_message_receiver(instance);
AzureIoTClient 39:e98d5df6dc74 1340
AzureIoTClient 56:8704100b3b54 1341 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_060: [`instance->send_queue` items shall be rolled back using message_queue_move_all_back_to_pending()]
AzureIoTClient 56:8704100b3b54 1342 if (message_queue_move_all_back_to_pending(instance->send_queue) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 1343 {
AzureIoTClient 56:8704100b3b54 1344 LogError("Messenger failed to move events in progress back to wait_to_send list");
AzureIoTClient 39:e98d5df6dc74 1345
AzureIoTClient 56:8704100b3b54 1346 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_061: [If message_queue_move_all_back_to_pending() fails, amqp_messenger_stop() shall change the messenger state to AMQP_MESSENGER_STATE_ERROR and return a non-zero value]
AzureIoTClient 56:8704100b3b54 1347 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 56:8704100b3b54 1348 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1349 }
AzureIoTClient 56:8704100b3b54 1350 else
AzureIoTClient 56:8704100b3b54 1351 {
AzureIoTClient 56:8704100b3b54 1352 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_062: [`instance->state` shall be set to AMQP_MESSENGER_STATE_STOPPED, and `instance->on_state_changed_callback` invoked if provided]
AzureIoTClient 56:8704100b3b54 1353 update_messenger_state(instance, AMQP_MESSENGER_STATE_STOPPED);
AzureIoTClient 56:8704100b3b54 1354 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_063: [If no failures occurr, amqp_messenger_stop() shall return 0]
AzureIoTClient 56:8704100b3b54 1355 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 1356 }
AzureIoTClient 56:8704100b3b54 1357 }
AzureIoTClient 56:8704100b3b54 1358 }
AzureIoTClient 39:e98d5df6dc74 1359
AzureIoTClient 56:8704100b3b54 1360 return result;
AzureIoTClient 39:e98d5df6dc74 1361 }
AzureIoTClient 39:e98d5df6dc74 1362
AzureIoTClient 39:e98d5df6dc74 1363 // @brief
AzureIoTClient 39:e98d5df6dc74 1364 // Sets the messenger module state based on the state changes from messagesender and messagereceiver
AzureIoTClient 39:e98d5df6dc74 1365 static void process_state_changes(AMQP_MESSENGER_INSTANCE* instance)
AzureIoTClient 39:e98d5df6dc74 1366 {
AzureIoTClient 56:8704100b3b54 1367 // Note: messagesender and messagereceiver are still not created or already destroyed
AzureIoTClient 56:8704100b3b54 1368 // when state is AMQP_MESSENGER_STATE_STOPPED, so no checking is needed there.
AzureIoTClient 39:e98d5df6dc74 1369
AzureIoTClient 56:8704100b3b54 1370 if (instance->state == AMQP_MESSENGER_STATE_STARTED)
AzureIoTClient 56:8704100b3b54 1371 {
AzureIoTClient 56:8704100b3b54 1372 if (instance->message_sender_current_state != MESSAGE_SENDER_STATE_OPEN)
AzureIoTClient 56:8704100b3b54 1373 {
AzureIoTClient 56:8704100b3b54 1374 LogError("messagesender reported unexpected state %d while messenger was started", instance->message_sender_current_state);
AzureIoTClient 56:8704100b3b54 1375 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 56:8704100b3b54 1376 }
AzureIoTClient 56:8704100b3b54 1377 else if (instance->message_receiver != NULL && instance->message_receiver_current_state != MESSAGE_RECEIVER_STATE_OPEN)
AzureIoTClient 56:8704100b3b54 1378 {
AzureIoTClient 56:8704100b3b54 1379 if (instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_OPENING)
AzureIoTClient 56:8704100b3b54 1380 {
AzureIoTClient 56:8704100b3b54 1381 bool is_timed_out;
AzureIoTClient 56:8704100b3b54 1382 if (is_timeout_reached(instance->last_message_receiver_state_change_time, MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS, &is_timed_out) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 1383 {
AzureIoTClient 56:8704100b3b54 1384 LogError("messenger got an error (failed to verify messagereceiver start timeout)");
AzureIoTClient 56:8704100b3b54 1385 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 56:8704100b3b54 1386 }
AzureIoTClient 56:8704100b3b54 1387 else if (is_timed_out)
AzureIoTClient 56:8704100b3b54 1388 {
AzureIoTClient 56:8704100b3b54 1389 LogError("messenger got an error (messagereceiver failed to start within expected timeout (%d secs))", MAX_MESSAGE_RECEIVER_STATE_CHANGE_TIMEOUT_SECS);
AzureIoTClient 56:8704100b3b54 1390 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 56:8704100b3b54 1391 }
AzureIoTClient 56:8704100b3b54 1392 }
AzureIoTClient 56:8704100b3b54 1393 else if (instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_ERROR ||
AzureIoTClient 56:8704100b3b54 1394 instance->message_receiver_current_state == MESSAGE_RECEIVER_STATE_IDLE)
AzureIoTClient 56:8704100b3b54 1395 {
AzureIoTClient 56:8704100b3b54 1396 LogError("messagereceiver reported unexpected state %d while messenger is starting", instance->message_receiver_current_state);
AzureIoTClient 56:8704100b3b54 1397 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 56:8704100b3b54 1398 }
AzureIoTClient 56:8704100b3b54 1399 }
AzureIoTClient 56:8704100b3b54 1400 }
AzureIoTClient 56:8704100b3b54 1401 else
AzureIoTClient 56:8704100b3b54 1402 {
AzureIoTClient 56:8704100b3b54 1403 if (instance->state == AMQP_MESSENGER_STATE_STARTING)
AzureIoTClient 56:8704100b3b54 1404 {
AzureIoTClient 56:8704100b3b54 1405 if (instance->message_sender_current_state == MESSAGE_SENDER_STATE_OPEN)
AzureIoTClient 56:8704100b3b54 1406 {
AzureIoTClient 56:8704100b3b54 1407 update_messenger_state(instance, AMQP_MESSENGER_STATE_STARTED);
AzureIoTClient 56:8704100b3b54 1408 }
AzureIoTClient 56:8704100b3b54 1409 else if (instance->message_sender_current_state == MESSAGE_SENDER_STATE_OPENING)
AzureIoTClient 56:8704100b3b54 1410 {
AzureIoTClient 56:8704100b3b54 1411 bool is_timed_out;
AzureIoTClient 56:8704100b3b54 1412 if (is_timeout_reached(instance->last_message_sender_state_change_time, MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS, &is_timed_out) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 1413 {
AzureIoTClient 56:8704100b3b54 1414 LogError("messenger failed to start (failed to verify messagesender start timeout)");
AzureIoTClient 56:8704100b3b54 1415 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 56:8704100b3b54 1416 }
AzureIoTClient 56:8704100b3b54 1417 else if (is_timed_out)
AzureIoTClient 56:8704100b3b54 1418 {
AzureIoTClient 56:8704100b3b54 1419 LogError("messenger failed to start (messagesender failed to start within expected timeout (%d secs))", MAX_MESSAGE_SENDER_STATE_CHANGE_TIMEOUT_SECS);
AzureIoTClient 56:8704100b3b54 1420 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 56:8704100b3b54 1421 }
AzureIoTClient 56:8704100b3b54 1422 }
AzureIoTClient 56:8704100b3b54 1423 // For this module, the only valid scenario where messagesender state is IDLE is if
AzureIoTClient 56:8704100b3b54 1424 // the messagesender hasn't been created yet or already destroyed.
AzureIoTClient 56:8704100b3b54 1425 else if ((instance->message_sender_current_state == MESSAGE_SENDER_STATE_ERROR) ||
AzureIoTClient 56:8704100b3b54 1426 (instance->message_sender_current_state == MESSAGE_SENDER_STATE_CLOSING) ||
AzureIoTClient 56:8704100b3b54 1427 (instance->message_sender_current_state == MESSAGE_SENDER_STATE_IDLE && instance->message_sender != NULL))
AzureIoTClient 56:8704100b3b54 1428 {
AzureIoTClient 56:8704100b3b54 1429 LogError("messagesender reported unexpected state %d while messenger is starting", instance->message_sender_current_state);
AzureIoTClient 56:8704100b3b54 1430 update_messenger_state(instance, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 56:8704100b3b54 1431 }
AzureIoTClient 56:8704100b3b54 1432 }
AzureIoTClient 56:8704100b3b54 1433 // message sender and receiver are stopped/destroyed synchronously, so no need for state control.
AzureIoTClient 56:8704100b3b54 1434 }
AzureIoTClient 39:e98d5df6dc74 1435 }
AzureIoTClient 39:e98d5df6dc74 1436
AzureIoTClient 39:e98d5df6dc74 1437 static void manage_amqp_messengers(AMQP_MESSENGER_INSTANCE* msgr)
AzureIoTClient 39:e98d5df6dc74 1438 {
AzureIoTClient 56:8704100b3b54 1439 if (msgr->state == AMQP_MESSENGER_STATE_STARTING)
AzureIoTClient 56:8704100b3b54 1440 {
AzureIoTClient 56:8704100b3b54 1441 if (msgr->message_sender == NULL)
AzureIoTClient 56:8704100b3b54 1442 {
AzureIoTClient 56:8704100b3b54 1443 if (create_message_sender(msgr) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 1444 {
AzureIoTClient 56:8704100b3b54 1445 update_messenger_state(msgr, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 56:8704100b3b54 1446 }
AzureIoTClient 56:8704100b3b54 1447 }
AzureIoTClient 56:8704100b3b54 1448 }
AzureIoTClient 56:8704100b3b54 1449 else if (msgr->state == AMQP_MESSENGER_STATE_STARTED)
AzureIoTClient 56:8704100b3b54 1450 {
AzureIoTClient 56:8704100b3b54 1451 if (msgr->receive_messages == true &&
AzureIoTClient 56:8704100b3b54 1452 msgr->message_receiver == NULL &&
AzureIoTClient 56:8704100b3b54 1453 create_message_receiver(msgr) != RESULT_OK)
AzureIoTClient 56:8704100b3b54 1454 {
AzureIoTClient 56:8704100b3b54 1455 LogError("amqp_messenger_do_work warning (failed creating the message receiver [%s])", msgr->config->device_id);
AzureIoTClient 56:8704100b3b54 1456 }
AzureIoTClient 56:8704100b3b54 1457 else if (msgr->receive_messages == false && msgr->message_receiver != NULL)
AzureIoTClient 56:8704100b3b54 1458 {
AzureIoTClient 56:8704100b3b54 1459 destroy_message_receiver(msgr);
AzureIoTClient 56:8704100b3b54 1460 }
AzureIoTClient 56:8704100b3b54 1461 }
AzureIoTClient 39:e98d5df6dc74 1462 }
AzureIoTClient 39:e98d5df6dc74 1463
AzureIoTClient 39:e98d5df6dc74 1464 static void handle_errors_and_timeouts(AMQP_MESSENGER_INSTANCE* msgr)
AzureIoTClient 39:e98d5df6dc74 1465 {
AzureIoTClient 56:8704100b3b54 1466 if (msgr->send_error_count >= msgr->max_send_error_count)
AzureIoTClient 56:8704100b3b54 1467 {
AzureIoTClient 56:8704100b3b54 1468 LogError("Reached max number of consecutive send failures (%d, %d)", msgr->config->device_id, msgr->max_send_error_count);
AzureIoTClient 56:8704100b3b54 1469 update_messenger_state(msgr, AMQP_MESSENGER_STATE_ERROR);
AzureIoTClient 56:8704100b3b54 1470 }
AzureIoTClient 39:e98d5df6dc74 1471 }
AzureIoTClient 39:e98d5df6dc74 1472
AzureIoTClient 39:e98d5df6dc74 1473 void amqp_messenger_do_work(AMQP_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 39:e98d5df6dc74 1474 {
AzureIoTClient 56:8704100b3b54 1475 if (messenger_handle == NULL)
AzureIoTClient 56:8704100b3b54 1476 {
AzureIoTClient 56:8704100b3b54 1477 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_064: [If `messenger_handle` is NULL, amqp_messenger_do_work() shall fail and return]
AzureIoTClient 56:8704100b3b54 1478 LogError("Invalid argument (messenger_handle is NULL)");
AzureIoTClient 56:8704100b3b54 1479 }
AzureIoTClient 56:8704100b3b54 1480 else
AzureIoTClient 56:8704100b3b54 1481 {
AzureIoTClient 56:8704100b3b54 1482 AMQP_MESSENGER_INSTANCE* msgr = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 56:8704100b3b54 1483
AzureIoTClient 56:8704100b3b54 1484 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_065: [amqp_messenger_do_work() shall update the current state according to the states of message sender and receiver]
AzureIoTClient 56:8704100b3b54 1485 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_068: [If the message sender state changes to MESSAGE_SENDER_STATE_OPEN, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_STARTED]
AzureIoTClient 56:8704100b3b54 1486 process_state_changes(msgr);
AzureIoTClient 39:e98d5df6dc74 1487
AzureIoTClient 56:8704100b3b54 1488 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_066: [If the current state is AMQP_MESSENGER_STARTING, amqp_messenger_do_work() shall create and start `instance->message_sender`]
AzureIoTClient 56:8704100b3b54 1489 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_067: [If the `instance->message_sender` fails to be created/started, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 56:8704100b3b54 1490 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_069: [If the `instance->receive_messages` is true, amqp_messenger_do_work() shall create and start `instance->message_receiver` if not done before]
AzureIoTClient 56:8704100b3b54 1491 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_070: [If the `instance->message_receiver` fails to be created/started, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 56:8704100b3b54 1492 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_071: [If the `instance->receive_messages` is false, amqp_messenger_do_work() shall stop and destroy `instance->message_receiver` if not done before]
AzureIoTClient 56:8704100b3b54 1493 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_072: [If the `instance->message_receiver` fails to be stopped/destroyed, amqp_messenger_do_work() shall set the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 56:8704100b3b54 1494 manage_amqp_messengers(msgr);
AzureIoTClient 39:e98d5df6dc74 1495
AzureIoTClient 56:8704100b3b54 1496 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_073: [amqp_messenger_do_work() shall invoke message_queue_do_work() on `instance->send_queue`]
AzureIoTClient 56:8704100b3b54 1497 if (msgr->state == AMQP_MESSENGER_STATE_STARTED)
AzureIoTClient 56:8704100b3b54 1498 {
AzureIoTClient 56:8704100b3b54 1499 message_queue_do_work(msgr->send_queue);
AzureIoTClient 56:8704100b3b54 1500 }
AzureIoTClient 56:8704100b3b54 1501
AzureIoTClient 56:8704100b3b54 1502 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_074: [If `instance->send_error_count` DEFAULT_MAX_SEND_ERROR_COUNT (10), amqp_messenger_do_work() shall change the state to AMQP_MESSENGER_STATE_ERROR]
AzureIoTClient 56:8704100b3b54 1503 handle_errors_and_timeouts(msgr);
AzureIoTClient 56:8704100b3b54 1504 }
AzureIoTClient 39:e98d5df6dc74 1505 }
AzureIoTClient 39:e98d5df6dc74 1506
AzureIoTClient 39:e98d5df6dc74 1507 void amqp_messenger_destroy(AMQP_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 39:e98d5df6dc74 1508 {
AzureIoTClient 56:8704100b3b54 1509 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_123: [If `messenger_handle` is NULL, amqp_messenger_destroy() shall fail and return]
AzureIoTClient 56:8704100b3b54 1510 if (messenger_handle == NULL)
AzureIoTClient 56:8704100b3b54 1511 {
AzureIoTClient 56:8704100b3b54 1512 LogError("invalid argument (messenger_handle is NULL)");
AzureIoTClient 56:8704100b3b54 1513 }
AzureIoTClient 56:8704100b3b54 1514 else
AzureIoTClient 56:8704100b3b54 1515 {
AzureIoTClient 56:8704100b3b54 1516 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1517
AzureIoTClient 56:8704100b3b54 1518 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_124: [If `instance->state` is not AMQP_MESSENGER_STATE_STOPPED, amqp_messenger_destroy() shall invoke amqp_messenger_stop()]
AzureIoTClient 56:8704100b3b54 1519 if (instance->state != AMQP_MESSENGER_STATE_STOPPED)
AzureIoTClient 56:8704100b3b54 1520 {
AzureIoTClient 56:8704100b3b54 1521 (void)amqp_messenger_stop(messenger_handle);
AzureIoTClient 56:8704100b3b54 1522 }
AzureIoTClient 39:e98d5df6dc74 1523
AzureIoTClient 56:8704100b3b54 1524 if (instance->send_queue != NULL)
AzureIoTClient 56:8704100b3b54 1525 {
AzureIoTClient 56:8704100b3b54 1526 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_125: [message_queue_destroy() shall be invoked for `instance->send_queue`]
AzureIoTClient 56:8704100b3b54 1527 message_queue_destroy(instance->send_queue);
AzureIoTClient 56:8704100b3b54 1528 }
AzureIoTClient 39:e98d5df6dc74 1529
AzureIoTClient 56:8704100b3b54 1530 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_126: [amqp_messenger_destroy() shall release all the memory allocated for `instance`]
AzureIoTClient 56:8704100b3b54 1531 destroy_configuration(instance->config);
AzureIoTClient 39:e98d5df6dc74 1532
AzureIoTClient 56:8704100b3b54 1533 free((void*)instance);
AzureIoTClient 56:8704100b3b54 1534 }
AzureIoTClient 39:e98d5df6dc74 1535 }
AzureIoTClient 39:e98d5df6dc74 1536
AzureIoTClient 39:e98d5df6dc74 1537 AMQP_MESSENGER_HANDLE amqp_messenger_create(const AMQP_MESSENGER_CONFIG* messenger_config)
AzureIoTClient 39:e98d5df6dc74 1538 {
AzureIoTClient 56:8704100b3b54 1539 AMQP_MESSENGER_HANDLE handle;
AzureIoTClient 39:e98d5df6dc74 1540
AzureIoTClient 56:8704100b3b54 1541 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_001: [If `messenger_config` is NULL, amqp_messenger_create() shall return NULL]
AzureIoTClient 56:8704100b3b54 1542 if (!is_valid_configuration(messenger_config))
AzureIoTClient 56:8704100b3b54 1543 {
AzureIoTClient 56:8704100b3b54 1544 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_002: [If `messenger_config`'s `device_id`, `iothub_host_fqdn`, `receive_link.source_suffix` or `send_link.target_suffix` are NULL, amqp_messenger_create() shall return NULL]
AzureIoTClient 56:8704100b3b54 1545 handle = NULL;
AzureIoTClient 56:8704100b3b54 1546 }
AzureIoTClient 56:8704100b3b54 1547 else
AzureIoTClient 56:8704100b3b54 1548 {
AzureIoTClient 56:8704100b3b54 1549 AMQP_MESSENGER_INSTANCE* instance;
AzureIoTClient 39:e98d5df6dc74 1550
AzureIoTClient 56:8704100b3b54 1551 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_003: [amqp_messenger_create() shall allocate memory for the messenger instance structure (aka `instance`)]
AzureIoTClient 56:8704100b3b54 1552 if ((instance = (AMQP_MESSENGER_INSTANCE*)malloc(sizeof(AMQP_MESSENGER_INSTANCE))) == NULL)
AzureIoTClient 56:8704100b3b54 1553 {
AzureIoTClient 56:8704100b3b54 1554 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_004: [If malloc() fails, amqp_messenger_create() shall fail and return NULL]
AzureIoTClient 56:8704100b3b54 1555 LogError("Failed allocating AMQP_MESSENGER_INSTANCE");
AzureIoTClient 56:8704100b3b54 1556 handle = NULL;
AzureIoTClient 56:8704100b3b54 1557 }
AzureIoTClient 56:8704100b3b54 1558 else
AzureIoTClient 56:8704100b3b54 1559 {
AzureIoTClient 56:8704100b3b54 1560 memset(instance, 0, sizeof(AMQP_MESSENGER_INSTANCE));
AzureIoTClient 39:e98d5df6dc74 1561
AzureIoTClient 56:8704100b3b54 1562 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_005: [amqp_messenger_create() shall save a copy of `messenger_config` into `instance`]
AzureIoTClient 56:8704100b3b54 1563 if ((instance->config = clone_configuration(messenger_config)) == NULL)
AzureIoTClient 56:8704100b3b54 1564 {
AzureIoTClient 56:8704100b3b54 1565 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_006: [If the copy fails, amqp_messenger_create() shall fail and return NULL]
AzureIoTClient 56:8704100b3b54 1566 LogError("Failed copying AMQP messenger configuration");
AzureIoTClient 56:8704100b3b54 1567 handle = NULL;
AzureIoTClient 56:8704100b3b54 1568 }
AzureIoTClient 56:8704100b3b54 1569 else
AzureIoTClient 56:8704100b3b54 1570 {
AzureIoTClient 56:8704100b3b54 1571 MESSAGE_QUEUE_CONFIG mq_config;
AzureIoTClient 56:8704100b3b54 1572 mq_config.max_retry_count = DEFAULT_EVENT_SEND_RETRY_LIMIT;
AzureIoTClient 56:8704100b3b54 1573 mq_config.max_message_enqueued_time_secs = DEFAULT_EVENT_SEND_TIMEOUT_SECS;
AzureIoTClient 56:8704100b3b54 1574 mq_config.max_message_processing_time_secs = 0;
AzureIoTClient 56:8704100b3b54 1575 mq_config.on_process_message_callback = on_process_message_callback;
AzureIoTClient 39:e98d5df6dc74 1576
AzureIoTClient 56:8704100b3b54 1577 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_007: [`instance->send_queue` shall be set using message_queue_create(), passing `on_process_message_callback`]
AzureIoTClient 56:8704100b3b54 1578 if ((instance->send_queue = message_queue_create(&mq_config)) == NULL)
AzureIoTClient 56:8704100b3b54 1579 {
AzureIoTClient 56:8704100b3b54 1580 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_008: [If message_queue_create() fails, amqp_messenger_create() shall fail and return NULL]
AzureIoTClient 56:8704100b3b54 1581 LogError("Failed creating message queue");
AzureIoTClient 56:8704100b3b54 1582 handle = NULL;
AzureIoTClient 56:8704100b3b54 1583 }
AzureIoTClient 56:8704100b3b54 1584 else
AzureIoTClient 56:8704100b3b54 1585 {
AzureIoTClient 56:8704100b3b54 1586 instance->state = AMQP_MESSENGER_STATE_STOPPED;
AzureIoTClient 56:8704100b3b54 1587 instance->message_sender_current_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 56:8704100b3b54 1588 instance->message_sender_previous_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 56:8704100b3b54 1589 instance->message_receiver_current_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 56:8704100b3b54 1590 instance->message_receiver_previous_state = MESSAGE_RECEIVER_STATE_IDLE;
AzureIoTClient 56:8704100b3b54 1591 instance->last_message_sender_state_change_time = INDEFINITE_TIME;
AzureIoTClient 56:8704100b3b54 1592 instance->last_message_receiver_state_change_time = INDEFINITE_TIME;
AzureIoTClient 56:8704100b3b54 1593 instance->max_send_error_count = DEFAULT_MAX_SEND_ERROR_COUNT;
AzureIoTClient 56:8704100b3b54 1594 instance->receive_messages = false;
AzureIoTClient 39:e98d5df6dc74 1595
AzureIoTClient 56:8704100b3b54 1596 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_009: [If no failures occurr, amqp_messenger_create() shall return a handle to `instance`]
AzureIoTClient 56:8704100b3b54 1597 handle = (AMQP_MESSENGER_HANDLE)instance;
AzureIoTClient 56:8704100b3b54 1598 }
AzureIoTClient 56:8704100b3b54 1599 }
AzureIoTClient 56:8704100b3b54 1600 }
AzureIoTClient 39:e98d5df6dc74 1601
AzureIoTClient 56:8704100b3b54 1602 if (handle == NULL)
AzureIoTClient 56:8704100b3b54 1603 {
AzureIoTClient 56:8704100b3b54 1604 amqp_messenger_destroy((AMQP_MESSENGER_HANDLE)instance);
AzureIoTClient 56:8704100b3b54 1605 }
AzureIoTClient 56:8704100b3b54 1606 }
AzureIoTClient 39:e98d5df6dc74 1607
AzureIoTClient 56:8704100b3b54 1608 return handle;
AzureIoTClient 39:e98d5df6dc74 1609 }
AzureIoTClient 39:e98d5df6dc74 1610
AzureIoTClient 39:e98d5df6dc74 1611 int amqp_messenger_set_option(AMQP_MESSENGER_HANDLE messenger_handle, const char* name, void* value)
AzureIoTClient 39:e98d5df6dc74 1612 {
AzureIoTClient 56:8704100b3b54 1613 int result;
AzureIoTClient 39:e98d5df6dc74 1614
AzureIoTClient 56:8704100b3b54 1615 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_127: [If `messenger_handle` or `name` or `value` is NULL, amqp_messenger_set_option shall fail and return a non-zero value]
AzureIoTClient 56:8704100b3b54 1616 if (messenger_handle == NULL || name == NULL || value == NULL)
AzureIoTClient 56:8704100b3b54 1617 {
AzureIoTClient 56:8704100b3b54 1618 LogError("Invalid argument (messenger_handle=%p, name=%p, value=%p)",
AzureIoTClient 56:8704100b3b54 1619 messenger_handle, name, value);
AzureIoTClient 56:8704100b3b54 1620 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1621 }
AzureIoTClient 56:8704100b3b54 1622 else
AzureIoTClient 56:8704100b3b54 1623 {
AzureIoTClient 56:8704100b3b54 1624 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 39:e98d5df6dc74 1625
AzureIoTClient 56:8704100b3b54 1626 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_128: [If name matches AMQP_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, `value` shall be set on `instance->send_queue` using message_queue_set_max_message_enqueued_time_secs()]
AzureIoTClient 56:8704100b3b54 1627 if (strcmp(AMQP_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS, name) == 0)
AzureIoTClient 56:8704100b3b54 1628 {
AzureIoTClient 56:8704100b3b54 1629 if (message_queue_set_max_message_enqueued_time_secs(instance->send_queue, *(size_t*)value) != 0)
AzureIoTClient 56:8704100b3b54 1630 {
AzureIoTClient 56:8704100b3b54 1631 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_129: [If message_queue_set_max_message_enqueued_time_secs() fails, amqp_messenger_set_option() shall fail and return a non-zero value]
AzureIoTClient 56:8704100b3b54 1632 LogError("Failed setting option %s", AMQP_MESSENGER_OPTION_EVENT_SEND_TIMEOUT_SECS);
AzureIoTClient 56:8704100b3b54 1633 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1634 }
AzureIoTClient 56:8704100b3b54 1635 else
AzureIoTClient 56:8704100b3b54 1636 {
AzureIoTClient 56:8704100b3b54 1637 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_131: [If no errors occur, amqp_messenger_set_option shall return 0]
AzureIoTClient 56:8704100b3b54 1638 result = RESULT_OK;
AzureIoTClient 56:8704100b3b54 1639 }
AzureIoTClient 56:8704100b3b54 1640 }
AzureIoTClient 56:8704100b3b54 1641 else
AzureIoTClient 56:8704100b3b54 1642 {
AzureIoTClient 56:8704100b3b54 1643 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_130: [If `name` does not match any supported option, amqp_messenger_set_option() shall fail and return a non-zero value]
AzureIoTClient 56:8704100b3b54 1644 LogError("Invalid argument (option '%s' is not valid)", name);
AzureIoTClient 56:8704100b3b54 1645 result = __FAILURE__;
AzureIoTClient 56:8704100b3b54 1646 }
AzureIoTClient 56:8704100b3b54 1647 }
AzureIoTClient 39:e98d5df6dc74 1648
AzureIoTClient 56:8704100b3b54 1649 return result;
AzureIoTClient 39:e98d5df6dc74 1650 }
AzureIoTClient 39:e98d5df6dc74 1651
AzureIoTClient 39:e98d5df6dc74 1652 OPTIONHANDLER_HANDLE amqp_messenger_retrieve_options(AMQP_MESSENGER_HANDLE messenger_handle)
AzureIoTClient 39:e98d5df6dc74 1653 {
AzureIoTClient 56:8704100b3b54 1654 OPTIONHANDLER_HANDLE result;
AzureIoTClient 39:e98d5df6dc74 1655
AzureIoTClient 56:8704100b3b54 1656 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [If `messenger_handle` is NULL, amqp_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 56:8704100b3b54 1657 if (messenger_handle == NULL)
AzureIoTClient 56:8704100b3b54 1658 {
AzureIoTClient 56:8704100b3b54 1659 LogError("Invalid argument (messenger_handle is NULL)");
AzureIoTClient 56:8704100b3b54 1660 result = NULL;
AzureIoTClient 56:8704100b3b54 1661 }
AzureIoTClient 56:8704100b3b54 1662 else
AzureIoTClient 56:8704100b3b54 1663 {
AzureIoTClient 56:8704100b3b54 1664 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_133: [An OPTIONHANDLER_HANDLE instance shall be created using OptionHandler_Create]
AzureIoTClient 56:8704100b3b54 1665 result = OptionHandler_Create(amqp_messenger_clone_option, amqp_messenger_destroy_option, (pfSetOption)amqp_messenger_set_option);
AzureIoTClient 39:e98d5df6dc74 1666
AzureIoTClient 56:8704100b3b54 1667 if (result == NULL)
AzureIoTClient 56:8704100b3b54 1668 {
AzureIoTClient 56:8704100b3b54 1669 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_134: [If an OPTIONHANDLER_HANDLE instance fails to be created, amqp_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 56:8704100b3b54 1670 LogError("Failed to retrieve options from messenger instance (OptionHandler_Create failed)");
AzureIoTClient 56:8704100b3b54 1671 }
AzureIoTClient 56:8704100b3b54 1672 else
AzureIoTClient 56:8704100b3b54 1673 {
AzureIoTClient 56:8704100b3b54 1674 AMQP_MESSENGER_INSTANCE* instance = (AMQP_MESSENGER_INSTANCE*)messenger_handle;
AzureIoTClient 56:8704100b3b54 1675 OPTIONHANDLER_HANDLE mq_options;
AzureIoTClient 39:e98d5df6dc74 1676
AzureIoTClient 56:8704100b3b54 1677 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_135: [`instance->send_queue` options shall be retrieved using message_queue_retrieve_options()]
AzureIoTClient 56:8704100b3b54 1678 if ((mq_options = message_queue_retrieve_options(instance->send_queue)) == NULL)
AzureIoTClient 56:8704100b3b54 1679 {
AzureIoTClient 56:8704100b3b54 1680 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_136: [If message_queue_retrieve_options() fails, amqp_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 56:8704100b3b54 1681 LogError("failed to retrieve options from send queue)");
AzureIoTClient 56:8704100b3b54 1682 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_139: [If amqp_messenger_retrieve_options fails, any allocated memory shall be freed]
AzureIoTClient 56:8704100b3b54 1683 OptionHandler_Destroy(result);
AzureIoTClient 56:8704100b3b54 1684 result = NULL;
AzureIoTClient 56:8704100b3b54 1685 }
AzureIoTClient 56:8704100b3b54 1686 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_137: [Each option of `instance` shall be added to the OPTIONHANDLER_HANDLE instance using OptionHandler_AddOption]
AzureIoTClient 56:8704100b3b54 1687 else if (OptionHandler_AddOption(result, MESSENGER_SAVED_MQ_OPTIONS, (void*)mq_options) != OPTIONHANDLER_OK)
AzureIoTClient 56:8704100b3b54 1688 {
AzureIoTClient 56:8704100b3b54 1689 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_138: [If OptionHandler_AddOption fails, amqp_messenger_retrieve_options shall fail and return NULL]
AzureIoTClient 56:8704100b3b54 1690 LogError("failed adding option '%s'", MESSENGER_SAVED_MQ_OPTIONS);
AzureIoTClient 56:8704100b3b54 1691 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_139: [If amqp_messenger_retrieve_options fails, any allocated memory shall be freed]
AzureIoTClient 56:8704100b3b54 1692 OptionHandler_Destroy(mq_options);
AzureIoTClient 56:8704100b3b54 1693 OptionHandler_Destroy(result);
AzureIoTClient 56:8704100b3b54 1694 result = NULL;
AzureIoTClient 56:8704100b3b54 1695 }
AzureIoTClient 56:8704100b3b54 1696 }
AzureIoTClient 56:8704100b3b54 1697 }
AzureIoTClient 39:e98d5df6dc74 1698
AzureIoTClient 56:8704100b3b54 1699 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_140: [If no failures occur, amqp_messenger_retrieve_options shall return the OPTIONHANDLER_HANDLE instance]
AzureIoTClient 56:8704100b3b54 1700 return result;
AzureIoTClient 39:e98d5df6dc74 1701 }
AzureIoTClient 39:e98d5df6dc74 1702
AzureIoTClient 39:e98d5df6dc74 1703 void amqp_messenger_destroy_disposition_info(AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info)
AzureIoTClient 39:e98d5df6dc74 1704 {
AzureIoTClient 56:8704100b3b54 1705 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [If `disposition_info` is NULL, amqp_messenger_destroy_disposition_info shall return immediately]
AzureIoTClient 56:8704100b3b54 1706 if (disposition_info == NULL)
AzureIoTClient 56:8704100b3b54 1707 {
AzureIoTClient 56:8704100b3b54 1708 LogError("Invalid argument (disposition_info is NULL)");
AzureIoTClient 56:8704100b3b54 1709 }
AzureIoTClient 56:8704100b3b54 1710 else
AzureIoTClient 56:8704100b3b54 1711 {
AzureIoTClient 56:8704100b3b54 1712 // Codes_SRS_IOTHUBTRANSPORT_AMQP_MESSENGER_09_132: [All memory allocated for `disposition_info` shall be released]
AzureIoTClient 56:8704100b3b54 1713 destroy_message_disposition_info(disposition_info);
AzureIoTClient 56:8704100b3b54 1714 }
AzureIoTClient 39:e98d5df6dc74 1715 }