Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
5:8d58d20699dd
Parent:
4:57e049bce51e
Child:
7:07bc440836b3
--- a/iothubtransportamqp.c	Thu Oct 22 18:33:15 2015 -0700
+++ b/iothubtransportamqp.c	Mon Nov 02 19:21:13 2015 -0800
@@ -104,6 +104,123 @@
     }
     return result;
 }
+
+static int setRecvMessageIds(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg)
+{
+    int result;
+
+    // Function can not fail
+    pn_atom_t msgIdInfo = pn_message_get_id(msg);
+    if (msgIdInfo.type != PN_NULL)
+    {
+        if (msgIdInfo.type != PN_STRING)
+        {
+            LogError("Message Id Failure: Invalid message id Type %d\r\n", msgIdInfo.type);
+            result = __LINE__;
+        }
+        else
+        {
+            if (IoTHubMessage_SetMessageId(ioTMessage, msgIdInfo.u.as_bytes.start) != IOTHUB_MESSAGE_OK)
+            {
+                result = __LINE__;
+            }
+            else
+            {
+                result = 0;
+            }
+        }
+    }
+    else
+    {
+        // NULL values mean it's not set
+        result = 0;
+    }
+
+    if (result == 0)
+    {
+        pn_atom_t corrIdInfo = pn_message_get_correlation_id(msg);
+        if (corrIdInfo.type != PN_NULL)
+        {
+            if (corrIdInfo.type != PN_STRING)
+            {
+                LogError("Message Id Failure: Invalid correlation id Type %d\r\n", corrIdInfo.type);
+                result = __LINE__;
+            }
+            else
+            {
+                if (IoTHubMessage_SetCorrelationId(ioTMessage, corrIdInfo.u.as_bytes.start) != IOTHUB_MESSAGE_OK)
+                {
+                    result = __LINE__;
+                }
+                else
+                {
+                    result = 0;
+                }
+            }
+        }
+        else
+        {
+            // NULL values mean it's not set
+            result = 0;
+        }
+    }
+    return result;
+}
+
+static int setSendMessageIds(PDLIST_ENTRY messagesEntry, pn_message_t* msg)
+{
+    int result;
+
+    IOTHUB_MESSAGE_LIST* currentMessage = containingRecord(messagesEntry->Flink, IOTHUB_MESSAGE_LIST, entry);
+    const char* messageId = IoTHubMessage_GetMessageId(currentMessage->messageHandle);
+    if (messageId != NULL)
+    {
+        pn_bytes_t dataBytes = pn_bytes(strlen(messageId), messageId);
+        pn_atom_t pnMsgId;
+        pnMsgId.type = PN_STRING;
+        pnMsgId.u.as_bytes = dataBytes;
+        if (pn_message_set_id(msg, pnMsgId) == 0)
+        {
+            result = 0;
+        }
+        else
+        {
+            LogError("Failure setting pn_message_set_id.\r\n");
+            result = __LINE__;
+        }
+    }
+    else
+    {
+        result = 0;
+    }
+
+    if (result == 0)
+    {
+        const char* correlationId = IoTHubMessage_GetCorrelationId(currentMessage->messageHandle);
+        if (correlationId != NULL)
+        {
+            pn_bytes_t dataBytes = pn_bytes(strlen(correlationId), correlationId);
+            pn_atom_t pnCorrelationId;
+            pnCorrelationId.type = PN_STRING;
+            pnCorrelationId.u.as_bytes = dataBytes;
+            if (pn_message_set_correlation_id(msg, pnCorrelationId) == 0)
+            {
+                result = 0;
+            }
+            else
+            {
+                LogError("Failure setting pn_message_set_correlation_id.\r\n");
+                result = __LINE__;
+            }
+        }
+        else
+        {
+            result = 0;
+        }
+    }
+    return result;
+}
+
 /* Code_SRS_IOTHUBTRANSPORTTAMQP_07_001: [All IoTHubMessage_Properties shall be enumerated and entered in the pn_message_properties Map.] */
 static int setProperties(PDLIST_ENTRY messagesMakingUpBatch, pn_message_t* msg)
 {
@@ -464,12 +581,12 @@
         STRING_delete(transportState->cbsAddress);
         STRING_delete(transportState->deviceKey);
         STRING_delete(transportState->zeroLengthString);
-		if (transportState->trustedCertificates != NULL)
-		{
-			free(transportState->trustedCertificates);
-		}
+        if (transportState->trustedCertificates != NULL)
+        {
+            free(transportState->trustedCertificates);
+        }
 
-		free(transportState);
+        free(transportState);
     }
 }
 
