Arrow / Mbed OS DAPLink Reset
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers msd_remount_test.py Source File

msd_remount_test.py

00001 #
00002 # DAPLink Interface Firmware
00003 # Copyright (c) 2016-2017, ARM Limited, All Rights Reserved
00004 # SPDX-License-Identifier: Apache-2.0
00005 #
00006 # Licensed under the Apache License, Version 2.0 (the "License"); you may
00007 # not use this file except in compliance with the License.
00008 # You may obtain a copy of the License at
00009 #
00010 # http://www.apache.org/licenses/LICENSE-2.0
00011 #
00012 # Unless required by applicable law or agreed to in writing, software
00013 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
00014 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015 # See the License for the specific language governing permissions and
00016 # limitations under the License.
00017 #
00018 
00019 import mbed_lstools
00020 import threading
00021 import time
00022 import os
00023 import sys
00024 
00025 
00026 # This prevents the following error message from getting
00027 # displayed on windows if the mbed dismounts unexpectedly
00028 # during a transfer:
00029 #   There is no disk in the drive. Please insert a disk into
00030 #   drive \Device\<Harddiskx>\<rdrive>
def disable_popup():
    """Suppress the Windows critical-error dialog for this process.

    On Windows this reads the current process error mode with
    GetErrorMode, sets the SEM_FAILCRITICALERRORS bit and writes the
    result back with SetErrorMode.  This keeps the "There is no disk in
    the drive" popup from appearing when a drive dismounts unexpectedly
    during a transfer.  On every other platform the function does
    nothing.
    """
    if not sys.platform.startswith("win"):
        return

    # pylint: disable=invalid-name
    import ctypes

    SEM_FAILCRITICALERRORS = 1
    kernel32 = ctypes.windll.kernel32  # @UndefinedVariable

    # Declare the prototypes so ctypes marshals the mode as unsigned int.
    get_error_mode = kernel32.GetErrorMode
    get_error_mode.restype = ctypes.c_uint
    get_error_mode.argtypes = []

    set_error_mode = kernel32.SetErrorMode
    set_error_mode.restype = ctypes.c_uint
    set_error_mode.argtypes = [ctypes.c_uint]

    # Keep whatever bits are already set and add the one we need.
    current_mode = get_error_mode()
    set_error_mode(current_mode | SEM_FAILCRITICALERRORS)
00048 
# Longest time, in seconds, a worker waits for a drive to dismount or to
# come back before it gives up with a timeout error.
MAX_REMOUNT_TIME = 5 * 60
# Shutdown flag: set by a failing worker thread or by main(), and polled
# by every worker loop.
should_exit = False
# Condition that main() waits on; a failing worker notifies it after
# setting should_exit.
exit_cond = threading.Condition()
# Lock held by sync_print() around each print call.
print_mut = threading.RLock()
# Wall-clock time at module load; _get_time() reports time relative to it.
global_start_time = time.time()
00054 
00055 
class ExitException(Exception):
    """Raised inside a worker thread to unwind once shutdown is requested."""
00058 
00059 
def _get_time():
    """Return the seconds elapsed since global_start_time was recorded."""
    now = time.time()
    return now - global_start_time
00062 
00063 
def sync_print(msg):
    """Print msg while holding the shared print lock (print_mut)."""
    print_mut.acquire()
    try:
        print(msg)
    finally:
        print_mut.release()
00067 
00068 
def get_mount_point(board_id):
    """Look up the current mount point of the board with the given ID.

    Args:
        board_id: Value compared against each detected board's
            'target_id' entry.

    Returns:
        The 'mount_point' entry of the first detected board whose
        'target_id' equals board_id, or None when no detected board
        matches.
    """
    detected_boards = mbed_lstools.create().list_mbeds()
    for mbed in detected_boards:
        if mbed['target_id'] == board_id:
            return mbed['mount_point']
    # A missing board is reported as None rather than raised:
    # msd_remount_main() polls this function until the drive reappears
    # and relies on the None result while the board is absent.
    return None
