Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: MQTTClient.h
- Revision:
- 15:64a57183aa03
- Parent:
- 13:fd82db992024
- Child:
- 16:91c2f9a144d4
--- a/MQTTClient.h Fri Apr 11 22:46:37 2014 +0100
+++ b/MQTTClient.h Sun Apr 13 22:32:28 2014 +0000
@@ -45,7 +45,7 @@
public:
PacketId();
- int getNext();
+ int getNext();
private:
static const int MAX_PACKET_ID = 65535;
@@ -57,16 +57,32 @@
template<class Network, class Timer, class Thread> class Client
{
-public:
-
+public:
+
struct Result
{
/* success or failure result data */
- Client<Network, Timer, Thread>* client;
+ Client<Network, Timer, Thread>* client;
int connack_rc;
- };
-
+ };
+
typedef void (*resultHandler)(Result*);
+
+ struct limits
+ {
+ int MAX_MQTT_PACKET_SIZE; //
+ int MAX_MESSAGE_HANDLERS; // 5 - each subscription requires a message handler
+ int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
+ int command_timeout;
+
+ limits()
+ {
+ MAX_MQTT_PACKET_SIZE = 100;
+ MAX_MESSAGE_HANDLERS = 5;
+ MAX_CONCURRENT_OPERATIONS= 5;
+ command_timeout = 30;
+ }
+ };
Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30);
@@ -87,61 +103,74 @@
private:
- int cycle(int timeout);
+ int cycle(int timeout);
int keepalive();
int decodePacket(int* value, int timeout);
int readPacket(int timeout = -1);
int sendPacket(int length, int timeout = -1);
+ int deliverMessage(MQTTString* topic, Message* message);
Thread* thread;
Network* ipstack;
Timer command_timer, ping_timer;
- char* buf;
+ char buf[];
int buflen;
char* readbuf;
- int readbuflen;
-
- unsigned int keepAliveInterval;
+ int readbuflen;
+
+ unsigned int keepAliveInterval;
bool ping_outstanding;
int command_timeout; // max time to wait for any MQTT command to complete, in seconds
PacketId packetid;
typedef FP<void, Result*> resultHandlerFP;
- // how many concurrent operations should we allow? Each one will require a function pointer
resultHandlerFP connectHandler;
#define MAX_MESSAGE_HANDLERS 5
typedef FP<void, Message*> messageHandlerFP;
- messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS]; // Linked list, or constructor parameter to limit array size?
-
+ struct
+ {
+ char* topic;
+ messageHandlerFP fp;
+ } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are linked to a subscription topic
+
+ // how many concurrent operations should we allow? Each one will require a function pointer
+ struct
+ {
+ unsigned short id;
+ resultHandlerFP fp;
+ MQTTString* topic; // if this is a publish, store topic name in case republishing is required
+ Message* message; // for publish,
+ } *operations; // result handlers are indexed by packet ids
+
static void threadfn(void* arg);
};
-}
-
-
-template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
-{
- ((Client<Network, Timer, Thread>*) arg)->run(NULL);
+}
+
+
+template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
+{
+ ((Client<Network, Timer, Thread>*) arg)->run(NULL);
}
template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout) : packetid()
{
-
- buf = new char[MAX_MQTT_PACKET_SIZE];
+ //buf = new char[MAX_MQTT_PACKET_SIZE];
readbuf = new char[MAX_MQTT_PACKET_SIZE];
buflen = readbuflen = MAX_MQTT_PACKET_SIZE;
+
this->command_timeout = command_timeout;
this->thread = 0;
this->ipstack = network;
- this->command_timer = Timer();
- this->ping_timer = Timer();
+ this->command_timer = Timer();
+ this->ping_timer = Timer();
this->ping_outstanding = 0;
}
@@ -151,7 +180,7 @@
int sent = 0;
while (sent < length)
- sent += ipstack->write(&buf[sent], length, -1);
+ sent += ipstack->write(&buf[sent], length, -1);
if (sent == length)
ping_timer.reset(); // record the fact that we have successfully sent the packet
return sent;
@@ -163,7 +192,7 @@
char c;
int multiplier = 1;
int len = 0;
-#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
+ const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
*value = 0;
do
@@ -219,6 +248,11 @@
}
+template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::deliverMessage(MQTTString* topic, Message* message)
+{
+}
+
+
template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout)
{
/* get one piece of work off the wire and one pass through */
@@ -227,116 +261,114 @@
int packet_type = readPacket(timeout);
printf("packet type %d\n", packet_type);
-
+
int len, rc;
switch (packet_type)
{
- case CONNACK:
- if (this->thread)
- {
- Result res = {this, 0};
- int connack_rc = -1;
+ case CONNACK:
+ if (this->thread)
+ {
+ Result res = {this, 0};
if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1)
- ;
- connectHandler(&res);
- }
+ ;
+ connectHandler(&res);
+ connectHandler.detach(); // only invoke the callback once
+ }
case PUBACK:
case SUBACK:
break;
- case PUBLISH:
- MQTTString topicName;
- Message msg;
- rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
- (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
- if (msg.qos == QOS0)
- messageHandlers[0](&msg);
+ case PUBLISH:
+ MQTTString topicName;
+ Message msg;
+ rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
+ (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
+ if (msg.qos == QOS0)
+ deliverMessage(&topicName, &msg);
break;
- case PUBREC:
+ case PUBREC:
int type, dup, mypacketid;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
- ;
+ ;
+ // must lock this access against the application thread, if we are multi-threaded
len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid);
- rc = sendPacket(len); // send the subscribe packet
- if (rc != len)
- goto exit; // there was a problem
+ rc = sendPacket(len); // send the subscribe packet
+ if (rc != len)
+ goto exit; // there was a problem
break;
case PUBCOMP:
break;
- case PINGRESP:
- if (ping_outstanding)
- ping_outstanding = false;
- //else disconnect();
+ case PINGRESP:
+ ping_outstanding = false;
break;
- case -1:
- break;
- }
- keepalive();
+ }
+ keepalive();
exit:
return packet_type;
-}
-
-
+}
+
+
template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive()
-{
- int rc = 0;
-
- if (keepAliveInterval == 0)
- goto exit;
-
- if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
- {
- if (ping_outstanding)
- rc = -1;
- else
- {
- int len = MQTTSerialize_pingreq(buf, buflen);
- rc = sendPacket(len); // send the connect packet
- if (rc != len)
- rc = -1; // indicate there's a problem
- else
- ping_outstanding = true;
- }
- }
-
-exit:
+{
+ int rc = 0;
+
+ if (keepAliveInterval == 0)
+ goto exit;
+
+ if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
+ {
+ if (ping_outstanding)
+ rc = -1;
+ else
+ {
+ int len = MQTTSerialize_pingreq(buf, buflen);
+ rc = sendPacket(len); // send the connect packet
+ if (rc != len)
+ rc = -1; // indicate there's a problem
+ else
+ ping_outstanding = true;
+ }
+ }
+
+exit:
return rc;
}
template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument)
-{
- while (true)
+{
+ while (true)
cycle((keepAliveInterval * 1000) - ping_timer.read_ms());
}
template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
-{
- command_timer.start();
+{
+ command_timer.start();
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
if (options == 0)
options = &default_options; // set default options if none were supplied
- this->keepAliveInterval = options->keepAliveInterval;
+ this->keepAliveInterval = options->keepAliveInterval;
ping_timer.start();
int len = MQTTSerialize_connect(buf, buflen, options);
- int rc = sendPacket(len); // send the connect packet
- if (rc != len)
+ int rc = sendPacket(len); // send the connect packet
+ if (rc != len)
goto exit; // there was a problem
if (resultHandler == 0) // wait until the connack is received
- {
- if (command_timer.read_ms() > (command_timeout * 1000))
- goto exit; // we timed out
+ {
// this will be a blocking call, wait for the connack
- if (cycle(command_timeout - command_timer.read_ms()) == CONNACK)
+ do
{
- int connack_rc = -1;
- if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
- rc = connack_rc;
- }
+ if (command_timer.read_ms() > (command_timeout * 1000))
+ goto exit; // we timed out
+ }
+ while (cycle(command_timeout - command_timer.read_ms()) != CONNACK);
+ int connack_rc = -1;
+ if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
+ rc = connack_rc;
}
else
{
@@ -346,23 +378,23 @@
// start background thread
this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this);
}
-
-exit:
- command_timer.stop();
+
+exit:
+ command_timer.stop();
command_timer.reset();
return rc;
}
template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
-{
- command_timer.start();
+{
+ command_timer.start();
MQTTString topic = {(char*)topicFilter, 0, 0};
int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
- int rc = sendPacket(len); // send the subscribe packet
- if (rc != len)
+ int rc = sendPacket(len); // send the subscribe packet
+ if (rc != len)
goto exit; // there was a problem
/* wait for suback */
@@ -381,23 +413,23 @@
// set subscribe response callback function
}
-
-exit:
- command_timer.stop();
+
+exit:
+ command_timer.stop();
command_timer.reset();
return rc;
-}
-
-
+}
+
+
template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
-{
- command_timer.start();
+{
+ command_timer.start();
MQTTString topic = {(char*)topicFilter, 0, 0};
int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic);
- int rc = sendPacket(len); // send the subscribe packet
- if (rc != len)
+ int rc = sendPacket(len); // send the subscribe packet
+ if (rc != len)
goto exit; // there was a problem
/* wait for suback */
@@ -416,57 +448,49 @@
// set unsubscribe response callback function
}
-
-exit:
- command_timer.stop();
+
+exit:
+ command_timer.stop();
command_timer.reset();
return rc;
-}
-
-
-
+}
+
+
+
template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler)
-{
- command_timer.start();
+{
+ command_timer.start();
- MQTTString topic = {(char*)topicName, 0, 0};
-
+ MQTTString topic = {(char*)topicName, 0, 0};
+
message->id = packetid.getNext();
-
+
int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen);
- int rc = sendPacket(len); // send the subscribe packet
- if (rc != len)
+ int rc = sendPacket(len); // send the subscribe packet
+ if (rc != len)
goto exit; // there was a problem
/* wait for acks */
if (resultHandler == 0)
{
- if (message->qos == QOS1)
+ if (message->qos == QOS1)
{
if (cycle(command_timeout - command_timer.read_ms()) == PUBACK)
{
int type, dup, mypacketid;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
rc = 0;
- }
- }
- else if (message->qos == QOS2)
- {
- if (cycle(command_timeout - command_timer.read_ms()) == PUBREC)
- {
- int type, dup, mypacketid;
- if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
- rc = 0;
- len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, message->id);
- rc = sendPacket(len); // send the subscribe packet
- if (rc != len)
- goto exit; // there was a problem
- if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
- {
- if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
- rc = 0;
- }
- }
+ }
+ }
+ else if (message->qos == QOS2)
+ {
+ if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
+ {
+ int type, dup, mypacketid;
+ if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
+ rc = 0;
+ }
+
}
}
else
@@ -474,9 +498,9 @@
// set publish response callback function
}
-
-exit:
- command_timer.stop();
+
+exit:
+ command_timer.stop();
command_timer.reset();
return rc;
}