A small memory footprint AMQP implementation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Committer:
AzureIoTClient
Date:
Mon May 09 14:37:14 2016 -0700
Revision:
2:64b4feb67cd3
Parent:
1:eab586236bfe
Child:
5:ae49385aff34
1.0.6

Who changed what in which revision?

User | Revision | Line number | New contents of line
Azure.IoT Build 0:6ae2f7bca550 1 // Copyright (c) Microsoft. All rights reserved.
Azure.IoT Build 0:6ae2f7bca550 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
Azure.IoT Build 0:6ae2f7bca550 3
Azure.IoT Build 0:6ae2f7bca550 4 #include <stdlib.h>
Azure.IoT Build 0:6ae2f7bca550 5 #ifdef _CRTDBG_MAP_ALLOC
Azure.IoT Build 0:6ae2f7bca550 6 #include <crtdbg.h>
Azure.IoT Build 0:6ae2f7bca550 7 #endif
Azure.IoT Build 0:6ae2f7bca550 8 #include <string.h>
Azure.IoT Build 0:6ae2f7bca550 9 #include "azure_uamqp_c/message_sender.h"
Azure.IoT Build 0:6ae2f7bca550 10 #include "azure_uamqp_c/amqpalloc.h"
Azure.IoT Build 0:6ae2f7bca550 11 #include "azure_c_shared_utility/xlogging.h"
Azure.IoT Build 0:6ae2f7bca550 12 #include "azure_uamqp_c/amqpvalue_to_string.h"
Azure.IoT Build 0:6ae2f7bca550 13
/* Transfer progress of a single queued message. */
typedef enum MESSAGE_SEND_STATE_TAG
{
    /* Queued; send_all_pending_messages() will still try to transfer it. */
    MESSAGE_SEND_STATE_NOT_SENT = 0,
    /* Handed to link_transfer(); set by send_one_message() before the transfer call. */
    MESSAGE_SEND_STATE_PENDING = 1
} MESSAGE_SEND_STATE;
Azure.IoT Build 0:6ae2f7bca550 19
/* Outcome of one send_one_message() attempt. */
typedef enum SEND_ONE_MESSAGE_RESULT_TAG
{
    /* The message was encoded and accepted by link_transfer(). */
    SEND_ONE_MESSAGE_OK = 0,
    /* Encoding or the transfer itself failed. */
    SEND_ONE_MESSAGE_ERROR = 1,
    /* link_transfer() reported LINK_TRANSFER_BUSY; the message stays NOT_SENT. */
    SEND_ONE_MESSAGE_BUSY = 2
} SEND_ONE_MESSAGE_RESULT;
Azure.IoT Build 0:6ae2f7bca550 26
Azure.IoT Build 0:6ae2f7bca550 27 typedef struct MESSAGE_WITH_CALLBACK_TAG
Azure.IoT Build 0:6ae2f7bca550 28 {
Azure.IoT Build 0:6ae2f7bca550 29 MESSAGE_HANDLE message;
Azure.IoT Build 0:6ae2f7bca550 30 ON_MESSAGE_SEND_COMPLETE on_message_send_complete;
Azure.IoT Build 0:6ae2f7bca550 31 void* context;
Azure.IoT Build 0:6ae2f7bca550 32 MESSAGE_SENDER_HANDLE message_sender;
Azure.IoT Build 0:6ae2f7bca550 33 MESSAGE_SEND_STATE message_send_state;
Azure.IoT Build 0:6ae2f7bca550 34 } MESSAGE_WITH_CALLBACK;
Azure.IoT Build 0:6ae2f7bca550 35
Azure.IoT Build 0:6ae2f7bca550 36 typedef struct MESSAGE_SENDER_INSTANCE_TAG
Azure.IoT Build 0:6ae2f7bca550 37 {
Azure.IoT Build 0:6ae2f7bca550 38 LINK_HANDLE link;
Azure.IoT Build 0:6ae2f7bca550 39 size_t message_count;
Azure.IoT Build 0:6ae2f7bca550 40 MESSAGE_WITH_CALLBACK** messages;
Azure.IoT Build 0:6ae2f7bca550 41 MESSAGE_SENDER_STATE message_sender_state;
Azure.IoT Build 0:6ae2f7bca550 42 ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
Azure.IoT Build 0:6ae2f7bca550 43 void* on_message_sender_state_changed_context;
Azure.IoT Build 0:6ae2f7bca550 44 LOGGER_LOG logger_log;
Azure.IoT Build 0:6ae2f7bca550 45 } MESSAGE_SENDER_INSTANCE;
Azure.IoT Build 0:6ae2f7bca550 46
Azure.IoT Build 0:6ae2f7bca550 47 static void remove_pending_message_by_index(MESSAGE_SENDER_INSTANCE* message_sender_instance, size_t index)
Azure.IoT Build 0:6ae2f7bca550 48 {
Azure.IoT Build 0:6ae2f7bca550 49 MESSAGE_WITH_CALLBACK** new_messages;
Azure.IoT Build 0:6ae2f7bca550 50
Azure.IoT Build 0:6ae2f7bca550 51 if (message_sender_instance->messages[index]->message != NULL)
Azure.IoT Build 0:6ae2f7bca550 52 {
Azure.IoT Build 0:6ae2f7bca550 53 message_destroy(message_sender_instance->messages[index]->message);
Azure.IoT Build 0:6ae2f7bca550 54 message_sender_instance->messages[index]->message = NULL;
Azure.IoT Build 0:6ae2f7bca550 55 }
Azure.IoT Build 0:6ae2f7bca550 56
Azure.IoT Build 0:6ae2f7bca550 57 amqpalloc_free(message_sender_instance->messages[index]);
Azure.IoT Build 0:6ae2f7bca550 58
Azure.IoT Build 0:6ae2f7bca550 59 if (message_sender_instance->message_count - index > 1)
Azure.IoT Build 0:6ae2f7bca550 60 {
Azure.IoT Build 0:6ae2f7bca550 61 (void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1));
Azure.IoT Build 0:6ae2f7bca550 62 }
Azure.IoT Build 0:6ae2f7bca550 63
Azure.IoT Build 0:6ae2f7bca550 64 message_sender_instance->message_count--;
Azure.IoT Build 0:6ae2f7bca550 65
Azure.IoT Build 0:6ae2f7bca550 66 if (message_sender_instance->message_count > 0)
Azure.IoT Build 0:6ae2f7bca550 67 {
Azure.IoT Build 0:6ae2f7bca550 68 new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count));
Azure.IoT Build 0:6ae2f7bca550 69 if (new_messages != NULL)
Azure.IoT Build 0:6ae2f7bca550 70 {
Azure.IoT Build 0:6ae2f7bca550 71 message_sender_instance->messages = new_messages;
Azure.IoT Build 0:6ae2f7bca550 72 }
Azure.IoT Build 0:6ae2f7bca550 73 }
Azure.IoT Build 0:6ae2f7bca550 74 else
Azure.IoT Build 0:6ae2f7bca550 75 {
Azure.IoT Build 0:6ae2f7bca550 76 amqpalloc_free(message_sender_instance->messages);
Azure.IoT Build 0:6ae2f7bca550 77 message_sender_instance->messages = NULL;
Azure.IoT Build 0:6ae2f7bca550 78 }
Azure.IoT Build 0:6ae2f7bca550 79 }
Azure.IoT Build 0:6ae2f7bca550 80
Azure.IoT Build 0:6ae2f7bca550 81 static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback)
Azure.IoT Build 0:6ae2f7bca550 82 {
Azure.IoT Build 0:6ae2f7bca550 83 size_t i;
Azure.IoT Build 0:6ae2f7bca550 84
Azure.IoT Build 0:6ae2f7bca550 85 for (i = 0; i < message_sender_instance->message_count; i++)
Azure.IoT Build 0:6ae2f7bca550 86 {
Azure.IoT Build 0:6ae2f7bca550 87 if (message_sender_instance->messages[i] == message_with_callback)
Azure.IoT Build 0:6ae2f7bca550 88 {
Azure.IoT Build 0:6ae2f7bca550 89 remove_pending_message_by_index(message_sender_instance, i);
Azure.IoT Build 0:6ae2f7bca550 90 break;
Azure.IoT Build 0:6ae2f7bca550 91 }
Azure.IoT Build 0:6ae2f7bca550 92 }
Azure.IoT Build 0:6ae2f7bca550 93 }
Azure.IoT Build 0:6ae2f7bca550 94
Azure.IoT Build 0:6ae2f7bca550 95 static void on_delivery_settled(void* context, delivery_number delivery_no)
Azure.IoT Build 0:6ae2f7bca550 96 {
Azure.IoT Build 0:6ae2f7bca550 97 MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
Azure.IoT Build 0:6ae2f7bca550 98 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
Azure.IoT Build 0:6ae2f7bca550 99
Azure.IoT Build 0:6ae2f7bca550 100 if (message_with_callback->on_message_send_complete != NULL)
Azure.IoT Build 0:6ae2f7bca550 101 {
Azure.IoT Build 0:6ae2f7bca550 102 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
Azure.IoT Build 0:6ae2f7bca550 103 }
Azure.IoT Build 0:6ae2f7bca550 104
Azure.IoT Build 0:6ae2f7bca550 105 remove_pending_message(message_sender_instance, message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 106 }
Azure.IoT Build 0:6ae2f7bca550 107
Azure.IoT Build 0:6ae2f7bca550 108 static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
Azure.IoT Build 0:6ae2f7bca550 109 {
Azure.IoT Build 0:6ae2f7bca550 110 PAYLOAD* payload = (PAYLOAD*)context;
Azure.IoT Build 0:6ae2f7bca550 111 (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
Azure.IoT Build 0:6ae2f7bca550 112 payload->length += length;
Azure.IoT Build 0:6ae2f7bca550 113 return 0;
Azure.IoT Build 0:6ae2f7bca550 114 }
Azure.IoT Build 0:6ae2f7bca550 115
Azure.IoT Build 0:6ae2f7bca550 116 static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender_instance, const char* name, AMQP_VALUE value)
Azure.IoT Build 0:6ae2f7bca550 117 {
Azure.IoT Build 0:6ae2f7bca550 118 if (message_sender_instance->logger_log != NULL)
Azure.IoT Build 0:6ae2f7bca550 119 {
Azure.IoT Build 0:6ae2f7bca550 120 char* value_as_string = NULL;
Azure.IoT Build 0:6ae2f7bca550 121 LOG(message_sender_instance->logger_log, 0, "%s", name);
Azure.IoT Build 0:6ae2f7bca550 122 LOG(message_sender_instance->logger_log, 0, "%s", (value_as_string = amqpvalue_to_string(value)));
Azure.IoT Build 0:6ae2f7bca550 123 if (value_as_string != NULL)
Azure.IoT Build 0:6ae2f7bca550 124 {
Azure.IoT Build 0:6ae2f7bca550 125 amqpalloc_free(value_as_string);
Azure.IoT Build 0:6ae2f7bca550 126 }
Azure.IoT Build 0:6ae2f7bca550 127 }
Azure.IoT Build 0:6ae2f7bca550 128 }
Azure.IoT Build 0:6ae2f7bca550 129
Azure.IoT Build 0:6ae2f7bca550 130 static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback, MESSAGE_HANDLE message)
Azure.IoT Build 0:6ae2f7bca550 131 {
Azure.IoT Build 0:6ae2f7bca550 132 SEND_ONE_MESSAGE_RESULT result;
Azure.IoT Build 0:6ae2f7bca550 133
Azure.IoT Build 0:6ae2f7bca550 134 size_t encoded_size;
Azure.IoT Build 0:6ae2f7bca550 135 size_t total_encoded_size = 0;
Azure.IoT Build 0:6ae2f7bca550 136 MESSAGE_BODY_TYPE message_body_type;
Azure.IoT Build 0:6ae2f7bca550 137 message_format message_format;
Azure.IoT Build 0:6ae2f7bca550 138
Azure.IoT Build 0:6ae2f7bca550 139 if ((message_get_body_type(message, &message_body_type) != 0) ||
Azure.IoT Build 0:6ae2f7bca550 140 (message_get_message_format(message, &message_format) != 0))
Azure.IoT Build 0:6ae2f7bca550 141 {
Azure.IoT Build 0:6ae2f7bca550 142 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 143 }
Azure.IoT Build 0:6ae2f7bca550 144 else
Azure.IoT Build 0:6ae2f7bca550 145 {
Azure.IoT Build 0:6ae2f7bca550 146 // header
Azure.IoT Build 0:6ae2f7bca550 147 HEADER_HANDLE header;
Azure.IoT Build 0:6ae2f7bca550 148 AMQP_VALUE header_amqp_value;
Azure.IoT Build 0:6ae2f7bca550 149 PROPERTIES_HANDLE properties;
Azure.IoT Build 0:6ae2f7bca550 150 AMQP_VALUE properties_amqp_value;
Azure.IoT Build 0:6ae2f7bca550 151 AMQP_VALUE application_properties;
Azure.IoT Build 0:6ae2f7bca550 152 AMQP_VALUE application_properties_value;
Azure.IoT Build 0:6ae2f7bca550 153 AMQP_VALUE body_amqp_value = NULL;
Azure.IoT Build 0:6ae2f7bca550 154 size_t body_data_count;
Azure.IoT Build 0:6ae2f7bca550 155
Azure.IoT Build 0:6ae2f7bca550 156 message_get_header(message, &header);
Azure.IoT Build 0:6ae2f7bca550 157 header_amqp_value = amqpvalue_create_header(header);
Azure.IoT Build 0:6ae2f7bca550 158 if (header != NULL)
Azure.IoT Build 0:6ae2f7bca550 159 {
Azure.IoT Build 0:6ae2f7bca550 160 amqpvalue_get_encoded_size(header_amqp_value, &encoded_size);
Azure.IoT Build 0:6ae2f7bca550 161 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 162 }
Azure.IoT Build 0:6ae2f7bca550 163
Azure.IoT Build 0:6ae2f7bca550 164 // properties
Azure.IoT Build 0:6ae2f7bca550 165 message_get_properties(message, &properties);
Azure.IoT Build 0:6ae2f7bca550 166 properties_amqp_value = amqpvalue_create_properties(properties);
Azure.IoT Build 0:6ae2f7bca550 167 if (properties != NULL)
Azure.IoT Build 0:6ae2f7bca550 168 {
Azure.IoT Build 0:6ae2f7bca550 169 amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size);
Azure.IoT Build 0:6ae2f7bca550 170 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 171 }
Azure.IoT Build 0:6ae2f7bca550 172
Azure.IoT Build 0:6ae2f7bca550 173 // application properties
Azure.IoT Build 0:6ae2f7bca550 174 message_get_application_properties(message, &application_properties);
Azure.IoT Build 0:6ae2f7bca550 175 application_properties_value = amqpvalue_create_application_properties(application_properties);
Azure.IoT Build 0:6ae2f7bca550 176 if (application_properties != NULL)
Azure.IoT Build 0:6ae2f7bca550 177 {
Azure.IoT Build 0:6ae2f7bca550 178 amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
Azure.IoT Build 0:6ae2f7bca550 179 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 180 }
Azure.IoT Build 0:6ae2f7bca550 181
Azure.IoT Build 0:6ae2f7bca550 182 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 183
Azure.IoT Build 0:6ae2f7bca550 184 // body - amqp data
Azure.IoT Build 0:6ae2f7bca550 185 switch (message_body_type)
Azure.IoT Build 0:6ae2f7bca550 186 {
Azure.IoT Build 0:6ae2f7bca550 187 default:
Azure.IoT Build 0:6ae2f7bca550 188 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 189 break;
Azure.IoT Build 0:6ae2f7bca550 190
Azure.IoT Build 0:6ae2f7bca550 191 case MESSAGE_BODY_TYPE_VALUE:
Azure.IoT Build 0:6ae2f7bca550 192 {
Azure.IoT Build 0:6ae2f7bca550 193 AMQP_VALUE message_body_amqp_value;
Azure.IoT Build 0:6ae2f7bca550 194 if (message_get_inplace_body_amqp_value(message, &message_body_amqp_value) != 0)
Azure.IoT Build 0:6ae2f7bca550 195 {
Azure.IoT Build 0:6ae2f7bca550 196 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 197 }
Azure.IoT Build 0:6ae2f7bca550 198 else
Azure.IoT Build 0:6ae2f7bca550 199 {
Azure.IoT Build 0:6ae2f7bca550 200 body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 201 if ((body_amqp_value == NULL) ||
Azure.IoT Build 0:6ae2f7bca550 202 (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0))
Azure.IoT Build 0:6ae2f7bca550 203 {
Azure.IoT Build 0:6ae2f7bca550 204 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 205 }
Azure.IoT Build 0:6ae2f7bca550 206 else
Azure.IoT Build 0:6ae2f7bca550 207 {
Azure.IoT Build 0:6ae2f7bca550 208 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 209 }
Azure.IoT Build 0:6ae2f7bca550 210 }
Azure.IoT Build 0:6ae2f7bca550 211
Azure.IoT Build 0:6ae2f7bca550 212 break;
Azure.IoT Build 0:6ae2f7bca550 213 }
Azure.IoT Build 0:6ae2f7bca550 214
Azure.IoT Build 0:6ae2f7bca550 215 case MESSAGE_BODY_TYPE_DATA:
Azure.IoT Build 0:6ae2f7bca550 216 {
Azure.IoT Build 0:6ae2f7bca550 217 BINARY_DATA binary_data;
Azure.IoT Build 0:6ae2f7bca550 218 size_t i;
Azure.IoT Build 0:6ae2f7bca550 219
Azure.IoT Build 0:6ae2f7bca550 220 if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
Azure.IoT Build 0:6ae2f7bca550 221 {
Azure.IoT Build 0:6ae2f7bca550 222 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 223 }
Azure.IoT Build 0:6ae2f7bca550 224 else
Azure.IoT Build 0:6ae2f7bca550 225 {
Azure.IoT Build 0:6ae2f7bca550 226 for (i = 0; i < body_data_count; i++)
Azure.IoT Build 0:6ae2f7bca550 227 {
Azure.IoT Build 0:6ae2f7bca550 228 if (message_get_body_amqp_data(message, i, &binary_data) != 0)
Azure.IoT Build 0:6ae2f7bca550 229 {
Azure.IoT Build 0:6ae2f7bca550 230 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 231 }
Azure.IoT Build 0:6ae2f7bca550 232 else
Azure.IoT Build 0:6ae2f7bca550 233 {
Azure.IoT Build 0:6ae2f7bca550 234 amqp_binary binary_value = { binary_data.bytes, binary_data.length };
Azure.IoT Build 0:6ae2f7bca550 235 AMQP_VALUE body_amqp_data = amqpvalue_create_data(binary_value);
Azure.IoT Build 0:6ae2f7bca550 236 if (body_amqp_data == NULL)
Azure.IoT Build 0:6ae2f7bca550 237 {
Azure.IoT Build 0:6ae2f7bca550 238 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 239 }
Azure.IoT Build 0:6ae2f7bca550 240 else
Azure.IoT Build 0:6ae2f7bca550 241 {
Azure.IoT Build 0:6ae2f7bca550 242 if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
Azure.IoT Build 0:6ae2f7bca550 243 {
Azure.IoT Build 0:6ae2f7bca550 244 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 245 }
Azure.IoT Build 0:6ae2f7bca550 246 else
Azure.IoT Build 0:6ae2f7bca550 247 {
Azure.IoT Build 0:6ae2f7bca550 248 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 249 }
Azure.IoT Build 0:6ae2f7bca550 250
Azure.IoT Build 0:6ae2f7bca550 251 amqpvalue_destroy(body_amqp_data);
Azure.IoT Build 0:6ae2f7bca550 252 }
Azure.IoT Build 0:6ae2f7bca550 253 }
Azure.IoT Build 0:6ae2f7bca550 254 }
Azure.IoT Build 0:6ae2f7bca550 255 }
Azure.IoT Build 0:6ae2f7bca550 256 break;
Azure.IoT Build 0:6ae2f7bca550 257 }
Azure.IoT Build 0:6ae2f7bca550 258 }
Azure.IoT Build 0:6ae2f7bca550 259
Azure.IoT Build 0:6ae2f7bca550 260 if (result == 0)
Azure.IoT Build 0:6ae2f7bca550 261 {
Azure.IoT Build 0:6ae2f7bca550 262 void* data_bytes = amqpalloc_malloc(total_encoded_size);
Azure.IoT Build 0:6ae2f7bca550 263 PAYLOAD payload = { data_bytes, 0 };
Azure.IoT Build 0:6ae2f7bca550 264 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 265
Azure.IoT Build 0:6ae2f7bca550 266 if (header != NULL)
Azure.IoT Build 0:6ae2f7bca550 267 {
Azure.IoT Build 0:6ae2f7bca550 268 if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 269 {
Azure.IoT Build 0:6ae2f7bca550 270 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 271 }
Azure.IoT Build 0:6ae2f7bca550 272
Azure.IoT Build 0:6ae2f7bca550 273 log_message_chunk(message_sender_instance, "Header:", header_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 274 }
Azure.IoT Build 0:6ae2f7bca550 275
Azure.IoT Build 0:6ae2f7bca550 276 if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
Azure.IoT Build 0:6ae2f7bca550 277 {
Azure.IoT Build 0:6ae2f7bca550 278 if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 279 {
Azure.IoT Build 0:6ae2f7bca550 280 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 281 }
Azure.IoT Build 0:6ae2f7bca550 282
Azure.IoT Build 0:6ae2f7bca550 283 log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 284 }
Azure.IoT Build 0:6ae2f7bca550 285
Azure.IoT Build 0:6ae2f7bca550 286 if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
Azure.IoT Build 0:6ae2f7bca550 287 {
Azure.IoT Build 0:6ae2f7bca550 288 if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 289 {
Azure.IoT Build 0:6ae2f7bca550 290 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 291 }
Azure.IoT Build 0:6ae2f7bca550 292
Azure.IoT Build 0:6ae2f7bca550 293 log_message_chunk(message_sender_instance, "Application properties:", application_properties_value);
Azure.IoT Build 0:6ae2f7bca550 294 }
Azure.IoT Build 0:6ae2f7bca550 295
Azure.IoT Build 0:6ae2f7bca550 296 if (result == SEND_ONE_MESSAGE_OK)
Azure.IoT Build 0:6ae2f7bca550 297 {
Azure.IoT Build 0:6ae2f7bca550 298 switch (message_body_type)
Azure.IoT Build 0:6ae2f7bca550 299 {
Azure.IoT Build 0:6ae2f7bca550 300 case MESSAGE_BODY_TYPE_VALUE:
Azure.IoT Build 0:6ae2f7bca550 301 {
Azure.IoT Build 0:6ae2f7bca550 302 if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 303 {
Azure.IoT Build 0:6ae2f7bca550 304 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 305 }
Azure.IoT Build 0:6ae2f7bca550 306
Azure.IoT Build 0:6ae2f7bca550 307 log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 308 break;
Azure.IoT Build 0:6ae2f7bca550 309 }
Azure.IoT Build 0:6ae2f7bca550 310 case MESSAGE_BODY_TYPE_DATA:
Azure.IoT Build 0:6ae2f7bca550 311 {
Azure.IoT Build 0:6ae2f7bca550 312 BINARY_DATA binary_data;
Azure.IoT Build 0:6ae2f7bca550 313 size_t i;
Azure.IoT Build 0:6ae2f7bca550 314
Azure.IoT Build 0:6ae2f7bca550 315 for (i = 0; i < body_data_count; i++)
Azure.IoT Build 0:6ae2f7bca550 316 {
Azure.IoT Build 0:6ae2f7bca550 317 if (message_get_body_amqp_data(message, i, &binary_data) != 0)
Azure.IoT Build 0:6ae2f7bca550 318 {
Azure.IoT Build 0:6ae2f7bca550 319 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 320 }
Azure.IoT Build 0:6ae2f7bca550 321 else
Azure.IoT Build 0:6ae2f7bca550 322 {
Azure.IoT Build 0:6ae2f7bca550 323 amqp_binary binary_value = { binary_data.bytes, binary_data.length };
Azure.IoT Build 0:6ae2f7bca550 324 AMQP_VALUE body_amqp_data = amqpvalue_create_data(binary_value);
Azure.IoT Build 0:6ae2f7bca550 325 if (body_amqp_data == NULL)
Azure.IoT Build 0:6ae2f7bca550 326 {
Azure.IoT Build 0:6ae2f7bca550 327 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 328 }
Azure.IoT Build 0:6ae2f7bca550 329 else
Azure.IoT Build 0:6ae2f7bca550 330 {
Azure.IoT Build 0:6ae2f7bca550 331 if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 332 {
Azure.IoT Build 0:6ae2f7bca550 333 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 334 break;
Azure.IoT Build 0:6ae2f7bca550 335 }
Azure.IoT Build 0:6ae2f7bca550 336
Azure.IoT Build 0:6ae2f7bca550 337 amqpvalue_destroy(body_amqp_data);
Azure.IoT Build 0:6ae2f7bca550 338 }
Azure.IoT Build 0:6ae2f7bca550 339 }
Azure.IoT Build 0:6ae2f7bca550 340 }
Azure.IoT Build 0:6ae2f7bca550 341 break;
Azure.IoT Build 0:6ae2f7bca550 342 }
Azure.IoT Build 0:6ae2f7bca550 343 }
Azure.IoT Build 0:6ae2f7bca550 344 }
Azure.IoT Build 0:6ae2f7bca550 345
Azure.IoT Build 0:6ae2f7bca550 346 if (result == SEND_ONE_MESSAGE_OK)
Azure.IoT Build 0:6ae2f7bca550 347 {
Azure.IoT Build 0:6ae2f7bca550 348 message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
Azure.IoT Build 0:6ae2f7bca550 349 switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback))
Azure.IoT Build 0:6ae2f7bca550 350 {
Azure.IoT Build 0:6ae2f7bca550 351 default:
Azure.IoT Build 0:6ae2f7bca550 352 case LINK_TRANSFER_ERROR:
Azure.IoT Build 0:6ae2f7bca550 353 if (message_with_callback->on_message_send_complete != NULL)
Azure.IoT Build 0:6ae2f7bca550 354 {
Azure.IoT Build 0:6ae2f7bca550 355 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
Azure.IoT Build 0:6ae2f7bca550 356 }
Azure.IoT Build 0:6ae2f7bca550 357
Azure.IoT Build 0:6ae2f7bca550 358 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 359 break;
Azure.IoT Build 0:6ae2f7bca550 360
Azure.IoT Build 0:6ae2f7bca550 361 case LINK_TRANSFER_BUSY:
Azure.IoT Build 0:6ae2f7bca550 362 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
Azure.IoT Build 0:6ae2f7bca550 363 result = SEND_ONE_MESSAGE_BUSY;
Azure.IoT Build 0:6ae2f7bca550 364 break;
Azure.IoT Build 0:6ae2f7bca550 365
Azure.IoT Build 0:6ae2f7bca550 366 case LINK_TRANSFER_OK:
Azure.IoT Build 0:6ae2f7bca550 367 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 368 break;
Azure.IoT Build 0:6ae2f7bca550 369 }
Azure.IoT Build 0:6ae2f7bca550 370 }
Azure.IoT Build 0:6ae2f7bca550 371
Azure.IoT Build 0:6ae2f7bca550 372 amqpalloc_free(data_bytes);
Azure.IoT Build 0:6ae2f7bca550 373
Azure.IoT Build 0:6ae2f7bca550 374 if (body_amqp_value != NULL)
Azure.IoT Build 0:6ae2f7bca550 375 {
Azure.IoT Build 0:6ae2f7bca550 376 amqpvalue_destroy(body_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 377 }
Azure.IoT Build 0:6ae2f7bca550 378
Azure.IoT Build 0:6ae2f7bca550 379 amqpvalue_destroy(application_properties);
Azure.IoT Build 0:6ae2f7bca550 380 amqpvalue_destroy(application_properties_value);
Azure.IoT Build 0:6ae2f7bca550 381 amqpvalue_destroy(properties_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 382 properties_destroy(properties);
Azure.IoT Build 0:6ae2f7bca550 383 }
Azure.IoT Build 0:6ae2f7bca550 384 }
Azure.IoT Build 0:6ae2f7bca550 385
Azure.IoT Build 0:6ae2f7bca550 386 return result;
Azure.IoT Build 0:6ae2f7bca550 387 }
Azure.IoT Build 0:6ae2f7bca550 388
Azure.IoT Build 0:6ae2f7bca550 389 static void send_all_pending_messages(MESSAGE_SENDER_INSTANCE* message_sender_instance)
Azure.IoT Build 0:6ae2f7bca550 390 {
Azure.IoT Build 0:6ae2f7bca550 391 size_t i;
Azure.IoT Build 0:6ae2f7bca550 392
Azure.IoT Build 0:6ae2f7bca550 393 for (i = 0; i < message_sender_instance->message_count; i++)
Azure.IoT Build 0:6ae2f7bca550 394 {
Azure.IoT Build 0:6ae2f7bca550 395 if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
Azure.IoT Build 0:6ae2f7bca550 396 {
Azure.IoT Build 0:6ae2f7bca550 397 switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message))
Azure.IoT Build 0:6ae2f7bca550 398 {
Azure.IoT Build 0:6ae2f7bca550 399 default:
Azure.IoT Build 0:6ae2f7bca550 400 case SEND_ONE_MESSAGE_ERROR:
Azure.IoT Build 0:6ae2f7bca550 401 {
Azure.IoT Build 0:6ae2f7bca550 402 ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete;
Azure.IoT Build 0:6ae2f7bca550 403 void* context = message_sender_instance->messages[i]->context;
Azure.IoT Build 0:6ae2f7bca550 404 remove_pending_message_by_index(message_sender_instance, i);
Azure.IoT Build 0:6ae2f7bca550 405
Azure.IoT Build 0:6ae2f7bca550 406 on_message_send_complete(context, MESSAGE_SEND_ERROR);
Azure.IoT Build 0:6ae2f7bca550 407 i = message_sender_instance->message_count;
Azure.IoT Build 0:6ae2f7bca550 408 break;
Azure.IoT Build 0:6ae2f7bca550 409 }
Azure.IoT Build 0:6ae2f7bca550 410 case SEND_ONE_MESSAGE_BUSY:
Azure.IoT Build 0:6ae2f7bca550 411 i = message_sender_instance->message_count + 1;
Azure.IoT Build 0:6ae2f7bca550 412 break;
Azure.IoT Build 0:6ae2f7bca550 413
Azure.IoT Build 0:6ae2f7bca550 414 case SEND_ONE_MESSAGE_OK:
Azure.IoT Build 0:6ae2f7bca550 415 break;
Azure.IoT Build 0:6ae2f7bca550 416 }
Azure.IoT Build 0:6ae2f7bca550 417
Azure.IoT Build 0:6ae2f7bca550 418 i--;
Azure.IoT Build 0:6ae2f7bca550 419 }
Azure.IoT Build 0:6ae2f7bca550 420 }
Azure.IoT Build 0:6ae2f7bca550 421 }
Azure.IoT Build 0:6ae2f7bca550 422
Azure.IoT Build 0:6ae2f7bca550 423 static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_SENDER_STATE new_state)
Azure.IoT Build 0:6ae2f7bca550 424 {
Azure.IoT Build 0:6ae2f7bca550 425 MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state;
Azure.IoT Build 0:6ae2f7bca550 426 message_sender_instance->message_sender_state = new_state;
Azure.IoT Build 0:6ae2f7bca550 427 if (message_sender_instance->on_message_sender_state_changed != NULL)
Azure.IoT Build 0:6ae2f7bca550 428 {
Azure.IoT Build 0:6ae2f7bca550 429 message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state);
Azure.IoT Build 0:6ae2f7bca550 430 }
Azure.IoT Build 0:6ae2f7bca550 431 }
Azure.IoT Build 0:6ae2f7bca550 432
AzureIoTClient 2:64b4feb67cd3 433 static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender_instance)
AzureIoTClient 2:64b4feb67cd3 434 {
AzureIoTClient 2:64b4feb67cd3 435 size_t i;
AzureIoTClient 2:64b4feb67cd3 436
AzureIoTClient 2:64b4feb67cd3 437 for (i = 0; i < message_sender_instance->message_count; i++)
AzureIoTClient 2:64b4feb67cd3 438 {
AzureIoTClient 2:64b4feb67cd3 439 if (message_sender_instance->messages[i]->on_message_send_complete != NULL)
AzureIoTClient 2:64b4feb67cd3 440 {
AzureIoTClient 2:64b4feb67cd3 441 message_sender_instance->messages[i]->on_message_send_complete(message_sender_instance->messages[i]->context, MESSAGE_SEND_ERROR);
AzureIoTClient 2:64b4feb67cd3 442 }
AzureIoTClient 2:64b4feb67cd3 443
AzureIoTClient 2:64b4feb67cd3 444 message_destroy(message_sender_instance->messages[i]->message);
AzureIoTClient 2:64b4feb67cd3 445 amqpalloc_free(message_sender_instance->messages[i]);
AzureIoTClient 2:64b4feb67cd3 446 }
AzureIoTClient 2:64b4feb67cd3 447
AzureIoTClient 2:64b4feb67cd3 448 if (message_sender_instance->messages != NULL)
AzureIoTClient 2:64b4feb67cd3 449 {
AzureIoTClient 2:64b4feb67cd3 450 message_sender_instance->message_count = 0;
AzureIoTClient 2:64b4feb67cd3 451
AzureIoTClient 2:64b4feb67cd3 452 amqpalloc_free(message_sender_instance->messages);
AzureIoTClient 2:64b4feb67cd3 453 message_sender_instance->messages = NULL;
AzureIoTClient 2:64b4feb67cd3 454 }
AzureIoTClient 2:64b4feb67cd3 455 }
AzureIoTClient 2:64b4feb67cd3 456
Azure.IoT Build 0:6ae2f7bca550 457 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
Azure.IoT Build 0:6ae2f7bca550 458 {
Azure.IoT Build 0:6ae2f7bca550 459 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
Azure.IoT Build 0:6ae2f7bca550 460
Azure.IoT Build 0:6ae2f7bca550 461 switch (new_link_state)
Azure.IoT Build 0:6ae2f7bca550 462 {
Azure.IoT Build 0:6ae2f7bca550 463 case LINK_STATE_ATTACHED:
Azure.IoT Build 0:6ae2f7bca550 464 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
Azure.IoT Build 0:6ae2f7bca550 465 {
Azure.IoT Build 0:6ae2f7bca550 466 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN);
Azure.IoT Build 0:6ae2f7bca550 467 }
Azure.IoT Build 0:6ae2f7bca550 468 break;
Azure.IoT Build 0:6ae2f7bca550 469 case LINK_STATE_DETACHED:
Azure.IoT Build 0:6ae2f7bca550 470 if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
Azure.IoT Build 0:6ae2f7bca550 471 (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
Azure.IoT Build 0:6ae2f7bca550 472 {
Azure.IoT Build 0:6ae2f7bca550 473 /* User initiated transition, we should be good */
AzureIoTClient 2:64b4feb67cd3 474 indicate_all_messages_as_error(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 475 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
Azure.IoT Build 0:6ae2f7bca550 476 }
Azure.IoT Build 0:6ae2f7bca550 477 else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
Azure.IoT Build 0:6ae2f7bca550 478 {
Azure.IoT Build 0:6ae2f7bca550 479 /* Any other transition must be an error */
Azure.IoT Build 0:6ae2f7bca550 480 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 481 }
Azure.IoT Build 0:6ae2f7bca550 482 break;
AzureIoTClient 1:eab586236bfe 483 case LINK_STATE_ERROR:
AzureIoTClient 1:eab586236bfe 484 if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
AzureIoTClient 1:eab586236bfe 485 {
AzureIoTClient 2:64b4feb67cd3 486 indicate_all_messages_as_error(message_sender_instance);
AzureIoTClient 1:eab586236bfe 487 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
AzureIoTClient 1:eab586236bfe 488 }
AzureIoTClient 1:eab586236bfe 489 break;
Azure.IoT Build 0:6ae2f7bca550 490 }
Azure.IoT Build 0:6ae2f7bca550 491 }
Azure.IoT Build 0:6ae2f7bca550 492
Azure.IoT Build 0:6ae2f7bca550 493 static void on_link_flow_on(void* context)
Azure.IoT Build 0:6ae2f7bca550 494 {
Azure.IoT Build 0:6ae2f7bca550 495 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
Azure.IoT Build 0:6ae2f7bca550 496 send_all_pending_messages(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 497 }
Azure.IoT Build 0:6ae2f7bca550 498
Azure.IoT Build 0:6ae2f7bca550 499 MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context, LOGGER_LOG logger_log)
Azure.IoT Build 0:6ae2f7bca550 500 {
Azure.IoT Build 0:6ae2f7bca550 501 MESSAGE_SENDER_INSTANCE* result = amqpalloc_malloc(sizeof(MESSAGE_SENDER_INSTANCE));
Azure.IoT Build 0:6ae2f7bca550 502 if (result != NULL)
Azure.IoT Build 0:6ae2f7bca550 503 {
Azure.IoT Build 0:6ae2f7bca550 504 result->messages = NULL;
Azure.IoT Build 0:6ae2f7bca550 505 result->message_count = 0;
Azure.IoT Build 0:6ae2f7bca550 506 result->link = link;
Azure.IoT Build 0:6ae2f7bca550 507 result->on_message_sender_state_changed = on_message_sender_state_changed;
Azure.IoT Build 0:6ae2f7bca550 508 result->on_message_sender_state_changed_context = context;
Azure.IoT Build 0:6ae2f7bca550 509 result->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
Azure.IoT Build 0:6ae2f7bca550 510 result->logger_log = logger_log;
Azure.IoT Build 0:6ae2f7bca550 511 }
Azure.IoT Build 0:6ae2f7bca550 512
Azure.IoT Build 0:6ae2f7bca550 513 return result;
Azure.IoT Build 0:6ae2f7bca550 514 }
Azure.IoT Build 0:6ae2f7bca550 515
Azure.IoT Build 0:6ae2f7bca550 516 void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 517 {
Azure.IoT Build 0:6ae2f7bca550 518 if (message_sender != NULL)
Azure.IoT Build 0:6ae2f7bca550 519 {
Azure.IoT Build 0:6ae2f7bca550 520 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 521
Azure.IoT Build 0:6ae2f7bca550 522 messagesender_close(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 523
AzureIoTClient 2:64b4feb67cd3 524 indicate_all_messages_as_error(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 525
Azure.IoT Build 0:6ae2f7bca550 526 amqpalloc_free(message_sender);
Azure.IoT Build 0:6ae2f7bca550 527 }
Azure.IoT Build 0:6ae2f7bca550 528 }
Azure.IoT Build 0:6ae2f7bca550 529
Azure.IoT Build 0:6ae2f7bca550 530 int messagesender_open(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 531 {
Azure.IoT Build 0:6ae2f7bca550 532 int result;
Azure.IoT Build 0:6ae2f7bca550 533
Azure.IoT Build 0:6ae2f7bca550 534 if (message_sender == NULL)
Azure.IoT Build 0:6ae2f7bca550 535 {
Azure.IoT Build 0:6ae2f7bca550 536 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 537 }
Azure.IoT Build 0:6ae2f7bca550 538 else
Azure.IoT Build 0:6ae2f7bca550 539 {
Azure.IoT Build 0:6ae2f7bca550 540 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 541
Azure.IoT Build 0:6ae2f7bca550 542 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
Azure.IoT Build 0:6ae2f7bca550 543 {
Azure.IoT Build 0:6ae2f7bca550 544 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING);
Azure.IoT Build 0:6ae2f7bca550 545 if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0)
Azure.IoT Build 0:6ae2f7bca550 546 {
Azure.IoT Build 0:6ae2f7bca550 547 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 548 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 549 }
Azure.IoT Build 0:6ae2f7bca550 550 else
Azure.IoT Build 0:6ae2f7bca550 551 {
Azure.IoT Build 0:6ae2f7bca550 552 result = 0;
Azure.IoT Build 0:6ae2f7bca550 553 }
Azure.IoT Build 0:6ae2f7bca550 554 }
Azure.IoT Build 0:6ae2f7bca550 555 else
Azure.IoT Build 0:6ae2f7bca550 556 {
Azure.IoT Build 0:6ae2f7bca550 557 result = 0;
Azure.IoT Build 0:6ae2f7bca550 558 }
Azure.IoT Build 0:6ae2f7bca550 559 }
Azure.IoT Build 0:6ae2f7bca550 560
Azure.IoT Build 0:6ae2f7bca550 561 return result;
Azure.IoT Build 0:6ae2f7bca550 562 }
Azure.IoT Build 0:6ae2f7bca550 563
Azure.IoT Build 0:6ae2f7bca550 564 int messagesender_close(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 565 {
Azure.IoT Build 0:6ae2f7bca550 566 int result;
Azure.IoT Build 0:6ae2f7bca550 567
Azure.IoT Build 0:6ae2f7bca550 568 if (message_sender == NULL)
Azure.IoT Build 0:6ae2f7bca550 569 {
Azure.IoT Build 0:6ae2f7bca550 570 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 571 }
Azure.IoT Build 0:6ae2f7bca550 572 else
Azure.IoT Build 0:6ae2f7bca550 573 {
Azure.IoT Build 0:6ae2f7bca550 574 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 575
Azure.IoT Build 0:6ae2f7bca550 576 if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
Azure.IoT Build 0:6ae2f7bca550 577 (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
Azure.IoT Build 0:6ae2f7bca550 578 {
Azure.IoT Build 0:6ae2f7bca550 579 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING);
Azure.IoT Build 0:6ae2f7bca550 580 if (link_detach(message_sender_instance->link) != 0)
Azure.IoT Build 0:6ae2f7bca550 581 {
Azure.IoT Build 0:6ae2f7bca550 582 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 583 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 584 }
Azure.IoT Build 0:6ae2f7bca550 585 else
Azure.IoT Build 0:6ae2f7bca550 586 {
Azure.IoT Build 0:6ae2f7bca550 587 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
Azure.IoT Build 0:6ae2f7bca550 588 result = 0;
Azure.IoT Build 0:6ae2f7bca550 589 }
Azure.IoT Build 0:6ae2f7bca550 590 }
Azure.IoT Build 0:6ae2f7bca550 591 else
Azure.IoT Build 0:6ae2f7bca550 592 {
Azure.IoT Build 0:6ae2f7bca550 593 result = 0;
Azure.IoT Build 0:6ae2f7bca550 594 }
Azure.IoT Build 0:6ae2f7bca550 595 }
Azure.IoT Build 0:6ae2f7bca550 596
Azure.IoT Build 0:6ae2f7bca550 597 return result;
Azure.IoT Build 0:6ae2f7bca550 598 }
Azure.IoT Build 0:6ae2f7bca550 599
Azure.IoT Build 0:6ae2f7bca550 600 int messagesender_send(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context)
Azure.IoT Build 0:6ae2f7bca550 601 {
Azure.IoT Build 0:6ae2f7bca550 602 int result;
Azure.IoT Build 0:6ae2f7bca550 603
Azure.IoT Build 0:6ae2f7bca550 604 if ((message_sender == NULL) ||
Azure.IoT Build 0:6ae2f7bca550 605 (message == NULL))
Azure.IoT Build 0:6ae2f7bca550 606 {
Azure.IoT Build 0:6ae2f7bca550 607 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 608 }
Azure.IoT Build 0:6ae2f7bca550 609 else
Azure.IoT Build 0:6ae2f7bca550 610 {
Azure.IoT Build 0:6ae2f7bca550 611 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 612 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
Azure.IoT Build 0:6ae2f7bca550 613 {
Azure.IoT Build 0:6ae2f7bca550 614 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 615 }
Azure.IoT Build 0:6ae2f7bca550 616 else
Azure.IoT Build 0:6ae2f7bca550 617 {
Azure.IoT Build 0:6ae2f7bca550 618 MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)amqpalloc_malloc(sizeof(MESSAGE_WITH_CALLBACK));
Azure.IoT Build 0:6ae2f7bca550 619 if (message_with_callback == NULL)
Azure.IoT Build 0:6ae2f7bca550 620 {
Azure.IoT Build 0:6ae2f7bca550 621 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 622 }
Azure.IoT Build 0:6ae2f7bca550 623 else
Azure.IoT Build 0:6ae2f7bca550 624 {
Azure.IoT Build 0:6ae2f7bca550 625 MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1));
Azure.IoT Build 0:6ae2f7bca550 626 if (new_messages == NULL)
Azure.IoT Build 0:6ae2f7bca550 627 {
Azure.IoT Build 0:6ae2f7bca550 628 amqpalloc_free(message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 629 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 630 }
Azure.IoT Build 0:6ae2f7bca550 631 else
Azure.IoT Build 0:6ae2f7bca550 632 {
Azure.IoT Build 0:6ae2f7bca550 633 result = 0;
Azure.IoT Build 0:6ae2f7bca550 634
Azure.IoT Build 0:6ae2f7bca550 635 message_sender_instance->messages = new_messages;
Azure.IoT Build 0:6ae2f7bca550 636 if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
Azure.IoT Build 0:6ae2f7bca550 637 {
Azure.IoT Build 0:6ae2f7bca550 638 message_with_callback->message = message_clone(message);
Azure.IoT Build 0:6ae2f7bca550 639 if (message_with_callback->message == NULL)
Azure.IoT Build 0:6ae2f7bca550 640 {
Azure.IoT Build 0:6ae2f7bca550 641 amqpalloc_free(message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 642 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 643 }
Azure.IoT Build 0:6ae2f7bca550 644
Azure.IoT Build 0:6ae2f7bca550 645 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
Azure.IoT Build 0:6ae2f7bca550 646 }
Azure.IoT Build 0:6ae2f7bca550 647 else
Azure.IoT Build 0:6ae2f7bca550 648 {
Azure.IoT Build 0:6ae2f7bca550 649 message_with_callback->message = NULL;
Azure.IoT Build 0:6ae2f7bca550 650 message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
Azure.IoT Build 0:6ae2f7bca550 651 }
Azure.IoT Build 0:6ae2f7bca550 652
Azure.IoT Build 0:6ae2f7bca550 653 if (result == 0)
Azure.IoT Build 0:6ae2f7bca550 654 {
Azure.IoT Build 0:6ae2f7bca550 655 message_with_callback->on_message_send_complete = on_message_send_complete;
Azure.IoT Build 0:6ae2f7bca550 656 message_with_callback->context = callback_context;
Azure.IoT Build 0:6ae2f7bca550 657 message_with_callback->message_sender = message_sender_instance;
Azure.IoT Build 0:6ae2f7bca550 658
Azure.IoT Build 0:6ae2f7bca550 659 message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback;
Azure.IoT Build 0:6ae2f7bca550 660 message_sender_instance->message_count++;
Azure.IoT Build 0:6ae2f7bca550 661
Azure.IoT Build 0:6ae2f7bca550 662 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
Azure.IoT Build 0:6ae2f7bca550 663 {
Azure.IoT Build 0:6ae2f7bca550 664 switch (send_one_message(message_sender_instance, message_with_callback, message))
Azure.IoT Build 0:6ae2f7bca550 665 {
Azure.IoT Build 0:6ae2f7bca550 666 default:
Azure.IoT Build 0:6ae2f7bca550 667 case SEND_ONE_MESSAGE_ERROR:
Azure.IoT Build 0:6ae2f7bca550 668 remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
Azure.IoT Build 0:6ae2f7bca550 669 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 670 break;
Azure.IoT Build 0:6ae2f7bca550 671
Azure.IoT Build 0:6ae2f7bca550 672 case SEND_ONE_MESSAGE_BUSY:
Azure.IoT Build 0:6ae2f7bca550 673 message_with_callback->message = message_clone(message);
Azure.IoT Build 0:6ae2f7bca550 674 if (message_with_callback->message == NULL)
Azure.IoT Build 0:6ae2f7bca550 675 {
Azure.IoT Build 0:6ae2f7bca550 676 amqpalloc_free(message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 677 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 678 }
Azure.IoT Build 0:6ae2f7bca550 679 else
Azure.IoT Build 0:6ae2f7bca550 680 {
Azure.IoT Build 0:6ae2f7bca550 681 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
Azure.IoT Build 0:6ae2f7bca550 682 result = 0;
Azure.IoT Build 0:6ae2f7bca550 683 }
Azure.IoT Build 0:6ae2f7bca550 684
Azure.IoT Build 0:6ae2f7bca550 685 break;
Azure.IoT Build 0:6ae2f7bca550 686
Azure.IoT Build 0:6ae2f7bca550 687 case SEND_ONE_MESSAGE_OK:
Azure.IoT Build 0:6ae2f7bca550 688 result = 0;
Azure.IoT Build 0:6ae2f7bca550 689 break;
Azure.IoT Build 0:6ae2f7bca550 690 }
Azure.IoT Build 0:6ae2f7bca550 691 }
Azure.IoT Build 0:6ae2f7bca550 692 }
Azure.IoT Build 0:6ae2f7bca550 693 }
Azure.IoT Build 0:6ae2f7bca550 694 }
Azure.IoT Build 0:6ae2f7bca550 695 }
Azure.IoT Build 0:6ae2f7bca550 696 }
Azure.IoT Build 0:6ae2f7bca550 697
Azure.IoT Build 0:6ae2f7bca550 698 return result;
Azure.IoT Build 0:6ae2f7bca550 699 }