Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: EthernetNetIf TextLCD mbed
MQTTClient.cpp
00001 #include "MQTTClient.h" 00002 00003 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) 00004 { 00005 this->port = port; 00006 callback_server = callback; 00007 serverIp = server; 00008 connected = false; 00009 sessionOpened = false; 00010 timer.start(); 00011 } 00012 00013 MQTTClient::~MQTTClient() 00014 { 00015 } 00016 00017 int MQTTClient::send_data(const char* msg, int size) 00018 { 00019 int transLen = pTCPSocket->send(msg, size); 00020 00021 /*Check send length matches the message length*/ 00022 if(transLen != size){ 00023 for(int i = 0; i < size; i++) { 00024 printf("%x, ", msg[i]); 00025 } 00026 printf("Error on send.\r\n"); 00027 return -1; 00028 } 00029 return 1; 00030 } 00031 00032 int MQTTClient::open_session(char* id) 00033 { 00034 /*variable header*/ 00035 char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)}; 00036 00037 /*fixed header: 2 bytes, big endian*/ 00038 char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2}; 00039 00040 char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id)]; 00041 00042 memset(packet,0,sizeof(packet)); 00043 memcpy(packet,fixed_header,sizeof(fixed_header)); 00044 memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header)); 00045 memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id)); 00046 00047 /* 00048 for(int i = 0; i < sizeof(packet); i++) { 00049 printf("%x, ", packet[i]); 00050 } 00051 printf("\r\n");*/ 00052 00053 if(!send_data(packet, sizeof(packet))) 00054 { 00055 return -1; 00056 } 00057 return 1; 00058 } 00059 00060 int MQTTClient::connect(char* id) 00061 { 00062 clientId = id; 00063 00064 /*Initial TCP socket*/ 00065 pTCPSocket = new TCPSocket; 00066 pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent); 00067 00068 host.setPort(1883); 00069 host.setIp(serverIp); 00070 host.setName("localhost"); 00071 00072 /*Trying to connect to host*/ 00073 printf("Trying to connect to host..\r\n\r\n"); 00074 TCPSocketErr err = pTCPSocket->connect(host); 00075 00076 Net::poll(); 00077 if(err) 00078 { 00079 printf("Error connecting to host [%d]\r\n", (int) err); 00080 return -1; 00081 } 00082 printf("Connect to host sucessed..\r\n\r\n"); 00083 00084 /*Wait TCP connection with server to be established*/ 00085 int i = 0; 00086 while(!connected) 00087 { 00088 Net::poll(); 00089 wait(1); 00090 i++; 00091 printf("Wait for connections %d..\r\n", i); 00092 if(i == 35) 00093 {//If wait too long, give up. 00094 return -1; 00095 } 00096 } 00097 00098 /*Send open session message to server*/ 00099 open_session(id); 00100 00101 /*Wait server notice of open sesion*/ 00102 while(!sessionOpened) 00103 { 00104 Net::poll(); 00105 wait(1); 00106 if(!connected){ 00107 break; 00108 } 00109 printf("Wait for session..\r\n"); 00110 } 00111 if(!connected) 00112 { 00113 return -2; 00114 } 00115 lastActivity = timer.read_ms(); 00116 return 1; 00117 } 00118 00119 int MQTTClient::publish(char* pub_topic, char* msg) 00120 { 00121 uint8_t var_header_pub[strlen(pub_topic)+3]; 00122 strcpy((char *)&var_header_pub[2], pub_topic); 00123 var_header_pub[0] = 0; 00124 var_header_pub[1] = strlen(pub_topic); 00125 var_header_pub[sizeof(var_header_pub)-1] = 0; 00126 00127 uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)}; 00128 00129 uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)]; 00130 memset(packet_pub,0,sizeof(packet_pub)); 00131 memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub)); 00132 memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub)); 00133 memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg)); 00134 00135 if(!send_data((char*)packet_pub, sizeof(packet_pub))) 00136 { 00137 return -1; 00138 } 00139 return 1; 00140 } 00141 00142 00143 void MQTTClient::disconnect() 00144 { 00145 char packet_224[] = {0xe0, 0x00}; 00146 send_data((char*)packet_224, 2); 00147 00148 connected = false; 00149 } 00150 00151 void MQTTClient::read_data() 00152 { 00153 char buffer[1024]; 00154 int len = 0, readLen; 00155 00156 while((readLen = pTCPSocket->recv(buffer, 1024)) != 0){ 00157 len += readLen; 00158 } 00159 00160 buffer[len] = '\0'; 00161 00162 /* 00163 printf("Read length: %d %d\r\n", len, readLen); 00164 00165 for(int i = 0; i < len; i++) { 00166 printf("%x\r\n", buffer[i],buffer[i]); 00167 } 00168 printf("\r\n"); 00169 */ 00170 char type = buffer[0]>>4; 00171 if (type == 3) { // PUBLISH 00172 if (callback_server) { 00173 uint8_t tl = (buffer[2]<<3)+buffer[3]; 00174 char topic[tl+1]; 00175 for (int i=0;i<tl;i++) { 00176 topic[i] = buffer[4+i]; 00177 } 00178 topic[tl] = 0; 00179 // ignore msgID - only support QoS 0 subs 00180 char *payload = buffer+4+tl; 00181 callback_server(topic,(char*)payload); 00182 } 00183 } else if (type == 12) { // PINGREG -- Ask for alive 00184 char packet_208[] = {0xd0, 0x00}; 00185 send_data((char*)packet_208, 2); 00186 lastActivity = timer.read_ms(); 00187 } 00188 } 00189 00190 void MQTTClient::read_open_session() 00191 { 00192 char buffer[32]; 00193 int len = 0, readLen; 00194 00195 while((readLen = pTCPSocket->recv(buffer, 32)) != 0) 00196 { 00197 len += readLen; 00198 } 00199 00200 if(len == 4 && buffer[3] == 0) 00201 { 00202 printf("Session opened\r\n"); 00203 sessionOpened = true; 00204 } 00205 } 00206 00207 int MQTTClient::subscribe(char* topic) 00208 { 00209 00210 if (connected) 00211 { 00212 uint8_t var_header_topic[] = {0,10}; 00213 uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3}; 00214 00215 //utf topic 00216 uint8_t utf_topic[strlen(topic)+3]; 00217 strcpy((char *)&utf_topic[2], topic); 00218 00219 utf_topic[0] = 0; 00220 utf_topic[1] = strlen(topic); 00221 utf_topic[sizeof(utf_topic)-1] = 0; 00222 00223 char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3]; 00224 memset(packet_topic,0,sizeof(packet_topic)); 00225 memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic)); 00226 memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic)); 00227 memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic)); 00228 00229 if (!send_data(packet_topic, sizeof(packet_topic))) 00230 { 00231 return -1; 00232 } 00233 return 1; 00234 } 00235 return -1; 00236 } 00237 00238 void MQTTClient::live() 00239 { 00240 if (connected) 00241 { 00242 int t = timer.read_ms(); 00243 if (t - lastActivity > KEEPALIVE) { 00244 //Send 192 0 to broker 00245 printf("Send 192\r\n"); 00246 char packet_192[] = {0xc0, 0x00}; 00247 send_data((char*)packet_192, 2); 00248 lastActivity = t; 00249 } 00250 } 00251 } 00252 00253 void MQTTClient::onTCPSocketEvent(TCPSocketEvent e) 00254 { 00255 switch(e) 00256 { 00257 case TCPSOCKET_ACCEPT: 00258 printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n"); 00259 break; 00260 case TCPSOCKET_CONNECTED: 00261 printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n"); 00262 connected = true; 00263 break; 00264 case TCPSOCKET_WRITEABLE: 00265 printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n"); 00266 break; 00267 case TCPSOCKET_READABLE: 00268 printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n"); 00269 if(!sessionOpened) 00270 { 00271 read_open_session(); 00272 } 00273 read_data(); 00274 break; 00275 case TCPSOCKET_CONTIMEOUT: 00276 printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n"); 00277 break; 00278 case TCPSOCKET_CONRST: 00279 printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n"); 00280 break; 00281 case TCPSOCKET_CONABRT: 00282 printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n"); 00283 break; 00284 case TCPSOCKET_ERROR: 00285 printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n"); 00286 break; 00287 case TCPSOCKET_DISCONNECTED: 00288 printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n"); 00289 pTCPSocket->close(); 00290 connected = false; 00291 break; 00292 } 00293 }
Generated on Thu Jul 14 2022 10:15:43 by
1.7.2