Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp
MQTTTransport.cpp
00001 /* Copyright C2013 Doug Anson, MIT License 00002 * 00003 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software 00004 * and associated documentation files the "Software", to deal in the Software without restriction, 00005 * including without limitation the rights to use, copy, modify, merge, publish, distribute, 00006 * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is 00007 * furnished to do so, subject to the following conditions: 00008 * 00009 * The above copyright notice and this permission notice shall be included in all copies or 00010 * substantial portions of the Software. 00011 * 00012 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING 00013 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 00014 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 00015 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 00016 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
00017 */ 00018 00019 #include "mbed.h" 00020 00021 #include "MQTTTransport.h" 00022 00023 // Endpoint Support 00024 #include "MBEDEndpoint.h" 00025 00026 // EmulatedResourceFactory support 00027 #include "EmulatedResourceFactory.h" 00028 00029 #ifdef NETWORK_MUTEX 00030 #include "rtos.h" 00031 extern Mutex *network_mutex; 00032 #endif 00033 00034 // our transmitt instance 00035 MQTTTransport *_mqtt_instance = NULL; 00036 00037 // MQTT callback to handle received messages 00038 void _mqtt_message_handler(char *topic,char *payload,unsigned int length) { 00039 char buffer[MAX_MQTT_MESSAGE_LENGTH+1]; 00040 char rcv_topic[MQTT_IOC_TOPIC_LEN+1]; 00041 00042 memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); 00043 memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1); 00044 memcpy(buffer,payload,length); 00045 strcpy(rcv_topic,topic); 00046 00047 if (_mqtt_instance != NULL) { 00048 if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) { 00049 _mqtt_instance->processPongMessage(buffer,length); 00050 } 00051 else { 00052 memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); 00053 memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1); 00054 memcpy(buffer,payload,length); 00055 strcpy(rcv_topic,topic); 00056 _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length); 00057 } 00058 } 00059 } 00060 00061 // our MQTT client endpoint 00062 PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler); 00063 00064 // default constructor 00065 MQTTTransport::MQTTTransport(Logger *logger,void *endpoint,MBEDToIOCResourceMap *map) : Transport(logger,endpoint) { 00066 this->m_mqtt = NULL; 00067 _mqtt_instance = this; 00068 this->m_map = map; 00069 this->m_ping_counter = 1; 00070 this->m_ping_countdown = MQTT_PING_COUNTDOWN; 00071 this->initTopic(); 00072 } 00073 00074 // default destructor 00075 MQTTTransport::~MQTTTransport() { 00076 this->disconnect(); 00077 } 00078 00079 // init our topic 00080 void MQTTTransport::initTopic() { 00081 MBEDEndpoint *endpoint = (MBEDEndpoint 
*)this->getEndpoint(); 00082 char *endpoint_name = endpoint->getEndpointName(); 00083 memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1); 00084 sprintf(this->m_topic,MQTT_IOC_TOPIC,endpoint_name); 00085 } 00086 00087 // get our topic 00088 char *MQTTTransport::getTopic() { return this->m_topic; } 00089 00090 // get the IOC <--> MBED resource map 00091 MBEDToIOCResourceMap *MQTTTransport::getMap() { return this->m_map; } 00092 00093 // pull the endpoint name from the MQTT topic 00094 char *MQTTTransport::getEndpointNameFromTopic(char *topic) { 00095 if (topic != NULL) { 00096 memset(this->m_endpoint_name,0,PERSONALITY_NAME_LEN+1); 00097 char trash[MQTT_IOC_TOPIC_LEN+1]; 00098 char ep[MQTT_IOC_TOPIC_LEN+1]; 00099 memset(trash,0,MQTT_IOC_TOPIC_LEN+1); 00100 memset(ep,0,MQTT_IOC_TOPIC_LEN+1); 00101 bool done = false; 00102 int length = 0; if (topic != NULL) length = strlen(topic); 00103 for(int i=length-1;i>=0 && !done;--i) if (topic[i] == '/') { topic[i] = ' ' ; done = true; } 00104 sscanf(topic,"%s%s",trash,ep); 00105 //this->logger()->log("MQTT: Topic:[%s] trash:[%s] ep:[%s]",topic,trash,ep); 00106 if (strlen(ep) > 0) { 00107 if (strcmp(ep,MQTT_IOC_ALL_ENDPOINT) != 0) { 00108 // just insert the name and let the parser determine if its for us or not... 
00109 strncpy(this->m_endpoint_name,ep,strlen(ep)); 00110 } 00111 else { 00112 // this is a broadcast message - so we need to process it 00113 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); 00114 char *endpoint_name = endpoint->getEndpointName(); 00115 strcpy(this->m_endpoint_name,endpoint_name); 00116 } 00117 } 00118 //this->logger()->log("MQTT Topic (discovered): %s Original: %s",this->m_endpoint_name,topic); 00119 return this->m_endpoint_name; 00120 } 00121 //this->logger()->log("MQTT Topic (discovered): NULL Original: %s",topic); 00122 return NULL; 00123 } 00124 00125 // process a MQTT Message 00126 void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int payload_length) { 00127 char message_type[MQTT_PAYLOAD_SEGMENT_LEN+1]; 00128 char message_verb[MQTT_PAYLOAD_SEGMENT_LEN+1]; 00129 char message_value[MQTT_PAYLOAD_SEGMENT_LEN+1]; 00130 char message_opt[MQTT_PAYLOAD_SEGMENT_LEN+1]; 00131 00132 // initialize 00133 memset(message_type,0,MQTT_PAYLOAD_SEGMENT_LEN+1); 00134 memset(message_verb,0,MQTT_PAYLOAD_SEGMENT_LEN+1); 00135 memset(message_value,0,MQTT_PAYLOAD_SEGMENT_LEN+1); 00136 memset(message_opt,0,MQTT_PAYLOAD_SEGMENT_LEN+1); 00137 00138 // get our endpoint 00139 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); 00140 char *endpoint_name = endpoint->getEndpointName(); 00141 00142 // DEBUG 00143 //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name); 00144 00145 // only respond if its for our node 00146 if (strcmp(endpoint_name,message_name) == 0) { 00147 // format of the MQTT message: message_type:verb|Parameter_X:value|keyword:optional_data 00148 char buffer[MAX_MQTT_MESSAGE_LENGTH+1]; 00149 memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1); 00150 memcpy(buffer,payload,payload_length); 00151 int count = 0; for(int i=0;i<payload_length;++i) if (payload[i] == ':') ++count; 00152 for(int i=0;i<payload_length;++i) { 00153 if (buffer[i] == ':') { 00154 if (i < (payload_length-1)) buffer[i] = ' 
'; 00155 else buffer[i] = '\0'; 00156 } 00157 } 00158 if (count == 1) sscanf(buffer,"%s%s",message_type,message_verb); 00159 if (count == 2) sscanf(buffer,"%s%s%s",message_type,message_verb,message_value); 00160 if (count == 3) sscanf(buffer,"%s%s%s%s",message_type,message_verb,message_value,message_opt); 00161 00162 // DEBUG 00163 //this->logger()->log("Raw Payload: %s, length: %d",payload,payload_length); 00164 //this->logger()->log("Buffer: %s, length: %d",buffer,strlen(buffer)); 00165 //this->logger()->log("Parsed Payload: Type: [%s] Name: [%s] Verb: [%s] Value: [%s]",message_type,message_name,message_verb,message_value); 00166 00167 // load endpoints 00168 if (message_type != NULL && strcmp(message_type,IOC_ENDPOINT_VERB) == 0) { // Endpoint 00169 if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_LOAD_ALL_VERB) == 0) { // load 00170 if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all 00171 // load up our endpoints 00172 endpoint->loadPersonalities(); 00173 endpoint->updatePersonalities(); 00174 } 00175 else if (message_value != NULL && strcmp(message_value,this->m_endpoint_name) == 0) { 00176 // load up our endpoints (us only) 00177 endpoint->loadPersonalities(); 00178 endpoint->updatePersonalities(); 00179 } 00180 } 00181 00182 else if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) { // update 00183 if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all 00184 // update our endpoints 00185 endpoint->updatePersonalities(); 00186 } 00187 else { 00188 // update just our endpoint 00189 int index = -1; 00190 if (message_name != NULL) { 00191 index = endpoint->indexOfPersonality((char *)message_name); 00192 if (index >= 0) { 00193 if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) { 00194 // update our endpoint 00195 endpoint->updatePersonality(index); 00196 } 00197 } 00198 } 00199 } 00200 } 00201 } 00202 00203 // change a 
resource value 00204 if (message_type != NULL && strcmp(message_type,IOC_CHANGE_VERB) == 0) { 00205 if (message_name != NULL) { 00206 // destined for our lights? 00207 int index = endpoint->indexOfPersonality((char *)message_name); 00208 if (index >= 0) { 00209 if (message_verb != NULL) { 00210 // map the parameter to one of ours 00211 char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb); 00212 if (mapped_resource != NULL) { 00213 if (message_value != NULL) { 00214 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index); 00215 bool success = factory->setResourceValue(mapped_resource,message_value); 00216 00217 // end the resource value back over MQTT 00218 this->sendResult(message_name,message_verb,message_value,success); 00219 } 00220 } 00221 00222 // for Dimming, we also want to refresh our record (in whole) at the IOC 00223 if (this->isDimmingResource(message_verb)) { 00224 // send a fresh update to the IOC - just us... 00225 int index = endpoint->indexOfPersonality((char *)message_name); 00226 if (index >= 0) endpoint->updatePersonality(index); 00227 } 00228 } 00229 } 00230 } 00231 } 00232 00233 // get a resource value 00234 if (message_type != NULL && strcmp(message_type,IOC_REQUEST_VALUE_VERB) == 0) { 00235 if (message_name != NULL) { 00236 // destined for our lights? 
00237 int index = endpoint->indexOfPersonality((char *)message_name); 00238 if (index >= 0) { 00239 if (message_verb != NULL) { 00240 // map the parameter to one of ours 00241 char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb); 00242 if (mapped_resource != NULL) { 00243 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index); 00244 strcpy(message_value,factory->getResourceValue((char *)mapped_resource)); 00245 bool success = false; if (message_value != NULL) success = true; 00246 00247 // log resource get 00248 if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value); 00249 00250 // end the resource value back over MQTT 00251 this->sendResult(message_name,message_verb,message_value,success); 00252 } 00253 } 00254 } 00255 } 00256 } 00257 } 00258 else { 00259 // message not bound for our node 00260 //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name); 00261 ; 00262 } 00263 } 00264 00265 // is this the dimming resource? 
00266 bool MQTTTransport::isDimmingResource(char *resource) { 00267 bool isDimming = false; 00268 00269 if (resource != NULL && strcmp(resource,"Dimming") == 0) isDimming = true; 00270 00271 return isDimming; 00272 } 00273 00274 // send result back to MQTT 00275 void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) { 00276 if (this->m_connected == true) { 00277 // send the response back to MQTT 00278 this->logger()->log("Sending Response back to MQTT..."); 00279 char message[MAX_MQTT_MESSAGE_LENGTH+1]; 00280 memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1); 00281 char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED; 00282 sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success); 00283 #ifdef NETWORK_MUTEX 00284 if (network_mutex != NULL) network_mutex->lock(); 00285 #endif 00286 bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message)); 00287 #ifdef NETWORK_MUTEX 00288 if (network_mutex != NULL) network_mutex->unlock(); 00289 #endif 00290 if (sent) { 00291 this->logger()->log("Result sent successfully"); 00292 this->logger()->blinkTransportTxLED(); 00293 } 00294 else { 00295 this->logger()->log("Result send FAILED"); 00296 } 00297 } 00298 else { 00299 // unable to send the response 00300 this->logger()->log("Unable to send response back to MQTT. 
Not connected."); 00301 } 00302 } 00303 00304 char *MQTTTransport::mapIOCResourceToEndpointResource(char *ioc_name) { return this->getMap()->iocNameToEndpointName(ioc_name); } 00305 00306 char *MQTTTransport::makeID(char *id_template,char *buffer) { 00307 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint(); 00308 int instance_id = rand()%100; 00309 if (endpoint != NULL) instance_id = endpoint->getInstanceID(); 00310 srand(time(0)); 00311 srand(rand()); 00312 sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE,instance_id); 00313 return buffer; 00314 } 00315 00316 // is this message a PONG message? 00317 bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) { 00318 bool isPong = false; 00319 char verb[MQTT_PING_VERB_LEN+1]; 00320 char end[MQTT_PING_VERB_LEN+1]; 00321 int counter = 0; 00322 00323 // clean 00324 memset(verb,0,MQTT_PING_VERB_LEN+1); 00325 memset(end,0,MQTT_PING_VERB_LEN+1); 00326 00327 // make sure this is for us... 00328 char *topic_ep_name = this->getEndpointNameFromTopic(topic); 00329 if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) { 00330 // parse the payload 00331 for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' '; 00332 sscanf(payload,"%s%d",verb,&counter); 00333 00334 // check the contents to make sure its for us... 00335 //this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter); 00336 if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) { 00337 // its a PONG message to our PING... 
00338 isPong = true; 00339 } 00340 } 00341 00342 // return isPong status 00343 return isPong; 00344 } 00345 00346 00347 // process this PONG message 00348 void MQTTTransport::processPongMessage(char *payload,int payload_length) { 00349 // DEBUG 00350 //this->logger()->log("Received PONG: counter=%d",this->m_ping_counter); 00351 00352 // simply increment the counter 00353 ++this->m_ping_counter; 00354 00355 // reset counter if maxed 00356 if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1; 00357 } 00358 00359 // send a PING message 00360 bool MQTTTransport::sendPingMessage() { 00361 bool sent = false; 00362 char message[MAX_MQTT_MESSAGE_LENGTH+1]; 00363 00364 // initialize... 00365 memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1); 00366 00367 // build message 00368 sprintf(message,"ping:%s:%d:",this->m_endpoint_name,this->m_ping_counter); 00369 00370 // send the message over the ping/pong topic 00371 //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter); 00372 #ifdef NETWORK_MUTEX 00373 if (network_mutex != NULL) network_mutex->lock(); 00374 #endif 00375 sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message)); 00376 #ifdef NETWORK_MUTEX 00377 if (network_mutex != NULL) network_mutex->unlock(); 00378 #endif 00379 if (sent) { 00380 // send succeeded 00381 //this->logger()->log("PING %d sent successfully",this->m_ping_counter); 00382 this->logger()->blinkTransportTxLED(); 00383 00384 // wait for 1 second 00385 Thread::wait(1000); 00386 } 00387 else { 00388 // fail silently now... 00389 ; 00390 00391 // send failed! - reconnect 00392 //this->logger()->log("PING send %d FAILED... 
(re)connecting...",this->m_ping_counter); 00393 00394 // attempt reconnect 00395 //this->logger()->log("PING send failed - re-connecting MQTT..."); 00396 //this->disconnect(); 00397 //sent = this->connect(); 00398 //if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter); 00399 //else this->logger()->log("PING %d: resend failed giving up...",this->m_ping_counter); 00400 } 00401 00402 // return our status 00403 return sent; 00404 } 00405 00406 static char _mqtt_id[MQTT_ENDPOINT_IDLEN+1]; 00407 00408 // connect up MQTT 00409 bool MQTTTransport::connect() { 00410 memset(_mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1)); 00411 if (this->m_connected == false) { 00412 this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT); 00413 this->m_mqtt = &_mqtt; 00414 if (this->m_mqtt != NULL) { 00415 char *id = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id); 00416 this->logger()->log("MQTT Connect: ID: %s...",id); 00417 if (this->m_mqtt->connect(id)) { 00418 this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic()); 00419 if (this->m_mqtt->subscribe(this->getTopic())) { 00420 if (this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) { 00421 this->logger()->log("MQTT CONNECTED."); 00422 this->m_connected = true; 00423 } 00424 else { 00425 this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC); 00426 this->logger()->turnLEDRed(); 00427 this->m_connected = false; 00428 } 00429 } 00430 else { 00431 this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic()); 00432 this->logger()->turnLEDRed(); 00433 this->m_connected = false; 00434 } 00435 } 00436 else { 00437 this->logger()->log("MQTT Connect: ID: %s FAILED",id); 00438 this->logger()->turnLEDRed(); 00439 this->m_connected = false; 00440 } 00441 } 00442 else { 00443 this->logger()->log("MQTT Unable to allocate new instance"); 00444 this->logger()->turnLEDRed(); 00445 this->m_connected = false; 00446 } 00447 } 00448 else { 00449 this->logger()->log("MQTT already 
connected (OK)"); 00450 } 00451 return this->m_connected; 00452 } 00453 00454 // disconnect from MQTT 00455 bool MQTTTransport::disconnect() { 00456 if (this->m_mqtt != NULL) { 00457 this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic()); 00458 #ifdef NETWORK_MUTEX 00459 if (network_mutex != NULL) network_mutex->lock(); 00460 #endif 00461 this->m_mqtt->unsubscribe(this->getTopic()); 00462 #ifdef NETWORK_MUTEX 00463 if (network_mutex != NULL) network_mutex->unlock(); 00464 #endif 00465 this->logger()->log("MQTT Disconnecting..."); 00466 this->m_mqtt->disconnect(); 00467 } 00468 else { 00469 this->logger()->log("MQTT already disconnected (OK)"); 00470 } 00471 this->m_connected = false; 00472 return true; 00473 } 00474 00475 // check transport and process stuff 00476 void MQTTTransport::checkAndProcess() { 00477 // process any MQTT messages 00478 if (this->m_mqtt != NULL && this->m_connected == true) { 00479 #ifdef NETWORK_MUTEX 00480 if (network_mutex != NULL) network_mutex->lock(); 00481 #endif 00482 bool connected = this->m_mqtt->loop(); 00483 #ifdef NETWORK_MUTEX 00484 if (network_mutex != NULL) network_mutex->unlock(); 00485 #endif 00486 if (connected) { 00487 this->logger()->blinkTransportRxLED(); 00488 } 00489 //else { 00490 // this->logger()->log("Attempting reconnection to MQTT in checkAndProcess..."); 00491 // this->disconnect(); 00492 // Thread::wait(15000); 00493 // this->connect(); 00494 //} 00495 } 00496 00497 // send a PING if time for it 00498 --this->m_ping_countdown; 00499 if (this->m_ping_countdown <= 0) { 00500 //this->logger()->log("MQTT: Sending PING..."); 00501 this->m_ping_countdown = MQTT_PING_COUNTDOWN; 00502 this->sendPingMessage(); 00503 } 00504 }
Generated on Thu Jul 14 2022 16:49:25 by Doxygen 1.7.2