File: test_queue.py

package info (click to toggle)
python-roborock 2.39.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,128 kB
  • sloc: python: 10,342; makefile: 17
file content (32 lines) | stat: -rw-r--r-- 679 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio

import pytest

from roborock.exceptions import VacuumError
from roborock.roborock_future import RoborockFuture


def test_can_create() -> None:
    """Smoke test: constructing a RoborockFuture with an integer succeeds."""
    # Only construction is exercised; the test passes if no exception is raised.
    RoborockFuture(1)


@pytest.mark.asyncio
async def test_set_result() -> None:
    """A value stored via set_result is handed back by async_get."""
    future = RoborockFuture(1)
    future.set_result("test")

    # The result is already available, so the await should resolve right away.
    received = await future.async_get(1)

    assert received == "test"


@pytest.mark.asyncio
async def test_set_exception() -> None:
    """An exception stored via set_exception is raised by async_get."""
    rq = RoborockFuture(1)
    rq.set_exception(VacuumError("test"))
    with pytest.raises(VacuumError):
        # Await the call directly instead of wrapping it in `assert`: it is
        # expected to raise, so there is no return value to check, and an
        # `assert` statement is removed entirely under `python -O`, which
        # would skip the call and make the test fail for the wrong reason.
        await rq.async_get(1)


@pytest.mark.asyncio
async def test_get_timeout() -> None:
    """async_get raises asyncio.TimeoutError when no result was ever set."""
    pending = RoborockFuture(1)

    # Nothing resolves the future, so the short wait passed to async_get
    # is expected to expire.
    with pytest.raises(asyncio.TimeoutError):
        await pending.async_get(0.01)