Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of mbed-sdk-tools by
Diff: utils.py
- Revision:
- 29:1210849dba19
- Parent:
- 24:25bff2709c20
- Child:
- 31:182518299918
diff -r e080013bb94e -r 1210849dba19 utils.py --- a/utils.py Mon Aug 29 10:55:42 2016 +0100 +++ b/utils.py Mon Aug 29 11:18:36 2016 +0100 @@ -21,18 +21,26 @@ import math from os import listdir, remove, makedirs from shutil import copyfile -from os.path import isdir, join, exists, split, relpath, splitext, abspath, commonprefix, normpath +from os.path import isdir, join, exists, split, relpath, splitext, abspath +from os.path import commonprefix, normpath from subprocess import Popen, PIPE, STDOUT, call import json from collections import OrderedDict import logging def compile_worker(job): + """Standard task runner used for compiling + + Positional argumets: + job - a dict containing a list of commands and the remaining arguments + to run_cmd + """ results = [] for command in job['commands']: try: - _, _stderr, _rc = run_cmd(command, work_dir=job['work_dir'], chroot=job['chroot']) - except KeyboardInterrupt as e: + _, _stderr, _rc = run_cmd(command, work_dir=job['work_dir'], + chroot=job['chroot']) + except KeyboardInterrupt: raise ToolException results.append({ @@ -48,96 +56,143 @@ 'results': results } -def cmd(l, check=True, verbose=False, shell=False, cwd=None): - text = l if shell else ' '.join(l) +def cmd(command, check=True, verbose=False, shell=False, cwd=None): + """A wrapper to run a command as a blocking job""" + text = command if shell else ' '.join(command) if verbose: print text - rc = call(l, shell=shell, cwd=cwd) - if check and rc != 0: - raise Exception('ERROR %d: "%s"' % (rc, text)) + return_code = call(command, shell=shell, cwd=cwd) + if check and return_code != 0: + raise Exception('ERROR %d: "%s"' % (return_code, text)) def run_cmd(command, work_dir=None, chroot=None, redirect=False): + """Run a command in the forground + + Positional arguments: + command - the command to run + + Keyword arguments: + work_dir - the working directory to run the command in + chroot - the chroot to run the command in + redirect - redirect the stderr to a pipe to be used later + """ if chroot: # Conventions managed by the web team for the mbed.org build system chroot_cmd = [ '/usr/sbin/chroot', '--userspec=33:33', chroot ] - for c in command: - chroot_cmd += [c.replace(chroot, '')] + for element in command: + chroot_cmd += [element.replace(chroot, '')] - logging.debug("Running command %s"%' '.join(chroot_cmd)) + logging.debug("Running command %s", ' '.join(chroot_cmd)) command = chroot_cmd work_dir = None try: - p = Popen(command, stdout=PIPE, stderr=STDOUT if redirect else PIPE, cwd=work_dir) - _stdout, _stderr = p.communicate() - except OSError as e: + process = Popen(command, stdout=PIPE, + stderr=STDOUT if redirect else PIPE, cwd=work_dir) + _stdout, _stderr = process.communicate() + except OSError: print "[OS ERROR] Command: "+(' '.join(command)) raise - return _stdout, _stderr, p.returncode + return _stdout, _stderr, process.returncode def run_cmd_ext(command): + """ A version of run command that checks if the command exists befor running + + Positional arguments: + command - the command line you are trying to invoke + """ assert is_cmd_valid(command[0]) - p = Popen(command, stdout=PIPE, stderr=PIPE) - _stdout, _stderr = p.communicate() - return _stdout, _stderr, p.returncode + process = Popen(command, stdout=PIPE, stderr=PIPE) + _stdout, _stderr = process.communicate() + return _stdout, _stderr, process.returncode -def is_cmd_valid(cmd): +def is_cmd_valid(command): + """ Verify that a command exists and is executable + + Positional arguments: + command - the command to check + """ caller = get_caller_name() - abspath = find_cmd_abspath(cmd) - if not abspath: - error("%s: Command '%s' can't be found" % (caller, cmd)) - if not is_exec(abspath): - error("%s: Command '%s' resolves to file '%s' which is not executable" % (caller, cmd, abspath)) + cmd_path = find_cmd_abspath(command) + if not cmd_path: + error("%s: Command '%s' can't be found" % (caller, command)) + if not is_exec(cmd_path): + error("%s: Command '%s' resolves to file '%s' which is not executable" + % (caller, command, cmd_path)) return True def is_exec(path): + """A simple check to verify that a path to an executable exists + + Positional arguments: + path - the executable + """ return os.access(path, os.X_OK) or os.access(path+'.exe', os.X_OK) -def find_cmd_abspath(cmd): +def find_cmd_abspath(command): """ Returns the absolute path to a command. None is returned if no absolute path was found. + + Positional arguhments: + command - the command to find the path of """ - if exists(cmd) or exists(cmd + '.exe'): - return os.path.abspath(cmd) + if exists(command) or exists(command + '.exe'): + return os.path.abspath(command) if not 'PATH' in os.environ: - raise Exception("Can't find command path for current platform ('%s')" % sys.platform) - PATH=os.environ['PATH'] - for path in PATH.split(os.pathsep): - abspath = '%s/%s' % (path, cmd) - if exists(abspath) or exists(abspath + '.exe'): - return abspath + raise Exception("Can't find command path for current platform ('%s')" + % sys.platform) + path_env = os.environ['PATH'] + for path in path_env.split(os.pathsep): + cmd_path = '%s/%s' % (path, command) + if exists(cmd_path) or exists(cmd_path + '.exe'): + return cmd_path def mkdir(path): + """ a wrapped makedirs that only tries to create a directory if it does not + exist already + + Positional arguments: + path - the path to maybe create + """ if not exists(path): makedirs(path) def copy_file(src, dst): """ Implement the behaviour of "shutil.copy(src, dst)" without copying the - permissions (this was causing errors with directories mounted with samba) + permissions (this was causing errors with directories mounted with samba) + + Positional arguments: + src - the source of the copy operation + dst - the destination of the copy operation """ if isdir(dst): - _, file = split(src) - dst = join(dst, file) + _, base = split(src) + dst = join(dst, base) copyfile(src, dst) -def delete_dir_files(dir): - if not exists(dir): +def delete_dir_files(directory): + """ A function that does rm -rf + + Positional arguments: + directory - the directory to remove + """ + if not exists(directory): return - for f in listdir(dir): - file = join(dir, f) - if not isdir(file): + for element in listdir(directory): + to_remove = join(directory, element) + if not isdir(to_remove): remove(file) @@ -145,34 +200,58 @@ """ When called inside a function, it returns the name of the caller of that function. + + Keyword arguments: + steps - the number of steps up the stack the calling function is """ return inspect.stack()[steps][3] def error(msg): + """Fatal error, abort hard + + Positional arguments: + msg - the message to print before crashing + """ print("ERROR: %s" % msg) sys.exit(1) def rel_path(path, base, dot=False): - p = relpath(path, base) - if dot and not p.startswith('.'): - p = './' + p - return p + """Relative path calculation that optionaly always starts with a dot + + Positional arguments: + path - the path to make relative + base - what to make the path relative to + + Keyword arguments: + dot - if True, the path will always start with a './' + """ + final_path = relpath(path, base) + if dot and not final_path.startswith('.'): + final_path = './' + final_path + return final_path class ToolException(Exception): + """A class representing an exception throw by the tools""" pass class NotSupportedException(Exception): + """A class a toolchain not supporting a particular target""" pass class InvalidReleaseTargetException(Exception): pass def split_path(path): - base, file = split(path) - name, ext = splitext(file) + """spilt a file name into it's directory name, base name, and extension + + Positional arguments: + path - the file name to split + """ + base, has_ext = split(path) + name, ext = splitext(has_ext) return base, name, ext @@ -181,12 +260,15 @@ This roughly translates to the number of path separators (os.sep) + 1. Ex. Given "path/to/dir", this would return 3 Special cases: "." and "/" return 0 + + Positional arguments: + path - the path to calculate the depth of """ normalized_path = normpath(path) path_depth = 0 head, tail = split(normalized_path) - while(tail and tail != '.'): + while tail and tail != '.': path_depth += 1 head, tail = split(head) @@ -194,18 +276,28 @@ def args_error(parser, message): + """Abort with an error that was generated by the arguments to a CLI program + + Positional arguments: + parser - the ArgumentParser object that parsed the command line + message - what went wrong + """ print "\n\n%s\n\n" % message parser.print_help() sys.exit() def construct_enum(**enums): - """ Create your own pseudo-enums """ + """ Create your own pseudo-enums + + Keyword arguments: + * - a member of the Enum you are creating and it's value + """ return type('Enum', (), enums) def check_required_modules(required_modules, verbose=True): - """ Function checks for Python modules which should be "importable" (installed) + """ Function checks for Python modules which should be "importable" before test suite can be used. @return returns True if all modules are installed already """ @@ -214,63 +306,84 @@ for module_name in required_modules: try: imp.find_module(module_name) - except ImportError as e: + except ImportError: # We also test against a rare case: module is an egg file try: __import__(module_name) - except ImportError as e: + except ImportError as exc: not_installed_modules.append(module_name) if verbose: - print "Error: %s" % e + print "Error: %s" % exc if verbose: if not_installed_modules: - print "Warning: Module(s) %s not installed. Please install required module(s) before using this script."% (', '.join(not_installed_modules)) + print ("Warning: Module(s) %s not installed. Please install " + \ + "required module(s) before using this script.")\ + % (', '.join(not_installed_modules)) if not_installed_modules: return False else: return True -# Utility function: traverse a dictionary and change all the strings in the dictionary to -# ASCII from Unicode. Useful when reading ASCII JSON data, because the JSON decoder always -# returns Unicode string. -# Based on http://stackoverflow.com/a/13105359 -def dict_to_ascii(input): - if isinstance(input, dict): - return OrderedDict([(dict_to_ascii(key), dict_to_ascii(value)) for key, value in input.iteritems()]) - elif isinstance(input, list): - return [dict_to_ascii(element) for element in input] - elif isinstance(input, unicode): - return input.encode('ascii') +def dict_to_ascii(dictionary): + """ Utility function: traverse a dictionary and change all the strings in + the dictionary to ASCII from Unicode. Useful when reading ASCII JSON data, + because the JSON decoder always returns Unicode string. Based on + http://stackoverflow.com/a/13105359 + + Positional arguments: + dictionary - The dict that contains some Unicode that should be ASCII + """ + if isinstance(dictionary, dict): + return OrderedDict([(dict_to_ascii(key), dict_to_ascii(value)) + for key, value in dictionary.iteritems()]) + elif isinstance(dictionary, list): + return [dict_to_ascii(element) for element in dictionary] + elif isinstance(dictionary, unicode): + return dictionary.encode('ascii') else: - return input + return dictionary + +def json_file_to_dict(fname): + """ Read a JSON file and return its Python representation, transforming all + the strings from Unicode to ASCII. The order of keys in the JSON file is + preserved. -# Read a JSON file and return its Python representation, transforming all the strings from Unicode -# to ASCII. The order of keys in the JSON file is preserved. -def json_file_to_dict(fname): + Positional arguments: + fname - the name of the file to parse + """ try: - with open(fname, "rt") as f: - return dict_to_ascii(json.load(f, object_pairs_hook=OrderedDict)) + with open(fname, "r") as file_obj: + return dict_to_ascii(json.load(file_obj, + object_pairs_hook=OrderedDict)) except (ValueError, IOError): sys.stderr.write("Error parsing '%s':\n" % fname) raise # Wowza, double closure -def argparse_type(casedness, prefer_hyphen=False) : - def middle(list, type_name): - # validate that an argument passed in (as string) is a member of the list of possible - # arguments. Offer a suggestion if the case of the string, or the hyphens/underscores - # do not match the expected style of the argument. +def argparse_type(casedness, prefer_hyphen=False): + def middle(lst, type_name): def parse_type(string): - if prefer_hyphen: newstring = casedness(string).replace("_","-") - else: newstring = casedness(string).replace("-","_") - if string in list: + """ validate that an argument passed in (as string) is a member of + the list of possible arguments. Offer a suggestion if the case of + the string, or the hyphens/underscores do not match the expected + style of the argument. + """ + if prefer_hyphen: + newstring = casedness(string).replace("_", "-") + else: + newstring = casedness(string).replace("-", "_") + if string in lst: return string - elif string not in list and newstring in list: - raise argparse.ArgumentTypeError("{0} is not a supported {1}. Did you mean {2}?".format(string, type_name, newstring)) + elif string not in lst and newstring in lst: + raise argparse.ArgumentTypeError( + "{0} is not a supported {1}. Did you mean {2}?".format( + string, type_name, newstring)) else: - raise argparse.ArgumentTypeError("{0} is not a supported {1}. Supported {1}s are:\n{2}".format(string, type_name, columnate(list))) + raise argparse.ArgumentTypeError( + "{0} is not a supported {1}. Supported {1}s are:\n{2}". + format(string, type_name, columnate(lst))) return parse_type return middle @@ -281,15 +394,19 @@ argparse_lowercase_hyphen_type = argparse_type(str.lower, True) def argparse_force_type(case): - def middle(list, type_name): - # validate that an argument passed in (as string) is a member of the list of possible - # arguments after converting it's case. Offer a suggestion if the hyphens/underscores - # do not match the expected style of the argument. + """ validate that an argument passed in (as string) is a member of the list + of possible arguments after converting it's case. + """ + def middle(lst, type_name): + """ The parser type generator""" def parse_type(string): - for option in list: + """ The parser type""" + for option in lst: if case(string) == case(option): return option - raise argparse.ArgumentTypeError("{0} is not a supported {1}. Supported {1}s are:\n{2}".format(string, type_name, columnate(list))) + raise argparse.ArgumentTypeError( + "{0} is not a supported {1}. Supported {1}s are:\n{2}". + format(string, type_name, columnate(lst))) return parse_type return middle @@ -297,30 +414,42 @@ argparse_force_uppercase_type = argparse_force_type(str.upper) argparse_force_lowercase_type = argparse_force_type(str.lower) -# An argument parser combinator that takes in an argument parser and creates a new parser that -# accepts a comma separated list of the same thing. -def argparse_many(fn): +def argparse_many(func): + """ An argument parser combinator that takes in an argument parser and + creates a new parser that accepts a comma separated list of the same thing. + """ def wrap(string): - return [fn(s) for s in string.split(",")] + """ The actual parser""" + return [func(s) for s in string.split(",")] return wrap -# An argument parser that verifies that a string passed in corresponds to a file -def argparse_filestring_type(string) : - if exists(string) : +def argparse_filestring_type(string): + """ An argument parser that verifies that a string passed in corresponds + to a file""" + if exists(string): return string - else : - raise argparse.ArgumentTypeError("{0}"" does not exist in the filesystem.".format(string)) + else: + raise argparse.ArgumentTypeError( + "{0}"" does not exist in the filesystem.".format(string)) + +def columnate(strings, separator=", ", chars=80): + """ render a list of strings as a in a bunch of columns -# render a list of strings as a in a bunch of columns -def columnate(strings, seperator=", ", chars=80): + Positional arguments: + strings - the strings to columnate + + Keyword arguments; + separator - the separation between the columns + chars - the maximum with of a row + """ col_width = max(len(s) for s in strings) - total_width = col_width + len(seperator) + total_width = col_width + len(separator) columns = math.floor(chars / total_width) output = "" - for i, s in zip(range(len(strings)), strings): - append = s + for i, string in zip(range(len(strings)), strings): + append = string if i != len(strings) - 1: - append += seperator + append += separator if i % columns == columns - 1: append += "\n" else: @@ -328,13 +457,16 @@ output += append return output -# fail if argument provided is a parent of the specified directory def argparse_dir_not_parent(other): + """fail if argument provided is a parent of the specified directory""" def parse_type(not_parent): + """The parser type""" abs_other = abspath(other) abs_not_parent = abspath(not_parent) if abs_not_parent == commonprefix([abs_not_parent, abs_other]): - raise argparse.ArgumentTypeError("{0} may not be a parent directory of {1}".format(not_parent, other)) + raise argparse.ArgumentTypeError( + "{0} may not be a parent directory of {1}".format( + not_parent, other)) else: return not_parent return parse_type