The Cayenne MQTT mbed Library provides functions to easily connect to the Cayenne IoT project builder.

Fork of Cayenne-MQTT-mbed by myDevicesIoT

Committer:
jburhenn
Date:
Wed Jan 25 10:34:16 2017 -0700
Branch:
feature/multivalue
Revision:
22:0dbabcc6e7b2
Parent:
0:09ef59d2d0f7
Added support for multi-value arrays and size_t data type modifications from the Cayenne-MQTT-C library.

Who changed what in which revision?

User | Revision | Line number | New contents of line
jburhenn 0:09ef59d2d0f7 1 /*******************************************************************************
jburhenn 0:09ef59d2d0f7 2 * Copyright (c) 2014, 2015 IBM Corp.
jburhenn 0:09ef59d2d0f7 3 *
jburhenn 0:09ef59d2d0f7 4 * All rights reserved. This program and the accompanying materials
jburhenn 0:09ef59d2d0f7 5 * are made available under the terms of the Eclipse Public License v1.0
jburhenn 0:09ef59d2d0f7 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
jburhenn 0:09ef59d2d0f7 7 *
jburhenn 0:09ef59d2d0f7 8 * The Eclipse Public License is available at
jburhenn 0:09ef59d2d0f7 9 * http://www.eclipse.org/legal/epl-v10.html
jburhenn 0:09ef59d2d0f7 10 * and the Eclipse Distribution License is available at
jburhenn 0:09ef59d2d0f7 11 * http://www.eclipse.org/org/documents/edl-v10.php.
jburhenn 0:09ef59d2d0f7 12 *
jburhenn 0:09ef59d2d0f7 13 * Contributors:
jburhenn 0:09ef59d2d0f7 14 * Ian Craggs - initial API and implementation and/or initial documentation
jburhenn 0:09ef59d2d0f7 15 * Ian Craggs - fix for bug 458512 - QoS 2 messages
jburhenn 0:09ef59d2d0f7 16 * Ian Craggs - fix for bug 460389 - send loop uses wrong length
jburhenn 0:09ef59d2d0f7 17 * Ian Craggs - fix for bug 464169 - clearing subscriptions
jburhenn 0:09ef59d2d0f7 18 * Ian Craggs - fix for bug 464551 - enums and ints can be different size
jburhenn 0:09ef59d2d0f7 19 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
jburhenn 0:09ef59d2d0f7 20 * Ian Craggs - fix for bug 475749 - packetid modified twice
jburhenn 0:09ef59d2d0f7 21 *******************************************************************************/
jburhenn 0:09ef59d2d0f7 22
jburhenn 0:09ef59d2d0f7 23 #if !defined(MQTTCLIENT_H)
jburhenn 0:09ef59d2d0f7 24 #define MQTTCLIENT_H
jburhenn 0:09ef59d2d0f7 25
jburhenn 0:09ef59d2d0f7 26 #include "FP.h"
jburhenn 0:09ef59d2d0f7 27 #include "MQTTPacket.h"
jburhenn 0:09ef59d2d0f7 28 #include "stdio.h"
jburhenn 0:09ef59d2d0f7 29 #include "MQTTLogging.h"
jburhenn 0:09ef59d2d0f7 30
jburhenn 0:09ef59d2d0f7 31 #if !defined(MQTTCLIENT_QOS1)
jburhenn 0:09ef59d2d0f7 32 #define MQTTCLIENT_QOS1 1
jburhenn 0:09ef59d2d0f7 33 #endif
jburhenn 0:09ef59d2d0f7 34 #if !defined(MQTTCLIENT_QOS2)
jburhenn 0:09ef59d2d0f7 35 #define MQTTCLIENT_QOS2 0
jburhenn 0:09ef59d2d0f7 36 #endif
jburhenn 0:09ef59d2d0f7 37
jburhenn 0:09ef59d2d0f7 38 namespace MQTT
jburhenn 0:09ef59d2d0f7 39 {
jburhenn 0:09ef59d2d0f7 40
jburhenn 0:09ef59d2d0f7 41
// MQTT quality-of-service levels. Each enumerator's value equals its numeric level.
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
jburhenn 0:09ef59d2d0f7 43
// Result codes returned by the client methods.
// Every failure code must stay negative: callers test for failure with `rc < 0`.
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE = -1,
    SUCCESS = 0
};
jburhenn 0:09ef59d2d0f7 46
jburhenn 0:09ef59d2d0f7 47
jburhenn 0:09ef59d2d0f7 48 struct Message
jburhenn 0:09ef59d2d0f7 49 {
jburhenn 0:09ef59d2d0f7 50 enum QoS qos;
jburhenn 0:09ef59d2d0f7 51 bool retained;
jburhenn 0:09ef59d2d0f7 52 bool dup;
jburhenn 0:09ef59d2d0f7 53 unsigned short id;
jburhenn 0:09ef59d2d0f7 54 void *payload;
jburhenn 0:09ef59d2d0f7 55 size_t payloadlen;
jburhenn 0:09ef59d2d0f7 56 };
jburhenn 0:09ef59d2d0f7 57
jburhenn 0:09ef59d2d0f7 58
jburhenn 0:09ef59d2d0f7 59 struct MessageData
jburhenn 0:09ef59d2d0f7 60 {
jburhenn 0:09ef59d2d0f7 61 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
jburhenn 0:09ef59d2d0f7 62 { }
jburhenn 0:09ef59d2d0f7 63
jburhenn 0:09ef59d2d0f7 64 struct Message &message;
jburhenn 0:09ef59d2d0f7 65 MQTTString &topicName;
jburhenn 0:09ef59d2d0f7 66 };
jburhenn 0:09ef59d2d0f7 67
jburhenn 0:09ef59d2d0f7 68
// Hands out successive MQTT packet identifiers in the range 1..65535.
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    // Advance to the following identifier and return it.
    // After 65535 the sequence restarts at 1, so 0 is never returned.
    int getNext()
    {
        if (next == MAX_PACKET_ID)
            next = 1;
        else
            ++next;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // most recently issued identifier (0 before the first call)
};
jburhenn 0:09ef59d2d0f7 86
jburhenn 0:09ef59d2d0f7 87
jburhenn 0:09ef59d2d0f7 88 /**
jburhenn 0:09ef59d2d0f7 89 * @class Client
jburhenn 0:09ef59d2d0f7 90 * @brief blocking, non-threaded MQTT client API
jburhenn 0:09ef59d2d0f7 91 *
jburhenn 0:09ef59d2d0f7 92 * This version of the API blocks on all method calls, until they are complete. This means that only one
jburhenn 0:09ef59d2d0f7 93 * MQTT request can be in process at any one time.
jburhenn 0:09ef59d2d0f7 94 * @param Network a network class with the methods: read, write. See NetworkInterface.h for function definitions.
jburhenn 0:09ef59d2d0f7 95 * @param Timer a timer class with the methods: countdown_ms, countdown, left_ms, expired. See TimerInterface.h for function definitions.
jburhenn 0:09ef59d2d0f7 96 */
jburhenn 0:09ef59d2d0f7 97 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
jburhenn 0:09ef59d2d0f7 98 class Client
jburhenn 0:09ef59d2d0f7 99 {
jburhenn 0:09ef59d2d0f7 100
jburhenn 0:09ef59d2d0f7 101 public:
jburhenn 0:09ef59d2d0f7 102
jburhenn 0:09ef59d2d0f7 103 typedef void (*messageHandler)(MessageData&);
jburhenn 0:09ef59d2d0f7 104
jburhenn 0:09ef59d2d0f7 105 /** Construct the client
jburhenn 0:09ef59d2d0f7 106 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
jburhenn 0:09ef59d2d0f7 107 * before calling MQTT connect
jburhenn 0:09ef59d2d0f7 108 * @param limits an instance of the Limit class - to alter limits as required
jburhenn 0:09ef59d2d0f7 109 */
jburhenn 0:09ef59d2d0f7 110 Client(Network& network, unsigned int command_timeout_ms = 30000);
jburhenn 0:09ef59d2d0f7 111
jburhenn 0:09ef59d2d0f7 112 /** Set the default message handling callback - used for any message which does not match a subscription message handler
jburhenn 0:09ef59d2d0f7 113 * @param mh - pointer to the callback function
jburhenn 0:09ef59d2d0f7 114 */
jburhenn 0:09ef59d2d0f7 115 void setDefaultMessageHandler(messageHandler mh)
jburhenn 0:09ef59d2d0f7 116 {
jburhenn 0:09ef59d2d0f7 117 defaultMessageHandler.attach(mh);
jburhenn 0:09ef59d2d0f7 118 }
jburhenn 0:09ef59d2d0f7 119
jburhenn 0:09ef59d2d0f7 120 /** Set the default message handling callback - used for any message which does not match a subscription message handler
jburhenn 0:09ef59d2d0f7 121 * @param time - address of initialized object
jburhenn 0:09ef59d2d0f7 122 * @param mh - pointer to the callback function
jburhenn 0:09ef59d2d0f7 123 */
jburhenn 0:09ef59d2d0f7 124 template<class T>
jburhenn 0:09ef59d2d0f7 125 void setDefaultMessageHandler(T *item, void (T::*method)(MessageData&))
jburhenn 0:09ef59d2d0f7 126 {
jburhenn 0:09ef59d2d0f7 127 defaultMessageHandler.attach(item, method);
jburhenn 0:09ef59d2d0f7 128 }
jburhenn 0:09ef59d2d0f7 129
jburhenn 0:09ef59d2d0f7 130 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
jburhenn 0:09ef59d2d0f7 131 * The nework object must be connected to the network endpoint before calling this
jburhenn 0:09ef59d2d0f7 132 * Default connect options are used
jburhenn 0:09ef59d2d0f7 133 * @return success code -
jburhenn 0:09ef59d2d0f7 134 */
jburhenn 0:09ef59d2d0f7 135 int connect();
jburhenn 0:09ef59d2d0f7 136
jburhenn 0:09ef59d2d0f7 137 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
jburhenn 0:09ef59d2d0f7 138 * The nework object must be connected to the network endpoint before calling this
jburhenn 0:09ef59d2d0f7 139 * @param options - connect options
jburhenn 0:09ef59d2d0f7 140 * @return success code -
jburhenn 0:09ef59d2d0f7 141 */
jburhenn 0:09ef59d2d0f7 142 int connect(MQTTPacket_connectData& options);
jburhenn 0:09ef59d2d0f7 143
jburhenn 0:09ef59d2d0f7 144 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
jburhenn 0:09ef59d2d0f7 145 * @param topic - the topic to publish to
jburhenn 0:09ef59d2d0f7 146 * @param message - the message to send
jburhenn 0:09ef59d2d0f7 147 * @return success code -
jburhenn 0:09ef59d2d0f7 148 */
jburhenn 0:09ef59d2d0f7 149 int publish(const char* topicName, Message& message);
jburhenn 0:09ef59d2d0f7 150
jburhenn 0:09ef59d2d0f7 151 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
jburhenn 0:09ef59d2d0f7 152 * @param topic - the topic to publish to
jburhenn 0:09ef59d2d0f7 153 * @param payload - the data to send
jburhenn 0:09ef59d2d0f7 154 * @param payloadlen - the length of the data
jburhenn 0:09ef59d2d0f7 155 * @param qos - the QoS to send the publish at
jburhenn 0:09ef59d2d0f7 156 * @param retained - whether the message should be retained
jburhenn 0:09ef59d2d0f7 157 * @return success code -
jburhenn 0:09ef59d2d0f7 158 */
jburhenn 0:09ef59d2d0f7 159 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
jburhenn 0:09ef59d2d0f7 160
jburhenn 0:09ef59d2d0f7 161 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
jburhenn 0:09ef59d2d0f7 162 * @param topic - the topic to publish to
jburhenn 0:09ef59d2d0f7 163 * @param payload - the data to send
jburhenn 0:09ef59d2d0f7 164 * @param payloadlen - the length of the data
jburhenn 0:09ef59d2d0f7 165 * @param id - the packet id used - returned
jburhenn 0:09ef59d2d0f7 166 * @param qos - the QoS to send the publish at
jburhenn 0:09ef59d2d0f7 167 * @param retained - whether the message should be retained
jburhenn 0:09ef59d2d0f7 168 * @return success code -
jburhenn 0:09ef59d2d0f7 169 */
jburhenn 0:09ef59d2d0f7 170 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
jburhenn 0:09ef59d2d0f7 171
jburhenn 0:09ef59d2d0f7 172 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
jburhenn 0:09ef59d2d0f7 173 * @param topicFilter - a topic pattern which can include wildcards
jburhenn 0:09ef59d2d0f7 174 * @param qos - the MQTT QoS to subscribe at
jburhenn 0:09ef59d2d0f7 175 * @param mh - the callback function to be invoked when a message is received for this subscription
jburhenn 0:09ef59d2d0f7 176 * @return success code -
jburhenn 0:09ef59d2d0f7 177 */
jburhenn 0:09ef59d2d0f7 178 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
jburhenn 0:09ef59d2d0f7 179
jburhenn 0:09ef59d2d0f7 180 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
jburhenn 0:09ef59d2d0f7 181 * @param topicFilter - a topic pattern which can include wildcards
jburhenn 0:09ef59d2d0f7 182 * @return success code -
jburhenn 0:09ef59d2d0f7 183 */
jburhenn 0:09ef59d2d0f7 184 int unsubscribe(const char* topicFilter);
jburhenn 0:09ef59d2d0f7 185
jburhenn 0:09ef59d2d0f7 186 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
jburhenn 0:09ef59d2d0f7 187 * @return success code -
jburhenn 0:09ef59d2d0f7 188 */
jburhenn 0:09ef59d2d0f7 189 int disconnect();
jburhenn 0:09ef59d2d0f7 190
jburhenn 0:09ef59d2d0f7 191 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
jburhenn 0:09ef59d2d0f7 192 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
jburhenn 0:09ef59d2d0f7 193 * received.
jburhenn 0:09ef59d2d0f7 194 * @param timeout_ms the time to wait, in milliseconds
jburhenn 0:09ef59d2d0f7 195 * @return success code - on failure, this means the client has disconnected
jburhenn 0:09ef59d2d0f7 196 */
jburhenn 0:09ef59d2d0f7 197 int yield(unsigned long timeout_ms = 1000L);
jburhenn 0:09ef59d2d0f7 198
jburhenn 0:09ef59d2d0f7 199 /** Is the client connected?
jburhenn 0:09ef59d2d0f7 200 * @return flag - is the client connected or not?
jburhenn 0:09ef59d2d0f7 201 */
jburhenn 0:09ef59d2d0f7 202 bool isConnected()
jburhenn 0:09ef59d2d0f7 203 {
jburhenn 0:09ef59d2d0f7 204 return isconnected;
jburhenn 0:09ef59d2d0f7 205 }
jburhenn 0:09ef59d2d0f7 206
jburhenn 0:09ef59d2d0f7 207 private:
jburhenn 0:09ef59d2d0f7 208
jburhenn 0:09ef59d2d0f7 209 void cleanSession();
jburhenn 0:09ef59d2d0f7 210 int cycle(Timer& timer);
jburhenn 0:09ef59d2d0f7 211 int waitfor(int packet_type, Timer& timer);
jburhenn 0:09ef59d2d0f7 212 int keepalive();
jburhenn 0:09ef59d2d0f7 213 int publish(int len, Timer& timer, enum QoS qos);
jburhenn 0:09ef59d2d0f7 214
jburhenn 0:09ef59d2d0f7 215 int decodePacket(int* value, int timeout);
jburhenn 0:09ef59d2d0f7 216 int readPacket(Timer& timer);
jburhenn 0:09ef59d2d0f7 217 int sendPacket(int length, Timer& timer);
jburhenn 0:09ef59d2d0f7 218 int deliverMessage(MQTTString& topicName, Message& message);
jburhenn 0:09ef59d2d0f7 219 bool isTopicMatched(char* topicFilter, MQTTString& topicName);
jburhenn 0:09ef59d2d0f7 220
jburhenn 0:09ef59d2d0f7 221 Network& ipstack;
jburhenn 0:09ef59d2d0f7 222 unsigned long command_timeout_ms;
jburhenn 0:09ef59d2d0f7 223
jburhenn 0:09ef59d2d0f7 224 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
jburhenn 0:09ef59d2d0f7 225 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
jburhenn 0:09ef59d2d0f7 226
jburhenn 0:09ef59d2d0f7 227 Timer last_sent, last_received, ping_response;
jburhenn 0:09ef59d2d0f7 228 unsigned int keepAliveInterval;
jburhenn 0:09ef59d2d0f7 229 bool ping_outstanding;
jburhenn 0:09ef59d2d0f7 230 bool cleansession;
jburhenn 0:09ef59d2d0f7 231
jburhenn 0:09ef59d2d0f7 232 PacketId packetid;
jburhenn 0:09ef59d2d0f7 233
jburhenn 0:09ef59d2d0f7 234 struct MessageHandlers
jburhenn 0:09ef59d2d0f7 235 {
jburhenn 0:09ef59d2d0f7 236 const char* topicFilter;
jburhenn 0:09ef59d2d0f7 237 FP<void, MessageData&> fp;
jburhenn 0:09ef59d2d0f7 238 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
jburhenn 0:09ef59d2d0f7 239
jburhenn 0:09ef59d2d0f7 240 FP<void, MessageData&> defaultMessageHandler;
jburhenn 0:09ef59d2d0f7 241
jburhenn 0:09ef59d2d0f7 242 bool isconnected;
jburhenn 0:09ef59d2d0f7 243
jburhenn 0:09ef59d2d0f7 244 bool connAckReceived;
jburhenn 0:09ef59d2d0f7 245 bool subAckReceived;
jburhenn 0:09ef59d2d0f7 246 bool unsubAckReceived;
jburhenn 0:09ef59d2d0f7 247 bool pubAckReceived;
jburhenn 0:09ef59d2d0f7 248 bool pubCompReceived;
jburhenn 0:09ef59d2d0f7 249
jburhenn 0:09ef59d2d0f7 250 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 251 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect
jburhenn 0:09ef59d2d0f7 252 int inflightLen;
jburhenn 0:09ef59d2d0f7 253 unsigned short inflightMsgid;
jburhenn 0:09ef59d2d0f7 254 enum QoS inflightQoS;
jburhenn 0:09ef59d2d0f7 255 #endif
jburhenn 0:09ef59d2d0f7 256
jburhenn 0:09ef59d2d0f7 257 #if MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 258 bool pubrel;
jburhenn 0:09ef59d2d0f7 259 #if !defined(MAX_INCOMING_QOS2_MESSAGES)
jburhenn 0:09ef59d2d0f7 260 #define MAX_INCOMING_QOS2_MESSAGES 10
jburhenn 0:09ef59d2d0f7 261 #endif
jburhenn 0:09ef59d2d0f7 262 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
jburhenn 0:09ef59d2d0f7 263 bool isQoS2msgidFree(unsigned short id);
jburhenn 0:09ef59d2d0f7 264 bool useQoS2msgid(unsigned short id);
jburhenn 0:09ef59d2d0f7 265 void freeQoS2msgid(unsigned short id);
jburhenn 0:09ef59d2d0f7 266 #endif
jburhenn 0:09ef59d2d0f7 267
jburhenn 0:09ef59d2d0f7 268 };
jburhenn 0:09ef59d2d0f7 269
jburhenn 0:09ef59d2d0f7 270 }
jburhenn 0:09ef59d2d0f7 271
jburhenn 0:09ef59d2d0f7 272
jburhenn 0:09ef59d2d0f7 273 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
jburhenn 0:09ef59d2d0f7 274 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
jburhenn 0:09ef59d2d0f7 275 {
jburhenn 0:09ef59d2d0f7 276 ping_outstanding = false;
jburhenn 0:09ef59d2d0f7 277 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jburhenn 0:09ef59d2d0f7 278 messageHandlers[i].topicFilter = 0;
jburhenn 0:09ef59d2d0f7 279 isconnected = false;
jburhenn 0:09ef59d2d0f7 280
jburhenn 0:09ef59d2d0f7 281 connAckReceived = false;
jburhenn 0:09ef59d2d0f7 282 subAckReceived = false;
jburhenn 0:09ef59d2d0f7 283 unsubAckReceived = false;
jburhenn 0:09ef59d2d0f7 284 pubAckReceived = false;
jburhenn 0:09ef59d2d0f7 285 pubCompReceived = false;
jburhenn 0:09ef59d2d0f7 286
jburhenn 0:09ef59d2d0f7 287 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 288 inflightMsgid = 0;
jburhenn 0:09ef59d2d0f7 289 inflightQoS = QOS0;
jburhenn 0:09ef59d2d0f7 290 #endif
jburhenn 0:09ef59d2d0f7 291
jburhenn 0:09ef59d2d0f7 292 #if MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 293 pubrel = false;
jburhenn 0:09ef59d2d0f7 294 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jburhenn 0:09ef59d2d0f7 295 incomingQoS2messages[i] = 0;
jburhenn 0:09ef59d2d0f7 296 #endif
jburhenn 0:09ef59d2d0f7 297 }
jburhenn 0:09ef59d2d0f7 298
jburhenn 0:09ef59d2d0f7 299
jburhenn 0:09ef59d2d0f7 300 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
jburhenn 0:09ef59d2d0f7 301 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
jburhenn 0:09ef59d2d0f7 302 {
jburhenn 0:09ef59d2d0f7 303 this->command_timeout_ms = command_timeout_ms;
jburhenn 0:09ef59d2d0f7 304 cleanSession();
jburhenn 0:09ef59d2d0f7 305 }
jburhenn 0:09ef59d2d0f7 306
jburhenn 0:09ef59d2d0f7 307
jburhenn 0:09ef59d2d0f7 308 #if MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 309 template<class Network, class Timer, int a, int b>
jburhenn 0:09ef59d2d0f7 310 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
jburhenn 0:09ef59d2d0f7 311 {
jburhenn 0:09ef59d2d0f7 312 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jburhenn 0:09ef59d2d0f7 313 {
jburhenn 0:09ef59d2d0f7 314 if (incomingQoS2messages[i] == id)
jburhenn 0:09ef59d2d0f7 315 return false;
jburhenn 0:09ef59d2d0f7 316 }
jburhenn 0:09ef59d2d0f7 317 return true;
jburhenn 0:09ef59d2d0f7 318 }
jburhenn 0:09ef59d2d0f7 319
jburhenn 0:09ef59d2d0f7 320
jburhenn 0:09ef59d2d0f7 321 template<class Network, class Timer, int a, int b>
jburhenn 0:09ef59d2d0f7 322 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
jburhenn 0:09ef59d2d0f7 323 {
jburhenn 0:09ef59d2d0f7 324 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jburhenn 0:09ef59d2d0f7 325 {
jburhenn 0:09ef59d2d0f7 326 if (incomingQoS2messages[i] == 0)
jburhenn 0:09ef59d2d0f7 327 {
jburhenn 0:09ef59d2d0f7 328 incomingQoS2messages[i] = id;
jburhenn 0:09ef59d2d0f7 329 return true;
jburhenn 0:09ef59d2d0f7 330 }
jburhenn 0:09ef59d2d0f7 331 }
jburhenn 0:09ef59d2d0f7 332 return false;
jburhenn 0:09ef59d2d0f7 333 }
jburhenn 0:09ef59d2d0f7 334
jburhenn 0:09ef59d2d0f7 335
jburhenn 0:09ef59d2d0f7 336 template<class Network, class Timer, int a, int b>
jburhenn 0:09ef59d2d0f7 337 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
jburhenn 0:09ef59d2d0f7 338 {
jburhenn 0:09ef59d2d0f7 339 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jburhenn 0:09ef59d2d0f7 340 {
jburhenn 0:09ef59d2d0f7 341 if (incomingQoS2messages[i] == id)
jburhenn 0:09ef59d2d0f7 342 {
jburhenn 0:09ef59d2d0f7 343 incomingQoS2messages[i] = 0;
jburhenn 0:09ef59d2d0f7 344 return;
jburhenn 0:09ef59d2d0f7 345 }
jburhenn 0:09ef59d2d0f7 346 }
jburhenn 0:09ef59d2d0f7 347 }
jburhenn 0:09ef59d2d0f7 348 #endif
jburhenn 0:09ef59d2d0f7 349
jburhenn 0:09ef59d2d0f7 350
jburhenn 0:09ef59d2d0f7 351 template<class Network, class Timer, int a, int b>
jburhenn 0:09ef59d2d0f7 352 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
jburhenn 0:09ef59d2d0f7 353 {
jburhenn 0:09ef59d2d0f7 354 int rc = FAILURE,
jburhenn 0:09ef59d2d0f7 355 sent = 0;
jburhenn 0:09ef59d2d0f7 356
jburhenn 0:09ef59d2d0f7 357 while (sent < length && !timer.expired())
jburhenn 0:09ef59d2d0f7 358 {
jburhenn 0:09ef59d2d0f7 359 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
jburhenn 0:09ef59d2d0f7 360 if (rc < 0) // there was an error writing the data
jburhenn 0:09ef59d2d0f7 361 break;
jburhenn 0:09ef59d2d0f7 362 sent += rc;
jburhenn 0:09ef59d2d0f7 363 }
jburhenn 0:09ef59d2d0f7 364 if (sent == length)
jburhenn 0:09ef59d2d0f7 365 {
jburhenn 0:09ef59d2d0f7 366 if (this->keepAliveInterval > 0)
jburhenn 0:09ef59d2d0f7 367 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
jburhenn 0:09ef59d2d0f7 368 rc = SUCCESS;
jburhenn 0:09ef59d2d0f7 369 }
jburhenn 0:09ef59d2d0f7 370 else
jburhenn 0:09ef59d2d0f7 371 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 372
jburhenn 0:09ef59d2d0f7 373 #if defined(MQTT_DEBUG)
jburhenn 0:09ef59d2d0f7 374 char printbuf[150];
jburhenn 0:09ef59d2d0f7 375 DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
jburhenn 0:09ef59d2d0f7 376 #endif
jburhenn 0:09ef59d2d0f7 377 return rc;
jburhenn 0:09ef59d2d0f7 378 }
jburhenn 0:09ef59d2d0f7 379
jburhenn 0:09ef59d2d0f7 380
jburhenn 0:09ef59d2d0f7 381 template<class Network, class Timer, int a, int b>
jburhenn 0:09ef59d2d0f7 382 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
jburhenn 0:09ef59d2d0f7 383 {
jburhenn 0:09ef59d2d0f7 384 unsigned char c;
jburhenn 0:09ef59d2d0f7 385 int multiplier = 1;
jburhenn 0:09ef59d2d0f7 386 int len = 0;
jburhenn 0:09ef59d2d0f7 387 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
jburhenn 0:09ef59d2d0f7 388
jburhenn 0:09ef59d2d0f7 389 *value = 0;
jburhenn 0:09ef59d2d0f7 390 do
jburhenn 0:09ef59d2d0f7 391 {
jburhenn 0:09ef59d2d0f7 392 int rc = MQTTPACKET_READ_ERROR;
jburhenn 0:09ef59d2d0f7 393
jburhenn 0:09ef59d2d0f7 394 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
jburhenn 0:09ef59d2d0f7 395 {
jburhenn 0:09ef59d2d0f7 396 rc = MQTTPACKET_READ_ERROR; /* bad data */
jburhenn 0:09ef59d2d0f7 397 goto exit;
jburhenn 0:09ef59d2d0f7 398 }
jburhenn 0:09ef59d2d0f7 399 rc = ipstack.read(&c, 1, timeout);
jburhenn 0:09ef59d2d0f7 400 if (rc != 1)
jburhenn 0:09ef59d2d0f7 401 goto exit;
jburhenn 0:09ef59d2d0f7 402 *value += (c & 127) * multiplier;
jburhenn 0:09ef59d2d0f7 403 multiplier *= 128;
jburhenn 0:09ef59d2d0f7 404 } while ((c & 128) != 0);
jburhenn 0:09ef59d2d0f7 405 exit:
jburhenn 0:09ef59d2d0f7 406 return len;
jburhenn 0:09ef59d2d0f7 407 }
jburhenn 0:09ef59d2d0f7 408
jburhenn 0:09ef59d2d0f7 409
jburhenn 0:09ef59d2d0f7 410 /**
jburhenn 0:09ef59d2d0f7 411 * If any read fails in this method, then we should disconnect from the network, as on reconnect
jburhenn 0:09ef59d2d0f7 412 * the packets can be retried.
jburhenn 0:09ef59d2d0f7 413 * @param timeout the max time to wait for the packet read to complete, in milliseconds
jburhenn 0:09ef59d2d0f7 414 * @return the MQTT packet type, or -1 if none
jburhenn 0:09ef59d2d0f7 415 */
jburhenn 0:09ef59d2d0f7 416 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jburhenn 0:09ef59d2d0f7 417 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
jburhenn 0:09ef59d2d0f7 418 {
jburhenn 0:09ef59d2d0f7 419 int rc = FAILURE;
jburhenn 0:09ef59d2d0f7 420 MQTTHeader header = {0};
jburhenn 0:09ef59d2d0f7 421 int len = 0;
jburhenn 0:09ef59d2d0f7 422 int rem_len = 0;
jburhenn 0:09ef59d2d0f7 423
jburhenn 0:09ef59d2d0f7 424 /* 1. read the header byte. This has the packet type in it */
jburhenn 0:09ef59d2d0f7 425 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
jburhenn 0:09ef59d2d0f7 426 goto exit;
jburhenn 0:09ef59d2d0f7 427
jburhenn 0:09ef59d2d0f7 428 len = 1;
jburhenn 0:09ef59d2d0f7 429 /* 2. read the remaining length. This is variable in itself */
jburhenn 0:09ef59d2d0f7 430 decodePacket(&rem_len, timer.left_ms());
jburhenn 0:09ef59d2d0f7 431 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
jburhenn 0:09ef59d2d0f7 432
jburhenn 0:09ef59d2d0f7 433 if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
jburhenn 0:09ef59d2d0f7 434 {
jburhenn 0:09ef59d2d0f7 435 rc = BUFFER_OVERFLOW;
jburhenn 0:09ef59d2d0f7 436 goto exit;
jburhenn 0:09ef59d2d0f7 437 }
jburhenn 0:09ef59d2d0f7 438
jburhenn 0:09ef59d2d0f7 439 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
jburhenn 0:09ef59d2d0f7 440 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
jburhenn 0:09ef59d2d0f7 441 goto exit;
jburhenn 0:09ef59d2d0f7 442
jburhenn 0:09ef59d2d0f7 443 header.byte = readbuf[0];
jburhenn 0:09ef59d2d0f7 444 rc = header.bits.type;
jburhenn 0:09ef59d2d0f7 445 if (this->keepAliveInterval > 0)
jburhenn 0:09ef59d2d0f7 446 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
jburhenn 0:09ef59d2d0f7 447 exit:
jburhenn 0:09ef59d2d0f7 448
jburhenn 0:09ef59d2d0f7 449 #if defined(MQTT_DEBUG)
jburhenn 0:09ef59d2d0f7 450 if (rc >= 0)
jburhenn 0:09ef59d2d0f7 451 {
jburhenn 0:09ef59d2d0f7 452 char printbuf[50];
jburhenn 0:09ef59d2d0f7 453 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
jburhenn 0:09ef59d2d0f7 454 }
jburhenn 0:09ef59d2d0f7 455 #endif
jburhenn 0:09ef59d2d0f7 456 return rc;
jburhenn 0:09ef59d2d0f7 457 }
jburhenn 0:09ef59d2d0f7 458
jburhenn 0:09ef59d2d0f7 459
jburhenn 0:09ef59d2d0f7 460 // assume topic filter and name is in correct format
jburhenn 0:09ef59d2d0f7 461 // # can only be at end
jburhenn 0:09ef59d2d0f7 462 // + and # can only be next to separator
jburhenn 0:09ef59d2d0f7 463 template<class Network, class Timer, int a, int b>
jburhenn 0:09ef59d2d0f7 464 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
jburhenn 0:09ef59d2d0f7 465 {
jburhenn 0:09ef59d2d0f7 466 char* curf = topicFilter;
jburhenn 0:09ef59d2d0f7 467 char* curn = topicName.lenstring.data;
jburhenn 0:09ef59d2d0f7 468 char* curn_end = curn + topicName.lenstring.len;
jburhenn 0:09ef59d2d0f7 469
jburhenn 0:09ef59d2d0f7 470 while (*curf && curn < curn_end)
jburhenn 0:09ef59d2d0f7 471 {
jburhenn 0:09ef59d2d0f7 472 if (*curn == '/' && *curf != '/')
jburhenn 0:09ef59d2d0f7 473 break;
jburhenn 0:09ef59d2d0f7 474 if (*curf != '+' && *curf != '#' && *curf != *curn)
jburhenn 0:09ef59d2d0f7 475 break;
jburhenn 0:09ef59d2d0f7 476 if (*curf == '+')
jburhenn 0:09ef59d2d0f7 477 { // skip until we meet the next separator, or end of string
jburhenn 0:09ef59d2d0f7 478 char* nextpos = curn + 1;
jburhenn 0:09ef59d2d0f7 479 while (nextpos < curn_end && *nextpos != '/')
jburhenn 0:09ef59d2d0f7 480 nextpos = ++curn + 1;
jburhenn 0:09ef59d2d0f7 481 }
jburhenn 0:09ef59d2d0f7 482 else if (*curf == '#')
jburhenn 0:09ef59d2d0f7 483 curn = curn_end - 1; // skip until end of string
jburhenn 0:09ef59d2d0f7 484 curf++;
jburhenn 0:09ef59d2d0f7 485 curn++;
jburhenn 0:09ef59d2d0f7 486 };
jburhenn 0:09ef59d2d0f7 487
jburhenn 0:09ef59d2d0f7 488 return (curn == curn_end) && (*curf == '\0');
jburhenn 0:09ef59d2d0f7 489 }
jburhenn 0:09ef59d2d0f7 490
jburhenn 0:09ef59d2d0f7 491
jburhenn 0:09ef59d2d0f7 492
jburhenn 0:09ef59d2d0f7 493 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
jburhenn 0:09ef59d2d0f7 494 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
jburhenn 0:09ef59d2d0f7 495 {
jburhenn 0:09ef59d2d0f7 496 int rc = FAILURE;
jburhenn 0:09ef59d2d0f7 497 // we have to find the right message handler - indexed by topic
jburhenn 0:09ef59d2d0f7 498 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jburhenn 0:09ef59d2d0f7 499 {
jburhenn 0:09ef59d2d0f7 500 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
jburhenn 0:09ef59d2d0f7 501 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
jburhenn 0:09ef59d2d0f7 502 {
jburhenn 0:09ef59d2d0f7 503 if (messageHandlers[i].fp.attached())
jburhenn 0:09ef59d2d0f7 504 {
jburhenn 0:09ef59d2d0f7 505 MessageData md(topicName, message);
jburhenn 0:09ef59d2d0f7 506 messageHandlers[i].fp(md);
jburhenn 0:09ef59d2d0f7 507 rc = SUCCESS;
jburhenn 0:09ef59d2d0f7 508 }
jburhenn 0:09ef59d2d0f7 509 }
jburhenn 0:09ef59d2d0f7 510 }
jburhenn 0:09ef59d2d0f7 511
jburhenn 0:09ef59d2d0f7 512 if (rc == FAILURE && defaultMessageHandler.attached())
jburhenn 0:09ef59d2d0f7 513 {
jburhenn 0:09ef59d2d0f7 514 MessageData md(topicName, message);
jburhenn 0:09ef59d2d0f7 515 defaultMessageHandler(md);
jburhenn 0:09ef59d2d0f7 516 rc = SUCCESS;
jburhenn 0:09ef59d2d0f7 517 }
jburhenn 0:09ef59d2d0f7 518
jburhenn 0:09ef59d2d0f7 519 return rc;
jburhenn 0:09ef59d2d0f7 520 }
jburhenn 0:09ef59d2d0f7 521
jburhenn 0:09ef59d2d0f7 522
jburhenn 0:09ef59d2d0f7 523
jburhenn 0:09ef59d2d0f7 524 template<class Network, class Timer, int a, int b>
jburhenn 0:09ef59d2d0f7 525 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
jburhenn 0:09ef59d2d0f7 526 {
jburhenn 0:09ef59d2d0f7 527 int rc = SUCCESS;
jburhenn 0:09ef59d2d0f7 528 Timer timer;
jburhenn 0:09ef59d2d0f7 529
jburhenn 0:09ef59d2d0f7 530 timer.countdown_ms(timeout_ms);
jburhenn 0:09ef59d2d0f7 531 while (!timer.expired())
jburhenn 0:09ef59d2d0f7 532 {
jburhenn 0:09ef59d2d0f7 533 if (cycle(timer) < 0)
jburhenn 0:09ef59d2d0f7 534 {
jburhenn 0:09ef59d2d0f7 535 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 536 break;
jburhenn 0:09ef59d2d0f7 537 }
jburhenn 0:09ef59d2d0f7 538 }
jburhenn 0:09ef59d2d0f7 539
jburhenn 0:09ef59d2d0f7 540 return rc;
jburhenn 0:09ef59d2d0f7 541 }
jburhenn 0:09ef59d2d0f7 542
jburhenn 0:09ef59d2d0f7 543
jburhenn 0:09ef59d2d0f7 544 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jburhenn 0:09ef59d2d0f7 545 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
jburhenn 0:09ef59d2d0f7 546 {
jburhenn 0:09ef59d2d0f7 547 /* get one piece of work off the wire and one pass through */
jburhenn 0:09ef59d2d0f7 548
jburhenn 0:09ef59d2d0f7 549 // read the socket, see what work is due
jburhenn 0:09ef59d2d0f7 550 int packet_type = readPacket(timer);
jburhenn 0:09ef59d2d0f7 551
jburhenn 0:09ef59d2d0f7 552 int len = 0,
jburhenn 0:09ef59d2d0f7 553 rc = SUCCESS;
jburhenn 0:09ef59d2d0f7 554
jburhenn 0:09ef59d2d0f7 555 switch (packet_type)
jburhenn 0:09ef59d2d0f7 556 {
jburhenn 0:09ef59d2d0f7 557 case FAILURE:
jburhenn 0:09ef59d2d0f7 558 case BUFFER_OVERFLOW:
jburhenn 0:09ef59d2d0f7 559 rc = packet_type;
jburhenn 0:09ef59d2d0f7 560 break;
jburhenn 0:09ef59d2d0f7 561 case CONNACK_MSG:
jburhenn 0:09ef59d2d0f7 562 connAckReceived = true;
jburhenn 0:09ef59d2d0f7 563 break;
jburhenn 0:09ef59d2d0f7 564 case PUBACK_MSG:
jburhenn 0:09ef59d2d0f7 565 pubAckReceived = true;
jburhenn 0:09ef59d2d0f7 566 break;
jburhenn 0:09ef59d2d0f7 567 case SUBACK_MSG:
jburhenn 0:09ef59d2d0f7 568 subAckReceived = true;
jburhenn 0:09ef59d2d0f7 569 break;
jburhenn 0:09ef59d2d0f7 570 case UNSUBACK_MSG:
jburhenn 0:09ef59d2d0f7 571 unsubAckReceived = true;
jburhenn 0:09ef59d2d0f7 572 break;
jburhenn 0:09ef59d2d0f7 573 case PUBLISH_MSG:
jburhenn 0:09ef59d2d0f7 574 {
jburhenn 0:09ef59d2d0f7 575 MQTTString topicName = MQTTString_initializer;
jburhenn 0:09ef59d2d0f7 576 Message msg;
jburhenn 0:09ef59d2d0f7 577 int intQoS;
jburhenn 0:09ef59d2d0f7 578 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
jburhenn 22:0dbabcc6e7b2 579 (unsigned char**)&msg.payload, &msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jburhenn 0:09ef59d2d0f7 580 goto exit;
jburhenn 0:09ef59d2d0f7 581 msg.qos = (enum QoS)intQoS;
jburhenn 0:09ef59d2d0f7 582 #if MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 583 if (msg.qos != QOS2)
jburhenn 0:09ef59d2d0f7 584 #endif
jburhenn 0:09ef59d2d0f7 585 deliverMessage(topicName, msg);
jburhenn 0:09ef59d2d0f7 586 #if MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 587 else if (isQoS2msgidFree(msg.id))
jburhenn 0:09ef59d2d0f7 588 {
jburhenn 0:09ef59d2d0f7 589 if (useQoS2msgid(msg.id))
jburhenn 0:09ef59d2d0f7 590 deliverMessage(topicName, msg);
jburhenn 0:09ef59d2d0f7 591 else
jburhenn 0:09ef59d2d0f7 592 WARN("Maximum number of incoming QoS2 messages exceeded");
jburhenn 0:09ef59d2d0f7 593 }
jburhenn 0:09ef59d2d0f7 594 #endif
jburhenn 0:09ef59d2d0f7 595 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 596 if (msg.qos != QOS0)
jburhenn 0:09ef59d2d0f7 597 {
jburhenn 0:09ef59d2d0f7 598 if (msg.qos == QOS1)
jburhenn 0:09ef59d2d0f7 599 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK_MSG, 0, msg.id);
jburhenn 0:09ef59d2d0f7 600 else if (msg.qos == QOS2)
jburhenn 0:09ef59d2d0f7 601 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC_MSG, 0, msg.id);
jburhenn 0:09ef59d2d0f7 602 if (len <= 0)
jburhenn 0:09ef59d2d0f7 603 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 604 else
jburhenn 0:09ef59d2d0f7 605 rc = sendPacket(len, timer);
jburhenn 0:09ef59d2d0f7 606 if (rc == FAILURE)
jburhenn 0:09ef59d2d0f7 607 goto exit; // there was a problem
jburhenn 0:09ef59d2d0f7 608 }
jburhenn 0:09ef59d2d0f7 609 break;
jburhenn 0:09ef59d2d0f7 610 #endif
jburhenn 0:09ef59d2d0f7 611 }
jburhenn 0:09ef59d2d0f7 612 #if MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 613 case PUBREC_MSG:
jburhenn 0:09ef59d2d0f7 614 case PUBREL_MSG:
jburhenn 0:09ef59d2d0f7 615 unsigned short mypacketid;
jburhenn 0:09ef59d2d0f7 616 unsigned char dup, type;
jburhenn 0:09ef59d2d0f7 617 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jburhenn 0:09ef59d2d0f7 618 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 619 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
jburhenn 0:09ef59d2d0f7 620 (packet_type == PUBREC_MSG) ? PUBREL_MSG : PUBCOMP_MSG, 0, mypacketid)) <= 0)
jburhenn 0:09ef59d2d0f7 621 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 622 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL_MSG packet
jburhenn 0:09ef59d2d0f7 623 rc = FAILURE; // there was a problem
jburhenn 0:09ef59d2d0f7 624 if (rc == FAILURE)
jburhenn 0:09ef59d2d0f7 625 goto exit; // there was a problem
jburhenn 0:09ef59d2d0f7 626 if (packet_type == PUBREL_MSG)
jburhenn 0:09ef59d2d0f7 627 freeQoS2msgid(mypacketid);
jburhenn 0:09ef59d2d0f7 628 break;
jburhenn 0:09ef59d2d0f7 629
jburhenn 0:09ef59d2d0f7 630 case PUBCOMP_MSG:
jburhenn 0:09ef59d2d0f7 631 pubCompReceived = true;
jburhenn 0:09ef59d2d0f7 632 break;
jburhenn 0:09ef59d2d0f7 633 #endif
jburhenn 0:09ef59d2d0f7 634 case PINGRESP_MSG:
jburhenn 0:09ef59d2d0f7 635 ping_outstanding = false;
jburhenn 0:09ef59d2d0f7 636 break;
jburhenn 0:09ef59d2d0f7 637 }
jburhenn 0:09ef59d2d0f7 638 keepalive();
jburhenn 0:09ef59d2d0f7 639 exit:
jburhenn 0:09ef59d2d0f7 640 if (rc == SUCCESS)
jburhenn 0:09ef59d2d0f7 641 rc = packet_type;
jburhenn 0:09ef59d2d0f7 642 return rc;
jburhenn 0:09ef59d2d0f7 643 }
jburhenn 0:09ef59d2d0f7 644
jburhenn 0:09ef59d2d0f7 645
jburhenn 0:09ef59d2d0f7 646 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jburhenn 0:09ef59d2d0f7 647 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
jburhenn 0:09ef59d2d0f7 648 {
jburhenn 0:09ef59d2d0f7 649 int rc = FAILURE;
jburhenn 0:09ef59d2d0f7 650
jburhenn 0:09ef59d2d0f7 651 if (keepAliveInterval == 0)
jburhenn 0:09ef59d2d0f7 652 {
jburhenn 0:09ef59d2d0f7 653 rc = SUCCESS;
jburhenn 0:09ef59d2d0f7 654 goto exit;
jburhenn 0:09ef59d2d0f7 655 }
jburhenn 0:09ef59d2d0f7 656
jburhenn 0:09ef59d2d0f7 657 if (last_sent.expired() || last_received.expired())
jburhenn 0:09ef59d2d0f7 658 {
jburhenn 0:09ef59d2d0f7 659 if (!ping_outstanding)
jburhenn 0:09ef59d2d0f7 660 {
jburhenn 0:09ef59d2d0f7 661 Timer timer(1000);
jburhenn 0:09ef59d2d0f7 662 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
jburhenn 0:09ef59d2d0f7 663 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
jburhenn 0:09ef59d2d0f7 664 {
jburhenn 0:09ef59d2d0f7 665 ping_response.countdown(this->keepAliveInterval);
jburhenn 0:09ef59d2d0f7 666 ping_outstanding = true;
jburhenn 0:09ef59d2d0f7 667 }
jburhenn 0:09ef59d2d0f7 668 }
jburhenn 0:09ef59d2d0f7 669 else if(ping_response.expired())
jburhenn 0:09ef59d2d0f7 670 {
jburhenn 0:09ef59d2d0f7 671 isconnected = false;
jburhenn 0:09ef59d2d0f7 672 }
jburhenn 0:09ef59d2d0f7 673 }
jburhenn 0:09ef59d2d0f7 674
jburhenn 0:09ef59d2d0f7 675 exit:
jburhenn 0:09ef59d2d0f7 676 return rc;
jburhenn 0:09ef59d2d0f7 677 }
jburhenn 0:09ef59d2d0f7 678
jburhenn 0:09ef59d2d0f7 679
jburhenn 0:09ef59d2d0f7 680 // only used in single-threaded mode where one command at a time is in process
jburhenn 0:09ef59d2d0f7 681 template<class Network, class Timer, int a, int b>
jburhenn 0:09ef59d2d0f7 682 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
jburhenn 0:09ef59d2d0f7 683 {
jburhenn 0:09ef59d2d0f7 684 int rc = FAILURE;
jburhenn 0:09ef59d2d0f7 685
jburhenn 0:09ef59d2d0f7 686 // Use bool values to determine if a packet type has been received. This only works if waitfor is
jburhenn 0:09ef59d2d0f7 687 // called once at a time per type. However, it can be called with a different type at the same
jburhenn 0:09ef59d2d0f7 688 // time, for instance, while waiting for a subscription acknowledgement (SUBACK_MSG) we could
jburhenn 0:09ef59d2d0f7 689 // publish a QoS1 message and wait for the acknowledgement (PUBACK_MSG).
jburhenn 0:09ef59d2d0f7 690 switch(packet_type)
jburhenn 0:09ef59d2d0f7 691 {
jburhenn 0:09ef59d2d0f7 692 case CONNACK_MSG:
jburhenn 0:09ef59d2d0f7 693 connAckReceived = false;
jburhenn 0:09ef59d2d0f7 694 break;
jburhenn 0:09ef59d2d0f7 695 case SUBACK_MSG:
jburhenn 0:09ef59d2d0f7 696 subAckReceived = false;
jburhenn 0:09ef59d2d0f7 697 break;
jburhenn 0:09ef59d2d0f7 698 case UNSUBACK_MSG:
jburhenn 0:09ef59d2d0f7 699 unsubAckReceived = false;
jburhenn 0:09ef59d2d0f7 700 break;
jburhenn 0:09ef59d2d0f7 701 case PUBACK_MSG:
jburhenn 0:09ef59d2d0f7 702 pubAckReceived = false;
jburhenn 0:09ef59d2d0f7 703 break;
jburhenn 0:09ef59d2d0f7 704 case PUBCOMP_MSG:
jburhenn 0:09ef59d2d0f7 705 pubCompReceived = false;
jburhenn 0:09ef59d2d0f7 706 break;
jburhenn 0:09ef59d2d0f7 707 }
jburhenn 0:09ef59d2d0f7 708
jburhenn 0:09ef59d2d0f7 709 do
jburhenn 0:09ef59d2d0f7 710 {
jburhenn 0:09ef59d2d0f7 711 switch(packet_type)
jburhenn 0:09ef59d2d0f7 712 {
jburhenn 0:09ef59d2d0f7 713 case CONNACK_MSG:
jburhenn 0:09ef59d2d0f7 714 if(connAckReceived) {
jburhenn 0:09ef59d2d0f7 715 connAckReceived = false;
jburhenn 0:09ef59d2d0f7 716 return packet_type;
jburhenn 0:09ef59d2d0f7 717 }
jburhenn 0:09ef59d2d0f7 718 break;
jburhenn 0:09ef59d2d0f7 719 case SUBACK_MSG:
jburhenn 0:09ef59d2d0f7 720 if(subAckReceived) {
jburhenn 0:09ef59d2d0f7 721 subAckReceived = false;
jburhenn 0:09ef59d2d0f7 722 return packet_type;
jburhenn 0:09ef59d2d0f7 723 }
jburhenn 0:09ef59d2d0f7 724 break;
jburhenn 0:09ef59d2d0f7 725 case UNSUBACK_MSG:
jburhenn 0:09ef59d2d0f7 726 if(unsubAckReceived) {
jburhenn 0:09ef59d2d0f7 727 unsubAckReceived = false;
jburhenn 0:09ef59d2d0f7 728 return packet_type;
jburhenn 0:09ef59d2d0f7 729 }
jburhenn 0:09ef59d2d0f7 730 break;
jburhenn 0:09ef59d2d0f7 731 case PUBACK_MSG:
jburhenn 0:09ef59d2d0f7 732 if(pubAckReceived) {
jburhenn 0:09ef59d2d0f7 733 pubAckReceived = false;
jburhenn 0:09ef59d2d0f7 734 return packet_type;
jburhenn 0:09ef59d2d0f7 735 }
jburhenn 0:09ef59d2d0f7 736 break;
jburhenn 0:09ef59d2d0f7 737 case PUBCOMP_MSG:
jburhenn 0:09ef59d2d0f7 738 if(pubCompReceived) {
jburhenn 0:09ef59d2d0f7 739 pubCompReceived = false;
jburhenn 0:09ef59d2d0f7 740 return packet_type;
jburhenn 0:09ef59d2d0f7 741 }
jburhenn 0:09ef59d2d0f7 742 break;
jburhenn 0:09ef59d2d0f7 743 }
jburhenn 0:09ef59d2d0f7 744 if (timer.expired())
jburhenn 0:09ef59d2d0f7 745 break; // we timed out
jburhenn 0:09ef59d2d0f7 746 cycle(timer);
jburhenn 0:09ef59d2d0f7 747 }
jburhenn 0:09ef59d2d0f7 748 while (true);
jburhenn 0:09ef59d2d0f7 749
jburhenn 0:09ef59d2d0f7 750 return rc;
jburhenn 0:09ef59d2d0f7 751 }
jburhenn 0:09ef59d2d0f7 752
jburhenn 0:09ef59d2d0f7 753
jburhenn 0:09ef59d2d0f7 754 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jburhenn 0:09ef59d2d0f7 755 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
jburhenn 0:09ef59d2d0f7 756 {
jburhenn 0:09ef59d2d0f7 757 Timer connect_timer(command_timeout_ms);
jburhenn 0:09ef59d2d0f7 758 int rc = FAILURE;
jburhenn 0:09ef59d2d0f7 759 int len = 0;
jburhenn 0:09ef59d2d0f7 760
jburhenn 0:09ef59d2d0f7 761 if (isconnected) // don't send connect packet again if we are already connected
jburhenn 0:09ef59d2d0f7 762 goto exit;
jburhenn 0:09ef59d2d0f7 763
jburhenn 0:09ef59d2d0f7 764 this->keepAliveInterval = options.keepAliveInterval;
jburhenn 22:0dbabcc6e7b2 765 this->cleansession = (options.cleansession != 0);
jburhenn 0:09ef59d2d0f7 766 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
jburhenn 0:09ef59d2d0f7 767 goto exit;
jburhenn 0:09ef59d2d0f7 768 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
jburhenn 0:09ef59d2d0f7 769 goto exit; // there was a problem
jburhenn 0:09ef59d2d0f7 770
jburhenn 0:09ef59d2d0f7 771 if (this->keepAliveInterval > 0)
jburhenn 0:09ef59d2d0f7 772 last_received.countdown(this->keepAliveInterval);
jburhenn 0:09ef59d2d0f7 773 // this will be a blocking call, wait for the connack
jburhenn 0:09ef59d2d0f7 774 if (waitfor(CONNACK_MSG, connect_timer) == CONNACK_MSG)
jburhenn 0:09ef59d2d0f7 775 {
jburhenn 0:09ef59d2d0f7 776 unsigned char connack_rc = 255;
jburhenn 0:09ef59d2d0f7 777 bool sessionPresent = false;
jburhenn 0:09ef59d2d0f7 778 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
jburhenn 0:09ef59d2d0f7 779 rc = connack_rc;
jburhenn 0:09ef59d2d0f7 780 else
jburhenn 0:09ef59d2d0f7 781 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 782 }
jburhenn 0:09ef59d2d0f7 783 else
jburhenn 0:09ef59d2d0f7 784 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 785
jburhenn 0:09ef59d2d0f7 786 #if MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 787 // resend any inflight publish
jburhenn 0:09ef59d2d0f7 788 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
jburhenn 0:09ef59d2d0f7 789 {
jburhenn 0:09ef59d2d0f7 790 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL_MSG, 0, inflightMsgid)) <= 0)
jburhenn 0:09ef59d2d0f7 791 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 792 else
jburhenn 0:09ef59d2d0f7 793 rc = publish(len, connect_timer, inflightQoS);
jburhenn 0:09ef59d2d0f7 794 }
jburhenn 0:09ef59d2d0f7 795 else
jburhenn 0:09ef59d2d0f7 796 #endif
jburhenn 0:09ef59d2d0f7 797 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 798 if (inflightMsgid > 0)
jburhenn 0:09ef59d2d0f7 799 {
jburhenn 0:09ef59d2d0f7 800 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
jburhenn 0:09ef59d2d0f7 801 rc = publish(inflightLen, connect_timer, inflightQoS);
jburhenn 0:09ef59d2d0f7 802 }
jburhenn 0:09ef59d2d0f7 803 #endif
jburhenn 0:09ef59d2d0f7 804
jburhenn 0:09ef59d2d0f7 805 exit:
jburhenn 0:09ef59d2d0f7 806 if (rc == SUCCESS)
jburhenn 0:09ef59d2d0f7 807 isconnected = true;
jburhenn 0:09ef59d2d0f7 808 return rc;
jburhenn 0:09ef59d2d0f7 809 }
jburhenn 0:09ef59d2d0f7 810
jburhenn 0:09ef59d2d0f7 811
jburhenn 0:09ef59d2d0f7 812 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jburhenn 0:09ef59d2d0f7 813 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
jburhenn 0:09ef59d2d0f7 814 {
jburhenn 0:09ef59d2d0f7 815 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
jburhenn 0:09ef59d2d0f7 816 return connect(default_options);
jburhenn 0:09ef59d2d0f7 817 }
jburhenn 0:09ef59d2d0f7 818
jburhenn 0:09ef59d2d0f7 819
jburhenn 0:09ef59d2d0f7 820 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
jburhenn 0:09ef59d2d0f7 821 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
jburhenn 0:09ef59d2d0f7 822 {
jburhenn 0:09ef59d2d0f7 823 int rc = FAILURE;
jburhenn 0:09ef59d2d0f7 824 Timer timer(command_timeout_ms);
jburhenn 0:09ef59d2d0f7 825 int len = 0;
jburhenn 0:09ef59d2d0f7 826 MQTTString topic = {(char*)topicFilter, {0, 0}};
jburhenn 0:09ef59d2d0f7 827
jburhenn 0:09ef59d2d0f7 828 if (!isconnected)
jburhenn 0:09ef59d2d0f7 829 goto exit;
jburhenn 0:09ef59d2d0f7 830
jburhenn 0:09ef59d2d0f7 831 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
jburhenn 0:09ef59d2d0f7 832 if (len <= 0)
jburhenn 0:09ef59d2d0f7 833 goto exit;
jburhenn 0:09ef59d2d0f7 834 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
jburhenn 0:09ef59d2d0f7 835 goto exit; // there was a problem
jburhenn 0:09ef59d2d0f7 836
jburhenn 0:09ef59d2d0f7 837 if (waitfor(SUBACK_MSG, timer) == SUBACK_MSG) // wait for suback
jburhenn 0:09ef59d2d0f7 838 {
jburhenn 0:09ef59d2d0f7 839 int count = 0, grantedQoS = -1;
jburhenn 0:09ef59d2d0f7 840 unsigned short mypacketid;
jburhenn 0:09ef59d2d0f7 841 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
jburhenn 0:09ef59d2d0f7 842 rc = grantedQoS; // 0, 1, 2 or 0x80
jburhenn 0:09ef59d2d0f7 843 if (rc != 0x80)
jburhenn 0:09ef59d2d0f7 844 {
jburhenn 0:09ef59d2d0f7 845 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jburhenn 0:09ef59d2d0f7 846 {
jburhenn 0:09ef59d2d0f7 847 if (messageHandlers[i].topicFilter == 0)
jburhenn 0:09ef59d2d0f7 848 {
jburhenn 0:09ef59d2d0f7 849 messageHandlers[i].topicFilter = topicFilter;
jburhenn 0:09ef59d2d0f7 850 messageHandlers[i].fp.attach(messageHandler);
jburhenn 0:09ef59d2d0f7 851 rc = 0;
jburhenn 0:09ef59d2d0f7 852 break;
jburhenn 0:09ef59d2d0f7 853 }
jburhenn 0:09ef59d2d0f7 854 }
jburhenn 0:09ef59d2d0f7 855 }
jburhenn 0:09ef59d2d0f7 856 }
jburhenn 0:09ef59d2d0f7 857 else
jburhenn 0:09ef59d2d0f7 858 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 859
jburhenn 0:09ef59d2d0f7 860 exit:
jburhenn 0:09ef59d2d0f7 861 if (rc != SUCCESS)
jburhenn 0:09ef59d2d0f7 862 cleanSession();
jburhenn 0:09ef59d2d0f7 863 return rc;
jburhenn 0:09ef59d2d0f7 864 }
jburhenn 0:09ef59d2d0f7 865
jburhenn 0:09ef59d2d0f7 866
jburhenn 0:09ef59d2d0f7 867 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
jburhenn 0:09ef59d2d0f7 868 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
jburhenn 0:09ef59d2d0f7 869 {
jburhenn 0:09ef59d2d0f7 870 int rc = FAILURE;
jburhenn 0:09ef59d2d0f7 871 Timer timer(command_timeout_ms);
jburhenn 0:09ef59d2d0f7 872 MQTTString topic = {(char*)topicFilter, {0, 0}};
jburhenn 0:09ef59d2d0f7 873 int len = 0;
jburhenn 0:09ef59d2d0f7 874
jburhenn 0:09ef59d2d0f7 875 if (!isconnected)
jburhenn 0:09ef59d2d0f7 876 goto exit;
jburhenn 0:09ef59d2d0f7 877
jburhenn 0:09ef59d2d0f7 878 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
jburhenn 0:09ef59d2d0f7 879 goto exit;
jburhenn 0:09ef59d2d0f7 880 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
jburhenn 0:09ef59d2d0f7 881 goto exit; // there was a problem
jburhenn 0:09ef59d2d0f7 882
jburhenn 0:09ef59d2d0f7 883 if (waitfor(UNSUBACK_MSG, timer) == UNSUBACK_MSG)
jburhenn 0:09ef59d2d0f7 884 {
jburhenn 0:09ef59d2d0f7 885 unsigned short mypacketid; // should be the same as the packetid above
jburhenn 0:09ef59d2d0f7 886 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
jburhenn 0:09ef59d2d0f7 887 {
jburhenn 0:09ef59d2d0f7 888 rc = 0;
jburhenn 0:09ef59d2d0f7 889
jburhenn 0:09ef59d2d0f7 890 // remove the subscription message handler associated with this topic, if there is one
jburhenn 0:09ef59d2d0f7 891 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jburhenn 0:09ef59d2d0f7 892 {
jburhenn 0:09ef59d2d0f7 893 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
jburhenn 0:09ef59d2d0f7 894 {
jburhenn 0:09ef59d2d0f7 895 messageHandlers[i].topicFilter = 0;
jburhenn 0:09ef59d2d0f7 896 break;
jburhenn 0:09ef59d2d0f7 897 }
jburhenn 0:09ef59d2d0f7 898 }
jburhenn 0:09ef59d2d0f7 899 }
jburhenn 0:09ef59d2d0f7 900 }
jburhenn 0:09ef59d2d0f7 901 else
jburhenn 0:09ef59d2d0f7 902 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 903
jburhenn 0:09ef59d2d0f7 904 exit:
jburhenn 0:09ef59d2d0f7 905 if (rc != SUCCESS)
jburhenn 0:09ef59d2d0f7 906 cleanSession();
jburhenn 0:09ef59d2d0f7 907 return rc;
jburhenn 0:09ef59d2d0f7 908 }
jburhenn 0:09ef59d2d0f7 909
jburhenn 0:09ef59d2d0f7 910
jburhenn 0:09ef59d2d0f7 911 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jburhenn 0:09ef59d2d0f7 912 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
jburhenn 0:09ef59d2d0f7 913 {
jburhenn 0:09ef59d2d0f7 914 int rc;
jburhenn 0:09ef59d2d0f7 915
jburhenn 0:09ef59d2d0f7 916 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
jburhenn 0:09ef59d2d0f7 917 goto exit; // there was a problem
jburhenn 0:09ef59d2d0f7 918
jburhenn 0:09ef59d2d0f7 919 #if MQTTCLIENT_QOS1
jburhenn 0:09ef59d2d0f7 920 if (qos == QOS1)
jburhenn 0:09ef59d2d0f7 921 {
jburhenn 0:09ef59d2d0f7 922 if (waitfor(PUBACK_MSG, timer) == PUBACK_MSG)
jburhenn 0:09ef59d2d0f7 923 {
jburhenn 0:09ef59d2d0f7 924 unsigned short mypacketid;
jburhenn 0:09ef59d2d0f7 925 unsigned char dup, type;
jburhenn 0:09ef59d2d0f7 926 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jburhenn 0:09ef59d2d0f7 927 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 928 else if (inflightMsgid == mypacketid)
jburhenn 0:09ef59d2d0f7 929 inflightMsgid = 0;
jburhenn 0:09ef59d2d0f7 930 }
jburhenn 0:09ef59d2d0f7 931 else
jburhenn 0:09ef59d2d0f7 932 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 933 }
jburhenn 0:09ef59d2d0f7 934 #endif
jburhenn 0:09ef59d2d0f7 935 #if MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 936 if (qos == QOS2)
jburhenn 0:09ef59d2d0f7 937 {
jburhenn 0:09ef59d2d0f7 938 if (waitfor(PUBCOMP_MSG, timer) == PUBCOMP_MSG)
jburhenn 0:09ef59d2d0f7 939 {
jburhenn 0:09ef59d2d0f7 940 unsigned short mypacketid;
jburhenn 0:09ef59d2d0f7 941 unsigned char dup, type;
jburhenn 0:09ef59d2d0f7 942 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jburhenn 0:09ef59d2d0f7 943 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 944 else if (inflightMsgid == mypacketid)
jburhenn 0:09ef59d2d0f7 945 inflightMsgid = 0;
jburhenn 0:09ef59d2d0f7 946 }
jburhenn 0:09ef59d2d0f7 947 else
jburhenn 0:09ef59d2d0f7 948 rc = FAILURE;
jburhenn 0:09ef59d2d0f7 949 }
jburhenn 0:09ef59d2d0f7 950 #endif
jburhenn 0:09ef59d2d0f7 951
jburhenn 0:09ef59d2d0f7 952 exit:
jburhenn 0:09ef59d2d0f7 953 if (rc != SUCCESS)
jburhenn 0:09ef59d2d0f7 954 cleanSession();
jburhenn 0:09ef59d2d0f7 955 return rc;
jburhenn 0:09ef59d2d0f7 956 }
jburhenn 0:09ef59d2d0f7 957
jburhenn 0:09ef59d2d0f7 958
jburhenn 0:09ef59d2d0f7 959
jburhenn 0:09ef59d2d0f7 960 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jburhenn 0:09ef59d2d0f7 961 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
jburhenn 0:09ef59d2d0f7 962 {
jburhenn 0:09ef59d2d0f7 963 int rc = FAILURE;
jburhenn 0:09ef59d2d0f7 964 Timer timer(command_timeout_ms);
jburhenn 0:09ef59d2d0f7 965 MQTTString topicString = MQTTString_initializer;
jburhenn 0:09ef59d2d0f7 966 int len = 0;
jburhenn 0:09ef59d2d0f7 967
jburhenn 0:09ef59d2d0f7 968 if (!isconnected)
jburhenn 0:09ef59d2d0f7 969 goto exit;
jburhenn 0:09ef59d2d0f7 970
jburhenn 0:09ef59d2d0f7 971 topicString.cstring = (char*)topicName;
jburhenn 0:09ef59d2d0f7 972
jburhenn 0:09ef59d2d0f7 973 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 974 if (qos == QOS1 || qos == QOS2)
jburhenn 0:09ef59d2d0f7 975 id = packetid.getNext();
jburhenn 0:09ef59d2d0f7 976 #endif
jburhenn 0:09ef59d2d0f7 977
jburhenn 0:09ef59d2d0f7 978 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
jburhenn 0:09ef59d2d0f7 979 topicString, (unsigned char*)payload, payloadlen);
jburhenn 0:09ef59d2d0f7 980 if (len <= 0)
jburhenn 0:09ef59d2d0f7 981 goto exit;
jburhenn 0:09ef59d2d0f7 982
jburhenn 0:09ef59d2d0f7 983 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 984 if (!cleansession)
jburhenn 0:09ef59d2d0f7 985 {
jburhenn 0:09ef59d2d0f7 986 memcpy(pubbuf, sendbuf, len);
jburhenn 0:09ef59d2d0f7 987 inflightMsgid = id;
jburhenn 0:09ef59d2d0f7 988 inflightLen = len;
jburhenn 0:09ef59d2d0f7 989 inflightQoS = qos;
jburhenn 0:09ef59d2d0f7 990 #if MQTTCLIENT_QOS2
jburhenn 0:09ef59d2d0f7 991 pubrel = false;
jburhenn 0:09ef59d2d0f7 992 #endif
jburhenn 0:09ef59d2d0f7 993 }
jburhenn 0:09ef59d2d0f7 994 #endif
jburhenn 0:09ef59d2d0f7 995
jburhenn 0:09ef59d2d0f7 996 rc = publish(len, timer, qos);
jburhenn 0:09ef59d2d0f7 997 exit:
jburhenn 0:09ef59d2d0f7 998 return rc;
jburhenn 0:09ef59d2d0f7 999 }
jburhenn 0:09ef59d2d0f7 1000
jburhenn 0:09ef59d2d0f7 1001
jburhenn 0:09ef59d2d0f7 1002 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jburhenn 0:09ef59d2d0f7 1003 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
jburhenn 0:09ef59d2d0f7 1004 {
jburhenn 0:09ef59d2d0f7 1005 unsigned short id = 0; // dummy - not used for anything
jburhenn 0:09ef59d2d0f7 1006 return publish(topicName, payload, payloadlen, id, qos, retained);
jburhenn 0:09ef59d2d0f7 1007 }
jburhenn 0:09ef59d2d0f7 1008
jburhenn 0:09ef59d2d0f7 1009
jburhenn 0:09ef59d2d0f7 1010 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jburhenn 0:09ef59d2d0f7 1011 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
jburhenn 0:09ef59d2d0f7 1012 {
jburhenn 0:09ef59d2d0f7 1013 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
jburhenn 0:09ef59d2d0f7 1014 }
jburhenn 0:09ef59d2d0f7 1015
jburhenn 0:09ef59d2d0f7 1016
jburhenn 0:09ef59d2d0f7 1017 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
jburhenn 0:09ef59d2d0f7 1018 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
jburhenn 0:09ef59d2d0f7 1019 {
jburhenn 0:09ef59d2d0f7 1020 int rc = FAILURE;
jburhenn 0:09ef59d2d0f7 1021 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
jburhenn 0:09ef59d2d0f7 1022 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
jburhenn 0:09ef59d2d0f7 1023 if (len > 0)
jburhenn 0:09ef59d2d0f7 1024 rc = sendPacket(len, timer); // send the disconnect packet
jburhenn 0:09ef59d2d0f7 1025
jburhenn 0:09ef59d2d0f7 1026 if (cleansession)
jburhenn 0:09ef59d2d0f7 1027 cleanSession();
jburhenn 0:09ef59d2d0f7 1028 else
jburhenn 0:09ef59d2d0f7 1029 isconnected = false;
jburhenn 0:09ef59d2d0f7 1030 return rc;
jburhenn 0:09ef59d2d0f7 1031 }
jburhenn 0:09ef59d2d0f7 1032
jburhenn 0:09ef59d2d0f7 1033
jburhenn 0:09ef59d2d0f7 1034 #endif