A threaded, secure MQTT client example. It uses mbed TLS for the SSL/TLS connection. Only QoS 0 is supported for now. The example has been tested on a K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTThreadedClient.h Source File

MQTTThreadedClient.h

00001 #ifndef _MQTT_THREADED_CLIENT_H_
00002 #define _MQTT_THREADED_CLIENT_H_
00003 
00004 #include "mbed.h"
00005 #include "rtos.h"
00006 #include "MQTTPacket.h"
00007 #include "NetworkInterface.h"
00008 #include "FP.h"
00009 
// Uncomment to route DBG() output to printf.
//#define MQTT_DEBUG 1

// DBG(fmt, ...) - printf-style debug trace.
// Uses the standard __VA_ARGS__ form rather than a named variadic parameter,
// which is a GNU-only extension. When MQTT_DEBUG is not defined the macro
// expands to a no-op expression, so "if (x) DBG(...); else ..." never ends up
// with an empty statement body and the arguments are not evaluated.
#ifdef MQTT_DEBUG
#define DBG(...)    printf(__VA_ARGS__)
#else
#define DBG(...)    ((void)0)   /* Don't do anything in release builds */
#endif
00017 
00018 #include <cstdio>
00019 #include <string>
00020 #include <map>
00021 
// Timeouts used by the client.
// NOTE(review): the values look like milliseconds - confirm against the implementation.
#define COMMAND_TIMEOUT          5000
#define DEFAULT_SOCKET_TIMEOUT   1000

// Size in bytes of the sendbuf/readbuf arrays held by MQTTThreadedClient.
#define MAX_MQTT_PACKET_SIZE     200
// Size in bytes of the PubMessage::payload array.
#define MAX_MQTT_PAYLOAD_SIZE    100
00026 
00027 namespace MQTT
00028 {
00029     
// Quality-of-service levels; the enumerators take the values 0, 1 and 2.
enum QoS
{
    QOS0,
    QOS1,
    QOS2
};
00031 
// Result codes. Every failure code must be negative; SUCCESS is zero.
enum returnCode
{
    BUFFER_OVERFLOW = -3,
    TIMEOUT         = -2,
    FAILURE         = -1,
    SUCCESS         =  0
};
00034 
00035 
00036 typedef struct
00037 {
00038     QoS qos;
00039     bool retained;
00040     bool dup;
00041     unsigned short id;
00042     void *payload;
00043     size_t payloadlen;
00044 }Message, *pMessage;
00045 
00046 // TODO:
00047 // Merge this struct with the one above, in order to use the same
00048 // data structure for sending and receiving. I need to simplify
00049 // the PubMessage to not contain pointers like the one above.
00050 typedef struct
00051 {
00052     char topic[100];
00053     QoS qos;
00054     unsigned short id;
00055     size_t payloadlen;
00056     char payload[MAX_MQTT_PAYLOAD_SIZE];
00057 }PubMessage, *pPubMessage;
00058 
00059 struct MessageData
00060 {
00061     MessageData(MQTTString &aTopicName, Message &aMessage)  : message(aMessage), topicName(aTopicName)
00062     { }
00063     Message &message;
00064     MQTTString &topicName;
00065 };
00066 
/**
 * Generator for packet identifiers.
 *
 * Successive calls to getNext() return 1, 2, ..., 65535 and then start
 * again at 1. Zero is never returned.
 */
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /**
     *  Advances to the next identifier and returns it.
     *
     *  @return the new identifier, in the range 1..65535
     */
    int getNext()
    {
        // Compute the successor without a nested ++next: assigning to next
        // and incrementing it in the same expression modifies it twice,
        // which is undefined before C++11.
        next = (next == MAX_PACKET_ID) ? 1 : next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // last identifier handed out; 0 before the first call
};
00084 
00085 
00086 
00087 class MQTTThreadedClient
00088 {
00089 public:
00090     MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
00091         : network(aNetwork),
00092           ssl_ca_pem(pem),
00093           port((pem != NULL) ? 8883 : 1883),
00094           queue(32 * EVENTS_EVENT_SIZE),
00095           isConnected(false),          
00096           hasSavedSession(false),
00097           useTLS(pem != NULL)
00098     {
00099         DRBG_PERS = "mbed TLS MQTT client";
00100         tcpSocket = new TCPSocket();
00101         setupTLS();
00102     }
00103     
00104     ~MQTTThreadedClient()
00105     {
00106         // TODO: signal the thread to shutdown
00107         freeTLS();
00108            
00109         if (isConnected)
00110             disconnect();
00111                 
00112     }
00113     /** 
00114      *  Sets the connection parameters. Must be called before running the startListener as a thread.
00115      *
00116      *  @param host - pointer to the host where the MQTT server is running
00117      *  @param port - the port number to connect, 1883 for non secure connections, 8883 for 
00118      *                secure connections
00119      *  @param options - the connect data used for logging into the MQTT server.
00120      */
00121     void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
00122     int publish(PubMessage& message);
00123     
00124     void addTopicHandler(const char * topic, void (*function)(MessageData &));
00125     template<typename T>
00126     void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &))
00127     {
00128         FP<void,MessageData &> fp;
00129         fp.attach(object, member);
00130 
00131         topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));  
00132     }
00133     
00134     // TODO: Add unsubscribe functionality.
00135     
00136     // Start the listener thread and start polling 
00137     // MQTT server.
00138     void startListener();
00139     // Stop the listerner thread and closes connection
00140     void stopListener();
00141 
00142 protected:
00143 
00144     int handlePublishMsg();
00145     void disconnect();  
00146     int connect();      
00147 
00148 
00149 private:
00150     NetworkInterface * network;
00151     const char * ssl_ca_pem;
00152     TCPSocket * tcpSocket;
00153     PacketId packetid;
00154     const char *DRBG_PERS;
00155     nsapi_error_t _error;    
00156     // Connection options
00157     std::string host;
00158     uint16_t port;
00159     MQTTPacket_connectData connect_options;
00160     // Event queue
00161     EventQueue queue;
00162     bool isConnected;
00163     bool hasSavedSession;    
00164 
00165     // TODO: Because I'm using a map, I can only have one handler
00166     // for each topic (one that's mapped to the topic string).
00167     // Attaching another handler on the same topic is not possible.
00168     // In the future, use a vector instead of maps to allow multiple
00169     // handlers for the same topic.
00170     std::map<std::string, FP<void, MessageData &> > topicCBMap;
00171     
00172     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
00173     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
00174 
00175     unsigned int keepAliveInterval;
00176     Timer comTimer;
00177 
00178     // SSL/TLS functions
00179     bool useTLS;
00180     void setupTLS();
00181     int initTLS();    
00182     void freeTLS();
00183     int doTLSHandshake();
00184     
00185     int processSubscriptions();
00186     int readPacket();
00187     int sendPacket(size_t length);
00188     int readPacketLength(int* value);
00189     int readUntil(int packetType, int timeout);
00190     int readBytesToBuffer(char * buffer, size_t size, int timeout);
00191     int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
00192     bool isTopicMatched(char* topic, MQTTString& topicName);
00193     int  sendPublish(PubMessage& message);
00194     void resetConnectionTimer();
00195     void sendPingRequest();
00196     bool hasConnectionTimedOut();
00197     int login();
00198 };
00199 
00200 }
00201 #endif