File: objects.py

package info (click to toggle)
cvs2svn 2.4.0-4
  • links: PTS
  • area: main
  • in suites: stretch
  • size: 3,720 kB
  • sloc: python: 22,383; sh: 512; perl: 121; makefile: 84
file content (346 lines) | stat: -rw-r--r-- 12,837 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#!/usr/bin/env python
#
#  objects.py: Objects that keep track of state during a test
#
#  Subversion is a tool for revision control.
#  See http://subversion.tigris.org for more information.
#
# ====================================================================
#    Licensed to the Apache Software Foundation (ASF) under one
#    or more contributor license agreements.  See the NOTICE file
#    distributed with this work for additional information
#    regarding copyright ownership.  The ASF licenses this file
#    to you under the Apache License, Version 2.0 (the
#    "License"); you may not use this file except in compliance
#    with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing,
#    software distributed under the License is distributed on an
#    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#    KIND, either express or implied.  See the License for the
#    specific language governing permissions and limitations
#    under the License.
######################################################################

# General modules
import shutil, sys, re, os, subprocess

# Our testing module
import svntest
from svntest import actions, main, tree, verify, wc


######################################################################
# Helpers

def local_path(path):
  """Translate PATH from internal style ('/' separators) into the style of
  the local platform (os.sep separators).  An empty PATH is taken to mean
  the current directory and is translated as '.'."""
  internal = path
  if internal == '':
    internal = '.'
  return internal.replace('/', os.sep)


def db_dump(db_dump_name, repo_path, table):
  """Yield a human-readable representation of the rows of the BDB table
  TABLE in the repo at REPO_PATH.  Yield one line of text at a time.
  Calls the external program DB_DUMP_NAME (the "db_dump" tool which is
  supplied with BDB) as 'DB_DUMP_NAME -p REPO_PATH/db/TABLE'.

  Only the lines between the "HEADER=END" line and the "DATA=END" line of
  the tool's output are yielded; the header and footer are dropped.

  Raise AssertionError if the external program exits with a non-zero
  status.  As this is a generator, the program is not run until the first
  item is requested."""
  table_path = repo_path + "/db/" + table
  process = subprocess.Popen([db_dump_name, "-p", table_path],
                             stdout=subprocess.PIPE, universal_newlines=True)
  # Drain the pipe *before* waiting for the child.  Waiting first deadlocks
  # as soon as the child writes more than the OS pipe buffer can hold,
  # because the child blocks on write while we block on wait().
  try:
    lines = process.stdout.readlines()
  finally:
    process.stdout.close()
    retcode = process.wait()
  # Raised explicitly rather than via 'assert' so that the check is still
  # performed when Python runs with -O.
  if retcode != 0:
    raise AssertionError("'%s -p %s' exited with code %d"
                         % (db_dump_name, table_path, retcode))

  # Strip out the header and footer; yield the rest.
  copying = False
  for line in lines:
    if line == "HEADER=END\n":
      copying = True
    elif line == "DATA=END\n":
      break
    elif copying:
      yield line

def pretty_print_skel(line):
  """Return a copy of LINE made easier for a human to read, at the cost of
  no longer being unambiguous or machine-parsable.  LINE is assumed to be
  in the "Skel" format, in which some values are preceded by a decimal
  length.  Each such length indicator is dropped, and a zero-length value
  is shown as a pair of single quotes."""
  if not line:
    return ''

  explicit_re = re.compile(r'\d+ ')
  implicit_re = re.compile(r'[A-Za-z][^\s()]*')

  pieces = []
  pos = 0
  end = len(line)
  while pos < end:
    # An explicit-length atom: "<decimal length><space><that many chars>".
    counted = explicit_re.match(line, pos)
    if counted is not None:
      length = int(counted.group())
      start = counted.end()
      pieces.append(line[start:start + length])
      if length == 0:
        pieces.append("''")
      pos = start + length
      continue

    # An implicit-length atom: a letter followed by anything up to the next
    # white space or parenthesis.
    word = implicit_re.match(line, pos)
    if word is not None:
      pieces.append(word.group())
      pos = word.end()
      continue

    # Parentheses, white space or any other non-atom: copy one character.
    pieces.append(line[pos])
    pos += 1

  return ''.join(pieces)

