Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: C027 C12832 EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed
MQTTTransport.cpp
- Committer:
- ansond
- Date:
- 2014-03-16
- Revision:
- 131:27f29e230bbb
- Parent:
- 130:9c52e163e733
- Child:
- 132:563a1ee99efc
File content as of revision 131:27f29e230bbb:
/* Copyright C2013 Doug Anson, MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software
* and associated documentation files the "Software", to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge, publish, distribute,
* sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or
* substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "MQTTTransport.h"
// Endpoint Support
#include "MBEDEndpoint.h"
// EmulatedResourceFactory support
#include "EmulatedResourceFactory.h"
// our transmitt instance
MQTTTransport *_mqtt_instance = NULL;
// MQTT callback to handle received messages
void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
memcpy(buffer,payload,length);
strcpy(rcv_topic,topic);
if (_mqtt_instance != NULL) {
if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
_mqtt_instance->processPongMessage(buffer,length);
}
else {
memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
memcpy(buffer,payload,length);
strcpy(rcv_topic,topic);
_mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
}
}
}
// our MQTT client endpoint
PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
// default constructor
MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) {
this->m_mqtt = NULL;
_mqtt_instance = this;
this->m_map = map;
this->m_ping_counter = 1;
this->m_ping_countdown = MQTT_PING_COUNTDOWN;
this->initTopic();
}
// default destructor
MQTTTransport::~MQTTTransport() {
this->disconnect();
}
// init our topic
void MQTTTransport::initTopic() {
MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
char *endpoint_name = endpoint->getEndpointName();
memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1);
sprintf(this->m_topic,MQTT_IOC_TOPIC,endpoint_name);
}
// get our topic
char *MQTTTransport::getTopic() { return this->m_topic; }
// get the IOC <--> MBED resource map
MBEDToIOCResourceMap *MQTTTransport::getMap() { return this->m_map; }
// pull the endpoint name from the MQTT topic
char *MQTTTransport::getEndpointNameFromTopic(char *topic) {
if (topic != NULL) {
memset(this->m_endpoint_name,0,LIGHT_NAME_LEN+1);
char trash[MQTT_IOC_TOPIC_LEN+1];
char ep[MQTT_IOC_TOPIC_LEN+1];
memset(trash,0,MQTT_IOC_TOPIC_LEN+1);
memset(ep,0,MQTT_IOC_TOPIC_LEN+1);
bool done = false;
int length = 0; if (topic != NULL) length = strlen(topic);
for(int i=length-1;i>=0 && !done;--i) if (topic[i] == '/') { topic[i] = ' ' ; done = true; }
sscanf(topic,"%s%s",trash,ep);
if (strlen(ep) > 0) {
if (strcmp(ep,MQTT_IOC_ALL_ENDPOINT) != 0) {
// just insert the name and let the parser determine if its for us or not...
strncpy(this->m_endpoint_name,ep,strlen(ep));
}
else {
// this is a broadcast message - so we need to process it
MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
char *endpoint_name = endpoint->getEndpointName();
strcpy(this->m_endpoint_name,endpoint_name);
}
}
//this->logger()->log("MQTT Topic (discovered): %s Original: %s",this->m_endpoint_name,topic);
return this->m_endpoint_name;
}
//this->logger()->log("MQTT Topic (discovered): NULL Original: %s",topic);
return NULL;
}
// process a MQTT Message
void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int payload_length) {
char message_type[MQTT_PAYLOAD_SEGMENT_LEN+1];
char message_verb[MQTT_PAYLOAD_SEGMENT_LEN+1];
char message_value[MQTT_PAYLOAD_SEGMENT_LEN+1];
char message_opt[MQTT_PAYLOAD_SEGMENT_LEN+1];
// initialize
memset(message_type,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
memset(message_verb,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
memset(message_value,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
memset(message_opt,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
// get our endpoint
MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
char *endpoint_name = endpoint->getEndpointName();
// DEBUG
//this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name);
// only respond if its for our node
if (strcmp(endpoint_name,message_name) == 0) {
// format of the MQTT message: message_type:verb|Parameter_X:value|keyword:optional_data
char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
memcpy(buffer,payload,payload_length);
int count = 0; for(int i=0;i<payload_length;++i) if (payload[i] == ':') ++count;
for(int i=0;i<payload_length;++i) {
if (buffer[i] == ':') {
if (i < (payload_length-1)) buffer[i] = ' ';
else buffer[i] = '\0';
}
}
if (count == 1) sscanf(buffer,"%s %s",message_type,message_verb);
if (count == 2) sscanf(buffer,"%s %s %s",message_type,message_verb,message_value);
if (count == 3) sscanf(buffer,"%s %s %s %s",message_type,message_verb,message_value,message_opt);
// DEBUG
//this->logger()->log("Raw Payload: %s, length: %d",payload,payload_length);
//this->logger()->log("Buffer: %s, length: %d",buffer,strlen(buffer));
//this->logger()->log("Parsed Payload: Type: [%s] Name: [%s] Verb: [%s] Value: [%s]",message_type,message_name,message_verb,message_value);
// load endpoints
if (message_type != NULL && strcmp(message_type,IOC_ENDPOINT_VERB) == 0) { // Endpoint
if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_LOAD_ALL_VERB) == 0) { // load
if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all
// load up our endpoints
endpoint->loadEndpoints();
}
else if (message_value != NULL && strcmp(message_value,this->m_endpoint_name) == 0) {
// load up our endpoints (us only)
endpoint->loadEndpoints();
}
}
else if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) { // update
if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all
// update our endpoints
endpoint->updateEndpoints();
}
else {
// update just our endpoint
int index = -1;
if (message_name != NULL) {
index = endpoint->indexOfLight((char *)message_name);
if (index >= 0) {
if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) {
// update our endpoint
endpoint->updateEndpoints(index);
}
}
}
}
}
}
// change a resource value
if (message_type != NULL && strcmp(message_type,IOC_CHANGE_VERB) == 0) {
if (message_name != NULL) {
// destined for our lights?
int index = endpoint->indexOfLight((char *)message_name);
if (index >= 0) {
if (message_verb != NULL) {
// map the parameter to one of ours
char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
if (mapped_resource != NULL) {
if (message_value != NULL) {
EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
bool success = factory->setResourceValue(mapped_resource,message_value);
// end the resource value back over MQTT
this->sendResult(message_name,message_verb,message_value,success);
}
}
}
}
}
}
// get a resource value
if (message_type != NULL && strcmp(message_type,IOC_REQUEST_VALUE_VERB) == 0) {
if (message_name != NULL) {
// destined for our lights?
int index = endpoint->indexOfLight((char *)message_name);
if (index >= 0) {
if (message_verb != NULL) {
// map the parameter to one of ours
char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
if (mapped_resource != NULL) {
EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
strcpy(message_value,factory->getResourceValue((char *)mapped_resource));
bool success = false; if (message_value != NULL) success = true;
// log resource get
if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value);
// end the resource value back over MQTT
this->sendResult(message_name,message_verb,message_value,success);
}
}
}
}
}
}
else {
// message not bound for our node
//this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
;
}
}
// send result back to MQTT
void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) {
if (this->m_connected == true) {
// send the response back to MQTT
this->logger()->log("Sending Response back to MQTT...");
char message[MAX_MQTT_MESSAGE_LENGTH+1];
memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED;
sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success);
bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
if (sent) {
this->logger()->log("Result sent successfully");
this->logger()->blinkTransportTxLED();
}
else {
this->logger()->log("Result send FAILED");
}
}
else {
// unable to send the response
this->logger()->log("Unable to send response back to MQTT. Not connected.");
}
}
char *MQTTTransport::mapIOCResourceToEndpointResource(char *ioc_name) { return this->getMap()->iocNameToEndpointName(ioc_name); }
char *MQTTTransport::makeID(char *id_template,char *buffer) {
srand(time(0));
srand(rand());
sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE);
return buffer;
}
// is this message a PONG message?
bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
bool isPong = false;
char verb[MQTT_PING_VERB_LEN+1];
char end[MQTT_PING_VERB_LEN+1];
int counter = 0;
// clean
memset(verb,0,MQTT_PING_VERB_LEN+1);
memset(end,0,MQTT_PING_VERB_LEN+1);
// make sure this is for us...
char *topic_ep_name = this->getEndpointNameFromTopic(topic);
if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) {
// parse the payload
for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
sscanf(payload,"%s%d",verb,&counter);
// check the contents to make sure its for us...
this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter);
if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) {
// its a PONG message to our PING...
isPong = true;
}
}
// return isPong status
return isPong;
}
// process this PONG message
void MQTTTransport::processPongMessage(char *payload,int payload_length) {
// DEBUG
this->logger()->log("Received PONG: counter=%d",this->m_ping_counter);
// simply increment the counter
++this->m_ping_counter;
// reset counter if maxed
if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1;
}
// send a PING message
bool MQTTTransport::sendPingMessage() {
bool sent = false;
char message[MAX_MQTT_MESSAGE_LENGTH+1];
// initialize...
memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
// build message
sprintf(message,"ping:%d:",this->m_ping_counter);
// send the message over the ping/pong topic
this->logger()->log("Sending PING on %s Message: %s",this->getTopic(),message);
sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
if (sent) {
// send succeeded
this->logger()->log("PING %d sent successfully",this->m_ping_counter);
this->logger()->blinkTransportTxLED();
// wait for 1 second
wait_ms(1000);
}
else {
// send failed! - reconnect
this->logger()->log("PING send %d FAILED... (re)connecting...",this->m_ping_counter);
// attempt reconnect
this->disconnect();
sent = this->connect();
if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter);
}
// return our status
return sent;
}
// connect up MQTT
bool MQTTTransport::connect() {
char mqtt_id[MQTT_ENDPOINT_IDLEN+1];
memset(mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));
if (this->m_connected == false) {
this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);
this->m_mqtt = &_mqtt;
if (this->m_mqtt != NULL) {
char *id = this->makeID(MQTT_ENDPOINT_ID,mqtt_id);
this->logger()->log("MQTT Connect: ID: %s...",id);
if (this->m_mqtt->connect(id)) {
this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
if (this->m_mqtt->subscribe(this->getTopic())) {
this->logger()->log("MQTT CONNECTED.");
this->m_connected = true;
}
else {
this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
this->logger()->turnLEDRed();
this->m_connected = false;
}
}
else {
this->logger()->log("MQTT Connect: ID: %s FAILED",id);
this->logger()->turnLEDRed();
this->m_connected = false;
}
}
else {
this->logger()->log("MQTT Unable to allocate new instance");
this->logger()->turnLEDRed();
this->m_connected = false;
}
}
else {
this->logger()->log("MQTT already connected (OK)");
}
return this->m_connected;
}
// disconnect from MQTT
bool MQTTTransport::disconnect() {
if (this->m_mqtt != NULL) {
this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
this->m_mqtt->unsubscribe(this->getTopic());
this->logger()->log("MQTT Disconnecting...");
this->m_mqtt->disconnect();
}
else {
this->logger()->log("MQTT already disconnected (OK)");
}
this->m_connected = false;
return true;
}
// check transport and process stuff
void MQTTTransport::checkAndProcess() {
// process any MQTT messages
if (this->m_mqtt != NULL && this->m_connected == true) {
bool connected = this->m_mqtt->loop();
if (connected) {
this->logger()->blinkTransportRxLED();
}
else {
this->disconnect();
this->connect();
}
}
// send a PING if time for it
//--this->m_ping_countdown;
//if (this->m_ping_countdown <= 0) {
// this->logger()->log("MQTT: Sending PING...");
// this->m_ping_countdown = MQTT_PING_COUNTDOWN;
// this->sendPingMessage();
//}
}