Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: uamqp/src/message_sender.c
- Revision:
- 0:f7f1f0d76dd6
diff -r 000000000000 -r f7f1f0d76dd6 uamqp/src/message_sender.c
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/uamqp/src/message_sender.c Thu Aug 23 06:52:14 2018 +0000
@@ -0,0 +1,948 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#include <stdlib.h>
+#include <stdbool.h>
+#include <string.h>
+#include "azure_c_shared_utility/optimize_size.h"
+#include "azure_c_shared_utility/gballoc.h"
+#include "azure_c_shared_utility/xlogging.h"
+#include "azure_c_shared_utility/tickcounter.h"
+#include "azure_uamqp_c/link.h"
+#include "azure_uamqp_c/message.h"
+#include "azure_uamqp_c/message_sender.h"
+#include "azure_uamqp_c/amqpvalue_to_string.h"
+#include "azure_uamqp_c/async_operation.h"
+#include "azure_uamqp_c/amqp_definitions.h"
+
+typedef enum MESSAGE_SEND_STATE_TAG
+{
+ MESSAGE_SEND_STATE_NOT_SENT,
+ MESSAGE_SEND_STATE_PENDING
+} MESSAGE_SEND_STATE;
+
+typedef enum SEND_ONE_MESSAGE_RESULT_TAG
+{
+ SEND_ONE_MESSAGE_OK,
+ SEND_ONE_MESSAGE_ERROR,
+ SEND_ONE_MESSAGE_BUSY
+} SEND_ONE_MESSAGE_RESULT;
+
+typedef struct MESSAGE_WITH_CALLBACK_TAG
+{
+ MESSAGE_HANDLE message;
+ ON_MESSAGE_SEND_COMPLETE on_message_send_complete;
+ void* context;
+ MESSAGE_SENDER_HANDLE message_sender;
+ MESSAGE_SEND_STATE message_send_state;
+ tickcounter_ms_t timeout;
+} MESSAGE_WITH_CALLBACK;
+
+DEFINE_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK);
+
+typedef struct MESSAGE_SENDER_INSTANCE_TAG
+{
+ LINK_HANDLE link;
+ size_t message_count;
+ ASYNC_OPERATION_HANDLE* messages;
+ MESSAGE_SENDER_STATE message_sender_state;
+ ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
+ void* on_message_sender_state_changed_context;
+ unsigned int is_trace_on : 1;
+} MESSAGE_SENDER_INSTANCE;
+
+static void remove_pending_message_by_index(MESSAGE_SENDER_HANDLE message_sender, size_t index)
+{
+ ASYNC_OPERATION_HANDLE* new_messages;
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[index]);
+
+ if (message_with_callback->message != NULL)
+ {
+ message_destroy(message_with_callback->message);
+ message_with_callback->message = NULL;
+ }
+
+ async_operation_destroy(message_sender->messages[index]);
+
+ if (message_sender->message_count - index > 1)
+ {
+ (void)memmove(&message_sender->messages[index], &message_sender->messages[index + 1], sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count - index - 1));
+ }
+
+ message_sender->message_count--;
+
+ if (message_sender->message_count > 0)
+ {
+ new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count));
+ if (new_messages != NULL)
+ {
+ message_sender->messages = new_messages;
+ }
+ }
+ else
+ {
+ free(message_sender->messages);
+ message_sender->messages = NULL;
+ }
+}
+
+static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send)
+{
+ size_t i;
+
+ for (i = 0; i < message_sender->message_count; i++)
+ {
+ if (message_sender->messages[i] == pending_send)
+ {
+ remove_pending_message_by_index(message_sender, i);
+ break;
+ }
+ }
+}
+
+static void on_delivery_settled(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state)
+{
+ ASYNC_OPERATION_HANDLE pending_send = (ASYNC_OPERATION_HANDLE)context;
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
+ MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
+ (void)delivery_no;
+
+ if (message_with_callback->on_message_send_complete != NULL)
+ {
+ switch (reason)
+ {
+ case LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED:
+ if (delivery_state == NULL)
+ {
+ LogError("delivery state not provided");
+ }
+ else
+ {
+ AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(delivery_state);
+ AMQP_VALUE described = amqpvalue_get_inplace_described_value(delivery_state);
+
+ if (descriptor == NULL)
+ {
+ LogError("Error getting descriptor for delivery state");
+ }
+ else if (is_accepted_type_by_descriptor(descriptor))
+ {
+ message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK, described);
+ }
+ else
+ {
+ message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, described);
+ }
+ }
+
+ break;
+ case LINK_DELIVERY_SETTLE_REASON_SETTLED:
+ message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK, NULL);
+ break;
+ case LINK_DELIVERY_SETTLE_REASON_TIMEOUT:
+ message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_TIMEOUT, NULL);
+ break;
+ case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED:
+ default:
+ message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, NULL);
+ break;
+ }
+ }
+
+ remove_pending_message(message_sender, pending_send);
+}
+
+static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
+{
+ PAYLOAD* payload = (PAYLOAD*)context;
+ (void)memcpy((unsigned char*)payload->bytes + payload->length, bytes, length);
+ payload->length += length;
+ return 0;
+}
+
+static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender, const char* name, AMQP_VALUE value)
+{
+#ifdef NO_LOGGING
+ (void)message_sender;
+ (void)name;
+ (void)value;
+#else
+ if (xlogging_get_log_function() != NULL && message_sender->is_trace_on == 1)
+ {
+ char* value_as_string = NULL;
+ LOG(AZ_LOG_TRACE, 0, "%s", P_OR_NULL(name));
+ LOG(AZ_LOG_TRACE, 0, "%s", ((value_as_string = amqpvalue_to_string(value)), P_OR_NULL(value_as_string)));
+ if (value_as_string != NULL)
+ {
+ free(value_as_string);
+ }
+ }
+#endif
+}
+
+static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send, MESSAGE_HANDLE message)
+{
+ SEND_ONE_MESSAGE_RESULT result;
+
+ size_t encoded_size;
+ size_t total_encoded_size = 0;
+ MESSAGE_BODY_TYPE message_body_type;
+ message_format message_format;
+
+ if ((message_get_body_type(message, &message_body_type) != 0) ||
+ (message_get_message_format(message, &message_format) != 0))
+ {
+ LogError("Failure getting message body type and/or message format");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ // header
+ HEADER_HANDLE header = NULL;
+ AMQP_VALUE header_amqp_value = NULL;
+ PROPERTIES_HANDLE properties = NULL;
+ AMQP_VALUE properties_amqp_value = NULL;
+ AMQP_VALUE application_properties = NULL;
+ AMQP_VALUE application_properties_value = NULL;
+ AMQP_VALUE body_amqp_value = NULL;
+ size_t body_data_count = 0;
+ AMQP_VALUE msg_annotations = NULL;
+ bool is_error = false;
+
+ // message header
+ if ((message_get_header(message, &header) == 0) &&
+ (header != NULL))
+ {
+ header_amqp_value = amqpvalue_create_header(header);
+ if (header_amqp_value == NULL)
+ {
+ LogError("Cannot create header AMQP value");
+ is_error = true;
+ }
+ else
+ {
+ if (amqpvalue_get_encoded_size(header_amqp_value, &encoded_size) != 0)
+ {
+ LogError("Cannot obtain header encoded size");
+ is_error = true;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
+ }
+
+ }
+
+ // message annotations
+ if ((!is_error) &&
+ (message_get_message_annotations(message, &msg_annotations) == 0) &&
+ (msg_annotations != NULL))
+ {
+ if (amqpvalue_get_encoded_size(msg_annotations, &encoded_size) != 0)
+ {
+ LogError("Cannot obtain message annotations encoded size");
+ is_error = true;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
+ }
+
+ // properties
+ if ((!is_error) &&
+ (message_get_properties(message, &properties) == 0) &&
+ (properties != NULL))
+ {
+ properties_amqp_value = amqpvalue_create_properties(properties);
+ if (properties_amqp_value == NULL)
+ {
+ LogError("Cannot create message properties AMQP value");
+ is_error = true;
+ }
+ else
+ {
+ if (amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size) != 0)
+ {
+ LogError("Cannot obtain message properties encoded size");
+ is_error = true;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
+ }
+ }
+
+ // application properties
+ if ((!is_error) &&
+ (message_get_application_properties(message, &application_properties) == 0) &&
+ (application_properties != NULL))
+ {
+ application_properties_value = amqpvalue_create_application_properties(application_properties);
+ if (application_properties_value == NULL)
+ {
+ LogError("Cannot create application properties AMQP value");
+ is_error = true;
+ }
+ else
+ {
+ if (amqpvalue_get_encoded_size(application_properties_value, &encoded_size) != 0)
+ {
+ LogError("Cannot obtain application properties encoded size");
+ is_error = true;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
+ }
+ }
+
+ if (is_error)
+ {
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ result = SEND_ONE_MESSAGE_OK;
+
+ // body - amqp data
+ switch (message_body_type)
+ {
+ default:
+ LogError("Unknown body type");
+ result = SEND_ONE_MESSAGE_ERROR;
+ break;
+
+ case MESSAGE_BODY_TYPE_VALUE:
+ {
+ AMQP_VALUE message_body_amqp_value;
+ if (message_get_body_amqp_value_in_place(message, &message_body_amqp_value) != 0)
+ {
+ LogError("Cannot obtain AMQP value from body");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
+ if (body_amqp_value == NULL)
+ {
+ LogError("Cannot create body AMQP value");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ if (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0)
+ {
+ LogError("Cannot get body AMQP value encoded size");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
+ }
+ }
+
+ break;
+ }
+
+ case MESSAGE_BODY_TYPE_DATA:
+ {
+ BINARY_DATA binary_data;
+ size_t i;
+
+ if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
+ {
+ LogError("Cannot get body AMQP data count");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ for (i = 0; i < body_data_count; i++)
+ {
+ if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
+ {
+ LogError("Cannot get body AMQP data %u", (unsigned int)i);
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ AMQP_VALUE body_amqp_data;
+ amqp_binary binary_value;
+ binary_value.bytes = binary_data.bytes;
+ binary_value.length = (uint32_t)binary_data.length;
+ body_amqp_data = amqpvalue_create_data(binary_value);
+ if (body_amqp_data == NULL)
+ {
+ LogError("Cannot create body AMQP data");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
+ {
+ LogError("Cannot get body AMQP data encoded size");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
+
+ amqpvalue_destroy(body_amqp_data);
+ }
+ }
+ }
+ }
+ break;
+ }
+ }
+
+ if (result == 0)
+ {
+ void* data_bytes = malloc(total_encoded_size);
+ PAYLOAD payload;
+ payload.bytes = (const unsigned char*)data_bytes;
+ payload.length = 0;
+ result = SEND_ONE_MESSAGE_OK;
+
+ if (header != NULL)
+ {
+ if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
+ {
+ LogError("Cannot encode header value");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+
+ log_message_chunk(message_sender, "Header:", header_amqp_value);
+ }
+
+ if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
+ {
+ if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
+ {
+ LogError("Cannot encode message annotations value");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+
+ log_message_chunk(message_sender, "Message Annotations:", msg_annotations);
+ }
+
+ if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
+ {
+ if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
+ {
+ LogError("Cannot encode message properties value");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+
+ log_message_chunk(message_sender, "Properties:", properties_amqp_value);
+ }
+
+ if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
+ {
+ if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
+ {
+ LogError("Cannot encode application properties value");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+
+ log_message_chunk(message_sender, "Application properties:", application_properties_value);
+ }
+
+ if (result == SEND_ONE_MESSAGE_OK)
+ {
+ switch (message_body_type)
+ {
+ default:
+ LogError("Unknown message type");
+ result = SEND_ONE_MESSAGE_ERROR;
+ break;
+
+ case MESSAGE_BODY_TYPE_VALUE:
+ {
+ if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
+ {
+ LogError("Cannot encode body AMQP value");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+
+ log_message_chunk(message_sender, "Body - amqp value:", body_amqp_value);
+ break;
+ }
+ case MESSAGE_BODY_TYPE_DATA:
+ {
+ BINARY_DATA binary_data;
+ size_t i;
+
+ for (i = 0; i < body_data_count; i++)
+ {
+ if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
+ {
+ LogError("Cannot get AMQP data %u", (unsigned int)i);
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ AMQP_VALUE body_amqp_data;
+ amqp_binary binary_value;
+ binary_value.bytes = binary_data.bytes;
+ binary_value.length = (uint32_t)binary_data.length;
+ body_amqp_data = amqpvalue_create_data(binary_value);
+ if (body_amqp_data == NULL)
+ {
+ LogError("Cannot create body AMQP data %u", (unsigned int)i);
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
+ {
+ LogError("Cannot encode body AMQP data %u", (unsigned int)i);
+ result = SEND_ONE_MESSAGE_ERROR;
+ break;
+ }
+
+ amqpvalue_destroy(body_amqp_data);
+ }
+ }
+ }
+ break;
+ }
+ }
+ }
+
+ if (result == SEND_ONE_MESSAGE_OK)
+ {
+ ASYNC_OPERATION_HANDLE transfer_async_operation;
+ LINK_TRANSFER_RESULT link_transfer_error;
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
+ message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
+
+ transfer_async_operation = link_transfer_async(message_sender->link, message_format, &payload, 1, on_delivery_settled, pending_send, &link_transfer_error, message_with_callback->timeout);
+ if (transfer_async_operation == NULL)
+ {
+ if (link_transfer_error == LINK_TRANSFER_BUSY)
+ {
+ message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+ result = SEND_ONE_MESSAGE_BUSY;
+ }
+ else
+ {
+ LogError("Error in link transfer");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ }
+ else
+ {
+ result = SEND_ONE_MESSAGE_OK;
+ }
+ }
+
+ free(data_bytes);
+
+ if (body_amqp_value != NULL)
+ {
+ amqpvalue_destroy(body_amqp_value);
+ }
+ }
+ }
+
+ if (header != NULL)
+ {
+ header_destroy(header);
+ }
+
+ if (header_amqp_value != NULL)
+ {
+ amqpvalue_destroy(header_amqp_value);
+ }
+
+ if (msg_annotations != NULL)
+ {
+ annotations_destroy(msg_annotations);
+ }
+
+ if (application_properties != NULL)
+ {
+ amqpvalue_destroy(application_properties);
+ }
+
+ if (application_properties_value != NULL)
+ {
+ amqpvalue_destroy(application_properties_value);
+ }
+
+ if (properties_amqp_value != NULL)
+ {
+ amqpvalue_destroy(properties_amqp_value);
+ }
+
+ if (properties != NULL)
+ {
+ properties_destroy(properties);
+ }
+ }
+
+ return result;
+}
+
+static void send_all_pending_messages(MESSAGE_SENDER_HANDLE message_sender)
+{
+ size_t i;
+
+ for (i = 0; i < message_sender->message_count; i++)
+ {
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
+ if (message_with_callback->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
+ {
+ switch (send_one_message(message_sender, message_sender->messages[i], message_with_callback->message))
+ {
+ default:
+ LogError("Invalid send one message result");
+ break;
+
+ case SEND_ONE_MESSAGE_ERROR:
+ {
+ ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_with_callback->on_message_send_complete;
+ void* context = message_with_callback->context;
+ remove_pending_message_by_index(message_sender, i);
+
+ if (on_message_send_complete != NULL)
+ {
+ on_message_send_complete(context, MESSAGE_SEND_ERROR, NULL);
+ }
+
+ i = message_sender->message_count;
+ break;
+ }
+ case SEND_ONE_MESSAGE_BUSY:
+ i = message_sender->message_count + 1;
+ break;
+
+ case SEND_ONE_MESSAGE_OK:
+ break;
+ }
+
+ i--;
+ }
+ }
+}
+
+static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender, MESSAGE_SENDER_STATE new_state)
+{
+ MESSAGE_SENDER_STATE previous_state = message_sender->message_sender_state;
+ message_sender->message_sender_state = new_state;
+ if (message_sender->on_message_sender_state_changed != NULL)
+ {
+ message_sender->on_message_sender_state_changed(message_sender->on_message_sender_state_changed_context, new_state, previous_state);
+ }
+}
+
+static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender)
+{
+ size_t i;
+
+ for (i = 0; i < message_sender->message_count; i++)
+ {
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
+ if (message_with_callback->on_message_send_complete != NULL)
+ {
+ message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, NULL);
+ }
+
+ if (message_with_callback->message != NULL)
+ {
+ message_destroy(message_with_callback->message);
+ }
+ async_operation_destroy(message_sender->messages[i]);
+ }
+
+ if (message_sender->messages != NULL)
+ {
+ message_sender->message_count = 0;
+
+ free(message_sender->messages);
+ message_sender->messages = NULL;
+ }
+}
+
+static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
+{
+ MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)context;
+ (void)previous_link_state;
+
+ switch (new_link_state)
+ {
+ default:
+ break;
+
+ case LINK_STATE_ATTACHED:
+ if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
+ {
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPEN);
+ }
+ break;
+ case LINK_STATE_DETACHED:
+ if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
+ (message_sender->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
+ {
+ /* switch to closing so that no more requests should be accepted */
+ indicate_all_messages_as_error(message_sender);
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_IDLE);
+ }
+ else if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
+ {
+ /* Any other transition must be an error */
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+ }
+ break;
+ case LINK_STATE_ERROR:
+ if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
+ {
+ indicate_all_messages_as_error(message_sender);
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+ }
+ break;
+ }
+}
+
+static void on_link_flow_on(void* context)
+{
+ MESSAGE_SENDER_HANDLE message_sender = (MESSAGE_SENDER_INSTANCE*)context;
+ send_all_pending_messages(message_sender);
+}
+
+MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context)
+{
+ MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE));
+ if (message_sender == NULL)
+ {
+ LogError("Failed allocating message sender");
+ }
+ else
+ {
+ message_sender->messages = NULL;
+ message_sender->message_count = 0;
+ message_sender->link = link;
+ message_sender->on_message_sender_state_changed = on_message_sender_state_changed;
+ message_sender->on_message_sender_state_changed_context = context;
+ message_sender->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
+ message_sender->is_trace_on = 0;
+ }
+
+ return message_sender;
+}
+
+void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
+{
+ if (message_sender == NULL)
+ {
+ LogError("NULL message_sender");
+ }
+ else
+ {
+ indicate_all_messages_as_error(message_sender);
+ (void)messagesender_close(message_sender);
+
+ free(message_sender);
+ }
+}
+
+int messagesender_open(MESSAGE_SENDER_HANDLE message_sender)
+{
+ int result;
+
+ if (message_sender == NULL)
+ {
+ LogError("NULL message_sender");
+ result = __FAILURE__;
+ }
+ else
+ {
+ if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
+ {
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPENING);
+ if (link_attach(message_sender->link, NULL, on_link_state_changed, on_link_flow_on, message_sender) != 0)
+ {
+ LogError("attach link failed");
+ result = __FAILURE__;
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+ }
+ else
+ {
+ result = 0;
+ }
+ }
+ else
+ {
+ result = 0;
+ }
+ }
+
+ return result;
+}
+
+int messagesender_close(MESSAGE_SENDER_HANDLE message_sender)
+{
+ int result;
+
+ if (message_sender == NULL)
+ {
+ LogError("NULL message_sender");
+ result = __FAILURE__;
+ }
+ else
+ {
+ if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
+ (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
+ {
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_CLOSING);
+ if (link_detach(message_sender->link, true, NULL, NULL, NULL) != 0)
+ {
+ LogError("Detaching link failed");
+ result = __FAILURE__;
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+ }
+ else
+ {
+ result = 0;
+ }
+ }
+ else
+ {
+ result = 0;
+ }
+ }
+
+ return result;
+}
+
+static void messagesender_send_cancel_handler(ASYNC_OPERATION_HANDLE send_operation)
+{
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, send_operation);
+ if (message_with_callback->on_message_send_complete != NULL)
+ {
+ message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_CANCELLED, NULL);
+ }
+
+ remove_pending_message(message_with_callback->message_sender, send_operation);
+}
+
+ASYNC_OPERATION_HANDLE messagesender_send_async(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context, tickcounter_ms_t timeout)
+{
+ ASYNC_OPERATION_HANDLE result;
+
+ if ((message_sender == NULL) ||
+ (message == NULL))
+ {
+ LogError("Bad parameters: message_sender = %p, message = %p");
+ result = NULL;
+ }
+ else
+ {
+ if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
+ {
+ LogError("Message sender in ERROR state");
+ result = NULL;
+ }
+ else
+ {
+ result = CREATE_ASYNC_OPERATION(MESSAGE_WITH_CALLBACK, messagesender_send_cancel_handler);
+ if (result == NULL)
+ {
+ LogError("Failed allocating context for send");
+ }
+ else
+ {
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, result);
+ ASYNC_OPERATION_HANDLE* new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count + 1));
+ if (new_messages == NULL)
+ {
+ LogError("Failed allocating memory for pending sends");
+ async_operation_destroy(result);
+ result = NULL;
+ }
+ else
+ {
+ message_with_callback->timeout = timeout;
+ message_sender->messages = new_messages;
+ if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
+ {
+ message_with_callback->message = message_clone(message);
+ if (message_with_callback->message == NULL)
+ {
+ LogError("Cannot clone message for placing it in the pending sends list");
+ async_operation_destroy(result);
+ result = NULL;
+ }
+
+ message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+ }
+ else
+ {
+ message_with_callback->message = NULL;
+ message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
+ }
+
+ if (result != NULL)
+ {
+ message_with_callback->on_message_send_complete = on_message_send_complete;
+ message_with_callback->context = callback_context;
+ message_with_callback->message_sender = message_sender;
+
+ message_sender->messages[message_sender->message_count] = result;
+ message_sender->message_count++;
+
+ if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
+ {
+ switch (send_one_message(message_sender, result, message))
+ {
+ default:
+ case SEND_ONE_MESSAGE_ERROR:
+ LogError("Error sending message");
+ remove_pending_message_by_index(message_sender, message_sender->message_count - 1);
+ result = NULL;
+ break;
+
+ case SEND_ONE_MESSAGE_BUSY:
+ message_with_callback->message = message_clone(message);
+ if (message_with_callback->message == NULL)
+ {
+ LogError("Error cloning message for placing it in the pending sends list");
+ async_operation_destroy(result);
+ result = NULL;
+ }
+ else
+ {
+ message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+ }
+ break;
+
+ case SEND_ONE_MESSAGE_OK:
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return result;
+}
+
+void messagesender_set_trace(MESSAGE_SENDER_HANDLE message_sender, bool traceOn)
+{
+ if (message_sender == NULL)
+ {
+ LogError("NULL message_sender");
+ }
+ else
+ {
+ message_sender->is_trace_on = traceOn ? 1 : 0;
+ }
+}