A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Committer:
Azure.IoT Build
Date:
Fri Jul 01 10:42:48 2016 -0700
Revision:
5:ae49385aff34
Parent:
2:64b4feb67cd3
Child:
6:641a9672db08
1.0.10

Who changed what in which revision?

UserRevisionLine numberNew contents of line
Azure.IoT Build 0:6ae2f7bca550 1 // Copyright (c) Microsoft. All rights reserved.
Azure.IoT Build 0:6ae2f7bca550 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
Azure.IoT Build 0:6ae2f7bca550 3
Azure.IoT Build 0:6ae2f7bca550 4 #include <stdlib.h>
Azure.IoT Build 0:6ae2f7bca550 5 #ifdef _CRTDBG_MAP_ALLOC
Azure.IoT Build 0:6ae2f7bca550 6 #include <crtdbg.h>
Azure.IoT Build 0:6ae2f7bca550 7 #endif
Azure.IoT Build 0:6ae2f7bca550 8 #include <string.h>
Azure.IoT Build 5:ae49385aff34 9 #include "azure_c_shared_utility/xlogging.h"
Azure.IoT Build 0:6ae2f7bca550 10 #include "azure_uamqp_c/message_sender.h"
Azure.IoT Build 0:6ae2f7bca550 11 #include "azure_uamqp_c/amqpalloc.h"
Azure.IoT Build 0:6ae2f7bca550 12 #include "azure_uamqp_c/amqpvalue_to_string.h"
Azure.IoT Build 0:6ae2f7bca550 13
Azure.IoT Build 0:6ae2f7bca550 14 typedef enum MESSAGE_SEND_STATE_TAG
Azure.IoT Build 0:6ae2f7bca550 15 {
Azure.IoT Build 0:6ae2f7bca550 16 MESSAGE_SEND_STATE_NOT_SENT,
Azure.IoT Build 0:6ae2f7bca550 17 MESSAGE_SEND_STATE_PENDING
Azure.IoT Build 0:6ae2f7bca550 18 } MESSAGE_SEND_STATE;
Azure.IoT Build 0:6ae2f7bca550 19
Azure.IoT Build 0:6ae2f7bca550 20 typedef enum SEND_ONE_MESSAGE_RESULT_TAG
Azure.IoT Build 0:6ae2f7bca550 21 {
Azure.IoT Build 0:6ae2f7bca550 22 SEND_ONE_MESSAGE_OK,
Azure.IoT Build 0:6ae2f7bca550 23 SEND_ONE_MESSAGE_ERROR,
Azure.IoT Build 0:6ae2f7bca550 24 SEND_ONE_MESSAGE_BUSY
Azure.IoT Build 0:6ae2f7bca550 25 } SEND_ONE_MESSAGE_RESULT;
Azure.IoT Build 0:6ae2f7bca550 26
Azure.IoT Build 0:6ae2f7bca550 27 typedef struct MESSAGE_WITH_CALLBACK_TAG
Azure.IoT Build 0:6ae2f7bca550 28 {
Azure.IoT Build 0:6ae2f7bca550 29 MESSAGE_HANDLE message;
Azure.IoT Build 0:6ae2f7bca550 30 ON_MESSAGE_SEND_COMPLETE on_message_send_complete;
Azure.IoT Build 0:6ae2f7bca550 31 void* context;
Azure.IoT Build 0:6ae2f7bca550 32 MESSAGE_SENDER_HANDLE message_sender;
Azure.IoT Build 0:6ae2f7bca550 33 MESSAGE_SEND_STATE message_send_state;
Azure.IoT Build 0:6ae2f7bca550 34 } MESSAGE_WITH_CALLBACK;
Azure.IoT Build 0:6ae2f7bca550 35
Azure.IoT Build 0:6ae2f7bca550 36 typedef struct MESSAGE_SENDER_INSTANCE_TAG
Azure.IoT Build 0:6ae2f7bca550 37 {
Azure.IoT Build 0:6ae2f7bca550 38 LINK_HANDLE link;
Azure.IoT Build 0:6ae2f7bca550 39 size_t message_count;
Azure.IoT Build 0:6ae2f7bca550 40 MESSAGE_WITH_CALLBACK** messages;
Azure.IoT Build 0:6ae2f7bca550 41 MESSAGE_SENDER_STATE message_sender_state;
Azure.IoT Build 0:6ae2f7bca550 42 ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
Azure.IoT Build 0:6ae2f7bca550 43 void* on_message_sender_state_changed_context;
Azure.IoT Build 0:6ae2f7bca550 44 } MESSAGE_SENDER_INSTANCE;
Azure.IoT Build 0:6ae2f7bca550 45
Azure.IoT Build 0:6ae2f7bca550 46 static void remove_pending_message_by_index(MESSAGE_SENDER_INSTANCE* message_sender_instance, size_t index)
Azure.IoT Build 0:6ae2f7bca550 47 {
Azure.IoT Build 0:6ae2f7bca550 48 MESSAGE_WITH_CALLBACK** new_messages;
Azure.IoT Build 0:6ae2f7bca550 49
Azure.IoT Build 0:6ae2f7bca550 50 if (message_sender_instance->messages[index]->message != NULL)
Azure.IoT Build 0:6ae2f7bca550 51 {
Azure.IoT Build 0:6ae2f7bca550 52 message_destroy(message_sender_instance->messages[index]->message);
Azure.IoT Build 0:6ae2f7bca550 53 message_sender_instance->messages[index]->message = NULL;
Azure.IoT Build 0:6ae2f7bca550 54 }
Azure.IoT Build 0:6ae2f7bca550 55
Azure.IoT Build 0:6ae2f7bca550 56 amqpalloc_free(message_sender_instance->messages[index]);
Azure.IoT Build 0:6ae2f7bca550 57
Azure.IoT Build 0:6ae2f7bca550 58 if (message_sender_instance->message_count - index > 1)
Azure.IoT Build 0:6ae2f7bca550 59 {
Azure.IoT Build 0:6ae2f7bca550 60 (void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1));
Azure.IoT Build 0:6ae2f7bca550 61 }
Azure.IoT Build 0:6ae2f7bca550 62
Azure.IoT Build 0:6ae2f7bca550 63 message_sender_instance->message_count--;
Azure.IoT Build 0:6ae2f7bca550 64
Azure.IoT Build 0:6ae2f7bca550 65 if (message_sender_instance->message_count > 0)
Azure.IoT Build 0:6ae2f7bca550 66 {
Azure.IoT Build 0:6ae2f7bca550 67 new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count));
Azure.IoT Build 0:6ae2f7bca550 68 if (new_messages != NULL)
Azure.IoT Build 0:6ae2f7bca550 69 {
Azure.IoT Build 0:6ae2f7bca550 70 message_sender_instance->messages = new_messages;
Azure.IoT Build 0:6ae2f7bca550 71 }
Azure.IoT Build 0:6ae2f7bca550 72 }
Azure.IoT Build 0:6ae2f7bca550 73 else
Azure.IoT Build 0:6ae2f7bca550 74 {
Azure.IoT Build 0:6ae2f7bca550 75 amqpalloc_free(message_sender_instance->messages);
Azure.IoT Build 0:6ae2f7bca550 76 message_sender_instance->messages = NULL;
Azure.IoT Build 0:6ae2f7bca550 77 }
Azure.IoT Build 0:6ae2f7bca550 78 }
Azure.IoT Build 0:6ae2f7bca550 79
Azure.IoT Build 0:6ae2f7bca550 80 static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback)
Azure.IoT Build 0:6ae2f7bca550 81 {
Azure.IoT Build 0:6ae2f7bca550 82 size_t i;
Azure.IoT Build 0:6ae2f7bca550 83
Azure.IoT Build 0:6ae2f7bca550 84 for (i = 0; i < message_sender_instance->message_count; i++)
Azure.IoT Build 0:6ae2f7bca550 85 {
Azure.IoT Build 0:6ae2f7bca550 86 if (message_sender_instance->messages[i] == message_with_callback)
Azure.IoT Build 0:6ae2f7bca550 87 {
Azure.IoT Build 0:6ae2f7bca550 88 remove_pending_message_by_index(message_sender_instance, i);
Azure.IoT Build 0:6ae2f7bca550 89 break;
Azure.IoT Build 0:6ae2f7bca550 90 }
Azure.IoT Build 0:6ae2f7bca550 91 }
Azure.IoT Build 0:6ae2f7bca550 92 }
Azure.IoT Build 0:6ae2f7bca550 93
Azure.IoT Build 0:6ae2f7bca550 94 static void on_delivery_settled(void* context, delivery_number delivery_no)
Azure.IoT Build 0:6ae2f7bca550 95 {
Azure.IoT Build 0:6ae2f7bca550 96 MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
Azure.IoT Build 0:6ae2f7bca550 97 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
Azure.IoT Build 0:6ae2f7bca550 98
Azure.IoT Build 0:6ae2f7bca550 99 if (message_with_callback->on_message_send_complete != NULL)
Azure.IoT Build 0:6ae2f7bca550 100 {
Azure.IoT Build 0:6ae2f7bca550 101 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
Azure.IoT Build 0:6ae2f7bca550 102 }
Azure.IoT Build 0:6ae2f7bca550 103
Azure.IoT Build 0:6ae2f7bca550 104 remove_pending_message(message_sender_instance, message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 105 }
Azure.IoT Build 0:6ae2f7bca550 106
Azure.IoT Build 0:6ae2f7bca550 107 static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
Azure.IoT Build 0:6ae2f7bca550 108 {
Azure.IoT Build 0:6ae2f7bca550 109 PAYLOAD* payload = (PAYLOAD*)context;
Azure.IoT Build 0:6ae2f7bca550 110 (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
Azure.IoT Build 0:6ae2f7bca550 111 payload->length += length;
Azure.IoT Build 0:6ae2f7bca550 112 return 0;
Azure.IoT Build 0:6ae2f7bca550 113 }
Azure.IoT Build 0:6ae2f7bca550 114
Azure.IoT Build 0:6ae2f7bca550 115 static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender_instance, const char* name, AMQP_VALUE value)
Azure.IoT Build 0:6ae2f7bca550 116 {
Azure.IoT Build 5:ae49385aff34 117 if (xlogging_get_log_function() != NULL)
Azure.IoT Build 0:6ae2f7bca550 118 {
Azure.IoT Build 0:6ae2f7bca550 119 char* value_as_string = NULL;
Azure.IoT Build 5:ae49385aff34 120 LOG(LOG_TRACE, 0, "%s", name);
Azure.IoT Build 5:ae49385aff34 121 LOG(LOG_TRACE, 0, "%s", (value_as_string = amqpvalue_to_string(value)));
Azure.IoT Build 0:6ae2f7bca550 122 if (value_as_string != NULL)
Azure.IoT Build 0:6ae2f7bca550 123 {
Azure.IoT Build 0:6ae2f7bca550 124 amqpalloc_free(value_as_string);
Azure.IoT Build 0:6ae2f7bca550 125 }
Azure.IoT Build 0:6ae2f7bca550 126 }
Azure.IoT Build 0:6ae2f7bca550 127 }
Azure.IoT Build 0:6ae2f7bca550 128
Azure.IoT Build 0:6ae2f7bca550 129 static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback, MESSAGE_HANDLE message)
Azure.IoT Build 0:6ae2f7bca550 130 {
Azure.IoT Build 0:6ae2f7bca550 131 SEND_ONE_MESSAGE_RESULT result;
Azure.IoT Build 0:6ae2f7bca550 132
Azure.IoT Build 0:6ae2f7bca550 133 size_t encoded_size;
Azure.IoT Build 0:6ae2f7bca550 134 size_t total_encoded_size = 0;
Azure.IoT Build 0:6ae2f7bca550 135 MESSAGE_BODY_TYPE message_body_type;
Azure.IoT Build 0:6ae2f7bca550 136 message_format message_format;
Azure.IoT Build 0:6ae2f7bca550 137
Azure.IoT Build 0:6ae2f7bca550 138 if ((message_get_body_type(message, &message_body_type) != 0) ||
Azure.IoT Build 0:6ae2f7bca550 139 (message_get_message_format(message, &message_format) != 0))
Azure.IoT Build 0:6ae2f7bca550 140 {
Azure.IoT Build 0:6ae2f7bca550 141 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 142 }
Azure.IoT Build 0:6ae2f7bca550 143 else
Azure.IoT Build 0:6ae2f7bca550 144 {
Azure.IoT Build 0:6ae2f7bca550 145 // header
Azure.IoT Build 0:6ae2f7bca550 146 HEADER_HANDLE header;
Azure.IoT Build 0:6ae2f7bca550 147 AMQP_VALUE header_amqp_value;
Azure.IoT Build 0:6ae2f7bca550 148 PROPERTIES_HANDLE properties;
Azure.IoT Build 0:6ae2f7bca550 149 AMQP_VALUE properties_amqp_value;
Azure.IoT Build 0:6ae2f7bca550 150 AMQP_VALUE application_properties;
Azure.IoT Build 0:6ae2f7bca550 151 AMQP_VALUE application_properties_value;
Azure.IoT Build 0:6ae2f7bca550 152 AMQP_VALUE body_amqp_value = NULL;
Azure.IoT Build 0:6ae2f7bca550 153 size_t body_data_count;
Azure.IoT Build 0:6ae2f7bca550 154
Azure.IoT Build 0:6ae2f7bca550 155 message_get_header(message, &header);
Azure.IoT Build 0:6ae2f7bca550 156 header_amqp_value = amqpvalue_create_header(header);
Azure.IoT Build 0:6ae2f7bca550 157 if (header != NULL)
Azure.IoT Build 0:6ae2f7bca550 158 {
Azure.IoT Build 0:6ae2f7bca550 159 amqpvalue_get_encoded_size(header_amqp_value, &encoded_size);
Azure.IoT Build 0:6ae2f7bca550 160 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 161 }
Azure.IoT Build 0:6ae2f7bca550 162
Azure.IoT Build 0:6ae2f7bca550 163 // properties
Azure.IoT Build 0:6ae2f7bca550 164 message_get_properties(message, &properties);
Azure.IoT Build 0:6ae2f7bca550 165 properties_amqp_value = amqpvalue_create_properties(properties);
Azure.IoT Build 0:6ae2f7bca550 166 if (properties != NULL)
Azure.IoT Build 0:6ae2f7bca550 167 {
Azure.IoT Build 0:6ae2f7bca550 168 amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size);
Azure.IoT Build 0:6ae2f7bca550 169 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 170 }
Azure.IoT Build 0:6ae2f7bca550 171
Azure.IoT Build 0:6ae2f7bca550 172 // application properties
Azure.IoT Build 0:6ae2f7bca550 173 message_get_application_properties(message, &application_properties);
Azure.IoT Build 0:6ae2f7bca550 174 application_properties_value = amqpvalue_create_application_properties(application_properties);
Azure.IoT Build 0:6ae2f7bca550 175 if (application_properties != NULL)
Azure.IoT Build 0:6ae2f7bca550 176 {
Azure.IoT Build 0:6ae2f7bca550 177 amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
Azure.IoT Build 0:6ae2f7bca550 178 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 179 }
Azure.IoT Build 0:6ae2f7bca550 180
Azure.IoT Build 0:6ae2f7bca550 181 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 182
Azure.IoT Build 0:6ae2f7bca550 183 // body - amqp data
Azure.IoT Build 0:6ae2f7bca550 184 switch (message_body_type)
Azure.IoT Build 0:6ae2f7bca550 185 {
Azure.IoT Build 0:6ae2f7bca550 186 default:
Azure.IoT Build 0:6ae2f7bca550 187 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 188 break;
Azure.IoT Build 0:6ae2f7bca550 189
Azure.IoT Build 0:6ae2f7bca550 190 case MESSAGE_BODY_TYPE_VALUE:
Azure.IoT Build 0:6ae2f7bca550 191 {
Azure.IoT Build 0:6ae2f7bca550 192 AMQP_VALUE message_body_amqp_value;
Azure.IoT Build 0:6ae2f7bca550 193 if (message_get_inplace_body_amqp_value(message, &message_body_amqp_value) != 0)
Azure.IoT Build 0:6ae2f7bca550 194 {
Azure.IoT Build 0:6ae2f7bca550 195 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 196 }
Azure.IoT Build 0:6ae2f7bca550 197 else
Azure.IoT Build 0:6ae2f7bca550 198 {
Azure.IoT Build 0:6ae2f7bca550 199 body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 200 if ((body_amqp_value == NULL) ||
Azure.IoT Build 0:6ae2f7bca550 201 (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0))
Azure.IoT Build 0:6ae2f7bca550 202 {
Azure.IoT Build 0:6ae2f7bca550 203 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 204 }
Azure.IoT Build 0:6ae2f7bca550 205 else
Azure.IoT Build 0:6ae2f7bca550 206 {
Azure.IoT Build 0:6ae2f7bca550 207 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 208 }
Azure.IoT Build 0:6ae2f7bca550 209 }
Azure.IoT Build 0:6ae2f7bca550 210
Azure.IoT Build 0:6ae2f7bca550 211 break;
Azure.IoT Build 0:6ae2f7bca550 212 }
Azure.IoT Build 0:6ae2f7bca550 213
Azure.IoT Build 0:6ae2f7bca550 214 case MESSAGE_BODY_TYPE_DATA:
Azure.IoT Build 0:6ae2f7bca550 215 {
Azure.IoT Build 0:6ae2f7bca550 216 BINARY_DATA binary_data;
Azure.IoT Build 0:6ae2f7bca550 217 size_t i;
Azure.IoT Build 0:6ae2f7bca550 218
Azure.IoT Build 0:6ae2f7bca550 219 if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
Azure.IoT Build 0:6ae2f7bca550 220 {
Azure.IoT Build 0:6ae2f7bca550 221 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 222 }
Azure.IoT Build 0:6ae2f7bca550 223 else
Azure.IoT Build 0:6ae2f7bca550 224 {
Azure.IoT Build 0:6ae2f7bca550 225 for (i = 0; i < body_data_count; i++)
Azure.IoT Build 0:6ae2f7bca550 226 {
Azure.IoT Build 0:6ae2f7bca550 227 if (message_get_body_amqp_data(message, i, &binary_data) != 0)
Azure.IoT Build 0:6ae2f7bca550 228 {
Azure.IoT Build 0:6ae2f7bca550 229 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 230 }
Azure.IoT Build 0:6ae2f7bca550 231 else
Azure.IoT Build 0:6ae2f7bca550 232 {
Azure.IoT Build 0:6ae2f7bca550 233 amqp_binary binary_value = { binary_data.bytes, binary_data.length };
Azure.IoT Build 0:6ae2f7bca550 234 AMQP_VALUE body_amqp_data = amqpvalue_create_data(binary_value);
Azure.IoT Build 0:6ae2f7bca550 235 if (body_amqp_data == NULL)
Azure.IoT Build 0:6ae2f7bca550 236 {
Azure.IoT Build 0:6ae2f7bca550 237 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 238 }
Azure.IoT Build 0:6ae2f7bca550 239 else
Azure.IoT Build 0:6ae2f7bca550 240 {
Azure.IoT Build 0:6ae2f7bca550 241 if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
Azure.IoT Build 0:6ae2f7bca550 242 {
Azure.IoT Build 0:6ae2f7bca550 243 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 244 }
Azure.IoT Build 0:6ae2f7bca550 245 else
Azure.IoT Build 0:6ae2f7bca550 246 {
Azure.IoT Build 0:6ae2f7bca550 247 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 248 }
Azure.IoT Build 0:6ae2f7bca550 249
Azure.IoT Build 0:6ae2f7bca550 250 amqpvalue_destroy(body_amqp_data);
Azure.IoT Build 0:6ae2f7bca550 251 }
Azure.IoT Build 0:6ae2f7bca550 252 }
Azure.IoT Build 0:6ae2f7bca550 253 }
Azure.IoT Build 0:6ae2f7bca550 254 }
Azure.IoT Build 0:6ae2f7bca550 255 break;
Azure.IoT Build 0:6ae2f7bca550 256 }
Azure.IoT Build 0:6ae2f7bca550 257 }
Azure.IoT Build 0:6ae2f7bca550 258
Azure.IoT Build 0:6ae2f7bca550 259 if (result == 0)
Azure.IoT Build 0:6ae2f7bca550 260 {
Azure.IoT Build 0:6ae2f7bca550 261 void* data_bytes = amqpalloc_malloc(total_encoded_size);
Azure.IoT Build 0:6ae2f7bca550 262 PAYLOAD payload = { data_bytes, 0 };
Azure.IoT Build 0:6ae2f7bca550 263 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 264
Azure.IoT Build 0:6ae2f7bca550 265 if (header != NULL)
Azure.IoT Build 0:6ae2f7bca550 266 {
Azure.IoT Build 0:6ae2f7bca550 267 if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 268 {
Azure.IoT Build 0:6ae2f7bca550 269 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 270 }
Azure.IoT Build 0:6ae2f7bca550 271
Azure.IoT Build 0:6ae2f7bca550 272 log_message_chunk(message_sender_instance, "Header:", header_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 273 }
Azure.IoT Build 0:6ae2f7bca550 274
Azure.IoT Build 0:6ae2f7bca550 275 if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
Azure.IoT Build 0:6ae2f7bca550 276 {
Azure.IoT Build 0:6ae2f7bca550 277 if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 278 {
Azure.IoT Build 0:6ae2f7bca550 279 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 280 }
Azure.IoT Build 0:6ae2f7bca550 281
Azure.IoT Build 0:6ae2f7bca550 282 log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 283 }
Azure.IoT Build 0:6ae2f7bca550 284
Azure.IoT Build 0:6ae2f7bca550 285 if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
Azure.IoT Build 0:6ae2f7bca550 286 {
Azure.IoT Build 0:6ae2f7bca550 287 if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 288 {
Azure.IoT Build 0:6ae2f7bca550 289 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 290 }
Azure.IoT Build 0:6ae2f7bca550 291
Azure.IoT Build 0:6ae2f7bca550 292 log_message_chunk(message_sender_instance, "Application properties:", application_properties_value);
Azure.IoT Build 0:6ae2f7bca550 293 }
Azure.IoT Build 0:6ae2f7bca550 294
Azure.IoT Build 0:6ae2f7bca550 295 if (result == SEND_ONE_MESSAGE_OK)
Azure.IoT Build 0:6ae2f7bca550 296 {
Azure.IoT Build 0:6ae2f7bca550 297 switch (message_body_type)
Azure.IoT Build 0:6ae2f7bca550 298 {
Azure.IoT Build 0:6ae2f7bca550 299 case MESSAGE_BODY_TYPE_VALUE:
Azure.IoT Build 0:6ae2f7bca550 300 {
Azure.IoT Build 0:6ae2f7bca550 301 if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 302 {
Azure.IoT Build 0:6ae2f7bca550 303 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 304 }
Azure.IoT Build 0:6ae2f7bca550 305
Azure.IoT Build 0:6ae2f7bca550 306 log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 307 break;
Azure.IoT Build 0:6ae2f7bca550 308 }
Azure.IoT Build 0:6ae2f7bca550 309 case MESSAGE_BODY_TYPE_DATA:
Azure.IoT Build 0:6ae2f7bca550 310 {
Azure.IoT Build 0:6ae2f7bca550 311 BINARY_DATA binary_data;
Azure.IoT Build 0:6ae2f7bca550 312 size_t i;
Azure.IoT Build 0:6ae2f7bca550 313
Azure.IoT Build 0:6ae2f7bca550 314 for (i = 0; i < body_data_count; i++)
Azure.IoT Build 0:6ae2f7bca550 315 {
Azure.IoT Build 0:6ae2f7bca550 316 if (message_get_body_amqp_data(message, i, &binary_data) != 0)
Azure.IoT Build 0:6ae2f7bca550 317 {
Azure.IoT Build 0:6ae2f7bca550 318 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 319 }
Azure.IoT Build 0:6ae2f7bca550 320 else
Azure.IoT Build 0:6ae2f7bca550 321 {
Azure.IoT Build 0:6ae2f7bca550 322 amqp_binary binary_value = { binary_data.bytes, binary_data.length };
Azure.IoT Build 0:6ae2f7bca550 323 AMQP_VALUE body_amqp_data = amqpvalue_create_data(binary_value);
Azure.IoT Build 0:6ae2f7bca550 324 if (body_amqp_data == NULL)
Azure.IoT Build 0:6ae2f7bca550 325 {
Azure.IoT Build 0:6ae2f7bca550 326 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 327 }
Azure.IoT Build 0:6ae2f7bca550 328 else
Azure.IoT Build 0:6ae2f7bca550 329 {
Azure.IoT Build 0:6ae2f7bca550 330 if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 331 {
Azure.IoT Build 0:6ae2f7bca550 332 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 333 break;
Azure.IoT Build 0:6ae2f7bca550 334 }
Azure.IoT Build 0:6ae2f7bca550 335
Azure.IoT Build 0:6ae2f7bca550 336 amqpvalue_destroy(body_amqp_data);
Azure.IoT Build 0:6ae2f7bca550 337 }
Azure.IoT Build 0:6ae2f7bca550 338 }
Azure.IoT Build 0:6ae2f7bca550 339 }
Azure.IoT Build 0:6ae2f7bca550 340 break;
Azure.IoT Build 0:6ae2f7bca550 341 }
Azure.IoT Build 0:6ae2f7bca550 342 }
Azure.IoT Build 0:6ae2f7bca550 343 }
Azure.IoT Build 0:6ae2f7bca550 344
Azure.IoT Build 0:6ae2f7bca550 345 if (result == SEND_ONE_MESSAGE_OK)
Azure.IoT Build 0:6ae2f7bca550 346 {
Azure.IoT Build 0:6ae2f7bca550 347 message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
Azure.IoT Build 0:6ae2f7bca550 348 switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback))
Azure.IoT Build 0:6ae2f7bca550 349 {
Azure.IoT Build 0:6ae2f7bca550 350 default:
Azure.IoT Build 0:6ae2f7bca550 351 case LINK_TRANSFER_ERROR:
Azure.IoT Build 0:6ae2f7bca550 352 if (message_with_callback->on_message_send_complete != NULL)
Azure.IoT Build 0:6ae2f7bca550 353 {
Azure.IoT Build 0:6ae2f7bca550 354 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
Azure.IoT Build 0:6ae2f7bca550 355 }
Azure.IoT Build 0:6ae2f7bca550 356
Azure.IoT Build 0:6ae2f7bca550 357 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 358 break;
Azure.IoT Build 0:6ae2f7bca550 359
Azure.IoT Build 0:6ae2f7bca550 360 case LINK_TRANSFER_BUSY:
Azure.IoT Build 0:6ae2f7bca550 361 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
Azure.IoT Build 0:6ae2f7bca550 362 result = SEND_ONE_MESSAGE_BUSY;
Azure.IoT Build 0:6ae2f7bca550 363 break;
Azure.IoT Build 0:6ae2f7bca550 364
Azure.IoT Build 0:6ae2f7bca550 365 case LINK_TRANSFER_OK:
Azure.IoT Build 0:6ae2f7bca550 366 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 367 break;
Azure.IoT Build 0:6ae2f7bca550 368 }
Azure.IoT Build 0:6ae2f7bca550 369 }
Azure.IoT Build 0:6ae2f7bca550 370
Azure.IoT Build 0:6ae2f7bca550 371 amqpalloc_free(data_bytes);
Azure.IoT Build 0:6ae2f7bca550 372
Azure.IoT Build 0:6ae2f7bca550 373 if (body_amqp_value != NULL)
Azure.IoT Build 0:6ae2f7bca550 374 {
Azure.IoT Build 0:6ae2f7bca550 375 amqpvalue_destroy(body_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 376 }
Azure.IoT Build 0:6ae2f7bca550 377
Azure.IoT Build 0:6ae2f7bca550 378 amqpvalue_destroy(application_properties);
Azure.IoT Build 0:6ae2f7bca550 379 amqpvalue_destroy(application_properties_value);
Azure.IoT Build 0:6ae2f7bca550 380 amqpvalue_destroy(properties_amqp_value);
Azure.IoT Build 0:6ae2f7bca550 381 properties_destroy(properties);
Azure.IoT Build 0:6ae2f7bca550 382 }
Azure.IoT Build 0:6ae2f7bca550 383 }
Azure.IoT Build 0:6ae2f7bca550 384
Azure.IoT Build 0:6ae2f7bca550 385 return result;
Azure.IoT Build 0:6ae2f7bca550 386 }
Azure.IoT Build 0:6ae2f7bca550 387
Azure.IoT Build 0:6ae2f7bca550 388 static void send_all_pending_messages(MESSAGE_SENDER_INSTANCE* message_sender_instance)
Azure.IoT Build 0:6ae2f7bca550 389 {
Azure.IoT Build 0:6ae2f7bca550 390 size_t i;
Azure.IoT Build 0:6ae2f7bca550 391
Azure.IoT Build 0:6ae2f7bca550 392 for (i = 0; i < message_sender_instance->message_count; i++)
Azure.IoT Build 0:6ae2f7bca550 393 {
Azure.IoT Build 0:6ae2f7bca550 394 if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
Azure.IoT Build 0:6ae2f7bca550 395 {
Azure.IoT Build 0:6ae2f7bca550 396 switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message))
Azure.IoT Build 0:6ae2f7bca550 397 {
Azure.IoT Build 0:6ae2f7bca550 398 default:
Azure.IoT Build 0:6ae2f7bca550 399 case SEND_ONE_MESSAGE_ERROR:
Azure.IoT Build 0:6ae2f7bca550 400 {
Azure.IoT Build 0:6ae2f7bca550 401 ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete;
Azure.IoT Build 0:6ae2f7bca550 402 void* context = message_sender_instance->messages[i]->context;
Azure.IoT Build 0:6ae2f7bca550 403 remove_pending_message_by_index(message_sender_instance, i);
Azure.IoT Build 0:6ae2f7bca550 404
Azure.IoT Build 0:6ae2f7bca550 405 on_message_send_complete(context, MESSAGE_SEND_ERROR);
Azure.IoT Build 0:6ae2f7bca550 406 i = message_sender_instance->message_count;
Azure.IoT Build 0:6ae2f7bca550 407 break;
Azure.IoT Build 0:6ae2f7bca550 408 }
Azure.IoT Build 0:6ae2f7bca550 409 case SEND_ONE_MESSAGE_BUSY:
Azure.IoT Build 0:6ae2f7bca550 410 i = message_sender_instance->message_count + 1;
Azure.IoT Build 0:6ae2f7bca550 411 break;
Azure.IoT Build 0:6ae2f7bca550 412
Azure.IoT Build 0:6ae2f7bca550 413 case SEND_ONE_MESSAGE_OK:
Azure.IoT Build 0:6ae2f7bca550 414 break;
Azure.IoT Build 0:6ae2f7bca550 415 }
Azure.IoT Build 0:6ae2f7bca550 416
Azure.IoT Build 0:6ae2f7bca550 417 i--;
Azure.IoT Build 0:6ae2f7bca550 418 }
Azure.IoT Build 0:6ae2f7bca550 419 }
Azure.IoT Build 0:6ae2f7bca550 420 }
Azure.IoT Build 0:6ae2f7bca550 421
Azure.IoT Build 0:6ae2f7bca550 422 static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_SENDER_STATE new_state)
Azure.IoT Build 0:6ae2f7bca550 423 {
Azure.IoT Build 0:6ae2f7bca550 424 MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state;
Azure.IoT Build 0:6ae2f7bca550 425 message_sender_instance->message_sender_state = new_state;
Azure.IoT Build 0:6ae2f7bca550 426 if (message_sender_instance->on_message_sender_state_changed != NULL)
Azure.IoT Build 0:6ae2f7bca550 427 {
Azure.IoT Build 0:6ae2f7bca550 428 message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state);
Azure.IoT Build 0:6ae2f7bca550 429 }
Azure.IoT Build 0:6ae2f7bca550 430 }
Azure.IoT Build 0:6ae2f7bca550 431
AzureIoTClient 2:64b4feb67cd3 432 static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender_instance)
AzureIoTClient 2:64b4feb67cd3 433 {
AzureIoTClient 2:64b4feb67cd3 434 size_t i;
AzureIoTClient 2:64b4feb67cd3 435
AzureIoTClient 2:64b4feb67cd3 436 for (i = 0; i < message_sender_instance->message_count; i++)
AzureIoTClient 2:64b4feb67cd3 437 {
AzureIoTClient 2:64b4feb67cd3 438 if (message_sender_instance->messages[i]->on_message_send_complete != NULL)
AzureIoTClient 2:64b4feb67cd3 439 {
AzureIoTClient 2:64b4feb67cd3 440 message_sender_instance->messages[i]->on_message_send_complete(message_sender_instance->messages[i]->context, MESSAGE_SEND_ERROR);
AzureIoTClient 2:64b4feb67cd3 441 }
AzureIoTClient 2:64b4feb67cd3 442
AzureIoTClient 2:64b4feb67cd3 443 message_destroy(message_sender_instance->messages[i]->message);
AzureIoTClient 2:64b4feb67cd3 444 amqpalloc_free(message_sender_instance->messages[i]);
AzureIoTClient 2:64b4feb67cd3 445 }
AzureIoTClient 2:64b4feb67cd3 446
AzureIoTClient 2:64b4feb67cd3 447 if (message_sender_instance->messages != NULL)
AzureIoTClient 2:64b4feb67cd3 448 {
AzureIoTClient 2:64b4feb67cd3 449 message_sender_instance->message_count = 0;
AzureIoTClient 2:64b4feb67cd3 450
AzureIoTClient 2:64b4feb67cd3 451 amqpalloc_free(message_sender_instance->messages);
AzureIoTClient 2:64b4feb67cd3 452 message_sender_instance->messages = NULL;
AzureIoTClient 2:64b4feb67cd3 453 }
AzureIoTClient 2:64b4feb67cd3 454 }
AzureIoTClient 2:64b4feb67cd3 455
Azure.IoT Build 0:6ae2f7bca550 456 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
Azure.IoT Build 0:6ae2f7bca550 457 {
Azure.IoT Build 0:6ae2f7bca550 458 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
Azure.IoT Build 0:6ae2f7bca550 459
Azure.IoT Build 0:6ae2f7bca550 460 switch (new_link_state)
Azure.IoT Build 0:6ae2f7bca550 461 {
Azure.IoT Build 0:6ae2f7bca550 462 case LINK_STATE_ATTACHED:
Azure.IoT Build 0:6ae2f7bca550 463 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
Azure.IoT Build 0:6ae2f7bca550 464 {
Azure.IoT Build 0:6ae2f7bca550 465 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN);
Azure.IoT Build 0:6ae2f7bca550 466 }
Azure.IoT Build 0:6ae2f7bca550 467 break;
Azure.IoT Build 0:6ae2f7bca550 468 case LINK_STATE_DETACHED:
Azure.IoT Build 0:6ae2f7bca550 469 if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
Azure.IoT Build 0:6ae2f7bca550 470 (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
Azure.IoT Build 0:6ae2f7bca550 471 {
Azure.IoT Build 0:6ae2f7bca550 472 /* User initiated transition, we should be good */
AzureIoTClient 2:64b4feb67cd3 473 indicate_all_messages_as_error(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 474 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
Azure.IoT Build 0:6ae2f7bca550 475 }
Azure.IoT Build 0:6ae2f7bca550 476 else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
Azure.IoT Build 0:6ae2f7bca550 477 {
Azure.IoT Build 0:6ae2f7bca550 478 /* Any other transition must be an error */
Azure.IoT Build 0:6ae2f7bca550 479 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 480 }
Azure.IoT Build 0:6ae2f7bca550 481 break;
AzureIoTClient 1:eab586236bfe 482 case LINK_STATE_ERROR:
AzureIoTClient 1:eab586236bfe 483 if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
AzureIoTClient 1:eab586236bfe 484 {
AzureIoTClient 2:64b4feb67cd3 485 indicate_all_messages_as_error(message_sender_instance);
AzureIoTClient 1:eab586236bfe 486 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
AzureIoTClient 1:eab586236bfe 487 }
AzureIoTClient 1:eab586236bfe 488 break;
Azure.IoT Build 0:6ae2f7bca550 489 }
Azure.IoT Build 0:6ae2f7bca550 490 }
Azure.IoT Build 0:6ae2f7bca550 491
Azure.IoT Build 0:6ae2f7bca550 492 static void on_link_flow_on(void* context)
Azure.IoT Build 0:6ae2f7bca550 493 {
Azure.IoT Build 0:6ae2f7bca550 494 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
Azure.IoT Build 0:6ae2f7bca550 495 send_all_pending_messages(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 496 }
Azure.IoT Build 0:6ae2f7bca550 497
Azure.IoT Build 5:ae49385aff34 498 MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context)
Azure.IoT Build 0:6ae2f7bca550 499 {
Azure.IoT Build 0:6ae2f7bca550 500 MESSAGE_SENDER_INSTANCE* result = amqpalloc_malloc(sizeof(MESSAGE_SENDER_INSTANCE));
Azure.IoT Build 0:6ae2f7bca550 501 if (result != NULL)
Azure.IoT Build 0:6ae2f7bca550 502 {
Azure.IoT Build 0:6ae2f7bca550 503 result->messages = NULL;
Azure.IoT Build 0:6ae2f7bca550 504 result->message_count = 0;
Azure.IoT Build 0:6ae2f7bca550 505 result->link = link;
Azure.IoT Build 0:6ae2f7bca550 506 result->on_message_sender_state_changed = on_message_sender_state_changed;
Azure.IoT Build 0:6ae2f7bca550 507 result->on_message_sender_state_changed_context = context;
Azure.IoT Build 0:6ae2f7bca550 508 result->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
Azure.IoT Build 0:6ae2f7bca550 509 }
Azure.IoT Build 0:6ae2f7bca550 510
Azure.IoT Build 0:6ae2f7bca550 511 return result;
Azure.IoT Build 0:6ae2f7bca550 512 }
Azure.IoT Build 0:6ae2f7bca550 513
Azure.IoT Build 0:6ae2f7bca550 514 void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 515 {
Azure.IoT Build 0:6ae2f7bca550 516 if (message_sender != NULL)
Azure.IoT Build 0:6ae2f7bca550 517 {
Azure.IoT Build 0:6ae2f7bca550 518 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 519
Azure.IoT Build 0:6ae2f7bca550 520 messagesender_close(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 521
AzureIoTClient 2:64b4feb67cd3 522 indicate_all_messages_as_error(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 523
Azure.IoT Build 0:6ae2f7bca550 524 amqpalloc_free(message_sender);
Azure.IoT Build 0:6ae2f7bca550 525 }
Azure.IoT Build 0:6ae2f7bca550 526 }
Azure.IoT Build 0:6ae2f7bca550 527
Azure.IoT Build 0:6ae2f7bca550 528 int messagesender_open(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 529 {
Azure.IoT Build 0:6ae2f7bca550 530 int result;
Azure.IoT Build 0:6ae2f7bca550 531
Azure.IoT Build 0:6ae2f7bca550 532 if (message_sender == NULL)
Azure.IoT Build 0:6ae2f7bca550 533 {
Azure.IoT Build 0:6ae2f7bca550 534 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 535 }
Azure.IoT Build 0:6ae2f7bca550 536 else
Azure.IoT Build 0:6ae2f7bca550 537 {
Azure.IoT Build 0:6ae2f7bca550 538 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 539
Azure.IoT Build 0:6ae2f7bca550 540 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
Azure.IoT Build 0:6ae2f7bca550 541 {
Azure.IoT Build 0:6ae2f7bca550 542 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING);
Azure.IoT Build 0:6ae2f7bca550 543 if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0)
Azure.IoT Build 0:6ae2f7bca550 544 {
Azure.IoT Build 0:6ae2f7bca550 545 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 546 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 547 }
Azure.IoT Build 0:6ae2f7bca550 548 else
Azure.IoT Build 0:6ae2f7bca550 549 {
Azure.IoT Build 0:6ae2f7bca550 550 result = 0;
Azure.IoT Build 0:6ae2f7bca550 551 }
Azure.IoT Build 0:6ae2f7bca550 552 }
Azure.IoT Build 0:6ae2f7bca550 553 else
Azure.IoT Build 0:6ae2f7bca550 554 {
Azure.IoT Build 0:6ae2f7bca550 555 result = 0;
Azure.IoT Build 0:6ae2f7bca550 556 }
Azure.IoT Build 0:6ae2f7bca550 557 }
Azure.IoT Build 0:6ae2f7bca550 558
Azure.IoT Build 0:6ae2f7bca550 559 return result;
Azure.IoT Build 0:6ae2f7bca550 560 }
Azure.IoT Build 0:6ae2f7bca550 561
Azure.IoT Build 0:6ae2f7bca550 562 int messagesender_close(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 563 {
Azure.IoT Build 0:6ae2f7bca550 564 int result;
Azure.IoT Build 0:6ae2f7bca550 565
Azure.IoT Build 0:6ae2f7bca550 566 if (message_sender == NULL)
Azure.IoT Build 0:6ae2f7bca550 567 {
Azure.IoT Build 0:6ae2f7bca550 568 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 569 }
Azure.IoT Build 0:6ae2f7bca550 570 else
Azure.IoT Build 0:6ae2f7bca550 571 {
Azure.IoT Build 0:6ae2f7bca550 572 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 573
Azure.IoT Build 0:6ae2f7bca550 574 if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
Azure.IoT Build 0:6ae2f7bca550 575 (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
Azure.IoT Build 0:6ae2f7bca550 576 {
Azure.IoT Build 0:6ae2f7bca550 577 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING);
Azure.IoT Build 0:6ae2f7bca550 578 if (link_detach(message_sender_instance->link) != 0)
Azure.IoT Build 0:6ae2f7bca550 579 {
Azure.IoT Build 0:6ae2f7bca550 580 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 581 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 582 }
Azure.IoT Build 0:6ae2f7bca550 583 else
Azure.IoT Build 0:6ae2f7bca550 584 {
Azure.IoT Build 0:6ae2f7bca550 585 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
Azure.IoT Build 0:6ae2f7bca550 586 result = 0;
Azure.IoT Build 0:6ae2f7bca550 587 }
Azure.IoT Build 0:6ae2f7bca550 588 }
Azure.IoT Build 0:6ae2f7bca550 589 else
Azure.IoT Build 0:6ae2f7bca550 590 {
Azure.IoT Build 0:6ae2f7bca550 591 result = 0;
Azure.IoT Build 0:6ae2f7bca550 592 }
Azure.IoT Build 0:6ae2f7bca550 593 }
Azure.IoT Build 0:6ae2f7bca550 594
Azure.IoT Build 0:6ae2f7bca550 595 return result;
Azure.IoT Build 0:6ae2f7bca550 596 }
Azure.IoT Build 0:6ae2f7bca550 597
Azure.IoT Build 0:6ae2f7bca550 598 int messagesender_send(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context)
Azure.IoT Build 0:6ae2f7bca550 599 {
Azure.IoT Build 0:6ae2f7bca550 600 int result;
Azure.IoT Build 0:6ae2f7bca550 601
Azure.IoT Build 0:6ae2f7bca550 602 if ((message_sender == NULL) ||
Azure.IoT Build 0:6ae2f7bca550 603 (message == NULL))
Azure.IoT Build 0:6ae2f7bca550 604 {
Azure.IoT Build 0:6ae2f7bca550 605 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 606 }
Azure.IoT Build 0:6ae2f7bca550 607 else
Azure.IoT Build 0:6ae2f7bca550 608 {
Azure.IoT Build 0:6ae2f7bca550 609 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 610 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
Azure.IoT Build 0:6ae2f7bca550 611 {
Azure.IoT Build 0:6ae2f7bca550 612 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 613 }
Azure.IoT Build 0:6ae2f7bca550 614 else
Azure.IoT Build 0:6ae2f7bca550 615 {
Azure.IoT Build 0:6ae2f7bca550 616 MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)amqpalloc_malloc(sizeof(MESSAGE_WITH_CALLBACK));
Azure.IoT Build 0:6ae2f7bca550 617 if (message_with_callback == NULL)
Azure.IoT Build 0:6ae2f7bca550 618 {
Azure.IoT Build 0:6ae2f7bca550 619 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 620 }
Azure.IoT Build 0:6ae2f7bca550 621 else
Azure.IoT Build 0:6ae2f7bca550 622 {
Azure.IoT Build 0:6ae2f7bca550 623 MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)amqpalloc_realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1));
Azure.IoT Build 0:6ae2f7bca550 624 if (new_messages == NULL)
Azure.IoT Build 0:6ae2f7bca550 625 {
Azure.IoT Build 0:6ae2f7bca550 626 amqpalloc_free(message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 627 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 628 }
Azure.IoT Build 0:6ae2f7bca550 629 else
Azure.IoT Build 0:6ae2f7bca550 630 {
Azure.IoT Build 0:6ae2f7bca550 631 result = 0;
Azure.IoT Build 0:6ae2f7bca550 632
Azure.IoT Build 0:6ae2f7bca550 633 message_sender_instance->messages = new_messages;
Azure.IoT Build 0:6ae2f7bca550 634 if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
Azure.IoT Build 0:6ae2f7bca550 635 {
Azure.IoT Build 0:6ae2f7bca550 636 message_with_callback->message = message_clone(message);
Azure.IoT Build 0:6ae2f7bca550 637 if (message_with_callback->message == NULL)
Azure.IoT Build 0:6ae2f7bca550 638 {
Azure.IoT Build 0:6ae2f7bca550 639 amqpalloc_free(message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 640 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 641 }
Azure.IoT Build 0:6ae2f7bca550 642
Azure.IoT Build 0:6ae2f7bca550 643 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
Azure.IoT Build 0:6ae2f7bca550 644 }
Azure.IoT Build 0:6ae2f7bca550 645 else
Azure.IoT Build 0:6ae2f7bca550 646 {
Azure.IoT Build 0:6ae2f7bca550 647 message_with_callback->message = NULL;
Azure.IoT Build 0:6ae2f7bca550 648 message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
Azure.IoT Build 0:6ae2f7bca550 649 }
Azure.IoT Build 0:6ae2f7bca550 650
Azure.IoT Build 0:6ae2f7bca550 651 if (result == 0)
Azure.IoT Build 0:6ae2f7bca550 652 {
Azure.IoT Build 0:6ae2f7bca550 653 message_with_callback->on_message_send_complete = on_message_send_complete;
Azure.IoT Build 0:6ae2f7bca550 654 message_with_callback->context = callback_context;
Azure.IoT Build 0:6ae2f7bca550 655 message_with_callback->message_sender = message_sender_instance;
Azure.IoT Build 0:6ae2f7bca550 656
Azure.IoT Build 0:6ae2f7bca550 657 message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback;
Azure.IoT Build 0:6ae2f7bca550 658 message_sender_instance->message_count++;
Azure.IoT Build 0:6ae2f7bca550 659
Azure.IoT Build 0:6ae2f7bca550 660 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
Azure.IoT Build 0:6ae2f7bca550 661 {
Azure.IoT Build 0:6ae2f7bca550 662 switch (send_one_message(message_sender_instance, message_with_callback, message))
Azure.IoT Build 0:6ae2f7bca550 663 {
Azure.IoT Build 0:6ae2f7bca550 664 default:
Azure.IoT Build 0:6ae2f7bca550 665 case SEND_ONE_MESSAGE_ERROR:
Azure.IoT Build 0:6ae2f7bca550 666 remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
Azure.IoT Build 0:6ae2f7bca550 667 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 668 break;
Azure.IoT Build 0:6ae2f7bca550 669
Azure.IoT Build 0:6ae2f7bca550 670 case SEND_ONE_MESSAGE_BUSY:
Azure.IoT Build 0:6ae2f7bca550 671 message_with_callback->message = message_clone(message);
Azure.IoT Build 0:6ae2f7bca550 672 if (message_with_callback->message == NULL)
Azure.IoT Build 0:6ae2f7bca550 673 {
Azure.IoT Build 0:6ae2f7bca550 674 amqpalloc_free(message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 675 result = __LINE__;
Azure.IoT Build 0:6ae2f7bca550 676 }
Azure.IoT Build 0:6ae2f7bca550 677 else
Azure.IoT Build 0:6ae2f7bca550 678 {
Azure.IoT Build 0:6ae2f7bca550 679 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
Azure.IoT Build 0:6ae2f7bca550 680 result = 0;
Azure.IoT Build 0:6ae2f7bca550 681 }
Azure.IoT Build 0:6ae2f7bca550 682
Azure.IoT Build 0:6ae2f7bca550 683 break;
Azure.IoT Build 0:6ae2f7bca550 684
Azure.IoT Build 0:6ae2f7bca550 685 case SEND_ONE_MESSAGE_OK:
Azure.IoT Build 0:6ae2f7bca550 686 result = 0;
Azure.IoT Build 0:6ae2f7bca550 687 break;
Azure.IoT Build 0:6ae2f7bca550 688 }
Azure.IoT Build 0:6ae2f7bca550 689 }
Azure.IoT Build 0:6ae2f7bca550 690 }
Azure.IoT Build 0:6ae2f7bca550 691 }
Azure.IoT Build 0:6ae2f7bca550 692 }
Azure.IoT Build 0:6ae2f7bca550 693 }
Azure.IoT Build 0:6ae2f7bca550 694 }
Azure.IoT Build 0:6ae2f7bca550 695
Azure.IoT Build 0:6ae2f7bca550 696 return result;
Azure.IoT Build 0:6ae2f7bca550 697 }