Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive, or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: mbed-os
neo.py
- Committer:
- Christopher Haster 
- Date:
- 2016-03-30
- Revision:
- 69:eb600c35d6ac
- Parent:
- 68:14cc5a81333d
- Parent:
- 67:c3b033f16424
- Child:
- 70:e6f7587c6562
- Child:
- 72:08d69d476038
File content as of revision 69:eb600c35d6ac:
#!/usr/bin/env python
import argparse
import sys
import re
import subprocess
import os
import contextlib
import shutil
from collections import *
from itertools import *
# Default paths to Mercurial and Git
# (used as the first element of every command line built by the Hg and Git
# backend classes in this file)
hg_cmd = 'hg'
git_cmd = 'git'
# Subparser handling
# A single top-level parser; every function decorated with @subcommand adds
# its own sub-parser to `subparsers`.
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
def log(msg):
    """Write an informational message to stderr after a '---' separator line.

    msg -- text to report; a newline is appended after it.
    """
    sys.stderr.write("".join(("---\n", msg, "\n")))
def error(msg, code):
    """Report a fatal error on stderr and terminate the process.

    msg  -- description of the failure
    code -- exit status handed to sys.exit()
    """
    report = "---\nERROR: " + msg + "\n---\n"
    sys.stderr.write(report)
    sys.exit(code)
    
def subcommand(name, *args, **kwargs):
    """Decorator factory that registers a function as a command-line subcommand.

    name     -- subcommand name passed to subparsers.add_parser()
    *args    -- argument specs, in the order the values are passed to the
                decorated function: 'x' is a required positional, 'x?' an
                optional positional (nargs='?'), and 'x*' receives the
                module-level `remainder` list instead of a parsed value
    **kwargs -- forwarded unchanged to subparsers.add_parser() (e.g. help=...)

    The decorated function itself is returned unmodified; a thunk stored as
    the sub-parser's `command` default adapts the parsed namespace to the
    function's positional parameters.
    """
    def register(command):
        subparser = subparsers.add_parser(name, **kwargs)

        for spec in args:
            if spec.endswith('*'):
                # Not declared on the parser; filled from `remainder` below.
                continue
            if spec.endswith('?'):
                subparser.add_argument(spec.strip('?'), nargs='?')
            else:
                subparser.add_argument(spec)

        def thunk(parsed_args):
            namespace = vars(parsed_args)
            ordered_args = []
            for spec in args:
                if spec.endswith('*'):
                    # `remainder` is looked up at call time, after parsing.
                    ordered_args.append(remainder)
                else:
                    ordered_args.append(namespace[spec.strip('?*')])
            return command(*ordered_args)

        subparser.set_defaults(command=thunk)
        return command
    return register
def repo_or_die():
    """Exit with an error (code -1) when the current folder has no detected SCM."""
    if Repo.fromrepo().scm is None:
        error("Current folder is not a repository", -1)
    
# Process execution
class ProcessException(Exception):
    """Raised when an external command exits with a non-zero status.

    popen() and pquery() construct it with the exit status as sole argument.
    """
def popen(command, stdin=None, **kwargs):
    """Run an external command, echoing it first, and raise if it fails.

    command  -- argument list for subprocess.Popen
    stdin    -- optional data to send to the command's standard input
                (same meaning as the `stdin` parameter of pquery())
    **kwargs -- forwarded to subprocess.Popen (e.g. env=...)

    Raises ProcessException carrying the exit status when the command
    returns non-zero.
    """
    # print for debugging; the parenthesised single-argument form prints the
    # same text under both Python 2 and Python 3
    print(' '.join(command))
    if stdin is not None:
        # The parameter used to be accepted and silently dropped. The data
        # can only reach the child through a pipe, so open one.
        kwargs['stdin'] = subprocess.PIPE
    proc = subprocess.Popen(command, **kwargs)
    # communicate() writes `stdin` (if any), closes the pipe and waits.
    proc.communicate(stdin)
    if proc.returncode != 0:
        raise ProcessException(proc.returncode)
