Fork of my original MQTTGateway

Dependencies:   mbed-http

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTThreadedClient.h Source File

MQTTThreadedClient.h

00001 #ifndef _MQTT_THREADED_CLIENT_H_
00002 #define _MQTT_THREADED_CLIENT_H_
00003 
00004 #include "mbed.h"
00005 #include "rtos.h"
00006 #include "MQTTPacket.h"
00007 #include "NetworkInterface.h"
00008 #include "FP.h"
00009 #include "config.h"
00010 
00011 #include <cstdio>
00012 #include <string>
00013 #include <map>
00014 
00015 //#define MQTT_DEBUG 1
00016 
00017 
// Timeout value for MQTT commands. The unit is not stated in this header;
// presumably milliseconds -- TODO confirm against the implementation file.
#define COMMAND_TIMEOUT 5000
// Default socket timeout; presumably milliseconds -- TODO confirm.
#define DEFAULT_SOCKET_TIMEOUT 1000
// Size, in bytes, of each of the client's sendbuf and readbuf arrays.
#define MAX_MQTT_PACKET_SIZE 500
// Capacity, in bytes, of the payload array embedded in PubMessage.
#define MAX_MQTT_PAYLOAD_SIZE 300
00022 
00023 namespace MQTT
00024 {
00025     
// MQTT quality-of-service levels. The enumerators have the values 0, 1 and 2.
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
00027 
// Return codes. SUCCESS is zero; every failure code must be negative.
enum returnCode
{
    SUCCESS         =  0,
    FAILURE         = -1,
    TIMEOUT         = -2,
    BUFFER_OVERFLOW = -3
};
00030 
00031 
00032 typedef struct
00033 {
00034     QoS qos;
00035     bool retained;
00036     bool dup;
00037     unsigned short id;
00038     void *payload;
00039     size_t payloadlen;
00040 }Message, *pMessage;
00041 
00042 // TODO:
00043 // Merge this struct with the one above, in order to use the same
00044 // data structure for sending and receiving. I need to simplify
00045 // the PubMessage to not contain pointers like the one above.
00046 typedef struct
00047 {
00048     char topic[100];
00049     QoS qos;
00050     unsigned short id;
00051     size_t payloadlen;
00052     char payload[MAX_MQTT_PAYLOAD_SIZE];
00053 }PubMessage, *pPubMessage;
00054 
00055 struct MessageData
00056 {
00057     MessageData(MQTTString &aTopicName, Message &aMessage)  : message(aMessage), topicName(aTopicName)
00058     { }
00059     Message &message;
00060     MQTTString &topicName;
00061 };
00062 
// Hands out packet identifiers in the range 1..MAX_PACKET_ID, wrapping back
// to 1 after MAX_PACKET_ID. Zero is only the initial internal state and is
// never returned.
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    /**
     *  Advances to the next identifier and returns it.
     *
     *  @return 1 on the first call, then 2, 3, ... up to MAX_PACKET_ID,
     *          after which the sequence restarts at 1.
     */
    int getNext()
    {
        // Use "next + 1" instead of assigning the result of "++next" back
        // to next: that form modifies next twice in one expression, which
        // is undefined before C++11 and triggers compiler warnings.
        next = (next == MAX_PACKET_ID) ? 1 : next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;   // last identifier handed out; 0 until the first call
};
00080 
00081 
00082 
00083 class MQTTThreadedClient
00084 {
00085 public:
00086     MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
00087         : network(aNetwork),
00088           ssl_ca_pem(pem),
00089           port((pem != NULL) ? 8883 : 1883),
00090           queue(32 * EVENTS_EVENT_SIZE),
00091           isConnected(false),          
00092           hasSavedSession(false),
00093           useTLS(pem != NULL)
00094     {
00095         DRBG_PERS = "mbed TLS MQTT client";
00096         tcpSocket = new TCPSocket();
00097         setupTLS();
00098     }
00099     
00100     ~MQTTThreadedClient()
00101     {
00102         // TODO: signal the thread to shutdown
00103         freeTLS();
00104            
00105         if (isConnected)
00106             disconnect();
00107                 
00108     }
00109     /** 
00110      *  Sets the connection parameters. Must be called before running the startListener as a thread.
00111      *
00112      *  @param host - pointer to the host where the MQTT server is running
00113      *  @param port - the port number to connect, 1883 for non secure connections, 8883 for 
00114      *                secure connections
00115      *  @param options - the connect data used for logging into the MQTT server.
00116      */
00117     void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
00118     int publish(PubMessage& message);
00119     
00120     void addTopicHandler(const char * topic, void (*function)(MessageData &));
00121     template<typename T>
00122     void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &))
00123     {
00124         FP<void,MessageData &> fp;
00125         fp.attach(object, member);
00126 
00127         topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));  
00128     }
00129     
00130     // TODO: Add unsubscribe functionality.
00131     
00132     // Start the listener thread and start polling 
00133     // MQTT server.
00134     void startListener();
00135     // Stop the listerner thread and closes connection
00136     void stopListener();
00137 
00138 protected:
00139 
00140     int handlePublishMsg();
00141     void disconnect();  
00142     int connect();      
00143 
00144 
00145 private:
00146     NetworkInterface * network;
00147     const char * ssl_ca_pem;
00148     TCPSocket * tcpSocket;
00149     PacketId packetid;
00150     const char *DRBG_PERS;
00151     nsapi_error_t _error;    
00152     // Connection options
00153     std::string host;
00154     uint16_t port;
00155     MQTTPacket_connectData connect_options;
00156     // Event queue
00157     EventQueue queue;
00158     bool isConnected;
00159     bool hasSavedSession;    
00160 
00161     // TODO: Because I'm using a map, I can only have one handler
00162     // for each topic (one that's mapped to the topic string).
00163     // Attaching another handler on the same topic is not possible.
00164     // In the future, use a vector instead of maps to allow multiple
00165     // handlers for the same topic.
00166     std::map<std::string, FP<void, MessageData &> > topicCBMap;
00167     
00168     unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
00169     unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
00170 
00171     unsigned int keepAliveInterval;
00172     Timer comTimer;
00173 
00174     // SSL/TLS functions
00175     bool useTLS;
00176     void setupTLS();
00177     int initTLS();    
00178     void freeTLS();
00179     int doTLSHandshake();
00180     
00181     int processSubscriptions();
00182     int readPacket();
00183     int sendPacket(size_t length);
00184     int readPacketLength(int* value);
00185     int readUntil(int packetType, int timeout);
00186     int readBytesToBuffer(char * buffer, size_t size, int timeout);
00187     int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
00188     bool isTopicMatched(char* topic, MQTTString& topicName);
00189     int  sendPublish(PubMessage& message);
00190     void resetConnectionTimer();
00191     void sendPingRequest();
00192     bool hasConnectionTimedOut();
00193     int login();
00194 };
00195 
00196 }
00197 #endif