A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: main.cpp
- Revision:
- 23:06fac173529e
- Parent:
- 22:826657a00c44
- Child:
- 25:326f00faa092
--- a/main.cpp Tue Mar 21 12:57:07 2017 +0000
+++ b/main.cpp Sun Mar 26 04:35:46 2017 +0000
@@ -29,54 +29,53 @@
#include "mbed.h"
#include "rtos.h"
#include "easy-connect.h"
-#include "MQTTLogging.h"
-#include "MQTTNetwork.h"
-#include "MQTTmbed.h"
-#include "MQTTClient.h"
+#include "MQTTThreadedClient.h"
+
Serial pc(USBTX, USBRX, 115200);
+Thread msgSender;
+
+static const char * clientID = "mbed-sample";
+static const char * userID = "mbedhacks";
+static const char * password = "qwer123";
+static const char * topic_1 = "mbed-sample";
+static const char * topic_2 = "test";
+
int arrivedcount = 0;
-Thread msgSender;
-
-static MemoryPool<MQTT::Message, 16> pool;
-static Queue<MQTT::Message, 16> queue;
-
-void messageArrived(MQTT::MessageData& md)
+void messageArrived(MessageData& md)
{
- MQTT::Message &message = md.message;
- printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+ Message &message = md.message;
+ printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload);
++arrivedcount;
}
-void SendDataThread()
+class CallbackTest
{
- unsigned int i;
- while(true)
+ public:
+
+ CallbackTest()
+ : arrivedcount(0)
+ {}
+
+ void messageArrived(MessageData& md)
{
- MQTT::Message * message = pool.alloc();
- char * buff = new char[sizeof(char) * 100];
-
- sprintf(buff, "message test %d", i);
- message->qos = MQTT::QOS0;
- message->retained = false;
- message->dup = false;
- message->payload = (void*)buff;
- message->payloadlen = strlen(buff)+1;
-
- // publish the message to mqtt
- queue.put(message);
- i++;
-
- Thread::wait(2000);
- }
-}
+ Message &message = md.message;
+ printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+ printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload);
+ ++arrivedcount;
+ }
+
+ private:
+
+ int arrivedcount;
+};
int main(int argc, char* argv[])
{
float version = 0.6;
- char* topic = "mbed-sample";
+ CallbackTest testcb;
printf("HelloMQTT: version is %.2f\r\n", version);
@@ -85,74 +84,49 @@
return -1;
}
- MQTTNetwork mqttNetwork(network);
+ MQTTThreadedClient mqtt(network);
- MQTT::Client<MQTTNetwork, Countdown> client = MQTT::Client<MQTTNetwork, Countdown>(mqttNetwork);
-
- const char* hostname = "m2m.eclipse.org";
+ const char* hostname = "mqtt.mbedhacks.com";
+ // const char* hostname = "192.168.0.7";
int port = 1883;
printf("Connecting to %s:%d\r\n", hostname, port);
- int rc = mqttNetwork.connect(hostname, port);
- if (rc != 0)
- printf("rc from TCP connect is %d\r\n", rc);
+
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.MQTTVersion = 3;
- data.clientID.cstring = "mbed-sample";
- data.username.cstring = "testuser";
- data.password.cstring = "testpassword";
- if ((rc = client.connect(data)) != 0)
- printf("rc from MQTT connect is %d\r\n", rc);
+ data.clientID.cstring = (char *) clientID;
+ data.username.cstring = (char *) userID;
+ data.password.cstring = (char *) password;
+ data.keepAliveInterval = 100; // default is 60
- if ((rc = client.subscribe(topic, MQTT::QOS0, messageArrived)) != 0)
- printf("rc from MQTT subscribe is %d\r\n", rc);
+ int rc = mqtt.connect(hostname, port, data);
+ if (rc != 0)
+ printf("rc from TCP connect is %d\r\n", rc);
+
+ if ((rc = mqtt.subscribe(topic_1, QOS0, messageArrived)) != 0)
+ printf("rc from MQTT subscribe 1 is %d\r\n", rc);
+
+ if ((rc = mqtt.subscribe(topic_2, QOS0, &testcb, &CallbackTest::messageArrived)) != 0)
+ printf("rc from MQTT subscribe 2 is %d\r\b", rc);
// Start the data producer
- msgSender.start(SendDataThread);
+ msgSender.start(mbed::callback(&mqtt, &MQTTThreadedClient::startListener));
+ int i = 0;
while(true)
{
- osEvent evt = queue.get(10);
- if (evt.status == osEventMessage) {
- //printf("Message arrived from main thread ...\r\n");
- // Unpack the message
- MQTT::Message * message = (MQTT::Message *)evt.value.p;
-
- printf("Publishing message to MQTT ...\r\n");
- // Push to mqtt
- rc = client.publish(topic, *message);
- if (rc < 0)
- printf("Error sending mqtt message rc = %d\r\n", rc);
- else
- printf("Message published ...\r\n");
-
- //printf("Deleting payload ...\r\n");
- // Delete payload
- delete [] message->payload;
-
- //printf("Deleting pool allocation ...\r\n");
- // Don't forget this!
- pool.free(message);
- }
+ PubMessage message;
+ message.qos = QOS0;
+ message.id = 123;
- //printf("MQTT client yield ...\r\n");
- if (client.yield(100) != MQTT::SUCCESS)
- {
- printf("Yield error, client disconnected? ...\r\n");
- break;
- }
- //printf("MQTT client yield successful ...\r\n");
+ strcpy(&message.topic[0], topic_1);
+ sprintf(&message.payload[0], "Testing %d", i);
+ message.payloadlen = strlen((const char *) &message.payload[0]);
+ mqtt.publish(message);
+
+ i++;
+ //TODO: Nothing here yet ...
+ Thread::wait(1000);
}
- if ((rc = client.unsubscribe(topic)) != 0)
- printf("rc from unsubscribe was %d\r\n", rc);
-
- if ((rc = client.disconnect()) != 0)
- printf("rc from disconnect was %d\r\n", rc);
-
- mqttNetwork.disconnect();
-
- printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
-
- return 0;
}
