1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
"""The DB API 2 Connection objects."""
from __future__ import annotations
from contextlib import suppress
from typing import Any, Sequence
from pg.core import Connection as Cnx
from pg.core import (
DatabaseError,
DataError,
Error,
IntegrityError,
InterfaceError,
InternalError,
NotSupportedError,
OperationalError,
ProgrammingError,
Warning,
)
from pg.error import op_error
from .cast import TypeCache
from .constants import shortcutmethods
from .cursor import Cursor
__all__ = ['Connection']
class Connection:
"""Connection object."""
# expose the exceptions as attributes on the connection object
Error = Error
Warning = Warning
InterfaceError = InterfaceError
DatabaseError = DatabaseError
InternalError = InternalError
OperationalError = OperationalError
ProgrammingError = ProgrammingError
IntegrityError = IntegrityError
DataError = DataError
NotSupportedError = NotSupportedError
def __init__(self, cnx: Cnx) -> None:
"""Create a database connection object."""
self._cnx: Cnx | None = cnx # connection
self._tnx = False # transaction state
self.type_cache = TypeCache(cnx)
self.cursor_type = Cursor
self.autocommit = False
try:
self._cnx.source()
except Exception as e:
raise op_error("Invalid connection") from e
def __enter__(self) -> Connection:
"""Enter the runtime context for the connection object.
The runtime context can be used for running transactions.
This also starts a transaction in autocommit mode.
"""
if self.autocommit:
cnx = self._cnx
if not cnx:
raise op_error("Connection has been closed")
try:
cnx.source().execute("BEGIN")
except DatabaseError:
raise # database provides error message
except Exception as e:
raise op_error("Can't start transaction") from e
else:
self._tnx = True
return self
def __exit__(self, et: type[BaseException] | None,
ev: BaseException | None, tb: Any) -> None:
"""Exit the runtime context for the connection object.
This does not close the connection, but it ends a transaction.
"""
if et is None and ev is None and tb is None:
self.commit()
else:
self.rollback()
def close(self) -> None:
"""Close the connection object."""
if not self._cnx:
raise op_error("Connection has been closed")
if self._tnx:
with suppress(DatabaseError):
self.rollback()
self._cnx.close()
self._cnx = None
@property
def closed(self) -> bool:
"""Check whether the connection has been closed or is broken."""
try:
return not self._cnx or self._cnx.status != 1
except TypeError:
return True
def commit(self) -> None:
"""Commit any pending transaction to the database."""
if not self._cnx:
raise op_error("Connection has been closed")
if self._tnx:
self._tnx = False
try:
self._cnx.source().execute("COMMIT")
except DatabaseError:
raise # database provides error message
except Exception as e:
raise op_error("Can't commit transaction") from e
def rollback(self) -> None:
"""Roll back to the start of any pending transaction."""
if not self._cnx:
raise op_error("Connection has been closed")
if self._tnx:
self._tnx = False
try:
self._cnx.source().execute("ROLLBACK")
except DatabaseError:
raise # database provides error message
except Exception as e:
raise op_error("Can't rollback transaction") from e
def cursor(self) -> Cursor:
"""Return a new cursor object using the connection."""
if not self._cnx:
raise op_error("Connection has been closed")
try:
return self.cursor_type(self)
except Exception as e:
raise op_error("Invalid connection") from e
if shortcutmethods: # otherwise do not implement and document this
def execute(self, operation: str,
parameters: Sequence | None = None) -> Cursor:
"""Shortcut method to run an operation on an implicit cursor."""
cursor = self.cursor()
cursor.execute(operation, parameters)
return cursor
def executemany(self, operation: str,
seq_of_parameters: Sequence[Sequence | None]
) -> Cursor:
"""Shortcut method to run an operation against a sequence."""
cursor = self.cursor()
cursor.executemany(operation, seq_of_parameters)
return cursor
|