#!/usr/bin/env python # Josh's SVN wrapper script # # Recommend putting in path as 'jsvn' or something other than 'svn' and # making an alias svn='jsvn'. For example, this .bash_aliases stanza # will check if 'jsvn' is in your path and automatically alias 'svn' to # it if so: # if [[ "$(which jsvn 2>/dev/null)" != "" ]]; then # alias svn='jsvn' # fi # # Implemented subcommands: # add ... # - add files as usual; add recursive contents of directories # branch[es] [[-d] ] # - with no arguments, list branches with '*' by the current one # - with -d, delete # - otherwise, create a new branch from the current one # tag[s] [[-d] ] | [-m ] # - with no arguments, list tags # - with -d, delete # - with -m, rename to # - otherwise, create a new tag from the current branch # switch # - switch to 'trunk', branch name, or tag name without having to specify # the full URL # - falls back to Subversion "switch" if doesn't exist # merge # - merge branch into the current WC path # - falls back to Subversion "merge" if doesn't exist # root # - output root URL (for use on shell such as "svn log $(svn root)/tags") # watch-lock # - block until the lock on a file/URL is released # users # - show a list of contributing users to a SVN path # binaries [--set-lock] # - show a list of versioned binary files under the current path, with # a prepended '*' for those with svn:needs-lock set # - with --set-lock, set svn:needs-lock to '*' for binaries # lockable [--remove] | [--status] # - with no switches, set svn:needs-lock to '*' for file[s] # - with --remove, remove svn:needs-lock' for file[s] # - with --status, prepended '*' for those with svn:needs-lock set # externals # - print a list of the externals in the repository # # The following subcommands are executed using their native handler, but # have their output simplified and/or colorized: # - diff # - log # - status # - update # # If the subcommand name begins with two leading underscores ("__"), the # underscores will be stripped and the command will be handled by native # Subversion without any jsvn processing. import sys import os import re import time from subprocess import * import traceback STATUS_LINE_REGEX = r'[ACDIMRX?!~ ][CM ][L ][+ ][SX ][KOTB ]' ########################################################################### # Subcommand Handler Return Values # ########################################################################### RET_OK = 0 RET_ERR = 1 RET_REEXEC = 2 ########################################################################### # ANSI escape color code values # ########################################################################### COLORS = { 'black': 0, 'red': 1, 'green': 2, 'yellow': 3, 'blue': 4, 'magenta': 5, 'cyan': 6, 'white': 7, } using_color = False ########################################################################### # Configuration # ########################################################################### def get_config(): config = { 'pager': 'less -FRX', 'use_pager': True, 'use_color': True, 'aliases': {}, } pth = os.path.expanduser('~/.jsvn') if os.path.exists(pth): fh = open(pth, 'r') script = fh.read() fh.close() try: exec(script, config) except: sys.stderr.write('Configuration file error in "%s":\n' % pth) traceback.print_exception(sys.exc_info()[0], sys.exc_info()[1], None) tb = traceback.extract_tb(sys.exc_info()[2]) for ent in tb[1:]: lineno, fn = ent[1:3] sys.stderr.write(' File "%s", line %d, in %s\n' % (pth, lineno, fn)) return config ########################################################################### # Utility Functions # ########################################################################### def ansi_color(out, fg=None, bg=None, bold=False): if using_color: bc = 1 if bold else 0 if fg is not None: out.write('\033[%d;%dm' % (bc, 30 + COLORS[fg])) if bg is not None: out.write('\033[%d;%dm' % (bc, 40 + COLORS[bg])) def ansi_reset(out): if using_color: out.write('\033[0m') def colordiff(out, line): if re.match(r'Index:\s', line): ansi_color(out, 'yellow') out.write(line) ansi_reset(out) return if re.match(r'={67}', line): ansi_color(out, 'yellow') out.write(line) ansi_reset(out) return if re.match(r'-', line): ansi_color(out, 'red') out.write(line) ansi_reset(out) return elif re.match(r'\+', line): ansi_color(out, 'green') out.write(line) ansi_reset(out) return m = re.match(r'(@@.*@@)(.*)', line) if m is None: m = re.match(r'(##.*##)(.*)', line) if m is not None: ansi_color(out, 'cyan') out.write(m.group(1)) ansi_reset(out) out.write(m.group(2)) out.write('\n') return out.write(line) def findInPath(cmd): path_entries = os.environ['PATH'].split(os.pathsep) for p in path_entries: full_path = os.path.join(p, cmd) if os.path.exists(full_path): return full_path return '' def getSVNURL(svn): for line in Popen([svn, 'info'], stdout=PIPE).communicate()[0].split('\n'): m = re.match(r'^URL:\s*(.*?)\s*$', line) if m is not None: return m.group(1) return '' def getSVNRoot(svn): url = getSVNURL(svn) parts = url.split('/') for i in range(0, len(parts)): if parts[i] in ('trunk', 'tags', 'branches'): return '/'.join(parts[:i]) return '' def getSVNRelPath(svn): url = getSVNURL(svn) parts = url.split('/') for i in range(0, len(parts) - 1): if parts[i] == 'trunk' or i > 0 and parts[i-1] in ('tags', 'branches'): return '/' + '/'.join(parts[i+1:]) return '/' def getSVNTopLevel(svn): url = getSVNURL(svn) parts = url.split('/') for i in range(0, len(parts)): if parts[i] == 'trunk' or i > 0 and parts[i-1] in ('tags', 'branches'): return '/'.join(parts[:i+1]) return '' def getSVNBranchList(svn): colist = [] root = getSVNRoot(svn) lines = Popen([svn, 'ls', root + '/branches'], stdout=PIPE).communicate()[0].split('\n') for line in lines: if re.match(r'^\s*$', line) is None: colist.append(re.sub(r'/$', '', line)) return colist def getSVNTagList(svn): colist = [] root = getSVNRoot(svn) lines = Popen([svn, 'ls', root + '/tags'], stdout=PIPE).communicate()[0].split('\n') for line in lines: if re.match(r'^\s*$', line) is None: colist.append(re.sub(r'/$', '', line)) return colist def getSVNProperty(svn, prop, path): return Popen([svn, 'propget', prop, path], stdout=PIPE).communicate()[0] def setSVNProperty(svn, prop, val, path): Popen([svn, 'propset', prop, val, path], stdout=PIPE).wait() def delSVNProperty(svn, prop, path): Popen([svn, 'propdel', prop, path], stdout=PIPE).wait() def filter_update(pout, out): external = '' external_printed = True any_external_printed = False for line in iter(pout.readline, ''): m = re.match(r"Fetching external item into '(.*)':", line) if m is not None: external = m.group(1) external_printed = False continue if re.match(r'\s*$', line): continue if re.match(r'External at revision ', line): if external_printed: out.write(line) continue if re.match(r'(Updated.to|At) revision', line): if any_external_printed: out.write('\n') out.write(line) continue # anything not matched yet will cause an external to be shown if not external_printed: out.write("\nExternal '%s':\n" % external) external_printed = True any_external_printed = True if re.match(r'[ADUCGER ]{2}[B ][C ] ', line): action = line[0] if action == 'A': ansi_color(out, 'green') elif action == 'D': ansi_color(out, 'red') elif action == 'C': ansi_color(out, 'yellow') elif action == 'G': ansi_color(out, 'cyan') out.write(line) ansi_reset(out) continue out.write(line) def get_unknowns(svn): unknowns = [] pout = Popen([svn, 'status'], stdout=PIPE).stdout for line in iter(pout.readline, ''): m = re.match(r'\? (.*)$', line) if m is not None: unknowns.append(m.group(1)) return unknowns def descendant_path(child, parent): if child[0] != '/' and parent[0] == '/': child = os.getcwd() + '/' + child elif child[0] == '/' and parent[0] != '/': parent = os.getcwd() + '/' + parent if child == parent: return True if child.startswith(parent): if child[len(parent)] == '/': return True return False ########################################################################### # Subcommand Handlers # ########################################################################### def add(argv, svn, out): if len(argv) < 2: # do not handle if no targets are passed return RET_REEXEC if len(filter(lambda x: x.startswith('-'), argv)) != 0: # do not handle if any options are passed return RET_REEXEC # for each target specified, check if there are unversioned items # underneath it (for directories) and add them as well # if none are found, fall back to the native svn add unknowns = get_unknowns(svn) for path in argv[1:]: if path == '.': path = os.getcwd() if path.endswith('/'): path = path[:-1] found_one = False for u in unknowns: if descendant_path(u, path): Popen([svn, 'add', u], stdout=out).wait() found_one = True if not found_one: Popen([svn, 'add', path], stdout=out).wait() return RET_OK def branch(argv, svn, out): if len(argv) < 2: bl = ['trunk'] + getSVNBranchList(svn) current = getSVNTopLevel(svn).split('/')[-1] bl.sort() for b in bl: if b == current: out.write('*') ansi_color(out, 'green') else: out.write(' ') out.write(b + '\n') if b == current: ansi_reset(out) return RET_OK branch_name = argv[-1] origin = getSVNTopLevel(svn) root = getSVNRoot(svn) if len(argv) >= 3 and argv[1] == "-d": # delete branch in argv[2] Popen([svn, 'rm', root + '/branches/' + argv[2], '-m', "Removed branch '%s'" % branch_name], stdout=out).wait() return RET_OK if origin == '' or root == '': sys.stderr.write("Could not determine origin/root URL\n") return RET_ERR comment = "Created '%s' branch" % branch_name branch_path = root + '/branches/' + branch_name Popen([svn, 'copy', origin, branch_path, '-m', comment], stdout=out).wait() return RET_OK def tag(argv, svn, out): tl = getSVNTagList(svn) if len(argv) < 2: tl.sort() for t in tl: out.write(t + '\n') return RET_OK tag_name = argv[-1] origin = getSVNTopLevel(svn) root = getSVNRoot(svn) if origin == '' or root == '': sys.stderr.write("Could not determine origin/root URL\n") return RET_ERR if len(argv) == 4 and argv[1] == '-m': old_tag_name = argv[2] if not old_tag_name in tl: sys.stderr.write('Tag %s not found!\n' % old_tag_name) return RET_ERR Popen([svn, 'mv', root + '/tags/' + old_tag_name, root + '/tags/' + tag_name, '-m', "Renamed tag '%s' to '%s'" % (old_tag_name, tag_name)], stdout=out).wait() return RET_OK if len(argv) >= 3 and argv[1] == "-d": if not tag_name in tl: sys.stderr.write('Tag %s not found!\n' % tag_name) return RET_ERR # delete tag in argv[2] Popen([svn, 'rm', root + '/tags/' + tag_name, '-m', "Removed tag '%s'" % tag_name], stdout=out).wait() return RET_OK comment = "Created '%s' tag" % tag_name tag_path = root + '/tags/' + tag_name Popen([svn, 'copy', origin, tag_path, '-m', comment], stdout=out).wait() return RET_OK def switch(argv, svn, out): if len(argv) < 2: return RET_REEXEC switched = False root = getSVNRoot(svn) path = getSVNRelPath(svn) while True: if argv[1] == 'trunk': pout = Popen([svn, 'switch', root + '/trunk' + path], stdout=PIPE).stdout filter_update(pout, out) switched = True break bl = getSVNBranchList(svn) if argv[1] in bl: pout = Popen([svn, 'switch', root + '/branches/' + argv[1] + path], stdout=PIPE).stdout filter_update(pout, out) switched = True break tl = getSVNTagList(svn) if argv[1] in tl: pout = Popen([svn, 'switch', root + '/tags/' + argv[1] + path], stdout=PIPE).stdout filter_update(pout, out) switched = True break if switched: Popen(svn + ' info | grep --color=none "^URL:"', shell=True, stdout=out).wait() return RET_OK pout = Popen([svn] + argv, stdout=PIPE).stdout filter_update(pout, out) return RET_OK def merge(argv, svn, out): if len(argv) < 2: return RET_REEXEC root = getSVNRoot(svn) branches = getSVNBranchList(svn) if not argv[1] in branches: return RET_REEXEC lines = Popen([svn, 'log', '--stop-on-copy', root + '/branches/' + argv[1]], stdout=PIPE).communicate()[0].split('\n') rev = 0 for line in lines: m = re.match(r'^r(\d+)\s', line) if m is not None: rev = m.group(1) if rev == 0: sys.stderr.write('Could not get first branch revision\n') return RET_ERR path = getSVNRelPath(svn) Popen([svn, 'merge', '-r%s:HEAD' % rev, root + '/branches/' + argv[1] + path, '.'], stdout=out).wait() return RET_OK def watch_lock(argv, svn, out): if len(argv) < 2: return RET_ERR path = argv[1] if os.path.exists(path): # Get the repository URL of the file being watched p = Popen([svn, 'info', path], stdout=PIPE) lines = p.communicate()[0].split('\n') for line in lines: m = re.match(r'URL: (.*)', line) if m is not None: path = m.group(1) break last_lock_owner = '' while True: lock_owner = '' p = Popen([svn, 'info', path], stdout=PIPE) lines = p.communicate()[0].split('\n') for line in lines: m = re.match(r'Lock\sOwner:\s*(.*)', line) if m is not None: lock_owner = m.group(1) break if lock_owner == '': break if lock_owner != last_lock_owner: out.write('Locked by: %s\n' % lock_owner) last_lock_owner = lock_owner time.sleep(60) out.write(''' _ _ _ _ _ _ | | | |_ __ | | ___ ___| | _____ __| | | | | | | '_ \| |/ _ \ / __| |/ / _ \/ _` | | | |_| | | | | | (_) | (__| < __/ (_| |_| \___/|_| |_|_|\___/ \___|_|\_\___|\__,_(_) ''') return RET_OK def users(argv, svn, out): path = '.' if len(argv) > 1: path = argv[1] users = {} p = Popen([svn, 'log', '-q', path], stdout=PIPE) for line in iter(p.stdout.readline, ''): m = re.match('r\d+\s*\|([^|]+)\|', line) if m is not None: user = m.group(1).strip() if not user.lower() in users: users[user.lower()] = [user, 1] else: users[user.lower()][1] += 1 values = users.values() values.sort(key = lambda x: x[1], reverse = True) for v in values: out.write("%8d %s\n" % (v[1], v[0])) return RET_OK def binaries(argv, svn, out, base_path = '.'): for ent in os.listdir(base_path): if ent in ('.', '..', '.svn'): continue ent_path = os.sep.join([base_path, ent]) if os.path.isfile(ent_path): mime_type = getSVNProperty(svn, 'svn:mime-type', ent_path) if mime_type != '' and not re.match(r'text/.*', mime_type): # we found a binary file needs_lock = getSVNProperty(svn, 'svn:needs-lock', ent_path) if needs_lock: out.write('* ') elif len(argv) >= 2 and argv[1] == '--set-lock': setSVNProperty(svn, 'svn:needs-lock', '*', ent_path) out.write('S ') else: out.write(' ') out.write(ent_path) out.write('\n') elif os.path.isdir(ent_path): binaries(argv, svn, out, os.sep.join([base_path, ent])) return RET_OK def lockable(argv, svn, out): if len(argv) >= 2 and argv[1] == '--status': for ob in argv[2:]: ob_path = os.sep.join([base_path, ob]) needs_lock = getSVNProperty(svn, 'svn:needs-lock', ob_path) if needs_lock: out.write('* ') else: out.write(' ') out.write(ob_path) out.write('\n') elif len(argv) >= 2 and argv[1] == '--remove': for ob in argv[2:]: ob_path = os.sep.join([base_path, ob]) delSVNProperty(svn, 'svn:needs-lock', ob_path) else: # note this is the default assumed operation for ob in argv[1:]: ob_path = os.sep.join([base_path, ob]) setSVNProperty(svn, 'svn:needs-lock', '*', ob_path) return RET_OK def diff(argv, svn, out): pout = Popen([svn] + argv, stdout=PIPE).stdout for line in iter(pout.readline, ''): colordiff(out, line) return RET_OK def log(argv, svn, out): mode = 'normal' pout = Popen([svn] + argv, stdout=PIPE).stdout for line in iter(pout.readline, ''): if mode == 'normal' and re.match(r'(r\d+)\s+\|', line): parts = line.split('|') if len(parts) == 4: ansi_color(out, 'blue', bold=True) out.write(parts[0]) ansi_reset(out) out.write('|') ansi_color(out, 'cyan') out.write(parts[1]) ansi_reset(out) out.write('|') ansi_color(out, 'magenta') out.write(parts[2]) ansi_reset(out) out.write('|') out.write(parts[3]) else: out.write(line) elif mode == 'normal' and re.match(r'Changed.paths:', line): out.write(line) mode = 'cp' elif mode == 'cp' and re.match(r' [ADM] ', line): action = line[3] if action == 'A': ansi_color(out, 'green') elif action == 'D': ansi_color(out, 'red') elif action == 'M': ansi_color(out, 'yellow') out.write(line) ansi_reset(out) elif re.match(r'-{72}', line): ansi_color(out, 'yellow') out.write(line) ansi_reset(out) mode = 'normal' elif re.match(r'={67}', line): ansi_color(out, 'yellow') out.write(line) ansi_reset(out) mode = 'diff' elif mode == 'diff': colordiff(out, line) elif re.match(r'Index:\s', line): ansi_color(out, 'yellow') out.write(line) ansi_reset(out) else: out.write(line) return RET_OK def update(argv, svn, out): pout = Popen([svn] + argv, stdout=PIPE).stdout filter_update(pout, out) return RET_OK def status(argv, svn, out): external = '' external_printed = True pout = Popen([svn] + argv, stdout=PIPE).stdout for line in iter(pout.readline, ''): m = re.match(r"Performing status on external item at '(.*)':", line) if m is not None: external = m.group(1) external_printed = False continue if re.match(r'\s*$', line): continue # anything not matched yet will cause an external to be shown if not external_printed: out.write("\nExternal '%s':\n" % external) external_printed = True if re.match(STATUS_LINE_REGEX, line): action = line[0] if action == 'A' or action == 'M': ansi_color(out, 'green') elif action == 'C': ansi_color(out, 'yellow') elif action == 'D': ansi_color(out, 'red') elif action == 'R': ansi_color(out, 'magenta') elif action == 'X': continue # don't print externals out.write(line) ansi_reset(out) continue out.write(line) return RET_OK def externals(argv, svn, out): pout = Popen([svn, 'status'], stdout=PIPE).stdout for line in iter(pout.readline, ''): if re.match(STATUS_LINE_REGEX, line): if line[0] == 'X': out.write(line[8:]) return RET_OK def root(argv, svn, out): out.write(getSVNRoot(svn) + '\n') return RET_OK ########################################################################### # Main # ########################################################################### def main(argv): global using_color config = get_config() realsvn = findInPath('svn') out = sys.stdout using_pager = False using_color = sys.stdout.isatty() and config['use_color'] if sys.stdout.isatty() and config['use_pager']: if (len(argv) >= 1 and argv[0] in ('blame', 'praise', 'annotate', 'ann', 'cat', 'diff', 'di', 'help', 'list', 'ls', 'log', 'propget', 'pget', 'pg', 'proplist', 'plist', 'pl')): pager = config['pager'] if 'PAGER' in os.environ and os.environ['PAGER'] != '': pager = os.environ['PAGER'] pager_proc = Popen(pager, shell=True, stdin=PIPE) out = pager_proc.stdin using_pager = True if realsvn == '': sys.stderr.write("Error: 'svn' not found in path\n") return 1 handlers = { 'add': add, 'branch': branch, 'branches': branch, 'externals': externals, 'switch': switch, 'sw': switch, 'merge': merge, 'tag': tag, 'tags': tag, 'diff': diff, 'di': diff, 'log': log, 'root': root, 'up': update, 'update': update, 'watch-lock': watch_lock, 'users': users, 'binaries': binaries, 'lockable': lockable, 'st': status, 'stat': status, 'status': status, } do_normal_exec = True if len(argv) >= 1: if argv[0] in handlers: r = handlers[argv[0]](argv, realsvn, out) if r == RET_OK or r == RET_ERR: do_normal_exec = False elif argv[0].startswith('__'): # allow double-underscore commands to execute the native # subversion command (e.g. "__st") argv[0] = argv[0][2:] if do_normal_exec: Popen([realsvn] + argv, stdout=out).wait() if using_pager: out.close() pager_proc.wait() return 0 if __name__ == "__main__": rc = 0 try: rc = main(sys.argv[1:]) except IOError: pass sys.exit(rc)