MBED_DEMOS / Mbed 2 deprecated mbed_mqtt_endpoint_ublox_ethernet

Dependencies:   C027 C12832 EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed

Files at this revision

API Documentation at this revision

Comitter:
ansond
Date:
Sun Mar 16 22:38:55 2014 +0000
Parent:
130:9c52e163e733
Child:
132:563a1ee99efc
Commit message:
updates

Changed in this revision

Definitions.h Show annotated file Show diff for this revision Revisions of this file
MQTTTransport.cpp Show annotated file Show diff for this revision Revisions of this file
MQTTTransport.h Show annotated file Show diff for this revision Revisions of this file
--- a/Definitions.h	Sun Mar 16 17:41:10 2014 +0000
+++ b/Definitions.h	Sun Mar 16 22:38:55 2014 +0000
@@ -111,15 +111,13 @@
 #define MQTT_MAXID_VALUE            99                              // IOC MQTT Endpoint ID maximum integer value
 #define MQTT_ENDPOINT_IDLEN         64                              // IOC MQTT Endpoint ID length (max)
 #define MQTT_IOC_TOPIC              "ARM/sensinode/control/%s"      // IOC MQTT Topic 
-#define MQTT_IOC_ANNOUNCE_TOPIC     "ARM/sensinode/control/all"     // IOC MQTT Topic (since MQTT on MBED cannot seem to handle wildcards)
-#define MQTT_PING_TOPIC             "ARM/mqtt/ping"                 // GW-Node Ping/Pong topic (to keep this endpoint MQTT connection alive)
 #define MQTT_IOC_ALL_ENDPOINT       "all"                           // must be the same as the last element of MATT_IOC_ANNOUNCE_TOPIC
 #define MQTT_IOC_TOPIC_LEN          64                              // max length for the topic string
 #define MQTT_PAYLOAD_SEGMENT_LEN    64                              // max length for a segment of the payload
 #define MQTT_USERNAME               ""                              // IOC MQTT Username
 #define MQTT_PASSWORD               ""                              // IOC MQTT Password
 #define MQTT_PING_VERB_LEN          10                              // Ping or Pong
-#define MQTT_PING_COUNTDOWN         1200                            // every 1200 250ms iterations (5 minutes)
+#define MQTT_PING_COUNTDOWN         120                            // every 1200 250ms iterations (5 minutes)
 #define MQTT_MAX_COUNTER            32768                           // largest Ping counter before reset back to 1
 
 #endif // _DEFINITIONS_H
\ No newline at end of file
--- a/MQTTTransport.cpp	Sun Mar 16 17:41:10 2014 +0000
+++ b/MQTTTransport.cpp	Sun Mar 16 22:38:55 2014 +0000
@@ -29,9 +29,25 @@
 
  // MQTT callback to handle received messages
  void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
+    char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
+    char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
+    
+    memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
+    memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
+    memcpy(buffer,payload,length);
+    strcpy(rcv_topic,topic);
+    
     if (_mqtt_instance != NULL) {
-        if (_mqtt_instance->isPongMessage(topic,payload,length)) _mqtt_instance->processPongMessage(payload,length);
-        else _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length);
+        if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
+            _mqtt_instance->processPongMessage(buffer,length);
+        }
+        else {
+            memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
+            memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
+            memcpy(buffer,payload,length);
+            strcpy(rcv_topic,topic);
+            _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
+        }
     }
  }
  
@@ -52,7 +68,7 @@
  MQTTTransport::~MQTTTransport() {
      this->disconnect();
  }
- 
+  
  // init our topic
  void MQTTTransport::initTopic() {
      MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
@@ -224,7 +240,8 @@
      }
      else {
          // message not bound for our node
-         this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
+         //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
+         ;
      }     
  }
  
@@ -265,21 +282,23 @@
  bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
      bool isPong = false;
      char verb[MQTT_PING_VERB_LEN+1];
-     char ep_name[LIGHT_NAME_LEN+1];
+     char end[MQTT_PING_VERB_LEN+1];
      int counter = 0;
      
      // clean
      memset(verb,0,MQTT_PING_VERB_LEN+1);
-     memset(ep_name,0,LIGHT_NAME_LEN+1);
-     
-     // make sure we have the right topic
-     if (topic != NULL && strcmp(topic,MQTT_PING_TOPIC) == 0) {
+     memset(end,0,MQTT_PING_VERB_LEN+1);
+         
+     // make sure this is for us...
+     char *topic_ep_name = this->getEndpointNameFromTopic(topic);
+     if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) {
          // parse the payload
          for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
-         sscanf(payload,"%s %s %d",verb,ep_name,&counter);
+         sscanf(payload,"%s%d",verb,&counter);
          
          // check the contents to make sure its for us...
-         if (strcmp(verb,"pong") == 0 && strcmp(ep_name,this->m_endpoint_name) == 0 && counter == this->m_ping_counter) {     
+         this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter);
+         if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) {     
              // its a PONG message to our PING... 
              isPong = true;
          }
