files/jsvn
2011-04-25 12:43:09 -04:00

342 lines
11 KiB
Python
Executable File

#!/usr/bin/env python
# Josh's SVN wrapper script
#
# It is recommended to install this script somewhere in your PATH as 'jsvn'
# (or any name other than 'svn') and then define the alias svn='jsvn'.
#
# The script detects if you have colorsvn and colordiff and uses them for
# appropriate subcommands if so.
#
# Implemented subcommands:
# branch[es] [[-d] <branch_name>]
# - with no arguments, list branches with '*' by the current one
# - with -d, delete <branch_name>
# - otherwise, create a new branch from the current one
# tag[s] [[-d] <tag_name>] | [-m <old_tag_name> <new_tag_name>]
# - with no arguments, list tags
# - with -d, delete <tag_name>
# - with -m, rename <old_tag_name> to <new_tag_name>
# - otherwise, create a new tag from the current branch
# switch <short_name>
# - switch to 'trunk', branch name, or tag name without having to specify
# the full URL
# - falls back to Subversion "switch" if <short_name> doesn't exist
# merge <branch>
# - merge branch <branch> into the current WC path
# - falls back to Subversion "merge" if <branch> doesn't exist
# root
# - output root URL (for use on shell such as "svn log $(svn root)/tags")
# watch-lock
# - block until the lock on a file/URL is released
# users
# - show a list of contributing users to a SVN path
# binaries
# - show a list of versioned binary files under the current path, with
# a prepended '*' for those with svn:needs-lock set
import sys
import os
import re
import time
from subprocess import *
# Directories searched by findInPath().  When the PATH environment variable
# is not set at all, fall back to the platform's default search path instead
# of failing with a KeyError while the script is being loaded.
PATH = os.environ.get('PATH', os.defpath).split(os.pathsep)
def findInPath(cmd, path=None):
    """Return the full path of the first executable named cmd, or ''.

    cmd  -- bare program name to look for (e.g. 'svn')
    path -- optional list of directories to search, in order; when omitted
            the module-level PATH list is used

    Only regular files that the current user is allowed to execute are
    accepted, so a directory or a non-executable file that merely has the
    right name is skipped.  An empty string is returned when no directory
    contains a match.
    """
    if path is None:
        path = PATH
    for directory in path:
        candidate = os.path.join(directory, cmd)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return ''
def getSVNURL(svn):
    """Return the URL reported by `svn info` for the current directory.

    svn -- path of the svn executable to run

    The value of the first output line starting with "URL:" is returned
    with surrounding whitespace removed.  '' is returned when the output
    contains no such line.
    """
    # universal_newlines=True makes the pipe deliver text instead of bytes,
    # so the output can be split and matched with str patterns on both
    # Python 2 and Python 3.
    proc = Popen([svn, 'info'], stdout=PIPE, universal_newlines=True)
    output = proc.communicate()[0]
    for line in output.split('\n'):
        m = re.match(r'^URL:\s*(.*?)\s*$', line)
        if m is not None:
            return m.group(1)
    return ''
def getSVNRoot(svn):
    """Return the part of the working-copy URL that precedes the layout dir.

    svn -- path of the svn executable to run

    The URL from getSVNURL() is split on '/'; everything before the first
    component named 'trunk', 'tags' or 'branches' is joined back together
    and returned.  '' is returned when the URL has no such component.
    """
    segments = getSVNURL(svn).split('/')
    for idx, segment in enumerate(segments):
        if segment in ('trunk', 'tags', 'branches'):
            return '/'.join(segments[:idx])
    return ''
def getSVNRelPath(svn):
    """Return the working-copy path relative to its trunk/branch/tag.

    svn -- path of the svn executable to run

    The result always starts with '/'.  It is the portion of the URL that
    follows 'trunk', or that follows the name directly under 'tags' or
    'branches'.  '/' alone is returned when nothing follows, or when the
    URL does not contain one of those components.
    """
    segments = getSVNURL(svn).split('/')
    # The final component is left out of the scan: if it is the top-level
    # name itself there is nothing after it, and the fallback below applies.
    for idx, segment in enumerate(segments[:-1]):
        is_trunk = segment == 'trunk'
        under_container = idx > 0 and segments[idx - 1] in ('tags', 'branches')
        if is_trunk or under_container:
            return '/' + '/'.join(segments[idx + 1:])
    return '/'
