Official reference client implementation for Cumulocity SmartREST on u-blox C027.

Dependencies:   C027_Support C12832 LM75B MMA7660 MbedSmartRest mbed-rtos mbed

Fork of MbedSmartRestMain by Vincent Wochnik

operation/OperationSupport.cpp

Committer:
xinlei
Date:
2015-04-13
Revision:
93:0acd11870c6a
Parent:
91:423177e8a401
Child:
94:61d44636f020

File content as of revision 93:0acd11870c6a:

#include <string.h>
#include "OperationSupport.h"
//#include "Aggregator.h"
#include "SmartRestConf.h"
#include "logging.h"

//CharValue aOperationStatePending("PENDING");
//CharValue aOperationStateExecuting("EXECUTING");
//CharValue aOperationStateSuccessful("SUCCESSFUL");
//CharValue aOperationStateFailed("FAILED");

// Textual names of the operation states.
// operationFromRecord() compares a received status string against these, and
// getOperationStateChar() maps an OperationState value back to one of them.
const char strPending[] = "PENDING";
const char strExecuting[] = "EXECUTING";
const char strSuccessful[] = "SUCCESSFUL";
const char strFailed[] = "FAILED";

/**
 * Constructs the operation support component.
 *
 * Stores references to the SmartREST client, the template collection and the
 * device id, builds the operation executor from the same collaborators, and
 * creates the two worker thread objects with the static trampolines
 * thread2_func / thread3_func and `this` as their argument.
 *
 * @param client                        SmartREST client, kept by reference
 * @param tpl                           template collection, kept by reference
 * @param deviceId                      device id, kept by reference
 * @param configurationSynchronization  forwarded to the executor
 * @param lcdDisplay                    forwarded to the executor
 */
OperationSupport::OperationSupport(RtosSmartRest& client, SmartRestTemplate& tpl, long& deviceId, 
                                   ConfigurationSynchronization& configurationSynchronization,
                                   LCDDisplay& lcdDisplay) :
    _deviceId(deviceId),
    _tpl(tpl),
    _client(client),
    _executor(client, tpl, deviceId, configurationSynchronization, lcdDisplay),
    sock(),
    _thread2(OperationSupport::thread2_func, this),
    _thread3(OperationSupport::thread3_func, this)
{
    // NOTE(review): thread3() spins on `!_init || _firstRun`. These flags are
    // only assigned here, after the thread members have been constructed. If
    // the thread objects start running on construction, thread3 could read
    // the flags before they are set -- confirm against the Thread semantics
    // and the member declaration order in the header.
    _init = false;
    _firstRun = true;
}

/**
 * Registers the SmartREST templates needed for operation handling and then
 * initializes the operation executor.
 *
 * @return true on success; false if already initialized, if any template
 *         could not be added, or if the executor failed to initialize.
 */
bool OperationSupport::init()
{
    // Templates are registered in exactly this order.
    static const char* const templateLines[] = {
        // Get pending operations
        // USAGE: 110,<DEVICE/ID>
        "10,110,GET,/devicecontrol/operations?status=PENDING&deviceId=%%&pageSize=100,,application/vnd.com.nsn.cumulocity.operationCollection+json,%%,UNSIGNED,\r\n",

        // Set operation state
        // USAGE: 111,<OPERATION/ID>,<STATE>
        "10,111,PUT,/devicecontrol/operations/%%,application/vnd.com.nsn.cumulocity.operation+json,application/vnd.com.nsn.cumulocity.operation+json,%%,UNSIGNED STRING,\"{\"\"status\"\":\"\"%%\"\"}\"\r\n",

        // Get operations
        // Response: 210,<OPERATION/ID>,<STATUS>
        "11,210,\"$.operations\",,\"$.id\",\"$.status\"\r\n",

        // Get operation
        // Response: 211,<OPERATION/ID>,<STATUS>
        "11,211,,\"$.deviceId\",\"$.id\",\"$.status\"\r\n"
    };
    const size_t templateCount = sizeof(templateLines) / sizeof(templateLines[0]);

    // A second call is rejected.
    if (_init)
        return false;

    for (size_t idx = 0; idx < templateCount; ++idx) {
        if (!_tpl.add(templateLines[idx]))
            return false;
    }

    if (!_executor.init())
        return false;

    _init = true;
    return true;
}

/**
 * On the first invocation, clears the first-run flag and fetches the
 * operations that are still pending; every later invocation is a no-op.
 *
 * @return the result of requestPendingOperations() on the first call,
 *         true on all subsequent calls.
 */
