Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Revision 34:6be9c2058664, committed 2017-10-21
- Comitter:
- AzureIoTClient
- Date:
- Sat Oct 21 20:12:19 2017 +0000
- Parent:
- 33:08b53020ff0d
- Child:
- 35:d0bed2404ee9
- Commit message:
- 1.1.26
Changed in this revision
--- a/amqp_management.c Mon Sep 25 13:38:40 2017 -0700
+++ b/amqp_management.c Sat Oct 21 20:12:19 2017 +0000
@@ -1071,9 +1071,9 @@
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_088: [ `amqp_management_execute_operation_async` shall send the message by calling `messagesender_send`. ]*/
- if (messagesender_send(amqp_management->message_sender, cloned_message, NULL, NULL) != 0)
+ if (messagesender_send_async(amqp_management->message_sender, cloned_message, NULL, NULL, 0) == NULL)
{
- /* Codes_SRS_AMQP_MANAGEMENT_01_089: [ If `messagesender_send` fails, `amqp_management_execute_operation_async` shall fail and return a non-zero value. ]*/
+ /* Codes_SRS_AMQP_MANAGEMENT_01_089: [ If `messagesender_send_async` fails, `amqp_management_execute_operation_async` shall fail and return a non-zero value. ]*/
LogError("Could not send request message");
(void)singlylinkedlist_remove(amqp_management->pending_operations, added_item);
free(pending_operation_message);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/async_operation.c Sat Oct 21 20:12:19 2017 +0000
@@ -0,0 +1,85 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stddef.h>
+#include <string.h>
+#include "azure_c_shared_utility/optimize_size.h"
+#include "azure_c_shared_utility/gballoc.h"
+#include "azure_c_shared_utility/xlogging.h"
+#include "azure_uamqp_c/async_operation.h"
+
+typedef struct ASYNC_OPERATION_INSTANCE_TAG
+{
+ ASYNC_OPERATION_CANCEL_HANDLER_FUNC async_operation_cancel_handler;
+} ASYNC_OPERATION_INSTANCE;
+
+ASYNC_OPERATION_HANDLE async_operation_create(ASYNC_OPERATION_CANCEL_HANDLER_FUNC async_operation_cancel_handler, size_t context_size)
+{
+ ASYNC_OPERATION_INSTANCE* async_operation;
+
+ if (async_operation_cancel_handler == NULL)
+ {
+ /* Codes_SRS_ASYNC_OPERATION_01_002: [ If `async_operation_cancel_handler` is NULL, `async_operation_create` shall fail and return NULL.]*/
+ LogError("Cannot allocate memory for async operation");
+ async_operation = NULL;
+ }
+ else if (context_size < sizeof(ASYNC_OPERATION_INSTANCE))
+ {
+ /* Codes_SRS_ASYNC_OPERATION_01_003: [ If `context_size` is less than the size of the `async_operation_cancel_handler` argument, `async_operation_create` shall fail and return NULL.]*/
+ LogError("Context size too small");
+ async_operation = NULL;
+ }
+ else
+ {
+ async_operation = (ASYNC_OPERATION_INSTANCE*)malloc(context_size);
+ if (async_operation == NULL)
+ {
+ /* Codes_SRS_ASYNC_OPERATION_01_004: [ If allocating memory for the new asynchronous operation instance fails, `async_operation_create` shall fail and return NULL.]*/
+ LogError("Cannot allocate memory for async operation");
+ }
+ else
+ {
+ /* Codes_SRS_ASYNC_OPERATION_01_001: [ `async_operation_create` shall return a non-NULL handle to a newly created asynchronous operation instance.]*/
+ async_operation->async_operation_cancel_handler = async_operation_cancel_handler;
+ }
+ }
+
+ return async_operation;
+}
+
+void async_operation_destroy(ASYNC_OPERATION_HANDLE async_operation)
+{
+ if (async_operation == NULL)
+ {
+ /* Codes_SRS_ASYNC_OPERATION_01_006: [ If `async_operation` is NULL, `async_operation_destroy` shall do nothing.]*/
+ LogError("NULL async_operation");
+ }
+ else
+ {
+ /* Codes_SRS_ASYNC_OPERATION_01_005: [ `async_operation_destroy` shall free all recources associated with the asyncronous operation instance.]*/
+ free(async_operation);
+ }
+}
+
+int async_operation_cancel(ASYNC_OPERATION_HANDLE async_operation)
+{
+ int result;
+
+ if (async_operation == NULL)
+ {
+ LogError("NULL async_operation");
+ result = __FAILURE__;
+ }
+ else
+ {
+ /* Codes_SRS_ASYNC_OPERATION_01_007: [ `async_operation_cancel` shall cancel the operation by calling the cancel handler function passed to `async_operation_create`.]*/
+ async_operation->async_operation_cancel_handler(async_operation);
+
+ /* Codes_SRS_ASYNC_OPERATION_01_008: [ On success `async_operation_cancel` shall return 0.]*/
+ result = 0;
+ }
+
+ return result;
+}
--- a/azure_uamqp_c/amqp_management.h Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/amqp_management.h Sat Oct 21 20:12:19 2017 +0000
@@ -5,6 +5,7 @@
#define AMQP_MANAGEMENT_H
#include <stdbool.h>
+#include "azure_c_shared_utility/macro_utils.h"
#include "azure_uamqp_c/session.h"
#include "azure_uamqp_c/message.h"
@@ -14,20 +15,20 @@
#include "azure_c_shared_utility/umock_c_prod.h"
- typedef enum AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT_TAG
- {
- AMQP_MANAGEMENT_EXECUTE_OPERATION_OK,
- AMQP_MANAGEMENT_EXECUTE_OPERATION_ERROR,
- AMQP_MANAGEMENT_EXECUTE_OPERATION_FAILED_BAD_STATUS,
- AMQP_MANAGEMENT_EXECUTE_OPERATION_INSTANCE_CLOSED
- } AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT;
+#define AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT_VALUES \
+ AMQP_MANAGEMENT_EXECUTE_OPERATION_OK, \
+ AMQP_MANAGEMENT_EXECUTE_OPERATION_ERROR, \
+ AMQP_MANAGEMENT_EXECUTE_OPERATION_FAILED_BAD_STATUS, \
+ AMQP_MANAGEMENT_EXECUTE_OPERATION_INSTANCE_CLOSED
- typedef enum AMQP_MANAGEMENT_OPEN_RESULT_TAG
- {
- AMQP_MANAGEMENT_OPEN_OK,
- AMQP_MANAGEMENT_OPEN_ERROR,
- AMQP_MANAGEMENT_OPEN_CANCELLED
- } AMQP_MANAGEMENT_OPEN_RESULT;
+DEFINE_ENUM(AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT, AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT_VALUES)
+
+#define AMQP_MANAGEMENT_OPEN_RESULT_VALUES \
+ AMQP_MANAGEMENT_OPEN_OK, \
+ AMQP_MANAGEMENT_OPEN_ERROR, \
+ AMQP_MANAGEMENT_OPEN_CANCELLED
+
+DEFINE_ENUM(AMQP_MANAGEMENT_OPEN_RESULT, AMQP_MANAGEMENT_OPEN_RESULT_VALUES)
typedef struct AMQP_MANAGEMENT_INSTANCE_TAG* AMQP_MANAGEMENT_HANDLE;
typedef void(*ON_AMQP_MANAGEMENT_OPEN_COMPLETE)(void* context, AMQP_MANAGEMENT_OPEN_RESULT open_result);
--- a/azure_uamqp_c/amqp_types.h Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/amqp_types.h Sat Oct 21 20:12:19 2017 +0000
@@ -5,38 +5,39 @@
#define ANQP_TYPES_H
#include <stddef.h>
+#include "azure_c_shared_utility/macro_utils.h"
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
- typedef enum AMQP_TYPE_TAG
- {
- AMQP_TYPE_NULL,
- AMQP_TYPE_BOOL,
- AMQP_TYPE_UBYTE,
- AMQP_TYPE_USHORT,
- AMQP_TYPE_UINT,
- AMQP_TYPE_ULONG,
- AMQP_TYPE_BYTE,
- AMQP_TYPE_SHORT,
- AMQP_TYPE_INT,
- AMQP_TYPE_LONG,
- AMQP_TYPE_FLOAT,
- AMQP_TYPE_DOUBLE,
- AMQP_TYPE_CHAR,
- AMQP_TYPE_TIMESTAMP,
- AMQP_TYPE_UUID,
- AMQP_TYPE_BINARY,
- AMQP_TYPE_STRING,
- AMQP_TYPE_SYMBOL,
- AMQP_TYPE_LIST,
- AMQP_TYPE_MAP,
- AMQP_TYPE_ARRAY,
- AMQP_TYPE_DESCRIBED,
- AMQP_TYPE_COMPOSITE,
- AMQP_TYPE_UNKNOWN
- } AMQP_TYPE;
+#define AMQP_TYPE_VALUES \
+ AMQP_TYPE_NULL, \
+ AMQP_TYPE_BOOL, \
+ AMQP_TYPE_UBYTE, \
+ AMQP_TYPE_USHORT, \
+ AMQP_TYPE_UINT, \
+ AMQP_TYPE_ULONG, \
+ AMQP_TYPE_BYTE, \
+ AMQP_TYPE_SHORT, \
+ AMQP_TYPE_INT, \
+ AMQP_TYPE_LONG, \
+ AMQP_TYPE_FLOAT, \
+ AMQP_TYPE_DOUBLE, \
+ AMQP_TYPE_CHAR, \
+ AMQP_TYPE_TIMESTAMP, \
+ AMQP_TYPE_UUID, \
+ AMQP_TYPE_BINARY, \
+ AMQP_TYPE_STRING, \
+ AMQP_TYPE_SYMBOL, \
+ AMQP_TYPE_LIST, \
+ AMQP_TYPE_MAP, \
+ AMQP_TYPE_ARRAY, \
+ AMQP_TYPE_DESCRIBED, \
+ AMQP_TYPE_COMPOSITE, \
+ AMQP_TYPE_UNKNOWN
+
+DEFINE_ENUM(AMQP_TYPE, AMQP_TYPE_VALUES);
#ifdef __cplusplus
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/azure_uamqp_c/async_operation.h Sat Oct 21 20:12:19 2017 +0000
@@ -0,0 +1,43 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+#ifndef ASYNC_OPERATION_H
+#define ASYNC_OPERATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#include <cstdint>
+#include <cstddef>
+#else
+#include <stdint.h>
+#include <stddef.h>
+#endif /* __cplusplus */
+
+#include "azure_c_shared_utility/umock_c_prod.h"
+
+typedef struct ASYNC_OPERATION_INSTANCE_TAG* ASYNC_OPERATION_HANDLE;
+
+typedef void(*ASYNC_OPERATION_CANCEL_HANDLER_FUNC)(ASYNC_OPERATION_HANDLE async_operation);
+
+#define DEFINE_ASYNC_OPERATION_CONTEXT(type) \
+typedef struct C3(ASYNC_OPERATION_CONTEXT_STRUCT_, type, _TAG) \
+{ \
+ ASYNC_OPERATION_CANCEL_HANDLER_FUNC async_operation_cancel_handler; \
+ type context; \
+} C2(ASYNC_OPERATION_CONTEXT_STRUCT_, type);
+
+#define GET_ASYNC_OPERATION_CONTEXT(type, async_operation) \
+ (type*)((unsigned char*)async_operation + offsetof(C2(ASYNC_OPERATION_CONTEXT_STRUCT_, type), context))
+
+#define CREATE_ASYNC_OPERATION(type, async_operation_cancel_handler) \
+ async_operation_create(async_operation_cancel_handler, sizeof(C2(ASYNC_OPERATION_CONTEXT_STRUCT_, type)))
+
+MOCKABLE_FUNCTION(, ASYNC_OPERATION_HANDLE, async_operation_create, ASYNC_OPERATION_CANCEL_HANDLER_FUNC, async_operation_cancel_handler, size_t, context_size);
+MOCKABLE_FUNCTION(, void, async_operation_destroy, ASYNC_OPERATION_HANDLE, async_operation);
+MOCKABLE_FUNCTION(, int, async_operation_cancel, ASYNC_OPERATION_HANDLE, async_operation);
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+
+#endif /* ASYNC_OPERATION_H */
--- a/azure_uamqp_c/cbs.h Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/cbs.h Sat Oct 21 20:12:19 2017 +0000
@@ -5,6 +5,8 @@
#define CBS_H
#include "azure_uamqp_c/session.h"
+#include "azure_c_shared_utility/macro_utils.h"
+#include "azure_c_shared_utility/umock_c_prod.h"
#ifdef __cplusplus
extern "C" {
@@ -12,22 +14,20 @@
#include <stdbool.h>
#endif /* __cplusplus */
-#include "azure_c_shared_utility/umock_c_prod.h"
+#define CBS_OPERATION_RESULT_VALUES \
+ CBS_OPERATION_RESULT_OK, \
+ CBS_OPERATION_RESULT_CBS_ERROR, \
+ CBS_OPERATION_RESULT_OPERATION_FAILED, \
+ CBS_OPERATION_RESULT_INSTANCE_CLOSED
- typedef enum CBS_OPERATION_RESULT_TAG
- {
- CBS_OPERATION_RESULT_OK,
- CBS_OPERATION_RESULT_CBS_ERROR,
- CBS_OPERATION_RESULT_OPERATION_FAILED,
- CBS_OPERATION_RESULT_INSTANCE_CLOSED
- } CBS_OPERATION_RESULT;
+DEFINE_ENUM(CBS_OPERATION_RESULT, CBS_OPERATION_RESULT_VALUES)
- typedef enum CBS_OPEN_COMPLETE_RESULT_TAG
- {
- CBS_OPEN_OK,
- CBS_OPEN_ERROR,
- CBS_OPEN_CANCELLED
- } CBS_OPEN_COMPLETE_RESULT;
+#define CBS_OPEN_COMPLETE_RESULT_VALUES \
+ CBS_OPEN_OK, \
+ CBS_OPEN_ERROR, \
+ CBS_OPEN_CANCELLED
+
+DEFINE_ENUM(CBS_OPEN_COMPLETE_RESULT, CBS_OPEN_COMPLETE_RESULT_VALUES)
typedef struct CBS_INSTANCE_TAG* CBS_HANDLE;
typedef void(*ON_CBS_OPEN_COMPLETE)(void* context, CBS_OPEN_COMPLETE_RESULT open_complete_result);
--- a/azure_uamqp_c/connection.h Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/connection.h Sat Oct 21 20:12:19 2017 +0000
@@ -8,6 +8,7 @@
#include <stdbool.h>
#include <stdint.h>
#include "azure_c_shared_utility/xio.h"
+#include "azure_c_shared_utility/umock_c_prod.h"
#include "azure_uamqp_c/amqp_frame_codec.h"
#include "azure_uamqp_c/amqp_definitions.h"
@@ -15,8 +16,6 @@
extern "C" {
#endif /* __cplusplus */
-#include "azure_c_shared_utility/umock_c_prod.h"
-
typedef struct CONNECTION_INSTANCE_TAG* CONNECTION_HANDLE;
typedef struct ENDPOINT_INSTANCE_TAG* ENDPOINT_HANDLE;
--- a/azure_uamqp_c/link.h Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/link.h Sat Oct 21 20:12:19 2017 +0000
@@ -5,6 +5,8 @@
#define LINK_H
#include <stddef.h>
+#include "azure_c_shared_utility/tickcounter.h"
+#include "azure_c_shared_utility/macro_utils.h"
#include "azure_uamqp_c/session.h"
#include "azure_uamqp_c/amqpvalue.h"
#include "azure_uamqp_c/amqp_definitions.h"
@@ -17,28 +19,29 @@
typedef struct LINK_INSTANCE_TAG* LINK_HANDLE;
-typedef enum LINK_STATE_TAG
-{
- LINK_STATE_DETACHED,
- LINK_STATE_HALF_ATTACHED_ATTACH_SENT,
- LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED,
- LINK_STATE_ATTACHED,
+#define LINK_STATE_VALUES \
+ LINK_STATE_DETACHED, \
+ LINK_STATE_HALF_ATTACHED_ATTACH_SENT, \
+ LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED, \
+ LINK_STATE_ATTACHED, \
LINK_STATE_ERROR
-} LINK_STATE;
+
+DEFINE_ENUM(LINK_STATE, LINK_STATE_VALUES)
-typedef enum LINK_TRANSFER_RESULT_TAG
-{
- LINK_TRANSFER_OK,
- LINK_TRANSFER_ERROR,
+#define LINK_TRANSFER_RESULT_VALUES \
+ LINK_TRANSFER_OK, \
+ LINK_TRANSFER_ERROR, \
LINK_TRANSFER_BUSY
-} LINK_TRANSFER_RESULT;
+
+DEFINE_ENUM(LINK_TRANSFER_RESULT, LINK_TRANSFER_RESULT_VALUES)
-typedef enum LINK_DELIVERY_SETTLE_REASON_TAG
-{
- LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED,
- LINK_DELIVERY_SETTLE_REASON_SETTLED,
- LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED
-} LINK_DELIVERY_SETTLE_REASON;
+#define LINK_DELIVERY_SETTLE_REASON_VALUES \
+ LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED, \
+ LINK_DELIVERY_SETTLE_REASON_SETTLED, \
+ LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED, \
+ LINK_DELIVERY_SETTLE_REASON_TIMEOUT
+
+DEFINE_ENUM(LINK_DELIVERY_SETTLE_REASON, LINK_DELIVERY_SETTLE_REASON_VALUES)
typedef void(*ON_DELIVERY_SETTLED)(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state);
typedef AMQP_VALUE(*ON_TRANSFER_RECEIVED)(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes);
@@ -63,7 +66,8 @@
MOCKABLE_FUNCTION(, int, link_send_disposition, LINK_HANDLE, link, delivery_number, message_number, AMQP_VALUE, delivery_state);
MOCKABLE_FUNCTION(, int, link_attach, LINK_HANDLE, link, ON_TRANSFER_RECEIVED, on_transfer_received, ON_LINK_STATE_CHANGED, on_link_state_changed, ON_LINK_FLOW_ON, on_link_flow_on, void*, callback_context);
MOCKABLE_FUNCTION(, int, link_detach, LINK_HANDLE, link, bool, close);
-MOCKABLE_FUNCTION(, LINK_TRANSFER_RESULT, link_transfer, LINK_HANDLE, handle, message_format, message_format, PAYLOAD*, payloads, size_t, payload_count, ON_DELIVERY_SETTLED, on_delivery_settled, void*, callback_context);
+MOCKABLE_FUNCTION(, LINK_TRANSFER_RESULT, link_transfer_async, LINK_HANDLE, handle, message_format, message_format, PAYLOAD*, payloads, size_t, payload_count, ON_DELIVERY_SETTLED, on_delivery_settled, void*, callback_context, tickcounter_ms_t, timeout);
+MOCKABLE_FUNCTION(, void, link_dowork, LINK_HANDLE, link);
#ifdef __cplusplus
}
--- a/azure_uamqp_c/message_receiver.h Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/message_receiver.h Sat Oct 21 20:12:19 2017 +0000
@@ -7,6 +7,8 @@
#include "azure_uamqp_c/link.h"
#include "azure_uamqp_c/message.h"
#include "azure_uamqp_c/amqp_definitions.h"
+#include "azure_c_shared_utility/umock_c_prod.h"
+#include "azure_c_shared_utility/macro_utils.h"
#ifdef __cplusplus
extern "C" {
@@ -14,16 +16,14 @@
#include <stdbool.h>
#endif /* __cplusplus */
-#include "azure_c_shared_utility/umock_c_prod.h"
+#define MESSAGE_RECEIVER_STATE_VALUES \
+ MESSAGE_RECEIVER_STATE_IDLE, \
+ MESSAGE_RECEIVER_STATE_OPENING, \
+ MESSAGE_RECEIVER_STATE_OPEN, \
+ MESSAGE_RECEIVER_STATE_CLOSING, \
+ MESSAGE_RECEIVER_STATE_ERROR
- typedef enum MESSAGE_RECEIVER_STATE_TAG
- {
- MESSAGE_RECEIVER_STATE_IDLE,
- MESSAGE_RECEIVER_STATE_OPENING,
- MESSAGE_RECEIVER_STATE_OPEN,
- MESSAGE_RECEIVER_STATE_CLOSING,
- MESSAGE_RECEIVER_STATE_ERROR
- } MESSAGE_RECEIVER_STATE;
+DEFINE_ENUM(MESSAGE_RECEIVER_STATE, MESSAGE_RECEIVER_STATE_VALUES)
typedef struct MESSAGE_RECEIVER_INSTANCE_TAG* MESSAGE_RECEIVER_HANDLE;
typedef AMQP_VALUE (*ON_MESSAGE_RECEIVED)(const void* context, MESSAGE_HANDLE message);
--- a/azure_uamqp_c/message_sender.h Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/message_sender.h Sat Oct 21 20:12:19 2017 +0000
@@ -8,27 +8,31 @@
#include "azure_uamqp_c/link.h"
#include "azure_uamqp_c/message.h"
#include "azure_c_shared_utility/xlogging.h"
+#include "azure_uamqp_c/async_operation.h"
+#include "azure_c_shared_utility/tickcounter.h"
+#include "azure_c_shared_utility/umock_c_prod.h"
+#include "azure_c_shared_utility/macro_utils.h"
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
-#include "azure_c_shared_utility/umock_c_prod.h"
+#define MESSAGE_SEND_RESULT_VALUES \
+ MESSAGE_SEND_OK, \
+ MESSAGE_SEND_ERROR, \
+ MESSAGE_SEND_TIMEOUT, \
+ MESSAGE_SEND_CANCELLED
- typedef enum MESSAGE_SEND_RESULT_TAG
- {
- MESSAGE_SEND_OK,
- MESSAGE_SEND_ERROR
- } MESSAGE_SEND_RESULT;
+DEFINE_ENUM(MESSAGE_SEND_RESULT, MESSAGE_SEND_RESULT_VALUES)
- typedef enum MESSAGE_SENDER_STATE_TAG
- {
- MESSAGE_SENDER_STATE_IDLE,
- MESSAGE_SENDER_STATE_OPENING,
- MESSAGE_SENDER_STATE_OPEN,
- MESSAGE_SENDER_STATE_CLOSING,
- MESSAGE_SENDER_STATE_ERROR
- } MESSAGE_SENDER_STATE;
+#define MESSAGE_SENDER_STATE_VALUES \
+ MESSAGE_SENDER_STATE_IDLE, \
+ MESSAGE_SENDER_STATE_OPENING, \
+ MESSAGE_SENDER_STATE_OPEN, \
+ MESSAGE_SENDER_STATE_CLOSING, \
+ MESSAGE_SENDER_STATE_ERROR
+
+DEFINE_ENUM(MESSAGE_SENDER_STATE, MESSAGE_SENDER_STATE_VALUES)
typedef struct MESSAGE_SENDER_INSTANCE_TAG* MESSAGE_SENDER_HANDLE;
typedef void(*ON_MESSAGE_SEND_COMPLETE)(void* context, MESSAGE_SEND_RESULT send_result);
@@ -38,7 +42,7 @@
MOCKABLE_FUNCTION(, void, messagesender_destroy, MESSAGE_SENDER_HANDLE, message_sender);
MOCKABLE_FUNCTION(, int, messagesender_open, MESSAGE_SENDER_HANDLE, message_sender);
MOCKABLE_FUNCTION(, int, messagesender_close, MESSAGE_SENDER_HANDLE, message_sender);
- MOCKABLE_FUNCTION(, int, messagesender_send, MESSAGE_SENDER_HANDLE, message_sender, MESSAGE_HANDLE, message, ON_MESSAGE_SEND_COMPLETE, on_message_send_complete, void*, callback_context);
+ MOCKABLE_FUNCTION(, ASYNC_OPERATION_HANDLE, messagesender_send_async, MESSAGE_SENDER_HANDLE, message_sender, MESSAGE_HANDLE, message, ON_MESSAGE_SEND_COMPLETE, on_message_send_complete, void*, callback_context, tickcounter_ms_t, timeout);
MOCKABLE_FUNCTION(, void, messagesender_set_trace, MESSAGE_SENDER_HANDLE, message_sender, bool, traceOn);
#ifdef __cplusplus
--- a/azure_uamqp_c/session.h Mon Sep 25 13:38:40 2017 -0700
+++ b/azure_uamqp_c/session.h Sat Oct 21 20:12:19 2017 +0000
@@ -8,34 +8,34 @@
#include "azure_uamqp_c/amqpvalue.h"
#include "azure_uamqp_c/amqp_frame_codec.h"
#include "azure_uamqp_c/connection.h"
+#include "azure_c_shared_utility/umock_c_prod.h"
+#include "azure_c_shared_utility/macro_utils.h"
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
-#include "azure_c_shared_utility/umock_c_prod.h"
-
- typedef struct SESSION_INSTANCE_TAG* SESSION_HANDLE;
- typedef struct LINK_ENDPOINT_INSTANCE_TAG* LINK_ENDPOINT_HANDLE;
+typedef struct SESSION_INSTANCE_TAG* SESSION_HANDLE;
+typedef struct LINK_ENDPOINT_INSTANCE_TAG* LINK_ENDPOINT_HANDLE;
- typedef enum SESION_STATE_TAG
- {
- SESSION_STATE_UNMAPPED,
- SESSION_STATE_BEGIN_SENT,
- SESSION_STATE_BEGIN_RCVD,
- SESSION_STATE_MAPPED,
- SESSION_STATE_END_SENT,
- SESSION_STATE_END_RCVD,
- SESSION_STATE_DISCARDING,
- SESSION_STATE_ERROR
- } SESSION_STATE;
+#define SESSION_STATE_VALUES \
+ SESSION_STATE_UNMAPPED, \
+ SESSION_STATE_BEGIN_SENT, \
+ SESSION_STATE_BEGIN_RCVD, \
+ SESSION_STATE_MAPPED, \
+ SESSION_STATE_END_SENT, \
+ SESSION_STATE_END_RCVD, \
+ SESSION_STATE_DISCARDING, \
+ SESSION_STATE_ERROR
- typedef enum SESSION_SEND_TRANSFER_RESULT_TAG
- {
- SESSION_SEND_TRANSFER_OK,
- SESSION_SEND_TRANSFER_ERROR,
- SESSION_SEND_TRANSFER_BUSY
- } SESSION_SEND_TRANSFER_RESULT;
+DEFINE_ENUM(SESSION_STATE, SESSION_STATE_VALUES)
+
+#define SESSION_SEND_TRANSFER_RESULT_VALUES \
+ SESSION_SEND_TRANSFER_OK, \
+ SESSION_SEND_TRANSFER_ERROR, \
+ SESSION_SEND_TRANSFER_BUSY
+
+DEFINE_ENUM(SESSION_SEND_TRANSFER_RESULT, SESSION_SEND_TRANSFER_RESULT_VALUES)
typedef void(*LINK_ENDPOINT_FRAME_RECEIVED_CALLBACK)(void* context, AMQP_VALUE performative, uint32_t frame_payload_size, const unsigned char* payload_bytes);
typedef void(*ON_SESSION_STATE_CHANGED)(void* context, SESSION_STATE new_session_state, SESSION_STATE previous_session_state);
--- a/link.c Mon Sep 25 13:38:40 2017 -0700
+++ b/link.c Sat Oct 21 20:12:19 2017 +0000
@@ -9,6 +9,7 @@
#include "azure_c_shared_utility/optimize_size.h"
#include "azure_c_shared_utility/xlogging.h"
#include "azure_c_shared_utility/singlylinkedlist.h"
+#include "azure_c_shared_utility/tickcounter.h"
#include "azure_uamqp_c/link.h"
#include "azure_uamqp_c/session.h"
#include "azure_uamqp_c/amqpvalue.h"
@@ -23,6 +24,8 @@
ON_DELIVERY_SETTLED on_delivery_settled;
void* callback_context;
void* link;
+ tickcounter_ms_t start_tick;
+ tickcounter_ms_t timeout;
} DELIVERY_INSTANCE;
typedef struct LINK_INSTANCE_TAG
@@ -55,6 +58,7 @@
unsigned char* received_payload;
uint32_t received_payload_size;
delivery_number received_delivery_id;
+ TICK_COUNTER_HANDLE tick_counter;
} LINK_INSTANCE;
static void set_link_state(LINK_INSTANCE* link_instance, LINK_STATE link_state)
@@ -639,37 +643,49 @@
result->received_payload_size = 0;
result->received_delivery_id = 0;
- result->pending_deliveries = singlylinkedlist_create();
- if (result->pending_deliveries == NULL)
+ result->tick_counter = tickcounter_create();
+ if (result->tick_counter == NULL)
{
free(result);
result = NULL;
}
else
{
- size_t name_length = strlen(name);
- result->name = (char*)malloc(name_length + 1);
- if (result->name == NULL)
+ result->pending_deliveries = singlylinkedlist_create();
+ if (result->pending_deliveries == NULL)
{
- singlylinkedlist_destroy(result->pending_deliveries);
+ tickcounter_destroy(result->tick_counter);
free(result);
result = NULL;
}
else
{
- result->on_link_state_changed = NULL;
- result->callback_context = NULL;
- set_link_state(result, LINK_STATE_DETACHED);
-
- (void)memcpy(result->name, name, name_length + 1);
- result->link_endpoint = session_create_link_endpoint(session, name);
- if (result->link_endpoint == NULL)
+ size_t name_length = strlen(name);
+ result->name = (char*)malloc(name_length + 1);
+ if (result->name == NULL)
{
+ tickcounter_destroy(result->tick_counter);
singlylinkedlist_destroy(result->pending_deliveries);
- free(result->name);
free(result);
result = NULL;
}
+ else
+ {
+ result->on_link_state_changed = NULL;
+ result->callback_context = NULL;
+ set_link_state(result, LINK_STATE_DETACHED);
+
+ (void)memcpy(result->name, name, name_length + 1);
+ result->link_endpoint = session_create_link_endpoint(session, name);
+ if (result->link_endpoint == NULL)
+ {
+ tickcounter_destroy(result->tick_counter);
+ singlylinkedlist_destroy(result->pending_deliveries);
+ free(result->name);
+ free(result);
+ result = NULL;
+ }
+ }
}
}
}
@@ -709,28 +725,39 @@
result->role = role_sender;
}
- result->pending_deliveries = singlylinkedlist_create();
- if (result->pending_deliveries == NULL)
+ result->tick_counter = tickcounter_create();
+ if (result->tick_counter == NULL)
{
free(result);
result = NULL;
}
else
{
- size_t name_length = strlen(name);
- result->name = (char*)malloc(name_length + 1);
- if (result->name == NULL)
+ result->pending_deliveries = singlylinkedlist_create();
+ if (result->pending_deliveries == NULL)
{
- singlylinkedlist_destroy(result->pending_deliveries);
+ tickcounter_destroy(result->tick_counter);
free(result);
result = NULL;
}
else
{
- (void)memcpy(result->name, name, name_length + 1);
- result->on_link_state_changed = NULL;
- result->callback_context = NULL;
- result->link_endpoint = link_endpoint;
+ size_t name_length = strlen(name);
+ result->name = (char*)malloc(name_length + 1);
+ if (result->name == NULL)
+ {
+ tickcounter_destroy(result->tick_counter);
+ singlylinkedlist_destroy(result->pending_deliveries);
+ free(result);
+ result = NULL;
+ }
+ else
+ {
+ (void)memcpy(result->name, name, name_length + 1);
+ result->on_link_state_changed = NULL;
+ result->callback_context = NULL;
+ result->link_endpoint = link_endpoint;
+ }
}
}
}
@@ -743,6 +770,7 @@
if (link != NULL)
{
remove_all_pending_deliveries((LINK_INSTANCE*)link, false);
+ tickcounter_destroy(link->tick_counter);
link->on_link_state_changed = NULL;
(void)link_detach(link, true);
@@ -1063,7 +1091,7 @@
return result;
}
-LINK_TRANSFER_RESULT link_transfer(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context)
+LINK_TRANSFER_RESULT link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, tickcounter_ms_t timeout)
{
LINK_TRANSFER_RESULT result;
@@ -1133,41 +1161,50 @@
}
else
{
- LIST_ITEM_HANDLE delivery_instance_list_item;
- pending_delivery->on_delivery_settled = on_delivery_settled;
- pending_delivery->callback_context = callback_context;
- pending_delivery->link = link;
- delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery);
-
- if (delivery_instance_list_item == NULL)
+ if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0)
{
free(pending_delivery);
result = LINK_TRANSFER_ERROR;
}
else
{
- /* here we should feed data to the transfer frame */
- switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
+ LIST_ITEM_HANDLE delivery_instance_list_item;
+ pending_delivery->timeout = timeout;
+ pending_delivery->on_delivery_settled = on_delivery_settled;
+ pending_delivery->callback_context = callback_context;
+ pending_delivery->link = link;
+ delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, pending_delivery);
+
+ if (delivery_instance_list_item == NULL)
{
- default:
- case SESSION_SEND_TRANSFER_ERROR:
- singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
free(pending_delivery);
result = LINK_TRANSFER_ERROR;
- break;
+ }
+ else
+ {
+ /* here we should feed data to the transfer frame */
+ switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
+ {
+ default:
+ case SESSION_SEND_TRANSFER_ERROR:
+ singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
+ free(pending_delivery);
+ result = LINK_TRANSFER_ERROR;
+ break;
- case SESSION_SEND_TRANSFER_BUSY:
- /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
- singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
- free(pending_delivery);
- result = LINK_TRANSFER_BUSY;
- break;
+ case SESSION_SEND_TRANSFER_BUSY:
+ /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
+ singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item);
+ free(pending_delivery);
+ result = LINK_TRANSFER_BUSY;
+ break;
- case SESSION_SEND_TRANSFER_OK:
- link->delivery_count = delivery_count;
- link->link_credit--;
- result = LINK_TRANSFER_OK;
- break;
+ case SESSION_SEND_TRANSFER_OK:
+ link->delivery_count = delivery_count;
+ link->link_credit--;
+ result = LINK_TRANSFER_OK;
+ break;
+ }
}
}
}
@@ -1236,3 +1273,40 @@
}
return result;
}
+
+void link_dowork(LINK_HANDLE link)
+{
+ if (link != NULL)
+ {
+ tickcounter_ms_t current_tick;
+
+ if (tickcounter_get_current_ms(link->tick_counter, ¤t_tick) != 0)
+ {
+ /* error */
+ }
+ else
+ {
+ // go through all and find timed out deliveries
+ LIST_ITEM_HANDLE item = singlylinkedlist_get_head_item(link->pending_deliveries);
+ while (item != NULL)
+ {
+ LIST_ITEM_HANDLE next_item = singlylinkedlist_get_next_item(item);
+ DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)singlylinkedlist_item_get_value(item);
+
+ if ((delivery_instance->timeout != 0) &&
+ (current_tick - delivery_instance->start_tick >= delivery_instance->timeout))
+ {
+ if (delivery_instance->on_delivery_settled != NULL)
+ {
+ delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_TIMEOUT, NULL);
+ }
+
+ (void)singlylinkedlist_remove(link->pending_deliveries, item);
+ free(delivery_instance);
+ }
+
+ item = next_item;
+ }
+ }
+ }
+}
--- a/message_receiver.c Mon Sep 25 13:38:40 2017 -0700
+++ b/message_receiver.c Sat Oct 21 20:12:19 2017 +0000
@@ -22,27 +22,28 @@
bool decode_error;
} MESSAGE_RECEIVER_INSTANCE;
-static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver_instance, MESSAGE_RECEIVER_STATE new_state)
+static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver, MESSAGE_RECEIVER_STATE new_state)
{
- MESSAGE_RECEIVER_STATE previous_state = message_receiver_instance->message_receiver_state;
- message_receiver_instance->message_receiver_state = new_state;
- if (message_receiver_instance->on_message_receiver_state_changed != NULL)
+ MESSAGE_RECEIVER_STATE previous_state = message_receiver->message_receiver_state;
+ message_receiver->message_receiver_state = new_state;
+ if (message_receiver->on_message_receiver_state_changed != NULL)
{
- message_receiver_instance->on_message_receiver_state_changed(message_receiver_instance->on_message_receiver_state_changed_context, new_state, previous_state);
+ message_receiver->on_message_receiver_state_changed(message_receiver->on_message_receiver_state_changed_context, new_state, previous_state);
}
}
static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value)
{
- MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
- MESSAGE_HANDLE decoded_message = message_receiver_instance->decoded_message;
+ MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
+ MESSAGE_HANDLE decoded_message = message_receiver->decoded_message;
AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value);
if (is_application_properties_type_by_descriptor(descriptor))
{
if (message_set_application_properties(decoded_message, decoded_value) != 0)
{
- message_receiver_instance->decode_error = true;
+ LogError("Error setting application properties on received message");
+ message_receiver->decode_error = true;
}
}
else if (is_properties_type_by_descriptor(descriptor))
@@ -50,13 +51,15 @@
PROPERTIES_HANDLE properties;
if (amqpvalue_get_properties(decoded_value, &properties) != 0)
{
- message_receiver_instance->decode_error = true;
+ LogError("Error getting message properties");
+ message_receiver->decode_error = true;
}
else
{
if (message_set_properties(decoded_message, properties) != 0)
{
- message_receiver_instance->decode_error = true;
+ LogError("Error setting message properties on received message");
+ message_receiver->decode_error = true;
}
properties_destroy(properties);
@@ -65,19 +68,35 @@
else if (is_delivery_annotations_type_by_descriptor(descriptor))
{
annotations delivery_annotations = amqpvalue_get_inplace_described_value(decoded_value);
- if ((delivery_annotations == NULL) ||
- (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0))
+ if (delivery_annotations == NULL)
{
- message_receiver_instance->decode_error = true;
+ LogError("Error getting delivery annotations");
+ message_receiver->decode_error = true;
+ }
+ else
+ {
+ if (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0)
+ {
+ LogError("Error setting delivery annotations on received message");
+ message_receiver->decode_error = true;
+ }
}
}
else if (is_message_annotations_type_by_descriptor(descriptor))
{
annotations message_annotations = amqpvalue_get_inplace_described_value(decoded_value);
- if ((message_annotations == NULL) ||
- (message_set_message_annotations(decoded_message, message_annotations) != 0))
+ if (message_annotations == NULL)
{
- message_receiver_instance->decode_error = true;
+ LogError("Error getting message annotations");
+ message_receiver->decode_error = true;
+ }
+ else
+ {
+ if (message_set_message_annotations(decoded_message, message_annotations) != 0)
+ {
+ LogError("Error setting message annotations on received message");
+ message_receiver->decode_error = true;
+ }
}
}
else if (is_header_type_by_descriptor(descriptor))
@@ -85,13 +104,15 @@
HEADER_HANDLE header;
if (amqpvalue_get_header(decoded_value, &header) != 0)
{
- message_receiver_instance->decode_error = true;
+ LogError("Error getting message header");
+ message_receiver->decode_error = true;
}
else
{
if (message_set_header(decoded_message, header) != 0)
{
- message_receiver_instance->decode_error = true;
+ LogError("Error setting message header on received message");
+ message_receiver->decode_error = true;
}
header_destroy(header);
@@ -100,57 +121,97 @@
else if (is_footer_type_by_descriptor(descriptor))
{
annotations footer = amqpvalue_get_inplace_described_value(decoded_value);
- if ((footer == NULL) ||
- (message_set_footer(decoded_message, footer) != 0))
+ if (footer == NULL)
{
- message_receiver_instance->decode_error = true;
+ LogError("Error getting message footer");
+ message_receiver->decode_error = true;
+ }
+ else
+ {
+ if (message_set_footer(decoded_message, footer) != 0)
+ {
+ LogError("Error setting message footer on received message");
+ message_receiver->decode_error = true;
+ }
}
}
else if (is_amqp_value_type_by_descriptor(descriptor))
{
MESSAGE_BODY_TYPE body_type;
- message_get_body_type(decoded_message, &body_type);
- if (body_type != MESSAGE_BODY_TYPE_NONE)
+ if (message_get_body_type(decoded_message, &body_type) != 0)
{
- message_receiver_instance->decode_error = true;
+ LogError("Error getting message body type");
+ message_receiver->decode_error = true;
}
else
{
- AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value);
- if ((body_amqp_value == NULL) ||
- (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0))
+ if (body_type != MESSAGE_BODY_TYPE_NONE)
+ {
+ LogError("Body already set on received message");
+ message_receiver->decode_error = true;
+ }
+ else
{
- message_receiver_instance->decode_error = true;
+ AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value);
+ if (body_amqp_value == NULL)
+ {
+ LogError("Error getting body AMQP value");
+ message_receiver->decode_error = true;
+ }
+ else
+ {
+ if (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0)
+ {
+ LogError("Error setting body AMQP value on received message");
+ message_receiver->decode_error = true;
+ }
+ }
}
}
}
else if (is_data_type_by_descriptor(descriptor))
{
MESSAGE_BODY_TYPE body_type;
- message_get_body_type(decoded_message, &body_type);
- if ((body_type != MESSAGE_BODY_TYPE_NONE) &&
- (body_type != MESSAGE_BODY_TYPE_DATA))
+ if (message_get_body_type(decoded_message, &body_type) != 0)
{
- message_receiver_instance->decode_error = true;
+ LogError("Error getting message body type");
+ message_receiver->decode_error = true;
}
else
{
- AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value);
- data data_value;
-
- if ((body_data_value == NULL) ||
- (amqpvalue_get_data(body_data_value, &data_value) != 0))
+ if ((body_type != MESSAGE_BODY_TYPE_NONE) &&
+ (body_type != MESSAGE_BODY_TYPE_DATA))
{
- message_receiver_instance->decode_error = true;
+ LogError("Message body type already set to something different than AMQP DATA");
+ message_receiver->decode_error = true;
}
else
{
- BINARY_DATA binary_data;
- binary_data.bytes = (const unsigned char*)data_value.bytes;
- binary_data.length = data_value.length;
- if (message_add_body_amqp_data(decoded_message, binary_data) != 0)
+ AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value);
+ if (body_data_value == NULL)
+ {
+ LogError("Error getting body DATA value");
+ message_receiver->decode_error = true;
+ }
+ else
{
- message_receiver_instance->decode_error = true;
+ data data_value;
+ if (amqpvalue_get_data(body_data_value, &data_value) != 0)
+ {
+ LogError("Error getting body DATA AMQP value");
+ message_receiver->decode_error = true;
+ }
+ else
+ {
+ BINARY_DATA binary_data;
+ binary_data.bytes = (const unsigned char*)data_value.bytes;
+ binary_data.length = data_value.length;
+ if (message_add_body_amqp_data(decoded_message, binary_data) != 0)
+ {
+ LogError("Error adding body DATA to received message");
+ message_receiver->decode_error = true;
+ }
+ }
}
}
}
@@ -160,40 +221,44 @@
static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes)
{
AMQP_VALUE result = NULL;
+ MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
- MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
(void)transfer;
- if (message_receiver_instance->on_message_received != NULL)
+ if (message_receiver->on_message_received != NULL)
{
MESSAGE_HANDLE message = message_create();
if (message == NULL)
{
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+ LogError("Cannot create message");
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
- AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver_instance);
+ AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
if (amqpvalue_decoder == NULL)
{
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+ LogError("Cannot create AMQP value decoder");
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
- message_receiver_instance->decoded_message = message;
- message_receiver_instance->decode_error = false;
+ message_receiver->decoded_message = message;
+ message_receiver->decode_error = false;
if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0)
{
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+ LogError("Cannot decode bytes");
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
- if (message_receiver_instance->decode_error)
+ if (message_receiver->decode_error)
{
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+ LogError("Error decoding message");
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
- result = message_receiver_instance->on_message_received(message_receiver_instance->callback_context, message);
+ result = message_receiver->on_message_received(message_receiver->callback_context, message);
}
}
@@ -209,7 +274,7 @@
static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
{
- MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context;
+ MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
(void)previous_link_state;
switch (new_link_state)
@@ -218,28 +283,28 @@
break;
case LINK_STATE_ATTACHED:
- if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING)
+ if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING)
{
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPEN);
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPEN);
}
break;
case LINK_STATE_DETACHED:
- if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) ||
- (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING))
+ if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) ||
+ (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING))
{
/* User initiated transition, we should be good */
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_IDLE);
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_IDLE);
}
- else if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE)
+ else if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE)
{
/* Any other transition must be an error */
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
break;
case LINK_STATE_ERROR:
- if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR)
+ if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR)
{
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
break;
}
@@ -247,21 +312,29 @@
MESSAGE_RECEIVER_HANDLE messagereceiver_create(LINK_HANDLE link, ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed, void* context)
{
- MESSAGE_RECEIVER_INSTANCE* result = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE));
- if (result != NULL)
+ MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE));
+ if (message_receiver == NULL)
{
- result->link = link;
- result->on_message_receiver_state_changed = on_message_receiver_state_changed;
- result->on_message_receiver_state_changed_context = context;
- result->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE;
+ LogError("Error creating message receiver");
+ }
+ else
+ {
+ message_receiver->link = link;
+ message_receiver->on_message_receiver_state_changed = on_message_receiver_state_changed;
+ message_receiver->on_message_receiver_state_changed_context = context;
+ message_receiver->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE;
}
- return result;
+ return message_receiver;
}
void messagereceiver_destroy(MESSAGE_RECEIVER_HANDLE message_receiver)
{
- if (message_receiver != NULL)
+ if (message_receiver == NULL)
+ {
+ LogError("NULL message_receiver");
+ }
+ else
{
(void)messagereceiver_close(message_receiver);
free(message_receiver);
@@ -274,24 +347,24 @@
if (message_receiver == NULL)
{
+ LogError("NULL message_receiver");
result = __FAILURE__;
}
else
{
- MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-
- if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE)
+ if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE)
{
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPENING);
- if (link_attach(message_receiver_instance->link, on_transfer_received, on_link_state_changed, NULL, message_receiver_instance) != 0)
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPENING);
+ if (link_attach(message_receiver->link, on_transfer_received, on_link_state_changed, NULL, message_receiver) != 0)
{
+ LogError("Link attach failed");
result = __FAILURE__;
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
- message_receiver_instance->on_message_received = on_message_received;
- message_receiver_instance->callback_context = callback_context;
+ message_receiver->on_message_received = on_message_received;
+ message_receiver->callback_context = callback_context;
result = 0;
}
@@ -311,21 +384,21 @@
if (message_receiver == NULL)
{
+ LogError("NULL message_receiver");
result = __FAILURE__;
}
else
{
- MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
-
- if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) ||
- (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN))
+ if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) ||
+ (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN))
{
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_CLOSING);
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_CLOSING);
- if (link_detach(message_receiver_instance->link, true) != 0)
+ if (link_detach(message_receiver->link, true) != 0)
{
+ LogError("link detach failed");
result = __FAILURE__;
- set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR);
+ set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
@@ -347,13 +420,14 @@
if (message_receiver == NULL)
{
+ LogError("NULL message_receiver");
result = __FAILURE__;
}
else
{
- MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
- if (link_get_name(message_receiver_instance->link, link_name) != 0)
+ if (link_get_name(message_receiver->link, link_name) != 0)
{
+ LogError("Getting link name failed");
result = __FAILURE__;
}
else
@@ -371,13 +445,14 @@
if (message_receiver == NULL)
{
+ LogError("NULL message_receiver");
result = __FAILURE__;
}
else
{
- MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
- if (link_get_received_message_id(message_receiver_instance->link, message_id) != 0)
+ if (link_get_received_message_id(message_receiver->link, message_id) != 0)
{
+ LogError("Failed getting received message Id");
result = __FAILURE__;
}
else
@@ -395,32 +470,36 @@
if (message_receiver == NULL)
{
+ LogError("NULL message_receiver");
result = __FAILURE__;
}
else
{
- MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver;
- if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_OPEN)
+ if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_OPEN)
{
+ LogError("Message received not open");
result = __FAILURE__;
}
else
{
const char* my_name;
- if (link_get_name(message_receiver_instance->link, &my_name) != 0)
+ if (link_get_name(message_receiver->link, &my_name) != 0)
{
+ LogError("Failed getting link name");
result = __FAILURE__;
}
else
{
if (strcmp(link_name, my_name) != 0)
{
+ LogError("Link name does not match");
result = __FAILURE__;
}
else
{
- if (link_send_disposition(message_receiver_instance->link, message_number, delivery_state) != 0)
+ if (link_send_disposition(message_receiver->link, message_number, delivery_state) != 0)
{
+ LogError("Seding disposition failed");
result = __FAILURE__;
}
else
@@ -437,7 +516,13 @@
void messagereceiver_set_trace(MESSAGE_RECEIVER_HANDLE message_receiver, bool trace_on)
{
- /* No tracing is yet implemented for message receiver */
- (void)message_receiver;
- (void)trace_on;
+ if (message_receiver == NULL)
+ {
+ LogError("NULL message_receiver");
+ }
+ else
+ {
+ /* No tracing is yet implemented for message receiver */
+ (void)trace_on;
+ }
}
--- a/message_sender.c Mon Sep 25 13:38:40 2017 -0700
+++ b/message_sender.c Sat Oct 21 20:12:19 2017 +0000
@@ -7,8 +7,10 @@
#include "azure_c_shared_utility/optimize_size.h"
#include "azure_c_shared_utility/gballoc.h"
#include "azure_c_shared_utility/xlogging.h"
+#include "azure_c_shared_utility/tickcounter.h"
#include "azure_uamqp_c/message_sender.h"
#include "azure_uamqp_c/amqpvalue_to_string.h"
+#include "azure_uamqp_c/async_operation.h"
typedef enum MESSAGE_SEND_STATE_TAG
{
@@ -30,62 +32,66 @@
void* context;
MESSAGE_SENDER_HANDLE message_sender;
MESSAGE_SEND_STATE message_send_state;
+ tickcounter_ms_t timeout;
} MESSAGE_WITH_CALLBACK;
+DEFINE_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK);
+
typedef struct MESSAGE_SENDER_INSTANCE_TAG
{
LINK_HANDLE link;
size_t message_count;
- MESSAGE_WITH_CALLBACK** messages;
+ ASYNC_OPERATION_HANDLE* messages;
MESSAGE_SENDER_STATE message_sender_state;
ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed;
void* on_message_sender_state_changed_context;
unsigned int is_trace_on : 1;
} MESSAGE_SENDER_INSTANCE;
-static void remove_pending_message_by_index(MESSAGE_SENDER_INSTANCE* message_sender_instance, size_t index)
+static void remove_pending_message_by_index(MESSAGE_SENDER_HANDLE message_sender, size_t index)
{
- MESSAGE_WITH_CALLBACK** new_messages;
+ ASYNC_OPERATION_HANDLE* new_messages;
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[index]);
- if (message_sender_instance->messages[index]->message != NULL)
+ if (message_with_callback->message != NULL)
{
- message_destroy(message_sender_instance->messages[index]->message);
- message_sender_instance->messages[index]->message = NULL;
+ message_destroy(message_with_callback->message);
+ message_with_callback->message = NULL;
}
- free(message_sender_instance->messages[index]);
+ async_operation_destroy(message_sender->messages[index]);
- if (message_sender_instance->message_count - index > 1)
+ if (message_sender->message_count - index > 1)
{
- (void)memmove(&message_sender_instance->messages[index], &message_sender_instance->messages[index + 1], sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count - index - 1));
+ (void)memmove(&message_sender->messages[index], &message_sender->messages[index + 1], sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count - index - 1));
}
- message_sender_instance->message_count--;
+ message_sender->message_count--;
- if (message_sender_instance->message_count > 0)
+ if (message_sender->message_count > 0)
{
- new_messages = (MESSAGE_WITH_CALLBACK**)realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count));
+ new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count));
if (new_messages != NULL)
{
- message_sender_instance->messages = new_messages;
+ message_sender->messages = new_messages;
}
}
else
{
- free(message_sender_instance->messages);
- message_sender_instance->messages = NULL;
+ free(message_sender->messages);
+ message_sender->messages = NULL;
}
}
-static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback)
+static void remove_pending_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send)
{
size_t i;
- for (i = 0; i < message_sender_instance->message_count; i++)
+ for (i = 0; i < message_sender->message_count; i++)
{
- if (message_sender_instance->messages[i] == message_with_callback)
+ if (message_sender->messages[i] == pending_send)
{
- remove_pending_message_by_index(message_sender_instance, i);
+ remove_pending_message_by_index(message_sender, i);
break;
}
}
@@ -93,8 +99,9 @@
static void on_delivery_settled(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state)
{
- MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context;
- MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
+ ASYNC_OPERATION_HANDLE pending_send = (ASYNC_OPERATION_HANDLE)context;
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
+ MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender;
(void)delivery_no;
if (message_with_callback->on_message_send_complete != NULL)
@@ -128,6 +135,9 @@
case LINK_DELIVERY_SETTLE_REASON_SETTLED:
message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK);
break;
+ case LINK_DELIVERY_SETTLE_REASON_TIMEOUT:
+ message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_TIMEOUT);
+ break;
case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED:
default:
message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
@@ -135,7 +145,7 @@
}
}
- remove_pending_message(message_sender_instance, message_with_callback);
+ remove_pending_message(message_sender, pending_send);
}
static int encode_bytes(void* context, const unsigned char* bytes, size_t length)
@@ -146,18 +156,18 @@
return 0;
}
-static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender_instance, const char* name, AMQP_VALUE value)
+static void log_message_chunk(MESSAGE_SENDER_INSTANCE* message_sender, const char* name, AMQP_VALUE value)
{
#ifdef NO_LOGGING
- UNUSED(message_sender_instance);
+ UNUSED(message_sender);
UNUSED(name);
UNUSED(value);
#else
- if (xlogging_get_log_function() != NULL && message_sender_instance->is_trace_on == 1)
+ if (xlogging_get_log_function() != NULL && message_sender->is_trace_on == 1)
{
char* value_as_string = NULL;
- LOG(AZ_LOG_TRACE, 0, "%s", name);
- LOG(AZ_LOG_TRACE, 0, "%s", (value_as_string = amqpvalue_to_string(value)));
+ LOG(AZ_LOG_TRACE, 0, "%s", P_OR_NULL(name));
+ LOG(AZ_LOG_TRACE, 0, "%s", P_OR_NULL((value_as_string = amqpvalue_to_string(value))));
if (value_as_string != NULL)
{
free(value_as_string);
@@ -166,7 +176,7 @@
#endif
}
-static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_WITH_CALLBACK* message_with_callback, MESSAGE_HANDLE message)
+static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send, MESSAGE_HANDLE message)
{
SEND_ONE_MESSAGE_RESULT result;
@@ -193,51 +203,112 @@
AMQP_VALUE body_amqp_value = NULL;
size_t body_data_count = 0;
AMQP_VALUE msg_annotations = NULL;
+ bool is_error = false;
- message_get_header(message, &header);
- header_amqp_value = amqpvalue_create_header(header);
- if (header != NULL)
+ // message header
+ if ((message_get_header(message, &header) == 0) &&
+ (header != NULL))
{
- amqpvalue_get_encoded_size(header_amqp_value, &encoded_size);
- total_encoded_size += encoded_size;
+ header_amqp_value = amqpvalue_create_header(header);
+ if (header_amqp_value == NULL)
+ {
+ LogError("Cannot create header AMQP value");
+ is_error = true;
+ }
+ else
+ {
+ if (amqpvalue_get_encoded_size(header_amqp_value, &encoded_size) != 0)
+ {
+ LogError("Cannot obtain header encoded size");
+ is_error = true;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
+ }
+
}
// message annotations
- if ((message_get_message_annotations(message, &msg_annotations) == 0) &&
+ if ((!is_error) &&
+ (message_get_message_annotations(message, &msg_annotations) == 0) &&
(msg_annotations != NULL))
{
- amqpvalue_get_encoded_size(msg_annotations, &encoded_size);
- total_encoded_size += encoded_size;
+ if (amqpvalue_get_encoded_size(msg_annotations, &encoded_size) != 0)
+ {
+ LogError("Cannot obtain message annotations encoded size");
+ is_error = true;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
}
// properties
- message_get_properties(message, &properties);
- properties_amqp_value = amqpvalue_create_properties(properties);
- if (properties != NULL)
+ if ((!is_error) &&
+ (message_get_properties(message, &properties) == 0) &&
+ (properties != NULL))
{
- amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size);
- total_encoded_size += encoded_size;
+ properties_amqp_value = amqpvalue_create_properties(properties);
+ if (properties_amqp_value == NULL)
+ {
+ LogError("Cannot create message properties AMQP value");
+ is_error = true;
+ }
+ else
+ {
+ if (amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size) != 0)
+ {
+ LogError("Cannot obtain message properties encoded size");
+ is_error = true;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
+ }
}
// application properties
- message_get_application_properties(message, &application_properties);
- if (application_properties != NULL)
+ if ((!is_error) &&
+ (message_get_application_properties(message, &application_properties) == 0) &&
+ (application_properties != NULL))
{
application_properties_value = amqpvalue_create_application_properties(application_properties);
- amqpvalue_get_encoded_size(application_properties_value, &encoded_size);
- total_encoded_size += encoded_size;
+ if (application_properties_value == NULL)
+ {
+ LogError("Cannot create application properties AMQP value");
+ is_error = true;
+ }
+ else
+ {
+ if (amqpvalue_get_encoded_size(application_properties_value, &encoded_size) != 0)
+ {
+ LogError("Cannot obtain application properties encoded size");
+ is_error = true;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
+ }
+ }
+
+ if (is_error)
+ {
+ result = SEND_ONE_MESSAGE_ERROR;
}
else
{
- application_properties_value = NULL;
- }
-
- result = SEND_ONE_MESSAGE_OK;
+ result = SEND_ONE_MESSAGE_OK;
- // body - amqp data
- switch (message_body_type)
- {
+ // body - amqp data
+ switch (message_body_type)
+ {
default:
+ LogError("Unknown body type");
result = SEND_ONE_MESSAGE_ERROR;
break;
@@ -246,19 +317,28 @@
AMQP_VALUE message_body_amqp_value;
if (message_get_body_amqp_value_in_place(message, &message_body_amqp_value) != 0)
{
+ LogError("Cannot obtain AMQP value from body");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
- if ((body_amqp_value == NULL) ||
- (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0))
+ if (body_amqp_value == NULL)
{
+ LogError("Cannot create body AMQP value");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
- total_encoded_size += encoded_size;
+ if (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0)
+ {
+ LogError("Cannot get body AMQP value encoded size");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+ else
+ {
+ total_encoded_size += encoded_size;
+ }
}
}
@@ -272,6 +352,7 @@
if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
{
+ LogError("Cannot get body AMQP data count");
result = SEND_ONE_MESSAGE_ERROR;
}
else
@@ -280,6 +361,7 @@
{
if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
{
+ LogError("Cannot get body AMQP data %u", (unsigned int)i);
result = SEND_ONE_MESSAGE_ERROR;
}
else
@@ -291,12 +373,14 @@
body_amqp_data = amqpvalue_create_data(binary_value);
if (body_amqp_data == NULL)
{
+ LogError("Cannot create body AMQP data");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
{
+ LogError("Cannot get body AMQP data encoded size");
result = SEND_ONE_MESSAGE_ERROR;
}
else
@@ -311,139 +395,151 @@
}
break;
}
- }
-
- if (result == 0)
- {
- void* data_bytes = malloc(total_encoded_size);
- PAYLOAD payload;
- payload.bytes = (const unsigned char*)data_bytes;
- payload.length = 0;
- result = SEND_ONE_MESSAGE_OK;
-
- if (header != NULL)
- {
- if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
- {
- result = SEND_ONE_MESSAGE_ERROR;
- }
-
- log_message_chunk(message_sender_instance, "Header:", header_amqp_value);
- }
-
- if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
- {
- if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
- {
- result = SEND_ONE_MESSAGE_ERROR;
- }
-
- log_message_chunk(message_sender_instance, "Message Annotations:", msg_annotations);
}
- if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
+ if (result == 0)
{
- if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
+ void* data_bytes = malloc(total_encoded_size);
+ PAYLOAD payload;
+ payload.bytes = (const unsigned char*)data_bytes;
+ payload.length = 0;
+ result = SEND_ONE_MESSAGE_OK;
+
+ if (header != NULL)
{
- result = SEND_ONE_MESSAGE_ERROR;
+ if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
+ {
+ LogError("Cannot encode header value");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+
+ log_message_chunk(message_sender, "Header:", header_amqp_value);
}
- log_message_chunk(message_sender_instance, "Properties:", properties_amqp_value);
- }
+ if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
+ {
+ if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
+ {
+ LogError("Cannot encode message annotations value");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
- if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
- {
- if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
- {
- result = SEND_ONE_MESSAGE_ERROR;
+ log_message_chunk(message_sender, "Message Annotations:", msg_annotations);
}
- log_message_chunk(message_sender_instance, "Application properties:", application_properties_value);
- }
-
- if (result == SEND_ONE_MESSAGE_OK)
- {
- switch (message_body_type)
+ if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
{
- default:
- result = SEND_ONE_MESSAGE_ERROR;
- break;
-
- case MESSAGE_BODY_TYPE_VALUE:
- {
- if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
+ if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
{
+ LogError("Cannot encode message properties value");
result = SEND_ONE_MESSAGE_ERROR;
}
- log_message_chunk(message_sender_instance, "Body - amqp value:", body_amqp_value);
- break;
+ log_message_chunk(message_sender, "Properties:", properties_amqp_value);
}
- case MESSAGE_BODY_TYPE_DATA:
+
+ if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
{
- BINARY_DATA binary_data;
- size_t i;
+ if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
+ {
+ LogError("Cannot encode application properties value");
+ result = SEND_ONE_MESSAGE_ERROR;
+ }
+
+ log_message_chunk(message_sender, "Application properties:", application_properties_value);
+ }
- for (i = 0; i < body_data_count; i++)
+ if (result == SEND_ONE_MESSAGE_OK)
+ {
+ switch (message_body_type)
{
- if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
+ default:
+ LogError("Unknown message type");
+ result = SEND_ONE_MESSAGE_ERROR;
+ break;
+
+ case MESSAGE_BODY_TYPE_VALUE:
+ {
+ if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
{
+ LogError("Cannot encode body AMQP value");
result = SEND_ONE_MESSAGE_ERROR;
}
- else
+
+ log_message_chunk(message_sender, "Body - amqp value:", body_amqp_value);
+ break;
+ }
+ case MESSAGE_BODY_TYPE_DATA:
+ {
+ BINARY_DATA binary_data;
+ size_t i;
+
+ for (i = 0; i < body_data_count; i++)
{
- AMQP_VALUE body_amqp_data;
- amqp_binary binary_value;
- binary_value.bytes = binary_data.bytes;
- binary_value.length = (uint32_t)binary_data.length;
- body_amqp_data = amqpvalue_create_data(binary_value);
- if (body_amqp_data == NULL)
+ if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
{
+ LogError("Cannot get AMQP data %u", (unsigned int)i);
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
- if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
+ AMQP_VALUE body_amqp_data;
+ amqp_binary binary_value;
+ binary_value.bytes = binary_data.bytes;
+ binary_value.length = (uint32_t)binary_data.length;
+ body_amqp_data = amqpvalue_create_data(binary_value);
+ if (body_amqp_data == NULL)
{
+ LogError("Cannot create body AMQP data %u", (unsigned int)i);
result = SEND_ONE_MESSAGE_ERROR;
- break;
}
+ else
+ {
+ if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
+ {
+ LogError("Cannot encode body AMQP data %u", (unsigned int)i);
+ result = SEND_ONE_MESSAGE_ERROR;
+ break;
+ }
- amqpvalue_destroy(body_amqp_data);
+ amqpvalue_destroy(body_amqp_data);
+ }
}
}
+ break;
}
- break;
+ }
}
- }
- }
- if (result == SEND_ONE_MESSAGE_OK)
- {
- message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
- switch (link_transfer(message_sender_instance->link, message_format, &payload, 1, on_delivery_settled, message_with_callback))
+ if (result == SEND_ONE_MESSAGE_OK)
{
- default:
- case LINK_TRANSFER_ERROR:
- result = SEND_ONE_MESSAGE_ERROR;
- break;
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
+ message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
+ switch (link_transfer_async(message_sender->link, message_format, &payload, 1, on_delivery_settled, pending_send, message_with_callback->timeout))
+ {
+ default:
+ case LINK_TRANSFER_ERROR:
+ LogError("Error in link transfer");
+ result = SEND_ONE_MESSAGE_ERROR;
+ break;
- case LINK_TRANSFER_BUSY:
- message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
- result = SEND_ONE_MESSAGE_BUSY;
- break;
+ case LINK_TRANSFER_BUSY:
+ message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
+ result = SEND_ONE_MESSAGE_BUSY;
+ break;
- case LINK_TRANSFER_OK:
- result = SEND_ONE_MESSAGE_OK;
- break;
+ case LINK_TRANSFER_OK:
+ result = SEND_ONE_MESSAGE_OK;
+ break;
+ }
}
- }
- free(data_bytes);
+ free(data_bytes);
- if (body_amqp_value != NULL)
- {
- amqpvalue_destroy(body_amqp_value);
+ if (body_amqp_value != NULL)
+ {
+ amqpvalue_destroy(body_amqp_value);
+ }
}
}
@@ -466,14 +562,17 @@
{
amqpvalue_destroy(application_properties);
}
+
if (application_properties_value != NULL)
{
amqpvalue_destroy(application_properties_value);
}
+
if (properties_amqp_value != NULL)
{
amqpvalue_destroy(properties_amqp_value);
}
+
if (properties != NULL)
{
properties_destroy(properties);
@@ -483,33 +582,37 @@
return result;
}
-static void send_all_pending_messages(MESSAGE_SENDER_INSTANCE* message_sender_instance)
+static void send_all_pending_messages(MESSAGE_SENDER_HANDLE message_sender)
{
size_t i;
- for (i = 0; i < message_sender_instance->message_count; i++)
+ for (i = 0; i < message_sender->message_count; i++)
{
- if (message_sender_instance->messages[i]->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
+ if (message_with_callback->message_send_state == MESSAGE_SEND_STATE_NOT_SENT)
{
- switch (send_one_message(message_sender_instance, message_sender_instance->messages[i], message_sender_instance->messages[i]->message))
+ switch (send_one_message(message_sender, message_sender->messages[i], message_with_callback->message))
{
default:
+ LogError("Invalid send one message result");
+ break;
+
case SEND_ONE_MESSAGE_ERROR:
{
- ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_sender_instance->messages[i]->on_message_send_complete;
- void* context = message_sender_instance->messages[i]->context;
- remove_pending_message_by_index(message_sender_instance, i);
+ ON_MESSAGE_SEND_COMPLETE on_message_send_complete = message_with_callback->on_message_send_complete;
+ void* context = message_with_callback->context;
+ remove_pending_message_by_index(message_sender, i);
if (on_message_send_complete != NULL)
{
on_message_send_complete(context, MESSAGE_SEND_ERROR);
}
- i = message_sender_instance->message_count;
+ i = message_sender->message_count;
break;
}
case SEND_ONE_MESSAGE_BUSY:
- i = message_sender_instance->message_count + 1;
+ i = message_sender->message_count + 1;
break;
case SEND_ONE_MESSAGE_OK:
@@ -521,46 +624,47 @@
}
}
-static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender_instance, MESSAGE_SENDER_STATE new_state)
+static void set_message_sender_state(MESSAGE_SENDER_INSTANCE* message_sender, MESSAGE_SENDER_STATE new_state)
{
- MESSAGE_SENDER_STATE previous_state = message_sender_instance->message_sender_state;
- message_sender_instance->message_sender_state = new_state;
- if (message_sender_instance->on_message_sender_state_changed != NULL)
+ MESSAGE_SENDER_STATE previous_state = message_sender->message_sender_state;
+ message_sender->message_sender_state = new_state;
+ if (message_sender->on_message_sender_state_changed != NULL)
{
- message_sender_instance->on_message_sender_state_changed(message_sender_instance->on_message_sender_state_changed_context, new_state, previous_state);
+ message_sender->on_message_sender_state_changed(message_sender->on_message_sender_state_changed_context, new_state, previous_state);
}
}
-static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender_instance)
+static void indicate_all_messages_as_error(MESSAGE_SENDER_INSTANCE* message_sender)
{
size_t i;
- for (i = 0; i < message_sender_instance->message_count; i++)
+ for (i = 0; i < message_sender->message_count; i++)
{
- if (message_sender_instance->messages[i]->on_message_send_complete != NULL)
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]);
+ if (message_with_callback->on_message_send_complete != NULL)
{
- message_sender_instance->messages[i]->on_message_send_complete(message_sender_instance->messages[i]->context, MESSAGE_SEND_ERROR);
+ message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR);
}
- if (message_sender_instance->messages[i]->message != NULL)
+ if (message_with_callback->message != NULL)
{
- message_destroy(message_sender_instance->messages[i]->message);
+ message_destroy(message_with_callback->message);
}
- free(message_sender_instance->messages[i]);
+ async_operation_destroy(message_sender->messages[i]);
}
- if (message_sender_instance->messages != NULL)
+ if (message_sender->messages != NULL)
{
- message_sender_instance->message_count = 0;
+ message_sender->message_count = 0;
- free(message_sender_instance->messages);
- message_sender_instance->messages = NULL;
+ free(message_sender->messages);
+ message_sender->messages = NULL;
}
}
static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state)
{
- MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
+ MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)context;
(void)previous_link_state;
switch (new_link_state)
@@ -569,30 +673,30 @@
break;
case LINK_STATE_ATTACHED:
- if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
+ if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING)
{
- set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPEN);
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPEN);
}
break;
case LINK_STATE_DETACHED:
- if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
- (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
+ if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN) ||
+ (message_sender->message_sender_state == MESSAGE_SENDER_STATE_CLOSING))
{
/* User initiated transition, we should be good */
- set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE);
- indicate_all_messages_as_error(message_sender_instance);
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_IDLE);
+ indicate_all_messages_as_error(message_sender);
}
- else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
+ else if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_IDLE)
{
/* Any other transition must be an error */
- set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
}
break;
case LINK_STATE_ERROR:
- if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
+ if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_ERROR)
{
- set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
- indicate_all_messages_as_error(message_sender_instance);
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
+ indicate_all_messages_as_error(message_sender);
}
break;
}
@@ -600,36 +704,41 @@
static void on_link_flow_on(void* context)
{
- MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)context;
- send_all_pending_messages(message_sender_instance);
+ MESSAGE_SENDER_HANDLE message_sender = (MESSAGE_SENDER_INSTANCE*)context;
+ send_all_pending_messages(message_sender);
}
MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context)
{
- MESSAGE_SENDER_INSTANCE* result = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE));
- if (result != NULL)
+ MESSAGE_SENDER_INSTANCE* message_sender = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE));
+ if (message_sender == NULL)
+ {
+ LogError("Failed allocating message sender");
+ }
+ else
{
- result->messages = NULL;
- result->message_count = 0;
- result->link = link;
- result->on_message_sender_state_changed = on_message_sender_state_changed;
- result->on_message_sender_state_changed_context = context;
- result->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
- result->is_trace_on = 0;
+ message_sender->messages = NULL;
+ message_sender->message_count = 0;
+ message_sender->link = link;
+ message_sender->on_message_sender_state_changed = on_message_sender_state_changed;
+ message_sender->on_message_sender_state_changed_context = context;
+ message_sender->message_sender_state = MESSAGE_SENDER_STATE_IDLE;
+ message_sender->is_trace_on = 0;
}
- return result;
+ return message_sender;
}
void messagesender_destroy(MESSAGE_SENDER_HANDLE message_sender)
{
- if (message_sender != NULL)
+ if (message_sender == NULL)
{
- MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-
- messagesender_close(message_sender_instance);
-
- indicate_all_messages_as_error(message_sender_instance);
+ LogError("NULL message_sender");
+ }
+ else
+ {
+ (void)messagesender_close(message_sender);
+ indicate_all_messages_as_error(message_sender);
free(message_sender);
}
@@ -641,19 +750,19 @@
if (message_sender == NULL)
{
+ LogError("NULL message_sender");
result = __FAILURE__;
}
else
{
- MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-
- if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
+ if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_IDLE)
{
- set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_OPENING);
- if (link_attach(message_sender_instance->link, NULL, on_link_state_changed, on_link_flow_on, message_sender_instance) != 0)
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_OPENING);
+ if (link_attach(message_sender->link, NULL, on_link_state_changed, on_link_flow_on, message_sender) != 0)
{
+ LogError("attach link failed");
result = __FAILURE__;
- set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
}
else
{
@@ -675,20 +784,20 @@
if (message_sender == NULL)
{
+ LogError("NULL message_sender");
result = __FAILURE__;
}
else
{
- MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
-
- if ((message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
- (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
+ if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
+ (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
{
- set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_CLOSING);
- if (link_detach(message_sender_instance->link, true) != 0)
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_CLOSING);
+ if (link_detach(message_sender->link, true) != 0)
{
+ LogError("Detaching link failed");
result = __FAILURE__;
- set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR);
+ set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR);
}
else
{
@@ -704,49 +813,63 @@
return result;
}
-int messagesender_send(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context)
+static void messagesender_send_cancel_handler(ASYNC_OPERATION_HANDLE send_operation)
{
- int result;
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, send_operation);
+ if (message_with_callback->on_message_send_complete != NULL)
+ {
+ message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_CANCELLED);
+ }
+
+ remove_pending_message(message_with_callback->message_sender, send_operation);
+}
+
+ASYNC_OPERATION_HANDLE messagesender_send_async(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context, tickcounter_ms_t timeout)
+{
+ ASYNC_OPERATION_HANDLE result;
if ((message_sender == NULL) ||
(message == NULL))
{
- result = __FAILURE__;
+ LogError("Bad parameters: message_sender = %p, message = %p");
+ result = NULL;
}
else
{
- MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
- if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
+ if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
{
- result = __FAILURE__;
+ LogError("Message sender in ERROR state");
+ result = NULL;
}
else
{
- MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)malloc(sizeof(MESSAGE_WITH_CALLBACK));
- if (message_with_callback == NULL)
+ result = CREATE_ASYNC_OPERATION(MESSAGE_WITH_CALLBACK, messagesender_send_cancel_handler);
+ if (result == NULL)
{
- result = __FAILURE__;
+ LogError("Failed allocating context for send");
}
else
{
- MESSAGE_WITH_CALLBACK** new_messages = (MESSAGE_WITH_CALLBACK**)realloc(message_sender_instance->messages, sizeof(MESSAGE_WITH_CALLBACK*) * (message_sender_instance->message_count + 1));
+ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, result);
+ ASYNC_OPERATION_HANDLE* new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, sizeof(ASYNC_OPERATION_HANDLE) * (message_sender->message_count + 1));
if (new_messages == NULL)
{
- free(message_with_callback);
- result = __FAILURE__;
+ LogError("Failed allocating memory for pending sends");
+ async_operation_destroy(result);
+ result = NULL;
}
else
{
- result = 0;
-
- message_sender_instance->messages = new_messages;
- if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
+ message_with_callback->timeout = timeout;
+ message_sender->messages = new_messages;
+ if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
{
message_with_callback->message = message_clone(message);
if (message_with_callback->message == NULL)
{
- free(message_with_callback);
- result = __FAILURE__;
+ LogError("Cannot clone message for placing it in the pending sends list");
+ async_operation_destroy(result);
+ result = NULL;
}
message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
@@ -757,42 +880,42 @@
message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
}
- if (result == 0)
+ if (result != NULL)
{
message_with_callback->on_message_send_complete = on_message_send_complete;
message_with_callback->context = callback_context;
- message_with_callback->message_sender = message_sender_instance;
+ message_with_callback->message_sender = message_sender;
- message_sender_instance->messages[message_sender_instance->message_count] = message_with_callback;
- message_sender_instance->message_count++;
+ message_sender->messages[message_sender->message_count] = result;
+ message_sender->message_count++;
- if (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
+ if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
{
- switch (send_one_message(message_sender_instance, message_with_callback, message))
+ switch (send_one_message(message_sender, result, message))
{
default:
case SEND_ONE_MESSAGE_ERROR:
-
- remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1);
- result = __FAILURE__;
+ LogError("Error sending message");
+ remove_pending_message_by_index(message_sender, message_sender->message_count - 1);
+ async_operation_destroy(result);
+ result = NULL;
break;
case SEND_ONE_MESSAGE_BUSY:
message_with_callback->message = message_clone(message);
if (message_with_callback->message == NULL)
{
- free(message_with_callback);
- result = __FAILURE__;
+ LogError("Error cloning message for placing it in the pending sends list");
+ async_operation_destroy(result);
+ result = NULL;
}
else
{
message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
- result = 0;
}
break;
case SEND_ONE_MESSAGE_OK:
- result = 0;
break;
}
}
@@ -801,14 +924,18 @@
}
}
}
+
return result;
}
void messagesender_set_trace(MESSAGE_SENDER_HANDLE message_sender, bool traceOn)
{
- MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_sender;
- if (message_sender_instance != NULL)
+ if (message_sender == NULL)
{
- message_sender_instance->is_trace_on = traceOn ? 1 : 0;
+ LogError("NULL message_sender");
+ }
+ else
+ {
+ message_sender->is_trace_on = traceOn ? 1 : 0;
}
}
--- a/messaging.c Mon Sep 25 13:38:40 2017 -0700
+++ b/messaging.c Sat Oct 21 20:12:19 2017 +0000
@@ -2,7 +2,9 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
#include <stdlib.h>
+#include <stdint.h>
#include <stdbool.h>
+#include "azure_c_shared_utility/xlogging.h"
#include "azure_uamqp_c/amqpvalue.h"
#include "azure_uamqp_c/amqp_definitions.h"
@@ -10,8 +12,10 @@
{
AMQP_VALUE result;
SOURCE_HANDLE source = source_create();
+
if (source == NULL)
{
+ LogError("NULL source");
result = NULL;
}
else
@@ -19,17 +23,27 @@
AMQP_VALUE address_value = amqpvalue_create_string(address);
if (address_value == NULL)
{
+ LogError("Cannot create address AMQP string");
result = NULL;
}
else
{
if (source_set_address(source, address_value) != 0)
{
+ LogError("Cannot set address on source");
result = NULL;
}
else
{
result = amqpvalue_create_source(source);
+ if (result == NULL)
+ {
+ LogError("Cannot create source");
+ }
+ else
+ {
+ /* all ok */
+ }
}
amqpvalue_destroy(address_value);
@@ -45,8 +59,10 @@
{
AMQP_VALUE result;
TARGET_HANDLE target = target_create();
+
if (target == NULL)
{
+ LogError("NULL target");
result = NULL;
}
else
@@ -54,17 +70,27 @@
AMQP_VALUE address_value = amqpvalue_create_string(address);
if (address_value == NULL)
{
+ LogError("Cannot create address AMQP string");
result = NULL;
}
else
{
if (target_set_address(target, address_value) != 0)
{
+ LogError("Cannot set address on target");
result = NULL;
}
else
{
result = amqpvalue_create_target(target);
+ if (result == NULL)
+ {
+ LogError("Cannot create target");
+ }
+ else
+ {
+ /* all ok */
+ }
}
amqpvalue_destroy(address_value);
@@ -82,11 +108,21 @@
RECEIVED_HANDLE received = received_create(section_number, section_offset);
if (received == NULL)
{
+ LogError("Cannot create RECEIVED delivery state handle");
result = NULL;
}
else
{
result = amqpvalue_create_received(received);
+ if (result == NULL)
+ {
+ LogError("Cannot create RECEIVED delivery state AMQP value");
+ }
+ else
+ {
+ /* all ok */
+ }
+
received_destroy(received);
}
@@ -99,11 +135,21 @@
ACCEPTED_HANDLE accepted = accepted_create();
if (accepted == NULL)
{
+ LogError("Cannot create ACCEPTED delivery state handle");
result = NULL;
}
else
{
result = amqpvalue_create_accepted(accepted);
+ if (result == NULL)
+ {
+ LogError("Cannot create ACCEPTED delivery state AMQP value");
+ }
+ else
+ {
+ /* all ok */
+ }
+
accepted_destroy(accepted);
}
@@ -116,6 +162,7 @@
REJECTED_HANDLE rejected = rejected_create();
if (rejected == NULL)
{
+ LogError("Cannot create REJECTED delivery state handle");
result = NULL;
}
else
@@ -128,6 +175,7 @@
error_handle = error_create(error_condition);
if (error_handle == NULL)
{
+ LogError("Cannot create error AMQP value for REJECTED state");
error_constructing = true;
}
else
@@ -135,12 +183,14 @@
if ((error_description != NULL) &&
(error_set_description(error_handle, error_description) != 0))
{
+ LogError("Cannot set error description on error AMQP value for REJECTED state");
error_constructing = true;
}
else
{
if (rejected_set_error(rejected, error_handle) != 0)
{
+ LogError("Cannot set error on REJECTED state handle");
error_constructing = true;
}
}
@@ -156,6 +206,14 @@
else
{
result = amqpvalue_create_rejected(rejected);
+ if (result == NULL)
+ {
+ LogError("Cannot create REJECTED delivery state AMQP value");
+ }
+ else
+ {
+ /* all ok */
+ }
}
rejected_destroy(rejected);
@@ -170,11 +228,21 @@
RELEASED_HANDLE released = released_create();
if (released == NULL)
{
+ LogError("Cannot create RELEASED delivery state handle");
result = NULL;
}
else
{
result = amqpvalue_create_released(released);
+ if (result == NULL)
+ {
+ LogError("Cannot create RELEASED delivery state AMQP value");
+ }
+ else
+ {
+ /* all ok */
+ }
+
released_destroy(released);
}
@@ -187,19 +255,37 @@
MODIFIED_HANDLE modified = modified_create();
if (modified == NULL)
{
+ LogError("Cannot create MODIFIED delivery state handle");
result = NULL;
}
else
{
- if ((modified_set_delivery_failed(modified, delivery_failed) != 0) ||
- (modified_set_undeliverable_here(modified, undeliverable_here) != 0) ||
- ((message_annotations != NULL) && (modified_set_message_annotations(modified, message_annotations) != 0)))
+ if (modified_set_delivery_failed(modified, delivery_failed) != 0)
+ {
+ LogError("Cannot set delivery failed on MODIFIED delivery state");
+ result = NULL;
+ }
+ else if (modified_set_undeliverable_here(modified, undeliverable_here) != 0)
{
+ LogError("Cannot set undeliverable here on MODIFIED delivery state");
+ result = NULL;
+ }
+ else if ((message_annotations != NULL) && (modified_set_message_annotations(modified, message_annotations) != 0))
+ {
+ LogError("Cannot set message annotations on MODIFIED delivery state");
result = NULL;
}
else
{
result = amqpvalue_create_modified(modified);
+ if (result == NULL)
+ {
+ LogError("Cannot create MODIFIED delivery state AMQP value");
+ }
+ else
+ {
+ /* all ok */
+ }
}
modified_destroy(modified);
