A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: header_detect_io.c
- Revision:
- 28:add19eb7defa
- Parent:
- 27:d74f1cea23e1
- Child:
- 29:4a11413cf217
diff -r d74f1cea23e1 -r add19eb7defa header_detect_io.c --- a/header_detect_io.c Fri Jun 02 15:53:07 2017 -0700 +++ b/header_detect_io.c Fri Jun 30 10:41:22 2017 -0700 @@ -9,25 +9,25 @@ typedef enum IO_STATE_TAG { - IO_STATE_NOT_OPEN, - IO_STATE_OPENING_UNDERLYING_IO, - IO_STATE_WAIT_FOR_HEADER, - IO_STATE_OPEN, - IO_STATE_CLOSING, - IO_STATE_ERROR + IO_STATE_NOT_OPEN, + IO_STATE_OPENING_UNDERLYING_IO, + IO_STATE_WAIT_FOR_HEADER, + IO_STATE_OPEN, + IO_STATE_CLOSING, + IO_STATE_ERROR } IO_STATE; typedef struct HEADER_DETECT_IO_INSTANCE_TAG { - XIO_HANDLE underlying_io; - size_t header_pos; - IO_STATE io_state; - ON_IO_OPEN_COMPLETE on_io_open_complete; - ON_IO_CLOSE_COMPLETE on_io_close_complete; - ON_IO_ERROR on_io_error; - ON_BYTES_RECEIVED on_bytes_received; - void* on_io_open_complete_context; - void* on_io_close_complete_context; + XIO_HANDLE underlying_io; + size_t header_pos; + IO_STATE io_state; + ON_IO_OPEN_COMPLETE on_io_open_complete; + ON_IO_CLOSE_COMPLETE on_io_close_complete; + ON_IO_ERROR on_io_error; + ON_BYTES_RECEIVED on_bytes_received; + void* on_io_open_complete_context; + void* on_io_close_complete_context; void* on_io_error_context; void* on_bytes_received_context; } HEADER_DETECT_IO_INSTANCE; @@ -36,18 +36,18 @@ static void indicate_error(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance) { - if (header_detect_io_instance->on_io_error != NULL) - { - header_detect_io_instance->on_io_error(header_detect_io_instance->on_io_error_context); - } + if (header_detect_io_instance->on_io_error != NULL) + { + header_detect_io_instance->on_io_error(header_detect_io_instance->on_io_error_context); + } } static void indicate_open_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance, IO_OPEN_RESULT open_result) { - if (header_detect_io_instance->on_io_open_complete != NULL) - { - header_detect_io_instance->on_io_open_complete(header_detect_io_instance->on_io_open_complete_context, open_result); - } + if (header_detect_io_instance->on_io_open_complete != NULL) + { + header_detect_io_instance->on_io_open_complete(header_detect_io_instance->on_io_open_complete_context, open_result); + } } static void indicate_close_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance) @@ -67,189 +67,189 @@ static void on_underlying_io_bytes_received(void* context, const unsigned char* buffer, size_t size) { - HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; - while (size > 0) - { - switch (header_detect_io_instance->io_state) - { - default: - break; + while (size > 0) + { + switch (header_detect_io_instance->io_state) + { + default: + break; - case IO_STATE_WAIT_FOR_HEADER: - if (amqp_header[header_detect_io_instance->header_pos] != buffer[0]) - { + case IO_STATE_WAIT_FOR_HEADER: + if (amqp_header[header_detect_io_instance->header_pos] != buffer[0]) + { /* Send expected header, then close as per spec. We do not care if we fail */ (void)xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), on_send_complete_close, context); header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; - indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); - size = 0; - } - else - { - header_detect_io_instance->header_pos++; - size--; - buffer++; - if (header_detect_io_instance->header_pos == sizeof(amqp_header)) - { - if (xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), NULL, NULL) != 0) - { - header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; - indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); - } - else - { - header_detect_io_instance->io_state = IO_STATE_OPEN; - indicate_open_complete(header_detect_io_instance, IO_OPEN_OK); - } - } - } - break; + indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); + size = 0; + } + else + { + header_detect_io_instance->header_pos++; + size--; + buffer++; + if (header_detect_io_instance->header_pos == sizeof(amqp_header)) + { + if (xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), NULL, NULL) != 0) + { + header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); + } + else + { + header_detect_io_instance->io_state = IO_STATE_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_OK); + } + } + } + break; - case IO_STATE_OPEN: - header_detect_io_instance->on_bytes_received(header_detect_io_instance->on_bytes_received_context, buffer, size); - size = 0; - break; - } - } + case IO_STATE_OPEN: + header_detect_io_instance->on_bytes_received(header_detect_io_instance->on_bytes_received_context, buffer, size); + size = 0; + break; + } + } } static void on_underlying_io_close_complete(void* context) { - HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; - switch (header_detect_io_instance->io_state) - { - default: - break; + switch (header_detect_io_instance->io_state) + { + default: + break; case IO_STATE_CLOSING: header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; indicate_close_complete(header_detect_io_instance); break; - case IO_STATE_WAIT_FOR_HEADER: - case IO_STATE_OPENING_UNDERLYING_IO: - header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; - indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); - break; - } + case IO_STATE_WAIT_FOR_HEADER: + case IO_STATE_OPENING_UNDERLYING_IO: + header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); + break; + } } static void on_underlying_io_open_complete(void* context, IO_OPEN_RESULT open_result) { - HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; - switch (header_detect_io_instance->io_state) - { - default: - break; + switch (header_detect_io_instance->io_state) + { + default: + break; - case IO_STATE_OPENING_UNDERLYING_IO: - if (open_result == IO_OPEN_OK) - { - header_detect_io_instance->io_state = IO_STATE_WAIT_FOR_HEADER; - } - else - { - if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0) - { - header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; - indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); - } - } + case IO_STATE_OPENING_UNDERLYING_IO: + if (open_result == IO_OPEN_OK) + { + header_detect_io_instance->io_state = IO_STATE_WAIT_FOR_HEADER; + } + else + { + if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0) + { + header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); + } + } - break; - } + break; + } } static void on_underlying_io_error(void* context) { - HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; - switch (header_detect_io_instance->io_state) - { - default: - break; + switch (header_detect_io_instance->io_state) + { + default: + break; - case IO_STATE_WAIT_FOR_HEADER: - case IO_STATE_OPENING_UNDERLYING_IO: - header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; - indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); - break; + case IO_STATE_WAIT_FOR_HEADER: + case IO_STATE_OPENING_UNDERLYING_IO: + header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); + break; - case IO_STATE_OPEN: - header_detect_io_instance->io_state = IO_STATE_ERROR; - indicate_error(header_detect_io_instance); - break; - } + case IO_STATE_OPEN: + header_detect_io_instance->io_state = IO_STATE_ERROR; + indicate_error(header_detect_io_instance); + break; + } } CONCRETE_IO_HANDLE headerdetectio_create(void* io_create_parameters) { - HEADER_DETECT_IO_INSTANCE* result; + HEADER_DETECT_IO_INSTANCE* result; - if (io_create_parameters == NULL) - { - result = NULL; - } - else - { - HEADERDETECTIO_CONFIG* header_detect_io_config = (HEADERDETECTIO_CONFIG*)io_create_parameters; - result = (HEADER_DETECT_IO_INSTANCE*)malloc(sizeof(HEADER_DETECT_IO_INSTANCE)); - if (result != NULL) - { - result->underlying_io = header_detect_io_config->underlying_io; - result->on_io_open_complete = NULL; + if (io_create_parameters == NULL) + { + result = NULL; + } + else + { + HEADERDETECTIO_CONFIG* header_detect_io_config = (HEADERDETECTIO_CONFIG*)io_create_parameters; + result = (HEADER_DETECT_IO_INSTANCE*)malloc(sizeof(HEADER_DETECT_IO_INSTANCE)); + if (result != NULL) + { + result->underlying_io = header_detect_io_config->underlying_io; + result->on_io_open_complete = NULL; result->on_io_close_complete = NULL; - result->on_io_error = NULL; - result->on_bytes_received = NULL; - result->on_io_open_complete_context = NULL; + result->on_io_error = NULL; + result->on_bytes_received = NULL; + result->on_io_open_complete_context = NULL; result->on_io_close_complete_context = NULL; result->on_io_error_context = NULL; result->on_bytes_received_context = NULL; - result->io_state = IO_STATE_NOT_OPEN; - } - } + result->io_state = IO_STATE_NOT_OPEN; + } + } - return result; + return result; } void headerdetectio_destroy(CONCRETE_IO_HANDLE header_detect_io) { - if (header_detect_io != NULL) - { - HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; - (void)headerdetectio_close(header_detect_io, NULL, NULL); - xio_destroy(header_detect_io_instance->underlying_io); - free(header_detect_io); - } + if (header_detect_io != NULL) + { + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; + (void)headerdetectio_close(header_detect_io, NULL, NULL); + xio_destroy(header_detect_io_instance->underlying_io); + free(header_detect_io); + } } int headerdetectio_open(CONCRETE_IO_HANDLE header_detect_io, ON_IO_OPEN_COMPLETE on_io_open_complete, void* on_io_open_complete_context, ON_BYTES_RECEIVED on_bytes_received, void* on_bytes_received_context, ON_IO_ERROR on_io_error, void* on_io_error_context) { - int result; + int result; - if (header_detect_io == NULL) - { - result = __FAILURE__; - } - else - { - HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; + if (header_detect_io == NULL) + { + result = __FAILURE__; + } + else + { + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; if (header_detect_io_instance->io_state != IO_STATE_NOT_OPEN && header_detect_io_instance->io_state != IO_STATE_OPEN) { result = __FAILURE__; } - else - { - header_detect_io_instance->on_bytes_received = on_bytes_received; - header_detect_io_instance->on_io_open_complete = on_io_open_complete; - header_detect_io_instance->on_io_error = on_io_error; + else + { + header_detect_io_instance->on_bytes_received = on_bytes_received; + header_detect_io_instance->on_io_open_complete = on_io_open_complete; + header_detect_io_instance->on_io_error = on_io_error; header_detect_io_instance->on_bytes_received_context = on_bytes_received_context; header_detect_io_instance->on_io_open_complete_context = on_io_open_complete_context; header_detect_io_instance->on_io_error_context = on_io_error_context; @@ -273,93 +273,93 @@ result = 0; } } - } - } + } + } - return result; + return result; } int headerdetectio_close(CONCRETE_IO_HANDLE header_detect_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context) { - int result; + int result; - if (header_detect_io == NULL) - { - result = __FAILURE__; - } - else - { - HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; + if (header_detect_io == NULL) + { + result = __FAILURE__; + } + else + { + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; - if ((header_detect_io_instance->io_state == IO_STATE_NOT_OPEN) || - (header_detect_io_instance->io_state == IO_STATE_CLOSING)) - { - result = __FAILURE__; - } - else - { - header_detect_io_instance->io_state = IO_STATE_CLOSING; + if ((header_detect_io_instance->io_state == IO_STATE_NOT_OPEN) || + (header_detect_io_instance->io_state == IO_STATE_CLOSING)) + { + result = __FAILURE__; + } + else + { + header_detect_io_instance->io_state = IO_STATE_CLOSING; header_detect_io_instance->on_io_close_complete = on_io_close_complete; header_detect_io_instance->on_io_close_complete_context = callback_context; - if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0) - { - result = __FAILURE__; - } - else - { - result = 0; - } - } - } + if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0) + { + result = __FAILURE__; + } + else + { + result = 0; + } + } + } - return result; + return result; } int headerdetectio_send(CONCRETE_IO_HANDLE header_detect_io, const void* buffer, size_t size, ON_SEND_COMPLETE on_send_complete, void* callback_context) { - int result; + int result; - if (header_detect_io == NULL) - { - result = __FAILURE__; - } - else - { - HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; + if (header_detect_io == NULL) + { + result = __FAILURE__; + } + else + { + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; - if (header_detect_io_instance->io_state != IO_STATE_OPEN) - { - result = __FAILURE__; - } - else - { - if (xio_send(header_detect_io_instance->underlying_io, buffer, size, on_send_complete, callback_context) != 0) - { - result = __FAILURE__; - } - else - { - result = 0; - } - } - } + if (header_detect_io_instance->io_state != IO_STATE_OPEN) + { + result = __FAILURE__; + } + else + { + if (xio_send(header_detect_io_instance->underlying_io, buffer, size, on_send_complete, callback_context) != 0) + { + result = __FAILURE__; + } + else + { + result = 0; + } + } + } - return result; + return result; } void headerdetectio_dowork(CONCRETE_IO_HANDLE header_detect_io) { - if (header_detect_io != NULL) - { - HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; + if (header_detect_io != NULL) + { + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; - if ((header_detect_io_instance->io_state != IO_STATE_NOT_OPEN) && - (header_detect_io_instance->io_state != IO_STATE_ERROR)) - { - xio_dowork(header_detect_io_instance->underlying_io); - } - } + if ((header_detect_io_instance->io_state != IO_STATE_NOT_OPEN) && + (header_detect_io_instance->io_state != IO_STATE_ERROR)) + { + xio_dowork(header_detect_io_instance->underlying_io); + } + } } int headerdetectio_setoption(CONCRETE_IO_HANDLE header_detect_io, const char* optionName, const void* value) @@ -422,16 +422,16 @@ static const IO_INTERFACE_DESCRIPTION header_detect_io_interface_description = { headerdetectio_retrieveoptions, - headerdetectio_create, - headerdetectio_destroy, - headerdetectio_open, - headerdetectio_close, - headerdetectio_send, - headerdetectio_dowork, + headerdetectio_create, + headerdetectio_destroy, + headerdetectio_open, + headerdetectio_close, + headerdetectio_send, + headerdetectio_dowork, headerdetectio_setoption }; const IO_INTERFACE_DESCRIPTION* headerdetectio_get_interface_description(void) { - return &header_detect_io_interface_description; + return &header_detect_io_interface_description; }