bool OperationSupport::run()
{
    if (!_firstRun)
        return true;

    // The flag is cleared before the request is issued, as thread3() waits
    // for it to become false.
    _firstRun = false;
    const bool fetched = requestPendingOperations();
    return fetched;
}

/**
 * Requests the details of one pending operation (message 112 with the
 * operation id) and executes every operation record found in the response.
 *
 * Each record with message id 220..222, at least three values and an integer
 * third value is handed to the executor; its result is queued in opool as
 * SUCCESSFUL or FAILED for the report thread.
 *
 * @param op  operation whose identifier is sent in the request
 * @return false if the request could not be composed or sent; true otherwise
 *         (the outcome of the individual operations is reported via opool,
 *         not via the return value).
 */
bool OperationSupport::executePendingOperation(Operation& op)
{
    ComposedRecord r;
    if (!r.add(IntegerValue(112)) || !r.add(IntegerValue(op.identifier))) {
        return false;
    } else if (_client.send(r) != SMARTREST_SUCCESS) {
        _client.stop();
        return false;
    }

    ParsedRecord p;
    while (_client.receive(p) == SMARTREST_SUCCESS) {
        if (p.values() >= 3 &&
            p.value(0).integerValue() >= 220 &&
            p.value(0).integerValue() <= 222 &&
            p.value(2).valueType() == VALUE_INTEGER) {
            // alloc() may return NULL when the pool is exhausted. The
            // operation is still executed in that case; only the status
            // report is dropped.
            Operation* op1 = opool.alloc();
            if (op1 != NULL) {
                op1->identifier = p.value(2).integerValue();
            } else {
                aCritical("opool full!\n");
            }
            bool ret = _executor.executeOperation(p);
            if (op1 != NULL) {
                op1->state = ret ? OPERATION_SUCCESSFUL : OPERATION_FAILED;
                opool.put(op1);
            }
        }
    }
    _client.stop();
    return true;
}

bool OperationSupport::requestPendingOperations()
{
    IntegerValue msgId(110);
    IntegerValue deviceId(_deviceId);
    ComposedRecord record;

    if ((!record.add(msgId)) || (!record.add(deviceId)))
        return false;
    else if (_client.send(record) != SMARTREST_SUCCESS) {
        _client.stop();
        return false;
    }

    uint8_t ret;
    ParsedRecord received;
    Operation opl[10];
    size_t c = 0;
    while ((ret=_client.receive(received)) == SMARTREST_SUCCESS) {
        if (c < 10 && operationFromRecord(received, opl[c])) {
            ++c;
        } else {
            aWarning("Ignored pending operation after 10.\n");
            break;
        }
    }
    _client.stop();

    for (size_t i = 0; i < c; ++i) {
        Operation* op = opool.alloc();
        op->identifier = opl[i].identifier;
        op->state = OPERATION_EXECUTING;
        opool.put(op);
        executePendingOperation(opl[i]);
    }
    return (ret == SMARTREST_END_OF_RESPONSE || ret == SMARTREST_CONNECTION_CLOSED);
}

/**
 * Fills an Operation from a parsed response record.
 *
 * The record must carry at least four values, with value 0 and value 2 being
 * integers and value 3 a character value. Value 2 becomes the operation
 * identifier; value 3 must be one of the known state names and becomes the
 * operation state. The message id itself (value 0) is not compared against a
 * specific number.
 *
 * @param received  record to read from
 * @param op        operation to fill; its identifier is written even when the
 *                  state name turns out to be unknown
 * @return true if the record was well-formed and the state name was known.
 */
bool OperationSupport::operationFromRecord(ParsedRecord& received, Operation& op)
{
    struct StateName {
        const char *text;
        OperationState state;
    };
    static const StateName knownStates[] = {
        { strExecuting,  OPERATION_EXECUTING  },
        { strSuccessful, OPERATION_SUCCESSFUL },
        { strFailed,     OPERATION_FAILED     },
        { strPending,    OPERATION_PENDING    }
    };

    const bool wellFormed =
        received.values() >= 4 &&
        received.value(0).valueType() == VALUE_INTEGER &&
        received.value(2).valueType() == VALUE_INTEGER &&
        received.value(3).valueType() == VALUE_CHARACTER;
    if (!wellFormed)
        return false;

    op.identifier = received.value(2).integerValue();

    const char *status = received.value(3).characterValue();
    for (size_t k = 0; k < sizeof(knownStates) / sizeof(knownStates[0]); ++k) {
        if (strcmp(status, knownStates[k].text) == 0) {
            op.state = knownStates[k].state;
            return true;
        }
    }
    return false;
}

