A small footprint MQTT library

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
9:37d14c31ff6e
Parent:
8:83bb166aba73
Child:
10:2ab268507775
--- a/mqtt_client.c	Thu Oct 20 17:07:55 2016 -0700
+++ b/mqtt_client.c	Wed Nov 16 21:38:15 2016 -0800
@@ -9,6 +9,7 @@
 #include "azure_c_shared_utility/xlogging.h"
 #include "azure_c_shared_utility/strings.h"
 #include "azure_c_shared_utility/agenttime.h"
+#include "azure_c_shared_utility/threadapi.h"
 
 #include "azure_umqtt_c/mqtt_client.h"
 #include "azure_umqtt_c/mqtt_codec.h"
@@ -23,6 +24,7 @@
 #define CONNECT_PACKET_MASK             0xf0
 #define TIME_MAX_BUFFER                 16
 #define DEFAULT_MAX_PING_RESPONSE_TIME  80  // % of time to send pings
+#define MAX_CLOSE_RETRIES               10
 
 static const char* FORMAT_HEX_CHAR = "0x%02x ";
 static const char* TRUE_CONST = "true";
@@ -40,6 +42,8 @@
     ON_MQTT_OPERATION_CALLBACK fnOperationCallback;
     ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv;
     void* ctx;
+    ON_MQTT_ERROR_CALLBACK fnOnErrorCallBack;
+    void* errorCBCtx;
     QOS_VALUE qosValue;
     uint16_t keepAliveInterval;
     MQTT_CLIENT_OPTIONS mqttOptions;
@@ -51,6 +55,37 @@
     uint16_t maxPingRespTime;
 } MQTT_CLIENT;
 
+static void on_connection_closed(void* context)
+{
+    size_t* close_complete = (size_t*)context;
+    *close_complete = 1;
+}
+
+static void close_connection(MQTT_CLIENT* mqtt_client)
+{
+    size_t close_complete = 0;
+    (void)xio_close(mqtt_client->xioHandle, on_connection_closed, &close_complete);
+
+    size_t counter = 0;
+    do
+    {
+        xio_dowork(mqtt_client->xioHandle);
+        counter++;
+        ThreadAPI_Sleep(10);
+    } while (close_complete == 0 && counter < MAX_CLOSE_RETRIES);
+    mqtt_client->socketConnected = false;
+    mqtt_client->clientConnected = false;
+}
+
+static void set_error_callback(MQTT_CLIENT* mqtt_client, MQTT_CLIENT_EVENT_ERROR error_type)
+{
+    if (mqtt_client->fnOnErrorCallBack)
+    {
+        mqtt_client->fnOnErrorCallBack(mqtt_client, error_type, mqtt_client->errorCBCtx);
+    }
+    close_connection(mqtt_client);
+}
+
 static STRING_HANDLE construct_trace_log_handle(MQTT_CLIENT* mqtt_client)
 {
     STRING_HANDLE trace_log;
@@ -127,29 +162,30 @@
 static void sendComplete(void* context, IO_SEND_RESULT send_result)
 {
     MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
-    if (mqtt_client != NULL && mqtt_client->fnOperationCallback != NULL && send_result == IO_SEND_OK)
+    if (mqtt_client != NULL)
     {
-        if (mqtt_client->packetState == DISCONNECT_TYPE)
+        if (send_result == IO_SEND_OK)
         {
-            /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT or MQTT_CLIENT_ON_ERROR the the msgInfo value shall be NULL.]*/
-            mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_DISCONNECT, NULL, mqtt_client->ctx);
-
-            // close the xio
-            if (xio_close(mqtt_client->xioHandle, NULL, mqtt_client->ctx) != 0)
+            if (mqtt_client->packetState == DISCONNECT_TYPE)
             {
-                LOG(LOG_ERROR, LOG_LINE, "MQTT xio_close failed to return a successful result.");
+                /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
+                if (mqtt_client->fnOperationCallback != NULL)
+                {
+                    mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_DISCONNECT, NULL, mqtt_client->ctx);
+                }
+                // close the xio
+                close_connection(mqtt_client);
             }
-            mqtt_client->socketConnected = false;
-            mqtt_client->clientConnected = false;
+        }
+        else if (send_result == IO_SEND_ERROR)
+        {
+            LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure send_result: %d", (int)send_result);
+            set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
         }
     }
     else
     {
-        LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure send_result: %d", (int)send_result);
-        if (mqtt_client->fnOperationCallback)
-        {
-            mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-        }
+        LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure with NULL mqtt_client");
     }
 }
 
@@ -331,10 +367,11 @@
         }
         else
         {
-            if (mqtt_client->fnOperationCallback)
+            if (mqtt_client->socketConnected == false && mqtt_client->fnOnErrorCallBack)
             {
-                mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
+                mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx);
             }
+            close_connection(mqtt_client);
         }
     }
     else
@@ -350,10 +387,7 @@
     {
         if (mqtt_codec_bytesReceived(mqtt_client->codec_handle, buffer, size) != 0)
         {
-            if (mqtt_client->fnOperationCallback)
-            {
-                mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-            }
+            set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
         }
     }
     else
