Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

Revision:
2:1e6040e0c035
Parent:
0:1b5f413bf328
--- a/iothubtransportamqp.c	Tue Sep 15 23:52:53 2015 -0700
+++ b/iothubtransportamqp.c	Tue Sep 22 20:36:39 2015 -0700
@@ -225,7 +225,7 @@
     return result;
 }
 
-/* Code_SRS_IOTHUBTRANSPORTTAMQP_07_002: [On notifications the properties shall be retrieved from the message using pn_message_properties and will be entered in the IoTHubMessage_Properties Map.] */
+/* Code_SRS_IOTHUBTRANSPORTTAMQP_07_002: [On messages the properties shall be retrieved from the message using pn_message_properties and will be entered in the IoTHubMessage_Properties Map.] */
 static int cloneProperties(IOTHUB_MESSAGE_HANDLE ioTMessage, pn_message_t* msg)
 {
     int result = 0;
@@ -457,7 +457,7 @@
         {
             pn_message_free(transportState->message);
         }
-        STRING_delete(transportState->notificationAddress);
+        STRING_delete(transportState->messageAddress);
         STRING_delete(transportState->eventAddress);
         STRING_delete(transportState->urledDeviceId);
         STRING_delete(transportState->devicesPortionPath);
@@ -772,10 +772,10 @@
             if (amqpState->putTokenWasSuccessful == true)
             {
                 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_041: [pn_messenger_subscribe shall be invoked.]*/
-                if ((amqpState->notificationSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->notificationAddress))) == NULL)
+                if ((amqpState->messageSubscription = pn_messenger_subscribe(amqpState->messenger, (const char*)STRING_c_str(amqpState->messageAddress))) == NULL)
                 {
                     /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_042: [If pn_messenger_subscribe returns a non-zero value then messenger initialization fails.]*/
-                    LogError("unable to create a subscription to the notification address\r\n");
+                    LogError("unable to create a subscription to the message address\r\n");
                 }
                 else
                 {
@@ -962,7 +962,7 @@
         if (amqpBatchResult == amqp_batch_nowork)
         {
             // no work to do.  Stop trying.
-            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_089: [If there is nothing on the list then go on to the notification.]*/
+            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_089: [If there is nothing on the list then go on to the message.]*/
             break;
         }
         else if (amqpBatchResult == amqp_batch_error)
@@ -1134,10 +1134,10 @@
     pn_messenger_settle(transportState->messenger, tracker, 0);
 }
 
