A small memory footprint AMQP implementation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

header_detect_io.c

Committer:
AzureIoTClient
Date:
2017-06-30
Revision:
28:add19eb7defa
Parent:
27:d74f1cea23e1
Child:
29:4a11413cf217

File content as of revision 28:add19eb7defa:

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

#include <stdlib.h>
#include "azure_c_shared_utility/optimize_size.h"
#include "azure_c_shared_utility/gballoc.h"
#include "azure_c_shared_utility/xlogging.h"
#include "azure_uamqp_c/header_detect_io.h"

/* Lifecycle states of a header detect IO instance. */
typedef enum IO_STATE_TAG
{
    /* initial state; also entered when a close completes or an open attempt fails */
    IO_STATE_NOT_OPEN,
    /* headerdetectio_open was called; waiting for the underlying IO to report open complete */
    IO_STATE_OPENING_UNDERLYING_IO,
    /* underlying IO is open; incoming bytes are being matched against amqp_header */
    IO_STATE_WAIT_FOR_HEADER,
    /* the full header was matched and echoed back; incoming bytes are forwarded to on_bytes_received */
    IO_STATE_OPEN,
    /* headerdetectio_close was called; waiting for the underlying IO to report close complete */
    IO_STATE_CLOSING,
    /* the underlying IO reported an error while the instance was in IO_STATE_OPEN */
    IO_STATE_ERROR
} IO_STATE;

/* Per-instance state of the header detect IO. */
typedef struct HEADER_DETECT_IO_INSTANCE_TAG
{
    /* the wrapped IO; taken from the create parameters and destroyed in headerdetectio_destroy */
    XIO_HANDLE underlying_io;
    /* number of amqp_header bytes matched so far; reset to 0 when an open is started */
    size_t header_pos;
    /* current lifecycle state, see IO_STATE */
    IO_STATE io_state;
    /* callbacks supplied by the user in headerdetectio_open / headerdetectio_close (any of them may be NULL) */
    ON_IO_OPEN_COMPLETE on_io_open_complete;
    ON_IO_CLOSE_COMPLETE on_io_close_complete;
    ON_IO_ERROR on_io_error;
    ON_BYTES_RECEIVED on_bytes_received;
    /* opaque context values passed back, unchanged, to the matching callback above */
    void* on_io_open_complete_context;
    void* on_io_close_complete_context;
    void* on_io_error_context;
    void* on_bytes_received_context;
} HEADER_DETECT_IO_INSTANCE;

static const unsigned char amqp_header[] = { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 };

/* Notifies the user of an IO error through the on_io_error callback; does nothing when no callback is set. */
static void indicate_error(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance)
{
    ON_IO_ERROR error_callback = header_detect_io_instance->on_io_error;

    if (error_callback == NULL)
    {
        return;
    }

    error_callback(header_detect_io_instance->on_io_error_context);
}

/* Reports the outcome of an open attempt through the on_io_open_complete callback; */
/* does nothing when no callback is set. */
static void indicate_open_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance, IO_OPEN_RESULT open_result)
{
    ON_IO_OPEN_COMPLETE open_callback = header_detect_io_instance->on_io_open_complete;

    if (open_callback == NULL)
    {
        return;
    }

    open_callback(header_detect_io_instance->on_io_open_complete_context, open_result);
}

/* Reports that the close finished through the on_io_close_complete callback; */
/* does nothing when no callback is set. */
static void indicate_close_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_instance)
{
    ON_IO_CLOSE_COMPLETE close_callback = header_detect_io_instance->on_io_close_complete;

    if (close_callback == NULL)
    {
        return;
    }

    close_callback(header_detect_io_instance->on_io_close_complete_context);
}

/* defined further down; needed here by on_send_complete_close */
static void on_underlying_io_error(void* context);

