【mbed OS5対応バージョン】データの保存、更新、取得ができるWebサービス「milkcocoa」に接続し、データのプッシュ、送信、取得ができるライブラリです。 https://mlkcca.com/

Dependents:   mbed-os-example-wifi-milkcocoa MilkcocoaOsSample_Eth MilkcocoaOsSample_ESP8266 MilkcocoaOsSample_Eth_DigitalIn

Committer:
jksoft
Date:
Thu Sep 07 01:26:48 2017 +0000
Revision:
11:278ba4de2e99
Parent:
10:c52abd2d6595
Commit message: (text lost to character-encoding damage) (FP->_FP)

Who changed what in which revision?

User | Revision | Line number | New contents of line
jksoft 10:c52abd2d6595 1 /*******************************************************************************
jksoft 10:c52abd2d6595 2 * Copyright (c) 2014, 2015 IBM Corp.
jksoft 10:c52abd2d6595 3 *
jksoft 10:c52abd2d6595 4 * All rights reserved. This program and the accompanying materials
jksoft 10:c52abd2d6595 5 * are made available under the terms of the Eclipse Public License v1.0
jksoft 10:c52abd2d6595 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
jksoft 10:c52abd2d6595 7 *
jksoft 10:c52abd2d6595 8 * The Eclipse Public License is available at
jksoft 10:c52abd2d6595 9 * http://www.eclipse.org/legal/epl-v10.html
jksoft 10:c52abd2d6595 10 * and the Eclipse Distribution License is available at
jksoft 10:c52abd2d6595 11 * http://www.eclipse.org/org/documents/edl-v10.php.
jksoft 10:c52abd2d6595 12 *
jksoft 10:c52abd2d6595 13 * Contributors:
jksoft 10:c52abd2d6595 14 * Ian Craggs - initial API and implementation and/or initial documentation
jksoft 10:c52abd2d6595 15 * Ian Craggs - fix for bug 458512 - QoS 2 messages
jksoft 10:c52abd2d6595 16 * Ian Craggs - fix for bug 460389 - send loop uses wrong length
jksoft 10:c52abd2d6595 17 * Ian Craggs - fix for bug 464169 - clearing subscriptions
jksoft 10:c52abd2d6595 18 * Ian Craggs - fix for bug 464551 - enums and ints can be different size
jksoft 10:c52abd2d6595 19 *******************************************************************************/
jksoft 10:c52abd2d6595 20
jksoft 10:c52abd2d6595 21 #if !defined(MQTTCLIENT_H)
jksoft 10:c52abd2d6595 22 #define MQTTCLIENT_H
jksoft 10:c52abd2d6595 23
jksoft 10:c52abd2d6595 24 #include "FP.h"
jksoft 10:c52abd2d6595 25 #include "MQTTPacket.h"
jksoft 10:c52abd2d6595 26 #include "stdio.h"
jksoft 10:c52abd2d6595 27 #include "MQTTLogging.h"
jksoft 10:c52abd2d6595 28 #include "OldTimer.h"
jksoft 10:c52abd2d6595 29
jksoft 10:c52abd2d6595 30 #if !defined(MQTTCLIENT_QOS1)
jksoft 10:c52abd2d6595 31 #define MQTTCLIENT_QOS1 1
jksoft 10:c52abd2d6595 32 #endif
jksoft 10:c52abd2d6595 33 #if !defined(MQTTCLIENT_QOS2)
jksoft 10:c52abd2d6595 34 #define MQTTCLIENT_QOS2 0
jksoft 10:c52abd2d6595 35 #endif
jksoft 10:c52abd2d6595 36
jksoft 10:c52abd2d6595 37 #if 1
jksoft 10:c52abd2d6595 38 extern RawSerial pc;
jksoft 10:c52abd2d6595 39
jksoft 10:c52abd2d6595 40 #define DBG(x) x
jksoft 10:c52abd2d6595 41 #else
jksoft 10:c52abd2d6595 42 #define DBG(x)
jksoft 10:c52abd2d6595 43 #endif
jksoft 10:c52abd2d6595 44
jksoft 10:c52abd2d6595 45 namespace MQTT
jksoft 10:c52abd2d6595 46 {
jksoft 10:c52abd2d6595 47
jksoft 10:c52abd2d6595 48
/** MQTT quality-of-service levels, numbered 0, 1 and 2. */
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};

/** Result codes used by the client; all failure return codes must be negative. */
enum returnCode
{
    BUFFER_OVERFLOW = -2,
    FAILURE         = -1,
    SUCCESS         = 0
};