-static void processNotification(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker)
+static void processMessage(PAMQP_TRANSPORT_STATE transportState, pn_tracker_t tracker)
 {
-    /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_111: [If the state variable DoWork_PullNotification is false then the message shall be rejected and processNotification shall return.]*/
-    if (transportState->DoWork_PullNotifications == false)
+    /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_111: [If the state variable DoWork_PullMessage is false then the message shall be rejected and processMessage shall return.]*/
+    if (transportState->DoWork_PullMessages == false)
     {
         //
         // Nothing to do with the message.  We reject it.  In that way it can come back later.
@@ -1149,10 +1149,10 @@
     {
         pn_data_t *body;
 
-        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_050: [processNotification will acquire the body of the notification by invoking pn_message_body.]*/
+        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_050: [processMessage will acquire the body of the message by invoking pn_message_body.]*/
         if ((body = pn_message_body(transportState->message)) == NULL)
         {
-            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_051: [If the body is NULL then the notification will be abandoned.]*/
+            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_051: [If the body is NULL then the message will be abandoned.]*/
             LogError("Failed to get the proton message body\r\n");
             pn_messenger_release(transportState->messenger, tracker, 0);
             pn_messenger_settle(transportState->messenger, tracker, 0);
@@ -1160,8 +1160,8 @@
         else
         {
             pn_bytes_t sizeAndData;
-            bool keepProcessingThisNotification = true;
-            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_054: [processNotification will check for a null (note the lowercase) body by invoking pn_data_next.]*/
+            bool keepProcessingThisMessage = true;
+            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_054: [processMessage will check for a null (note the lowercase) body by invoking pn_data_next.]*/
             if (pn_data_next(body) == false)
             {
                 //
@@ -1174,16 +1174,16 @@
             }
             else
             {
-                /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_055: [If the body is NOT null then processNotification will check that the body is of type PN_BINARY.]*/
+                /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_055: [If the body is NOT null then processMessage will check that the body is of type PN_BINARY.]*/
                 if (PN_BINARY != pn_data_type(body))
                 {
-                    /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_056: [processNotification will abandon the notification with body types that are NOT PN_BINARY, the IOTHUB_MESSAGE_HANDLE will be destroyed and the loop will be continued.]*/
-                    LogError("Notification received via AMQP was not PN_BINARY\r\n");
+                    /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_056: [processMessage will abandon the message with body types that are NOT PN_BINARY, the IOTHUB_MESSAGE_HANDLE will be destroyed and the loop will be continued.]*/
+                    LogError("Message received via AMQP was not PN_BINARY\r\n");
                     sizeAndData.size = 0;           // Compilers get upset.
                     sizeAndData.start = NULL;
                     pn_messenger_release(transportState->messenger, tracker, 0);
                     pn_messenger_settle(transportState->messenger, tracker, 0);
-                    keepProcessingThisNotification = false;
+                    keepProcessingThisMessage = false;
                 }
                 else
                 {
@@ -1191,13 +1191,13 @@
                     sizeAndData = pn_data_get_binary(body);
                 }
             }
-            if (keepProcessingThisNotification == true)
+            if (keepProcessingThisMessage == true)
             {
                 IOTHUB_MESSAGE_HANDLE ioTMessage;
                 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_058: [The IOTHUB_MESSAGE_HANDLE will be set by invoking IotHubMessage_SetData with the previously saved length and pointer to payload.]*/
                 if ((ioTMessage = IoTHubMessage_CreateFromByteArray((const unsigned char*)sizeAndData.start, sizeAndData.size)) == NULL)
                 {
-                    /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_059: [processNotification will abandon the notification if IotHubMessage_SetData returns any status other than IOTHUB_MESSAGE_OK.]*/
+                    /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_059: [processMessage will abandon the message if IotHubMessage_SetData returns any status other than IOTHUB_MESSAGE_OK.]*/
                     LogError("Failed to set data for IoT hub message\r\n");
                     pn_messenger_release(transportState->messenger, tracker, 0);
                     pn_messenger_settle(transportState->messenger, tracker, 0);
@@ -1210,27 +1210,27 @@
                 }
                 else
                 {
-                    /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_060: [If IOTHUB_MESSAGE_OK had been returned IoTHubClient_LL_NotificationCallback will be invoked with the IOTHUB_MESSAGE_HANDLE.]*/
-                    IOTHUBMESSAGE_DISPOSITION_RESULT upperLayerDisposition = IoTHubClient_LL_NotificationCallback(transportState->savedClientHandle, ioTMessage);
+                    /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_060: [If IOTHUB_MESSAGE_OK had been returned IoTHubClient_LL_MessageCallback will be invoked with the IOTHUB_MESSAGE_HANDLE.]*/
+                    IOTHUBMESSAGE_DISPOSITION_RESULT upperLayerDisposition = IoTHubClient_LL_MessageCallback(transportState->savedClientHandle, ioTMessage);
 
                     if (upperLayerDisposition == IOTHUBMESSAGE_ACCEPTED)
                     {
-                        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_006: [If IoTHubClient_NotificationCallback returns IOTHUBMESSAGE_ACCEPTED value then the notification will be accepted.] */
+                        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_006: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ACCEPTED value then the message will be accepted.] */
                         pn_messenger_accept(transportState->messenger, tracker, 0);
                     }
                     else if (upperLayerDisposition == IOTHUBMESSAGE_ABANDONED)
                     {
-                        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_007: [If IoTHubClient_NotificationCallback returns IOTHUBMESSAGE_ABANDONED value then the notification will be abandoned.] */
+                        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_02_007: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_ABANDONED value then the message will be abandoned.] */
                         pn_messenger_release(transportState->messenger, tracker, 0);
                     }
                     else
                     {
-                        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_061: [If IoTHubClient_NotificationCallback returns IOTHUBMESSAGE_REJECTED value then the notification will be rejected.]*/
+                        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_061: [If IoTHubClient_MessageCallback returns IOTHUBMESSAGE_REJECTED value then the message will be rejected.]*/
                         pn_messenger_reject(transportState->messenger, tracker, 0);
                     }
                     pn_messenger_settle(transportState->messenger, tracker, 0);
                 }
-                /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_063: [processNotification will destroy the IOTHUB_MESSAGE_HANDLE by invoking IoTHubMessage_Destroy.]*/
+                /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_063: [processMessage will destroy the IOTHUB_MESSAGE_HANDLE by invoking IoTHubMessage_Destroy.]*/
                 IoTHubMessage_Destroy(ioTMessage);
             }
         }
@@ -1242,16 +1242,16 @@
     PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
     int receiveResult;
 
-    /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_043: [If DoWork_PullNotification is false then DoWork will cancel any preceding notifications by invoking pn_messenger_recv.]*/
+    /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_043: [If DoWork_PullMessage is false then DoWork will cancel any preceding messages by invoking pn_messenger_recv.]*/
 
-    if ((transportState->DoWork_PullNotifications == false) && (transportState->waitingForPutTokenReply == false))
+    if ((transportState->DoWork_PullMessages == false) && (transportState->waitingForPutTokenReply == false))
     {
         if ((receiveResult = pn_messenger_recv(transportState->messenger, 0)) != 0)
         {
             if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT))
             {
                 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/
-                LogError("Error attempting to receive notifications: %d\r\n", receiveResult);
+                LogError("Error attempting to receive messages: %d\r\n", receiveResult);
                 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/
                 transportState->messengerInitialized = false;
             }
@@ -1259,13 +1259,13 @@
     }
     else
     {
-        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_044: [If the DoWork_PullNotification or the waitingForPutTokenReply flag is true then processReceives will pull notifications with pn_messenger_recv.]*/
+        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_044: [If the DoWork_PullMessage or the waitingForPutTokenReply flag is true then processReceives will pull messages with pn_messenger_recv.]*/
         if ((receiveResult = pn_messenger_recv(transportState->messenger, PROTON_INCOMING_WINDOW_SIZE)) != 0)
         {
             if ((receiveResult != PN_INPROGRESS) && (receiveResult != PN_TIMEOUT))
             {
                 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_046: [If pn_messenger_recv fails it will be logged.]*/
-                LogError("Error attempting to receive notifications: %d\r\n", receiveResult);
+                LogError("Error attempting to receive messages: %d\r\n", receiveResult);
                 /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_105: [processReceives will then go on to the next action of DoWork.]*/
                 transportState->messengerInitialized = false;
             }
@@ -1276,7 +1276,7 @@
             // If any are present we need to dispose of messages in the incoming queue.
             //
             size_t depthOfReceiveQueue;
-            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_045: [processReceives will check the number of notifications received by invoking pn_messenger_incoming.]*/
+            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_045: [processReceives will check the number of messages received by invoking pn_messenger_incoming.]*/
             for (depthOfReceiveQueue = pn_messenger_incoming(transportState->messenger); depthOfReceiveQueue > 0; depthOfReceiveQueue--)
             {
                 int result;
@@ -1307,16 +1307,16 @@
                     // Currently we can receive two types of messages.  We can recieve status messages from the $CBS that indicates the success
                     // or failure of renewing the SAS for the device.
                     //
-                    // Otherwise we can recieve a simple notification.  We can get a pointer to the subscription from the tracker.  We can compare
+                    // Otherwise we can recieve a simple message.  We can get a pointer to the subscription from the tracker.  We can compare
                     // that pointer to the subscriptions that we stored when we initialized the messenger.  We then dispatch appropriately.
                     //
 
                     /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_106: [processReceives acquires the subscription of the message by invoking pn_messneger_incoming_subscription.] This value is henceforth known as theMessageSource.*/
                     theMessageSource = pn_messenger_incoming_subscription(transportState->messenger);
-                    if (theMessageSource == transportState->notificationSubscription)
+                    if (theMessageSource == transportState->messageSubscription)
                     {
                         /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_108: [If theMessageSource is equal to the state variable cbsSubscription invoke processCBSReply.]*/
-                        processNotification(transportState, tracker);
+                        processMessage(transportState, tracker);
                     }
                     else if (theMessageSource == transportState->cbsSubscription)
                     {
@@ -1382,11 +1382,11 @@
         (STRING_concat(state->eventAddress, EVENT_ENDPOINT) != 0)) ? (false) : (true);
 }
 
-static bool createNotificationAddress(PAMQP_TRANSPORT_STATE state)
+static bool createMessageAddress(PAMQP_TRANSPORT_STATE state)
 {
-    return (((state->notificationAddress = STRING_construct(AMQPS_SCHEME)) == NULL) ||
-        (STRING_concat_with_STRING(state->notificationAddress, state->devicesPortionPath) != 0) ||
-        (STRING_concat(state->notificationAddress, NOTIFICATION_ENDPOINT) != 0)) ? (false) : (true);
+    return (((state->messageAddress = STRING_construct(AMQPS_SCHEME)) == NULL) ||
+        (STRING_concat_with_STRING(state->messageAddress, state->devicesPortionPath) != 0) ||
+        (STRING_concat(state->messageAddress, MESSAGE_ENDPOINT) != 0)) ? (false) : (true);
 }
 
 static bool createCbsAddress(PAMQP_TRANSPORT_STATE state, const char* host)
@@ -1468,7 +1468,7 @@
         }
         else
         {
-            amqpState->notificationAddress = NULL;
+            amqpState->messageAddress = NULL;
             amqpState->eventAddress = NULL;
             amqpState->urledDeviceId = NULL;
             amqpState->devicesPortionPath = NULL;
@@ -1476,7 +1476,7 @@
             amqpState->deviceKey = NULL;
             amqpState->savedClientHandle = NULL;
             amqpState->messenger = NULL;
-            amqpState->notificationSubscription = NULL;
+            amqpState->messageSubscription = NULL;
             amqpState->cbsSubscription = NULL;
             amqpState->zeroLengthString = NULL;
             amqpState->messengerInitialized = false;
@@ -1490,7 +1490,7 @@
             DList_InitializeListHead(&amqpState->workInProgress);
             DList_InitializeListHead(&amqpState->availableWorkItems);
             DList_InitializeListHead(&amqpState->messengerCorral);
-            amqpState->DoWork_PullNotifications = false;
+            amqpState->DoWork_PullMessages = false;
 			amqpState->trustedCertificates = NULL;
             /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_003: [clientTransportAMQP_Create shall fail and return NULL if memory allocation of the transports basic internal state structures fails.]*/
             if ((amqpState->message = pn_message()) == NULL)
@@ -1530,14 +1530,14 @@
                         /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_091: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as urledDeviceId, by url encoding the config->deviceId.]*/
                         /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_093: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as devicePortionOfPath, from the following pieces: host + "/devices/" + urledDeviceId.]*/
                         /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_097: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as eventAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/events".]*/
-                        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_099: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as notificationAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/devicebound".]*/
+                        /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_099: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as messageAddress, from the following pieces: "amqps://" + devicesPortionOfPath + "/messages/devicebound".]*/
                         /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_101: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as cbsAddress, from the following pieces: "amqps://" + host + "/$cbs".]*/
                         /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_103: [clientTransportAMQP_Create shall create an immutable string, referred henceforth as deviceKey, from the config->deviceKey.]*/
                         /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_186: [An empty string shall be created.  If this fails then clientTransportAMQP shall fail and return NULL.]*/
                         if ((!createUrledDeviceId(amqpState, config)) ||
                             (!createDevicesPortionPath(amqpState, STRING_c_str(host))) ||
                             (!createEventAddress(amqpState)) ||
-                            (!createNotificationAddress(amqpState)) ||
+                            (!createMessageAddress(amqpState)) ||
                             (!createCbsAddress(amqpState, STRING_c_str(host))) ||
                             ((amqpState->deviceKey = STRING_construct(config->upperConfig->deviceKey)) == NULL) ||
                             ((amqpState->zeroLengthString = STRING_new()) == NULL))
