..
Dependencies: FP
MQTTSNClient.h@5:7fb937012f6c, 2019-01-15 (annotated)
- Committer:
- ImranBilalButt
- Date:
- Tue Jan 15 17:12:08 2019 +0000
- Revision:
- 5:7fb937012f6c
- Parent:
- 3:d85693ff9ec8
- Child:
- 6:1957f23ac496
..;
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
icraggs | 0:ae83cacd60d2 | 1 | /******************************************************************************* |
icraggs | 0:ae83cacd60d2 | 2 | * Copyright (c) 2014, 2015 IBM Corp. |
icraggs | 0:ae83cacd60d2 | 3 | * |
icraggs | 0:ae83cacd60d2 | 4 | * All rights reserved. This program and the accompanying materials |
icraggs | 0:ae83cacd60d2 | 5 | * are made available under the terms of the Eclipse Public License v1.0 |
icraggs | 0:ae83cacd60d2 | 6 | * and Eclipse Distribution License v1.0 which accompany this distribution. |
icraggs | 0:ae83cacd60d2 | 7 | * |
icraggs | 0:ae83cacd60d2 | 8 | * The Eclipse Public License is available at |
icraggs | 0:ae83cacd60d2 | 9 | * http://www.eclipse.org/legal/epl-v10.html |
icraggs | 0:ae83cacd60d2 | 10 | * and the Eclipse Distribution License is available at |
icraggs | 0:ae83cacd60d2 | 11 | * http://www.eclipse.org/org/documents/edl-v10.php. |
icraggs | 0:ae83cacd60d2 | 12 | * |
icraggs | 0:ae83cacd60d2 | 13 | * Contributors: |
icraggs | 0:ae83cacd60d2 | 14 | * Ian Craggs - initial API and implementation and/or initial documentation |
icraggs | 0:ae83cacd60d2 | 15 | *******************************************************************************/ |
icraggs | 0:ae83cacd60d2 | 16 | |
icraggs | 0:ae83cacd60d2 | 17 | #if !defined(MQTTSNCLIENT_H) |
icraggs | 0:ae83cacd60d2 | 18 | #define MQTTSNCLIENT_H |
icraggs | 0:ae83cacd60d2 | 19 | |
icraggs | 0:ae83cacd60d2 | 20 | #include "FP.h" |
icraggs | 0:ae83cacd60d2 | 21 | #include "MQTTSNPacket.h" |
icraggs | 0:ae83cacd60d2 | 22 | #include "MQTTLogging.h" |
icraggs | 0:ae83cacd60d2 | 23 | |
icraggs | 0:ae83cacd60d2 | 24 | // Data limits |
icraggs | 0:ae83cacd60d2 | 25 | #if !defined(MAX_REGISTRATIONS) |
icraggs | 0:ae83cacd60d2 | 26 | #define MAX_REGISTRATIONS 5 |
icraggs | 0:ae83cacd60d2 | 27 | #endif |
icraggs | 0:ae83cacd60d2 | 28 | #if !defined(MAX_REGISTRATION_TOPIC_NAME_LENGTH) |
icraggs | 0:ae83cacd60d2 | 29 | #define MAX_REGISTRATION_TOPIC_NAME_LENGTH 20 |
icraggs | 0:ae83cacd60d2 | 30 | #endif |
icraggs | 0:ae83cacd60d2 | 31 | #if !defined(MAX_INCOMING_QOS2_MESSAGES) |
icraggs | 0:ae83cacd60d2 | 32 | #define MAX_INCOMING_QOS2_MESSAGES 10 |
icraggs | 0:ae83cacd60d2 | 33 | #endif |
icraggs | 0:ae83cacd60d2 | 34 | |
icraggs | 0:ae83cacd60d2 | 35 | #if !defined(MQTTSNCLIENT_QOS1) |
icraggs | 0:ae83cacd60d2 | 36 | #define MQTTSNCLIENT_QOS1 1 |
icraggs | 0:ae83cacd60d2 | 37 | #endif |
icraggs | 0:ae83cacd60d2 | 38 | #if !defined(MQTTSNCLIENT_QOS2) |
m_ahsan | 3:d85693ff9ec8 | 39 | #define MQTTSNCLIENT_QOS2 1 |
icraggs | 0:ae83cacd60d2 | 40 | #endif |
icraggs | 0:ae83cacd60d2 | 41 | |
icraggs | 0:ae83cacd60d2 | 42 | namespace MQTTSN |
icraggs | 0:ae83cacd60d2 | 43 | { |
icraggs | 0:ae83cacd60d2 | 44 | |
icraggs | 0:ae83cacd60d2 | 45 | |
icraggs | 0:ae83cacd60d2 | 46 | enum QoS { QOS0, QOS1, QOS2 }; |
icraggs | 0:ae83cacd60d2 | 47 | |
icraggs | 0:ae83cacd60d2 | 48 | // all failure return codes must be negative |
icraggs | 0:ae83cacd60d2 | 49 | enum returnCode { MAX_SUBSCRIPTIONS_EXCEEDED = -3, BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; |
icraggs | 0:ae83cacd60d2 | 50 | |
icraggs | 0:ae83cacd60d2 | 51 | |
icraggs | 0:ae83cacd60d2 | 52 | struct Message |
icraggs | 0:ae83cacd60d2 | 53 | { |
icraggs | 0:ae83cacd60d2 | 54 | enum QoS qos; |
icraggs | 0:ae83cacd60d2 | 55 | bool retained; |
icraggs | 0:ae83cacd60d2 | 56 | bool dup; |
icraggs | 0:ae83cacd60d2 | 57 | unsigned short id; |
icraggs | 0:ae83cacd60d2 | 58 | void *payload; |
icraggs | 0:ae83cacd60d2 | 59 | size_t payloadlen; |
icraggs | 0:ae83cacd60d2 | 60 | }; |
icraggs | 0:ae83cacd60d2 | 61 | |
icraggs | 0:ae83cacd60d2 | 62 | |
icraggs | 0:ae83cacd60d2 | 63 | struct MessageData |
icraggs | 0:ae83cacd60d2 | 64 | { |
icraggs | 0:ae83cacd60d2 | 65 | MessageData(MQTTSN_topicid &aTopic, struct Message &aMessage) : message(aMessage), topic(aTopic) |
icraggs | 0:ae83cacd60d2 | 66 | { } |
icraggs | 0:ae83cacd60d2 | 67 | |
icraggs | 0:ae83cacd60d2 | 68 | struct Message &message; |
icraggs | 0:ae83cacd60d2 | 69 | MQTTSN_topicid &topic; |
icraggs | 0:ae83cacd60d2 | 70 | }; |
icraggs | 0:ae83cacd60d2 | 71 | |
icraggs | 0:ae83cacd60d2 | 72 | |
icraggs | 0:ae83cacd60d2 | 73 | class PacketId // hands out packet ids, cycling through 1..MAX_PACKET_ID |
icraggs | 0:ae83cacd60d2 | 74 | { |
icraggs | 0:ae83cacd60d2 | 75 | public: |
icraggs | 0:ae83cacd60d2 | 76 | PacketId() |
icraggs | 0:ae83cacd60d2 | 77 | { |
icraggs | 0:ae83cacd60d2 | 78 | next = 0; // no id issued yet; the first getNext() returns 1 |
icraggs | 0:ae83cacd60d2 | 79 | } |
icraggs | 0:ae83cacd60d2 | 80 | |
icraggs | 0:ae83cacd60d2 | 81 | int getNext() // returns the next id, wrapping from MAX_PACKET_ID back to 1 (0 is never returned) |
icraggs | 0:ae83cacd60d2 | 82 | { |
icraggs | 0:ae83cacd60d2 | 83 | return next = (next == MAX_PACKET_ID) ? 1 : next + 1; // next + 1 rather than ++next: only one write to next per expression |
icraggs | 0:ae83cacd60d2 | 84 | } |
icraggs | 0:ae83cacd60d2 | 85 | |
icraggs | 0:ae83cacd60d2 | 86 | private: |
icraggs | 0:ae83cacd60d2 | 87 | static const int MAX_PACKET_ID = 65535; |
icraggs | 0:ae83cacd60d2 | 88 | int next; |
icraggs | 0:ae83cacd60d2 | 89 | }; |
icraggs | 0:ae83cacd60d2 | 90 | |
icraggs | 0:ae83cacd60d2 | 91 | |
icraggs | 0:ae83cacd60d2 | 92 | /** |
icraggs | 0:ae83cacd60d2 | 93 | * @class MQTTSNClient |
icraggs | 0:ae83cacd60d2 | 94 | * @brief blocking, non-threaded MQTTSN client API |
icraggs | 0:ae83cacd60d2 | 95 | * |
icraggs | 0:ae83cacd60d2 | 96 | * This version of the API blocks on all method calls, until they are complete. This means that only one |
icraggs | 0:ae83cacd60d2 | 97 | * MQTT request can be in process at any one time. |
icraggs | 0:ae83cacd60d2 | 98 | * @param Network a network class which supports send, receive |
icraggs | 0:ae83cacd60d2 | 99 | * @param Timer a timer class with the methods: |
icraggs | 0:ae83cacd60d2 | 100 | */ |
m_ahsan | 3:d85693ff9ec8 | 101 | template<class Network, class Timer, int MAX_PACKET_SIZE = 1024, int MAX_MESSAGE_HANDLERS = 5> |
icraggs | 0:ae83cacd60d2 | 102 | class Client |
icraggs | 0:ae83cacd60d2 | 103 | { |
icraggs | 0:ae83cacd60d2 | 104 | |
icraggs | 0:ae83cacd60d2 | 105 | public: |
icraggs | 0:ae83cacd60d2 | 106 | |
icraggs | 0:ae83cacd60d2 | 107 | typedef void (*messageHandler)(MessageData&); |
icraggs | 0:ae83cacd60d2 | 108 | |
icraggs | 0:ae83cacd60d2 | 109 | /** Construct the client |
icraggs | 0:ae83cacd60d2 | 110 | * @param network - pointer to an instance of the Network class - must be connected to the endpoint |
icraggs | 0:ae83cacd60d2 | 111 | * before calling MQTT connect |
icraggs | 0:ae83cacd60d2 | 112 | * @param limits an instance of the Limit class - to alter limits as required |
icraggs | 0:ae83cacd60d2 | 113 | */ |
icraggs | 0:ae83cacd60d2 | 114 | Client(Network& network, unsigned int command_timeout_ms = 30000); |
icraggs | 0:ae83cacd60d2 | 115 | |
icraggs | 0:ae83cacd60d2 | 116 | /** Set the default message handling callback - used for any message which does not match a subscription message handler |
icraggs | 0:ae83cacd60d2 | 117 | * @param mh - pointer to the callback function |
icraggs | 0:ae83cacd60d2 | 118 | */ |
icraggs | 0:ae83cacd60d2 | 119 | void setDefaultMessageHandler(messageHandler mh) |
icraggs | 0:ae83cacd60d2 | 120 | { |
icraggs | 0:ae83cacd60d2 | 121 | defaultMessageHandler.attach(mh); |
icraggs | 0:ae83cacd60d2 | 122 | } |
icraggs | 0:ae83cacd60d2 | 123 | |
icraggs | 0:ae83cacd60d2 | 124 | /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack |
icraggs | 0:ae83cacd60d2 | 125 | * The nework object must be connected to the network endpoint before calling this |
icraggs | 0:ae83cacd60d2 | 126 | * Default connect options are used |
icraggs | 0:ae83cacd60d2 | 127 | * @return success code - |
icraggs | 0:ae83cacd60d2 | 128 | */ |
icraggs | 0:ae83cacd60d2 | 129 | int connect(); |
icraggs | 0:ae83cacd60d2 | 130 | |
icraggs | 0:ae83cacd60d2 | 131 | /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack |
icraggs | 0:ae83cacd60d2 | 132 | * The nework object must be connected to the network endpoint before calling this |
icraggs | 0:ae83cacd60d2 | 133 | * @param options - connect options |
icraggs | 0:ae83cacd60d2 | 134 | * @return success code - |
icraggs | 0:ae83cacd60d2 | 135 | */ |
icraggs | 0:ae83cacd60d2 | 136 | int connect(MQTTSNPacket_connectData& options); |
icraggs | 0:ae83cacd60d2 | 137 | |
icraggs | 0:ae83cacd60d2 | 138 | /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs |
icraggs | 0:ae83cacd60d2 | 139 | * @param topic - the topic to publish to |
icraggs | 0:ae83cacd60d2 | 140 | * @param message - the message to send |
icraggs | 0:ae83cacd60d2 | 141 | * @return success code - |
icraggs | 0:ae83cacd60d2 | 142 | */ |
icraggs | 0:ae83cacd60d2 | 143 | int publish(MQTTSN_topicid& topic, Message& message); |
icraggs | 0:ae83cacd60d2 | 144 | |
icraggs | 0:ae83cacd60d2 | 145 | /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs |
icraggs | 0:ae83cacd60d2 | 146 | * @param topic - the topic to publish to |
icraggs | 0:ae83cacd60d2 | 147 | * @param payload - the data to send |
icraggs | 0:ae83cacd60d2 | 148 | * @param payloadlen - the length of the data |
icraggs | 0:ae83cacd60d2 | 149 | * @param qos - the QoS to send the publish at |
icraggs | 0:ae83cacd60d2 | 150 | * @param retained - whether the message should be retained |
icraggs | 0:ae83cacd60d2 | 151 | * @return success code - |
icraggs | 0:ae83cacd60d2 | 152 | */ |
icraggs | 0:ae83cacd60d2 | 153 | int publish(MQTTSN_topicid &topic, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false); |
icraggs | 0:ae83cacd60d2 | 154 | |
icraggs | 0:ae83cacd60d2 | 155 | /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs |
icraggs | 0:ae83cacd60d2 | 156 | * @param topic - the topic to publish to |
icraggs | 0:ae83cacd60d2 | 157 | * @param payload - the data to send |
icraggs | 0:ae83cacd60d2 | 158 | * @param payloadlen - the length of the data |
icraggs | 0:ae83cacd60d2 | 159 | * @param id - the packet id used - returned |
icraggs | 0:ae83cacd60d2 | 160 | * @param qos - the QoS to send the publish at |
icraggs | 0:ae83cacd60d2 | 161 | * @param retained - whether the message should be retained |
icraggs | 0:ae83cacd60d2 | 162 | * @return success code - |
icraggs | 0:ae83cacd60d2 | 163 | */ |
icraggs | 0:ae83cacd60d2 | 164 | int publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false); |
icraggs | 0:ae83cacd60d2 | 165 | |
icraggs | 0:ae83cacd60d2 | 166 | /** MQTT Subscribe - send an MQTT subscribe packet and wait for the suback |
icraggs | 0:ae83cacd60d2 | 167 | * @param topicFilter - a topic pattern which can include wildcards |
icraggs | 0:ae83cacd60d2 | 168 | * @param qos - the MQTT QoS to subscribe at |
icraggs | 0:ae83cacd60d2 | 169 | * @param mh - the callback function to be invoked when a message is received for this subscription |
icraggs | 0:ae83cacd60d2 | 170 | * @return success code - |
icraggs | 0:ae83cacd60d2 | 171 | */ |
icraggs | 0:ae83cacd60d2 | 172 | int subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, enum QoS &grantedQoS, messageHandler mh); |
icraggs | 0:ae83cacd60d2 | 173 | |
icraggs | 0:ae83cacd60d2 | 174 | /** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for the unsuback |
icraggs | 0:ae83cacd60d2 | 175 | * @param topicFilter - a topic pattern which can include wildcards |
icraggs | 0:ae83cacd60d2 | 176 | * @return success code - |
icraggs | 0:ae83cacd60d2 | 177 | */ |
icraggs | 0:ae83cacd60d2 | 178 | int unsubscribe(MQTTSN_topicid& topicFilter); |
icraggs | 0:ae83cacd60d2 | 179 | |
icraggs | 0:ae83cacd60d2 | 180 | /** MQTT Disconnect - send an MQTT disconnect packet, and clean up any state |
icraggs | 0:ae83cacd60d2 | 181 | * @param duration - used for sleeping clients, 0 means no duration |
icraggs | 0:ae83cacd60d2 | 182 | * @return success code - |
icraggs | 0:ae83cacd60d2 | 183 | */ |
icraggs | 0:ae83cacd60d2 | 184 | int disconnect(unsigned short duration = 0); |
icraggs | 0:ae83cacd60d2 | 185 | |
icraggs | 0:ae83cacd60d2 | 186 | /** A call to this API must be made within the keepAlive interval to keep the MQTT connection alive |
icraggs | 0:ae83cacd60d2 | 187 | * yield can be called if no other MQTT operation is needed. This will also allow messages to be |
icraggs | 0:ae83cacd60d2 | 188 | * received. |
icraggs | 0:ae83cacd60d2 | 189 | * @param timeout_ms the time to wait, in milliseconds |
icraggs | 0:ae83cacd60d2 | 190 | * @return success code - on failure, this means the client has disconnected |
icraggs | 0:ae83cacd60d2 | 191 | */ |
icraggs | 0:ae83cacd60d2 | 192 | int yield(unsigned long timeout_ms = 1000L); |
icraggs | 0:ae83cacd60d2 | 193 | |
icraggs | 0:ae83cacd60d2 | 194 | /** Is the client connected? |
icraggs | 0:ae83cacd60d2 | 195 | * @return flag - is the client connected or not? |
icraggs | 0:ae83cacd60d2 | 196 | */ |
icraggs | 0:ae83cacd60d2 | 197 | bool isConnected() |
icraggs | 0:ae83cacd60d2 | 198 | { |
icraggs | 0:ae83cacd60d2 | 199 | return isconnected; |
icraggs | 0:ae83cacd60d2 | 200 | } |
icraggs | 0:ae83cacd60d2 | 201 | |
icraggs | 0:ae83cacd60d2 | 202 | protected: |
icraggs | 0:ae83cacd60d2 | 203 | |
icraggs | 0:ae83cacd60d2 | 204 | int cycle(Timer& timer); |
icraggs | 0:ae83cacd60d2 | 205 | int waitfor(int packet_type, Timer& timer); |
icraggs | 0:ae83cacd60d2 | 206 | |
icraggs | 0:ae83cacd60d2 | 207 | private: |
icraggs | 0:ae83cacd60d2 | 208 | |
icraggs | 0:ae83cacd60d2 | 209 | int keepalive(); |
icraggs | 0:ae83cacd60d2 | 210 | int publish(int len, Timer& timer, enum QoS qos); |
icraggs | 0:ae83cacd60d2 | 211 | |
icraggs | 0:ae83cacd60d2 | 212 | int readPacket(Timer& timer); |
icraggs | 0:ae83cacd60d2 | 213 | int sendPacket(int length, Timer& timer); |
icraggs | 0:ae83cacd60d2 | 214 | int deliverMessage(MQTTSN_topicid& topic, Message& message); |
icraggs | 0:ae83cacd60d2 | 215 | bool isTopicMatched(char* topicFilter, MQTTSNString& topicName); |
icraggs | 0:ae83cacd60d2 | 216 | |
icraggs | 0:ae83cacd60d2 | 217 | Network& ipstack; |
icraggs | 0:ae83cacd60d2 | 218 | unsigned long command_timeout_ms; |
icraggs | 0:ae83cacd60d2 | 219 | |
icraggs | 0:ae83cacd60d2 | 220 | unsigned char sendbuf[MAX_PACKET_SIZE]; |
icraggs | 0:ae83cacd60d2 | 221 | unsigned char readbuf[MAX_PACKET_SIZE]; |
icraggs | 0:ae83cacd60d2 | 222 | |
icraggs | 0:ae83cacd60d2 | 223 | Timer last_sent, last_received; |
icraggs | 0:ae83cacd60d2 | 224 | unsigned short duration; |
icraggs | 0:ae83cacd60d2 | 225 | bool ping_outstanding; |
icraggs | 0:ae83cacd60d2 | 226 | bool cleansession; |
icraggs | 0:ae83cacd60d2 | 227 | |
icraggs | 0:ae83cacd60d2 | 228 | PacketId packetid; |
icraggs | 0:ae83cacd60d2 | 229 | |
icraggs | 0:ae83cacd60d2 | 230 | struct MessageHandlers |
icraggs | 0:ae83cacd60d2 | 231 | { |
icraggs | 0:ae83cacd60d2 | 232 | MQTTSN_topicid* topicFilter; |
icraggs | 0:ae83cacd60d2 | 233 | FP<void, MessageData&> fp; |
icraggs | 0:ae83cacd60d2 | 234 | } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are indexed by subscription topic |
icraggs | 0:ae83cacd60d2 | 235 | |
icraggs | 0:ae83cacd60d2 | 236 | FP<void, MessageData&> defaultMessageHandler; |
icraggs | 0:ae83cacd60d2 | 237 | |
icraggs | 0:ae83cacd60d2 | 238 | bool isconnected; |
icraggs | 0:ae83cacd60d2 | 239 | |
icraggs | 0:ae83cacd60d2 | 240 | struct Registrations |
icraggs | 0:ae83cacd60d2 | 241 | { |
icraggs | 0:ae83cacd60d2 | 242 | unsigned short id; |
icraggs | 0:ae83cacd60d2 | 243 | char name[MAX_REGISTRATION_TOPIC_NAME_LENGTH]; |
icraggs | 0:ae83cacd60d2 | 244 | } registrations[MAX_REGISTRATIONS]; |
icraggs | 0:ae83cacd60d2 | 245 | |
icraggs | 0:ae83cacd60d2 | 246 | #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 247 | unsigned char pubbuf[MAX_PACKET_SIZE]; // store the last publish for sending on reconnect |
icraggs | 0:ae83cacd60d2 | 248 | int inflightLen; |
icraggs | 0:ae83cacd60d2 | 249 | unsigned short inflightMsgid; |
icraggs | 0:ae83cacd60d2 | 250 | enum QoS inflightQoS; |
icraggs | 0:ae83cacd60d2 | 251 | #endif |
icraggs | 0:ae83cacd60d2 | 252 | |
icraggs | 0:ae83cacd60d2 | 253 | #if MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 254 | bool pubrel; |
icraggs | 0:ae83cacd60d2 | 255 | unsigned short incomingQoS2messages[MAX_INCOMING_QOS2_MESSAGES]; |
icraggs | 0:ae83cacd60d2 | 256 | bool isQoS2msgidFree(unsigned short id); |
icraggs | 0:ae83cacd60d2 | 257 | bool useQoS2msgid(unsigned short id); |
icraggs | 0:ae83cacd60d2 | 258 | #endif |
icraggs | 0:ae83cacd60d2 | 259 | |
icraggs | 0:ae83cacd60d2 | 260 | }; |
icraggs | 0:ae83cacd60d2 | 261 | |
icraggs | 0:ae83cacd60d2 | 262 | } |
icraggs | 0:ae83cacd60d2 | 263 | |
icraggs | 0:ae83cacd60d2 | 264 | |
icraggs | 0:ae83cacd60d2 | 265 | template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> |
icraggs | 0:ae83cacd60d2 | 266 | MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::Client(Network& network, unsigned int command_timeout_ms) : ipstack(network), packetid() |
icraggs | 0:ae83cacd60d2 | 267 | { |
icraggs | 0:ae83cacd60d2 | 268 | last_sent = Timer(); |
icraggs | 0:ae83cacd60d2 | 269 | last_received = Timer(); |
icraggs | 0:ae83cacd60d2 | 270 | ping_outstanding = false; |
icraggs | 0:ae83cacd60d2 | 271 | for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) |
icraggs | 0:ae83cacd60d2 | 272 | messageHandlers[i].topicFilter = 0; |
icraggs | 0:ae83cacd60d2 | 273 | this->command_timeout_ms = command_timeout_ms; |
icraggs | 0:ae83cacd60d2 | 274 | isconnected = false; |
icraggs | 0:ae83cacd60d2 | 275 | |
icraggs | 0:ae83cacd60d2 | 276 | #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 277 | inflightMsgid = 0; |
icraggs | 0:ae83cacd60d2 | 278 | inflightQoS = QOS0; |
icraggs | 0:ae83cacd60d2 | 279 | #endif |
icraggs | 0:ae83cacd60d2 | 280 | |
icraggs | 0:ae83cacd60d2 | 281 | |
icraggs | 0:ae83cacd60d2 | 282 | #if MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 283 | pubrel = false; |
icraggs | 0:ae83cacd60d2 | 284 | for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) |
icraggs | 0:ae83cacd60d2 | 285 | incomingQoS2messages[i] = 0; |
icraggs | 0:ae83cacd60d2 | 286 | #endif |
icraggs | 0:ae83cacd60d2 | 287 | } |
icraggs | 0:ae83cacd60d2 | 288 | |
icraggs | 0:ae83cacd60d2 | 289 | #if MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 290 | template<class Network, class Timer, int a, int b> |
icraggs | 0:ae83cacd60d2 | 291 | bool MQTTSN::Client<Network, Timer, a, b>::isQoS2msgidFree(unsigned short id) |
icraggs | 0:ae83cacd60d2 | 292 | { |
icraggs | 0:ae83cacd60d2 | 293 | for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) |
icraggs | 0:ae83cacd60d2 | 294 | { |
icraggs | 0:ae83cacd60d2 | 295 | if (incomingQoS2messages[i] == id) |
icraggs | 0:ae83cacd60d2 | 296 | return false; |
icraggs | 0:ae83cacd60d2 | 297 | } |
icraggs | 0:ae83cacd60d2 | 298 | return true; |
icraggs | 0:ae83cacd60d2 | 299 | } |
icraggs | 0:ae83cacd60d2 | 300 | |
icraggs | 0:ae83cacd60d2 | 301 | |
icraggs | 0:ae83cacd60d2 | 302 | template<class Network, class Timer, int a, int b> |
icraggs | 0:ae83cacd60d2 | 303 | bool MQTTSN::Client<Network, Timer, a, b>::useQoS2msgid(unsigned short id) |
icraggs | 0:ae83cacd60d2 | 304 | { |
icraggs | 0:ae83cacd60d2 | 305 | for (int i = 0; i < MAX_INCOMING_QOS2_MESSAGES; ++i) |
icraggs | 0:ae83cacd60d2 | 306 | { |
icraggs | 0:ae83cacd60d2 | 307 | if (incomingQoS2messages[i] == 0) |
icraggs | 0:ae83cacd60d2 | 308 | { |
icraggs | 0:ae83cacd60d2 | 309 | incomingQoS2messages[i] = id; |
icraggs | 0:ae83cacd60d2 | 310 | return true; |
icraggs | 0:ae83cacd60d2 | 311 | } |
icraggs | 0:ae83cacd60d2 | 312 | } |
icraggs | 0:ae83cacd60d2 | 313 | return false; |
icraggs | 0:ae83cacd60d2 | 314 | } |
icraggs | 0:ae83cacd60d2 | 315 | #endif |
icraggs | 0:ae83cacd60d2 | 316 | |
icraggs | 0:ae83cacd60d2 | 317 | |
icraggs | 0:ae83cacd60d2 | 318 | template<class Network, class Timer, int a, int b> |
icraggs | 0:ae83cacd60d2 | 319 | int MQTTSN::Client<Network, Timer, a, b>::sendPacket(int length, Timer& timer) |
icraggs | 0:ae83cacd60d2 | 320 | { |
icraggs | 0:ae83cacd60d2 | 321 | int rc = FAILURE, |
icraggs | 0:ae83cacd60d2 | 322 | sent = 0; |
icraggs | 0:ae83cacd60d2 | 323 | |
icraggs | 0:ae83cacd60d2 | 324 | do |
icraggs | 0:ae83cacd60d2 | 325 | { |
icraggs | 0:ae83cacd60d2 | 326 | sent = ipstack.write(sendbuf, length, timer.left_ms()); |
icraggs | 0:ae83cacd60d2 | 327 | printf("sendPacket, rc %d from write of %d bytes\n", sent, length); |
icraggs | 0:ae83cacd60d2 | 328 | if (sent < 0) // there was an error writing the data |
icraggs | 0:ae83cacd60d2 | 329 | break; |
icraggs | 0:ae83cacd60d2 | 330 | } |
icraggs | 0:ae83cacd60d2 | 331 | while (sent != length && !timer.expired()); |
icraggs | 0:ae83cacd60d2 | 332 | |
icraggs | 0:ae83cacd60d2 | 333 | if (sent == length) |
icraggs | 0:ae83cacd60d2 | 334 | { |
icraggs | 0:ae83cacd60d2 | 335 | if (this->duration > 0) |
icraggs | 0:ae83cacd60d2 | 336 | last_sent.countdown(this->duration); // record the fact that we have successfully sent the packet |
icraggs | 0:ae83cacd60d2 | 337 | rc = SUCCESS; |
icraggs | 0:ae83cacd60d2 | 338 | } |
icraggs | 0:ae83cacd60d2 | 339 | else |
icraggs | 0:ae83cacd60d2 | 340 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 341 | |
icraggs | 0:ae83cacd60d2 | 342 | #if defined(MQTT_DEBUG) |
icraggs | 0:ae83cacd60d2 | 343 | char printbuf[50]; |
icraggs | 0:ae83cacd60d2 | 344 | DEBUG("Rc %d from sending packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), sendbuf, length)); |
icraggs | 0:ae83cacd60d2 | 345 | #endif |
icraggs | 0:ae83cacd60d2 | 346 | return rc; |
icraggs | 0:ae83cacd60d2 | 347 | } |
icraggs | 0:ae83cacd60d2 | 348 | |
icraggs | 0:ae83cacd60d2 | 349 | |
icraggs | 0:ae83cacd60d2 | 350 | /** |
icraggs | 0:ae83cacd60d2 | 351 | * If any read fails in this method, then we should disconnect from the network, as on reconnect |
icraggs | 0:ae83cacd60d2 | 352 | * the packets can be retried. |
icraggs | 0:ae83cacd60d2 | 353 | * @param timeout the max time to wait for the packet read to complete, in milliseconds |
icraggs | 0:ae83cacd60d2 | 354 | * @return the MQTT packet type, or -1 if none |
icraggs | 0:ae83cacd60d2 | 355 | */ |
icraggs | 0:ae83cacd60d2 | 356 | template<class Network, class Timer, int MAX_PACKET_SIZE, int b> |
icraggs | 0:ae83cacd60d2 | 357 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::readPacket(Timer& timer) |
icraggs | 0:ae83cacd60d2 | 358 | { |
icraggs | 0:ae83cacd60d2 | 359 | int rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 360 | int len = 0; // the length of the whole packet including length field |
icraggs | 0:ae83cacd60d2 | 361 | int lenlen = 0; |
icraggs | 0:ae83cacd60d2 | 362 | int datalen = 0; |
icraggs | 0:ae83cacd60d2 | 363 | |
icraggs | 0:ae83cacd60d2 | 364 | #define MQTTSN_MIN_PACKET_LENGTH 3 |
icraggs | 0:ae83cacd60d2 | 365 | // 1. read the packet, datagram style |
icraggs | 0:ae83cacd60d2 | 366 | if ((len = ipstack.read(readbuf, MAX_PACKET_SIZE, timer.left_ms())) < MQTTSN_MIN_PACKET_LENGTH) |
icraggs | 0:ae83cacd60d2 | 367 | goto exit; |
icraggs | 0:ae83cacd60d2 | 368 | |
icraggs | 0:ae83cacd60d2 | 369 | // 2. read the length. This is variable in itself |
icraggs | 0:ae83cacd60d2 | 370 | lenlen = MQTTSNPacket_decode(readbuf, len, &datalen); |
icraggs | 0:ae83cacd60d2 | 371 | if (datalen != len) |
icraggs | 0:ae83cacd60d2 | 372 | goto exit; // there was an error |
icraggs | 0:ae83cacd60d2 | 373 | |
icraggs | 0:ae83cacd60d2 | 374 | rc = readbuf[lenlen]; |
icraggs | 0:ae83cacd60d2 | 375 | if (this->duration > 0) |
icraggs | 0:ae83cacd60d2 | 376 | last_received.countdown(this->duration); // record the fact that we have successfully received a packet |
icraggs | 0:ae83cacd60d2 | 377 | exit: |
icraggs | 0:ae83cacd60d2 | 378 | |
icraggs | 0:ae83cacd60d2 | 379 | #if defined(MQTT_DEBUG) |
icraggs | 0:ae83cacd60d2 | 380 | char printbuf[50]; |
icraggs | 0:ae83cacd60d2 | 381 | DEBUG("Rc %d from receiving packet %s\n", rc, MQTTPacket_toString(printbuf, sizeof(printbuf), readbuf, len)); |
icraggs | 0:ae83cacd60d2 | 382 | #endif |
icraggs | 0:ae83cacd60d2 | 383 | return rc; |
icraggs | 0:ae83cacd60d2 | 384 | } |
icraggs | 0:ae83cacd60d2 | 385 | |
icraggs | 0:ae83cacd60d2 | 386 | |
icraggs | 0:ae83cacd60d2 | 387 | // assume topic filter and name is in correct format |
icraggs | 0:ae83cacd60d2 | 388 | // # can only be at end |
icraggs | 0:ae83cacd60d2 | 389 | // + and # can only be next to separator |
icraggs | 0:ae83cacd60d2 | 390 | template<class Network, class Timer, int a, int b> |
icraggs | 0:ae83cacd60d2 | 391 | bool MQTTSN::Client<Network, Timer, a, b>::isTopicMatched(char* topicFilter, MQTTSNString& topicName) |
icraggs | 0:ae83cacd60d2 | 392 | { |
icraggs | 0:ae83cacd60d2 | 393 | char* curf = topicFilter; |
icraggs | 0:ae83cacd60d2 | 394 | char* curn = topicName.lenstring.data; |
icraggs | 0:ae83cacd60d2 | 395 | char* curn_end = curn + topicName.lenstring.len; |
icraggs | 0:ae83cacd60d2 | 396 | |
icraggs | 0:ae83cacd60d2 | 397 | while (*curf && curn < curn_end) |
icraggs | 0:ae83cacd60d2 | 398 | { |
icraggs | 0:ae83cacd60d2 | 399 | if (*curn == '/' && *curf != '/') |
icraggs | 0:ae83cacd60d2 | 400 | break; |
icraggs | 0:ae83cacd60d2 | 401 | if (*curf != '+' && *curf != '#' && *curf != *curn) |
icraggs | 0:ae83cacd60d2 | 402 | break; |
icraggs | 0:ae83cacd60d2 | 403 | if (*curf == '+') |
icraggs | 0:ae83cacd60d2 | 404 | { // skip until we meet the next separator, or end of string |
icraggs | 0:ae83cacd60d2 | 405 | char* nextpos = curn + 1; |
icraggs | 0:ae83cacd60d2 | 406 | while (nextpos < curn_end && *nextpos != '/') |
icraggs | 0:ae83cacd60d2 | 407 | nextpos = ++curn + 1; |
icraggs | 0:ae83cacd60d2 | 408 | } |
icraggs | 0:ae83cacd60d2 | 409 | else if (*curf == '#') |
icraggs | 0:ae83cacd60d2 | 410 | curn = curn_end - 1; // skip until end of string |
icraggs | 0:ae83cacd60d2 | 411 | curf++; |
icraggs | 0:ae83cacd60d2 | 412 | curn++; |
icraggs | 0:ae83cacd60d2 | 413 | }; |
icraggs | 0:ae83cacd60d2 | 414 | |
icraggs | 0:ae83cacd60d2 | 415 | return (curn == curn_end) && (*curf == '\0'); |
icraggs | 0:ae83cacd60d2 | 416 | } |
icraggs | 0:ae83cacd60d2 | 417 | |
icraggs | 0:ae83cacd60d2 | 418 | |
icraggs | 0:ae83cacd60d2 | 419 | |
icraggs | 0:ae83cacd60d2 | 420 | template<class Network, class Timer, int a, int MAX_MESSAGE_HANDLERS> |
icraggs | 0:ae83cacd60d2 | 421 | int MQTTSN::Client<Network, Timer, a, MAX_MESSAGE_HANDLERS>::deliverMessage(MQTTSN_topicid& topic, Message& message) |
icraggs | 0:ae83cacd60d2 | 422 | { |
icraggs | 0:ae83cacd60d2 | 423 | int rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 424 | printf("deliverMessage topic id is %d\n", topic.data.id); |
icraggs | 0:ae83cacd60d2 | 425 | |
icraggs | 0:ae83cacd60d2 | 426 | // we have to find the right message handler - indexed by topic |
icraggs | 0:ae83cacd60d2 | 427 | for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) |
icraggs | 0:ae83cacd60d2 | 428 | { |
icraggs | 0:ae83cacd60d2 | 429 | if (messageHandlers[i].topicFilter != 0) |
icraggs | 0:ae83cacd60d2 | 430 | printf("messageHandler %d topic id is %d\n", i, messageHandlers[i].topicFilter->data.id); |
icraggs | 0:ae83cacd60d2 | 431 | if (messageHandlers[i].topicFilter != 0 && (topic.data.id == messageHandlers[i].topicFilter->data.id)) |
icraggs | 0:ae83cacd60d2 | 432 | //MQTTSNtopic_equals(&topic, messageHandlers[i].topicFilter) || |
icraggs | 0:ae83cacd60d2 | 433 | // isTopicMatched(messageHandlers[i].topicFilter, topic))) |
icraggs | 0:ae83cacd60d2 | 434 | { |
icraggs | 0:ae83cacd60d2 | 435 | if (messageHandlers[i].fp.attached()) |
icraggs | 0:ae83cacd60d2 | 436 | { |
icraggs | 0:ae83cacd60d2 | 437 | MessageData md(topic, message); |
icraggs | 0:ae83cacd60d2 | 438 | messageHandlers[i].fp(md); |
icraggs | 0:ae83cacd60d2 | 439 | rc = SUCCESS; |
icraggs | 0:ae83cacd60d2 | 440 | } |
icraggs | 0:ae83cacd60d2 | 441 | } |
icraggs | 0:ae83cacd60d2 | 442 | } |
icraggs | 0:ae83cacd60d2 | 443 | |
icraggs | 0:ae83cacd60d2 | 444 | if (rc == FAILURE && defaultMessageHandler.attached()) |
icraggs | 0:ae83cacd60d2 | 445 | { |
icraggs | 0:ae83cacd60d2 | 446 | MessageData md(topic, message); |
icraggs | 0:ae83cacd60d2 | 447 | defaultMessageHandler(md); |
icraggs | 0:ae83cacd60d2 | 448 | rc = SUCCESS; |
icraggs | 0:ae83cacd60d2 | 449 | } |
icraggs | 0:ae83cacd60d2 | 450 | |
icraggs | 0:ae83cacd60d2 | 451 | return rc; |
icraggs | 0:ae83cacd60d2 | 452 | } |
icraggs | 0:ae83cacd60d2 | 453 | |
icraggs | 0:ae83cacd60d2 | 454 | |
icraggs | 0:ae83cacd60d2 | 455 | |
icraggs | 0:ae83cacd60d2 | 456 | template<class Network, class Timer, int a, int b> |
icraggs | 0:ae83cacd60d2 | 457 | int MQTTSN::Client<Network, Timer, a, b>::yield(unsigned long timeout_ms) |
icraggs | 0:ae83cacd60d2 | 458 | { |
icraggs | 0:ae83cacd60d2 | 459 | int rc = SUCCESS; |
icraggs | 0:ae83cacd60d2 | 460 | Timer timer = Timer(); |
icraggs | 0:ae83cacd60d2 | 461 | |
icraggs | 0:ae83cacd60d2 | 462 | timer.countdown_ms(timeout_ms); |
icraggs | 0:ae83cacd60d2 | 463 | while (!timer.expired()) |
icraggs | 0:ae83cacd60d2 | 464 | { |
icraggs | 0:ae83cacd60d2 | 465 | if (cycle(timer) == FAILURE) |
icraggs | 0:ae83cacd60d2 | 466 | { |
icraggs | 0:ae83cacd60d2 | 467 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 468 | break; |
icraggs | 0:ae83cacd60d2 | 469 | } |
icraggs | 0:ae83cacd60d2 | 470 | } |
icraggs | 0:ae83cacd60d2 | 471 | |
icraggs | 0:ae83cacd60d2 | 472 | return rc; |
icraggs | 0:ae83cacd60d2 | 473 | } |
icraggs | 0:ae83cacd60d2 | 474 | |
icraggs | 0:ae83cacd60d2 | 475 | |
icraggs | 0:ae83cacd60d2 | 476 | template<class Network, class Timer, int MAX_PACKET_SIZE, int b> |
icraggs | 0:ae83cacd60d2 | 477 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::cycle(Timer& timer) |
icraggs | 0:ae83cacd60d2 | 478 | { |
icraggs | 0:ae83cacd60d2 | 479 | /* get one piece of work off the wire and one pass through */ |
icraggs | 0:ae83cacd60d2 | 480 | |
icraggs | 0:ae83cacd60d2 | 481 | // read the socket, see what work is due |
icraggs | 0:ae83cacd60d2 | 482 | unsigned short packet_type = readPacket(timer); |
icraggs | 0:ae83cacd60d2 | 483 | |
icraggs | 0:ae83cacd60d2 | 484 | int len = 0; |
icraggs | 0:ae83cacd60d2 | 485 | int rc = SUCCESS; |
icraggs | 0:ae83cacd60d2 | 486 | |
icraggs | 0:ae83cacd60d2 | 487 | switch (packet_type) |
icraggs | 0:ae83cacd60d2 | 488 | { |
icraggs | 0:ae83cacd60d2 | 489 | case MQTTSN_CONNACK: |
icraggs | 0:ae83cacd60d2 | 490 | case MQTTSN_PUBACK: |
icraggs | 0:ae83cacd60d2 | 491 | case MQTTSN_SUBACK: |
icraggs | 0:ae83cacd60d2 | 492 | case MQTTSN_REGACK: |
icraggs | 0:ae83cacd60d2 | 493 | break; |
icraggs | 0:ae83cacd60d2 | 494 | case MQTTSN_REGISTER: |
icraggs | 0:ae83cacd60d2 | 495 | { |
icraggs | 0:ae83cacd60d2 | 496 | unsigned short topicid, packetid; |
icraggs | 0:ae83cacd60d2 | 497 | MQTTSNString topicName; |
icraggs | 0:ae83cacd60d2 | 498 | unsigned char reg_rc = MQTTSN_RC_ACCEPTED; |
icraggs | 0:ae83cacd60d2 | 499 | if (MQTTSNDeserialize_register(&topicid, &packetid, &topicName, readbuf, MAX_PACKET_SIZE) != 1) |
icraggs | 0:ae83cacd60d2 | 500 | goto exit; |
icraggs | 0:ae83cacd60d2 | 501 | len = MQTTSNSerialize_regack(sendbuf, MAX_PACKET_SIZE, topicid, packetid, reg_rc); |
icraggs | 0:ae83cacd60d2 | 502 | if (len <= 0) |
icraggs | 0:ae83cacd60d2 | 503 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 504 | else |
icraggs | 0:ae83cacd60d2 | 505 | rc = sendPacket(len, timer); |
icraggs | 0:ae83cacd60d2 | 506 | break; |
icraggs | 0:ae83cacd60d2 | 507 | } |
icraggs | 0:ae83cacd60d2 | 508 | case MQTTSN_PUBLISH: |
icraggs | 0:ae83cacd60d2 | 509 | MQTTSN_topicid topicid; |
icraggs | 0:ae83cacd60d2 | 510 | Message msg; |
icraggs | 0:ae83cacd60d2 | 511 | if (MQTTSNDeserialize_publish((unsigned char*)&msg.dup, (int*)&msg.qos, (unsigned char*)&msg.retained, &msg.id, &topicid, |
icraggs | 0:ae83cacd60d2 | 512 | (unsigned char**)&msg.payload, (int*)&msg.payloadlen, readbuf, MAX_PACKET_SIZE) != 1) |
icraggs | 0:ae83cacd60d2 | 513 | goto exit; |
icraggs | 0:ae83cacd60d2 | 514 | #if MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 515 | if (msg.qos != QOS2) |
icraggs | 0:ae83cacd60d2 | 516 | #endif |
icraggs | 0:ae83cacd60d2 | 517 | deliverMessage(topicid, msg); |
icraggs | 0:ae83cacd60d2 | 518 | #if MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 519 | else if (isQoS2msgidFree(msg.id)) |
icraggs | 0:ae83cacd60d2 | 520 | { |
icraggs | 0:ae83cacd60d2 | 521 | if (useQoS2msgid(msg.id)) |
icraggs | 0:ae83cacd60d2 | 522 | deliverMessage(topicid, msg); |
icraggs | 0:ae83cacd60d2 | 523 | else |
icraggs | 0:ae83cacd60d2 | 524 | WARN("Maximum number of incoming QoS2 messages exceeded"); |
icraggs | 0:ae83cacd60d2 | 525 | } |
icraggs | 0:ae83cacd60d2 | 526 | #endif |
icraggs | 0:ae83cacd60d2 | 527 | #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 528 | if (msg.qos != QOS0) |
icraggs | 0:ae83cacd60d2 | 529 | { |
icraggs | 0:ae83cacd60d2 | 530 | if (msg.qos == QOS1) |
icraggs | 0:ae83cacd60d2 | 531 | len = MQTTSNSerialize_puback(sendbuf, MAX_PACKET_SIZE, topicid.data.id, msg.id, 0); |
icraggs | 0:ae83cacd60d2 | 532 | else if (msg.qos == QOS2) |
icraggs | 0:ae83cacd60d2 | 533 | len = MQTTSNSerialize_pubrec(sendbuf, MAX_PACKET_SIZE, msg.id); |
icraggs | 0:ae83cacd60d2 | 534 | if (len <= 0) |
icraggs | 0:ae83cacd60d2 | 535 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 536 | else |
icraggs | 0:ae83cacd60d2 | 537 | { |
icraggs | 0:ae83cacd60d2 | 538 | printf("sending puback len %d\n", len); |
icraggs | 0:ae83cacd60d2 | 539 | rc = sendPacket(len, timer); |
icraggs | 0:ae83cacd60d2 | 540 | printf("rc %d from sending puback\n", rc); |
icraggs | 0:ae83cacd60d2 | 541 | } |
icraggs | 0:ae83cacd60d2 | 542 | if (rc == FAILURE) |
icraggs | 0:ae83cacd60d2 | 543 | goto exit; // there was a problem |
icraggs | 0:ae83cacd60d2 | 544 | } |
icraggs | 0:ae83cacd60d2 | 545 | break; |
icraggs | 0:ae83cacd60d2 | 546 | #endif |
icraggs | 0:ae83cacd60d2 | 547 | #if MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 548 | case MQTTSN_PUBREC: |
icraggs | 0:ae83cacd60d2 | 549 | unsigned short mypacketid; |
icraggs | 0:ae83cacd60d2 | 550 | unsigned char type; |
icraggs | 0:ae83cacd60d2 | 551 | if (MQTTSNDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1) |
icraggs | 0:ae83cacd60d2 | 552 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 553 | else if ((len = MQTTSNSerialize_pubrel(sendbuf, MAX_PACKET_SIZE, mypacketid)) <= 0) |
icraggs | 0:ae83cacd60d2 | 554 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 555 | else if ((rc = sendPacket(len, timer)) != SUCCESS) // send the PUBREL packet |
icraggs | 0:ae83cacd60d2 | 556 | rc = FAILURE; // there was a problem |
icraggs | 0:ae83cacd60d2 | 557 | if (rc == FAILURE) |
icraggs | 0:ae83cacd60d2 | 558 | goto exit; // there was a problem |
icraggs | 0:ae83cacd60d2 | 559 | break; |
icraggs | 0:ae83cacd60d2 | 560 | case MQTTSN_PUBCOMP: |
icraggs | 0:ae83cacd60d2 | 561 | break; |
icraggs | 0:ae83cacd60d2 | 562 | #endif |
icraggs | 0:ae83cacd60d2 | 563 | case MQTTSN_PINGRESP: |
icraggs | 0:ae83cacd60d2 | 564 | ping_outstanding = false; |
icraggs | 0:ae83cacd60d2 | 565 | break; |
icraggs | 0:ae83cacd60d2 | 566 | } |
icraggs | 0:ae83cacd60d2 | 567 | keepalive(); |
icraggs | 0:ae83cacd60d2 | 568 | exit: |
icraggs | 0:ae83cacd60d2 | 569 | if (rc == SUCCESS) |
icraggs | 0:ae83cacd60d2 | 570 | rc = packet_type; |
icraggs | 0:ae83cacd60d2 | 571 | return rc; |
icraggs | 0:ae83cacd60d2 | 572 | } |
icraggs | 0:ae83cacd60d2 | 573 | |
icraggs | 0:ae83cacd60d2 | 574 | |
icraggs | 0:ae83cacd60d2 | 575 | template<class Network, class Timer, int MAX_PACKET_SIZE, int b> |
icraggs | 0:ae83cacd60d2 | 576 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::keepalive() |
icraggs | 0:ae83cacd60d2 | 577 | { |
icraggs | 0:ae83cacd60d2 | 578 | int rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 579 | |
icraggs | 0:ae83cacd60d2 | 580 | if (duration == 0) |
icraggs | 0:ae83cacd60d2 | 581 | { |
icraggs | 0:ae83cacd60d2 | 582 | rc = SUCCESS; |
icraggs | 0:ae83cacd60d2 | 583 | goto exit; |
icraggs | 0:ae83cacd60d2 | 584 | } |
icraggs | 0:ae83cacd60d2 | 585 | |
icraggs | 0:ae83cacd60d2 | 586 | if (last_sent.expired() || last_received.expired()) |
icraggs | 0:ae83cacd60d2 | 587 | { |
icraggs | 0:ae83cacd60d2 | 588 | if (!ping_outstanding) |
icraggs | 0:ae83cacd60d2 | 589 | { |
icraggs | 0:ae83cacd60d2 | 590 | MQTTSNString clientid = MQTTSNString_initializer; |
icraggs | 0:ae83cacd60d2 | 591 | Timer timer = Timer(1000); |
icraggs | 0:ae83cacd60d2 | 592 | int len = MQTTSNSerialize_pingreq(sendbuf, MAX_PACKET_SIZE, clientid); |
icraggs | 0:ae83cacd60d2 | 593 | if (len > 0 && (rc = sendPacket(len, timer)) == SUCCESS) // send the ping packet |
icraggs | 0:ae83cacd60d2 | 594 | ping_outstanding = true; |
icraggs | 0:ae83cacd60d2 | 595 | } |
icraggs | 0:ae83cacd60d2 | 596 | } |
icraggs | 0:ae83cacd60d2 | 597 | |
icraggs | 0:ae83cacd60d2 | 598 | exit: |
icraggs | 0:ae83cacd60d2 | 599 | return rc; |
icraggs | 0:ae83cacd60d2 | 600 | } |
icraggs | 0:ae83cacd60d2 | 601 | |
icraggs | 0:ae83cacd60d2 | 602 | |
icraggs | 0:ae83cacd60d2 | 603 | // only used in single-threaded mode where one command at a time is in process |
icraggs | 0:ae83cacd60d2 | 604 | template<class Network, class Timer, int a, int b> |
icraggs | 0:ae83cacd60d2 | 605 | int MQTTSN::Client<Network, Timer, a, b>::waitfor(int packet_type, Timer& timer) |
icraggs | 0:ae83cacd60d2 | 606 | { |
icraggs | 0:ae83cacd60d2 | 607 | int rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 608 | |
icraggs | 0:ae83cacd60d2 | 609 | do |
icraggs | 0:ae83cacd60d2 | 610 | { |
icraggs | 0:ae83cacd60d2 | 611 | if (timer.expired()) |
icraggs | 0:ae83cacd60d2 | 612 | break; // we timed out |
icraggs | 0:ae83cacd60d2 | 613 | } |
icraggs | 0:ae83cacd60d2 | 614 | while ((rc = cycle(timer)) != packet_type); |
icraggs | 0:ae83cacd60d2 | 615 | |
icraggs | 0:ae83cacd60d2 | 616 | return rc; |
icraggs | 0:ae83cacd60d2 | 617 | } |
icraggs | 0:ae83cacd60d2 | 618 | |
icraggs | 0:ae83cacd60d2 | 619 | |
icraggs | 0:ae83cacd60d2 | 620 | template<class Network, class Timer, int MAX_PACKET_SIZE, int b> |
icraggs | 0:ae83cacd60d2 | 621 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect(MQTTSNPacket_connectData& options) |
icraggs | 0:ae83cacd60d2 | 622 | { |
icraggs | 0:ae83cacd60d2 | 623 | Timer connect_timer = Timer(command_timeout_ms); |
icraggs | 0:ae83cacd60d2 | 624 | int rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 625 | int len = 0; |
icraggs | 0:ae83cacd60d2 | 626 | |
icraggs | 0:ae83cacd60d2 | 627 | if (isconnected) // don't send connect packet again if we are already connected |
icraggs | 0:ae83cacd60d2 | 628 | goto exit; |
icraggs | 0:ae83cacd60d2 | 629 | |
icraggs | 0:ae83cacd60d2 | 630 | this->duration = options.duration; |
icraggs | 0:ae83cacd60d2 | 631 | this->cleansession = options.cleansession; |
icraggs | 0:ae83cacd60d2 | 632 | if ((len = MQTTSNSerialize_connect(sendbuf, MAX_PACKET_SIZE, &options)) <= 0) |
icraggs | 0:ae83cacd60d2 | 633 | goto exit; |
icraggs | 0:ae83cacd60d2 | 634 | if ((rc = sendPacket(len, connect_timer)) != SUCCESS) // send the connect packet |
icraggs | 0:ae83cacd60d2 | 635 | goto exit; // there was a problem |
icraggs | 0:ae83cacd60d2 | 636 | |
icraggs | 0:ae83cacd60d2 | 637 | if (this->duration > 0) |
icraggs | 0:ae83cacd60d2 | 638 | last_received.countdown(this->duration); |
icraggs | 0:ae83cacd60d2 | 639 | // this will be a blocking call, wait for the connack |
icraggs | 0:ae83cacd60d2 | 640 | if (waitfor(MQTTSN_CONNACK, connect_timer) == MQTTSN_CONNACK) |
icraggs | 0:ae83cacd60d2 | 641 | { |
icraggs | 0:ae83cacd60d2 | 642 | //unsigned char connack_rc = 255; |
icraggs | 0:ae83cacd60d2 | 643 | int connack_rc = 255; |
icraggs | 0:ae83cacd60d2 | 644 | if (MQTTSNDeserialize_connack(&connack_rc, readbuf, MAX_PACKET_SIZE) == 1) |
icraggs | 0:ae83cacd60d2 | 645 | rc = connack_rc; |
icraggs | 0:ae83cacd60d2 | 646 | else |
icraggs | 0:ae83cacd60d2 | 647 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 648 | } |
icraggs | 0:ae83cacd60d2 | 649 | else |
icraggs | 0:ae83cacd60d2 | 650 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 651 | |
icraggs | 0:ae83cacd60d2 | 652 | #if MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 653 | // resend an inflight publish |
icraggs | 0:ae83cacd60d2 | 654 | if (inflightMsgid >0 && inflightQoS == QOS2 && pubrel) |
icraggs | 0:ae83cacd60d2 | 655 | { |
icraggs | 0:ae83cacd60d2 | 656 | if ((len = MQTTSNSerialize_pubrel(sendbuf, MAX_PACKET_SIZE, inflightMsgid)) <= 0) |
icraggs | 0:ae83cacd60d2 | 657 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 658 | else |
icraggs | 0:ae83cacd60d2 | 659 | rc = publish(len, connect_timer, inflightQoS); |
icraggs | 0:ae83cacd60d2 | 660 | } |
icraggs | 0:ae83cacd60d2 | 661 | else |
icraggs | 0:ae83cacd60d2 | 662 | #endif |
icraggs | 0:ae83cacd60d2 | 663 | #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 664 | if (inflightMsgid > 0) |
icraggs | 0:ae83cacd60d2 | 665 | { |
icraggs | 0:ae83cacd60d2 | 666 | memcpy(sendbuf, pubbuf, MAX_PACKET_SIZE); |
icraggs | 0:ae83cacd60d2 | 667 | rc = publish(inflightLen, connect_timer, inflightQoS); |
icraggs | 0:ae83cacd60d2 | 668 | } |
icraggs | 0:ae83cacd60d2 | 669 | #endif |
icraggs | 0:ae83cacd60d2 | 670 | |
icraggs | 0:ae83cacd60d2 | 671 | exit: |
icraggs | 0:ae83cacd60d2 | 672 | if (rc == SUCCESS) |
icraggs | 0:ae83cacd60d2 | 673 | isconnected = true; |
icraggs | 0:ae83cacd60d2 | 674 | return rc; |
icraggs | 0:ae83cacd60d2 | 675 | } |
icraggs | 0:ae83cacd60d2 | 676 | |
icraggs | 0:ae83cacd60d2 | 677 | |
icraggs | 0:ae83cacd60d2 | 678 | template<class Network, class Timer, int MAX_PACKET_SIZE, int b> |
icraggs | 0:ae83cacd60d2 | 679 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::connect() |
icraggs | 0:ae83cacd60d2 | 680 | { |
icraggs | 0:ae83cacd60d2 | 681 | MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; |
icraggs | 0:ae83cacd60d2 | 682 | return connect(default_options); |
icraggs | 0:ae83cacd60d2 | 683 | } |
icraggs | 0:ae83cacd60d2 | 684 | |
icraggs | 0:ae83cacd60d2 | 685 | |
icraggs | 0:ae83cacd60d2 | 686 | template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> |
icraggs | 0:ae83cacd60d2 | 687 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::subscribe(MQTTSN_topicid& topicFilter, enum QoS qos, enum QoS &grantedQoS, messageHandler messageHandler) |
icraggs | 0:ae83cacd60d2 | 688 | { |
icraggs | 0:ae83cacd60d2 | 689 | int rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 690 | Timer timer = Timer(command_timeout_ms); |
icraggs | 0:ae83cacd60d2 | 691 | int len = 0; |
icraggs | 0:ae83cacd60d2 | 692 | |
icraggs | 0:ae83cacd60d2 | 693 | if (!isconnected) |
icraggs | 0:ae83cacd60d2 | 694 | goto exit; |
icraggs | 0:ae83cacd60d2 | 695 | |
icraggs | 0:ae83cacd60d2 | 696 | bool freeHandler = false; |
icraggs | 0:ae83cacd60d2 | 697 | for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) |
icraggs | 0:ae83cacd60d2 | 698 | { |
icraggs | 0:ae83cacd60d2 | 699 | if (messageHandlers[i].topicFilter == 0) |
icraggs | 0:ae83cacd60d2 | 700 | { |
icraggs | 0:ae83cacd60d2 | 701 | freeHandler = true; |
icraggs | 0:ae83cacd60d2 | 702 | break; |
icraggs | 0:ae83cacd60d2 | 703 | } |
icraggs | 0:ae83cacd60d2 | 704 | } |
icraggs | 0:ae83cacd60d2 | 705 | if (!freeHandler) |
icraggs | 0:ae83cacd60d2 | 706 | { // No message handler free |
icraggs | 0:ae83cacd60d2 | 707 | rc = MAX_SUBSCRIPTIONS_EXCEEDED; |
icraggs | 0:ae83cacd60d2 | 708 | goto exit; |
icraggs | 0:ae83cacd60d2 | 709 | } |
icraggs | 0:ae83cacd60d2 | 710 | |
icraggs | 0:ae83cacd60d2 | 711 | len = MQTTSNSerialize_subscribe(sendbuf, MAX_PACKET_SIZE, 0, qos, packetid.getNext(), &topicFilter); |
icraggs | 0:ae83cacd60d2 | 712 | if (len <= 0) |
icraggs | 0:ae83cacd60d2 | 713 | goto exit; |
icraggs | 0:ae83cacd60d2 | 714 | if ((rc = sendPacket(len, timer)) != SUCCESS) // send the subscribe packet |
icraggs | 0:ae83cacd60d2 | 715 | goto exit; // there was a problem |
icraggs | 0:ae83cacd60d2 | 716 | |
icraggs | 0:ae83cacd60d2 | 717 | if (waitfor(MQTTSN_SUBACK, timer) == MQTTSN_SUBACK) // wait for suback |
icraggs | 0:ae83cacd60d2 | 718 | { |
icraggs | 0:ae83cacd60d2 | 719 | unsigned short mypacketid; |
icraggs | 0:ae83cacd60d2 | 720 | unsigned char suback_rc; |
icraggs | 0:ae83cacd60d2 | 721 | if (MQTTSNDeserialize_suback((int*)&grantedQoS, &topicFilter.data.id, &mypacketid, &suback_rc, readbuf, MAX_PACKET_SIZE) != 1) |
icraggs | 0:ae83cacd60d2 | 722 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 723 | else |
icraggs | 0:ae83cacd60d2 | 724 | rc = suback_rc; |
icraggs | 0:ae83cacd60d2 | 725 | if (suback_rc == MQTTSN_RC_ACCEPTED) |
icraggs | 0:ae83cacd60d2 | 726 | { |
icraggs | 0:ae83cacd60d2 | 727 | for (int i = 0; i < MAX_MESSAGE_HANDLERS; ++i) |
icraggs | 0:ae83cacd60d2 | 728 | { |
icraggs | 0:ae83cacd60d2 | 729 | if (messageHandlers[i].topicFilter == 0) |
icraggs | 0:ae83cacd60d2 | 730 | { |
icraggs | 0:ae83cacd60d2 | 731 | messageHandlers[i].topicFilter = &topicFilter; |
icraggs | 0:ae83cacd60d2 | 732 | messageHandlers[i].fp.attach(messageHandler); |
icraggs | 0:ae83cacd60d2 | 733 | rc = 0; |
icraggs | 0:ae83cacd60d2 | 734 | break; |
icraggs | 0:ae83cacd60d2 | 735 | } |
icraggs | 0:ae83cacd60d2 | 736 | } |
icraggs | 0:ae83cacd60d2 | 737 | } |
icraggs | 0:ae83cacd60d2 | 738 | } |
icraggs | 0:ae83cacd60d2 | 739 | else |
icraggs | 0:ae83cacd60d2 | 740 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 741 | |
icraggs | 0:ae83cacd60d2 | 742 | exit: |
icraggs | 0:ae83cacd60d2 | 743 | if (rc != SUCCESS) |
icraggs | 0:ae83cacd60d2 | 744 | isconnected = false; |
icraggs | 0:ae83cacd60d2 | 745 | return rc; |
icraggs | 0:ae83cacd60d2 | 746 | } |
icraggs | 0:ae83cacd60d2 | 747 | |
icraggs | 0:ae83cacd60d2 | 748 | |
icraggs | 0:ae83cacd60d2 | 749 | template<class Network, class Timer, int MAX_PACKET_SIZE, int MAX_MESSAGE_HANDLERS> |
icraggs | 0:ae83cacd60d2 | 750 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, MAX_MESSAGE_HANDLERS>::unsubscribe(MQTTSN_topicid& topicFilter) |
icraggs | 0:ae83cacd60d2 | 751 | { |
icraggs | 0:ae83cacd60d2 | 752 | int rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 753 | Timer timer = Timer(command_timeout_ms); |
icraggs | 0:ae83cacd60d2 | 754 | int len = 0; |
icraggs | 0:ae83cacd60d2 | 755 | |
icraggs | 0:ae83cacd60d2 | 756 | if (!isconnected) |
icraggs | 0:ae83cacd60d2 | 757 | goto exit; |
icraggs | 0:ae83cacd60d2 | 758 | |
icraggs | 0:ae83cacd60d2 | 759 | if ((len = MQTTSNSerialize_unsubscribe(sendbuf, MAX_PACKET_SIZE, packetid.getNext(), &topicFilter)) <= 0) |
icraggs | 0:ae83cacd60d2 | 760 | goto exit; |
icraggs | 0:ae83cacd60d2 | 761 | if ((rc = sendPacket(len, timer)) != SUCCESS) // send the unsubscribe packet |
icraggs | 0:ae83cacd60d2 | 762 | goto exit; // there was a problem |
icraggs | 0:ae83cacd60d2 | 763 | |
icraggs | 0:ae83cacd60d2 | 764 | if (waitfor(MQTTSN_UNSUBACK, timer) == MQTTSN_UNSUBACK) |
icraggs | 0:ae83cacd60d2 | 765 | { |
icraggs | 0:ae83cacd60d2 | 766 | unsigned short mypacketid; // should be the same as the packetid above |
icraggs | 0:ae83cacd60d2 | 767 | if (MQTTSNDeserialize_unsuback(&mypacketid, readbuf, MAX_PACKET_SIZE) == 1) |
icraggs | 0:ae83cacd60d2 | 768 | rc = 0; |
icraggs | 0:ae83cacd60d2 | 769 | } |
icraggs | 0:ae83cacd60d2 | 770 | else |
icraggs | 0:ae83cacd60d2 | 771 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 772 | |
icraggs | 0:ae83cacd60d2 | 773 | exit: |
icraggs | 0:ae83cacd60d2 | 774 | if (rc != SUCCESS) |
icraggs | 0:ae83cacd60d2 | 775 | isconnected = false; |
icraggs | 0:ae83cacd60d2 | 776 | return rc; |
icraggs | 0:ae83cacd60d2 | 777 | } |
icraggs | 0:ae83cacd60d2 | 778 | |
icraggs | 0:ae83cacd60d2 | 779 | |
icraggs | 0:ae83cacd60d2 | 780 | template<class Network, class Timer, int MAX_PACKET_SIZE, int b> |
icraggs | 0:ae83cacd60d2 | 781 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(int len, Timer& timer, enum QoS qos) |
icraggs | 0:ae83cacd60d2 | 782 | { |
icraggs | 0:ae83cacd60d2 | 783 | int rc; |
icraggs | 0:ae83cacd60d2 | 784 | |
icraggs | 0:ae83cacd60d2 | 785 | if ((rc = sendPacket(len, timer)) != SUCCESS) // send the publish packet |
icraggs | 0:ae83cacd60d2 | 786 | goto exit; // there was a problem |
icraggs | 0:ae83cacd60d2 | 787 | |
icraggs | 0:ae83cacd60d2 | 788 | #if MQTTSNCLIENT_QOS1 |
icraggs | 0:ae83cacd60d2 | 789 | if (qos == QOS1) |
icraggs | 0:ae83cacd60d2 | 790 | { |
icraggs | 0:ae83cacd60d2 | 791 | if (waitfor(MQTTSN_PUBACK, timer) == MQTTSN_PUBACK) |
icraggs | 0:ae83cacd60d2 | 792 | { |
icraggs | 0:ae83cacd60d2 | 793 | unsigned short mypacketid; |
icraggs | 0:ae83cacd60d2 | 794 | unsigned char type; |
icraggs | 0:ae83cacd60d2 | 795 | if (MQTTSNDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1) |
icraggs | 0:ae83cacd60d2 | 796 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 797 | else if (inflightMsgid == mypacketid) |
icraggs | 0:ae83cacd60d2 | 798 | inflightMsgid = 0; |
icraggs | 0:ae83cacd60d2 | 799 | } |
icraggs | 0:ae83cacd60d2 | 800 | else |
icraggs | 0:ae83cacd60d2 | 801 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 802 | } |
icraggs | 0:ae83cacd60d2 | 803 | #elif MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 804 | else if (qos == QOS2) |
icraggs | 0:ae83cacd60d2 | 805 | { |
icraggs | 0:ae83cacd60d2 | 806 | if (waitfor(PUBCOMP, timer) == PUBCOMP) |
icraggs | 0:ae83cacd60d2 | 807 | { |
icraggs | 0:ae83cacd60d2 | 808 | unsigned short mypacketid; |
icraggs | 0:ae83cacd60d2 | 809 | unsigned char type; |
icraggs | 0:ae83cacd60d2 | 810 | if (MQTTDeserialize_ack(&type, &mypacketid, readbuf, MAX_PACKET_SIZE) != 1) |
icraggs | 0:ae83cacd60d2 | 811 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 812 | else if (inflightMsgid == mypacketid) |
icraggs | 0:ae83cacd60d2 | 813 | inflightMsgid = 0; |
icraggs | 0:ae83cacd60d2 | 814 | } |
icraggs | 0:ae83cacd60d2 | 815 | else |
icraggs | 0:ae83cacd60d2 | 816 | rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 817 | } |
icraggs | 0:ae83cacd60d2 | 818 | #endif |
icraggs | 0:ae83cacd60d2 | 819 | |
icraggs | 0:ae83cacd60d2 | 820 | exit: |
icraggs | 0:ae83cacd60d2 | 821 | if (rc != SUCCESS) |
icraggs | 0:ae83cacd60d2 | 822 | isconnected = false; |
icraggs | 0:ae83cacd60d2 | 823 | return rc; |
icraggs | 0:ae83cacd60d2 | 824 | } |
icraggs | 0:ae83cacd60d2 | 825 | |
icraggs | 0:ae83cacd60d2 | 826 | |
icraggs | 0:ae83cacd60d2 | 827 | |
icraggs | 0:ae83cacd60d2 | 828 | template<class Network, class Timer, int MAX_PACKET_SIZE, int b> |
icraggs | 0:ae83cacd60d2 | 829 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topic, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos, bool retained) |
icraggs | 0:ae83cacd60d2 | 830 | { |
icraggs | 0:ae83cacd60d2 | 831 | int rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 832 | Timer timer = Timer(command_timeout_ms); |
icraggs | 0:ae83cacd60d2 | 833 | int len = 0; |
icraggs | 0:ae83cacd60d2 | 834 | |
icraggs | 0:ae83cacd60d2 | 835 | if (!isconnected) |
icraggs | 0:ae83cacd60d2 | 836 | goto exit; |
icraggs | 0:ae83cacd60d2 | 837 | |
icraggs | 0:ae83cacd60d2 | 838 | #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 839 | if (qos == QOS1 || qos == QOS2) |
icraggs | 0:ae83cacd60d2 | 840 | id = packetid.getNext(); |
icraggs | 0:ae83cacd60d2 | 841 | #endif |
icraggs | 0:ae83cacd60d2 | 842 | |
icraggs | 0:ae83cacd60d2 | 843 | len = MQTTSNSerialize_publish(sendbuf, MAX_PACKET_SIZE, 0, qos, retained, id, |
icraggs | 0:ae83cacd60d2 | 844 | topic, (unsigned char*)payload, payloadlen); |
icraggs | 0:ae83cacd60d2 | 845 | if (len <= 0) |
icraggs | 0:ae83cacd60d2 | 846 | goto exit; |
icraggs | 0:ae83cacd60d2 | 847 | |
icraggs | 0:ae83cacd60d2 | 848 | #if MQTTSNCLIENT_QOS1 || MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 849 | if (!cleansession) |
icraggs | 0:ae83cacd60d2 | 850 | { |
icraggs | 0:ae83cacd60d2 | 851 | memcpy(pubbuf, sendbuf, len); |
icraggs | 0:ae83cacd60d2 | 852 | inflightMsgid = id; |
icraggs | 0:ae83cacd60d2 | 853 | inflightLen = len; |
icraggs | 0:ae83cacd60d2 | 854 | inflightQoS = qos; |
icraggs | 0:ae83cacd60d2 | 855 | #if MQTTSNCLIENT_QOS2 |
icraggs | 0:ae83cacd60d2 | 856 | pubrel = false; |
icraggs | 0:ae83cacd60d2 | 857 | #endif |
icraggs | 0:ae83cacd60d2 | 858 | } |
icraggs | 0:ae83cacd60d2 | 859 | #endif |
icraggs | 0:ae83cacd60d2 | 860 | |
icraggs | 0:ae83cacd60d2 | 861 | rc = publish(len, timer, qos); |
icraggs | 0:ae83cacd60d2 | 862 | exit: |
icraggs | 0:ae83cacd60d2 | 863 | return rc; |
icraggs | 0:ae83cacd60d2 | 864 | } |
icraggs | 0:ae83cacd60d2 | 865 | |
icraggs | 0:ae83cacd60d2 | 866 | |
icraggs | 0:ae83cacd60d2 | 867 | template<class Network, class Timer, int MAX_PACKET_SIZE, int b> |
icraggs | 0:ae83cacd60d2 | 868 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, void* payload, size_t payloadlen, enum QoS qos, bool retained) |
icraggs | 0:ae83cacd60d2 | 869 | { |
icraggs | 0:ae83cacd60d2 | 870 | unsigned short id = 0; // dummy - not used for anything |
icraggs | 0:ae83cacd60d2 | 871 | return publish(topicName, payload, payloadlen, id, qos, retained); |
icraggs | 0:ae83cacd60d2 | 872 | } |
icraggs | 0:ae83cacd60d2 | 873 | |
icraggs | 0:ae83cacd60d2 | 874 | |
icraggs | 0:ae83cacd60d2 | 875 | template<class Network, class Timer, int MAX_PACKET_SIZE, int b> |
icraggs | 0:ae83cacd60d2 | 876 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::publish(MQTTSN_topicid& topicName, Message& message) |
icraggs | 0:ae83cacd60d2 | 877 | { |
icraggs | 0:ae83cacd60d2 | 878 | return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained); |
icraggs | 0:ae83cacd60d2 | 879 | } |
icraggs | 0:ae83cacd60d2 | 880 | |
icraggs | 0:ae83cacd60d2 | 881 | |
icraggs | 0:ae83cacd60d2 | 882 | template<class Network, class Timer, int MAX_PACKET_SIZE, int b> |
icraggs | 0:ae83cacd60d2 | 883 | int MQTTSN::Client<Network, Timer, MAX_PACKET_SIZE, b>::disconnect(unsigned short duration) |
icraggs | 0:ae83cacd60d2 | 884 | { |
icraggs | 0:ae83cacd60d2 | 885 | int rc = FAILURE; |
icraggs | 0:ae83cacd60d2 | 886 | Timer timer = Timer(command_timeout_ms); // we might wait for incomplete incoming publishes to complete |
icraggs | 0:ae83cacd60d2 | 887 | int int_duration = (duration == 0) ? -1 : (int)duration; |
icraggs | 0:ae83cacd60d2 | 888 | int len = MQTTSNSerialize_disconnect(sendbuf, MAX_PACKET_SIZE, int_duration); |
icraggs | 0:ae83cacd60d2 | 889 | if (len > 0) |
icraggs | 0:ae83cacd60d2 | 890 | rc = sendPacket(len, timer); // send the disconnect packet |
icraggs | 0:ae83cacd60d2 | 891 | |
icraggs | 0:ae83cacd60d2 | 892 | isconnected = false; |
icraggs | 0:ae83cacd60d2 | 893 | return rc; |
icraggs | 0:ae83cacd60d2 | 894 | } |
icraggs | 0:ae83cacd60d2 | 895 | |
icraggs | 0:ae83cacd60d2 | 896 | |
icraggs | 0:ae83cacd60d2 | 897 | #endif |