Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: C027 C12832 EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed
Revision 131:27f29e230bbb, committed 2014-03-16
- Comitter:
- ansond
- Date:
- Sun Mar 16 22:38:55 2014 +0000
- Parent:
- 130:9c52e163e733
- Child:
- 132:563a1ee99efc
- Commit message:
- updates
Changed in this revision
--- a/Definitions.h Sun Mar 16 17:41:10 2014 +0000 +++ b/Definitions.h Sun Mar 16 22:38:55 2014 +0000 @@ -111,15 +111,13 @@ #define MQTT_MAXID_VALUE 99 // IOC MQTT Endpoint ID maximum integer value #define MQTT_ENDPOINT_IDLEN 64 // IOC MQTT Endpoint ID length (max) #define MQTT_IOC_TOPIC "ARM/sensinode/control/%s" // IOC MQTT Topic -#define MQTT_IOC_ANNOUNCE_TOPIC "ARM/sensinode/control/all" // IOC MQTT Topic (since MQTT on MBED cannot seem to handle wildcards) -#define MQTT_PING_TOPIC "ARM/mqtt/ping" // GW-Node Ping/Pong topic (to keep this endpoint MQTT connection alive) #define MQTT_IOC_ALL_ENDPOINT "all" // must be the same as the last element of MATT_IOC_ANNOUNCE_TOPIC #define MQTT_IOC_TOPIC_LEN 64 // max length for the topic string #define MQTT_PAYLOAD_SEGMENT_LEN 64 // max length for a segment of the payload #define MQTT_USERNAME "" // IOC MQTT Username #define MQTT_PASSWORD "" // IOC MQTT Password #define MQTT_PING_VERB_LEN 10 // Ping or Pong -#define MQTT_PING_COUNTDOWN 1200 // every 1200 250ms iterations (5 minutes) +#define MQTT_PING_COUNTDOWN 120 // every 1200 250ms iterations (5 minutes) #define MQTT_MAX_COUNTER 32768 // largest Ping counter before reset back to 1 #endif // _DEFINITIONS_H \ No newline at end of file
--- a/MQTTTransport.cpp Sun Mar 16 17:41:10 2014 +0000
+++ b/MQTTTransport.cpp Sun Mar 16 22:38:55 2014 +0000
@@ -29,9 +29,25 @@
// MQTT callback to handle received messages
void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
+ char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
+ char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
+
+ memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
+ memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
+ memcpy(buffer,payload,length);
+ strcpy(rcv_topic,topic);
+
if (_mqtt_instance != NULL) {
- if (_mqtt_instance->isPongMessage(topic,payload,length)) _mqtt_instance->processPongMessage(payload,length);
- else _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length);
+ if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
+ _mqtt_instance->processPongMessage(buffer,length);
+ }
+ else {
+ memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
+ memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
+ memcpy(buffer,payload,length);
+ strcpy(rcv_topic,topic);
+ _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
+ }
}
}
@@ -52,7 +68,7 @@
MQTTTransport::~MQTTTransport() {
this->disconnect();
}
-
+
// init our topic
void MQTTTransport::initTopic() {
MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
@@ -224,7 +240,8 @@
}
else {
// message not bound for our node
- this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
+ //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
+ ;
}
}
@@ -265,21 +282,23 @@
bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
bool isPong = false;
char verb[MQTT_PING_VERB_LEN+1];
- char ep_name[LIGHT_NAME_LEN+1];
+ char end[MQTT_PING_VERB_LEN+1];
int counter = 0;
// clean
memset(verb,0,MQTT_PING_VERB_LEN+1);
- memset(ep_name,0,LIGHT_NAME_LEN+1);
-
- // make sure we have the right topic
- if (topic != NULL && strcmp(topic,MQTT_PING_TOPIC) == 0) {
+ memset(end,0,MQTT_PING_VERB_LEN+1);
+
+ // make sure this is for us...
+ char *topic_ep_name = this->getEndpointNameFromTopic(topic);
+ if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) {
// parse the payload
for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
- sscanf(payload,"%s %s %d",verb,ep_name,&counter);
+ sscanf(payload,"%s%d",verb,&counter);
// check the contents to make sure its for us...
- if (strcmp(verb,"pong") == 0 && strcmp(ep_name,this->m_endpoint_name) == 0 && counter == this->m_ping_counter) {
+ this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter);
+ if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) {
// its a PONG message to our PING...
isPong = true;
}
@@ -311,14 +330,18 @@
memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
// build message
- sprintf(message,"ping:%s:%d",this->m_endpoint_name,this->m_ping_counter);
+ sprintf(message,"ping:%d:",this->m_ping_counter);
// send the message over the ping/pong topic
- sent = this->m_mqtt->publish(MQTT_PING_TOPIC,message,strlen(message));
+ this->logger()->log("Sending PING on %s Message: %s",this->getTopic(),message);
+ sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
if (sent) {
// send succeeded
this->logger()->log("PING %d sent successfully",this->m_ping_counter);
this->logger()->blinkTransportTxLED();
+
+ // wait for 1 second
+ wait_ms(1000);
}
else {
// send failed! - reconnect
@@ -347,25 +370,11 @@
if (this->m_mqtt->connect(id)) {
this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
if (this->m_mqtt->subscribe(this->getTopic())) {
- if (this->m_mqtt->subscribe(MQTT_IOC_ANNOUNCE_TOPIC)) {
- if (this->m_mqtt->subscribe(MQTT_PING_TOPIC)) {
- this->logger()->log("MQTT CONNECTED.");
- this->m_connected = true;
- }
- else {
- this->logger()->log("MQTT Subscribe: Topic(PING): %s FAILED (continuing...)",MQTT_PING_TOPIC);
- //this->logger()->turnLEDRed();
- //this->m_connected = false;
- }
- }
- else {
- this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",MQTT_IOC_ANNOUNCE_TOPIC);
- this->logger()->turnLEDRed();
- this->m_connected = false;
- }
+ this->logger()->log("MQTT CONNECTED.");
+ this->m_connected = true;
}
else {
- this->logger()->log("MQTT Subscribe: Topic(ENDPOINT): %s FAILED",this->getTopic());
+ this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
this->logger()->turnLEDRed();
this->m_connected = false;
}
@@ -393,10 +402,6 @@
if (this->m_mqtt != NULL) {
this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
this->m_mqtt->unsubscribe(this->getTopic());
- this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_IOC_ANNOUNCE_TOPIC);
- this->m_mqtt->unsubscribe(MQTT_IOC_ANNOUNCE_TOPIC);
- this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_PING_TOPIC);
- this->m_mqtt->unsubscribe(MQTT_PING_TOPIC);
this->logger()->log("MQTT Disconnecting...");
this->m_mqtt->disconnect();
}
@@ -411,15 +416,21 @@
void MQTTTransport::checkAndProcess() {
// process any MQTT messages
if (this->m_mqtt != NULL && this->m_connected == true) {
- this->m_mqtt->loop();
- this->logger()->blinkTransportRxLED();
+ bool connected = this->m_mqtt->loop();
+ if (connected) {
+ this->logger()->blinkTransportRxLED();
+ }
+ else {
+ this->disconnect();
+ this->connect();
+ }
}
// send a PING if time for it
- --this->m_ping_countdown;
- if (this->m_ping_countdown <= 0) {
- this->logger()->log("MQTT: Sending PING...");
- this->m_ping_countdown = MQTT_PING_COUNTDOWN;
- this->sendPingMessage();
- }
+ //--this->m_ping_countdown;
+ //if (this->m_ping_countdown <= 0) {
+ // this->logger()->log("MQTT: Sending PING...");
+ // this->m_ping_countdown = MQTT_PING_COUNTDOWN;
+ // this->sendPingMessage();
+ //}
}
\ No newline at end of file
--- a/MQTTTransport.h Sun Mar 16 17:41:10 2014 +0000
+++ b/MQTTTransport.h Sun Mar 16 22:38:55 2014 +0000
@@ -47,7 +47,7 @@
virtual bool disconnect();
virtual void checkAndProcess();
-
+
char *getEndpointNameFromTopic(char *topic);
bool isPongMessage(char *topic,char *payload,int payload_length);