MQTT-specific components for the impact mbed endpoint library
Dependents: mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp
MQTTTransport.cpp
- Committer:
- ansond
- Date:
- 2014-07-01
- Revision:
- 42:297585f8e7bd
- Parent:
- 32:9a024a6af2fb
- Child:
- 43:14ccc830d6a6
File content as of revision 42:297585f8e7bd:
/* Copyright C2013 Doug Anson, MIT License * * Permission is hereby granted, free of charge, to any person obtaining a copy of this software * and associated documentation files the "Software", to deal in the Software without restriction, * including without limitation the rights to use, copy, modify, merge, publish, distribute, * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all copies or * substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
*/ #include "mbed.h" #ifdef NETWORK_MUTEX #include "rtos.h" extern Mutex *network_mutex; #endif #include "MQTTTransport.h" // Endpoint Support #include "MBEDEndpoint.h" // EmulatedResourceFactory support #include "EmulatedResourceFactory.h" // our transmitt instance MQTTTransport *_mqtt_instance = NULL; // MQTT callback to handle received messages void _mqtt_message_handler(char *topic,char *payload,unsigned int length) { char buffer[MAX_MQTT_MESSAGE_LENGTH+1]; char rcv_topic[MQTT_IOC_TOPIC_LEN+1]; memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1); memcpy(buffer,payload,length); strcpy(rcv_topic,topic); if (_mqtt_instance != NULL) { if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) { _mqtt_instance->processPongMessage(buffer,length); } else { memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1); memcpy(buffer,payload,length); strcpy(rcv_topic,topic); _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length); } } } // our MQTT client endpoint PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler); // default constructor MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) { this->m_mqtt = NULL; _mqtt_instance = this; this->m_map = map; this->m_ping_counter = 1; this->m_ping_countdown = MQTT_PING_COUNTDOWN; this->initTopic(); } // default destructor MQTTTransport::~MQTTTransport() { this->disconnect(); } // init our topic void MQTTTransport::initTopic() { MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); char *endpoint_name = endpoint->getEndpointName(); memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1); sprintf(this->m_topic,MQTT_IOC_TOPIC,endpoint_name); } // get our topic char *MQTTTransport::getTopic() { return this->m_topic; } // get the IOC <--> MBED resource map MBEDToIOCResourceMap *MQTTTransport::getMap() { return this->m_map; } // pull the 
endpoint name from the MQTT topic char *MQTTTransport::getEndpointNameFromTopic(char *topic) { if (topic != NULL) { memset(this->m_endpoint_name,0,PERSONALITY_NAME_LEN+1); char trash[MQTT_IOC_TOPIC_LEN+1]; char ep[MQTT_IOC_TOPIC_LEN+1]; memset(trash,0,MQTT_IOC_TOPIC_LEN+1); memset(ep,0,MQTT_IOC_TOPIC_LEN+1); bool done = false; int length = 0; if (topic != NULL) length = strlen(topic); for(int i=length-1;i>=0 && !done;--i) if (topic[i] == '/') { topic[i] = ' ' ; done = true; } sscanf(topic,"%s%s",trash,ep); //this->logger()->log("MQTT: Topic:[%s] trash:[%s] ep:[%s]",topic,trash,ep); if (strlen(ep) > 0) { if (strcmp(ep,MQTT_IOC_ALL_ENDPOINT) != 0) { // just insert the name and let the parser determine if its for us or not... strncpy(this->m_endpoint_name,ep,strlen(ep)); } else { // this is a broadcast message - so we need to process it MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); char *endpoint_name = endpoint->getEndpointName(); strcpy(this->m_endpoint_name,endpoint_name); } } //this->logger()->log("MQTT Topic (discovered): %s Original: %s",this->m_endpoint_name,topic); return this->m_endpoint_name; } //this->logger()->log("MQTT Topic (discovered): NULL Original: %s",topic); return NULL; } // process a MQTT Message void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int payload_length) { char message_type[MQTT_PAYLOAD_SEGMENT_LEN+1]; char message_verb[MQTT_PAYLOAD_SEGMENT_LEN+1]; char message_value[MQTT_PAYLOAD_SEGMENT_LEN+1]; char message_opt[MQTT_PAYLOAD_SEGMENT_LEN+1]; // initialize memset(message_type,0,MQTT_PAYLOAD_SEGMENT_LEN+1); memset(message_verb,0,MQTT_PAYLOAD_SEGMENT_LEN+1); memset(message_value,0,MQTT_PAYLOAD_SEGMENT_LEN+1); memset(message_opt,0,MQTT_PAYLOAD_SEGMENT_LEN+1); // get our endpoint MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); char *endpoint_name = endpoint->getEndpointName(); // DEBUG //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name); // only respond if 
its for our node if (strcmp(endpoint_name,message_name) == 0) { // format of the MQTT message: message_type:verb|Parameter_X:value|keyword:optional_data char buffer[MAX_MQTT_MESSAGE_LENGTH+1]; memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); memcpy(buffer,payload,payload_length); int count = 0; for(int i=0;i<payload_length;++i) if (payload[i] == ':') ++count; for(int i=0;i<payload_length;++i) { if (buffer[i] == ':') { if (i < (payload_length-1)) buffer[i] = ' '; else buffer[i] = '\0'; } } if (count == 1) sscanf(buffer,"%s%s",message_type,message_verb); if (count == 2) sscanf(buffer,"%s%s%s",message_type,message_verb,message_value); if (count == 3) sscanf(buffer,"%s%s%s%s",message_type,message_verb,message_value,message_opt); // DEBUG //this->logger()->log("Raw Payload: %s, length: %d",payload,payload_length); //this->logger()->log("Buffer: %s, length: %d",buffer,strlen(buffer)); //this->logger()->log("Parsed Payload: Type: [%s] Name: [%s] Verb: [%s] Value: [%s]",message_type,message_name,message_verb,message_value); // load endpoints if (message_type != NULL && strcmp(message_type,IOC_ENDPOINT_VERB) == 0) { // Endpoint if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_LOAD_ALL_VERB) == 0) { // load if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all // load up our endpoints endpoint->loadPersonalities(); endpoint->updatePersonalities(); } else if (message_value != NULL && strcmp(message_value,this->m_endpoint_name) == 0) { // load up our endpoints (us only) endpoint->loadPersonalities(); endpoint->updatePersonalities(); } } else if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) { // update if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all // update our endpoints endpoint->updatePersonalities(); } else { // update just our endpoint int index = -1; if (message_name != NULL) { index = endpoint->indexOfPersonality((char *)message_name); if (index >= 0) { if 
(message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) { // update our endpoint endpoint->updatePersonality(index); } } } } } } // change a resource value if (message_type != NULL && strcmp(message_type,IOC_CHANGE_VERB) == 0) { if (message_name != NULL) { // destined for our lights? int index = endpoint->indexOfPersonality((char *)message_name); if (index >= 0) { if (message_verb != NULL) { // map the parameter to one of ours char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb); if (mapped_resource != NULL) { if (message_value != NULL) { EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index); bool success = factory->setResourceValue(mapped_resource,message_value); // end the resource value back over MQTT this->sendResult(message_name,message_verb,message_value,success); } } // for Dimming, we also want to refresh our record (in whole) at the IOC if (this->isDimmingResource(message_verb)) { // send a fresh update to the IOC - just us... int index = endpoint->indexOfPersonality((char *)message_name); if (index >= 0) endpoint->updatePersonality(index); } } } } } // get a resource value if (message_type != NULL && strcmp(message_type,IOC_REQUEST_VALUE_VERB) == 0) { if (message_name != NULL) { // destined for our lights? 
int index = endpoint->indexOfPersonality((char *)message_name); if (index >= 0) { if (message_verb != NULL) { // map the parameter to one of ours char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb); if (mapped_resource != NULL) { EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index); strcpy(message_value,factory->getResourceValue((char *)mapped_resource)); bool success = false; if (message_value != NULL) success = true; // log resource get if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value); // end the resource value back over MQTT this->sendResult(message_name,message_verb,message_value,success); } } } } } } else { // message not bound for our node //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name); ; } } // is this the dimming resource? bool MQTTTransport::isDimmingResource(char *resource) { bool isDimming = false; if (resource != NULL && strcmp(resource,"Dimming") == 0) isDimming = true; return isDimming; } // send result back to MQTT void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) { if (this->m_connected == true) { // send the response back to MQTT this->logger()->log("Sending Response back to MQTT..."); char message[MAX_MQTT_MESSAGE_LENGTH+1]; memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1); char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED; sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success); #ifdef NETWORK_MUTEX if (network_mutex != NULL) network_mutex->lock(); #endif bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message)); #ifdef NETWORK_MUTEX if (network_mutex != NULL) network_mutex->unlock(); #endif if (sent) { this->logger()->log("Result sent successfully"); this->logger()->blinkTransportTxLED(); } else { this->logger()->log("Result send 
FAILED"); } } else { // unable to send the response this->logger()->log("Unable to send response back to MQTT. Not connected."); } } char *MQTTTransport::mapIOCResourceToEndpointResource(char *ioc_name) { return this->getMap()->iocNameToEndpointName(ioc_name); } char *MQTTTransport::makeID(char *id_template,char *buffer) { MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); int instance_id = rand()%100; if (endpoint != NULL) instance_id = endpoint->getInstanceID(); srand(time(0)); srand(rand()); sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE,instance_id); return buffer; } // is this message a PONG message? bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) { bool isPong = false; char verb[MQTT_PING_VERB_LEN+1]; char end[MQTT_PING_VERB_LEN+1]; int counter = 0; // clean memset(verb,0,MQTT_PING_VERB_LEN+1); memset(end,0,MQTT_PING_VERB_LEN+1); // make sure this is for us... char *topic_ep_name = this->getEndpointNameFromTopic(topic); if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) { // parse the payload for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' '; sscanf(payload,"%s%d",verb,&counter); // check the contents to make sure its for us... //this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter); if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) { // its a PONG message to our PING... 
isPong = true; } } // return isPong status return isPong; } // process this PONG message void MQTTTransport::processPongMessage(char *payload,int payload_length) { // DEBUG //this->logger()->log("Received PONG: counter=%d",this->m_ping_counter); // simply increment the counter ++this->m_ping_counter; // reset counter if maxed if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1; } // send a PING message bool MQTTTransport::sendPingMessage() { bool sent = false; char message[MAX_MQTT_MESSAGE_LENGTH+1]; // initialize... memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1); // build message sprintf(message,"ping:%s:%d:",this->m_endpoint_name,this->m_ping_counter); // send the message over the ping/pong topic //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter); #ifdef NETWORK_MUTEX if (network_mutex != NULL) network_mutex->lock(); #endif sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message)); #ifdef NETWORK_MUTEX if (network_mutex != NULL) network_mutex->unlock(); #endif if (sent) { // send succeeded //this->logger()->log("PING %d sent successfully",this->m_ping_counter); this->logger()->blinkTransportTxLED(); // wait for 1 second wait_ms(1000); } else { // fail silently now... ; // send failed! - reconnect //this->logger()->log("PING send %d FAILED... 
(re)connecting...",this->m_ping_counter); // attempt reconnect //this->logger()->log("PING send failed - re-connecting MQTT..."); //this->disconnect(); //sent = this->connect(); //if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter); //else this->logger()->log("PING %d: resend failed giving up...",this->m_ping_counter); } // return our status return sent; } static char _mqtt_id[MQTT_ENDPOINT_IDLEN+1]; // connect up MQTT bool MQTTTransport::connect() { memset(_mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1)); if (this->m_connected == false) { this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT); this->m_mqtt = &_mqtt; if (this->m_mqtt != NULL) { char *id = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id); this->logger()->log("MQTT Connect: ID: %s...",id); if (this->m_mqtt->connect(id)) { this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic()); if (this->m_mqtt->subscribe(this->getTopic())) { if (this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) { this->logger()->log("MQTT CONNECTED."); this->m_connected = true; } else { this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC); this->logger()->turnLEDRed(); this->m_connected = false; } } else { this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic()); this->logger()->turnLEDRed(); this->m_connected = false; } } else { this->logger()->log("MQTT Connect: ID: %s FAILED",id); this->logger()->turnLEDRed(); this->m_connected = false; } } else { this->logger()->log("MQTT Unable to allocate new instance"); this->logger()->turnLEDRed(); this->m_connected = false; } } else { this->logger()->log("MQTT already connected (OK)"); } return this->m_connected; } // disconnect from MQTT bool MQTTTransport::disconnect() { if (this->m_mqtt != NULL) { this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic()); #ifdef NETWORK_MUTEX if (network_mutex != NULL) network_mutex->lock(); #endif this->m_mqtt->unsubscribe(this->getTopic()); #ifdef 
NETWORK_MUTEX if (network_mutex != NULL) network_mutex->unlock(); #endif this->logger()->log("MQTT Disconnecting..."); this->m_mqtt->disconnect(); } else { this->logger()->log("MQTT already disconnected (OK)"); } this->m_connected = false; return true; } // check transport and process stuff void MQTTTransport::checkAndProcess() { // process any MQTT messages if (this->m_mqtt != NULL && this->m_connected == true) { #ifdef NETWORK_MUTEX if (network_mutex != NULL) network_mutex->lock(); #endif bool connected = this->m_mqtt->loop(); #ifdef NETWORK_MUTEX if (network_mutex != NULL) network_mutex->unlock(); #endif if (connected) { this->logger()->blinkTransportRxLED(); } //else { // this->logger()->log("Attempting reconnection to MQTT in checkAndProcess..."); // this->disconnect(); // Thread::wait(15000); // this->connect(); //} } // send a PING if time for it --this->m_ping_countdown; if (this->m_ping_countdown <= 0) { //this->logger()->log("MQTT: Sending PING..."); this->m_ping_countdown = MQTT_PING_COUNTDOWN; this->sendPingMessage(); } }