Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
usb_test.py
00001 # 00002 # DAPLink Interface Firmware 00003 # Copyright (c) 2016-2016, ARM Limited, All Rights Reserved 00004 # SPDX-License-Identifier: Apache-2.0 00005 # 00006 # Licensed under the Apache License, Version 2.0 (the "License"); you may 00007 # not use this file except in compliance with the License. 00008 # You may obtain a copy of the License at 00009 # 00010 # http://www.apache.org/licenses/LICENSE-2.0 00011 # 00012 # Unless required by applicable law or agreed to in writing, software 00013 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 00014 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00015 # See the License for the specific language governing permissions and 00016 # limitations under the License. 00017 # 00018 """ Module for low level and targeted USB tests""" 00019 00020 from __future__ import print_function 00021 00022 import os 00023 import usb.core 00024 import functools 00025 import threading 00026 import time 00027 from usb_cdc import USBCdc 00028 from usb_hid import USBHid 00029 from usb_msd import USBMsd, Fat 00030 00031 DISMOUNT_TIME_S = 10.00 00032 00033 00034 def test_usb(workspace, parent_test, force=False): 00035 """Run raw USB tests 00036 00037 Requirements: 00038 -daplink-validation must be loaded for the target. 
00039 """ 00040 00041 # Only test on supported platforms 00042 if not _platform_supports_usb_test() and not force: 00043 parent_test.info("Skipping USB test on this platform") 00044 return 00045 test_info = parent_test.create_subtest("USB test") 00046 00047 # Find the device under test 00048 serial_number = workspace.board.get_unique_id() 00049 dev = _daplink_from_serial_number(serial_number) 00050 if dev is None: 00051 test_info.failure("Could not find board with serial number %s" % 00052 serial_number) 00053 return 00054 00055 # Create wrappers for and acquire exclusive access to interfaces 00056 cdc = USBCdc(dev) 00057 hid = USBHid(dev) 00058 msd = USBMsd(dev) 00059 cdc.lock() 00060 hid.lock() 00061 msd.lock() 00062 00063 try: 00064 00065 for usb_test_on in (False, True): 00066 00067 _set_usb_test_mode(hid, usb_test_on) 00068 00069 test_cdc(test_info, cdc) 00070 00071 test_hid(test_info, hid) 00072 00073 test_msd(test_info, msd) 00074 00075 test_msd_stall(test_info, msd) 00076 00077 test_control(test_info, dev, cdc, hid, msd) 00078 00079 test_all(test_info, cdc, hid, msd) 00080 00081 # TODO - future enhancements 00082 # MSD remount + hid 00083 # STALL IN and STALL OUT 00084 00085 finally: 00086 try: 00087 _set_usb_test_mode(hid, False) 00088 except usb.core.USBError: 00089 pass 00090 cdc.unlock() 00091 hid.unlock() 00092 msd.unlock() 00093 00094 00095 def main(): 00096 """Run the usb test as a stand alone program""" 00097 00098 import test_info 00099 import mock 00100 00101 def get_unique_id(unique_id): 00102 """Mock function to return a unique id""" 00103 return unique_id 00104 00105 dev_list = usb.core.find(find_all=True, custom_match=_daplink_match) 00106 for dev in dev_list: 00107 board_id = dev.serial_number 00108 print("Testing board %s" % board_id) 00109 print("----------------") 00110 mock_ws = mock.Mock() 00111 mock_ws.board = mock.Mock() 00112 mock_ws.board.unique_id = dev.serial_number 00113 mock_ws.board.get_unique_id = 
functools.partial(get_unique_id, 00114 board_id) 00115 test_usb(mock_ws, test_info.TestInfoStub(), True) 00116 00117 00118 def test_cdc(test_info, cdc): 00119 """Smoke test of the CDC endpoint""" 00120 cdc.set_line_coding(115200) 00121 baud, fmt, parity, databits = cdc.get_line_coding() 00122 test_info.info("Baud %i, fmt %i, parity %i, databits %i" % 00123 (baud, fmt, parity, databits)) 00124 cdc.send_break(cdc.SEND_BREAK_ON) 00125 cdc.send_break(cdc.SEND_BREAK_OFF) 00126 data = cdc.read(1024) 00127 test_info.info("Serial port data: %s" % bytearray(data)) 00128 cdc.write("Hello world") 00129 data = cdc.read(1024) 00130 test_info.info("Serial port data2: %s" % bytearray(data)) 00131 00132 00133 def test_hid(test_info, hid): 00134 """Smoke test of the HID endpoint""" 00135 hid.set_idle() 00136 report = hid.get_descriptor(hid.DESC_TYPE_REPORT, 0) 00137 test_info.info("Report descriptor: %s" % report) 00138 # Send CMSIS-DAP vendor command to get the serial number 00139 data = bytearray(64) 00140 data[0] = 0x80 00141 hid.set_report(data) 00142 resp = hid.get_report(64) 00143 length = resp[1] 00144 test_info.info("CMSIS-DAP response: %s" % 00145 bytearray(resp[1:1 + length]).decode("utf-8")) 00146 00147 00148 def test_msd(test_info, msd): 00149 """MSD endpoint tests""" 00150 00151 # Simple read 00152 mbr = msd.scsi_read10(0, 1) 00153 test_info.info("MBR[0:16]: %s" % mbr[0:16]) 00154 00155 # Test FAT filesystem 00156 fat = Fat(msd) 00157 print(fat.mbr) 00158 00159 # Grab entries in the root directory 00160 root_dir = fat.root_dir 00161 for entry in root_dir: 00162 if entry["DIR_Name"][0] != "\0": 00163 print(entry) 00164 00165 # Trigger a remount 00166 dir_idx = root_dir.find_free_entry_index() 00167 root_dir[dir_idx]["DIR_Name"] = "REFRESH ACT" 00168 root_dir_data = root_dir.pack() 00169 msd.scsi_write10(root_dir.sector, root_dir_data) 00170 00171 test_info.info("Added file to root directory") 00172 start = time.time() 00173 while time.time() - start < DISMOUNT_TIME_S: 
00174 try: 00175 msd.scsi_read10(0, 1) 00176 except msd.SCSIError: 00177 test_info.info("Dismount detected") 00178 break 00179 else: 00180 test_info.failure("Device failed to dismount") 00181 00182 start = time.time() 00183 while time.time() - start < DISMOUNT_TIME_S: 00184 try: 00185 msd.scsi_read10(0, 1) 00186 test_info.info("Mount detected") 00187 break 00188 except msd.SCSIError: 00189 pass 00190 else: 00191 test_info.failure("Device failed to mount") 00192 00193 00194 def test_msd_stall(test_info, msd): 00195 """Test stalls coming at various times in the middle of MSD xfers""" 00196 fat = Fat(msd) 00197 root_dir = fat.root_dir 00198 dir_idx = root_dir.find_free_entry_index() 00199 root_dir[dir_idx]["DIR_Name"] = "REFRESH ACT" 00200 root_dir_data = root_dir.pack() 00201 00202 # Test that a write fails if media is removed after the CBW 00203 # stage but before the CSW stage 00204 msd.scsi_write10(root_dir.sector, root_dir_data) 00205 msd.delay_cbw_to_data = 1.0 00206 retval = msd.CSW_STATUS_PASSED 00207 try: 00208 msd.scsi_write10(0, bytearray(512)) 00209 test_info.failure("Device failed to stall data stage") 00210 except msd.SCSIError as error: 00211 retval = error.value 00212 msd.delay_cbw_to_data = 0 00213 # Make sure device still works as expected 00214 time.sleep(3) 00215 msd.scsi_read10(0, 1) 00216 msd.scsi_write10(0, bytearray(512)) 00217 if retval == msd.CSW_STATUS_FAILED: 00218 test_info.info("Test CBW,Stall,Data OUT - Pass") 00219 else: 00220 test_info.failure("Device returned wrong status") 00221 00222 # Test that a write succeeds even if media is removed 00223 # after the OUT stage but before the CSW stage 00224 msd.scsi_write10(root_dir.sector, root_dir_data) 00225 msd.delay_data_to_csw = 1.0 00226 msd.scsi_write10(0, bytearray(512)) 00227 msd.delay_data_to_csw = 0 00228 # Make sure device still works as expected 00229 time.sleep(3) 00230 msd.scsi_read10(0, 1) 00231 msd.scsi_write10(0, bytearray(512)) 00232 test_info.info("Test DATA OUT,Stall,CSW - 
Pass") 00233 00234 # Test that a read succeeds even if media is removed 00235 # after the IN stage but before the CSW stage 00236 msd.scsi_write10(root_dir.sector, root_dir_data) 00237 msd.delay_data_to_csw = 1.0 00238 resp = msd.scsi_read10(0, 1) 00239 assert len(resp) == 512 00240 msd.delay_data_to_csw = 0 00241 # Make sure device still works as expected 00242 time.sleep(3) 00243 msd.scsi_read10(0, 1) 00244 msd.scsi_write10(0, bytearray(512)) 00245 test_info.info("Test DATA IN,Stall,CSW - Pass") 00246 00247 # Test that a test unit ready succeeds even if media is removed 00248 # after the CBW stage but before the CSW stage 00249 msd.scsi_write10(root_dir.sector, root_dir_data) 00250 msd.delay_data_to_csw = 1.0 00251 resp = msd.scsi_test_unit_ready() 00252 msd.delay_data_to_csw = 0 00253 # Make sure device still works as expected 00254 time.sleep(3) 00255 msd.scsi_read10(0, 1) 00256 msd.scsi_write10(0, bytearray(512)) 00257 if resp == msd.CSW_STATUS_PASSED: 00258 test_info.info("Test CBW,Stall,CSW - Pass") 00259 else: 00260 test_info.failure("Test CBW,Stall,CSW - Failed") 00261 00262 # Test that a test unit ready succeeds even if media is removed 00263 # after the CBW stage but before the CSW stage 00264 msd.scsi_write10(root_dir.sector, root_dir_data) 00265 time.sleep(1.0) 00266 resp = msd.scsi_test_unit_ready() 00267 # Make sure device still works as expected 00268 time.sleep(3) 00269 msd.scsi_read10(0, 1) 00270 msd.scsi_write10(0, bytearray(512)) 00271 if resp == msd.CSW_STATUS_FAILED: 00272 test_info.info("Test CBW,Stall,CSW - Pass") 00273 else: 00274 test_info.failure("Test CBW,Stall,CSW - Failed") 00275 00276 00277 def test_control(test_info, dev, cdc, hid, msd): 00278 """Test for the control endpoint""" 00279 00280 test_info.info("testing control transfer with size a multiple of 256") 00281 request_type = 0x80 00282 request = 0x06 # Get descriptor 00283 value = 0x200 # Configuration descriptor 00284 index = 0 # Always 0 for this request 00285 resp = 
dev.ctrl_transfer(request_type, request, value, index, 256) 00286 assert len(resp) > 0 00287 00288 test_info.info("testing control commands") 00289 # Test various patterns of control transfers 00290 # 00291 # Some devices have had problems with back-to-back 00292 # control transfers. Intentionally send these sequences 00293 # to make sure they are properly handled. 00294 for _ in range(100): 00295 # Control transfer with a data in stage 00296 cdc.get_line_coding() 00297 for _ in range(100): 00298 # Control transfer with a data out stage followed 00299 # by a control transfer with a data in stage 00300 cdc.set_line_coding(115200) 00301 cdc.get_line_coding() 00302 for _ in range(100): 00303 # Control transfer with a data out stage 00304 cdc.set_line_coding(115200) 00305 00306 test_info.info("testing endpoint clearing") 00307 00308 cdc.ep_data_out.clear_halt() 00309 cdc.ep_data_out.write('') # DATA0 00310 cdc.ep_data_out.clear_halt() 00311 cdc.ep_data_out.write('') # DATA0 00312 00313 cdc.ep_data_out.clear_halt() 00314 cdc.ep_data_out.write('') # DATA 0 00315 cdc.ep_data_out.write('') # DATA 1 00316 cdc.ep_data_out.clear_halt() 00317 cdc.ep_data_out.write('') # DATA 0 00318 00319 00320 def test_all(test_info, cdc, hid, msd): 00321 """Test all endpoints in parallel""" 00322 mutex = threading.RLock() 00323 terminate = False 00324 error_msg_list = [] 00325 00326 def _safe_print(message): 00327 """Thread safe wrapper to print messages""" 00328 with mutex: 00329 print(message) 00330 00331 def _test_msd(): 00332 """MSD thread entry point for parallel testing""" 00333 try: 00334 _safe_print("msd started") 00335 msd_data = msd.scsi_read10(100, 1) 00336 while not terminate: 00337 #msd_data = 'x' * 1024 * 16 # 16KB 00338 msd.scsi_write10(100, msd_data) 00339 _safe_print("msd end") 00340 except: 00341 error_msg_list.append("MSD test failed") 00342 raise 00343 00344 def _test_cdc(): 00345 """CDC thread entry point for parallel testing""" 00346 try: 00347 _safe_print("cdc 
started") 00348 while not terminate: 00349 cdc.set_line_coding(115200) 00350 cdc.get_line_coding() 00351 cdc.send_break(cdc.SEND_BREAK_ON) 00352 cdc.send_break(cdc.SEND_BREAK_OFF) 00353 cdc.read(1024) 00354 cdc.write("Hello world") 00355 cdc.read(1024) 00356 _safe_print("cdc end") 00357 except: 00358 error_msg_list.append("CDC test failed") 00359 raise 00360 00361 def _test_hid(): 00362 """HID thread entry point for parallel testing""" 00363 try: 00364 _safe_print("hid started") 00365 data = bytearray(64) 00366 data[0] = 0x80 00367 while not terminate: 00368 hid.set_report(data) 00369 resp = hid.get_report(64) 00370 assert resp[0] == 0x80 00371 _safe_print("hid end") 00372 except: 00373 error_msg_list.append("HID test failed") 00374 raise 00375 00376 thread_list = [] 00377 for function in (_test_msd, _test_cdc, _test_hid): 00378 thread = threading.Thread(target=function) 00379 thread.start() 00380 thread_list.append(thread) 00381 00382 time.sleep(10) 00383 00384 terminate = True 00385 for thread in thread_list: 00386 thread.join() 00387 00388 for error in error_msg_list: 00389 test_info.failure(error) 00390 00391 00392 def _daplink_match(dev): 00393 """DAPLink match function to be used with usb.core.find""" 00394 try: 00395 device_string = dev.product 00396 except ValueError: 00397 return False 00398 if device_string is None: 00399 return False 00400 if device_string.find("CMSIS-DAP") < 0: 00401 return False 00402 return True 00403 00404 00405 def _daplink_from_serial_number(serial_number): 00406 """Return a usb handle to the DAPLink device with the serial number""" 00407 dev_list = usb.core.find(find_all=True, custom_match=_daplink_match) 00408 for dev in dev_list: 00409 if dev.serial_number == serial_number: 00410 return dev 00411 return None 00412 00413 00414 def _platform_supports_usb_test(): 00415 """Return True if this platform supports USB testing, False otherwise""" 00416 if os.name != "posix": 00417 return False 00418 if not hasattr(os, 'uname'): 00419 if 
False: 00420 # Hack to supress warnings for uname not existing 00421 os.uname = lambda: [None] 00422 return False 00423 if os.uname()[0] == "Darwin": 00424 return False 00425 return True 00426 00427 00428 def _set_usb_test_mode(hid, enabled): 00429 """Set to True to enable USB testing mode""" 00430 data = bytearray(64) 00431 data[0] = 0x88 00432 data[1] = 1 if enabled else 0 00433 hid.set_report(data) 00434 resp = hid.get_report(64) 00435 if (resp[0] != 0x88) or (resp[1] != 1): 00436 raise Exception("Error configuring USB test mode") 00437 00438 00439 if __name__ == "__main__": 00440 main()
Generated on Tue Jul 12 2022 15:37:26 by
1.7.2