def pquery(command, stdin=None, **kwargs):
    """Run an external command and return what it wrote to standard output.

    command  -- argument list for subprocess.Popen
    stdin    -- optional data to send to the command's standard input
    **kwargs -- forwarded to subprocess.Popen

    Returns the captured stdout exactly as produced by communicate().
    Raises ProcessException carrying the exit status when the command
    returns non-zero.
    """
    if stdin is not None:
        # communicate() can only deliver input through a pipe; without this
        # the data passed as `stdin` never reached the child process.
        kwargs['stdin'] = subprocess.PIPE
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, **kwargs)
    stdout, _ = proc.communicate(stdin)
    if proc.returncode != 0:
        raise ProcessException(proc.returncode)
    return stdout
# Directory navigation
@contextlib.contextmanager
def cd(newdir):
    """Context manager: run the `with` body inside `newdir`.

    The working directory that was current on entry is restored on exit,
    including when the body raises.
    """
    origin = os.getcwd()
    os.chdir(newdir)
    try:
        yield
    finally:
        # Always return to where the caller started.
        os.chdir(origin)
# Handling for multiple version controls
# Registry of SCM backends: name -> backend instance, in registration order.
scms = OrderedDict()
def scm(name):
    """Class decorator factory: register an instance of the class in `scms`.

    name -- key under which the backend instance is stored

    The decorated class is instantiated once with no arguments and is then
    returned unchanged.
    """
    def register(cls):
        backend = cls()
        scms[name] = backend
        return cls
    return register
def staticclass(cls):
    """Class decorator: wrap every callable class attribute in staticmethod.

    Attributes whose names start with a double underscore are left alone, as
    are attributes without a `__call__` attribute. Returns the same class.
    """
    for attr, value in cls.__dict__.items():
        if attr.startswith('__'):
            continue
        if not hasattr(value, '__call__'):
            continue
        setattr(cls, attr, staticmethod(value))
    return cls
