A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Committer:
vpcola
Date:
Sat Apr 01 12:41:29 2017 +0000
Revision:
31:d34f6adb7a53
Parent:
27:c90092f35d79
Testing on NUCLEO L476RG

Who changed what in which revision?

UserRevisionLine numberNew contents of line
vpcola 23:06fac173529e 1 #ifndef _MQTT_THREADED_CLIENT_H_
vpcola 23:06fac173529e 2 #define _MQTT_THREADED_CLIENT_H_
vpcola 23:06fac173529e 3
vpcola 23:06fac173529e 4 #include "mbed.h"
vpcola 23:06fac173529e 5 #include "rtos.h"
vpcola 23:06fac173529e 6 #include "MQTTPacket.h"
vpcola 23:06fac173529e 7 #include "NetworkInterface.h"
vpcola 23:06fac173529e 8 #include "FP.h"
vpcola 23:06fac173529e 9
vpcola 25:326f00faa092 10
vpcola 23:06fac173529e 11 #include <cstdio>
vpcola 23:06fac173529e 12 #include <string>
vpcola 23:06fac173529e 13 #include <map>
vpcola 23:06fac173529e 14
vpcola 31:d34f6adb7a53 15 //#define MQTT_DEBUG 1
vpcola 31:d34f6adb7a53 16
vpcola 31:d34f6adb7a53 17 #ifdef MQTT_DEBUG
vpcola 31:d34f6adb7a53 18 #define DBG(fmt, args...) printf(fmt, ## args)
vpcola 31:d34f6adb7a53 19 #else
vpcola 31:d34f6adb7a53 20 #define DBG(fmt, args...) /* Don't do anything in release builds */
vpcola 31:d34f6adb7a53 21 #endif
vpcola 31:d34f6adb7a53 22
vpcola 23:06fac173529e 23 #define COMMAND_TIMEOUT 5000
vpcola 23:06fac173529e 24 #define DEFAULT_SOCKET_TIMEOUT 1000
vpcola 31:d34f6adb7a53 25 #define MAX_MQTT_PACKET_SIZE 500
vpcola 31:d34f6adb7a53 26 #define MAX_MQTT_PAYLOAD_SIZE 300
vpcola 23:06fac173529e 27
vpcola 31:d34f6adb7a53 28 namespace MQTT
vpcola 31:d34f6adb7a53 29 {
vpcola 31:d34f6adb7a53 30
vpcola 23:06fac173529e 31 typedef enum { QOS0, QOS1, QOS2 } QoS;
vpcola 23:06fac173529e 32
vpcola 23:06fac173529e 33 // all failure return codes must be negative
vpcola 23:06fac173529e 34 typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode;
vpcola 23:06fac173529e 35
vpcola 23:06fac173529e 36
vpcola 23:06fac173529e 37 typedef struct
vpcola 23:06fac173529e 38 {
vpcola 23:06fac173529e 39 QoS qos;
vpcola 23:06fac173529e 40 bool retained;
vpcola 23:06fac173529e 41 bool dup;
vpcola 23:06fac173529e 42 unsigned short id;
vpcola 23:06fac173529e 43 void *payload;
vpcola 23:06fac173529e 44 size_t payloadlen;
vpcola 23:06fac173529e 45 }Message, *pMessage;
vpcola 23:06fac173529e 46
vpcola 23:06fac173529e 47 // TODO:
vpcola 23:06fac173529e 48 // Merge this struct with the one above, in order to use the same
vpcola 23:06fac173529e 49 // data structure for sending and receiving. I need to simplify
vpcola 23:06fac173529e 50 // the PubMessage to not contain pointers like the one above.
vpcola 23:06fac173529e 51 typedef struct
vpcola 23:06fac173529e 52 {
vpcola 23:06fac173529e 53 char topic[100];
vpcola 23:06fac173529e 54 QoS qos;
vpcola 23:06fac173529e 55 unsigned short id;
vpcola 23:06fac173529e 56 size_t payloadlen;
vpcola 23:06fac173529e 57 char payload[MAX_MQTT_PAYLOAD_SIZE];
vpcola 23:06fac173529e 58 }PubMessage, *pPubMessage;
vpcola 23:06fac173529e 59
vpcola 23:06fac173529e 60 struct MessageData
vpcola 23:06fac173529e 61 {
vpcola 23:06fac173529e 62 MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName)
vpcola 23:06fac173529e 63 { }
vpcola 23:06fac173529e 64 Message &message;
vpcola 23:06fac173529e 65 MQTTString &topicName;
vpcola 23:06fac173529e 66 };
vpcola 23:06fac173529e 67
vpcola 23:06fac173529e 68 class PacketId
vpcola 23:06fac173529e 69 {
vpcola 23:06fac173529e 70 public:
vpcola 23:06fac173529e 71 PacketId()
vpcola 23:06fac173529e 72 {
vpcola 23:06fac173529e 73 next = 0;
vpcola 23:06fac173529e 74 }
vpcola 23:06fac173529e 75
vpcola 23:06fac173529e 76 int getNext()
vpcola 23:06fac173529e 77 {
vpcola 23:06fac173529e 78 return next = (next == MAX_PACKET_ID) ? 1 : ++next;
vpcola 23:06fac173529e 79 }
vpcola 23:06fac173529e 80
vpcola 23:06fac173529e 81 private:
vpcola 23:06fac173529e 82 static const int MAX_PACKET_ID = 65535;
vpcola 23:06fac173529e 83 int next;
vpcola 23:06fac173529e 84 };
vpcola 23:06fac173529e 85
vpcola 23:06fac173529e 86
vpcola 23:06fac173529e 87
vpcola 23:06fac173529e 88 class MQTTThreadedClient
vpcola 23:06fac173529e 89 {
vpcola 23:06fac173529e 90 public:
vpcola 25:326f00faa092 91 MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
vpcola 23:06fac173529e 92 : network(aNetwork),
vpcola 25:326f00faa092 93 ssl_ca_pem(pem),
vpcola 26:4b21de8043a5 94 port((pem != NULL) ? 8883 : 1883),
vpcola 23:06fac173529e 95 queue(32 * EVENTS_EVENT_SIZE),
vpcola 26:4b21de8043a5 96 isConnected(false),
vpcola 26:4b21de8043a5 97 hasSavedSession(false),
vpcola 26:4b21de8043a5 98 useTLS(pem != NULL)
vpcola 25:326f00faa092 99 {
vpcola 25:326f00faa092 100 DRBG_PERS = "mbed TLS MQTT client";
vpcola 23:06fac173529e 101 tcpSocket = new TCPSocket();
vpcola 25:326f00faa092 102 setupTLS();
vpcola 25:326f00faa092 103 }
vpcola 25:326f00faa092 104
vpcola 25:326f00faa092 105 ~MQTTThreadedClient()
vpcola 25:326f00faa092 106 {
vpcola 25:326f00faa092 107 // TODO: signal the thread to shutdown
vpcola 25:326f00faa092 108 freeTLS();
vpcola 25:326f00faa092 109
vpcola 25:326f00faa092 110 if (isConnected)
vpcola 25:326f00faa092 111 disconnect();
vpcola 25:326f00faa092 112
vpcola 23:06fac173529e 113 }
vpcola 27:c90092f35d79 114 /**
vpcola 27:c90092f35d79 115 * Sets the connection parameters. Must be called before running the startListener as a thread.
vpcola 27:c90092f35d79 116 *
vpcola 27:c90092f35d79 117 * @param host - pointer to the host where the MQTT server is running
vpcola 27:c90092f35d79 118 * @param port - the port number to connect, 1883 for non secure connections, 8883 for
vpcola 27:c90092f35d79 119 * secure connections
vpcola 27:c90092f35d79 120 * @param options - the connect data used for logging into the MQTT server.
vpcola 27:c90092f35d79 121 */
vpcola 26:4b21de8043a5 122 void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
vpcola 23:06fac173529e 123 int publish(PubMessage& message);
vpcola 25:326f00faa092 124
vpcola 25:326f00faa092 125 void addTopicHandler(const char * topic, void (*function)(MessageData &));
vpcola 25:326f00faa092 126 template<typename T>
vpcola 25:326f00faa092 127 void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &))
vpcola 25:326f00faa092 128 {
vpcola 25:326f00faa092 129 FP<void,MessageData &> fp;
vpcola 25:326f00faa092 130 fp.attach(object, member);
vpcola 25:326f00faa092 131
vpcola 26:4b21de8043a5 132 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));
vpcola 25:326f00faa092 133 }
vpcola 25:326f00faa092 134
vpcola 27:c90092f35d79 135 // TODO: Add unsubscribe functionality.
vpcola 27:c90092f35d79 136
vpcola 26:4b21de8043a5 137 // Start the listener thread and start polling
vpcola 26:4b21de8043a5 138 // MQTT server.
vpcola 23:06fac173529e 139 void startListener();
vpcola 26:4b21de8043a5 140 // Stop the listerner thread and closes connection
vpcola 26:4b21de8043a5 141 void stopListener();
vpcola 23:06fac173529e 142
vpcola 23:06fac173529e 143 protected:
vpcola 26:4b21de8043a5 144
vpcola 23:06fac173529e 145 int handlePublishMsg();
vpcola 26:4b21de8043a5 146 void disconnect();
vpcola 26:4b21de8043a5 147 int connect();
vpcola 23:06fac173529e 148
vpcola 23:06fac173529e 149
vpcola 23:06fac173529e 150 private:
vpcola 23:06fac173529e 151 NetworkInterface * network;
vpcola 25:326f00faa092 152 const char * ssl_ca_pem;
vpcola 23:06fac173529e 153 TCPSocket * tcpSocket;
vpcola 23:06fac173529e 154 PacketId packetid;
vpcola 25:326f00faa092 155 const char *DRBG_PERS;
vpcola 25:326f00faa092 156 nsapi_error_t _error;
vpcola 25:326f00faa092 157 // Connection options
vpcola 25:326f00faa092 158 std::string host;
vpcola 25:326f00faa092 159 uint16_t port;
vpcola 25:326f00faa092 160 MQTTPacket_connectData connect_options;
vpcola 23:06fac173529e 161 // Event queue
vpcola 23:06fac173529e 162 EventQueue queue;
vpcola 23:06fac173529e 163 bool isConnected;
vpcola 26:4b21de8043a5 164 bool hasSavedSession;
vpcola 23:06fac173529e 165
vpcola 23:06fac173529e 166 // TODO: Because I'm using a map, I can only have one handler
vpcola 23:06fac173529e 167 // for each topic (one that's mapped to the topic string).
vpcola 23:06fac173529e 168 // Attaching another handler on the same topic is not possible.
vpcola 23:06fac173529e 169 // In the future, use a vector instead of maps to allow multiple
vpcola 23:06fac173529e 170 // handlers for the same topic.
vpcola 23:06fac173529e 171 std::map<std::string, FP<void, MessageData &> > topicCBMap;
vpcola 25:326f00faa092 172
vpcola 23:06fac173529e 173 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
vpcola 23:06fac173529e 174 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
vpcola 23:06fac173529e 175
vpcola 23:06fac173529e 176 unsigned int keepAliveInterval;
vpcola 23:06fac173529e 177 Timer comTimer;
vpcola 23:06fac173529e 178
vpcola 25:326f00faa092 179 // SSL/TLS functions
vpcola 26:4b21de8043a5 180 bool useTLS;
vpcola 25:326f00faa092 181 void setupTLS();
vpcola 25:326f00faa092 182 int initTLS();
vpcola 25:326f00faa092 183 void freeTLS();
vpcola 25:326f00faa092 184 int doTLSHandshake();
vpcola 25:326f00faa092 185
vpcola 25:326f00faa092 186 int processSubscriptions();
vpcola 23:06fac173529e 187 int readPacket();
vpcola 23:06fac173529e 188 int sendPacket(size_t length);
vpcola 23:06fac173529e 189 int readPacketLength(int* value);
vpcola 23:06fac173529e 190 int readUntil(int packetType, int timeout);
vpcola 23:06fac173529e 191 int readBytesToBuffer(char * buffer, size_t size, int timeout);
vpcola 23:06fac173529e 192 int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
vpcola 23:06fac173529e 193 bool isTopicMatched(char* topic, MQTTString& topicName);
vpcola 23:06fac173529e 194 int sendPublish(PubMessage& message);
vpcola 23:06fac173529e 195 void resetConnectionTimer();
vpcola 23:06fac173529e 196 void sendPingRequest();
vpcola 23:06fac173529e 197 bool hasConnectionTimedOut();
vpcola 26:4b21de8043a5 198 int login();
vpcola 23:06fac173529e 199 };
vpcola 23:06fac173529e 200
vpcola 31:d34f6adb7a53 201 }
vpcola 23:06fac173529e 202 #endif