/* Send-complete callback for the header that is sent back after a header mismatch. */
/* Whatever the send result is, the completion is handled like an error of the underlying IO. */
static void on_send_complete_close(void* context, IO_SEND_RESULT send_result)
{
    /* the send outcome is deliberately not inspected */
    (void)send_result;

    on_underlying_io_error(context);
}

/*
 * Bytes-received callback registered with the underlying IO.
 *
 * context - the HEADER_DETECT_IO_INSTANCE that was passed to xio_open.
 * buffer  - the received bytes.
 * size    - number of bytes in buffer.
 *
 * While in IO_STATE_WAIT_FOR_HEADER the bytes are compared one by one against amqp_header:
 *  - on a mismatch the expected header is sent to the peer, the open is reported as failed and
 *    the rest of the buffer is discarded;
 *  - when the whole header has been matched it is echoed back and the open is reported as
 *    succeeded (or as failed if the echo could not be sent).
 * While in IO_STATE_OPEN the (remaining) bytes are forwarded to the user's on_bytes_received.
 * In any other state the bytes are discarded.
 */
static void on_underlying_io_bytes_received(void* context, const unsigned char* buffer, size_t size)
{
    HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context;

    while (size > 0)
    {
        switch (header_detect_io_instance->io_state)
        {
        default:
            /* No bytes are expected in this state. They have to be dropped explicitly: */
            /* leaving size untouched would make this loop spin forever (for example when the */
            /* header echo below fails, or the open complete callback closes the IO, while */
            /* there are still unprocessed bytes in the buffer). */
            size = 0;
            break;

        case IO_STATE_WAIT_FOR_HEADER:
            if (amqp_header[header_detect_io_instance->header_pos] != buffer[0])
            {
                /* Send expected header, then close as per spec.  We do not care if we fail */
                (void)xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), on_send_complete_close, context);

                header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
                indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
                size = 0;
            }
            else
            {
                /* one more header byte matched, consume it */
                header_detect_io_instance->header_pos++;
                size--;
                buffer++;
                if (header_detect_io_instance->header_pos == sizeof(amqp_header))
                {
                    /* complete header received, reply with our own header */
                    if (xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), NULL, NULL) != 0)
                    {
                        header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
                        indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
                    }
                    else
                    {
                        header_detect_io_instance->io_state = IO_STATE_OPEN;
                        indicate_open_complete(header_detect_io_instance, IO_OPEN_OK);
                    }
                }
            }
            break;

        case IO_STATE_OPEN:
            /* headerdetectio_open does not reject a NULL on_bytes_received, so guard the call */
            if (header_detect_io_instance->on_bytes_received != NULL)
            {
                header_detect_io_instance->on_bytes_received(header_detect_io_instance->on_bytes_received_context, buffer, size);
            }
            size = 0;
            break;
        }
    }
}

static void on_underlying_io_close_complete(void* context)
{
    HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context;

    switch (header_detect_io_instance->io_state)
    {
    default:
        break;

    case IO_STATE_CLOSING:
        header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
        indicate_close_complete(header_detect_io_instance);
        break;

    case IO_STATE_WAIT_FOR_HEADER:
    case IO_STATE_OPENING_UNDERLYING_IO:
        header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
        indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
        break;
    }
}

/*
 * Open-complete callback registered with the underlying IO.
 * Only acted upon while in IO_STATE_OPENING_UNDERLYING_IO:
 *  - on IO_OPEN_OK the instance starts waiting for the header;
 *  - on any other result the underlying IO is closed; the failed open is then reported either
 *    from the close complete callback or, if the close could not be started, right here.
 */
static void on_underlying_io_open_complete(void* context, IO_OPEN_RESULT open_result)
{
    HEADER_DETECT_IO_INSTANCE* instance = (HEADER_DETECT_IO_INSTANCE*)context;

    if (instance->io_state != IO_STATE_OPENING_UNDERLYING_IO)
    {
        return;
    }

    if (open_result == IO_OPEN_OK)
    {
        instance->io_state = IO_STATE_WAIT_FOR_HEADER;
        return;
    }

    if (xio_close(instance->underlying_io, on_underlying_io_close_complete, instance) != 0)
    {
        instance->io_state = IO_STATE_NOT_OPEN;
        indicate_open_complete(instance, IO_OPEN_ERROR);
    }
}

