IoTHub raw messaging client sample using AMQP

Dependencies:   iothub_client EthernetInterface NTPClient iothub_amqp_transport azure_c_shared_utility mbed-rtos mbed azure_uamqp_c wolfSSL

This sample showcases the usage of Azure IoT client libraries with the AMQP transport for sending/receiving raw messages from an IoT Hub.

Revision:
43:a4b2614eb860
Parent:
40:548ef685e177
Child:
47:d87920c7f211
--- a/iothub_client_sample_amqp.c	Tue Jun 07 10:50:09 2016 -0700
+++ b/iothub_client_sample_amqp.c	Fri Jun 17 17:03:45 2016 -0700
@@ -21,6 +21,8 @@
 static const char* connectionString = "[device connection string]";
 
 static int callbackCounter;
+static bool g_continueRunning;
+#define MESSAGE_COUNT 5
 
 DEFINE_ENUM_STRINGS(IOTHUB_CLIENT_CONFIRMATION_RESULT, IOTHUB_CLIENT_CONFIRMATION_RESULT_VALUES);
 
@@ -35,21 +37,21 @@
     int* counter = (int*)userContextCallback;
     const unsigned char* buffer = NULL;
     size_t size = 0;
-	const char* messageId;
-	const char* correlationId;
+    const char* messageId;
+    const char* correlationId;
 
-	// AMQP message properties
-	if ((messageId = IoTHubMessage_GetMessageId(message)) == NULL)
-	{
-		messageId = "<null>";
-	}
+    // AMQP message properties
+    if ((messageId = IoTHubMessage_GetMessageId(message)) == NULL)
+    {
+        messageId = "<null>";
+    }
 
-	if ((correlationId = IoTHubMessage_GetCorrelationId(message)) == NULL)
-	{
-		correlationId = "<null>";
-	}
+    if ((correlationId = IoTHubMessage_GetCorrelationId(message)) == NULL)
+    {
+        correlationId = "<null>";
+    }
 
-	// AMQP message content.
+    // AMQP message content.
     IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(message);
 
     if (contentType == IOTHUBMESSAGE_BYTEARRAY)
@@ -68,6 +70,8 @@
         if ((buffer = IoTHubMessage_GetString(message)) != NULL && (size = strlen(buffer)) > 0)
         {
             (void)printf("Received Message [%d] (message-id: %s, correlation-id: %s) with STRING Data: <<<%.*s>>> & Size=%d\r\n", *counter, messageId, correlationId, (int)size, buffer, (int)size);
+
+            // If we receive the work 'quit' then we stop running
         }
         else
         {
@@ -102,6 +106,11 @@
         }
     }
 
+    if (memcmp(buffer, "quit", size) == 0)
+    {
+        g_continueRunning = false;
+    }
+
     /* Some device specific action code goes here... */
     (*counter)++;
     return IOTHUBMESSAGE_ACCEPTED;
