Fork of my MQTTGateway

Dependencies:   mbed-http

Committer:
vpcola
Date:
Sat Apr 08 14:45:51 2017 +0000
Revision:
0:f1d3878b8dd9
Initial commit

Who changed what in which revision?

User | Revision | Line number | New contents of line
vpcola 0:f1d3878b8dd9 1 #ifndef _MQTT_THREADED_CLIENT_H_
vpcola 0:f1d3878b8dd9 2 #define _MQTT_THREADED_CLIENT_H_
vpcola 0:f1d3878b8dd9 3
vpcola 0:f1d3878b8dd9 4 #include "mbed.h"
vpcola 0:f1d3878b8dd9 5 #include "rtos.h"
vpcola 0:f1d3878b8dd9 6 #include "MQTTPacket.h"
vpcola 0:f1d3878b8dd9 7 #include "NetworkInterface.h"
vpcola 0:f1d3878b8dd9 8 #include "FP.h"
vpcola 0:f1d3878b8dd9 9 #include "config.h"
vpcola 0:f1d3878b8dd9 10
vpcola 0:f1d3878b8dd9 11 #include <cstdio>
vpcola 0:f1d3878b8dd9 12 #include <string>
vpcola 0:f1d3878b8dd9 13 #include <map>
vpcola 0:f1d3878b8dd9 14
vpcola 0:f1d3878b8dd9 15 //#define MQTT_DEBUG 1
vpcola 0:f1d3878b8dd9 16
vpcola 0:f1d3878b8dd9 17
vpcola 0:f1d3878b8dd9 18 #define COMMAND_TIMEOUT 5000
vpcola 0:f1d3878b8dd9 19 #define DEFAULT_SOCKET_TIMEOUT 1000
vpcola 0:f1d3878b8dd9 20 #define MAX_MQTT_PACKET_SIZE 500
vpcola 0:f1d3878b8dd9 21 #define MAX_MQTT_PAYLOAD_SIZE 300
vpcola 0:f1d3878b8dd9 22
vpcola 0:f1d3878b8dd9 23 namespace MQTT
vpcola 0:f1d3878b8dd9 24 {
vpcola 0:f1d3878b8dd9 25
// MQTT quality-of-service levels. The enumerators keep their implicit
// values 0, 1 and 2, so code that relies on the numeric value still works.
enum QoS
{
    QOS0,
    QOS1,
    QOS2
};
vpcola 0:f1d3878b8dd9 27
// Status codes returned by the client.
// All failure return codes must be negative; SUCCESS is zero.
enum returnCode
{
    BUFFER_OVERFLOW = -3,
    TIMEOUT = -2,
    FAILURE = -1,
    SUCCESS = 0
};
vpcola 0:f1d3878b8dd9 30
vpcola 0:f1d3878b8dd9 31
vpcola 0:f1d3878b8dd9 32 typedef struct
vpcola 0:f1d3878b8dd9 33 {
vpcola 0:f1d3878b8dd9 34 QoS qos;
vpcola 0:f1d3878b8dd9 35 bool retained;
vpcola 0:f1d3878b8dd9 36 bool dup;
vpcola 0:f1d3878b8dd9 37 unsigned short id;
vpcola 0:f1d3878b8dd9 38 void *payload;
vpcola 0:f1d3878b8dd9 39 size_t payloadlen;
vpcola 0:f1d3878b8dd9 40 }Message, *pMessage;
vpcola 0:f1d3878b8dd9 41
vpcola 0:f1d3878b8dd9 42 // TODO:
vpcola 0:f1d3878b8dd9 43 // Merge this struct with the one above, in order to use the same
vpcola 0:f1d3878b8dd9 44 // data structure for sending and receiving. I need to simplify
vpcola 0:f1d3878b8dd9 45 // the PubMessage to not contain pointers like the one above.
vpcola 0:f1d3878b8dd9 46 typedef struct
vpcola 0:f1d3878b8dd9 47 {
vpcola 0:f1d3878b8dd9 48 char topic[100];
vpcola 0:f1d3878b8dd9 49 QoS qos;
vpcola 0:f1d3878b8dd9 50 unsigned short id;
vpcola 0:f1d3878b8dd9 51 size_t payloadlen;
vpcola 0:f1d3878b8dd9 52 char payload[MAX_MQTT_PAYLOAD_SIZE];
vpcola 0:f1d3878b8dd9 53 }PubMessage, *pPubMessage;
vpcola 0:f1d3878b8dd9 54
vpcola 0:f1d3878b8dd9 55 struct MessageData
vpcola 0:f1d3878b8dd9 56 {
vpcola 0:f1d3878b8dd9 57 MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName)
vpcola 0:f1d3878b8dd9 58 { }
vpcola 0:f1d3878b8dd9 59 Message &message;
vpcola 0:f1d3878b8dd9 60 MQTTString &topicName;
vpcola 0:f1d3878b8dd9 61 };
vpcola 0:f1d3878b8dd9 62
// Generates packet identifiers in the range 1..MAX_PACKET_ID.
// The first call to getNext() returns 1; after MAX_PACKET_ID has been
// handed out the sequence wraps around to 1 again (0 is never returned).
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    // Returns the next identifier and remembers it as the current one.
    int getNext()
    {
        // The new value is computed from the old one and stored exactly
        // once. The previous form, `next = cond ? 1 : ++next`, wrote to
        // `next` twice in one expression, which is unsequenced before
        // C++17.
        next = (next == MAX_PACKET_ID) ? 1 : next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // last identifier handed out; 0 until getNext() is first called
};
vpcola 0:f1d3878b8dd9 80
vpcola 0:f1d3878b8dd9 81
vpcola 0:f1d3878b8dd9 82
vpcola 0:f1d3878b8dd9 83 class MQTTThreadedClient
vpcola 0:f1d3878b8dd9 84 {
vpcola 0:f1d3878b8dd9 85 public:
vpcola 0:f1d3878b8dd9 86 MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
vpcola 0:f1d3878b8dd9 87 : network(aNetwork),
vpcola 0:f1d3878b8dd9 88 ssl_ca_pem(pem),
vpcola 0:f1d3878b8dd9 89 port((pem != NULL) ? 8883 : 1883),
vpcola 0:f1d3878b8dd9 90 queue(32 * EVENTS_EVENT_SIZE),
vpcola 0:f1d3878b8dd9 91 isConnected(false),
vpcola 0:f1d3878b8dd9 92 hasSavedSession(false),
vpcola 0:f1d3878b8dd9 93 useTLS(pem != NULL)
vpcola 0:f1d3878b8dd9 94 {
vpcola 0:f1d3878b8dd9 95 DRBG_PERS = "mbed TLS MQTT client";
vpcola 0:f1d3878b8dd9 96 tcpSocket = new TCPSocket();
vpcola 0:f1d3878b8dd9 97 setupTLS();
vpcola 0:f1d3878b8dd9 98 }
vpcola 0:f1d3878b8dd9 99
vpcola 0:f1d3878b8dd9 100 ~MQTTThreadedClient()
vpcola 0:f1d3878b8dd9 101 {
vpcola 0:f1d3878b8dd9 102 // TODO: signal the thread to shutdown
vpcola 0:f1d3878b8dd9 103 freeTLS();
vpcola 0:f1d3878b8dd9 104
vpcola 0:f1d3878b8dd9 105 if (isConnected)
vpcola 0:f1d3878b8dd9 106 disconnect();
vpcola 0:f1d3878b8dd9 107
vpcola 0:f1d3878b8dd9 108 }
vpcola 0:f1d3878b8dd9 109 /**
vpcola 0:f1d3878b8dd9 110 * Sets the connection parameters. Must be called before running the startListener as a thread.
vpcola 0:f1d3878b8dd9 111 *
vpcola 0:f1d3878b8dd9 112 * @param host - pointer to the host where the MQTT server is running
vpcola 0:f1d3878b8dd9 113 * @param port - the port number to connect, 1883 for non secure connections, 8883 for
vpcola 0:f1d3878b8dd9 114 * secure connections
vpcola 0:f1d3878b8dd9 115 * @param options - the connect data used for logging into the MQTT server.
vpcola 0:f1d3878b8dd9 116 */
vpcola 0:f1d3878b8dd9 117 void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
vpcola 0:f1d3878b8dd9 118 int publish(PubMessage& message);
vpcola 0:f1d3878b8dd9 119
vpcola 0:f1d3878b8dd9 120 void addTopicHandler(const char * topic, void (*function)(MessageData &));
vpcola 0:f1d3878b8dd9 121 template<typename T>
vpcola 0:f1d3878b8dd9 122 void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &))
vpcola 0:f1d3878b8dd9 123 {
vpcola 0:f1d3878b8dd9 124 FP<void,MessageData &> fp;
vpcola 0:f1d3878b8dd9 125 fp.attach(object, member);
vpcola 0:f1d3878b8dd9 126
vpcola 0:f1d3878b8dd9 127 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));
vpcola 0:f1d3878b8dd9 128 }
vpcola 0:f1d3878b8dd9 129
vpcola 0:f1d3878b8dd9 130 // TODO: Add unsubscribe functionality.
vpcola 0:f1d3878b8dd9 131
vpcola 0:f1d3878b8dd9 132 // Start the listener thread and start polling
vpcola 0:f1d3878b8dd9 133 // MQTT server.
vpcola 0:f1d3878b8dd9 134 void startListener();
vpcola 0:f1d3878b8dd9 135 // Stop the listerner thread and closes connection
vpcola 0:f1d3878b8dd9 136 void stopListener();
vpcola 0:f1d3878b8dd9 137
vpcola 0:f1d3878b8dd9 138 protected:
vpcola 0:f1d3878b8dd9 139
vpcola 0:f1d3878b8dd9 140 int handlePublishMsg();
vpcola 0:f1d3878b8dd9 141 void disconnect();
vpcola 0:f1d3878b8dd9 142 int connect();
vpcola 0:f1d3878b8dd9 143
vpcola 0:f1d3878b8dd9 144
vpcola 0:f1d3878b8dd9 145 private:
vpcola 0:f1d3878b8dd9 146 NetworkInterface * network;
vpcola 0:f1d3878b8dd9 147 const char * ssl_ca_pem;
vpcola 0:f1d3878b8dd9 148 TCPSocket * tcpSocket;
vpcola 0:f1d3878b8dd9 149 PacketId packetid;
vpcola 0:f1d3878b8dd9 150 const char *DRBG_PERS;
vpcola 0:f1d3878b8dd9 151 nsapi_error_t _error;
vpcola 0:f1d3878b8dd9 152 // Connection options
vpcola 0:f1d3878b8dd9 153 std::string host;
vpcola 0:f1d3878b8dd9 154 uint16_t port;
vpcola 0:f1d3878b8dd9 155 MQTTPacket_connectData connect_options;
vpcola 0:f1d3878b8dd9 156 // Event queue
vpcola 0:f1d3878b8dd9 157 EventQueue queue;
vpcola 0:f1d3878b8dd9 158 bool isConnected;
vpcola 0:f1d3878b8dd9 159 bool hasSavedSession;
vpcola 0:f1d3878b8dd9 160
vpcola 0:f1d3878b8dd9 161 // TODO: Because I'm using a map, I can only have one handler
vpcola 0:f1d3878b8dd9 162 // for each topic (one that's mapped to the topic string).
vpcola 0:f1d3878b8dd9 163 // Attaching another handler on the same topic is not possible.
vpcola 0:f1d3878b8dd9 164 // In the future, use a vector instead of maps to allow multiple
vpcola 0:f1d3878b8dd9 165 // handlers for the same topic.
vpcola 0:f1d3878b8dd9 166 std::map<std::string, FP<void, MessageData &> > topicCBMap;
vpcola 0:f1d3878b8dd9 167
vpcola 0:f1d3878b8dd9 168 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
vpcola 0:f1d3878b8dd9 169 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
vpcola 0:f1d3878b8dd9 170
vpcola 0:f1d3878b8dd9 171 unsigned int keepAliveInterval;
vpcola 0:f1d3878b8dd9 172 Timer comTimer;
vpcola 0:f1d3878b8dd9 173
vpcola 0:f1d3878b8dd9 174 // SSL/TLS functions
vpcola 0:f1d3878b8dd9 175 bool useTLS;
vpcola 0:f1d3878b8dd9 176 void setupTLS();
vpcola 0:f1d3878b8dd9 177 int initTLS();
vpcola 0:f1d3878b8dd9 178 void freeTLS();
vpcola 0:f1d3878b8dd9 179 int doTLSHandshake();
vpcola 0:f1d3878b8dd9 180
vpcola 0:f1d3878b8dd9 181 int processSubscriptions();
vpcola 0:f1d3878b8dd9 182 int readPacket();
vpcola 0:f1d3878b8dd9 183 int sendPacket(size_t length);
vpcola 0:f1d3878b8dd9 184 int readPacketLength(int* value);
vpcola 0:f1d3878b8dd9 185 int readUntil(int packetType, int timeout);
vpcola 0:f1d3878b8dd9 186 int readBytesToBuffer(char * buffer, size_t size, int timeout);
vpcola 0:f1d3878b8dd9 187 int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
vpcola 0:f1d3878b8dd9 188 bool isTopicMatched(char* topic, MQTTString& topicName);
vpcola 0:f1d3878b8dd9 189 int sendPublish(PubMessage& message);
vpcola 0:f1d3878b8dd9 190 void resetConnectionTimer();
vpcola 0:f1d3878b8dd9 191 void sendPingRequest();
vpcola 0:f1d3878b8dd9 192 bool hasConnectionTimedOut();
vpcola 0:f1d3878b8dd9 193 int login();
vpcola 0:f1d3878b8dd9 194 };
vpcola 0:f1d3878b8dd9 195
vpcola 0:f1d3878b8dd9 196 }
vpcola 0:f1d3878b8dd9 197 #endif