mbed based IoT Gateway More details http://blog.thiseldo.co.uk/wp-filez/IoTGateway.pdf

Dependencies:   NetServices FatFileSystem csv_parser mbed MQTTClient RF12B DNSResolver SDFileSystem

Routing/IoTRouting.cpp

Committer:
SomeRandomBloke
Date:
2012-05-09
Revision:
5:0dbc27a7af55
Parent:
4:d460406ac780

File content as of revision 5:0dbc27a7af55:

/** IoT Gateway Routing for incoming messages
 *
 * @author Andrew Lindsay
 *
 * @section LICENSE
 *
 * Copyright (c) 2012 Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk)
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:

 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 * 
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 * 
 * @section DESCRIPTION
 * 
 * 
 */

#include "mbed.h"
#include "RF12B.h"
#include "IoTRouting.h"
#include "csv_parser.h"
#include "PayloadV1.h"
#include "PayloadV2.h"
#include "PayloadSimple.h"
#include "OutputPachube.h"

//DigitalOut activityLED(p29, "activityLED");

#define iotRoutingFile "/iotfs/IOTRTG.TXT"
#define iotNodeDefFile "/iotfs/IOTNODE.TXT"


// Default Constructor
IoTRouting::IoTRouting() {}


void IoTRouting::addNodeToList(uint8_t groupId, uint8_t nodeId, uint8_t length, uint8_t type ) {
    IoTNodeDef *pr = new IoTNodeDef();
    pr->groupId = groupId;
    pr->nodeId = nodeId;
    pr->length = length;
    pr->type = type;

    _nodeDefList.push_back(pr);
}

void IoTRouting::addRoutingToList( short nodeId, short sensorId, float factor, byte outType,
                                   char *param1, char *param2, char *param3, char *param4 ) {

    PayloadRouting* pr = new PayloadRouting();
    pr->nodeId = nodeId;
    pr->sensorId = sensorId;

    pr->factor = factor;
    pr->outType = outType;

    switch ( pr->outType ) {
        case  OUTPUT_TYPE_PACHUBE:
            pr->output = pachubeOutput;
            break;
        case  OUTPUT_TYPE_MQTT:
            pr->output = mqttOutput;
            break;
        case  OUTPUT_TYPE_EMONCMS:
            pr->output = emonCmsOutput;
            break;
        case  OUTPUT_TYPE_SENSE:
            pr->output = senSeOutput;
            break;
        default:
            pr->output = NULL;
            break;
    }

    pr->param1 = new char[strlen(param1)+1];
    strcpy(pr->param1, param1);
    pr->param2 = new char[strlen(param2)+1];
    strcpy(pr->param2, param2);
    pr->param3 = new char[strlen(param3)+1];
    strcpy(pr->param3, param3);
    pr->param4 = new char[strlen(param4)+1];
    strcpy(pr->param4, param4);
//    printf("%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4);
    _routing.push_back(pr);
}


