MBED_DEMOS / Mbed 2 deprecated mbed_mqtt_endpoint_nxp

Dependencies:   C12832_lcd EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed

Revision:
129:c4fa24308e33
Parent:
93:e3b732068ae9
Child:
130:9c52e163e733
--- a/MQTTTransport.cpp	Sun Mar 16 06:16:04 2014 +0000
+++ b/MQTTTransport.cpp	Sun Mar 16 17:30:59 2014 +0000
@@ -29,7 +29,10 @@
 
  // MQTT callback to handle received messages
  void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
-    if (_mqtt_instance != NULL) _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length);
+    if (_mqtt_instance != NULL) {
+        if (_mqtt_instance->isPongMessage(topic,payload,length)) _mqtt_instance->processPongMessage(payload,length);
+        else _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length);
+    }
  }
  
  // our MQTT client endpoint
@@ -40,6 +43,8 @@
      this->m_mqtt = NULL;
      _mqtt_instance = this;
      this->m_map = map;
+     this->m_ping_counter = 1;
+     this->m_ping_countdown = MQTT_PING_COUNTDOWN;
      this->initTopic();
  }
  
@@ -256,6 +261,79 @@
      return buffer;
  }
  
+ // is this message a PONG message?
+ bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
+     bool isPong = false;
+     char verb[MQTT_PING_VERB_LEN+1];
+     char ep_name[LIGHT_NAME_LEN+1];
+     int counter = 0;
+     
+     // clean
+     memset(verb,0,MQTT_PING_VERB_LEN+1);
+     memset(ep_name,0,LIGHT_NAME_LEN+1);
+     
+     // make sure we have the right topic
+     if (topic != NULL && strcmp(topic,MQTT_PING_TOPIC) == 0) {
+         // parse the payload
+         for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
+         sscanf(payload,"%s %s %d",verb,ep_name,&counter);
+         
+         // check the contents to make sure its for us...
+         if (strcmp(verb,"pong") == 0 && strcmp(ep_name,this->m_endpoint_name) == 0 && counter == this->m_ping_counter) {     
+             // its a PONG message to our PING... 
+             isPong = true;
+         }
+     }
+     
+     // return isPong status
+     return isPong;
+ }
+ 
+ 
+ // process this PONG message
+ void MQTTTransport::processPongMessage(char *payload,int payload_length) {
+     // DEBUG
+     this->logger()->log("Received PONG: counter=%d",this->m_ping_counter);
+             
+     // simply increment the counter
+     ++this->m_ping_counter;
+     
+     // reset counter if maxed 
+     if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1;
+ }
+ 
+ // send a PING message
+ bool MQTTTransport::sendPingMessage() {
+     bool sent = false;
+     char message[MAX_MQTT_MESSAGE_LENGTH+1];
+     
+     // initialize...
+     memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
+     
+     // build message
+     sprintf(message,"ping:%s:%d",this->m_endpoint_name,this->m_ping_counter);
+     
+     // send the message over the ping/pong topic
+    sent = this->m_mqtt->publish(MQTT_PING_TOPIC,message,strlen(message));
+     if (sent) {
+         // send succeeded
+         this->logger()->log("PING %d sent successfully",this->m_ping_counter);
+         this->logger()->blinkTransportTxLED();
+     }
+     else {
+         // send failed! - reconnect
+         this->logger()->log("PING send %d FAILED... (re)connecting...",this->m_ping_counter);
+         
+         // attempt reconnect
+         this->disconnect();
+         sent = this->connect();
+         if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter);
+     }
+     
+     // return our status
+     return sent;
+ }
+ 
  // connect up MQTT
  bool MQTTTransport::connect() {
      char mqtt_id[MQTT_ENDPOINT_IDLEN+1];
@@ -270,17 +348,24 @@
                  this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
                  if (this->m_mqtt->subscribe(this->getTopic())) {
                     if (this->m_mqtt->subscribe(MQTT_IOC_ANNOUNCE_TOPIC)) {
-                        this->logger()->log("MQTT CONNECTED.");
-                        this->m_connected = true;
+                        if (this->m_mqtt->subscribe(MQTT_PING_TOPIC)) {
+                            this->logger()->log("MQTT CONNECTED.");
+                            this->m_connected = true;
+                        }
+                        else {
+                            this->logger()->log("MQTT Subscribe: Topic(PING): %s FAILED",MQTT_PING_TOPIC);
+                            this->logger()->turnLEDRed();
+                            this->m_connected = false;
+                        }
                     }
                     else {
-                        this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",this->getTopic());
+                        this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",MQTT_IOC_ANNOUNCE_TOPIC);
                         this->logger()->turnLEDRed();
                         this->m_connected = false;
                     }
                  }
                  else {
-                     this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
+                     this->logger()->log("MQTT Subscribe: Topic(ENDPOINT): %s FAILED",this->getTopic());
                      this->logger()->turnLEDRed();
                      this->m_connected = false;
                  }
@@ -308,7 +393,10 @@
      if (this->m_mqtt != NULL) {
          this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
          this->m_mqtt->unsubscribe(this->getTopic());
+         this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_IOC_ANNOUNCE_TOPIC);
          this->m_mqtt->unsubscribe(MQTT_IOC_ANNOUNCE_TOPIC);
+         this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_PING_TOPIC);
+         this->m_mqtt->unsubscribe(MQTT_PING_TOPIC);
          this->logger()->log("MQTT Disconnecting...");
          this->m_mqtt->disconnect();
      }
@@ -321,8 +409,17 @@
  
  // check transport and process stuff
  void MQTTTransport::checkAndProcess() {
+     // process any MQTT messages
      if (this->m_mqtt != NULL && this->m_connected == true) {
          this->m_mqtt->loop();
          this->logger()->blinkTransportRxLED();
      }
+     
+     // send a PING if time for it
+     --this->m_ping_countdown;
+     if (this->m_ping_countdown <= 0) {
+         this->logger()->log("MQTT: Sending PING...");
+         this->m_ping_countdown = MQTT_PING_COUNTDOWN;
+         this->sendPingMessage();
+     }
  }
\ No newline at end of file