Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
simple-mbed-cloud-client/TESTS/host_tests/sdk_host_tests.py
- Committer:
- leothedragon
- Date:
- 2021-05-04
- Revision:
- 0:8f0bb79ddd48
File content as of revision 0:8f0bb79ddd48:
## ----------------------------------------------------------------------------
## Copyright 2016-2018 ARM Ltd.
##
## SPDX-License-Identifier: Apache-2.0
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
##     http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ----------------------------------------------------------------------------

from mbed_host_tests import BaseHostTest
from mbed_host_tests.host_tests_logger import HtrunLogger
from mbed_cloud.device_directory import DeviceDirectoryAPI
from mbed_cloud.connect import ConnectAPI
import os
import time
import subprocess
import re
import signal

DEFAULT_CYCLE_PERIOD = 1.0


class SDKTests(BaseHostTest):
    """Host-side counterpart of the device SDK tests.

    The class registers one callback per event key (see ``setup``) and
    answers each event by sending key/value pairs back with ``send_safe``.
    Registration and LWM2M checks go through ``DeviceDirectoryAPI`` and
    ``ConnectAPI``; the firmware update test spawns an ``mbed dm update
    device`` sub-process.
    """

    __result = None
    deviceApi = None        # DeviceDirectoryAPI instance, created in setup()
    connectApi = None       # ConnectAPI instance, created in setup()
    deviceID = None         # value received with the 'verify_registration' event
    post_timeout = None     # set to 1 when the POST request did not complete in time
    firmware_proc = None    # Popen handle of the running 'mbed dm' sub-process
    firmware_sent = False   # True once the update campaign has been started
    firmware_file = None    # path of the temporary, modified update image
    iteration = 0           # test sequence number sent to the device
    boot_cycles = 0         # number of 'device_ready' events seen so far

    # How many times send_safe() repeats each key/value pair.
    _SEND_REPEAT = 5
    # 'iteration' is no longer sent after this many 'device_ready' events.
    _MAX_BOOT_CYCLES = 5
    # Seconds between two checks of a pending asynchronous cloud request.
    _POLL_INTERVAL = 0.1
    # Seconds to wait for a single asynchronous cloud request.
    _REQUEST_TIMEOUT = 30.0
    # The GET test waits 5 s, re-issues the request and waits another 25 s.
    _GET_FIRST_TRY_TIMEOUT = 5.0
    _GET_RETRY_TIMEOUT = 25.0

    def send_safe(self, key, value):
        """Send the same key/value pair to the device several times.

        NOTE(review): the repetition presumably guards against messages lost
        on the host/device link - confirm with the device-side test code.
        """
        for _ in range(self._SEND_REPEAT):
            self.send_kv(key, value)

    def _wait_until_done(self, async_response, timeout):
        """Poll ``async_response.is_done`` for at most ``timeout`` seconds.

        Returns the final value of ``async_response.is_done``.
        """
        deadline = time.time() + timeout
        while not async_response.is_done and time.time() < deadline:
            time.sleep(self._POLL_INTERVAL)
        return async_response.is_done

    def _callback_device_booted(self, key, value, timestamp):
        # This is used to let the device boot normally
        self.send_safe('__sync', 0)

    def _callback_device_ready(self, key, value, timestamp):
        # Send device iteration number after a reset
        self.boot_cycles += 1
        # Prevent boot loop due to Mbed OS crash
        if self.boot_cycles <= self._MAX_BOOT_CYCLES:
            self.send_safe('iteration', self.iteration)

    def _callback_test_advance(self, key, value, timestamp):
        # Advance test sequence
        self.iteration = self.iteration + 1
        self.send_safe('reset', 0)

    def _callback_test_failed(self, key, value, timestamp):
        # Test failed. End it.
        self.notify_complete(False)

    #
    # Device Register routines
    #

    def _callback_verify_registration(self, key, value, timestamp):
        """Report to the device whether it is registered in the directory.

        ``value`` is stored as the device ID used by all later callbacks.
        """
        try:
            # Set value for later use
            self.deviceID = value

            # Check if device is in Mbed Cloud Device Directory
            device = self.deviceApi.get_device(value)

            # Send registration status to device
            self.send_safe("registered", 1 if device.state == "registered" else 0)
        except Exception:
            # SDK throws an exception if the device is not found (unsuccessful
            # registration) or times out. Catch Exception rather than using a
            # bare except so Ctrl-C / SystemExit still stop the test run.
            self.send_safe("registered", 0)

    def _callback_verify_identity(self, key, value, timestamp):
        # Send true if old DeviceID is the same as current device is
        self.send_safe("verified", 1 if self.deviceID == value else 0)

    #
    # Device Connect routines
    #

    def _callback_verify_lwm2m_get(self, key, value, timestamp):
        """Read resource ``value`` from the device and send the result back.

        Sends "get_value" with the resource value, or "timeout" if neither
        the first request nor the retry completed in time.
        """
        # Get resource value from device
        async_response = self.connectApi.get_resource_value_async(self.deviceID, value)

        if not self._wait_until_done(async_response, self._GET_FIRST_TRY_TIMEOUT):
            # Kick the REST API: issue the request again and wait for the
            # remainder of the 30 second budget.
            async_response = self.connectApi.get_resource_value_async(self.deviceID, value)
            self._wait_until_done(async_response, self._GET_RETRY_TIMEOUT)

        if async_response.is_done:
            # Send resource value back to device
            self.send_safe("get_value", async_response.value)
        else:
            # Request timed out.
            self.send_safe("timeout", 0)

    def _callback_verify_lwm2m_set(self, key, value, timestamp):
        """Read resource ``value`` from the device and send it as "set_value".

        Sends "timeout" if the request did not complete within 30 seconds.
        """
        # Get resource value from device
        async_response = self.connectApi.get_resource_value_async(self.deviceID, value)

        if self._wait_until_done(async_response, self._REQUEST_TIMEOUT):
            # Send resource value back to device
            self.send_safe("set_value", async_response.value)
        else:
            # Request timed out.
            self.send_safe("timeout", 0)

    def _callback_verify_lwm2m_put(self, key, value, timestamp):
        """Read resource ``value``, add 5 and write the result back.

        Sends "res_set" with the new value on success, or "timeout" if either
        the read or the write did not complete within 30 seconds.
        """
        # Get resource value from device and increment it
        resource_value = self.connectApi.get_resource_value_async(self.deviceID, value)
        if not self._wait_until_done(resource_value, self._REQUEST_TIMEOUT):
            self.send_safe("timeout", 0)
            return

        updated_value = int(resource_value.value) + 5

        # Set new resource value from cloud. The write gets its own full
        # timeout; the time already spent on the read is not deducted.
        async_response = self.connectApi.set_resource_value_async(self.deviceID, value, updated_value)
        if not self._wait_until_done(async_response, self._REQUEST_TIMEOUT):
            self.send_safe("timeout", 0)
        else:
            # Send new resource value to device for verification.
            self.send_safe("res_set", updated_value)

    def _callback_verify_lwm2m_post(self, key, value, timestamp):
        """Execute resource ``value`` on the device.

        On timeout, sends "timeout" and records it in ``post_timeout`` so the
        later result callback stays silent.
        """
        # Execute POST function on device
        resource_value = self.connectApi.execute_resource_async(self.deviceID, value)

        if not self._wait_until_done(resource_value, self._REQUEST_TIMEOUT):
            self.send_safe("timeout", 0)
            self.post_timeout = 1

    def _callback_verify_lwm2m_post_result(self, key, value, timestamp):
        # Called from callback function on device, POST function working as expected.
        # If post_timeout is not none, the request took longer than 30 seconds, which is
        # a failure. Don't send this value.
        if not self.post_timeout:
            self.send_safe("post_test_executed", 0)

    #
    # Device Firmware update routines
    #

    def firmware_campaign_cleanup(self):
        """Stop the 'mbed dm' sub-process and delete the temporary image.

        Best effort: failures are logged instead of raised, so the method is
        safe to call from ``teardown`` and from error paths.
        """
        proc = self.firmware_proc
        if proc:
            try:
                if os.name == 'nt':
                    os.kill(proc.pid, signal.CTRL_C_EVENT)
                    os.kill(proc.pid, signal.CTRL_BREAK_EVENT)
                proc.terminate()
            except OSError as e:
                # Signalling fails if the process has already exited by itself.
                self.logger.prn_inf('Firmware campaign process could not be signalled: %s' % e)
            proc.communicate()
            self.logger.prn_inf('Firmware campaign process killed: PID %s' % proc.pid)
            self.firmware_proc = None
            time.sleep(1)  # let the manifest-tool sub-process die gracefully

        if self.firmware_file:
            try:
                os.remove(self.firmware_file)
            except OSError as e:
                # Keep firmware_file set so that a later cleanup can retry.
                self.logger.prn_inf('Unable to remove "%s": %s' % (self.firmware_file, e))
            else:
                self.firmware_file = None

    def _callback_firmware_ready(self, key, value, timestamp):
        if self.firmware_sent:
            # Firmware was sent, but wasn't applied if this callback is called
            self.firmware_campaign_cleanup()
            self.notify_complete(False)
        else:
            # Send device iteration number after a reset
            self.send_safe('iteration', self.iteration)

    def _callback_firmware_prepare(self, key, value, timestamp):
        """Create a modified update image and start the update campaign.

        Returns -1 (after ``notify_complete(False)``) when the device ID or
        the update image is missing, the modified image cannot be written,
        or the 'mbed dm' sub-process cannot be started.
        """
        if not self.deviceID:
            self.logger.prn_err("ERROR: No DeviceID")
            self.notify_complete(False)
            return -1

        target = self.get_config_item('platform_name')
        image = self.get_config_item('image_path')
        update_image = re.sub(r'(.+)\.([a-z0-9]+)$', r'\1_update.bin', image if image else "")
        if not image or not os.path.exists(update_image):
            self.logger.prn_err("ERROR: No main or update image")
            self.notify_complete(False)
            return -1
        self.logger.prn_inf('Found FW update image: "%s"' % update_image)

        try:
            # Open the firmware update image as provided by the build system
            with open(update_image, 'rb') as f:
                raw = f.read()
            # Modify the initial "spdmc_ready_chk" sequence into "firmware_update"
            # (matching the string length) as an indication that the firmware was
            # changed/updated. The image is binary, so replace bytes with bytes.
            raw = raw.replace(b'spdmc_ready_chk', b'firmware_update')
            # Save the firmware into a temp place. Manifest tool has issues
            # handling very long paths even if -n is specified
            update_mod_image = ".%s.%s.%s" % (
                target,
                re.sub(r'.*[\\/](.+)\.([a-z0-9]+)$', r'\1_update_mod.\2', image),
                time.time())
            with open(update_mod_image, 'wb') as f:
                f.write(raw)
        except Exception:
            self.logger.prn_err("ERROR: While preparing modified image")
            self.notify_complete(False)
            return -1
        self.logger.prn_inf('Modified FW update image: "%s"' % update_mod_image)

        # Remember the temporary image before spawning anything, so that the
        # cleanup removes it even when the sub-process fails to start.
        self.firmware_file = update_mod_image

        # Use non-blocking call, but remember the process, so we can kill it later
        try:
            spargs = dict()
            if os.name == 'posix':
                spargs['preexec_fn'] = os.setpgrp
            elif os.name == 'nt':
                spargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
            self.firmware_proc = subprocess.Popen(
                ["mbed", "dm", "update", "device", "-p", update_mod_image, "-D", self.deviceID],
                stderr=subprocess.STDOUT, **spargs)
        except Exception:
            self.logger.prn_err("ERROR: Unable to execute 'mbed dm' sub-command")
            self.firmware_campaign_cleanup()
            self.notify_complete(False)
            return -1

        # At this point the firmware should be on its way to the device
        self.firmware_sent = True
        self.send_safe('firmware_sent', 1)
        self.logger.prn_inf("Firmware sent and update campaign started. Check for download progress.")

    def _callback_firmware_update(self, key, value, timestamp):
        self.logger.prn_inf("Firmware successfully updated!")
        self.firmware_campaign_cleanup()

        self.iteration = self.iteration + 1
        self.send_safe('iteration', self.iteration)

    #
    # Host setup routines
    #

    def setup(self):
        """Register all event callbacks and create the cloud API clients.

        Returns -1 when the CLOUD_SDK_API_KEY cannot be read from
        ``mbed config --list``.
        """
        # Generic test routines
        self.register_callback('device_booted', self._callback_device_booted)
        self.register_callback('device_ready', self._callback_device_ready)
        self.register_callback('test_advance', self._callback_test_advance)
        self.register_callback('test_failed', self._callback_test_failed)

        # Callbacks from device registration tests
        self.register_callback('verify_registration', self._callback_verify_registration)
        self.register_callback('verify_identity', self._callback_verify_identity)

        # Callbacks from LWM2M tests
        self.register_callback('verify_lwm2m_get_test', self._callback_verify_lwm2m_get)
        self.register_callback('verify_lwm2m_set_test', self._callback_verify_lwm2m_set)
        self.register_callback('verify_lwm2m_put_test', self._callback_verify_lwm2m_put)
        self.register_callback('verify_lwm2m_post_test', self._callback_verify_lwm2m_post)
        self.register_callback('verify_lwm2m_post_test_result', self._callback_verify_lwm2m_post_result)

        # Callbacks from FW update tests
        self.register_callback('spdmc_ready_chk', self._callback_firmware_ready)
        self.register_callback('firmware_prepare', self._callback_firmware_prepare)
        self.register_callback('firmware_update', self._callback_firmware_update)

        # Setup API config
        try:
            # universal_newlines=True yields text (not bytes) on Python 3 and
            # normalises line endings, so the str regex below can match.
            result = subprocess.check_output(["mbed", "config", "--list"],
                                             stderr=subprocess.STDOUT,
                                             universal_newlines=True)
        except Exception as e:
            self.logger.prn_err("ERROR: CLOUD_SDK_API_KEY global config is not set: " + str(e))
            return -1

        match = re.search(r'CLOUD_SDK_API_KEY=(.*)\n', result)
        if match is None:
            self.logger.prn_err("ERROR: CLOUD_SDK_API_KEY global config is not set.")
            return -1

        api_key_val = match.group(1).strip()  # Get API KEY and remove LF char if included
        # Log only the tail of the key: test logs are routinely archived/shared.
        masked_key = "*" * max(len(api_key_val) - 4, 0) + api_key_val[-4:]
        self.logger.prn_inf("CLOUD_SDK_API_KEY: " + masked_key)

        api_config = {"api_key": api_key_val, "host": "https://api.us-east-1.mbedcloud.com"}

        self.iteration = 0
        self.boot_cycles = 0

        # Instantiate Device and Connect API
        self.deviceApi = DeviceDirectoryAPI(api_config)
        self.connectApi = ConnectAPI(api_config)

    def result(self):
        return self.__result

    def teardown(self):
        """Delete the test device from the directory and stop the campaign."""
        try:
            # Delete device from directory so as not to hit device allocation quota.
            if self.deviceID:
                self.deviceApi.delete_device(self.deviceID)
        finally:
            # Always stop the sub-process, even if the deletion failed.
            self.firmware_campaign_cleanup()

    def __init__(self):
        super(SDKTests, self).__init__()
        self.logger = HtrunLogger('TEST')