Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: C12832_lcd EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed
MQTTTransport.cpp
- Committer:
- ansond
- Date:
- 2014-02-26
- Revision:
- 5:4ad5ec5802a2
- Parent:
- 4:11f00b499106
- Child:
- 6:34c07e145caa
File content as of revision 5:4ad5ec5802a2:
/* Copyright (C) 2013 Doug Anson, MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software
* and associated documentation files (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge, publish, distribute,
* sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or
* substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "MQTTTransport.h"
// Endpoint Support
#include "MBEDEndpoint.h"
// string splitter support
#include "Splitter.h"
// Our transport instance: the MQTTTransport that the MQTT message callback
// forwards received messages to. Set by the MQTTTransport constructor.
MQTTTransport *instance = NULL;
// MQTT callback for received messages: forwards the payload and its length to
// the registered transport instance. Does nothing when no transport has been
// registered. The topic argument is not used.
void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
    if (instance == NULL) {
        return;
    }
    instance->processMessage(payload,length);
}
// Our MQTT client endpoint: a file-scope client built from the broker host and
// port macros, with _mqtt_message_handler as the handler for received messages.
// MQTTTransport::connect() stores the address of this object in m_mqtt.
PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
// Constructor.
// Passes the error handler to the Transport base class, starts with no MQTT
// client attached, registers this object as the target of the MQTT message
// callback, and allocates the resource-name map.
MQTTTransport::MQTTTransport(ErrorHandler *error_handler) : Transport(error_handler) {
    // no client until connect() is called
    m_mqtt = NULL;

    // received messages are dispatched to this object from now on
    instance = this;

    // resource-name map used by mapEndpointResourceToIOCResource()
    m_map = new MBEDToIOCResourceMap();
}
// Destructor.
// Closes the MQTT session and releases what this object owns.
MQTTTransport::~MQTTTransport() {
    this->disconnect();

    // m_mqtt is only ever assigned the address of the file-scope _mqtt client
    // (see connect()); it was not allocated with new, so it must not be
    // deleted. Just drop the reference.
    this->m_mqtt = NULL;

    // the map was allocated with new in the constructor; clear the pointer
    // afterwards so a later guarded or unguarded delete of it is harmless
    delete this->m_map;
    this->m_map = NULL;

    // keep the MQTT callback from dispatching to a destroyed object
    if (instance == this) instance = NULL;
}
// process a MQTT Message
void MQTTTransport::processMessage(char *payload, unsigned int len) {
char *message_type = NULL;
char *message_name = NULL;
char *message_verb = NULL;
char *message_value = NULL;
char *message_opt = NULL;
char *endpoint_name = NULL;
MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
// DEBUG
this->logger()->pause("MQTT Message: %s length=%d",payload,len);
// split the string by the delimiter
this->logger()->pause("DOUG1");
Splitter *splitter = new Splitter(payload,':',this->logger());
this->logger()->pause("DOUG2");
char** data = splitter->split();
this->logger()->pause("DOUG3");
int count = splitter->size(data);
this->logger()->pause("DOUG4");
// format of the MQTT message: message_type:name:verb|Parameter_X:value|keyword:optional_data
/*
if (count > 0) message_type = data[0];
if (count > 1) message_name = data[1];
if (count > 2) message_verb = data[2];
if (count > 3) message_value = data[3];
if (count > 4) message_opt = data[4];
*/
// get our endpoint name
this->logger()->pause("DOUG5");
endpoint_name = endpoint->getEndpointName();
// XXX FIXUP until MQTT messages have endpoint_names in them
if (count > 0) message_type = data[0];
message_name = endpoint->getEndpointName(); // if (count > 1) message_name = (char *)data[1].c_str();
if (count > 1) message_verb = data[1];
if (count > 2) message_value = data[2];
if (count > 3) message_opt = data[3];
// DEBUG
this->logger()->log("Type: %s Name: %s Verb: %s Value: %s",message_type,message_name,message_verb,message_value);
// only respond if its for our node
if (false && strcmp(endpoint_name,message_name) == 0) {
// DEBUG
this->logger()->log("Message: %s bound for %s... processing...",payload,endpoint_name);
// load endpoints
if (message_type != NULL && strcmp(message_type,"Endpoint") == 0) {
if (message_name != NULL && strcmp(message_name,"all") == 0) {
if (message_verb != NULL && strcmp(message_verb,"load") == 0) {
// load up our endpoint
endpoint->loadEndpoint();
}
if (message_verb != NULL && strcmp(message_verb,"Update") == 0) {
// update our endpoint
endpoint->updateEndpoint();
}
}
else {
// destined for our lights?
int index = -1;
if (message_name != NULL) {
endpoint->indexOfLight((char *)message_name);
if (index >= 0) {
if (message_verb != NULL && strcmp(message_verb,"Update") == 0) {
// update our endpoint
endpoint->updateEndpoint();
}
}
}
}
}
// change a resource value
if (message_type != NULL && strcmp(message_type,"Change") == 0) {
if (message_name != NULL) {
// destined for our lights?
int index = endpoint->indexOfLight((char *)message_name);
if (index >= 0) {
if (message_verb != NULL) {
// map the parameter to one of ours
char *mapped_resource = this->mapEndpointResourceToIOCResource((char *)message_verb);
if (mapped_resource != NULL) {
if (message_value != NULL) {
ResourceFactory *factory = endpoint->getResources(index);
factory->setResourceValue(message_name,message_value);
}
}
}
}
}
}
// get a resource value
if (message_type != NULL && strcmp(message_type,"Get") == 0) {
if (message_name != NULL) {
// destined for our lights?
int index = endpoint->indexOfLight((char *)message_name);
if (index >= 0) {
if (message_verb != NULL) {
// map the parameter to one of ours
char *mapped_resource = this->mapEndpointResourceToIOCResource((char *)message_verb);
if (mapped_resource != NULL) {
ResourceFactory *factory = endpoint->getResources(index);
char *resource_value = factory->getResourceValue((char *)message_name);
// end the resource value back over MQTT
this->sendResourceValue(message_name,message_verb,resource_value);
}
}
}
}
}
}
else {
// message not bound for our node
this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
}
// clean up the array
if (data != NULL) splitter->clear(data);
// clean up
if (splitter != NULL) delete splitter;
}
// Send a resource value back over MQTT.
// Not implemented: the body performs no work and every argument is ignored.
void MQTTTransport::sendResourceValue(char *endpoint_name,char *parameter_name,char *value) {
    (void)endpoint_name;
    (void)parameter_name;
    (void)value;
}
char *MQTTTransport::mapEndpointResourceToIOCResource(char *ioc_name) {
return this->m_map->getMBEDMappedName(ioc_name);
}
char *MQTTTransport::makeID(char *id_template,char *buffer) {
srand(time(0));
srand(rand());
sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE);
return buffer;
}
// Connect up MQTT.
// Attaches the file-scope MQTT client, connects with a freshly generated client
// ID, and subscribes to MQTT_IOC_TOPIC. Failures are logged and signalled with
// the red LED.
//
// Returns true when the transport is connected and subscribed (including when
// it already was), false otherwise.
bool MQTTTransport::connect() {
    // nothing to do when we are already up
    if (this->m_connected) {
        this->logger()->log("MQTT already connected (OK)");
        return this->m_connected;
    }

    char mqtt_id[MQTT_ENDPOINT_IDLEN+1];
    memset(mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));

    this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);

    // the client is a file-scope object, so its address is never NULL and no
    // allocation-failure branch is needed
    this->m_mqtt = &_mqtt;

    char *id = this->makeID(MQTT_ENDPOINT_ID,mqtt_id);
    this->logger()->log("MQTT Connect: ID: %s...",id);
    if (!this->m_mqtt->connect(id)) {
        this->logger()->log("MQTT Connect: ID: %s FAILED",id);
        this->logger()->turnLEDRed();
        return this->m_connected;
    }

    this->logger()->log("MQTT Subscribe: Topic: %s...",MQTT_IOC_TOPIC);
    if (!this->m_mqtt->subscribe(MQTT_IOC_TOPIC)) {
        this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_TOPIC);
        this->logger()->turnLEDRed();

        // do not leave a broker session open while reporting "not connected";
        // a later connect() then starts again from a clean state
        this->m_mqtt->disconnect();
        return this->m_connected;
    }

    this->logger()->log("MQTT CONNECTED.");
    this->m_connected = true;
    return this->m_connected;
}
// disconnect from MQTT
bool MQTTTransport::disconnect() {
if (this->m_mqtt != NULL) {
this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_IOC_TOPIC);
this->m_mqtt->unsubscribe(MQTT_IOC_TOPIC);
this->logger()->log("MQTT Disconnecting...");
this->m_mqtt->disconnect();
}
else {
this->logger()->log("MQTT already disconnected (OK)");
}
this->m_connected = false;
return true;
}
// check transport and process stuff
void MQTTTransport::checkAndProcess() {
if (this->m_mqtt != NULL && this->m_connected == true) {
this->m_mqtt->loop();
this->logger()->blinkMQTTTransportRxLED();
}
}