File: test_loop.py

package info (click to toggle)
python-aiohttp 3.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 5,612 kB
  • sloc: python: 36,917; ansic: 15,734; makefile: 365; sh: 83
file content (39 lines) | stat: -rw-r--r-- 1,164 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import asyncio
import platform
import threading

import pytest

from aiohttp import web
from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop


@pytest.mark.skipif(platform.system() == "Windows",
                    reason="the test is not valid for Windows")
async def test_subprocess_co(loop) -> None:
    assert isinstance(threading.current_thread(), threading._MainThread)
    proc = await asyncio.create_subprocess_shell(
        "exit 0", loop=loop, stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL)
    await proc.wait()


class TestCase(AioHTTPTestCase):
    async def get_application(self):
        app = web.Application()
        app.on_startup.append(self.on_startup_hook)
        return app

    async def on_startup_hook(self, app):
        self.on_startup_called = True

    @unittest_run_loop
    async def test_on_startup_hook(self) -> None:
        self.assertTrue(self.on_startup_called)

    def test_default_loop(self) -> None:
        self.assertIs(self.loop, asyncio.get_event_loop())


def test_default_loop(loop) -> None:
    assert asyncio.get_event_loop() is loop