Official reference client implementation for Cumulocity SmartREST on u-blox C027.
Dependencies: C027_Support C12832 LM75B MMA7660 MbedSmartRest mbed-rtos mbed
Fork of MbedSmartRestMain by
operation/OperationSupport.cpp
- Committer:
- xinlei
- Date:
- 2015-04-13
- Revision:
- 93:0acd11870c6a
- Parent:
- 91:423177e8a401
- Child:
- 94:61d44636f020
File content as of revision 93:0acd11870c6a:
#include <string.h> #include "OperationSupport.h" //#include "Aggregator.h" #include "SmartRestConf.h" #include "logging.h" //CharValue aOperationStatePending("PENDING"); //CharValue aOperationStateExecuting("EXECUTING"); //CharValue aOperationStateSuccessful("SUCCESSFUL"); //CharValue aOperationStateFailed("FAILED"); const char strPending[] = "PENDING"; const char strExecuting[] = "EXECUTING"; const char strSuccessful[] = "SUCCESSFUL"; const char strFailed[] = "FAILED"; OperationSupport::OperationSupport(RtosSmartRest& client, SmartRestTemplate& tpl, long& deviceId, ConfigurationSynchronization& configurationSynchronization, LCDDisplay& lcdDisplay) : _deviceId(deviceId), _tpl(tpl), _client(client), _executor(client, tpl, deviceId, configurationSynchronization, lcdDisplay), sock(), _thread2(OperationSupport::thread2_func, this), _thread3(OperationSupport::thread3_func, this) { _init = false; _firstRun = true; } bool OperationSupport::init() { if (_init) return false; // Get pending operations // USAGE: 110,<DEVICE/ID> if (!_tpl.add("10,110,GET,/devicecontrol/operations?status=PENDING&deviceId=%%&pageSize=100,,application/vnd.com.nsn.cumulocity.operationCollection+json,%%,UNSIGNED,\r\n")) return false; // Set operation state // USAGE: 111,<OPERATION/ID>,<STATE> if (!_tpl.add("10,111,PUT,/devicecontrol/operations/%%,application/vnd.com.nsn.cumulocity.operation+json,application/vnd.com.nsn.cumulocity.operation+json,%%,UNSIGNED STRING,\"{\"\"status\"\":\"\"%%\"\"}\"\r\n")) return false; // Get operations // Response: 210,<OPERATION/ID>,<STATUS> if (!_tpl.add("11,210,\"$.operations\",,\"$.id\",\"$.status\"\r\n")) return false; // Get operation // Response: 211,<OPERATION/ID>,<STATUS> if (!_tpl.add("11,211,,\"$.deviceId\",\"$.id\",\"$.status\"\r\n")) return false; if (!_executor.init()) return false; _init = true; return true; } bool OperationSupport::run() { if (_firstRun) { _firstRun = false; bool b = requestPendingOperations(); return b; } else { return true; } } bool OperationSupport::executePendingOperation(Operation& op) { ComposedRecord r; if (!r.add(IntegerValue(112)) || !r.add(IntegerValue(op.identifier))) { return false; } else if (_client.send(r) != SMARTREST_SUCCESS) { _client.stop(); return false; } ParsedRecord p; bool b = true; while (_client.receive(p) == SMARTREST_SUCCESS) { if (p.values() >= 3 && p.value(0).integerValue() >= 220 && p.value(0).integerValue() <= 222 && p.value(2).valueType() == VALUE_INTEGER) { Operation* op1 = opool.alloc(); op1->identifier = p.value(2).integerValue(); bool ret = _executor.executeOperation(p); op1->state = ret ? OPERATION_SUCCESSFUL : OPERATION_FAILED; opool.put(op1); } } _client.stop(); return b; } bool OperationSupport::requestPendingOperations() { IntegerValue msgId(110); IntegerValue deviceId(_deviceId); ComposedRecord record; if ((!record.add(msgId)) || (!record.add(deviceId))) return false; else if (_client.send(record) != SMARTREST_SUCCESS) { _client.stop(); return false; } uint8_t ret; ParsedRecord received; Operation opl[10]; size_t c = 0; while ((ret=_client.receive(received)) == SMARTREST_SUCCESS) { if (c < 10 && operationFromRecord(received, opl[c])) { ++c; } else { aWarning("Ignored pending operation after 10.\n"); break; } } _client.stop(); for (size_t i = 0; i < c; ++i) { Operation* op = opool.alloc(); op->identifier = opl[i].identifier; op->state = OPERATION_EXECUTING; opool.put(op); executePendingOperation(opl[i]); } return (ret == SMARTREST_END_OF_RESPONSE || ret == SMARTREST_CONNECTION_CLOSED); } bool OperationSupport::operationFromRecord(ParsedRecord& received, Operation& op) { if ((received.values() < 4) || (received.value(0).valueType() != VALUE_INTEGER) || // (received.value(0).integerValue() != 211) || (received.value(2).valueType() != VALUE_INTEGER) || (received.value(3).valueType() != VALUE_CHARACTER)) return false; op.identifier = received.value(2).integerValue(); const char *tmp = received.value(3).characterValue(); if (strcmp(tmp, strExecuting) == 0) op.state = OPERATION_EXECUTING; else if (strcmp(tmp, strSuccessful) == 0) op.state = OPERATION_SUCCESSFUL; else if (strcmp(tmp, strFailed) == 0) op.state = OPERATION_FAILED; else if (strcmp(tmp, strPending) == 0) op.state = OPERATION_PENDING; else return false; return true; } //CharValue& OperationSupport::operationStateValue(OperationState state) const //{ // switch (state) { // case OPERATION_EXECUTING: return aOperationStateExecuting; // case OPERATION_SUCCESSFUL: return aOperationStateSuccessful; // case OPERATION_FAILED: return aOperationStateFailed; // default: return aOperationStatePending; // } //} const char * OperationSupport::getOperationStateChar(OperationState state) const { switch (state) { case OPERATION_EXECUTING: return strExecuting; case OPERATION_SUCCESSFUL: return strSuccessful; case OPERATION_FAILED: return strFailed; case OPERATION_PENDING: return strPending; } } //void OperationSupport::thread2() //{ // IntegerValue msgId(111); // Aggregator aggr(true); // aInfo("Report thread: %p\n", Thread::gettid()); // while (true) { // while (!aggr.full()) { // osEvent e = opool.get(200); // if (e.status == osEventTimeout) { // break; // } else if (e.status == osEventMail) { // Operation *op = (Operation*)e.value.p; // ComposedRecord record; // IntegerValue operationId(op->identifier); // if (record.add(msgId) && record.add(operationId) && // record.add(operationStateValue(op->state)) && aggr.add(record)) { // printf("Reporting: <%ld, %u>.\n", op->identifier, op->state); // opool.free(op); // } else { // opool.put(op); // } // } else { // break; // } // } // if (aggr.length()) { // _client.sendAndClose(aggr); // _client.stop(); // aggr.clear(); // } // } //} const char fmt[] = "POST /s HTTP/1.0\r\nHost: %s\r\nAuthorization: Basic %s\r\nX-Id: %s\r\nContent-Length: %d\r\n\r\n%s"; const char fmt2[] = "111,%ld,%s\r\n"; void OperationSupport::thread2() { aInfo("Report thread: %p\n", Thread::gettid()); while (true) { osEvent e = opool.get(); if (e.status == osEventMail) { Operation *op = (Operation*)e.value.p; aDebug("Report: <%ld, %u>.\n", op->identifier, op->state); int len = snprintf(buf2, sizeof(buf2), fmt2, op->identifier, getOperationStateChar(op->state)); opool.free(op); int ret = sock.connect(getHost(), getPort()); for (uint8_t i = 1; ret < 0 && i < 3; ++i) { ret = sock.connect(getHost(), getPort()); } if (ret >= 0) { int l = snprintf(buf, sizeof(buf), fmt, getHost(), getAuthStr(), getIdentifier(), len, buf2); ret = sock.send(buf, l); if (ret < 0) { aError("Report: Send!\n"); } sock.close(); } else { aError("Report: Connect!\n"); } } } } void OperationSupport::thread3() { char bayeuxId[33]; ComposedRecord record; ParsedRecord received; while (!_init || _firstRun) Thread::yield(); // request Bayeux ID aInfo("Polling thread: %p\n", Thread::gettid()); { const char *str; IntegerValue msgId(80); if (!record.add(msgId)) return; if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) || (_client.receive(received) != SMARTREST_SUCCESS)) { _client.stop(); return; } _client.stop(); str = received.value(0).characterValue(); if (str == NULL || strlen(str) > sizeof(bayeuxId)) return; strcpy(bayeuxId, str); record.clear(); aInfo("Polling: bayeux ID %s\n", str); } // set channel { char chn[16]; int len = 0; snprintf(chn, sizeof(chn), "/%ld%n", _deviceId, &len); if (len == 0 || len == sizeof(chn)) { aError("Polling: Invalid id %ld.\n", _deviceId); return; } IntegerValue msgId(81); CharValue bid(bayeuxId); CharValue channel(chn); if (!record.add(msgId) || !record.add(bid) || !record.add(channel)) { return; } if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) || (_client.receive(received) != SMARTREST_END_OF_RESPONSE)) { aError("Polling: Subscribe channel %s.\n", chn); _client.stop(); return; } _client.stop(); record.clear(); aInfo("Polling: Subscribed channel %s!\n", chn); } if (!record.add(IntegerValue(83)) || !record.add(CharValue(bayeuxId))) { return; } while (true) { if (_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) { aError("Polling: Connect.\n"); _client.stop(); continue; } while (_client.receive(received) == SMARTREST_SUCCESS) { for (size_t i = 0; i < received.values(); ++i) { printf("%s ", received.rawValue(i)); } puts(""); if (received.values() == 0) { continue; } else if (received.value(0).integerValue() == 86) { // advice responses if (received.values() < 5) { aWarning("Incomplete advice, %u of 5 values received.\n", received.values()); continue; } else if (strcmp(received.value(4).characterValue(), "retry") == 0) { aDebug("Carry out 'retry' policy.\n"); break; } else if (strcmp(received.value(4).characterValue(), "handshake") == 0) { } } else if (received.value(0).integerValue() == 211) { // Operation State Message Operation* op = opool.alloc(); if (op == NULL) { aCritical("opool full!\n"); } else if (operationFromRecord(received, *op)) { op->state = OPERATION_EXECUTING; opool.put(op); } else { opool.free(op); } } else if (received.value(0).integerValue() >= 220 && received.value(0).integerValue() <= 222) { // Real Operation if (received.values() >= 3 && received.value(2).valueType() == VALUE_INTEGER) { Operation* op = opool.alloc(); op->identifier = received.value(2).integerValue(); bool ret = _executor.executeOperation(received); op->state = ret ? OPERATION_SUCCESSFUL : OPERATION_FAILED; opool.put(op); } else { aWarning("Incomplete operation, %u of 4 values received.\n", received.values()); } } else { aError("Unknown operation ID: %d\n", received.value(0).integerValue()); } } _client.stop(); } } void OperationSupport::thread2_func(void const *arg) { OperationSupport *that; that = (OperationSupport*)arg; that->thread2(); } void OperationSupport::thread3_func(void const *arg) { OperationSupport *that; that = (OperationSupport*)arg; that->thread3(); }