An API for using MQTT over multiple transports for mbed OS 5

Dependencies:   FP MQTTPacket

Fork of MQTT by MQTT

Committer:
Ian Craggs
Date:
Mon Apr 07 23:52:57 2014 +0100
Revision:
5:389ccac5a50c
Parent:
4:4ef00243708e
Child:
6:4d312a49200b
Latest updates

Who changed what in which revision?

UserRevisionLine numberNew contents of line
sam_grove 0:fe461e4d7afe 1 /**
sam_grove 0:fe461e4d7afe 2 * @file MQTTPubSub.cpp
sam_grove 0:fe461e4d7afe 3 * @brief API - for MQTT
sam_grove 0:fe461e4d7afe 4 * @author
sam_grove 0:fe461e4d7afe 5 * @version 1.0
sam_grove 0:fe461e4d7afe 6 * @see
sam_grove 0:fe461e4d7afe 7 *
sam_grove 0:fe461e4d7afe 8 * Copyright (c) 2014
sam_grove 0:fe461e4d7afe 9 *
sam_grove 0:fe461e4d7afe 10 * Licensed under the Apache License, Version 2.0 (the "License");
sam_grove 0:fe461e4d7afe 11 * you may not use this file except in compliance with the License.
sam_grove 0:fe461e4d7afe 12 * You may obtain a copy of the License at
sam_grove 0:fe461e4d7afe 13 *
sam_grove 0:fe461e4d7afe 14 * http://www.apache.org/licenses/LICENSE-2.0
sam_grove 0:fe461e4d7afe 15 *
sam_grove 0:fe461e4d7afe 16 * Unless required by applicable law or agreed to in writing, software
sam_grove 0:fe461e4d7afe 17 * distributed under the License is distributed on an "AS IS" BASIS,
sam_grove 0:fe461e4d7afe 18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
sam_grove 0:fe461e4d7afe 19 * See the License for the specific language governing permissions and
sam_grove 0:fe461e4d7afe 20 * limitations under the License.
sam_grove 0:fe461e4d7afe 21 */
sam_grove 0:fe461e4d7afe 22
icraggs 2:dcfdd2abfe71 23 #include "MQTTClient.h"
icraggs 2:dcfdd2abfe71 24 #include "MQTTPacket.h"
icraggs 2:dcfdd2abfe71 25
icraggs 4:4ef00243708e 26 template<class Network, class Thread> MQTT::Client<Network, Thread>::Client(Network* network, const int buffer_size, const int command_timeout)
icraggs 2:dcfdd2abfe71 27 {
icraggs 2:dcfdd2abfe71 28
Ian Craggs 5:389ccac5a50c 29 buf = new char[buffer_size];
Ian Craggs 5:389ccac5a50c 30 readbuf = new char[buffer_size];
icraggs 3:dbff6b768d28 31 this->ipstack = ipstack;
icraggs 4:4ef00243708e 32 this->command_timeout = command_timeout;
Ian Craggs 5:389ccac5a50c 33 //this->thread = new Thread(0); // only need a background thread for non-blocking mode
icraggs 4:4ef00243708e 34 this->ipstack = network;
icraggs 3:dbff6b768d28 35 }
icraggs 3:dbff6b768d28 36
icraggs 3:dbff6b768d28 37
icraggs 4:4ef00243708e 38 template<class Network, class Thread> int MQTT::Client<Network, Thread>::sendPacket(int length)
icraggs 3:dbff6b768d28 39 {
icraggs 3:dbff6b768d28 40 int sent = 0;
icraggs 3:dbff6b768d28 41
icraggs 3:dbff6b768d28 42 while (sent < length)
icraggs 4:4ef00243708e 43 sent += ipstack->write(&buf[sent], length, -1);
icraggs 3:dbff6b768d28 44
icraggs 3:dbff6b768d28 45 return sent;
icraggs 3:dbff6b768d28 46 }
icraggs 3:dbff6b768d28 47
icraggs 3:dbff6b768d28 48
icraggs 4:4ef00243708e 49 template<class Network, class Thread> int MQTT::Client<Network, Thread>::decodePacket(int* value, int timeout)
icraggs 3:dbff6b768d28 50 {
icraggs 3:dbff6b768d28 51 char c;
icraggs 3:dbff6b768d28 52 int multiplier = 1;
icraggs 3:dbff6b768d28 53 int len = 0;
icraggs 3:dbff6b768d28 54 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
icraggs 3:dbff6b768d28 55
icraggs 3:dbff6b768d28 56 *value = 0;
icraggs 3:dbff6b768d28 57 do
icraggs 3:dbff6b768d28 58 {
icraggs 3:dbff6b768d28 59 int rc = MQTTPACKET_READ_ERROR;
icraggs 3:dbff6b768d28 60
icraggs 3:dbff6b768d28 61 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 3:dbff6b768d28 62 {
icraggs 3:dbff6b768d28 63 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 3:dbff6b768d28 64 goto exit;
icraggs 3:dbff6b768d28 65 }
icraggs 3:dbff6b768d28 66 rc = ipstack->read(&c, 1, timeout);
icraggs 3:dbff6b768d28 67 if (rc != 1)
icraggs 3:dbff6b768d28 68 goto exit;
icraggs 3:dbff6b768d28 69 *value += (c & 127) * multiplier;
icraggs 3:dbff6b768d28 70 multiplier *= 128;
icraggs 3:dbff6b768d28 71 } while ((c & 128) != 0);
icraggs 3:dbff6b768d28 72 exit:
icraggs 3:dbff6b768d28 73 return len;
icraggs 2:dcfdd2abfe71 74 }
icraggs 2:dcfdd2abfe71 75
sam_grove 0:fe461e4d7afe 76
icraggs 4:4ef00243708e 77 template<class Network, class Thread> int MQTT::Client<Network,Thread>::readPacket(int timeout)
icraggs 2:dcfdd2abfe71 78 {
icraggs 3:dbff6b768d28 79 int rc = -1;
icraggs 4:4ef00243708e 80 MQTTHeader header = {0};
icraggs 3:dbff6b768d28 81 int len = 0;
icraggs 3:dbff6b768d28 82 int rem_len = 0;
icraggs 3:dbff6b768d28 83
icraggs 3:dbff6b768d28 84 /* 1. read the header byte. This has the packet type in it */
icraggs 3:dbff6b768d28 85 if ((rc = ipstack->read(readbuf, 1, timeout)) != 1)
icraggs 3:dbff6b768d28 86 goto exit;
icraggs 3:dbff6b768d28 87
icraggs 3:dbff6b768d28 88 len = 1;
icraggs 3:dbff6b768d28 89 /* 2. read the remaining length. This is variable in itself */
icraggs 3:dbff6b768d28 90 decodePacket(&rem_len, timeout);
icraggs 3:dbff6b768d28 91 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
icraggs 3:dbff6b768d28 92
icraggs 3:dbff6b768d28 93 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 3:dbff6b768d28 94 if ((rc = ipstack->read(readbuf + len, rem_len, timeout)) != rem_len)
icraggs 3:dbff6b768d28 95 goto exit;
icraggs 3:dbff6b768d28 96
Ian Craggs 5:389ccac5a50c 97 header.byte = readbuf[0];
icraggs 3:dbff6b768d28 98 rc = header.bits.type;
icraggs 3:dbff6b768d28 99 exit:
icraggs 3:dbff6b768d28 100 return rc;
icraggs 3:dbff6b768d28 101 }
icraggs 3:dbff6b768d28 102
icraggs 3:dbff6b768d28 103
icraggs 4:4ef00243708e 104 template<class Network, class Thread> int MQTT::Client<Network, Thread>::cycle()
icraggs 3:dbff6b768d28 105 {
icraggs 3:dbff6b768d28 106 int timeout = 1000L;
icraggs 3:dbff6b768d28 107 /* get one piece of work off the wire and one pass through */
icraggs 2:dcfdd2abfe71 108
icraggs 3:dbff6b768d28 109 // 1. read the socket, see what work is due.
Ian Craggs 5:389ccac5a50c 110 int packet_type = readPacket(-1);
Ian Craggs 5:389ccac5a50c 111
Ian Craggs 5:389ccac5a50c 112 printf("packet type %d\n", packet_type);
icraggs 2:dcfdd2abfe71 113
icraggs 4:4ef00243708e 114 switch (packet_type)
icraggs 4:4ef00243708e 115 {
Ian Craggs 5:389ccac5a50c 116 case CONNACK:
Ian Craggs 5:389ccac5a50c 117 printf("connack received\n");
icraggs 4:4ef00243708e 118 break;
icraggs 4:4ef00243708e 119 case PUBACK:
icraggs 4:4ef00243708e 120 break;
icraggs 4:4ef00243708e 121 case SUBACK:
icraggs 4:4ef00243708e 122 break;
icraggs 4:4ef00243708e 123 case PUBREC:
icraggs 4:4ef00243708e 124 break;
icraggs 4:4ef00243708e 125 case PUBCOMP:
icraggs 4:4ef00243708e 126 break;
icraggs 4:4ef00243708e 127 case PINGRESP:
icraggs 4:4ef00243708e 128 break;
icraggs 4:4ef00243708e 129 }
icraggs 4:4ef00243708e 130 return packet_type;
icraggs 3:dbff6b768d28 131 }
icraggs 3:dbff6b768d28 132
icraggs 3:dbff6b768d28 133
icraggs 4:4ef00243708e 134 template<class Network, class Thread> int MQTT::Client<Network, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler)
icraggs 3:dbff6b768d28 135 {
Ian Craggs 5:389ccac5a50c 136 int len = 0;
Ian Craggs 5:389ccac5a50c 137 int rc = -99;
Ian Craggs 5:389ccac5a50c 138
Ian Craggs 5:389ccac5a50c 139 /* 2. if the connect was successful, send the MQTT connect packet */
Ian Craggs 5:389ccac5a50c 140 if (options == 0)
Ian Craggs 5:389ccac5a50c 141 {
Ian Craggs 5:389ccac5a50c 142 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
Ian Craggs 5:389ccac5a50c 143 default_options.clientID.cstring = "me";
Ian Craggs 5:389ccac5a50c 144 len = MQTTSerialize_connect(buf, buflen, &default_options);
Ian Craggs 5:389ccac5a50c 145 }
Ian Craggs 5:389ccac5a50c 146 else
Ian Craggs 5:389ccac5a50c 147 len = MQTTSerialize_connect(buf, buflen, options);
Ian Craggs 5:389ccac5a50c 148 rc = sendPacket(len); // send the connect packet
Ian Craggs 5:389ccac5a50c 149 printf("rc from send is %d\n", rc);
icraggs 3:dbff6b768d28 150
icraggs 3:dbff6b768d28 151 /* 3. wait until the connack is received */
icraggs 4:4ef00243708e 152 if (resultHandler == 0)
icraggs 2:dcfdd2abfe71 153 {
Ian Craggs 5:389ccac5a50c 154 // this will be a blocking call, wait for the connack
Ian Craggs 5:389ccac5a50c 155 if (cycle() == CONNACK)
Ian Craggs 5:389ccac5a50c 156 {
Ian Craggs 5:389ccac5a50c 157 int connack_rc = -1;
Ian Craggs 5:389ccac5a50c 158 if (MQTTDeserialize_connack(&connack_rc, readbuf, readbuflen) == 1)
Ian Craggs 5:389ccac5a50c 159 rc = connack_rc;
Ian Craggs 5:389ccac5a50c 160 }
icraggs 4:4ef00243708e 161 }
icraggs 4:4ef00243708e 162 else
icraggs 4:4ef00243708e 163 {
icraggs 4:4ef00243708e 164 // set connect response callback function
icraggs 4:4ef00243708e 165 }
icraggs 2:dcfdd2abfe71 166
Ian Craggs 5:389ccac5a50c 167 return rc;
icraggs 2:dcfdd2abfe71 168 }