A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: frame_codec.c
- Revision:
- 21:f9c433d8e6ca
- Parent:
- 19:000ab4e6a2c1
- Child:
- 22:524bded3f7a8
--- a/frame_codec.c Fri Mar 10 11:47:49 2017 -0800 +++ b/frame_codec.c Fri Mar 24 16:35:45 2017 -0700 @@ -6,12 +6,11 @@ #include <stdbool.h> #include <string.h> #include "azure_c_shared_utility/optimize_size.h" +#include "azure_c_shared_utility/gballoc.h" #include "azure_c_shared_utility/xlogging.h" -#include "azure_c_shared_utility/xio.h" #include "azure_c_shared_utility/singlylinkedlist.h" #include "azure_uamqp_c/frame_codec.h" #include "azure_uamqp_c/amqpvalue.h" -#include "azure_uamqp_c/amqpalloc.h" #define FRAME_HEADER_SIZE 8 #define MAX_TYPE_SPECIFIC_SIZE ((255 * 4) - 6) @@ -26,12 +25,6 @@ RECEIVE_FRAME_STATE_ERROR } RECEIVE_FRAME_STATE; -typedef enum ENCODE_FRAME_STATE_TAG -{ - ENCODE_FRAME_STATE_IDLE, - ENCODE_FRAME_STATE_ERROR -} ENCODE_FRAME_STATE; - typedef struct SUBSCRIPTION_TAG { uint8_t frame_type; @@ -56,9 +49,6 @@ ON_FRAME_CODEC_ERROR on_frame_codec_error; void* on_frame_codec_error_callback_context; - /* encode frame */ - ENCODE_FRAME_STATE encode_frame_state; - /* configuration */ uint32_t max_frame_size; } FRAME_CODEC_INSTANCE; @@ -92,12 +82,11 @@ } else { - result = amqpalloc_malloc(sizeof(FRAME_CODEC_INSTANCE)); + result = malloc(sizeof(FRAME_CODEC_INSTANCE)); /* Codes_SRS_FRAME_CODEC_01_022: [If allocating memory for the frame_codec instance fails, frame_codec_create shall return NULL.] */ if (result != NULL) { /* Codes_SRS_FRAME_CODEC_01_021: [frame_codec_create shall create a new instance of frame_codec and return a non-NULL handle to it on success.] */ - result->encode_frame_state = ENCODE_FRAME_STATE_IDLE; result->receive_frame_state = RECEIVE_FRAME_STATE_FRAME_SIZE; result->on_frame_codec_error = on_frame_codec_error; result->on_frame_codec_error_callback_context = callback_context; @@ -124,11 +113,11 @@ singlylinkedlist_destroy(frame_codec_data->subscription_list); if (frame_codec_data->receive_frame_bytes != NULL) { - amqpalloc_free(frame_codec_data->receive_frame_bytes); + free(frame_codec_data->receive_frame_bytes); } /* Codes_SRS_FRAME_CODEC_01_023: [frame_codec_destroy shall free all resources associated with a frame_codec instance.] */ - amqpalloc_free(frame_codec); + free(frame_codec); } } @@ -144,9 +133,7 @@ /* Codes_SRS_FRAME_CODEC_01_081: [If a frame being decoded already has a size bigger than the max_frame_size argument then frame_codec_set_max_frame_size shall return a non-zero value and the previous frame size shall be kept.] */ ((max_frame_size < frame_codec_data->receive_frame_size) && (frame_codec_data->receive_frame_state != RECEIVE_FRAME_STATE_FRAME_SIZE)) || /* Codes_SRS_FRAME_CODEC_01_097: [Setting a frame size on a frame_codec that had a decode error shall fail.] */ - (frame_codec_data->receive_frame_state == RECEIVE_FRAME_STATE_ERROR) || - /* Codes_SRS_FRAME_CODEC_01_098: [Setting a frame size on a frame_codec that had an encode error shall fail.] */ - (frame_codec_data->encode_frame_state == ENCODE_FRAME_STATE_ERROR)) + (frame_codec_data->receive_frame_state == RECEIVE_FRAME_STATE_ERROR)) { result = __FAILURE__; } @@ -290,7 +277,7 @@ frame_codec_data->receive_frame_pos = 0; /* Codes_SRS_FRAME_CODEC_01_102: [frame_codec_receive_bytes shall allocate memory to hold the frame_body bytes.] */ - frame_codec_data->receive_frame_bytes = (unsigned char*)amqpalloc_malloc(frame_codec_data->receive_frame_size - 6); + frame_codec_data->receive_frame_bytes = (unsigned char*)malloc(frame_codec_data->receive_frame_size - 6); if (frame_codec_data->receive_frame_bytes == NULL) { /* Codes_SRS_FRAME_CODEC_01_101: [If the memory for the frame_body bytes cannot be allocated, frame_codec_receive_bytes shall fail and return a non-zero value.] */ @@ -348,7 +335,7 @@ /* Codes_SRS_FRAME_CODEC_01_006: [The treatment of this area depends on the frame type.] */ /* Codes_SRS_FRAME_CODEC_01_100: [If the frame body size is 0, the frame_body pointer passed to on_frame_received shall be NULL.] */ frame_codec_data->receive_frame_subscription->on_frame_received(frame_codec_data->receive_frame_subscription->callback_context, frame_codec_data->receive_frame_bytes, frame_codec_data->type_specific_size, NULL, 0); - amqpalloc_free(frame_codec_data->receive_frame_bytes); + free(frame_codec_data->receive_frame_bytes); frame_codec_data->receive_frame_bytes = NULL; } @@ -393,7 +380,7 @@ /* Codes_SRS_FRAME_CODEC_01_006: [The treatment of this area depends on the frame type.] */ /* Codes_SRS_FRAME_CODEC_01_099: [A pointer to the frame_body bytes shall also be passed to the on_frame_received.] */ frame_codec_data->receive_frame_subscription->on_frame_received(frame_codec_data->receive_frame_subscription->callback_context, frame_codec_data->receive_frame_bytes, frame_codec_data->type_specific_size, frame_codec_data->receive_frame_bytes + frame_codec_data->type_specific_size, frame_body_size); - amqpalloc_free(frame_codec_data->receive_frame_bytes); + free(frame_codec_data->receive_frame_bytes); frame_codec_data->receive_frame_bytes = NULL; } @@ -452,7 +439,7 @@ else { /* add a new subscription */ - subscription = (SUBSCRIPTION*)amqpalloc_malloc(sizeof(SUBSCRIPTION)); + subscription = (SUBSCRIPTION*)malloc(sizeof(SUBSCRIPTION)); /* Codes_SRS_FRAME_CODEC_01_037: [If any failure occurs while performing the subscribe operation, frame_codec_subscribe shall return a non-zero value.] */ if (subscription == NULL) { @@ -467,7 +454,7 @@ /* Codes_SRS_FRAME_CODEC_01_037: [If any failure occurs while performing the subscribe operation, frame_codec_subscribe shall return a non-zero value.] */ if (singlylinkedlist_add(frame_codec_data->subscription_list, subscription) == NULL) { - amqpalloc_free(subscription); + free(subscription); result = __FAILURE__; } else @@ -512,7 +499,7 @@ } else { - amqpalloc_free(subscription); + free(subscription); if (singlylinkedlist_remove(frame_codec_data->subscription_list, list_item) != 0) { /* Codes_SRS_FRAME_CODEC_01_041: [If any failure occurs while performing the unsubscribe operation, frame_codec_unsubscribe shall return a non-zero value.] */ @@ -536,17 +523,25 @@ FRAME_CODEC_INSTANCE* frame_codec_data = (FRAME_CODEC_INSTANCE*)frame_codec; - /* Codes_SRS_FRAME_CODEC_01_044: [If the argument frame_codec is NULL, frame_codec_encode_frame shall return a non-zero value.] */ + /* Codes_SRS_FRAME_CODEC_01_044: [If any of arguments `frame_codec` or `on_bytes_encoded` is NULL, `frame_codec_encode_frame` shall return a non-zero value.] */ if ((frame_codec == NULL) || + (on_bytes_encoded == NULL) || /* Codes_SRS_FRAME_CODEC_01_091: [If the argument type_specific_size is greater than 0 and type_specific_bytes is NULL, frame_codec_encode_frame shall return a non-zero value.] */ ((type_specific_size > 0) && (type_specific_bytes == NULL)) || /* Codes_SRS_FRAME_CODEC_01_092: [If type_specific_size is too big to allow encoding the frame according to the AMQP ISO then frame_codec_encode_frame shall return a non-zero value.] */ - (type_specific_size > MAX_TYPE_SPECIFIC_SIZE) || - (frame_codec_data->encode_frame_state == ENCODE_FRAME_STATE_ERROR)) + (type_specific_size > MAX_TYPE_SPECIFIC_SIZE)) { + LogError("Bad arguments: frame_codec = %p, on_bytes_encoded = %p, type_specific_size = %u, type_specific_bytes = %p", + frame_codec, on_bytes_encoded, (unsigned int)type_specific_size, type_specific_bytes); result = __FAILURE__; } - else + else if ((payloads == NULL) && (payload_count > 0)) + { + /* Codes_SRS_FRAME_CODEC_01_107: [If the argument `payloads` is NULL and `payload_count` is non-zero, `frame_codec_encode_frame` shall return a non-zero value.]*/ + LogError("NULL payloads argument with non-zero payload count"); + result = __FAILURE__; + } + else { /* round up to the 4 bytes for doff */ /* Codes_SRS_FRAME_CODEC_01_067: [The value of the data offset is an unsigned, 8-bit integer specifying a count of 4-byte words.] */ @@ -562,67 +557,102 @@ for (i = 0; i < payload_count; i++) { + /* Codes_SRS_FRAME_CODEC_01_110: [ If the `bytes` member of a payload entry is NULL, `frame_codec_encode_frame` shall fail and return a non-zero value. ] */ + if ((payloads[i].bytes == NULL) || + /* Codes_SRS_FRAME_CODEC_01_111: [ If the `length` member of a payload entry is 0, `frame_codec_encode_frame` shall fail and return a non-zero value. ] */ + (payloads[i].length == 0)) + { + break; + } + frame_body_size += payloads[i].length; } - /* Codes_SRS_FRAME_CODEC_01_063: [This is an unsigned 32-bit integer that MUST contain the total frame size of the frame header, extended header, and frame body.] */ - frame_size = frame_body_size + frame_body_offset; - - if (frame_size > frame_codec_data->max_frame_size) + if (i < payload_count) { - /* Codes_SRS_FRAME_CODEC_01_095: [If the frame_size needed for the frame is bigger than the maximum frame size, frame_codec_encode_frame shall fail and return a non-zero value.] */ + LogError("Bad payload entry"); result = __FAILURE__; } else { - /* Codes_SRS_FRAME_CODEC_01_042: [frame_codec_encode_frame encodes the header and type specific bytes of a frame that has frame_payload_size bytes.] */ - /* Codes_SRS_FRAME_CODEC_01_055: [Frames are divided into three distinct areas: a fixed width frame header, a variable width extended header, and a variable width frame body.] */ - /* Codes_SRS_FRAME_CODEC_01_056: [frame header The frame header is a fixed size (8 byte) structure that precedes each frame.] */ - /* Codes_SRS_FRAME_CODEC_01_057: [The frame header includes mandatory information necessary to parse the rest of the frame including size and type information.] */ - /* Codes_SRS_FRAME_CODEC_01_058: [extended header The extended header is a variable width area preceding the frame body.] */ - /* Codes_SRS_FRAME_CODEC_01_059: [This is an extension point defined for future expansion.] */ - /* Codes_SRS_FRAME_CODEC_01_060: [The treatment of this area depends on the frame type.]*/ - /* Codes_SRS_FRAME_CODEC_01_062: [SIZE Bytes 0-3 of the frame header contain the frame size.] */ /* Codes_SRS_FRAME_CODEC_01_063: [This is an unsigned 32-bit integer that MUST contain the total frame size of the frame header, extended header, and frame body.] */ - /* Codes_SRS_FRAME_CODEC_01_064: [The frame is malformed if the size is less than the size of the frame header (8 bytes).] */ - unsigned char frame_header[6]; + frame_size = frame_body_size + frame_body_offset; - frame_header[0] = (frame_size >> 24) & 0xFF; - frame_header[1] = (frame_size >> 16) & 0xFF; - frame_header[2] = (frame_size >> 8) & 0xFF; - frame_header[3] = frame_size & 0xFF; - /* Codes_SRS_FRAME_CODEC_01_065: [DOFF Byte 4 of the frame header is the data offset.] */ - frame_header[4] = doff; - /* Codes_SRS_FRAME_CODEC_01_069: [TYPE Byte 5 of the frame header is a type code.] */ - frame_header[5] = type; - - /* Codes_SRS_FRAME_CODEC_01_088: [Encoded bytes shall be passed to the on_bytes_encoded callback.] */ - on_bytes_encoded(callback_context, frame_header, sizeof(frame_header), ((frame_body_size + type_specific_bytes + padding_byte_count) == 0) ? true : false); + if (frame_size > frame_codec_data->max_frame_size) + { + /* Codes_SRS_FRAME_CODEC_01_095: [If the frame_size needed for the frame is bigger than the maximum frame size, frame_codec_encode_frame shall fail and return a non-zero value.] */ + LogError("Encoded frame size exceeds the maximum allowed frame size"); + result = __FAILURE__; + } + else + { + /* Codes_SRS_FRAME_CODEC_01_108: [ Memory shall be allocated to hold the entire frame. ]*/ + unsigned char* encoded_frame = (unsigned char*)malloc(frame_size); + if (encoded_frame == NULL) + { + /* Codes_SRS_FRAME_CODEC_01_109: [ If allocating memory fails, `frame_codec_encode_frame` shall fail and return a non-zero value. ]*/ + LogError("Cannot allocate memory for frame"); + result = __FAILURE__; + } + else + { + /* Codes_SRS_FRAME_CODEC_01_042: [frame_codec_encode_frame encodes the header, type specific bytes and frame payload of a frame that has frame_payload_size bytes.]*/ + /* Codes_SRS_FRAME_CODEC_01_055: [Frames are divided into three distinct areas: a fixed width frame header, a variable width extended header, and a variable width frame body.] */ + /* Codes_SRS_FRAME_CODEC_01_056: [frame header The frame header is a fixed size (8 byte) structure that precedes each frame.] */ + /* Codes_SRS_FRAME_CODEC_01_057: [The frame header includes mandatory information necessary to parse the rest of the frame including size and type information.] */ + /* Codes_SRS_FRAME_CODEC_01_058: [extended header The extended header is a variable width area preceding the frame body.] */ + /* Codes_SRS_FRAME_CODEC_01_059: [This is an extension point defined for future expansion.] */ + /* Codes_SRS_FRAME_CODEC_01_060: [The treatment of this area depends on the frame type.]*/ + /* Codes_SRS_FRAME_CODEC_01_062: [SIZE Bytes 0-3 of the frame header contain the frame size.] */ + /* Codes_SRS_FRAME_CODEC_01_063: [This is an unsigned 32-bit integer that MUST contain the total frame size of the frame header, extended header, and frame body.] */ + /* Codes_SRS_FRAME_CODEC_01_064: [The frame is malformed if the size is less than the size of the frame header (8 bytes).] */ + unsigned char frame_header[6]; + size_t current_pos = 0; - /* Codes_SRS_FRAME_CODEC_01_088: [Encoded bytes shall be passed to the on_bytes_encoded callback.] */ - if (type_specific_size > 0) - { - on_bytes_encoded(callback_context, type_specific_bytes, type_specific_size, ((frame_body_size + padding_byte_count) == 0) ? true : false); - } + frame_header[0] = (frame_size >> 24) & 0xFF; + frame_header[1] = (frame_size >> 16) & 0xFF; + frame_header[2] = (frame_size >> 8) & 0xFF; + frame_header[3] = frame_size & 0xFF; + /* Codes_SRS_FRAME_CODEC_01_065: [DOFF Byte 4 of the frame header is the data offset.] */ + frame_header[4] = doff; + /* Codes_SRS_FRAME_CODEC_01_069: [TYPE Byte 5 of the frame header is a type code.] */ + frame_header[5] = type; - /* send padding bytes */ - /* Codes_SRS_FRAME_CODEC_01_090: [If the type_specific_size - 2 does not divide by 4, frame_codec_encode_frame shall pad the type_specific bytes with zeroes so that type specific data is according to the AMQP ISO.] */ - unsigned char padding_bytes[] = { 0x00, 0x00, 0x00 }; + (void)memcpy(encoded_frame, frame_header, sizeof(frame_header)); + current_pos += sizeof(frame_header); + + if (type_specific_size > 0) + { + (void)memcpy(encoded_frame + current_pos, type_specific_bytes, type_specific_size); + current_pos += type_specific_size; + } + + /* send padding bytes */ + /* Codes_SRS_FRAME_CODEC_01_090: [If the type_specific_size - 2 does not divide by 4, frame_codec_encode_frame shall pad the type_specific bytes with zeroes so that type specific data is according to the AMQP ISO.] */ + unsigned char padding_bytes[] = { 0x00, 0x00, 0x00 }; - /* Codes_SRS_FRAME_CODEC_01_088: [Encoded bytes shall be passed to the on_bytes_encoded callback.] */ - if (padding_byte_count > 0) - { - on_bytes_encoded(callback_context, padding_bytes, padding_byte_count, (payload_count == 0) ? true : false); - } + if (padding_byte_count > 0) + { + (void)memcpy(encoded_frame + current_pos, padding_bytes, padding_byte_count); + current_pos += padding_byte_count; + } - for (i = 0; i < payload_count; i++) - { - /* Codes_SRS_FRAME_CODEC_01_088: [Encoded bytes shall be passed to the on_bytes_encoded callback.] */ - on_bytes_encoded(callback_context, payloads[i].bytes, payloads[i].length, (i == payload_count - 1) ? true : false); + /* Codes_SRS_FRAME_CODEC_01_106: [All payloads shall be encoded in order as part of the frame.] */ + for (i = 0; i < payload_count; i++) + { + (void)memcpy(encoded_frame + current_pos, payloads[i].bytes, payloads[i].length); + current_pos += payloads[i].length; + } + + /* Codes_SRS_FRAME_CODEC_01_088: [Encoded bytes shall be passed to the `on_bytes_encoded` callback in a single call, while setting the `encode complete` argument to true.] */ + on_bytes_encoded(callback_context, encoded_frame, frame_size, true); + + free(encoded_frame); + + /* Codes_SRS_FRAME_CODEC_01_043: [On success it shall return 0.] */ + result = 0; + } } - - /* Codes_SRS_FRAME_CODEC_01_043: [On success it shall return 0.] */ - result = 0; } }