Port to C027 (using AppShield and Ethernet)

Dependencies:   C12832 EthernetInterface LM75B MMA7660 MQTT mbed-rtos mbed

Fork of IBMIoTClientEthernetExample by IBM Watson IoT

Committer:
icraggs
Date:
Wed Oct 01 13:27:35 2014 +0000
Revision:
8:80d49dd91542
Parent:
6:37b6d0d56190
Remove conditional compilation for IBM IoT settings

Who changed what in which revision?

User | Revision | Line number | New contents of line
samdanbury 6:37b6d0d56190 1 /*******************************************************************************
samdanbury 6:37b6d0d56190 2 * Copyright (c) 2014 IBM Corp.
samdanbury 6:37b6d0d56190 3 *
samdanbury 6:37b6d0d56190 4 * All rights reserved. This program and the accompanying materials
samdanbury 6:37b6d0d56190 5 * are made available under the terms of the Eclipse Public License v1.0
samdanbury 6:37b6d0d56190 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
samdanbury 6:37b6d0d56190 7 *
samdanbury 6:37b6d0d56190 8 * The Eclipse Public License is available at
samdanbury 6:37b6d0d56190 9 * http://www.eclipse.org/legal/epl-v10.html
samdanbury 6:37b6d0d56190 10 * and the Eclipse Distribution License is available at
samdanbury 6:37b6d0d56190 11 * http://www.eclipse.org/org/documents/edl-v10.php.
samdanbury 6:37b6d0d56190 12 *
samdanbury 6:37b6d0d56190 13 * Contributors:
samdanbury 6:37b6d0d56190 14 * Ian Craggs - initial API and implementation and/or initial documentation
samdanbury 6:37b6d0d56190 15 *******************************************************************************/
samdanbury 6:37b6d0d56190 16
samdanbury 6:37b6d0d56190 17 #if !defined(MQTTCLIENT_H)
samdanbury 6:37b6d0d56190 18 #define MQTTCLIENT_H
samdanbury 6:37b6d0d56190 19
samdanbury 6:37b6d0d56190 20 #include "FP.h"
samdanbury 6:37b6d0d56190 21 #include "MQTTPacket.h"
samdanbury 6:37b6d0d56190 22 #include "stdio.h"
icraggs 8:80d49dd91542 23 #include "MQTTLogging.h"
icraggs 8:80d49dd91542 24
icraggs 8:80d49dd91542 25 #if !defined(MQTTCLIENT_QOS1)
icraggs 8:80d49dd91542 26 #define MQTTCLIENT_QOS1 1
icraggs 8:80d49dd91542 27 #endif
icraggs 8:80d49dd91542 28 #if !defined(MQTTCLIENT_QOS2)
icraggs 8:80d49dd91542 29 #define MQTTCLIENT_QOS2 0
icraggs 8:80d49dd91542 30 #endif
samdanbury 6:37b6d0d56190 31
samdanbury 6:37b6d0d56190 32 namespace MQTT
samdanbury 6:37b6d0d56190 33 {
samdanbury 6:37b6d0d56190 34
samdanbury 6:37b6d0d56190 35
// MQTT quality-of-service levels; the enumerator value equals the level number.
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};

