An API for using MQTT over multiple transports for mbed OS 5

Dependencies:   FP MQTTPacket

Fork of MQTT by MQTT

Committer:
icraggs
Date:
Tue Apr 08 22:54:37 2014 +0000
Revision:
6:4d312a49200b
Parent:
5:389ccac5a50c
Child:
7:f9d690fb6dad
Subscribing

Who changed what in which revision?

User | Revision | Line number | New contents of line
icraggs 6:4d312a49200b 1 /*******************************************************************************
icraggs 6:4d312a49200b 2 * Copyright (c) 2014 IBM Corp.
sam_grove 0:fe461e4d7afe 3 *
icraggs 6:4d312a49200b 4 * All rights reserved. This program and the accompanying materials
icraggs 6:4d312a49200b 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 6:4d312a49200b 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
sam_grove 0:fe461e4d7afe 7 *
icraggs 6:4d312a49200b 8 * The Eclipse Public License is available at
icraggs 6:4d312a49200b 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 6:4d312a49200b 10 * and the Eclipse Distribution License is available at
icraggs 6:4d312a49200b 11 * http://www.eclipse.org/org/documents/edl-v10.php.
sam_grove 0:fe461e4d7afe 12 *
icraggs 6:4d312a49200b 13 * Contributors:
icraggs 6:4d312a49200b 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 6:4d312a49200b 15 *******************************************************************************/
sam_grove 0:fe461e4d7afe 16
icraggs 2:dcfdd2abfe71 17 #include "MQTTClient.h"
icraggs 2:dcfdd2abfe71 18 #include "MQTTPacket.h"
icraggs 2:dcfdd2abfe71 19
icraggs 6:4d312a49200b 20 template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout)
icraggs 2:dcfdd2abfe71 21 {
icraggs 2:dcfdd2abfe71 22
icraggs 6:4d312a49200b 23 buf = new char[buffer_size];
Ian Craggs 5:389ccac5a50c 24 readbuf = new char[buffer_size];
icraggs 3:dbff6b768d28 25 this->ipstack = ipstack;
icraggs 4:4ef00243708e 26 this->command_timeout = command_timeout;
Ian Craggs 5:389ccac5a50c 27 //this->thread = new Thread(0); // only need a background thread for non-blocking mode
icraggs 4:4ef00243708e 28 this->ipstack = network;
icraggs 6:4d312a49200b 29 this->packetid = 0;
icraggs 6:4d312a49200b 30 this->timer = timer;
icraggs 3:dbff6b768d28 31 }
icraggs 3:dbff6b768d28 32
icraggs 3:dbff6b768d28 33
icraggs 6:4d312a49200b 34 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::getPacketId()
icraggs 6:4d312a49200b 35 {
icraggs 6:4d312a49200b 36 return this->packetid = (this->packetid == MAX_PACKET_ID) ? 1 : ++this->packetid;
icraggs 6:4d312a49200b 37 }
icraggs 6:4d312a49200b 38
icraggs 6:4d312a49200b 39
icraggs 6:4d312a49200b 40 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout)
icraggs 3:dbff6b768d28 41 {
icraggs 3:dbff6b768d28 42 int sent = 0;
icraggs 3:dbff6b768d28 43
icraggs 3:dbff6b768d28 44 while (sent < length)
icraggs 4:4ef00243708e 45 sent += ipstack->write(&buf[sent], length, -1);
icraggs 3:dbff6b768d28 46
icraggs 3:dbff6b768d28 47 return sent;
icraggs 3:dbff6b768d28 48 }
icraggs 3:dbff6b768d28 49
icraggs 3:dbff6b768d28 50
icraggs 6:4d312a49200b 51 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::decodePacket(int* value, int timeout)
icraggs 3:dbff6b768d28 52 {
icraggs 3:dbff6b768d28 53 char c;
icraggs 3:dbff6b768d28 54 int multiplier = 1;
icraggs 3:dbff6b768d28 55 int len = 0;
icraggs 3:dbff6b768d28 56 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
icraggs 3:dbff6b768d28 57
icraggs 3:dbff6b768d28 58 *value = 0;
icraggs 3:dbff6b768d28 59 do
icraggs 3:dbff6b768d28 60 {
icraggs 3:dbff6b768d28 61 int rc = MQTTPACKET_READ_ERROR;
icraggs 3:dbff6b768d28 62
icraggs 3:dbff6b768d28 63 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 3:dbff6b768d28 64 {
icraggs 3:dbff6b768d28 65 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 3:dbff6b768d28 66 goto exit;
icraggs 3:dbff6b768d28 67 }
icraggs 3:dbff6b768d28 68 rc = ipstack->read(&c, 1, timeout);
icraggs 3:dbff6b768d28 69 if (rc != 1)
icraggs 3:dbff6b768d28 70 goto exit;
icraggs 3:dbff6b768d28 71 *value += (c & 127) * multiplier;
icraggs 3:dbff6b768d28 72 multiplier *= 128;
icraggs 3:dbff6b768d28 73 } while ((c & 128) != 0);
icraggs 3:dbff6b768d28 74 exit:
icraggs 3:dbff6b768d28 75 return len;
icraggs 2:dcfdd2abfe71 76 }
icraggs 2:dcfdd2abfe71 77
sam_grove 0:fe461e4d7afe 78
icraggs 6:4d312a49200b 79 /**
icraggs 6:4d312a49200b 80 * If any read fails in this method, then we should disconnect from the network, as on reconnect
icraggs 6:4d312a49200b 81 * the packets can be retried.
icraggs 6:4d312a49200b 82 * @param timeout the max time to wait for the packet read to complete, in milliseconds
icraggs 6:4d312a49200b 83 * @return the MQTT packet type, or -1 if none
icraggs 6:4d312a49200b 84 */
icraggs 6:4d312a49200b 85 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::readPacket(int timeout)
icraggs 2:dcfdd2abfe71 86 {
icraggs 3:dbff6b768d28 87 int rc = -1;
icraggs 4:4ef00243708e 88 MQTTHeader header = {0};
icraggs 3:dbff6b768d28 89 int len = 0;
icraggs 3:dbff6b768d28 90 int rem_len = 0;
icraggs 3:dbff6b768d28 91
icraggs 3:dbff6b768d28 92 /* 1. read the header byte. This has the packet type in it */
icraggs 6:4d312a49200b 93 if (ipstack->read(readbuf, 1, timeout) != 1)
icraggs 3:dbff6b768d28 94 goto exit;
icraggs 3:dbff6b768d28 95
icraggs 3:dbff6b768d28 96 len = 1;
icraggs 3:dbff6b768d28 97 /* 2. read the remaining length. This is variable in itself */
icraggs 3:dbff6b768d28 98 decodePacket(&rem_len, timeout);
icraggs 3:dbff6b768d28 99 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
icraggs 3:dbff6b768d28 100
icraggs 3:dbff6b768d28 101 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 6:4d312a49200b 102 if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
icraggs 3:dbff6b768d28 103 goto exit;
icraggs 3:dbff6b768d28 104
Ian Craggs 5:389ccac5a50c 105 header.byte = readbuf[0];
icraggs 3:dbff6b768d28 106 rc = header.bits.type;
icraggs 3:dbff6b768d28 107 exit:
icraggs 3:dbff6b768d28 108 return rc;
icraggs 3:dbff6b768d28 109 }
icraggs 3:dbff6b768d28 110
icraggs 3:dbff6b768d28 111
icraggs 6:4d312a49200b 112 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle()
icraggs 3:dbff6b768d28 113 {
icraggs 3:dbff6b768d28 114 int timeout = 1000L;
icraggs 3:dbff6b768d28 115 /* get one piece of work off the wire and one pass through */
icraggs 2:dcfdd2abfe71 116
icraggs 3:dbff6b768d28 117 // 1. read the socket, see what work is due.
icraggs 6:4d312a49200b 118 int packet_type = readPacket(-1);
icraggs 6:4d312a49200b 119
Ian Craggs 5:389ccac5a50c 120 printf("packet type %d\n", packet_type);
icraggs 2:dcfdd2abfe71 121
icraggs 4:4ef00243708e 122 switch (packet_type)
icraggs 4:4ef00243708e 123 {
icraggs 6:4d312a49200b 124 case CONNACK:
Ian Craggs 5:389ccac5a50c 125 printf("connack received\n");
icraggs 4:4ef00243708e 126 break;
icraggs 4:4ef00243708e 127 case PUBACK:
icraggs 4:4ef00243708e 128 break;
icraggs 4:4ef00243708e 129 case SUBACK:
icraggs 4:4ef00243708e 130 break;
icraggs 4:4ef00243708e 131 case PUBREC:
icraggs 4:4ef00243708e 132 break;
icraggs 4:4ef00243708e 133 case PUBCOMP:
icraggs 4:4ef00243708e 134 break;
icraggs 4:4ef00243708e 135 case PINGRESP:
icraggs 4:4ef00243708e 136 break;
icraggs 4:4ef00243708e 137 }
icraggs 4:4ef00243708e 138 return packet_type;
icraggs 3:dbff6b768d28 139 }
icraggs 3:dbff6b768d28 140
icraggs 3:dbff6b768d28 141
icraggs 6:4d312a49200b 142 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler)
icraggs 3:dbff6b768d28 143 {
icraggs 6:4d312a49200b 144 int len = 0;
icraggs 6:4d312a49200b 145 int rc = -99;
icraggs 6:4d312a49200b 146 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
icraggs 6:4d312a49200b 147
icraggs 6:4d312a49200b 148 /* 2. if the connect was successful, send the MQTT connect packet */
icraggs 6:4d312a49200b 149 if (options == 0)
icraggs 6:4d312a49200b 150 {
icraggs 6:4d312a49200b 151 default_options.clientID.cstring = "me";
icraggs 6:4d312a49200b 152 options = &default_options;
icraggs 6:4d312a49200b 153 }
icraggs 6:4d312a49200b 154
icraggs 6:4d312a49200b 155 this->keepalive = options->keepAliveInterval;
icraggs 6:4d312a49200b 156 len = MQTTSerialize_connect(buf, buflen, options);
icraggs 6:4d312a49200b 157 rc = sendPacket(len); // send the connect packet
Ian Craggs 5:389ccac5a50c 158 printf("rc from send is %d\n", rc);
icraggs 3:dbff6b768d28 159
icraggs 3:dbff6b768d28 160 /* 3. wait until the connack is received */
icraggs 4:4ef00243708e 161 if (resultHandler == 0)
icraggs 2:dcfdd2abfe71 162 {
icraggs 6:4d312a49200b 163 // this will be a blocking call, wait for the connack
icraggs 6:4d312a49200b 164 if (cycle() == CONNACK)
icraggs 6:4d312a49200b 165 {
icraggs 6:4d312a49200b 166 int connack_rc = -1;
icraggs 6:4d312a49200b 167 if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
icraggs 6:4d312a49200b 168 rc = connack_rc;
Ian Craggs 5:389ccac5a50c 169 }
icraggs 4:4ef00243708e 170 }
icraggs 4:4ef00243708e 171 else
icraggs 4:4ef00243708e 172 {
icraggs 4:4ef00243708e 173 // set connect response callback function
icraggs 4:4ef00243708e 174 }
icraggs 2:dcfdd2abfe71 175
Ian Craggs 5:389ccac5a50c 176 return rc;
icraggs 2:dcfdd2abfe71 177 }
icraggs 6:4d312a49200b 178
icraggs 6:4d312a49200b 179
icraggs 6:4d312a49200b 180 template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler,
icraggs 6:4d312a49200b 181 FP<void, Result*> *resultHandler)
icraggs 6:4d312a49200b 182 {
icraggs 6:4d312a49200b 183 int rc = -1,
icraggs 6:4d312a49200b 184 len = 0;
icraggs 6:4d312a49200b 185 MQTTString topic = {(char*)topicFilter, 0, 0};
icraggs 6:4d312a49200b 186
icraggs 6:4d312a49200b 187 len = MQTTSerialize_subscribe(buf, buflen, 0, getPacketId(), 1, &topic, (int*)&qos);
icraggs 6:4d312a49200b 188 rc = sendPacket(len); // send the subscribe packet
icraggs 6:4d312a49200b 189
icraggs 6:4d312a49200b 190 /* wait for suback */
icraggs 6:4d312a49200b 191 if (resultHandler == 0)
icraggs 6:4d312a49200b 192 {
icraggs 6:4d312a49200b 193 // this will block
icraggs 6:4d312a49200b 194 if (cycle() == SUBACK)
icraggs 6:4d312a49200b 195 {
icraggs 6:4d312a49200b 196 int count = 0, grantedQoS = -1;
icraggs 6:4d312a49200b 197 if (MQTTDeserialize_suback(&packetid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1)
icraggs 6:4d312a49200b 198 rc = grantedQoS; // 0, 1, 2 or 0x80
icraggs 6:4d312a49200b 199 }
icraggs 6:4d312a49200b 200 }
icraggs 6:4d312a49200b 201 else
icraggs 6:4d312a49200b 202 {
icraggs 6:4d312a49200b 203 // set subscribe response callback function
icraggs 6:4d312a49200b 204
icraggs 6:4d312a49200b 205 }
icraggs 6:4d312a49200b 206
icraggs 6:4d312a49200b 207 return rc;
icraggs 6:4d312a49200b 208 }