1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
|
#!/usr/bin/env python
"""Asterisk Configuration File Handling.
This module implements interfaces for dealing with Asterisk configuration
files.
Copyright (C) 2010, Digium, Inc.
Russell Bryant <russell@digium.com>
This program is free software, distributed under the terms of
the GNU General Public License Version 2.
"""
#
# TODO
# - Implement config section template handling
#
import sys
import re
import unittest
import logging
# Module-level logger, named after this module via __name__; the parsing
# classes below report invalid lines and file errors through it.
LOGGER = logging.getLogger(__name__)
def is_blank_line(line):
    """Return True when a line carries no configuration content.

    A line counts as blank when it is empty, contains only whitespace, or
    contains optional leading whitespace followed by a ';' comment that runs
    to the end of the line.

    Keyword Arguments:
    line -- the text of a single line

    Returns:
    True if the line is blank or comment-only, False otherwise.
    """
    # Raw string: "\s" must reach the regex engine untouched instead of being
    # interpreted as an (invalid) string-literal escape sequence.
    return re.match(r"\s*(?:;.*)?$", line) is not None
class Category(object):
    """A category in an Asterisk configuration file.

    This is a helper class used along with ConfigFile. A category is a section
    of an Asterisk configuration that will contain zero or more key/value
    pairs of options.

    Attributes set by the constructor:
    name     -- the category name, as given by the caller
    template -- the template flag, as given by the caller
    options  -- list of (name, value) tuples, in the order they were parsed
    """

    # Pattern for a "name = value" / "name => value" option line. Compiled
    # once for the class rather than once per instance; still reachable as
    # self.varval_re.
    varval_re = re.compile(r"""
        \s*                                  # Leading whitespace
        (?P<name>[\w|,\.-]+)                 # Option name
        \s*=>?\s*                            # Separator, = or =>
        (?P<value>[\w\s=_()/@|,'"\.<>:-]*)  # Option value (can be zero-length)
        (?:;.*)?$                            # Optional comment before end of line
        """, re.VERBOSE)

    def __init__(self, name, template=False):
        """Create an empty category.

        Keyword Arguments:
        name     -- the name of the category
        template -- True if the category is marked as a template
        """
        self.options = []
        self.name = name
        self.template = template

    def parse_line(self, line):
        """Parse a line in a category.

        A line matching the option pattern is appended to self.options as a
        (name, value) tuple with surrounding whitespace removed from the
        value. Blank and comment-only lines are ignored silently; any other
        line is reported through the module logger and then ignored.

        Keyword Arguments:
        line -- a single line of text belonging to this category
        """
        match = self.varval_re.match(line)
        if match is None:
            if not is_blank_line(line):
                # Logger.warn() is a deprecated alias; use warning() and let
                # the logging module do the string interpolation.
                LOGGER.warning("Invalid line: '%s'", line.strip())
            return
        self.options.append(
            (match.group("name"), match.group("value").strip()))
class ConfigFile(object):
    """An Asterisk Configuration File.

    Parse an Asterisk configuration file into a list of Category objects,
    stored in file order in the ``categories`` attribute.
    """

    # Pattern for a "[name]" or "[name](!)" category header line. Compiled
    # once for the class; still reachable as self.category_re.
    category_re = re.compile(r"""
        \s*                         # Leading Whitespace
        \[(?P<name>[\w,\.-]+)\]     # Category name in square brackets
        (?:\((?P<template>[!])\))?  # Optionally marked as a template
        \s*(?:;.*)?$                # trailing whitespace or a comment
        """, re.VERBOSE)

    # Pattern for a ";-- ... --;" comment, which may span several lines.
    _mline_comment_re = re.compile(r";--.*?--;", re.DOTALL)

    def __init__(self, filename, config_str=None):
        """Construct an Asterisk configuration file object

        The ConfigFile object will parse an Asterisk configuration file into a
        python data structure.

        Keyword Arguments:
        filename   -- path of the configuration file to read; only used when
                      config_str is None
        config_str -- configuration text to parse instead of reading filename

        If the file cannot be read, the error is logged and the object is
        left with an empty categories list; no exception is raised.
        """
        self.categories = []

        if config_str is None:
            try:
                with open(filename, "r") as config_file:
                    config_str = config_file.read()
            except IOError:
                LOGGER.error("Failed to open config file '%s'", filename)
                return
            except Exception:
                # Deliberately broad so any read failure is logged rather
                # than raised, but not a bare "except:", which would also
                # swallow KeyboardInterrupt and SystemExit.
                LOGGER.error("Unexpected error: %s", sys.exc_info()[0])
                return

        # Multi-line comments must go first: they can hide whole lines that
        # would otherwise look like categories or options.
        config_str = self.strip_mline_comments(config_str)

        for line in config_str.split("\n"):
            self.parse_line(line)

    def strip_mline_comments(self, text):
        """Strip multi-line comments

        Keyword Arguments:
        text -- the configuration text

        Returns:
        The text with every ';-- ... --;' span removed.
        """
        return self._mline_comment_re.sub("", text)

    def parse_line(self, line):
        """Parse a line in the config file

        A category header starts a new Category. Any other line is handed to
        the most recently started category. Before the first category, only
        blank or comment-only lines are accepted; anything else is logged as
        invalid and ignored.

        Keyword Arguments:
        line -- a single line of configuration text
        """
        match = self.category_re.match(line)
        if match is not None:
            self.categories.append(
                Category(match.group("name"),
                         template=match.group("template") == "!"))
        elif not self.categories:
            if not is_blank_line(line):
                LOGGER.warning("Invalid line: '%s'", line.strip())
        else:
            self.categories[-1].parse_line(line)
