1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
|
from __future__ import absolute_import
import itertools
import operator
import os
from plumbum.lib import six
from abc import abstractmethod, abstractproperty
import warnings
from functools import reduce
class FSUser(int):
"""A special object that represents a file-system user. It derives from ``int``, so it behaves
just like a number (``uid``/``gid``), but also have a ``.name`` attribute that holds the
string-name of the user, if given (otherwise ``None``)
"""
def __new__(cls, val, name = None):
self = int.__new__(cls, val)
self.name = name
return self
class Path(str, six.ABC):
"""An abstraction over file system paths. This class is abstract, and the two implementations
are :class:`LocalPath <plumbum.machines.local.LocalPath>` and
:class:`RemotePath <plumbum.path.remote.RemotePath>`.
"""
CASE_SENSITIVE = True
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, str(self))
def __div__(self, other):
"""Joins two paths"""
return self.join(other)
__truediv__ = __div__
def __floordiv__(self, expr):
"""Returns a (possibly empty) list of paths that matched the glob-pattern under this path"""
return self.glob(expr)
def __iter__(self):
"""Iterate over the files in this directory"""
return iter(self.list())
def __eq__(self, other):
if isinstance(other, Path):
return self._get_info() == other._get_info()
elif isinstance(other, str):
if self.CASE_SENSITIVE:
return str(self) == other
else:
return str(self).lower() == other.lower()
else:
return NotImplemented
def __ne__(self, other):
return not (self == other)
def __gt__(self, other):
return str(self) > str(other)
def __ge__(self, other):
return str(self) >= str(other)
def __lt__(self, other):
return str(self) < str(other)
def __le__(self, other):
return str(self) <= str(other)
def __hash__(self):
if self.CASE_SENSITIVE:
return hash(str(self))
else:
return hash(str(self).lower())
def __nonzero__(self):
return bool(str(self))
__bool__ = __nonzero__
@abstractmethod
def _form(self, *parts):
pass
def up(self, count = 1):
"""Go up in ``count`` directories (the default is 1)"""
return self.join("../" * count)
def walk(self, filter = lambda p: True, dir_filter = lambda p: True): # @ReservedAssignment
"""traverse all (recursive) sub-elements under this directory, that match the given filter.
By default, the filter accepts everything; you can provide a custom filter function that
takes a path as an argument and returns a boolean
:param filter: the filter (predicate function) for matching results. Only paths matching
this predicate are returned. Defaults to everything.
:param dir_filter: the filter (predicate function) for matching directories. Only directories
matching this predicate are recursed into. Defaults to everything.
"""
for p in self.list():
if filter(p):
yield p
if p.is_dir() and dir_filter(p):
for p2 in p.walk(filter, dir_filter):
yield p2
@abstractproperty
def name(self):
"""The basename component of this path"""
@property
def basename(self):
"""Included for compatibility with older Plumbum code"""
warnings.warn("Use .name instead", DeprecationWarning)
return self.name
@abstractproperty
def stem(self):
"""The name without an extension, or the last component of the path"""
@abstractproperty
def dirname(self):
"""The dirname component of this path"""
@abstractproperty
def root(self):
"""The root of the file tree (`/` on Unix)"""
@abstractproperty
def drive(self):
"""The drive letter (on Windows)"""
@abstractproperty
def suffix(self):
"""The suffix of this file"""
@abstractproperty
def suffixes(self):
"""This is a list of all suffixes"""
@abstractproperty
def uid(self):
"""The user that owns this path. The returned value is a :class:`FSUser <plumbum.path.FSUser>`
object which behaves like an ``int`` (as expected from ``uid``), but it also has a ``.name``
attribute that holds the string-name of the user"""
@abstractproperty
def gid(self):
"""The group that owns this path. The returned value is a :class:`FSUser <plumbum.path.FSUser>`
object which behaves like an ``int`` (as expected from ``gid``), but it also has a ``.name``
attribute that holds the string-name of the group"""
@abstractmethod
def as_uri(self, scheme=None):
"""Returns a universal resource identifier. Use ``scheme`` to force a scheme."""
@abstractmethod
def _get_info(self):
pass
@abstractmethod
def join(self, *parts):
"""Joins this path with any number of paths"""
@abstractmethod
def list(self):
"""Returns the files in this directory"""
@abstractmethod
def iterdir(self):
"""Returns an iterator over the directory. Might be slightly faster on Python 3.5 than .list()"""
@abstractmethod
def is_dir(self):
"""Returns ``True`` if this path is a directory, ``False`` otherwise"""
def isdir(self):
"""Included for compatibility with older Plumbum code"""
warnings.warn("Use .is_dir() instead", DeprecationWarning)
return self.is_dir()
@abstractmethod
def is_file(self):
"""Returns ``True`` if this path is a regular file, ``False`` otherwise"""
def isfile(self):
"""Included for compatibility with older Plumbum code"""
warnings.warn("Use .is_file() instead", DeprecationWarning)
return self.is_file()
def islink(self):
"""Included for compatibility with older Plumbum code"""
warnings.warn("Use is_symlink instead", DeprecationWarning)
return self.is_symlink()
@abstractmethod
def is_symlink(self):
"""Returns ``True`` if this path is a symbolic link, ``False`` otherwise"""
@abstractmethod
def exists(self):
"""Returns ``True`` if this path exists, ``False`` otherwise"""
@abstractmethod
def stat(self):
"""Returns the os.stats for a file"""
pass
@abstractmethod
def with_name(self, name):
"""Returns a path with the name replaced"""
@abstractmethod
def with_suffix(self, suffix, depth=1):
"""Returns a path with the suffix replaced. Up to last ``depth`` suffixes will be
replaced. None will replace all suffixes. If there are less than ``depth`` suffixes,
this will replace all suffixes. ``.tar.gz`` is an example where ``depth=2`` or
``depth=None`` is useful"""
def preferred_suffix(self, suffix):
"""Adds a suffix if one does not currently exist (otherwise, no change). Useful
for loading files with a default suffix"""
if len(self.suffixes) > 0:
return self
else:
return self.with_suffix(suffix)
@abstractmethod
def glob(self, pattern):
"""Returns a (possibly empty) list of paths that matched the glob-pattern under this path"""
@abstractmethod
def delete(self):
"""Deletes this path (recursively, if a directory)"""
@abstractmethod
def move(self, dst):
"""Moves this path to a different location"""
def rename(self, newname):
"""Renames this path to the ``new name`` (only the basename is changed)"""
return self.move(self.up() / newname)
@abstractmethod
def copy(self, dst, override = False):
"""Copies this path (recursively, if a directory) to the destination path. Raises TypeError if
dst exists and override is False."""
@abstractmethod
def mkdir(self):
"""Creates a directory at this path; if the directory already exists, silently ignore"""
@abstractmethod
def open(self, mode = "r"):
"""opens this path as a file"""
@abstractmethod
def read(self, encoding=None):
"""returns the contents of this file. By default the data is binary (``bytes``), but you can
specify the encoding, e.g., ``'latin1'`` or ``'utf8'``"""
@abstractmethod
def write(self, data, encoding=None):
"""writes the given data to this file. By default the data is expected to be binary (``bytes``),
but you can specify the encoding, e.g., ``'latin1'`` or ``'utf8'``"""
@abstractmethod
def touch(self):
"""Update the access time. Creates an empty file if none exists."""
@abstractmethod
def chown(self, owner = None, group = None, recursive = None):
"""Change ownership of this path.
:param owner: The owner to set (either ``uid`` or ``username``), optional
:param group: The group to set (either ``gid`` or ``groupname``), optional
:param recursive: whether to change ownership of all contained files and subdirectories.
Only meaningful when ``self`` is a directory. If ``None``, the value
will default to ``True`` if ``self`` is a directory, ``False`` otherwise.
"""
@abstractmethod
def chmod(self, mode):
"""Change the mode of path to the numeric mode.
:param mode: file mode as for os.chmod
"""
@staticmethod
def _access_mode_to_flags(mode, flags = {"f" : os.F_OK, "w" : os.W_OK, "r" : os.R_OK, "x" : os.X_OK}):
if isinstance(mode, str):
mode = reduce(operator.or_, [flags[m] for m in mode.lower()], 0)
return mode
@abstractmethod
def access(self, mode = 0):
"""Test file existence or permission bits
:param mode: a bitwise-or of access bits, or a string-representation thereof:
``'f'``, ``'x'``, ``'r'``, ``'w'`` for ``os.F_OK``, ``os.X_OK``,
``os.R_OK``, ``os.W_OK``
"""
@abstractmethod
def link(self, dst):
"""Creates a hard link from ``self`` to ``dst``
:param dst: the destination path
"""
@abstractmethod
def symlink(self, dst):
"""Creates a symbolic link from ``self`` to ``dst``
:param dst: the destination path
"""
@abstractmethod
def unlink(self):
"""Deletes a symbolic link"""
def split(self):
"""Splits the path on directory separators, yielding a list of directories, e.g,
``"/var/log/messages"`` will yield ``['var', 'log', 'messages']``.
"""
parts = []
path = self
while path != path.dirname:
parts.append(path.name)
path = path.dirname
return parts[::-1]
@property
def parts(self):
"""Splits the directory into parts, including the base directroy, returns a tuple"""
return tuple([self.drive + self.root] + self.split())
def relative_to(self, source):
"""Computes the "relative path" require to get from ``source`` to ``self``. They satisfy the invariant
``source_path + (target_path - source_path) == target_path``. For example::
/var/log/messages - /var/log/messages = []
/var/log/messages - /var = [log, messages]
/var/log/messages - / = [var, log, messages]
/var/log/messages - /var/tmp = [.., log, messages]
/var/log/messages - /opt = [.., var, log, messages]
/var/log/messages - /opt/lib = [.., .., var, log, messages]
"""
if isinstance(source, str):
source = self._form(source)
parts = self.split()
baseparts = source.split()
ancestors = len(list(itertools.takewhile(lambda p: p[0] == p[1], zip(parts, baseparts))))
return RelativePath([".."] * (len(baseparts) - ancestors) + parts[ancestors:])
def __sub__(self, other):
"""Same as ``self.relative_to(other)``"""
return self.relative_to(other)
def _glob(self, pattern, fn):
"""Applies a glob string or list/tuple/iterable to the current path, using ``fn``"""
if isinstance(pattern, str):
return fn(pattern)
else:
results = []
for single_pattern in pattern:
results.extend(fn(single_pattern))
return sorted(list(set(results)))
class RelativePath(object):
"""
Relative paths are the "delta" required to get from one path to another.
Note that relative path do not point at anything, and thus are not paths.
Therefore they are system agnostic (but closed under addition)
Paths are always absolute and point at "something", whether existent or not.
Relative paths are created by subtracting paths (``Path.relative_to``)
"""
def __init__(self, parts):
self.parts = parts
def __str__(self):
return "/".join(self.parts)
def __iter__(self):
return iter(self.parts)
def __len__(self):
return len(self.parts)
def __getitem__(self, index):
return self.parts[index]
def __repr__(self):
return "RelativePath(%r)" % (self.parts,)
def __eq__(self, other):
return str(self) == str(other)
def __ne__(self, other):
return not (self == other)
def __gt__(self, other):
return str(self) > str(other)
def __ge__(self, other):
return str(self) >= str(other)
def __lt__(self, other):
return str(self) < str(other)
def __le__(self, other):
return str(self) <= str(other)
def __hash__(self):
return hash(str(self))
def __nonzero__(self):
return bool(str(self))
__bool__ = __nonzero__
def up(self, count = 1):
return RelativePath(self.parts[:-count])
def __radd__(self, path):
return path.join(*self.parts)
|