@scm('hg')
@staticclass
class Hg(object):
    """Mercurial backend.

    @staticclass turns every function below into a staticmethod, so none of
    them takes `self`. Apart from clone(), they all operate on the repository
    in the current working directory by running the `hg_cmd` executable.
    """
    name = 'hg'

    def clone(url, name=None, hash=None):
        """Clone `url` into directory `name`, optionally at revision `hash`.

        When `name` is omitted, the name Repo.fromurl() derives from the URL
        is used.
        """
        # `name` defaults to None but was concatenated into the log message
        # and the command line unconditionally, which raised TypeError.
        if name is None:
            name = Repo.fromurl(url).name
        log("Cloning "+name+" from "+url)
        popen([hg_cmd, 'clone', url, name] + (['-u', hash] if hash else []))

    def add(file):
        """Schedule `file` for addition (`hg add`)."""
        log("Adding "+file)
        popen([hg_cmd, 'add', file])

    def remove(file):
        """Remove `file` from version control and from disk."""
        log("Removing "+file)
        popen([hg_cmd, 'rm', '-f', file])
        # Best effort: also delete the file itself if it is still on disk.
        try:
            os.remove(file)
        except OSError:
            pass

    def commit():
        """Commit the working directory (`hg commit`)."""
        popen([hg_cmd, 'commit'])

    def push():
        """Push to the remote repository (`hg push`)."""
        log("Pushing to remote repository")
        popen([hg_cmd, 'push'])

    def pull(hash=None):
        """Pull from the remote, then update (to revision `hash` if given)."""
        log("Pulling from remote repository")
        popen([hg_cmd, 'pull'])
        popen([hg_cmd, 'update'] + (['-r', hash] if hash else []))

    def status():
        """Show `hg status` output."""
        popen([hg_cmd, 'status'])

    def hash():
        """Return the output of `hg id -i`, stripped of whitespace and '+'."""
        return pquery([hg_cmd, 'id', '-i']).strip().strip('+')

    def dirty():
        """Return the output of `hg status -q` (empty when nothing is listed)."""
        return pquery([hg_cmd, 'status', '-q'])

    def ignore(file):
        """Add directory `file` to the repository-local ignore list.

        Ensures .hg/hgrc points `ignore.local` at .hg/hgignore, then records
        the pattern '^<file>/' there unless it is already present.
        """
        hook = 'ignore.local = .hg/hgignore'
        hgrc = '.hg/hgrc'
        # The hgrc file may not exist yet (opening it for reading used to
        # raise in that case); treat a missing file as empty.
        existing = ''
        if os.path.isfile(hgrc):
            with open(hgrc) as f:
                existing = f.read()
        if hook not in existing.splitlines():
            with open(hgrc, 'a') as f:
                if existing and not existing.endswith('\n'):
                    # Keep '[ui]' from being glued onto an unterminated line.
                    f.write('\n')
                f.write('[ui]\n')
                f.write(hook + '\n')

        # NOTE: `file` is inserted into the pattern unescaped, matching what
        # unignore() looks for.
        entry = '^%s/' % file
        exclude = '.hg/hgignore'
        known = []
        if os.path.isfile(exclude):
            with open(exclude) as f:
                known = f.read().splitlines()
        # Callers invoke ignore() on every deploy/update; do not pile up
        # duplicate lines.
        if entry not in known:
            with open(exclude, 'a') as f:
                f.write(entry + '\n')

    def unignore(file):
        """Remove directory `file` from the repository-local ignore list."""
        entry = '^%s/' % file
        exclude = '.hg/hgignore'
        if not os.path.isfile(exclude):
            return

        with open(exclude) as f:
            lines = f.read().splitlines()

        if entry not in lines:
            return

        # Drop every occurrence: older runs may have written duplicates, and
        # removing only one would leave the path ignored.
        lines = [line for line in lines if line != entry]

        with open(exclude, 'w') as f:
            f.write('\n'.join(lines) + '\n')
@scm('git')
@staticclass
class Git(object):
    """Git backend.

    @staticclass turns every function below into a staticmethod, so none of
    them takes `self`. Apart from clone(), they all operate on the repository
    in the current working directory by running the `git_cmd` executable.
    """
    name = 'git'

    def clone(url, name=None, hash=None):
        """Clone `url` into directory `name`, then check out `hash` if given.

        When `name` is omitted, the name Repo.fromurl() derives from the URL
        is used, so the directory to enter for the checkout is always known.
        """
        # `name` defaults to None but was concatenated into the log message
        # and passed to git and cd() unconditionally, which raised TypeError.
        if name is None:
            name = Repo.fromurl(url).name
        log("Cloning "+name+" from "+url)
        popen([git_cmd, 'clone', url, name])
        if hash:
            with cd(name):
                popen([git_cmd, 'checkout', '-q', hash])

    def add(file):
        """Stage `file` (`git add`)."""
        log("Adding "+file)
        popen([git_cmd, 'add', file])

    def remove(file):
        """Remove `file` from version control (`git rm -f`)."""
        log("Removing "+file)
        popen([git_cmd, 'rm', '-f', file])

    def commit():
        """Commit all tracked changes (`git commit -a`)."""
        popen([git_cmd, 'commit', '-a'])

    def push():
        """Push all branches to the remote (`git push --all`)."""
        log("Pushing to remote repository")
        popen([git_cmd, 'push', '--all'])

    def pull(hash=None):
        """Fetch from origin, then run `git merge` (with `hash` if given)."""
        log("Pulling from remote repository")
        popen([git_cmd, 'fetch', 'origin'])
        popen([git_cmd, 'merge'] + ([hash] if hash else []))

    def status():
        """Show `git status -s` output."""
        popen([git_cmd, 'status', '-s'])

    def hash():
        """Return the stripped output of `git rev-parse --short HEAD`."""
        return pquery([git_cmd, 'rev-parse', '--short', 'HEAD']).strip()

    def dirty():
        """Return the output of `git diff --name-only HEAD` (empty when clean)."""
        return pquery([git_cmd, 'diff', '--name-only', 'HEAD'])

    def ignore(file):
        """Add `file` to .git/info/exclude unless it is already listed."""
        exclude = '.git/info/exclude'
        known = []
        if os.path.isfile(exclude):
            with open(exclude) as f:
                known = f.read().splitlines()
        # Callers invoke ignore() on every deploy/update; do not pile up
        # duplicate lines.
        if file not in known:
            with open(exclude, 'a') as f:
                f.write(file + '\n')

    def unignore(file):
        """Remove `file` from .git/info/exclude if it is listed there."""
        exclude = '.git/info/exclude'
        if not os.path.isfile(exclude):
            return

        with open(exclude) as f:
            lines = f.read().splitlines()

        if file not in lines:
            return

        # Drop every occurrence: older runs may have written duplicates, and
        # removing only one would leave the path ignored.
        lines = [line for line in lines if line != file]

        with open(exclude, 'w') as f:
            f.write('\n'.join(lines) + '\n')