def getSVNTopLevel(svn):
    """Return the URL of the trunk, branch or tag the working copy is in.

    svn -- path of the svn executable to run

    The URL from getSVNURL() is cut off after its 'trunk' component, or
    after the component that directly follows 'tags' or 'branches'.  ''
    is returned when the URL contains none of these.
    """
    segments = getSVNURL(svn).split('/')
    for idx, segment in enumerate(segments):
        is_trunk = segment == 'trunk'
        under_container = idx > 0 and segments[idx - 1] in ('tags', 'branches')
        if is_trunk or under_container:
            return '/'.join(segments[:idx + 1])
    return ''
def getSVNBranchList(svn):
    """Return the entry names listed under <root>/branches.

    svn -- path of the svn executable to run

    Runs `svn ls` on the 'branches' directory below getSVNRoot() and
    returns one list item per non-blank output line, with a single
    trailing '/' removed from each.
    """
    root = getSVNRoot(svn)
    # Text mode (universal_newlines=True) keeps the output a str on both
    # Python 2 and Python 3 so it can be split on '\n'.
    proc = Popen([svn, 'ls', root + '/branches'],
                 stdout=PIPE, universal_newlines=True)
    listing = proc.communicate()[0]
    return [re.sub(r'/$', '', line)
            for line in listing.split('\n')
            if re.match(r'^\s*$', line) is None]
def getSVNTagList(svn):
    """Return the entry names listed under <root>/tags.

    svn -- path of the svn executable to run

    Runs `svn ls` on the 'tags' directory below getSVNRoot() and returns
    one list item per non-blank output line, with a single trailing '/'
    removed from each.
    """
    root = getSVNRoot(svn)
    # Text mode (universal_newlines=True) keeps the output a str on both
    # Python 2 and Python 3 so it can be split on '\n'.
    proc = Popen([svn, 'ls', root + '/tags'],
                 stdout=PIPE, universal_newlines=True)
    listing = proc.communicate()[0]
    return [re.sub(r'/$', '', line)
            for line in listing.split('\n')
            if re.match(r'^\s*$', line) is None]
def getSVNProperty(svn, prop, path):
    """Return the raw output of `svn propget prop path` as text.

    svn  -- path of the svn executable to run
    prop -- property name, e.g. 'svn:mime-type'
    path -- file or directory whose property is queried

    The output is returned unmodified (including any trailing newline);
    it is '' when the command prints nothing.
    """
    # Text mode so callers can compare the result with '' and match it
    # against str patterns on both Python 2 and Python 3.
    proc = Popen([svn, 'propget', prop, path],
                 stdout=PIPE, universal_newlines=True)
    return proc.communicate()[0]
def branch(argv, svn):
    """List, delete or create branches.

    argv -- subcommand arguments, beginning with the subcommand name:
              ['branch']              list 'trunk' plus every branch, with
                                      '*' in front of the current one
              ['branch', '-d', NAME]  delete <root>/branches/NAME
              ['branch', NAME]        copy the current top-level URL to
                                      <root>/branches/NAME
    svn  -- path of the svn executable to run

    Returns 0 after listing, 1 when the arguments are unusable or the
    repository URLs cannot be determined, and otherwise the exit status of
    the svn command that was run.
    """
    if len(argv) < 2:
        names = sorted(['trunk'] + getSVNBranchList(svn))
        current = getSVNTopLevel(svn).split('/')[-1]
        for name in names:
            sys.stdout.write('*' if name == current else ' ')
            sys.stdout.write(name + '\n')
        return 0

    root = getSVNRoot(svn)

    if len(argv) >= 3 and argv[1] == "-d":
        # Deleting only needs the repository root; check it first so svn is
        # never handed a URL that starts with '/branches/'.
        if root == '':
            sys.stderr.write("Could not determine origin/root URL\n")
            return 1
        doomed = argv[2]
        return Popen([svn, 'rm', root + '/branches/' + doomed, '-m',
                      "Removed branch '%s'" % doomed]).wait()

    branch_name = argv[-1]
    if branch_name.startswith('-'):
        # e.g. "branch -d" without a name: do not create a branch that is
        # named after the option.
        sys.stderr.write("Usage: branch[es] [[-d] <branch_name>]\n")
        return 1

    origin = getSVNTopLevel(svn)
    if origin == '' or root == '':
        sys.stderr.write("Could not determine origin/root URL\n")
        return 1

    comment = "Created '%s' branch" % branch_name
    branch_path = root + '/branches/' + branch_name
    return Popen([svn, 'copy', origin, branch_path, '-m', comment]).wait()
