A small footprint MQTT library

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
3:9b4e7158ca0d
Parent:
1:8dba42ff9701
Child:
4:e7167dabd6e4
--- a/mqtt_client.c	Mon May 09 14:37:25 2016 -0700
+++ b/mqtt_client.c	Fri Jun 17 17:03:06 2016 -0700
@@ -19,6 +19,7 @@
 #define DUPLICATE_FLAG_MASK             0x8
 #define CONNECT_PACKET_MASK             0xf0
 #define TIME_MAX_BUFFER                 16
+#define DEFAULT_MAX_PING_RESPONSE_TIME  90
 
 static const char* FORMAT_HEX_CHAR = "0x%02x ";
 
@@ -40,6 +41,8 @@
     bool socketConnected;
     bool logTrace;
     bool rawBytesTrace;
+    uint64_t timeSincePing;
+    uint16_t maxPingRespTime;
 } MQTT_CLIENT;
 
 static uint16_t byteutil_read_uint16(uint8_t** buffer)
@@ -162,17 +165,27 @@
 
 static void logIncomingMsgTrace(MQTT_CLIENT* clientData, CONTROL_PACKET_TYPE packet, int flags, const uint8_t* data, size_t length)
 {
-    if (clientData != NULL && data != NULL && length > 0 && clientData->logTrace)
+    if (clientData != NULL && clientData->logTrace)
     {
-        char tmBuffer[TIME_MAX_BUFFER];
-        getLogTime(tmBuffer, TIME_MAX_BUFFER);
+        if (data != NULL && length > 0)
+        {
+            char tmBuffer[TIME_MAX_BUFFER];
+            getLogTime(tmBuffer, TIME_MAX_BUFFER);
 
-        LOG(clientData->logFunc, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((unsigned char)packet), (unsigned char)(packet | flags), length);
-        for (size_t index = 0; index < length; index++)
+            LOG(clientData->logFunc, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((unsigned char)packet), (unsigned char)(packet | flags), length);
+            for (size_t index = 0; index < length; index++)
+            {
+                LOG(clientData->logFunc, 0, (char*)FORMAT_HEX_CHAR, data[index]);
+            }
+            LOG(clientData->logFunc, LOG_LINE, "");
+        }
+        else if (packet == PINGRESP_TYPE)
         {
-            LOG(clientData->logFunc, 0, (char*)FORMAT_HEX_CHAR, data[index]);
+            char tmBuffer[TIME_MAX_BUFFER];
+            getLogTime(tmBuffer, TIME_MAX_BUFFER);
+
+            LOG(clientData->logFunc, LOG_LINE, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((unsigned char)packet), (unsigned char)(packet | flags), length);
         }
-        LOG(clientData->logFunc, LOG_LINE, "");
     }
 }
 
