Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
cdc_stress_test.py
#
# DAPLink Interface Firmware
# Copyright (c) 2016-2017, ARM Limited, All Rights Reserved
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import mbed_lstools
import threading
import time
import serial

# Set to a true value to ask every worker thread (and main) to stop.
should_exit = False
# Notified by a worker when it sets should_exit after an error.
exit_cond = threading.Condition()
# Serializes print() calls made through sync_print().
print_mut = threading.RLock()
# Wall-clock time captured at import; _get_time() is relative to this.
global_start_time = time.time()


def _get_time():
    """Return the number of seconds elapsed since this module was loaded."""
    return time.time() - global_start_time


def sync_print(msg):
    """Print msg while holding print_mut so thread output is not interleaved."""
    with print_mut:
        print(msg)


def cdc_throughput_main(thread_index, serial_port):
    """Worker loop that repeatedly opens, reconfigures, writes to and closes
    a serial port until should_exit is set.

    Each iteration opens serial_port, sets the baud rate to 115200, writes,
    switches to 9600, writes again and closes the port. A progress line is
    printed on every 10th iteration.

    Args:
        thread_index: Number used to identify this worker in log output.
        serial_port: Port name handed to serial.Serial().

    Raises:
        Whatever exception the serial operations raise; before it propagates
        the port is closed (best effort), should_exit is set and exit_cond
        is notified.
    """
    global should_exit
    ser = None
    try:
        count = 0
        while not should_exit:
            ser = serial.Serial(serial_port)  # open serial port
            ser.baudrate = 115200
            # Bytes literals: pySerial rejects unicode strings on Python 3,
            # and b"..." is a plain str on Python 2, so both keep working.
            ser.write(b"this is test data")
            ser.baudrate = 9600
            ser.write(b"more test data")
            ser.close()

            if count % 10 == 0:
                sync_print("Thread %i on loop %10i at %.6f - %s - port %s" %
                           (thread_index, count, _get_time(),
                            time.strftime("%H:%M:%S"), serial_port))
            count += 1

    except BaseException:
        # Deliberately broad: any failure must stop the whole test, and the
        # exception is re-raised below so nothing is swallowed.
        sync_print("Error on thread %i serial port %s" %
                   (thread_index, serial_port))
        try:
            if ser is not None:
                ser.close()
        finally:
            # Signal shutdown even if closing the port itself fails,
            # otherwise the main thread would never be woken.
            with exit_cond:
                should_exit = True
                exit_cond.notify_all()
        raise


def main():
    """Start one CDC stress thread per detected mbed board and wait.

    Boards are discovered with mbed_lstools; each entry's 'serial_port'
    value is passed to cdc_throughput_main on its own thread. The function
    then blocks until a worker sets should_exit or the user presses Ctrl-C,
    after which should_exit is set so the remaining workers stop.
    """
    global should_exit
    lstools = mbed_lstools.create()
    mbed_list = lstools.list_mbeds()
    if not mbed_list:
        # Without any worker nothing would ever set should_exit, so the
        # wait loop below would spin until interrupted. Bail out instead.
        sync_print("No mbed boards found")
        return

    for thread_index, mbed in enumerate(mbed_list):
        cdc_thread = threading.Thread(target=cdc_throughput_main,
                                      args=(thread_index, mbed['serial_port']))
        cdc_thread.start()

    try:
        with exit_cond:
            while not should_exit:
                # Wake up every second so Ctrl-C is handled promptly.
                exit_cond.wait(1)
    except KeyboardInterrupt:
        pass
    # Ask any workers that are still running to finish their current loop.
    should_exit = True

    sync_print("Exiting")


if __name__ == "__main__":
    main()
Generated on Tue Jul 12 2022 15:37:13 by
