1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
import sqlite3
import typing as tp
from functools import partial
from pathlib import Path
from anyio import CapacityLimiter, to_thread
class Connection:
    """Async facade over a ``sqlite3.Connection``.

    Each blocking call on the wrapped connection is handed to
    ``anyio.to_thread.run_sync`` together with a one-token
    ``CapacityLimiter`` owned by this instance. The same limiter is passed
    to every ``Cursor`` created from this connection.
    """

    def __init__(self, _real_connection: sqlite3.Connection) -> None:
        self._real_connection = _real_connection
        self._limiter = CapacityLimiter(1)

    async def _call(self, func: tp.Callable[..., tp.Any], *args: tp.Any) -> tp.Any:
        """Run ``func(*args)`` in a worker thread under this connection's limiter."""
        return await to_thread.run_sync(func, *args, limiter=self._limiter)

    async def _cursor_from(
        self, factory: tp.Callable[..., tp.Any], *args: tp.Any
    ) -> "Cursor":
        """Call ``factory(*args)`` in a worker thread and wrap the result in a ``Cursor``."""
        raw_cursor = await self._call(factory, *args)
        return Cursor(raw_cursor, self._limiter)

    async def __aenter__(self) -> "Connection":
        """Enter ``async with``; yields this connection unchanged."""
        return self

    async def __aexit__(self, *args: tp.Any, **kwargs: tp.Any) -> None:
        """Leave ``async with`` by closing the connection (no explicit commit)."""
        return await self.close()

    async def close(self) -> None:
        """Close the wrapped connection in a worker thread."""
        await self._call(self._real_connection.close)

    async def commit(self) -> None:
        """Commit on the wrapped connection in a worker thread."""
        await self._call(self._real_connection.commit)

    async def rollback(self) -> None:
        """Roll back on the wrapped connection in a worker thread."""
        await self._call(self._real_connection.rollback)

    async def cursor(self) -> "Cursor":
        """Create a cursor on the wrapped connection and return it wrapped."""
        return await self._cursor_from(self._real_connection.cursor)

    async def execute(self, sql: str, parameters: tp.Iterable[tp.Any] = ()) -> "Cursor":
        """Run ``sql`` with ``parameters`` and return the resulting cursor, wrapped."""
        return await self._cursor_from(self._real_connection.execute, sql, parameters)

    async def executemany(
        self, sql: str, seq_of_parameters: tp.Iterable[tp.Iterable[tp.Any]]
    ) -> "Cursor":
        """Run ``sql`` once per entry of ``seq_of_parameters``; return the wrapped cursor."""
        return await self._cursor_from(
            self._real_connection.executemany, sql, seq_of_parameters
        )

    async def executescript(self, sql_script: str) -> "Cursor":
        """Run ``sql_script`` via the connection's ``executescript``; return the wrapped cursor."""
        return await self._cursor_from(
            self._real_connection.executescript, sql_script
        )
class Cursor:
    """Async facade over a ``sqlite3.Cursor``.

    Blocking cursor calls are dispatched through
    ``anyio.to_thread.run_sync`` using the limiter supplied at construction.
    The ``description``, ``rowcount`` and ``arraysize`` attributes are read
    directly from the wrapped cursor without a thread hop.
    """

    def __init__(self, real_cursor: sqlite3.Cursor, limiter: CapacityLimiter) -> None:
        self._real_cursor = real_cursor
        self._limiter = limiter

    @property
    def description(
        self,
    ) -> tp.Union[
        tp.Tuple[tp.Tuple[str, None, None, None, None, None, None], ...], tp.Any
    ]:
        """The wrapped cursor's ``description`` attribute."""
        return self._real_cursor.description

    @property
    def rowcount(self) -> int:
        """The wrapped cursor's ``rowcount`` attribute."""
        return self._real_cursor.rowcount

    @property
    def arraysize(self) -> int:
        """The wrapped cursor's ``arraysize`` attribute."""
        return self._real_cursor.arraysize

    @arraysize.setter
    def arraysize(self, value: int) -> None:
        # Forwarded to the wrapped cursor so that ``fetchmany(None)`` can be
        # tuned through the wrapper.
        self._real_cursor.arraysize = value

    async def close(self) -> None:
        """Close the wrapped cursor in a worker thread."""
        await to_thread.run_sync(self._real_cursor.close, limiter=self._limiter)

    async def execute(self, sql: str, parameters: tp.Iterable[tp.Any] = ()) -> "Cursor":
        """Run ``sql`` with ``parameters`` on the wrapped cursor.

        Returns a ``Cursor`` wrapping whatever the underlying ``execute``
        call returns, sharing this cursor's limiter.
        """
        real_cursor = await to_thread.run_sync(
            self._real_cursor.execute, sql, parameters, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    async def executemany(
        self, sql: str, seq_of_parameters: tp.Iterable[tp.Iterable[tp.Any]]
    ) -> "Cursor":
        """Run ``sql`` once per entry of ``seq_of_parameters`` on the wrapped cursor."""
        real_cursor = await to_thread.run_sync(
            self._real_cursor.executemany, sql, seq_of_parameters, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    async def executescript(self, sql_script: str) -> "Cursor":
        """Run ``sql_script`` via the wrapped cursor's ``executescript``."""
        real_cursor = await to_thread.run_sync(
            self._real_cursor.executescript, sql_script, limiter=self._limiter
        )
        return Cursor(real_cursor, self._limiter)

    async def fetchone(self) -> tp.Any:
        """Return the result of the wrapped cursor's ``fetchone``."""
        return await to_thread.run_sync(
            self._real_cursor.fetchone, limiter=self._limiter
        )

    async def fetchmany(self, size: tp.Union[int, None] = 1) -> tp.Any:
        """Return the result of the wrapped cursor's ``fetchmany``.

        Args:
            size: Number of rows to request. ``None`` means "use the wrapped
                cursor's ``arraysize``".
        """
        if size is None:
            # sqlite3's fetchmany requires an integer and raises TypeError on
            # an explicit None; omitting the argument makes it fall back to
            # the cursor's arraysize instead.
            return await to_thread.run_sync(
                self._real_cursor.fetchmany, limiter=self._limiter
            )
        return await to_thread.run_sync(
            self._real_cursor.fetchmany, size, limiter=self._limiter
        )

    async def fetchall(self) -> tp.Any:
        """Return the result of the wrapped cursor's ``fetchall``."""
        return await to_thread.run_sync(
            self._real_cursor.fetchall, limiter=self._limiter
        )
async def connect(
    database: tp.Union[str, bytes, Path], **kwargs: tp.Any
) -> "Connection":
    """Open ``database`` with ``sqlite3.connect`` in a worker thread.

    Extra keyword arguments are forwarded to ``sqlite3.connect``, except
    that ``check_same_thread`` is always forced to ``False`` (any value
    supplied by the caller is overridden). The resulting connection is
    returned wrapped in a ``Connection``.
    """
    # Caller-supplied options first, so the forced flag always wins.
    options = {**kwargs, "check_same_thread": False}

    def _open() -> sqlite3.Connection:
        return sqlite3.connect(database, **options)

    raw_connection = await to_thread.run_sync(_open)
    return Connection(raw_connection)
|