MBED_DEMOS / Mbed 2 deprecated mbed_mqtt_endpoint_ublox_ethernet

Dependencies:   C027 C12832 EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed

Committer:
ansond
Date:
Thu Feb 27 23:33:24 2014 +0000
Revision:
22:f1002e5993c5
Parent:
15:e44d75d95b38
Child:
23:793b2898522e
updates

Who changed what in which revision?

User | Revision | Line number | New contents of line
ansond 0:ae2a45502448 1 /* Copyright (C) 2013 Doug Anson, MIT License
ansond 0:ae2a45502448 2 *
ansond 0:ae2a45502448 3 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software
ansond 0:ae2a45502448 4 * and associated documentation files (the "Software"), to deal in the Software without restriction,
ansond 0:ae2a45502448 5 * including without limitation the rights to use, copy, modify, merge, publish, distribute,
ansond 0:ae2a45502448 6 * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
ansond 0:ae2a45502448 7 * furnished to do so, subject to the following conditions:
ansond 0:ae2a45502448 8 *
ansond 0:ae2a45502448 9 * The above copyright notice and this permission notice shall be included in all copies or
ansond 0:ae2a45502448 10 * substantial portions of the Software.
ansond 0:ae2a45502448 11 *
ansond 0:ae2a45502448 12 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
ansond 0:ae2a45502448 13 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
ansond 0:ae2a45502448 14 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
ansond 0:ae2a45502448 15 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
ansond 0:ae2a45502448 16 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
ansond 0:ae2a45502448 17 */
ansond 0:ae2a45502448 18
ansond 0:ae2a45502448 19 #include "MQTTTransport.h"
ansond 0:ae2a45502448 20
ansond 4:11f00b499106 21 // Endpoint Support
ansond 0:ae2a45502448 22 #include "MBEDEndpoint.h"
ansond 0:ae2a45502448 23
ansond 8:45f9a920e82c 24 // EmulatedResourceFactory support
ansond 8:45f9a920e82c 25 #include "EmulatedResourceFactory.h"
ansond 8:45f9a920e82c 26
ansond 7:f570eb3f38cd 27 // splitstring support
ansond 7:f570eb3f38cd 28 #include "splitstring.h"
ansond 0:ae2a45502448 29
ansond 0:ae2a45502448 30 // our transmitt instance
ansond 15:e44d75d95b38 31 MQTTTransport *_mqtt_instance = NULL;
ansond 0:ae2a45502448 32
ansond 0:ae2a45502448 33 // MQTT callback to handle received messages
ansond 0:ae2a45502448 34 void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
ansond 15:e44d75d95b38 35 if (_mqtt_instance != NULL) _mqtt_instance->processMessage(_mqtt_instance->getEndpointNameFromTopic(topic),payload,length);
ansond 0:ae2a45502448 36 }
ansond 0:ae2a45502448 37
ansond 0:ae2a45502448 38 // our MQTT client endpoint
ansond 0:ae2a45502448 39 PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
ansond 0:ae2a45502448 40
ansond 0:ae2a45502448 41 // default constructor
ansond 15:e44d75d95b38 42 MQTTTransport::MQTTTransport(ErrorHandler *error_handler,void *endpoint,MBEDToIOCResourceMap *map) : Transport(error_handler,endpoint) {
ansond 0:ae2a45502448 43 this->m_mqtt = NULL;
ansond 15:e44d75d95b38 44 _mqtt_instance = this;
ansond 15:e44d75d95b38 45 this->m_map = map;
ansond 8:45f9a920e82c 46 this->initTopic();
ansond 0:ae2a45502448 47 }
ansond 0:ae2a45502448 48
ansond 0:ae2a45502448 49 // default destructor
ansond 0:ae2a45502448 50 MQTTTransport::~MQTTTransport() {
ansond 0:ae2a45502448 51 this->disconnect();
ansond 0:ae2a45502448 52 }
ansond 0:ae2a45502448 53
ansond 8:45f9a920e82c 54 // init our topic
ansond 8:45f9a920e82c 55 void MQTTTransport::initTopic() {
ansond 8:45f9a920e82c 56 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
ansond 8:45f9a920e82c 57 char *endpoint_name = endpoint->getEndpointName();
ansond 8:45f9a920e82c 58 memset(this->m_topic,0,MQTT_IOC_TOPIC_LEN+1);
ansond 8:45f9a920e82c 59 sprintf(this->m_topic,MQTT_IOC_TOPIC,endpoint_name);
ansond 8:45f9a920e82c 60 }
ansond 8:45f9a920e82c 61
ansond 8:45f9a920e82c 62 // get our topic
ansond 8:45f9a920e82c 63 char *MQTTTransport::getTopic() { return this->m_topic; }
ansond 8:45f9a920e82c 64
ansond 15:e44d75d95b38 65 // get the IOC <--> MBED resource map
ansond 15:e44d75d95b38 66 MBEDToIOCResourceMap *MQTTTransport::getMap() { return this->m_map; }
ansond 15:e44d75d95b38 67
ansond 6:34c07e145caa 68 // pull the endpoint name from the MQTT topic
ansond 6:34c07e145caa 69 char *MQTTTransport::getEndpointNameFromTopic(char *topic) {
ansond 6:34c07e145caa 70 memset(this->m_endpoint_name,0,LIGHT_NAME_LEN+1);
ansond 8:45f9a920e82c 71 splitstring *tmp = new splitstring(topic);
ansond 8:45f9a920e82c 72 vector<string> data = tmp->split('/');
ansond 8:45f9a920e82c 73 if (data.size() > 0) {
ansond 8:45f9a920e82c 74 char *ep = (char *)data[data.size()-1].c_str();
ansond 22:f1002e5993c5 75 if (ep != NULL) strncpy(this->m_endpoint_name,ep,strlen(ep));
ansond 8:45f9a920e82c 76 }
ansond 8:45f9a920e82c 77 if (tmp != NULL) delete tmp;
ansond 6:34c07e145caa 78 return this->m_endpoint_name;
ansond 6:34c07e145caa 79 }
ansond 6:34c07e145caa 80
ansond 0:ae2a45502448 81 // process a MQTT Message
ansond 6:34c07e145caa 82 void MQTTTransport::processMessage(char *message_name,char *payload, unsigned int len) {
ansond 7:f570eb3f38cd 83 char *message_type = "";
ansond 7:f570eb3f38cd 84 char *message_verb = "";
ansond 7:f570eb3f38cd 85 char *message_value = "";
ansond 14:0a6497a380a4 86 //char *message_opt = "";
ansond 7:f570eb3f38cd 87
ansond 6:34c07e145caa 88 // get our endpoint
ansond 0:ae2a45502448 89 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
ansond 7:f570eb3f38cd 90 char *endpoint_name = endpoint->getEndpointName();
ansond 6:34c07e145caa 91
ansond 0:ae2a45502448 92 // DEBUG
ansond 8:45f9a920e82c 93 //this->logger()->log("Endpoint:[%s] Target: [%s]",endpoint_name,message_name);
ansond 7:f570eb3f38cd 94
ansond 3:3db076b9d380 95 // only respond if its for our node
ansond 8:45f9a920e82c 96 if (strcmp(endpoint_name,message_name) == 0) {
ansond 7:f570eb3f38cd 97 // format of the MQTT message: message_type:verb|Parameter_X:value|keyword:optional_data
ansond 7:f570eb3f38cd 98 splitstring *tmp = new splitstring(payload);
ansond 7:f570eb3f38cd 99 vector<string> data = tmp->split(':');
ansond 7:f570eb3f38cd 100 if (data.size() > 0) message_type = (char *)data[0].c_str();
ansond 7:f570eb3f38cd 101 if (data.size() > 1) message_verb = (char *)data[1].c_str();
ansond 7:f570eb3f38cd 102 if (data.size() > 2) message_value = (char *)data[2].c_str();
ansond 14:0a6497a380a4 103 //if (data.size() > 3) message_opt = (char *)data[3].c_str();
ansond 7:f570eb3f38cd 104
ansond 7:f570eb3f38cd 105 // DEBUG
ansond 15:e44d75d95b38 106 //this->logger()->log("Raw Payload: %s",payload);
ansond 8:45f9a920e82c 107 //this->logger()->log("Type: %s Name: %s Verb: %s Value: %s",message_type,message_name,message_verb,message_value);
ansond 8:45f9a920e82c 108
ansond 3:3db076b9d380 109 // load endpoints
ansond 15:e44d75d95b38 110 if (message_type != NULL && strcmp(message_type,IOC_ENDPOINT_VERB) == 0) { // Endpoint
ansond 15:e44d75d95b38 111 if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_LOAD_ALL_VERB) == 0) { // load
ansond 15:e44d75d95b38 112 if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all
ansond 15:e44d75d95b38 113 // load up our endpoints
ansond 15:e44d75d95b38 114 this->logger()->log("would load all endpoints...");
ansond 15:e44d75d95b38 115 endpoint->loadEndpoints();
ansond 0:ae2a45502448 116 }
ansond 0:ae2a45502448 117 }
ansond 15:e44d75d95b38 118
ansond 15:e44d75d95b38 119 else if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) { // update
ansond 15:e44d75d95b38 120 if (message_value != NULL && strcmp(message_value,IOC_ENDPOINT_ALL_VERB) == 0) { // all
ansond 15:e44d75d95b38 121 // update our endpoints
ansond 15:e44d75d95b38 122 this->logger()->log("would update all endpoints...");
ansond 15:e44d75d95b38 123 //endpoint->updateEndpoints();
ansond 15:e44d75d95b38 124 }
ansond 15:e44d75d95b38 125 else {
ansond 15:e44d75d95b38 126 // update just our endpoint
ansond 15:e44d75d95b38 127 int index = -1;
ansond 15:e44d75d95b38 128 if (message_name != NULL) {
ansond 15:e44d75d95b38 129 index = endpoint->indexOfLight((char *)message_name);
ansond 15:e44d75d95b38 130 if (index >= 0) {
ansond 15:e44d75d95b38 131 if (message_verb != NULL && strcmp(message_verb,IOC_REQUEST_UPDATE_ALL_VERB) == 0) {
ansond 15:e44d75d95b38 132 // update our endpoint
ansond 15:e44d75d95b38 133 this->logger()->log("would update just our endpoint...");
ansond 15:e44d75d95b38 134 //endpoint->updateEndpoints(index);
ansond 15:e44d75d95b38 135 }
ansond 3:3db076b9d380 136 }
ansond 3:3db076b9d380 137 }
ansond 3:3db076b9d380 138 }
ansond 3:3db076b9d380 139 }
ansond 0:ae2a45502448 140 }
ansond 3:3db076b9d380 141
ansond 3:3db076b9d380 142 // change a resource value
ansond 13:25448d92c205 143 if (message_type != NULL && strcmp(message_type,IOC_CHANGE_VERB) == 0) {
ansond 8:45f9a920e82c 144 if (message_name != NULL) {
ansond 3:3db076b9d380 145 // destined for our lights?
ansond 3:3db076b9d380 146 int index = endpoint->indexOfLight((char *)message_name);
ansond 3:3db076b9d380 147 if (index >= 0) {
ansond 3:3db076b9d380 148 if (message_verb != NULL) {
ansond 3:3db076b9d380 149 // map the parameter to one of ours
ansond 14:0a6497a380a4 150 char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
ansond 3:3db076b9d380 151 if (mapped_resource != NULL) {
ansond 3:3db076b9d380 152 if (message_value != NULL) {
ansond 8:45f9a920e82c 153 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
ansond 8:45f9a920e82c 154 bool success = factory->setResourceValue(mapped_resource,message_value);
ansond 8:45f9a920e82c 155
ansond 8:45f9a920e82c 156 // end the resource value back over MQTT
ansond 8:45f9a920e82c 157 this->sendResult(message_name,message_verb,message_value,success);
ansond 3:3db076b9d380 158 }
ansond 3:3db076b9d380 159 }
ansond 3:3db076b9d380 160 }
ansond 3:3db076b9d380 161 }
ansond 3:3db076b9d380 162 }
ansond 3:3db076b9d380 163 }
ansond 3:3db076b9d380 164
ansond 3:3db076b9d380 165 // get a resource value
ansond 13:25448d92c205 166 if (message_type != NULL && strcmp(message_type,IOC_REQUEST_VALUE_VERB) == 0) {
ansond 3:3db076b9d380 167 if (message_name != NULL) {
ansond 3:3db076b9d380 168 // destined for our lights?
ansond 3:3db076b9d380 169 int index = endpoint->indexOfLight((char *)message_name);
ansond 3:3db076b9d380 170 if (index >= 0) {
ansond 3:3db076b9d380 171 if (message_verb != NULL) {
ansond 3:3db076b9d380 172 // map the parameter to one of ours
ansond 13:25448d92c205 173 char *mapped_resource = this->mapIOCResourceToEndpointResource((char *)message_verb);
ansond 3:3db076b9d380 174 if (mapped_resource != NULL) {
ansond 8:45f9a920e82c 175 EmulatedResourceFactory *factory = (EmulatedResourceFactory *)endpoint->getResources(index);
ansond 8:45f9a920e82c 176 message_value = factory->getResourceValue((char *)mapped_resource);
ansond 8:45f9a920e82c 177 bool success = false; if (message_value != NULL) success = true;
ansond 8:45f9a920e82c 178
ansond 8:45f9a920e82c 179 // log resource get
ansond 8:45f9a920e82c 180 if (success) this->logger()->log("Resource: %s (%s) Value: %s",message_verb,mapped_resource,message_value);
ansond 3:3db076b9d380 181
ansond 3:3db076b9d380 182 // end the resource value back over MQTT
ansond 8:45f9a920e82c 183 this->sendResult(message_name,message_verb,message_value,success);
ansond 3:3db076b9d380 184 }
ansond 3:3db076b9d380 185 }
ansond 3:3db076b9d380 186 }
ansond 0:ae2a45502448 187 }
ansond 0:ae2a45502448 188 }
ansond 7:f570eb3f38cd 189
ansond 7:f570eb3f38cd 190 // clean up
ansond 8:45f9a920e82c 191 if (tmp != NULL) delete tmp;
ansond 0:ae2a45502448 192 }
ansond 3:3db076b9d380 193 else {
ansond 3:3db076b9d380 194 // message not bound for our node
ansond 3:3db076b9d380 195 this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
ansond 7:f570eb3f38cd 196 }
ansond 0:ae2a45502448 197 }
ansond 0:ae2a45502448 198
ansond 8:45f9a920e82c 199 // send result back to MQTT
ansond 8:45f9a920e82c 200 void MQTTTransport::sendResult(char *endpoint_name,char *resource_name,char *value,bool success) {
ansond 8:45f9a920e82c 201 if (this->m_connected == true) {
ansond 8:45f9a920e82c 202 // send the response back to MQTT
ansond 8:45f9a920e82c 203 this->logger()->log("Sending Response back to MQTT...");
ansond 8:45f9a920e82c 204 char message[MAX_MQTT_MESSAGE_LENGTH+1];
ansond 8:45f9a920e82c 205 memset(message,0,MAX_MQTT_MESSAGE_LENGTH+1);
ansond 13:25448d92c205 206 char *str_success = IOC_RESPONSE_OK; if (!success) str_success = IOC_RESPONSE_FAILED;
ansond 13:25448d92c205 207 sprintf(message,IOC_RESPONSE_TEMPLATE,IOC_RESPONSE_VERB,endpoint_name,resource_name,value,str_success);
ansond 8:45f9a920e82c 208 bool sent = this->m_mqtt->publish(this->getTopic(),message,strlen(message));
ansond 8:45f9a920e82c 209 if (sent) {
ansond 8:45f9a920e82c 210 this->logger()->log("Result sent successfully");
ansond 8:45f9a920e82c 211 this->logger()->blinkMQTTTransportTxLED();
ansond 8:45f9a920e82c 212 }
ansond 8:45f9a920e82c 213 else {
ansond 8:45f9a920e82c 214 this->logger()->log("Result send FAILED");
ansond 8:45f9a920e82c 215 }
ansond 8:45f9a920e82c 216 }
ansond 8:45f9a920e82c 217 else {
ansond 8:45f9a920e82c 218 // unable to send the response
ansond 8:45f9a920e82c 219 this->logger()->log("Unable to send response back to MQTT. Not connected.");
ansond 8:45f9a920e82c 220 }
ansond 0:ae2a45502448 221 }
ansond 0:ae2a45502448 222
ansond 15:e44d75d95b38 223 char *MQTTTransport::mapIOCResourceToEndpointResource(char *ioc_name) { return this->getMap()->iocNameToEndpointName(ioc_name); }
ansond 0:ae2a45502448 224
ansond 0:ae2a45502448 225 char *MQTTTransport::makeID(char *id_template,char *buffer) {
ansond 0:ae2a45502448 226 srand(time(0));
ansond 0:ae2a45502448 227 srand(rand());
ansond 0:ae2a45502448 228 sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE);
ansond 0:ae2a45502448 229 return buffer;
ansond 0:ae2a45502448 230 }
ansond 0:ae2a45502448 231
ansond 0:ae2a45502448 232 // connect up MQTT
ansond 0:ae2a45502448 233 bool MQTTTransport::connect() {
ansond 0:ae2a45502448 234 char mqtt_id[MQTT_ENDPOINT_IDLEN+1];
ansond 0:ae2a45502448 235 memset(mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));
ansond 0:ae2a45502448 236 if (this->m_connected == false) {
ansond 0:ae2a45502448 237 this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);
ansond 0:ae2a45502448 238 this->m_mqtt = &_mqtt;
ansond 0:ae2a45502448 239 if (this->m_mqtt != NULL) {
ansond 0:ae2a45502448 240 char *id = this->makeID(MQTT_ENDPOINT_ID,mqtt_id);
ansond 0:ae2a45502448 241 this->logger()->log("MQTT Connect: ID: %s...",id);
ansond 0:ae2a45502448 242 if (this->m_mqtt->connect(id)) {
ansond 8:45f9a920e82c 243 this->logger()->log("MQTT Subscribe: Topic: %s...",this->getTopic());
ansond 8:45f9a920e82c 244 if (this->m_mqtt->subscribe(this->getTopic())) {
ansond 22:f1002e5993c5 245 if (this->m_mqtt->subscribe(MQTT_IOC_ANNOUNCE_TOPIC)) {
ansond 22:f1002e5993c5 246 this->logger()->log("MQTT CONNECTED.");
ansond 22:f1002e5993c5 247 this->m_connected = true;
ansond 22:f1002e5993c5 248 }
ansond 22:f1002e5993c5 249 else {
ansond 22:f1002e5993c5 250 this->logger()->log("MQTT Subscribe: Topic(ANNOUNCE): %s FAILED",this->getTopic());
ansond 22:f1002e5993c5 251 this->logger()->turnLEDRed();
ansond 22:f1002e5993c5 252 this->m_connected = false;
ansond 22:f1002e5993c5 253 }
ansond 0:ae2a45502448 254 }
ansond 0:ae2a45502448 255 else {
ansond 8:45f9a920e82c 256 this->logger()->log("MQTT Subscribe: Topic: %s FAILED",this->getTopic());
ansond 0:ae2a45502448 257 this->logger()->turnLEDRed();
ansond 7:f570eb3f38cd 258 this->m_connected = false;
ansond 0:ae2a45502448 259 }
ansond 0:ae2a45502448 260 }
ansond 0:ae2a45502448 261 else {
ansond 0:ae2a45502448 262 this->logger()->log("MQTT Connect: ID: %s FAILED",id);
ansond 0:ae2a45502448 263 this->logger()->turnLEDRed();
ansond 7:f570eb3f38cd 264 this->m_connected = false;
ansond 0:ae2a45502448 265 }
ansond 0:ae2a45502448 266 }
ansond 0:ae2a45502448 267 else {
ansond 0:ae2a45502448 268 this->logger()->log("MQTT Unable to allocate new instance");
ansond 0:ae2a45502448 269 this->logger()->turnLEDRed();
ansond 7:f570eb3f38cd 270 this->m_connected = false;
ansond 0:ae2a45502448 271 }
ansond 0:ae2a45502448 272 }
ansond 0:ae2a45502448 273 else {
ansond 0:ae2a45502448 274 this->logger()->log("MQTT already connected (OK)");
ansond 0:ae2a45502448 275 }
ansond 0:ae2a45502448 276 return this->m_connected;
ansond 0:ae2a45502448 277 }
ansond 0:ae2a45502448 278
ansond 0:ae2a45502448 279 // disconnect from MQTT
ansond 0:ae2a45502448 280 bool MQTTTransport::disconnect() {
ansond 0:ae2a45502448 281 if (this->m_mqtt != NULL) {
ansond 8:45f9a920e82c 282 this->logger()->log("MQTT Unsubscribing from: %s...",this->getTopic());
ansond 8:45f9a920e82c 283 this->m_mqtt->unsubscribe(this->getTopic());
ansond 22:f1002e5993c5 284 this->m_mqtt->unsubscribe(MQTT_IOC_ANNOUNCE_TOPIC);
ansond 0:ae2a45502448 285 this->logger()->log("MQTT Disconnecting...");
ansond 0:ae2a45502448 286 this->m_mqtt->disconnect();
ansond 0:ae2a45502448 287 }
ansond 0:ae2a45502448 288 else {
ansond 0:ae2a45502448 289 this->logger()->log("MQTT already disconnected (OK)");
ansond 0:ae2a45502448 290 }
ansond 0:ae2a45502448 291 this->m_connected = false;
ansond 0:ae2a45502448 292 return true;
ansond 0:ae2a45502448 293 }
ansond 0:ae2a45502448 294
ansond 0:ae2a45502448 295 // check transport and process stuff
ansond 0:ae2a45502448 296 void MQTTTransport::checkAndProcess() {
ansond 0:ae2a45502448 297 if (this->m_mqtt != NULL && this->m_connected == true) {
ansond 0:ae2a45502448 298 this->m_mqtt->loop();
ansond 0:ae2a45502448 299 this->logger()->blinkMQTTTransportRxLED();
ansond 0:ae2a45502448 300 }
ansond 0:ae2a45502448 301 }