void IoTRouting::initRouting() {
    sendCount = 0;

    FILE *fp = fopen(iotNodeDefFile, "r");
    if (fp == NULL) {
        printf("Could not open file %s for read\n", iotNodeDefFile);
        return;
    }

    // read file

    const char field_terminator = ',';
    const char line_terminator  = '\n';
    const char enclosure_char   = '"';

    csv_parser file_parser;

    /* Define how many records we're gonna skip. This could be used to skip the column definitions. */
    file_parser.set_skip_lines(0);

    /* Specify the file to parse */
    file_parser.init(fp);

    /* Here we tell the parser how to parse the file */
    file_parser.set_enclosed_char(enclosure_char, ENCLOSURE_OPTIONAL);
    file_parser.set_field_term_char(field_terminator);
    file_parser.set_line_term_char(line_terminator);

    /* Check to see if there are more records, then grab each row one at a time */

    while (file_parser.has_more_rows()) {
        csv_row row = file_parser.get_row();
        if (row.size()<4) {
            printf("row too short %d\n", row.size());
            continue;
        }
        addNodeToList((uint8_t)atoi(row[0].c_str()), (uint8_t)atoi(row[1].c_str()),
                      (uint8_t)atoi(row[2].c_str()), (uint8_t)atoi(row[3].c_str()));
    }

    fclose(fp);

//    printf("Finished loading routing data\n");

    fp = fopen(iotRoutingFile, "r");
    if (fp == NULL) {
        printf("Could not open file %s for read\n", iotRoutingFile);
        return;
    }
//    _routing=new list<PayloadRouting*>();

    /* Define how many records we're gonna skip. This could be used to skip the column definitions. */
    file_parser.set_skip_lines(0);

    /* Specify the file to parse */
    file_parser.init(fp);

    /* Here we tell the parser how to parse the file */
    file_parser.set_enclosed_char(enclosure_char, ENCLOSURE_OPTIONAL);
    file_parser.set_field_term_char(field_terminator);
    file_parser.set_line_term_char(line_terminator);

    /* Check to see if there are more records, then grab each row one at a time */

    while (file_parser.has_more_rows()) {
        csv_row row = file_parser.get_row();
        if (row.size()<6) {
            printf("row too short %d\n", row.size());
            continue;
        }
        addRoutingToList( atoi(row[0].c_str()), atoi(row[1].c_str()), atof(row[2].c_str()), atoi(row[3].c_str()),
                          (char*)row[4].c_str(), (char*)row[5].c_str(), (char*)row[6].c_str(), (char*)row[7].c_str() );
    }

    fclose(fp);

//    printf("Finished loading routing data\n");

//    if(!mqtt.connect("IoTGateway")){
//        printf("\r\nConnect to server failed ..\r\n");
    //  return -1;
//    }
}

// Write the current nodelist back to file
// First rename original file to iotsetup.bak
bool IoTRouting::writeNodeList() {

    // Rename original file
//    if( rename ( iotConfigFile.c_str(), iotConfigFileBak.c_str() ) != 0 ) {
//        printf("Could not rename file %s\n", iotConfigFile);
//        return false;
//    }

    FILE *fp = fopen(iotNodeDefFile, "w");
    if (fp == NULL) {
        printf("Could not open file %s for write\n", iotNodeDefFile);
        return false;
    }

    for (short i=0; i<(int)_nodeDefList.size(); i++) {
        IoTNodeDef *nd = (IoTNodeDef*)_nodeDefList.at(i);
//        printf("%d,%d,%d,%d\n",nd->groupId,nd->nodeId, nd->length,nd->type );
        fprintf(fp, "%d,%d,%d,%d\n",nd->groupId,nd->nodeId, nd->length,nd->type );
    }

    fclose(fp);

    return true;
}

// Write the current nodelist back to file
// First rename original file to iotsetup.bak
bool IoTRouting::writeRoutingList() {

    // Rename original file
//    if( rename ( iotConfigFile.c_str(), iotConfigFileBak.c_str() ) != 0 ) {
//        printf("Could not rename file %s\n", iotRoutingFile);
//        return false;
//    }

    FILE *fp = fopen(iotRoutingFile, "w");
    if (fp == NULL) {
        printf("Could not open file %s for write\n", iotRoutingFile);
        return false;
    }

    for ( short i=0; i<(short)_routing.size(); i++ ) {
        PayloadRouting *pr = (PayloadRouting*)_routing.at(i);
//        printf("%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4);
        fprintf(fp, "%d,%d,%f,%d,%s,%s,%s,%s\n",pr->nodeId, pr->sensorId, pr->factor, pr->outType, pr->param1, pr->param2, pr->param3, pr->param4);
    }

    fclose(fp);

    return true;
}

void IoTRouting::addOutput( OutputDef *output, short outType ) {
    switch ( outType ) {
        case  OUTPUT_TYPE_PACHUBE:
            pachubeOutput = output;
            break;
        case  OUTPUT_TYPE_MQTT:
            mqttOutput = output;
            break;
        case  OUTPUT_TYPE_EMONCMS:
            emonCmsOutput = output;
            break;
        case  OUTPUT_TYPE_SENSE:
            senSeOutput = output;
            break;
        default:
            break;
    }
}

