An API for using MQTT over multiple transports for mbed OS 5
Fork of MQTT by
MQTTClient.cpp@6:4d312a49200b, 2014-04-08 (annotated)
- Committer: icraggs
- Date: Tue Apr 08 22:54:37 2014 +0000
- Revision: 6:4d312a49200b
- Parent: 5:389ccac5a50c
- Child: 7:f9d690fb6dad
Subscribing
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
icraggs | 6:4d312a49200b | 1 | /******************************************************************************* |
icraggs | 6:4d312a49200b | 2 | * Copyright (c) 2014 IBM Corp. |
sam_grove | 0:fe461e4d7afe | 3 | * |
icraggs | 6:4d312a49200b | 4 | * All rights reserved. This program and the accompanying materials |
icraggs | 6:4d312a49200b | 5 | * are made available under the terms of the Eclipse Public License v1.0 |
icraggs | 6:4d312a49200b | 6 | * and Eclipse Distribution License v1.0 which accompany this distribution. |
sam_grove | 0:fe461e4d7afe | 7 | * |
icraggs | 6:4d312a49200b | 8 | * The Eclipse Public License is available at |
icraggs | 6:4d312a49200b | 9 | * http://www.eclipse.org/legal/epl-v10.html |
icraggs | 6:4d312a49200b | 10 | * and the Eclipse Distribution License is available at |
icraggs | 6:4d312a49200b | 11 | * http://www.eclipse.org/org/documents/edl-v10.php. |
sam_grove | 0:fe461e4d7afe | 12 | * |
icraggs | 6:4d312a49200b | 13 | * Contributors: |
icraggs | 6:4d312a49200b | 14 | * Ian Craggs - initial API and implementation and/or initial documentation |
icraggs | 6:4d312a49200b | 15 | *******************************************************************************/ |
sam_grove | 0:fe461e4d7afe | 16 | |
icraggs | 2:dcfdd2abfe71 | 17 | #include "MQTTClient.h" |
icraggs | 2:dcfdd2abfe71 | 18 | #include "MQTTPacket.h" |
icraggs | 2:dcfdd2abfe71 | 19 | |
icraggs | 6:4d312a49200b | 20 | template<class Network, class Timer, class Thread> MQTT::Client<Network, Timer, Thread>::Client(Network* network, Timer* timer, const int buffer_size, const int command_timeout) |
icraggs | 2:dcfdd2abfe71 | 21 | { |
icraggs | 2:dcfdd2abfe71 | 22 | |
icraggs | 6:4d312a49200b | 23 | buf = new char[buffer_size]; |
Ian Craggs | 5:389ccac5a50c | 24 | readbuf = new char[buffer_size]; |
icraggs | 3:dbff6b768d28 | 25 | this->ipstack = ipstack; |
icraggs | 4:4ef00243708e | 26 | this->command_timeout = command_timeout; |
Ian Craggs | 5:389ccac5a50c | 27 | //this->thread = new Thread(0); // only need a background thread for non-blocking mode |
icraggs | 4:4ef00243708e | 28 | this->ipstack = network; |
icraggs | 6:4d312a49200b | 29 | this->packetid = 0; |
icraggs | 6:4d312a49200b | 30 | this->timer = timer; |
icraggs | 3:dbff6b768d28 | 31 | } |
icraggs | 3:dbff6b768d28 | 32 | |
icraggs | 3:dbff6b768d28 | 33 | |
icraggs | 6:4d312a49200b | 34 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::getPacketId() |
icraggs | 6:4d312a49200b | 35 | { |
icraggs | 6:4d312a49200b | 36 | return this->packetid = (this->packetid == MAX_PACKET_ID) ? 1 : ++this->packetid; |
icraggs | 6:4d312a49200b | 37 | } |
icraggs | 6:4d312a49200b | 38 | |
icraggs | 6:4d312a49200b | 39 | |
icraggs | 6:4d312a49200b | 40 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::sendPacket(int length, int timeout) |
icraggs | 3:dbff6b768d28 | 41 | { |
icraggs | 3:dbff6b768d28 | 42 | int sent = 0; |
icraggs | 3:dbff6b768d28 | 43 | |
icraggs | 3:dbff6b768d28 | 44 | while (sent < length) |
icraggs | 4:4ef00243708e | 45 | sent += ipstack->write(&buf[sent], length, -1); |
icraggs | 3:dbff6b768d28 | 46 | |
icraggs | 3:dbff6b768d28 | 47 | return sent; |
icraggs | 3:dbff6b768d28 | 48 | } |
icraggs | 3:dbff6b768d28 | 49 | |
icraggs | 3:dbff6b768d28 | 50 | |
icraggs | 6:4d312a49200b | 51 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::decodePacket(int* value, int timeout) |
icraggs | 3:dbff6b768d28 | 52 | { |
icraggs | 3:dbff6b768d28 | 53 | char c; |
icraggs | 3:dbff6b768d28 | 54 | int multiplier = 1; |
icraggs | 3:dbff6b768d28 | 55 | int len = 0; |
icraggs | 3:dbff6b768d28 | 56 | #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 |
icraggs | 3:dbff6b768d28 | 57 | |
icraggs | 3:dbff6b768d28 | 58 | *value = 0; |
icraggs | 3:dbff6b768d28 | 59 | do |
icraggs | 3:dbff6b768d28 | 60 | { |
icraggs | 3:dbff6b768d28 | 61 | int rc = MQTTPACKET_READ_ERROR; |
icraggs | 3:dbff6b768d28 | 62 | |
icraggs | 3:dbff6b768d28 | 63 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
icraggs | 3:dbff6b768d28 | 64 | { |
icraggs | 3:dbff6b768d28 | 65 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
icraggs | 3:dbff6b768d28 | 66 | goto exit; |
icraggs | 3:dbff6b768d28 | 67 | } |
icraggs | 3:dbff6b768d28 | 68 | rc = ipstack->read(&c, 1, timeout); |
icraggs | 3:dbff6b768d28 | 69 | if (rc != 1) |
icraggs | 3:dbff6b768d28 | 70 | goto exit; |
icraggs | 3:dbff6b768d28 | 71 | *value += (c & 127) * multiplier; |
icraggs | 3:dbff6b768d28 | 72 | multiplier *= 128; |
icraggs | 3:dbff6b768d28 | 73 | } while ((c & 128) != 0); |
icraggs | 3:dbff6b768d28 | 74 | exit: |
icraggs | 3:dbff6b768d28 | 75 | return len; |
icraggs | 2:dcfdd2abfe71 | 76 | } |
icraggs | 2:dcfdd2abfe71 | 77 | |
sam_grove | 0:fe461e4d7afe | 78 | |
icraggs | 6:4d312a49200b | 79 | /** |
icraggs | 6:4d312a49200b | 80 | * If any read fails in this method, then we should disconnect from the network, as on reconnect |
icraggs | 6:4d312a49200b | 81 | * the packets can be retried. |
icraggs | 6:4d312a49200b | 82 | * @param timeout the max time to wait for the packet read to complete, in milliseconds |
icraggs | 6:4d312a49200b | 83 | * @return the MQTT packet type, or -1 if none |
icraggs | 6:4d312a49200b | 84 | */ |
icraggs | 6:4d312a49200b | 85 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::readPacket(int timeout) |
icraggs | 2:dcfdd2abfe71 | 86 | { |
icraggs | 3:dbff6b768d28 | 87 | int rc = -1; |
icraggs | 4:4ef00243708e | 88 | MQTTHeader header = {0}; |
icraggs | 3:dbff6b768d28 | 89 | int len = 0; |
icraggs | 3:dbff6b768d28 | 90 | int rem_len = 0; |
icraggs | 3:dbff6b768d28 | 91 | |
icraggs | 3:dbff6b768d28 | 92 | /* 1. read the header byte. This has the packet type in it */ |
icraggs | 6:4d312a49200b | 93 | if (ipstack->read(readbuf, 1, timeout) != 1) |
icraggs | 3:dbff6b768d28 | 94 | goto exit; |
icraggs | 3:dbff6b768d28 | 95 | |
icraggs | 3:dbff6b768d28 | 96 | len = 1; |
icraggs | 3:dbff6b768d28 | 97 | /* 2. read the remaining length. This is variable in itself */ |
icraggs | 3:dbff6b768d28 | 98 | decodePacket(&rem_len, timeout); |
icraggs | 3:dbff6b768d28 | 99 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ |
icraggs | 3:dbff6b768d28 | 100 | |
icraggs | 3:dbff6b768d28 | 101 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
icraggs | 6:4d312a49200b | 102 | if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len) |
icraggs | 3:dbff6b768d28 | 103 | goto exit; |
icraggs | 3:dbff6b768d28 | 104 | |
Ian Craggs | 5:389ccac5a50c | 105 | header.byte = readbuf[0]; |
icraggs | 3:dbff6b768d28 | 106 | rc = header.bits.type; |
icraggs | 3:dbff6b768d28 | 107 | exit: |
icraggs | 3:dbff6b768d28 | 108 | return rc; |
icraggs | 3:dbff6b768d28 | 109 | } |
icraggs | 3:dbff6b768d28 | 110 | |
icraggs | 3:dbff6b768d28 | 111 | |
icraggs | 6:4d312a49200b | 112 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::cycle() |
icraggs | 3:dbff6b768d28 | 113 | { |
icraggs | 3:dbff6b768d28 | 114 | int timeout = 1000L; |
icraggs | 3:dbff6b768d28 | 115 | /* get one piece of work off the wire and one pass through */ |
icraggs | 2:dcfdd2abfe71 | 116 | |
icraggs | 3:dbff6b768d28 | 117 | // 1. read the socket, see what work is due. |
icraggs | 6:4d312a49200b | 118 | int packet_type = readPacket(-1); |
icraggs | 6:4d312a49200b | 119 | |
Ian Craggs | 5:389ccac5a50c | 120 | printf("packet type %d\n", packet_type); |
icraggs | 2:dcfdd2abfe71 | 121 | |
icraggs | 4:4ef00243708e | 122 | switch (packet_type) |
icraggs | 4:4ef00243708e | 123 | { |
icraggs | 6:4d312a49200b | 124 | case CONNACK: |
Ian Craggs | 5:389ccac5a50c | 125 | printf("connack received\n"); |
icraggs | 4:4ef00243708e | 126 | break; |
icraggs | 4:4ef00243708e | 127 | case PUBACK: |
icraggs | 4:4ef00243708e | 128 | break; |
icraggs | 4:4ef00243708e | 129 | case SUBACK: |
icraggs | 4:4ef00243708e | 130 | break; |
icraggs | 4:4ef00243708e | 131 | case PUBREC: |
icraggs | 4:4ef00243708e | 132 | break; |
icraggs | 4:4ef00243708e | 133 | case PUBCOMP: |
icraggs | 4:4ef00243708e | 134 | break; |
icraggs | 4:4ef00243708e | 135 | case PINGRESP: |
icraggs | 4:4ef00243708e | 136 | break; |
icraggs | 4:4ef00243708e | 137 | } |
icraggs | 4:4ef00243708e | 138 | return packet_type; |
icraggs | 3:dbff6b768d28 | 139 | } |
icraggs | 3:dbff6b768d28 | 140 | |
icraggs | 3:dbff6b768d28 | 141 | |
icraggs | 6:4d312a49200b | 142 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler) |
icraggs | 3:dbff6b768d28 | 143 | { |
icraggs | 6:4d312a49200b | 144 | int len = 0; |
icraggs | 6:4d312a49200b | 145 | int rc = -99; |
icraggs | 6:4d312a49200b | 146 | MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; |
icraggs | 6:4d312a49200b | 147 | |
icraggs | 6:4d312a49200b | 148 | /* 2. if the connect was successful, send the MQTT connect packet */ |
icraggs | 6:4d312a49200b | 149 | if (options == 0) |
icraggs | 6:4d312a49200b | 150 | { |
icraggs | 6:4d312a49200b | 151 | default_options.clientID.cstring = "me"; |
icraggs | 6:4d312a49200b | 152 | options = &default_options; |
icraggs | 6:4d312a49200b | 153 | } |
icraggs | 6:4d312a49200b | 154 | |
icraggs | 6:4d312a49200b | 155 | this->keepalive = options->keepAliveInterval; |
icraggs | 6:4d312a49200b | 156 | len = MQTTSerialize_connect(buf, buflen, options); |
icraggs | 6:4d312a49200b | 157 | rc = sendPacket(len); // send the connect packet |
Ian Craggs | 5:389ccac5a50c | 158 | printf("rc from send is %d\n", rc); |
icraggs | 3:dbff6b768d28 | 159 | |
icraggs | 3:dbff6b768d28 | 160 | /* 3. wait until the connack is received */ |
icraggs | 4:4ef00243708e | 161 | if (resultHandler == 0) |
icraggs | 2:dcfdd2abfe71 | 162 | { |
icraggs | 6:4d312a49200b | 163 | // this will be a blocking call, wait for the connack |
icraggs | 6:4d312a49200b | 164 | if (cycle() == CONNACK) |
icraggs | 6:4d312a49200b | 165 | { |
icraggs | 6:4d312a49200b | 166 | int connack_rc = -1; |
icraggs | 6:4d312a49200b | 167 | if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1) |
icraggs | 6:4d312a49200b | 168 | rc = connack_rc; |
Ian Craggs | 5:389ccac5a50c | 169 | } |
icraggs | 4:4ef00243708e | 170 | } |
icraggs | 4:4ef00243708e | 171 | else |
icraggs | 4:4ef00243708e | 172 | { |
icraggs | 4:4ef00243708e | 173 | // set connect response callback function |
icraggs | 4:4ef00243708e | 174 | } |
icraggs | 2:dcfdd2abfe71 | 175 | |
Ian Craggs | 5:389ccac5a50c | 176 | return rc; |
icraggs | 2:dcfdd2abfe71 | 177 | } |
icraggs | 6:4d312a49200b | 178 | |
icraggs | 6:4d312a49200b | 179 | |
icraggs | 6:4d312a49200b | 180 | template<class Network, class Timer, class Thread> int MQTT::Client<Network, Timer, Thread>::subscribe(const char* topicFilter, enum QoS qos, FP<void, Message*> messageHandler, |
icraggs | 6:4d312a49200b | 181 | FP<void, Result*> *resultHandler) |
icraggs | 6:4d312a49200b | 182 | { |
icraggs | 6:4d312a49200b | 183 | int rc = -1, |
icraggs | 6:4d312a49200b | 184 | len = 0; |
icraggs | 6:4d312a49200b | 185 | MQTTString topic = {(char*)topicFilter, 0, 0}; |
icraggs | 6:4d312a49200b | 186 | |
icraggs | 6:4d312a49200b | 187 | len = MQTTSerialize_subscribe(buf, buflen, 0, getPacketId(), 1, &topic, (int*)&qos); |
icraggs | 6:4d312a49200b | 188 | rc = sendPacket(len); // send the subscribe packet |
icraggs | 6:4d312a49200b | 189 | |
icraggs | 6:4d312a49200b | 190 | /* wait for suback */ |
icraggs | 6:4d312a49200b | 191 | if (resultHandler == 0) |
icraggs | 6:4d312a49200b | 192 | { |
icraggs | 6:4d312a49200b | 193 | // this will block |
icraggs | 6:4d312a49200b | 194 | if (cycle() == SUBACK) |
icraggs | 6:4d312a49200b | 195 | { |
icraggs | 6:4d312a49200b | 196 | int count = 0, grantedQoS = -1; |
icraggs | 6:4d312a49200b | 197 | if (MQTTDeserialize_suback(&packetid, 1, &count, &grantedQoS, readbuf, readbuflen) == 1) |
icraggs | 6:4d312a49200b | 198 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
icraggs | 6:4d312a49200b | 199 | } |
icraggs | 6:4d312a49200b | 200 | } |
icraggs | 6:4d312a49200b | 201 | else |
icraggs | 6:4d312a49200b | 202 | { |
icraggs | 6:4d312a49200b | 203 | // set subscribe response callback function |
icraggs | 6:4d312a49200b | 204 | |
icraggs | 6:4d312a49200b | 205 | } |
icraggs | 6:4d312a49200b | 206 | |
icraggs | 6:4d312a49200b | 207 | return rc; |
icraggs | 6:4d312a49200b | 208 | } |