Doug Anson / endpoint_mqtt

Dependents:   mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTTransport.cpp Source File

MQTTTransport.cpp

00001 /* Copyright C2013 Doug Anson, MIT License
00002  *
00003  * Permission is hereby granted, free of charge, to any person obtaining a copy of this software
00004  * and associated documentation files the "Software", to deal in the Software without restriction,
00005  * including without limitation the rights to use, copy, modify, merge, publish, distribute,
00006  * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
00007  * furnished to do so, subject to the following conditions:
00008  *
00009  * The above copyright notice and this permission notice shall be included in all copies or
00010  * substantial portions of the Software.
00011  *
00012  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
00013  * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
00014  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
00015  * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
00016  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
00017  */
00018  
00019  #include "mbed.h"
00020  
00021  #include "MQTTTransport.h"
00022  
00023  // Endpoint Support
00024  #include "MBEDEndpoint.h"
00025  
00026  // EmulatedResourceFactory support
00027  #include "EmulatedResourceFactory.h"
00028  
00029  #ifdef NETWORK_MUTEX
00030  #include "rtos.h"
00031  extern Mutex *network_mutex;
00032  #endif
00033   
00034  // our transmitt instance 
00035  MQTTTransport *_mqtt_instance =  NULL;
00036 
00037  // MQTT callback to handle received messages
00038  void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
00039     char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
00040     char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
00041     
00042     memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
00043     memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
00044     memcpy(buffer,payload,length);
00045     strcpy(rcv_topic,topic);
00046     
00047     if (_mqtt_instance != NULL) {
00048         if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
00049             _mqtt_instance->processPongMessage(buffer,length);
00050         }
00051         else {
00052             memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
00053             memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
00054             memcpy(buffer,payload,length);
00055             strcpy(rcv_topic,topic);
00056             _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
00057         }
00058     }
00059  }
00060  
00061  // our MQTT client endpoint
00062  PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
00063   
00064  // default constructor
00065  MQTTTransport::MQTTTransport(Logger *logger,void *endpoint,MBEDToIOCResourceMap *map) : Transport(logger,endpoint) {
00066      this->m_mqtt = NULL;
00067      _mqtt_instance = this;
00068      this->m_map = map;
00069      this->m_ping_counter = 1;
00070      this->m_ping_countdown = MQTT_PING_COUNTDOWN;
00071      this->initTopic();
00072  }
00073  
00074  // default destructor
00075  MQTTTransport::~MQTTTransport() {
00076      this->disconnect();
00077  }
00078   
00079  // init our topic
00080  void MQTTTransport::initTopic() {
00081      MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
00082      char *endpoint_name = endpoint->getEndpointName();
00083      memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1);
00084      sprintf(this->m_topic,MQTT_IOC_TOPIC,endpoint_name);
00085  }
00086  
00087  // get our topic
00088  char *MQTTTransport::getTopic() { return this->m_topic; }
00089  
00090  // get the IOC <--> MBED resource map
00091  MBEDToIOCResourceMap *MQTTTransport::getMap() { return this->m_map; }
00092  
00093  // pull the endpoint name from the MQTT topic
00094  char *MQTTTransport::getEndpointNameFromTopic(char *topic) {
00095      if (topic != NULL) {
00096          memset(this->m_endpoint_name,0,PERSONALITY_NAME_LEN+1);
00097          char trash[MQTT_IOC_TOPIC_LEN+1];
00098          char ep[MQTT_IOC_TOPIC_LEN+1];
00099          memset(trash,0,MQTT_IOC_TOPIC_LEN+1);
00100          memset(ep,0,MQTT_IOC_TOPIC_LEN+1);
00101          bool done = false;
00102          int length = 0; if (topic != NULL) length = strlen(topic);
00103          for(int i=length-1;i>=0 && !done;--i) if (topic[i] == '/') { topic[i] = ' ' ; done = true; }
00104          sscanf(topic,"%s%s",trash,ep);
00105          //this->logger()->log("MQTT: Topic:[%s] trash:[%s] ep:[%s]",topic,trash,ep);
00106          if (strlen(ep) > 0) {
00107                if (strcmp(ep,MQTT_IOC_ALL_ENDPOINT) != 0) {
00108                    // just insert the name and let the parser determine if its for us or not...
00109                    strncpy(this->m_endpoint_name,ep,strlen(ep));
00110                }
00111                else {
00112                    // this is a broadcast message - so we need to process it
00113                    MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
00114                    char *endpoint_name = endpoint->getEndpointName();
00115                    strcpy(this->m_endpoint_name,endpoint_name);
00116                }
00117           }
00118           //this->logger()->log("MQTT Topic (discovered): %s  Original: %s",this->m_endpoint_name,topic);
00119           return this->m_endpoint_name;
00120        }
00121        //this->logger()->log("MQTT Topic (discovered): NULL  Original: %s",topic);
00122        return NULL;
00123  }
00124  
00125  // process a MQTT Message
00126  void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int payload_length) {
00127      char message_type[MQTT_PAYLOAD_SEGMENT_LEN+1];
00128      char message_verb[MQTT_PAYLOAD_SEGMENT_LEN+1];
00129      char message_value[MQTT_PAYLOAD_SEGMENT_LEN+1];
00130      char message_opt[MQTT_PAYLOAD_SEGMENT_LEN+1];
00131      
00132      // initialize
00133      memset(message_type,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
00134      memset(message_verb,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
00135      memset(message_value,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
00136      memset(message_opt,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
00137       
00138      // get our endpoint
00139      MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
00140      char *endpoint_name = endpoint->getEndpointName();
00141      
00142      // DEBUG
00143      //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name);
00144 
00145      // only respond if its for our node
00146      if (strcmp(endpoint_name,message_name) == 0) {                  
00147          // format of the MQTT message:   message_type:verb|Parameter_X:value|keyword:optional_data  
00148          char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
00149          memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); 
00150          memcpy(buffer,payload,payload_length); 
00151          int count = 0; for(int i=0;i<payload_length;++i) if (payload[i] == ':') ++count;
00152          for(int i=0;i<payload_length;++i) {
00153              if (buffer[i] == ':') {
00154                  if (i < (payload_length-1)) buffer[i] = ' ';
00155                  else buffer[i] = '\0';
00156              }
00157          }     
00158          if (count == 1) sscanf(buffer,"%s%s",message_type,message_verb);
00159          if (count == 2) sscanf(buffer,"%s%s%s",message_type,message_verb,message_value);
00160          if (count == 3) sscanf(buffer,"%s%s%s%s",message_type,message_verb,message_value,message_opt);
00161                         
00162          // DEBUG
00163          //this->logger()->log("Raw Payload: %s, length: %d",payload,payload_length);
00164          //this->logger()->log("Buffer: %s, length: %d",buffer,strlen(buffer));
00165          //this->logger()->log("Parsed Payload: Type: [%s] Name: [%s] Verb: [%s] Value: [%s]",message_type,message_name,message_verb,message_value);
00166                           
00167          // load endpoints
00168          if (message_type != NULL && strcmp(message_type,IOC_ENDPOINT_VERB) == 0) {                 // Endpoint
00169              if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_LOAD_ALL_VERB) == 0) {     // load
00170                  if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) {   // all
00171                     // load up our endpoints
00172                     endpoint->loadPersonalities();
00173                     endpoint->updatePersonalities();
00174                  }
00175                  else if (message_value != NULL && strcmp(message_value,this->m_endpoint_name) == 0) {
00176                     // load up our endpoints (us only)
00177                     endpoint->loadPersonalities();
00178                     endpoint->updatePersonalities();
00179                  }
00180              }
00181              
00182              else if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) {  // update
00183                  if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) {       // all
00184                      // update our endpoints
00185                      endpoint->updatePersonalities();
00186                  }
00187                  else {
00188                      // update just our endpoint
00189                      int index = -1;
00190                      if (message_name != NULL) {
00191                          index = endpoint->indexOfPersonality((char *)message_name);
00192                          if (index >= 0) {
00193                              if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) {
00194                                 // update our endpoint
00195                                 endpoint->updatePersonality(index);
00196                              }
00197                          }
00198                      }
00199                  }
00200              }
00201          }
00202          
00203          // change a resource value
00204          if (message_type != NULL && strcmp(message_type,IOC_CHANGE_VERB) == 0) {
00205               if (message_name != NULL) {
00206                  // destined for our lights?
00207                  int index = endpoint->indexOfPersonality((char *)message_name);
00208                  if (index >= 0) {
00209                      if (message_verb != NULL) {
00210                          // map the parameter to one of ours
00211                          char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
00212                          if (mapped_resource != NULL) {
00213                              if (message_value != NULL) {
00214                                  EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
00215                                  bool success = factory->setResourceValue(mapped_resource,message_value);
00216                                  
00217                                  // end the resource value back over MQTT
00218                                  this->sendResult(message_name,message_verb,message_value,success);
00219                              }
00220                          }
00221                          
00222                          // for Dimming, we also want to refresh our record (in whole) at the IOC
00223                          if (this->isDimmingResource(message_verb)) {
00224                              // send a fresh update to the IOC - just us...
00225                              int index = endpoint->indexOfPersonality((char *)message_name);
00226                              if (index >= 0) endpoint->updatePersonality(index);
00227                          }
00228                      }
00229                  }
00230              }
00231          }
00232          
00233          // get a resource value
00234          if (message_type != NULL && strcmp(message_type,IOC_REQUEST_VALUE_VERB) == 0) {
00235              if (message_name != NULL) {
00236                  // destined for our lights?
00237                  int index = endpoint->indexOfPersonality((char *)message_name);
00238                  if (index >= 0) {
00239                      if (message_verb != NULL) {
00240                          // map the parameter to one of ours
00241                          char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
00242                          if (mapped_resource != NULL) {
00243                              EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
00244                              strcpy(message_value,factory->getResourceValue((char *)mapped_resource));
00245                              bool success = false; if (message_value != NULL) success = true;
00246                              
00247                              // log resource get
00248                              if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value);
00249                              
00250                              // end the resource value back over MQTT
00251                              this->sendResult(message_name,message_verb,message_value,success);
00252                          }
00253                      }
00254                  }
00255              }
00256          }
00257      }
00258      else {
00259          // message not bound for our node
00260          //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
00261          ;
00262      }     
00263  }
00264  
00265  // is this the dimming resource?
00266  bool MQTTTransport::isDimmingResource(char *resource) {
00267      bool isDimming = false;
00268   
00269      if (resource != NULL && strcmp(resource,"Dimming") == 0) isDimming = true;
00270         
00271      return isDimming;
00272  }
00273  
00274  // send result back to MQTT
00275  void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) {
00276      if (this->m_connected == true) {
00277          // send the response back to MQTT
00278          this->logger()->log("Sending Response back to MQTT...");
00279          char message[MAX_MQTT_MESSAGE_LENGTH+1];
00280          memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
00281          char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED;
00282          sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success);
00283  #ifdef NETWORK_MUTEX
00284          if (network_mutex != NULL) network_mutex->lock();
00285  #endif
00286          bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
00287  #ifdef NETWORK_MUTEX
00288          if (network_mutex != NULL) network_mutex->unlock();
00289  #endif
00290          if (sent) {
00291              this->logger()->log("Result sent successfully");
00292              this->logger()->blinkTransportTxLED();
00293          }
00294          else {
00295              this->logger()->log("Result send FAILED");
00296          }
00297      }
00298      else {
00299          // unable to send the response 
00300          this->logger()->log("Unable to send response back to MQTT. Not connected.");
00301      }
00302  }
00303  
00304  char *MQTTTransport::mapIOCResourceToEndpointResource(char *ioc_name) { return this->getMap()->iocNameToEndpointName(ioc_name); }
00305  
00306  char *MQTTTransport::makeID(char *id_template,char *buffer) {
00307      MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
00308      int instance_id = rand()%100;
00309      if (endpoint != NULL) instance_id = endpoint->getInstanceID();
00310      srand(time(0));
00311      srand(rand());
00312      sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE,instance_id);
00313      return buffer;
00314  }
00315  
00316  // is this message a PONG message?
00317  bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
00318      bool isPong = false;
00319      char verb[MQTT_PING_VERB_LEN+1];
00320      char end[MQTT_PING_VERB_LEN+1];
00321      int counter = 0;
00322      
00323      // clean
00324      memset(verb,0,MQTT_PING_VERB_LEN+1);
00325      memset(end,0,MQTT_PING_VERB_LEN+1);
00326          
00327      // make sure this is for us...
00328      char *topic_ep_name = this->getEndpointNameFromTopic(topic);
00329      if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) {
00330          // parse the payload
00331          for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
00332          sscanf(payload,"%s%d",verb,&counter);
00333          
00334          // check the contents to make sure its for us...
00335          //this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter);
00336          if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) {     
00337              // its a PONG message to our PING... 
00338              isPong = true;
00339          }
00340      }
00341      
00342      // return isPong status
00343      return isPong;
00344  }
00345  
00346  
00347  // process this PONG message
00348  void MQTTTransport::processPongMessage(char *payload,int payload_length) {
00349      // DEBUG
00350      //this->logger()->log("Received PONG: counter=%d",this->m_ping_counter);
00351              
00352      // simply increment the counter
00353      ++this->m_ping_counter;
00354      
00355      // reset counter if maxed 
00356      if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1;
00357  }
00358  
00359  // send a PING message
00360  bool MQTTTransport::sendPingMessage() {
00361      bool sent = false;
00362      char message[MAX_MQTT_MESSAGE_LENGTH+1];
00363      
00364      // initialize...
00365      memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
00366      
00367      // build message
00368      sprintf(message,"ping:%s:%d:",this->m_endpoint_name,this->m_ping_counter);
00369      
00370      // send the message over the ping/pong topic
00371      //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter);
00372  #ifdef NETWORK_MUTEX
00373      if (network_mutex != NULL) network_mutex->lock();
00374  #endif
00375      sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message));
00376  #ifdef NETWORK_MUTEX
00377      if (network_mutex != NULL) network_mutex->unlock();
00378  #endif
00379      if (sent) {
00380          // send succeeded
00381          //this->logger()->log("PING %d sent successfully",this->m_ping_counter);
00382          this->logger()->blinkTransportTxLED();
00383          
00384          // wait for 1 second
00385          Thread::wait(1000);
00386      }
00387      else {
00388          // fail silently now...
00389          ; 
00390          
00391          // send failed! - reconnect
00392          //this->logger()->log("PING send %d FAILED... (re)connecting...",this->m_ping_counter);
00393          
00394          // attempt reconnect
00395          //this->logger()->log("PING send failed - re-connecting MQTT...");
00396          //this->disconnect();
00397          //sent = this->connect();
00398          //if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter);
00399          //else this->logger()->log("PING %d: resend failed giving up...",this->m_ping_counter);
00400      }
00401      
00402      // return our status
00403      return sent;
00404  }
00405  
00406  static char _mqtt_id[MQTT_ENDPOINT_IDLEN+1];
00407  
00408  // connect up MQTT
00409  bool MQTTTransport::connect() {
00410      memset(_mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));
00411      if (this->m_connected == false) {
00412          this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);
00413          this->m_mqtt = &_mqtt;
00414          if (this->m_mqtt != NULL) {
00415              char *id = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id);
00416              this->logger()->log("MQTT Connect: ID: %s...",id);
00417              if (this->m_mqtt->connect(id)) {
00418                  this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
00419                  if (this->m_mqtt->subscribe(this->getTopic())) {
00420                     if (this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) {
00421                         this->logger()->log("MQTT CONNECTED.");
00422                         this->m_connected = true;
00423                     }
00424                     else {
00425                         this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC);
00426                         this->logger()->turnLEDRed();
00427                         this->m_connected = false;
00428                     }
00429                  }
00430                  else {
00431                      this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
00432                      this->logger()->turnLEDRed();
00433                      this->m_connected = false;
00434                  }
00435              }
00436              else {
00437                  this->logger()->log("MQTT Connect: ID: %s FAILED",id);
00438                  this->logger()->turnLEDRed();
00439                  this->m_connected = false;
00440              }
00441          }
00442          else {
00443              this->logger()->log("MQTT Unable to allocate new instance");
00444              this->logger()->turnLEDRed();
00445              this->m_connected = false;
00446          }
00447      }
00448      else {
00449          this->logger()->log("MQTT already connected (OK)");
00450      }
00451      return this->m_connected;
00452  }
00453  
00454  // disconnect from MQTT
00455  bool MQTTTransport::disconnect() {
00456      if (this->m_mqtt != NULL) {
00457          this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
00458  #ifdef NETWORK_MUTEX
00459          if (network_mutex != NULL) network_mutex->lock();
00460  #endif
00461          this->m_mqtt->unsubscribe(this->getTopic());
00462  #ifdef NETWORK_MUTEX
00463          if (network_mutex != NULL) network_mutex->unlock();
00464  #endif
00465          this->logger()->log("MQTT Disconnecting...");
00466          this->m_mqtt->disconnect();
00467      }
00468      else {
00469          this->logger()->log("MQTT already disconnected (OK)");
00470      }
00471      this->m_connected = false;
00472      return true;
00473  }
00474  
00475  // check transport and process stuff
00476  void MQTTTransport::checkAndProcess() {
00477      // process any MQTT messages
00478      if (this->m_mqtt != NULL && this->m_connected == true) {
00479  #ifdef NETWORK_MUTEX
00480          if (network_mutex != NULL) network_mutex->lock();
00481  #endif
00482          bool connected = this->m_mqtt->loop();
00483  #ifdef NETWORK_MUTEX
00484          if (network_mutex != NULL) network_mutex->unlock();
00485  #endif
00486          if (connected) {
00487             this->logger()->blinkTransportRxLED();
00488          }
00489          //else {
00490          //   this->logger()->log("Attempting reconnection to MQTT in checkAndProcess...");
00491          //   this->disconnect();
00492          //   Thread::wait(15000);
00493          //   this->connect();
00494          //}
00495      }
00496      
00497      // send a PING if time for it
00498      --this->m_ping_countdown;
00499      if (this->m_ping_countdown <= 0) {
00500          //this->logger()->log("MQTT: Sending PING...");
00501          this->m_ping_countdown = MQTT_PING_COUNTDOWN;
00502          this->sendPingMessage();
00503      }
00504  }