Fork of my original MQTTGateway

Dependencies:   mbed-http

Committer:
vpcola
Date:
Sat Apr 08 14:43:14 2017 +0000
Revision:
0:a1734fe1ec4b
Initial commit

Who changed what in which revision?

User | Revision | Line number | New contents of line
vpcola 0:a1734fe1ec4b 1 #ifndef _MQTT_THREADED_CLIENT_H_
vpcola 0:a1734fe1ec4b 2 #define _MQTT_THREADED_CLIENT_H_
vpcola 0:a1734fe1ec4b 3
vpcola 0:a1734fe1ec4b 4 #include "mbed.h"
vpcola 0:a1734fe1ec4b 5 #include "rtos.h"
vpcola 0:a1734fe1ec4b 6 #include "MQTTPacket.h"
vpcola 0:a1734fe1ec4b 7 #include "NetworkInterface.h"
vpcola 0:a1734fe1ec4b 8 #include "FP.h"
vpcola 0:a1734fe1ec4b 9 #include "config.h"
vpcola 0:a1734fe1ec4b 10
vpcola 0:a1734fe1ec4b 11 #include <cstdio>
vpcola 0:a1734fe1ec4b 12 #include <string>
vpcola 0:a1734fe1ec4b 13 #include <map>
vpcola 0:a1734fe1ec4b 14
vpcola 0:a1734fe1ec4b 15 //#define MQTT_DEBUG 1
vpcola 0:a1734fe1ec4b 16
vpcola 0:a1734fe1ec4b 17
vpcola 0:a1734fe1ec4b 18 #define COMMAND_TIMEOUT 5000
vpcola 0:a1734fe1ec4b 19 #define DEFAULT_SOCKET_TIMEOUT 1000
vpcola 0:a1734fe1ec4b 20 #define MAX_MQTT_PACKET_SIZE 500
vpcola 0:a1734fe1ec4b 21 #define MAX_MQTT_PAYLOAD_SIZE 300
vpcola 0:a1734fe1ec4b 22
vpcola 0:a1734fe1ec4b 23 namespace MQTT
vpcola 0:a1734fe1ec4b 24 {
vpcola 0:a1734fe1ec4b 25
/// MQTT quality-of-service selector; the enumerators have the values 0, 1 and 2.
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};

/// Return codes. SUCCESS is zero and every failure code must be negative.
enum returnCode
{
    SUCCESS         =  0,
    FAILURE         = -1,
    TIMEOUT         = -2,
    BUFFER_OVERFLOW = -3
};
vpcola 0:a1734fe1ec4b 30
vpcola 0:a1734fe1ec4b 31
vpcola 0:a1734fe1ec4b 32 typedef struct
vpcola 0:a1734fe1ec4b 33 {
vpcola 0:a1734fe1ec4b 34 QoS qos;
vpcola 0:a1734fe1ec4b 35 bool retained;
vpcola 0:a1734fe1ec4b 36 bool dup;
vpcola 0:a1734fe1ec4b 37 unsigned short id;
vpcola 0:a1734fe1ec4b 38 void *payload;
vpcola 0:a1734fe1ec4b 39 size_t payloadlen;
vpcola 0:a1734fe1ec4b 40 }Message, *pMessage;
vpcola 0:a1734fe1ec4b 41
vpcola 0:a1734fe1ec4b 42 // TODO:
vpcola 0:a1734fe1ec4b 43 // Merge this struct with the one above, in order to use the same
vpcola 0:a1734fe1ec4b 44 // data structure for sending and receiving. I need to simplify
vpcola 0:a1734fe1ec4b 45 // the PubMessage to not contain pointers like the one above.
vpcola 0:a1734fe1ec4b 46 typedef struct
vpcola 0:a1734fe1ec4b 47 {
vpcola 0:a1734fe1ec4b 48 char topic[100];
vpcola 0:a1734fe1ec4b 49 QoS qos;
vpcola 0:a1734fe1ec4b 50 unsigned short id;
vpcola 0:a1734fe1ec4b 51 size_t payloadlen;
vpcola 0:a1734fe1ec4b 52 char payload[MAX_MQTT_PAYLOAD_SIZE];
vpcola 0:a1734fe1ec4b 53 }PubMessage, *pPubMessage;
vpcola 0:a1734fe1ec4b 54
vpcola 0:a1734fe1ec4b 55 struct MessageData
vpcola 0:a1734fe1ec4b 56 {
vpcola 0:a1734fe1ec4b 57 MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName)
vpcola 0:a1734fe1ec4b 58 { }
vpcola 0:a1734fe1ec4b 59 Message &message;
vpcola 0:a1734fe1ec4b 60 MQTTString &topicName;
vpcola 0:a1734fe1ec4b 61 };
vpcola 0:a1734fe1ec4b 62
/// Generator of packet identifiers in the range 1..MAX_PACKET_ID.
/// Identifiers are handed out sequentially and wrap back to 1 after
/// MAX_PACKET_ID; 0 is never returned.
class PacketId
{
public:
    /// Starts the counter at 0 so that the first getNext() call yields 1.
    PacketId() : next(0)
    {
    }

