Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: C12832_lcd EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed
Diff: MQTTTransport.cpp
- Revision:
- 129:c4fa24308e33
- Parent:
- 93:e3b732068ae9
- Child:
- 130:9c52e163e733
diff -r 5b7f7bd73117 -r c4fa24308e33 MQTTTransport.cpp
--- a/MQTTTransport.cpp Sun Mar 16 06:16:04 2014 +0000
+++ b/MQTTTransport.cpp Sun Mar 16 17:30:59 2014 +0000
@@ -29,7 +29,10 @@
// MQTT callback to handle received messages
void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
- if (_mqtt_instance != NULL) _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length);
+ if (_mqtt_instance != NULL) {
+ if (_mqtt_instance->isPongMessage(topic,payload,length)) _mqtt_instance->processPongMessage(payload,length);
+ else _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length);
+ }
}
// our MQTT client endpoint
@@ -40,6 +43,8 @@
this->m_mqtt = NULL;
_mqtt_instance = this;
this->m_map = map;
+ this->m_ping_counter = 1;
+ this->m_ping_countdown = MQTT_PING_COUNTDOWN;
this->initTopic();
}
@@ -256,6 +261,79 @@
return buffer;
}
+ // is this message a PONG message?
+ bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
+ bool isPong = false;
+ char verb[MQTT_PING_VERB_LEN+1];
+ char ep_name[LIGHT_NAME_LEN+1];
+ int counter = 0;
+
+ // clean
+ memset(verb,0,MQTT_PING_VERB_LEN+1);
+ memset(ep_name,0,LIGHT_NAME_LEN+1);
+
+ // make sure we have the right topic
+ if (topic != NULL && strcmp(topic,MQTT_PING_TOPIC) == 0) {
+ // parse the payload
+ for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
+ sscanf(payload,"%s %s %d",verb,ep_name,&counter);
+
+ // check the contents to make sure its for us...
+ if (strcmp(verb,"pong") == 0 && strcmp(ep_name,this->m_endpoint_name) == 0 && counter == this->m_ping_counter) {
+ // its a PONG message to our PING...
+ isPong = true;
+ }
+ }
+
+ // return isPong status
+ return isPong;
+ }
+
+
+ // process this PONG message
+ void MQTTTransport::processPongMessage(char *payload,int payload_length) {
+ // DEBUG
+ this->logger()->log("Received PONG: counter=%d",this->m_ping_counter);
+
+ // simply increment the counter
+ ++this->m_ping_counter;
+
+ // reset counter if maxed
+ if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1;
+ }
+
+ // send a PING message
+ bool MQTTTransport::sendPingMessage() {
+ bool sent = false;
+ char message[MAX_MQTT_MESSAGE_LENGTH+1];
+
+ // initialize...
+ memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
+
+ // build message
+ sprintf(message,"ping:%s:%d",this->m_endpoint_name,this->m_ping_counter);
+
+ // send the message over the ping/pong topic
+ sent = this->m_mqtt->publish(MQTT_PING_TOPIC,message,strlen(message));
+ if (sent) {
+ // send succeeded
+ this->logger()->log("PING %d sent successfully",this->m_ping_counter);
+ this->logger()->blinkTransportTxLED();
+ }
+ else {
+ // send failed! - reconnect
+ this->logger()->log("PING send %d FAILED... (re)connecting...",this->m_ping_counter);
+
+ // attempt reconnect
+ this->disconnect();
+ sent = this->connect();
+ if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter);
+ }
+
+ // return our status
+ return sent;
+ }
+
// connect up MQTT
bool MQTTTransport::connect() {
char mqtt_id[MQTT_ENDPOINT_IDLEN+1];
@@ -270,17 +348,24 @@
this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
if (this->m_mqtt->subscribe(this->getTopic())) {
if (this->m_mqtt->subscribe(MQTT_IOC_ANNOUNCE_TOPIC)) {
- this->logger()->log("MQTT CONNECTED.");
- this->m_connected = true;
+ if (this->m_mqtt->subscribe(MQTT_PING_TOPIC)) {
+ this->logger()->log("MQTT CONNECTED.");
+ this->m_connected = true;
+ }
+ else {
+ this->logger()->log("MQTT Subscribe: Topic(PING): %s FAILED",MQTT_PING_TOPIC);
+ this->logger()->turnLEDRed();
+ this->m_connected = false;
+ }
}
else {
- this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",this->getTopic());
+ this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",MQTT_IOC_ANNOUNCE_TOPIC);
this->logger()->turnLEDRed();
this->m_connected = false;
}
}
else {
- this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
+ this->logger()->log("MQTT Subscribe: Topic(ENDPOINT): %s FAILED",this->getTopic());
this->logger()->turnLEDRed();
this->m_connected = false;
}
@@ -308,7 +393,10 @@
if (this->m_mqtt != NULL) {
this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
this->m_mqtt->unsubscribe(this->getTopic());
+ this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_IOC_ANNOUNCE_TOPIC);
this->m_mqtt->unsubscribe(MQTT_IOC_ANNOUNCE_TOPIC);
+ this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_PING_TOPIC);
+ this->m_mqtt->unsubscribe(MQTT_PING_TOPIC);
this->logger()->log("MQTT Disconnecting...");
this->m_mqtt->disconnect();
}
@@ -321,8 +409,17 @@
// check transport and process stuff
void MQTTTransport::checkAndProcess() {
+ // process any MQTT messages
if (this->m_mqtt != NULL && this->m_connected == true) {
this->m_mqtt->loop();
this->logger()->blinkTransportRxLED();
}
+
+ // send a PING if time for it
+ --this->m_ping_countdown;
+ if (this->m_ping_countdown <= 0) {
+ this->logger()->log("MQTT: Sending PING...");
+ this->m_ping_countdown = MQTT_PING_COUNTDOWN;
+ this->sendPingMessage();
+ }
}
\ No newline at end of file