File: helpers.py

package info (click to toggle)
pyotgw 2.2.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 408 kB
  • sloc: python: 4,580; sh: 5; makefile: 2
file content (28 lines) | stat: -rw-r--r-- 696 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
"""Helper functions for tests"""

import asyncio


async def called_x_times(mocked, x, timeout=10):
    """Wait for x or more calls on mocked object or timeout"""

    async def _wait():
        while mocked.call_count < x:
            await asyncio.sleep(0)

    await asyncio.wait_for(_wait(), timeout)


async def called_once(mocked, timeout=10):
    """Wait for at least 1 call on mocked object or timeout"""
    await called_x_times(mocked, 1, timeout)


async def let_queue_drain(queue, timeout=10):
    """Wait for queue to become empty or timeout"""

    async def _wait():
        while not queue.empty():
            await asyncio.sleep(0)

    await asyncio.wait_for(_wait(), timeout)