A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

MQTTThreadedClient.h

Committer:
vpcola
Date:
2017-03-27
Revision:
25:326f00faa092
Parent:
23:06fac173529e
Child:
26:4b21de8043a5

File content as of revision 25:326f00faa092:

#ifndef _MQTT_THREADED_CLIENT_H_
#define _MQTT_THREADED_CLIENT_H_

#include "mbed.h"
#include "rtos.h"
#include "MQTTPacket.h"
#include "NetworkInterface.h"
#include "FP.h"


#include <cstdio>
#include <string>
#include <map>

#define COMMAND_TIMEOUT 5000
#define DEFAULT_SOCKET_TIMEOUT 1000
#define MAX_MQTT_PACKET_SIZE 200
#define MAX_MQTT_PAYLOAD_SIZE 100

typedef enum { QOS0, QOS1, QOS2 } QoS;

// all failure return codes must be negative
typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode;


typedef struct
{
    QoS qos;
    bool retained;
    bool dup;
    unsigned short id;
    void *payload;
    size_t payloadlen;
}Message, *pMessage;

// TODO:
// Merge this struct with the one above, in order to use the same
// data structure for sending and receiving. I need to simplify
// the PubMessage to not contain pointers like the one above.
typedef struct
{
    char topic[100];
    QoS qos;
    unsigned short id;
    size_t payloadlen;
    char payload[MAX_MQTT_PAYLOAD_SIZE];
}PubMessage, *pPubMessage;

struct MessageData
{
    MessageData(MQTTString &aTopicName, Message &aMessage)  : message(aMessage), topicName(aTopicName)
    { }
    Message &message;
    MQTTString &topicName;
};

class PacketId
{
public:
    PacketId()
    {
        next = 0;
    }

    int getNext()
    {
        return next = (next == MAX_PACKET_ID) ? 1 : ++next;
    }

private:
    static const int MAX_PACKET_ID = 65535;
    int next;
};



class MQTTThreadedClient
{
public:
    MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
        : network(aNetwork),
          ssl_ca_pem(pem),
          queue(32 * EVENTS_EVENT_SIZE),
          isConnected(false) 
    {
        DRBG_PERS = "mbed TLS MQTT client";
        tcpSocket = new TCPSocket();
        setupTLS();
    }
    
    ~MQTTThreadedClient()
    {
        // TODO: signal the thread to shutdown
        freeTLS();
           
        if (isConnected)
            disconnect();
                
    }


    int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
    void disconnect();
    
    int publish(PubMessage& message);
    
    void addTopicHandler(const char * topic, void (*function)(MessageData &));
    template<typename T>
    void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &))
    {
        FP<void,MessageData &> fp;
        fp.attach(object, member);

        topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));        
    }
    
    int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
    template<typename T>
    int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
        int rc = FAILURE;
        int len = 0;

        MQTTString topic = {(char*)topicstr, {0, 0}};
        printf("Subscribing to topic [%s]\r\n", topicstr);
        
        if (!isConnected) {
            printf("Session already connected!!\r\n");
            return rc;
        }

        len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
        if (len <= 0) {
            printf("Error serializing subscribe packet ...\r\n");
            return rc;
        }

        if ((rc = sendPacket(len)) != SUCCESS) {
            printf("Error sending subscribe packet [%d]\r\n", rc);
            return rc;
        }
        printf("Waiting for subscription ack ...\r\n");
        // Wait for SUBACK, dropping packets read along the way ...
        if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
            int count = 0, grantedQoS = -1;
            unsigned short mypacketid;
            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
                rc = grantedQoS; // 0, 1, 2 or 0x80
            // For as long as we do not get 0x80 ..
            if (rc != 0x80) {
                // Add message handlers to the map
                FP<void,MessageData &> fp;
                fp.attach(object, member);

                topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));

                // Reset connection timers here ...
                resetConnectionTimer();
                printf("Successfully subscribed to %s ...\r\n", topicstr);
                rc = SUCCESS;
            }
        } else
        {
            printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
            rc = FAILURE;
        }

        return rc;
    }

    // the listener thread ...
    void startListener();

protected:
    int connect(MQTTPacket_connectData& options);
    int handlePublishMsg();


private:
    NetworkInterface * network;
    const char * ssl_ca_pem;
    TCPSocket * tcpSocket;
    PacketId packetid;
    const char *DRBG_PERS;
    nsapi_error_t _error;    
    // Connection options
    std::string host;
    uint16_t port;
    MQTTPacket_connectData connect_options;

    
    // Event queue
    EventQueue queue;
    bool isConnected;

    // TODO: Because I'm using a map, I can only have one handler
    // for each topic (one that's mapped to the topic string).
    // Attaching another handler on the same topic is not possible.
    // In the future, use a vector instead of maps to allow multiple
    // handlers for the same topic.
    std::map<std::string, FP<void, MessageData &> > topicCBMap;
    std::map<std::string, FP<void, MessageData &> > topicCBMap_t;
    
    unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
    unsigned char readbuf[MAX_MQTT_PACKET_SIZE];

    unsigned int keepAliveInterval;
    Timer comTimer;

    // SSL/TLS functions
    void setupTLS();
    int initTLS();    
    void freeTLS();
    int doTLSHandshake();
    
    int processSubscriptions();
    int readPacket();
    int sendPacket(size_t length);
    int readPacketLength(int* value);
    int readUntil(int packetType, int timeout);
    int readBytesToBuffer(char * buffer, size_t size, int timeout);
    int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
    bool isTopicMatched(char* topic, MQTTString& topicName);
    int  sendPublish(PubMessage& message);
    void resetConnectionTimer();
    void sendPingRequest();
    bool hasConnectionTimedOut();

};

#endif