A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.h
- Revision:
- 25:326f00faa092
- Parent:
- 23:06fac173529e
- Child:
- 26:4b21de8043a5
--- a/MQTTThreadedClient.h Sun Mar 26 04:38:36 2017 +0000
+++ b/MQTTThreadedClient.h Mon Mar 27 03:53:18 2017 +0000
@@ -7,6 +7,7 @@
#include "NetworkInterface.h"
#include "FP.h"
+
#include <cstdio>
#include <string>
#include <map>
@@ -76,24 +77,52 @@
class MQTTThreadedClient
{
public:
- MQTTThreadedClient(NetworkInterface * aNetwork)
+ MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
: network(aNetwork),
+ ssl_ca_pem(pem),
queue(32 * EVENTS_EVENT_SIZE),
- isConnected(false) {
+ isConnected(false)
+ {
+ DRBG_PERS = "mbed TLS MQTT client";
tcpSocket = new TCPSocket();
+ setupTLS();
+ }
+
+ ~MQTTThreadedClient()
+ {
+ // TODO: signal the thread to shutdown
+ freeTLS();
+
+ if (isConnected)
+ disconnect();
+
}
+
int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
+ void disconnect();
+
int publish(PubMessage& message);
+
+ void addTopicHandler(const char * topic, void (*function)(MessageData &));
+ template<typename T>
+ void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &))
+ {
+ FP<void,MessageData &> fp;
+ fp.attach(object, member);
+
+ topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+ }
+
int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
-
template<typename T>
int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
int rc = FAILURE;
int len = 0;
MQTTString topic = {(char*)topicstr, {0, 0}};
-
+ printf("Subscribing to topic [%s]\r\n", topicstr);
+
if (!isConnected) {
printf("Session already connected!!\r\n");
return rc;
@@ -109,7 +138,7 @@
printf("Error sending subscribe packet [%d]\r\n", rc);
return rc;
}
-
+ printf("Waiting for subscription ack ...\r\n");
// Wait for SUBACK, dropping packets read along the way ...
if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
int count = 0, grantedQoS = -1;
@@ -126,11 +155,14 @@
// Reset connection timers here ...
resetConnectionTimer();
-
+ printf("Successfully subscribed to %s ...\r\n", topicstr);
rc = SUCCESS;
}
} else
+ {
+ printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
rc = FAILURE;
+ }
return rc;
}
@@ -145,8 +177,17 @@
private:
NetworkInterface * network;
+ const char * ssl_ca_pem;
TCPSocket * tcpSocket;
PacketId packetid;
+ const char *DRBG_PERS;
+ nsapi_error_t _error;
+ // Connection options
+ std::string host;
+ uint16_t port;
+ MQTTPacket_connectData connect_options;
+
+
// Event queue
EventQueue queue;
bool isConnected;
@@ -157,13 +198,21 @@
// In the future, use a vector instead of maps to allow multiple
// handlers for the same topic.
std::map<std::string, FP<void, MessageData &> > topicCBMap;
-
+ std::map<std::string, FP<void, MessageData &> > topicCBMap_t;
+
unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
unsigned int keepAliveInterval;
Timer comTimer;
+ // SSL/TLS functions
+ void setupTLS();
+ int initTLS();
+ void freeTLS();
+ int doTLSHandshake();
+
+ int processSubscriptions();
int readPacket();
int sendPacket(size_t length);
int readPacketLength(int* value);
