File: git.py

package info (click to toggle)
python-wikkid 0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 728 kB
  • sloc: python: 3,051; makefile: 12
file content (219 lines) | stat: -rw-r--r-- 7,097 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#
# Copyright (C) 2012 Wikkid Developers.
#
# This software is licensed under the GNU Affero General Public License
# version 3 (see the file LICENSE).

"""A git filestore using Dulwich.

"""

import datetime
import mimetypes

from dulwich.objects import Blob, Tree, ZERO_SHA
from dulwich.object_store import tree_lookup_path
from dulwich.repo import Repo
from dulwich.walk import Walker
import posixpath
import stat

from zope.interface import implementer

from wikkid.filestore import FileExists, UpdateConflicts
from wikkid.interface.filestore import FileType, IFile, IFileStore


@implementer(IFileStore)
class FileStore(object):
    """A filestore that just uses an internal map to store data."""

    _encoding = 'utf-8'

    @classmethod
    def from_path(cls, path):
        return cls(Repo(path))

    def __init__(self, repo, ref=b'HEAD'):
        """Repo is a dulwich repository."""
        self.repo = repo
        self.ref = ref

    @property
    def store(self):
        return self.repo.object_store

    def _get_root(self, revision=None):
        if revision is None:
            try:
                revision = self.repo.refs[self.ref]
            except KeyError:
                revision = ZERO_SHA
        try:
            return (revision, self.repo[revision].tree)
        except KeyError:
            return None, None

    def get_file(self, path):
        """Return an object representing the file."""
        commit_id, root_id = self._get_root()
        if root_id is None:
            return None
        try:
            (mode, sha) = tree_lookup_path(
                self.store.__getitem__,
                root_id, path.encode(self._encoding))
        except KeyError:
            return None
        return File(
            self.store, mode, sha, path, commit_id, encoding=self._encoding)

    def update_file(self, path, content, author, parent_revision,
                    commit_message=None):
        """The `author` is updating the file at `path` with `content`."""
        commit_id, root_id = self._get_root()
        if root_id is None:
            root_tree = Tree()
        else:
            root_tree = self.store[root_id]
        # Find all tree objects involved
        tree = root_tree
        trees = [root_tree]
        elements = path.strip(posixpath.sep).split(posixpath.sep)
        for el in elements[:-1]:
            try:
                (mode, sha) = tree[el.encode(self._encoding)]
            except KeyError:
                tree = Tree()
            else:
                if not stat.S_ISDIR(mode):
                    raise FileExists(
                        "File %s exists and is not a directory" % el)
                tree = self.store[sha]
            trees.append(tree)
        if elements[-1] in tree:
            (old_mode, old_sha) = tree[elements[-1]]
            if stat.S_ISDIR(old_mode):
                raise FileExists("File %s exists and is a directory" % path)
            if old_sha != parent_revision and parent_revision is not None:
                raise UpdateConflicts(
                    "File conflict %s != %s" % (
                        old_sha, parent_revision), old_sha)
        if not isinstance(content, bytes):
            raise TypeError(content)
        blob = Blob.from_string(content)
        child = (stat.S_IFREG | 0o644, blob.id)
        self.store.add_object(blob)
        assert len(trees) == len(elements)
        for tree, name in zip(reversed(trees), reversed(elements)):
            assert name != ""
            tree[name.encode(self._encoding)] = child
            self.store.add_object(tree)
            child = (stat.S_IFDIR, tree.id)
        if commit_message is None:
            commit_message = ""
        if author is not None:
            author = author.encode(self._encoding)
        self.repo.do_commit(
            ref=self.ref, message=commit_message.encode(self._encoding),
            author=author, tree=child[1])

    def list_directory(self, directory_path):
        """Return a list of File objects for in the directory path.

        If the path doesn't exist, returns None.  If the path exists but is
        empty, an empty list is returned.  Otherwise a list of File objects in
        that directory.
        """
        if directory_path is None:
            directory_path = ''
        else:
            directory_path = directory_path.strip(posixpath.sep)
        commit_id, root_id = self._get_root()
        if directory_path == '':
            sha = root_id
            mode = stat.S_IFDIR
        else:
            if root_id is None:
                return None
            try:
                (mode, sha) = tree_lookup_path(
                    self.store.__getitem__,
                    root_id, directory_path.encode(self._encoding))
            except KeyError:
                return None
        if mode is not None and stat.S_ISDIR(mode):
            ret = []
            for (name, mode, sha) in self.store[sha].items():
                ret.append(
                    File(self.store, mode, sha,
                         posixpath.join(
                             directory_path, name.decode(self._encoding)),
                         commit_id, encoding=self._encoding))
            return ret
        else:
            return None


@implementer(IFile)
class File(object):
    """A Git file object."""

    def __init__(self, store, mode, sha, path, commit_sha, encoding):
        self.store = store
        self.encoding = encoding
        self.mode = mode
        self.sha = sha
        self.path = path
        self.commit_sha = commit_sha
        self.base_name = posixpath.basename(path)
        self.mimetype = mimetypes.guess_type(self.base_name)[0]

    @property
    def file_type(self):
        """Work out the filetype based on the mimetype if possible."""
        if self._is_directory:
            return FileType.DIRECTORY
        else:
            if self.mimetype is None:
                binary = self._is_binary
            else:
                binary = not self.mimetype.startswith('text/')
            if binary:
                return FileType.BINARY_FILE
            else:
                return FileType.TEXT_FILE

    def get_content(self):
        o = self.store[self.sha]
        if isinstance(o, Blob):
            return o.data
        else:
            return None

    @property
    def _is_directory(self):
        return stat.S_ISDIR(self.mode)

    @property
    def _is_binary(self):
        return b'\0' in self.get_content()

    def _get_last_modified_commit(self):
        walker = Walker(
            self.store, include=[self.commit_sha],
            paths=[self.path.encode(self.encoding)])
        return next(iter(walker)).commit

    @property
    def last_modified_in_revision(self):
        return self.sha

    @property
    def last_modified_by(self):
        return self._get_last_modified_commit().author.decode(self.encoding)

    @property
    def last_modified_date(self):
        c = self._get_last_modified_commit()
        return datetime.datetime.utcfromtimestamp(c.commit_time)