A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Revision:
21:a7506c90aa84
Parent:
20:49c9daf2b0ff
Child:
22:826657a00c44
--- a/main.cpp	Tue Jan 10 18:10:17 2017 -0600
+++ b/main.cpp	Tue Mar 21 03:32:27 2017 +0000
@@ -25,48 +25,59 @@
 
  */
 
- // change this to 0 to output messages to serial instead of LCD
-#define USE_LCD 1
-
-#if USE_LCD
-#include "C12832.h"
-
-// the actual pins are defined in mbed_app.json and can be overridden per target
-C12832 lcd(LCD_MOSI, LCD_SCK, LCD_MISO, LCD_A0, LCD_NCS);
-
-#define logMessage lcd.cls();lcd.printf
-
-#else
-
-#define logMessage printf
-
-#endif
-
 #define MQTTCLIENT_QOS2 1
-
+#include "mbed.h"
+#include "rtos.h"
 #include "easy-connect.h"
 #include "MQTTNetwork.h"
 #include "MQTTmbed.h"
 #include "MQTTClient.h"
 
+Serial pc(USBTX, USBRX, 115200);
 int arrivedcount = 0;
 
+Thread msgSender;
+
+static MemoryPool<MQTT::Message, 16> pool;
+static Queue<MQTT::Message, 16> queue;
 
 void messageArrived(MQTT::MessageData& md)
 {
     MQTT::Message &message = md.message;
-    logMessage("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
-    logMessage("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
+    printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+    printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
     ++arrivedcount;
 }
 
+void SendDataThread()
+{
+    unsigned int i;
+    while(true)
+    {
+        MQTT::Message * message = pool.alloc();
+        char * buff = new char[sizeof(char) * 100];
+        
+        sprintf(buff, "message test %d", i);
+        message->qos = MQTT::QOS0;
+        message->retained = false;
+        message->dup = false;
+        message->payload = (void*)buff;
+        message->payloadlen = strlen(buff)+1;
+                    
+        // publish the message to mqtt
+        queue.put(message);  
+        i++;
+        
+        Thread::wait(2000);         
+    }   
+}
 
 int main(int argc, char* argv[])
 {
     float version = 0.6;
     char* topic = "mbed-sample";
 
-    logMessage("HelloMQTT: version is %.2f\r\n", version);
+    printf("HelloMQTT: version is %.2f\r\n", version);
 
     NetworkInterface* network = easy_connect(true);
     if (!network) {
@@ -79,10 +90,10 @@
 
     const char* hostname = "m2m.eclipse.org";
     int port = 1883;
-    logMessage("Connecting to %s:%d\r\n", hostname, port);
+    printf("Connecting to %s:%d\r\n", hostname, port);
     int rc = mqttNetwork.connect(hostname, port);
     if (rc != 0)
-        logMessage("rc from TCP connect is %d\r\n", rc);
+        printf("rc from TCP connect is %d\r\n", rc);
 
     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
     data.MQTTVersion = 3;
@@ -90,50 +101,57 @@
     data.username.cstring = "testuser";
     data.password.cstring = "testpassword";
     if ((rc = client.connect(data)) != 0)
-        logMessage("rc from MQTT connect is %d\r\n", rc);
+        printf("rc from MQTT connect is %d\r\n", rc);
 
     if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0)
-        logMessage("rc from MQTT subscribe is %d\r\n", rc);
-
-    MQTT::Message message;
-
-    // QoS 0
-    char buf[100];
-    sprintf(buf, "Hello World!  QoS 0 message from app version %f\r\n", version);
-    message.qos = MQTT::QOS0;
-    message.retained = false;
-    message.dup = false;
-    message.payload = (void*)buf;
-    message.payloadlen = strlen(buf)+1;
-    rc = client.publish(topic, message);
-    while (arrivedcount < 1)
-        client.yield(100);
+        printf("rc from MQTT subscribe is %d\r\n", rc);
 
-    // QoS 1
-    sprintf(buf, "Hello World!  QoS 1 message from app version %f\r\n", version);
-    message.qos = MQTT::QOS1;
-    message.payloadlen = strlen(buf)+1;
-    rc = client.publish(topic, message);
-    while (arrivedcount < 2)
-        client.yield(100);
-
-    // QoS 2
-    sprintf(buf, "Hello World!  QoS 2 message from app version %f\r\n", version);
-    message.qos = MQTT::QOS2;
-    message.payloadlen = strlen(buf)+1;
-    rc = client.publish(topic, message);
-    while (arrivedcount < 3)
-        client.yield(100);
+    // Start the data source
+    msgSender.start(SendDataThread);
+    
+    while(true)
+    {
+        osEvent evt = queue.get(10);
+        if (evt.status == osEventMessage) {
+            printf("Message arrived from main thread ...\r\n");
+            // Unpack the message
+            MQTT::Message * message = (MQTT::Message *)evt.value.p;
+            
+            printf("Publishing message to MQTT ...\r\n");
+            // Push to mqtt
+            int rc = client.publish(topic, *message);
+            if (rc < 0)
+                printf("Error sending mqtt message \r\n");
+            else
+                printf("Message published ...\r\n");
+            
+            printf("Deleting payload ...\r\n");
+            // Delete payload
+            delete [] message->payload;
+            
+            printf("Deleting pool allocation ...\r\n");
+            // Don't forget this!
+            pool.free(message);
+        }
+        
+        printf("MQTT client yeild ...\r\n");
+        if (client.yield(100) != MQTT::SUCCESS)
+        {
+            printf("Yield error, client disconnected? ...\r\n");
+            break;
+        }
+        printf("MQTT client yeild successful ...\r\n");               
+    }
 
     if ((rc = client.unsubscribe(topic)) != 0)
-        logMessage("rc from unsubscribe was %d\r\n", rc);
+        printf("rc from unsubscribe was %d\r\n", rc);
 
     if ((rc = client.disconnect()) != 0)
-        logMessage("rc from disconnect was %d\r\n", rc);
+        printf("rc from disconnect was %d\r\n", rc);
 
     mqttNetwork.disconnect();
 
-    logMessage("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
+    printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
 
     return 0;
 }