1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
|
#!/usr/bin/env python3
# Copyright (C) 2017
import collections.abc
import logging
import os # requires posix
from augeas import Augeas
# Subtree in which AugeasWrapper registers the lens and the file to include
# before it calls load().
AUGEAS_LOAD_PATH = '/augeas/load/'
# Prefix onto which the configuration file path is joined to locate the
# parsed file in the tree.
AUGEAS_FILES_PATH = '/files/'
# Path expression matched right after load() to collect error nodes.
AUGEAS_ERROR_PATH = '//error'
# Logger shared by everything in this module.
log = logging.getLogger('augeas')
def join(*paths):
    """
    Concatenate Augeas tree paths into a single normalized path.

    The first path has to be absolute. One leading slash is dropped from
    every following path so that it is appended to the result instead of
    replacing everything that came before it.

    FIXME: Beware: // is normalized to /
    """
    normalized = [os.path.normpath(part) for part in paths]
    root, rest = normalized[0], normalized[1:]
    # the result can only be absolute if the first component is
    assert root[0] == '/'
    # strip one leading slash so os.path.join does not restart at the root
    pieces = [root] + [
        part[1:] if part.startswith('/') else part
        for part in rest
    ]
    new_path = os.path.join(*pieces)
    log.debug("join: new_path %s", new_path)
    return os.path.normpath(new_path)
class AugeasWrapper:
    """python-augeas higher-level wrapper.

    Load single augeas lens and configuration file.
    Exposes configuration file as AugeasNode object with dict-like interface.

    AugeasWrapper can be used in with statement in the same way as file does:
    leaving the block saves the tree and then closes the library.
    """

    def __init__(self, confpath, lens, root=None, loadpath=None,
                 flags=Augeas.NO_MODL_AUTOLOAD | Augeas.NO_LOAD | Augeas.ENABLE_SPAN):
        """Parse configuration file using given lens.

        Params:
            confpath (str): Absolute path to the configuration file
            lens (str): Name of module containing Augeas lens
            root: passed down to original Augeas
            loadpath: passed down to original Augeas
            flags: passed down to original Augeas

        Raises:
            RuntimeError: error nodes are present in the tree after load()
            ValueError: the configuration file path does not match exactly
                one node in the tree
        """
        log.debug('loadpath: %s', loadpath)
        log.debug('confpath: %s', confpath)
        self._aug = Augeas(root=root, loadpath=loadpath, flags=flags)

        # /augeas/load/{lens}
        aug_load_path = join(AUGEAS_LOAD_PATH, lens)
        # /augeas/load/{lens}/lens = {lens}.lns
        self._aug.set(join(aug_load_path, 'lens'), f'{lens}.lns')
        # /augeas/load/{lens}/incl[0] = {confpath}
        self._aug.set(join(aug_load_path, 'incl[0]'), confpath)
        self._aug.load()

        # report every error node found after loading in a single exception
        errors = self._aug.match(AUGEAS_ERROR_PATH)
        if errors:
            err_msg = '\n'.join(
                f"{e}: {self._aug.get(e)}" for e in errors
            )
            raise RuntimeError(err_msg)

        path = join(AUGEAS_FILES_PATH, confpath)
        paths = self._aug.match(path)
        if len(paths) != 1:
            raise ValueError(f'path {path} did not match exactly once')
        self.tree = AugeasNode(self._aug, path)
        self._loaded = True

    def __enter__(self):
        """Return the wrapper itself for use inside the with block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Save the tree and close the library when the with block ends.

        Exceptions raised inside the with block are not suppressed.
        """
        # close() has to run even when save() raises, otherwise a failed
        # save would leave the Augeas library instance open
        try:
            self.save()
        finally:
            self.close()

    def save(self):
        """Save Augeas tree to its original file.

        Raises:
            IOError: re-raised from Augeas after the error nodes were logged
        """
        assert self._loaded
        try:
            self._aug.save()
        except IOError as exc:
            log.exception(exc)
            # log the message stored under every error node, then re-raise
            for err_path in self._aug.match(AUGEAS_ERROR_PATH):
                log.error('%s: %s', err_path,
                          self._aug.get(os.path.join(err_path, 'message')))
            raise

    def close(self):
        """
        close Augeas library

        After calling close() the object must not be used anymore.
        """
        assert self._loaded
        self._aug.close()
        del self._aug
        self._loaded = False

    def match(self, path):
        """Yield AugeasNodes matching given expression."""
        assert self._loaded
        assert path
        log.debug('tree match %s', path)
        for matched_path in self._aug.match(path):
            yield AugeasNode(self._aug, matched_path)