@@ -711,15 +828,15 @@
             /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_028: [If pn_messenger returns NULL then the messenger initialization fails.]*/
             LogError("The messenger is not able to be created.\r\n");
         }
-		/* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_001: [If the option "TrustedCerts" has been set (length greater than 0), the trusted certificates string shall be passed to Proton by a call to pn_messenger_set_trusted_certificates.] */
-		else if ((amqpState->trustedCertificates != NULL) &&
-			(strlen(amqpState->trustedCertificates) > 0) &&
-			(pn_messenger_set_trusted_certificates(amqpState->messenger, amqpState->trustedCertificates) != 0))
-		{
-			/* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_002: [If pn_messenger_set_trusted_certificates fails, then messenger initialization shall fail.] */
-			LogError("unable to pass certificate information via pn_messenger_set_trusted_certificates\r\n");
-		}
-		/*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_029: [pn_messenger_start shall be invoked.]*/
+        /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_001: [If the option "TrustedCerts" has been set (length greater than 0), the trusted certificates string shall be passed to Proton by a call to pn_messenger_set_trusted_certificates.] */
+        else if ((amqpState->trustedCertificates != NULL) &&
+            (strlen(amqpState->trustedCertificates) > 0) &&
+            (pn_messenger_set_trusted_certificates(amqpState->messenger, amqpState->trustedCertificates) != 0))
+        {
+            /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_002: [If pn_messenger_set_trusted_certificates fails, then messenger initialization shall fail.] */
+            LogError("unable to pass certificate information via pn_messenger_set_trusted_certificates\r\n");
+        }
+        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_029: [pn_messenger_start shall be invoked.]*/
         else if (pn_messenger_start(amqpState->messenger) != 0)
         {
             /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_030: [If pn_messenger_start returns a non-zero value then messenger initialization fails.]*/
@@ -988,6 +1105,13 @@
                 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/
                 break;
             }
+            else if (setSendMessageIds(&headOfAvailable->eventMessages, transportState->message))
+            {
+                /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_081: [sendEvent will take the item that had been the head of the waitingToSend and put it back at the head of the waitingToSend list.]*/
+                putSecondListAfterHeadOfFirst(transportState->waitingToSend, &headOfAvailable->eventMessages);
+                /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_082: [sendEvent will then break out of the send loop.]*/
+                break;
+            }
             else
             {
                 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_083: [If the proton API is successful then invoke pn_messenger_put.]*/
@@ -1204,7 +1328,14 @@
                 }
                 else if (cloneProperties(ioTMessage, transportState->message) != 0)
                 {
-                    LogError("Failed to set data for IoT hub message\r\n");
+                    LogError("Failed to clone properties for IoT hub message\r\n");
+                    pn_messenger_release(transportState->messenger, tracker, 0);
+                    pn_messenger_settle(transportState->messenger, tracker, 0);
+                }
+                /* Codes_SRS_IOTHUBTRANSPORTTAMQP_07_003: [If the pn_message_get_id value is not NULL then the value will be set by calling IotHubMessage_SetMessageId.] */
+                else if (setRecvMessageIds(ioTMessage, transportState->message) != 0)
+                {
+                    LogError("Failed to set MessageId for IoT hub message\r\n");
                     pn_messenger_release(transportState->messenger, tracker, 0);
                     pn_messenger_settle(transportState->messenger, tracker, 0);
                 }
