An API for using MQTT over multiple transports for mbed OS 5
Fork of MQTT by
MQTTClient.cpp@5:389ccac5a50c, 2014-04-07 (annotated)
- Committer:
- Ian Craggs
- Date:
- Mon Apr 07 23:52:57 2014 +0100
- Revision:
- 5:389ccac5a50c
- Parent:
- 4:4ef00243708e
- Child:
- 6:4d312a49200b
Latest updates
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
sam_grove | 0:fe461e4d7afe | 1 | /** |
sam_grove | 0:fe461e4d7afe | 2 | * @file MQTTPubSub.cpp |
sam_grove | 0:fe461e4d7afe | 3 | * @brief API - for MQTT |
sam_grove | 0:fe461e4d7afe | 4 | * @author |
sam_grove | 0:fe461e4d7afe | 5 | * @version 1.0 |
sam_grove | 0:fe461e4d7afe | 6 | * @see |
sam_grove | 0:fe461e4d7afe | 7 | * |
sam_grove | 0:fe461e4d7afe | 8 | * Copyright (c) 2014 |
sam_grove | 0:fe461e4d7afe | 9 | * |
sam_grove | 0:fe461e4d7afe | 10 | * Licensed under the Apache License, Version 2.0 (the "License"); |
sam_grove | 0:fe461e4d7afe | 11 | * you may not use this file except in compliance with the License. |
sam_grove | 0:fe461e4d7afe | 12 | * You may obtain a copy of the License at |
sam_grove | 0:fe461e4d7afe | 13 | * |
sam_grove | 0:fe461e4d7afe | 14 | * http://www.apache.org/licenses/LICENSE-2.0 |
sam_grove | 0:fe461e4d7afe | 15 | * |
sam_grove | 0:fe461e4d7afe | 16 | * Unless required by applicable law or agreed to in writing, software |
sam_grove | 0:fe461e4d7afe | 17 | * distributed under the License is distributed on an "AS IS" BASIS, |
sam_grove | 0:fe461e4d7afe | 18 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
sam_grove | 0:fe461e4d7afe | 19 | * See the License for the specific language governing permissions and |
sam_grove | 0:fe461e4d7afe | 20 | * limitations under the License. |
sam_grove | 0:fe461e4d7afe | 21 | */ |
sam_grove | 0:fe461e4d7afe | 22 | |
icraggs | 2:dcfdd2abfe71 | 23 | #include "MQTTClient.h" |
icraggs | 2:dcfdd2abfe71 | 24 | #include "MQTTPacket.h" |
icraggs | 2:dcfdd2abfe71 | 25 | |
icraggs | 4:4ef00243708e | 26 | template<class Network, class Thread> MQTT::Client<Network, Thread>::Client(Network* network, const int buffer_size, const int command_timeout) |
icraggs | 2:dcfdd2abfe71 | 27 | { |
icraggs | 2:dcfdd2abfe71 | 28 | |
Ian Craggs |
5:389ccac5a50c | 29 | buf = new char[buffer_size]; |
Ian Craggs |
5:389ccac5a50c | 30 | readbuf = new char[buffer_size]; |
icraggs | 3:dbff6b768d28 | 31 | this->ipstack = ipstack; |
icraggs | 4:4ef00243708e | 32 | this->command_timeout = command_timeout; |
Ian Craggs |
5:389ccac5a50c | 33 | //this->thread = new Thread(0); // only need a background thread for non-blocking mode |
icraggs | 4:4ef00243708e | 34 | this->ipstack = network; |
icraggs | 3:dbff6b768d28 | 35 | } |
icraggs | 3:dbff6b768d28 | 36 | |
icraggs | 3:dbff6b768d28 | 37 | |
icraggs | 4:4ef00243708e | 38 | template<class Network, class Thread> int MQTT::Client<Network, Thread>::sendPacket(int length) |
icraggs | 3:dbff6b768d28 | 39 | { |
icraggs | 3:dbff6b768d28 | 40 | int sent = 0; |
icraggs | 3:dbff6b768d28 | 41 | |
icraggs | 3:dbff6b768d28 | 42 | while (sent < length) |
icraggs | 4:4ef00243708e | 43 | sent += ipstack->write(&buf[sent], length, -1); |
icraggs | 3:dbff6b768d28 | 44 | |
icraggs | 3:dbff6b768d28 | 45 | return sent; |
icraggs | 3:dbff6b768d28 | 46 | } |
icraggs | 3:dbff6b768d28 | 47 | |
icraggs | 3:dbff6b768d28 | 48 | |
icraggs | 4:4ef00243708e | 49 | template<class Network, class Thread> int MQTT::Client<Network, Thread>::decodePacket(int* value, int timeout) |
icraggs | 3:dbff6b768d28 | 50 | { |
icraggs | 3:dbff6b768d28 | 51 | char c; |
icraggs | 3:dbff6b768d28 | 52 | int multiplier = 1; |
icraggs | 3:dbff6b768d28 | 53 | int len = 0; |
icraggs | 3:dbff6b768d28 | 54 | #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 |
icraggs | 3:dbff6b768d28 | 55 | |
icraggs | 3:dbff6b768d28 | 56 | *value = 0; |
icraggs | 3:dbff6b768d28 | 57 | do |
icraggs | 3:dbff6b768d28 | 58 | { |
icraggs | 3:dbff6b768d28 | 59 | int rc = MQTTPACKET_READ_ERROR; |
icraggs | 3:dbff6b768d28 | 60 | |
icraggs | 3:dbff6b768d28 | 61 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
icraggs | 3:dbff6b768d28 | 62 | { |
icraggs | 3:dbff6b768d28 | 63 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
icraggs | 3:dbff6b768d28 | 64 | goto exit; |
icraggs | 3:dbff6b768d28 | 65 | } |
icraggs | 3:dbff6b768d28 | 66 | rc = ipstack->read(&c, 1, timeout); |
icraggs | 3:dbff6b768d28 | 67 | if (rc != 1) |
icraggs | 3:dbff6b768d28 | 68 | goto exit; |
icraggs | 3:dbff6b768d28 | 69 | *value += (c & 127) * multiplier; |
icraggs | 3:dbff6b768d28 | 70 | multiplier *= 128; |
icraggs | 3:dbff6b768d28 | 71 | } while ((c & 128) != 0); |
icraggs | 3:dbff6b768d28 | 72 | exit: |
icraggs | 3:dbff6b768d28 | 73 | return len; |
icraggs | 2:dcfdd2abfe71 | 74 | } |
icraggs | 2:dcfdd2abfe71 | 75 | |
sam_grove | 0:fe461e4d7afe | 76 | |
icraggs | 4:4ef00243708e | 77 | template<class Network, class Thread> int MQTT::Client<Network,Thread>::readPacket(int timeout) |
icraggs | 2:dcfdd2abfe71 | 78 | { |
icraggs | 3:dbff6b768d28 | 79 | int rc = -1; |
icraggs | 4:4ef00243708e | 80 | MQTTHeader header = {0}; |
icraggs | 3:dbff6b768d28 | 81 | int len = 0; |
icraggs | 3:dbff6b768d28 | 82 | int rem_len = 0; |
icraggs | 3:dbff6b768d28 | 83 | |
icraggs | 3:dbff6b768d28 | 84 | /* 1. read the header byte. This has the packet type in it */ |
icraggs | 3:dbff6b768d28 | 85 | if ((rc = ipstack->read(readbuf, 1, timeout)) != 1) |
icraggs | 3:dbff6b768d28 | 86 | goto exit; |
icraggs | 3:dbff6b768d28 | 87 | |
icraggs | 3:dbff6b768d28 | 88 | len = 1; |
icraggs | 3:dbff6b768d28 | 89 | /* 2. read the remaining length. This is variable in itself */ |
icraggs | 3:dbff6b768d28 | 90 | decodePacket(&rem_len, timeout); |
icraggs | 3:dbff6b768d28 | 91 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ |
icraggs | 3:dbff6b768d28 | 92 | |
icraggs | 3:dbff6b768d28 | 93 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
icraggs | 3:dbff6b768d28 | 94 | if ((rc = ipstack->read(readbuf + len, rem_len, timeout)) != rem_len) |
icraggs | 3:dbff6b768d28 | 95 | goto exit; |
icraggs | 3:dbff6b768d28 | 96 | |
Ian Craggs |
5:389ccac5a50c | 97 | header.byte = readbuf[0]; |
icraggs | 3:dbff6b768d28 | 98 | rc = header.bits.type; |
icraggs | 3:dbff6b768d28 | 99 | exit: |
icraggs | 3:dbff6b768d28 | 100 | return rc; |
icraggs | 3:dbff6b768d28 | 101 | } |
icraggs | 3:dbff6b768d28 | 102 | |
icraggs | 3:dbff6b768d28 | 103 | |
icraggs | 4:4ef00243708e | 104 | template<class Network, class Thread> int MQTT::Client<Network, Thread>::cycle() |
icraggs | 3:dbff6b768d28 | 105 | { |
icraggs | 3:dbff6b768d28 | 106 | int timeout = 1000L; |
icraggs | 3:dbff6b768d28 | 107 | /* get one piece of work off the wire and one pass through */ |
icraggs | 2:dcfdd2abfe71 | 108 | |
icraggs | 3:dbff6b768d28 | 109 | // 1. read the socket, see what work is due. |
Ian Craggs |
5:389ccac5a50c | 110 | int packet_type = readPacket(-1); |
Ian Craggs |
5:389ccac5a50c | 111 | |
Ian Craggs |
5:389ccac5a50c | 112 | printf("packet type %d\n", packet_type); |
icraggs | 2:dcfdd2abfe71 | 113 | |
icraggs | 4:4ef00243708e | 114 | switch (packet_type) |
icraggs | 4:4ef00243708e | 115 | { |
Ian Craggs |
5:389ccac5a50c | 116 | case CONNACK: |
Ian Craggs |
5:389ccac5a50c | 117 | printf("connack received\n"); |
icraggs | 4:4ef00243708e | 118 | break; |
icraggs | 4:4ef00243708e | 119 | case PUBACK: |
icraggs | 4:4ef00243708e | 120 | break; |
icraggs | 4:4ef00243708e | 121 | case SUBACK: |
icraggs | 4:4ef00243708e | 122 | break; |
icraggs | 4:4ef00243708e | 123 | case PUBREC: |
icraggs | 4:4ef00243708e | 124 | break; |
icraggs | 4:4ef00243708e | 125 | case PUBCOMP: |
icraggs | 4:4ef00243708e | 126 | break; |
icraggs | 4:4ef00243708e | 127 | case PINGRESP: |
icraggs | 4:4ef00243708e | 128 | break; |
icraggs | 4:4ef00243708e | 129 | } |
icraggs | 4:4ef00243708e | 130 | return packet_type; |
icraggs | 3:dbff6b768d28 | 131 | } |
icraggs | 3:dbff6b768d28 | 132 | |
icraggs | 3:dbff6b768d28 | 133 | |
icraggs | 4:4ef00243708e | 134 | template<class Network, class Thread> int MQTT::Client<Network, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler) |
icraggs | 3:dbff6b768d28 | 135 | { |
Ian Craggs |
5:389ccac5a50c | 136 | int len = 0; |
Ian Craggs |
5:389ccac5a50c | 137 | int rc = -99; |
Ian Craggs |
5:389ccac5a50c | 138 | |
Ian Craggs |
5:389ccac5a50c | 139 | /* 2. if the connect was successful, send the MQTT connect packet */ |
Ian Craggs |
5:389ccac5a50c | 140 | if (options == 0) |
Ian Craggs |
5:389ccac5a50c | 141 | { |
Ian Craggs |
5:389ccac5a50c | 142 | MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; |
Ian Craggs |
5:389ccac5a50c | 143 | default_options.clientID.cstring = "me"; |
Ian Craggs |
5:389ccac5a50c | 144 | len = MQTTSerialize_connect(buf, buflen, &default_options); |
Ian Craggs |
5:389ccac5a50c | 145 | } |
Ian Craggs |
5:389ccac5a50c | 146 | else |
Ian Craggs |
5:389ccac5a50c | 147 | len = MQTTSerialize_connect(buf, buflen, options); |
Ian Craggs |
5:389ccac5a50c | 148 | rc = sendPacket(len); // send the connect packet |
Ian Craggs |
5:389ccac5a50c | 149 | printf("rc from send is %d\n", rc); |
icraggs | 3:dbff6b768d28 | 150 | |
icraggs | 3:dbff6b768d28 | 151 | /* 3. wait until the connack is received */ |
icraggs | 4:4ef00243708e | 152 | if (resultHandler == 0) |
icraggs | 2:dcfdd2abfe71 | 153 | { |
Ian Craggs |
5:389ccac5a50c | 154 | // this will be a blocking call, wait for the connack |
Ian Craggs |
5:389ccac5a50c | 155 | if (cycle() == CONNACK) |
Ian Craggs |
5:389ccac5a50c | 156 | { |
Ian Craggs |
5:389ccac5a50c | 157 | int connack_rc = -1; |
Ian Craggs |
5:389ccac5a50c | 158 | if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1) |
Ian Craggs |
5:389ccac5a50c | 159 | rc = connack_rc; |
Ian Craggs |
5:389ccac5a50c | 160 | } |
icraggs | 4:4ef00243708e | 161 | } |
icraggs | 4:4ef00243708e | 162 | else |
icraggs | 4:4ef00243708e | 163 | { |
icraggs | 4:4ef00243708e | 164 | // set connect response callback function |
icraggs | 4:4ef00243708e | 165 | } |
icraggs | 2:dcfdd2abfe71 | 166 | |
Ian Craggs |
5:389ccac5a50c | 167 | return rc; |
icraggs | 2:dcfdd2abfe71 | 168 | } |