# Repository object
class Repo(object):
    """A working directory together with the `.lib` reference file beside it.

    Attributes:
      name -- basename of the working directory
      path -- absolute path of the working directory
      repo -- remote URL without the '#revision' fragment, or None if unknown
      hash -- revision identifier, or None if unknown
      scm  -- backend object from `scms` that manages `path`, or None
      libs -- list of Repo objects, one per .lib file found beneath `path`
    """
    def __init__(self):
        # Every attribute gets a value up front. Each alternate constructor
        # only fills in some of them, and reading the others (e.g. `libs`
        # after a failed clone, `repo` without a .lib file) used to raise
        # AttributeError.
        self.name = None
        self.path = None
        self.repo = None
        self.hash = None
        self.scm = None
        self.libs = []

    @classmethod
    def fromurl(cls, url, path=None):
        """Build a Repo from a reference of the form '<url>[/][#<hash>]'.

        url  -- reference text; surrounding whitespace is ignored
        path -- working directory; defaults to '<cwd>/<name>', where <name>
                is the last path component of the URL

        Raises ValueError when `url` does not have the expected form.
        """
        repo = cls()

        m = re.match(r'^(.*/([\w+-]+)(?:\.\w+)?)/?(?:#(.*))?$', url.strip())
        if m is None:
            raise ValueError("Unrecognised repository URL: %r" % (url,))
        repo.name = os.path.basename(path or m.group(2))
        repo.path = os.path.abspath(
            path or os.path.join(os.getcwd(), repo.name))

        repo.repo = m.group(1)
        repo.hash = m.group(3)
        return repo

    @classmethod
    def fromlib(cls, lib=None):
        """Build a Repo from a `.lib` file; the path is the file name minus '.lib'.

        Raises ValueError when `lib` is missing or does not end in '.lib'.
        """
        # An explicit check instead of `assert`, which is skipped under -O.
        if lib is None or not lib.endswith('.lib'):
            raise ValueError("Not a .lib file: %r" % (lib,))
        with open(lib) as f:
            return cls.fromurl(f.read(), lib[:-4])

    @classmethod
    def fromrepo(cls, path=None):
        """Build a Repo describing the directory `path` (default: the cwd)."""
        repo = cls()
        repo.path = os.path.abspath(path or os.getcwd())
        repo.name = os.path.basename(repo.path)

        repo.synch()
        return repo

    @property
    def lib(self):
        """Path of the `.lib` reference file that sits beside the directory."""
        return self.path + '.lib'

    @property
    def url(self):
        """Reference text '<repo>/[#<hash>]', or None when `repo` is unknown."""
        if self.repo:
            return self.repo + '/' + ('#'+self.hash if self.hash else '')

    def synch(self):
        """Refresh scm, hash and libs from disk, and repo from the .lib file."""
        if os.path.isdir(self.path):
            try:
                self.scm  = self.getscm()
                self.hash = self.gethash()
                self.libs = list(self.getlibs())
            except ProcessException:
                # Deliberately best-effort: a failing SCM command leaves the
                # remaining attributes at their previous values.
                pass

        if os.path.isfile(self.lib):
            try:
                self.repo = self.getrepo()
            except ProcessException:
                pass

    def getscm(self):
        """Return the first backend whose '.<name>' directory exists, else None."""
        for name, scm in scms.items():
            if os.path.isdir(os.path.join(self.path, '.'+name)):
                return scm

    def gethash(self):
        """Return the backend's current revision, or None without a backend."""
        if self.scm:
            with cd(self.path):
                return self.scm.hash()

    def getlibs(self):
        """Yield a Repo for each .lib file beneath `path`.

        Hidden files and directories are skipped, and the directory belonging
        to a .lib file is not descended into.
        """
        for root, dirs, files in os.walk(self.path):
            # Pruning `dirs` in place keeps os.walk out of those directories.
            dirs[:]  = [d for d in dirs  if not d.startswith('.')]
            files[:] = [f for f in files if not f.startswith('.')]

            for file in files:
                if file.endswith('.lib'):
                    yield Repo.fromlib(os.path.join(root, file))
                    if file[:-4] in dirs:
                        dirs.remove(file[:-4])

    def getrepo(self):
        """Return the remote URL recorded in the .lib file."""
        with open(self.lib) as f:
            return Repo.fromurl(f.read()).repo

    def write(self):
        """Write `url` to the .lib file unless it already holds that reference.

        Raises ValueError when the repository URL is unknown.
        """
        url = self.url
        if url is None:
            # Checked before opening the file so that no empty .lib file is
            # left behind.
            raise ValueError("%s has no known repository URL" % self.name)

        if os.path.isfile(self.lib):
            with open(self.lib) as f:
                if f.read().strip() == url.strip():
                    print('%s unmodified' % self.name)
                    return

        with open(self.lib, 'w') as f:
            f.write(url + '\n')

        print('%s -> %s' % (self.name, url))
