Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: NetServices FatFileSystem csv_parser mbed MQTTClient RF12B DNSResolver SDFileSystem
IoTRouting.cpp
00001 /** IoT Gateway Routing for incoming messages 00002 * 00003 * @author Andrew Lindsay 00004 * 00005 * @section LICENSE 00006 * 00007 * Copyright (c) 2012 Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk) 00008 * 00009 * Permission is hereby granted, free of charge, to any person obtaining a copy 00010 * of this software and associated documentation files (the "Software"), to deal 00011 * in the Software without restriction, including without limitation the rights 00012 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 00013 * copies of the Software, and to permit persons to whom the Software is 00014 * furnished to do so, subject to the following conditions: 00015 00016 * The above copyright notice and this permission notice shall be included in 00017 * all copies or substantial portions of the Software. 00018 * 00019 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 00020 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 00021 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 00022 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 00023 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 00024 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 00025 * THE SOFTWARE. 
00026 * 00027 * @section DESCRIPTION 00028 * 00029 * 00030 */ 00031 00032 #include "mbed.h" 00033 #include "RF12B.h" 00034 #include "IoTRouting.h" 00035 #include "csv_parser.h" 00036 #include "PayloadV1.h" 00037 #include "PayloadV2.h" 00038 #include "PayloadSimple.h" 00039 #include "OutputPachube.h" 00040 00041 //DigitalOut activityLED(p29, "activityLED"); 00042 00043 #define iotRoutingFile "/iotfs/IOTRTG.TXT" 00044 #define iotNodeDefFile "/iotfs/IOTNODE.TXT" 00045 00046 00047 // Default Constructor 00048 IoTRouting::IoTRouting() {} 00049 00050 00051 void IoTRouting::addNodeToList(uint8_t groupId, uint8_t nodeId, uint8_t length, uint8_t type ) { 00052 IoTNodeDef *pr = new IoTNodeDef(); 00053 pr->groupId = groupId; 00054 pr->nodeId = nodeId; 00055 pr->length = length; 00056 pr->type = type; 00057 00058 _nodeDefList.push_back(pr); 00059 } 00060 00061 void IoTRouting::addRoutingToList( short nodeId, short sensorId, float factor, byte outType, 00062 char *param1, char *param2, char *param3, char *param4 ) { 00063 00064 PayloadRouting* pr = new PayloadRouting(); 00065 pr->nodeId = nodeId; 00066 pr->sensorId = sensorId; 00067 00068 pr->factor = factor; 00069 pr->outType = outType; 00070 00071 switch ( pr->outType ) { 00072 case OUTPUT_TYPE_PACHUBE: 00073 pr->output = pachubeOutput; 00074 break; 00075 case OUTPUT_TYPE_MQTT: 00076 pr->output = mqttOutput; 00077 break; 00078 case OUTPUT_TYPE_EMONCMS: 00079 pr->output = emonCmsOutput; 00080 break; 00081 case OUTPUT_TYPE_SENSE: 00082 pr->output = senSeOutput; 00083 break; 00084 default: 00085 pr->output = NULL; 00086 break; 00087 } 00088 00089 pr->param1 = new char[strlen(param1)+1]; 00090 strcpy(pr->param1, param1); 00091 pr->param2 = new char[strlen(param2)+1]; 00092 strcpy(pr->param2, param2); 00093 pr->param3 = new char[strlen(param3)+1]; 00094 strcpy(pr->param3, param3); 00095 pr->param4 = new char[strlen(param4)+1]; 00096 strcpy(pr->param4, param4); 00097 // printf("%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, 
pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4); 00098 _routing.push_back(pr); 00099 } 00100 00101 00102 void IoTRouting::initRouting() { 00103 sendCount = 0; 00104 00105 FILE *fp = fopen(iotNodeDefFile, "r"); 00106 if (fp == NULL) { 00107 printf("Could not open file %s for read\n", iotNodeDefFile); 00108 return; 00109 } 00110 00111 // read file 00112 00113 const char field_terminator = ','; 00114 const char line_terminator = '\n'; 00115 const char enclosure_char = '"'; 00116 00117 csv_parser file_parser; 00118 00119 /* Define how many records we're gonna skip. This could be used to skip the column definitions. */ 00120 file_parser.set_skip_lines(0); 00121 00122 /* Specify the file to parse */ 00123 file_parser.init(fp); 00124 00125 /* Here we tell the parser how to parse the file */ 00126 file_parser.set_enclosed_char(enclosure_char, ENCLOSURE_OPTIONAL); 00127 file_parser.set_field_term_char(field_terminator); 00128 file_parser.set_line_term_char(line_terminator); 00129 00130 /* Check to see if there are more records, then grab each row one at a time */ 00131 00132 while (file_parser.has_more_rows()) { 00133 csv_row row = file_parser.get_row(); 00134 if (row.size()<4) { 00135 printf("row too short %d\n", row.size()); 00136 continue; 00137 } 00138 addNodeToList((uint8_t)atoi(row[0].c_str()), (uint8_t)atoi(row[1].c_str()), 00139 (uint8_t)atoi(row[2].c_str()), (uint8_t)atoi(row[3].c_str())); 00140 } 00141 00142 fclose(fp); 00143 00144 // printf("Finished loading routing data\n"); 00145 00146 fp = fopen(iotRoutingFile, "r"); 00147 if (fp == NULL) { 00148 printf("Could not open file %s for read\n", iotRoutingFile); 00149 return; 00150 } 00151 // _routing=new list<PayloadRouting*>(); 00152 00153 /* Define how many records we're gonna skip. This could be used to skip the column definitions. 
*/ 00154 file_parser.set_skip_lines(0); 00155 00156 /* Specify the file to parse */ 00157 file_parser.init(fp); 00158 00159 /* Here we tell the parser how to parse the file */ 00160 file_parser.set_enclosed_char(enclosure_char, ENCLOSURE_OPTIONAL); 00161 file_parser.set_field_term_char(field_terminator); 00162 file_parser.set_line_term_char(line_terminator); 00163 00164 /* Check to see if there are more records, then grab each row one at a time */ 00165 00166 while (file_parser.has_more_rows()) { 00167 csv_row row = file_parser.get_row(); 00168 if (row.size()<6) { 00169 printf("row too short %d\n", row.size()); 00170 continue; 00171 } 00172 addRoutingToList( atoi(row[0].c_str()), atoi(row[1].c_str()), atof(row[2].c_str()), atoi(row[3].c_str()), 00173 (char*)row[4].c_str(), (char*)row[5].c_str(), (char*)row[6].c_str(), (char*)row[7].c_str() ); 00174 } 00175 00176 fclose(fp); 00177 00178 // printf("Finished loading routing data\n"); 00179 00180 // if(!mqtt.connect("IoTGateway")){ 00181 // printf("\r\nConnect to server failed ..\r\n"); 00182 // return -1; 00183 // } 00184 } 00185 00186 // Write the current nodelist back to file 00187 // First rename original file to iotsetup.bak 00188 bool IoTRouting::writeNodeList() { 00189 00190 // Rename original file 00191 // if( rename ( iotConfigFile.c_str(), iotConfigFileBak.c_str() ) != 0 ) { 00192 // printf("Could not rename file %s\n", iotConfigFile); 00193 // return false; 00194 // } 00195 00196 FILE *fp = fopen(iotNodeDefFile, "w"); 00197 if (fp == NULL) { 00198 printf("Could not open file %s for write\n", iotNodeDefFile); 00199 return false; 00200 } 00201 00202 for (short i=0; i<(int)_nodeDefList.size(); i++) { 00203 IoTNodeDef *nd = (IoTNodeDef*)_nodeDefList.at(i); 00204 // printf("%d,%d,%d,%d\n",nd->groupId,nd->nodeId, nd->length,nd->type ); 00205 fprintf(fp, "%d,%d,%d,%d\n",nd->groupId,nd->nodeId, nd->length,nd->type ); 00206 } 00207 00208 fclose(fp); 00209 00210 return true; 00211 } 00212 00213 // Write the current 
nodelist back to file 00214 // First rename original file to iotsetup.bak 00215 bool IoTRouting::writeRoutingList() { 00216 00217 // Rename original file 00218 // if( rename ( iotConfigFile.c_str(), iotConfigFileBak.c_str() ) != 0 ) { 00219 // printf("Could not rename file %s\n", iotRoutingFile); 00220 // return false; 00221 // } 00222 00223 FILE *fp = fopen(iotRoutingFile, "w"); 00224 if (fp == NULL) { 00225 printf("Could not open file %s for write\n", iotRoutingFile); 00226 return false; 00227 } 00228 00229 for ( short i=0; i<(short)_routing.size(); i++ ) { 00230 PayloadRouting *pr = (PayloadRouting*)_routing.at(i); 00231 // printf("%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4); 00232 fprintf(fp, "%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4); 00233 } 00234 00235 fclose(fp); 00236 00237 return true; 00238 } 00239 00240 void IoTRouting::addOutput( OutputDef *output, short outType ) { 00241 switch ( outType ) { 00242 case OUTPUT_TYPE_PACHUBE: 00243 pachubeOutput = output; 00244 break; 00245 case OUTPUT_TYPE_MQTT: 00246 mqttOutput = output; 00247 break; 00248 case OUTPUT_TYPE_EMONCMS: 00249 emonCmsOutput = output; 00250 break; 00251 case OUTPUT_TYPE_SENSE: 00252 senSeOutput = output; 00253 break; 00254 default: 00255 break; 00256 } 00257 } 00258 00259 PayloadRouting* IoTRouting::getRouting( short nodeId, short sensorId ) { 00260 // printf("Getting routing info for node %d, sensor %d - ", nodeId, sensorId); 00261 for ( short i=0; i<(short)_routing.size(); i++ ) { 00262 00263 PayloadRouting *pr = (PayloadRouting*)_routing.at(i); 00264 // printf("Node %d, sensor %d\n", pr->nodeId, pr->sensorId); 00265 if ( pr->nodeId == nodeId && pr->sensorId == sensorId ) { 00266 // printf("Found!\n"); 00267 return pr; 00268 } 00269 } 00270 // printf("NOT found\n"); 00271 // Add to routing list 00272 addRoutingToList( nodeId, 
sensorId, 1.0, OUTPUT_UNKNOWN,"","","",""); 00273 // Save Routing list 00274 writeRoutingList(); 00275 00276 return NULL; 00277 } 00278 00279 00280 short IoTRouting::getPayloadType( uint8_t *data, short dataLen ) { 00281 // printf("Getting payload type, size is %d - ",(int)_nodeDefList.size() ); 00282 for (short i=0; i<(int)_nodeDefList.size(); i++) { 00283 // printf("%d, %ld, ",i, (int)_nodeDefList.at(i)); 00284 IoTNodeDef *nd = (IoTNodeDef*)_nodeDefList.at(i); 00285 if ( nd->groupId == data[0] && nd->nodeId == (data[1] & 0x1f) && nd->length == data[2] ) { 00286 // printf("Found %d\n", nd->type); 00287 return nd->type; 00288 } 00289 } 00290 // printf("NOT found\n"); 00291 // Add to node list 00292 addNodeToList(data[0], data[1] & 0x1f, data[2], PAYLOAD_TYPE_UNKNOWN ); 00293 // Save NodeList 00294 writeNodeList(); 00295 return PAYLOAD_TYPE_UNKNOWN; 00296 } 00297 00298 #define TMPBUF_SIZE 20 00299 00300 bool IoTRouting::routePayload( uint8_t *data, short dataLen ) { 00301 bool routed = false; 00302 uint8_t *ptr = data; 00303 PayloadSimple psimp; 00304 PayloadV1 pv1; 00305 PayloadV2 pv2; 00306 PayloadRouting *pr = NULL; 00307 char tmpBuf[TMPBUF_SIZE]; 00308 00309 pachubeOutput->init(); // Just to be sure 00310 printf("routePayload: "); 00311 for ( int i = 0; i < dataLen; i++ ) 00312 printf("%02X ", *ptr++ ); 00313 00314 printf("\n"); 00315 00316 short payloadType = getPayloadType( data, dataLen ); 00317 00318 printf("Group ID: %d Node ID: %d Length: %d Status: %02x, Payload type %d\n", data[0], data[1] & 0x1f, data[2], data[3], payloadType ); 00319 00320 switch ( payloadType ) { 00321 case PAYLOAD_TYPE_SIMPLE: 00322 psimp = PayloadSimple( data, dataLen); 00323 printf("SIMPLE NumReadings %d\n", psimp.numReadings()); 00324 for ( int n = 0; n<psimp.numReadings(); n++ ) { 00325 printf("%d: %d - %d\n", n, psimp.sensorId(n), psimp.reading(n) ); 00326 // Get list of routing objects 00327 pr = getRouting( psimp.nodeId(), psimp.sensorId(n) ); 00328 if ( pr != NULL ) { 00329 
if ( pr->output != NULL ) { 00330 // printf("Pachube %d, %d, %f, %s, %s\n",pr->nodeId, pr->sensorId, pr->factor, pr->param1, pr->param2); 00331 snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(psimp.reading(n) * pr->factor)); 00332 // printf("Add to output %s, %s, %s\n", pr->param1, pr->param2, tmpBuf ); 00333 pr->output->addReading( pr->param1, pr->param2,tmpBuf ); 00334 } 00335 } 00336 } 00337 if ( pr != NULL ) { 00338 if ( pr->output != NULL ) 00339 pr->output->send(); 00340 } 00341 routed = true; 00342 00343 break; 00344 case PAYLOAD_TYPE_V1: 00345 pv1 = PayloadV1( data, dataLen); 00346 printf("V1 NumReadings %d, Node %d\n", pv1.numReadings(), pv1.nodeId() ); 00347 00348 pr = getRouting( pv1.nodeId(), -1 ); 00349 if ( pr != NULL ) { 00350 // Output to destination 00351 // printf("Updating low battery feed\n"); 00352 if ( data[3] & STATUS_LOW_BATTERY ) { 00353 printf("LOW Battery detected\n"); 00354 } 00355 if ( pr->output != NULL ) { 00356 snprintf(tmpBuf, TMPBUF_SIZE, "%d", (int)( data[3] & STATUS_LOW_BATTERY )); 00357 pr->output->addReading( pr->param1, pr->param2, tmpBuf ); 00358 } 00359 } 00360 // Determine output 00361 for ( int n = 0; n<pv1.numReadings(); n++ ) { 00362 pr = getRouting(pv1.nodeId(), pv1.sensorId(n) ); 00363 if ( pr != NULL ) { 00364 tmpBuf[0] = '\0'; 00365 // printf("Pachube %d, %d, %f, %s, %s\n",pr->nodeId, pr->sensorId, pr->factor, pr->param1, pr->param2); 00366 if ( pr->output != NULL ) { 00367 snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv1.reading(n) * pr->factor)); 00368 pr->output->addReading( pr->param1, pr->param2, tmpBuf ); 00369 } 00370 printf("%d: %d - %s\n", n, pv1.sensorId(n), tmpBuf ); 00371 00372 } 00373 } 00374 if ( pr != NULL ) { 00375 if ( pr->output != NULL ) 00376 pr->output->send(); 00377 } 00378 routed = true; 00379 break; 00380 case PAYLOAD_TYPE_V2: 00381 pv2 = PayloadV2( data, dataLen ); 00382 // printf("V2 NumReadings %d\n", pv2.numReadings()); 00383 pr = getRouting( pv2.nodeId(), -1 ); 00384 if ( pr != NULL ) { 
00385 // Output to destination 00386 // printf("Updating low battery feed\n"); 00387 if ( data[3] & STATUS_LOW_BATTERY ) { 00388 printf("LOW Battery detected\n"); 00389 } 00390 if ( pr->output != NULL ) { 00391 snprintf(tmpBuf, TMPBUF_SIZE, "%d", (int)( data[3] & STATUS_LOW_BATTERY )); 00392 pr->output->addReading( pr->param1, pr->param2, tmpBuf ); 00393 } 00394 // Need to update add reading to detect change in feed ID and send. 00395 } 00396 00397 for ( int n = 0; n < pv2.numReadings(); n++ ) { 00398 // printf("Getting reading %d node %d sensor id %d\n",n, pv2.nodeId(), pv2.sensorId(n) ); 00399 pr = getRouting( pv2.nodeId(), pv2.sensorId(n) ); 00400 if ( pr != NULL ) { 00401 // printf("Item: pr %ld, out type %d\n", (int)pr, pr->outType); 00402 00403 // printf("readingType = %d\n",pv2.readingType( n )); 00404 // printf("IoTRouting: output = %ld\n",(int)pr->output); 00405 00406 if ( pr->output != NULL ) { 00407 tmpBuf[0] = '\0'; 00408 00409 switch ( pv2.readingType( n ) ) { 00410 case V2_DATATYPE_BYTE: 00411 snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv2.readingByte(n) * pr->factor)); 00412 break; 00413 case V2_DATATYPE_SHORT: 00414 snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv2.readingShort(n) * pr->factor)); 00415 break; 00416 case V2_DATATYPE_LONG: 00417 snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv2.readingLong(n) * pr->factor)); 00418 break; 00419 case V2_DATATYPE_STRING: 00420 break; 00421 default: 00422 break; 00423 } 00424 pr->output->addReading( pr->param1, pr->param2, tmpBuf ); 00425 } 00426 } 00427 } 00428 if ( pr != NULL ) { 00429 if ( pr->output != NULL ) 00430 pr->output->send(); 00431 } 00432 00433 routed = true; 00434 break; 00435 case PAYLOAD_TYPE_UNKNOWN: // Unknown node found 00436 printf("Unknown payload type found\n"); 00437 break; 00438 default: 00439 printf("Unknown\n"); 00440 break; 00441 } 00442 00443 pachubeOutput->send(); // Just in case anything left to send 00444 printf("Finished routing\n"); 00445 return routed; 00446 } 00447
Generated on Tue Jul 12 2022 22:07:04 by
1.7.2