File: test_as_completed.py

package info (click to toggle)
python3.13 3.13.6-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 121,256 kB
  • sloc: python: 703,743; ansic: 653,888; xml: 31,250; sh: 5,844; cpp: 4,326; makefile: 1,981; objc: 787; lisp: 502; javascript: 213; asm: 75; csh: 12
file content (118 lines) | stat: -rw-r--r-- 3,992 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import itertools
import time
import unittest
import weakref
from concurrent import futures
from concurrent.futures._base import (
    CANCELLED_AND_NOTIFIED, FINISHED, Future)

from test import support

from .util import (
    PENDING_FUTURE, RUNNING_FUTURE,
    CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
    create_future, create_executor_tests, setup_module)


def mul(x, y):
    """Return the product of *x* and *y*."""
    product = x * y
    return product


class AsCompletedTests:
    """Checks for ``futures.as_completed`` that are independent of executor.

    The methods use ``self.executor`` and the ``unittest`` assertion helpers,
    neither of which is defined on this class; it is passed to
    ``create_executor_tests`` later in this module.
    """

    def test_no_timeout(self):
        """Without a timeout every supplied future is yielded exactly once."""
        submitted_a = self.executor.submit(mul, 2, 21)
        submitted_b = self.executor.submit(mul, 7, 6)
        everything = [
            CANCELLED_AND_NOTIFIED_FUTURE,
            EXCEPTION_FUTURE,
            SUCCESSFUL_FUTURE,
            submitted_a,
            submitted_b,
        ]

        yielded = set(futures.as_completed(everything))

        self.assertEqual(set(everything), yielded)

    def test_future_times_out(self):
        """Test ``futures.as_completed`` timing out before
        completing its final future."""
        done_up_front = {
            CANCELLED_AND_NOTIFIED_FUTURE,
            EXCEPTION_FUTURE,
            SUCCESSFUL_FUTURE,
        }

        # Windows clock resolution is around 15.6 ms
        short_timeout = 0.100
        for limit in (0, short_timeout):
            with self.subTest(limit):
                # The submitted call sleeps for ten times the longest timeout
                # used here.
                slow = self.executor.submit(time.sleep, short_timeout * 10)
                seen = set()

                try:
                    for item in futures.as_completed(
                        done_up_front.union([slow]),
                        limit
                    ):
                        seen.add(item)
                except futures.TimeoutError:
                    # A timeout is tolerated; the assertion below checks what
                    # was yielded before it.
                    pass

                # Only the already-finished futures may have been yielded.
                self.assertEqual(seen, done_up_front)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        sleeper = self.executor.submit(time.sleep, 2)
        triplicate = itertools.repeat(sleeper, 3)
        yielded = list(futures.as_completed(triplicate))
        self.assertEqual(len(yielded), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        pending = [Future() for _ in range(8)]
        pending.extend([
            create_future(state=CANCELLED_AND_NOTIFIED),
            create_future(state=FINISHED, result=42),
        ])

        # With timeout=0, as_completed is expected to raise TimeoutError
        # before all futures have been yielded.
        with self.assertRaises(futures.TimeoutError):
            for fut in futures.as_completed(pending, timeout=0):
                pending.remove(fut)
                ref = weakref.ref(fut)
                # Drop the loop variable's reference before checking the
                # weak reference.
                del fut
                support.gc_collect()  # For PyPy or other GCs.
                self.assertIsNone(ref())

        # Complete the remaining futures one at a time; each should become
        # collectable as soon as it has been yielded.
        pending[0].set_result("test")
        for fut in futures.as_completed(pending):
            pending.remove(fut)
            ref = weakref.ref(fut)
            del fut
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(ref())
            if pending:
                pending[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        """The timeout error message counts the unfinished futures."""
        mixed = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                 RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as caught:
            list(futures.as_completed(mixed, timeout=0))

        message = str(caught.exception)
        self.assertEqual(message, '2 (of 4) futures unfinished')


# Pass this module's namespace and the AsCompletedTests class to the ``.util``
# helper.  NOTE(review): presumably it generates one concrete TestCase per
# executor flavour and stores it in ``globals()`` -- confirm in ``.util``.
create_executor_tests(globals(), AsCompletedTests)


def setUpModule():
    """Module-level ``unittest`` fixture; delegates to ``setup_module``."""
    setup_module()


# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()