Official reference client implementation for Cumulocity SmartREST on u-blox C027.

Dependencies:   C027_Support C12832 LM75B MMA7660 MbedSmartRest mbed-rtos mbed

Fork of MbedSmartRestMain by Vincent Wochnik

Revision:
93:0acd11870c6a
Parent:
91:423177e8a401
Child:
94:61d44636f020
--- a/operation/OperationSupport.cpp	Fri Mar 20 14:27:10 2015 +0000
+++ b/operation/OperationSupport.cpp	Mon Apr 13 14:24:58 2015 +0000
@@ -1,23 +1,27 @@
 #include <string.h>
 #include "OperationSupport.h"
-#include "Aggregator.h"
-#include "ComposedRecord.h"
-#include "CharValue.h"
-#include "IntegerValue.h"
+//#include "Aggregator.h"
+#include "SmartRestConf.h"
+#include "logging.h"
+
+//CharValue aOperationStatePending("PENDING");
+//CharValue aOperationStateExecuting("EXECUTING");
+//CharValue aOperationStateSuccessful("SUCCESSFUL");
+//CharValue aOperationStateFailed("FAILED");
 
-CharValue aOperationStatePending("PENDING");
-CharValue aOperationStateExecuting("EXECUTING");
-CharValue aOperationStateSuccessful("SUCCESSFUL");
-CharValue aOperationStateFailed("FAILED");
+const char strPending[] = "PENDING";
+const char strExecuting[] = "EXECUTING";
+const char strSuccessful[] = "SUCCESSFUL";
+const char strFailed[] = "FAILED";
 
-OperationSupport::OperationSupport(AbstractSmartRest& client, SmartRestTemplate& tpl, long& deviceId, 
-                                   ConfigurationSynchronization& configurationSynchronization, DeviceIO& io,
-                                   DisplayInfo& displayInfo) :
+OperationSupport::OperationSupport(RtosSmartRest& client, SmartRestTemplate& tpl, long& deviceId, 
+                                   ConfigurationSynchronization& configurationSynchronization,
+                                   LCDDisplay& lcdDisplay) :
     _deviceId(deviceId),
     _tpl(tpl),
     _client(client),
