MQTT client to test the ENC28J60-EMAC on NUCLEO-F446RE.

Dependencies:   ENC28J60-EMAC

Committer:
hudakz
Date:
Mon Mar 29 09:32:44 2021 +0000
Revision:
5:d9570dbf2f82
Parent:
0:238f0d0c0ba3
MQTT client to test the ENC28J60-EMAC on NUCLEO-F446RE.

Who changed what in which revision?

UserRevisionLine numberNew contents of line
hudakz 0:238f0d0c0ba3 1 /*******************************************************************************
hudakz 0:238f0d0c0ba3 2 * Copyright (c) 2014, 2017 IBM Corp.
hudakz 0:238f0d0c0ba3 3 *
hudakz 0:238f0d0c0ba3 4 * All rights reserved. This program and the accompanying materials
hudakz 0:238f0d0c0ba3 5 * are made available under the terms of the Eclipse Public License v1.0
hudakz 0:238f0d0c0ba3 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
hudakz 0:238f0d0c0ba3 7 *
hudakz 0:238f0d0c0ba3 8 * The Eclipse Public License is available at
hudakz 0:238f0d0c0ba3 9 * http://www.eclipse.org/legal/epl-v10.html
hudakz 0:238f0d0c0ba3 10 * and the Eclipse Distribution License is available at
hudakz 0:238f0d0c0ba3 11 * http://www.eclipse.org/org/documents/edl-v10.php.
hudakz 0:238f0d0c0ba3 12 *
hudakz 0:238f0d0c0ba3 13 * Contributors:
hudakz 0:238f0d0c0ba3 14 * Ian Craggs - initial API and implementation and/or initial documentation
hudakz 0:238f0d0c0ba3 15 * Ian Craggs - fix for bug 458512 - QoS 2 messages
hudakz 0:238f0d0c0ba3 16 * Ian Craggs - fix for bug 460389 - send loop uses wrong length
hudakz 0:238f0d0c0ba3 17 * Ian Craggs - fix for bug 464169 - clearing subscriptions
hudakz 0:238f0d0c0ba3 18 * Ian Craggs - fix for bug 464551 - enums and ints can be different size
hudakz 0:238f0d0c0ba3 19 * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer
hudakz 0:238f0d0c0ba3 20 * Ian Craggs - fix for bug 475749 - packetid modified twice
hudakz 0:238f0d0c0ba3 21 * Ian Craggs - add ability to set message handler separately #6
hudakz 0:238f0d0c0ba3 22 *******************************************************************************/
hudakz 0:238f0d0c0ba3 23
hudakz 0:238f0d0c0ba3 24 #if !defined(MQTTCLIENT_H)
hudakz 0:238f0d0c0ba3 25 #define MQTTCLIENT_H
hudakz 0:238f0d0c0ba3 26
hudakz 0:238f0d0c0ba3 27 #include "FP.h"
hudakz 0:238f0d0c0ba3 28 #include "MQTTPacket.h"
hudakz 0:238f0d0c0ba3 29 #include <stdio.h>
hudakz 0:238f0d0c0ba3 30 #include "MQTTLogging.h"
hudakz 0:238f0d0c0ba3 31
hudakz 0:238f0d0c0ba3 32 #if !defined(MQTTCLIENT_QOS1)
hudakz 0:238f0d0c0ba3 33 #define MQTTCLIENT_QOS1 1
hudakz 0:238f0d0c0ba3 34 #endif
hudakz 0:238f0d0c0ba3 35 #if !defined(MQTTCLIENT_QOS2)
hudakz 0:238f0d0c0ba3 36 #define MQTTCLIENT_QOS2 0
hudakz 0:238f0d0c0ba3 37 #endif
hudakz 0:238f0d0c0ba3 38
hudakz 0:238f0d0c0ba3 39 namespace MQTT
hudakz 0:238f0d0c0ba3 40 {
hudakz 0:238f0d0c0ba3 41
hudakz 0:238f0d0c0ba3 42
hudakz 0:238f0d0c0ba3 43 enum QoS { QOS0, QOS1, QOS2 };
hudakz 0:238f0d0c0ba3 44
hudakz 0:238f0d0c0ba3 45 // all failure return codes must be negative
hudakz 0:238f0d0c0ba3 46 enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
hudakz 0:238f0d0c0ba3 47
hudakz 0:238f0d0c0ba3 48
hudakz 0:238f0d0c0ba3 49 struct Message
hudakz 0:238f0d0c0ba3 50 {
hudakz 0:238f0d0c0ba3 51 enum QoS qos;
hudakz 0:238f0d0c0ba3 52 bool retained;
hudakz 0:238f0d0c0ba3 53 bool dup;
hudakz 0:238f0d0c0ba3 54 unsigned short id;
hudakz 0:238f0d0c0ba3 55 void *payload;
hudakz 0:238f0d0c0ba3 56 size_t payloadlen;
hudakz 0:238f0d0c0ba3 57 };
hudakz 0:238f0d0c0ba3 58
hudakz 0:238f0d0c0ba3 59
hudakz 0:238f0d0c0ba3 60 struct MessageData
hudakz 0:238f0d0c0ba3 61 {
hudakz 0:238f0d0c0ba3 62 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
hudakz 0:238f0d0c0ba3 63 { }
hudakz 0:238f0d0c0ba3 64
hudakz 0:238f0d0c0ba3 65 struct Message &message;
hudakz 0:238f0d0c0ba3 66 MQTTString &topicName;
hudakz 0:238f0d0c0ba3 67 };
hudakz 0:238f0d0c0ba3 68
hudakz 0:238f0d0c0ba3 69
hudakz 0:238f0d0c0ba3 70 struct connackData
hudakz 0:238f0d0c0ba3 71 {
hudakz 0:238f0d0c0ba3 72 int rc;
hudakz 0:238f0d0c0ba3 73 bool sessionPresent;
hudakz 0:238f0d0c0ba3 74 };
hudakz 0:238f0d0c0ba3 75
hudakz 0:238f0d0c0ba3 76
hudakz 0:238f0d0c0ba3 77 struct subackData
hudakz 0:238f0d0c0ba3 78 {
hudakz 0:238f0d0c0ba3 79 int grantedQoS;
hudakz 0:238f0d0c0ba3 80 };
hudakz 0:238f0d0c0ba3 81
hudakz 0:238f0d0c0ba3 82
hudakz 0:238f0d0c0ba3 83 class PacketId
hudakz 0:238f0d0c0ba3 84 {
hudakz 0:238f0d0c0ba3 85 public:
hudakz 0:238f0d0c0ba3 86 PacketId()
hudakz 0:238f0d0c0ba3 87 {
hudakz 0:238f0d0c0ba3 88 next = 0;
hudakz 0:238f0d0c0ba3 89 }
hudakz 0:238f0d0c0ba3 90
hudakz 0:238f0d0c0ba3 91 int getNext()
hudakz 0:238f0d0c0ba3 92 {
hudakz 0:238f0d0c0ba3 93 return next = (next == MAX_PACKET_ID) ? 1 : next + 1;
hudakz 0:238f0d0c0ba3 94 }
hudakz 0:238f0d0c0ba3 95
hudakz 0:238f0d0c0ba3 96 private:
hudakz 0:238f0d0c0ba3 97 static const int MAX_PACKET_ID = 65535;
hudakz 0:238f0d0c0ba3 98 int next;
hudakz 0:238f0d0c0ba3 99 };
hudakz 0:238f0d0c0ba3 100
hudakz 0:238f0d0c0ba3 101
hudakz 0:238f0d0c0ba3 102 /**
hudakz 0:238f0d0c0ba3 103 * @class Client
hudakz 0:238f0d0c0ba3 104 * @brief blocking, non-threaded MQTT client API
hudakz 0:238f0d0c0ba3 105 *
hudakz 0:238f0d0c0ba3 106 * This version of the API blocks on all method calls, until they are complete. This means that only one
hudakz 0:238f0d0c0ba3 107 * MQTT request can be in process at any one time.
hudakz 0:238f0d0c0ba3 108 * @param Network a network class which supports send, receive
hudakz 0:238f0d0c0ba3 109 * @param Timer a timer class with the methods:
hudakz 0:238f0d0c0ba3 110 */
hudakz 0:238f0d0c0ba3 111 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>
hudakz 0:238f0d0c0ba3 112 class Client
hudakz 0:238f0d0c0ba3 113 {
hudakz 0:238f0d0c0ba3 114
hudakz 0:238f0d0c0ba3 115 public:
hudakz 0:238f0d0c0ba3 116
hudakz 0:238f0d0c0ba3 117 typedef void (*messageHandler)(MessageData&);
hudakz 0:238f0d0c0ba3 118
hudakz 0:238f0d0c0ba3 119 /** Construct the client
hudakz 0:238f0d0c0ba3 120 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
hudakz 0:238f0d0c0ba3 121 * before calling MQTT connect
hudakz 0:238f0d0c0ba3 122 * @param limits an instance of the Limit class - to alter limits as required
hudakz 0:238f0d0c0ba3 123 */
hudakz 0:238f0d0c0ba3 124 Client(Network& network, unsigned int command_timeout_ms = 30000);
hudakz 0:238f0d0c0ba3 125
hudakz 0:238f0d0c0ba3 126 /** Set the default message handling callback - used for any message which does not match a subscription message handler
hudakz 0:238f0d0c0ba3 127 * @param mh - pointer to the callback function. Set to 0 to remove.
hudakz 0:238f0d0c0ba3 128 */
hudakz 0:238f0d0c0ba3 129 void setDefaultMessageHandler(messageHandler mh)
hudakz 0:238f0d0c0ba3 130 {
hudakz 0:238f0d0c0ba3 131 if (mh != 0)
hudakz 0:238f0d0c0ba3 132 defaultMessageHandler.attach(mh);
hudakz 0:238f0d0c0ba3 133 else
hudakz 0:238f0d0c0ba3 134 defaultMessageHandler.detach();
hudakz 0:238f0d0c0ba3 135 }
hudakz 0:238f0d0c0ba3 136
hudakz 0:238f0d0c0ba3 137 /** Set a message handling callback. This can be used outside of the the subscribe method.
hudakz 0:238f0d0c0ba3 138 * @param topicFilter - a topic pattern which can include wildcards
hudakz 0:238f0d0c0ba3 139 * @param mh - pointer to the callback function. If 0, removes the callback if any
hudakz 0:238f0d0c0ba3 140 */
hudakz 0:238f0d0c0ba3 141 int setMessageHandler(const char* topicFilter, messageHandler mh);
hudakz 0:238f0d0c0ba3 142
hudakz 0:238f0d0c0ba3 143 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
hudakz 0:238f0d0c0ba3 144 * The nework object must be connected to the network endpoint before calling this
hudakz 0:238f0d0c0ba3 145 * Default connect options are used
hudakz 0:238f0d0c0ba3 146 * @return success code -
hudakz 0:238f0d0c0ba3 147 */
hudakz 0:238f0d0c0ba3 148 int connect();
hudakz 0:238f0d0c0ba3 149
hudakz 0:238f0d0c0ba3 150 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
hudakz 0:238f0d0c0ba3 151 * The nework object must be connected to the network endpoint before calling this
hudakz 0:238f0d0c0ba3 152 * @param options - connect options
hudakz 0:238f0d0c0ba3 153 * @return success code -
hudakz 0:238f0d0c0ba3 154 */
hudakz 0:238f0d0c0ba3 155 int connect(MQTTPacket_connectData& options);
hudakz 0:238f0d0c0ba3 156
hudakz 0:238f0d0c0ba3 157 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
hudakz 0:238f0d0c0ba3 158 * The nework object must be connected to the network endpoint before calling this
hudakz 0:238f0d0c0ba3 159 * @param options - connect options
hudakz 0:238f0d0c0ba3 160 * @param connackData - connack data to be returned
hudakz 0:238f0d0c0ba3 161 * @return success code -
hudakz 0:238f0d0c0ba3 162 */
hudakz 0:238f0d0c0ba3 163 int connect(MQTTPacket_connectData& options, connackData& data);
hudakz 0:238f0d0c0ba3 164
hudakz 0:238f0d0c0ba3 165 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
hudakz 0:238f0d0c0ba3 166 * @param topic - the topic to publish to
hudakz 0:238f0d0c0ba3 167 * @param message - the message to send
hudakz 0:238f0d0c0ba3 168 * @return success code -
hudakz 0:238f0d0c0ba3 169 */
hudakz 0:238f0d0c0ba3 170 int publish(const char* topicName, Message& message);
hudakz 0:238f0d0c0ba3 171
hudakz 0:238f0d0c0ba3 172 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
hudakz 0:238f0d0c0ba3 173 * @param topic - the topic to publish to
hudakz 0:238f0d0c0ba3 174 * @param payload - the data to send
hudakz 0:238f0d0c0ba3 175 * @param payloadlen - the length of the data
hudakz 0:238f0d0c0ba3 176 * @param qos - the QoS to send the publish at
hudakz 0:238f0d0c0ba3 177 * @param retained - whether the message should be retained
hudakz 0:238f0d0c0ba3 178 * @return success code -
hudakz 0:238f0d0c0ba3 179 */
hudakz 0:238f0d0c0ba3 180 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
hudakz 0:238f0d0c0ba3 181
hudakz 0:238f0d0c0ba3 182 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
hudakz 0:238f0d0c0ba3 183 * @param topic - the topic to publish to
hudakz 0:238f0d0c0ba3 184 * @param payload - the data to send
hudakz 0:238f0d0c0ba3 185 * @param payloadlen - the length of the data
hudakz 0:238f0d0c0ba3 186 * @param id - the packet id used - returned
hudakz 0:238f0d0c0ba3 187 * @param qos - the QoS to send the publish at
hudakz 0:238f0d0c0ba3 188 * @param retained - whether the message should be retained
hudakz 0:238f0d0c0ba3 189 * @return success code -
hudakz 0:238f0d0c0ba3 190 */
hudakz 0:238f0d0c0ba3 191 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
hudakz 0:238f0d0c0ba3 192
hudakz 0:238f0d0c0ba3 193 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
hudakz 0:238f0d0c0ba3 194 * @param topicFilter - a topic pattern which can include wildcards
hudakz 0:238f0d0c0ba3 195 * @param qos - the MQTT QoS to subscribe at
hudakz 0:238f0d0c0ba3 196 * @param mh - the callback function to be invoked when a message is received for this subscription
hudakz 0:238f0d0c0ba3 197 * @return success code -
hudakz 0:238f0d0c0ba3 198 */
hudakz 0:238f0d0c0ba3 199 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
hudakz 0:238f0d0c0ba3 200
hudakz 0:238f0d0c0ba3 201 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
hudakz 0:238f0d0c0ba3 202 * @param topicFilter - a topic pattern which can include wildcards
hudakz 0:238f0d0c0ba3 203 * @param qos - the MQTT QoS to subscribe at©
hudakz 0:238f0d0c0ba3 204 * @param mh - the callback function to be invoked when a message is received for this subscription
hudakz 0:238f0d0c0ba3 205 * @param
hudakz 0:238f0d0c0ba3 206 * @return success code -
hudakz 0:238f0d0c0ba3 207 */
hudakz 0:238f0d0c0ba3 208 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data);
hudakz 0:238f0d0c0ba3 209
hudakz 0:238f0d0c0ba3 210 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
hudakz 0:238f0d0c0ba3 211 * @param topicFilter - a topic pattern which can include wildcards
hudakz 0:238f0d0c0ba3 212 * @return success code -
hudakz 0:238f0d0c0ba3 213 */
hudakz 0:238f0d0c0ba3 214 int unsubscribe(const char* topicFilter);
hudakz 0:238f0d0c0ba3 215
hudakz 0:238f0d0c0ba3 216 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
hudakz 0:238f0d0c0ba3 217 * @return success code -
hudakz 0:238f0d0c0ba3 218 */
hudakz 0:238f0d0c0ba3 219 int disconnect();
hudakz 0:238f0d0c0ba3 220
hudakz 0:238f0d0c0ba3 221 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
hudakz 0:238f0d0c0ba3 222 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
hudakz 0:238f0d0c0ba3 223 * received.
hudakz 0:238f0d0c0ba3 224 * @param timeout_ms the time to wait, in milliseconds
hudakz 0:238f0d0c0ba3 225 * @return success code - on failure, this means the client has disconnected
hudakz 0:238f0d0c0ba3 226 */
hudakz 0:238f0d0c0ba3 227 int yield(unsigned long timeout_ms = 1000L);
hudakz 0:238f0d0c0ba3 228
hudakz 0:238f0d0c0ba3 229 /** Is the client connected?
hudakz 0:238f0d0c0ba3 230 * @return flag - is the client connected or not?
hudakz 0:238f0d0c0ba3 231 */
hudakz 0:238f0d0c0ba3 232 bool isConnected()
hudakz 0:238f0d0c0ba3 233 {
hudakz 0:238f0d0c0ba3 234 return isconnected;
hudakz 0:238f0d0c0ba3 235 }
hudakz 0:238f0d0c0ba3 236
hudakz 0:238f0d0c0ba3 237 private:
hudakz 0:238f0d0c0ba3 238
hudakz 0:238f0d0c0ba3 239 void closeSession();
hudakz 0:238f0d0c0ba3 240 void cleanSession();
hudakz 0:238f0d0c0ba3 241 int cycle(Timer& timer);
hudakz 0:238f0d0c0ba3 242 int waitfor(int packet_type, Timer& timer);
hudakz 0:238f0d0c0ba3 243 int keepalive();
hudakz 0:238f0d0c0ba3 244 int publish(int len, Timer& timer, enum QoS qos);
hudakz 0:238f0d0c0ba3 245
hudakz 0:238f0d0c0ba3 246 int decodePacket(int* value, int timeout);
hudakz 0:238f0d0c0ba3 247 int readPacket(Timer& timer);
hudakz 0:238f0d0c0ba3 248 int sendPacket(int length, Timer& timer);
hudakz 0:238f0d0c0ba3 249 int deliverMessage(MQTTString& topicName, Message& message);
hudakz 0:238f0d0c0ba3 250 bool isTopicMatched(char* topicFilter, MQTTString& topicName);
hudakz 0:238f0d0c0ba3 251
hudakz 0:238f0d0c0ba3 252 Network& ipstack;
hudakz 0:238f0d0c0ba3 253 unsigned long command_timeout_ms;
hudakz 0:238f0d0c0ba3 254
hudakz 0:238f0d0c0ba3 255 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
hudakz 0:238f0d0c0ba3 256 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
hudakz 0:238f0d0c0ba3 257
hudakz 0:238f0d0c0ba3 258 Timer last_sent, last_received;
hudakz 0:238f0d0c0ba3 259 unsigned int keepAliveInterval;
hudakz 0:238f0d0c0ba3 260 bool ping_outstanding;
hudakz 0:238f0d0c0ba3 261 bool cleansession;
hudakz 0:238f0d0c0ba3 262
hudakz 0:238f0d0c0ba3 263 PacketId packetid;
hudakz 0:238f0d0c0ba3 264
hudakz 0:238f0d0c0ba3 265 struct MessageHandlers
hudakz 0:238f0d0c0ba3 266 {
hudakz 0:238f0d0c0ba3 267 const char* topicFilter;
hudakz 0:238f0d0c0ba3 268 FP<void, MessageData&> fp;
hudakz 0:238f0d0c0ba3 269 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
hudakz 0:238f0d0c0ba3 270
hudakz 0:238f0d0c0ba3 271 FP<void, MessageData&> defaultMessageHandler;
hudakz 0:238f0d0c0ba3 272
hudakz 0:238f0d0c0ba3 273 bool isconnected;
hudakz 0:238f0d0c0ba3 274
hudakz 0:238f0d0c0ba3 275 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 276 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect
hudakz 0:238f0d0c0ba3 277 int inflightLen;
hudakz 0:238f0d0c0ba3 278 unsigned short inflightMsgid;
hudakz 0:238f0d0c0ba3 279 enum QoS inflightQoS;
hudakz 0:238f0d0c0ba3 280 #endif
hudakz 0:238f0d0c0ba3 281
hudakz 0:238f0d0c0ba3 282 #if MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 283 bool pubrel;
hudakz 0:238f0d0c0ba3 284 #if !defined(MAX_INCOMING_QOS2_MESSAGES)
hudakz 0:238f0d0c0ba3 285 #define MAX_INCOMING_QOS2_MESSAGES 10
hudakz 0:238f0d0c0ba3 286 #endif
hudakz 0:238f0d0c0ba3 287 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
hudakz 0:238f0d0c0ba3 288 bool isQoS2msgidFree(unsigned short id);
hudakz 0:238f0d0c0ba3 289 bool useQoS2msgid(unsigned short id);
hudakz 0:238f0d0c0ba3 290 void freeQoS2msgid(unsigned short id);
hudakz 0:238f0d0c0ba3 291 #endif
hudakz 0:238f0d0c0ba3 292
hudakz 0:238f0d0c0ba3 293 };
hudakz 0:238f0d0c0ba3 294
hudakz 0:238f0d0c0ba3 295 }
hudakz 0:238f0d0c0ba3 296
hudakz 0:238f0d0c0ba3 297
hudakz 0:238f0d0c0ba3 298 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
hudakz 0:238f0d0c0ba3 299 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
hudakz 0:238f0d0c0ba3 300 {
hudakz 0:238f0d0c0ba3 301 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
hudakz 0:238f0d0c0ba3 302 messageHandlers[i].topicFilter = 0;
hudakz 0:238f0d0c0ba3 303
hudakz 0:238f0d0c0ba3 304 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 305 inflightMsgid = 0;
hudakz 0:238f0d0c0ba3 306 inflightQoS = QOS0;
hudakz 0:238f0d0c0ba3 307 #endif
hudakz 0:238f0d0c0ba3 308
hudakz 0:238f0d0c0ba3 309 #if MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 310 pubrel = false;
hudakz 0:238f0d0c0ba3 311 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
hudakz 0:238f0d0c0ba3 312 incomingQoS2messages[i] = 0;
hudakz 0:238f0d0c0ba3 313 #endif
hudakz 0:238f0d0c0ba3 314 }
hudakz 0:238f0d0c0ba3 315
hudakz 0:238f0d0c0ba3 316
hudakz 0:238f0d0c0ba3 317 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
hudakz 0:238f0d0c0ba3 318 void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession()
hudakz 0:238f0d0c0ba3 319 {
hudakz 0:238f0d0c0ba3 320 ping_outstanding = false;
hudakz 0:238f0d0c0ba3 321 isconnected = false;
hudakz 0:238f0d0c0ba3 322 if (cleansession)
hudakz 0:238f0d0c0ba3 323 cleanSession();
hudakz 0:238f0d0c0ba3 324 }
hudakz 0:238f0d0c0ba3 325
hudakz 0:238f0d0c0ba3 326
hudakz 0:238f0d0c0ba3 327 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
hudakz 0:238f0d0c0ba3 328 MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
hudakz 0:238f0d0c0ba3 329 {
hudakz 0:238f0d0c0ba3 330 this->command_timeout_ms = command_timeout_ms;
hudakz 0:238f0d0c0ba3 331 cleansession = true;
hudakz 0:238f0d0c0ba3 332 closeSession();
hudakz 0:238f0d0c0ba3 333 }
hudakz 0:238f0d0c0ba3 334
hudakz 0:238f0d0c0ba3 335
hudakz 0:238f0d0c0ba3 336 #if MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 337 template<class Network, class Timer, int a, int b>
hudakz 0:238f0d0c0ba3 338 bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id)
hudakz 0:238f0d0c0ba3 339 {
hudakz 0:238f0d0c0ba3 340 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
hudakz 0:238f0d0c0ba3 341 {
hudakz 0:238f0d0c0ba3 342 if (incomingQoS2messages[i] == id)
hudakz 0:238f0d0c0ba3 343 return false;
hudakz 0:238f0d0c0ba3 344 }
hudakz 0:238f0d0c0ba3 345 return true;
hudakz 0:238f0d0c0ba3 346 }
hudakz 0:238f0d0c0ba3 347
hudakz 0:238f0d0c0ba3 348
hudakz 0:238f0d0c0ba3 349 template<class Network, class Timer, int a, int b>
hudakz 0:238f0d0c0ba3 350 bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id)
hudakz 0:238f0d0c0ba3 351 {
hudakz 0:238f0d0c0ba3 352 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
hudakz 0:238f0d0c0ba3 353 {
hudakz 0:238f0d0c0ba3 354 if (incomingQoS2messages[i] == 0)
hudakz 0:238f0d0c0ba3 355 {
hudakz 0:238f0d0c0ba3 356 incomingQoS2messages[i] = id;
hudakz 0:238f0d0c0ba3 357 return true;
hudakz 0:238f0d0c0ba3 358 }
hudakz 0:238f0d0c0ba3 359 }
hudakz 0:238f0d0c0ba3 360 return false;
hudakz 0:238f0d0c0ba3 361 }
hudakz 0:238f0d0c0ba3 362
hudakz 0:238f0d0c0ba3 363
hudakz 0:238f0d0c0ba3 364 template<class Network, class Timer, int a, int b>
hudakz 0:238f0d0c0ba3 365 void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id)
hudakz 0:238f0d0c0ba3 366 {
hudakz 0:238f0d0c0ba3 367 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
hudakz 0:238f0d0c0ba3 368 {
hudakz 0:238f0d0c0ba3 369 if (incomingQoS2messages[i] == id)
hudakz 0:238f0d0c0ba3 370 {
hudakz 0:238f0d0c0ba3 371 incomingQoS2messages[i] = 0;
hudakz 0:238f0d0c0ba3 372 return;
hudakz 0:238f0d0c0ba3 373 }
hudakz 0:238f0d0c0ba3 374 }
hudakz 0:238f0d0c0ba3 375 }
hudakz 0:238f0d0c0ba3 376 #endif
hudakz 0:238f0d0c0ba3 377
hudakz 0:238f0d0c0ba3 378
hudakz 0:238f0d0c0ba3 379 template<class Network, class Timer, int a, int b>
hudakz 0:238f0d0c0ba3 380 int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer)
hudakz 0:238f0d0c0ba3 381 {
hudakz 0:238f0d0c0ba3 382 int rc = FAILURE,
hudakz 0:238f0d0c0ba3 383 sent = 0;
hudakz 0:238f0d0c0ba3 384
hudakz 0:238f0d0c0ba3 385 while (sent < length)
hudakz 0:238f0d0c0ba3 386 {
hudakz 0:238f0d0c0ba3 387 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
hudakz 0:238f0d0c0ba3 388 if (rc < 0) // there was an error writing the data
hudakz 0:238f0d0c0ba3 389 break;
hudakz 0:238f0d0c0ba3 390 sent += rc;
hudakz 0:238f0d0c0ba3 391 if (timer.expired()) // only check expiry after at least one attempt to write
hudakz 0:238f0d0c0ba3 392 break;
hudakz 0:238f0d0c0ba3 393 }
hudakz 0:238f0d0c0ba3 394 if (sent == length)
hudakz 0:238f0d0c0ba3 395 {
hudakz 0:238f0d0c0ba3 396 if (this->keepAliveInterval > 0)
hudakz 0:238f0d0c0ba3 397 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
hudakz 0:238f0d0c0ba3 398 rc = SUCCESS;
hudakz 0:238f0d0c0ba3 399 }
hudakz 0:238f0d0c0ba3 400 else
hudakz 0:238f0d0c0ba3 401 rc = FAILURE;
hudakz 0:238f0d0c0ba3 402
hudakz 0:238f0d0c0ba3 403 #if defined(MQTT_DEBUG)
hudakz 0:238f0d0c0ba3 404 char printbuf[150];
hudakz 0:238f0d0c0ba3 405 DEBUG("Rc %d from sending packet %s\r\n", rc,
hudakz 0:238f0d0c0ba3 406 MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
hudakz 0:238f0d0c0ba3 407 #endif
hudakz 0:238f0d0c0ba3 408 return rc;
hudakz 0:238f0d0c0ba3 409 }
hudakz 0:238f0d0c0ba3 410
hudakz 0:238f0d0c0ba3 411
hudakz 0:238f0d0c0ba3 412 template<class Network, class Timer, int a, int b>
hudakz 0:238f0d0c0ba3 413 int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout)
hudakz 0:238f0d0c0ba3 414 {
hudakz 0:238f0d0c0ba3 415 unsigned char c;
hudakz 0:238f0d0c0ba3 416 int multiplier = 1;
hudakz 0:238f0d0c0ba3 417 int len = 0;
hudakz 0:238f0d0c0ba3 418 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
hudakz 0:238f0d0c0ba3 419
hudakz 0:238f0d0c0ba3 420 *value = 0;
hudakz 0:238f0d0c0ba3 421 do
hudakz 0:238f0d0c0ba3 422 {
hudakz 0:238f0d0c0ba3 423 int rc = MQTTPACKET_READ_ERROR;
hudakz 0:238f0d0c0ba3 424
hudakz 0:238f0d0c0ba3 425 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
hudakz 0:238f0d0c0ba3 426 {
hudakz 0:238f0d0c0ba3 427 rc = MQTTPACKET_READ_ERROR; /* bad data */
hudakz 0:238f0d0c0ba3 428 goto exit;
hudakz 0:238f0d0c0ba3 429 }
hudakz 0:238f0d0c0ba3 430 rc = ipstack.read(&c, 1, timeout);
hudakz 0:238f0d0c0ba3 431 if (rc != 1)
hudakz 0:238f0d0c0ba3 432 goto exit;
hudakz 0:238f0d0c0ba3 433 *value += (c & 127) * multiplier;
hudakz 0:238f0d0c0ba3 434 multiplier *= 128;
hudakz 0:238f0d0c0ba3 435 } while ((c & 128) != 0);
hudakz 0:238f0d0c0ba3 436 exit:
hudakz 0:238f0d0c0ba3 437 return len;
hudakz 0:238f0d0c0ba3 438 }
hudakz 0:238f0d0c0ba3 439
hudakz 0:238f0d0c0ba3 440
hudakz 0:238f0d0c0ba3 441 /**
hudakz 0:238f0d0c0ba3 442 * If any read fails in this method, then we should disconnect from the network, as on reconnect
hudakz 0:238f0d0c0ba3 443 * the packets can be retried.
hudakz 0:238f0d0c0ba3 444 * @param timeout the max time to wait for the packet read to complete, in milliseconds
hudakz 0:238f0d0c0ba3 445 * @return the MQTT packet type, 0 if none, -1 if error
hudakz 0:238f0d0c0ba3 446 */
hudakz 0:238f0d0c0ba3 447 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 448 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer)
hudakz 0:238f0d0c0ba3 449 {
hudakz 0:238f0d0c0ba3 450 int rc = FAILURE;
hudakz 0:238f0d0c0ba3 451 MQTTHeader header = {0};
hudakz 0:238f0d0c0ba3 452 int len = 0;
hudakz 0:238f0d0c0ba3 453 int rem_len = 0;
hudakz 0:238f0d0c0ba3 454
hudakz 0:238f0d0c0ba3 455 /* 1. read the header byte. This has the packet type in it */
hudakz 0:238f0d0c0ba3 456 rc = ipstack.read(readbuf, 1, timer.left_ms());
hudakz 0:238f0d0c0ba3 457 if (rc != 1)
hudakz 0:238f0d0c0ba3 458 goto exit;
hudakz 0:238f0d0c0ba3 459
hudakz 0:238f0d0c0ba3 460 len = 1;
hudakz 0:238f0d0c0ba3 461 /* 2. read the remaining length. This is variable in itself */
hudakz 0:238f0d0c0ba3 462 decodePacket(&rem_len, timer.left_ms());
hudakz 0:238f0d0c0ba3 463 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
hudakz 0:238f0d0c0ba3 464
hudakz 0:238f0d0c0ba3 465 if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
hudakz 0:238f0d0c0ba3 466 {
hudakz 0:238f0d0c0ba3 467 rc = BUFFER_OVERFLOW;
hudakz 0:238f0d0c0ba3 468 goto exit;
hudakz 0:238f0d0c0ba3 469 }
hudakz 0:238f0d0c0ba3 470
hudakz 0:238f0d0c0ba3 471 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
hudakz 0:238f0d0c0ba3 472 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
hudakz 0:238f0d0c0ba3 473 goto exit;
hudakz 0:238f0d0c0ba3 474
hudakz 0:238f0d0c0ba3 475 header.byte = readbuf[0];
hudakz 0:238f0d0c0ba3 476 rc = header.bits.type;
hudakz 0:238f0d0c0ba3 477 if (this->keepAliveInterval > 0)
hudakz 0:238f0d0c0ba3 478 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
hudakz 0:238f0d0c0ba3 479 exit:
hudakz 0:238f0d0c0ba3 480
hudakz 0:238f0d0c0ba3 481 #if defined(MQTT_DEBUG)
hudakz 0:238f0d0c0ba3 482 if (rc >= 0)
hudakz 0:238f0d0c0ba3 483 {
hudakz 0:238f0d0c0ba3 484 char printbuf[50];
hudakz 0:238f0d0c0ba3 485 DEBUG("Rc %d receiving packet %s\r\n", rc,
hudakz 0:238f0d0c0ba3 486 MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
hudakz 0:238f0d0c0ba3 487 }
hudakz 0:238f0d0c0ba3 488 #endif
hudakz 0:238f0d0c0ba3 489 return rc;
hudakz 0:238f0d0c0ba3 490 }
hudakz 0:238f0d0c0ba3 491
hudakz 0:238f0d0c0ba3 492
hudakz 0:238f0d0c0ba3 493 // assume topic filter and name is in correct format
hudakz 0:238f0d0c0ba3 494 // # can only be at end
hudakz 0:238f0d0c0ba3 495 // + and # can only be next to separator
hudakz 0:238f0d0c0ba3 496 template<class Network, class Timer, int a, int b>
hudakz 0:238f0d0c0ba3 497 bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
hudakz 0:238f0d0c0ba3 498 {
hudakz 0:238f0d0c0ba3 499 char* curf = topicFilter;
hudakz 0:238f0d0c0ba3 500 char* curn = topicName.lenstring.data;
hudakz 0:238f0d0c0ba3 501 char* curn_end = curn + topicName.lenstring.len;
hudakz 0:238f0d0c0ba3 502
hudakz 0:238f0d0c0ba3 503 while (*curf && curn < curn_end)
hudakz 0:238f0d0c0ba3 504 {
hudakz 0:238f0d0c0ba3 505 if (*curn == '/' && *curf != '/')
hudakz 0:238f0d0c0ba3 506 break;
hudakz 0:238f0d0c0ba3 507 if (*curf != '+' && *curf != '#' && *curf != *curn)
hudakz 0:238f0d0c0ba3 508 break;
hudakz 0:238f0d0c0ba3 509 if (*curf == '+')
hudakz 0:238f0d0c0ba3 510 { // skip until we meet the next separator, or end of string
hudakz 0:238f0d0c0ba3 511 char* nextpos = curn + 1;
hudakz 0:238f0d0c0ba3 512 while (nextpos < curn_end && *nextpos != '/')
hudakz 0:238f0d0c0ba3 513 nextpos = ++curn + 1;
hudakz 0:238f0d0c0ba3 514 }
hudakz 0:238f0d0c0ba3 515 else if (*curf == '#')
hudakz 0:238f0d0c0ba3 516 curn = curn_end - 1; // skip until end of string
hudakz 0:238f0d0c0ba3 517 curf++;
hudakz 0:238f0d0c0ba3 518 curn++;
hudakz 0:238f0d0c0ba3 519 };
hudakz 0:238f0d0c0ba3 520
hudakz 0:238f0d0c0ba3 521 return (curn == curn_end) && (*curf == '\0');
hudakz 0:238f0d0c0ba3 522 }
hudakz 0:238f0d0c0ba3 523
hudakz 0:238f0d0c0ba3 524
hudakz 0:238f0d0c0ba3 525
hudakz 0:238f0d0c0ba3 526 template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS>
hudakz 0:238f0d0c0ba3 527 int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
hudakz 0:238f0d0c0ba3 528 {
hudakz 0:238f0d0c0ba3 529 int rc = FAILURE;
hudakz 0:238f0d0c0ba3 530
hudakz 0:238f0d0c0ba3 531 // we have to find the right message handler - indexed by topic
hudakz 0:238f0d0c0ba3 532 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
hudakz 0:238f0d0c0ba3 533 {
hudakz 0:238f0d0c0ba3 534 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
hudakz 0:238f0d0c0ba3 535 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
hudakz 0:238f0d0c0ba3 536 {
hudakz 0:238f0d0c0ba3 537 if (messageHandlers[i].fp.attached())
hudakz 0:238f0d0c0ba3 538 {
hudakz 0:238f0d0c0ba3 539 MessageData md(topicName, message);
hudakz 0:238f0d0c0ba3 540 messageHandlers[i].fp(md);
hudakz 0:238f0d0c0ba3 541 rc = SUCCESS;
hudakz 0:238f0d0c0ba3 542 }
hudakz 0:238f0d0c0ba3 543 }
hudakz 0:238f0d0c0ba3 544 }
hudakz 0:238f0d0c0ba3 545
hudakz 0:238f0d0c0ba3 546 if (rc == FAILURE && defaultMessageHandler.attached())
hudakz 0:238f0d0c0ba3 547 {
hudakz 0:238f0d0c0ba3 548 MessageData md(topicName, message);
hudakz 0:238f0d0c0ba3 549 defaultMessageHandler(md);
hudakz 0:238f0d0c0ba3 550 rc = SUCCESS;
hudakz 0:238f0d0c0ba3 551 }
hudakz 0:238f0d0c0ba3 552
hudakz 0:238f0d0c0ba3 553 return rc;
hudakz 0:238f0d0c0ba3 554 }
hudakz 0:238f0d0c0ba3 555
hudakz 0:238f0d0c0ba3 556
hudakz 0:238f0d0c0ba3 557
hudakz 0:238f0d0c0ba3 558 template<class Network, class Timer, int a, int b>
hudakz 0:238f0d0c0ba3 559 int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms)
hudakz 0:238f0d0c0ba3 560 {
hudakz 0:238f0d0c0ba3 561 int rc = SUCCESS;
hudakz 0:238f0d0c0ba3 562 Timer timer;
hudakz 0:238f0d0c0ba3 563
hudakz 0:238f0d0c0ba3 564 timer.countdown_ms(timeout_ms);
hudakz 0:238f0d0c0ba3 565 while (!timer.expired())
hudakz 0:238f0d0c0ba3 566 {
hudakz 0:238f0d0c0ba3 567 if (cycle(timer) < 0)
hudakz 0:238f0d0c0ba3 568 {
hudakz 0:238f0d0c0ba3 569 rc = FAILURE;
hudakz 0:238f0d0c0ba3 570 break;
hudakz 0:238f0d0c0ba3 571 }
hudakz 0:238f0d0c0ba3 572 }
hudakz 0:238f0d0c0ba3 573
hudakz 0:238f0d0c0ba3 574 return rc;
hudakz 0:238f0d0c0ba3 575 }
hudakz 0:238f0d0c0ba3 576
hudakz 0:238f0d0c0ba3 577
hudakz 0:238f0d0c0ba3 578 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 579 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer)
hudakz 0:238f0d0c0ba3 580 {
hudakz 0:238f0d0c0ba3 581 // get one piece of work off the wire and one pass through
hudakz 0:238f0d0c0ba3 582 int len = 0,
hudakz 0:238f0d0c0ba3 583 rc = SUCCESS;
hudakz 0:238f0d0c0ba3 584
hudakz 0:238f0d0c0ba3 585 int packet_type = readPacket(timer); // read the socket, see what work is due
hudakz 0:238f0d0c0ba3 586
hudakz 0:238f0d0c0ba3 587 switch (packet_type)
hudakz 0:238f0d0c0ba3 588 {
hudakz 0:238f0d0c0ba3 589 default:
hudakz 0:238f0d0c0ba3 590 // no more data to read, unrecoverable. Or read packet fails due to unexpected network error
hudakz 0:238f0d0c0ba3 591 rc = packet_type;
hudakz 0:238f0d0c0ba3 592 goto exit;
hudakz 0:238f0d0c0ba3 593 case 0: // timed out reading packet
hudakz 0:238f0d0c0ba3 594 break;
hudakz 0:238f0d0c0ba3 595 case CONNACK:
hudakz 0:238f0d0c0ba3 596 case PUBACK:
hudakz 0:238f0d0c0ba3 597 case SUBACK:
hudakz 0:238f0d0c0ba3 598 break;
hudakz 0:238f0d0c0ba3 599 case PUBLISH:
hudakz 0:238f0d0c0ba3 600 {
hudakz 0:238f0d0c0ba3 601 MQTTString topicName = MQTTString_initializer;
hudakz 0:238f0d0c0ba3 602 Message msg;
hudakz 0:238f0d0c0ba3 603 int intQoS;
hudakz 0:238f0d0c0ba3 604 msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
hudakz 0:238f0d0c0ba3 605 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
hudakz 0:238f0d0c0ba3 606 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
hudakz 0:238f0d0c0ba3 607 goto exit;
hudakz 0:238f0d0c0ba3 608 msg.qos = (enum QoS)intQoS;
hudakz 0:238f0d0c0ba3 609 #if MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 610 if (msg.qos != QOS2)
hudakz 0:238f0d0c0ba3 611 #endif
hudakz 0:238f0d0c0ba3 612 deliverMessage(topicName, msg);
hudakz 0:238f0d0c0ba3 613 #if MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 614 else if (isQoS2msgidFree(msg.id))
hudakz 0:238f0d0c0ba3 615 {
hudakz 0:238f0d0c0ba3 616 if (useQoS2msgid(msg.id))
hudakz 0:238f0d0c0ba3 617 deliverMessage(topicName, msg);
hudakz 0:238f0d0c0ba3 618 else
hudakz 0:238f0d0c0ba3 619 WARN("Maximum number of incoming QoS2 messages exceeded");
hudakz 0:238f0d0c0ba3 620 }
hudakz 0:238f0d0c0ba3 621 #endif
hudakz 0:238f0d0c0ba3 622 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 623 if (msg.qos != QOS0)
hudakz 0:238f0d0c0ba3 624 {
hudakz 0:238f0d0c0ba3 625 if (msg.qos == QOS1)
hudakz 0:238f0d0c0ba3 626 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
hudakz 0:238f0d0c0ba3 627 else if (msg.qos == QOS2)
hudakz 0:238f0d0c0ba3 628 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
hudakz 0:238f0d0c0ba3 629 if (len <= 0)
hudakz 0:238f0d0c0ba3 630 rc = FAILURE;
hudakz 0:238f0d0c0ba3 631 else
hudakz 0:238f0d0c0ba3 632 rc = sendPacket(len, timer);
hudakz 0:238f0d0c0ba3 633 if (rc == FAILURE)
hudakz 0:238f0d0c0ba3 634 goto exit; // there was a problem
hudakz 0:238f0d0c0ba3 635 }
hudakz 0:238f0d0c0ba3 636 break;
hudakz 0:238f0d0c0ba3 637 #endif
hudakz 0:238f0d0c0ba3 638 }
hudakz 0:238f0d0c0ba3 639 #if MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 640 case PUBREC:
hudakz 0:238f0d0c0ba3 641 case PUBREL:
hudakz 0:238f0d0c0ba3 642 unsigned short mypacketid;
hudakz 0:238f0d0c0ba3 643 unsigned char dup, type;
hudakz 0:238f0d0c0ba3 644 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
hudakz 0:238f0d0c0ba3 645 rc = FAILURE;
hudakz 0:238f0d0c0ba3 646 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
hudakz 0:238f0d0c0ba3 647 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
hudakz 0:238f0d0c0ba3 648 rc = FAILURE;
hudakz 0:238f0d0c0ba3 649 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
hudakz 0:238f0d0c0ba3 650 rc = FAILURE; // there was a problem
hudakz 0:238f0d0c0ba3 651 if (rc == FAILURE)
hudakz 0:238f0d0c0ba3 652 goto exit; // there was a problem
hudakz 0:238f0d0c0ba3 653 if (packet_type == PUBREL)
hudakz 0:238f0d0c0ba3 654 freeQoS2msgid(mypacketid);
hudakz 0:238f0d0c0ba3 655 break;
hudakz 0:238f0d0c0ba3 656
hudakz 0:238f0d0c0ba3 657 case PUBCOMP:
hudakz 0:238f0d0c0ba3 658 break;
hudakz 0:238f0d0c0ba3 659 #endif
hudakz 0:238f0d0c0ba3 660 case PINGRESP:
hudakz 0:238f0d0c0ba3 661 ping_outstanding = false;
hudakz 0:238f0d0c0ba3 662 break;
hudakz 0:238f0d0c0ba3 663 }
hudakz 0:238f0d0c0ba3 664
hudakz 0:238f0d0c0ba3 665 if (keepalive() != SUCCESS)
hudakz 0:238f0d0c0ba3 666 //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
hudakz 0:238f0d0c0ba3 667 rc = FAILURE;
hudakz 0:238f0d0c0ba3 668
hudakz 0:238f0d0c0ba3 669 exit:
hudakz 0:238f0d0c0ba3 670 if (rc == SUCCESS)
hudakz 0:238f0d0c0ba3 671 rc = packet_type;
hudakz 0:238f0d0c0ba3 672 else if (isconnected)
hudakz 0:238f0d0c0ba3 673 closeSession();
hudakz 0:238f0d0c0ba3 674 return rc;
hudakz 0:238f0d0c0ba3 675 }
hudakz 0:238f0d0c0ba3 676
hudakz 0:238f0d0c0ba3 677
hudakz 0:238f0d0c0ba3 678 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 679 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
hudakz 0:238f0d0c0ba3 680 {
hudakz 0:238f0d0c0ba3 681 int rc = SUCCESS;
hudakz 0:238f0d0c0ba3 682 static Timer ping_sent;
hudakz 0:238f0d0c0ba3 683
hudakz 0:238f0d0c0ba3 684 if (keepAliveInterval == 0)
hudakz 0:238f0d0c0ba3 685 goto exit;
hudakz 0:238f0d0c0ba3 686
hudakz 0:238f0d0c0ba3 687 if (ping_outstanding)
hudakz 0:238f0d0c0ba3 688 {
hudakz 0:238f0d0c0ba3 689 if (ping_sent.expired())
hudakz 0:238f0d0c0ba3 690 {
hudakz 0:238f0d0c0ba3 691 rc = FAILURE; // session failure
hudakz 0:238f0d0c0ba3 692 #if defined(MQTT_DEBUG)
hudakz 0:238f0d0c0ba3 693 DEBUG("PINGRESP not received in keepalive interval\r\n");
hudakz 0:238f0d0c0ba3 694 #endif
hudakz 0:238f0d0c0ba3 695 }
hudakz 0:238f0d0c0ba3 696 }
hudakz 0:238f0d0c0ba3 697 else if (last_sent.expired() || last_received.expired())
hudakz 0:238f0d0c0ba3 698 {
hudakz 0:238f0d0c0ba3 699 Timer timer(1000);
hudakz 0:238f0d0c0ba3 700 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
hudakz 0:238f0d0c0ba3 701 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
hudakz 0:238f0d0c0ba3 702 {
hudakz 0:238f0d0c0ba3 703 ping_outstanding = true;
hudakz 0:238f0d0c0ba3 704 ping_sent.countdown(this->keepAliveInterval);
hudakz 0:238f0d0c0ba3 705 }
hudakz 0:238f0d0c0ba3 706 }
hudakz 0:238f0d0c0ba3 707 exit:
hudakz 0:238f0d0c0ba3 708 return rc;
hudakz 0:238f0d0c0ba3 709 }
hudakz 0:238f0d0c0ba3 710
hudakz 0:238f0d0c0ba3 711
hudakz 0:238f0d0c0ba3 712 // only used in single-threaded mode where one command at a time is in process
hudakz 0:238f0d0c0ba3 713 template<class Network, class Timer, int a, int b>
hudakz 0:238f0d0c0ba3 714 int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer)
hudakz 0:238f0d0c0ba3 715 {
hudakz 0:238f0d0c0ba3 716 int rc = FAILURE;
hudakz 0:238f0d0c0ba3 717
hudakz 0:238f0d0c0ba3 718 do
hudakz 0:238f0d0c0ba3 719 {
hudakz 0:238f0d0c0ba3 720 if (timer.expired())
hudakz 0:238f0d0c0ba3 721 break; // we timed out
hudakz 0:238f0d0c0ba3 722 rc = cycle(timer);
hudakz 0:238f0d0c0ba3 723 }
hudakz 0:238f0d0c0ba3 724 while (rc != packet_type && rc >= 0);
hudakz 0:238f0d0c0ba3 725
hudakz 0:238f0d0c0ba3 726 return rc;
hudakz 0:238f0d0c0ba3 727 }
hudakz 0:238f0d0c0ba3 728
hudakz 0:238f0d0c0ba3 729
hudakz 0:238f0d0c0ba3 730 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 731 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data)
hudakz 0:238f0d0c0ba3 732 {
hudakz 0:238f0d0c0ba3 733 Timer connect_timer(command_timeout_ms);
hudakz 0:238f0d0c0ba3 734 int rc = FAILURE;
hudakz 0:238f0d0c0ba3 735 int len = 0;
hudakz 0:238f0d0c0ba3 736
hudakz 0:238f0d0c0ba3 737 if (isconnected) // don't send connect packet again if we are already connected
hudakz 0:238f0d0c0ba3 738 goto exit;
hudakz 0:238f0d0c0ba3 739
hudakz 0:238f0d0c0ba3 740 this->keepAliveInterval = options.keepAliveInterval;
hudakz 0:238f0d0c0ba3 741 this->cleansession = options.cleansession;
hudakz 0:238f0d0c0ba3 742 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
hudakz 0:238f0d0c0ba3 743 goto exit;
hudakz 0:238f0d0c0ba3 744 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
hudakz 0:238f0d0c0ba3 745 goto exit; // there was a problem
hudakz 0:238f0d0c0ba3 746
hudakz 0:238f0d0c0ba3 747 if (this->keepAliveInterval > 0)
hudakz 0:238f0d0c0ba3 748 last_received.countdown(this->keepAliveInterval);
hudakz 0:238f0d0c0ba3 749 // this will be a blocking call, wait for the connack
hudakz 0:238f0d0c0ba3 750 if (waitfor(CONNACK, connect_timer) == CONNACK)
hudakz 0:238f0d0c0ba3 751 {
hudakz 0:238f0d0c0ba3 752 data.rc = 0;
hudakz 0:238f0d0c0ba3 753 data.sessionPresent = false;
hudakz 0:238f0d0c0ba3 754 if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent,
hudakz 0:238f0d0c0ba3 755 (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
hudakz 0:238f0d0c0ba3 756 rc = data.rc;
hudakz 0:238f0d0c0ba3 757 else
hudakz 0:238f0d0c0ba3 758 rc = FAILURE;
hudakz 0:238f0d0c0ba3 759 }
hudakz 0:238f0d0c0ba3 760 else
hudakz 0:238f0d0c0ba3 761 rc = FAILURE;
hudakz 0:238f0d0c0ba3 762
hudakz 0:238f0d0c0ba3 763 #if MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 764 // resend any inflight publish
hudakz 0:238f0d0c0ba3 765 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
hudakz 0:238f0d0c0ba3 766 {
hudakz 0:238f0d0c0ba3 767 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
hudakz 0:238f0d0c0ba3 768 rc = FAILURE;
hudakz 0:238f0d0c0ba3 769 else
hudakz 0:238f0d0c0ba3 770 rc = publish(len, connect_timer, inflightQoS);
hudakz 0:238f0d0c0ba3 771 }
hudakz 0:238f0d0c0ba3 772 else
hudakz 0:238f0d0c0ba3 773 #endif
hudakz 0:238f0d0c0ba3 774 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 775 if (inflightMsgid > 0)
hudakz 0:238f0d0c0ba3 776 {
hudakz 0:238f0d0c0ba3 777 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
hudakz 0:238f0d0c0ba3 778 rc = publish(inflightLen, connect_timer, inflightQoS);
hudakz 0:238f0d0c0ba3 779 }
hudakz 0:238f0d0c0ba3 780 #endif
hudakz 0:238f0d0c0ba3 781
hudakz 0:238f0d0c0ba3 782 exit:
hudakz 0:238f0d0c0ba3 783 if (rc == SUCCESS)
hudakz 0:238f0d0c0ba3 784 {
hudakz 0:238f0d0c0ba3 785 isconnected = true;
hudakz 0:238f0d0c0ba3 786 ping_outstanding = false;
hudakz 0:238f0d0c0ba3 787 }
hudakz 0:238f0d0c0ba3 788 return rc;
hudakz 0:238f0d0c0ba3 789 }
hudakz 0:238f0d0c0ba3 790
hudakz 0:238f0d0c0ba3 791
hudakz 0:238f0d0c0ba3 792 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 793 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
hudakz 0:238f0d0c0ba3 794 {
hudakz 0:238f0d0c0ba3 795 connackData data;
hudakz 0:238f0d0c0ba3 796 return connect(options, data);
hudakz 0:238f0d0c0ba3 797 }
hudakz 0:238f0d0c0ba3 798
hudakz 0:238f0d0c0ba3 799
hudakz 0:238f0d0c0ba3 800 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 801 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect()
hudakz 0:238f0d0c0ba3 802 {
hudakz 0:238f0d0c0ba3 803 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
hudakz 0:238f0d0c0ba3 804 return connect(default_options);
hudakz 0:238f0d0c0ba3 805 }
hudakz 0:238f0d0c0ba3 806
hudakz 0:238f0d0c0ba3 807
hudakz 0:238f0d0c0ba3 808 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
hudakz 0:238f0d0c0ba3 809 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler)
hudakz 0:238f0d0c0ba3 810 {
hudakz 0:238f0d0c0ba3 811 int rc = FAILURE;
hudakz 0:238f0d0c0ba3 812 int i = -1;
hudakz 0:238f0d0c0ba3 813
hudakz 0:238f0d0c0ba3 814 // first check for an existing matching slot
hudakz 0:238f0d0c0ba3 815 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
hudakz 0:238f0d0c0ba3 816 {
hudakz 0:238f0d0c0ba3 817 if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
hudakz 0:238f0d0c0ba3 818 {
hudakz 0:238f0d0c0ba3 819 if (messageHandler == 0) // remove existing
hudakz 0:238f0d0c0ba3 820 {
hudakz 0:238f0d0c0ba3 821 messageHandlers[i].topicFilter = 0;
hudakz 0:238f0d0c0ba3 822 messageHandlers[i].fp.detach();
hudakz 0:238f0d0c0ba3 823 }
hudakz 0:238f0d0c0ba3 824 rc = SUCCESS; // return i when adding new subscription
hudakz 0:238f0d0c0ba3 825 break;
hudakz 0:238f0d0c0ba3 826 }
hudakz 0:238f0d0c0ba3 827 }
hudakz 0:238f0d0c0ba3 828 // if no existing, look for empty slot (unless we are removing)
hudakz 0:238f0d0c0ba3 829 if (messageHandler != 0) {
hudakz 0:238f0d0c0ba3 830 if (rc == FAILURE)
hudakz 0:238f0d0c0ba3 831 {
hudakz 0:238f0d0c0ba3 832 for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
hudakz 0:238f0d0c0ba3 833 {
hudakz 0:238f0d0c0ba3 834 if (messageHandlers[i].topicFilter == 0)
hudakz 0:238f0d0c0ba3 835 {
hudakz 0:238f0d0c0ba3 836 rc = SUCCESS;
hudakz 0:238f0d0c0ba3 837 break;
hudakz 0:238f0d0c0ba3 838 }
hudakz 0:238f0d0c0ba3 839 }
hudakz 0:238f0d0c0ba3 840 }
hudakz 0:238f0d0c0ba3 841 if (i < MAX_MESSAGE_HANDLERS)
hudakz 0:238f0d0c0ba3 842 {
hudakz 0:238f0d0c0ba3 843 messageHandlers[i].topicFilter = topicFilter;
hudakz 0:238f0d0c0ba3 844 messageHandlers[i].fp.attach(messageHandler);
hudakz 0:238f0d0c0ba3 845 }
hudakz 0:238f0d0c0ba3 846 }
hudakz 0:238f0d0c0ba3 847 return rc;
hudakz 0:238f0d0c0ba3 848 }
hudakz 0:238f0d0c0ba3 849
hudakz 0:238f0d0c0ba3 850
hudakz 0:238f0d0c0ba3 851 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
hudakz 0:238f0d0c0ba3 852 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter,
hudakz 0:238f0d0c0ba3 853 enum QoS qos, messageHandler messageHandler, subackData& data)
hudakz 0:238f0d0c0ba3 854 {
hudakz 0:238f0d0c0ba3 855 int rc = FAILURE;
hudakz 0:238f0d0c0ba3 856 Timer timer(command_timeout_ms);
hudakz 0:238f0d0c0ba3 857 int len = 0;
hudakz 0:238f0d0c0ba3 858 MQTTString topic = {(char*)topicFilter, {0, 0}};
hudakz 0:238f0d0c0ba3 859
hudakz 0:238f0d0c0ba3 860 if (!isconnected)
hudakz 0:238f0d0c0ba3 861 goto exit;
hudakz 0:238f0d0c0ba3 862
hudakz 0:238f0d0c0ba3 863 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
hudakz 0:238f0d0c0ba3 864 if (len <= 0)
hudakz 0:238f0d0c0ba3 865 goto exit;
hudakz 0:238f0d0c0ba3 866 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
hudakz 0:238f0d0c0ba3 867 goto exit; // there was a problem
hudakz 0:238f0d0c0ba3 868
hudakz 0:238f0d0c0ba3 869 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback
hudakz 0:238f0d0c0ba3 870 {
hudakz 0:238f0d0c0ba3 871 int count = 0;
hudakz 0:238f0d0c0ba3 872 unsigned short mypacketid;
hudakz 0:238f0d0c0ba3 873 data.grantedQoS = 0;
hudakz 0:238f0d0c0ba3 874 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
hudakz 0:238f0d0c0ba3 875 {
hudakz 0:238f0d0c0ba3 876 if (data.grantedQoS != 0x80)
hudakz 0:238f0d0c0ba3 877 rc = setMessageHandler(topicFilter, messageHandler);
hudakz 0:238f0d0c0ba3 878 }
hudakz 0:238f0d0c0ba3 879 }
hudakz 0:238f0d0c0ba3 880 else
hudakz 0:238f0d0c0ba3 881 rc = FAILURE;
hudakz 0:238f0d0c0ba3 882
hudakz 0:238f0d0c0ba3 883 exit:
hudakz 0:238f0d0c0ba3 884 if (rc == FAILURE)
hudakz 0:238f0d0c0ba3 885 closeSession();
hudakz 0:238f0d0c0ba3 886 return rc;
hudakz 0:238f0d0c0ba3 887 }
hudakz 0:238f0d0c0ba3 888
hudakz 0:238f0d0c0ba3 889
hudakz 0:238f0d0c0ba3 890 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
hudakz 0:238f0d0c0ba3 891 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
hudakz 0:238f0d0c0ba3 892 {
hudakz 0:238f0d0c0ba3 893 subackData data;
hudakz 0:238f0d0c0ba3 894 return subscribe(topicFilter, qos, messageHandler, data);
hudakz 0:238f0d0c0ba3 895 }
hudakz 0:238f0d0c0ba3 896
hudakz 0:238f0d0c0ba3 897
hudakz 0:238f0d0c0ba3 898 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
hudakz 0:238f0d0c0ba3 899 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
hudakz 0:238f0d0c0ba3 900 {
hudakz 0:238f0d0c0ba3 901 int rc = FAILURE;
hudakz 0:238f0d0c0ba3 902 Timer timer(command_timeout_ms);
hudakz 0:238f0d0c0ba3 903 MQTTString topic = {(char*)topicFilter, {0, 0}};
hudakz 0:238f0d0c0ba3 904 int len = 0;
hudakz 0:238f0d0c0ba3 905
hudakz 0:238f0d0c0ba3 906 if (!isconnected)
hudakz 0:238f0d0c0ba3 907 goto exit;
hudakz 0:238f0d0c0ba3 908
hudakz 0:238f0d0c0ba3 909 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
hudakz 0:238f0d0c0ba3 910 goto exit;
hudakz 0:238f0d0c0ba3 911 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
hudakz 0:238f0d0c0ba3 912 goto exit; // there was a problem
hudakz 0:238f0d0c0ba3 913
hudakz 0:238f0d0c0ba3 914 if (waitfor(UNSUBACK, timer) == UNSUBACK)
hudakz 0:238f0d0c0ba3 915 {
hudakz 0:238f0d0c0ba3 916 unsigned short mypacketid; // should be the same as the packetid above
hudakz 0:238f0d0c0ba3 917 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
hudakz 0:238f0d0c0ba3 918 {
hudakz 0:238f0d0c0ba3 919 // remove the subscription message handler associated with this topic, if there is one
hudakz 0:238f0d0c0ba3 920 setMessageHandler(topicFilter, 0);
hudakz 0:238f0d0c0ba3 921 }
hudakz 0:238f0d0c0ba3 922 }
hudakz 0:238f0d0c0ba3 923 else
hudakz 0:238f0d0c0ba3 924 rc = FAILURE;
hudakz 0:238f0d0c0ba3 925
hudakz 0:238f0d0c0ba3 926 exit:
hudakz 0:238f0d0c0ba3 927 if (rc != SUCCESS)
hudakz 0:238f0d0c0ba3 928 closeSession();
hudakz 0:238f0d0c0ba3 929 return rc;
hudakz 0:238f0d0c0ba3 930 }
hudakz 0:238f0d0c0ba3 931
hudakz 0:238f0d0c0ba3 932
hudakz 0:238f0d0c0ba3 933 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 934 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos)
hudakz 0:238f0d0c0ba3 935 {
hudakz 0:238f0d0c0ba3 936 int rc;
hudakz 0:238f0d0c0ba3 937
hudakz 0:238f0d0c0ba3 938 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
hudakz 0:238f0d0c0ba3 939 goto exit; // there was a problem
hudakz 0:238f0d0c0ba3 940
hudakz 0:238f0d0c0ba3 941 #if MQTTCLIENT_QOS1
hudakz 0:238f0d0c0ba3 942 if (qos == QOS1)
hudakz 0:238f0d0c0ba3 943 {
hudakz 0:238f0d0c0ba3 944 if (waitfor(PUBACK, timer) == PUBACK)
hudakz 0:238f0d0c0ba3 945 {
hudakz 0:238f0d0c0ba3 946 unsigned short mypacketid;
hudakz 0:238f0d0c0ba3 947 unsigned char dup, type;
hudakz 0:238f0d0c0ba3 948 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
hudakz 0:238f0d0c0ba3 949 rc = FAILURE;
hudakz 0:238f0d0c0ba3 950 else if (inflightMsgid == mypacketid)
hudakz 0:238f0d0c0ba3 951 inflightMsgid = 0;
hudakz 0:238f0d0c0ba3 952 }
hudakz 0:238f0d0c0ba3 953 else
hudakz 0:238f0d0c0ba3 954 rc = FAILURE;
hudakz 0:238f0d0c0ba3 955 }
hudakz 0:238f0d0c0ba3 956 #endif
hudakz 0:238f0d0c0ba3 957 #if MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 958 else if (qos == QOS2)
hudakz 0:238f0d0c0ba3 959 {
hudakz 0:238f0d0c0ba3 960 if (waitfor(PUBCOMP, timer) == PUBCOMP)
hudakz 0:238f0d0c0ba3 961 {
hudakz 0:238f0d0c0ba3 962 unsigned short mypacketid;
hudakz 0:238f0d0c0ba3 963 unsigned char dup, type;
hudakz 0:238f0d0c0ba3 964 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
hudakz 0:238f0d0c0ba3 965 rc = FAILURE;
hudakz 0:238f0d0c0ba3 966 else if (inflightMsgid == mypacketid)
hudakz 0:238f0d0c0ba3 967 inflightMsgid = 0;
hudakz 0:238f0d0c0ba3 968 }
hudakz 0:238f0d0c0ba3 969 else
hudakz 0:238f0d0c0ba3 970 rc = FAILURE;
hudakz 0:238f0d0c0ba3 971 }
hudakz 0:238f0d0c0ba3 972 #endif
hudakz 0:238f0d0c0ba3 973
hudakz 0:238f0d0c0ba3 974 exit:
hudakz 0:238f0d0c0ba3 975 if (rc != SUCCESS)
hudakz 0:238f0d0c0ba3 976 closeSession();
hudakz 0:238f0d0c0ba3 977 return rc;
hudakz 0:238f0d0c0ba3 978 }
hudakz 0:238f0d0c0ba3 979
hudakz 0:238f0d0c0ba3 980
hudakz 0:238f0d0c0ba3 981
hudakz 0:238f0d0c0ba3 982 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 983 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
hudakz 0:238f0d0c0ba3 984 {
hudakz 0:238f0d0c0ba3 985 int rc = FAILURE;
hudakz 0:238f0d0c0ba3 986 Timer timer(command_timeout_ms);
hudakz 0:238f0d0c0ba3 987 MQTTString topicString = MQTTString_initializer;
hudakz 0:238f0d0c0ba3 988 int len = 0;
hudakz 0:238f0d0c0ba3 989
hudakz 0:238f0d0c0ba3 990 if (!isconnected)
hudakz 0:238f0d0c0ba3 991 goto exit;
hudakz 0:238f0d0c0ba3 992
hudakz 0:238f0d0c0ba3 993 topicString.cstring = (char*)topicName;
hudakz 0:238f0d0c0ba3 994
hudakz 0:238f0d0c0ba3 995 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 996 if (qos == QOS1 || qos == QOS2)
hudakz 0:238f0d0c0ba3 997 id = packetid.getNext();
hudakz 0:238f0d0c0ba3 998 #endif
hudakz 0:238f0d0c0ba3 999
hudakz 0:238f0d0c0ba3 1000 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
hudakz 0:238f0d0c0ba3 1001 topicString, (unsigned char*)payload, payloadlen);
hudakz 0:238f0d0c0ba3 1002 if (len <= 0)
hudakz 0:238f0d0c0ba3 1003 goto exit;
hudakz 0:238f0d0c0ba3 1004
hudakz 0:238f0d0c0ba3 1005 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 1006 if (!cleansession)
hudakz 0:238f0d0c0ba3 1007 {
hudakz 0:238f0d0c0ba3 1008 memcpy(pubbuf, sendbuf, len);
hudakz 0:238f0d0c0ba3 1009 inflightMsgid = id;
hudakz 0:238f0d0c0ba3 1010 inflightLen = len;
hudakz 0:238f0d0c0ba3 1011 inflightQoS = qos;
hudakz 0:238f0d0c0ba3 1012 #if MQTTCLIENT_QOS2
hudakz 0:238f0d0c0ba3 1013 pubrel = false;
hudakz 0:238f0d0c0ba3 1014 #endif
hudakz 0:238f0d0c0ba3 1015 }
hudakz 0:238f0d0c0ba3 1016 #endif
hudakz 0:238f0d0c0ba3 1017
hudakz 0:238f0d0c0ba3 1018 rc = publish(len, timer, qos);
hudakz 0:238f0d0c0ba3 1019 exit:
hudakz 0:238f0d0c0ba3 1020 return rc;
hudakz 0:238f0d0c0ba3 1021 }
hudakz 0:238f0d0c0ba3 1022
hudakz 0:238f0d0c0ba3 1023
hudakz 0:238f0d0c0ba3 1024 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 1025 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
hudakz 0:238f0d0c0ba3 1026 {
hudakz 0:238f0d0c0ba3 1027 unsigned short id = 0; // dummy - not used for anything
hudakz 0:238f0d0c0ba3 1028 return publish(topicName, payload, payloadlen, id, qos, retained);
hudakz 0:238f0d0c0ba3 1029 }
hudakz 0:238f0d0c0ba3 1030
hudakz 0:238f0d0c0ba3 1031
hudakz 0:238f0d0c0ba3 1032 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 1033 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
hudakz 0:238f0d0c0ba3 1034 {
hudakz 0:238f0d0c0ba3 1035 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
hudakz 0:238f0d0c0ba3 1036 }
hudakz 0:238f0d0c0ba3 1037
hudakz 0:238f0d0c0ba3 1038
hudakz 0:238f0d0c0ba3 1039 template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
hudakz 0:238f0d0c0ba3 1040 int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
hudakz 0:238f0d0c0ba3 1041 {
hudakz 0:238f0d0c0ba3 1042 int rc = FAILURE;
hudakz 0:238f0d0c0ba3 1043 Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
hudakz 0:238f0d0c0ba3 1044 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
hudakz 0:238f0d0c0ba3 1045 if (len > 0)
hudakz 0:238f0d0c0ba3 1046 rc = sendPacket(len, timer); // send the disconnect packet
hudakz 0:238f0d0c0ba3 1047 closeSession();
hudakz 0:238f0d0c0ba3 1048 return rc;
hudakz 0:238f0d0c0ba3 1049 }
hudakz 0:238f0d0c0ba3 1050
hudakz 0:238f0d0c0ba3 1051 #endif