def dump_bdb(db_dump_name, repo_path, dump_dir):
  """Dump all the known BDB tables in the repository at REPO_PATH into a
  single text file, "all.bdb", in DUMP_DIR.  Omit any "next-key" records.

  DB_DUMP_NAME is the name of the external "db_dump" program to run.  Each
  table is written as a "<table>:" heading, followed by its rows passed
  through pretty_print_skel(), followed by a blank line."""
  tables = ['revisions', 'transactions', 'changes', 'copies', 'nodes',
            'node-origins', 'representations', 'checksum-reps', 'strings',
            'locks', 'lock-tokens', 'miscellaneous', 'uuids']
  dump_file = dump_dir + "/all.bdb"
  out = open(dump_file, 'w')
  # try/finally so the file is closed even if dumping a table fails.
  try:
    for table in tables:
      out.write(table + ":\n")
      skip_next = False
      for line in db_dump(db_dump_name, repo_path, table):
        # Omit any 'next-key' line and the following line.
        if skip_next:
          skip_next = False
          continue
        if line == ' next-key\n':
          skip_next = True
          continue
        # The line isn't necessarily a skel, but pretty_print_skel()
        # shouldn't do too much harm if it isn't.
        out.write(pretty_print_skel(line))
      out.write("\n")
  finally:
    out.close()

def locate_db_dump():
  """Locate a db_dump executable.

  Try each known program name in turn by running it with "-V", and return
  the first name for which that command exits with status 0.  Return the
  string 'none' if no candidate can be run successfully."""
  # Assume that using the newest version is OK.
  candidates = ['db4.8_dump', 'db4.7_dump', 'db4.6_dump',
                'db4.5_dump', 'db4.4_dump', 'db_dump']
  for db_dump_name in candidates:
    try:
      if subprocess.Popen([db_dump_name, "-V"]).wait() == 0:
        return db_dump_name
    except OSError:
      # This candidate could not be started; try the next one.
      continue
  return 'none'

######################################################################
# Class SvnRepository

class SvnRepository:
  """An object of class SvnRepository stands for one Subversion repository
     and offers both a client-side view of it (through its URL) and a
     server-side view of it (through its directory on disk)."""

  def __init__(self, repo_url, repo_dir):
    """Set up the object for the repository whose URL is REPO_URL and whose
    on-disk directory is REPO_DIR."""
    self.repo_url = repo_url
    self.repo_absdir = os.path.abspath(repo_dir)
    # Name of the db_dump program to use, or 'none' if there isn't one.
    self.db_dump_name = locate_db_dump()
    # This repository object's idea of its own head revision.
    self.head_rev = 0

  def dump(self, output_dir):
    """Dump the repository into the new directory OUTPUT_DIR, which is
    created here and is given in internal ('/'-separated) style."""
    dump_dir = local_path(output_dir)
    os.mkdir(dump_dir)

    # Run a BDB dump on the repository, if a db_dump program was found.
    if self.db_dump_name != 'none':
      dump_bdb(self.db_dump_name, self.repo_absdir, dump_dir)

    # Run 'svnadmin dump' on the repository.
    result = actions.run_and_verify_svnadmin(None, None, None,
                                             'dump', self.repo_absdir)
    exit_code, out_lines, err_lines = result

    # The dump file gets the command's stderr first, then its stdout.
    svnadmin_dump_file = local_path(output_dir + "/svnadmin.dump")
    main.file_write(svnadmin_dump_file, ''.join(err_lines))
    main.file_append(svnadmin_dump_file, ''.join(out_lines))


  def obliterate_node_rev(self, path, rev,
                          exp_out=None, exp_err=[], exp_exit=0):
    """Obliterate the single node-rev PATH in revision REV, where PATH is
    relative to the repository root. Verify the command's stdout, stderr
    and exit code against EXP_OUT, EXP_ERR and EXP_EXIT."""
    node_url = self.repo_url + '/' + path
    target = node_url + '@' + str(rev)
    actions.run_and_verify_svn2(None, exp_out, exp_err, exp_exit,
                                'obliterate', target)

  def svn_mkdirs(self, *dirs):
    """Run 'svn mkdir' on the repository. DIRS is a list of directories to
    make, and each directory is a path relative to the repository root,
    neither starting nor ending with a slash. This object's idea of the
    head revision is advanced by one."""
    urls = []
    for rel_dir in dirs:
      urls.append(self.repo_url + '/' + rel_dir)
    actions.run_and_verify_svn(None, None, [],
                               'mkdir', '-m', 'svn_mkdirs()', '--parents',
                               *urls)
    self.head_rev += 1


