Ram Gandikota
/
IOTMetronome
FRDM K64F Metronome
Diff: pal/Test/Scripts/mbed.py
- Revision:
- 0:dbad57390bd1
diff -r 000000000000 -r dbad57390bd1 pal/Test/Scripts/mbed.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pal/Test/Scripts/mbed.py Sun May 14 18:37:05 2017 +0000 @@ -0,0 +1,328 @@ +# ----------------------------------------------------------------------- +# Copyright (c) 2016 ARM Limited. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# Licensed under the Apache License, Version 2.0 (the License); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ----------------------------------------------------------------------- + +import os +import shutil +from time import sleep, strftime +import serial +import re +import sys + + +class MbedDeviceManager: + def __init__(self): + try: + import mbed_lstools + except ImportError, e: + print("Error: Can't import 'mbed_lstools' module: %s"% e) + mbed = mbed_lstools.create() + self.mbed_list = mbed.list_mbeds() + for dev in self.mbed_list: + dev['Available'] = True + # part. structure:{'mount_point': 'D:','serial_port': u'COM3','platform_name': 'LPC1768','Available'=True} + + def dump_all_devices(self): + print("dump Devices:") + print(self.mbed_list) + + + def get_device(self, platform_type): + found = False + index = int(-1) + mount_point = None + serial_port = None + print(platform_type) + for idx,dev in enumerate(self.mbed_list): + if dev['platform_name'] == platform_type: + found = True + if dev['platform_name'] == platform_type and dev['Available']: + mount_point = dev['mount_point'] + serial_port = dev['serial_port'] + print('Detected %s. Mount point %s Serial port %s index %s.' + % (platform_type, mount_point, serial_port, idx)) + dev['Available'] = False + return idx, mount_point, serial_port + if found: + print('Device %s is not available already in use.' %platform_type) + else: + print('Device %s is not found.' %platform_type) + return index, mount_point, serial_port + + def free_device(self, index): + dev = self.mbed_list[index] + if not dev['Available']: + dev['Available'] = True + print('Release Device %s : %s .' % (index, dev['platform_name'])) + else: + print('Error: Device %s : %s is already free.' % (index, dev['platform_name']), ) + +mbed_manager = MbedDeviceManager() + +class MBED: + def __init__(self,platform_type): + self.manager = mbed_manager + self.platform_type = platform_type + self.mount_point = None + self.serial_port = None + self.index = -1 + self.running = False + + def detect(self): + self.index, self.mount_point, self.serial_port = self.manager.get_device(self.platform_type) + if self.index >= 0: + return True + else: + return False + + def generic_mbed_copy_win(self, source, destination): + + + from win32com.shell import shell, shellcon + import pythoncom + from os.path import abspath, join + from glob import glob + + for f in glob(join(destination, '*.bin')): + os.unlink(f) + + src = shell.SHCreateItemFromParsingName(source, None, shell.IID_IShellItem) + dest_dir = shell.SHCreateItemFromParsingName( + abspath(destination), + None, + shell.IID_IShellItem + ) + pfo = pythoncom.CoCreateInstance( + shell.CLSID_FileOperation, + None, + pythoncom.CLSCTX_ALL, + shell.IID_IFileOperation + ) + pfo.SetOperationFlags(shellcon.FOF_NO_UI) + pfo.CopyItem(src, dest_dir, None, None) + pfo.PerformOperations() + + return True + + def install_bin(self,bin_file_name): + bin_on_mbed = os.path.join(self.mount_point,os.path.basename(bin_file_name)) + print('%s Copying %s --> %s' %(strftime('%H:%M:%S'),bin_file_name,bin_on_mbed)) + if 'win' in sys.platform: + self.generic_mbed_copy_win(os.path.abspath(bin_file_name), os.path.abspath(self.mount_point)) + else: + shutil.copyfile(bin_file_name,bin_on_mbed) + self.wait_for_file_system() + print('%s Bin installation complete' %strftime('%H:%M:%S')) + + def run(self,serial_capture_file_name,baud=9600,read_timeout=10,stop_on_serial_read_timeout=False): + if serial_capture_file_name is not None: + self.stop_on_serial_read_timeout = stop_on_serial_read_timeout + from multiprocessing import Process, Event, Pipe + self.event = Event() + parent_conn, child_conn = Pipe() + print('Start serial capture process') + process_args=(self.serial_port,baud,read_timeout,serial_capture_file_name,stop_on_serial_read_timeout,self.event,child_conn) + self.capture_process = Process(target=capture_serial,args=process_args) + self.capture_process.start() + print('Waiting for pipe input from subprocess') + self.ip,self.port = parent_conn.recv() + if not self.port or not self.ip : + return 1 + + print('Received IP %s port %s'%(self.ip,self.port)) + else: + print('Running without serial capture') + self.ip,self.port = run_no_capture(self.serial_port,baud,read_timeout) + print('%s IP %s port %s' % (self.platform_type, self.ip, self.port)) + self.running = True + + def end_run(self, delete_binaries=True, release_manager=True): + print('MBED end_run') + if self.running: + if not self.stop_on_serial_read_timeout: + self.event.set() # the thread does not stop on its own. send stop signal + print('MBED end_run waiting for subprocess to terminate') + self.capture_process.join() # wait for completion + print('MBED end_run subprocess terminated') + if delete_binaries: + print('MBED end_run deleting binaries') + # delete all binaries on MBED + filelist = [ f for f in os.listdir(self.mount_point) if f.endswith(".bin") ] + for f in filelist: + print('MBED end_run delete %s',f) + os.remove(os.path.join(self.mount_point,f)) + self.wait_for_file_system() + print('MBED end_run binary delete completed') + self.running = False + if release_manager: + self.manager.free_device(self.index) + + def run_and_capture_till_timeout(self,serial_capture_file_name,baud=9600,read_timeout=10,endOfData=None): + try: + print('[%s run_and_capture_till_timeout] Start' %strftime('%H:%M:%S')) + ser = serial.Serial(self.serial_port,baudrate=baud,timeout=read_timeout) + if ser == None: + print(' serial.Serial returned None..') + capture_file = open(serial_capture_file_name,'w') + read_size = 1000 + cont = True + print('[%s run_and_capture_till_timeout] Reseting device..' %strftime('%H:%M:%S')) + c = reset_mbed(ser) + if c == None: + cont = False + else: + capture_file.write(c) + print('[%s run_and_capture_till_timeout] capturing to file...' %strftime('%H:%M:%S')) + while cont: + c = ser.read(read_size) + # Look for the end of test data string and terminate if it is found. + if endOfData != None: + endPos = c.find(endOfData) + # Clip off the termination string and anything afterwards + if endPos != -1: + c = c[:(endPos + len(endOfData))] + capture_file.write(c) + if endPos != -1: + break + if len(c) < read_size: + print('[%s run_and_capture_till_timeout] serial read timeout. Stopping subprocess' %strftime('%H:%M:%S')) + print ("exit last Buffsize " + str(len(c)) + "<32" ) + cont = False + print('[%s run_and_capture_till_timeout] closing capture file' %strftime('%H:%M:%S')) + capture_file.close() + print('[%s run_and_capture_till_timeout] closing serial port' %strftime('%H:%M:%S')) + ser.flushInput() + ser.flushOutput() + ser.close() + print('[%s run_and_capture_till_timeout] done' %strftime('%H:%M:%S')) + return True + except serial.SerialException as e: + print('[run_and_capture_till_timeout] serial exception',e) + return False + + def wait_for_file_system(self): + #sleep(25) #MBED file system takes some 'settling' time after it is wrirtten to + sleep(3) #MBED file system takes some 'settling' time after it is wrirtten to + +def reset_mbed(ser): + for loop in range(5): + # reset called after open port + # add sleep for port ready (we saw that sometimes read failed...) + delay = 5 + print('[%s reset_mbed] loop=%d , delay=%d' %(strftime('%H:%M:%S'), loop, delay)) + sleep(delay) + try: + ser.sendBreak() + except Exception: + # In linux a termios.error is raised in sendBreak and in + # setBreak. The following setBreak() is needed to release + # the reset signal on the target mcu. + try: + ser.setBreak(False) + except: + pass + c = ser.read(1) + if len(c) == 1: + return c + print ("Error reading from serial port" ) + return None + +def is_valid_ip(ip): + return re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$",ip) + +def capture_serial(port,baud,read_timeout,capture_file_name,stop_on_serial_read_timeout,event,pipe_conn): + try: + print('[capture_serial subprocess] starting..') + ser = serial.Serial(port,baudrate=baud,timeout=read_timeout) + capture_file = open(capture_file_name,'w') + read_size = 32 + cont = True + print('[capture_serial subprocess] Reseting device..') + c = reset_mbed(ser) + if c == None: + print('[capture_serial subprocess] Reseting device failed') + + if c == None: + cont = False + print('[capture_serial subprocess] Failed to get IP from device... exiting ') + pipe_conn.send([None,None]) # send IP and port + pipe_conn.close() + capture_file.close() + ser.flushInput() + ser.flushOutput() + ser.close() + return + else: + # parse the first received string for IP and port + ip_str = c.split(':') + if is_valid_ip(ip_str[1]) : + pipe_conn.send([ip_str[1],ip_str[2]]) # send IP and port + print('[capture_serial subprocess] Sending IP to main process %s:%s'%(ip_str[1],ip_str[2])) + print('[capture_serial subprocess] capturing to file...') + else: + print('[capture_serial subprocess] No valid IP address') + pipe_conn.send([None,None]) # send IP and port + cont = False + capture_file.write(c) + pipe_conn.close() + + while cont: + c = ser.read(read_size) + capture_file.write(c) + if stop_on_serial_read_timeout: + if len(c) < read_size: + print('[capture_serial subprocess] serial read timeout. Stopping subprocess') + cont = False + else: + if event.is_set(): + print('[capture_serial subprocess] event is set. Stopping subprocess') + cont = False + + print('[capture_serial subprocess] closing capture file') + capture_file.close() + print('[capture_serial subprocess] closing serial port') + ser.flushInput() + ser.flushOutput() + ser.close() + print('[capture_serial subprocess] Subprocess exiting') + except serial.SerialException as e: + print('[capture_serial subprocess] serial exception',e) + +def run_no_capture(port,baud,read_timeout): + try: + ser = serial.Serial(port,baudrate=baud,timeout=read_timeout) + ip = None + port = None + c = reset_mbed(ser) + if c: + # parse the first received string for IP and port + ip_str = c.split(':') + if is_valid_ip(ip_str[1]) != None: + ip = ip_str[1] + port = ip_str[2] + ser.flushInput() + ser.flushOutput() + ser.close() + return ip,port + except serial.SerialException as e: + print e + + +class MBED_DEVICE(MBED): + def __init__(self,platform_type): + print('MBED Device:' + platform_type) + MBED.__init__(self, platform_type) + +