Azure IoT / azure_umqtt_c

Dependents:   STM32F746_iothub_client_sample_mqtt FXOS8700CQ_To_Azure_IoT f767zi_mqtt FXOS8700CQ_To_Azure_IoT ... more

Files at this revision

API Documentation at this revision

Comitter:
AzureIoTClient
Date:
Fri Jul 29 15:58:49 2016 -0700
Parent:
4:e7167dabd6e4
Child:
6:d9c4702f91ca
Commit message:
1.0.10

Changed in this revision

azure_umqtt_c/mqttconst.h Show annotated file Show diff for this revision Revisions of this file
mqtt_client.c Show annotated file Show diff for this revision Revisions of this file
mqtt_codec.c Show annotated file Show diff for this revision Revisions of this file
--- a/azure_umqtt_c/mqttconst.h	Fri Jul 01 10:42:59 2016 -0700
+++ b/azure_umqtt_c/mqttconst.h	Fri Jul 29 15:58:49 2016 -0700
@@ -56,7 +56,7 @@
     char* willMessage;
     char* username;
     char* password;
-    int keepAliveInterval;
+    uint16_t keepAliveInterval;
     bool messageRetain;
     bool useCleanSession;
     QOS_VALUE qualityOfServiceValue;
--- a/mqtt_client.c	Fri Jul 01 10:42:59 2016 -0700
+++ b/mqtt_client.c	Fri Jul 29 15:58:49 2016 -0700
@@ -95,7 +95,7 @@
 static void sendComplete(void* context, IO_SEND_RESULT send_result)
 {
     MQTT_CLIENT* mqttData = (MQTT_CLIENT*)context;
-    if (mqttData != NULL && mqttData->fnOperationCallback != NULL)
+    if (mqttData != NULL && mqttData->fnOperationCallback != NULL && send_result == IO_SEND_OK)
     {
         if (mqttData->packetState == DISCONNECT_TYPE)
         {
@@ -103,11 +103,22 @@
             mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_DISCONNECT, NULL, mqttData->ctx);
 
             // close the xio
-            (void)xio_close(mqttData->xioHandle, NULL, mqttData->ctx);
+            if (xio_close(mqttData->xioHandle, NULL, mqttData->ctx) != 0)
+            {
+                LOG(LOG_ERROR, LOG_LINE, "MQTT xio_close failed to return a successful result.");
+            }
             mqttData->socketConnected = false;
             mqttData->clientConnected = false;
         }
     }
+    else
+    {
+        LOG(LOG_ERROR, LOG_LINE, "MQTT Send Complete Failure");
+        if (mqttData->fnOperationCallback)
+        {
+            mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+        }
+    }
 }
 
 static const char* retrievePacketType(CONTROL_PACKET_TYPE packet)
@@ -245,7 +256,7 @@
         }
         else if (open_result == IO_OPEN_ERROR)
         {
-            (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+            mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
         }
     }
 }
@@ -368,47 +379,87 @@
                 {
                     if (mqttData->fnMessageRecv != NULL)
                     {
-                        //uint8_t ctrlPacket = byteutil_readByte(&iterator);
                         bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
                         bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
                         QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
 
                         uint8_t* initialPos = iterator;
                         char* topicName = byteutil_readUTF(&iterator, NULL);
-                        uint16_t packetId = 0;
-                        if (qosValue != DELIVER_AT_MOST_ONCE)
+                        if (topicName == NULL)
                         {
-                            packetId = byteutil_read_uint16(&iterator);
-                        }
-                        size_t length = len - (iterator - initialPos);
-
-                        MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
-                        if (msgHandle == NULL)
-                        {
-                            LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create");
+                            LOG(LOG_ERROR, LOG_LINE, "Publish MSG: failure reading topic name");
+                            if (mqttData->fnOperationCallback)
+                            {
+                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                            }
                         }
                         else
                         {
-                            (void)mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg);
-                            (void)mqttmessage_setIsRetained(msgHandle, isRetainMsg);
-                            mqttData->fnMessageRecv(msgHandle, mqttData->ctx);
+                            uint16_t packetId = 0;
+                            if (qosValue != DELIVER_AT_MOST_ONCE)
+                            {
+                                packetId = byteutil_read_uint16(&iterator);
+                            }
+                            size_t length = len - (iterator - initialPos);
 
-                            BUFFER_HANDLE pubRel = NULL;
-                            if (qosValue == DELIVER_EXACTLY_ONCE)
+                            MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create(packetId, topicName, qosValue, iterator, length);
+                            if (msgHandle == NULL)
                             {
-                                pubRel = mqtt_codec_publishReceived(packetId);
+                                LOG(LOG_ERROR, LOG_LINE, "failure in mqttmessage_create");
+                                if (mqttData->fnOperationCallback)
+                                {
+                                    mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                }
                             }
-                            else if (qosValue == DELIVER_AT_LEAST_ONCE)
+                            else
                             {
-                                pubRel = mqtt_codec_publishAck(packetId);
-                            }
-                            if (pubRel != NULL)
-                            {
-                                (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
-                                BUFFER_delete(pubRel);
+                                if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
+                                    mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
+                                {
+                                    LOG(LOG_ERROR, LOG_LINE, "failure setting mqtt message property");
+                                    if (mqttData->fnOperationCallback)
+                                    {
+                                        mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                    }
+                                }
+                                else
+                                {
+                                    mqttData->fnMessageRecv(msgHandle, mqttData->ctx);
+
+                                    BUFFER_HANDLE pubRel = NULL;
+                                    if (qosValue == DELIVER_EXACTLY_ONCE)
+                                    {
+                                        pubRel = mqtt_codec_publishReceived(packetId);
+                                        if (pubRel == NULL)
+                                        {
+                                            LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish receive message.");
+                                            if (mqttData->fnOperationCallback)
+                                            {
+                                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                            }
+                                        }
+                                    }
+                                    else if (qosValue == DELIVER_AT_LEAST_ONCE)
+                                    {
+                                        pubRel = mqtt_codec_publishAck(packetId);
+                                        if (pubRel == NULL)
+                                        {
+                                            LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish ack message.");
+                                            if (mqttData->fnOperationCallback)
+                                            {
+                                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                            }
+                                        }
+                                    }
+                                    if (pubRel != NULL)
+                                    {
+                                        (void)sendPacketItem(mqttData, BUFFER_u_char(pubRel), BUFFER_length(pubRel));
+                                        BUFFER_delete(pubRel);
+                                    }
+                                }
+                                mqttmessage_destroy(msgHandle);
                             }
                             free(topicName);
-                            mqttmessage_destroy(msgHandle);
                         }
                     }
                     break;
@@ -433,10 +484,26 @@
                         if (packet == PUBREC_TYPE)
                         {
                             pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
+                            if (pubRel == NULL)
+                            {
+                                LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish release message.");
+                                if (mqttData->fnOperationCallback)
+                                {
+                                    mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                }
+                            }
                         }
                         else if (packet == PUBREL_TYPE)
                         {
                             pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
+                            if (pubRel == NULL)
+                            {
+                                LOG(LOG_ERROR, LOG_LINE, "Failed to allocate publish complete message.");
+                                if (mqttData->fnOperationCallback)
+                                {
+                                    mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                                }
+                            }
                         }
                         if (pubRel != NULL)
                         {
@@ -466,9 +533,17 @@
                                 suback.qosReturn[suback.qosCount++] = byteutil_readByte(&iterator);
                                 remainLen--;
                             }
-                            (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx);
+                            mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqttData->ctx);
                             free(suback.qosReturn);
                         }