/** A single application message together with its publish flags. */
struct Message
{
    QoS            qos;         // quality of service of the message
    bool           retained;    // retained flag
    bool           dup;         // duplicate-delivery flag
    unsigned short id;          // packet identifier
    void*          payload;     // pointer to the payload bytes (not owned by this struct)
    size_t         payloadlen;  // number of payload bytes
};
jksoft 10:c52abd2d6595 64
jksoft 10:c52abd2d6595 65
jksoft 10:c52abd2d6595 66 struct MessageData
jksoft 10:c52abd2d6595 67 {
jksoft 10:c52abd2d6595 68 MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName)
jksoft 10:c52abd2d6595 69 { }
jksoft 10:c52abd2d6595 70
jksoft 10:c52abd2d6595 71 struct Message &message;
jksoft 10:c52abd2d6595 72 MQTTString &topicName;
jksoft 10:c52abd2d6595 73 };
jksoft 10:c52abd2d6595 74
jksoft 10:c52abd2d6595 75
/**
 * Generator of packet identifiers.
 *
 * Identifiers run 1, 2, ..., MAX_PACKET_ID and then wrap back to 1; 0 is never returned.
 */
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /**
     * Advance to the next identifier and return it.
     * @return the new identifier, in the range 1..MAX_PACKET_ID
     */
    int getNext()
    {
        // Compute the successor without modifying `next` twice in a single expression
        // (the previous `next = ... ? 1 : ++next` form is undefined before C++11).
        next = (next == MAX_PACKET_ID) ? 1 : next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // last identifier handed out; 0 before the first call
};
jksoft 10:c52abd2d6595 93
jksoft 10:c52abd2d6595 94
jksoft 10:c52abd2d6595 95 /**
jksoft 10:c52abd2d6595 96 * @class Client
jksoft 10:c52abd2d6595 97 * @brief blocking, non-threaded MQTT client API
jksoft 10:c52abd2d6595 98 *
jksoft 10:c52abd2d6595 99 * This version of the API blocks on all method calls, until they are complete. This means that only one
jksoft 10:c52abd2d6595 100 * MQTT request can be in process at any one time.
jksoft 10:c52abd2d6595 101 * @param Network a network class which supports send, receive
jksoft 10:c52abd2d6595 102 * @param Timer a timer class with the methods:
jksoft 10:c52abd2d6595 103 */
jksoft 10:c52abd2d6595 104 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE = 256, int MAX_MESSAGE_HANDLERS = 5>
jksoft 10:c52abd2d6595 105 class Client
jksoft 10:c52abd2d6595 106 {
jksoft 10:c52abd2d6595 107
jksoft 10:c52abd2d6595 108 public:
jksoft 10:c52abd2d6595 109
jksoft 10:c52abd2d6595 110 typedef void (*messageHandler)(MessageData&);
jksoft 10:c52abd2d6595 111
jksoft 10:c52abd2d6595 112 /** Construct the client
jksoft 10:c52abd2d6595 113 * @param network - pointer to an instance of the Network class - must be connected to the endpoint
jksoft 10:c52abd2d6595 114 * before calling MQTT connect
jksoft 10:c52abd2d6595 115 * @param limits an instance of the Limit class - to alter limits as required
jksoft 10:c52abd2d6595 116 */
jksoft 10:c52abd2d6595 117 Client(Network& network, unsigned int command_timeout_ms = 30000);
jksoft 10:c52abd2d6595 118
jksoft 10:c52abd2d6595 119 /** Set the default message handling callback - used for any message which does not match a subscription message handler
jksoft 10:c52abd2d6595 120 * @param mh - pointer to the callback function
jksoft 10:c52abd2d6595 121 */
jksoft 10:c52abd2d6595 122 void setDefaultMessageHandler(messageHandler mh)
jksoft 10:c52abd2d6595 123 {
jksoft 10:c52abd2d6595 124 defaultMessageHandler.attach(mh);
jksoft 10:c52abd2d6595 125 }
jksoft 10:c52abd2d6595 126
jksoft 10:c52abd2d6595 127 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
jksoft 10:c52abd2d6595 128 * The nework object must be connected to the network endpoint before calling this
jksoft 10:c52abd2d6595 129 * Default connect options are used
jksoft 10:c52abd2d6595 130 * @return success code -
jksoft 10:c52abd2d6595 131 */
jksoft 10:c52abd2d6595 132 int connect();
jksoft 10:c52abd2d6595 133
jksoft 10:c52abd2d6595 134 /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
jksoft 10:c52abd2d6595 135 * The nework object must be connected to the network endpoint before calling this
jksoft 10:c52abd2d6595 136 * @param options - connect options
jksoft 10:c52abd2d6595 137 * @return success code -
jksoft 10:c52abd2d6595 138 */
jksoft 10:c52abd2d6595 139 int connect(MQTTPacket_connectData& options);
jksoft 10:c52abd2d6595 140
jksoft 10:c52abd2d6595 141 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
jksoft 10:c52abd2d6595 142 * @param topic - the topic to publish to
jksoft 10:c52abd2d6595 143 * @param message - the message to send
jksoft 10:c52abd2d6595 144 * @return success code -
jksoft 10:c52abd2d6595 145 */
jksoft 10:c52abd2d6595 146 int publish(const char* topicName, Message& message);
jksoft 10:c52abd2d6595 147
jksoft 10:c52abd2d6595 148 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
jksoft 10:c52abd2d6595 149 * @param topic - the topic to publish to
jksoft 10:c52abd2d6595 150 * @param payload - the data to send
jksoft 10:c52abd2d6595 151 * @param payloadlen - the length of the data
jksoft 10:c52abd2d6595 152 * @param qos - the QoS to send the publish at
jksoft 10:c52abd2d6595 153 * @param retained - whether the message should be retained
jksoft 10:c52abd2d6595 154 * @return success code -
jksoft 10:c52abd2d6595 155 */
jksoft 10:c52abd2d6595 156 int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
jksoft 10:c52abd2d6595 157
jksoft 10:c52abd2d6595 158 /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
jksoft 10:c52abd2d6595 159 * @param topic - the topic to publish to
jksoft 10:c52abd2d6595 160 * @param payload - the data to send
jksoft 10:c52abd2d6595 161 * @param payloadlen - the length of the data
jksoft 10:c52abd2d6595 162 * @param id - the packet id used - returned
jksoft 10:c52abd2d6595 163 * @param qos - the QoS to send the publish at
jksoft 10:c52abd2d6595 164 * @param retained - whether the message should be retained
jksoft 10:c52abd2d6595 165 * @return success code -
jksoft 10:c52abd2d6595 166 */
jksoft 10:c52abd2d6595 167 int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
jksoft 10:c52abd2d6595 168
jksoft 10:c52abd2d6595 169 /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback
jksoft 10:c52abd2d6595 170 * @param topicFilter - a topic pattern which can include wildcards
jksoft 10:c52abd2d6595 171 * @param qos - the MQTT QoS to subscribe at
jksoft 10:c52abd2d6595 172 * @param mh - the callback function to be invoked when a message is received for this subscription
jksoft 10:c52abd2d6595 173 * @return success code -
jksoft 10:c52abd2d6595 174 */
jksoft 10:c52abd2d6595 175 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh);
jksoft 10:c52abd2d6595 176
jksoft 10:c52abd2d6595 177 /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback
jksoft 10:c52abd2d6595 178 * @param topicFilter - a topic pattern which can include wildcards
jksoft 10:c52abd2d6595 179 * @return success code -
jksoft 10:c52abd2d6595 180 */
jksoft 10:c52abd2d6595 181 int unsubscribe(const char* topicFilter);
jksoft 10:c52abd2d6595 182
jksoft 10:c52abd2d6595 183 /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state
jksoft 10:c52abd2d6595 184 * @return success code -
jksoft 10:c52abd2d6595 185 */
jksoft 10:c52abd2d6595 186 int disconnect();
jksoft 10:c52abd2d6595 187
jksoft 10:c52abd2d6595 188 /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive
jksoft 10:c52abd2d6595 189 * yield can be called if no other MQTT operation is needed. This will also allow messages to be
jksoft 10:c52abd2d6595 190 * received.
jksoft 10:c52abd2d6595 191 * @param timeout_ms the time to wait, in milliseconds
jksoft 10:c52abd2d6595 192 * @return success code - on failure, this means the client has disconnected
jksoft 10:c52abd2d6595 193 */
jksoft 10:c52abd2d6595 194 int yield(unsigned long timeout_ms = 1000L);
jksoft 10:c52abd2d6595 195
jksoft 10:c52abd2d6595 196 /** Is the client connected?
jksoft 10:c52abd2d6595 197 * @return flag - is the client connected or not?
jksoft 10:c52abd2d6595 198 */
jksoft 10:c52abd2d6595 199 bool isConnected()
jksoft 10:c52abd2d6595 200 {
jksoft 10:c52abd2d6595 201 return isconnected;
jksoft 10:c52abd2d6595 202 }
jksoft 10:c52abd2d6595 203
jksoft 10:c52abd2d6595 204 private:
jksoft 10:c52abd2d6595 205
jksoft 10:c52abd2d6595 206 void cleanSession();
jksoft 10:c52abd2d6595 207 int cycle(OldTimer& timer);
jksoft 10:c52abd2d6595 208 int waitfor(int packet_type, OldTimer& timer);
jksoft 10:c52abd2d6595 209 int keepalive();
jksoft 10:c52abd2d6595 210 int publish(int len, OldTimer& timer, enum QoS qos);
jksoft 10:c52abd2d6595 211
jksoft 10:c52abd2d6595 212 int decodePacket(int* value, int timeout);
jksoft 10:c52abd2d6595 213 int readPacket(OldTimer& timer);
jksoft 10:c52abd2d6595 214 int sendPacket(int length, OldTimer& timer);
jksoft 10:c52abd2d6595 215 int deliverMessage(MQTTString& topicName, Message& message);
jksoft 10:c52abd2d6595 216 bool isTopicMatched(char* topicFilter, MQTTString& topicName);
jksoft 10:c52abd2d6595 217
jksoft 10:c52abd2d6595 218 Network& ipstack;
jksoft 10:c52abd2d6595 219 unsigned long command_timeout_ms;
jksoft 10:c52abd2d6595 220
jksoft 10:c52abd2d6595 221 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
jksoft 10:c52abd2d6595 222 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
jksoft 10:c52abd2d6595 223
jksoft 10:c52abd2d6595 224 OldTimer last_sent, last_received;
jksoft 10:c52abd2d6595 225 unsigned int keepAliveInterval;
jksoft 10:c52abd2d6595 226 bool ping_outstanding;
jksoft 10:c52abd2d6595 227 bool cleansession;
jksoft 10:c52abd2d6595 228
jksoft 10:c52abd2d6595 229 PacketId packetid;
jksoft 10:c52abd2d6595 230
jksoft 10:c52abd2d6595 231 struct MessageHandlers
jksoft 10:c52abd2d6595 232 {
jksoft 10:c52abd2d6595 233 const char* topicFilter;
jksoft 11:278ba4de2e99 234 _FP<void, MessageData&> fp;
jksoft 10:c52abd2d6595 235 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic
jksoft 10:c52abd2d6595 236
jksoft 11:278ba4de2e99 237 _FP<void, MessageData&> defaultMessageHandler;
jksoft 10:c52abd2d6595 238
jksoft 10:c52abd2d6595 239 bool isconnected;
jksoft 10:c52abd2d6595 240
jksoft 10:c52abd2d6595 241 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 242 unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect
jksoft 10:c52abd2d6595 243 int inflightLen;
jksoft 10:c52abd2d6595 244 unsigned short inflightMsgid;
jksoft 10:c52abd2d6595 245 enum QoS inflightQoS;
jksoft 10:c52abd2d6595 246 #endif
jksoft 10:c52abd2d6595 247
jksoft 10:c52abd2d6595 248 #if MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 249 bool pubrel;
jksoft 10:c52abd2d6595 250 #if !defined(MAX_INCOMING_QOS2_MESSAGES)
jksoft 10:c52abd2d6595 251 #define MAX_INCOMING_QOS2_MESSAGES 10
jksoft 10:c52abd2d6595 252 #endif
jksoft 10:c52abd2d6595 253 unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES];
jksoft 10:c52abd2d6595 254 bool isQoS2msgidFree(unsigned short id);
jksoft 10:c52abd2d6595 255 bool useQoS2msgid(unsigned short id);
jksoft 10:c52abd2d6595 256 void freeQoS2msgid(unsigned short id);
jksoft 10:c52abd2d6595 257 #endif
jksoft 10:c52abd2d6595 258
jksoft 10:c52abd2d6595 259 };
jksoft 10:c52abd2d6595 260
jksoft 10:c52abd2d6595 261 }
jksoft 10:c52abd2d6595 262
jksoft 10:c52abd2d6595 263
jksoft 10:c52abd2d6595 264 template<class Network, class OldTimer, int a, int MAX_MESSAGE_HANDLERS>
jksoft 10:c52abd2d6595 265 void MQTT::Client<Network, OldTimer, a, MAX_MESSAGE_HANDLERS>::cleanSession()
jksoft 10:c52abd2d6595 266 {
jksoft 10:c52abd2d6595 267 ping_outstanding = false;
jksoft 10:c52abd2d6595 268 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jksoft 10:c52abd2d6595 269 messageHandlers[i].topicFilter = 0;
jksoft 10:c52abd2d6595 270 isconnected = false;
jksoft 10:c52abd2d6595 271
jksoft 10:c52abd2d6595 272 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 273 inflightMsgid = 0;
jksoft 10:c52abd2d6595 274 inflightQoS = QOS0;
jksoft 10:c52abd2d6595 275 #endif
jksoft 10:c52abd2d6595 276
jksoft 10:c52abd2d6595 277 #if MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 278 pubrel = false;
jksoft 10:c52abd2d6595 279 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jksoft 10:c52abd2d6595 280 incomingQoS2messages[i] = 0;
jksoft 10:c52abd2d6595 281 #endif
jksoft 10:c52abd2d6595 282 }
jksoft 10:c52abd2d6595 283
jksoft 10:c52abd2d6595 284
jksoft 10:c52abd2d6595 285 template<class Network, class OldTimer, int a, int MAX_MESSAGE_HANDLERS>
jksoft 10:c52abd2d6595 286 MQTT::Client<Network, OldTimer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid()
jksoft 10:c52abd2d6595 287 {
jksoft 10:c52abd2d6595 288 last_sent = OldTimer();
jksoft 10:c52abd2d6595 289 last_received = OldTimer();
jksoft 10:c52abd2d6595 290 this->command_timeout_ms = command_timeout_ms;
jksoft 10:c52abd2d6595 291 cleanSession();
jksoft 10:c52abd2d6595 292 }
jksoft 10:c52abd2d6595 293
jksoft 10:c52abd2d6595 294
jksoft 10:c52abd2d6595 295 #if MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 296 template<class Network, class OldTimer, int a, int b>
jksoft 10:c52abd2d6595 297 bool MQTT::Client<Network, OldTimer, a, b>::isQoS2msgidFree(unsigned short id)
jksoft 10:c52abd2d6595 298 {
jksoft 10:c52abd2d6595 299 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jksoft 10:c52abd2d6595 300 {
jksoft 10:c52abd2d6595 301 if (incomingQoS2messages[i] == id)
jksoft 10:c52abd2d6595 302 return false;
jksoft 10:c52abd2d6595 303 }
jksoft 10:c52abd2d6595 304 return true;
jksoft 10:c52abd2d6595 305 }
jksoft 10:c52abd2d6595 306
jksoft 10:c52abd2d6595 307
jksoft 10:c52abd2d6595 308 template<class Network, class OldTimer, int a, int b>
jksoft 10:c52abd2d6595 309 bool MQTT::Client<Network, OldTimer, a, b>::useQoS2msgid(unsigned short id)
jksoft 10:c52abd2d6595 310 {
jksoft 10:c52abd2d6595 311 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jksoft 10:c52abd2d6595 312 {
jksoft 10:c52abd2d6595 313 if (incomingQoS2messages[i] == 0)
jksoft 10:c52abd2d6595 314 {
jksoft 10:c52abd2d6595 315 incomingQoS2messages[i] = id;
jksoft 10:c52abd2d6595 316 return true;
jksoft 10:c52abd2d6595 317 }
jksoft 10:c52abd2d6595 318 }
jksoft 10:c52abd2d6595 319 return false;
jksoft 10:c52abd2d6595 320 }
jksoft 10:c52abd2d6595 321
jksoft 10:c52abd2d6595 322
jksoft 10:c52abd2d6595 323 template<class Network, class OldTimer, int a, int b>
jksoft 10:c52abd2d6595 324 void MQTT::Client<Network, OldTimer, a, b>::freeQoS2msgid(unsigned short id)
jksoft 10:c52abd2d6595 325 {
jksoft 10:c52abd2d6595 326 for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i)
jksoft 10:c52abd2d6595 327 {
jksoft 10:c52abd2d6595 328 if (incomingQoS2messages[i] == id)
jksoft 10:c52abd2d6595 329 {
jksoft 10:c52abd2d6595 330 incomingQoS2messages[i] = 0;
jksoft 10:c52abd2d6595 331 return;
jksoft 10:c52abd2d6595 332 }
jksoft 10:c52abd2d6595 333 }
jksoft 10:c52abd2d6595 334 }
jksoft 10:c52abd2d6595 335 #endif
jksoft 10:c52abd2d6595 336
jksoft 10:c52abd2d6595 337
jksoft 10:c52abd2d6595 338 template<class Network, class OldTimer, int a, int b>
jksoft 10:c52abd2d6595 339 int MQTT::Client<Network, OldTimer, a, b>::sendPacket(int length, OldTimer& timer)
jksoft 10:c52abd2d6595 340 {
jksoft 10:c52abd2d6595 341 int rc = FAILURE,
jksoft 10:c52abd2d6595 342 sent = 0;
jksoft 10:c52abd2d6595 343
jksoft 10:c52abd2d6595 344 while (sent < length && !timer.expired())
jksoft 10:c52abd2d6595 345 {
jksoft 10:c52abd2d6595 346 rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms());
jksoft 10:c52abd2d6595 347 if (rc < 0) // there was an error writing the data
jksoft 10:c52abd2d6595 348 break;
jksoft 10:c52abd2d6595 349 sent += rc;
jksoft 10:c52abd2d6595 350 }
jksoft 10:c52abd2d6595 351 if (sent == length)
jksoft 10:c52abd2d6595 352 {
jksoft 10:c52abd2d6595 353 if (this->keepAliveInterval > 0)
jksoft 10:c52abd2d6595 354 last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet
jksoft 10:c52abd2d6595 355 rc = SUCCESS;
jksoft 10:c52abd2d6595 356 }
jksoft 10:c52abd2d6595 357 else
jksoft 10:c52abd2d6595 358 rc = FAILURE;
jksoft 10:c52abd2d6595 359
jksoft 10:c52abd2d6595 360 #if defined(MQTT_DEBUG)
jksoft 10:c52abd2d6595 361 char printbuf[150];
jksoft 10:c52abd2d6595 362 DEBUG("Rc %d from sending packet %s\n", rc, MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length));
jksoft 10:c52abd2d6595 363 #endif
jksoft 10:c52abd2d6595 364 return rc;
jksoft 10:c52abd2d6595 365 }
jksoft 10:c52abd2d6595 366
jksoft 10:c52abd2d6595 367
jksoft 10:c52abd2d6595 368 template<class Network, class OldTimer, int a, int b>
jksoft 10:c52abd2d6595 369 int MQTT::Client<Network, OldTimer, a, b>::decodePacket(int* value, int timeout)
jksoft 10:c52abd2d6595 370 {
jksoft 10:c52abd2d6595 371 unsigned char c;
jksoft 10:c52abd2d6595 372 int multiplier = 1;
jksoft 10:c52abd2d6595 373 int len = 0;
jksoft 10:c52abd2d6595 374 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
jksoft 10:c52abd2d6595 375
jksoft 10:c52abd2d6595 376 *value = 0;
jksoft 10:c52abd2d6595 377 do
jksoft 10:c52abd2d6595 378 {
jksoft 10:c52abd2d6595 379 int rc = MQTTPACKET_READ_ERROR;
jksoft 10:c52abd2d6595 380
jksoft 10:c52abd2d6595 381 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
jksoft 10:c52abd2d6595 382 {
jksoft 10:c52abd2d6595 383 rc = MQTTPACKET_READ_ERROR; /* bad data */
jksoft 10:c52abd2d6595 384 goto exit;
jksoft 10:c52abd2d6595 385 }
jksoft 10:c52abd2d6595 386 rc = ipstack.read(&c, 1, timeout);
jksoft 10:c52abd2d6595 387 if (rc != 1)
jksoft 10:c52abd2d6595 388 goto exit;
jksoft 10:c52abd2d6595 389 *value += (c & 127) * multiplier;
jksoft 10:c52abd2d6595 390 multiplier *= 128;
jksoft 10:c52abd2d6595 391 } while ((c & 128) != 0);
jksoft 10:c52abd2d6595 392 exit:
jksoft 10:c52abd2d6595 393 return len;
jksoft 10:c52abd2d6595 394 }
jksoft 10:c52abd2d6595 395
jksoft 10:c52abd2d6595 396
jksoft 10:c52abd2d6595 397 /**
jksoft 10:c52abd2d6595 398 * If any read fails in this method, then we should disconnect from the network, as on reconnect
jksoft 10:c52abd2d6595 399 * the packets can be retried.
jksoft 10:c52abd2d6595 400 * @param timeout the max time to wait for the packet read to complete, in milliseconds
jksoft 10:c52abd2d6595 401 * @return the MQTT packet type, or -1 if none
jksoft 10:c52abd2d6595 402 */
jksoft 10:c52abd2d6595 403 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
jksoft 10:c52abd2d6595 404 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::readPacket(OldTimer& timer)
jksoft 10:c52abd2d6595 405 {
jksoft 10:c52abd2d6595 406 int rc = FAILURE;
jksoft 10:c52abd2d6595 407 MQTTHeader header = {0};
jksoft 10:c52abd2d6595 408 int len = 0;
jksoft 10:c52abd2d6595 409 int rem_len = 0;
jksoft 10:c52abd2d6595 410
jksoft 10:c52abd2d6595 411 /* 1. read the header byte. This has the packet type in it */
jksoft 10:c52abd2d6595 412 if (ipstack.read(readbuf, 1, timer.left_ms()) != 1)
jksoft 10:c52abd2d6595 413 goto exit;
jksoft 10:c52abd2d6595 414
jksoft 10:c52abd2d6595 415 len = 1;
jksoft 10:c52abd2d6595 416 /* 2. read the remaining length. This is variable in itself */
jksoft 10:c52abd2d6595 417 decodePacket(&rem_len, timer.left_ms());
jksoft 10:c52abd2d6595 418 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
jksoft 10:c52abd2d6595 419
jksoft 10:c52abd2d6595 420 if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
jksoft 10:c52abd2d6595 421 {
jksoft 10:c52abd2d6595 422 rc = BUFFER_OVERFLOW;
jksoft 10:c52abd2d6595 423 goto exit;
jksoft 10:c52abd2d6595 424 }
jksoft 10:c52abd2d6595 425
jksoft 10:c52abd2d6595 426 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
jksoft 10:c52abd2d6595 427 if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len))
jksoft 10:c52abd2d6595 428 goto exit;
jksoft 10:c52abd2d6595 429
jksoft 10:c52abd2d6595 430 header.byte = readbuf[0];
jksoft 10:c52abd2d6595 431 rc = header.bits.type;
jksoft 10:c52abd2d6595 432 if (this->keepAliveInterval > 0)
jksoft 10:c52abd2d6595 433 last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet
jksoft 10:c52abd2d6595 434 exit:
jksoft 10:c52abd2d6595 435
jksoft 10:c52abd2d6595 436 #if defined(MQTT_DEBUG)
jksoft 10:c52abd2d6595 437 if (rc >= 0)
jksoft 10:c52abd2d6595 438 {
jksoft 10:c52abd2d6595 439 char printbuf[50];
jksoft 10:c52abd2d6595 440 DEBUG("Rc %d from receiving packet %s\n", rc, MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len));
jksoft 10:c52abd2d6595 441 }
jksoft 10:c52abd2d6595 442 #endif
jksoft 10:c52abd2d6595 443 return rc;
jksoft 10:c52abd2d6595 444 }
jksoft 10:c52abd2d6595 445
jksoft 10:c52abd2d6595 446
jksoft 10:c52abd2d6595 447 // assume topic filter and name is in correct format
jksoft 10:c52abd2d6595 448 // # can only be at end
jksoft 10:c52abd2d6595 449 // + and # can only be next to separator
jksoft 10:c52abd2d6595 450 template<class Network, class OldTimer, int a, int b>
jksoft 10:c52abd2d6595 451 bool MQTT::Client<Network, OldTimer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName)
jksoft 10:c52abd2d6595 452 {
jksoft 10:c52abd2d6595 453 char* curf = topicFilter;
jksoft 10:c52abd2d6595 454 char* curn = topicName.lenstring.data;
jksoft 10:c52abd2d6595 455 char* curn_end = curn + topicName.lenstring.len;
jksoft 10:c52abd2d6595 456
jksoft 10:c52abd2d6595 457 while (*curf && curn < curn_end)
jksoft 10:c52abd2d6595 458 {
jksoft 10:c52abd2d6595 459 if (*curn == '/' && *curf != '/')
jksoft 10:c52abd2d6595 460 break;
jksoft 10:c52abd2d6595 461 if (*curf != '+' && *curf != '#' && *curf != *curn)
jksoft 10:c52abd2d6595 462 break;
jksoft 10:c52abd2d6595 463 if (*curf == '+')
jksoft 10:c52abd2d6595 464 { // skip until we meet the next separator, or end of string
jksoft 10:c52abd2d6595 465 char* nextpos = curn + 1;
jksoft 10:c52abd2d6595 466 while (nextpos < curn_end && *nextpos != '/')
jksoft 10:c52abd2d6595 467 nextpos = ++curn + 1;
jksoft 10:c52abd2d6595 468 }
jksoft 10:c52abd2d6595 469 else if (*curf == '#')
jksoft 10:c52abd2d6595 470 curn = curn_end - 1; // skip until end of string
jksoft 10:c52abd2d6595 471 curf++;
jksoft 10:c52abd2d6595 472 curn++;
jksoft 10:c52abd2d6595 473 };
jksoft 10:c52abd2d6595 474
jksoft 10:c52abd2d6595 475 return (curn == curn_end) && (*curf == '\0');
jksoft 10:c52abd2d6595 476 }
jksoft 10:c52abd2d6595 477
jksoft 10:c52abd2d6595 478
jksoft 10:c52abd2d6595 479
jksoft 10:c52abd2d6595 480 template<class Network, class OldTimer, int a, int MAX_MESSAGE_HANDLERS>
jksoft 10:c52abd2d6595 481 int MQTT::Client<Network, OldTimer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message)
jksoft 10:c52abd2d6595 482 {
jksoft 10:c52abd2d6595 483 int rc = FAILURE;
jksoft 10:c52abd2d6595 484
jksoft 10:c52abd2d6595 485 // we have to find the right message handler - indexed by topic
jksoft 10:c52abd2d6595 486 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jksoft 10:c52abd2d6595 487 {
jksoft 10:c52abd2d6595 488 if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) ||
jksoft 10:c52abd2d6595 489 isTopicMatched((char*)messageHandlers[i].topicFilter, topicName)))
jksoft 10:c52abd2d6595 490 {
jksoft 10:c52abd2d6595 491 if (messageHandlers[i].fp.attached())
jksoft 10:c52abd2d6595 492 {
jksoft 10:c52abd2d6595 493 MessageData md(topicName, message);
jksoft 10:c52abd2d6595 494 messageHandlers[i].fp(md);
jksoft 10:c52abd2d6595 495 rc = SUCCESS;
jksoft 10:c52abd2d6595 496 }
jksoft 10:c52abd2d6595 497 }
jksoft 10:c52abd2d6595 498 }
jksoft 10:c52abd2d6595 499
jksoft 10:c52abd2d6595 500 if (rc == FAILURE && defaultMessageHandler.attached())
jksoft 10:c52abd2d6595 501 {
jksoft 10:c52abd2d6595 502 MessageData md(topicName, message);
jksoft 10:c52abd2d6595 503 defaultMessageHandler(md);
jksoft 10:c52abd2d6595 504 rc = SUCCESS;
jksoft 10:c52abd2d6595 505 }
jksoft 10:c52abd2d6595 506
jksoft 10:c52abd2d6595 507 return rc;
jksoft 10:c52abd2d6595 508 }
jksoft 10:c52abd2d6595 509
jksoft 10:c52abd2d6595 510
jksoft 10:c52abd2d6595 511
jksoft 10:c52abd2d6595 512 template<class Network, class OldTimer, int a, int b>
jksoft 10:c52abd2d6595 513 int MQTT::Client<Network, OldTimer, a, b>::yield(unsigned long timeout_ms)
jksoft 10:c52abd2d6595 514 {
jksoft 10:c52abd2d6595 515 int rc = SUCCESS;
jksoft 10:c52abd2d6595 516 OldTimer timer = OldTimer();
jksoft 10:c52abd2d6595 517
jksoft 10:c52abd2d6595 518 timer.countdown_ms(timeout_ms);
jksoft 10:c52abd2d6595 519 while (!timer.expired())
jksoft 10:c52abd2d6595 520 {
jksoft 10:c52abd2d6595 521 if (cycle(timer) < 0)
jksoft 10:c52abd2d6595 522 {
jksoft 10:c52abd2d6595 523 rc = FAILURE;
jksoft 10:c52abd2d6595 524 break;
jksoft 10:c52abd2d6595 525 }
jksoft 10:c52abd2d6595 526 Thread::wait(10);
jksoft 10:c52abd2d6595 527 }
jksoft 10:c52abd2d6595 528
jksoft 10:c52abd2d6595 529 return rc;
jksoft 10:c52abd2d6595 530 }
jksoft 10:c52abd2d6595 531
jksoft 10:c52abd2d6595 532
jksoft 10:c52abd2d6595 533 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
jksoft 10:c52abd2d6595 534 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::cycle(OldTimer& timer)
jksoft 10:c52abd2d6595 535 {
jksoft 10:c52abd2d6595 536 /* get one piece of work off the wire and one pass through */
jksoft 10:c52abd2d6595 537
jksoft 10:c52abd2d6595 538 // read the socket, see what work is due
jksoft 10:c52abd2d6595 539 int packet_type = readPacket(timer);
jksoft 10:c52abd2d6595 540
jksoft 10:c52abd2d6595 541 int len = 0,
jksoft 10:c52abd2d6595 542 rc = SUCCESS;
jksoft 10:c52abd2d6595 543
jksoft 10:c52abd2d6595 544 switch (packet_type)
jksoft 10:c52abd2d6595 545 {
jksoft 10:c52abd2d6595 546 case FAILURE:
jksoft 10:c52abd2d6595 547 case BUFFER_OVERFLOW:
jksoft 10:c52abd2d6595 548 rc = packet_type;
jksoft 10:c52abd2d6595 549 break;
jksoft 10:c52abd2d6595 550 case CONNACK:
jksoft 10:c52abd2d6595 551 case PUBACK:
jksoft 10:c52abd2d6595 552 case SUBACK:
jksoft 10:c52abd2d6595 553 break;
jksoft 10:c52abd2d6595 554 case PUBLISH:
jksoft 10:c52abd2d6595 555 {
jksoft 10:c52abd2d6595 556 MQTTString topicName = MQTTString_initializer;
jksoft 10:c52abd2d6595 557 Message msg;
jksoft 10:c52abd2d6595 558 int intQoS;
jksoft 10:c52abd2d6595 559 if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName,
jksoft 10:c52abd2d6595 560 (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jksoft 10:c52abd2d6595 561 goto exit;
jksoft 10:c52abd2d6595 562 msg.qos = (enum QoS)intQoS;
jksoft 10:c52abd2d6595 563 #if MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 564 if (msg.qos != QOS2)
jksoft 10:c52abd2d6595 565 #endif
jksoft 10:c52abd2d6595 566 deliverMessage(topicName, msg);
jksoft 10:c52abd2d6595 567 #if MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 568 else if (isQoS2msgidFree(msg.id))
jksoft 10:c52abd2d6595 569 {
jksoft 10:c52abd2d6595 570 if (useQoS2msgid(msg.id))
jksoft 10:c52abd2d6595 571 deliverMessage(topicName, msg);
jksoft 10:c52abd2d6595 572 else
jksoft 10:c52abd2d6595 573 WARN("Maximum number of incoming QoS2 messages exceeded");
jksoft 10:c52abd2d6595 574 }
jksoft 10:c52abd2d6595 575 #endif
jksoft 10:c52abd2d6595 576 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 577 if (msg.qos != QOS0)
jksoft 10:c52abd2d6595 578 {
jksoft 10:c52abd2d6595 579 if (msg.qos == QOS1)
jksoft 10:c52abd2d6595 580 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id);
jksoft 10:c52abd2d6595 581 else if (msg.qos == QOS2)
jksoft 10:c52abd2d6595 582 len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id);
jksoft 10:c52abd2d6595 583 if (len <= 0)
jksoft 10:c52abd2d6595 584 rc = FAILURE;
jksoft 10:c52abd2d6595 585 else
jksoft 10:c52abd2d6595 586 rc = sendPacket(len, timer);
jksoft 10:c52abd2d6595 587 if (rc == FAILURE)
jksoft 10:c52abd2d6595 588 goto exit; // there was a problem
jksoft 10:c52abd2d6595 589 }
jksoft 10:c52abd2d6595 590 break;
jksoft 10:c52abd2d6595 591 #endif
jksoft 10:c52abd2d6595 592 }
jksoft 10:c52abd2d6595 593 #if MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 594 case PUBREC:
jksoft 10:c52abd2d6595 595 case PUBREL:
jksoft 10:c52abd2d6595 596 unsigned short mypacketid;
jksoft 10:c52abd2d6595 597 unsigned char dup, type;
jksoft 10:c52abd2d6595 598 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jksoft 10:c52abd2d6595 599 rc = FAILURE;
jksoft 10:c52abd2d6595 600 else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE,
jksoft 10:c52abd2d6595 601 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
jksoft 10:c52abd2d6595 602 rc = FAILURE;
jksoft 10:c52abd2d6595 603 else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet
jksoft 10:c52abd2d6595 604 rc = FAILURE; // there was a problem
jksoft 10:c52abd2d6595 605 if (rc == FAILURE)
jksoft 10:c52abd2d6595 606 goto exit; // there was a problem
jksoft 10:c52abd2d6595 607 if (packet_type == PUBREL)
jksoft 10:c52abd2d6595 608 freeQoS2msgid(mypacketid);
jksoft 10:c52abd2d6595 609 break;
jksoft 10:c52abd2d6595 610
jksoft 10:c52abd2d6595 611 case PUBCOMP:
jksoft 10:c52abd2d6595 612 break;
jksoft 10:c52abd2d6595 613 #endif
jksoft 10:c52abd2d6595 614 case PINGRESP:
jksoft 10:c52abd2d6595 615 ping_outstanding = false;
jksoft 10:c52abd2d6595 616 break;
jksoft 10:c52abd2d6595 617 }
jksoft 10:c52abd2d6595 618 keepalive();
jksoft 10:c52abd2d6595 619 exit:
jksoft 10:c52abd2d6595 620 if (rc == SUCCESS)
jksoft 10:c52abd2d6595 621 rc = packet_type;
jksoft 10:c52abd2d6595 622 return rc;
jksoft 10:c52abd2d6595 623 }
jksoft 10:c52abd2d6595 624
jksoft 10:c52abd2d6595 625
jksoft 10:c52abd2d6595 626 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
jksoft 10:c52abd2d6595 627 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::keepalive()
jksoft 10:c52abd2d6595 628 {
jksoft 10:c52abd2d6595 629 int rc = FAILURE;
jksoft 10:c52abd2d6595 630
jksoft 10:c52abd2d6595 631 if (keepAliveInterval == 0)
jksoft 10:c52abd2d6595 632 {
jksoft 10:c52abd2d6595 633 rc = SUCCESS;
jksoft 10:c52abd2d6595 634 goto exit;
jksoft 10:c52abd2d6595 635 }
jksoft 10:c52abd2d6595 636
jksoft 10:c52abd2d6595 637 if (last_sent.expired() || last_received.expired())
jksoft 10:c52abd2d6595 638 {
jksoft 10:c52abd2d6595 639 if (!ping_outstanding)
jksoft 10:c52abd2d6595 640 {
jksoft 10:c52abd2d6595 641 OldTimer timer(1000);
jksoft 10:c52abd2d6595 642 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
jksoft 10:c52abd2d6595 643 if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet
jksoft 10:c52abd2d6595 644 ping_outstanding = true;
jksoft 10:c52abd2d6595 645 }
jksoft 10:c52abd2d6595 646 }
jksoft 10:c52abd2d6595 647
jksoft 10:c52abd2d6595 648 exit:
jksoft 10:c52abd2d6595 649 return rc;
jksoft 10:c52abd2d6595 650 }
jksoft 10:c52abd2d6595 651
jksoft 10:c52abd2d6595 652
jksoft 10:c52abd2d6595 653 // only used in single-threaded mode where one command at a time is in process
jksoft 10:c52abd2d6595 654 template<class Network, class OldTimer, int a, int b>
jksoft 10:c52abd2d6595 655 int MQTT::Client<Network, OldTimer, a, b>::waitfor(int packet_type, OldTimer& timer)
jksoft 10:c52abd2d6595 656 {
jksoft 10:c52abd2d6595 657 int rc = FAILURE;
jksoft 10:c52abd2d6595 658
jksoft 10:c52abd2d6595 659 do
jksoft 10:c52abd2d6595 660 {
jksoft 10:c52abd2d6595 661 if (timer.expired())
jksoft 10:c52abd2d6595 662 break; // we timed out
jksoft 10:c52abd2d6595 663 }
jksoft 10:c52abd2d6595 664 while ((rc = cycle(timer)) != packet_type);
jksoft 10:c52abd2d6595 665
jksoft 10:c52abd2d6595 666 return rc;
jksoft 10:c52abd2d6595 667 }
jksoft 10:c52abd2d6595 668
jksoft 10:c52abd2d6595 669
jksoft 10:c52abd2d6595 670 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
jksoft 10:c52abd2d6595 671 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options)
jksoft 10:c52abd2d6595 672 {
jksoft 10:c52abd2d6595 673 OldTimer connect_timer(command_timeout_ms);
jksoft 10:c52abd2d6595 674 int rc = FAILURE;
jksoft 10:c52abd2d6595 675 int len = 0;
jksoft 10:c52abd2d6595 676
jksoft 10:c52abd2d6595 677 if (isconnected) // don't send connect packet again if we are already connected
jksoft 10:c52abd2d6595 678 goto exit;
jksoft 10:c52abd2d6595 679
jksoft 10:c52abd2d6595 680 this->keepAliveInterval = options.keepAliveInterval;
jksoft 10:c52abd2d6595 681 this->cleansession = options.cleansession;
jksoft 10:c52abd2d6595 682 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
jksoft 10:c52abd2d6595 683 goto exit;
jksoft 10:c52abd2d6595 684 if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet
jksoft 10:c52abd2d6595 685 goto exit; // there was a problem
jksoft 10:c52abd2d6595 686
jksoft 10:c52abd2d6595 687 if (this->keepAliveInterval > 0)
jksoft 10:c52abd2d6595 688 last_received.countdown(this->keepAliveInterval);
jksoft 10:c52abd2d6595 689 // this will be a blocking call, wait for the connack
jksoft 10:c52abd2d6595 690 if (waitfor(CONNACK, connect_timer) == CONNACK)
jksoft 10:c52abd2d6595 691 {
jksoft 10:c52abd2d6595 692 unsigned char connack_rc = 255;
jksoft 10:c52abd2d6595 693 bool sessionPresent = false;
jksoft 10:c52abd2d6595 694 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
jksoft 10:c52abd2d6595 695 rc = connack_rc;
jksoft 10:c52abd2d6595 696 else
jksoft 10:c52abd2d6595 697 rc = FAILURE;
jksoft 10:c52abd2d6595 698 }
jksoft 10:c52abd2d6595 699 else
jksoft 10:c52abd2d6595 700 rc = FAILURE;
jksoft 10:c52abd2d6595 701
jksoft 10:c52abd2d6595 702 #if MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 703 // resend any inflight publish
jksoft 10:c52abd2d6595 704 if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel)
jksoft 10:c52abd2d6595 705 {
jksoft 10:c52abd2d6595 706 if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0)
jksoft 10:c52abd2d6595 707 rc = FAILURE;
jksoft 10:c52abd2d6595 708 else
jksoft 10:c52abd2d6595 709 rc = publish(len, connect_timer, inflightQoS);
jksoft 10:c52abd2d6595 710 }
jksoft 10:c52abd2d6595 711 else
jksoft 10:c52abd2d6595 712 #endif
jksoft 10:c52abd2d6595 713 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 714 if (inflightMsgid > 0)
jksoft 10:c52abd2d6595 715 {
jksoft 10:c52abd2d6595 716 memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE);
jksoft 10:c52abd2d6595 717 rc = publish(inflightLen, connect_timer, inflightQoS);
jksoft 10:c52abd2d6595 718 }
jksoft 10:c52abd2d6595 719 #endif
jksoft 10:c52abd2d6595 720
jksoft 10:c52abd2d6595 721 exit:
jksoft 10:c52abd2d6595 722 if (rc == SUCCESS)
jksoft 10:c52abd2d6595 723 isconnected = true;
jksoft 10:c52abd2d6595 724 return rc;
jksoft 10:c52abd2d6595 725 }
jksoft 10:c52abd2d6595 726
jksoft 10:c52abd2d6595 727
jksoft 10:c52abd2d6595 728 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
jksoft 10:c52abd2d6595 729 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::connect()
jksoft 10:c52abd2d6595 730 {
jksoft 10:c52abd2d6595 731 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
jksoft 10:c52abd2d6595 732 return connect(default_options);
jksoft 10:c52abd2d6595 733 }
jksoft 10:c52abd2d6595 734
jksoft 10:c52abd2d6595 735
jksoft 10:c52abd2d6595 736 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
jksoft 10:c52abd2d6595 737 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler)
jksoft 10:c52abd2d6595 738 {
jksoft 10:c52abd2d6595 739 int rc = FAILURE;
jksoft 10:c52abd2d6595 740 OldTimer timer(command_timeout_ms);
jksoft 10:c52abd2d6595 741 int len = 0;
jksoft 10:c52abd2d6595 742 MQTTString topic = {(char*)topicFilter, {0, 0}};
jksoft 10:c52abd2d6595 743
jksoft 10:c52abd2d6595 744 if (!isconnected)
jksoft 10:c52abd2d6595 745 goto exit;
jksoft 10:c52abd2d6595 746
jksoft 10:c52abd2d6595 747 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
jksoft 10:c52abd2d6595 748 if (len <= 0)
jksoft 10:c52abd2d6595 749 goto exit;
jksoft 10:c52abd2d6595 750 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet
jksoft 10:c52abd2d6595 751 goto exit; // there was a problem
jksoft 10:c52abd2d6595 752
jksoft 10:c52abd2d6595 753 if (waitfor(SUBACK, timer) == SUBACK) // wait for suback
jksoft 10:c52abd2d6595 754 {
jksoft 10:c52abd2d6595 755 int count = 0, grantedQoS = -1;
jksoft 10:c52abd2d6595 756 unsigned short mypacketid;
jksoft 10:c52abd2d6595 757 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
jksoft 10:c52abd2d6595 758 rc = grantedQoS; // 0, 1, 2 or 0x80
jksoft 10:c52abd2d6595 759 if (rc != 0x80)
jksoft 10:c52abd2d6595 760 {
jksoft 10:c52abd2d6595 761 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jksoft 10:c52abd2d6595 762 {
jksoft 10:c52abd2d6595 763 if (messageHandlers[i].topicFilter == 0)
jksoft 10:c52abd2d6595 764 {
jksoft 10:c52abd2d6595 765 messageHandlers[i].topicFilter = topicFilter;
jksoft 10:c52abd2d6595 766 messageHandlers[i].fp.attach(messageHandler);
jksoft 10:c52abd2d6595 767 rc = 0;
jksoft 10:c52abd2d6595 768 break;
jksoft 10:c52abd2d6595 769 }
jksoft 10:c52abd2d6595 770 }
jksoft 10:c52abd2d6595 771 }
jksoft 10:c52abd2d6595 772 }
jksoft 10:c52abd2d6595 773 else
jksoft 10:c52abd2d6595 774 rc = FAILURE;
jksoft 10:c52abd2d6595 775
jksoft 10:c52abd2d6595 776 exit:
jksoft 10:c52abd2d6595 777 if (rc != SUCCESS)
jksoft 10:c52abd2d6595 778 cleanSession();
jksoft 10:c52abd2d6595 779 return rc;
jksoft 10:c52abd2d6595 780 }
jksoft 10:c52abd2d6595 781
jksoft 10:c52abd2d6595 782
jksoft 10:c52abd2d6595 783 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS>
jksoft 10:c52abd2d6595 784 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter)
jksoft 10:c52abd2d6595 785 {
jksoft 10:c52abd2d6595 786 int rc = FAILURE;
jksoft 10:c52abd2d6595 787 OldTimer timer(command_timeout_ms);
jksoft 10:c52abd2d6595 788 MQTTString topic = {(char*)topicFilter, {0, 0}};
jksoft 10:c52abd2d6595 789 int len = 0;
jksoft 10:c52abd2d6595 790
jksoft 10:c52abd2d6595 791 if (!isconnected)
jksoft 10:c52abd2d6595 792 goto exit;
jksoft 10:c52abd2d6595 793
jksoft 10:c52abd2d6595 794 if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0)
jksoft 10:c52abd2d6595 795 goto exit;
jksoft 10:c52abd2d6595 796 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet
jksoft 10:c52abd2d6595 797 goto exit; // there was a problem
jksoft 10:c52abd2d6595 798
jksoft 10:c52abd2d6595 799 if (waitfor(UNSUBACK, timer) == UNSUBACK)
jksoft 10:c52abd2d6595 800 {
jksoft 10:c52abd2d6595 801 unsigned short mypacketid; // should be the same as the packetid above
jksoft 10:c52abd2d6595 802 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
jksoft 10:c52abd2d6595 803 {
jksoft 10:c52abd2d6595 804 rc = 0;
jksoft 10:c52abd2d6595 805
jksoft 10:c52abd2d6595 806 // remove the subscription message handler associated with this topic, if there is one
jksoft 10:c52abd2d6595 807 for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
jksoft 10:c52abd2d6595 808 {
jksoft 10:c52abd2d6595 809 if (messageHandlers[i].topicFilter && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0)
jksoft 10:c52abd2d6595 810 {
jksoft 10:c52abd2d6595 811 messageHandlers[i].topicFilter = 0;
jksoft 10:c52abd2d6595 812 break;
jksoft 10:c52abd2d6595 813 }
jksoft 10:c52abd2d6595 814 }
jksoft 10:c52abd2d6595 815 }
jksoft 10:c52abd2d6595 816 }
jksoft 10:c52abd2d6595 817 else
jksoft 10:c52abd2d6595 818 rc = FAILURE;
jksoft 10:c52abd2d6595 819
jksoft 10:c52abd2d6595 820 exit:
jksoft 10:c52abd2d6595 821 if (rc != SUCCESS)
jksoft 10:c52abd2d6595 822 cleanSession();
jksoft 10:c52abd2d6595 823 return rc;
jksoft 10:c52abd2d6595 824 }
jksoft 10:c52abd2d6595 825
jksoft 10:c52abd2d6595 826
jksoft 10:c52abd2d6595 827 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
jksoft 10:c52abd2d6595 828 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, OldTimer& timer, enum QoS qos)
jksoft 10:c52abd2d6595 829 {
jksoft 10:c52abd2d6595 830 int rc;
jksoft 10:c52abd2d6595 831
jksoft 10:c52abd2d6595 832 if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet
jksoft 10:c52abd2d6595 833 goto exit; // there was a problem
jksoft 10:c52abd2d6595 834
jksoft 10:c52abd2d6595 835 #if MQTTCLIENT_QOS1
jksoft 10:c52abd2d6595 836 if (qos == QOS1)
jksoft 10:c52abd2d6595 837 {
jksoft 10:c52abd2d6595 838 if (waitfor(PUBACK, timer) == PUBACK)
jksoft 10:c52abd2d6595 839 {
jksoft 10:c52abd2d6595 840 unsigned short mypacketid;
jksoft 10:c52abd2d6595 841 unsigned char dup, type;
jksoft 10:c52abd2d6595 842 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jksoft 10:c52abd2d6595 843 rc = FAILURE;
jksoft 10:c52abd2d6595 844 else if (inflightMsgid == mypacketid)
jksoft 10:c52abd2d6595 845 inflightMsgid = 0;
jksoft 10:c52abd2d6595 846 }
jksoft 10:c52abd2d6595 847 else
jksoft 10:c52abd2d6595 848 rc = FAILURE;
jksoft 10:c52abd2d6595 849 }
jksoft 10:c52abd2d6595 850 #elif MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 851 else if (qos == QOS2)
jksoft 10:c52abd2d6595 852 {
jksoft 10:c52abd2d6595 853 if (waitfor(PUBCOMP, timer) == PUBCOMP)
jksoft 10:c52abd2d6595 854 {
jksoft 10:c52abd2d6595 855 unsigned short mypacketid;
jksoft 10:c52abd2d6595 856 unsigned char dup, type;
jksoft 10:c52abd2d6595 857 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
jksoft 10:c52abd2d6595 858 rc = FAILURE;
jksoft 10:c52abd2d6595 859 else if (inflightMsgid == mypacketid)
jksoft 10:c52abd2d6595 860 inflightMsgid = 0;
jksoft 10:c52abd2d6595 861 }
jksoft 10:c52abd2d6595 862 else
jksoft 10:c52abd2d6595 863 rc = FAILURE;
jksoft 10:c52abd2d6595 864 }
jksoft 10:c52abd2d6595 865 #endif
jksoft 10:c52abd2d6595 866
jksoft 10:c52abd2d6595 867 exit:
jksoft 10:c52abd2d6595 868 if (rc != SUCCESS)
jksoft 10:c52abd2d6595 869 cleanSession();
jksoft 10:c52abd2d6595 870 return rc;
jksoft 10:c52abd2d6595 871 }
jksoft 10:c52abd2d6595 872
jksoft 10:c52abd2d6595 873
jksoft 10:c52abd2d6595 874
jksoft 10:c52abd2d6595 875 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
jksoft 10:c52abd2d6595 876 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained)
jksoft 10:c52abd2d6595 877 {
jksoft 10:c52abd2d6595 878 int rc = FAILURE;
jksoft 10:c52abd2d6595 879 OldTimer timer(command_timeout_ms);
jksoft 10:c52abd2d6595 880 MQTTString topicString = MQTTString_initializer;
jksoft 10:c52abd2d6595 881 int len = 0;
jksoft 10:c52abd2d6595 882
jksoft 10:c52abd2d6595 883 if (!isconnected)
jksoft 10:c52abd2d6595 884 goto exit;
jksoft 10:c52abd2d6595 885
jksoft 10:c52abd2d6595 886 topicString.cstring = (char*)topicName;
jksoft 10:c52abd2d6595 887
jksoft 10:c52abd2d6595 888 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 889 if (qos == QOS1 || qos == QOS2)
jksoft 10:c52abd2d6595 890 id = packetid.getNext();
jksoft 10:c52abd2d6595 891 #endif
jksoft 10:c52abd2d6595 892
jksoft 10:c52abd2d6595 893 len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id,
jksoft 10:c52abd2d6595 894 topicString, (unsigned char*)payload, payloadlen);
jksoft 10:c52abd2d6595 895 if (len <= 0)
jksoft 10:c52abd2d6595 896 goto exit;
jksoft 10:c52abd2d6595 897
jksoft 10:c52abd2d6595 898 #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 899 if (!cleansession)
jksoft 10:c52abd2d6595 900 {
jksoft 10:c52abd2d6595 901 memcpy(pubbuf, sendbuf, len);
jksoft 10:c52abd2d6595 902 inflightMsgid = id;
jksoft 10:c52abd2d6595 903 inflightLen = len;
jksoft 10:c52abd2d6595 904 inflightQoS = qos;
jksoft 10:c52abd2d6595 905 #if MQTTCLIENT_QOS2
jksoft 10:c52abd2d6595 906 pubrel = false;
jksoft 10:c52abd2d6595 907 #endif
jksoft 10:c52abd2d6595 908 }
jksoft 10:c52abd2d6595 909 #endif
jksoft 10:c52abd2d6595 910
jksoft 10:c52abd2d6595 911 rc = publish(len, timer, qos);
jksoft 10:c52abd2d6595 912 exit:
jksoft 10:c52abd2d6595 913 return rc;
jksoft 10:c52abd2d6595 914 }
jksoft 10:c52abd2d6595 915
jksoft 10:c52abd2d6595 916
jksoft 10:c52abd2d6595 917 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
jksoft 10:c52abd2d6595 918 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained)
jksoft 10:c52abd2d6595 919 {
jksoft 10:c52abd2d6595 920 unsigned short id = 0; // dummy - not used for anything
jksoft 10:c52abd2d6595 921 return publish(topicName, payload, payloadlen, id, qos, retained);
jksoft 10:c52abd2d6595 922 }
jksoft 10:c52abd2d6595 923
jksoft 10:c52abd2d6595 924
jksoft 10:c52abd2d6595 925 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
jksoft 10:c52abd2d6595 926 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
jksoft 10:c52abd2d6595 927 {
jksoft 10:c52abd2d6595 928 return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
jksoft 10:c52abd2d6595 929 }
jksoft 10:c52abd2d6595 930
jksoft 10:c52abd2d6595 931
jksoft 10:c52abd2d6595 932 template<class Network, class OldTimer, int MAX_MQTT_PACKET_SIZE, int b>
jksoft 10:c52abd2d6595 933 int MQTT::Client<Network, OldTimer, MAX_MQTT_PACKET_SIZE, b>::disconnect()
jksoft 10:c52abd2d6595 934 {
jksoft 10:c52abd2d6595 935 int rc = FAILURE;
jksoft 10:c52abd2d6595 936 OldTimer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete
jksoft 10:c52abd2d6595 937 int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE);
jksoft 10:c52abd2d6595 938 if (len > 0)
jksoft 10:c52abd2d6595 939 rc = sendPacket(len, timer); // send the disconnect packet
jksoft 10:c52abd2d6595 940
jksoft 10:c52abd2d6595 941 if (cleansession)
jksoft 10:c52abd2d6595 942 cleanSession();
jksoft 10:c52abd2d6595 943 else
jksoft 10:c52abd2d6595 944 isconnected = false;
jksoft 10:c52abd2d6595 945 return rc;
jksoft 10:c52abd2d6595 946 }
jksoft 10:c52abd2d6595 947
jksoft 10:c52abd2d6595 948
jksoft 10:c52abd2d6595 949 #endif