//CharValue& OperationSupport::operationStateValue(OperationState state) const
//{
//    switch (state) {
//    case OPERATION_EXECUTING:  return aOperationStateExecuting;
//    case OPERATION_SUCCESSFUL: return aOperationStateSuccessful;
//    case OPERATION_FAILED:     return aOperationStateFailed;
//    default:                   return aOperationStatePending;
//    }
//}

/**
 * Maps an operation state to its textual name.
 *
 * @param state  state to translate
 * @return pointer to the matching constant state name; strPending for
 *         OPERATION_PENDING and for any value not covered by the switch.
 */
const char * OperationSupport::getOperationStateChar(OperationState state) const
{
    switch (state) {
    case OPERATION_EXECUTING:  return strExecuting;
    case OPERATION_SUCCESSFUL: return strSuccessful;
    case OPERATION_FAILED:     return strFailed;
    case OPERATION_PENDING:    return strPending;
    }
    // Reached only for a value outside the handled enumerators. Without this
    // return, control would flow off the end of a non-void function, which is
    // undefined behaviour.
    return strPending;
}

//void OperationSupport::thread2()
//{
//    IntegerValue msgId(111);
//    Aggregator aggr(true);
//    aInfo("Report thread: %p\n", Thread::gettid());
//    while (true) {
//        while (!aggr.full()) {
//            osEvent e = opool.get(200);
//            if (e.status == osEventTimeout) {
//                break;
//            } else if (e.status == osEventMail) {
//                Operation *op = (Operation*)e.value.p;
//                ComposedRecord record;
//                IntegerValue operationId(op->identifier);
//                if (record.add(msgId) && record.add(operationId) &&
//                    record.add(operationStateValue(op->state)) && aggr.add(record)) {
//                    printf("Reporting: <%ld, %u>.\n", op->identifier, op->state);
//                    opool.free(op);
//                } else {
//                    opool.put(op);
//                }
//            } else {
//                break;
//            }
//        }
//        if (aggr.length()) {
//            _client.sendAndClose(aggr);
//            _client.stop();
//            aggr.clear();
//        }
//    }
//}

const char fmt[] = "POST /s HTTP/1.0\r\nHost: %s\r\nAuthorization: Basic %s\r\nX-Id: %s\r\nContent-Length: %d\r\n\r\n%s";
const char fmt2[] = "111,%ld,%s\r\n";
void OperationSupport::thread2()
{
    aInfo("Report thread: %p\n", Thread::gettid());
    while (true) {
        osEvent e = opool.get();
        if (e.status == osEventMail) {
            Operation *op = (Operation*)e.value.p;
            aDebug("Report: <%ld, %u>.\n", op->identifier, op->state);
            int len = snprintf(buf2, sizeof(buf2), fmt2, op->identifier, getOperationStateChar(op->state));
            opool.free(op);
            int ret = sock.connect(getHost(), getPort());
            for (uint8_t i = 1; ret < 0 && i < 3; ++i) {
                ret = sock.connect(getHost(), getPort());
            }
            if (ret >= 0) {
                int l = snprintf(buf, sizeof(buf), fmt, getHost(), getAuthStr(), getIdentifier(), len, buf2);
                ret = sock.send(buf, l);
                if (ret < 0) {
                    aError("Report: Send!\n");
                }
                sock.close();
            } else {
                aError("Report: Connect!\n");
            }
        }
    }
}

/**
 * Polling thread body: subscribes to the device's notification channel and
 * then long-polls for operations.
 *
 * Steps:
 *  1. Wait until init() has completed and run() has cleared the first-run flag.
 *  2. Send message 80 to obtain a Bayeux ID.
 *  3. Send message 81 to subscribe to channel "/<deviceId>".
 *  4. Repeatedly send message 83 and process the received records:
 *     86 = advice, 211 = operation state message, 220..222 = operation to
 *     execute. Results are queued in opool for the report thread.
 *
 * The function returns (ending the thread) if any of steps 2 or 3 fails.
 */
