File: impl_test.py

package info (click to toggle)
python-duet 0.2.9-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 236 kB
  • sloc: python: 1,423; sh: 30; makefile: 7
file content (80 lines) | stat: -rw-r--r-- 2,486 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# Copyright 2021 The Duet Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import duet
import duet.impl as impl


class CompleteOnFlush(duet.BufferedFuture):
    def __init__(self):
        super().__init__()
        self.flushed = False

    def flush(self):
        self.flushed = True
        self.set_result(None)


def make_task(future: duet.AwaitableFuture) -> impl.Task:
    """Make a task from the given future.

    We advance the task once, which just starts the generator and yields the
    future itself.
    """
    task = impl.Task(future, impl.Scheduler(), None)
    task.advance()
    return task


class TestReadySet:
    def test_get_all_returns_all_ready_tasks(self):
        task1 = make_task(duet.completed_future(None))
        task2 = make_task(duet.completed_future(None))
        task3 = make_task(duet.AwaitableFuture())
        task4 = make_task(duet.completed_future(None))
        rs = impl.ReadySet()
        rs.register(task1)
        rs.register(task2)
        rs.register(task3)
        rs.register(task4)
        tasks = rs.get_all()
        assert tasks == [task1, task2, task4]

    def test_task_added_at_most_once(self):
        task = make_task(duet.completed_future(None))
        rs = impl.ReadySet()
        rs.register(task)
        rs.register(task)
        tasks = rs.get_all()
        assert tasks == [task]

    def test_futures_flushed_if_no_task_ready(self):
        future = CompleteOnFlush()
        task = make_task(future)
        rs = impl.ReadySet()
        rs.register(task)
        tasks = rs.get_all()
        assert tasks == [task]
        assert future.flushed

    def test_futures_not_flushed_if_tasks_ready(self):
        future = CompleteOnFlush()
        task1 = make_task(future)
        task2 = make_task(duet.completed_future(None))
        rs = impl.ReadySet()
        rs.register(task1)
        rs.register(task2)
        tasks = rs.get_all()
        assert tasks == [task2]
        assert not future.flushed