Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of HelloMQTT by
MQTTThreadedClient.h
00001 #ifndef _MQTT_THREADED_CLIENT_H_ 00002 #define _MQTT_THREADED_CLIENT_H_ 00003 00004 #include "mbed.h" 00005 #include "rtos.h" 00006 #include "MQTTPacket.h" 00007 #include "NetworkInterface.h" 00008 #include "FP.h" 00009 00010 //#define MQTT_DEBUG 1 00011 00012 #ifdef MQTT_DEBUG 00013 #define DBG(fmt, args...) printf(fmt, ## args) 00014 #else 00015 #define DBG(fmt, args...) /* Don't do anything in release builds */ 00016 #endif 00017 00018 #include <cstdio> 00019 #include <string> 00020 #include <map> 00021 00022 #define COMMAND_TIMEOUT 5000 00023 #define DEFAULT_SOCKET_TIMEOUT 1000 00024 #define MAX_MQTT_PACKET_SIZE 200 00025 #define MAX_MQTT_PAYLOAD_SIZE 100 00026 00027 namespace MQTT 00028 { 00029 00030 typedef enum { QOS0, QOS1, QOS2 } QoS; 00031 00032 // all failure return codes must be negative 00033 typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode; 00034 00035 00036 typedef struct 00037 { 00038 QoS qos; 00039 bool retained; 00040 bool dup; 00041 unsigned short id; 00042 void *payload; 00043 size_t payloadlen; 00044 }Message, *pMessage; 00045 00046 // TODO: 00047 // Merge this struct with the one above, in order to use the same 00048 // data structure for sending and receiving. I need to simplify 00049 // the PubMessage to not contain pointers like the one above. 00050 typedef struct 00051 { 00052 char topic[100]; 00053 QoS qos; 00054 unsigned short id; 00055 size_t payloadlen; 00056 char payload[MAX_MQTT_PAYLOAD_SIZE]; 00057 }PubMessage, *pPubMessage; 00058 00059 struct MessageData 00060 { 00061 MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName) 00062 { } 00063 Message &message; 00064 MQTTString &topicName; 00065 }; 00066 00067 class PacketId 00068 { 00069 public: 00070 PacketId() 00071 { 00072 next = 0; 00073 } 00074 00075 int getNext() 00076 { 00077 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00078 } 00079 00080 private: 00081 static const int MAX_PACKET_ID = 65535; 00082 int next; 00083 }; 00084 00085 00086 00087 class MQTTThreadedClient 00088 { 00089 public: 00090 MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL) 00091 : network(aNetwork), 00092 ssl_ca_pem(pem), 00093 port((pem != NULL) ? 8883 : 1883), 00094 queue(32 * EVENTS_EVENT_SIZE), 00095 isConnected(false), 00096 hasSavedSession(false), 00097 useTLS(pem != NULL) 00098 { 00099 DRBG_PERS = "mbed TLS MQTT client"; 00100 tcpSocket = new TCPSocket(); 00101 setupTLS(); 00102 } 00103 00104 ~MQTTThreadedClient() 00105 { 00106 // TODO: signal the thread to shutdown 00107 freeTLS(); 00108 00109 if (isConnected) 00110 disconnect(); 00111 00112 } 00113 /** 00114 * Sets the connection parameters. Must be called before running the startListener as a thread. 00115 * 00116 * @param host - pointer to the host where the MQTT server is running 00117 * @param port - the port number to connect, 1883 for non secure connections, 8883 for 00118 * secure connections 00119 * @param options - the connect data used for logging into the MQTT server. 00120 */ 00121 void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options); 00122 int publish(PubMessage& message); 00123 00124 void addTopicHandler(const char * topic, void (*function)(MessageData &)); 00125 template<typename T> 00126 void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &)) 00127 { 00128 FP<void,MessageData &> fp; 00129 fp.attach(object, member); 00130 00131 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp)); 00132 } 00133 00134 // TODO: Add unsubscribe functionality. 00135 00136 // Start the listener thread and start polling 00137 // MQTT server. 00138 void startListener(); 00139 // Stop the listerner thread and closes connection 00140 void stopListener(); 00141 00142 protected: 00143 00144 int handlePublishMsg(); 00145 void disconnect(); 00146 int connect(); 00147 00148 00149 private: 00150 NetworkInterface * network; 00151 const char * ssl_ca_pem; 00152 TCPSocket * tcpSocket; 00153 PacketId packetid; 00154 const char *DRBG_PERS; 00155 nsapi_error_t _error; 00156 // Connection options 00157 std::string host; 00158 uint16_t port; 00159 MQTTPacket_connectData connect_options; 00160 // Event queue 00161 EventQueue queue; 00162 bool isConnected; 00163 bool hasSavedSession; 00164 00165 // TODO: Because I'm using a map, I can only have one handler 00166 // for each topic (one that's mapped to the topic string). 00167 // Attaching another handler on the same topic is not possible. 00168 // In the future, use a vector instead of maps to allow multiple 00169 // handlers for the same topic. 00170 std::map<std::string, FP<void, MessageData &> > topicCBMap; 00171 00172 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00173 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00174 00175 unsigned int keepAliveInterval; 00176 Timer comTimer; 00177 00178 // SSL/TLS functions 00179 bool useTLS; 00180 void setupTLS(); 00181 int initTLS(); 00182 void freeTLS(); 00183 int doTLSHandshake(); 00184 00185 int processSubscriptions(); 00186 int readPacket(); 00187 int sendPacket(size_t length); 00188 int readPacketLength(int* value); 00189 int readUntil(int packetType, int timeout); 00190 int readBytesToBuffer(char * buffer, size_t size, int timeout); 00191 int sendBytesFromBuffer(char * buffer, size_t size, int timeout); 00192 bool isTopicMatched(char* topic, MQTTString& topicName); 00193 int sendPublish(PubMessage& message); 00194 void resetConnectionTimer(); 00195 void sendPingRequest(); 00196 bool hasConnectionTimedOut(); 00197 int login(); 00198 }; 00199 00200 } 00201 #endif
Generated on Thu Jul 14 2022 03:56:17 by
1.7.2
