A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
header_detect_io.c
- Committer:
- AzureIoTClient
- Date:
- 2016-04-24
- Revision:
- 1:eab586236bfe
- Parent:
- 0:6ae2f7bca550
- Child:
- 5:ae49385aff34
File content as of revision 1:eab586236bfe:
// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. #include "azure_uamqp_c/header_detect_io.h" #include "azure_uamqp_c/amqpalloc.h" typedef enum IO_STATE_TAG { IO_STATE_NOT_OPEN, IO_STATE_OPENING_UNDERLYING_IO, IO_STATE_WAIT_FOR_HEADER, IO_STATE_OPEN, IO_STATE_CLOSING, IO_STATE_ERROR } IO_STATE; typedef struct HEADER_DETECT_IO_INSTANCE_TAG { XIO_HANDLE underlying_io; size_t header_pos; IO_STATE io_state; ON_IO_OPEN_COMPLETE on_io_open_complete; ON_IO_CLOSE_COMPLETE on_io_close_complete; ON_IO_ERROR on_io_error; ON_BYTES_RECEIVED on_bytes_received; void* on_io_open_complete_context; void* on_io_close_complete_context; void* on_io_error_context; void* on_bytes_received_context; } HEADER_DETECT_IO_INSTANCE; static const unsigned char amqp_header[] = { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 }; static void indicate_error(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance) { if (header_detect_io_instance->on_io_error != NULL) { header_detect_io_instance->on_io_error(header_detect_io_instance->on_io_error_context); } } static void indicate_open_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance, IO_OPEN_RESULT open_result) { if (header_detect_io_instance->on_io_open_complete != NULL) { header_detect_io_instance->on_io_open_complete(header_detect_io_instance->on_io_open_complete_context, open_result); } } static void indicate_close_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance) { if (header_detect_io_instance->on_io_close_complete != NULL) { header_detect_io_instance->on_io_close_complete(header_detect_io_instance->on_io_close_complete_context); } } static void on_underlying_io_error(void* context); static void on_send_complete_close(void* context, IO_SEND_RESULT send_result) { on_underlying_io_error(context); } static void on_underlying_io_bytes_received(void* context, const unsigned char* buffer, size_t size) { HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; while (size > 0) { switch (header_detect_io_instance->io_state) { default: break; case IO_STATE_WAIT_FOR_HEADER: if (amqp_header[header_detect_io_instance->header_pos] != buffer[0]) { /* Send expected header, then close as per spec. We do not care if we fail */ (void)xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), on_send_complete_close, context); header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); size = 0; } else { header_detect_io_instance->header_pos++; size--; buffer++; if (header_detect_io_instance->header_pos == sizeof(amqp_header)) { if (xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), NULL, NULL) != 0) { header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); } else { header_detect_io_instance->io_state = IO_STATE_OPEN; indicate_open_complete(header_detect_io_instance, IO_OPEN_OK); } } } break; case IO_STATE_OPEN: header_detect_io_instance->on_bytes_received(header_detect_io_instance->on_bytes_received_context, buffer, size); size = 0; break; } } } static void on_underlying_io_close_complete(void* context) { HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; switch (header_detect_io_instance->io_state) { default: break; case IO_STATE_CLOSING: header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; indicate_close_complete(header_detect_io_instance); break; case IO_STATE_WAIT_FOR_HEADER: case IO_STATE_OPENING_UNDERLYING_IO: header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); break; } } static void on_underlying_io_open_complete(void* context, IO_OPEN_RESULT open_result) { HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; switch (header_detect_io_instance->io_state) { default: break; case IO_STATE_OPENING_UNDERLYING_IO: if (open_result == IO_OPEN_OK) { header_detect_io_instance->io_state = IO_STATE_WAIT_FOR_HEADER; } else { if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0) { header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); } } break; } } static void on_underlying_io_error(void* context) { HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context; switch (header_detect_io_instance->io_state) { default: break; case IO_STATE_WAIT_FOR_HEADER: case IO_STATE_OPENING_UNDERLYING_IO: header_detect_io_instance->io_state = IO_STATE_NOT_OPEN; indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR); break; case IO_STATE_OPEN: header_detect_io_instance->io_state = IO_STATE_ERROR; indicate_error(header_detect_io_instance); break; } } CONCRETE_IO_HANDLE headerdetectio_create(void* io_create_parameters, LOGGER_LOG logger_log) { HEADER_DETECT_IO_INSTANCE* result; if (io_create_parameters == NULL) { result = NULL; } else { HEADERDETECTIO_CONFIG* header_detect_io_config = (HEADERDETECTIO_CONFIG*)io_create_parameters; result = (HEADER_DETECT_IO_INSTANCE*)amqpalloc_malloc(sizeof(HEADER_DETECT_IO_INSTANCE)); if (result != NULL) { result->underlying_io = header_detect_io_config->underlying_io; result->on_io_open_complete = NULL; result->on_io_close_complete = NULL; result->on_io_error = NULL; result->on_bytes_received = NULL; result->on_io_open_complete_context = NULL; result->on_io_close_complete_context = NULL; result->on_io_error_context = NULL; result->on_bytes_received_context = NULL; result->io_state = IO_STATE_NOT_OPEN; } } return result; } void headerdetectio_destroy(CONCRETE_IO_HANDLE header_detect_io) { if (header_detect_io != NULL) { HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; (void)headerdetectio_close(header_detect_io, NULL, NULL); xio_destroy(header_detect_io_instance->underlying_io); amqpalloc_free(header_detect_io); } } int headerdetectio_open(CONCRETE_IO_HANDLE header_detect_io, ON_IO_OPEN_COMPLETE on_io_open_complete, void* on_io_open_complete_context, ON_BYTES_RECEIVED on_bytes_received, void* on_bytes_received_context, ON_IO_ERROR on_io_error, void* on_io_error_context) { int result; if (header_detect_io == NULL) { result = __LINE__; } else { HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; if (header_detect_io_instance->io_state != IO_STATE_NOT_OPEN && header_detect_io_instance->io_state != IO_STATE_OPEN) { result = __LINE__; } else { header_detect_io_instance->on_bytes_received = on_bytes_received; header_detect_io_instance->on_io_open_complete = on_io_open_complete; header_detect_io_instance->on_io_error = on_io_error; header_detect_io_instance->on_bytes_received_context = on_bytes_received_context; header_detect_io_instance->on_io_open_complete_context = on_io_open_complete_context; header_detect_io_instance->on_io_error_context = on_io_error_context; if (header_detect_io_instance->io_state == IO_STATE_OPEN) { indicate_open_complete(header_detect_io_instance, IO_OPEN_OK); result = 0; } else { header_detect_io_instance->header_pos = 0; header_detect_io_instance->io_state = IO_STATE_OPENING_UNDERLYING_IO; if (xio_open(header_detect_io_instance->underlying_io, on_underlying_io_open_complete, header_detect_io_instance, on_underlying_io_bytes_received, header_detect_io_instance, on_underlying_io_error, header_detect_io_instance) != 0) { result = __LINE__; } else { result = 0; } } } } return result; } int headerdetectio_close(CONCRETE_IO_HANDLE header_detect_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context) { int result; if (header_detect_io == NULL) { result = __LINE__; } else { HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; if ((header_detect_io_instance->io_state == IO_STATE_NOT_OPEN) || (header_detect_io_instance->io_state == IO_STATE_CLOSING)) { result = __LINE__; } else { header_detect_io_instance->io_state = IO_STATE_CLOSING; header_detect_io_instance->on_io_close_complete = on_io_close_complete; header_detect_io_instance->on_io_close_complete_context = callback_context; if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0) { result = __LINE__; } else { result = 0; } } } return result; } int headerdetectio_send(CONCRETE_IO_HANDLE header_detect_io, const void* buffer, size_t size, ON_SEND_COMPLETE on_send_complete, void* callback_context) { int result; if (header_detect_io == NULL) { result = __LINE__; } else { HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; if (header_detect_io_instance->io_state != IO_STATE_OPEN) { result = __LINE__; } else { if (xio_send(header_detect_io_instance->underlying_io, buffer, size, on_send_complete, callback_context) != 0) { result = __LINE__; } else { result = 0; } } } return result; } void headerdetectio_dowork(CONCRETE_IO_HANDLE header_detect_io) { if (header_detect_io != NULL) { HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; if ((header_detect_io_instance->io_state != IO_STATE_NOT_OPEN) && (header_detect_io_instance->io_state != IO_STATE_ERROR)) { xio_dowork(header_detect_io_instance->underlying_io); } } } int headerdetectio_setoption(CONCRETE_IO_HANDLE header_detect_io, const char* optionName, const void* value) { int result; if (header_detect_io == NULL) { result = __LINE__; } else { HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io; if (header_detect_io_instance->underlying_io == NULL) { result = __LINE__; } else { result = xio_setoption(header_detect_io_instance->underlying_io, optionName, value); } } return result; } static const IO_INTERFACE_DESCRIPTION header_detect_io_interface_description = { headerdetectio_create, headerdetectio_destroy, headerdetectio_open, headerdetectio_close, headerdetectio_send, headerdetectio_dowork, headerdetectio_setoption }; const IO_INTERFACE_DESCRIPTION* headerdetectio_get_interface_description(void) { return &header_detect_io_interface_description; }