# Clone command
@subcommand('import', 'url', 'name?',
    help='Import a program tree')
def import_(url, path=None):
    """Clone a repository and, recursively, every library it references.

    url  -- repository reference, optionally ending in '#<revision>'
    path -- target directory; defaults to the name derived from the URL

    Each registered SCM backend is tried in turn until one clone succeeds.
    Exits through error() when no clone succeeds and the target directory
    does not exist.
    """
    repo = Repo.fromurl(url, path)

    for scm in scms.values():
        try:
            scm.clone(repo.repo, repo.path, repo.hash)
            break
        except ProcessException:
            # Not this backend's kind of repository (or the target already
            # exists); try the next backend.
            pass

    # A directory left by an earlier import is still usable, so clone
    # failures are only fatal when there is nothing on disk to work with.
    # This used to surface as an OSError from cd() below.
    if not os.path.isdir(repo.path):
        error("Unable to clone repository (%s)" % url, 1)

    repo.synch()

    with cd(repo.path):
        for lib in repo.libs:
            import_(lib.url, lib.path)
            repo.scm.ignore(lib.path[len(repo.path)+1:])

        # Test for the file that is actually copied; the check used to look
        # at tools/settings.py while copying tools/default_settings.py.
        if (not os.path.isfile('mbed_settings.py') and
            os.path.isfile('mbed-os/tools/default_settings.py')):
            shutil.copy('mbed-os/tools/default_settings.py', 'mbed_settings.py')
# Deploy command
@subcommand('deploy',
    help='Import library in the current program or library')
