A small memory footprint AMQP implementation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
message.c
- Committer:
- AzureIoTClient
- Date:
- 2017-04-21
- Revision:
- 23:1111ee8bcba4
- Parent:
- 20:206846c14c80
- Child:
- 24:2c59c2d43ebf
File content as of revision 23:1111ee8bcba4:
// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. #include <stdlib.h> #include <string.h> #include "azure_c_shared_utility/optimize_size.h" #include "azure_c_shared_utility/gballoc.h" #include "azure_uamqp_c/message.h" #include "azure_uamqp_c/amqpvalue.h" typedef struct BODY_AMQP_DATA_TAG { unsigned char* body_data_section_bytes; size_t body_data_section_length; } BODY_AMQP_DATA; typedef struct MESSAGE_INSTANCE_TAG { BODY_AMQP_DATA* body_amqp_data_items; size_t body_amqp_data_count; AMQP_VALUE* body_amqp_sequence_items; size_t body_amqp_sequence_count; AMQP_VALUE body_amqp_value; HEADER_HANDLE header; annotations delivery_annotations; annotations message_annotations; PROPERTIES_HANDLE properties; application_properties application_properties; annotations footer; uint32_t message_format; } MESSAGE_INSTANCE; static void free_all_body_data_items(MESSAGE_INSTANCE* message_instance) { size_t i; for (i = 0; i < message_instance->body_amqp_data_count; i++) { if (message_instance->body_amqp_data_items[i].body_data_section_bytes != NULL) { free(message_instance->body_amqp_data_items[i].body_data_section_bytes); } } free(message_instance->body_amqp_data_items); message_instance->body_amqp_data_count = 0; message_instance->body_amqp_data_items = NULL; } static void free_all_body_sequence_items(MESSAGE_INSTANCE* message_instance) { size_t i; for (i = 0; i < message_instance->body_amqp_sequence_count; i++) { if (message_instance->body_amqp_sequence_items[i] != NULL) { amqpvalue_destroy(message_instance->body_amqp_sequence_items[i]); } } free(message_instance->body_amqp_sequence_items); message_instance->body_amqp_sequence_count = 0; message_instance->body_amqp_sequence_items = NULL; } MESSAGE_HANDLE message_create(void) { MESSAGE_INSTANCE* result = (MESSAGE_INSTANCE*)malloc(sizeof(MESSAGE_INSTANCE)); /* Codes_SRS_MESSAGE_01_002: [If allocating memory for the message fails, 
message_create shall fail and return NULL.] */ if (result != NULL) { result->header = NULL; result->delivery_annotations = NULL; result->message_annotations = NULL; result->properties = NULL; result->application_properties = NULL; result->footer = NULL; result->body_amqp_data_items = NULL; result->body_amqp_data_count = 0; result->body_amqp_value = NULL; result->body_amqp_sequence_items = NULL; result->body_amqp_sequence_count = 0; result->message_format = 0; } /* Codes_SRS_MESSAGE_01_001: [message_create shall create a new AMQP message instance and on success it shall return a non-NULL handle for the newly created message instance.] */ return result; } MESSAGE_HANDLE message_clone(MESSAGE_HANDLE source_message) { MESSAGE_INSTANCE* result; /* Codes_SRS_MESSAGE_01_062: [If source_message is NULL, message_clone shall fail and return NULL.] */ if (source_message == NULL) { result = NULL; } else { MESSAGE_INSTANCE* source_message_instance = (MESSAGE_INSTANCE*)source_message; result = (MESSAGE_INSTANCE*)message_create(); /* Codes_SRS_MESSAGE_01_003: [message_clone shall clone a message entirely and on success return a non-NULL handle to the cloned message.] */ /* Codes_SRS_MESSAGE_01_004: [If allocating memory for the new cloned message fails, message_clone shall fail and return NULL.] */ if (result != NULL) { result->message_format = source_message_instance->message_format; if (source_message_instance->header != NULL) { /* Codes_SRS_MESSAGE_01_005: [If a header exists on the source message it shall be cloned by using header_clone.] */ result->header = header_clone(source_message_instance->header); if (result->header == NULL) { message_destroy(result); result = NULL; } } if ((result != NULL) && (source_message_instance->delivery_annotations != NULL)) { /* Codes_SRS_MESSAGE_01_006: [If delivery annotations exist on the source message they shall be cloned by using annotations_clone.] 
*/ result->delivery_annotations = annotations_clone(source_message_instance->delivery_annotations); if (result->delivery_annotations == NULL) { message_destroy(result); result = NULL; } } if ((result != NULL) && (source_message_instance->message_annotations != NULL)) { /* Codes_SRS_MESSAGE_01_007: [If message annotations exist on the source message they shall be cloned by using annotations_clone.] */ result->message_annotations = annotations_clone(source_message_instance->message_annotations); if (result->message_annotations == NULL) { message_destroy(result); result = NULL; } } if ((result != NULL) && (source_message_instance->properties != NULL)) { /* Codes_SRS_MESSAGE_01_008: [If message properties exist on the source message they shall be cloned by using properties_clone.] */ result->properties = properties_clone(source_message_instance->properties); if (result->properties == NULL) { message_destroy(result); result = NULL; } } if ((result != NULL) && (source_message_instance->application_properties != NULL)) { /* Codes_SRS_MESSAGE_01_009: [If application properties exist on the source message they shall be cloned by using amqpvalue_clone.] */ result->application_properties = amqpvalue_clone(source_message_instance->application_properties); if (result->application_properties == NULL) { message_destroy(result); result = NULL; } } if ((result != NULL) && (source_message_instance->footer != NULL)) { /* Codes_SRS_MESSAGE_01_010: [If a footer exists on the source message it shall be cloned by using annotations_clone.] 
*/ result->footer = amqpvalue_clone(source_message_instance->footer); if (result->footer == NULL) { message_destroy(result); result = NULL; } } if ((result != NULL) && (source_message_instance->body_amqp_data_count > 0)) { size_t i; result->body_amqp_data_items = (BODY_AMQP_DATA*)malloc(source_message_instance->body_amqp_data_count * sizeof(BODY_AMQP_DATA)); if (result->body_amqp_data_items == NULL) { message_destroy(result); result = NULL; } else { for (i = 0; i < source_message_instance->body_amqp_data_count; i++) { result->body_amqp_data_items[i].body_data_section_length = source_message_instance->body_amqp_data_items[i].body_data_section_length; /* Codes_SRS_MESSAGE_01_011: [If an AMQP data has been set as message body on the source message it shall be cloned by allocating memory for the binary payload.] */ result->body_amqp_data_items[i].body_data_section_bytes = (unsigned char*)malloc(source_message_instance->body_amqp_data_items[i].body_data_section_length); if (result->body_amqp_data_items[i].body_data_section_bytes == NULL) { break; } else { (void)memcpy(result->body_amqp_data_items[i].body_data_section_bytes, source_message_instance->body_amqp_data_items[i].body_data_section_bytes, result->body_amqp_data_items[i].body_data_section_length); } } result->body_amqp_data_count = i; if (i < source_message_instance->body_amqp_data_count) { message_destroy(result); result = NULL; } } } if ((result != NULL) && (source_message_instance->body_amqp_sequence_count > 0)) { size_t i; result->body_amqp_sequence_items = (AMQP_VALUE*)malloc(source_message_instance->body_amqp_sequence_count * sizeof(AMQP_VALUE)); if (result->body_amqp_sequence_items == NULL) { message_destroy(result); result = NULL; } else { for (i = 0; i < source_message_instance->body_amqp_sequence_count; i++) { /* Codes_SRS_MESSAGE_01_011: [If an AMQP data has been set as message body on the source message it shall be cloned by allocating memory for the binary payload.] 
*/ result->body_amqp_sequence_items[i] = amqpvalue_clone(source_message_instance->body_amqp_sequence_items[i]); if (result->body_amqp_sequence_items[i] == NULL) { break; } } result->body_amqp_sequence_count = i; if (i < source_message_instance->body_amqp_sequence_count) { message_destroy(result); result = NULL; } } } if ((result != NULL) && (source_message_instance->body_amqp_value != NULL)) { result->body_amqp_value = amqpvalue_clone(source_message_instance->body_amqp_value); if (result->body_amqp_value == NULL) { message_destroy(result); result = NULL; } } } } return result; } void message_destroy(MESSAGE_HANDLE message) { if (message != NULL) { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if (message_instance->header != NULL) { header_destroy(message_instance->header); } if (message_instance->properties != NULL) { properties_destroy(message_instance->properties); } if (message_instance->application_properties != NULL) { application_properties_destroy(message_instance->application_properties); } if (message_instance->footer != NULL) { annotations_destroy(message_instance->footer); } if (message_instance->body_amqp_value != NULL) { amqpvalue_destroy(message_instance->body_amqp_value); } if (message_instance->message_annotations != NULL) { application_properties_destroy(message_instance->message_annotations); } free_all_body_data_items(message_instance); free_all_body_sequence_items(message_instance); free(message_instance); } } int message_set_header(MESSAGE_HANDLE message, HEADER_HANDLE header) { int result; if ((message == NULL) || (header == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; HEADER_HANDLE new_header; new_header = header_clone(header); if (new_header == NULL) { result = __FAILURE__; } else { if (message_instance->header != NULL) { header_destroy(message_instance->header); } message_instance->header = new_header; result = 0; } } return result; } int 
message_get_header(MESSAGE_HANDLE message, HEADER_HANDLE* header) { int result; if ((message == NULL) || (header == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if (message_instance->header == NULL) { *header = NULL; result = 0; } else { *header = header_clone(message_instance->header); if (*header == NULL) { result = __FAILURE__; } else { result = 0; } } } return result; } int message_set_delivery_annotations(MESSAGE_HANDLE message, annotations delivery_annotations) { int result; if ((message == NULL) || (delivery_annotations == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; annotations new_delivery_annotations; new_delivery_annotations = annotations_clone(delivery_annotations); if (new_delivery_annotations == NULL) { result = __FAILURE__; } else { if (message_instance->delivery_annotations != NULL) { annotations_destroy(message_instance->delivery_annotations); } message_instance->delivery_annotations = new_delivery_annotations; result = 0; } } return result; } int message_get_delivery_annotations(MESSAGE_HANDLE message, annotations* delivery_annotations) { int result; if ((message == NULL) || (delivery_annotations == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if (message_instance->delivery_annotations == NULL) { *delivery_annotations = NULL; result = 0; } else { *delivery_annotations = annotations_clone(message_instance->delivery_annotations); if (*delivery_annotations == NULL) { result = __FAILURE__; } else { result = 0; } } } return result; } int message_set_message_annotations(MESSAGE_HANDLE message, annotations message_annotations) { int result; if ((message == NULL) || (message_annotations == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; annotations new_message_annotations; new_message_annotations = 
annotations_clone(message_annotations); if (new_message_annotations == NULL) { result = __FAILURE__; } else { if (message_instance->message_annotations != NULL) { annotations_destroy(message_instance->message_annotations); } message_instance->message_annotations = new_message_annotations; result = 0; } } return result; } int message_get_message_annotations(MESSAGE_HANDLE message, annotations* message_annotations) { int result; if ((message == NULL) || (message_annotations == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if (message_instance->message_annotations == NULL) { *message_annotations = NULL; result = 0; } else { *message_annotations = annotations_clone(message_instance->message_annotations); if (*message_annotations == NULL) { result = __FAILURE__; } else { result = 0; } } } return result; } int message_set_properties(MESSAGE_HANDLE message, PROPERTIES_HANDLE properties) { int result; if ((message == NULL) || (properties == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; PROPERTIES_HANDLE new_properties; new_properties = properties_clone(properties); if (new_properties == NULL) { result = __FAILURE__; } else { if (message_instance->properties != NULL) { properties_destroy(message_instance->properties); } message_instance->properties = new_properties; result = 0; } } return result; } int message_get_properties(MESSAGE_HANDLE message, PROPERTIES_HANDLE* properties) { int result; if ((message == NULL) || (properties == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if (message_instance->properties == NULL) { *properties = NULL; result = 0; } else { *properties = properties_clone(message_instance->properties); if (*properties == NULL) { result = __FAILURE__; } else { result = 0; } } } return result; } int message_set_application_properties(MESSAGE_HANDLE message, AMQP_VALUE 
application_properties) { int result; if ((message == NULL) || (application_properties == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; AMQP_VALUE new_application_properties; new_application_properties = application_properties_clone(application_properties); if (new_application_properties == NULL) { result = __FAILURE__; } else { if (message_instance->application_properties != NULL) { amqpvalue_destroy(message_instance->application_properties); } message_instance->application_properties = new_application_properties; result = 0; } } return result; } int message_get_application_properties(MESSAGE_HANDLE message, AMQP_VALUE* application_properties) { int result; if ((message == NULL) || (application_properties == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if (message_instance->application_properties == NULL) { *application_properties = NULL; result = 0; } else { *application_properties = application_properties_clone(message_instance->application_properties); if (*application_properties == NULL) { result = __FAILURE__; } else { result = 0; } } } return result; } int message_set_footer(MESSAGE_HANDLE message, annotations footer) { int result; if ((message == NULL) || (footer == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; AMQP_VALUE new_footer; new_footer = annotations_clone(footer); if (new_footer == NULL) { result = __FAILURE__; } else { if (message_instance->footer != NULL) { annotations_destroy(message_instance->footer); } message_instance->footer = new_footer; result = 0; } } return result; } int message_get_footer(MESSAGE_HANDLE message, annotations* footer) { int result; if ((message == NULL) || (footer == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if (message_instance->footer == NULL) { *footer = NULL; result = 0; } else { 
*footer = annotations_clone(message_instance->footer); if (*footer == NULL) { result = __FAILURE__; } else { result = 0; } } } return result; } int message_add_body_amqp_data(MESSAGE_HANDLE message, BINARY_DATA binary_data) { int result; MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if ((message == NULL) || ((binary_data.bytes == NULL) && (binary_data.length != 0))) { result = __FAILURE__; } else { BODY_AMQP_DATA* new_body_amqp_data_items = (BODY_AMQP_DATA*)realloc(message_instance->body_amqp_data_items, sizeof(BODY_AMQP_DATA) * (message_instance->body_amqp_data_count + 1)); if (new_body_amqp_data_items == NULL) { result = __FAILURE__; } else { message_instance->body_amqp_data_items = new_body_amqp_data_items; message_instance->body_amqp_data_items[message_instance->body_amqp_data_count].body_data_section_bytes = (unsigned char*)malloc(binary_data.length); if (message_instance->body_amqp_data_items[message_instance->body_amqp_data_count].body_data_section_bytes == NULL) { result = __FAILURE__; } else { message_instance->body_amqp_data_items[message_instance->body_amqp_data_count].body_data_section_length = binary_data.length; (void)memcpy(message_instance->body_amqp_data_items[message_instance->body_amqp_data_count].body_data_section_bytes, binary_data.bytes, binary_data.length); if (message_instance->body_amqp_value != NULL) { amqpvalue_destroy(message_instance->body_amqp_value); message_instance->body_amqp_value = NULL; } free_all_body_sequence_items(message_instance); message_instance->body_amqp_data_count++; result = 0; } } } return result; } int message_get_body_amqp_data(MESSAGE_HANDLE message, size_t index, BINARY_DATA* binary_data) { int result; if ((message == NULL) || (binary_data == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if (index >= message_instance->body_amqp_data_count) { result = __FAILURE__; } else { binary_data->bytes = 
message_instance->body_amqp_data_items[index].body_data_section_bytes; binary_data->length = message_instance->body_amqp_data_items[index].body_data_section_length; result = 0; } } return result; } int message_get_body_amqp_data_count(MESSAGE_HANDLE message, size_t* count) { int result; MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if ((message == NULL) || (count == NULL)) { result = __FAILURE__; } else { *count = message_instance->body_amqp_data_count; result = 0; } return result; } int message_add_body_amqp_sequence(MESSAGE_HANDLE message, AMQP_VALUE sequence_list) { int result; size_t item_count; MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if ((message == NULL) || (sequence_list == NULL) || (amqpvalue_get_list_item_count(sequence_list, (uint32_t*)&item_count) != 0)) { result = __FAILURE__; } else { AMQP_VALUE* new_body_amqp_sequence_items = (AMQP_VALUE*)realloc(message_instance->body_amqp_sequence_items, sizeof(AMQP_VALUE) * (message_instance->body_amqp_sequence_count + 1)); if (new_body_amqp_sequence_items == NULL) { result = __FAILURE__; } else { message_instance->body_amqp_sequence_items = new_body_amqp_sequence_items; message_instance->body_amqp_sequence_items[message_instance->body_amqp_sequence_count] = amqpvalue_clone(sequence_list); if (message_instance->body_amqp_sequence_items[message_instance->body_amqp_sequence_count] == NULL) { result = __FAILURE__; } else { if (message_instance->body_amqp_value != NULL) { amqpvalue_destroy(message_instance->body_amqp_value); message_instance->body_amqp_value = NULL; } free_all_body_data_items(message_instance); message_instance->body_amqp_sequence_count++; result = 0; } } } return result; } int message_get_body_amqp_sequence(MESSAGE_HANDLE message, size_t index, AMQP_VALUE* sequence_list) { int result; if ((message == NULL) || (sequence_list == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if (index >= 
message_instance->body_amqp_sequence_count) { result = __FAILURE__; } else { *sequence_list = amqpvalue_clone(message_instance->body_amqp_sequence_items[index]); if (*sequence_list == NULL) { result = __FAILURE__; } else { result = 0; } } } return result; } int message_get_body_amqp_sequence_count(MESSAGE_HANDLE message, size_t* count) { int result; MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if ((message == NULL) || (count == NULL)) { result = __FAILURE__; } else { *count = message_instance->body_amqp_sequence_count; result = 0; } return result; } int message_set_body_amqp_value(MESSAGE_HANDLE message, AMQP_VALUE body_amqp_value) { int result; MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if ((message == NULL) || (body_amqp_value == NULL)) { result = __FAILURE__; } else { message_instance->body_amqp_value = amqpvalue_clone(body_amqp_value); free_all_body_data_items(message_instance); free_all_body_sequence_items(message_instance); result = 0; } return result; } int message_get_inplace_body_amqp_value(MESSAGE_HANDLE message, AMQP_VALUE* body_amqp_value) { int result; if ((message == NULL) || (body_amqp_value == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; *body_amqp_value = message_instance->body_amqp_value; result = 0; } return result; } int message_get_body_type(MESSAGE_HANDLE message, MESSAGE_BODY_TYPE* body_type) { int result; if ((message == NULL) || (body_type == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; if (message_instance->body_amqp_value != NULL) { *body_type = MESSAGE_BODY_TYPE_VALUE; } else if (message_instance->body_amqp_data_count > 0) { *body_type = MESSAGE_BODY_TYPE_DATA; } else if (message_instance->body_amqp_sequence_count > 0) { *body_type = MESSAGE_BODY_TYPE_SEQUENCE; } else { *body_type = MESSAGE_BODY_TYPE_NONE; } result = 0; } return result; } int 
message_set_message_format(MESSAGE_HANDLE message, uint32_t message_format) { int result; if (message == NULL) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; message_instance->message_format = message_format; result = 0; } return result; } int message_get_message_format(MESSAGE_HANDLE message, uint32_t *message_format) { int result; if ((message == NULL) || (message_format == NULL)) { result = __FAILURE__; } else { MESSAGE_INSTANCE* message_instance = (MESSAGE_INSTANCE*)message; *message_format = message_instance->message_format; result = 0; } return result; }