@@ -1545,7 +1545,7 @@
                             /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_092: [If creating the urledDeviceId fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
                             /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_094: [If creating the devicePortionOfPath fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
                             /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_098: [If creating the eventAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
-                            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_100: [If creating the notificationAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
+                            /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_100: [If creating the messageAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
                             /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_102: [If creating the cbsAddress fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
                             /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_104: [If creating the deviceKey fails for any reason then clientTransportAMQP_Create shall fail and return NULL.]*/
                             clientTransportAMQP_Destroy(amqpState);
@@ -1579,7 +1579,7 @@
     {
         /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_014: [clientTransportAMQP_Subscribe shall succeed and return a value of zero.]*/
         PAMQP_TRANSPORT_STATE transportState = (PAMQP_TRANSPORT_STATE)handle;
-        transportState->DoWork_PullNotifications = true;
+        transportState->DoWork_PullMessages = true;
         result = 0;
     }
     return result;
@@ -1591,7 +1591,7 @@
     /*Codes_SRS_IOTHUBTRANSPORTTAMQP_06_015: [clientTransportAMQP_Unsubscribe shall do nothing if handle is NULL.]*/
     if (transportState)
     {
-        transportState->DoWork_PullNotifications = false;
+        transportState->DoWork_PullMessages = false;
     }
 }
 
@@ -1688,7 +1688,7 @@
     clientTransportAMQP_Create, clientTransportAMQP_Destroy, clientTransportAMQP_Subscribe, clientTransportAMQP_Unsubscribe, clientTransportAMQP_DoWork, clientTransportAMQP_GetSendStatus
 };
 
-extern const void* IoTHubTransportAmqp_ProvideTransportInterface(void)
+extern const void* AMQP_Protocol(void)
 {
     return &myfunc;
 }