Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
Diff: MQTTClient.h
- Revision:
- 19:57f6f976e878
- Parent:
- 16:91c2f9a144d4
- Child:
- 20:cad3d54d7ecf
diff -r 297771d7c0a6 -r 57f6f976e878 MQTTClient.h --- a/MQTTClient.h Mon Apr 14 19:00:56 2014 +0000 +++ b/MQTTClient.h Mon Apr 14 21:58:58 2014 +0100 @@ -98,11 +98,12 @@ int unsubscribe(const char* topicFilter, resultHandler rh = 0); int disconnect(int timeout, resultHandler rh = 0); - - void run(void const *argument); + + void yield(int timeout); -private: - +private: + + void run(void const *argument); int cycle(int timeout); int waitfor(int packet_type, Timer& atimer); int keepalive(); @@ -255,7 +256,7 @@ template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message) { int rc = -1; - +; // we have to find the right message handler - indexed by topic for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) { @@ -269,16 +270,25 @@ return rc; } + + + +template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::yield(int timeout) +{ + Timer atimer = Timer(); + + atimer.countdown_ms(timeout); + while (!atimer.expired()) + cycle(atimer.left_ms()); +} template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::cycle(int timeout) { /* get one piece of work off the wire and one pass through */ - + printf("timeout %d\n", timeout); // read the socket, see what work is due int packet_type = readPacket(timeout); - - printf("packet type %d\n", packet_type); int len, rc; switch (packet_type) @@ -302,7 +312,7 @@ MQTTString topicName; Message msg; rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, - (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE); + (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);; if (msg.qos == QOS0) deliverMessage(&topicName, &msg); break; @@ -488,12 +498,13 @@ int index = 0; if (this->thread) index = findFreeOperation(); - Timer& atimer = operations[index].timer; - + Timer& atimer = operations[index].timer; + + atimer.countdown(limits.command_timeout); MQTTString topic = {(char*)topicFilter, 0, 0}; - int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic); - int rc = sendPacket(len, atimer); // send the subscribe packet + int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); + int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet if (rc != len) goto exit; // there was a problem @@ -504,7 +515,7 @@ if (waitfor(UNSUBACK) == UNSUBACK) { int mypacketid; - if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1) + if (MQTTDeserialize_unsuback(&mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) rc = 0; } } @@ -515,23 +526,25 @@ } exit: - command_timer.stop(); - command_timer.reset(); return rc; } template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::publish(const char* topicName, Message* message, resultHandler resultHandler) -{ - command_timer.start(); - +{ + int index = 0; + if (this->thread) + index = findFreeOperation(); + Timer& atimer = operations[index].timer; + + atimer.countdown(limits.command_timeout); MQTTString topic = {(char*)topicName, 0, 0}; message->id = packetid.getNext(); - int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen); - int rc = sendPacket(len); // send the subscribe packet + int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen); + int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet if (rc != len) goto exit; // there was a problem @@ -540,19 +553,19 @@ { if (message->qos == QOS1) { - if (waitfor(PUBACK) == PUBACK) + if (waitfor(PUBACK, atimer) == PUBACK) { int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) rc = 0; } } else if (message->qos == QOS2) { - if (waitfor(PUBCOMP) == PUBCOMP) + if (waitfor(PUBCOMP, atimer) == PUBCOMP) { int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) rc = 0; } @@ -565,8 +578,6 @@ } exit: - command_timer.stop(); - command_timer.reset(); return rc; }