A threaded, secure MQTT client example that uses Mbed TLS for the SSL/TLS connection. Only QoS 0 is supported for now. The example has been tested on a K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Committer:
vpcola
Date:
Sun Mar 26 04:35:46 2017 +0000
Revision:
23:06fac173529e
Child:
25:326f00faa092
Code is stable now in both publish/subscribe ...

Who changed what in which revision?

User | Revision | Line number | New contents of line
vpcola 23:06fac173529e 1 #ifndef _MQTT_THREADED_CLIENT_H_
vpcola 23:06fac173529e 2 #define _MQTT_THREADED_CLIENT_H_
vpcola 23:06fac173529e 3
vpcola 23:06fac173529e 4 #include "mbed.h"
vpcola 23:06fac173529e 5 #include "rtos.h"
vpcola 23:06fac173529e 6 #include "MQTTPacket.h"
vpcola 23:06fac173529e 7 #include "NetworkInterface.h"
vpcola 23:06fac173529e 8 #include "FP.h"
vpcola 23:06fac173529e 9
vpcola 23:06fac173529e 10 #include <cstdio>
vpcola 23:06fac173529e 11 #include <string>
vpcola 23:06fac173529e 12 #include <map>
vpcola 23:06fac173529e 13
// Timeout handed to readUntil() while waiting for a reply packet
// (presumably milliseconds -- TODO confirm against the implementation).
#define COMMAND_TIMEOUT 5000

// Default timeout for socket operations
// (presumably milliseconds; its use is not visible in this header).
#define DEFAULT_SOCKET_TIMEOUT 1000

// Size, in bytes, of the client's send and receive packet buffers.
#define MAX_MQTT_PACKET_SIZE 200

// Size, in bytes, of the payload array embedded in PubMessage.
#define MAX_MQTT_PAYLOAD_SIZE 100
vpcola 23:06fac173529e 18
// MQTT quality-of-service levels; the enumerator values are 0, 1 and 2.
enum QoS
{
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
vpcola 23:06fac173529e 20
// Status codes returned by the client.
// Every failure code must be negative; SUCCESS is zero.
enum returnCode
{
    BUFFER_OVERFLOW = -3,
    TIMEOUT         = -2,
    FAILURE         = -1,
    SUCCESS         = 0
};
vpcola 23:06fac173529e 23
vpcola 23:06fac173529e 24
vpcola 23:06fac173529e 25 typedef struct
vpcola 23:06fac173529e 26 {
vpcola 23:06fac173529e 27 QoS qos;
vpcola 23:06fac173529e 28 bool retained;
vpcola 23:06fac173529e 29 bool dup;
vpcola 23:06fac173529e 30 unsigned short id;
vpcola 23:06fac173529e 31 void *payload;
vpcola 23:06fac173529e 32 size_t payloadlen;
vpcola 23:06fac173529e 33 }Message, *pMessage;
vpcola 23:06fac173529e 34
vpcola 23:06fac173529e 35 // TODO:
vpcola 23:06fac173529e 36 // Merge this struct with the one above, in order to use the same
vpcola 23:06fac173529e 37 // data structure for sending and receiving. I need to simplify
vpcola 23:06fac173529e 38 // the PubMessage to not contain pointers like the one above.
vpcola 23:06fac173529e 39 typedef struct
vpcola 23:06fac173529e 40 {
vpcola 23:06fac173529e 41 char topic[100];
vpcola 23:06fac173529e 42 QoS qos;
vpcola 23:06fac173529e 43 unsigned short id;
vpcola 23:06fac173529e 44 size_t payloadlen;
vpcola 23:06fac173529e 45 char payload[MAX_MQTT_PAYLOAD_SIZE];
vpcola 23:06fac173529e 46 }PubMessage, *pPubMessage;
vpcola 23:06fac173529e 47
vpcola 23:06fac173529e 48 struct MessageData
vpcola 23:06fac173529e 49 {
vpcola 23:06fac173529e 50 MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName)
vpcola 23:06fac173529e 51 { }
vpcola 23:06fac173529e 52 Message &message;
vpcola 23:06fac173529e 53 MQTTString &topicName;
vpcola 23:06fac173529e 54 };
vpcola 23:06fac173529e 55
// Generates packet identifiers in the range 1..MAX_PACKET_ID.
// The first call to getNext() returns 1; after MAX_PACKET_ID has been
// handed out the sequence wraps back to 1 (0 is never returned).
class PacketId
{
public:
    PacketId() : next(0)
    {
    }

