Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: FXAS21002 FXOS8700Q
sdk_host_tests.py
## ----------------------------------------------------------------------------
## Copyright 2016-2018 ARM Ltd.
##
## SPDX-License-Identifier: Apache-2.0
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
##     http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ----------------------------------------------------------------------------

from mbed_host_tests import BaseHostTest
from mbed_host_tests.host_tests_logger import HtrunLogger
from mbed_cloud.device_directory import DeviceDirectoryAPI
from mbed_cloud.connect import ConnectAPI
import os
import time
import subprocess
import re
import signal

DEFAULT_CYCLE_PERIOD = 1.0

# Seconds slept between two polls of an asynchronous Mbed Cloud response.
ASYNC_POLL_INTERVAL = 0.1


class SDKTests(BaseHostTest):
    """Host-side counterpart of the device SDK tests.

    The device sends key/value events; each event registered in ``setup`` is
    answered by one of the ``_callback_*`` methods below, which query the
    Mbed Cloud APIs and/or send key/value replies back with ``send_safe``.
    """

    __result = None
    deviceApi = None
    connectApi = None
    deviceID = None
    post_timeout = None
    firmware_proc = None
    firmware_sent = False
    firmware_file = None
    iteration = 0
    boot_cycles = 0

    def __init__(self):
        super(SDKTests, self).__init__()
        self.logger = HtrunLogger('TEST')

    def send_safe(self, key, value):
        """Send the same key/value pair to the device five times in a row.

        NOTE(review): the repetition is presumably there so that at least one
        copy survives an unreliable serial link -- confirm with the device
        side before changing the count.
        """
        for _ in range(5):
            self.send_kv(key, value)

    def _wait_until_done(self, async_response, max_ticks):
        """Poll ``async_response.is_done`` until it is set or the budget ends.

        Sleeps ASYNC_POLL_INTERVAL seconds between polls and gives up once
        more than ``max_ticks`` sleeps have elapsed (300 ticks is roughly
        30 seconds).

        Returns the final value of ``async_response.is_done``.
        """
        ticks = 0
        while not async_response.is_done and ticks <= max_ticks:
            time.sleep(ASYNC_POLL_INTERVAL)
            ticks += 1
        return async_response.is_done

    def _callback_device_booted(self, key, value, timestamp):
        # This is used to let the device boot normally
        self.send_safe('__sync', 0)

    def _callback_device_ready(self, key, value, timestamp):
        # Send device iteration number after a reset
        self.boot_cycles += 1
        # Prevent boot loop due to Mbed OS crash: stop answering after 5 boots
        if self.boot_cycles <= 5:
            self.send_safe('iteration', self.iteration)

    def _callback_test_advance(self, key, value, timestamp):
        # Advance test sequence and ask the device to reset
        self.iteration = self.iteration + 1
        self.send_safe('reset', 0)

    def _callback_test_failed(self, key, value, timestamp):
        # Test failed. End it.
        self.notify_complete(False)

    # ------------------------------------------------------------------
    # Device Register routines
    # ------------------------------------------------------------------
    def _callback_verify_registration(self, key, value, timestamp):
        """Report to the device whether ``value`` is a registered device ID."""
        try:
            # Remember the device ID for the later Connect / update tests
            self.deviceID = value

            # Check if device is in Mbed Cloud Device Directory
            device = self.deviceApi.get_device(value)

            # Send registration status to device
            self.send_safe("registered", 1 if device.state == "registered" else 0)
        except Exception:
            # SDK throws an exception if the device is not found
            # (unsuccessful registration) or times out
            self.send_safe("registered", 0)

    def _callback_verify_identity(self, key, value, timestamp):
        # Send true if the previously stored device ID equals the current one
        self.send_safe("verified", 1 if self.deviceID == value else 0)

    # ------------------------------------------------------------------
    # Device Connect routines
    # ------------------------------------------------------------------
    def _callback_verify_lwm2m_get(self, key, value, timestamp):
        """Read resource ``value`` through the cloud and send it to the device.

        Waits about 5 seconds first; if there is no answer the request is
        issued once more and given about 25 further seconds.
        """
        async_response = self.connectApi.get_resource_value_async(self.deviceID, value)

        if not self._wait_until_done(async_response, 50):
            # Kick the REST API: re-issue the request and wait for the rest
            # of the ~30 second budget
            async_response = self.connectApi.get_resource_value_async(self.deviceID, value)
            self._wait_until_done(async_response, 250)

        if async_response.is_done:
            # Send resource value back to device
            self.send_safe("get_value", async_response.value)
        else:
            # Request timed out.
            self.send_safe("timeout", 0)

    def _callback_verify_lwm2m_set(self, key, value, timestamp):
        """Read resource ``value`` through the cloud and send it as ``set_value``."""
        # Get resource value from device
        async_response = self.connectApi.get_resource_value_async(self.deviceID, value)

        # About 30 seconds
        if self._wait_until_done(async_response, 300):
            # Send resource value back to device
            self.send_safe("set_value", async_response.value)
        else:
            # Request timed out.
            self.send_safe("timeout", 0)

    def _callback_verify_lwm2m_put(self, key, value, timestamp):
        """Read resource ``value``, write back the value plus 5, tell the device.

        The read and the write each get their own ~30 second budget.
        """
        # Get resource value from device
        resource_value = self.connectApi.get_resource_value_async(self.deviceID, value)
        if not self._wait_until_done(resource_value, 300):
            self.send_safe("timeout", 0)
            return

        updated_value = int(resource_value.value) + 5

        # Set new resource value from cloud
        async_response = self.connectApi.set_resource_value_async(self.deviceID, value, updated_value)
        if not self._wait_until_done(async_response, 300):
            self.send_safe("timeout", 0)
        else:
            # Send new resource value to device for verification.
            self.send_safe("res_set", updated_value)

    def _callback_verify_lwm2m_post(self, key, value, timestamp):
        """Execute resource ``value`` on the device through the cloud.

        Nothing is sent on success; on timeout the device is told and
        ``post_timeout`` is set so the later result callback stays silent.
        """
        # Execute POST function on device
        resource_value = self.connectApi.execute_resource_async(self.deviceID, value)

        # About 30 seconds
        if not self._wait_until_done(resource_value, 300):
            self.send_safe("timeout", 0)
            self.post_timeout = 1

    def _callback_verify_lwm2m_post_result(self, key, value, timestamp):
        # Called from callback function on device, POST function working as
        # expected. If post_timeout is set, the request took longer than
        # 30 seconds, which is a failure. Don't send this value.
        if not self.post_timeout:
            self.send_safe("post_test_executed", 0)

    # ------------------------------------------------------------------
    # Device Firmware update routines
    # ------------------------------------------------------------------
    def firmware_campaign_cleanup(self):
        """Stop the 'mbed dm update' sub-process and remove the temp image.

        Safe to call repeatedly: both steps are skipped when there is nothing
        left to clean up. Removing the file is best effort.
        """
        if self.firmware_proc:
            pid = self.firmware_proc.pid
            if os.name == 'nt':
                os.kill(pid, signal.CTRL_C_EVENT)
                os.kill(pid, signal.CTRL_BREAK_EVENT)
            self.firmware_proc.terminate()
            self.firmware_proc.communicate()
            self.logger.prn_inf('Firmware campaign process killed: PID %s' % pid)
            self.firmware_proc = None

        if self.firmware_file:
            time.sleep(1)  # let the manifest-tool sub-process die gracefully
            try:
                os.remove(self.firmware_file)
            except OSError as e:
                # Best effort only: a leftover temp file must not fail the test
                self.logger.prn_inf('Could not remove "%s": %s' % (self.firmware_file, e))
            self.firmware_file = None

    def _callback_firmware_ready(self, key, value, timestamp):
        if self.firmware_sent:
            # Firmware was sent, but wasn't applied if this callback is called
            self.firmware_campaign_cleanup()
            self.notify_complete(False)
        else:
            # Send device iteration number after a reset
            self.send_safe('iteration', self.iteration)

    def _callback_firmware_prepare(self, key, value, timestamp):
        """Build a modified update image and start the update campaign.

        Returns -1 (after notifying failure) when the device ID or the images
        are missing, the modified image cannot be written, or 'mbed dm'
        cannot be started; returns None otherwise.
        """
        if not self.deviceID:
            self.logger.prn_err("ERROR: No DeviceID")
            self.notify_complete(False)
            return -1

        target = self.get_config_item('platform_name')
        image = self.get_config_item('image_path')
        update_image = re.sub(r'(.+)\.([a-z0-9]+)$', r'\1_update.bin', image if image else "")
        if not image or not os.path.exists(update_image):
            self.logger.prn_err("ERROR: No main or update image")
            self.notify_complete(False)
            return -1
        self.logger.prn_inf('Found FW update image: "%s"' % update_image)

        try:
            # Open the firmware update image as provided by the build system
            with open(update_image, 'rb') as f:
                raw = f.read()
            # Modify the initial "spdmc_ready_chk" sequence into
            # "firmware_update" (matching the string length) as an indication
            # that the firmware was changed/updated. The image is bytes, so
            # use a bytes replacement rather than a str regex.
            raw = raw.replace(b'spdmc_ready_chk', b'firmware_update')

            # Save the firmware into a temp place. Manifest tool has issues
            # handling very long paths even if -n is specified
            update_mod_image = ".%s.%s.%s" % (target, re.sub(r'.*[\\/](.+)\.([a-z0-9]+)$', r'\1_update_mod.\2', image), time.time())
            with open(update_mod_image, 'wb') as f:
                f.write(raw)
        except Exception as e:
            self.logger.prn_err("ERROR: While preparing modified image: " + str(e))
            self.notify_complete(False)
            return -1
        self.logger.prn_inf('Modified FW update image: "%s"' % update_mod_image)

        # Record the temp file before launching, so that the cleanup below
        # also removes it when the launch itself fails.
        self.firmware_file = update_mod_image

        # Use non-blocking call, but remember the process, so we can kill it later
        try:
            spargs = dict()
            if os.name == 'posix':
                spargs['preexec_fn'] = os.setpgrp
            elif os.name == 'nt':
                spargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
            self.firmware_proc = subprocess.Popen(["mbed", "dm", "update", "device", "-p", update_mod_image, "-D", self.deviceID], stderr=subprocess.STDOUT, **spargs)
        except Exception as e:
            self.logger.prn_err("ERROR: Unable to execute 'mbed dm' sub-command: " + str(e))
            self.firmware_campaign_cleanup()
            self.notify_complete(False)
            return -1

        # At this point the firmware should be on its way to the device
        self.firmware_sent = True
        self.send_safe('firmware_sent', 1)
        self.logger.prn_inf("Firmware sent and update campaign started. Check for download progress.")

    def _callback_firmware_update(self, key, value, timestamp):
        self.logger.prn_inf("Firmware successfully updated!")
        self.firmware_campaign_cleanup()
        self.iteration = self.iteration + 1
        self.send_safe('iteration', self.iteration)

    # ------------------------------------------------------------------
    # Host setup routines
    # ------------------------------------------------------------------
    def setup(self):
        """Register all device event callbacks and create the cloud API clients.

        The API key is read from the output of 'mbed config --list'.
        Returns -1 when the command fails or no CLOUD_SDK_API_KEY entry is
        found; returns None otherwise.
        """
        # Generic test routines
        self.register_callback('device_booted', self._callback_device_booted)
        self.register_callback('device_ready', self._callback_device_ready)
        self.register_callback('test_advance', self._callback_test_advance)
        self.register_callback('test_failed', self._callback_test_failed)

        # Callbacks from device registration tests
        self.register_callback('verify_registration', self._callback_verify_registration)
        self.register_callback('verify_identity', self._callback_verify_identity)

        # Callbacks from LWM2M tests
        self.register_callback('verify_lwm2m_get_test', self._callback_verify_lwm2m_get)
        self.register_callback('verify_lwm2m_set_test', self._callback_verify_lwm2m_set)
        self.register_callback('verify_lwm2m_put_test', self._callback_verify_lwm2m_put)
        self.register_callback('verify_lwm2m_post_test', self._callback_verify_lwm2m_post)
        self.register_callback('verify_lwm2m_post_test_result', self._callback_verify_lwm2m_post_result)

        # Callbacks from FW update tests
        self.register_callback('spdmc_ready_chk', self._callback_firmware_ready)
        self.register_callback('firmware_prepare', self._callback_firmware_prepare)
        self.register_callback('firmware_update', self._callback_firmware_update)

        # Setup API config
        try:
            result = subprocess.check_output(["mbed", "config", "--list"], stderr=subprocess.STDOUT)
        except Exception as e:
            self.logger.prn_err("ERROR: CLOUD_SDK_API_KEY global config is not set: " + str(e))
            return -1

        # check_output returns bytes on Python 3; the str regex below needs text
        if not isinstance(result, str):
            result = result.decode('utf-8', 'replace')

        # '.' does not match a newline, so this captures up to the end of the
        # line whether or not the output ends with a line feed
        match = re.search(r'CLOUD_SDK_API_KEY=(.*)', result)
        if match is None:
            self.logger.prn_err("ERROR: CLOUD_SDK_API_KEY global config is not set.")
            return -1

        # Get API KEY and remove CR/LF chars if included
        api_key_val = match.group(1).strip()

        # NOTE(review): this writes the API key to the test log in clear
        # text -- consider masking it.
        self.logger.prn_inf("CLOUD_SDK_API_KEY: " + api_key_val)

        api_config = {"api_key": api_key_val, "host": "https://api.us-east-1.mbedcloud.com"}

        self.iteration = 0
        self.boot_cycles = 0

        # Instantiate Device and Connect API
        self.deviceApi = DeviceDirectoryAPI(api_config)
        self.connectApi = ConnectAPI(api_config)

    def result(self):
        return self.__result

    def teardown(self):
        """Delete the test device from the directory and stop any campaign."""
        try:
            # Delete device from directory so as not to hit device allocation quota.
            if self.deviceID and self.deviceApi:
                self.deviceApi.delete_device(self.deviceID)
        finally:
            # Always release the sub-process / temp file, even if the
            # directory call above raised
            self.firmware_campaign_cleanup()
Generated on Tue Jul 12 2022 20:21:02 by