void OperationSupport::thread3()
{
    char bayeuxId[33];
    ComposedRecord record;
    ParsedRecord received;

    while (!_init || _firstRun)
        Thread::yield();

    // request Bayeux ID
    aInfo("Polling thread: %p\n", Thread::gettid());
    {
        const char *str;
        IntegerValue msgId(80);
        if (!record.add(msgId))
            return;
        
        if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) ||
            (_client.receive(received) != SMARTREST_SUCCESS)) {
            _client.stop();
            return;
        }
        _client.stop();

        str = received.value(0).characterValue();
        // '>=' leaves room for the terminating NUL that strcpy appends; a
        // string of exactly sizeof(bayeuxId) characters would overflow by one.
        if (str == NULL || strlen(str) >= sizeof(bayeuxId))
            return;

        strcpy(bayeuxId, str);
        record.clear();
        aInfo("Polling: bayeux ID %s\n", str);
    }

    // set channel
    {
        char chn[16];
        // The return value is the length the output would have had, so any
        // value at or beyond the buffer size signals truncation.
        const int len = snprintf(chn, sizeof(chn), "/%ld", _deviceId);
        if (len <= 0 || static_cast<size_t>(len) >= sizeof(chn)) {
            aError("Polling: Invalid id %ld.\n", _deviceId);
            return;
        }

        IntegerValue msgId(81);
        CharValue bid(bayeuxId);
        CharValue channel(chn);
        if (!record.add(msgId) || !record.add(bid) || !record.add(channel)) {
            return;
        }
        
        if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) ||
            (_client.receive(received) != SMARTREST_END_OF_RESPONSE)) {
            aError("Polling: Subscribe channel %s.\n", chn);
            _client.stop();
            return;
        }
        _client.stop();
        record.clear();
        aInfo("Polling: Subscribed channel %s!\n", chn);
    }
    
    // The same connect record (message 83 + Bayeux ID) is reused for every poll.
    if (!record.add(IntegerValue(83)) || !record.add(CharValue(bayeuxId))) {
        return;
    }

    while (true) {
        if (_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) {
            aError("Polling: Connect.\n");
            _client.stop();
            continue;
        }
        while (_client.receive(received) == SMARTREST_SUCCESS) {
            // Dump the raw record to stdout.
            for (size_t i = 0; i < received.values(); ++i) {
                printf("%s ", received.rawValue(i));
            }
            puts("");
            if (received.values() == 0) {
                continue;
            } else if (received.value(0).integerValue() == 86) { // advice responses
                if (received.values() < 5) {
                    aWarning("Incomplete advice, %u of 5 values received.\n",
                             static_cast<unsigned>(received.values()));
                    continue;
                }
                // characterValue() can yield NULL (see the Bayeux ID check
                // above), which must not be passed to strcmp.
                const char *advice = received.value(4).characterValue();
                if (advice != NULL && strcmp(advice, "retry") == 0) {
                    aDebug("Carry out 'retry' policy.\n");
                    break; // leave the receive loop and reconnect
                } else if (advice != NULL && strcmp(advice, "handshake") == 0) {
                    // 'handshake' advice is recognized but not acted upon.
                }
            } else if (received.value(0).integerValue() == 211) { // Operation State Message
                Operation* op = opool.alloc();
                if (op == NULL) {
                    aCritical("opool full!\n");
                } else if (operationFromRecord(received, *op)) {
                    op->state = OPERATION_EXECUTING;
                    opool.put(op);
                } else {
                    opool.free(op);
                }
            } else if (received.value(0).integerValue() >= 220 &&
                       received.value(0).integerValue() <= 222) { // Real Operation
                if (received.values() >= 3 &&
                    received.value(2).valueType() == VALUE_INTEGER) {
                    // alloc() may return NULL when the pool is exhausted. The
                    // operation is still executed; only the report is dropped.
                    Operation* op = opool.alloc();
                    if (op != NULL) {
                        op->identifier = received.value(2).integerValue();
                    } else {
                        aCritical("opool full!\n");
                    }
                    bool ret = _executor.executeOperation(received);
                    if (op != NULL) {
                        op->state = ret ? OPERATION_SUCCESSFUL : OPERATION_FAILED;
                        opool.put(op);
                    }
                } else {
                    aWarning("Incomplete operation, %u of 4 values received.\n",
                             static_cast<unsigned>(received.values()));
                }
            } else {
                aError("Unknown operation ID: %d\n",
                       static_cast<int>(received.value(0).integerValue()));
            }
        }
        _client.stop();
    }
}

void OperationSupport::thread2_func(void const *arg)
{
    OperationSupport *that;
    that = (OperationSupport*)arg;
    that->thread2();
}

void OperationSupport::thread3_func(void const *arg)
{
    OperationSupport *that;
    that = (OperationSupport*)arg;
    that->thread3();
}