Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: FP MQTTPacket
Dependents: NuMaker-mbed-AWS-IoT-example NuMaker-mbed-OS-6-AWS-IoT-example
Diff: MQTTClient.h
- Revision:
- 12:cc7f2d62a393
- Parent:
- 9:01b8cc7d94cc
- Child:
- 13:fd82db992024
--- a/MQTTClient.h Thu Apr 10 15:19:08 2014 +0000
+++ b/MQTTClient.h Fri Apr 11 22:31:55 2014 +0100
@@ -33,41 +33,42 @@
enum QoS qos;
bool retained;
bool dup;
- unsigned short msgid;
+ unsigned short id;
void *payload;
size_t payloadlen;
};
template<class Network, class Timer, class Thread> class Client;
-class Result
-{
- /* success or failure result data */
- Client<class Network, class Timer, class Thread>* client;
-};
-
-
class PacketId
{
public:
PacketId();
- int getNext();
-
+ int getNext();
+
private:
static const int MAX_PACKET_ID = 65535;
int next;
};
-typedef void (*resultHandler)(Result*);
typedef void (*messageHandler)(Message*);
template<class Network, class Timer, class Thread> class Client
{
-public:
+public:
+
+ struct Result
+ {
+ /* success or failure result data */
+ Client<Network, Timer, Thread>* client;
+ int connack_rc;
+ };
+
+ typedef void (*resultHandler)(Result*);
- Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30);
+ Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30);
int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);
@@ -78,7 +79,7 @@
int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0);
- int unsubscribe(char* topicFilter, resultHandler rh = 0);
+ int unsubscribe(const char* topicFilter, resultHandler rh = 0);
int disconnect(int timeout, resultHandler rh = 0);
@@ -86,7 +87,8 @@
private:
- int cycle();
+ int cycle(int timeout);
+ int keepalive();
int decodePacket(int* value, int timeout);
int readPacket(int timeout = -1);
@@ -94,16 +96,18 @@
Thread* thread;
Network* ipstack;
- Timer* timer;
+ Timer command_timer, ping_timer;
char* buf;
int buflen;
char* readbuf;
- int readbuflen;
+ int readbuflen;
+
+ unsigned int keepAliveInterval;
+ bool ping_outstanding;
int command_timeout; // max time to wait for any MQTT command to complete, in seconds
- int keepalive;
PacketId packetid;
typedef FP<void, Result*> resultHandlerFP;
@@ -112,24 +116,33 @@
#define MAX_MESSAGE_HANDLERS 5
typedef FP<void, Message*> messageHandlerFP;
- messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS]; // Linked list, or constructor parameter to limit array size?
+ messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS]; // Linked list, or constructor parameter to limit array size?
+
+ static void threadfn(void* arg);
};
-void threadfn(void* arg);
+}
+
+
+template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
+{
+ ((Client<Network, Timer, Thread>*) arg)->run(NULL);
+}
-}
-template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout) : packetid()
+template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout) : packetid()
{
- buf = new char[buffer_size];
- readbuf = new char[buffer_size];
- buflen = readbuflen = buffer_size;
+ buf = new char[MAX_MQTT_PACKET_SIZE];
+ readbuf = new char[MAX_MQTT_PACKET_SIZE];
+ buflen = readbuflen = MAX_MQTT_PACKET_SIZE;
this->command_timeout = command_timeout;
this->thread = 0;
this->ipstack = network;
- this->timer = timer;
+ this->command_timer = Timer();
+ this->ping_timer = Timer();
+ this->ping_outstanding = 0;
}
@@ -138,8 +151,9 @@
int sent = 0;
while (sent < length)
- sent += ipstack->write(&buf[sent], length, -1);
-
+ sent += ipstack->write(&buf[sent], length, -1);
+ if (sent == length)
+ ping_timer.reset(); // record the fact that we have successfully sent the packet
return sent;
}
@@ -205,69 +219,119 @@
}
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle()
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout)
{
- int timeout = -1;
/* get one piece of work off the wire and one pass through */
- // 1. read the socket, see what work is due.
+ // read the socket, see what work is due
int packet_type = readPacket(timeout);
printf("packet type %d\n", packet_type);
-
+
+ int len, rc;
switch (packet_type)
{
- case CONNACK:
- printf("connack received\n");
- break;
- case PUBLISH:
- break;
+ case CONNACK:
+ if (this->thread)
+ {
+ Result res = {this, 0};
+ int connack_rc = -1;
+ if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1)
+ ;
+ connectHandler(&res);
+ }
case PUBACK:
- break;
case SUBACK:
break;
- case PUBREC:
+ case PUBLISH:
+ MQTTString topicName;
+ Message msg;
+ rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
+ (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
+ if (msg.qos == QOS0)
+ messageHandlers[0](&msg);
+ break;
+ case PUBREC:
+ int type, dup, mypacketid;
+ if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+ ;
+ len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid);
+ rc = sendPacket(len); // send the subscribe packet
+ if (rc != len)
+ goto exit; // there was a problem
+
break;
case PUBCOMP:
break;
- case PINGRESP:
+ case PINGRESP:
+ if (ping_outstanding)
+ ping_outstanding = false;
+ //else disconnect();
break;
case -1:
break;
- }
+ }
+ keepalive();
+exit:
return packet_type;
+}
+
+
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive()
+{
+ int rc = 0;
+
+ if (keepAliveInterval == 0)
+ goto exit;
+
+ if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
+ {
+ if (ping_outstanding)
+ rc = -1;
+ else
+ {
+ int len = MQTTSerialize_pingreq(buf, buflen);
+ rc = sendPacket(len); // send the connect packet
+ if (rc != len)
+ rc = -1; // indicate there's a problem
+ else
+ ping_outstanding = true;
+ }
+ }
+
+exit:
+ return rc;
}
template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument)
-{
+{
+ while (true)
+ cycle((keepAliveInterval * 1000) - ping_timer.read_ms());
}
template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
-{
- int len = 0;
- int rc = -99;
- MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
+{
+ command_timer.start();
- /* 2. if the connect was successful, send the MQTT connect packet */
+ MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
if (options == 0)
- {
- default_options.clientID.cstring = "me";
- options = &default_options;
- }
+ options = &default_options; // set default options if none were supplied
- this->keepalive = options->keepAliveInterval;
- len = MQTTSerialize_connect(buf, buflen, options);
- printf("len from send is %d %d\n", len, buflen);
- rc = sendPacket(len); // send the connect packet
- printf("rc from send is %d\n", rc);
+ this->keepAliveInterval = options->keepAliveInterval;
+ ping_timer.start();
+ int len = MQTTSerialize_connect(buf, buflen, options);
+ int rc = sendPacket(len); // send the connect packet
+ if (rc != len)
+ goto exit; // there was a problem
- /* 3. wait until the connack is received */
- if (resultHandler == 0)
- {
+ if (resultHandler == 0) // wait until the connack is received
+ {
+ if (command_timer.read_ms() > (command_timeout * 1000))
+ goto exit; // we timed out
// this will be a blocking call, wait for the connack
- if (cycle() == CONNACK)
+ if (cycle(command_timeout - command_timer.read_ms()) == CONNACK)
{
int connack_rc = -1;
if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
@@ -279,30 +343,33 @@
// set connect response callback function
connectHandler.attach(resultHandler);
- // start background thread
-
- this->thread = new Thread((void (*)(void const *argument))&MQTT::threadfn, (void*)this);
+ // start background thread
+ this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this);
}
-
+
+exit:
+ command_timer.stop();
+ command_timer.reset();
return rc;
}
-template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos,
- messageHandler messageHandler, resultHandler resultHandler)
-{
- int rc = -1,
- len = 0;
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
+{
+ command_timer.start();
+
MQTTString topic = {(char*)topicFilter, 0, 0};
- len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
- rc = sendPacket(len); // send the subscribe packet
+ int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+ int rc = sendPacket(len); // send the subscribe packet
+ if (rc != len)
+ goto exit; // there was a problem
/* wait for suback */
if (resultHandler == 0)
{
// this will block
- if (cycle() == SUBACK)
+ if (cycle(command_timeout - command_timer.read_ms()) == SUBACK)
{
int count = 0, grantedQoS = -1, mypacketid;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
@@ -314,7 +381,103 @@
// set subscribe response callback function
}
+
+exit:
+ command_timer.stop();
+ command_timer.reset();
+ return rc;
+}
+
+
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
+{
+ command_timer.start();
+
+ MQTTString topic = {(char*)topicFilter, 0, 0};
+ int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic);
+ int rc = sendPacket(len); // send the subscribe packet
+ if (rc != len)
+ goto exit; // there was a problem
+
+ /* wait for suback */
+ if (resultHandler == 0)
+ {
+ // this will block
+ if (cycle(command_timeout - command_timer.read_ms()) == UNSUBACK)
+ {
+ int mypacketid;
+ if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1)
+ rc = 0;
+ }
+ }
+ else
+ {
+ // set unsubscribe response callback function
+
+ }
+
+exit:
+ command_timer.stop();
+ command_timer.reset();
+ return rc;
+}
+
+
+
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler)
+{
+ command_timer.start();
+
+ MQTTString topic = {(char*)topicName, 0, 0};
+
+ message->id = packetid.getNext();
+
+ int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen);
+ int rc = sendPacket(len); // send the subscribe packet
+ if (rc != len)
+ goto exit; // there was a problem
+
+ /* wait for acks */
+ if (resultHandler == 0)
+ {
+ if (message->qos == QOS1)
+ {
+ if (cycle(command_timeout - command_timer.read_ms()) == PUBACK)
+ {
+ int type, dup, mypacketid;
+ if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+ rc = 0;
+ }
+ }
+ else if (message->qos == QOS2)
+ {
+ if (cycle(command_timeout - command_timer.read_ms()) == PUBREC)
+ {
+ int type, dup, mypacketid;
+ if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+ rc = 0;
+ len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, message->id);
+ rc = sendPacket(len); // send the subscribe packet
+ if (rc != len)
+ goto exit; // there was a problem
+ if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
+ {
+ if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+ rc = 0;
+ }
+ }
+ }
+ }
+ else
+ {
+ // set publish response callback function
+
+ }
+
+exit:
+ command_timer.stop();
+ command_timer.reset();
return rc;
}