Mark Hembrow / Mbed 2 deprecated MQTTTEST

Dependencies:   EthernetNetIf TextLCD mbed

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.cpp Source File

MQTTClient.cpp

00001 #include "MQTTClient.h"
00002 
00003 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*))
00004 {
00005     this->port = port;
00006     callback_server = callback;
00007     serverIp = server;
00008     connected = false;
00009     sessionOpened = false;
00010     timer.start();
00011 }
00012 
00013 MQTTClient::~MQTTClient()
00014 {
00015 }
00016 
00017 int MQTTClient::send_data(const char* msg, int size)
00018 {
00019     int transLen = pTCPSocket->send(msg, size);
00020     
00021     /*Check send length matches the message length*/
00022     if(transLen != size){
00023         for(int i = 0; i < size; i++) {
00024             printf("%x, ", msg[i]);
00025         }
00026         printf("Error on send.\r\n");
00027         return -1;
00028     }
00029     return 1;
00030 }
00031 
00032 int MQTTClient::open_session(char* id)
00033 {
00034     /*variable header*/
00035     char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,KEEPALIVE/500,0x00,strlen(id)};
00036     
00037     /*fixed header: 2 bytes, big endian*/
00038     char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
00039     
00040     char packet[sizeof(fixed_header) + sizeof(var_header) + sizeof(id)];
00041     
00042     memset(packet,0,sizeof(packet));
00043     memcpy(packet,fixed_header,sizeof(fixed_header));
00044     memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header));
00045     memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id));
00046     
00047     /*
00048      for(int i = 0; i < sizeof(packet); i++) {
00049      printf("%x, ", packet[i]);
00050      }
00051      printf("\r\n");*/
00052     
00053     if(!send_data(packet, sizeof(packet)))
00054     {
00055         return -1;
00056     }
00057     return 1;
00058 }
00059 
00060 int MQTTClient::connect(char* id)
00061 {
00062     clientId = id;
00063     
00064     /*Initial TCP socket*/
00065     pTCPSocket = new TCPSocket;
00066     pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
00067     
00068     host.setPort(1883);
00069     host.setIp(serverIp);
00070     host.setName("localhost");
00071     
00072     /*Trying to connect to host*/
00073     printf("Trying to connect to host..\r\n\r\n");
00074     TCPSocketErr err = pTCPSocket->connect(host);
00075     
00076     Net::poll();   
00077     if(err)
00078     {
00079         printf("Error connecting to host [%d]\r\n", (int) err);
00080         return -1;
00081     } 
00082     printf("Connect to host sucessed..\r\n\r\n");    
00083     
00084     /*Wait TCP connection with server to be established*/
00085     int i = 0;
00086     while(!connected)
00087     {
00088         Net::poll();
00089         wait(1);
00090         i++;
00091         printf("Wait for connections %d..\r\n", i);
00092         if(i == 35)
00093         {//If wait too long, give up.
00094             return -1;
00095         }
00096     }
00097     
00098     /*Send open session message to server*/
00099     open_session(id);
00100     
00101     /*Wait server notice of open sesion*/
00102     while(!sessionOpened)
00103     {
00104         Net::poll();
00105         wait(1);
00106         if(!connected){
00107             break;
00108         }
00109         printf("Wait for session..\r\n");
00110     }
00111     if(!connected)
00112     {
00113         return -2;
00114     }
00115     lastActivity = timer.read_ms();
00116     return 1;
00117 }
00118 
00119 int MQTTClient::publish(char* pub_topic, char* msg) 
00120 {
00121     uint8_t var_header_pub[strlen(pub_topic)+3];
00122     strcpy((char *)&var_header_pub[2], pub_topic);
00123     var_header_pub[0] = 0;
00124     var_header_pub[1] = strlen(pub_topic);
00125     var_header_pub[sizeof(var_header_pub)-1] = 0;
00126     
00127     uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
00128     
00129     uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
00130     memset(packet_pub,0,sizeof(packet_pub));
00131     memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
00132     memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
00133     memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
00134     
00135     if(!send_data((char*)packet_pub, sizeof(packet_pub)))
00136     {
00137         return -1;
00138     }
00139     return 1;
00140 }
00141 
00142 
00143 void MQTTClient::disconnect() 
00144 {
00145     char packet_224[] = {0xe0, 0x00};
00146     send_data((char*)packet_224, 2);
00147     
00148     connected = false;
00149 }
00150 
00151 void MQTTClient::read_data()
00152 {
00153     char buffer[1024];
00154     int len = 0, readLen;
00155     
00156     while((readLen = pTCPSocket->recv(buffer, 1024)) != 0){
00157         len += readLen;
00158     }
00159     
00160     buffer[len] = '\0';
00161     
00162     /*
00163      printf("Read length: %d %d\r\n", len, readLen);
00164      
00165      for(int i = 0; i < len; i++) {
00166      printf("%x\r\n", buffer[i],buffer[i]);
00167      }
00168      printf("\r\n");
00169      */
00170     char type = buffer[0]>>4;
00171     if (type == 3) { // PUBLISH
00172         if (callback_server) {
00173             uint8_t tl = (buffer[2]<<3)+buffer[3];
00174             char topic[tl+1];
00175             for (int i=0;i<tl;i++) {
00176                 topic[i] = buffer[4+i];
00177             }
00178             topic[tl] = 0;
00179             // ignore msgID - only support QoS 0 subs
00180             char *payload = buffer+4+tl;
00181             callback_server(topic,(char*)payload);
00182         }
00183     } else if (type == 12) { // PINGREG -- Ask for alive
00184         char packet_208[] = {0xd0, 0x00};
00185         send_data((char*)packet_208, 2);
00186         lastActivity = timer.read_ms();
00187     }
00188 }
00189 
00190 void MQTTClient::read_open_session()
00191 {
00192     char buffer[32];
00193     int len = 0, readLen;
00194     
00195     while((readLen = pTCPSocket->recv(buffer, 32)) != 0)
00196     {
00197         len += readLen;
00198     }
00199     
00200     if(len == 4 && buffer[3] == 0)
00201     {
00202         printf("Session opened\r\n");
00203         sessionOpened = true;
00204     }
00205 }
00206 
00207 int MQTTClient::subscribe(char* topic)
00208 {
00209     
00210     if (connected) 
00211     {
00212         uint8_t var_header_topic[] = {0,10};    
00213         uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3};
00214         
00215         //utf topic
00216         uint8_t utf_topic[strlen(topic)+3];
00217         strcpy((char *)&utf_topic[2], topic);
00218         
00219         utf_topic[0] = 0;
00220         utf_topic[1] = strlen(topic);
00221         utf_topic[sizeof(utf_topic)-1] = 0;
00222         
00223         char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3];
00224         memset(packet_topic,0,sizeof(packet_topic));
00225         memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic));
00226         memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
00227         memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
00228         
00229         if (!send_data(packet_topic, sizeof(packet_topic)))
00230         {
00231             return -1;
00232         }
00233         return 1;
00234     }
00235     return -1;
00236 }
00237 
00238 void MQTTClient::live() 
00239 {
00240     if (connected) 
00241     {
00242         int t = timer.read_ms();
00243         if (t - lastActivity > KEEPALIVE) {
00244             //Send 192 0 to broker
00245             printf("Send 192\r\n");
00246             char packet_192[] = {0xc0, 0x00};
00247             send_data((char*)packet_192, 2);
00248             lastActivity = t;
00249         }
00250     }
00251 }
00252 
00253 void MQTTClient::onTCPSocketEvent(TCPSocketEvent e)
00254 {
00255     switch(e)
00256     {
00257         case TCPSOCKET_ACCEPT:
00258             printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n");
00259             break;
00260         case TCPSOCKET_CONNECTED: 
00261             printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n");
00262             connected = true;
00263             break;
00264         case TCPSOCKET_WRITEABLE: 
00265             printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");  
00266             break;
00267         case TCPSOCKET_READABLE:
00268             printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");
00269             if(!sessionOpened)
00270             {
00271                 read_open_session();
00272             }
00273             read_data();
00274             break;
00275         case TCPSOCKET_CONTIMEOUT:
00276             printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n");
00277             break;
00278         case TCPSOCKET_CONRST:
00279             printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n");
00280             break;
00281         case TCPSOCKET_CONABRT:
00282             printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n");
00283             break;
00284         case TCPSOCKET_ERROR:
00285             printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n");
00286             break;
00287         case TCPSOCKET_DISCONNECTED:
00288             printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n");
00289             pTCPSocket->close();
00290             connected = false;
00291             break;
00292     }
00293 }