static void on_underlying_io_error(void* context)
{
    HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context;

    switch (header_detect_io_instance->io_state)
    {
    default:
        break;

    case IO_STATE_WAIT_FOR_HEADER:
    case IO_STATE_OPENING_UNDERLYING_IO:
        header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
        indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
        break;

    case IO_STATE_OPEN:
        header_detect_io_instance->io_state = IO_STATE_ERROR;
        indicate_error(header_detect_io_instance);
        break;
    }
}

CONCRETE_IO_HANDLE headerdetectio_create(void* io_create_parameters)
{
    HEADER_DETECT_IO_INSTANCE* result;

    if (io_create_parameters == NULL)
    {
        result = NULL;
    }
    else
    {
        HEADERDETECTIO_CONFIG* header_detect_io_config = (HEADERDETECTIO_CONFIG*)io_create_parameters;
        result = (HEADER_DETECT_IO_INSTANCE*)malloc(sizeof(HEADER_DETECT_IO_INSTANCE));
        if (result != NULL)
        {
            result->underlying_io = header_detect_io_config->underlying_io;
            result->on_io_open_complete = NULL;
            result->on_io_close_complete = NULL;
            result->on_io_error = NULL;
            result->on_bytes_received = NULL;
            result->on_io_open_complete_context = NULL;
            result->on_io_close_complete_context = NULL;
            result->on_io_error_context = NULL;
            result->on_bytes_received_context = NULL;

            result->io_state = IO_STATE_NOT_OPEN;
        }
    }

    return result;
}

/*
 * Destroys a header detect IO instance: attempts a close (its result is ignored and no close
 * callback is registered), destroys the underlying IO and releases the instance memory.
 * A NULL handle is ignored.
 */
void headerdetectio_destroy(CONCRETE_IO_HANDLE header_detect_io)
{
    HEADER_DETECT_IO_INSTANCE* instance;

    if (header_detect_io == NULL)
    {
        return;
    }

    instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;

    (void)headerdetectio_close(header_detect_io, NULL, NULL);
    xio_destroy(instance->underlying_io);
    free(instance);
}

/*
 * Opens the header detect IO.
 *
 * header_detect_io            - instance returned by headerdetectio_create.
 * on_io_open_complete         - called with the outcome of the open (may be NULL).
 * on_io_open_complete_context - context passed to on_io_open_complete.
 * on_bytes_received           - called with the bytes received after the header was detected.
 * on_bytes_received_context   - context passed to on_bytes_received.
 * on_io_error                 - called when the underlying IO fails while open (may be NULL).
 * on_io_error_context         - context passed to on_io_error.
 *
 * Allowed only in IO_STATE_NOT_OPEN or IO_STATE_OPEN. When the instance is already open the new
 * callbacks are stored and on_io_open_complete is invoked immediately with IO_OPEN_OK. Otherwise
 * the underlying IO is opened and the open completes once the peer has sent the AMQP header.
 *
 * Returns 0 on success, a non-zero value when the handle is NULL, the state does not allow an
 * open, or the underlying IO could not be opened.
 */
