A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

cbs.c

Committer:
AzureIoTClient
Date:
2017-03-24
Revision:
21:f9c433d8e6ca
Parent:
19:000ab4e6a2c1
Child:
22:524bded3f7a8

File content as of revision 21:f9c433d8e6ca:

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

#include <stdlib.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include "azure_c_shared_utility/optimize_size.h"
#include "azure_c_shared_utility/gballoc.h"
#include "azure_c_shared_utility/singlylinkedlist.h"
#include "azure_uamqp_c/cbs.h"
#include "azure_uamqp_c/amqp_management.h"
#include "azure_uamqp_c/session.h"

typedef enum CBS_STATE_TAG
{
    CBS_STATE_CLOSED,
    CBS_STATE_OPENING,
    CBS_STATE_OPEN,
    CBS_STATE_ERROR
} CBS_STATE;

typedef struct CBS_OPERATION_TAG
{
    ON_CBS_OPERATION_COMPLETE on_cbs_operation_complete;
    void* on_cbs_operation_complete_context;
    SINGLYLINKEDLIST_HANDLE pending_operations;
} CBS_OPERATION;

typedef struct CBS_INSTANCE_TAG
{
    AMQP_MANAGEMENT_HANDLE amqp_management;
    CBS_STATE cbs_state;
    ON_CBS_OPEN_COMPLETE on_cbs_open_complete;
    void* on_cbs_open_complete_context;
    ON_CBS_ERROR on_cbs_error;
    void* on_cbs_error_context;
    SINGLYLINKEDLIST_HANDLE pending_operations;
} CBS_INSTANCE;

static int add_string_key_value_pair_to_map(AMQP_VALUE map, const char* key, const char* value)
{
    int result;

    AMQP_VALUE key_value = amqpvalue_create_string(key);
    if (key_value == NULL)
    {
        /* Codes_SRS_CBS_01_072: [ If constructing the message fails, `cbs_put_token_async` shall fail and return a non-zero value. ]*/
        LogError("Failed creating value for property key %s", key);
        result = __FAILURE__;
    }
    else
    {
        AMQP_VALUE value_value = amqpvalue_create_string(value);
        if (value_value == NULL)
        {
            /* Codes_SRS_CBS_01_072: [ If constructing the message fails, `cbs_put_token_async` shall fail and return a non-zero value. ]*/
            LogError("Failed creating value for property value %s", value);
            result = __FAILURE__;
        }
        else
        {
            if (amqpvalue_set_map_value(map, key_value, value_value) != 0)
            {
                /* Codes_SRS_CBS_01_072: [ If constructing the message fails, `cbs_put_token_async` shall fail and return a non-zero value. ]*/
                LogError("Failed inserting key/value pair in the map");
                result = __FAILURE__;
            }
            else
            {
                result = 0;
            }

            amqpvalue_destroy(value_value);
        }

        amqpvalue_destroy(key_value);
    }

    return result;
}

