Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: C027 C12832 EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed
Diff: MQTTTransport.cpp
- Revision:
- 8:45f9a920e82c
- Parent:
- 7:f570eb3f38cd
- Child:
- 9:ff877db53cfd
--- a/MQTTTransport.cpp Wed Feb 26 18:59:08 2014 +0000
+++ b/MQTTTransport.cpp Wed Feb 26 21:29:27 2014 +0000
@@ -21,6 +21,9 @@
// Endpoint Support
#include "MBEDEndpoint.h"
+ // EmulatedResourceFactory support
+ #include "EmulatedResourceFactory.h"
+
// splitstring support
#include "splitstring.h"
@@ -40,6 +43,7 @@
this->m_mqtt = NULL;
instance = this;
this->m_map = new MBEDToIOCResourceMap(error_handler);
+ this->initTopic();
}
// default destructor
@@ -48,18 +52,27 @@
if (this->m_mqtt != NULL) delete this->m_mqtt;
}
+ // init our topic
+ void MQTTTransport::initTopic() {
+ MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
+ char *endpoint_name = endpoint->getEndpointName();
+ memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1);
+ sprintf(this->m_topic,MQTT_IOC_TOPIC,endpoint_name);
+ }
+
+ // get our topic
+ char *MQTTTransport::getTopic() { return this->m_topic; }
+
// pull the endpoint name from the MQTT topic
char *MQTTTransport::getEndpointNameFromTopic(char *topic) {
memset(this->m_endpoint_name,0,LIGHT_NAME_LEN+1);
-
- // XXX until we get the splitter working, lets just default this name
- char *tmp = ((MBEDEndpoint *)this->getEndpoint())->getEndpointName();
- strncpy(this->m_endpoint_name,tmp,strlen(tmp));
-
- // DEBUG
- if (tmp == NULL) this->logger()->log("ERROR! endpointName is NULL");
- else this->logger()->log("TMP: %s length=%d",tmp,strlen(tmp));
-
+ splitstring *tmp = new splitstring(topic);
+ vector<string> data = tmp->split('/');
+ if (data.size() > 0) {
+ char *ep = (char *)data[data.size()-1].c_str();
+ strncpy(this->m_endpoint_name,ep,strlen(ep));
+ }
+ if (tmp != NULL) delete tmp;
return this->m_endpoint_name;
}
@@ -70,39 +83,26 @@
char *message_value = "";
char *message_opt = "";
- bool enable = true;
-
// get our endpoint
MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
char *endpoint_name = endpoint->getEndpointName();
// DEBUG
- this->logger()->pause("MQTT Message: %s length=%d endpoint_name=%s",payload,len,message_name);
+ //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name);
// only respond if its for our node
- if (enable && strcmp(endpoint_name,message_name) == 0) {
- // DEBUG
- this->logger()->log("Message: %s bound for %s... processing...",payload,endpoint_name);
-
+ if (strcmp(endpoint_name,message_name) == 0) {
// format of the MQTT message: message_type:verb|Parameter_X:value|keyword:optional_data
splitstring *tmp = new splitstring(payload);
- this->logger()->pause("DOUG1");
vector<string> data = tmp->split(':');
- this->logger()->pause("DOUG2");
if (data.size() > 0) message_type = (char *)data[0].c_str();
- this->logger()->pause("DOUG3");
if (data.size() > 1) message_verb = (char *)data[1].c_str();
- this->logger()->pause("DOUG4");
if (data.size() > 2) message_value = (char *)data[2].c_str();
- this->logger()->pause("DOUG5");
if (data.size() > 3) message_opt = (char *)data[3].c_str();
- this->logger()->pause("DOUG6");
// DEBUG
- this->logger()->log("Type: %s Name: %s Verb: %s Value: %s",message_type,message_name,message_verb,message_value);
-
- return;
-
+ //this->logger()->log("Type: %s Name: %s Verb: %s Value: %s",message_type,message_name,message_verb,message_value);
+
// load endpoints
if (message_type != NULL && strcmp(message_type,"Endpoint") == 0) {
if (message_name != NULL && strcmp(message_name,"all") == 0) {
@@ -132,7 +132,7 @@
// change a resource value
if (message_type != NULL && strcmp(message_type,"Change") == 0) {
- if (message_name != NULL) {
+ if (message_name != NULL) {
// destined for our lights?
int index = endpoint->indexOfLight((char *)message_name);
if (index >= 0) {
@@ -141,8 +141,11 @@
char *mapped_resource = this->mapEndpointResourceToIOCResource((char *)message_verb);
if (mapped_resource != NULL) {
if (message_value != NULL) {
- ResourceFactory *factory = endpoint->getResources(index);
- factory->setResourceValue(message_name,message_value);
+ EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
+ bool success = factory->setResourceValue(mapped_resource,message_value);
+
+ // end the resource value back over MQTT
+ this->sendResult(message_name,message_verb,message_value,success);
}
}
}
@@ -160,22 +163,23 @@
// map the parameter to one of ours
char *mapped_resource = this->mapEndpointResourceToIOCResource((char *)message_verb);
if (mapped_resource != NULL) {
- ResourceFactory *factory = endpoint->getResources(index);
- char *resource_value = factory->getResourceValue((char *)message_name);
+ EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
+ message_value = factory->getResourceValue((char *)mapped_resource);
+ bool success = false; if (message_value != NULL) success = true;
+
+ // log resource get
+ if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value);
// end the resource value back over MQTT
- this->sendResourceValue(message_name,message_verb,resource_value);
+ this->sendResult(message_name,message_verb,message_value,success);
}
}
}
}
}
- // clean up the array
- //if (data != NULL) splitter->clear(data);
-
// clean up
- //if (splitter != NULL) delete splitter;
+ if (tmp != NULL) delete tmp;
}
else {
// message not bound for our node
@@ -183,12 +187,31 @@
}
}
- void MQTTTransport::sendResourceValue(char *endpoint_name,char *parameter_name,char *value) {
+ // send result back to MQTT
+ void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) {
+ if (this->m_connected == true) {
+ // send the response back to MQTT
+ this->logger()->log("Sending Response back to MQTT...");
+ char message[MAX_MQTT_MESSAGE_LENGTH+1];
+ memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
+ char *str_success = "OK"; if (!success) str_success = "FAILED";
+ sprintf(message,"response:{ENDPOINT=%s RESOURCE=%s VALUE=%s}:%s",endpoint_name,resource_name,value,str_success);
+ bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
+ if (sent) {
+ this->logger()->log("Result sent successfully");
+ this->logger()->blinkMQTTTransportTxLED();
+ }
+ else {
+ this->logger()->log("Result send FAILED");
+ }
+ }
+ else {
+ // unable to send the response
+ this->logger()->log("Unable to send response back to MQTT. Not connected.");
+ }
}
- char *MQTTTransport::mapEndpointResourceToIOCResource(char *ioc_name) {
- return this->m_map->getMBEDMappedName(ioc_name);
- }
+ char *MQTTTransport::mapEndpointResourceToIOCResource(char *ioc_name) { return this->m_map->getMBEDMappedName(ioc_name); }
char *MQTTTransport::makeID(char *id_template,char *buffer) {
srand(time(0));
@@ -208,13 +231,13 @@
char *id = this->makeID(MQTT_ENDPOINT_ID,mqtt_id);
this->logger()->log("MQTT Connect: ID: %s...",id);
if (this->m_mqtt->connect(id)) {
- this->logger()->log("MQTT Subscribe: Topic: %s...",MQTT_IOC_TOPIC);
- if (this->m_mqtt->subscribe(MQTT_IOC_TOPIC)) {
+ this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
+ if (this->m_mqtt->subscribe(this->getTopic())) {
this->logger()->log("MQTT CONNECTED.");
this->m_connected = true;
}
else {
- this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_TOPIC);
+ this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
this->logger()->turnLEDRed();
this->m_connected = false;
}
@@ -240,8 +263,8 @@
// disconnect from MQTT
bool MQTTTransport::disconnect() {
if (this->m_mqtt != NULL) {
- this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_IOC_TOPIC);
- this->m_mqtt->unsubscribe(MQTT_IOC_TOPIC);
+ this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
+ this->m_mqtt->unsubscribe(this->getTopic());
this->logger()->log("MQTT Disconnecting...");
this->m_mqtt->disconnect();
}