// Result codes returned by the client API.
// All failure return codes must be negative.
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE = -1,
    SUCCESS = 0
};
samdanbury 6:37b6d0d56190 40
samdanbury 6:37b6d0d56190 41
samdanbury 6:37b6d0d56190 42 struct Message
samdanbury 6:37b6d0d56190 43 {
samdanbury 6:37b6d0d56190 44 enum QoS qos;
samdanbury 6:37b6d0d56190 45 bool retained;
samdanbury 6:37b6d0d56190 46 bool dup;
samdanbury 6:37b6d0d56190 47 unsigned short id;
samdanbury 6:37b6d0d56190 48 void *payload;
samdanbury 6:37b6d0d56190 49 size_t payloadlen;
samdanbury 6:37b6d0d56190 50 };
samdanbury 6:37b6d0d56190 51
samdanbury 6:37b6d0d56190 52
samdanbury 6:37b6d0d56190 53 struct MessageData
samdanbury 6:37b6d0d56190 54 {
samdanbury 6:37b6d0d56190 55 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
samdanbury 6:37b6d0d56190 56 { }
icraggs 8:80d49dd91542 57
samdanbury 6:37b6d0d56190 58 struct Message &message;
samdanbury 6:37b6d0d56190 59 MQTTString &topicName;
samdanbury 6:37b6d0d56190 60 };
samdanbury 6:37b6d0d56190 61
samdanbury 6:37b6d0d56190 62
// Generator of MQTT packet identifiers.
// Identifiers run 1, 2, ..., MAX_PACKET_ID and then wrap back to 1; 0 is never returned.
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /** Advance to, and return, the next packet identifier.
     *  @return a value in the range 1..MAX_PACKET_ID
     */
    int getNext()
    {
        // Written as two plain statements: the previous single-expression form
        // (next = cond ? 1 : ++next) modified 'next' twice in one expression.
        if (next == MAX_PACKET_ID)
            next = 1;
        else
            next = next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // last identifier handed out (0 before the first call)
};
samdanbury 6:37b6d0d56190 80
samdanbury 6:37b6d0d56190 81
samdanbury 6:37b6d0d56190 82 /**
samdanbury 6:37b6d0d56190 83 * @class Client
samdanbury 6:37b6d0d56190 84 * @brief blocking, non-threaded MQTT client API
icraggs 8:80d49dd91542 85 *
samdanbury 6:37b6d0d56190 86 * This version of the API blocks on all method calls, until they are complete. This means that only one
icraggs 8:80d49dd91542 87 * MQTT request can be in process at any one time.
samdanbury 6:37b6d0d56190 88 * @param Network a network class which supports send, receive
icraggs 8:80d49dd91542 89 * @param Timer a timer class with the methods:
icraggs 8:80d49dd91542 90 */
samdanbury 6:37b6d0d56190 91 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
samdanbury 6:37b6d0d56190 92 class Client
samdanbury 6:37b6d0d56190 93 {
icraggs 8:80d49dd91542 94
samdanbury 6:37b6d0d56190 95 public:
icraggs 8:80d49dd91542 96
samdanbury 6:37b6d0d56190 97 typedef void (*messageHandler)(MessageData&);
samdanbury 6:37b6d0d56190 98
samdanbury 6:37b6d0d56190 99 /** Construct the client
samdanbury 6:37b6d0d56190 100 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
samdanbury 6:37b6d0d56190 101 * before calling MQTT connect
samdanbury 6:37b6d0d56190 102 * @param limits an instance of the Limit class - to alter limits as required
samdanbury 6:37b6d0d56190 103 */
icraggs 8:80d49dd91542 104 Client(Network& network, unsigned int command_timeout_ms = 30000);
icraggs 8:80d49dd91542 105
samdanbury 6:37b6d0d56190 106 /** Set the default message handling callback - used for any message which does not match a subscription message handler
samdanbury 6:37b6d0d56190 107 * @param mh - pointer to the callback function
samdanbury 6:37b6d0d56190 108 */
samdanbury 6:37b6d0d56190 109 void setDefaultMessageHandler(messageHandler mh)
samdanbury 6:37b6d0d56190 110 {
samdanbury 6:37b6d0d56190 111 defaultMessageHandler.attach(mh);
samdanbury 6:37b6d0d56190 112 }
samdanbury 6:37b6d0d56190 113
icraggs 8:80d49dd91542 114 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
icraggs 8:80d49dd91542 115 * The nework object must be connected to the network endpoint before calling this
icraggs 8:80d49dd91542 116 * Default connect options are used
icraggs 8:80d49dd91542 117 * @return success code -
icraggs 8:80d49dd91542 118 */
icraggs 8:80d49dd91542 119 int connect();
samdanbury 6:37b6d0d56190 120
samdanbury 6:37b6d0d56190 121 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
icraggs 8:80d49dd91542 122 * The nework object must be connected to the network endpoint before calling this
samdanbury 6:37b6d0d56190 123 * @param options - connect options
icraggs 8:80d49dd91542 124 * @return success code -
icraggs 8:80d49dd91542 125 */
icraggs 8:80d49dd91542 126 int connect(MQTTPacket_connectData& options);
icraggs 8:80d49dd91542 127
samdanbury 6:37b6d0d56190 128 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
samdanbury 6:37b6d0d56190 129 * @param topic - the topic to publish to
samdanbury 6:37b6d0d56190 130 * @param message - the message to send
icraggs 8:80d49dd91542 131 * @return success code -
icraggs 8:80d49dd91542 132 */
icraggs 8:80d49dd91542 133 int publish(const char* topicName, Message& message);
icraggs 8:80d49dd91542 134
icraggs 8:80d49dd91542 135 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
icraggs 8:80d49dd91542 136 * @param topic - the topic to publish to
icraggs 8:80d49dd91542 137 * @param payload - the data to send
icraggs 8:80d49dd91542 138 * @param payloadlen - the length of the data
icraggs 8:80d49dd91542 139 * @param qos - the QoS to send the publish at
icraggs 8:80d49dd91542 140 * @param retained - whether the message should be retained
icraggs 8:80d49dd91542 141 * @return success code -
icraggs 8:80d49dd91542 142 */
icraggs 8:80d49dd91542 143 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
icraggs 8:80d49dd91542 144
icraggs 8:80d49dd91542 145 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
icraggs 8:80d49dd91542 146 * @param topic - the topic to publish to
icraggs 8:80d49dd91542 147 * @param payload - the data to send
icraggs 8:80d49dd91542 148 * @param payloadlen - the length of the data
icraggs 8:80d49dd91542 149 * @param id - the packet id used - returned
icraggs 8:80d49dd91542 150 * @param qos - the QoS to send the publish at
icraggs 8:80d49dd91542 151 * @param retained - whether the message should be retained
icraggs 8:80d49dd91542 152 * @return success code -
icraggs 8:80d49dd91542 153 */
icraggs 8:80d49dd91542 154 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
icraggs 8:80d49dd91542 155
samdanbury 6:37b6d0d56190 156 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
samdanbury 6:37b6d0d56190 157 * @param topicFilter - a topic pattern which can include wildcards
samdanbury 6:37b6d0d56190 158 * @param qos - the MQTT QoS to subscribe at
samdanbury 6:37b6d0d56190 159 * @param mh - the callback function to be invoked when a message is received for this subscription
icraggs 8:80d49dd91542 160 * @return success code -
icraggs 8:80d49dd91542 161 */
samdanbury 6:37b6d0d56190 162 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
icraggs 8:80d49dd91542 163
samdanbury 6:37b6d0d56190 164 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
samdanbury 6:37b6d0d56190 165 * @param topicFilter - a topic pattern which can include wildcards
icraggs 8:80d49dd91542 166 * @return success code -
icraggs 8:80d49dd91542 167 */
samdanbury 6:37b6d0d56190 168 int unsubscribe(const char* topicFilter);
icraggs 8:80d49dd91542 169
samdanbury 6:37b6d0d56190 170 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
icraggs 8:80d49dd91542 171 * @return success code -
samdanbury 6:37b6d0d56190 172 */
samdanbury 6:37b6d0d56190 173 int disconnect();
icraggs 8:80d49dd91542 174
samdanbury 6:37b6d0d56190 175 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
icraggs 8:80d49dd91542 176 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
samdanbury 6:37b6d0d56190 177 * received.
samdanbury 6:37b6d0d56190 178 * @param timeout_ms the time to wait, in milliseconds
samdanbury 6:37b6d0d56190 179 * @return success code - on failure, this means the client has disconnected
samdanbury 6:37b6d0d56190 180 */
icraggs 8:80d49dd91542 181 int yield(unsigned long timeout_ms = 1000L);
icraggs 8:80d49dd91542 182
icraggs 8:80d49dd91542 183 /** Is the client connected?
icraggs 8:80d49dd91542 184 * @return flag - is the client connected or not?
icraggs 8:80d49dd91542 185 */
icraggs 8:80d49dd91542 186 bool isConnected()
icraggs 8:80d49dd91542 187 {
icraggs 8:80d49dd91542 188 return isconnected;
icraggs 8:80d49dd91542 189 }
icraggs 8:80d49dd91542 190
samdanbury 6:37b6d0d56190 191 private:
samdanbury 6:37b6d0d56190 192
samdanbury 6:37b6d0d56190 193 int cycle(Timer& timer);
samdanbury 6:37b6d0d56190 194 int waitfor(int packet_type, Timer& timer);
samdanbury 6:37b6d0d56190 195 int keepalive();
icraggs 8:80d49dd91542 196 int publish(int len, Timer& timer, enum QoS qos);
samdanbury 6:37b6d0d56190 197
samdanbury 6:37b6d0d56190 198 int decodePacket(int* value, int timeout);
samdanbury 6:37b6d0d56190 199 int readPacket(Timer& timer);
samdanbury 6:37b6d0d56190 200 int sendPacket(int length, Timer& timer);
samdanbury 6:37b6d0d56190 201 int deliverMessage(MQTTString& topicName, Message& message);
samdanbury 6:37b6d0d56190 202 bool isTopicMatched(char* topicFilter, MQTTString& topicName);
icraggs 8:80d49dd91542 203
samdanbury 6:37b6d0d56190 204 Network& ipstack;
icraggs 8:80d49dd91542 205 unsigned long command_timeout_ms;
samdanbury 6:37b6d0d56190 206
icraggs 8:80d49dd91542 207 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
icraggs 8:80d49dd91542 208 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
icraggs 8:80d49dd91542 209
icraggs 8:80d49dd91542 210 Timer last_sent, last_received;
samdanbury 6:37b6d0d56190 211 unsigned int keepAliveInterval;
samdanbury 6:37b6d0d56190 212 bool ping_outstanding;
icraggs 8:80d49dd91542 213 bool cleansession;
icraggs 8:80d49dd91542 214
samdanbury 6:37b6d0d56190 215 PacketId packetid;
icraggs 8:80d49dd91542 216
samdanbury 6:37b6d0d56190 217 struct MessageHandlers
samdanbury 6:37b6d0d56190 218 {
samdanbury 6:37b6d0d56190 219 const char* topicFilter;
samdanbury 6:37b6d0d56190 220 FP<void, MessageData&> fp;
samdanbury 6:37b6d0d56190 221 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
icraggs 8:80d49dd91542 222
samdanbury 6:37b6d0d56190 223 FP<void, MessageData&> defaultMessageHandler;
icraggs 8:80d49dd91542 224
samdanbury 6:37b6d0d56190 225 bool isconnected;
icraggs 8:80d49dd91542 226
icraggs 8:80d49dd91542 227 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 228 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect
icraggs 8:80d49dd91542 229 int inflightLen;
icraggs 8:80d49dd91542 230 unsigned short inflightMsgid;
icraggs 8:80d49dd91542 231 enum QoS inflightQoS;
icraggs 8:80d49dd91542 232 #endif
icraggs 8:80d49dd91542 233
icraggs 8:80d49dd91542 234 #if MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 235 bool pubrel;
icraggs 8:80d49dd91542 236 #if !defined(MAX_INCOMING_QOS2_MESSAGES)
icraggs 8:80d49dd91542 237 #define MAX_INCOMING_QOS2_MESSAGES 10
icraggs 8:80d49dd91542 238 #endif
icraggs 8:80d49dd91542 239 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
icraggs 8:80d49dd91542 240 bool isQoS2msgidFree(unsigned short id);
icraggs 8:80d49dd91542 241 bool useQoS2msgid(unsigned short id);
samdanbury 6:37b6d0d56190 242 #endif
samdanbury 6:37b6d0d56190 243
samdanbury 6:37b6d0d56190 244 };
samdanbury 6:37b6d0d56190 245
samdanbury 6:37b6d0d56190 246 }
samdanbury 6:37b6d0d56190 247
samdanbury 6:37b6d0d56190 248
icraggs 8:80d49dd91542 249 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
samdanbury 6:37b6d0d56190 250 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
samdanbury 6:37b6d0d56190 251 {
icraggs 8:80d49dd91542 252 last_sent = Timer();
icraggs 8:80d49dd91542 253 last_received = Timer();
icraggs 8:80d49dd91542 254 ping_outstanding = false;
samdanbury 6:37b6d0d56190 255 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
samdanbury 6:37b6d0d56190 256 messageHandlers[i].topicFilter = 0;
icraggs 8:80d49dd91542 257 this->command_timeout_ms = command_timeout_ms;
samdanbury 6:37b6d0d56190 258 isconnected = false;
icraggs 8:80d49dd91542 259
icraggs 8:80d49dd91542 260 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 261 inflightMsgid = 0;
icraggs 8:80d49dd91542 262 inflightQoS = QOS0;
icraggs 8:80d49dd91542 263 #endif
icraggs 8:80d49dd91542 264
icraggs 8:80d49dd91542 265
icraggs 8:80d49dd91542 266 #if MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 267 pubrel = false;
icraggs 8:80d49dd91542 268 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
icraggs 8:80d49dd91542 269 incomingQoS2messages[i] = 0;
icraggs 8:80d49dd91542 270 #endif
icraggs 8:80d49dd91542 271 }
icraggs 8:80d49dd91542 272
icraggs 8:80d49dd91542 273 #if MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 274 template<class Network, class Timer, int a, int b>
icraggs 8:80d49dd91542 275 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
icraggs 8:80d49dd91542 276 {
icraggs 8:80d49dd91542 277 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
icraggs 8:80d49dd91542 278 {
icraggs 8:80d49dd91542 279 if (incomingQoS2messages[i] == id)
icraggs 8:80d49dd91542 280 return false;
icraggs 8:80d49dd91542 281 }
icraggs 8:80d49dd91542 282 return true;
samdanbury 6:37b6d0d56190 283 }
samdanbury 6:37b6d0d56190 284
samdanbury 6:37b6d0d56190 285
icraggs 8:80d49dd91542 286 template<class Network, class Timer, int a, int b>
icraggs 8:80d49dd91542 287 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
icraggs 8:80d49dd91542 288 {
icraggs 8:80d49dd91542 289 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
icraggs 8:80d49dd91542 290 {
icraggs 8:80d49dd91542 291 if (incomingQoS2messages[i] == 0)
icraggs 8:80d49dd91542 292 {
icraggs 8:80d49dd91542 293 incomingQoS2messages[i] = id;
icraggs 8:80d49dd91542 294 return true;
icraggs 8:80d49dd91542 295 }
icraggs 8:80d49dd91542 296 }
icraggs 8:80d49dd91542 297 return false;
icraggs 8:80d49dd91542 298 }
icraggs 8:80d49dd91542 299 #endif
icraggs 8:80d49dd91542 300
icraggs 8:80d49dd91542 301
icraggs 8:80d49dd91542 302 template<class Network, class Timer, int a, int b>
samdanbury 6:37b6d0d56190 303 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
samdanbury 6:37b6d0d56190 304 {
icraggs 8:80d49dd91542 305 int rc = FAILURE,
samdanbury 6:37b6d0d56190 306 sent = 0;
icraggs 8:80d49dd91542 307
samdanbury 6:37b6d0d56190 308 while (sent < length && !timer.expired())
samdanbury 6:37b6d0d56190 309 {
icraggs 8:80d49dd91542 310 rc = ipstack.write(&sendbuf[sent], length, timer.left_ms());
samdanbury 6:37b6d0d56190 311 if (rc < 0) // there was an error writing the data
samdanbury 6:37b6d0d56190 312 break;
samdanbury 6:37b6d0d56190 313 sent += rc;
samdanbury 6:37b6d0d56190 314 }
samdanbury 6:37b6d0d56190 315 if (sent == length)
samdanbury 6:37b6d0d56190 316 {
icraggs 8:80d49dd91542 317 if (this->keepAliveInterval > 0)
icraggs 8:80d49dd91542 318 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
samdanbury 6:37b6d0d56190 319 rc = SUCCESS;
samdanbury 6:37b6d0d56190 320 }
samdanbury 6:37b6d0d56190 321 else
samdanbury 6:37b6d0d56190 322 rc = FAILURE;
icraggs 8:80d49dd91542 323
icraggs 8:80d49dd91542 324 #if defined(MQTT_DEBUG)
icraggs 8:80d49dd91542 325 char printbuf[50];
icraggs 8:80d49dd91542 326 DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length));
icraggs 8:80d49dd91542 327 #endif
samdanbury 6:37b6d0d56190 328 return rc;
samdanbury 6:37b6d0d56190 329 }
samdanbury 6:37b6d0d56190 330
samdanbury 6:37b6d0d56190 331
icraggs 8:80d49dd91542 332 template<class Network, class Timer, int a, int b>
samdanbury 6:37b6d0d56190 333 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
samdanbury 6:37b6d0d56190 334 {
samdanbury 6:37b6d0d56190 335 unsigned char c;
samdanbury 6:37b6d0d56190 336 int multiplier = 1;
samdanbury 6:37b6d0d56190 337 int len = 0;
samdanbury 6:37b6d0d56190 338 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
samdanbury 6:37b6d0d56190 339
samdanbury 6:37b6d0d56190 340 *value = 0;
samdanbury 6:37b6d0d56190 341 do
samdanbury 6:37b6d0d56190 342 {
samdanbury 6:37b6d0d56190 343 int rc = MQTTPACKET_READ_ERROR;
samdanbury 6:37b6d0d56190 344
samdanbury 6:37b6d0d56190 345 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
samdanbury 6:37b6d0d56190 346 {
samdanbury 6:37b6d0d56190 347 rc = MQTTPACKET_READ_ERROR; /* bad data */
samdanbury 6:37b6d0d56190 348 goto exit;
samdanbury 6:37b6d0d56190 349 }
samdanbury 6:37b6d0d56190 350 rc = ipstack.read(&c, 1, timeout);
samdanbury 6:37b6d0d56190 351 if (rc != 1)
samdanbury 6:37b6d0d56190 352 goto exit;
samdanbury 6:37b6d0d56190 353 *value += (c & 127) * multiplier;
samdanbury 6:37b6d0d56190 354 multiplier *= 128;
samdanbury 6:37b6d0d56190 355 } while ((c & 128) != 0);
samdanbury 6:37b6d0d56190 356 exit:
samdanbury 6:37b6d0d56190 357 return len;
samdanbury 6:37b6d0d56190 358 }
samdanbury 6:37b6d0d56190 359
samdanbury 6:37b6d0d56190 360
samdanbury 6:37b6d0d56190 361 /**
samdanbury 6:37b6d0d56190 362 * If any read fails in this method, then we should disconnect from the network, as on reconnect
icraggs 8:80d49dd91542 363 * the packets can be retried.
samdanbury 6:37b6d0d56190 364 * @param timeout the max time to wait for the packet read to complete, in milliseconds
samdanbury 6:37b6d0d56190 365 * @return the MQTT packet type, or -1 if none
samdanbury 6:37b6d0d56190 366 */
icraggs 8:80d49dd91542 367 template<class Network, class Timer, int a, int b>
icraggs 8:80d49dd91542 368 int MQTT::Client<Network, Timer, a, b>::readPacket(Timer& timer)
samdanbury 6:37b6d0d56190 369 {
samdanbury 6:37b6d0d56190 370 int rc = FAILURE;
samdanbury 6:37b6d0d56190 371 MQTTHeader header = {0};
samdanbury 6:37b6d0d56190 372 int len = 0;
samdanbury 6:37b6d0d56190 373 int rem_len = 0;
samdanbury 6:37b6d0d56190 374
samdanbury 6:37b6d0d56190 375 /* 1. read the header byte. This has the packet type in it */
samdanbury 6:37b6d0d56190 376 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
samdanbury 6:37b6d0d56190 377 goto exit;
samdanbury 6:37b6d0d56190 378
samdanbury 6:37b6d0d56190 379 len = 1;
samdanbury 6:37b6d0d56190 380 /* 2. read the remaining length. This is variable in itself */
samdanbury 6:37b6d0d56190 381 decodePacket(&rem_len, timer.left_ms());
icraggs 8:80d49dd91542 382 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
samdanbury 6:37b6d0d56190 383
samdanbury 6:37b6d0d56190 384 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 8:80d49dd91542 385 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
samdanbury 6:37b6d0d56190 386 goto exit;
samdanbury 6:37b6d0d56190 387
samdanbury 6:37b6d0d56190 388 header.byte = readbuf[0];
samdanbury 6:37b6d0d56190 389 rc = header.bits.type;
icraggs 8:80d49dd91542 390 if (this->keepAliveInterval > 0)
icraggs 8:80d49dd91542 391 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
samdanbury 6:37b6d0d56190 392 exit:
icraggs 8:80d49dd91542 393
icraggs 8:80d49dd91542 394 #if defined(MQTT_DEBUG)
icraggs 8:80d49dd91542 395 char printbuf[50];
icraggs 8:80d49dd91542 396 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len));
icraggs 8:80d49dd91542 397 #endif
samdanbury 6:37b6d0d56190 398 return rc;
samdanbury 6:37b6d0d56190 399 }
samdanbury 6:37b6d0d56190 400
samdanbury 6:37b6d0d56190 401
samdanbury 6:37b6d0d56190 402 // assume topic filter and name is in correct format
samdanbury 6:37b6d0d56190 403 // # can only be at end
samdanbury 6:37b6d0d56190 404 // + and # can only be next to separator
icraggs 8:80d49dd91542 405 template<class Network, class Timer, int a, int b>
samdanbury 6:37b6d0d56190 406 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
samdanbury 6:37b6d0d56190 407 {
samdanbury 6:37b6d0d56190 408 char* curf = topicFilter;
samdanbury 6:37b6d0d56190 409 char* curn = topicName.lenstring.data;
samdanbury 6:37b6d0d56190 410 char* curn_end = curn + topicName.lenstring.len;
icraggs 8:80d49dd91542 411
samdanbury 6:37b6d0d56190 412 while (*curf && curn < curn_end)
samdanbury 6:37b6d0d56190 413 {
samdanbury 6:37b6d0d56190 414 if (*curn == '/' && *curf != '/')
samdanbury 6:37b6d0d56190 415 break;
samdanbury 6:37b6d0d56190 416 if (*curf != '+' && *curf != '#' && *curf != *curn)
samdanbury 6:37b6d0d56190 417 break;
samdanbury 6:37b6d0d56190 418 if (*curf == '+')
samdanbury 6:37b6d0d56190 419 { // skip until we meet the next separator, or end of string
samdanbury 6:37b6d0d56190 420 char* nextpos = curn + 1;
samdanbury 6:37b6d0d56190 421 while (nextpos < curn_end && *nextpos != '/')
samdanbury 6:37b6d0d56190 422 nextpos = ++curn + 1;
samdanbury 6:37b6d0d56190 423 }
samdanbury 6:37b6d0d56190 424 else if (*curf == '#')
samdanbury 6:37b6d0d56190 425 curn = curn_end - 1; // skip until end of string
samdanbury 6:37b6d0d56190 426 curf++;
samdanbury 6:37b6d0d56190 427 curn++;
samdanbury 6:37b6d0d56190 428 };
icraggs 8:80d49dd91542 429
samdanbury 6:37b6d0d56190 430 return (curn == curn_end) && (*curf == '\0');
samdanbury 6:37b6d0d56190 431 }
samdanbury 6:37b6d0d56190 432
samdanbury 6:37b6d0d56190 433
samdanbury 6:37b6d0d56190 434
icraggs 8:80d49dd91542 435 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
samdanbury 6:37b6d0d56190 436 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
samdanbury 6:37b6d0d56190 437 {
samdanbury 6:37b6d0d56190 438 int rc = FAILURE;
samdanbury 6:37b6d0d56190 439
samdanbury 6:37b6d0d56190 440 // we have to find the right message handler - indexed by topic
samdanbury 6:37b6d0d56190 441 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
samdanbury 6:37b6d0d56190 442 {
samdanbury 6:37b6d0d56190 443 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
samdanbury 6:37b6d0d56190 444 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
samdanbury 6:37b6d0d56190 445 {
samdanbury 6:37b6d0d56190 446 if (messageHandlers[i].fp.attached())
samdanbury 6:37b6d0d56190 447 {
samdanbury 6:37b6d0d56190 448 MessageData md(topicName, message);
samdanbury 6:37b6d0d56190 449 messageHandlers[i].fp(md);
samdanbury 6:37b6d0d56190 450 rc = SUCCESS;
samdanbury 6:37b6d0d56190 451 }
samdanbury 6:37b6d0d56190 452 }
samdanbury 6:37b6d0d56190 453 }
icraggs 8:80d49dd91542 454
icraggs 8:80d49dd91542 455 if (rc == FAILURE && defaultMessageHandler.attached())
samdanbury 6:37b6d0d56190 456 {
samdanbury 6:37b6d0d56190 457 MessageData md(topicName, message);
samdanbury 6:37b6d0d56190 458 defaultMessageHandler(md);
samdanbury 6:37b6d0d56190 459 rc = SUCCESS;
icraggs 8:80d49dd91542 460 }
icraggs 8:80d49dd91542 461
samdanbury 6:37b6d0d56190 462 return rc;
samdanbury 6:37b6d0d56190 463 }
samdanbury 6:37b6d0d56190 464
samdanbury 6:37b6d0d56190 465
samdanbury 6:37b6d0d56190 466
icraggs 8:80d49dd91542 467 template<class Network, class Timer, int a, int b>
icraggs 8:80d49dd91542 468 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
samdanbury 6:37b6d0d56190 469 {
samdanbury 6:37b6d0d56190 470 int rc = SUCCESS;
samdanbury 6:37b6d0d56190 471 Timer timer = Timer();
icraggs 8:80d49dd91542 472
samdanbury 6:37b6d0d56190 473 timer.countdown_ms(timeout_ms);
samdanbury 6:37b6d0d56190 474 while (!timer.expired())
samdanbury 6:37b6d0d56190 475 {
samdanbury 6:37b6d0d56190 476 if (cycle(timer) == FAILURE)
samdanbury 6:37b6d0d56190 477 {
samdanbury 6:37b6d0d56190 478 rc = FAILURE;
samdanbury 6:37b6d0d56190 479 break;
samdanbury 6:37b6d0d56190 480 }
samdanbury 6:37b6d0d56190 481 }
icraggs 8:80d49dd91542 482
samdanbury 6:37b6d0d56190 483 return rc;
samdanbury 6:37b6d0d56190 484 }
samdanbury 6:37b6d0d56190 485
samdanbury 6:37b6d0d56190 486
icraggs 8:80d49dd91542 487 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
samdanbury 6:37b6d0d56190 488 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
samdanbury 6:37b6d0d56190 489 {
samdanbury 6:37b6d0d56190 490 /* get one piece of work off the wire and one pass through */
samdanbury 6:37b6d0d56190 491
samdanbury 6:37b6d0d56190 492 // read the socket, see what work is due
samdanbury 6:37b6d0d56190 493 unsigned short packet_type = readPacket(timer);
icraggs 8:80d49dd91542 494
samdanbury 6:37b6d0d56190 495 int len = 0,
samdanbury 6:37b6d0d56190 496 rc = SUCCESS;
samdanbury 6:37b6d0d56190 497
samdanbury 6:37b6d0d56190 498 switch (packet_type)
samdanbury 6:37b6d0d56190 499 {
samdanbury 6:37b6d0d56190 500 case CONNACK:
samdanbury 6:37b6d0d56190 501 case PUBACK:
samdanbury 6:37b6d0d56190 502 case SUBACK:
samdanbury 6:37b6d0d56190 503 break;
samdanbury 6:37b6d0d56190 504 case PUBLISH:
samdanbury 6:37b6d0d56190 505 MQTTString topicName;
samdanbury 6:37b6d0d56190 506 Message msg;
samdanbury 6:37b6d0d56190 507 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
samdanbury 6:37b6d0d56190 508 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
samdanbury 6:37b6d0d56190 509 goto exit;
icraggs 8:80d49dd91542 510 #if MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 511 if (msg.qos != QOS2)
icraggs 8:80d49dd91542 512 #endif
samdanbury 6:37b6d0d56190 513 deliverMessage(topicName, msg);
icraggs 8:80d49dd91542 514 #if MQTTCLIENT_QOS2
samdanbury 6:37b6d0d56190 515 else if (isQoS2msgidFree(msg.id))
samdanbury 6:37b6d0d56190 516 {
icraggs 8:80d49dd91542 517 if (useQoS2msgid(msg.id))
icraggs 8:80d49dd91542 518 deliverMessage(topicName, msg);
icraggs 8:80d49dd91542 519 else
icraggs 8:80d49dd91542 520 WARN("Maximum number of incoming QoS2 messages exceeded");
icraggs 8:80d49dd91542 521 }
samdanbury 6:37b6d0d56190 522 #endif
icraggs 8:80d49dd91542 523 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
samdanbury 6:37b6d0d56190 524 if (msg.qos != QOS0)
samdanbury 6:37b6d0d56190 525 {
samdanbury 6:37b6d0d56190 526 if (msg.qos == QOS1)
icraggs 8:80d49dd91542 527 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
samdanbury 6:37b6d0d56190 528 else if (msg.qos == QOS2)
icraggs 8:80d49dd91542 529 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
samdanbury 6:37b6d0d56190 530 if (len <= 0)
samdanbury 6:37b6d0d56190 531 rc = FAILURE;
samdanbury 6:37b6d0d56190 532 else
samdanbury 6:37b6d0d56190 533 rc = sendPacket(len, timer);
samdanbury 6:37b6d0d56190 534 if (rc == FAILURE)
samdanbury 6:37b6d0d56190 535 goto exit; // there was a problem
samdanbury 6:37b6d0d56190 536 }
samdanbury 6:37b6d0d56190 537 break;
icraggs 8:80d49dd91542 538 #endif
icraggs 8:80d49dd91542 539 #if MQTTCLIENT_QOS2
samdanbury 6:37b6d0d56190 540 case PUBREC:
samdanbury 6:37b6d0d56190 541 unsigned short mypacketid;
samdanbury 6:37b6d0d56190 542 unsigned char dup, type;
samdanbury 6:37b6d0d56190 543 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
samdanbury 6:37b6d0d56190 544 rc = FAILURE;
icraggs 8:80d49dd91542 545 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid)) <= 0)
samdanbury 6:37b6d0d56190 546 rc = FAILURE;
samdanbury 6:37b6d0d56190 547 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
samdanbury 6:37b6d0d56190 548 rc = FAILURE; // there was a problem
samdanbury 6:37b6d0d56190 549 if (rc == FAILURE)
samdanbury 6:37b6d0d56190 550 goto exit; // there was a problem
samdanbury 6:37b6d0d56190 551 break;
samdanbury 6:37b6d0d56190 552 case PUBCOMP:
samdanbury 6:37b6d0d56190 553 break;
icraggs 8:80d49dd91542 554 #endif
samdanbury 6:37b6d0d56190 555 case PINGRESP:
samdanbury 6:37b6d0d56190 556 ping_outstanding = false;
samdanbury 6:37b6d0d56190 557 break;
samdanbury 6:37b6d0d56190 558 }
samdanbury 6:37b6d0d56190 559 keepalive();
samdanbury 6:37b6d0d56190 560 exit:
samdanbury 6:37b6d0d56190 561 if (rc == SUCCESS)
samdanbury 6:37b6d0d56190 562 rc = packet_type;
samdanbury 6:37b6d0d56190 563 return rc;
samdanbury 6:37b6d0d56190 564 }
samdanbury 6:37b6d0d56190 565
samdanbury 6:37b6d0d56190 566
samdanbury 6:37b6d0d56190 567 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
samdanbury 6:37b6d0d56190 568 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
samdanbury 6:37b6d0d56190 569 {
samdanbury 6:37b6d0d56190 570 int rc = FAILURE;
samdanbury 6:37b6d0d56190 571
samdanbury 6:37b6d0d56190 572 if (keepAliveInterval == 0)
samdanbury 6:37b6d0d56190 573 {
samdanbury 6:37b6d0d56190 574 rc = SUCCESS;
samdanbury 6:37b6d0d56190 575 goto exit;
samdanbury 6:37b6d0d56190 576 }
samdanbury 6:37b6d0d56190 577
icraggs 8:80d49dd91542 578 if (last_sent.expired() || last_received.expired())
samdanbury 6:37b6d0d56190 579 {
samdanbury 6:37b6d0d56190 580 if (!ping_outstanding)
samdanbury 6:37b6d0d56190 581 {
samdanbury 6:37b6d0d56190 582 Timer timer = Timer(1000);
icraggs 8:80d49dd91542 583 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
samdanbury 6:37b6d0d56190 584 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
samdanbury 6:37b6d0d56190 585 ping_outstanding = true;
samdanbury 6:37b6d0d56190 586 }
samdanbury 6:37b6d0d56190 587 }
samdanbury 6:37b6d0d56190 588
samdanbury 6:37b6d0d56190 589 exit:
samdanbury 6:37b6d0d56190 590 return rc;
samdanbury 6:37b6d0d56190 591 }
samdanbury 6:37b6d0d56190 592
samdanbury 6:37b6d0d56190 593
samdanbury 6:37b6d0d56190 594 // only used in single-threaded mode where one command at a time is in process
icraggs 8:80d49dd91542 595 template<class Network, class Timer, int a, int b>
samdanbury 6:37b6d0d56190 596 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
samdanbury 6:37b6d0d56190 597 {
samdanbury 6:37b6d0d56190 598 int rc = FAILURE;
icraggs 8:80d49dd91542 599
samdanbury 6:37b6d0d56190 600 do
samdanbury 6:37b6d0d56190 601 {
icraggs 8:80d49dd91542 602 if (timer.expired())
samdanbury 6:37b6d0d56190 603 break; // we timed out
samdanbury 6:37b6d0d56190 604 }
icraggs 8:80d49dd91542 605 while ((rc = cycle(timer)) != packet_type);
icraggs 8:80d49dd91542 606
samdanbury 6:37b6d0d56190 607 return rc;
samdanbury 6:37b6d0d56190 608 }
samdanbury 6:37b6d0d56190 609
samdanbury 6:37b6d0d56190 610
icraggs 8:80d49dd91542 611 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 8:80d49dd91542 612 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
samdanbury 6:37b6d0d56190 613 {
samdanbury 6:37b6d0d56190 614 Timer connect_timer = Timer(command_timeout_ms);
samdanbury 6:37b6d0d56190 615 int rc = FAILURE;
samdanbury 6:37b6d0d56190 616 int len = 0;
icraggs 8:80d49dd91542 617
samdanbury 6:37b6d0d56190 618 if (isconnected) // don't send connect packet again if we are already connected
samdanbury 6:37b6d0d56190 619 goto exit;
samdanbury 6:37b6d0d56190 620
icraggs 8:80d49dd91542 621 this->keepAliveInterval = options.keepAliveInterval;
icraggs 8:80d49dd91542 622 this->cleansession = options.cleansession;
icraggs 8:80d49dd91542 623 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
samdanbury 6:37b6d0d56190 624 goto exit;
samdanbury 6:37b6d0d56190 625 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
samdanbury 6:37b6d0d56190 626 goto exit; // there was a problem
icraggs 8:80d49dd91542 627
icraggs 8:80d49dd91542 628 if (this->keepAliveInterval > 0)
icraggs 8:80d49dd91542 629 last_received.countdown(this->keepAliveInterval);
samdanbury 6:37b6d0d56190 630 // this will be a blocking call, wait for the connack
samdanbury 6:37b6d0d56190 631 if (waitfor(CONNACK, connect_timer) == CONNACK)
samdanbury 6:37b6d0d56190 632 {
samdanbury 6:37b6d0d56190 633 unsigned char connack_rc = 255;
samdanbury 6:37b6d0d56190 634 bool sessionPresent = false;
samdanbury 6:37b6d0d56190 635 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
samdanbury 6:37b6d0d56190 636 rc = connack_rc;
samdanbury 6:37b6d0d56190 637 else
samdanbury 6:37b6d0d56190 638 rc = FAILURE;
samdanbury 6:37b6d0d56190 639 }
samdanbury 6:37b6d0d56190 640 else
samdanbury 6:37b6d0d56190 641 rc = FAILURE;
icraggs 8:80d49dd91542 642
icraggs 8:80d49dd91542 643 #if MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 644 // resend an inflight publish
icraggs 8:80d49dd91542 645 if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel)
icraggs 8:80d49dd91542 646 {
icraggs 8:80d49dd91542 647 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
icraggs 8:80d49dd91542 648 rc = FAILURE;
icraggs 8:80d49dd91542 649 else
icraggs 8:80d49dd91542 650 rc = publish(len, connect_timer, inflightQoS);
icraggs 8:80d49dd91542 651 }
icraggs 8:80d49dd91542 652 else
icraggs 8:80d49dd91542 653 #endif
icraggs 8:80d49dd91542 654 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 655 if (inflightMsgid > 0)
icraggs 8:80d49dd91542 656 {
icraggs 8:80d49dd91542 657 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
icraggs 8:80d49dd91542 658 rc = publish(inflightLen, connect_timer, inflightQoS);
icraggs 8:80d49dd91542 659 }
icraggs 8:80d49dd91542 660 #endif
icraggs 8:80d49dd91542 661
samdanbury 6:37b6d0d56190 662 exit:
samdanbury 6:37b6d0d56190 663 if (rc == SUCCESS)
samdanbury 6:37b6d0d56190 664 isconnected = true;
samdanbury 6:37b6d0d56190 665 return rc;
samdanbury 6:37b6d0d56190 666 }
samdanbury 6:37b6d0d56190 667
samdanbury 6:37b6d0d56190 668
icraggs 8:80d49dd91542 669 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 8:80d49dd91542 670 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
icraggs 8:80d49dd91542 671 {
icraggs 8:80d49dd91542 672 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
icraggs 8:80d49dd91542 673 return connect(default_options);
icraggs 8:80d49dd91542 674 }
icraggs 8:80d49dd91542 675
icraggs 8:80d49dd91542 676
icraggs 8:80d49dd91542 677 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
samdanbury 6:37b6d0d56190 678 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
icraggs 8:80d49dd91542 679 {
icraggs 8:80d49dd91542 680 int rc = FAILURE;
samdanbury 6:37b6d0d56190 681 Timer timer = Timer(command_timeout_ms);
samdanbury 6:37b6d0d56190 682 int len = 0;
samdanbury 6:37b6d0d56190 683 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 8:80d49dd91542 684
samdanbury 6:37b6d0d56190 685 if (!isconnected)
samdanbury 6:37b6d0d56190 686 goto exit;
icraggs 8:80d49dd91542 687
icraggs 8:80d49dd91542 688 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
samdanbury 6:37b6d0d56190 689 if (len <= 0)
samdanbury 6:37b6d0d56190 690 goto exit;
samdanbury 6:37b6d0d56190 691 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
samdanbury 6:37b6d0d56190 692 goto exit; // there was a problem
icraggs 8:80d49dd91542 693
icraggs 8:80d49dd91542 694 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback
samdanbury 6:37b6d0d56190 695 {
samdanbury 6:37b6d0d56190 696 int count = 0, grantedQoS = -1;
samdanbury 6:37b6d0d56190 697 unsigned short mypacketid;
samdanbury 6:37b6d0d56190 698 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 8:80d49dd91542 699 rc = grantedQoS; // 0, 1, 2 or 0x80
samdanbury 6:37b6d0d56190 700 if (rc != 0x80)
samdanbury 6:37b6d0d56190 701 {
samdanbury 6:37b6d0d56190 702 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
samdanbury 6:37b6d0d56190 703 {
samdanbury 6:37b6d0d56190 704 if (messageHandlers[i].topicFilter == 0)
samdanbury 6:37b6d0d56190 705 {
samdanbury 6:37b6d0d56190 706 messageHandlers[i].topicFilter = topicFilter;
samdanbury 6:37b6d0d56190 707 messageHandlers[i].fp.attach(messageHandler);
samdanbury 6:37b6d0d56190 708 rc = 0;
samdanbury 6:37b6d0d56190 709 break;
samdanbury 6:37b6d0d56190 710 }
samdanbury 6:37b6d0d56190 711 }
samdanbury 6:37b6d0d56190 712 }
samdanbury 6:37b6d0d56190 713 }
icraggs 8:80d49dd91542 714 else
samdanbury 6:37b6d0d56190 715 rc = FAILURE;
icraggs 8:80d49dd91542 716
samdanbury 6:37b6d0d56190 717 exit:
icraggs 8:80d49dd91542 718 if (rc != SUCCESS)
icraggs 8:80d49dd91542 719 isconnected = false;
samdanbury 6:37b6d0d56190 720 return rc;
samdanbury 6:37b6d0d56190 721 }
samdanbury 6:37b6d0d56190 722
samdanbury 6:37b6d0d56190 723
icraggs 8:80d49dd91542 724 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
samdanbury 6:37b6d0d56190 725 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
icraggs 8:80d49dd91542 726 {
samdanbury 6:37b6d0d56190 727 int rc = FAILURE;
icraggs 8:80d49dd91542 728 Timer timer = Timer(command_timeout_ms);
samdanbury 6:37b6d0d56190 729 MQTTString topic = {(char*)topicFilter, 0, 0};
samdanbury 6:37b6d0d56190 730 int len = 0;
icraggs 8:80d49dd91542 731
samdanbury 6:37b6d0d56190 732 if (!isconnected)
samdanbury 6:37b6d0d56190 733 goto exit;
icraggs 8:80d49dd91542 734
icraggs 8:80d49dd91542 735 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
samdanbury 6:37b6d0d56190 736 goto exit;
icraggs 8:80d49dd91542 737 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
samdanbury 6:37b6d0d56190 738 goto exit; // there was a problem
icraggs 8:80d49dd91542 739
samdanbury 6:37b6d0d56190 740 if (waitfor(UNSUBACK, timer) == UNSUBACK)
samdanbury 6:37b6d0d56190 741 {
samdanbury 6:37b6d0d56190 742 unsigned short mypacketid; // should be the same as the packetid above
samdanbury 6:37b6d0d56190 743 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
icraggs 8:80d49dd91542 744 rc = 0;
samdanbury 6:37b6d0d56190 745 }
samdanbury 6:37b6d0d56190 746 else
samdanbury 6:37b6d0d56190 747 rc = FAILURE;
icraggs 8:80d49dd91542 748
samdanbury 6:37b6d0d56190 749 exit:
icraggs 8:80d49dd91542 750 if (rc != SUCCESS)
icraggs 8:80d49dd91542 751 isconnected = false;
samdanbury 6:37b6d0d56190 752 return rc;
samdanbury 6:37b6d0d56190 753 }
samdanbury 6:37b6d0d56190 754
samdanbury 6:37b6d0d56190 755
icraggs 8:80d49dd91542 756 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 8:80d49dd91542 757 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
samdanbury 6:37b6d0d56190 758 {
icraggs 8:80d49dd91542 759 int rc;
samdanbury 6:37b6d0d56190 760
icraggs 8:80d49dd91542 761 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
icraggs 8:80d49dd91542 762 goto exit; // there was a problem
samdanbury 6:37b6d0d56190 763
icraggs 8:80d49dd91542 764 #if MQTTCLIENT_QOS1
icraggs 8:80d49dd91542 765 if (qos == QOS1)
samdanbury 6:37b6d0d56190 766 {
samdanbury 6:37b6d0d56190 767 if (waitfor(PUBACK, timer) == PUBACK)
samdanbury 6:37b6d0d56190 768 {
samdanbury 6:37b6d0d56190 769 unsigned short mypacketid;
samdanbury 6:37b6d0d56190 770 unsigned char dup, type;
samdanbury 6:37b6d0d56190 771 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
samdanbury 6:37b6d0d56190 772 rc = FAILURE;
icraggs 8:80d49dd91542 773 else if (inflightMsgid == mypacketid)
icraggs 8:80d49dd91542 774 inflightMsgid = 0;
samdanbury 6:37b6d0d56190 775 }
samdanbury 6:37b6d0d56190 776 else
samdanbury 6:37b6d0d56190 777 rc = FAILURE;
samdanbury 6:37b6d0d56190 778 }
icraggs 8:80d49dd91542 779 #elif MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 780 else if (qos == QOS2)
samdanbury 6:37b6d0d56190 781 {
samdanbury 6:37b6d0d56190 782 if (waitfor(PUBCOMP, timer) == PUBCOMP)
samdanbury 6:37b6d0d56190 783 {
samdanbury 6:37b6d0d56190 784 unsigned short mypacketid;
samdanbury 6:37b6d0d56190 785 unsigned char dup, type;
samdanbury 6:37b6d0d56190 786 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
samdanbury 6:37b6d0d56190 787 rc = FAILURE;
icraggs 8:80d49dd91542 788 else if (inflightMsgid == mypacketid)
icraggs 8:80d49dd91542 789 inflightMsgid = 0;
samdanbury 6:37b6d0d56190 790 }
samdanbury 6:37b6d0d56190 791 else
samdanbury 6:37b6d0d56190 792 rc = FAILURE;
samdanbury 6:37b6d0d56190 793 }
icraggs 8:80d49dd91542 794 #endif
icraggs 8:80d49dd91542 795
icraggs 8:80d49dd91542 796 exit:
icraggs 8:80d49dd91542 797 if (rc != SUCCESS)
icraggs 8:80d49dd91542 798 isconnected = false;
icraggs 8:80d49dd91542 799 return rc;
icraggs 8:80d49dd91542 800 }
icraggs 8:80d49dd91542 801
icraggs 8:80d49dd91542 802
icraggs 8:80d49dd91542 803
icraggs 8:80d49dd91542 804 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 8:80d49dd91542 805 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
icraggs 8:80d49dd91542 806 {
icraggs 8:80d49dd91542 807 int rc = FAILURE;
icraggs 8:80d49dd91542 808 Timer timer = Timer(command_timeout_ms);
icraggs 8:80d49dd91542 809 MQTTString topicString = MQTTString_initializer;
icraggs 8:80d49dd91542 810 int len = 0;
icraggs 8:80d49dd91542 811
icraggs 8:80d49dd91542 812 if (!isconnected)
icraggs 8:80d49dd91542 813 goto exit;
icraggs 8:80d49dd91542 814
icraggs 8:80d49dd91542 815 topicString.cstring = (char*)topicName;
icraggs 8:80d49dd91542 816
icraggs 8:80d49dd91542 817 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 818 if (qos == QOS1 || qos == QOS2)
icraggs 8:80d49dd91542 819 id = packetid.getNext();
icraggs 8:80d49dd91542 820 #endif
icraggs 8:80d49dd91542 821
icraggs 8:80d49dd91542 822 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
icraggs 8:80d49dd91542 823 topicString, (unsigned char*)payload, payloadlen);
icraggs 8:80d49dd91542 824 if (len <= 0)
icraggs 8:80d49dd91542 825 goto exit;
icraggs 8:80d49dd91542 826
icraggs 8:80d49dd91542 827 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 828 if (!cleansession)
icraggs 8:80d49dd91542 829 {
icraggs 8:80d49dd91542 830 memcpy(pubbuf, sendbuf, len);
icraggs 8:80d49dd91542 831 inflightMsgid = id;
icraggs 8:80d49dd91542 832 inflightLen = len;
icraggs 8:80d49dd91542 833 inflightQoS = qos;
icraggs 8:80d49dd91542 834 #if MQTTCLIENT_QOS2
icraggs 8:80d49dd91542 835 pubrel = false;
icraggs 8:80d49dd91542 836 #endif
icraggs 8:80d49dd91542 837 }
icraggs 8:80d49dd91542 838 #endif
icraggs 8:80d49dd91542 839
icraggs 8:80d49dd91542 840 rc = publish(len, timer, qos);
samdanbury 6:37b6d0d56190 841 exit:
samdanbury 6:37b6d0d56190 842 return rc;
samdanbury 6:37b6d0d56190 843 }
samdanbury 6:37b6d0d56190 844
samdanbury 6:37b6d0d56190 845
icraggs 8:80d49dd91542 846 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 8:80d49dd91542 847 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
icraggs 8:80d49dd91542 848 {
icraggs 8:80d49dd91542 849 unsigned short id = 0; // dummy - not used for anything
icraggs 8:80d49dd91542 850 return publish(topicName, payload, payloadlen, id, qos, retained);
icraggs 8:80d49dd91542 851 }
icraggs 8:80d49dd91542 852
icraggs 8:80d49dd91542 853
icraggs 8:80d49dd91542 854 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
icraggs 8:80d49dd91542 855 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
icraggs 8:80d49dd91542 856 {
icraggs 8:80d49dd91542 857 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
icraggs 8:80d49dd91542 858 }
icraggs 8:80d49dd91542 859
icraggs 8:80d49dd91542 860
icraggs 8:80d49dd91542 861 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
samdanbury 6:37b6d0d56190 862 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
icraggs 8:80d49dd91542 863 {
samdanbury 6:37b6d0d56190 864 int rc = FAILURE;
samdanbury 6:37b6d0d56190 865 Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
samdanbury 6:37b6d0d56190 866 int len = MQTTSerialize_disconnect(buf, MAX_MQTT_PACKET_SIZE);
samdanbury 6:37b6d0d56190 867 if (len > 0)
samdanbury 6:37b6d0d56190 868 rc = sendPacket(len, timer); // send the disconnect packet
icraggs 8:80d49dd91542 869
samdanbury 6:37b6d0d56190 870 isconnected = false;
samdanbury 6:37b6d0d56190 871 return rc;
samdanbury 6:37b6d0d56190 872 }
samdanbury 6:37b6d0d56190 873
samdanbury 6:37b6d0d56190 874
icraggs 8:80d49dd91542 875 #endif