A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.h
- Revision:
- 25:326f00faa092
- Parent:
- 23:06fac173529e
- Child:
- 26:4b21de8043a5
diff -r 9d5f0300d7ed -r 326f00faa092 MQTTThreadedClient.h --- a/MQTTThreadedClient.h Sun Mar 26 04:38:36 2017 +0000 +++ b/MQTTThreadedClient.h Mon Mar 27 03:53:18 2017 +0000 @@ -7,6 +7,7 @@ #include "NetworkInterface.h" #include "FP.h" + #include <cstdio> #include <string> #include <map> @@ -76,24 +77,52 @@ class MQTTThreadedClient { public: - MQTTThreadedClient(NetworkInterface * aNetwork) + MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL) : network(aNetwork), + ssl_ca_pem(pem), queue(32 * EVENTS_EVENT_SIZE), - isConnected(false) { + isConnected(false) + { + DRBG_PERS = "mbed TLS MQTT client"; tcpSocket = new TCPSocket(); + setupTLS(); + } + + ~MQTTThreadedClient() + { + // TODO: signal the thread to shutdown + freeTLS(); + + if (isConnected) + disconnect(); + } + int connect(const char * host, uint16_t port, MQTTPacket_connectData & options); + void disconnect(); + int publish(PubMessage& message); + + void addTopicHandler(const char * topic, void (*function)(MessageData &)); + template<typename T> + void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &)) + { + FP<void,MessageData &> fp; + fp.attach(object, member); + + topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); + } + int subscribe(const char * topic, QoS qos, void (*function)(MessageData &)); - template<typename T> int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) { int rc = FAILURE; int len = 0; MQTTString topic = {(char*)topicstr, {0, 0}}; - + printf("Subscribing to topic [%s]\r\n", topicstr); + if (!isConnected) { printf("Session already connected!!\r\n"); return rc; @@ -109,7 +138,7 @@ printf("Error sending subscribe packet [%d]\r\n", rc); return rc; } - + printf("Waiting for subscription ack ...\r\n"); // Wait for SUBACK, dropping packets read along the way ... if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback int count = 0, grantedQoS = -1; @@ -126,11 +155,14 @@ // Reset connection timers here ... resetConnectionTimer(); - + printf("Successfully subscribed to %s ...\r\n", topicstr); rc = SUCCESS; } } else + { + printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr); rc = FAILURE; + } return rc; } @@ -145,8 +177,17 @@ private: NetworkInterface * network; + const char * ssl_ca_pem; TCPSocket * tcpSocket; PacketId packetid; + const char *DRBG_PERS; + nsapi_error_t _error; + // Connection options + std::string host; + uint16_t port; + MQTTPacket_connectData connect_options; + + // Event queue EventQueue queue; bool isConnected; @@ -157,13 +198,21 @@ // In the future, use a vector instead of maps to allow multiple // handlers for the same topic. std::map<std::string, FP<void, MessageData &> > topicCBMap; - + std::map<std::string, FP<void, MessageData &> > topicCBMap_t; + unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; unsigned int keepAliveInterval; Timer comTimer; + // SSL/TLS functions + void setupTLS(); + int initTLS(); + void freeTLS(); + int doTLSHandshake(); + + int processSubscriptions(); int readPacket(); int sendPacket(size_t length); int readPacketLength(int* value);