A small memory footprint AMQP implimentation

Dependents:   iothub_client_sample_amqp remote_monitoring simplesample_amqp

Revision:
35:d0bed2404ee9
Parent:
30:0407b2db334c
Child:
38:7631b92cc772
--- a/connection.c	Sat Oct 21 20:12:19 2017 +0000
+++ b/connection.c	Fri Nov 03 13:18:57 2017 -0700
@@ -69,6 +69,8 @@
     uint16_t channel_max;
     milliseconds idle_timeout;
     milliseconds remote_idle_timeout;
+    milliseconds remote_idle_timeout_send_frame_millisecond;
+    double idle_timeout_empty_frame_send_ratio;
     tickcounter_ms_t last_frame_received_time;
     tickcounter_ms_t last_frame_sent_time;
 
@@ -816,7 +818,12 @@
                             }
                             else
                             {
-                                (void)open_get_idle_time_out(open_handle, &connection->remote_idle_timeout);
+                                if (open_get_idle_time_out(open_handle, &connection->remote_idle_timeout) == 0)
+                                {
+                                    /* since we obtained the remote_idle_timeout, compute at what millisecond we should send the empty frame */
+                                    connection->remote_idle_timeout_send_frame_millisecond = (milliseconds)(connection->idle_timeout_empty_frame_send_ratio * connection->remote_idle_timeout);
+                                }
+
                                 if ((open_get_max_frame_size(open_handle, &connection->remote_max_frame_size) != 0) ||
                                     /* Codes_SRS_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */
                                     (connection->remote_max_frame_size < 512))
@@ -1051,7 +1058,7 @@
 /* Codes_SRS_CONNECTION_22_002: [connection_create shall allow registering connections state and io error callbacks.] */
 CONNECTION_HANDLE connection_create2(XIO_HANDLE xio, const char* hostname, const char* container_id, ON_NEW_ENDPOINT on_new_endpoint, void* callback_context, ON_CONNECTION_STATE_CHANGED on_connection_state_changed, void* on_connection_state_changed_context, ON_IO_ERROR on_io_error, void* on_io_error_context)
 {
-    CONNECTION_HANDLE result;
+    CONNECTION_HANDLE connection;
 
     if ((xio == NULL) ||
         (container_id == NULL))
@@ -1059,143 +1066,145 @@
         /* Codes_SRS_CONNECTION_01_071: [If xio or container_id is NULL, connection_create shall return NULL.] */
         LogError("Bad arguments: xio = %p, container_id = %p",
             xio, container_id);
-        result = NULL;
+        connection = NULL;
     }
     else
     {
-        result = (CONNECTION_HANDLE)malloc(sizeof(CONNECTION_INSTANCE));
+        connection = (CONNECTION_HANDLE)malloc(sizeof(CONNECTION_INSTANCE));
         /* Codes_SRS_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */
-        if (result == NULL)
+        if (connection == NULL)
         {
             LogError("Cannot allocate memory for connection");
         }
         else
         {
-            result->io = xio;
+            connection->io = xio;
 
             /* Codes_SRS_CONNECTION_01_082: [connection_create shall allocate a new frame_codec instance to be used for frame encoding/decoding.] */
-            result->frame_codec = frame_codec_create(frame_codec_error, result);
-            if (result->frame_codec == NULL)
+            connection->frame_codec = frame_codec_create(frame_codec_error, connection);
+            if (connection->frame_codec == NULL)
             {
                 /* Codes_SRS_CONNECTION_01_083: [If frame_codec_create fails then connection_create shall return NULL.] */
                 LogError("Cannot create frame_codec");
-                free(result);
-                result = NULL;
+                free(connection);
+                connection = NULL;
             }
             else
             {
-                result->amqp_frame_codec = amqp_frame_codec_create(result->frame_codec, on_amqp_frame_received, on_empty_amqp_frame_received, amqp_frame_codec_error, result);
-                if (result->amqp_frame_codec == NULL)
+                connection->amqp_frame_codec = amqp_frame_codec_create(connection->frame_codec, on_amqp_frame_received, on_empty_amqp_frame_received, amqp_frame_codec_error, connection);
+                if (connection->amqp_frame_codec == NULL)
                 {
                     /* Codes_SRS_CONNECTION_01_108: [If amqp_frame_codec_create fails, connection_create shall return NULL.] */
                     LogError("Cannot create amqp_frame_codec");
-                    frame_codec_destroy(result->frame_codec);
-                    free(result);
-                    result = NULL;
+                    frame_codec_destroy(connection->frame_codec);
+                    free(connection);
+                    connection = NULL;
                 }
                 else
                 {
                     if (hostname != NULL)
                     {
                         size_t hostname_length = strlen(hostname);
-                        result->host_name = (char*)malloc(hostname_length + 1);
-                        if (result->host_name == NULL)
+                        connection->host_name = (char*)malloc(hostname_length + 1);
+                        if (connection->host_name == NULL)
                         {
                             /* Codes_SRS_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */
                             LogError("Cannot allocate memory for host name");
-                            amqp_frame_codec_destroy(result->amqp_frame_codec);
-                            frame_codec_destroy(result->frame_codec);
-                            free(result);
-                            result = NULL;
+                            amqp_frame_codec_destroy(connection->amqp_frame_codec);
+                            frame_codec_destroy(connection->frame_codec);
+                            free(connection);
+                            connection = NULL;
                         }
                         else
                         {
-                            (void)memcpy(result->host_name, hostname, hostname_length + 1);
+                            (void)memcpy(connection->host_name, hostname, hostname_length + 1);
                         }
                     }
                     else
                     {
-                        result->host_name = NULL;
+                        connection->host_name = NULL;
                     }
 
-                    if (result != NULL)
+                    if (connection != NULL)
                     {
                         size_t container_id_length = strlen(container_id);
-                        result->container_id = (char*)malloc(container_id_length + 1);
-                        if (result->container_id == NULL)
+                        connection->container_id = (char*)malloc(container_id_length + 1);
+                        if (connection->container_id == NULL)
                         {
                             /* Codes_SRS_CONNECTION_01_081: [If allocating the memory for the connection fails then connection_create shall return NULL.] */
                             LogError("Cannot allocate memory for container_id");
-                            free(result->host_name);
-                            amqp_frame_codec_destroy(result->amqp_frame_codec);
-                            frame_codec_destroy(result->frame_codec);
-                            free(result);
-                            result = NULL;
+                            free(connection->host_name);
+                            amqp_frame_codec_destroy(connection->amqp_frame_codec);
+                            frame_codec_destroy(connection->frame_codec);
+                            free(connection);
+                            connection = NULL;
                         }
                         else
                         {
-                            result->tick_counter = tickcounter_create();
-                            if (result->tick_counter == NULL)
+                            connection->tick_counter = tickcounter_create();
+                            if (connection->tick_counter == NULL)
                             {
                                 LogError("Cannot create tick counter");
-                                free(result->container_id);
-                                free(result->host_name);
-                                amqp_frame_codec_destroy(result->amqp_frame_codec);
-                                frame_codec_destroy(result->frame_codec);
-                                free(result);
-                                result = NULL;
+                                free(connection->container_id);
+                                free(connection->host_name);
+                                amqp_frame_codec_destroy(connection->amqp_frame_codec);
+                                frame_codec_destroy(connection->frame_codec);
+                                free(connection);
+                                connection = NULL;
                             }
                             else
                             {
-                                (void)memcpy(result->container_id, container_id, container_id_length + 1);
+                                (void)memcpy(connection->container_id, container_id, container_id_length + 1);
 
                                 /* Codes_SRS_CONNECTION_01_173: [<field name="max-frame-size" type="uint" default="4294967295"/>] */
-                                result->max_frame_size = 4294967295u;
+                                connection->max_frame_size = 4294967295u;
                                 /* Codes: [<field name="channel-max" type="ushort" default="65535"/>] */
-                                result->channel_max = 65535;
+                                connection->channel_max = 65535;
 
                                 /* Codes_SRS_CONNECTION_01_175: [<field name="idle-time-out" type="milliseconds"/>] */
                                 /* Codes_SRS_CONNECTION_01_192: [A value of zero is the same as if it was not set (null).] */
-                                result->idle_timeout = 0;
-                                result->remote_idle_timeout = 0;
+                                connection->idle_timeout = 0;
+                                connection->remote_idle_timeout = 0;
+                                connection->remote_idle_timeout_send_frame_millisecond = 0;
+                                connection->idle_timeout_empty_frame_send_ratio = 0.5;
 
-                                result->endpoint_count = 0;
-                                result->endpoints = NULL;
-                                result->header_bytes_received = 0;
-                                result->is_remote_frame_received = 0;
+                                connection->endpoint_count = 0;
+                                connection->endpoints = NULL;
+                                connection->header_bytes_received = 0;
+                                connection->is_remote_frame_received = 0;
 
-                                result->is_underlying_io_open = 0;
-                                result->remote_max_frame_size = 512;
-                                result->is_trace_on = 0;
+                                connection->is_underlying_io_open = 0;
+                                connection->remote_max_frame_size = 512;
+                                connection->is_trace_on = 0;
 
                                 /* Mark that settings have not yet been set by the user */
-                                result->idle_timeout_specified = 0;
+                                connection->idle_timeout_specified = 0;
 
-                                result->on_new_endpoint = on_new_endpoint;
-                                result->on_new_endpoint_callback_context = callback_context;
+                                connection->on_new_endpoint = on_new_endpoint;
+                                connection->on_new_endpoint_callback_context = callback_context;
 
-                                result->on_io_error = on_io_error;
-                                result->on_io_error_callback_context = on_io_error_context;
-                                result->on_connection_state_changed = on_connection_state_changed;
-                                result->on_connection_state_changed_callback_context = on_connection_state_changed_context;
+                                connection->on_io_error = on_io_error;
+                                connection->on_io_error_callback_context = on_io_error_context;
+                                connection->on_connection_state_changed = on_connection_state_changed;
+                                connection->on_connection_state_changed_callback_context = on_connection_state_changed_context;
 
-                                if (tickcounter_get_current_ms(result->tick_counter, &result->last_frame_received_time) != 0)
+                                if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_received_time) != 0)
                                 {
                                     LogError("Could not retrieve time for last frame received time");
-                                    tickcounter_destroy(result->tick_counter);
-                                    free(result->container_id);
-                                    free(result->host_name);
-                                    amqp_frame_codec_destroy(result->amqp_frame_codec);
-                                    frame_codec_destroy(result->frame_codec);
-                                    free(result);
-                                    result = NULL;
+                                    tickcounter_destroy(connection->tick_counter);
+                                    free(connection->container_id);
+                                    free(connection->host_name);
+                                    amqp_frame_codec_destroy(connection->amqp_frame_codec);
+                                    frame_codec_destroy(connection->frame_codec);
+                                    free(connection);
+                                    connection = NULL;
                                 }
                                 else
                                 {
-                                    result->last_frame_sent_time = result->last_frame_received_time;
+                                    connection->last_frame_sent_time = connection->last_frame_received_time;
 
                                     /* Codes_SRS_CONNECTION_01_072: [When connection_create succeeds, the state of the connection shall be CONNECTION_STATE_START.] */
-                                    connection_set_state(result, CONNECTION_STATE_START);
+                                    connection_set_state(connection, CONNECTION_STATE_START);
                                 }
                             }
                         }
@@ -1205,7 +1214,7 @@
         }
     }
 
-    return result;
+    return connection;
 }
 
 void connection_destroy(CONNECTION_HANDLE connection)
@@ -1546,7 +1555,7 @@
 
 uint64_t connection_handle_deadlines(CONNECTION_HANDLE connection)
 {
-    uint64_t local_deadline = (uint64_t )-1;
+    uint64_t local_deadline = (uint64_t)-1;
     uint64_t remote_deadline = (uint64_t)-1;
 
     if (connection == NULL)
@@ -1586,7 +1595,7 @@
             {
                 /* Calculate time until remote idle timeout expires */
 
-                uint64_t remote_idle_timeout = (connection->remote_idle_timeout / 2);
+                uint64_t remote_idle_timeout = connection->remote_idle_timeout_send_frame_millisecond;
                 uint64_t time_since_last_sent = current_ms - connection->last_frame_sent_time;
 
                 if (time_since_last_sent < remote_idle_timeout)
@@ -1883,3 +1892,24 @@
         connection->is_trace_on = trace_on ? 1 : 0;
     }
 }
+
+int connection_set_remote_idle_timeout_empty_frame_send_ratio(CONNECTION_HANDLE connection, double idle_timeout_empty_frame_send_ratio)
+{
+    int result;
+
+    if ((connection == NULL) ||
+        (idle_timeout_empty_frame_send_ratio <= 0.0) ||
+        (idle_timeout_empty_frame_send_ratio > 1.0))
+    {
+        LogError("Bad arguments: connection = %p, idle_timeout_empty_frame_send_ratio = %f",
+            connection, idle_timeout_empty_frame_send_ratio);
+        result = __FAILURE__;
+    }
+    else
+    {
+        connection->idle_timeout_empty_frame_send_ratio = idle_timeout_empty_frame_send_ratio;
+        result = 0;
+    }
+
+    return result;
+}