Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: C027_Support C12832 LM75B MMA7660 MbedSmartRest mbed-rtos mbed
Fork of MbedSmartRestMain by
operation/OperationSupport.cpp
- Committer:
- vwochnik
- Date:
- 2014-11-30
- Revision:
- 67:c360a2b2c948
- Parent:
- 65:a62dbef2f924
- Child:
- 68:0dc778a16d0d
File content as of revision 67:c360a2b2c948:
#include "OperationSupport.h"
#include <string.h>
#include "Aggregator.h"
#include "ComposedRecord.h"
#include "CharValue.h"
#include "IntegerValue.h"
CharValue aOperationStatePending("PENDING");
CharValue aOperationStateExecuting("EXECUTING");
CharValue aOperationStateSuccessful("SUCCESSFUL");
CharValue aOperationStateFailed("FAILED");
OperationSupport::OperationSupport(AbstractSmartRest& client, SmartRestTemplate& tpl, long& deviceId, DeviceIO& io) :
_client(client),
_tpl(tpl),
_deviceId(deviceId),
_executor(client, tpl, deviceId, io),
_thread1(OperationSupport::thread1_func, this),
_thread2(OperationSupport::thread2_func, this),
_thread3(OperationSupport::thread3_func, this)
{
_init = false;
_firstRun = true;
}
bool OperationSupport::init()
{
if (_init)
return false;
// Get pending operations
// USAGE: 110,<DEVICE/ID>
if (!_tpl.add("10,110,GET,/devicecontrol/operations?status=PENDING&deviceId=%%&pageSize=100,,application/vnd.com.nsn.cumulocity.operationCollection+json,%%,UNSIGNED,\r\n"))
return false;
// Set operation state
// USAGE: 110,<OPERATION/ID>,<STATE>
if (!_tpl.add("10,111,PUT,/devicecontrol/operations/%%,application/vnd.com.nsn.cumulocity.operation+json,application/vnd.com.nsn.cumulocity.operation+json,%%,UNSIGNED STRING,\"{\"\"status\"\":\"\"%%\"\"}\"\r\n"))
return false;
// Get operations
// Response: 210,<OPERATION/ID>,<STATUS>
if (!_tpl.add("11,210,\"$.operations\",,\"$.id\",\"$.status\"\r\n"))
return false;
// Get operation
// Response: 211,<OPERATION/ID>,<STATUS>
if (!_tpl.add("11,211,,\"$.deviceId\",\"$.id\",\"$.status\"\r\n"))
return false;
if (!_executor.init())
return false;
_init = true;
return true;
}
bool OperationSupport::run()
{
if (_firstRun) {
_firstRun = false;
return true;
if (!requestPendingOperations())
return false;
}
return true;
}
bool OperationSupport::requestPendingOperations()
{
uint8_t ret;
OperationStore::Operation op;
ComposedRecord record;
ParsedRecord received;
IntegerValue msgId(110);
IntegerValue deviceId(_deviceId);
if ((!record.add(msgId)) || (!record.add(deviceId)))
return false;
if (_client.send(record) != SMARTREST_SUCCESS) {
_client.stop();
return false;
}
while ((ret = _client.receive(received)) == SMARTREST_SUCCESS) {
if (!operationFromRecord(received, op))
continue;
if (!_store.enqueue(op))
continue;
}
_client.stop();
if ((ret != SMARTREST_END_OF_RESPONSE) &&
(ret != SMARTREST_CONNECTION_CLOSED)) {
return false;
}
return true;
}
bool OperationSupport::operationFromRecord(ParsedRecord& received, OperationStore::Operation& op)
{
const char *tmp;
if ((received.values() < 4) ||
(received.value(0).valueType() != VALUE_INTEGER) ||
(received.value(0).integerValue() != 210) ||
(received.value(2).valueType() != VALUE_INTEGER) ||
(received.value(3).valueType() != VALUE_CHARACTER))
return false;
op.identifier = received.value(2).integerValue();
tmp = received.value(3).characterValue();
if (strcmp(tmp, "EXECUTING") == 0)
op.state = OPERATION_EXECUTING;
else if (strcmp(tmp, "SUCCESSFUL") == 0)
op.state = OPERATION_SUCCESSFUL;
else if (strcmp(tmp, "FAILED") == 0)
op.state = OPERATION_FAILED;
else
op.state = OPERATION_PENDING;
return true;
}
bool OperationSupport::updateOperation(OperationStore::Operation& op)
{
uint8_t ret; bool found;
ComposedRecord record;
ParsedRecord received;
IntegerValue msgId(111);
IntegerValue operationId(op.identifier);
if ((!record.add(msgId)) || (!record.add(operationId)) ||
(!record.add(operationStateValue(op))))
return false;
if (_client.send(record) != SMARTREST_SUCCESS) {
_client.stop();
return false;
}
found = false;
while ((ret = _client.receive(received)) == SMARTREST_SUCCESS) {
if ((received.values() == 4) &&
(received.value(0).valueType() == VALUE_INTEGER) &&
(received.value(0).integerValue() == 211) &&
(received.value(2).valueType() == VALUE_INTEGER) &&
(received.value(2).integerValue() == op.identifier)) {
found = true;
break;
}
}
_client.stop();
return found;
}
CharValue& OperationSupport::operationStateValue(OperationStore::Operation& op)
{
switch (op.state) {
case OPERATION_EXECUTING:
return aOperationStateExecuting;
case OPERATION_SUCCESSFUL:
return aOperationStateSuccessful;
case OPERATION_FAILED:
return aOperationStateFailed;
default:
return aOperationStatePending;
}
}
void OperationSupport::thread1()
{
OperationStore::Operation op;
bool ret;
while (!_init)
Thread::yield();
while (true) {
if (!_store.takePending(op)) {
Thread::yield();
continue;
}
updateOperation(op);
ret = _executor.executeOperation(op);
_store.markAsDone(op, ret);
}
}
void OperationSupport::thread2()
{
OperationStore::Operation op;
Aggregator aggr(true);
while (!_init)
Thread::yield();
while (true) {
while ((!aggr.full()) && (_store.takeDone(op))) {
ComposedRecord record;
IntegerValue msgId(111);
IntegerValue operationId(op.identifier);
if ((!record.add(msgId)) || (!record.add(operationId)) ||
(!record.add(operationStateValue(op))))
break;
if (!aggr.add(record))
break;
}
if (aggr.length() == 0) {
Thread::yield();
continue;
}
if (_client.send(aggr) != SMARTREST_SUCCESS) { }
_client.stop();
aggr.clear();
}
}
void OperationSupport::thread3()
{
char bayeuxId[33];
ComposedRecord record;
ParsedRecord received;
while ((!_init) || (_firstRun))
Thread::yield();
// request Bayeux ID
{
IntegerValue msgId(80);
if (!record.add(msgId))
return;
if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS)) {
puts("Sending stream failed.");
_client.stop();
return;
}
if ((_client.receive(received) != SMARTREST_SUCCESS)) {
puts("Receiving stream failed.");
_client.stop();
}
_client.stop();
puts(received.value(0).characterValue());
}
while (true) {
}
}
void OperationSupport::thread1_func(void const *arg)
{
OperationSupport *that;
that = (OperationSupport*)arg;
that->thread1();
}
void OperationSupport::thread2_func(void const *arg)
{
OperationSupport *that;
that = (OperationSupport*)arg;
that->thread2();
}
void OperationSupport::thread3_func(void const *arg)
{
OperationSupport *that;
that = (OperationSupport*)arg;
that->thread3();
}

Cumulocity