Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependencies: mbed-os
neo.py
- Committer:
- screamer
- Date:
- 2016-04-01
- Revision:
- 60:52dc8fcb07d8
- Parent:
- 54:3cb9f99bbaaa
- Child:
- 62:d6a6204fb327
File content as of revision 60:52dc8fcb07d8:
#!/usr/bin/env python
import argparse
import sys
import re
import subprocess
import os
import contextlib
import shutil
from collections import *
from itertools import *
# Default paths to Mercurial and Git
hg_cmd = 'hg'
git_cmd = 'git'
# Subparser handling
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
def subcommand(name, *args, **kwargs):
def subcommand(command):
subparser = subparsers.add_parser(name, **kwargs)
for arg in args:
if arg.endswith('?'):
subparser.add_argument(arg.strip('?'), nargs='?')
elif arg.endswith('*'):
pass
else:
subparser.add_argument(arg)
def thunk(parsed_args):
ordered_args = [vars(parsed_args)[arg.strip('?*')]
if not arg.endswith('*') else remainder
for arg in args]
return command(*ordered_args)
subparser.set_defaults(command=thunk)
return command
return subcommand
# Process execution
class ProcessException(Exception):
pass
def popen(command, stdin=None, **kwargs):
# print for debugging
print ' '.join(command)
proc = subprocess.Popen(command, **kwargs)
if proc.wait() != 0:
raise ProcessException(proc.returncode)
def pquery(command, stdin=None, **kwargs):
proc = subprocess.Popen(command, stdout=subprocess.PIPE, **kwargs)
stdout, _ = proc.communicate(stdin)
if proc.returncode != 0:
raise ProcessException(proc.returncode)
return stdout
# Directory navigation
@contextlib.contextmanager
def cd(newdir):
prevdir = os.getcwd()
os.chdir(newdir)
try:
yield
finally:
os.chdir(prevdir)
# Handling for multiple version controls
scms = OrderedDict()
def scm(name):
def scm(cls):
scms[name] = cls()
return cls
return scm
def staticclass(cls):
for k, v in cls.__dict__.items():
if hasattr(v, '__call__') and not k.startswith('__'):
setattr(cls, k, staticmethod(v))
return cls
@scm('hg')
@staticclass
class Hg(object):
def clone(url, name=None, hash=None):
popen([hg_cmd, 'clone', url, name] + (['-u', hash] if hash else []))
def add(file): popen([hg_cmd, 'add', file])
def remove(file):
popen([hg_cmd, 'rm', '-f', file])
try:
os.remove(file)
except OSError:
pass
def commit(): popen([hg_cmd, 'commit'])
def push(): popen([hg_cmd, 'push'])
def pull(hash=None):
popen([hg_cmd, 'pull'])
popen([hg_cmd, 'update'] + (['-r', hash] if hash else []))
def hash(): return pquery([hg_cmd, 'id', '-i']).strip().strip('+')
def dirty(): return pquery([hg_cmd, 'status', '-q'])
def ignore(file):
hooked = False
hook = 'ignore.local = .hg/hgignore'
with open('.hg/hgrc') as f:
if hook not in f.read().splitlines():
with open('.hg/hgrc', 'a') as f:
f.write('[ui]\n')
f.write(hook + '\n')
file = '^%s/' % file
exclude = '.hg/hgignore'
with open(exclude, 'a') as f:
f.write(file + '\n')
def unignore(file):
file = '^%s/' % file
exclude = '.hg/hgignore'
if not os.path.isfile(exclude):
return
with open(exclude) as f:
lines = f.read().splitlines()
if file not in lines:
return
lines.remove(file)
with open(exclude, 'w') as f:
f.write('\n'.join(lines) + '\n')
@scm('git')
@staticclass
class Git(object):
def clone(url, name=None, hash=None):
popen([git_cmd, 'clone', url, name])
if hash:
with cd(name):
popen([git_cmd, 'checkout', '-q', hash])
def add(file): popen([git_cmd, 'add', file])
def remove(file): popen([git_cmd, 'rm', '-f', file])
def commit(): popen([git_cmd, 'commit', '-a'])
def push(): popen([git_cmd, 'push', '--all'])
def pull(hash=None):
popen([git_cmd, 'fetch', 'origin'])
popen([git_cmd, 'merge'] + ([hash] if hash else []))
def hash(): return pquery([git_cmd, 'rev-parse', '--short', 'HEAD']).strip()
def dirty(): return pquery([git_cmd, 'diff', '--name-only', 'HEAD'])
def ignore(file):
exclude = '.git/info/exclude'
with open(exclude, 'a') as f:
f.write(file + '\n')
def unignore(file):
exclude = '.git/info/exclude'
if not os.path.isfile(exclude):
return
with open(exclude) as f:
lines = f.read().splitlines()
if file not in lines:
return
lines.remove(file)
with open(exclude, 'w') as f:
f.write('\n'.join(lines) + '\n')
# Repository object
class Repo(object):
@classmethod
def fromurl(cls, url, path=None):
repo = cls()
m = re.match('^(.*/([\w+-]+)(?:\.\w+)?)/?(?:#(.*))?$', url.strip())
repo.name = os.path.basename(path or m.group(2))
repo.path = os.path.abspath(
path or os.path.join(os.getcwd(), repo.name))
repo.repo = m.group(1)
repo.hash = m.group(3)
return repo
@classmethod
def fromlib(cls, lib=None):
assert lib.endswith('.lib')
with open(lib) as f:
return cls.fromurl(f.read(), lib[:-4])
@classmethod
def fromrepo(cls, path=None):
repo = cls()
repo.path = os.path.abspath(path or os.getcwd())
repo.name = os.path.basename(repo.path)
repo.synch()
return repo
@property
def lib(self):
return self.path + '.lib'
@property
def url(self):
if self.repo:
return self.repo + '/' + ('#'+self.hash if self.hash else '')
def synch(self):
if os.path.isdir(self.path):
try:
self.scm = self.getscm()
self.hash = self.gethash()
self.libs = list(self.getlibs())
except ProcessException:
pass
if os.path.isfile(self.lib):
try:
self.repo = self.getrepo()
except ProcessException:
pass
def getscm(self):
for name, scm in scms.items():
if os.path.isdir(os.path.join(self.path, '.'+name)):
return scm
def gethash(self):
with cd(self.path):
return self.scm.hash()
def getlibs(self):
for root, dirs, files in os.walk(self.path):
dirs[:] = [d for d in dirs if not d.startswith('.')]
files[:] = [f for f in files if not f.startswith('.')]
for file in files:
if file.endswith('.lib'):
yield Repo.fromlib(os.path.join(root, file))
if file[:-4] in dirs:
dirs.remove(file[:-4])
def getrepo(self):
with open(self.lib) as f:
return Repo.fromurl(f.read()).repo
def write(self):
if os.path.isfile(self.lib):
with open(self.lib) as f:
if f.read().strip() == self.url.strip():
print self.name, 'unmodified'
return
with open(self.lib, 'w') as f:
f.write(self.url + '\n')
print self.name, '->', self.url
# Clone command
@subcommand('import', 'url', 'name?',
help='recursively import a repository')
def import_(url, path=None):
repo = Repo.fromurl(url, path)
for scm in scms.values():
try:
scm.clone(repo.repo, repo.path, repo.hash)
break
except ProcessException:
pass
repo.synch()
with cd(repo.path):
for lib in repo.libs:
import_(lib.url, lib.path)
repo.scm.ignore(lib.path[len(repo.path)+1:])
if (not os.path.isfile('mbed_settings.py') and
os.path.isfile('mbed-os/tools/settings.py')):
shutil.copy('mbed-os/tools/default_settings.py', 'mbed_settings.py')
# Deploy command
@subcommand('deploy',
help='recursively import libraries in current directory')
def deploy():
repo = Repo.fromrepo()
for lib in repo.libs:
import_(lib.url, lib.path)
repo.scm.ignore(lib.path[len(repo.path)+1:])
# Install/uninstall command
@subcommand('add', 'url', 'name?',
help='add a library to the current repository')
def add(url, path=None):
repo = Repo.fromrepo()
lib = Repo.fromurl(url, path)
import_(lib.url, lib.path)
repo.scm.ignore(lib.path[len(repo.path)+1:])
lib.synch()
lib.write()
repo.scm.add(lib.lib)
@subcommand('remove', 'path',
help='remove a library from the current repository folder')
def remove(path):
repo = Repo.fromrepo()
lib = Repo.fromrepo(path)
repo.scm.remove(lib.lib)
shutil.rmtree(lib.path)
repo.scm.unignore(lib.path[len(repo.path)+1:])
# Publish command
@subcommand('publish',
help='recursively publish changes to remote repositories')
def publish(always=True):
repo = Repo.fromrepo()
for lib in repo.libs:
with cd(lib.path):
publish(False)
synch()
dirty = repo.scm.dirty()
if dirty:
print 'Uncommitted changes in %s (%s)' % (repo.name, repo.path)
raw_input('Press enter to commit and push ')
repo.scm.commit()
if dirty or always:
try:
repo.scm.push()
except ProcessException as e:
sys.exit(e[0])
# Update command
@subcommand('update', 'ref?',
help='recursively updates libraries and current repository')
def update(ref=None):
repo = Repo.fromrepo()
repo.scm.pull(ref)
for lib in repo.libs:
if (not os.path.isfile(lib.lib) or
lib.repo != Repo.fromrepo(lib.path).repo):
with cd(lib.path):
if lib.cwd.dirty():
sys.stderr.write('Uncommitted changes in %s (%s)\n'
% (lib.name, lib.path))
sys.exit(1)
shutil.rmtree(lib.path)
repo.scm.unignore(lib.path[len(repo.path)+1:])
repo.synch()
for lib in repo.libs:
if os.path.isdir(lib.path):
with cd(lib.path):
update(lib.hash)
else:
import_(lib.url, lib.path)
repo.scm.ignore(lib.path[len(repo.path)+1:])
# Synch command
@subcommand('synch',
help='synchronize lib files')
def synch():
repo = Repo.fromrepo()
for lib in repo.libs:
lib.synch()
lib.write()
# Compile command
@subcommand('compile', 'args*',
help='compile project using workspace_tools')
def compile(args):
if not os.path.isdir('mbed-os'):
sys.stderr.write('Warning! mbed-os not found?\n')
sys.exit(-1)
repo = Repo.fromrepo()
macros = []
if os.path.isfile('MACROS.txt'):
with open('MACROS.txt') as f:
macros = f.read().splitlines()
env = os.environ.copy()
env['PYTHONPATH'] = '.'
popen(['python', 'mbed-os/tools/make.py']
+ list(chain.from_iterable(izip(repeat('-D'), macros)))
+ ['--source=%s' % repo.path,
'--build=%s' % os.path.join(repo.path, '.build')]
+ args,
env=env)
# Export command
@subcommand('export', 'args*',
help='generate project files')
def export(args):
if not os.path.isdir('mbed-os'):
sys.stderr.write('Warning! mbed-os not found?\n')
sys.exit(-1)
repo = Repo.fromrepo()
macros = []
if os.path.isfile('MACROS.txt'):
with open('MACROS.txt') as f:
macros = f.read().splitlines()
env = os.environ.copy()
env['PYTHONPATH'] = '.'
popen(['python', 'mbed-os/tools/project.py',
'--source=%s' % repo.path]
+ list(chain.from_iterable(izip(repeat('-D'), macros)))
+ args,
env=env)
# Parse/run command
args, remainder = parser.parse_known_args()
status = args.command(args)
sys.exit(status or 0)