@@ -367,11 +401,9 @@
     MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
     if (mqtt_client != NULL && mqtt_client->fnOperationCallback)
     {
-        /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT or MQTT_CLIENT_ON_ERROR the the msgInfo value shall be NULL.]*/
-        mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-        mqtt_client->socketConnected = false;
+        /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
         /* Codes_SRS_MQTT_CLIENT_07_036: [ If an error is encountered by the ioHandle the mqtt_client shall call xio_close. ] */
-        (void)xio_close(mqtt_client->xioHandle, NULL, NULL); 
+        set_error_callback(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR);
     }
     else
     {
@@ -506,10 +538,7 @@
                         if (topicName == NULL)
                         {
                             LOG(LOG_ERROR, LOG_LINE, "Publish MSG: failure reading topic name");
-                            if (mqtt_client->fnOperationCallback)
-                            {
-                                mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-                            }
+                            set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
                             if (trace_log != NULL)
                             {
                                 STRING_delete(trace_log);
@@ -537,10 +566,7 @@
                             if (msgHandle == NULL)
                             {
                                 LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create");
-                                if (mqtt_client->fnOperationCallback)
-                                {
-                                    mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-                                }
+                                set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
                                 if (trace_log != NULL) {
                                     STRING_delete(trace_log);
                                 }
@@ -551,10 +577,7 @@
                                     mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
                                 {
                                     LOG(LOG_ERROR, LOG_LINE, "failure setting mqtt message property");
-                                    if (mqtt_client->fnOperationCallback)
-                                    {
-                                        mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-                                    }
+                                    set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
                                     if (trace_log != NULL) {
                                         STRING_delete(trace_log);
                                     }
@@ -577,10 +600,7 @@
                                         if (pubRel == NULL)
                                         {
                                             LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish receive message.");
-                                            if (mqtt_client->fnOperationCallback)
-                                            {
-                                                mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-                                            }
+                                            set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
                                         }
                                     }
                                     else if (qosValue == DELIVER_AT_LEAST_ONCE)
@@ -589,10 +609,7 @@
                                         if (pubRel == NULL)
                                         {
                                             LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish ack message.");
-                                            if (mqtt_client->fnOperationCallback)
-                                            {
-                                                mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-                                            }
+                                            set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
                                         }
                                     }
                                     if (pubRel != NULL)
@@ -642,10 +659,7 @@
                             if (pubRel == NULL)
                             {
                                 LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish release message.");
-                                if (mqtt_client->fnOperationCallback)
-                                {
-                                    mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-                                }
+                                set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
                             }
                         }
                         else if (packet == PUBREL_TYPE)
@@ -654,10 +668,7 @@
                             if (pubRel == NULL)
                             {
                                 LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish complete message.");
-                                if (mqtt_client->fnOperationCallback)
-                                {
-                                    mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-                                }
+                                set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
                             }
                         }
                         if (pubRel != NULL)
@@ -711,10 +722,7 @@
                         else
                         {
                             LOG(LOG_ERROR, LOG_LINE, "allocation of quality of service value failed.");
-                            if (mqtt_client->fnOperationCallback)
-                            {
-                                mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_ERROR, NULL, mqtt_client->ctx);
-                            }
+                            set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
                         }
                     }
                     break;
@@ -757,7 +765,7 @@
     }
 }
 
-MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK opCallback, void* callbackCtx)
+MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK opCallback, void* opCallbackCtx, ON_MQTT_ERROR_CALLBACK onErrorCallBack, void* errorCBCtx)
 {
     MQTT_CLIENT* result;
     /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/
@@ -780,8 +788,10 @@
             result->packetState = UNKNOWN_TYPE;
             result->packetSendTimeMs = 0;
             result->fnOperationCallback = opCallback;
+            result->ctx = opCallbackCtx;
             result->fnMessageRecv = msgRecv;
-            result->ctx = callbackCtx;
+            result->fnOnErrorCallBack = onErrorCallBack;
+            result->errorCBCtx = errorCBCtx;
             result->qosValue = DELIVER_AT_MOST_ONCE;
             result->keepAliveInterval = 0;
             result->packetTickCntr = tickcounter_create();
@@ -1099,16 +1109,11 @@
             }
             else
             {
-                /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Operation Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */
+                /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Error Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */
                 if (mqtt_client->timeSincePing > 0 && ((current_ms - mqtt_client->timeSincePing)/1000) > mqtt_client->maxPingRespTime)
                 {
                     // We haven't gotten a ping response in the alloted time
-                    if (mqtt_client->fnOperationCallback != NULL)
-                    {
-                        mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_NO_PING_RESPONSE, NULL, mqtt_client->ctx);
-                    }
-                    mqtt_client->socketConnected = false;
-                    mqtt_client->clientConnected = false;
+                    set_error_callback(mqtt_client, MQTT_CLIENT_NO_PING_RESPONSE);
                     mqtt_client->timeSincePing = 0;
                     mqtt_client->packetSendTimeMs = 0;
                     mqtt_client->packetState = UNKNOWN_TYPE;