Arrow / Mbed OS DAPLink Reset
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers cdc_stress_test.py Source File

cdc_stress_test.py

00001 #
00002 # DAPLink Interface Firmware
00003 # Copyright (c) 2016-2017, ARM Limited, All Rights Reserved
00004 # SPDX-License-Identifier: Apache-2.0
00005 #
00006 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00007 # not use this file except in compliance with the License.
00008 # You may obtain a copy of the License at
00009 #
00010 # http://www.apache.org/licenses/LICENSE-2.0
00011 #
00012 # Unless required by applicable law or agreed to in writing, software
00013 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00014 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015 # See the License for the specific language governing permissions and
00016 # limitations under the License.
00017 #
00018 
00019 import mbed_lstools
00020 import threading
00021 import time
00022 import serial
00023 
00024 should_exit = False
00025 exit_cond = threading.Condition()
00026 print_mut = threading.RLock()
00027 global_start_time = time.time()
00028 
00029 
00030 def _get_time():
00031     return time.time() - global_start_time
00032 
00033 
00034 def sync_print(msg):
00035     with print_mut:
00036         print(msg)
00037 
00038 
00039 def cdc_throughput_main(thread_index, serial_port):
00040     global should_exit
00041     ser = None
00042     try:
00043         count = 0
00044         while not should_exit:
00045             ser = serial.Serial(serial_port)  # open serial port
00046             ser.baudrate = 115200
00047             ser.write("this is test data")
00048             ser.baudrate = 9600
00049             ser.write("more test data")
00050             ser.close()
00051 
00052             if count % 10 == 0:
00053                 sync_print("Thread %i on loop %10i at %.6f - %s - port %s" %
00054                            (thread_index, count, _get_time(),
00055                             time.strftime("%H:%M:%S"), serial_port))
00056             count += 1
00057 
00058     except:
00059         sync_print("Error on thread %i serial port %s" %
00060                    (thread_index, serial_port))
00061         if ser is not None:
00062             ser.close()
00063         with exit_cond:
00064             should_exit = 1
00065             exit_cond.notify_all()
00066         raise
00067 
00068 
00069 def main():
00070     global should_exit
00071     lstools = mbed_lstools.create()
00072     mbed_list = lstools.list_mbeds()
00073     for thread_index, mbed in enumerate(mbed_list):
00074         cdc_thread = threading.Thread(target=cdc_throughput_main,
00075                                       args=(thread_index, mbed['serial_port']))
00076         cdc_thread.start()
00077 
00078     try:
00079         with exit_cond:
00080             while not should_exit:
00081                 exit_cond.wait(1)
00082     except KeyboardInterrupt:
00083         pass
00084     should_exit = True
00085 
00086     sync_print("Exiting")
00087 
00088 
00089 if __name__ == "__main__":
00090     main()