def tag(argv, svn):
    """List, rename, delete or create tags.

    argv -- subcommand arguments, beginning with the subcommand name:
              ['tag']                  list all tags, sorted
              ['tag', '-m', OLD, NEW]  rename tag OLD to NEW
              ['tag', '-d', NAME]      delete tag NAME
              ['tag', NAME]            copy the current top-level URL to
                                       <root>/tags/NAME
    svn  -- path of the svn executable to run

    Returns 0 after listing, 1 on a usage error, a missing or already
    existing tag, or undeterminable repository URLs, and otherwise the exit
    status of the svn command that was run.
    """
    usage = "Usage: tag[s] [[-d] <tag_name>] | " \
            "[-m <old_tag_name> <new_tag_name>]\n"
    existing = getSVNTagList(svn)

    if len(argv) < 2:
        for name in sorted(existing):
            sys.stdout.write(name + '\n')
        return 0

    origin = getSVNTopLevel(svn)
    root = getSVNRoot(svn)
    if origin == '' or root == '':
        sys.stderr.write("Could not determine origin/root URL\n")
        return 1
    tags_url = root + '/tags/'

    if argv[1] == '-m':
        if len(argv) != 4:
            sys.stderr.write(usage)
            return 1
        old_tag_name, new_tag_name = argv[2], argv[3]
        if old_tag_name not in existing:
            sys.stderr.write('Tag %s not found!\n' % old_tag_name)
            return 1
        if new_tag_name in existing:
            # svn would move the old tag *into* the existing directory.
            sys.stderr.write('Tag %s already exists!\n' % new_tag_name)
            return 1
        return Popen([svn, 'mv',
                      tags_url + old_tag_name, tags_url + new_tag_name,
                      '-m', "Renamed tag '%s' to '%s'"
                      % (old_tag_name, new_tag_name)]).wait()

    if argv[1] == '-d':
        if len(argv) < 3:
            sys.stderr.write(usage)
            return 1
        doomed = argv[2]
        if doomed not in existing:
            sys.stderr.write('Tag %s not found!\n' % doomed)
            return 1
        return Popen([svn, 'rm', tags_url + doomed, '-m',
                      "Removed tag '%s'" % doomed]).wait()

    tag_name = argv[-1]
    if tag_name.startswith('-'):
        # A trailing option is not a tag name.
        sys.stderr.write(usage)
        return 1
    if tag_name in existing:
        # svn would copy into the existing tag instead of replacing it.
        sys.stderr.write('Tag %s already exists!\n' % tag_name)
        return 1
    comment = "Created '%s' tag" % tag_name
    return Popen([svn, 'copy', origin, tags_url + tag_name,
                  '-m', comment]).wait()
def switch(argv, svn):
    """Switch the working copy to trunk, a branch or a tag by short name.

    argv -- ['switch', SHORT_NAME, ...]
    svn  -- path of the svn executable to run

    The current path relative to the top level is appended to the target
    so that a working copy of a subdirectory stays on that subdirectory.

    Returns 0 after running `svn switch`, -1 when no name was given and -2
    when the name is neither 'trunk' nor a known branch or tag.  main()
    falls through to the plain svn command when the result is negative.
    """
    if len(argv) < 2:
        return -1
    target = argv[1]
    root = getSVNRoot(svn)
    rel_path = getSVNRelPath(svn)
    # Branches are consulted before tags; each listing is only fetched when
    # the earlier candidates did not match.
    if target == 'trunk':
        destination = root + '/trunk' + rel_path
    elif target in getSVNBranchList(svn):
        destination = root + '/branches/' + target + rel_path
    elif target in getSVNTagList(svn):
        destination = root + '/tags/' + target + rel_path
    else:
        return -2
    Popen([svn, 'switch', destination]).wait()
    return 0
