File: connection.py

package info (click to toggle)
pygresql 1%3A6.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,944 kB
  • sloc: python: 15,052; ansic: 5,730; makefile: 16; sh: 10
file content (156 lines) | stat: -rw-r--r-- 5,077 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""The DB API 2 Connection objects."""

from __future__ import annotations

from contextlib import suppress
from typing import Any, Sequence

from pg.core import Connection as Cnx
from pg.core import (
    DatabaseError,
    DataError,
    Error,
    IntegrityError,
    InterfaceError,
    InternalError,
    NotSupportedError,
    OperationalError,
    ProgrammingError,
    Warning,
)
from pg.error import op_error

from .cast import TypeCache
from .constants import shortcutmethods
from .cursor import Cursor

# Only the Connection class is exported by a star-import of this module.
__all__ = ['Connection']

class Connection:
    """DB API 2 connection built on top of a low-level connection.

    The object keeps a reference to the wrapped low-level connection and
    a flag telling whether a transaction has been started.  It can be used
    as a context manager to commit or roll back a unit of work.
    """

    # expose the exceptions as attributes on the connection object
    Error = Error
    Warning = Warning
    InterfaceError = InterfaceError
    DatabaseError = DatabaseError
    InternalError = InternalError
    OperationalError = OperationalError
    ProgrammingError = ProgrammingError
    IntegrityError = IntegrityError
    DataError = DataError
    NotSupportedError = NotSupportedError

    def __init__(self, cnx: Cnx) -> None:
        """Wrap the given low-level connection.

        A source object is requested from the connection once as a
        sanity check; any failure is reported as "Invalid connection".
        """
        self._cnx: Cnx | None = cnx  # wrapped connection, None once closed
        self._tnx = False  # whether a transaction has been started
        self.type_cache = TypeCache(cnx)
        self.cursor_type = Cursor
        self.autocommit = False
        try:
            cnx.source()
        except Exception as exc:
            raise op_error("Invalid connection") from exc

    def _require_open(self) -> Cnx:
        """Return the wrapped connection, failing if it has been closed."""
        cnx = self._cnx
        if not cnx:
            raise op_error("Connection has been closed")
        return cnx

    @staticmethod
    def _run_statement(cnx: Cnx, statement: str, failure: str) -> None:
        """Execute a transaction control statement on a new source object.

        Database errors are passed through unchanged, everything else
        is re-raised as an error carrying the given failure message.
        """
        try:
            cnx.source().execute(statement)
        except DatabaseError:
            raise  # database provides error message
        except Exception as exc:
            raise op_error(failure) from exc

    def __enter__(self) -> Connection:
        """Enter the runtime context and return the connection itself.

        The context is meant for running a transaction.  When autocommit
        is switched on, a transaction is started explicitly here.
        """
        if not self.autocommit:
            return self
        self._run_statement(
            self._require_open(), "BEGIN", "Can't start transaction")
        self._tnx = True
        return self

    def __exit__(self, et: type[BaseException] | None,
                 ev: BaseException | None, tb: Any) -> None:
        """Leave the runtime context, ending the transaction.

        The transaction is committed when the block finished cleanly and
        rolled back otherwise.  The connection itself stays open.
        """
        failed = et is not None or ev is not None or tb is not None
        if failed:
            self.rollback()
        else:
            self.commit()

    def close(self) -> None:
        """Close the connection, rolling back a started transaction first.

        Database errors raised by that rollback are ignored.  Closing an
        already closed connection is reported as an error.
        """
        self._require_open()
        if self._tnx:
            with suppress(DatabaseError):
                self.rollback()
        self._cnx.close()
        self._cnx = None

    @property
    def closed(self) -> bool:
        """Tell whether the connection is closed or no longer usable."""
        try:
            cnx = self._cnx
            unusable = not cnx or cnx.status != 1
        except TypeError:
            unusable = True
        return unusable

    def commit(self) -> None:
        """Commit the started transaction, if there is one."""
        cnx = self._require_open()
        if not self._tnx:
            return
        # the flag is cleared before the statement runs, so it stays
        # cleared even when the statement fails
        self._tnx = False
        self._run_statement(cnx, "COMMIT", "Can't commit transaction")

    def rollback(self) -> None:
        """Roll back the started transaction, if there is one."""
        cnx = self._require_open()
        if not self._tnx:
            return
        # the flag is cleared before the statement runs, so it stays
        # cleared even when the statement fails
        self._tnx = False
        self._run_statement(cnx, "ROLLBACK", "Can't rollback transaction")

    def cursor(self) -> Cursor:
        """Create a new cursor of type ``cursor_type`` for this connection."""
        self._require_open()
        try:
            new_cursor = self.cursor_type(self)
        except Exception as exc:
            raise op_error("Invalid connection") from exc
        return new_cursor

    if shortcutmethods:  # otherwise do not implement and document this

        def execute(self, operation: str,
                    parameters: Sequence | None = None) -> Cursor:
            """Run an operation on a newly created cursor and return it."""
            cur = self.cursor()
            cur.execute(operation, parameters)
            return cur

        def executemany(self, operation: str,
                        seq_of_parameters: Sequence[Sequence | None]
                        ) -> Cursor:
            """Run an operation for each parameter set on a new cursor."""
            cur = self.cursor()
            cur.executemany(operation, seq_of_parameters)
            return cur