MQTT-specific components for the impact mbed endpoint library

Dependents:   mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp

MQTTTransport.cpp

Committer:
ansond
Date:
2014-04-10
Revision:
32:9a024a6af2fb
Parent:
31:e5950e0677be
Child:
42:297585f8e7bd

File content as of revision 32:9a024a6af2fb:

/* Copyright (C) 2013 Doug Anson, MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software
 * and associated documentation files (the "Software"), to deal in the Software without restriction,
 * including without limitation the rights to use, copy, modify, merge, publish, distribute,
 * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all copies or
 * substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
 
 #include "mbed.h"
 #include "rtos.h"
 
 #include "MQTTTransport.h"
 
 // Endpoint Support
 #include "MBEDEndpoint.h"
 
 // EmulatedResourceFactory support
 #include "EmulatedResourceFactory.h"
 
 // Network mutex: defined in another translation unit. The code visible in this
 // file NULL-checks it and, when present, holds it around PubSubClient calls.
 extern Mutex *network_mutex;
  
 // the transport instance that the free-function MQTT callback dispatches to
 // (assigned by the MQTTTransport constructor; NULL until one is constructed)
 MQTTTransport *_mqtt_instance =  NULL;

 // MQTT callback to handle received messages.
 //
 //   topic   - NUL-terminated topic the message arrived on
 //   payload - message bytes (not assumed to be NUL-terminated)
 //   length  - number of bytes in payload
 //
 // The message is copied into local, zero-filled buffers before it is handed on,
 // because isPongMessage() rewrites both the topic and the payload in place while
 // parsing them. Both copies are clamped to the size of the local buffers so that
 // an over-long topic or payload is truncated instead of overrunning the stack.
 void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
     char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
     char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
     
     // nothing to dispatch to, or no topic to route on
     if (_mqtt_instance == NULL || topic == NULL) return;
     
     // never copy more than the local buffer can hold (and nothing from a NULL payload)
     if (payload == NULL) length = 0;
     if (length > (unsigned int)MAX_MQTT_MESSAGE_LENGTH) length = (unsigned int)MAX_MQTT_MESSAGE_LENGTH;
     
     memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
     memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
     if (length > 0) memcpy(buffer,payload,length);
     strncpy(rcv_topic,topic,MQTT_IOC_TOPIC_LEN);
     
     if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
         _mqtt_instance->processPongMessage(buffer,length);
     }
     else {
         // the PONG check modified our copies - start again from the pristine message
         memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
         memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
         if (length > 0) memcpy(buffer,payload,length);
         strncpy(rcv_topic,topic,MQTT_IOC_TOPIC_LEN);
         _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
     }
 }
 
 // our MQTT client endpoint: one file-scope client constructed with MQTT_HOSTNAME,
 // MQTT_HOSTPORT and _mqtt_message_handler as its message callback argument
 PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
  
 // constructor: hands the error handler and endpoint to the Transport base, then
 // records the resource map, resets the ping bookkeeping, registers this object
 // as the target of the MQTT callback and builds the subscription topic
 MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) {
     // no client is bound until connect() runs
     m_mqtt = NULL;
     
     // the free-function callback reaches us through this pointer
     _mqtt_instance = this;
     
     m_map = map;
     m_ping_counter = 1;
     m_ping_countdown = MQTT_PING_COUNTDOWN;
     
     initTopic();
 }
 
 // destructor: make sure the MQTT session is torn down with the transport
 MQTTTransport::~MQTTTransport() {
     disconnect();
 }
  
 // init our topic: formats MQTT_IOC_TOPIC with this endpoint's name into m_topic.
 // The write is bounded to the MQTT_IOC_TOPIC_LEN+1 bytes that are cleared here, so
 // an over-long endpoint name is truncated instead of overrunning the member.
 // m_topic is left empty when no endpoint (or endpoint name) is available.
 void MQTTTransport::initTopic() {
     memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1);
     
     MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
     if (endpoint == NULL) return;
     
     char *endpoint_name = endpoint->getEndpointName();
     if (endpoint_name == NULL) return;
     
     snprintf(this->m_topic,MQTT_IOC_TOPIC_LEN+1,MQTT_IOC_TOPIC,endpoint_name);
 }
 
 // get our topic
 char *MQTTTransport::getTopic() { return this->m_topic; }
 
 // accessor: the IOC <--> MBED resource map supplied at construction
 MBEDToIOCResourceMap *MQTTTransport::getMap() {
     return m_map;
 }
 
 // pull the endpoint name from the MQTT topic.
 //
 // The last '/' in topic is overwritten with a space (topic is modified in place)
 // and the text after it is taken as the endpoint name. When that name equals
 // MQTT_IOC_ALL_ENDPOINT the message is a broadcast and our own endpoint name is
 // substituted so that it gets processed.
 //
 // Returns m_endpoint_name (empty when no name could be extracted), or NULL when
 // topic is NULL. The result is overwritten by the next call.
 char *MQTTTransport::getEndpointNameFromTopic(char *topic) {
     if (topic == NULL) return NULL;
     
     memset(this->m_endpoint_name,0,PERSONALITY_NAME_LEN+1);
     
     // sscanf() below extracts tokens that are never longer than the topic itself,
     // so the scratch buffers are only safe for topics that fit into them
     if (strlen(topic) > (size_t)MQTT_IOC_TOPIC_LEN) return this->m_endpoint_name;
     
     char trash[MQTT_IOC_TOPIC_LEN+1];
     char ep[MQTT_IOC_TOPIC_LEN+1];
     memset(trash,0,MQTT_IOC_TOPIC_LEN+1);
     memset(ep,0,MQTT_IOC_TOPIC_LEN+1);
     
     // split "<prefix>/<endpoint>" at the last separator
     char *last_separator = strrchr(topic,'/');
     if (last_separator != NULL) *last_separator = ' ';
     sscanf(topic,"%s%s",trash,ep);
     //this->logger()->log("MQTT: Topic:[%s] trash:[%s] ep:[%s]",topic,trash,ep);
     
     if (strlen(ep) > 0) {
         if (strcmp(ep,MQTT_IOC_ALL_ENDPOINT) != 0) {
             // just insert the name and let the parser determine if its for us or not...
             // (bounded copy; the buffer was zeroed above so it stays NUL-terminated)
             strncpy(this->m_endpoint_name,ep,PERSONALITY_NAME_LEN);
         }
         else {
             // this is a broadcast message - so we need to process it
             MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
             char *endpoint_name = (endpoint != NULL) ? endpoint->getEndpointName() : NULL;
             if (endpoint_name != NULL) strncpy(this->m_endpoint_name,endpoint_name,PERSONALITY_NAME_LEN);
         }
     }
     //this->logger()->log("MQTT Topic (discovered): %s  Original: %s",this->m_endpoint_name,topic);
     return this->m_endpoint_name;
 }
 
 // copy the next whitespace-delimited token starting at cursor into out, writing at
 // most out_size-1 characters plus a terminating NUL (longer tokens are truncated);
 // returns the position just past the complete token
 static const char *mqtt_next_token(const char *cursor,char *out,size_t out_size) {
     static const char whitespace[] = " \t\r\n\v\f";
     cursor += strspn(cursor,whitespace);
     size_t token_length = strcspn(cursor,whitespace);
     size_t copy_length = token_length;
     if (copy_length > out_size-1) copy_length = out_size-1;
     memcpy(out,cursor,copy_length);
     out[copy_length] = '\0';
     return cursor + token_length;
 }
 
 // process a MQTT Message
 //
 //   message_name   - endpoint name the message is addressed to (as extracted from the topic)
 //   payload        - message bytes, format: message_type:verb|Parameter_X:value|keyword:optional_data
 //   payload_length - number of bytes in payload
 //
 // Messages addressed to another endpoint are ignored. For our own endpoint the
 // payload is split on ':' into type / verb / value / optional segments and then
 // dispatched: endpoint load/update requests, resource value changes and resource
 // value requests. Results of change/get requests are sent back via sendResult().
 //
 // All copies are bounded: the payload is clamped to MAX_MQTT_MESSAGE_LENGTH and each
 // segment to MQTT_PAYLOAD_SEGMENT_LEN characters (longer segments are truncated).
 void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int payload_length) {
     char message_type[MQTT_PAYLOAD_SEGMENT_LEN+1];
     char message_verb[MQTT_PAYLOAD_SEGMENT_LEN+1];
     char message_value[MQTT_PAYLOAD_SEGMENT_LEN+1];
     char message_opt[MQTT_PAYLOAD_SEGMENT_LEN+1];
     
     // nothing to route or nothing to parse
     if (message_name == NULL || payload == NULL) return;
     
     // initialize
     memset(message_type,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
     memset(message_verb,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
     memset(message_value,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
     memset(message_opt,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
      
     // get our endpoint
     MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
     if (endpoint == NULL) return;
     char *endpoint_name = endpoint->getEndpointName();
     if (endpoint_name == NULL) return;
     
     // DEBUG
     //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name);
 
     // only respond if its for our node
     if (strcmp(endpoint_name,message_name) != 0) {
         // message not bound for our node
         //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
         return;
     }
     
     // work on a bounded, NUL-terminated copy of the payload
     char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
     unsigned int buffer_length = payload_length;
     if (buffer_length > (unsigned int)MAX_MQTT_MESSAGE_LENGTH) buffer_length = (unsigned int)MAX_MQTT_MESSAGE_LENGTH;
     memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); 
     memcpy(buffer,payload,buffer_length); 
     
     // turn every ':' into a token separator, counting them as we go
     // (a ':' in the very last position simply terminates the string)
     int count = 0;
     for(unsigned int i=0;i<buffer_length;++i) {
         if (buffer[i] == ':') {
             ++count;
             if (i+1 < buffer_length) buffer[i] = ' ';
             else buffer[i] = '\0';
         }
     }
     
     // N separators announce N+1 segments; anything outside 1..3 separators is not parsed
     if (count >= 1 && count <= 3) {
         const char *cursor = buffer;
         cursor = mqtt_next_token(cursor,message_type,sizeof(message_type));
         cursor = mqtt_next_token(cursor,message_verb,sizeof(message_verb));
         if (count >= 2) cursor = mqtt_next_token(cursor,message_value,sizeof(message_value));
         if (count >= 3) mqtt_next_token(cursor,message_opt,sizeof(message_opt));
     }
                    
     // DEBUG
     //this->logger()->log("Raw Payload: %s, length: %d",payload,payload_length);
     //this->logger()->log("Buffer: %s, length: %d",buffer,strlen(buffer));
     //this->logger()->log("Parsed Payload: Type: [%s] Name: [%s] Verb: [%s] Value: [%s]",message_type,message_name,message_verb,message_value);
                      
     // load / update endpoints
     if (strcmp(message_type,IOC_ENDPOINT_VERB) == 0) {                      // Endpoint
         if (strcmp(message_verb,IOC_REQUEST_LOAD_ALL_VERB) == 0) {          // load
             // load for everyone, or for us specifically
             if (strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0 ||
                 strcmp(message_value,this->m_endpoint_name) == 0) {
                 endpoint->loadPersonalities();
                 endpoint->updatePersonalities();
             }
         }
         else if (strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) {   // update
             if (strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) {         // all
                 // update our endpoints
                 endpoint->updatePersonalities();
             }
             else {
                 // update just our endpoint
                 int index = endpoint->indexOfPersonality(message_name);
                 if (index >= 0) endpoint->updatePersonality(index);
             }
         }
     }
     
     // change a resource value
     if (strcmp(message_type,IOC_CHANGE_VERB) == 0) {
         // destined for our lights?
         int index = endpoint->indexOfPersonality(message_name);
         if (index >= 0) {
             // map the parameter to one of ours
             char *mapped_resource = this->mapIOCResourceToEndpointResource(message_verb);
             if (mapped_resource != NULL) {
                 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
                 bool success = false;
                 if (factory != NULL) success = factory->setResourceValue(mapped_resource,message_value);
                 
                 // send the resource value back over MQTT
                 this->sendResult(message_name,message_verb,message_value,success);
             }
             
             // for Dimming, we also want to refresh our record (in whole) at the IOC
             if (this->isDimmingResource(message_verb)) {
                 // send a fresh update to the IOC - just us...
                 endpoint->updatePersonality(index);
             }
         }
     }
     
     // get a resource value
     if (strcmp(message_type,IOC_REQUEST_VALUE_VERB) == 0) {
         // destined for our lights?
         int index = endpoint->indexOfPersonality(message_name);
         if (index >= 0) {
             // map the parameter to one of ours
             char *mapped_resource = this->mapIOCResourceToEndpointResource(message_verb);
             if (mapped_resource != NULL) {
                 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
                 const char *current_value = NULL;
                 if (factory != NULL) current_value = factory->getResourceValue((char *)mapped_resource);
                 
                 // the lookup only succeeded if it actually produced a value
                 bool success = (current_value != NULL);
                 if (success) {
                     memset(message_value,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
                     strncpy(message_value,current_value,MQTT_PAYLOAD_SEGMENT_LEN);
                     
                     // log resource get
                     this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value);
                 }
                 
                 // send the resource value back over MQTT
                 this->sendResult(message_name,message_verb,message_value,success);
             }
         }
     }
 }
 
 // true only when resource is non-NULL and names the "Dimming" resource exactly
 bool MQTTTransport::isDimmingResource(char *resource) {
     if (resource == NULL) return false;
     return strcmp(resource,"Dimming") == 0;
 }
 
 // send result back to MQTT
 //
 //   endpoint_name - endpoint the result belongs to
 //   resource_name - resource that was changed or queried
 //   value         - resource value to report
 //   success       - selects IOC_RESPONSE_OK or IOC_RESPONSE_FAILED in the response
 //
 // Formats IOC_RESPONSE_TEMPLATE and publishes it on our topic while holding the
 // network mutex (when one exists). Nothing is sent when the transport is not
 // connected or when the formatted response does not fit the message buffer.
 void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) {
     if (this->m_connected != true || this->m_mqtt == NULL) {
         // unable to send the response 
         this->logger()->log("Unable to send response back to MQTT. Not connected.");
         return;
     }
     
     // send the response back to MQTT
     this->logger()->log("Sending Response back to MQTT...");
     char message[MAX_MQTT_MESSAGE_LENGTH+1];
     memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
     const char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED;
     
     // bounded formatting: snprintf reports the length it needed, so a result larger
     // than the buffer means the response was truncated and must not go out
     int needed = snprintf(message,MAX_MQTT_MESSAGE_LENGTH+1,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success);
     if (needed < 0 || needed > MAX_MQTT_MESSAGE_LENGTH) {
         this->logger()->log("Result send FAILED (response too long)");
         return;
     }
     
     if (network_mutex != NULL) network_mutex->lock();
     bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
     if (network_mutex != NULL) network_mutex->unlock();
     
     if (sent) {
         this->logger()->log("Result sent successfully");
         this->logger()->blinkTransportTxLED();
     }
     else {
         this->logger()->log("Result send FAILED");
     }
 }
 
 char *MQTTTransport::mapIOCResourceToEndpointResource(char *ioc_name) { return this->getMap()->iocNameToEndpointName(ioc_name); }
 
 char *MQTTTransport::makeID(char *id_template,char *buffer) {
     MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
     int instance_id = rand()%100;
     if (endpoint != NULL) instance_id = endpoint->getInstanceID();
     srand(time(0));
     srand(rand());
     sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE,instance_id);
     return buffer;
 }
 
 // is this message a PONG message?
 bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
     bool isPong = false;
     char verb[MQTT_PING_VERB_LEN+1];
     char end[MQTT_PING_VERB_LEN+1];
     int counter = 0;
     
     // clean
     memset(verb,0,MQTT_PING_VERB_LEN+1);
     memset(end,0,MQTT_PING_VERB_LEN+1);
         
     // make sure this is for us...
     char *topic_ep_name = this->getEndpointNameFromTopic(topic);
     if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) {
         // parse the payload
         for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
         sscanf(payload,"%s%d",verb,&counter);
         
         // check the contents to make sure its for us...
         //this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter);
         if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) {     
             // its a PONG message to our PING... 
             isPong = true;
         }
     }
     
     // return isPong status
     return isPong;
 }
 
 
 // handle a PONG: the payload arguments are not inspected here - receiving the
 // PONG just advances the ping counter, wrapping back to 1 at MQTT_MAX_COUNTER
 void MQTTTransport::processPongMessage(char *payload,int payload_length) {
     // DEBUG
     //this->logger()->log("Received PONG: counter=%d",this->m_ping_counter);
     
     this->m_ping_counter += 1;
     
     // still below the ceiling - nothing more to do
     if (this->m_ping_counter < MQTT_MAX_COUNTER) return;
     
     this->m_ping_counter = 1;
 }
 
 // send a PING message
 //
 // Publishes "ping:<endpoint name>:<ping counter>:" on MQTT_PING_SEND_TOPIC while
 // holding the network mutex (when one exists). Returns true when the publish
 // succeeded; in that case the TX LED is blinked and the call waits one second.
 // Returns false without publishing when no MQTT client has been bound yet
 // (checkAndProcess() may call this before connect()) or when the message does
 // not fit the buffer.
 //
 // NOTE(review): the name sent is m_endpoint_name, which this file only fills in
 // getEndpointNameFromTopic() - confirm it is initialized before the first PING.
 bool MQTTTransport::sendPingMessage() {
     // no client to publish through
     if (this->m_mqtt == NULL) return false;
     
     char message[MAX_MQTT_MESSAGE_LENGTH+1];
     
     // initialize...
     memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
     
     // build message (bounded; a result larger than the buffer means truncation)
     int needed = snprintf(message,MAX_MQTT_MESSAGE_LENGTH+1,"ping:%s:%d:",this->m_endpoint_name,this->m_ping_counter);
     if (needed < 0 || needed > MAX_MQTT_MESSAGE_LENGTH) return false;
     
     // send the message over the ping/pong topic
     //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter);
     if (network_mutex != NULL) network_mutex->lock();
     bool sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message));
     if (network_mutex != NULL) network_mutex->unlock();
     
     if (sent) {
         // send succeeded
         //this->logger()->log("PING %d sent successfully",this->m_ping_counter);
         this->logger()->blinkTransportTxLED();
         
         // wait for 1 second
         wait_ms(1000);
     }
     // a failed send is deliberately silent: no logging and no reconnect attempt
     
     // return our status
     return sent;
 }
 
 // storage for the client ID generated by connect(); file scope so that the ID
 // handed to the MQTT client stays valid after connect() returns
 static char _mqtt_id[MQTT_ENDPOINT_IDLEN+1];
 
 // connect up MQTT
 //
 // Binds the file-scope MQTT client, connects it under a freshly generated client
 // ID and subscribes to our own topic and to MQTT_IOC_ALL_TOPIC. m_connected is set
 // only when all three steps succeed; on any failure the LED is turned red.
 // Returns the resulting connection state (true right away if already connected).
 bool MQTTTransport::connect() {
     memset(_mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));
     //if (network_mutex != NULL) network_mutex->lock();
     
     if (this->m_connected == true) {
         this->logger()->log("MQTT already connected (OK)");
         return this->m_connected;
     }
     
     this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);
     
     // address of a file-scope object - can never be NULL, so no allocation check is needed
     this->m_mqtt = &_mqtt;
     
     char *id = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id);
     this->logger()->log("MQTT Connect: ID: %s...",id);
     if (!this->m_mqtt->connect(id)) {
         this->logger()->log("MQTT Connect: ID: %s FAILED",id);
         this->logger()->turnLEDRed();
         this->m_connected = false;
         return this->m_connected;
     }
     
     // both subscriptions are required; stop at the first one that fails
     this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
     bool subscribed = true;
     if (!this->m_mqtt->subscribe(this->getTopic())) {
         this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
         subscribed = false;
     }
     else if (!this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) {
         this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC);
         subscribed = false;
     }
     
     if (!subscribed) {
         this->logger()->turnLEDRed();
         // do not leave a half-set-up session open while we report "not connected",
         // so that a later connect() starts from a clean client
         this->m_mqtt->disconnect();
         this->m_connected = false;
         return this->m_connected;
     }
     
     this->logger()->log("MQTT CONNECTED.");
     this->m_connected = true;
     
     //if (network_mutex != NULL) network_mutex->unlock();
     return this->m_connected;
 }
 
 // tear down the MQTT session: unsubscribe from our topic (under the network
 // mutex when one exists), disconnect the client and mark the transport as not
 // connected. Always returns true.
 bool MQTTTransport::disconnect() {
     if (this->m_mqtt == NULL) {
         // no client was ever bound - nothing to tear down
         this->logger()->log("MQTT already disconnected (OK)");
     }
     else {
         char *topic = this->getTopic();
         this->logger()->log("MQTT Unsubscribing from: %s...",topic);
         
         if (network_mutex != NULL) network_mutex->lock();
         this->m_mqtt->unsubscribe(topic);
         if (network_mutex != NULL) network_mutex->unlock();
         
         this->logger()->log("MQTT Disconnecting...");
         this->m_mqtt->disconnect();
     }
     
     this->m_connected = false;
     return true;
 }
 
 // periodic service routine: lets the MQTT client process incoming traffic and
 // sends a PING each time the countdown runs out
 void MQTTTransport::checkAndProcess() {
     // poll the client only once it is bound and connected
     const bool can_poll = (this->m_mqtt != NULL) && (this->m_connected == true);
     if (can_poll) {
         if (network_mutex != NULL) network_mutex->lock();
         bool still_connected = this->m_mqtt->loop();
         if (network_mutex != NULL) network_mutex->unlock();
         
         // no reconnect is attempted here when loop() reports false
         if (still_connected) this->logger()->blinkTransportRxLED();
     }
     
     // count down towards the next PING
     this->m_ping_countdown -= 1;
     if (this->m_ping_countdown > 0) return;
     
     // time for it: re-arm the countdown, then send
     //this->logger()->log("MQTT: Sending PING...");
     this->m_ping_countdown = MQTT_PING_COUNTDOWN;
     this->sendPingMessage();
 }