static void on_underlying_amqp_management_state_changed(void* context, AMQP_MANAGEMENT_STATE new_amqp_management_state, AMQP_MANAGEMENT_STATE previous_amqp_management_state)
{
    if (context == NULL)
    {
        /* Codes_SRS_CBS_01_068: [ When `on_amqp_management_state_changed` is called with NULL `context`, it shall do nothing. ]*/
        LogError("on_underlying_amqp_management_state_changed called with NULL context");
    }
    else
    {
        CBS_HANDLE cbs = (CBS_HANDLE)context;

        /* Codes_SRS_CBS_01_069: [ If there is a transition detected to a new state the following shall be performed: ]*/
        if (new_amqp_management_state != previous_amqp_management_state)
        {
            switch (cbs->cbs_state)
            {
            default:
                LogError("AMQP management state transition while CBS instance is in unknown state");
                break;

            case CBS_STATE_CLOSED:
                LogError("AMQP management state transition while CBS instance is CLOSED");
                break;

            case CBS_STATE_ERROR:
            {
                LogError("AMQP management state transition while CBS instance is in ERROR");
                break;
            }
            case CBS_STATE_OPENING:
            {
                switch (new_amqp_management_state)
                {
                default:
                    LogError("Unknown AMQP management state");
                case AMQP_MANAGEMENT_STATE_ERROR:
                    /* Codes_SRS_CBS_01_073: [ If CBS is opening and the new state is not `AMQP_MANAGEMENT_STATE_OPEN` the callback `on_cbs_open_complete` shall be called with `CBS_OPEN_ERROR` and the `on_cbs_open_complete_context` shall be passed as argument. ]*/
                    cbs->cbs_state = CBS_STATE_CLOSED;
                    cbs->on_cbs_open_complete(cbs->on_cbs_open_complete_context, CBS_OPEN_ERROR);
                    break;

                case AMQP_MANAGEMENT_STATE_OPENING:
                    /* do nothing */
                    break;

                case AMQP_MANAGEMENT_STATE_OPEN:
                    /* Codes_SRS_CBS_01_070: [ If CBS is opening and the new state is `AMQP_MANAGEMENT_STATE_OPEN` the callback `on_cbs_open_complete` shall be called with `CBS_OPEN_OK` and the `on_cbs_open_complete_context` shall be passed as argument. ]*/
                    cbs->cbs_state = CBS_STATE_OPEN;
                    cbs->on_cbs_open_complete(cbs->on_cbs_open_complete_context, CBS_OPEN_OK);
                    break;
                }
                break;
            }
            case CBS_STATE_OPEN:
            {
                switch (new_amqp_management_state)
                {
                default:
                    LogError("Unknown AMQP management state");
                    /* Codes_SRS_CBS_01_074: [ If CBS is open and the new state is not `AMQP_MANAGEMENT_STATE_OPEN` the callback `on_cbs_error` shall be called and the `on_cbs_error_context` shall be passed as argument. ]*/
                case AMQP_MANAGEMENT_STATE_ERROR:
                case AMQP_MANAGEMENT_STATE_OPENING:
                    cbs->cbs_state = CBS_STATE_ERROR;
                    cbs->on_cbs_error(cbs->on_cbs_error_context);
                    break;

                case AMQP_MANAGEMENT_STATE_OPEN:
                    LogError("Unexpected transition to OPEN while CBS is already OPEN");
                    break;
                }
                break;
            }
        }
        }
    }
}

