Marco Zecchini
/
Example_RTOS
Rtos API example
Embed:
(wiki syntax)
Show/hide line numbers
host_test.py
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import sys

# Check if 'serial' module is installed
try:
    from serial import Serial
except ImportError as e:
    print("Error: Can't import 'serial' module: %s"% e)
    sys.exit(-1)

import os
import re
import types
from sys import stdout
from time import sleep, time
from optparse import OptionParser

import host_tests_plugins

# This is a little tricky. We need to add upper directory to path so
# we can find packages we want from the same level as other files do
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from tools.test_api import get_autodetected_MUTS_list
from tools.test_api import get_module_avail


class Mbed:
    """ Base class for a host driven test.

    Parses the command line options and wraps the target's serial port
    together with the copy / reset plugins from host_tests_plugins.
    """
    # Fallback used when -C / --program_cycle_s is not given on the command
    # line (previously float(None) raised TypeError in that case).
    # NOTE(review): value chosen for this fallback only - confirm it suits
    # the copy plugins in use.
    DEFAULT_PROGRAM_CYCLE_S = 2.0

    def __init__(self):
        """ Parse command line options and set up default serial parameters.

        Raises:
            Exception: when the serial port (-p / --port) is not provided.
        """
        parser = OptionParser()

        parser.add_option("-m", "--micro",
                          dest="micro",
                          help="The target microcontroller",
                          metavar="MICRO")

        parser.add_option("-p", "--port",
                          dest="port",
                          help="The serial port of the target mbed",
                          metavar="PORT")

        parser.add_option("-d", "--disk",
                          dest="disk",
                          help="The target disk path",
                          metavar="DISK_PATH")

        parser.add_option("-f", "--image-path",
                          dest="image_path",
                          help="Path with target's image",
                          metavar="IMAGE_PATH")

        parser.add_option("-c", "--copy",
                          dest="copy_method",
                          help="Copy method selector",
                          metavar="COPY_METHOD")

        parser.add_option("-C", "--program_cycle_s",
                          dest="program_cycle_s",
                          help="Program cycle sleep. Define how many seconds you want wait after copying bianry onto target",
                          type="float",
                          metavar="SECONDS")

        parser.add_option("-t", "--timeout",
                          dest="timeout",
                          help="Timeout",
                          metavar="TIMEOUT")

        parser.add_option("-r", "--reset",
                          dest="forced_reset_type",
                          help="Forces different type of reset")

        parser.add_option("-R", "--reset-timeout",
                          dest="forced_reset_timeout",
                          metavar="NUMBER",
                          type="int",
                          help="When forcing a reset using option -r you can set up after reset timeout in seconds")

        # Flag option: it takes no value, so no metavar is needed
        parser.add_option('--auto',
                          dest='auto_detect',
                          action="store_true",
                          help='Use mbed-ls module to detect all connected mbed devices')

        (self.options, _) = parser.parse_args()

        self.DEFAULT_RESET_TOUT = 0
        self.DEFAULT_TOUT = 10

        if self.options.port is None:
            raise Exception("The serial port of the target mbed have to be provided as command line arguments")

        # Options related to copy / reset mbed device
        self.port = self.options.port
        self.disk = self.options.disk
        # The image path may arrive wrapped in double quotes; it may also be
        # absent altogether, in which case it stays None instead of crashing
        image_path = self.options.image_path
        self.image_path = image_path.strip('"') if image_path is not None else None
        self.copy_method = self.options.copy_method
        program_cycle_s = self.options.program_cycle_s
        if program_cycle_s is None:
            program_cycle_s = self.DEFAULT_PROGRAM_CYCLE_S
        self.program_cycle_s = float(program_cycle_s)

        self.serial = None
        self.serial_baud = 9600
        self.serial_timeout = 1

        # NOTE(review): -t has no optparse type, so a user supplied timeout
        # is a string while the default is an int - confirm how it is consumed
        self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout
        print('MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk))

    def init_serial_params(self, serial_baud=9600, serial_timeout=1):
        """ Store serial port parameters.

        The stored values are the defaults init_serial() falls back to when
        it is called without explicit arguments.
        """
        self.serial_baud = serial_baud
        self.serial_timeout = serial_timeout

    def init_serial(self, serial_baud=None, serial_timeout=None):
        """ Open (or re-open) the target's serial port.

        Arguments left as None fall back to the values stored on the instance.
        Returns True when the port was opened; a false value when the port
        could not be opened or, with --auto, when the target was not detected.
        """
        # Overload serial port configuration from default to parameters' values if they are specified
        serial_baud = serial_baud if serial_baud is not None else self.serial_baud
        serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout

        if get_module_avail('mbed_lstools') and self.options.auto_detect:
            # Ensure serial port is up-to-date (try to find it 60 times)
            found = False

            for _ in range(60):
                print('Looking for %s with MBEDLS' % self.options.micro)
                muts_list = get_autodetected_MUTS_list(platform_name_filter=[self.options.micro])

                # NOTE(review): the first detected target is looked up under
                # key 1 - presumably the collection is indexed from 1; confirm
                # against get_autodetected_MUTS_list()
                if 1 in muts_list:
                    mut = muts_list[1]
                    self.port = mut['port']
                    found = True
                    break
                else:
                    sleep(3)

            if not found:
                return False

        # Clear serial port
        if self.serial:
            self.serial.close()
            self.serial = None

        # We will poll for serial to be re-mounted if it was unmounted after device reset
        result = self.pool_for_serial_init(serial_baud, serial_timeout)  # Blocking

        # Port can be opened
        if result:
            self.flush()
        return result

    def pool_for_serial_init(self, serial_baud, serial_timeout, pooling_loops=40, init_delay=0.5, loop_delay=0.25):
        """ Poll until the serial port can be opened.

        Sleeps init_delay seconds before the first attempt and loop_delay
        seconds before every following one, for at most pooling_loops attempts.
        Returns True when the port was opened, False otherwise (the last
        error is printed in that case).
        """
        # Start pessimistic so that zero attempts is not reported as success
        result = False
        last_error = None
        # This loop is used to check for serial port availability due to
        # some delays and remounting when devices are being flashed with new software.
        for i in range(pooling_loops):
            sleep(loop_delay if i else init_delay)
            try:
                self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout)
            except Exception as e:
                result = False
                last_error = "MBED: %s"% str(e)
                stdout.write('.')
                stdout.flush()
            else:
                print("...port ready!")
                result = True
                break
        if not result and last_error:
            print(last_error)
        return result

    def set_serial_timeout(self, timeout):
        """ Set the timeout property of the open serial port.

        Returns True when a port is open, None otherwise.
        """
        result = None
        if self.serial:
            self.serial.timeout = timeout
            result = True
        return result

    def serial_read(self, count=1):
        """ Read up to count bytes from the serial port.

        Returns the data read, or None when no port is open or the read failed.
        """
        result = None
        if self.serial:
            try:
                result = self.serial.read(count)
            except Exception:
                # Best effort: a failed read is reported as None. Unlike a
                # bare except this lets KeyboardInterrupt / SystemExit through.
                result = None
        return result

    def serial_readline(self, timeout=5):
        """ Read one line from the serial port, one character at a time.

        Reading stops at a newline or once timeout seconds have elapsed, so
        the returned text may be a partial line. Returns None when no port is
        open or when reading fails (the error is printed).
        """
        if not self.serial:
            # Without a port there is nothing to read; the original code hit
            # an unbound local variable here
            return None

        result = ''
        start = time()
        while (time() - start) < timeout:
            try:
                c = self.serial.read(1)
                result += c
            except Exception as e:
                print("MBED: %s"% str(e))
                return None
            if c == '\n':
                break
        return result

    def serial_write(self, write_buffer):
        """ Write write_buffer to the serial port.

        Returns whatever the port's write() returns, or None when no port is
        open or the write failed.
        """
        result = None
        if self.serial:
            try:
                result = self.serial.write(write_buffer)
            except Exception:
                # Best effort, see serial_read()
                result = None
        return result

    def reset_timeout(self, timeout):
        """ Wait timeout seconds (in one second steps) after a reset command
        was issued
        """
        for _ in range(timeout):
            sleep(1)

    def reset(self):
        """ Calls proper reset plugin to do the job.
            Please refer to host_test_plugins functionality

        Returns the value returned by the reset plugin call.
        """
        # Flush serials to get only input after reset
        self.flush()
        if self.options.forced_reset_type:
            result = host_tests_plugins.call_plugin('ResetMethod', self.options.forced_reset_type, disk=self.disk)
        else:
            result = host_tests_plugins.call_plugin('ResetMethod', 'default', serial=self.serial)
        # Give time to wait for the image loading
        reset_tout_s = self.options.forced_reset_timeout if self.options.forced_reset_timeout is not None else self.DEFAULT_RESET_TOUT
        self.reset_timeout(reset_tout_s)
        return result

    def copy_image(self, image_path=None, disk=None, copy_method=None):
        """ Copy the image to the target via copy_image_raw().

        Every argument left as None is replaced by the matching value taken
        from the command line options.
        """
        image_path = image_path if image_path is not None else self.image_path
        disk = disk if disk is not None else self.disk
        copy_method = copy_method if copy_method is not None else self.copy_method
        # Call proper copy method
        return self.copy_image_raw(image_path, disk, copy_method)

    def copy_image_raw(self, image_path=None, disk=None, copy_method=None):
        """ Copy file using the selected copy plugin.

        image_path is the binary with the target's firmware and disk is the
        destination. Returns the value returned by the copy plugin call.
        """
        # Both a missing method and 'default' are mapped onto 'shell'
        if copy_method is None or copy_method == 'default':
            copy_method = 'shell'

        result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk, program_cycle_s=self.program_cycle_s, target_mcu=self.options.micro)
        return result

    def flush(self):
        """ Flush serial port input and output buffers.

        Returns True when a port is open, False otherwise.
        """
        result = False
        if self.serial:
            self.serial.flushInput()
            self.serial.flushOutput()
            result = True
        return result


