File: queue.py

package info (click to toggle)
python-pyforge 1.4.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 464 kB
  • sloc: python: 3,666; makefile: 12; sh: 7
file content (120 lines) | stat: -rw-r--r-- 3,997 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import sys
import traceback
import logging
from contextlib import contextmanager

from .function_call import FunctionCall
from .setattr import Setattr
from .exceptions import UnexpectedCall, UnexpectedSetattr, ExpectedEventsNotFound
from .queued_node import QueuedNode
from .queued_group import OrderedGroup, AnyOrderGroup, InterleavedGroup

# Module-level logger; everything this module logs goes to the "pyforge" logger.
_logger = logging.getLogger("pyforge")


class WheneverDecorator(QueuedNode):
    """Queue node that wraps a single queued object.

    The wrapper never reports anything as expected, always lists the wrapped
    object as available, and returns the wrapped object from
    ``pop_matching`` whenever that object matches the candidate.
    """

    def __init__(self, obj):
        super(WheneverDecorator, self).__init__()
        self.obj = obj

    def get_expected(self):
        """Return an empty list; this node contributes no expected events."""
        return list()

    def get_available(self):
        """Return a one-element list holding the wrapped object."""
        wrapped = self.obj
        return [wrapped]

    def pop_matching(self, queue_object):
        """Return the wrapped object if it matches ``queue_object``, else None."""
        if self.obj.matches(queue_object):
            return self.obj
        return None

    def __len__(self):
        """Delegate the length to the wrapped object."""
        wrapped = self.obj
        return len(wrapped)

    def __repr__(self):
        wrapped_repr = repr(self.obj)
        return "<whenever %s>" % (wrapped_repr,)


class ForgeQueue(object):
    """Queue of recorded objects, organised as a tree of groups.

    The tree is rooted at an ``OrderedGroup``. New objects are pushed onto the
    current *recording group*, which starts out as the root and is temporarily
    replaced while one of the group context managers is active. Matching
    (popping) always starts from the root group.
    """

    def __init__(self, forge):
        super(ForgeQueue, self).__init__()
        root = OrderedGroup()
        self._root_group = root
        self._recording_group = root
        self._forge = forge

    def get_expected(self):
        """Return the root group's expected objects."""
        root = self._root_group
        return root.get_expected()

    def get_available(self):
        """Return the root group's currently available objects."""
        root = self._root_group
        return root.get_available()

    def is_empty(self):
        """Return True when the root group reports nothing as expected."""
        pending = self._root_group.get_expected()
        return not pending

    def __len__(self):
        root = self._root_group
        return len(root)

    def clear(self):
        """Replace the tree with a fresh, empty root that is also the recording group."""
        fresh_root = OrderedGroup()
        self._root_group = fresh_root
        self._recording_group = fresh_root

    def allow_whenever(self, queued_object):
        """Detach ``queued_object`` from its parent and re-add it out of band.

        The object is wrapped in a ``WheneverDecorator`` and handed to the
        recording group's ``push_out_of_band``.
        """
        owner = queued_object.get_parent()
        owner.discard_child(queued_object)
        wrapper = WheneverDecorator(queued_object)
        self._recording_group.push_out_of_band(wrapper)

    def push_call(self, target, args, kwargs, caller_info):
        """Record a ``FunctionCall`` on the recording group and return the push result."""
        recorded = FunctionCall(target, args, kwargs, caller_info)
        return self._push(recorded)

    def push_setattr(self, target, name, value, caller_info):
        """Record a ``Setattr`` on the recording group and return the push result."""
        recorded = Setattr(target, name, value, caller_info)
        return self._push(recorded)

    def pop_matching_call(self, target, args, kwargs, caller_info):
        """Pop the queued object matching this call; raise ``UnexpectedCall`` if none."""
        candidate = FunctionCall(target, args, kwargs, caller_info)
        return self._pop_matching(candidate, UnexpectedCall)

    def pop_matching_setattr(self, target, name, value, caller_info):
        """Pop the queued object matching this setattr; raise ``UnexpectedSetattr`` if none."""
        candidate = Setattr(target, name, value, caller_info)
        return self._pop_matching(candidate, UnexpectedSetattr)

    def _push(self, queued_object):
        target_group = self._recording_group
        return target_group.push(queued_object)

    def _pop_matching(self, queued_object, unexpected_class):
        """Pop a match from the root group or raise ``unexpected_class``.

        When the root group returns None, any exception currently being
        handled is logged first, then ``unexpected_class`` is raised with the
        available objects and the unmatched ``queued_object``.
        """
        match = self._root_group.pop_matching(queued_object)
        if match is not None:
            return match
        self._log_exception_context()
        raise unexpected_class(self.get_available(), queued_object)

    def _log_exception_context(self):
        """Debug-log the exception currently being handled, if there is one."""
        exc_type, exc_value, exc_tb = sys.exc_info()
        if exc_value is None:
            return
        formatted_tb = "".join(traceback.format_tb(exc_tb))
        _logger.debug("In exception context:\n%s\n%s: %s",
                      formatted_tb, exc_type.__name__, exc_value)

    @contextmanager
    def _get_group_context(self, group_class):
        """Context manager that records into a new ``group_class`` group.

        The new group is pushed onto the current recording group and becomes
        the recording group for the duration of the ``with`` body. On exit the
        group's parent becomes the recording group again, and the new group is
        discarded from that parent if it ended up empty.
        """
        nested = group_class()
        try:
            self._recording_group.push(nested)
            self._recording_group = nested
            yield
        finally:
            enclosing = nested.get_parent()
            if nested.is_empty():
                enclosing.discard_child(nested)
            self._recording_group = enclosing

    def get_any_order_group_context(self):
        """Return a context manager recording into a new ``AnyOrderGroup``."""
        return self._get_group_context(AnyOrderGroup)

    def get_ordered_group_context(self):
        """Return a context manager recording into a new ``OrderedGroup``."""
        return self._get_group_context(OrderedGroup)

    def get_interleaved_group_context(self):
        """Return a context manager recording into a new ``InterleavedGroup``."""
        return self._get_group_context(InterleavedGroup)

    def verify(self):
        """Raise ``ExpectedEventsNotFound`` if the root group still expects events."""
        leftover = self._root_group.get_expected()
        if not leftover:
            return
        raise ExpectedEventsNotFound(leftover)

    def __repr__(self):
        root = self._root_group
        fields = (id(root), repr(root), id(self._recording_group))
        return "ForgeQueue(_root_group <id %s>=%s, _recording_group=<id %s>)" % fields