A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Revision:
23:06fac173529e
Parent:
22:826657a00c44
Child:
25:326f00faa092
--- a/main.cpp	Tue Mar 21 12:57:07 2017 +0000
+++ b/main.cpp	Sun Mar 26 04:35:46 2017 +0000
@@ -29,54 +29,53 @@
 #include "mbed.h"
 #include "rtos.h"
 #include "easy-connect.h"
-#include "MQTTLogging.h"
-#include "MQTTNetwork.h"
-#include "MQTTmbed.h"
-#include "MQTTClient.h"
+#include "MQTTThreadedClient.h"
+
 
 Serial pc(USBTX, USBRX, 115200);
+Thread msgSender;
+
+static const char * clientID = "mbed-sample";
+static const char * userID = "mbedhacks";
+static const char * password = "qwer123";
+static const char * topic_1 = "mbed-sample";
+static const char * topic_2 = "test";
+
 int arrivedcount = 0;
 
-Thread msgSender;
-
-static MemoryPool<MQTT::Message, 16> pool;
-static Queue<MQTT::Message, 16> queue;
-
-void messageArrived(MQTT::MessageData& md)
+void messageArrived(MessageData& md)
 {
-    MQTT::Message &message = md.message;
-    printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+    Message &message = md.message;
+    printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
     printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload);
     ++arrivedcount;
 }
 
-void SendDataThread()
+class CallbackTest
 {
-    unsigned int i;
-    while(true)
+    public:
+    
+    CallbackTest()
+        : arrivedcount(0)
+    {}
+    
+    void messageArrived(MessageData& md)
     {
-        MQTT::Message * message = pool.alloc();
-        char * buff = new char[sizeof(char) * 100];
-        
-        sprintf(buff, "message test %d", i);
-        message->qos = MQTT::QOS0;
-        message->retained = false;
-        message->dup = false;
-        message->payload = (void*)buff;
-        message->payloadlen = strlen(buff)+1;
-                    
-        // publish the message to mqtt
-        queue.put(message);  
-        i++;
-        
-        Thread::wait(2000);         
-    }   
-}
+        Message &message = md.message;
+        printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+        printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload);
+        ++arrivedcount;
+    }
+    
+    private:
+    
+    int arrivedcount;
+};
 
 int main(int argc, char* argv[])
 {
     float version = 0.6;
-    char* topic = "mbed-sample";
+    CallbackTest testcb;
 
     printf("HelloMQTT: version is %.2f\r\n", version);
 
@@ -85,74 +84,49 @@
         return -1;
     }
 
-    MQTTNetwork mqttNetwork(network);
+    MQTTThreadedClient mqtt(network);
 
-    MQTT::Client<MQTTNetwork, Countdown> client = MQTT::Client<MQTTNetwork, Countdown>(mqttNetwork);
-
-    const char* hostname = "m2m.eclipse.org";
+    const char* hostname = "mqtt.mbedhacks.com";
+    // const char* hostname = "192.168.0.7";    
     int port = 1883;
     printf("Connecting to %s:%d\r\n", hostname, port);
-    int rc = mqttNetwork.connect(hostname, port);
-    if (rc != 0)
-        printf("rc from TCP connect is %d\r\n", rc);
+
 
     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
     data.MQTTVersion = 3;
-    data.clientID.cstring = "mbed-sample";
-    data.username.cstring = "testuser";
-    data.password.cstring = "testpassword";
-    if ((rc = client.connect(data)) != 0)
-        printf("rc from MQTT connect is %d\r\n", rc);
+    data.clientID.cstring = (char *) clientID;
+    data.username.cstring = (char *) userID;
+    data.password.cstring = (char *) password;
+    data.keepAliveInterval = 100; // default is 60
 
-    if ((rc = client.subscribe(topic, MQTT::QOS0, messageArrived)) != 0)
-        printf("rc from MQTT subscribe is %d\r\n", rc);
+    int rc = mqtt.connect(hostname, port, data);
+    if (rc != 0)
+        printf("rc from TCP connect is %d\r\n", rc);
+        
+    if ((rc = mqtt.subscribe(topic_1, QOS0, messageArrived)) != 0)
+        printf("rc from MQTT subscribe 1 is %d\r\n", rc);
+        
+    if ((rc = mqtt.subscribe(topic_2, QOS0, &testcb, &CallbackTest::messageArrived)) != 0)
+        printf("rc from MQTT subscribe 2 is %d\r\b", rc);
 
     // Start the data producer
-    msgSender.start(SendDataThread);
+    msgSender.start(mbed::callback(&mqtt, &MQTTThreadedClient::startListener));
     
+    int i = 0;
     while(true)
     {
-        osEvent evt = queue.get(10);
-        if (evt.status == osEventMessage) {
-            //printf("Message arrived from main thread ...\r\n");
-            // Unpack the message
-            MQTT::Message * message = (MQTT::Message *)evt.value.p;
-            
-            printf("Publishing message to MQTT ...\r\n");
-            // Push to mqtt
-            rc = client.publish(topic, *message);
-            if (rc < 0)
-                printf("Error sending mqtt message rc = %d\r\n", rc);
-            else
-                printf("Message published ...\r\n");
-            
-            //printf("Deleting payload ...\r\n");
-            // Delete payload
-            delete [] message->payload;
-            
-            //printf("Deleting pool allocation ...\r\n");
-            // Don't forget this!
-            pool.free(message);
-        }
+        PubMessage message;
+        message.qos = QOS0;
+        message.id = 123;
         
-        //printf("MQTT client yield ...\r\n");
-        if (client.yield(100) != MQTT::SUCCESS)
-        {
-            printf("Yield error, client disconnected? ...\r\n");
-            break;
-        }
-        //printf("MQTT client yield successful ...\r\n");               
+        strcpy(&message.topic[0], topic_1);
+        sprintf(&message.payload[0], "Testing %d", i);
+        message.payloadlen = strlen((const char *) &message.payload[0]);
+        mqtt.publish(message);
+        
+        i++;
+        //TODO: Nothing here yet ...
+        Thread::wait(1000);
     }
 
-    if ((rc = client.unsubscribe(topic)) != 0)
-        printf("rc from unsubscribe was %d\r\n", rc);
-
-    if ((rc = client.disconnect()) != 0)
-        printf("rc from disconnect was %d\r\n", rc);
-
-    mqttNetwork.disconnect();
-
-    printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
-
-    return 0;
 }