Vergil Cola
/
MQTTGateway2
Fork of my original MQTTGateway
Embed:
(wiki syntax)
Show/hide line numbers
MQTTThreadedClient.h
00001 #ifndef _MQTT_THREADED_CLIENT_H_ 00002 #define _MQTT_THREADED_CLIENT_H_ 00003 00004 #include "mbed.h" 00005 #include "rtos.h" 00006 #include "MQTTPacket.h" 00007 #include "NetworkInterface.h" 00008 #include "FP.h" 00009 #include "config.h" 00010 00011 #include <cstdio> 00012 #include <string> 00013 #include <map> 00014 00015 //#define MQTT_DEBUG 1 00016 00017 00018 #define COMMAND_TIMEOUT 5000 00019 #define DEFAULT_SOCKET_TIMEOUT 1000 00020 #define MAX_MQTT_PACKET_SIZE 500 00021 #define MAX_MQTT_PAYLOAD_SIZE 300 00022 00023 namespace MQTT 00024 { 00025 00026 typedef enum { QOS0, QOS1, QOS2 } QoS; 00027 00028 // all failure return codes must be negative 00029 typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode; 00030 00031 00032 typedef struct 00033 { 00034 QoS qos; 00035 bool retained; 00036 bool dup; 00037 unsigned short id; 00038 void *payload; 00039 size_t payloadlen; 00040 }Message, *pMessage; 00041 00042 // TODO: 00043 // Merge this struct with the one above, in order to use the same 00044 // data structure for sending and receiving. I need to simplify 00045 // the PubMessage to not contain pointers like the one above. 00046 typedef struct 00047 { 00048 char topic[100]; 00049 QoS qos; 00050 unsigned short id; 00051 size_t payloadlen; 00052 char payload[MAX_MQTT_PAYLOAD_SIZE]; 00053 }PubMessage, *pPubMessage; 00054 00055 struct MessageData 00056 { 00057 MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName) 00058 { } 00059 Message &message; 00060 MQTTString &topicName; 00061 }; 00062 00063 class PacketId 00064 { 00065 public: 00066 PacketId() 00067 { 00068 next = 0; 00069 } 00070 00071 int getNext() 00072 { 00073 return next = (next == MAX_PACKET_ID) ? 1 : ++next; 00074 } 00075 00076 private: 00077 static const int MAX_PACKET_ID = 65535; 00078 int next; 00079 }; 00080 00081 00082 00083 class MQTTThreadedClient 00084 { 00085 public: 00086 MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL) 00087 : network(aNetwork), 00088 ssl_ca_pem(pem), 00089 port((pem != NULL) ? 8883 : 1883), 00090 queue(32 * EVENTS_EVENT_SIZE), 00091 isConnected(false), 00092 hasSavedSession(false), 00093 useTLS(pem != NULL) 00094 { 00095 DRBG_PERS = "mbed TLS MQTT client"; 00096 tcpSocket = new TCPSocket(); 00097 setupTLS(); 00098 } 00099 00100 ~MQTTThreadedClient() 00101 { 00102 // TODO: signal the thread to shutdown 00103 freeTLS(); 00104 00105 if (isConnected) 00106 disconnect(); 00107 00108 } 00109 /** 00110 * Sets the connection parameters. Must be called before running the startListener as a thread. 00111 * 00112 * @param host - pointer to the host where the MQTT server is running 00113 * @param port - the port number to connect, 1883 for non secure connections, 8883 for 00114 * secure connections 00115 * @param options - the connect data used for logging into the MQTT server. 00116 */ 00117 void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options); 00118 int publish(PubMessage& message); 00119 00120 void addTopicHandler(const char * topic, void (*function)(MessageData &)); 00121 template<typename T> 00122 void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &)) 00123 { 00124 FP<void,MessageData &> fp; 00125 fp.attach(object, member); 00126 00127 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp)); 00128 } 00129 00130 // TODO: Add unsubscribe functionality. 00131 00132 // Start the listener thread and start polling 00133 // MQTT server. 00134 void startListener(); 00135 // Stop the listerner thread and closes connection 00136 void stopListener(); 00137 00138 protected: 00139 00140 int handlePublishMsg(); 00141 void disconnect(); 00142 int connect(); 00143 00144 00145 private: 00146 NetworkInterface * network; 00147 const char * ssl_ca_pem; 00148 TCPSocket * tcpSocket; 00149 PacketId packetid; 00150 const char *DRBG_PERS; 00151 nsapi_error_t _error; 00152 // Connection options 00153 std::string host; 00154 uint16_t port; 00155 MQTTPacket_connectData connect_options; 00156 // Event queue 00157 EventQueue queue; 00158 bool isConnected; 00159 bool hasSavedSession; 00160 00161 // TODO: Because I'm using a map, I can only have one handler 00162 // for each topic (one that's mapped to the topic string). 00163 // Attaching another handler on the same topic is not possible. 00164 // In the future, use a vector instead of maps to allow multiple 00165 // handlers for the same topic. 00166 std::map<std::string, FP<void, MessageData &> > topicCBMap; 00167 00168 unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; 00169 unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; 00170 00171 unsigned int keepAliveInterval; 00172 Timer comTimer; 00173 00174 // SSL/TLS functions 00175 bool useTLS; 00176 void setupTLS(); 00177 int initTLS(); 00178 void freeTLS(); 00179 int doTLSHandshake(); 00180 00181 int processSubscriptions(); 00182 int readPacket(); 00183 int sendPacket(size_t length); 00184 int readPacketLength(int* value); 00185 int readUntil(int packetType, int timeout); 00186 int readBytesToBuffer(char * buffer, size_t size, int timeout); 00187 int sendBytesFromBuffer(char * buffer, size_t size, int timeout); 00188 bool isTopicMatched(char* topic, MQTTString& topicName); 00189 int sendPublish(PubMessage& message); 00190 void resetConnectionTimer(); 00191 void sendPingRequest(); 00192 bool hasConnectionTimedOut(); 00193 int login(); 00194 }; 00195 00196 } 00197 #endif
Generated on Tue Jul 12 2022 18:06:46 by 1.7.2