static void on_amqp_management_operation_complete(void* context, OPERATION_RESULT operation_result, unsigned int status_code, const char* status_description)
{
    if (context == NULL)
    {
        /* Codes_SRS_CBS_01_091: [ When `on_amqp_management_operation_complete` is called with a NULL context it shall do nothing. ]*/
        LogError("on_amqp_management_operation_complete called with NULL context");
    }
    else
    {
        /* Codes_SRS_CBS_01_103: [ The `context` shall be used to obtain the pending operation information stored in the pending operations linked list by calling `singlylinkedlist_item_get_value`. ]*/
        CBS_OPERATION* cbs_operation = (CBS_OPERATION*)singlylinkedlist_item_get_value((LIST_ITEM_HANDLE)context);
        CBS_OPERATION_RESULT cbs_operation_result;

        if (cbs_operation == NULL)
        {
            LogError("NULL cbs_operation");
        }
        else
        {
            switch (operation_result)
            {
            default:
                cbs_operation_result = CBS_OPERATION_RESULT_CBS_ERROR;
                break;

            case OPERATION_RESULT_OPERATION_FAILED:
                /* Tests_SRS_CBS_01_094: [ When `on_amqp_management_operation_complete` is called with `OPERATION_RESULT_OPERATION_FAILED`, the associated cbs operation complete callback shall be called with `CBS_OPERATION_RESULT_OPERATION_FAILED` and passing the `on_cbs_put_token_complete_context` as the context argument. ]*/
                cbs_operation_result = CBS_OPERATION_RESULT_OPERATION_FAILED;
                break;

            case OPERATION_RESULT_ERROR:
                /* Tests_SRS_CBS_01_093: [ When `on_amqp_management_operation_complete` is called with `OPERATION_RESULT_ERROR`, the associated cbs operation complete callback shall be called with `CBS_OPERATION_RESULT_CBS_ERROR` and passing the `on_cbs_put_token_complete_context` as the context argument. ]*/
                cbs_operation_result = CBS_OPERATION_RESULT_CBS_ERROR;
                break;

            case OPERATION_RESULT_OK:
                /* Codes_SRS_CBS_01_092: [ When `on_amqp_management_operation_complete` is called with `OPERATION_RESULT_OK`, the associated cbs operation complete callback shall be called with `CBS_OPERATION_RESULT_OK` and passing the `on_cbs_put_token_complete_context` as the context argument. ]*/
                cbs_operation_result = CBS_OPERATION_RESULT_OK;
                break;
            }

            /* Codes_SRS_CBS_01_095: [ `status_code` and `status_description` shall be passed as they are to the cbs operation complete callback. ]*/
            /* Codes_SRS_CBS_01_014: [ The response message has the following application-properties: ]*/
            /* Codes_SRS_CBS_01_013: [ status-code	No	int	HTTP response code [RFC2616]. ]*/
            /* Codes_SRS_CBS_01_015: [ status-description	Yes	string	Description of the status. ]*/
            /* Codes_SRS_CBS_01_016: [ The body of the message MUST be empty. ]*/
            /* Codes_SRS_CBS_01_026: [ The response message has the following application-properties: ]*/
            /* Codes_SRS_CBS_01_027: [ status-code	Yes	int	HTTP response code [RFC2616]. ]*/
            /* Codes_SRS_CBS_01_028: [ status-description	No	string	Description of the status. ]*/
            /* Codes_SRS_CBS_01_029: [ The body of the message MUST be empty. ]*/
            cbs_operation->on_cbs_operation_complete(cbs_operation->on_cbs_operation_complete_context, cbs_operation_result, status_code, status_description);

            /* Codes_SRS_CBS_01_102: [ The pending operation shall be removed from the pending operations list by calling `singlylinkedlist_remove`. ]*/
            if (singlylinkedlist_remove(cbs_operation->pending_operations, (LIST_ITEM_HANDLE)context) != 0)
            {
                LogError("Failed removing operation from the pending list");
            }

            /* Codes_SRS_CBS_01_096: [ The `context` for the operation shall also be freed. ]*/
            free(cbs_operation);
        }
    }
}

CBS_HANDLE cbs_create(SESSION_HANDLE session)
{
    CBS_INSTANCE* result;

    if (session == NULL)
    {
        /* Codes_SRS_CBS_01_033: [** If `session` is NULL then `cbs_create` shall fail and return NULL. ]*/
        LogError("NULL session handle");
        result = NULL;
    }
    else
    {
        result = (CBS_INSTANCE*)malloc(sizeof(CBS_INSTANCE));
        if (result == NULL)
        {
            /* Codes_SRS_CBS_01_076: [ If allocating memory for the new handle fails, `cbs_create` shall fail and return NULL. ]*/
            LogError("Cannot allocate memory for cbs instance.");
        }
        else
        {
            /* Codes_SRS_CBS_01_097: [ `cbs_create` shall create a singly linked list for pending operations by calling `singlylinkedlist_create`. ]*/
            result->pending_operations = singlylinkedlist_create();
            if (result->pending_operations == NULL)
            {
                /* Codes_SRS_CBS_01_101: [ If `singlylinkedlist_create` fails, `cbs_create` shall fail and return NULL. ]*/
                LogError("Cannot allocate pending operations list.");
                free(result);
                result = NULL;
            }
            else
            {
                /* Codes_SRS_CBS_01_034: [ `cbs_create` shall create an AMQP management handle by calling `amqpmanagement_create`. ]*/
                /* Codes_SRS_CBS_01_002: [ Tokens are communicated between AMQP peers by sending specially-formatted AMQP messages to the Claims-based Security Node. ]*/
                /* Codes_SRS_CBS_01_003: [ The mechanism follows the scheme defined in the AMQP Management specification [AMQPMAN]. ]*/
                result->amqp_management = amqpmanagement_create(session, "$cbs", on_underlying_amqp_management_state_changed, result);
                if (result->amqp_management == NULL)
                {
                    free(result);
                    result = NULL;
                }
                else
                {
                    result->cbs_state = CBS_STATE_CLOSED;
                }
            }
        }
    }
    
    /* Codes_SRS_CBS_01_001: [ `cbs_create` shall create a new CBS instance and on success return a non-NULL handle to it. ]*/
    return result;
}

