from __future__ import annotations

import asyncio

import pytest

from distributed._asyncio import RLock
from distributed.utils_test import gen_test


@gen_test()
async def test_rlock():
    l = RLock()

    # Owners are treated as reentrant as long as they compare as equal.
    # Make sure that the comparison uses '==' and not 'is'.
    x1 = ["x"]
    x2 = ["x"]
    assert x1 is not x2, "internalized objects"

    async with l(x1), l(x2):
        assert l._owner == ["x"]
        assert l._count == 2

        fut = asyncio.create_task(l.acquire("y"))
        await asyncio.sleep(0.01)
        assert not fut.done()

        with pytest.raises(RuntimeError):
            l.release("z")

    await fut
    l.release("y")

    with pytest.raises(RuntimeError):
        l.release("w")

    assert l._owner is None
    assert l._count == 0
    assert not l._lock.locked()


@gen_test()
async def test_rlock_nonreentrant():
    """Use the RLock with non-reentrant owners"""
    l = RLock()
    x = object()
    y = object()
    assert x != y

    async with l(x):
        assert l._owner is x
        fut = asyncio.create_task(l.acquire(y))
        await asyncio.sleep(0.01)
        assert not fut.done()

    await fut
    assert l._owner is y
    l.release(y)
    assert l._owner is None
    assert l._count == 0
    assert not l._lock.locked()


@gen_test()
async def test_rlock_none():
    """None is a valid owner"""
    l = RLock()

    async with l(None), l(None):
        assert l._owner is None
        assert l._count == 2
        assert l._lock.locked()
        fut = asyncio.create_task(l.acquire("x"))
        await asyncio.sleep(0.01)
        assert not fut.done()

    await fut
    assert l._owner == "x"
    l.release("x")
    assert l._owner is None
    assert l._count == 0
    assert not l._lock.locked()


@gen_test()
async def test_rlock_release_on_raise():
    """The lock is released when the body of the context manager raises"""
    l = RLock()

    with pytest.raises(ZeroDivisionError):
        async with l("x"):
            1 / 0

    assert l._owner is None
    assert l._count == 0
    assert not l._lock.locked()