mqtt specific components for the impact mbed endpoint library
Dependents: mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp
Diff: MQTTTransport.cpp
- Revision:
- 0:a3fc1c6ef150
- Child:
- 7:8a4a61202b36
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MQTTTransport.cpp Wed Mar 26 19:48:35 2014 +0000 @@ -0,0 +1,468 @@ +/* Copyright C2013 Doug Anson, MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software + * and associated documentation files the "Software", to deal in the Software without restriction, + * including without limitation the rights to use, copy, modify, merge, publish, distribute, + * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or + * substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + + #include "mbed.h" + #include "rtos.h" + + #include "MQTTTransport.h" + + // Endpoint Support + #include "MBEDEndpoint.h" + + // EmulatedResourceFactory support + #include "EmulatedResourceFactory.h" + + // Network mutex + extern Mutex *network_mutex; + + // our transmitt instance + MQTTTransport *_mqtt_instance = NULL; + + // MQTT callback to handle received messages + void _mqtt_message_handler(char *topic,char *payload,unsigned int length) { + char buffer[MAX_MQTT_MESSAGE_LENGTH+1]; + char rcv_topic[MQTT_IOC_TOPIC_LEN+1]; + + memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); + memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1); + memcpy(buffer,payload,length); + strcpy(rcv_topic,topic); + + if (_mqtt_instance != NULL) { + if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) { + _mqtt_instance->processPongMessage(buffer,length); + } + else { + memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); + memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1); + memcpy(buffer,payload,length); + strcpy(rcv_topic,topic); + _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length); + } + } + } + + // our MQTT client endpoint + PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler); + + // default constructor + MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) { + this->m_mqtt = NULL; + _mqtt_instance = this; + this->m_map = map; + this->m_ping_counter = 1; + this->m_ping_countdown = MQTT_PING_COUNTDOWN; + this->initTopic(); + } + + // default destructor + MQTTTransport::~MQTTTransport() { + this->disconnect(); + } + + // init our topic + void MQTTTransport::initTopic() { + MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); + char *endpoint_name = endpoint->getEndpointName(); + memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1); + sprintf(this->m_topic,MQTT_IOC_TOPIC,endpoint_name); + } + + // get our topic + char *MQTTTransport::getTopic() { return this->m_topic; } + + // get the IOC <--> MBED resource map + MBEDToIOCResourceMap *MQTTTransport::getMap() { return this->m_map; } + + // pull the endpoint name from the MQTT topic + char *MQTTTransport::getEndpointNameFromTopic(char *topic) { + if (topic != NULL) { + memset(this->m_endpoint_name,0,LIGHT_NAME_LEN+1); + char trash[MQTT_IOC_TOPIC_LEN+1]; + char ep[MQTT_IOC_TOPIC_LEN+1]; + memset(trash,0,MQTT_IOC_TOPIC_LEN+1); + memset(ep,0,MQTT_IOC_TOPIC_LEN+1); + bool done = false; + int length = 0; if (topic != NULL) length = strlen(topic); + for(int i=length-1;i>=0 && !done;--i) if (topic[i] == '/') { topic[i] = ' ' ; done = true; } + sscanf(topic,"%s%s",trash,ep); + //this->logger()->log("MQTT: Topic:[%s] trash:[%s] ep:[%s]",topic,trash,ep); + if (strlen(ep) > 0) { + if (strcmp(ep,MQTT_IOC_ALL_ENDPOINT) != 0) { + // just insert the name and let the parser determine if its for us or not... + strncpy(this->m_endpoint_name,ep,strlen(ep)); + } + else { + // this is a broadcast message - so we need to process it + MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); + char *endpoint_name = endpoint->getEndpointName(); + strcpy(this->m_endpoint_name,endpoint_name); + } + } + //this->logger()->log("MQTT Topic (discovered): %s Original: %s",this->m_endpoint_name,topic); + return this->m_endpoint_name; + } + //this->logger()->log("MQTT Topic (discovered): NULL Original: %s",topic); + return NULL; + } + + // process a MQTT Message + void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int payload_length) { + char message_type[MQTT_PAYLOAD_SEGMENT_LEN+1]; + char message_verb[MQTT_PAYLOAD_SEGMENT_LEN+1]; + char message_value[MQTT_PAYLOAD_SEGMENT_LEN+1]; + char message_opt[MQTT_PAYLOAD_SEGMENT_LEN+1]; + + // initialize + memset(message_type,0,MQTT_PAYLOAD_SEGMENT_LEN+1); + memset(message_verb,0,MQTT_PAYLOAD_SEGMENT_LEN+1); + memset(message_value,0,MQTT_PAYLOAD_SEGMENT_LEN+1); + memset(message_opt,0,MQTT_PAYLOAD_SEGMENT_LEN+1); + + // get our endpoint + MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); + char *endpoint_name = endpoint->getEndpointName(); + + // DEBUG + //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name); + + // only respond if its for our node + if (strcmp(endpoint_name,message_name) == 0) { + // format of the MQTT message: message_type:verb|Parameter_X:value|keyword:optional_data + char buffer[MAX_MQTT_MESSAGE_LENGTH+1]; + memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); + memcpy(buffer,payload,payload_length); + int count = 0; for(int i=0;i<payload_length;++i) if (payload[i] == ':') ++count; + for(int i=0;i<payload_length;++i) { + if (buffer[i] == ':') { + if (i < (payload_length-1)) buffer[i] = ' '; + else buffer[i] = '\0'; + } + } + if (count == 1) sscanf(buffer,"%s%s",message_type,message_verb); + if (count == 2) sscanf(buffer,"%s%s%s",message_type,message_verb,message_value); + if (count == 3) sscanf(buffer,"%s%s%s%s",message_type,message_verb,message_value,message_opt); + + // DEBUG + //this->logger()->log("Raw Payload: %s, length: %d",payload,payload_length); + //this->logger()->log("Buffer: %s, length: %d",buffer,strlen(buffer)); + //this->logger()->log("Parsed Payload: Type: [%s] Name: [%s] Verb: [%s] Value: [%s]",message_type,message_name,message_verb,message_value); + + // load endpoints + if (message_type != NULL && strcmp(message_type,IOC_ENDPOINT_VERB) == 0) { // Endpoint + if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_LOAD_ALL_VERB) == 0) { // load + if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all + // load up our endpoints + endpoint->loadEndpoints(); + endpoint->updateEndpoints(); + } + else if (message_value != NULL && strcmp(message_value,this->m_endpoint_name) == 0) { + // load up our endpoints (us only) + endpoint->loadEndpoints(); + endpoint->updateEndpoints(); + } + } + + else if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) { // update + if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all + // update our endpoints + endpoint->updateEndpoints(); + } + else { + // update just our endpoint + int index = -1; + if (message_name != NULL) { + index = endpoint->indexOfLight((char *)message_name); + if (index >= 0) { + if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) { + // update our endpoint + endpoint->updateEndpoints(index); + } + } + } + } + } + } + + // change a resource value + if (message_type != NULL && strcmp(message_type,IOC_CHANGE_VERB) == 0) { + if (message_name != NULL) { + // destined for our lights? + int index = endpoint->indexOfLight((char *)message_name); + if (index >= 0) { + if (message_verb != NULL) { + // map the parameter to one of ours + char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb); + if (mapped_resource != NULL) { + if (message_value != NULL) { + EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index); + bool success = factory->setResourceValue(mapped_resource,message_value); + + // end the resource value back over MQTT + this->sendResult(message_name,message_verb,message_value,success); + } + } + } + } + } + } + + // get a resource value + if (message_type != NULL && strcmp(message_type,IOC_REQUEST_VALUE_VERB) == 0) { + if (message_name != NULL) { + // destined for our lights? + int index = endpoint->indexOfLight((char *)message_name); + if (index >= 0) { + if (message_verb != NULL) { + // map the parameter to one of ours + char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb); + if (mapped_resource != NULL) { + EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index); + strcpy(message_value,factory->getResourceValue((char *)mapped_resource)); + bool success = false; if (message_value != NULL) success = true; + + // log resource get + if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value); + + // end the resource value back over MQTT + this->sendResult(message_name,message_verb,message_value,success); + } + } + } + } + } + } + else { + // message not bound for our node + //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name); + ; + } + } + + // send result back to MQTT + void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) { + if (this->m_connected == true) { + // send the response back to MQTT + this->logger()->log("Sending Response back to MQTT..."); + char message[MAX_MQTT_MESSAGE_LENGTH+1]; + memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1); + char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED; + sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success); + if (network_mutex != NULL) network_mutex->lock(); + bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message)); + if (network_mutex != NULL) network_mutex->unlock(); + if (sent) { + this->logger()->log("Result sent successfully"); + this->logger()->blinkTransportTxLED(); + } + else { + this->logger()->log("Result send FAILED"); + } + } + else { + // unable to send the response + this->logger()->log("Unable to send response back to MQTT. Not connected."); + } + } + + char *MQTTTransport::mapIOCResourceToEndpointResource(char *ioc_name) { return this->getMap()->iocNameToEndpointName(ioc_name); } + + char *MQTTTransport::makeID(char *id_template,char *buffer) { + MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); + int instance_id = rand()%100; + if (endpoint != NULL) instance_id = endpoint->getInstanceID(); + srand(time(0)); + srand(rand()); + sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE,instance_id); + return buffer; + } + + // is this message a PONG message? + bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) { + bool isPong = false; + char verb[MQTT_PING_VERB_LEN+1]; + char end[MQTT_PING_VERB_LEN+1]; + int counter = 0; + + // clean + memset(verb,0,MQTT_PING_VERB_LEN+1); + memset(end,0,MQTT_PING_VERB_LEN+1); + + // make sure this is for us... + char *topic_ep_name = this->getEndpointNameFromTopic(topic); + if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) { + // parse the payload + for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' '; + sscanf(payload,"%s%d",verb,&counter); + + // check the contents to make sure its for us... + //this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter); + if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) { + // its a PONG message to our PING... + isPong = true; + } + } + + // return isPong status + return isPong; + } + + + // process this PONG message + void MQTTTransport::processPongMessage(char *payload,int payload_length) { + // DEBUG + //this->logger()->log("Received PONG: counter=%d",this->m_ping_counter); + + // simply increment the counter + ++this->m_ping_counter; + + // reset counter if maxed + if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1; + } + + // send a PING message + bool MQTTTransport::sendPingMessage() { + bool sent = false; + char message[MAX_MQTT_MESSAGE_LENGTH+1]; + + // initialize... + memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1); + + // build message + sprintf(message,"ping:%s:%d:",this->m_endpoint_name,this->m_ping_counter); + + // send the message over the ping/pong topic + //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter); + if (network_mutex != NULL) network_mutex->lock(); + sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message)); + if (network_mutex != NULL) network_mutex->unlock(); + if (sent) { + // send succeeded + //this->logger()->log("PING %d sent successfully",this->m_ping_counter); + this->logger()->blinkTransportTxLED(); + + // wait for 1 second + wait_ms(1000); + } + else { + // send failed! - reconnect + this->logger()->log("PING send %d FAILED... (re)connecting...",this->m_ping_counter); + + // attempt reconnect + this->logger()->log("PING send failed - re-connecting MQTT..."); + this->disconnect(); + sent = this->connect(); + if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter); + else this->logger()->log("PING %d: resend failed giving up...",this->m_ping_counter); + } + + // return our status + return sent; + } + + // connect up MQTT + bool MQTTTransport::connect() { + if (network_mutex != NULL) network_mutex->lock(); + char mqtt_id[MQTT_ENDPOINT_IDLEN+1]; + memset(mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1)); + if (this->m_connected == false) { + this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT); + this->m_mqtt = &_mqtt; + if (this->m_mqtt != NULL) { + char *id = this->makeID(MQTT_ENDPOINT_ID,mqtt_id); + this->logger()->log("MQTT Connect: ID: %s...",id); + if (this->m_mqtt->connect(id)) { + this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic()); + if (this->m_mqtt->subscribe(this->getTopic())) { + if (this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) { + this->logger()->log("MQTT CONNECTED."); + this->m_connected = true; + } + else { + this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC); + this->logger()->turnLEDRed(); + this->m_connected = false; + } + } + else { + this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic()); + this->logger()->turnLEDRed(); + this->m_connected = false; + } + } + else { + this->logger()->log("MQTT Connect: ID: %s FAILED",id); + this->logger()->turnLEDRed(); + this->m_connected = false; + } + } + else { + this->logger()->log("MQTT Unable to allocate new instance"); + this->logger()->turnLEDRed(); + this->m_connected = false; + } + } + else { + this->logger()->log("MQTT already connected (OK)"); + } + if (network_mutex != NULL) network_mutex->unlock(); + return this->m_connected; + } + + // disconnect from MQTT + bool MQTTTransport::disconnect() { + if (this->m_mqtt != NULL) { + this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic()); + if (network_mutex != NULL) network_mutex->lock(); + this->m_mqtt->unsubscribe(this->getTopic()); + if (network_mutex != NULL) network_mutex->unlock(); + this->logger()->log("MQTT Disconnecting..."); + this->m_mqtt->disconnect(); + } + else { + this->logger()->log("MQTT already disconnected (OK)"); + } + this->m_connected = false; + return true; + } + + // check transport and process stuff + void MQTTTransport::checkAndProcess() { + // process any MQTT messages + if (this->m_mqtt != NULL && this->m_connected == true) { + if (network_mutex != NULL) network_mutex->lock(); + bool connected = this->m_mqtt->loop(); + if (network_mutex != NULL) network_mutex->unlock(); + if (connected) { + this->logger()->blinkTransportRxLED(); + } + else { + this->logger()->log("Attempting reconnection to MQTT in checkAndProcess..."); + this->disconnect(); + this->connect(); + } + } + + // send a PING if time for it + --this->m_ping_countdown; + if (this->m_ping_countdown <= 0) { + //this->logger()->log("MQTT: Sending PING..."); + this->m_ping_countdown = MQTT_PING_COUNTDOWN; + this->sendPingMessage(); + } + } \ No newline at end of file