Contains example code to connect the mbed LPC1768 or FRDM-K64F devices to the IBM Internet of Things Cloud service via ethernet.

Dependencies:   C12832 MQTT LM75B MMA7660

Dependents:   MFT_IoT_demo_USB400 IBM_RFID

Committer:
samdanbury
Date:
Wed Aug 20 12:45:14 2014 +0000
Revision:
6:37b6d0d56190
Child:
8:80d49dd91542
Code completely changed to improve the structure, flow and memory usage of the application

Who changed what in which revision?

UserRevisionLine numberNew contents of line
samdanbury 6:37b6d0d56190 1 /*******************************************************************************
samdanbury 6:37b6d0d56190 2 * Copyright (c) 2014 IBM Corp.
samdanbury 6:37b6d0d56190 3 *
samdanbury 6:37b6d0d56190 4 * All rights reserved. This program and the accompanying materials
samdanbury 6:37b6d0d56190 5 * are made available under the terms of the Eclipse Public License v1.0
samdanbury 6:37b6d0d56190 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
samdanbury 6:37b6d0d56190 7 *
samdanbury 6:37b6d0d56190 8 * The Eclipse Public License is available at
samdanbury 6:37b6d0d56190 9 * http://www.eclipse.org/legal/epl-v10.html
samdanbury 6:37b6d0d56190 10 * and the Eclipse Distribution License is available at
samdanbury 6:37b6d0d56190 11 * http://www.eclipse.org/org/documents/edl-v10.php.
samdanbury 6:37b6d0d56190 12 *
samdanbury 6:37b6d0d56190 13 * Contributors:
samdanbury 6:37b6d0d56190 14 * Ian Craggs - initial API and implementation and/or initial documentation
samdanbury 6:37b6d0d56190 15 *******************************************************************************/
samdanbury 6:37b6d0d56190 16
samdanbury 6:37b6d0d56190 17 /*
samdanbury 6:37b6d0d56190 18
samdanbury 6:37b6d0d56190 19 TODO:
samdanbury 6:37b6d0d56190 20
samdanbury 6:37b6d0d56190 21 ensure publish packets are retried on reconnect
samdanbury 6:37b6d0d56190 22
samdanbury 6:37b6d0d56190 23 */
samdanbury 6:37b6d0d56190 24
samdanbury 6:37b6d0d56190 25 #if !defined(MQTTCLIENT_H)
samdanbury 6:37b6d0d56190 26 #define MQTTCLIENT_H
samdanbury 6:37b6d0d56190 27
samdanbury 6:37b6d0d56190 28 #include "FP.h"
samdanbury 6:37b6d0d56190 29 #include "MQTTPacket.h"
samdanbury 6:37b6d0d56190 30 #include "stdio.h"
samdanbury 6:37b6d0d56190 31 #include "MQTT_logging.h"
samdanbury 6:37b6d0d56190 32
samdanbury 6:37b6d0d56190 33 namespace MQTT
samdanbury 6:37b6d0d56190 34 {
samdanbury 6:37b6d0d56190 35
samdanbury 6:37b6d0d56190 36
samdanbury 6:37b6d0d56190 37 enum QoS { QOS0, QOS1, QOS2 };
samdanbury 6:37b6d0d56190 38
samdanbury 6:37b6d0d56190 39 // all failure return codes must be negative
samdanbury 6:37b6d0d56190 40 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
samdanbury 6:37b6d0d56190 41
samdanbury 6:37b6d0d56190 42
samdanbury 6:37b6d0d56190 43 struct Message
samdanbury 6:37b6d0d56190 44 {
samdanbury 6:37b6d0d56190 45 enum QoS qos;
samdanbury 6:37b6d0d56190 46 bool retained;
samdanbury 6:37b6d0d56190 47 bool dup;
samdanbury 6:37b6d0d56190 48 unsigned short id;
samdanbury 6:37b6d0d56190 49 void *payload;
samdanbury 6:37b6d0d56190 50 size_t payloadlen;
samdanbury 6:37b6d0d56190 51 };
samdanbury 6:37b6d0d56190 52
samdanbury 6:37b6d0d56190 53
samdanbury 6:37b6d0d56190 54 struct MessageData
samdanbury 6:37b6d0d56190 55 {
samdanbury 6:37b6d0d56190 56 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
samdanbury 6:37b6d0d56190 57 { }
samdanbury 6:37b6d0d56190 58
samdanbury 6:37b6d0d56190 59 struct Message &message;
samdanbury 6:37b6d0d56190 60 MQTTString &topicName;
samdanbury 6:37b6d0d56190 61 };
samdanbury 6:37b6d0d56190 62
samdanbury 6:37b6d0d56190 63
samdanbury 6:37b6d0d56190 64 class PacketId
samdanbury 6:37b6d0d56190 65 {
samdanbury 6:37b6d0d56190 66 public:
samdanbury 6:37b6d0d56190 67 PacketId()
samdanbury 6:37b6d0d56190 68 {
samdanbury 6:37b6d0d56190 69 next = 0;
samdanbury 6:37b6d0d56190 70 }
samdanbury 6:37b6d0d56190 71
samdanbury 6:37b6d0d56190 72 int getNext()
samdanbury 6:37b6d0d56190 73 {
samdanbury 6:37b6d0d56190 74 return next = (next == MAX_PACKET_ID) ? 1 : ++next;
samdanbury 6:37b6d0d56190 75 }
samdanbury 6:37b6d0d56190 76
samdanbury 6:37b6d0d56190 77 private:
samdanbury 6:37b6d0d56190 78 static const int MAX_PACKET_ID = 65535;
samdanbury 6:37b6d0d56190 79 int next;
samdanbury 6:37b6d0d56190 80 };
samdanbury 6:37b6d0d56190 81
samdanbury 6:37b6d0d56190 82
samdanbury 6:37b6d0d56190 83 class QoS2
samdanbury 6:37b6d0d56190 84 {
samdanbury 6:37b6d0d56190 85 public:
samdanbury 6:37b6d0d56190 86
samdanbury 6:37b6d0d56190 87
samdanbury 6:37b6d0d56190 88 private:
samdanbury 6:37b6d0d56190 89
samdanbury 6:37b6d0d56190 90
samdanbury 6:37b6d0d56190 91 };
samdanbury 6:37b6d0d56190 92
samdanbury 6:37b6d0d56190 93
samdanbury 6:37b6d0d56190 94 /**
samdanbury 6:37b6d0d56190 95 * @class Client
samdanbury 6:37b6d0d56190 96 * @brief blocking, non-threaded MQTT client API
samdanbury 6:37b6d0d56190 97 *
samdanbury 6:37b6d0d56190 98 * This version of the API blocks on all method calls, until they are complete. This means that only one
samdanbury 6:37b6d0d56190 99 * MQTT request can be in process at any one time.
samdanbury 6:37b6d0d56190 100 * @param Network a network class which supports send, receive
samdanbury 6:37b6d0d56190 101 * @param Timer a timer class with the methods:
samdanbury 6:37b6d0d56190 102 */
samdanbury 6:37b6d0d56190 103 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
samdanbury 6:37b6d0d56190 104 class Client
samdanbury 6:37b6d0d56190 105 {
samdanbury 6:37b6d0d56190 106
samdanbury 6:37b6d0d56190 107 public:
samdanbury 6:37b6d0d56190 108
samdanbury 6:37b6d0d56190 109 typedef void (*messageHandler)(MessageData&);
samdanbury 6:37b6d0d56190 110
samdanbury 6:37b6d0d56190 111 /** Construct the client
samdanbury 6:37b6d0d56190 112 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
samdanbury 6:37b6d0d56190 113 * before calling MQTT connect
samdanbury 6:37b6d0d56190 114 * @param limits an instance of the Limit class - to alter limits as required
samdanbury 6:37b6d0d56190 115 */
samdanbury 6:37b6d0d56190 116 Client(Network& network, unsigned int command_timeout_ms = 30000);
samdanbury 6:37b6d0d56190 117
samdanbury 6:37b6d0d56190 118 /** Set the default message handling callback - used for any message which does not match a subscription message handler
samdanbury 6:37b6d0d56190 119 * @param mh - pointer to the callback function
samdanbury 6:37b6d0d56190 120 */
samdanbury 6:37b6d0d56190 121 void setDefaultMessageHandler(messageHandler mh)
samdanbury 6:37b6d0d56190 122 {
samdanbury 6:37b6d0d56190 123 defaultMessageHandler.attach(mh);
samdanbury 6:37b6d0d56190 124 }
samdanbury 6:37b6d0d56190 125
samdanbury 6:37b6d0d56190 126 void setLogHandler()
samdanbury 6:37b6d0d56190 127 {
samdanbury 6:37b6d0d56190 128 logHandler.attach(lh);
samdanbury 6:37b6d0d56190 129 }
samdanbury 6:37b6d0d56190 130
samdanbury 6:37b6d0d56190 131
samdanbury 6:37b6d0d56190 132 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
samdanbury 6:37b6d0d56190 133 * The nework object must be connected to the network endpoint before calling this
samdanbury 6:37b6d0d56190 134 * @param options - connect options
samdanbury 6:37b6d0d56190 135 * @return success code -
samdanbury 6:37b6d0d56190 136 */
samdanbury 6:37b6d0d56190 137 int connect(MQTTPacket_connectData* options = 0);
samdanbury 6:37b6d0d56190 138
samdanbury 6:37b6d0d56190 139 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
samdanbury 6:37b6d0d56190 140 * @param topic - the topic to publish to
samdanbury 6:37b6d0d56190 141 * @param message - the message to send
samdanbury 6:37b6d0d56190 142 * @return success code -
samdanbury 6:37b6d0d56190 143 */
samdanbury 6:37b6d0d56190 144 int publish(const char* topicName, Message* message);
samdanbury 6:37b6d0d56190 145
samdanbury 6:37b6d0d56190 146 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
samdanbury 6:37b6d0d56190 147 * @param topicFilter - a topic pattern which can include wildcards
samdanbury 6:37b6d0d56190 148 * @param qos - the MQTT QoS to subscribe at
samdanbury 6:37b6d0d56190 149 * @param mh - the callback function to be invoked when a message is received for this subscription
samdanbury 6:37b6d0d56190 150 * @return success code -
samdanbury 6:37b6d0d56190 151 */
samdanbury 6:37b6d0d56190 152 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
samdanbury 6:37b6d0d56190 153
samdanbury 6:37b6d0d56190 154 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
samdanbury 6:37b6d0d56190 155 * @param topicFilter - a topic pattern which can include wildcards
samdanbury 6:37b6d0d56190 156 * @return success code -
samdanbury 6:37b6d0d56190 157 */
samdanbury 6:37b6d0d56190 158 int unsubscribe(const char* topicFilter);
samdanbury 6:37b6d0d56190 159
samdanbury 6:37b6d0d56190 160 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
samdanbury 6:37b6d0d56190 161 * @return success code -
samdanbury 6:37b6d0d56190 162 */
samdanbury 6:37b6d0d56190 163 int disconnect();
samdanbury 6:37b6d0d56190 164
samdanbury 6:37b6d0d56190 165 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
samdanbury 6:37b6d0d56190 166 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
samdanbury 6:37b6d0d56190 167 * received.
samdanbury 6:37b6d0d56190 168 * @param timeout_ms the time to wait, in milliseconds
samdanbury 6:37b6d0d56190 169 * @return success code - on failure, this means the client has disconnected
samdanbury 6:37b6d0d56190 170 */
samdanbury 6:37b6d0d56190 171 int yield(int timeout_ms = 1000);
samdanbury 6:37b6d0d56190 172
samdanbury 6:37b6d0d56190 173 private:
samdanbury 6:37b6d0d56190 174
samdanbury 6:37b6d0d56190 175 int cycle(Timer& timer);
samdanbury 6:37b6d0d56190 176 int waitfor(int packet_type, Timer& timer);
samdanbury 6:37b6d0d56190 177 int keepalive();
samdanbury 6:37b6d0d56190 178
samdanbury 6:37b6d0d56190 179 int decodePacket(int* value, int timeout);
samdanbury 6:37b6d0d56190 180 int readPacket(Timer& timer);
samdanbury 6:37b6d0d56190 181 int sendPacket(int length, Timer& timer);
samdanbury 6:37b6d0d56190 182 int deliverMessage(MQTTString& topicName, Message& message);
samdanbury 6:37b6d0d56190 183 bool isTopicMatched(char* topicFilter, MQTTString& topicName);
samdanbury 6:37b6d0d56190 184
samdanbury 6:37b6d0d56190 185 Network& ipstack;
samdanbury 6:37b6d0d56190 186 unsigned int command_timeout_ms;
samdanbury 6:37b6d0d56190 187
samdanbury 6:37b6d0d56190 188 unsigned char buf[MAX_MQTT_PACKET_SIZE];
samdanbury 6:37b6d0d56190 189 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
samdanbury 6:37b6d0d56190 190
samdanbury 6:37b6d0d56190 191 Timer ping_timer;
samdanbury 6:37b6d0d56190 192 unsigned int keepAliveInterval;
samdanbury 6:37b6d0d56190 193 bool ping_outstanding;
samdanbury 6:37b6d0d56190 194
samdanbury 6:37b6d0d56190 195 PacketId packetid;
samdanbury 6:37b6d0d56190 196
samdanbury 6:37b6d0d56190 197 struct MessageHandlers
samdanbury 6:37b6d0d56190 198 {
samdanbury 6:37b6d0d56190 199 const char* topicFilter;
samdanbury 6:37b6d0d56190 200 FP<void, MessageData&> fp;
samdanbury 6:37b6d0d56190 201 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
samdanbury 6:37b6d0d56190 202
samdanbury 6:37b6d0d56190 203 FP<void, MessageData&> defaultMessageHandler;
samdanbury 6:37b6d0d56190 204
samdanbury 6:37b6d0d56190 205 bool isconnected;
samdanbury 6:37b6d0d56190 206
samdanbury 6:37b6d0d56190 207 #if 0
samdanbury 6:37b6d0d56190 208 struct
samdanbury 6:37b6d0d56190 209 {
samdanbury 6:37b6d0d56190 210 bool used;
samdanbury 6:37b6d0d56190 211 int id;
samdanbury 6:37b6d0d56190 212 } QoS2messages[MAX_QOS2_MESSAGES];
samdanbury 6:37b6d0d56190 213
samdanbury 6:37b6d0d56190 214 #endif
samdanbury 6:37b6d0d56190 215
samdanbury 6:37b6d0d56190 216 };
samdanbury 6:37b6d0d56190 217
samdanbury 6:37b6d0d56190 218 }
samdanbury 6:37b6d0d56190 219
samdanbury 6:37b6d0d56190 220
samdanbury 6:37b6d0d56190 221 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
samdanbury 6:37b6d0d56190 222 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
samdanbury 6:37b6d0d56190 223 {
samdanbury 6:37b6d0d56190 224 ping_timer = Timer();
samdanbury 6:37b6d0d56190 225 ping_outstanding = 0;
samdanbury 6:37b6d0d56190 226 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
samdanbury 6:37b6d0d56190 227 messageHandlers[i].topicFilter = 0;
samdanbury 6:37b6d0d56190 228 this->command_timeout_ms = command_timeout_ms;
samdanbury 6:37b6d0d56190 229 isconnected = false;
samdanbury 6:37b6d0d56190 230 }
samdanbury 6:37b6d0d56190 231
samdanbury 6:37b6d0d56190 232
samdanbury 6:37b6d0d56190 233 template<class Network, class Timer, int a, int b>
samdanbury 6:37b6d0d56190 234 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
samdanbury 6:37b6d0d56190 235 {
samdanbury 6:37b6d0d56190 236 int rc = FAILURE,
samdanbury 6:37b6d0d56190 237 sent = 0;
samdanbury 6:37b6d0d56190 238
samdanbury 6:37b6d0d56190 239 while (sent < length && !timer.expired())
samdanbury 6:37b6d0d56190 240 {
samdanbury 6:37b6d0d56190 241 rc = ipstack.write(&buf[sent], length, timer.left_ms());
samdanbury 6:37b6d0d56190 242 if (rc < 0) // there was an error writing the data
samdanbury 6:37b6d0d56190 243 break;
samdanbury 6:37b6d0d56190 244 sent += rc;
samdanbury 6:37b6d0d56190 245 }
samdanbury 6:37b6d0d56190 246 if (sent == length)
samdanbury 6:37b6d0d56190 247 {
samdanbury 6:37b6d0d56190 248 ping_timer.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
samdanbury 6:37b6d0d56190 249 rc = SUCCESS;
samdanbury 6:37b6d0d56190 250 //if (debug)
samdanbury 6:37b6d0d56190 251 // Log (packet)
samdanbury 6:37b6d0d56190 252 }
samdanbury 6:37b6d0d56190 253 else
samdanbury 6:37b6d0d56190 254 rc = FAILURE;
samdanbury 6:37b6d0d56190 255 return rc;
samdanbury 6:37b6d0d56190 256 }
samdanbury 6:37b6d0d56190 257
samdanbury 6:37b6d0d56190 258
samdanbury 6:37b6d0d56190 259 template<class Network, class Timer, int a, int b>
samdanbury 6:37b6d0d56190 260 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
samdanbury 6:37b6d0d56190 261 {
samdanbury 6:37b6d0d56190 262 unsigned char c;
samdanbury 6:37b6d0d56190 263 int multiplier = 1;
samdanbury 6:37b6d0d56190 264 int len = 0;
samdanbury 6:37b6d0d56190 265 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
samdanbury 6:37b6d0d56190 266
samdanbury 6:37b6d0d56190 267 *value = 0;
samdanbury 6:37b6d0d56190 268 do
samdanbury 6:37b6d0d56190 269 {
samdanbury 6:37b6d0d56190 270 int rc = MQTTPACKET_READ_ERROR;
samdanbury 6:37b6d0d56190 271
samdanbury 6:37b6d0d56190 272 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
samdanbury 6:37b6d0d56190 273 {
samdanbury 6:37b6d0d56190 274 rc = MQTTPACKET_READ_ERROR; /* bad data */
samdanbury 6:37b6d0d56190 275 goto exit;
samdanbury 6:37b6d0d56190 276 }
samdanbury 6:37b6d0d56190 277 rc = ipstack.read(&c, 1, timeout);
samdanbury 6:37b6d0d56190 278 if (rc != 1)
samdanbury 6:37b6d0d56190 279 goto exit;
samdanbury 6:37b6d0d56190 280 *value += (c & 127) * multiplier;
samdanbury 6:37b6d0d56190 281 multiplier *= 128;
samdanbury 6:37b6d0d56190 282 } while ((c & 128) != 0);
samdanbury 6:37b6d0d56190 283 exit:
samdanbury 6:37b6d0d56190 284 return len;
samdanbury 6:37b6d0d56190 285 }
samdanbury 6:37b6d0d56190 286
samdanbury 6:37b6d0d56190 287
samdanbury 6:37b6d0d56190 288 /**
samdanbury 6:37b6d0d56190 289 * If any read fails in this method, then we should disconnect from the network, as on reconnect
samdanbury 6:37b6d0d56190 290 * the packets can be retried.
samdanbury 6:37b6d0d56190 291 * @param timeout the max time to wait for the packet read to complete, in milliseconds
samdanbury 6:37b6d0d56190 292 * @return the MQTT packet type, or -1 if none
samdanbury 6:37b6d0d56190 293 */
samdanbury 6:37b6d0d56190 294 template<class Network, class Timer, int a, int b>
samdanbury 6:37b6d0d56190 295 int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer)
samdanbury 6:37b6d0d56190 296 {
samdanbury 6:37b6d0d56190 297 int rc = FAILURE;
samdanbury 6:37b6d0d56190 298 MQTTHeader header = {0};
samdanbury 6:37b6d0d56190 299 int len = 0;
samdanbury 6:37b6d0d56190 300 int rem_len = 0;
samdanbury 6:37b6d0d56190 301
samdanbury 6:37b6d0d56190 302 /* 1. read the header byte. This has the packet type in it */
samdanbury 6:37b6d0d56190 303 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
samdanbury 6:37b6d0d56190 304 goto exit;
samdanbury 6:37b6d0d56190 305
samdanbury 6:37b6d0d56190 306 len = 1;
samdanbury 6:37b6d0d56190 307 /* 2. read the remaining length. This is variable in itself */
samdanbury 6:37b6d0d56190 308 decodePacket(&rem_len, timer.left_ms());
samdanbury 6:37b6d0d56190 309 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
samdanbury 6:37b6d0d56190 310
samdanbury 6:37b6d0d56190 311 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
samdanbury 6:37b6d0d56190 312 if (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)
samdanbury 6:37b6d0d56190 313 goto exit;
samdanbury 6:37b6d0d56190 314
samdanbury 6:37b6d0d56190 315 header.byte = readbuf[0];
samdanbury 6:37b6d0d56190 316 rc = header.bits.type;
samdanbury 6:37b6d0d56190 317 exit:
samdanbury 6:37b6d0d56190 318 return rc;
samdanbury 6:37b6d0d56190 319 }
samdanbury 6:37b6d0d56190 320
samdanbury 6:37b6d0d56190 321
samdanbury 6:37b6d0d56190 322 // assume topic filter and name is in correct format
samdanbury 6:37b6d0d56190 323 // # can only be at end
samdanbury 6:37b6d0d56190 324 // + and # can only be next to separator
samdanbury 6:37b6d0d56190 325 template<class Network, class Timer, int a, int b>
samdanbury 6:37b6d0d56190 326 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
samdanbury 6:37b6d0d56190 327 {
samdanbury 6:37b6d0d56190 328 char* curf = topicFilter;
samdanbury 6:37b6d0d56190 329 char* curn = topicName.lenstring.data;
samdanbury 6:37b6d0d56190 330 char* curn_end = curn + topicName.lenstring.len;
samdanbury 6:37b6d0d56190 331
samdanbury 6:37b6d0d56190 332 while (*curf && curn < curn_end)
samdanbury 6:37b6d0d56190 333 {
samdanbury 6:37b6d0d56190 334 if (*curn == '/' && *curf != '/')
samdanbury 6:37b6d0d56190 335 break;
samdanbury 6:37b6d0d56190 336 if (*curf != '+' && *curf != '#' && *curf != *curn)
samdanbury 6:37b6d0d56190 337 break;
samdanbury 6:37b6d0d56190 338 if (*curf == '+')
samdanbury 6:37b6d0d56190 339 { // skip until we meet the next separator, or end of string
samdanbury 6:37b6d0d56190 340 char* nextpos = curn + 1;
samdanbury 6:37b6d0d56190 341 while (nextpos < curn_end && *nextpos != '/')
samdanbury 6:37b6d0d56190 342 nextpos = ++curn + 1;
samdanbury 6:37b6d0d56190 343 }
samdanbury 6:37b6d0d56190 344 else if (*curf == '#')
samdanbury 6:37b6d0d56190 345 curn = curn_end - 1; // skip until end of string
samdanbury 6:37b6d0d56190 346 curf++;
samdanbury 6:37b6d0d56190 347 curn++;
samdanbury 6:37b6d0d56190 348 };
samdanbury 6:37b6d0d56190 349
samdanbury 6:37b6d0d56190 350 return (curn == curn_end) && (*curf == '\0');
samdanbury 6:37b6d0d56190 351 }
samdanbury 6:37b6d0d56190 352
samdanbury 6:37b6d0d56190 353
samdanbury 6:37b6d0d56190 354
samdanbury 6:37b6d0d56190 355 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
samdanbury 6:37b6d0d56190 356 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
samdanbury 6:37b6d0d56190 357 {
samdanbury 6:37b6d0d56190 358 int rc = FAILURE;
samdanbury 6:37b6d0d56190 359
samdanbury 6:37b6d0d56190 360 // we have to find the right message handler - indexed by topic
samdanbury 6:37b6d0d56190 361 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
samdanbury 6:37b6d0d56190 362 {
samdanbury 6:37b6d0d56190 363 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
samdanbury 6:37b6d0d56190 364 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
samdanbury 6:37b6d0d56190 365 {
samdanbury 6:37b6d0d56190 366 if (messageHandlers[i].fp.attached())
samdanbury 6:37b6d0d56190 367 {
samdanbury 6:37b6d0d56190 368 MessageData md(topicName, message);
samdanbury 6:37b6d0d56190 369 messageHandlers[i].fp(md);
samdanbury 6:37b6d0d56190 370 rc = SUCCESS;
samdanbury 6:37b6d0d56190 371 }
samdanbury 6:37b6d0d56190 372 }
samdanbury 6:37b6d0d56190 373 }
samdanbury 6:37b6d0d56190 374
samdanbury 6:37b6d0d56190 375 if (rc == FAILURE && defaultMessageHandler.attached())
samdanbury 6:37b6d0d56190 376 {
samdanbury 6:37b6d0d56190 377 MessageData md(topicName, message);
samdanbury 6:37b6d0d56190 378 defaultMessageHandler(md);
samdanbury 6:37b6d0d56190 379 rc = SUCCESS;
samdanbury 6:37b6d0d56190 380 }
samdanbury 6:37b6d0d56190 381
samdanbury 6:37b6d0d56190 382 return rc;
samdanbury 6:37b6d0d56190 383 }
samdanbury 6:37b6d0d56190 384
samdanbury 6:37b6d0d56190 385
samdanbury 6:37b6d0d56190 386
samdanbury 6:37b6d0d56190 387 template<class Network, class Timer, int a, int b>
samdanbury 6:37b6d0d56190 388 int MQTT::Client<Network, Timer, a, b>::yield(int timeout_ms)
samdanbury 6:37b6d0d56190 389 {
samdanbury 6:37b6d0d56190 390 int rc = SUCCESS;
samdanbury 6:37b6d0d56190 391 Timer timer = Timer();
samdanbury 6:37b6d0d56190 392
samdanbury 6:37b6d0d56190 393 timer.countdown_ms(timeout_ms);
samdanbury 6:37b6d0d56190 394 while (!timer.expired())
samdanbury 6:37b6d0d56190 395 {
samdanbury 6:37b6d0d56190 396 if (cycle(timer) == FAILURE)
samdanbury 6:37b6d0d56190 397 {
samdanbury 6:37b6d0d56190 398 rc = FAILURE;
samdanbury 6:37b6d0d56190 399 break;
samdanbury 6:37b6d0d56190 400 }
samdanbury 6:37b6d0d56190 401 }
samdanbury 6:37b6d0d56190 402
samdanbury 6:37b6d0d56190 403 return rc;
samdanbury 6:37b6d0d56190 404 }
samdanbury 6:37b6d0d56190 405
samdanbury 6:37b6d0d56190 406
samdanbury 6:37b6d0d56190 407 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
samdanbury 6:37b6d0d56190 408 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
samdanbury 6:37b6d0d56190 409 {
samdanbury 6:37b6d0d56190 410 /* get one piece of work off the wire and one pass through */
samdanbury 6:37b6d0d56190 411
samdanbury 6:37b6d0d56190 412 // read the socket, see what work is due
samdanbury 6:37b6d0d56190 413 unsigned short packet_type = readPacket(timer);
samdanbury 6:37b6d0d56190 414
samdanbury 6:37b6d0d56190 415 int len = 0,
samdanbury 6:37b6d0d56190 416 rc = SUCCESS;
samdanbury 6:37b6d0d56190 417
samdanbury 6:37b6d0d56190 418 switch (packet_type)
samdanbury 6:37b6d0d56190 419 {
samdanbury 6:37b6d0d56190 420 case CONNACK:
samdanbury 6:37b6d0d56190 421 case PUBACK:
samdanbury 6:37b6d0d56190 422 case SUBACK:
samdanbury 6:37b6d0d56190 423 break;
samdanbury 6:37b6d0d56190 424 case PUBLISH:
samdanbury 6:37b6d0d56190 425 MQTTString topicName;
samdanbury 6:37b6d0d56190 426 Message msg;
samdanbury 6:37b6d0d56190 427 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
samdanbury 6:37b6d0d56190 428 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
samdanbury 6:37b6d0d56190 429 goto exit;
samdanbury 6:37b6d0d56190 430 // if (msg.qos != QOS2)
samdanbury 6:37b6d0d56190 431 deliverMessage(topicName, msg);
samdanbury 6:37b6d0d56190 432 #if 0
samdanbury 6:37b6d0d56190 433 else if (isQoS2msgidFree(msg.id))
samdanbury 6:37b6d0d56190 434 {
samdanbury 6:37b6d0d56190 435 UseQoS2msgid(msg.id);
samdanbury 6:37b6d0d56190 436 deliverMessage(topicName, msg);
samdanbury 6:37b6d0d56190 437 }
samdanbury 6:37b6d0d56190 438 #endif
samdanbury 6:37b6d0d56190 439 if (msg.qos != QOS0)
samdanbury 6:37b6d0d56190 440 {
samdanbury 6:37b6d0d56190 441 if (msg.qos == QOS1)
samdanbury 6:37b6d0d56190 442 len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
samdanbury 6:37b6d0d56190 443 else if (msg.qos == QOS2)
samdanbury 6:37b6d0d56190 444 len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
samdanbury 6:37b6d0d56190 445 if (len <= 0)
samdanbury 6:37b6d0d56190 446 rc = FAILURE;
samdanbury 6:37b6d0d56190 447 else
samdanbury 6:37b6d0d56190 448 rc = sendPacket(len, timer);
samdanbury 6:37b6d0d56190 449 if (rc == FAILURE)
samdanbury 6:37b6d0d56190 450 goto exit; // there was a problem
samdanbury 6:37b6d0d56190 451 }
samdanbury 6:37b6d0d56190 452 break;
samdanbury 6:37b6d0d56190 453 case PUBREC:
samdanbury 6:37b6d0d56190 454 unsigned short mypacketid;
samdanbury 6:37b6d0d56190 455 unsigned char dup, type;
samdanbury 6:37b6d0d56190 456 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
samdanbury 6:37b6d0d56190 457 rc = FAILURE;
samdanbury 6:37b6d0d56190 458 else if ((len = MQTTSerialize_ack(buf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
samdanbury 6:37b6d0d56190 459 rc = FAILURE;
samdanbury 6:37b6d0d56190 460 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
samdanbury 6:37b6d0d56190 461 rc = FAILURE; // there was a problem
samdanbury 6:37b6d0d56190 462 if (rc == FAILURE)
samdanbury 6:37b6d0d56190 463 goto exit; // there was a problem
samdanbury 6:37b6d0d56190 464 break;
samdanbury 6:37b6d0d56190 465 case PUBCOMP:
samdanbury 6:37b6d0d56190 466 break;
samdanbury 6:37b6d0d56190 467 case PINGRESP:
samdanbury 6:37b6d0d56190 468 ping_outstanding = false;
samdanbury 6:37b6d0d56190 469 break;
samdanbury 6:37b6d0d56190 470 }
samdanbury 6:37b6d0d56190 471 keepalive();
samdanbury 6:37b6d0d56190 472 exit:
samdanbury 6:37b6d0d56190 473 if (rc == SUCCESS)
samdanbury 6:37b6d0d56190 474 rc = packet_type;
samdanbury 6:37b6d0d56190 475 return rc;
samdanbury 6:37b6d0d56190 476 }
samdanbury 6:37b6d0d56190 477
samdanbury 6:37b6d0d56190 478
samdanbury 6:37b6d0d56190 479 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
samdanbury 6:37b6d0d56190 480 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
samdanbury 6:37b6d0d56190 481 {
samdanbury 6:37b6d0d56190 482 int rc = FAILURE;
samdanbury 6:37b6d0d56190 483
samdanbury 6:37b6d0d56190 484 if (keepAliveInterval == 0)
samdanbury 6:37b6d0d56190 485 {
samdanbury 6:37b6d0d56190 486 rc = SUCCESS;
samdanbury 6:37b6d0d56190 487 goto exit;
samdanbury 6:37b6d0d56190 488 }
samdanbury 6:37b6d0d56190 489
samdanbury 6:37b6d0d56190 490 if (ping_timer.expired())
samdanbury 6:37b6d0d56190 491 {
samdanbury 6:37b6d0d56190 492 if (!ping_outstanding)
samdanbury 6:37b6d0d56190 493 {
samdanbury 6:37b6d0d56190 494 Timer timer = Timer(1000);
samdanbury 6:37b6d0d56190 495 int len = MQTTSerialize_pingreq(buf, MAX_MQTT_PACKET_SIZE);
samdanbury 6:37b6d0d56190 496 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
samdanbury 6:37b6d0d56190 497 ping_outstanding = true;
samdanbury 6:37b6d0d56190 498 }
samdanbury 6:37b6d0d56190 499 }
samdanbury 6:37b6d0d56190 500
samdanbury 6:37b6d0d56190 501 exit:
samdanbury 6:37b6d0d56190 502 return rc;
samdanbury 6:37b6d0d56190 503 }
samdanbury 6:37b6d0d56190 504
samdanbury 6:37b6d0d56190 505
samdanbury 6:37b6d0d56190 506 // only used in single-threaded mode where one command at a time is in process
samdanbury 6:37b6d0d56190 507 template<class Network, class Timer, int a, int b>
samdanbury 6:37b6d0d56190 508 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
samdanbury 6:37b6d0d56190 509 {
samdanbury 6:37b6d0d56190 510 int rc = FAILURE;
samdanbury 6:37b6d0d56190 511
samdanbury 6:37b6d0d56190 512 do
samdanbury 6:37b6d0d56190 513 {
samdanbury 6:37b6d0d56190 514 if (timer.expired())
samdanbury 6:37b6d0d56190 515 break; // we timed out
samdanbury 6:37b6d0d56190 516 }
samdanbury 6:37b6d0d56190 517 while ((rc = cycle(timer)) != packet_type);
samdanbury 6:37b6d0d56190 518
samdanbury 6:37b6d0d56190 519 return rc;
samdanbury 6:37b6d0d56190 520 }
samdanbury 6:37b6d0d56190 521
samdanbury 6:37b6d0d56190 522
samdanbury 6:37b6d0d56190 523 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
samdanbury 6:37b6d0d56190 524 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData* options)
samdanbury 6:37b6d0d56190 525 {
samdanbury 6:37b6d0d56190 526 Timer connect_timer = Timer(command_timeout_ms);
samdanbury 6:37b6d0d56190 527 int rc = FAILURE;
samdanbury 6:37b6d0d56190 528 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
samdanbury 6:37b6d0d56190 529 int len = 0;
samdanbury 6:37b6d0d56190 530
samdanbury 6:37b6d0d56190 531 if (isconnected) // don't send connect packet again if we are already connected
samdanbury 6:37b6d0d56190 532 goto exit;
samdanbury 6:37b6d0d56190 533
samdanbury 6:37b6d0d56190 534 if (options == 0)
samdanbury 6:37b6d0d56190 535 options = &default_options; // set default options if none were supplied
samdanbury 6:37b6d0d56190 536
samdanbury 6:37b6d0d56190 537 this->keepAliveInterval = options->keepAliveInterval;
samdanbury 6:37b6d0d56190 538 ping_timer.countdown(this->keepAliveInterval);
samdanbury 6:37b6d0d56190 539 if ((len = MQTTSerialize_connect(buf, MAX_MQTT_PACKET_SIZE, options)) <= 0)
samdanbury 6:37b6d0d56190 540 goto exit;
samdanbury 6:37b6d0d56190 541 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
samdanbury 6:37b6d0d56190 542 goto exit; // there was a problem
samdanbury 6:37b6d0d56190 543
samdanbury 6:37b6d0d56190 544 // this will be a blocking call, wait for the connack
samdanbury 6:37b6d0d56190 545 if (waitfor(CONNACK, connect_timer) == CONNACK)
samdanbury 6:37b6d0d56190 546 {
samdanbury 6:37b6d0d56190 547 unsigned char connack_rc = 255;
samdanbury 6:37b6d0d56190 548 bool sessionPresent = false;
samdanbury 6:37b6d0d56190 549 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
samdanbury 6:37b6d0d56190 550 rc = connack_rc;
samdanbury 6:37b6d0d56190 551 else
samdanbury 6:37b6d0d56190 552 rc = FAILURE;
samdanbury 6:37b6d0d56190 553 }
samdanbury 6:37b6d0d56190 554 else
samdanbury 6:37b6d0d56190 555 rc = FAILURE;
samdanbury 6:37b6d0d56190 556
samdanbury 6:37b6d0d56190 557 exit:
samdanbury 6:37b6d0d56190 558 if (rc == SUCCESS)
samdanbury 6:37b6d0d56190 559 isconnected = true;
samdanbury 6:37b6d0d56190 560 return rc;
samdanbury 6:37b6d0d56190 561 }
samdanbury 6:37b6d0d56190 562
samdanbury 6:37b6d0d56190 563
samdanbury 6:37b6d0d56190 564 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
samdanbury 6:37b6d0d56190 565 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
samdanbury 6:37b6d0d56190 566 {
samdanbury 6:37b6d0d56190 567 int rc = FAILURE;
samdanbury 6:37b6d0d56190 568 Timer timer = Timer(command_timeout_ms);
samdanbury 6:37b6d0d56190 569 int len = 0;
samdanbury 6:37b6d0d56190 570 MQTTString topic = {(char*)topicFilter, 0, 0};
samdanbury 6:37b6d0d56190 571
samdanbury 6:37b6d0d56190 572 if (!isconnected)
samdanbury 6:37b6d0d56190 573 goto exit;
samdanbury 6:37b6d0d56190 574
samdanbury 6:37b6d0d56190 575 len = MQTTSerialize_subscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
samdanbury 6:37b6d0d56190 576 if (len <= 0)
samdanbury 6:37b6d0d56190 577 goto exit;
samdanbury 6:37b6d0d56190 578 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
samdanbury 6:37b6d0d56190 579 goto exit; // there was a problem
samdanbury 6:37b6d0d56190 580
samdanbury 6:37b6d0d56190 581 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback
samdanbury 6:37b6d0d56190 582 {
samdanbury 6:37b6d0d56190 583 int count = 0, grantedQoS = -1;
samdanbury 6:37b6d0d56190 584 unsigned short mypacketid;
samdanbury 6:37b6d0d56190 585 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
samdanbury 6:37b6d0d56190 586 rc = grantedQoS; // 0, 1, 2 or 0x80
samdanbury 6:37b6d0d56190 587 if (rc != 0x80)
samdanbury 6:37b6d0d56190 588 {
samdanbury 6:37b6d0d56190 589 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
samdanbury 6:37b6d0d56190 590 {
samdanbury 6:37b6d0d56190 591 if (messageHandlers[i].topicFilter == 0)
samdanbury 6:37b6d0d56190 592 {
samdanbury 6:37b6d0d56190 593 messageHandlers[i].topicFilter = topicFilter;
samdanbury 6:37b6d0d56190 594 messageHandlers[i].fp.attach(messageHandler);
samdanbury 6:37b6d0d56190 595 rc = 0;
samdanbury 6:37b6d0d56190 596 break;
samdanbury 6:37b6d0d56190 597 }
samdanbury 6:37b6d0d56190 598 }
samdanbury 6:37b6d0d56190 599 }
samdanbury 6:37b6d0d56190 600 }
samdanbury 6:37b6d0d56190 601 else
samdanbury 6:37b6d0d56190 602 rc = FAILURE;
samdanbury 6:37b6d0d56190 603
samdanbury 6:37b6d0d56190 604 exit:
samdanbury 6:37b6d0d56190 605 return rc;
samdanbury 6:37b6d0d56190 606 }
samdanbury 6:37b6d0d56190 607
samdanbury 6:37b6d0d56190 608
samdanbury 6:37b6d0d56190 609 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
samdanbury 6:37b6d0d56190 610 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
samdanbury 6:37b6d0d56190 611 {
samdanbury 6:37b6d0d56190 612 int rc = FAILURE;
samdanbury 6:37b6d0d56190 613 Timer timer = Timer(command_timeout_ms);
samdanbury 6:37b6d0d56190 614 MQTTString topic = {(char*)topicFilter, 0, 0};
samdanbury 6:37b6d0d56190 615 int len = 0;
samdanbury 6:37b6d0d56190 616
samdanbury 6:37b6d0d56190 617 if (!isconnected)
samdanbury 6:37b6d0d56190 618 goto exit;
samdanbury 6:37b6d0d56190 619
samdanbury 6:37b6d0d56190 620 if ((len = MQTTSerialize_unsubscribe(buf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
samdanbury 6:37b6d0d56190 621 goto exit;
samdanbury 6:37b6d0d56190 622 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
samdanbury 6:37b6d0d56190 623 goto exit; // there was a problem
samdanbury 6:37b6d0d56190 624
samdanbury 6:37b6d0d56190 625 if (waitfor(UNSUBACK, timer) == UNSUBACK)
samdanbury 6:37b6d0d56190 626 {
samdanbury 6:37b6d0d56190 627 unsigned short mypacketid; // should be the same as the packetid above
samdanbury 6:37b6d0d56190 628 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
samdanbury 6:37b6d0d56190 629 rc = 0;
samdanbury 6:37b6d0d56190 630 }
samdanbury 6:37b6d0d56190 631 else
samdanbury 6:37b6d0d56190 632 rc = FAILURE;
samdanbury 6:37b6d0d56190 633
samdanbury 6:37b6d0d56190 634 exit:
samdanbury 6:37b6d0d56190 635 return rc;
samdanbury 6:37b6d0d56190 636 }
samdanbury 6:37b6d0d56190 637
samdanbury 6:37b6d0d56190 638
samdanbury 6:37b6d0d56190 639
samdanbury 6:37b6d0d56190 640 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
samdanbury 6:37b6d0d56190 641 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message* message)
samdanbury 6:37b6d0d56190 642 {
samdanbury 6:37b6d0d56190 643 int rc = FAILURE;
samdanbury 6:37b6d0d56190 644 Timer timer = Timer(command_timeout_ms);
samdanbury 6:37b6d0d56190 645 MQTTString topicString = {(char*)topicName, 0, 0};
samdanbury 6:37b6d0d56190 646 int len = 0;
samdanbury 6:37b6d0d56190 647
samdanbury 6:37b6d0d56190 648 if (!isconnected)
samdanbury 6:37b6d0d56190 649 goto exit;
samdanbury 6:37b6d0d56190 650
samdanbury 6:37b6d0d56190 651 if (message->qos == QOS1 || message->qos == QOS2)
samdanbury 6:37b6d0d56190 652 message->id = packetid.getNext();
samdanbury 6:37b6d0d56190 653
samdanbury 6:37b6d0d56190 654 len = MQTTSerialize_publish(buf, MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id,
samdanbury 6:37b6d0d56190 655 topicString, (unsigned char*)message->payload, message->payloadlen);
samdanbury 6:37b6d0d56190 656 if (len <= 0)
samdanbury 6:37b6d0d56190 657 goto exit;
samdanbury 6:37b6d0d56190 658 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
samdanbury 6:37b6d0d56190 659 goto exit; // there was a problem
samdanbury 6:37b6d0d56190 660
samdanbury 6:37b6d0d56190 661 if (message->qos == QOS1)
samdanbury 6:37b6d0d56190 662 {
samdanbury 6:37b6d0d56190 663 if (waitfor(PUBACK, timer) == PUBACK)
samdanbury 6:37b6d0d56190 664 {
samdanbury 6:37b6d0d56190 665 unsigned short mypacketid;
samdanbury 6:37b6d0d56190 666 unsigned char dup, type;
samdanbury 6:37b6d0d56190 667 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
samdanbury 6:37b6d0d56190 668 rc = FAILURE;
samdanbury 6:37b6d0d56190 669 }
samdanbury 6:37b6d0d56190 670 else
samdanbury 6:37b6d0d56190 671 rc = FAILURE;
samdanbury 6:37b6d0d56190 672 }
samdanbury 6:37b6d0d56190 673 else if (message->qos == QOS2)
samdanbury 6:37b6d0d56190 674 {
samdanbury 6:37b6d0d56190 675 if (waitfor(PUBCOMP, timer) == PUBCOMP)
samdanbury 6:37b6d0d56190 676 {
samdanbury 6:37b6d0d56190 677 unsigned short mypacketid;
samdanbury 6:37b6d0d56190 678 unsigned char dup, type;
samdanbury 6:37b6d0d56190 679 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
samdanbury 6:37b6d0d56190 680 rc = FAILURE;
samdanbury 6:37b6d0d56190 681 }
samdanbury 6:37b6d0d56190 682 else
samdanbury 6:37b6d0d56190 683 rc = FAILURE;
samdanbury 6:37b6d0d56190 684 }
samdanbury 6:37b6d0d56190 685
samdanbury 6:37b6d0d56190 686 exit:
samdanbury 6:37b6d0d56190 687 return rc;
samdanbury 6:37b6d0d56190 688 }
samdanbury 6:37b6d0d56190 689
samdanbury 6:37b6d0d56190 690
samdanbury 6:37b6d0d56190 691 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
samdanbury 6:37b6d0d56190 692 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
samdanbury 6:37b6d0d56190 693 {
samdanbury 6:37b6d0d56190 694 int rc = FAILURE;
samdanbury 6:37b6d0d56190 695 Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
samdanbury 6:37b6d0d56190 696 int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
samdanbury 6:37b6d0d56190 697 if (len > 0)
samdanbury 6:37b6d0d56190 698 rc = sendPacket(len, timer); // send the disconnect packet
samdanbury 6:37b6d0d56190 699
samdanbury 6:37b6d0d56190 700 isconnected = false;
samdanbury 6:37b6d0d56190 701 return rc;
samdanbury 6:37b6d0d56190 702 }
samdanbury 6:37b6d0d56190 703
samdanbury 6:37b6d0d56190 704
samdanbury 6:37b6d0d56190 705 #endif