def deploy():
    """Import every library referenced from the current directory.

    Each library is imported with import_() and its path, relative to the
    current repository, is then added to the SCM's ignore list.
    """
    here = Repo.fromrepo()
    skip = len(here.path) + 1   # drops '<repo path>/' from absolute paths
    for dep in here.libs:
        import_(dep.url, dep.path)
        here.scm.ignore(dep.path[skip:])
# Install/uninstall command
@subcommand('add', 'url', 'name?',
    help='Add a library to the current program or library')
def add(url, path=None):
    """Import a library and record it in the current repository.

    url  -- library reference, optionally ending in '#<revision>'
    path -- target directory; defaults to the name derived from the URL

    The library directory is ignored by the enclosing repository's SCM,
    while its freshly written .lib file is added to it.
    """
    parent = Repo.fromrepo()
    lib = Repo.fromurl(url, path)
    relative = lib.path[len(parent.path)+1:]

    import_(lib.url, lib.path)
    parent.scm.ignore(relative)

    # Re-read the library from disk before writing its .lib reference.
    lib.synch()
    lib.write()
    parent.scm.add(lib.lib)
@subcommand('remove', 'path',
    help='Remove a library from the current program or library')
def remove(path):
    """Delete a library: its .lib file, its directory and its ignore entry.

    path -- directory of the library to remove
    """
    parent = Repo.fromrepo()
    lib = Repo.fromrepo(path)
    relative = lib.path[len(parent.path)+1:]

    parent.scm.remove(lib.lib)
    shutil.rmtree(lib.path)
    parent.scm.unignore(relative)
# Publish command
@subcommand('publish',
    help='Publish working tree to remote repositories')
def publish(always=True):
    """Commit and push the current repository after publishing its libraries.

    always -- push even when there was nothing to commit; the recursive calls
              for libraries pass False

    Libraries are published first and the .lib references are rewritten via
    synch() before the repository's own state is checked. If the push fails,
    the process exits with the failing command's exit status.
    """
    repo_or_die()
    repo = Repo.fromrepo()

    for lib in repo.libs:
        with cd(lib.path):
            publish(False)

    synch()

    dirty = repo.scm.dirty()
    if dirty:
        log('Uncommitted changes in %s (%s)' % (repo.name, repo.path))
        raw_input('Press enter to commit and push: ')
        repo.scm.commit()

    if not (dirty or always):
        return

    try:
        repo.scm.push()
    except ProcessException as e:
        sys.exit(e.args[0])
# Update command
@subcommand('update', 'ref?',
    help='Update current program or library and recursively update all libraries')
def update(ref=None):
    """Pull the current repository and bring all libraries in line with it.

    ref -- revision to update to; the backend's default when omitted

    After the pull, a library whose .lib file disappeared or now names a
    different remote is deleted. This exits through error() if that library
    has uncommitted changes. The remaining libraries are updated recursively
    and newly referenced ones are imported.
    """
    repo_or_die()
    repo = Repo.fromrepo()
    repo.scm.pull(ref)

    # repo.libs still describes the tree as it was before the pull.
    for lib in repo.libs:
        if (os.path.isfile(lib.lib) and
            lib.repo == Repo.fromrepo(lib.path).repo):
            continue

        # The library may never have been deployed; only inspect and delete
        # a directory that actually exists.
        if os.path.isdir(lib.path):
            # `lib` was built from its .lib file, so the SCM has to be
            # detected from the directory itself. The original looked up a
            # non-existent `lib.cwd` attribute here.
            local = Repo.fromrepo(lib.path)
            with cd(lib.path):
                if local.scm and local.scm.dirty():
                    error('Uncommitted changes in %s (%s)\n'
                        % (lib.name, lib.path), 1)
            shutil.rmtree(lib.path)

        repo.scm.unignore(lib.path[len(repo.path)+1:])

    repo.synch()

    for lib in repo.libs:
        if os.path.isdir(lib.path):
            with cd(lib.path):
                update(lib.hash)
        else:
            import_(lib.url, lib.path)
            repo.scm.ignore(lib.path[len(repo.path)+1:])