######################################################################
# Class SvnWC

class SvnWC:
  """An object of class SvnWC represents a WC, and provides methods for
     operating on it. It keeps track of the state of the WC and of the
     repository, so that the expected results of common operations are
     automatically known.

     Path arguments to class methods are relative to the WC dir and
     in Subversion canonical form ('/' separators).
     """

  def __init__(self, wc_dir, repo):
    """Initialize the object to use the existing WC at path WC_DIR and
    the existing repository object REPO."""
    self.wc_absdir = os.path.abspath(wc_dir)
    # 'state' is, at all times, the 'wc.State' representation of the state
    # of the WC, with paths relative to 'wc_absdir'.
    #self.state = wc.State('', {})
    initial_wc_tree = tree.build_tree_from_wc(self.wc_absdir, load_props=True)
    self.state = initial_wc_tree.as_state()
    self.state.add({
      '': wc.StateItem()
    })
    self.repo = repo

  def __str__(self):
    """Return a description showing the repository's head revision and the
    tracked WC state."""
    return "SvnWC(head_rev=" + str(self.repo.head_rev) + ", state={" + \
      str(self.state.desc) + \
      "})"

  def svn_mkdir(self, rpath):
    """Run 'svn mkdir' on the WC path RPATH and record it in the tracked
    state as added."""
    lpath = local_path(rpath)
    actions.run_and_verify_svn(None, None, [], 'mkdir', lpath)

    self.state.add({
      rpath : wc.StateItem(status='A ')
    })

