3
# Copyright (C) 2014 Tim Marston <tim@edm.am>
5
# This file is part of stdhome (hereafter referred to as "this program").
6
# See http://ed.am/dev/stdhome for more information.
8
# This program is free software: you can redistribute it and/or modify
9
# it under the terms of the GNU General Public License as published by
10
# the Free Software Foundation, either version 3 of the License, or
11
# (at your option) any later version.
13
# This program is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
# GNU General Public License for more details.
18
# You should have received a copy of the GNU General Public License
19
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
import subprocess, os, re, shutil
23
from subprocess import Popen
26
from stdhome import the
32
def __init__( self, dir ):
35
@param dir the fully-qualified directory to work in.
41
def init( self ):
    """Create a new, empty branch

    The working directory is created and `bzr init` is run inside it.  If bzr
    fails, the directory is removed again (best effort) before the error is
    raised.

    @raise OSError if the working directory already exists or cannot be created
    @raise VcsError if bzr exits with a non-zero status; bzr's combined
      stdout/stderr output is passed to the exception
    """

    # the directory shouldn't exist (os.mkdir raises OSError if it does)
    os.mkdir( self.dir )

    # init
    p = Popen( [ 'bzr', 'init', '.' ], cwd = self.dir,
               stdout = subprocess.PIPE, stderr = subprocess.STDOUT )
    output = p.communicate()[ 0 ]
    if p.returncode != 0:

        # attempt to clean-up dir; a failure to remove it must not hide the
        # bzr error that is about to be reported
        shutil.rmtree( self.dir, ignore_errors = True )

        raise self.VcsError( 'bzr init failed', output )
62
def checkout( self, url ):
    """Checkout a new copy of a remote branch.

    The working directory is created and `bzr co` is run inside it.  If bzr
    fails, the directory is removed again (best effort) before the error is
    raised.

    @param url the remote repository URL
    @raise OSError if the working directory already exists or cannot be created
    @raise VcsError if bzr exits with a non-zero status; bzr's combined
      stdout/stderr output is passed to the exception
    """

    # the directory shouldn't exist (os.mkdir raises OSError if it does)
    os.mkdir( self.dir )

    # checkout
    p = Popen( [ 'bzr', 'co', url, '.' ], cwd = self.dir,
               stdout = subprocess.PIPE, stderr = subprocess.STDOUT )
    output = p.communicate()[ 0 ]
    if p.returncode != 0:

        # attempt to clean-up dir; a failure to remove it must not hide the
        # bzr error that is about to be reported
        shutil.rmtree( self.dir, ignore_errors = True )

        raise self.VcsError( 'bzr checkout failed', output )
87
def revert( self ):
    """Revert the branch so that there are no outstanding changes or unknown files.

    First `bzr revert --no-backup` is run, then `bzr st` is parsed and every
    entry it lists as "unknown" is deleted from the working directory.

    @raise VcsError if either bzr command exits with a non-zero status
    @raise RuntimeError if an unknown entry is neither a regular file nor a
      directory
    """

    # revert local modifications
    p = Popen( [ 'bzr', 'revert', '--no-backup' ], cwd = self.dir,
               stdout = subprocess.PIPE, stderr = subprocess.STDOUT )
    output = p.communicate()[ 0 ]
    if p.returncode != 0:
        raise self.VcsError( 'bzr revert failed', output )

    # find out what is left over in the working tree
    p = Popen( [ 'bzr', 'st' ], cwd = self.dir,
               stdout = subprocess.PIPE, stderr = subprocess.STDOUT )
    output = p.communicate()[ 0 ]
    if p.returncode != 0:
        raise self.VcsError( 'bzr status failed', output )
    files = self.parse_file_blocks( output )

    # remove unknown files
    for name in files.get( 'unknown', [] ):
        full_file = os.path.join( self.dir, name )
        if os.path.isfile( full_file ):
            os.unlink( full_file )
        elif os.path.isdir( full_file ):
            shutil.rmtree( full_file )
        else:
            raise RuntimeError( 'exotic file in repo: %s' % name )
