Revision:
0:25fa8795676b
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/simple-mbed-cloud-client/TESTS/host_tests/sdk_host_tests.py	Sun Apr 18 15:20:23 2021 +0000
@@ -0,0 +1,349 @@
+## ----------------------------------------------------------------------------
+## Copyright 2016-2018 ARM Ltd.
+##
+## SPDX-License-Identifier: Apache-2.0
+##
+## Licensed under the Apache License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+##     http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ----------------------------------------------------------------------------
+
+from mbed_host_tests import BaseHostTest
+from mbed_host_tests.host_tests_logger import HtrunLogger
+from mbed_cloud.device_directory import DeviceDirectoryAPI
+from mbed_cloud.connect import ConnectAPI
+import os
+import time
+import subprocess
+import re
+import signal
+
+DEFAULT_CYCLE_PERIOD = 1.0
+
+class SDKTests(BaseHostTest):
+    __result = None
+    deviceApi = None
+    connectApi = None
+    deviceID = None
+    post_timeout = None
+    firmware_proc = None
+    firmware_sent = False
+    firmware_file = None
+    iteration = 0
+    boot_cycles = 0
+
+    def send_safe(self, key, value):
+        #self.send_kv('dummy_start', 0)
+        self.send_kv(key, value)
+        self.send_kv(key, value)
+        self.send_kv(key, value)
+        self.send_kv(key, value)
+        self.send_kv(key, value)
+        #self.send_kv('dummy_end', 1)
+
+    def _callback_device_booted(self, key, value, timestamp): 
+        # This is used to let the device boot normally
+        self.send_safe('__sync', 0)
+
+    def _callback_device_ready(self, key, value, timestamp):
+        # Send device iteration number after a reset
+        self.boot_cycles += 1
+        # Prevent boot loop due to Mbed OS crash
+        if self.boot_cycles <= 5:
+            self.send_safe('iteration', self.iteration)
+
+    def _callback_test_advance(self, key, value, timestamp):
+        # Advance test sequence
+        self.iteration = self.iteration + 1
+        self.send_safe('reset', 0)
+
+    def _callback_test_failed(self, key, value, timestamp):
+        # Test failed. End it.
+        self.notify_complete(False)
+
+    """
+    Device Register routines
+    """
+    def _callback_verify_registration(self, key, value, timestamp):
+        try:
+            #set value for later use
+            self.deviceID = value
+
+            # Check if device is in Mbed Cloud Device Directory
+            device = self.deviceApi.get_device(value)
+
+            # Send registraton status to device
+            self.send_safe("registered", 1 if device.state == "registered" else 0)
+        except:
+            # SDK throws an exception if the device is not found (unsuccessful registration) or times out
+            self.send_safe("registered", 0)
+
+    def _callback_verify_identity(self, key, value, timestamp):
+        # Send true if old DeviceID is the same as current device is
+        self.send_safe("verified", 1 if self.deviceID == value else 0)
+
+    """
+    Device Connect routines
+    """
+    def _callback_verify_lwm2m_get(self, key, value, timestamp):
+        timeout = 0
+
+        # Get resource value from device
+        async_response = self.connectApi.get_resource_value_async(self.deviceID, value)
+
+        # Set a 30 second timeout here.
+        while not async_response.is_done and timeout <= 50:
+            time.sleep(0.1)
+            timeout += 1
+
+        if not async_response.is_done:
+            # Kick the REST API
+            timeout = 0
+            async_response = self.connectApi.get_resource_value_async(self.deviceID, value)
+            while not async_response.is_done and timeout <= 250:
+                time.sleep(0.1)
+                timeout += 1
+
+        if async_response.is_done:
+            # Send resource value back to device
+            self.send_safe("get_value", async_response.value)
+        else:
+            # Request timed out.
+            self.send_safe("timeout", 0)
+
+    def _callback_verify_lwm2m_set(self, key, value, timestamp):
+        timeout = 0
+
+        # Get resource value from device
+        async_response = self.connectApi.get_resource_value_async(self.deviceID, value)
+
+        # Set a 30 second timeout here.
+        while not async_response.is_done and timeout <= 300:
+            time.sleep(0.1)
+            timeout += 1
+
+        if async_response.is_done:
+            # Send resource value back to device
+            self.send_safe("set_value", async_response.value)
+        else:
+            # Request timed out.
+            self.send_safe("timeout", 0)
+
+    def _callback_verify_lwm2m_put(self, key, value, timestamp):
+        timeout = 0
+
+        # Get resource value from device and increment it
+        resource_value = self.connectApi.get_resource_value_async(self.deviceID, value)
+
+        # Set a 30 second timeout here.
+        while not resource_value.is_done and timeout <= 300:
+            time.sleep(0.1)
+            timeout += 1
+
+        if not resource_value.is_done:
+            self.send_safe("timeout", 0)
+            return
+
+        updated_value = int(resource_value.value) + 5
+
+        # Set new resource value from cloud
+        async_response = self.connectApi.set_resource_value_async(self.deviceID, value, updated_value)
+
+        # Set a 30 second timeout here.
+        while not async_response.is_done and timeout <= 300:
+            time.sleep(0.1)
+            timeout += 1
+
+        if not async_response.is_done:
+            self.send_safe("timeout", 0)
+        else:
+            # Send new resource value to device for verification.
+            self.send_safe("res_set", updated_value);
+
+    def _callback_verify_lwm2m_post(self, key, value, timestamp):
+        timeout = 0
+
+        # Execute POST function on device
+        resource_value = self.connectApi.execute_resource_async(self.deviceID, value)
+
+        # Set a 30 second timeout here.
+        while not resource_value.is_done and timeout <= 300:
+            time.sleep(0.1)
+            timeout += 1
+
+        if not resource_value.is_done:
+            self.send_safe("timeout", 0)
+            self.post_timeout = 1
+
+    def _callback_verify_lwm2m_post_result(self, key, value, timestamp):
+
+        # Called from callback function on device, POST function working as expected.
+        # If post_timeout is not none, the request took longer than 30 seconds, which is
+        # a failure. Don't send this value.
+        if not self.post_timeout:
+            self.send_safe("post_test_executed", 0)
+
+    """
+    Device Firmware update routines
+    """
+    def firmware_campaign_cleanup(self):
+        if self.firmware_proc:
+            if os.name == 'nt':
+                os.kill(self.firmware_proc.pid, signal.CTRL_C_EVENT)
+                os.kill(self.firmware_proc.pid, signal.CTRL_BREAK_EVENT)
+            self.firmware_proc.terminate()
+            outs, errs = self.firmware_proc.communicate()
+            self.logger.prn_inf('Firmware campaign process killed: PID %s' % self.firmware_proc.pid)
+            self.firmware_proc = None
+
+            try:
+                time.sleep(1) # let the manifest-tool sub-process die gracefully
+                if self.firmware_file:
+                    os.remove(self.firmware_file)
+                    self.firmware_file = None
+            except Exception, e:
+                pass
+
+    def _callback_firmware_ready(self, key, value, timestamp):
+        if self.firmware_sent:
+            # Firmware was sent, but wasn't applied if this callback is called
+            self.firmware_campaign_cleanup()
+            self.notify_complete(False)
+        else:
+            # Send device iteration number after a reset
+            self.send_safe('iteration', self.iteration)
+
+    def _callback_firmware_prepare(self, key, value, timestamp):
+        if not self.deviceID:
+            self.logger.prn_err("ERROR: No DeviceID")
+            self.notify_complete(False)
+            return -1
+
+        target = self.get_config_item('platform_name')
+        image = self.get_config_item('image_path')
+        update_image = re.sub(r'(.+)\.([a-z0-9]+)$', r'\1_update.bin', image if image else "")
+        if not image or not os.path.exists(update_image):
+            self.logger.prn_err("ERROR: No main or update image")
+            self.notify_complete(False)
+            return -1
+        self.logger.prn_inf('Found FW update image: "%s"' % update_image)
+
+        try:
+            # Open the firmware update image as provided by the build system
+            with open(update_image, 'rb') as f:
+                raw = f.read()
+            # Modify the initial "spdmc_ready_chk" sequence into "firmware_update" 
+            # (matching the string length) as an indication that the firmware was changed/updated
+            raw = re.sub(r'spdmc_ready_chk', r'firmware_update', raw)
+
+            # Save the firmware into a temp place. Manifest tool has issues handling very long paths even if -n is specified
+            update_mod_image = ".%s.%s.%s" % (target, re.sub(r'.*[\\/](.+)\.([a-z0-9]+)$', r'\1_update_mod.\2', image), time.time())
+            with open(update_mod_image, 'wb') as f:
+                f.write(raw)
+        except Exception, e:
+            self.logger.prn_err("ERROR: While preparing modified image")
+            self.notify_complete(False)
+            return -1
+        self.logger.prn_inf('Modified FW update image: "%s"' % update_mod_image)
+
+        # Use non-blocking call, but remember the process, so we can kill it later
+        try:
+            spargs = dict()
+            if os.name == 'posix':
+                spargs['preexec_fn'] = os.setpgrp
+            elif os.name == 'nt':
+                spargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
+            self.firmware_proc = subprocess.Popen(["mbed", "dm", "update", "device", "-p", update_mod_image, "-D", self.deviceID], stderr=subprocess.STDOUT, **spargs)
+            self.firmware_file = update_mod_image
+        except Exception, e:
+            self.logger.prn_err("ERROR: Unable to execute 'mbed dm' sub-command")
+            self.firmware_campaign_cleanup()
+            self.notify_complete(False)
+            return -1
+
+        # At this point the firmware should be on it's way to the device
+        self.firmware_sent = True
+        self.send_safe('firmware_sent', 1)
+        self.logger.prn_inf("Firmware sent and update campaign started. Check for download progress.")
+
+    def _callback_firmware_update(self, key, value, timestamp):
+        self.logger.prn_inf("Firmware successfully updated!")
+        self.firmware_campaign_cleanup()
+        self.iteration = self.iteration + 1
+        self.send_safe('iteration', self.iteration)
+
+
+    """
+    Host setup routines
+    """
+    def setup(self):
+        # Generic test routines
+        self.register_callback('device_booted', self._callback_device_booted)
+        self.register_callback('device_ready', self._callback_device_ready)
+        self.register_callback('test_advance', self._callback_test_advance)
+        self.register_callback('test_failed', self._callback_test_failed)
+
+        # Callbacks from device registration tests
+        self.register_callback('verify_registration', self._callback_verify_registration)
+        self.register_callback('verify_identity', self._callback_verify_identity)
+
+        # Callbacks from LWM2M tests
+        self.register_callback('verify_lwm2m_get_test', self._callback_verify_lwm2m_get)
+        self.register_callback('verify_lwm2m_set_test', self._callback_verify_lwm2m_set)
+        self.register_callback('verify_lwm2m_put_test', self._callback_verify_lwm2m_put)
+        self.register_callback('verify_lwm2m_post_test', self._callback_verify_lwm2m_post)
+        self.register_callback('verify_lwm2m_post_test_result', self._callback_verify_lwm2m_post_result)
+
+        # Callbacks from FW update tests
+        self.register_callback('spdmc_ready_chk', self._callback_firmware_ready)
+        self.register_callback('firmware_prepare', self._callback_firmware_prepare)
+        self.register_callback('firmware_update', self._callback_firmware_update)
+
+        # Setup API config
+        try:
+            result = subprocess.check_output(["mbed", "config", "--list"], stderr=subprocess.STDOUT)
+        except Exception, e:
+            self.logger.prn_err("ERROR: CLOUD_SDK_API_KEY global config is not set: " + str(e))
+            return -1
+
+        match = re.search(r'CLOUD_SDK_API_KEY=(.*)\n', result)
+        if match == None:
+            self.logger.prn_err("ERROR: CLOUD_SDK_API_KEY global config is not set.")
+            return -1
+
+        api_key_val = match.group(1).strip()
+
+        # Get API KEY and remove LF char if included
+        self.logger.prn_inf("CLOUD_SDK_API_KEY: " + api_key_val)
+
+        api_config = {"api_key" : api_key_val, "host" : "https://api.us-east-1.mbedcloud.com"}
+
+        self.iteration = 0
+        self.boot_cycles = 0
+
+        # Instantiate Device and Connect API
+        self.deviceApi = DeviceDirectoryAPI(api_config)
+        self.connectApi = ConnectAPI(api_config)
+
+    def result(self):
+        return self.__result
+
+    def teardown(self):
+        # Delete device from directory so as not to hit device allocation quota.
+        if self.deviceID:
+            self.deviceApi.delete_device(self.deviceID)
+        self.firmware_campaign_cleanup()
+
+        pass
+
+    def __init__(self):
+        super(SDKTests, self).__init__()
+        self.logger = HtrunLogger('TEST')