PayloadRouting* IoTRouting::getRouting( short nodeId, short sensorId ) {
//    printf("Getting routing info for node %d, sensor %d - ", nodeId, sensorId);
    for ( short i=0; i<(short)_routing.size(); i++ ) {

        PayloadRouting *pr = (PayloadRouting*)_routing.at(i);
//        printf("Node %d, sensor %d\n", pr->nodeId, pr->sensorId);
        if ( pr->nodeId == nodeId &&  pr->sensorId == sensorId ) {
//            printf("Found!\n");
            return pr;
        }
    }
//    printf("NOT found\n");
    // Add to routing list
    addRoutingToList( nodeId, sensorId, 1.0, OUTPUT_UNKNOWN,"","","","");
    // Save Routing list
    writeRoutingList();

    return NULL;
}


short IoTRouting::getPayloadType(  uint8_t *data, short dataLen ) {
//    printf("Getting payload type, size is %d -  ",(int)_nodeDefList.size() );
    for (short i=0; i<(int)_nodeDefList.size(); i++) {
//    printf("%d, %ld, ",i, (int)_nodeDefList.at(i));
        IoTNodeDef *nd = (IoTNodeDef*)_nodeDefList.at(i);
        if ( nd->groupId == data[0] && nd->nodeId == (data[1] & 0x1f) && nd->length == data[2] ) {
//            printf("Found %d\n", nd->type);
            return nd->type;
        }
    }
//    printf("NOT found\n");
    // Add to node list
    addNodeToList(data[0], data[1] & 0x1f, data[2], PAYLOAD_TYPE_UNKNOWN );
    // Save NodeList
    writeNodeList();
    return PAYLOAD_TYPE_UNKNOWN;
}

#define TMPBUF_SIZE 20

bool IoTRouting::routePayload( uint8_t *data, short dataLen ) {
    bool routed = false;
    uint8_t *ptr = data;
    PayloadSimple psimp;
    PayloadV1 pv1;
    PayloadV2 pv2;
    PayloadRouting *pr = NULL;
    char tmpBuf[TMPBUF_SIZE];

    pachubeOutput->init();      // Just to be sure
    printf("routePayload: ");
    for ( int i = 0; i < dataLen; i++ )
        printf("%02X ", *ptr++ );

    printf("\n");

    short payloadType = getPayloadType( data, dataLen );

    printf("Group ID: %d Node ID: %d Length: %d Status: %02x, Payload type %d\n", data[0], data[1] & 0x1f, data[2], data[3], payloadType );

    switch ( payloadType ) {
        case PAYLOAD_TYPE_SIMPLE:
            psimp = PayloadSimple( data, dataLen);
            printf("SIMPLE NumReadings %d\n", psimp.numReadings());
            for ( int n = 0; n<psimp.numReadings(); n++ ) {
                printf("%d: %d - %d\n", n, psimp.sensorId(n), psimp.reading(n) );
                // Get list of routing objects
                pr = getRouting( psimp.nodeId(), psimp.sensorId(n) );
                if ( pr != NULL ) {
                    if ( pr->output != NULL ) {
//                        printf("Pachube %d, %d, %f, %s, %s\n",pr->nodeId, pr->sensorId, pr->factor, pr->param1, pr->param2);
                        snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(psimp.reading(n) * pr->factor));
//                        printf("Add to output %s, %s, %s\n", pr->param1, pr->param2, tmpBuf );
                        pr->output->addReading( pr->param1, pr->param2,tmpBuf );
                    }
                }
            }
            if ( pr != NULL ) {
                if ( pr->output != NULL )
                    pr->output->send();
            }
            routed = true;

            break;
        case PAYLOAD_TYPE_V1:
            pv1 = PayloadV1( data, dataLen);
            printf("V1 NumReadings %d, Node %d\n", pv1.numReadings(), pv1.nodeId() );

            pr = getRouting( pv1.nodeId(), -1 );
            if ( pr != NULL ) {
                // Output to destination
//                printf("Updating low battery feed\n");
                if ( data[3] & STATUS_LOW_BATTERY ) {
                    printf("LOW Battery detected\n");
                }
                if ( pr->output != NULL ) {
                    snprintf(tmpBuf, TMPBUF_SIZE, "%d", (int)( data[3] & STATUS_LOW_BATTERY ));
                    pr->output->addReading( pr->param1, pr->param2, tmpBuf );
                    }
            }
            // Determine output
            for ( int n = 0; n<pv1.numReadings(); n++ ) {
                pr = getRouting(pv1.nodeId(), pv1.sensorId(n) );
                if ( pr != NULL ) {
                    tmpBuf[0] = '\0';
//                    printf("Pachube %d, %d, %f, %s, %s\n",pr->nodeId, pr->sensorId, pr->factor, pr->param1, pr->param2);
                    if ( pr->output != NULL ) {
                        snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv1.reading(n) * pr->factor));
                        pr->output->addReading( pr->param1, pr->param2, tmpBuf );
                    }
                    printf("%d: %d - %s\n", n, pv1.sensorId(n), tmpBuf );

                }
            }
            if ( pr != NULL ) {
                if ( pr->output != NULL )
                    pr->output->send();
            }
            routed = true;
            break;
        case PAYLOAD_TYPE_V2:
            pv2 = PayloadV2( data, dataLen );
