File: vcs.py

package info (click to toggle)
firefox-esr 68.10.0esr-1~deb9u1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 3,143,932 kB
  • sloc: cpp: 5,227,879; javascript: 4,315,531; ansic: 2,467,042; python: 794,975; java: 349,993; asm: 232,034; xml: 228,320; sh: 82,008; lisp: 41,202; makefile: 22,347; perl: 15,555; objc: 5,277; cs: 4,725; yacc: 1,778; ada: 1,681; pascal: 1,673; lex: 1,417; exp: 527; php: 436; ruby: 225; awk: 162; sed: 53; csh: 44
file content (299 lines) | stat: -rw-r--r-- 10,244 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import json
import os
import platform
import stat
import subprocess
from collections import deque

from .sourcefile import SourceFile


def get_tree(tests_root, manifest, manifest_path, cache_root,
             working_copy=False, rebuild=False):
    """Return an iterable source tree for ``tests_root``.

    A ``Git`` tree is tried first unless ``working_copy`` is true. When git
    is skipped, or ``Git.for_path`` returns None, a ``FileSystem`` tree is
    returned instead.

    :param tests_root: Directory containing the tests.
    :param manifest: Object whose ``url_base`` attribute is handed to the tree.
    :param manifest_path: Path of the manifest file, forwarded to the tree.
    :param cache_root: Directory for cache files; None selects
                       ``<tests_root>/.wptcache``.
    :param working_copy: If true, do not attempt to use git.
    :param rebuild: Forwarded unchanged to the tree constructor.
    :returns: A ``Git`` or ``FileSystem`` instance.
    """
    tree = None
    if cache_root is None:
        cache_root = os.path.join(tests_root, ".wptcache")
    if not os.path.exists(cache_root):
        try:
            os.makedirs(cache_root)
        except (IOError, OSError):
            # os.makedirs signals failure with OSError, which is not an
            # IOError on Python 2. Only give up on caching when the
            # directory really is unavailable (it may have been created
            # concurrently between the exists() check and makedirs()).
            if not os.path.isdir(cache_root):
                cache_root = None

    if not working_copy:
        tree = Git.for_path(tests_root,
                            manifest.url_base,
                            manifest_path=manifest_path,
                            cache_path=cache_root,
                            rebuild=rebuild)
    if tree is None:
        tree = FileSystem(tests_root,
                          manifest.url_base,
                          manifest_path=manifest_path,
                          cache_path=cache_root,
                          rebuild=rebuild)
    return tree


class Git(object):
    """Source tree that enumerates files through the ``git`` command line."""

    def __init__(self, repo_root, url_base, cache_path, manifest_path=None,
                 rebuild=False):
        # cache_path, manifest_path and rebuild are accepted but unused:
        # this tree does not cache anything, so rebuild is a noop for now.
        self.url_base = url_base
        self.root = repo_root
        self.git = Git.get_func(repo_root)

    @staticmethod
    def get_func(repo_path):
        """Return a callable ``git(cmd, *args)`` that runs in ``repo_path``.

        The callable returns the combined stdout/stderr of the command. On
        Windows, a WindowsError from the first attempt triggers one retry
        with ``git.bat``; any other failure is re-raised.
        """
        def git(cmd, *args):
            argv = ["git", cmd]
            argv.extend(args)
            try:
                return subprocess.check_output(argv, cwd=repo_path, stderr=subprocess.STDOUT)
            except Exception as e:
                # WindowsError is only looked up once we know we are on
                # Windows, so the name is never evaluated elsewhere.
                on_windows = platform.uname()[0] == "Windows"
                if not (on_windows and isinstance(e, WindowsError)):
                    raise
                argv[0] = "git.bat"
                return subprocess.check_output(argv, cwd=repo_path, stderr=subprocess.STDOUT)
        return git

    @classmethod
    def for_path(cls, path, url_base, cache_path, manifest_path=None, rebuild=False):
        """Return a tree for ``path``, or None if the git probe fails."""
        probe = Git.get_func(path)
        try:
            # this needs to be a command that fails if we aren't in a git repo
            probe("rev-parse", "--show-toplevel")
        except (subprocess.CalledProcessError, OSError):
            return None
        return cls(path, url_base, cache_path,
                   manifest_path=manifest_path, rebuild=rebuild)

    def _local_changes(self):
        """Return the set of paths reported by ``git status -z``.

        A record whose two-character status contains R or C is followed by
        one extra record, which is added to the set verbatim as a path.
        """
        output = self.git("status", "-z", "--ignore-submodules=all")

        changed = set()
        expect_extra_path = False
        for record in output.split(b"\0")[:-1]:
            if expect_extra_path:
                changed.add(record)
                expect_extra_path = False
                continue
            flags = record[:2]
            expect_extra_path = b"R" in flags or b"C" in flags
            # Skip the two status characters and the separator after them.
            changed.add(record[3:])

        return changed

    def _show_file(self, path):
        """Return the output of ``git show HEAD:<path relative to root>``."""
        absolute = os.path.abspath(path)
        relative = os.path.relpath(absolute, self.root)
        return self.git("show", "HEAD:%s" % relative)

    def __iter__(self):
        """Yield ``(SourceFile, True)`` for every entry of ``git ls-tree -r HEAD``.

        Entries whose path is in ``_local_changes()`` get their contents from
        ``_show_file``; all others are created with ``contents=None``.
        """
        local_changes = self._local_changes()
        listing = self.git("ls-tree", "-r", "-z", "HEAD")
        for record in listing.split("\0")[:-1]:
            meta, rel_path = record.rsplit("\t", 1)
            # The third space-separated field of the metadata is the hash.
            object_hash = meta.split(" ", 3)[2]
            contents = self._show_file(rel_path) if rel_path in local_changes else None
            source_file = SourceFile(self.root,
                                     rel_path,
                                     self.url_base,
                                     object_hash,
                                     contents=contents)
            yield source_file, True

    def dump_caches(self):
        """Do nothing; this tree keeps no caches."""
        pass


