Clone of official tools
Diff: utils.py
- Revision:
- 29:1210849dba19
- Parent:
- 24:25bff2709c20
- Child:
- 31:8ea194f6145b
diff -r e080013bb94e -r 1210849dba19 utils.py --- a/utils.py Mon Aug 29 10:55:42 2016 +0100 +++ b/utils.py Mon Aug 29 11:18:36 2016 +0100 @@ -21,18 +21,26 @@ import math from os import listdir, remove, makedirs from shutil import copyfile -from os.path import isdir, join, exists, split, relpath, splitext, abspath, commonprefix, normpath +from os.path import isdir, join, exists, split, relpath, splitext, abspath +from os.path import commonprefix, normpath from subprocess import Popen, PIPE, STDOUT, call import json from collections import OrderedDict import logging def compile_worker(job): + """Standard task runner used for compiling + + Positional argumets: + job - a dict containing a list of commands and the remaining arguments + to run_cmd + """ results = [] for command in job['commands']: try: - _, _stderr, _rc = run_cmd(command, work_dir=job['work_dir'], chroot=job['chroot']) - except KeyboardInterrupt as e: + _, _stderr, _rc = run_cmd(command, work_dir=job['work_dir'], + chroot=job['chroot']) + except KeyboardInterrupt: raise ToolException results.append({ @@ -48,96 +56,143 @@ 'results': results } -def cmd(l, check=True, verbose=False, shell=False, cwd=None): - text = l if shell else ' '.join(l) +def cmd(command, check=True, verbose=False, shell=False, cwd=None): + """A wrapper to run a command as a blocking job""" + text = command if shell else ' '.join(command) if verbose: print text - rc = call(l, shell=shell, cwd=cwd) - if check and rc != 0: - raise Exception('ERROR %d: "%s"' % (rc, text)) + return_code = call(command, shell=shell, cwd=cwd) + if check and return_code != 0: + raise Exception('ERROR %d: "%s"' % (return_code, text)) def run_cmd(command, work_dir=None, chroot=None, redirect=False): + """Run a command in the forground + + Positional arguments: + command - the command to run + + Keyword arguments: + work_dir - the working directory to run the command in + chroot - the chroot to run the command in + redirect - redirect the stderr to a pipe to be used later + """ if chroot: # Conventions managed by the web team for the mbed.org build system chroot_cmd = [ '/usr/sbin/chroot', '--userspec=33:33', chroot ] - for c in command: - chroot_cmd += [c.replace(chroot, '')] + for element in command: + chroot_cmd += [element.replace(chroot, '')] - logging.debug("Running command %s"%' '.join(chroot_cmd)) + logging.debug("Running command %s", ' '.join(chroot_cmd)) command = chroot_cmd work_dir = None try: - p = Popen(command, stdout=PIPE, stderr=STDOUT if redirect else PIPE, cwd=work_dir) - _stdout, _stderr = p.communicate() - except OSError as e: + process = Popen(command, stdout=PIPE, + stderr=STDOUT if redirect else PIPE, cwd=work_dir) + _stdout, _stderr = process.communicate() + except OSError: print "[OS ERROR] Command: "+(' '.join(command)) raise - return _stdout, _stderr, p.returncode + return _stdout, _stderr, process.returncode def run_cmd_ext(command): + """ A version of run command that checks if the command exists befor running + + Positional arguments: + command - the command line you are trying to invoke + """ assert is_cmd_valid(command[0]) - p = Popen(command, stdout=PIPE, stderr=PIPE) - _stdout, _stderr = p.communicate() - return _stdout, _stderr, p.returncode + process = Popen(command, stdout=PIPE, stderr=PIPE) + _stdout, _stderr = process.communicate() + return _stdout, _stderr, process.returncode -def is_cmd_valid(cmd): +def is_cmd_valid(command): + """ Verify that a command exists and is executable + + Positional arguments: + command - the command to check + """ caller = get_caller_name() - abspath = find_cmd_abspath(cmd) - if not abspath: - error("%s: Command '%s' can't be found" % (caller, cmd)) - if not is_exec(abspath): - error("%s: Command '%s' resolves to file '%s' which is not executable" % (caller, cmd, abspath)) + cmd_path = find_cmd_abspath(command) + if not cmd_path: + error("%s: Command '%s' can't be found" % (caller, command)) + if not is_exec(cmd_path): + error("%s: Command '%s' resolves to file '%s' which is not executable" + % (caller, command, cmd_path)) return True def is_exec(path): + """A simple check to verify that a path to an executable exists + + Positional arguments: + path - the executable + """ return os.access(path, os.X_OK) or os.access(path+'.exe', os.X_OK) -def find_cmd_abspath(cmd): +def find_cmd_abspath(command): """ Returns the absolute path to a command. None is returned if no absolute path was found. + + Positional arguhments: + command - the command to find the path of """ - if exists(cmd) or exists(cmd + '.exe'): - return os.path.abspath(cmd) + if exists(command) or exists(command + '.exe'): + return os.path.abspath(command) if not 'PATH' in os.environ: - raise Exception("Can't find command path for current platform ('%s')" % sys.platform) - PATH=os.environ['PATH'] - for path in PATH.split(os.pathsep): - abspath = '%s/%s' % (path, cmd) - if exists(abspath) or exists(abspath + '.exe'): - return abspath + raise Exception("Can't find command path for current platform ('%s')" + % sys.platform) + path_env = os.environ['PATH'] + for path in path_env.split(os.pathsep): + cmd_path = '%s/%s' % (path, command) + if exists(cmd_path) or exists(cmd_path + '.exe'): + return cmd_path def mkdir(path): + """ a wrapped makedirs that only tries to create a directory if it does not + exist already + + Positional arguments: + path - the path to maybe create + """ if not exists(path): makedirs(path) def copy_file(src, dst): """ Implement the behaviour of "shutil.copy(src, dst)" without copying the - permissions (this was causing errors with directories mounted with samba) + permissions (this was causing errors with directories mounted with samba) + + Positional arguments: + src - the source of the copy operation + dst - the destination of the copy operation """ if isdir(dst): - _, file = split(src) - dst = join(dst, file) + _, base = split(src) + dst = join(dst, base) copyfile(src, dst) -def delete_dir_files(dir): - if not exists(dir): +def delete_dir_files(directory): + """ A function that does rm -rf + + Positional arguments: + directory - the directory to remove + """ + if not exists(directory): return - for f in listdir(dir): - file = join(dir, f) - if not isdir(file): + for element in listdir(directory): + to_remove = join(directory, element) + if not isdir(to_remove): remove(file) @@ -145,34 +200,58 @@ """ When called inside a function, it returns the name of the caller of that function. + + Keyword arguments: + steps - the number of steps up the stack the calling function is """ return inspect.stack()[steps][3] def error(msg): + """Fatal error, abort hard + + Positional arguments: + msg - the message to print before crashing + """ print("ERROR: %s" % msg) sys.exit(1) def rel_path(path, base, dot=False): - p = relpath(path, base) - if dot and not p.startswith('.'): - p = './' + p - return p + """Relative path calculation that optionaly always starts with a dot + + Positional arguments: + path - the path to make relative + base - what to make the path relative to + + Keyword arguments: + dot - if True, the path will always start with a './' + """ + final_path = relpath(path, base) + if dot and not final_path.startswith('.'): + final_path = './' + final_path + return final_path class ToolException(Exception): + """A class representing an exception throw by the tools""" pass class NotSupportedException(Exception): + """A class a toolchain not supporting a particular target""" pass class InvalidReleaseTargetException(Exception): pass def split_path(path): - base, file = split(path) - name, ext = splitext(file) + """spilt a file name into it's directory name, base name, and extension + + Positional arguments: + path - the file name to split + """ + base, has_ext = split(path) + name, ext = splitext(has_ext) return base, name, ext @@ -181,12 +260,15 @@ This roughly translates to the number of path separators (os.sep) + 1. Ex. Given "path/to/dir", this would return 3 Special cases: "." and "/" return 0 + + Positional arguments: + path - the path to calculate the depth of """ normalized_path = normpath(path) path_depth = 0 head, tail = split(normalized_path) - while(tail and tail != '.'): + while tail and tail != '.': path_depth += 1 head, tail = split(head) @@ -194,18 +276,28 @@ def args_error(parser, message): + """Abort with an error that was generated by the arguments to a CLI program + + Positional arguments: + parser - the ArgumentParser object that parsed the command line + message - what went wrong + """ print "\n\n%s\n\n" % message parser.print_help() sys.exit() def construct_enum(**enums): - """ Create your own pseudo-enums """ + """ Create your own pseudo-enums + + Keyword arguments: + * - a member of the Enum you are creating and it's value + """ return type('Enum', (), enums) def check_required_modules(required_modules, verbose=True): - """ Function checks for Python modules which should be "importable" (installed) + """ Function checks for Python modules which should be "importable" before test suite can be used. @return returns True if all modules are installed already """ @@ -214,63 +306,84 @@ for module_name in required_modules: try: imp.find_module(module_name) - except ImportError as e: + except ImportError: # We also test against a rare case: module is an egg file try: __import__(module_name) - except ImportError as e: + except ImportError as exc: not_installed_modules.append(module_name) if verbose: - print "Error: %s" % e + print "Error: %s" % exc if verbose: if not_installed_modules: - print "Warning: Module(s) %s not installed. Please install required module(s) before using this script."% (', '.join(not_installed_modules)) + print ("Warning: Module(s) %s not installed. Please install " + \ + "required module(s) before using this script.")\ + % (', '.join(not_installed_modules)) if not_installed_modules: return False else: return True -# Utility function: traverse a dictionary and change all the strings in the dictionary to -# ASCII from Unicode. Useful when reading ASCII JSON data, because the JSON decoder always -# returns Unicode string. -# Based on http://stackoverflow.com/a/13105359 -def dict_to_ascii(input): - if isinstance(input, dict): - return OrderedDict([(dict_to_ascii(key), dict_to_ascii(value)) for key, value in input.iteritems()]) - elif isinstance(input, list): - return [dict_to_ascii(element) for element in input] - elif isinstance(input, unicode): - return input.encode('ascii') +def dict_to_ascii(dictionary): + """ Utility function: traverse a dictionary and change all the strings in + the dictionary to ASCII from Unicode. Useful when reading ASCII JSON data, + because the JSON decoder always returns Unicode string. Based on + http://stackoverflow.com/a/13105359 + + Positional arguments: + dictionary - The dict that contains some Unicode that should be ASCII + """ + if isinstance(dictionary, dict): + return OrderedDict([(dict_to_ascii(key), dict_to_ascii(value)) + for key, value in dictionary.iteritems()]) + elif isinstance(dictionary, list): + return [dict_to_ascii(element) for element in dictionary] + elif isinstance(dictionary, unicode): + return dictionary.encode('ascii') else: - return input + return dictionary + +def json_file_to_dict(fname): + """ Read a JSON file and return its Python representation, transforming all + the strings from Unicode to ASCII. The order of keys in the JSON file is + preserved. -# Read a JSON file and return its Python representation, transforming all the strings from Unicode -# to ASCII. The order of keys in the JSON file is preserved. -def json_file_to_dict(fname): + Positional arguments: + fname - the name of the file to parse + """ try: - with open(fname, "rt") as f: - return dict_to_ascii(json.load(f, object_pairs_hook=OrderedDict)) + with open(fname, "r") as file_obj: + return dict_to_ascii(json.load(file_obj, + object_pairs_hook=OrderedDict)) except (ValueError, IOError): sys.stderr.write("Error parsing '%s':\n" % fname) raise # Wowza, double closure -def argparse_type(casedness, prefer_hyphen=False) : - def middle(list, type_name): - # validate that an argument passed in (as string) is a member of the list of possible - # arguments. Offer a suggestion if the case of the string, or the hyphens/underscores - # do not match the expected style of the argument. +def argparse_type(casedness, prefer_hyphen=False): + def middle(lst, type_name): def parse_type(string): - if prefer_hyphen: newstring = casedness(string).replace("_","-") - else: newstring = casedness(string).replace("-","_") - if string in list: + """ validate that an argument passed in (as string) is a member of + the list of possible arguments. Offer a suggestion if the case of + the string, or the hyphens/underscores do not match the expected + style of the argument. + """ + if prefer_hyphen: + newstring = casedness(string).replace("_", "-") + else: + newstring = casedness(string).replace("-", "_") + if string in lst: return string - elif string not in list and newstring in list: - raise argparse.ArgumentTypeError("{0} is not a supported {1}. Did you mean {2}?".format(string, type_name, newstring)) + elif string not in lst and newstring in lst: + raise argparse.ArgumentTypeError( + "{0} is not a supported {1}. Did you mean {2}?".format( + string, type_name, newstring)) else: - raise argparse.ArgumentTypeError("{0} is not a supported {1}. Supported {1}s are:\n{2}".format(string, type_name, columnate(list))) + raise argparse.ArgumentTypeError( + "{0} is not a supported {1}. Supported {1}s are:\n{2}". + format(string, type_name, columnate(lst))) return parse_type return middle @@ -281,15 +394,19 @@ argparse_lowercase_hyphen_type = argparse_type(str.lower, True) def argparse_force_type(case): - def middle(list, type_name): - # validate that an argument passed in (as string) is a member of the list of possible - # arguments after converting it's case. Offer a suggestion if the hyphens/underscores - # do not match the expected style of the argument. + """ validate that an argument passed in (as string) is a member of the list + of possible arguments after converting it's case. + """ + def middle(lst, type_name): + """ The parser type generator""" def parse_type(string): - for option in list: + """ The parser type""" + for option in lst: if case(string) == case(option): return option - raise argparse.ArgumentTypeError("{0} is not a supported {1}. Supported {1}s are:\n{2}".format(string, type_name, columnate(list))) + raise argparse.ArgumentTypeError( + "{0} is not a supported {1}. Supported {1}s are:\n{2}". + format(string, type_name, columnate(lst))) return parse_type return middle @@ -297,30 +414,42 @@ argparse_force_uppercase_type = argparse_force_type(str.upper) argparse_force_lowercase_type = argparse_force_type(str.lower) -# An argument parser combinator that takes in an argument parser and creates a new parser that -# accepts a comma separated list of the same thing. -def argparse_many(fn): +def argparse_many(func): + """ An argument parser combinator that takes in an argument parser and + creates a new parser that accepts a comma separated list of the same thing. + """ def wrap(string): - return [fn(s) for s in string.split(",")] + """ The actual parser""" + return [func(s) for s in string.split(",")] return wrap -# An argument parser that verifies that a string passed in corresponds to a file -def argparse_filestring_type(string) : - if exists(string) : +def argparse_filestring_type(string): + """ An argument parser that verifies that a string passed in corresponds + to a file""" + if exists(string): return string - else : - raise argparse.ArgumentTypeError("{0}"" does not exist in the filesystem.".format(string)) + else: + raise argparse.ArgumentTypeError( + "{0}"" does not exist in the filesystem.".format(string)) + +def columnate(strings, separator=", ", chars=80): + """ render a list of strings as a in a bunch of columns -# render a list of strings as a in a bunch of columns -def columnate(strings, seperator=", ", chars=80): + Positional arguments: + strings - the strings to columnate + + Keyword arguments; + separator - the separation between the columns + chars - the maximum with of a row + """ col_width = max(len(s) for s in strings) - total_width = col_width + len(seperator) + total_width = col_width + len(separator) columns = math.floor(chars / total_width) output = "" - for i, s in zip(range(len(strings)), strings): - append = s + for i, string in zip(range(len(strings)), strings): + append = string if i != len(strings) - 1: - append += seperator + append += separator if i % columns == columns - 1: append += "\n" else: @@ -328,13 +457,16 @@ output += append return output -# fail if argument provided is a parent of the specified directory def argparse_dir_not_parent(other): + """fail if argument provided is a parent of the specified directory""" def parse_type(not_parent): + """The parser type""" abs_other = abspath(other) abs_not_parent = abspath(not_parent) if abs_not_parent == commonprefix([abs_not_parent, abs_other]): - raise argparse.ArgumentTypeError("{0} may not be a parent directory of {1}".format(not_parent, other)) + raise argparse.ArgumentTypeError( + "{0} may not be a parent directory of {1}".format( + not_parent, other)) else: return not_parent return parse_type