File: __init__.py

package info (click to toggle)
python-django-pgschemas 1.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 848 kB
  • sloc: python: 3,887; makefile: 33; sh: 10; sql: 2
file content (304 lines) | stat: -rw-r--r-- 11,625 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import enum

from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db.models import Case, CharField, Q, Value, When
from django.db.models.functions import Concat
from django.db.utils import ProgrammingError

from django_pgschemas.management.commands._executors import parallel, sequential
from django_pgschemas.schema import Schema, get_current_schema
from django_pgschemas.utils import (
    create_schema,
    dynamic_models_exist,
    get_clone_reference,
    get_domain_model,
    get_tenant_model,
)


class CommandScope(enum.Enum):
    """Kinds of schemas a management command may be scoped to."""

    ALL = "all"
    DYNAMIC = "dynamic"
    STATIC = "static"

    @classmethod
    def allow_static(cls):
        """Return the list of scopes that permit static schemas."""
        return [cls[member_name] for member_name in ("ALL", "STATIC")]

    @classmethod
    def allow_dynamic(cls):
        """Return the list of scopes that permit dynamic schemas."""
        return [cls[member_name] for member_name in ("ALL", "DYNAMIC")]


# Maps an execution mode name to the executor callable that implements it.
EXECUTORS = dict(sequential=sequential, parallel=parallel)


