# File: test_process.py
#
# Package: python-simpy3 3.0.11-3
#   - links: PTS, VCS
#   - area: main
#   - in suites: bookworm
#   - size: 1,080 kB
#   - sloc: python: 2,885; makefile: 138
# File content: 205 lines | stat: -rw-r--r-- 4,954 bytes
"""
Tests for the ``simpy.events.Process``.

"""
# Pytest gets the parameters "env" and "log" from the *conftest.py* file
import pytest

from simpy import Interrupt


def test_start_non_process(env):
    """Handing a plain function to ``env.process`` raises ``ValueError``."""
    def not_a_generator():
        pass

    # The function object itself is passed on purpose (it is not called).
    with pytest.raises(ValueError):
        env.process(not_a_generator)


def test_get_state(env):
    """A process stays alive until its generator has terminated."""
    def short_lived(env):
        yield env.timeout(3)

    def observer(env, watched):
        # After 1 time unit the watched process still waits on its
        # 3-unit timeout.
        yield env.timeout(1)
        assert watched.is_alive

        # Another 3 units later (4 in total) it must have finished.
        yield env.timeout(3)
        assert not watched.is_alive

    watched_proc = env.process(short_lived(env))
    env.process(observer(env, watched_proc))
    env.run()


def test_target(env):
    """``Process.target`` is the event the process has yielded."""
    delay = 5

    def wait_for(env, awaited):
        yield awaited

    awaited_event = env.timeout(delay)
    waiting_proc = env.process(wait_for(env, awaited_event))

    # Process queued events one by one until the next one is due at
    # "delay" or later; by then "waiting_proc" has been initialized and
    # has yielded the timeout.
    while env.peek() < delay:
        env.step()

    assert waiting_proc.target is awaited_event

    # NOTE(review): no env.step()/env.run() follows, so the effect of this
    # interrupt is not asserted by the test -- confirm its purpose.
    waiting_proc.interrupt()


def test_wait_for_proc(env):
    """Yielding a process suspends the caller until that process ends."""
    def five_unit_job(env):
        yield env.timeout(5)

    def waiter(env, job_factory):
        # Spawn the job and block on it in one go; execution continues
        # here only after the job has finished.
        yield env.process(job_factory(env))

        assert env.now == 5

    env.process(waiter(env, five_unit_job))
    env.run()


def test_exit(env):
    """A process can hand a value to whoever waits for it by calling
    ``env.exit(value)``, comparable to ``sys.exit()``.

    """
    def child(env):
        yield env.timeout(1)
        env.exit(env.now)

    def parent(env):
        # Run two children one after the other and collect what each one
        # passed to ``env.exit()``.
        results = []
        for _ in range(2):
            value = yield env.process(child(env))
            results.append(value)

        assert results == [1, 2]

    env.process(parent(env))
    env.run()


@pytest.mark.skipif('sys.version_info[:2] < (3, 3)')
def test_return_value(env):
    """The ``return`` value of a process generator is delivered to the
    process that waits for it.

    """
    # The child is compiled from a string at run time: written as real
    # code, ``return <value>`` in a generator would be a SyntaxError on
    # the Python versions excluded by the ``skipif`` above (< 3.3) and
    # would break the import of this whole module.
    source = """def child(env):
        yield env.timeout(1)
        return env.now
    """
    namespace = {}
    eval(compile(source, '<string>', 'exec'), namespace)
    child = namespace['child']

    def parent(env):
        # Run two children one after the other and collect their return
        # values.
        results = []
        for _ in range(2):
            value = yield env.process(child(env))
            results.append(value)

        assert results == [1, 2]

    env.process(parent(env))
    env.run()


def test_child_exception(env):
    """A child catches its own exception and passes the exception object
    to its parent as an ordinary result.

    """
    def child(env):
        try:
            yield env.timeout(1)
            raise RuntimeError('Onoes!')
        except RuntimeError as caught:
            # Hand the exception over as a value instead of letting it
            # propagate.
            env.exit(caught)

    def parent(env):
        outcome = yield env.process(child(env))
        assert isinstance(outcome, Exception)

    env.process(parent(env))
    env.run()


def test_interrupted_join(env):
    """An interrupt removes the waiting process from the callbacks of the
    event it was waiting for.

    """
    def child(env):
        yield env.timeout(2)

    def interruptor(env, victim):
        yield env.timeout(1)
        victim.interrupt()

    def parent(env):
        joined = env.process(child(env))
        try:
            yield joined
        except Interrupt:
            # The interrupt arrives while the child is still running.
            assert env.now == 1
            assert joined.is_alive

            # The child ends one unit from now; if that still resumed this
            # process, the timeout below would not run to completion.
            yield env.timeout(5)
            assert env.now == 6
        else:
            pytest.fail('Did not receive an interrupt.')

    victim_proc = env.process(parent(env))
    env.process(interruptor(env, victim_proc))
    env.run()


def test_interrupted_join_and_rejoin(env):
    """A process that is interrupted while waiting for another process can
    wait for that same process again afterwards.

    """
    def child(env):
        yield env.timeout(2)

    def interruptor(env, victim):
        yield env.timeout(1)
        victim.interrupt()

    def parent(env):
        joined = env.process(child(env))
        try:
            yield joined
        except Interrupt:
            # The interrupt arrives while the child is still running.
            assert env.now == 1
            assert joined.is_alive

            # Join the child a second time; this wait ends when the child
            # finishes.
            yield joined
            assert env.now == 2
        else:
            pytest.fail('Did not receive an interrupt.')

    victim_proc = env.process(parent(env))
    env.process(interruptor(env, victim_proc))
    env.run()


def test_error_and_interrupted_join(env):
    """The error of a failed process surfaces from ``env.run()`` when the
    process that was waiting for it got interrupted in the meantime.

    """
    def interrupting_child(env, victim):
        victim.interrupt()
        env.exit()
        yield  # Never reached; only turns this function into a generator

    def failing_child(env):
        raise AttributeError('spam')
        yield  # Never reached; only turns this function into a generator

    def parent(env):
        env.process(interrupting_child(env, env.active_process))
        failing_proc = env.process(failing_child(env))

        try:
            yield failing_proc
        except Interrupt:
            # The interrupt unregisters this process from "failing_proc",
            # so its AttributeError is not delivered here.
            pass

        yield env.timeout(0)

    env.process(parent(env))
    with pytest.raises(AttributeError):
        env.run()