File: blob.py

package info (click to toggle)
python-pygit2 1.18.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,720 kB
  • sloc: ansic: 12,584; python: 9,337; sh: 205; makefile: 26
file content (155 lines) | stat: -rw-r--r-- 4,759 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import io
import threading
import time
from contextlib import AbstractContextManager
from typing import Optional
from queue import Queue

from ._pygit2 import Blob, Oid
from .enums import BlobFilter


class _BlobIO(io.RawIOBase):
    """Low-level wrapper for streaming blob content.

    The underlying libgit2 git_writestream filter chain will be run
    in a separate thread. The GIL will be released while running
    libgit2 filtering.

    The writer thread (``Blob._write_to_queue``) hands chunks to the reader
    through a single-slot queue. ``_ready`` is waited on before the queue is
    inspected, and ``_writer_closed`` marks the end of the stream.
    """

    # Seconds between queue drains while close() waits for the writer thread.
    _CLOSE_POLL_INTERVAL = 0.01

    def __init__(
        self,
        blob: Blob,
        as_path: Optional[str] = None,
        flags: BlobFilter = BlobFilter.CHECK_FOR_BINARY,
        commit_id: Optional[Oid] = None,
    ):
        super().__init__()
        self._blob = blob
        self._queue: Optional[Queue] = Queue(maxsize=1)
        self._ready = threading.Event()
        self._writer_closed = threading.Event()
        # Partially consumed chunk left over from the previous readinto().
        self._chunk: Optional[bytes] = None
        self._thread = threading.Thread(
            target=self._blob._write_to_queue,
            args=(self._queue, self._ready, self._writer_closed),
            kwargs={
                'as_path': as_path,
                'flags': int(flags),
                'commit_id': commit_id,
            },
            daemon=True,
        )
        self._thread.start()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def isatty(self):
        return False

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return False

    def readinto(self, b, /):
        """Copy up to ``len(b)`` bytes of blob content into the buffer ``b``.

        Blocks until the writer thread has produced a chunk or signalled the
        end of the stream. A chunk larger than ``b`` is split, and the
        remainder is kept for the next call.

        Returns:
            The number of bytes written into ``b``; 0 at end of stream.

        Raises:
            ValueError: if the stream has already been closed.
        """
        if self._queue is None:
            raise ValueError('I/O operation on closed file.')
        try:
            while self._chunk is None:
                self._ready.wait()
                if self._queue.empty():
                    if self._writer_closed.is_set():
                        # EOF
                        return 0
                    # Nothing queued yet: re-arm the event and wait for the
                    # writer to signal the next chunk.
                    self._ready.clear()
                    time.sleep(0)
                    continue
                chunk = self._queue.get()
                if chunk:
                    self._chunk = chunk

            if len(self._chunk) <= len(b):
                bytes_written = len(self._chunk)
                b[:bytes_written] = self._chunk
                self._chunk = None
                return bytes_written
            bytes_written = len(b)
            b[:] = self._chunk[:bytes_written]
            self._chunk = self._chunk[bytes_written:]
            return bytes_written
        except KeyboardInterrupt:
            # NOTE(review): an interrupt is reported as EOF (returns 0), so a
            # caller cannot tell it apart from a complete read.
            return 0

    def close(self):
        """Stop streaming and release the writer thread.

        Unread chunks are discarded. Calling close() more than once is a
        no-op.
        """
        if self.closed:
            return
        try:
            self._wait_for_writer()
        except KeyboardInterrupt:
            pass
        finally:
            self._queue = None
            # Mark the raw stream closed so ``closed`` (which BufferedReader
            # delegates to) reports it and later reads raise ValueError.
            super().close()

    def _drain_queue(self):
        """Discard any chunks currently waiting in the queue."""
        pending = getattr(self, '_queue', None)
        while pending is not None and not pending.empty():
            pending.get()

    def _wait_for_writer(self):
        """Block until the writer thread has finished, discarding its output."""
        thread = getattr(self, '_thread', None)
        if thread is None or thread.ident is None:
            # __init__ failed before the writer thread started. The events
            # will never be set, so waiting on them would hang forever.
            return
        # Keep draining while waiting. With a single-slot queue, a writer
        # blocked in put() (assumed to be a blocking put) could otherwise
        # never get far enough to set _writer_closed.
        while not self._writer_closed.wait(timeout=self._CLOSE_POLL_INTERVAL):
            self._drain_queue()
            if not thread.is_alive():
                # The writer exited without signalling completion; stop
                # waiting rather than hang.
                break
        self._drain_queue()
        thread.join()


class BlobIO(io.BufferedReader, AbstractContextManager):
    """Read-only wrapper for streaming blob content.

    Supports reading both raw and filtered blob content.
    Implements io.BufferedReader.

    Example:

        >>> with BlobIO(blob) as f:
        ...     while True:
        ...         # Read blob data in 1KB chunks until EOF is reached
        ...         chunk = f.read(1024)
        ...         if not chunk:
        ...             break

    By default, `BlobIO` will stream the raw contents of the blob, but it
    can also be used to stream filtered content (i.e. to read the content
    after applying filters which would be used when checking out the blob
    to the working directory).

    Example:

        >>> with BlobIO(blob, as_path='my_file.ext') as f:
        ...     # Read the filtered content which would be returned upon
        ...     # running 'git checkout -- my_file.ext'
        ...     filtered_data = f.read()
    """

    def __init__(
        self,
        blob: Blob,
        as_path: Optional[str] = None,
        flags: BlobFilter = BlobFilter.CHECK_FOR_BINARY,
        commit_id: Optional[Oid] = None,
    ):
        """Wrap the specified blob.

        Parameters:
            blob: The blob to wrap.
            as_path: Filter the contents of the blob as if it had the specified
                path. If `as_path` is None, the raw contents of the blob will
                be read.
            flags: A combination of enums.BlobFilter constants
                (only applicable when `as_path` is set).
            commit_id: Commit to load attributes from when
                ATTRIBUTES_FROM_COMMIT is specified in `flags`
                (only applicable when `as_path` is set).
        """
        # The raw stream starts its writer thread immediately; the buffered
        # layer then reads from it on demand.
        raw = _BlobIO(blob, as_path=as_path, flags=flags, commit_id=commit_id)
        super().__init__(raw)

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the stream when leaving a ``with`` block.

        Returns None, so any exception raised inside the block propagates.
        """
        self.close()


# Register each stream wrapper with its matching io ABC. Both classes
# already derive from (or through) these bases, so the registration only
# makes the relationship explicit.
for _io_abc, _wrapper in ((io.RawIOBase, _BlobIO), (io.BufferedIOBase, BlobIO)):
    _io_abc.register(_wrapper)
del _io_abc, _wrapper