MBED_DEMOS / Mbed 2 deprecated mbed_mqtt_endpoint_ublox_ethernet

Dependencies:   C027 C12832 EthernetInterface StatusReporter LM75B MQTT-ansond endpoint_core endpoint_mqtt mbed-rtos mbed

Committer:
ansond
Date:
Wed Feb 26 05:45:49 2014 +0000
Revision:
5:4ad5ec5802a2
Parent:
4:11f00b499106
Child:
6:34c07e145caa
updates

Who changed what in which revision?

User | Revision | Line number | New contents of line
ansond 0:ae2a45502448 1 /* Copyright (C) 2013 Doug Anson, MIT License
ansond 0:ae2a45502448 2 *
ansond 0:ae2a45502448 3 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software
ansond 0:ae2a45502448 4 * and associated documentation files (the "Software"), to deal in the Software without restriction,
ansond 0:ae2a45502448 5 * including without limitation the rights to use, copy, modify, merge, publish, distribute,
ansond 0:ae2a45502448 6 * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
ansond 0:ae2a45502448 7 * furnished to do so, subject to the following conditions:
ansond 0:ae2a45502448 8 *
ansond 0:ae2a45502448 9 * The above copyright notice and this permission notice shall be included in all copies or
ansond 0:ae2a45502448 10 * substantial portions of the Software.
ansond 0:ae2a45502448 11 *
ansond 0:ae2a45502448 12 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
ansond 0:ae2a45502448 13 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
ansond 0:ae2a45502448 14 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
ansond 0:ae2a45502448 15 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
ansond 0:ae2a45502448 16 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
ansond 0:ae2a45502448 17 */
ansond 0:ae2a45502448 18
ansond 0:ae2a45502448 19 #include "MQTTTransport.h"
ansond 0:ae2a45502448 20
ansond 4:11f00b499106 21 // Endpoint Support
ansond 0:ae2a45502448 22 #include "MBEDEndpoint.h"
ansond 0:ae2a45502448 23
ansond 4:11f00b499106 24 // string splitter support
ansond 4:11f00b499106 25 #include "Splitter.h"
ansond 0:ae2a45502448 26
ansond 0:ae2a45502448 27 // our transmitt instance
ansond 0:ae2a45502448 28 MQTTTransport *instance = NULL;
ansond 0:ae2a45502448 29
ansond 0:ae2a45502448 30 // MQTT callback to handle received messages
ansond 0:ae2a45502448 31 void _mqtt_message_handler(char *topic,char *payload,unsigned int length) {
ansond 0:ae2a45502448 32 if (instance != NULL) instance->processMessage((char *)payload,length);
ansond 0:ae2a45502448 33 }
ansond 0:ae2a45502448 34
ansond 0:ae2a45502448 35 // our MQTT client endpoint
ansond 0:ae2a45502448 36 PubSubClient _mqtt(MQTT_HOSTNAME,MQTT_HOSTPORT,_mqtt_message_handler);
ansond 0:ae2a45502448 37
ansond 0:ae2a45502448 38 // default constructor
ansond 0:ae2a45502448 39 MQTTTransport::MQTTTransport(ErrorHandler *error_handler) : Transport(error_handler) {
ansond 0:ae2a45502448 40 this->m_mqtt = NULL;
ansond 0:ae2a45502448 41 instance = this;
ansond 0:ae2a45502448 42 this->m_map = new MBEDToIOCResourceMap();
ansond 0:ae2a45502448 43 }
ansond 0:ae2a45502448 44
ansond 0:ae2a45502448 45 // default destructor
ansond 0:ae2a45502448 46 MQTTTransport::~MQTTTransport() {
ansond 0:ae2a45502448 47 this->disconnect();
ansond 0:ae2a45502448 48 if (this->m_mqtt != NULL) delete this->m_mqtt;
ansond 0:ae2a45502448 49 }
ansond 0:ae2a45502448 50
ansond 0:ae2a45502448 51 // process a MQTT Message
ansond 0:ae2a45502448 52 void MQTTTransport::processMessage(char *payload, unsigned int len) {
ansond 3:3db076b9d380 53 char *message_type = NULL;
ansond 3:3db076b9d380 54 char *message_name = NULL;
ansond 3:3db076b9d380 55 char *message_verb = NULL;
ansond 3:3db076b9d380 56 char *message_value = NULL;
ansond 3:3db076b9d380 57 char *message_opt = NULL;
ansond 3:3db076b9d380 58 char *endpoint_name = NULL;
ansond 0:ae2a45502448 59 MBEDEndpoint *endpoint = (MBEDEndpoint *)this->getEndpoint();
ansond 0:ae2a45502448 60
ansond 0:ae2a45502448 61 // DEBUG
ansond 5:4ad5ec5802a2 62 this->logger()->pause("MQTT Message: %s length=%d",payload,len);
ansond 0:ae2a45502448 63
ansond 0:ae2a45502448 64 // split the string by the delimiter
ansond 5:4ad5ec5802a2 65 this->logger()->pause("DOUG1");
ansond 5:4ad5ec5802a2 66 Splitter *splitter = new Splitter(payload,':',this->logger());
ansond 5:4ad5ec5802a2 67 this->logger()->pause("DOUG2");
ansond 4:11f00b499106 68 char** data = splitter->split();
ansond 5:4ad5ec5802a2 69 this->logger()->pause("DOUG3");
ansond 4:11f00b499106 70 int count = splitter->size(data);
ansond 5:4ad5ec5802a2 71 this->logger()->pause("DOUG4");
ansond 0:ae2a45502448 72
ansond 0:ae2a45502448 73 // format of the MQTT message: message_type:name:verb|Parameter_X:value|keyword:optional_data
ansond 2:90a84a216c58 74 /*
ansond 4:11f00b499106 75 if (count > 0) message_type = data[0];
ansond 4:11f00b499106 76 if (count > 1) message_name = data[1];
ansond 4:11f00b499106 77 if (count > 2) message_verb = data[2];
ansond 4:11f00b499106 78 if (count > 3) message_value = data[3];
ansond 4:11f00b499106 79 if (count > 4) message_opt = data[4];
ansond 2:90a84a216c58 80 */
ansond 2:90a84a216c58 81
ansond 3:3db076b9d380 82 // get our endpoint name
ansond 5:4ad5ec5802a2 83 this->logger()->pause("DOUG5");
ansond 3:3db076b9d380 84 endpoint_name = endpoint->getEndpointName();
ansond 3:3db076b9d380 85
ansond 2:90a84a216c58 86 // XXX FIXUP until MQTT messages have endpoint_names in them
ansond 4:11f00b499106 87 if (count > 0) message_type = data[0];
ansond 4:11f00b499106 88 message_name = endpoint->getEndpointName(); // if (count > 1) message_name = (char *)data[1].c_str();
ansond 4:11f00b499106 89 if (count > 1) message_verb = data[1];
ansond 4:11f00b499106 90 if (count > 2) message_value = data[2];
ansond 4:11f00b499106 91 if (count > 3) message_opt = data[3];
ansond 3:3db076b9d380 92
ansond 2:90a84a216c58 93 // DEBUG
ansond 3:3db076b9d380 94 this->logger()->log("Type: %s Name: %s Verb: %s Value: %s",message_type,message_name,message_verb,message_value);
ansond 0:ae2a45502448 95
ansond 3:3db076b9d380 96 // only respond if its for our node
ansond 3:3db076b9d380 97 if (false && strcmp(endpoint_name,message_name) == 0) {
ansond 3:3db076b9d380 98 // DEBUG
ansond 3:3db076b9d380 99 this->logger()->log("Message: %s bound for %s... processing...",payload,endpoint_name);
ansond 3:3db076b9d380 100
ansond 3:3db076b9d380 101 // load endpoints
ansond 3:3db076b9d380 102 if (message_type != NULL && strcmp(message_type,"Endpoint") == 0) {
ansond 3:3db076b9d380 103 if (message_name != NULL && strcmp(message_name,"all") == 0) {
ansond 3:3db076b9d380 104 if (message_verb != NULL && strcmp(message_verb,"load") == 0) {
ansond 3:3db076b9d380 105 // load up our endpoint
ansond 3:3db076b9d380 106 endpoint->loadEndpoint();
ansond 3:3db076b9d380 107 }
ansond 3:3db076b9d380 108 if (message_verb != NULL && strcmp(message_verb,"Update") == 0) {
ansond 0:ae2a45502448 109 // update our endpoint
ansond 0:ae2a45502448 110 endpoint->updateEndpoint();
ansond 0:ae2a45502448 111 }
ansond 0:ae2a45502448 112 }
ansond 3:3db076b9d380 113 else {
ansond 3:3db076b9d380 114 // destined for our lights?
ansond 3:3db076b9d380 115 int index = -1;
ansond 3:3db076b9d380 116 if (message_name != NULL) {
ansond 3:3db076b9d380 117 endpoint->indexOfLight((char *)message_name);
ansond 3:3db076b9d380 118 if (index >= 0) {
ansond 3:3db076b9d380 119 if (message_verb != NULL && strcmp(message_verb,"Update") == 0) {
ansond 3:3db076b9d380 120 // update our endpoint
ansond 3:3db076b9d380 121 endpoint->updateEndpoint();
ansond 3:3db076b9d380 122 }
ansond 3:3db076b9d380 123 }
ansond 3:3db076b9d380 124 }
ansond 3:3db076b9d380 125 }
ansond 0:ae2a45502448 126 }
ansond 3:3db076b9d380 127
ansond 3:3db076b9d380 128 // change a resource value
ansond 3:3db076b9d380 129 if (message_type != NULL && strcmp(message_type,"Change") == 0) {
ansond 3:3db076b9d380 130 if (message_name != NULL) {
ansond 3:3db076b9d380 131 // destined for our lights?
ansond 3:3db076b9d380 132 int index = endpoint->indexOfLight((char *)message_name);
ansond 3:3db076b9d380 133 if (index >= 0) {
ansond 3:3db076b9d380 134 if (message_verb != NULL) {
ansond 3:3db076b9d380 135 // map the parameter to one of ours
ansond 3:3db076b9d380 136 char *mapped_resource = this->mapEndpointResourceToIOCResource((char *)message_verb);
ansond 3:3db076b9d380 137 if (mapped_resource != NULL) {
ansond 3:3db076b9d380 138 if (message_value != NULL) {
ansond 3:3db076b9d380 139 ResourceFactory *factory = endpoint->getResources(index);
ansond 3:3db076b9d380 140 factory->setResourceValue(message_name,message_value);
ansond 3:3db076b9d380 141 }
ansond 3:3db076b9d380 142 }
ansond 3:3db076b9d380 143 }
ansond 3:3db076b9d380 144 }
ansond 3:3db076b9d380 145 }
ansond 3:3db076b9d380 146 }
ansond 3:3db076b9d380 147
ansond 3:3db076b9d380 148 // get a resource value
ansond 3:3db076b9d380 149 if (message_type != NULL && strcmp(message_type,"Get") == 0) {
ansond 3:3db076b9d380 150 if (message_name != NULL) {
ansond 3:3db076b9d380 151 // destined for our lights?
ansond 3:3db076b9d380 152 int index = endpoint->indexOfLight((char *)message_name);
ansond 3:3db076b9d380 153 if (index >= 0) {
ansond 3:3db076b9d380 154 if (message_verb != NULL) {
ansond 3:3db076b9d380 155 // map the parameter to one of ours
ansond 3:3db076b9d380 156 char *mapped_resource = this->mapEndpointResourceToIOCResource((char *)message_verb);
ansond 3:3db076b9d380 157 if (mapped_resource != NULL) {
ansond 3:3db076b9d380 158 ResourceFactory *factory = endpoint->getResources(index);
ansond 3:3db076b9d380 159 char *resource_value = factory->getResourceValue((char *)message_name);
ansond 3:3db076b9d380 160
ansond 3:3db076b9d380 161 // end the resource value back over MQTT
ansond 3:3db076b9d380 162 this->sendResourceValue(message_name,message_verb,resource_value);
ansond 3:3db076b9d380 163 }
ansond 3:3db076b9d380 164 }
ansond 3:3db076b9d380 165 }
ansond 0:ae2a45502448 166 }
ansond 0:ae2a45502448 167 }
ansond 0:ae2a45502448 168 }
ansond 3:3db076b9d380 169 else {
ansond 3:3db076b9d380 170 // message not bound for our node
ansond 3:3db076b9d380 171 this->logger()->log("MQTT Message: %s not for us: %s... ignoring...",payload,endpoint_name);
ansond 3:3db076b9d380 172 }
ansond 0:ae2a45502448 173
ansond 4:11f00b499106 174 // clean up the array
ansond 4:11f00b499106 175 if (data != NULL) splitter->clear(data);
ansond 4:11f00b499106 176
ansond 3:3db076b9d380 177 // clean up
ansond 4:11f00b499106 178 if (splitter != NULL) delete splitter;
ansond 0:ae2a45502448 179 }
ansond 0:ae2a45502448 180
ansond 0:ae2a45502448 181 void MQTTTransport::sendResourceValue(char *endpoint_name,char *parameter_name,char *value) {
ansond 0:ae2a45502448 182 }
ansond 0:ae2a45502448 183
ansond 0:ae2a45502448 184 char *MQTTTransport::mapEndpointResourceToIOCResource(char *ioc_name) {
ansond 0:ae2a45502448 185 return this->m_map->getMBEDMappedName(ioc_name);
ansond 0:ae2a45502448 186 }
ansond 0:ae2a45502448 187
ansond 0:ae2a45502448 188 char *MQTTTransport::makeID(char *id_template,char *buffer) {
ansond 0:ae2a45502448 189 srand(time(0));
ansond 0:ae2a45502448 190 srand(rand());
ansond 0:ae2a45502448 191 sprintf(buffer,id_template,rand()%MQTT_MAXID_VALUE);
ansond 0:ae2a45502448 192 return buffer;
ansond 0:ae2a45502448 193 }
ansond 0:ae2a45502448 194
ansond 0:ae2a45502448 195 // connect up MQTT
ansond 0:ae2a45502448 196 bool MQTTTransport::connect() {
ansond 0:ae2a45502448 197 char mqtt_id[MQTT_ENDPOINT_IDLEN+1];
ansond 0:ae2a45502448 198 memset(mqtt_id,0,(MQTT_ENDPOINT_IDLEN+1));
ansond 0:ae2a45502448 199 if (this->m_connected == false) {
ansond 0:ae2a45502448 200 this->logger()->log("MQTT Init: %s:%d...",MQTT_HOSTNAME,MQTT_HOSTPORT);
ansond 0:ae2a45502448 201 this->m_mqtt = &_mqtt;
ansond 0:ae2a45502448 202 if (this->m_mqtt != NULL) {
ansond 0:ae2a45502448 203 char *id = this->makeID(MQTT_ENDPOINT_ID,mqtt_id);
ansond 0:ae2a45502448 204 this->logger()->log("MQTT Connect: ID: %s...",id);
ansond 0:ae2a45502448 205 if (this->m_mqtt->connect(id)) {
ansond 0:ae2a45502448 206 this->logger()->log("MQTT Subscribe: Topic: %s...",MQTT_IOC_TOPIC);
ansond 0:ae2a45502448 207 if (this->m_mqtt->subscribe(MQTT_IOC_TOPIC)) {
ansond 0:ae2a45502448 208 this->logger()->log("MQTT CONNECTED.");
ansond 0:ae2a45502448 209 this->m_connected = true;
ansond 0:ae2a45502448 210 }
ansond 0:ae2a45502448 211 else {
ansond 0:ae2a45502448 212 this->logger()->log("MQTT Subscribe: Topic: %s FAILED",MQTT_IOC_TOPIC);
ansond 0:ae2a45502448 213 this->logger()->turnLEDRed();
ansond 0:ae2a45502448 214 }
ansond 0:ae2a45502448 215 }
ansond 0:ae2a45502448 216 else {
ansond 0:ae2a45502448 217 this->logger()->log("MQTT Connect: ID: %s FAILED",id);
ansond 0:ae2a45502448 218 this->logger()->turnLEDRed();
ansond 0:ae2a45502448 219 }
ansond 0:ae2a45502448 220 }
ansond 0:ae2a45502448 221 else {
ansond 0:ae2a45502448 222 this->logger()->log("MQTT Unable to allocate new instance");
ansond 0:ae2a45502448 223 this->logger()->turnLEDRed();
ansond 0:ae2a45502448 224 }
ansond 0:ae2a45502448 225 }
ansond 0:ae2a45502448 226 else {
ansond 0:ae2a45502448 227 this->logger()->log("MQTT already connected (OK)");
ansond 0:ae2a45502448 228 }
ansond 0:ae2a45502448 229 return this->m_connected;
ansond 0:ae2a45502448 230 }
ansond 0:ae2a45502448 231
ansond 0:ae2a45502448 232 // disconnect from MQTT
ansond 0:ae2a45502448 233 bool MQTTTransport::disconnect() {
ansond 0:ae2a45502448 234 if (this->m_mqtt != NULL) {
ansond 0:ae2a45502448 235 this->logger()->log("MQTT Unsubscribing from: %s...",MQTT_IOC_TOPIC);
ansond 0:ae2a45502448 236 this->m_mqtt->unsubscribe(MQTT_IOC_TOPIC);
ansond 0:ae2a45502448 237 this->logger()->log("MQTT Disconnecting...");
ansond 0:ae2a45502448 238 this->m_mqtt->disconnect();
ansond 0:ae2a45502448 239 }
ansond 0:ae2a45502448 240 else {
ansond 0:ae2a45502448 241 this->logger()->log("MQTT already disconnected (OK)");
ansond 0:ae2a45502448 242 }
ansond 0:ae2a45502448 243 this->m_connected = false;
ansond 0:ae2a45502448 244 return true;
ansond 0:ae2a45502448 245 }
ansond 0:ae2a45502448 246
ansond 0:ae2a45502448 247 // check transport and process stuff
ansond 0:ae2a45502448 248 void MQTTTransport::checkAndProcess() {
ansond 0:ae2a45502448 249 if (this->m_mqtt != NULL && this->m_connected == true) {
ansond 0:ae2a45502448 250 this->m_mqtt->loop();
ansond 0:ae2a45502448 251 this->logger()->blinkMQTTTransportRxLED();
ansond 0:ae2a45502448 252 }
ansond 0:ae2a45502448 253 }