1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Install procedure
:author: Michael Mulich
:copyright: 2010 by Penn State University
:organization: WebLion Group, Penn State University
:license: GPL, see LICENSE for more detail
"""
import ConfigParser
import os
import sys
import tempfile
import urllib2
from urlparse import urlparse, urljoin
# Refuse to load on interpreters older than Python 2.7.
assert not sys.version_info < (2, 7), "Python >= 2.7 is required"

# Version string of this script.
__version__ = '0.1.0'
class VersionCfg(object):
    """A versions.cfg configuration object.

    Wraps one parsed configuration file and, when its ``[buildout]``
    section has an ``extends`` option, the configuration it extends.
    Lookups fall through from this configuration to the extended one(s).

    Attributes:
        file_path: Path of the local file that was parsed.
        url: URL the file was downloaded from, or None for a local file.
        comes_from: The VersionCfg that extends this one, or None.
        cfg: The parsed ConfigParser.RawConfigParser instance.
        extends: The VersionCfg this one extends, or None.
    """

    def __init__(self, file_path, comes_from=None, url=None):
        """Parse ``file_path`` and load the configuration it extends.

        :param file_path: path of the local configuration file to read.
        :param comes_from: the VersionCfg that extends this one, or None.
        :param url: URL ``file_path`` was downloaded from, if any; it is
            used as the base for relative ``extends`` values.
        :raises RuntimeError: if ``comes_from`` is neither None nor a
            VersionCfg.
        """
        # Validate first so that bad arguments fail before any file I/O.
        if comes_from is not None and not isinstance(comes_from, VersionCfg):
            raise RuntimeError(
                "Expected None or a %s type object, received %s."
                % (self.__class__.__name__, type(comes_from)))
        self.file_path = file_path
        self.url = url
        self.comes_from = comes_from
        self.extends = None
        self.cfg = self.__class__.read_cfg(self.file_path)
        if self.is_extended:
            self.__init_extend()

    def __init_extend(self):
        """Initialize the configuration file this configuration extends.

        The whole ``extends`` value is treated as a single location. A
        value without a scheme is resolved relative to this
        configuration: against ``self.url`` when it was downloaded,
        otherwise against the directory of ``self.file_path``.
        """
        extends_value = self.cfg.get('buildout', 'extends').strip()
        url_parts = urlparse(extends_value)
        kls = self.__class__
        if url_parts.scheme == 'file':
            self.extends = kls(url_parts.path, self)
        elif url_parts.scheme:
            self.extends = kls.from_url(extends_value, self)
        elif self.url:
            self.extends = kls.from_url(urljoin(self.url, extends_value),
                                        self)
        else:
            # os.path.join leaves an absolute extends value untouched.
            base_dir = os.path.dirname(self.file_path)
            self.extends = kls(os.path.join(base_dir, extends_value), self)

    @classmethod
    def from_url(cls, url, comes_from=None):
        """Instantiate the class from a config at some URL.

        The content is downloaded into a temporary file, which is kept
        on disk because the instance's ``file_path`` refers to it.

        :param url: location of the configuration file.
        :param comes_from: the VersionCfg that extends the new one.
        :raises RuntimeError: if the server answers with an HTTP error.
        """
        try:
            response = urllib2.urlopen(url)
        except urllib2.HTTPError:
            raise RuntimeError(
                "The versions.cfg file could not be downloaded. "
                "Please check that you can get to %s." % url)
        try:
            content = response.read()
        finally:
            response.close()
        # Wrap the descriptor returned by mkstemp instead of reopening the
        # path, so the descriptor is closed rather than leaked.
        fd, file_path = tempfile.mkstemp()
        with os.fdopen(fd, 'wb') as f:
            f.write(content)
        return cls(file_path, comes_from=comes_from, url=url)

    @staticmethod
    def read_cfg(file_path):
        """Read in the configuration file and return a
        ConfigParser.RawConfigParser instance."""
        cfg = ConfigParser.RawConfigParser()
        with open(file_path, 'r') as f:
            cfg.readfp(f)
        return cfg

    @property
    def is_extended(self):
        """Has this versions.cfg extended another configuration file?

        True only when ``[buildout] extends`` exists and is not blank.
        """
        if not self.cfg.has_option('buildout', 'extends'):
            return False
        return bool(self.cfg.get('buildout', 'extends').strip())

    def has(self, key):
        """Check for the key somewhere in the version hierarchy."""
        if self.cfg.has_option('versions', key):
            return True
        return self.extends is not None and self.extends.has(key)

    def get(self, key):
        """Get the value of key from somewhere in the version hierarchy.

        :returns: a ``(value, version_cfg)`` tuple, where ``version_cfg``
            is the VersionCfg whose ``[versions]`` section supplied it.
        :raises ConfigParser.NoOptionError: (or NoSectionError) when no
            configuration in the hierarchy defines ``key``.
        """
        try:
            return (self.cfg.get('versions', key), self)
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            # A configuration without the key, or without a [versions]
            # section at all, defers to the configuration it extends.
            if self.extends is None:
                raise
            return self.extends.get(key)

    def items(self):
        """Return ``(name, (value, version_cfg))`` pairs for every option
        pinned anywhere in the hierarchy, sorted by name."""
        # Build a set of option keys
        options = set()
        version_obj = self
        while version_obj is not None:
            if version_obj.cfg.has_section('versions'):
                options.update(version_obj.cfg.options('versions'))
            version_obj = version_obj.extends
        # Attach the values to the options
        return [(name, self.get(name)) for name in sorted(options)]

    def merge(self, io_obj):
        """Merge from this configuration downward (if it extends others).
        Output to an IO object.

        Sources are written in hierarchy order, this configuration
        first, and the pins of each source are sorted by name.

        :param io_obj: any object with a ``write(str)`` method.
        """
        # Group by where name value pair come from
        grouping = {}
        for name, (value, comes_from) in self.items():
            grouping.setdefault(comes_from, {})[name] = value

        # Walk the hierarchy so the output order does not depend on
        # dictionary ordering; skip sources that contribute no pins.
        sources = []
        version_obj = self
        while version_obj is not None:
            if version_obj in grouping:
                sources.append(version_obj)
            version_obj = version_obj.extends

        # Write out the merge
        io_obj.write("# Merging %s version configuration files:\n"
                     % len(sources))
        for source in sources:
            io_obj.write("# - %s\n" % (source.url or source.file_path))
        io_obj.write("[versions]\n")
        for source in sources:
            io_obj.write("# Pinnings came from %s\n"
                         % (source.url or source.file_path))
            values = grouping[source]
            for name in sorted(values):
                io_obj.write("%s = %s\n" % (name, values[name]))
def main():
    """Merge the version configuration named on the command line.

    Expects one argument: a URL or a local path of a versions.cfg file.
    The merged ``[versions]`` section is written to standard output.
    Without an argument, a usage message is written to standard error
    and the process exits with status 1.
    """
    # Work on a copy so that sys.argv itself is left unmodified.
    args = sys.argv[1:]
    if not args:
        sys.stderr.write("usage: %s URL\n" % sys.argv[0])
        sys.exit(1)
    url = args[0]
    url_parts = urlparse(url)
    if not url_parts.scheme:
        # urlparse reports a missing scheme as '' (never None); a value
        # without a scheme is a local path and is used as given.
        versions_cfg = VersionCfg(url)
    elif url_parts.scheme == 'file':
        versions_cfg = VersionCfg(url_parts.path)
    else:
        versions_cfg = VersionCfg.from_url(url)
    versions_cfg.merge(sys.stdout)
# Script entry point: call main() only when this file is executed directly.
if __name__ == '__main__':
    main()
|