void cbs_destroy(CBS_HANDLE cbs)
{
    if (cbs == NULL)
    {
        /* Codes_SRS_CBS_01_037: [ If `cbs` is NULL, `cbs_destroy` shall do nothing. ]*/
        LogError("NULL cbs handle");
    }
    else
    {
        LIST_ITEM_HANDLE first_pending_operation;

        /* Codes_SRS_CBS_01_100: [ If the CBS instance is not closed, all actions performed by `cbs_close` shall be performed. ]*/
        if (cbs->cbs_state != CBS_STATE_CLOSED)
        {
            (void)amqpmanagement_close(cbs->amqp_management);
        }

        /* Codes_SRS_CBS_01_036: [ `cbs_destroy` shall free all resources associated with the handle `cbs`. ]*/
        /* Codes_SRS_CBS_01_038: [ `cbs_destroy` shall free the AMQP management handle created in `cbs_create` by calling `amqpmanagement_destroy`. ]*/
        amqpmanagement_destroy(cbs->amqp_management);

        /* Codes_SRS_CBS_01_099: [ All pending operations shall be freed. ]*/
        while ((first_pending_operation = singlylinkedlist_get_head_item(cbs->pending_operations)) != NULL)
        {
            CBS_OPERATION* pending_operation = (CBS_OPERATION*)singlylinkedlist_item_get_value(first_pending_operation);
            if (pending_operation != NULL)
            {
                pending_operation->on_cbs_operation_complete(pending_operation->on_cbs_operation_complete_context, CBS_OPERATION_RESULT_BECAUSE_DESTROY, 0, NULL);
                free(pending_operation);
            }

            singlylinkedlist_remove(cbs->pending_operations, first_pending_operation);
        }

        /* Codes_SRS_CBS_01_098: [ `cbs_destroy` shall free the pending operations list by calling `singlylinkedlist_destroy`. ]*/
        singlylinkedlist_destroy(cbs->pending_operations);
        free(cbs);
    }
}

int cbs_open_async(CBS_HANDLE cbs, ON_CBS_OPEN_COMPLETE on_cbs_open_complete, void* on_cbs_open_complete_context, ON_CBS_ERROR on_cbs_error, void* on_cbs_error_context)
{
    int result;

    /* Codes_SRS_CBS_01_042: [ `on_cbs_open_complete_context` and `on_cbs_error_context` shall be allowed to be NULL. ]*/
    if ((cbs == NULL) ||
        (on_cbs_open_complete == NULL) ||
        (on_cbs_error == NULL))
    {
        /* Codes_SRS_CBS_01_040: [ If any of the arguments `cbs`, `on_cbs_open_complete` or `on_cbs_error` is NULL, then `cbs_open_async` shall fail and return a non-zero value. ]*/
        LogError("Bad arguments: cbs = %p, on_cbs_open_complete = %p, on_cbs_error = %p",
            cbs, on_cbs_open_complete, on_cbs_error);
        result = __FAILURE__;
    }
    else if (cbs->cbs_state != CBS_STATE_CLOSED)
    {
        /* Codes_SRS_CBS_01_043: [ `cbs_open_async` while already open or opening shall fail and return a non-zero value. ]*/
        LogError("cbs instance already open");
        result = __FAILURE__;
    }
    else
    {
        /* Codes_SRS_CBS_01_078: [ The cbs instance shall be considered OPENING until the `on_amqp_management_state_changed` is called by the AMQP management instance indicating that the open has completed (succesfull or not). ]*/
        cbs->cbs_state = CBS_STATE_OPENING;
        cbs->on_cbs_open_complete = on_cbs_open_complete;
        cbs->on_cbs_open_complete_context = on_cbs_open_complete_context;
        cbs->on_cbs_error = on_cbs_error;
        cbs->on_cbs_error_context = on_cbs_error_context;

        /* Codes_SRS_CBS_01_039: [ `cbs_open_async` shall open the cbs communication by calling `amqpmanagement_open` on the AMQP management handle created in `cbs_create`. ]*/
        if (amqpmanagement_open(cbs->amqp_management) != 0)
        {
            /* Codes_SRS_CBS_01_041: [ If `amqpmanagement_open` fails, shall fail and return a non-zero value. ]*/
            result = __FAILURE__;
        }
        else
        {
            /* Codes_SRS_CBS_01_077: [ On success, `cbs_open_async` shall return 0. ]*/
            result = 0;
        }
    }

    return result;
}

