Fork of my MQTTGateway

Dependencies:   mbed-http

Revision:
0:f1d3878b8dd9
diff -r 000000000000 -r f1d3878b8dd9 MQTTSManager/MQTTThreadedClient.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTSManager/MQTTThreadedClient.h	Sat Apr 08 14:45:51 2017 +0000
@@ -0,0 +1,197 @@
+#ifndef _MQTT_THREADED_CLIENT_H_
+#define _MQTT_THREADED_CLIENT_H_
+
+#include "mbed.h"
+#include "rtos.h"
+#include "MQTTPacket.h"
+#include "NetworkInterface.h"
+#include "FP.h"
+#include "config.h"
+
+#include <cstdio>
+#include <string>
+#include <map>
+
+//#define MQTT_DEBUG 1
+
+
+#define COMMAND_TIMEOUT 5000
+#define DEFAULT_SOCKET_TIMEOUT 1000
+#define MAX_MQTT_PACKET_SIZE 500
+#define MAX_MQTT_PAYLOAD_SIZE 300
+
+namespace MQTT
+{
+    
+typedef enum { QOS0, QOS1, QOS2 } QoS;
+
+// all failure return codes must be negative
+typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode;
+
+
+typedef struct
+{
+    QoS qos;
+    bool retained;
+    bool dup;
+    unsigned short id;
+    void *payload;
+    size_t payloadlen;
+}Message, *pMessage;
+
+// TODO:
+// Merge this struct with the one above, in order to use the same
+// data structure for sending and receiving. I need to simplify
+// the PubMessage to not contain pointers like the one above.
+typedef struct
+{
+    char topic[100];
+    QoS qos;
+    unsigned short id;
+    size_t payloadlen;
+    char payload[MAX_MQTT_PAYLOAD_SIZE];
+}PubMessage, *pPubMessage;
+
+struct MessageData
+{
+    MessageData(MQTTString &aTopicName, Message &aMessage)  : message(aMessage), topicName(aTopicName)
+    { }
+    Message &message;
+    MQTTString &topicName;
+};
+
+class PacketId
+{
+public:
+    PacketId()
+    {
+        next = 0;
+    }
+
+    int getNext()
+    {
+        return next = (next == MAX_PACKET_ID) ? 1 : ++next;
+    }
+
+private:
+    static const int MAX_PACKET_ID = 65535;
+    int next;
+};
+
+
+
+class MQTTThreadedClient
+{
+public:
+    MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
+        : network(aNetwork),
+          ssl_ca_pem(pem),
+          port((pem != NULL) ? 8883 : 1883),
+          queue(32 * EVENTS_EVENT_SIZE),
+          isConnected(false),          
+          hasSavedSession(false),
+          useTLS(pem != NULL)
+    {
+        DRBG_PERS = "mbed TLS MQTT client";
+        tcpSocket = new TCPSocket();
+        setupTLS();
+    }
+    
+    ~MQTTThreadedClient()
+    {
+        // TODO: signal the thread to shutdown
+        freeTLS();
+           
+        if (isConnected)
+            disconnect();
+                
+    }
+    /** 
+     *  Sets the connection parameters. Must be called before running the startListener as a thread.
+     *
+     *  @param host - pointer to the host where the MQTT server is running
+     *  @param port - the port number to connect, 1883 for non secure connections, 8883 for 
+     *                secure connections
+     *  @param options - the connect data used for logging into the MQTT server.
+     */
+    void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
+    int publish(PubMessage& message);
+    
+    void addTopicHandler(const char * topic, void (*function)(MessageData &));
+    template<typename T>
+    void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &))
+    {
+        FP<void,MessageData &> fp;
+        fp.attach(object, member);
+
+        topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));  
+    }
+    
+    // TODO: Add unsubscribe functionality.
+    
+    // Start the listener thread and start polling 
+    // MQTT server.
+    void startListener();
+    // Stop the listerner thread and closes connection
+    void stopListener();
+
+protected:
+
+    int handlePublishMsg();
+    void disconnect();  
+    int connect();      
+
+
+private:
+    NetworkInterface * network;
+    const char * ssl_ca_pem;
+    TCPSocket * tcpSocket;
+    PacketId packetid;
+    const char *DRBG_PERS;
+    nsapi_error_t _error;    
+    // Connection options
+    std::string host;
+    uint16_t port;
+    MQTTPacket_connectData connect_options;
+    // Event queue
+    EventQueue queue;
+    bool isConnected;
+    bool hasSavedSession;    
+
+    // TODO: Because I'm using a map, I can only have one handler
+    // for each topic (one that's mapped to the topic string).
+    // Attaching another handler on the same topic is not possible.
+    // In the future, use a vector instead of maps to allow multiple
+    // handlers for the same topic.
+    std::map<std::string, FP<void, MessageData &> > topicCBMap;
+    
+    unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
+    unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
+
+    unsigned int keepAliveInterval;
+    Timer comTimer;
+
+    // SSL/TLS functions
+    bool useTLS;
+    void setupTLS();
+    int initTLS();    
+    void freeTLS();
+    int doTLSHandshake();
+    
+    int processSubscriptions();
+    int readPacket();
+    int sendPacket(size_t length);
+    int readPacketLength(int* value);
+    int readUntil(int packetType, int timeout);
+    int readBytesToBuffer(char * buffer, size_t size, int timeout);
+    int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
+    bool isTopicMatched(char* topic, MQTTString& topicName);
+    int  sendPublish(PubMessage& message);
+    void resetConnectionTimer();
+    void sendPingRequest();
+    bool hasConnectionTimedOut();
+    int login();
+};
+
+}
+#endif