def merge(argv, svn):
    """Merge a branch, given by short name, into the current working copy.

    argv -- ['merge', BRANCH_NAME, ...]
    svn  -- path of the svn executable to run

    The start revision is taken from the last "r<number>" line printed by
    `svn log --stop-on-copy` for the branch; the range from that revision
    to HEAD is merged into '.'.

    Returns 0 after running `svn merge`, -1 when no name was given, -3 when
    the name is not a known branch and -4 when no revision could be read
    from the log.  main() falls through to the plain svn command when the
    result is negative.
    """
    if len(argv) < 2:
        return -1
    name = argv[1]
    root = getSVNRoot(svn)
    if name not in getSVNBranchList(svn):
        return -3
    branch_url = root + '/branches/' + name

    # Text mode so the log can be split and matched with str patterns on
    # both Python 2 and Python 3.
    proc = Popen([svn, 'log', '--stop-on-copy', branch_url],
                 stdout=PIPE, universal_newlines=True)
    first_rev = None
    for line in proc.communicate()[0].split('\n'):
        m = re.match(r'^r(\d+)\s', line)
        if m is not None:
            # Keep overwriting: the last revision line in the output wins.
            first_rev = m.group(1)
    if first_rev is None:
        sys.stderr.write('Could not get first branch revision\n')
        return -4

    path = getSVNRelPath(svn)
    Popen([svn, 'merge', '-r%s:HEAD' % first_rev,
           branch_url + path, '.']).wait()
    return 0
def watch_lock(argv, svn, poll_interval=60):
    """Block until `svn info` no longer reports a lock owner for a target.

    argv          -- ['watch-lock', PATH_OR_URL, ...]
    svn           -- path of the svn executable to run
    poll_interval -- seconds to sleep between two checks (default 60)

    When the target exists on the local file system it is first replaced
    by the URL that `svn info` reports for it.  Each change of lock owner
    is announced on stdout, and a banner is printed once no owner is
    reported any more.

    Returns -1 when no target was given, otherwise 0.
    """
    if len(argv) < 2:
        return -1
    path = argv[1]
    if os.path.exists(path):
        # Get the repository URL of the file being watched
        info = Popen([svn, 'info', path], stdout=PIPE,
                     universal_newlines=True).communicate()[0]
        for line in info.split('\n'):
            m = re.match(r'URL: (.*)', line)
            if m is not None:
                path = m.group(1)
                break
    last_lock_owner = ''
    while True:
        lock_owner = ''
        # Text mode keeps the output a str on both Python 2 and Python 3.
        info = Popen([svn, 'info', path], stdout=PIPE,
                     universal_newlines=True).communicate()[0]
        for line in info.split('\n'):
            m = re.match(r'Lock\sOwner:\s*(.*)', line)
            if m is not None:
                lock_owner = m.group(1)
                break
        if lock_owner == '':
            break
        if lock_owner != last_lock_owner:
            sys.stdout.write('Locked by: %s\n' % lock_owner)
            # Make the message visible before going to sleep even when
            # stdout is buffered.
            sys.stdout.flush()
            last_lock_owner = lock_owner
        time.sleep(poll_interval)
    # Raw string: the backslashes in the banner are literal characters, not
    # escape sequences.
    sys.stdout.write(r'''
_ _ _ _ _ _
| | | |_ __ | | ___ ___| | _____ __| | |
| | | | '_ \| |/ _ \ / __| |/ / _ \/ _` | |
| |_| | | | | | (_) | (__| < __/ (_| |_|
\___/|_| |_|_|\___/ \___|_|\_\___|\__,_(_)
''')
    return 0
def users(argv, svn):
    """Print how many log entries each user has for a path.

    argv -- ['users'] or ['users', PATH_OR_URL]; the path defaults to '.'
    svn  -- path of the svn executable to run

    User names are taken from the second '|'-separated field of the
    revision lines printed by `svn log -q`.  Names are grouped without
    regard to case and shown in the spelling that was seen first.  One
    line per user is written to stdout, highest count first.

    Always returns 0.
    """
    path = argv[1] if len(argv) > 1 else '.'
    # Text mode so the log can be split and matched with str patterns on
    # both Python 2 and Python 3.
    proc = Popen([svn, 'log', '-q', path], stdout=PIPE,
                 universal_newlines=True)
    counts = {}  # lower-cased name -> [name as first seen, entry count]
    for line in proc.communicate()[0].split('\n'):
        m = re.match(r'r\d+\s*\|([^|]+)\|', line)
        if m is None:
            continue
        user = m.group(1).strip()
        entry = counts.setdefault(user.lower(), [user, 0])
        entry[1] += 1
    ranked = sorted(counts.values(), key=lambda e: e[1], reverse=True)
    for name, total in ranked:
        sys.stdout.write("%8d %s\n" % (total, name))
    return 0
