Version to make it easier to reuse without source modifications

Committer:
JMF
Date:
Tue Mar 27 17:26:35 2018 +0000
Revision:
0:5cd4781e0c88
Initial commit

Who changed what in which revision?

User | Revision | Line number | New contents of line
JMF 0:5cd4781e0c88 1 /*******************************************************************************
JMF 0:5cd4781e0c88 2 * Copyright (c) 2014, 2017 IBM Corp.
JMF 0:5cd4781e0c88 3 *
JMF 0:5cd4781e0c88 4 * All rights reserved. This program and the accompanying materials
JMF 0:5cd4781e0c88 5 * are made available under the terms of the Eclipse Public License v1.0
JMF 0:5cd4781e0c88 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
JMF 0:5cd4781e0c88 7 *
JMF 0:5cd4781e0c88 8 * The Eclipse Public License is available at
JMF 0:5cd4781e0c88 9 * http://www.eclipse.org/legal/epl-v10.html
JMF 0:5cd4781e0c88 10 * and the Eclipse Distribution License is available at
JMF 0:5cd4781e0c88 11 * http://www.eclipse.org/org/documents/edl-v10.php.
JMF 0:5cd4781e0c88 12 *
JMF 0:5cd4781e0c88 13 * Contributors:
JMF 0:5cd4781e0c88 14 * Ian Craggs - initial API and implementation and/or initial documentation
JMF 0:5cd4781e0c88 15 * Ian Craggs - fix for bug 458512 - QoS 2 messages
JMF 0:5cd4781e0c88 16 * Ian Craggs - fix for bug 460389 - send loop uses wrong length
JMF 0:5cd4781e0c88 17 * Ian Craggs - fix for bug 464169 - clearing subscriptions
JMF 0:5cd4781e0c88 18 * Ian Craggs - fix for bug 464551 - enums and ints can be different size
JMF 0:5cd4781e0c88 19 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
JMF 0:5cd4781e0c88 20 * Ian Craggs - fix for bug 475749 - packetid modified twice
JMF 0:5cd4781e0c88 21 * Ian Craggs - add ability to set message handler separately #6
JMF 0:5cd4781e0c88 22 *******************************************************************************/
JMF 0:5cd4781e0c88 23
JMF 0:5cd4781e0c88 24 #if !defined(MQTTCLIENT_H)
JMF 0:5cd4781e0c88 25 #define MQTTCLIENT_H
JMF 0:5cd4781e0c88 26
JMF 0:5cd4781e0c88 27 #include "MQTTPacket.h"
JMF 0:5cd4781e0c88 28 #include <stdio.h>
JMF 0:5cd4781e0c88 29 #include "MQTTLogging.h"
JMF 0:5cd4781e0c88 30
JMF 0:5cd4781e0c88 31 #if !defined(MQTTCLIENT_QOS1)
JMF 0:5cd4781e0c88 32 #define MQTTCLIENT_QOS1 1
JMF 0:5cd4781e0c88 33 #endif
JMF 0:5cd4781e0c88 34 #if !defined(MQTTCLIENT_QOS2)
JMF 0:5cd4781e0c88 35 #define MQTTCLIENT_QOS2 0
JMF 0:5cd4781e0c88 36 #endif
JMF 0:5cd4781e0c88 37
JMF 0:5cd4781e0c88 38 namespace MQTT
JMF 0:5cd4781e0c88 39 {
JMF 0:5cd4781e0c88 40
JMF 0:5cd4781e0c88 41
/** MQTT quality-of-service levels; the enumerators carry the values 0, 1 and 2. */
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
JMF 0:5cd4781e0c88 43
/**
 * Result codes used by the client API.
 * All failure return codes must be negative; SUCCESS is zero.
 */
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE = -1,
    SUCCESS = 0
};
JMF 0:5cd4781e0c88 46
JMF 0:5cd4781e0c88 47
JMF 0:5cd4781e0c88 48 struct Message
JMF 0:5cd4781e0c88 49 {
JMF 0:5cd4781e0c88 50 enum QoS qos;
JMF 0:5cd4781e0c88 51 bool retained;
JMF 0:5cd4781e0c88 52 bool dup;
JMF 0:5cd4781e0c88 53 unsigned short id;
JMF 0:5cd4781e0c88 54 void *payload;
JMF 0:5cd4781e0c88 55 size_t payloadlen;
JMF 0:5cd4781e0c88 56 };
JMF 0:5cd4781e0c88 57
JMF 0:5cd4781e0c88 58
JMF 0:5cd4781e0c88 59 struct MessageData
JMF 0:5cd4781e0c88 60 {
JMF 0:5cd4781e0c88 61 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
JMF 0:5cd4781e0c88 62 { }
JMF 0:5cd4781e0c88 63
JMF 0:5cd4781e0c88 64 struct Message &message;
JMF 0:5cd4781e0c88 65 MQTTString &topicName;
JMF 0:5cd4781e0c88 66 };
JMF 0:5cd4781e0c88 67
JMF 0:5cd4781e0c88 68
/**
 * Output record for the connect() overload that reports connack details.
 * NOTE(review): the code that fills this in is outside this part of the file;
 * the field meanings below are taken from the names - confirm against connect().
 */
struct connackData
{
    int rc;               ///< presumably the return code carried in the CONNACK
    bool sessionPresent;  ///< presumably the "session present" flag from the CONNACK
};
JMF 0:5cd4781e0c88 74
JMF 0:5cd4781e0c88 75
/**
 * Output record for the subscribe() overload that reports suback details.
 * NOTE(review): the code that fills this in is outside this part of the file.
 */
struct subackData
{
    int grantedQoS;  ///< presumably the QoS granted by the server for the subscription
};
JMF 0:5cd4781e0c88 80
JMF 0:5cd4781e0c88 81
/**
 * Generator of packet identifiers.
 * Identifiers run 1, 2, ... MAX_PACKET_ID and then wrap back to 1; 0 is never returned.
 */
class PacketId
{
public:
    /** Start the sequence so that the first call to getNext() yields 1. */
    PacketId() : next(0)
    {
    }

