#!/usr/bin/env python # Josh's SVN wrapper script # # Recommend putting in path as 'jsvn' or something other than 'svn' and # making an alias svn='jsvn' # # The script detects if you have colorsvn and colordiff and uses them for # appropriate subcommands if so. # # Implemented subcommands: # branch[es] [[-d] ] # - with no arguments, list branches with '*' by the current one # - with -d, delete # - otherwise, create a new branch from the current one # tag[s] [[-d] ] | [-m ] # - with no arguments, list tags # - with -d, delete # - with -m, rename to # - otherwise, create a new tag from the current branch # switch # - switch to 'trunk', branch name, or tag name without having to specify # the full URL # - falls back to Subversion "switch" if doesn't exist # merge # - merge branch into the current WC path # - falls back to Subversion "merge" if doesn't exist # root # - output root URL (for use on shell such as "svn log $(svn root)/tags") # watch-lock # - block until the lock on a file/URL is released # users # - show a list of contributing users to a SVN path # binaries [--set-lock] # - show a list of versioned binary files under the current path, with # a prepended '*' for those with svn:needs-lock set # - with --set-lock, set svn:needs-lock to '*' for binaries import sys import os import re import time from subprocess import * def findInPath(cmd): path_entries = os.environ['PATH'].split(os.pathsep) for p in path_entries: full_path = os.path.join(p, cmd) if os.path.exists(full_path): return full_path return '' def getSVNURL(svn): for line in Popen([svn, 'info'], stdout=PIPE).communicate()[0].split('\n'): m = re.match(r'^URL:\s*(.*?)\s*$', line) if m is not None: return m.group(1) return '' def getSVNRoot(svn): url = getSVNURL(svn) parts = url.split('/') for i in range(0, len(parts)): if parts[i] in ('trunk', 'tags', 'branches'): return '/'.join(parts[:i]) return '' def getSVNRelPath(svn): url = getSVNURL(svn) parts = url.split('/') for i in range(0, len(parts) - 1): if parts[i] == 'trunk' or i > 0 and parts[i-1] in ('tags', 'branches'): return '/' + '/'.join(parts[i+1:]) return '/' def getSVNTopLevel(svn): url = getSVNURL(svn) parts = url.split('/') for i in range(0, len(parts)): if parts[i] == 'trunk' or i > 0 and parts[i-1] in ('tags', 'branches'): return '/'.join(parts[:i+1]) return '' def getSVNBranchList(svn): colist = [] root = getSVNRoot(svn) lines = Popen([svn, 'ls', root + '/branches'], stdout=PIPE).communicate()[0].split('\n') for line in lines: if re.match(r'^\s*$', line) is None: colist.append(re.sub(r'/$', '', line)) return colist def getSVNTagList(svn): colist = [] root = getSVNRoot(svn) lines = Popen([svn, 'ls', root + '/tags'], stdout=PIPE).communicate()[0].split('\n') for line in lines: if re.match(r'^\s*$', line) is None: colist.append(re.sub(r'/$', '', line)) return colist def getSVNProperty(svn, prop, path): return Popen([svn, 'propget', prop, path], stdout=PIPE).communicate()[0] def setSVNProperty(svn, prop, val, path): Popen([svn, 'propset', prop, val, path], stdout=PIPE).wait() def branch(argv, svn): if len(argv) < 2: bl = ['trunk'] + getSVNBranchList(svn) current = getSVNTopLevel(svn).split('/')[-1] bl.sort() for b in bl: sys.stdout.write('*' if b == current else ' ') sys.stdout.write(b + '\n') return 0 branch_name = argv[-1] origin = getSVNTopLevel(svn) root = getSVNRoot(svn) if len(argv) >= 3 and argv[1] == "-d": # delete branch in argv[2] Popen([svn, 'rm', root + '/branches/' + argv[2], '-m', "Removed branch '%s'" % branch_name]).wait() return 0 if origin == '' or root == '': sys.stderr.write("Could not determine origin/root URL\n") return 1 comment = "Created '%s' branch" % branch_name branch_path = root + '/branches/' + branch_name Popen([svn, 'copy', origin, branch_path, '-m', comment]).wait() return 0 def tag(argv, svn): tl = getSVNTagList(svn) if len(argv) < 2: tl.sort() for t in tl: sys.stdout.write(t + '\n') return 0 tag_name = argv[-1] origin = getSVNTopLevel(svn) root = getSVNRoot(svn) if origin == '' or root == '': sys.stderr.write("Could not determine origin/root URL\n") return 1 if len(argv) == 4 and argv[1] == '-m': old_tag_name = argv[2] if not old_tag_name in tl: sys.stderr.write('Tag %s not found!\n' % old_tag_name) return 1 Popen([svn, 'mv', root + '/tags/' + old_tag_name, root + '/tags/' + tag_name, '-m', "Renamed tag '%s' to '%s'" % (old_tag_name, tag_name)]).wait() return 0 if len(argv) >= 3 and argv[1] == "-d": if not tag_name in tl: sys.stderr.write('Tag %s not found!\n' % tag_name) return 1 # delete tag in argv[2] Popen([svn, 'rm', root + '/tags/' + tag_name, '-m', "Removed tag '%s'" % tag_name]).wait() return 0 comment = "Created '%s' tag" % tag_name tag_path = root + '/tags/' + tag_name Popen([svn, 'copy', origin, tag_path, '-m', comment]).wait() return 0 def switch(argv, svn): if len(argv) < 2: return -1 root = getSVNRoot(svn) path = getSVNRelPath(svn) if argv[1] == 'trunk': Popen([svn, 'switch', root + '/trunk' + path]).wait() return 0 bl = getSVNBranchList(svn) if argv[1] in bl: Popen([svn, 'switch', root + '/branches/' + argv[1] + path]).wait() return 0 tl = getSVNTagList(svn) if argv[1] in tl: Popen([svn, 'switch', root + '/tags/' + argv[1] + path]).wait() return 0 return -2 def merge(argv, svn): if len(argv) < 2: return -1 root = getSVNRoot(svn) branches = getSVNBranchList(svn) if not argv[1] in branches: return -3 lines = Popen([svn, 'log', '--stop-on-copy', root + '/branches/' + argv[1]], stdout=PIPE).communicate()[0].split('\n') rev = 0 for line in lines: m = re.match(r'^r(\d+)\s', line) if m is not None: rev = m.group(1) if rev == 0: sys.stderr.write('Could not get first branch revision\n') return -4 path = getSVNRelPath(svn) Popen([svn, 'merge', '-r%s:HEAD' % rev, root + '/branches/' + argv[1] + path, '.']).wait() return 0 def watch_lock(argv, svn): if len(argv) < 2: return -1 path = argv[1] if os.path.exists(path): # Get the repository URL of the file being watched p = Popen([svn, 'info', path], stdout=PIPE) lines = p.communicate()[0].split('\n') for line in lines: m = re.match(r'URL: (.*)', line) if m is not None: path = m.group(1) break last_lock_owner = '' while 1: lock_owner = '' p = Popen([svn, 'info', path], stdout=PIPE) lines = p.communicate()[0].split('\n') for line in lines: m = re.match(r'Lock\sOwner:\s*(.*)', line) if m is not None: lock_owner = m.group(1) break if lock_owner == '': break if lock_owner != last_lock_owner: sys.stdout.write('Locked by: %s\n' % lock_owner) last_lock_owner = lock_owner time.sleep(60) sys.stdout.write(''' _ _ _ _ _ _ | | | |_ __ | | ___ ___| | _____ __| | | | | | | '_ \| |/ _ \ / __| |/ / _ \/ _` | | | |_| | | | | | (_) | (__| < __/ (_| |_| \___/|_| |_|_|\___/ \___|_|\_\___|\__,_(_) ''') return 0 def users(argv, svn): path = '.' if len(argv) > 1: path = argv[1] users = {} p = Popen([svn, 'log', '-q', path], stdout=PIPE) lines = p.communicate()[0].split('\n') for line in lines: m = re.match('r\d+\s*\|([^|]+)\|', line) if m is not None: user = m.group(1).strip() if not user.lower() in users: users[user.lower()] = [user, 1] else: users[user.lower()][1] += 1 values = users.values() values.sort(key = lambda x: x[1], reverse = True) for v in values: print "%8d %s" % (v[1], v[0]) return 0 def binaries(argv, svn, base_path = '.'): for ent in os.listdir(base_path): if ent in ('.', '..', '.svn'): continue ent_path = os.sep.join([base_path, ent]) if os.path.isfile(ent_path): mime_type = getSVNProperty(svn, 'svn:mime-type', ent_path) if re.match(r'application/octet-stream', mime_type): # we found a binary file needs_lock = getSVNProperty(svn, 'svn:needs-lock', ent_path) if needs_lock: sys.stdout.write('* ') elif len(argv) >= 2 and argv[1] == '--set-lock': setSVNProperty(svn, 'svn:needs-lock', '*', ent_path) sys.stdout.write('S ') else: sys.stdout.write(' ') sys.stdout.write(ent_path) sys.stdout.write('\n') elif os.path.isdir(ent_path): binaries(argv, svn, os.sep.join([base_path, ent])) return 0 def main(argv): realsvn = findInPath('svn') colorsvn = findInPath('colorsvn') colordiff = findInPath('colordiff') if realsvn == '': sys.stderr.write("Error: 'svn' not found in path\n") return 1 if len(argv) >= 1: if argv[0] == "branch" or argv[0] == "branches": return branch(argv, realsvn) if argv[0] == "switch": r = switch(argv, realsvn) if r == 0: Popen(realsvn + ' info | grep --color=none "^URL:"', shell = True).wait() if r >= 0: return r if argv[0] == "merge": r = merge(argv, realsvn) if r >= 0: return r if argv[0] == "tag" or argv[0] == "tags": return tag(argv, realsvn) if argv[0] == "diff" and colordiff != '': diff_out = Popen([realsvn] + argv, stdout=PIPE).stdout Popen([colordiff], stdin=diff_out).wait() return 0 if argv[0] == "root": sys.stdout.write(getSVNRoot(realsvn) + '\n') return 0 if argv[0] == "watch-lock": return watch_lock(argv, realsvn) if argv[0] == "users": return users(argv, realsvn) if argv[0] == "binaries": return binaries(argv, realsvn) if argv[0] in ('st', 'status', 'log', 'up', 'update') \ and colorsvn != '': realsvn = colorsvn Popen([realsvn] + argv).wait() return 0 if __name__ == "__main__": sys.exit(main(sys.argv[1:]))