class HostTestResults:
    """ Test results set by host tests
    """
    def __init__(self):
        self.RESULT_SUCCESS = 'success'
        self.RESULT_FAILURE = 'failure'
        self.RESULT_ERROR = 'error'
        self.RESULT_IO_SERIAL = 'ioerr_serial'
        self.RESULT_NO_IMAGE = 'no_image'
        self.RESULT_IOERR_COPY = "ioerr_copy"
        self.RESULT_PASSIVE = "passive"
        self.RESULT_NOT_DETECTED = "not_detected"
        self.RESULT_MBED_ASSERT = "mbed_assert"


# NOTE(review): this import sits after HostTestResults in the original file;
# presumably the package imports this module back - confirm before moving it.
import tools.host_tests as host_tests


class Test(HostTestResults):
    """ Base class for host test's test runner
    """
    # Select default host_test supervision (replaced after autodetection)
    test_supervisor = host_tests.get_host_test("default")

    # Property printed by the target, e.g. "{name;value}}"
    _PROPERTY_RE = re.compile(r'{([\w_]+);([\w\d\+ ]+)}}')
    # Target Id in mbed specific format: '$' characters followed by hex digits
    _TARGET_ID_RE = re.compile(r'^([\$]+)([a-fA-F0-9]+)')

    def __init__(self):
        # Set up RESULT_* constants so print_result() works for this class
        # too, not only for subclasses which initialise them on their own
        HostTestResults.__init__(self)
        self.mbed = Mbed()

    def detect_test_config(self, verbose=False):
        """ Detects test case configuration

        Reads lines from the target until one contains "{start}" and collects
        every detected property into a dictionary, which is returned.

        Raises:
            IOError: when reading from the serial port fails.
        """
        result = {}
        while True:
            line = self.mbed.serial_readline()
            if line is None:
                # serial_readline() signals a failed read with None
                raise IOError("MBED: unable to read test configuration from serial port")
            if "{start}" in line:
                self.notify("HOST: Start test...")
                break

            # The last character (normally the line ending) is not matched
            payload = line[:-1]

            # Detect if this is property from TEST_ENV print
            m = self._PROPERTY_RE.search(payload)
            if m:
                # This is most likely auto-detection property
                result[m.group(1)] = m.group(2)
                if verbose:
                    self.notify("HOST: Property '%s' = '%s'"% (m.group(1), m.group(2)))
                continue

            # We can check if this is Target Id in mbed specific format
            m2 = self._TARGET_ID_RE.search(payload)
            if m2:
                if verbose:
                    target_id = m2.group(1) + m2.group(2)
                    self.notify("HOST: TargetID '%s'"% target_id)
                    self.notify(line[len(target_id):-1])
            else:
                self.notify("HOST: Unknown property: %s"% line.strip())
        return result

    def run(self):
        """ Test runner for host test. This function will start executing
            test and forward test result via serial port to test suite

        Each preparation step (copy, serial init, reset) is terminal on
        failure: its result is printed and the run stops, so exactly one
        result is reported.
        """
        # Copy image to device
        self.notify("HOST: Copy image onto target...")
        if not self.mbed.copy_image():
            self.print_result(self.RESULT_IOERR_COPY)
            return

        # Initialize and open target's serial port (console)
        self.notify("HOST: Initialize serial port...")
        if not self.mbed.init_serial():
            self.print_result(self.RESULT_IO_SERIAL)
            return

        # Reset device
        self.notify("HOST: Reset target...")
        if not self.mbed.reset():
            self.print_result(self.RESULT_IO_SERIAL)
            return

        # Run test
        try:
            CONFIG = self.detect_test_config(verbose=True)

            # Switch to the supervisor requested by the target, if it is known
            if "host_test_name" in CONFIG:
                if host_tests.is_host_test(CONFIG["host_test_name"]):
                    self.test_supervisor = host_tests.get_host_test(CONFIG["host_test_name"])
            result = self.test_supervisor.test(self)

            if result is not None:
                self.print_result(result)
            else:
                self.notify("HOST: Passive mode...")
        except Exception as e:
            print(str(e))
            self.print_result(self.RESULT_ERROR)

    def setup(self):
        """ Setup and check if configuration for test is
            correct. E.g. if serial port can be opened.

        Returns True when the serial port is open; otherwise prints the
        serial I/O error result and returns False.
        """
        result = True
        if not self.mbed.serial:
            result = False
            self.print_result(self.RESULT_IO_SERIAL)
        return result

    def notify(self, message):
        """ On screen notification function
        """
        print(message)
        stdout.flush()

    def print_result(self, result):
        """ Test result unified printing function
        """
        self.notify("\r\n{{%s}}\r\n{{end}}" % result)


class DefaultTestSelector(Test):
    """ Test class with serial port initialization
    """
    def __init__(self):
        # Test.__init__ also initialises the HostTestResults constants
        Test.__init__(self)


if __name__ == '__main__':
    DefaultTestSelector().run()
Generated on Sun Jul 17 2022 08:25:23 by 1.7.2