    /**
     * Advance to the following identifier and return it.
     * @return the new identifier, in the range 1..MAX_PACKET_ID
     */
    int getNext()
    {
        if (next == MAX_PACKET_ID)
            next = 1;       // wrap around, skipping 0
        else
            ++next;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // last identifier handed out (0 before the first call)
};
JMF 0:5cd4781e0c88 99
JMF 0:5cd4781e0c88 100
JMF 0:5cd4781e0c88 101 /**
JMF 0:5cd4781e0c88 102 * @class Client
JMF 0:5cd4781e0c88 103 * @brief blocking, non-threaded MQTT client API
JMF 0:5cd4781e0c88 104 *
JMF 0:5cd4781e0c88 105 * This version of the API blocks on all method calls, until they are complete. This means that only one
JMF 0:5cd4781e0c88 106 * MQTT request can be in process at any one time.
JMF 0:5cd4781e0c88 107 * @param Network a network class which supports send, receive
JMF 0:5cd4781e0c88 108 * @param Timer a timer class with the methods:
JMF 0:5cd4781e0c88 109 */
JMF 0:5cd4781e0c88 110 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
JMF 0:5cd4781e0c88 111 class Client
JMF 0:5cd4781e0c88 112 {
JMF 0:5cd4781e0c88 113
JMF 0:5cd4781e0c88 114 public:
JMF 0:5cd4781e0c88 115
JMF 0:5cd4781e0c88 116 typedef void (*messageHandler)(MessageData&);
JMF 0:5cd4781e0c88 117
JMF 0:5cd4781e0c88 118 /** Construct the client
JMF 0:5cd4781e0c88 119 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
JMF 0:5cd4781e0c88 120 * before calling MQTT connect
JMF 0:5cd4781e0c88 121 * @param limits an instance of the Limit class - to alter limits as required
JMF 0:5cd4781e0c88 122 */
JMF 0:5cd4781e0c88 123 Client(Network& network, unsigned int command_timeout_ms = 30000);
JMF 0:5cd4781e0c88 124
JMF 0:5cd4781e0c88 125 /** Set the default message handling callback - used for any message which does not match a subscription message handler
JMF 0:5cd4781e0c88 126 * @param mh - pointer to the callback function. Set to 0 to remove.
JMF 0:5cd4781e0c88 127 */
JMF 0:5cd4781e0c88 128 void setDefaultMessageHandler(messageHandler mh)
JMF 0:5cd4781e0c88 129 {
JMF 0:5cd4781e0c88 130 if (mh != 0)
JMF 0:5cd4781e0c88 131 defaultMessageHandler=mh;
JMF 0:5cd4781e0c88 132 else
JMF 0:5cd4781e0c88 133 defaultMessageHandler=NULL;
JMF 0:5cd4781e0c88 134 }
JMF 0:5cd4781e0c88 135
JMF 0:5cd4781e0c88 136 /** Set a message handling callback. This can be used outside of the the subscribe method.
JMF 0:5cd4781e0c88 137 * @param topicFilter - a topic pattern which can include wildcards
JMF 0:5cd4781e0c88 138 * @param mh - pointer to the callback function. If 0, removes the callback if any
JMF 0:5cd4781e0c88 139 */
JMF 0:5cd4781e0c88 140 int setMessageHandler(const char* topicFilter, messageHandler mh);
JMF 0:5cd4781e0c88 141
JMF 0:5cd4781e0c88 142 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
JMF 0:5cd4781e0c88 143 * The nework object must be connected to the network endpoint before calling this
JMF 0:5cd4781e0c88 144 * Default connect options are used
JMF 0:5cd4781e0c88 145 * @return success code -
JMF 0:5cd4781e0c88 146 */
JMF 0:5cd4781e0c88 147 int connect();
JMF 0:5cd4781e0c88 148
JMF 0:5cd4781e0c88 149 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
JMF 0:5cd4781e0c88 150 * The nework object must be connected to the network endpoint before calling this
JMF 0:5cd4781e0c88 151 * @param options - connect options
JMF 0:5cd4781e0c88 152 * @return success code -
JMF 0:5cd4781e0c88 153 */
JMF 0:5cd4781e0c88 154 int connect(MQTTPacket_connectData& options);
JMF 0:5cd4781e0c88 155
JMF 0:5cd4781e0c88 156 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
JMF 0:5cd4781e0c88 157 * The nework object must be connected to the network endpoint before calling this
JMF 0:5cd4781e0c88 158 * @param options - connect options
JMF 0:5cd4781e0c88 159 * @param connackData - connack data to be returned
JMF 0:5cd4781e0c88 160 * @return success code -
JMF 0:5cd4781e0c88 161 */
JMF 0:5cd4781e0c88 162 int connect(MQTTPacket_connectData& options, connackData& data);
JMF 0:5cd4781e0c88 163
JMF 0:5cd4781e0c88 164 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
JMF 0:5cd4781e0c88 165 * @param topic - the topic to publish to
JMF 0:5cd4781e0c88 166 * @param message - the message to send
JMF 0:5cd4781e0c88 167 * @return success code -
JMF 0:5cd4781e0c88 168 */
JMF 0:5cd4781e0c88 169 int publish(const char* topicName, Message& message);
JMF 0:5cd4781e0c88 170
JMF 0:5cd4781e0c88 171 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
JMF 0:5cd4781e0c88 172 * @param topic - the topic to publish to
JMF 0:5cd4781e0c88 173 * @param payload - the data to send
JMF 0:5cd4781e0c88 174 * @param payloadlen - the length of the data
JMF 0:5cd4781e0c88 175 * @param qos - the QoS to send the publish at
JMF 0:5cd4781e0c88 176 * @param retained - whether the message should be retained
JMF 0:5cd4781e0c88 177 * @return success code -
JMF 0:5cd4781e0c88 178 */
JMF 0:5cd4781e0c88 179 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
JMF 0:5cd4781e0c88 180
JMF 0:5cd4781e0c88 181 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
JMF 0:5cd4781e0c88 182 * @param topic - the topic to publish to
JMF 0:5cd4781e0c88 183 * @param payload - the data to send
JMF 0:5cd4781e0c88 184 * @param payloadlen - the length of the data
JMF 0:5cd4781e0c88 185 * @param id - the packet id used - returned
JMF 0:5cd4781e0c88 186 * @param qos - the QoS to send the publish at
JMF 0:5cd4781e0c88 187 * @param retained - whether the message should be retained
JMF 0:5cd4781e0c88 188 * @return success code -
JMF 0:5cd4781e0c88 189 */
JMF 0:5cd4781e0c88 190 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
JMF 0:5cd4781e0c88 191
JMF 0:5cd4781e0c88 192 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
JMF 0:5cd4781e0c88 193 * @param topicFilter - a topic pattern which can include wildcards
JMF 0:5cd4781e0c88 194 * @param qos - the MQTT QoS to subscribe at
JMF 0:5cd4781e0c88 195 * @param mh - the callback function to be invoked when a message is received for this subscription
JMF 0:5cd4781e0c88 196 * @return success code -
JMF 0:5cd4781e0c88 197 */
JMF 0:5cd4781e0c88 198 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
JMF 0:5cd4781e0c88 199
JMF 0:5cd4781e0c88 200 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
JMF 0:5cd4781e0c88 201 * @param topicFilter - a topic pattern which can include wildcards
JMF 0:5cd4781e0c88 202 * @param qos - the MQTT QoS to subscribe at©
JMF 0:5cd4781e0c88 203 * @param mh - the callback function to be invoked when a message is received for this subscription
JMF 0:5cd4781e0c88 204 * @param
JMF 0:5cd4781e0c88 205 * @return success code -
JMF 0:5cd4781e0c88 206 */
JMF 0:5cd4781e0c88 207 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data);
JMF 0:5cd4781e0c88 208
JMF 0:5cd4781e0c88 209 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
JMF 0:5cd4781e0c88 210 * @param topicFilter - a topic pattern which can include wildcards
JMF 0:5cd4781e0c88 211 * @return success code -
JMF 0:5cd4781e0c88 212 */
JMF 0:5cd4781e0c88 213 int unsubscribe(const char* topicFilter);
JMF 0:5cd4781e0c88 214
JMF 0:5cd4781e0c88 215 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
JMF 0:5cd4781e0c88 216 * @return success code -
JMF 0:5cd4781e0c88 217 */
JMF 0:5cd4781e0c88 218 int disconnect();
JMF 0:5cd4781e0c88 219
JMF 0:5cd4781e0c88 220 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
JMF 0:5cd4781e0c88 221 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
JMF 0:5cd4781e0c88 222 * received.
JMF 0:5cd4781e0c88 223 * @param timeout_ms the time to wait, in milliseconds
JMF 0:5cd4781e0c88 224 * @return success code - on failure, this means the client has disconnected
JMF 0:5cd4781e0c88 225 */
JMF 0:5cd4781e0c88 226 int yield(unsigned long timeout_ms = 1000L);
JMF 0:5cd4781e0c88 227
JMF 0:5cd4781e0c88 228 /** Is the client connected?
JMF 0:5cd4781e0c88 229 * @return flag - is the client connected or not?
JMF 0:5cd4781e0c88 230 */
JMF 0:5cd4781e0c88 231 bool isConnected()
JMF 0:5cd4781e0c88 232 {
JMF 0:5cd4781e0c88 233 return isconnected;
JMF 0:5cd4781e0c88 234 }
JMF 0:5cd4781e0c88 235
JMF 0:5cd4781e0c88 236 private:
JMF 0:5cd4781e0c88 237
JMF 0:5cd4781e0c88 238 void closeSession();
JMF 0:5cd4781e0c88 239 void cleanSession();
JMF 0:5cd4781e0c88 240 int cycle(Timer& timer);
JMF 0:5cd4781e0c88 241 int waitfor(int packet_type, Timer& timer);
JMF 0:5cd4781e0c88 242 int keepalive();
JMF 0:5cd4781e0c88 243 int publish(int len, Timer& timer, enum QoS qos);
JMF 0:5cd4781e0c88 244
JMF 0:5cd4781e0c88 245 int decodePacket(int* value, int timeout);
JMF 0:5cd4781e0c88 246 int readPacket(Timer& timer);
JMF 0:5cd4781e0c88 247 int sendPacket(int length, Timer& timer);
JMF 0:5cd4781e0c88 248 int deliverMessage(MQTTString& topicName, Message& message);
JMF 0:5cd4781e0c88 249 bool isTopicMatched(char* topicFilter, MQTTString& topicName);
JMF 0:5cd4781e0c88 250
JMF 0:5cd4781e0c88 251 Network& ipstack;
JMF 0:5cd4781e0c88 252 unsigned long command_timeout_ms;
JMF 0:5cd4781e0c88 253
JMF 0:5cd4781e0c88 254 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
JMF 0:5cd4781e0c88 255 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
JMF 0:5cd4781e0c88 256
JMF 0:5cd4781e0c88 257 Timer last_sent, last_received;
JMF 0:5cd4781e0c88 258 unsigned int keepAliveInterval;
JMF 0:5cd4781e0c88 259 bool ping_outstanding;
JMF 0:5cd4781e0c88 260 bool cleansession;
JMF 0:5cd4781e0c88 261
JMF 0:5cd4781e0c88 262 PacketId packetid;
JMF 0:5cd4781e0c88 263
JMF 0:5cd4781e0c88 264 struct MessageHandlers
JMF 0:5cd4781e0c88 265 {
JMF 0:5cd4781e0c88 266 const char* topicFilter;
JMF 0:5cd4781e0c88 267 Callback<void( MessageData&)> fp;
JMF 0:5cd4781e0c88 268 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
JMF 0:5cd4781e0c88 269
JMF 0:5cd4781e0c88 270 Callback<void(MessageData&)> defaultMessageHandler;
JMF 0:5cd4781e0c88 271
JMF 0:5cd4781e0c88 272 bool isconnected;
JMF 0:5cd4781e0c88 273
JMF 0:5cd4781e0c88 274 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 275 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect
JMF 0:5cd4781e0c88 276 int inflightLen;
JMF 0:5cd4781e0c88 277 unsigned short inflightMsgid;
JMF 0:5cd4781e0c88 278 enum QoS inflightQoS;
JMF 0:5cd4781e0c88 279 #endif
JMF 0:5cd4781e0c88 280
JMF 0:5cd4781e0c88 281 #if MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 282 bool pubrel;
JMF 0:5cd4781e0c88 283 #if !defined(MAX_INCOMING_QOS2_MESSAGES)
JMF 0:5cd4781e0c88 284 #define MAX_INCOMING_QOS2_MESSAGES 10
JMF 0:5cd4781e0c88 285 #endif
JMF 0:5cd4781e0c88 286 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
JMF 0:5cd4781e0c88 287 bool isQoS2msgidFree(unsigned short id);
JMF 0:5cd4781e0c88 288 bool useQoS2msgid(unsigned short id);
JMF 0:5cd4781e0c88 289 void freeQoS2msgid(unsigned short id);
JMF 0:5cd4781e0c88 290 #endif
JMF 0:5cd4781e0c88 291
JMF 0:5cd4781e0c88 292 };
JMF 0:5cd4781e0c88 293
JMF 0:5cd4781e0c88 294 }
JMF 0:5cd4781e0c88 295
JMF 0:5cd4781e0c88 296
JMF 0:5cd4781e0c88 297 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
JMF 0:5cd4781e0c88 298 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
JMF 0:5cd4781e0c88 299 {
JMF 0:5cd4781e0c88 300 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
JMF 0:5cd4781e0c88 301 messageHandlers[i].topicFilter = 0;
JMF 0:5cd4781e0c88 302
JMF 0:5cd4781e0c88 303 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 304 inflightMsgid = 0;
JMF 0:5cd4781e0c88 305 inflightQoS = QOS0;
JMF 0:5cd4781e0c88 306 #endif
JMF 0:5cd4781e0c88 307
JMF 0:5cd4781e0c88 308 #if MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 309 pubrel = false;
JMF 0:5cd4781e0c88 310 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
JMF 0:5cd4781e0c88 311 incomingQoS2messages[i] = 0;
JMF 0:5cd4781e0c88 312 #endif
JMF 0:5cd4781e0c88 313 }
JMF 0:5cd4781e0c88 314
JMF 0:5cd4781e0c88 315
JMF 0:5cd4781e0c88 316 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
JMF 0:5cd4781e0c88 317 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession()
JMF 0:5cd4781e0c88 318 {
JMF 0:5cd4781e0c88 319 ping_outstanding = false;
JMF 0:5cd4781e0c88 320 isconnected = false;
JMF 0:5cd4781e0c88 321 if (cleansession)
JMF 0:5cd4781e0c88 322 cleanSession();
JMF 0:5cd4781e0c88 323 }
JMF 0:5cd4781e0c88 324
JMF 0:5cd4781e0c88 325
JMF 0:5cd4781e0c88 326 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
JMF 0:5cd4781e0c88 327 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
JMF 0:5cd4781e0c88 328 {
JMF 0:5cd4781e0c88 329 this->command_timeout_ms = command_timeout_ms;
JMF 0:5cd4781e0c88 330 cleansession = true;
JMF 0:5cd4781e0c88 331 closeSession();
JMF 0:5cd4781e0c88 332 }
JMF 0:5cd4781e0c88 333
JMF 0:5cd4781e0c88 334
JMF 0:5cd4781e0c88 335 #if MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 336 template<class Network, class Timer, int a, int b>
JMF 0:5cd4781e0c88 337 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
JMF 0:5cd4781e0c88 338 {
JMF 0:5cd4781e0c88 339 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
JMF 0:5cd4781e0c88 340 {
JMF 0:5cd4781e0c88 341 if (incomingQoS2messages[i] == id)
JMF 0:5cd4781e0c88 342 return false;
JMF 0:5cd4781e0c88 343 }
JMF 0:5cd4781e0c88 344 return true;
JMF 0:5cd4781e0c88 345 }
JMF 0:5cd4781e0c88 346
JMF 0:5cd4781e0c88 347
JMF 0:5cd4781e0c88 348 template<class Network, class Timer, int a, int b>
JMF 0:5cd4781e0c88 349 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
JMF 0:5cd4781e0c88 350 {
JMF 0:5cd4781e0c88 351 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
JMF 0:5cd4781e0c88 352 {
JMF 0:5cd4781e0c88 353 if (incomingQoS2messages[i] == 0)
JMF 0:5cd4781e0c88 354 {
JMF 0:5cd4781e0c88 355 incomingQoS2messages[i] = id;
JMF 0:5cd4781e0c88 356 return true;
JMF 0:5cd4781e0c88 357 }
JMF 0:5cd4781e0c88 358 }
JMF 0:5cd4781e0c88 359 return false;
JMF 0:5cd4781e0c88 360 }
JMF 0:5cd4781e0c88 361
JMF 0:5cd4781e0c88 362
JMF 0:5cd4781e0c88 363 template<class Network, class Timer, int a, int b>
JMF 0:5cd4781e0c88 364 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
JMF 0:5cd4781e0c88 365 {
JMF 0:5cd4781e0c88 366 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
JMF 0:5cd4781e0c88 367 {
JMF 0:5cd4781e0c88 368 if (incomingQoS2messages[i] == id)
JMF 0:5cd4781e0c88 369 {
JMF 0:5cd4781e0c88 370 incomingQoS2messages[i] = 0;
JMF 0:5cd4781e0c88 371 return;
JMF 0:5cd4781e0c88 372 }
JMF 0:5cd4781e0c88 373 }
JMF 0:5cd4781e0c88 374 }
JMF 0:5cd4781e0c88 375 #endif
JMF 0:5cd4781e0c88 376
JMF 0:5cd4781e0c88 377
JMF 0:5cd4781e0c88 378 template<class Network, class Timer, int a, int b>
JMF 0:5cd4781e0c88 379 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
JMF 0:5cd4781e0c88 380 {
JMF 0:5cd4781e0c88 381 int rc = FAILURE,
JMF 0:5cd4781e0c88 382 sent = 0;
JMF 0:5cd4781e0c88 383
JMF 0:5cd4781e0c88 384 while (sent < length)
JMF 0:5cd4781e0c88 385 {
JMF 0:5cd4781e0c88 386 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
JMF 0:5cd4781e0c88 387 if (rc < 0) // there was an error writing the data
JMF 0:5cd4781e0c88 388 break;
JMF 0:5cd4781e0c88 389 sent += rc;
JMF 0:5cd4781e0c88 390 if (timer.expired()) // only check expiry after at least one attempt to write
JMF 0:5cd4781e0c88 391 break;
JMF 0:5cd4781e0c88 392 }
JMF 0:5cd4781e0c88 393 if (sent == length)
JMF 0:5cd4781e0c88 394 {
JMF 0:5cd4781e0c88 395 if (this->keepAliveInterval > 0)
JMF 0:5cd4781e0c88 396 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
JMF 0:5cd4781e0c88 397 rc = SUCCESS;
JMF 0:5cd4781e0c88 398 }
JMF 0:5cd4781e0c88 399 else
JMF 0:5cd4781e0c88 400 rc = FAILURE;
JMF 0:5cd4781e0c88 401
JMF 0:5cd4781e0c88 402 #if defined(MQTT_DEBUG)
JMF 0:5cd4781e0c88 403 char printbuf[150];
JMF 0:5cd4781e0c88 404 DEBUG("Rc %d from sending packet %s\r\n", rc,
JMF 0:5cd4781e0c88 405 MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
JMF 0:5cd4781e0c88 406 #endif
JMF 0:5cd4781e0c88 407 return rc;
JMF 0:5cd4781e0c88 408 }
JMF 0:5cd4781e0c88 409
JMF 0:5cd4781e0c88 410
JMF 0:5cd4781e0c88 411 template<class Network, class Timer, int a, int b>
JMF 0:5cd4781e0c88 412 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
JMF 0:5cd4781e0c88 413 {
JMF 0:5cd4781e0c88 414 unsigned char c;
JMF 0:5cd4781e0c88 415 int multiplier = 1;
JMF 0:5cd4781e0c88 416 int len = 0;
JMF 0:5cd4781e0c88 417 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
JMF 0:5cd4781e0c88 418
JMF 0:5cd4781e0c88 419 *value = 0;
JMF 0:5cd4781e0c88 420 do
JMF 0:5cd4781e0c88 421 {
JMF 0:5cd4781e0c88 422 int rc = MQTTPACKET_READ_ERROR;
JMF 0:5cd4781e0c88 423
JMF 0:5cd4781e0c88 424 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
JMF 0:5cd4781e0c88 425 {
JMF 0:5cd4781e0c88 426 rc = MQTTPACKET_READ_ERROR; /* bad data */
JMF 0:5cd4781e0c88 427 goto exit;
JMF 0:5cd4781e0c88 428 }
JMF 0:5cd4781e0c88 429 rc = ipstack.read(&c, 1, timeout);
JMF 0:5cd4781e0c88 430 if (rc != 1)
JMF 0:5cd4781e0c88 431 goto exit;
JMF 0:5cd4781e0c88 432 *value += (c & 127) * multiplier;
JMF 0:5cd4781e0c88 433 multiplier *= 128;
JMF 0:5cd4781e0c88 434 } while ((c & 128) != 0);
JMF 0:5cd4781e0c88 435 exit:
JMF 0:5cd4781e0c88 436 return len;
JMF 0:5cd4781e0c88 437 }
JMF 0:5cd4781e0c88 438
JMF 0:5cd4781e0c88 439
JMF 0:5cd4781e0c88 440 /**
JMF 0:5cd4781e0c88 441 * If any read fails in this method, then we should disconnect from the network, as on reconnect
JMF 0:5cd4781e0c88 442 * the packets can be retried.
JMF 0:5cd4781e0c88 443 * @param timeout the max time to wait for the packet read to complete, in milliseconds
JMF 0:5cd4781e0c88 444 * @return the MQTT packet type, 0 if none, -1 if error
JMF 0:5cd4781e0c88 445 */
JMF 0:5cd4781e0c88 446 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 447 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
JMF 0:5cd4781e0c88 448 {
JMF 0:5cd4781e0c88 449 int rc = FAILURE;
JMF 0:5cd4781e0c88 450 MQTTHeader header = {0};
JMF 0:5cd4781e0c88 451 int len = 0;
JMF 0:5cd4781e0c88 452 int rem_len = 0;
JMF 0:5cd4781e0c88 453
JMF 0:5cd4781e0c88 454 /* 1. read the header byte. This has the packet type in it */
JMF 0:5cd4781e0c88 455 rc = ipstack.read(readbuf, 1, timer.left_ms());
JMF 0:5cd4781e0c88 456 if (rc != 1)
JMF 0:5cd4781e0c88 457 goto exit;
JMF 0:5cd4781e0c88 458
JMF 0:5cd4781e0c88 459 len = 1;
JMF 0:5cd4781e0c88 460 /* 2. read the remaining length. This is variable in itself */
JMF 0:5cd4781e0c88 461 decodePacket(&rem_len, timer.left_ms());
JMF 0:5cd4781e0c88 462 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
JMF 0:5cd4781e0c88 463
JMF 0:5cd4781e0c88 464 if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
JMF 0:5cd4781e0c88 465 {
JMF 0:5cd4781e0c88 466 rc = BUFFER_OVERFLOW;
JMF 0:5cd4781e0c88 467 goto exit;
JMF 0:5cd4781e0c88 468 }
JMF 0:5cd4781e0c88 469
JMF 0:5cd4781e0c88 470 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
JMF 0:5cd4781e0c88 471 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
JMF 0:5cd4781e0c88 472 goto exit;
JMF 0:5cd4781e0c88 473
JMF 0:5cd4781e0c88 474 header.byte = readbuf[0];
JMF 0:5cd4781e0c88 475 rc = header.bits.type;
JMF 0:5cd4781e0c88 476 if (this->keepAliveInterval > 0)
JMF 0:5cd4781e0c88 477 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
JMF 0:5cd4781e0c88 478 exit:
JMF 0:5cd4781e0c88 479
JMF 0:5cd4781e0c88 480 #if defined(MQTT_DEBUG)
JMF 0:5cd4781e0c88 481 if (rc >= 0)
JMF 0:5cd4781e0c88 482 {
JMF 0:5cd4781e0c88 483 char printbuf[50];
JMF 0:5cd4781e0c88 484 DEBUG("Rc %d receiving packet %s\r\n", rc,
JMF 0:5cd4781e0c88 485 MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
JMF 0:5cd4781e0c88 486 }
JMF 0:5cd4781e0c88 487 #endif
JMF 0:5cd4781e0c88 488 return rc;
JMF 0:5cd4781e0c88 489 }
JMF 0:5cd4781e0c88 490
JMF 0:5cd4781e0c88 491
JMF 0:5cd4781e0c88 492 // assume topic filter and name is in correct format
JMF 0:5cd4781e0c88 493 // # can only be at end
JMF 0:5cd4781e0c88 494 // + and # can only be next to separator
JMF 0:5cd4781e0c88 495 template<class Network, class Timer, int a, int b>
JMF 0:5cd4781e0c88 496 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
JMF 0:5cd4781e0c88 497 {
JMF 0:5cd4781e0c88 498 char* curf = topicFilter;
JMF 0:5cd4781e0c88 499 char* curn = topicName.lenstring.data;
JMF 0:5cd4781e0c88 500 char* curn_end = curn + topicName.lenstring.len;
JMF 0:5cd4781e0c88 501
JMF 0:5cd4781e0c88 502 while (*curf && curn < curn_end)
JMF 0:5cd4781e0c88 503 {
JMF 0:5cd4781e0c88 504 if (*curn == '/' && *curf != '/')
JMF 0:5cd4781e0c88 505 break;
JMF 0:5cd4781e0c88 506 if (*curf != '+' && *curf != '#' && *curf != *curn)
JMF 0:5cd4781e0c88 507 break;
JMF 0:5cd4781e0c88 508 if (*curf == '+')
JMF 0:5cd4781e0c88 509 { // skip until we meet the next separator, or end of string
JMF 0:5cd4781e0c88 510 char* nextpos = curn + 1;
JMF 0:5cd4781e0c88 511 while (nextpos < curn_end && *nextpos != '/')
JMF 0:5cd4781e0c88 512 nextpos = ++curn + 1;
JMF 0:5cd4781e0c88 513 }
JMF 0:5cd4781e0c88 514 else if (*curf == '#')
JMF 0:5cd4781e0c88 515 curn = curn_end - 1; // skip until end of string
JMF 0:5cd4781e0c88 516 curf++;
JMF 0:5cd4781e0c88 517 curn++;
JMF 0:5cd4781e0c88 518 };
JMF 0:5cd4781e0c88 519
JMF 0:5cd4781e0c88 520 return (curn == curn_end) && (*curf == '\0');
JMF 0:5cd4781e0c88 521 }
JMF 0:5cd4781e0c88 522
JMF 0:5cd4781e0c88 523
JMF 0:5cd4781e0c88 524
JMF 0:5cd4781e0c88 525 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
JMF 0:5cd4781e0c88 526 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
JMF 0:5cd4781e0c88 527 {
JMF 0:5cd4781e0c88 528 int rc = FAILURE;
JMF 0:5cd4781e0c88 529
JMF 0:5cd4781e0c88 530 // we have to find the right message handler - indexed by topic
JMF 0:5cd4781e0c88 531 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
JMF 0:5cd4781e0c88 532 {
JMF 0:5cd4781e0c88 533 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
JMF 0:5cd4781e0c88 534 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
JMF 0:5cd4781e0c88 535 {
JMF 0:5cd4781e0c88 536 if (messageHandlers[i].fp) //test to see if it is attached
JMF 0:5cd4781e0c88 537 {
JMF 0:5cd4781e0c88 538 MessageData md(topicName, message);
JMF 0:5cd4781e0c88 539 messageHandlers[i].fp(md);
JMF 0:5cd4781e0c88 540 rc = SUCCESS;
JMF 0:5cd4781e0c88 541 }
JMF 0:5cd4781e0c88 542 }
JMF 0:5cd4781e0c88 543 }
JMF 0:5cd4781e0c88 544
JMF 0:5cd4781e0c88 545 if (rc == FAILURE && defaultMessageHandler )
JMF 0:5cd4781e0c88 546 {
JMF 0:5cd4781e0c88 547 MessageData md(topicName, message);
JMF 0:5cd4781e0c88 548 defaultMessageHandler(md);
JMF 0:5cd4781e0c88 549 rc = SUCCESS;
JMF 0:5cd4781e0c88 550 }
JMF 0:5cd4781e0c88 551
JMF 0:5cd4781e0c88 552 return rc;
JMF 0:5cd4781e0c88 553 }
JMF 0:5cd4781e0c88 554
JMF 0:5cd4781e0c88 555
JMF 0:5cd4781e0c88 556
JMF 0:5cd4781e0c88 557 template<class Network, class Timer, int a, int b>
JMF 0:5cd4781e0c88 558 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
JMF 0:5cd4781e0c88 559 {
JMF 0:5cd4781e0c88 560 int rc = SUCCESS;
JMF 0:5cd4781e0c88 561 Timer timer;
JMF 0:5cd4781e0c88 562
JMF 0:5cd4781e0c88 563 timer.countdown_ms(timeout_ms);
JMF 0:5cd4781e0c88 564 while (!timer.expired())
JMF 0:5cd4781e0c88 565 {
JMF 0:5cd4781e0c88 566 if (cycle(timer) < 0)
JMF 0:5cd4781e0c88 567 {
JMF 0:5cd4781e0c88 568 rc = FAILURE;
JMF 0:5cd4781e0c88 569 break;
JMF 0:5cd4781e0c88 570 }
JMF 0:5cd4781e0c88 571 }
JMF 0:5cd4781e0c88 572
JMF 0:5cd4781e0c88 573 return rc;
JMF 0:5cd4781e0c88 574 }
JMF 0:5cd4781e0c88 575
JMF 0:5cd4781e0c88 576
JMF 0:5cd4781e0c88 577 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 578 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
JMF 0:5cd4781e0c88 579 {
JMF 0:5cd4781e0c88 580 // get one piece of work off the wire and one pass through
JMF 0:5cd4781e0c88 581 int len = 0,
JMF 0:5cd4781e0c88 582 rc = SUCCESS;
JMF 0:5cd4781e0c88 583
JMF 0:5cd4781e0c88 584 int packet_type = readPacket(timer); // read the socket, see what work is due
JMF 0:5cd4781e0c88 585
JMF 0:5cd4781e0c88 586 switch (packet_type)
JMF 0:5cd4781e0c88 587 {
JMF 0:5cd4781e0c88 588 default:
JMF 0:5cd4781e0c88 589 // no more data to read, unrecoverable. Or read packet fails due to unexpected network error
JMF 0:5cd4781e0c88 590 rc = packet_type;
JMF 0:5cd4781e0c88 591 goto exit;
JMF 0:5cd4781e0c88 592 case 0: // timed out reading packet
JMF 0:5cd4781e0c88 593 break;
JMF 0:5cd4781e0c88 594 case CONNACK:
JMF 0:5cd4781e0c88 595 case PUBACK:
JMF 0:5cd4781e0c88 596 case SUBACK:
JMF 0:5cd4781e0c88 597 break;
JMF 0:5cd4781e0c88 598 case PUBLISH:
JMF 0:5cd4781e0c88 599 {
JMF 0:5cd4781e0c88 600 MQTTString topicName = MQTTString_initializer;
JMF 0:5cd4781e0c88 601 Message msg;
JMF 0:5cd4781e0c88 602 int intQoS;
JMF 0:5cd4781e0c88 603 msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
JMF 0:5cd4781e0c88 604 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
JMF 0:5cd4781e0c88 605 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
JMF 0:5cd4781e0c88 606 goto exit;
JMF 0:5cd4781e0c88 607 msg.qos = (enum QoS)intQoS;
JMF 0:5cd4781e0c88 608 #if MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 609 if (msg.qos != QOS2)
JMF 0:5cd4781e0c88 610 #endif
JMF 0:5cd4781e0c88 611 deliverMessage(topicName, msg);
JMF 0:5cd4781e0c88 612 #if MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 613 else if (isQoS2msgidFree(msg.id))
JMF 0:5cd4781e0c88 614 {
JMF 0:5cd4781e0c88 615 if (useQoS2msgid(msg.id))
JMF 0:5cd4781e0c88 616 deliverMessage(topicName, msg);
JMF 0:5cd4781e0c88 617 else
JMF 0:5cd4781e0c88 618 WARN("Maximum number of incoming QoS2 messages exceeded");
JMF 0:5cd4781e0c88 619 }
JMF 0:5cd4781e0c88 620 #endif
JMF 0:5cd4781e0c88 621 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 622 if (msg.qos != QOS0)
JMF 0:5cd4781e0c88 623 {
JMF 0:5cd4781e0c88 624 if (msg.qos == QOS1)
JMF 0:5cd4781e0c88 625 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
JMF 0:5cd4781e0c88 626 else if (msg.qos == QOS2)
JMF 0:5cd4781e0c88 627 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
JMF 0:5cd4781e0c88 628 if (len <= 0)
JMF 0:5cd4781e0c88 629 rc = FAILURE;
JMF 0:5cd4781e0c88 630 else
JMF 0:5cd4781e0c88 631 rc = sendPacket(len, timer);
JMF 0:5cd4781e0c88 632 if (rc == FAILURE)
JMF 0:5cd4781e0c88 633 goto exit; // there was a problem
JMF 0:5cd4781e0c88 634 }
JMF 0:5cd4781e0c88 635 break;
JMF 0:5cd4781e0c88 636 #endif
JMF 0:5cd4781e0c88 637 }
JMF 0:5cd4781e0c88 638 #if MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 639 case PUBREC:
JMF 0:5cd4781e0c88 640 case PUBREL:
JMF 0:5cd4781e0c88 641 unsigned short mypacketid;
JMF 0:5cd4781e0c88 642 unsigned char dup, type;
JMF 0:5cd4781e0c88 643 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
JMF 0:5cd4781e0c88 644 rc = FAILURE;
JMF 0:5cd4781e0c88 645 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
JMF 0:5cd4781e0c88 646 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
JMF 0:5cd4781e0c88 647 rc = FAILURE;
JMF 0:5cd4781e0c88 648 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
JMF 0:5cd4781e0c88 649 rc = FAILURE; // there was a problem
JMF 0:5cd4781e0c88 650 if (rc == FAILURE)
JMF 0:5cd4781e0c88 651 goto exit; // there was a problem
JMF 0:5cd4781e0c88 652 if (packet_type == PUBREL)
JMF 0:5cd4781e0c88 653 freeQoS2msgid(mypacketid);
JMF 0:5cd4781e0c88 654 break;
JMF 0:5cd4781e0c88 655
JMF 0:5cd4781e0c88 656 case PUBCOMP:
JMF 0:5cd4781e0c88 657 break;
JMF 0:5cd4781e0c88 658 #endif
JMF 0:5cd4781e0c88 659 case PINGRESP:
JMF 0:5cd4781e0c88 660 ping_outstanding = false;
JMF 0:5cd4781e0c88 661 break;
JMF 0:5cd4781e0c88 662 }
JMF 0:5cd4781e0c88 663
JMF 0:5cd4781e0c88 664 if (keepalive() != SUCCESS)
JMF 0:5cd4781e0c88 665 //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
JMF 0:5cd4781e0c88 666 rc = FAILURE;
JMF 0:5cd4781e0c88 667
JMF 0:5cd4781e0c88 668 exit:
JMF 0:5cd4781e0c88 669 if (rc == SUCCESS)
JMF 0:5cd4781e0c88 670 rc = packet_type;
JMF 0:5cd4781e0c88 671 else if (isconnected)
JMF 0:5cd4781e0c88 672 closeSession();
JMF 0:5cd4781e0c88 673 return rc;
JMF 0:5cd4781e0c88 674 }
JMF 0:5cd4781e0c88 675
JMF 0:5cd4781e0c88 676
JMF 0:5cd4781e0c88 677 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 678 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
JMF 0:5cd4781e0c88 679 {
JMF 0:5cd4781e0c88 680 int rc = SUCCESS;
JMF 0:5cd4781e0c88 681 static Timer ping_sent;
JMF 0:5cd4781e0c88 682
JMF 0:5cd4781e0c88 683 if (keepAliveInterval == 0)
JMF 0:5cd4781e0c88 684 goto exit;
JMF 0:5cd4781e0c88 685
JMF 0:5cd4781e0c88 686 if (ping_outstanding)
JMF 0:5cd4781e0c88 687 {
JMF 0:5cd4781e0c88 688 if (ping_sent.expired())
JMF 0:5cd4781e0c88 689 {
JMF 0:5cd4781e0c88 690 rc = FAILURE; // session failure
JMF 0:5cd4781e0c88 691 #if defined(MQTT_DEBUG)
JMF 0:5cd4781e0c88 692 DEBUG("PINGRESP not received in keepalive interval\r\n");
JMF 0:5cd4781e0c88 693 #endif
JMF 0:5cd4781e0c88 694 }
JMF 0:5cd4781e0c88 695 }
JMF 0:5cd4781e0c88 696 else if (last_sent.expired() || last_received.expired())
JMF 0:5cd4781e0c88 697 {
JMF 0:5cd4781e0c88 698 Timer timer(1000);
JMF 0:5cd4781e0c88 699 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
JMF 0:5cd4781e0c88 700 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
JMF 0:5cd4781e0c88 701 {
JMF 0:5cd4781e0c88 702 ping_outstanding = true;
JMF 0:5cd4781e0c88 703 ping_sent.countdown(this->keepAliveInterval);
JMF 0:5cd4781e0c88 704 }
JMF 0:5cd4781e0c88 705 }
JMF 0:5cd4781e0c88 706 exit:
JMF 0:5cd4781e0c88 707 return rc;
JMF 0:5cd4781e0c88 708 }
JMF 0:5cd4781e0c88 709
JMF 0:5cd4781e0c88 710
JMF 0:5cd4781e0c88 711 // only used in single-threaded mode where one command at a time is in process
JMF 0:5cd4781e0c88 712 template<class Network, class Timer, int a, int b>
JMF 0:5cd4781e0c88 713 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
JMF 0:5cd4781e0c88 714 {
JMF 0:5cd4781e0c88 715 int rc = FAILURE;
JMF 0:5cd4781e0c88 716
JMF 0:5cd4781e0c88 717 do
JMF 0:5cd4781e0c88 718 {
JMF 0:5cd4781e0c88 719 if (timer.expired())
JMF 0:5cd4781e0c88 720 break; // we timed out
JMF 0:5cd4781e0c88 721 rc = cycle(timer);
JMF 0:5cd4781e0c88 722 }
JMF 0:5cd4781e0c88 723 while (rc != packet_type && rc >= 0);
JMF 0:5cd4781e0c88 724
JMF 0:5cd4781e0c88 725 return rc;
JMF 0:5cd4781e0c88 726 }
JMF 0:5cd4781e0c88 727
JMF 0:5cd4781e0c88 728
JMF 0:5cd4781e0c88 729 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 730 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data)
JMF 0:5cd4781e0c88 731 {
JMF 0:5cd4781e0c88 732 Timer connect_timer(command_timeout_ms);
JMF 0:5cd4781e0c88 733 int rc = FAILURE;
JMF 0:5cd4781e0c88 734 int len = 0;
JMF 0:5cd4781e0c88 735
JMF 0:5cd4781e0c88 736 if (isconnected) // don't send connect packet again if we are already connected
JMF 0:5cd4781e0c88 737 goto exit;
JMF 0:5cd4781e0c88 738
JMF 0:5cd4781e0c88 739 this->keepAliveInterval = options.keepAliveInterval;
JMF 0:5cd4781e0c88 740 this->cleansession = options.cleansession;
JMF 0:5cd4781e0c88 741 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
JMF 0:5cd4781e0c88 742 goto exit;
JMF 0:5cd4781e0c88 743 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
JMF 0:5cd4781e0c88 744 goto exit; // there was a problem
JMF 0:5cd4781e0c88 745
JMF 0:5cd4781e0c88 746 if (this->keepAliveInterval > 0)
JMF 0:5cd4781e0c88 747 last_received.countdown(this->keepAliveInterval);
JMF 0:5cd4781e0c88 748 // this will be a blocking call, wait for the connack
JMF 0:5cd4781e0c88 749 if (waitfor(CONNACK, connect_timer) == CONNACK)
JMF 0:5cd4781e0c88 750 {
JMF 0:5cd4781e0c88 751 data.rc = 0;
JMF 0:5cd4781e0c88 752 data.sessionPresent = false;
JMF 0:5cd4781e0c88 753 if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent,
JMF 0:5cd4781e0c88 754 (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
JMF 0:5cd4781e0c88 755 rc = data.rc;
JMF 0:5cd4781e0c88 756 else
JMF 0:5cd4781e0c88 757 rc = FAILURE;
JMF 0:5cd4781e0c88 758 }
JMF 0:5cd4781e0c88 759 else
JMF 0:5cd4781e0c88 760 rc = FAILURE;
JMF 0:5cd4781e0c88 761
JMF 0:5cd4781e0c88 762 #if MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 763 // resend any inflight publish
JMF 0:5cd4781e0c88 764 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
JMF 0:5cd4781e0c88 765 {
JMF 0:5cd4781e0c88 766 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
JMF 0:5cd4781e0c88 767 rc = FAILURE;
JMF 0:5cd4781e0c88 768 else
JMF 0:5cd4781e0c88 769 rc = publish(len, connect_timer, inflightQoS);
JMF 0:5cd4781e0c88 770 }
JMF 0:5cd4781e0c88 771 else
JMF 0:5cd4781e0c88 772 #endif
JMF 0:5cd4781e0c88 773 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 774 if (inflightMsgid > 0)
JMF 0:5cd4781e0c88 775 {
JMF 0:5cd4781e0c88 776 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
JMF 0:5cd4781e0c88 777 rc = publish(inflightLen, connect_timer, inflightQoS);
JMF 0:5cd4781e0c88 778 }
JMF 0:5cd4781e0c88 779 #endif
JMF 0:5cd4781e0c88 780
JMF 0:5cd4781e0c88 781 exit:
JMF 0:5cd4781e0c88 782 if (rc == SUCCESS)
JMF 0:5cd4781e0c88 783 {
JMF 0:5cd4781e0c88 784 isconnected = true;
JMF 0:5cd4781e0c88 785 ping_outstanding = false;
JMF 0:5cd4781e0c88 786 }
JMF 0:5cd4781e0c88 787 return rc;
JMF 0:5cd4781e0c88 788 }
JMF 0:5cd4781e0c88 789
JMF 0:5cd4781e0c88 790
JMF 0:5cd4781e0c88 791 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 792 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
JMF 0:5cd4781e0c88 793 {
JMF 0:5cd4781e0c88 794 connackData data;
JMF 0:5cd4781e0c88 795 return connect(options, data);
JMF 0:5cd4781e0c88 796 }
JMF 0:5cd4781e0c88 797
JMF 0:5cd4781e0c88 798
JMF 0:5cd4781e0c88 799 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 800 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
JMF 0:5cd4781e0c88 801 {
JMF 0:5cd4781e0c88 802 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
JMF 0:5cd4781e0c88 803 return connect(default_options);
JMF 0:5cd4781e0c88 804 }
JMF 0:5cd4781e0c88 805
JMF 0:5cd4781e0c88 806
JMF 0:5cd4781e0c88 807 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
JMF 0:5cd4781e0c88 808 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler)
JMF 0:5cd4781e0c88 809 {
JMF 0:5cd4781e0c88 810 int rc = FAILURE;
JMF 0:5cd4781e0c88 811 int i = -1;
JMF 0:5cd4781e0c88 812
JMF 0:5cd4781e0c88 813 // first check for an existing matching slot
JMF 0:5cd4781e0c88 814 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
JMF 0:5cd4781e0c88 815 {
JMF 0:5cd4781e0c88 816 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
JMF 0:5cd4781e0c88 817 {
JMF 0:5cd4781e0c88 818 if (messageHandler == 0) // remove existing
JMF 0:5cd4781e0c88 819 {
JMF 0:5cd4781e0c88 820 messageHandlers[i].topicFilter = 0;
JMF 0:5cd4781e0c88 821 messageHandlers[i].fp.detach();
JMF 0:5cd4781e0c88 822 }
JMF 0:5cd4781e0c88 823 rc = SUCCESS; // return i when adding new subscription
JMF 0:5cd4781e0c88 824 break;
JMF 0:5cd4781e0c88 825 }
JMF 0:5cd4781e0c88 826 }
JMF 0:5cd4781e0c88 827 // if no existing, look for empty slot (unless we are removing)
JMF 0:5cd4781e0c88 828 if (messageHandler != 0) {
JMF 0:5cd4781e0c88 829 if (rc == FAILURE)
JMF 0:5cd4781e0c88 830 {
JMF 0:5cd4781e0c88 831 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
JMF 0:5cd4781e0c88 832 {
JMF 0:5cd4781e0c88 833 if (messageHandlers[i].topicFilter == 0)
JMF 0:5cd4781e0c88 834 {
JMF 0:5cd4781e0c88 835 rc = SUCCESS;
JMF 0:5cd4781e0c88 836 break;
JMF 0:5cd4781e0c88 837 }
JMF 0:5cd4781e0c88 838 }
JMF 0:5cd4781e0c88 839 }
JMF 0:5cd4781e0c88 840 if (i < MAX_MESSAGE_HANDLERS)
JMF 0:5cd4781e0c88 841 {
JMF 0:5cd4781e0c88 842 messageHandlers[i].topicFilter = topicFilter;
JMF 0:5cd4781e0c88 843 messageHandlers[i].fp.attach(messageHandler);
JMF 0:5cd4781e0c88 844 }
JMF 0:5cd4781e0c88 845 }
JMF 0:5cd4781e0c88 846 return rc;
JMF 0:5cd4781e0c88 847 }
JMF 0:5cd4781e0c88 848
JMF 0:5cd4781e0c88 849
JMF 0:5cd4781e0c88 850 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
JMF 0:5cd4781e0c88 851 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter,
JMF 0:5cd4781e0c88 852 enum QoS qos, messageHandler messageHandler, subackData& data)
JMF 0:5cd4781e0c88 853 {
JMF 0:5cd4781e0c88 854 int rc = FAILURE;
JMF 0:5cd4781e0c88 855 Timer timer(command_timeout_ms);
JMF 0:5cd4781e0c88 856 int len = 0;
JMF 0:5cd4781e0c88 857 MQTTString topic = {(char*)topicFilter, {0, 0}};
JMF 0:5cd4781e0c88 858
JMF 0:5cd4781e0c88 859 if (!isconnected)
JMF 0:5cd4781e0c88 860 goto exit;
JMF 0:5cd4781e0c88 861
JMF 0:5cd4781e0c88 862 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
JMF 0:5cd4781e0c88 863 if (len <= 0)
JMF 0:5cd4781e0c88 864 goto exit;
JMF 0:5cd4781e0c88 865 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
JMF 0:5cd4781e0c88 866 goto exit; // there was a problem
JMF 0:5cd4781e0c88 867
JMF 0:5cd4781e0c88 868 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback
JMF 0:5cd4781e0c88 869 {
JMF 0:5cd4781e0c88 870 int count = 0;
JMF 0:5cd4781e0c88 871 unsigned short mypacketid;
JMF 0:5cd4781e0c88 872 data.grantedQoS = 0;
JMF 0:5cd4781e0c88 873 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
JMF 0:5cd4781e0c88 874 {
JMF 0:5cd4781e0c88 875 if (data.grantedQoS != 0x80)
JMF 0:5cd4781e0c88 876 rc = setMessageHandler(topicFilter, messageHandler);
JMF 0:5cd4781e0c88 877 }
JMF 0:5cd4781e0c88 878 }
JMF 0:5cd4781e0c88 879 else
JMF 0:5cd4781e0c88 880 rc = FAILURE;
JMF 0:5cd4781e0c88 881
JMF 0:5cd4781e0c88 882 exit:
JMF 0:5cd4781e0c88 883 if (rc == FAILURE)
JMF 0:5cd4781e0c88 884 closeSession();
JMF 0:5cd4781e0c88 885 return rc;
JMF 0:5cd4781e0c88 886 }
JMF 0:5cd4781e0c88 887
JMF 0:5cd4781e0c88 888
JMF 0:5cd4781e0c88 889 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
JMF 0:5cd4781e0c88 890 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
JMF 0:5cd4781e0c88 891 {
JMF 0:5cd4781e0c88 892 subackData data;
JMF 0:5cd4781e0c88 893 return subscribe(topicFilter, qos, messageHandler, data);
JMF 0:5cd4781e0c88 894 }
JMF 0:5cd4781e0c88 895
JMF 0:5cd4781e0c88 896
JMF 0:5cd4781e0c88 897 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
JMF 0:5cd4781e0c88 898 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
JMF 0:5cd4781e0c88 899 {
JMF 0:5cd4781e0c88 900 int rc = FAILURE;
JMF 0:5cd4781e0c88 901 Timer timer(command_timeout_ms);
JMF 0:5cd4781e0c88 902 MQTTString topic = {(char*)topicFilter, {0, 0}};
JMF 0:5cd4781e0c88 903 int len = 0;
JMF 0:5cd4781e0c88 904
JMF 0:5cd4781e0c88 905 if (!isconnected)
JMF 0:5cd4781e0c88 906 goto exit;
JMF 0:5cd4781e0c88 907
JMF 0:5cd4781e0c88 908 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
JMF 0:5cd4781e0c88 909 goto exit;
JMF 0:5cd4781e0c88 910 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
JMF 0:5cd4781e0c88 911 goto exit; // there was a problem
JMF 0:5cd4781e0c88 912
JMF 0:5cd4781e0c88 913 if (waitfor(UNSUBACK, timer) == UNSUBACK)
JMF 0:5cd4781e0c88 914 {
JMF 0:5cd4781e0c88 915 unsigned short mypacketid; // should be the same as the packetid above
JMF 0:5cd4781e0c88 916 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
JMF 0:5cd4781e0c88 917 {
JMF 0:5cd4781e0c88 918 // remove the subscription message handler associated with this topic, if there is one
JMF 0:5cd4781e0c88 919 setMessageHandler(topicFilter, 0);
JMF 0:5cd4781e0c88 920 }
JMF 0:5cd4781e0c88 921 }
JMF 0:5cd4781e0c88 922 else
JMF 0:5cd4781e0c88 923 rc = FAILURE;
JMF 0:5cd4781e0c88 924
JMF 0:5cd4781e0c88 925 exit:
JMF 0:5cd4781e0c88 926 if (rc != SUCCESS)
JMF 0:5cd4781e0c88 927 closeSession();
JMF 0:5cd4781e0c88 928 return rc;
JMF 0:5cd4781e0c88 929 }
JMF 0:5cd4781e0c88 930
JMF 0:5cd4781e0c88 931
JMF 0:5cd4781e0c88 932 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 933 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
JMF 0:5cd4781e0c88 934 {
JMF 0:5cd4781e0c88 935 int rc;
JMF 0:5cd4781e0c88 936
JMF 0:5cd4781e0c88 937 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
JMF 0:5cd4781e0c88 938 goto exit; // there was a problem
JMF 0:5cd4781e0c88 939
JMF 0:5cd4781e0c88 940 #if MQTTCLIENT_QOS1
JMF 0:5cd4781e0c88 941 if (qos == QOS1)
JMF 0:5cd4781e0c88 942 {
JMF 0:5cd4781e0c88 943 if (waitfor(PUBACK, timer) == PUBACK)
JMF 0:5cd4781e0c88 944 {
JMF 0:5cd4781e0c88 945 unsigned short mypacketid;
JMF 0:5cd4781e0c88 946 unsigned char dup, type;
JMF 0:5cd4781e0c88 947 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
JMF 0:5cd4781e0c88 948 rc = FAILURE;
JMF 0:5cd4781e0c88 949 else if (inflightMsgid == mypacketid)
JMF 0:5cd4781e0c88 950 inflightMsgid = 0;
JMF 0:5cd4781e0c88 951 }
JMF 0:5cd4781e0c88 952 else
JMF 0:5cd4781e0c88 953 rc = FAILURE;
JMF 0:5cd4781e0c88 954 }
JMF 0:5cd4781e0c88 955 #endif
JMF 0:5cd4781e0c88 956 #if MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 957 else if (qos == QOS2)
JMF 0:5cd4781e0c88 958 {
JMF 0:5cd4781e0c88 959 if (waitfor(PUBCOMP, timer) == PUBCOMP)
JMF 0:5cd4781e0c88 960 {
JMF 0:5cd4781e0c88 961 unsigned short mypacketid;
JMF 0:5cd4781e0c88 962 unsigned char dup, type;
JMF 0:5cd4781e0c88 963 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
JMF 0:5cd4781e0c88 964 rc = FAILURE;
JMF 0:5cd4781e0c88 965 else if (inflightMsgid == mypacketid)
JMF 0:5cd4781e0c88 966 inflightMsgid = 0;
JMF 0:5cd4781e0c88 967 }
JMF 0:5cd4781e0c88 968 else
JMF 0:5cd4781e0c88 969 rc = FAILURE;
JMF 0:5cd4781e0c88 970 }
JMF 0:5cd4781e0c88 971 #endif
JMF 0:5cd4781e0c88 972
JMF 0:5cd4781e0c88 973 exit:
JMF 0:5cd4781e0c88 974 if (rc != SUCCESS)
JMF 0:5cd4781e0c88 975 closeSession();
JMF 0:5cd4781e0c88 976 return rc;
JMF 0:5cd4781e0c88 977 }
JMF 0:5cd4781e0c88 978
JMF 0:5cd4781e0c88 979
JMF 0:5cd4781e0c88 980
JMF 0:5cd4781e0c88 981 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 982 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
JMF 0:5cd4781e0c88 983 {
JMF 0:5cd4781e0c88 984 int rc = FAILURE;
JMF 0:5cd4781e0c88 985 Timer timer(command_timeout_ms);
JMF 0:5cd4781e0c88 986 MQTTString topicString = MQTTString_initializer;
JMF 0:5cd4781e0c88 987 int len = 0;
JMF 0:5cd4781e0c88 988
JMF 0:5cd4781e0c88 989 if (!isconnected)
JMF 0:5cd4781e0c88 990 goto exit;
JMF 0:5cd4781e0c88 991
JMF 0:5cd4781e0c88 992 topicString.cstring = (char*)topicName;
JMF 0:5cd4781e0c88 993
JMF 0:5cd4781e0c88 994 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 995 if (qos == QOS1 || qos == QOS2)
JMF 0:5cd4781e0c88 996 id = packetid.getNext();
JMF 0:5cd4781e0c88 997 #endif
JMF 0:5cd4781e0c88 998
JMF 0:5cd4781e0c88 999 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
JMF 0:5cd4781e0c88 1000 topicString, (unsigned char*)payload, payloadlen);
JMF 0:5cd4781e0c88 1001 if (len <= 0)
JMF 0:5cd4781e0c88 1002 goto exit;
JMF 0:5cd4781e0c88 1003
JMF 0:5cd4781e0c88 1004 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 1005 if (!cleansession)
JMF 0:5cd4781e0c88 1006 {
JMF 0:5cd4781e0c88 1007 memcpy(pubbuf, sendbuf, len);
JMF 0:5cd4781e0c88 1008 inflightMsgid = id;
JMF 0:5cd4781e0c88 1009 inflightLen = len;
JMF 0:5cd4781e0c88 1010 inflightQoS = qos;
JMF 0:5cd4781e0c88 1011 #if MQTTCLIENT_QOS2
JMF 0:5cd4781e0c88 1012 pubrel = false;
JMF 0:5cd4781e0c88 1013 #endif
JMF 0:5cd4781e0c88 1014 }
JMF 0:5cd4781e0c88 1015 #endif
JMF 0:5cd4781e0c88 1016
JMF 0:5cd4781e0c88 1017 rc = publish(len, timer, qos);
JMF 0:5cd4781e0c88 1018 exit:
JMF 0:5cd4781e0c88 1019 return rc;
JMF 0:5cd4781e0c88 1020 }
JMF 0:5cd4781e0c88 1021
JMF 0:5cd4781e0c88 1022
JMF 0:5cd4781e0c88 1023 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 1024 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
JMF 0:5cd4781e0c88 1025 {
JMF 0:5cd4781e0c88 1026 unsigned short id = 0; // dummy - not used for anything
JMF 0:5cd4781e0c88 1027 return publish(topicName, payload, payloadlen, id, qos, retained);
JMF 0:5cd4781e0c88 1028 }
JMF 0:5cd4781e0c88 1029
JMF 0:5cd4781e0c88 1030
JMF 0:5cd4781e0c88 1031 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 1032 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
JMF 0:5cd4781e0c88 1033 {
JMF 0:5cd4781e0c88 1034 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
JMF 0:5cd4781e0c88 1035 }
JMF 0:5cd4781e0c88 1036
JMF 0:5cd4781e0c88 1037
JMF 0:5cd4781e0c88 1038 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
JMF 0:5cd4781e0c88 1039 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
JMF 0:5cd4781e0c88 1040 {
JMF 0:5cd4781e0c88 1041 int rc = FAILURE;
JMF 0:5cd4781e0c88 1042 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
JMF 0:5cd4781e0c88 1043 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
JMF 0:5cd4781e0c88 1044 if (len > 0)
JMF 0:5cd4781e0c88 1045 rc = sendPacket(len, timer); // send the disconnect packet
JMF 0:5cd4781e0c88 1046 closeSession();
JMF 0:5cd4781e0c88 1047 return rc;
JMF 0:5cd4781e0c88 1048 }
JMF 0:5cd4781e0c88 1049
JMF 0:5cd4781e0c88 1050 #endif