Yield function call with timeout works, there was an issue with blockant socket.
Fork of HelloMQTT by
Revision 2:638c854c0695, committed 2014-04-30
- Comitter:
- icraggs
- Date:
- Wed Apr 30 13:16:09 2014 +0000
- Parent:
- 1:a1d5c7a6acbc
- Child:
- 3:7a6a899de7cc
- Commit message:
- Update to the latest level of code
Changed in this revision
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MQTT.lib Wed Apr 30 13:16:09 2014 +0000 @@ -0,0 +1,1 @@ +http://mbed.org/teams/mqtt/code/MQTT/#aadb79d29330
--- a/MQTTClient.lib Wed Feb 05 10:50:15 2014 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -http://mbed.org/users/icraggs/code/MQTTClient/#7734401cc1b4
--- a/main.cpp Wed Feb 05 10:50:15 2014 +0000 +++ b/main.cpp Wed Apr 30 13:16:09 2014 +0000 @@ -13,77 +13,198 @@ * Contributors: * Ian Craggs - initial API and implementation and/or initial documentation *******************************************************************************/ + + /** + This is a sample program to illustrate the use of the MQTT Client library + on the mbed platform. The Client class requires two classes which mediate + access to system interfaces for networking and timing. As long as these two + classes provide the required public programming interfaces, it does not matter + what facilities they use underneath. In this program, they use the mbed + system libraries. + + */ #include "mbed.h" #include "EthernetInterface.h" + #include "C12832_lcd.h" - -#include "MQTTPacket.h" - -DigitalOut myled(LED2); C12832_LCD lcd; -int publish() +#include "FP.cpp" +#include "MQTTClient.h" + + + +class IPStack { - MQTTPacket_connectData data = MQTTPacket_connectData_initializer; - int rc = 0; - char buf[200]; - int buflen = sizeof(buf); +public: + IPStack() + { + eth.init(); // Use DHCP + eth.connect(); + mysock.set_blocking(false, 1000); // 1 second Timeout + } + + int connect(char* hostname, int port) + { + return mysock.connect(hostname, port); + } + + int read(char* buffer, int len, int timeout) + { + mysock.set_blocking(false, timeout); + return mysock.receive(buffer, len); + } + + int write(char* buffer, int len, int timeout) + { + mysock.set_blocking(false, timeout); + return mysock.send(buffer, len); + } + + int disconnect() + { + return mysock.close(); + } + +private: + + EthernetInterface eth; TCPSocketConnection mysock; - MQTTString topicString = MQTTString_initializer; - char* payload = "I'm alive!"; - int payloadlen = strlen(payload); - int len = 0; + +}; + + +class Countdown +{ +public: + Countdown() + { + t = Timer(); + } + + Countdown(int ms) + { + t = Timer(); + countdown_ms(ms); + } + - mysock.connect("m2m.eclipse.org", 1883); - - data.clientID.cstring = "mbed test client - Ian Craggs"; - data.keepAliveInterval = 20; - data.cleansession = 1; - data.MQTTVersion = 3; - - len = MQTTSerialize_connect(buf, buflen, &data); + bool expired() + { + return t.read_ms() >= interval_end_ms; + } + + void countdown_ms(int ms) + { + t.stop(); + interval_end_ms = ms; + t.reset(); + t.start(); + } + + void countdown(int seconds) + { + countdown_ms(seconds * 1000); + } + + int left_ms() + { + return interval_end_ms - t.read_ms(); + } + +private: + Timer t; + int interval_end_ms; +}; - topicString.cstring = "mbed NXP LPC1768"; - len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen); +int arrivedcount = 0; + +void messageArrived(MQTT::Message* message) +{ + lcd.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message->qos, message->retained, message->dup, message->id); + lcd.printf("Payload %.*s\n", message->payloadlen, (char*)message->payload); + ++arrivedcount; +} - len += MQTTSerialize_disconnect(buf + len, buflen - len); +int connect(MQTT::Client<IPStack, Countdown>::connectionLostInfo* info) +{ + char* hostname = "m2m.eclipse.org"; + int port = 1883; + lcd.printf("Connecting to %s:%d\n", hostname, port); + int rc = info->network->connect(hostname, port); + lcd.printf("rc from TCP connect is %d\n", rc); + + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; + data.MQTTVersion = 3; + data.clientID.cstring = "mbed-icraggs"; + rc = info->client->connect(&data); + lcd.printf("rc from MQTT connect is %d\n", rc); + + return rc; +} + + +int main(int argc, char* argv[]) +{ + IPStack ipstack = IPStack(); + float version = 0.3; + char* topic = "mbed-sample"; + + lcd.printf("Version is %f\n", version); + + MQTT::Client<IPStack, Countdown>client = MQTT::Client<IPStack, Countdown>(&ipstack); - rc = 0; - while (rc < len) - { - int rc1 = mysock.send(buf, len); - if (rc1 == -1) - { - lcd.printf("Send failed\n"); - break; - } - else - rc += rc1; - } - if (rc == len) - lcd.printf("Send succeeded\n"); - wait(0.2); + MQTT::Client<IPStack, Countdown>::connectionLostInfo info = {&client, &ipstack}; + int rc = connect(&info); + + client.setConnectionLostHandler(connect); + + rc = client.subscribe(topic, MQTT::QOS1, messageArrived); + if (rc != 0) + lcd.printf("rc from MQTT subscribe is %d\n", rc); + + MQTT::Message message; + // QoS 0 + char buf[100]; + sprintf(buf, "Hello World! QoS 0 message from app version %f\n", version); + message.qos = MQTT::QOS0; + message.retained = false; + message.dup = false; + message.payload = (void*)buf; + message.payloadlen = strlen(buf)+1; + rc = client.publish(topic, &message); + while (arrivedcount == 0) + client.yield(100); + + // QoS 1 + sprintf(buf, "Hello World! QoS 1 message from app version %f\n", version); + message.qos = MQTT::QOS1; + message.payloadlen = strlen(buf)+1; + rc = client.publish(topic, &message); + while (arrivedcount == 1) + client.yield(100); + + // QoS 2 + sprintf(buf, "Hello World! QoS 2 message from app version %f\n", version); + message.qos = MQTT::QOS2; + message.payloadlen = strlen(buf)+1; + rc = client.publish(topic, &message); + while (arrivedcount == 2) + client.yield(100); + + rc = client.unsubscribe(topic); + if (rc != 0) + lcd.printf("rc from unsubscribe was %d\n", rc); + + rc = client.disconnect(); + if (rc != 0) + lcd.printf("rc from disconnect was %d\n", rc); + + ipstack.disconnect(); + + lcd.printf("Finishing with %d messages received\n", arrivedcount); + return 0; } - -int main() -{ - EthernetInterface eth; - eth.init(); //Use DHCP - eth.connect(); - lcd.printf("IP Address is %s\n", eth.getIPAddress()); - - while(1) - { - myled = 1; - publish(); - wait(0.2); - myled = 0; - publish(); - wait(0.2); - } - - eth.disconnect(); -}