File: configuration.py

package info (click to toggle)
python-smoke-zephyr 2.0.1-3
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 264 kB
  • sloc: python: 1,294; makefile: 4
file content (241 lines) | stat: -rw-r--r-- 8,472 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  smoke_zephyr/configuration.py
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are
#  met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following disclaimer
#    in the documentation and/or other materials provided with the
#    distribution.
#  * Neither the name of the project nor the names of its
#    contributors may be used to endorse or promote products derived from
#    this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
#  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
#  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
#  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
#  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
#  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
#  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
#  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
#  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
#  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
#  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#

import copy
import json
import os
import sys
from collections.abc import Mapping as _Mapping

try:
	import yaml
except ImportError:
	has_yaml = False
	"""Whether the :py:mod:`yaml` module is available or not."""
else:
	has_yaml = True
	try:
		from yaml import CLoader as Loader, CDumper as Dumper
	except ImportError:
		from yaml import Loader, Dumper

SERIALIZER_DRIVERS = {}
"""The serializer drivers that are available."""
SERIALIZER_DRIVERS['json'] = {'load': json.load, 'dumps': lambda obj: json.dumps(obj, sort_keys=True, indent=4)}
SERIALIZER_DRIVERS['jsn'] = {'load': json.load, 'dumps': lambda obj: json.dumps(obj, sort_keys=True, indent=4)}
if has_yaml:
	SERIALIZER_DRIVERS['yaml'] = {'load': lambda file_obj: yaml.load(file_obj, Loader=Loader), 'dumps': lambda obj: yaml.dumps(obj, default_flow_style=False, Dumper=Dumper)}
	SERIALIZER_DRIVERS['yml'] = {'load': lambda file_obj: yaml.load(file_obj, Loader=Loader), 'dumps': lambda obj: yaml.dumps(obj, default_flow_style=False, Dumper=Dumper)}

class MemoryConfiguration(object):
	"""
	This class provides an interface for retrieving values from deeply nested
	objects supporting Python's __getitem__ interface.
	"""
	seperator = '.'
	def __init__(self, mem_object, prefix=''):
		"""
		:param smem_object: The memory object to parse.
		:param str prefix: String to be prefixed to all option names.
		:param str object_type: String to identify how to parse the mem_object.
		"""
		self.prefix = prefix
		if not isinstance(mem_object, (dict, _Mapping)):
			raise TypeError("mem_object does not inherit from dict or {0}.Mapping".format(_Mapping.__module__))
		self._storage = mem_object

	def get(self, item_name):
		"""
		Retrieve the value of an option.

		:param str item_name: The name of the option to retrieve.
		:return: The value of *item_name* in the configuration.
		"""
		if self.prefix:
			item_name = self.prefix + self.seperator + item_name
		item_names = item_name.split(self.seperator)
		node = self._storage
		for item_name in item_names:
			node = node[item_name]
		return node

	def get_if_exists(self, item_name, default_value=None):
		"""
		Retrieve the value of an option if it exists, otherwise
		return *default_value* instead of raising an error:

		:param str item_name: The name of the option to retrieve.
		:param default_value: The value to return if *item_name* does not exist.
		:return: The value of *item_name* in the configuration.
		"""
		if self.has_option(item_name):
			return self.get(item_name)
		return default_value

	def get_storage(self):
		"""
		Get a copy of the internal configuration. Changes made to the returned
		copy will not affect this object.

		:return: A copy of the internal storage object.
		:rtype: dict
		"""
		return copy.deepcopy(self._storage)

	def has_option(self, option_name):
		"""
		Check that an option exists.

		:param str option_name: The name of the option to check.
		:return: True of the option exists in the configuration.
		:rtype: bool
		"""
		if self.prefix:
			option_name = self.prefix + self.seperator + option_name
		item_names = option_name.split(self.seperator)
		node = self._storage
		for item_name in item_names:
			if node is None:
				return False
			if not item_name in node:
				return False
			node = node[item_name]
		return True

	def has_section(self, section_name):
		"""
		Checks that an option exists and that it contains sub options.

		:param str section_name: The name of the section to check.
		:return: True if the section exists.
		:rtype: dict
		"""
		if not self.has_option(section_name):
			return False
		return isinstance(self.get(section_name), dict)

	def set(self, item_name, item_value):
		"""
		Sets the value of an option in the configuration.

		:param str item_name: The name of the option to set.
		:param item_value: The value of the option to set.
		"""
		if self.prefix:
			item_name = self.prefix + self.seperator + item_name
		item_names = item_name.split(self.seperator)
		item_last = item_names.pop()
		node = self._storage
		for item_name in item_names:
			if not item_name in node:
				node[item_name] = {}
			node = node[item_name]
		node[item_last] = item_value
		return

class Configuration(MemoryConfiguration):
	"""
	This class provides a generic object for parsing configuration files
	in multiple formats.
	"""
	def __init__(self, configuration_file, prefix=''):
		"""
		:param str configuration_file: The configuration file to parse.
		:param str prefix: String to be prefixed to all option names.
		"""
		self.configuration_file = configuration_file
		with open(self.configuration_file, 'r') as file_h:
			mem_object = self._serializer('load', file_h)
		super(Configuration, self).__init__(mem_object, prefix)

	@property
	def configuration_file_ext(self):
		"""
		The extension of the current configuration file.
		"""
		return os.path.splitext(self.configuration_file)[1][1:]

	def _serializer(self, operation, *args):
		if not self.configuration_file_ext in SERIALIZER_DRIVERS:
			raise ValueError('unknown file type \'' + self.configuration_file_ext + '\'')
		function = SERIALIZER_DRIVERS[self.configuration_file_ext][operation]
		return function(*args)

	def get_missing(self, verify_file):
		"""
		Use a verification configuration which has a list of required options
		and their respective types. This information is used to identify missing
		and incompatible options in the loaded configuration.

		:param str verify_file: The file to load for verification data.
		:return: A dictionary of missing and incompatible settings.
		:rtype: dict
		"""
		vconf = Configuration(verify_file)
		missing = {}
		for setting, setting_type in vconf.get('settings').items():
			if not self.has_option(setting):
				missing['missing'] = missing.get('settings', [])
				missing['missing'].append(setting)
			elif not type(self.get(setting)).__name__ == setting_type:
				missing['incompatible'] = missing.get('incompatible', [])
				missing['incompatible'].append((setting, setting_type))
		return missing

	def save(self):
		"""
		Save the current configuration to disk.
		"""
		with open(self.configuration_file, 'w') as file_h:
			file_h.write(self._serializer('dumps', self._storage))

def main():
	import argparse

	parser = argparse.ArgumentParser(description='Parse a configuration file', conflict_handler='resolve')
	parser.add_argument('config_file', action='store', help='configuration file to parse')
	parser.add_argument('option', action='store', help='option to retreive the value from')
	arguments = parser.parse_args()

	config = Configuration(arguments.config_file)
	if not config.has_option(arguments.option):
		return 1
	option_value = config.get(arguments.option)
	if isinstance(option_value, list):
		for value in option_value:
			print(value) # pylint: disable=C0325
		return 0
	print(option_value) # pylint: disable=C0325
	return 0

if __name__ == '__main__':
	sys.exit(main())