File: test_client_loop.py

package info (click to toggle)
dask.distributed 2022.12.1%2Bds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (52 lines) | stat: -rw-r--r-- 1,823 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from __future__ import annotations

import contextlib

import pytest

from distributed import Client, LocalCluster
from distributed.utils import LoopRunner


@contextlib.contextmanager
def _check_loop_runner():
    loops_before = LoopRunner._all_loops.copy()
    yield
    # Internal loops registry must the same as before cluster running.
    # This means loop runners in LocalCluster and Client correctly stopped.
    # See LoopRunner._stop_unlocked().
    assert loops_before == LoopRunner._all_loops


def _check_cluster_and_client_loop(loop):
    # Setup simple cluster with one threaded worker.
    # Complex setup is not required here since we test only IO loop teardown.
    with LocalCluster(
        loop=loop, n_workers=1, dashboard_address=":0", processes=False
    ) as cluster:
        with Client(cluster, loop=loop) as client:
            client.run(max, 1, 2)


# Test if Client stops LoopRunner on close.
@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning")
def test_close_loop_sync_start_new_loop(cleanup):
    with _check_loop_runner():
        _check_cluster_and_client_loop(loop=None)


# Test if Client stops LoopRunner on close.
@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning")
def test_close_loop_sync_use_running_loop(cleanup):
    with _check_loop_runner():
        # Start own loop or use current thread's one.
        loop_runner = LoopRunner()
        loop_runner.start()

        try:
            _check_cluster_and_client_loop(loop=loop_runner.loop)
        finally:
            # own loop must be explicitly stopped.
            loop_runner.stop()