Official reference client implementation for Cumulocity SmartREST on u-blox C027.

Dependencies:   C027_Support C12832 LM75B MMA7660 MbedSmartRest mbed-rtos mbed

Fork of MbedSmartRestMain by Vincent Wochnik

operation/OperationSupport.cpp

Committer:
xinlei
Date:
2015-03-03
Revision:
77:f6717e4eccc4
Parent:
70:f489ca11f254
Child:
81:4a7761914901

File content as of revision 77:f6717e4eccc4:

#include <string.h>
#include "OperationSupport.h"
#include "Aggregator.h"
#include "ComposedRecord.h"
#include "CharValue.h"
#include "IntegerValue.h"

// Shared values holding the textual operation states sent to the server.
// OperationSupport::operationStateValue() returns references to these
// objects, so they must outlive every record they are added to.
CharValue aOperationStatePending("PENDING");
CharValue aOperationStateExecuting("EXECUTING");
CharValue aOperationStateSuccessful("SUCCESSFUL");
CharValue aOperationStateFailed("FAILED");

/**
 * Constructs the operation support component.
 *
 * Keeps references to the client, the template collection and the device ID,
 * builds the operation executor from the given collaborators and constructs
 * the three thread members, each one bound to its static threadN_func()
 * entry point with this object as the argument.
 *
 * @param client                       client used for all requests
 * @param tpl                          template collection extended by init()
 * @param deviceId                     reference to the device ID
 * @param configurationSynchronization forwarded to the executor
 * @param io                           forwarded to the executor
 */
OperationSupport::OperationSupport(AbstractSmartRest& client, SmartRestTemplate& tpl, long& deviceId, ConfigurationSynchronization& configurationSynchronization, DeviceIO& io) :
    _client(client),
    _tpl(tpl),
    _deviceId(deviceId),
    _executor(client, tpl, deviceId, configurationSynchronization, io),
    _thread1(OperationSupport::thread1_func, this),
    _thread2(OperationSupport::thread2_func, this),
    _thread3(OperationSupport::thread3_func, this)
{
    // NOTE(review): these flags are assigned only after the thread members
    // above have been constructed. If a thread starts running as soon as it
    // is constructed (presumably it does -- confirm), thread1()/thread2()/
    // thread3() may read _init and _firstRun before they are set here.
    _init = false;
    _firstRun = true;
}

/**
 * Adds the operation-related templates and initialises the executor.
 *
 * The steps run in order and stop at the first one that fails.
 *
 * @return true when every step succeeded; false when a step failed or when
 *         init() had already succeeded before.
 */
bool OperationSupport::init()
{
    // A repeated initialisation is rejected.
    if (_init)
        return false;

    const bool ready =
        // Request 110 -- get pending operations.
        // USAGE: 110,<DEVICE/ID>
        _tpl.add("10,110,GET,/devicecontrol/operations?status=PENDING&deviceId=%%&pageSize=100,,application/vnd.com.nsn.cumulocity.operationCollection+json,%%,UNSIGNED,\r\n") &&
        // Request 111 -- set operation state.
        // USAGE: 111,<OPERATION/ID>,<STATE>
        _tpl.add("10,111,PUT,/devicecontrol/operations/%%,application/vnd.com.nsn.cumulocity.operation+json,application/vnd.com.nsn.cumulocity.operation+json,%%,UNSIGNED STRING,\"{\"\"status\"\":\"\"%%\"\"}\"\r\n") &&
        // Response 210 -- one row per operation of a collection.
        // Response: 210,<OPERATION/ID>,<STATUS>
        _tpl.add("11,210,\"$.operations\",,\"$.id\",\"$.status\"\r\n") &&
        // Response 211 -- a single operation.
        // Response: 211,<OPERATION/ID>,<STATUS>
        _tpl.add("11,211,,\"$.deviceId\",\"$.id\",\"$.status\"\r\n") &&
        // Finally let the executor perform its own initialisation.
        _executor.init();

    if (ready)
        _init = true;
    return ready;
}