@@ -1491,7 +1622,7 @@
             DList_InitializeListHead(&amqpState->availableWorkItems);
             DList_InitializeListHead(&amqpState->messengerCorral);
             amqpState->DoWork_PullMessages = false;
-			amqpState->trustedCertificates = NULL;
+            amqpState->trustedCertificates = NULL;
             /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/
             if ((amqpState->message = pn_message()) == NULL)
             {
@@ -1638,47 +1769,47 @@
     /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_002: [If parameter optionName is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
     /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_003: [If parameter value is NULL then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
 
-	if (
-		(handle == NULL) ||
-		(option == NULL) ||
-		(value == NULL)
-		)
-	{
-		result = IOTHUB_CLIENT_INVALID_ARG;
-		LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption\r\n");
-	}
-	else
-	{
-		/* Codes_SRS_IOTHUBTRANSPORTTAMQP_02_005: [clientTransportAMQP_SetOption shall set the option "optionName" to *value.] */
-		/* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_003: ["TrustedCerts"] */
-		if (strcmp("TrustedCerts", option) == 0)
-		{
-			PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle;
-			char* newTrsutedCertificates;
+    if (
+        (handle == NULL) ||
+        (option == NULL) ||
+        (value == NULL)
+        )
+    {
+        result = IOTHUB_CLIENT_INVALID_ARG;
+        LogError("invalid parameter (NULL) passed to clientTransportAMQP_SetOption\r\n");
+    }
+    else
+    {
+        /* Codes_SRS_IOTHUBTRANSPORTTAMQP_02_005: [clientTransportAMQP_SetOption shall set the option "optionName" to *value.] */
+        /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_003: ["TrustedCerts"] */
+        if (strcmp("TrustedCerts", option) == 0)
+        {
+            PAMQP_TRANSPORT_STATE handleData = (PAMQP_TRANSPORT_STATE)handle;
+            char* newTrsutedCertificates;
 
-			if (mallocAndStrcpy_s(&newTrsutedCertificates, (const char*)value) != 0)
-			{
-				/* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_006: [If any other error occurs while setting the option, clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */
-				result = IOTHUB_CLIENT_ERROR;
-			}
-			else
-			{
-				/* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_004: [Sets a string that should be used as trusted certificates by the transport, freeing any previous TrustedCerts option value.] */
-				if (handleData->trustedCertificates != NULL)
-				{
-					free(handleData->trustedCertificates);
-				}
+            if (mallocAndStrcpy_s(&newTrsutedCertificates, (const char*)value) != 0)
+            {
+                /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_006: [If any other error occurs while setting the option, clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_ERROR.] */
+                result = IOTHUB_CLIENT_ERROR;
+            }
+            else
+            {
+                /* Codes_SRS_IOTHUBTRANSPORTTAMQP_01_004: [Sets a string that should be used as trusted certificates by the transport, freeing any previous TrustedCerts option value.] */
+                if (handleData->trustedCertificates != NULL)
+                {
+                    free(handleData->trustedCertificates);
+                }
 
-				handleData->trustedCertificates = newTrsutedCertificates;
-				result = IOTHUB_CLIENT_OK;
-			}
-		}
-		else
-		{
-			/*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_004: [If optionName is not an option supported then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.]*/
-			result = IOTHUB_CLIENT_INVALID_ARG;
-		}
-	}
+                handleData->trustedCertificates = newTrsutedCertificates;
+                result = IOTHUB_CLIENT_OK;
+            }
+        }
+        else
+        {
+            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_004: [If optionName is not an option supported then clientTransportAMQP_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.]*/
+            result = IOTHUB_CLIENT_INVALID_ARG;
+        }
+    }
 
     return result;
 }