Version to make it easier to reuse without source modifications

Committer:
JMF
Date:
Tue Mar 27 17:26:35 2018 +0000
Revision:
0:5cd4781e0c88
Initial commit

Who changed what in which revision?

User | Revision | Line number | New contents of line
JMF 0:5cd4781e0c88 1 /*******************************************************************************
JMF 0:5cd4781e0c88 2 * Copyright (c) 2014 IBM Corp.
JMF 0:5cd4781e0c88 3 *
JMF 0:5cd4781e0c88 4 * All rights reserved. This program and the accompanying materials
JMF 0:5cd4781e0c88 5 * are made available under the terms of the Eclipse Public License v1.0
JMF 0:5cd4781e0c88 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
JMF 0:5cd4781e0c88 7 *
JMF 0:5cd4781e0c88 8 * The Eclipse Public License is available at
JMF 0:5cd4781e0c88 9 * http://www.eclipse.org/legal/epl-v10.html
JMF 0:5cd4781e0c88 10 * and the Eclipse Distribution License is available at
JMF 0:5cd4781e0c88 11 * http://www.eclipse.org/org/documents/edl-v10.php.
JMF 0:5cd4781e0c88 12 *
JMF 0:5cd4781e0c88 13 * Contributors:
JMF 0:5cd4781e0c88 14 * Ian Craggs - initial API and implementation and/or initial documentation
JMF 0:5cd4781e0c88 15 *******************************************************************************/
JMF 0:5cd4781e0c88 16
JMF 0:5cd4781e0c88 17 #if !defined(MQTTASYNC_H)
JMF 0:5cd4781e0c88 18 #define MQTTASYNC_H
JMF 0:5cd4781e0c88 19
JMF 0:5cd4781e0c88 20 #include "MQTTPacket.h"
JMF 0:5cd4781e0c88 21 #include "stdio.h"
JMF 0:5cd4781e0c88 22
JMF 0:5cd4781e0c88 23 namespace MQTT
JMF 0:5cd4781e0c88 24 {
JMF 0:5cd4781e0c88 25
JMF 0:5cd4781e0c88 26
/** MQTT quality-of-service levels; the enumerators have the values 0, 1 and 2. */
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
JMF 0:5cd4781e0c88 28
JMF 0:5cd4781e0c88 29
JMF 0:5cd4781e0c88 30 struct Message
JMF 0:5cd4781e0c88 31 {
JMF 0:5cd4781e0c88 32 enum QoS qos;
JMF 0:5cd4781e0c88 33 bool retained;
JMF 0:5cd4781e0c88 34 bool dup;
JMF 0:5cd4781e0c88 35 unsigned short id;
JMF 0:5cd4781e0c88 36 void *payload;
JMF 0:5cd4781e0c88 37 size_t payloadlen;
JMF 0:5cd4781e0c88 38 };
JMF 0:5cd4781e0c88 39
JMF 0:5cd4781e0c88 40
/**
 * Source of MQTT packet identifiers.
 * Only the declaration lives here; the constructor and getNext() are defined elsewhere.
 */
class PacketId
{
public:
    PacketId();

    /** @return the next packet identifier to use */
    int getNext();

private:
    // Largest identifier named by this class (65535).
    static const int MAX_PACKET_ID = 65535;

    // State used by getNext(); its exact meaning is set by the out-of-view definitions.
    int next;
};
JMF 0:5cd4781e0c88 52
JMF 0:5cd4781e0c88 53 typedef void (*messageHandler)(Message*);
JMF 0:5cd4781e0c88 54
/**
 * Sizing and timeout settings for a client instance.
 * The default constructor supplies the defaults listed against each field.
 */
struct limits
{
    int MAX_MQTT_PACKET_SIZE;       // size of the send and receive buffers (default 100)
    int MAX_MESSAGE_HANDLERS;       // each subscription requires a message handler (default 5)
    int MAX_CONCURRENT_OPERATIONS;  // 1 indicates single-threaded mode - set to >1 for multithreaded mode (default 1)
    int command_timeout_ms;         // timeout applied to each command (default 30000)

    limits()
        : MAX_MQTT_PACKET_SIZE(100),
          MAX_MESSAGE_HANDLERS(5),
          MAX_CONCURRENT_OPERATIONS(1),
          command_timeout_ms(30000)
    {
    }
};

typedef limits Limits;
JMF 0:5cd4781e0c88 70
JMF 0:5cd4781e0c88 71
JMF 0:5cd4781e0c88 72 /**
JMF 0:5cd4781e0c88 73 * @class Async
JMF 0:5cd4781e0c88 74 * @brief non-blocking, threaded MQTT client API
JMF 0:5cd4781e0c88 75 * @param Network a network class which supports send, receive
JMF 0:5cd4781e0c88 76 * @param Timer a timer class with the methods:
JMF 0:5cd4781e0c88 77 */
JMF 0:5cd4781e0c88 78 template<class Network, class Timer, class Thread, class Mutex> class Async
JMF 0:5cd4781e0c88 79 {
JMF 0:5cd4781e0c88 80
JMF 0:5cd4781e0c88 81 public:
JMF 0:5cd4781e0c88 82
JMF 0:5cd4781e0c88 83 struct Result
JMF 0:5cd4781e0c88 84 {
JMF 0:5cd4781e0c88 85 /* success or failure result data */
JMF 0:5cd4781e0c88 86 Async<Network, Timer, Thread, Mutex>* client;
JMF 0:5cd4781e0c88 87 int rc;
JMF 0:5cd4781e0c88 88 };
JMF 0:5cd4781e0c88 89
JMF 0:5cd4781e0c88 90 typedef void (*resultHandler)(Result*);
JMF 0:5cd4781e0c88 91
JMF 0:5cd4781e0c88 92 Async(Network* network, const Limits limits = Limits());
JMF 0:5cd4781e0c88 93
JMF 0:5cd4781e0c88 94 typedef struct
JMF 0:5cd4781e0c88 95 {
JMF 0:5cd4781e0c88 96 Async* client;
JMF 0:5cd4781e0c88 97 Network* network;
JMF 0:5cd4781e0c88 98 } connectionLostInfo;
JMF 0:5cd4781e0c88 99
JMF 0:5cd4781e0c88 100 typedef int (*connectionLostHandlers)(connectionLostInfo*);
JMF 0:5cd4781e0c88 101
JMF 0:5cd4781e0c88 102 /** Set the connection lost callback - called whenever the connection is lost and we should be connected
JMF 0:5cd4781e0c88 103 * @param clh - pointer to the callback function
JMF 0:5cd4781e0c88 104 */
JMF 0:5cd4781e0c88 105 void setConnectionLostHandler(connectionLostHandlers clh)
JMF 0:5cd4781e0c88 106 {
JMF 0:5cd4781e0c88 107 connectionLostHandler.attach(clh);
JMF 0:5cd4781e0c88 108 }
JMF 0:5cd4781e0c88 109
JMF 0:5cd4781e0c88 110 /** Set the default message handling callback - used for any message which does not match a subscription message handler
JMF 0:5cd4781e0c88 111 * @param mh - pointer to the callback function
JMF 0:5cd4781e0c88 112 */
JMF 0:5cd4781e0c88 113 void setDefaultMessageHandler(messageHandler mh)
JMF 0:5cd4781e0c88 114 {
JMF 0:5cd4781e0c88 115 defaultMessageHandler.attach(mh);
JMF 0:5cd4781e0c88 116 }
JMF 0:5cd4781e0c88 117
JMF 0:5cd4781e0c88 118 int connect(resultHandler fn, MQTTPacket_connectData* options = 0);
JMF 0:5cd4781e0c88 119
JMF 0:5cd4781e0c88 120 template<class T>
JMF 0:5cd4781e0c88 121 int connect(void(T::*method)(Result *), MQTTPacket_connectData* options = 0, T *item = 0); // alternative to pass in pointer to member function
JMF 0:5cd4781e0c88 122
JMF 0:5cd4781e0c88 123 int publish(resultHandler rh, const char* topic, Message* message);
JMF 0:5cd4781e0c88 124
JMF 0:5cd4781e0c88 125 int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh);
JMF 0:5cd4781e0c88 126
JMF 0:5cd4781e0c88 127 int unsubscribe(resultHandler rh, const char* topicFilter);
JMF 0:5cd4781e0c88 128
JMF 0:5cd4781e0c88 129 int disconnect(resultHandler rh);
JMF 0:5cd4781e0c88 130
JMF 0:5cd4781e0c88 131 private:
JMF 0:5cd4781e0c88 132
JMF 0:5cd4781e0c88 133 void run(void const *argument);
JMF 0:5cd4781e0c88 134 int cycle(int timeout);
JMF 0:5cd4781e0c88 135 int waitfor(int packet_type, Timer& atimer);
JMF 0:5cd4781e0c88 136 int keepalive();
JMF 0:5cd4781e0c88 137 int findFreeOperation();
JMF 0:5cd4781e0c88 138
JMF 0:5cd4781e0c88 139 int decodePacket(int* value, int timeout);
JMF 0:5cd4781e0c88 140 int readPacket(int timeout);
JMF 0:5cd4781e0c88 141 int sendPacket(int length, int timeout);
JMF 0:5cd4781e0c88 142 int deliverMessage(MQTTString* topic, Message* message);
JMF 0:5cd4781e0c88 143
JMF 0:5cd4781e0c88 144 Thread* thread;
JMF 0:5cd4781e0c88 145 Network* ipstack;
JMF 0:5cd4781e0c88 146
JMF 0:5cd4781e0c88 147 Limits limits;
JMF 0:5cd4781e0c88 148
JMF 0:5cd4781e0c88 149 char* buf;
JMF 0:5cd4781e0c88 150 char* readbuf;
JMF 0:5cd4781e0c88 151
JMF 0:5cd4781e0c88 152 Timer ping_timer, connect_timer;
JMF 0:5cd4781e0c88 153 unsigned int keepAliveInterval;
JMF 0:5cd4781e0c88 154 bool ping_outstanding;
JMF 0:5cd4781e0c88 155
JMF 0:5cd4781e0c88 156 PacketId packetid;
JMF 0:5cd4781e0c88 157
JMF 0:5cd4781e0c88 158 typedef Callback<void(Result*)> resultHandlerFP;
JMF 0:5cd4781e0c88 159 resultHandlerFP connectHandler;
JMF 0:5cd4781e0c88 160
JMF 0:5cd4781e0c88 161 typedef Callback<void(Message*)> messageHandlerFP;
JMF 0:5cd4781e0c88 162 struct MessageHandlers
JMF 0:5cd4781e0c88 163 {
JMF 0:5cd4781e0c88 164 const char* topic;
JMF 0:5cd4781e0c88 165 messageHandlerFP fp;
JMF 0:5cd4781e0c88 166 } *messageHandlers; // Message handlers are indexed by subscription topic
JMF 0:5cd4781e0c88 167
JMF 0:5cd4781e0c88 168 // how many concurrent operations should we allow? Each one will require a function pointer
JMF 0:5cd4781e0c88 169 struct Operations
JMF 0:5cd4781e0c88 170 {
JMF 0:5cd4781e0c88 171 unsigned short id;
JMF 0:5cd4781e0c88 172 resultHandlerFP fp;
JMF 0:5cd4781e0c88 173 const char* topic; // if this is a publish, store topic name in case republishing is required
JMF 0:5cd4781e0c88 174 Message* message; // for publish,
JMF 0:5cd4781e0c88 175 Timer timer; // to check if the command has timed out
JMF 0:5cd4781e0c88 176 } *operations; // result handlers are indexed by packet ids
JMF 0:5cd4781e0c88 177
JMF 0:5cd4781e0c88 178 static void threadfn(void* arg);
JMF 0:5cd4781e0c88 179
JMF 0:5cd4781e0c88 180 messageHandlerFP defaultMessageHandler;
JMF 0:5cd4781e0c88 181
JMF 0:5cd4781e0c88 182 typedef Callback<int(connectionLostInfo*)> connectionLostFP;
JMF 0:5cd4781e0c88 183
JMF 0:5cd4781e0c88 184 connectionLostFP connectionLostHandler;
JMF 0:5cd4781e0c88 185
JMF 0:5cd4781e0c88 186 };
JMF 0:5cd4781e0c88 187
JMF 0:5cd4781e0c88 188 }
JMF 0:5cd4781e0c88 189
JMF 0:5cd4781e0c88 190
JMF 0:5cd4781e0c88 191 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg)
JMF 0:5cd4781e0c88 192 {
JMF 0:5cd4781e0c88 193 ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
JMF 0:5cd4781e0c88 194 }
JMF 0:5cd4781e0c88 195
JMF 0:5cd4781e0c88 196
JMF 0:5cd4781e0c88 197 template<class Network, class Timer, class Thread, class Mutex> MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits) : limits(limits), packetid()
JMF 0:5cd4781e0c88 198 {
JMF 0:5cd4781e0c88 199 this->thread = 0;
JMF 0:5cd4781e0c88 200 this->ipstack = network;
JMF 0:5cd4781e0c88 201 this->ping_timer = Timer();
JMF 0:5cd4781e0c88 202 this->ping_outstanding = 0;
JMF 0:5cd4781e0c88 203
JMF 0:5cd4781e0c88 204 // How to make these memory allocations portable? I was hoping to avoid the heap
JMF 0:5cd4781e0c88 205 buf = new char[limits.MAX_MQTT_PACKET_SIZE];
JMF 0:5cd4781e0c88 206 readbuf = new char[limits.MAX_MQTT_PACKET_SIZE];
JMF 0:5cd4781e0c88 207 this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS];
JMF 0:5cd4781e0c88 208 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
JMF 0:5cd4781e0c88 209 operations[i].id = 0;
JMF 0:5cd4781e0c88 210 this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS];
JMF 0:5cd4781e0c88 211 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
JMF 0:5cd4781e0c88 212 messageHandlers[i].topic = 0;
JMF 0:5cd4781e0c88 213 }
JMF 0:5cd4781e0c88 214
JMF 0:5cd4781e0c88 215
JMF 0:5cd4781e0c88 216 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
JMF 0:5cd4781e0c88 217 {
JMF 0:5cd4781e0c88 218 int sent = 0;
JMF 0:5cd4781e0c88 219
JMF 0:5cd4781e0c88 220 while (sent < length)
JMF 0:5cd4781e0c88 221 sent += ipstack->write(&buf[sent], length, timeout);
JMF 0:5cd4781e0c88 222 if (sent == length)
JMF 0:5cd4781e0c88 223 ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
JMF 0:5cd4781e0c88 224 return sent;
JMF 0:5cd4781e0c88 225 }
JMF 0:5cd4781e0c88 226
JMF 0:5cd4781e0c88 227
JMF 0:5cd4781e0c88 228 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
JMF 0:5cd4781e0c88 229 {
JMF 0:5cd4781e0c88 230 char c;
JMF 0:5cd4781e0c88 231 int multiplier = 1;
JMF 0:5cd4781e0c88 232 int len = 0;
JMF 0:5cd4781e0c88 233 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
JMF 0:5cd4781e0c88 234
JMF 0:5cd4781e0c88 235 *value = 0;
JMF 0:5cd4781e0c88 236 do
JMF 0:5cd4781e0c88 237 {
JMF 0:5cd4781e0c88 238 int rc = MQTTPACKET_READ_ERROR;
JMF 0:5cd4781e0c88 239
JMF 0:5cd4781e0c88 240 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
JMF 0:5cd4781e0c88 241 {
JMF 0:5cd4781e0c88 242 rc = MQTTPACKET_READ_ERROR; /* bad data */
JMF 0:5cd4781e0c88 243 goto exit;
JMF 0:5cd4781e0c88 244 }
JMF 0:5cd4781e0c88 245 rc = ipstack->read(&c, 1, timeout);
JMF 0:5cd4781e0c88 246 if (rc != 1)
JMF 0:5cd4781e0c88 247 goto exit;
JMF 0:5cd4781e0c88 248 *value += (c & 127) * multiplier;
JMF 0:5cd4781e0c88 249 multiplier *= 128;
JMF 0:5cd4781e0c88 250 } while ((c & 128) != 0);
JMF 0:5cd4781e0c88 251 exit:
JMF 0:5cd4781e0c88 252 return len;
JMF 0:5cd4781e0c88 253 }
JMF 0:5cd4781e0c88 254
JMF 0:5cd4781e0c88 255
JMF 0:5cd4781e0c88 256 /**
JMF 0:5cd4781e0c88 257 * If any read fails in this method, then we should disconnect from the network, as on reconnect
JMF 0:5cd4781e0c88 258 * the packets can be retried.
JMF 0:5cd4781e0c88 259 * @param timeout the max time to wait for the packet read to complete, in milliseconds
JMF 0:5cd4781e0c88 260 * @return the MQTT packet type, or -1 if none
JMF 0:5cd4781e0c88 261 */
JMF 0:5cd4781e0c88 262 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout)
JMF 0:5cd4781e0c88 263 {
JMF 0:5cd4781e0c88 264 int rc = -1;
JMF 0:5cd4781e0c88 265 MQTTHeader header = {0};
JMF 0:5cd4781e0c88 266 int len = 0;
JMF 0:5cd4781e0c88 267 int rem_len = 0;
JMF 0:5cd4781e0c88 268
JMF 0:5cd4781e0c88 269 /* 1. read the header byte. This has the packet type in it */
JMF 0:5cd4781e0c88 270 if (ipstack->read(readbuf, 1, timeout) != 1)
JMF 0:5cd4781e0c88 271 goto exit;
JMF 0:5cd4781e0c88 272
JMF 0:5cd4781e0c88 273 len = 1;
JMF 0:5cd4781e0c88 274 /* 2. read the remaining length. This is variable in itself */
JMF 0:5cd4781e0c88 275 decodePacket(&rem_len, timeout);
JMF 0:5cd4781e0c88 276 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
JMF 0:5cd4781e0c88 277
JMF 0:5cd4781e0c88 278 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
JMF 0:5cd4781e0c88 279 if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
JMF 0:5cd4781e0c88 280 goto exit;
JMF 0:5cd4781e0c88 281
JMF 0:5cd4781e0c88 282 header.byte = readbuf[0];
JMF 0:5cd4781e0c88 283 rc = header.bits.type;
JMF 0:5cd4781e0c88 284 exit:
JMF 0:5cd4781e0c88 285 return rc;
JMF 0:5cd4781e0c88 286 }
JMF 0:5cd4781e0c88 287
JMF 0:5cd4781e0c88 288
JMF 0:5cd4781e0c88 289 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
JMF 0:5cd4781e0c88 290 {
JMF 0:5cd4781e0c88 291 int rc = -1;
JMF 0:5cd4781e0c88 292
JMF 0:5cd4781e0c88 293 // we have to find the right message handler - indexed by topic
JMF 0:5cd4781e0c88 294 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
JMF 0:5cd4781e0c88 295 {
JMF 0:5cd4781e0c88 296 if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
JMF 0:5cd4781e0c88 297 {
JMF 0:5cd4781e0c88 298 messageHandlers[i].fp(message);
JMF 0:5cd4781e0c88 299 rc = 0;
JMF 0:5cd4781e0c88 300 break;
JMF 0:5cd4781e0c88 301 }
JMF 0:5cd4781e0c88 302 }
JMF 0:5cd4781e0c88 303
JMF 0:5cd4781e0c88 304 return rc;
JMF 0:5cd4781e0c88 305 }
JMF 0:5cd4781e0c88 306
JMF 0:5cd4781e0c88 307
JMF 0:5cd4781e0c88 308
JMF 0:5cd4781e0c88 309 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout)
JMF 0:5cd4781e0c88 310 {
JMF 0:5cd4781e0c88 311 /* get one piece of work off the wire and one pass through */
JMF 0:5cd4781e0c88 312
JMF 0:5cd4781e0c88 313 // read the socket, see what work is due
JMF 0:5cd4781e0c88 314 int packet_type = readPacket(timeout);
JMF 0:5cd4781e0c88 315
JMF 0:5cd4781e0c88 316 int len, rc;
JMF 0:5cd4781e0c88 317 switch (packet_type)
JMF 0:5cd4781e0c88 318 {
JMF 0:5cd4781e0c88 319 case CONNACK:
JMF 0:5cd4781e0c88 320 if (this->thread)
JMF 0:5cd4781e0c88 321 {
JMF 0:5cd4781e0c88 322 Result res = {this, 0};
JMF 0:5cd4781e0c88 323 if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
JMF 0:5cd4781e0c88 324 ;
JMF 0:5cd4781e0c88 325 connectHandler(&res);
JMF 0:5cd4781e0c88 326 connectHandler.detach(); // only invoke the callback once
JMF 0:5cd4781e0c88 327 }
JMF 0:5cd4781e0c88 328 break;
JMF 0:5cd4781e0c88 329 case PUBACK:
JMF 0:5cd4781e0c88 330 if (this->thread)
JMF 0:5cd4781e0c88 331 ; //call resultHandler
JMF 0:5cd4781e0c88 332 case SUBACK:
JMF 0:5cd4781e0c88 333 break;
JMF 0:5cd4781e0c88 334 case PUBLISH:
JMF 0:5cd4781e0c88 335 MQTTString topicName;
JMF 0:5cd4781e0c88 336 Message msg;
JMF 0:5cd4781e0c88 337 rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
JMF 0:5cd4781e0c88 338 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);;
JMF 0:5cd4781e0c88 339 if (msg.qos == QOS0)
JMF 0:5cd4781e0c88 340 deliverMessage(&topicName, &msg);
JMF 0:5cd4781e0c88 341 break;
JMF 0:5cd4781e0c88 342 case PUBREC:
JMF 0:5cd4781e0c88 343 int type, dup, mypacketid;
JMF 0:5cd4781e0c88 344 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
JMF 0:5cd4781e0c88 345 ;
JMF 0:5cd4781e0c88 346 // must lock this access against the application thread, if we are multi-threaded
JMF 0:5cd4781e0c88 347 len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
JMF 0:5cd4781e0c88 348 rc = sendPacket(len, timeout); // send the PUBREL packet
JMF 0:5cd4781e0c88 349 if (rc != len)
JMF 0:5cd4781e0c88 350 goto exit; // there was a problem
JMF 0:5cd4781e0c88 351
JMF 0:5cd4781e0c88 352 break;
JMF 0:5cd4781e0c88 353 case PUBCOMP:
JMF 0:5cd4781e0c88 354 break;
JMF 0:5cd4781e0c88 355 case PINGRESP:
JMF 0:5cd4781e0c88 356 ping_outstanding = false;
JMF 0:5cd4781e0c88 357 break;
JMF 0:5cd4781e0c88 358 }
JMF 0:5cd4781e0c88 359 keepalive();
JMF 0:5cd4781e0c88 360 exit:
JMF 0:5cd4781e0c88 361 return packet_type;
JMF 0:5cd4781e0c88 362 }
JMF 0:5cd4781e0c88 363
JMF 0:5cd4781e0c88 364
JMF 0:5cd4781e0c88 365 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive()
JMF 0:5cd4781e0c88 366 {
JMF 0:5cd4781e0c88 367 int rc = 0;
JMF 0:5cd4781e0c88 368
JMF 0:5cd4781e0c88 369 if (keepAliveInterval == 0)
JMF 0:5cd4781e0c88 370 goto exit;
JMF 0:5cd4781e0c88 371
JMF 0:5cd4781e0c88 372 if (ping_timer.expired())
JMF 0:5cd4781e0c88 373 {
JMF 0:5cd4781e0c88 374 if (ping_outstanding)
JMF 0:5cd4781e0c88 375 rc = -1;
JMF 0:5cd4781e0c88 376 else
JMF 0:5cd4781e0c88 377 {
JMF 0:5cd4781e0c88 378 int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE);
JMF 0:5cd4781e0c88 379 rc = sendPacket(len, 1000); // send the ping packet
JMF 0:5cd4781e0c88 380 if (rc != len)
JMF 0:5cd4781e0c88 381 rc = -1; // indicate there's a problem
JMF 0:5cd4781e0c88 382 else
JMF 0:5cd4781e0c88 383 ping_outstanding = true;
JMF 0:5cd4781e0c88 384 }
JMF 0:5cd4781e0c88 385 }
JMF 0:5cd4781e0c88 386
JMF 0:5cd4781e0c88 387 exit:
JMF 0:5cd4781e0c88 388 return rc;
JMF 0:5cd4781e0c88 389 }
JMF 0:5cd4781e0c88 390
JMF 0:5cd4781e0c88 391
JMF 0:5cd4781e0c88 392 template<class Network, class Timer, class Thread, class Mutex> void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument)
JMF 0:5cd4781e0c88 393 {
JMF 0:5cd4781e0c88 394 while (true)
JMF 0:5cd4781e0c88 395 cycle(ping_timer.left_ms());
JMF 0:5cd4781e0c88 396 }
JMF 0:5cd4781e0c88 397
JMF 0:5cd4781e0c88 398
JMF 0:5cd4781e0c88 399 // only used in single-threaded mode where one command at a time is in process
JMF 0:5cd4781e0c88 400 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
JMF 0:5cd4781e0c88 401 {
JMF 0:5cd4781e0c88 402 int rc = -1;
JMF 0:5cd4781e0c88 403
JMF 0:5cd4781e0c88 404 do
JMF 0:5cd4781e0c88 405 {
JMF 0:5cd4781e0c88 406 if (atimer.expired())
JMF 0:5cd4781e0c88 407 break; // we timed out
JMF 0:5cd4781e0c88 408 }
JMF 0:5cd4781e0c88 409 while ((rc = cycle(atimer.left_ms())) != packet_type);
JMF 0:5cd4781e0c88 410
JMF 0:5cd4781e0c88 411 return rc;
JMF 0:5cd4781e0c88 412 }
JMF 0:5cd4781e0c88 413
JMF 0:5cd4781e0c88 414
JMF 0:5cd4781e0c88 415 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options)
JMF 0:5cd4781e0c88 416 {
JMF 0:5cd4781e0c88 417 connect_timer.countdown(limits.command_timeout_ms);
JMF 0:5cd4781e0c88 418
JMF 0:5cd4781e0c88 419 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
JMF 0:5cd4781e0c88 420 if (options == 0)
JMF 0:5cd4781e0c88 421 options = &default_options; // set default options if none were supplied
JMF 0:5cd4781e0c88 422
JMF 0:5cd4781e0c88 423 this->keepAliveInterval = options->keepAliveInterval;
JMF 0:5cd4781e0c88 424 ping_timer.countdown(this->keepAliveInterval);
JMF 0:5cd4781e0c88 425 int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options);
JMF 0:5cd4781e0c88 426 int rc = sendPacket(len, connect_timer.left_ms()); // send the connect packet
JMF 0:5cd4781e0c88 427 if (rc != len)
JMF 0:5cd4781e0c88 428 goto exit; // there was a problem
JMF 0:5cd4781e0c88 429
JMF 0:5cd4781e0c88 430 if (resultHandler == 0) // wait until the connack is received
JMF 0:5cd4781e0c88 431 {
JMF 0:5cd4781e0c88 432 // this will be a blocking call, wait for the connack
JMF 0:5cd4781e0c88 433 if (waitfor(CONNACK, connect_timer) == CONNACK)
JMF 0:5cd4781e0c88 434 {
JMF 0:5cd4781e0c88 435 int connack_rc = -1;
JMF 0:5cd4781e0c88 436 if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
JMF 0:5cd4781e0c88 437 rc = connack_rc;
JMF 0:5cd4781e0c88 438 }
JMF 0:5cd4781e0c88 439 }
JMF 0:5cd4781e0c88 440 else
JMF 0:5cd4781e0c88 441 {
JMF 0:5cd4781e0c88 442 // set connect response callback function
JMF 0:5cd4781e0c88 443 connectHandler.attach(resultHandler);
JMF 0:5cd4781e0c88 444
JMF 0:5cd4781e0c88 445 // start background thread
JMF 0:5cd4781e0c88 446 this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
JMF 0:5cd4781e0c88 447 }
JMF 0:5cd4781e0c88 448
JMF 0:5cd4781e0c88 449 exit:
JMF 0:5cd4781e0c88 450 return rc;
JMF 0:5cd4781e0c88 451 }
JMF 0:5cd4781e0c88 452
JMF 0:5cd4781e0c88 453
JMF 0:5cd4781e0c88 454 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation()
JMF 0:5cd4781e0c88 455 {
JMF 0:5cd4781e0c88 456 int found = -1;
JMF 0:5cd4781e0c88 457 for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
JMF 0:5cd4781e0c88 458 {
JMF 0:5cd4781e0c88 459 if (operations[i].id == 0)
JMF 0:5cd4781e0c88 460 {
JMF 0:5cd4781e0c88 461 found = i;
JMF 0:5cd4781e0c88 462 break;
JMF 0:5cd4781e0c88 463 }
JMF 0:5cd4781e0c88 464 }
JMF 0:5cd4781e0c88 465 return found;
JMF 0:5cd4781e0c88 466 }
JMF 0:5cd4781e0c88 467
JMF 0:5cd4781e0c88 468
JMF 0:5cd4781e0c88 469 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
JMF 0:5cd4781e0c88 470 {
JMF 0:5cd4781e0c88 471 int index = 0;
JMF 0:5cd4781e0c88 472 if (this->thread)
JMF 0:5cd4781e0c88 473 index = findFreeOperation();
JMF 0:5cd4781e0c88 474 Timer& atimer = operations[index].timer;
JMF 0:5cd4781e0c88 475
JMF 0:5cd4781e0c88 476 atimer.countdown(limits.command_timeout_ms);
JMF 0:5cd4781e0c88 477 MQTTString topic = {(char*)topicFilter, 0, 0};
JMF 0:5cd4781e0c88 478
JMF 0:5cd4781e0c88 479 int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
JMF 0:5cd4781e0c88 480 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
JMF 0:5cd4781e0c88 481 if (rc != len)
JMF 0:5cd4781e0c88 482 goto exit; // there was a problem
JMF 0:5cd4781e0c88 483
JMF 0:5cd4781e0c88 484 /* wait for suback */
JMF 0:5cd4781e0c88 485 if (resultHandler == 0)
JMF 0:5cd4781e0c88 486 {
JMF 0:5cd4781e0c88 487 // this will block
JMF 0:5cd4781e0c88 488 if (waitfor(SUBACK, atimer) == SUBACK)
JMF 0:5cd4781e0c88 489 {
JMF 0:5cd4781e0c88 490 int count = 0, grantedQoS = -1, mypacketid;
JMF 0:5cd4781e0c88 491 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
JMF 0:5cd4781e0c88 492 rc = grantedQoS; // 0, 1, 2 or 0x80
JMF 0:5cd4781e0c88 493 if (rc != 0x80)
JMF 0:5cd4781e0c88 494 {
JMF 0:5cd4781e0c88 495 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
JMF 0:5cd4781e0c88 496 {
JMF 0:5cd4781e0c88 497 if (messageHandlers[i].topic == 0)
JMF 0:5cd4781e0c88 498 {
JMF 0:5cd4781e0c88 499 messageHandlers[i].topic = topicFilter;
JMF 0:5cd4781e0c88 500 messageHandlers[i].fp.attach(messageHandler);
JMF 0:5cd4781e0c88 501 rc = 0;
JMF 0:5cd4781e0c88 502 break;
JMF 0:5cd4781e0c88 503 }
JMF 0:5cd4781e0c88 504 }
JMF 0:5cd4781e0c88 505 }
JMF 0:5cd4781e0c88 506 }
JMF 0:5cd4781e0c88 507 }
JMF 0:5cd4781e0c88 508 else
JMF 0:5cd4781e0c88 509 {
JMF 0:5cd4781e0c88 510 // set subscribe response callback function
JMF 0:5cd4781e0c88 511
JMF 0:5cd4781e0c88 512 }
JMF 0:5cd4781e0c88 513
JMF 0:5cd4781e0c88 514 exit:
JMF 0:5cd4781e0c88 515 return rc;
JMF 0:5cd4781e0c88 516 }
JMF 0:5cd4781e0c88 517
JMF 0:5cd4781e0c88 518
JMF 0:5cd4781e0c88 519 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter)
JMF 0:5cd4781e0c88 520 {
JMF 0:5cd4781e0c88 521 int index = 0;
JMF 0:5cd4781e0c88 522 if (this->thread)
JMF 0:5cd4781e0c88 523 index = findFreeOperation();
JMF 0:5cd4781e0c88 524 Timer& atimer = operations[index].timer;
JMF 0:5cd4781e0c88 525
JMF 0:5cd4781e0c88 526 atimer.countdown(limits.command_timeout_ms);
JMF 0:5cd4781e0c88 527 MQTTString topic = {(char*)topicFilter, 0, 0};
JMF 0:5cd4781e0c88 528
JMF 0:5cd4781e0c88 529 int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
JMF 0:5cd4781e0c88 530 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
JMF 0:5cd4781e0c88 531 if (rc != len)
JMF 0:5cd4781e0c88 532 goto exit; // there was a problem
JMF 0:5cd4781e0c88 533
JMF 0:5cd4781e0c88 534 // set unsubscribe response callback function
JMF 0:5cd4781e0c88 535
JMF 0:5cd4781e0c88 536
JMF 0:5cd4781e0c88 537 exit:
JMF 0:5cd4781e0c88 538 return rc;
JMF 0:5cd4781e0c88 539 }
JMF 0:5cd4781e0c88 540
JMF 0:5cd4781e0c88 541
JMF 0:5cd4781e0c88 542
JMF 0:5cd4781e0c88 543 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message)
JMF 0:5cd4781e0c88 544 {
JMF 0:5cd4781e0c88 545 int index = 0;
JMF 0:5cd4781e0c88 546 if (this->thread)
JMF 0:5cd4781e0c88 547 index = findFreeOperation();
JMF 0:5cd4781e0c88 548 Timer& atimer = operations[index].timer;
JMF 0:5cd4781e0c88 549
JMF 0:5cd4781e0c88 550 atimer.countdown(limits.command_timeout_ms);
JMF 0:5cd4781e0c88 551 MQTTString topic = {(char*)topicName, 0, 0};
JMF 0:5cd4781e0c88 552
JMF 0:5cd4781e0c88 553 if (message->qos == QOS1 || message->qos == QOS2)
JMF 0:5cd4781e0c88 554 message->id = packetid.getNext();
JMF 0:5cd4781e0c88 555
JMF 0:5cd4781e0c88 556 int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen);
JMF 0:5cd4781e0c88 557 int rc = sendPacket(len, atimer.left_ms()); // send the subscribe packet
JMF 0:5cd4781e0c88 558 if (rc != len)
JMF 0:5cd4781e0c88 559 goto exit; // there was a problem
JMF 0:5cd4781e0c88 560
JMF 0:5cd4781e0c88 561 /* wait for acks */
JMF 0:5cd4781e0c88 562 if (resultHandler == 0)
JMF 0:5cd4781e0c88 563 {
JMF 0:5cd4781e0c88 564 if (message->qos == QOS1)
JMF 0:5cd4781e0c88 565 {
JMF 0:5cd4781e0c88 566 if (waitfor(PUBACK, atimer) == PUBACK)
JMF 0:5cd4781e0c88 567 {
JMF 0:5cd4781e0c88 568 int type, dup, mypacketid;
JMF 0:5cd4781e0c88 569 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
JMF 0:5cd4781e0c88 570 rc = 0;
JMF 0:5cd4781e0c88 571 }
JMF 0:5cd4781e0c88 572 }
JMF 0:5cd4781e0c88 573 else if (message->qos == QOS2)
JMF 0:5cd4781e0c88 574 {
JMF 0:5cd4781e0c88 575 if (waitfor(PUBCOMP, atimer) == PUBCOMP)
JMF 0:5cd4781e0c88 576 {
JMF 0:5cd4781e0c88 577 int type, dup, mypacketid;
JMF 0:5cd4781e0c88 578 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
JMF 0:5cd4781e0c88 579 rc = 0;
JMF 0:5cd4781e0c88 580 }
JMF 0:5cd4781e0c88 581
JMF 0:5cd4781e0c88 582 }
JMF 0:5cd4781e0c88 583 }
JMF 0:5cd4781e0c88 584 else
JMF 0:5cd4781e0c88 585 {
JMF 0:5cd4781e0c88 586 // set publish response callback function
JMF 0:5cd4781e0c88 587
JMF 0:5cd4781e0c88 588 }
JMF 0:5cd4781e0c88 589
JMF 0:5cd4781e0c88 590 exit:
JMF 0:5cd4781e0c88 591 return rc;
JMF 0:5cd4781e0c88 592 }
JMF 0:5cd4781e0c88 593
JMF 0:5cd4781e0c88 594
JMF 0:5cd4781e0c88 595 template<class Network, class Timer, class Thread, class Mutex> int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler)
JMF 0:5cd4781e0c88 596 {
JMF 0:5cd4781e0c88 597 Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete
JMF 0:5cd4781e0c88 598 int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
JMF 0:5cd4781e0c88 599 int rc = sendPacket(len, timer.left_ms()); // send the disconnect packet
JMF 0:5cd4781e0c88 600
JMF 0:5cd4781e0c88 601 return (rc == len) ? 0 : -1;
JMF 0:5cd4781e0c88 602 }
JMF 0:5cd4781e0c88 603
JMF 0:5cd4781e0c88 604
JMF 0:5cd4781e0c88 605
JMF 0:5cd4781e0c88 606 #endif