Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: mbed-os
neo.py
- Committer:
- Christopher Haster
- Date:
- 2016-03-30
- Revision:
- 44:5ff277e7f754
- Parent:
- 43:8a4a902a0654
- Child:
- 45:5a91306f7924
File content as of revision 44:5ff277e7f754:
#!/usr/bin/env python import argparse import sys import re import subprocess import os import contextlib import shutil from collections import * from itertools import * # Subparser handling parser = argparse.ArgumentParser() subparsers = parser.add_subparsers() def subcommand(name, *args, **kwargs): def subcommand(command): subparser = subparsers.add_parser(name, **kwargs) for arg in args: if arg.endswith('?'): subparser.add_argument(arg.strip('?'), nargs='?') elif arg.endswith('*'): pass else: subparser.add_argument(arg) def thunk(parsed_args): ordered_args = [vars(parsed_args)[arg.strip('?*')] if not arg.endswith('*') else remainder for arg in args] return command(*ordered_args) subparser.set_defaults(command=thunk) return command return subcommand # Process execution class ProcessException(Exception): pass def popen(command, stdin=None, **kwargs): # print for debugging print ' '.join(command) proc = subprocess.Popen(command, **kwargs) if proc.wait() != 0: raise ProcessException(proc.returncode) def pquery(command, stdin=None, **kwargs): proc = subprocess.Popen(command, stdout=subprocess.PIPE, **kwargs) stdout, _ = proc.communicate(stdin) if proc.returncode != 0: raise ProcessException(proc.returncode) return stdout # Directory navigation @contextlib.contextmanager def cd(newdir): prevdir = os.getcwd() os.chdir(newdir) try: yield finally: os.chdir(prevdir) # Handling for multiple version controls scms = OrderedDict() def scm(name): def scm(cls): scms[name] = cls() return cls return scm def staticclass(cls): for k, v in cls.__dict__.items(): if hasattr(v, '__call__') and not k.startswith('__'): setattr(cls, k, staticmethod(v)) return cls @scm('hg') @staticclass class Hg(object): def clone(url, name=None, hash=None): popen(['hg', 'clone', url, name] + (['-u', hash] if hash else [])) # add exclude file with cd(name): with open('.hg/hgrc', 'a') as f: f.write('[ui]\n') f.write('ignore.local = .hg/exclude\n') def add(file): popen(['hg', 'add', file]) def remove(file): popen(['hg', 'rm', '-f', file]) try: os.remove(file) except OSError: pass def commit(): popen(['hg', 'commit']) def push(): popen(['hg', 'push']) def pull(hash=None): popen(['hg', 'pull']) popen(['hg', 'update'] + (['-r', hash] if hash else [])) def hash(): return pquery(['hg', 'id', '-i']).strip().strip('+') def dirty(): return pquery(['hg', 'status', '-q']) def ignore(file): file = '^%s/' % file exclude = '.hg/exclude' with open(exclude, 'a') as f: f.write(file + '\n') def unignore(file): file = '^%s/' % file exclude = '.hg/exclude' if not os.path.isfile(exclude): return with open(exclude) as f: lines = f.read().splitlines() if file not in lines: return lines.remove(file) with open(exclude, 'w') as f: f.write('\n'.join(lines) + '\n') @scm('git') @staticclass class Git(object): def clone(url, name=None, hash=None): popen(['git', 'clone', url, name]) if hash: with cd(name): popen(['git', 'checkout', '-q', hash]) def add(file): popen(['git', 'add', file]) def remove(file): popen(['git', 'rm', '-f', file]) def commit(): popen(['git', 'commit', '-a']) def push(): popen(['git', 'push', '--all']) def pull(hash=None): popen(['git', 'fetch', 'origin']) popen(['git', 'merge'] + ([hash] if hash else [])) def hash(): return pquery(['git', 'rev-parse', '--short', 'HEAD']).strip() def dirty(): return pquery(['git', 'diff', '--name-only', 'HEAD']) def ignore(file): exclude = '.git/info/exclude' with open(exclude, 'a') as f: f.write(file + '\n') def unignore(file): exclude = '.git/info/exclude' if not os.path.isfile(exclude): return with open(exclude) as f: lines = f.read().splitlines() if file not in lines: return lines.remove(file) with open(exclude, 'w') as f: f.write('\n'.join(lines) + '\n') # Repository object class Repo(object): @classmethod def fromurl(cls, url, path=None): repo = cls() m = re.match('^(.*/([+a-zA-Z0-9_-]+)/?)(?:#(.*))?$', url.strip()) repo.name = os.path.basename(path or m.group(2)) repo.path = os.path.abspath( path or os.path.join(os.getcwd(), repo.name)) repo.repo = m.group(1) repo.hash = m.group(3) return repo @classmethod def fromlib(cls, lib=None): assert lib.endswith('.lib') with open(lib) as f: return cls.fromurl(f.read(), lib[:-4]) @classmethod def fromrepo(cls, path=None): repo = cls() repo.path = os.path.abspath(path or os.getcwd()) repo.name = os.path.basename(repo.path) repo.synch() return repo @property def lib(self): return self.path + '.lib' @property def url(self): if self.repo: return self.repo + ('#'+self.hash if self.hash else '') def synch(self): if os.path.isdir(self.path): try: self.scm = self.getscm() self.hash = self.gethash() self.libs = list(self.getlibs()) except ProcessException: pass if os.path.isfile(self.lib): try: self.repo = self.getrepo() except ProcessException: pass def getscm(self): for name, scm in scms.items(): if os.path.isdir(os.path.join(self.path, '.'+name)): return scm def gethash(self): with cd(self.path): return self.scm.hash() def getlibs(self): for root, dirs, files in os.walk(self.path): dirs[:] = [d for d in dirs if not d.startswith('.')] files[:] = [f for f in files if not f.startswith('.')] for file in files: if file.endswith('.lib'): yield Repo.fromlib(os.path.join(root, file)) def getrepo(self): with open(self.lib) as f: return Repo.fromurl(f.read()).repo def write(self): if os.path.isfile(self.lib): with open(self.lib) as f: if f.read().strip() == self.url.strip(): print self.name, 'unmodified' return with open(self.lib, 'w') as f: f.write(self.url + '\n') print self.name, '->', self.url # Clone command @subcommand('import', 'url', 'name?', help='recursively import a repository') def import_(url, path=None): repo = Repo.fromurl(url, path) for scm in scms.values(): try: scm.clone(repo.repo, repo.path, repo.hash) break except ProcessException: pass repo.synch() with cd(repo.path): for lib in repo.libs: import_(lib.url, lib.path) repo.scm.ignore(lib.path[len(repo.path)+1:]) if (not os.path.isfile('mbed_settings.py') and os.path.isfile('mbed-os/tools/settings.py')): shutil.copy('mbed-os/tools/default_settings.py', 'mbed_settings.py') # Deploy command @subcommand('deploy', help='recursively import libraries in current directory') def deploy(): repo = Repo.fromrepo() for lib in repo.libs: import_(lib.url, lib.path) repo.scm.ignore(lib.path[len(repo.path)+1:]) # Install/uninstall command @subcommand('add', 'url', 'name?', help='add a library to the current repository') def add(url, path=None): repo = Repo.fromrepo() lib = Repo.fromurl(url, path) import_(lib.url, lib.path) repo.scm.ignore(lib.path[len(repo.path)+1:]) lib.synch() lib.write() repo.scm.add(lib.lib) @subcommand('remove', 'path', help='remove a library from the current repository folder') def remove(path): repo = Repo.fromrepo() lib = Repo.fromrepo(path) repo.scm.remove(lib.lib) shutil.rmtree(lib.path) repo.scm.unignore(lib.path[len(repo.path)+1:]) # Publish command @subcommand('publish', help='recursively publish changes to remote repositories') def publish(always=True): repo = Repo.fromrepo() for lib in repo.libs: with cd(lib.path): publish(False) synch() dirty = repo.scm.dirty() if dirty: print 'Uncommitted changes in %s (%s)' % (repo.name, repo.path) raw_input('Press enter to commit and push ') repo.scm.commit() if dirty or always: try: repo.scm.push() except ProcessException as e: sys.exit(e[0]) # Update command @subcommand('update', 'ref?', help='recursively updates libraries and current repository') def update(ref=None): repo = Repo.fromrepo() repo.scm.pull(ref) for lib in repo.libs: if (not os.path.isfile(lib.lib) or lib.repo != Repo.fromrepo(lib.path).repo): with cd(lib.path): if lib.cwd.dirty(): sys.stderr.write('Uncommitted changes in %s (%s)\n' % (lib.name, lib.path)) sys.exit(1) shutil.rmtree(lib.path) repo.scm.unignore(lib.path[len(repo.path)+1:]) repo.synch() for lib in repo.libs: if not os.path.isdir(lib.path): import_(lib.url, lib.path) repo.scm.ignore(lib.path[len(repo.path)+1:]) else: with cd(lib.path): update(lib.hash) # Synch command @subcommand('synch', help='synchronize lib files') def synch(): repo = Repo.fromrepo() for lib in repo.libs: lib.synch() lib.write() # Compile command @subcommand('compile', 'args*', help='compile project using workspace_tools') def compile(args): if not os.path.isdir('mbed-os'): sys.stderr.write('Warning! mbed-os not found?\n') sys.exit(-1) repo = Repo.fromrepo() macros = [] if os.path.isfile('MACROS.txt'): with open('MACROS.txt') as f: macros = f.read().splitlines() env = os.environ.copy() env['PYTHONPATH'] = '.' popen(['python', 'mbed-os/tools/make.py'] + list(chain.from_iterable(izip(repeat('-D'), macros))) + ['--source=%s' % repo.path, '--build=%s' % os.path.join(repo.path, '.build')] + args, env=env) # Export command @subcommand('export', 'args*', help='generate project files') def export(args): if not os.path.isdir('mbed-os'): sys.stderr.write('Warning! mbed-os not found?\n') sys.exit(-1) repo = Repo.fromrepo() macros = [] if os.path.isfile('MACROS.txt'): with open('MACROS.txt') as f: macros = f.read().splitlines() env = os.environ.copy() env['PYTHONPATH'] = '.' popen(['python', 'mbed-os/tools/project.py', '--source=%s' % repo.path] + list(chain.from_iterable(izip(repeat('-D'), macros))) + args, env=env) ## List command #@subcommand('list', # help='list recursive libraries') #def # Parse/run command args, remainder = parser.parse_known_args() status = args.command(args) sys.exit(status or 0)