1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
|
# coding: utf-8
"""`Registry` class mapping protocols and FS URLs to their `Opener`.
"""
from __future__ import absolute_import, print_function, unicode_literals
import typing
import collections
import contextlib
from ..errors import ResourceReadOnly
from .base import Opener
from .errors import EntryPointError, UnsupportedProtocol
from .parse import parse_fs_url
if typing.TYPE_CHECKING:
from typing import Callable, Dict, Iterator, List, Text, Tuple, Type, Union
from ..base import FS
class Registry(object):
"""A registry for `Opener` instances."""
def __init__(self, default_opener="osfs", load_extern=False):
# type: (Text, bool) -> None
"""Create a registry object.
Arguments:
default_opener (str, optional): The protocol to use, if one
is not supplied. The default is to use 'osfs', so that the
FS URL is treated as a system path if no protocol is given.
load_extern (bool, optional): Set to `True` to load openers from
PyFilesystem2 extensions. Defaults to `False`.
"""
self.default_opener = default_opener
self.load_extern = load_extern
self._protocols = {} # type: Dict[Text, Opener]
def __repr__(self):
# type: () -> Text
return "<fs-registry {!r}>".format(self.protocols)
def install(self, opener):
# type: (Union[Type[Opener], Opener, Callable[[], Opener]]) -> Opener
"""Install an opener.
Arguments:
opener (`Opener`): an `Opener` instance, or a callable that
returns an opener instance.
Note:
May be used as a class decorator. For example::
registry = Registry()
@registry.install
class ArchiveOpener(Opener):
protocols = ['zip', 'tar']
"""
_opener = opener if isinstance(opener, Opener) else opener()
assert isinstance(_opener, Opener), "Opener instance required"
assert _opener.protocols, "must list one or more protocols"
for protocol in _opener.protocols:
self._protocols[protocol] = _opener
return _opener
@property
def protocols(self):
# type: () -> List[Text]
"""`list`: the list of supported protocols."""
_protocols = list(self._protocols)
return _protocols
def get_opener(self, protocol):
# type: (Text) -> Opener
"""Get the opener class associated to a given protocol.
Arguments:
protocol (str): A filesystem protocol.
Returns:
Opener: an opener instance.
Raises:
~fs.opener.errors.UnsupportedProtocol: If no opener
could be found for the given protocol.
EntryPointLoadingError: If the returned entry point
is not an `Opener` subclass or could not be loaded
successfully.
"""
protocol = protocol or self.default_opener
if True:
entry_point = None
# If not entry point was loaded from the extensions, try looking
# into the registered protocols
if entry_point is None:
if protocol in self._protocols:
opener_instance = self._protocols[protocol]
else:
raise UnsupportedProtocol(
"protocol '{}' is not supported".format(protocol)
)
# If an entry point was found in an extension, attempt to load it
else:
try:
opener = entry_point.load()
except Exception as exception:
raise EntryPointError(
"could not load entry point; {}".format(exception)
)
if not issubclass(opener, Opener):
raise EntryPointError("entry point did not return an opener")
try:
opener_instance = opener()
except Exception as exception:
raise EntryPointError(
"could not instantiate opener; {}".format(exception)
)
return opener_instance
def open(
self,
fs_url, # type: Text
writeable=True, # type: bool
create=False, # type: bool
cwd=".", # type: Text
default_protocol="osfs", # type: Text
):
# type: (...) -> Tuple[FS, Text]
"""Open a filesystem from a FS URL.
Returns a tuple of a filesystem object and a path. If there is
no path in the FS URL, the path value will be `None`.
Arguments:
fs_url (str): A filesystem URL.
writeable (bool, optional): `True` if the filesystem must be
writeable.
create (bool, optional): `True` if the filesystem should be
created if it does not exist.
cwd (str): The current working directory.
Returns:
(FS, str): a tuple of ``(<filesystem>, <path from url>)``
"""
if "://" not in fs_url:
# URL may just be a path
fs_url = "{}://{}".format(default_protocol, fs_url)
parse_result = parse_fs_url(fs_url)
protocol = parse_result.protocol
open_path = parse_result.path
opener = self.get_opener(protocol)
open_fs = opener.open_fs(fs_url, parse_result, writeable, create, cwd)
return open_fs, open_path
def open_fs(
self,
fs_url, # type: Union[FS, Text]
writeable=False, # type: bool
create=False, # type: bool
cwd=".", # type: Text
default_protocol="osfs", # type: Text
):
# type: (...) -> FS
"""Open a filesystem from a FS URL (ignoring the path component).
Arguments:
fs_url (str): A filesystem URL. If a filesystem instance is
given instead, it will be returned transparently.
writeable (bool, optional): `True` if the filesystem must
be writeable.
create (bool, optional): `True` if the filesystem should be
created if it does not exist.
cwd (str): The current working directory (generally only
relevant for OS filesystems).
default_protocol (str): The protocol to use if one is not
supplied in the FS URL (defaults to ``"osfs"``).
Returns:
~fs.base.FS: A filesystem instance.
Caution:
The ``writeable`` parameter only controls whether the
filesystem *needs* to be writable, which is relevant for
some archive filesystems. Passing ``writeable=False`` will
**not** make the return filesystem read-only. For this,
consider using `fs.wrap.read_only` to wrap the returned
instance.
"""
from ..base import FS
if isinstance(fs_url, FS):
_fs = fs_url
else:
_fs, _path = self.open(
fs_url,
writeable=writeable,
create=create,
cwd=cwd,
default_protocol=default_protocol,
)
return _fs
@contextlib.contextmanager
def manage_fs(
self,
fs_url, # type: Union[FS, Text]
create=False, # type: bool
writeable=False, # type: bool
cwd=".", # type: Text
):
# type: (...) -> Iterator[FS]
"""Get a context manager to open and close a filesystem.
Arguments:
fs_url (FS or str): A filesystem instance or a FS URL.
create (bool, optional): If `True`, then create the filesystem if
it doesn't already exist.
writeable (bool, optional): If `True`, then the filesystem
must be writeable.
cwd (str): The current working directory, if opening a
`~fs.osfs.OSFS`.
Sometimes it is convenient to be able to pass either a FS object
*or* an FS URL to a function. This context manager handles the
required logic for that.
Example:
The `~Registry.manage_fs` method can be used to define a small
utility function::
>>> def print_ls(list_fs):
... '''List a directory.'''
... with manage_fs(list_fs) as fs:
... print(' '.join(fs.listdir()))
This function may be used in two ways. You may either pass
a ``str``, as follows::
>>> print_list('zip://projects.zip')
Or, an filesystem instance::
>>> from fs.osfs import OSFS
>>> projects_fs = OSFS('~/')
>>> print_list(projects_fs)
"""
from ..base import FS
def assert_writeable(fs):
if fs.getmeta().get("read_only", True):
raise ResourceReadOnly(path="/")
if isinstance(fs_url, FS):
if writeable:
assert_writeable(fs_url)
yield fs_url
else:
_fs = self.open_fs(fs_url, create=create, writeable=writeable, cwd=cwd)
if writeable:
assert_writeable(_fs)
try:
yield _fs
finally:
_fs.close()
registry = Registry(load_extern=True)
|