Better timing

Dependencies:   FP MQTTPacket

Fork of MQTT by MQTT

Committer:
icraggs
Date:
Wed Apr 09 13:48:20 2014 +0000
Revision:
8:c46930bd6c82
Parent:
6:4d312a49200b
Child:
9:01b8cc7d94cc
Threading updates

Who changed what in which revision?

UserRevisionLine numberNew contents of line
icraggs 6:4d312a49200b 1 /*******************************************************************************
icraggs 6:4d312a49200b 2 * Copyright (c) 2014 IBM Corp.
sam_grove 0:fe461e4d7afe 3 *
icraggs 6:4d312a49200b 4 * All rights reserved. This program and the accompanying materials
icraggs 6:4d312a49200b 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 6:4d312a49200b 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
sam_grove 0:fe461e4d7afe 7 *
icraggs 6:4d312a49200b 8 * The Eclipse Public License is available at
icraggs 6:4d312a49200b 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 6:4d312a49200b 10 * and the Eclipse Distribution License is available at
icraggs 6:4d312a49200b 11 * http://www.eclipse.org/org/documents/edl-v10.php.
sam_grove 0:fe461e4d7afe 12 *
icraggs 6:4d312a49200b 13 * Contributors:
icraggs 6:4d312a49200b 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 6:4d312a49200b 15 *******************************************************************************/
sam_grove 0:fe461e4d7afe 16
icraggs 2:dcfdd2abfe71 17 #if !defined(MQTTCLIENT_H)
icraggs 2:dcfdd2abfe71 18 #define MQTTCLIENT_H
icraggs 2:dcfdd2abfe71 19
icraggs 2:dcfdd2abfe71 20 #include "FP.h"
icraggs 3:dbff6b768d28 21 #include "MQTTPacket.h"
icraggs 6:4d312a49200b 22 #include "stdio.h"
icraggs 2:dcfdd2abfe71 23
icraggs 3:dbff6b768d28 24 namespace MQTT
icraggs 3:dbff6b768d28 25 {
icraggs 6:4d312a49200b 26
icraggs 6:4d312a49200b 27 const int MAX_PACKET_ID = 65535;
icraggs 3:dbff6b768d28 28
icraggs 2:dcfdd2abfe71 29
icraggs 2:dcfdd2abfe71 30 enum QoS { QOS0, QOS1, QOS2 };
sam_grove 0:fe461e4d7afe 31
icraggs 2:dcfdd2abfe71 32
icraggs 3:dbff6b768d28 33 struct Message
icraggs 2:dcfdd2abfe71 34 {
icraggs 2:dcfdd2abfe71 35 enum QoS qos;
icraggs 2:dcfdd2abfe71 36 bool retained;
icraggs 2:dcfdd2abfe71 37 bool dup;
icraggs 2:dcfdd2abfe71 38 unsigned short msgid;
icraggs 2:dcfdd2abfe71 39 void *payload;
icraggs 2:dcfdd2abfe71 40 size_t payloadlen;
sam_grove 0:fe461e4d7afe 41 };
sam_grove 0:fe461e4d7afe 42
icraggs 6:4d312a49200b 43 template<class Network, class Timer, class Thread> class Client;
icraggs 4:4ef00243708e 44
icraggs 4:4ef00243708e 45 class Result
icraggs 4:4ef00243708e 46 {
icraggs 4:4ef00243708e 47 /* success or failure result data */
icraggs 6:4d312a49200b 48 Client<class Network, class Timer, class Thread>* client;
icraggs 4:4ef00243708e 49 };
icraggs 4:4ef00243708e 50
icraggs 2:dcfdd2abfe71 51
icraggs 6:4d312a49200b 52 template<class Network, class Timer, class Thread> class Client
icraggs 2:dcfdd2abfe71 53 {
icraggs 2:dcfdd2abfe71 54
sam_grove 0:fe461e4d7afe 55 public:
icraggs 4:4ef00243708e 56
icraggs 6:4d312a49200b 57 Client(Network* network, Timer* timer, const int buffer_size = 100, const int command_timeout = 30);
icraggs 2:dcfdd2abfe71 58
icraggs 4:4ef00243708e 59 int connect(MQTTPacket_connectData* options = 0, FP<void, Result*> *resultHandler = 0);
icraggs 2:dcfdd2abfe71 60
icraggs 6:4d312a49200b 61 int publish(const char* topic, Message* message, FP<void, Result*> *resultHandler = 0);
icraggs 2:dcfdd2abfe71 62
icraggs 6:4d312a49200b 63 int subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler, FP<void, Result*> *resultHandler = 0);
sam_grove 0:fe461e4d7afe 64
icraggs 4:4ef00243708e 65 int unsubscribe(char* topicFilter, FP<void, Result*> *resultHandler = 0);
icraggs 2:dcfdd2abfe71 66
icraggs 4:4ef00243708e 67 int disconnect(int timeout, FP<void, Result*> *resultHandler = 0);
sam_grove 0:fe461e4d7afe 68
icraggs 8:c46930bd6c82 69 void run(void const *argument);
icraggs 8:c46930bd6c82 70
icraggs 2:dcfdd2abfe71 71 private:
icraggs 2:dcfdd2abfe71 72
icraggs 6:4d312a49200b 73 int getPacketId();
icraggs 4:4ef00243708e 74 int cycle();
icraggs 2:dcfdd2abfe71 75
icraggs 3:dbff6b768d28 76 int decodePacket(int* value, int timeout);
icraggs 4:4ef00243708e 77 int readPacket(int timeout = -1);
icraggs 6:4d312a49200b 78 int sendPacket(int length, int timeout = -1);
icraggs 3:dbff6b768d28 79
icraggs 4:4ef00243708e 80 Thread* thread;
icraggs 4:4ef00243708e 81 Network* ipstack;
icraggs 6:4d312a49200b 82 Timer* timer;
icraggs 4:4ef00243708e 83
icraggs 2:dcfdd2abfe71 84 char* buf;
icraggs 2:dcfdd2abfe71 85 int buflen;
icraggs 2:dcfdd2abfe71 86
icraggs 4:4ef00243708e 87 char* readbuf;
icraggs 4:4ef00243708e 88 int readbuflen;
icraggs 4:4ef00243708e 89
icraggs 6:4d312a49200b 90 int command_timeout; // max time to wait for any MQTT command to complete, in seconds
icraggs 6:4d312a49200b 91 int keepalive;
icraggs 6:4d312a49200b 92 int packetid;
icraggs 4:4ef00243708e 93
sam_grove 0:fe461e4d7afe 94 };
sam_grove 0:fe461e4d7afe 95
icraggs 8:c46930bd6c82 96 void threadfn(void* arg);
icraggs 8:c46930bd6c82 97
icraggs 8:c46930bd6c82 98 }
icraggs 8:c46930bd6c82 99
icraggs 8:c46930bd6c82 100 template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout)
icraggs 8:c46930bd6c82 101 {
icraggs 8:c46930bd6c82 102
icraggs 8:c46930bd6c82 103 buf = new char[buffer_size];
icraggs 8:c46930bd6c82 104 readbuf = new char[buffer_size];
icraggs 8:c46930bd6c82 105 buflen = readbuflen = buffer_size;
icraggs 8:c46930bd6c82 106 this->command_timeout = command_timeout;
icraggs 8:c46930bd6c82 107 this->thread = 0;
icraggs 8:c46930bd6c82 108 this->ipstack = network;
icraggs 8:c46930bd6c82 109 this->packetid = 0;
icraggs 8:c46930bd6c82 110 this->timer = timer;
icraggs 8:c46930bd6c82 111 }
icraggs 8:c46930bd6c82 112
icraggs 8:c46930bd6c82 113
icraggs 8:c46930bd6c82 114 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::getPacketId()
icraggs 8:c46930bd6c82 115 {
icraggs 8:c46930bd6c82 116 return this->packetid = (this->packetid == MAX_PACKET_ID) ? 1 : ++this->packetid;
icraggs 8:c46930bd6c82 117 }
icraggs 8:c46930bd6c82 118
icraggs 8:c46930bd6c82 119
icraggs 8:c46930bd6c82 120 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout)
icraggs 8:c46930bd6c82 121 {
icraggs 8:c46930bd6c82 122 int sent = 0;
icraggs 8:c46930bd6c82 123
icraggs 8:c46930bd6c82 124 while (sent < length)
icraggs 8:c46930bd6c82 125 sent += ipstack->write(&buf[sent], length, -1);
icraggs 8:c46930bd6c82 126
icraggs 8:c46930bd6c82 127 return sent;
icraggs 8:c46930bd6c82 128 }
icraggs 8:c46930bd6c82 129
icraggs 8:c46930bd6c82 130
icraggs 8:c46930bd6c82 131 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::decodePacket(int* value, int timeout)
icraggs 8:c46930bd6c82 132 {
icraggs 8:c46930bd6c82 133 char c;
icraggs 8:c46930bd6c82 134 int multiplier = 1;
icraggs 8:c46930bd6c82 135 int len = 0;
icraggs 8:c46930bd6c82 136 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
icraggs 8:c46930bd6c82 137
icraggs 8:c46930bd6c82 138 *value = 0;
icraggs 8:c46930bd6c82 139 do
icraggs 8:c46930bd6c82 140 {
icraggs 8:c46930bd6c82 141 int rc = MQTTPACKET_READ_ERROR;
icraggs 8:c46930bd6c82 142
icraggs 8:c46930bd6c82 143 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 8:c46930bd6c82 144 {
icraggs 8:c46930bd6c82 145 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 8:c46930bd6c82 146 goto exit;
icraggs 8:c46930bd6c82 147 }
icraggs 8:c46930bd6c82 148 rc = ipstack->read(&c, 1, timeout);
icraggs 8:c46930bd6c82 149 if (rc != 1)
icraggs 8:c46930bd6c82 150 goto exit;
icraggs 8:c46930bd6c82 151 *value += (c & 127) * multiplier;
icraggs 8:c46930bd6c82 152 multiplier *= 128;
icraggs 8:c46930bd6c82 153 } while ((c & 128) != 0);
icraggs 8:c46930bd6c82 154 exit:
icraggs 8:c46930bd6c82 155 return len;
icraggs 8:c46930bd6c82 156 }
icraggs 8:c46930bd6c82 157
icraggs 8:c46930bd6c82 158
icraggs 8:c46930bd6c82 159 /**
icraggs 8:c46930bd6c82 160 * If any read fails in this method, then we should disconnect from the network, as on reconnect
icraggs 8:c46930bd6c82 161 * the packets can be retried.
icraggs 8:c46930bd6c82 162 * @param timeout the max time to wait for the packet read to complete, in milliseconds
icraggs 8:c46930bd6c82 163 * @return the MQTT packet type, or -1 if none
icraggs 8:c46930bd6c82 164 */
icraggs 8:c46930bd6c82 165 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::readPacket(int timeout)
icraggs 8:c46930bd6c82 166 {
icraggs 8:c46930bd6c82 167 int rc = -1;
icraggs 8:c46930bd6c82 168 MQTTHeader header = {0};
icraggs 8:c46930bd6c82 169 int len = 0;
icraggs 8:c46930bd6c82 170 int rem_len = 0;
icraggs 8:c46930bd6c82 171
icraggs 8:c46930bd6c82 172 /* 1. read the header byte. This has the packet type in it */
icraggs 8:c46930bd6c82 173 if (ipstack->read(readbuf, 1, timeout) != 1)
icraggs 8:c46930bd6c82 174 goto exit;
icraggs 8:c46930bd6c82 175
icraggs 8:c46930bd6c82 176 len = 1;
icraggs 8:c46930bd6c82 177 /* 2. read the remaining length. This is variable in itself */
icraggs 8:c46930bd6c82 178 decodePacket(&rem_len, timeout);
icraggs 8:c46930bd6c82 179 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
icraggs 8:c46930bd6c82 180
icraggs 8:c46930bd6c82 181 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 8:c46930bd6c82 182 if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
icraggs 8:c46930bd6c82 183 goto exit;
icraggs 8:c46930bd6c82 184
icraggs 8:c46930bd6c82 185 header.byte = readbuf[0];
icraggs 8:c46930bd6c82 186 rc = header.bits.type;
icraggs 8:c46930bd6c82 187 exit:
icraggs 8:c46930bd6c82 188 return rc;
icraggs 3:dbff6b768d28 189 }
icraggs 3:dbff6b768d28 190
icraggs 8:c46930bd6c82 191
icraggs 8:c46930bd6c82 192 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle()
icraggs 8:c46930bd6c82 193 {
icraggs 8:c46930bd6c82 194 int timeout = 1000L;
icraggs 8:c46930bd6c82 195 /* get one piece of work off the wire and one pass through */
icraggs 8:c46930bd6c82 196
icraggs 8:c46930bd6c82 197 // 1. read the socket, see what work is due.
icraggs 8:c46930bd6c82 198 int packet_type = readPacket(-1);
icraggs 8:c46930bd6c82 199
icraggs 8:c46930bd6c82 200 printf("packet type %d\n", packet_type);
icraggs 8:c46930bd6c82 201
icraggs 8:c46930bd6c82 202 switch (packet_type)
icraggs 8:c46930bd6c82 203 {
icraggs 8:c46930bd6c82 204 case CONNACK:
icraggs 8:c46930bd6c82 205 printf("connack received\n");
icraggs 8:c46930bd6c82 206 break;
icraggs 8:c46930bd6c82 207 case PUBLISH:
icraggs 8:c46930bd6c82 208 break;
icraggs 8:c46930bd6c82 209 case PUBACK:
icraggs 8:c46930bd6c82 210 break;
icraggs 8:c46930bd6c82 211 case SUBACK:
icraggs 8:c46930bd6c82 212 break;
icraggs 8:c46930bd6c82 213 case PUBREC:
icraggs 8:c46930bd6c82 214 break;
icraggs 8:c46930bd6c82 215 case PUBCOMP:
icraggs 8:c46930bd6c82 216 break;
icraggs 8:c46930bd6c82 217 case PINGRESP:
icraggs 8:c46930bd6c82 218 break;
icraggs 8:c46930bd6c82 219 case -1:
icraggs 8:c46930bd6c82 220 break;
icraggs 8:c46930bd6c82 221 }
icraggs 8:c46930bd6c82 222 return packet_type;
icraggs 8:c46930bd6c82 223 }
icraggs 8:c46930bd6c82 224
icraggs 8:c46930bd6c82 225
icraggs 8:c46930bd6c82 226 template<class Network, class Timer, class Thread> void MQTT::Client<Network, Timer, Thread>::run(void const *argument)
icraggs 8:c46930bd6c82 227 {
icraggs 8:c46930bd6c82 228 }
icraggs 8:c46930bd6c82 229
icraggs 8:c46930bd6c82 230
icraggs 8:c46930bd6c82 231 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler)
icraggs 8:c46930bd6c82 232 {
icraggs 8:c46930bd6c82 233 int len = 0;
icraggs 8:c46930bd6c82 234 int rc = -99;
icraggs 8:c46930bd6c82 235 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
icraggs 8:c46930bd6c82 236
icraggs 8:c46930bd6c82 237 /* 2. if the connect was successful, send the MQTT connect packet */
icraggs 8:c46930bd6c82 238 if (options == 0)
icraggs 8:c46930bd6c82 239 {
icraggs 8:c46930bd6c82 240 default_options.clientID.cstring = "me";
icraggs 8:c46930bd6c82 241 options = &default_options;
icraggs 8:c46930bd6c82 242 }
icraggs 8:c46930bd6c82 243
icraggs 8:c46930bd6c82 244 this->keepalive = options->keepAliveInterval;
icraggs 8:c46930bd6c82 245 len = MQTTSerialize_connect(buf, buflen, options);
icraggs 8:c46930bd6c82 246 printf("len from send is %d %d\n", len, buflen);
icraggs 8:c46930bd6c82 247 rc = sendPacket(len); // send the connect packet
icraggs 8:c46930bd6c82 248 printf("rc from send is %d\n", rc);
icraggs 8:c46930bd6c82 249
icraggs 8:c46930bd6c82 250 /* 3. wait until the connack is received */
icraggs 8:c46930bd6c82 251 if (resultHandler == 0)
icraggs 8:c46930bd6c82 252 {
icraggs 8:c46930bd6c82 253 // this will be a blocking call, wait for the connack
icraggs 8:c46930bd6c82 254 if (cycle() == CONNACK)
icraggs 8:c46930bd6c82 255 {
icraggs 8:c46930bd6c82 256 int connack_rc = -1;
icraggs 8:c46930bd6c82 257 if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
icraggs 8:c46930bd6c82 258 rc = connack_rc;
icraggs 8:c46930bd6c82 259 }
icraggs 8:c46930bd6c82 260 }
icraggs 8:c46930bd6c82 261 else
icraggs 8:c46930bd6c82 262 {
icraggs 8:c46930bd6c82 263 // set connect response callback function
icraggs 8:c46930bd6c82 264
icraggs 8:c46930bd6c82 265 // start background thread
icraggs 8:c46930bd6c82 266
icraggs 8:c46930bd6c82 267 this->thread = new Thread((void (*)(void const *argument))&MQTT::threadfn, (void*)this);
icraggs 8:c46930bd6c82 268 }
icraggs 8:c46930bd6c82 269
icraggs 8:c46930bd6c82 270 return rc;
icraggs 8:c46930bd6c82 271 }
icraggs 8:c46930bd6c82 272
icraggs 8:c46930bd6c82 273
icraggs 8:c46930bd6c82 274 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler,
icraggs 8:c46930bd6c82 275 FP<void, Result*> *resultHandler)
icraggs 8:c46930bd6c82 276 {
icraggs 8:c46930bd6c82 277 int rc = -1,
icraggs 8:c46930bd6c82 278 len = 0;
icraggs 8:c46930bd6c82 279 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 8:c46930bd6c82 280
icraggs 8:c46930bd6c82 281 len = MQTTSerialize_subscribe(buf, buflen, 0, getPacketId(), 1, &topic, (int*)&qos);
icraggs 8:c46930bd6c82 282 rc = sendPacket(len); // send the subscribe packet
icraggs 8:c46930bd6c82 283
icraggs 8:c46930bd6c82 284 /* wait for suback */
icraggs 8:c46930bd6c82 285 if (resultHandler == 0)
icraggs 8:c46930bd6c82 286 {
icraggs 8:c46930bd6c82 287 // this will block
icraggs 8:c46930bd6c82 288 if (cycle() == SUBACK)
icraggs 8:c46930bd6c82 289 {
icraggs 8:c46930bd6c82 290 int count = 0, grantedQoS = -1;
icraggs 8:c46930bd6c82 291 if (MQTTDeserialize_suback(&packetid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
icraggs 8:c46930bd6c82 292 rc = grantedQoS; // 0, 1, 2 or 0x80
icraggs 8:c46930bd6c82 293 }
icraggs 8:c46930bd6c82 294 }
icraggs 8:c46930bd6c82 295 else
icraggs 8:c46930bd6c82 296 {
icraggs 8:c46930bd6c82 297 // set subscribe response callback function
icraggs 8:c46930bd6c82 298
icraggs 8:c46930bd6c82 299 }
icraggs 8:c46930bd6c82 300
icraggs 8:c46930bd6c82 301 return rc;
icraggs 8:c46930bd6c82 302 }
icraggs 8:c46930bd6c82 303
icraggs 8:c46930bd6c82 304
sam_grove 0:fe461e4d7afe 305 #endif