int headerdetectio_open(CONCRETE_IO_HANDLE header_detect_io, ON_IO_OPEN_COMPLETE on_io_open_complete, void* on_io_open_complete_context, ON_BYTES_RECEIVED on_bytes_received, void* on_bytes_received_context, ON_IO_ERROR on_io_error, void* on_io_error_context)
{
    int result;

    if (header_detect_io == NULL)
    {
        result = __FAILURE__;
    }
    else
    {
        HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;

        if (header_detect_io_instance->io_state != IO_STATE_NOT_OPEN &&
            header_detect_io_instance->io_state != IO_STATE_OPEN)
        {
            result = __FAILURE__;
        }
        else
        {
            header_detect_io_instance->on_bytes_received = on_bytes_received;
            header_detect_io_instance->on_io_open_complete = on_io_open_complete;
            header_detect_io_instance->on_io_error = on_io_error;
            header_detect_io_instance->on_bytes_received_context = on_bytes_received_context;
            header_detect_io_instance->on_io_open_complete_context = on_io_open_complete_context;
            header_detect_io_instance->on_io_error_context = on_io_error_context;

            if (header_detect_io_instance->io_state == IO_STATE_OPEN)
            {
                /* already open: nothing to do but confirm it to the (new) callback */
                indicate_open_complete(header_detect_io_instance, IO_OPEN_OK);
                result = 0;
            }
            else
            {
                /* the state has to be set before xio_open, the open complete callback checks it */
                header_detect_io_instance->header_pos = 0;
                header_detect_io_instance->io_state = IO_STATE_OPENING_UNDERLYING_IO;

                if (xio_open(header_detect_io_instance->underlying_io, on_underlying_io_open_complete, header_detect_io_instance, on_underlying_io_bytes_received, header_detect_io_instance, on_underlying_io_error, header_detect_io_instance) != 0)
                {
                    /* the open never started: go back to NOT_OPEN, otherwise the instance */
                    /* would stay in OPENING_UNDERLYING_IO and reject every further open */
                    header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
                    result = __FAILURE__;
                }
                else
                {
                    result = 0;
                }
            }
        }
    }

    return result;
}

/*
 * Starts closing the header detect IO.
 *
 * header_detect_io     - instance returned by headerdetectio_create.
 * on_io_close_complete - called once the underlying IO reports that the close completed (may be NULL).
 * callback_context     - context passed to on_io_close_complete.
 *
 * Returns 0 when the close of the underlying IO was started, a non-zero value when the handle is
 * NULL, the instance is not open or is already closing, or the underlying close could not be
 * started. In the last case the instance keeps the state it had before the call.
 */
int headerdetectio_close(CONCRETE_IO_HANDLE header_detect_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context)
{
    int result;

    if (header_detect_io == NULL)
    {
        result = __FAILURE__;
    }
    else
    {
        HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;

        if ((header_detect_io_instance->io_state == IO_STATE_NOT_OPEN) ||
            (header_detect_io_instance->io_state == IO_STATE_CLOSING))
        {
            result = __FAILURE__;
        }
        else
        {
            IO_STATE previous_state = header_detect_io_instance->io_state;

            /* the state has to be set before xio_close, the close complete callback checks it */
            header_detect_io_instance->io_state = IO_STATE_CLOSING;
            header_detect_io_instance->on_io_close_complete = on_io_close_complete;
            header_detect_io_instance->on_io_close_complete_context = callback_context;

            if (xio_close(header_detect_io_instance->underlying_io, on_underlying_io_close_complete, header_detect_io_instance) != 0)
            {
                /* the close never started: undo the state change so that the instance is not */
                /* stuck in CLOSING (which would reject every further close and open). The check */
                /* keeps a state set by a callback that ran during xio_close. */
                if (header_detect_io_instance->io_state == IO_STATE_CLOSING)
                {
                    header_detect_io_instance->io_state = previous_state;
                }

                result = __FAILURE__;
            }
            else
            {
                result = 0;
            }
        }
    }

    return result;
}

/*
 * Sends bytes through the underlying IO.
 *
 * header_detect_io - instance returned by headerdetectio_create.
 * buffer, size     - the bytes to send; handed unchanged to the underlying IO.
 * on_send_complete - completion callback, handed unchanged to the underlying IO.
 * callback_context - context for on_send_complete.
 *
 * Returns 0 when the underlying IO accepted the send, a non-zero value when the handle is NULL,
 * the instance is not in IO_STATE_OPEN or the underlying send failed.
 */