@@ -122,10 +131,11 @@
 
 void iothub_client_sample_amqp_run(void)
 {
-    IOTHUB_CLIENT_HANDLE iotHubClientHandle;
+    IOTHUB_CLIENT_LL_HANDLE iotHubClientHandle;
 
     EVENT_INSTANCE messages[MESSAGE_COUNT];
 
+    g_continueRunning = true;
     srand((unsigned int)time(NULL));
     double avgWindSpeed = 10.0;
 
@@ -140,23 +150,25 @@
     }
     else
     {
-        if ((iotHubClientHandle = IoTHubClient_CreateFromConnectionString(connectionString, AMQP_Protocol)) == NULL)
+        if ((iotHubClientHandle = IoTHubClient_LL_CreateFromConnectionString(connectionString, AMQP_Protocol)) == NULL)
         {
             (void)printf("ERROR: iotHubClientHandle is NULL!\r\n");
         }
         else
         {
+            bool traceOn = true;
+            IoTHubClient_LL_SetOption(iotHubClientHandle, "logtrace", &traceOn);
 
 #ifdef MBED_BUILD_TIMESTAMP
             // For mbed add the certificate information
-            if (IoTHubClient_SetOption(iotHubClientHandle, "TrustedCerts", certificates) != IOTHUB_CLIENT_OK)
+            if (IoTHubClient_LL_SetOption(iotHubClientHandle, "TrustedCerts", certificates) != IOTHUB_CLIENT_OK)
             {
                 printf("failure to set option \"TrustedCerts\"\r\n");
             }
 #endif // MBED_BUILD_TIMESTAMP
 
             /* Setting Message call back, so we can receive Commands. */
-            if (IoTHubClient_SetMessageCallback(iotHubClientHandle, ReceiveMessageCallback, &receiveContext) != IOTHUB_CLIENT_OK)
+            if (IoTHubClient_LL_SetMessageCallback(iotHubClientHandle, ReceiveMessageCallback, &receiveContext) != IOTHUB_CLIENT_OK)
             {
                 (void)printf("ERROR: IoTHubClient_SetMessageCallback..........FAILED!\r\n");
             }
@@ -165,42 +177,51 @@
                 (void)printf("IoTHubClient_SetMessageCallback...successful.\r\n");
 
                 /* Now that we are ready to receive commands, let's send some messages */
-                for (size_t i = 0; i < MESSAGE_COUNT; i++)
+                size_t iterator = 0;
+                do
                 {
-                    sprintf_s(msgText, sizeof(msgText), "{\"deviceId\":\"myFirstDevice\",\"windSpeed\":%.2f}", avgWindSpeed + (rand() % 4 + 2));
-                    if ((messages[i].messageHandle = IoTHubMessage_CreateFromByteArray((const unsigned char*)msgText, strlen(msgText))) == NULL)
-                    {
-                        (void)printf("ERROR: iotHubMessageHandle is NULL!\r\n");
-                    }
-                    else
+                    if (iterator < MESSAGE_COUNT)
                     {
-                        messages[i].messageTrackingId = i;
-
-                        MAP_HANDLE propMap = IoTHubMessage_Properties(messages[i].messageHandle);
-                        sprintf_s(propText, sizeof(propText), "PropMsg_%d", i);
-                        if (Map_AddOrUpdate(propMap, "PropName", propText) != MAP_OK)
+                        sprintf_s(msgText, sizeof(msgText), "{\"deviceId\":\"myFirstDevice\",\"windSpeed\":%.2f}", avgWindSpeed + (rand() % 4 + 2));
+                        if ((messages[iterator].messageHandle = IoTHubMessage_CreateFromByteArray((const unsigned char*)msgText, strlen(msgText))) == NULL)
                         {
-                            (void)printf("ERROR: Map_AddOrUpdate Failed!\r\n");
-                        }
-
-                        if (IoTHubClient_SendEventAsync(iotHubClientHandle, messages[i].messageHandle, SendConfirmationCallback, &messages[i]) != IOTHUB_CLIENT_OK)
-                        {
-                            (void)printf("ERROR: IoTHubClient_SendEventAsync..........FAILED!\r\n");
+                            (void)printf("ERROR: iotHubMessageHandle is NULL!\r\n");
                         }
                         else
                         {
-                            (void)printf("IoTHubClient_SendEventAsync accepted data for transmission to IoT Hub.\r\n");
+                            messages[iterator].messageTrackingId = iterator;
+
+                            MAP_HANDLE propMap = IoTHubMessage_Properties(messages[iterator].messageHandle);
+                            sprintf_s(propText, sizeof(propText), "PropMsg_%d", iterator);
+                            if (Map_AddOrUpdate(propMap, "PropName", propText) != MAP_OK)
+                            {
+                                (void)printf("ERROR: Map_AddOrUpdate Failed!\r\n");
+                            }
+
+                            if (IoTHubClient_LL_SendEventAsync(iotHubClientHandle, messages[iterator].messageHandle, SendConfirmationCallback, &messages[iterator]) != IOTHUB_CLIENT_OK)
+                            {
+                                (void)printf("ERROR: IoTHubClient_SendEventAsync..........FAILED!\r\n");
+                            }
+                            else
+                            {
+                                (void)printf("IoTHubClient_SendEventAsync accepted data for transmission to IoT Hub.\r\n");
+                            }
                         }
                     }
-                }
+                    IoTHubClient_LL_DoWork(iotHubClientHandle);
+                    ThreadAPI_Sleep(1);
 
-                /* Wait for Commands. */
-                (void)printf("Press any key to exit the application. \r\n");
-                (void)getchar();
+                    iterator++;
+                } while (g_continueRunning);
             }
-
-            IoTHubClient_Destroy(iotHubClientHandle);
+            IoTHubClient_LL_Destroy(iotHubClientHandle);
         }
         platform_deinit();
     }
 }
+
+int main(void)
+{
+    iothub_client_sample_amqp_run();
+    return 0;
+}