A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Revision 21:a7506c90aa84, committed 2017-03-21
- Comitter:
- vpcola
- Date:
- Tue Mar 21 03:32:27 2017 +0000
- Parent:
- 20:49c9daf2b0ff
- Child:
- 22:826657a00c44
- Commit message:
- test
Changed in this revision
--- a/C12832.lib Tue Jan 10 18:10:17 2017 -0600 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -https://mbed.org/teams/components/code/C12832/#03069e3deaa4
--- a/MQTTNetwork.h Tue Jan 10 18:10:17 2017 -0600
+++ b/MQTTNetwork.h Tue Mar 21 03:32:27 2017 +0000
@@ -5,8 +5,10 @@
class MQTTNetwork {
public:
- MQTTNetwork(NetworkInterface* aNetwork) : network(aNetwork) {
+ MQTTNetwork(NetworkInterface* aNetwork) : network(aNetwork)
+ {
socket = new TCPSocket();
+ socket->set_blocking(false);
}
~MQTTNetwork() {
@@ -14,10 +16,12 @@
}
int read(unsigned char* buffer, int len, int timeout) {
+ socket->set_timeout(timeout);
return socket->recv(buffer, len);
}
int write(unsigned char* buffer, int len, int timeout) {
+ socket->set_timeout(timeout);
return socket->send(buffer, len);
}
--- a/main.cpp Tue Jan 10 18:10:17 2017 -0600
+++ b/main.cpp Tue Mar 21 03:32:27 2017 +0000
@@ -25,48 +25,59 @@
*/
- // change this to 0 to output messages to serial instead of LCD
-#define USE_LCD 1
-
-#if USE_LCD
-#include "C12832.h"
-
-// the actual pins are defined in mbed_app.json and can be overridden per target
-C12832 lcd(LCD_MOSI, LCD_SCK, LCD_MISO, LCD_A0, LCD_NCS);
-
-#define logMessage lcd.cls();lcd.printf
-
-#else
-
-#define logMessage printf
-
-#endif
-
#define MQTTCLIENT_QOS2 1
-
+#include "mbed.h"
+#include "rtos.h"
#include "easy-connect.h"
#include "MQTTNetwork.h"
#include "MQTTmbed.h"
#include "MQTTClient.h"
+Serial pc(USBTX, USBRX, 115200);
int arrivedcount = 0;
+Thread msgSender;
+
+static MemoryPool<MQTT::Message, 16> pool;
+static Queue<MQTT::Message, 16> queue;
void messageArrived(MQTT::MessageData& md)
{
MQTT::Message &message = md.message;
- logMessage("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
- logMessage("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
+ printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+ printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
++arrivedcount;
}
+void SendDataThread()
+{
+ unsigned int i;
+ while(true)
+ {
+ MQTT::Message * message = pool.alloc();
+ char * buff = new char[sizeof(char) * 100];
+
+ sprintf(buff, "message test %d", i);
+ message->qos = MQTT::QOS0;
+ message->retained = false;
+ message->dup = false;
+ message->payload = (void*)buff;
+ message->payloadlen = strlen(buff)+1;
+
+ // publish the message to mqtt
+ queue.put(message);
+ i++;
+
+ Thread::wait(2000);
+ }
+}
int main(int argc, char* argv[])
{
float version = 0.6;
char* topic = "mbed-sample";
- logMessage("HelloMQTT: version is %.2f\r\n", version);
+ printf("HelloMQTT: version is %.2f\r\n", version);
NetworkInterface* network = easy_connect(true);
if (!network) {
@@ -79,10 +90,10 @@
const char* hostname = "m2m.eclipse.org";
int port = 1883;
- logMessage("Connecting to %s:%d\r\n", hostname, port);
+ printf("Connecting to %s:%d\r\n", hostname, port);
int rc = mqttNetwork.connect(hostname, port);
if (rc != 0)
- logMessage("rc from TCP connect is %d\r\n", rc);
+ printf("rc from TCP connect is %d\r\n", rc);
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.MQTTVersion = 3;
@@ -90,50 +101,57 @@
data.username.cstring = "testuser";
data.password.cstring = "testpassword";
if ((rc = client.connect(data)) != 0)
- logMessage("rc from MQTT connect is %d\r\n", rc);
+ printf("rc from MQTT connect is %d\r\n", rc);
if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0)
- logMessage("rc from MQTT subscribe is %d\r\n", rc);
-
- MQTT::Message message;
-
- // QoS 0
- char buf[100];
- sprintf(buf, "Hello World! QoS 0 message from app version %f\r\n", version);
- message.qos = MQTT::QOS0;
- message.retained = false;
- message.dup = false;
- message.payload = (void*)buf;
- message.payloadlen = strlen(buf)+1;
- rc = client.publish(topic, message);
- while (arrivedcount < 1)
- client.yield(100);
+ printf("rc from MQTT subscribe is %d\r\n", rc);
- // QoS 1
- sprintf(buf, "Hello World! QoS 1 message from app version %f\r\n", version);
- message.qos = MQTT::QOS1;
- message.payloadlen = strlen(buf)+1;
- rc = client.publish(topic, message);
- while (arrivedcount < 2)
- client.yield(100);
-
- // QoS 2
- sprintf(buf, "Hello World! QoS 2 message from app version %f\r\n", version);
- message.qos = MQTT::QOS2;
- message.payloadlen = strlen(buf)+1;
- rc = client.publish(topic, message);
- while (arrivedcount < 3)
- client.yield(100);
+ // Start the data source
+ msgSender.start(SendDataThread);
+
+ while(true)
+ {
+ osEvent evt = queue.get(10);
+ if (evt.status == osEventMessage) {
+ printf("Message arrived from main thread ...\r\n");
+ // Unpack the message
+ MQTT::Message * message = (MQTT::Message *)evt.value.p;
+
+ printf("Publishing message to MQTT ...\r\n");
+ // Push to mqtt
+ int rc = client.publish(topic, *message);
+ if (rc < 0)
+ printf("Error sending mqtt message \r\n");
+ else
+ printf("Message published ...\r\n");
+
+ printf("Deleting payload ...\r\n");
+ // Delete payload
+ delete [] message->payload;
+
+ printf("Deleting pool allocation ...\r\n");
+ // Don't forget this!
+ pool.free(message);
+ }
+
+ printf("MQTT client yeild ...\r\n");
+ if (client.yield(100) != MQTT::SUCCESS)
+ {
+ printf("Yield error, client disconnected? ...\r\n");
+ break;
+ }
+ printf("MQTT client yeild successful ...\r\n");
+ }
if ((rc = client.unsubscribe(topic)) != 0)
- logMessage("rc from unsubscribe was %d\r\n", rc);
+ printf("rc from unsubscribe was %d\r\n", rc);
if ((rc = client.disconnect()) != 0)
- logMessage("rc from disconnect was %d\r\n", rc);
+ printf("rc from disconnect was %d\r\n", rc);
mqttNetwork.disconnect();
- logMessage("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
+ printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
return 0;
}
