Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: uamqp/src/message_receiver.c
- Revision:
- 0:f7f1f0d76dd6
diff -r 000000000000 -r f7f1f0d76dd6 uamqp/src/message_receiver.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/uamqp/src/message_receiver.c Thu Aug 23 06:52:14 2018 +0000 @@ -0,0 +1,530 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +#include <stdlib.h> + +#include "azure_c_shared_utility/optimize_size.h" +#include "azure_c_shared_utility/gballoc.h" +#include "azure_c_shared_utility/xlogging.h" +#include "azure_uamqp_c/link.h" +#include "azure_uamqp_c/amqp_definitions.h" +#include "azure_uamqp_c/message.h" +#include "azure_uamqp_c/message_receiver.h" +#include "azure_uamqp_c/amqpvalue.h" + +typedef struct MESSAGE_RECEIVER_INSTANCE_TAG +{ + LINK_HANDLE link; + ON_MESSAGE_RECEIVED on_message_received; + ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed; + MESSAGE_RECEIVER_STATE message_receiver_state; + const void* on_message_receiver_state_changed_context; + const void* callback_context; + MESSAGE_HANDLE decoded_message; + bool decode_error; +} MESSAGE_RECEIVER_INSTANCE; + +static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver, MESSAGE_RECEIVER_STATE new_state) +{ + MESSAGE_RECEIVER_STATE previous_state = message_receiver->message_receiver_state; + message_receiver->message_receiver_state = new_state; + if (message_receiver->on_message_receiver_state_changed != NULL) + { + message_receiver->on_message_receiver_state_changed(message_receiver->on_message_receiver_state_changed_context, new_state, previous_state); + } +} + +static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value) +{ + MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context; + MESSAGE_HANDLE decoded_message = message_receiver->decoded_message; + AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value); + + if (is_application_properties_type_by_descriptor(descriptor)) + { + if (message_set_application_properties(decoded_message, decoded_value) != 0) + { + LogError("Error setting application properties on received message"); + message_receiver->decode_error = true; + } + } + else if (is_properties_type_by_descriptor(descriptor)) + { + PROPERTIES_HANDLE properties; + if (amqpvalue_get_properties(decoded_value, &properties) != 0) + { + LogError("Error getting message properties"); + message_receiver->decode_error = true; + } + else + { + if (message_set_properties(decoded_message, properties) != 0) + { + LogError("Error setting message properties on received message"); + message_receiver->decode_error = true; + } + + properties_destroy(properties); + } + } + else if (is_delivery_annotations_type_by_descriptor(descriptor)) + { + annotations delivery_annotations = amqpvalue_get_inplace_described_value(decoded_value); + if (delivery_annotations == NULL) + { + LogError("Error getting delivery annotations"); + message_receiver->decode_error = true; + } + else + { + if (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0) + { + LogError("Error setting delivery annotations on received message"); + message_receiver->decode_error = true; + } + } + } + else if (is_message_annotations_type_by_descriptor(descriptor)) + { + annotations message_annotations = amqpvalue_get_inplace_described_value(decoded_value); + if (message_annotations == NULL) + { + LogError("Error getting message annotations"); + message_receiver->decode_error = true; + } + else + { + if (message_set_message_annotations(decoded_message, message_annotations) != 0) + { + LogError("Error setting message annotations on received message"); + message_receiver->decode_error = true; + } + } + } + else if (is_header_type_by_descriptor(descriptor)) + { + HEADER_HANDLE header; + if (amqpvalue_get_header(decoded_value, &header) != 0) + { + LogError("Error getting message header"); + message_receiver->decode_error = true; + } + else + { + if (message_set_header(decoded_message, header) != 0) + { + LogError("Error setting message header on received message"); + message_receiver->decode_error = true; + } + + header_destroy(header); + } + } + else if (is_footer_type_by_descriptor(descriptor)) + { + annotations footer = amqpvalue_get_inplace_described_value(decoded_value); + if (footer == NULL) + { + LogError("Error getting message footer"); + message_receiver->decode_error = true; + } + else + { + if (message_set_footer(decoded_message, footer) != 0) + { + LogError("Error setting message footer on received message"); + message_receiver->decode_error = true; + } + } + } + else if (is_amqp_value_type_by_descriptor(descriptor)) + { + MESSAGE_BODY_TYPE body_type; + if (message_get_body_type(decoded_message, &body_type) != 0) + { + LogError("Error getting message body type"); + message_receiver->decode_error = true; + } + else + { + if (body_type != MESSAGE_BODY_TYPE_NONE) + { + LogError("Body already set on received message"); + message_receiver->decode_error = true; + } + else + { + AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value); + if (body_amqp_value == NULL) + { + LogError("Error getting body AMQP value"); + message_receiver->decode_error = true; + } + else + { + if (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0) + { + LogError("Error setting body AMQP value on received message"); + message_receiver->decode_error = true; + } + } + } + } + } + else if (is_data_type_by_descriptor(descriptor)) + { + MESSAGE_BODY_TYPE body_type; + if (message_get_body_type(decoded_message, &body_type) != 0) + { + LogError("Error getting message body type"); + message_receiver->decode_error = true; + } + else + { + if ((body_type != MESSAGE_BODY_TYPE_NONE) && + (body_type != MESSAGE_BODY_TYPE_DATA)) + { + LogError("Message body type already set to something different than AMQP DATA"); + message_receiver->decode_error = true; + } + else + { + AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value); + if (body_data_value == NULL) + { + LogError("Error getting body DATA value"); + message_receiver->decode_error = true; + } + else + { + data data_value; + if (amqpvalue_get_data(body_data_value, &data_value) != 0) + { + LogError("Error getting body DATA AMQP value"); + message_receiver->decode_error = true; + } + else + { + BINARY_DATA binary_data; + binary_data.bytes = (const unsigned char*)data_value.bytes; + binary_data.length = data_value.length; + if (message_add_body_amqp_data(decoded_message, binary_data) != 0) + { + LogError("Error adding body DATA to received message"); + message_receiver->decode_error = true; + } + } + } + } + } + } +} + +static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes) +{ + AMQP_VALUE result = NULL; + MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context; + + (void)transfer; + if (message_receiver->on_message_received != NULL) + { + MESSAGE_HANDLE message = message_create(); + if (message == NULL) + { + LogError("Cannot create message"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver); + if (amqpvalue_decoder == NULL) + { + LogError("Cannot create AMQP value decoder"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + message_receiver->decoded_message = message; + message_receiver->decode_error = false; + if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0) + { + LogError("Cannot decode bytes"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + if (message_receiver->decode_error) + { + LogError("Error decoding message"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + result = message_receiver->on_message_received(message_receiver->callback_context, message); + } + } + + amqpvalue_decoder_destroy(amqpvalue_decoder); + } + + message_destroy(message); + } + } + + return result; +} + +static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state) +{ + MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context; + (void)previous_link_state; + + switch (new_link_state) + { + default: + break; + + case LINK_STATE_ATTACHED: + if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) + { + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPEN); + } + break; + case LINK_STATE_DETACHED: + if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) || + (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING)) + { + /* User initiated transition, we should be good */ + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_IDLE); + } + else if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE) + { + /* Any other transition must be an error */ + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + break; + case LINK_STATE_ERROR: + if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR) + { + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + break; + } +} + +MESSAGE_RECEIVER_HANDLE messagereceiver_create(LINK_HANDLE link, ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed, void* context) +{ + MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE)); + if (message_receiver == NULL) + { + LogError("Error creating message receiver"); + } + else + { + message_receiver->link = link; + message_receiver->on_message_receiver_state_changed = on_message_receiver_state_changed; + message_receiver->on_message_receiver_state_changed_context = context; + message_receiver->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE; + } + + return message_receiver; +} + +void messagereceiver_destroy(MESSAGE_RECEIVER_HANDLE message_receiver) +{ + if (message_receiver == NULL) + { + LogError("NULL message_receiver"); + } + else + { + (void)messagereceiver_close(message_receiver); + free(message_receiver); + } +} + +int messagereceiver_open(MESSAGE_RECEIVER_HANDLE message_receiver, ON_MESSAGE_RECEIVED on_message_received, void* callback_context) +{ + int result; + + if (message_receiver == NULL) + { + LogError("NULL message_receiver"); + result = __FAILURE__; + } + else + { + if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE) + { + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPENING); + if (link_attach(message_receiver->link, on_transfer_received, on_link_state_changed, NULL, message_receiver) != 0) + { + LogError("Link attach failed"); + result = __FAILURE__; + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + message_receiver->on_message_received = on_message_received; + message_receiver->callback_context = callback_context; + + result = 0; + } + } + else + { + result = 0; + } + } + + return result; +} + +int messagereceiver_close(MESSAGE_RECEIVER_HANDLE message_receiver) +{ + int result; + + if (message_receiver == NULL) + { + LogError("NULL message_receiver"); + result = __FAILURE__; + } + else + { + if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) || + (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN)) + { + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_CLOSING); + + if (link_detach(message_receiver->link, true, NULL, NULL, NULL) != 0) + { + LogError("link detach failed"); + result = __FAILURE__; + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + result = 0; + } + } + else + { + result = 0; + } + } + + return result; +} + +int messagereceiver_get_link_name(MESSAGE_RECEIVER_HANDLE message_receiver, const char** link_name) +{ + int result; + + if (message_receiver == NULL) + { + LogError("NULL message_receiver"); + result = __FAILURE__; + } + else + { + if (link_get_name(message_receiver->link, link_name) != 0) + { + LogError("Getting link name failed"); + result = __FAILURE__; + } + else + { + result = 0; + } + } + + return result; +} + +int messagereceiver_get_received_message_id(MESSAGE_RECEIVER_HANDLE message_receiver, delivery_number* message_id) +{ + int result; + + if (message_receiver == NULL) + { + LogError("NULL message_receiver"); + result = __FAILURE__; + } + else + { + if (link_get_received_message_id(message_receiver->link, message_id) != 0) + { + LogError("Failed getting received message Id"); + result = __FAILURE__; + } + else + { + result = 0; + } + } + + return result; +} + +int messagereceiver_send_message_disposition(MESSAGE_RECEIVER_HANDLE message_receiver, const char* link_name, delivery_number message_number, AMQP_VALUE delivery_state) +{ + int result; + + if (message_receiver == NULL) + { + LogError("NULL message_receiver"); + result = __FAILURE__; + } + else + { + if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_OPEN) + { + LogError("Message received not open"); + result = __FAILURE__; + } + else + { + const char* my_name; + if (link_get_name(message_receiver->link, &my_name) != 0) + { + LogError("Failed getting link name"); + result = __FAILURE__; + } + else + { + if (strcmp(link_name, my_name) != 0) + { + LogError("Link name does not match"); + result = __FAILURE__; + } + else + { + if (link_send_disposition(message_receiver->link, message_number, delivery_state) != 0) + { + LogError("Seding disposition failed"); + result = __FAILURE__; + } + else + { + result = 0; + } + } + } + } + } + + return result; +} + +void messagereceiver_set_trace(MESSAGE_RECEIVER_HANDLE message_receiver, bool trace_on) +{ + if (message_receiver == NULL) + { + LogError("NULL message_receiver"); + } + else + { + /* No tracing is yet implemented for message receiver */ + (void)trace_on; + } +}