MQTT lib for WISEAgent
Fork of MQTT by
MQTTClient.cpp@4:4ef00243708e, 2014-04-07 (annotated)
- Committer:
- icraggs
- Date:
- Mon Apr 07 12:24:36 2014 +0000
- Revision:
- 4:4ef00243708e
- Parent:
- 3:dbff6b768d28
- Child:
- 5:389ccac5a50c
Templates for both networking and tasks
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
sam_grove | 0:fe461e4d7afe | 1 | /** |
sam_grove | 0:fe461e4d7afe | 2 | * @file MQTTClient.cpp |
sam_grove | 0:fe461e4d7afe | 3 | * @brief API - for MQTT |
sam_grove | 0:fe461e4d7afe | 4 | * @author |
sam_grove | 0:fe461e4d7afe | 5 | * @version 1.0 |
sam_grove | 0:fe461e4d7afe | 6 | * @see |
sam_grove | 0:fe461e4d7afe | 7 | * |
sam_grove | 0:fe461e4d7afe | 8 | * Copyright (c) 2014 |
sam_grove | 0:fe461e4d7afe | 9 | * |
sam_grove | 0:fe461e4d7afe | 10 | * Licensed under the Apache License, Version 2.0 (the "License"); |
sam_grove | 0:fe461e4d7afe | 11 | * you may not use this file except in compliance with the License. |
sam_grove | 0:fe461e4d7afe | 12 | * You may obtain a copy of the License at |
sam_grove | 0:fe461e4d7afe | 13 | * |
sam_grove | 0:fe461e4d7afe | 14 | * http://www.apache.org/licenses/LICENSE-2.0 |
sam_grove | 0:fe461e4d7afe | 15 | * |
sam_grove | 0:fe461e4d7afe | 16 | * Unless required by applicable law or agreed to in writing, software |
sam_grove | 0:fe461e4d7afe | 17 | * distributed under the License is distributed on an "AS IS" BASIS, |
sam_grove | 0:fe461e4d7afe | 18 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
sam_grove | 0:fe461e4d7afe | 19 | * See the License for the specific language governing permissions and |
sam_grove | 0:fe461e4d7afe | 20 | * limitations under the License. |
sam_grove | 0:fe461e4d7afe | 21 | */ |
sam_grove | 0:fe461e4d7afe | 22 | |
icraggs | 2:dcfdd2abfe71 | 23 | #include "MQTTClient.h" |
icraggs | 2:dcfdd2abfe71 | 24 | #include "MQTTPacket.h" |
icraggs | 2:dcfdd2abfe71 | 25 | |
icraggs | 4:4ef00243708e | 26 | template<class Network, class Thread> MQTT::Client<Network, Thread>::Client(Network* network, const int buffer_size, const int command_timeout) |
icraggs | 2:dcfdd2abfe71 | 27 | { |
icraggs | 2:dcfdd2abfe71 | 28 | |
icraggs | 2:dcfdd2abfe71 | 29 | buf = new char[buffer_size]; |
icraggs | 3:dbff6b768d28 | 30 | this->ipstack = ipstack; |
icraggs | 4:4ef00243708e | 31 | this->command_timeout = command_timeout; |
icraggs | 4:4ef00243708e | 32 | this->thread = new Thread(0); // only need a background thread for non-blocking mode |
icraggs | 4:4ef00243708e | 33 | this->ipstack = network; |
icraggs | 3:dbff6b768d28 | 34 | } |
icraggs | 3:dbff6b768d28 | 35 | |
icraggs | 3:dbff6b768d28 | 36 | |
icraggs | 4:4ef00243708e | 37 | template<class Network, class Thread> int MQTT::Client<Network, Thread>::sendPacket(int length) |
icraggs | 3:dbff6b768d28 | 38 | { |
icraggs | 3:dbff6b768d28 | 39 | int sent = 0; |
icraggs | 3:dbff6b768d28 | 40 | |
icraggs | 3:dbff6b768d28 | 41 | while (sent < length) |
icraggs | 4:4ef00243708e | 42 | sent += ipstack->write(&buf[sent], length, -1); |
icraggs | 3:dbff6b768d28 | 43 | |
icraggs | 3:dbff6b768d28 | 44 | return sent; |
icraggs | 3:dbff6b768d28 | 45 | } |
icraggs | 3:dbff6b768d28 | 46 | |
icraggs | 3:dbff6b768d28 | 47 | |
icraggs | 4:4ef00243708e | 48 | template<class Network, class Thread> int MQTT::Client<Network, Thread>::decodePacket(int* value, int timeout) |
icraggs | 3:dbff6b768d28 | 49 | { |
icraggs | 3:dbff6b768d28 | 50 | char c; |
icraggs | 3:dbff6b768d28 | 51 | int multiplier = 1; |
icraggs | 3:dbff6b768d28 | 52 | int len = 0; |
icraggs | 3:dbff6b768d28 | 53 | #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 |
icraggs | 3:dbff6b768d28 | 54 | |
icraggs | 3:dbff6b768d28 | 55 | *value = 0; |
icraggs | 3:dbff6b768d28 | 56 | do |
icraggs | 3:dbff6b768d28 | 57 | { |
icraggs | 3:dbff6b768d28 | 58 | int rc = MQTTPACKET_READ_ERROR; |
icraggs | 3:dbff6b768d28 | 59 | |
icraggs | 3:dbff6b768d28 | 60 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
icraggs | 3:dbff6b768d28 | 61 | { |
icraggs | 3:dbff6b768d28 | 62 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
icraggs | 3:dbff6b768d28 | 63 | goto exit; |
icraggs | 3:dbff6b768d28 | 64 | } |
icraggs | 3:dbff6b768d28 | 65 | rc = ipstack->read(&c, 1, timeout); |
icraggs | 3:dbff6b768d28 | 66 | if (rc != 1) |
icraggs | 3:dbff6b768d28 | 67 | goto exit; |
icraggs | 3:dbff6b768d28 | 68 | *value += (c & 127) * multiplier; |
icraggs | 3:dbff6b768d28 | 69 | multiplier *= 128; |
icraggs | 3:dbff6b768d28 | 70 | } while ((c & 128) != 0); |
icraggs | 3:dbff6b768d28 | 71 | exit: |
icraggs | 3:dbff6b768d28 | 72 | return len; |
icraggs | 2:dcfdd2abfe71 | 73 | } |
icraggs | 2:dcfdd2abfe71 | 74 | |
sam_grove | 0:fe461e4d7afe | 75 | |
icraggs | 4:4ef00243708e | 76 | template<class Network, class Thread> int MQTT::Client<Network,Thread>::readPacket(int timeout) |
icraggs | 2:dcfdd2abfe71 | 77 | { |
icraggs | 3:dbff6b768d28 | 78 | int rc = -1; |
icraggs | 4:4ef00243708e | 79 | MQTTHeader header = {0}; |
icraggs | 3:dbff6b768d28 | 80 | int len = 0; |
icraggs | 3:dbff6b768d28 | 81 | int rem_len = 0; |
icraggs | 3:dbff6b768d28 | 82 | |
icraggs | 3:dbff6b768d28 | 83 | /* 1. read the header byte. This has the packet type in it */ |
icraggs | 3:dbff6b768d28 | 84 | if ((rc = ipstack->read(readbuf, 1, timeout)) != 1) |
icraggs | 3:dbff6b768d28 | 85 | goto exit; |
icraggs | 3:dbff6b768d28 | 86 | |
icraggs | 3:dbff6b768d28 | 87 | len = 1; |
icraggs | 3:dbff6b768d28 | 88 | /* 2. read the remaining length. This is variable in itself */ |
icraggs | 3:dbff6b768d28 | 89 | decodePacket(&rem_len, timeout); |
icraggs | 3:dbff6b768d28 | 90 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ |
icraggs | 3:dbff6b768d28 | 91 | |
icraggs | 3:dbff6b768d28 | 92 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
icraggs | 3:dbff6b768d28 | 93 | if ((rc = ipstack->read(readbuf + len, rem_len, timeout)) != rem_len) |
icraggs | 3:dbff6b768d28 | 94 | goto exit; |
icraggs | 3:dbff6b768d28 | 95 | |
icraggs | 3:dbff6b768d28 | 96 | header.byte = buf[0]; |
icraggs | 3:dbff6b768d28 | 97 | rc = header.bits.type; |
icraggs | 3:dbff6b768d28 | 98 | exit: |
icraggs | 3:dbff6b768d28 | 99 | return rc; |
icraggs | 3:dbff6b768d28 | 100 | } |
icraggs | 3:dbff6b768d28 | 101 | |
icraggs | 3:dbff6b768d28 | 102 | |
icraggs | 4:4ef00243708e | 103 | template<class Network, class Thread> int MQTT::Client<Network, Thread>::cycle() |
icraggs | 3:dbff6b768d28 | 104 | { |
icraggs | 3:dbff6b768d28 | 105 | int timeout = 1000L; |
icraggs | 3:dbff6b768d28 | 106 | /* get one piece of work off the wire and one pass through */ |
icraggs | 2:dcfdd2abfe71 | 107 | |
icraggs | 3:dbff6b768d28 | 108 | // 1. read the socket, see what work is due. |
icraggs | 4:4ef00243708e | 109 | int packet_type = readPacket(-1); |
icraggs | 2:dcfdd2abfe71 | 110 | |
icraggs | 4:4ef00243708e | 111 | switch (packet_type) |
icraggs | 4:4ef00243708e | 112 | { |
icraggs | 4:4ef00243708e | 113 | case CONNACK: |
icraggs | 4:4ef00243708e | 114 | break; |
icraggs | 4:4ef00243708e | 115 | case PUBACK: |
icraggs | 4:4ef00243708e | 116 | break; |
icraggs | 4:4ef00243708e | 117 | case SUBACK: |
icraggs | 4:4ef00243708e | 118 | break; |
icraggs | 4:4ef00243708e | 119 | case PUBREC: |
icraggs | 4:4ef00243708e | 120 | break; |
icraggs | 4:4ef00243708e | 121 | case PUBCOMP: |
icraggs | 4:4ef00243708e | 122 | break; |
icraggs | 4:4ef00243708e | 123 | case PINGRESP: |
icraggs | 4:4ef00243708e | 124 | break; |
icraggs | 4:4ef00243708e | 125 | } |
icraggs | 4:4ef00243708e | 126 | return packet_type; |
icraggs | 3:dbff6b768d28 | 127 | } |
icraggs | 3:dbff6b768d28 | 128 | |
icraggs | 3:dbff6b768d28 | 129 | |
icraggs | 4:4ef00243708e | 130 | template<class Network, class Thread> int MQTT::Client<Network, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler) |
icraggs | 3:dbff6b768d28 | 131 | { |
icraggs | 3:dbff6b768d28 | 132 | /* 1. connect to the server with the desired transport */ |
icraggs | 4:4ef00243708e | 133 | /*if (!ipstack->connect()) |
icraggs | 4:4ef00243708e | 134 | return -99;*/ |
icraggs | 3:dbff6b768d28 | 135 | |
icraggs | 3:dbff6b768d28 | 136 | /* 2. if the connect was successful, send the MQTT connect packet */ |
icraggs | 3:dbff6b768d28 | 137 | int len = MQTTSerialize_connect(buf, buflen, options); |
icraggs | 3:dbff6b768d28 | 138 | sendPacket(len); // send the connect packet |
icraggs | 3:dbff6b768d28 | 139 | |
icraggs | 3:dbff6b768d28 | 140 | /* 3. wait until the connack is received */ |
icraggs | 4:4ef00243708e | 141 | if (resultHandler == 0) |
icraggs | 2:dcfdd2abfe71 | 142 | { |
icraggs | 2:dcfdd2abfe71 | 143 | // this will be a blocking call, wait for the connack |
icraggs | 4:4ef00243708e | 144 | //waitfor(CONNACK); |
icraggs | 4:4ef00243708e | 145 | } |
icraggs | 4:4ef00243708e | 146 | else |
icraggs | 4:4ef00243708e | 147 | { |
icraggs | 4:4ef00243708e | 148 | // set connect response callback function |
icraggs | 4:4ef00243708e | 149 | } |
icraggs | 2:dcfdd2abfe71 | 150 | |
icraggs | 2:dcfdd2abfe71 | 151 | return len; |
icraggs | 2:dcfdd2abfe71 | 152 | } |