A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.h
- Revision:
- 23:06fac173529e
- Child:
- 25:326f00faa092
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MQTTThreadedClient.h Sun Mar 26 04:35:46 2017 +0000 @@ -0,0 +1,181 @@ +#ifndef _MQTT_THREADED_CLIENT_H_ +#define _MQTT_THREADED_CLIENT_H_ + +#include "mbed.h" +#include "rtos.h" +#include "MQTTPacket.h" +#include "NetworkInterface.h" +#include "FP.h" + +#include <cstdio> +#include <string> +#include <map> + +#define COMMAND_TIMEOUT 5000 +#define DEFAULT_SOCKET_TIMEOUT 1000 +#define MAX_MQTT_PACKET_SIZE 200 +#define MAX_MQTT_PAYLOAD_SIZE 100 + +typedef enum { QOS0, QOS1, QOS2 } QoS; + +// all failure return codes must be negative +typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode; + + +typedef struct +{ + QoS qos; + bool retained; + bool dup; + unsigned short id; + void *payload; + size_t payloadlen; +}Message, *pMessage; + +// TODO: +// Merge this struct with the one above, in order to use the same +// data structure for sending and receiving. I need to simplify +// the PubMessage to not contain pointers like the one above. +typedef struct +{ + char topic[100]; + QoS qos; + unsigned short id; + size_t payloadlen; + char payload[MAX_MQTT_PAYLOAD_SIZE]; +}PubMessage, *pPubMessage; + +struct MessageData +{ + MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName) + { } + Message &message; + MQTTString &topicName; +}; + +class PacketId +{ +public: + PacketId() + { + next = 0; + } + + int getNext() + { + return next = (next == MAX_PACKET_ID) ? 1 : ++next; + } + +private: + static const int MAX_PACKET_ID = 65535; + int next; +}; + + + +class MQTTThreadedClient +{ +public: + MQTTThreadedClient(NetworkInterface * aNetwork) + : network(aNetwork), + queue(32 * EVENTS_EVENT_SIZE), + isConnected(false) { + tcpSocket = new TCPSocket(); + } + + int connect(const char * host, uint16_t port, MQTTPacket_connectData & options); + int publish(PubMessage& message); + int subscribe(const char * topic, QoS qos, void (*function)(MessageData &)); + + template<typename T> + int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) { + int rc = FAILURE; + int len = 0; + + MQTTString topic = {(char*)topicstr, {0, 0}}; + + if (!isConnected) { + printf("Session already connected!!\r\n"); + return rc; + } + + len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); + if (len <= 0) { + printf("Error serializing subscribe packet ...\r\n"); + return rc; + } + + if ((rc = sendPacket(len)) != SUCCESS) { + printf("Error sending subscribe packet [%d]\r\n", rc); + return rc; + } + + // Wait for SUBACK, dropping packets read along the way ... + if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback + int count = 0, grantedQoS = -1; + unsigned short mypacketid; + if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) + rc = grantedQoS; // 0, 1, 2 or 0x80 + // For as long as we do not get 0x80 .. + if (rc != 0x80) { + // Add message handlers to the map + FP<void,MessageData &> fp; + fp.attach(object, member); + + topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); + + // Reset connection timers here ... + resetConnectionTimer(); + + rc = SUCCESS; + } + } else + rc = FAILURE; + + return rc; + } + + // the listener thread ... + void startListener(); + +protected: + int connect(MQTTPacket_connectData& options); + int handlePublishMsg(); + + +private: + NetworkInterface * network; + TCPSocket * tcpSocket; + PacketId packetid; + // Event queue + EventQueue queue; + bool isConnected; + + // TODO: Because I'm using a map, I can only have one handler + // for each topic (one that's mapped to the topic string). + // Attaching another handler on the same topic is not possible. + // In the future, use a vector instead of maps to allow multiple + // handlers for the same topic. + std::map<std::string, FP<void, MessageData &> > topicCBMap; + + unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; + unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; + + unsigned int keepAliveInterval; + Timer comTimer; + + int readPacket(); + int sendPacket(size_t length); + int readPacketLength(int* value); + int readUntil(int packetType, int timeout); + int readBytesToBuffer(char * buffer, size_t size, int timeout); + int sendBytesFromBuffer(char * buffer, size_t size, int timeout); + bool isTopicMatched(char* topic, MQTTString& topicName); + int sendPublish(PubMessage& message); + void resetConnectionTimer(); + void sendPingRequest(); + bool hasConnectionTimedOut(); + +}; + +#endif