/**
 * Periodic entry point.
 *
 * On the first call the pending operations are requested; the first-run
 * flag is cleared before that request is made, so a failed request is not
 * repeated by later calls.
 *
 * @return the result of requestPendingOperations() on the first call,
 *         true on every later call.
 */
bool OperationSupport::run()
{
    if (!_firstRun)
        return true;

    _firstRun = false;
    return requestPendingOperations();
}

bool OperationSupport::requestPendingOperations()
{
    uint8_t ret;
    OperationStore::Operation op;

    ComposedRecord record;
    ParsedRecord received;
    
    IntegerValue msgId(110);
    IntegerValue deviceId(_deviceId);
    if ((!record.add(msgId)) || (!record.add(deviceId)))
        return false;

    if (_client.send(record) != SMARTREST_SUCCESS) {
        _client.stop();
        return false;
    }
    
    while ((ret = _client.receive(received)) == SMARTREST_SUCCESS) {
        if (!operationFromRecord(received, op))
            continue;
        _store.enqueue(op);
    }
    _client.stop();

    if ((ret != SMARTREST_END_OF_RESPONSE) &&
        (ret != SMARTREST_CONNECTION_CLOSED)) {
        return false;
    }   
    return true;
}

/**
 * Fills an operation from a received record.
 *
 * The record needs at least four values: an integer at index 0, an integer
 * operation ID at index 2 and a character status at index 3.
 *
 * @param received record to read from
 * @param op       receives the identifier and the state; the identifier is
 *                 written even when the status text turns out to be unknown
 * @return true when the record had the expected shape and a known status
 */
bool OperationSupport::operationFromRecord(ParsedRecord& received, OperationStore::Operation& op)
{
    if (received.values() < 4)
        return false;
    if (received.value(0).valueType() != VALUE_INTEGER)
        return false;
    // TODO: the value at index 0 is deliberately not compared against 210,
    // since operations arriving through streaming do not carry that
    // message ID.
    if (received.value(2).valueType() != VALUE_INTEGER)
        return false;
    if (received.value(3).valueType() != VALUE_CHARACTER)
        return false;

    op.identifier = received.value(2).integerValue();

    // Map the status text to the corresponding state.
    const char *const status = received.value(3).characterValue();
    if (!strcmp(status, "EXECUTING")) {
        op.state = OPERATION_EXECUTING;
        return true;
    }
    if (!strcmp(status, "SUCCESSFUL")) {
        op.state = OPERATION_SUCCESSFUL;
        return true;
    }
    if (!strcmp(status, "FAILED")) {
        op.state = OPERATION_FAILED;
        return true;
    }
    if (!strcmp(status, "PENDING")) {
        op.state = OPERATION_PENDING;
        return true;
    }

    return false;
}

/**
 * Sends request 111 with the operation's current state and waits for the
 * matching 211 response.
 *
 * @param op operation whose identifier and state are sent
 * @return true when a four-value record with message ID 211 and the same
 *         operation ID was received; false otherwise, including when the
 *         request could not be composed or sent.
 */
bool OperationSupport::updateOperation(OperationStore::Operation& op)
{
    ComposedRecord request;
    ParsedRecord response;
    IntegerValue templateId(111);
    IntegerValue opId(op.identifier);

    // Compose "111,<OPERATION/ID>,<STATE>".
    if (!request.add(templateId))
        return false;
    if (!request.add(opId))
        return false;
    if (!request.add(operationStateValue(op)))
        return false;

    if (_client.send(request) != SMARTREST_SUCCESS) {
        _client.stop();
        return false;
    }

    // Read records until the confirmation shows up or receiving stops.
    bool confirmed = false;
    for (;;) {
        const uint8_t status = _client.receive(response);
        if (status != SMARTREST_SUCCESS)
            break;

        confirmed = (response.values() == 4) &&
                    (response.value(0).valueType() == VALUE_INTEGER) &&
                    (response.value(0).integerValue() == 211) &&
                    (response.value(2).valueType() == VALUE_INTEGER) &&
                    (response.value(2).integerValue() == op.identifier);
        if (confirmed)
            break;
    }
    _client.stop();

    return confirmed;
}

