1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
import os
from contextlib import contextmanager
class EnvPathList(list):
__slots__ = ["_path_factory", "_pathsep"]
def __init__(self, path_factory, pathsep):
self._path_factory = path_factory
self._pathsep = pathsep
def append(self, path):
list.append(self, self._path_factory(path))
def extend(self, paths):
list.extend(self, (self._path_factory(p) for p in paths))
def insert(self, index, path):
list.insert(self, index, self._path_factory(path))
def index(self, path):
list.index(self, self._path_factory(path))
def __contains__(self, path):
return list.__contains__(self, self._path_factory(path))
def remove(self, path):
list.remove(self, self._path_factory(path))
def update(self, text):
self[:] = [self._path_factory(p) for p in text.split(self._pathsep)]
def join(self):
return self._pathsep.join(str(p) for p in self)
class BaseEnv(object):
"""The base class of LocalEnv and RemoteEnv"""
__slots__ = ["_curr", "_path", "_path_factory"]
CASE_SENSITIVE = True
def __init__(self, path_factory, pathsep):
self._path_factory = path_factory
self._path = EnvPathList(path_factory, pathsep)
self._update_path()
def _update_path(self):
self._path.update(self.get("PATH", ""))
@contextmanager
def __call__(self, *args, **kwargs):
"""A context manager that can be used for temporal modifications of the environment.
Any time you enter the context, a copy of the old environment is stored, and then restored,
when the context exits.
:param args: Any positional arguments for ``update()``
:param kwargs: Any keyword arguments for ``update()``
"""
prev = self._curr.copy()
self.update(**kwargs)
try:
yield
finally:
self._curr = prev
self._update_path()
def __iter__(self):
"""Returns an iterator over the items ``(key, value)`` of current environment
(like dict.items)"""
return iter(self._curr.items())
def __hash__(self):
raise TypeError("unhashable type")
def __len__(self):
"""Returns the number of elements of the current environment"""
return len(self._curr)
def __contains__(self, name):
"""Tests whether an environment variable exists in the current environment"""
return (name if self.CASE_SENSITIVE else name.upper()) in self._curr
def __getitem__(self, name):
"""Returns the value of the given environment variable from current environment,
raising a ``KeyError`` if it does not exist"""
return self._curr[name if self.CASE_SENSITIVE else name.upper()]
def keys(self):
"""Returns the keys of the current environment (like dict.keys)"""
return self._curr.keys()
def items(self):
"""Returns the items of the current environment (like dict.items)"""
return self._curr.items()
def values(self):
"""Returns the values of the current environment (like dict.values)"""
return self._curr.values()
def get(self, name, *default):
"""Returns the keys of the current environment (like dict.keys)"""
return self._curr.get((name if self.CASE_SENSITIVE else name.upper()), *default)
def __delitem__(self, name):
"""Deletes an environment variable from the current environment"""
name = name if self.CASE_SENSITIVE else name.upper()
del self._curr[name]
if name == "PATH":
self._update_path()
def __setitem__(self, name, value):
"""Sets/replaces an environment variable's value in the current environment"""
name = name if self.CASE_SENSITIVE else name.upper()
self._curr[name] = value
if name == "PATH":
self._update_path()
def pop(self, name, *default):
"""Pops an element from the current environment (like dict.pop)"""
name = name if self.CASE_SENSITIVE else name.upper()
res = self._curr.pop(name, *default)
if name == "PATH":
self._update_path()
return res
def clear(self):
"""Clears the current environment (like dict.clear)"""
self._curr.clear()
self._update_path()
def update(self, *args, **kwargs):
"""Updates the current environment (like dict.update)"""
self._curr.update(*args, **kwargs)
if not self.CASE_SENSITIVE:
for k, v in list(self._curr.items()):
self._curr[k.upper()] = v
self._update_path()
def getdict(self):
"""Returns the environment as a real dictionary"""
self._curr["PATH"] = self.path.join()
return dict((k, str(v)) for k, v in self._curr.items())
@property
def path(self):
"""The system's ``PATH`` (as an easy-to-manipulate list)"""
return self._path
def _get_home(self):
if "HOME" in self:
return self._path_factory(self["HOME"])
elif "USERPROFILE" in self:
return self._path_factory(self["USERPROFILE"])
elif "HOMEPATH" in self:
return self._path_factory(self.get("HOMEDRIVE", ""), self["HOMEPATH"])
return None
def _set_home(self, p):
if "HOME" in self:
self["HOME"] = str(p)
elif "USERPROFILE" in self:
self["USERPROFILE"] = str(p)
elif "HOMEPATH" in self:
self["HOMEPATH"] = str(p)
else:
self["HOME"] = str(p)
home = property(_get_home, _set_home)
"""Get or set the home path"""
@property
def user(self):
"""Return the user name, or ``None`` if it is not set"""
# adapted from getpass.getuser()
for name in ('LOGNAME', 'USER', 'LNAME', 'USERNAME'):
if name in self:
return self[name]
try:
# POSIX only
import pwd
except ImportError:
return None
else:
return pwd.getpwuid(os.getuid())[0] # @UndefinedVariable
|