mqtt - potistiriProject

Dependencies:   MQTTPacket FP

Committer:
sam_grove
Date:
Fri May 09 23:01:35 2014 +0000
Revision:
25:d13a6c558164
Parent:
23:05fc7de97d4a
Child:
28:8b2abe9bd814
Child:
29:833386b16f3e
Simplify example by moving IPStack into this library for each transport. Working on FP usage to not need the FP.cpp inclusion in main

Who changed what in which revision?

User | Revision | Line number | New contents of line
icraggs 6:4d312a49200b 1 /*******************************************************************************
icraggs 6:4d312a49200b 2 * Copyright (c) 2014 IBM Corp.
sam_grove 0:fe461e4d7afe 3 *
icraggs 6:4d312a49200b 4 * All rights reserved. This program and the accompanying materials
icraggs 6:4d312a49200b 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 6:4d312a49200b 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
sam_grove 0:fe461e4d7afe 7 *
icraggs 6:4d312a49200b 8 * The Eclipse Public License is available at
icraggs 6:4d312a49200b 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 6:4d312a49200b 10 * and the Eclipse Distribution License is available at
icraggs 6:4d312a49200b 11 * http://www.eclipse.org/org/documents/edl-v10.php.
sam_grove 0:fe461e4d7afe 12 *
icraggs 6:4d312a49200b 13 * Contributors:
icraggs 6:4d312a49200b 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 6:4d312a49200b 15 *******************************************************************************/
icraggs 21:e918525e529d 16
icraggs 21:e918525e529d 17 /*
icraggs 21:e918525e529d 18
icraggs 21:e918525e529d 19 TODO:
icraggs 21:e918525e529d 20
icraggs 21:e918525e529d 21 log messages - use macros
icraggs 23:05fc7de97d4a 22
icraggs 21:e918525e529d 23 define return code constants
icraggs 21:e918525e529d 24
icraggs 23:05fc7de97d4a 25 call connectionLost at appropriate points - in sendPacket and readPacket
icraggs 23:05fc7de97d4a 26
icraggs 23:05fc7de97d4a 27 match wildcard topics
icraggs 23:05fc7de97d4a 28
sam_grove 25:d13a6c558164 29 updating usage of FP. Try to remove inclusion of FP.cpp in main. sg-
sam_grove 25:d13a6c558164 30
icraggs 21:e918525e529d 31 */
sam_grove 0:fe461e4d7afe 32
icraggs 2:dcfdd2abfe71 33 #if !defined(MQTTCLIENT_H)
icraggs 2:dcfdd2abfe71 34 #define MQTTCLIENT_H
icraggs 2:dcfdd2abfe71 35
icraggs 2:dcfdd2abfe71 36 #include "FP.h"
icraggs 3:dbff6b768d28 37 #include "MQTTPacket.h"
icraggs 6:4d312a49200b 38 #include "stdio.h"
icraggs 2:dcfdd2abfe71 39
icraggs 3:dbff6b768d28 40 namespace MQTT
icraggs 3:dbff6b768d28 41 {
icraggs 3:dbff6b768d28 42
icraggs 2:dcfdd2abfe71 43
/** MQTT quality-of-service levels; the enumerators have the values 0, 1 and 2. */
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
sam_grove 0:fe461e4d7afe 45
/** Return codes used by the client: zero is success, negative values are failures. */
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE = -1,
    SUCCESS = 0
};
icraggs 23:05fc7de97d4a 47
icraggs 2:dcfdd2abfe71 48
icraggs 3:dbff6b768d28 49 struct Message
icraggs 2:dcfdd2abfe71 50 {
icraggs 2:dcfdd2abfe71 51 enum QoS qos;
icraggs 2:dcfdd2abfe71 52 bool retained;
icraggs 2:dcfdd2abfe71 53 bool dup;
Ian Craggs 12:cc7f2d62a393 54 unsigned short id;
icraggs 2:dcfdd2abfe71 55 void *payload;
icraggs 2:dcfdd2abfe71 56 size_t payloadlen;
sam_grove 0:fe461e4d7afe 57 };
sam_grove 0:fe461e4d7afe 58
icraggs 4:4ef00243708e 59
icraggs 20:cad3d54d7ecf 60 struct MessageData
icraggs 20:cad3d54d7ecf 61 {
icraggs 20:cad3d54d7ecf 62 struct Message message;
icraggs 20:cad3d54d7ecf 63 char* topicName;
icraggs 20:cad3d54d7ecf 64 };
icraggs 20:cad3d54d7ecf 65
icraggs 20:cad3d54d7ecf 66
/**
 * Generator of MQTT packet identifiers.
 *
 * Identifiers run from 1 to MAX_PACKET_ID and then wrap back to 1; the value 0 is never
 * handed out.
 */
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /** Advance to the next packet id and return it.
     *  @return the new id, in the range 1..MAX_PACKET_ID
     */
    int getNext()
    {
        // A single assignment: the previous form (next = cond ? 1 : ++next) modified
        // 'next' twice in one expression, which is undefined before C++11.
        next = (next == MAX_PACKET_ID) ? 1 : next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;    // most recently issued id; 0 until the first call to getNext()
};
icraggs 2:dcfdd2abfe71 84
icraggs 16:91c2f9a144d4 85
icraggs 21:e918525e529d 86 /**
icraggs 21:e918525e529d 87 * @class Client
icraggs 22:aadb79d29330 88 * @brief blocking, non-threaded MQTT client API
icraggs 23:05fc7de97d4a 89 *
icraggs 23:05fc7de97d4a 90 * This version of the API blocks on all method calls, until they are complete. This means that only one
icraggs 23:05fc7de97d4a 91 * MQTT request can be in process at any one time.
icraggs 21:e918525e529d 92 * @param Network a network class which supports send, receive
icraggs 21:e918525e529d 93 * @param Timer a timer class with the methods:
icraggs 21:e918525e529d 94 */
icraggs 23:05fc7de97d4a 95 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5> class Client
icraggs 2:dcfdd2abfe71 96 {
icraggs 2:dcfdd2abfe71 97
icraggs 22:aadb79d29330 98 public:
icraggs 15:64a57183aa03 99
icraggs 20:cad3d54d7ecf 100 typedef struct
icraggs 20:cad3d54d7ecf 101 {
icraggs 20:cad3d54d7ecf 102 Client* client;
icraggs 20:cad3d54d7ecf 103 Network* network;
icraggs 20:cad3d54d7ecf 104 } connectionLostInfo;
icraggs 20:cad3d54d7ecf 105
icraggs 20:cad3d54d7ecf 106 typedef int (*connectionLostHandlers)(connectionLostInfo*);
icraggs 23:05fc7de97d4a 107 typedef void (*messageHandler)(Message*);
icraggs 23:05fc7de97d4a 108
icraggs 23:05fc7de97d4a 109 /** Construct the client
icraggs 23:05fc7de97d4a 110 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
icraggs 23:05fc7de97d4a 111 * before calling MQTT connect
icraggs 23:05fc7de97d4a 112 * @param limits an instance of the Limit class - to alter limits as required
icraggs 23:05fc7de97d4a 113 */
icraggs 23:05fc7de97d4a 114 Client(Network& network, unsigned int command_timeout_ms = 30000);
icraggs 20:cad3d54d7ecf 115
icraggs 20:cad3d54d7ecf 116 /** Set the connection lost callback - called whenever the connection is lost and we should be connected
icraggs 20:cad3d54d7ecf 117 * @param clh - pointer to the callback function
icraggs 20:cad3d54d7ecf 118 */
icraggs 20:cad3d54d7ecf 119 void setConnectionLostHandler(connectionLostHandlers clh)
icraggs 20:cad3d54d7ecf 120 {
icraggs 20:cad3d54d7ecf 121 connectionLostHandler.attach(clh);
icraggs 20:cad3d54d7ecf 122 }
icraggs 20:cad3d54d7ecf 123
icraggs 20:cad3d54d7ecf 124 /** Set the default message handling callback - used for any message which does not match a subscription message handler
icraggs 20:cad3d54d7ecf 125 * @param mh - pointer to the callback function
icraggs 20:cad3d54d7ecf 126 */
icraggs 20:cad3d54d7ecf 127 void setDefaultMessageHandler(messageHandler mh)
icraggs 20:cad3d54d7ecf 128 {
icraggs 20:cad3d54d7ecf 129 defaultMessageHandler.attach(mh);
icraggs 20:cad3d54d7ecf 130 }
icraggs 2:dcfdd2abfe71 131
icraggs 20:cad3d54d7ecf 132 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
icraggs 20:cad3d54d7ecf 133 * The nework object must be connected to the network endpoint before calling this
icraggs 20:cad3d54d7ecf 134 * @param options - connect options
icraggs 20:cad3d54d7ecf 135 * @return success code -
icraggs 20:cad3d54d7ecf 136 */
icraggs 20:cad3d54d7ecf 137 int connect(MQTTPacket_connectData* options = 0);
icraggs 20:cad3d54d7ecf 138
icraggs 20:cad3d54d7ecf 139 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
icraggs 20:cad3d54d7ecf 140 * @param topic - the topic to publish to
icraggs 20:cad3d54d7ecf 141 * @param message - the message to send
icraggs 20:cad3d54d7ecf 142 * @return success code -
icraggs 20:cad3d54d7ecf 143 */
icraggs 20:cad3d54d7ecf 144 int publish(const char* topicName, Message* message);
icraggs 20:cad3d54d7ecf 145
icraggs 20:cad3d54d7ecf 146 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
icraggs 20:cad3d54d7ecf 147 * @param topicFilter - a topic pattern which can include wildcards
icraggs 20:cad3d54d7ecf 148 * @param qos - the MQTT QoS to subscribe at
icraggs 20:cad3d54d7ecf 149 * @param mh - the callback function to be invoked when a message is received for this subscription
icraggs 20:cad3d54d7ecf 150 * @return success code -
icraggs 20:cad3d54d7ecf 151 */
icraggs 20:cad3d54d7ecf 152 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
icraggs 9:01b8cc7d94cc 153
icraggs 20:cad3d54d7ecf 154 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
icraggs 20:cad3d54d7ecf 155 * @param topicFilter - a topic pattern which can include wildcards
icraggs 20:cad3d54d7ecf 156 * @return success code -
icraggs 20:cad3d54d7ecf 157 */
icraggs 20:cad3d54d7ecf 158 int unsubscribe(const char* topicFilter);
icraggs 20:cad3d54d7ecf 159
icraggs 20:cad3d54d7ecf 160 /** MQTT Disconnect - send an MQTT disconnect packet
icraggs 20:cad3d54d7ecf 161 * @return success code -
icraggs 20:cad3d54d7ecf 162 */
icraggs 20:cad3d54d7ecf 163 int disconnect();
icraggs 8:c46930bd6c82 164
icraggs 20:cad3d54d7ecf 165 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
icraggs 20:cad3d54d7ecf 166 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
icraggs 20:cad3d54d7ecf 167 * received.
icraggs 23:05fc7de97d4a 168 * @param timeout_ms the time to wait, in milliseconds
icraggs 20:cad3d54d7ecf 169 */
icraggs 23:05fc7de97d4a 170 void yield(int timeout_ms = 1000);
icraggs 20:cad3d54d7ecf 171
icraggs 20:cad3d54d7ecf 172 private:
icraggs 20:cad3d54d7ecf 173
icraggs 20:cad3d54d7ecf 174 int cycle(Timer& timer);
icraggs 20:cad3d54d7ecf 175 int waitfor(int packet_type, Timer& timer);
icraggs 20:cad3d54d7ecf 176 int keepalive();
icraggs 2:dcfdd2abfe71 177
icraggs 3:dbff6b768d28 178 int decodePacket(int* value, int timeout);
icraggs 20:cad3d54d7ecf 179 int readPacket(Timer& timer);
icraggs 20:cad3d54d7ecf 180 int sendPacket(int length, Timer& timer);
icraggs 20:cad3d54d7ecf 181 int deliverMessage(MQTTString* topic, Message* message);
icraggs 3:dbff6b768d28 182
icraggs 23:05fc7de97d4a 183 Network& ipstack;
icraggs 23:05fc7de97d4a 184 unsigned int command_timeout_ms;
icraggs 16:91c2f9a144d4 185
icraggs 23:05fc7de97d4a 186 char buf[MAX_MQTT_PACKET_SIZE];
icraggs 23:05fc7de97d4a 187 char readbuf[MAX_MQTT_PACKET_SIZE];
icraggs 15:64a57183aa03 188
icraggs 20:cad3d54d7ecf 189 Timer ping_timer;
icraggs 15:64a57183aa03 190 unsigned int keepAliveInterval;
icraggs 20:cad3d54d7ecf 191 bool ping_outstanding;
icraggs 4:4ef00243708e 192
icraggs 9:01b8cc7d94cc 193 PacketId packetid;
icraggs 9:01b8cc7d94cc 194
sam_grove 25:d13a6c558164 195 // typedef FP<void, Message*> messageHandlerFP;
sam_grove 25:d13a6c558164 196 // FP<void, Message*> messageHandlerFP;
icraggs 16:91c2f9a144d4 197 struct MessageHandlers
icraggs 15:64a57183aa03 198 {
icraggs 20:cad3d54d7ecf 199 const char* topic;
sam_grove 25:d13a6c558164 200 FP<void, Message*> fp;
icraggs 23:05fc7de97d4a 201 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
icraggs 15:64a57183aa03 202
sam_grove 25:d13a6c558164 203 FP<void, Message*> defaultMessageHandler;
icraggs 20:cad3d54d7ecf 204
sam_grove 25:d13a6c558164 205 FP<int, connectionLostInfo*> connectionLostHandler;
sam_grove 25:d13a6c558164 206 // connectionLostFP connectionLostHandler;
icraggs 11:db15da110a37 207
sam_grove 0:fe461e4d7afe 208 };
sam_grove 0:fe461e4d7afe 209
icraggs 15:64a57183aa03 210 }
icraggs 15:64a57183aa03 211
icraggs 15:64a57183aa03 212
icraggs 23:05fc7de97d4a 213 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
icraggs 23:05fc7de97d4a 214 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
icraggs 15:64a57183aa03 215 {
icraggs 23:05fc7de97d4a 216 ping_timer = Timer();
icraggs 23:05fc7de97d4a 217 ping_outstanding = 0;
icraggs 23:05fc7de97d4a 218 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
icraggs 20:cad3d54d7ecf 219 messageHandlers[i].topic = 0;
icraggs 23:05fc7de97d4a 220 this->command_timeout_ms = command_timeout_ms;
icraggs 11:db15da110a37 221 }
icraggs 11:db15da110a37 222
icraggs 11:db15da110a37 223
icraggs 23:05fc7de97d4a 224 template<class Network, class Timer, int a, int b>
icraggs 23:05fc7de97d4a 225 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
icraggs 8:c46930bd6c82 226 {
icraggs 23:05fc7de97d4a 227 int rc = FAILURE,
icraggs 23:05fc7de97d4a 228 sent = 0;
icraggs 8:c46930bd6c82 229
icraggs 23:05fc7de97d4a 230 while (sent < length && !timer.expired())
icraggs 23:05fc7de97d4a 231 {
icraggs 23:05fc7de97d4a 232 rc = ipstack.write(&buf[sent], length, timer.left_ms());
icraggs 23:05fc7de97d4a 233 if (rc == -1)
icraggs 23:05fc7de97d4a 234 {
icraggs 23:05fc7de97d4a 235 connectionLostInfo info = {this, &ipstack};
icraggs 23:05fc7de97d4a 236 connectionLostHandler(&info);
icraggs 23:05fc7de97d4a 237 }
icraggs 23:05fc7de97d4a 238 else
icraggs 23:05fc7de97d4a 239 sent += rc;
icraggs 23:05fc7de97d4a 240 }
icraggs 20:cad3d54d7ecf 241 if (sent == length)
icraggs 23:05fc7de97d4a 242 {
icraggs 20:cad3d54d7ecf 243 ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
icraggs 23:05fc7de97d4a 244 rc = SUCCESS;
icraggs 23:05fc7de97d4a 245 }
icraggs 23:05fc7de97d4a 246 else
icraggs 23:05fc7de97d4a 247 rc = FAILURE;
icraggs 23:05fc7de97d4a 248 return rc;
icraggs 8:c46930bd6c82 249 }
icraggs 8:c46930bd6c82 250
icraggs 8:c46930bd6c82 251
icraggs 23:05fc7de97d4a 252 template<class Network, class Timer, int a, int b> int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
icraggs 8:c46930bd6c82 253 {
icraggs 8:c46930bd6c82 254 char c;
icraggs 8:c46930bd6c82 255 int multiplier = 1;
icraggs 8:c46930bd6c82 256 int len = 0;
icraggs 20:cad3d54d7ecf 257 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
icraggs 8:c46930bd6c82 258
icraggs 8:c46930bd6c82 259 *value = 0;
icraggs 8:c46930bd6c82 260 do
icraggs 8:c46930bd6c82 261 {
icraggs 8:c46930bd6c82 262 int rc = MQTTPACKET_READ_ERROR;
icraggs 8:c46930bd6c82 263
icraggs 8:c46930bd6c82 264 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 8:c46930bd6c82 265 {
icraggs 8:c46930bd6c82 266 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 8:c46930bd6c82 267 goto exit;
icraggs 8:c46930bd6c82 268 }
icraggs 23:05fc7de97d4a 269 rc = ipstack.read(&c, 1, timeout);
icraggs 8:c46930bd6c82 270 if (rc != 1)
icraggs 8:c46930bd6c82 271 goto exit;
icraggs 8:c46930bd6c82 272 *value += (c & 127) * multiplier;
icraggs 8:c46930bd6c82 273 multiplier *= 128;
icraggs 8:c46930bd6c82 274 } while ((c & 128) != 0);
icraggs 8:c46930bd6c82 275 exit:
icraggs 8:c46930bd6c82 276 return len;
icraggs 8:c46930bd6c82 277 }
icraggs 8:c46930bd6c82 278
icraggs 8:c46930bd6c82 279
icraggs 8:c46930bd6c82 280 /**
icraggs 8:c46930bd6c82 281 * If any read fails in this method, then we should disconnect from the network, as on reconnect
icraggs 8:c46930bd6c82 282 * the packets can be retried.
icraggs 8:c46930bd6c82 283 * @param timeout the max time to wait for the packet read to complete, in milliseconds
icraggs 8:c46930bd6c82 284 * @return the MQTT packet type, or -1 if none
icraggs 8:c46930bd6c82 285 */
icraggs 23:05fc7de97d4a 286 template<class Network, class Timer, int a, int b>
icraggs 23:05fc7de97d4a 287 int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer)
icraggs 8:c46930bd6c82 288 {
icraggs 8:c46930bd6c82 289 int rc = -1;
icraggs 8:c46930bd6c82 290 MQTTHeader header = {0};
icraggs 8:c46930bd6c82 291 int len = 0;
icraggs 8:c46930bd6c82 292 int rem_len = 0;
icraggs 8:c46930bd6c82 293
icraggs 8:c46930bd6c82 294 /* 1. read the header byte. This has the packet type in it */
icraggs 23:05fc7de97d4a 295 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
icraggs 8:c46930bd6c82 296 goto exit;
icraggs 8:c46930bd6c82 297
icraggs 8:c46930bd6c82 298 len = 1;
icraggs 8:c46930bd6c82 299 /* 2. read the remaining length. This is variable in itself */
icraggs 20:cad3d54d7ecf 300 decodePacket(&rem_len, timer.left_ms());
icraggs 8:c46930bd6c82 301 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
icraggs 8:c46930bd6c82 302
icraggs 8:c46930bd6c82 303 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 23:05fc7de97d4a 304 if (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)
icraggs 8:c46930bd6c82 305 goto exit;
icraggs 8:c46930bd6c82 306
icraggs 8:c46930bd6c82 307 header.byte = readbuf[0];
icraggs 8:c46930bd6c82 308 rc = header.bits.type;
icraggs 8:c46930bd6c82 309 exit:
icraggs 8:c46930bd6c82 310 return rc;
icraggs 3:dbff6b768d28 311 }
icraggs 3:dbff6b768d28 312
icraggs 8:c46930bd6c82 313
icraggs 23:05fc7de97d4a 314 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
icraggs 23:05fc7de97d4a 315 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString* topic, Message* message)
icraggs 15:64a57183aa03 316 {
icraggs 20:cad3d54d7ecf 317 int rc = -1;
icraggs 20:cad3d54d7ecf 318
icraggs 20:cad3d54d7ecf 319 // we have to find the right message handler - indexed by topic
icraggs 23:05fc7de97d4a 320 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
icraggs 20:cad3d54d7ecf 321 {
icraggs 20:cad3d54d7ecf 322 if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
icraggs 20:cad3d54d7ecf 323 {
icraggs 20:cad3d54d7ecf 324 messageHandlers[i].fp(message);
icraggs 20:cad3d54d7ecf 325 rc = 0;
icraggs 20:cad3d54d7ecf 326 break;
icraggs 20:cad3d54d7ecf 327 }
icraggs 20:cad3d54d7ecf 328 }
icraggs 20:cad3d54d7ecf 329 if (rc == -1)
icraggs 20:cad3d54d7ecf 330 defaultMessageHandler(message);
icraggs 20:cad3d54d7ecf 331
icraggs 20:cad3d54d7ecf 332 return rc;
icraggs 15:64a57183aa03 333 }
icraggs 15:64a57183aa03 334
icraggs 15:64a57183aa03 335
icraggs 20:cad3d54d7ecf 336
icraggs 23:05fc7de97d4a 337 template<class Network, class Timer, int a, int b>
icraggs 23:05fc7de97d4a 338 void MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms)
icraggs 20:cad3d54d7ecf 339 {
icraggs 20:cad3d54d7ecf 340 Timer timer = Timer();
icraggs 20:cad3d54d7ecf 341
icraggs 23:05fc7de97d4a 342 timer.countdown_ms(timeout_ms);
icraggs 20:cad3d54d7ecf 343 while (!timer.expired())
icraggs 20:cad3d54d7ecf 344 cycle(timer);
icraggs 20:cad3d54d7ecf 345 }
icraggs 20:cad3d54d7ecf 346
icraggs 20:cad3d54d7ecf 347
icraggs 23:05fc7de97d4a 348 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 23:05fc7de97d4a 349 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
icraggs 8:c46930bd6c82 350 {
icraggs 8:c46930bd6c82 351 /* get one piece of work off the wire and one pass through */
icraggs 20:cad3d54d7ecf 352
Ian Craggs 12:cc7f2d62a393 353 // read the socket, see what work is due
icraggs 20:cad3d54d7ecf 354 int packet_type = readPacket(timer);
icraggs 15:64a57183aa03 355
sam_grove 25:d13a6c558164 356 int len = 0, rc;
icraggs 8:c46930bd6c82 357 switch (packet_type)
icraggs 8:c46930bd6c82 358 {
icraggs 15:64a57183aa03 359 case CONNACK:
icraggs 8:c46930bd6c82 360 case PUBACK:
icraggs 8:c46930bd6c82 361 case SUBACK:
icraggs 8:c46930bd6c82 362 break;
icraggs 15:64a57183aa03 363 case PUBLISH:
icraggs 20:cad3d54d7ecf 364 MQTTString topicName;
icraggs 20:cad3d54d7ecf 365 Message msg;
icraggs 20:cad3d54d7ecf 366 rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
icraggs 23:05fc7de97d4a 367 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE);;
sam_grove 25:d13a6c558164 368 rc = rc; // make sure optimizer doesnt omit this
icraggs 20:cad3d54d7ecf 369 deliverMessage(&topicName, &msg);
icraggs 20:cad3d54d7ecf 370 if (msg.qos != QOS0)
icraggs 20:cad3d54d7ecf 371 {
icraggs 20:cad3d54d7ecf 372 if (msg.qos == QOS1)
icraggs 23:05fc7de97d4a 373 len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
icraggs 20:cad3d54d7ecf 374 else if (msg.qos == QOS2)
icraggs 23:05fc7de97d4a 375 len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
icraggs 23:05fc7de97d4a 376 if ((rc = sendPacket(len, timer)) != SUCCESS)
icraggs 20:cad3d54d7ecf 377 goto exit; // there was a problem
icraggs 20:cad3d54d7ecf 378 }
Ian Craggs 12:cc7f2d62a393 379 break;
icraggs 15:64a57183aa03 380 case PUBREC:
icraggs 20:cad3d54d7ecf 381 int type, dup, mypacketid;
icraggs 23:05fc7de97d4a 382 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 383 ;
icraggs 23:05fc7de97d4a 384 len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
icraggs 23:05fc7de97d4a 385 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
icraggs 20:cad3d54d7ecf 386 goto exit; // there was a problem
Ian Craggs 12:cc7f2d62a393 387
icraggs 8:c46930bd6c82 388 break;
icraggs 8:c46930bd6c82 389 case PUBCOMP:
icraggs 8:c46930bd6c82 390 break;
icraggs 15:64a57183aa03 391 case PINGRESP:
icraggs 20:cad3d54d7ecf 392 ping_outstanding = false;
icraggs 8:c46930bd6c82 393 break;
icraggs 15:64a57183aa03 394 }
icraggs 20:cad3d54d7ecf 395 keepalive();
Ian Craggs 12:cc7f2d62a393 396 exit:
icraggs 8:c46930bd6c82 397 return packet_type;
icraggs 15:64a57183aa03 398 }
icraggs 15:64a57183aa03 399
icraggs 15:64a57183aa03 400
icraggs 23:05fc7de97d4a 401 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 23:05fc7de97d4a 402 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
icraggs 15:64a57183aa03 403 {
icraggs 20:cad3d54d7ecf 404 int rc = 0;
icraggs 15:64a57183aa03 405
icraggs 20:cad3d54d7ecf 406 if (keepAliveInterval == 0)
icraggs 20:cad3d54d7ecf 407 goto exit;
icraggs 15:64a57183aa03 408
icraggs 20:cad3d54d7ecf 409 if (ping_timer.expired())
icraggs 20:cad3d54d7ecf 410 {
icraggs 20:cad3d54d7ecf 411 if (ping_outstanding)
icraggs 20:cad3d54d7ecf 412 rc = -1;
icraggs 20:cad3d54d7ecf 413 else
icraggs 20:cad3d54d7ecf 414 {
icraggs 20:cad3d54d7ecf 415 Timer timer = Timer(1000);
icraggs 23:05fc7de97d4a 416 int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE);
icraggs 20:cad3d54d7ecf 417 rc = sendPacket(len, timer); // send the ping packet
icraggs 23:05fc7de97d4a 418 if (rc != SUCCESS)
icraggs 20:cad3d54d7ecf 419 rc = -1; // indicate there's a problem
icraggs 20:cad3d54d7ecf 420 else
icraggs 20:cad3d54d7ecf 421 ping_outstanding = true;
icraggs 20:cad3d54d7ecf 422 }
icraggs 20:cad3d54d7ecf 423 }
icraggs 15:64a57183aa03 424
icraggs 15:64a57183aa03 425 exit:
icraggs 20:cad3d54d7ecf 426 return rc;
icraggs 8:c46930bd6c82 427 }
icraggs 8:c46930bd6c82 428
icraggs 8:c46930bd6c82 429
icraggs 16:91c2f9a144d4 430 // only used in single-threaded mode where one command at a time is in process
icraggs 23:05fc7de97d4a 431 template<class Network, class Timer, int a, int b>
icraggs 23:05fc7de97d4a 432 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
icraggs 15:64a57183aa03 433 {
icraggs 20:cad3d54d7ecf 434 int rc = -1;
icraggs 20:cad3d54d7ecf 435
icraggs 20:cad3d54d7ecf 436 do
icraggs 16:91c2f9a144d4 437 {
icraggs 20:cad3d54d7ecf 438 if (timer.expired())
icraggs 20:cad3d54d7ecf 439 break; // we timed out
icraggs 20:cad3d54d7ecf 440 }
icraggs 20:cad3d54d7ecf 441 while ((rc = cycle(timer)) != packet_type);
icraggs 20:cad3d54d7ecf 442
icraggs 20:cad3d54d7ecf 443 return rc;
icraggs 16:91c2f9a144d4 444 }
icraggs 16:91c2f9a144d4 445
icraggs 16:91c2f9a144d4 446
icraggs 23:05fc7de97d4a 447 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 23:05fc7de97d4a 448 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options)
icraggs 16:91c2f9a144d4 449 {
icraggs 23:05fc7de97d4a 450 Timer connect_timer = Timer(command_timeout_ms);
icraggs 8:c46930bd6c82 451
Ian Craggs 12:cc7f2d62a393 452 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
icraggs 8:c46930bd6c82 453 if (options == 0)
Ian Craggs 12:cc7f2d62a393 454 options = &default_options; // set default options if none were supplied
icraggs 8:c46930bd6c82 455
icraggs 15:64a57183aa03 456 this->keepAliveInterval = options->keepAliveInterval;
icraggs 20:cad3d54d7ecf 457 ping_timer.countdown(this->keepAliveInterval);
icraggs 23:05fc7de97d4a 458 int len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options);
icraggs 20:cad3d54d7ecf 459 int rc = sendPacket(len, connect_timer); // send the connect packet
icraggs 23:05fc7de97d4a 460 if (rc != SUCCESS)
icraggs 20:cad3d54d7ecf 461 goto exit; // there was a problem
icraggs 8:c46930bd6c82 462
icraggs 20:cad3d54d7ecf 463 // this will be a blocking call, wait for the connack
icraggs 20:cad3d54d7ecf 464 if (waitfor(CONNACK, connect_timer) == CONNACK)
icraggs 15:64a57183aa03 465 {
icraggs 20:cad3d54d7ecf 466 int connack_rc = -1;
icraggs 23:05fc7de97d4a 467 if (MQTTDeserialize_connack(&connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 468 rc = connack_rc;
icraggs 8:c46930bd6c82 469 }
icraggs 15:64a57183aa03 470
icraggs 15:64a57183aa03 471 exit:
icraggs 8:c46930bd6c82 472 return rc;
icraggs 8:c46930bd6c82 473 }
icraggs 8:c46930bd6c82 474
icraggs 8:c46930bd6c82 475
icraggs 23:05fc7de97d4a 476 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
icraggs 23:05fc7de97d4a 477 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
icraggs 20:cad3d54d7ecf 478 {
icraggs 20:cad3d54d7ecf 479 int len = -1;
icraggs 23:05fc7de97d4a 480 Timer timer = Timer(command_timeout_ms);
icraggs 20:cad3d54d7ecf 481
icraggs 8:c46930bd6c82 482 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 8:c46930bd6c82 483
icraggs 23:05fc7de97d4a 484 int rc = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
icraggs 20:cad3d54d7ecf 485 if (rc <= 0)
icraggs 20:cad3d54d7ecf 486 goto exit;
icraggs 20:cad3d54d7ecf 487 len = rc;
icraggs 23:05fc7de97d4a 488 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
icraggs 20:cad3d54d7ecf 489 goto exit; // there was a problem
icraggs 8:c46930bd6c82 490
icraggs 20:cad3d54d7ecf 491 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback
icraggs 8:c46930bd6c82 492 {
icraggs 20:cad3d54d7ecf 493 int count = 0, grantedQoS = -1, mypacketid;
icraggs 23:05fc7de97d4a 494 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 495 rc = grantedQoS; // 0, 1, 2 or 0x80
icraggs 20:cad3d54d7ecf 496 if (rc != 0x80)
icraggs 8:c46930bd6c82 497 {
icraggs 23:05fc7de97d4a 498 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
icraggs 16:91c2f9a144d4 499 {
icraggs 20:cad3d54d7ecf 500 if (messageHandlers[i].topic == 0)
icraggs 20:cad3d54d7ecf 501 {
icraggs 20:cad3d54d7ecf 502 messageHandlers[i].topic = topicFilter;
icraggs 20:cad3d54d7ecf 503 messageHandlers[i].fp.attach(messageHandler);
icraggs 20:cad3d54d7ecf 504 rc = 0;
icraggs 20:cad3d54d7ecf 505 break;
icraggs 20:cad3d54d7ecf 506 }
icraggs 16:91c2f9a144d4 507 }
icraggs 8:c46930bd6c82 508 }
icraggs 8:c46930bd6c82 509 }
icraggs 15:64a57183aa03 510
icraggs 15:64a57183aa03 511 exit:
Ian Craggs 12:cc7f2d62a393 512 return rc;
icraggs 15:64a57183aa03 513 }
icraggs 15:64a57183aa03 514
icraggs 15:64a57183aa03 515
icraggs 23:05fc7de97d4a 516 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
icraggs 23:05fc7de97d4a 517 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
icraggs 20:cad3d54d7ecf 518 {
icraggs 20:cad3d54d7ecf 519 int len = -1;
icraggs 23:05fc7de97d4a 520 Timer timer = Timer(command_timeout_ms);
icraggs 20:cad3d54d7ecf 521
Ian Craggs 12:cc7f2d62a393 522 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 8:c46930bd6c82 523
icraggs 23:05fc7de97d4a 524 int rc = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
icraggs 20:cad3d54d7ecf 525 if (rc <= 0)
icraggs 20:cad3d54d7ecf 526 goto exit;
icraggs 20:cad3d54d7ecf 527 len = rc;
icraggs 23:05fc7de97d4a 528 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
icraggs 20:cad3d54d7ecf 529 goto exit; // there was a problem
Ian Craggs 12:cc7f2d62a393 530
icraggs 20:cad3d54d7ecf 531 if (waitfor(UNSUBACK, timer) == UNSUBACK)
Ian Craggs 12:cc7f2d62a393 532 {
icraggs 20:cad3d54d7ecf 533 int mypacketid; // should be the same as the packetid above
icraggs 23:05fc7de97d4a 534 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 535 rc = 0;
Ian Craggs 12:cc7f2d62a393 536 }
icraggs 15:64a57183aa03 537
icraggs 15:64a57183aa03 538 exit:
Ian Craggs 12:cc7f2d62a393 539 return rc;
icraggs 15:64a57183aa03 540 }
icraggs 15:64a57183aa03 541
icraggs 15:64a57183aa03 542
icraggs 15:64a57183aa03 543
icraggs 23:05fc7de97d4a 544 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 23:05fc7de97d4a 545 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
icraggs 20:cad3d54d7ecf 546 {
icraggs 23:05fc7de97d4a 547 Timer timer = Timer(command_timeout_ms);
icraggs 20:cad3d54d7ecf 548
icraggs 20:cad3d54d7ecf 549 MQTTString topicString = {(char*)topicName, 0, 0};
icraggs 15:64a57183aa03 550
icraggs 20:cad3d54d7ecf 551 if (message->qos == QOS1 || message->qos == QOS2)
icraggs 20:cad3d54d7ecf 552 message->id = packetid.getNext();
icraggs 15:64a57183aa03 553
icraggs 23:05fc7de97d4a 554 int len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id,
icraggs 20:cad3d54d7ecf 555 topicString, (char*)message->payload, message->payloadlen);
icraggs 20:cad3d54d7ecf 556 int rc = sendPacket(len, timer); // send the subscribe packet
icraggs 23:05fc7de97d4a 557 if (rc != SUCCESS)
icraggs 20:cad3d54d7ecf 558 goto exit; // there was a problem
Ian Craggs 12:cc7f2d62a393 559
icraggs 20:cad3d54d7ecf 560 if (message->qos == QOS1)
Ian Craggs 12:cc7f2d62a393 561 {
icraggs 20:cad3d54d7ecf 562 if (waitfor(PUBACK, timer) == PUBACK)
icraggs 20:cad3d54d7ecf 563 {
icraggs 20:cad3d54d7ecf 564 int type, dup, mypacketid;
icraggs 23:05fc7de97d4a 565 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 566 rc = 0;
icraggs 20:cad3d54d7ecf 567 }
Ian Craggs 12:cc7f2d62a393 568 }
icraggs 20:cad3d54d7ecf 569 else if (message->qos == QOS2)
Ian Craggs 12:cc7f2d62a393 570 {
icraggs 20:cad3d54d7ecf 571 if (waitfor(PUBCOMP, timer) == PUBCOMP)
icraggs 20:cad3d54d7ecf 572 {
icraggs 20:cad3d54d7ecf 573 int type, dup, mypacketid;
icraggs 23:05fc7de97d4a 574 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 20:cad3d54d7ecf 575 rc = 0;
icraggs 20:cad3d54d7ecf 576 }
Ian Craggs 12:cc7f2d62a393 577 }
icraggs 15:64a57183aa03 578
icraggs 15:64a57183aa03 579 exit:
icraggs 8:c46930bd6c82 580 return rc;
icraggs 8:c46930bd6c82 581 }
icraggs 8:c46930bd6c82 582
icraggs 8:c46930bd6c82 583
icraggs 23:05fc7de97d4a 584 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 23:05fc7de97d4a 585 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
icraggs 20:cad3d54d7ecf 586 {
icraggs 23:05fc7de97d4a 587 Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
icraggs 23:05fc7de97d4a 588 int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
icraggs 20:cad3d54d7ecf 589 int rc = sendPacket(len, timer); // send the disconnect packet
icraggs 20:cad3d54d7ecf 590
icraggs 23:05fc7de97d4a 591 return rc;
icraggs 20:cad3d54d7ecf 592 }
icraggs 20:cad3d54d7ecf 593
icraggs 20:cad3d54d7ecf 594
icraggs 20:cad3d54d7ecf 595 #endif