Official reference client implementation for Cumulocity SmartREST on u-blox C027.
Dependencies: C027_Support C12832 LM75B MMA7660 MbedSmartRest mbed-rtos mbed
Fork of MbedSmartRestMain by
Diff: operation/OperationSupport.cpp
- Revision:
- 93:0acd11870c6a
- Parent:
- 91:423177e8a401
- Child:
- 94:61d44636f020
--- a/operation/OperationSupport.cpp Fri Mar 20 14:27:10 2015 +0000 +++ b/operation/OperationSupport.cpp Mon Apr 13 14:24:58 2015 +0000 @@ -1,23 +1,27 @@ #include <string.h> #include "OperationSupport.h" -#include "Aggregator.h" -#include "ComposedRecord.h" -#include "CharValue.h" -#include "IntegerValue.h" +//#include "Aggregator.h" +#include "SmartRestConf.h" +#include "logging.h" + +//CharValue aOperationStatePending("PENDING"); +//CharValue aOperationStateExecuting("EXECUTING"); +//CharValue aOperationStateSuccessful("SUCCESSFUL"); +//CharValue aOperationStateFailed("FAILED"); -CharValue aOperationStatePending("PENDING"); -CharValue aOperationStateExecuting("EXECUTING"); -CharValue aOperationStateSuccessful("SUCCESSFUL"); -CharValue aOperationStateFailed("FAILED"); +const char strPending[] = "PENDING"; +const char strExecuting[] = "EXECUTING"; +const char strSuccessful[] = "SUCCESSFUL"; +const char strFailed[] = "FAILED"; -OperationSupport::OperationSupport(AbstractSmartRest& client, SmartRestTemplate& tpl, long& deviceId, - ConfigurationSynchronization& configurationSynchronization, DeviceIO& io, - DisplayInfo& displayInfo) : +OperationSupport::OperationSupport(RtosSmartRest& client, SmartRestTemplate& tpl, long& deviceId, + ConfigurationSynchronization& configurationSynchronization, + LCDDisplay& lcdDisplay) : _deviceId(deviceId), _tpl(tpl), _client(client), - _executor(client, tpl, deviceId, configurationSynchronization, io, displayInfo), - _thread1(OperationSupport::thread1_func, this), + _executor(client, tpl, deviceId, configurationSynchronization, lcdDisplay), + sock(), _thread2(OperationSupport::thread2_func, this), _thread3(OperationSupport::thread3_func, this) { @@ -36,7 +40,7 @@ return false; // Set operation state - // USAGE: 110,<OPERATION/ID>,<STATE> + // USAGE: 111,<OPERATION/ID>,<STATE> if (!_tpl.add("10,111,PUT,/devicecontrol/operations/%%,application/vnd.com.nsn.cumulocity.operation+json,application/vnd.com.nsn.cumulocity.operation+json,%%,UNSIGNED STRING,\"{\"\"status\"\":\"\"%%\"\"}\"\r\n")) return false; @@ -61,176 +65,181 @@ { if (_firstRun) { _firstRun = false; - if (!requestPendingOperations()) - return false; + bool b = requestPendingOperations(); + return b; + } else { + return true; + } +} + +bool OperationSupport::executePendingOperation(Operation& op) +{ + ComposedRecord r; + if (!r.add(IntegerValue(112)) || !r.add(IntegerValue(op.identifier))) { + return false; + } else if (_client.send(r) != SMARTREST_SUCCESS) { + _client.stop(); + return false; } - return true; + ParsedRecord p; + bool b = true; + while (_client.receive(p) == SMARTREST_SUCCESS) { + if (p.values() >= 3 && + p.value(0).integerValue() >= 220 && + p.value(0).integerValue() <= 222 && + p.value(2).valueType() == VALUE_INTEGER) { + Operation* op1 = opool.alloc(); + op1->identifier = p.value(2).integerValue(); + bool ret = _executor.executeOperation(p); + op1->state = ret ? OPERATION_SUCCESSFUL : OPERATION_FAILED; + opool.put(op1); + } + } + _client.stop(); + return b; } bool OperationSupport::requestPendingOperations() { - uint8_t ret; - OperationStore::Operation op; - - ComposedRecord record; - ParsedRecord received; - IntegerValue msgId(110); IntegerValue deviceId(_deviceId); + ComposedRecord record; + if ((!record.add(msgId)) || (!record.add(deviceId))) return false; - - if (_client.send(record) != SMARTREST_SUCCESS) { - _client.stop(); - return false; - } - - while ((ret = _client.receive(received)) == SMARTREST_SUCCESS) { - if (!operationFromRecord(received, op)) - continue; - _store.enqueue(op); -// _store.takePending(op); -// updateOperation(op); -// bool b = _executor.executeOperation(op); -// _store.markAsDone(op, b); - } - _client.stop(); - - if ((ret != SMARTREST_END_OF_RESPONSE) && - (ret != SMARTREST_CONNECTION_CLOSED)) { - return false; - } - return true; -} - -bool OperationSupport::operationFromRecord(ParsedRecord& received, OperationStore::Operation& op) -{ - const char *tmp; - - if ((received.values() < 4) || - (received.value(0).valueType() != VALUE_INTEGER) || -//TODO: error here since streaming op does not have 210 msg id -// (received.value(0).integerValue() != 210) || - (received.value(2).valueType() != VALUE_INTEGER) || - (received.value(3).valueType() != VALUE_CHARACTER)) - return false; - - op.identifier = received.value(2).integerValue(); - tmp = received.value(3).characterValue(); - if (strcmp(tmp, "EXECUTING") == 0) - op.state = OPERATION_EXECUTING; - else if (strcmp(tmp, "SUCCESSFUL") == 0) - op.state = OPERATION_SUCCESSFUL; - else if (strcmp(tmp, "FAILED") == 0) - op.state = OPERATION_FAILED; - else if (strcmp(tmp, "PENDING") == 0) - op.state = OPERATION_PENDING; - else - return false; - - return true; -} - -bool OperationSupport::updateOperation(OperationStore::Operation& op) -{ - ComposedRecord record; - ParsedRecord received; - IntegerValue msgId(111); - IntegerValue operationId(op.identifier); - if ((!record.add(msgId)) || (!record.add(operationId)) || - (!record.add(operationStateValue(op)))) - return false; - - if (_client.send(record) != SMARTREST_SUCCESS) { + else if (_client.send(record) != SMARTREST_SUCCESS) { _client.stop(); return false; } - puts("UP: sent record!"); - bool found = false; - while (_client.receive(received) == SMARTREST_SUCCESS) { - if ((received.values() == 4) && - (received.value(0).valueType() == VALUE_INTEGER) && - (received.value(0).integerValue() == 211) && - (received.value(2).valueType() == VALUE_INTEGER) && - (received.value(2).integerValue() == op.identifier)) { - found = true; + + uint8_t ret; + ParsedRecord received; + Operation opl[10]; + size_t c = 0; + while ((ret=_client.receive(received)) == SMARTREST_SUCCESS) { + if (c < 10 && operationFromRecord(received, opl[c])) { + ++c; + } else { + aWarning("Ignored pending operation after 10.\n"); break; } } - puts("UP: received record!"); _client.stop(); - return found; + + for (size_t i = 0; i < c; ++i) { + Operation* op = opool.alloc(); + op->identifier = opl[i].identifier; + op->state = OPERATION_EXECUTING; + opool.put(op); + executePendingOperation(opl[i]); + } + return (ret == SMARTREST_END_OF_RESPONSE || ret == SMARTREST_CONNECTION_CLOSED); } -CharValue& OperationSupport::operationStateValue(OperationStore::Operation& op) +bool OperationSupport::operationFromRecord(ParsedRecord& received, Operation& op) { - switch (op.state) { - case OPERATION_EXECUTING: - return aOperationStateExecuting; - case OPERATION_SUCCESSFUL: - return aOperationStateSuccessful; - case OPERATION_FAILED: - return aOperationStateFailed; - default: - return aOperationStatePending; + if ((received.values() < 4) || + (received.value(0).valueType() != VALUE_INTEGER) || +// (received.value(0).integerValue() != 211) || + (received.value(2).valueType() != VALUE_INTEGER) || + (received.value(3).valueType() != VALUE_CHARACTER)) + return false; + + op.identifier = received.value(2).integerValue(); + const char *tmp = received.value(3).characterValue(); + if (strcmp(tmp, strExecuting) == 0) + op.state = OPERATION_EXECUTING; + else if (strcmp(tmp, strSuccessful) == 0) + op.state = OPERATION_SUCCESSFUL; + else if (strcmp(tmp, strFailed) == 0) + op.state = OPERATION_FAILED; + else if (strcmp(tmp, strPending) == 0) + op.state = OPERATION_PENDING; + else + return false; + return true; +} + +//CharValue& OperationSupport::operationStateValue(OperationState state) const +//{ +// switch (state) { +// case OPERATION_EXECUTING: return aOperationStateExecuting; +// case OPERATION_SUCCESSFUL: return aOperationStateSuccessful; +// case OPERATION_FAILED: return aOperationStateFailed; +// default: return aOperationStatePending; +// } +//} + +const char * OperationSupport::getOperationStateChar(OperationState state) const +{ + switch (state) { + case OPERATION_EXECUTING: return strExecuting; + case OPERATION_SUCCESSFUL: return strSuccessful; + case OPERATION_FAILED: return strFailed; + case OPERATION_PENDING: return strPending; } } -void OperationSupport::thread1() -{ - OperationStore::Operation op; - bool ret; +//void OperationSupport::thread2() +//{ +// IntegerValue msgId(111); +// Aggregator aggr(true); +// aInfo("Report thread: %p\n", Thread::gettid()); +// while (true) { +// while (!aggr.full()) { +// osEvent e = opool.get(200); +// if (e.status == osEventTimeout) { +// break; +// } else if (e.status == osEventMail) { +// Operation *op = (Operation*)e.value.p; +// ComposedRecord record; +// IntegerValue operationId(op->identifier); +// if (record.add(msgId) && record.add(operationId) && +// record.add(operationStateValue(op->state)) && aggr.add(record)) { +// printf("Reporting: <%ld, %u>.\n", op->identifier, op->state); +// opool.free(op); +// } else { +// opool.put(op); +// } +// } else { +// break; +// } +// } +// if (aggr.length()) { +// _client.sendAndClose(aggr); +// _client.stop(); +// aggr.clear(); +// } +// } +//} - while (!_init) - Thread::yield(); - - while (true) { - if (!_store.takePending(op)) { - Thread::yield(); - continue; - } -// printf("Thread 1: %l, %s, %u\r\n", op.identifier, op.descriptor, op.state); - puts("Updating op"); - updateOperation(op); - puts("Updated op"); - ret = _executor.executeOperation(op); - puts("Executed op"); - _store.markAsDone(op, ret); - } -} - +const char fmt[] = "POST /s HTTP/1.0\r\nHost: %s\r\nAuthorization: Basic %s\r\nX-Id: %s\r\nContent-Length: %d\r\n\r\n%s"; +const char fmt2[] = "111,%ld,%s\r\n"; void OperationSupport::thread2() { - OperationStore::Operation op; - Aggregator aggr(true); - - while (!_init) - Thread::yield(); - + aInfo("Report thread: %p\n", Thread::gettid()); while (true) { - while ((!aggr.full()) && (_store.takeDone(op))) { - ComposedRecord record; - -// printf("Thread 2: %l, %s, %u\r\n", op.identifier, op.descriptor, op.state); - IntegerValue msgId(111); - IntegerValue operationId(op.identifier); - if ((!record.add(msgId)) || (!record.add(operationId)) || - (!record.add(operationStateValue(op)))) - break; - - if (!aggr.add(record)) - break; + osEvent e = opool.get(); + if (e.status == osEventMail) { + Operation *op = (Operation*)e.value.p; + aDebug("Report: <%ld, %u>.\n", op->identifier, op->state); + int len = snprintf(buf2, sizeof(buf2), fmt2, op->identifier, getOperationStateChar(op->state)); + opool.free(op); + int ret = sock.connect(getHost(), getPort()); + for (uint8_t i = 1; ret < 0 && i < 3; ++i) { + ret = sock.connect(getHost(), getPort()); + } + if (ret >= 0) { + int l = snprintf(buf, sizeof(buf), fmt, getHost(), getAuthStr(), getIdentifier(), len, buf2); + ret = sock.send(buf, l); + if (ret < 0) { + aError("Report: Send!\n"); + } + sock.close(); + } else { + aError("Report: Connect!\n"); + } } - - if (aggr.length() == 0) { - Thread::yield(); - continue; - } - - puts("Sending aggr"); - if (_client.send(aggr) != SMARTREST_SUCCESS) { } - _client.stop(); - aggr.clear(); } } @@ -239,12 +248,12 @@ char bayeuxId[33]; ComposedRecord record; ParsedRecord received; - OperationStore::Operation op; - - while ((!_init) || (_firstRun)) + + while (!_init || _firstRun) Thread::yield(); - + // request Bayeux ID + aInfo("Polling thread: %p\n", Thread::gettid()); { const char *str; IntegerValue msgId(80); @@ -259,80 +268,98 @@ _client.stop(); str = received.value(0).characterValue(); - if ((str == NULL) || (strlen(str) > sizeof(bayeuxId))) + if (str == NULL || strlen(str) > sizeof(bayeuxId)) return; strcpy(bayeuxId, str); record.clear(); + aInfo("Polling: bayeux ID %s\n", str); } - + // set channel { char chn[16]; int len = 0; snprintf(chn, sizeof(chn), "/%ld%n", _deviceId, &len); - if ((len == 0) || (len == sizeof(chn))) + if (len == 0 || len == sizeof(chn)) { + aError("Polling: Invalid id %ld.\n", _deviceId); return; - + } + IntegerValue msgId(81); CharValue bid(bayeuxId); CharValue channel(chn); - if ((!record.add(msgId)) || (!record.add(bid)) || (!record.add(channel))) + if (!record.add(msgId) || !record.add(bid) || !record.add(channel)) { return; + } if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) || (_client.receive(received) != SMARTREST_END_OF_RESPONSE)) { + aError("Polling: Subscribe channel %s.\n", chn); _client.stop(); return; } - _client.stop(); record.clear(); + aInfo("Polling: Subscribed channel %s!\n", chn); } - { - IntegerValue msgId(83); - CharValue bid(bayeuxId); - if ((!record.add(msgId)) || (!record.add(bid))) - return; + if (!record.add(IntegerValue(83)) || !record.add(CharValue(bayeuxId))) { + return; } while (true) { if (_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) { + aError("Polling: Connect.\n"); _client.stop(); continue; } - - puts("Receiving op"); while (_client.receive(received) == SMARTREST_SUCCESS) { - if (!operationFromRecord(received, op)) + for (size_t i = 0; i < received.values(); ++i) { + printf("%s ", received.rawValue(i)); + } + puts(""); + if (received.values() == 0) { continue; - puts("Parsed record"); -// printf("Thread 3: %l, %s, %u\r\n", op.identifier, op.descriptor, op.state); - _store.enqueue(op); - puts("Enqueued op"); -// _store.takePending(op); -// puts("Updating op"); -// updateOperation(op); -// puts("Updated op"); -// bool ret = _executor.executeOperation(op); -// puts("Executed op"); -// _store.markAsDone(op, ret); -// puts("Marked"); + } else if (received.value(0).integerValue() == 86) { // advice responses + if (received.values() < 5) { + aWarning("Incomplete advice, %u of 5 values received.\n", received.values()); + continue; + } else if (strcmp(received.value(4).characterValue(), "retry") == 0) { + aDebug("Carry out 'retry' policy.\n"); + break; + } else if (strcmp(received.value(4).characterValue(), "handshake") == 0) { + } + } else if (received.value(0).integerValue() == 211) { // Operation State Message + Operation* op = opool.alloc(); + if (op == NULL) { + aCritical("opool full!\n"); + } else if (operationFromRecord(received, *op)) { + op->state = OPERATION_EXECUTING; + opool.put(op); + } else { + opool.free(op); + } + } else if (received.value(0).integerValue() >= 220 && + received.value(0).integerValue() <= 222) { // Real Operation + if (received.values() >= 3 && + received.value(2).valueType() == VALUE_INTEGER) { + Operation* op = opool.alloc(); + op->identifier = received.value(2).integerValue(); + bool ret = _executor.executeOperation(received); + op->state = ret ? OPERATION_SUCCESSFUL : OPERATION_FAILED; + opool.put(op); + } else { + aWarning("Incomplete operation, %u of 4 values received.\n", received.values()); + } + } else { + aError("Unknown operation ID: %d\n", received.value(0).integerValue()); + } } - - //TODO: error checking _client.stop(); } } -void OperationSupport::thread1_func(void const *arg) -{ - OperationSupport *that; - that = (OperationSupport*)arg; - that->thread1(); -} - void OperationSupport::thread2_func(void const *arg) { OperationSupport *that;