MBED_DEMOS / Mbed 2 deprecated mbed_mqtt_endpoint_ublox_ethernet

Dependencies:   C027 C12832 EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed

MQTTTransport.cpp

Committer:
ansond
Date:
2014-02-26
Revision:
9:ff877db53cfd
Parent:
8:45f9a920e82c
Child:
13:25448d92c205

File content as of revision 9:ff877db53cfd:

/* Copyright C2013 Doug Anson, MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software
 * and associated documentation files the "Software", to deal in the Software without restriction,
 * including without limitation the rights to use, copy, modify, merge, publish, distribute,
 * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all copies or
 * substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
 
 #include "MQTTTransport.h"
 
 // Endpoint Support
 #include "MBEDEndpoint.h"
 
 // EmulatedResourceFactory support
 #include "EmulatedResourceFactory.h"
 
 // splitstring support
 #include "splitstring.h"
 
 // our transport instance: the MQTTTransport that the free-function MQTT
 // callback forwards received messages to (assigned in the constructor)
MQTTTransport *instance =  NULL;

 // Free-function callback handed to the MQTT client. Forwards each received
 // message to the registered MQTTTransport, if one has been created.
 void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
     // nothing to dispatch to until a transport has registered itself
     if (instance == NULL) return;

     // resolve which endpoint the topic addresses, then hand the message over
     char *target_name = instance->getEndpointNameFromTopic(topic);
     instance->processMessage(target_name,payload,length);
 }
 
 // our MQTT client endpoint: a single file-scope client object built from
 // MQTT_HOSTNAME/MQTT_HOSTPORT, with _mqtt_message_handler as its callback
 PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
  
 // Constructor: clears the client pointer, registers this object as the
 // callback target, creates the resource-name map and builds the topic.
 MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint) : Transport(error_handler,endpoint) {
     // no MQTT client is attached until connect() runs
     m_mqtt = NULL;

     // make this transport reachable from the free-function MQTT callback
     instance = this;

     // table used to translate resource names
     m_map = new MBEDToIOCResourceMap(error_handler);

     // the topic string depends on the endpoint handed to the base class
     initTopic();
 }
 
 // Destructor: closes the MQTT session and detaches this object from the
 // file-scope state that refers to it.
 MQTTTransport::~MQTTTransport() {
     this->disconnect();

     // m_mqtt is only ever assigned the address of the file-scope _mqtt object
     // (see connect()), which was not created with new. Deleting it would be
     // undefined behaviour, so just drop the reference.
     this->m_mqtt = NULL;

     // do not leave the MQTT callback pointing at a destroyed transport
     if (instance == this) instance = NULL;

     // NOTE(review): m_map is allocated with new in the constructor and is not
     // released anywhere in this file - confirm who owns it before adding a delete.
 }
 
 // Build the topic string for this endpoint by formatting the endpoint name
 // into the MQTT_IOC_TOPIC template. The result is stored in m_topic and is
 // always NUL-terminated; if no endpoint name is available m_topic is left empty.
 void MQTTTransport::initTopic() {
     // start from an all-zero buffer so every early exit leaves an empty topic
     memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1);

     MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
     if (endpoint == NULL) return;

     char *endpoint_name = endpoint->getEndpointName();
     if (endpoint_name == NULL) return;

     // bounded formatting: a long endpoint name is truncated instead of
     // writing past the end of m_topic
     snprintf(this->m_topic,MQTT_IOC_TOPIC_LEN+1,MQTT_IOC_TOPIC,endpoint_name);
 }
 
 // get our topic
 char *MQTTTransport::getTopic() { return this->m_topic; }
 
 // Pull the endpoint name from the MQTT topic.
 //
 // The topic is split on '/' and its last component is copied into
 // m_endpoint_name (at most LIGHT_NAME_LEN characters, always NUL-terminated).
 // Returns m_endpoint_name, which is empty when topic is NULL or yields no
 // components. The returned buffer is overwritten by the next call.
 char *MQTTTransport::getEndpointNameFromTopic(char *topic) {
     memset(this->m_endpoint_name,0,LIGHT_NAME_LEN+1);
     if (topic == NULL) return this->m_endpoint_name;

     splitstring tmp(topic);
     vector<string> data = tmp.split('/');
     if (data.size() > 0) {
         // bound the copy by the destination size, not the source length; the
         // final byte stays zero from the memset above
         strncpy(this->m_endpoint_name,data[data.size()-1].c_str(),LIGHT_NAME_LEN);
     }
     return this->m_endpoint_name;
 }
 
 // process a MQTT Message
 void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int len) {
     char *message_type = "";
     char *message_verb = "";
     char *message_value = "";
     char *message_opt = "";
     
     // get our endpoint
     MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
     char *endpoint_name = endpoint->getEndpointName();
     
     // DEBUG
     //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name);

     // only respond if its for our node
     if (strcmp(endpoint_name,message_name) == 0) {                  
         // format of the MQTT message:   message_type:verb|Parameter_X:value|keyword:optional_data        
         splitstring *tmp = new splitstring(payload); 
         vector<string> data = tmp->split(':');    
         if (data.size() > 0) message_type  = (char *)data[0].c_str();
         if (data.size() > 1) message_verb  = (char *)data[1].c_str();
         if (data.size() > 2) message_value = (char *)data[2].c_str();
         if (data.size() > 3) message_opt   = (char *)data[3].c_str();
               
         // DEBUG
         //this->logger()->log("Type: %s Name: %s Verb: %s Value: %s",message_type,message_name,message_verb,message_value);
                          
         // load endpoints
         if (message_type != NULL && strcmp(message_type,"Endpoint") == 0) {
             if (message_name != NULL && strcmp(message_name,"all") == 0) {
                 if (message_verb != NULL && strcmp(message_verb,"load") == 0) {
                     // load up our endpoint
                     endpoint->loadEndpoint();
                 }
                 if (message_verb != NULL && strcmp(message_verb,"Update") == 0) {
                     // update our endpoint
                     endpoint->updateEndpoint();
                 }
             }
             else {
                 // destined for our lights?
                 int index = -1;
                 if (message_name != NULL) {
                     index = endpoint->indexOfLight((char *)message_name);
                     if (index >= 0) {
                         if (message_verb != NULL && strcmp(message_verb,"Update") == 0) {
                            // update our endpoint
                            endpoint->updateEndpoint();
                         }
                     }
                 }
             }
         }
         
         // change a resource value
         if (message_type != NULL && strcmp(message_type,"Change") == 0) {
              if (message_name != NULL) {
                 // destined for our lights?
                 int index = endpoint->indexOfLight((char *)message_name);
                 if (index >= 0) {
                     if (message_verb != NULL) {
                         // map the parameter to one of ours
                         char *mapped_resource = this->mapEndpointResourceToIOCResource((char *)message_verb);
                         if (mapped_resource != NULL) {
                             if (message_value != NULL) {
                                 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
                                 bool success = factory->setResourceValue(mapped_resource,message_value);
                                 
                                 // end the resource value back over MQTT
                                 this->sendResult(message_name,message_verb,message_value,success);
                             }
                         }
                     }
                 }
             }
         }
         
         // get a resource value
         if (message_type != NULL && strcmp(message_type,"Get") == 0) {
             if (message_name != NULL) {
                 // destined for our lights?
                 int index = endpoint->indexOfLight((char *)message_name);
                 if (index >= 0) {
                     if (message_verb != NULL) {
                         // map the parameter to one of ours
                         char *mapped_resource = this->mapEndpointResourceToIOCResource((char *)message_verb);
                         if (mapped_resource != NULL) {
                             EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
                             message_value = factory->getResourceValue((char *)mapped_resource);
                             bool success = false; if (message_value != NULL) success = true;
                             
                             // log resource get
                             if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value);
                             
                             // end the resource value back over MQTT
                             this->sendResult(message_name,message_verb,message_value,success);
                         }
                     }
                 }
             }
         }
         
         // clean up
         if (tmp != NULL) delete tmp;
     }
     else {
         // message not bound for our node
         this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
     }     
 }
 
 // Send the result of a resource operation back over MQTT.
 //
 //   endpoint_name : endpoint the result belongs to
 //   resource_name : resource that was read/changed
 //   value         : resource value to report
 //   success       : reported as "OK" when true, "FAILED" otherwise
 //
 // The message is published on our topic. NULL strings are reported as empty,
 // and an over-long message is truncated to MAX_MQTT_MESSAGE_LENGTH characters.
 void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) {
     if (this->m_connected != true || this->m_mqtt == NULL) {
         // unable to send the response
         this->logger()->log("Unable to send response back to MQTT. Not connected.");
         return;
     }

     // send the response back to MQTT
     this->logger()->log("Sending Response back to MQTT...");

     // never hand a NULL pointer to a %s conversion
     const char *str_endpoint = (endpoint_name != NULL) ? endpoint_name : "";
     const char *str_resource = (resource_name != NULL) ? resource_name : "";
     const char *str_value    = (value != NULL) ? value : "";
     const char *str_success  = success ? "OK" : "FAILED";

     // bounded formatting: snprintf always terminates and never writes past message[]
     char message[MAX_MQTT_MESSAGE_LENGTH+1];
     memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
     snprintf(message,MAX_MQTT_MESSAGE_LENGTH+1,"response:{ENDPOINT=%s RESOURCE=%s VALUE=%s}:%s",str_endpoint,str_resource,str_value,str_success);

     bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
     if (sent) {
         this->logger()->log("Result sent successfully");
         this->logger()->blinkMQTTTransportTxLED();
     }
     else {
         this->logger()->log("Result send FAILED");
     }
 }
 
 // Translate a resource name through the resource map; simply returns whatever
 // MBEDToIOCResourceMap::getMBEDMappedName() gives for ioc_name.
 char *MQTTTransport::mapEndpointResourceToIOCResource(char *ioc_name) {
     char *mapped_name = this->m_map->getMBEDMappedName(ioc_name);
     return mapped_name;
 }
 
 // Format a pseudo-random number (reduced modulo MQTT_MAXID_VALUE) into buffer
 // using id_template as the format string. Returns buffer.
 // NOTE(review): the caller must supply a buffer large enough for the formatted
 // result - no size is passed in, so it cannot be checked here.
 char *MQTTTransport::makeID(char *id_template,char *buffer) {
     // seed from the clock, then re-seed from the generator's first output
     srand(time(0));
     const int reseed_value = rand();
     srand(reseed_value);

     // draw the numeric part and format it in place
     sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE);

     return buffer;
 }
 
 // Connect up MQTT.
 //
 // Attaches the file-scope MQTT client, connects it under a freshly generated
 // ID and subscribes to our topic. Returns true when connected and subscribed
 // (or already connected), false otherwise. On failure the status LED is
 // turned red.
 bool MQTTTransport::connect() {
     if (this->m_connected == true) {
         this->logger()->log("MQTT already connected (OK)");
         return this->m_connected;
     }

     char mqtt_id[MQTT_ENDPOINT_IDLEN+1];
     memset(mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));

     this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);

     // the client is a file-scope object, so its address is never NULL
     this->m_mqtt = &_mqtt;

     char *id = this->makeID(MQTT_ENDPOINT_ID,mqtt_id);
     this->logger()->log("MQTT Connect: ID: %s...",id);
     if (!this->m_mqtt->connect(id)) {
         this->logger()->log("MQTT Connect: ID: %s FAILED",id);
         this->logger()->turnLEDRed();
         this->m_connected = false;
         return this->m_connected;
     }

     this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
     if (!this->m_mqtt->subscribe(this->getTopic())) {
         this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
         this->logger()->turnLEDRed();

         // the client did connect; close that session so we are not left
         // half-connected and a later connect() attempt starts clean
         this->m_mqtt->disconnect();
         this->m_connected = false;
         return this->m_connected;
     }

     this->logger()->log("MQTT CONNECTED.");
     this->m_connected = true;
     return this->m_connected;
 }
 
 // Disconnect from MQTT: unsubscribes from our topic and closes the client
 // session when a client is attached. Always clears the connected flag and
 // always returns true.
 bool MQTTTransport::disconnect() {
     if (this->m_mqtt == NULL) {
         // no client was ever attached
         this->logger()->log("MQTT already disconnected (OK)");
     }
     else {
         char *topic = this->getTopic();

         this->logger()->log("MQTT Unsubscribing from: %s...",topic);
         this->m_mqtt->unsubscribe(topic);

         this->logger()->log("MQTT Disconnecting...");
         this->m_mqtt->disconnect();
     }

     this->m_connected = false;
     return true;
 }
 
 // Poll the transport: when a client is attached and we are connected, run one
 // pass of the client's loop() and blink the Rx LED. Otherwise do nothing.
 void MQTTTransport::checkAndProcess() {
     // not attached or not connected: nothing to service
     if (this->m_mqtt == NULL || this->m_connected != true) return;

     this->m_mqtt->loop();
     this->logger()->blinkMQTTTransportRxLED();
 }