-    _executor(client, tpl, deviceId, configurationSynchronization, io, displayInfo),
-    _thread1(OperationSupport::thread1_func, this),
+    _executor(client, tpl, deviceId, configurationSynchronization, lcdDisplay),
+    sock(),
     _thread2(OperationSupport::thread2_func, this),
     _thread3(OperationSupport::thread3_func, this)
 {
@@ -36,7 +40,7 @@
         return false;
 
     // Set operation state
-    // USAGE: 110,<OPERATION/ID>,<STATE>
+    // USAGE: 111,<OPERATION/ID>,<STATE>
     if (!_tpl.add("10,111,PUT,/devicecontrol/operations/%%,application/vnd.com.nsn.cumulocity.operation+json,application/vnd.com.nsn.cumulocity.operation+json,%%,UNSIGNED STRING,\"{\"\"status\"\":\"\"%%\"\"}\"\r\n"))
         return false;
 
@@ -61,176 +65,181 @@
 {
     if (_firstRun) {
         _firstRun = false;
-        if (!requestPendingOperations())
-            return false;
+        bool b = requestPendingOperations();
+        return b;
+    } else {
+        return true;
+    }
+}
+
+bool OperationSupport::executePendingOperation(Operation& op)
+{
+    ComposedRecord r;
+    if (!r.add(IntegerValue(112)) || !r.add(IntegerValue(op.identifier))) {
+        return false;
+    } else if (_client.send(r) != SMARTREST_SUCCESS) {
+        _client.stop();
+        return false;
     }
-    return true;
+    ParsedRecord p;
+    bool b = true;
+    while (_client.receive(p) == SMARTREST_SUCCESS) {
+        if (p.values() >= 3 &&
+            p.value(0).integerValue() >= 220 &&
+            p.value(0).integerValue() <= 222 &&
+            p.value(2).valueType() == VALUE_INTEGER) {
+            Operation* op1 = opool.alloc();
+            op1->identifier = p.value(2).integerValue();
+            bool ret = _executor.executeOperation(p);
+            op1->state = ret ? OPERATION_SUCCESSFUL : OPERATION_FAILED;
+            opool.put(op1);
+        }
+    }
+    _client.stop();
+    return b;
 }
 
 bool OperationSupport::requestPendingOperations()
 {
-    uint8_t ret;
-    OperationStore::Operation op;
-
-    ComposedRecord record;
-    ParsedRecord received;
-    
     IntegerValue msgId(110);
     IntegerValue deviceId(_deviceId);
+    ComposedRecord record;
+
     if ((!record.add(msgId)) || (!record.add(deviceId)))
         return false;
-
-    if (_client.send(record) != SMARTREST_SUCCESS) {
-        _client.stop();
-        return false;
-    }
-    
-    while ((ret = _client.receive(received)) == SMARTREST_SUCCESS) {
-        if (!operationFromRecord(received, op))
-            continue;
-        _store.enqueue(op);
-//        _store.takePending(op);
-//        updateOperation(op);
-//        bool b = _executor.executeOperation(op);
-//        _store.markAsDone(op, b);
-    }
-    _client.stop();
-
-    if ((ret != SMARTREST_END_OF_RESPONSE) &&
-        (ret != SMARTREST_CONNECTION_CLOSED)) {
-        return false;
-    }   
-    return true;
-}
-
-bool OperationSupport::operationFromRecord(ParsedRecord& received, OperationStore::Operation& op)
-{
-    const char *tmp;
-
-    if ((received.values() < 4) ||
-        (received.value(0).valueType() != VALUE_INTEGER) ||
-//TODO: error here since streaming op does not have 210 msg id
-//        (received.value(0).integerValue() != 210) ||
-        (received.value(2).valueType() != VALUE_INTEGER) ||
-        (received.value(3).valueType() != VALUE_CHARACTER))
-        return false;
-    
-    op.identifier = received.value(2).integerValue();
-    tmp = received.value(3).characterValue();
-    if (strcmp(tmp, "EXECUTING") == 0)
-        op.state = OPERATION_EXECUTING;
-    else if (strcmp(tmp, "SUCCESSFUL") == 0)
-        op.state = OPERATION_SUCCESSFUL;
-    else if (strcmp(tmp, "FAILED") == 0)
-        op.state = OPERATION_FAILED;
-    else if (strcmp(tmp, "PENDING") == 0)
-        op.state = OPERATION_PENDING;
-    else
-        return false;
-
-    return true;
-}
-
-bool OperationSupport::updateOperation(OperationStore::Operation& op)
-{
-    ComposedRecord record;
-    ParsedRecord received;
-    IntegerValue msgId(111);
-    IntegerValue operationId(op.identifier);
-    if ((!record.add(msgId)) || (!record.add(operationId)) ||
-        (!record.add(operationStateValue(op))))
-        return false;
-    
-    if (_client.send(record) != SMARTREST_SUCCESS) {
+    else if (_client.send(record) != SMARTREST_SUCCESS) {
         _client.stop();
         return false;
     }
-    puts("UP: sent record!");
-    bool found = false;
-    while (_client.receive(received) == SMARTREST_SUCCESS) {
-        if ((received.values() == 4) &&
-            (received.value(0).valueType() == VALUE_INTEGER) &&
-            (received.value(0).integerValue() == 211) &&
-            (received.value(2).valueType() == VALUE_INTEGER) &&
-            (received.value(2).integerValue() == op.identifier)) {
-            found = true;
+
+    uint8_t ret;
+    ParsedRecord received;
+    Operation opl[10];
+    size_t c = 0;
+    while ((ret=_client.receive(received)) == SMARTREST_SUCCESS) {
+        if (c < 10 && operationFromRecord(received, opl[c])) {
+            ++c;
+        } else {
+            aWarning("Ignored pending operation after 10.\n");
             break;
         }
     }
-    puts("UP: received record!");
     _client.stop();
-    return found;
+
+    for (size_t i = 0; i < c; ++i) {
+        Operation* op = opool.alloc();
+        op->identifier = opl[i].identifier;
+        op->state = OPERATION_EXECUTING;
+        opool.put(op);
+        executePendingOperation(opl[i]);
+    }
+    return (ret == SMARTREST_END_OF_RESPONSE || ret == SMARTREST_CONNECTION_CLOSED);
 }
 
-CharValue& OperationSupport::operationStateValue(OperationStore::Operation& op)
+bool OperationSupport::operationFromRecord(ParsedRecord& received, Operation& op)
 {
-    switch (op.state) {
-    case OPERATION_EXECUTING:
-        return aOperationStateExecuting;
-    case OPERATION_SUCCESSFUL:
-        return aOperationStateSuccessful;
-    case OPERATION_FAILED:
-        return aOperationStateFailed;
-    default:
-        return aOperationStatePending;
+    if ((received.values() < 4) ||
+        (received.value(0).valueType() != VALUE_INTEGER) ||
+//        (received.value(0).integerValue() != 211) ||
+        (received.value(2).valueType() != VALUE_INTEGER) ||
+        (received.value(3).valueType() != VALUE_CHARACTER))
+        return false;
+
+    op.identifier = received.value(2).integerValue();
+    const char *tmp = received.value(3).characterValue();
+    if (strcmp(tmp, strExecuting) == 0)
+        op.state = OPERATION_EXECUTING;
+    else if (strcmp(tmp, strSuccessful) == 0)
+        op.state = OPERATION_SUCCESSFUL;
+    else if (strcmp(tmp, strFailed) == 0)
+        op.state = OPERATION_FAILED;
+    else if (strcmp(tmp, strPending) == 0)
+        op.state = OPERATION_PENDING;
+    else
+        return false;
+    return true;
+}
+
+//CharValue& OperationSupport::operationStateValue(OperationState state) const
+//{
+//    switch (state) {
+//    case OPERATION_EXECUTING:  return aOperationStateExecuting;
+//    case OPERATION_SUCCESSFUL: return aOperationStateSuccessful;
+//    case OPERATION_FAILED:     return aOperationStateFailed;
+//    default:                   return aOperationStatePending;
+//    }
+//}
+
+const char * OperationSupport::getOperationStateChar(OperationState state) const
+{
+    switch (state) {
+    case OPERATION_EXECUTING:  return strExecuting;
+    case OPERATION_SUCCESSFUL: return strSuccessful;
+    case OPERATION_FAILED:     return strFailed;
+    case OPERATION_PENDING:    return strPending;
     }
 }
 
-void OperationSupport::thread1()
-{
-    OperationStore::Operation op;
-    bool ret;
+//void OperationSupport::thread2()
+//{
+//    IntegerValue msgId(111);
+//    Aggregator aggr(true);
+//    aInfo("Report thread: %p\n", Thread::gettid());
+//    while (true) {
+//        while (!aggr.full()) {
+//            osEvent e = opool.get(200);
+//            if (e.status == osEventTimeout) {
+//                break;
+//            } else if (e.status == osEventMail) {
+//                Operation *op = (Operation*)e.value.p;
+//                ComposedRecord record;
+//                IntegerValue operationId(op->identifier);
+//                if (record.add(msgId) && record.add(operationId) &&
+//                    record.add(operationStateValue(op->state)) && aggr.add(record)) {
+//                    printf("Reporting: <%ld, %u>.\n", op->identifier, op->state);
+//                    opool.free(op);
+//                } else {
+//                    opool.put(op);
+//                }
+//            } else {
+//                break;
+//            }
+//        }
+//        if (aggr.length()) {
+//            _client.sendAndClose(aggr);
+//            _client.stop();
+//            aggr.clear();
+//        }
+//    }
+//}
 
-    while (!_init)
-        Thread::yield();
-    
-    while (true) {
-        if (!_store.takePending(op)) {
-            Thread::yield();
-            continue;
-        }        
-//        printf("Thread 1: %l, %s, %u\r\n", op.identifier, op.descriptor, op.state);
-        puts("Updating op");
-        updateOperation(op);
-        puts("Updated op");
-        ret = _executor.executeOperation(op);
-        puts("Executed op");
-        _store.markAsDone(op, ret);
-    }
-}
-
+const char fmt[] = "POST /s HTTP/1.0\r\nHost: %s\r\nAuthorization: Basic %s\r\nX-Id: %s\r\nContent-Length: %d\r\n\r\n%s";
+const char fmt2[] = "111,%ld,%s\r\n";
 void OperationSupport::thread2()
 {
-    OperationStore::Operation op;
-    Aggregator aggr(true);
-
-    while (!_init)
-        Thread::yield();
-    
+    aInfo("Report thread: %p\n", Thread::gettid());
     while (true) {
-        while ((!aggr.full()) && (_store.takeDone(op))) {
-            ComposedRecord record;
-    
-//            printf("Thread 2: %l, %s, %u\r\n", op.identifier, op.descriptor, op.state);
-            IntegerValue msgId(111);
-            IntegerValue operationId(op.identifier);
-            if ((!record.add(msgId)) || (!record.add(operationId)) ||
-                (!record.add(operationStateValue(op))))
-                break;
-            
-            if (!aggr.add(record))
-                break;
+        osEvent e = opool.get();
+        if (e.status == osEventMail) {
+            Operation *op = (Operation*)e.value.p;
+            aDebug("Report: <%ld, %u>.\n", op->identifier, op->state);
+            int len = snprintf(buf2, sizeof(buf2), fmt2, op->identifier, getOperationStateChar(op->state));
+            opool.free(op);
+            int ret = sock.connect(getHost(), getPort());
+            for (uint8_t i = 1; ret < 0 && i < 3; ++i) {
+                ret = sock.connect(getHost(), getPort());
+            }
+            if (ret >= 0) {
+                int l = snprintf(buf, sizeof(buf), fmt, getHost(), getAuthStr(), getIdentifier(), len, buf2);
+                ret = sock.send(buf, l);
+                if (ret < 0) {
+                    aError("Report: Send!\n");
+                }
+                sock.close();
+            } else {
+                aError("Report: Connect!\n");
+            }
         }
-        
-        if (aggr.length() == 0) {
-            Thread::yield();
-            continue;
-        }
-        
-        puts("Sending aggr");
-        if (_client.send(aggr) != SMARTREST_SUCCESS) { }
-        _client.stop();
-        aggr.clear();        
     }
 }
 
@@ -239,12 +248,12 @@
     char bayeuxId[33];
     ComposedRecord record;
     ParsedRecord received;
-    OperationStore::Operation op;
-    
-    while ((!_init) || (_firstRun))
+
+    while (!_init || _firstRun)
         Thread::yield();
-    
+
     // request Bayeux ID
+    aInfo("Polling thread: %p\n", Thread::gettid());
     {
         const char *str;
         IntegerValue msgId(80);
@@ -259,80 +268,98 @@
         _client.stop();
 
         str = received.value(0).characterValue();
-        if ((str == NULL) || (strlen(str) > sizeof(bayeuxId)))
+        if (str == NULL || strlen(str) > sizeof(bayeuxId))
             return;
 
         strcpy(bayeuxId, str);
         record.clear();
+        aInfo("Polling: bayeux ID %s\n", str);
     }
-    
+
     // set channel
     {
         char chn[16];
         int len = 0;
         snprintf(chn, sizeof(chn), "/%ld%n", _deviceId, &len);
-        if ((len == 0) || (len == sizeof(chn)))
+        if (len == 0 || len == sizeof(chn)) {
+            aError("Polling: Invalid id %ld.\n", _deviceId);
             return;
-        
+        }
+
         IntegerValue msgId(81);
         CharValue bid(bayeuxId);
         CharValue channel(chn);
-        if ((!record.add(msgId)) || (!record.add(bid)) || (!record.add(channel)))
+        if (!record.add(msgId) || !record.add(bid) || !record.add(channel)) {
             return;
+        }
         
         if ((_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) ||
             (_client.receive(received) != SMARTREST_END_OF_RESPONSE)) {
+            aError("Polling: Subscribe channel %s.\n", chn);
             _client.stop();
             return;
         }
-
         _client.stop();
         record.clear();
+        aInfo("Polling: Subscribed channel %s!\n", chn);
     }
     
-    {
-        IntegerValue msgId(83);
-        CharValue bid(bayeuxId);
-        if ((!record.add(msgId)) || (!record.add(bid)))
-            return;
+    if (!record.add(IntegerValue(83)) || !record.add(CharValue(bayeuxId))) {
+        return;
     }
 
     while (true) {
         if (_client.stream("/devicecontrol/notifications", record) != SMARTREST_SUCCESS) {
+            aError("Polling: Connect.\n");
             _client.stop();
             continue;
         }
-
-        puts("Receiving op");
         while (_client.receive(received) == SMARTREST_SUCCESS) {
-            if (!operationFromRecord(received, op))
+            for (size_t i = 0; i < received.values(); ++i) {
+                printf("%s ", received.rawValue(i));
+            }
+            puts("");
+            if (received.values() == 0) {
                 continue;
-            puts("Parsed record");
-//            printf("Thread 3: %l, %s, %u\r\n", op.identifier, op.descriptor, op.state);
-            _store.enqueue(op);
-            puts("Enqueued op");
-//            _store.takePending(op);
-//            puts("Updating op");
-//            updateOperation(op);
-//            puts("Updated op");
-//            bool ret = _executor.executeOperation(op);
-//            puts("Executed op");
-//            _store.markAsDone(op, ret);
-//            puts("Marked");
+            } else if (received.value(0).integerValue() == 86) { // advice responses
+                if (received.values() < 5) {
+                    aWarning("Incomplete advice, %u of 5 values received.\n", received.values());
+                    continue;
+                } else if (strcmp(received.value(4).characterValue(), "retry") == 0) {
+                    aDebug("Carry out 'retry' policy.\n");
+                    break;
+                } else if (strcmp(received.value(4).characterValue(), "handshake") == 0) {
+                }
+            } else if (received.value(0).integerValue() == 211) { // Operation State Message
+                Operation* op = opool.alloc();
+                if (op == NULL) {
+                    aCritical("opool full!\n");
+                } else if (operationFromRecord(received, *op)) {
+                    op->state = OPERATION_EXECUTING;
+                    opool.put(op);
+                } else {
+                    opool.free(op);
+                }
+            } else if (received.value(0).integerValue() >= 220 &&
+                       received.value(0).integerValue() <= 222) { // Real Operation
+                if (received.values() >= 3 &&
+                    received.value(2).valueType() == VALUE_INTEGER) {
+                    Operation* op = opool.alloc();
+                    op->identifier = received.value(2).integerValue();
+                    bool ret = _executor.executeOperation(received);
+                    op->state = ret ? OPERATION_SUCCESSFUL : OPERATION_FAILED;
+                    opool.put(op);
+                } else {
+                    aWarning("Incomplete operation, %u of 4 values received.\n", received.values());
+                }
+            } else {
+                aError("Unknown operation ID: %d\n", received.value(0).integerValue());
+            }
         }
-
-        //TODO: error checking
         _client.stop();
     }
 }
 
-void OperationSupport::thread1_func(void const *arg)
-{
-    OperationSupport *that;
-    that = (OperationSupport*)arg;
-    that->thread1();
-}
-
 void OperationSupport::thread2_func(void const *arg)
 {
     OperationSupport *that;