File: records.py

package info (click to toggle)
python-databases 0.9.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 312 kB
  • sloc: python: 3,333; makefile: 11
file content (136 lines) | stat: -rw-r--r-- 4,404 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import enum
import typing
from datetime import date, datetime, time

from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.engine.row import Row as SQLRow
from sqlalchemy.sql.compiler import _CompileLabel
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import JSON
from sqlalchemy.types import TypeEngine

from databases.interfaces import Record as RecordInterface

DIALECT_EXCLUDE = {"postgresql"}


class Record(RecordInterface):
    __slots__ = (
        "_row",
        "_result_columns",
        "_dialect",
        "_column_map",
        "_column_map_int",
        "_column_map_full",
    )

    def __init__(
        self,
        row: typing.Any,
        result_columns: tuple,
        dialect: Dialect,
        column_maps: typing.Tuple[
            typing.Mapping[typing.Any, typing.Tuple[int, TypeEngine]],
            typing.Mapping[int, typing.Tuple[int, TypeEngine]],
            typing.Mapping[str, typing.Tuple[int, TypeEngine]],
        ],
    ) -> None:
        self._row = row
        self._result_columns = result_columns
        self._dialect = dialect
        self._column_map, self._column_map_int, self._column_map_full = column_maps

    @property
    def _mapping(self) -> typing.Mapping:
        return self._row

    def keys(self) -> typing.KeysView:
        return self._mapping.keys()

    def values(self) -> typing.ValuesView:
        return self._mapping.values()

    def __getitem__(self, key: typing.Any) -> typing.Any:
        if len(self._column_map) == 0:
            return self._row[key]
        elif isinstance(key, Column):
            idx, datatype = self._column_map_full[str(key)]
        elif isinstance(key, int):
            idx, datatype = self._column_map_int[key]
        else:
            idx, datatype = self._column_map[key]

        raw = self._row[idx]
        processor = datatype._cached_result_processor(self._dialect, None)

        if self._dialect.name in DIALECT_EXCLUDE:
            if processor is not None and isinstance(raw, (int, str, float)):
                return processor(raw)

        return raw

    def __iter__(self) -> typing.Iterator:
        return iter(self._row.keys())

    def __len__(self) -> int:
        return len(self._row)

    def __getattr__(self, name: str) -> typing.Any:
        try:
            return self.__getitem__(name)
        except KeyError as e:
            raise AttributeError(e.args[0]) from e


class Row(SQLRow):
    def __getitem__(self, key: typing.Any) -> typing.Any:
        """
        An instance of a Row in SQLAlchemy allows the access
        to the Row._fields as tuple and the Row._mapping for
        the values.
        """
        if isinstance(key, int):
            return super().__getitem__(key)

        idx = self._key_to_index[key][0]
        return super().__getitem__(idx)

    def keys(self):
        return self._mapping.keys()

    def values(self):
        return self._mapping.values()


def create_column_maps(
    result_columns: typing.Any,
) -> typing.Tuple[
    typing.Mapping[typing.Any, typing.Tuple[int, TypeEngine]],
    typing.Mapping[int, typing.Tuple[int, TypeEngine]],
    typing.Mapping[str, typing.Tuple[int, TypeEngine]],
]:
    """
    Generate column -> datatype mappings from the column definitions.

    These mappings are used throughout PostgresConnection methods
    to initialize Record-s. The underlying DB driver does not do type
    conversion for us so we have wrap the returned asyncpg.Record-s.

    :return: Three mappings from different ways to address a column to \
                corresponding column indexes and datatypes: \
                1. by column identifier; \
                2. by column index; \
                3. by column name in Column sqlalchemy objects.
    """
    column_map, column_map_int, column_map_full = {}, {}, {}
    for idx, (column_name, _, column, datatype) in enumerate(result_columns):
        column_map[column_name] = (idx, datatype)
        column_map_int[idx] = (idx, datatype)

        # Added in SQLA 2.0 and _CompileLabels do not have _annotations
        # When this happens, the mapping is on the second position
        if isinstance(column[0], _CompileLabel):
            column_map_full[str(column[2])] = (idx, datatype)
        else:
            column_map_full[str(column[0])] = (idx, datatype)
    return column_map, column_map_int, column_map_full