Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of MQTT by
MQTTClient.h@9:01b8cc7d94cc, 2014-04-09 (annotated)
- Committer:
- icraggs
- Date:
- Wed Apr 09 23:21:54 2014 +0000
- Revision:
- 9:01b8cc7d94cc
- Parent:
- 8:c46930bd6c82
- Child:
- 11:db15da110a37
- Child:
- 12:cc7f2d62a393
Fix function pointer definitions
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
icraggs | 6:4d312a49200b | 1 | /******************************************************************************* |
icraggs | 6:4d312a49200b | 2 | * Copyright (c) 2014 IBM Corp. |
sam_grove | 0:fe461e4d7afe | 3 | * |
icraggs | 6:4d312a49200b | 4 | * All rights reserved. This program and the accompanying materials |
icraggs | 6:4d312a49200b | 5 | * are made available under the terms of the Eclipse Public License v1.0 |
icraggs | 6:4d312a49200b | 6 | * and Eclipse Distribution License v1.0 which accompany this distribution. |
sam_grove | 0:fe461e4d7afe | 7 | * |
icraggs | 6:4d312a49200b | 8 | * The Eclipse Public License is available at |
icraggs | 6:4d312a49200b | 9 | * http://www.eclipse.org/legal/epl-v10.html |
icraggs | 6:4d312a49200b | 10 | * and the Eclipse Distribution License is available at |
icraggs | 6:4d312a49200b | 11 | * http://www.eclipse.org/org/documents/edl-v10.php. |
sam_grove | 0:fe461e4d7afe | 12 | * |
icraggs | 6:4d312a49200b | 13 | * Contributors: |
icraggs | 6:4d312a49200b | 14 | * Ian Craggs - initial API and implementation and/or initial documentation |
icraggs | 6:4d312a49200b | 15 | *******************************************************************************/ |
sam_grove | 0:fe461e4d7afe | 16 | |
icraggs | 2:dcfdd2abfe71 | 17 | #if !defined(MQTTCLIENT_H) |
icraggs | 2:dcfdd2abfe71 | 18 | #define MQTTCLIENT_H |
icraggs | 2:dcfdd2abfe71 | 19 | |
icraggs | 2:dcfdd2abfe71 | 20 | #include "FP.h" |
icraggs | 3:dbff6b768d28 | 21 | #include "MQTTPacket.h" |
icraggs | 6:4d312a49200b | 22 | #include "stdio.h" |
icraggs | 2:dcfdd2abfe71 | 23 | |
icraggs | 3:dbff6b768d28 | 24 | namespace MQTT |
icraggs | 3:dbff6b768d28 | 25 | { |
icraggs | 3:dbff6b768d28 | 26 | |
icraggs | 2:dcfdd2abfe71 | 27 | |
icraggs | 2:dcfdd2abfe71 | 28 | enum QoS { QOS0, QOS1, QOS2 }; |
sam_grove | 0:fe461e4d7afe | 29 | |
icraggs | 2:dcfdd2abfe71 | 30 | |
icraggs | 3:dbff6b768d28 | 31 | struct Message |
icraggs | 2:dcfdd2abfe71 | 32 | { |
icraggs | 2:dcfdd2abfe71 | 33 | enum QoS qos; |
icraggs | 2:dcfdd2abfe71 | 34 | bool retained; |
icraggs | 2:dcfdd2abfe71 | 35 | bool dup; |
icraggs | 2:dcfdd2abfe71 | 36 | unsigned short msgid; |
icraggs | 2:dcfdd2abfe71 | 37 | void *payload; |
icraggs | 2:dcfdd2abfe71 | 38 | size_t payloadlen; |
sam_grove | 0:fe461e4d7afe | 39 | }; |
sam_grove | 0:fe461e4d7afe | 40 | |
icraggs | 6:4d312a49200b | 41 | template<class Network, class Timer, class Thread> class Client; |
icraggs | 4:4ef00243708e | 42 | |
icraggs | 4:4ef00243708e | 43 | class Result |
icraggs | 4:4ef00243708e | 44 | { |
icraggs | 4:4ef00243708e | 45 | /* success or failure result data */ |
icraggs | 6:4d312a49200b | 46 | Client<class Network, class Timer, class Thread>* client; |
icraggs | 4:4ef00243708e | 47 | }; |
icraggs | 4:4ef00243708e | 48 | |
icraggs | 9:01b8cc7d94cc | 49 | |
icraggs | 9:01b8cc7d94cc | 50 | class PacketId |
icraggs | 9:01b8cc7d94cc | 51 | { |
icraggs | 9:01b8cc7d94cc | 52 | public: |
icraggs | 9:01b8cc7d94cc | 53 | PacketId(); |
icraggs | 9:01b8cc7d94cc | 54 | |
icraggs | 9:01b8cc7d94cc | 55 | int getNext(); |
icraggs | 9:01b8cc7d94cc | 56 | |
icraggs | 9:01b8cc7d94cc | 57 | private: |
icraggs | 9:01b8cc7d94cc | 58 | static const int MAX_PACKET_ID = 65535; |
icraggs | 9:01b8cc7d94cc | 59 | int next; |
icraggs | 9:01b8cc7d94cc | 60 | }; |
icraggs | 9:01b8cc7d94cc | 61 | |
icraggs | 9:01b8cc7d94cc | 62 | typedef void (*resultHandler)(Result*); |
icraggs | 9:01b8cc7d94cc | 63 | typedef void (*messageHandler)(Message*); |
icraggs | 2:dcfdd2abfe71 | 64 | |
icraggs | 6:4d312a49200b | 65 | template<class Network, class Timer, class Thread> class Client |
icraggs | 2:dcfdd2abfe71 | 66 | { |
icraggs | 2:dcfdd2abfe71 | 67 | |
sam_grove | 0:fe461e4d7afe | 68 | public: |
icraggs | 4:4ef00243708e | 69 | |
icraggs | 9:01b8cc7d94cc | 70 | Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30); |
icraggs | 2:dcfdd2abfe71 | 71 | |
icraggs | 9:01b8cc7d94cc | 72 | int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0); |
icraggs | 2:dcfdd2abfe71 | 73 | |
icraggs | 9:01b8cc7d94cc | 74 | template<class T> |
icraggs | 9:01b8cc7d94cc | 75 | int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0); // alternative to pass in pointer to member function |
icraggs | 9:01b8cc7d94cc | 76 | |
icraggs | 9:01b8cc7d94cc | 77 | int publish(const char* topic, Message* message, resultHandler rh = 0); |
sam_grove | 0:fe461e4d7afe | 78 | |
icraggs | 9:01b8cc7d94cc | 79 | int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0); |
icraggs | 2:dcfdd2abfe71 | 80 | |
icraggs | 9:01b8cc7d94cc | 81 | int unsubscribe(char* topicFilter, resultHandler rh = 0); |
icraggs | 9:01b8cc7d94cc | 82 | |
icraggs | 9:01b8cc7d94cc | 83 | int disconnect(int timeout, resultHandler rh = 0); |
sam_grove | 0:fe461e4d7afe | 84 | |
icraggs | 8:c46930bd6c82 | 85 | void run(void const *argument); |
icraggs | 8:c46930bd6c82 | 86 | |
icraggs | 2:dcfdd2abfe71 | 87 | private: |
icraggs | 2:dcfdd2abfe71 | 88 | |
icraggs | 4:4ef00243708e | 89 | int cycle(); |
icraggs | 2:dcfdd2abfe71 | 90 | |
icraggs | 3:dbff6b768d28 | 91 | int decodePacket(int* value, int timeout); |
icraggs | 4:4ef00243708e | 92 | int readPacket(int timeout = -1); |
icraggs | 6:4d312a49200b | 93 | int sendPacket(int length, int timeout = -1); |
icraggs | 3:dbff6b768d28 | 94 | |
icraggs | 4:4ef00243708e | 95 | Thread* thread; |
icraggs | 4:4ef00243708e | 96 | Network* ipstack; |
icraggs | 6:4d312a49200b | 97 | Timer* timer; |
icraggs | 4:4ef00243708e | 98 | |
icraggs | 9:01b8cc7d94cc | 99 | char* buf; |
icraggs | 2:dcfdd2abfe71 | 100 | int buflen; |
icraggs | 2:dcfdd2abfe71 | 101 | |
icraggs | 4:4ef00243708e | 102 | char* readbuf; |
icraggs | 4:4ef00243708e | 103 | int readbuflen; |
icraggs | 4:4ef00243708e | 104 | |
icraggs | 6:4d312a49200b | 105 | int command_timeout; // max time to wait for any MQTT command to complete, in seconds |
icraggs | 6:4d312a49200b | 106 | int keepalive; |
icraggs | 9:01b8cc7d94cc | 107 | PacketId packetid; |
icraggs | 9:01b8cc7d94cc | 108 | |
icraggs | 9:01b8cc7d94cc | 109 | typedef FP<void, Result*> resultHandlerFP; |
icraggs | 9:01b8cc7d94cc | 110 | // how many concurrent operations should we allow? Each one will require a function pointer |
icraggs | 9:01b8cc7d94cc | 111 | resultHandlerFP connectHandler; |
icraggs | 9:01b8cc7d94cc | 112 | |
icraggs | 9:01b8cc7d94cc | 113 | #define MAX_MESSAGE_HANDLERS 5 |
icraggs | 9:01b8cc7d94cc | 114 | typedef FP<void, Message*> messageHandlerFP; |
icraggs | 9:01b8cc7d94cc | 115 | messageHandlerFP messageHandlers[MAX_MESSAGE_HANDLERS]; // Linked list, or constructor parameter to limit array size? |
icraggs | 4:4ef00243708e | 116 | |
sam_grove | 0:fe461e4d7afe | 117 | }; |
sam_grove | 0:fe461e4d7afe | 118 | |
icraggs | 8:c46930bd6c82 | 119 | void threadfn(void* arg); |
icraggs | 8:c46930bd6c82 | 120 | |
icraggs | 8:c46930bd6c82 | 121 | } |
icraggs | 8:c46930bd6c82 | 122 | |
icraggs | 9:01b8cc7d94cc | 123 | template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout) : packetid() |
icraggs | 8:c46930bd6c82 | 124 | { |
icraggs | 8:c46930bd6c82 | 125 | |
icraggs | 8:c46930bd6c82 | 126 | buf = new char[buffer_size]; |
icraggs | 8:c46930bd6c82 | 127 | readbuf = new char[buffer_size]; |
icraggs | 8:c46930bd6c82 | 128 | buflen = readbuflen = buffer_size; |
icraggs | 8:c46930bd6c82 | 129 | this->command_timeout = command_timeout; |
icraggs | 8:c46930bd6c82 | 130 | this->thread = 0; |
icraggs | 8:c46930bd6c82 | 131 | this->ipstack = network; |
icraggs | 8:c46930bd6c82 | 132 | this->timer = timer; |
icraggs | 8:c46930bd6c82 | 133 | } |
icraggs | 8:c46930bd6c82 | 134 | |
icraggs | 8:c46930bd6c82 | 135 | |
icraggs | 8:c46930bd6c82 | 136 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout) |
icraggs | 8:c46930bd6c82 | 137 | { |
icraggs | 8:c46930bd6c82 | 138 | int sent = 0; |
icraggs | 8:c46930bd6c82 | 139 | |
icraggs | 8:c46930bd6c82 | 140 | while (sent < length) |
icraggs | 8:c46930bd6c82 | 141 | sent += ipstack->write(&buf[sent], length, -1); |
icraggs | 8:c46930bd6c82 | 142 | |
icraggs | 8:c46930bd6c82 | 143 | return sent; |
icraggs | 8:c46930bd6c82 | 144 | } |
icraggs | 8:c46930bd6c82 | 145 | |
icraggs | 8:c46930bd6c82 | 146 | |
icraggs | 8:c46930bd6c82 | 147 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::decodePacket(int* value, int timeout) |
icraggs | 8:c46930bd6c82 | 148 | { |
icraggs | 8:c46930bd6c82 | 149 | char c; |
icraggs | 8:c46930bd6c82 | 150 | int multiplier = 1; |
icraggs | 8:c46930bd6c82 | 151 | int len = 0; |
icraggs | 8:c46930bd6c82 | 152 | #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 |
icraggs | 8:c46930bd6c82 | 153 | |
icraggs | 8:c46930bd6c82 | 154 | *value = 0; |
icraggs | 8:c46930bd6c82 | 155 | do |
icraggs | 8:c46930bd6c82 | 156 | { |
icraggs | 8:c46930bd6c82 | 157 | int rc = MQTTPACKET_READ_ERROR; |
icraggs | 8:c46930bd6c82 | 158 | |
icraggs | 8:c46930bd6c82 | 159 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
icraggs | 8:c46930bd6c82 | 160 | { |
icraggs | 8:c46930bd6c82 | 161 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
icraggs | 8:c46930bd6c82 | 162 | goto exit; |
icraggs | 8:c46930bd6c82 | 163 | } |
icraggs | 8:c46930bd6c82 | 164 | rc = ipstack->read(&c, 1, timeout); |
icraggs | 8:c46930bd6c82 | 165 | if (rc != 1) |
icraggs | 8:c46930bd6c82 | 166 | goto exit; |
icraggs | 8:c46930bd6c82 | 167 | *value += (c & 127) * multiplier; |
icraggs | 8:c46930bd6c82 | 168 | multiplier *= 128; |
icraggs | 8:c46930bd6c82 | 169 | } while ((c & 128) != 0); |
icraggs | 8:c46930bd6c82 | 170 | exit: |
icraggs | 8:c46930bd6c82 | 171 | return len; |
icraggs | 8:c46930bd6c82 | 172 | } |
icraggs | 8:c46930bd6c82 | 173 | |
icraggs | 8:c46930bd6c82 | 174 | |
icraggs | 8:c46930bd6c82 | 175 | /** |
icraggs | 8:c46930bd6c82 | 176 | * If any read fails in this method, then we should disconnect from the network, as on reconnect |
icraggs | 8:c46930bd6c82 | 177 | * the packets can be retried. |
icraggs | 8:c46930bd6c82 | 178 | * @param timeout the max time to wait for the packet read to complete, in milliseconds |
icraggs | 8:c46930bd6c82 | 179 | * @return the MQTT packet type, or -1 if none |
icraggs | 8:c46930bd6c82 | 180 | */ |
icraggs | 8:c46930bd6c82 | 181 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::readPacket(int timeout) |
icraggs | 8:c46930bd6c82 | 182 | { |
icraggs | 8:c46930bd6c82 | 183 | int rc = -1; |
icraggs | 8:c46930bd6c82 | 184 | MQTTHeader header = {0}; |
icraggs | 8:c46930bd6c82 | 185 | int len = 0; |
icraggs | 8:c46930bd6c82 | 186 | int rem_len = 0; |
icraggs | 8:c46930bd6c82 | 187 | |
icraggs | 8:c46930bd6c82 | 188 | /* 1. read the header byte. This has the packet type in it */ |
icraggs | 8:c46930bd6c82 | 189 | if (ipstack->read(readbuf, 1, timeout) != 1) |
icraggs | 8:c46930bd6c82 | 190 | goto exit; |
icraggs | 8:c46930bd6c82 | 191 | |
icraggs | 8:c46930bd6c82 | 192 | len = 1; |
icraggs | 8:c46930bd6c82 | 193 | /* 2. read the remaining length. This is variable in itself */ |
icraggs | 8:c46930bd6c82 | 194 | decodePacket(&rem_len, timeout); |
icraggs | 8:c46930bd6c82 | 195 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ |
icraggs | 8:c46930bd6c82 | 196 | |
icraggs | 8:c46930bd6c82 | 197 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
icraggs | 8:c46930bd6c82 | 198 | if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len) |
icraggs | 8:c46930bd6c82 | 199 | goto exit; |
icraggs | 8:c46930bd6c82 | 200 | |
icraggs | 8:c46930bd6c82 | 201 | header.byte = readbuf[0]; |
icraggs | 8:c46930bd6c82 | 202 | rc = header.bits.type; |
icraggs | 8:c46930bd6c82 | 203 | exit: |
icraggs | 8:c46930bd6c82 | 204 | return rc; |
icraggs | 3:dbff6b768d28 | 205 | } |
icraggs | 3:dbff6b768d28 | 206 | |
icraggs | 8:c46930bd6c82 | 207 | |
icraggs | 8:c46930bd6c82 | 208 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle() |
icraggs | 8:c46930bd6c82 | 209 | { |
icraggs | 9:01b8cc7d94cc | 210 | int timeout = -1; |
icraggs | 8:c46930bd6c82 | 211 | /* get one piece of work off the wire and one pass through */ |
icraggs | 8:c46930bd6c82 | 212 | |
icraggs | 8:c46930bd6c82 | 213 | // 1. read the socket, see what work is due. |
icraggs | 9:01b8cc7d94cc | 214 | int packet_type = readPacket(timeout); |
icraggs | 8:c46930bd6c82 | 215 | |
icraggs | 8:c46930bd6c82 | 216 | printf("packet type %d\n", packet_type); |
icraggs | 8:c46930bd6c82 | 217 | |
icraggs | 8:c46930bd6c82 | 218 | switch (packet_type) |
icraggs | 8:c46930bd6c82 | 219 | { |
icraggs | 8:c46930bd6c82 | 220 | case CONNACK: |
icraggs | 8:c46930bd6c82 | 221 | printf("connack received\n"); |
icraggs | 8:c46930bd6c82 | 222 | break; |
icraggs | 8:c46930bd6c82 | 223 | case PUBLISH: |
icraggs | 8:c46930bd6c82 | 224 | break; |
icraggs | 8:c46930bd6c82 | 225 | case PUBACK: |
icraggs | 8:c46930bd6c82 | 226 | break; |
icraggs | 8:c46930bd6c82 | 227 | case SUBACK: |
icraggs | 8:c46930bd6c82 | 228 | break; |
icraggs | 8:c46930bd6c82 | 229 | case PUBREC: |
icraggs | 8:c46930bd6c82 | 230 | break; |
icraggs | 8:c46930bd6c82 | 231 | case PUBCOMP: |
icraggs | 8:c46930bd6c82 | 232 | break; |
icraggs | 8:c46930bd6c82 | 233 | case PINGRESP: |
icraggs | 8:c46930bd6c82 | 234 | break; |
icraggs | 8:c46930bd6c82 | 235 | case -1: |
icraggs | 8:c46930bd6c82 | 236 | break; |
icraggs | 8:c46930bd6c82 | 237 | } |
icraggs | 8:c46930bd6c82 | 238 | return packet_type; |
icraggs | 8:c46930bd6c82 | 239 | } |
icraggs | 8:c46930bd6c82 | 240 | |
icraggs | 8:c46930bd6c82 | 241 | |
icraggs | 8:c46930bd6c82 | 242 | template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument) |
icraggs | 8:c46930bd6c82 | 243 | { |
icraggs | 8:c46930bd6c82 | 244 | } |
icraggs | 8:c46930bd6c82 | 245 | |
icraggs | 8:c46930bd6c82 | 246 | |
icraggs | 9:01b8cc7d94cc | 247 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler) |
icraggs | 8:c46930bd6c82 | 248 | { |
icraggs | 8:c46930bd6c82 | 249 | int len = 0; |
icraggs | 8:c46930bd6c82 | 250 | int rc = -99; |
icraggs | 8:c46930bd6c82 | 251 | MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; |
icraggs | 8:c46930bd6c82 | 252 | |
icraggs | 8:c46930bd6c82 | 253 | /* 2. if the connect was successful, send the MQTT connect packet */ |
icraggs | 8:c46930bd6c82 | 254 | if (options == 0) |
icraggs | 8:c46930bd6c82 | 255 | { |
icraggs | 8:c46930bd6c82 | 256 | default_options.clientID.cstring = "me"; |
icraggs | 8:c46930bd6c82 | 257 | options = &default_options; |
icraggs | 8:c46930bd6c82 | 258 | } |
icraggs | 8:c46930bd6c82 | 259 | |
icraggs | 8:c46930bd6c82 | 260 | this->keepalive = options->keepAliveInterval; |
icraggs | 8:c46930bd6c82 | 261 | len = MQTTSerialize_connect(buf, buflen, options); |
icraggs | 8:c46930bd6c82 | 262 | printf("len from send is %d %d\n", len, buflen); |
icraggs | 8:c46930bd6c82 | 263 | rc = sendPacket(len); // send the connect packet |
icraggs | 8:c46930bd6c82 | 264 | printf("rc from send is %d\n", rc); |
icraggs | 8:c46930bd6c82 | 265 | |
icraggs | 8:c46930bd6c82 | 266 | /* 3. wait until the connack is received */ |
icraggs | 8:c46930bd6c82 | 267 | if (resultHandler == 0) |
icraggs | 8:c46930bd6c82 | 268 | { |
icraggs | 8:c46930bd6c82 | 269 | // this will be a blocking call, wait for the connack |
icraggs | 8:c46930bd6c82 | 270 | if (cycle() == CONNACK) |
icraggs | 8:c46930bd6c82 | 271 | { |
icraggs | 8:c46930bd6c82 | 272 | int connack_rc = -1; |
icraggs | 8:c46930bd6c82 | 273 | if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1) |
icraggs | 8:c46930bd6c82 | 274 | rc = connack_rc; |
icraggs | 8:c46930bd6c82 | 275 | } |
icraggs | 8:c46930bd6c82 | 276 | } |
icraggs | 8:c46930bd6c82 | 277 | else |
icraggs | 8:c46930bd6c82 | 278 | { |
icraggs | 8:c46930bd6c82 | 279 | // set connect response callback function |
icraggs | 9:01b8cc7d94cc | 280 | connectHandler.attach(resultHandler); |
icraggs | 8:c46930bd6c82 | 281 | |
icraggs | 8:c46930bd6c82 | 282 | // start background thread |
icraggs | 8:c46930bd6c82 | 283 | |
icraggs | 8:c46930bd6c82 | 284 | this->thread = new Thread((void (*)(void const *argument))&MQTT::threadfn, (void*)this); |
icraggs | 8:c46930bd6c82 | 285 | } |
icraggs | 8:c46930bd6c82 | 286 | |
icraggs | 8:c46930bd6c82 | 287 | return rc; |
icraggs | 8:c46930bd6c82 | 288 | } |
icraggs | 8:c46930bd6c82 | 289 | |
icraggs | 8:c46930bd6c82 | 290 | |
icraggs | 9:01b8cc7d94cc | 291 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, |
icraggs | 9:01b8cc7d94cc | 292 | messageHandler messageHandler, resultHandler resultHandler) |
icraggs | 8:c46930bd6c82 | 293 | { |
icraggs | 8:c46930bd6c82 | 294 | int rc = -1, |
icraggs | 8:c46930bd6c82 | 295 | len = 0; |
icraggs | 8:c46930bd6c82 | 296 | MQTTString topic = {(char*)topicFilter, 0, 0}; |
icraggs | 8:c46930bd6c82 | 297 | |
icraggs | 9:01b8cc7d94cc | 298 | len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
icraggs | 8:c46930bd6c82 | 299 | rc = sendPacket(len); // send the subscribe packet |
icraggs | 8:c46930bd6c82 | 300 | |
icraggs | 8:c46930bd6c82 | 301 | /* wait for suback */ |
icraggs | 8:c46930bd6c82 | 302 | if (resultHandler == 0) |
icraggs | 8:c46930bd6c82 | 303 | { |
icraggs | 8:c46930bd6c82 | 304 | // this will block |
icraggs | 8:c46930bd6c82 | 305 | if (cycle() == SUBACK) |
icraggs | 8:c46930bd6c82 | 306 | { |
icraggs | 9:01b8cc7d94cc | 307 | int count = 0, grantedQoS = -1, mypacketid; |
icraggs | 9:01b8cc7d94cc | 308 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1) |
icraggs | 8:c46930bd6c82 | 309 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
icraggs | 8:c46930bd6c82 | 310 | } |
icraggs | 8:c46930bd6c82 | 311 | } |
icraggs | 8:c46930bd6c82 | 312 | else |
icraggs | 8:c46930bd6c82 | 313 | { |
icraggs | 8:c46930bd6c82 | 314 | // set subscribe response callback function |
icraggs | 8:c46930bd6c82 | 315 | |
icraggs | 8:c46930bd6c82 | 316 | } |
icraggs | 8:c46930bd6c82 | 317 | |
icraggs | 8:c46930bd6c82 | 318 | return rc; |
icraggs | 8:c46930bd6c82 | 319 | } |
icraggs | 8:c46930bd6c82 | 320 | |
icraggs | 8:c46930bd6c82 | 321 | |
sam_grove | 0:fe461e4d7afe | 322 | #endif |