File: parse.py

package info (click to toggle)
python-eliot 1.16.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 964 kB
  • sloc: python: 8,641; makefile: 151
file content (191 lines) | stat: -rw-r--r-- 6,085 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
Parse a stream of serialized messages into a forest of
``WrittenAction`` and ``WrittenMessage`` objects.
"""

from pyrsistent import PClass, pmap_field, pset_field, discard

from ._message import WrittenMessage, TASK_UUID_FIELD
from ._action import (
    TaskLevel,
    WrittenAction,
    ACTION_STATUS_FIELD,
    STARTED_STATUS,
    ACTION_TYPE_FIELD,
)


class Task(PClass):
    """
    A tree of actions with the same task UUID.
    """

    _nodes = pmap_field(TaskLevel, (WrittenAction, WrittenMessage))
    _completed = pset_field(TaskLevel)
    _root_level = TaskLevel(level=[])

    def root(self):
        """
        @return: The root L{WrittenAction}.
        """
        return self._nodes[self._root_level]

    def is_complete(self):
        """
        @return bool: True only if all messages in the task tree have been
        added to it.
        """
        return self._root_level in self._completed

    def _insert_action(self, node):
        """
        Add a L{WrittenAction} to the tree.

        Parent actions will be created as necessary.

        @param child: A L{WrittenAction} to add to the tree.

        @return: Updated L{Task}.
        """
        task = self
        if (
            node.end_message
            and node.start_message
            and (len(node.children) == node.end_message.task_level.level[-1] - 2)
        ):
            # Possibly this action is complete, make sure all sub-actions
            # are complete:
            completed = True
            for child in node.children:
                if (
                    isinstance(child, WrittenAction)
                    and child.task_level not in self._completed
                ):
                    completed = False
                    break
            if completed:
                task = task.transform(["_completed"], lambda s: s.add(node.task_level))
        task = task.transform(["_nodes", node.task_level], node)
        return task._ensure_node_parents(node)

    def _ensure_node_parents(self, child):
        """
        Ensure the node (WrittenAction/WrittenMessage) is referenced by parent
        nodes.

        Parent actions will be created as necessary.

        @param child: A L{WrittenMessage} or L{WrittenAction} which is
            being added to the tree.

        @return: Updated L{Task}.
        """
        task_level = child.task_level
        if task_level.parent() is None:
            return self

        parent = self._nodes.get(task_level.parent())
        if parent is None:
            parent = WrittenAction(
                task_level=task_level.parent(), task_uuid=child.task_uuid
            )
        parent = parent._add_child(child)
        return self._insert_action(parent)

    def add(self, message_dict):
        """
        Update the L{Task} with a dictionary containing a serialized Eliot
        message.

        @param message_dict: Dictionary whose task UUID matches this one.

        @return: Updated L{Task}.
        """
        is_action = message_dict.get(ACTION_TYPE_FIELD) is not None
        written_message = WrittenMessage.from_dict(message_dict)
        if is_action:
            action_level = written_message.task_level.parent()
            action = self._nodes.get(action_level)
            if action is None:
                action = WrittenAction(
                    task_level=action_level, task_uuid=message_dict[TASK_UUID_FIELD]
                )
            if message_dict[ACTION_STATUS_FIELD] == STARTED_STATUS:
                # Either newly created MissingAction, or one created by
                # previously added descendant of the action.
                action = action._start(written_message)
            else:
                action = action._end(written_message)
            return self._insert_action(action)
        else:
            # Special case where there is no action:
            if written_message.task_level.level == [1]:
                return self.transform(
                    ["_nodes", self._root_level],
                    written_message,
                    ["_completed"],
                    lambda s: s.add(self._root_level),
                )
            else:
                return self._ensure_node_parents(written_message)


class Parser(PClass):
    """
    Parse serialized Eliot messages into L{Task} instances.

    @ivar _tasks: Map from UUID to corresponding L{Task}.
    """

    _tasks = pmap_field(str, Task)

    def add(self, message_dict):
        """
        Update the L{Parser} with a dictionary containing a serialized Eliot
        message.

        @param message_dict: Dictionary of serialized Eliot message.

        @return: Tuple of (list of completed L{Task} instances, updated
            L{Parser}).
        """
        uuid = message_dict[TASK_UUID_FIELD]
        if uuid in self._tasks:
            task = self._tasks[uuid]
        else:
            task = Task()
        task = task.add(message_dict)
        if task.is_complete():
            parser = self.transform(["_tasks", uuid], discard)
            return [task], parser
        else:
            parser = self.transform(["_tasks", uuid], task)
            return [], parser

    def incomplete_tasks(self):
        """
        @return: List of L{Task} that are not yet complete.
        """
        return list(self._tasks.values())

    @classmethod
    def parse_stream(cls, iterable):
        """
        Parse a stream of messages into a stream of L{Task} instances.

        :param iterable: An iterable of serialized Eliot message dictionaries.

        :return: An iterable of parsed L{Task} instances. Remaining
            incomplete L{Task} will be returned when the input stream is
            exhausted.
        """
        parser = Parser()
        for message_dict in iterable:
            completed, parser = parser.add(message_dict)
            for task in completed:
                yield task
        for task in parser.incomplete_tasks():
            yield task


__all__ = ["Parser", "Task", "TaskLevel", "WrittenMessage", "WrittenAction"]