@@ -311,14 +330,18 @@
      memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
      
      // build message
-     sprintf(message,"ping:%s:%d",this->m_endpoint_name,this->m_ping_counter);
+     sprintf(message,"ping:%d:",this->m_ping_counter);
      
      // send the message over the ping/pong topic
-    sent = this->m_mqtt->publish(MQTT_PING_TOPIC,message,strlen(message));
+     this->logger()->log("Sending PING on %s Message: %s",this->getTopic(),message);
+     sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
      if (sent) {
          // send succeeded
          this->logger()->log("PING %d sent successfully",this->m_ping_counter);
          this->logger()->blinkTransportTxLED();
+         
+         // wait for 1 second
+         wait_ms(1000);
      }
      else {
          // send failed! - reconnect
@@ -347,25 +370,11 @@
              if (this->m_mqtt->connect(id)) {
                  this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
                  if (this->m_mqtt->subscribe(this->getTopic())) {
-                    if (this->m_mqtt->subscribe(MQTT_IOC_ANNOUNCE_TOPIC)) {
-                        if (this->m_mqtt->subscribe(MQTT_PING_TOPIC)) {
-                            this->logger()->log("MQTT CONNECTED.");
-                            this->m_connected = true;
-                        }
-                        else {
-                            this->logger()->log("MQTT Subscribe: Topic(PING): %s FAILED (continuing...)",MQTT_PING_TOPIC);
-                            //this->logger()->turnLEDRed();
-                            //this->m_connected = false;
-                        }
-                    }
-                    else {
-                        this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",MQTT_IOC_ANNOUNCE_TOPIC);
-                        this->logger()->turnLEDRed();
-                        this->m_connected = false;
-                    }
+                    this->logger()->log("MQTT CONNECTED.");
+                    this->m_connected = true;
                  }
                  else {
-                     this->logger()->log("MQTT Subscribe: Topic(ENDPOINT): %s FAILED",this->getTopic());
+                     this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
                      this->logger()->turnLEDRed();
                      this->m_connected = false;
                  }
@@ -393,10 +402,6 @@
      if (this->m_mqtt != NULL) {
          this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
          this->m_mqtt->unsubscribe(this->getTopic());
-         this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_IOC_ANNOUNCE_TOPIC);
-         this->m_mqtt->unsubscribe(MQTT_IOC_ANNOUNCE_TOPIC);
-         this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_PING_TOPIC);
-         this->m_mqtt->unsubscribe(MQTT_PING_TOPIC);
          this->logger()->log("MQTT Disconnecting...");
          this->m_mqtt->disconnect();
      }
@@ -411,15 +416,21 @@
  void MQTTTransport::checkAndProcess() {
      // process any MQTT messages
      if (this->m_mqtt != NULL && this->m_connected == true) {
-         this->m_mqtt->loop();
-         this->logger()->blinkTransportRxLED();
+         bool connected = this->m_mqtt->loop();
+         if (connected) {
+            this->logger()->blinkTransportRxLED();
+         }
+         else {
+            this->disconnect();
+            this->connect();
+         }
      }
      
      // send a PING if time for it
-     --this->m_ping_countdown;
-     if (this->m_ping_countdown <= 0) {
-         this->logger()->log("MQTT: Sending PING...");
-         this->m_ping_countdown = MQTT_PING_COUNTDOWN;
-         this->sendPingMessage();
-     }
+     //--this->m_ping_countdown;
+     //if (this->m_ping_countdown <= 0) {
+     //    this->logger()->log("MQTT: Sending PING...");
+     //    this->m_ping_countdown = MQTT_PING_COUNTDOWN;
+     //    this->sendPingMessage();
+     //}
  }
\ No newline at end of file
--- a/MQTTTransport.h	Sun Mar 16 17:41:10 2014 +0000
+++ b/MQTTTransport.h	Sun Mar 16 22:38:55 2014 +0000
@@ -47,7 +47,7 @@
         virtual bool disconnect();
         
         virtual void checkAndProcess();
-
+ 
         char *getEndpointNameFromTopic(char *topic);
         
         bool isPongMessage(char *topic,char *payload,int payload_length);