1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
|
"""
Parse a stream of serialized messages into a forest of
``WrittenAction`` and ``WrittenMessage`` objects.
"""
from pyrsistent import PClass, pmap_field, pset_field, discard
from ._message import WrittenMessage, TASK_UUID_FIELD
from ._action import (
TaskLevel,
WrittenAction,
ACTION_STATUS_FIELD,
STARTED_STATUS,
ACTION_TYPE_FIELD,
)
class Task(PClass):
    """
    All the actions and messages that share a single task UUID, arranged
    as a tree keyed by task level.

    @ivar _nodes: Map from L{TaskLevel} to the L{WrittenAction} or
        L{WrittenMessage} stored at that level.
    @ivar _completed: Set of the L{TaskLevel}s that have been marked as
        complete.
    @ivar _root_level: The L{TaskLevel} (empty level list) under which the
        root node is stored.
    """

    _nodes = pmap_field(TaskLevel, (WrittenAction, WrittenMessage))
    _completed = pset_field(TaskLevel)
    _root_level = TaskLevel(level=[])

    def root(self):
        """
        @return: The node stored at the root level: a L{WrittenAction}, or
            a L{WrittenMessage} when the task consists of a single message
            that is not part of any action.
        """
        return self._nodes[self._root_level]

    def is_complete(self):
        """
        @return bool: True only if the root level has been marked as
            completed, i.e. every message of the task tree has been added.
        """
        return self._root_level in self._completed

    def _insert_action(self, node):
        """
        Store a L{WrittenAction} in the tree, recording it as completed when
        it has both its start and end messages, the expected number of
        children, and no child action that is still incomplete.

        Parent actions will be created as necessary.

        @param node: A L{WrittenAction} to add to the tree.

        @return: Updated L{Task}.
        """
        level = node.task_level
        updated = self
        if node.end_message and node.start_message:
            # The end message's final level index tells us how many children
            # the action must have once nothing is missing.
            if len(node.children) == node.end_message.task_level.level[-1] - 2:
                # Plain messages never block completion; only child actions
                # that are themselves unfinished do.
                sub_actions_done = all(
                    sub.task_level in self._completed
                    for sub in node.children
                    if isinstance(sub, WrittenAction)
                )
                if sub_actions_done:
                    updated = updated.transform(
                        ["_completed"], lambda done: done.add(level)
                    )
        updated = updated.transform(["_nodes", level], node)
        return updated._ensure_node_parents(node)

    def _ensure_node_parents(self, child):
        """
        Make sure the given node is referenced by its parent action, and
        that the parent in turn is stored in the tree.

        Parent actions will be created as necessary.

        @param child: A L{WrittenMessage} or L{WrittenAction} which is
            being added to the tree.

        @return: Updated L{Task}.
        """
        parent_level = child.task_level.parent()
        if parent_level is None:
            # The root has no parent; nothing further to link.
            return self

        parent_action = self._nodes.get(parent_level)
        if parent_action is None:
            # No message for the parent has been seen yet: use a placeholder.
            parent_action = WrittenAction(
                task_level=parent_level, task_uuid=child.task_uuid
            )
        # Re-inserting the parent propagates the change towards the root.
        return self._insert_action(parent_action._add_child(child))

    def add(self, message_dict):
        """
        Update the L{Task} with a dictionary containing a serialized Eliot
        message.

        @param message_dict: Dictionary whose task UUID matches this one.

        @return: Updated L{Task}.
        """
        has_action_type = message_dict.get(ACTION_TYPE_FIELD) is not None
        message = WrittenMessage.from_dict(message_dict)

        if not has_action_type:
            if message.task_level.level == [1]:
                # Special case: a lone message outside of any action becomes
                # the root node and the task is complete straight away.
                return self.transform(
                    ["_nodes", self._root_level],
                    message,
                    ["_completed"],
                    lambda done: done.add(self._root_level),
                )
            return self._ensure_node_parents(message)

        # Start/end messages belong to the action one level above them.
        action_level = message.task_level.parent()
        action = self._nodes.get(action_level)
        if action is None:
            action = WrittenAction(
                task_level=action_level, task_uuid=message_dict[TASK_UUID_FIELD]
            )

        # The action is either brand new or a placeholder created earlier
        # when one of its descendants was added.
        if message_dict[ACTION_STATUS_FIELD] == STARTED_STATUS:
            action = action._start(message)
        else:
            action = action._end(message)
        return self._insert_action(action)
class Parser(PClass):
    """
    Incrementally assemble serialized Eliot messages into L{Task} instances.

    @ivar _tasks: Map from task UUID to the L{Task} currently being built
        for that UUID.
    """

    _tasks = pmap_field(str, Task)

    def add(self, message_dict):
        """
        Update the L{Parser} with a dictionary containing a serialized Eliot
        message.

        @param message_dict: Dictionary of serialized Eliot message.

        @return: Tuple of (list of completed L{Task} instances, updated
            L{Parser}).
        """
        task_uuid = message_dict[TASK_UUID_FIELD]
        known = self._tasks
        current = known[task_uuid] if task_uuid in known else Task()
        current = current.add(message_dict)

        if not current.is_complete():
            # Still waiting for more messages: remember the partial task.
            return [], self.transform(["_tasks", task_uuid], current)

        # A finished task is handed back to the caller and dropped from
        # the parser's state.
        return [current], self.transform(["_tasks", task_uuid], discard)

    def incomplete_tasks(self):
        """
        @return: List of L{Task} that are not yet complete.
        """
        return list(self._tasks.values())

    @classmethod
    def parse_stream(cls, iterable):
        """
        Parse a stream of messages into a stream of L{Task} instances.

        @param iterable: An iterable of serialized Eliot message
            dictionaries.

        @return: An iterable of parsed L{Task} instances. Tasks are yielded
            as soon as they are complete; any L{Task} still incomplete is
            yielded once the input stream is exhausted.
        """
        parser = Parser()
        for message_dict in iterable:
            finished, parser = parser.add(message_dict)
            for task in finished:
                yield task
        for leftover in parser.incomplete_tasks():
            yield leftover
# Names exported as this module's public API.
__all__ = [
    "Parser",
    "Task",
    "TaskLevel",
    "WrittenMessage",
    "WrittenAction",
]
|