class ConfigFileTests(unittest.TestCase):
    """Unit tests for ConfigFile"""

    def test_conf(self):
        """Test parsing a blob of config data

        The blob mixes valid categories and options with invalid lines,
        single-line comments and multi-line comments, and checks that exactly
        the valid content ends up in the parsed categories.
        """
        test = \
            "; stuff\n" \
            "this line is invalid on purpose\n" \
            "[this is] also invalid]\n" \
            ";-- comment --;\n" \
            ";-- \n" \
            "[this is commented out]\n" \
            " --;\n" \
            "[foo]\n" \
            "a = b\n" \
            "  b =   a  \n" \
            "this line is invalid on purpose\n" \
            ";moo\n" \
            "c = d;asdadf;adfads;adsfasdf\n" \
            "  [bar]   ;asdfasdf\n" \
            "a-b=c-d\n" \
            "xyz=x|y|z\n" \
            "1234 => 4242,Example Mailbox,root@localhost,,var=val\n" \
            "\n" \
            "[template](!)\n" \
            "foo=bar\n" \
            "exten => _NXX.,n,Wait(1)\n" \
            "astetcdir => /etc/asterisk\n"

        # The constructor's parameter is named "filename"; passing "fn"
        # raises TypeError before anything is parsed.
        conf = ConfigFile(filename=None, config_str=test)

        self.assertEqual(len(conf.categories), 3)

        foo, bar, template = conf.categories

        self.assertEqual(foo.name, "foo")
        self.assertFalse(foo.template)
        self.assertEqual(foo.options, [
            ("a", "b"),
            ("b", "a"),
            ("c", "d"),
        ])

        self.assertEqual(bar.name, "bar")
        self.assertFalse(bar.template)
        self.assertEqual(bar.options, [
            ("a-b", "c-d"),
            ("xyz", "x|y|z"),
            ("1234", "4242,Example Mailbox,root@localhost,,var=val"),
        ])

        self.assertEqual(template.name, "template")
        self.assertTrue(template.template)
        self.assertEqual(template.options, [
            ("foo", "bar"),
            ("exten", "_NXX.,n,Wait(1)"),
            ("astetcdir", "/etc/asterisk"),
        ])
def main(argv=None):
    """Log the contents of a config file, or run the unit tests.

    Keyword Arguments:
    argv -- argument list including the program name; sys.argv is used when
            this is None

    When the list holds exactly two entries, the second is parsed as a config
    file and every category and option is written to the module logger at
    debug level, after which 0 is returned. For any other argument count the
    result of unittest.main() is returned.
    """
    args = sys.argv if argv is None else argv

    # Anything other than "<program> <config file>" goes to the unit tests.
    if len(args) != 2:
        return unittest.main()

    parsed = ConfigFile(args[1])
    for category in parsed.categories:
        LOGGER.debug("[%s]" % category.name)
        for name, value in category.options:
            LOGGER.debug("%s = %s" % (name, value))

    return 0
if __name__ == "__main__":
    # Run as a script: use main()'s result as the process exit status,
    # substituting 0 when it returns None or any other falsy value.
    status = main()
    sys.exit(status if status else 0)
|