1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
|
"""
Wordnet and ILI Packages and Collections
"""
import gzip
import lzma
import shutil
import tarfile
import tempfile
from collections.abc import Iterator
from pathlib import Path
from wn import ili, lmf
from wn._config import config
from wn._exceptions import Error
from wn._types import AnyPath
from wn._util import is_gzip, is_lzma
from wn.constants import _ILI, _WORDNET
# Suffixes tried, in order, when looking for a project's README and
# LICENSE files; the empty string matches the bare filename.
_ADDITIONAL_FILE_SUFFIXES = ("", ".txt", ".md", ".rst")
def is_package_directory(path: AnyPath) -> bool:
    """Return ``True`` if *path* looks like a wordnet or ILI package.

    A package directory directly contains exactly one recognizable
    resource file; zero or several resource files disqualify it.
    """
    resources = _package_directory_types(Path(path).expanduser())
    return len(resources) == 1
def _package_directory_types(path: Path) -> list[tuple[Path, str]]:
    """Return ``(file, type)`` pairs for resource files directly in *path*.

    The result is empty when *path* is not a directory or when none of
    its entries is recognized as a resource file.
    """
    if not path.is_dir():
        return []
    return [
        (entry, kind)
        for entry in path.iterdir()
        if (kind := _resource_file_type(entry)) is not None
    ]
def _resource_file_type(path: Path) -> str | None:
    """Classify *path* as a wordnet or ILI resource file.

    The WN-LMF check is tried first, then the ILI check; :data:`None`
    is returned when neither recognizes the file.
    """
    if lmf.is_lmf(path):
        return _WORDNET
    return _ILI if ili.is_ili_tsv(path) else None
def is_collection_directory(path: AnyPath) -> bool:
    """Return ``True`` if *path* appears to be a wordnet collection.

    A collection is a directory with at least one package directory
    directly inside it. Anything that is not a directory yields
    ``False``.
    """
    path = Path(path).expanduser()
    if not path.is_dir():
        return False
    # any() stops at the first package found instead of inspecting
    # every entry of the directory.
    return any(is_package_directory(entry) for entry in path.iterdir())
class Project:
    """Common base class of packages and collections."""

    __slots__ = ("_path",)

    def __init__(self, path: AnyPath):
        self._path: Path = Path(path).expanduser()

    @property
    def path(self) -> Path:
        """The path of the project directory or resource file.

        For :class:`Package` and :class:`Collection` objects this is
        the project's directory. For :class:`ResourceOnlyPackage`
        objects it is the same path returned by
        :meth:`resource_file() <Package.resource_file>`.
        """
        return self._path

    def readme(self) -> Path | None:
        """Return the path of the README file, or :data:`None` if absent."""
        stem = self._path / "README"
        return self._find_file(stem, _ADDITIONAL_FILE_SUFFIXES)

    def license(self) -> Path | None:
        """Return the path of the license file, or :data:`None` if absent."""
        stem = self._path / "LICENSE"
        return self._find_file(stem, _ADDITIONAL_FILE_SUFFIXES)

    def citation(self) -> Path | None:
        """Return the path of the citation file, or :data:`None` if absent."""
        stem = self._path / "citation"
        return self._find_file(stem, (".bib",))

    def _find_file(self, base: Path, suffixes: tuple[str, ...]) -> Path | None:
        """Return the first existing file made from *base* and *suffixes*.

        The suffixes are tried in the order given; :data:`None` is
        returned when no candidate is an existing file.
        """
        candidate = base
        for ext in suffixes:
            # Each suffix replaces the one applied on the previous pass.
            candidate = candidate.with_suffix(ext)
            if candidate.is_file():
                return candidate
        return None
class Package(Project):
    """A wordnet or ILI package.

    A package is a directory holding one resource file and, optionally,
    metadata files.
    """

    @property
    def type(self) -> str | None:
        """Return the name of the type of resource in the package.

        Valid return values are:

        - :python:`"wordnet"` -- the resource is a WN-LMF lexicon file
        - :python:`"ili"` -- the resource is an interlingual index file
        - :data:`None` -- the resource type is undetermined
        """
        resource = self.resource_file()
        return _resource_file_type(resource)

    def resource_file(self) -> Path:
        """Return the path of the package's resource file.

        An :class:`Error` is raised when the package directory holds no
        resource file or more than one.
        """
        found = _package_directory_types(self._path)
        if len(found) == 1:
            resource, _ = found[0]
            return resource
        if found:
            raise Error(f"multiple resource found in package: {self._path!s}")
        raise Error(f"no resource found in package: {self._path!s}")
