MQTT lib for WISEAgent
Fork of MQTT by
MQTTClient.cpp@3:dbff6b768d28, 2014-03-31 (annotated)
- Committer:
- icraggs
- Date:
- Mon Mar 31 15:48:45 2014 +0000
- Revision:
- 3:dbff6b768d28
- Parent:
- 2:dcfdd2abfe71
- Child:
- 4:4ef00243708e
Move parameters around to avoid storing data
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
sam_grove | 0:fe461e4d7afe | 1 | /** |
sam_grove | 0:fe461e4d7afe | 2 | * @file MQTTClient.cpp |
sam_grove | 0:fe461e4d7afe | 3 | * @brief API - for MQTT |
sam_grove | 0:fe461e4d7afe | 4 | * @author |
sam_grove | 0:fe461e4d7afe | 5 | * @version 1.0 |
sam_grove | 0:fe461e4d7afe | 6 | * @see |
sam_grove | 0:fe461e4d7afe | 7 | * |
sam_grove | 0:fe461e4d7afe | 8 | * Copyright (c) 2014 |
sam_grove | 0:fe461e4d7afe | 9 | * |
sam_grove | 0:fe461e4d7afe | 10 | * Licensed under the Apache License, Version 2.0 (the "License"); |
sam_grove | 0:fe461e4d7afe | 11 | * you may not use this file except in compliance with the License. |
sam_grove | 0:fe461e4d7afe | 12 | * You may obtain a copy of the License at |
sam_grove | 0:fe461e4d7afe | 13 | * |
sam_grove | 0:fe461e4d7afe | 14 | * http://www.apache.org/licenses/LICENSE-2.0 |
sam_grove | 0:fe461e4d7afe | 15 | * |
sam_grove | 0:fe461e4d7afe | 16 | * Unless required by applicable law or agreed to in writing, software |
sam_grove | 0:fe461e4d7afe | 17 | * distributed under the License is distributed on an "AS IS" BASIS, |
sam_grove | 0:fe461e4d7afe | 18 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
sam_grove | 0:fe461e4d7afe | 19 | * See the License for the specific language governing permissions and |
sam_grove | 0:fe461e4d7afe | 20 | * limitations under the License. |
sam_grove | 0:fe461e4d7afe | 21 | */ |
sam_grove | 0:fe461e4d7afe | 22 | |
icraggs | 2:dcfdd2abfe71 | 23 | #include "MQTTClient.h" |
icraggs | 2:dcfdd2abfe71 | 24 | #include "MQTTPacket.h" |
icraggs | 2:dcfdd2abfe71 | 25 | |
icraggs | 3:dbff6b768d28 | 26 | MQTT::Client::Client(IPStack* ipstack, const int buffer_size) |
icraggs | 2:dcfdd2abfe71 | 27 | { |
icraggs | 2:dcfdd2abfe71 | 28 | |
icraggs | 2:dcfdd2abfe71 | 29 | buf = new char[buffer_size]; |
icraggs | 3:dbff6b768d28 | 30 | this->ipstack = ipstack; |
icraggs | 3:dbff6b768d28 | 31 | } |
icraggs | 3:dbff6b768d28 | 32 | |
icraggs | 3:dbff6b768d28 | 33 | |
icraggs | 3:dbff6b768d28 | 34 | int MQTT::Client::sendPacket(int length) |
icraggs | 3:dbff6b768d28 | 35 | { |
icraggs | 3:dbff6b768d28 | 36 | int sent = 0; |
icraggs | 3:dbff6b768d28 | 37 | |
icraggs | 3:dbff6b768d28 | 38 | while (sent < length) |
icraggs | 3:dbff6b768d28 | 39 | sent += ipstack->write(&buf[sent], length); |
icraggs | 3:dbff6b768d28 | 40 | |
icraggs | 3:dbff6b768d28 | 41 | return sent; |
icraggs | 3:dbff6b768d28 | 42 | } |
icraggs | 3:dbff6b768d28 | 43 | |
icraggs | 3:dbff6b768d28 | 44 | |
icraggs | 3:dbff6b768d28 | 45 | int MQTT::Client::decodePacket(int* value, int timeout) |
icraggs | 3:dbff6b768d28 | 46 | { |
icraggs | 3:dbff6b768d28 | 47 | char c; |
icraggs | 3:dbff6b768d28 | 48 | int multiplier = 1; |
icraggs | 3:dbff6b768d28 | 49 | int len = 0; |
icraggs | 3:dbff6b768d28 | 50 | #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 |
icraggs | 3:dbff6b768d28 | 51 | |
icraggs | 3:dbff6b768d28 | 52 | *value = 0; |
icraggs | 3:dbff6b768d28 | 53 | do |
icraggs | 3:dbff6b768d28 | 54 | { |
icraggs | 3:dbff6b768d28 | 55 | int rc = MQTTPACKET_READ_ERROR; |
icraggs | 3:dbff6b768d28 | 56 | |
icraggs | 3:dbff6b768d28 | 57 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
icraggs | 3:dbff6b768d28 | 58 | { |
icraggs | 3:dbff6b768d28 | 59 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
icraggs | 3:dbff6b768d28 | 60 | goto exit; |
icraggs | 3:dbff6b768d28 | 61 | } |
icraggs | 3:dbff6b768d28 | 62 | rc = ipstack->read(&c, 1, timeout); |
icraggs | 3:dbff6b768d28 | 63 | if (rc != 1) |
icraggs | 3:dbff6b768d28 | 64 | goto exit; |
icraggs | 3:dbff6b768d28 | 65 | *value += (c & 127) * multiplier; |
icraggs | 3:dbff6b768d28 | 66 | multiplier *= 128; |
icraggs | 3:dbff6b768d28 | 67 | } while ((c & 128) != 0); |
icraggs | 3:dbff6b768d28 | 68 | exit: |
icraggs | 3:dbff6b768d28 | 69 | return len; |
icraggs | 2:dcfdd2abfe71 | 70 | } |
icraggs | 2:dcfdd2abfe71 | 71 | |
sam_grove | 0:fe461e4d7afe | 72 | |
icraggs | 3:dbff6b768d28 | 73 | int MQTT::Client::readPacket(int timeout) |
icraggs | 2:dcfdd2abfe71 | 74 | { |
icraggs | 3:dbff6b768d28 | 75 | int rc = -1; |
icraggs | 3:dbff6b768d28 | 76 | MQTTHeader header; |
icraggs | 3:dbff6b768d28 | 77 | int len = 0; |
icraggs | 3:dbff6b768d28 | 78 | int rem_len = 0; |
icraggs | 3:dbff6b768d28 | 79 | |
icraggs | 3:dbff6b768d28 | 80 | /* 1. read the header byte. This has the packet type in it */ |
icraggs | 3:dbff6b768d28 | 81 | if ((rc = ipstack->read(readbuf, 1, timeout)) != 1) |
icraggs | 3:dbff6b768d28 | 82 | goto exit; |
icraggs | 3:dbff6b768d28 | 83 | |
icraggs | 3:dbff6b768d28 | 84 | len = 1; |
icraggs | 3:dbff6b768d28 | 85 | /* 2. read the remaining length. This is variable in itself */ |
icraggs | 3:dbff6b768d28 | 86 | decodePacket(&rem_len, timeout); |
icraggs | 3:dbff6b768d28 | 87 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ |
icraggs | 3:dbff6b768d28 | 88 | |
icraggs | 3:dbff6b768d28 | 89 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
icraggs | 3:dbff6b768d28 | 90 | if ((rc = ipstack->read(readbuf + len, rem_len, timeout)) != rem_len) |
icraggs | 3:dbff6b768d28 | 91 | goto exit; |
icraggs | 3:dbff6b768d28 | 92 | |
icraggs | 3:dbff6b768d28 | 93 | header.byte = buf[0]; |
icraggs | 3:dbff6b768d28 | 94 | rc = header.bits.type; |
icraggs | 3:dbff6b768d28 | 95 | exit: |
icraggs | 3:dbff6b768d28 | 96 | return rc; |
icraggs | 3:dbff6b768d28 | 97 | } |
icraggs | 3:dbff6b768d28 | 98 | |
icraggs | 3:dbff6b768d28 | 99 | |
icraggs | 3:dbff6b768d28 | 100 | void MQTT::Client::cycle() |
icraggs | 3:dbff6b768d28 | 101 | { |
icraggs | 3:dbff6b768d28 | 102 | int timeout = 1000L; |
icraggs | 3:dbff6b768d28 | 103 | /* get one piece of work off the wire and one pass through */ |
icraggs | 2:dcfdd2abfe71 | 104 | |
icraggs | 3:dbff6b768d28 | 105 | // 1. read the socket, see what work is due. |
icraggs | 3:dbff6b768d28 | 106 | int packet_type = readPacket(buf, buflen, -1); |
icraggs | 2:dcfdd2abfe71 | 107 | |
icraggs | 3:dbff6b768d28 | 108 | } |
icraggs | 3:dbff6b768d28 | 109 | |
icraggs | 3:dbff6b768d28 | 110 | |
icraggs | 3:dbff6b768d28 | 111 | int MQTT::Client::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> resultHandler) |
icraggs | 3:dbff6b768d28 | 112 | { |
icraggs | 3:dbff6b768d28 | 113 | /* 1. connect to the server with the desired transport */ |
icraggs | 3:dbff6b768d28 | 114 | if (!ipstack->connect()) |
icraggs | 3:dbff6b768d28 | 115 | return -99; |
icraggs | 3:dbff6b768d28 | 116 | |
icraggs | 3:dbff6b768d28 | 117 | /* 2. if the connect was successful, send the MQTT connect packet */ |
icraggs | 3:dbff6b768d28 | 118 | int len = MQTTSerialize_connect(buf, buflen, options); |
icraggs | 3:dbff6b768d28 | 119 | sendPacket(len); // send the connect packet |
icraggs | 3:dbff6b768d28 | 120 | |
icraggs | 3:dbff6b768d28 | 121 | /* 3. wait until the connack is received */ |
icraggs | 2:dcfdd2abfe71 | 122 | /* how to make this check work? |
icraggs | 2:dcfdd2abfe71 | 123 | if (resultHandler == None) |
icraggs | 2:dcfdd2abfe71 | 124 | { |
icraggs | 2:dcfdd2abfe71 | 125 | // this will be a blocking call, wait for the connack |
icraggs | 2:dcfdd2abfe71 | 126 | |
icraggs | 2:dcfdd2abfe71 | 127 | }*/ |
icraggs | 2:dcfdd2abfe71 | 128 | |
icraggs | 2:dcfdd2abfe71 | 129 | return len; |
icraggs | 2:dcfdd2abfe71 | 130 | } |
icraggs | 2:dcfdd2abfe71 | 131 |