File: __init__.py

package info (click to toggle)
python-pyhanko-certvalidator 0.26.3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,956 kB
  • sloc: python: 9,254; sh: 47; makefile: 4
file content (39 lines) | stat: -rw-r--r-- 1,257 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from typing import Optional

import aiohttp

from ..api import FetcherBackend, Fetchers
from .cert_fetch_client import AIOHttpCertificateFetcher
from .crl_client import AIOHttpCRLFetcher
from .ocsp_client import AIOHttpOCSPFetcher
from .util import LazySession

__all__ = ['AIOHttpFetcherBackend']


class AIOHttpFetcherBackend(FetcherBackend):
    """
    Fetcher backend that hands one shared session to an OCSP fetcher,
    a CRL fetcher and a certificate fetcher.

    :param session:
        Session object passed to every fetcher. If omitted (or otherwise
        falsy), a new :class:`LazySession` is created instead.
    :param per_request_timeout:
        Value forwarded to each fetcher as ``per_request_timeout``.
    """

    def __init__(
        self,
        session: Optional[aiohttp.ClientSession] = None,
        per_request_timeout=10,
    ):
        # Fall back to a lazily managed session when the caller gave none.
        self.session = session if session else LazySession()
        self.per_request_timeout = per_request_timeout

    def get_fetchers(self) -> Fetchers:
        """
        Build a fresh set of fetchers bound to this backend's session.

        :return:
            A :class:`Fetchers` bundle with OCSP, CRL and certificate
            fetchers, all using the same session and timeout setting.
        """
        shared_session = self.session
        # Every fetcher receives the same timeout keyword argument.
        common_kwargs = {'per_request_timeout': self.per_request_timeout}
        ocsp = AIOHttpOCSPFetcher(shared_session, **common_kwargs)
        crl = AIOHttpCRLFetcher(shared_session, **common_kwargs)
        certs = AIOHttpCertificateFetcher(shared_session, **common_kwargs)
        return Fetchers(
            ocsp_fetcher=ocsp,
            crl_fetcher=crl,
            cert_fetcher=certs,
        )

    async def close(self):
        """
        Close the underlying session, but only if it is a
        :class:`LazySession`.
        """
        held = self.session
        # A non-lazy session was supplied by the caller, who remains
        # responsible for closing it; leave it untouched.
        if not isinstance(held, LazySession):
            return
        await held.close()