class ResourceOnlyPackage(Package):
    """A virtual package for a single-file resource.

    This class is for resource files that are not distributed in a
    package directory. The :meth:`readme() <Project.readme>`,
    :meth:`license() <Project.license>`, and
    :meth:`citation() <Project.citation>` methods all return
    :data:`None`.
    """

    def resource_file(self) -> Path:
        """Return the project path itself, which is the resource file."""
        return self._path

    def readme(self) -> None:
        """Return :data:`None`; a bare resource file has no README."""
        return None

    def license(self) -> None:
        """Return :data:`None`; a bare resource file has no license file."""
        return None

    def citation(self) -> None:
        """Return :data:`None`; a bare resource file has no citation file."""
        return None
class Collection(Project):
    """A wordnet or ILI collection.

    A collection is a directory that contains package directories and,
    optionally, metadata files.
    """

    def packages(self) -> list[Package]:
        """Return the list of packages in the collection."""
        package_dirs = filter(is_package_directory, self._path.iterdir())
        return [Package(entry) for entry in package_dirs]
def get_project(
    *,
    project: str | None = None,
    path: AnyPath | None = None,
) -> Project:
    """Return the :class:`Project` object for *project* or *path*.

    The *project* argument is a project specifier and will look in the
    download cache for the project data. If the project has not been
    downloaded and cached, an error will be raised.

    The *path* argument looks for project data at the given path. It
    can point to a resource file, a package directory, or a collection
    directory. Unlike :func:`iterpackages`, this function does not
    iterate over packages within a collection, and instead the
    :class:`Collection` object is returned.

    Exactly one of *project* and *path* must be given; otherwise a
    :exc:`TypeError` is raised. An :class:`Error` is raised when
    *project* is not in the download cache.

    .. note::

       If the target is compressed or archived, the data will be
       extracted to a temporary directory. It is the user's
       responsibility to delete this temporary directory, which is
       indicated by :data:`Project.path`.
    """
    if project and path:
        raise TypeError("expected a project specifier or a path, not both")
    if project:
        info = config.get_project_info(project)
        cached = info["cache"]
        if not cached:
            raise Error(
                f"{project} is not cached; try `wn.download({project!r})` first"
            )
        path = cached
    elif not path:
        raise TypeError("expected a project specifier or a path")
    # Any temporary path created while resolving is intentionally not
    # cleaned up here; see the note in the docstring.
    proj, _ = _get_project_from_path(path)
    return proj
def _get_project_from_path(
    path: AnyPath,
    tmp_path: Path | None = None,
) -> tuple[Project, Path | None]:
    """Resolve *path* to a project, unpacking archives as needed.

    *path* may be a package directory, a collection directory, a tar
    archive holding exactly one of those (or one resource file), or a
    plain or compressed resource file.

    Returns the project together with the temporary path created while
    resolving it, or the *tmp_path* passed in when nothing new was
    created. The caller is responsible for removing that temporary
    path. Temporary data created here is removed again before an
    exception propagates.

    Raises :class:`Error` when *path* is not a valid package,
    collection, archive, or resource file.
    """
    path = Path(path).expanduser()

    if path.is_dir():
        if is_package_directory(path):
            return Package(path), tmp_path
        if is_collection_directory(path):
            return Collection(path), tmp_path
        raise Error(f"does not appear to be a valid package or collection: {path!s}")

    if tarfile.is_tarfile(path):
        tmpdir_ = Path(tempfile.mkdtemp())
        try:
            with tarfile.open(path) as tar:
                _check_tar(tar)
                if hasattr(tarfile, "data_filter"):
                    # Extraction filters are not available on every
                    # Python release, so only request one when present.
                    tar.extractall(path=tmpdir_, filter="data")
                else:
                    tar.extractall(path=tmpdir_)
            contents = list(tmpdir_.iterdir())
            if len(contents) != 1:
                raise Error(
                    "archive may only have one resource, package, or collection"
                )
            return _get_project_from_path(contents[0], tmp_path=tmpdir_)
        except Exception:
            # Nothing usable came out of the archive, so do not leave
            # the extraction directory behind.
            shutil.rmtree(tmpdir_, ignore_errors=True)
            raise

    decompressed, tmp_path = _get_decompressed(path, tmp_path)
    if lmf.is_lmf(decompressed) or ili.is_ili_tsv(decompressed):
        return ResourceOnlyPackage(decompressed), tmp_path
    if decompressed != path:
        # A temporary decompressed copy was made; never remove the
        # caller's own file.
        decompressed.unlink(missing_ok=True)
    raise Error(f"not a valid lexical resource: {path!s}")