//            printf("V2 NumReadings %d\n", pv2.numReadings());
            pr = getRouting( pv2.nodeId(), -1 );
            if ( pr != NULL ) {
                // Output to destination
//                printf("Updating low battery feed\n");
                if ( data[3] & STATUS_LOW_BATTERY ) {
                    printf("LOW Battery detected\n");
                }
                if ( pr->output != NULL ) {
                    snprintf(tmpBuf, TMPBUF_SIZE, "%d", (int)( data[3] & STATUS_LOW_BATTERY ));
                    pr->output->addReading( pr->param1, pr->param2, tmpBuf );
                }
                // Need to update add reading to detect change in feed ID and send.
            }

            for ( int n = 0; n < pv2.numReadings(); n++ ) {
//                printf("Getting reading %d node %d sensor id %d\n",n, pv2.nodeId(), pv2.sensorId(n) );
                pr = getRouting( pv2.nodeId(), pv2.sensorId(n) );
                if ( pr != NULL ) {
//                        printf("Item: pr %ld, out type %d\n", (int)pr, pr->outType);

//                        printf("readingType = %d\n",pv2.readingType( n ));
//                        printf("IoTRouting: output = %ld\n",(int)pr->output);

                    if ( pr->output != NULL ) {
                         tmpBuf[0] = '\0';

                        switch ( pv2.readingType( n ) ) {
                            case V2_DATATYPE_BYTE:
                                snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv2.readingByte(n) * pr->factor));
                                break;
                            case V2_DATATYPE_SHORT:
                                snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv2.readingShort(n) * pr->factor));
                                break;
                            case V2_DATATYPE_LONG:
                                snprintf(tmpBuf, TMPBUF_SIZE, "%.3f", (float)(pv2.readingLong(n) * pr->factor));
                                break;
                            case V2_DATATYPE_STRING:
                                break;
                            default:
                                break;
                        }
                        pr->output->addReading( pr->param1, pr->param2, tmpBuf );
                    }
                }
            }
            if ( pr != NULL ) {
                if ( pr->output != NULL )
                    pr->output->send();
            }

            routed = true;
            break;
        case PAYLOAD_TYPE_UNKNOWN:      // Unknown node found
            printf("Unknown payload type found\n");
            break;
        default:
            printf("Unknown\n");
            break;
    }

    pachubeOutput->send();     // Just in case anything left to send
    printf("Finished routing\n");
    return routed;
}