File: aiohttp_utils.py

package info (click to toggle)
vcr.py 8.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,036 kB
  • sloc: python: 6,275; makefile: 188; sh: 1
file content (42 lines) | stat: -rw-r--r-- 1,283 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# flake8: noqa
import asyncio

import aiohttp


async def aiohttp_request(loop, method, url, output="text", encoding="utf-8", content_type=None, **kwargs):
    async with aiohttp.ClientSession(loop=loop) as session:
        response_ctx = session.request(method, url, **kwargs)

        response = await response_ctx.__aenter__()
        if output == "text":
            content = await response.text()
        elif output == "json":
            content_type = content_type or "application/json"
            content = await response.json(encoding=encoding, content_type=content_type)
        elif output == "raw":
            content = await response.read()
        elif output == "stream":
            content = await response.content.read()

        response_ctx._resp.close()
        await session.close()

        return response, content


def aiohttp_app():
    async def hello(request):
        return aiohttp.web.Response(text="hello")

    async def json(request):
        return aiohttp.web.json_response({})

    async def json_empty_body(request):
        return aiohttp.web.json_response()

    app = aiohttp.web.Application()
    app.router.add_get("/", hello)
    app.router.add_get("/json", json)
    app.router.add_get("/json/empty", json_empty_body)
    return app