A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.h
- Revision:
- 23:06fac173529e
- Child:
- 25:326f00faa092
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTThreadedClient.h Sun Mar 26 04:35:46 2017 +0000
@@ -0,0 +1,181 @@
+#ifndef _MQTT_THREADED_CLIENT_H_
+#define _MQTT_THREADED_CLIENT_H_
+
+#include "mbed.h"
+#include "rtos.h"
+#include "MQTTPacket.h"
+#include "NetworkInterface.h"
+#include "FP.h"
+
+#include <cstdio>
+#include <string>
+#include <map>
+
+#define COMMAND_TIMEOUT 5000
+#define DEFAULT_SOCKET_TIMEOUT 1000
+#define MAX_MQTT_PACKET_SIZE 200
+#define MAX_MQTT_PAYLOAD_SIZE 100
+
+typedef enum { QOS0, QOS1, QOS2 } QoS;
+
+// all failure return codes must be negative
+typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode;
+
+
+typedef struct
+{
+ QoS qos;
+ bool retained;
+ bool dup;
+ unsigned short id;
+ void *payload;
+ size_t payloadlen;
+}Message, *pMessage;
+
+// TODO:
+// Merge this struct with the one above, in order to use the same
+// data structure for sending and receiving. I need to simplify
+// the PubMessage to not contain pointers like the one above.
+typedef struct
+{
+ char topic[100];
+ QoS qos;
+ unsigned short id;
+ size_t payloadlen;
+ char payload[MAX_MQTT_PAYLOAD_SIZE];
+}PubMessage, *pPubMessage;
+
+struct MessageData
+{
+ MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName)
+ { }
+ Message &message;
+ MQTTString &topicName;
+};
+
+class PacketId
+{
+public:
+ PacketId()
+ {
+ next = 0;
+ }
+
+ int getNext()
+ {
+ return next = (next == MAX_PACKET_ID) ? 1 : ++next;
+ }
+
+private:
+ static const int MAX_PACKET_ID = 65535;
+ int next;
+};
+
+
+
+class MQTTThreadedClient
+{
+public:
+ MQTTThreadedClient(NetworkInterface * aNetwork)
+ : network(aNetwork),
+ queue(32 * EVENTS_EVENT_SIZE),
+ isConnected(false) {
+ tcpSocket = new TCPSocket();
+ }
+
+ int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
+ int publish(PubMessage& message);
+ int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
+
+ template<typename T>
+ int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
+ int rc = FAILURE;
+ int len = 0;
+
+ MQTTString topic = {(char*)topicstr, {0, 0}};
+
+ if (!isConnected) {
+ printf("Session already connected!!\r\n");
+ return rc;
+ }
+
+ len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+ if (len <= 0) {
+ printf("Error serializing subscribe packet ...\r\n");
+ return rc;
+ }
+
+ if ((rc = sendPacket(len)) != SUCCESS) {
+ printf("Error sending subscribe packet [%d]\r\n", rc);
+ return rc;
+ }
+
+ // Wait for SUBACK, dropping packets read along the way ...
+ if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
+ int count = 0, grantedQoS = -1;
+ unsigned short mypacketid;
+ if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+ rc = grantedQoS; // 0, 1, 2 or 0x80
+ // For as long as we do not get 0x80 ..
+ if (rc != 0x80) {
+ // Add message handlers to the map
+ FP<void,MessageData &> fp;
+ fp.attach(object, member);
+
+ topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+
+ // Reset connection timers here ...
+ resetConnectionTimer();
+
+ rc = SUCCESS;
+ }
+ } else
+ rc = FAILURE;
+
+ return rc;
+ }
+
+ // the listener thread ...
+ void startListener();
+
+protected:
+ int connect(MQTTPacket_connectData& options);
+ int handlePublishMsg();
+
+
+private:
+ NetworkInterface * network;
+ TCPSocket * tcpSocket;
+ PacketId packetid;
+ // Event queue
+ EventQueue queue;
+ bool isConnected;
+
+ // TODO: Because I'm using a map, I can only have one handler
+ // for each topic (one that's mapped to the topic string).
+ // Attaching another handler on the same topic is not possible.
+ // In the future, use a vector instead of maps to allow multiple
+ // handlers for the same topic.
+ std::map<std::string, FP<void, MessageData &> > topicCBMap;
+
+ unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
+ unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
+
+ unsigned int keepAliveInterval;
+ Timer comTimer;
+
+ int readPacket();
+ int sendPacket(size_t length);
+ int readPacketLength(int* value);
+ int readUntil(int packetType, int timeout);
+ int readBytesToBuffer(char * buffer, size_t size, int timeout);
+ int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
+ bool isTopicMatched(char* topic, MQTTString& topicName);
+ int sendPublish(PubMessage& message);
+ void resetConnectionTimer();
+ void sendPingRequest();
+ bool hasConnectionTimedOut();
+
+};
+
+#endif
