MBED_DEMOS / Mbed 2 deprecated mbed_mqtt_endpoint_ublox_ethernet

Dependencies:   C027 C12832 EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed

Revision:
131:27f29e230bbb
Parent:
130:9c52e163e733
Child:
132:563a1ee99efc
--- a/MQTTTransport.cpp	Sun Mar 16 17:41:10 2014 +0000
+++ b/MQTTTransport.cpp	Sun Mar 16 22:38:55 2014 +0000
@@ -29,9 +29,25 @@
 
  // MQTT callback to handle received messages
  void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
+    char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
+    char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
+    
+    memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
+    memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
+    memcpy(buffer,payload,length);
+    strcpy(rcv_topic,topic);
+    
     if (_mqtt_instance != NULL) {
-        if (_mqtt_instance->isPongMessage(topic,payload,length)) _mqtt_instance->processPongMessage(payload,length);
-        else _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length);
+        if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
+            _mqtt_instance->processPongMessage(buffer,length);
+        }
+        else {
+            memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
+            memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
+            memcpy(buffer,payload,length);
+            strcpy(rcv_topic,topic);
+            _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
+        }
     }
  }
  
@@ -52,7 +68,7 @@
  MQTTTransport::~MQTTTransport() {
      this->disconnect();
  }
- 
+  
  // init our topic
  void MQTTTransport::initTopic() {
      MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
@@ -224,7 +240,8 @@
      }
      else {
          // message not bound for our node
-         this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
+         //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
+         ;
      }     
  }
  
@@ -265,21 +282,23 @@
  bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
      bool isPong = false;
      char verb[MQTT_PING_VERB_LEN+1];
-     char ep_name[LIGHT_NAME_LEN+1];
+     char end[MQTT_PING_VERB_LEN+1];
      int counter = 0;
      
      // clean
      memset(verb,0,MQTT_PING_VERB_LEN+1);
-     memset(ep_name,0,LIGHT_NAME_LEN+1);
-     
-     // make sure we have the right topic
-     if (topic != NULL && strcmp(topic,MQTT_PING_TOPIC) == 0) {
+     memset(end,0,MQTT_PING_VERB_LEN+1);
+         
+     // make sure this is for us...
+     char *topic_ep_name = this->getEndpointNameFromTopic(topic);
+     if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) {
          // parse the payload
          for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
-         sscanf(payload,"%s %s %d",verb,ep_name,&counter);
+         sscanf(payload,"%s%d",verb,&counter);
          
          // check the contents to make sure its for us...
-         if (strcmp(verb,"pong") == 0 && strcmp(ep_name,this->m_endpoint_name) == 0 && counter == this->m_ping_counter) {     
+         this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter);
+         if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) {     
              // its a PONG message to our PING... 
              isPong = true;
          }
@@ -311,14 +330,18 @@
      memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
      
      // build message
-     sprintf(message,"ping:%s:%d",this->m_endpoint_name,this->m_ping_counter);
+     sprintf(message,"ping:%d:",this->m_ping_counter);
      
      // send the message over the ping/pong topic
-    sent = this->m_mqtt->publish(MQTT_PING_TOPIC,message,strlen(message));
+     this->logger()->log("Sending PING on %s Message: %s",this->getTopic(),message);
+     sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
      if (sent) {
          // send succeeded
          this->logger()->log("PING %d sent successfully",this->m_ping_counter);
          this->logger()->blinkTransportTxLED();
+         
+         // wait for 1 second
+         wait_ms(1000);
      }
      else {
          // send failed! - reconnect
@@ -347,25 +370,11 @@
              if (this->m_mqtt->connect(id)) {
                  this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
                  if (this->m_mqtt->subscribe(this->getTopic())) {
-                    if (this->m_mqtt->subscribe(MQTT_IOC_ANNOUNCE_TOPIC)) {
-                        if (this->m_mqtt->subscribe(MQTT_PING_TOPIC)) {
-                            this->logger()->log("MQTT CONNECTED.");
-                            this->m_connected = true;
-                        }
-                        else {
-                            this->logger()->log("MQTT Subscribe: Topic(PING): %s FAILED (continuing...)",MQTT_PING_TOPIC);
-                            //this->logger()->turnLEDRed();
-                            //this->m_connected = false;
-                        }
-                    }
-                    else {
-                        this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",MQTT_IOC_ANNOUNCE_TOPIC);
-                        this->logger()->turnLEDRed();
-                        this->m_connected = false;
-                    }
+                    this->logger()->log("MQTT CONNECTED.");
+                    this->m_connected = true;
                  }
                  else {
-                     this->logger()->log("MQTT Subscribe: Topic(ENDPOINT): %s FAILED",this->getTopic());
+                     this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
                      this->logger()->turnLEDRed();
                      this->m_connected = false;
                  }
@@ -393,10 +402,6 @@
      if (this->m_mqtt != NULL) {
          this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
          this->m_mqtt->unsubscribe(this->getTopic());
-         this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_IOC_ANNOUNCE_TOPIC);
-         this->m_mqtt->unsubscribe(MQTT_IOC_ANNOUNCE_TOPIC);
-         this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_PING_TOPIC);
-         this->m_mqtt->unsubscribe(MQTT_PING_TOPIC);
          this->logger()->log("MQTT Disconnecting...");
          this->m_mqtt->disconnect();
      }
@@ -411,15 +416,21 @@
  void MQTTTransport::checkAndProcess() {
      // process any MQTT messages
      if (this->m_mqtt != NULL && this->m_connected == true) {
-         this->m_mqtt->loop();
-         this->logger()->blinkTransportRxLED();
+         bool connected = this->m_mqtt->loop();
+         if (connected) {
+            this->logger()->blinkTransportRxLED();
+         }
+         else {
+            this->disconnect();
+            this->connect();
+         }
      }
      
      // send a PING if time for it
-     --this->m_ping_countdown;
-     if (this->m_ping_countdown <= 0) {
-         this->logger()->log("MQTT: Sending PING...");
-         this->m_ping_countdown = MQTT_PING_COUNTDOWN;
-         this->sendPingMessage();
-     }
+     //--this->m_ping_countdown;
+     //if (this->m_ping_countdown <= 0) {
+     //    this->logger()->log("MQTT: Sending PING...");
+     //    this->m_ping_countdown = MQTT_PING_COUNTDOWN;
+     //    this->sendPingMessage();
+     //}
  }
\ No newline at end of file