    /// Advances the counter and returns the new identifier.
    /// @return the next id: previous id + 1, or 1 when the previous id was
    ///         MAX_PACKET_ID.
    int getNext()
    {
        // Written as separate statements so that `next` is modified exactly
        // once per call; combining the assignment with a pre-increment of
        // the same variable in one expression is undefined before C++11.
        if (next == MAX_PACKET_ID)
            next = 1;
        else
            ++next;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;  // largest value of a 16-bit identifier
    int next;                                // most recently issued identifier (0 = none yet)
};
vpcola 0:a1734fe1ec4b 80
vpcola 0:a1734fe1ec4b 81
vpcola 0:a1734fe1ec4b 82
vpcola 0:a1734fe1ec4b 83 class MQTTThreadedClient
vpcola 0:a1734fe1ec4b 84 {
vpcola 0:a1734fe1ec4b 85 public:
vpcola 0:a1734fe1ec4b 86 MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
vpcola 0:a1734fe1ec4b 87 : network(aNetwork),
vpcola 0:a1734fe1ec4b 88 ssl_ca_pem(pem),
vpcola 0:a1734fe1ec4b 89 port((pem != NULL) ? 8883 : 1883),
vpcola 0:a1734fe1ec4b 90 queue(32 * EVENTS_EVENT_SIZE),
vpcola 0:a1734fe1ec4b 91 isConnected(false),
vpcola 0:a1734fe1ec4b 92 hasSavedSession(false),
vpcola 0:a1734fe1ec4b 93 useTLS(pem != NULL)
vpcola 0:a1734fe1ec4b 94 {
vpcola 0:a1734fe1ec4b 95 DRBG_PERS = "mbed TLS MQTT client";
vpcola 0:a1734fe1ec4b 96 tcpSocket = new TCPSocket();
vpcola 0:a1734fe1ec4b 97 setupTLS();
vpcola 0:a1734fe1ec4b 98 }
vpcola 0:a1734fe1ec4b 99
vpcola 0:a1734fe1ec4b 100 ~MQTTThreadedClient()
vpcola 0:a1734fe1ec4b 101 {
vpcola 0:a1734fe1ec4b 102 // TODO: signal the thread to shutdown
vpcola 0:a1734fe1ec4b 103 freeTLS();
vpcola 0:a1734fe1ec4b 104
vpcola 0:a1734fe1ec4b 105 if (isConnected)
vpcola 0:a1734fe1ec4b 106 disconnect();
vpcola 0:a1734fe1ec4b 107
vpcola 0:a1734fe1ec4b 108 }
vpcola 0:a1734fe1ec4b 109 /**
vpcola 0:a1734fe1ec4b 110 * Sets the connection parameters. Must be called before running the startListener as a thread.
vpcola 0:a1734fe1ec4b 111 *
vpcola 0:a1734fe1ec4b 112 * @param host - pointer to the host where the MQTT server is running
vpcola 0:a1734fe1ec4b 113 * @param port - the port number to connect, 1883 for non secure connections, 8883 for
vpcola 0:a1734fe1ec4b 114 * secure connections
vpcola 0:a1734fe1ec4b 115 * @param options - the connect data used for logging into the MQTT server.
vpcola 0:a1734fe1ec4b 116 */
vpcola 0:a1734fe1ec4b 117 void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
vpcola 0:a1734fe1ec4b 118 int publish(PubMessage& message);
vpcola 0:a1734fe1ec4b 119
vpcola 0:a1734fe1ec4b 120 void addTopicHandler(const char * topic, void (*function)(MessageData &));
vpcola 0:a1734fe1ec4b 121 template<typename T>
vpcola 0:a1734fe1ec4b 122 void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &))
vpcola 0:a1734fe1ec4b 123 {
vpcola 0:a1734fe1ec4b 124 FP<void,MessageData &> fp;
vpcola 0:a1734fe1ec4b 125 fp.attach(object, member);
vpcola 0:a1734fe1ec4b 126
vpcola 0:a1734fe1ec4b 127 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));
vpcola 0:a1734fe1ec4b 128 }
vpcola 0:a1734fe1ec4b 129
vpcola 0:a1734fe1ec4b 130 // TODO: Add unsubscribe functionality.
vpcola 0:a1734fe1ec4b 131
vpcola 0:a1734fe1ec4b 132 // Start the listener thread and start polling
vpcola 0:a1734fe1ec4b 133 // MQTT server.
vpcola 0:a1734fe1ec4b 134 void startListener();
vpcola 0:a1734fe1ec4b 135 // Stop the listerner thread and closes connection
vpcola 0:a1734fe1ec4b 136 void stopListener();
vpcola 0:a1734fe1ec4b 137
vpcola 0:a1734fe1ec4b 138 protected:
vpcola 0:a1734fe1ec4b 139
vpcola 0:a1734fe1ec4b 140 int handlePublishMsg();
vpcola 0:a1734fe1ec4b 141 void disconnect();
vpcola 0:a1734fe1ec4b 142 int connect();
vpcola 0:a1734fe1ec4b 143
vpcola 0:a1734fe1ec4b 144
vpcola 0:a1734fe1ec4b 145 private:
vpcola 0:a1734fe1ec4b 146 NetworkInterface * network;
vpcola 0:a1734fe1ec4b 147 const char * ssl_ca_pem;
vpcola 0:a1734fe1ec4b 148 TCPSocket * tcpSocket;
vpcola 0:a1734fe1ec4b 149 PacketId packetid;
vpcola 0:a1734fe1ec4b 150 const char *DRBG_PERS;
vpcola 0:a1734fe1ec4b 151 nsapi_error_t _error;
vpcola 0:a1734fe1ec4b 152 // Connection options
vpcola 0:a1734fe1ec4b 153 std::string host;
vpcola 0:a1734fe1ec4b 154 uint16_t port;
vpcola 0:a1734fe1ec4b 155 MQTTPacket_connectData connect_options;
vpcola 0:a1734fe1ec4b 156 // Event queue
vpcola 0:a1734fe1ec4b 157 EventQueue queue;
vpcola 0:a1734fe1ec4b 158 bool isConnected;
vpcola 0:a1734fe1ec4b 159 bool hasSavedSession;
vpcola 0:a1734fe1ec4b 160
vpcola 0:a1734fe1ec4b 161 // TODO: Because I'm using a map, I can only have one handler
vpcola 0:a1734fe1ec4b 162 // for each topic (one that's mapped to the topic string).
vpcola 0:a1734fe1ec4b 163 // Attaching another handler on the same topic is not possible.
vpcola 0:a1734fe1ec4b 164 // In the future, use a vector instead of maps to allow multiple
vpcola 0:a1734fe1ec4b 165 // handlers for the same topic.
vpcola 0:a1734fe1ec4b 166 std::map<std::string, FP<void, MessageData &> > topicCBMap;
vpcola 0:a1734fe1ec4b 167
vpcola 0:a1734fe1ec4b 168 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
vpcola 0:a1734fe1ec4b 169 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
vpcola 0:a1734fe1ec4b 170
vpcola 0:a1734fe1ec4b 171 unsigned int keepAliveInterval;
vpcola 0:a1734fe1ec4b 172 Timer comTimer;
vpcola 0:a1734fe1ec4b 173
vpcola 0:a1734fe1ec4b 174 // SSL/TLS functions
vpcola 0:a1734fe1ec4b 175 bool useTLS;
vpcola 0:a1734fe1ec4b 176 void setupTLS();
vpcola 0:a1734fe1ec4b 177 int initTLS();
vpcola 0:a1734fe1ec4b 178 void freeTLS();
vpcola 0:a1734fe1ec4b 179 int doTLSHandshake();
vpcola 0:a1734fe1ec4b 180
vpcola 0:a1734fe1ec4b 181 int processSubscriptions();
vpcola 0:a1734fe1ec4b 182 int readPacket();
vpcola 0:a1734fe1ec4b 183 int sendPacket(size_t length);
vpcola 0:a1734fe1ec4b 184 int readPacketLength(int* value);
vpcola 0:a1734fe1ec4b 185 int readUntil(int packetType, int timeout);
vpcola 0:a1734fe1ec4b 186 int readBytesToBuffer(char * buffer, size_t size, int timeout);
vpcola 0:a1734fe1ec4b 187 int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
vpcola 0:a1734fe1ec4b 188 bool isTopicMatched(char* topic, MQTTString& topicName);
vpcola 0:a1734fe1ec4b 189 int sendPublish(PubMessage& message);
vpcola 0:a1734fe1ec4b 190 void resetConnectionTimer();
vpcola 0:a1734fe1ec4b 191 void sendPingRequest();
vpcola 0:a1734fe1ec4b 192 bool hasConnectionTimedOut();
vpcola 0:a1734fe1ec4b 193 int login();
vpcola 0:a1734fe1ec4b 194 };
vpcola 0:a1734fe1ec4b 195
vpcola 0:a1734fe1ec4b 196 }
vpcola 0:a1734fe1ec4b 197 #endif