Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of HelloMQTT by
Diff: main.cpp
- Revision:
- 23:06fac173529e
- Parent:
- 22:826657a00c44
- Child:
- 25:326f00faa092
diff -r 826657a00c44 -r 06fac173529e main.cpp
--- a/main.cpp Tue Mar 21 12:57:07 2017 +0000
+++ b/main.cpp Sun Mar 26 04:35:46 2017 +0000
@@ -29,54 +29,53 @@
#include "mbed.h"
#include "rtos.h"
#include "easy-connect.h"
-#include "MQTTLogging.h"
-#include "MQTTNetwork.h"
-#include "MQTTmbed.h"
-#include "MQTTClient.h"
+#include "MQTTThreadedClient.h"
+
Serial pc(USBTX, USBRX, 115200);
+Thread msgSender;
+
+static const char * clientID = "mbed-sample";
+static const char * userID = "mbedhacks";
+static const char * password = "qwer123";
+static const char * topic_1 = "mbed-sample";
+static const char * topic_2 = "test";
+
int arrivedcount = 0;
-Thread msgSender;
-
-static MemoryPool<MQTT::Message, 16> pool;
-static Queue<MQTT::Message, 16> queue;
-
-void messageArrived(MQTT::MessageData& md)
+void messageArrived(MessageData& md)
{
- MQTT::Message &message = md.message;
- printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+ Message &message = md.message;
+ printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload);
++arrivedcount;
}
-void SendDataThread()
+class CallbackTest
{
- unsigned int i;
- while(true)
+ public:
+
+ CallbackTest()
+ : arrivedcount(0)
+ {}
+
+ void messageArrived(MessageData& md)
{
- MQTT::Message * message = pool.alloc();
- char * buff = new char[sizeof(char) * 100];
-
- sprintf(buff, "message test %d", i);
- message->qos = MQTT::QOS0;
- message->retained = false;
- message->dup = false;
- message->payload = (void*)buff;
- message->payloadlen = strlen(buff)+1;
-
- // publish the message to mqtt
- queue.put(message);
- i++;
-
- Thread::wait(2000);
- }
-}
+ Message &message = md.message;
+ printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
+ printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload);
+ ++arrivedcount;
+ }
+
+ private:
+
+ int arrivedcount;
+};
int main(int argc, char* argv[])
{
float version = 0.6;
- char* topic = "mbed-sample";
+ CallbackTest testcb;
printf("HelloMQTT: version is %.2f\r\n", version);
@@ -85,74 +84,49 @@
return -1;
}
- MQTTNetwork mqttNetwork(network);
+ MQTTThreadedClient mqtt(network);
- MQTT::Client<MQTTNetwork, Countdown> client = MQTT::Client<MQTTNetwork, Countdown>(mqttNetwork);
-
- const char* hostname = "m2m.eclipse.org";
+ const char* hostname = "mqtt.mbedhacks.com";
+ // const char* hostname = "192.168.0.7";
int port = 1883;
printf("Connecting to %s:%d\r\n", hostname, port);
- int rc = mqttNetwork.connect(hostname, port);
- if (rc != 0)
- printf("rc from TCP connect is %d\r\n", rc);
+
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.MQTTVersion = 3;
- data.clientID.cstring = "mbed-sample";
- data.username.cstring = "testuser";
- data.password.cstring = "testpassword";
- if ((rc = client.connect(data)) != 0)
- printf("rc from MQTT connect is %d\r\n", rc);
+ data.clientID.cstring = (char *) clientID;
+ data.username.cstring = (char *) userID;
+ data.password.cstring = (char *) password;
+ data.keepAliveInterval = 100; // default is 60
- if ((rc = client.subscribe(topic, MQTT::QOS0, messageArrived)) != 0)
- printf("rc from MQTT subscribe is %d\r\n", rc);
+ int rc = mqtt.connect(hostname, port, data);
+ if (rc != 0)
+ printf("rc from TCP connect is %d\r\n", rc);
+
+ if ((rc = mqtt.subscribe(topic_1, QOS0, messageArrived)) != 0)
+ printf("rc from MQTT subscribe 1 is %d\r\n", rc);
+
+ if ((rc = mqtt.subscribe(topic_2, QOS0, &testcb, &CallbackTest::messageArrived)) != 0)
+ printf("rc from MQTT subscribe 2 is %d\r\b", rc);
// Start the data producer
- msgSender.start(SendDataThread);
+ msgSender.start(mbed::callback(&mqtt, &MQTTThreadedClient::startListener));
+ int i = 0;
while(true)
{
- osEvent evt = queue.get(10);
- if (evt.status == osEventMessage) {
- //printf("Message arrived from main thread ...\r\n");
- // Unpack the message
- MQTT::Message * message = (MQTT::Message *)evt.value.p;
-
- printf("Publishing message to MQTT ...\r\n");
- // Push to mqtt
- rc = client.publish(topic, *message);
- if (rc < 0)
- printf("Error sending mqtt message rc = %d\r\n", rc);
- else
- printf("Message published ...\r\n");
-
- //printf("Deleting payload ...\r\n");
- // Delete payload
- delete [] message->payload;
-
- //printf("Deleting pool allocation ...\r\n");
- // Don't forget this!
- pool.free(message);
- }
+ PubMessage message;
+ message.qos = QOS0;
+ message.id = 123;
- //printf("MQTT client yield ...\r\n");
- if (client.yield(100) != MQTT::SUCCESS)
- {
- printf("Yield error, client disconnected? ...\r\n");
- break;
- }
- //printf("MQTT client yield successful ...\r\n");
+ strcpy(&message.topic[0], topic_1);
+ sprintf(&message.payload[0], "Testing %d", i);
+ message.payloadlen = strlen((const char *) &message.payload[0]);
+ mqtt.publish(message);
+
+ i++;
+ //TODO: Nothing here yet ...
+ Thread::wait(1000);
}
- if ((rc = client.unsubscribe(topic)) != 0)
- printf("rc from unsubscribe was %d\r\n", rc);
-
- if ((rc = client.disconnect()) != 0)
- printf("rc from disconnect was %d\r\n", rc);
-
- mqttNetwork.disconnect();
-
- printf("Version %.2f: finish %d msgs\r\n", version, arrivedcount);
-
- return 0;
}
