A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: message_receiver.c
- Revision:
- 28:add19eb7defa
- Parent:
- 27:d74f1cea23e1
- Child:
- 34:6be9c2058664
--- a/message_receiver.c Fri Jun 02 15:53:07 2017 -0700 +++ b/message_receiver.c Fri Jun 30 10:41:22 2017 -0700 @@ -12,218 +12,218 @@ typedef struct MESSAGE_RECEIVER_INSTANCE_TAG { - LINK_HANDLE link; - ON_MESSAGE_RECEIVED on_message_received; - ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed; - MESSAGE_RECEIVER_STATE message_receiver_state; - const void* on_message_receiver_state_changed_context; - const void* callback_context; - MESSAGE_HANDLE decoded_message; - bool decode_error; + LINK_HANDLE link; + ON_MESSAGE_RECEIVED on_message_received; + ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed; + MESSAGE_RECEIVER_STATE message_receiver_state; + const void* on_message_receiver_state_changed_context; + const void* callback_context; + MESSAGE_HANDLE decoded_message; + bool decode_error; } MESSAGE_RECEIVER_INSTANCE; static void set_message_receiver_state(MESSAGE_RECEIVER_INSTANCE* message_receiver_instance, MESSAGE_RECEIVER_STATE new_state) { - MESSAGE_RECEIVER_STATE previous_state = message_receiver_instance->message_receiver_state; - message_receiver_instance->message_receiver_state = new_state; - if (message_receiver_instance->on_message_receiver_state_changed != NULL) - { - message_receiver_instance->on_message_receiver_state_changed(message_receiver_instance->on_message_receiver_state_changed_context, new_state, previous_state); - } + MESSAGE_RECEIVER_STATE previous_state = message_receiver_instance->message_receiver_state; + message_receiver_instance->message_receiver_state = new_state; + if (message_receiver_instance->on_message_receiver_state_changed != NULL) + { + message_receiver_instance->on_message_receiver_state_changed(message_receiver_instance->on_message_receiver_state_changed_context, new_state, previous_state); + } } static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value) { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; - MESSAGE_HANDLE decoded_message = message_receiver_instance->decoded_message; - AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value); + MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; + MESSAGE_HANDLE decoded_message = message_receiver_instance->decoded_message; + AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(decoded_value); - if (is_application_properties_type_by_descriptor(descriptor)) - { - if (message_set_application_properties(decoded_message, decoded_value) != 0) - { - message_receiver_instance->decode_error = true; - } - } - else if (is_properties_type_by_descriptor(descriptor)) - { - PROPERTIES_HANDLE properties; - if (amqpvalue_get_properties(decoded_value, &properties) != 0) - { - message_receiver_instance->decode_error = true; - } - else - { - if (message_set_properties(decoded_message, properties) != 0) - { - message_receiver_instance->decode_error = true; - } + if (is_application_properties_type_by_descriptor(descriptor)) + { + if (message_set_application_properties(decoded_message, decoded_value) != 0) + { + message_receiver_instance->decode_error = true; + } + } + else if (is_properties_type_by_descriptor(descriptor)) + { + PROPERTIES_HANDLE properties; + if (amqpvalue_get_properties(decoded_value, &properties) != 0) + { + message_receiver_instance->decode_error = true; + } + else + { + if (message_set_properties(decoded_message, properties) != 0) + { + message_receiver_instance->decode_error = true; + } - properties_destroy(properties); - } - } - else if (is_delivery_annotations_type_by_descriptor(descriptor)) - { - annotations delivery_annotations = amqpvalue_get_inplace_described_value(decoded_value); - if ((delivery_annotations == NULL) || - (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0)) - { - message_receiver_instance->decode_error = true; - } - } - else if (is_message_annotations_type_by_descriptor(descriptor)) - { - annotations message_annotations = amqpvalue_get_inplace_described_value(decoded_value); - if ((message_annotations == NULL) || - (message_set_message_annotations(decoded_message, message_annotations) != 0)) - { - message_receiver_instance->decode_error = true; - } - } - else if (is_header_type_by_descriptor(descriptor)) - { - HEADER_HANDLE header; - if (amqpvalue_get_header(decoded_value, &header) != 0) - { - message_receiver_instance->decode_error = true; - } - else - { - if (message_set_header(decoded_message, header) != 0) - { - message_receiver_instance->decode_error = true; - } + properties_destroy(properties); + } + } + else if (is_delivery_annotations_type_by_descriptor(descriptor)) + { + annotations delivery_annotations = amqpvalue_get_inplace_described_value(decoded_value); + if ((delivery_annotations == NULL) || + (message_set_delivery_annotations(decoded_message, delivery_annotations) != 0)) + { + message_receiver_instance->decode_error = true; + } + } + else if (is_message_annotations_type_by_descriptor(descriptor)) + { + annotations message_annotations = amqpvalue_get_inplace_described_value(decoded_value); + if ((message_annotations == NULL) || + (message_set_message_annotations(decoded_message, message_annotations) != 0)) + { + message_receiver_instance->decode_error = true; + } + } + else if (is_header_type_by_descriptor(descriptor)) + { + HEADER_HANDLE header; + if (amqpvalue_get_header(decoded_value, &header) != 0) + { + message_receiver_instance->decode_error = true; + } + else + { + if (message_set_header(decoded_message, header) != 0) + { + message_receiver_instance->decode_error = true; + } - header_destroy(header); - } - } - else if (is_footer_type_by_descriptor(descriptor)) - { - annotations footer = amqpvalue_get_inplace_described_value(decoded_value); - if ((footer == NULL) || - (message_set_footer(decoded_message, footer) != 0)) - { - message_receiver_instance->decode_error = true; - } - } - else if (is_amqp_value_type_by_descriptor(descriptor)) - { - MESSAGE_BODY_TYPE body_type; - message_get_body_type(decoded_message, &body_type); - if (body_type != MESSAGE_BODY_TYPE_NONE) - { - message_receiver_instance->decode_error = true; - } - else - { - AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value); - if ((body_amqp_value == NULL) || - (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0)) - { - message_receiver_instance->decode_error = true; - } - } - } - else if (is_data_type_by_descriptor(descriptor)) - { - MESSAGE_BODY_TYPE body_type; - message_get_body_type(decoded_message, &body_type); - if ((body_type != MESSAGE_BODY_TYPE_NONE) && - (body_type != MESSAGE_BODY_TYPE_DATA)) - { - message_receiver_instance->decode_error = true; - } - else - { - AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value); - data data_value; + header_destroy(header); + } + } + else if (is_footer_type_by_descriptor(descriptor)) + { + annotations footer = amqpvalue_get_inplace_described_value(decoded_value); + if ((footer == NULL) || + (message_set_footer(decoded_message, footer) != 0)) + { + message_receiver_instance->decode_error = true; + } + } + else if (is_amqp_value_type_by_descriptor(descriptor)) + { + MESSAGE_BODY_TYPE body_type; + message_get_body_type(decoded_message, &body_type); + if (body_type != MESSAGE_BODY_TYPE_NONE) + { + message_receiver_instance->decode_error = true; + } + else + { + AMQP_VALUE body_amqp_value = amqpvalue_get_inplace_described_value(decoded_value); + if ((body_amqp_value == NULL) || + (message_set_body_amqp_value(decoded_message, body_amqp_value) != 0)) + { + message_receiver_instance->decode_error = true; + } + } + } + else if (is_data_type_by_descriptor(descriptor)) + { + MESSAGE_BODY_TYPE body_type; + message_get_body_type(decoded_message, &body_type); + if ((body_type != MESSAGE_BODY_TYPE_NONE) && + (body_type != MESSAGE_BODY_TYPE_DATA)) + { + message_receiver_instance->decode_error = true; + } + else + { + AMQP_VALUE body_data_value = amqpvalue_get_inplace_described_value(decoded_value); + data data_value; - if ((body_data_value == NULL) || - (amqpvalue_get_data(body_data_value, &data_value) != 0)) - { - message_receiver_instance->decode_error = true; - } - else - { + if ((body_data_value == NULL) || + (amqpvalue_get_data(body_data_value, &data_value) != 0)) + { + message_receiver_instance->decode_error = true; + } + else + { BINARY_DATA binary_data; binary_data.bytes = (const unsigned char*)data_value.bytes; binary_data.length = data_value.length; - if (message_add_body_amqp_data(decoded_message, binary_data) != 0) - { - message_receiver_instance->decode_error = true; - } - } - } - } + if (message_add_body_amqp_data(decoded_message, binary_data) != 0) + { + message_receiver_instance->decode_error = true; + } + } + } + } } static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, uint32_t payload_size, const unsigned char* payload_bytes) { - AMQP_VALUE result = NULL; + AMQP_VALUE result = NULL; - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; + MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; (void)transfer; - if (message_receiver_instance->on_message_received != NULL) - { - MESSAGE_HANDLE message = message_create(); - if (message == NULL) - { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); - } - else - { - AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver_instance); - if (amqpvalue_decoder == NULL) - { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); - } - else - { - message_receiver_instance->decoded_message = message; - message_receiver_instance->decode_error = false; - if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0) - { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); - } - else - { - if (message_receiver_instance->decode_error) - { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); - } - else - { + if (message_receiver_instance->on_message_received != NULL) + { + MESSAGE_HANDLE message = message_create(); + if (message == NULL) + { + set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver_instance); + if (amqpvalue_decoder == NULL) + { + set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + message_receiver_instance->decoded_message = message; + message_receiver_instance->decode_error = false; + if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0) + { + set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + if (message_receiver_instance->decode_error) + { + set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { result = message_receiver_instance->on_message_received(message_receiver_instance->callback_context, message); - } - } + } + } - amqpvalue_decoder_destroy(amqpvalue_decoder); - } + amqpvalue_decoder_destroy(amqpvalue_decoder); + } - message_destroy(message); - } - } + message_destroy(message); + } + } - return result; + return result; } static void on_link_state_changed(void* context, LINK_STATE new_link_state, LINK_STATE previous_link_state) { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; + MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)context; (void)previous_link_state; - switch (new_link_state) - { + switch (new_link_state) + { default: break; - case LINK_STATE_ATTACHED: - if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) - { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPEN); - } - break; - case LINK_STATE_DETACHED: + case LINK_STATE_ATTACHED: + if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) + { + set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPEN); + } + break; + case LINK_STATE_DETACHED: if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN) || (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_CLOSING)) { @@ -242,103 +242,103 @@ set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); } break; - } + } } MESSAGE_RECEIVER_HANDLE messagereceiver_create(LINK_HANDLE link, ON_MESSAGE_RECEIVER_STATE_CHANGED on_message_receiver_state_changed, void* context) { - MESSAGE_RECEIVER_INSTANCE* result = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE)); - if (result != NULL) - { - result->link = link; - result->on_message_receiver_state_changed = on_message_receiver_state_changed; - result->on_message_receiver_state_changed_context = context; - result->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE; - } + MESSAGE_RECEIVER_INSTANCE* result = (MESSAGE_RECEIVER_INSTANCE*)malloc(sizeof(MESSAGE_RECEIVER_INSTANCE)); + if (result != NULL) + { + result->link = link; + result->on_message_receiver_state_changed = on_message_receiver_state_changed; + result->on_message_receiver_state_changed_context = context; + result->message_receiver_state = MESSAGE_RECEIVER_STATE_IDLE; + } - return result; + return result; } void messagereceiver_destroy(MESSAGE_RECEIVER_HANDLE message_receiver) { - if (message_receiver != NULL) - { - (void)messagereceiver_close(message_receiver); - free(message_receiver); - } + if (message_receiver != NULL) + { + (void)messagereceiver_close(message_receiver); + free(message_receiver); + } } int messagereceiver_open(MESSAGE_RECEIVER_HANDLE message_receiver, ON_MESSAGE_RECEIVED on_message_received, void* callback_context) { - int result; + int result; - if (message_receiver == NULL) - { - result = __FAILURE__; - } - else - { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; + if (message_receiver == NULL) + { + result = __FAILURE__; + } + else + { + MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; - if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE) - { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPENING); - if (link_attach(message_receiver_instance->link, on_transfer_received, on_link_state_changed, NULL, message_receiver_instance) != 0) - { - result = __FAILURE__; - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); - } - else - { - message_receiver_instance->on_message_received = on_message_received; - message_receiver_instance->callback_context = callback_context; + if (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_IDLE) + { + set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_OPENING); + if (link_attach(message_receiver_instance->link, on_transfer_received, on_link_state_changed, NULL, message_receiver_instance) != 0) + { + result = __FAILURE__; + set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + message_receiver_instance->on_message_received = on_message_received; + message_receiver_instance->callback_context = callback_context; - result = 0; - } - } - else - { - result = 0; - } - } + result = 0; + } + } + else + { + result = 0; + } + } - return result; + return result; } int messagereceiver_close(MESSAGE_RECEIVER_HANDLE message_receiver) { - int result; + int result; - if (message_receiver == NULL) - { - result = __FAILURE__; - } - else - { - MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; + if (message_receiver == NULL) + { + result = __FAILURE__; + } + else + { + MESSAGE_RECEIVER_INSTANCE* message_receiver_instance = (MESSAGE_RECEIVER_INSTANCE*)message_receiver; - if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) || - (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN)) - { - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_CLOSING); + if ((message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPENING) || + (message_receiver_instance->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN)) + { + set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_CLOSING); - if (link_detach(message_receiver_instance->link, true) != 0) - { - result = __FAILURE__; - set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); - } - else - { - result = 0; - } - } - else - { - result = 0; - } - } + if (link_detach(message_receiver_instance->link, true) != 0) + { + result = __FAILURE__; + set_message_receiver_state(message_receiver_instance, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + result = 0; + } + } + else + { + result = 0; + } + } - return result; + return result; } int messagereceiver_get_link_name(MESSAGE_RECEIVER_HANDLE message_receiver, const char** link_name)