MQTT lib for WISEAgent

Dependencies:   FP MQTTPacket

Dependents:   MQTT_G_SENSOR

Fork of MQTT by MQTT

Committer:
icraggs
Date:
Mon Apr 07 12:24:36 2014 +0000
Revision:
4:4ef00243708e
Parent:
3:dbff6b768d28
Child:
5:389ccac5a50c
Templates for both networking and tasks

Who changed what in which revision?

User | Revision | Line number | New contents of line
sam_grove 0:fe461e4d7afe 1 /**
sam_grove 0:fe461e4d7afe 2 * @file MQTTPubSub.cpp
sam_grove 0:fe461e4d7afe 3 * @brief API - for MQTT
sam_grove 0:fe461e4d7afe 4 * @author
sam_grove 0:fe461e4d7afe 5 * @version 1.0
sam_grove 0:fe461e4d7afe 6 * @see
sam_grove 0:fe461e4d7afe 7 *
sam_grove 0:fe461e4d7afe 8 * Copyright (c) 2014
sam_grove 0:fe461e4d7afe 9 *
sam_grove 0:fe461e4d7afe 10 * Licensed under the Apache License, Version 2.0 (the "License");
sam_grove 0:fe461e4d7afe 11 * you may not use this file except in compliance with the License.
sam_grove 0:fe461e4d7afe 12 * You may obtain a copy of the License at
sam_grove 0:fe461e4d7afe 13 *
sam_grove 0:fe461e4d7afe 14 * http://www.apache.org/licenses/LICENSE-2.0
sam_grove 0:fe461e4d7afe 15 *
sam_grove 0:fe461e4d7afe 16 * Unless required by applicable law or agreed to in writing, software
sam_grove 0:fe461e4d7afe 17 * distributed under the License is distributed on an "AS IS" BASIS,
sam_grove 0:fe461e4d7afe 18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
sam_grove 0:fe461e4d7afe 19 * See the License for the specific language governing permissions and
sam_grove 0:fe461e4d7afe 20 * limitations under the License.
sam_grove 0:fe461e4d7afe 21 */
sam_grove 0:fe461e4d7afe 22
icraggs 2:dcfdd2abfe71 23 #include "MQTTClient.h"
icraggs 2:dcfdd2abfe71 24 #include "MQTTPacket.h"
icraggs 2:dcfdd2abfe71 25
icraggs 4:4ef00243708e 26 template<class Network, class Thread> MQTT::Client<Network, Thread>::Client(Network* network, const int buffer_size, const int command_timeout)
icraggs 2:dcfdd2abfe71 27 {
icraggs 2:dcfdd2abfe71 28
icraggs 2:dcfdd2abfe71 29 buf = new char[buffer_size];
icraggs 3:dbff6b768d28 30 this->ipstack = ipstack;
icraggs 4:4ef00243708e 31 this->command_timeout = command_timeout;
icraggs 4:4ef00243708e 32 this->thread = new Thread(0); // only need a background thread for non-blocking mode
icraggs 4:4ef00243708e 33 this->ipstack = network;
icraggs 3:dbff6b768d28 34 }
icraggs 3:dbff6b768d28 35
icraggs 3:dbff6b768d28 36
icraggs 4:4ef00243708e 37 template<class Network, class Thread> int MQTT::Client<Network, Thread>::sendPacket(int length)
icraggs 3:dbff6b768d28 38 {
icraggs 3:dbff6b768d28 39 int sent = 0;
icraggs 3:dbff6b768d28 40
icraggs 3:dbff6b768d28 41 while (sent < length)
icraggs 4:4ef00243708e 42 sent += ipstack->write(&buf[sent], length, -1);
icraggs 3:dbff6b768d28 43
icraggs 3:dbff6b768d28 44 return sent;
icraggs 3:dbff6b768d28 45 }
icraggs 3:dbff6b768d28 46
icraggs 3:dbff6b768d28 47
icraggs 4:4ef00243708e 48 template<class Network, class Thread> int MQTT::Client<Network, Thread>::decodePacket(int* value, int timeout)
icraggs 3:dbff6b768d28 49 {
icraggs 3:dbff6b768d28 50 char c;
icraggs 3:dbff6b768d28 51 int multiplier = 1;
icraggs 3:dbff6b768d28 52 int len = 0;
icraggs 3:dbff6b768d28 53 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
icraggs 3:dbff6b768d28 54
icraggs 3:dbff6b768d28 55 *value = 0;
icraggs 3:dbff6b768d28 56 do
icraggs 3:dbff6b768d28 57 {
icraggs 3:dbff6b768d28 58 int rc = MQTTPACKET_READ_ERROR;
icraggs 3:dbff6b768d28 59
icraggs 3:dbff6b768d28 60 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
icraggs 3:dbff6b768d28 61 {
icraggs 3:dbff6b768d28 62 rc = MQTTPACKET_READ_ERROR; /* bad data */
icraggs 3:dbff6b768d28 63 goto exit;
icraggs 3:dbff6b768d28 64 }
icraggs 3:dbff6b768d28 65 rc = ipstack->read(&c, 1, timeout);
icraggs 3:dbff6b768d28 66 if (rc != 1)
icraggs 3:dbff6b768d28 67 goto exit;
icraggs 3:dbff6b768d28 68 *value += (c & 127) * multiplier;
icraggs 3:dbff6b768d28 69 multiplier *= 128;
icraggs 3:dbff6b768d28 70 } while ((c & 128) != 0);
icraggs 3:dbff6b768d28 71 exit:
icraggs 3:dbff6b768d28 72 return len;
icraggs 2:dcfdd2abfe71 73 }
icraggs 2:dcfdd2abfe71 74
sam_grove 0:fe461e4d7afe 75
icraggs 4:4ef00243708e 76 template<class Network, class Thread> int MQTT::Client<Network,Thread>::readPacket(int timeout)
icraggs 2:dcfdd2abfe71 77 {
icraggs 3:dbff6b768d28 78 int rc = -1;
icraggs 4:4ef00243708e 79 MQTTHeader header = {0};
icraggs 3:dbff6b768d28 80 int len = 0;
icraggs 3:dbff6b768d28 81 int rem_len = 0;
icraggs 3:dbff6b768d28 82
icraggs 3:dbff6b768d28 83 /* 1. read the header byte. This has the packet type in it */
icraggs 3:dbff6b768d28 84 if ((rc = ipstack->read(readbuf, 1, timeout)) != 1)
icraggs 3:dbff6b768d28 85 goto exit;
icraggs 3:dbff6b768d28 86
icraggs 3:dbff6b768d28 87 len = 1;
icraggs 3:dbff6b768d28 88 /* 2. read the remaining length. This is variable in itself */
icraggs 3:dbff6b768d28 89 decodePacket(&rem_len, timeout);
icraggs 3:dbff6b768d28 90 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
icraggs 3:dbff6b768d28 91
icraggs 3:dbff6b768d28 92 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
icraggs 3:dbff6b768d28 93 if ((rc = ipstack->read(readbuf + len, rem_len, timeout)) != rem_len)
icraggs 3:dbff6b768d28 94 goto exit;
icraggs 3:dbff6b768d28 95
icraggs 3:dbff6b768d28 96 header.byte = buf[0];
icraggs 3:dbff6b768d28 97 rc = header.bits.type;
icraggs 3:dbff6b768d28 98 exit:
icraggs 3:dbff6b768d28 99 return rc;
icraggs 3:dbff6b768d28 100 }
icraggs 3:dbff6b768d28 101
icraggs 3:dbff6b768d28 102
icraggs 4:4ef00243708e 103 template<class Network, class Thread> int MQTT::Client<Network, Thread>::cycle()
icraggs 3:dbff6b768d28 104 {
icraggs 3:dbff6b768d28 105 int timeout = 1000L;
icraggs 3:dbff6b768d28 106 /* get one piece of work off the wire and one pass through */
icraggs 2:dcfdd2abfe71 107
icraggs 3:dbff6b768d28 108 // 1. read the socket, see what work is due.
icraggs 4:4ef00243708e 109 int packet_type = readPacket(-1);
icraggs 2:dcfdd2abfe71 110
icraggs 4:4ef00243708e 111 switch (packet_type)
icraggs 4:4ef00243708e 112 {
icraggs 4:4ef00243708e 113 case CONNACK:
icraggs 4:4ef00243708e 114 break;
icraggs 4:4ef00243708e 115 case PUBACK:
icraggs 4:4ef00243708e 116 break;
icraggs 4:4ef00243708e 117 case SUBACK:
icraggs 4:4ef00243708e 118 break;
icraggs 4:4ef00243708e 119 case PUBREC:
icraggs 4:4ef00243708e 120 break;
icraggs 4:4ef00243708e 121 case PUBCOMP:
icraggs 4:4ef00243708e 122 break;
icraggs 4:4ef00243708e 123 case PINGRESP:
icraggs 4:4ef00243708e 124 break;
icraggs 4:4ef00243708e 125 }
icraggs 4:4ef00243708e 126 return packet_type;
icraggs 3:dbff6b768d28 127 }
icraggs 3:dbff6b768d28 128
icraggs 3:dbff6b768d28 129
icraggs 4:4ef00243708e 130 template<class Network, class Thread> int MQTT::Client<Network, Thread>::connect(MQTTPacket_connectData* options, FP<void, MQTT::Result*> *resultHandler)
icraggs 3:dbff6b768d28 131 {
icraggs 3:dbff6b768d28 132 /* 1. connect to the server with the desired transport */
icraggs 4:4ef00243708e 133 /*if (!ipstack->connect())
icraggs 4:4ef00243708e 134 return -99;*/
icraggs 3:dbff6b768d28 135
icraggs 3:dbff6b768d28 136 /* 2. if the connect was successful, send the MQTT connect packet */
icraggs 3:dbff6b768d28 137 int len = MQTTSerialize_connect(buf, buflen, options);
icraggs 3:dbff6b768d28 138 sendPacket(len); // send the connect packet
icraggs 3:dbff6b768d28 139
icraggs 3:dbff6b768d28 140 /* 3. wait until the connack is received */
icraggs 4:4ef00243708e 141 if (resultHandler == 0)
icraggs 2:dcfdd2abfe71 142 {
icraggs 2:dcfdd2abfe71 143 // this will be a blocking call, wait for the connack
icraggs 4:4ef00243708e 144 //waitfor(CONNACK);
icraggs 4:4ef00243708e 145 }
icraggs 4:4ef00243708e 146 else
icraggs 4:4ef00243708e 147 {
icraggs 4:4ef00243708e 148 // set connect response callback function
icraggs 4:4ef00243708e 149 }
icraggs 2:dcfdd2abfe71 150
icraggs 2:dcfdd2abfe71 151 return len;
icraggs 2:dcfdd2abfe71 152 }