File: test_async.py

package info (click to toggle)
python-trame-server 3.6.1-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 480 kB
  • sloc: python: 4,075; javascript: 5; sh: 4; makefile: 3
file content (92 lines) | stat: -rw-r--r-- 2,251 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor

import pytest
from trame.app import asynchronous, get_server


@pytest.mark.asyncio
async def test_thread_state_sync():
    running_states = []
    value_changes = []

    MULTI_PROCESS_MANAGER = multiprocessing.Manager()
    SPAWN = multiprocessing.get_context("spawn")
    PROCESS_EXECUTOR = ProcessPoolExecutor(1, mp_context=SPAWN)

    loop = asyncio.get_event_loop()
    queue = MULTI_PROCESS_MANAGER.Queue()

    server = get_server("test_thread_state_sync")
    server.state.running = False
    server.state.a = 0

    @server.state.change("running")
    def on_running_change(running, **_):
        running_states.append(running)

    @server.state.change("a")
    def on_a_change(a, **_):
        value_changes.append(a)

    server.start(exec_mode="task", port=0)
    assert await server.ready

    def exec_in_thread(queue):
        with asynchronous.StateQueue(queue) as state:
            assert state.queue is queue

            state.running = True

            state.update(
                {
                    "b": 10,
                    "c": 20,
                }
            )

            for i in range(10):
                time.sleep(0.1)
                state.a = i
                assert state.a == i
                assert state["a"] == i

            state.running = False

    asynchronous.decorate_task(
        loop.run_in_executor(
            PROCESS_EXECUTOR,
            exec_in_thread(queue),
        )
    )
    asynchronous.create_state_queue_monitor_task(server, queue)

    previous_size = len(value_changes)
    while len(value_changes) < 10:
        await asyncio.sleep(0.15)
        assert len(value_changes) > previous_size
        previous_size = len(value_changes)

    assert running_states == [False, True, False]

    assert server.state.b == 10
    assert server.state.c == 20

    await server.stop()


@pytest.mark.asyncio
async def test_task_decorator():
    bg_update = "idle"

    @asynchronous.task
    async def run_something():
        nonlocal bg_update
        bg_update = "ok"

    run_something()
    assert bg_update == "idle"
    await asyncio.sleep(0.1)
    assert bg_update == "ok"