# File: pool.py
#
# Package info:
#   aiomcache 0.8.2-1
#   links: PTS
#   area: main
#   in suites: forky, sid, trixie
#   size: 128 kB
#   sloc: python: 389; makefile: 4
# File content: 82 lines; stat: -rw-r--r-- 2,612 bytes
# (Line-number gutter 1-82 emitted by the source viewer omitted.)
import asyncio
from typing import Any, Mapping, NamedTuple, Optional, Set

# Names exported by ``from <module> import *``: only the pool class.
__all__ = ['MemcachePool']


class Connection(NamedTuple):
    """Reader/writer pair of asyncio streams making up one connection."""
    # Read side of the stream pair.
    reader: asyncio.StreamReader
    # Write side of the stream pair.
    writer: asyncio.StreamWriter


class MemcachePool:
    """Pool of asyncio stream connections to one ``host``/``port`` endpoint.

    Idle connections wait in an ``asyncio.Queue``; connections handed out
    by :meth:`acquire` are tracked in a set until :meth:`release` gives
    them back.  No connection is opened until :meth:`acquire` is called.
    """

    def __init__(self, host: str, port: int, *, minsize: int, maxsize: int,
                 conn_args: Optional[Mapping[str, Any]] = None):
        """Store the pool configuration without opening any connection.

        :param host: host passed to ``asyncio.open_connection``
        :param port: port passed to ``asyncio.open_connection``
        :param minsize: :meth:`acquire` opens connections until the pool
            holds at least this many (``maxsize`` permitting)
        :param maxsize: upper bound on idle plus in-use connections
        :param conn_args: extra keyword arguments forwarded to
            ``asyncio.open_connection``
        """
        self._host = host
        self._port = port
        self._minsize = minsize
        self._maxsize = maxsize
        self.conn_args = conn_args or {}
        # Idle connections, ready to be handed out by acquire().
        self._pool: asyncio.Queue[Connection] = asyncio.Queue()
        # Connections currently handed out and not yet released.
        self._in_use: Set[Connection] = set()

    async def clear(self) -> None:
        """Clear pool connections.

        Only idle connections are closed; connections currently handed
        out by :meth:`acquire` are left untouched.
        """
        while not self._pool.empty():
            # The queue was just checked to be non-empty, so there is
            # nothing to wait for.
            self._do_close(self._pool.get_nowait())

    def _do_close(self, conn: Connection) -> None:
        """Mark the reader as finished and close the writer of ``conn``."""
        conn.reader.feed_eof()
        conn.writer.close()

    def _should_open_conn(self) -> bool:
        """Tell whether :meth:`acquire` should try to open a connection."""
        total = self.size()
        if total == 0 or total < self._minsize:
            return True
        # Every existing connection is busy: grow towards maxsize instead
        # of making the caller wait for a release.
        return self._pool.empty() and total < self._maxsize

    async def acquire(self) -> Connection:
        """Acquire connection from the pool, or spawn new one
        if pool maxsize permits.

        Waits for a connection to be released when the pool is already
        at ``maxsize`` and every connection is in use.

        :return: ``namedtuple`` (reader, writer)
        """
        while self._should_open_conn():
            fresh = await self._create_new_conn()
            if fresh is None:
                # maxsize reached; fall through and wait for an idle one.
                break
            self._pool.put_nowait(fresh)

        conn: Optional[Connection] = None
        while conn is None:
            # NOTE: a task waiting here is only woken when a connection
            # is put into the queue; a dead connection dropped by
            # release() does not wake it.
            candidate = await self._pool.get()
            is_dead = (candidate.reader.at_eof()
                       or candidate.reader.exception() is not None)
            if is_dead:
                # Replace the dead connection; if the replacement is
                # refused (None) go back to waiting on the queue.
                self._do_close(candidate)
                conn = await self._create_new_conn()
            else:
                conn = candidate

        self._in_use.add(conn)
        return conn

    def release(self, conn: Connection) -> None:
        """Releases connection back to the pool.

        A connection whose reader is at EOF or holds an exception is
        closed instead of being returned to the idle queue.

        :param conn: ``namedtuple`` (reader, writer)
        :raises KeyError: if ``conn`` is not currently marked as in use
        """
        self._in_use.remove(conn)
        if conn.reader.at_eof() or conn.reader.exception() is not None:
            self._do_close(conn)
        else:
            self._pool.put_nowait(conn)

    async def _create_new_conn(self) -> Optional[Connection]:
        """Open a new connection unless the pool is already at ``maxsize``.

        The returned connection is not registered with the pool; the
        caller is responsible for queueing it or marking it in use.

        :return: the new connection, or ``None`` when ``maxsize`` is
            reached before or while the connection was being opened
        """
        if self.size() >= self._maxsize:
            return None

        reader, writer = await asyncio.open_connection(
            self._host, self._port, **self.conn_args)
        conn = Connection(reader, writer)

        # Other tasks may have filled the pool while this one was
        # suspended in open_connection(); re-check before handing it out.
        if self.size() >= self._maxsize:
            self._do_close(conn)
            return None
        return conn

    def size(self) -> int:
        """Return the number of idle plus in-use connections."""
        return self._pool.qsize() + len(self._in_use)