A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: message_receiver.c
- Revision:
- 34:6be9c2058664
- Parent:
- 28:add19eb7defa
- Child:
- 40:f0ceafa8d570
diff -r 08b53020ff0d -r 6be9c2058664 message_receiver.c --- a/message_receiver.c Mon Sep 25 13:38:40 2017 -0700 +++ b/message_receiver.c Sat Oct 21 20:12:19 2017 +0000 @@ -22,27 +22,28 @@ bool decode_error; } MESSAGE_RECEIVER_INSTANCE; -static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver_instance, MESSAGE_RECEIVER_STATE new_state) +static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver, MESSAGE_RECEIVER_STATE new_state) { - MESSAGE_RECEIVER_STATE previous_state = message_receiver_instance->message_receiver_state; - message_receiver_instance->message_receiver_state = new_state; - if (message_receiver_instance->on_message_receiver_state_changed != NULL) + MESSAGE_RECEIVER_STATE previous_state = message_receiver->message_receiver_state; + message_receiver->message_receiver_state = new_state; + if (message_receiver->on_message_receiver_state_changed != NULL) { - message_receiver_instance->on_message_receiver_state_changed(message_receiver_instance->on_message_receiver_state_changed_context, new_state, previous_state); + message_receiver->on_message_receiver_state_changed(message_receiver->on_message_receiver_state_changed_context, new_state, previous_state); } } static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value) { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; - MESSAGE_HANDLE decoded_message = message_receiver_instance->decoded_message; + MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context; + MESSAGE_HANDLE decoded_message = message_receiver->decoded_message; AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value); if (is_application_properties_type_by_descriptor(descriptor)) { if (message_set_application_properties(decoded_message, decoded_value) != 0) { - message_receiver_instance->decode_error = true; + LogError("Error setting application properties on received message"); + message_receiver->decode_error = true; } } else if (is_properties_type_by_descriptor(descriptor)) @@ -50,13 +51,15 @@ PROPERTIES_HANDLE properties; if (amqpvalue_get_properties(decoded_value, &properties) != 0) { - message_receiver_instance->decode_error = true; + LogError("Error getting message properties"); + message_receiver->decode_error = true; } else { if (message_set_properties(decoded_message, properties) != 0) { - message_receiver_instance->decode_error = true; + LogError("Error setting message properties on received message"); + message_receiver->decode_error = true; } properties_destroy(properties); @@ -65,19 +68,35 @@ else if (is_delivery_annotations_type_by_descriptor(descriptor)) { annotations delivery_annotations = amqpvalue_get_inplace_described_value(decoded_value); - if ((delivery_annotations == NULL) || - (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0)) + if (delivery_annotations == NULL) { - message_receiver_instance->decode_error = true; + LogError("Error getting delivery annotations"); + message_receiver->decode_error = true; + } + else + { + if (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0) + { + LogError("Error setting delivery annotations on received message"); + message_receiver->decode_error = true; + } } } else if (is_message_annotations_type_by_descriptor(descriptor)) { annotations message_annotations = amqpvalue_get_inplace_described_value(decoded_value); - if ((message_annotations == NULL) || - (message_set_message_annotations(decoded_message, message_annotations) != 0)) + if (message_annotations == NULL) { - message_receiver_instance->decode_error = true; + LogError("Error getting message annotations"); + message_receiver->decode_error = true; + } + else + { + if (message_set_message_annotations(decoded_message, message_annotations) != 0) + { + LogError("Error setting message annotations on received message"); + message_receiver->decode_error = true; + } } } else if (is_header_type_by_descriptor(descriptor)) @@ -85,13 +104,15 @@ HEADER_HANDLE header; if (amqpvalue_get_header(decoded_value, &header) != 0) { - message_receiver_instance->decode_error = true; + LogError("Error getting message header"); + message_receiver->decode_error = true; } else { if (message_set_header(decoded_message, header) != 0) { - message_receiver_instance->decode_error = true; + LogError("Error setting message header on received message"); + message_receiver->decode_error = true; } header_destroy(header); @@ -100,57 +121,97 @@ else if (is_footer_type_by_descriptor(descriptor)) { annotations footer = amqpvalue_get_inplace_described_value(decoded_value); - if ((footer == NULL) || - (message_set_footer(decoded_message, footer) != 0)) + if (footer == NULL) { - message_receiver_instance->decode_error = true; + LogError("Error getting message footer"); + message_receiver->decode_error = true; + } + else + { + if (message_set_footer(decoded_message, footer) != 0) + { + LogError("Error setting message footer on received message"); + message_receiver->decode_error = true; + } } } else if (is_amqp_value_type_by_descriptor(descriptor)) { MESSAGE_BODY_TYPE body_type; - message_get_body_type(decoded_message, &body_type); - if (body_type != MESSAGE_BODY_TYPE_NONE) + if (message_get_body_type(decoded_message, &body_type) != 0) { - message_receiver_instance->decode_error = true; + LogError("Error getting message body type"); + message_receiver->decode_error = true; } else { - AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value); - if ((body_amqp_value == NULL) || - (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0)) + if (body_type != MESSAGE_BODY_TYPE_NONE) + { + LogError("Body already set on received message"); + message_receiver->decode_error = true; + } + else { - message_receiver_instance->decode_error = true; + AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value); + if (body_amqp_value == NULL) + { + LogError("Error getting body AMQP value"); + message_receiver->decode_error = true; + } + else + { + if (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0) + { + LogError("Error setting body AMQP value on received message"); + message_receiver->decode_error = true; + } + } } } } else if (is_data_type_by_descriptor(descriptor)) { MESSAGE_BODY_TYPE body_type; - message_get_body_type(decoded_message, &body_type); - if ((body_type != MESSAGE_BODY_TYPE_NONE) && - (body_type != MESSAGE_BODY_TYPE_DATA)) + if (message_get_body_type(decoded_message, &body_type) != 0) { - message_receiver_instance->decode_error = true; + LogError("Error getting message body type"); + message_receiver->decode_error = true; } else { - AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value); - data data_value; - - if ((body_data_value == NULL) || - (amqpvalue_get_data(body_data_value, &data_value) != 0)) + if ((body_type != MESSAGE_BODY_TYPE_NONE) && + (body_type != MESSAGE_BODY_TYPE_DATA)) { - message_receiver_instance->decode_error = true; + LogError("Message body type already set to something different than AMQP DATA"); + message_receiver->decode_error = true; } else { - BINARY_DATA binary_data; - binary_data.bytes = (const unsigned char*)data_value.bytes; - binary_data.length = data_value.length; - if (message_add_body_amqp_data(decoded_message, binary_data) != 0) + AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value); + if (body_data_value == NULL) + { + LogError("Error getting body DATA value"); + message_receiver->decode_error = true; + } + else { - message_receiver_instance->decode_error = true; + data data_value; + if (amqpvalue_get_data(body_data_value, &data_value) != 0) + { + LogError("Error getting body DATA AMQP value"); + message_receiver->decode_error = true; + } + else + { + BINARY_DATA binary_data; + binary_data.bytes = (const unsigned char*)data_value.bytes; + binary_data.length = data_value.length; + if (message_add_body_amqp_data(decoded_message, binary_data) != 0) + { + LogError("Error adding body DATA to received message"); + message_receiver->decode_error = true; + } + } } } } @@ -160,40 +221,44 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes) { AMQP_VALUE result = NULL; + MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context; - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; (void)transfer; - if (message_receiver_instance->on_message_received != NULL) + if (message_receiver->on_message_received != NULL) { MESSAGE_HANDLE message = message_create(); if (message == NULL) { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + LogError("Cannot create message"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver_instance); + AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver); if (amqpvalue_decoder == NULL) { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + LogError("Cannot create AMQP value decoder"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - message_receiver_instance->decoded_message = message; - message_receiver_instance->decode_error = false; + message_receiver->decoded_message = message; + message_receiver->decode_error = false; if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0) { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + LogError("Cannot decode bytes"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - if (message_receiver_instance->decode_error) + if (message_receiver->decode_error) { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + LogError("Error decoding message"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - result = message_receiver_instance->on_message_received(message_receiver_instance->callback_context, message); + result = message_receiver->on_message_received(message_receiver->callback_context, message); } } @@ -209,7 +274,7 @@ static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state) { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; + MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context; (void)previous_link_state; switch (new_link_state) @@ -218,28 +283,28 @@ break; case LINK_STATE_ATTACHED: - if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) + if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPEN); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPEN); } break; case LINK_STATE_DETACHED: - if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) || - (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING)) + if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) || + (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING)) { /* User initiated transition, we should be good */ - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_IDLE); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_IDLE); } - else if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE) + else if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_IDLE) { /* Any other transition must be an error */ - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } break; case LINK_STATE_ERROR: - if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR) + if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_ERROR) { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } break; } @@ -247,21 +312,29 @@ MESSAGE_RECEIVER_HANDLE messagereceiver_create(LINK_HANDLE link, ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed, void* context) { - MESSAGE_RECEIVER_INSTANCE* result = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE)); - if (result != NULL) + MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE)); + if (message_receiver == NULL) { - result->link = link; - result->on_message_receiver_state_changed = on_message_receiver_state_changed; - result->on_message_receiver_state_changed_context = context; - result->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE; + LogError("Error creating message receiver"); + } + else + { + message_receiver->link = link; + message_receiver->on_message_receiver_state_changed = on_message_receiver_state_changed; + message_receiver->on_message_receiver_state_changed_context = context; + message_receiver->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE; } - return result; + return message_receiver; } void messagereceiver_destroy(MESSAGE_RECEIVER_HANDLE message_receiver) { - if (message_receiver != NULL) + if (message_receiver == NULL) + { + LogError("NULL message_receiver"); + } + else { (void)messagereceiver_close(message_receiver); free(message_receiver); @@ -274,24 +347,24 @@ if (message_receiver == NULL) { + LogError("NULL message_receiver"); result = __FAILURE__; } else { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; - - if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE) + if (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE) { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPENING); - if (link_attach(message_receiver_instance->link, on_transfer_received, on_link_state_changed, NULL, message_receiver_instance) != 0) + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_OPENING); + if (link_attach(message_receiver->link, on_transfer_received, on_link_state_changed, NULL, message_receiver) != 0) { + LogError("Link attach failed"); result = __FAILURE__; - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - message_receiver_instance->on_message_received = on_message_received; - message_receiver_instance->callback_context = callback_context; + message_receiver->on_message_received = on_message_received; + message_receiver->callback_context = callback_context; result = 0; } @@ -311,21 +384,21 @@ if (message_receiver == NULL) { + LogError("NULL message_receiver"); result = __FAILURE__; } else { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; - - if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) || - (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN)) + if ((message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) || + (message_receiver->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN)) { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_CLOSING); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_CLOSING); - if (link_detach(message_receiver_instance->link, true) != 0) + if (link_detach(message_receiver->link, true) != 0) { + LogError("link detach failed"); result = __FAILURE__; - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { @@ -347,13 +420,14 @@ if (message_receiver == NULL) { + LogError("NULL message_receiver"); result = __FAILURE__; } else { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; - if (link_get_name(message_receiver_instance->link, link_name) != 0) + if (link_get_name(message_receiver->link, link_name) != 0) { + LogError("Getting link name failed"); result = __FAILURE__; } else @@ -371,13 +445,14 @@ if (message_receiver == NULL) { + LogError("NULL message_receiver"); result = __FAILURE__; } else { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; - if (link_get_received_message_id(message_receiver_instance->link, message_id) != 0) + if (link_get_received_message_id(message_receiver->link, message_id) != 0) { + LogError("Failed getting received message Id"); result = __FAILURE__; } else @@ -395,32 +470,36 @@ if (message_receiver == NULL) { + LogError("NULL message_receiver"); result = __FAILURE__; } else { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; - if (message_receiver_instance->message_receiver_state != MESSAGE_RECEIVER_STATE_OPEN) + if (message_receiver->message_receiver_state != MESSAGE_RECEIVER_STATE_OPEN) { + LogError("Message received not open"); result = __FAILURE__; } else { const char* my_name; - if (link_get_name(message_receiver_instance->link, &my_name) != 0) + if (link_get_name(message_receiver->link, &my_name) != 0) { + LogError("Failed getting link name"); result = __FAILURE__; } else { if (strcmp(link_name, my_name) != 0) { + LogError("Link name does not match"); result = __FAILURE__; } else { - if (link_send_disposition(message_receiver_instance->link, message_number, delivery_state) != 0) + if (link_send_disposition(message_receiver->link, message_number, delivery_state) != 0) { + LogError("Seding disposition failed"); result = __FAILURE__; } else @@ -437,7 +516,13 @@ void messagereceiver_set_trace(MESSAGE_RECEIVER_HANDLE message_receiver, bool trace_on) { - /* No tracing is yet implemented for message receiver */ - (void)message_receiver; - (void)trace_on; + if (message_receiver == NULL) + { + LogError("NULL message_receiver"); + } + else + { + /* No tracing is yet implemented for message receiver */ + (void)trace_on; + } }