File: cel.py

package info (click to toggle)
asterisk-testsuite 0.0.0%2Bsvn.5781-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd, stretch
  • size: 18,632 kB
  • sloc: xml: 33,912; python: 32,904; ansic: 1,599; sh: 395; makefile: 170; sql: 17
file content (206 lines) | stat: -rw-r--r-- 7,612 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#!/usr/bin/env python
"""Asterisk call detail record testing

This module implements an Asterisk CEL parser.

Copyright (C) 2010, Digium, Inc.
Terry Wilson<twilson@digium.com>

This program is free software, distributed under the terms of
the GNU General Public License Version 2.
"""

import yaml
import unittest
import astcsv
import re
import logging

LOGGER = logging.getLogger(__name__)

class CELSniffer(object):
    """A pluggable module that sniffs AMI CEL events and dumps them into a YAML
    file. Useful during test writing to create a baseline of expected CEL events
    """

    def __init__(self, module_config, test_object):
        """Constructor"""
        test_object.register_ami_observer(self.ami_connect)
        test_object.register_stop_observer(self.stop_handler)
        self.events = []
        if module_config is None:
            self.filters = {}
            self.display = []
        else:
            self.filters = module_config.get('filters') or {}
            self.display = module_config.get('display') or []

    def ami_connect(self, ami):
        """AMI connection handler"""
        ami.registerEvent('CEL', self.cel_handler)

    def cel_handler(self, ami, event):
        """Handle a CEL event"""
        for filter_key, filter_value in self.filters.items():
            if re.match(filter_value, event.get(filter_key.lower())) is None:
                return
        self.events.append(event)
        return (ami, event)

    def stop_handler(self, reason):
        """Write out the file. Currently hard codd to cel_events.yaml"""
        stream = file('cel_events.yaml', 'w')
        if len(self.display) == 0:
            yaml.dump(self.events, stream)
        else:
            items = []
            for ev_item in self.events:
                item = {}
                for key in self.display:
                    key = key.lower()
                    if key not in ev_item:
                        continue
                    item[key] = ev_item[key]
                items.append(item)
            yaml.dump(items, stream)
        stream.close()

        return reason


class CELModule(object):
    """Class that checks a test for expected CEL results"""

    def __init__(self, module_config, test_object):
        """Constructor

        Parameters:
        module_config The yaml loaded configuration for the CEL Module
        test_object A concrete implementation of TestClass
        """
        self.test_object = test_object

        # Build our expected CEL records
        self.cel_records = {}
        for record in module_config:
            file_name = record['file']
            if file_name not in self.cel_records:
                self.cel_records[file_name] = []
            for csv_line in record['lines']:
                # Set the record to the default fields, then update with what
                # was passed in to us
                dict_record = dict((k, None) for k in AsteriskCSVCELLine.fields)
                if csv_line is not None:
                    dict_record.update(csv_line)

                file_records = self.cel_records[file_name]
                file_records.append(AsteriskCSVCELLine(**dict_record))

        # Hook ourselves onto the test object
        test_object.register_stop_observer(self._check_cel_records)

    def _check_cel_records(self, callback_param):
        """A deferred callback method that is called by the TestCase
        derived object when all Asterisk instances have stopped

        Parameters:
        callback_param
        """
        LOGGER.debug("Checking CEL records...")
        self.match_cels()
        return callback_param


    def match_cels(self):
        """Called when all instances of Asterisk have exited.  Derived
        classes can override this to provide their own behavior for CEL
        matching.
        """
        expectations_met = True
        for key in self.cel_records:
            cel_expect = AsteriskCSVCEL(records=self.cel_records[key])
            cel_file = AsteriskCSVCEL(filename="%s/%s/cel-custom/%s.csv" %
                (self.test_object.ast[0].base,
                 self.test_object.ast[0].directories['astlogdir'],
                 key))
            if cel_expect.match(cel_file):
                LOGGER.debug("%s.csv - CEL results met expectations" % key)
            else:
                msg = ("%s.csv - CEL results did not meet expectations. "
                       "Test Failed." % key)
                LOGGER.error(msg)
                expectations_met = False

        self.test_object.set_passed(expectations_met)


class AsteriskCSVCELLine(astcsv.AsteriskCSVLine):
    "A single Asterisk Call Event Log record"

    fields = ['eventtype', 'eventtime', 'cidname', 'cidnum', 'ani', 'rdnis',
    'dnid', 'exten', 'context', 'channel', 'app', 'appdata', 'amaflags',
    'accountcode', 'uniqueid', 'linkedid', 'bridgepeer', 'userfield',
    'userdeftype', 'eventextra']

    def __init__(self, eventtype=None, eventtime=None, cidname=None,
                 cidnum=None, ani=None, rdnis=None, dnid=None, exten=None,
                 context=None, channel=None, app=None, appdata=None,
                 amaflags=None, accountcode=None, uniqueid=None, linkedid=None,
                 bridgepeer=None, userfield=None, userdeftype=None,
                 eventextra=None):
        """Construct an Asterisk CSV CEL.

        The arguments list definition must be in the same order that the
        arguments appear in the CSV file. They can, of course, be passed to
        __init__ in any order. AsteriskCSVCEL will pass the arguments via a
        **dict.
        """

        astcsv.AsteriskCSVLine.__init__(self, AsteriskCSVCELLine.fields,
            eventtype=eventtype, eventtime=eventtime, cidname=cidname,
            cidnum=cidnum, ani=ani, rdnis=rdnis, dnid=dnid, exten=exten,
            context=context, channel=channel, app=app, appdata=appdata,
            amaflags=amaflags, accountcode=accountcode, uniqueid=uniqueid,
            linkedid=linkedid, bridgepeer=bridgepeer, userfield=userfield,
            userdeftype=userdeftype, eventextra=eventextra)

class AsteriskCSVCEL(astcsv.AsteriskCSV):
    """A representation of an Asterisk CSV CEL file"""

    def __init__(self, filename=None, records=None):
        """Initialize CEL records from an Asterisk cel-csv file"""

        astcsv.AsteriskCSV.__init__(self, filename, records,
            AsteriskCSVCELLine.fields, AsteriskCSVCELLine)


class AsteriskCSVCELTests(unittest.TestCase):
    """Unit tests for AsteriskCSVCEL"""

    def test_cel(self):
        """Test CEL using self_test/CELMaster1.csv"""

        cel = AsteriskCSVCEL("self_test/CELMaster1.csv")
        self.assertEqual(len(cel), 16)
        self.assertTrue(AsteriskCSVCELLine(eventtype="LINKEDID_END",
                            channel="TinCan/string").match(cel[-1],
                                silent=True, exact=(True, True)))
        self.assertTrue(cel[-1].match(AsteriskCSVCELLine(
                            eventtype="LINKEDID_END",
                            channel="TinCan/string"),
                                silent=True, exact=(True, True)))

        self.assertFalse(cel[1].match(cel[0], silent=True))
        self.assertFalse(cel[0].match(cel[1], silent=True))
        self.assertEqual(cel[-1].channel, "TinCan/string")

        self.assertTrue(cel.match(cel))
        cel2 = AsteriskCSVCEL("self_test/CELMaster2.csv")
        self.assertFalse(cel.match(cel2))


if __name__ == '__main__':
    logging.basicConfig()
    unittest.main()

# vim:sw=4:ts=4:expandtab:textwidth=79