Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: mbed-os
neo.py
- Committer:
- Christopher Haster
- Date:
- 2016-03-30
- Revision:
- 35:ffcfa5ace437
- Parent:
- 34:8d47baea4754
- Child:
- 36:5f4546dde73b
File content as of revision 35:ffcfa5ace437:
#!/usr/bin/env python import argparse import sys import re import subprocess import os import contextlib import shutil from collections import * from itertools import * # Subparser handling parser = argparse.ArgumentParser() subparsers = parser.add_subparsers() def subcommand(name, *args, **kwargs): def subcommand(command): subparser = subparsers.add_parser(name, **kwargs) for arg in args: if arg.endswith('?'): subparser.add_argument(arg.strip('?'), nargs='?') elif arg.endswith('*'): pass else: subparser.add_argument(arg) def thunk(parsed_args): ordered_args = [vars(parsed_args)[arg.strip('?*')] if not arg.endswith('*') else remainder for arg in args] return command(*ordered_args) subparser.set_defaults(command=thunk) return command return subcommand # Process execution class ProcessException(Exception): pass def popen(command, stdin=None, **kwargs): # print for debugging print ' '.join(command) proc = subprocess.Popen(command, **kwargs) if proc.wait() != 0: raise ProcessException(proc.returncode) def pquery(command, stdin=None, **kwargs): proc = subprocess.Popen(command, stdout=subprocess.PIPE, **kwargs) stdout, _ = proc.communicate(stdin) if proc.returncode != 0: raise ProcessException(proc.returncode) return stdout # Directory navigation @contextlib.contextmanager def cd(newdir): prevdir = os.getcwd() os.chdir(newdir) try: yield finally: os.chdir(prevdir) def iterlibs(dir=None): for root, dirs, files in os.walk(dir or os.getcwd()): if os.path.basename(root).startswith('.'): del dirs[:] continue for file in files: if file.startswith('.'): continue elif file.endswith('.lib'): with open(os.path.join(root, file)) as f: yield f.read().strip(), os.path.join(root, file[:-4]) if file[:-4] in dirs: dirs.remove(file[:-4]) def savelib(repo): print repo.name, '->', repo.url with open(repo.lib, 'w') as f: f.write(repo.url + '\n') # Handling for multiple version controls scms = OrderedDict() def scm(name): def scm(cls): scms[name] = cls() return cls return scm def staticclass(cls): for k, v in cls.__dict__.items(): if not k.startswith('__'): setattr(cls, k, staticmethod(v)) return cls @scm('hg') @staticclass class Hg(object): def clone(url, name=None, hash=None): popen(['hg', 'clone', url, name] + (['-u', hash] if hash else [])) def add(file): popen(['hg', 'add', file]) def remove(file): popen(['hg', 'rm', '-f', file]) try: os.remove(file) except OSError: pass def commit(): popen(['hg', 'commit']) def push(): popen(['hg', 'push']) def pull(hash=None): popen(['hg', 'pull']) popen(['hg', 'update'] + (['-r', hash] if hash else [])) def hash(): return pquery(['hg', 'id', '-i']).strip().strip('+') def modified(): return pquery(['hg', 'status', '-q']) @scm('git') @staticclass class Git(object): def clone(url, name=None, hash=None): popen(['git', 'clone', url, name]) if hash: popen(['git', 'checkout', hash]) def add(file): popen(['git', 'add', file]) def remove(file): popen(['git', 'rm', '-f', file]) def commit(): popen(['git', 'commit', '-a']) def hash(): return pquery(['git', 'rev-parse', '--short', 'HEAD']).strip() def modified(): return pquery(['git', 'diff', '--name-only', 'HEAD']) # Repository object class Repo(object): def __init__(self, path=None): self.path = path or os.getcwd() self.name = os.path.basename(self.path) self.update() @classmethod def fromurl(cls, url, name=None): repo = cls.__new__(cls) url = url.strip() m = re.match('^(.*/([+a-zA-Z0-9_-]+)/?)(?:#(.*))?$', url) repo.repo = m.group(1) repo.name = name or m.group(2) repo.hash = m.group(3) repo.path = os.path.join(os.getcwd(), repo.name) return repo @property def lib(self): return self.path + '.lib' @property def url(self): if self.repo: if self.hash: return self.repo + '#' + self.hash else: return self.repo def update(self): if os.path.isdir(self.path): self.type = self.gettype() self.scm = self.getscm() self.hash = self.gethash() if os.path.isfile(self.lib): with open(self.lib) as f: temp = Repo.fromurl(f.read()) self.repo = temp.repo def gettype(self): for type, dir in [('hg', '.hg'), ('git', '.git')]: if os.path.isdir(os.path.join(self.path, dir)): return type def getscm(self): return scms[self.type] def gethash(self): with cd(self.path): return self.scm.hash() # Clone command @subcommand('import', 'url', 'name?', help='recursively import a repository') def import_(url, name=None): repo = Repo.fromurl(url, name) for scm in scms.values(): try: scm.clone(repo.repo, repo.path, repo.hash) break except ProcessException: pass repo.update() with cd(repo.path): for url, lib in iterlibs(): import_(url, lib) if (not os.path.isfile('mbed_settings.py') and os.path.isfile('mbed-os/tools/settings.py')): shutil.copy('mbed-os/tools/default_settings.py', 'mbed_settings.py') # Deploy command @subcommand('deploy', help='recursively import libraries in current directory') def deploy(): for url, lib in iterlibs(): import_(url, lib) # Install/uninstall command @subcommand('add', 'url', 'name?', help='add a library to the current repository') def add(url, name): cwd = Repo() repo = Repo.fromurl(url, name) import_(url) repo.update() savelib(repo) repo.scm.add(repo.lib) @subcommand('remove', 'library', help='remove a library from the current repository folder') def remove(library): cwd = Repo() repo = Repo(library) cwd.scm.remove(repo.lib) shutil.rmtree(repo.path) # Publish command @subcommand('publish', help='recursively publish changes to remote repositories') def publish(always=True): cwd = Repo() for url, lib in iterlibs(): if os.path.isdir(lib): with cd(lib): publish(False) synch() modified = cwd.scm.modified() if modified: print cwd.path print 'Uncommitted changes in %s' % cwd.name raw_input('Press enter to commit and push ') cwd.scm.commit() if modified or always: try: cwd.scm.push() except ProcessException as e: sys.exit(e[0]) # Update command @subcommand('update', 'ref?', help='recursively updates libraries and current repository') def update(ref=None): cwd = Repo() cwd.scm.pull(ref) for url, lib in iterlibs(): repo = Repo.fromurl(url, lib) if os.path.isdir(lib): with cd(repo.path): update(repo.hash) else: import_(url, lib) # Synch command @subcommand('synch', help='synchronize lib files') def synch(): cwd = Repo() for url, lib in iterlibs(): repo = Repo.fromurl(url, lib) repo.update() if url == repo.url: continue savelib(repo) # Compile command @subcommand('compile', 'args*', help='compile project using workspace_tools') def compile(args): cwd = Repo() if not os.path.isdir('mbed-os'): sys.stderr.write('Warning! mbed-os not found?') sys.exit(-1) macros = [] if os.path.isfile('MACROS.txt'): with open('MACROS.txt') as f: macros = f.read().splitlines() env = os.environ.copy() env['PYTHONPATH'] = '.' popen(['python', 'mbed-os/tools/make.py'] + list(chain.from_iterable(izip(repeat('-D'), macros))) + ['--source=%s' % cwd.path, '--build=%s' % os.path.join(cwd.path, '.build')] + args, env=env) # Export command @subcommand('export', 'args*', help='generate project files') def export(args): cwd = Repo() if not os.path.isdir('mbed-os'): sys.stderr.write('Warning! mbed-os not found?') sys.exit(-1) env = os.environ.copy() env['PYTHONPATH'] = '.' popen(['python', 'mbed-os/tools/project.py', '--source=%s' % cwd.path] + args, env=env) # Parse/run command args, remainder = parser.parse_known_args() status = args.command(args) sys.exit(status or 0)