File: requests.py

package info (click to toggle)
python-web-poet 0.23.2-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 908 kB
  • sloc: python: 6,112; makefile: 19
file content (39 lines) | stat: -rw-r--r-- 1,372 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from __future__ import annotations

import logging
from collections.abc import Awaitable, Callable
from contextvars import ContextVar
from typing import TypeAlias

from web_poet.exceptions import RequestDownloaderVarError
from web_poet.page_inputs.http import HttpRequest, HttpResponse

logger = logging.getLogger(__name__)

#: Frameworks that wants to support additional requests in ``web-poet`` should
#: set the appropriate implementation of ``request_downloader_var``
#: for requesting data.
RequestDownloaderT: TypeAlias = Callable[[HttpRequest], Awaitable[HttpResponse]]
request_downloader_var: ContextVar[RequestDownloaderT] = ContextVar(
    "request_downloader"
)


async def _perform_request(request: HttpRequest) -> HttpResponse:
    """Given a :class:`~.Request`, execute it using the **request implementation**
    that was set in the ``web_poet.request_downloader_var`` :mod:`contextvars`
    instance.
    """

    logger.info(f"Requesting page: {request}")

    try:
        request_downloader = request_downloader_var.get()
    except LookupError as ex:
        raise RequestDownloaderVarError(
            "Additional requests are used inside the Page Object but the "
            "current framework has not set any HttpRequest Backend via "
            "'web_poet.request_downloader_var'"
        ) from ex

    return await request_downloader(request)