File: base.py

package info (click to toggle)
python-django-pgschemas 1.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 848 kB
  • sloc: python: 3,887; makefile: 33; sh: 10; sql: 2
file content (94 lines) | stat: -rw-r--r-- 3,030 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from django.core.exceptions import ImproperlyConfigured
from django.db.utils import DatabaseError
from django.utils.asyncio import async_unsafe

from django_pgschemas.schema import get_current_schema, get_default_schema
from django_pgschemas.settings import (
    get_base_backend_module,
    get_extra_search_paths,
    get_original_backend_module,
)
from django_pgschemas.utils import check_schema_name, get_limit_set_calls

from .introspection import DatabaseSchemaIntrospection

# Bind whichever PostgreSQL driver is installed to ``_psycopg``: the ``psycopg``
# package is tried first and ``psycopg2`` is the fallback. If neither can be
# imported the backend cannot work, so configuration is reported as broken.
try:
    import psycopg as _psycopg
except ImportError:
    try:
        import psycopg2 as _psycopg
    except ImportError:
        raise ImproperlyConfigured("Error loading psycopg2 or psycopg module")

# Pick the backend module whose ``DatabaseWrapper`` is subclassed below. The
# "original" backend lookup is attempted first; when it raises AttributeError
# the "base" backend lookup is used instead.
try:
    module = get_original_backend_module("base")
except AttributeError:
    module = get_base_backend_module("base")


def get_search_path(schema=None):
    """Build the comma-separated ``search_path`` value for ``schema``.

    When ``schema`` is ``None`` the object returned by ``get_default_schema()``
    is used. The schema's own name comes first (unless it already is
    ``public``), followed by ``public`` and then every entry returned by
    ``get_extra_search_paths()``. Each entry is passed to
    ``check_schema_name`` before the parts are joined with ``", "``.
    """
    target = get_default_schema() if schema is None else schema
    name = target.schema_name

    # "public" is always present; avoid listing it twice when it is the schema itself.
    parts = [] if name == "public" else [name]
    parts.append("public")
    parts.extend(get_extra_search_paths())

    for entry in parts:
        check_schema_name(entry)

    return ", ".join(parts)


class DatabaseWrapper(module.DatabaseWrapper):
    """Database wrapper that keeps ``search_path`` aligned with the current schema.

    Every cursor obtained through ``_cursor`` first goes through
    ``_handle_search_path``, which issues ``SET search_path`` for the schema
    returned by ``get_current_schema()``. The last value applied successfully
    is cached in ``_search_path`` so the statement can be skipped when
    ``get_limit_set_calls()`` is truthy and the value has not changed.
    """

    def __init__(self, *args, **kwargs):
        # Last search_path applied successfully through this wrapper
        # (None means "unknown, must be set again").
        self._search_path = None
        # Guard flag: true only while _handle_search_path is issuing its SET.
        self._setting_search_path = False
        super().__init__(*args, **kwargs)

        # Patched version of DatabaseIntrospection that only returns the table list for the currently selected schema
        self.introspection = DatabaseSchemaIntrospection(self)

    @async_unsafe
    def close(self) -> None:
        """Forget the cached search path and guard flag, then close the connection."""
        self._search_path = None
        self._setting_search_path = False
        super().close()

    @async_unsafe
    def rollback(self) -> None:
        """Forget the cached search path and guard flag, then roll back.

        NOTE(review): presumably the cache is dropped because PostgreSQL undoes
        a ``SET`` issued inside a transaction that is rolled back -- confirm.
        """
        self._search_path = None
        self._setting_search_path = False
        super().rollback()

    def _handle_search_path(self, cursor=None):
        """Issue ``SET search_path`` for the current schema when needed.

        Args:
            cursor: Cursor to run the statement on. When ``None`` a temporary
                cursor is opened on ``self.connection`` and closed again
                before returning.

        The statement is skipped while another call is already setting the
        path, or when the cached value already matches and
        ``get_limit_set_calls()`` is truthy. ``DatabaseError`` and the
        driver's ``InternalError`` raised by the statement are swallowed and
        only invalidate the cache; any other exception propagates.
        """
        search_path_for_current_schema = get_search_path(get_current_schema())

        skip = self._setting_search_path or (
            self._search_path == search_path_for_current_schema and get_limit_set_calls()
        )
        if skip:
            return

        self._setting_search_path = True
        cursor_for_search_path = None
        try:
            # Acquire the cursor inside the guarded region: if this raises, the
            # flag must still be reset, otherwise every later call would skip
            # the SET until close()/rollback() clears it.
            cursor_for_search_path = self.connection.cursor() if cursor is None else cursor

            try:
                cursor_for_search_path.execute(
                    f"SET search_path = {search_path_for_current_schema}"
                )
            except (DatabaseError, _psycopg.InternalError):
                # Could not apply the path; make sure the next call retries.
                self._search_path = None
            else:
                self._search_path = search_path_for_current_schema
        finally:
            self._setting_search_path = False
            # Only close a cursor this method opened itself, and do so even
            # when the statement failed with an unexpected exception.
            if cursor is None and cursor_for_search_path is not None:
                cursor_for_search_path.close()

    def _cursor(self, name=None):
        """Return a cursor from the parent backend after syncing ``search_path``.

        Args:
            name: Optional cursor name, forwarded to the parent implementation.
        """
        cursor = super()._cursor(name=name)

        cursor_for_search_path = cursor if name is None else None  # Named cursors cannot be reused
        self._handle_search_path(cursor_for_search_path)

        return cursor