#  def propset(self, pname, pvalue, *rpaths):
#    "Set property 'pname' to value 'pvalue' on each path in 'rpaths'"
#    local_paths = tuple([local_path(rpath) for rpath in rpaths])
#    actions.run_and_verify_svn(None, None, [], 'propset', pname, pvalue,
#                               *local_paths)

  def svn_set_props(self, rpath, props):
    """Set each property in the dictionary {name -> value} PROPS on RPATH,
    and record PROPS as the properties of RPATH in the tracked state.

    Note: properties already present on RPATH but absent from PROPS are
    not deleted from the WC."""
    lpath = local_path(rpath)
    #for prop in path's existing props:
    #  actions.run_and_verify_svn(None, None, [], 'propdel',
    #                             prop, lpath)
    for prop in props:
      actions.run_and_verify_svn(None, None, [], 'propset',
                                 prop, props[prop], lpath)
    self.state.tweak(rpath, props=props)

  def svn_file_create_add(self, rpath, content=None, props=None):
    """Make and add a file at RPATH.  If CONTENT is None, write some default
    content that mentions the file name and uses the 'Revision' keyword.
    If PROPS is None, set 'svn:keywords' to 'Revision'; otherwise set the
    properties in the dictionary PROPS."""
    lpath = local_path(rpath)
    if content is None:
      # Default content
      filename = os.path.basename(lpath)
      content = "This is the file '" + filename + "'.\n" + \
                "Last changed in '$Revision$'.\n"
    main.file_write(lpath, content)
    actions.run_and_verify_svn(None, None, [], 'add', lpath)

    self.state.add({
      rpath : wc.StateItem(status='A ')
    })
    if props is None:
      # Default props
      props = {
        'svn:keywords': 'Revision'
      }
    self.svn_set_props(rpath, props)

  def file_modify(self, rpath, content=None, props=None):
    """Make text and property mods to a WC file.  If CONTENT is not None,
    overwrite the file RPATH with it.  If PROPS is not None, set the
    properties in the dictionary PROPS on RPATH.  The tracked state is
    updated to match."""
    lpath = local_path(rpath)
    if content is not None:
      #main.file_append(lpath, "An extra line.\n")
      #actions.run_and_verify_svn(None, None, [], 'propset',
      #                           'newprop', 'v', lpath)
      main.file_write(lpath, content)
      self.state.tweak(rpath, content=content)
    if props is not None:
      # svn_set_props() both runs 'svn propset' and records PROPS in the
      # tracked state, so no separate tweak is needed here.
      self.svn_set_props(rpath, props)

  def svn_move(self, rpath1, rpath2, parents=False):
    """Move/rename the existing WC item RPATH1 to become RPATH2.
    RPATH2 must not already exist. If PARENTS is true, any missing parents
    of RPATH2 will be created."""
    lpath1 = local_path(rpath1)
    lpath2 = local_path(rpath2)
    args = [lpath1, lpath2]
    if parents:
      args += ['--parents']
    actions.run_and_verify_svn(None, None, [], 'move', *args)
    # The tracked item changes path: record it at RPATH2, drop it at RPATH1.
    self.state.add({
      rpath2: self.state.desc[rpath1]
    })
    self.state.remove(rpath1)

  def svn_copy(self, rpath1, rpath2, parents=False, rev=None):
    """Copy the existing WC item RPATH1 to become RPATH2.
    RPATH2 must not already exist. If PARENTS is true, any missing parents
    of RPATH2 will be created. If REV is not None, copy revision REV of
    the node identified by WC item RPATH1."""
    lpath1 = local_path(rpath1)
    lpath2 = local_path(rpath2)
    args = [lpath1, lpath2]
    if rev is not None:
      # Command-line arguments are text; REV may be given as a number.
      args += ['-r', str(rev)]
    if parents:
      args += ['--parents']
    actions.run_and_verify_svn(None, None, [], 'copy', *args)
    # NOTE(review): RPATH2 is recorded with the very same state item object
    # as RPATH1, so a later tweak of one is seen through the other.
    self.state.add({
      rpath2: self.state.desc[rpath1]
    })

  def svn_delete(self, rpath, even_if_modified=False):
    """Delete a WC path locally.  If EVEN_IF_MODIFIED is true, pass
    '--force' to 'svn delete'.

    Note: the tracked state is not adjusted by this method."""
    lpath = local_path(rpath)
    args = []
    if even_if_modified:
      args += ['--force']
    actions.run_and_verify_svn(None, None, [], 'delete', lpath, *args)

  def svn_commit(self, rpath='', log=''):
    """Commit a WC path (recursively) with log message LOG, then update
    that path.  Increment the repository object's head revision and return
    the new revision number."""
    lpath = local_path(rpath)
    actions.run_and_verify_svn(None, verify.AnyOutput, [],
                               'commit', '-m', log, lpath)
    actions.run_and_verify_update(lpath, None, None, None)
    self.repo.head_rev += 1
    return self.repo.head_rev

  def svn_update(self, rpath='', rev='HEAD'):
    """Update the WC path RPATH.

    Note: REV is accepted but is not currently passed on to the update."""
    lpath = local_path(rpath)
    actions.run_and_verify_update(lpath, None, None, None)

#  def svn_merge(self, rev_spec, source, target, exp_out=None):
#    """Merge a single change from path 'source' to path 'target'.
#    SRC_CHANGE_NUM is either a number (to cherry-pick that specific change)
#    or a command-line option revision range string such as '-r10:20'."""
#    lsource = local_path(source)
#    ltarget = local_path(target)
#    if isinstance(rev_spec, int):
#      rev_spec = '-c' + str(rev_spec)
#    if exp_out is None:
#      target_re = re.escape(target)
#      exp_1 = "--- Merging r.* into '" + target_re + ".*':"
#      exp_2 = "(A |D |[UG] | [UG]|[UG][UG])   " + target_re + ".*"
#      exp_out = verify.RegexOutput(exp_1 + "|" + exp_2)
#    actions.run_and_verify_svn(None, exp_out, [],
#                               'merge', rev_spec, lsource, ltarget)