MBED_DEMOS / Mbed 2 deprecated mbed_mqtt_endpoint_ublox_cellular

Dependencies:   C027_Support C12832 StatusReporter LM75B MQTT-ansond c027_radios endpoint_core endpoint_mqtt mbed-rtos mbed

Committer:
ansond
Date:
Fri Mar 21 05:33:18 2014 +0000
Revision:
161:eea2bbfbb387
Parent:
160:3f373ec7ab3c
Child:
162:c9e9926cdc79
updates

Who changed what in which revision?

User | Revision | Line number | New contents of line
ansond 0:ae2a45502448 1 /* Copyright C2013 Doug Anson, MIT License
ansond 0:ae2a45502448 2 *
ansond 0:ae2a45502448 3 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software
ansond 0:ae2a45502448 4 * and associated documentation files the "Software", to deal in the Software without restriction,
ansond 0:ae2a45502448 5 * including without limitation the rights to use, copy, modify, merge, publish, distribute,
ansond 0:ae2a45502448 6 * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
ansond 0:ae2a45502448 7 * furnished to do so, subject to the following conditions:
ansond 0:ae2a45502448 8 *
ansond 0:ae2a45502448 9 * The above copyright notice and this permission notice shall be included in all copies or
ansond 0:ae2a45502448 10 * substantial portions of the Software.
ansond 0:ae2a45502448 11 *
ansond 0:ae2a45502448 12 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
ansond 0:ae2a45502448 13 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
ansond 0:ae2a45502448 14 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
ansond 0:ae2a45502448 15 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
ansond 0:ae2a45502448 16 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
ansond 0:ae2a45502448 17 */
ansond 0:ae2a45502448 18
ansond 136:ea8f3900bc13 19 #include "mbed.h"
ansond 136:ea8f3900bc13 20 #include "rtos.h"
ansond 136:ea8f3900bc13 21
ansond 0:ae2a45502448 22 #include "MQTTTransport.h"
ansond 0:ae2a45502448 23
ansond 4:11f00b499106 24 // Endpoint Support
ansond 0:ae2a45502448 25 #include "MBEDEndpoint.h"
ansond 0:ae2a45502448 26
ansond 8:45f9a920e82c 27 // EmulatedResourceFactory support
ansond 8:45f9a920e82c 28 #include "EmulatedResourceFactory.h"
ansond 136:ea8f3900bc13 29
ansond 136:ea8f3900bc13 30 // Network mutex
ansond 136:ea8f3900bc13 31 extern Mutex *network_mutex;
ansond 69:090e16acccb7 32
ansond 0:ae2a45502448 33 // our transport instance
ansond 15:e44d75d95b38 34 MQTTTransport *_mqtt_instance = NULL;
ansond 0:ae2a45502448 35
ansond 0:ae2a45502448 36 // MQTT callback to handle received messages
ansond 0:ae2a45502448 37 void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
ansond 131:27f29e230bbb 38 char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
ansond 131:27f29e230bbb 39 char rcv_topic[MQTT_IOC_TOPIC_LEN+1];
ansond 131:27f29e230bbb 40
ansond 131:27f29e230bbb 41 memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 131:27f29e230bbb 42 memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
ansond 131:27f29e230bbb 43 memcpy(buffer,payload,length);
ansond 131:27f29e230bbb 44 strcpy(rcv_topic,topic);
ansond 131:27f29e230bbb 45
ansond 129:c4fa24308e33 46 if (_mqtt_instance != NULL) {
ansond 131:27f29e230bbb 47 if (_mqtt_instance->isPongMessage(rcv_topic,buffer,length)) {
ansond 131:27f29e230bbb 48 _mqtt_instance->processPongMessage(buffer,length);
ansond 131:27f29e230bbb 49 }
ansond 131:27f29e230bbb 50 else {
ansond 131:27f29e230bbb 51 memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 131:27f29e230bbb 52 memset(rcv_topic,0,MQTT_IOC_TOPIC_LEN+1);
ansond 131:27f29e230bbb 53 memcpy(buffer,payload,length);
ansond 131:27f29e230bbb 54 strcpy(rcv_topic,topic);
ansond 131:27f29e230bbb 55 _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(rcv_topic),buffer,length);
ansond 131:27f29e230bbb 56 }
ansond 129:c4fa24308e33 57 }
ansond 0:ae2a45502448 58 }
ansond 0:ae2a45502448 59
ansond 0:ae2a45502448 60 // our MQTT client endpoint
ansond 0:ae2a45502448 61 PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
ansond 0:ae2a45502448 62
ansond 0:ae2a45502448 63 // default constructor
ansond 15:e44d75d95b38 64 MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) {
ansond 0:ae2a45502448 65 this->m_mqtt = NULL;
ansond 15:e44d75d95b38 66 _mqtt_instance = this;
ansond 15:e44d75d95b38 67 this->m_map = map;
ansond 129:c4fa24308e33 68 this->m_ping_counter = 1;
ansond 129:c4fa24308e33 69 this->m_ping_countdown = MQTT_PING_COUNTDOWN;
ansond 8:45f9a920e82c 70 this->initTopic();
ansond 0:ae2a45502448 71 }
ansond 0:ae2a45502448 72
ansond 0:ae2a45502448 73 // default destructor
ansond 0:ae2a45502448 74 MQTTTransport::~MQTTTransport() {
ansond 0:ae2a45502448 75 this->disconnect();
ansond 0:ae2a45502448 76 }
ansond 131:27f29e230bbb 77
ansond 8:45f9a920e82c 78 // init our topic
ansond 8:45f9a920e82c 79 void MQTTTransport::initTopic() {
ansond 8:45f9a920e82c 80 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
ansond 8:45f9a920e82c 81 char *endpoint_name = endpoint->getEndpointName();
ansond 8:45f9a920e82c 82 memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1);
ansond 8:45f9a920e82c 83 sprintf(this->m_topic,MQTT_IOC_TOPIC,endpoint_name);
ansond 8:45f9a920e82c 84 }
ansond 8:45f9a920e82c 85
ansond 8:45f9a920e82c 86 // get our topic
ansond 8:45f9a920e82c 87 char *MQTTTransport::getTopic() { return this->m_topic; }
ansond 8:45f9a920e82c 88
ansond 15:e44d75d95b38 89 // get the IOC <--> MBED resource map
ansond 15:e44d75d95b38 90 MBEDToIOCResourceMap *MQTTTransport::getMap() { return this->m_map; }
ansond 15:e44d75d95b38 91
ansond 6:34c07e145caa 92 // pull the endpoint name from the MQTT topic
ansond 6:34c07e145caa 93 char *MQTTTransport::getEndpointNameFromTopic(char *topic) {
ansond 48:d3663434128d 94 if (topic != NULL) {
ansond 48:d3663434128d 95 memset(this->m_endpoint_name,0,LIGHT_NAME_LEN+1);
ansond 69:090e16acccb7 96 char trash[MQTT_IOC_TOPIC_LEN+1];
ansond 69:090e16acccb7 97 char ep[MQTT_IOC_TOPIC_LEN+1];
ansond 69:090e16acccb7 98 memset(trash,0,MQTT_IOC_TOPIC_LEN+1);
ansond 69:090e16acccb7 99 memset(ep,0,MQTT_IOC_TOPIC_LEN+1);
ansond 69:090e16acccb7 100 bool done = false;
ansond 69:090e16acccb7 101 int length = 0; if (topic != NULL) length = strlen(topic);
ansond 69:090e16acccb7 102 for(int i=length-1;i>=0 && !done;--i) if (topic[i] == '/') { topic[i] = ' ' ; done = true; }
ansond 69:090e16acccb7 103 sscanf(topic,"%s%s",trash,ep);
ansond 160:3f373ec7ab3c 104 //this->logger()->log("MQTT: Topic:[%s] trash:[%s] ep:[%s]",topic,trash,ep);
ansond 69:090e16acccb7 105 if (strlen(ep) > 0) {
ansond 69:090e16acccb7 106 if (strcmp(ep,MQTT_IOC_ALL_ENDPOINT) != 0) {
ansond 69:090e16acccb7 107 // just insert the name and let the parser determine if its for us or not...
ansond 69:090e16acccb7 108 strncpy(this->m_endpoint_name,ep,strlen(ep));
ansond 69:090e16acccb7 109 }
ansond 69:090e16acccb7 110 else {
ansond 69:090e16acccb7 111 // this is a broadcast message - so we need to process it
ansond 69:090e16acccb7 112 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
ansond 69:090e16acccb7 113 char *endpoint_name = endpoint->getEndpointName();
ansond 69:090e16acccb7 114 strcpy(this->m_endpoint_name,endpoint_name);
ansond 69:090e16acccb7 115 }
ansond 48:d3663434128d 116 }
ansond 89:6e7dd4cba216 117 //this->logger()->log("MQTT Topic (discovered): %s Original: %s",this->m_endpoint_name,topic);
ansond 48:d3663434128d 118 return this->m_endpoint_name;
ansond 48:d3663434128d 119 }
ansond 89:6e7dd4cba216 120 //this->logger()->log("MQTT Topic (discovered): NULL Original: %s",topic);
ansond 48:d3663434128d 121 return NULL;
ansond 6:34c07e145caa 122 }
ansond 6:34c07e145caa 123
ansond 0:ae2a45502448 124 // process a MQTT Message
ansond 69:090e16acccb7 125 void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int payload_length) {
ansond 69:090e16acccb7 126 char message_type[MQTT_PAYLOAD_SEGMENT_LEN+1];
ansond 69:090e16acccb7 127 char message_verb[MQTT_PAYLOAD_SEGMENT_LEN+1];
ansond 69:090e16acccb7 128 char message_value[MQTT_PAYLOAD_SEGMENT_LEN+1];
ansond 69:090e16acccb7 129 char message_opt[MQTT_PAYLOAD_SEGMENT_LEN+1];
ansond 69:090e16acccb7 130
ansond 69:090e16acccb7 131 // initialize
ansond 69:090e16acccb7 132 memset(message_type,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
ansond 69:090e16acccb7 133 memset(message_verb,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
ansond 69:090e16acccb7 134 memset(message_value,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
ansond 69:090e16acccb7 135 memset(message_opt,0,MQTT_PAYLOAD_SEGMENT_LEN+1);
ansond 23:793b2898522e 136
ansond 6:34c07e145caa 137 // get our endpoint
ansond 0:ae2a45502448 138 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
ansond 7:f570eb3f38cd 139 char *endpoint_name = endpoint->getEndpointName();
ansond 6:34c07e145caa 140
ansond 0:ae2a45502448 141 // DEBUG
ansond 8:45f9a920e82c 142 //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name);
ansond 7:f570eb3f38cd 143
ansond 3:3db076b9d380 144 // only respond if its for our node
ansond 8:45f9a920e82c 145 if (strcmp(endpoint_name,message_name) == 0) {
ansond 69:090e16acccb7 146 // format of the MQTT message: message_type:verb|Parameter_X:value|keyword:optional_data
ansond 69:090e16acccb7 147 char buffer[MAX_MQTT_MESSAGE_LENGTH+1];
ansond 69:090e16acccb7 148 memset(buffer,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 69:090e16acccb7 149 memcpy(buffer,payload,payload_length);
ansond 69:090e16acccb7 150 int count = 0; for(int i=0;i<payload_length;++i) if (payload[i] == ':') ++count;
ansond 72:ffd155ec8b93 151 for(int i=0;i<payload_length;++i) {
ansond 72:ffd155ec8b93 152 if (buffer[i] == ':') {
ansond 72:ffd155ec8b93 153 if (i < (payload_length-1)) buffer[i] = ' ';
ansond 72:ffd155ec8b93 154 else buffer[i] = '\0';
ansond 72:ffd155ec8b93 155 }
ansond 72:ffd155ec8b93 156 }
ansond 160:3f373ec7ab3c 157 if (count == 1) sscanf(buffer,"%s%s",message_type,message_verb);
ansond 160:3f373ec7ab3c 158 if (count == 2) sscanf(buffer,"%s%s%s",message_type,message_verb,message_value);
ansond 160:3f373ec7ab3c 159 if (count == 3) sscanf(buffer,"%s%s%s%s",message_type,message_verb,message_value,message_opt);
ansond 69:090e16acccb7 160
ansond 7:f570eb3f38cd 161 // DEBUG
ansond 72:ffd155ec8b93 162 //this->logger()->log("Raw Payload: %s, length: %d",payload,payload_length);
ansond 72:ffd155ec8b93 163 //this->logger()->log("Buffer: %s, length: %d",buffer,strlen(buffer));
ansond 72:ffd155ec8b93 164 //this->logger()->log("Parsed Payload: Type: [%s] Name: [%s] Verb: [%s] Value: [%s]",message_type,message_name,message_verb,message_value);
ansond 8:45f9a920e82c 165
ansond 3:3db076b9d380 166 // load endpoints
ansond 15:e44d75d95b38 167 if (message_type != NULL && strcmp(message_type,IOC_ENDPOINT_VERB) == 0) { // Endpoint
ansond 15:e44d75d95b38 168 if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_LOAD_ALL_VERB) == 0) { // load
ansond 15:e44d75d95b38 169 if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all
ansond 15:e44d75d95b38 170 // load up our endpoints
ansond 15:e44d75d95b38 171 endpoint->loadEndpoints();
ansond 0:ae2a45502448 172 }
ansond 93:e3b732068ae9 173 else if (message_value != NULL && strcmp(message_value,this->m_endpoint_name) == 0) {
ansond 93:e3b732068ae9 174 // load up our endpoints (us only)
ansond 93:e3b732068ae9 175 endpoint->loadEndpoints();
ansond 93:e3b732068ae9 176 }
ansond 0:ae2a45502448 177 }
ansond 15:e44d75d95b38 178
ansond 15:e44d75d95b38 179 else if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) { // update
ansond 15:e44d75d95b38 180 if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all
ansond 15:e44d75d95b38 181 // update our endpoints
ansond 23:793b2898522e 182 endpoint->updateEndpoints();
ansond 15:e44d75d95b38 183 }
ansond 15:e44d75d95b38 184 else {
ansond 15:e44d75d95b38 185 // update just our endpoint
ansond 15:e44d75d95b38 186 int index = -1;
ansond 15:e44d75d95b38 187 if (message_name != NULL) {
ansond 15:e44d75d95b38 188 index = endpoint->indexOfLight((char *)message_name);
ansond 15:e44d75d95b38 189 if (index >= 0) {
ansond 15:e44d75d95b38 190 if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) {
ansond 15:e44d75d95b38 191 // update our endpoint
ansond 23:793b2898522e 192 endpoint->updateEndpoints(index);
ansond 15:e44d75d95b38 193 }
ansond 3:3db076b9d380 194 }
ansond 3:3db076b9d380 195 }
ansond 3:3db076b9d380 196 }
ansond 3:3db076b9d380 197 }
ansond 0:ae2a45502448 198 }
ansond 3:3db076b9d380 199
ansond 3:3db076b9d380 200 // change a resource value
ansond 13:25448d92c205 201 if (message_type != NULL && strcmp(message_type,IOC_CHANGE_VERB) == 0) {
ansond 8:45f9a920e82c 202 if (message_name != NULL) {
ansond 3:3db076b9d380 203 // destined for our lights?
ansond 3:3db076b9d380 204 int index = endpoint->indexOfLight((char *)message_name);
ansond 3:3db076b9d380 205 if (index >= 0) {
ansond 3:3db076b9d380 206 if (message_verb != NULL) {
ansond 3:3db076b9d380 207 // map the parameter to one of ours
ansond 14:0a6497a380a4 208 char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
ansond 3:3db076b9d380 209 if (mapped_resource != NULL) {
ansond 3:3db076b9d380 210 if (message_value != NULL) {
ansond 8:45f9a920e82c 211 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
ansond 8:45f9a920e82c 212 bool success = factory->setResourceValue(mapped_resource,message_value);
ansond 8:45f9a920e82c 213
ansond 8:45f9a920e82c 214 // end the resource value back over MQTT
ansond 8:45f9a920e82c 215 this->sendResult(message_name,message_verb,message_value,success);
ansond 3:3db076b9d380 216 }
ansond 3:3db076b9d380 217 }
ansond 3:3db076b9d380 218 }
ansond 3:3db076b9d380 219 }
ansond 3:3db076b9d380 220 }
ansond 3:3db076b9d380 221 }
ansond 3:3db076b9d380 222
ansond 3:3db076b9d380 223 // get a resource value
ansond 13:25448d92c205 224 if (message_type != NULL && strcmp(message_type,IOC_REQUEST_VALUE_VERB) == 0) {
ansond 3:3db076b9d380 225 if (message_name != NULL) {
ansond 3:3db076b9d380 226 // destined for our lights?
ansond 3:3db076b9d380 227 int index = endpoint->indexOfLight((char *)message_name);
ansond 3:3db076b9d380 228 if (index >= 0) {
ansond 3:3db076b9d380 229 if (message_verb != NULL) {
ansond 3:3db076b9d380 230 // map the parameter to one of ours
ansond 13:25448d92c205 231 char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
ansond 3:3db076b9d380 232 if (mapped_resource != NULL) {
ansond 8:45f9a920e82c 233 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
ansond 69:090e16acccb7 234 strcpy(message_value,factory->getResourceValue((char *)mapped_resource));
ansond 8:45f9a920e82c 235 bool success = false; if (message_value != NULL) success = true;
ansond 8:45f9a920e82c 236
ansond 8:45f9a920e82c 237 // log resource get
ansond 8:45f9a920e82c 238 if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value);
ansond 3:3db076b9d380 239
ansond 3:3db076b9d380 240 // end the resource value back over MQTT
ansond 8:45f9a920e82c 241 this->sendResult(message_name,message_verb,message_value,success);
ansond 3:3db076b9d380 242 }
ansond 3:3db076b9d380 243 }
ansond 3:3db076b9d380 244 }
ansond 0:ae2a45502448 245 }
ansond 0:ae2a45502448 246 }
ansond 0:ae2a45502448 247 }
ansond 3:3db076b9d380 248 else {
ansond 3:3db076b9d380 249 // message not bound for our node
ansond 131:27f29e230bbb 250 //this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
ansond 131:27f29e230bbb 251 ;
ansond 7:f570eb3f38cd 252 }
ansond 0:ae2a45502448 253 }
ansond 0:ae2a45502448 254
ansond 8:45f9a920e82c 255 // send result back to MQTT
ansond 8:45f9a920e82c 256 void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) {
ansond 8:45f9a920e82c 257 if (this->m_connected == true) {
ansond 8:45f9a920e82c 258 // send the response back to MQTT
ansond 8:45f9a920e82c 259 this->logger()->log("Sending Response back to MQTT...");
ansond 8:45f9a920e82c 260 char message[MAX_MQTT_MESSAGE_LENGTH+1];
ansond 8:45f9a920e82c 261 memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 13:25448d92c205 262 char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED;
ansond 13:25448d92c205 263 sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success);
ansond 136:ea8f3900bc13 264 if (network_mutex != NULL) network_mutex->lock();
ansond 8:45f9a920e82c 265 bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
ansond 136:ea8f3900bc13 266 if (network_mutex != NULL) network_mutex->unlock();
ansond 8:45f9a920e82c 267 if (sent) {
ansond 8:45f9a920e82c 268 this->logger()->log("Result sent successfully");
ansond 36:210f3956038c 269 this->logger()->blinkTransportTxLED();
ansond 8:45f9a920e82c 270 }
ansond 8:45f9a920e82c 271 else {
ansond 8:45f9a920e82c 272 this->logger()->log("Result send FAILED");
ansond 8:45f9a920e82c 273 }
ansond 8:45f9a920e82c 274 }
ansond 8:45f9a920e82c 275 else {
ansond 8:45f9a920e82c 276 // unable to send the response
ansond 8:45f9a920e82c 277 this->logger()->log("Unable to send response back to MQTT. Not connected.");
ansond 8:45f9a920e82c 278 }
ansond 0:ae2a45502448 279 }
ansond 0:ae2a45502448 280
ansond 15:e44d75d95b38 281 char *MQTTTransport::mapIOCResourceToEndpointResource(char *ioc_name) { return this->getMap()->iocNameToEndpointName(ioc_name); }
ansond 0:ae2a45502448 282
ansond 0:ae2a45502448 283 char *MQTTTransport::makeID(char *id_template,char *buffer) {
ansond 0:ae2a45502448 284 srand(time(0));
ansond 0:ae2a45502448 285 srand(rand());
ansond 0:ae2a45502448 286 sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE);
ansond 0:ae2a45502448 287 return buffer;
ansond 0:ae2a45502448 288 }
ansond 0:ae2a45502448 289
ansond 129:c4fa24308e33 290 // is this message a PONG message?
ansond 129:c4fa24308e33 291 bool MQTTTransport::isPongMessage(char *topic,char *payload,int payload_length) {
ansond 129:c4fa24308e33 292 bool isPong = false;
ansond 129:c4fa24308e33 293 char verb[MQTT_PING_VERB_LEN+1];
ansond 131:27f29e230bbb 294 char end[MQTT_PING_VERB_LEN+1];
ansond 129:c4fa24308e33 295 int counter = 0;
ansond 129:c4fa24308e33 296
ansond 129:c4fa24308e33 297 // clean
ansond 129:c4fa24308e33 298 memset(verb,0,MQTT_PING_VERB_LEN+1);
ansond 131:27f29e230bbb 299 memset(end,0,MQTT_PING_VERB_LEN+1);
ansond 131:27f29e230bbb 300
ansond 131:27f29e230bbb 301 // make sure this is for us...
ansond 131:27f29e230bbb 302 char *topic_ep_name = this->getEndpointNameFromTopic(topic);
ansond 131:27f29e230bbb 303 if (topic_ep_name != NULL && strcmp(topic_ep_name,this->m_endpoint_name) == 0) {
ansond 129:c4fa24308e33 304 // parse the payload
ansond 129:c4fa24308e33 305 for(int i=0;payload != NULL && i<payload_length;++i) if (payload[i] == ':') payload[i] = ' ';
ansond 131:27f29e230bbb 306 sscanf(payload,"%s%d",verb,&counter);
ansond 129:c4fa24308e33 307
ansond 129:c4fa24308e33 308 // check the contents to make sure its for us...
ansond 132:563a1ee99efc 309 //this->logger()->log("isPongMessage: verb: %s counter %d ping_counter: %d",verb,counter,this->m_ping_counter);
ansond 131:27f29e230bbb 310 if (strcmp(verb,"pong") == 0 && counter == this->m_ping_counter) {
ansond 129:c4fa24308e33 311 // its a PONG message to our PING...
ansond 129:c4fa24308e33 312 isPong = true;
ansond 129:c4fa24308e33 313 }
ansond 129:c4fa24308e33 314 }
ansond 129:c4fa24308e33 315
ansond 129:c4fa24308e33 316 // return isPong status
ansond 129:c4fa24308e33 317 return isPong;
ansond 129:c4fa24308e33 318 }
ansond 129:c4fa24308e33 319
ansond 129:c4fa24308e33 320
ansond 129:c4fa24308e33 321 // process this PONG message
ansond 129:c4fa24308e33 322 void MQTTTransport::processPongMessage(char *payload,int payload_length) {
ansond 129:c4fa24308e33 323 // DEBUG
ansond 133:e69d03f4eb5c 324 //this->logger()->log("Received PONG: counter=%d",this->m_ping_counter);
ansond 129:c4fa24308e33 325
ansond 129:c4fa24308e33 326 // simply increment the counter
ansond 129:c4fa24308e33 327 ++this->m_ping_counter;
ansond 129:c4fa24308e33 328
ansond 129:c4fa24308e33 329 // reset counter if maxed
ansond 129:c4fa24308e33 330 if (this->m_ping_counter >= MQTT_MAX_COUNTER) this->m_ping_counter = 1;
ansond 129:c4fa24308e33 331 }
ansond 129:c4fa24308e33 332
ansond 129:c4fa24308e33 333 // send a PING message
ansond 129:c4fa24308e33 334 bool MQTTTransport::sendPingMessage() {
ansond 129:c4fa24308e33 335 bool sent = false;
ansond 129:c4fa24308e33 336 char message[MAX_MQTT_MESSAGE_LENGTH+1];
ansond 129:c4fa24308e33 337
ansond 129:c4fa24308e33 338 // initialize...
ansond 129:c4fa24308e33 339 memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 129:c4fa24308e33 340
ansond 129:c4fa24308e33 341 // build message
ansond 132:563a1ee99efc 342 sprintf(message,"ping:%s:%d:",this->m_endpoint_name,this->m_ping_counter);
ansond 129:c4fa24308e33 343
ansond 129:c4fa24308e33 344 // send the message over the ping/pong topic
ansond 133:e69d03f4eb5c 345 //this->logger()->log("Sending PING: counter=%d",this->m_ping_counter);
ansond 136:ea8f3900bc13 346 if (network_mutex != NULL) network_mutex->lock();
ansond 132:563a1ee99efc 347 sent = this->m_mqtt->publish(MQTT_PING_SEND_TOPIC,message,strlen(message));
ansond 136:ea8f3900bc13 348 if (network_mutex != NULL) network_mutex->unlock();
ansond 129:c4fa24308e33 349 if (sent) {
ansond 129:c4fa24308e33 350 // send succeeded
ansond 132:563a1ee99efc 351 //this->logger()->log("PING %d sent successfully",this->m_ping_counter);
ansond 129:c4fa24308e33 352 this->logger()->blinkTransportTxLED();
ansond 131:27f29e230bbb 353
ansond 131:27f29e230bbb 354 // wait for 1 second
ansond 131:27f29e230bbb 355 wait_ms(1000);
ansond 129:c4fa24308e33 356 }
ansond 129:c4fa24308e33 357 else {
ansond 129:c4fa24308e33 358 // send failed! - reconnect
ansond 129:c4fa24308e33 359 this->logger()->log("PING send %d FAILED... (re)connecting...",this->m_ping_counter);
ansond 129:c4fa24308e33 360
ansond 129:c4fa24308e33 361 // attempt reconnect
ansond 148:b09187a0fa77 362 this->logger()->log("PING send failed - re-connecting MQTT...");
ansond 129:c4fa24308e33 363 this->disconnect();
ansond 129:c4fa24308e33 364 sent = this->connect();
ansond 129:c4fa24308e33 365 if (sent) this->logger()->log("PING %d: MQTT reconnection successful...",this->m_ping_counter);
ansond 148:b09187a0fa77 366 else this->logger()->log("PING %d: resend failed giving up...",this->m_ping_counter);
ansond 129:c4fa24308e33 367 }
ansond 129:c4fa24308e33 368
ansond 129:c4fa24308e33 369 // return our status
ansond 129:c4fa24308e33 370 return sent;
ansond 129:c4fa24308e33 371 }
ansond 129:c4fa24308e33 372
ansond 0:ae2a45502448 373 // connect up MQTT
ansond 0:ae2a45502448 374 bool MQTTTransport::connect() {
ansond 136:ea8f3900bc13 375 if (network_mutex != NULL) network_mutex->lock();
ansond 0:ae2a45502448 376 char mqtt_id[MQTT_ENDPOINT_IDLEN+1];
ansond 0:ae2a45502448 377 memset(mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));
ansond 0:ae2a45502448 378 if (this->m_connected == false) {
ansond 0:ae2a45502448 379 this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);
ansond 0:ae2a45502448 380 this->m_mqtt = &_mqtt;
ansond 0:ae2a45502448 381 if (this->m_mqtt != NULL) {
ansond 0:ae2a45502448 382 char *id = this->makeID(MQTT_ENDPOINT_ID,mqtt_id);
ansond 0:ae2a45502448 383 this->logger()->log("MQTT Connect: ID: %s...",id);
ansond 0:ae2a45502448 384 if (this->m_mqtt->connect(id)) {
ansond 8:45f9a920e82c 385 this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
ansond 8:45f9a920e82c 386 if (this->m_mqtt->subscribe(this->getTopic())) {
ansond 161:eea2bbfbb387 387 if (this->m_mqtt->subscribe(MQTT_IOC_ALL_TOPIC)) {
ansond 161:eea2bbfbb387 388 this->logger()->log("MQTT CONNECTED.");
ansond 161:eea2bbfbb387 389 this->m_connected = true;
ansond 161:eea2bbfbb387 390 }
ansond 161:eea2bbfbb387 391 else {
ansond 161:eea2bbfbb387 392 this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_ALL_TOPIC);
ansond 161:eea2bbfbb387 393 this->logger()->turnLEDRed();
ansond 161:eea2bbfbb387 394 this->m_connected = false;
ansond 161:eea2bbfbb387 395 }
ansond 0:ae2a45502448 396 }
ansond 0:ae2a45502448 397 else {
ansond 131:27f29e230bbb 398 this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
ansond 0:ae2a45502448 399 this->logger()->turnLEDRed();
ansond 7:f570eb3f38cd 400 this->m_connected = false;
ansond 0:ae2a45502448 401 }
ansond 0:ae2a45502448 402 }
ansond 0:ae2a45502448 403 else {
ansond 0:ae2a45502448 404 this->logger()->log("MQTT Connect: ID: %s FAILED",id);
ansond 0:ae2a45502448 405 this->logger()->turnLEDRed();
ansond 7:f570eb3f38cd 406 this->m_connected = false;
ansond 0:ae2a45502448 407 }
ansond 0:ae2a45502448 408 }
ansond 0:ae2a45502448 409 else {
ansond 0:ae2a45502448 410 this->logger()->log("MQTT Unable to allocate new instance");
ansond 0:ae2a45502448 411 this->logger()->turnLEDRed();
ansond 7:f570eb3f38cd 412 this->m_connected = false;
ansond 0:ae2a45502448 413 }
ansond 0:ae2a45502448 414 }
ansond 0:ae2a45502448 415 else {
ansond 0:ae2a45502448 416 this->logger()->log("MQTT already connected (OK)");
ansond 0:ae2a45502448 417 }
ansond 136:ea8f3900bc13 418 if (network_mutex != NULL) network_mutex->unlock();
ansond 0:ae2a45502448 419 return this->m_connected;
ansond 0:ae2a45502448 420 }
ansond 0:ae2a45502448 421
ansond 0:ae2a45502448 422 // disconnect from MQTT
ansond 0:ae2a45502448 423 bool MQTTTransport::disconnect() {
ansond 0:ae2a45502448 424 if (this->m_mqtt != NULL) {
ansond 8:45f9a920e82c 425 this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
ansond 136:ea8f3900bc13 426 if (network_mutex != NULL) network_mutex->lock();
ansond 8:45f9a920e82c 427 this->m_mqtt->unsubscribe(this->getTopic());
ansond 136:ea8f3900bc13 428 if (network_mutex != NULL) network_mutex->unlock();
ansond 0:ae2a45502448 429 this->logger()->log("MQTT Disconnecting...");
ansond 0:ae2a45502448 430 this->m_mqtt->disconnect();
ansond 0:ae2a45502448 431 }
ansond 0:ae2a45502448 432 else {
ansond 0:ae2a45502448 433 this->logger()->log("MQTT already disconnected (OK)");
ansond 0:ae2a45502448 434 }
ansond 0:ae2a45502448 435 this->m_connected = false;
ansond 0:ae2a45502448 436 return true;
ansond 0:ae2a45502448 437 }
ansond 0:ae2a45502448 438
ansond 0:ae2a45502448 439 // check transport and process stuff
ansond 0:ae2a45502448 440 void MQTTTransport::checkAndProcess() {
ansond 129:c4fa24308e33 441 // process any MQTT messages
ansond 0:ae2a45502448 442 if (this->m_mqtt != NULL && this->m_connected == true) {
ansond 136:ea8f3900bc13 443 if (network_mutex != NULL) network_mutex->lock();
ansond 131:27f29e230bbb 444 bool connected = this->m_mqtt->loop();
ansond 136:ea8f3900bc13 445 if (network_mutex != NULL) network_mutex->unlock();
ansond 131:27f29e230bbb 446 if (connected) {
ansond 131:27f29e230bbb 447 this->logger()->blinkTransportRxLED();
ansond 131:27f29e230bbb 448 }
ansond 131:27f29e230bbb 449 else {
ansond 148:b09187a0fa77 450 this->logger()->log("Attempting reconnection to MQTT in checkAndProcess...");
ansond 131:27f29e230bbb 451 this->disconnect();
ansond 131:27f29e230bbb 452 this->connect();
ansond 131:27f29e230bbb 453 }
ansond 0:ae2a45502448 454 }
ansond 129:c4fa24308e33 455
ansond 129:c4fa24308e33 456 // send a PING if time for it
ansond 132:563a1ee99efc 457 --this->m_ping_countdown;
ansond 132:563a1ee99efc 458 if (this->m_ping_countdown <= 0) {
ansond 133:e69d03f4eb5c 459 //this->logger()->log("MQTT: Sending PING...");
ansond 132:563a1ee99efc 460 this->m_ping_countdown = MQTT_PING_COUNTDOWN;
ansond 132:563a1ee99efc 461 this->sendPingMessage();
ansond 132:563a1ee99efc 462 }
ansond 0:ae2a45502448 463 }