A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Files at this revision

API Documentation at this revision

Comitter:
vpcola
Date:
Tue Mar 21 03:32:27 2017 +0000
Parent:
20:49c9daf2b0ff
Child:
22:826657a00c44
Commit message:
test

Changed in this revision

C12832.lib Show diff for this revision Revisions of this file
MQTTNetwork.h Show annotated file Show diff for this revision Revisions of this file
main.cpp Show annotated file Show diff for this revision Revisions of this file
--- a/C12832.lib	Tue Jan 10 18:10:17 2017 -0600
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-https://mbed.org/teams/components/code/C12832/#03069e3deaa4
--- a/MQTTNetwork.h	Tue Jan 10 18:10:17 2017 -0600
+++ b/MQTTNetwork.h	Tue Mar 21 03:32:27 2017 +0000
@@ -5,8 +5,10 @@
 
 class MQTTNetwork {
 public:
-    MQTTNetwork(NetworkInterface* aNetwork) : network(aNetwork) {
+    MQTTNetwork(NetworkInterface* aNetwork) : network(aNetwork) 
+    {
         socket = new TCPSocket();
+        socket->set_blocking(false);
     }
 
     ~MQTTNetwork() {
@@ -14,10 +16,12 @@
     }
 
     int read(unsigned char* buffer, int len, int timeout) {
+        socket->set_timeout(timeout);
         return socket->recv(buffer, len);
     }
 
     int write(unsigned char* buffer, int len, int timeout) {
+        socket->set_timeout(timeout);
         return socket->send(buffer, len);
     }
 
--- a/main.cpp	Tue Jan 10 18:10:17 2017 -0600
+++ b/main.cpp	Tue Mar 21 03:32:27 2017 +0000
@@ -25,48 +25,59 @@
 
  */
 
- // change this to 0 to output messages to serial instead of LCD
-#define USE_LCD 1
-
-#if USE_LCD
-#include "C12832.h"
-
-// the actual pins are defined in mbed_app.json and can be overridden per target
-C12832 lcd(LCD_MOSI, LCD_SCK, LCD_MISO, LCD_A0, LCD_NCS);
-
-#define logMessage lcd.cls();lcd.printf
-
-#else
-
-#define logMessage printf
-
-#endif
-
 #define MQTTCLIENT_QOS2 1
-
+#include "mbed.h"
+#include "rtos.h"
 #include "easy-connect.h"
 #include "MQTTNetwork.h"
 #include "MQTTmbed.h"
 #include "MQTTClient.h"
 
+Serial pc(USBTX, USBRX, 115200);
 int arrivedcount = 0;
 
+Thread msgSender;
+
+static MemoryPool<MQTT::Message, 16> pool;
+static Queue<MQTT::Message, 16> queue;
 
 void messageArrived(MQTT::MessageData& md)
 {
     MQTT::Message &message = md.message;
-    logMessage("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
-    logMessage("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
+    printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+    printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
     ++arrivedcount;
 }
 
+void SendDataThread()
+{
+    unsigned int i;
+    while(true)
+    {
+        MQTT::Message * message = pool.alloc();
+        char * buff = new char[sizeof(char) * 100];
+        
+        sprintf(buff, "message test %d", i);
+        message->qos = MQTT::QOS0;
+        message->retained = false;
+        message->dup = false;
+        message->payload = (void*)buff;
+        message->payloadlen = strlen(buff)+1;
+                    
+        // publish the message to mqtt
+        queue.put(message);  
+        i++;
+        
+        Thread::wait(2000);         
+    }   
+}
 
 int main(int argc, char* argv[])
 {
     float version = 0.6;
     char* topic = "mbed-sample";
 
-    logMessage("HelloMQTT: version is %.2f\r\n", version);
+    printf("HelloMQTT: version is %.2f\r\n", version);
 
     NetworkInterface* network = easy_connect(true);
     if (!network) {
@@ -79,10 +90,10 @@
 
     const char* hostname = "m2m.eclipse.org";
     int port = 1883;
-    logMessage("Connecting to %s:%d\r\n", hostname, port);
+    printf("Connecting to %s:%d\r\n", hostname, port);
     int rc = mqttNetwork.connect(hostname, port);
     if (rc != 0)
-        logMessage("rc from TCP connect is %d\r\n", rc);
+        printf("rc from TCP connect is %d\r\n", rc);
 
     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
     data.MQTTVersion = 3;
@@ -90,50 +101,57 @@
     data.username.cstring = "testuser";
     data.password.cstring = "testpassword";
     if ((rc = client.connect(data)) != 0)
-        logMessage("rc from MQTT connect is %d\r\n", rc);
+        printf("rc from MQTT connect is %d\r\n", rc);
 
     if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0)
-        logMessage("rc from MQTT subscribe is %d\r\n", rc);
-
-    MQTT::Message message;
-
-    // QoS 0
-    char buf[100];
-    sprintf(buf, "Hello World!  QoS 0 message from app version %f\r\n", version);
-    message.qos = MQTT::QOS0;
-    message.retained = false;
-    message.dup = false;
-    message.payload = (void*)buf;
-    message.payloadlen = strlen(buf)+1;
-    rc = client.publish(topic, message);
-    while (arrivedcount < 1)
-        client.yield(100);
+        printf("rc from MQTT subscribe is %d\r\n", rc);
 
-    // QoS 1
-    sprintf(buf, "Hello World!  QoS 1 message from app version %f\r\n", version);
-    message.qos = MQTT::QOS1;
-    message.payloadlen = strlen(buf)+1;
-    rc = client.publish(topic, message);
-    while (arrivedcount < 2)
-        client.yield(100);
-
-    // QoS 2
-    sprintf(buf, "Hello World!  QoS 2 message from app version %f\r\n", version);
-    message.qos = MQTT::QOS2;
-    message.payloadlen = strlen(buf)+1;
-    rc = client.publish(topic, message);
-    while (arrivedcount < 3)
-        client.yield(100);
+    // Start the data source
+    msgSender.start(SendDataThread);
+    
+    while(true)
+    {
+        osEvent evt = queue.get(10);
+        if (evt.status == osEventMessage) {
+            printf("Message arrived from main thread ...\r\n");
+            // Unpack the message
+            MQTT::Message * message = (MQTT::Message *)evt.value.p;
+            
+            printf("Publishing message to MQTT ...\r\n");
+            // Push to mqtt
+            int rc = client.publish(topic, *message);
+            if (rc < 0)
+                printf("Error sending mqtt message \r\n");
+            else
+                printf("Message published ...\r\n");
+            
+            printf("Deleting payload ...\r\n");
+            // Delete payload
+            delete [] message->payload;
+            
+            printf("Deleting pool allocation ...\r\n");
+            // Don't forget this!
+            pool.free(message);
+        }
+        
+        printf("MQTT client yeild ...\r\n");
+        if (client.yield(100) != MQTT::SUCCESS)
+        {
+            printf("Yield error, client disconnected? ...\r\n");
+            break;
+        }
+        printf("MQTT client yeild successful ...\r\n");               
+    }
 
     if ((rc = client.unsubscribe(topic)) != 0)
-        logMessage("rc from unsubscribe was %d\r\n", rc);
+        printf("rc from unsubscribe was %d\r\n", rc);
 
     if ((rc = client.disconnect()) != 0)
-        logMessage("rc from disconnect was %d\r\n", rc);
+        printf("rc from disconnect was %d\r\n", rc);
 
     mqttNetwork.disconnect();
 
-    logMessage("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
+    printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
 
     return 0;
 }