hellomqttt to thingspeak mqtt and ifttt

Dependencies:   Servo MQTTPacket FP

Committer:
jasonberry
Date:
Wed May 05 14:48:01 2021 +0000
Revision:
25:ca1b1098c77f
TEST

Who changed what in which revision?

User | Revision | Line number | New contents of line
jasonberry 25:ca1b1098c77f 1 /*******************************************************************************
jasonberry 25:ca1b1098c77f 2 * Copyright (c) 2014, 2017 IBM Corp.
jasonberry 25:ca1b1098c77f 3 *
jasonberry 25:ca1b1098c77f 4 * All rights reserved. This program and the accompanying materials
jasonberry 25:ca1b1098c77f 5 * are made available under the terms of the Eclipse Public License v1.0
jasonberry 25:ca1b1098c77f 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
jasonberry 25:ca1b1098c77f 7 *
jasonberry 25:ca1b1098c77f 8 * The Eclipse Public License is available at
jasonberry 25:ca1b1098c77f 9 * http://www.eclipse.org/legal/epl-v10.html
jasonberry 25:ca1b1098c77f 10 * and the Eclipse Distribution License is available at
jasonberry 25:ca1b1098c77f 11 * http://www.eclipse.org/org/documents/edl-v10.php.
jasonberry 25:ca1b1098c77f 12 *
jasonberry 25:ca1b1098c77f 13 * Contributors:
jasonberry 25:ca1b1098c77f 14 * Ian Craggs - initial API and implementation and/or initial documentation
jasonberry 25:ca1b1098c77f 15 * Ian Craggs - fix for bug 458512 - QoS 2 messages
jasonberry 25:ca1b1098c77f 16 * Ian Craggs - fix for bug 460389 - send loop uses wrong length
jasonberry 25:ca1b1098c77f 17 * Ian Craggs - fix for bug 464169 - clearing subscriptions
jasonberry 25:ca1b1098c77f 18 * Ian Craggs - fix for bug 464551 - enums and ints can be different size
jasonberry 25:ca1b1098c77f 19 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
jasonberry 25:ca1b1098c77f 20 * Ian Craggs - fix for bug 475749 - packetid modified twice
jasonberry 25:ca1b1098c77f 21 * Ian Craggs - add ability to set message handler separately #6
jasonberry 25:ca1b1098c77f 22 *******************************************************************************/
jasonberry 25:ca1b1098c77f 23
jasonberry 25:ca1b1098c77f 24 #if !defined(MQTTCLIENT_H)
jasonberry 25:ca1b1098c77f 25 #define MQTTCLIENT_H
jasonberry 25:ca1b1098c77f 26
jasonberry 25:ca1b1098c77f 27 #include "FP.h"
jasonberry 25:ca1b1098c77f 28 #include "MQTTPacket.h"
jasonberry 25:ca1b1098c77f 29 #include <stdio.h>
jasonberry 25:ca1b1098c77f 30 #include "MQTTLogging.h"
jasonberry 25:ca1b1098c77f 31
jasonberry 25:ca1b1098c77f 32 #if !defined(MQTTCLIENT_QOS1)
jasonberry 25:ca1b1098c77f 33 #define MQTTCLIENT_QOS1 1
jasonberry 25:ca1b1098c77f 34 #endif
jasonberry 25:ca1b1098c77f 35 #if !defined(MQTTCLIENT_QOS2)
jasonberry 25:ca1b1098c77f 36 #define MQTTCLIENT_QOS2 0
jasonberry 25:ca1b1098c77f 37 #endif
jasonberry 25:ca1b1098c77f 38
jasonberry 25:ca1b1098c77f 39 namespace MQTT
jasonberry 25:ca1b1098c77f 40 {
jasonberry 25:ca1b1098c77f 41
jasonberry 25:ca1b1098c77f 42
/** MQTT quality-of-service levels; the enumerators take the values 0, 1 and 2 in that order. */
enum QoS
{
    QOS0,
    QOS1,
    QOS2
};
jasonberry 25:ca1b1098c77f 44
// Result codes returned by the client. Every failure code must stay negative,
// so that a result can be tested with `< 0`.
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE = -1,
    SUCCESS = 0
};
jasonberry 25:ca1b1098c77f 47
jasonberry 25:ca1b1098c77f 48
jasonberry 25:ca1b1098c77f 49 struct Message
jasonberry 25:ca1b1098c77f 50 {
jasonberry 25:ca1b1098c77f 51 enum QoS qos;
jasonberry 25:ca1b1098c77f 52 bool retained;
jasonberry 25:ca1b1098c77f 53 bool dup;
jasonberry 25:ca1b1098c77f 54 unsigned short id;
jasonberry 25:ca1b1098c77f 55 void *payload;
jasonberry 25:ca1b1098c77f 56 size_t payloadlen;
jasonberry 25:ca1b1098c77f 57 };
jasonberry 25:ca1b1098c77f 58
jasonberry 25:ca1b1098c77f 59
jasonberry 25:ca1b1098c77f 60 struct MessageData
jasonberry 25:ca1b1098c77f 61 {
jasonberry 25:ca1b1098c77f 62 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
jasonberry 25:ca1b1098c77f 63 { }
jasonberry 25:ca1b1098c77f 64
jasonberry 25:ca1b1098c77f 65 struct Message &message;
jasonberry 25:ca1b1098c77f 66 MQTTString &topicName;
jasonberry 25:ca1b1098c77f 67 };
jasonberry 25:ca1b1098c77f 68
jasonberry 25:ca1b1098c77f 69
/** Connack data returned to the caller by connect(options, data). */
struct connackData
{
    int rc;                 // return code carried by the connack
    bool sessionPresent;    // session-present flag carried by the connack
};
jasonberry 25:ca1b1098c77f 75
jasonberry 25:ca1b1098c77f 76
/** Suback data returned to the caller by subscribe(topicFilter, qos, mh, data). */
struct subackData
{
    int grantedQoS;     // QoS reported in the suback
};
jasonberry 25:ca1b1098c77f 81
jasonberry 25:ca1b1098c77f 82
/**
 * Packet identifier generator.
 * Hands out 1, 2, ... 65535 and then wraps back to 1; 0 is never returned.
 */
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /** Advance to, and return, the next packet id. */
    int getNext()
    {
        if (next == MAX_PACKET_ID)
            next = 1;       // wrap around, skipping 0
        else
            ++next;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // last id handed out, 0 before the first call
};
jasonberry 25:ca1b1098c77f 100
jasonberry 25:ca1b1098c77f 101
jasonberry 25:ca1b1098c77f 102 /**
jasonberry 25:ca1b1098c77f 103 * @class Client
jasonberry 25:ca1b1098c77f 104 * @brief blocking, non-threaded MQTT client API
jasonberry 25:ca1b1098c77f 105 *
jasonberry 25:ca1b1098c77f 106 * This version of the API blocks on all method calls, until they are complete. This means that only one
jasonberry 25:ca1b1098c77f 107 * MQTT request can be in process at any one time.
jasonberry 25:ca1b1098c77f 108 * @param Network a network class which supports send, receive
jasonberry 25:ca1b1098c77f 109 * @param Timer a timer class with the methods:
jasonberry 25:ca1b1098c77f 110 */
jasonberry 25:ca1b1098c77f 111 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 200, int MAX_MESSAGE_HANDLERS = 5>
jasonberry 25:ca1b1098c77f 112 class Client
jasonberry 25:ca1b1098c77f 113 {
jasonberry 25:ca1b1098c77f 114
jasonberry 25:ca1b1098c77f 115 public:
jasonberry 25:ca1b1098c77f 116
jasonberry 25:ca1b1098c77f 117 typedef void (*messageHandler)(MessageData&);
jasonberry 25:ca1b1098c77f 118
jasonberry 25:ca1b1098c77f 119 /** Construct the client
jasonberry 25:ca1b1098c77f 120 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
jasonberry 25:ca1b1098c77f 121 * before calling MQTT connect
jasonberry 25:ca1b1098c77f 122 * @param limits an instance of the Limit class - to alter limits as required
jasonberry 25:ca1b1098c77f 123 */
jasonberry 25:ca1b1098c77f 124 Client(Network& network, unsigned int command_timeout_ms = 30000);
jasonberry 25:ca1b1098c77f 125
jasonberry 25:ca1b1098c77f 126 /** Set the default message handling callback - used for any message which does not match a subscription message handler
jasonberry 25:ca1b1098c77f 127 * @param mh - pointer to the callback function. Set to 0 to remove.
jasonberry 25:ca1b1098c77f 128 */
jasonberry 25:ca1b1098c77f 129 void setDefaultMessageHandler(messageHandler mh)
jasonberry 25:ca1b1098c77f 130 {
jasonberry 25:ca1b1098c77f 131 if (mh != 0)
jasonberry 25:ca1b1098c77f 132 defaultMessageHandler.attach(mh);
jasonberry 25:ca1b1098c77f 133 else
jasonberry 25:ca1b1098c77f 134 defaultMessageHandler.detach();
jasonberry 25:ca1b1098c77f 135 }
jasonberry 25:ca1b1098c77f 136
jasonberry 25:ca1b1098c77f 137 /** Set a message handling callback. This can be used outside of the the subscribe method.
jasonberry 25:ca1b1098c77f 138 * @param topicFilter - a topic pattern which can include wildcards
jasonberry 25:ca1b1098c77f 139 * @param mh - pointer to the callback function. If 0, removes the callback if any
jasonberry 25:ca1b1098c77f 140 */
jasonberry 25:ca1b1098c77f 141 int setMessageHandler(const char* topicFilter, messageHandler mh);
jasonberry 25:ca1b1098c77f 142
jasonberry 25:ca1b1098c77f 143 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
jasonberry 25:ca1b1098c77f 144 * The nework object must be connected to the network endpoint before calling this
jasonberry 25:ca1b1098c77f 145 * Default connect options are used
jasonberry 25:ca1b1098c77f 146 * @return success code -
jasonberry 25:ca1b1098c77f 147 */
jasonberry 25:ca1b1098c77f 148 int connect();
jasonberry 25:ca1b1098c77f 149
jasonberry 25:ca1b1098c77f 150 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
jasonberry 25:ca1b1098c77f 151 * The nework object must be connected to the network endpoint before calling this
jasonberry 25:ca1b1098c77f 152 * @param options - connect options
jasonberry 25:ca1b1098c77f 153 * @return success code -
jasonberry 25:ca1b1098c77f 154 */
jasonberry 25:ca1b1098c77f 155 int connect(MQTTPacket_connectData& options);
jasonberry 25:ca1b1098c77f 156
jasonberry 25:ca1b1098c77f 157 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
jasonberry 25:ca1b1098c77f 158 * The nework object must be connected to the network endpoint before calling this
jasonberry 25:ca1b1098c77f 159 * @param options - connect options
jasonberry 25:ca1b1098c77f 160 * @param connackData - connack data to be returned
jasonberry 25:ca1b1098c77f 161 * @return success code -
jasonberry 25:ca1b1098c77f 162 */
jasonberry 25:ca1b1098c77f 163 int connect(MQTTPacket_connectData& options, connackData& data);
jasonberry 25:ca1b1098c77f 164
jasonberry 25:ca1b1098c77f 165 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
jasonberry 25:ca1b1098c77f 166 * @param topic - the topic to publish to
jasonberry 25:ca1b1098c77f 167 * @param message - the message to send
jasonberry 25:ca1b1098c77f 168 * @return success code -
jasonberry 25:ca1b1098c77f 169 */
jasonberry 25:ca1b1098c77f 170 int publish(const char* topicName, Message& message);
jasonberry 25:ca1b1098c77f 171
jasonberry 25:ca1b1098c77f 172 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
jasonberry 25:ca1b1098c77f 173 * @param topic - the topic to publish to
jasonberry 25:ca1b1098c77f 174 * @param payload - the data to send
jasonberry 25:ca1b1098c77f 175 * @param payloadlen - the length of the data
jasonberry 25:ca1b1098c77f 176 * @param qos - the QoS to send the publish at
jasonberry 25:ca1b1098c77f 177 * @param retained - whether the message should be retained
jasonberry 25:ca1b1098c77f 178 * @return success code -
jasonberry 25:ca1b1098c77f 179 */
jasonberry 25:ca1b1098c77f 180 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
jasonberry 25:ca1b1098c77f 181
jasonberry 25:ca1b1098c77f 182 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
jasonberry 25:ca1b1098c77f 183 * @param topic - the topic to publish to
jasonberry 25:ca1b1098c77f 184 * @param payload - the data to send
jasonberry 25:ca1b1098c77f 185 * @param payloadlen - the length of the data
jasonberry 25:ca1b1098c77f 186 * @param id - the packet id used - returned
jasonberry 25:ca1b1098c77f 187 * @param qos - the QoS to send the publish at
jasonberry 25:ca1b1098c77f 188 * @param retained - whether the message should be retained
jasonberry 25:ca1b1098c77f 189 * @return success code -
jasonberry 25:ca1b1098c77f 190 */
jasonberry 25:ca1b1098c77f 191 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
jasonberry 25:ca1b1098c77f 192
jasonberry 25:ca1b1098c77f 193 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
jasonberry 25:ca1b1098c77f 194 * @param topicFilter - a topic pattern which can include wildcards
jasonberry 25:ca1b1098c77f 195 * @param qos - the MQTT QoS to subscribe at
jasonberry 25:ca1b1098c77f 196 * @param mh - the callback function to be invoked when a message is received for this subscription
jasonberry 25:ca1b1098c77f 197 * @return success code -
jasonberry 25:ca1b1098c77f 198 */
jasonberry 25:ca1b1098c77f 199 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
jasonberry 25:ca1b1098c77f 200
jasonberry 25:ca1b1098c77f 201 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
jasonberry 25:ca1b1098c77f 202 * @param topicFilter - a topic pattern which can include wildcards
jasonberry 25:ca1b1098c77f 203 * @param qos - the MQTT QoS to subscribe at©
jasonberry 25:ca1b1098c77f 204 * @param mh - the callback function to be invoked when a message is received for this subscription
jasonberry 25:ca1b1098c77f 205 * @param
jasonberry 25:ca1b1098c77f 206 * @return success code -
jasonberry 25:ca1b1098c77f 207 */
jasonberry 25:ca1b1098c77f 208 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data);
jasonberry 25:ca1b1098c77f 209
jasonberry 25:ca1b1098c77f 210 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
jasonberry 25:ca1b1098c77f 211 * @param topicFilter - a topic pattern which can include wildcards
jasonberry 25:ca1b1098c77f 212 * @return success code -
jasonberry 25:ca1b1098c77f 213 */
jasonberry 25:ca1b1098c77f 214 int unsubscribe(const char* topicFilter);
jasonberry 25:ca1b1098c77f 215
jasonberry 25:ca1b1098c77f 216 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
jasonberry 25:ca1b1098c77f 217 * @return success code -
jasonberry 25:ca1b1098c77f 218 */
jasonberry 25:ca1b1098c77f 219 int disconnect();
jasonberry 25:ca1b1098c77f 220
jasonberry 25:ca1b1098c77f 221 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
jasonberry 25:ca1b1098c77f 222 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
jasonberry 25:ca1b1098c77f 223 * received.
jasonberry 25:ca1b1098c77f 224 * @param timeout_ms the time to wait, in milliseconds
jasonberry 25:ca1b1098c77f 225 * @return success code - on failure, this means the client has disconnected
jasonberry 25:ca1b1098c77f 226 */
jasonberry 25:ca1b1098c77f 227 int yield(unsigned long timeout_ms = 1000L);
jasonberry 25:ca1b1098c77f 228
jasonberry 25:ca1b1098c77f 229 /** Is the client connected?
jasonberry 25:ca1b1098c77f 230 * @return flag - is the client connected or not?
jasonberry 25:ca1b1098c77f 231 */
jasonberry 25:ca1b1098c77f 232 bool isConnected()
jasonberry 25:ca1b1098c77f 233 {
jasonberry 25:ca1b1098c77f 234 return isconnected;
jasonberry 25:ca1b1098c77f 235 }
jasonberry 25:ca1b1098c77f 236
jasonberry 25:ca1b1098c77f 237 private:
jasonberry 25:ca1b1098c77f 238
jasonberry 25:ca1b1098c77f 239 void closeSession();
jasonberry 25:ca1b1098c77f 240 void cleanSession();
jasonberry 25:ca1b1098c77f 241 int cycle(Timer& timer);
jasonberry 25:ca1b1098c77f 242 int waitfor(int packet_type, Timer& timer);
jasonberry 25:ca1b1098c77f 243 int keepalive();
jasonberry 25:ca1b1098c77f 244 int publish(int len, Timer& timer, enum QoS qos);
jasonberry 25:ca1b1098c77f 245
jasonberry 25:ca1b1098c77f 246 int decodePacket(int* value, int timeout);
jasonberry 25:ca1b1098c77f 247 int readPacket(Timer& timer);
jasonberry 25:ca1b1098c77f 248 int sendPacket(int length, Timer& timer);
jasonberry 25:ca1b1098c77f 249 int deliverMessage(MQTTString& topicName, Message& message);
jasonberry 25:ca1b1098c77f 250 bool isTopicMatched(char* topicFilter, MQTTString& topicName);
jasonberry 25:ca1b1098c77f 251
jasonberry 25:ca1b1098c77f 252 Network& ipstack;
jasonberry 25:ca1b1098c77f 253 unsigned long command_timeout_ms;
jasonberry 25:ca1b1098c77f 254
jasonberry 25:ca1b1098c77f 255 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
jasonberry 25:ca1b1098c77f 256 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
jasonberry 25:ca1b1098c77f 257
jasonberry 25:ca1b1098c77f 258 Timer last_sent, last_received;
jasonberry 25:ca1b1098c77f 259 unsigned int keepAliveInterval;
jasonberry 25:ca1b1098c77f 260 bool ping_outstanding;
jasonberry 25:ca1b1098c77f 261 bool cleansession;
jasonberry 25:ca1b1098c77f 262
jasonberry 25:ca1b1098c77f 263 PacketId packetid;
jasonberry 25:ca1b1098c77f 264
jasonberry 25:ca1b1098c77f 265 struct MessageHandlers
jasonberry 25:ca1b1098c77f 266 {
jasonberry 25:ca1b1098c77f 267 const char* topicFilter;
jasonberry 25:ca1b1098c77f 268 FP<void, MessageData&> fp;
jasonberry 25:ca1b1098c77f 269 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
jasonberry 25:ca1b1098c77f 270
jasonberry 25:ca1b1098c77f 271 FP<void, MessageData&> defaultMessageHandler;
jasonberry 25:ca1b1098c77f 272
jasonberry 25:ca1b1098c77f 273 bool isconnected;
jasonberry 25:ca1b1098c77f 274
jasonberry 25:ca1b1098c77f 275 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 276 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect
jasonberry 25:ca1b1098c77f 277 int inflightLen;
jasonberry 25:ca1b1098c77f 278 unsigned short inflightMsgid;
jasonberry 25:ca1b1098c77f 279 enum QoS inflightQoS;
jasonberry 25:ca1b1098c77f 280 #endif
jasonberry 25:ca1b1098c77f 281
jasonberry 25:ca1b1098c77f 282 #if MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 283 bool pubrel;
jasonberry 25:ca1b1098c77f 284 #if !defined(MAX_INCOMING_QOS2_MESSAGES)
jasonberry 25:ca1b1098c77f 285 #define MAX_INCOMING_QOS2_MESSAGES 10
jasonberry 25:ca1b1098c77f 286 #endif
jasonberry 25:ca1b1098c77f 287 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
jasonberry 25:ca1b1098c77f 288 bool isQoS2msgidFree(unsigned short id);
jasonberry 25:ca1b1098c77f 289 bool useQoS2msgid(unsigned short id);
jasonberry 25:ca1b1098c77f 290 void freeQoS2msgid(unsigned short id);
jasonberry 25:ca1b1098c77f 291 #endif
jasonberry 25:ca1b1098c77f 292
jasonberry 25:ca1b1098c77f 293 };
jasonberry 25:ca1b1098c77f 294
jasonberry 25:ca1b1098c77f 295 }
jasonberry 25:ca1b1098c77f 296
jasonberry 25:ca1b1098c77f 297
jasonberry 25:ca1b1098c77f 298 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
jasonberry 25:ca1b1098c77f 299 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
jasonberry 25:ca1b1098c77f 300 {
jasonberry 25:ca1b1098c77f 301 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jasonberry 25:ca1b1098c77f 302 messageHandlers[i].topicFilter = 0;
jasonberry 25:ca1b1098c77f 303
jasonberry 25:ca1b1098c77f 304 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 305 inflightMsgid = 0;
jasonberry 25:ca1b1098c77f 306 inflightQoS = QOS0;
jasonberry 25:ca1b1098c77f 307 #endif
jasonberry 25:ca1b1098c77f 308
jasonberry 25:ca1b1098c77f 309 #if MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 310 pubrel = false;
jasonberry 25:ca1b1098c77f 311 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jasonberry 25:ca1b1098c77f 312 incomingQoS2messages[i] = 0;
jasonberry 25:ca1b1098c77f 313 #endif
jasonberry 25:ca1b1098c77f 314 }
jasonberry 25:ca1b1098c77f 315
jasonberry 25:ca1b1098c77f 316
jasonberry 25:ca1b1098c77f 317 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
jasonberry 25:ca1b1098c77f 318 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession()
jasonberry 25:ca1b1098c77f 319 {
jasonberry 25:ca1b1098c77f 320 ping_outstanding = false;
jasonberry 25:ca1b1098c77f 321 isconnected = false;
jasonberry 25:ca1b1098c77f 322 if (cleansession)
jasonberry 25:ca1b1098c77f 323 cleanSession();
jasonberry 25:ca1b1098c77f 324 }
jasonberry 25:ca1b1098c77f 325
jasonberry 25:ca1b1098c77f 326
jasonberry 25:ca1b1098c77f 327 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
jasonberry 25:ca1b1098c77f 328 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
jasonberry 25:ca1b1098c77f 329 {
jasonberry 25:ca1b1098c77f 330 this->command_timeout_ms = command_timeout_ms;
jasonberry 25:ca1b1098c77f 331 cleansession = true;
jasonberry 25:ca1b1098c77f 332 closeSession();
jasonberry 25:ca1b1098c77f 333 }
jasonberry 25:ca1b1098c77f 334
jasonberry 25:ca1b1098c77f 335
jasonberry 25:ca1b1098c77f 336 #if MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 337 template<class Network, class Timer, int a, int b>
jasonberry 25:ca1b1098c77f 338 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
jasonberry 25:ca1b1098c77f 339 {
jasonberry 25:ca1b1098c77f 340 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jasonberry 25:ca1b1098c77f 341 {
jasonberry 25:ca1b1098c77f 342 if (incomingQoS2messages[i] == id)
jasonberry 25:ca1b1098c77f 343 return false;
jasonberry 25:ca1b1098c77f 344 }
jasonberry 25:ca1b1098c77f 345 return true;
jasonberry 25:ca1b1098c77f 346 }
jasonberry 25:ca1b1098c77f 347
jasonberry 25:ca1b1098c77f 348
jasonberry 25:ca1b1098c77f 349 template<class Network, class Timer, int a, int b>
jasonberry 25:ca1b1098c77f 350 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
jasonberry 25:ca1b1098c77f 351 {
jasonberry 25:ca1b1098c77f 352 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jasonberry 25:ca1b1098c77f 353 {
jasonberry 25:ca1b1098c77f 354 if (incomingQoS2messages[i] == 0)
jasonberry 25:ca1b1098c77f 355 {
jasonberry 25:ca1b1098c77f 356 incomingQoS2messages[i] = id;
jasonberry 25:ca1b1098c77f 357 return true;
jasonberry 25:ca1b1098c77f 358 }
jasonberry 25:ca1b1098c77f 359 }
jasonberry 25:ca1b1098c77f 360 return false;
jasonberry 25:ca1b1098c77f 361 }
jasonberry 25:ca1b1098c77f 362
jasonberry 25:ca1b1098c77f 363
jasonberry 25:ca1b1098c77f 364 template<class Network, class Timer, int a, int b>
jasonberry 25:ca1b1098c77f 365 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
jasonberry 25:ca1b1098c77f 366 {
jasonberry 25:ca1b1098c77f 367 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jasonberry 25:ca1b1098c77f 368 {
jasonberry 25:ca1b1098c77f 369 if (incomingQoS2messages[i] == id)
jasonberry 25:ca1b1098c77f 370 {
jasonberry 25:ca1b1098c77f 371 incomingQoS2messages[i] = 0;
jasonberry 25:ca1b1098c77f 372 return;
jasonberry 25:ca1b1098c77f 373 }
jasonberry 25:ca1b1098c77f 374 }
jasonberry 25:ca1b1098c77f 375 }
jasonberry 25:ca1b1098c77f 376 #endif
jasonberry 25:ca1b1098c77f 377
jasonberry 25:ca1b1098c77f 378
jasonberry 25:ca1b1098c77f 379 template<class Network, class Timer, int a, int b>
jasonberry 25:ca1b1098c77f 380 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
jasonberry 25:ca1b1098c77f 381 {
jasonberry 25:ca1b1098c77f 382 int rc = FAILURE,
jasonberry 25:ca1b1098c77f 383 sent = 0;
jasonberry 25:ca1b1098c77f 384
jasonberry 25:ca1b1098c77f 385 while (sent < length)
jasonberry 25:ca1b1098c77f 386 {
jasonberry 25:ca1b1098c77f 387 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
jasonberry 25:ca1b1098c77f 388 if (rc < 0) // there was an error writing the data
jasonberry 25:ca1b1098c77f 389 break;
jasonberry 25:ca1b1098c77f 390 sent += rc;
jasonberry 25:ca1b1098c77f 391 if (timer.expired()) // only check expiry after at least one attempt to write
jasonberry 25:ca1b1098c77f 392 break;
jasonberry 25:ca1b1098c77f 393 }
jasonberry 25:ca1b1098c77f 394 if (sent == length)
jasonberry 25:ca1b1098c77f 395 {
jasonberry 25:ca1b1098c77f 396 if (this->keepAliveInterval > 0)
jasonberry 25:ca1b1098c77f 397 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
jasonberry 25:ca1b1098c77f 398 rc = SUCCESS;
jasonberry 25:ca1b1098c77f 399 }
jasonberry 25:ca1b1098c77f 400 else
jasonberry 25:ca1b1098c77f 401 rc = FAILURE;
jasonberry 25:ca1b1098c77f 402
jasonberry 25:ca1b1098c77f 403 #if defined(MQTT_DEBUG)
jasonberry 25:ca1b1098c77f 404 char printbuf[150];
jasonberry 25:ca1b1098c77f 405 DEBUG("Rc %d from sending packet %s\r\n", rc,
jasonberry 25:ca1b1098c77f 406 MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
jasonberry 25:ca1b1098c77f 407 #endif
jasonberry 25:ca1b1098c77f 408 return rc;
jasonberry 25:ca1b1098c77f 409 }
jasonberry 25:ca1b1098c77f 410
jasonberry 25:ca1b1098c77f 411
jasonberry 25:ca1b1098c77f 412 template<class Network, class Timer, int a, int b>
jasonberry 25:ca1b1098c77f 413 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
jasonberry 25:ca1b1098c77f 414 {
jasonberry 25:ca1b1098c77f 415 unsigned char c;
jasonberry 25:ca1b1098c77f 416 int multiplier = 1;
jasonberry 25:ca1b1098c77f 417 int len = 0;
jasonberry 25:ca1b1098c77f 418 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
jasonberry 25:ca1b1098c77f 419
jasonberry 25:ca1b1098c77f 420 *value = 0;
jasonberry 25:ca1b1098c77f 421 do
jasonberry 25:ca1b1098c77f 422 {
jasonberry 25:ca1b1098c77f 423 int rc = MQTTPACKET_READ_ERROR;
jasonberry 25:ca1b1098c77f 424
jasonberry 25:ca1b1098c77f 425 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
jasonberry 25:ca1b1098c77f 426 {
jasonberry 25:ca1b1098c77f 427 rc = MQTTPACKET_READ_ERROR; /* bad data */
jasonberry 25:ca1b1098c77f 428 goto exit;
jasonberry 25:ca1b1098c77f 429 }
jasonberry 25:ca1b1098c77f 430 rc = ipstack.read(&c, 1, timeout);
jasonberry 25:ca1b1098c77f 431 if (rc != 1)
jasonberry 25:ca1b1098c77f 432 goto exit;
jasonberry 25:ca1b1098c77f 433 *value += (c & 127) * multiplier;
jasonberry 25:ca1b1098c77f 434 multiplier *= 128;
jasonberry 25:ca1b1098c77f 435 } while ((c & 128) != 0);
jasonberry 25:ca1b1098c77f 436 exit:
jasonberry 25:ca1b1098c77f 437 return len;
jasonberry 25:ca1b1098c77f 438 }
jasonberry 25:ca1b1098c77f 439
jasonberry 25:ca1b1098c77f 440
jasonberry 25:ca1b1098c77f 441 /**
jasonberry 25:ca1b1098c77f 442 * If any read fails in this method, then we should disconnect from the network, as on reconnect
jasonberry 25:ca1b1098c77f 443 * the packets can be retried.
jasonberry 25:ca1b1098c77f 444 * @param timeout the max time to wait for the packet read to complete, in milliseconds
jasonberry 25:ca1b1098c77f 445 * @return the MQTT packet type, 0 if none, -1 if error
jasonberry 25:ca1b1098c77f 446 */
jasonberry 25:ca1b1098c77f 447 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 448 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
jasonberry 25:ca1b1098c77f 449 {
jasonberry 25:ca1b1098c77f 450 int rc = FAILURE;
jasonberry 25:ca1b1098c77f 451 MQTTHeader header = {0};
jasonberry 25:ca1b1098c77f 452 int len = 0;
jasonberry 25:ca1b1098c77f 453 int rem_len = 0;
jasonberry 25:ca1b1098c77f 454
jasonberry 25:ca1b1098c77f 455 /* 1. read the header byte. This has the packet type in it */
jasonberry 25:ca1b1098c77f 456 rc = ipstack.read(readbuf, 1, timer.left_ms());
jasonberry 25:ca1b1098c77f 457 if (rc != 1)
jasonberry 25:ca1b1098c77f 458 goto exit;
jasonberry 25:ca1b1098c77f 459
jasonberry 25:ca1b1098c77f 460 len = 1;
jasonberry 25:ca1b1098c77f 461 /* 2. read the remaining length. This is variable in itself */
jasonberry 25:ca1b1098c77f 462 decodePacket(&rem_len, timer.left_ms());
jasonberry 25:ca1b1098c77f 463 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
jasonberry 25:ca1b1098c77f 464
jasonberry 25:ca1b1098c77f 465 if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
jasonberry 25:ca1b1098c77f 466 {
jasonberry 25:ca1b1098c77f 467 rc = BUFFER_OVERFLOW;
jasonberry 25:ca1b1098c77f 468 goto exit;
jasonberry 25:ca1b1098c77f 469 }
jasonberry 25:ca1b1098c77f 470
jasonberry 25:ca1b1098c77f 471 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
jasonberry 25:ca1b1098c77f 472 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
jasonberry 25:ca1b1098c77f 473 goto exit;
jasonberry 25:ca1b1098c77f 474
jasonberry 25:ca1b1098c77f 475 header.byte = readbuf[0];
jasonberry 25:ca1b1098c77f 476 rc = header.bits.type;
jasonberry 25:ca1b1098c77f 477 if (this->keepAliveInterval > 0)
jasonberry 25:ca1b1098c77f 478 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
jasonberry 25:ca1b1098c77f 479 exit:
jasonberry 25:ca1b1098c77f 480
jasonberry 25:ca1b1098c77f 481 #if defined(MQTT_DEBUG)
jasonberry 25:ca1b1098c77f 482 if (rc >= 0)
jasonberry 25:ca1b1098c77f 483 {
jasonberry 25:ca1b1098c77f 484 char printbuf[50];
jasonberry 25:ca1b1098c77f 485 DEBUG("Rc %d receiving packet %s\r\n", rc,
jasonberry 25:ca1b1098c77f 486 MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
jasonberry 25:ca1b1098c77f 487 }
jasonberry 25:ca1b1098c77f 488 #endif
jasonberry 25:ca1b1098c77f 489 return rc;
jasonberry 25:ca1b1098c77f 490 }
jasonberry 25:ca1b1098c77f 491
jasonberry 25:ca1b1098c77f 492
jasonberry 25:ca1b1098c77f 493 // assume topic filter and name is in correct format
jasonberry 25:ca1b1098c77f 494 // # can only be at end
jasonberry 25:ca1b1098c77f 495 // + and # can only be next to separator
jasonberry 25:ca1b1098c77f 496 template<class Network, class Timer, int a, int b>
jasonberry 25:ca1b1098c77f 497 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
jasonberry 25:ca1b1098c77f 498 {
jasonberry 25:ca1b1098c77f 499 char* curf = topicFilter;
jasonberry 25:ca1b1098c77f 500 char* curn = topicName.lenstring.data;
jasonberry 25:ca1b1098c77f 501 char* curn_end = curn + topicName.lenstring.len;
jasonberry 25:ca1b1098c77f 502
jasonberry 25:ca1b1098c77f 503 while (*curf && curn < curn_end)
jasonberry 25:ca1b1098c77f 504 {
jasonberry 25:ca1b1098c77f 505 if (*curn == '/' && *curf != '/')
jasonberry 25:ca1b1098c77f 506 break;
jasonberry 25:ca1b1098c77f 507 if (*curf != '+' && *curf != '#' && *curf != *curn)
jasonberry 25:ca1b1098c77f 508 break;
jasonberry 25:ca1b1098c77f 509 if (*curf == '+')
jasonberry 25:ca1b1098c77f 510 { // skip until we meet the next separator, or end of string
jasonberry 25:ca1b1098c77f 511 char* nextpos = curn + 1;
jasonberry 25:ca1b1098c77f 512 while (nextpos < curn_end && *nextpos != '/')
jasonberry 25:ca1b1098c77f 513 nextpos = ++curn + 1;
jasonberry 25:ca1b1098c77f 514 }
jasonberry 25:ca1b1098c77f 515 else if (*curf == '#')
jasonberry 25:ca1b1098c77f 516 curn = curn_end - 1; // skip until end of string
jasonberry 25:ca1b1098c77f 517 curf++;
jasonberry 25:ca1b1098c77f 518 curn++;
jasonberry 25:ca1b1098c77f 519 };
jasonberry 25:ca1b1098c77f 520
jasonberry 25:ca1b1098c77f 521 return (curn == curn_end) && (*curf == '\0');
jasonberry 25:ca1b1098c77f 522 }
jasonberry 25:ca1b1098c77f 523
jasonberry 25:ca1b1098c77f 524
jasonberry 25:ca1b1098c77f 525
jasonberry 25:ca1b1098c77f 526 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
jasonberry 25:ca1b1098c77f 527 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
jasonberry 25:ca1b1098c77f 528 {
jasonberry 25:ca1b1098c77f 529 int rc = FAILURE;
jasonberry 25:ca1b1098c77f 530
jasonberry 25:ca1b1098c77f 531 // we have to find the right message handler - indexed by topic
jasonberry 25:ca1b1098c77f 532 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jasonberry 25:ca1b1098c77f 533 {
jasonberry 25:ca1b1098c77f 534 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
jasonberry 25:ca1b1098c77f 535 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
jasonberry 25:ca1b1098c77f 536 {
jasonberry 25:ca1b1098c77f 537 if (messageHandlers[i].fp.attached())
jasonberry 25:ca1b1098c77f 538 {
jasonberry 25:ca1b1098c77f 539 MessageData md(topicName, message);
jasonberry 25:ca1b1098c77f 540 messageHandlers[i].fp(md);
jasonberry 25:ca1b1098c77f 541 rc = SUCCESS;
jasonberry 25:ca1b1098c77f 542 }
jasonberry 25:ca1b1098c77f 543 }
jasonberry 25:ca1b1098c77f 544 }
jasonberry 25:ca1b1098c77f 545
jasonberry 25:ca1b1098c77f 546 if (rc == FAILURE && defaultMessageHandler.attached())
jasonberry 25:ca1b1098c77f 547 {
jasonberry 25:ca1b1098c77f 548 MessageData md(topicName, message);
jasonberry 25:ca1b1098c77f 549 defaultMessageHandler(md);
jasonberry 25:ca1b1098c77f 550 rc = SUCCESS;
jasonberry 25:ca1b1098c77f 551 }
jasonberry 25:ca1b1098c77f 552
jasonberry 25:ca1b1098c77f 553 return rc;
jasonberry 25:ca1b1098c77f 554 }
jasonberry 25:ca1b1098c77f 555
jasonberry 25:ca1b1098c77f 556
jasonberry 25:ca1b1098c77f 557
jasonberry 25:ca1b1098c77f 558 template<class Network, class Timer, int a, int b>
jasonberry 25:ca1b1098c77f 559 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
jasonberry 25:ca1b1098c77f 560 {
jasonberry 25:ca1b1098c77f 561 int rc = SUCCESS;
jasonberry 25:ca1b1098c77f 562 Timer timer;
jasonberry 25:ca1b1098c77f 563
jasonberry 25:ca1b1098c77f 564 timer.countdown_ms(timeout_ms);
jasonberry 25:ca1b1098c77f 565 while (!timer.expired())
jasonberry 25:ca1b1098c77f 566 {
jasonberry 25:ca1b1098c77f 567 if (cycle(timer) < 0)
jasonberry 25:ca1b1098c77f 568 {
jasonberry 25:ca1b1098c77f 569 rc = FAILURE;
jasonberry 25:ca1b1098c77f 570 break;
jasonberry 25:ca1b1098c77f 571 }
jasonberry 25:ca1b1098c77f 572 }
jasonberry 25:ca1b1098c77f 573
jasonberry 25:ca1b1098c77f 574 return rc;
jasonberry 25:ca1b1098c77f 575 }
jasonberry 25:ca1b1098c77f 576
jasonberry 25:ca1b1098c77f 577
jasonberry 25:ca1b1098c77f 578 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 579 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
jasonberry 25:ca1b1098c77f 580 {
jasonberry 25:ca1b1098c77f 581 // get one piece of work off the wire and one pass through
jasonberry 25:ca1b1098c77f 582 int len = 0,
jasonberry 25:ca1b1098c77f 583 rc = SUCCESS;
jasonberry 25:ca1b1098c77f 584
jasonberry 25:ca1b1098c77f 585 int packet_type = readPacket(timer); // read the socket, see what work is due
jasonberry 25:ca1b1098c77f 586
jasonberry 25:ca1b1098c77f 587 switch (packet_type)
jasonberry 25:ca1b1098c77f 588 {
jasonberry 25:ca1b1098c77f 589 default:
jasonberry 25:ca1b1098c77f 590 // no more data to read, unrecoverable. Or read packet fails due to unexpected network error
jasonberry 25:ca1b1098c77f 591 rc = packet_type;
jasonberry 25:ca1b1098c77f 592 goto exit;
jasonberry 25:ca1b1098c77f 593 case 0: // timed out reading packet
jasonberry 25:ca1b1098c77f 594 break;
jasonberry 25:ca1b1098c77f 595 case CONNACK:
jasonberry 25:ca1b1098c77f 596 case PUBACK:
jasonberry 25:ca1b1098c77f 597 case SUBACK:
jasonberry 25:ca1b1098c77f 598 break;
jasonberry 25:ca1b1098c77f 599 case PUBLISH:
jasonberry 25:ca1b1098c77f 600 {
jasonberry 25:ca1b1098c77f 601 MQTTString topicName = MQTTString_initializer;
jasonberry 25:ca1b1098c77f 602 Message msg;
jasonberry 25:ca1b1098c77f 603 int intQoS;
jasonberry 25:ca1b1098c77f 604 msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
jasonberry 25:ca1b1098c77f 605 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
jasonberry 25:ca1b1098c77f 606 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jasonberry 25:ca1b1098c77f 607 goto exit;
jasonberry 25:ca1b1098c77f 608 msg.qos = (enum QoS)intQoS;
jasonberry 25:ca1b1098c77f 609 #if MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 610 if (msg.qos != QOS2)
jasonberry 25:ca1b1098c77f 611 #endif
jasonberry 25:ca1b1098c77f 612 deliverMessage(topicName, msg);
jasonberry 25:ca1b1098c77f 613 #if MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 614 else if (isQoS2msgidFree(msg.id))
jasonberry 25:ca1b1098c77f 615 {
jasonberry 25:ca1b1098c77f 616 if (useQoS2msgid(msg.id))
jasonberry 25:ca1b1098c77f 617 deliverMessage(topicName, msg);
jasonberry 25:ca1b1098c77f 618 else
jasonberry 25:ca1b1098c77f 619 WARN("Maximum number of incoming QoS2 messages exceeded");
jasonberry 25:ca1b1098c77f 620 }
jasonberry 25:ca1b1098c77f 621 #endif
jasonberry 25:ca1b1098c77f 622 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 623 if (msg.qos != QOS0)
jasonberry 25:ca1b1098c77f 624 {
jasonberry 25:ca1b1098c77f 625 if (msg.qos == QOS1)
jasonberry 25:ca1b1098c77f 626 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
jasonberry 25:ca1b1098c77f 627 else if (msg.qos == QOS2)
jasonberry 25:ca1b1098c77f 628 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
jasonberry 25:ca1b1098c77f 629 if (len <= 0)
jasonberry 25:ca1b1098c77f 630 rc = FAILURE;
jasonberry 25:ca1b1098c77f 631 else
jasonberry 25:ca1b1098c77f 632 rc = sendPacket(len, timer);
jasonberry 25:ca1b1098c77f 633 if (rc == FAILURE)
jasonberry 25:ca1b1098c77f 634 goto exit; // there was a problem
jasonberry 25:ca1b1098c77f 635 }
jasonberry 25:ca1b1098c77f 636 break;
jasonberry 25:ca1b1098c77f 637 #endif
jasonberry 25:ca1b1098c77f 638 }
jasonberry 25:ca1b1098c77f 639 #if MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 640 case PUBREC:
jasonberry 25:ca1b1098c77f 641 case PUBREL:
jasonberry 25:ca1b1098c77f 642 unsigned short mypacketid;
jasonberry 25:ca1b1098c77f 643 unsigned char dup, type;
jasonberry 25:ca1b1098c77f 644 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jasonberry 25:ca1b1098c77f 645 rc = FAILURE;
jasonberry 25:ca1b1098c77f 646 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
jasonberry 25:ca1b1098c77f 647 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
jasonberry 25:ca1b1098c77f 648 rc = FAILURE;
jasonberry 25:ca1b1098c77f 649 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
jasonberry 25:ca1b1098c77f 650 rc = FAILURE; // there was a problem
jasonberry 25:ca1b1098c77f 651 if (rc == FAILURE)
jasonberry 25:ca1b1098c77f 652 goto exit; // there was a problem
jasonberry 25:ca1b1098c77f 653 if (packet_type == PUBREL)
jasonberry 25:ca1b1098c77f 654 freeQoS2msgid(mypacketid);
jasonberry 25:ca1b1098c77f 655 break;
jasonberry 25:ca1b1098c77f 656
jasonberry 25:ca1b1098c77f 657 case PUBCOMP:
jasonberry 25:ca1b1098c77f 658 break;
jasonberry 25:ca1b1098c77f 659 #endif
jasonberry 25:ca1b1098c77f 660 case PINGRESP:
jasonberry 25:ca1b1098c77f 661 ping_outstanding = false;
jasonberry 25:ca1b1098c77f 662 break;
jasonberry 25:ca1b1098c77f 663 }
jasonberry 25:ca1b1098c77f 664
jasonberry 25:ca1b1098c77f 665 if (keepalive() != SUCCESS)
jasonberry 25:ca1b1098c77f 666 //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
jasonberry 25:ca1b1098c77f 667 rc = FAILURE;
jasonberry 25:ca1b1098c77f 668
jasonberry 25:ca1b1098c77f 669 exit:
jasonberry 25:ca1b1098c77f 670 if (rc == SUCCESS)
jasonberry 25:ca1b1098c77f 671 rc = packet_type;
jasonberry 25:ca1b1098c77f 672 else if (isconnected)
jasonberry 25:ca1b1098c77f 673 closeSession();
jasonberry 25:ca1b1098c77f 674 return rc;
jasonberry 25:ca1b1098c77f 675 }
jasonberry 25:ca1b1098c77f 676
jasonberry 25:ca1b1098c77f 677
jasonberry 25:ca1b1098c77f 678 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 679 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
jasonberry 25:ca1b1098c77f 680 {
jasonberry 25:ca1b1098c77f 681 int rc = SUCCESS;
jasonberry 25:ca1b1098c77f 682 static Timer ping_sent;
jasonberry 25:ca1b1098c77f 683
jasonberry 25:ca1b1098c77f 684 if (keepAliveInterval == 0)
jasonberry 25:ca1b1098c77f 685 goto exit;
jasonberry 25:ca1b1098c77f 686
jasonberry 25:ca1b1098c77f 687 if (ping_outstanding)
jasonberry 25:ca1b1098c77f 688 {
jasonberry 25:ca1b1098c77f 689 if (ping_sent.expired())
jasonberry 25:ca1b1098c77f 690 {
jasonberry 25:ca1b1098c77f 691 rc = FAILURE; // session failure
jasonberry 25:ca1b1098c77f 692 #if defined(MQTT_DEBUG)
jasonberry 25:ca1b1098c77f 693 DEBUG("PINGRESP not received in keepalive interval\r\n");
jasonberry 25:ca1b1098c77f 694 #endif
jasonberry 25:ca1b1098c77f 695 }
jasonberry 25:ca1b1098c77f 696 }
jasonberry 25:ca1b1098c77f 697 else if (last_sent.expired() || last_received.expired())
jasonberry 25:ca1b1098c77f 698 {
jasonberry 25:ca1b1098c77f 699 Timer timer(1000);
jasonberry 25:ca1b1098c77f 700 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
jasonberry 25:ca1b1098c77f 701 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
jasonberry 25:ca1b1098c77f 702 {
jasonberry 25:ca1b1098c77f 703 ping_outstanding = true;
jasonberry 25:ca1b1098c77f 704 ping_sent.countdown(this->keepAliveInterval);
jasonberry 25:ca1b1098c77f 705 }
jasonberry 25:ca1b1098c77f 706 }
jasonberry 25:ca1b1098c77f 707 exit:
jasonberry 25:ca1b1098c77f 708 return rc;
jasonberry 25:ca1b1098c77f 709 }
jasonberry 25:ca1b1098c77f 710
jasonberry 25:ca1b1098c77f 711
jasonberry 25:ca1b1098c77f 712 // only used in single-threaded mode where one command at a time is in process
jasonberry 25:ca1b1098c77f 713 template<class Network, class Timer, int a, int b>
jasonberry 25:ca1b1098c77f 714 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
jasonberry 25:ca1b1098c77f 715 {
jasonberry 25:ca1b1098c77f 716 int rc = FAILURE;
jasonberry 25:ca1b1098c77f 717
jasonberry 25:ca1b1098c77f 718 do
jasonberry 25:ca1b1098c77f 719 {
jasonberry 25:ca1b1098c77f 720 if (timer.expired())
jasonberry 25:ca1b1098c77f 721 break; // we timed out
jasonberry 25:ca1b1098c77f 722 rc = cycle(timer);
jasonberry 25:ca1b1098c77f 723 }
jasonberry 25:ca1b1098c77f 724 while (rc != packet_type && rc >= 0);
jasonberry 25:ca1b1098c77f 725
jasonberry 25:ca1b1098c77f 726 return rc;
jasonberry 25:ca1b1098c77f 727 }
jasonberry 25:ca1b1098c77f 728
jasonberry 25:ca1b1098c77f 729
jasonberry 25:ca1b1098c77f 730 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 731 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data)
jasonberry 25:ca1b1098c77f 732 {
jasonberry 25:ca1b1098c77f 733 Timer connect_timer(command_timeout_ms);
jasonberry 25:ca1b1098c77f 734 int rc = FAILURE;
jasonberry 25:ca1b1098c77f 735 int len = 0;
jasonberry 25:ca1b1098c77f 736
jasonberry 25:ca1b1098c77f 737 if (isconnected) // don't send connect packet again if we are already connected
jasonberry 25:ca1b1098c77f 738 goto exit;
jasonberry 25:ca1b1098c77f 739
jasonberry 25:ca1b1098c77f 740 this->keepAliveInterval = options.keepAliveInterval;
jasonberry 25:ca1b1098c77f 741 this->cleansession = options.cleansession;
jasonberry 25:ca1b1098c77f 742 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
jasonberry 25:ca1b1098c77f 743 goto exit;
jasonberry 25:ca1b1098c77f 744 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
jasonberry 25:ca1b1098c77f 745 goto exit; // there was a problem
jasonberry 25:ca1b1098c77f 746
jasonberry 25:ca1b1098c77f 747 if (this->keepAliveInterval > 0)
jasonberry 25:ca1b1098c77f 748 last_received.countdown(this->keepAliveInterval);
jasonberry 25:ca1b1098c77f 749 // this will be a blocking call, wait for the connack
jasonberry 25:ca1b1098c77f 750 if (waitfor(CONNACK, connect_timer) == CONNACK)
jasonberry 25:ca1b1098c77f 751 {
jasonberry 25:ca1b1098c77f 752 data.rc = 0;
jasonberry 25:ca1b1098c77f 753 data.sessionPresent = false;
jasonberry 25:ca1b1098c77f 754 if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent,
jasonberry 25:ca1b1098c77f 755 (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
jasonberry 25:ca1b1098c77f 756 rc = data.rc;
jasonberry 25:ca1b1098c77f 757 else
jasonberry 25:ca1b1098c77f 758 rc = FAILURE;
jasonberry 25:ca1b1098c77f 759 }
jasonberry 25:ca1b1098c77f 760 else
jasonberry 25:ca1b1098c77f 761 rc = FAILURE;
jasonberry 25:ca1b1098c77f 762
jasonberry 25:ca1b1098c77f 763 #if MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 764 // resend any inflight publish
jasonberry 25:ca1b1098c77f 765 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
jasonberry 25:ca1b1098c77f 766 {
jasonberry 25:ca1b1098c77f 767 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
jasonberry 25:ca1b1098c77f 768 rc = FAILURE;
jasonberry 25:ca1b1098c77f 769 else
jasonberry 25:ca1b1098c77f 770 rc = publish(len, connect_timer, inflightQoS);
jasonberry 25:ca1b1098c77f 771 }
jasonberry 25:ca1b1098c77f 772 else
jasonberry 25:ca1b1098c77f 773 #endif
jasonberry 25:ca1b1098c77f 774 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 775 if (inflightMsgid > 0)
jasonberry 25:ca1b1098c77f 776 {
jasonberry 25:ca1b1098c77f 777 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
jasonberry 25:ca1b1098c77f 778 rc = publish(inflightLen, connect_timer, inflightQoS);
jasonberry 25:ca1b1098c77f 779 }
jasonberry 25:ca1b1098c77f 780 #endif
jasonberry 25:ca1b1098c77f 781
jasonberry 25:ca1b1098c77f 782 exit:
jasonberry 25:ca1b1098c77f 783 if (rc == SUCCESS)
jasonberry 25:ca1b1098c77f 784 {
jasonberry 25:ca1b1098c77f 785 isconnected = true;
jasonberry 25:ca1b1098c77f 786 ping_outstanding = false;
jasonberry 25:ca1b1098c77f 787 }
jasonberry 25:ca1b1098c77f 788 return rc;
jasonberry 25:ca1b1098c77f 789 }
jasonberry 25:ca1b1098c77f 790
jasonberry 25:ca1b1098c77f 791
jasonberry 25:ca1b1098c77f 792 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 793 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
jasonberry 25:ca1b1098c77f 794 {
jasonberry 25:ca1b1098c77f 795 connackData data;
jasonberry 25:ca1b1098c77f 796 return connect(options, data);
jasonberry 25:ca1b1098c77f 797 }
jasonberry 25:ca1b1098c77f 798
jasonberry 25:ca1b1098c77f 799
jasonberry 25:ca1b1098c77f 800 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 801 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
jasonberry 25:ca1b1098c77f 802 {
jasonberry 25:ca1b1098c77f 803 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
jasonberry 25:ca1b1098c77f 804 return connect(default_options);
jasonberry 25:ca1b1098c77f 805 }
jasonberry 25:ca1b1098c77f 806
jasonberry 25:ca1b1098c77f 807
jasonberry 25:ca1b1098c77f 808 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
jasonberry 25:ca1b1098c77f 809 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler)
jasonberry 25:ca1b1098c77f 810 {
jasonberry 25:ca1b1098c77f 811 int rc = FAILURE;
jasonberry 25:ca1b1098c77f 812 int i = -1;
jasonberry 25:ca1b1098c77f 813
jasonberry 25:ca1b1098c77f 814 // first check for an existing matching slot
jasonberry 25:ca1b1098c77f 815 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jasonberry 25:ca1b1098c77f 816 {
jasonberry 25:ca1b1098c77f 817 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
jasonberry 25:ca1b1098c77f 818 {
jasonberry 25:ca1b1098c77f 819 if (messageHandler == 0) // remove existing
jasonberry 25:ca1b1098c77f 820 {
jasonberry 25:ca1b1098c77f 821 messageHandlers[i].topicFilter = 0;
jasonberry 25:ca1b1098c77f 822 messageHandlers[i].fp.detach();
jasonberry 25:ca1b1098c77f 823 }
jasonberry 25:ca1b1098c77f 824 rc = SUCCESS; // return i when adding new subscription
jasonberry 25:ca1b1098c77f 825 break;
jasonberry 25:ca1b1098c77f 826 }
jasonberry 25:ca1b1098c77f 827 }
jasonberry 25:ca1b1098c77f 828 // if no existing, look for empty slot (unless we are removing)
jasonberry 25:ca1b1098c77f 829 if (messageHandler != 0) {
jasonberry 25:ca1b1098c77f 830 if (rc == FAILURE)
jasonberry 25:ca1b1098c77f 831 {
jasonberry 25:ca1b1098c77f 832 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jasonberry 25:ca1b1098c77f 833 {
jasonberry 25:ca1b1098c77f 834 if (messageHandlers[i].topicFilter == 0)
jasonberry 25:ca1b1098c77f 835 {
jasonberry 25:ca1b1098c77f 836 rc = SUCCESS;
jasonberry 25:ca1b1098c77f 837 break;
jasonberry 25:ca1b1098c77f 838 }
jasonberry 25:ca1b1098c77f 839 }
jasonberry 25:ca1b1098c77f 840 }
jasonberry 25:ca1b1098c77f 841 if (i < MAX_MESSAGE_HANDLERS)
jasonberry 25:ca1b1098c77f 842 {
jasonberry 25:ca1b1098c77f 843 messageHandlers[i].topicFilter = topicFilter;
jasonberry 25:ca1b1098c77f 844 messageHandlers[i].fp.attach(messageHandler);
jasonberry 25:ca1b1098c77f 845 }
jasonberry 25:ca1b1098c77f 846 }
jasonberry 25:ca1b1098c77f 847 return rc;
jasonberry 25:ca1b1098c77f 848 }
jasonberry 25:ca1b1098c77f 849
jasonberry 25:ca1b1098c77f 850
jasonberry 25:ca1b1098c77f 851 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
jasonberry 25:ca1b1098c77f 852 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter,
jasonberry 25:ca1b1098c77f 853 enum QoS qos, messageHandler messageHandler, subackData& data)
jasonberry 25:ca1b1098c77f 854 {
jasonberry 25:ca1b1098c77f 855 int rc = FAILURE;
jasonberry 25:ca1b1098c77f 856 Timer timer(command_timeout_ms);
jasonberry 25:ca1b1098c77f 857 int len = 0;
jasonberry 25:ca1b1098c77f 858 MQTTString topic = {(char*)topicFilter, {0, 0}};
jasonberry 25:ca1b1098c77f 859
jasonberry 25:ca1b1098c77f 860 if (!isconnected)
jasonberry 25:ca1b1098c77f 861 goto exit;
jasonberry 25:ca1b1098c77f 862
jasonberry 25:ca1b1098c77f 863 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
jasonberry 25:ca1b1098c77f 864 if (len <= 0)
jasonberry 25:ca1b1098c77f 865 goto exit;
jasonberry 25:ca1b1098c77f 866 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
jasonberry 25:ca1b1098c77f 867 goto exit; // there was a problem
jasonberry 25:ca1b1098c77f 868
jasonberry 25:ca1b1098c77f 869 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback
jasonberry 25:ca1b1098c77f 870 {
jasonberry 25:ca1b1098c77f 871 int count = 0;
jasonberry 25:ca1b1098c77f 872 unsigned short mypacketid;
jasonberry 25:ca1b1098c77f 873 data.grantedQoS = 0;
jasonberry 25:ca1b1098c77f 874 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
jasonberry 25:ca1b1098c77f 875 {
jasonberry 25:ca1b1098c77f 876 if (data.grantedQoS != 0x80)
jasonberry 25:ca1b1098c77f 877 rc = setMessageHandler(topicFilter, messageHandler);
jasonberry 25:ca1b1098c77f 878 }
jasonberry 25:ca1b1098c77f 879 }
jasonberry 25:ca1b1098c77f 880 else
jasonberry 25:ca1b1098c77f 881 rc = FAILURE;
jasonberry 25:ca1b1098c77f 882
jasonberry 25:ca1b1098c77f 883 exit:
jasonberry 25:ca1b1098c77f 884 if (rc == FAILURE)
jasonberry 25:ca1b1098c77f 885 closeSession();
jasonberry 25:ca1b1098c77f 886 return rc;
jasonberry 25:ca1b1098c77f 887 }
jasonberry 25:ca1b1098c77f 888
jasonberry 25:ca1b1098c77f 889
jasonberry 25:ca1b1098c77f 890 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
jasonberry 25:ca1b1098c77f 891 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
jasonberry 25:ca1b1098c77f 892 {
jasonberry 25:ca1b1098c77f 893 subackData data;
jasonberry 25:ca1b1098c77f 894 return subscribe(topicFilter, qos, messageHandler, data);
jasonberry 25:ca1b1098c77f 895 }
jasonberry 25:ca1b1098c77f 896
jasonberry 25:ca1b1098c77f 897
jasonberry 25:ca1b1098c77f 898 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
jasonberry 25:ca1b1098c77f 899 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
jasonberry 25:ca1b1098c77f 900 {
jasonberry 25:ca1b1098c77f 901 int rc = FAILURE;
jasonberry 25:ca1b1098c77f 902 Timer timer(command_timeout_ms);
jasonberry 25:ca1b1098c77f 903 MQTTString topic = {(char*)topicFilter, {0, 0}};
jasonberry 25:ca1b1098c77f 904 int len = 0;
jasonberry 25:ca1b1098c77f 905
jasonberry 25:ca1b1098c77f 906 if (!isconnected)
jasonberry 25:ca1b1098c77f 907 goto exit;
jasonberry 25:ca1b1098c77f 908
jasonberry 25:ca1b1098c77f 909 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
jasonberry 25:ca1b1098c77f 910 goto exit;
jasonberry 25:ca1b1098c77f 911 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
jasonberry 25:ca1b1098c77f 912 goto exit; // there was a problem
jasonberry 25:ca1b1098c77f 913
jasonberry 25:ca1b1098c77f 914 if (waitfor(UNSUBACK, timer) == UNSUBACK)
jasonberry 25:ca1b1098c77f 915 {
jasonberry 25:ca1b1098c77f 916 unsigned short mypacketid; // should be the same as the packetid above
jasonberry 25:ca1b1098c77f 917 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
jasonberry 25:ca1b1098c77f 918 {
jasonberry 25:ca1b1098c77f 919 // remove the subscription message handler associated with this topic, if there is one
jasonberry 25:ca1b1098c77f 920 setMessageHandler(topicFilter, 0);
jasonberry 25:ca1b1098c77f 921 }
jasonberry 25:ca1b1098c77f 922 }
jasonberry 25:ca1b1098c77f 923 else
jasonberry 25:ca1b1098c77f 924 rc = FAILURE;
jasonberry 25:ca1b1098c77f 925
jasonberry 25:ca1b1098c77f 926 exit:
jasonberry 25:ca1b1098c77f 927 if (rc != SUCCESS)
jasonberry 25:ca1b1098c77f 928 closeSession();
jasonberry 25:ca1b1098c77f 929 return rc;
jasonberry 25:ca1b1098c77f 930 }
jasonberry 25:ca1b1098c77f 931
jasonberry 25:ca1b1098c77f 932
jasonberry 25:ca1b1098c77f 933 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 934 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
jasonberry 25:ca1b1098c77f 935 {
jasonberry 25:ca1b1098c77f 936 int rc;
jasonberry 25:ca1b1098c77f 937
jasonberry 25:ca1b1098c77f 938 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
jasonberry 25:ca1b1098c77f 939 goto exit; // there was a problem
jasonberry 25:ca1b1098c77f 940
jasonberry 25:ca1b1098c77f 941 #if MQTTCLIENT_QOS1
jasonberry 25:ca1b1098c77f 942 if (qos == QOS1)
jasonberry 25:ca1b1098c77f 943 {
jasonberry 25:ca1b1098c77f 944 if (waitfor(PUBACK, timer) == PUBACK)
jasonberry 25:ca1b1098c77f 945 {
jasonberry 25:ca1b1098c77f 946 unsigned short mypacketid;
jasonberry 25:ca1b1098c77f 947 unsigned char dup, type;
jasonberry 25:ca1b1098c77f 948 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jasonberry 25:ca1b1098c77f 949 rc = FAILURE;
jasonberry 25:ca1b1098c77f 950 else if (inflightMsgid == mypacketid)
jasonberry 25:ca1b1098c77f 951 inflightMsgid = 0;
jasonberry 25:ca1b1098c77f 952 }
jasonberry 25:ca1b1098c77f 953 else
jasonberry 25:ca1b1098c77f 954 rc = FAILURE;
jasonberry 25:ca1b1098c77f 955 }
jasonberry 25:ca1b1098c77f 956 #endif
jasonberry 25:ca1b1098c77f 957 #if MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 958 else if (qos == QOS2)
jasonberry 25:ca1b1098c77f 959 {
jasonberry 25:ca1b1098c77f 960 if (waitfor(PUBCOMP, timer) == PUBCOMP)
jasonberry 25:ca1b1098c77f 961 {
jasonberry 25:ca1b1098c77f 962 unsigned short mypacketid;
jasonberry 25:ca1b1098c77f 963 unsigned char dup, type;
jasonberry 25:ca1b1098c77f 964 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jasonberry 25:ca1b1098c77f 965 rc = FAILURE;
jasonberry 25:ca1b1098c77f 966 else if (inflightMsgid == mypacketid)
jasonberry 25:ca1b1098c77f 967 inflightMsgid = 0;
jasonberry 25:ca1b1098c77f 968 }
jasonberry 25:ca1b1098c77f 969 else
jasonberry 25:ca1b1098c77f 970 rc = FAILURE;
jasonberry 25:ca1b1098c77f 971 }
jasonberry 25:ca1b1098c77f 972 #endif
jasonberry 25:ca1b1098c77f 973
jasonberry 25:ca1b1098c77f 974 exit:
jasonberry 25:ca1b1098c77f 975 if (rc != SUCCESS)
jasonberry 25:ca1b1098c77f 976 closeSession();
jasonberry 25:ca1b1098c77f 977 return rc;
jasonberry 25:ca1b1098c77f 978 }
jasonberry 25:ca1b1098c77f 979
jasonberry 25:ca1b1098c77f 980
jasonberry 25:ca1b1098c77f 981
jasonberry 25:ca1b1098c77f 982 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 983 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
jasonberry 25:ca1b1098c77f 984 {
jasonberry 25:ca1b1098c77f 985 int rc = FAILURE;
jasonberry 25:ca1b1098c77f 986 Timer timer(command_timeout_ms);
jasonberry 25:ca1b1098c77f 987 MQTTString topicString = MQTTString_initializer;
jasonberry 25:ca1b1098c77f 988 int len = 0;
jasonberry 25:ca1b1098c77f 989
jasonberry 25:ca1b1098c77f 990 if (!isconnected)
jasonberry 25:ca1b1098c77f 991 goto exit;
jasonberry 25:ca1b1098c77f 992
jasonberry 25:ca1b1098c77f 993 topicString.cstring = (char*)topicName;
jasonberry 25:ca1b1098c77f 994
jasonberry 25:ca1b1098c77f 995 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 996 if (qos == QOS1 || qos == QOS2)
jasonberry 25:ca1b1098c77f 997 id = packetid.getNext();
jasonberry 25:ca1b1098c77f 998 #endif
jasonberry 25:ca1b1098c77f 999
jasonberry 25:ca1b1098c77f 1000 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
jasonberry 25:ca1b1098c77f 1001 topicString, (unsigned char*)payload, payloadlen);
jasonberry 25:ca1b1098c77f 1002 if (len <= 0)
jasonberry 25:ca1b1098c77f 1003 goto exit;
jasonberry 25:ca1b1098c77f 1004
jasonberry 25:ca1b1098c77f 1005 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 1006 if (!cleansession)
jasonberry 25:ca1b1098c77f 1007 {
jasonberry 25:ca1b1098c77f 1008 memcpy(pubbuf, sendbuf, len);
jasonberry 25:ca1b1098c77f 1009 inflightMsgid = id;
jasonberry 25:ca1b1098c77f 1010 inflightLen = len;
jasonberry 25:ca1b1098c77f 1011 inflightQoS = qos;
jasonberry 25:ca1b1098c77f 1012 #if MQTTCLIENT_QOS2
jasonberry 25:ca1b1098c77f 1013 pubrel = false;
jasonberry 25:ca1b1098c77f 1014 #endif
jasonberry 25:ca1b1098c77f 1015 }
jasonberry 25:ca1b1098c77f 1016 #endif
jasonberry 25:ca1b1098c77f 1017
jasonberry 25:ca1b1098c77f 1018 rc = publish(len, timer, qos);
jasonberry 25:ca1b1098c77f 1019 exit:
jasonberry 25:ca1b1098c77f 1020 return rc;
jasonberry 25:ca1b1098c77f 1021 }
jasonberry 25:ca1b1098c77f 1022
jasonberry 25:ca1b1098c77f 1023
jasonberry 25:ca1b1098c77f 1024 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 1025 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
jasonberry 25:ca1b1098c77f 1026 {
jasonberry 25:ca1b1098c77f 1027 unsigned short id = 0; // dummy - not used for anything
jasonberry 25:ca1b1098c77f 1028 return publish(topicName, payload, payloadlen, id, qos, retained);
jasonberry 25:ca1b1098c77f 1029 }
jasonberry 25:ca1b1098c77f 1030
jasonberry 25:ca1b1098c77f 1031
jasonberry 25:ca1b1098c77f 1032 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 1033 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
jasonberry 25:ca1b1098c77f 1034 {
jasonberry 25:ca1b1098c77f 1035 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
jasonberry 25:ca1b1098c77f 1036 }
jasonberry 25:ca1b1098c77f 1037
jasonberry 25:ca1b1098c77f 1038
jasonberry 25:ca1b1098c77f 1039 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jasonberry 25:ca1b1098c77f 1040 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
jasonberry 25:ca1b1098c77f 1041 {
jasonberry 25:ca1b1098c77f 1042 int rc = FAILURE;
jasonberry 25:ca1b1098c77f 1043 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
jasonberry 25:ca1b1098c77f 1044 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
jasonberry 25:ca1b1098c77f 1045 if (len > 0)
jasonberry 25:ca1b1098c77f 1046 rc = sendPacket(len, timer); // send the disconnect packet
jasonberry 25:ca1b1098c77f 1047 closeSession();
jasonberry 25:ca1b1098c77f 1048 return rc;
jasonberry 25:ca1b1098c77f 1049 }
jasonberry 25:ca1b1098c77f 1050
jasonberry 25:ca1b1098c77f 1051 #endif