int headerdetectio_send(CONCRETE_IO_HANDLE header_detect_io, const void* buffer, size_t size, ON_SEND_COMPLETE on_send_complete, void* callback_context)
{
    HEADER_DETECT_IO_INSTANCE* instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;

    if (instance == NULL)
    {
        return __FAILURE__;
    }

    if (instance->io_state != IO_STATE_OPEN)
    {
        return __FAILURE__;
    }

    if (xio_send(instance->underlying_io, buffer, size, on_send_complete, callback_context) != 0)
    {
        return __FAILURE__;
    }

    return 0;
}

/*
 * Gives the underlying IO a chance to do its work.
 * Nothing is done for a NULL handle or while the instance is in IO_STATE_NOT_OPEN or IO_STATE_ERROR.
 */
void headerdetectio_dowork(CONCRETE_IO_HANDLE header_detect_io)
{
    HEADER_DETECT_IO_INSTANCE* instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;

    if (instance == NULL)
    {
        return;
    }

    switch (instance->io_state)
    {
    case IO_STATE_NOT_OPEN:
    case IO_STATE_ERROR:
        /* the underlying IO is not pumped in these states */
        break;

    default:
        xio_dowork(instance->underlying_io);
        break;
    }
}

/*
 * Sets an option by passing it on to the underlying IO.
 *
 * Returns the result of xio_setoption on the underlying IO, or a non-zero value when the handle
 * is NULL or the instance has no underlying IO.
 */
int headerdetectio_setoption(CONCRETE_IO_HANDLE header_detect_io, const char* optionName, const void* value)
{
    HEADER_DETECT_IO_INSTANCE* instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;
    int result;

    if ((instance == NULL) || (instance->underlying_io == NULL))
    {
        result = __FAILURE__;
    }
    else
    {
        /* this layer has no options of its own */
        result = xio_setoption(instance->underlying_io, optionName, value);
    }

    return result;
}

/* Option clone callback handed to OptionHandler_Create. */
/* No option is cloned here: both arguments are ignored and NULL is always returned. */
static void* headerdetectio_CloneOption(const char* name, const void* value)
{
    void* cloned_option = NULL;

    (void)value;
    (void)name;

    return cloned_option;
}

/* Option destroy callback handed to OptionHandler_Create. */
/* There is nothing to release: both arguments are ignored. */
static void headerdetectio_DestroyOption(const char* name, const void* value)
{
    (void)value;
    (void)name;
}

/*
 * Creates an option handler for this IO. The handle argument is not used and no options are
 * added to the handler. Returns the new handler, or NULL (after logging) when it could not be
 * created.
 */
static OPTIONHANDLER_HANDLE headerdetectio_retrieveoptions(CONCRETE_IO_HANDLE handle)
{
    OPTIONHANDLER_HANDLE option_handler = OptionHandler_Create(headerdetectio_CloneOption, headerdetectio_DestroyOption, headerdetectio_setoption);

    (void)handle;

    if (option_handler == NULL)
    {
        LogError("unable to OptionHandler_Create");
    }

    /*on success this is the place to add the options to the handler; there are none so far*/
    return option_handler;
}

/* Function table exposing this IO through the generic IO interface. */
/* The initializer is positional: the entries have to stay in the member order of */
/* IO_INTERFACE_DESCRIPTION (declared outside of this file). */
static const IO_INTERFACE_DESCRIPTION header_detect_io_interface_description =
{
    headerdetectio_retrieveoptions,
    headerdetectio_create,
    headerdetectio_destroy,
    headerdetectio_open,
    headerdetectio_close,
    headerdetectio_send,
    headerdetectio_dowork,
    headerdetectio_setoption
};

/* Returns the function table of the header detect IO; the table has static storage duration. */
const IO_INTERFACE_DESCRIPTION* headerdetectio_get_interface_description(void)
{
    const IO_INTERFACE_DESCRIPTION* interface_description = &header_detect_io_interface_description;

    return interface_description;
}