    // Returns the next identifier and remembers it for the following call.
    int getNext()
    {
        // Write 'next' exactly once. The previous form,
        // `next = cond ? 1 : ++next`, modified it twice in a single
        // expression, which is unsequenced under pre-C++11 rules.
        next = (next == MAX_PACKET_ID) ? 1 : next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;  // largest 16-bit packet identifier
    int next;                                // last identifier returned, 0 before first use
};
vpcola 23:06fac173529e 73
vpcola 23:06fac173529e 74
vpcola 23:06fac173529e 75
vpcola 23:06fac173529e 76 class MQTTThreadedClient
vpcola 23:06fac173529e 77 {
vpcola 23:06fac173529e 78 public:
vpcola 23:06fac173529e 79 MQTTThreadedClient(NetworkInterface * aNetwork)
vpcola 23:06fac173529e 80 : network(aNetwork),
vpcola 23:06fac173529e 81 queue(32 * EVENTS_EVENT_SIZE),
vpcola 23:06fac173529e 82 isConnected(false) {
vpcola 23:06fac173529e 83 tcpSocket = new TCPSocket();
vpcola 23:06fac173529e 84 }
vpcola 23:06fac173529e 85
vpcola 23:06fac173529e 86 int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
vpcola 23:06fac173529e 87 int publish(PubMessage& message);
vpcola 23:06fac173529e 88 int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
vpcola 23:06fac173529e 89
vpcola 23:06fac173529e 90 template<typename T>
vpcola 23:06fac173529e 91 int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
vpcola 23:06fac173529e 92 int rc = FAILURE;
vpcola 23:06fac173529e 93 int len = 0;
vpcola 23:06fac173529e 94
vpcola 23:06fac173529e 95 MQTTString topic = {(char*)topicstr, {0, 0}};
vpcola 23:06fac173529e 96
vpcola 23:06fac173529e 97 if (!isConnected) {
vpcola 23:06fac173529e 98 printf("Session already connected!!\r\n");
vpcola 23:06fac173529e 99 return rc;
vpcola 23:06fac173529e 100 }
vpcola 23:06fac173529e 101
vpcola 23:06fac173529e 102 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
vpcola 23:06fac173529e 103 if (len <= 0) {
vpcola 23:06fac173529e 104 printf("Error serializing subscribe packet ...\r\n");
vpcola 23:06fac173529e 105 return rc;
vpcola 23:06fac173529e 106 }
vpcola 23:06fac173529e 107
vpcola 23:06fac173529e 108 if ((rc = sendPacket(len)) != SUCCESS) {
vpcola 23:06fac173529e 109 printf("Error sending subscribe packet [%d]\r\n", rc);
vpcola 23:06fac173529e 110 return rc;
vpcola 23:06fac173529e 111 }
vpcola 23:06fac173529e 112
vpcola 23:06fac173529e 113 // Wait for SUBACK, dropping packets read along the way ...
vpcola 23:06fac173529e 114 if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
vpcola 23:06fac173529e 115 int count = 0, grantedQoS = -1;
vpcola 23:06fac173529e 116 unsigned short mypacketid;
vpcola 23:06fac173529e 117 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
vpcola 23:06fac173529e 118 rc = grantedQoS; // 0, 1, 2 or 0x80
vpcola 23:06fac173529e 119 // For as long as we do not get 0x80 ..
vpcola 23:06fac173529e 120 if (rc != 0x80) {
vpcola 23:06fac173529e 121 // Add message handlers to the map
vpcola 23:06fac173529e 122 FP<void,MessageData &> fp;
vpcola 23:06fac173529e 123 fp.attach(object, member);
vpcola 23:06fac173529e 124
vpcola 23:06fac173529e 125 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
vpcola 23:06fac173529e 126
vpcola 23:06fac173529e 127 // Reset connection timers here ...
vpcola 23:06fac173529e 128 resetConnectionTimer();
vpcola 23:06fac173529e 129
vpcola 23:06fac173529e 130 rc = SUCCESS;
vpcola 23:06fac173529e 131 }
vpcola 23:06fac173529e 132 } else
vpcola 23:06fac173529e 133 rc = FAILURE;
vpcola 23:06fac173529e 134
vpcola 23:06fac173529e 135 return rc;
vpcola 23:06fac173529e 136 }
vpcola 23:06fac173529e 137
vpcola 23:06fac173529e 138 // the listener thread ...
vpcola 23:06fac173529e 139 void startListener();
vpcola 23:06fac173529e 140
vpcola 23:06fac173529e 141 protected:
vpcola 23:06fac173529e 142 int connect(MQTTPacket_connectData& options);
vpcola 23:06fac173529e 143 int handlePublishMsg();
vpcola 23:06fac173529e 144
vpcola 23:06fac173529e 145
vpcola 23:06fac173529e 146 private:
vpcola 23:06fac173529e 147 NetworkInterface * network;
vpcola 23:06fac173529e 148 TCPSocket * tcpSocket;
vpcola 23:06fac173529e 149 PacketId packetid;
vpcola 23:06fac173529e 150 // Event queue
vpcola 23:06fac173529e 151 EventQueue queue;
vpcola 23:06fac173529e 152 bool isConnected;
vpcola 23:06fac173529e 153
vpcola 23:06fac173529e 154 // TODO: Because I'm using a map, I can only have one handler
vpcola 23:06fac173529e 155 // for each topic (one that's mapped to the topic string).
vpcola 23:06fac173529e 156 // Attaching another handler on the same topic is not possible.
vpcola 23:06fac173529e 157 // In the future, use a vector instead of maps to allow multiple
vpcola 23:06fac173529e 158 // handlers for the same topic.
vpcola 23:06fac173529e 159 std::map<std::string, FP<void, MessageData &> > topicCBMap;
vpcola 23:06fac173529e 160
vpcola 23:06fac173529e 161 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
vpcola 23:06fac173529e 162 unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
vpcola 23:06fac173529e 163
vpcola 23:06fac173529e 164 unsigned int keepAliveInterval;
vpcola 23:06fac173529e 165 Timer comTimer;
vpcola 23:06fac173529e 166
vpcola 23:06fac173529e 167 int readPacket();
vpcola 23:06fac173529e 168 int sendPacket(size_t length);
vpcola 23:06fac173529e 169 int readPacketLength(int* value);
vpcola 23:06fac173529e 170 int readUntil(int packetType, int timeout);
vpcola 23:06fac173529e 171 int readBytesToBuffer(char * buffer, size_t size, int timeout);
vpcola 23:06fac173529e 172 int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
vpcola 23:06fac173529e 173 bool isTopicMatched(char* topic, MQTTString& topicName);
vpcola 23:06fac173529e 174 int sendPublish(PubMessage& message);
vpcola 23:06fac173529e 175 void resetConnectionTimer();
vpcola 23:06fac173529e 176 void sendPingRequest();
vpcola 23:06fac173529e 177 bool hasConnectionTimedOut();
vpcola 23:06fac173529e 178
vpcola 23:06fac173529e 179 };
vpcola 23:06fac173529e 180
vpcola 23:06fac173529e 181 #endif