class AugeasNode(collections.abc.MutableMapping):
    """One Augeas tree node with dict-like interface.

    Keys are path suffixes (str) or positions (int) that are appended to
    this node's path; values are AugeasNode objects for the resulting path.
    """

    def __init__(self, aug, path):
        """
        Args:
            aug (Augeas): Augeas library instance; the methods of this class
                call its match(), get(), set(), span(), copy() and remove()
            path (str): absolute path in Augeas tree matching single node

        BEWARE: There are no sanity checks of given path for performance reasons.
        """
        assert aug
        assert path
        assert path.startswith('/')
        self._aug = aug
        self._path = path
        self._span = None  # cache for the span property

    @property
    def path(self):
        """canonical path in Augeas tree, read-only"""
        return self._path

    @property
    def value(self):
        """
        get value of this node in Augeas tree
        """
        value = self._aug.get(self._path)
        log.debug('tree get: %s = %s', self._path, value)
        return value

    @value.setter
    def value(self, value):
        """
        set value of this node in Augeas tree
        """
        log.debug('tree set: %s = %s', self._path, value)
        self._aug.set(self._path, value)

    @property
    def span(self):
        """human-readable position of this node, computed once and cached"""
        if self._span is None:
            self._span = f"char position {self.char}"
        return self._span

    @property
    def char(self):
        """
        element 5 of the tuple returned by Augeas span() for this node

        NOTE(review): presumably the start offset of the node's span in the
        source file - confirm against python-augeas documentation.
        """
        return self._aug.span(self._path)[5]

    def _key_path(self, key):
        """Return the tree path addressed by key relative to this node."""
        if isinstance(key, int):
            # int is a shortcut to write [int]
            return f'{self._path}[{key}]'
        return self._path + key

    def __len__(self):
        """
        number of items matching this path

        It is always 1 after __init__() but it may change
        as Augeas tree changes.
        """
        return len(self._aug.match(self._path))

    def __getitem__(self, key):
        """
        Return the node addressed by key.

        Raises:
            KeyError: the resulting path does not match exactly one node
        """
        target_path = self._key_path(key)
        log.debug('tree getitem: target_path %s', target_path)
        paths = self._aug.match(target_path)
        if len(paths) != 1:
            raise KeyError(f'path {target_path} did not match exactly once')
        return AugeasNode(self._aug, target_path)

    def __delitem__(self, key):
        """Remove whatever the path addressed by key matches."""
        log.debug('tree delitem: %s + %s', self._path, key)
        # same key handling as __getitem__, so int keys work here as well
        target_path = self._key_path(key)
        log.debug('tree delitem: target_path %s', target_path)
        self._aug.remove(target_path)

    def __setitem__(self, key, value):
        """Copy the subtree of another AugeasNode to the path addressed by key."""
        assert isinstance(value, AugeasNode)
        # same key handling as __getitem__, so int keys work here as well
        target_path = self._key_path(key)
        self._aug.copy(value.path, target_path)

    def __iter__(self):
        """
        Yield one key per node matching this node's path.

        Each key is the part of the matched path that follows this node's
        own path, so it can be handed back to __getitem__(); a match that
        is exactly as long as this node's path yields ''.
        """
        self_path_len = len(self._path)
        assert self_path_len > 0

        log.debug('tree iter: %s', self._path)
        for new_path in self._aug.match(self._path):
            # Matched paths are expected to start with self._path; cutting
            # exactly that many characters leaves only the suffix, e.g.
            # '[2]' for '/a/b[2]' when this node's path is '/a/b'.
            yield new_path[self_path_len:]

    def match(self, subpath):
        """Yield AugeasNodes matching given sub-expression."""
        assert subpath.startswith("/")
        match_path = f"{self._path}{subpath}"
        log.debug('tree match %s: %s', match_path, self._path)
        for matched_path in self._aug.match(match_path):
            yield AugeasNode(self._aug, matched_path)

    def __repr__(self):
        return f'AugeasNode({self._path})'
|