A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
MQTTThreadedClient.h
00001 #ifndef _MQTT_THREADED_CLIENT_H_ 00002 #define _MQTT_THREADED_CLIENT_H_ 00003 00004 #include "mbed.h" 00005 #include "rtos.h" 00006 #include "MQTTPacket.h" 00007 #include "NetworkInterface.h" 00008 #include "FP.h" 00009 00010 //#define MQTT_DEBUG 1 00011 00012 #ifdef MQTT_DEBUG 00013 #define DBG(fmt, args...) printf(fmt, ## args) 00014 #else 00015 #define DBG(fmt, args...) /* Don't do anything in release builds */ 00016 #endif 00017 00018 #include <cstdio> 00019 #include <string> 00020 #include <map> 00021 00022 #define COMMAND_TIMEOUT 5000 00023 #define DEFAULT_SOCKET_TIMEOUT 1000 00024 #define MAX_MQTT_PACKET_SIZE 200 00025 #define MAX_MQTT_PAYLOAD_SIZE 100 00026 00027 namespace MQTT 00028 { 00029 00030 typedef enum { QOS0, QOS1, QOS2 } QoS; 00031 00032 // all failure return codes must be negative 00033 typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode; 00034 00035 00036 typedef struct 00037 { 00038 QoS qos; 00039 bool retained; 00040 bool dup; 00041 unsigned short id; 00042 void *payload; 00043 size_t payloadlen; 00044 }Message, *pMessage; 00045 00046 // TODO: 00047 // Merge this struct with the one above, in order to use the same 00048 // data structure for sending and receiving. I need to simplify 00049 // the PubMessage to not contain pointers like the one above. 00050 typedef struct 00051 { 00052 char topic[100]; 00053 QoS qos; 00054 unsigned short id; 00055 size_t payloadlen; 00056 char payload[MAX_MQTT_PAYLOAD_SIZE]; 00057 }PubMessage, *pPubMessage; 00058 00059 struct MessageData 00060 { 00061 MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName) 00062 { } 00063 Message &message; 00064 MQTTString &topicName; 00065 }; 00066 00067 class PacketId 00068 { 00069 public: 00070 PacketId() 00071 { 00072 next = 0; 00073 } 00074 00075 int getNext() 00076 { 00077 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00078 } 00079 00080 private: 00081 static const int MAX_PACKET_ID = 65535; 00082 int next; 00083 }; 00084 00085 00086 00087 class MQTTThreadedClient 00088 { 00089 public: 00090 MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL) 00091 : network(aNetwork), 00092 ssl_ca_pem(pem), 00093 port((pem != NULL) ? 8883 : 1883), 00094 queue(32 * EVENTS_EVENT_SIZE), 00095 isConnected(false), 00096 hasSavedSession(false), 00097 useTLS(pem != NULL) 00098 { 00099 DRBG_PERS = "mbed TLS MQTT client"; 00100 tcpSocket = new TCPSocket(); 00101 setupTLS(); 00102 } 00103 00104 ~MQTTThreadedClient() 00105 { 00106 // TODO: signal the thread to shutdown 00107 freeTLS(); 00108 00109 if (isConnected) 00110 disconnect(); 00111 00112 } 00113 /** 00114 * Sets the connection parameters. Must be called before running the startListener as a thread. 00115 * 00116 * @param host - pointer to the host where the MQTT server is running 00117 * @param port - the port number to connect, 1883 for non secure connections, 8883 for 00118 * secure connections 00119 * @param options - the connect data used for logging into the MQTT server. 00120 */ 00121 void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options); 00122 int publish(PubMessage& message); 00123 00124 void addTopicHandler(const char * topic, void (*function)(MessageData &)); 00125 template<typename T> 00126 void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &)) 00127 { 00128 FP<void,MessageData &> fp; 00129 fp.attach(object, member); 00130 00131 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp)); 00132 } 00133 00134 // TODO: Add unsubscribe functionality. 00135 00136 // Start the listener thread and start polling 00137 // MQTT server. 00138 void startListener(); 00139 // Stop the listerner thread and closes connection 00140 void stopListener(); 00141 00142 protected: 00143 00144 int handlePublishMsg(); 00145 void disconnect(); 00146 int connect(); 00147 00148 00149 private: 00150 NetworkInterface * network; 00151 const char * ssl_ca_pem; 00152 TCPSocket * tcpSocket; 00153 PacketId packetid; 00154 const char *DRBG_PERS; 00155 nsapi_error_t _error; 00156 // Connection options 00157 std::string host; 00158 uint16_t port; 00159 MQTTPacket_connectData connect_options; 00160 // Event queue 00161 EventQueue queue; 00162 bool isConnected; 00163 bool hasSavedSession; 00164 00165 // TODO: Because I'm using a map, I can only have one handler 00166 // for each topic (one that's mapped to the topic string). 00167 // Attaching another handler on the same topic is not possible. 00168 // In the future, use a vector instead of maps to allow multiple 00169 // handlers for the same topic. 00170 std::map<std::string, FP<void, MessageData &> > topicCBMap; 00171 00172 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00173 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00174 00175 unsigned int keepAliveInterval; 00176 Timer comTimer; 00177 00178 // SSL/TLS functions 00179 bool useTLS; 00180 void setupTLS(); 00181 int initTLS(); 00182 void freeTLS(); 00183 int doTLSHandshake(); 00184 00185 int processSubscriptions(); 00186 int readPacket(); 00187 int sendPacket(size_t length); 00188 int readPacketLength(int* value); 00189 int readUntil(int packetType, int timeout); 00190 int readBytesToBuffer(char * buffer, size_t size, int timeout); 00191 int sendBytesFromBuffer(char * buffer, size_t size, int timeout); 00192 bool isTopicMatched(char* topic, MQTTString& topicName); 00193 int sendPublish(PubMessage& message); 00194 void resetConnectionTimer(); 00195 void sendPingRequest(); 00196 bool hasConnectionTimedOut(); 00197 int login(); 00198 }; 00199 00200 } 00201 #endif
Generated on Thu Jul 14 2022 03:56:17 by 1.7.2