1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# smoke_zephyr/configuration.py
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of the project nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import copy
import json
import os
import sys
from collections.abc import Mapping as _Mapping
try:
import yaml
except ImportError:
has_yaml = False
"""Whether the :py:mod:`yaml` module is available or not."""
else:
has_yaml = True
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
SERIALIZER_DRIVERS = {}
"""The serializer drivers that are available."""
SERIALIZER_DRIVERS['json'] = {'load': json.load, 'dumps': lambda obj: json.dumps(obj, sort_keys=True, indent=4)}
SERIALIZER_DRIVERS['jsn'] = {'load': json.load, 'dumps': lambda obj: json.dumps(obj, sort_keys=True, indent=4)}
if has_yaml:
SERIALIZER_DRIVERS['yaml'] = {'load': lambda file_obj: yaml.load(file_obj, Loader=Loader), 'dumps': lambda obj: yaml.dumps(obj, default_flow_style=False, Dumper=Dumper)}
SERIALIZER_DRIVERS['yml'] = {'load': lambda file_obj: yaml.load(file_obj, Loader=Loader), 'dumps': lambda obj: yaml.dumps(obj, default_flow_style=False, Dumper=Dumper)}
class MemoryConfiguration(object):
"""
This class provides an interface for retrieving values from deeply nested
objects supporting Python's __getitem__ interface.
"""
seperator = '.'
def __init__(self, mem_object, prefix=''):
"""
:param smem_object: The memory object to parse.
:param str prefix: String to be prefixed to all option names.
:param str object_type: String to identify how to parse the mem_object.
"""
self.prefix = prefix
if not isinstance(mem_object, (dict, _Mapping)):
raise TypeError("mem_object does not inherit from dict or {0}.Mapping".format(_Mapping.__module__))
self._storage = mem_object
def get(self, item_name):
"""
Retrieve the value of an option.
:param str item_name: The name of the option to retrieve.
:return: The value of *item_name* in the configuration.
"""
if self.prefix:
item_name = self.prefix + self.seperator + item_name
item_names = item_name.split(self.seperator)
node = self._storage
for item_name in item_names:
node = node[item_name]
return node
def get_if_exists(self, item_name, default_value=None):
"""
Retrieve the value of an option if it exists, otherwise
return *default_value* instead of raising an error:
:param str item_name: The name of the option to retrieve.
:param default_value: The value to return if *item_name* does not exist.
:return: The value of *item_name* in the configuration.
"""
if self.has_option(item_name):
return self.get(item_name)
return default_value
def get_storage(self):
"""
Get a copy of the internal configuration. Changes made to the returned
copy will not affect this object.
:return: A copy of the internal storage object.
:rtype: dict
"""
return copy.deepcopy(self._storage)
def has_option(self, option_name):
"""
Check that an option exists.
:param str option_name: The name of the option to check.
:return: True of the option exists in the configuration.
:rtype: bool
"""
if self.prefix:
option_name = self.prefix + self.seperator + option_name
item_names = option_name.split(self.seperator)
node = self._storage
for item_name in item_names:
if node is None:
return False
if not item_name in node:
return False
node = node[item_name]
return True
def has_section(self, section_name):
"""
Checks that an option exists and that it contains sub options.
:param str section_name: The name of the section to check.
:return: True if the section exists.
:rtype: dict
"""
if not self.has_option(section_name):
return False
return isinstance(self.get(section_name), dict)
def set(self, item_name, item_value):
"""
Sets the value of an option in the configuration.
:param str item_name: The name of the option to set.
:param item_value: The value of the option to set.
"""
if self.prefix:
item_name = self.prefix + self.seperator + item_name
item_names = item_name.split(self.seperator)
item_last = item_names.pop()
node = self._storage
for item_name in item_names:
if not item_name in node:
node[item_name] = {}
node = node[item_name]
node[item_last] = item_value
return
class Configuration(MemoryConfiguration):
"""
This class provides a generic object for parsing configuration files
in multiple formats.
"""
def __init__(self, configuration_file, prefix=''):
"""
:param str configuration_file: The configuration file to parse.
:param str prefix: String to be prefixed to all option names.
"""
self.configuration_file = configuration_file
with open(self.configuration_file, 'r') as file_h:
mem_object = self._serializer('load', file_h)
super(Configuration, self).__init__(mem_object, prefix)
@property
def configuration_file_ext(self):
"""
The extension of the current configuration file.
"""
return os.path.splitext(self.configuration_file)[1][1:]
def _serializer(self, operation, *args):
if not self.configuration_file_ext in SERIALIZER_DRIVERS:
raise ValueError('unknown file type \'' + self.configuration_file_ext + '\'')
function = SERIALIZER_DRIVERS[self.configuration_file_ext][operation]
return function(*args)
def get_missing(self, verify_file):
"""
Use a verification configuration which has a list of required options
and their respective types. This information is used to identify missing
and incompatible options in the loaded configuration.
:param str verify_file: The file to load for verification data.
:return: A dictionary of missing and incompatible settings.
:rtype: dict
"""
vconf = Configuration(verify_file)
missing = {}
for setting, setting_type in vconf.get('settings').items():
if not self.has_option(setting):
missing['missing'] = missing.get('settings', [])
missing['missing'].append(setting)
elif not type(self.get(setting)).__name__ == setting_type:
missing['incompatible'] = missing.get('incompatible', [])
missing['incompatible'].append((setting, setting_type))
return missing
def save(self):
"""
Save the current configuration to disk.
"""
with open(self.configuration_file, 'w') as file_h:
file_h.write(self._serializer('dumps', self._storage))
def main():
import argparse
parser = argparse.ArgumentParser(description='Parse a configuration file', conflict_handler='resolve')
parser.add_argument('config_file', action='store', help='configuration file to parse')
parser.add_argument('option', action='store', help='option to retreive the value from')
arguments = parser.parse_args()
config = Configuration(arguments.config_file)
if not config.has_option(arguments.option):
return 1
option_value = config.get(arguments.option)
if isinstance(option_value, list):
for value in option_value:
print(value) # pylint: disable=C0325
return 0
print(option_value) # pylint: disable=C0325
return 0
if __name__ == '__main__':
sys.exit(main())
|