Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
mbed.py
00001 # ----------------------------------------------------------------------- 00002 # Copyright (c) 2016 ARM Limited. All rights reserved. 00003 # SPDX-License-Identifier: Apache-2.0 00004 # Licensed under the Apache License, Version 2.0 (the License); you may 00005 # not use this file except in compliance with the License. 00006 # You may obtain a copy of the License at 00007 # 00008 # http://www.apache.org/licenses/LICENSE-2.0 00009 # 00010 # Unless required by applicable law or agreed to in writing, software 00011 # distributed under the License is distributed on an AS IS BASIS, WITHOUT 00012 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 # See the License for the specific language governing permissions and 00014 # limitations under the License. 00015 # ----------------------------------------------------------------------- 00016 00017 import os 00018 import shutil 00019 from time import sleep, strftime 00020 import serial 00021 import re 00022 import sys 00023 00024 00025 class MbedDeviceManager: 00026 def __init__(self): 00027 try: 00028 import mbed_lstools 00029 except ImportError, e: 00030 print("Error: Can't import 'mbed_lstools' module: %s"% e) 00031 mbed = mbed_lstools.create() 00032 self.mbed_list = mbed.list_mbeds() 00033 for dev in self.mbed_list: 00034 dev['Available'] = True 00035 # part. structure:{'mount_point': 'D:','serial_port': u'COM3','platform_name': 'LPC1768','Available'=True} 00036 00037 def dump_all_devices(self): 00038 print("dump Devices:") 00039 print(self.mbed_list) 00040 00041 00042 def get_device(self, platform_type): 00043 found = False 00044 index = int(-1) 00045 mount_point = None 00046 serial_port = None 00047 print(platform_type) 00048 for idx,dev in enumerate(self.mbed_list): 00049 if dev['platform_name'] == platform_type: 00050 found = True 00051 if dev['platform_name'] == platform_type and dev['Available']: 00052 mount_point = dev['mount_point'] 00053 serial_port = dev['serial_port'] 00054 print('Detected %s. Mount point %s Serial port %s index %s.' 00055 % (platform_type, mount_point, serial_port, idx)) 00056 dev['Available'] = False 00057 return idx, mount_point, serial_port 00058 if found: 00059 print('Device %s is not available already in use.' %platform_type) 00060 else: 00061 print('Device %s is not found.' %platform_type) 00062 return index, mount_point, serial_port 00063 00064 def free_device(self, index): 00065 dev = self.mbed_list[index] 00066 if not dev['Available']: 00067 dev['Available'] = True 00068 print('Release Device %s : %s .' % (index, dev['platform_name'])) 00069 else: 00070 print('Error: Device %s : %s is already free.' % (index, dev['platform_name']), ) 00071 00072 mbed_manager = MbedDeviceManager() 00073 00074 class MBED: 00075 def __init__(self,platform_type): 00076 self.manager = mbed_manager 00077 self.platform_type = platform_type 00078 self.mount_point = None 00079 self.serial_port = None 00080 self.index = -1 00081 self.running = False 00082 00083 def detect(self): 00084 self.index, self.mount_point, self.serial_port = self.manager.get_device(self.platform_type) 00085 if self.index >= 0: 00086 return True 00087 else: 00088 return False 00089 00090 def generic_mbed_copy_win(self, source, destination): 00091 00092 00093 from win32com.shell import shell, shellcon 00094 import pythoncom 00095 from os.path import abspath, join 00096 from glob import glob 00097 00098 for f in glob(join(destination, '*.bin')): 00099 os.unlink(f) 00100 00101 src = shell.SHCreateItemFromParsingName(source, None, shell.IID_IShellItem) 00102 dest_dir = shell.SHCreateItemFromParsingName( 00103 abspath(destination), 00104 None, 00105 shell.IID_IShellItem 00106 ) 00107 pfo = pythoncom.CoCreateInstance( 00108 shell.CLSID_FileOperation, 00109 None, 00110 pythoncom.CLSCTX_ALL, 00111 shell.IID_IFileOperation 00112 ) 00113 pfo.SetOperationFlags(shellcon.FOF_NO_UI) 00114 pfo.CopyItem(src, dest_dir, None, None) 00115 pfo.PerformOperations() 00116 00117 return True 00118 00119 def install_bin(self,bin_file_name): 00120 bin_on_mbed = os.path.join(self.mount_point,os.path.basename(bin_file_name)) 00121 print('%s Copying %s --> %s' %(strftime('%H:%M:%S'),bin_file_name,bin_on_mbed)) 00122 if 'win' in sys.platform: 00123 self.generic_mbed_copy_win(os.path.abspath(bin_file_name), os.path.abspath(self.mount_point)) 00124 else: 00125 shutil.copyfile(bin_file_name,bin_on_mbed) 00126 self.wait_for_file_system() 00127 print('%s Bin installation complete' %strftime('%H:%M:%S')) 00128 00129 def run(self,serial_capture_file_name,baud=9600,read_timeout=10,stop_on_serial_read_timeout=False): 00130 if serial_capture_file_name is not None: 00131 self.stop_on_serial_read_timeout = stop_on_serial_read_timeout 00132 from multiprocessing import Process, Event, Pipe 00133 self.event = Event() 00134 parent_conn, child_conn = Pipe() 00135 print('Start serial capture process') 00136 process_args=(self.serial_port,baud,read_timeout,serial_capture_file_name,stop_on_serial_read_timeout,self.event,child_conn) 00137 self.capture_process = Process(target=capture_serial,args=process_args) 00138 self.capture_process.start() 00139 print('Waiting for pipe input from subprocess') 00140 self.ip,self.port = parent_conn.recv() 00141 if not self.port or not self.ip : 00142 return 1 00143 00144 print('Received IP %s port %s'%(self.ip,self.port)) 00145 else: 00146 print('Running without serial capture') 00147 self.ip,self.port = run_no_capture(self.serial_port,baud,read_timeout) 00148 print('%s IP %s port %s' % (self.platform_type, self.ip, self.port)) 00149 self.running = True 00150 00151 def end_run(self, delete_binaries=True, release_manager=True): 00152 print('MBED end_run') 00153 if self.running: 00154 if not self.stop_on_serial_read_timeout: 00155 self.event.set() # the thread does not stop on its own. send stop signal 00156 print('MBED end_run waiting for subprocess to terminate') 00157 self.capture_process.join() # wait for completion 00158 print('MBED end_run subprocess terminated') 00159 if delete_binaries: 00160 print('MBED end_run deleting binaries') 00161 # delete all binaries on MBED 00162 filelist = [ f for f in os.listdir(self.mount_point) if f.endswith(".bin") ] 00163 for f in filelist: 00164 print('MBED end_run delete %s',f) 00165 os.remove(os.path.join(self.mount_point,f)) 00166 self.wait_for_file_system() 00167 print('MBED end_run binary delete completed') 00168 self.running = False 00169 if release_manager: 00170 self.manager.free_device(self.index) 00171 00172 def run_and_capture_till_timeout(self,serial_capture_file_name,baud=9600,read_timeout=10,endOfData=None): 00173 try: 00174 print('[%s run_and_capture_till_timeout] Start' %strftime('%H:%M:%S')) 00175 ser = serial.Serial(self.serial_port,baudrate=baud,timeout=read_timeout) 00176 if ser == None: 00177 print(' serial.Serial returned None..') 00178 capture_file = open(serial_capture_file_name,'w') 00179 read_size = 1000 00180 cont = True 00181 print('[%s run_and_capture_till_timeout] Reseting device..' %strftime('%H:%M:%S')) 00182 c = reset_mbed(ser) 00183 if c == None: 00184 cont = False 00185 else: 00186 capture_file.write(c) 00187 print('[%s run_and_capture_till_timeout] capturing to file...' %strftime('%H:%M:%S')) 00188 while cont: 00189 c = ser.read(read_size) 00190 # Look for the end of test data string and terminate if it is found. 00191 if endOfData != None: 00192 endPos = c.find(endOfData) 00193 # Clip off the termination string and anything afterwards 00194 if endPos != -1: 00195 c = c[:(endPos + len(endOfData))] 00196 capture_file.write(c) 00197 if endPos != -1: 00198 break 00199 if len(c) < read_size: 00200 print('[%s run_and_capture_till_timeout] serial read timeout. Stopping subprocess' %strftime('%H:%M:%S')) 00201 print ("exit last Buffsize " + str(len(c)) + "<32" ) 00202 cont = False 00203 print('[%s run_and_capture_till_timeout] closing capture file' %strftime('%H:%M:%S')) 00204 capture_file.close() 00205 print('[%s run_and_capture_till_timeout] closing serial port' %strftime('%H:%M:%S')) 00206 ser.flushInput() 00207 ser.flushOutput() 00208 ser.close() 00209 print('[%s run_and_capture_till_timeout] done' %strftime('%H:%M:%S')) 00210 return True 00211 except serial.SerialException as e: 00212 print('[run_and_capture_till_timeout] serial exception',e) 00213 return False 00214 00215 def wait_for_file_system(self): 00216 #sleep(25) #MBED file system takes some 'settling' time after it is wrirtten to 00217 sleep(3) #MBED file system takes some 'settling' time after it is wrirtten to 00218 00219 def reset_mbed(ser): 00220 for loop in range(5): 00221 # reset called after open port 00222 # add sleep for port ready (we saw that sometimes read failed...) 00223 delay = 5 00224 print('[%s reset_mbed] loop=%d , delay=%d' %(strftime('%H:%M:%S'), loop, delay)) 00225 sleep(delay) 00226 try: 00227 ser.sendBreak() 00228 except Exception: 00229 # In linux a termios.error is raised in sendBreak and in 00230 # setBreak. The following setBreak() is needed to release 00231 # the reset signal on the target mcu. 00232 try: 00233 ser.setBreak(False) 00234 except: 00235 pass 00236 c = ser.read(1) 00237 if len(c) == 1: 00238 return c 00239 print ("Error reading from serial port" ) 00240 return None 00241 00242 def is_valid_ip(ip): 00243 return re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$",ip) 00244 00245 def capture_serial(port,baud,read_timeout,capture_file_name,stop_on_serial_read_timeout,event,pipe_conn): 00246 try: 00247 print('[capture_serial subprocess] starting..') 00248 ser = serial.Serial(port,baudrate=baud,timeout=read_timeout) 00249 capture_file = open(capture_file_name,'w') 00250 read_size = 32 00251 cont = True 00252 print('[capture_serial subprocess] Reseting device..') 00253 c = reset_mbed(ser) 00254 if c == None: 00255 print('[capture_serial subprocess] Reseting device failed') 00256 00257 if c == None: 00258 cont = False 00259 print('[capture_serial subprocess] Failed to get IP from device... exiting ') 00260 pipe_conn.send([None,None]) # send IP and port 00261 pipe_conn.close() 00262 capture_file.close() 00263 ser.flushInput() 00264 ser.flushOutput() 00265 ser.close() 00266 return 00267 else: 00268 # parse the first received string for IP and port 00269 ip_str = c.split(':') 00270 if is_valid_ip(ip_str[1]) : 00271 pipe_conn.send([ip_str[1],ip_str[2]]) # send IP and port 00272 print('[capture_serial subprocess] Sending IP to main process %s:%s'%(ip_str[1],ip_str[2])) 00273 print('[capture_serial subprocess] capturing to file...') 00274 else: 00275 print('[capture_serial subprocess] No valid IP address') 00276 pipe_conn.send([None,None]) # send IP and port 00277 cont = False 00278 capture_file.write(c) 00279 pipe_conn.close() 00280 00281 while cont: 00282 c = ser.read(read_size) 00283 capture_file.write(c) 00284 if stop_on_serial_read_timeout: 00285 if len(c) < read_size: 00286 print('[capture_serial subprocess] serial read timeout. Stopping subprocess') 00287 cont = False 00288 else: 00289 if event.is_set(): 00290 print('[capture_serial subprocess] event is set. Stopping subprocess') 00291 cont = False 00292 00293 print('[capture_serial subprocess] closing capture file') 00294 capture_file.close() 00295 print('[capture_serial subprocess] closing serial port') 00296 ser.flushInput() 00297 ser.flushOutput() 00298 ser.close() 00299 print('[capture_serial subprocess] Subprocess exiting') 00300 except serial.SerialException as e: 00301 print('[capture_serial subprocess] serial exception',e) 00302 00303 def run_no_capture(port,baud,read_timeout): 00304 try: 00305 ser = serial.Serial(port,baudrate=baud,timeout=read_timeout) 00306 ip = None 00307 port = None 00308 c = reset_mbed(ser) 00309 if c: 00310 # parse the first received string for IP and port 00311 ip_str = c.split(':') 00312 if is_valid_ip(ip_str[1]) != None: 00313 ip = ip_str[1] 00314 port = ip_str[2] 00315 ser.flushInput() 00316 ser.flushOutput() 00317 ser.close() 00318 return ip,port 00319 except serial.SerialException as e: 00320 print e 00321 00322 00323 class MBED_DEVICE(MBED): 00324 def __init__(self,platform_type): 00325 print('MBED Device:' + platform_type) 00326 MBED.__init__(self, platform_type) 00327 00328
Generated on Tue Jul 12 2022 21:20:28 by
1.7.2