Official reference client implementation for Cumulocity SmartREST on u-blox C027.
Dependencies: C027_Support C12832 LM75B MMA7660 MbedSmartRest mbed-rtos mbed
Fork of MbedSmartRestMain by
operation/OperationSupport.cpp
- Committer:
- xinlei
- Date:
- 2015-03-20
- Revision:
- 91:423177e8a401
- Parent:
- 82:ca7430f50b2b
- Child:
- 93:0acd11870c6a
File content as of revision 91:423177e8a401:
#include <string.h> #include "OperationSupport.h" #include "Aggregator.h" #include "ComposedRecord.h" #include "CharValue.h" #include "IntegerValue.h" CharValue aOperationStatePending("PENDING"); CharValue aOperationStateExecuting("EXECUTING"); CharValue aOperationStateSuccessful("SUCCESSFUL"); CharValue aOperationStateFailed("FAILED"); OperationSupport::OperationSupport(AbstractSmartRest& client, SmartRestTemplate& tpl, long& deviceId, ConfigurationSynchronization& configurationSynchronization, DeviceIO& io, DisplayInfo& displayInfo) : _deviceId(deviceId), _tpl(tpl), _client(client), _executor(client, tpl, deviceId, configurationSynchronization, io, displayInfo), _thread1(OperationSupport::thread1_func, this), _thread2(OperationSupport::thread2_func, this), _thread3(OperationSupport::thread3_func, this) { _init = false; _firstRun = true; } bool OperationSupport::init() { if (_init) return false; // Get pending operations // USAGE: 110,<DEVICE/ID> if (!_tpl.add("10,110,GET,/devicecontrol/operations?status=PENDING&deviceId=%%&pageSize=100,,application/vnd.com.nsn.cumulocity.operationCollection+json,%%,UNSIGNED,\r\n")) return false; // Set operation state // USAGE: 110,<OPERATION/ID>,<STATE> if (!_tpl.add("10,111,PUT,/devicecontrol/operations/%%,application/vnd.com.nsn.cumulocity.operation+json,application/vnd.com.nsn.cumulocity.operation+json,%%,UNSIGNED STRING,\"{\"\"status\"\":\"\"%%\"\"}\"\r\n")) return false; // Get operations // Response: 210,<OPERATION/ID>,<STATUS> if (!_tpl.add("11,210,\"$.operations\",,\"$.id\",\"$.status\"\r\n")) return false; // Get operation // Response: 211,<OPERATION/ID>,<STATUS> if (!_tpl.add("11,211,,\"$.deviceId\",\"$.id\",\"$.status\"\r\n")) return false; if (!_executor.init()) return false; _init = true; return true; } bool OperationSupport::run() { if (_firstRun) { _firstRun = false; if (!requestPendingOperations()) return false; } return true; } bool OperationSupport::requestPendingOperations() { uint8_t ret; OperationStore::Operation op; ComposedRecord record; ParsedRecord received; IntegerValue msgId(110); IntegerValue deviceId(_deviceId); if ((!record.add(msgId)) || (!record.add(deviceId))) return false; if (_client.send(record) != SMARTREST_SUCCESS) { _client.stop(); return false; } while ((ret = _client.receive(received)) == SMARTREST_SUCCESS) { if (!operationFromRecord(received, op)) continue; _store.enqueue(op); // _store.takePending(op); // updateOperation(op); // bool b = _executor.executeOperation(op); // _store.markAsDone(op, b); } _client.stop(); if ((ret != SMARTREST_END_OF_RESPONSE) && (ret != SMARTREST_CONNECTION_CLOSED)) { return false; } return true; } bool OperationSupport::operationFromRecord(ParsedRecord& received, OperationStore::Operation& op) { const char *tmp; if ((received.values() < 4) || (received.value(0).valueType() != VALUE_INTEGER) || //TODO: error here since streaming op does not have 210 msg id // (received.value(0).integerValue() != 210) || (received.value(2).valueType() != VALUE_INTEGER) || (received.value(3).valueType() != VALUE_CHARACTER)) return false; op.identifier = received.value(2).integerValue(); tmp = received.value(3).characterValue(); if (strcmp(tmp, "EXECUTING") == 0) op.state = OPERATION_EXECUTING; else if (strcmp(tmp, "SUCCESSFUL") == 0) op.state = OPERATION_SUCCESSFUL; else if (strcmp(tmp, "FAILED") == 0) op.state = OPERATION_FAILED; else if (strcmp(tmp, "PENDING") == 0) op.state = OPERATION_PENDING; else return false; return true; } bool OperationSupport::updateOperation(OperationStore::Operation& op) { ComposedRecord record; ParsedRecord received; IntegerValue msgId(111); IntegerValue operationId(op.identifier); if ((!record.add(msgId)) || (!record.add(operationId)) || (!record.add(operationStateValue(op)))) return false; if (_client.send(record) != SMARTREST_SUCCESS) { _client.stop(); return false; } puts("UP: sent record!"); bool found = false; while (_client.receive(received) == SMARTREST_SUCCESS) { if ((received.values() == 4) && (received.value(0).valueType() == VALUE_INTEGER) && (received.value(0).integerValue() == 211) && (received.value(2).valueType() == VALUE_INTEGER) && (received.value(2).integerValue() == op.identifier)) { found = true; break; } } puts("UP: received record!"); _client.stop(); return found; } CharValue& OperationSupport::operationStateValue(OperationStore::Operation& op) { switch (op.state) { case OPERATION_EXECUTING: return aOperationStateExecuting; case OPERATION_SUCCESSFUL: return aOperationStateSuccessful; case OPERATION_FAILED: return aOperationStateFailed; default: return aOperationStatePending; } } void OperationSupport::thread1() { OperationStore::Operation op; bool ret; while (!_init) Thread::yield(); while (true) { if (!_store.takePending(op)) { Thread::yield(); continue; } // printf("Thread 1: %l, %s, %u\r\n", op.identifier, op.descriptor, op.state); puts("Updating op"); updateOperation(op); puts("Updated op"); ret = _executor.executeOperation(op); puts("Executed op"); _store.markAsDone(op, ret); } } void OperationSupport::thread2() { OperationStore::Operation op; Aggregator aggr(true); while (!_init) Thread::yield(); while (true) { while ((!aggr.full()) && (_store.takeDone(op))) { ComposedRecord record; // printf("Thread 2: %l, %s, %u\r\n", op.identifier, op.descriptor, op.state); IntegerValue msgId(111); IntegerValue operationId(op.identifier); if ((!record.add(msgId)) || (!record.add(operationId)) || (!record.add(operationStateValue(op)))) break; if (!aggr.add(record)) break; } if (aggr.length() == 0) { Thread::yield(); continue; } puts("Sending aggr"); if (_client.send(aggr) != SMARTREST_SUCCESS) { } _client.stop(); aggr.clear(); } } void OperationSupport::thread3() { char bayeuxId[33]; ComposedRecord record; ParsedRecord received; OperationStore::Operation op; while ((!_init) || (_firstRun)) Thread::yield(); // request Bayeux ID { const char *str; IntegerValue msgId(80); if (!record.add(msgId)) return; if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) || (_client.receive(received) != SMARTREST_SUCCESS)) { _client.stop(); return; } _client.stop(); str = received.value(0).characterValue(); if ((str == NULL) || (strlen(str) > sizeof(bayeuxId))) return; strcpy(bayeuxId, str); record.clear(); } // set channel { char chn[16]; int len = 0; snprintf(chn, sizeof(chn), "/%ld%n", _deviceId, &len); if ((len == 0) || (len == sizeof(chn))) return; IntegerValue msgId(81); CharValue bid(bayeuxId); CharValue channel(chn); if ((!record.add(msgId)) || (!record.add(bid)) || (!record.add(channel))) return; if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) || (_client.receive(received) != SMARTREST_END_OF_RESPONSE)) { _client.stop(); return; } _client.stop(); record.clear(); } { IntegerValue msgId(83); CharValue bid(bayeuxId); if ((!record.add(msgId)) || (!record.add(bid))) return; } while (true) { if (_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) { _client.stop(); continue; } puts("Receiving op"); while (_client.receive(received) == SMARTREST_SUCCESS) { if (!operationFromRecord(received, op)) continue; puts("Parsed record"); // printf("Thread 3: %l, %s, %u\r\n", op.identifier, op.descriptor, op.state); _store.enqueue(op); puts("Enqueued op"); // _store.takePending(op); // puts("Updating op"); // updateOperation(op); // puts("Updated op"); // bool ret = _executor.executeOperation(op); // puts("Executed op"); // _store.markAsDone(op, ret); // puts("Marked"); } //TODO: error checking _client.stop(); } } void OperationSupport::thread1_func(void const *arg) { OperationSupport *that; that = (OperationSupport*)arg; that->thread1(); } void OperationSupport::thread2_func(void const *arg) { OperationSupport *that; that = (OperationSupport*)arg; that->thread2(); } void OperationSupport::thread3_func(void const *arg) { OperationSupport *that; that = (OperationSupport*)arg; that->thread3(); }