Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
MQTTClient.h@15:64a57183aa03, 2014-04-13 (annotated)
- Committer:
- icraggs
- Date:
- Sun Apr 13 22:32:28 2014 +0000
- Revision:
- 15:64a57183aa03
- Parent:
- 13:fd82db992024
- Child:
- 16:91c2f9a144d4
I really want the arrays to be allocated in automatic storage
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
icraggs | 6:4d312a49200b | 1 | /******************************************************************************* |
icraggs | 6:4d312a49200b | 2 | * Copyright (c) 2014 IBM Corp. |
sam_grove | 0:fe461e4d7afe | 3 | * |
icraggs | 6:4d312a49200b | 4 | * All rights reserved. This program and the accompanying materials |
icraggs | 6:4d312a49200b | 5 | * are made available under the terms of the Eclipse Public License v1.0 |
icraggs | 6:4d312a49200b | 6 | * and Eclipse Distribution License v1.0 which accompany this distribution. |
sam_grove | 0:fe461e4d7afe | 7 | * |
icraggs | 6:4d312a49200b | 8 | * The Eclipse Public License is available at |
icraggs | 6:4d312a49200b | 9 | * http://www.eclipse.org/legal/epl-v10.html |
icraggs | 6:4d312a49200b | 10 | * and the Eclipse Distribution License is available at |
icraggs | 6:4d312a49200b | 11 | * http://www.eclipse.org/org/documents/edl-v10.php. |
sam_grove | 0:fe461e4d7afe | 12 | * |
icraggs | 6:4d312a49200b | 13 | * Contributors: |
icraggs | 6:4d312a49200b | 14 | * Ian Craggs - initial API and implementation and/or initial documentation |
icraggs | 6:4d312a49200b | 15 | *******************************************************************************/ |
sam_grove | 0:fe461e4d7afe | 16 | |
icraggs | 2:dcfdd2abfe71 | 17 | #if !defined(MQTTCLIENT_H) |
icraggs | 2:dcfdd2abfe71 | 18 | #define MQTTCLIENT_H |
icraggs | 2:dcfdd2abfe71 | 19 | |
icraggs | 2:dcfdd2abfe71 | 20 | #include "FP.h" |
icraggs | 3:dbff6b768d28 | 21 | #include "MQTTPacket.h" |
icraggs | 6:4d312a49200b | 22 | #include "stdio.h" |
icraggs | 2:dcfdd2abfe71 | 23 | |
icraggs | 3:dbff6b768d28 | 24 | namespace MQTT |
icraggs | 3:dbff6b768d28 | 25 | { |
icraggs | 3:dbff6b768d28 | 26 | |
icraggs | 2:dcfdd2abfe71 | 27 | |
icraggs | 2:dcfdd2abfe71 | 28 | enum QoS { QOS0, QOS1, QOS2 }; /* quality-of-service level; stored in Message::qos and passed to subscribe() */ |
sam_grove | 0:fe461e4d7afe | 29 | |
icraggs | 2:dcfdd2abfe71 | 30 | |
icraggs | 3:dbff6b768d28 | 31 | struct Message /* one application message: passed to publish(), and filled in by cycle() for an incoming PUBLISH */ |
icraggs | 2:dcfdd2abfe71 | 32 | { |
icraggs | 2:dcfdd2abfe71 | 33 | enum QoS qos; |
icraggs | 2:dcfdd2abfe71 | 34 | bool retained; |
icraggs | 2:dcfdd2abfe71 | 35 | bool dup; |
Ian Craggs |
12:cc7f2d62a393 | 36 | unsigned short id; /* packet id; publish() overwrites it with packetid.getNext() */ |
icraggs | 2:dcfdd2abfe71 | 37 | void *payload; |
icraggs | 2:dcfdd2abfe71 | 38 | size_t payloadlen; /* length of payload - presumably in bytes; TODO confirm */ |
sam_grove | 0:fe461e4d7afe | 39 | }; |
sam_grove | 0:fe461e4d7afe | 40 | |
icraggs | 6:4d312a49200b | 41 | template<class Network, class Timer, class Thread> class Client; |
icraggs | 4:4ef00243708e | 42 | |
icraggs | 9:01b8cc7d94cc | 43 | class PacketId /* source of packet ids for outgoing subscribe, unsubscribe and publish packets */ |
icraggs | 9:01b8cc7d94cc | 44 | { |
icraggs | 9:01b8cc7d94cc | 45 | public: |
icraggs | 9:01b8cc7d94cc | 46 | PacketId(); /* defined outside this header */ |
icraggs | 9:01b8cc7d94cc | 47 | |
icraggs | 15:64a57183aa03 | 48 | int getNext(); /* defined outside this header; callers use the result directly as the packet id */ |
Ian Craggs |
12:cc7f2d62a393 | 49 | |
icraggs | 9:01b8cc7d94cc | 50 | private: |
icraggs | 9:01b8cc7d94cc | 51 | static const int MAX_PACKET_ID = 65535; /* NOTE(review): presumably the largest id handed out before wrapping - implementation not visible here */ |
icraggs | 9:01b8cc7d94cc | 52 | int next; |
icraggs | 9:01b8cc7d94cc | 53 | }; |
icraggs | 9:01b8cc7d94cc | 54 | |
icraggs | 9:01b8cc7d94cc | 55 | typedef void (*messageHandler)(Message*); |
icraggs | 2:dcfdd2abfe71 | 56 | |
icraggs | 6:4d312a49200b | 57 | template<class Network, class Timer, class Thread> class Client /* MQTT client parameterised on the network, timer and thread types behind ipstack, command_timer/ping_timer and thread */ |
icraggs | 2:dcfdd2abfe71 | 58 | { |
icraggs | 2:dcfdd2abfe71 | 59 | |
icraggs | 15:64a57183aa03 | 60 | public: |
icraggs | 15:64a57183aa03 | 61 | |
Ian Craggs |
12:cc7f2d62a393 | 62 | struct Result |
Ian Craggs |
12:cc7f2d62a393 | 63 | { |
Ian Craggs |
12:cc7f2d62a393 | 64 | /* success or failure result data handed to a resultHandler */ |
icraggs | 15:64a57183aa03 | 65 | Client<Network, Timer, Thread>* client; |
Ian Craggs |
12:cc7f2d62a393 | 66 | int connack_rc; |
icraggs | 15:64a57183aa03 | 67 | }; |
icraggs | 15:64a57183aa03 | 68 | |
Ian Craggs |
12:cc7f2d62a393 | 69 | typedef void (*resultHandler)(Result*); |
icraggs | 15:64a57183aa03 | 70 | |
icraggs | 15:64a57183aa03 | 71 | struct limits /* NOTE(review): nothing in this header reads a limits object yet */ |
icraggs | 15:64a57183aa03 | 72 | { |
icraggs | 15:64a57183aa03 | 73 | int MAX_MQTT_PACKET_SIZE; // defaults to 100 in limits(); presumably the packet buffer size, like the constructor argument of the same name - TODO confirm |
icraggs | 15:64a57183aa03 | 74 | int MAX_MESSAGE_HANDLERS; // 5 - each subscription requires a message handler |
icraggs | 15:64a57183aa03 | 75 | int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode |
icraggs | 15:64a57183aa03 | 76 | int command_timeout; |
icraggs | 15:64a57183aa03 | 77 | |
icraggs | 15:64a57183aa03 | 78 | limits() |
icraggs | 15:64a57183aa03 | 79 | { |
icraggs | 15:64a57183aa03 | 80 | MAX_MQTT_PACKET_SIZE = 100; |
icraggs | 15:64a57183aa03 | 81 | MAX_MESSAGE_HANDLERS = 5; |
icraggs | 15:64a57183aa03 | 82 | MAX_CONCURRENT_OPERATIONS= 5; |
icraggs | 15:64a57183aa03 | 83 | command_timeout = 30; |
icraggs | 15:64a57183aa03 | 84 | } |
icraggs | 15:64a57183aa03 | 85 | }; |
icraggs | 4:4ef00243708e | 86 | |
Ian Craggs |
12:cc7f2d62a393 | 87 | Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30); |
icraggs | 2:dcfdd2abfe71 | 88 | |
icraggs | 9:01b8cc7d94cc | 89 | int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0); |
icraggs | 2:dcfdd2abfe71 | 90 | |
icraggs | 9:01b8cc7d94cc | 91 | template<class T> |
icraggs | 9:01b8cc7d94cc | 92 | int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0); // alternative to pass in pointer to member function; NOTE(review): no definition of this overload appears in this header |
icraggs | 9:01b8cc7d94cc | 93 | |
icraggs | 9:01b8cc7d94cc | 94 | int publish(const char* topic, Message* message, resultHandler rh = 0); |
sam_grove | 0:fe461e4d7afe | 95 | |
icraggs | 9:01b8cc7d94cc | 96 | int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0); |
icraggs | 2:dcfdd2abfe71 | 97 | |
Ian Craggs |
12:cc7f2d62a393 | 98 | int unsubscribe(const char* topicFilter, resultHandler rh = 0); |
icraggs | 9:01b8cc7d94cc | 99 | |
icraggs | 9:01b8cc7d94cc | 100 | int disconnect(int timeout, resultHandler rh = 0); /* NOTE(review): declared only - no definition appears in this header */ |
sam_grove | 0:fe461e4d7afe | 101 | |
icraggs | 8:c46930bd6c82 | 102 | void run(void const *argument); |
icraggs | 8:c46930bd6c82 | 103 | |
icraggs | 2:dcfdd2abfe71 | 104 | private: |
icraggs | 2:dcfdd2abfe71 | 105 | |
icraggs | 15:64a57183aa03 | 106 | int cycle(int timeout); |
Ian Craggs |
12:cc7f2d62a393 | 107 | int keepalive(); |
icraggs | 2:dcfdd2abfe71 | 108 | |
icraggs | 3:dbff6b768d28 | 109 | int decodePacket(int* value, int timeout); |
icraggs | 4:4ef00243708e | 110 | int readPacket(int timeout = -1); |
icraggs | 6:4d312a49200b | 111 | int sendPacket(int length, int timeout = -1); |
icraggs | 15:64a57183aa03 | 112 | int deliverMessage(MQTTString* topic, Message* message); |
icraggs | 3:dbff6b768d28 | 113 | |
icraggs | 4:4ef00243708e | 114 | Thread* thread; |
icraggs | 4:4ef00243708e | 115 | Network* ipstack; |
Ian Craggs |
12:cc7f2d62a393 | 116 | Timer command_timer, ping_timer; |
icraggs | 4:4ef00243708e | 117 | |
icraggs | 15:64a57183aa03 | 118 | char buf[]; /* outgoing packet buffer. NOTE(review): an unsized array member is not standard C++ and is not the last member; the constructor's allocation for it is commented out, so no storage is visibly reserved */ |
icraggs | 2:dcfdd2abfe71 | 119 | int buflen; |
icraggs | 2:dcfdd2abfe71 | 120 | |
icraggs | 4:4ef00243708e | 121 | char* readbuf; /* incoming packet buffer, allocated with new[] in the constructor; NOTE(review): the class declares no destructor to release it */ |
icraggs | 15:64a57183aa03 | 122 | int readbuflen; |
icraggs | 15:64a57183aa03 | 123 | |
icraggs | 15:64a57183aa03 | 124 | unsigned int keepAliveInterval; /* copied from the connect options in connect(); multiplied by 1000 when compared with ping_timer.read_ms() */ |
Ian Craggs |
12:cc7f2d62a393 | 125 | bool ping_outstanding; |
icraggs | 4:4ef00243708e | 126 | |
icraggs | 6:4d312a49200b | 127 | int command_timeout; // max time to wait for any MQTT command to complete, in seconds |
icraggs | 9:01b8cc7d94cc | 128 | PacketId packetid; |
icraggs | 9:01b8cc7d94cc | 129 | |
icraggs | 9:01b8cc7d94cc | 130 | typedef FP<void, Result*> resultHandlerFP; |
icraggs | 9:01b8cc7d94cc | 131 | resultHandlerFP connectHandler; |
icraggs | 9:01b8cc7d94cc | 132 | |
icraggs | 9:01b8cc7d94cc | 133 | #define MAX_MESSAGE_HANDLERS 5 /* NOTE(review): this macro has the same name as the limits::MAX_MESSAGE_HANDLERS field above and stays defined after this header, so later uses of that name are replaced by 5 */ |
icraggs | 9:01b8cc7d94cc | 134 | typedef FP<void, Message*> messageHandlerFP; |
icraggs | 15:64a57183aa03 | 135 | struct |
icraggs | 15:64a57183aa03 | 136 | { |
icraggs | 15:64a57183aa03 | 137 | char* topic; |
icraggs | 15:64a57183aa03 | 138 | messageHandlerFP fp; |
icraggs | 15:64a57183aa03 | 139 | } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are linked to a subscription topic |
icraggs | 15:64a57183aa03 | 140 | |
icraggs | 15:64a57183aa03 | 141 | // how many concurrent operations should we allow? Each one will require a function pointer |
icraggs | 15:64a57183aa03 | 142 | struct |
icraggs | 15:64a57183aa03 | 143 | { |
icraggs | 15:64a57183aa03 | 144 | unsigned short id; |
icraggs | 15:64a57183aa03 | 145 | resultHandlerFP fp; |
icraggs | 15:64a57183aa03 | 146 | MQTTString* topic; // if this is a publish, store topic name in case republishing is required |
icraggs | 15:64a57183aa03 | 147 | Message* message; // for publish, the message itself (presumably kept for republishing like topic - TODO confirm) |
icraggs | 15:64a57183aa03 | 148 | } *operations; // result handlers are indexed by packet ids; NOTE(review): the constructor never points this at any storage |
icraggs | 15:64a57183aa03 | 149 | |
Ian Craggs |
12:cc7f2d62a393 | 150 | static void threadfn(void* arg); |
icraggs | 11:db15da110a37 | 151 | |
sam_grove | 0:fe461e4d7afe | 152 | }; |
sam_grove | 0:fe461e4d7afe | 153 | |
icraggs | 15:64a57183aa03 | 154 | } |
icraggs | 15:64a57183aa03 | 155 | |
icraggs | 15:64a57183aa03 | 156 | |
icraggs | 15:64a57183aa03 | 157 | template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg) |
icraggs | 15:64a57183aa03 | 158 | { /* thread entry point handed to Thread by connect(); arg is the Client instance */ |
icraggs | 15:64a57183aa03 | 159 | ((Client<Network, Timer, Thread>*) arg)->run(NULL); /* run() loops forever, so this call does not return */ |
icraggs | 11:db15da110a37 | 160 | } |
icraggs | 11:db15da110a37 | 161 | |
icraggs | 11:db15da110a37 | 162 | |
Ian Craggs |
12:cc7f2d62a393 | 163 | template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout) : packetid() |
icraggs | 8:c46930bd6c82 | 164 | { /* stores the network pointer and timeout, allocates readbuf, and sets both buffer lengths to MAX_MQTT_PACKET_SIZE */ |
icraggs | 15:64a57183aa03 | 165 | // NOTE(review): allocation of buf is disabled (was: buf = new char[MAX_MQTT_PACKET_SIZE];) although buflen is still set below |
Ian Craggs |
12:cc7f2d62a393 | 166 | readbuf = new char[MAX_MQTT_PACKET_SIZE]; |
Ian Craggs |
12:cc7f2d62a393 | 167 | buflen = readbuflen = MAX_MQTT_PACKET_SIZE; |
icraggs | 15:64a57183aa03 | 168 | |
icraggs | 8:c46930bd6c82 | 169 | this->command_timeout = command_timeout; |
icraggs | 8:c46930bd6c82 | 170 | this->thread = 0; /* no background thread until connect() is called with a result handler */ |
icraggs | 8:c46930bd6c82 | 171 | this->ipstack = network; |
icraggs | 15:64a57183aa03 | 172 | this->command_timer = Timer(); |
icraggs | 15:64a57183aa03 | 173 | this->ping_timer = Timer(); |
Ian Craggs |
12:cc7f2d62a393 | 174 | this->ping_outstanding = 0; |
icraggs | 8:c46930bd6c82 | 175 | } /* NOTE(review): keepAliveInterval and operations are left uninitialised here; keepAliveInterval is first set in connect() */ |
icraggs | 8:c46930bd6c82 | 176 | |
icraggs | 8:c46930bd6c82 | 177 | |
icraggs | 8:c46930bd6c82 | 178 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout) |
icraggs | 8:c46930bd6c82 | 179 | { /* writes buf to ipstack until at least length bytes are reported sent; returns the byte count and resets ping_timer when it equals length */ |
icraggs | 8:c46930bd6c82 | 180 | int sent = 0; |
icraggs | 8:c46930bd6c82 | 181 | |
icraggs | 8:c46930bd6c82 | 182 | while (sent < length) |
icraggs | 15:64a57183aa03 | 183 | sent += ipstack->write(&buf[sent], length, -1); /* NOTE(review): each call asks for length bytes (not length - sent) and passes -1 instead of the timeout argument; if write() can return 0 or a negative value this loop may never finish - confirm the Network::write contract */ |
Ian Craggs |
12:cc7f2d62a393 | 184 | if (sent == length) |
Ian Craggs |
12:cc7f2d62a393 | 185 | ping_timer.reset(); // record the fact that we have successfully sent the packet |
icraggs | 8:c46930bd6c82 | 186 | return sent; |
icraggs | 8:c46930bd6c82 | 187 | } |
icraggs | 8:c46930bd6c82 | 188 | |
icraggs | 8:c46930bd6c82 | 189 | |
icraggs | 8:c46930bd6c82 | 190 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::decodePacket(int* value, int timeout) |
icraggs | 8:c46930bd6c82 | 191 | { /* reads the variable-length "remaining length" field one byte at a time from ipstack into *value; returns the count of length bytes attempted (len) */ |
icraggs | 8:c46930bd6c82 | 192 | char c; |
icraggs | 8:c46930bd6c82 | 193 | int multiplier = 1; |
icraggs | 8:c46930bd6c82 | 194 | int len = 0; |
icraggs | 15:64a57183aa03 | 195 | const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; |
icraggs | 8:c46930bd6c82 | 196 | |
icraggs | 8:c46930bd6c82 | 197 | *value = 0; |
icraggs | 8:c46930bd6c82 | 198 | do |
icraggs | 8:c46930bd6c82 | 199 | { |
icraggs | 8:c46930bd6c82 | 200 | int rc = MQTTPACKET_READ_ERROR; |
icraggs | 8:c46930bd6c82 | 201 | |
icraggs | 8:c46930bd6c82 | 202 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
icraggs | 8:c46930bd6c82 | 203 | { |
icraggs | 8:c46930bd6c82 | 204 | rc = MQTTPACKET_READ_ERROR; /* bad data: more than 4 length bytes */ |
icraggs | 8:c46930bd6c82 | 205 | goto exit; |
icraggs | 8:c46930bd6c82 | 206 | } |
icraggs | 8:c46930bd6c82 | 207 | rc = ipstack->read(&c, 1, timeout); |
icraggs | 8:c46930bd6c82 | 208 | if (rc != 1) |
icraggs | 8:c46930bd6c82 | 209 | goto exit; |
icraggs | 8:c46930bd6c82 | 210 | *value += (c & 127) * multiplier; /* low 7 bits carry data, least significant group first */ |
icraggs | 8:c46930bd6c82 | 211 | multiplier *= 128; |
icraggs | 8:c46930bd6c82 | 212 | } while ((c & 128) != 0); /* high bit set means another length byte follows */ |
icraggs | 8:c46930bd6c82 | 213 | exit: |
icraggs | 8:c46930bd6c82 | 214 | return len; /* NOTE(review): rc is local to the loop and never returned, so callers cannot tell a failed read from success */ |
icraggs | 8:c46930bd6c82 | 215 | } |
icraggs | 8:c46930bd6c82 | 216 | |
icraggs | 8:c46930bd6c82 | 217 | |
icraggs | 8:c46930bd6c82 | 218 | /** |
icraggs | 8:c46930bd6c82 | 219 | * Reads one MQTT packet into readbuf: the header byte, the remaining-length field, then the rest of the packet. |
icraggs | 8:c46930bd6c82 | 220 | * If any read fails in this method, then we should disconnect from the network, as on reconnect the packets can be retried. |
icraggs | 8:c46930bd6c82 | 221 | * @param timeout the max time to wait, in milliseconds; it is passed unchanged to each individual read |
icraggs | 8:c46930bd6c82 | 222 | * @return the MQTT packet type (header.bits.type), or -1 if the header byte or the packet body could not be read |
icraggs | 8:c46930bd6c82 | 223 | */ |
icraggs | 8:c46930bd6c82 | 224 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::readPacket(int timeout) |
icraggs | 8:c46930bd6c82 | 225 | { |
icraggs | 8:c46930bd6c82 | 226 | int rc = -1; |
icraggs | 8:c46930bd6c82 | 227 | MQTTHeader header = {0}; |
icraggs | 8:c46930bd6c82 | 228 | int len = 0; |
icraggs | 8:c46930bd6c82 | 229 | int rem_len = 0; |
icraggs | 8:c46930bd6c82 | 230 | |
icraggs | 8:c46930bd6c82 | 231 | /* 1. read the header byte. This has the packet type in it */ |
icraggs | 8:c46930bd6c82 | 232 | if (ipstack->read(readbuf, 1, timeout) != 1) |
icraggs | 8:c46930bd6c82 | 233 | goto exit; |
icraggs | 8:c46930bd6c82 | 234 | |
icraggs | 8:c46930bd6c82 | 235 | len = 1; |
icraggs | 8:c46930bd6c82 | 236 | /* 2. read the remaining length. This is variable in itself. NOTE(review): the result of decodePacket() is ignored here */ |
icraggs | 8:c46930bd6c82 | 237 | decodePacket(&rem_len, timeout); |
icraggs | 8:c46930bd6c82 | 238 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ |
icraggs | 8:c46930bd6c82 | 239 | |
icraggs | 8:c46930bd6c82 | 240 | /* 3. read the rest of the packet into readbuf. NOTE(review): rem_len is not checked against readbuflen before this read */ |
icraggs | 8:c46930bd6c82 | 241 | if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len) |
icraggs | 8:c46930bd6c82 | 242 | goto exit; |
icraggs | 8:c46930bd6c82 | 243 | |
icraggs | 8:c46930bd6c82 | 244 | header.byte = readbuf[0]; |
icraggs | 8:c46930bd6c82 | 245 | rc = header.bits.type; |
icraggs | 8:c46930bd6c82 | 246 | exit: |
icraggs | 8:c46930bd6c82 | 247 | return rc; |
icraggs | 3:dbff6b768d28 | 248 | } |
icraggs | 3:dbff6b768d28 | 249 | |
icraggs | 8:c46930bd6c82 | 250 | |
icraggs | 15:64a57183aa03 | 251 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::deliverMessage(MQTTString* topic, Message* message) |
icraggs | 15:64a57183aa03 | 252 | { /* called by cycle() for incoming QoS 0 PUBLISH packets. NOTE(review): the body is empty - messageHandlers is not consulted and there is no return statement although the return type is int */ |
icraggs | 15:64a57183aa03 | 253 | } |
icraggs | 15:64a57183aa03 | 254 | |
icraggs | 15:64a57183aa03 | 255 | |
Ian Craggs |
12:cc7f2d62a393 | 256 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout) |
icraggs | 8:c46930bd6c82 | 257 | { |
icraggs | 8:c46930bd6c82 | 258 | /* Read at most one packet (timeout is handed to readPacket), handle it, then run keepalive(). Returns the packet type from readPacket(), which is -1 when no packet was read. */ |
icraggs | 8:c46930bd6c82 | 259 | |
Ian Craggs |
12:cc7f2d62a393 | 260 | // read the socket, see what work is due |
icraggs | 9:01b8cc7d94cc | 261 | int packet_type = readPacket(timeout); |
icraggs | 8:c46930bd6c82 | 262 | |
icraggs | 8:c46930bd6c82 | 263 | printf("packet type %d\n", packet_type); /* printed on every call, including when packet_type is -1 */ |
icraggs | 15:64a57183aa03 | 264 | |
Ian Craggs |
12:cc7f2d62a393 | 265 | int len, rc; |
icraggs | 8:c46930bd6c82 | 266 | switch (packet_type) |
icraggs | 8:c46930bd6c82 | 267 | { |
icraggs | 15:64a57183aa03 | 268 | case CONNACK: |
icraggs | 15:64a57183aa03 | 269 | if (this->thread) /* only the background-thread path reports CONNACK through connectHandler; blocking connect() deserializes it itself */ |
icraggs | 15:64a57183aa03 | 270 | { |
icraggs | 15:64a57183aa03 | 271 | Result res = {this, 0}; |
Ian Craggs |
12:cc7f2d62a393 | 272 | if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1) |
icraggs | 15:64a57183aa03 | 273 | ; /* NOTE(review): empty statement - the deserialize result is not acted on and the handler runs either way */ |
icraggs | 15:64a57183aa03 | 274 | connectHandler(&res); |
icraggs | 15:64a57183aa03 | 275 | connectHandler.detach(); // only invoke the callback once |
icraggs | 15:64a57183aa03 | 276 | } /* no break here: control falls through to the PUBACK/SUBACK break below */ |
icraggs | 8:c46930bd6c82 | 277 | case PUBACK: |
icraggs | 8:c46930bd6c82 | 278 | case SUBACK: |
icraggs | 8:c46930bd6c82 | 279 | break; |
icraggs | 15:64a57183aa03 | 280 | case PUBLISH: |
icraggs | 15:64a57183aa03 | 281 | MQTTString topicName; |
icraggs | 15:64a57183aa03 | 282 | Message msg; |
icraggs | 15:64a57183aa03 | 283 | rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName, |
icraggs | 15:64a57183aa03 | 284 | (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen); /* NOTE(review): dup and retained are bool and id is unsigned short in Message but are passed as int* - if the callee stores full ints it writes past those fields; rc is not checked */ |
icraggs | 15:64a57183aa03 | 285 | if (msg.qos == QOS0) /* only QoS 0 messages are delivered; for QoS 1 and 2 nothing is delivered or acknowledged here */ |
icraggs | 15:64a57183aa03 | 286 | deliverMessage(&topicName, &msg); |
Ian Craggs |
12:cc7f2d62a393 | 287 | break; |
icraggs | 15:64a57183aa03 | 288 | case PUBREC: |
Ian Craggs |
12:cc7f2d62a393 | 289 | int type, dup, mypacketid; |
Ian Craggs |
12:cc7f2d62a393 | 290 | if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) |
icraggs | 15:64a57183aa03 | 291 | ; /* NOTE(review): empty statement - mypacketid is used below even if deserialization did not succeed */ |
icraggs | 15:64a57183aa03 | 292 | // must lock this access against the application thread, if we are multi-threaded |
Ian Craggs |
12:cc7f2d62a393 | 293 | len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid); |
icraggs | 15:64a57183aa03 | 294 | rc = sendPacket(len); // send the PUBREL packet |
icraggs | 15:64a57183aa03 | 295 | if (rc != len) |
icraggs | 15:64a57183aa03 | 296 | goto exit; // there was a problem; this jump also skips keepalive() |
Ian Craggs |
12:cc7f2d62a393 | 297 | |
icraggs | 8:c46930bd6c82 | 298 | break; |
icraggs | 8:c46930bd6c82 | 299 | case PUBCOMP: |
icraggs | 8:c46930bd6c82 | 300 | break; |
icraggs | 15:64a57183aa03 | 301 | case PINGRESP: |
icraggs | 15:64a57183aa03 | 302 | ping_outstanding = false; |
icraggs | 8:c46930bd6c82 | 303 | break; |
icraggs | 15:64a57183aa03 | 304 | } |
icraggs | 15:64a57183aa03 | 305 | keepalive(); /* its return value, including the -1 failure case, is ignored */ |
Ian Craggs |
12:cc7f2d62a393 | 306 | exit: |
icraggs | 8:c46930bd6c82 | 307 | return packet_type; |
icraggs | 15:64a57183aa03 | 308 | } |
icraggs | 15:64a57183aa03 | 309 | |
icraggs | 15:64a57183aa03 | 310 | |
Ian Craggs |
12:cc7f2d62a393 | 311 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive() |
icraggs | 15:64a57183aa03 | 312 | { /* sends a PINGREQ once ping_timer reaches the keep-alive interval. Returns 0 when nothing was due, the sent length when a ping went out, -1 when the send failed or an earlier ping is still unanswered */ |
icraggs | 15:64a57183aa03 | 313 | int rc = 0; |
icraggs | 15:64a57183aa03 | 314 | |
icraggs | 15:64a57183aa03 | 315 | if (keepAliveInterval == 0) /* an interval of 0 disables the check */ |
icraggs | 15:64a57183aa03 | 316 | goto exit; |
icraggs | 15:64a57183aa03 | 317 | |
icraggs | 15:64a57183aa03 | 318 | if (ping_timer.read_ms() >= (keepAliveInterval * 1000)) |
icraggs | 15:64a57183aa03 | 319 | { |
icraggs | 15:64a57183aa03 | 320 | if (ping_outstanding) |
icraggs | 15:64a57183aa03 | 321 | rc = -1; |
icraggs | 15:64a57183aa03 | 322 | else |
icraggs | 15:64a57183aa03 | 323 | { |
icraggs | 15:64a57183aa03 | 324 | int len = MQTTSerialize_pingreq(buf, buflen); |
icraggs | 15:64a57183aa03 | 325 | rc = sendPacket(len); // send the PINGREQ packet |
icraggs | 15:64a57183aa03 | 326 | if (rc != len) |
icraggs | 15:64a57183aa03 | 327 | rc = -1; // indicate there's a problem |
icraggs | 15:64a57183aa03 | 328 | else |
icraggs | 15:64a57183aa03 | 329 | ping_outstanding = true; |
icraggs | 15:64a57183aa03 | 330 | } |
icraggs | 15:64a57183aa03 | 331 | } |
icraggs | 15:64a57183aa03 | 332 | |
icraggs | 15:64a57183aa03 | 333 | exit: |
Ian Craggs |
12:cc7f2d62a393 | 334 | return rc; |
icraggs | 8:c46930bd6c82 | 335 | } |
icraggs | 8:c46930bd6c82 | 336 | |
icraggs | 8:c46930bd6c82 | 337 | |
icraggs | 8:c46930bd6c82 | 338 | template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument) |
icraggs | 15:64a57183aa03 | 339 | { /* background loop: calls cycle() forever and never returns; argument is unused */ |
icraggs | 15:64a57183aa03 | 340 | while (true) |
Ian Craggs |
12:cc7f2d62a393 | 341 | cycle((keepAliveInterval * 1000) - ping_timer.read_ms()); /* timeout = milliseconds left until the next keep-alive is due. NOTE(review): with keepAliveInterval == 0 this is 0 minus the elapsed time - confirm how readPacket treats such a value */ |
icraggs | 8:c46930bd6c82 | 342 | } |
icraggs | 8:c46930bd6c82 | 343 | |
icraggs | 8:c46930bd6c82 | 344 | |
icraggs | 9:01b8cc7d94cc | 345 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler) |
icraggs | 15:64a57183aa03 | 346 | { /* sends CONNECT. With no resultHandler it blocks until CONNACK or timeout; otherwise it attaches the handler and starts the background thread. Returns the CONNACK return code when one was deserialized, else the sendPacket() result */ |
icraggs | 15:64a57183aa03 | 347 | command_timer.start(); |
icraggs | 8:c46930bd6c82 | 348 | |
Ian Craggs |
12:cc7f2d62a393 | 349 | MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; |
icraggs | 8:c46930bd6c82 | 350 | if (options == 0) |
Ian Craggs |
12:cc7f2d62a393 | 351 | options = &default_options; // set default options if none were supplied |
icraggs | 8:c46930bd6c82 | 352 | |
icraggs | 15:64a57183aa03 | 353 | this->keepAliveInterval = options->keepAliveInterval; |
Ian Craggs |
12:cc7f2d62a393 | 354 | ping_timer.start(); |
Ian Craggs |
12:cc7f2d62a393 | 355 | int len = MQTTSerialize_connect(buf, buflen, options); |
icraggs | 15:64a57183aa03 | 356 | int rc = sendPacket(len); // send the connect packet |
icraggs | 15:64a57183aa03 | 357 | if (rc != len) |
Ian Craggs |
12:cc7f2d62a393 | 358 | goto exit; // there was a problem |
icraggs | 8:c46930bd6c82 | 359 | |
Ian Craggs |
12:cc7f2d62a393 | 360 | if (resultHandler == 0) // wait until the connack is received |
icraggs | 15:64a57183aa03 | 361 | { |
icraggs | 8:c46930bd6c82 | 362 | // this will be a blocking call, wait for the connack |
icraggs | 15:64a57183aa03 | 363 | do |
icraggs | 8:c46930bd6c82 | 364 | { |
icraggs | 15:64a57183aa03 | 365 | if (command_timer.read_ms() > (command_timeout * 1000)) |
icraggs | 15:64a57183aa03 | 366 | goto exit; // we timed out; rc still holds the sendPacket() result at this point |
icraggs | 15:64a57183aa03 | 367 | } |
icraggs | 15:64a57183aa03 | 368 | while (cycle(command_timeout - command_timer.read_ms()) != CONNACK); /* NOTE(review): command_timeout is in seconds but read_ms() is milliseconds; the check above multiplies by 1000, this subtraction does not */ |
icraggs | 15:64a57183aa03 | 369 | int connack_rc = -1; |
icraggs | 15:64a57183aa03 | 370 | if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1) |
icraggs | 15:64a57183aa03 | 371 | rc = connack_rc; |
icraggs | 8:c46930bd6c82 | 372 | } |
icraggs | 8:c46930bd6c82 | 373 | else |
icraggs | 8:c46930bd6c82 | 374 | { |
icraggs | 8:c46930bd6c82 | 375 | // set connect response callback function |
icraggs | 9:01b8cc7d94cc | 376 | connectHandler.attach(resultHandler); |
icraggs | 8:c46930bd6c82 | 377 | |
Ian Craggs |
12:cc7f2d62a393 | 378 | // start background thread; it runs threadfn with this client as its argument |
icraggs | 11:db15da110a37 | 379 | this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this); |
icraggs | 8:c46930bd6c82 | 380 | } |
icraggs | 15:64a57183aa03 | 381 | |
icraggs | 15:64a57183aa03 | 382 | exit: |
icraggs | 15:64a57183aa03 | 383 | command_timer.stop(); |
Ian Craggs |
12:cc7f2d62a393 | 384 | command_timer.reset(); |
icraggs | 8:c46930bd6c82 | 385 | return rc; |
icraggs | 8:c46930bd6c82 | 386 | } |
icraggs | 8:c46930bd6c82 | 387 | |
icraggs | 8:c46930bd6c82 | 388 | |
Ian Craggs |
12:cc7f2d62a393 | 389 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler) |
icraggs | 15:64a57183aa03 | 390 | { /* sends SUBSCRIBE for one topic filter. Returns the granted QoS when a SUBACK was read and deserialized, else the sendPacket() result. NOTE(review): messageHandler is never stored or used in this body */ |
icraggs | 15:64a57183aa03 | 391 | command_timer.start(); |
Ian Craggs |
12:cc7f2d62a393 | 392 | |
icraggs | 8:c46930bd6c82 | 393 | MQTTString topic = {(char*)topicFilter, 0, 0}; |
icraggs | 8:c46930bd6c82 | 394 | |
Ian Craggs |
12:cc7f2d62a393 | 395 | int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
icraggs | 15:64a57183aa03 | 396 | int rc = sendPacket(len); // send the subscribe packet |
icraggs | 15:64a57183aa03 | 397 | if (rc != len) |
Ian Craggs |
12:cc7f2d62a393 | 398 | goto exit; // there was a problem |
icraggs | 8:c46930bd6c82 | 399 | |
icraggs | 8:c46930bd6c82 | 400 | /* wait for suback */ |
icraggs | 8:c46930bd6c82 | 401 | if (resultHandler == 0) |
icraggs | 8:c46930bd6c82 | 402 | { |
icraggs | 8:c46930bd6c82 | 403 | // this will block; cycle() is called once only, so any packet other than SUBACK leaves rc as the sent length |
Ian Craggs |
12:cc7f2d62a393 | 404 | if (cycle(command_timeout - command_timer.read_ms()) == SUBACK) |
icraggs | 8:c46930bd6c82 | 405 | { |
icraggs | 9:01b8cc7d94cc | 406 | int count = 0, grantedQoS = -1, mypacketid; |
icraggs | 9:01b8cc7d94cc | 407 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1) |
icraggs | 8:c46930bd6c82 | 408 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
icraggs | 8:c46930bd6c82 | 409 | } |
icraggs | 8:c46930bd6c82 | 410 | } |
icraggs | 8:c46930bd6c82 | 411 | else |
icraggs | 8:c46930bd6c82 | 412 | { |
icraggs | 8:c46930bd6c82 | 413 | // set subscribe response callback function (not implemented yet: this branch is empty) |
icraggs | 8:c46930bd6c82 | 414 | |
icraggs | 8:c46930bd6c82 | 415 | } |
icraggs | 15:64a57183aa03 | 416 | |
icraggs | 15:64a57183aa03 | 417 | exit: |
icraggs | 15:64a57183aa03 | 418 | command_timer.stop(); |
Ian Craggs |
12:cc7f2d62a393 | 419 | command_timer.reset(); |
Ian Craggs |
12:cc7f2d62a393 | 420 | return rc; |
icraggs | 15:64a57183aa03 | 421 | } |
icraggs | 15:64a57183aa03 | 422 | |
icraggs | 15:64a57183aa03 | 423 | |
Ian Craggs |
12:cc7f2d62a393 | 424 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler) |
icraggs | 15:64a57183aa03 | 425 | { /* sends UNSUBSCRIBE for one topic filter. Returns 0 when an UNSUBACK was read and deserialized, else the sendPacket() result */ |
icraggs | 15:64a57183aa03 | 426 | command_timer.start(); |
Ian Craggs |
12:cc7f2d62a393 | 427 | |
Ian Craggs |
12:cc7f2d62a393 | 428 | MQTTString topic = {(char*)topicFilter, 0, 0}; |
icraggs | 8:c46930bd6c82 | 429 | |
Ian Craggs |
12:cc7f2d62a393 | 430 | int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic); |
icraggs | 15:64a57183aa03 | 431 | int rc = sendPacket(len); // send the unsubscribe packet |
icraggs | 15:64a57183aa03 | 432 | if (rc != len) |
Ian Craggs |
12:cc7f2d62a393 | 433 | goto exit; // there was a problem |
Ian Craggs |
12:cc7f2d62a393 | 434 | |
Ian Craggs |
12:cc7f2d62a393 | 435 | /* wait for unsuback */ |
Ian Craggs |
12:cc7f2d62a393 | 436 | if (resultHandler == 0) |
Ian Craggs |
12:cc7f2d62a393 | 437 | { |
Ian Craggs |
12:cc7f2d62a393 | 438 | // this will block; cycle() is called once only, so any packet other than UNSUBACK leaves rc as the sent length |
Ian Craggs |
12:cc7f2d62a393 | 439 | if (cycle(command_timeout - command_timer.read_ms()) == UNSUBACK) |
Ian Craggs |
12:cc7f2d62a393 | 440 | { |
Ian Craggs |
12:cc7f2d62a393 | 441 | int mypacketid; |
Ian Craggs |
12:cc7f2d62a393 | 442 | if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1) |
Ian Craggs |
12:cc7f2d62a393 | 443 | rc = 0; |
Ian Craggs |
12:cc7f2d62a393 | 444 | } |
Ian Craggs |
12:cc7f2d62a393 | 445 | } |
Ian Craggs |
12:cc7f2d62a393 | 446 | else |
Ian Craggs |
12:cc7f2d62a393 | 447 | { |
Ian Craggs |
12:cc7f2d62a393 | 448 | // set unsubscribe response callback function (not implemented yet: this branch is empty) |
Ian Craggs |
12:cc7f2d62a393 | 449 | |
Ian Craggs |
12:cc7f2d62a393 | 450 | } |
icraggs | 15:64a57183aa03 | 451 | |
icraggs | 15:64a57183aa03 | 452 | exit: |
icraggs | 15:64a57183aa03 | 453 | command_timer.stop(); |
Ian Craggs |
12:cc7f2d62a393 | 454 | command_timer.reset(); |
Ian Craggs |
12:cc7f2d62a393 | 455 | return rc; |
icraggs | 15:64a57183aa03 | 456 | } |
icraggs | 15:64a57183aa03 | 457 | |
icraggs | 15:64a57183aa03 | 458 | |
icraggs | 15:64a57183aa03 | 459 | |
Ian Craggs |
12:cc7f2d62a393 | 460 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler) |
icraggs | 15:64a57183aa03 | 461 | { /* sends PUBLISH for message on topicName. Returns 0 when the matching ack (PUBACK for QoS 1, PUBCOMP for QoS 2) was read and deserialized, else the sendPacket() result - which is also what QoS 0 returns */ |
icraggs | 15:64a57183aa03 | 462 | command_timer.start(); |
Ian Craggs |
12:cc7f2d62a393 | 463 | |
icraggs | 15:64a57183aa03 | 464 | MQTTString topic = {(char*)topicName, 0, 0}; |
icraggs | 15:64a57183aa03 | 465 | |
Ian Craggs |
12:cc7f2d62a393 | 466 | message->id = packetid.getNext(); |
icraggs | 15:64a57183aa03 | 467 | |
Ian Craggs |
12:cc7f2d62a393 | 468 | int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen); |
icraggs | 15:64a57183aa03 | 469 | int rc = sendPacket(len); // send the publish packet |
icraggs | 15:64a57183aa03 | 470 | if (rc != len) |
Ian Craggs |
12:cc7f2d62a393 | 471 | goto exit; // there was a problem |
Ian Craggs |
12:cc7f2d62a393 | 472 | |
Ian Craggs |
12:cc7f2d62a393 | 473 | /* wait for acks; each branch calls cycle() once only, so any other packet type leaves rc as the sent length */ |
Ian Craggs |
12:cc7f2d62a393 | 474 | if (resultHandler == 0) |
Ian Craggs |
12:cc7f2d62a393 | 475 | { |
icraggs | 15:64a57183aa03 | 476 | if (message->qos == QOS1) |
Ian Craggs |
12:cc7f2d62a393 | 477 | { |
Ian Craggs |
12:cc7f2d62a393 | 478 | if (cycle(command_timeout - command_timer.read_ms()) == PUBACK) |
Ian Craggs |
12:cc7f2d62a393 | 479 | { |
Ian Craggs |
12:cc7f2d62a393 | 480 | int type, dup, mypacketid; |
Ian Craggs |
12:cc7f2d62a393 | 481 | if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) |
Ian Craggs |
12:cc7f2d62a393 | 482 | rc = 0; |
icraggs | 15:64a57183aa03 | 483 | } |
icraggs | 15:64a57183aa03 | 484 | } |
icraggs | 15:64a57183aa03 | 485 | else if (message->qos == QOS2) |
icraggs | 15:64a57183aa03 | 486 | { |
icraggs | 15:64a57183aa03 | 487 | if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP) /* NOTE(review): a single cycle() reads one packet, and cycle() answers PUBREC itself, so the PUBCOMP would normally arrive on a later cycle - confirm */ |
icraggs | 15:64a57183aa03 | 488 | { |
icraggs | 15:64a57183aa03 | 489 | int type, dup, mypacketid; |
icraggs | 15:64a57183aa03 | 490 | if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1) |
icraggs | 15:64a57183aa03 | 491 | rc = 0; |
icraggs | 15:64a57183aa03 | 492 | } |
icraggs | 15:64a57183aa03 | 493 | |
Ian Craggs |
12:cc7f2d62a393 | 494 | } |
Ian Craggs |
12:cc7f2d62a393 | 495 | } |
Ian Craggs |
12:cc7f2d62a393 | 496 | else |
Ian Craggs |
12:cc7f2d62a393 | 497 | { |
Ian Craggs |
12:cc7f2d62a393 | 498 | // set publish response callback function (not implemented yet: this branch is empty) |
Ian Craggs |
12:cc7f2d62a393 | 499 | |
Ian Craggs |
12:cc7f2d62a393 | 500 | } |
icraggs | 15:64a57183aa03 | 501 | |
icraggs | 15:64a57183aa03 | 502 | exit: |
icraggs | 15:64a57183aa03 | 503 | command_timer.stop(); |
Ian Craggs |
12:cc7f2d62a393 | 504 | command_timer.reset(); |
icraggs | 8:c46930bd6c82 | 505 | return rc; |
icraggs | 8:c46930bd6c82 | 506 | } |
icraggs | 8:c46930bd6c82 | 507 | |
icraggs | 8:c46930bd6c82 | 508 | |
sam_grove | 0:fe461e4d7afe | 509 | #endif |