An API for using MQTT over multiple transports for mbed OS 5

Dependencies:   FP MQTTPacket

Fork of MQTT by MQTT

Committer:
icraggs
Date:
Mon Mar 31 15:48:45 2014 +0000
Revision:
3:dbff6b768d28
Parent:
2:dcfdd2abfe71
Child:
4:4ef00243708e
Move parameters around to avoid storing data

Who changed what in which revision?

UserRevisionLine numberNew contents of line
sam_grove 0:fe461e4d7afe 1 /**
sam_grove 0:fe461e4d7afe 2 * @file MQTTPubSub.cpp
sam_grove 0:fe461e4d7afe 3 * @brief API - for MQTT
sam_grove 0:fe461e4d7afe 4 * @author
sam_grove 0:fe461e4d7afe 5 * @version 1.0
sam_grove 0:fe461e4d7afe 6 * @see
sam_grove 0:fe461e4d7afe 7 *
sam_grove 0:fe461e4d7afe 8 * Copyright (c) 2014
sam_grove 0:fe461e4d7afe 9 *
sam_grove 0:fe461e4d7afe 10 * Licensed under the Apache License, Version 2.0 (the "License");
sam_grove 0:fe461e4d7afe 11 * you may not use this file except in compliance with the License.
sam_grove 0:fe461e4d7afe 12 * You may obtain a copy of the License at
sam_grove 0:fe461e4d7afe 13 *
sam_grove 0:fe461e4d7afe 14 * http://www.apache.org/licenses/LICENSE-2.0
sam_grove 0:fe461e4d7afe 15 *
sam_grove 0:fe461e4d7afe 16 * Unless required by applicable law or agreed to in writing, software
sam_grove 0:fe461e4d7afe 17 * distributed under the License is distributed on an "AS IS" BASIS,
sam_grove 0:fe461e4d7afe 18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
sam_grove 0:fe461e4d7afe 19 * See the License for the specific language governing permissions and
sam_grove 0:fe461e4d7afe 20 * limitations under the License.
sam_grove 0:fe461e4d7afe 21 */
sam_grove 0:fe461e4d7afe 22
icraggs 2:dcfdd2abfe71 23 #include "MQTTClient.h"
icraggs 2:dcfdd2abfe71 24 #include "MQTTPacket.h"
icraggs 2:dcfdd2abfe71 25
icraggs 3:dbff6b768d28 26 MQTT::Client::Client(IPStack* ipstack, const int buffer_size)
icraggs 2:dcfdd2abfe71 27 {
icraggs 2:dcfdd2abfe71 28
icraggs 2:dcfdd2abfe71 29 buf = new char[buffer_size];
icraggs 3:dbff6b768d28 30 this->ipstack = ipstack;
icraggs 3:dbff6b768d28 31 }
icraggs 3:dbff6b768d28 32
icraggs 3:dbff6b768d28 33
icraggs 3:dbff6b768d28 34 int MQTT::Client::sendPacket(int length)
icraggs 3:dbff6b768d28 35 {
icraggs 3:dbff6b768d28 36 int sent = 0;
icraggs 3:dbff6b768d28 37
icraggs 3:dbff6b768d28 38 while (sent < length)
icraggs 3:dbff6b768d28 39 sent += ipstack->write(&buf[sent], length);
icraggs 3:dbff6b768d28 40
icraggs 3:dbff6b768d28 41 return sent;
icraggs 3:dbff6b768d28 42 }
icraggs 3:dbff6b768d28 43
icraggs 3:dbff6b768d28 44
icraggs 3:dbff6b768d28 45 int MQTT::Client::decodePacket(int* value, int timeout)
icraggs 3:dbff6b768d28 46 {
icraggs 3:dbff6b768d28 47 char c;
icraggs 3:dbff6b768d28 48 int multiplier = 1;
icraggs 3:dbff6b768d28 49 int len = 0;
icraggs 3:dbff6b768d28 50 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
icraggs 3:dbff6b768d28 51
icraggs 3:dbff6b768d28 52 *value = 0;
icraggs 3:dbff6b768d28 53 do
icraggs 3:dbff6b768d28 54 {
icraggs 3:dbff6b768d28 55 int rc = MQTTPACKET_READ_ERROR;
icraggs 3:dbff6b768d28 56
icraggs 3:dbff6b768d28 57 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 3:dbff6b768d28 58 {
icraggs 3:dbff6b768d28 59 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 3:dbff6b768d28 60 goto exit;
icraggs 3:dbff6b768d28 61 }
icraggs 3:dbff6b768d28 62 rc = ipstack->read(&c, 1, timeout);
icraggs 3:dbff6b768d28 63 if (rc != 1)
icraggs 3:dbff6b768d28 64 goto exit;
icraggs 3:dbff6b768d28 65 *value += (c & 127) * multiplier;
icraggs 3:dbff6b768d28 66 multiplier *= 128;
icraggs 3:dbff6b768d28 67 } while ((c & 128) != 0);
icraggs 3:dbff6b768d28 68 exit:
icraggs 3:dbff6b768d28 69 return len;
icraggs 2:dcfdd2abfe71 70 }
icraggs 2:dcfdd2abfe71 71
sam_grove 0:fe461e4d7afe 72
icraggs 3:dbff6b768d28 73 int MQTT::Client::readPacket(int timeout)
icraggs 2:dcfdd2abfe71 74 {
icraggs 3:dbff6b768d28 75 int rc = -1;
icraggs 3:dbff6b768d28 76 MQTTHeader header;
icraggs 3:dbff6b768d28 77 int len = 0;
icraggs 3:dbff6b768d28 78 int rem_len = 0;
icraggs 3:dbff6b768d28 79
icraggs 3:dbff6b768d28 80 /* 1. read the header byte. This has the packet type in it */
icraggs 3:dbff6b768d28 81 if ((rc = ipstack->read(readbuf, 1, timeout)) != 1)
icraggs 3:dbff6b768d28 82 goto exit;
icraggs 3:dbff6b768d28 83
icraggs 3:dbff6b768d28 84 len = 1;
icraggs 3:dbff6b768d28 85 /* 2. read the remaining length. This is variable in itself */
icraggs 3:dbff6b768d28 86 decodePacket(&rem_len, timeout);
icraggs 3:dbff6b768d28 87 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
icraggs 3:dbff6b768d28 88
icraggs 3:dbff6b768d28 89 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 3:dbff6b768d28 90 if ((rc = ipstack->read(readbuf + len, rem_len, timeout)) != rem_len)
icraggs 3:dbff6b768d28 91 goto exit;
icraggs 3:dbff6b768d28 92
icraggs 3:dbff6b768d28 93 header.byte = buf[0];
icraggs 3:dbff6b768d28 94 rc = header.bits.type;
icraggs 3:dbff6b768d28 95 exit:
icraggs 3:dbff6b768d28 96 return rc;
icraggs 3:dbff6b768d28 97 }
icraggs 3:dbff6b768d28 98
icraggs 3:dbff6b768d28 99
icraggs 3:dbff6b768d28 100 void MQTT::Client::cycle()
icraggs 3:dbff6b768d28 101 {
icraggs 3:dbff6b768d28 102 int timeout = 1000L;
icraggs 3:dbff6b768d28 103 /* get one piece of work off the wire and one pass through */
icraggs 2:dcfdd2abfe71 104
icraggs 3:dbff6b768d28 105 // 1. read the socket, see what work is due.
icraggs 3:dbff6b768d28 106 int packet_type = readPacket(buf, buflen, -1);
icraggs 2:dcfdd2abfe71 107
icraggs 3:dbff6b768d28 108 }
icraggs 3:dbff6b768d28 109
icraggs 3:dbff6b768d28 110
icraggs 3:dbff6b768d28 111 int MQTT::Client::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> resultHandler)
icraggs 3:dbff6b768d28 112 {
icraggs 3:dbff6b768d28 113 /* 1. connect to the server with the desired transport */
icraggs 3:dbff6b768d28 114 if (!ipstack->connect())
icraggs 3:dbff6b768d28 115 return -99;
icraggs 3:dbff6b768d28 116
icraggs 3:dbff6b768d28 117 /* 2. if the connect was successful, send the MQTT connect packet */
icraggs 3:dbff6b768d28 118 int len = MQTTSerialize_connect(buf, buflen, options);
icraggs 3:dbff6b768d28 119 sendPacket(len); // send the connect packet
icraggs 3:dbff6b768d28 120
icraggs 3:dbff6b768d28 121 /* 3. wait until the connack is received */
icraggs 2:dcfdd2abfe71 122 /* how to make this check work?
icraggs 2:dcfdd2abfe71 123 if (resultHandler == None)
icraggs 2:dcfdd2abfe71 124 {
icraggs 2:dcfdd2abfe71 125 // this will be a blocking call, wait for the connack
icraggs 2:dcfdd2abfe71 126
icraggs 2:dcfdd2abfe71 127 }*/
icraggs 2:dcfdd2abfe71 128
icraggs 2:dcfdd2abfe71 129 return len;
icraggs 2:dcfdd2abfe71 130 }
icraggs 2:dcfdd2abfe71 131