#!/usr/bin/env python # Josh's SVN wrapper script # # Recommend putting in path as 'jsvn' or something other than 'svn' and # making an alias svn='jsvn'. For example, this .bash_aliases stanza # will check if 'jsvn' is in your path and automatically alias 'svn' to # it if so: # if [[ "$(which jsvn 2>/dev/null)" != "" ]]; then # alias svn='jsvn' # fi # # The script detects if you have colorsvn and colordiff and uses them for # appropriate subcommands if so. # # Implemented subcommands: # branch[es] [[-d] ] # - with no arguments, list branches with '*' by the current one # - with -d, delete # - otherwise, create a new branch from the current one # tag[s] [[-d] ] | [-m ] # - with no arguments, list tags # - with -d, delete # - with -m, rename to # - otherwise, create a new tag from the current branch # switch # - switch to 'trunk', branch name, or tag name without having to specify # the full URL # - falls back to Subversion "switch" if doesn't exist # merge # - merge branch into the current WC path # - falls back to Subversion "merge" if doesn't exist # root # - output root URL (for use on shell such as "svn log $(svn root)/tags") # watch-lock # - block until the lock on a file/URL is released # users # - show a list of contributing users to a SVN path # binaries [--set-lock] # - show a list of versioned binary files under the current path, with # a prepended '*' for those with svn:needs-lock set # - with --set-lock, set svn:needs-lock to '*' for binaries # lockable [--remove] | [--status] # - with no switches, set svn:needs-lock to '*' for file[s] # - with --remove, remove svn:needs-lock' for file[s] # - with --status, prepended '*' for those with svn:needs-lock set import sys import os import re import time from subprocess import * ########################################################################### # Subcommand Handler Return Values # ########################################################################### RET_OK = 0 RET_ERR = 1 RET_REEXEC = 2 ########################################################################### # ANSI escape color code values # ########################################################################### COLORS = { 'black': 0, 'red': 1, 'green': 2, 'yellow': 3, 'blue': 4, 'magenta': 5, 'cyan': 6, 'white': 7, } ########################################################################### # Utility Functions # ########################################################################### def ansi_color(out, fg=None, bg=None, bold=False): bc = 1 if bold else 0 if fg is not None: out.write('\033[%d;%dm' % (bc, 30 + COLORS[fg])) if bg is not None: out.write('\033[%d;%dm' % (bc, 40 + COLORS[bg])) def ansi_reset(out): out.write('\033[0m') def colordiff(out, line): if re.search(r'^Index:\s', line): ansi_color(out, 'yellow') out.write(line + '\n') ansi_reset(out) return if re.match(r'={67}', line): ansi_color(out, 'yellow') out.write(line + '\n') ansi_reset(out) return if re.search(r'^-', line): ansi_color(out, 'red') out.write(line + '\n') ansi_reset(out) return elif re.search(r'^\+', line): ansi_color(out, 'green') out.write(line + '\n') ansi_reset(out) return m = re.match(r'(@@.*@@)(.*)', line) if m is not None: ansi_color(out, 'cyan') out.write(m.group(1)) out.write(m.group(2)) out.write('\n') ansi_reset(out) return out.write(line + '\n') def findInPath(cmd): path_entries = os.environ['PATH'].split(os.pathsep) for p in path_entries: full_path = os.path.join(p, cmd) if os.path.exists(full_path): return full_path return '' def getSVNURL(svn): for line in Popen([svn, 'info'], stdout=PIPE).communicate()[0].split('\n'): m = re.match(r'^URL:\s*(.*?)\s*$', line) if m is not None: return m.group(1) return '' def getSVNRoot(svn): url = getSVNURL(svn) parts = url.split('/') for i in range(0, len(parts)): if parts[i] in ('trunk', 'tags', 'branches'): return '/'.join(parts[:i]) return '' def getSVNRelPath(svn): url = getSVNURL(svn) parts = url.split('/') for i in range(0, len(parts) - 1): if parts[i] == 'trunk' or i > 0 and parts[i-1] in ('tags', 'branches'): return '/' + '/'.join(parts[i+1:]) return '/' def getSVNTopLevel(svn): url = getSVNURL(svn) parts = url.split('/') for i in range(0, len(parts)): if parts[i] == 'trunk' or i > 0 and parts[i-1] in ('tags', 'branches'): return '/'.join(parts[:i+1]) return '' def getSVNBranchList(svn): colist = [] root = getSVNRoot(svn) lines = Popen([svn, 'ls', root + '/branches'], stdout=PIPE).communicate()[0].split('\n') for line in lines: if re.match(r'^\s*$', line) is None: colist.append(re.sub(r'/$', '', line)) return colist def getSVNTagList(svn): colist = [] root = getSVNRoot(svn) lines = Popen([svn, 'ls', root + '/tags'], stdout=PIPE).communicate()[0].split('\n') for line in lines: if re.match(r'^\s*$', line) is None: colist.append(re.sub(r'/$', '', line)) return colist def getSVNProperty(svn, prop, path): return Popen([svn, 'propget', prop, path], stdout=PIPE).communicate()[0] def setSVNProperty(svn, prop, val, path): Popen([svn, 'propset', prop, val, path], stdout=PIPE).wait() def delSVNProperty(svn, prop, path): Popen([svn, 'propdel', prop, path], stdout=PIPE).wait() ########################################################################### # Subcommand Handlers # ########################################################################### def branch(argv, svn, out): if len(argv) < 2: bl = ['trunk'] + getSVNBranchList(svn) current = getSVNTopLevel(svn).split('/')[-1] bl.sort() for b in bl: sys.stdout.write('*' if b == current else ' ') sys.stdout.write(b + '\n') return RET_OK branch_name = argv[-1] origin = getSVNTopLevel(svn) root = getSVNRoot(svn) if len(argv) >= 3 and argv[1] == "-d": # delete branch in argv[2] Popen([svn, 'rm', root + '/branches/' + argv[2], '-m', "Removed branch '%s'" % branch_name], stdout=out).wait() return RET_OK if origin == '' or root == '': sys.stderr.write("Could not determine origin/root URL\n") return RET_ERR comment = "Created '%s' branch" % branch_name branch_path = root + '/branches/' + branch_name Popen([svn, 'copy', origin, branch_path, '-m', comment], stdout=out).wait() return RET_OK def tag(argv, svn, out): tl = getSVNTagList(svn) if len(argv) < 2: tl.sort() for t in tl: sys.stdout.write(t + '\n') return RET_OK tag_name = argv[-1] origin = getSVNTopLevel(svn) root = getSVNRoot(svn) if origin == '' or root == '': sys.stderr.write("Could not determine origin/root URL\n") return RET_ERR if len(argv) == 4 and argv[1] == '-m': old_tag_name = argv[2] if not old_tag_name in tl: sys.stderr.write('Tag %s not found!\n' % old_tag_name) return RET_ERR Popen([svn, 'mv', root + '/tags/' + old_tag_name, root + '/tags/' + tag_name, '-m', "Renamed tag '%s' to '%s'" % (old_tag_name, tag_name)], stdout=out).wait() return RET_OK if len(argv) >= 3 and argv[1] == "-d": if not tag_name in tl: sys.stderr.write('Tag %s not found!\n' % tag_name) return RET_ERR # delete tag in argv[2] Popen([svn, 'rm', root + '/tags/' + tag_name, '-m', "Removed tag '%s'" % tag_name], stdout=out).wait() return RET_OK comment = "Created '%s' tag" % tag_name tag_path = root + '/tags/' + tag_name Popen([svn, 'copy', origin, tag_path, '-m', comment], stdout=out).wait() return RET_OK def switch(argv, svn, out): if len(argv) < 2: return RET_REEXEC switched = False root = getSVNRoot(svn) path = getSVNRelPath(svn) while True: if argv[1] == 'trunk': Popen([svn, 'switch', root + '/trunk' + path], stdout=out).wait() switched = True break bl = getSVNBranchList(svn) if argv[1] in bl: Popen([svn, 'switch', root + '/branches/' + argv[1] + path], stdout=out).wait() switched = True break tl = getSVNTagList(svn) if argv[1] in tl: Popen([svn, 'switch', root + '/tags/' + argv[1] + path], stdout=out).wait() switched = True break if switched: Popen(svn + ' info | grep --color=none "^URL:"', shell=True, stdout=out).wait() return RET_OK return RET_REEXEC def merge(argv, svn, out): if len(argv) < 2: return RET_REEXEC root = getSVNRoot(svn) branches = getSVNBranchList(svn) if not argv[1] in branches: return RET_REEXEC lines = Popen([svn, 'log', '--stop-on-copy', root + '/branches/' + argv[1]], stdout=PIPE).communicate()[0].split('\n') rev = 0 for line in lines: m = re.match(r'^r(\d+)\s', line) if m is not None: rev = m.group(1) if rev == 0: sys.stderr.write('Could not get first branch revision\n') return RET_ERR path = getSVNRelPath(svn) Popen([svn, 'merge', '-r%s:HEAD' % rev, root + '/branches/' + argv[1] + path, '.'], stdout=out).wait() return RET_OK def watch_lock(argv, svn, out): if len(argv) < 2: return RET_ERR path = argv[1] if os.path.exists(path): # Get the repository URL of the file being watched p = Popen([svn, 'info', path], stdout=PIPE) lines = p.communicate()[0].split('\n') for line in lines: m = re.match(r'URL: (.*)', line) if m is not None: path = m.group(1) break last_lock_owner = '' while True: lock_owner = '' p = Popen([svn, 'info', path], stdout=PIPE) lines = p.communicate()[0].split('\n') for line in lines: m = re.match(r'Lock\sOwner:\s*(.*)', line) if m is not None: lock_owner = m.group(1) break if lock_owner == '': break if lock_owner != last_lock_owner: sys.stdout.write('Locked by: %s\n' % lock_owner) last_lock_owner = lock_owner time.sleep(60) sys.stdout.write(''' _ _ _ _ _ _ | | | |_ __ | | ___ ___| | _____ __| | | | | | | '_ \| |/ _ \ / __| |/ / _ \/ _` | | | |_| | | | | | (_) | (__| < __/ (_| |_| \___/|_| |_|_|\___/ \___|_|\_\___|\__,_(_) ''') return RET_OK def users(argv, svn, out): path = '.' if len(argv) > 1: path = argv[1] users = {} p = Popen([svn, 'log', '-q', path], stdout=PIPE) lines = p.communicate()[0].split('\n') for line in lines: m = re.match('r\d+\s*\|([^|]+)\|', line) if m is not None: user = m.group(1).strip() if not user.lower() in users: users[user.lower()] = [user, 1] else: users[user.lower()][1] += 1 values = users.values() values.sort(key = lambda x: x[1], reverse = True) for v in values: sys.stdout.write("%8d %s\n" % (v[1], v[0])) return RET_OK def binaries(argv, svn, out, base_path = '.'): for ent in os.listdir(base_path): if ent in ('.', '..', '.svn'): continue ent_path = os.sep.join([base_path, ent]) if os.path.isfile(ent_path): mime_type = getSVNProperty(svn, 'svn:mime-type', ent_path) if mime_type != '' and not re.match(r'text/.*', mime_type): # we found a binary file needs_lock = getSVNProperty(svn, 'svn:needs-lock', ent_path) if needs_lock: sys.stdout.write('* ') elif len(argv) >= 2 and argv[1] == '--set-lock': setSVNProperty(svn, 'svn:needs-lock', '*', ent_path) sys.stdout.write('S ') else: sys.stdout.write(' ') sys.stdout.write(ent_path) sys.stdout.write('\n') elif os.path.isdir(ent_path): binaries(argv, svn, out, os.sep.join([base_path, ent])) return RET_OK def lockable(argv, svn, out): if len(argv) >= 2 and argv[1] == '--status': for ob in argv[2:]: ob_path = os.sep.join([base_path, ob]) needs_lock = getSVNProperty(svn, 'svn:needs-lock', ob_path) if needs_lock: sys.stdout.write('* ') else: sys.stdout.write(' ') sys.stdout.write(ob_path) sys.stdout.write('\n') elif len(argv) >= 2 and argv[1] == '--remove': for ob in argv[2:]: ob_path = os.sep.join([base_path, ob]) delSVNProperty(svn, 'svn:needs-lock', ob_path) else: # note this is the default assumed operation for ob in argv[1:]: ob_path = os.sep.join([base_path, ob]) setSVNProperty(svn, 'svn:needs-lock', '*', ob_path) return RET_OK def diff(argv, svn, out): for line in Popen([svn] + argv, stdout=PIPE).communicate()[0].split('\n'): colordiff(out, line) return RET_OK def log(argv, svn, out): in_diff = False for line in Popen([svn] + argv, stdout=PIPE).communicate()[0].split('\n'): if re.match(r'(r\d+)\s+\|', line): parts = line.split('|') if len(parts) == 4: ansi_color(out, 'blue', bold=True) out.write(parts[0]) ansi_reset(out) out.write('|') ansi_color(out, 'cyan') out.write(parts[1]) ansi_reset(out) out.write('|') ansi_color(out, 'magenta') out.write(parts[2]) ansi_reset(out) out.write('|') out.write(parts[3]) out.write('\n') elif re.match(r'-{72}', line): ansi_color(out, 'yellow') out.write(line + '\n') ansi_reset(out) in_diff = False elif re.match(r'={67}', line): ansi_color(out, 'yellow') out.write(line + '\n') ansi_reset(out) in_diff = True elif in_diff: colordiff(out, line) elif re.search(r'^Index:\s', line): ansi_color(out, 'yellow') out.write(line + '\n') ansi_reset(out) else: out.write(line + '\n') return RET_OK def root(argv, svn, out): sys.stdout.write(getSVNRoot(svn) + '\n') return RET_OK ########################################################################### # Main # ########################################################################### def main(argv): realsvn = findInPath('svn') colorsvn = findInPath('colorsvn') out = sys.stdout stdout_is_a_tty = sys.stdout.isatty() if stdout_is_a_tty: pager = 'less -FRX' if 'PAGER' in os.environ and os.environ['PAGER'] != '': pager = os.environ['PAGER'] pager_proc = Popen(pager, shell=True, stdin=PIPE) out = pager_proc.stdin if realsvn == '': sys.stderr.write("Error: 'svn' not found in path\n") return 1 handlers = { 'branch': branch, 'branches': branch, 'switch': switch, 'merge': merge, 'tag': tag, 'tags': tag, 'diff': diff, 'log': log, 'root': root, 'watch-lock': watch_lock, 'users': users, 'binaries': binaries, 'lockable': lockable, } do_normal_exec = True if len(argv) >= 1: if argv[0] in handlers: r = handlers[argv[0]](argv, realsvn, out) if r == RET_OK or r == RET_ERR: do_normal_exec = False if (argv[0] in ('st', 'status', 'log', 'up', 'update') and colorsvn != ''): realsvn = colorsvn if do_normal_exec: Popen([realsvn] + argv, stdout=out).wait() if stdout_is_a_tty: out.close() pager_proc.wait() return 0 if __name__ == "__main__": rc = 0 try: rc = main(sys.argv[1:]) except IOError: pass sys.exit(rc)