A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

MQTTThreadedClient.h

Committer:
vpcola
Date:
2017-04-04
Revision:
35:c46b92269df4
Parent:
32:16ef25cbb05c

File content as of revision 35:c46b92269df4:

#ifndef _MQTT_THREADED_CLIENT_H_
#define _MQTT_THREADED_CLIENT_H_

#include "mbed.h"
#include "rtos.h"
#include "MQTTPacket.h"
#include "NetworkInterface.h"
#include "FP.h"

//#define MQTT_DEBUG 1

#ifdef MQTT_DEBUG
#define DBG(fmt, args...)    printf(fmt, ## args)
#else
#define DBG(fmt, args...)    /* Don't do anything in release builds */
#endif

#include <cstdio>
#include <string>
#include <map>

#define COMMAND_TIMEOUT 5000
#define DEFAULT_SOCKET_TIMEOUT 1000
#define MAX_MQTT_PACKET_SIZE 200
#define MAX_MQTT_PAYLOAD_SIZE 100

namespace MQTT
{
    
typedef enum { QOS0, QOS1, QOS2 } QoS;

// all failure return codes must be negative
typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode;


typedef struct
{
    QoS qos;
    bool retained;
    bool dup;
    unsigned short id;
    void *payload;
    size_t payloadlen;
}Message, *pMessage;

// TODO:
// Merge this struct with the one above, in order to use the same
// data structure for sending and receiving. I need to simplify
// the PubMessage to not contain pointers like the one above.
typedef struct
{
    char topic[100];
    QoS qos;
    unsigned short id;
    size_t payloadlen;
    char payload[MAX_MQTT_PAYLOAD_SIZE];
}PubMessage, *pPubMessage;

struct MessageData
{
    MessageData(MQTTString &aTopicName, Message &aMessage)  : message(aMessage), topicName(aTopicName)
    { }
    Message &message;
    MQTTString &topicName;
};

class PacketId
{
public:
    PacketId()
    {
        next = 0;
    }

    int getNext()
    {
        return next = (next == MAX_PACKET_ID) ? 1 : ++next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;
};



class MQTTThreadedClient
{
public:
    MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
        : network(aNetwork),
          ssl_ca_pem(pem),
          port((pem != NULL) ? 8883 : 1883),
          queue(32 * EVENTS_EVENT_SIZE),
          isConnected(false),          
          hasSavedSession(false),
          useTLS(pem != NULL)
    {
        DRBG_PERS = "mbed TLS MQTT client";
        tcpSocket = new TCPSocket();
        setupTLS();
    }
    
    ~MQTTThreadedClient()
    {
        // TODO: signal the thread to shutdown
        freeTLS();
           
        if (isConnected)
            disconnect();
                
    }
    /** 
     *  Sets the connection parameters. Must be called before running the startListener as a thread.
     *
     *  @param host - pointer to the host where the MQTT server is running
     *  @param port - the port number to connect, 1883 for non secure connections, 8883 for 
     *                secure connections
     *  @param options - the connect data used for logging into the MQTT server.
     */
    void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
    int publish(PubMessage& message);
    
    void addTopicHandler(const char * topic, void (*function)(MessageData &));
    template<typename T>
    void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &))
    {
        FP<void,MessageData &> fp;
        fp.attach(object, member);

        topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));  
    }
    
    // TODO: Add unsubscribe functionality.
    
    // Start the listener thread and start polling 
    // MQTT server.
    void startListener();
    // Stop the listerner thread and closes connection
    void stopListener();

protected:

    int handlePublishMsg();
    void disconnect();  
    int connect();      


private:
    NetworkInterface * network;
    const char * ssl_ca_pem;
    TCPSocket * tcpSocket;
    PacketId packetid;
    const char *DRBG_PERS;
    nsapi_error_t _error;    
    // Connection options
    std::string host;
    uint16_t port;
    MQTTPacket_connectData connect_options;
    // Event queue
    EventQueue queue;
    bool isConnected;
    bool hasSavedSession;    

    // TODO: Because I'm using a map, I can only have one handler
    // for each topic (one that's mapped to the topic string).
    // Attaching another handler on the same topic is not possible.
    // In the future, use a vector instead of maps to allow multiple
    // handlers for the same topic.
    std::map<std::string, FP<void, MessageData &> > topicCBMap;
    
    unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
    unsigned char readbuf[MAX_MQTT_PACKET_SIZE];

    unsigned int keepAliveInterval;
    Timer comTimer;

    // SSL/TLS functions
    bool useTLS;
    void setupTLS();
    int initTLS();    
    void freeTLS();
    int doTLSHandshake();
    
    int processSubscriptions();
    int readPacket();
    int sendPacket(size_t length);
    int readPacketLength(int* value);
    int readUntil(int packetType, int timeout);
    int readBytesToBuffer(char * buffer, size_t size, int timeout);
    int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
    bool isTopicMatched(char* topic, MQTTString& topicName);
    int  sendPublish(PubMessage& message);
    void resetConnectionTimer();
    void sendPingRequest();
    bool hasConnectionTimedOut();
    int login();
};

}
#endif