+                        else
+                        {
+                            LOG(LOG_ERROR, LOG_LINE, "allocation of quality of service value failed.");
+                            if (mqttData->fnOperationCallback)
+                            {
+                                mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_ERROR, NULL, mqttData->ctx);
+                            }
+                        }
                     }
                     break;
                 }
@@ -481,7 +556,7 @@
                         iterator += VARIABLE_HEADER_OFFSET;
                         unsuback.packetId = byteutil_read_uint16(&iterator);
 
-                        (void)mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx);
+                        mqttData->fnOperationCallback(mqttData, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqttData->ctx);
                     }
                     break;
                 }
@@ -583,6 +658,7 @@
     /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/
     if (handle == NULL || mqttOptions == NULL)
     {
+        LOG(LOG_ERROR, LOG_LINE, "mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p)", handle, mqttOptions);
         result = __LINE__;
     }
     else
--- a/mqtt_codec.c	Fri Jul 01 10:42:59 2016 -0700
+++ b/mqtt_codec.c	Fri Jul 29 15:58:49 2016 -0700
@@ -269,7 +269,7 @@
     return result;
 }
 
-static BUFFER_HANDLE constructPublishReply(CONTROL_PACKET_TYPE type, int flags, uint16_t packetId)
+static BUFFER_HANDLE constructPublishReply(CONTROL_PACKET_TYPE type, uint8_t flags, uint16_t packetId)
 {
     BUFFER_HANDLE result = BUFFER_new();
     if (result != NULL)
@@ -289,7 +289,7 @@
             }
             else
             {
-                *iterator = type | flags;
+                *iterator = (uint8_t)type | flags;
                 iterator++;
                 *iterator = 0x2;
                 iterator++;
@@ -339,7 +339,7 @@
         else
         {
             uint8_t* iterator = BUFFER_u_char(fixedHeader);
-            *iterator = packetType | flags;
+            *iterator = (uint8_t)packetType | flags;
             iterator++;
             (void)memcpy(iterator, remainSize, index);
 
@@ -400,7 +400,7 @@
         {
             result = __LINE__;
         }
-        else if (usernameLen > 0 && passwordLen == 0)
+        else if (usernameLen == 0 && passwordLen > 0)
         {
             result = __LINE__;
         }
@@ -440,8 +440,12 @@
                 }
                 if (usernameLen > 0)
                 {
-                    packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG | PASSWORD_FLAG;
+                    packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG;
                     byteutil_writeUTF(&iterator, mqttOptions->username, (uint16_t)usernameLen);
+                }
+                if (passwordLen > 0)
+                {
+                    packet[CONN_FLAG_BYTE_OFFSET] |= PASSWORD_FLAG;
                     byteutil_writeUTF(&iterator, mqttOptions->password, (uint16_t)passwordLen);
                 }
                 // TODO: Get the rest of the flags
@@ -915,7 +919,6 @@
             }
             else if (codec_Data->codecState == CODEC_STATE_VAR_HEADER)
             {
-                size_t payloadLen = 0;
                 if (codec_Data->headerData == NULL)
                 {
                     codec_Data->codecState = CODEC_STATE_PAYLOAD;