def iterpackages(path: AnyPath, delete: bool = True) -> Iterator[Package]:
    """Yield any wordnet or ILI packages found at *path*.

    The *path* argument can point to one of the following:

    - a lexical resource file or ILI file
    - a wordnet package directory
    - a wordnet collection directory
    - a tar archive containing one of the above
    - a compressed (gzip or lzma) resource file or tar archive

    The *delete* argument controls whether temporary directories
    created along the way are removed once iteration finishes. When it
    is :data:`True`, the package objects can only be inspected during
    iteration. For persistent objects (e.g.,
    :python:`pkgs = list(iterpackages(...))`), set *delete* to
    :data:`False`.

    .. warning::

       When *delete* is set to :data:`False`, the user is responsible
       for cleaning up any temporary directories. The
       :data:`Project.path` attribute indicates the path of the
       temporary directory.
    """
    project, tmp_path = _get_project_from_path(path)
    try:
        # ResourceOnlyPackage is a Package subclass, so it is yielded
        # by the first branch as well.
        if isinstance(project, Package):
            yield project
        elif isinstance(project, Collection):
            yield from project.packages()
        else:
            raise Error(f"unexpected project type: {project.__class__.__name__}")
    finally:
        needs_cleanup = bool(tmp_path) and delete
        if needs_cleanup:
            if tmp_path.is_dir():
                shutil.rmtree(tmp_path)
            elif tmp_path.is_file():
                tmp_path.unlink()
            else:
                raise Error(f"could not remove temporary path: {tmp_path}")
def _get_decompressed(
    source: Path,
    tmp_path: Path | None,
) -> tuple[Path, Path | None]:
    """Return an uncompressed version of *source* and the path to clean up.

    When *source* is neither gzip- nor lzma-compressed it is returned
    unchanged together with *tmp_path*. Otherwise its contents are
    decompressed into a new temporary file. If *tmp_path* is an
    existing directory the file is created inside it, so that removing
    the directory also removes the decompressed copy.

    The second item of the result is *tmp_path* when one was given,
    otherwise the path of the new temporary file.

    Raises :class:`Error` when the file cannot be decompressed; the
    partially written temporary file is removed first.
    """
    gzipped = is_gzip(source)
    xzipped = is_lzma(source)
    if not (gzipped or xzipped):
        return source, tmp_path

    opener = gzip.open if gzipped else lzma.open
    parent = tmp_path if tmp_path is not None and tmp_path.is_dir() else None
    tmp = tempfile.NamedTemporaryFile(  # noqa: SIM115
        suffix=".xml", dir=parent, delete=False
    )
    path = Path(tmp.name)
    try:
        # Leaving the ``with`` block closes the temporary file on both
        # success and failure; Windows cannot reliably reopen or
        # remove it until it is closed.
        with tmp, opener(source, "rb") as compressed_src:
            shutil.copyfileobj(compressed_src, tmp)
    except (OSError, EOFError, lzma.LZMAError) as exc:
        path.unlink(missing_ok=True)
        raise Error(f"could not decompress file: {source}") from exc

    # If tmp_path is not None, the compressed file was in a temporary
    # directory, so return that. Otherwise the new path becomes the
    # tmp_path.
    return path, tmp_path or path
def _check_tar(tar: tarfile.TarFile) -> None:
    """Check the tarfile to avoid potential security issues.

    Every member of *tar* must satisfy these constraints, otherwise an
    :class:`Error` is raised:

    - it is a regular file or a directory
    - its name does not start with '/' and does not contain '..'
    """
    for info in tar.getmembers():
        if not info.isfile() and not info.isdir():
            raise Error(
                f"tarfile member is not a regular file or directory: {info.name}"
            )
        is_absolute = info.name.startswith("/")
        has_dotdot = ".." in info.name
        if is_absolute or has_dotdot:
            raise Error(
                f"tarfile member paths may not be absolute or contain ..: {info.name}"
            )
|