File: locks_test.py

package info (click to toggle)
aioredis 1.3.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 1,104 kB
  • sloc: python: 11,112; makefile: 7
file content (30 lines) | stat: -rw-r--r-- 710 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio

from aioredis.locks import Lock


async def test_finished_waiter_cancelled():
    """Check that cancelling an already-woken waiter passes the lock on.

    Scenario: one task acquires the lock and two more tasks queue up on it.
    The lock is then released and the first queued task is cancelled before
    it gets to run again.  The remaining queued task must end up holding
    the lock instead of being left waiting.
    """
    lock = Lock()

    # First task: acquires the lock once it has had a chance to run.
    ta = asyncio.ensure_future(lock.acquire())
    await asyncio.sleep(0)
    assert lock.locked()

    # Second task: the lock is taken, so it is registered as a waiter.
    tb = asyncio.ensure_future(lock.acquire())
    await asyncio.sleep(0)
    assert len(lock._waiters) == 1

    # Create a second waiter, wake up the first, and cancel it.
    # Without the fix, the second waiter was not woken up and the lock
    # would never be locked again.
    # A reference to the task is kept on purpose: fire-and-forget tasks are
    # only weakly referenced by the event loop, and holding it lets the
    # test check the task's outcome below.
    tc = asyncio.ensure_future(lock.acquire())
    await asyncio.sleep(0)
    lock.release()
    tb.cancel()

    # One loop iteration lets the cancelled waiter observe its cancellation.
    await asyncio.sleep(0)
    assert ta.done()
    assert tb.cancelled()

    # One more iteration lets the remaining waiter run and take the lock.
    await asyncio.sleep(0)
    assert lock.locked()
    assert tc.done()