MQTT-specific components for the impact mbed endpoint library

Dependents:   mbed_mqtt_endpoint_ublox_ethernet mbed_mqtt_endpoint_ublox_cellular mbed_mqtt_endpoint_nxp

Committer:
ansond
Date:
Tue Jul 01 17:18:20 2014 +0000
Revision:
43:14ccc830d6a6
Parent:
42:297585f8e7bd
Child:
49:965a65122c0f
Child:
52:8b95544920af
fixes to switches for mutexes

Who changed what in which revision?

User | Revision | Line number | New contents of line
ansond 0:a3fc1c6ef150 1 /* Copyright (C) 2013 Doug Anson, MIT License
ansond 0:a3fc1c6ef150 2 *
ansond 0:a3fc1c6ef150 3 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software
ansond 0:a3fc1c6ef150 4 * and associated documentation files (the "Software"), to deal in the Software without restriction,
ansond 0:a3fc1c6ef150 5 * including without limitation the rights to use, copy, modify, merge, publish, distribute,
ansond 0:a3fc1c6ef150 6 * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
ansond 0:a3fc1c6ef150 7 * furnished to do so, subject to the following conditions:
ansond 0:a3fc1c6ef150 8 *
ansond 0:a3fc1c6ef150 9 * The above copyright notice and this permission notice shall be included in all copies or
ansond 0:a3fc1c6ef150 10 * substantial portions of the Software.
ansond 0:a3fc1c6ef150 11 *
ansond 0:a3fc1c6ef150 12 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
ansond 0:a3fc1c6ef150 13 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
ansond 0:a3fc1c6ef150 14 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
ansond 0:a3fc1c6ef150 15 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
ansond 0:a3fc1c6ef150 16 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
ansond 0:a3fc1c6ef150 17 */
ansond 0:a3fc1c6ef150 18
ansond 0:a3fc1c6ef150 19 #include "mbed.h"
ansond 42:297585f8e7bd 20
ansond 0:a3fc1c6ef150 21 #include "MQTTTransport.h"
ansond 0:a3fc1c6ef150 22
ansond 0:a3fc1c6ef150 23 // Endpoint Support
ansond 0:a3fc1c6ef150 24 #include "MBEDEndpoint.h"
ansond 0:a3fc1c6ef150 25
ansond 0:a3fc1c6ef150 26 // EmulatedResourceFactory support
ansond 0:a3fc1c6ef150 27 #include "EmulatedResourceFactory.h"
ansond 43:14ccc830d6a6 28
ansond 43:14ccc830d6a6 29 #ifdef NETWORK_MUTEX
ansond 43:14ccc830d6a6 30 #include "rtos.h"
ansond 43:14ccc830d6a6 31 extern Mutex *network_mutex;
ansond 43:14ccc830d6a6 32 #endif
ansond 0:a3fc1c6ef150 33
ansond 0:a3fc1c6ef150 34 // our transport instance: assigned by the MQTTTransport constructor and read by _mqtt_message_handler() to dispatch received messages (NULL until a transport is constructed)
ansond 0:a3fc1c6ef150 35 MQTTTransport *_mqtt_instance = NULL;
ansond 0:a3fc1c6ef150 36
ansond 0:a3fc1c6ef150 37 // MQTT callback to handle received messages
ansond 0:a3fc1c6ef150 38 void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
ansond 0:a3fc1c6ef150 39 char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
ansond 0:a3fc1c6ef150 40 char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
ansond 0:a3fc1c6ef150 41
ansond 0:a3fc1c6ef150 42 memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 0:a3fc1c6ef150 43 memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
ansond 0:a3fc1c6ef150 44 memcpy(buffer,payload,length);
ansond 0:a3fc1c6ef150 45 strcpy(rcv_topic,topic);
ansond 0:a3fc1c6ef150 46
ansond 0:a3fc1c6ef150 47 if (_mqtt_instance != NULL) {
ansond 0:a3fc1c6ef150 48 if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
ansond 0:a3fc1c6ef150 49 _mqtt_instance->processPongMessage(buffer,length);
ansond 0:a3fc1c6ef150 50 }
ansond 0:a3fc1c6ef150 51 else {
ansond 0:a3fc1c6ef150 52 memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 0:a3fc1c6ef150 53 memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
ansond 0:a3fc1c6ef150 54 memcpy(buffer,payload,length);
ansond 0:a3fc1c6ef150 55 strcpy(rcv_topic,topic);
ansond 0:a3fc1c6ef150 56 _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
ansond 0:a3fc1c6ef150 57 }
ansond 0:a3fc1c6ef150 58 }
ansond 0:a3fc1c6ef150 59 }
ansond 0:a3fc1c6ef150 60
ansond 0:a3fc1c6ef150 61 // our MQTT client endpoint: constructed with MQTT_HOSTNAME/MQTT_HOSTPORT and _mqtt_message_handler as its callback; MQTTTransport::connect() points m_mqtt at it
ansond 0:a3fc1c6ef150 62 PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
ansond 0:a3fc1c6ef150 63
ansond 0:a3fc1c6ef150 64 // default constructor
ansond 0:a3fc1c6ef150 65 MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) {
ansond 0:a3fc1c6ef150 66 this->m_mqtt = NULL;
ansond 0:a3fc1c6ef150 67 _mqtt_instance = this;
ansond 0:a3fc1c6ef150 68 this->m_map = map;
ansond 0:a3fc1c6ef150 69 this->m_ping_counter = 1;
ansond 0:a3fc1c6ef150 70 this->m_ping_countdown = MQTT_PING_COUNTDOWN;
ansond 0:a3fc1c6ef150 71 this->initTopic();
ansond 0:a3fc1c6ef150 72 }
ansond 0:a3fc1c6ef150 73
ansond 0:a3fc1c6ef150 74 // default destructor
ansond 0:a3fc1c6ef150 75 MQTTTransport::~MQTTTransport() {
ansond 0:a3fc1c6ef150 76 this->disconnect();
ansond 0:a3fc1c6ef150 77 }
ansond 0:a3fc1c6ef150 78
ansond 0:a3fc1c6ef150 79 // init our topic
ansond 0:a3fc1c6ef150 80 void MQTTTransport::initTopic() {
ansond 0:a3fc1c6ef150 81 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
ansond 0:a3fc1c6ef150 82 char *endpoint_name = endpoint->getEndpointName();
ansond 0:a3fc1c6ef150 83 memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1);
ansond 0:a3fc1c6ef150 84 sprintf(this->m_topic,MQTT_IOC_TOPIC,endpoint_name);
ansond 0:a3fc1c6ef150 85 }
ansond 0:a3fc1c6ef150 86
ansond 0:a3fc1c6ef150 87 // get our topic
ansond 0:a3fc1c6ef150 88 char *MQTTTransport::getTopic() { return this->m_topic; }
ansond 0:a3fc1c6ef150 89
ansond 0:a3fc1c6ef150 90 // get the IOC <--> MBED resource map
ansond 0:a3fc1c6ef150 91 MBEDToIOCResourceMap *MQTTTransport::getMap() { return this->m_map; }
ansond 0:a3fc1c6ef150 92
ansond 0:a3fc1c6ef150 93 // pull the endpoint name from the MQTT topic
ansond 0:a3fc1c6ef150 94 char *MQTTTransport::getEndpointNameFromTopic(char *topic) {
ansond 0:a3fc1c6ef150 95 if (topic != NULL) {
ansond 7:8a4a61202b36 96 memset(this->m_endpoint_name,0,PERSONALITY_NAME_LEN+1);
ansond 0:a3fc1c6ef150 97 char trash[MQTT_IOC_TOPIC_LEN+1];
ansond 0:a3fc1c6ef150 98 char ep[MQTT_IOC_TOPIC_LEN+1];
ansond 0:a3fc1c6ef150 99 memset(trash,0,MQTT_IOC_TOPIC_LEN+1);
ansond 0:a3fc1c6ef150 100 memset(ep,0,MQTT_IOC_TOPIC_LEN+1);
ansond 0:a3fc1c6ef150 101 bool done = false;
ansond 0:a3fc1c6ef150 102 int length = 0; if (topic != NULL) length = strlen(topic);
ansond 0:a3fc1c6ef150 103 for(int i=length-1;i>=0 && !done;--i) if (topic[i] == '/') { topic[i] = ' ' ; done = true; }
ansond 0:a3fc1c6ef150 104 sscanf(topic,"%s%s",trash,ep);
ansond 0:a3fc1c6ef150 105 //this->logger()->log("MQTT: Topic:[%s] trash:[%s] ep:[%s]",topic,trash,ep);
ansond 0:a3fc1c6ef150 106 if (strlen(ep) > 0) {
ansond 0:a3fc1c6ef150 107 if (strcmp(ep,MQTT_IOC_ALL_ENDPOINT) != 0) {
ansond 0:a3fc1c6ef150 108 // just insert the name and let the parser determine if its for us or not...
ansond 0:a3fc1c6ef150 109 strncpy(this->m_endpoint_name,ep,strlen(ep));
ansond 0:a3fc1c6ef150 110 }
ansond 0:a3fc1c6ef150 111 else {
ansond 0:a3fc1c6ef150 112 // this is a broadcast message - so we need to process it
ansond 0:a3fc1c6ef150 113 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
ansond 0:a3fc1c6ef150 114 char *endpoint_name = endpoint->getEndpointName();
ansond 0:a3fc1c6ef150 115 strcpy(this->m_endpoint_name,endpoint_name);
ansond 0:a3fc1c6ef150 116 }
ansond 0:a3fc1c6ef150 117 }
ansond 0:a3fc1c6ef150 118 //this->logger()->log("MQTT Topic (discovered): %s Original: %s",this->m_endpoint_name,topic);
ansond 0:a3fc1c6ef150 119 return this->m_endpoint_name;
ansond 0:a3fc1c6ef150 120 }
ansond 0:a3fc1c6ef150 121 //this->logger()->log("MQTT Topic (discovered): NULL Original: %s",topic);
ansond 0:a3fc1c6ef150 122 return NULL;
ansond 0:a3fc1c6ef150 123 }
ansond 0:a3fc1c6ef150 124
ansond 0:a3fc1c6ef150 125 // process a MQTT Message
ansond 0:a3fc1c6ef150 126 void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int payload_length) {
ansond 0:a3fc1c6ef150 127 char message_type[MQTT_PAYLOAD_SEGMENT_LEN+1];
ansond 0:a3fc1c6ef150 128 char message_verb[MQTT_PAYLOAD_SEGMENT_LEN+1];
ansond 0:a3fc1c6ef150 129 char message_value[MQTT_PAYLOAD_SEGMENT_LEN+1];
ansond 0:a3fc1c6ef150 130 char message_opt[MQTT_PAYLOAD_SEGMENT_LEN+1];
ansond 0:a3fc1c6ef150 131
ansond 0:a3fc1c6ef150 132 // initialize
ansond 0:a3fc1c6ef150 133 memset(message_type,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
ansond 0:a3fc1c6ef150 134 memset(message_verb,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
ansond 0:a3fc1c6ef150 135 memset(message_value,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
ansond 0:a3fc1c6ef150 136 memset(message_opt,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
ansond 0:a3fc1c6ef150 137
ansond 0:a3fc1c6ef150 138 // get our endpoint
ansond 0:a3fc1c6ef150 139 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
ansond 0:a3fc1c6ef150 140 char *endpoint_name = endpoint->getEndpointName();
ansond 0:a3fc1c6ef150 141
ansond 0:a3fc1c6ef150 142 // DEBUG
ansond 0:a3fc1c6ef150 143 //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name);
ansond 0:a3fc1c6ef150 144
ansond 0:a3fc1c6ef150 145 // only respond if its for our node
ansond 0:a3fc1c6ef150 146 if (strcmp(endpoint_name,message_name) == 0) {
ansond 0:a3fc1c6ef150 147 // format of the MQTT message: message_type:verb|Parameter_X:value|keyword:optional_data
ansond 0:a3fc1c6ef150 148 char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
ansond 0:a3fc1c6ef150 149 memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 0:a3fc1c6ef150 150 memcpy(buffer,payload,payload_length);
ansond 0:a3fc1c6ef150 151 int count = 0; for(int i=0;i<payload_length;++i) if (payload[i] == ':') ++count;
ansond 0:a3fc1c6ef150 152 for(int i=0;i<payload_length;++i) {
ansond 0:a3fc1c6ef150 153 if (buffer[i] == ':') {
ansond 0:a3fc1c6ef150 154 if (i < (payload_length-1)) buffer[i] = ' ';
ansond 0:a3fc1c6ef150 155 else buffer[i] = '\0';
ansond 0:a3fc1c6ef150 156 }
ansond 0:a3fc1c6ef150 157 }
ansond 0:a3fc1c6ef150 158 if (count == 1) sscanf(buffer,"%s%s",message_type,message_verb);
ansond 0:a3fc1c6ef150 159 if (count == 2) sscanf(buffer,"%s%s%s",message_type,message_verb,message_value);
ansond 0:a3fc1c6ef150 160 if (count == 3) sscanf(buffer,"%s%s%s%s",message_type,message_verb,message_value,message_opt);
ansond 0:a3fc1c6ef150 161
ansond 0:a3fc1c6ef150 162 // DEBUG
ansond 0:a3fc1c6ef150 163 //this->logger()->log("Raw Payload: %s, length: %d",payload,payload_length);
ansond 0:a3fc1c6ef150 164 //this->logger()->log("Buffer: %s, length: %d",buffer,strlen(buffer));
ansond 0:a3fc1c6ef150 165 //this->logger()->log("Parsed Payload: Type: [%s] Name: [%s] Verb: [%s] Value: [%s]",message_type,message_name,message_verb,message_value);
ansond 0:a3fc1c6ef150 166
ansond 0:a3fc1c6ef150 167 // load endpoints
ansond 0:a3fc1c6ef150 168 if (message_type != NULL && strcmp(message_type,IOC_ENDPOINT_VERB) == 0) { // Endpoint
ansond 0:a3fc1c6ef150 169 if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_LOAD_ALL_VERB) == 0) { // load
ansond 0:a3fc1c6ef150 170 if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all
ansond 0:a3fc1c6ef150 171 // load up our endpoints
ansond 7:8a4a61202b36 172 endpoint->loadPersonalities();
ansond 7:8a4a61202b36 173 endpoint->updatePersonalities();
ansond 0:a3fc1c6ef150 174 }
ansond 0:a3fc1c6ef150 175 else if (message_value != NULL && strcmp(message_value,this->m_endpoint_name) == 0) {
ansond 0:a3fc1c6ef150 176 // load up our endpoints (us only)
ansond 7:8a4a61202b36 177 endpoint->loadPersonalities();
ansond 7:8a4a61202b36 178 endpoint->updatePersonalities();
ansond 0:a3fc1c6ef150 179 }
ansond 0:a3fc1c6ef150 180 }
ansond 0:a3fc1c6ef150 181
ansond 0:a3fc1c6ef150 182 else if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) { // update
ansond 0:a3fc1c6ef150 183 if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all
ansond 0:a3fc1c6ef150 184 // update our endpoints
ansond 7:8a4a61202b36 185 endpoint->updatePersonalities();
ansond 0:a3fc1c6ef150 186 }
ansond 0:a3fc1c6ef150 187 else {
ansond 0:a3fc1c6ef150 188 // update just our endpoint
ansond 0:a3fc1c6ef150 189 int index = -1;
ansond 0:a3fc1c6ef150 190 if (message_name != NULL) {
ansond 7:8a4a61202b36 191 index = endpoint->indexOfPersonality((char *)message_name);
ansond 0:a3fc1c6ef150 192 if (index >= 0) {
ansond 0:a3fc1c6ef150 193 if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) {
ansond 0:a3fc1c6ef150 194 // update our endpoint
ansond 7:8a4a61202b36 195 endpoint->updatePersonality(index);
ansond 0:a3fc1c6ef150 196 }
ansond 0:a3fc1c6ef150 197 }
ansond 0:a3fc1c6ef150 198 }
ansond 0:a3fc1c6ef150 199 }
ansond 0:a3fc1c6ef150 200 }
ansond 0:a3fc1c6ef150 201 }
ansond 0:a3fc1c6ef150 202
ansond 0:a3fc1c6ef150 203 // change a resource value
ansond 0:a3fc1c6ef150 204 if (message_type != NULL && strcmp(message_type,IOC_CHANGE_VERB) == 0) {
ansond 0:a3fc1c6ef150 205 if (message_name != NULL) {
ansond 0:a3fc1c6ef150 206 // destined for our lights?
ansond 7:8a4a61202b36 207 int index = endpoint->indexOfPersonality((char *)message_name);
ansond 0:a3fc1c6ef150 208 if (index >= 0) {
ansond 0:a3fc1c6ef150 209 if (message_verb != NULL) {
ansond 0:a3fc1c6ef150 210 // map the parameter to one of ours
ansond 0:a3fc1c6ef150 211 char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
ansond 0:a3fc1c6ef150 212 if (mapped_resource != NULL) {
ansond 0:a3fc1c6ef150 213 if (message_value != NULL) {
ansond 0:a3fc1c6ef150 214 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
ansond 0:a3fc1c6ef150 215 bool success = factory->setResourceValue(mapped_resource,message_value);
ansond 0:a3fc1c6ef150 216
ansond 0:a3fc1c6ef150 217 // end the resource value back over MQTT
ansond 0:a3fc1c6ef150 218 this->sendResult(message_name,message_verb,message_value,success);
ansond 0:a3fc1c6ef150 219 }
ansond 0:a3fc1c6ef150 220 }
ansond 31:e5950e0677be 221
ansond 31:e5950e0677be 222 // for Dimming, we also want to refresh our record (in whole) at the IOC
ansond 31:e5950e0677be 223 if (this->isDimmingResource(message_verb)) {
ansond 31:e5950e0677be 224 // send a fresh update to the IOC - just us...
ansond 31:e5950e0677be 225 int index = endpoint->indexOfPersonality((char *)message_name);
ansond 31:e5950e0677be 226 if (index >= 0) endpoint->updatePersonality(index);
ansond 31:e5950e0677be 227 }
ansond 0:a3fc1c6ef150 228 }
ansond 0:a3fc1c6ef150 229 }
ansond 0:a3fc1c6ef150 230 }
ansond 0:a3fc1c6ef150 231 }
ansond 0:a3fc1c6ef150 232
ansond 0:a3fc1c6ef150 233 // get a resource value
ansond 0:a3fc1c6ef150 234 if (message_type != NULL && strcmp(message_type,IOC_REQUEST_VALUE_VERB) == 0) {
ansond 0:a3fc1c6ef150 235 if (message_name != NULL) {
ansond 0:a3fc1c6ef150 236 // destined for our lights?
ansond 7:8a4a61202b36 237 int index = endpoint->indexOfPersonality((char *)message_name);
ansond 0:a3fc1c6ef150 238 if (index >= 0) {
ansond 0:a3fc1c6ef150 239 if (message_verb != NULL) {
ansond 0:a3fc1c6ef150 240 // map the parameter to one of ours
ansond 0:a3fc1c6ef150 241 char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
ansond 0:a3fc1c6ef150 242 if (mapped_resource != NULL) {
ansond 0:a3fc1c6ef150 243 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
ansond 0:a3fc1c6ef150 244 strcpy(message_value,factory->getResourceValue((char *)mapped_resource));
ansond 0:a3fc1c6ef150 245 bool success = false; if (message_value != NULL) success = true;
ansond 0:a3fc1c6ef150 246
ansond 0:a3fc1c6ef150 247 // log resource get
ansond 0:a3fc1c6ef150 248 if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value);
ansond 0:a3fc1c6ef150 249
ansond 0:a3fc1c6ef150 250 // end the resource value back over MQTT
ansond 0:a3fc1c6ef150 251 this->sendResult(message_name,message_verb,message_value,success);
ansond 0:a3fc1c6ef150 252 }
ansond 0:a3fc1c6ef150 253 }
ansond 0:a3fc1c6ef150 254 }
ansond 0:a3fc1c6ef150 255 }
ansond 0:a3fc1c6ef150 256 }
ansond 0:a3fc1c6ef150 257 }
ansond 0:a3fc1c6ef150 258 else {
ansond 0:a3fc1c6ef150 259 // message not bound for our node
ansond 0:a3fc1c6ef150 260 //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
ansond 0:a3fc1c6ef150 261 ;
ansond 0:a3fc1c6ef150 262 }
ansond 0:a3fc1c6ef150 263 }
ansond 0:a3fc1c6ef150 264
ansond 31:e5950e0677be 265 // is this the dimming resource?
ansond 31:e5950e0677be 266 bool MQTTTransport::isDimmingResource(char *resource) {
ansond 31:e5950e0677be 267 bool isDimming = false;
ansond 31:e5950e0677be 268
ansond 31:e5950e0677be 269 if (resource != NULL && strcmp(resource,"Dimming") == 0) isDimming = true;
ansond 31:e5950e0677be 270
ansond 31:e5950e0677be 271 return isDimming;
ansond 31:e5950e0677be 272 }
ansond 31:e5950e0677be 273
ansond 0:a3fc1c6ef150 274 // send result back to MQTT
ansond 0:a3fc1c6ef150 275 void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) {
ansond 0:a3fc1c6ef150 276 if (this->m_connected == true) {
ansond 0:a3fc1c6ef150 277 // send the response back to MQTT
ansond 0:a3fc1c6ef150 278 this->logger()->log("Sending Response back to MQTT...");
ansond 0:a3fc1c6ef150 279 char message[MAX_MQTT_MESSAGE_LENGTH+1];
ansond 0:a3fc1c6ef150 280 memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 0:a3fc1c6ef150 281 char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED;
ansond 0:a3fc1c6ef150 282 sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success);
ansond 42:297585f8e7bd 283 #ifdef NETWORK_MUTEX
ansond 0:a3fc1c6ef150 284 if (network_mutex != NULL) network_mutex->lock();
ansond 42:297585f8e7bd 285 #endif
ansond 0:a3fc1c6ef150 286 bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
ansond 42:297585f8e7bd 287 #ifdef NETWORK_MUTEX
ansond 0:a3fc1c6ef150 288 if (network_mutex != NULL) network_mutex->unlock();
ansond 42:297585f8e7bd 289 #endif
ansond 0:a3fc1c6ef150 290 if (sent) {
ansond 0:a3fc1c6ef150 291 this->logger()->log("Result sent successfully");
ansond 0:a3fc1c6ef150 292 this->logger()->blinkTransportTxLED();
ansond 0:a3fc1c6ef150 293 }
ansond 0:a3fc1c6ef150 294 else {
ansond 0:a3fc1c6ef150 295 this->logger()->log("Result send FAILED");
ansond 0:a3fc1c6ef150 296 }
ansond 0:a3fc1c6ef150 297 }
ansond 0:a3fc1c6ef150 298 else {
ansond 0:a3fc1c6ef150 299 // unable to send the response
ansond 0:a3fc1c6ef150 300 this->logger()->log("Unable to send response back to MQTT. Not connected.");
ansond 0:a3fc1c6ef150 301 }
ansond 0:a3fc1c6ef150 302 }
ansond 0:a3fc1c6ef150 303
ansond 0:a3fc1c6ef150 304 char *MQTTTransport::mapIOCResourceToEndpointResource(char *ioc_name) { return this->getMap()->iocNameToEndpointName(ioc_name); }
ansond 0:a3fc1c6ef150 305
ansond 0:a3fc1c6ef150 306 char *MQTTTransport::makeID(char *id_template,char *buffer) {
ansond 0:a3fc1c6ef150 307 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
ansond 0:a3fc1c6ef150 308 int instance_id = rand()%100;
ansond 0:a3fc1c6ef150 309 if (endpoint != NULL) instance_id = endpoint->getInstanceID();
ansond 0:a3fc1c6ef150 310 srand(time(0));
ansond 0:a3fc1c6ef150 311 srand(rand());
ansond 0:a3fc1c6ef150 312 sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE,instance_id);
ansond 0:a3fc1c6ef150 313 return buffer;
ansond 0:a3fc1c6ef150 314 }
ansond 0:a3fc1c6ef150 315
ansond 0:a3fc1c6ef150 316 // is this message a PONG message?
ansond 0:a3fc1c6ef150 317 bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
ansond 0:a3fc1c6ef150 318 bool isPong = false;
ansond 0:a3fc1c6ef150 319 char verb[MQTT_PING_VERB_LEN+1];
ansond 0:a3fc1c6ef150 320 char end[MQTT_PING_VERB_LEN+1];
ansond 0:a3fc1c6ef150 321 int counter = 0;
ansond 0:a3fc1c6ef150 322
ansond 0:a3fc1c6ef150 323 // clean
ansond 0:a3fc1c6ef150 324 memset(verb,0,MQTT_PING_VERB_LEN+1);
ansond 0:a3fc1c6ef150 325 memset(end,0,MQTT_PING_VERB_LEN+1);
ansond 0:a3fc1c6ef150 326
ansond 0:a3fc1c6ef150 327 // make sure this is for us...
ansond 0:a3fc1c6ef150 328 char *topic_ep_name = this->getEndpointNameFromTopic(topic);
ansond 0:a3fc1c6ef150 329 if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) {
ansond 0:a3fc1c6ef150 330 // parse the payload
ansond 0:a3fc1c6ef150 331 for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
ansond 0:a3fc1c6ef150 332 sscanf(payload,"%s%d",verb,&counter);
ansond 0:a3fc1c6ef150 333
ansond 0:a3fc1c6ef150 334 // check the contents to make sure its for us...
ansond 0:a3fc1c6ef150 335 //this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter);
ansond 0:a3fc1c6ef150 336 if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) {
ansond 0:a3fc1c6ef150 337 // its a PONG message to our PING...
ansond 0:a3fc1c6ef150 338 isPong = true;
ansond 0:a3fc1c6ef150 339 }
ansond 0:a3fc1c6ef150 340 }
ansond 0:a3fc1c6ef150 341
ansond 0:a3fc1c6ef150 342 // return isPong status
ansond 0:a3fc1c6ef150 343 return isPong;
ansond 0:a3fc1c6ef150 344 }
ansond 0:a3fc1c6ef150 345
ansond 0:a3fc1c6ef150 346
ansond 0:a3fc1c6ef150 347 // process this PONG message
ansond 0:a3fc1c6ef150 348 void MQTTTransport::processPongMessage(char *payload,int payload_length) {
ansond 0:a3fc1c6ef150 349 // DEBUG
ansond 0:a3fc1c6ef150 350 //this->logger()->log("Received PONG: counter=%d",this->m_ping_counter);
ansond 0:a3fc1c6ef150 351
ansond 0:a3fc1c6ef150 352 // simply increment the counter
ansond 0:a3fc1c6ef150 353 ++this->m_ping_counter;
ansond 0:a3fc1c6ef150 354
ansond 0:a3fc1c6ef150 355 // reset counter if maxed
ansond 0:a3fc1c6ef150 356 if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1;
ansond 0:a3fc1c6ef150 357 }
ansond 0:a3fc1c6ef150 358
ansond 0:a3fc1c6ef150 359 // send a PING message
ansond 0:a3fc1c6ef150 360 bool MQTTTransport::sendPingMessage() {
ansond 0:a3fc1c6ef150 361 bool sent = false;
ansond 0:a3fc1c6ef150 362 char message[MAX_MQTT_MESSAGE_LENGTH+1];
ansond 0:a3fc1c6ef150 363
ansond 0:a3fc1c6ef150 364 // initialize...
ansond 0:a3fc1c6ef150 365 memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 0:a3fc1c6ef150 366
ansond 0:a3fc1c6ef150 367 // build message
ansond 0:a3fc1c6ef150 368 sprintf(message,"ping:%s:%d:",this->m_endpoint_name,this->m_ping_counter);
ansond 0:a3fc1c6ef150 369
ansond 0:a3fc1c6ef150 370 // send the message over the ping/pong topic
ansond 0:a3fc1c6ef150 371 //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter);
ansond 42:297585f8e7bd 372 #ifdef NETWORK_MUTEX
ansond 0:a3fc1c6ef150 373 if (network_mutex != NULL) network_mutex->lock();
ansond 42:297585f8e7bd 374 #endif
ansond 0:a3fc1c6ef150 375 sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message));
ansond 42:297585f8e7bd 376 #ifdef NETWORK_MUTEX
ansond 0:a3fc1c6ef150 377 if (network_mutex != NULL) network_mutex->unlock();
ansond 42:297585f8e7bd 378 #endif
ansond 0:a3fc1c6ef150 379 if (sent) {
ansond 0:a3fc1c6ef150 380 // send succeeded
ansond 0:a3fc1c6ef150 381 //this->logger()->log("PING %d sent successfully",this->m_ping_counter);
ansond 0:a3fc1c6ef150 382 this->logger()->blinkTransportTxLED();
ansond 0:a3fc1c6ef150 383
ansond 0:a3fc1c6ef150 384 // wait for 1 second
ansond 0:a3fc1c6ef150 385 wait_ms(1000);
ansond 0:a3fc1c6ef150 386 }
ansond 0:a3fc1c6ef150 387 else {
ansond 20:ee03f10c074e 388 // fail silently now...
ansond 20:ee03f10c074e 389 ;
ansond 20:ee03f10c074e 390
ansond 0:a3fc1c6ef150 391 // send failed! - reconnect
ansond 20:ee03f10c074e 392 //this->logger()->log("PING send %d FAILED... (re)connecting...",this->m_ping_counter);
ansond 0:a3fc1c6ef150 393
ansond 0:a3fc1c6ef150 394 // attempt reconnect
ansond 20:ee03f10c074e 395 //this->logger()->log("PING send failed - re-connecting MQTT...");
ansond 20:ee03f10c074e 396 //this->disconnect();
ansond 20:ee03f10c074e 397 //sent = this->connect();
ansond 20:ee03f10c074e 398 //if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter);
ansond 20:ee03f10c074e 399 //else this->logger()->log("PING %d: resend failed giving up...",this->m_ping_counter);
ansond 0:a3fc1c6ef150 400 }
ansond 0:a3fc1c6ef150 401
ansond 0:a3fc1c6ef150 402 // return our status
ansond 0:a3fc1c6ef150 403 return sent;
ansond 0:a3fc1c6ef150 404 }
ansond 0:a3fc1c6ef150 405
ansond 29:ac6390032cec 406 static char _mqtt_id[MQTT_ENDPOINT_IDLEN+1];
ansond 29:ac6390032cec 407
ansond 0:a3fc1c6ef150 408 // connect up MQTT
ansond 0:a3fc1c6ef150 409 bool MQTTTransport::connect() {
ansond 29:ac6390032cec 410 memset(_mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));
ansond 0:a3fc1c6ef150 411 if (this->m_connected == false) {
ansond 0:a3fc1c6ef150 412 this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);
ansond 0:a3fc1c6ef150 413 this->m_mqtt = &_mqtt;
ansond 0:a3fc1c6ef150 414 if (this->m_mqtt != NULL) {
ansond 29:ac6390032cec 415 char *id = this->makeID(MQTT_ENDPOINT_ID,_mqtt_id);
ansond 0:a3fc1c6ef150 416 this->logger()->log("MQTT Connect: ID: %s...",id);
ansond 0:a3fc1c6ef150 417 if (this->m_mqtt->connect(id)) {
ansond 0:a3fc1c6ef150 418 this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
ansond 0:a3fc1c6ef150 419 if (this->m_mqtt->subscribe(this->getTopic())) {
ansond 0:a3fc1c6ef150 420 if (this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) {
ansond 0:a3fc1c6ef150 421 this->logger()->log("MQTT CONNECTED.");
ansond 0:a3fc1c6ef150 422 this->m_connected = true;
ansond 0:a3fc1c6ef150 423 }
ansond 0:a3fc1c6ef150 424 else {
ansond 0:a3fc1c6ef150 425 this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC);
ansond 0:a3fc1c6ef150 426 this->logger()->turnLEDRed();
ansond 0:a3fc1c6ef150 427 this->m_connected = false;
ansond 0:a3fc1c6ef150 428 }
ansond 0:a3fc1c6ef150 429 }
ansond 0:a3fc1c6ef150 430 else {
ansond 0:a3fc1c6ef150 431 this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
ansond 0:a3fc1c6ef150 432 this->logger()->turnLEDRed();
ansond 0:a3fc1c6ef150 433 this->m_connected = false;
ansond 0:a3fc1c6ef150 434 }
ansond 0:a3fc1c6ef150 435 }
ansond 0:a3fc1c6ef150 436 else {
ansond 0:a3fc1c6ef150 437 this->logger()->log("MQTT Connect: ID: %s FAILED",id);
ansond 0:a3fc1c6ef150 438 this->logger()->turnLEDRed();
ansond 0:a3fc1c6ef150 439 this->m_connected = false;
ansond 0:a3fc1c6ef150 440 }
ansond 0:a3fc1c6ef150 441 }
ansond 0:a3fc1c6ef150 442 else {
ansond 0:a3fc1c6ef150 443 this->logger()->log("MQTT Unable to allocate new instance");
ansond 0:a3fc1c6ef150 444 this->logger()->turnLEDRed();
ansond 0:a3fc1c6ef150 445 this->m_connected = false;
ansond 0:a3fc1c6ef150 446 }
ansond 0:a3fc1c6ef150 447 }
ansond 0:a3fc1c6ef150 448 else {
ansond 0:a3fc1c6ef150 449 this->logger()->log("MQTT already connected (OK)");
ansond 0:a3fc1c6ef150 450 }
ansond 0:a3fc1c6ef150 451 return this->m_connected;
ansond 0:a3fc1c6ef150 452 }
ansond 0:a3fc1c6ef150 453
ansond 0:a3fc1c6ef150 454 // disconnect from MQTT
ansond 0:a3fc1c6ef150 455 bool MQTTTransport::disconnect() {
ansond 0:a3fc1c6ef150 456 if (this->m_mqtt != NULL) {
ansond 0:a3fc1c6ef150 457 this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
ansond 42:297585f8e7bd 458 #ifdef NETWORK_MUTEX
ansond 0:a3fc1c6ef150 459 if (network_mutex != NULL) network_mutex->lock();
ansond 42:297585f8e7bd 460 #endif
ansond 0:a3fc1c6ef150 461 this->m_mqtt->unsubscribe(this->getTopic());
ansond 42:297585f8e7bd 462 #ifdef NETWORK_MUTEX
ansond 0:a3fc1c6ef150 463 if (network_mutex != NULL) network_mutex->unlock();
ansond 42:297585f8e7bd 464 #endif
ansond 0:a3fc1c6ef150 465 this->logger()->log("MQTT Disconnecting...");
ansond 0:a3fc1c6ef150 466 this->m_mqtt->disconnect();
ansond 0:a3fc1c6ef150 467 }
ansond 0:a3fc1c6ef150 468 else {
ansond 0:a3fc1c6ef150 469 this->logger()->log("MQTT already disconnected (OK)");
ansond 0:a3fc1c6ef150 470 }
ansond 0:a3fc1c6ef150 471 this->m_connected = false;
ansond 0:a3fc1c6ef150 472 return true;
ansond 0:a3fc1c6ef150 473 }
ansond 0:a3fc1c6ef150 474
ansond 0:a3fc1c6ef150 475 // check transport and process stuff
ansond 0:a3fc1c6ef150 476 void MQTTTransport::checkAndProcess() {
ansond 0:a3fc1c6ef150 477 // process any MQTT messages
ansond 0:a3fc1c6ef150 478 if (this->m_mqtt != NULL && this->m_connected == true) {
ansond 42:297585f8e7bd 479 #ifdef NETWORK_MUTEX
ansond 0:a3fc1c6ef150 480 if (network_mutex != NULL) network_mutex->lock();
ansond 42:297585f8e7bd 481 #endif
ansond 0:a3fc1c6ef150 482 bool connected = this->m_mqtt->loop();
ansond 42:297585f8e7bd 483 #ifdef NETWORK_MUTEX
ansond 0:a3fc1c6ef150 484 if (network_mutex != NULL) network_mutex->unlock();
ansond 42:297585f8e7bd 485 #endif
ansond 0:a3fc1c6ef150 486 if (connected) {
ansond 0:a3fc1c6ef150 487 this->logger()->blinkTransportRxLED();
ansond 0:a3fc1c6ef150 488 }
ansond 32:9a024a6af2fb 489 //else {
ansond 32:9a024a6af2fb 490 // this->logger()->log("Attempting reconnection to MQTT in checkAndProcess...");
ansond 32:9a024a6af2fb 491 // this->disconnect();
ansond 32:9a024a6af2fb 492 // Thread::wait(15000);
ansond 32:9a024a6af2fb 493 // this->connect();
ansond 32:9a024a6af2fb 494 //}
ansond 0:a3fc1c6ef150 495 }
ansond 0:a3fc1c6ef150 496
ansond 0:a3fc1c6ef150 497 // send a PING if time for it
ansond 0:a3fc1c6ef150 498 --this->m_ping_countdown;
ansond 0:a3fc1c6ef150 499 if (this->m_ping_countdown <= 0) {
ansond 0:a3fc1c6ef150 500 //this->logger()->log("MQTT: Sending PING...");
ansond 0:a3fc1c6ef150 501 this->m_ping_countdown = MQTT_PING_COUNTDOWN;
ansond 0:a3fc1c6ef150 502 this->sendPingMessage();
ansond 0:a3fc1c6ef150 503 }
ansond 0:a3fc1c6ef150 504 }