A threaded, secure MQTT client example. It uses mbed TLS for the SSL/TLS connection and currently supports QoS 0 only. The example has been tested on a K64F connected via Ethernet.
Fork of HelloMQTT by
MQTTThreadedClient.h
- Committer:
- vpcola
- Date:
- 2017-03-26
- Revision:
- 23:06fac173529e
- Child:
- 25:326f00faa092
File content as of revision 23:06fac173529e:
#ifndef _MQTT_THREADED_CLIENT_H_
#define _MQTT_THREADED_CLIENT_H_
#include "mbed.h"
#include "rtos.h"
#include "MQTTPacket.h"
#include "NetworkInterface.h"
#include "FP.h"
#include <cstdio>
#include <string>
#include <map>
#define COMMAND_TIMEOUT 5000
#define DEFAULT_SOCKET_TIMEOUT 1000
#define MAX_MQTT_PACKET_SIZE 200
#define MAX_MQTT_PAYLOAD_SIZE 100
// MQTT quality-of-service levels; the enumerator values are 0, 1 and 2.
enum QoS {
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
};
// Status codes returned by the client API.
// Every failure code must be negative; SUCCESS is zero.
enum returnCode {
    SUCCESS = 0,
    FAILURE = -1,
    TIMEOUT = -2,
    BUFFER_OVERFLOW = -3
};
// A message whose payload is referenced through a pointer rather than
// stored inline. pMessage is the matching pointer alias.
struct Message
{
    QoS qos;             // quality-of-service level of the message
    bool retained;       // retained flag
    bool dup;            // duplicate flag
    unsigned short id;   // packet identifier
    void *payload;       // points at the payload bytes; not stored in this struct
    size_t payloadlen;   // length associated with `payload`
};
typedef Message *pMessage;
// TODO:
// Merge this struct with the one above, in order to use the same
// data structure for sending and receiving. I need to simplify
// the PubMessage to not contain pointers like the one above.

// A message that carries its topic and payload in fixed-size inline
// buffers, so it holds no pointers. pPubMessage is the matching pointer alias.
struct PubMessage
{
    char topic[100];                       // topic name buffer (100 chars)
    QoS qos;                               // quality-of-service level
    unsigned short id;                     // packet identifier
    size_t payloadlen;                     // length associated with `payload`
    char payload[MAX_MQTT_PAYLOAD_SIZE];   // inline payload buffer
};
typedef PubMessage *pPubMessage;
// Pairs a message with its topic name; this is the argument type of the
// subscription callbacks.
// Both members are references, so the referenced objects must outlive
// this MessageData instance.
struct MessageData
{
    // Binds the struct to an existing topic name and message (no copies).
    MessageData(MQTTString &aTopicName, Message &aMessage)
        : message(aMessage),
          topicName(aTopicName)
    {
    }

    Message &message;        // the referenced message
    MQTTString &topicName;   // the referenced topic name
};
// Generates packet identifiers in the range 1..MAX_PACKET_ID.
// The first call to getNext() returns 1; after MAX_PACKET_ID the
// sequence wraps back to 1, so 0 is never handed out.
class PacketId
{
public:
    // Starts the counter at 0 so that the first generated id is 1.
    PacketId() : next(0)
    {
    }

    // Advances the counter and returns the new packet id.
    // The increment and the store are a single assignment: the earlier
    // form `next = cond ? 1 : ++next` modified `next` twice in one
    // expression, which is undefined under the C++98/03 sequencing rules.
    int getNext()
    {
        next = (next == MAX_PACKET_ID) ? 1 : next + 1;
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;   // largest id before wrapping
    int next;                                  // last id handed out (0 = none yet)
};
class MQTTThreadedClient
{
public:
MQTTThreadedClient(NetworkInterface * aNetwork)
: network(aNetwork),
queue(32 * EVENTS_EVENT_SIZE),
isConnected(false) {
tcpSocket = new TCPSocket();
}
int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
int publish(PubMessage& message);
int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
template<typename T>
int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
int rc = FAILURE;
int len = 0;
MQTTString topic = {(char*)topicstr, {0, 0}};
if (!isConnected) {
printf("Session already connected!!\r\n");
return rc;
}
len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
if (len <= 0) {
printf("Error serializing subscribe packet ...\r\n");
return rc;
}
if ((rc = sendPacket(len)) != SUCCESS) {
printf("Error sending subscribe packet [%d]\r\n", rc);
return rc;
}
// Wait for SUBACK, dropping packets read along the way ...
if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
int count = 0, grantedQoS = -1;
unsigned short mypacketid;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
rc = grantedQoS; // 0, 1, 2 or 0x80
// For as long as we do not get 0x80 ..
if (rc != 0x80) {
// Add message handlers to the map
FP<void,MessageData &> fp;
fp.attach(object, member);
topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
// Reset connection timers here ...
resetConnectionTimer();
rc = SUCCESS;
}
} else
rc = FAILURE;
return rc;
}
// the listener thread ...
void startListener();
protected:
int connect(MQTTPacket_connectData& options);
int handlePublishMsg();
private:
NetworkInterface * network;
TCPSocket * tcpSocket;
PacketId packetid;
// Event queue
EventQueue queue;
bool isConnected;
// TODO: Because I'm using a map, I can only have one handler
// for each topic (one that's mapped to the topic string).
// Attaching another handler on the same topic is not possible.
// In the future, use a vector instead of maps to allow multiple
// handlers for the same topic.
std::map<std::string, FP<void, MessageData &> > topicCBMap;
unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
unsigned int keepAliveInterval;
Timer comTimer;
int readPacket();
int sendPacket(size_t length);
int readPacketLength(int* value);
int readUntil(int packetType, int timeout);
int readBytesToBuffer(char * buffer, size_t size, int timeout);
int sendBytesFromBuffer(char * buffer, size_t size, int timeout);
bool isTopicMatched(char* topic, MQTTString& topicName);
int sendPublish(PubMessage& message);
void resetConnectionTimer();
void sendPingRequest();
bool hasConnectionTimedOut();
};
#endif