/**
 * Returns the shared value object holding the text for the operation's
 * state. Any state other than executing, successful or failed yields the
 * "PENDING" value.
 */
CharValue& OperationSupport::operationStateValue(OperationStore::Operation& op)
{
    if (op.state == OPERATION_EXECUTING)
        return aOperationStateExecuting;
    if (op.state == OPERATION_SUCCESSFUL)
        return aOperationStateSuccessful;
    if (op.state == OPERATION_FAILED)
        return aOperationStateFailed;

    return aOperationStatePending;
}

void OperationSupport::thread1()
{
    OperationStore::Operation op;
    bool ret;

    while (!_init)
        Thread::yield();
    
    while (true) {
        if (!_store.takePending(op)) {
            Thread::yield();
            continue;
        }
        
        updateOperation(op);
        ret = _executor.executeOperation(op);
        _store.markAsDone(op, ret);
    }
}

void OperationSupport::thread2()
{
    OperationStore::Operation op;
    Aggregator aggr(true);

    while (!_init)
        Thread::yield();
    
    while (true) {
        while ((!aggr.full()) && (_store.takeDone(op))) {
            ComposedRecord record;
    
            IntegerValue msgId(111);
            IntegerValue operationId(op.identifier);
            if ((!record.add(msgId)) || (!record.add(operationId)) ||
                (!record.add(operationStateValue(op))))
                break;
            
            if (!aggr.add(record))
                break;
        }
        
        if (aggr.length() == 0) {
            Thread::yield();
            continue;
        }
        
        if (_client.send(aggr) != SMARTREST_SUCCESS) { }
        _client.stop();
        aggr.clear();        
    }
}

void OperationSupport::thread3()
{
    char bayeuxId[33];
    ComposedRecord record;
    ParsedRecord received;
    OperationStore::Operation op;
    
    while ((!_init) || (_firstRun))
        Thread::yield();
    
    // request Bayeux ID
    {
        const char *str;
        IntegerValue msgId(80);
        if (!record.add(msgId))
            return;
        
        if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) ||
            (_client.receive(received) != SMARTREST_SUCCESS)) {
            _client.stop();
            return;
        }
        _client.stop();

        str = received.value(0).characterValue();
        if ((str == NULL) || (strlen(str) > sizeof(bayeuxId)))
            return;

        strcpy(bayeuxId, str);
        record.clear();
    }
    
    // set channel
    {
        char chn[16];
        int len = 0;
        snprintf(chn, sizeof(chn), "/%ld%n", _deviceId, &len);
        if ((len == 0) || (len == sizeof(chn)))
            return;
        
        IntegerValue msgId(81);
        CharValue bid(bayeuxId);
        CharValue channel(chn);
        if ((!record.add(msgId)) || (!record.add(bid)) || (!record.add(channel)))
            return;
        
        if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) ||
            (_client.receive(received) != SMARTREST_END_OF_RESPONSE)) {
            _client.stop();
            return;
        }

        _client.stop();
        record.clear();
    }
    
    {
        IntegerValue msgId(83);
        CharValue bid(bayeuxId);
        if ((!record.add(msgId)) || (!record.add(bid)))
            return;
    }

    while (true) {
        if (_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) {
            _client.stop();
            continue;
        }

        while (_client.receive(received) == SMARTREST_SUCCESS) {
            if (!operationFromRecord(received, op))
                continue;
            _store.enqueue(op);
        }
        
        //TODO: error checking
        _client.stop();
    }
}

void OperationSupport::thread1_func(void const *arg)
{
    OperationSupport *that;
    that = (OperationSupport*)arg;
    that->thread1();
}

void OperationSupport::thread2_func(void const *arg)
{
    OperationSupport *that;
    that = (OperationSupport*)arg;
    that->thread2();
}

void OperationSupport::thread3_func(void const *arg)
{
    OperationSupport *that;
    that = (OperationSupport*)arg;
    that->thread3();
}