File: test_locks.py

package info (click to toggle)
dask.distributed 2022.12.1%2Bds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (141 lines) | stat: -rw-r--r-- 3,609 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from __future__ import annotations

import pickle
from datetime import timedelta
from time import sleep

import pytest

from distributed import Lock, get_client
from distributed.metrics import time
from distributed.utils_test import gen_cluster


@gen_cluster(client=True, nthreads=[("127.0.0.1", 8)] * 2)
async def test_lock(c, s, a, b):
    """Run many tasks through one named lock and check they never overlap.

    The ``"locked"`` metadata entry acts as a flag for the critical section:
    each task asserts it is ``False`` on entry, sets it, sleeps briefly, and
    asserts it is still ``True`` before clearing it again.  Once every task has
    finished, the scheduler's ``"locks"`` extension must have empty ``events``
    and ``ids``.
    """
    await c.set_metadata("locked", False)

    def critical_section(_):
        worker_client = get_client()
        with Lock("x"):
            # Nobody else may be inside the guarded region when we enter.
            assert worker_client.get_metadata("locked") is False
            worker_client.set_metadata("locked", True)
            # Stay inside long enough for a competing task to show up.
            sleep(0.05)
            assert worker_client.get_metadata("locked") is True
            worker_client.set_metadata("locked", False)

    tasks = c.map(critical_section, range(20))
    await c.gather(tasks)

    lock_ext = s.extensions["locks"]
    assert not lock_ext.events
    assert not lock_ext.ids


@gen_cluster(client=True)
async def test_timeout(c, s, a, b):
    """A second ``Lock("x")`` gives up after its timeout while the first holds it.

    The timed-out ``acquire`` must return ``False`` in under 0.3 seconds, and
    the scheduler extension's ``ids["x"]`` must still equal the first lock's id
    with nothing left in ``events["x"]``.
    """
    lock_ext = s.extensions["locks"]

    holder = Lock("x")
    acquired = await holder.acquire()
    assert acquired is True
    assert lock_ext.ids["x"] == holder.id

    contender = Lock("x")
    assert holder.id != contender.id

    began = time()
    acquired = await contender.acquire(timeout=0.1)
    elapsed = time() - began
    assert elapsed < 0.3
    assert acquired is False
    assert lock_ext.ids["x"] == holder.id
    assert not lock_ext.events["x"]

    await holder.release()


@gen_cluster(client=True)
async def test_acquires_with_zero_timeout(c, s, a, b):
    """An uncontended lock can be acquired with ``timeout=0``.

    Also exercises two other spellings of the timeout argument: a string
    (``"1s"``) and a ``timedelta``.
    """
    lock = Lock("x")

    await lock.acquire(timeout=0)
    assert lock.locked()
    await lock.release()

    # Same acquire/release cycle for each alternative timeout representation.
    for timeout_spec in ("1s", timedelta(seconds=1)):
        await lock.acquire(timeout=timeout_spec)
        await lock.release()


@gen_cluster(client=True)
async def test_acquires_blocking(c, s, a, b):
    """Check ``acquire(blocking=False)`` and its conflict with ``timeout``.

    A non-blocking acquire on a free lock marks it as locked, and releasing
    clears that state.  Passing ``blocking=False`` together with a timeout is
    expected to raise ``ValueError``.
    """
    free_lock = Lock("x")

    await free_lock.acquire(blocking=False)
    assert free_lock.locked()

    await free_lock.release()
    assert not free_lock.locked()

    with pytest.raises(ValueError):
        # Deliberately not awaited: the error is expected from the call itself.
        # NOTE(review): presumably raised during argument validation -- confirm
        # against ``Lock.acquire``.
        free_lock.acquire(blocking=False, timeout=1)


def test_timeout_sync(client):
    """Synchronous variant: a held lock cannot be acquired by a second instance.

    While ``Lock("x")`` is held via ``with``, another ``Lock("x")`` acquiring
    with a 0.1 second timeout must return ``False``.
    """
    with Lock("x"):
        contender = Lock("x")
        got_it = contender.acquire(timeout=0.1)
        assert got_it is False


@gen_cluster(client=True)
async def test_errors(c, s, a, b):
    """Releasing a lock that was never acquired raises ``ValueError``."""
    never_acquired = Lock("x")
    with pytest.raises(ValueError):
        await never_acquired.release()


def test_lock_sync(client):
    """Synchronous variant of the mutual-exclusion check.

    Ten tasks each take ``Lock("x")`` and use the ``"locked"`` metadata entry
    as a flag: it must be ``False`` on entry and still ``True`` after a short
    sleep, before the task clears it again.
    """

    def critical_section(_):
        with Lock("x"):
            # Client obtained inside the task; distinct from the fixture above.
            task_client = get_client()
            assert task_client.get_metadata("locked") is False
            task_client.set_metadata("locked", True)
            # Stay inside long enough for a competing task to show up.
            sleep(0.05)
            assert task_client.get_metadata("locked") is True
            task_client.set_metadata("locked", False)

    client.set_metadata("locked", False)
    tasks = client.map(critical_section, range(10))
    client.gather(tasks)


@gen_cluster(client=True)
async def test_lock_types(c, s, a, b):
    """Locks accept names of several types: int, tuple, list, bytes and str.

    Each lock must report the name it was built with and survive one
    acquire/release cycle.  Afterwards the scheduler's ``"locks"`` extension
    must have no remaining ``events``.
    """
    candidate_names = (1, ("a", 1), ["a", 1], b"123", "123")

    for candidate in candidate_names:
        named_lock = Lock(candidate)
        assert named_lock.name == candidate

        await named_lock.acquire()
        await named_lock.release()

    lock_ext = s.extensions["locks"]
    assert not lock_ext.events


@gen_cluster(client=True)
async def test_serializable(c, s, a, b):
    """A ``Lock`` can be passed to tasks and round-tripped through pickle.

    Tasks receive the lock as a keyword argument, enter it, and check its name.
    A pickled-and-restored copy made here must keep the same name and refer to
    the very same client object as the original.
    """

    def increment_under_lock(x, lock=None):
        with lock:
            assert lock.name == "x"
            return x + 1

    original = Lock("x")
    tasks = c.map(increment_under_lock, range(10), lock=original)
    await c.gather(tasks)

    payload = pickle.dumps(original)
    restored = pickle.loads(payload)
    assert restored.name == original.name
    assert restored.client is original.client


@gen_cluster(client=True, nthreads=[])
async def test_locks(c, s):
    """With no workers, locks bind to the test client and still exclude each other.

    Inside ``async with Lock("x")``, both the held lock and a second
    ``Lock("x")`` must use client ``c``, and the second lock's acquire with a
    0.01 second timeout must return ``False``.
    """
    async with Lock("x") as held:
        rival = Lock("x")

        for candidate in (held, rival):
            assert candidate.client is c

        acquired = await rival.acquire(timeout=0.01)
        assert acquired is False