A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: header_detect_io.c
- Revision:
- 0:6ae2f7bca550
- Child:
- 1:eab586236bfe
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/header_detect_io.c Fri Apr 08 12:01:10 2016 -0700 @@ -0,0 +1,381 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +#include "azure_uamqp_c/header_detect_io.h" +#include "azure_uamqp_c/amqpalloc.h" + +typedef enum IO_STATE_TAG +{ + IO_STATE_NOT_OPEN, + IO_STATE_OPENING_UNDERLYING_IO, + IO_STATE_WAIT_FOR_HEADER, + IO_STATE_OPEN, + IO_STATE_CLOSING, + IO_STATE_ERROR +} IO_STATE; + +typedef struct HEADER_DETECT_IO_INSTANCE_TAG +{ + XIO_HANDLE underlying_io; + size_t header_pos; + IO_STATE io_state; + ON_IO_OPEN_COMPLETE on_io_open_complete; + ON_IO_CLOSE_COMPLETE on_io_close_complete; + ON_IO_ERROR on_io_error; + ON_BYTES_RECEIVED on_bytes_received; + void* on_io_open_complete_context; + void* on_io_close_complete_context; + void* on_io_error_context; + void* on_bytes_received_context; +} HEADER_DETECT_IO_INSTANCE; + +static const unsigned char amqp_header[] = { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 }; + +static void indicate_error(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance) +{ + if (header_detect_io_instance->on_io_error != NULL) + { + header_detect_io_instance->on_io_error(header_detect_io_instance->on_io_error_context); + } +} + +static void indicate_open_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance, IO_OPEN_RESULT open_result) +{ + if (header_detect_io_instance->on_io_open_complete != NULL) + { + header_detect_io_instance->on_io_open_complete(header_detect_io_instance->on_io_open_complete_context, open_result); + } +} + +static void indicate_close_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance) +{ + if (header_detect_io_instance->on_io_close_complete != NULL) + { + header_detect_io_instance->on_io_close_complete(header_detect_io_instance->on_io_close_complete_context); + } +} + +static void on_underlying_io_error(void* context); +static void on_send_complete_close(void* context, IO_SEND_RESULT send_result) +{ + on_underlying_io_error(context); +} + +static void on_underlying_io_bytes_received(void* context, const unsigned char* buffer, size_t size) +{ + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; + + while (size > 0) + { + switch (header_detect_io_instance->io_state) + { + default: + break; + + case IO_STATE_WAIT_FOR_HEADER: + if (amqp_header[header_detect_io_instance->header_pos] != buffer[0]) + { + /* Send expected header, then close as per spec. We do not care if we fail */ + (void)xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), on_send_complete_close, context); + + header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); + size = 0; + } + else + { + header_detect_io_instance->header_pos++; + size--; + buffer++; + if (header_detect_io_instance->header_pos == sizeof(amqp_header)) + { + if (xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), NULL, NULL) != 0) + { + header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); + } + else + { + header_detect_io_instance->io_state = IO_STATE_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_OK); + } + } + } + break; + + case IO_STATE_OPEN: + header_detect_io_instance->on_bytes_received(header_detect_io_instance->on_bytes_received_context, buffer, size); + size = 0; + break; + } + } +} + +static void on_underlying_io_close_complete(void* context) +{ + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; + + switch (header_detect_io_instance->io_state) + { + default: + break; + + case IO_STATE_CLOSING: + header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; + indicate_close_complete(header_detect_io_instance); + break; + + case IO_STATE_WAIT_FOR_HEADER: + case IO_STATE_OPENING_UNDERLYING_IO: + header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); + break; + } +} + +static void on_underlying_io_open_complete(void* context, IO_OPEN_RESULT open_result) +{ + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; + + switch (header_detect_io_instance->io_state) + { + default: + break; + + case IO_STATE_OPENING_UNDERLYING_IO: + if (open_result == IO_OPEN_OK) + { + header_detect_io_instance->io_state = IO_STATE_WAIT_FOR_HEADER; + } + else + { + if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0) + { + header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); + } + } + + break; + } +} + +static void on_underlying_io_error(void* context) +{ + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; + + switch (header_detect_io_instance->io_state) + { + default: + break; + + case IO_STATE_WAIT_FOR_HEADER: + case IO_STATE_OPENING_UNDERLYING_IO: + header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; + indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); + break; + + case IO_STATE_OPEN: + header_detect_io_instance->io_state = IO_STATE_ERROR; + indicate_error(header_detect_io_instance); + break; + } +} + +CONCRETE_IO_HANDLE headerdetectio_create(void* io_create_parameters, LOGGER_LOG logger_log) +{ + HEADER_DETECT_IO_INSTANCE* result; + + if (io_create_parameters == NULL) + { + result = NULL; + } + else + { + HEADERDETECTIO_CONFIG* header_detect_io_config = (HEADERDETECTIO_CONFIG*)io_create_parameters; + result = (HEADER_DETECT_IO_INSTANCE*)amqpalloc_malloc(sizeof(HEADER_DETECT_IO_INSTANCE)); + if (result != NULL) + { + result->underlying_io = header_detect_io_config->underlying_io; + result->on_io_open_complete = NULL; + result->on_io_close_complete = NULL; + result->on_io_error = NULL; + result->on_bytes_received = NULL; + result->on_io_open_complete_context = NULL; + result->on_io_close_complete_context = NULL; + result->on_io_error_context = NULL; + result->on_bytes_received_context = NULL; + + result->io_state = IO_STATE_NOT_OPEN; + } + } + + return result; +} + +void headerdetectio_destroy(CONCRETE_IO_HANDLE header_detect_io) +{ + if (header_detect_io != NULL) + { + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; + (void)headerdetectio_close(header_detect_io, NULL, NULL); + xio_destroy(header_detect_io_instance->underlying_io); + amqpalloc_free(header_detect_io); + } +} + +int headerdetectio_open(CONCRETE_IO_HANDLE header_detect_io, ON_IO_OPEN_COMPLETE on_io_open_complete, void* on_io_open_complete_context, ON_BYTES_RECEIVED on_bytes_received, void* on_bytes_received_context, ON_IO_ERROR on_io_error, void* on_io_error_context) +{ + int result; + + if (header_detect_io == NULL) + { + result = __LINE__; + } + else + { + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; + + if (header_detect_io_instance->io_state != IO_STATE_NOT_OPEN && + header_detect_io_instance->io_state != IO_STATE_OPEN) + { + result = __LINE__; + } + else + { + header_detect_io_instance->on_bytes_received = on_bytes_received; + header_detect_io_instance->on_io_open_complete = on_io_open_complete; + header_detect_io_instance->on_io_error = on_io_error; + header_detect_io_instance->on_bytes_received_context = on_bytes_received_context; + header_detect_io_instance->on_io_open_complete_context = on_io_open_complete_context; + header_detect_io_instance->on_io_error_context = on_io_error_context; + + if (header_detect_io_instance->io_state == IO_STATE_OPEN) + { + indicate_open_complete(header_detect_io_instance, IO_OPEN_OK); + result = 0; + } + else + { + header_detect_io_instance->header_pos = 0; + header_detect_io_instance->io_state = IO_STATE_OPENING_UNDERLYING_IO; + + if (xio_open(header_detect_io_instance->underlying_io, on_underlying_io_open_complete, header_detect_io_instance, on_underlying_io_bytes_received, header_detect_io_instance, on_underlying_io_error, header_detect_io_instance) != 0) + { + result = __LINE__; + } + else + { + result = 0; + } + } + } + } + + return result; +} + +int headerdetectio_close(CONCRETE_IO_HANDLE header_detect_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context) +{ + int result; + + if (header_detect_io == NULL) + { + result = __LINE__; + } + else + { + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; + + if ((header_detect_io_instance->io_state == IO_STATE_NOT_OPEN) || + (header_detect_io_instance->io_state == IO_STATE_CLOSING)) + { + result = __LINE__; + } + else + { + header_detect_io_instance->io_state = IO_STATE_CLOSING; + header_detect_io_instance->on_io_close_complete = on_io_close_complete; + header_detect_io_instance->on_io_close_complete_context = callback_context; + + if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0) + { + result = __LINE__; + } + else + { + result = 0; + } + } + } + + return result; +} + +int headerdetectio_send(CONCRETE_IO_HANDLE header_detect_io, const void* buffer, size_t size, ON_SEND_COMPLETE on_send_complete, void* callback_context) +{ + int result; + + if (header_detect_io == NULL) + { + result = __LINE__; + } + else + { + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; + + if (header_detect_io_instance->io_state != IO_STATE_OPEN) + { + result = __LINE__; + } + else + { + if (xio_send(header_detect_io_instance->underlying_io, buffer, size, on_send_complete, callback_context) != 0) + { + result = __LINE__; + } + else + { + result = 0; + } + } + } + + return result; +} + +void headerdetectio_dowork(CONCRETE_IO_HANDLE header_detect_io) +{ + if (header_detect_io != NULL) + { + HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; + + if ((header_detect_io_instance->io_state != IO_STATE_NOT_OPEN) && + (header_detect_io_instance->io_state != IO_STATE_ERROR)) + { + xio_dowork(header_detect_io_instance->underlying_io); + } + } +} + +int headerdetectio_setoption(CONCRETE_IO_HANDLE socket_io, const char* optionName, const void* value) +{ + return __LINE__; +} + + +static const IO_INTERFACE_DESCRIPTION header_detect_io_interface_description = +{ + headerdetectio_create, + headerdetectio_destroy, + headerdetectio_open, + headerdetectio_close, + headerdetectio_send, + headerdetectio_dowork, + headerdetectio_setoption +}; + +const IO_INTERFACE_DESCRIPTION* headerdetectio_get_interface_description(void) +{ + return &header_detect_io_interface_description; +}