int cbs_close(CBS_HANDLE cbs)
{
    int result;

    if (cbs == NULL)
    {
        /* Codes_SRS_CBS_01_045: [ If the argument `cbs` is NULL, `cbs_close` shall fail and return a non-zero value. ]*/
        LogError("NULL cbs handle");
        result = __FAILURE__;
    }
    else if (cbs->cbs_state == CBS_STATE_CLOSED)
    {
        /* Codes_SRS_CBS_01_047: [ `cbs_close` when closed shall fail and return a non-zero value. ]*/
        /* Codes_SRS_CBS_01_048: [ `cbs_close` when not opened shall fail and return a non-zero value. ]*/
        LogError("Already closed");
        result = __FAILURE__;
    }
    else
    {
        /* Codes_SRS_CBS_01_044: [ `cbs_close` shall close the CBS instance by calling `amqpmanagement_close` on the underlying AMQP management handle. ]*/
        if (amqpmanagement_close(cbs->amqp_management) != 0)
        {
            /* Codes_SRS_CBS_01_046: [ If `amqpmanagement_close` fails, `cbs_close` shall fail and return a non-zero value. ]*/
            LogError("Failed closing AMQP management instance");
            result = __FAILURE__;
        }
        else
        {
            if (cbs->cbs_state == CBS_STATE_OPENING)
            {
                /* Codes_SRS_CBS_01_079: [ If `cbs_close` is called while OPENING, the `on_cbs_open_complete` callback shall be called with `CBS_OPEN_CANCELLED`, while passing the `on_cbs_open_complete_context` as argument. ]*/
                cbs->on_cbs_open_complete(cbs->on_cbs_open_complete_context, CBS_OPEN_CANCELLED);
            }

            cbs->cbs_state = CBS_STATE_CLOSED;

            /* Codes_SRS_CBS_01_080: [ On success, `cbs_close` shall return 0. ]*/
            result = 0;
        }
    }

    return result;
}

