able to subscribe for >10hrs and still running
Dependencies: ADE7758_v1 Crypto DHT11 MQTT MbedJSONValue SDFileSystem SPI_TFT_ILI9341 SWSPI SetRTC TFT_fonts Touch W5500Interface mbed-rtos mbed-src tuanpm
Fork of PB_emma_controller_mbed_src by
Diff: emmaCode.cpp
- Revision:
- 28:cd25d46cb141
- Parent:
- 27:259aaa249619
--- a/emmaCode.cpp Wed Aug 05 10:07:18 2015 +0000 +++ b/emmaCode.cpp Fri Aug 07 02:56:53 2015 +0000 @@ -330,7 +330,7 @@ parse(jsonValue,str.c_str()); - char *parameter[2] = {"wifiSSID","wifiPASS"}; + const char *parameter[2] = {"wifiSSID","wifiPASS"}; for(int i=0; i<2; i++) { if(jsonValue.hasMember(parameter[i])) { @@ -447,7 +447,7 @@ parse(jsonValue,str.c_str()); - char *parameter[5] = {"gprsAPN","proxySERVER","proxyPORT","proxyAUTH","epochTime"}; + const char *parameter[5] = {"gprsAPN","proxySERVER","proxyPORT","proxyAUTH","epochTime"}; for(int i=0; i<4; i++) { if(jsonValue.hasMember(parameter[i])) { @@ -507,7 +507,7 @@ parse(jsonValue,str.c_str()); - char *parameter[5] = {"gprsAPN","proxySERVER","proxyPORT","proxyAUTH","epochTime"}; + const char *parameter[5] = {"gprsAPN","proxySERVER","proxyPORT","proxyAUTH","epochTime"}; for(int i=0; i<4; i++) { if(jsonValue.hasMember(parameter[i])) { @@ -633,7 +633,7 @@ parse(jsonValue,str.c_str()); - char *parameter[4] = {"platformDOMAIN","platformKEY","platformSECRET","registrationKey"}; + const char *parameter[4] = {"platformDOMAIN","platformKEY","platformSECRET","registrationKey"}; //save platform parameter writeSetting(parameter[0],"()"); //sd card need to be initialized @@ -771,7 +771,7 @@ parse(jsonValue,str.c_str()); - char *parameter[4] = {"platformDOMAIN","platformKEY","platformSECRET","registrationKey"}; + const char *parameter[4] = {"platformDOMAIN","platformKEY","platformSECRET","registrationKey"}; //save platform parameter writeSetting(parameter[0],"()"); //sd card need to be initialized @@ -882,7 +882,6 @@ } void emmaModeOperation(void) { - char *parameter[5] = {"\"id\"","\"nType\"","\"nAddr\"","\"dType\"","\"cmd\""}; char mqttClientId[32]; char p[64]; char q[32]; @@ -943,7 +942,7 @@ //sprintf(s,"emma-%s",emmaUID.c_str()); //hmac = calculateMD5(s); //DBG.printf("hmac:%s\r\n",hmac.c_str()); - + int count = 0; if(ethConnected) { DBG.printf("emmaModeOperation - eth\r\n"); DBG.printf("IP Address:%s\r\n",ipstack.getEth().getIPAddress()); @@ -952,85 +951,19 @@ // t.start(); // DBG.printf("start\r\n"); while(true) { -// if(!ipstack.getEth().linkstatus()) { -// NVIC_SystemReset(); -// } - - //check for new command - if(newCommand) { - string cmd(globalCommand); - string cmdBuffer; - int keyIndex, key2Index; - int valueLength; - - DBG.printf("Parsing CMD...\r\n"); - - for (int i=0; i<5; i++) { - keyIndex = cmd.find(parameter[i]); - key2Index = cmd.find(parameter[i+1]); - valueLength = key2Index - keyIndex - strlen(parameter[i]) - 2; - DBG.printf("%s index: %d\r\n", parameter[i], keyIndex); - cmdBuffer.assign(cmd, keyIndex+strlen(parameter[i]), valueLength); - DBG.printf("the value: %s\r\n", cmdBuffer); - } - - DBG.printf("Checking command validity...\r\n"); - //check if command is valid - bool validCommand = true; - DBG.printf("Command validity:%d\r\n",validCommand); - -// if(validCommand) { -// string commandId = jsonValue[parameter[0]].get<std::string>(); -// string commandNType = jsonValue[parameter[1]].get<std::string>(); -// string commandNAddr = jsonValue[parameter[2]].get<std::string>(); -// string commandDType = jsonValue[parameter[3]].get<std::string>(); -// string commandCmd = jsonValue[parameter[4]].get<std::string>(); -// -// if(commandNType == "0") { //switch on panel controller -// DBG.printf("command for switch\r\n"); -// } -// else if(commandNType == "1") { //node with mac address -// DBG.printf("command for node\r\n"); -// //get node ip address based on node mac address -// string nodeIP = readNodeIP(commandNAddr); -// //DBG.printf("nodeIP: %s\r\n",nodeIP.c_str()); -// -// //get cmd string based on device type and command number -// string nodeCmd = readNodeCmd(commandDType,commandCmd); -// //DBG.printf("nodeCmd: %s\r\n",nodeCmd.c_str()); -// //execute command -// //int trial=0; -// trial = 0; -// bool execStatus=false; -// -// while( !execStatus ) { -// sprintf(s,"<?xml version=\"1.0\" encoding=\"utf-8\"?><app_cmd cmd=\"5\" /><app_data code=\"%s\"/>\r\n",nodeCmd.c_str()); -// str = s; -// DBG.printf("str value:%s\r\n",str.c_str()); -// string rcv = ethGET(nodeIP,REMOTE_TCP_PORT,str); // cause mqtt to stop -// DBG.printf("response:%s\r\n",rcv.c_str()); -// str = rcv; -// if(str.find("OK") != std::string::npos) { -// DBG.printf("CMD executed successfully\r\n"); -// execStatus = true; -// } -// if(trial>0) { //two times trial -// DBG.printf("cmd is not executed\r\n"); -// execStatus = false; -// } -// DBG.printf("Trial++\r\n"); -// trial++; -// wait(3); -// } -// DBG.printf("Send execution status\r\n"); -// //send execution status -// } -// } - newCommand = false; - DBG.printf("New command false\r\n"); + if(!ipstack.getEth().linkstatus()) { + NVIC_SystemReset(); } + + if (++count == 200) + { // Publish a message every second + if (publish(&client, &ipstack) != 0) + ethMQTTAttemptConnect(&client, &ipstack); // if we have lost the connection + count = 0; + } + // DBG.printf("Yielding...\r\n"); - client.yield(10); //allow MQTT client to receive message + client.yield(100); //allow MQTT client to receive message // DBG.printf("Yield finished\r\n"); } } else if(wifiConnected) { @@ -1122,7 +1055,7 @@ str.erase(str.begin()+str.rfind("}]")+2,str.end()); parse(jsonValue,str.c_str()); - char *parameter[2] = {"mac","ip"}; + const char *parameter[2] = {"mac","ip"}; TFT.locate(0,40); TFT.printf(" "); @@ -1455,7 +1388,7 @@ TFT.printf("newCommand"); parse(jsonValue,globalCommand.c_str()); - char *parameter[5] = {"name","nType","nAddr","dType","cmd"}; + const char *parameter[5] = {"name","nType","nAddr","dType","cmd"}; //check whether command is valid bool validCommand = true; @@ -1674,7 +1607,7 @@ parse(jsonValue,str.c_str()); - char *parameter[2] = {"firmwareVer","numPart"}; + const char *parameter[2] = {"firmwareVer","numPart"}; for(int i=0; i<2; i++) { if(jsonValue.hasMember(parameter[i])) { @@ -1972,27 +1905,113 @@ } /*end wifi rest*/ +int publish(MQTT::Client<MQTTEthernet, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTEthernet* ipstack) +{ + MQTT::Message message; + char* topic = new char[strlen(platformDOMAIN.c_str())+strlen(emmaUID.c_str())+9]; + *topic = '\0'; + strcat(topic, platformDOMAIN.c_str()); + strcat(topic, "/"); + strcat(topic, emmaUID.c_str()); + strcat(topic, "/dummy"); + + const char buf[25] = "{\"d\":{\"myName\":\"EMMA\"}}"; + + message.qos = MQTT::QOS2; + message.retained = false; + message.dup = false; + message.payload = (void*)buf; + message.payloadlen = strlen(buf); + + DBG.printf("Publishing %s\r\n", buf); + return client->publish(topic, &message); +} + /*start eth mqtt*/ void ethMQTTMessageArrived(MQTT::MessageData& md) { + MQTT::Message &message = md.message; - DBG.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); - DBG.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload); + char topic[md.topicName.lenstring.len + 1]; + string str; + sprintf(topic, "%.*s", md.topicName.lenstring.len, md.topicName.lenstring.data); - string sp = string((char*)(message.payload)); - + DBG.printf("Message arrived on topic %s: %.*s\r\n", topic, message.payloadlen, message.payload); + char *s = (char*)message.payload; + string sp(s); if(sp.find("[") != std::string::npos && sp.find("]") != std::string::npos) { sp.erase(sp.begin(),sp.begin()+sp.find("[")+1); - sp.erase(sp.begin()+sp.find("]"), sp.end()); - globalCommand.assign(sp); - newCommand = true; + sp.erase(sp.begin()+sp.find("]"),sp.end()); + } + else { + DBG.printf("Invalid MQTT command"); + return; + } + + MbedJSONValue jsonValue; + parse(jsonValue, sp.c_str()); + const char *parameter[5] = {"name","nType","nAddr","dType","cmd"}; + + //check if command is valid + bool validCommand = true; + for(int i=0; i<5; i++) { + validCommand = validCommand && jsonValue.hasMember(parameter[i]); } - sp.clear(); + DBG.printf("command validity:%d\r\n",validCommand); + + //check for new command + if(validCommand) { + string commandId = jsonValue[parameter[0]].get<std::string>(); + string commandNType = jsonValue[parameter[1]].get<std::string>(); + string commandNAddr = jsonValue[parameter[2]].get<std::string>(); + string commandDType = jsonValue[parameter[3]].get<std::string>(); + string commandCmd = jsonValue[parameter[4]].get<std::string>(); + + if(commandNType == "0") { //switch on panel controller + DBG.printf("command for switch\r\n"); + } + else if(commandNType == "1") { //node with mac address + DBG.printf("command for node\r\n"); + //get node ip address based on node mac address + string nodeIP = readNodeIP(commandNAddr); + //DBG.printf("nodeIP: %s\r\n",nodeIP.c_str()); + + //get cmd string based on device type and command number + string nodeCmd = readNodeCmd(commandDType,commandCmd); + //DBG.printf("nodeCmd: %s\r\n",nodeCmd.c_str()); + //execute command + int trial=0; + trial = 0; + bool execStatus=false; + + while( !execStatus ) { + sprintf(s,"<?xml version=\"1.0\" encoding=\"utf-8\"?><app_cmd cmd=\"5\" /><app_data code=\"%s\"/>\r\n",nodeCmd.c_str()); + str.assign(s); + DBG.printf("str value:%s\r\n",str.c_str()); + string rcv = ethGET(nodeIP,REMOTE_TCP_PORT,str); // cause mqtt to stop + DBG.printf("response:%s\r\n",rcv.c_str()); + str = rcv; + if(str.find("OK") != std::string::npos) { + DBG.printf("CMD executed successfully\r\n"); + execStatus = true; + break; + } + if(trial>5) { //two times trial + DBG.printf("cmd is not executed\r\n"); + execStatus = false; + } + DBG.printf("Trial++\r\n"); + trial++; + wait(3); + } + DBG.printf("Send execution status\r\n"); + //send execution status + } + } + DBG.printf("Finish processing new command\r\n"); } int ethMQTTConnect(MQTT::Client<MQTTEthernet, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTEthernet* ipstack) { - //char hostname[] = "q.thingfabric.com"; - //char hostname[] = "192.168.131.200"; - //int rc = ipstack->connect(hostname, MQTT_PORT); + int rc = ipstack->connect(MQTT_HOST, MQTT_PORT); if(rc!=0) @@ -2000,44 +2019,57 @@ //MQTT Connect //char clientId[] = "emma/0674ff575349896767072538"; - char clientId [32]; - sprintf(clientId,"emma/%s",emmaUID.c_str()); - //DBG.printf("clientId:%s\r\n",clientId); + char clientId [strlen(emmaUID.c_str())+6]; + sprintf(clientId,"emma/%s", emmaUID.c_str()); + DBG.printf("clientId:%s\r\n",clientId); + MQTTPacket_connectData data = MQTTPacket_connectData_initializer; data.MQTTVersion = 3; data.clientID.cstring = clientId; data.username.cstring = "761b233e-a49a-4830-a8ae-87cec3dc1086"; data.password.cstring = "ef25cf4567fbc07113252f8d72b7faf2"; - if((rc = client->connect(&data)) == 0) { - DBG.printf("connected\r\n"); + if ((rc = client->connect(&data)) == 0) + { +// connected = true; + DBG.printf("MQTT connected\r\n"); } //MQTT Subscribe - //char* topic = "gaisbwqreqrfxjc/0674ff575349896767072538/command"; - char s[64]; - sprintf(s,"%s/%s/command",platformDOMAIN.c_str(),emmaUID.c_str()); - string topic = s; - DBG.printf("topic:%s\r\n",topic.c_str()); - if((rc=client->subscribe(topic.c_str(),MQTT::QOS2,ethMQTTMessageArrived)) == 0) { - DBG.printf("subscribe success!\r\n"); +// char* topic = "emma/005300553533510334313732/command"; + char* topic = new char[strlen(platformDOMAIN.c_str())+strlen(emmaUID.c_str())+9]; + *topic = '\0'; + strcat(topic, platformDOMAIN.c_str()); + strcat(topic, "/"); + strcat(topic, emmaUID.c_str()); + strcat(topic, "/command"); + DBG.printf("MQTT subscription topic: %s\r\n", topic); + + if ((rc = client->subscribe(topic, MQTT::QOS2, ethMQTTMessageArrived)) == 0) { + DBG.printf("Subscribe success\r\n"); } + return rc; } +int getConnTimeout(int attemptNumber) +{ // First 10 attempts try within 3 seconds, next 10 attempts retry after every 1 minute + // after 20 attempts, retry every 10 minutes + return (attemptNumber < 10) ? 3 : (attemptNumber < 20) ? 60 : 600; +} + void ethMQTTAttemptConnect(MQTT::Client<MQTTEthernet, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTEthernet* ipstack) { int retryAttempt = 0; while(!ipstack->getEth().linkstatus()) { + wait(1.0f); DBG.printf("Ethernet link not present. Check cable connection\r\n"); - wait(1); } while(ethMQTTConnect(client,ipstack) != 0) { - int timeout = 3; + int timeout = getConnTimeout(++retryAttempt); DBG.printf("Retry attempt number %d waiting %d\r\n", retryAttempt, timeout); wait(timeout); - retryAttempt++; } } @@ -2094,6 +2126,7 @@ fclose(fp); return true; } + fclose(fp); return false; } /*end emma settings*/ @@ -2213,9 +2246,9 @@ /*start emma connection function*/ string ethGET(string host, int port, string url) { -// DBG.printf("Entering ethGET with url: %s\r\n", url); -// DBG.printf("Host: %s\r\n", host); -// DBG.printf("Port: %d\r\n", port); + DBG.printf("Entering ethGET with url: %s\r\n", url); + DBG.printf("Host: %s\r\n", host); + DBG.printf("Port: %d\r\n", port); char buf[1024]; char s[256]; int ret;