Modified MQTT for Mbed OS.

Dependencies:   FP MQTTPacket

Dependents:   mbed-os-mqtt door_lock co657_IoT nucleo-f429zi-mbed-os-mqtt

Fork of MQTT by MQTT

Committer:
icraggs
Date:
Sun Apr 13 22:32:28 2014 +0000
Revision:
15:64a57183aa03
Parent:
13:fd82db992024
Child:
16:91c2f9a144d4
I really want the arrays to be allocated in automatic storage

Who changed what in which revision?

UserRevisionLine numberNew contents of line
icraggs 6:4d312a49200b 1 /*******************************************************************************
icraggs 6:4d312a49200b 2 * Copyright (c) 2014 IBM Corp.
sam_grove 0:fe461e4d7afe 3 *
icraggs 6:4d312a49200b 4 * All rights reserved. This program and the accompanying materials
icraggs 6:4d312a49200b 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 6:4d312a49200b 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
sam_grove 0:fe461e4d7afe 7 *
icraggs 6:4d312a49200b 8 * The Eclipse Public License is available at
icraggs 6:4d312a49200b 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 6:4d312a49200b 10 * and the Eclipse Distribution License is available at
icraggs 6:4d312a49200b 11 * http://www.eclipse.org/org/documents/edl-v10.php.
sam_grove 0:fe461e4d7afe 12 *
icraggs 6:4d312a49200b 13 * Contributors:
icraggs 6:4d312a49200b 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 6:4d312a49200b 15 *******************************************************************************/
sam_grove 0:fe461e4d7afe 16
icraggs 2:dcfdd2abfe71 17 #if !defined(MQTTCLIENT_H)
icraggs 2:dcfdd2abfe71 18 #define MQTTCLIENT_H
icraggs 2:dcfdd2abfe71 19
icraggs 2:dcfdd2abfe71 20 #include "FP.h"
icraggs 3:dbff6b768d28 21 #include "MQTTPacket.h"
icraggs 6:4d312a49200b 22 #include "stdio.h"
icraggs 2:dcfdd2abfe71 23
icraggs 3:dbff6b768d28 24 namespace MQTT
icraggs 3:dbff6b768d28 25 {
icraggs 3:dbff6b768d28 26
icraggs 2:dcfdd2abfe71 27
icraggs 2:dcfdd2abfe71 28 enum QoS { QOS0, QOS1, QOS2 };
sam_grove 0:fe461e4d7afe 29
icraggs 2:dcfdd2abfe71 30
icraggs 3:dbff6b768d28 31 struct Message
icraggs 2:dcfdd2abfe71 32 {
icraggs 2:dcfdd2abfe71 33 enum QoS qos;
icraggs 2:dcfdd2abfe71 34 bool retained;
icraggs 2:dcfdd2abfe71 35 bool dup;
Ian Craggs 12:cc7f2d62a393 36 unsigned short id;
icraggs 2:dcfdd2abfe71 37 void *payload;
icraggs 2:dcfdd2abfe71 38 size_t payloadlen;
sam_grove 0:fe461e4d7afe 39 };
sam_grove 0:fe461e4d7afe 40
icraggs 6:4d312a49200b 41 template<class Network, class Timer, class Thread> class Client;
icraggs 4:4ef00243708e 42
icraggs 9:01b8cc7d94cc 43 class PacketId
icraggs 9:01b8cc7d94cc 44 {
icraggs 9:01b8cc7d94cc 45 public:
icraggs 9:01b8cc7d94cc 46 PacketId();
icraggs 9:01b8cc7d94cc 47
icraggs 15:64a57183aa03 48 int getNext();
Ian Craggs 12:cc7f2d62a393 49
icraggs 9:01b8cc7d94cc 50 private:
icraggs 9:01b8cc7d94cc 51 static const int MAX_PACKET_ID = 65535;
icraggs 9:01b8cc7d94cc 52 int next;
icraggs 9:01b8cc7d94cc 53 };
icraggs 9:01b8cc7d94cc 54
icraggs 9:01b8cc7d94cc 55 typedef void (*messageHandler)(Message*);
icraggs 2:dcfdd2abfe71 56
icraggs 6:4d312a49200b 57 template<class Network, class Timer, class Thread> class Client
icraggs 2:dcfdd2abfe71 58 {
icraggs 2:dcfdd2abfe71 59
icraggs 15:64a57183aa03 60 public:
icraggs 15:64a57183aa03 61
Ian Craggs 12:cc7f2d62a393 62 struct Result
Ian Craggs 12:cc7f2d62a393 63 {
Ian Craggs 12:cc7f2d62a393 64 /* success or failure result data */
icraggs 15:64a57183aa03 65 Client<Network, Timer, Thread>* client;
Ian Craggs 12:cc7f2d62a393 66 int connack_rc;
icraggs 15:64a57183aa03 67 };
icraggs 15:64a57183aa03 68
Ian Craggs 12:cc7f2d62a393 69 typedef void (*resultHandler)(Result*);
icraggs 15:64a57183aa03 70
icraggs 15:64a57183aa03 71 struct limits
icraggs 15:64a57183aa03 72 {
icraggs 15:64a57183aa03 73 int MAX_MQTT_PACKET_SIZE; //
icraggs 15:64a57183aa03 74 int MAX_MESSAGE_HANDLERS; // 5 - each subscription requires a message handler
icraggs 15:64a57183aa03 75 int MAX_CONCURRENT_OPERATIONS; // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
icraggs 15:64a57183aa03 76 int command_timeout;
icraggs 15:64a57183aa03 77
icraggs 15:64a57183aa03 78 limits()
icraggs 15:64a57183aa03 79 {
icraggs 15:64a57183aa03 80 MAX_MQTT_PACKET_SIZE = 100;
icraggs 15:64a57183aa03 81 MAX_MESSAGE_HANDLERS = 5;
icraggs 15:64a57183aa03 82 MAX_CONCURRENT_OPERATIONS= 5;
icraggs 15:64a57183aa03 83 command_timeout = 30;
icraggs 15:64a57183aa03 84 }
icraggs 15:64a57183aa03 85 };
icraggs 4:4ef00243708e 86
Ian Craggs 12:cc7f2d62a393 87 Client(Network* network, const int MAX_MQTT_PACKET_SIZE = 100, const int command_timeout = 30);
icraggs 2:dcfdd2abfe71 88
icraggs 9:01b8cc7d94cc 89 int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);
icraggs 2:dcfdd2abfe71 90
icraggs 9:01b8cc7d94cc 91 template<class T>
icraggs 9:01b8cc7d94cc 92 int connect(MQTTPacket_connectData* options = 0, T *item = 0, void(T::*method)(Result *) = 0); // alternative to pass in pointer to member function
icraggs 9:01b8cc7d94cc 93
icraggs 9:01b8cc7d94cc 94 int publish(const char* topic, Message* message, resultHandler rh = 0);
sam_grove 0:fe461e4d7afe 95
icraggs 9:01b8cc7d94cc 96 int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0);
icraggs 2:dcfdd2abfe71 97
Ian Craggs 12:cc7f2d62a393 98 int unsubscribe(const char* topicFilter, resultHandler rh = 0);
icraggs 9:01b8cc7d94cc 99
icraggs 9:01b8cc7d94cc 100 int disconnect(int timeout, resultHandler rh = 0);
sam_grove 0:fe461e4d7afe 101
icraggs 8:c46930bd6c82 102 void run(void const *argument);
icraggs 8:c46930bd6c82 103
icraggs 2:dcfdd2abfe71 104 private:
icraggs 2:dcfdd2abfe71 105
icraggs 15:64a57183aa03 106 int cycle(int timeout);
Ian Craggs 12:cc7f2d62a393 107 int keepalive();
icraggs 2:dcfdd2abfe71 108
icraggs 3:dbff6b768d28 109 int decodePacket(int* value, int timeout);
icraggs 4:4ef00243708e 110 int readPacket(int timeout = -1);
icraggs 6:4d312a49200b 111 int sendPacket(int length, int timeout = -1);
icraggs 15:64a57183aa03 112 int deliverMessage(MQTTString* topic, Message* message);
icraggs 3:dbff6b768d28 113
icraggs 4:4ef00243708e 114 Thread* thread;
icraggs 4:4ef00243708e 115 Network* ipstack;
Ian Craggs 12:cc7f2d62a393 116 Timer command_timer, ping_timer;
icraggs 4:4ef00243708e 117
icraggs 15:64a57183aa03 118 char buf[];
icraggs 2:dcfdd2abfe71 119 int buflen;
icraggs 2:dcfdd2abfe71 120
icraggs 4:4ef00243708e 121 char* readbuf;
icraggs 15:64a57183aa03 122 int readbuflen;
icraggs 15:64a57183aa03 123
icraggs 15:64a57183aa03 124 unsigned int keepAliveInterval;
Ian Craggs 12:cc7f2d62a393 125 bool ping_outstanding;
icraggs 4:4ef00243708e 126
icraggs 6:4d312a49200b 127 int command_timeout; // max time to wait for any MQTT command to complete, in seconds
icraggs 9:01b8cc7d94cc 128 PacketId packetid;
icraggs 9:01b8cc7d94cc 129
icraggs 9:01b8cc7d94cc 130 typedef FP<void, Result*> resultHandlerFP;
icraggs 9:01b8cc7d94cc 131 resultHandlerFP connectHandler;
icraggs 9:01b8cc7d94cc 132
icraggs 9:01b8cc7d94cc 133 #define MAX_MESSAGE_HANDLERS 5
icraggs 9:01b8cc7d94cc 134 typedef FP<void, Message*> messageHandlerFP;
icraggs 15:64a57183aa03 135 struct
icraggs 15:64a57183aa03 136 {
icraggs 15:64a57183aa03 137 char* topic;
icraggs 15:64a57183aa03 138 messageHandlerFP fp;
icraggs 15:64a57183aa03 139 } messageHandlers[MAX_MESSAGE_HANDLERS]; // Message handlers are linked to a subscription topic
icraggs 15:64a57183aa03 140
icraggs 15:64a57183aa03 141 // how many concurrent operations should we allow? Each one will require a function pointer
icraggs 15:64a57183aa03 142 struct
icraggs 15:64a57183aa03 143 {
icraggs 15:64a57183aa03 144 unsigned short id;
icraggs 15:64a57183aa03 145 resultHandlerFP fp;
icraggs 15:64a57183aa03 146 MQTTString* topic; // if this is a publish, store topic name in case republishing is required
icraggs 15:64a57183aa03 147 Message* message; // for publish,
icraggs 15:64a57183aa03 148 } *operations; // result handlers are indexed by packet ids
icraggs 15:64a57183aa03 149
Ian Craggs 12:cc7f2d62a393 150 static void threadfn(void* arg);
icraggs 11:db15da110a37 151
sam_grove 0:fe461e4d7afe 152 };
sam_grove 0:fe461e4d7afe 153
icraggs 15:64a57183aa03 154 }
icraggs 15:64a57183aa03 155
icraggs 15:64a57183aa03 156
icraggs 15:64a57183aa03 157 template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::threadfn(void* arg)
icraggs 15:64a57183aa03 158 {
icraggs 15:64a57183aa03 159 ((Client<Network, Timer, Thread>*) arg)->run(NULL);
icraggs 11:db15da110a37 160 }
icraggs 11:db15da110a37 161
icraggs 11:db15da110a37 162
Ian Craggs 12:cc7f2d62a393 163 template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, const int MAX_MQTT_PACKET_SIZE, const int command_timeout) : packetid()
icraggs 8:c46930bd6c82 164 {
icraggs 15:64a57183aa03 165 //buf = new char[MAX_MQTT_PACKET_SIZE];
Ian Craggs 12:cc7f2d62a393 166 readbuf = new char[MAX_MQTT_PACKET_SIZE];
Ian Craggs 12:cc7f2d62a393 167 buflen = readbuflen = MAX_MQTT_PACKET_SIZE;
icraggs 15:64a57183aa03 168
icraggs 8:c46930bd6c82 169 this->command_timeout = command_timeout;
icraggs 8:c46930bd6c82 170 this->thread = 0;
icraggs 8:c46930bd6c82 171 this->ipstack = network;
icraggs 15:64a57183aa03 172 this->command_timer = Timer();
icraggs 15:64a57183aa03 173 this->ping_timer = Timer();
Ian Craggs 12:cc7f2d62a393 174 this->ping_outstanding = 0;
icraggs 8:c46930bd6c82 175 }
icraggs 8:c46930bd6c82 176
icraggs 8:c46930bd6c82 177
icraggs 8:c46930bd6c82 178 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout)
icraggs 8:c46930bd6c82 179 {
icraggs 8:c46930bd6c82 180 int sent = 0;
icraggs 8:c46930bd6c82 181
icraggs 8:c46930bd6c82 182 while (sent < length)
icraggs 15:64a57183aa03 183 sent += ipstack->write(&buf[sent], length, -1);
Ian Craggs 12:cc7f2d62a393 184 if (sent == length)
Ian Craggs 12:cc7f2d62a393 185 ping_timer.reset(); // record the fact that we have successfully sent the packet
icraggs 8:c46930bd6c82 186 return sent;
icraggs 8:c46930bd6c82 187 }
icraggs 8:c46930bd6c82 188
icraggs 8:c46930bd6c82 189
icraggs 8:c46930bd6c82 190 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::decodePacket(int* value, int timeout)
icraggs 8:c46930bd6c82 191 {
icraggs 8:c46930bd6c82 192 char c;
icraggs 8:c46930bd6c82 193 int multiplier = 1;
icraggs 8:c46930bd6c82 194 int len = 0;
icraggs 15:64a57183aa03 195 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
icraggs 8:c46930bd6c82 196
icraggs 8:c46930bd6c82 197 *value = 0;
icraggs 8:c46930bd6c82 198 do
icraggs 8:c46930bd6c82 199 {
icraggs 8:c46930bd6c82 200 int rc = MQTTPACKET_READ_ERROR;
icraggs 8:c46930bd6c82 201
icraggs 8:c46930bd6c82 202 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 8:c46930bd6c82 203 {
icraggs 8:c46930bd6c82 204 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 8:c46930bd6c82 205 goto exit;
icraggs 8:c46930bd6c82 206 }
icraggs 8:c46930bd6c82 207 rc = ipstack->read(&c, 1, timeout);
icraggs 8:c46930bd6c82 208 if (rc != 1)
icraggs 8:c46930bd6c82 209 goto exit;
icraggs 8:c46930bd6c82 210 *value += (c & 127) * multiplier;
icraggs 8:c46930bd6c82 211 multiplier *= 128;
icraggs 8:c46930bd6c82 212 } while ((c & 128) != 0);
icraggs 8:c46930bd6c82 213 exit:
icraggs 8:c46930bd6c82 214 return len;
icraggs 8:c46930bd6c82 215 }
icraggs 8:c46930bd6c82 216
icraggs 8:c46930bd6c82 217
icraggs 8:c46930bd6c82 218 /**
icraggs 8:c46930bd6c82 219 * If any read fails in this method, then we should disconnect from the network, as on reconnect
icraggs 8:c46930bd6c82 220 * the packets can be retried.
icraggs 8:c46930bd6c82 221 * @param timeout the max time to wait for the packet read to complete, in milliseconds
icraggs 8:c46930bd6c82 222 * @return the MQTT packet type, or -1 if none
icraggs 8:c46930bd6c82 223 */
icraggs 8:c46930bd6c82 224 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::readPacket(int timeout)
icraggs 8:c46930bd6c82 225 {
icraggs 8:c46930bd6c82 226 int rc = -1;
icraggs 8:c46930bd6c82 227 MQTTHeader header = {0};
icraggs 8:c46930bd6c82 228 int len = 0;
icraggs 8:c46930bd6c82 229 int rem_len = 0;
icraggs 8:c46930bd6c82 230
icraggs 8:c46930bd6c82 231 /* 1. read the header byte. This has the packet type in it */
icraggs 8:c46930bd6c82 232 if (ipstack->read(readbuf, 1, timeout) != 1)
icraggs 8:c46930bd6c82 233 goto exit;
icraggs 8:c46930bd6c82 234
icraggs 8:c46930bd6c82 235 len = 1;
icraggs 8:c46930bd6c82 236 /* 2. read the remaining length. This is variable in itself */
icraggs 8:c46930bd6c82 237 decodePacket(&rem_len, timeout);
icraggs 8:c46930bd6c82 238 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
icraggs 8:c46930bd6c82 239
icraggs 8:c46930bd6c82 240 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 8:c46930bd6c82 241 if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
icraggs 8:c46930bd6c82 242 goto exit;
icraggs 8:c46930bd6c82 243
icraggs 8:c46930bd6c82 244 header.byte = readbuf[0];
icraggs 8:c46930bd6c82 245 rc = header.bits.type;
icraggs 8:c46930bd6c82 246 exit:
icraggs 8:c46930bd6c82 247 return rc;
icraggs 3:dbff6b768d28 248 }
icraggs 3:dbff6b768d28 249
icraggs 8:c46930bd6c82 250
icraggs 15:64a57183aa03 251 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::deliverMessage(MQTTString* topic, Message* message)
icraggs 15:64a57183aa03 252 {
icraggs 15:64a57183aa03 253 }
icraggs 15:64a57183aa03 254
icraggs 15:64a57183aa03 255
Ian Craggs 12:cc7f2d62a393 256 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle(int timeout)
icraggs 8:c46930bd6c82 257 {
icraggs 8:c46930bd6c82 258 /* get one piece of work off the wire and one pass through */
icraggs 8:c46930bd6c82 259
Ian Craggs 12:cc7f2d62a393 260 // read the socket, see what work is due
icraggs 9:01b8cc7d94cc 261 int packet_type = readPacket(timeout);
icraggs 8:c46930bd6c82 262
icraggs 8:c46930bd6c82 263 printf("packet type %d\n", packet_type);
icraggs 15:64a57183aa03 264
Ian Craggs 12:cc7f2d62a393 265 int len, rc;
icraggs 8:c46930bd6c82 266 switch (packet_type)
icraggs 8:c46930bd6c82 267 {
icraggs 15:64a57183aa03 268 case CONNACK:
icraggs 15:64a57183aa03 269 if (this->thread)
icraggs 15:64a57183aa03 270 {
icraggs 15:64a57183aa03 271 Result res = {this, 0};
Ian Craggs 12:cc7f2d62a393 272 if (MQTTDeserialize_connack(&res.connack_rc, readbuf, readbuflen) == 1)
icraggs 15:64a57183aa03 273 ;
icraggs 15:64a57183aa03 274 connectHandler(&res);
icraggs 15:64a57183aa03 275 connectHandler.detach(); // only invoke the callback once
icraggs 15:64a57183aa03 276 }
icraggs 8:c46930bd6c82 277 case PUBACK:
icraggs 8:c46930bd6c82 278 case SUBACK:
icraggs 8:c46930bd6c82 279 break;
icraggs 15:64a57183aa03 280 case PUBLISH:
icraggs 15:64a57183aa03 281 MQTTString topicName;
icraggs 15:64a57183aa03 282 Message msg;
icraggs 15:64a57183aa03 283 rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
icraggs 15:64a57183aa03 284 (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, readbuflen);
icraggs 15:64a57183aa03 285 if (msg.qos == QOS0)
icraggs 15:64a57183aa03 286 deliverMessage(&topicName, &msg);
Ian Craggs 12:cc7f2d62a393 287 break;
icraggs 15:64a57183aa03 288 case PUBREC:
Ian Craggs 12:cc7f2d62a393 289 int type, dup, mypacketid;
Ian Craggs 12:cc7f2d62a393 290 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
icraggs 15:64a57183aa03 291 ;
icraggs 15:64a57183aa03 292 // must lock this access against the application thread, if we are multi-threaded
Ian Craggs 12:cc7f2d62a393 293 len = MQTTSerialize_ack(buf, buflen, PUBREL, 0, mypacketid);
icraggs 15:64a57183aa03 294 rc = sendPacket(len); // send the subscribe packet
icraggs 15:64a57183aa03 295 if (rc != len)
icraggs 15:64a57183aa03 296 goto exit; // there was a problem
Ian Craggs 12:cc7f2d62a393 297
icraggs 8:c46930bd6c82 298 break;
icraggs 8:c46930bd6c82 299 case PUBCOMP:
icraggs 8:c46930bd6c82 300 break;
icraggs 15:64a57183aa03 301 case PINGRESP:
icraggs 15:64a57183aa03 302 ping_outstanding = false;
icraggs 8:c46930bd6c82 303 break;
icraggs 15:64a57183aa03 304 }
icraggs 15:64a57183aa03 305 keepalive();
Ian Craggs 12:cc7f2d62a393 306 exit:
icraggs 8:c46930bd6c82 307 return packet_type;
icraggs 15:64a57183aa03 308 }
icraggs 15:64a57183aa03 309
icraggs 15:64a57183aa03 310
Ian Craggs 12:cc7f2d62a393 311 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::keepalive()
icraggs 15:64a57183aa03 312 {
icraggs 15:64a57183aa03 313 int rc = 0;
icraggs 15:64a57183aa03 314
icraggs 15:64a57183aa03 315 if (keepAliveInterval == 0)
icraggs 15:64a57183aa03 316 goto exit;
icraggs 15:64a57183aa03 317
icraggs 15:64a57183aa03 318 if (ping_timer.read_ms() >= (keepAliveInterval * 1000))
icraggs 15:64a57183aa03 319 {
icraggs 15:64a57183aa03 320 if (ping_outstanding)
icraggs 15:64a57183aa03 321 rc = -1;
icraggs 15:64a57183aa03 322 else
icraggs 15:64a57183aa03 323 {
icraggs 15:64a57183aa03 324 int len = MQTTSerialize_pingreq(buf, buflen);
icraggs 15:64a57183aa03 325 rc = sendPacket(len); // send the connect packet
icraggs 15:64a57183aa03 326 if (rc != len)
icraggs 15:64a57183aa03 327 rc = -1; // indicate there's a problem
icraggs 15:64a57183aa03 328 else
icraggs 15:64a57183aa03 329 ping_outstanding = true;
icraggs 15:64a57183aa03 330 }
icraggs 15:64a57183aa03 331 }
icraggs 15:64a57183aa03 332
icraggs 15:64a57183aa03 333 exit:
Ian Craggs 12:cc7f2d62a393 334 return rc;
icraggs 8:c46930bd6c82 335 }
icraggs 8:c46930bd6c82 336
icraggs 8:c46930bd6c82 337
icraggs 8:c46930bd6c82 338 template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument)
icraggs 15:64a57183aa03 339 {
icraggs 15:64a57183aa03 340 while (true)
Ian Craggs 12:cc7f2d62a393 341 cycle((keepAliveInterval * 1000) - ping_timer.read_ms());
icraggs 8:c46930bd6c82 342 }
icraggs 8:c46930bd6c82 343
icraggs 8:c46930bd6c82 344
icraggs 9:01b8cc7d94cc 345 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, resultHandler resultHandler)
icraggs 15:64a57183aa03 346 {
icraggs 15:64a57183aa03 347 command_timer.start();
icraggs 8:c46930bd6c82 348
Ian Craggs 12:cc7f2d62a393 349 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
icraggs 8:c46930bd6c82 350 if (options == 0)
Ian Craggs 12:cc7f2d62a393 351 options = &default_options; // set default options if none were supplied
icraggs 8:c46930bd6c82 352
icraggs 15:64a57183aa03 353 this->keepAliveInterval = options->keepAliveInterval;
Ian Craggs 12:cc7f2d62a393 354 ping_timer.start();
Ian Craggs 12:cc7f2d62a393 355 int len = MQTTSerialize_connect(buf, buflen, options);
icraggs 15:64a57183aa03 356 int rc = sendPacket(len); // send the connect packet
icraggs 15:64a57183aa03 357 if (rc != len)
Ian Craggs 12:cc7f2d62a393 358 goto exit; // there was a problem
icraggs 8:c46930bd6c82 359
Ian Craggs 12:cc7f2d62a393 360 if (resultHandler == 0) // wait until the connack is received
icraggs 15:64a57183aa03 361 {
icraggs 8:c46930bd6c82 362 // this will be a blocking call, wait for the connack
icraggs 15:64a57183aa03 363 do
icraggs 8:c46930bd6c82 364 {
icraggs 15:64a57183aa03 365 if (command_timer.read_ms() > (command_timeout * 1000))
icraggs 15:64a57183aa03 366 goto exit; // we timed out
icraggs 15:64a57183aa03 367 }
icraggs 15:64a57183aa03 368 while (cycle(command_timeout - command_timer.read_ms()) != CONNACK);
icraggs 15:64a57183aa03 369 int connack_rc = -1;
icraggs 15:64a57183aa03 370 if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
icraggs 15:64a57183aa03 371 rc = connack_rc;
icraggs 8:c46930bd6c82 372 }
icraggs 8:c46930bd6c82 373 else
icraggs 8:c46930bd6c82 374 {
icraggs 8:c46930bd6c82 375 // set connect response callback function
icraggs 9:01b8cc7d94cc 376 connectHandler.attach(resultHandler);
icraggs 8:c46930bd6c82 377
Ian Craggs 12:cc7f2d62a393 378 // start background thread
icraggs 11:db15da110a37 379 this->thread = new Thread((void (*)(void const *argument))&MQTT::Client<Network, Timer, Thread>::threadfn, (void*)this);
icraggs 8:c46930bd6c82 380 }
icraggs 15:64a57183aa03 381
icraggs 15:64a57183aa03 382 exit:
icraggs 15:64a57183aa03 383 command_timer.stop();
Ian Craggs 12:cc7f2d62a393 384 command_timer.reset();
icraggs 8:c46930bd6c82 385 return rc;
icraggs 8:c46930bd6c82 386 }
icraggs 8:c46930bd6c82 387
icraggs 8:c46930bd6c82 388
Ian Craggs 12:cc7f2d62a393 389 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, messageHandler messageHandler, resultHandler resultHandler)
icraggs 15:64a57183aa03 390 {
icraggs 15:64a57183aa03 391 command_timer.start();
Ian Craggs 12:cc7f2d62a393 392
icraggs 8:c46930bd6c82 393 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 8:c46930bd6c82 394
Ian Craggs 12:cc7f2d62a393 395 int len = MQTTSerialize_subscribe(buf, buflen, 0, packetid.getNext(), 1, &topic, (int*)&qos);
icraggs 15:64a57183aa03 396 int rc = sendPacket(len); // send the subscribe packet
icraggs 15:64a57183aa03 397 if (rc != len)
Ian Craggs 12:cc7f2d62a393 398 goto exit; // there was a problem
icraggs 8:c46930bd6c82 399
icraggs 8:c46930bd6c82 400 /* wait for suback */
icraggs 8:c46930bd6c82 401 if (resultHandler == 0)
icraggs 8:c46930bd6c82 402 {
icraggs 8:c46930bd6c82 403 // this will block
Ian Craggs 12:cc7f2d62a393 404 if (cycle(command_timeout - command_timer.read_ms()) == SUBACK)
icraggs 8:c46930bd6c82 405 {
icraggs 9:01b8cc7d94cc 406 int count = 0, grantedQoS = -1, mypacketid;
icraggs 9:01b8cc7d94cc 407 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
icraggs 8:c46930bd6c82 408 rc = grantedQoS; // 0, 1, 2 or 0x80
icraggs 8:c46930bd6c82 409 }
icraggs 8:c46930bd6c82 410 }
icraggs 8:c46930bd6c82 411 else
icraggs 8:c46930bd6c82 412 {
icraggs 8:c46930bd6c82 413 // set subscribe response callback function
icraggs 8:c46930bd6c82 414
icraggs 8:c46930bd6c82 415 }
icraggs 15:64a57183aa03 416
icraggs 15:64a57183aa03 417 exit:
icraggs 15:64a57183aa03 418 command_timer.stop();
Ian Craggs 12:cc7f2d62a393 419 command_timer.reset();
Ian Craggs 12:cc7f2d62a393 420 return rc;
icraggs 15:64a57183aa03 421 }
icraggs 15:64a57183aa03 422
icraggs 15:64a57183aa03 423
Ian Craggs 12:cc7f2d62a393 424 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::unsubscribe(const char* topicFilter, resultHandler resultHandler)
icraggs 15:64a57183aa03 425 {
icraggs 15:64a57183aa03 426 command_timer.start();
Ian Craggs 12:cc7f2d62a393 427
Ian Craggs 12:cc7f2d62a393 428 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 8:c46930bd6c82 429
Ian Craggs 12:cc7f2d62a393 430 int len = MQTTSerialize_unsubscribe(buf, buflen, 0, packetid.getNext(), 1, &topic);
icraggs 15:64a57183aa03 431 int rc = sendPacket(len); // send the subscribe packet
icraggs 15:64a57183aa03 432 if (rc != len)
Ian Craggs 12:cc7f2d62a393 433 goto exit; // there was a problem
Ian Craggs 12:cc7f2d62a393 434
Ian Craggs 12:cc7f2d62a393 435 /* wait for suback */
Ian Craggs 12:cc7f2d62a393 436 if (resultHandler == 0)
Ian Craggs 12:cc7f2d62a393 437 {
Ian Craggs 12:cc7f2d62a393 438 // this will block
Ian Craggs 12:cc7f2d62a393 439 if (cycle(command_timeout - command_timer.read_ms()) == UNSUBACK)
Ian Craggs 12:cc7f2d62a393 440 {
Ian Craggs 12:cc7f2d62a393 441 int mypacketid;
Ian Craggs 12:cc7f2d62a393 442 if (MQTTDeserialize_unsuback(&mypacketid, readbuf, readbuflen) == 1)
Ian Craggs 12:cc7f2d62a393 443 rc = 0;
Ian Craggs 12:cc7f2d62a393 444 }
Ian Craggs 12:cc7f2d62a393 445 }
Ian Craggs 12:cc7f2d62a393 446 else
Ian Craggs 12:cc7f2d62a393 447 {
Ian Craggs 12:cc7f2d62a393 448 // set unsubscribe response callback function
Ian Craggs 12:cc7f2d62a393 449
Ian Craggs 12:cc7f2d62a393 450 }
icraggs 15:64a57183aa03 451
icraggs 15:64a57183aa03 452 exit:
icraggs 15:64a57183aa03 453 command_timer.stop();
Ian Craggs 12:cc7f2d62a393 454 command_timer.reset();
Ian Craggs 12:cc7f2d62a393 455 return rc;
icraggs 15:64a57183aa03 456 }
icraggs 15:64a57183aa03 457
icraggs 15:64a57183aa03 458
icraggs 15:64a57183aa03 459
Ian Craggs 12:cc7f2d62a393 460 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::publish(const char* topicName, Message* message, resultHandler resultHandler)
icraggs 15:64a57183aa03 461 {
icraggs 15:64a57183aa03 462 command_timer.start();
Ian Craggs 12:cc7f2d62a393 463
icraggs 15:64a57183aa03 464 MQTTString topic = {(char*)topicName, 0, 0};
icraggs 15:64a57183aa03 465
Ian Craggs 12:cc7f2d62a393 466 message->id = packetid.getNext();
icraggs 15:64a57183aa03 467
Ian Craggs 12:cc7f2d62a393 468 int len = MQTTSerialize_publish(buf, buflen, 0, message->qos, message->retained, message->id, topic, message->payload, message->payloadlen);
icraggs 15:64a57183aa03 469 int rc = sendPacket(len); // send the subscribe packet
icraggs 15:64a57183aa03 470 if (rc != len)
Ian Craggs 12:cc7f2d62a393 471 goto exit; // there was a problem
Ian Craggs 12:cc7f2d62a393 472
Ian Craggs 12:cc7f2d62a393 473 /* wait for acks */
Ian Craggs 12:cc7f2d62a393 474 if (resultHandler == 0)
Ian Craggs 12:cc7f2d62a393 475 {
icraggs 15:64a57183aa03 476 if (message->qos == QOS1)
Ian Craggs 12:cc7f2d62a393 477 {
Ian Craggs 12:cc7f2d62a393 478 if (cycle(command_timeout - command_timer.read_ms()) == PUBACK)
Ian Craggs 12:cc7f2d62a393 479 {
Ian Craggs 12:cc7f2d62a393 480 int type, dup, mypacketid;
Ian Craggs 12:cc7f2d62a393 481 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
Ian Craggs 12:cc7f2d62a393 482 rc = 0;
icraggs 15:64a57183aa03 483 }
icraggs 15:64a57183aa03 484 }
icraggs 15:64a57183aa03 485 else if (message->qos == QOS2)
icraggs 15:64a57183aa03 486 {
icraggs 15:64a57183aa03 487 if (cycle(command_timeout - command_timer.read_ms()) == PUBCOMP)
icraggs 15:64a57183aa03 488 {
icraggs 15:64a57183aa03 489 int type, dup, mypacketid;
icraggs 15:64a57183aa03 490 if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, readbuflen) == 1)
icraggs 15:64a57183aa03 491 rc = 0;
icraggs 15:64a57183aa03 492 }
icraggs 15:64a57183aa03 493
Ian Craggs 12:cc7f2d62a393 494 }
Ian Craggs 12:cc7f2d62a393 495 }
Ian Craggs 12:cc7f2d62a393 496 else
Ian Craggs 12:cc7f2d62a393 497 {
Ian Craggs 12:cc7f2d62a393 498 // set publish response callback function
Ian Craggs 12:cc7f2d62a393 499
Ian Craggs 12:cc7f2d62a393 500 }
icraggs 15:64a57183aa03 501
icraggs 15:64a57183aa03 502 exit:
icraggs 15:64a57183aa03 503 command_timer.stop();
Ian Craggs 12:cc7f2d62a393 504 command_timer.reset();
icraggs 8:c46930bd6c82 505 return rc;
icraggs 8:c46930bd6c82 506 }
icraggs 8:c46930bd6c82 507
icraggs 8:c46930bd6c82 508
sam_grove 0:fe461e4d7afe 509 #endif