Andrew Lindsay / Mbed 2 deprecated IoTGateway_Basic

Dependencies:   NetServices FatFileSystem csv_parser mbed MQTTClient RF12B DNSResolver SDFileSystem

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers IoTRouting.cpp Source File

IoTRouting.cpp

00001 /** IoT Gateway Routing for incoming messages
00002  *
00003  * @author Andrew Lindsay
00004  *
00005  * @section LICENSE
00006  *
00007  * Copyright (c) 2012 Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk)
00008  *
00009  * Permission is hereby granted, free of charge, to any person obtaining a copy
00010  * of this software and associated documentation files (the "Software"), to deal
00011  * in the Software without restriction, including without limitation the rights
00012  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
00013  * copies of the Software, and to permit persons to whom the Software is
00014  * furnished to do so, subject to the following conditions:
00015 
00016  * The above copyright notice and this permission notice shall be included in
00017  * all copies or substantial portions of the Software.
00018  * 
00019  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
00020  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
00021  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
00022  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
00023  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
00024  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
00025  * THE SOFTWARE.
00026  * 
00027  * @section DESCRIPTION
00028  * 
00029  * 
00030  */
00031 
00032 #include "mbed.h"
00033 #include "RF12B.h"
00034 #include "IoTRouting.h"
00035 #include "csv_parser.h"
00036 #include "PayloadV1.h"
00037 #include "PayloadV2.h"
00038 #include "PayloadSimple.h"
00039 #include "OutputPachube.h"
00040 
00041 //DigitalOut activityLED(p29, "activityLED");
00042 
00043 #define iotRoutingFile "/iotfs/IOTRTG.TXT"
00044 #define iotNodeDefFile "/iotfs/IOTNODE.TXT"
00045 
00046 
00047 // Default Constructor
00048 IoTRouting::IoTRouting() {}
00049 
00050 
00051 void IoTRouting::addNodeToList(uint8_t groupId, uint8_t nodeId, uint8_t length, uint8_t type ) {
00052     IoTNodeDef *pr = new IoTNodeDef();
00053     pr->groupId = groupId;
00054     pr->nodeId = nodeId;
00055     pr->length = length;
00056     pr->type = type;
00057 
00058     _nodeDefList.push_back(pr);
00059 }
00060 
00061 void IoTRouting::addRoutingToList( short nodeId, short sensorId, float factor, byte outType,
00062                                    char *param1, char *param2, char *param3, char *param4 ) {
00063 
00064     PayloadRouting* pr = new PayloadRouting();
00065     pr->nodeId = nodeId;
00066     pr->sensorId = sensorId;
00067 
00068     pr->factor = factor;
00069     pr->outType = outType;
00070 
00071     switch ( pr->outType ) {
00072         case  OUTPUT_TYPE_PACHUBE:
00073             pr->output = pachubeOutput;
00074             break;
00075         case  OUTPUT_TYPE_MQTT:
00076             pr->output = mqttOutput;
00077             break;
00078         case  OUTPUT_TYPE_EMONCMS:
00079             pr->output = emonCmsOutput;
00080             break;
00081         case  OUTPUT_TYPE_SENSE:
00082             pr->output = senSeOutput;
00083             break;
00084         default:
00085             pr->output = NULL;
00086             break;
00087     }
00088 
00089     pr->param1 = new char[strlen(param1)+1];
00090     strcpy(pr->param1, param1);
00091     pr->param2 = new char[strlen(param2)+1];
00092     strcpy(pr->param2, param2);
00093     pr->param3 = new char[strlen(param3)+1];
00094     strcpy(pr->param3, param3);
00095     pr->param4 = new char[strlen(param4)+1];
00096     strcpy(pr->param4, param4);
00097 //    printf("%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4);
00098     _routing.push_back(pr);
00099 }
00100 
00101 
00102 void IoTRouting::initRouting() {
00103     sendCount = 0;
00104 
00105     FILE *fp = fopen(iotNodeDefFile, "r");
00106     if (fp == NULL) {
00107         printf("Could not open file %s for read\n", iotNodeDefFile);
00108         return;
00109     }
00110 
00111     // read file
00112 
00113     const char field_terminator = ',';
00114     const char line_terminator  = '\n';
00115     const char enclosure_char   = '"';
00116 
00117     csv_parser file_parser;
00118 
00119     /* Define how many records we're gonna skip. This could be used to skip the column definitions. */
00120     file_parser.set_skip_lines(0);
00121 
00122     /* Specify the file to parse */
00123     file_parser.init(fp);
00124 
00125     /* Here we tell the parser how to parse the file */
00126     file_parser.set_enclosed_char(enclosure_char, ENCLOSURE_OPTIONAL);
00127     file_parser.set_field_term_char(field_terminator);
00128     file_parser.set_line_term_char(line_terminator);
00129 
00130     /* Check to see if there are more records, then grab each row one at a time */
00131 
00132     while (file_parser.has_more_rows()) {
00133         csv_row row = file_parser.get_row();
00134         if (row.size()<4) {
00135             printf("row too short %d\n", row.size());
00136             continue;
00137         }
00138         addNodeToList((uint8_t)atoi(row[0].c_str()), (uint8_t)atoi(row[1].c_str()),
00139                       (uint8_t)atoi(row[2].c_str()), (uint8_t)atoi(row[3].c_str()));
00140     }
00141 
00142     fclose(fp);
00143 
00144 //    printf("Finished loading routing data\n");
00145 
00146     fp = fopen(iotRoutingFile, "r");
00147     if (fp == NULL) {
00148         printf("Could not open file %s for read\n", iotRoutingFile);
00149         return;
00150     }
00151 //    _routing=new list<PayloadRouting*>();
00152 
00153     /* Define how many records we're gonna skip. This could be used to skip the column definitions. */
00154     file_parser.set_skip_lines(0);
00155 
00156     /* Specify the file to parse */
00157     file_parser.init(fp);
00158 
00159     /* Here we tell the parser how to parse the file */
00160     file_parser.set_enclosed_char(enclosure_char, ENCLOSURE_OPTIONAL);
00161     file_parser.set_field_term_char(field_terminator);
00162     file_parser.set_line_term_char(line_terminator);
00163 
00164     /* Check to see if there are more records, then grab each row one at a time */
00165 
00166     while (file_parser.has_more_rows()) {
00167         csv_row row = file_parser.get_row();
00168         if (row.size()<6) {
00169             printf("row too short %d\n", row.size());
00170             continue;
00171         }
00172         addRoutingToList( atoi(row[0].c_str()), atoi(row[1].c_str()), atof(row[2].c_str()), atoi(row[3].c_str()),
00173                           (char*)row[4].c_str(), (char*)row[5].c_str(), (char*)row[6].c_str(), (char*)row[7].c_str() );
00174     }
00175 
00176     fclose(fp);
00177 
00178 //    printf("Finished loading routing data\n");
00179 
00180 //    if(!mqtt.connect("IoTGateway")){
00181 //        printf("\r\nConnect to server failed ..\r\n");
00182     //  return -1;
00183 //    }
00184 }
00185 
00186 // Write the current nodelist back to file
00187 // First rename original file to iotsetup.bak
00188 bool IoTRouting::writeNodeList() {
00189 
00190     // Rename original file
00191 //    if( rename ( iotConfigFile.c_str(), iotConfigFileBak.c_str() ) != 0 ) {
00192 //        printf("Could not rename file %s\n", iotConfigFile);
00193 //        return false;
00194 //    }
00195 
00196     FILE *fp = fopen(iotNodeDefFile, "w");
00197     if (fp == NULL) {
00198         printf("Could not open file %s for write\n", iotNodeDefFile);
00199         return false;
00200     }
00201 
00202     for (short i=0; i<(int)_nodeDefList.size(); i++) {
00203         IoTNodeDef *nd = (IoTNodeDef*)_nodeDefList.at(i);
00204 //        printf("%d,%d,%d,%d\n",nd->groupId,nd->nodeId, nd->length,nd->type );
00205         fprintf(fp, "%d,%d,%d,%d\n",nd->groupId,nd->nodeId, nd->length,nd->type );
00206     }
00207 
00208     fclose(fp);
00209 
00210     return true;
00211 }
00212 
00213 // Write the current nodelist back to file
00214 // First rename original file to iotsetup.bak
00215 bool IoTRouting::writeRoutingList() {
00216 
00217     // Rename original file
00218 //    if( rename ( iotConfigFile.c_str(), iotConfigFileBak.c_str() ) != 0 ) {
00219 //        printf("Could not rename file %s\n", iotRoutingFile);
00220 //        return false;
00221 //    }
00222 
00223     FILE *fp = fopen(iotRoutingFile, "w");
00224     if (fp == NULL) {
00225         printf("Could not open file %s for write\n", iotRoutingFile);
00226         return false;
00227     }
00228 
00229     for ( short i=0; i<(short)_routing.size(); i++ ) {
00230         PayloadRouting *pr = (PayloadRouting*)_routing.at(i);
00231 //        printf("%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4);
00232         fprintf(fp, "%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4);
00233     }
00234 
00235     fclose(fp);
00236 
00237     return true;
00238 }
00239 
00240 void IoTRouting::addOutput( OutputDef *output, short outType ) {
00241     switch ( outType ) {
00242         case  OUTPUT_TYPE_PACHUBE:
00243             pachubeOutput = output;
00244             break;
00245         case  OUTPUT_TYPE_MQTT:
00246             mqttOutput = output;
00247             break;
00248         case  OUTPUT_TYPE_EMONCMS:
00249             emonCmsOutput = output;
00250             break;
00251         case  OUTPUT_TYPE_SENSE:
00252             senSeOutput = output;
00253             break;
00254         default:
00255             break;
00256     }
00257 }
00258 
00259 PayloadRouting* IoTRouting::getRouting( short nodeId, short sensorId ) {
00260 //    printf("Getting routing info for node %d, sensor %d - ", nodeId, sensorId);
00261     for ( short i=0; i<(short)_routing.size(); i++ ) {
00262 
00263         PayloadRouting *pr = (PayloadRouting*)_routing.at(i);
00264 //        printf("Node %d, sensor %d\n", pr->nodeId, pr->sensorId);
00265         if ( pr->nodeId == nodeId &&  pr->sensorId == sensorId ) {
00266 //            printf("Found!\n");
00267             return pr;
00268         }
00269     }
00270 //    printf("NOT found\n");
00271     // Add to routing list
00272     addRoutingToList( nodeId, sensorId, 1.0, OUTPUT_UNKNOWN,"","","","");
00273     // Save Routing list
00274     writeRoutingList();
00275 
00276     return NULL;
00277 }
00278 
00279 
00280 short IoTRouting::getPayloadType(  uint8_t *data, short dataLen ) {
00281 //    printf("Getting payload type, size is %d -  ",(int)_nodeDefList.size() );
00282     for (short i=0; i<(int)_nodeDefList.size(); i++) {
00283 //    printf("%d, %ld, ",i, (int)_nodeDefList.at(i));
00284         IoTNodeDef *nd = (IoTNodeDef*)_nodeDefList.at(i);
00285         if ( nd->groupId == data[0] && nd->nodeId == (data[1] & 0x1f) && nd->length == data[2] ) {
00286 //            printf("Found %d\n", nd->type);
00287             return nd->type;
00288         }
00289     }
00290 //    printf("NOT found\n");
00291     // Add to node list
00292     addNodeToList(data[0], data[1] & 0x1f, data[2], PAYLOAD_TYPE_UNKNOWN );
00293     // Save NodeList
00294     writeNodeList();
00295     return PAYLOAD_TYPE_UNKNOWN;
00296 }
00297 
00298 #define TMPBUF_SIZE 20
00299 
00300 bool IoTRouting::routePayload( uint8_t *data, short dataLen ) {
00301     bool routed = false;
00302     uint8_t *ptr = data;
00303     PayloadSimple psimp;
00304     PayloadV1 pv1;
00305     PayloadV2 pv2;
00306     PayloadRouting *pr = NULL;
00307     char tmpBuf[TMPBUF_SIZE];
00308 
00309     pachubeOutput->init();      // Just to be sure
00310     printf("routePayload: ");
00311     for ( int i = 0; i < dataLen; i++ )
00312         printf("%02X ", *ptr++ );
00313 
00314     printf("\n");
00315 
00316     short payloadType = getPayloadType( data, dataLen );
00317 
00318     printf("Group ID: %d Node ID: %d Length: %d Status: %02x, Payload type %d\n", data[0], data[1] & 0x1f, data[2], data[3], payloadType );
00319 
00320     switch ( payloadType ) {
00321         case PAYLOAD_TYPE_SIMPLE:
00322             psimp = PayloadSimple( data, dataLen);
00323             printf("SIMPLE NumReadings %d\n", psimp.numReadings());
00324             for ( int n = 0; n<psimp.numReadings(); n++ ) {
00325                 printf("%d: %d - %d\n", n, psimp.sensorId(n), psimp.reading(n) );
00326                 // Get list of routing objects
00327                 pr = getRouting( psimp.nodeId(), psimp.sensorId(n) );
00328                 if ( pr != NULL ) {
00329                     if ( pr->output != NULL ) {
00330 //                        printf("Pachube %d, %d, %f, %s, %s\n",pr->nodeId, pr->sensorId, pr->factor, pr->param1, pr->param2);
00331                         snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(psimp.reading(n) * pr->factor));
00332 //                        printf("Add to output %s, %s, %s\n", pr->param1, pr->param2, tmpBuf );
00333                         pr->output->addReading( pr->param1, pr->param2,tmpBuf );
00334                     }
00335                 }
00336             }
00337             if ( pr != NULL ) {
00338                 if ( pr->output != NULL )
00339                     pr->output->send();
00340             }
00341             routed = true;
00342 
00343             break;
00344         case PAYLOAD_TYPE_V1:
00345             pv1 = PayloadV1( data, dataLen);
00346             printf("V1 NumReadings %d, Node %d\n", pv1.numReadings(), pv1.nodeId() );
00347 
00348             pr = getRouting( pv1.nodeId(), -1 );
00349             if ( pr != NULL ) {
00350                 // Output to destination
00351 //                printf("Updating low battery feed\n");
00352                 if ( data[3] & STATUS_LOW_BATTERY ) {
00353                     printf("LOW Battery detected\n");
00354                 }
00355                 if ( pr->output != NULL ) {
00356                     snprintf(tmpBuf, TMPBUF_SIZE, "%d", (int)( data[3] & STATUS_LOW_BATTERY ));
00357                     pr->output->addReading( pr->param1, pr->param2, tmpBuf );
00358                     }
00359             }
00360             // Determine output
00361             for ( int n = 0; n<pv1.numReadings(); n++ ) {
00362                 pr = getRouting(pv1.nodeId(), pv1.sensorId(n) );
00363                 if ( pr != NULL ) {
00364                     tmpBuf[0] = '\0';
00365 //                    printf("Pachube %d, %d, %f, %s, %s\n",pr->nodeId, pr->sensorId, pr->factor, pr->param1, pr->param2);
00366                     if ( pr->output != NULL ) {
00367                         snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv1.reading(n) * pr->factor));
00368                         pr->output->addReading( pr->param1, pr->param2, tmpBuf );
00369                     }
00370                     printf("%d: %d - %s\n", n, pv1.sensorId(n), tmpBuf );
00371 
00372                 }
00373             }
00374             if ( pr != NULL ) {
00375                 if ( pr->output != NULL )
00376                     pr->output->send();
00377             }
00378             routed = true;
00379             break;
00380         case PAYLOAD_TYPE_V2:
00381             pv2 = PayloadV2( data, dataLen );
00382 //            printf("V2 NumReadings %d\n", pv2.numReadings());
00383             pr = getRouting( pv2.nodeId(), -1 );
00384             if ( pr != NULL ) {
00385                 // Output to destination
00386 //                printf("Updating low battery feed\n");
00387                 if ( data[3] & STATUS_LOW_BATTERY ) {
00388                     printf("LOW Battery detected\n");
00389                 }
00390                 if ( pr->output != NULL ) {
00391                     snprintf(tmpBuf, TMPBUF_SIZE, "%d", (int)( data[3] & STATUS_LOW_BATTERY ));
00392                     pr->output->addReading( pr->param1, pr->param2, tmpBuf );
00393                 }
00394                 // Need to update add reading to detect change in feed ID and send.
00395             }
00396 
00397             for ( int n = 0; n < pv2.numReadings(); n++ ) {
00398 //                printf("Getting reading %d node %d sensor id %d\n",n, pv2.nodeId(), pv2.sensorId(n) );
00399                 pr = getRouting( pv2.nodeId(), pv2.sensorId(n) );
00400                 if ( pr != NULL ) {
00401 //                        printf("Item: pr %ld, out type %d\n", (int)pr, pr->outType);
00402 
00403 //                        printf("readingType = %d\n",pv2.readingType( n ));
00404 //                        printf("IoTRouting: output = %ld\n",(int)pr->output);
00405 
00406                     if ( pr->output != NULL ) {
00407                          tmpBuf[0] = '\0';
00408 
00409                         switch ( pv2.readingType( n ) ) {
00410                             case V2_DATATYPE_BYTE:
00411                                 snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv2.readingByte(n) * pr->factor));
00412                                 break;
00413                             case V2_DATATYPE_SHORT:
00414                                 snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv2.readingShort(n) * pr->factor));
00415                                 break;
00416                             case V2_DATATYPE_LONG:
00417                                 snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv2.readingLong(n) * pr->factor));
00418                                 break;
00419                             case V2_DATATYPE_STRING:
00420                                 break;
00421                             default:
00422                                 break;
00423                         }
00424                         pr->output->addReading( pr->param1, pr->param2, tmpBuf );
00425                     }
00426                 }
00427             }
00428             if ( pr != NULL ) {
00429                 if ( pr->output != NULL )
00430                     pr->output->send();
00431             }
00432 
00433             routed = true;
00434             break;
00435         case PAYLOAD_TYPE_UNKNOWN:      // Unknown node found
00436             printf("Unknown payload type found\n");
00437             break;
00438         default:
00439             printf("Unknown\n");
00440             break;
00441     }
00442 
00443     pachubeOutput->send();     // Just in case anything left to send
00444     printf("Finished routing\n");
00445     return routed;
00446 }
00447