mbed based IoT Gateway More details http://blog.thiseldo.co.uk/wp-filez/IoTGateway.pdf
Dependencies: NetServices FatFileSystem csv_parser mbed MQTTClient RF12B DNSResolver SDFileSystem
Routing/IoTRouting.cpp
- Committer:
- SomeRandomBloke
- Date:
- 2012-05-01
- Revision:
- 4:d460406ac780
- Parent:
- 2:27714c8c9c0a
- Child:
- 5:0dbc27a7af55
File content as of revision 4:d460406ac780:
/** IoT Gateway Routing for incoming messages * * @author Andrew Lindsay * * @section LICENSE * * Copyright (c) 2012 Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk) * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. * * @section DESCRIPTION * * */ #include "mbed.h" #include "RF12B.h" #include "IoTRouting.h" #include "csv_parser.h" #include "PayloadV1.h" #include "PayloadV2.h" #include "PayloadSimple.h" #include "OutputPachube.h" //DigitalOut activityLED(p29, "activityLED"); #define iotRoutingFile "/iotfs/IOTRTG.TXT" #define iotNodeDefFile "/iotfs/IOTNODE.TXT" // Default Constructor IoTRouting::IoTRouting() {} void IoTRouting::addNodeToList(uint8_t groupId, uint8_t nodeId, uint8_t length, uint8_t type ) { IoTNodeDef *pr = new IoTNodeDef(); pr->groupId = groupId; pr->nodeId = nodeId; pr->length = length; pr->type = type; _nodeDefList.push_back(pr); } void IoTRouting::addRoutingToList( short nodeId, short sensorId, float factor, byte outType, char *param1, char *param2, char *param3, char *param4 ) { PayloadRouting* pr = new PayloadRouting(); pr->nodeId = nodeId; pr->sensorId = sensorId; pr->factor = factor; pr->outType = outType; switch ( pr->outType ) { case OUTPUT_TYPE_PACHUBE: pr->output = pachubeOutput; break; case OUTPUT_TYPE_MQTT: pr->output = mqttOutput; break; case OUTPUT_TYPE_EMONCMS: pr->output = emonCmsOutput; break; case OUTPUT_TYPE_SENSE: pr->output = senSeOutput; break; default: pr->output = NULL; break; } pr->param1 = new char[strlen(param1)+1]; strcpy(pr->param1, param1); pr->param2 = new char[strlen(param2)+1]; strcpy(pr->param2, param2); pr->param3 = new char[strlen(param3)+1]; strcpy(pr->param3, param3); pr->param4 = new char[strlen(param4)+1]; strcpy(pr->param4, param4); // printf("%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4); _routing.push_back(pr); } void IoTRouting::initRouting() { sendCount = 0; FILE *fp = fopen(iotNodeDefFile, "r"); if (fp == NULL) { printf("Could not open file %s for read\n", iotNodeDefFile); return; } // read file const char field_terminator = ','; const char line_terminator = '\n'; const char enclosure_char = '"'; csv_parser file_parser; /* Define how many records we're gonna skip. This could be used to skip the column definitions. */ file_parser.set_skip_lines(0); /* Specify the file to parse */ file_parser.init(fp); /* Here we tell the parser how to parse the file */ file_parser.set_enclosed_char(enclosure_char, ENCLOSURE_OPTIONAL); file_parser.set_field_term_char(field_terminator); file_parser.set_line_term_char(line_terminator); /* Check to see if there are more records, then grab each row one at a time */ while (file_parser.has_more_rows()) { csv_row row = file_parser.get_row(); if (row.size()<4) { printf("row too short %d\n", row.size()); continue; } addNodeToList((uint8_t)atoi(row[0].c_str()), (uint8_t)atoi(row[1].c_str()), (uint8_t)atoi(row[2].c_str()), (uint8_t)atoi(row[3].c_str())); } fclose(fp); // printf("Finished loading routing data\n"); fp = fopen(iotRoutingFile, "r"); if (fp == NULL) { printf("Could not open file %s for read\n", iotRoutingFile); return; } // _routing=new list<PayloadRouting*>(); /* Define how many records we're gonna skip. This could be used to skip the column definitions. */ file_parser.set_skip_lines(0); /* Specify the file to parse */ file_parser.init(fp); /* Here we tell the parser how to parse the file */ file_parser.set_enclosed_char(enclosure_char, ENCLOSURE_OPTIONAL); file_parser.set_field_term_char(field_terminator); file_parser.set_line_term_char(line_terminator); /* Check to see if there are more records, then grab each row one at a time */ while (file_parser.has_more_rows()) { csv_row row = file_parser.get_row(); if (row.size()<6) { printf("row too short %d\n", row.size()); continue; } addRoutingToList( atoi(row[0].c_str()), atoi(row[1].c_str()), atof(row[2].c_str()), atoi(row[3].c_str()), (char*)row[4].c_str(), (char*)row[5].c_str(), (char*)row[6].c_str(), (char*)row[7].c_str() ); } fclose(fp); // printf("Finished loading routing data\n"); // if(!mqtt.connect("IoTGateway")){ // printf("\r\nConnect to server failed ..\r\n"); // return -1; // } } // Write the current nodelist back to file // First rename original file to iotsetup.bak bool IoTRouting::writeNodeList() { // Rename original file // if( rename ( iotConfigFile.c_str(), iotConfigFileBak.c_str() ) != 0 ) { // printf("Could not rename file %s\n", iotConfigFile); // return false; // } FILE *fp = fopen(iotNodeDefFile, "w"); if (fp == NULL) { printf("Could not open file %s for write\n", iotNodeDefFile); return false; } for (short i=0; i<(int)_nodeDefList.size(); i++) { IoTNodeDef *nd = (IoTNodeDef*)_nodeDefList.at(i); // printf("%d,%d,%d,%d\n",nd->groupId,nd->nodeId, nd->length,nd->type ); fprintf(fp, "%d,%d,%d,%d\n",nd->groupId,nd->nodeId, nd->length,nd->type ); } fclose(fp); return true; } // Write the current nodelist back to file // First rename original file to iotsetup.bak bool IoTRouting::writeRoutingList() { // Rename original file // if( rename ( iotConfigFile.c_str(), iotConfigFileBak.c_str() ) != 0 ) { // printf("Could not rename file %s\n", iotRoutingFile); // return false; // } FILE *fp = fopen(iotRoutingFile, "w"); if (fp == NULL) { printf("Could not open file %s for write\n", iotRoutingFile); return false; } for ( short i=0; i<(short)_routing.size(); i++ ) { PayloadRouting *pr = (PayloadRouting*)_routing.at(i); // printf("%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4); fprintf(fp, "%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4); } fclose(fp); return true; } void IoTRouting::addOutput( OutputDef *output, short outType ) { switch ( outType ) { case OUTPUT_TYPE_PACHUBE: pachubeOutput = output; break; case OUTPUT_TYPE_MQTT: mqttOutput = output; break; case OUTPUT_TYPE_EMONCMS: emonCmsOutput = output; break; case OUTPUT_TYPE_SENSE: senSeOutput = output; break; default: break; } } PayloadRouting* IoTRouting::getRouting( short nodeId, short sensorId ) { printf("Getting routing info for node %d, sensor %d - ", nodeId, sensorId); for ( short i=0; i<(short)_routing.size(); i++ ) { PayloadRouting *pr = (PayloadRouting*)_routing.at(i); // printf("Node %d, sensor %d\n", pr->nodeId, pr->sensorId); if ( pr->nodeId == nodeId && pr->sensorId == sensorId ) { printf("Found!\n"); return pr; } } printf("NOT found\n"); // Add to routing list addRoutingToList( nodeId, sensorId, 1.0, OUTPUT_UNKNOWN,"","","",""); // Save Routing list writeRoutingList(); return NULL; } short IoTRouting::getPayloadType( uint8_t *data, short dataLen ) { printf("Getting payload type, size is %d - ",(int)_nodeDefList.size() ); for (short i=0; i<(int)_nodeDefList.size(); i++) { // printf("%d, %ld, ",i, (int)_nodeDefList.at(i)); IoTNodeDef *nd = (IoTNodeDef*)_nodeDefList.at(i); if ( nd->groupId == data[0] && nd->nodeId == (data[1] & 0x1f) && nd->length == data[2] ) { printf("Found %d\n", nd->type); return nd->type; } } printf("NOT found\n"); // Add to node list addNodeToList(data[0], data[1] & 0x1f, data[2], PAYLOAD_TYPE_UNKNOWN ); // Save NodeList writeNodeList(); return PAYLOAD_TYPE_UNKNOWN; } bool IoTRouting::routePayload( uint8_t *data, short dataLen ) { bool routed = false; uint8_t *ptr = data; PayloadSimple psimp; PayloadV1 pv1; PayloadV2 pv2; PayloadRouting *pr = NULL; char tmpBuf[42]; pachubeOutput->init(); // Just to be sure printf("routePayload: "); for ( int i = 0; i < dataLen; i++ ) printf("%02X ", *ptr++ ); printf("\n"); short payloadType = getPayloadType( data, dataLen ); printf("Group ID: %d Node ID: %d Length: %d Status: %02x, Payload type %d\n", data[0], data[1] & 0x1f, data[2], data[3], payloadType ); switch ( payloadType ) { case PAYLOAD_TYPE_SIMPLE: psimp = PayloadSimple( data, dataLen); printf("SIMPLE NumReadings %d\n", psimp.numReadings()); for ( int n = 0; n<psimp.numReadings(); n++ ) { printf("%d: %d - %d\n", n, psimp.sensorId(n), psimp.reading(n) ); // Get list of routing objects pr = getRouting( psimp.nodeId(), psimp.sensorId(n) ); if ( pr != NULL ) { if ( pr->output != NULL ) { // printf("Pachube %d, %d, %f, %s, %s\n",pr->nodeId, pr->sensorId, pr->factor, pr->param1, pr->param2); snprintf(tmpBuf, DATABUF_SIZE, "%.3f", (float)(psimp.reading(n) * pr->factor)); printf("Add to output %s, %s, %s\n", pr->param1, pr->param2, tmpBuf ); pr->output->addReading( pr->param1, pr->param2,tmpBuf ); } } } if ( pr != NULL ) { if ( pr->output != NULL ) pr->output->send(); } routed = true; break; case PAYLOAD_TYPE_V1: pv1 = PayloadV1( data, dataLen); printf("V1 NumReadings %d, Node %d\n", pv1.numReadings(), pv1.nodeId() ); pr = getRouting( pv1.nodeId(), -1 ); if ( pr != NULL ) { // Output to destination // printf("Updating low battery feed\n"); if ( data[3] & STATUS_LOW_BATTERY ) { printf("LOW Battery detected\n"); } if ( pr->output != NULL ) { snprintf(tmpBuf, DATABUF_SIZE, "%d", (int)( data[3] & STATUS_LOW_BATTERY )); pr->output->addReading( pr->param1, pr->param2, tmpBuf ); } } // Determine output for ( int n = 0; n<pv1.numReadings(); n++ ) { pr = getRouting(pv1.nodeId(), pv1.sensorId(n) ); if ( pr != NULL ) { tmpBuf[0] = '\0'; // printf("Pachube %d, %d, %f, %s, %s\n",pr->nodeId, pr->sensorId, pr->factor, pr->param1, pr->param2); if ( pr->output != NULL ) { snprintf(tmpBuf, DATABUF_SIZE, "%.3f", (float)(pv1.reading(n) * pr->factor)); pr->output->addReading( pr->param1, pr->param2, tmpBuf ); } printf("%d: %d - %s\n", n, pv1.sensorId(n), tmpBuf ); } } if ( pr != NULL ) { if ( pr->output != NULL ) pr->output->send(); } routed = true; break; case PAYLOAD_TYPE_V2: pv2 = PayloadV2( data, dataLen ); // printf("V2 NumReadings %d\n", pv2.numReadings()); pr = getRouting( pv2.nodeId(), -1 ); if ( pr != NULL ) { // Output to destination // printf("Updating low battery feed\n"); if ( data[3] & STATUS_LOW_BATTERY ) { printf("LOW Battery detected\n"); } if ( pr->output != NULL ) { snprintf(tmpBuf, DATABUF_SIZE, "%d", (int)( data[3] & STATUS_LOW_BATTERY )); pr->output->addReading( pr->param1, pr->param2, tmpBuf ); } // Need to update add reading to detect change in feed ID and send. } for ( int n = 0; n < pv2.numReadings(); n++ ) { // printf("Getting reading %d node %d sensor id %d\n",n, pv2.nodeId(), pv2.sensorId(n) ); pr = getRouting( pv2.nodeId(), pv2.sensorId(n) ); if ( pr != NULL ) { // printf("Item: pr %ld, out type %d\n", (int)pr, pr->outType); // printf("readingType = %d\n",pv2.readingType( n )); // printf("IoTRouting: output = %ld\n",(int)pr->output); if ( pr->output != NULL ) { tmpBuf[0] = '\0'; switch ( pv2.readingType( n ) ) { case V2_DATATYPE_BYTE: snprintf(tmpBuf, DATABUF_SIZE, "%.3f", (float)(pv2.readingByte(n) * pr->factor)); break; case V2_DATATYPE_SHORT: snprintf(tmpBuf, DATABUF_SIZE, "%.3f", (float)(pv2.readingShort(n) * pr->factor)); break; case V2_DATATYPE_LONG: snprintf(tmpBuf, DATABUF_SIZE, "%.3f", (float)(pv2.readingLong(n) * pr->factor)); break; case V2_DATATYPE_STRING: break; default: break; } pr->output->addReading( pr->param1, pr->param2, tmpBuf ); } } } if ( pr != NULL ) { if ( pr->output != NULL ) pr->output->send(); } routed = true; break; case PAYLOAD_TYPE_UNKNOWN: // Unknown node found printf("Unknown payload type found\n"); break; default: printf("Unknown\n"); break; } pachubeOutput->send(); // Just in case anything left to send printf("Finished routing\n"); return routed; }