# Synch command
@subcommand('synch',
    help='Synchronize library references (.lib files)')
def synch():
    """Refresh every library of the current repository and rewrite its .lib file."""
    for dep in Repo.fromrepo().libs:
        dep.synch()
        dep.write()
# Compile command
@subcommand('compile', 'args*',
    help='Compile project using mbed OS build system')
def compile(args):
    """Run mbed-os/tools/make.py on the current repository.

    args -- extra command-line arguments appended to the make.py invocation

    Each line of MACROS.txt, if that file exists, is passed as '-D <line>'.
    The build directory is '<repo>/.build' and PYTHONPATH is set to '.'.
    Exits through error() when there is no 'mbed-os' directory.
    """
    if not os.path.isdir('mbed-os'):
        error('mbed-os not found?\n', -1)

    repo = Repo.fromrepo()

    define_flags = []
    if os.path.isfile('MACROS.txt'):
        with open('MACROS.txt') as f:
            for macro in f.read().splitlines():
                define_flags += ['-D', macro]

    env = dict(os.environ, PYTHONPATH='.')

    command = ['python', 'mbed-os/tools/make.py']
    command += define_flags
    command += ['--source=%s' % repo.path,
                '--build=%s' % os.path.join(repo.path, '.build')]
    popen(command + args, env=env)
# Export command
@subcommand('export', 'args*',
    help='Generate project files for IDE')
def export(args):
    """Run mbed-os/tools/project.py on the current repository.

    args -- extra command-line arguments appended to the project.py invocation

    Each line of MACROS.txt, if that file exists, is passed as '-D <line>'.
    PYTHONPATH is set to '.'. Exits through error() when there is no
    'mbed-os' directory.
    """
    if not os.path.isdir('mbed-os'):
        error('mbed-os not found?\n', -1)

    repo = Repo.fromrepo()

    define_flags = []
    if os.path.isfile('MACROS.txt'):
        with open('MACROS.txt') as f:
            for macro in f.read().splitlines():
                define_flags += ['-D', macro]

    env = dict(os.environ, PYTHONPATH='.')

    command = ['python', 'mbed-os/tools/project.py',
               '--source=%s' % repo.path]
    command += define_flags
    popen(command + args, env=env)
# Helpful status commands
@subcommand('ls',
    help='list repositories recursively')
def list_(prefix=''):
    """Print the current repository and, as an indented tree, its libraries.

    prefix -- tree-drawing text printed before the name; the recursive calls
              build it, and the top-level call uses ''
    """
    repo = Repo.fromrepo()
    print(' '.join((prefix + repo.name, '(%s)' % repo.hash)))

    last = len(repo.libs) - 1
    for i, lib in enumerate(repo.libs):
        # Turn this level's connector into a continuation column: keep the
        # vertical bar under '|- ', leave blanks otherwise.
        if not prefix:
            nprefix = ''
        elif prefix[-3] == '|':
            nprefix = prefix[:-3] + '|  '
        else:
            nprefix = prefix[:-3] + '   '

        if i < last:
            nprefix += '|- '
        else:
            nprefix += '`- '

        with cd(lib.path):
            list_(nprefix)
@subcommand('status',
    help='show status of nested repositories')
def status():
    """Show SCM status for the current repository and all nested libraries.

    A repository is reported only when its backend's dirty() result is truthy.
    """
    repo = Repo.fromrepo()

    if repo.scm.dirty():
        print(' '.join(('---', repo.name, '---')))
        repo.scm.status()

    for dep in repo.libs:
        with cd(dep.path):
            status()
# Parse/run command
# `remainder` (the arguments argparse did not recognise) must stay a module
# global: the thunks created by @subcommand read it for 'x*' parameters.
args, remainder = parser.parse_known_args()
# Run the selected subcommand; a falsy return value means success.
exit_status = args.command(args)
sys.exit(exit_status or 0)