File: _core.py

package info (click to toggle)
python-anysqlite 0.0.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 152 kB
  • sloc: python: 179; sh: 15; makefile: 2
file content (132 lines) | stat: -rw-r--r-- 4,368 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import sqlite3
import typing as tp
from functools import partial
from pathlib import Path

from anyio import CapacityLimiter, to_thread


class Connection:
    def __init__(self, _real_connection: sqlite3.Connection) -> None:
        self._real_connection = _real_connection
        self._limiter = CapacityLimiter(1)

    async def __aenter__(self) -> "Connection":
        return self

    async def __aexit__(self, *args: tp.Any, **kwargs: tp.Any) -> None:
        return await self.close()

    async def close(self) -> None:
        return await to_thread.run_sync(
            self._real_connection.close, limiter=self._limiter
        )

    async def commit(self) -> None:
        return await to_thread.run_sync(
            self._real_connection.commit, limiter=self._limiter
        )

    async def rollback(self) -> None:
        return await to_thread.run_sync(
            self._real_connection.rollback, limiter=self._limiter
        )

    async def cursor(self) -> "Cursor":
        real_cursor = await to_thread.run_sync(
            self._real_connection.cursor, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    async def execute(self, sql: str, parameters: tp.Iterable[tp.Any] = ()) -> "Cursor":
        real_cursor = await to_thread.run_sync(
            self._real_connection.execute, sql, parameters, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    async def executemany(
        self, sql: str, seq_of_parameters: tp.Iterable[tp.Iterable[tp.Any]]
    ) -> "Cursor":
        real_cursor = await to_thread.run_sync(
            self._real_connection.executemany,
            sql,
            seq_of_parameters,
            limiter=self._limiter,
        )
        return Cursor(real_cursor, self._limiter)

    async def executescript(self, sql_script: str) -> "Cursor":
        real_cursor = await to_thread.run_sync(
            self._real_connection.executescript, sql_script, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)


class Cursor:
    def __init__(self, real_cursor: sqlite3.Cursor, limiter: CapacityLimiter) -> None:
        self._real_cursor = real_cursor
        self._limiter = limiter

    @property
    def description(
        self,
    ) -> tp.Union[
        tp.Tuple[tp.Tuple[str, None, None, None, None, None, None], ...], tp.Any
    ]:
        return self._real_cursor.description

    @property
    def rowcount(self) -> int:
        return self._real_cursor.rowcount

    @property
    def arraysize(self) -> int:
        return self._real_cursor.arraysize

    async def close(self) -> None:
        await to_thread.run_sync(self._real_cursor.close, limiter=self._limiter)

    async def execute(self, sql: str, parameters: tp.Iterable[tp.Any] = ()) -> "Cursor":
        real_cursor = await to_thread.run_sync(
            self._real_cursor.execute, sql, parameters, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    async def executemany(
        self, sql: str, seq_of_parameters: tp.Iterable[tp.Iterable[tp.Any]]
    ) -> "Cursor":
        real_cursor = await to_thread.run_sync(
            self._real_cursor.executemany, sql, seq_of_parameters, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    async def executescript(self, sql_script: str) -> "Cursor":
        real_cursor = await to_thread.run_sync(
            self._real_cursor.executescript, sql_script, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    async def fetchone(self) -> tp.Any:
        return await to_thread.run_sync(
            self._real_cursor.fetchone, limiter=self._limiter
        )

    async def fetchmany(self, size: tp.Union[int, None] = 1) -> tp.Any:
        return await to_thread.run_sync(
            self._real_cursor.fetchmany, size, limiter=self._limiter
        )

    async def fetchall(self) -> tp.Any:
        return await to_thread.run_sync(
            self._real_cursor.fetchall, limiter=self._limiter
        )


async def connect(
    database: tp.Union[str, bytes, Path], **kwargs: tp.Any
) -> "Connection":
    kwargs["check_same_thread"] = False
    real_connection = await to_thread.run_sync(
        partial(sqlite3.connect, database, **kwargs)
    )
    return Connection(real_connection)