class FileSystem(object):
    """Source tree that enumerates files by walking the directory tree."""

    def __init__(self, root, url_base, cache_path, manifest_path=None, rebuild=False):
        from gitignore import gitignore
        self.url_base = url_base
        self.root = os.path.abspath(root)
        self.mtime_cache = None
        self.ignore_cache = None
        # Both caches need a cache directory; each has one further condition.
        can_cache = cache_path is not None
        if can_cache and manifest_path is not None:
            self.mtime_cache = MtimeCache(cache_path, root, manifest_path, rebuild)
        if can_cache and gitignore.has_ignore(root):
            self.ignore_cache = GitIgnoreCache(cache_path, root, rebuild)
        self.path_filter = gitignore.PathFilter(self.root,
                                                extras=[".git/"],
                                                cache=self.ignore_cache)

    def __iter__(self):
        """Yield one pair per file that passes the path filter.

        A file is yielded as ``(SourceFile, True)`` when there is no mtime
        cache or the cache reports it as updated; otherwise it is yielded
        as ``(path, False)``.
        """
        cache = self.mtime_cache
        for dirpath, dirnames, filenames in self.path_filter(walk(self.root)):
            for filename, path_stat in filenames:
                path = os.path.join(dirpath, filename)
                unchanged = cache is not None and not cache.updated(path, path_stat)
                if unchanged:
                    yield path, False
                else:
                    yield SourceFile(self.root, path, self.url_base), True

    def dump_caches(self):
        """Call ``dump()`` on each cache that exists, mtime cache first."""
        for cache in (self.mtime_cache, self.ignore_cache):
            if cache is None:
                continue
            cache.dump()


class CacheFile(object):
    """Base class for a JSON-backed cache stored in ``cache_root``.

    Subclasses set ``file_name`` and may override ``check_valid``.
    """
    # Name of the cache file inside cache_root; must be set by subclasses.
    file_name = None

    def __init__(self, cache_root, tests_root, rebuild=False):
        """Create ``cache_root`` if needed and load the cache.

        :param cache_root: Directory holding the cache file.
        :param tests_root: Stored as ``self.tests_root`` for subclasses.
        :param rebuild: If true, start from an empty cache instead of
                        reading the file.
        """
        self.tests_root = tests_root
        if not os.path.exists(cache_root):
            os.makedirs(cache_root)
        self.path = os.path.join(cache_root, self.file_name)
        self.modified = False
        self.data = self.load(rebuild)

    def dump(self):
        """Write the cache to disk as JSON, but only if it was modified."""
        if not self.modified:
            return
        with open(self.path, 'w') as f:
            json.dump(self.data, f, indent=1)

    def load(self, rebuild=False):
        """Return the cache contents, or an empty dict when unavailable.

        With ``rebuild`` true the file is not read at all. A file that
        cannot be opened yields an empty dict. A file that is not valid
        JSON, or whose top-level value is not an object, is treated as an
        empty cache and marked modified so the next ``dump`` replaces it.
        Whatever was loaded is passed through ``check_valid``.
        """
        data = {}
        if rebuild:
            return data
        try:
            with open(self.path, 'r') as f:
                try:
                    data = json.load(f)
                except ValueError:
                    # Truncated or corrupt cache file.
                    data = None
            if not isinstance(data, dict):
                # Unusable contents must not abort the run; start over and
                # make sure the broken file gets overwritten.
                self.modified = True
                data = {}
            data = self.check_valid(data)
        except IOError:
            pass
        return data

    def check_valid(self, data):
        """Check if the cached data is valid and return an updated copy of the
        cache containing only data that can be used."""
        return data


