A small memory footprint AMQP implementation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Committer:
AzureIoTClient
Date:
Fri Aug 11 14:02:44 2017 -0700
Revision:
31:cafc87baef79
Parent:
29:4a11413cf217
Child:
33:08b53020ff0d
1.1.21

Who changed what in which revision?

User | Revision | Line number | New contents of line
Azure.IoT Build 0:6ae2f7bca550 1 // Copyright (c) Microsoft. All rights reserved.
Azure.IoT Build 0:6ae2f7bca550 2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
Azure.IoT Build 0:6ae2f7bca550 3
Azure.IoT Build 0:6ae2f7bca550 4 #include <stdlib.h>
AzureIoTClient 6:641a9672db08 5 #include <stdbool.h>
Azure.IoT Build 0:6ae2f7bca550 6 #include <string.h>
AzureIoTClient 19:000ab4e6a2c1 7 #include "azure_c_shared_utility/optimize_size.h"
AzureIoTClient 21:f9c433d8e6ca 8 #include "azure_c_shared_utility/gballoc.h"
Azure.IoT Build 5:ae49385aff34 9 #include "azure_c_shared_utility/xlogging.h"
Azure.IoT Build 0:6ae2f7bca550 10 #include "azure_uamqp_c/message_sender.h"
Azure.IoT Build 0:6ae2f7bca550 11 #include "azure_uamqp_c/amqpvalue_to_string.h"
Azure.IoT Build 0:6ae2f7bca550 12
/* Transmission state of one queued message. */
typedef enum MESSAGE_SEND_STATE_TAG
{
    /* Queued, not yet handed to the link (or handed over while the link was busy). */
    MESSAGE_SEND_STATE_NOT_SENT = 0,
    /* Handed to the link; the entry stays queued until the delivery is settled. */
    MESSAGE_SEND_STATE_PENDING = 1
} MESSAGE_SEND_STATE;
Azure.IoT Build 0:6ae2f7bca550 18
/* Outcome of one attempt to encode a message and pass it to the link. */
typedef enum SEND_ONE_MESSAGE_RESULT_TAG
{
    /* The link accepted the transfer. */
    SEND_ONE_MESSAGE_OK = 0,
    /* Encoding or the transfer failed. */
    SEND_ONE_MESSAGE_ERROR = 1,
    /* The link reported it is busy; the message stays queued as not sent. */
    SEND_ONE_MESSAGE_BUSY = 2
} SEND_ONE_MESSAGE_RESULT;
Azure.IoT Build 0:6ae2f7bca550 25
Azure.IoT Build 0:6ae2f7bca550 26 typedef struct MESSAGE_WITH_CALLBACK_TAG
Azure.IoT Build 0:6ae2f7bca550 27 {
AzureIoTClient 6:641a9672db08 28 MESSAGE_HANDLE message;
AzureIoTClient 6:641a9672db08 29 ON_MESSAGE_SEND_COMPLETE on_message_send_complete;
AzureIoTClient 6:641a9672db08 30 void* context;
AzureIoTClient 6:641a9672db08 31 MESSAGE_SENDER_HANDLE message_sender;
AzureIoTClient 6:641a9672db08 32 MESSAGE_SEND_STATE message_send_state;
Azure.IoT Build 0:6ae2f7bca550 33 } MESSAGE_WITH_CALLBACK;
Azure.IoT Build 0:6ae2f7bca550 34
Azure.IoT Build 0:6ae2f7bca550 35 typedef struct MESSAGE_SENDER_INSTANCE_TAG
Azure.IoT Build 0:6ae2f7bca550 36 {
AzureIoTClient 6:641a9672db08 37 LINK_HANDLE link;
AzureIoTClient 6:641a9672db08 38 size_t message_count;
AzureIoTClient 6:641a9672db08 39 MESSAGE_WITH_CALLBACK** messages;
AzureIoTClient 6:641a9672db08 40 MESSAGE_SENDER_STATE message_sender_state;
AzureIoTClient 6:641a9672db08 41 ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
AzureIoTClient 6:641a9672db08 42 void* on_message_sender_state_changed_context;
AzureIoTClient 6:641a9672db08 43 unsigned int is_trace_on : 1;
Azure.IoT Build 0:6ae2f7bca550 44 } MESSAGE_SENDER_INSTANCE;
Azure.IoT Build 0:6ae2f7bca550 45
Azure.IoT Build 0:6ae2f7bca550 46 static void remove_pending_message_by_index(MESSAGE_SENDER_INSTANCE* message_sender_instance, size_t index)
Azure.IoT Build 0:6ae2f7bca550 47 {
AzureIoTClient 6:641a9672db08 48 MESSAGE_WITH_CALLBACK** new_messages;
Azure.IoT Build 0:6ae2f7bca550 49
AzureIoTClient 6:641a9672db08 50 if (message_sender_instance->messages[index]->message != NULL)
AzureIoTClient 6:641a9672db08 51 {
AzureIoTClient 6:641a9672db08 52 message_destroy(message_sender_instance->messages[index]->message);
AzureIoTClient 6:641a9672db08 53 message_sender_instance->messages[index]->message = NULL;
AzureIoTClient 6:641a9672db08 54 }
Azure.IoT Build 0:6ae2f7bca550 55
AzureIoTClient 21:f9c433d8e6ca 56 free(message_sender_instance->messages[index]);
Azure.IoT Build 0:6ae2f7bca550 57
AzureIoTClient 6:641a9672db08 58 if (message_sender_instance->message_count - index > 1)
AzureIoTClient 6:641a9672db08 59 {
AzureIoTClient 6:641a9672db08 60 (void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1));
AzureIoTClient 6:641a9672db08 61 }
Azure.IoT Build 0:6ae2f7bca550 62
AzureIoTClient 6:641a9672db08 63 message_sender_instance->message_count--;
Azure.IoT Build 0:6ae2f7bca550 64
AzureIoTClient 6:641a9672db08 65 if (message_sender_instance->message_count > 0)
AzureIoTClient 6:641a9672db08 66 {
AzureIoTClient 21:f9c433d8e6ca 67 new_messages = (MESSAGE_WITH_CALLBACK**)realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count));
AzureIoTClient 6:641a9672db08 68 if (new_messages != NULL)
AzureIoTClient 6:641a9672db08 69 {
AzureIoTClient 6:641a9672db08 70 message_sender_instance->messages = new_messages;
AzureIoTClient 6:641a9672db08 71 }
AzureIoTClient 6:641a9672db08 72 }
AzureIoTClient 6:641a9672db08 73 else
AzureIoTClient 6:641a9672db08 74 {
AzureIoTClient 21:f9c433d8e6ca 75 free(message_sender_instance->messages);
AzureIoTClient 6:641a9672db08 76 message_sender_instance->messages = NULL;
AzureIoTClient 6:641a9672db08 77 }
Azure.IoT Build 0:6ae2f7bca550 78 }
Azure.IoT Build 0:6ae2f7bca550 79
Azure.IoT Build 0:6ae2f7bca550 80 static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback)
Azure.IoT Build 0:6ae2f7bca550 81 {
AzureIoTClient 6:641a9672db08 82 size_t i;
Azure.IoT Build 0:6ae2f7bca550 83
AzureIoTClient 6:641a9672db08 84 for (i = 0; i < message_sender_instance->message_count; i++)
AzureIoTClient 6:641a9672db08 85 {
AzureIoTClient 6:641a9672db08 86 if (message_sender_instance->messages[i] == message_with_callback)
AzureIoTClient 6:641a9672db08 87 {
AzureIoTClient 6:641a9672db08 88 remove_pending_message_by_index(message_sender_instance, i);
AzureIoTClient 6:641a9672db08 89 break;
AzureIoTClient 6:641a9672db08 90 }
AzureIoTClient 6:641a9672db08 91 }
Azure.IoT Build 0:6ae2f7bca550 92 }
Azure.IoT Build 0:6ae2f7bca550 93
AzureIoTClient 23:1111ee8bcba4 94 static void on_delivery_settled(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state)
Azure.IoT Build 0:6ae2f7bca550 95 {
AzureIoTClient 6:641a9672db08 96 MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
AzureIoTClient 6:641a9672db08 97 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
AzureIoTClient 6:641a9672db08 98 (void)delivery_no;
Azure.IoT Build 0:6ae2f7bca550 99
AzureIoTClient 6:641a9672db08 100 if (message_with_callback->on_message_send_complete != NULL)
AzureIoTClient 6:641a9672db08 101 {
AzureIoTClient 23:1111ee8bcba4 102 switch (reason)
AzureIoTClient 7:9e9ab3b0efef 103 {
AzureIoTClient 23:1111ee8bcba4 104 case LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED:
AzureIoTClient 23:1111ee8bcba4 105 if (delivery_state == NULL)
AzureIoTClient 7:9e9ab3b0efef 106 {
AzureIoTClient 23:1111ee8bcba4 107 LogError("delivery state not provided");
AzureIoTClient 7:9e9ab3b0efef 108 }
AzureIoTClient 7:9e9ab3b0efef 109 else
AzureIoTClient 7:9e9ab3b0efef 110 {
AzureIoTClient 23:1111ee8bcba4 111 AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(delivery_state);
AzureIoTClient 23:1111ee8bcba4 112
AzureIoTClient 23:1111ee8bcba4 113 if (descriptor == NULL)
AzureIoTClient 23:1111ee8bcba4 114 {
AzureIoTClient 23:1111ee8bcba4 115 LogError("Error getting descriptor for delivery state");
AzureIoTClient 23:1111ee8bcba4 116 }
AzureIoTClient 23:1111ee8bcba4 117 else if (is_accepted_type_by_descriptor(descriptor))
AzureIoTClient 23:1111ee8bcba4 118 {
AzureIoTClient 23:1111ee8bcba4 119 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
AzureIoTClient 23:1111ee8bcba4 120 }
AzureIoTClient 23:1111ee8bcba4 121 else
AzureIoTClient 23:1111ee8bcba4 122 {
AzureIoTClient 23:1111ee8bcba4 123 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
AzureIoTClient 23:1111ee8bcba4 124 }
AzureIoTClient 7:9e9ab3b0efef 125 }
AzureIoTClient 7:9e9ab3b0efef 126
AzureIoTClient 23:1111ee8bcba4 127 break;
AzureIoTClient 23:1111ee8bcba4 128 case LINK_DELIVERY_SETTLE_REASON_SETTLED:
AzureIoTClient 23:1111ee8bcba4 129 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
AzureIoTClient 23:1111ee8bcba4 130 break;
AzureIoTClient 23:1111ee8bcba4 131 case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED:
AzureIoTClient 23:1111ee8bcba4 132 default:
AzureIoTClient 23:1111ee8bcba4 133 message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
AzureIoTClient 23:1111ee8bcba4 134 break;
AzureIoTClient 7:9e9ab3b0efef 135 }
AzureIoTClient 6:641a9672db08 136 }
Azure.IoT Build 0:6ae2f7bca550 137
AzureIoTClient 6:641a9672db08 138 remove_pending_message(message_sender_instance, message_with_callback);
Azure.IoT Build 0:6ae2f7bca550 139 }
Azure.IoT Build 0:6ae2f7bca550 140
Azure.IoT Build 0:6ae2f7bca550 141 static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
Azure.IoT Build 0:6ae2f7bca550 142 {
AzureIoTClient 6:641a9672db08 143 PAYLOAD* payload = (PAYLOAD*)context;
AzureIoTClient 6:641a9672db08 144 (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
AzureIoTClient 6:641a9672db08 145 payload->length += length;
AzureIoTClient 6:641a9672db08 146 return 0;
Azure.IoT Build 0:6ae2f7bca550 147 }
Azure.IoT Build 0:6ae2f7bca550 148
Azure.IoT Build 0:6ae2f7bca550 149 static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender_instance, const char* name, AMQP_VALUE value)
Azure.IoT Build 0:6ae2f7bca550 150 {
AzureIoTClient 9:c22db038556c 151 #ifdef NO_LOGGING
AzureIoTClient 9:c22db038556c 152 UNUSED(message_sender_instance);
AzureIoTClient 9:c22db038556c 153 UNUSED(name);
AzureIoTClient 9:c22db038556c 154 UNUSED(value);
AzureIoTClient 9:c22db038556c 155 #else
AzureIoTClient 6:641a9672db08 156 if (xlogging_get_log_function() != NULL && message_sender_instance->is_trace_on == 1)
AzureIoTClient 6:641a9672db08 157 {
AzureIoTClient 6:641a9672db08 158 char* value_as_string = NULL;
AzureIoTClient 16:22a72cf8e416 159 LOG(AZ_LOG_TRACE, 0, "%s", name);
AzureIoTClient 16:22a72cf8e416 160 LOG(AZ_LOG_TRACE, 0, "%s", (value_as_string = amqpvalue_to_string(value)));
AzureIoTClient 6:641a9672db08 161 if (value_as_string != NULL)
AzureIoTClient 6:641a9672db08 162 {
AzureIoTClient 21:f9c433d8e6ca 163 free(value_as_string);
AzureIoTClient 6:641a9672db08 164 }
AzureIoTClient 6:641a9672db08 165 }
AzureIoTClient 9:c22db038556c 166 #endif
Azure.IoT Build 0:6ae2f7bca550 167 }
Azure.IoT Build 0:6ae2f7bca550 168
Azure.IoT Build 0:6ae2f7bca550 169 static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback, MESSAGE_HANDLE message)
Azure.IoT Build 0:6ae2f7bca550 170 {
AzureIoTClient 6:641a9672db08 171 SEND_ONE_MESSAGE_RESULT result;
Azure.IoT Build 0:6ae2f7bca550 172
AzureIoTClient 6:641a9672db08 173 size_t encoded_size;
AzureIoTClient 6:641a9672db08 174 size_t total_encoded_size = 0;
AzureIoTClient 6:641a9672db08 175 MESSAGE_BODY_TYPE message_body_type;
Azure.IoT Build 0:6ae2f7bca550 176 message_format message_format;
Azure.IoT Build 0:6ae2f7bca550 177
AzureIoTClient 6:641a9672db08 178 if ((message_get_body_type(message, &message_body_type) != 0) ||
Azure.IoT Build 0:6ae2f7bca550 179 (message_get_message_format(message, &message_format) != 0))
AzureIoTClient 6:641a9672db08 180 {
AzureIoTClient 29:4a11413cf217 181 LogError("Failure getting message body type and/or message format");
AzureIoTClient 6:641a9672db08 182 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 6:641a9672db08 183 }
AzureIoTClient 6:641a9672db08 184 else
AzureIoTClient 6:641a9672db08 185 {
AzureIoTClient 6:641a9672db08 186 // header
AzureIoTClient 31:cafc87baef79 187 HEADER_HANDLE header = NULL;
AzureIoTClient 31:cafc87baef79 188 AMQP_VALUE header_amqp_value = NULL;
AzureIoTClient 31:cafc87baef79 189 PROPERTIES_HANDLE properties = NULL;
AzureIoTClient 31:cafc87baef79 190 AMQP_VALUE properties_amqp_value = NULL;
AzureIoTClient 31:cafc87baef79 191 AMQP_VALUE application_properties = NULL;
AzureIoTClient 31:cafc87baef79 192 AMQP_VALUE application_properties_value = NULL;
AzureIoTClient 6:641a9672db08 193 AMQP_VALUE body_amqp_value = NULL;
AzureIoTClient 6:641a9672db08 194 size_t body_data_count = 0;
AzureIoTClient 13:9abd748f4e78 195 AMQP_VALUE msg_annotations = NULL;
Azure.IoT Build 0:6ae2f7bca550 196
AzureIoTClient 6:641a9672db08 197 message_get_header(message, &header);
AzureIoTClient 6:641a9672db08 198 header_amqp_value = amqpvalue_create_header(header);
AzureIoTClient 6:641a9672db08 199 if (header != NULL)
AzureIoTClient 6:641a9672db08 200 {
AzureIoTClient 6:641a9672db08 201 amqpvalue_get_encoded_size(header_amqp_value, &encoded_size);
AzureIoTClient 6:641a9672db08 202 total_encoded_size += encoded_size;
AzureIoTClient 6:641a9672db08 203 }
Azure.IoT Build 0:6ae2f7bca550 204
AzureIoTClient 13:9abd748f4e78 205 // message annotations
AzureIoTClient 13:9abd748f4e78 206 if ((message_get_message_annotations(message, &msg_annotations) == 0) &&
AzureIoTClient 13:9abd748f4e78 207 (msg_annotations != NULL))
AzureIoTClient 13:9abd748f4e78 208 {
AzureIoTClient 13:9abd748f4e78 209 amqpvalue_get_encoded_size(msg_annotations, &encoded_size);
AzureIoTClient 13:9abd748f4e78 210 total_encoded_size += encoded_size;
AzureIoTClient 13:9abd748f4e78 211 }
AzureIoTClient 13:9abd748f4e78 212
AzureIoTClient 6:641a9672db08 213 // properties
AzureIoTClient 6:641a9672db08 214 message_get_properties(message, &properties);
AzureIoTClient 6:641a9672db08 215 properties_amqp_value = amqpvalue_create_properties(properties);
AzureIoTClient 6:641a9672db08 216 if (properties != NULL)
AzureIoTClient 6:641a9672db08 217 {
AzureIoTClient 6:641a9672db08 218 amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size);
AzureIoTClient 6:641a9672db08 219 total_encoded_size += encoded_size;
AzureIoTClient 6:641a9672db08 220 }
Azure.IoT Build 0:6ae2f7bca550 221
AzureIoTClient 6:641a9672db08 222 // application properties
AzureIoTClient 6:641a9672db08 223 message_get_application_properties(message, &application_properties);
AzureIoTClient 6:641a9672db08 224 if (application_properties != NULL)
AzureIoTClient 6:641a9672db08 225 {
AzureIoTClient 23:1111ee8bcba4 226 application_properties_value = amqpvalue_create_application_properties(application_properties);
AzureIoTClient 6:641a9672db08 227 amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
AzureIoTClient 6:641a9672db08 228 total_encoded_size += encoded_size;
AzureIoTClient 6:641a9672db08 229 }
AzureIoTClient 23:1111ee8bcba4 230 else
AzureIoTClient 23:1111ee8bcba4 231 {
AzureIoTClient 23:1111ee8bcba4 232 application_properties_value = NULL;
AzureIoTClient 23:1111ee8bcba4 233 }
Azure.IoT Build 0:6ae2f7bca550 234
AzureIoTClient 6:641a9672db08 235 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 236
AzureIoTClient 6:641a9672db08 237 // body - amqp data
AzureIoTClient 6:641a9672db08 238 switch (message_body_type)
AzureIoTClient 6:641a9672db08 239 {
AzureIoTClient 6:641a9672db08 240 default:
AzureIoTClient 6:641a9672db08 241 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 6:641a9672db08 242 break;
Azure.IoT Build 0:6ae2f7bca550 243
AzureIoTClient 6:641a9672db08 244 case MESSAGE_BODY_TYPE_VALUE:
AzureIoTClient 6:641a9672db08 245 {
AzureIoTClient 6:641a9672db08 246 AMQP_VALUE message_body_amqp_value;
AzureIoTClient 24:2c59c2d43ebf 247 if (message_get_body_amqp_value_in_place(message, &message_body_amqp_value) != 0)
AzureIoTClient 6:641a9672db08 248 {
AzureIoTClient 6:641a9672db08 249 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 6:641a9672db08 250 }
AzureIoTClient 6:641a9672db08 251 else
AzureIoTClient 6:641a9672db08 252 {
AzureIoTClient 6:641a9672db08 253 body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
AzureIoTClient 6:641a9672db08 254 if ((body_amqp_value == NULL) ||
AzureIoTClient 6:641a9672db08 255 (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0))
AzureIoTClient 6:641a9672db08 256 {
AzureIoTClient 6:641a9672db08 257 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 6:641a9672db08 258 }
AzureIoTClient 6:641a9672db08 259 else
AzureIoTClient 6:641a9672db08 260 {
AzureIoTClient 6:641a9672db08 261 total_encoded_size += encoded_size;
AzureIoTClient 6:641a9672db08 262 }
AzureIoTClient 6:641a9672db08 263 }
Azure.IoT Build 0:6ae2f7bca550 264
AzureIoTClient 6:641a9672db08 265 break;
AzureIoTClient 6:641a9672db08 266 }
Azure.IoT Build 0:6ae2f7bca550 267
AzureIoTClient 6:641a9672db08 268 case MESSAGE_BODY_TYPE_DATA:
AzureIoTClient 6:641a9672db08 269 {
AzureIoTClient 6:641a9672db08 270 BINARY_DATA binary_data;
Azure.IoT Build 0:6ae2f7bca550 271 size_t i;
Azure.IoT Build 0:6ae2f7bca550 272
Azure.IoT Build 0:6ae2f7bca550 273 if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
Azure.IoT Build 0:6ae2f7bca550 274 {
Azure.IoT Build 0:6ae2f7bca550 275 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 276 }
Azure.IoT Build 0:6ae2f7bca550 277 else
Azure.IoT Build 0:6ae2f7bca550 278 {
Azure.IoT Build 0:6ae2f7bca550 279 for (i = 0; i < body_data_count; i++)
Azure.IoT Build 0:6ae2f7bca550 280 {
AzureIoTClient 24:2c59c2d43ebf 281 if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
Azure.IoT Build 0:6ae2f7bca550 282 {
Azure.IoT Build 0:6ae2f7bca550 283 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 284 }
Azure.IoT Build 0:6ae2f7bca550 285 else
Azure.IoT Build 0:6ae2f7bca550 286 {
AzureIoTClient 28:add19eb7defa 287 AMQP_VALUE body_amqp_data;
AzureIoTClient 6:641a9672db08 288 amqp_binary binary_value;
AzureIoTClient 6:641a9672db08 289 binary_value.bytes = binary_data.bytes;
AzureIoTClient 6:641a9672db08 290 binary_value.length = (uint32_t)binary_data.length;
AzureIoTClient 25:1101516ee67d 291 body_amqp_data = amqpvalue_create_data(binary_value);
Azure.IoT Build 0:6ae2f7bca550 292 if (body_amqp_data == NULL)
Azure.IoT Build 0:6ae2f7bca550 293 {
Azure.IoT Build 0:6ae2f7bca550 294 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 295 }
Azure.IoT Build 0:6ae2f7bca550 296 else
Azure.IoT Build 0:6ae2f7bca550 297 {
Azure.IoT Build 0:6ae2f7bca550 298 if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
Azure.IoT Build 0:6ae2f7bca550 299 {
Azure.IoT Build 0:6ae2f7bca550 300 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 301 }
Azure.IoT Build 0:6ae2f7bca550 302 else
Azure.IoT Build 0:6ae2f7bca550 303 {
Azure.IoT Build 0:6ae2f7bca550 304 total_encoded_size += encoded_size;
Azure.IoT Build 0:6ae2f7bca550 305 }
Azure.IoT Build 0:6ae2f7bca550 306
Azure.IoT Build 0:6ae2f7bca550 307 amqpvalue_destroy(body_amqp_data);
Azure.IoT Build 0:6ae2f7bca550 308 }
Azure.IoT Build 0:6ae2f7bca550 309 }
Azure.IoT Build 0:6ae2f7bca550 310 }
Azure.IoT Build 0:6ae2f7bca550 311 }
AzureIoTClient 6:641a9672db08 312 break;
AzureIoTClient 6:641a9672db08 313 }
AzureIoTClient 6:641a9672db08 314 }
Azure.IoT Build 0:6ae2f7bca550 315
AzureIoTClient 6:641a9672db08 316 if (result == 0)
AzureIoTClient 6:641a9672db08 317 {
AzureIoTClient 21:f9c433d8e6ca 318 void* data_bytes = malloc(total_encoded_size);
AzureIoTClient 6:641a9672db08 319 PAYLOAD payload;
AzureIoTClient 23:1111ee8bcba4 320 payload.bytes = (const unsigned char*)data_bytes;
AzureIoTClient 6:641a9672db08 321 payload.length = 0;
AzureIoTClient 6:641a9672db08 322 result = SEND_ONE_MESSAGE_OK;
Azure.IoT Build 0:6ae2f7bca550 323
AzureIoTClient 6:641a9672db08 324 if (header != NULL)
AzureIoTClient 6:641a9672db08 325 {
AzureIoTClient 6:641a9672db08 326 if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
AzureIoTClient 6:641a9672db08 327 {
AzureIoTClient 6:641a9672db08 328 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 6:641a9672db08 329 }
Azure.IoT Build 0:6ae2f7bca550 330
AzureIoTClient 6:641a9672db08 331 log_message_chunk(message_sender_instance, "Header:", header_amqp_value);
AzureIoTClient 6:641a9672db08 332 }
Azure.IoT Build 0:6ae2f7bca550 333
AzureIoTClient 13:9abd748f4e78 334 if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
AzureIoTClient 13:9abd748f4e78 335 {
AzureIoTClient 13:9abd748f4e78 336 if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
AzureIoTClient 13:9abd748f4e78 337 {
AzureIoTClient 13:9abd748f4e78 338 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 13:9abd748f4e78 339 }
AzureIoTClient 13:9abd748f4e78 340
AzureIoTClient 13:9abd748f4e78 341 log_message_chunk(message_sender_instance, "Message Annotations:", msg_annotations);
AzureIoTClient 13:9abd748f4e78 342 }
AzureIoTClient 13:9abd748f4e78 343
AzureIoTClient 6:641a9672db08 344 if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
AzureIoTClient 6:641a9672db08 345 {
AzureIoTClient 6:641a9672db08 346 if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
AzureIoTClient 6:641a9672db08 347 {
AzureIoTClient 6:641a9672db08 348 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 6:641a9672db08 349 }
Azure.IoT Build 0:6ae2f7bca550 350
AzureIoTClient 6:641a9672db08 351 log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value);
AzureIoTClient 6:641a9672db08 352 }
Azure.IoT Build 0:6ae2f7bca550 353
AzureIoTClient 6:641a9672db08 354 if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
AzureIoTClient 6:641a9672db08 355 {
AzureIoTClient 6:641a9672db08 356 if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
AzureIoTClient 6:641a9672db08 357 {
AzureIoTClient 6:641a9672db08 358 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 6:641a9672db08 359 }
Azure.IoT Build 0:6ae2f7bca550 360
AzureIoTClient 6:641a9672db08 361 log_message_chunk(message_sender_instance, "Application properties:", application_properties_value);
AzureIoTClient 6:641a9672db08 362 }
Azure.IoT Build 0:6ae2f7bca550 363
AzureIoTClient 6:641a9672db08 364 if (result == SEND_ONE_MESSAGE_OK)
AzureIoTClient 6:641a9672db08 365 {
AzureIoTClient 6:641a9672db08 366 switch (message_body_type)
AzureIoTClient 6:641a9672db08 367 {
AzureIoTClient 17:923575db8b2d 368 default:
AzureIoTClient 17:923575db8b2d 369 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 17:923575db8b2d 370 break;
AzureIoTClient 17:923575db8b2d 371
AzureIoTClient 6:641a9672db08 372 case MESSAGE_BODY_TYPE_VALUE:
AzureIoTClient 6:641a9672db08 373 {
AzureIoTClient 6:641a9672db08 374 if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
AzureIoTClient 6:641a9672db08 375 {
AzureIoTClient 6:641a9672db08 376 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 6:641a9672db08 377 }
Azure.IoT Build 0:6ae2f7bca550 378
AzureIoTClient 6:641a9672db08 379 log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value);
AzureIoTClient 6:641a9672db08 380 break;
AzureIoTClient 6:641a9672db08 381 }
AzureIoTClient 6:641a9672db08 382 case MESSAGE_BODY_TYPE_DATA:
AzureIoTClient 6:641a9672db08 383 {
Azure.IoT Build 0:6ae2f7bca550 384 BINARY_DATA binary_data;
Azure.IoT Build 0:6ae2f7bca550 385 size_t i;
Azure.IoT Build 0:6ae2f7bca550 386
Azure.IoT Build 0:6ae2f7bca550 387 for (i = 0; i < body_data_count; i++)
Azure.IoT Build 0:6ae2f7bca550 388 {
AzureIoTClient 24:2c59c2d43ebf 389 if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
Azure.IoT Build 0:6ae2f7bca550 390 {
Azure.IoT Build 0:6ae2f7bca550 391 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 392 }
Azure.IoT Build 0:6ae2f7bca550 393 else
Azure.IoT Build 0:6ae2f7bca550 394 {
AzureIoTClient 28:add19eb7defa 395 AMQP_VALUE body_amqp_data;
AzureIoTClient 6:641a9672db08 396 amqp_binary binary_value;
AzureIoTClient 6:641a9672db08 397 binary_value.bytes = binary_data.bytes;
AzureIoTClient 6:641a9672db08 398 binary_value.length = (uint32_t)binary_data.length;
AzureIoTClient 25:1101516ee67d 399 body_amqp_data = amqpvalue_create_data(binary_value);
Azure.IoT Build 0:6ae2f7bca550 400 if (body_amqp_data == NULL)
Azure.IoT Build 0:6ae2f7bca550 401 {
Azure.IoT Build 0:6ae2f7bca550 402 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 403 }
Azure.IoT Build 0:6ae2f7bca550 404 else
Azure.IoT Build 0:6ae2f7bca550 405 {
Azure.IoT Build 0:6ae2f7bca550 406 if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
Azure.IoT Build 0:6ae2f7bca550 407 {
Azure.IoT Build 0:6ae2f7bca550 408 result = SEND_ONE_MESSAGE_ERROR;
Azure.IoT Build 0:6ae2f7bca550 409 break;
Azure.IoT Build 0:6ae2f7bca550 410 }
Azure.IoT Build 0:6ae2f7bca550 411
Azure.IoT Build 0:6ae2f7bca550 412 amqpvalue_destroy(body_amqp_data);
Azure.IoT Build 0:6ae2f7bca550 413 }
Azure.IoT Build 0:6ae2f7bca550 414 }
Azure.IoT Build 0:6ae2f7bca550 415 }
AzureIoTClient 6:641a9672db08 416 break;
AzureIoTClient 6:641a9672db08 417 }
AzureIoTClient 6:641a9672db08 418 }
AzureIoTClient 6:641a9672db08 419 }
Azure.IoT Build 0:6ae2f7bca550 420
AzureIoTClient 6:641a9672db08 421 if (result == SEND_ONE_MESSAGE_OK)
AzureIoTClient 6:641a9672db08 422 {
AzureIoTClient 6:641a9672db08 423 message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
AzureIoTClient 6:641a9672db08 424 switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback))
AzureIoTClient 6:641a9672db08 425 {
AzureIoTClient 6:641a9672db08 426 default:
AzureIoTClient 6:641a9672db08 427 case LINK_TRANSFER_ERROR:
AzureIoTClient 6:641a9672db08 428 result = SEND_ONE_MESSAGE_ERROR;
AzureIoTClient 6:641a9672db08 429 break;
Azure.IoT Build 0:6ae2f7bca550 430
AzureIoTClient 6:641a9672db08 431 case LINK_TRANSFER_BUSY:
AzureIoTClient 6:641a9672db08 432 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
AzureIoTClient 6:641a9672db08 433 result = SEND_ONE_MESSAGE_BUSY;
AzureIoTClient 6:641a9672db08 434 break;
Azure.IoT Build 0:6ae2f7bca550 435
AzureIoTClient 6:641a9672db08 436 case LINK_TRANSFER_OK:
AzureIoTClient 6:641a9672db08 437 result = SEND_ONE_MESSAGE_OK;
AzureIoTClient 6:641a9672db08 438 break;
AzureIoTClient 6:641a9672db08 439 }
AzureIoTClient 6:641a9672db08 440 }
Azure.IoT Build 0:6ae2f7bca550 441
AzureIoTClient 21:f9c433d8e6ca 442 free(data_bytes);
Azure.IoT Build 0:6ae2f7bca550 443
AzureIoTClient 6:641a9672db08 444 if (body_amqp_value != NULL)
AzureIoTClient 6:641a9672db08 445 {
AzureIoTClient 6:641a9672db08 446 amqpvalue_destroy(body_amqp_value);
AzureIoTClient 6:641a9672db08 447 }
AzureIoTClient 20:206846c14c80 448 }
Azure.IoT Build 0:6ae2f7bca550 449
AzureIoTClient 31:cafc87baef79 450 if (header != NULL)
AzureIoTClient 31:cafc87baef79 451 {
AzureIoTClient 31:cafc87baef79 452 header_destroy(header);
AzureIoTClient 31:cafc87baef79 453 }
AzureIoTClient 31:cafc87baef79 454
AzureIoTClient 31:cafc87baef79 455 if (header_amqp_value != NULL)
AzureIoTClient 31:cafc87baef79 456 {
AzureIoTClient 31:cafc87baef79 457 amqpvalue_destroy(header_amqp_value);
AzureIoTClient 31:cafc87baef79 458 }
AzureIoTClient 31:cafc87baef79 459
AzureIoTClient 31:cafc87baef79 460 if (msg_annotations != NULL)
AzureIoTClient 31:cafc87baef79 461 {
AzureIoTClient 31:cafc87baef79 462 annotations_destroy(msg_annotations);
AzureIoTClient 31:cafc87baef79 463 }
AzureIoTClient 31:cafc87baef79 464
AzureIoTClient 23:1111ee8bcba4 465 if (application_properties != NULL)
AzureIoTClient 23:1111ee8bcba4 466 {
AzureIoTClient 23:1111ee8bcba4 467 amqpvalue_destroy(application_properties);
AzureIoTClient 23:1111ee8bcba4 468 }
AzureIoTClient 23:1111ee8bcba4 469 if (application_properties_value != NULL)
AzureIoTClient 23:1111ee8bcba4 470 {
AzureIoTClient 23:1111ee8bcba4 471 amqpvalue_destroy(application_properties_value);
AzureIoTClient 23:1111ee8bcba4 472 }
AzureIoTClient 23:1111ee8bcba4 473 if (properties_amqp_value != NULL)
AzureIoTClient 23:1111ee8bcba4 474 {
AzureIoTClient 23:1111ee8bcba4 475 amqpvalue_destroy(properties_amqp_value);
AzureIoTClient 23:1111ee8bcba4 476 }
AzureIoTClient 23:1111ee8bcba4 477 if (properties != NULL)
AzureIoTClient 23:1111ee8bcba4 478 {
AzureIoTClient 23:1111ee8bcba4 479 properties_destroy(properties);
AzureIoTClient 23:1111ee8bcba4 480 }
AzureIoTClient 6:641a9672db08 481 }
Azure.IoT Build 0:6ae2f7bca550 482
AzureIoTClient 6:641a9672db08 483 return result;
Azure.IoT Build 0:6ae2f7bca550 484 }
Azure.IoT Build 0:6ae2f7bca550 485
Azure.IoT Build 0:6ae2f7bca550 486 static void send_all_pending_messages(MESSAGE_SENDER_INSTANCE* message_sender_instance)
Azure.IoT Build 0:6ae2f7bca550 487 {
AzureIoTClient 6:641a9672db08 488 size_t i;
Azure.IoT Build 0:6ae2f7bca550 489
AzureIoTClient 6:641a9672db08 490 for (i = 0; i < message_sender_instance->message_count; i++)
AzureIoTClient 6:641a9672db08 491 {
AzureIoTClient 6:641a9672db08 492 if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
AzureIoTClient 6:641a9672db08 493 {
AzureIoTClient 6:641a9672db08 494 switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message))
AzureIoTClient 6:641a9672db08 495 {
AzureIoTClient 6:641a9672db08 496 default:
AzureIoTClient 6:641a9672db08 497 case SEND_ONE_MESSAGE_ERROR:
AzureIoTClient 6:641a9672db08 498 {
AzureIoTClient 6:641a9672db08 499 ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete;
AzureIoTClient 6:641a9672db08 500 void* context = message_sender_instance->messages[i]->context;
AzureIoTClient 6:641a9672db08 501 remove_pending_message_by_index(message_sender_instance, i);
Azure.IoT Build 0:6ae2f7bca550 502
AzureIoTClient 23:1111ee8bcba4 503 if (on_message_send_complete != NULL)
AzureIoTClient 23:1111ee8bcba4 504 {
AzureIoTClient 23:1111ee8bcba4 505 on_message_send_complete(context, MESSAGE_SEND_ERROR);
AzureIoTClient 23:1111ee8bcba4 506 }
AzureIoTClient 18:a922a4a30a82 507
AzureIoTClient 6:641a9672db08 508 i = message_sender_instance->message_count;
AzureIoTClient 6:641a9672db08 509 break;
AzureIoTClient 6:641a9672db08 510 }
AzureIoTClient 6:641a9672db08 511 case SEND_ONE_MESSAGE_BUSY:
AzureIoTClient 6:641a9672db08 512 i = message_sender_instance->message_count + 1;
AzureIoTClient 6:641a9672db08 513 break;
Azure.IoT Build 0:6ae2f7bca550 514
AzureIoTClient 6:641a9672db08 515 case SEND_ONE_MESSAGE_OK:
AzureIoTClient 6:641a9672db08 516 break;
AzureIoTClient 6:641a9672db08 517 }
Azure.IoT Build 0:6ae2f7bca550 518
AzureIoTClient 6:641a9672db08 519 i--;
AzureIoTClient 6:641a9672db08 520 }
AzureIoTClient 6:641a9672db08 521 }
Azure.IoT Build 0:6ae2f7bca550 522 }
Azure.IoT Build 0:6ae2f7bca550 523
Azure.IoT Build 0:6ae2f7bca550 524 static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_SENDER_STATE new_state)
Azure.IoT Build 0:6ae2f7bca550 525 {
AzureIoTClient 6:641a9672db08 526 MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state;
AzureIoTClient 6:641a9672db08 527 message_sender_instance->message_sender_state = new_state;
AzureIoTClient 6:641a9672db08 528 if (message_sender_instance->on_message_sender_state_changed != NULL)
AzureIoTClient 6:641a9672db08 529 {
AzureIoTClient 6:641a9672db08 530 message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state);
AzureIoTClient 6:641a9672db08 531 }
Azure.IoT Build 0:6ae2f7bca550 532 }
Azure.IoT Build 0:6ae2f7bca550 533
AzureIoTClient 2:64b4feb67cd3 534 static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender_instance)
AzureIoTClient 2:64b4feb67cd3 535 {
AzureIoTClient 2:64b4feb67cd3 536 size_t i;
AzureIoTClient 2:64b4feb67cd3 537
AzureIoTClient 2:64b4feb67cd3 538 for (i = 0; i < message_sender_instance->message_count; i++)
AzureIoTClient 2:64b4feb67cd3 539 {
AzureIoTClient 2:64b4feb67cd3 540 if (message_sender_instance->messages[i]->on_message_send_complete != NULL)
AzureIoTClient 2:64b4feb67cd3 541 {
AzureIoTClient 2:64b4feb67cd3 542 message_sender_instance->messages[i]->on_message_send_complete(message_sender_instance->messages[i]->context, MESSAGE_SEND_ERROR);
AzureIoTClient 2:64b4feb67cd3 543 }
AzureIoTClient 2:64b4feb67cd3 544
AzureIoTClient 2:64b4feb67cd3 545 message_destroy(message_sender_instance->messages[i]->message);
AzureIoTClient 21:f9c433d8e6ca 546 free(message_sender_instance->messages[i]);
AzureIoTClient 2:64b4feb67cd3 547 }
AzureIoTClient 2:64b4feb67cd3 548
AzureIoTClient 2:64b4feb67cd3 549 if (message_sender_instance->messages != NULL)
AzureIoTClient 2:64b4feb67cd3 550 {
AzureIoTClient 2:64b4feb67cd3 551 message_sender_instance->message_count = 0;
AzureIoTClient 2:64b4feb67cd3 552
AzureIoTClient 21:f9c433d8e6ca 553 free(message_sender_instance->messages);
AzureIoTClient 2:64b4feb67cd3 554 message_sender_instance->messages = NULL;
AzureIoTClient 2:64b4feb67cd3 555 }
AzureIoTClient 2:64b4feb67cd3 556 }
AzureIoTClient 2:64b4feb67cd3 557
Azure.IoT Build 0:6ae2f7bca550 558 static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
Azure.IoT Build 0:6ae2f7bca550 559 {
AzureIoTClient 6:641a9672db08 560 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
AzureIoTClient 6:641a9672db08 561 (void)previous_link_state;
Azure.IoT Build 0:6ae2f7bca550 562
AzureIoTClient 6:641a9672db08 563 switch (new_link_state)
AzureIoTClient 6:641a9672db08 564 {
AzureIoTClient 17:923575db8b2d 565 default:
AzureIoTClient 17:923575db8b2d 566 break;
AzureIoTClient 17:923575db8b2d 567
AzureIoTClient 6:641a9672db08 568 case LINK_STATE_ATTACHED:
AzureIoTClient 6:641a9672db08 569 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
AzureIoTClient 6:641a9672db08 570 {
AzureIoTClient 6:641a9672db08 571 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN);
AzureIoTClient 6:641a9672db08 572 }
AzureIoTClient 6:641a9672db08 573 break;
AzureIoTClient 6:641a9672db08 574 case LINK_STATE_DETACHED:
Azure.IoT Build 0:6ae2f7bca550 575 if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
Azure.IoT Build 0:6ae2f7bca550 576 (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
Azure.IoT Build 0:6ae2f7bca550 577 {
Azure.IoT Build 0:6ae2f7bca550 578 /* User initiated transition, we should be good */
AzureIoTClient 23:1111ee8bcba4 579 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
AzureIoTClient 2:64b4feb67cd3 580 indicate_all_messages_as_error(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 581 }
Azure.IoT Build 0:6ae2f7bca550 582 else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
Azure.IoT Build 0:6ae2f7bca550 583 {
Azure.IoT Build 0:6ae2f7bca550 584 /* Any other transition must be an error */
Azure.IoT Build 0:6ae2f7bca550 585 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
Azure.IoT Build 0:6ae2f7bca550 586 }
Azure.IoT Build 0:6ae2f7bca550 587 break;
AzureIoTClient 6:641a9672db08 588 case LINK_STATE_ERROR:
AzureIoTClient 6:641a9672db08 589 if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
AzureIoTClient 6:641a9672db08 590 {
AzureIoTClient 23:1111ee8bcba4 591 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
AzureIoTClient 2:64b4feb67cd3 592 indicate_all_messages_as_error(message_sender_instance);
AzureIoTClient 6:641a9672db08 593 }
AzureIoTClient 6:641a9672db08 594 break;
AzureIoTClient 6:641a9672db08 595 }
Azure.IoT Build 0:6ae2f7bca550 596 }
Azure.IoT Build 0:6ae2f7bca550 597
Azure.IoT Build 0:6ae2f7bca550 598 static void on_link_flow_on(void* context)
Azure.IoT Build 0:6ae2f7bca550 599 {
AzureIoTClient 6:641a9672db08 600 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
AzureIoTClient 6:641a9672db08 601 send_all_pending_messages(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 602 }
Azure.IoT Build 0:6ae2f7bca550 603
Azure.IoT Build 5:ae49385aff34 604 MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context)
Azure.IoT Build 0:6ae2f7bca550 605 {
AzureIoTClient 23:1111ee8bcba4 606 MESSAGE_SENDER_INSTANCE* result = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE));
AzureIoTClient 6:641a9672db08 607 if (result != NULL)
AzureIoTClient 6:641a9672db08 608 {
AzureIoTClient 6:641a9672db08 609 result->messages = NULL;
AzureIoTClient 6:641a9672db08 610 result->message_count = 0;
AzureIoTClient 6:641a9672db08 611 result->link = link;
AzureIoTClient 6:641a9672db08 612 result->on_message_sender_state_changed = on_message_sender_state_changed;
AzureIoTClient 6:641a9672db08 613 result->on_message_sender_state_changed_context = context;
AzureIoTClient 6:641a9672db08 614 result->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
AzureIoTClient 6:641a9672db08 615 result->is_trace_on = 0;
AzureIoTClient 6:641a9672db08 616 }
Azure.IoT Build 0:6ae2f7bca550 617
AzureIoTClient 6:641a9672db08 618 return result;
Azure.IoT Build 0:6ae2f7bca550 619 }
Azure.IoT Build 0:6ae2f7bca550 620
Azure.IoT Build 0:6ae2f7bca550 621 void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 622 {
AzureIoTClient 6:641a9672db08 623 if (message_sender != NULL)
AzureIoTClient 6:641a9672db08 624 {
AzureIoTClient 6:641a9672db08 625 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 626
AzureIoTClient 6:641a9672db08 627 messagesender_close(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 628
AzureIoTClient 2:64b4feb67cd3 629 indicate_all_messages_as_error(message_sender_instance);
Azure.IoT Build 0:6ae2f7bca550 630
AzureIoTClient 21:f9c433d8e6ca 631 free(message_sender);
AzureIoTClient 6:641a9672db08 632 }
Azure.IoT Build 0:6ae2f7bca550 633 }
Azure.IoT Build 0:6ae2f7bca550 634
Azure.IoT Build 0:6ae2f7bca550 635 int messagesender_open(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 636 {
AzureIoTClient 6:641a9672db08 637 int result;
Azure.IoT Build 0:6ae2f7bca550 638
AzureIoTClient 6:641a9672db08 639 if (message_sender == NULL)
AzureIoTClient 6:641a9672db08 640 {
AzureIoTClient 19:000ab4e6a2c1 641 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 642 }
AzureIoTClient 6:641a9672db08 643 else
AzureIoTClient 6:641a9672db08 644 {
AzureIoTClient 6:641a9672db08 645 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 646
AzureIoTClient 6:641a9672db08 647 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
AzureIoTClient 6:641a9672db08 648 {
AzureIoTClient 6:641a9672db08 649 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING);
AzureIoTClient 6:641a9672db08 650 if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0)
AzureIoTClient 6:641a9672db08 651 {
AzureIoTClient 19:000ab4e6a2c1 652 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 653 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
AzureIoTClient 6:641a9672db08 654 }
AzureIoTClient 6:641a9672db08 655 else
AzureIoTClient 6:641a9672db08 656 {
AzureIoTClient 6:641a9672db08 657 result = 0;
AzureIoTClient 6:641a9672db08 658 }
AzureIoTClient 6:641a9672db08 659 }
AzureIoTClient 6:641a9672db08 660 else
AzureIoTClient 6:641a9672db08 661 {
AzureIoTClient 6:641a9672db08 662 result = 0;
AzureIoTClient 6:641a9672db08 663 }
AzureIoTClient 6:641a9672db08 664 }
Azure.IoT Build 0:6ae2f7bca550 665
AzureIoTClient 6:641a9672db08 666 return result;
Azure.IoT Build 0:6ae2f7bca550 667 }
Azure.IoT Build 0:6ae2f7bca550 668
Azure.IoT Build 0:6ae2f7bca550 669 int messagesender_close(MESSAGE_SENDER_HANDLE message_sender)
Azure.IoT Build 0:6ae2f7bca550 670 {
AzureIoTClient 6:641a9672db08 671 int result;
Azure.IoT Build 0:6ae2f7bca550 672
AzureIoTClient 6:641a9672db08 673 if (message_sender == NULL)
AzureIoTClient 6:641a9672db08 674 {
AzureIoTClient 19:000ab4e6a2c1 675 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 676 }
AzureIoTClient 6:641a9672db08 677 else
AzureIoTClient 6:641a9672db08 678 {
AzureIoTClient 6:641a9672db08 679 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
Azure.IoT Build 0:6ae2f7bca550 680
AzureIoTClient 6:641a9672db08 681 if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
AzureIoTClient 6:641a9672db08 682 (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
AzureIoTClient 6:641a9672db08 683 {
AzureIoTClient 6:641a9672db08 684 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING);
AzureIoTClient 12:b30dacf113f2 685 if (link_detach(message_sender_instance->link, true) != 0)
AzureIoTClient 6:641a9672db08 686 {
AzureIoTClient 19:000ab4e6a2c1 687 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 688 set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
AzureIoTClient 6:641a9672db08 689 }
AzureIoTClient 6:641a9672db08 690 else
AzureIoTClient 6:641a9672db08 691 {
AzureIoTClient 6:641a9672db08 692 result = 0;
AzureIoTClient 6:641a9672db08 693 }
AzureIoTClient 6:641a9672db08 694 }
AzureIoTClient 6:641a9672db08 695 else
AzureIoTClient 6:641a9672db08 696 {
AzureIoTClient 6:641a9672db08 697 result = 0;
AzureIoTClient 6:641a9672db08 698 }
AzureIoTClient 6:641a9672db08 699 }
Azure.IoT Build 0:6ae2f7bca550 700
AzureIoTClient 6:641a9672db08 701 return result;
Azure.IoT Build 0:6ae2f7bca550 702 }
Azure.IoT Build 0:6ae2f7bca550 703
Azure.IoT Build 0:6ae2f7bca550 704 int messagesender_send(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context)
Azure.IoT Build 0:6ae2f7bca550 705 {
AzureIoTClient 6:641a9672db08 706 int result;
Azure.IoT Build 0:6ae2f7bca550 707
AzureIoTClient 6:641a9672db08 708 if ((message_sender == NULL) ||
AzureIoTClient 6:641a9672db08 709 (message == NULL))
AzureIoTClient 6:641a9672db08 710 {
AzureIoTClient 19:000ab4e6a2c1 711 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 712 }
AzureIoTClient 6:641a9672db08 713 else
AzureIoTClient 6:641a9672db08 714 {
AzureIoTClient 6:641a9672db08 715 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
AzureIoTClient 6:641a9672db08 716 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
AzureIoTClient 6:641a9672db08 717 {
AzureIoTClient 19:000ab4e6a2c1 718 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 719 }
AzureIoTClient 6:641a9672db08 720 else
AzureIoTClient 6:641a9672db08 721 {
AzureIoTClient 21:f9c433d8e6ca 722 MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)malloc(sizeof(MESSAGE_WITH_CALLBACK));
AzureIoTClient 6:641a9672db08 723 if (message_with_callback == NULL)
AzureIoTClient 6:641a9672db08 724 {
AzureIoTClient 19:000ab4e6a2c1 725 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 726 }
AzureIoTClient 6:641a9672db08 727 else
AzureIoTClient 6:641a9672db08 728 {
AzureIoTClient 21:f9c433d8e6ca 729 MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1));
AzureIoTClient 6:641a9672db08 730 if (new_messages == NULL)
AzureIoTClient 6:641a9672db08 731 {
AzureIoTClient 21:f9c433d8e6ca 732 free(message_with_callback);
AzureIoTClient 19:000ab4e6a2c1 733 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 734 }
AzureIoTClient 6:641a9672db08 735 else
AzureIoTClient 6:641a9672db08 736 {
AzureIoTClient 6:641a9672db08 737 result = 0;
Azure.IoT Build 0:6ae2f7bca550 738
AzureIoTClient 6:641a9672db08 739 message_sender_instance->messages = new_messages;
AzureIoTClient 6:641a9672db08 740 if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
AzureIoTClient 6:641a9672db08 741 {
AzureIoTClient 6:641a9672db08 742 message_with_callback->message = message_clone(message);
AzureIoTClient 6:641a9672db08 743 if (message_with_callback->message == NULL)
AzureIoTClient 6:641a9672db08 744 {
AzureIoTClient 21:f9c433d8e6ca 745 free(message_with_callback);
AzureIoTClient 19:000ab4e6a2c1 746 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 747 }
AzureIoTClient 6:641a9672db08 748
AzureIoTClient 6:641a9672db08 749 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
AzureIoTClient 6:641a9672db08 750 }
AzureIoTClient 6:641a9672db08 751 else
AzureIoTClient 6:641a9672db08 752 {
AzureIoTClient 6:641a9672db08 753 message_with_callback->message = NULL;
AzureIoTClient 6:641a9672db08 754 message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
AzureIoTClient 6:641a9672db08 755 }
Azure.IoT Build 0:6ae2f7bca550 756
AzureIoTClient 6:641a9672db08 757 if (result == 0)
AzureIoTClient 6:641a9672db08 758 {
AzureIoTClient 6:641a9672db08 759 message_with_callback->on_message_send_complete = on_message_send_complete;
AzureIoTClient 6:641a9672db08 760 message_with_callback->context = callback_context;
AzureIoTClient 6:641a9672db08 761 message_with_callback->message_sender = message_sender_instance;
AzureIoTClient 6:641a9672db08 762
AzureIoTClient 6:641a9672db08 763 message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback;
AzureIoTClient 6:641a9672db08 764 message_sender_instance->message_count++;
Azure.IoT Build 0:6ae2f7bca550 765
AzureIoTClient 6:641a9672db08 766 if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
AzureIoTClient 6:641a9672db08 767 {
AzureIoTClient 6:641a9672db08 768 switch (send_one_message(message_sender_instance, message_with_callback, message))
AzureIoTClient 6:641a9672db08 769 {
AzureIoTClient 6:641a9672db08 770 default:
AzureIoTClient 6:641a9672db08 771 case SEND_ONE_MESSAGE_ERROR:
AzureIoTClient 23:1111ee8bcba4 772
AzureIoTClient 6:641a9672db08 773 remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
AzureIoTClient 19:000ab4e6a2c1 774 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 775 break;
Azure.IoT Build 0:6ae2f7bca550 776
AzureIoTClient 6:641a9672db08 777 case SEND_ONE_MESSAGE_BUSY:
AzureIoTClient 6:641a9672db08 778 message_with_callback->message = message_clone(message);
AzureIoTClient 6:641a9672db08 779 if (message_with_callback->message == NULL)
AzureIoTClient 6:641a9672db08 780 {
AzureIoTClient 21:f9c433d8e6ca 781 free(message_with_callback);
AzureIoTClient 19:000ab4e6a2c1 782 result = __FAILURE__;
AzureIoTClient 6:641a9672db08 783 }
AzureIoTClient 6:641a9672db08 784 else
AzureIoTClient 6:641a9672db08 785 {
AzureIoTClient 6:641a9672db08 786 message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
AzureIoTClient 6:641a9672db08 787 result = 0;
AzureIoTClient 6:641a9672db08 788 }
AzureIoTClient 6:641a9672db08 789 break;
Azure.IoT Build 0:6ae2f7bca550 790
AzureIoTClient 6:641a9672db08 791 case SEND_ONE_MESSAGE_OK:
AzureIoTClient 6:641a9672db08 792 result = 0;
AzureIoTClient 6:641a9672db08 793 break;
AzureIoTClient 6:641a9672db08 794 }
AzureIoTClient 6:641a9672db08 795 }
AzureIoTClient 6:641a9672db08 796 }
AzureIoTClient 6:641a9672db08 797 }
AzureIoTClient 6:641a9672db08 798 }
AzureIoTClient 6:641a9672db08 799 }
AzureIoTClient 6:641a9672db08 800 }
AzureIoTClient 6:641a9672db08 801 return result;
AzureIoTClient 6:641a9672db08 802 }
Azure.IoT Build 0:6ae2f7bca550 803
AzureIoTClient 6:641a9672db08 804 void messagesender_set_trace(MESSAGE_SENDER_HANDLE message_sender, bool traceOn)
AzureIoTClient 6:641a9672db08 805 {
AzureIoTClient 6:641a9672db08 806 MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
AzureIoTClient 6:641a9672db08 807 if (message_sender_instance != NULL)
AzureIoTClient 6:641a9672db08 808 {
AzureIoTClient 6:641a9672db08 809 message_sender_instance->is_trace_on = traceOn ? 1 : 0;
AzureIoTClient 6:641a9672db08 810 }
Azure.IoT Build 0:6ae2f7bca550 811 }