Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of HelloMQTT by
Diff: main.cpp
- Revision:
- 21:a7506c90aa84
- Parent:
- 20:49c9daf2b0ff
- Child:
- 22:826657a00c44
diff -r 49c9daf2b0ff -r a7506c90aa84 main.cpp
--- a/main.cpp Tue Jan 10 18:10:17 2017 -0600
+++ b/main.cpp Tue Mar 21 03:32:27 2017 +0000
@@ -25,48 +25,59 @@
*/
- // change this to 0 to output messages to serial instead of LCD
-#define USE_LCD 1
-
-#if USE_LCD
-#include "C12832.h"
-
-// the actual pins are defined in mbed_app.json and can be overridden per target
-C12832 lcd(LCD_MOSI, LCD_SCK, LCD_MISO, LCD_A0, LCD_NCS);
-
-#define logMessage lcd.cls();lcd.printf
-
-#else
-
-#define logMessage printf
-
-#endif
-
#define MQTTCLIENT_QOS2 1
-
+#include "mbed.h"
+#include "rtos.h"
#include "easy-connect.h"
#include "MQTTNetwork.h"
#include "MQTTmbed.h"
#include "MQTTClient.h"
+Serial pc(USBTX, USBRX, 115200);
int arrivedcount = 0;
+Thread msgSender;
+
+static MemoryPool<MQTT::Message, 16> pool;
+static Queue<MQTT::Message, 16> queue;
void messageArrived(MQTT::MessageData& md)
{
MQTT::Message &message = md.message;
- logMessage("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
- logMessage("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
+ printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+ printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
++arrivedcount;
}
+void SendDataThread()
+{
+ unsigned int i;
+ while(true)
+ {
+ MQTT::Message * message = pool.alloc();
+ char * buff = new char[sizeof(char) * 100];
+
+ sprintf(buff, "message test %d", i);
+ message->qos = MQTT::QOS0;
+ message->retained = false;
+ message->dup = false;
+ message->payload = (void*)buff;
+ message->payloadlen = strlen(buff)+1;
+
+ // publish the message to mqtt
+ queue.put(message);
+ i++;
+
+ Thread::wait(2000);
+ }
+}
int main(int argc, char* argv[])
{
float version = 0.6;
char* topic = "mbed-sample";
- logMessage("HelloMQTT: version is %.2f\r\n", version);
+ printf("HelloMQTT: version is %.2f\r\n", version);
NetworkInterface* network = easy_connect(true);
if (!network) {
@@ -79,10 +90,10 @@
const char* hostname = "m2m.eclipse.org";
int port = 1883;
- logMessage("Connecting to %s:%d\r\n", hostname, port);
+ printf("Connecting to %s:%d\r\n", hostname, port);
int rc = mqttNetwork.connect(hostname, port);
if (rc != 0)
- logMessage("rc from TCP connect is %d\r\n", rc);
+ printf("rc from TCP connect is %d\r\n", rc);
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.MQTTVersion = 3;
@@ -90,50 +101,57 @@
data.username.cstring = "testuser";
data.password.cstring = "testpassword";
if ((rc = client.connect(data)) != 0)
- logMessage("rc from MQTT connect is %d\r\n", rc);
+ printf("rc from MQTT connect is %d\r\n", rc);
if ((rc = client.subscribe(topic, MQTT::QOS2, messageArrived)) != 0)
- logMessage("rc from MQTT subscribe is %d\r\n", rc);
-
- MQTT::Message message;
-
- // QoS 0
- char buf[100];
- sprintf(buf, "Hello World! QoS 0 message from app version %f\r\n", version);
- message.qos = MQTT::QOS0;
- message.retained = false;
- message.dup = false;
- message.payload = (void*)buf;
- message.payloadlen = strlen(buf)+1;
- rc = client.publish(topic, message);
- while (arrivedcount < 1)
- client.yield(100);
+ printf("rc from MQTT subscribe is %d\r\n", rc);
- // QoS 1
- sprintf(buf, "Hello World! QoS 1 message from app version %f\r\n", version);
- message.qos = MQTT::QOS1;
- message.payloadlen = strlen(buf)+1;
- rc = client.publish(topic, message);
- while (arrivedcount < 2)
- client.yield(100);
-
- // QoS 2
- sprintf(buf, "Hello World! QoS 2 message from app version %f\r\n", version);
- message.qos = MQTT::QOS2;
- message.payloadlen = strlen(buf)+1;
- rc = client.publish(topic, message);
- while (arrivedcount < 3)
- client.yield(100);
+ // Start the data source
+ msgSender.start(SendDataThread);
+
+ while(true)
+ {
+ osEvent evt = queue.get(10);
+ if (evt.status == osEventMessage) {
+ printf("Message arrived from main thread ...\r\n");
+ // Unpack the message
+ MQTT::Message * message = (MQTT::Message *)evt.value.p;
+
+ printf("Publishing message to MQTT ...\r\n");
+ // Push to mqtt
+ int rc = client.publish(topic, *message);
+ if (rc < 0)
+ printf("Error sending mqtt message \r\n");
+ else
+ printf("Message published ...\r\n");
+
+ printf("Deleting payload ...\r\n");
+ // Delete payload
+ delete [] message->payload;
+
+ printf("Deleting pool allocation ...\r\n");
+ // Don't forget this!
+ pool.free(message);
+ }
+
+ printf("MQTT client yeild ...\r\n");
+ if (client.yield(100) != MQTT::SUCCESS)
+ {
+ printf("Yield error, client disconnected? ...\r\n");
+ break;
+ }
+ printf("MQTT client yeild successful ...\r\n");
+ }
if ((rc = client.unsubscribe(topic)) != 0)
- logMessage("rc from unsubscribe was %d\r\n", rc);
+ printf("rc from unsubscribe was %d\r\n", rc);
if ((rc = client.disconnect()) != 0)
- logMessage("rc from disconnect was %d\r\n", rc);
+ printf("rc from disconnect was %d\r\n", rc);
mqttNetwork.disconnect();
- logMessage("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
+ printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
return 0;
}
