File: registry.py

package info (click to toggle)
python-fs 2.4.16-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 1,944 kB
  • sloc: python: 13,048; makefile: 226; sh: 3
file content (283 lines) | stat: -rw-r--r-- 9,429 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# coding: utf-8
"""`Registry` class mapping protocols and FS URLs to their `Opener`.
"""

from __future__ import absolute_import, print_function, unicode_literals

import typing

import collections
import contextlib

from ..errors import ResourceReadOnly
from .base import Opener
from .errors import EntryPointError, UnsupportedProtocol
from .parse import parse_fs_url

if typing.TYPE_CHECKING:
    from typing import Callable, Dict, Iterator, List, Text, Tuple, Type, Union

    from ..base import FS


class Registry(object):
    """A registry for `Opener` instances.

    The registry maps protocol names (the part of a FS URL before
    ``://``) to the `Opener` instance able to open such URLs, and
    offers helpers to open filesystems directly from FS URLs.

    """

    def __init__(self, default_opener="osfs", load_extern=False):
        # type: (Text, bool) -> None
        """Create a registry object.

        Arguments:
            default_opener (str, optional): The protocol to use, if one
                is not supplied. The default is to use 'osfs', so that the
                FS URL is treated as a system path if no protocol is given.
            load_extern (bool, optional): Set to `True` to load openers from
                PyFilesystem2 extensions. Defaults to `False`.

        Note:
            ``load_extern`` is stored on the instance, but the lookup
            code in this class only resolves protocols from openers
            registered with `~Registry.install`.

        """
        self.default_opener = default_opener
        self.load_extern = load_extern
        # Maps each protocol name to the opener instance serving it.
        self._protocols = {}  # type: Dict[Text, Opener]

    def __repr__(self):
        # type: () -> Text
        """Return a debug representation listing the known protocols."""
        return "<fs-registry {!r}>".format(self.protocols)

    def install(self, opener):
        # type: (Union[Type[Opener], Opener, Callable[[], Opener]]) -> Opener
        """Install an opener.

        Arguments:
            opener (`Opener`): an `Opener` instance, or a callable that
                returns an opener instance.

        Returns:
            Opener: the opener instance that was registered.

        Note:
            May be used as a class decorator. For example::

                registry = Registry()
                @registry.install
                class ArchiveOpener(Opener):
                    protocols = ['zip', 'tar']

        """
        # Anything that is not already an instance (a class or a factory
        # function) is called without arguments to obtain one.
        _opener = opener if isinstance(opener, Opener) else opener()
        assert isinstance(_opener, Opener), "Opener instance required"
        assert _opener.protocols, "must list one or more protocols"
        # A later install for the same protocol replaces the earlier one.
        for protocol in _opener.protocols:
            self._protocols[protocol] = _opener
        return _opener

    @property
    def protocols(self):
        # type: () -> List[Text]
        """`list`: the list of supported protocols."""
        # Return a fresh list so callers cannot mutate the registry.
        return list(self._protocols)

    def get_opener(self, protocol):
        # type: (Text) -> Opener
        """Get the opener associated to a given protocol.

        Arguments:
            protocol (str): A filesystem protocol. If empty or `None`,
                the registry's ``default_opener`` protocol is used.

        Returns:
            Opener: an opener instance.

        Raises:
            ~fs.opener.errors.UnsupportedProtocol: If no opener
                could be found for the given protocol.

        """
        protocol = protocol or self.default_opener

        # Only openers registered through `install` are consulted; no
        # extension (entry point) lookup is performed here.
        if protocol not in self._protocols:
            raise UnsupportedProtocol(
                "protocol '{}' is not supported".format(protocol)
            )
        return self._protocols[protocol]

    def open(
        self,
        fs_url,  # type: Text
        writeable=True,  # type: bool
        create=False,  # type: bool
        cwd=".",  # type: Text
        default_protocol="osfs",  # type: Text
    ):
        # type: (...) -> Tuple[FS, Text]
        """Open a filesystem from a FS URL.

        Returns a tuple of a filesystem object and a path. If there is
        no path in the FS URL, the path value will be `None`.

        Arguments:
            fs_url (str): A filesystem URL.
            writeable (bool, optional): `True` if the filesystem must be
                writeable.
            create (bool, optional): `True` if the filesystem should be
                created if it does not exist.
            cwd (str): The current working directory.
            default_protocol (str): The protocol to use if one is not
                supplied in the FS URL (defaults to ``"osfs"``).

        Returns:
            (FS, str): a tuple of ``(<filesystem>, <path from url>)``

        Raises:
            ~fs.opener.errors.UnsupportedProtocol: If no opener
                could be found for the protocol of the FS URL.

        """
        if "://" not in fs_url:
            # URL may just be a path
            fs_url = "{}://{}".format(default_protocol, fs_url)

        parse_result = parse_fs_url(fs_url)
        protocol = parse_result.protocol
        open_path = parse_result.path

        opener = self.get_opener(protocol)

        open_fs = opener.open_fs(fs_url, parse_result, writeable, create, cwd)
        return open_fs, open_path

    def open_fs(
        self,
        fs_url,  # type: Union[FS, Text]
        writeable=False,  # type: bool
        create=False,  # type: bool
        cwd=".",  # type: Text
        default_protocol="osfs",  # type: Text
    ):
        # type: (...) -> FS
        """Open a filesystem from a FS URL (ignoring the path component).

        Arguments:
            fs_url (str): A filesystem URL. If a filesystem instance is
                given instead, it will be returned transparently.
            writeable (bool, optional): `True` if the filesystem must
                be writeable.
            create (bool, optional): `True` if the filesystem should be
                created if it does not exist.
            cwd (str): The current working directory (generally only
                relevant for OS filesystems).
            default_protocol (str): The protocol to use if one is not
                supplied in the FS URL (defaults to ``"osfs"``).

        Returns:
            ~fs.base.FS: A filesystem instance.

        Caution:
            The ``writeable`` parameter only controls whether the
            filesystem *needs* to be writable, which is relevant for
            some archive filesystems. Passing ``writeable=False`` will
            **not** make the return filesystem read-only. For this,
            consider using `fs.wrap.read_only` to wrap the returned
            instance.

        """
        # Imported here rather than at module level (the module only
        # imports FS under ``typing.TYPE_CHECKING``).
        from ..base import FS

        if isinstance(fs_url, FS):
            # Already a filesystem: hand it back untouched.
            return fs_url

        # The path component of the URL is deliberately discarded.
        _fs, _ = self.open(
            fs_url,
            writeable=writeable,
            create=create,
            cwd=cwd,
            default_protocol=default_protocol,
        )
        return _fs

    @contextlib.contextmanager
    def manage_fs(
        self,
        fs_url,  # type: Union[FS, Text]
        create=False,  # type: bool
        writeable=False,  # type: bool
        cwd=".",  # type: Text
    ):
        # type: (...) -> Iterator[FS]
        """Get a context manager to open and close a filesystem.

        Arguments:
            fs_url (FS or str): A filesystem instance or a FS URL.
            create (bool, optional): If `True`, then create the filesystem if
                it doesn't already exist.
            writeable (bool, optional): If `True`, then the filesystem
                must be writeable.
            cwd (str): The current working directory, if opening a
                `~fs.osfs.OSFS`.

        Raises:
            ~fs.errors.ResourceReadOnly: If ``writeable`` is `True` but
                the filesystem reports itself as read-only (or does not
                report a ``read_only`` meta value at all).

        Sometimes it is convenient to be able to pass either a FS object
        *or* an FS URL to a function. This context manager handles the
        required logic for that. A filesystem opened from a FS URL is
        closed on exit; a filesystem instance passed in by the caller
        is left open.

        Example:
            The `~Registry.manage_fs` method can be used to define a small
            utility function::

                >>> def print_ls(list_fs):
                ...     '''List a directory.'''
                ...     with manage_fs(list_fs) as fs:
                ...         print(' '.join(fs.listdir()))

            This function may be used in two ways. You may either pass
            a ``str``, as follows::

                >>> print_ls('zip://projects.zip')

            Or, a filesystem instance::

                >>> from fs.osfs import OSFS
                >>> projects_fs = OSFS('~/')
                >>> print_ls(projects_fs)

        """
        from ..base import FS

        def assert_writeable(fs):
            # A missing "read_only" meta value is treated as read-only.
            if fs.getmeta().get("read_only", True):
                raise ResourceReadOnly(path="/")

        if isinstance(fs_url, FS):
            if writeable:
                assert_writeable(fs_url)
            # The caller owns this filesystem, so it is not closed here.
            yield fs_url
        else:
            _fs = self.open_fs(fs_url, create=create, writeable=writeable, cwd=cwd)
            try:
                # Checked inside the ``try`` so that a filesystem opened
                # here is still closed when the check fails.
                if writeable:
                    assert_writeable(_fs)
                yield _fs
            finally:
                _fs.close()


# Module-level `Registry` instance, created with ``load_extern=True``.
# NOTE(review): presumably the shared default registry used by the rest
# of the package -- confirm against the modules importing it.
registry = Registry(load_extern=True)