int cbs_put_token_async(CBS_HANDLE cbs, const char* type, const char* audience, const char* token, ON_CBS_OPERATION_COMPLETE on_cbs_put_token_complete, void* on_cbs_put_token_complete_context)
{
    int result;

    /* Codes_SRS_CBS_01_083: [ `on_cbs_put_token_complete_context` shall be allowed to be NULL. ]*/
    if ((cbs == NULL) ||
        (type == NULL) ||
        (audience == NULL) ||
        (token == NULL) ||
        (on_cbs_put_token_complete == NULL))
    {
        /* Codes_SRS_CBS_01_050: [ If any of the arguments `cbs`, `type`, `audience`, `token` or `on_cbs_put_token_complete` is NULL `cbs_put_token_async` shall fail and return a non-zero value. ]*/
        LogError("Bad arguments: cbs = %p, type = %p, audience = %p, token = %p, on_cbs_put_token_complete = %p",
            cbs, type, audience, token, on_cbs_put_token_complete);
        result = __FAILURE__;
    }
    else if ((cbs->cbs_state == CBS_STATE_CLOSED) ||
        (cbs->cbs_state == CBS_STATE_ERROR))
    {
        /* Codes_SRS_CBS_01_058: [ If `cbs_put_token_async` is called when the CBS instance is not yet open or in error, it shall fail and return a non-zero value. ]*/
        LogError("put token called while closed or in error");
        result = __FAILURE__;
    }
    else
    {
        /* Codes_SRS_CBS_01_049: [ `cbs_put_token_async` shall construct a request message for the `put-token` operation. ]*/
        MESSAGE_HANDLE message = message_create();
        if (message == NULL)
        {
            /* Codes_SRS_CBS_01_072: [ If constructing the message fails, `cbs_put_token_async` shall fail and return a non-zero value. ]*/
            LogError("message_create failed");
            result = __FAILURE__;
        }
        else
        {
            AMQP_VALUE token_value = amqpvalue_create_string(token);
            if (token_value == NULL)
            {
                /* Codes_SRS_CBS_01_072: [ If constructing the message fails, `cbs_put_token_async` shall fail and return a non-zero value. ]*/
                LogError("Failed creating token AMQP value");
                result = __FAILURE__;
            }
            else
            {
                /* Codes_SRS_CBS_01_009: [ The body of the message MUST contain the token. ]*/
                if (message_set_body_amqp_value(message, token_value) != 0)
                {
                    /* Codes_SRS_CBS_01_072: [ If constructing the message fails, `cbs_put_token_async` shall fail and return a non-zero value. ]*/
                    LogError("Failed setting the token in the message body");
                    result = __FAILURE__;
                }
                else
                {
                    AMQP_VALUE application_properties = amqpvalue_create_map();
                    if (application_properties == NULL)
                    {
                        /* Codes_SRS_CBS_01_072: [ If constructing the message fails, `cbs_put_token_async` shall fail and return a non-zero value. ]*/
                        LogError("Failed creating application properties map");
                        result = __FAILURE__;
                    }
                    else
                    {
                        if (add_string_key_value_pair_to_map(application_properties, "name", audience) != 0)
                        {
                            result = __FAILURE__;
                        }
                        else
                        {
                            if (message_set_application_properties(message, application_properties) != 0)
                            {
                                /* Codes_SRS_CBS_01_072: [ If constructing the message fails, `cbs_put_token_async` shall fail and return a non-zero value. ]*/
                                LogError("Failed setting message application properties");
                                result = __FAILURE__;
                            }
                            else
                            {
                                CBS_OPERATION* cbs_operation = (CBS_OPERATION*)malloc(sizeof(CBS_OPERATION));
                                if (cbs_operation == NULL)
                                {
                                    LogError("Failed allocating CBS operation instance");
                                    result = __FAILURE__;
                                }
                                else
                                {
                                    LIST_ITEM_HANDLE list_item;

                                    cbs_operation->on_cbs_operation_complete = on_cbs_put_token_complete;
                                    cbs_operation->on_cbs_operation_complete_context = on_cbs_put_token_complete_context;
                                    cbs_operation->pending_operations = cbs->pending_operations;

                                    list_item = singlylinkedlist_add(cbs->pending_operations, cbs_operation);
                                    if (list_item == NULL)
                                    {
                                        free(cbs_operation);
                                        LogError("Failed adding pending operation to list");
                                        result = __FAILURE__;
                                    }
                                    else
                                    {
                                        /* Codes_SRS_CBS_01_051: [ `cbs_put_token_async` shall start the AMQP management operation by calling `amqpmanagement_start_operation`, while passing to it: ]*/
                                        /* Codes_SRS_CBS_01_052: [ The `amqp_management` argument shall be the one for the AMQP management instance created in `cbs_create`. ]*/
                                        /* Codes_SRS_CBS_01_053: [ The `operation` argument shall be `put-token`. ]*/
                                        /* Codes_SRS_CBS_01_054: [ The `type` argument shall be set to the `type` argument. ]*/
                                        /* Codes_SRS_CBS_01_055: [ The `locales` argument shall be set to NULL. ]*/
                                        /* Codes_SRS_CBS_01_056: [ The `message` argument shall be the message constructed earlier according to the CBS spec. ]*/
                                        /* Codes_SRS_CBS_01_057: [ The arguments `on_operation_complete` and `context` shall be set to a callback that is to be called by the AMQP management module when the operation is complete. ]*/
                                        /* Codes_SRS_CBS_01_005: [ operation	No	string	"put-token" ]*/
                                        /* Codes_SRS_CBS_01_006: [ Type	No	string	The type of the token being put, e.g., "amqp:jwt". ]*/
                                        /* Codes_SRS_CBS_01_007: [ name	No	string	The "audience" to which the token applies. ]*/
                                        if (amqpmanagement_start_operation(cbs->amqp_management, "put-token", type, NULL, message, on_amqp_management_operation_complete, list_item) != 0)
                                        {
                                            singlylinkedlist_remove(cbs->pending_operations, list_item);
                                            free(cbs_operation);
                                            /* Codes_SRS_CBS_01_084: [ If `amqpmanagement_start_operation` fails `cbs_put_token_async` shall fail and return a non-zero value. ]*/
                                            LogError("Failed starting AMQP management operation");
                                            result = __FAILURE__;
                                        }
                                        else
                                        {
                                            /* Codes_SRS_CBS_01_081: [ On success `cbs_put_token_async` shall return 0. ]*/
                                            result = 0;
                                        }
                                    }
                                }
                            }
                        }

                        amqpvalue_destroy(application_properties);
                    }

                    amqpvalue_destroy(token_value);
                }
            }

            message_destroy(message);
        }
    }

    return result;
}