@@ -317,161 +330,162 @@
 static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData)
 {
     MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
-    if (mqttData != NULL && headerData != NULL)
+    if ((mqttData != NULL && headerData != NULL) || packet == PINGRESP_TYPE)
     {
         size_t len = BUFFER_length(headerData);
         uint8_t* iterator = BUFFER_u_char(headerData);
 
         logIncomingMsgTrace(mqttData, packet, flags, iterator, len);
 
-        if (iterator != NULL && len > 0)
+        if ((iterator != NULL && len > 0) || packet == PINGRESP_TYPE)
         {
             switch (packet)
             {
-            case CONNACK_TYPE:
-            {
-                if (mqttData->fnOperationCallback != NULL)
+                case CONNACK_TYPE:
                 {
-                    /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
-                    CONNECT_ACK connack = { 0 };
-                    connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false;
-                    connack.returnCode = byteutil_readByte(&iterator);
-
-                    mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqttData->ctx);
+                    if (mqttData->fnOperationCallback != NULL)
+                    {
+                        /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
+                        CONNECT_ACK connack = { 0 };
+                        connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false;
+                        connack.returnCode = byteutil_readByte(&iterator);
 
-                    if (connack.returnCode == CONNECTION_ACCEPTED)
-                    {
-                        mqttData->clientConnected = true;
+                        mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqttData->ctx);
+
+                        if (connack.returnCode == CONNECTION_ACCEPTED)
+                        {
+                            mqttData->clientConnected = true;
+                        }
                     }
+                    break;
                 }
-                break;
-            }
-            case PUBLISH_TYPE:
-            {
-                if (mqttData->fnMessageRecv != NULL)
+                case PUBLISH_TYPE:
                 {
-                    //uint8_t ctrlPacket = byteutil_readByte(&iterator);
-                    bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
-                    bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
-                    QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
+                    if (mqttData->fnMessageRecv != NULL)
+                    {
+                        //uint8_t ctrlPacket = byteutil_readByte(&iterator);
+                        bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
+                        bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
+                        QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
+
+                        uint8_t* initialPos = iterator;
+                        char* topicName = byteutil_readUTF(&iterator, NULL);
+                        uint16_t packetId = 0;
+                        if (qosValue != DELIVER_AT_MOST_ONCE)
+                        {
+                            packetId = byteutil_read_uint16(&iterator);
+                        }
+                        size_t length = len - (iterator - initialPos);
 
-                    uint8_t* initialPos = iterator;
-                    char* topicName = byteutil_readUTF(&iterator, NULL);
-                    uint16_t packetId = 0;
-                    if (qosValue != DELIVER_AT_MOST_ONCE)
-                    {
-                        packetId = byteutil_read_uint16(&iterator);
+                        MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
+                        if (msgHandle == NULL)
+                        {
+                            LOG(mqttData->logFunc, LOG_LINE, "failure in mqttmessage_create");
+                        }
+                        else
+                        {
+                            (void)mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg);
+                            (void)mqttmessage_setIsRetained(msgHandle, isRetainMsg);
+                            mqttData->fnMessageRecv(msgHandle, mqttData->ctx);
+
+                            BUFFER_HANDLE pubRel = NULL;
+                            if (qosValue == DELIVER_EXACTLY_ONCE)
+                            {
+                                pubRel = mqtt_codec_publishReceived(packetId);
+                            }
+                            else if (qosValue == DELIVER_AT_LEAST_ONCE)
+                            {
+                                pubRel = mqtt_codec_publishAck(packetId);
+                            }
+                            if (pubRel != NULL)
+                            {
+                                (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
+                                BUFFER_delete(pubRel);
+                            }
+                            free(topicName);
+                            mqttmessage_destroy(msgHandle);
+                        }
                     }
-                    size_t length = len - (iterator - initialPos);
-
-                    MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
-                    if (msgHandle == NULL)
+                    break;
+                }
+                case PUBACK_TYPE:
+                case PUBREC_TYPE:
+                case PUBREL_TYPE:
+                case PUBCOMP_TYPE:
+                {
+                    if (mqttData->fnOperationCallback)
                     {
-                        LOG(mqttData->logFunc, LOG_LINE, "failure in mqttmessage_create");
-                    }
-                    else
-                    {
-                        (void)mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg);
-                        (void)mqttmessage_setIsRetained(msgHandle, isRetainMsg);
-                        mqttData->fnMessageRecv(msgHandle, mqttData->ctx);
+                        /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
+                        MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
+                            (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
+                            (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;
+
+                        PUBLISH_ACK publish_ack = { 0 };
+                        publish_ack.packetId = byteutil_read_uint16(&iterator);
 
                         BUFFER_HANDLE pubRel = NULL;
-                        if (qosValue == DELIVER_EXACTLY_ONCE)
+                        mqttData->fnOperationCallback(mqttData, action, (void*)&publish_ack, mqttData->ctx);
+                        if (packet == PUBREC_TYPE)
                         {
-                            pubRel = mqtt_codec_publishReceived(packetId);
+                            pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
                         }
-                        else if (qosValue == DELIVER_AT_LEAST_ONCE)
+                        else if (packet == PUBREL_TYPE)
                         {
-                            pubRel = mqtt_codec_publishAck(packetId);
+                            pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
                         }
                         if (pubRel != NULL)
                         {
                             (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
                             BUFFER_delete(pubRel);
                         }
-                        free(topicName);
-                        mqttmessage_destroy(msgHandle);
                     }
-                }
-                break;
-            }
-            case PUBACK_TYPE:
-            case PUBREC_TYPE:
-            case PUBREL_TYPE:
-            case PUBCOMP_TYPE:
-            {
-                if (mqttData->fnOperationCallback)
-                {
-                    /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
-                    MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
-                        (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
-                        (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;
-
-                    PUBLISH_ACK publish_ack = { 0 };
-                    publish_ack.packetId = byteutil_read_uint16(&iterator);
-
-                    BUFFER_HANDLE pubRel = NULL;
-                    mqttData->fnOperationCallback(mqttData, action, (void*)&publish_ack, mqttData->ctx);
-                    if (packet == PUBREC_TYPE)
-                    {
-                        pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
-                    }
-                    else if (packet == PUBREL_TYPE)
-                    {
-                        pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
-                    }
-                    if (pubRel != NULL)
-                    {
-                        (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
-                        BUFFER_delete(pubRel);
-                    }
+                    break;
                 }
-                break;
-            }
-            case SUBACK_TYPE:
-            {
-                if (mqttData->fnOperationCallback)
+                case SUBACK_TYPE:
                 {
-                    /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
-                    SUBSCRIBE_ACK suback = { 0 };
+                    if (mqttData->fnOperationCallback)
+                    {
+                        /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
+                        SUBSCRIBE_ACK suback = { 0 };
+
+                        size_t remainLen = len;
+                        suback.packetId = byteutil_read_uint16(&iterator);
+                        remainLen -= 2;
 
-                    size_t remainLen = len;
-                    suback.packetId = byteutil_read_uint16(&iterator);
-                    remainLen -= 2;
-
-                    // Allocate the remaining len
-                    suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen);
-                    if (suback.qosReturn != NULL)
-                    {
-                        while (remainLen > 0)
+                        // Allocate the remaining len
+                        suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen);
+                        if (suback.qosReturn != NULL)
                         {
-                            suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator);
-                            remainLen--;
+                            while (remainLen > 0)
+                            {
+                                suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator);
+                                remainLen--;
+                            }
+                            (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx);
+                            free(suback.qosReturn);
                         }
-                        (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx);
-                        free(suback.qosReturn);
                     }
+                    break;
                 }
-                break;
-            }
-            case UNSUBACK_TYPE:
-            {
-                if (mqttData->fnOperationCallback)
+                case UNSUBACK_TYPE:
                 {
-                    /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
-                    UNSUBSCRIBE_ACK unsuback = { 0 };
-                    iterator += VARIABLE_HEADER_OFFSET;
-                    unsuback.packetId = byteutil_read_uint16(&iterator);
+                    if (mqttData->fnOperationCallback)
+                    {
+                        /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
+                        UNSUBSCRIBE_ACK unsuback = { 0 };
+                        iterator += VARIABLE_HEADER_OFFSET;
+                        unsuback.packetId = byteutil_read_uint16(&iterator);
 
-                    (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx);
+                        (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx);
+                    }
+                    break;
                 }
-                break;
-            }
-            case PINGRESP_TYPE:
-                // Ping responses do not get forwarded
-                break;
-            default:
-                break;
+                case PINGRESP_TYPE:
+                    mqttData->timeSincePing = 0;
+                    // Ping responses do not get forwarded
+                    break;
+                default:
+                    break;
             }
         }
     }
@@ -515,6 +529,8 @@
             result->clientConnected = false;
             result->logTrace = false;
             result->rawBytesTrace = false;
+            result->timeSincePing = 0;
+            result->maxPingRespTime = DEFAULT_MAX_PING_RESPONSE_TIME;
             if (result->packetTickCntr == NULL)
             {
                 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
@@ -580,6 +596,7 @@
             mqttData->packetState = UNKNOWN_TYPE;
             mqttData->qosValue = mqttOptions->qualityOfServiceValue;
             mqttData->keepAliveInterval = mqttOptions->keepAliveInterval;
+            mqttData->maxPingRespTime = (DEFAULT_MAX_PING_RESPONSE_TIME < mqttOptions->keepAliveInterval/2) ? DEFAULT_MAX_PING_RESPONSE_TIME : mqttOptions->keepAliveInterval/2;
             if (cloneMqttOptions(mqttData, mqttOptions) != 0)
             {
                 LOG(mqttData->logFunc, LOG_LINE, "Error: Clone Mqtt Options failed");
@@ -790,7 +807,21 @@
             }
             else
             {
-                if ((((current_ms - mqttData->packetSendTimeMs) / 1000) + KEEP_ALIVE_BUFFER_SEC) > mqttData->keepAliveInterval)
+                /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Operation Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */
+                if (mqttData->timeSincePing > 0 && ((current_ms - mqttData->timeSincePing)/1000) > mqttData->maxPingRespTime)
+                {
+                    // We haven't gotten a ping response in the alloted time
+                    if (mqttData->fnOperationCallback != NULL)
+                    {
+                        mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_NO_PING_RESPONSE, NULL, mqttData->ctx);
+                    }
+                    mqttData->socketConnected = false;
+                    mqttData->clientConnected = false;
+                    mqttData->timeSincePing = 0;
+                    mqttData->packetSendTimeMs = 0;
+                    mqttData->packetState = UNKNOWN_TYPE;
+                }
+                else if ((((current_ms - mqttData->packetSendTimeMs) / 1000) + KEEP_ALIVE_BUFFER_SEC) > mqttData->keepAliveInterval)
                 {
                     /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/
                     BUFFER_HANDLE pingPacket = mqtt_codec_ping();
@@ -798,6 +829,7 @@
                     {
                         (void)sendPacketItem(mqttData, BUFFER_u_char(pingPacket), BUFFER_length(pingPacket));
                         BUFFER_delete(pingPacket);
+                        (void)tickcounter_get_current_ms(mqttData->packetTickCntr, &mqttData->timeSincePing);
                     }
                 }
             }