Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
MQTTThreadedClient.h
00001 #ifndef _MQTT_THREADED_CLIENT_H_ 00002 #define _MQTT_THREADED_CLIENT_H_ 00003 00004 #include "mbed.h" 00005 #include "rtos.h" 00006 #include "MQTTPacket.h" 00007 #include "NetworkInterface.h" 00008 #include "FP.h" 00009 #include "config.h" 00010 00011 #include <cstdio> 00012 #include <string> 00013 #include <map> 00014 00015 //#define MQTT_DEBUG 1 00016 00017 00018 #define COMMAND_TIMEOUT 5000 00019 #define DEFAULT_SOCKET_TIMEOUT 1000 00020 #define MAX_MQTT_PACKET_SIZE 500 00021 #define MAX_MQTT_PAYLOAD_SIZE 300 00022 00023 namespace MQTT 00024 { 00025 00026 typedef enum { QOS0, QOS1, QOS2 } QoS; 00027 00028 // all failure return codes must be negative 00029 typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode; 00030 00031 00032 typedef struct 00033 { 00034 QoS qos; 00035 bool retained; 00036 bool dup; 00037 unsigned short id; 00038 void *payload; 00039 size_t payloadlen; 00040 }Message, *pMessage; 00041 00042 // TODO: 00043 // Merge this struct with the one above, in order to use the same 00044 // data structure for sending and receiving. I need to simplify 00045 // the PubMessage to not contain pointers like the one above. 00046 typedef struct 00047 { 00048 char topic[100]; 00049 QoS qos; 00050 unsigned short id; 00051 size_t payloadlen; 00052 char payload[MAX_MQTT_PAYLOAD_SIZE]; 00053 }PubMessage, *pPubMessage; 00054 00055 struct MessageData 00056 { 00057 MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName) 00058 { } 00059 Message &message; 00060 MQTTString &topicName; 00061 }; 00062 00063 class PacketId 00064 { 00065 public: 00066 PacketId() 00067 { 00068 next = 0; 00069 } 00070 00071 int getNext() 00072 { 00073 return next = (next == MAX_PACKET_ID) ? 
1 : ++next; 00074 } 00075 00076 private: 00077 static const int MAX_PACKET_ID = 65535; 00078 int next; 00079 }; 00080 00081 00082 00083 class MQTTThreadedClient 00084 { 00085 public: 00086 MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL) 00087 : network(aNetwork), 00088 ssl_ca_pem(pem), 00089 port((pem != NULL) ? 8883 : 1883), 00090 queue(32 * EVENTS_EVENT_SIZE), 00091 isConnected(false), 00092 hasSavedSession(false), 00093 useTLS(pem != NULL) 00094 { 00095 DRBG_PERS = "mbed TLS MQTT client"; 00096 tcpSocket = new TCPSocket(); 00097 setupTLS(); 00098 } 00099 00100 ~MQTTThreadedClient() 00101 { 00102 // TODO: signal the thread to shutdown 00103 freeTLS(); 00104 00105 if (isConnected) 00106 disconnect(); 00107 00108 } 00109 /** 00110 * Sets the connection parameters. Must be called before running the startListener as a thread. 00111 * 00112 * @param host - pointer to the host where the MQTT server is running 00113 * @param port - the port number to connect, 1883 for non secure connections, 8883 for 00114 * secure connections 00115 * @param options - the connect data used for logging into the MQTT server. 00116 */ 00117 void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options); 00118 int publish(PubMessage& message); 00119 00120 void addTopicHandler(const char * topic, void (*function)(MessageData &)); 00121 template<typename T> 00122 void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &)) 00123 { 00124 FP<void,MessageData &> fp; 00125 fp.attach(object, member); 00126 00127 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp)); 00128 } 00129 00130 // TODO: Add unsubscribe functionality. 00131 00132 // Start the listener thread and start polling 00133 // MQTT server. 
00134 void startListener(); 00135 // Stop the listerner thread and closes connection 00136 void stopListener(); 00137 00138 protected: 00139 00140 int handlePublishMsg(); 00141 void disconnect(); 00142 int connect(); 00143 00144 00145 private: 00146 NetworkInterface * network; 00147 const char * ssl_ca_pem; 00148 TCPSocket * tcpSocket; 00149 PacketId packetid; 00150 const char *DRBG_PERS; 00151 nsapi_error_t _error; 00152 // Connection options 00153 std::string host; 00154 uint16_t port; 00155 MQTTPacket_connectData connect_options; 00156 // Event queue 00157 EventQueue queue; 00158 bool isConnected; 00159 bool hasSavedSession; 00160 00161 // TODO: Because I'm using a map, I can only have one handler 00162 // for each topic (one that's mapped to the topic string). 00163 // Attaching another handler on the same topic is not possible. 00164 // In the future, use a vector instead of maps to allow multiple 00165 // handlers for the same topic. 00166 std::map<std::string, FP<void, MessageData &> > topicCBMap; 00167 00168 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00169 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00170 00171 unsigned int keepAliveInterval; 00172 Timer comTimer; 00173 00174 // SSL/TLS functions 00175 bool useTLS; 00176 void setupTLS(); 00177 int initTLS(); 00178 void freeTLS(); 00179 int doTLSHandshake(); 00180 00181 int processSubscriptions(); 00182 int readPacket(); 00183 int sendPacket(size_t length); 00184 int readPacketLength(int* value); 00185 int readUntil(int packetType, int timeout); 00186 int readBytesToBuffer(char * buffer, size_t size, int timeout); 00187 int sendBytesFromBuffer(char * buffer, size_t size, int timeout); 00188 bool isTopicMatched(char* topic, MQTTString& topicName); 00189 int sendPublish(PubMessage& message); 00190 void resetConnectionTimer(); 00191 void sendPingRequest(); 00192 bool hasConnectionTimedOut(); 00193 int login(); 00194 }; 00195 00196 } 00197 #endif
Generated on Tue Jul 12 2022 18:06:46 by Doxygen 1.7.2