File: filter.py

package info (click to toggle)
python-eliot 1.16.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 964 kB
  • sloc: python: 8,641; makefile: 151
file content (124 lines) | stat: -rw-r--r-- 3,372 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""
Command line program for filtering line-based Eliot logs.
"""

if __name__ == "__main__":
    # This guard sits above the module's own definitions, so main() is not
    # yet defined in this namespace when it runs; import the module by name
    # and call its main() instead.
    import eliot.filter

    # main() returns a status code (0 on success, 1 on a usage error).
    # Raising SystemExit makes that the process exit status instead of
    # silently discarding it and always exiting with 0.
    raise SystemExit(eliot.filter.main())

import sys
from datetime import datetime, timedelta
from json import JSONEncoder, dumps, loads


class _DatetimeJSONEncoder(JSONEncoder):
    """
    L{JSONEncoder} subclass that can additionally serialize L{datetime}
    instances.
    """

    def default(self, o):
        """
        Convert an object the base encoder does not know how to serialize.

        @param o: The object being serialized.

        @return: C{o.isoformat()} if C{o} is a L{datetime}; otherwise the
            result of the base class's C{default}.
        """
        if not isinstance(o, datetime):
            # Not ours to handle: defer to the stock implementation.
            return JSONEncoder.default(self, o)
        return o.isoformat()


class EliotFilter(object):
    """
    Apply a Python expression to each line of an Eliot log.

    @ivar code: The filter expression, compiled to a Python code object.

    @ivar incoming: The iterable of serialized messages to process.

    @ivar output: The file-like object that results are written to.
    """

    # Unique sentinel exposed to expressions as C{SKIP}; an expression that
    # evaluates to it produces no output for that message.
    _SKIP = object()

    def __init__(self, expr, incoming, output):
        """
        Compile the expression and remember where to read from and write to.

        @param expr: The Python expression to evaluate against every log
            message.
        @type expr: L{str}

        @param incoming: An iterable whose items are serialized Eliot
            messages, one JSON document per item.

        @param output: Destination for the results.
        @type output: L{file} or a file-like object.
        """
        self.code = compile(expr, "<string>", "eval")
        self.incoming = incoming
        self.output = output

    def run(self):
        """
        Process every incoming message: parse it as JSON, evaluate the
        expression against it and, unless the result is the skip sentinel,
        write the JSON-encoded result followed by a newline to the output.
        """
        for serialized in self.incoming:
            outcome = self._evaluate(loads(serialized))
            if outcome is not self._SKIP:
                encoded = dumps(outcome, cls=_DatetimeJSONEncoder)
                self.output.write(encoded + "\n")

    def _evaluate(self, message):
        """
        Run the compiled expression against a single decoded message.

        The expression sees this module's globals plus the locals C{J} (the
        message), C{datetime}, C{timedelta} and C{SKIP}.

        @param message: A decoded JSON input.

        @return: Whatever the expression evaluates to.
        """
        # A fresh namespace is built for every message.
        namespace = dict(
            J=message,
            timedelta=timedelta,
            datetime=datetime,
            SKIP=self._SKIP,
        )
        # NOTE: eval() executes arbitrary Python; the expression must come
        # from a trusted source.
        return eval(self.code, globals(), namespace)


# Help text that main() writes to stderr when it is not given exactly one
# command-line argument.
USAGE = """\
Usage: cat eliot.log | python -m eliot.filter <expr>

Read JSON-expression per line from stdin, and filter it using a Python
expression <expr>.

The expression will have a local `J` containing decoded JSON. `datetime` and
`timedelta` from Python's `datetime` module are also available as locals,
containing the corresponding classes. `SKIP` is also available, if it's the
expression result that indicates nothing should be output.

The output will be written to stdout using JSON serialization. `datetime`
objects will be serialized to ISO format.

Examples:

- Pass through the messages unchanged:

    $ cat eliot.log | python -m eliot.filter J

- Retrieve a specific field from a specific message type, dropping messages
  of other types:

    $ cat eliot.log | python -m eliot.filter \\
        "J['field'] if J.get('message_type') == 'my:message' else SKIP"
"""


def main(sys=sys):
    """
    Command-line entry point.

    Expects exactly one argument, the filter expression, in C{sys.argv[1]}.
    Messages are read from C{sys.stdin} and results are written to
    C{sys.stdout}. With any other argument count, L{USAGE} is written to
    C{sys.stderr} and nothing is filtered.

    @param sys: An object providing C{argv}, C{stdin}, C{stdout} and
        C{stderr}; defaults to the L{sys} module.

    @return: C{0} after filtering, C{1} after printing the usage text.
    """
    arguments = sys.argv
    if len(arguments) == 2:
        EliotFilter(arguments[1], sys.stdin, sys.stdout).run()
        return 0
    sys.stderr.write(USAGE)
    return 1