int cbs_delete_token_async(CBS_HANDLE cbs, const char* type, const char* audience, ON_CBS_OPERATION_COMPLETE on_cbs_delete_token_complete, void* on_cbs_delete_token_complete_context)
{
    int result;

    /* Codes_SRS_CBS_01_086: [ `on_cbs_delete_token_complete_context` shall be allowed to be NULL. ]*/
    if ((cbs == NULL) ||
        (type == NULL) ||
        (audience == NULL) ||
        (on_cbs_delete_token_complete == NULL))
    {
        /* Codes_SRS_CBS_01_060: [ If any of the arguments `cbs`, `type`, `audience` or `on_cbs_delete_token_complete` is NULL `cbs_put_token_async` shall fail and return a non-zero value. ]*/
        LogError("Bad arguments: cbs = %p, type = %p, audience = %p, on_cbs_delete_token_complete = %p",
            cbs, type, audience, on_cbs_delete_token_complete);
        result = __FAILURE__;
    }
    else if ((cbs->cbs_state == CBS_STATE_CLOSED) ||
        (cbs->cbs_state == CBS_STATE_ERROR))
    {
        /* Codes_SRS_CBS_01_067: [ If `cbs_delete_token_async` is called when the CBS instance is not yet open or in error, it shall fail and return a non-zero value. ]*/
        LogError("put token called while closed or in error");
        result = __FAILURE__;
    }
    else
    {
        /* Codes_SRS_CBS_01_025: [ The body of the message MUST be empty. ]*/
        /* Codes_SRS_CBS_01_059: [ `cbs_delete_token_async` shall construct a request message for the `delete-token` operation. ]*/
        MESSAGE_HANDLE message = message_create();
        if (message == NULL)
        {
            /* Codes_SRS_CBS_01_071: [ If constructing the message fails, `cbs_delete_token_async` shall fail and return a non-zero value. ]*/
            LogError("message_create failed");
            result = __FAILURE__;
        }
        else
        {
            AMQP_VALUE application_properties = amqpvalue_create_map();
            if (application_properties == NULL)
            {
                /* Codes_SRS_CBS_01_071: [ If constructing the message fails, `cbs_delete_token_async` shall fail and return a non-zero value. ]*/
                LogError("Failed creating application properties map");
                result = __FAILURE__;
            }
            else
            {
                if (add_string_key_value_pair_to_map(application_properties, "name", audience) != 0)
                {
                    result = __FAILURE__;
                }
                else
                {
                    /* Codes_SRS_CBS_01_021: [ The request message has the following application-properties: ]*/
                    if (message_set_application_properties(message, application_properties) != 0)
                    {
                        /* Codes_SRS_CBS_01_071: [ If constructing the message fails, `cbs_delete_token_async` shall fail and return a non-zero value. ]*/
                        LogError("Failed setting message application properties");
                        result = __FAILURE__;
                    }
                    else
                    {
                        CBS_OPERATION* cbs_operation = (CBS_OPERATION*)malloc(sizeof(CBS_OPERATION));
                        if (cbs_operation == NULL)
                        {
                            LogError("Failed allocating CBS operation instance");
                            result = __FAILURE__;
                        }
                        else
                        {
                            LIST_ITEM_HANDLE list_item;

                            cbs_operation->on_cbs_operation_complete = on_cbs_delete_token_complete;
                            cbs_operation->on_cbs_operation_complete_context = on_cbs_delete_token_complete_context;
                            cbs_operation->pending_operations = cbs->pending_operations;

                            list_item = singlylinkedlist_add(cbs->pending_operations, cbs_operation);
                            if (list_item == NULL)
                            {
                                free(cbs_operation);
                                LogError("Failed adding pending operation to list");
                                result = __FAILURE__;
                            }
                            else
                            {
                                /* Codes_SRS_CBS_01_061: [ `cbs_delete_token_async` shall start the AMQP management operation by calling `amqpmanagement_start_operation`, while passing to it: ]*/
                                /* Codes_SRS_CBS_01_085: [ The `amqp_management` argument shall be the one for the AMQP management instance created in `cbs_create`. ]*/
                                /* Codes_SRS_CBS_01_062: [ The `operation` argument shall be `delete-token`. ]*/
                                /* Codes_SRS_CBS_01_063: [ The `type` argument shall be set to the `type` argument. ]*/
                                /* Codes_SRS_CBS_01_064: [ The `locales` argument shall be set to NULL. ]*/
                                /* Codes_SRS_CBS_01_065: [ The `message` argument shall be the message constructed earlier according to the CBS spec. ]*/
                                /* Codes_SRS_CBS_01_066: [ The arguments `on_operation_complete` and `context` shall be set to a callback that is to be called by the AMQP management module when the operation is complete. ]*/
                                /* Codes_SRS_CBS_01_020: [ To instruct a peer to delete a token associated with a specific audience, a "delete-token" message can be sent to the CBS Node ]*/
                                /* Codes_SRS_CBS_01_022: [ operation	Yes	string	"delete-token" ]*/
                                /* Codes_SRS_CBS_01_023: [ Type	Yes	string	The type of the token being deleted, e.g., "amqp:jwt". ]*/
                                /* Codes_SRS_CBS_01_024: [ name	Yes	string	The "audience" of the token being deleted. ]*/
                                if (amqpmanagement_start_operation(cbs->amqp_management, "delete-token", type, NULL, message, on_amqp_management_operation_complete, list_item) != 0)
                                {
                                    /* Codes_SRS_CBS_01_087: [ If `amqpmanagement_start_operation` fails `cbs_put_token_async` shall fail and return a non-zero value. ]*/
                                    singlylinkedlist_remove(cbs->pending_operations, list_item);
                                    free(cbs_operation);
                                    LogError("Failed starting AMQP management operation");
                                    result = __FAILURE__;
                                }
                                else
                                {
                                    /* Codes_SRS_CBS_01_082: [ On success `cbs_delete_token_async` shall return 0. ]*/
                                    result = 0;
                                }
                            }
                        }
                    }
                }

                amqpvalue_destroy(application_properties);
            }

            message_destroy(message);
        }
    }
    return result;
}

int cbs_set_trace(CBS_HANDLE cbs, bool trace_on)
{
    int result;

    if (cbs == NULL)
    {
        /* Codes_SRS_CBS_01_090: [ If the argument `cbs` is NULL, `cbs_set_trace` shall fail and return a non-zero value. ]*/
        LogError("NULL cbs handle");
        result = __FAILURE__;
    }
    else
    {
        /* Codes_SRS_CBS_01_088: [ `cbs_set_trace` shall enable or disable tracing by calling `amqpmanagement_set_trace` to pass down the `trace_on` value. ]*/
        amqpmanagement_set_trace(cbs->amqp_management, trace_on);

        /* Codes_SRS_CBS_01_089: [ On success, `cbs_set_trace` shall return 0. ]*/
        result = 0;
    }

    return result;
}