Better timing

Dependencies:   FP MQTTPacket

Fork of MQTT by MQTT

Committer:
icraggs
Date:
Wed Apr 30 13:03:45 2014 +0000
Revision:
22:aadb79d29330
Parent:
21:e918525e529d
Child:
23:05fc7de97d4a
Documentation updates

Who changed what in which revision?

UserRevisionLine numberNew contents of line
icraggs 6:4d312a49200b 1 /*******************************************************************************
icraggs 6:4d312a49200b 2 * Copyright (c) 2014 IBM Corp.
sam_grove 0:fe461e4d7afe 3 *
icraggs 6:4d312a49200b 4 * All rights reserved. This program and the accompanying materials
icraggs 6:4d312a49200b 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 6:4d312a49200b 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
sam_grove 0:fe461e4d7afe 7 *
icraggs 6:4d312a49200b 8 * The Eclipse Public License is available at
icraggs 6:4d312a49200b 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 6:4d312a49200b 10 * and the Eclipse Distribution License is available at
icraggs 6:4d312a49200b 11 * http://www.eclipse.org/org/documents/edl-v10.php.
sam_grove 0:fe461e4d7afe 12 *
icraggs 6:4d312a49200b 13 * Contributors:
icraggs 6:4d312a49200b 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 6:4d312a49200b 15 *******************************************************************************/
icraggs 21:e918525e529d 16
icraggs 21:e918525e529d 17 /*
icraggs 21:e918525e529d 18
icraggs 21:e918525e529d 19 TODO:
icraggs 21:e918525e529d 20
icraggs 21:e918525e529d 21 log messages - use macros
icraggs 21:e918525e529d 22 define return code constants
icraggs 21:e918525e529d 23
icraggs 21:e918525e529d 24 */
sam_grove 0:fe461e4d7afe 25
icraggs 2:dcfdd2abfe71 26 #if !defined(MQTTCLIENT_H)
icraggs 2:dcfdd2abfe71 27 #define MQTTCLIENT_H
icraggs 2:dcfdd2abfe71 28
icraggs 2:dcfdd2abfe71 29 #include "FP.h"
icraggs 3:dbff6b768d28 30 #include "MQTTPacket.h"
icraggs 6:4d312a49200b 31 #include "stdio.h"
icraggs 2:dcfdd2abfe71 32
icraggs 3:dbff6b768d28 33 namespace MQTT
icraggs 3:dbff6b768d28 34 {
icraggs 3:dbff6b768d28 35
icraggs 2:dcfdd2abfe71 36
icraggs 2:dcfdd2abfe71 37 enum QoS { QOS0, QOS1, QOS2 };
sam_grove 0:fe461e4d7afe 38
icraggs 2:dcfdd2abfe71 39
icraggs 3:dbff6b768d28 40 struct Message
icraggs 2:dcfdd2abfe71 41 {
icraggs 2:dcfdd2abfe71 42 enum QoS qos;
icraggs 2:dcfdd2abfe71 43 bool retained;
icraggs 2:dcfdd2abfe71 44 bool dup;
Ian Craggs 12:cc7f2d62a393 45 unsigned short id;
icraggs 2:dcfdd2abfe71 46 void *payload;
icraggs 2:dcfdd2abfe71 47 size_t payloadlen;
sam_grove 0:fe461e4d7afe 48 };
sam_grove 0:fe461e4d7afe 49
icraggs 4:4ef00243708e 50
icraggs 20:cad3d54d7ecf 51 struct MessageData
icraggs 20:cad3d54d7ecf 52 {
icraggs 20:cad3d54d7ecf 53 struct Message message;
icraggs 20:cad3d54d7ecf 54 char* topicName;
icraggs 20:cad3d54d7ecf 55 };
icraggs 20:cad3d54d7ecf 56
icraggs 20:cad3d54d7ecf 57
icraggs 9:01b8cc7d94cc 58 class PacketId
icraggs 9:01b8cc7d94cc 59 {
icraggs 9:01b8cc7d94cc 60 public:
icraggs 9:01b8cc7d94cc 61 PacketId();
icraggs 9:01b8cc7d94cc 62
icraggs 15:64a57183aa03 63 int getNext();
Ian Craggs 12:cc7f2d62a393 64
icraggs 9:01b8cc7d94cc 65 private:
icraggs 9:01b8cc7d94cc 66 static const int MAX_PACKET_ID = 65535;
icraggs 9:01b8cc7d94cc 67 int next;
icraggs 9:01b8cc7d94cc 68 };
icraggs 9:01b8cc7d94cc 69
icraggs 9:01b8cc7d94cc 70 typedef void (*messageHandler)(Message*);
icraggs 16:91c2f9a144d4 71
icraggs 16:91c2f9a144d4 72 typedef struct limits
icraggs 16:91c2f9a144d4 73 {
icraggs 20:cad3d54d7ecf 74 int MAX_MQTT_PACKET_SIZE; //
icraggs 20:cad3d54d7ecf 75 int MAX_MESSAGE_HANDLERS; // each subscription requires a message handler
icraggs 20:cad3d54d7ecf 76 long command_timeout_ms;
icraggs 20:cad3d54d7ecf 77
icraggs 20:cad3d54d7ecf 78 limits()
icraggs 20:cad3d54d7ecf 79 {
icraggs 20:cad3d54d7ecf 80 MAX_MQTT_PACKET_SIZE = 100;
icraggs 20:cad3d54d7ecf 81 MAX_MESSAGE_HANDLERS = 5;
icraggs 20:cad3d54d7ecf 82 command_timeout_ms = 30000;
icraggs 20:cad3d54d7ecf 83 }
icraggs 16:91c2f9a144d4 84 } Limits;
icraggs 2:dcfdd2abfe71 85
icraggs 16:91c2f9a144d4 86
icraggs 21:e918525e529d 87 /**
icraggs 21:e918525e529d 88 * @class Client
icraggs 22:aadb79d29330 89 * @brief blocking, non-threaded MQTT client API
icraggs 21:e918525e529d 90 * @param Network a network class which supports send, receive
icraggs 21:e918525e529d 91 * @param Timer a timer class with the methods:
icraggs 21:e918525e529d 92 */
icraggs 20:cad3d54d7ecf 93 template<class Network, class Timer> class Client
icraggs 2:dcfdd2abfe71 94 {
icraggs 2:dcfdd2abfe71 95
icraggs 22:aadb79d29330 96 public:
icraggs 15:64a57183aa03 97
icraggs 20:cad3d54d7ecf 98 /** Construct the client
icraggs 20:cad3d54d7ecf 99 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
icraggs 20:cad3d54d7ecf 100 * before calling MQTT connect
icraggs 20:cad3d54d7ecf 101 * @param limits an instance of the Limit class - to alter limits as required
icraggs 20:cad3d54d7ecf 102 */
icraggs 16:91c2f9a144d4 103 Client(Network* network, const Limits limits = Limits());
icraggs 20:cad3d54d7ecf 104
icraggs 20:cad3d54d7ecf 105 typedef struct
icraggs 20:cad3d54d7ecf 106 {
icraggs 20:cad3d54d7ecf 107 Client* client;
icraggs 20:cad3d54d7ecf 108 Network* network;
icraggs 20:cad3d54d7ecf 109 } connectionLostInfo;
icraggs 20:cad3d54d7ecf 110
icraggs 20:cad3d54d7ecf 111 typedef int (*connectionLostHandlers)(connectionLostInfo*);
icraggs 20:cad3d54d7ecf 112
icraggs 20:cad3d54d7ecf 113 /** Set the connection lost callback - called whenever the connection is lost and we should be connected
icraggs 20:cad3d54d7ecf 114 * @param clh - pointer to the callback function
icraggs 20:cad3d54d7ecf 115 */
icraggs 20:cad3d54d7ecf 116 void setConnectionLostHandler(connectionLostHandlers clh)
icraggs 20:cad3d54d7ecf 117 {
icraggs 20:cad3d54d7ecf 118 connectionLostHandler.attach(clh);
icraggs 20:cad3d54d7ecf 119 }
icraggs 20:cad3d54d7ecf 120
icraggs 20:cad3d54d7ecf 121 /** Set the default message handling callback - used for any message which does not match a subscription message handler
icraggs 20:cad3d54d7ecf 122 * @param mh - pointer to the callback function
icraggs 20:cad3d54d7ecf 123 */
icraggs 20:cad3d54d7ecf 124 void setDefaultMessageHandler(messageHandler mh)
icraggs 20:cad3d54d7ecf 125 {
icraggs 20:cad3d54d7ecf 126 defaultMessageHandler.attach(mh);
icraggs 20:cad3d54d7ecf 127 }
icraggs 2:dcfdd2abfe71 128
icraggs 20:cad3d54d7ecf 129 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
icraggs 20:cad3d54d7ecf 130 * The nework object must be connected to the network endpoint before calling this
icraggs 20:cad3d54d7ecf 131 * @param options - connect options
icraggs 20:cad3d54d7ecf 132 * @return success code -
icraggs 20:cad3d54d7ecf 133 */
icraggs 20:cad3d54d7ecf 134 int connect(MQTTPacket_connectData* options = 0);
icraggs 20:cad3d54d7ecf 135
icraggs 20:cad3d54d7ecf 136 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
icraggs 20:cad3d54d7ecf 137 * @param topic - the topic to publish to
icraggs 20:cad3d54d7ecf 138 * @param message - the message to send
icraggs 20:cad3d54d7ecf 139 * @return success code -
icraggs 20:cad3d54d7ecf 140 */
icraggs 20:cad3d54d7ecf 141 int publish(const char* topicName, Message* message);
icraggs 20:cad3d54d7ecf 142
icraggs 20:cad3d54d7ecf 143 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
icraggs 20:cad3d54d7ecf 144 * @param topicFilter - a topic pattern which can include wildcards
icraggs 20:cad3d54d7ecf 145 * @param qos - the MQTT QoS to subscribe at
icraggs 20:cad3d54d7ecf 146 * @param mh - the callback function to be invoked when a message is received for this subscription
icraggs 20:cad3d54d7ecf 147 * @return success code -
icraggs 20:cad3d54d7ecf 148 */
icraggs 20:cad3d54d7ecf 149 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
icraggs 9:01b8cc7d94cc 150
icraggs 20:cad3d54d7ecf 151 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
icraggs 20:cad3d54d7ecf 152 * @param topicFilter - a topic pattern which can include wildcards
icraggs 20:cad3d54d7ecf 153 * @return success code -
icraggs 20:cad3d54d7ecf 154 */
icraggs 20:cad3d54d7ecf 155 int unsubscribe(const char* topicFilter);
icraggs 20:cad3d54d7ecf 156
icraggs 20:cad3d54d7ecf 157 /** MQTT Disconnect - send an MQTT disconnect packet
icraggs 20:cad3d54d7ecf 158 * @return success code -
icraggs 20:cad3d54d7ecf 159 */
icraggs 20:cad3d54d7ecf 160 int disconnect();
icraggs 8:c46930bd6c82 161
icraggs 20:cad3d54d7ecf 162 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
icraggs 20:cad3d54d7ecf 163 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
icraggs 20:cad3d54d7ecf 164 * received.
icraggs 20:cad3d54d7ecf 165 */
icraggs 20:cad3d54d7ecf 166 void yield(int timeout);
icraggs 20:cad3d54d7ecf 167
icraggs 20:cad3d54d7ecf 168 private:
icraggs 20:cad3d54d7ecf 169
icraggs 20:cad3d54d7ecf 170 int cycle(Timer& timer);
icraggs 20:cad3d54d7ecf 171 int waitfor(int packet_type, Timer& timer);
icraggs 20:cad3d54d7ecf 172 int keepalive();
icraggs 2:dcfdd2abfe71 173
icraggs 3:dbff6b768d28 174 int decodePacket(int* value, int timeout);
icraggs 20:cad3d54d7ecf 175 int readPacket(Timer& timer);
icraggs 20:cad3d54d7ecf 176 int sendPacket(int length, Timer& timer);
icraggs 20:cad3d54d7ecf 177 int deliverMessage(MQTTString* topic, Message* message);
icraggs 3:dbff6b768d28 178
icraggs 4:4ef00243708e 179 Network* ipstack;
icraggs 16:91c2f9a144d4 180
icraggs 16:91c2f9a144d4 181 Limits limits;
icraggs 4:4ef00243708e 182
icraggs 16:91c2f9a144d4 183 char* buf;
icraggs 4:4ef00243708e 184 char* readbuf;
icraggs 15:64a57183aa03 185
icraggs 20:cad3d54d7ecf 186 Timer ping_timer;
icraggs 15:64a57183aa03 187 unsigned int keepAliveInterval;
icraggs 20:cad3d54d7ecf 188 bool ping_outstanding;
icraggs 4:4ef00243708e 189
icraggs 9:01b8cc7d94cc 190 PacketId packetid;
icraggs 9:01b8cc7d94cc 191
icraggs 9:01b8cc7d94cc 192 typedef FP<void, Message*> messageHandlerFP;
icraggs 16:91c2f9a144d4 193 struct MessageHandlers
icraggs 15:64a57183aa03 194 {
icraggs 20:cad3d54d7ecf 195 const char* topic;
icraggs 20:cad3d54d7ecf 196 messageHandlerFP fp;
icraggs 16:91c2f9a144d4 197 } *messageHandlers; // Message handlers are indexed by subscription topic
icraggs 15:64a57183aa03 198
icraggs 20:cad3d54d7ecf 199 messageHandlerFP defaultMessageHandler;
icraggs 20:cad3d54d7ecf 200
icraggs 20:cad3d54d7ecf 201 typedef FP<int, connectionLostInfo*> connectionLostFP;
icraggs 20:cad3d54d7ecf 202
icraggs 20:cad3d54d7ecf 203 connectionLostFP connectionLostHandler;
icraggs 11:db15da110a37 204
sam_grove 0:fe461e4d7afe 205 };
sam_grove 0:fe461e4d7afe 206
icraggs 15:64a57183aa03 207 }
icraggs 15:64a57183aa03 208
icraggs 15:64a57183aa03 209
icraggs 20:cad3d54d7ecf 210 template<class Network, class Timer> MQTT::Client<Network, Timer>::Client(Network* network, Limits limits) : limits(limits), packetid()
icraggs 15:64a57183aa03 211 {
icraggs 20:cad3d54d7ecf 212 this->ipstack = network;
icraggs 20:cad3d54d7ecf 213 this->ping_timer = Timer();
icraggs 20:cad3d54d7ecf 214 this->ping_outstanding = 0;
icraggs 20:cad3d54d7ecf 215
icraggs 20:cad3d54d7ecf 216 // How to make these memory allocations portable? I was hoping to avoid the heap
icraggs 20:cad3d54d7ecf 217 buf = new char[limits.MAX_MQTT_PACKET_SIZE];
icraggs 20:cad3d54d7ecf 218 readbuf = new char[limits.MAX_MQTT_PACKET_SIZE];
icraggs 20:cad3d54d7ecf 219 this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS];
icraggs 20:cad3d54d7ecf 220 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
icraggs 20:cad3d54d7ecf 221 messageHandlers[i].topic = 0;
icraggs 11:db15da110a37 222 }
icraggs 11:db15da110a37 223
icraggs 11:db15da110a37 224
icraggs 20:cad3d54d7ecf 225 template<class Network, class Timer> int MQTT::Client<Network, Timer>::sendPacket(int length, Timer& timer)
icraggs 8:c46930bd6c82 226 {
icraggs 8:c46930bd6c82 227 int sent = 0;
icraggs 8:c46930bd6c82 228
icraggs 8:c46930bd6c82 229 while (sent < length)
icraggs 20:cad3d54d7ecf 230 sent += ipstack->write(&buf[sent], length, timer.left_ms());
icraggs 20:cad3d54d7ecf 231 if (sent == length)
icraggs 20:cad3d54d7ecf 232 ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
icraggs 8:c46930bd6c82 233 return sent;
icraggs 8:c46930bd6c82 234 }
icraggs 8:c46930bd6c82 235
icraggs 8:c46930bd6c82 236
icraggs 20:cad3d54d7ecf 237 template<class Network, class Timer> int MQTT::Client<Network, Timer>::decodePacket(int* value, int timeout)
icraggs 8:c46930bd6c82 238 {
icraggs 8:c46930bd6c82 239 char c;
icraggs 8:c46930bd6c82 240 int multiplier = 1;
icraggs 8:c46930bd6c82 241 int len = 0;
icraggs 20:cad3d54d7ecf 242 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
icraggs 8:c46930bd6c82 243
icraggs 8:c46930bd6c82 244 *value = 0;
icraggs 8:c46930bd6c82 245 do
icraggs 8:c46930bd6c82 246 {
icraggs 8:c46930bd6c82 247 int rc = MQTTPACKET_READ_ERROR;
icraggs 8:c46930bd6c82 248
icraggs 8:c46930bd6c82 249 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 8:c46930bd6c82 250 {
icraggs 8:c46930bd6c82 251 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 8:c46930bd6c82 252 goto exit;
icraggs 8:c46930bd6c82 253 }
icraggs 8:c46930bd6c82 254 rc = ipstack->read(&c, 1, timeout);
icraggs 8:c46930bd6c82 255 if (rc != 1)
icraggs 8:c46930bd6c82 256 goto exit;
icraggs 8:c46930bd6c82 257 *value += (c & 127) * multiplier;
icraggs 8:c46930bd6c82 258 multiplier *= 128;
icraggs 8:c46930bd6c82 259 } while ((c & 128) != 0);
icraggs 8:c46930bd6c82 260 exit:
icraggs 8:c46930bd6c82 261 return len;
icraggs 8:c46930bd6c82 262 }
icraggs 8:c46930bd6c82 263
icraggs 8:c46930bd6c82 264
icraggs 8:c46930bd6c82 265 /**
icraggs 8:c46930bd6c82 266 * If any read fails in this method, then we should disconnect from the network, as on reconnect
icraggs 8:c46930bd6c82 267 * the packets can be retried.
icraggs 8:c46930bd6c82 268 * @param timeout the max time to wait for the packet read to complete, in milliseconds
icraggs 8:c46930bd6c82 269 * @return the MQTT packet type, or -1 if none
icraggs 8:c46930bd6c82 270 */
icraggs 20:cad3d54d7ecf 271 template<class Network, class Timer> int MQTT::Client<Network, Timer>::readPacket(Timer& timer)
icraggs 8:c46930bd6c82 272 {
icraggs 8:c46930bd6c82 273 int rc = -1;
icraggs 8:c46930bd6c82 274 MQTTHeader header = {0};
icraggs 8:c46930bd6c82 275 int len = 0;
icraggs 8:c46930bd6c82 276 int rem_len = 0;
icraggs 8:c46930bd6c82 277
icraggs 8:c46930bd6c82 278 /* 1. read the header byte. This has the packet type in it */
icraggs 20:cad3d54d7ecf 279 if (ipstack->read(readbuf, 1, timer.left_ms()) != 1)
icraggs 8:c46930bd6c82 280 goto exit;
icraggs 8:c46930bd6c82 281
icraggs 8:c46930bd6c82 282 len = 1;
icraggs 8:c46930bd6c82 283 /* 2. read the remaining length. This is variable in itself */
icraggs 20:cad3d54d7ecf 284 decodePacket(&rem_len, timer.left_ms());
icraggs 8:c46930bd6c82 285 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
icraggs 8:c46930bd6c82 286
icraggs 8:c46930bd6c82 287 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 20:cad3d54d7ecf 288 if (ipstack->read(readbuf + len, rem_len, timer.left_ms()) != rem_len)
icraggs 8:c46930bd6c82 289 goto exit;
icraggs 8:c46930bd6c82 290
icraggs 8:c46930bd6c82 291 header.byte = readbuf[0];
icraggs 8:c46930bd6c82 292 rc = header.bits.type;
icraggs 8:c46930bd6c82 293 exit:
icraggs 8:c46930bd6c82 294 return rc;
icraggs 3:dbff6b768d28 295 }
icraggs 3:dbff6b768d28 296
icraggs 8:c46930bd6c82 297
icraggs 20:cad3d54d7ecf 298 template<class Network, class Timer> int MQTT::Client<Network, Timer>::deliverMessage(MQTTString* topic, Message* message)
icraggs 15:64a57183aa03 299 {
icraggs 20:cad3d54d7ecf 300 int rc = -1;
icraggs 20:cad3d54d7ecf 301
icraggs 20:cad3d54d7ecf 302 // we have to find the right message handler - indexed by topic
icraggs 20:cad3d54d7ecf 303 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
icraggs 20:cad3d54d7ecf 304 {
icraggs 20:cad3d54d7ecf 305 if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
icraggs 20:cad3d54d7ecf 306 {
icraggs 20:cad3d54d7ecf 307 messageHandlers[i].fp(message);
icraggs 20:cad3d54d7ecf 308 rc = 0;
icraggs 20:cad3d54d7ecf 309 break;
icraggs 20:cad3d54d7ecf 310 }
icraggs 20:cad3d54d7ecf 311 }
icraggs 20:cad3d54d7ecf 312 if (rc == -1)
icraggs 20:cad3d54d7ecf 313 defaultMessageHandler(message);
icraggs 20:cad3d54d7ecf 314
icraggs 20:cad3d54d7ecf 315 return rc;
icraggs 15:64a57183aa03 316 }
icraggs 15:64a57183aa03 317
icraggs 15:64a57183aa03 318
icraggs 20:cad3d54d7ecf 319
icraggs 20:cad3d54d7ecf 320 template<class Network, class Timer> void MQTT::Client<Network, Timer>::yield(int timeout)
icraggs 20:cad3d54d7ecf 321 {
icraggs 20:cad3d54d7ecf 322 Timer timer = Timer();
icraggs 20:cad3d54d7ecf 323
icraggs 20:cad3d54d7ecf 324 timer.countdown_ms(timeout);
icraggs 20:cad3d54d7ecf 325 while (!timer.expired())
icraggs 20:cad3d54d7ecf 326 cycle(timer);
icraggs 20:cad3d54d7ecf 327 }
icraggs 20:cad3d54d7ecf 328
icraggs 20:cad3d54d7ecf 329
icraggs 20:cad3d54d7ecf 330 template<class Network, class Timer> int MQTT::Client<Network, Timer>::cycle(Timer& timer)
icraggs 8:c46930bd6c82 331 {
icraggs 8:c46930bd6c82 332 /* get one piece of work off the wire and one pass through */
icraggs 20:cad3d54d7ecf 333
Ian Craggs 12:cc7f2d62a393 334 // read the socket, see what work is due
icraggs 20:cad3d54d7ecf 335 int packet_type = readPacket(timer);
icraggs 15:64a57183aa03 336
icraggs 20:cad3d54d7ecf 337 int len, rc;
icraggs 8:c46930bd6c82 338 switch (packet_type)
icraggs 8:c46930bd6c82 339 {
icraggs 15:64a57183aa03 340 case CONNACK:
icraggs 8:c46930bd6c82 341 case PUBACK:
icraggs 8:c46930bd6c82 342 case SUBACK:
icraggs 8:c46930bd6c82 343 break;
icraggs 15:64a57183aa03 344 case PUBLISH:
icraggs 20:cad3d54d7ecf 345 MQTTString topicName;
icraggs 20:cad3d54d7ecf 346 Message msg;
icraggs 20:cad3d54d7ecf 347 rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
icraggs 20:cad3d54d7ecf 348 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);;
icraggs 20:cad3d54d7ecf 349 deliverMessage(&topicName, &msg);
icraggs 20:cad3d54d7ecf 350 if (msg.qos != QOS0)
icraggs 20:cad3d54d7ecf 351 {
icraggs 20:cad3d54d7ecf 352 if (msg.qos == QOS1)
icraggs 20:cad3d54d7ecf 353 len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
icraggs 20:cad3d54d7ecf 354 else if (msg.qos == QOS2)
icraggs 20:cad3d54d7ecf 355 len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
icraggs 20:cad3d54d7ecf 356 rc = sendPacket(len, timer);
icraggs 20:cad3d54d7ecf 357 if (rc != len)
icraggs 20:cad3d54d7ecf 358 goto exit; // there was a problem
icraggs 20:cad3d54d7ecf 359 }
Ian Craggs 12:cc7f2d62a393 360 break;
icraggs 15:64a57183aa03 361 case PUBREC:
icraggs 20:cad3d54d7ecf 362 int type, dup, mypacketid;
icraggs 20:cad3d54d7ecf 363 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 364 ;
icraggs 20:cad3d54d7ecf 365 len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
icraggs 20:cad3d54d7ecf 366 rc = sendPacket(len, timer); // send the PUBREL packet
icraggs 20:cad3d54d7ecf 367 if (rc != len)
icraggs 20:cad3d54d7ecf 368 goto exit; // there was a problem
Ian Craggs 12:cc7f2d62a393 369
icraggs 8:c46930bd6c82 370 break;
icraggs 8:c46930bd6c82 371 case PUBCOMP:
icraggs 8:c46930bd6c82 372 break;
icraggs 15:64a57183aa03 373 case PINGRESP:
icraggs 20:cad3d54d7ecf 374 ping_outstanding = false;
icraggs 8:c46930bd6c82 375 break;
icraggs 15:64a57183aa03 376 }
icraggs 20:cad3d54d7ecf 377 keepalive();
Ian Craggs 12:cc7f2d62a393 378 exit:
icraggs 8:c46930bd6c82 379 return packet_type;
icraggs 15:64a57183aa03 380 }
icraggs 15:64a57183aa03 381
icraggs 15:64a57183aa03 382
icraggs 20:cad3d54d7ecf 383 template<class Network, class Timer> int MQTT::Client<Network, Timer>::keepalive()
icraggs 15:64a57183aa03 384 {
icraggs 20:cad3d54d7ecf 385 int rc = 0;
icraggs 15:64a57183aa03 386
icraggs 20:cad3d54d7ecf 387 if (keepAliveInterval == 0)
icraggs 20:cad3d54d7ecf 388 goto exit;
icraggs 15:64a57183aa03 389
icraggs 20:cad3d54d7ecf 390 if (ping_timer.expired())
icraggs 20:cad3d54d7ecf 391 {
icraggs 20:cad3d54d7ecf 392 if (ping_outstanding)
icraggs 20:cad3d54d7ecf 393 rc = -1;
icraggs 20:cad3d54d7ecf 394 else
icraggs 20:cad3d54d7ecf 395 {
icraggs 20:cad3d54d7ecf 396 Timer timer = Timer(1000);
icraggs 20:cad3d54d7ecf 397 int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE);
icraggs 20:cad3d54d7ecf 398 rc = sendPacket(len, timer); // send the ping packet
icraggs 20:cad3d54d7ecf 399 if (rc != len)
icraggs 20:cad3d54d7ecf 400 rc = -1; // indicate there's a problem
icraggs 20:cad3d54d7ecf 401 else
icraggs 20:cad3d54d7ecf 402 ping_outstanding = true;
icraggs 20:cad3d54d7ecf 403 }
icraggs 20:cad3d54d7ecf 404 }
icraggs 15:64a57183aa03 405
icraggs 15:64a57183aa03 406 exit:
icraggs 20:cad3d54d7ecf 407 return rc;
icraggs 8:c46930bd6c82 408 }
icraggs 8:c46930bd6c82 409
icraggs 8:c46930bd6c82 410
icraggs 16:91c2f9a144d4 411 // only used in single-threaded mode where one command at a time is in process
icraggs 20:cad3d54d7ecf 412 template<class Network, class Timer> int MQTT::Client<Network, Timer>::waitfor(int packet_type, Timer& timer)
icraggs 15:64a57183aa03 413 {
icraggs 20:cad3d54d7ecf 414 int rc = -1;
icraggs 20:cad3d54d7ecf 415
icraggs 20:cad3d54d7ecf 416 do
icraggs 16:91c2f9a144d4 417 {
icraggs 20:cad3d54d7ecf 418 if (timer.expired())
icraggs 20:cad3d54d7ecf 419 break; // we timed out
icraggs 20:cad3d54d7ecf 420 }
icraggs 20:cad3d54d7ecf 421 while ((rc = cycle(timer)) != packet_type);
icraggs 20:cad3d54d7ecf 422
icraggs 20:cad3d54d7ecf 423 return rc;
icraggs 16:91c2f9a144d4 424 }
icraggs 16:91c2f9a144d4 425
icraggs 16:91c2f9a144d4 426
icraggs 20:cad3d54d7ecf 427 template<class Network, class Timer> int MQTT::Client<Network, Timer>::connect(MQTTPacket_connectData* options)
icraggs 16:91c2f9a144d4 428 {
icraggs 20:cad3d54d7ecf 429 Timer connect_timer = Timer(limits.command_timeout_ms);
icraggs 8:c46930bd6c82 430
Ian Craggs 12:cc7f2d62a393 431 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
icraggs 8:c46930bd6c82 432 if (options == 0)
Ian Craggs 12:cc7f2d62a393 433 options = &default_options; // set default options if none were supplied
icraggs 8:c46930bd6c82 434
icraggs 15:64a57183aa03 435 this->keepAliveInterval = options->keepAliveInterval;
icraggs 20:cad3d54d7ecf 436 ping_timer.countdown(this->keepAliveInterval);
icraggs 16:91c2f9a144d4 437 int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options);
icraggs 20:cad3d54d7ecf 438 int rc = sendPacket(len, connect_timer); // send the connect packet
icraggs 20:cad3d54d7ecf 439 if (rc != len)
icraggs 20:cad3d54d7ecf 440 goto exit; // there was a problem
icraggs 8:c46930bd6c82 441
icraggs 20:cad3d54d7ecf 442 // this will be a blocking call, wait for the connack
icraggs 20:cad3d54d7ecf 443 if (waitfor(CONNACK, connect_timer) == CONNACK)
icraggs 15:64a57183aa03 444 {
icraggs 20:cad3d54d7ecf 445 int connack_rc = -1;
icraggs 20:cad3d54d7ecf 446 if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 447 rc = connack_rc;
icraggs 8:c46930bd6c82 448 }
icraggs 15:64a57183aa03 449
icraggs 15:64a57183aa03 450 exit:
icraggs 8:c46930bd6c82 451 return rc;
icraggs 8:c46930bd6c82 452 }
icraggs 8:c46930bd6c82 453
icraggs 8:c46930bd6c82 454
icraggs 20:cad3d54d7ecf 455 template<class Network, class Timer> int MQTT::Client<Network, Timer>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
icraggs 20:cad3d54d7ecf 456 {
icraggs 20:cad3d54d7ecf 457 int len = -1;
icraggs 20:cad3d54d7ecf 458 Timer timer = Timer(limits.command_timeout_ms);
icraggs 20:cad3d54d7ecf 459
icraggs 8:c46930bd6c82 460 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 8:c46930bd6c82 461
icraggs 20:cad3d54d7ecf 462 int rc = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
icraggs 20:cad3d54d7ecf 463 if (rc <= 0)
icraggs 20:cad3d54d7ecf 464 goto exit;
icraggs 20:cad3d54d7ecf 465 len = rc;
icraggs 20:cad3d54d7ecf 466 if ((rc = sendPacket(len, timer)) != len) // send the subscribe packet
icraggs 20:cad3d54d7ecf 467 goto exit; // there was a problem
icraggs 8:c46930bd6c82 468
icraggs 20:cad3d54d7ecf 469 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback
icraggs 8:c46930bd6c82 470 {
icraggs 20:cad3d54d7ecf 471 int count = 0, grantedQoS = -1, mypacketid;
icraggs 20:cad3d54d7ecf 472 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 473 rc = grantedQoS; // 0, 1, 2 or 0x80
icraggs 20:cad3d54d7ecf 474 if (rc != 0x80)
icraggs 8:c46930bd6c82 475 {
icraggs 20:cad3d54d7ecf 476 for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
icraggs 16:91c2f9a144d4 477 {
icraggs 20:cad3d54d7ecf 478 if (messageHandlers[i].topic == 0)
icraggs 20:cad3d54d7ecf 479 {
icraggs 20:cad3d54d7ecf 480 messageHandlers[i].topic = topicFilter;
icraggs 20:cad3d54d7ecf 481 messageHandlers[i].fp.attach(messageHandler);
icraggs 20:cad3d54d7ecf 482 rc = 0;
icraggs 20:cad3d54d7ecf 483 break;
icraggs 20:cad3d54d7ecf 484 }
icraggs 16:91c2f9a144d4 485 }
icraggs 8:c46930bd6c82 486 }
icraggs 8:c46930bd6c82 487 }
icraggs 15:64a57183aa03 488
icraggs 15:64a57183aa03 489 exit:
Ian Craggs 12:cc7f2d62a393 490 return rc;
icraggs 15:64a57183aa03 491 }
icraggs 15:64a57183aa03 492
icraggs 15:64a57183aa03 493
icraggs 20:cad3d54d7ecf 494 template<class Network, class Timer> int MQTT::Client<Network, Timer>::unsubscribe(const char* topicFilter)
icraggs 20:cad3d54d7ecf 495 {
icraggs 20:cad3d54d7ecf 496 int len = -1;
icraggs 20:cad3d54d7ecf 497 Timer timer = Timer(limits.command_timeout_ms);
icraggs 20:cad3d54d7ecf 498
Ian Craggs 12:cc7f2d62a393 499 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 8:c46930bd6c82 500
icraggs 20:cad3d54d7ecf 501 int rc = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
icraggs 20:cad3d54d7ecf 502 if (rc <= 0)
icraggs 20:cad3d54d7ecf 503 goto exit;
icraggs 20:cad3d54d7ecf 504 len = rc;
icraggs 20:cad3d54d7ecf 505 if ((rc = sendPacket(len, timer)) != len) // send the subscribe packet
icraggs 20:cad3d54d7ecf 506 goto exit; // there was a problem
Ian Craggs 12:cc7f2d62a393 507
icraggs 20:cad3d54d7ecf 508 if (waitfor(UNSUBACK, timer) == UNSUBACK)
Ian Craggs 12:cc7f2d62a393 509 {
icraggs 20:cad3d54d7ecf 510 int mypacketid; // should be the same as the packetid above
icraggs 20:cad3d54d7ecf 511 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 512 rc = 0;
Ian Craggs 12:cc7f2d62a393 513 }
icraggs 15:64a57183aa03 514
icraggs 15:64a57183aa03 515 exit:
Ian Craggs 12:cc7f2d62a393 516 return rc;
icraggs 15:64a57183aa03 517 }
icraggs 15:64a57183aa03 518
icraggs 15:64a57183aa03 519
icraggs 15:64a57183aa03 520
icraggs 20:cad3d54d7ecf 521 template<class Network, class Timer> int MQTT::Client<Network, Timer>::publish(const char* topicName, Message* message)
icraggs 20:cad3d54d7ecf 522 {
icraggs 20:cad3d54d7ecf 523 Timer timer = Timer(limits.command_timeout_ms);
icraggs 20:cad3d54d7ecf 524
icraggs 20:cad3d54d7ecf 525 MQTTString topicString = {(char*)topicName, 0, 0};
icraggs 15:64a57183aa03 526
icraggs 20:cad3d54d7ecf 527 if (message->qos == QOS1 || message->qos == QOS2)
icraggs 20:cad3d54d7ecf 528 message->id = packetid.getNext();
icraggs 15:64a57183aa03 529
icraggs 20:cad3d54d7ecf 530 int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id,
icraggs 20:cad3d54d7ecf 531 topicString, (char*)message->payload, message->payloadlen);
icraggs 20:cad3d54d7ecf 532 int rc = sendPacket(len, timer); // send the subscribe packet
icraggs 20:cad3d54d7ecf 533 if (rc != len)
icraggs 20:cad3d54d7ecf 534 goto exit; // there was a problem
Ian Craggs 12:cc7f2d62a393 535
icraggs 20:cad3d54d7ecf 536 if (message->qos == QOS1)
Ian Craggs 12:cc7f2d62a393 537 {
icraggs 20:cad3d54d7ecf 538 if (waitfor(PUBACK, timer) == PUBACK)
icraggs 20:cad3d54d7ecf 539 {
icraggs 20:cad3d54d7ecf 540 int type, dup, mypacketid;
icraggs 20:cad3d54d7ecf 541 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 542 rc = 0;
icraggs 20:cad3d54d7ecf 543 }
Ian Craggs 12:cc7f2d62a393 544 }
icraggs 20:cad3d54d7ecf 545 else if (message->qos == QOS2)
Ian Craggs 12:cc7f2d62a393 546 {
icraggs 20:cad3d54d7ecf 547 if (waitfor(PUBCOMP, timer) == PUBCOMP)
icraggs 20:cad3d54d7ecf 548 {
icraggs 20:cad3d54d7ecf 549 int type, dup, mypacketid;
icraggs 20:cad3d54d7ecf 550 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 551 rc = 0;
icraggs 20:cad3d54d7ecf 552 }
Ian Craggs 12:cc7f2d62a393 553 }
icraggs 15:64a57183aa03 554
icraggs 15:64a57183aa03 555 exit:
icraggs 8:c46930bd6c82 556 return rc;
icraggs 8:c46930bd6c82 557 }
icraggs 8:c46930bd6c82 558
icraggs 8:c46930bd6c82 559
icraggs 20:cad3d54d7ecf 560 template<class Network, class Timer> int MQTT::Client<Network, Timer>::disconnect()
icraggs 20:cad3d54d7ecf 561 {
icraggs 20:cad3d54d7ecf 562 Timer timer = Timer(limits.command_timeout_ms); // we might wait for incomplete incoming publishes to complete
icraggs 20:cad3d54d7ecf 563 int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
icraggs 20:cad3d54d7ecf 564 int rc = sendPacket(len, timer); // send the disconnect packet
icraggs 20:cad3d54d7ecf 565
icraggs 21:e918525e529d 566 return (rc == len) ? 0 : -1;
icraggs 20:cad3d54d7ecf 567 }
icraggs 20:cad3d54d7ecf 568
icraggs 20:cad3d54d7ecf 569
icraggs 20:cad3d54d7ecf 570 #endif