class MtimeCache(CacheFile):
    """Cache of file modification times, tied to a manifest file."""
    file_name = "mtime.json"

    def __init__(self, cache_root, tests_root, manifest_path, rebuild=False):
        """Load the mtime cache.

        :param cache_root: Directory holding the cache file.
        :param tests_root: Root the cached data belongs to.
        :param manifest_path: Path of the manifest the cache is tied to.
        :param rebuild: If true, ignore any cache file on disk.
        """
        # Must be set before the base constructor runs, because loading
        # calls check_valid, which reads it.
        self.manifest_path = manifest_path
        # Forward the caller's flag; it used to be hard-coded to False,
        # which made a requested rebuild silently reuse the old cache.
        super(MtimeCache, self).__init__(cache_root, tests_root, rebuild=rebuild)

    def updated(self, rel_path, stat):
        """Return a boolean indicating whether the file changed since the cache was last updated.

        This implicitly updates the cache with the new mtime data."""
        mtime = stat.st_mtime
        if mtime != self.data.get(rel_path):
            self.modified = True
            self.data[rel_path] = mtime
            return True
        return False

    def check_valid(self, data):
        """Return ``data`` if it matches this tests root and manifest.

        The data is kept only when its ``/tests_root`` entry equals
        ``self.tests_root``, the manifest file exists, and its
        ``/manifest_path`` entry equals the manifest path plus current mtime.
        Otherwise the cache is marked modified and replaced by a dict
        holding only ``/tests_root``.
        """
        if data.get("/tests_root") != self.tests_root:
            self.modified = True
        else:
            if self.manifest_path is not None and os.path.exists(self.manifest_path):
                mtime = os.path.getmtime(self.manifest_path)
                if data.get("/manifest_path") != [self.manifest_path, mtime]:
                    self.modified = True
            else:
                self.modified = True
        if self.modified:
            data = {}
            data["/tests_root"] = self.tests_root
        return data

    def dump(self):
        """Record the manifest path/mtime and tests root, then write the cache.

        Nothing is written when the manifest file does not exist.

        :raises ValueError: If ``manifest_path`` is None.
        """
        if self.manifest_path is None:
            raise ValueError
        if not os.path.exists(self.manifest_path):
            return
        mtime = os.path.getmtime(self.manifest_path)
        self.data["/manifest_path"] = [self.manifest_path, mtime]
        self.data["/tests_root"] = self.tests_root
        super(MtimeCache, self).dump()


class GitIgnoreCache(CacheFile):
    """Dict-like cache tied to the ``.gitignore`` file in the tests root."""
    file_name = "gitignore.json"

    def check_valid(self, data):
        """Return ``data`` if it was recorded for the current ``.gitignore``.

        The ``/gitignore_file`` entry must equal the file's path and current
        mtime. Otherwise the cache is marked modified and replaced by a dict
        holding only that entry.
        """
        ignore_path = os.path.join(self.tests_root, ".gitignore")
        stamp = [ignore_path, os.path.getmtime(ignore_path)]
        if data.get("/gitignore_file") == stamp:
            return data
        self.modified = True
        return {"/gitignore_file": stamp}

    def __contains__(self, key):
        return key in self.data

    def __getitem__(self, key):
        return self.data[key]

    def __setitem__(self, key, value):
        # Only flag the cache as modified when the stored value changes.
        if self.data.get(key) != value:
            self.data[key] = value
            self.modified = True


def walk(root):
    """Re-implementation of os.walk. Returns an iterator over
    (dirpath, dirnames, filenames), with some semantic differences
    to os.walk.

    This has a similar interface to os.walk, with the important difference
    that instead of lists of filenames and directory names, it yields
    lists of tuples of the form [(name, stat)] where stat is the result of
    os.stat for the file. That allows reusing the same stat data in the
    caller. It also always returns the dirpath relative to the root, with
    the root itself being returned as the empty string.

    Directories are visited breadth-first. The yielded list of directories
    is only read after the consumer resumes the generator, so removing
    entries from it in place prevents descending into them. Directories
    that are symbolic links are still listed, but are never descended
    into. Entries that cannot be listed or stat'ed are skipped silently.

    Unlike os.walk the implementation is not recursive."""

    # Bind the helpers used inside the loops to local names.
    get_stat = os.stat
    listdir = os.listdir
    join = os.path.join
    is_dir = stat.S_ISDIR
    # os.stat follows symlinks, so its st_mode never reports a link; the
    # path itself has to be checked to recognise a symlinked directory.
    is_link = os.path.islink
    relpath = os.path.relpath

    root = os.path.abspath(root)
    stack = deque([(root, "")])

    while stack:
        dir_path, rel_path = stack.popleft()
        try:
            names = listdir(dir_path)
        except OSError:
            continue

        dirs, non_dirs = [], []
        for name in names:
            path = join(dir_path, name)
            try:
                path_stat = get_stat(path)
            except OSError:
                continue
            if is_dir(path_stat.st_mode):
                dirs.append((name, path_stat))
            else:
                non_dirs.append((name, path_stat))

        yield rel_path, dirs, non_dirs
        for name, path_stat in dirs:
            new_path = join(dir_path, name)
            if not is_link(new_path):
                stack.append((new_path, relpath(new_path, root)))