mqtt specific components for the impact mbed endpoint library

Dependents:   mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp

Revision:
0:a3fc1c6ef150
Child:
7:8a4a61202b36
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTTransport.cpp	Wed Mar 26 19:48:35 2014 +0000
@@ -0,0 +1,468 @@
+/* Copyright C2013 Doug Anson, MIT License
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software
+ * and associated documentation files the "Software", to deal in the Software without restriction,
+ * including without limitation the rights to use, copy, modify, merge, publish, distribute,
+ * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all copies or
+ * substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
+ * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+ 
+ #include "mbed.h"
+ #include "rtos.h"
+ 
+ #include "MQTTTransport.h"
+ 
+ // Endpoint Support
+ #include "MBEDEndpoint.h"
+ 
+ // EmulatedResourceFactory support
+ #include "EmulatedResourceFactory.h"
+ 
+ // Network mutex
+ extern Mutex *network_mutex;
+  
+ // our transmitt instance 
+ MQTTTransport *_mqtt_instance =  NULL;
+
+ // MQTT callback to handle received messages
+ void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
+    char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
+    char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
+    
+    memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
+    memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
+    memcpy(buffer,payload,length);
+    strcpy(rcv_topic,topic);
+    
+    if (_mqtt_instance != NULL) {
+        if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
+            _mqtt_instance->processPongMessage(buffer,length);
+        }
+        else {
+            memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
+            memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
+            memcpy(buffer,payload,length);
+            strcpy(rcv_topic,topic);
+            _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
+        }
+    }
+ }
+ 
+ // our MQTT client endpoint
+ PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
+  
+ // default constructor
+ MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) {
+     this->m_mqtt = NULL;
+     _mqtt_instance = this;
+     this->m_map = map;
+     this->m_ping_counter = 1;
+     this->m_ping_countdown = MQTT_PING_COUNTDOWN;
+     this->initTopic();
+ }
+ 
+ // default destructor
+ MQTTTransport::~MQTTTransport() {
+     this->disconnect();
+ }
+  
+ // init our topic
+ void MQTTTransport::initTopic() {
+     MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
+     char *endpoint_name = endpoint->getEndpointName();
+     memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1);
+     sprintf(this->m_topic,MQTT_IOC_TOPIC,endpoint_name);
+ }
+ 
+ // get our topic
+ char *MQTTTransport::getTopic() { return this->m_topic; }
+ 
+ // get the IOC <--> MBED resource map
+ MBEDToIOCResourceMap *MQTTTransport::getMap() { return this->m_map; }
+ 
+ // pull the endpoint name from the MQTT topic
+ char *MQTTTransport::getEndpointNameFromTopic(char *topic) {
+     if (topic != NULL) {
+         memset(this->m_endpoint_name,0,LIGHT_NAME_LEN+1);
+         char trash[MQTT_IOC_TOPIC_LEN+1];
+         char ep[MQTT_IOC_TOPIC_LEN+1];
+         memset(trash,0,MQTT_IOC_TOPIC_LEN+1);
+         memset(ep,0,MQTT_IOC_TOPIC_LEN+1);
+         bool done = false;
+         int length = 0; if (topic != NULL) length = strlen(topic);
+         for(int i=length-1;i>=0 && !done;--i) if (topic[i] == '/') { topic[i] = ' ' ; done = true; }
+         sscanf(topic,"%s%s",trash,ep);
+         //this->logger()->log("MQTT: Topic:[%s] trash:[%s] ep:[%s]",topic,trash,ep);
+         if (strlen(ep) > 0) {
+               if (strcmp(ep,MQTT_IOC_ALL_ENDPOINT) != 0) {
+                   // just insert the name and let the parser determine if its for us or not...
+                   strncpy(this->m_endpoint_name,ep,strlen(ep));
+               }
+               else {
+                   // this is a broadcast message - so we need to process it
+                   MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
+                   char *endpoint_name = endpoint->getEndpointName();
+                   strcpy(this->m_endpoint_name,endpoint_name);
+               }
+          }
+          //this->logger()->log("MQTT Topic (discovered): %s  Original: %s",this->m_endpoint_name,topic);
+          return this->m_endpoint_name;
+       }
+       //this->logger()->log("MQTT Topic (discovered): NULL  Original: %s",topic);
+       return NULL;
+ }
+ 
+ // process a MQTT Message
+ void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int payload_length) {
+     char message_type[MQTT_PAYLOAD_SEGMENT_LEN+1];
+     char message_verb[MQTT_PAYLOAD_SEGMENT_LEN+1];
+     char message_value[MQTT_PAYLOAD_SEGMENT_LEN+1];
+     char message_opt[MQTT_PAYLOAD_SEGMENT_LEN+1];
+     
+     // initialize
+     memset(message_type,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
+     memset(message_verb,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
+     memset(message_value,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
+     memset(message_opt,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
+      
+     // get our endpoint
+     MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
+     char *endpoint_name = endpoint->getEndpointName();
+     
+     // DEBUG
+     //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name);
+
+     // only respond if its for our node
+     if (strcmp(endpoint_name,message_name) == 0) {                  
+         // format of the MQTT message:   message_type:verb|Parameter_X:value|keyword:optional_data  
+         char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
+         memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); 
+         memcpy(buffer,payload,payload_length); 
+         int count = 0; for(int i=0;i<payload_length;++i) if (payload[i] == ':') ++count;
+         for(int i=0;i<payload_length;++i) {
+             if (buffer[i] == ':') {
+                 if (i < (payload_length-1)) buffer[i] = ' ';
+                 else buffer[i] = '\0';
+             }
+         }     
+         if (count == 1) sscanf(buffer,"%s%s",message_type,message_verb);
+         if (count == 2) sscanf(buffer,"%s%s%s",message_type,message_verb,message_value);
+         if (count == 3) sscanf(buffer,"%s%s%s%s",message_type,message_verb,message_value,message_opt);
+                        
+         // DEBUG
+         //this->logger()->log("Raw Payload: %s, length: %d",payload,payload_length);
+         //this->logger()->log("Buffer: %s, length: %d",buffer,strlen(buffer));
+         //this->logger()->log("Parsed Payload: Type: [%s] Name: [%s] Verb: [%s] Value: [%s]",message_type,message_name,message_verb,message_value);
+                          
+         // load endpoints
+         if (message_type != NULL && strcmp(message_type,IOC_ENDPOINT_VERB) == 0) {                 // Endpoint
+             if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_LOAD_ALL_VERB) == 0) {     // load
+                 if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) {   // all
+                    // load up our endpoints
+                    endpoint->loadEndpoints();
+                    endpoint->updateEndpoints();
+                 }
+                 else if (message_value != NULL && strcmp(message_value,this->m_endpoint_name) == 0) {
+                    // load up our endpoints (us only)
+                    endpoint->loadEndpoints();
+                    endpoint->updateEndpoints();
+                 }
+             }
+             
+             else if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) {  // update
+                 if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) {       // all
+                     // update our endpoints
+                     endpoint->updateEndpoints();
+                 }
+                 else {
+                     // update just our endpoint
+                     int index = -1;
+                     if (message_name != NULL) {
+                         index = endpoint->indexOfLight((char *)message_name);
+                         if (index >= 0) {
+                             if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) {
+                                // update our endpoint
+                                endpoint->updateEndpoints(index);
+                             }
+                         }
+                     }
+                 }
+             }
+         }
+         
+         // change a resource value
+         if (message_type != NULL && strcmp(message_type,IOC_CHANGE_VERB) == 0) {
+              if (message_name != NULL) {
+                 // destined for our lights?
+                 int index = endpoint->indexOfLight((char *)message_name);
+                 if (index >= 0) {
+                     if (message_verb != NULL) {
+                         // map the parameter to one of ours
+                         char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
+                         if (mapped_resource != NULL) {
+                             if (message_value != NULL) {
+                                 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
+                                 bool success = factory->setResourceValue(mapped_resource,message_value);
+                                 
+                                 // end the resource value back over MQTT
+                                 this->sendResult(message_name,message_verb,message_value,success);
+                             }
+                         }
+                     }
+                 }
+             }
+         }
+         
+         // get a resource value
+         if (message_type != NULL && strcmp(message_type,IOC_REQUEST_VALUE_VERB) == 0) {
+             if (message_name != NULL) {
+                 // destined for our lights?
+                 int index = endpoint->indexOfLight((char *)message_name);
+                 if (index >= 0) {
+                     if (message_verb != NULL) {
+                         // map the parameter to one of ours
+                         char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
+                         if (mapped_resource != NULL) {
+                             EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
+                             strcpy(message_value,factory->getResourceValue((char *)mapped_resource));
+                             bool success = false; if (message_value != NULL) success = true;
+                             
+                             // log resource get
+                             if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value);
+                             
+                             // end the resource value back over MQTT
+                             this->sendResult(message_name,message_verb,message_value,success);
+                         }
+                     }
+                 }
+             }
+         }
+     }
+     else {
+         // message not bound for our node
+         //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
+         ;
+     }     
+ }
+ 
+ // send result back to MQTT
+ void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) {
+     if (this->m_connected == true) {
+         // send the response back to MQTT
+         this->logger()->log("Sending Response back to MQTT...");
+         char message[MAX_MQTT_MESSAGE_LENGTH+1];
+         memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
+         char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED;
+         sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success);
+         if (network_mutex != NULL) network_mutex->lock();
+         bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
+         if (network_mutex != NULL) network_mutex->unlock();
+         if (sent) {
+             this->logger()->log("Result sent successfully");
+             this->logger()->blinkTransportTxLED();
+         }
+         else {
+             this->logger()->log("Result send FAILED");
+         }
+     }
+     else {
+         // unable to send the response 
+         this->logger()->log("Unable to send response back to MQTT. Not connected.");
+     }
+ }
+ 
+ char *MQTTTransport::mapIOCResourceToEndpointResource(char *ioc_name) { return this->getMap()->iocNameToEndpointName(ioc_name); }
+ 
+ char *MQTTTransport::makeID(char *id_template,char *buffer) {
+     MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
+     int instance_id = rand()%100;
+     if (endpoint != NULL) instance_id = endpoint->getInstanceID();
+     srand(time(0));
+     srand(rand());
+     sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE,instance_id);
+     return buffer;
+ }
+ 
+ // is this message a PONG message?
+ bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
+     bool isPong = false;
+     char verb[MQTT_PING_VERB_LEN+1];
+     char end[MQTT_PING_VERB_LEN+1];
+     int counter = 0;
+     
+     // clean
+     memset(verb,0,MQTT_PING_VERB_LEN+1);
+     memset(end,0,MQTT_PING_VERB_LEN+1);
+         
+     // make sure this is for us...
+     char *topic_ep_name = this->getEndpointNameFromTopic(topic);
+     if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) {
+         // parse the payload
+         for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
+         sscanf(payload,"%s%d",verb,&counter);
+         
+         // check the contents to make sure its for us...
+         //this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter);
+         if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) {     
+             // its a PONG message to our PING... 
+             isPong = true;
+         }
+     }
+     
+     // return isPong status
+     return isPong;
+ }
+ 
+ 
+ // process this PONG message
+ void MQTTTransport::processPongMessage(char *payload,int payload_length) {
+     // DEBUG
+     //this->logger()->log("Received PONG: counter=%d",this->m_ping_counter);
+             
+     // simply increment the counter
+     ++this->m_ping_counter;
+     
+     // reset counter if maxed 
+     if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1;
+ }
+ 
+ // send a PING message
+ bool MQTTTransport::sendPingMessage() {
+     bool sent = false;
+     char message[MAX_MQTT_MESSAGE_LENGTH+1];
+     
+     // initialize...
+     memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
+     
+     // build message
+     sprintf(message,"ping:%s:%d:",this->m_endpoint_name,this->m_ping_counter);
+     
+     // send the message over the ping/pong topic
+     //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter);
+     if (network_mutex != NULL) network_mutex->lock();
+     sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message));
+     if (network_mutex != NULL) network_mutex->unlock();
+     if (sent) {
+         // send succeeded
+         //this->logger()->log("PING %d sent successfully",this->m_ping_counter);
+         this->logger()->blinkTransportTxLED();
+         
+         // wait for 1 second
+         wait_ms(1000);
+     }
+     else {
+         // send failed! - reconnect
+         this->logger()->log("PING send %d FAILED... (re)connecting...",this->m_ping_counter);
+         
+         // attempt reconnect
+         this->logger()->log("PING send failed - re-connecting MQTT...");
+         this->disconnect();
+         sent = this->connect();
+         if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter);
+         else this->logger()->log("PING %d: resend failed giving up...",this->m_ping_counter);
+     }
+     
+     // return our status
+     return sent;
+ }
+ 
+ // connect up MQTT
+ bool MQTTTransport::connect() {
+     if (network_mutex != NULL) network_mutex->lock();
+     char mqtt_id[MQTT_ENDPOINT_IDLEN+1];
+     memset(mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));
+     if (this->m_connected == false) {
+         this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);
+         this->m_mqtt = &_mqtt;
+         if (this->m_mqtt != NULL) {
+             char *id = this->makeID(MQTT_ENDPOINT_ID,mqtt_id);
+             this->logger()->log("MQTT Connect: ID: %s...",id);
+             if (this->m_mqtt->connect(id)) {
+                 this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
+                 if (this->m_mqtt->subscribe(this->getTopic())) {
+                    if (this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) {
+                        this->logger()->log("MQTT CONNECTED.");
+                        this->m_connected = true;
+                    }
+                    else {
+                        this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC);
+                        this->logger()->turnLEDRed();
+                        this->m_connected = false;
+                    }
+                 }
+                 else {
+                     this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
+                     this->logger()->turnLEDRed();
+                     this->m_connected = false;
+                 }
+             }
+             else {
+                 this->logger()->log("MQTT Connect: ID: %s FAILED",id);
+                 this->logger()->turnLEDRed();
+                 this->m_connected = false;
+             }
+         }
+         else {
+             this->logger()->log("MQTT Unable to allocate new instance");
+             this->logger()->turnLEDRed();
+             this->m_connected = false;
+         }
+     }
+     else {
+         this->logger()->log("MQTT already connected (OK)");
+     }
+     if (network_mutex != NULL) network_mutex->unlock();
+     return this->m_connected;
+ }
+ 
+ // disconnect from MQTT
+ bool MQTTTransport::disconnect() {
+     if (this->m_mqtt != NULL) {
+         this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
+         if (network_mutex != NULL) network_mutex->lock();
+         this->m_mqtt->unsubscribe(this->getTopic());
+         if (network_mutex != NULL) network_mutex->unlock();
+         this->logger()->log("MQTT Disconnecting...");
+         this->m_mqtt->disconnect();
+     }
+     else {
+         this->logger()->log("MQTT already disconnected (OK)");
+     }
+     this->m_connected = false;
+     return true;
+ }
+ 
+ // check transport and process stuff
+ void MQTTTransport::checkAndProcess() {
+     // process any MQTT messages
+     if (this->m_mqtt != NULL && this->m_connected == true) {
+         if (network_mutex != NULL) network_mutex->lock();
+         bool connected = this->m_mqtt->loop();
+         if (network_mutex != NULL) network_mutex->unlock();
+         if (connected) {
+            this->logger()->blinkTransportRxLED();
+         }
+         else {
+            this->logger()->log("Attempting reconnection to MQTT in checkAndProcess...");
+            this->disconnect();
+            this->connect();
+         }
+     }
+     
+     // send a PING if time for it
+     --this->m_ping_countdown;
+     if (this->m_ping_countdown <= 0) {
+         //this->logger()->log("MQTT: Sending PING...");
+         this->m_ping_countdown = MQTT_PING_COUNTDOWN;
+         this->sendPingMessage();
+     }
+ }
\ No newline at end of file