Sample MQTT program - simple send and receive

Dependencies:   C12832 MQTT

Fork of HelloMQTT by MQTT

Revision:
2:638c854c0695
Parent:
1:a1d5c7a6acbc
Child:
3:7a6a899de7cc
--- a/main.cpp	Wed Feb 05 10:50:15 2014 +0000
+++ b/main.cpp	Wed Apr 30 13:16:09 2014 +0000
@@ -13,77 +13,198 @@
  * Contributors:
  *    Ian Craggs - initial API and implementation and/or initial documentation
  *******************************************************************************/
+ 
+ /**
+  This is a sample program to illustrate the use of the MQTT Client library
+  on the mbed platform.  The Client class requires two classes which mediate
+  access to system interfaces for networking and timing.  As long as these two
+  classes provide the required public programming interfaces, it does not matter
+  what facilities they use underneath. In this program, they use the mbed
+  system libraries.
+ 
+ */
 
 #include "mbed.h"
 #include "EthernetInterface.h"
+
 #include "C12832_lcd.h"
-
-#include "MQTTPacket.h"
-
-DigitalOut myled(LED2);
 C12832_LCD lcd;
 
-int publish()
+#include "FP.cpp"
+#include "MQTTClient.h"
+
+
+
+class IPStack 
 {
-    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
-    int rc = 0;
-    char buf[200];
-    int buflen = sizeof(buf);
+public:    
+    IPStack()
+    {
+        eth.init();                          // Use DHCP
+        eth.connect();
+        mysock.set_blocking(false, 1000);    // 1 second Timeout 
+    }
+    
+    int connect(char* hostname, int port)
+    {
+        return mysock.connect(hostname, port);
+    }
+
+    int read(char* buffer, int len, int timeout)
+    {
+        mysock.set_blocking(false, timeout);  
+        return mysock.receive(buffer, len);
+    }
+    
+    int write(char* buffer, int len, int timeout)
+    {
+        mysock.set_blocking(false, timeout);  
+        return mysock.send(buffer, len);
+    }
+    
+    int disconnect()
+    {
+        return mysock.close();
+    }
+    
+private:
+
+    EthernetInterface eth;
     TCPSocketConnection mysock; 
-    MQTTString topicString = MQTTString_initializer;
-    char* payload = "I'm alive!";
-    int payloadlen = strlen(payload);
-    int len = 0;
+    
+};
+
+
+class Countdown
+{
+public:
+    Countdown()
+    {
+        t = Timer();   
+    }
+    
+    Countdown(int ms)
+    {
+        t = Timer();
+        countdown_ms(ms);   
+    }
+    
     
-    mysock.connect("m2m.eclipse.org", 1883);
-          
-    data.clientID.cstring = "mbed test client - Ian Craggs";
-    data.keepAliveInterval = 20;
-    data.cleansession = 1;
-    data.MQTTVersion = 3;
-
-    len = MQTTSerialize_connect(buf, buflen, &data);
+    bool expired()
+    {
+        return t.read_ms() >= interval_end_ms;
+    }
+    
+    void countdown_ms(int ms)  
+    {
+        t.stop();
+        interval_end_ms = ms;
+        t.reset();
+        t.start();
+    }
+    
+    void countdown(int seconds)
+    {
+        countdown_ms(seconds * 1000);
+    }
+    
+    int left_ms()
+    {
+        return interval_end_ms - t.read_ms();
+    }
+    
+private:
+    Timer t;
+    int interval_end_ms; 
+};
 
-    topicString.cstring = "mbed NXP LPC1768";
-    len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen);
+int arrivedcount = 0;
+
+void messageArrived(MQTT::Message* message)
+{
+    lcd.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message->qos, message->retained, message->dup, message->id);
+    lcd.printf("Payload %.*s\n", message->payloadlen, (char*)message->payload);
+    ++arrivedcount;
+}
 
-    len += MQTTSerialize_disconnect(buf + len, buflen - len);
+int connect(MQTT::Client<IPStack, Countdown>::connectionLostInfo* info)
+{
+    char* hostname = "m2m.eclipse.org";
+    int port = 1883;
+    lcd.printf("Connecting to %s:%d\n", hostname, port);
+    int rc = info->network->connect(hostname, port);
+    lcd.printf("rc from TCP connect is %d\n", rc);
+ 
+    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;       
+    data.MQTTVersion = 3;
+    data.clientID.cstring = "mbed-icraggs";
+    rc = info->client->connect(&data);
+    lcd.printf("rc from MQTT connect is %d\n", rc);
+    
+    return rc;
+}
+
+
+int main(int argc, char* argv[])
+{   
+    IPStack ipstack = IPStack();
+    float version = 0.3;
+    char* topic = "mbed-sample";
+    
+    lcd.printf("Version is %f\n", version);
+              
+    MQTT::Client<IPStack, Countdown>client = MQTT::Client<IPStack, Countdown>(&ipstack);
 
-    rc = 0;
-    while (rc < len)
-    {
-        int rc1 = mysock.send(buf, len);
-        if (rc1 == -1)
-        {
-            lcd.printf("Send failed\n");
-            break;
-        }
-        else
-            rc += rc1;
-    }
-    if (rc == len)
-        lcd.printf("Send succeeded\n");
-    wait(0.2);
+    MQTT::Client<IPStack, Countdown>::connectionLostInfo info = {&client, &ipstack};
+    int rc = connect(&info);
+    
+    client.setConnectionLostHandler(connect);
+    
+    rc = client.subscribe(topic, MQTT::QOS1, messageArrived);   
+    if (rc != 0)
+        lcd.printf("rc from MQTT subscribe is %d\n", rc);
+
+    MQTT::Message message;
 
+    // QoS 0
+    char buf[100];
+    sprintf(buf, "Hello World!  QoS 0 message from app version %f\n", version);
+    message.qos = MQTT::QOS0;
+    message.retained = false;
+    message.dup = false;
+    message.payload = (void*)buf;
+    message.payloadlen = strlen(buf)+1;
+    rc = client.publish(topic, &message);
+    while (arrivedcount == 0)
+        client.yield(100);
+        
+    // QoS 1
+    sprintf(buf, "Hello World!  QoS 1 message from app version %f\n", version);
+    message.qos = MQTT::QOS1;
+    message.payloadlen = strlen(buf)+1;
+    rc = client.publish(topic, &message);
+    while (arrivedcount == 1)
+        client.yield(100);
+        
+    // QoS 2
+    sprintf(buf, "Hello World!  QoS 2 message from app version %f\n", version);
+    message.qos = MQTT::QOS2;
+    message.payloadlen = strlen(buf)+1;
+    rc = client.publish(topic, &message);
+    while (arrivedcount == 2)
+        client.yield(100);
+    
+    rc = client.unsubscribe(topic);
+    if (rc != 0)
+        lcd.printf("rc from unsubscribe was %d\n", rc);
+    
+    rc = client.disconnect();
+    if (rc != 0)
+        lcd.printf("rc from disconnect was %d\n", rc);
+    
+    ipstack.disconnect();
+    
+    lcd.printf("Finishing with %d messages received\n", arrivedcount);
+    
     return 0;
 }
-
-int main()
-{
-    EthernetInterface eth;
-    eth.init(); //Use DHCP
-    eth.connect();
-    lcd.printf("IP Address is %s\n", eth.getIPAddress());
-    
-    while(1) 
-    {
-        myled = 1;
-        publish();
-        wait(0.2);
-        myled = 0;
-        publish();
-        wait(0.2);
-    }
-    
-    eth.disconnect();
-}