118
def update( self ):
    """Update the branch, pulling down any upstream changes and merging them.  This
    method returns a list of the files that were modified as part of this
    operation.

    @return list of file names taken from the status lines that `bzr update`
      printed; a rename contributes both its old and its new name
    @raise VcsError if bzr exits with a non-zero status
    @raise RuntimeError if a status line cannot be parsed
    """

    # update
    p = Popen( [ 'bzr', 'update' ], cwd = self.dir,
               stdout = subprocess.PIPE, stderr = subprocess.STDOUT )
    output = p.communicate()[ 0 ]
    if p.returncode != 0:
        raise self.VcsError( 'bzr update failed', output )

    # parse output (see logic in report() in bzrlib/delta.py)
    files = []
    for line in output.splitlines():

        # only lines starting with a three-character status code are of
        # interest; everything else bzr prints is ignored
        if not re.search( '^[-R+ ?][K NMD!][* ] ', line ): continue

        if the.verbose: print( ' %s' % line )

        # renames show before and after file names
        matches = re.search( '^R.. (.*?)[/@+]? => (.*?)[/@+]?$', line )
        if matches:
            files.append( matches.group( 1 ) )
            files.append( matches.group( 2 ) )
            continue

        # kind changes shows the same name twice
        matches = re.search( '^.K. (.*?)[/@+]? => (.*?)[/@+]?$', line )
        if matches:
            files.append( matches.group( 1 ) )
            continue

        # other entries have only one filename
        matches = re.search( '^... (.*?)[/@+]?$', line )
        if matches:
            files.append( matches.group( 1 ) )
            continue

        # NOTE(review): exception type assumed to be RuntimeError (as used
        # elsewhere in this file for unexpected states) -- confirm
        raise RuntimeError(
            'failed to parse bzr update output line:\n%s' % line )

    return files
163
def has_changes( self ):
    """Check if the branch has any local modifications.

    Runs `bzr status --no-pending` in the working directory and parses the
    result into status blocks.

    @return True if bzr reported at least one status block, otherwise False
    @raise VcsError if bzr exits with a non-zero status
    """

    p = Popen( [ 'bzr', 'status', '--no-pending' ], cwd = self.dir,
               stdout = subprocess.PIPE, stderr = subprocess.STDOUT )
    output = p.communicate()[ 0 ]
    if p.returncode != 0:
        raise self.VcsError( 'bzr status failed', output )

    # any block at all (modified, added, unknown, ...) counts as a change
    return bool( self.parse_file_blocks( output ) )
177
def get_conflicts( self ):
    """Return a list of files that have conflicts.

    Runs `bzr status --no-pending` in the working directory and parses the
    result into status blocks.

    @return the entries of the "conflicts" block as a list, or None if bzr
      reported no such block
    @raise VcsError if bzr exits with a non-zero status
    """

    p = Popen( [ 'bzr', 'status', '--no-pending' ], cwd = self.dir,
               stdout = subprocess.PIPE, stderr = subprocess.STDOUT )
    output = p.communicate()[ 0 ]
    if p.returncode != 0:
        raise self.VcsError( 'bzr status failed', output )

    # dict.get yields None when there is no "conflicts" block
    return self.parse_file_blocks( output ).get( 'conflicts' )
191
def parse_file_blocks( self, output ):
    """Parse the block-structured output of `bzr status`.

    The output is expected to consist of header lines of the form "name:"
    followed by indented entries belonging to that header, e.g.

        modified:
          some/file
        unknown:
          other-file

    @param output the text printed by bzr
    @return dict mapping each header name to the list of its entries, in the
      order they appeared; headers without entries are not included
    @raise ParseError if a line is neither a header, an entry of the current
      block, nor one of the informational lines that are known to be harmless
    """

    res = dict()
    current = None

    # keep the line endings so that a reported unrecognised line is shown
    # exactly as bzr printed it
    for line in output.splitlines( True ):

        # a header line starts a new block
        matches = re.search( '^([a-z ]+):$', line, re.I )
        if matches:
            current = matches.group( 1 )
            continue

        # an indented line is an entry of the current block; bzr indents
        # entries by two spaces, a single space is accepted as well
        if current:
            matches = re.search( '^ {1,2}([^ ].*)$', line )
            if matches:
                res.setdefault( current, [] ).append( matches.group( 1 ) )
                continue

        # informational lines that carry no file names
        if re.search( '^[0-9]+ shelf exists', line ): continue
        if re.search( '^working tree is out of date', line ): continue

        raise self.ParseError( "unrecognised line: %s" % line )

    return res
213
class ParseError( Exception ):