00077 
00078 
def msd_remount_main(thread_index, board_id):
    """Worker loop that repeatedly forces one board's drive to remount.

    Each cycle creates an empty "refresh.act" file on the drive, waits
    for the mount point to disappear, then waits for the board to be
    listed again with a mount point.  The loop runs until should_exit
    is set or an error occurs.

    Args:
        thread_index: Index of this worker, used only in log messages.
        board_id: Target ID of the board this worker exercises.

    Raises:
        Exception: If the board is not found at start-up, a dismount or
            remount takes longer than MAX_REMOUNT_TIME seconds, or the
            remounted drive path does not exist.  Any exception other
            than ExitException is re-raised after should_exit has been
            set and exit_cond has been notified.
    """
    global should_exit
    try:
        mount_point = get_mount_point(board_id)
        if mount_point is None:
            # Fail with a clear message instead of a TypeError from
            # concatenating None below.
            raise Exception("Board %s not found" % board_id)

        while True:
            if should_exit:
                raise ExitException()

            # Trigger a remount
            sync_print("Triggering remount for %i %s - %s at %.6f - %s" %
                       (thread_index, mount_point, board_id, _get_time(),
                        time.strftime("%H:%M:%S")))
            # Plain concatenation is deliberate: if mount_point is a bare
            # Windows drive letter such as "E:", os.path.join() would
            # produce the drive-relative path "E:refresh.act".
            file_path = mount_point + "/" + "refresh.act"
            with open(file_path, "wb"):
                pass

            # Wait for board to dismount
            start_time = time.time()
            while os.path.exists(mount_point):
                if should_exit:
                    raise ExitException()
                if time.time() - start_time > MAX_REMOUNT_TIME:
                    raise Exception("Board remount timed out")
                time.sleep(0.1)
            sync_print("Drive %s dismount" % mount_point)

            # Wait for drive to come back
            mount_point = None
            start_time = time.time()
            while mount_point is None:
                if should_exit:
                    raise ExitException()
                mount_point = get_mount_point(board_id)
                # Only time out while the drive is still missing; a drive
                # found on this iteration must not be reported as a
                # timeout.
                if (mount_point is None and
                        time.time() - start_time > MAX_REMOUNT_TIME):
                    raise Exception("Board remount timed out")
                time.sleep(0.1)
            # Explicit check rather than assert so it survives "python -O".
            if not os.path.exists(mount_point):
                raise Exception("Remounted drive %s does not exist" %
                                mount_point)

            sync_print("Remount complete as %s" % mount_point)
    except ExitException:
        # Normal shutdown request; nothing to report.
        pass
    except BaseException:
        # Catch everything so main() is always woken up; the exception is
        # re-raised below so the traceback is still reported.
        sync_print("Thread %i exception board %s" % (thread_index, board_id))
        with exit_cond:
            should_exit = True
            exit_cond.notify_all()
        raise
00126 
00127 
def main():
    """Start one remount worker per detected board and wait for shutdown.

    Starts a thread running msd_remount_main() for every board returned
    by mbed_lstools, then blocks until a worker sets should_exit or the
    user presses Ctrl-C.  Finally sets should_exit so the remaining
    workers stop, and prints "Exiting".

    When no boards are detected there is no worker that could ever set
    should_exit, so the wait is skipped and a message is printed
    instead of blocking until interrupted.
    """
    global should_exit
    disable_popup()
    mbed_list = mbed_lstools.create().list_mbeds()
    for thread_index, mbed in enumerate(mbed_list):
        msd_thread = threading.Thread(target=msd_remount_main,
                                      args=(thread_index, mbed['target_id']))
        msd_thread.start()

    if not mbed_list:
        sync_print("No boards found")
    else:
        try:
            with exit_cond:
                # Wait with a timeout in a loop so the main thread keeps
                # re-checking should_exit.
                while not should_exit:
                    exit_cond.wait(1)
        except KeyboardInterrupt:
            pass
    # Ask any workers that are still running to stop.
    should_exit = True

    sync_print("Exiting")
00147 
# Run the remount test only when this file is executed as a script.
if __name__ == "__main__":
    main()