Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: utils.py
- Revision:
- 29:1210849dba19
- Parent:
- 24:25bff2709c20
- Child:
- 31:8ea194f6145b
--- a/utils.py Mon Aug 29 10:55:42 2016 +0100
+++ b/utils.py Mon Aug 29 11:18:36 2016 +0100
@@ -21,18 +21,26 @@
import math
from os import listdir, remove, makedirs
from shutil import copyfile
-from os.path import isdir, join, exists, split, relpath, splitext, abspath, commonprefix, normpath
+from os.path import isdir, join, exists, split, relpath, splitext, abspath
+from os.path import commonprefix, normpath
from subprocess import Popen, PIPE, STDOUT, call
import json
from collections import OrderedDict
import logging
def compile_worker(job):
+ """Standard task runner used for compiling
+
+ Positional argumets:
+ job - a dict containing a list of commands and the remaining arguments
+ to run_cmd
+ """
results = []
for command in job['commands']:
try:
- _, _stderr, _rc = run_cmd(command, work_dir=job['work_dir'], chroot=job['chroot'])
- except KeyboardInterrupt as e:
+ _, _stderr, _rc = run_cmd(command, work_dir=job['work_dir'],
+ chroot=job['chroot'])
+ except KeyboardInterrupt:
raise ToolException
results.append({
@@ -48,96 +56,143 @@
'results': results
}
-def cmd(l, check=True, verbose=False, shell=False, cwd=None):
- text = l if shell else ' '.join(l)
+def cmd(command, check=True, verbose=False, shell=False, cwd=None):
+ """A wrapper to run a command as a blocking job"""
+ text = command if shell else ' '.join(command)
if verbose:
print text
- rc = call(l, shell=shell, cwd=cwd)
- if check and rc != 0:
- raise Exception('ERROR %d: "%s"' % (rc, text))
+ return_code = call(command, shell=shell, cwd=cwd)
+ if check and return_code != 0:
+ raise Exception('ERROR %d: "%s"' % (return_code, text))
def run_cmd(command, work_dir=None, chroot=None, redirect=False):
+ """Run a command in the forground
+
+ Positional arguments:
+ command - the command to run
+
+ Keyword arguments:
+ work_dir - the working directory to run the command in
+ chroot - the chroot to run the command in
+ redirect - redirect the stderr to a pipe to be used later
+ """
if chroot:
# Conventions managed by the web team for the mbed.org build system
chroot_cmd = [
'/usr/sbin/chroot', '--userspec=33:33', chroot
]
- for c in command:
- chroot_cmd += [c.replace(chroot, '')]
+ for element in command:
+ chroot_cmd += [element.replace(chroot, '')]
- logging.debug("Running command %s"%' '.join(chroot_cmd))
+ logging.debug("Running command %s", ' '.join(chroot_cmd))
command = chroot_cmd
work_dir = None
try:
- p = Popen(command, stdout=PIPE, stderr=STDOUT if redirect else PIPE, cwd=work_dir)
- _stdout, _stderr = p.communicate()
- except OSError as e:
+ process = Popen(command, stdout=PIPE,
+ stderr=STDOUT if redirect else PIPE, cwd=work_dir)
+ _stdout, _stderr = process.communicate()
+ except OSError:
print "[OS ERROR] Command: "+(' '.join(command))
raise
- return _stdout, _stderr, p.returncode
+ return _stdout, _stderr, process.returncode
def run_cmd_ext(command):
+ """ A version of run command that checks if the command exists befor running
+
+ Positional arguments:
+ command - the command line you are trying to invoke
+ """
assert is_cmd_valid(command[0])
- p = Popen(command, stdout=PIPE, stderr=PIPE)
- _stdout, _stderr = p.communicate()
- return _stdout, _stderr, p.returncode
+ process = Popen(command, stdout=PIPE, stderr=PIPE)
+ _stdout, _stderr = process.communicate()
+ return _stdout, _stderr, process.returncode
-def is_cmd_valid(cmd):
+def is_cmd_valid(command):
+ """ Verify that a command exists and is executable
+
+ Positional arguments:
+ command - the command to check
+ """
caller = get_caller_name()
- abspath = find_cmd_abspath(cmd)
- if not abspath:
- error("%s: Command '%s' can't be found" % (caller, cmd))
- if not is_exec(abspath):
- error("%s: Command '%s' resolves to file '%s' which is not executable" % (caller, cmd, abspath))
+ cmd_path = find_cmd_abspath(command)
+ if not cmd_path:
+ error("%s: Command '%s' can't be found" % (caller, command))
+ if not is_exec(cmd_path):
+ error("%s: Command '%s' resolves to file '%s' which is not executable"
+ % (caller, command, cmd_path))
return True
def is_exec(path):
+ """A simple check to verify that a path to an executable exists
+
+ Positional arguments:
+ path - the executable
+ """
return os.access(path, os.X_OK) or os.access(path+'.exe', os.X_OK)
-def find_cmd_abspath(cmd):
+def find_cmd_abspath(command):
""" Returns the absolute path to a command.
None is returned if no absolute path was found.
+
+ Positional arguhments:
+ command - the command to find the path of
"""
- if exists(cmd) or exists(cmd + '.exe'):
- return os.path.abspath(cmd)
+ if exists(command) or exists(command + '.exe'):
+ return os.path.abspath(command)
if not 'PATH' in os.environ:
- raise Exception("Can't find command path for current platform ('%s')" % sys.platform)
- PATH=os.environ['PATH']
- for path in PATH.split(os.pathsep):
- abspath = '%s/%s' % (path, cmd)
- if exists(abspath) or exists(abspath + '.exe'):
- return abspath
+ raise Exception("Can't find command path for current platform ('%s')"
+ % sys.platform)
+ path_env = os.environ['PATH']
+ for path in path_env.split(os.pathsep):
+ cmd_path = '%s/%s' % (path, command)
+ if exists(cmd_path) or exists(cmd_path + '.exe'):
+ return cmd_path
def mkdir(path):
+ """ a wrapped makedirs that only tries to create a directory if it does not
+ exist already
+
+ Positional arguments:
+ path - the path to maybe create
+ """
if not exists(path):
makedirs(path)
def copy_file(src, dst):
""" Implement the behaviour of "shutil.copy(src, dst)" without copying the
- permissions (this was causing errors with directories mounted with samba)
+ permissions (this was causing errors with directories mounted with samba)
+
+ Positional arguments:
+ src - the source of the copy operation
+ dst - the destination of the copy operation
"""
if isdir(dst):
- _, file = split(src)
- dst = join(dst, file)
+ _, base = split(src)
+ dst = join(dst, base)
copyfile(src, dst)
-def delete_dir_files(dir):
- if not exists(dir):
+def delete_dir_files(directory):
+ """ A function that does rm -rf
+
+ Positional arguments:
+ directory - the directory to remove
+ """
+ if not exists(directory):
return
- for f in listdir(dir):
- file = join(dir, f)
- if not isdir(file):
+ for element in listdir(directory):
+ to_remove = join(directory, element)
+ if not isdir(to_remove):
remove(file)
@@ -145,34 +200,58 @@
"""
When called inside a function, it returns the name
of the caller of that function.
+
+ Keyword arguments:
+ steps - the number of steps up the stack the calling function is
"""
return inspect.stack()[steps][3]
def error(msg):
+ """Fatal error, abort hard
+
+ Positional arguments:
+ msg - the message to print before crashing
+ """
print("ERROR: %s" % msg)
sys.exit(1)
def rel_path(path, base, dot=False):
- p = relpath(path, base)
- if dot and not p.startswith('.'):
- p = './' + p
- return p
+ """Relative path calculation that optionaly always starts with a dot
+
+ Positional arguments:
+ path - the path to make relative
+ base - what to make the path relative to
+
+ Keyword arguments:
+ dot - if True, the path will always start with a './'
+ """
+ final_path = relpath(path, base)
+ if dot and not final_path.startswith('.'):
+ final_path = './' + final_path
+ return final_path
class ToolException(Exception):
+ """A class representing an exception throw by the tools"""
pass
class NotSupportedException(Exception):
+ """A class a toolchain not supporting a particular target"""
pass
class InvalidReleaseTargetException(Exception):
pass
def split_path(path):
- base, file = split(path)
- name, ext = splitext(file)
+ """spilt a file name into it's directory name, base name, and extension
+
+ Positional arguments:
+ path - the file name to split
+ """
+ base, has_ext = split(path)
+ name, ext = splitext(has_ext)
return base, name, ext
@@ -181,12 +260,15 @@
This roughly translates to the number of path separators (os.sep) + 1.
Ex. Given "path/to/dir", this would return 3
Special cases: "." and "/" return 0
+
+ Positional arguments:
+ path - the path to calculate the depth of
"""
normalized_path = normpath(path)
path_depth = 0
head, tail = split(normalized_path)
- while(tail and tail != '.'):
+ while tail and tail != '.':
path_depth += 1
head, tail = split(head)
@@ -194,18 +276,28 @@
def args_error(parser, message):
+ """Abort with an error that was generated by the arguments to a CLI program
+
+ Positional arguments:
+ parser - the ArgumentParser object that parsed the command line
+ message - what went wrong
+ """
print "\n\n%s\n\n" % message
parser.print_help()
sys.exit()
def construct_enum(**enums):
- """ Create your own pseudo-enums """
+ """ Create your own pseudo-enums
+
+ Keyword arguments:
+ * - a member of the Enum you are creating and it's value
+ """
return type('Enum', (), enums)
def check_required_modules(required_modules, verbose=True):
- """ Function checks for Python modules which should be "importable" (installed)
+ """ Function checks for Python modules which should be "importable"
before test suite can be used.
@return returns True if all modules are installed already
"""
@@ -214,63 +306,84 @@
for module_name in required_modules:
try:
imp.find_module(module_name)
- except ImportError as e:
+ except ImportError:
# We also test against a rare case: module is an egg file
try:
__import__(module_name)
- except ImportError as e:
+ except ImportError as exc:
not_installed_modules.append(module_name)
if verbose:
- print "Error: %s" % e
+ print "Error: %s" % exc
if verbose:
if not_installed_modules:
- print "Warning: Module(s) %s not installed. Please install required module(s) before using this script."% (', '.join(not_installed_modules))
+ print ("Warning: Module(s) %s not installed. Please install " + \
+ "required module(s) before using this script.")\
+ % (', '.join(not_installed_modules))
if not_installed_modules:
return False
else:
return True
-# Utility function: traverse a dictionary and change all the strings in the dictionary to
-# ASCII from Unicode. Useful when reading ASCII JSON data, because the JSON decoder always
-# returns Unicode string.
-# Based on http://stackoverflow.com/a/13105359
-def dict_to_ascii(input):
- if isinstance(input, dict):
- return OrderedDict([(dict_to_ascii(key), dict_to_ascii(value)) for key, value in input.iteritems()])
- elif isinstance(input, list):
- return [dict_to_ascii(element) for element in input]
- elif isinstance(input, unicode):
- return input.encode('ascii')
+def dict_to_ascii(dictionary):
+ """ Utility function: traverse a dictionary and change all the strings in
+ the dictionary to ASCII from Unicode. Useful when reading ASCII JSON data,
+ because the JSON decoder always returns Unicode string. Based on
+ http://stackoverflow.com/a/13105359
+
+ Positional arguments:
+ dictionary - The dict that contains some Unicode that should be ASCII
+ """
+ if isinstance(dictionary, dict):
+ return OrderedDict([(dict_to_ascii(key), dict_to_ascii(value))
+ for key, value in dictionary.iteritems()])
+ elif isinstance(dictionary, list):
+ return [dict_to_ascii(element) for element in dictionary]
+ elif isinstance(dictionary, unicode):
+ return dictionary.encode('ascii')
else:
- return input
+ return dictionary
+
+def json_file_to_dict(fname):
+ """ Read a JSON file and return its Python representation, transforming all
+ the strings from Unicode to ASCII. The order of keys in the JSON file is
+ preserved.
-# Read a JSON file and return its Python representation, transforming all the strings from Unicode
-# to ASCII. The order of keys in the JSON file is preserved.
-def json_file_to_dict(fname):
+ Positional arguments:
+ fname - the name of the file to parse
+ """
try:
- with open(fname, "rt") as f:
- return dict_to_ascii(json.load(f, object_pairs_hook=OrderedDict))
+ with open(fname, "r") as file_obj:
+ return dict_to_ascii(json.load(file_obj,
+ object_pairs_hook=OrderedDict))
except (ValueError, IOError):
sys.stderr.write("Error parsing '%s':\n" % fname)
raise
# Wowza, double closure
-def argparse_type(casedness, prefer_hyphen=False) :
- def middle(list, type_name):
- # validate that an argument passed in (as string) is a member of the list of possible
- # arguments. Offer a suggestion if the case of the string, or the hyphens/underscores
- # do not match the expected style of the argument.
+def argparse_type(casedness, prefer_hyphen=False):
+ def middle(lst, type_name):
def parse_type(string):
- if prefer_hyphen: newstring = casedness(string).replace("_","-")
- else: newstring = casedness(string).replace("-","_")
- if string in list:
+ """ validate that an argument passed in (as string) is a member of
+ the list of possible arguments. Offer a suggestion if the case of
+ the string, or the hyphens/underscores do not match the expected
+ style of the argument.
+ """
+ if prefer_hyphen:
+ newstring = casedness(string).replace("_", "-")
+ else:
+ newstring = casedness(string).replace("-", "_")
+ if string in lst:
return string
- elif string not in list and newstring in list:
- raise argparse.ArgumentTypeError("{0} is not a supported {1}. Did you mean {2}?".format(string, type_name, newstring))
+ elif string not in lst and newstring in lst:
+ raise argparse.ArgumentTypeError(
+ "{0} is not a supported {1}. Did you mean {2}?".format(
+ string, type_name, newstring))
else:
- raise argparse.ArgumentTypeError("{0} is not a supported {1}. Supported {1}s are:\n{2}".format(string, type_name, columnate(list)))
+ raise argparse.ArgumentTypeError(
+ "{0} is not a supported {1}. Supported {1}s are:\n{2}".
+ format(string, type_name, columnate(lst)))
return parse_type
return middle
@@ -281,15 +394,19 @@
argparse_lowercase_hyphen_type = argparse_type(str.lower, True)
def argparse_force_type(case):
- def middle(list, type_name):
- # validate that an argument passed in (as string) is a member of the list of possible
- # arguments after converting it's case. Offer a suggestion if the hyphens/underscores
- # do not match the expected style of the argument.
+ """ validate that an argument passed in (as string) is a member of the list
+ of possible arguments after converting it's case.
+ """
+ def middle(lst, type_name):
+ """ The parser type generator"""
def parse_type(string):
- for option in list:
+ """ The parser type"""
+ for option in lst:
if case(string) == case(option):
return option
- raise argparse.ArgumentTypeError("{0} is not a supported {1}. Supported {1}s are:\n{2}".format(string, type_name, columnate(list)))
+ raise argparse.ArgumentTypeError(
+ "{0} is not a supported {1}. Supported {1}s are:\n{2}".
+ format(string, type_name, columnate(lst)))
return parse_type
return middle
@@ -297,30 +414,42 @@
argparse_force_uppercase_type = argparse_force_type(str.upper)
argparse_force_lowercase_type = argparse_force_type(str.lower)
-# An argument parser combinator that takes in an argument parser and creates a new parser that
-# accepts a comma separated list of the same thing.
-def argparse_many(fn):
+def argparse_many(func):
+ """ An argument parser combinator that takes in an argument parser and
+ creates a new parser that accepts a comma separated list of the same thing.
+ """
def wrap(string):
- return [fn(s) for s in string.split(",")]
+ """ The actual parser"""
+ return [func(s) for s in string.split(",")]
return wrap
-# An argument parser that verifies that a string passed in corresponds to a file
-def argparse_filestring_type(string) :
- if exists(string) :
+def argparse_filestring_type(string):
+ """ An argument parser that verifies that a string passed in corresponds
+ to a file"""
+ if exists(string):
return string
- else :
- raise argparse.ArgumentTypeError("{0}"" does not exist in the filesystem.".format(string))
+ else:
+ raise argparse.ArgumentTypeError(
+ "{0}"" does not exist in the filesystem.".format(string))
+
+def columnate(strings, separator=", ", chars=80):
+ """ render a list of strings as a in a bunch of columns
-# render a list of strings as a in a bunch of columns
-def columnate(strings, seperator=", ", chars=80):
+ Positional arguments:
+ strings - the strings to columnate
+
+ Keyword arguments;
+ separator - the separation between the columns
+ chars - the maximum with of a row
+ """
col_width = max(len(s) for s in strings)
- total_width = col_width + len(seperator)
+ total_width = col_width + len(separator)
columns = math.floor(chars / total_width)
output = ""
- for i, s in zip(range(len(strings)), strings):
- append = s
+ for i, string in zip(range(len(strings)), strings):
+ append = string
if i != len(strings) - 1:
- append += seperator
+ append += separator
if i % columns == columns - 1:
append += "\n"
else:
@@ -328,13 +457,16 @@
output += append
return output
-# fail if argument provided is a parent of the specified directory
def argparse_dir_not_parent(other):
+ """fail if argument provided is a parent of the specified directory"""
def parse_type(not_parent):
+ """The parser type"""
abs_other = abspath(other)
abs_not_parent = abspath(not_parent)
if abs_not_parent == commonprefix([abs_not_parent, abs_other]):
- raise argparse.ArgumentTypeError("{0} may not be a parent directory of {1}".format(not_parent, other))
+ raise argparse.ArgumentTypeError(
+ "{0} may not be a parent directory of {1}".format(
+ not_parent, other))
else:
return not_parent
return parse_type
