An API for using MQTT over multiple transports
Fork of MQTT by
Diff: MQTTClient.h
- Revision:
- 19:57f6f976e878
- Parent:
- 16:91c2f9a144d4
- Child:
- 20:cad3d54d7ecf
--- a/MQTTClient.h Mon Apr 14 19:00:56 2014 +0000 +++ b/MQTTClient.h Mon Apr 14 21:58:58 2014 +0100 @@ -98,11 +98,12 @@ int unsubscribe(const char* topicFilter, resultHandler rh = 0); int disconnect(int timeout, resultHandler rh = 0); - - void run(void const *argument); + + void yield(int timeout); -private: - +private: + + void run(void const *argument); int cycle(int timeout); int waitfor(int packet_type, Timer& atimer); int keepalive(); @@ -255,7 +256,7 @@ template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message) { int rc = -1; - +; // we have to find the right message handler - indexed by topic for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i) { @@ -269,16 +270,25 @@ return rc; } + + + +template<class Network, class Timer, class Thread, class Mutex> void MQTT::Client<Network, Timer, Thread, Mutex>::yield(int timeout) +{ + Timer atimer = Timer(); + + atimer.countdown_ms(timeout); + while (!atimer.expired()) + cycle(atimer.left_ms()); +} template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::cycle(int timeout) { /* get one piece of work off the wire and one pass through */ - + printf("timeout %d\n", timeout); // read the socket, see what work is due int packet_type = readPacket(timeout); - - printf("packet type %d\n", packet_type); int len, rc; switch (packet_type) @@ -302,7 +312,7 @@ MQTTString topicName; Message msg; rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, - (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE); + (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);; if (msg.qos == QOS0) deliverMessage(&topicName, &msg); break; @@ -488,12 +498,13 @@ int index = 0; if (this->thread) index = findFreeOperation(); - Timer& atimer = operations[index].timer; - + Timer& atimer = operations[index].timer; + + atimer.countdown(limits.command_timeout); MQTTString topic = {(char*)topicFilter, 0, 0}; - int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic); - int rc = sendPacket(len, atimer); // send the subscribe packet + int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic); + int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet if (rc != len) goto exit; // there was a problem @@ -504,7 +515,7 @@ if (waitfor(UNSUBACK) == UNSUBACK) { int mypacketid; - if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1) + if (MQTTDeserialize_unsuback(&mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) rc = 0; } } @@ -515,23 +526,25 @@ } exit: - command_timer.stop(); - command_timer.reset(); return rc; } template<class Network, class Timer, class Thread, class Mutex> int MQTT::Client<Network, Timer, Thread, Mutex>::publish(const char* topicName, Message* message, resultHandler resultHandler) -{ - command_timer.start(); - +{ + int index = 0; + if (this->thread) + index = findFreeOperation(); + Timer& atimer = operations[index].timer; + + atimer.countdown(limits.command_timeout); MQTTString topic = {(char*)topicName, 0, 0}; message->id = packetid.getNext(); - int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen); - int rc = sendPacket(len); // send the subscribe packet + int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen); + int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet if (rc != len) goto exit; // there was a problem @@ -540,19 +553,19 @@ { if (message->qos == QOS1) { - if (waitfor(PUBACK) == PUBACK) + if (waitfor(PUBACK, atimer) == PUBACK) { int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) rc = 0; } } else if (message->qos == QOS2) { - if (waitfor(PUBCOMP) == PUBCOMP) + if (waitfor(PUBCOMP, atimer) == PUBCOMP) { int type, dup, mypacketid; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1) rc = 0; } @@ -565,8 +578,6 @@ } exit: - command_timer.stop(); - command_timer.reset(); return rc; }