def binaries(argv, svn, base_path = '.'):
    """Print the files under base_path whose svn:mime-type is binary.

    argv      -- subcommand arguments; only passed along when recursing
    svn       -- path of the svn executable to run
    base_path -- directory to scan (default '.')

    A file is reported when its svn:mime-type property starts with
    'application/octet-stream'.  Files that also have a non-empty
    svn:needs-lock property get a leading '*'.  Subdirectories are scanned
    recursively; entries named '.svn' are ignored.

    Always returns 0.
    """
    for entry in os.listdir(base_path):
        if entry in ('.', '..', '.svn'):
            continue
        entry_path = os.sep.join([base_path, entry])
        if os.path.isdir(entry_path):
            binaries(argv, svn, os.sep.join([base_path, entry]))
            continue
        if not os.path.isfile(entry_path):
            continue
        mime_type = getSVNProperty(svn, 'svn:mime-type', entry_path)
        if not re.match(r'application/octet-stream', mime_type):
            continue
        # we found a binary file
        needs_lock = getSVNProperty(svn, 'svn:needs-lock', entry_path)
        marker = ' ' if needs_lock == '' else '* '
        sys.stdout.write(marker)
        sys.stdout.write(entry_path)
        sys.stdout.write('\n')
    return 0
def main(argv):
    """Dispatch one command line to a wrapper subcommand or to plain svn.

    argv -- command-line arguments without the program name

    Subcommands implemented by this script are handled here.  'switch' and
    'merge' fall back to the real svn command when their handler returns a
    negative value.  'diff' is piped through colordiff, and 'st', 'status',
    'log', 'up' and 'update' are run through colorsvn, when those programs
    are found in PATH.  Everything else is passed to svn unchanged.

    Returns the process exit status: 1 when svn cannot be found, the
    handler's result for wrapper subcommands, and the exit status of svn
    (or colorsvn) for commands that are passed through.
    """
    realsvn = findInPath('svn')
    colorsvn = findInPath('colorsvn')
    colordiff = findInPath('colordiff')
    if realsvn == '':
        sys.stderr.write("Error: 'svn' not found in path\n")
        return 1

    cmd = argv[0] if len(argv) >= 1 else None

    if cmd in ('branch', 'branches'):
        return branch(argv, realsvn)

    if cmd == 'switch':
        r = switch(argv, realsvn)
        if r == 0:
            # Show where the working copy points now.  The URL is read
            # directly instead of through a shell pipeline, so an svn path
            # containing spaces or shell metacharacters cannot break it.
            url = getSVNURL(realsvn)
            if url != '':
                sys.stdout.write('URL: %s\n' % url)
        if r >= 0:
            return r

    if cmd == 'merge':
        r = merge(argv, realsvn)
        if r >= 0:
            return r

    if cmd in ('tag', 'tags'):
        return tag(argv, realsvn)

    if cmd == 'diff' and colordiff != '':
        differ = Popen([realsvn] + argv, stdout=PIPE)
        colorizer = Popen([colordiff], stdin=differ.stdout)
        # Drop the parent's copy of the pipe so that only colordiff holds
        # the read end.
        differ.stdout.close()
        colorizer.wait()
        return differ.wait()

    if cmd == 'root':
        sys.stdout.write(getSVNRoot(realsvn) + '\n')
        return 0

    if cmd == 'watch-lock':
        return watch_lock(argv, realsvn)

    if cmd == 'users':
        return users(argv, realsvn)

    if cmd == 'binaries':
        return binaries(argv, realsvn)

    if cmd in ('st', 'status', 'log', 'up', 'update') and colorsvn != '':
        realsvn = colorsvn

    # Pass the exit status through so callers can detect a failed command.
    return Popen([realsvn] + argv).wait()
if __name__ == "__main__":
    # Run only when executed as a script: hand main() the arguments without
    # the program name and use its result as the process exit status.
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)