Added function to return network interface.
MQTTClient.h@65:ff8eaf67f510, 2020-03-21 (annotated)
- Committer:
- lamell
- Date:
- Sat Mar 21 16:17:57 2020 -0400
- Revision:
- 65:ff8eaf67f510
- Parent:
- 64:9adad8863791
- Parent:
- 63:90c73a6bfa42
File MQTTClient.h:
Changed MAX_MQTT_PACKET_SIZE = 2048, original size is limited to 100 bytes.
If the message is larher than this limit, then no message will be identified in the server.
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
icraggs | 6:4d312a49200b | 1 | /******************************************************************************* |
icraggs | 53:15b5a280d22d | 2 | * Copyright (c) 2014, 2017 IBM Corp. |
sam_grove | 0:fe461e4d7afe | 3 | * |
icraggs | 6:4d312a49200b | 4 | * All rights reserved. This program and the accompanying materials |
icraggs | 6:4d312a49200b | 5 | * are made available under the terms of the Eclipse Public License v1.0 |
icraggs | 6:4d312a49200b | 6 | * and Eclipse Distribution License v1.0 which accompany this distribution. |
sam_grove | 0:fe461e4d7afe | 7 | * |
icraggs | 6:4d312a49200b | 8 | * The Eclipse Public License is available at |
icraggs | 6:4d312a49200b | 9 | * http://www.eclipse.org/legal/epl-v10.html |
icraggs | 6:4d312a49200b | 10 | * and the Eclipse Distribution License is available at |
icraggs | 6:4d312a49200b | 11 | * http://www.eclipse.org/org/documents/edl-v10.php. |
sam_grove | 0:fe461e4d7afe | 12 | * |
icraggs | 6:4d312a49200b | 13 | * Contributors: |
icraggs | 6:4d312a49200b | 14 | * Ian Craggs - initial API and implementation and/or initial documentation |
icraggs | 46:e335fcc1a663 | 15 | * Ian Craggs - fix for bug 458512 - QoS 2 messages |
icraggs | 46:e335fcc1a663 | 16 | * Ian Craggs - fix for bug 460389 - send loop uses wrong length |
icraggs | 46:e335fcc1a663 | 17 | * Ian Craggs - fix for bug 464169 - clearing subscriptions |
icraggs | 46:e335fcc1a663 | 18 | * Ian Craggs - fix for bug 464551 - enums and ints can be different size |
icraggs | 53:15b5a280d22d | 19 | * Mark Sonnentag - fix for bug 475204 - inefficient instantiation of Timer |
icraggs | 53:15b5a280d22d | 20 | * Ian Craggs - fix for bug 475749 - packetid modified twice |
icraggs | 53:15b5a280d22d | 21 | * Ian Craggs - add ability to set message handler separately #6 |
icraggs | 6:4d312a49200b | 22 | *******************************************************************************/ |
sam_grove | 0:fe461e4d7afe | 23 | |
icraggs | 2:dcfdd2abfe71 | 24 | #if !defined(MQTTCLIENT_H) |
icraggs | 2:dcfdd2abfe71 | 25 | #define MQTTCLIENT_H |
icraggs | 2:dcfdd2abfe71 | 26 | |
lamell | 60:fccb353e53aa | 27 | #include <string.h> |
icraggs | 34:e18a166198df | 28 | #include "FP.h" |
icraggs | 3:dbff6b768d28 | 29 | #include "MQTTPacket.h" |
icraggs | 53:15b5a280d22d | 30 | #include <stdio.h> |
icraggs | 43:21da1f744243 | 31 | #include "MQTTLogging.h" |
icraggs | 43:21da1f744243 | 32 | |
icraggs | 43:21da1f744243 | 33 | #if !defined(MQTTCLIENT_QOS1) |
icraggs | 43:21da1f744243 | 34 | #define MQTTCLIENT_QOS1 1 |
icraggs | 43:21da1f744243 | 35 | #endif |
icraggs | 43:21da1f744243 | 36 | #if !defined(MQTTCLIENT_QOS2) |
icraggs | 43:21da1f744243 | 37 | #define MQTTCLIENT_QOS2 0 |
icraggs | 43:21da1f744243 | 38 | #endif |
icraggs | 2:dcfdd2abfe71 | 39 | |
icraggs | 3:dbff6b768d28 | 40 | namespace MQTT |
icraggs | 3:dbff6b768d28 | 41 | { |
icraggs | 3:dbff6b768d28 | 42 | |
icraggs | 2:dcfdd2abfe71 | 43 | |
icraggs | 2:dcfdd2abfe71 | 44 | enum QoS { QOS0, QOS1, QOS2 }; |
sam_grove | 0:fe461e4d7afe | 45 | |
icraggs | 31:a51dd239b78e | 46 | // all failure return codes must be negative |
icraggs | 23:05fc7de97d4a | 47 | enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; |
icraggs | 23:05fc7de97d4a | 48 | |
icraggs | 2:dcfdd2abfe71 | 49 | |
icraggs | 3:dbff6b768d28 | 50 | struct Message |
icraggs | 2:dcfdd2abfe71 | 51 | { |
icraggs | 2:dcfdd2abfe71 | 52 | enum QoS qos; |
icraggs | 2:dcfdd2abfe71 | 53 | bool retained; |
icraggs | 2:dcfdd2abfe71 | 54 | bool dup; |
Ian Craggs |
12:cc7f2d62a393 | 55 | unsigned short id; |
icraggs | 2:dcfdd2abfe71 | 56 | void *payload; |
icraggs | 2:dcfdd2abfe71 | 57 | size_t payloadlen; |
sam_grove | 0:fe461e4d7afe | 58 | }; |
sam_grove | 0:fe461e4d7afe | 59 | |
icraggs | 4:4ef00243708e | 60 | |
icraggs | 20:cad3d54d7ecf | 61 | struct MessageData |
icraggs | 20:cad3d54d7ecf | 62 | { |
icraggs | 31:a51dd239b78e | 63 | MessageData(MQTTString &aTopicName, struct Message &aMessage) : message(aMessage), topicName(aTopicName) |
icraggs | 37:e3d64f9b986c | 64 | { } |
icraggs | 43:21da1f744243 | 65 | |
icraggs | 31:a51dd239b78e | 66 | struct Message &message; |
icraggs | 31:a51dd239b78e | 67 | MQTTString &topicName; |
icraggs | 20:cad3d54d7ecf | 68 | }; |
icraggs | 20:cad3d54d7ecf | 69 | |
icraggs | 20:cad3d54d7ecf | 70 | |
icraggs | 53:15b5a280d22d | 71 | struct connackData |
icraggs | 53:15b5a280d22d | 72 | { |
icraggs | 53:15b5a280d22d | 73 | int rc; |
icraggs | 53:15b5a280d22d | 74 | bool sessionPresent; |
icraggs | 53:15b5a280d22d | 75 | }; |
icraggs | 53:15b5a280d22d | 76 | |
icraggs | 53:15b5a280d22d | 77 | |
icraggs | 53:15b5a280d22d | 78 | struct subackData |
icraggs | 53:15b5a280d22d | 79 | { |
icraggs | 53:15b5a280d22d | 80 | int grantedQoS; |
icraggs | 53:15b5a280d22d | 81 | }; |
icraggs | 53:15b5a280d22d | 82 | |
icraggs | 53:15b5a280d22d | 83 | |
icraggs | 9:01b8cc7d94cc | 84 | class PacketId |
icraggs | 9:01b8cc7d94cc | 85 | { |
icraggs | 9:01b8cc7d94cc | 86 | public: |
icraggs | 23:05fc7de97d4a | 87 | PacketId() |
icraggs | 23:05fc7de97d4a | 88 | { |
icraggs | 23:05fc7de97d4a | 89 | next = 0; |
icraggs | 23:05fc7de97d4a | 90 | } |
icraggs | 43:21da1f744243 | 91 | |
icraggs | 23:05fc7de97d4a | 92 | int getNext() |
icraggs | 23:05fc7de97d4a | 93 | { |
icraggs | 53:15b5a280d22d | 94 | return next = (next == MAX_PACKET_ID) ? 1 : next + 1; |
icraggs | 23:05fc7de97d4a | 95 | } |
icraggs | 43:21da1f744243 | 96 | |
icraggs | 9:01b8cc7d94cc | 97 | private: |
icraggs | 9:01b8cc7d94cc | 98 | static const int MAX_PACKET_ID = 65535; |
icraggs | 9:01b8cc7d94cc | 99 | int next; |
icraggs | 9:01b8cc7d94cc | 100 | }; |
icraggs | 31:a51dd239b78e | 101 | |
icraggs | 31:a51dd239b78e | 102 | |
icraggs | 21:e918525e529d | 103 | /** |
icraggs | 21:e918525e529d | 104 | * @class Client |
icraggs | 22:aadb79d29330 | 105 | * @brief blocking, non-threaded MQTT client API |
icraggs | 43:21da1f744243 | 106 | * |
icraggs | 23:05fc7de97d4a | 107 | * This version of the API blocks on all method calls, until they are complete. This means that only one |
icraggs | 43:21da1f744243 | 108 | * MQTT request can be in process at any one time. |
icraggs | 21:e918525e529d | 109 | * @param Network a network class which supports send, receive |
icraggs | 43:21da1f744243 | 110 | * @param Timer a timer class with the methods: |
icraggs | 43:21da1f744243 | 111 | */ |
lamell |
64:9adad8863791 | 112 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 2048, int MAX_MESSAGE_HANDLERS = 5> |
icraggs | 31:a51dd239b78e | 113 | class Client |
icraggs | 2:dcfdd2abfe71 | 114 | { |
icraggs | 43:21da1f744243 | 115 | |
icraggs | 22:aadb79d29330 | 116 | public: |
icraggs | 43:21da1f744243 | 117 | |
icraggs | 31:a51dd239b78e | 118 | typedef void (*messageHandler)(MessageData&); |
icraggs | 23:05fc7de97d4a | 119 | |
icraggs | 23:05fc7de97d4a | 120 | /** Construct the client |
icraggs | 23:05fc7de97d4a | 121 | * @param network - pointer to an instance of the Network class - must be connected to the endpoint |
icraggs | 23:05fc7de97d4a | 122 | * before calling MQTT connect |
icraggs | 23:05fc7de97d4a | 123 | * @param limits an instance of the Limit class - to alter limits as required |
icraggs | 23:05fc7de97d4a | 124 | */ |
icraggs | 43:21da1f744243 | 125 | Client(Network& network, unsigned int command_timeout_ms = 30000); |
icraggs | 43:21da1f744243 | 126 | |
icraggs | 20:cad3d54d7ecf | 127 | /** Set the default message handling callback - used for any message which does not match a subscription message handler |
icraggs | 53:15b5a280d22d | 128 | * @param mh - pointer to the callback function. Set to 0 to remove. |
icraggs | 20:cad3d54d7ecf | 129 | */ |
icraggs | 20:cad3d54d7ecf | 130 | void setDefaultMessageHandler(messageHandler mh) |
icraggs | 20:cad3d54d7ecf | 131 | { |
icraggs | 53:15b5a280d22d | 132 | if (mh != 0) |
icraggs | 53:15b5a280d22d | 133 | defaultMessageHandler.attach(mh); |
icraggs | 53:15b5a280d22d | 134 | else |
icraggs | 53:15b5a280d22d | 135 | defaultMessageHandler.detach(); |
icraggs | 20:cad3d54d7ecf | 136 | } |
icraggs | 42:f5beda831651 | 137 | |
icraggs | 53:15b5a280d22d | 138 | /** Set a message handling callback. This can be used outside of the the subscribe method. |
icraggs | 53:15b5a280d22d | 139 | * @param topicFilter - a topic pattern which can include wildcards |
icraggs | 53:15b5a280d22d | 140 | * @param mh - pointer to the callback function. If 0, removes the callback if any |
icraggs | 53:15b5a280d22d | 141 | */ |
icraggs | 53:15b5a280d22d | 142 | int setMessageHandler(const char* topicFilter, messageHandler mh); |
icraggs | 53:15b5a280d22d | 143 | |
icraggs | 43:21da1f744243 | 144 | /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack |
icraggs | 43:21da1f744243 | 145 | * The nework object must be connected to the network endpoint before calling this |
icraggs | 43:21da1f744243 | 146 | * Default connect options are used |
icraggs | 43:21da1f744243 | 147 | * @return success code - |
icraggs | 43:21da1f744243 | 148 | */ |
icraggs | 43:21da1f744243 | 149 | int connect(); |
icraggs | 53:15b5a280d22d | 150 | |
icraggs | 53:15b5a280d22d | 151 | /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack |
icraggs | 43:21da1f744243 | 152 | * The nework object must be connected to the network endpoint before calling this |
icraggs | 20:cad3d54d7ecf | 153 | * @param options - connect options |
icraggs | 43:21da1f744243 | 154 | * @return success code - |
icraggs | 43:21da1f744243 | 155 | */ |
icraggs | 43:21da1f744243 | 156 | int connect(MQTTPacket_connectData& options); |
icraggs | 43:21da1f744243 | 157 | |
icraggs | 53:15b5a280d22d | 158 | /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack |
icraggs | 53:15b5a280d22d | 159 | * The nework object must be connected to the network endpoint before calling this |
icraggs | 53:15b5a280d22d | 160 | * @param options - connect options |
icraggs | 53:15b5a280d22d | 161 | * @param connackData - connack data to be returned |
icraggs | 53:15b5a280d22d | 162 | * @return success code - |
icraggs | 53:15b5a280d22d | 163 | */ |
icraggs | 53:15b5a280d22d | 164 | int connect(MQTTPacket_connectData& options, connackData& data); |
icraggs | 53:15b5a280d22d | 165 | |
icraggs | 20:cad3d54d7ecf | 166 | /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs |
icraggs | 20:cad3d54d7ecf | 167 | * @param topic - the topic to publish to |
icraggs | 20:cad3d54d7ecf | 168 | * @param message - the message to send |
icraggs | 43:21da1f744243 | 169 | * @return success code - |
icraggs | 43:21da1f744243 | 170 | */ |
icraggs | 43:21da1f744243 | 171 | int publish(const char* topicName, Message& message); |
icraggs | 53:15b5a280d22d | 172 | |
icraggs | 43:21da1f744243 | 173 | /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs |
icraggs | 43:21da1f744243 | 174 | * @param topic - the topic to publish to |
icraggs | 43:21da1f744243 | 175 | * @param payload - the data to send |
icraggs | 43:21da1f744243 | 176 | * @param payloadlen - the length of the data |
icraggs | 43:21da1f744243 | 177 | * @param qos - the QoS to send the publish at |
icraggs | 43:21da1f744243 | 178 | * @param retained - whether the message should be retained |
icraggs | 43:21da1f744243 | 179 | * @return success code - |
icraggs | 43:21da1f744243 | 180 | */ |
icraggs | 43:21da1f744243 | 181 | int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); |
icraggs | 53:15b5a280d22d | 182 | |
icraggs | 43:21da1f744243 | 183 | /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs |
icraggs | 43:21da1f744243 | 184 | * @param topic - the topic to publish to |
icraggs | 43:21da1f744243 | 185 | * @param payload - the data to send |
icraggs | 43:21da1f744243 | 186 | * @param payloadlen - the length of the data |
icraggs | 53:15b5a280d22d | 187 | * @param id - the packet id used - returned |
icraggs | 43:21da1f744243 | 188 | * @param qos - the QoS to send the publish at |
icraggs | 43:21da1f744243 | 189 | * @param retained - whether the message should be retained |
icraggs | 43:21da1f744243 | 190 | * @return success code - |
icraggs | 43:21da1f744243 | 191 | */ |
icraggs | 43:21da1f744243 | 192 | int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); |
icraggs | 43:21da1f744243 | 193 | |
icraggs | 20:cad3d54d7ecf | 194 | /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback |
icraggs | 20:cad3d54d7ecf | 195 | * @param topicFilter - a topic pattern which can include wildcards |
icraggs | 20:cad3d54d7ecf | 196 | * @param qos - the MQTT QoS to subscribe at |
icraggs | 20:cad3d54d7ecf | 197 | * @param mh - the callback function to be invoked when a message is received for this subscription |
icraggs | 43:21da1f744243 | 198 | * @return success code - |
icraggs | 43:21da1f744243 | 199 | */ |
icraggs | 20:cad3d54d7ecf | 200 | int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh); |
icraggs | 43:21da1f744243 | 201 | |
icraggs | 53:15b5a280d22d | 202 | /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback |
icraggs | 53:15b5a280d22d | 203 | * @param topicFilter - a topic pattern which can include wildcards |
icraggs | 53:15b5a280d22d | 204 | * @param qos - the MQTT QoS to subscribe at© |
icraggs | 53:15b5a280d22d | 205 | * @param mh - the callback function to be invoked when a message is received for this subscription |
icraggs | 53:15b5a280d22d | 206 | * @param |
icraggs | 53:15b5a280d22d | 207 | * @return success code - |
icraggs | 53:15b5a280d22d | 208 | */ |
icraggs | 53:15b5a280d22d | 209 | int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, subackData &data); |
icraggs | 53:15b5a280d22d | 210 | |
icraggs | 20:cad3d54d7ecf | 211 | /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback |
icraggs | 20:cad3d54d7ecf | 212 | * @param topicFilter - a topic pattern which can include wildcards |
icraggs | 43:21da1f744243 | 213 | * @return success code - |
icraggs | 43:21da1f744243 | 214 | */ |
icraggs | 20:cad3d54d7ecf | 215 | int unsubscribe(const char* topicFilter); |
icraggs | 43:21da1f744243 | 216 | |
icraggs | 31:a51dd239b78e | 217 | /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state |
icraggs | 43:21da1f744243 | 218 | * @return success code - |
icraggs | 20:cad3d54d7ecf | 219 | */ |
icraggs | 20:cad3d54d7ecf | 220 | int disconnect(); |
icraggs | 43:21da1f744243 | 221 | |
icraggs | 20:cad3d54d7ecf | 222 | /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive |
icraggs | 43:21da1f744243 | 223 | * yield can be called if no other MQTT operation is needed. This will also allow messages to be |
icraggs | 20:cad3d54d7ecf | 224 | * received. |
icraggs | 23:05fc7de97d4a | 225 | * @param timeout_ms the time to wait, in milliseconds |
icraggs | 26:2658bb87c53d | 226 | * @return success code - on failure, this means the client has disconnected |
icraggs | 20:cad3d54d7ecf | 227 | */ |
icraggs | 43:21da1f744243 | 228 | int yield(unsigned long timeout_ms = 1000L); |
icraggs | 43:21da1f744243 | 229 | |
icraggs | 43:21da1f744243 | 230 | /** Is the client connected? |
icraggs | 43:21da1f744243 | 231 | * @return flag - is the client connected or not? |
icraggs | 43:21da1f744243 | 232 | */ |
icraggs | 43:21da1f744243 | 233 | bool isConnected() |
icraggs | 43:21da1f744243 | 234 | { |
icraggs | 43:21da1f744243 | 235 | return isconnected; |
icraggs | 43:21da1f744243 | 236 | } |
icraggs | 43:21da1f744243 | 237 | |
icraggs | 20:cad3d54d7ecf | 238 | private: |
icraggs | 20:cad3d54d7ecf | 239 | |
icraggs | 53:15b5a280d22d | 240 | void closeSession(); |
icraggs | 46:e335fcc1a663 | 241 | void cleanSession(); |
icraggs | 20:cad3d54d7ecf | 242 | int cycle(Timer& timer); |
icraggs | 20:cad3d54d7ecf | 243 | int waitfor(int packet_type, Timer& timer); |
icraggs | 20:cad3d54d7ecf | 244 | int keepalive(); |
icraggs | 43:21da1f744243 | 245 | int publish(int len, Timer& timer, enum QoS qos); |
icraggs | 2:dcfdd2abfe71 | 246 | |
icraggs | 3:dbff6b768d28 | 247 | int decodePacket(int* value, int timeout); |
icraggs | 20:cad3d54d7ecf | 248 | int readPacket(Timer& timer); |
icraggs | 20:cad3d54d7ecf | 249 | int sendPacket(int length, Timer& timer); |
icraggs | 26:2658bb87c53d | 250 | int deliverMessage(MQTTString& topicName, Message& message); |
icraggs | 26:2658bb87c53d | 251 | bool isTopicMatched(char* topicFilter, MQTTString& topicName); |
icraggs | 43:21da1f744243 | 252 | |
icraggs | 23:05fc7de97d4a | 253 | Network& ipstack; |
icraggs | 43:21da1f744243 | 254 | unsigned long command_timeout_ms; |
icraggs | 15:64a57183aa03 | 255 | |
icraggs | 43:21da1f744243 | 256 | unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; |
icraggs | 43:21da1f744243 | 257 | unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; |
icraggs | 43:21da1f744243 | 258 | |
icraggs | 43:21da1f744243 | 259 | Timer last_sent, last_received; |
icraggs | 15:64a57183aa03 | 260 | unsigned int keepAliveInterval; |
icraggs | 20:cad3d54d7ecf | 261 | bool ping_outstanding; |
icraggs | 43:21da1f744243 | 262 | bool cleansession; |
icraggs | 43:21da1f744243 | 263 | |
icraggs | 9:01b8cc7d94cc | 264 | PacketId packetid; |
icraggs | 43:21da1f744243 | 265 | |
icraggs | 16:91c2f9a144d4 | 266 | struct MessageHandlers |
icraggs | 15:64a57183aa03 | 267 | { |
icraggs | 26:2658bb87c53d | 268 | const char* topicFilter; |
icraggs | 31:a51dd239b78e | 269 | FP<void, MessageData&> fp; |
icraggs | 23:05fc7de97d4a | 270 | } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic |
icraggs | 43:21da1f744243 | 271 | |
icraggs | 31:a51dd239b78e | 272 | FP<void, MessageData&> defaultMessageHandler; |
icraggs | 43:21da1f744243 | 273 | |
icraggs | 26:2658bb87c53d | 274 | bool isconnected; |
icraggs | 43:21da1f744243 | 275 | |
icraggs | 43:21da1f744243 | 276 | #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 |
icraggs | 43:21da1f744243 | 277 | unsigned char pubbuf[MAX_MQTT_PACKET_SIZE]; // store the last publish for sending on reconnect |
icraggs | 43:21da1f744243 | 278 | int inflightLen; |
icraggs | 43:21da1f744243 | 279 | unsigned short inflightMsgid; |
icraggs | 43:21da1f744243 | 280 | enum QoS inflightQoS; |
icraggs | 43:21da1f744243 | 281 | #endif |
icraggs | 43:21da1f744243 | 282 | |
icraggs | 43:21da1f744243 | 283 | #if MQTTCLIENT_QOS2 |
icraggs | 43:21da1f744243 | 284 | bool pubrel; |
icraggs | 43:21da1f744243 | 285 | #if !defined(MAX_INCOMING_QOS2_MESSAGES) |
icraggs | 43:21da1f744243 | 286 | #define MAX_INCOMING_QOS2_MESSAGES 10 |
icraggs | 43:21da1f744243 | 287 | #endif |
icraggs | 43:21da1f744243 | 288 | unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; |
icraggs | 43:21da1f744243 | 289 | bool isQoS2msgidFree(unsigned short id); |
icraggs | 43:21da1f744243 | 290 | bool useQoS2msgid(unsigned short id); |
icraggs | 46:e335fcc1a663 | 291 | void freeQoS2msgid(unsigned short id); |
icraggs | 31:a51dd239b78e | 292 | #endif |
icraggs | 28:8b2abe9bd814 | 293 | |
sam_grove | 0:fe461e4d7afe | 294 | }; |
sam_grove | 0:fe461e4d7afe | 295 | |
icraggs | 15:64a57183aa03 | 296 | } |
icraggs | 15:64a57183aa03 | 297 | |
icraggs | 15:64a57183aa03 | 298 | |
icraggs | 43:21da1f744243 | 299 | template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> |
icraggs | 53:15b5a280d22d | 300 | void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::cleanSession() |
icraggs | 46:e335fcc1a663 | 301 | { |
icraggs | 46:e335fcc1a663 | 302 | for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) |
icraggs | 46:e335fcc1a663 | 303 | messageHandlers[i].topicFilter = 0; |
icraggs | 46:e335fcc1a663 | 304 | |
icraggs | 46:e335fcc1a663 | 305 | #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 |
icraggs | 46:e335fcc1a663 | 306 | inflightMsgid = 0; |
icraggs | 46:e335fcc1a663 | 307 | inflightQoS = QOS0; |
icraggs | 46:e335fcc1a663 | 308 | #endif |
icraggs | 46:e335fcc1a663 | 309 | |
icraggs | 46:e335fcc1a663 | 310 | #if MQTTCLIENT_QOS2 |
icraggs | 46:e335fcc1a663 | 311 | pubrel = false; |
icraggs | 46:e335fcc1a663 | 312 | for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) |
icraggs | 46:e335fcc1a663 | 313 | incomingQoS2messages[i] = 0; |
icraggs | 46:e335fcc1a663 | 314 | #endif |
icraggs | 46:e335fcc1a663 | 315 | } |
icraggs | 46:e335fcc1a663 | 316 | |
icraggs | 46:e335fcc1a663 | 317 | |
icraggs | 46:e335fcc1a663 | 318 | template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> |
icraggs | 53:15b5a280d22d | 319 | void MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::closeSession() |
icraggs | 53:15b5a280d22d | 320 | { |
icraggs | 53:15b5a280d22d | 321 | ping_outstanding = false; |
icraggs | 53:15b5a280d22d | 322 | isconnected = false; |
icraggs | 53:15b5a280d22d | 323 | if (cleansession) |
icraggs | 53:15b5a280d22d | 324 | cleanSession(); |
icraggs | 53:15b5a280d22d | 325 | } |
icraggs | 53:15b5a280d22d | 326 | |
icraggs | 53:15b5a280d22d | 327 | |
icraggs | 53:15b5a280d22d | 328 | template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> |
icraggs | 23:05fc7de97d4a | 329 | MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() |
icraggs | 15:64a57183aa03 | 330 | { |
icraggs | 43:21da1f744243 | 331 | this->command_timeout_ms = command_timeout_ms; |
icraggs | 53:15b5a280d22d | 332 | cleansession = true; |
icraggs | 53:15b5a280d22d | 333 | closeSession(); |
icraggs | 46:e335fcc1a663 | 334 | } |
icraggs | 43:21da1f744243 | 335 | |
icraggs | 43:21da1f744243 | 336 | |
icraggs | 43:21da1f744243 | 337 | #if MQTTCLIENT_QOS2 |
icraggs | 43:21da1f744243 | 338 | template<class Network, class Timer, int a, int b> |
icraggs | 43:21da1f744243 | 339 | bool MQTT::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) |
icraggs | 43:21da1f744243 | 340 | { |
icraggs | 43:21da1f744243 | 341 | for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) |
icraggs | 43:21da1f744243 | 342 | { |
icraggs | 43:21da1f744243 | 343 | if (incomingQoS2messages[i] == id) |
icraggs | 43:21da1f744243 | 344 | return false; |
icraggs | 43:21da1f744243 | 345 | } |
icraggs | 43:21da1f744243 | 346 | return true; |
icraggs | 11:db15da110a37 | 347 | } |
icraggs | 11:db15da110a37 | 348 | |
icraggs | 11:db15da110a37 | 349 | |
icraggs | 43:21da1f744243 | 350 | template<class Network, class Timer, int a, int b> |
icraggs | 43:21da1f744243 | 351 | bool MQTT::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) |
icraggs | 43:21da1f744243 | 352 | { |
icraggs | 43:21da1f744243 | 353 | for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) |
icraggs | 43:21da1f744243 | 354 | { |
icraggs | 43:21da1f744243 | 355 | if (incomingQoS2messages[i] == 0) |
icraggs | 43:21da1f744243 | 356 | { |
icraggs | 43:21da1f744243 | 357 | incomingQoS2messages[i] = id; |
icraggs | 43:21da1f744243 | 358 | return true; |
icraggs | 43:21da1f744243 | 359 | } |
icraggs | 43:21da1f744243 | 360 | } |
icraggs | 43:21da1f744243 | 361 | return false; |
icraggs | 43:21da1f744243 | 362 | } |
icraggs | 46:e335fcc1a663 | 363 | |
icraggs | 46:e335fcc1a663 | 364 | |
icraggs | 46:e335fcc1a663 | 365 | template<class Network, class Timer, int a, int b> |
icraggs | 46:e335fcc1a663 | 366 | void MQTT::Client<Network, Timer, a, b>::freeQoS2msgid(unsigned short id) |
icraggs | 46:e335fcc1a663 | 367 | { |
icraggs | 46:e335fcc1a663 | 368 | for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) |
icraggs | 46:e335fcc1a663 | 369 | { |
icraggs | 46:e335fcc1a663 | 370 | if (incomingQoS2messages[i] == id) |
icraggs | 46:e335fcc1a663 | 371 | { |
icraggs | 46:e335fcc1a663 | 372 | incomingQoS2messages[i] = 0; |
icraggs | 46:e335fcc1a663 | 373 | return; |
icraggs | 46:e335fcc1a663 | 374 | } |
icraggs | 46:e335fcc1a663 | 375 | } |
icraggs | 46:e335fcc1a663 | 376 | } |
icraggs | 43:21da1f744243 | 377 | #endif |
icraggs | 43:21da1f744243 | 378 | |
icraggs | 43:21da1f744243 | 379 | |
icraggs | 43:21da1f744243 | 380 | template<class Network, class Timer, int a, int b> |
icraggs | 23:05fc7de97d4a | 381 | int MQTT::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) |
icraggs | 8:c46930bd6c82 | 382 | { |
icraggs | 43:21da1f744243 | 383 | int rc = FAILURE, |
icraggs | 23:05fc7de97d4a | 384 | sent = 0; |
icraggs | 43:21da1f744243 | 385 | |
Ian Craggs |
57:3513ee54ebb4 | 386 | while (sent < length) |
icraggs | 23:05fc7de97d4a | 387 | { |
icraggs | 46:e335fcc1a663 | 388 | rc = ipstack.write(&sendbuf[sent], length - sent, timer.left_ms()); |
icraggs | 26:2658bb87c53d | 389 | if (rc < 0) // there was an error writing the data |
icraggs | 26:2658bb87c53d | 390 | break; |
icraggs | 26:2658bb87c53d | 391 | sent += rc; |
Ian Craggs |
57:3513ee54ebb4 | 392 | if (timer.expired()) // only check expiry after at least one attempt to write |
Ian Craggs |
57:3513ee54ebb4 | 393 | break; |
Ian Craggs |
57:3513ee54ebb4 | 394 | } |
icraggs | 20:cad3d54d7ecf | 395 | if (sent == length) |
icraggs | 23:05fc7de97d4a | 396 | { |
icraggs | 43:21da1f744243 | 397 | if (this->keepAliveInterval > 0) |
icraggs | 43:21da1f744243 | 398 | last_sent.countdown(this->keepAliveInterval); // record the fact that we have successfully sent the packet |
icraggs | 23:05fc7de97d4a | 399 | rc = SUCCESS; |
icraggs | 23:05fc7de97d4a | 400 | } |
icraggs | 23:05fc7de97d4a | 401 | else |
icraggs | 23:05fc7de97d4a | 402 | rc = FAILURE; |
icraggs | 53:15b5a280d22d | 403 | |
icraggs | 43:21da1f744243 | 404 | #if defined(MQTT_DEBUG) |
icraggs | 46:e335fcc1a663 | 405 | char printbuf[150]; |
icraggs | 56:71ae1a773b64 | 406 | DEBUG("Rc %d from sending packet %s\r\n", rc, |
icraggs | 54:ff9e5c4b52d0 | 407 | MQTTFormat_toServerString(printbuf, sizeof(printbuf), sendbuf, length)); |
icraggs | 43:21da1f744243 | 408 | #endif |
icraggs | 23:05fc7de97d4a | 409 | return rc; |
icraggs | 8:c46930bd6c82 | 410 | } |
icraggs | 8:c46930bd6c82 | 411 | |
icraggs | 8:c46930bd6c82 | 412 | |
icraggs | 43:21da1f744243 | 413 | template<class Network, class Timer, int a, int b> |
icraggs | 26:2658bb87c53d | 414 | int MQTT::Client<Network, Timer, a, b>::decodePacket(int* value, int timeout) |
icraggs | 8:c46930bd6c82 | 415 | { |
icraggs | 36:2f1ada427e56 | 416 | unsigned char c; |
icraggs | 8:c46930bd6c82 | 417 | int multiplier = 1; |
icraggs | 8:c46930bd6c82 | 418 | int len = 0; |
icraggs | 20:cad3d54d7ecf | 419 | const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; |
icraggs | 8:c46930bd6c82 | 420 | |
icraggs | 8:c46930bd6c82 | 421 | *value = 0; |
icraggs | 8:c46930bd6c82 | 422 | do |
icraggs | 8:c46930bd6c82 | 423 | { |
icraggs | 8:c46930bd6c82 | 424 | int rc = MQTTPACKET_READ_ERROR; |
icraggs | 8:c46930bd6c82 | 425 | |
icraggs | 8:c46930bd6c82 | 426 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
icraggs | 8:c46930bd6c82 | 427 | { |
icraggs | 8:c46930bd6c82 | 428 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
icraggs | 8:c46930bd6c82 | 429 | goto exit; |
icraggs | 8:c46930bd6c82 | 430 | } |
icraggs | 23:05fc7de97d4a | 431 | rc = ipstack.read(&c, 1, timeout); |
icraggs | 8:c46930bd6c82 | 432 | if (rc != 1) |
icraggs | 8:c46930bd6c82 | 433 | goto exit; |
icraggs | 8:c46930bd6c82 | 434 | *value += (c & 127) * multiplier; |
icraggs | 8:c46930bd6c82 | 435 | multiplier *= 128; |
icraggs | 8:c46930bd6c82 | 436 | } while ((c & 128) != 0); |
icraggs | 8:c46930bd6c82 | 437 | exit: |
icraggs | 8:c46930bd6c82 | 438 | return len; |
icraggs | 8:c46930bd6c82 | 439 | } |
icraggs | 8:c46930bd6c82 | 440 | |
icraggs | 8:c46930bd6c82 | 441 | |
icraggs | 8:c46930bd6c82 | 442 | /** |
icraggs | 8:c46930bd6c82 | 443 | * If any read fails in this method, then we should disconnect from the network, as on reconnect |
icraggs | 43:21da1f744243 | 444 | * the packets can be retried. |
icraggs | 8:c46930bd6c82 | 445 | * @param timeout the max time to wait for the packet read to complete, in milliseconds |
icraggs | 53:15b5a280d22d | 446 | * @return the MQTT packet type, 0 if none, -1 if error |
icraggs | 8:c46930bd6c82 | 447 | */ |
icraggs | 46:e335fcc1a663 | 448 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 46:e335fcc1a663 | 449 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::readPacket(Timer& timer) |
icraggs | 8:c46930bd6c82 | 450 | { |
icraggs | 26:2658bb87c53d | 451 | int rc = FAILURE; |
icraggs | 8:c46930bd6c82 | 452 | MQTTHeader header = {0}; |
icraggs | 8:c46930bd6c82 | 453 | int len = 0; |
icraggs | 8:c46930bd6c82 | 454 | int rem_len = 0; |
icraggs | 8:c46930bd6c82 | 455 | |
icraggs | 8:c46930bd6c82 | 456 | /* 1. read the header byte. This has the packet type in it */ |
icraggs | 53:15b5a280d22d | 457 | rc = ipstack.read(readbuf, 1, timer.left_ms()); |
icraggs | 53:15b5a280d22d | 458 | if (rc != 1) |
icraggs | 8:c46930bd6c82 | 459 | goto exit; |
icraggs | 8:c46930bd6c82 | 460 | |
icraggs | 8:c46930bd6c82 | 461 | len = 1; |
icraggs | 8:c46930bd6c82 | 462 | /* 2. read the remaining length. This is variable in itself */ |
icraggs | 20:cad3d54d7ecf | 463 | decodePacket(&rem_len, timer.left_ms()); |
icraggs | 43:21da1f744243 | 464 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ |
icraggs | 8:c46930bd6c82 | 465 | |
icraggs | 46:e335fcc1a663 | 466 | if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) |
icraggs | 46:e335fcc1a663 | 467 | { |
icraggs | 46:e335fcc1a663 | 468 | rc = BUFFER_OVERFLOW; |
icraggs | 46:e335fcc1a663 | 469 | goto exit; |
icraggs | 46:e335fcc1a663 | 470 | } |
icraggs | 46:e335fcc1a663 | 471 | |
icraggs | 8:c46930bd6c82 | 472 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
icraggs | 43:21da1f744243 | 473 | if (rem_len > 0 && (ipstack.read(readbuf + len, rem_len, timer.left_ms()) != rem_len)) |
icraggs | 8:c46930bd6c82 | 474 | goto exit; |
icraggs | 8:c46930bd6c82 | 475 | |
icraggs | 8:c46930bd6c82 | 476 | header.byte = readbuf[0]; |
icraggs | 8:c46930bd6c82 | 477 | rc = header.bits.type; |
icraggs | 43:21da1f744243 | 478 | if (this->keepAliveInterval > 0) |
icraggs | 43:21da1f744243 | 479 | last_received.countdown(this->keepAliveInterval); // record the fact that we have successfully received a packet |
icraggs | 8:c46930bd6c82 | 480 | exit: |
icraggs | 53:15b5a280d22d | 481 | |
icraggs | 43:21da1f744243 | 482 | #if defined(MQTT_DEBUG) |
icraggs | 46:e335fcc1a663 | 483 | if (rc >= 0) |
icraggs | 46:e335fcc1a663 | 484 | { |
icraggs | 46:e335fcc1a663 | 485 | char printbuf[50]; |
icraggs | 56:71ae1a773b64 | 486 | DEBUG("Rc %d receiving packet %s\r\n", rc, |
icraggs | 53:15b5a280d22d | 487 | MQTTFormat_toClientString(printbuf, sizeof(printbuf), readbuf, len)); |
icraggs | 46:e335fcc1a663 | 488 | } |
icraggs | 43:21da1f744243 | 489 | #endif |
icraggs | 8:c46930bd6c82 | 490 | return rc; |
icraggs | 3:dbff6b768d28 | 491 | } |
icraggs | 3:dbff6b768d28 | 492 | |
icraggs | 8:c46930bd6c82 | 493 | |
icraggs | 26:2658bb87c53d | 494 | // assume topic filter and name is in correct format |
icraggs | 26:2658bb87c53d | 495 | // # can only be at end |
icraggs | 26:2658bb87c53d | 496 | // + and # can only be next to separator |
icraggs | 43:21da1f744243 | 497 | template<class Network, class Timer, int a, int b> |
icraggs | 26:2658bb87c53d | 498 | bool MQTT::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTString& topicName) |
icraggs | 26:2658bb87c53d | 499 | { |
icraggs | 26:2658bb87c53d | 500 | char* curf = topicFilter; |
icraggs | 26:2658bb87c53d | 501 | char* curn = topicName.lenstring.data; |
icraggs | 26:2658bb87c53d | 502 | char* curn_end = curn + topicName.lenstring.len; |
icraggs | 43:21da1f744243 | 503 | |
icraggs | 26:2658bb87c53d | 504 | while (*curf && curn < curn_end) |
icraggs | 26:2658bb87c53d | 505 | { |
icraggs | 26:2658bb87c53d | 506 | if (*curn == '/' && *curf != '/') |
icraggs | 26:2658bb87c53d | 507 | break; |
icraggs | 26:2658bb87c53d | 508 | if (*curf != '+' && *curf != '#' && *curf != *curn) |
icraggs | 26:2658bb87c53d | 509 | break; |
icraggs | 26:2658bb87c53d | 510 | if (*curf == '+') |
icraggs | 26:2658bb87c53d | 511 | { // skip until we meet the next separator, or end of string |
icraggs | 26:2658bb87c53d | 512 | char* nextpos = curn + 1; |
icraggs | 26:2658bb87c53d | 513 | while (nextpos < curn_end && *nextpos != '/') |
icraggs | 26:2658bb87c53d | 514 | nextpos = ++curn + 1; |
icraggs | 26:2658bb87c53d | 515 | } |
icraggs | 26:2658bb87c53d | 516 | else if (*curf == '#') |
icraggs | 26:2658bb87c53d | 517 | curn = curn_end - 1; // skip until end of string |
icraggs | 26:2658bb87c53d | 518 | curf++; |
icraggs | 26:2658bb87c53d | 519 | curn++; |
icraggs | 26:2658bb87c53d | 520 | }; |
icraggs | 43:21da1f744243 | 521 | |
icraggs | 26:2658bb87c53d | 522 | return (curn == curn_end) && (*curf == '\0'); |
icraggs | 26:2658bb87c53d | 523 | } |
icraggs | 26:2658bb87c53d | 524 | |
icraggs | 26:2658bb87c53d | 525 | |
icraggs | 26:2658bb87c53d | 526 | |
icraggs | 43:21da1f744243 | 527 | template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> |
icraggs | 26:2658bb87c53d | 528 | int MQTT::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTString& topicName, Message& message) |
icraggs | 15:64a57183aa03 | 529 | { |
icraggs | 26:2658bb87c53d | 530 | int rc = FAILURE; |
icraggs | 20:cad3d54d7ecf | 531 | |
icraggs | 20:cad3d54d7ecf | 532 | // we have to find the right message handler - indexed by topic |
icraggs | 23:05fc7de97d4a | 533 | for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) |
icraggs | 20:cad3d54d7ecf | 534 | { |
icraggs | 26:2658bb87c53d | 535 | if (messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(&topicName, (char*)messageHandlers[i].topicFilter) || |
icraggs | 26:2658bb87c53d | 536 | isTopicMatched((char*)messageHandlers[i].topicFilter, topicName))) |
icraggs | 20:cad3d54d7ecf | 537 | { |
icraggs | 26:2658bb87c53d | 538 | if (messageHandlers[i].fp.attached()) |
icraggs | 26:2658bb87c53d | 539 | { |
icraggs | 31:a51dd239b78e | 540 | MessageData md(topicName, message); |
icraggs | 31:a51dd239b78e | 541 | messageHandlers[i].fp(md); |
icraggs | 26:2658bb87c53d | 542 | rc = SUCCESS; |
icraggs | 26:2658bb87c53d | 543 | } |
icraggs | 20:cad3d54d7ecf | 544 | } |
icraggs | 20:cad3d54d7ecf | 545 | } |
icraggs | 43:21da1f744243 | 546 | |
icraggs | 43:21da1f744243 | 547 | if (rc == FAILURE && defaultMessageHandler.attached()) |
icraggs | 26:2658bb87c53d | 548 | { |
icraggs | 31:a51dd239b78e | 549 | MessageData md(topicName, message); |
icraggs | 31:a51dd239b78e | 550 | defaultMessageHandler(md); |
icraggs | 26:2658bb87c53d | 551 | rc = SUCCESS; |
icraggs | 43:21da1f744243 | 552 | } |
icraggs | 43:21da1f744243 | 553 | |
icraggs | 20:cad3d54d7ecf | 554 | return rc; |
icraggs | 15:64a57183aa03 | 555 | } |
icraggs | 15:64a57183aa03 | 556 | |
icraggs | 15:64a57183aa03 | 557 | |
icraggs | 20:cad3d54d7ecf | 558 | |
icraggs | 43:21da1f744243 | 559 | template<class Network, class Timer, int a, int b> |
icraggs | 43:21da1f744243 | 560 | int MQTT::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) |
icraggs | 20:cad3d54d7ecf | 561 | { |
icraggs | 26:2658bb87c53d | 562 | int rc = SUCCESS; |
icraggs | 53:15b5a280d22d | 563 | Timer timer; |
icraggs | 43:21da1f744243 | 564 | |
icraggs | 23:05fc7de97d4a | 565 | timer.countdown_ms(timeout_ms); |
icraggs | 20:cad3d54d7ecf | 566 | while (!timer.expired()) |
icraggs | 26:2658bb87c53d | 567 | { |
icraggs | 46:e335fcc1a663 | 568 | if (cycle(timer) < 0) |
icraggs | 26:2658bb87c53d | 569 | { |
icraggs | 26:2658bb87c53d | 570 | rc = FAILURE; |
icraggs | 26:2658bb87c53d | 571 | break; |
icraggs | 26:2658bb87c53d | 572 | } |
icraggs | 26:2658bb87c53d | 573 | } |
icraggs | 43:21da1f744243 | 574 | |
icraggs | 26:2658bb87c53d | 575 | return rc; |
icraggs | 20:cad3d54d7ecf | 576 | } |
icraggs | 20:cad3d54d7ecf | 577 | |
icraggs | 20:cad3d54d7ecf | 578 | |
icraggs | 43:21da1f744243 | 579 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 23:05fc7de97d4a | 580 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::cycle(Timer& timer) |
icraggs | 8:c46930bd6c82 | 581 | { |
icraggs | 53:15b5a280d22d | 582 | // get one piece of work off the wire and one pass through |
icraggs | 26:2658bb87c53d | 583 | int len = 0, |
icraggs | 26:2658bb87c53d | 584 | rc = SUCCESS; |
icraggs | 28:8b2abe9bd814 | 585 | |
icraggs | 53:15b5a280d22d | 586 | int packet_type = readPacket(timer); // read the socket, see what work is due |
icraggs | 53:15b5a280d22d | 587 | |
icraggs | 8:c46930bd6c82 | 588 | switch (packet_type) |
icraggs | 8:c46930bd6c82 | 589 | { |
icraggs | 53:15b5a280d22d | 590 | default: |
icraggs | 53:15b5a280d22d | 591 | // no more data to read, unrecoverable. Or read packet fails due to unexpected network error |
icraggs | 46:e335fcc1a663 | 592 | rc = packet_type; |
icraggs | 53:15b5a280d22d | 593 | goto exit; |
icraggs | 53:15b5a280d22d | 594 | case 0: // timed out reading packet |
icraggs | 46:e335fcc1a663 | 595 | break; |
icraggs | 15:64a57183aa03 | 596 | case CONNACK: |
icraggs | 8:c46930bd6c82 | 597 | case PUBACK: |
icraggs | 8:c46930bd6c82 | 598 | case SUBACK: |
icraggs | 8:c46930bd6c82 | 599 | break; |
icraggs | 15:64a57183aa03 | 600 | case PUBLISH: |
icraggs | 46:e335fcc1a663 | 601 | { |
icraggs | 46:e335fcc1a663 | 602 | MQTTString topicName = MQTTString_initializer; |
icraggs | 20:cad3d54d7ecf | 603 | Message msg; |
icraggs | 46:e335fcc1a663 | 604 | int intQoS; |
icraggs | 53:15b5a280d22d | 605 | msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ |
icraggs | 46:e335fcc1a663 | 606 | if (MQTTDeserialize_publish((unsigned char*)&msg.dup, &intQoS, (unsigned char*)&msg.retained, (unsigned short*)&msg.id, &topicName, |
icraggs | 36:2f1ada427e56 | 607 | (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) |
icraggs | 26:2658bb87c53d | 608 | goto exit; |
icraggs | 46:e335fcc1a663 | 609 | msg.qos = (enum QoS)intQoS; |
icraggs | 43:21da1f744243 | 610 | #if MQTTCLIENT_QOS2 |
icraggs | 43:21da1f744243 | 611 | if (msg.qos != QOS2) |
icraggs | 43:21da1f744243 | 612 | #endif |
icraggs | 31:a51dd239b78e | 613 | deliverMessage(topicName, msg); |
icraggs | 43:21da1f744243 | 614 | #if MQTTCLIENT_QOS2 |
icraggs | 31:a51dd239b78e | 615 | else if (isQoS2msgidFree(msg.id)) |
icraggs | 31:a51dd239b78e | 616 | { |
icraggs | 43:21da1f744243 | 617 | if (useQoS2msgid(msg.id)) |
icraggs | 43:21da1f744243 | 618 | deliverMessage(topicName, msg); |
icraggs | 43:21da1f744243 | 619 | else |
icraggs | 43:21da1f744243 | 620 | WARN("Maximum number of incoming QoS2 messages exceeded"); |
icraggs | 46:e335fcc1a663 | 621 | } |
icraggs | 31:a51dd239b78e | 622 | #endif |
icraggs | 43:21da1f744243 | 623 | #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 |
icraggs | 20:cad3d54d7ecf | 624 | if (msg.qos != QOS0) |
icraggs | 20:cad3d54d7ecf | 625 | { |
icraggs | 20:cad3d54d7ecf | 626 | if (msg.qos == QOS1) |
icraggs | 43:21da1f744243 | 627 | len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBACK, 0, msg.id); |
icraggs | 20:cad3d54d7ecf | 628 | else if (msg.qos == QOS2) |
icraggs | 43:21da1f744243 | 629 | len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREC, 0, msg.id); |
icraggs | 26:2658bb87c53d | 630 | if (len <= 0) |
icraggs | 26:2658bb87c53d | 631 | rc = FAILURE; |
icraggs | 26:2658bb87c53d | 632 | else |
icraggs | 26:2658bb87c53d | 633 | rc = sendPacket(len, timer); |
icraggs | 26:2658bb87c53d | 634 | if (rc == FAILURE) |
icraggs | 20:cad3d54d7ecf | 635 | goto exit; // there was a problem |
icraggs | 20:cad3d54d7ecf | 636 | } |
Ian Craggs |
12:cc7f2d62a393 | 637 | break; |
icraggs | 43:21da1f744243 | 638 | #endif |
icraggs | 46:e335fcc1a663 | 639 | } |
icraggs | 43:21da1f744243 | 640 | #if MQTTCLIENT_QOS2 |
icraggs | 15:64a57183aa03 | 641 | case PUBREC: |
icraggs | 46:e335fcc1a663 | 642 | case PUBREL: |
icraggs | 36:2f1ada427e56 | 643 | unsigned short mypacketid; |
icraggs | 36:2f1ada427e56 | 644 | unsigned char dup, type; |
icraggs | 26:2658bb87c53d | 645 | if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) |
icraggs | 26:2658bb87c53d | 646 | rc = FAILURE; |
icraggs | 53:15b5a280d22d | 647 | else if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, |
icraggs | 53:15b5a280d22d | 648 | (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) |
icraggs | 26:2658bb87c53d | 649 | rc = FAILURE; |
icraggs | 26:2658bb87c53d | 650 | else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet |
icraggs | 26:2658bb87c53d | 651 | rc = FAILURE; // there was a problem |
icraggs | 26:2658bb87c53d | 652 | if (rc == FAILURE) |
icraggs | 20:cad3d54d7ecf | 653 | goto exit; // there was a problem |
icraggs | 46:e335fcc1a663 | 654 | if (packet_type == PUBREL) |
icraggs | 46:e335fcc1a663 | 655 | freeQoS2msgid(mypacketid); |
icraggs | 8:c46930bd6c82 | 656 | break; |
icraggs | 53:15b5a280d22d | 657 | |
icraggs | 8:c46930bd6c82 | 658 | case PUBCOMP: |
icraggs | 8:c46930bd6c82 | 659 | break; |
icraggs | 43:21da1f744243 | 660 | #endif |
icraggs | 15:64a57183aa03 | 661 | case PINGRESP: |
icraggs | 20:cad3d54d7ecf | 662 | ping_outstanding = false; |
icraggs | 8:c46930bd6c82 | 663 | break; |
icraggs | 15:64a57183aa03 | 664 | } |
icraggs | 53:15b5a280d22d | 665 | |
icraggs | 53:15b5a280d22d | 666 | if (keepalive() != SUCCESS) |
icraggs | 53:15b5a280d22d | 667 | //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT |
icraggs | 53:15b5a280d22d | 668 | rc = FAILURE; |
icraggs | 53:15b5a280d22d | 669 | |
Ian Craggs |
12:cc7f2d62a393 | 670 | exit: |
icraggs | 26:2658bb87c53d | 671 | if (rc == SUCCESS) |
icraggs | 26:2658bb87c53d | 672 | rc = packet_type; |
icraggs | 53:15b5a280d22d | 673 | else if (isconnected) |
icraggs | 53:15b5a280d22d | 674 | closeSession(); |
icraggs | 26:2658bb87c53d | 675 | return rc; |
icraggs | 15:64a57183aa03 | 676 | } |
icraggs | 15:64a57183aa03 | 677 | |
icraggs | 15:64a57183aa03 | 678 | |
icraggs | 23:05fc7de97d4a | 679 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 23:05fc7de97d4a | 680 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::keepalive() |
icraggs | 15:64a57183aa03 | 681 | { |
icraggs | 53:15b5a280d22d | 682 | int rc = SUCCESS; |
icraggs | 54:ff9e5c4b52d0 | 683 | static Timer ping_sent; |
icraggs | 15:64a57183aa03 | 684 | |
icraggs | 20:cad3d54d7ecf | 685 | if (keepAliveInterval == 0) |
icraggs | 20:cad3d54d7ecf | 686 | goto exit; |
icraggs | 54:ff9e5c4b52d0 | 687 | |
icraggs | 54:ff9e5c4b52d0 | 688 | if (ping_outstanding) |
icraggs | 20:cad3d54d7ecf | 689 | { |
icraggs | 54:ff9e5c4b52d0 | 690 | if (ping_sent.expired()) |
icraggs | 53:15b5a280d22d | 691 | { |
icraggs | 53:15b5a280d22d | 692 | rc = FAILURE; // session failure |
icraggs | 53:15b5a280d22d | 693 | #if defined(MQTT_DEBUG) |
icraggs | 56:71ae1a773b64 | 694 | DEBUG("PINGRESP not received in keepalive interval\r\n"); |
icraggs | 53:15b5a280d22d | 695 | #endif |
icraggs | 53:15b5a280d22d | 696 | } |
icraggs | 54:ff9e5c4b52d0 | 697 | } |
icraggs | 54:ff9e5c4b52d0 | 698 | else if (last_sent.expired() || last_received.expired()) |
icraggs | 54:ff9e5c4b52d0 | 699 | { |
icraggs | 54:ff9e5c4b52d0 | 700 | Timer timer(1000); |
icraggs | 54:ff9e5c4b52d0 | 701 | int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); |
icraggs | 54:ff9e5c4b52d0 | 702 | if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet |
icraggs | 20:cad3d54d7ecf | 703 | { |
icraggs | 54:ff9e5c4b52d0 | 704 | ping_outstanding = true; |
icraggs | 54:ff9e5c4b52d0 | 705 | ping_sent.countdown(this->keepAliveInterval); |
icraggs | 20:cad3d54d7ecf | 706 | } |
icraggs | 20:cad3d54d7ecf | 707 | } |
icraggs | 15:64a57183aa03 | 708 | exit: |
icraggs | 20:cad3d54d7ecf | 709 | return rc; |
icraggs | 8:c46930bd6c82 | 710 | } |
icraggs | 8:c46930bd6c82 | 711 | |
icraggs | 8:c46930bd6c82 | 712 | |
icraggs | 16:91c2f9a144d4 | 713 | // only used in single-threaded mode where one command at a time is in process |
icraggs | 43:21da1f744243 | 714 | template<class Network, class Timer, int a, int b> |
icraggs | 23:05fc7de97d4a | 715 | int MQTT::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) |
icraggs | 15:64a57183aa03 | 716 | { |
icraggs | 26:2658bb87c53d | 717 | int rc = FAILURE; |
icraggs | 43:21da1f744243 | 718 | |
icraggs | 20:cad3d54d7ecf | 719 | do |
icraggs | 16:91c2f9a144d4 | 720 | { |
icraggs | 43:21da1f744243 | 721 | if (timer.expired()) |
icraggs | 20:cad3d54d7ecf | 722 | break; // we timed out |
icraggs | 53:15b5a280d22d | 723 | rc = cycle(timer); |
icraggs | 20:cad3d54d7ecf | 724 | } |
icraggs | 53:15b5a280d22d | 725 | while (rc != packet_type && rc >= 0); |
icraggs | 43:21da1f744243 | 726 | |
icraggs | 20:cad3d54d7ecf | 727 | return rc; |
icraggs | 16:91c2f9a144d4 | 728 | } |
icraggs | 16:91c2f9a144d4 | 729 | |
icraggs | 16:91c2f9a144d4 | 730 | |
icraggs | 43:21da1f744243 | 731 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 53:15b5a280d22d | 732 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options, connackData& data) |
icraggs | 16:91c2f9a144d4 | 733 | { |
icraggs | 46:e335fcc1a663 | 734 | Timer connect_timer(command_timeout_ms); |
icraggs | 26:2658bb87c53d | 735 | int rc = FAILURE; |
icraggs | 31:a51dd239b78e | 736 | int len = 0; |
icraggs | 43:21da1f744243 | 737 | |
icraggs | 31:a51dd239b78e | 738 | if (isconnected) // don't send connect packet again if we are already connected |
icraggs | 31:a51dd239b78e | 739 | goto exit; |
icraggs | 8:c46930bd6c82 | 740 | |
icraggs | 43:21da1f744243 | 741 | this->keepAliveInterval = options.keepAliveInterval; |
icraggs | 43:21da1f744243 | 742 | this->cleansession = options.cleansession; |
icraggs | 43:21da1f744243 | 743 | if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) |
icraggs | 26:2658bb87c53d | 744 | goto exit; |
icraggs | 26:2658bb87c53d | 745 | if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet |
icraggs | 20:cad3d54d7ecf | 746 | goto exit; // there was a problem |
icraggs | 43:21da1f744243 | 747 | |
icraggs | 43:21da1f744243 | 748 | if (this->keepAliveInterval > 0) |
icraggs | 43:21da1f744243 | 749 | last_received.countdown(this->keepAliveInterval); |
icraggs | 20:cad3d54d7ecf | 750 | // this will be a blocking call, wait for the connack |
icraggs | 20:cad3d54d7ecf | 751 | if (waitfor(CONNACK, connect_timer) == CONNACK) |
icraggs | 15:64a57183aa03 | 752 | { |
icraggs | 53:15b5a280d22d | 753 | data.rc = 0; |
icraggs | 53:15b5a280d22d | 754 | data.sessionPresent = false; |
icraggs | 53:15b5a280d22d | 755 | if (MQTTDeserialize_connack((unsigned char*)&data.sessionPresent, |
icraggs | 53:15b5a280d22d | 756 | (unsigned char*)&data.rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
icraggs | 53:15b5a280d22d | 757 | rc = data.rc; |
icraggs | 26:2658bb87c53d | 758 | else |
icraggs | 26:2658bb87c53d | 759 | rc = FAILURE; |
icraggs | 8:c46930bd6c82 | 760 | } |
icraggs | 26:2658bb87c53d | 761 | else |
icraggs | 26:2658bb87c53d | 762 | rc = FAILURE; |
icraggs | 46:e335fcc1a663 | 763 | |
icraggs | 43:21da1f744243 | 764 | #if MQTTCLIENT_QOS2 |
icraggs | 46:e335fcc1a663 | 765 | // resend any inflight publish |
icraggs | 46:e335fcc1a663 | 766 | if (inflightMsgid > 0 && inflightQoS == QOS2 && pubrel) |
icraggs | 43:21da1f744243 | 767 | { |
icraggs | 43:21da1f744243 | 768 | if ((len = MQTTSerialize_ack(sendbuf, MAX_MQTT_PACKET_SIZE, PUBREL, 0, inflightMsgid)) <= 0) |
icraggs | 43:21da1f744243 | 769 | rc = FAILURE; |
icraggs | 43:21da1f744243 | 770 | else |
icraggs | 43:21da1f744243 | 771 | rc = publish(len, connect_timer, inflightQoS); |
icraggs | 43:21da1f744243 | 772 | } |
icraggs | 43:21da1f744243 | 773 | else |
icraggs | 43:21da1f744243 | 774 | #endif |
icraggs | 43:21da1f744243 | 775 | #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 |
icraggs | 43:21da1f744243 | 776 | if (inflightMsgid > 0) |
icraggs | 43:21da1f744243 | 777 | { |
icraggs | 43:21da1f744243 | 778 | memcpy(sendbuf, pubbuf, MAX_MQTT_PACKET_SIZE); |
icraggs | 43:21da1f744243 | 779 | rc = publish(inflightLen, connect_timer, inflightQoS); |
icraggs | 43:21da1f744243 | 780 | } |
icraggs | 43:21da1f744243 | 781 | #endif |
icraggs | 43:21da1f744243 | 782 | |
icraggs | 15:64a57183aa03 | 783 | exit: |
icraggs | 26:2658bb87c53d | 784 | if (rc == SUCCESS) |
icraggs | 53:15b5a280d22d | 785 | { |
icraggs | 26:2658bb87c53d | 786 | isconnected = true; |
icraggs | 53:15b5a280d22d | 787 | ping_outstanding = false; |
icraggs | 53:15b5a280d22d | 788 | } |
icraggs | 8:c46930bd6c82 | 789 | return rc; |
icraggs | 8:c46930bd6c82 | 790 | } |
icraggs | 8:c46930bd6c82 | 791 | |
icraggs | 8:c46930bd6c82 | 792 | |
icraggs | 43:21da1f744243 | 793 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 53:15b5a280d22d | 794 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect(MQTTPacket_connectData& options) |
icraggs | 53:15b5a280d22d | 795 | { |
icraggs | 53:15b5a280d22d | 796 | connackData data; |
icraggs | 53:15b5a280d22d | 797 | return connect(options, data); |
icraggs | 53:15b5a280d22d | 798 | } |
icraggs | 53:15b5a280d22d | 799 | |
icraggs | 53:15b5a280d22d | 800 | |
icraggs | 53:15b5a280d22d | 801 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 43:21da1f744243 | 802 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::connect() |
icraggs | 43:21da1f744243 | 803 | { |
icraggs | 43:21da1f744243 | 804 | MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; |
icraggs | 43:21da1f744243 | 805 | return connect(default_options); |
icraggs | 43:21da1f744243 | 806 | } |
icraggs | 43:21da1f744243 | 807 | |
icraggs | 43:21da1f744243 | 808 | |
icraggs | 43:21da1f744243 | 809 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> |
icraggs | 53:15b5a280d22d | 810 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::setMessageHandler(const char* topicFilter, messageHandler messageHandler) |
icraggs | 53:15b5a280d22d | 811 | { |
icraggs | 53:15b5a280d22d | 812 | int rc = FAILURE; |
icraggs | 53:15b5a280d22d | 813 | int i = -1; |
icraggs | 53:15b5a280d22d | 814 | |
icraggs | 53:15b5a280d22d | 815 | // first check for an existing matching slot |
icraggs | 53:15b5a280d22d | 816 | for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) |
icraggs | 53:15b5a280d22d | 817 | { |
icraggs | 53:15b5a280d22d | 818 | if (messageHandlers[i].topicFilter != 0 && strcmp(messageHandlers[i].topicFilter, topicFilter) == 0) |
icraggs | 53:15b5a280d22d | 819 | { |
icraggs | 53:15b5a280d22d | 820 | if (messageHandler == 0) // remove existing |
icraggs | 53:15b5a280d22d | 821 | { |
icraggs | 53:15b5a280d22d | 822 | messageHandlers[i].topicFilter = 0; |
icraggs | 53:15b5a280d22d | 823 | messageHandlers[i].fp.detach(); |
icraggs | 53:15b5a280d22d | 824 | } |
icraggs | 53:15b5a280d22d | 825 | rc = SUCCESS; // return i when adding new subscription |
icraggs | 53:15b5a280d22d | 826 | break; |
icraggs | 53:15b5a280d22d | 827 | } |
icraggs | 53:15b5a280d22d | 828 | } |
icraggs | 53:15b5a280d22d | 829 | // if no existing, look for empty slot (unless we are removing) |
icraggs | 53:15b5a280d22d | 830 | if (messageHandler != 0) { |
icraggs | 53:15b5a280d22d | 831 | if (rc == FAILURE) |
icraggs | 53:15b5a280d22d | 832 | { |
icraggs | 53:15b5a280d22d | 833 | for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) |
icraggs | 53:15b5a280d22d | 834 | { |
icraggs | 53:15b5a280d22d | 835 | if (messageHandlers[i].topicFilter == 0) |
icraggs | 53:15b5a280d22d | 836 | { |
icraggs | 53:15b5a280d22d | 837 | rc = SUCCESS; |
icraggs | 53:15b5a280d22d | 838 | break; |
icraggs | 53:15b5a280d22d | 839 | } |
icraggs | 53:15b5a280d22d | 840 | } |
icraggs | 53:15b5a280d22d | 841 | } |
icraggs | 53:15b5a280d22d | 842 | if (i < MAX_MESSAGE_HANDLERS) |
icraggs | 53:15b5a280d22d | 843 | { |
icraggs | 53:15b5a280d22d | 844 | messageHandlers[i].topicFilter = topicFilter; |
icraggs | 53:15b5a280d22d | 845 | messageHandlers[i].fp.attach(messageHandler); |
icraggs | 53:15b5a280d22d | 846 | } |
icraggs | 53:15b5a280d22d | 847 | } |
icraggs | 53:15b5a280d22d | 848 | return rc; |
icraggs | 53:15b5a280d22d | 849 | } |
icraggs | 53:15b5a280d22d | 850 | |
icraggs | 53:15b5a280d22d | 851 | |
icraggs | 53:15b5a280d22d | 852 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> |
icraggs | 53:15b5a280d22d | 853 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, |
icraggs | 53:15b5a280d22d | 854 | enum QoS qos, messageHandler messageHandler, subackData& data) |
icraggs | 43:21da1f744243 | 855 | { |
icraggs | 43:21da1f744243 | 856 | int rc = FAILURE; |
icraggs | 46:e335fcc1a663 | 857 | Timer timer(command_timeout_ms); |
icraggs | 26:2658bb87c53d | 858 | int len = 0; |
icraggs | 46:e335fcc1a663 | 859 | MQTTString topic = {(char*)topicFilter, {0, 0}}; |
icraggs | 43:21da1f744243 | 860 | |
icraggs | 26:2658bb87c53d | 861 | if (!isconnected) |
icraggs | 20:cad3d54d7ecf | 862 | goto exit; |
icraggs | 43:21da1f744243 | 863 | |
icraggs | 43:21da1f744243 | 864 | len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
icraggs | 26:2658bb87c53d | 865 | if (len <= 0) |
icraggs | 26:2658bb87c53d | 866 | goto exit; |
icraggs | 23:05fc7de97d4a | 867 | if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet |
icraggs | 26:2658bb87c53d | 868 | goto exit; // there was a problem |
icraggs | 43:21da1f744243 | 869 | |
icraggs | 43:21da1f744243 | 870 | if (waitfor(SUBACK, timer) == SUBACK) // wait for suback |
icraggs | 8:c46930bd6c82 | 871 | { |
icraggs | 53:15b5a280d22d | 872 | int count = 0; |
icraggs | 36:2f1ada427e56 | 873 | unsigned short mypacketid; |
icraggs | 53:15b5a280d22d | 874 | data.grantedQoS = 0; |
icraggs | 53:15b5a280d22d | 875 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &data.grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
icraggs | 8:c46930bd6c82 | 876 | { |
icraggs | 53:15b5a280d22d | 877 | if (data.grantedQoS != 0x80) |
icraggs | 53:15b5a280d22d | 878 | rc = setMessageHandler(topicFilter, messageHandler); |
icraggs | 8:c46930bd6c82 | 879 | } |
icraggs | 8:c46930bd6c82 | 880 | } |
icraggs | 43:21da1f744243 | 881 | else |
icraggs | 26:2658bb87c53d | 882 | rc = FAILURE; |
icraggs | 43:21da1f744243 | 883 | |
icraggs | 15:64a57183aa03 | 884 | exit: |
icraggs | 53:15b5a280d22d | 885 | if (rc == FAILURE) |
icraggs | 53:15b5a280d22d | 886 | closeSession(); |
Ian Craggs |
12:cc7f2d62a393 | 887 | return rc; |
icraggs | 15:64a57183aa03 | 888 | } |
icraggs | 15:64a57183aa03 | 889 | |
icraggs | 15:64a57183aa03 | 890 | |
icraggs | 43:21da1f744243 | 891 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> |
icraggs | 53:15b5a280d22d | 892 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler) |
icraggs | 53:15b5a280d22d | 893 | { |
icraggs | 53:15b5a280d22d | 894 | subackData data; |
icraggs | 53:15b5a280d22d | 895 | return subscribe(topicFilter, qos, messageHandler, data); |
icraggs | 53:15b5a280d22d | 896 | } |
icraggs | 53:15b5a280d22d | 897 | |
icraggs | 53:15b5a280d22d | 898 | |
icraggs | 53:15b5a280d22d | 899 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> |
icraggs | 23:05fc7de97d4a | 900 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(const char* topicFilter) |
icraggs | 43:21da1f744243 | 901 | { |
icraggs | 26:2658bb87c53d | 902 | int rc = FAILURE; |
icraggs | 46:e335fcc1a663 | 903 | Timer timer(command_timeout_ms); |
icraggs | 46:e335fcc1a663 | 904 | MQTTString topic = {(char*)topicFilter, {0, 0}}; |
icraggs | 31:a51dd239b78e | 905 | int len = 0; |
icraggs | 43:21da1f744243 | 906 | |
icraggs | 31:a51dd239b78e | 907 | if (!isconnected) |
icraggs | 31:a51dd239b78e | 908 | goto exit; |
icraggs | 43:21da1f744243 | 909 | |
icraggs | 43:21da1f744243 | 910 | if ((len = MQTTSerialize_unsubscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic)) <= 0) |
icraggs | 20:cad3d54d7ecf | 911 | goto exit; |
icraggs | 43:21da1f744243 | 912 | if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet |
icraggs | 20:cad3d54d7ecf | 913 | goto exit; // there was a problem |
icraggs | 43:21da1f744243 | 914 | |
icraggs | 20:cad3d54d7ecf | 915 | if (waitfor(UNSUBACK, timer) == UNSUBACK) |
Ian Craggs |
12:cc7f2d62a393 | 916 | { |
icraggs | 36:2f1ada427e56 | 917 | unsigned short mypacketid; // should be the same as the packetid above |
icraggs | 23:05fc7de97d4a | 918 | if (MQTTDeserialize_unsuback(&mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
icraggs | 46:e335fcc1a663 | 919 | { |
icraggs | 46:e335fcc1a663 | 920 | // remove the subscription message handler associated with this topic, if there is one |
icraggs | 53:15b5a280d22d | 921 | setMessageHandler(topicFilter, 0); |
icraggs | 46:e335fcc1a663 | 922 | } |
Ian Craggs |
12:cc7f2d62a393 | 923 | } |
icraggs | 26:2658bb87c53d | 924 | else |
icraggs | 26:2658bb87c53d | 925 | rc = FAILURE; |
icraggs | 43:21da1f744243 | 926 | |
icraggs | 15:64a57183aa03 | 927 | exit: |
icraggs | 43:21da1f744243 | 928 | if (rc != SUCCESS) |
icraggs | 53:15b5a280d22d | 929 | closeSession(); |
Ian Craggs |
12:cc7f2d62a393 | 930 | return rc; |
icraggs | 15:64a57183aa03 | 931 | } |
icraggs | 15:64a57183aa03 | 932 | |
icraggs | 15:64a57183aa03 | 933 | |
icraggs | 43:21da1f744243 | 934 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 43:21da1f744243 | 935 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) |
icraggs | 20:cad3d54d7ecf | 936 | { |
icraggs | 43:21da1f744243 | 937 | int rc; |
icraggs | 46:e335fcc1a663 | 938 | |
icraggs | 43:21da1f744243 | 939 | if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet |
icraggs | 43:21da1f744243 | 940 | goto exit; // there was a problem |
icraggs | 15:64a57183aa03 | 941 | |
icraggs | 46:e335fcc1a663 | 942 | #if MQTTCLIENT_QOS1 |
icraggs | 43:21da1f744243 | 943 | if (qos == QOS1) |
Ian Craggs |
12:cc7f2d62a393 | 944 | { |
icraggs | 20:cad3d54d7ecf | 945 | if (waitfor(PUBACK, timer) == PUBACK) |
icraggs | 20:cad3d54d7ecf | 946 | { |
icraggs | 36:2f1ada427e56 | 947 | unsigned short mypacketid; |
icraggs | 36:2f1ada427e56 | 948 | unsigned char dup, type; |
icraggs | 26:2658bb87c53d | 949 | if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) |
icraggs | 26:2658bb87c53d | 950 | rc = FAILURE; |
icraggs | 43:21da1f744243 | 951 | else if (inflightMsgid == mypacketid) |
icraggs | 43:21da1f744243 | 952 | inflightMsgid = 0; |
icraggs | 20:cad3d54d7ecf | 953 | } |
icraggs | 26:2658bb87c53d | 954 | else |
icraggs | 26:2658bb87c53d | 955 | rc = FAILURE; |
Ian Craggs |
12:cc7f2d62a393 | 956 | } |
icraggs | 53:15b5a280d22d | 957 | #endif |
icraggs | 53:15b5a280d22d | 958 | #if MQTTCLIENT_QOS2 |
icraggs | 43:21da1f744243 | 959 | else if (qos == QOS2) |
Ian Craggs |
12:cc7f2d62a393 | 960 | { |
icraggs | 20:cad3d54d7ecf | 961 | if (waitfor(PUBCOMP, timer) == PUBCOMP) |
icraggs | 20:cad3d54d7ecf | 962 | { |
icraggs | 36:2f1ada427e56 | 963 | unsigned short mypacketid; |
icraggs | 36:2f1ada427e56 | 964 | unsigned char dup, type; |
icraggs | 26:2658bb87c53d | 965 | if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, MAX_MQTT_PACKET_SIZE) != 1) |
icraggs | 26:2658bb87c53d | 966 | rc = FAILURE; |
icraggs | 43:21da1f744243 | 967 | else if (inflightMsgid == mypacketid) |
icraggs | 43:21da1f744243 | 968 | inflightMsgid = 0; |
icraggs | 20:cad3d54d7ecf | 969 | } |
icraggs | 26:2658bb87c53d | 970 | else |
icraggs | 26:2658bb87c53d | 971 | rc = FAILURE; |
Ian Craggs |
12:cc7f2d62a393 | 972 | } |
icraggs | 43:21da1f744243 | 973 | #endif |
icraggs | 43:21da1f744243 | 974 | |
icraggs | 43:21da1f744243 | 975 | exit: |
icraggs | 43:21da1f744243 | 976 | if (rc != SUCCESS) |
icraggs | 53:15b5a280d22d | 977 | closeSession(); |
icraggs | 43:21da1f744243 | 978 | return rc; |
icraggs | 43:21da1f744243 | 979 | } |
icraggs | 43:21da1f744243 | 980 | |
icraggs | 43:21da1f744243 | 981 | |
icraggs | 43:21da1f744243 | 982 | |
icraggs | 43:21da1f744243 | 983 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 43:21da1f744243 | 984 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) |
icraggs | 43:21da1f744243 | 985 | { |
icraggs | 43:21da1f744243 | 986 | int rc = FAILURE; |
icraggs | 46:e335fcc1a663 | 987 | Timer timer(command_timeout_ms); |
icraggs | 43:21da1f744243 | 988 | MQTTString topicString = MQTTString_initializer; |
icraggs | 43:21da1f744243 | 989 | int len = 0; |
icraggs | 43:21da1f744243 | 990 | |
icraggs | 43:21da1f744243 | 991 | if (!isconnected) |
icraggs | 43:21da1f744243 | 992 | goto exit; |
icraggs | 46:e335fcc1a663 | 993 | |
icraggs | 43:21da1f744243 | 994 | topicString.cstring = (char*)topicName; |
icraggs | 43:21da1f744243 | 995 | |
icraggs | 43:21da1f744243 | 996 | #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 |
icraggs | 43:21da1f744243 | 997 | if (qos == QOS1 || qos == QOS2) |
icraggs | 43:21da1f744243 | 998 | id = packetid.getNext(); |
icraggs | 43:21da1f744243 | 999 | #endif |
icraggs | 43:21da1f744243 | 1000 | |
icraggs | 43:21da1f744243 | 1001 | len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, qos, retained, id, |
icraggs | 43:21da1f744243 | 1002 | topicString, (unsigned char*)payload, payloadlen); |
icraggs | 43:21da1f744243 | 1003 | if (len <= 0) |
icraggs | 43:21da1f744243 | 1004 | goto exit; |
icraggs | 46:e335fcc1a663 | 1005 | |
icraggs | 43:21da1f744243 | 1006 | #if MQTTCLIENT_QOS1 || MQTTCLIENT_QOS2 |
icraggs | 43:21da1f744243 | 1007 | if (!cleansession) |
icraggs | 43:21da1f744243 | 1008 | { |
icraggs | 43:21da1f744243 | 1009 | memcpy(pubbuf, sendbuf, len); |
icraggs | 43:21da1f744243 | 1010 | inflightMsgid = id; |
icraggs | 43:21da1f744243 | 1011 | inflightLen = len; |
icraggs | 43:21da1f744243 | 1012 | inflightQoS = qos; |
icraggs | 43:21da1f744243 | 1013 | #if MQTTCLIENT_QOS2 |
icraggs | 43:21da1f744243 | 1014 | pubrel = false; |
icraggs | 43:21da1f744243 | 1015 | #endif |
icraggs | 43:21da1f744243 | 1016 | } |
icraggs | 43:21da1f744243 | 1017 | #endif |
icraggs | 46:e335fcc1a663 | 1018 | |
icraggs | 43:21da1f744243 | 1019 | rc = publish(len, timer, qos); |
icraggs | 15:64a57183aa03 | 1020 | exit: |
icraggs | 8:c46930bd6c82 | 1021 | return rc; |
icraggs | 8:c46930bd6c82 | 1022 | } |
icraggs | 8:c46930bd6c82 | 1023 | |
icraggs | 8:c46930bd6c82 | 1024 | |
icraggs | 43:21da1f744243 | 1025 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 43:21da1f744243 | 1026 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) |
icraggs | 43:21da1f744243 | 1027 | { |
icraggs | 43:21da1f744243 | 1028 | unsigned short id = 0; // dummy - not used for anything |
icraggs | 43:21da1f744243 | 1029 | return publish(topicName, payload, payloadlen, id, qos, retained); |
icraggs | 43:21da1f744243 | 1030 | } |
icraggs | 43:21da1f744243 | 1031 | |
icraggs | 43:21da1f744243 | 1032 | |
icraggs | 43:21da1f744243 | 1033 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 43:21da1f744243 | 1034 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message) |
icraggs | 43:21da1f744243 | 1035 | { |
icraggs | 43:21da1f744243 | 1036 | return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); |
icraggs | 43:21da1f744243 | 1037 | } |
icraggs | 43:21da1f744243 | 1038 | |
icraggs | 43:21da1f744243 | 1039 | |
icraggs | 43:21da1f744243 | 1040 | template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b> |
icraggs | 23:05fc7de97d4a | 1041 | int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::disconnect() |
icraggs | 43:21da1f744243 | 1042 | { |
icraggs | 26:2658bb87c53d | 1043 | int rc = FAILURE; |
icraggs | 53:15b5a280d22d | 1044 | Timer timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete |
icraggs | 44:c299463ae853 | 1045 | int len = MQTTSerialize_disconnect(sendbuf, MAX_MQTT_PACKET_SIZE); |
icraggs | 26:2658bb87c53d | 1046 | if (len > 0) |
icraggs | 26:2658bb87c53d | 1047 | rc = sendPacket(len, timer); // send the disconnect packet |
icraggs | 53:15b5a280d22d | 1048 | closeSession(); |
icraggs | 23:05fc7de97d4a | 1049 | return rc; |
icraggs | 20:cad3d54d7ecf | 1050 | } |
icraggs | 20:cad3d54d7ecf | 1051 | |
icraggs | 53:15b5a280d22d | 1052 | #endif |