1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
|
From: =?utf-8?b?TWljaGHFgiBHw7Nybnk=?= <mgorny@gentoo.org>
Date: Wed, 18 Jun 2025 07:46:37 +0200
Subject: test: Replace deprecated `event_loop` fixture
Replace the deprecated `event_loop` fixture with an explicit call
to `asyncio.get_running_loop()`, as suggested by the deprecation
warnings:
```
test/test_asyncredis.py:61
test/test_asyncredis.py:61: PytestDeprecationWarning: test_pubsub[fake] is asynchronous and explicitly requests the "event_loop" fixture. Asynchronous fixtures and test functions should use "asyncio.get_running_loop()" instead.
async def test_pubsub(async_redis, event_loop):
```
This fixes compatibility with `pytest-asyncio >= 1.0.0`.
Origin: upstream, https://github.com/cunla/fakeredis-py/pull/396
Last-Update: 2025-09-16
---
test/test_asyncredis.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/test/test_asyncredis.py b/test/test_asyncredis.py
index 8588f79..8909848 100644
--- a/test/test_asyncredis.py
+++ b/test/test_asyncredis.py
@@ -61,7 +61,7 @@ async def test_transaction_fail(async_redis: redis.asyncio.Redis):
await tr.execute()
-async def test_pubsub(async_redis, event_loop):
+async def test_pubsub(async_redis):
queue = asyncio.Queue()
async def reader(ps):
@@ -74,7 +74,7 @@ async def test_pubsub(async_redis, event_loop):
async with async_timeout(5), async_redis.pubsub() as ps:
await ps.subscribe("channel")
- task = event_loop.create_task(reader(ps))
+ task = asyncio.get_running_loop().create_task(reader(ps))
await async_redis.publish("channel", "message1")
await async_redis.publish("channel", "message2")
result1 = await queue.get()
@@ -120,14 +120,14 @@ async def test_blocking_timeout(conn):
@pytest.mark.slow
-async def test_blocking_unblock(async_redis, conn, event_loop):
+async def test_blocking_unblock(async_redis, conn):
"""Blocking command that gets unblocked after some time."""
async def unblock():
await asyncio.sleep(0.1)
await async_redis.rpush("list", "y")
- task = event_loop.create_task(unblock())
+ task = asyncio.get_running_loop().create_task(unblock())
result = await conn.blpop("list", timeout=1)
assert result == (b"list", b"y")
await task
|