A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
6:641a9672db08
Parent:
5:ae49385aff34
Child:
7:9e9ab3b0efef
--- a/session.c	Fri Jul 01 10:42:48 2016 -0700
+++ b/session.c	Fri Jul 29 15:58:39 2016 -0700
@@ -46,9 +46,12 @@
 	handle handle_max;
 	uint32_t remote_incoming_window;
 	uint32_t remote_outgoing_window;
-	unsigned char is_underlying_connection_open : 1;
+	int is_underlying_connection_open : 1;
 } SESSION_INSTANCE;
 
+#define UNDERLYING_CONNECTION_NOT_OPEN 0
+#define UNDERLYING_CONNECTION_OPEN -1
+
 static void session_set_state(SESSION_INSTANCE* session_instance, SESSION_STATE session_state)
 {
 	uint64_t i;
@@ -664,7 +667,7 @@
 			result->remote_incoming_window = 0;
 			result->remote_outgoing_window = 0;
 			result->previous_session_state = SESSION_STATE_UNMAPPED;
-			result->is_underlying_connection_open = 0;
+			result->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN;
 			result->session_state = SESSION_STATE_UNMAPPED;
 			result->on_link_attached = on_link_attached;
 			result->on_link_attached_callback_context = callback_context;
@@ -713,7 +716,7 @@
 			result->remote_incoming_window = 0;
 			result->remote_outgoing_window = 0;
 			result->previous_session_state = SESSION_STATE_UNMAPPED;
-			result->is_underlying_connection_open = 0;
+			result->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN;
 			result->session_state = SESSION_STATE_UNMAPPED;
 			result->on_link_attached = on_link_attached;
 			result->on_link_attached_callback_context = callback_context;
@@ -769,12 +772,12 @@
 			{
 				if (connection_open(session_instance->connection) != 0)
 				{
-					session_instance->is_underlying_connection_open = 0;
+					session_instance->is_underlying_connection_open = UNDERLYING_CONNECTION_NOT_OPEN;
 					result = __LINE__;
 				}
 				else
 				{
-					session_instance->is_underlying_connection_open = 1;
+					session_instance->is_underlying_connection_open = UNDERLYING_CONNECTION_OPEN;
 					result = 0;
 				}
 			}
@@ -1268,7 +1271,6 @@
 	}
 	else
 	{
-		LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint;
 		AMQP_VALUE disposition_performative_value = amqpvalue_create_disposition(disposition);
 		if (disposition_performative_value == NULL)
 		{
@@ -1358,207 +1360,220 @@
 		}
 		else
 		{
-			uint32_t payload_size = 0;
+			size_t payload_size = 0;
 			size_t i;
 
 			for (i = 0; i < payload_count; i++)
 			{
-				payload_size += payloads[i].length;
-			}
+                if ((payloads[i].length > UINT32_MAX) ||
+                    (payload_size + payloads[i].length < payload_size))
+                {
+                    break;
+                }
+
+                payload_size += payloads[i].length;
+            }
 
-			if (session_instance->remote_incoming_window == 0)
-			{
-				result = SESSION_SEND_TRANSFER_BUSY;
-			}
-			else
-			{
-				/* Codes_SRS_SESSION_01_012: [The session endpoint assigns each outgoing transfer frame an implicit transfer-id from a session scoped sequence.] */
-				/* Codes_SRS_SESSION_01_027: [sending a transfer Upon sending a transfer, the sending endpoint will increment its next-outgoing-id] */
-				*delivery_id = session_instance->next_outgoing_id;
-				if ((transfer_set_handle(transfer, link_endpoint_instance->output_handle) != 0) ||
-					(transfer_set_delivery_id(transfer, *delivery_id) != 0) ||
-                    (transfer_set_more(transfer, false) != 0))
-				{
-					/* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
-					result = SESSION_SEND_TRANSFER_ERROR;
-				}
-				else
-				{
-					AMQP_VALUE transfer_value;
+            if ((i < payload_count) ||
+                (payload_size > UINT32_MAX))
+            {
+                result = SESSION_SEND_TRANSFER_ERROR;
+            }
+            else
+            {
+                if (session_instance->remote_incoming_window == 0)
+                {
+                    result = SESSION_SEND_TRANSFER_BUSY;
+                }
+                else
+                {
+                    /* Codes_SRS_SESSION_01_012: [The session endpoint assigns each outgoing transfer frame an implicit transfer-id from a session scoped sequence.] */
+                    /* Codes_SRS_SESSION_01_027: [sending a transfer Upon sending a transfer, the sending endpoint will increment its next-outgoing-id] */
+                    *delivery_id = session_instance->next_outgoing_id;
+                    if ((transfer_set_handle(transfer, link_endpoint_instance->output_handle) != 0) ||
+                        (transfer_set_delivery_id(transfer, *delivery_id) != 0) ||
+                        (transfer_set_more(transfer, false) != 0))
+                    {
+                        /* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
+                        result = SESSION_SEND_TRANSFER_ERROR;
+                    }
+                    else
+                    {
+                        AMQP_VALUE transfer_value;
 
-					transfer_value = amqpvalue_create_transfer(transfer);
-					if (transfer_value == NULL)
-					{
-						/* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
-						result = SESSION_SEND_TRANSFER_ERROR;
-					}
-					else
-					{
-						uint32_t available_frame_size;
-						size_t encoded_size;
-
-						if ((connection_get_remote_max_frame_size(session_instance->connection, &available_frame_size) != 0) ||
-							(amqpvalue_get_encoded_size(transfer_value, &encoded_size) != 0))
-						{
-							result = SESSION_SEND_TRANSFER_ERROR;
-						}
-						else
-						{
-							uint32_t payload_size = 0;
-							size_t i;
+                        transfer_value = amqpvalue_create_transfer(transfer);
+                        if (transfer_value == NULL)
+                        {
+                            /* Codes_SRS_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
+                            result = SESSION_SEND_TRANSFER_ERROR;
+                        }
+                        else
+                        {
+                            uint32_t available_frame_size;
+                            size_t encoded_size;
 
-							for (i = 0; i < payload_count; i++)
-							{
-								payload_size += payloads[i].length;
-							}
-
-							available_frame_size -= encoded_size;
-							available_frame_size -= 8;
+                            if ((connection_get_remote_max_frame_size(session_instance->connection, &available_frame_size) != 0) ||
+                                (amqpvalue_get_encoded_size(transfer_value, &encoded_size) != 0))
+                            {
+                                result = SESSION_SEND_TRANSFER_ERROR;
+                            }
+                            else
+                            {
+                                payload_size = 0;
 
-							if (available_frame_size >= payload_size)
-							{
-								/* Codes_SRS_SESSION_01_055: [The encoding of the frame shall be done by calling connection_encode_frame and passing as arguments: the connection handle associated with the session, the transfer performative and the payload chunks passed to session_send_transfer.] */
-								if (connection_encode_frame(session_instance->endpoint, transfer_value, payloads, payload_count, on_send_complete, callback_context) != 0)
-								{
-									/* Codes_SRS_SESSION_01_056: [If connection_encode_frame fails then session_send_transfer shall fail and return a non-zero value.] */
-									result = SESSION_SEND_TRANSFER_ERROR;
-								}
-								else
-								{
-									/* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */
-									session_instance->next_outgoing_id++;
-									session_instance->remote_incoming_window--;
-									session_instance->outgoing_window--;
+                                for (i = 0; i < payload_count; i++)
+                                {
+                                    payload_size += payloads[i].length;
+                                }
+
+                                available_frame_size -= (uint32_t)encoded_size;
+                                available_frame_size -= 8;
 
-									/* Codes_SRS_SESSION_01_053: [On success, session_send_transfer shall return 0.] */
-									result = SESSION_SEND_TRANSFER_OK;
-								}
-							}
-							else
-							{
-								size_t current_payload_index = 0;
-								uint32_t current_payload_pos = 0;
+                                if (available_frame_size >= payload_size)
+                                {
+                                    /* Codes_SRS_SESSION_01_055: [The encoding of the frame shall be done by calling connection_encode_frame and passing as arguments: the connection handle associated with the session, the transfer performative and the payload chunks passed to session_send_transfer.] */
+                                    if (connection_encode_frame(session_instance->endpoint, transfer_value, payloads, payload_count, on_send_complete, callback_context) != 0)
+                                    {
+                                        /* Codes_SRS_SESSION_01_056: [If connection_encode_frame fails then session_send_transfer shall fail and return a non-zero value.] */
+                                        result = SESSION_SEND_TRANSFER_ERROR;
+                                    }
+                                    else
+                                    {
+                                        /* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */
+                                        session_instance->next_outgoing_id++;
+                                        session_instance->remote_incoming_window--;
+                                        session_instance->outgoing_window--;
 
-								/* break it down into different deliveries */
-								while (payload_size > 0)
-								{
-									uint32_t transfer_frame_payload_count = 0;
-									uint32_t current_transfer_frame_payload_size = payload_size;
-									uint32_t byte_counter;
-									size_t temp_current_payload_index = current_payload_index;
-									uint32_t temp_current_payload_pos = current_payload_pos;
-									AMQP_VALUE multi_transfer_amqp_value;
-									bool more;
+                                        /* Codes_SRS_SESSION_01_053: [On success, session_send_transfer shall return 0.] */
+                                        result = SESSION_SEND_TRANSFER_OK;
+                                    }
+                                }
+                                else
+                                {
+                                    size_t current_payload_index = 0;
+                                    uint32_t current_payload_pos = 0;
 
-									if (current_transfer_frame_payload_size > available_frame_size)
-									{
-										current_transfer_frame_payload_size = available_frame_size;
-									}
+                                    /* break it down into different deliveries */
+                                    while (payload_size > 0)
+                                    {
+                                        uint32_t transfer_frame_payload_count = 0;
+                                        uint32_t current_transfer_frame_payload_size = (uint32_t)payload_size;
+                                        uint32_t byte_counter;
+                                        size_t temp_current_payload_index = current_payload_index;
+                                        uint32_t temp_current_payload_pos = current_payload_pos;
+                                        AMQP_VALUE multi_transfer_amqp_value;
+                                        bool more;
+
+                                        if (current_transfer_frame_payload_size > available_frame_size)
+                                        {
+                                            current_transfer_frame_payload_size = available_frame_size;
+                                        }
 
-									if (available_frame_size >= payload_size)
-									{
-										more = false;
-									}
-									else
-									{
-										more = true;
-									}
+                                        if (available_frame_size >= payload_size)
+                                        {
+                                            more = false;
+                                        }
+                                        else
+                                        {
+                                            more = true;
+                                        }
 
-									if (transfer_set_more(transfer, more) != 0)
-									{
-										break;
-									}
+                                        if (transfer_set_more(transfer, more) != 0)
+                                        {
+                                            break;
+                                        }
 
-									multi_transfer_amqp_value = amqpvalue_create_transfer(transfer);
-									if (multi_transfer_amqp_value == NULL)
-									{
-										break;
-									}
+                                        multi_transfer_amqp_value = amqpvalue_create_transfer(transfer);
+                                        if (multi_transfer_amqp_value == NULL)
+                                        {
+                                            break;
+                                        }
 
-									byte_counter = current_transfer_frame_payload_size;
-									while (byte_counter > 0)
-									{
-										if (payloads[temp_current_payload_index].length - temp_current_payload_pos >= byte_counter)
-										{
-											/* more data than we need */
-											temp_current_payload_pos += byte_counter;
-											byte_counter = 0;
-										}
-										else
-										{
-											byte_counter -= payloads[temp_current_payload_index].length - temp_current_payload_pos;
-											temp_current_payload_index++;
-											temp_current_payload_pos = 0;
-										}
-									}
+                                        byte_counter = current_transfer_frame_payload_size;
+                                        while (byte_counter > 0)
+                                        {
+                                            if (payloads[temp_current_payload_index].length - temp_current_payload_pos >= byte_counter)
+                                            {
+                                                /* more data than we need */
+                                                temp_current_payload_pos += byte_counter;
+                                                byte_counter = 0;
+                                            }
+                                            else
+                                            {
+                                                byte_counter -= (uint32_t)payloads[temp_current_payload_index].length - temp_current_payload_pos;
+                                                temp_current_payload_index++;
+                                                temp_current_payload_pos = 0;
+                                            }
+                                        }
 
-									transfer_frame_payload_count = temp_current_payload_index - current_payload_index + 1;
-									PAYLOAD* transfer_frame_payloads = (PAYLOAD*)amqpalloc_malloc(transfer_frame_payload_count * sizeof(PAYLOAD));
-									if (transfer_frame_payloads == NULL)
-									{
-										amqpvalue_destroy(multi_transfer_amqp_value);
-										break;
-									}
+                                        transfer_frame_payload_count = (uint32_t)(temp_current_payload_index - current_payload_index + 1);
+                                        PAYLOAD* transfer_frame_payloads = (PAYLOAD*)amqpalloc_malloc(transfer_frame_payload_count * sizeof(PAYLOAD));
+                                        if (transfer_frame_payloads == NULL)
+                                        {
+                                            amqpvalue_destroy(multi_transfer_amqp_value);
+                                            break;
+                                        }
 
-									/* copy data */
-									byte_counter = current_transfer_frame_payload_size;
-									transfer_frame_payload_count = 0;
+                                        /* copy data */
+                                        byte_counter = current_transfer_frame_payload_size;
+                                        transfer_frame_payload_count = 0;
 
-									while (byte_counter > 0)
-									{
-										if (payloads[current_payload_index].length - current_payload_pos > byte_counter)
-										{
-											/* more data than we need */
-											transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos;
-											transfer_frame_payloads[transfer_frame_payload_count].length = byte_counter;
-											current_payload_pos += byte_counter;
-											byte_counter = 0;
-										}
-										else
-										{
-											/* copy entire payload and move to the next */
-											transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos;
-											transfer_frame_payloads[transfer_frame_payload_count].length = payloads[current_payload_index].length - current_payload_pos;
-											byte_counter -= payloads[current_payload_index].length - current_payload_pos;
-											current_payload_index++;
-											current_payload_pos = 0;
-										}
+                                        while (byte_counter > 0)
+                                        {
+                                            if (payloads[current_payload_index].length - current_payload_pos > byte_counter)
+                                            {
+                                                /* more data than we need */
+                                                transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos;
+                                                transfer_frame_payloads[transfer_frame_payload_count].length = byte_counter;
+                                                current_payload_pos += byte_counter;
+                                                byte_counter = 0;
+                                            }
+                                            else
+                                            {
+                                                /* copy entire payload and move to the next */
+                                                transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos;
+                                                transfer_frame_payloads[transfer_frame_payload_count].length = payloads[current_payload_index].length - current_payload_pos;
+                                                byte_counter -= (uint32_t)payloads[current_payload_index].length - current_payload_pos;
+                                                current_payload_index++;
+                                                current_payload_pos = 0;
+                                            }
 
-										transfer_frame_payload_count++;
-									}
+                                            transfer_frame_payload_count++;
+                                        }
 
-									if (connection_encode_frame(session_instance->endpoint, multi_transfer_amqp_value, transfer_frame_payloads, transfer_frame_payload_count, on_send_complete, callback_context) != 0)
-									{
-										amqpalloc_free(transfer_frame_payloads);
-										amqpvalue_destroy(multi_transfer_amqp_value);
-										break;
-									}
+                                        if (connection_encode_frame(session_instance->endpoint, multi_transfer_amqp_value, transfer_frame_payloads, transfer_frame_payload_count, on_send_complete, callback_context) != 0)
+                                        {
+                                            amqpalloc_free(transfer_frame_payloads);
+                                            amqpvalue_destroy(multi_transfer_amqp_value);
+                                            break;
+                                        }
 
-									amqpalloc_free(transfer_frame_payloads);
-									amqpvalue_destroy(multi_transfer_amqp_value);
-									payload_size -= current_transfer_frame_payload_size;
-								}
+                                        amqpalloc_free(transfer_frame_payloads);
+                                        amqpvalue_destroy(multi_transfer_amqp_value);
+                                        payload_size -= current_transfer_frame_payload_size;
+                                    }
 
-								if (payload_size > 0)
-								{
-									result = SESSION_SEND_TRANSFER_ERROR;
-								}
-								else
-								{
-									/* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */
-									session_instance->next_outgoing_id++;
-									session_instance->remote_incoming_window--;
-									session_instance->outgoing_window--;
+                                    if (payload_size > 0)
+                                    {
+                                        result = SESSION_SEND_TRANSFER_ERROR;
+                                    }
+                                    else
+                                    {
+                                        /* Codes_SRS_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */
+                                        session_instance->next_outgoing_id++;
+                                        session_instance->remote_incoming_window--;
+                                        session_instance->outgoing_window--;
 
-									result = SESSION_SEND_TRANSFER_OK;
-								}
-							}
-						}
+                                        result = SESSION_SEND_TRANSFER_OK;
+                                    }
+                                }
+                            }
 
-						amqpvalue_destroy(transfer_value);
-					}
-				}
-			}
+                            amqpvalue_destroy(transfer_value);
+                        }
+                    }
+                }
+            }
 		}
 	}