File: util.py

package info (click to toggle)
python-pyhanko-certvalidator 0.26.3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,956 kB
  • sloc: python: 9,254; sh: 47; makefile: 4
file content (67 lines) | stat: -rw-r--r-- 1,846 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import asyncio
from typing import Any, Dict, Union

import aiohttp

from ..api import DEFAULT_USER_AGENT
from ..common_utils import queue_fetch_task

# Names exported by ``from <this module> import *``.
__all__ = ['LazySession', 'AIOHttpMixin']


class LazySession:
    """
    Holder that defers creating an ``aiohttp.ClientSession`` until it is
    first requested through :meth:`get_session`.
    """

    def __init__(self):
        # Nothing is created up front; the slot is filled on first use.
        self._session = None

    async def get_session(self):
        """
        Return the wrapped session, instantiating it on the first call.

        :return:
            The session stored on this object.
        """
        if self._session is None:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        """
        Close the wrapped session if one has been created.

        Does nothing when no session exists yet. The reference to the
        session is kept after closing.
        """
        current = self._session
        if current is None:
            return
        await current.close()


class AIOHttpMixin:
    """
    Mixin that stores an aiohttp session (or a lazily created one) together
    with a per-tag table of fetch results.

    Entries in the result table that are :class:`Exception` instances are
    treated as failed fetches and are hidden by the accessor methods.
    """

    def __init__(
        self,
        session: "Union[aiohttp.ClientSession, LazySession]",
        user_agent=None,
        per_request_timeout=10,
    ):
        """
        :param session:
            Either a ready-to-use ``aiohttp.ClientSession`` or a
            ``LazySession`` wrapper that creates one on demand.
        :param user_agent:
            User agent value to store; ``DEFAULT_USER_AGENT`` is used when
            this is ``None`` or otherwise falsy.
        :param per_request_timeout:
            Timeout value stored on the instance for use by subclasses.
            NOTE(review): presumably expressed in seconds -- confirm
            against the code that consumes it.
        """
        self._session = session
        self.user_agent = user_agent or DEFAULT_USER_AGENT
        self.per_request_timeout = per_request_timeout
        # Both tables are keyed by the tag passed to _post_fetch_task().
        self.__results: Dict[Any, Any] = {}
        self.__result_events: Dict[Any, asyncio.Event] = {}
        # Keep cooperative initialisation working for co-inherited classes.
        super().__init__()

    async def get_session(self) -> "aiohttp.ClientSession":
        """
        Return the session to use for requests.

        A ``LazySession`` is asked for its underlying session; anything
        else is returned unchanged.
        """
        session = self._session
        if isinstance(session, LazySession):
            return await session.get_session()
        return session

    def get_results(self):
        """
        Return the set of all stored results that are not exceptions.

        Because a set is built, the stored results must be hashable.
        """
        return {
            v for v in self.__results.values() if not isinstance(v, Exception)
        }

    def get_results_for_tag(self, tag):
        """
        Return the result stored under ``tag``.

        :param tag:
            Key identifying the fetch task.
        :return:
            The stored result.
        :raises KeyError:
            If nothing is stored under ``tag``, or if the stored entry is
            an exception (i.e. the fetch failed).
        """
        result = self.__results[tag]
        if isinstance(result, Exception):
            # A failed fetch is reported the same way as a missing tag;
            # the stored error is chained for easier debugging.
            raise KeyError(tag) from result
        return result

    def _iter_results(self):
        """
        Yield ``(tag, result)`` pairs for every stored result that is not
        an exception.
        """
        for k, v in self.__results.items():
            if not isinstance(v, Exception):
                yield k, v

    async def _post_fetch_task(self, tag, async_fun):
        """
        Hand ``async_fun`` to ``queue_fetch_task`` along with this
        instance's result and event tables, and return whatever it returns.

        NOTE(review): presumably this runs each tag at most once and makes
        concurrent callers wait on the per-tag event -- confirm against
        ``queue_fetch_task``.
        """
        return await queue_fetch_task(
            self.__results, self.__result_events, tag, async_fun
        )