mbed based IoT Gateway More details http://blog.thiseldo.co.uk/wp-filez/IoTGateway.pdf

Dependencies:   NetServices FatFileSystem csv_parser mbed MQTTClient RF12B DNSResolver SDFileSystem

Revision:
0:a29a0225f203
Child:
2:27714c8c9c0a
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Routing/IoTRouting.cpp	Mon Apr 02 22:05:20 2012 +0000
@@ -0,0 +1,434 @@
+/** IoT Gateway Routing for incoming messages
+ *
+ * @author Andrew Lindsay
+ *
+ * @section LICENSE
+ *
+ * Copyright (c) 2012 Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk)
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ * 
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ * 
+ * @section DESCRIPTION
+ * 
+ * 
+ */
+
+#include "mbed.h"
+#include "RF12B.h"
+#include "IoTRouting.h"
+#include "csv_parser.h"
+#include "PayloadV1.h"
+#include "PayloadV2.h"
+#include "PayloadSimple.h"
+#include "OutputPachube.h"
+
+//DigitalOut activityLED(p29, "activityLED");
+
+#define iotRoutingFile "/iotfs/IOTRTG.TXT"
+#define iotNodeDefFile "/iotfs/IOTNODE.TXT"
+
+
+// Default Constructor
+IoTRouting::IoTRouting() {}
+
+
+void IoTRouting::addNodeToList(uint8_t groupId, uint8_t nodeId, uint8_t length, uint8_t type ) {
+    IoTNodeDef *pr = new IoTNodeDef();
+    pr->groupId = groupId;
+    pr->nodeId = nodeId;
+    pr->length = length;
+    pr->type = type;
+
+    _nodeDefList.push_back(pr);
+}
+
+void IoTRouting::addRoutingToList( short nodeId, short sensorId, float factor, byte outType,
+                                   char *param1, char *param2, char *param3, char *param4 ) {
+
+    PayloadRouting* pr = new PayloadRouting();
+    pr->nodeId = nodeId;
+    pr->sensorId = sensorId;
+
+    pr->factor = factor;
+    pr->outType = outType;
+
+    switch ( pr->outType ) {
+        case  OUTPUT_TYPE_PACHUBE:
+            pr->output = pachubeOutput;
+            break;
+        case  OUTPUT_TYPE_MQTT:
+            pr->output = mqttOutput;
+            break;
+        default:
+            pr->output = NULL;
+            break;
+    }
+
+    pr->param1 = new char[strlen(param1)+1];
+    strcpy(pr->param1, param1);
+    pr->param2 = new char[strlen(param2)+1];
+    strcpy(pr->param2, param2);
+    pr->param3 = new char[strlen(param3)+1];
+    strcpy(pr->param3, param3);
+    pr->param4 = new char[strlen(param4)+1];
+    strcpy(pr->param4, param4);
+//    printf("%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4);
+    _routing.push_back(pr);
+}
+
+
+void IoTRouting::initRouting() {
+    sendCount = 0;
+
+    FILE *fp = fopen(iotNodeDefFile, "r");
+    if (fp == NULL) {
+        printf("Could not open file %s for read\n", iotNodeDefFile);
+        return;
+    }
+
+    // read file
+
+    const char field_terminator = ',';
+    const char line_terminator  = '\n';
+    const char enclosure_char   = '"';
+
+    csv_parser file_parser;
+
+    /* Define how many records we're gonna skip. This could be used to skip the column definitions. */
+    file_parser.set_skip_lines(0);
+
+    /* Specify the file to parse */
+    file_parser.init(fp);
+
+    /* Here we tell the parser how to parse the file */
+    file_parser.set_enclosed_char(enclosure_char, ENCLOSURE_OPTIONAL);
+    file_parser.set_field_term_char(field_terminator);
+    file_parser.set_line_term_char(line_terminator);
+
+    /* Check to see if there are more records, then grab each row one at a time */
+
+    while (file_parser.has_more_rows()) {
+        csv_row row = file_parser.get_row();
+        if (row.size()<4) {
+            printf("row too short %d\n", row.size());
+            continue;
+        }
+        addNodeToList((uint8_t)atoi(row[0].c_str()), (uint8_t)atoi(row[1].c_str()),
+                      (uint8_t)atoi(row[2].c_str()), (uint8_t)atoi(row[3].c_str()));
+    }
+
+    fclose(fp);
+
+//    printf("Finished loading routing data\n");
+
+    fp = fopen(iotRoutingFile, "r");
+    if (fp == NULL) {
+        printf("Could not open file %s for read\n", iotRoutingFile);
+        return;
+    }
+//    _routing=new list<PayloadRouting*>();
+
+    /* Define how many records we're gonna skip. This could be used to skip the column definitions. */
+    file_parser.set_skip_lines(0);
+
+    /* Specify the file to parse */
+    file_parser.init(fp);
+
+    /* Here we tell the parser how to parse the file */
+    file_parser.set_enclosed_char(enclosure_char, ENCLOSURE_OPTIONAL);
+    file_parser.set_field_term_char(field_terminator);
+    file_parser.set_line_term_char(line_terminator);
+
+    /* Check to see if there are more records, then grab each row one at a time */
+
+    while (file_parser.has_more_rows()) {
+        csv_row row = file_parser.get_row();
+        if (row.size()<6) {
+            printf("row too short %d\n", row.size());
+            continue;
+        }
+        addRoutingToList( atoi(row[0].c_str()), atoi(row[1].c_str()), atof(row[2].c_str()), atoi(row[3].c_str()),
+                          (char*)row[4].c_str(), (char*)row[5].c_str(), (char*)row[6].c_str(), (char*)row[7].c_str() );
+    }
+
+    fclose(fp);
+
+//    printf("Finished loading routing data\n");
+
+//    if(!mqtt.connect("IoTGateway")){
+//        printf("\r\nConnect to server failed ..\r\n");
+    //  return -1;
+//    }
+}
+
+// Write the current nodelist back to file
+// First rename original file to iotsetup.bak
+bool IoTRouting::writeNodeList() {
+
+    // Rename original file
+//    if( rename ( iotConfigFile.c_str(), iotConfigFileBak.c_str() ) != 0 ) {
+//        printf("Could not rename file %s\n", iotConfigFile);
+//        return false;
+//    }
+
+    FILE *fp = fopen(iotNodeDefFile, "w");
+    if (fp == NULL) {
+        printf("Could not open file %s for write\n", iotNodeDefFile);
+        return false;
+    }
+
+    for (short i=0; i<(int)_nodeDefList.size(); i++) {
+        IoTNodeDef *nd = (IoTNodeDef*)_nodeDefList.at(i);
+//        printf("%d,%d,%d,%d\n",nd->groupId,nd->nodeId, nd->length,nd->type );
+        fprintf(fp, "%d,%d,%d,%d\n",nd->groupId,nd->nodeId, nd->length,nd->type );
+    }
+
+    fclose(fp);
+
+    return true;
+}
+
+// Write the current nodelist back to file
+// First rename original file to iotsetup.bak
+bool IoTRouting::writeRoutingList() {
+
+    // Rename original file
+//    if( rename ( iotConfigFile.c_str(), iotConfigFileBak.c_str() ) != 0 ) {
+//        printf("Could not rename file %s\n", iotRoutingFile);
+//        return false;
+//    }
+
+    FILE *fp = fopen(iotRoutingFile, "w");
+    if (fp == NULL) {
+        printf("Could not open file %s for write\n", iotRoutingFile);
+        return false;
+    }
+
+    for ( short i=0; i<(short)_routing.size(); i++ ) {
+        PayloadRouting *pr = (PayloadRouting*)_routing.at(i);
+//        printf("%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4);
+        fprintf(fp, "%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4);
+    }
+
+    fclose(fp);
+
+    return true;
+}
+
+void IoTRouting::addOutput( OutputDef *output, short outType ) {
+    switch ( outType ) {
+        case  OUTPUT_TYPE_PACHUBE:
+            pachubeOutput = output;
+            break;
+        case  OUTPUT_TYPE_MQTT:
+            mqttOutput = output;
+            break;
+        default:
+            break;
+    }
+}
+
+PayloadRouting* IoTRouting::getRouting( short nodeId, short sensorId ) {
+    printf("Getting routing info for node %d, sensor %d size %d - ", nodeId, sensorId, (int)_routing.size());
+    for ( short i=0; i<(short)_routing.size(); i++ ) {
+
+        PayloadRouting *pr = (PayloadRouting*)_routing.at(i);
+//        printf("Node %d, sensor %d\n", pr->nodeId, pr->sensorId);
+        if ( pr->nodeId == nodeId &&  pr->sensorId == sensorId ) {
+            printf("Found!\n");
+            return pr;
+        }
+    }
+    printf("NOT found\n");
+    // Add to routing list
+    addRoutingToList( nodeId, sensorId, 1.0, OUTPUT_UNKNOWN,"","","","");
+    // Save Routing list
+    writeRoutingList();
+
+    return NULL;
+}
+
+
+short IoTRouting::getPayloadType(  uint8_t *data, short dataLen ) {
+    printf("Getting payload type, size is %d -  ",(int)_nodeDefList.size() );
+    for (short i=0; i<(int)_nodeDefList.size(); i++) {
+//    printf("%d, %ld, ",i, (int)_nodeDefList.at(i));
+        IoTNodeDef *nd = (IoTNodeDef*)_nodeDefList.at(i);
+        if ( nd->groupId == data[0] && nd->nodeId == (data[1] & 0x1f) && nd->length == data[2] ) {
+            printf("Found %d\n", nd->type);
+            return nd->type;
+        }
+    }
+    printf("NOT found\n");
+    // Add to node list
+    addNodeToList(data[0], data[1] & 0x1f, data[2], PAYLOAD_TYPE_UNKNOWN );
+    // Save NodeList
+    writeNodeList();
+    return PAYLOAD_TYPE_UNKNOWN;
+}
+
+
+bool IoTRouting::routePayload( uint8_t *data, short dataLen ) {
+    bool routed = false;
+    uint8_t *ptr = data;
+    PayloadSimple psimp;
+    PayloadV1 pv1;
+    PayloadV2 pv2;
+    PayloadRouting *pr = NULL;
+    char tmpBuf[42];
+
+    pachubeOutput->init();      // Just to be sure
+    printf("routePayload: ");
+    for ( int i = 0; i < dataLen; i++ )
+        printf("%02X ", *ptr++ );
+
+    printf("\n");
+
+    short payloadType = getPayloadType( data, dataLen );
+
+    printf("Group ID: %d Node ID: %d Length: %d Status: %02x, Payload type %d\n", data[0], data[1] & 0x1f, data[2], data[3], payloadType );
+
+    switch ( payloadType ) {
+        case PAYLOAD_TYPE_SIMPLE:
+            psimp = PayloadSimple( data, dataLen);
+            printf("SIMPLE NumReadings %d\n", psimp.numReadings());
+            for ( int n = 0; n<psimp.numReadings(); n++ ) {
+                printf("%d: %d - %d\n", n, psimp.sensorId(n), psimp.reading(n) );
+                // Get list of routing objects
+                pr = getRouting( psimp.nodeId(), psimp.sensorId(n) );
+                if ( pr != NULL ) {
+                    if ( pr->output != NULL ) {
+//                        printf("Pachube %d, %d, %f, %s, %s\n",pr->nodeId, pr->sensorId, pr->factor, pr->param1, pr->param2);
+                        snprintf(tmpBuf, DATABUF_SIZE, "%.3f", (float)(psimp.reading(n) * pr->factor));
+                        printf("Add to output %s, %s, %s\n", pr->param1, pr->param2, tmpBuf );
+                        pr->output->addReading( pr->param1, pr->param2,tmpBuf );
+                    }
+                }
+            }
+            if ( pr != NULL ) {
+                if ( pr->output != NULL )
+                    pr->output->send();
+            }
+            routed = true;
+
+            break;
+        case PAYLOAD_TYPE_V1:
+            pv1 = PayloadV1( data, dataLen);
+            printf("V1 NumReadings %d, Node %d\n", pv1.numReadings(), pv1.nodeId() );
+
+            pr = getRouting( pv1.nodeId(), -1 );
+            if ( pr != NULL ) {
+                // Output to destination
+//                printf("Updating low battery feed\n");
+                if ( data[3] & STATUS_LOW_BATTERY ) {
+                    printf("LOW Battery detected\n");
+                }
+                if ( pr->output != NULL ) {
+                    snprintf(tmpBuf, DATABUF_SIZE, "%d", (int)( data[3] & STATUS_LOW_BATTERY ));
+                    pr->output->addReading( pr->param1, pr->param2, tmpBuf );
+                    }
+            }
+            // Determine output
+            for ( int n = 0; n<pv1.numReadings(); n++ ) {
+                pr = getRouting(pv1.nodeId(), pv1.sensorId(n) );
+                if ( pr != NULL ) {
+                    tmpBuf[0] = '\0';
+//                    printf("Pachube %d, %d, %f, %s, %s\n",pr->nodeId, pr->sensorId, pr->factor, pr->param1, pr->param2);
+                    if ( pr->output != NULL ) {
+                        snprintf(tmpBuf, DATABUF_SIZE, "%.3f", (float)(pv1.reading(n) * pr->factor));
+                        pr->output->addReading( pr->param1, pr->param2, tmpBuf );
+                    }
+                    printf("%d: %d - %s\n", n, pv1.sensorId(n), tmpBuf );
+
+                }
+            }
+            if ( pr != NULL ) {
+                if ( pr->output != NULL )
+                    pr->output->send();
+            }
+            routed = true;
+            break;
+        case PAYLOAD_TYPE_V2:
+            pv2 = PayloadV2( data, dataLen );
+//            printf("V2 NumReadings %d\n", pv2.numReadings());
+            pr = getRouting( pv2.nodeId(), -1 );
+            if ( pr != NULL ) {
+                // Output to destination
+//                printf("Updating low battery feed\n");
+                if ( data[3] & STATUS_LOW_BATTERY ) {
+                    printf("LOW Battery detected\n");
+                }
+                if ( pr->output != NULL ) {
+                    snprintf(tmpBuf, DATABUF_SIZE, "%d", (int)( data[3] & STATUS_LOW_BATTERY ));
+                    pr->output->addReading( pr->param1, pr->param2, tmpBuf );
+                }
+                // Need to update add reading to detect change in feed ID and send.
+            }
+
+            for ( int n = 0; n < pv2.numReadings(); n++ ) {
+//                printf("Getting reading %d node %d sensor id %d\n",n, pv2.nodeId(), pv2.sensorId(n) );
+                pr = getRouting( pv2.nodeId(), pv2.sensorId(n) );
+                if ( pr != NULL ) {
+//                        printf("Item: pr %ld, out type %d\n", (int)pr, pr->outType);
+
+//                        printf("readingType = %d\n",pv2.readingType( n ));
+//                        printf("IoTRouting: output = %ld\n",(int)pr->output);
+
+                    if ( pr->output != NULL ) {
+                         tmpBuf[0] = '\0';
+
+                        switch ( pv2.readingType( n ) ) {
+                            case V2_DATATYPE_BYTE:
+                                snprintf(tmpBuf, DATABUF_SIZE, "%.3f", (float)(pv2.readingByte(n) * pr->factor));
+                                break;
+                            case V2_DATATYPE_SHORT:
+                                snprintf(tmpBuf, DATABUF_SIZE, "%.3f", (float)(pv2.readingShort(n) * pr->factor));
+                                break;
+                            case V2_DATATYPE_LONG:
+                                snprintf(tmpBuf, DATABUF_SIZE, "%.3f", (float)(pv2.readingLong(n) * pr->factor));
+                                break;
+                            case V2_DATATYPE_STRING:
+                                break;
+                            default:
+                                break;
+                        }
+                        pr->output->addReading( pr->param1, pr->param2, tmpBuf );
+                    }
+                }
+            }
+            if ( pr != NULL ) {
+                if ( pr->output != NULL )
+                    pr->output->send();
+            }
+
+            routed = true;
+            break;
+        case PAYLOAD_TYPE_UNKNOWN:      // Unknown node found
+            printf("Unknown payload type found\n");
+            break;
+        default:
+            printf("Unknown\n");
+            break;
+    }
+
+    pachubeOutput->send();     // Just in case anything left to send
+    printf("Finished routing\n");
+    return routed;
+}
+