class WrappedSchemaOption:
    """Mixin adding schema-selection options to a management command.

    Class attributes that subclasses may override:

    * ``scope``: a ``CommandScope`` deciding whether static and/or dynamic
      schemas may be resolved by the command.
    * ``specific_schemas``: optional collection of schema names; when set,
      the resolved schemas are filtered down to the ones it contains.
    * ``allow_interactive``: registers ``--noinput`` and enables prompting
      for a schema when none was selected. When ``False`` and nothing was
      selected, every schema within ``scope`` is used.
    * ``allow_wildcards``: registers the ``--include-*-schemas`` flags.
    """

    scope = CommandScope.ALL
    specific_schemas = None

    allow_interactive = True
    allow_wildcards = True

    def add_arguments(self, parser):
        """Register the schema-selection and execution options on ``parser``."""
        if self.allow_interactive:
            parser.add_argument(
                "--noinput",
                "--no-input",
                action="store_false",
                dest="interactive",
                help="Tells Django to NOT prompt the user for input of any kind.",
            )
        parser.add_argument(
            "-s",
            "--schema",
            nargs="+",
            dest="schemas",
            help="Schema(s) to execute the current command",
        )
        parser.add_argument(
            "-x",
            "--exclude-schema",
            nargs="+",
            dest="excluded_schemas",
            help="Schema(s) to exclude when executing the current command",
        )

        if self.allow_wildcards:
            parser.add_argument(
                "-as",
                "--include-all-schemas",
                action="store_true",
                dest="all_schemas",
                help="Include all schemas when executing the current command",
            )
            parser.add_argument(
                "-ss",
                "--include-static-schemas",
                action="store_true",
                dest="static_schemas",
                help="Include all static schemas when executing the current command",
            )
            parser.add_argument(
                "-ds",
                "--include-dynamic-schemas",
                action="store_true",
                dest="dynamic_schemas",
                help="Include all dynamic schemas when executing the current command",
            )
            parser.add_argument(
                "-ts",
                "--include-tenant-schemas",
                action="store_true",
                dest="tenant_schemas",
                help="Include all tenant-like schemas when executing the current command",
            )

        parser.add_argument(
            "--parallel",
            dest="parallel",
            action="store_true",
            help="Run command in parallel mode",
        )
        parser.add_argument(
            "--no-create-schemas",
            dest="skip_schema_creation",
            action="store_true",
            help="Skip automatic creation of non-existing schemas",
        )

    def get_schemas_from_options(self, **options):
        """Return the list of schema names selected by ``options``.

        The result is restricted to ``specific_schemas`` when that attribute
        is set. Unless ``skip_schema_creation`` is truthy, ``create_schema``
        is called for every returned schema with ``check_if_exists=True``.

        Raises:
            CommandError: if the schemas cannot be retrieved from the
                database, if a reference cannot be resolved, or if none of
                the resolved schemas is in ``specific_schemas``.
        """
        skip_schema_creation = options.get("skip_schema_creation", False)
        try:
            schemas = self._get_schemas_from_options(**options)
        except ProgrammingError as exc:
            # This happens with unmigrated database.
            # It can also happen when the tenant model contains unapplied migrations that break.
            raise CommandError(
                "Error while attempting to retrieve dynamic schemas. "
                "Perhaps you need to migrate the 'public' schema first?"
            ) from exc
        if self.specific_schemas is not None:
            schemas = [x for x in schemas if x in self.specific_schemas]
            if not schemas:
                # Wrap in a 1-tuple so a tuple-valued ``specific_schemas`` is
                # formatted as a single value instead of being unpacked by ``%``.
                raise CommandError(
                    "This command can only run in %s" % (self.specific_schemas,)
                )
        if not skip_schema_creation:
            for schema in schemas:
                create_schema(schema, check_if_exists=True, sync_schema=False, verbosity=0)
        return schemas

    def get_executor_from_options(self, **options):
        """Return the parallel executor if ``options["parallel"]`` is truthy, else the sequential one."""
        return EXECUTORS["parallel"] if options.get("parallel") else EXECUTORS["sequential"]

    def get_scope_display(self):
        """Return ``specific_schemas`` joined with ``|``, or the scope value when there are none."""
        return "|".join(self.specific_schemas or []) or self.scope.value

    def _get_schemas_from_options(self, **options):
        """Resolve the options into a list of schema names.

        Explicit references (``schemas`` / ``excluded_schemas``) may be a
        schema name or the beginning of a domain; each must resolve to
        exactly one schema. The returned list has ``"public"`` first when
        present, followed by the remaining names in sorted order.

        Raises:
            CommandError: if no schema was provided in non-interactive mode,
                if a wildcard is not permitted by ``scope``, or if a
                reference matches no schema or more than one.
        """
        schemas = options.get("schemas") or []
        excluded_schemas = options.get("excluded_schemas") or []
        include_all_schemas = options.get("all_schemas") or False
        include_static_schemas = options.get("static_schemas") or False
        include_dynamic_schemas = options.get("dynamic_schemas") or False
        include_tenant_schemas = options.get("tenant_schemas") or False
        dynamic_ready = dynamic_models_exist()
        allow_static = self.scope in CommandScope.allow_static()
        allow_dynamic = self.scope in CommandScope.allow_dynamic()
        clone_reference = get_clone_reference()

        nothing_selected = not (
            schemas
            or include_all_schemas
            or include_static_schemas
            or include_dynamic_schemas
            or include_tenant_schemas
        )
        if nothing_selected:
            if not self.allow_interactive:
                include_all_schemas = True
            elif options.get("interactive", True):
                schema = input(
                    "Enter schema to run command (leave blank for running on '%s' schemas): "
                    % self.get_scope_display()
                ).strip()

                if schema:
                    schemas.append(schema)
                else:
                    include_all_schemas = True
            else:
                raise CommandError("No schema provided")

        TenantModel = get_tenant_model()
        has_domains = get_domain_model() is not None

        static_schemas = (
            [x for x in settings.TENANTS.keys() if x != "default"] if allow_static else []
        )
        dynamic_schemas = (
            TenantModel.objects.values_list("schema_name", flat=True)
            if TenantModel is not None and dynamic_ready and allow_dynamic
            else []
        )
        if clone_reference and allow_static:
            static_schemas.append(clone_reference)

        schemas_to_return = set()

        if include_all_schemas:
            if not allow_static and not allow_dynamic:
                raise CommandError("Including all schemas is NOT allowed")
            schemas_to_return = schemas_to_return.union(static_schemas + list(dynamic_schemas))
        if include_static_schemas:
            if not allow_static:
                raise CommandError("Including static schemas is NOT allowed")
            schemas_to_return = schemas_to_return.union(static_schemas)
        if include_dynamic_schemas:
            if not allow_dynamic:
                raise CommandError("Including dynamic schemas is NOT allowed")
            schemas_to_return = schemas_to_return.union(dynamic_schemas)
        if include_tenant_schemas:
            if not allow_dynamic:
                raise CommandError("Including tenant-like schemas is NOT allowed")
            schemas_to_return = schemas_to_return.union(dynamic_schemas)
            if clone_reference:
                schemas_to_return.add(clone_reference)

        def find_schema_by_reference(reference, as_excluded=False):
            """Resolve ``reference`` (schema name or domain prefix) to one schema name."""
            if reference in settings.TENANTS and reference != "default" and allow_static:
                return reference
            elif reference == clone_reference:
                return reference
            elif (
                TenantModel is not None
                and dynamic_ready
                # Check the cheap flag before hitting the database.
                and allow_dynamic
                and TenantModel.objects.filter(schema_name=reference).exists()
            ):
                return reference
            else:
                # No exact match: fall back to matching by domain prefix.
                local = []
                if allow_static:
                    local += [
                        schema_name
                        for schema_name, data in settings.TENANTS.items()
                        if schema_name not in ["public", "default"]
                        and any(
                            domain.startswith(reference) for domain in data.get("DOMAINS", [])
                        )
                    ]
                if TenantModel is not None and dynamic_ready and allow_dynamic:
                    local += (
                        TenantModel.objects.annotate(
                            route=Case(
                                When(
                                    domains__folder="",
                                    then="domains__domain",
                                ),
                                default=Concat(
                                    "domains__domain",
                                    Value("/"),
                                    "domains__folder",
                                    output_field=CharField(),
                                ),
                                output_field=CharField(),
                            )
                            if has_domains
                            else Value("")
                        )
                        .filter(Q(schema_name=reference) | Q(route__startswith=reference))
                        .distinct()
                        .values_list("schema_name", flat=True)
                    )
                if not local:
                    message = (
                        "No schema found for '%s' (excluded)"
                        if as_excluded
                        else "No schema found for '%s'"
                    )
                    raise CommandError(message % reference)
                if len(local) > 1:
                    message = (
                        "More than one tenant found for schema '%s' by domain (excluded), "
                        "please, narrow down the filter"
                        if as_excluded
                        else "More than one tenant found for schema '%s' by domain, please, narrow down the filter"
                    )
                    raise CommandError(message % reference)
                return local[0]

        for schema in schemas:
            included = find_schema_by_reference(schema, as_excluded=False)
            schemas_to_return.add(included)

        for schema in excluded_schemas:
            excluded = find_schema_by_reference(schema, as_excluded=True)
            schemas_to_return -= {excluded}

        # Sort so the execution order is reproducible between runs (set
        # iteration order of strings is not); "public" always goes first.
        if "public" in schemas_to_return:
            return ["public"] + sorted(schemas_to_return - {"public"})
        return sorted(schemas_to_return)


