A small memory footprint AMQP implementation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Committer:
Azure.IoT Build
Date:
Fri Apr 08 12:01:10 2016 -0700
Revision:
0:6ae2f7bca550
Child:
1:eab586236bfe
1.0.4

Who changed what in which revision?

User | Revision | Line number | New contents of line
Azure.IoT Build 0:6ae2f7bca550 1 // Copyright (c) Microsoft. All rights reserved.
Azure.IoT Build 0:6ae2f7bca550 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
Azure.IoT Build 0:6ae2f7bca550 3
Azure.IoT Build 0:6ae2f7bca550 4 #include <stdlib.h>
Azure.IoT Build 0:6ae2f7bca550 5 #ifdef _CRTDBG_MAP_ALLOC
Azure.IoT Build 0:6ae2f7bca550 6 #include <crtdbg.h>
Azure.IoT Build 0:6ae2f7bca550 7 #endif
Azure.IoT Build 0:6ae2f7bca550 8 #include <string.h>
Azure.IoT Build 0:6ae2f7bca550 9 #include "azure_uamqp_c/message_sender.h"
Azure.IoT Build 0:6ae2f7bca550 10 #include "azure_uamqp_c/amqpalloc.h"
Azure.IoT Build 0:6ae2f7bca550 11 #include "azure_c_shared_utility/xlogging.h"
Azure.IoT Build 0:6ae2f7bca550 12 #include "azure_uamqp_c/amqpvalue_to_string.h"
Azure.IoT Build 0:6ae2f7bca550 13
/* Per-message transfer state tracked for every queued message. */
typedef enum MESSAGE_SEND_STATE_TAG
{
    /* Not yet handed to the link; send_all_pending_messages will try it. */
    MESSAGE_SEND_STATE_NOT_SENT,
    /* Set by send_one_message just before calling link_transfer. */
    MESSAGE_SEND_STATE_PENDING
} MESSAGE_SEND_STATE;
Azure.IoT Build 0:6ae2f7bca550 19
/* Outcome of a single send_one_message attempt. */
typedef enum SEND_ONE_MESSAGE_RESULT_TAG
{
    /* The message was encoded and link_transfer reported LINK_TRANSFER_OK. */
    SEND_ONE_MESSAGE_OK,
    /* Encoding failed or link_transfer reported an error. */
    SEND_ONE_MESSAGE_ERROR,
    /* link_transfer reported LINK_TRANSFER_BUSY; the message stays queued. */
    SEND_ONE_MESSAGE_BUSY
} SEND_ONE_MESSAGE_RESULT;
Azure.IoT Build 0:6ae2f7bca550 26
Azure.IoT Build 0:6ae2f7bca550 27 typedef struct MESSAGE_WITH_CALLBACK_TAG
Azure.IoT Build 0:6ae2f7bca550 28 {
Azure.IoT Build 0:6ae2f7bca550 29 MESSAGE_HANDLE message;
Azure.IoT Build 0:6ae2f7bca550 30 ON_MESSAGE_SEND_COMPLETE on_message_send_complete;
Azure.IoT Build 0:6ae2f7bca550 31 void* context;
Azure.IoT Build 0:6ae2f7bca550 32 MESSAGE_SENDER_HANDLE message_sender;
Azure.IoT Build 0:6ae2f7bca550 33 MESSAGE_SEND_STATE message_send_state;
Azure.IoT Build 0:6ae2f7bca550 34 } MESSAGE_WITH_CALLBACK;
Azure.IoT Build 0:6ae2f7bca550 35
Azure.IoT Build 0:6ae2f7bca550 36 typedef struct MESSAGE_SENDER_INSTANCE_TAG
Azure.IoT Build 0:6ae2f7bca550 37 {
Azure.IoT Build 0:6ae2f7bca550 38 LINK_HANDLE link;
Azure.IoT Build 0:6ae2f7bca550 39 size_t message_count;
Azure.IoT Build 0:6ae2f7bca550 40 MESSAGE_WITH_CALLBACK** messages;
Azure.IoT Build 0:6ae2f7bca550 41 MESSAGE_SENDER_STATE message_sender_state;
Azure.IoT Build 0:6ae2f7bca550 42 ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
Azure.IoT Build 0:6ae2f7bca550 43 void* on_message_sender_state_changed_context;
Azure.IoT Build 0:6ae2f7bca550 44 LOGGER_LOG logger_log;
Azure.IoT Build 0:6ae2f7bca550 45 } MESSAGE_SENDER_INSTANCE;
Azure.IoT Build 0:6ae2f7bca550 46
Azure.IoT Build 0:6ae2f7bca550 47 static void remove_pending_message_by_index(MESSAGE_SENDER_INSTANCE* message_sender_instance, size_t index)
Azure.IoT Build 0:6ae2f7bca550 48 {
Azure.IoT Build 0:6ae2f7bca550 49 MESSAGE_WITH_CALLBACK** new_messages;
Azure.IoT Build 0:6ae2f7bca550 50
Azure.IoT Build 0:6ae2f7bca550 51 if (message_sender_instance->messages[index]->message != NULL)
Azure.IoT Build 0:6ae2f7bca550 52 {
Azure.IoT Build 0:6ae2f7bca550 53 message_destroy(message_sender_instance->messages[index]->message);
Azure.IoT Build 0:6ae2f7bca550 54 message_sender_instance->messages[index]->message = NULL;
Azure.IoT Build 0:6ae2f7bca550 55 }
Azure.IoT Build 0:6ae2f7bca550 56
Azure.IoT Build 0:6ae2f7bca550 57 amqpalloc_free(message_sender_instance->messages[index]);
Azure.IoT Build 0:6ae2f7bca550 58
Azure.IoT Build 0:6ae2f7bca550 59 if (message_sender_instance->message_count - index > 1)
Azure.IoT Build 0:6ae2f7bca550 60 {
Azure.IoT Build 0:6ae2f7bca550 61 (void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1));
Azure.IoT Build 0:6ae2f7bca550 62 }
Azure.IoT Build 0:6ae2f7bca550 63
Azure.IoT Build 0:6ae2f7bca550 64 message_sender_instance->message_count--;
Azure.IoT Build 0:6ae2f7bca550 65
Azure.IoT Build 0:6ae2f7bca550 66 if (message_sender_instance->message_count > 0)
Azure.IoT Build 0:6ae2f7bca550 67 {
Azure.IoT Build 0:6ae2f7bca550 68 new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count));
Azure.IoT Build 0:6ae2f7bca550 69 if (new_messages != NULL)
Azure.IoT Build 0:6ae2f7bca550 70 {
Azure.IoT Build 0:6ae2f7bca550 71 message_sender_instance->messages = new_messages;
Azure.IoT Build 0:6ae2f7bca550 72 }
Azure.IoT Build 0:6ae2f7bca550 73 }
Azure.IoT Build 0:6ae2f7bca550 74 else
Azure.IoT Build 0:6ae2f7bca550 75 {
Azure.IoT Build 0:6ae2f7bca550 76 amqpalloc_free(message_sender_instance->messages);
Azure.IoT Build 0:6ae2f7bca550 77 message_sender_instance->messages = NULL;
Azure.IoT Build 0:6ae2f7bca550 78 }
Azure.IoT Build 0:6ae2f7bca550 79 }
Azure.IoT Build 0:6ae2f7bca550 80
Azure.IoT Build 0:6ae2f7bca550 81 static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback)
Azure.IoT Build 0:6ae2f7bca550 82 {
Azure.IoT Build 0:6ae2f7bca550 83 size_t i;
Azure.IoT Build 0:6ae2f7bca550 84
Azure.IoT Build 0:6ae2f7bca550 85 for (i = 0; i < message_sender_instance->message_count; i++)
Azure.IoT Build 0:6ae2f7bca550 86 {
Azure.IoT Build 0:6ae2f7bca550 87 if (message_sender_instance->messages[i] == message_with_callback)
Azure.IoT Build 0:6ae2f7bca550 88 {
Azure.IoT Build 0:6ae2f7bca550 89 remove_pending_message_by_index(message_sender_instance, i);
Azure.IoT Build 0:6ae2f7bca550 90 break;
Azure.IoT Build 0:6ae2f7bca550 91 }
Azure.IoT Build 0:6ae2f7bca550 92 }
Azure.IoT Build 0:6ae2f7bca550 93 }
Azure.IoT Build 0:6ae2f7bca550 94
Azure.IoT Build 0:6ae2f7bca550 95 static void on_delivery_settled(void* context, delivery_number delivery_no)
Azure.IoT Build 0:6ae2f7bca550 96 {
Azure.IoT Build 0:6ae2f7bca550 97 MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
Azure.IoT Build 0:6ae2f7bca550 98 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
Azure.IoT Build 0:6ae2f7bca550 99
Azure.IoT Build 0:6ae2f7bca550 100 if (message_with_callback->on_message_send_complete != NULL)
Azure.IoT Build 0:6ae2f7bca550 101 {
Azure.IoT Build 0:6ae2f7bca550 102 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
Azure.IoT Build 0:6ae2f7bca550 103 }
Azure.IoT Build 0:6ae2f7bca550 104
Azure.IoT Build 0:6ae2f7bca550 105 remove_pending_message(message_sender_instance, message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 106 }
Azure.IoT Build 0:6ae2f7bca550 107
Azure.IoT Build 0:6ae2f7bca550 108 static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
Azure.IoT Build 0:6ae2f7bca550 109 {
Azure.IoT Build 0:6ae2f7bca550 110 PAYLOAD* payload = (PAYLOAD*)context;
Azure.IoT Build 0:6ae2f7bca550 111 (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
Azure.IoT Build 0:6ae2f7bca550 112 payload->length += length;
Azure.IoT Build 0:6ae2f7bca550 113 return 0;
Azure.IoT Build 0:6ae2f7bca550 114 }
Azure.IoT Build 0:6ae2f7bca550 115
Azure.IoT Build 0:6ae2f7bca550 116 static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender_instance, const char* name, AMQP_VALUE value)
Azure.IoT Build 0:6ae2f7bca550 117 {
Azure.IoT Build 0:6ae2f7bca550 118 if (message_sender_instance->logger_log != NULL)
Azure.IoT Build 0:6ae2f7bca550 119 {
Azure.IoT Build 0:6ae2f7bca550 120 char* value_as_string = NULL;
Azure.IoT Build 0:6ae2f7bca550 121 LOG(message_sender_instance->logger_log, 0, "%s", name);
Azure.IoT Build 0:6ae2f7bca550 122 LOG(message_sender_instance->logger_log, 0, "%s", (value_as_string = amqpvalue_to_string(value)));
Azure.IoT Build 0:6ae2f7bca550 123 if (value_as_string != NULL)
Azure.IoT Build 0:6ae2f7bca550 124 {
Azure.IoT Build 0:6ae2f7bca550 125 amqpalloc_free(value_as_string);
Azure.IoT Build 0:6ae2f7bca550 126 }
Azure.IoT Build 0:6ae2f7bca550 127 }
Azure.IoT Build 0:6ae2f7bca550 128 }
Azure.IoT Build 0:6ae2f7bca550 129
Azure.IoT Build 0:6ae2f7bca550 130 static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback, MESSAGE_HANDLE message)
Azure.IoT Build 0:6ae2f7bca550 131 {
Azure.IoT Build 0:6ae2f7bca550 132 SEND_ONE_MESSAGE_RESULT result;
Azure.IoT Build 0:6ae2f7bca550 133
Azure.IoT Build 0:6ae2f7bca550 134 size_t encoded_size;
Azure.IoT Build 0:6ae2f7bca550 135 size_t total_encoded_size = 0;
Azure.IoT Build 0:6ae2f7bca550 136 MESSAGE_BODY_TYPE message_body_type;
Azure.IoT Build 0:6ae2f7bca550 137 message_format message_format;
Azure.IoT Build 0:6ae2f7bca550 138
Azure.IoT Build 0:6ae2f7bca550 139 if ((message_get_body_type(message, &message_body_type) != 0) ||
Azure.IoT Build 0:6ae2f7bca550 140 (message_get_message_format(message, &message_format) != 0))
Azure.IoT Build 0:6ae2f7bca550 141 {
Azure.IoT Build 0:6ae2f7bca550 142 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 143 }
Azure.IoT Build 0:6ae2f7bca550 144 else
Azure.IoT Build 0:6ae2f7bca550 145 {
Azure.IoT Build 0:6ae2f7bca550 146 // header
Azure.IoT Build 0:6ae2f7bca550 147 HEADER_HANDLE header;
Azure.IoT Build 0:6ae2f7bca550 148 AMQP_VALUE header_amqp_value;
Azure.IoT Build 0:6ae2f7bca550 149 PROPERTIES_HANDLE properties;
Azure.IoT Build 0:6ae2f7bca550 150 AMQP_VALUE properties_amqp_value;
Azure.IoT Build 0:6ae2f7bca550 151 AMQP_VALUE application_properties;
Azure.IoT Build 0:6ae2f7bca550 152 AMQP_VALUE application_properties_value;
Azure.IoT Build 0:6ae2f7bca550 153 AMQP_VALUE body_amqp_value = NULL;
Azure.IoT Build 0:6ae2f7bca550 154 size_t body_data_count;
Azure.IoT Build 0:6ae2f7bca550 155
Azure.IoT Build 0:6ae2f7bca550 156 message_get_header(message, &header);
Azure.IoT Build 0:6ae2f7bca550 157 header_amqp_value = amqpvalue_create_header(header);
Azure.IoT Build 0:6ae2f7bca550 158 if (header != NULL)
Azure.IoT Build 0:6ae2f7bca550 159 {
Azure.IoT Build 0:6ae2f7bca550 160 amqpvalue_get_encoded_size(header_amqp_value, &encoded_size);
Azure.IoT Build 0:6ae2f7bca550 161 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 162 }
Azure.IoT Build 0:6ae2f7bca550 163
Azure.IoT Build 0:6ae2f7bca550 164 // properties
Azure.IoT Build 0:6ae2f7bca550 165 message_get_properties(message, &properties);
Azure.IoT Build 0:6ae2f7bca550 166 properties_amqp_value = amqpvalue_create_properties(properties);
Azure.IoT Build 0:6ae2f7bca550 167 if (properties != NULL)
Azure.IoT Build 0:6ae2f7bca550 168 {
Azure.IoT Build 0:6ae2f7bca550 169 amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size);
Azure.IoT Build 0:6ae2f7bca550 170 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 171 }
Azure.IoT Build 0:6ae2f7bca550 172
Azure.IoT Build 0:6ae2f7bca550 173 // application properties
Azure.IoT Build 0:6ae2f7bca550 174 message_get_application_properties(message, &application_properties);
Azure.IoT Build 0:6ae2f7bca550 175 application_properties_value = amqpvalue_create_application_properties(application_properties);
Azure.IoT Build 0:6ae2f7bca550 176 if (application_properties != NULL)
Azure.IoT Build 0:6ae2f7bca550 177 {
Azure.IoT Build 0:6ae2f7bca550 178 amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
Azure.IoT Build 0:6ae2f7bca550 179 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 180 }
Azure.IoT Build 0:6ae2f7bca550 181
Azure.IoT Build 0:6ae2f7bca550 182 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 183
Azure.IoT Build 0:6ae2f7bca550 184 // body - amqp data
Azure.IoT Build 0:6ae2f7bca550 185 switch (message_body_type)
Azure.IoT Build 0:6ae2f7bca550 186 {
Azure.IoT Build 0:6ae2f7bca550 187 default:
Azure.IoT Build 0:6ae2f7bca550 188 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 189 break;
Azure.IoT Build 0:6ae2f7bca550 190
Azure.IoT Build 0:6ae2f7bca550 191 case MESSAGE_BODY_TYPE_VALUE:
Azure.IoT Build 0:6ae2f7bca550 192 {
Azure.IoT Build 0:6ae2f7bca550 193 AMQP_VALUE message_body_amqp_value;
Azure.IoT Build 0:6ae2f7bca550 194 if (message_get_inplace_body_amqp_value(message, &message_body_amqp_value) != 0)
Azure.IoT Build 0:6ae2f7bca550 195 {
Azure.IoT Build 0:6ae2f7bca550 196 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 197 }
Azure.IoT Build 0:6ae2f7bca550 198 else
Azure.IoT Build 0:6ae2f7bca550 199 {
Azure.IoT Build 0:6ae2f7bca550 200 body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 201 if ((body_amqp_value == NULL) ||
Azure.IoT Build 0:6ae2f7bca550 202 (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0))
Azure.IoT Build 0:6ae2f7bca550 203 {
Azure.IoT Build 0:6ae2f7bca550 204 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 205 }
Azure.IoT Build 0:6ae2f7bca550 206 else
Azure.IoT Build 0:6ae2f7bca550 207 {
Azure.IoT Build 0:6ae2f7bca550 208 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 209 }
Azure.IoT Build 0:6ae2f7bca550 210 }
Azure.IoT Build 0:6ae2f7bca550 211
Azure.IoT Build 0:6ae2f7bca550 212 break;
Azure.IoT Build 0:6ae2f7bca550 213 }
Azure.IoT Build 0:6ae2f7bca550 214
Azure.IoT Build 0:6ae2f7bca550 215 case MESSAGE_BODY_TYPE_DATA:
Azure.IoT Build 0:6ae2f7bca550 216 {
Azure.IoT Build 0:6ae2f7bca550 217 BINARY_DATA binary_data;
Azure.IoT Build 0:6ae2f7bca550 218 size_t i;
Azure.IoT Build 0:6ae2f7bca550 219
Azure.IoT Build 0:6ae2f7bca550 220 if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
Azure.IoT Build 0:6ae2f7bca550 221 {
Azure.IoT Build 0:6ae2f7bca550 222 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 223 }
Azure.IoT Build 0:6ae2f7bca550 224 else
Azure.IoT Build 0:6ae2f7bca550 225 {
Azure.IoT Build 0:6ae2f7bca550 226 for (i = 0; i < body_data_count; i++)
Azure.IoT Build 0:6ae2f7bca550 227 {
Azure.IoT Build 0:6ae2f7bca550 228 if (message_get_body_amqp_data(message, i, &binary_data) != 0)
Azure.IoT Build 0:6ae2f7bca550 229 {
Azure.IoT Build 0:6ae2f7bca550 230 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 231 }
Azure.IoT Build 0:6ae2f7bca550 232 else
Azure.IoT Build 0:6ae2f7bca550 233 {
Azure.IoT Build 0:6ae2f7bca550 234 amqp_binary binary_value = { binary_data.bytes, binary_data.length };
Azure.IoT Build 0:6ae2f7bca550 235 AMQP_VALUE body_amqp_data = amqpvalue_create_data(binary_value);
Azure.IoT Build 0:6ae2f7bca550 236 if (body_amqp_data == NULL)
Azure.IoT Build 0:6ae2f7bca550 237 {
Azure.IoT Build 0:6ae2f7bca550 238 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 239 }
Azure.IoT Build 0:6ae2f7bca550 240 else
Azure.IoT Build 0:6ae2f7bca550 241 {
Azure.IoT Build 0:6ae2f7bca550 242 if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
Azure.IoT Build 0:6ae2f7bca550 243 {
Azure.IoT Build 0:6ae2f7bca550 244 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 245 }
Azure.IoT Build 0:6ae2f7bca550 246 else
Azure.IoT Build 0:6ae2f7bca550 247 {
Azure.IoT Build 0:6ae2f7bca550 248 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 249 }
Azure.IoT Build 0:6ae2f7bca550 250
Azure.IoT Build 0:6ae2f7bca550 251 amqpvalue_destroy(body_amqp_data);
Azure.IoT Build 0:6ae2f7bca550 252 }
Azure.IoT Build 0:6ae2f7bca550 253 }
Azure.IoT Build 0:6ae2f7bca550 254 }
Azure.IoT Build 0:6ae2f7bca550 255 }
Azure.IoT Build 0:6ae2f7bca550 256 break;
Azure.IoT Build 0:6ae2f7bca550 257 }
Azure.IoT Build 0:6ae2f7bca550 258 }
Azure.IoT Build 0:6ae2f7bca550 259
Azure.IoT Build 0:6ae2f7bca550 260 if (result == 0)
Azure.IoT Build 0:6ae2f7bca550 261 {
Azure.IoT Build 0:6ae2f7bca550 262 void* data_bytes = amqpalloc_malloc(total_encoded_size);
Azure.IoT Build 0:6ae2f7bca550 263 PAYLOAD payload = { data_bytes, 0 };
Azure.IoT Build 0:6ae2f7bca550 264 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 265
Azure.IoT Build 0:6ae2f7bca550 266 if (header != NULL)
Azure.IoT Build 0:6ae2f7bca550 267 {
Azure.IoT Build 0:6ae2f7bca550 268 if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 269 {
Azure.IoT Build 0:6ae2f7bca550 270 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 271 }
Azure.IoT Build 0:6ae2f7bca550 272
Azure.IoT Build 0:6ae2f7bca550 273 log_message_chunk(message_sender_instance, "Header:", header_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 274 }
Azure.IoT Build 0:6ae2f7bca550 275
Azure.IoT Build 0:6ae2f7bca550 276 if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
Azure.IoT Build 0:6ae2f7bca550 277 {
Azure.IoT Build 0:6ae2f7bca550 278 if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 279 {
Azure.IoT Build 0:6ae2f7bca550 280 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 281 }
Azure.IoT Build 0:6ae2f7bca550 282
Azure.IoT Build 0:6ae2f7bca550 283 log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 284 }
Azure.IoT Build 0:6ae2f7bca550 285
Azure.IoT Build 0:6ae2f7bca550 286 if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
Azure.IoT Build 0:6ae2f7bca550 287 {
Azure.IoT Build 0:6ae2f7bca550 288 if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 289 {
Azure.IoT Build 0:6ae2f7bca550 290 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 291 }
Azure.IoT Build 0:6ae2f7bca550 292
Azure.IoT Build 0:6ae2f7bca550 293 log_message_chunk(message_sender_instance, "Application properties:", application_properties_value);
Azure.IoT Build 0:6ae2f7bca550 294 }
Azure.IoT Build 0:6ae2f7bca550 295
Azure.IoT Build 0:6ae2f7bca550 296 if (result == SEND_ONE_MESSAGE_OK)
Azure.IoT Build 0:6ae2f7bca550 297 {
Azure.IoT Build 0:6ae2f7bca550 298 switch (message_body_type)
Azure.IoT Build 0:6ae2f7bca550 299 {
Azure.IoT Build 0:6ae2f7bca550 300 case MESSAGE_BODY_TYPE_VALUE:
Azure.IoT Build 0:6ae2f7bca550 301 {
Azure.IoT Build 0:6ae2f7bca550 302 if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 303 {
Azure.IoT Build 0:6ae2f7bca550 304 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 305 }
Azure.IoT Build 0:6ae2f7bca550 306
Azure.IoT Build 0:6ae2f7bca550 307 log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 308 break;
Azure.IoT Build 0:6ae2f7bca550 309 }
Azure.IoT Build 0:6ae2f7bca550 310 case MESSAGE_BODY_TYPE_DATA:
Azure.IoT Build 0:6ae2f7bca550 311 {
Azure.IoT Build 0:6ae2f7bca550 312 BINARY_DATA binary_data;
Azure.IoT Build 0:6ae2f7bca550 313 size_t i;
Azure.IoT Build 0:6ae2f7bca550 314
Azure.IoT Build 0:6ae2f7bca550 315 for (i = 0; i < body_data_count; i++)
Azure.IoT Build 0:6ae2f7bca550 316 {
Azure.IoT Build 0:6ae2f7bca550 317 if (message_get_body_amqp_data(message, i, &binary_data) != 0)
Azure.IoT Build 0:6ae2f7bca550 318 {
Azure.IoT Build 0:6ae2f7bca550 319 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 320 }
Azure.IoT Build 0:6ae2f7bca550 321 else
Azure.IoT Build 0:6ae2f7bca550 322 {
Azure.IoT Build 0:6ae2f7bca550 323 amqp_binary binary_value = { binary_data.bytes, binary_data.length };
Azure.IoT Build 0:6ae2f7bca550 324 AMQP_VALUE body_amqp_data = amqpvalue_create_data(binary_value);
Azure.IoT Build 0:6ae2f7bca550 325 if (body_amqp_data == NULL)
Azure.IoT Build 0:6ae2f7bca550 326 {
Azure.IoT Build 0:6ae2f7bca550 327 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 328 }
Azure.IoT Build 0:6ae2f7bca550 329 else
Azure.IoT Build 0:6ae2f7bca550 330 {
Azure.IoT Build 0:6ae2f7bca550 331 if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 332 {
Azure.IoT Build 0:6ae2f7bca550 333 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 334 break;
Azure.IoT Build 0:6ae2f7bca550 335 }
Azure.IoT Build 0:6ae2f7bca550 336
Azure.IoT Build 0:6ae2f7bca550 337 amqpvalue_destroy(body_amqp_data);
Azure.IoT Build 0:6ae2f7bca550 338 }
Azure.IoT Build 0:6ae2f7bca550 339 }
Azure.IoT Build 0:6ae2f7bca550 340 }
Azure.IoT Build 0:6ae2f7bca550 341 break;
Azure.IoT Build 0:6ae2f7bca550 342 }
Azure.IoT Build 0:6ae2f7bca550 343 }
Azure.IoT Build 0:6ae2f7bca550 344 }
Azure.IoT Build 0:6ae2f7bca550 345
Azure.IoT Build 0:6ae2f7bca550 346 if (result == SEND_ONE_MESSAGE_OK)
Azure.IoT Build 0:6ae2f7bca550 347 {
Azure.IoT Build 0:6ae2f7bca550 348 message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
Azure.IoT Build 0:6ae2f7bca550 349 switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback))
Azure.IoT Build 0:6ae2f7bca550 350 {
Azure.IoT Build 0:6ae2f7bca550 351 default:
Azure.IoT Build 0:6ae2f7bca550 352 case LINK_TRANSFER_ERROR:
Azure.IoT Build 0:6ae2f7bca550 353 if (message_with_callback->on_message_send_complete != NULL)
Azure.IoT Build 0:6ae2f7bca550 354 {
Azure.IoT Build 0:6ae2f7bca550 355 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
Azure.IoT Build 0:6ae2f7bca550 356 }
Azure.IoT Build 0:6ae2f7bca550 357
Azure.IoT Build 0:6ae2f7bca550 358 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 359 break;
Azure.IoT Build 0:6ae2f7bca550 360
Azure.IoT Build 0:6ae2f7bca550 361 case LINK_TRANSFER_BUSY:
Azure.IoT Build 0:6ae2f7bca550 362 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
Azure.IoT Build 0:6ae2f7bca550 363 result = SEND_ONE_MESSAGE_BUSY;
Azure.IoT Build 0:6ae2f7bca550 364 break;
Azure.IoT Build 0:6ae2f7bca550 365
Azure.IoT Build 0:6ae2f7bca550 366 case LINK_TRANSFER_OK:
Azure.IoT Build 0:6ae2f7bca550 367 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 368 break;
Azure.IoT Build 0:6ae2f7bca550 369 }
Azure.IoT Build 0:6ae2f7bca550 370 }
Azure.IoT Build 0:6ae2f7bca550 371
Azure.IoT Build 0:6ae2f7bca550 372 amqpalloc_free(data_bytes);
Azure.IoT Build 0:6ae2f7bca550 373
Azure.IoT Build 0:6ae2f7bca550 374 if (body_amqp_value != NULL)
Azure.IoT Build 0:6ae2f7bca550 375 {
Azure.IoT Build 0:6ae2f7bca550 376 amqpvalue_destroy(body_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 377 }
Azure.IoT Build 0:6ae2f7bca550 378
Azure.IoT Build 0:6ae2f7bca550 379 amqpvalue_destroy(application_properties);
Azure.IoT Build 0:6ae2f7bca550 380 amqpvalue_destroy(application_properties_value);
Azure.IoT Build 0:6ae2f7bca550 381 amqpvalue_destroy(properties_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 382 properties_destroy(properties);
Azure.IoT Build 0:6ae2f7bca550 383 }
Azure.IoT Build 0:6ae2f7bca550 384 }
Azure.IoT Build 0:6ae2f7bca550 385
Azure.IoT Build 0:6ae2f7bca550 386 return result;
Azure.IoT Build 0:6ae2f7bca550 387 }
Azure.IoT Build 0:6ae2f7bca550 388
Azure.IoT Build 0:6ae2f7bca550 389 static void send_all_pending_messages(MESSAGE_SENDER_INSTANCE* message_sender_instance)
Azure.IoT Build 0:6ae2f7bca550 390 {
Azure.IoT Build 0:6ae2f7bca550 391 size_t i;
Azure.IoT Build 0:6ae2f7bca550 392
Azure.IoT Build 0:6ae2f7bca550 393 for (i = 0; i < message_sender_instance->message_count; i++)
Azure.IoT Build 0:6ae2f7bca550 394 {
Azure.IoT Build 0:6ae2f7bca550 395 if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
Azure.IoT Build 0:6ae2f7bca550 396 {
Azure.IoT Build 0:6ae2f7bca550 397 switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message))
Azure.IoT Build 0:6ae2f7bca550 398 {
Azure.IoT Build 0:6ae2f7bca550 399 default:
Azure.IoT Build 0:6ae2f7bca550 400 case SEND_ONE_MESSAGE_ERROR:
Azure.IoT Build 0:6ae2f7bca550 401 {
Azure.IoT Build 0:6ae2f7bca550 402 ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete;
Azure.IoT Build 0:6ae2f7bca550 403 void* context = message_sender_instance->messages[i]->context;
Azure.IoT Build 0:6ae2f7bca550 404 remove_pending_message_by_index(message_sender_instance, i);
Azure.IoT Build 0:6ae2f7bca550 405
Azure.IoT Build 0:6ae2f7bca550 406 on_message_send_complete(context, MESSAGE_SEND_ERROR);
Azure.IoT Build 0:6ae2f7bca550 407 i = message_sender_instance->message_count;
Azure.IoT Build 0:6ae2f7bca550 408 break;
Azure.IoT Build 0:6ae2f7bca550 409 }
Azure.IoT Build 0:6ae2f7bca550 410 case SEND_ONE_MESSAGE_BUSY:
Azure.IoT Build 0:6ae2f7bca550 411 i = message_sender_instance->message_count + 1;
Azure.IoT Build 0:6ae2f7bca550 412 break;
Azure.IoT Build 0:6ae2f7bca550 413
Azure.IoT Build 0:6ae2f7bca550 414 case SEND_ONE_MESSAGE_OK:
Azure.IoT Build 0:6ae2f7bca550 415 break;
Azure.IoT Build 0:6ae2f7bca550 416 }
Azure.IoT Build 0:6ae2f7bca550 417
Azure.IoT Build 0:6ae2f7bca550 418 i--;
Azure.IoT Build 0:6ae2f7bca550 419 }
Azure.IoT Build 0:6ae2f7bca550 420 }
Azure.IoT Build 0:6ae2f7bca550 421 }
Azure.IoT Build 0:6ae2f7bca550 422
Azure.IoT Build 0:6ae2f7bca550 423 static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_SENDER_STATE new_state)
Azure.IoT Build 0:6ae2f7bca550 424 {
Azure.IoT Build 0:6ae2f7bca550 425 MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state;
Azure.IoT Build 0:6ae2f7bca550 426 message_sender_instance->message_sender_state = new_state;
Azure.IoT Build 0:6ae2f7bca550 427 if (message_sender_instance->on_message_sender_state_changed != NULL)
Azure.IoT Build 0:6ae2f7bca550 428 {
Azure.IoT Build 0:6ae2f7bca550 429 message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state);
Azure.IoT Build 0:6ae2f7bca550 430 }
Azure.IoT Build 0:6ae2f7bca550 431 }
Azure.IoT Build 0:6ae2f7bca550 432
Azure.IoT Build 0:6ae2f7bca550 433 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
Azure.IoT Build 0:6ae2f7bca550 434 {
Azure.IoT Build 0:6ae2f7bca550 435 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
Azure.IoT Build 0:6ae2f7bca550 436
Azure.IoT Build 0:6ae2f7bca550 437 switch (new_link_state)
Azure.IoT Build 0:6ae2f7bca550 438 {
Azure.IoT Build 0:6ae2f7bca550 439 case LINK_STATE_ATTACHED:
Azure.IoT Build 0:6ae2f7bca550 440 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
Azure.IoT Build 0:6ae2f7bca550 441 {
Azure.IoT Build 0:6ae2f7bca550 442 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN);
Azure.IoT Build 0:6ae2f7bca550 443 }
Azure.IoT Build 0:6ae2f7bca550 444 break;
Azure.IoT Build 0:6ae2f7bca550 445 case LINK_STATE_DETACHED:
Azure.IoT Build 0:6ae2f7bca550 446 if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
Azure.IoT Build 0:6ae2f7bca550 447 (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
Azure.IoT Build 0:6ae2f7bca550 448 {
Azure.IoT Build 0:6ae2f7bca550 449 /* User initiated transition, we should be good */
Azure.IoT Build 0:6ae2f7bca550 450 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
Azure.IoT Build 0:6ae2f7bca550 451 }
Azure.IoT Build 0:6ae2f7bca550 452 else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
Azure.IoT Build 0:6ae2f7bca550 453 {
Azure.IoT Build 0:6ae2f7bca550 454 /* Any other transition must be an error */
Azure.IoT Build 0:6ae2f7bca550 455 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 456 }
Azure.IoT Build 0:6ae2f7bca550 457 break;
Azure.IoT Build 0:6ae2f7bca550 458 }
Azure.IoT Build 0:6ae2f7bca550 459 }
Azure.IoT Build 0:6ae2f7bca550 460
Azure.IoT Build 0:6ae2f7bca550 461 static void on_link_flow_on(void* context)
Azure.IoT Build 0:6ae2f7bca550 462 {
Azure.IoT Build 0:6ae2f7bca550 463 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
Azure.IoT Build 0:6ae2f7bca550 464 send_all_pending_messages(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 465 }
Azure.IoT Build 0:6ae2f7bca550 466
Azure.IoT Build 0:6ae2f7bca550 467 MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context, LOGGER_LOG logger_log)
Azure.IoT Build 0:6ae2f7bca550 468 {
Azure.IoT Build 0:6ae2f7bca550 469 MESSAGE_SENDER_INSTANCE* result = amqpalloc_malloc(sizeof(MESSAGE_SENDER_INSTANCE));
Azure.IoT Build 0:6ae2f7bca550 470 if (result != NULL)
Azure.IoT Build 0:6ae2f7bca550 471 {
Azure.IoT Build 0:6ae2f7bca550 472 result->messages = NULL;
Azure.IoT Build 0:6ae2f7bca550 473 result->message_count = 0;
Azure.IoT Build 0:6ae2f7bca550 474 result->link = link;
Azure.IoT Build 0:6ae2f7bca550 475 result->on_message_sender_state_changed = on_message_sender_state_changed;
Azure.IoT Build 0:6ae2f7bca550 476 result->on_message_sender_state_changed_context = context;
Azure.IoT Build 0:6ae2f7bca550 477 result->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
Azure.IoT Build 0:6ae2f7bca550 478 result->logger_log = logger_log;
Azure.IoT Build 0:6ae2f7bca550 479 }
Azure.IoT Build 0:6ae2f7bca550 480
Azure.IoT Build 0:6ae2f7bca550 481 return result;
Azure.IoT Build 0:6ae2f7bca550 482 }
Azure.IoT Build 0:6ae2f7bca550 483
Azure.IoT Build 0:6ae2f7bca550 484 void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 485 {
Azure.IoT Build 0:6ae2f7bca550 486 if (message_sender != NULL)
Azure.IoT Build 0:6ae2f7bca550 487 {
Azure.IoT Build 0:6ae2f7bca550 488 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 489 size_t i;
Azure.IoT Build 0:6ae2f7bca550 490
Azure.IoT Build 0:6ae2f7bca550 491 messagesender_close(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 492
Azure.IoT Build 0:6ae2f7bca550 493 for (i = 0; i < message_sender_instance->message_count; i++)
Azure.IoT Build 0:6ae2f7bca550 494 {
Azure.IoT Build 0:6ae2f7bca550 495 if (message_sender_instance->messages[i]->on_message_send_complete != NULL)
Azure.IoT Build 0:6ae2f7bca550 496 {
Azure.IoT Build 0:6ae2f7bca550 497 message_sender_instance->messages[i]->on_message_send_complete(message_sender_instance->messages[i]->context, MESSAGE_SEND_ERROR);
Azure.IoT Build 0:6ae2f7bca550 498 }
Azure.IoT Build 0:6ae2f7bca550 499
Azure.IoT Build 0:6ae2f7bca550 500 message_destroy(message_sender_instance->messages[i]->message);
Azure.IoT Build 0:6ae2f7bca550 501 amqpalloc_free(message_sender_instance->messages[i]);
Azure.IoT Build 0:6ae2f7bca550 502 }
Azure.IoT Build 0:6ae2f7bca550 503
Azure.IoT Build 0:6ae2f7bca550 504 if (message_sender_instance->messages != NULL)
Azure.IoT Build 0:6ae2f7bca550 505 {
Azure.IoT Build 0:6ae2f7bca550 506 amqpalloc_free(message_sender_instance->messages);
Azure.IoT Build 0:6ae2f7bca550 507 }
Azure.IoT Build 0:6ae2f7bca550 508
Azure.IoT Build 0:6ae2f7bca550 509 amqpalloc_free(message_sender);
Azure.IoT Build 0:6ae2f7bca550 510 }
Azure.IoT Build 0:6ae2f7bca550 511 }
Azure.IoT Build 0:6ae2f7bca550 512
Azure.IoT Build 0:6ae2f7bca550 513 int messagesender_open(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 514 {
Azure.IoT Build 0:6ae2f7bca550 515 int result;
Azure.IoT Build 0:6ae2f7bca550 516
Azure.IoT Build 0:6ae2f7bca550 517 if (message_sender == NULL)
Azure.IoT Build 0:6ae2f7bca550 518 {
Azure.IoT Build 0:6ae2f7bca550 519 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 520 }
Azure.IoT Build 0:6ae2f7bca550 521 else
Azure.IoT Build 0:6ae2f7bca550 522 {
Azure.IoT Build 0:6ae2f7bca550 523 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 524
Azure.IoT Build 0:6ae2f7bca550 525 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
Azure.IoT Build 0:6ae2f7bca550 526 {
Azure.IoT Build 0:6ae2f7bca550 527 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING);
Azure.IoT Build 0:6ae2f7bca550 528 if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0)
Azure.IoT Build 0:6ae2f7bca550 529 {
Azure.IoT Build 0:6ae2f7bca550 530 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 531 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 532 }
Azure.IoT Build 0:6ae2f7bca550 533 else
Azure.IoT Build 0:6ae2f7bca550 534 {
Azure.IoT Build 0:6ae2f7bca550 535 result = 0;
Azure.IoT Build 0:6ae2f7bca550 536 }
Azure.IoT Build 0:6ae2f7bca550 537 }
Azure.IoT Build 0:6ae2f7bca550 538 else
Azure.IoT Build 0:6ae2f7bca550 539 {
Azure.IoT Build 0:6ae2f7bca550 540 result = 0;
Azure.IoT Build 0:6ae2f7bca550 541 }
Azure.IoT Build 0:6ae2f7bca550 542 }
Azure.IoT Build 0:6ae2f7bca550 543
Azure.IoT Build 0:6ae2f7bca550 544 return result;
Azure.IoT Build 0:6ae2f7bca550 545 }
Azure.IoT Build 0:6ae2f7bca550 546
Azure.IoT Build 0:6ae2f7bca550 547 int messagesender_close(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 548 {
Azure.IoT Build 0:6ae2f7bca550 549 int result;
Azure.IoT Build 0:6ae2f7bca550 550
Azure.IoT Build 0:6ae2f7bca550 551 if (message_sender == NULL)
Azure.IoT Build 0:6ae2f7bca550 552 {
Azure.IoT Build 0:6ae2f7bca550 553 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 554 }
Azure.IoT Build 0:6ae2f7bca550 555 else
Azure.IoT Build 0:6ae2f7bca550 556 {
Azure.IoT Build 0:6ae2f7bca550 557 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 558
Azure.IoT Build 0:6ae2f7bca550 559 if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
Azure.IoT Build 0:6ae2f7bca550 560 (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
Azure.IoT Build 0:6ae2f7bca550 561 {
Azure.IoT Build 0:6ae2f7bca550 562 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING);
Azure.IoT Build 0:6ae2f7bca550 563 if (link_detach(message_sender_instance->link) != 0)
Azure.IoT Build 0:6ae2f7bca550 564 {
Azure.IoT Build 0:6ae2f7bca550 565 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 566 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 567 }
Azure.IoT Build 0:6ae2f7bca550 568 else
Azure.IoT Build 0:6ae2f7bca550 569 {
Azure.IoT Build 0:6ae2f7bca550 570 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
Azure.IoT Build 0:6ae2f7bca550 571 result = 0;
Azure.IoT Build 0:6ae2f7bca550 572 }
Azure.IoT Build 0:6ae2f7bca550 573 }
Azure.IoT Build 0:6ae2f7bca550 574 else
Azure.IoT Build 0:6ae2f7bca550 575 {
Azure.IoT Build 0:6ae2f7bca550 576 result = 0;
Azure.IoT Build 0:6ae2f7bca550 577 }
Azure.IoT Build 0:6ae2f7bca550 578 }
Azure.IoT Build 0:6ae2f7bca550 579
Azure.IoT Build 0:6ae2f7bca550 580 return result;
Azure.IoT Build 0:6ae2f7bca550 581 }
Azure.IoT Build 0:6ae2f7bca550 582
Azure.IoT Build 0:6ae2f7bca550 583 int messagesender_send(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context)
Azure.IoT Build 0:6ae2f7bca550 584 {
Azure.IoT Build 0:6ae2f7bca550 585 int result;
Azure.IoT Build 0:6ae2f7bca550 586
Azure.IoT Build 0:6ae2f7bca550 587 if ((message_sender == NULL) ||
Azure.IoT Build 0:6ae2f7bca550 588 (message == NULL))
Azure.IoT Build 0:6ae2f7bca550 589 {
Azure.IoT Build 0:6ae2f7bca550 590 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 591 }
Azure.IoT Build 0:6ae2f7bca550 592 else
Azure.IoT Build 0:6ae2f7bca550 593 {
Azure.IoT Build 0:6ae2f7bca550 594 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 595 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
Azure.IoT Build 0:6ae2f7bca550 596 {
Azure.IoT Build 0:6ae2f7bca550 597 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 598 }
Azure.IoT Build 0:6ae2f7bca550 599 else
Azure.IoT Build 0:6ae2f7bca550 600 {
Azure.IoT Build 0:6ae2f7bca550 601 MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)amqpalloc_malloc(sizeof(MESSAGE_WITH_CALLBACK));
Azure.IoT Build 0:6ae2f7bca550 602 if (message_with_callback == NULL)
Azure.IoT Build 0:6ae2f7bca550 603 {
Azure.IoT Build 0:6ae2f7bca550 604 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 605 }
Azure.IoT Build 0:6ae2f7bca550 606 else
Azure.IoT Build 0:6ae2f7bca550 607 {
Azure.IoT Build 0:6ae2f7bca550 608 MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1));
Azure.IoT Build 0:6ae2f7bca550 609 if (new_messages == NULL)
Azure.IoT Build 0:6ae2f7bca550 610 {
Azure.IoT Build 0:6ae2f7bca550 611 amqpalloc_free(message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 612 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 613 }
Azure.IoT Build 0:6ae2f7bca550 614 else
Azure.IoT Build 0:6ae2f7bca550 615 {
Azure.IoT Build 0:6ae2f7bca550 616 result = 0;
Azure.IoT Build 0:6ae2f7bca550 617
Azure.IoT Build 0:6ae2f7bca550 618 message_sender_instance->messages = new_messages;
Azure.IoT Build 0:6ae2f7bca550 619 if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
Azure.IoT Build 0:6ae2f7bca550 620 {
Azure.IoT Build 0:6ae2f7bca550 621 message_with_callback->message = message_clone(message);
Azure.IoT Build 0:6ae2f7bca550 622 if (message_with_callback->message == NULL)
Azure.IoT Build 0:6ae2f7bca550 623 {
Azure.IoT Build 0:6ae2f7bca550 624 amqpalloc_free(message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 625 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 626 }
Azure.IoT Build 0:6ae2f7bca550 627
Azure.IoT Build 0:6ae2f7bca550 628 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
Azure.IoT Build 0:6ae2f7bca550 629 }
Azure.IoT Build 0:6ae2f7bca550 630 else
Azure.IoT Build 0:6ae2f7bca550 631 {
Azure.IoT Build 0:6ae2f7bca550 632 message_with_callback->message = NULL;
Azure.IoT Build 0:6ae2f7bca550 633 message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
Azure.IoT Build 0:6ae2f7bca550 634 }
Azure.IoT Build 0:6ae2f7bca550 635
Azure.IoT Build 0:6ae2f7bca550 636 if (result == 0)
Azure.IoT Build 0:6ae2f7bca550 637 {
Azure.IoT Build 0:6ae2f7bca550 638 message_with_callback->on_message_send_complete = on_message_send_complete;
Azure.IoT Build 0:6ae2f7bca550 639 message_with_callback->context = callback_context;
Azure.IoT Build 0:6ae2f7bca550 640 message_with_callback->message_sender = message_sender_instance;
Azure.IoT Build 0:6ae2f7bca550 641
Azure.IoT Build 0:6ae2f7bca550 642 message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback;
Azure.IoT Build 0:6ae2f7bca550 643 message_sender_instance->message_count++;
Azure.IoT Build 0:6ae2f7bca550 644
Azure.IoT Build 0:6ae2f7bca550 645 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
Azure.IoT Build 0:6ae2f7bca550 646 {
Azure.IoT Build 0:6ae2f7bca550 647 switch (send_one_message(message_sender_instance, message_with_callback, message))
Azure.IoT Build 0:6ae2f7bca550 648 {
Azure.IoT Build 0:6ae2f7bca550 649 default:
Azure.IoT Build 0:6ae2f7bca550 650 case SEND_ONE_MESSAGE_ERROR:
Azure.IoT Build 0:6ae2f7bca550 651 remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
Azure.IoT Build 0:6ae2f7bca550 652 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 653 break;
Azure.IoT Build 0:6ae2f7bca550 654
Azure.IoT Build 0:6ae2f7bca550 655 case SEND_ONE_MESSAGE_BUSY:
Azure.IoT Build 0:6ae2f7bca550 656 message_with_callback->message = message_clone(message);
Azure.IoT Build 0:6ae2f7bca550 657 if (message_with_callback->message == NULL)
Azure.IoT Build 0:6ae2f7bca550 658 {
Azure.IoT Build 0:6ae2f7bca550 659 amqpalloc_free(message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 660 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 661 }
Azure.IoT Build 0:6ae2f7bca550 662 else
Azure.IoT Build 0:6ae2f7bca550 663 {
Azure.IoT Build 0:6ae2f7bca550 664 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
Azure.IoT Build 0:6ae2f7bca550 665 result = 0;
Azure.IoT Build 0:6ae2f7bca550 666 }
Azure.IoT Build 0:6ae2f7bca550 667
Azure.IoT Build 0:6ae2f7bca550 668 break;
Azure.IoT Build 0:6ae2f7bca550 669
Azure.IoT Build 0:6ae2f7bca550 670 case SEND_ONE_MESSAGE_OK:
Azure.IoT Build 0:6ae2f7bca550 671 result = 0;
Azure.IoT Build 0:6ae2f7bca550 672 break;
Azure.IoT Build 0:6ae2f7bca550 673 }
Azure.IoT Build 0:6ae2f7bca550 674 }
Azure.IoT Build 0:6ae2f7bca550 675 }
Azure.IoT Build 0:6ae2f7bca550 676 }
Azure.IoT Build 0:6ae2f7bca550 677 }
Azure.IoT Build 0:6ae2f7bca550 678 }
Azure.IoT Build 0:6ae2f7bca550 679 }
Azure.IoT Build 0:6ae2f7bca550 680
Azure.IoT Build 0:6ae2f7bca550 681 return result;
Azure.IoT Build 0:6ae2f7bca550 682 }