Vergil Cola
/
MQTTGatewayK64
Fork of my MQTTGateway
Diff: MQTTSManager/MQTTThreadedClient.h
- Revision:
- 0:f1d3878b8dd9
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MQTTSManager/MQTTThreadedClient.h Sat Apr 08 14:45:51 2017 +0000 @@ -0,0 +1,197 @@ +#ifndef _MQTT_THREADED_CLIENT_H_ +#define _MQTT_THREADED_CLIENT_H_ + +#include "mbed.h" +#include "rtos.h" +#include "MQTTPacket.h" +#include "NetworkInterface.h" +#include "FP.h" +#include "config.h" + +#include <cstdio> +#include <string> +#include <map> + +//#define MQTT_DEBUG 1 + + +#define COMMAND_TIMEOUT 5000 +#define DEFAULT_SOCKET_TIMEOUT 1000 +#define MAX_MQTT_PACKET_SIZE 500 +#define MAX_MQTT_PAYLOAD_SIZE 300 + +namespace MQTT +{ + +typedef enum { QOS0, QOS1, QOS2 } QoS; + +// all failure return codes must be negative +typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode; + + +typedef struct +{ + QoS qos; + bool retained; + bool dup; + unsigned short id; + void *payload; + size_t payloadlen; +}Message, *pMessage; + +// TODO: +// Merge this struct with the one above, in order to use the same +// data structure for sending and receiving. I need to simplify +// the PubMessage to not contain pointers like the one above. +typedef struct +{ + char topic[100]; + QoS qos; + unsigned short id; + size_t payloadlen; + char payload[MAX_MQTT_PAYLOAD_SIZE]; +}PubMessage, *pPubMessage; + +struct MessageData +{ + MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName) + { } + Message &message; + MQTTString &topicName; +}; + +class PacketId +{ +public: + PacketId() + { + next = 0; + } + + int getNext() + { + return next = (next == MAX_PACKET_ID) ? 1 : ++next; + } + +private: + static const int MAX_PACKET_ID = 65535; + int next; +}; + + + +class MQTTThreadedClient +{ +public: + MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL) + : network(aNetwork), + ssl_ca_pem(pem), + port((pem != NULL) ? 8883 : 1883), + queue(32 * EVENTS_EVENT_SIZE), + isConnected(false), + hasSavedSession(false), + useTLS(pem != NULL) + { + DRBG_PERS = "mbed TLS MQTT client"; + tcpSocket = new TCPSocket(); + setupTLS(); + } + + ~MQTTThreadedClient() + { + // TODO: signal the thread to shutdown + freeTLS(); + + if (isConnected) + disconnect(); + + } + /** + * Sets the connection parameters. Must be called before running the startListener as a thread. + * + * @param host - pointer to the host where the MQTT server is running + * @param port - the port number to connect, 1883 for non secure connections, 8883 for + * secure connections + * @param options - the connect data used for logging into the MQTT server. + */ + void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options); + int publish(PubMessage& message); + + void addTopicHandler(const char * topic, void (*function)(MessageData &)); + template<typename T> + void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &)) + { + FP<void,MessageData &> fp; + fp.attach(object, member); + + topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp)); + } + + // TODO: Add unsubscribe functionality. + + // Start the listener thread and start polling + // MQTT server. + void startListener(); + // Stop the listerner thread and closes connection + void stopListener(); + +protected: + + int handlePublishMsg(); + void disconnect(); + int connect(); + + +private: + NetworkInterface * network; + const char * ssl_ca_pem; + TCPSocket * tcpSocket; + PacketId packetid; + const char *DRBG_PERS; + nsapi_error_t _error; + // Connection options + std::string host; + uint16_t port; + MQTTPacket_connectData connect_options; + // Event queue + EventQueue queue; + bool isConnected; + bool hasSavedSession; + + // TODO: Because I'm using a map, I can only have one handler + // for each topic (one that's mapped to the topic string). + // Attaching another handler on the same topic is not possible. + // In the future, use a vector instead of maps to allow multiple + // handlers for the same topic. + std::map<std::string, FP<void, MessageData &> > topicCBMap; + + unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; + unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; + + unsigned int keepAliveInterval; + Timer comTimer; + + // SSL/TLS functions + bool useTLS; + void setupTLS(); + int initTLS(); + void freeTLS(); + int doTLSHandshake(); + + int processSubscriptions(); + int readPacket(); + int sendPacket(size_t length); + int readPacketLength(int* value); + int readUntil(int packetType, int timeout); + int readBytesToBuffer(char * buffer, size_t size, int timeout); + int sendBytesFromBuffer(char * buffer, size_t size, int timeout); + bool isTopicMatched(char* topic, MQTTString& topicName); + int sendPublish(PubMessage& message); + void resetConnectionTimer(); + void sendPingRequest(); + bool hasConnectionTimedOut(); + int login(); +}; + +} +#endif