class SchemaCommand(WrappedSchemaOption, BaseCommand):
    """Base management command that runs ``handle_schema`` through an executor.

    Subclasses implement ``handle_schema``.
    """

    def handle(self, *args, **options):
        """Resolve the target schemas and the executor, then dispatch the command."""
        target_schemas = self.get_schemas_from_options(**options)
        run = self.get_executor_from_options(**options)
        run(
            target_schemas,
            self,
            "_raw_handle_schema",
            args,
            options,
            pass_schema_in_kwargs=True,
        )

    def _raw_handle_schema(self, *args, **kwargs):
        """Drop the ``schema_name`` kwarg and forward to ``handle_schema`` with the current schema."""
        del kwargs["schema_name"]
        active_schema = get_current_schema()
        self.handle_schema(active_schema, *args, **kwargs)

    def handle_schema(self, schema: Schema, *args, **options):
        """Run the command for ``schema``; must be overridden by subclasses."""
        raise NotImplementedError


class StaticSchemaCommand(SchemaCommand):
    """Schema command whose scope is ``CommandScope.STATIC``."""

    scope = CommandScope.STATIC


class DynamicSchemaCommand(SchemaCommand):
    """Schema command whose scope is ``CommandScope.DYNAMIC``."""

    scope = CommandScope.DYNAMIC