A small footprint MQTT library

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Revision:
5:34779607059c
Parent:
4:e7167dabd6e4
Child:
6:d9c4702f91ca
--- a/mqtt_client.c	Fri Jul 01 10:42:59 2016 -0700
+++ b/mqtt_client.c	Fri Jul 29 15:58:49 2016 -0700
@@ -95,7 +95,7 @@
 static void sendComplete(void* context, IO_SEND_RESULT send_result)
 {
     MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
-    if (mqttData != NULL && mqttData->fnOperationCallback != NULL)
+    if (mqttData != NULL && mqttData->fnOperationCallback != NULL && send_result == IO_SEND_OK)
     {
         if (mqttData->packetState == DISCONNECT_TYPE)
         {
@@ -103,11 +103,22 @@
             mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_DISCONNECT, NULL, mqttData->ctx);
 
             // close the xio
-            (void)xio_close(mqttData->xioHandle, NULL, mqttData->ctx);
+            if (xio_close(mqttData->xioHandle, NULL, mqttData->ctx) != 0)
+            {
+                LOG(LOG_ERROR, LOG_LINE, "MQTT xio_close failed to return a successful result.");
+            }
             mqttData->socketConnected = false;
             mqttData->clientConnected = false;
         }
     }
+    else
+    {
+        LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure");
+        if (mqttData->fnOperationCallback)
+        {
+            mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+        }
+    }
 }
 
 static const char* retrievePacketType(CONTROL_PACKET_TYPE packet)
@@ -245,7 +256,7 @@
         }
         else if (open_result == IO_OPEN_ERROR)
         {
-            (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+            mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
         }
     }
 }
@@ -368,47 +379,87 @@
                 {
                     if (mqttData->fnMessageRecv != NULL)
                     {
-                        //uint8_t ctrlPacket = byteutil_readByte(&iterator);
                         bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
                         bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
                         QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
 
                         uint8_t* initialPos = iterator;
                         char* topicName = byteutil_readUTF(&iterator, NULL);
-                        uint16_t packetId = 0;
-                        if (qosValue != DELIVER_AT_MOST_ONCE)
+                        if (topicName == NULL)
                         {
-                            packetId = byteutil_read_uint16(&iterator);
-                        }
-                        size_t length = len - (iterator - initialPos);
-
-                        MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
-                        if (msgHandle == NULL)
-                        {
-                            LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create");
+                            LOG(LOG_ERROR, LOG_LINE, "Publish MSG: failure reading topic name");
+                            if (mqttData->fnOperationCallback)
+                            {
+                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                            }
                         }
                         else
                         {
-                            (void)mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg);
-                            (void)mqttmessage_setIsRetained(msgHandle, isRetainMsg);
-                            mqttData->fnMessageRecv(msgHandle, mqttData->ctx);
+                            uint16_t packetId = 0;
+                            if (qosValue != DELIVER_AT_MOST_ONCE)
+                            {
+                                packetId = byteutil_read_uint16(&iterator);
+                            }
+                            size_t length = len - (iterator - initialPos);
 
-                            BUFFER_HANDLE pubRel = NULL;
-                            if (qosValue == DELIVER_EXACTLY_ONCE)
+                            MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
+                            if (msgHandle == NULL)
                             {
-                                pubRel = mqtt_codec_publishReceived(packetId);
+                                LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create");
+                                if (mqttData->fnOperationCallback)
+                                {
+                                    mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                }
                             }
-                            else if (qosValue == DELIVER_AT_LEAST_ONCE)
+                            else
                             {
-                                pubRel = mqtt_codec_publishAck(packetId);
-                            }
-                            if (pubRel != NULL)
-                            {
-                                (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
-                                BUFFER_delete(pubRel);
+                                if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
+                                    mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
+                                {
+                                    LOG(LOG_ERROR, LOG_LINE, "failure setting mqtt message property");
+                                    if (mqttData->fnOperationCallback)
+                                    {
+                                        mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                    }
+                                }
+                                else
+                                {
+                                    mqttData->fnMessageRecv(msgHandle, mqttData->ctx);
+
+                                    BUFFER_HANDLE pubRel = NULL;
+                                    if (qosValue == DELIVER_EXACTLY_ONCE)
+                                    {
+                                        pubRel = mqtt_codec_publishReceived(packetId);
+                                        if (pubRel == NULL)
+                                        {
+                                            LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish receive message.");
+                                            if (mqttData->fnOperationCallback)
+                                            {
+                                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                            }
+                                        }
+                                    }
+                                    else if (qosValue == DELIVER_AT_LEAST_ONCE)
+                                    {
+                                        pubRel = mqtt_codec_publishAck(packetId);
+                                        if (pubRel == NULL)
+                                        {
+                                            LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish ack message.");
+                                            if (mqttData->fnOperationCallback)
+                                            {
+                                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                            }
+                                        }
+                                    }
+                                    if (pubRel != NULL)
+                                    {
+                                        (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
+                                        BUFFER_delete(pubRel);
+                                    }
+                                }
+                                mqttmessage_destroy(msgHandle);
                             }
                             free(topicName);
-                            mqttmessage_destroy(msgHandle);
                         }
                     }
                     break;
@@ -433,10 +484,26 @@
                         if (packet == PUBREC_TYPE)
                         {
                             pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
+                            if (pubRel == NULL)
+                            {
+                                LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish release message.");
+                                if (mqttData->fnOperationCallback)
+                                {
+                                    mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                }
+                            }
                         }
                         else if (packet == PUBREL_TYPE)
                         {
                             pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
+                            if (pubRel == NULL)
+                            {
+                                LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish complete message.");
+                                if (mqttData->fnOperationCallback)
+                                {
+                                    mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                }
+                            }
                         }
                         if (pubRel != NULL)
                         {
@@ -466,9 +533,17 @@
                                 suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator);
                                 remainLen--;
                             }
-                            (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx);
+                            mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx);
                             free(suback.qosReturn);
                         }
+                        else
+                        {
+                            LOG(LOG_ERROR, LOG_LINE, "allocation of quality of service value failed.");
+                            if (mqttData->fnOperationCallback)
+                            {
+                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                            }
+                        }
                     }
                     break;
                 }
@@ -481,7 +556,7 @@
                         iterator += VARIABLE_HEADER_OFFSET;
                         unsuback.packetId = byteutil_read_uint16(&iterator);
 
-                        (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx);
+                        mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx);
                     }
                     break;
                 }
@@ -583,6 +658,7 @@
     /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/
     if (handle == NULL || mqttOptions == NULL)
     {
+        LOG(LOG_ERROR, LOG_LINE, "mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p)", handle, mqttOptions);
         result = __LINE__;
     }
     else