from typing import TYPE_CHECKING, Any, List, Optional, Type, cast
from unittest import mock

import django

from django.core.exceptions import (
    FieldDoesNotExist,
    ImproperlyConfigured,
    SuspiciousOperation,
)
from django.db import transaction
from django.db.backends.ddl_references import Statement
from django.db.backends.postgresql.schema import (  # type: ignore[import]
    DatabaseSchemaEditor,
)
from django.db.models import Field, Model

from psqlextra.settings import (
    postgres_prepend_local_search_path,
    postgres_reset_local_search_path,
)
from psqlextra.type_assertions import is_sql_with_params
from psqlextra.types import PostgresPartitioningMethod

from . import base_impl
from .introspection import PostgresIntrospection
from .side_effects import (
    HStoreRequiredSchemaEditorSideEffect,
    HStoreUniqueSchemaEditorSideEffect,
)

if TYPE_CHECKING:

    class SchemaEditor(DatabaseSchemaEditor):
        pass

else:
    SchemaEditor = base_impl.schema_editor()


class PostgresSchemaEditor(SchemaEditor):
    """Schema editor that adds extra methods for PostgreSQL specific features
    and hooks into existing implementations to add side effects specific to
    PostgreSQL."""

    sql_add_pk = "ALTER TABLE %s ADD PRIMARY KEY (%s)"

    sql_create_fk_not_valid = f"{SchemaEditor.sql_create_fk} NOT VALID"
    sql_validate_fk = "ALTER TABLE %s VALIDATE CONSTRAINT %s"

    sql_create_sequence_with_owner = "CREATE SEQUENCE %s OWNED BY %s.%s"

    sql_alter_table_storage_setting = "ALTER TABLE %s SET (%s = %s)"
    sql_reset_table_storage_setting = "ALTER TABLE %s RESET (%s)"

    sql_alter_table_schema = "ALTER TABLE %s SET SCHEMA %s"
    sql_create_schema = "CREATE SCHEMA %s"
    sql_delete_schema = "DROP SCHEMA %s"
    sql_delete_schema_cascade = "DROP SCHEMA %s CASCADE"

    sql_create_view = "CREATE VIEW %s AS (%s)"
    sql_replace_view = "CREATE OR REPLACE VIEW %s AS (%s)"
    sql_drop_view = "DROP VIEW IF EXISTS %s"
    sql_create_materialized_view = (
        "CREATE MATERIALIZED VIEW %s AS (%s) WITH DATA"
    )
    sql_drop_materialized_view = "DROP MATERIALIZED VIEW %s"
    sql_refresh_materialized_view = "REFRESH MATERIALIZED VIEW %s"
    sql_refresh_materialized_view_concurrently = (
        "REFRESH MATERIALIZED VIEW CONCURRENTLY %s"
    )
    sql_partition_by = " PARTITION BY %s (%s)"
    sql_add_default_partition = "CREATE TABLE %s PARTITION OF %s DEFAULT"
    sql_add_hash_partition = "CREATE TABLE %s PARTITION OF %s FOR VALUES WITH (MODULUS %s, REMAINDER %s)"
    sql_add_range_partition = (
        "CREATE TABLE %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s)"
    )
    sql_add_list_partition = (
        "CREATE TABLE %s PARTITION OF %s FOR VALUES IN (%s)"
    )
    sql_delete_partition = "DROP TABLE %s"
    sql_table_comment = "COMMENT ON TABLE %s IS %s"

    side_effects: List[DatabaseSchemaEditor] = [
        cast(DatabaseSchemaEditor, HStoreUniqueSchemaEditorSideEffect()),
        cast(DatabaseSchemaEditor, HStoreRequiredSchemaEditorSideEffect()),
    ]

    def __init__(self, connection, collect_sql=False, atomic=True):
        super().__init__(connection, collect_sql, atomic)

        for side_effect in self.side_effects:
            side_effect.execute = self.execute
            side_effect.quote_name = self.quote_name

        self.deferred_sql = []
        self.introspection = PostgresIntrospection(self.connection)

    def create_schema(self, name: str) -> None:
        """Creates a Postgres schema."""

        self.execute(self.sql_create_schema % self.quote_name(name))

    def delete_schema(self, name: str, cascade: bool) -> None:
        """Drops a Postgres schema."""

        sql = (
            self.sql_delete_schema
            if not cascade
            else self.sql_delete_schema_cascade
        )
        self.execute(sql % self.quote_name(name))

    def create_model(self, model: Type[Model]) -> None:
        """Creates a new model."""

        super().create_model(model)

        for side_effect in self.side_effects:
            side_effect.create_model(model)

    def delete_model(self, model: Type[Model]) -> None:
        """Drops/deletes an existing model."""

        for side_effect in self.side_effects:
            side_effect.delete_model(model)

        super().delete_model(model)

    def clone_model_structure_to_schema(
        self, model: Type[Model], *, schema_name: str
    ) -> None:
        """Creates a clone of the columns for the specified model in a separate
        schema.

        The table will have exactly the same name as the model table
        in the default schema. It will have none of the constraints,
        foreign keys and indexes.

        Use this to create a temporary clone of a model table to
        replace the original model table later on. The lack of
        indices and constraints allows for greater write speeds.

        The original model table will be unaffected.

        Arguments:
            model:
                Model to clone the table of into the
                specified schema.

            schema_name:
                Name of the schema to create the cloned
                table in.
        """

        table_name = model._meta.db_table
        quoted_table_name = self.quote_name(model._meta.db_table)
        quoted_schema_name = self.quote_name(schema_name)

        quoted_table_fqn = f"{quoted_schema_name}.{quoted_table_name}"

        self.execute(
            self.sql_create_table
            % {
                "table": quoted_table_fqn,
                "definition": f"LIKE {quoted_table_name} INCLUDING ALL EXCLUDING CONSTRAINTS EXCLUDING INDEXES",
            }
        )

        # Copy sequences
        #
        # Django 4.0 and older do not use IDENTITY so Postgres does
        # not copy the sequences into the new table. We do it manually.
        if django.VERSION < (4, 1):
            with self.connection.cursor() as cursor:
                sequences = self.introspection.get_sequences(cursor, table_name)

            for sequence in sequences:
                if sequence["table"] != table_name:
                    continue

                quoted_sequence_name = self.quote_name(sequence["name"])
                quoted_sequence_fqn = (
                    f"{quoted_schema_name}.{quoted_sequence_name}"
                )
                quoted_column_name = self.quote_name(sequence["column"])

                self.execute(
                    self.sql_create_sequence_with_owner
                    % (
                        quoted_sequence_fqn,
                        quoted_table_fqn,
                        quoted_column_name,
                    )
                )

                self.execute(
                    self.sql_alter_column
                    % {
                        "table": quoted_table_fqn,
                        "changes": self.sql_alter_column_default
                        % {
                            "column": quoted_column_name,
                            "default": "nextval('%s')" % quoted_sequence_fqn,
                        },
                    }
                )

        # Copy storage settings
        #
        # Postgres only copies column-level storage options, not
        # the table-level storage options.
        with self.connection.cursor() as cursor:
            storage_settings = self.introspection.get_storage_settings(
                cursor, model._meta.db_table
            )

        for setting_name, setting_value in storage_settings.items():
            self.alter_table_storage_setting(
                quoted_table_fqn, setting_name, setting_value
            )

    def clone_model_constraints_and_indexes_to_schema(
        self, model: Type[Model], *, schema_name: str
    ) -> None:
        """Adds the constraints, foreign keys and indexes to a model table that
        was cloned into a separate table without them by
        `clone_model_structure_to_schema`.

        Arguments:
            model:
                Model for which the cloned table was created.

            schema_name:
                Name of the schema in which the cloned table
                resides.
        """

        with postgres_prepend_local_search_path(
            [schema_name], using=self.connection.alias
        ):
            for constraint in model._meta.constraints:
                self.add_constraint(model, constraint)  # type: ignore[attr-defined]

            for index in model._meta.indexes:
                self.add_index(model, index)

            if model._meta.unique_together:
                self.alter_unique_together(
                    model, tuple(), model._meta.unique_together
                )

            if model._meta.index_together:
                self.alter_index_together(
                    model, tuple(), model._meta.index_together
                )

            for field in model._meta.local_concrete_fields:  # type: ignore[attr-defined]
                # Django creates primary keys later added to the model with
                # a custom name. We want the name as it was created originally.
                if field.primary_key:
                    with postgres_reset_local_search_path(
                        using=self.connection.alias
                    ):
                        [primary_key_name] = self._constraint_names(  # type: ignore[attr-defined]
                            model, primary_key=True
                        )

                    self.execute(
                        self.sql_create_pk
                        % {
                            "table": self.quote_name(model._meta.db_table),
                            "name": self.quote_name(primary_key_name),
                            "columns": self.quote_name(
                                field.db_column or field.attname
                            ),
                        }
                    )
                    continue

                # Django creates foreign keys in a single statement which acquires
                # a AccessExclusiveLock on the referenced table. We want to avoid
                # that and created the FK as NOT VALID. We can run VALIDATE in
                # a separate transaction later to validate the entries without
                # acquiring a AccessExclusiveLock.
                if field.remote_field:
                    with postgres_reset_local_search_path(
                        using=self.connection.alias
                    ):
                        [fk_name] = self._constraint_names(  # type: ignore[attr-defined]
                            model, [field.column], foreign_key=True
                        )

                    sql = Statement(
                        self.sql_create_fk_not_valid,
                        table=self.quote_name(model._meta.db_table),
                        name=self.quote_name(fk_name),
                        column=self.quote_name(field.column),
                        to_table=self.quote_name(
                            field.target_field.model._meta.db_table
                        ),
                        to_column=self.quote_name(field.target_field.column),
                        deferrable=self.connection.ops.deferrable_sql(),
                    )

                    self.execute(sql)

                # It's hard to alter a field's check because it is defined
                # by the field class, not the field instance. Handle this
                # manually.
                field_check = field.db_parameters(self.connection).get("check")
                if field_check:
                    with postgres_reset_local_search_path(
                        using=self.connection.alias
                    ):
                        [field_check_name] = self._constraint_names(  # type: ignore[attr-defined]
                            model,
                            [field.column],
                            check=True,
                            exclude={
                                constraint.name
                                for constraint in model._meta.constraints
                            },
                        )

                    self.execute(
                        self._create_check_sql(  # type: ignore[attr-defined]
                            model, field_check_name, field_check
                        )
                    )

                # Clone the field and alter its state to math our current
                # table definition. This will cause Django see the missing
                # indices and create them.
                if field.remote_field:
                    # We add the foreign key constraint ourselves with NOT VALID,
                    # hence, we specify `db_constraint=False` on both old/new.
                    # Django won't touch the foreign key constraint.
                    old_field = self._clone_model_field(
                        field, db_index=False, unique=False, db_constraint=False
                    )
                    new_field = self._clone_model_field(
                        field, db_constraint=False
                    )
                    self.alter_field(model, old_field, new_field)
                else:
                    old_field = self._clone_model_field(
                        field, db_index=False, unique=False
                    )
                    new_field = self._clone_model_field(field)
                    self.alter_field(model, old_field, new_field)

    def clone_model_foreign_keys_to_schema(
        self, model: Type[Model], schema_name: str
    ) -> None:
        """Validates the foreign keys in the cloned model table created by
        `clone_model_structure_to_schema` and
        `clone_model_constraints_and_indexes_to_schema`.

        Do NOT run this in the same transaction as the
        foreign keys were added to the table. It WILL
        acquire a long-lived AccessExclusiveLock.

        Arguments:
            model:
                Model for which the cloned table was created.

            schema_name:
                Name of the schema in which the cloned table
                resides.
        """

        constraint_names = self._constraint_names(model, foreign_key=True)  # type: ignore[attr-defined]

        with postgres_prepend_local_search_path(
            [schema_name], using=self.connection.alias
        ):
            for fk_name in constraint_names:
                self.execute(
                    self.sql_validate_fk
                    % (
                        self.quote_name(model._meta.db_table),
                        self.quote_name(fk_name),
                    )
                )

    def alter_table_storage_setting(
        self, table_name: str, name: str, value: str
    ) -> None:
        """Alters a storage setting for a table.

        See: https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-STORAGE-PARAMETERS

        Arguments:
            table_name:
                Name of the table to alter the setting for.

            name:
                Name of the setting to alter.

            value:
                Value to alter the setting to.

                Note that this is always a string, even if it looks
                like a number or a boolean. That's how Postgres
                stores storage settings internally.
        """

        self.execute(
            self.sql_alter_table_storage_setting
            % (self.quote_name(table_name), name, value)
        )

    def alter_model_storage_setting(
        self, model: Type[Model], name: str, value: str
    ) -> None:
        """Alters a storage setting for the model's table.

        See: https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-STORAGE-PARAMETERS

        Arguments:
            model:
                Model of which to alter the table
                setting.

            name:
                Name of the setting to alter.

            value:
                Value to alter the setting to.

                Note that this is always a string, even if it looks
                like a number or a boolean. That's how Postgres
                stores storage settings internally.
        """

        self.alter_table_storage_setting(model._meta.db_table, name, value)

    def reset_table_storage_setting(self, table_name: str, name: str) -> None:
        """Resets a table's storage setting to the database or server default.

        See: https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-STORAGE-PARAMETERS

        Arguments:
            table_name:
                Name of the table to reset the setting for.

            name:
                Name of the setting to reset.
        """

        self.execute(
            self.sql_reset_table_storage_setting
            % (self.quote_name(table_name), name)
        )

    def reset_model_storage_setting(
        self, model: Type[Model], name: str
    ) -> None:
        """Resets a model's table storage setting to the database or server
        default.

        See: https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-STORAGE-PARAMETERS

        Arguments:
            table_name:
            model:
                Model for which to reset the table setting for.

            name:
                Name of the setting to reset.
        """

        self.reset_table_storage_setting(model._meta.db_table, name)

    def alter_table_schema(self, table_name: str, schema_name: str) -> None:
        """Moves the specified table into the specified schema.

        WARNING: Moving models into a different schema than the default
        will break querying the model.

        Arguments:
            table_name:
                Name of the table to move into the specified schema.

            schema_name:
                Name of the schema to move the table to.
        """

        self.execute(
            self.sql_alter_table_schema
            % (self.quote_name(table_name), self.quote_name(schema_name))
        )

    def alter_model_schema(self, model: Type[Model], schema_name: str) -> None:
        """Moves the specified model's table into the specified schema.

        WARNING: Moving models into a different schema than the default
        will break querying the model.

        Arguments:
            model:
               Model of which to move the table.

            schema_name:
                Name of the schema to move the model's table to.
        """

        self.execute(
            self.sql_alter_table_schema
            % (
                self.quote_name(model._meta.db_table),
                self.quote_name(schema_name),
            )
        )

    def refresh_materialized_view_model(
        self, model: Type[Model], concurrently: bool = False
    ) -> None:
        """Refreshes a materialized view."""

        sql_template = (
            self.sql_refresh_materialized_view_concurrently
            if concurrently
            else self.sql_refresh_materialized_view
        )

        sql = sql_template % self.quote_name(model._meta.db_table)
        self.execute(sql)

    def create_view_model(self, model: Type[Model]) -> None:
        """Creates a new view model."""

        self._create_view_model(self.sql_create_view, model)

    def replace_view_model(self, model: Type[Model]) -> None:
        """Replaces a view model with a newer version.

        This is used to alter the backing query of a view.
        """

        self._create_view_model(self.sql_replace_view, model)

    def delete_view_model(self, model: Type[Model]) -> None:
        """Deletes a view model."""

        sql = self.sql_drop_view % self.quote_name(model._meta.db_table)
        self.execute(sql)

    def create_materialized_view_model(self, model: Type[Model]) -> None:
        """Creates a new materialized view model."""

        self._create_view_model(self.sql_create_materialized_view, model)

    def replace_materialized_view_model(self, model: Type[Model]) -> None:
        """Replaces a materialized view with a newer version.

        This is used to alter the backing query of a materialized view.

        Replacing a materialized view is a lot trickier than a normal view.
        For normal views we can use `CREATE OR REPLACE VIEW`, but for
        materialized views, we have to create the new view, copy all
        indexes and constraints and drop the old one.

        This operation is atomic as it runs in a transaction.
        """

        with self.connection.cursor() as cursor:
            constraints = self.introspection.get_constraints(
                cursor, model._meta.db_table
            )

        with transaction.atomic():
            self.delete_materialized_view_model(model)
            self.create_materialized_view_model(model)

            for constraint_name, constraint_options in constraints.items():
                if not constraint_options["definition"]:
                    raise SuspiciousOperation(
                        "Table %s has a constraint '%s' that no definition could be generated for",
                        (model._meta.db_table, constraint_name),
                    )

                self.execute(constraint_options["definition"])

    def delete_materialized_view_model(self, model: Type[Model]) -> None:
        """Deletes a materialized view model."""

        sql = self.sql_drop_materialized_view % self.quote_name(
            model._meta.db_table
        )
        self.execute(sql)

    def create_partitioned_model(self, model: Type[Model]) -> None:
        """Creates a new partitioned model."""

        meta = self._partitioning_properties_for_model(model)

        # get the sql statement that django creates for normal
        # table creations..
        sql, params = self._extract_sql(self.create_model, model)

        partitioning_key_sql = ", ".join(
            self.quote_name(field_name) for field_name in meta.key
        )

        # create a composite key that includes the partitioning key
        sql = sql.replace(" PRIMARY KEY", "")
        if model._meta.pk and model._meta.pk.name not in meta.key:
            sql = sql[:-1] + ", PRIMARY KEY (%s, %s))" % (
                self.quote_name(model._meta.pk.name),
                partitioning_key_sql,
            )
        else:
            sql = sql[:-1] + ", PRIMARY KEY (%s))" % (partitioning_key_sql,)

        # extend the standard CREATE TABLE statement with
        # 'PARTITION BY ...'
        sql += self.sql_partition_by % (
            meta.method.upper(),
            partitioning_key_sql,
        )

        self.execute(sql, params)

    def delete_partitioned_model(self, model: Type[Model]) -> None:
        """Drops the specified partitioned model."""

        return self.delete_model(model)

    def add_range_partition(
        self,
        model: Type[Model],
        name: str,
        from_values: Any,
        to_values: Any,
        comment: Optional[str] = None,
    ) -> None:
        """Creates a new range partition for the specified partitioned model.

        Arguments:
            model:
                Partitioned model to create a partition for.

            name:
                Name to give to the new partition.
                Final name will be "{table_name}_{partition_name}"

            from_values:
                Start of the partitioning key range of
                values that need to be stored in this
                partition.

            to_values:
                End of the partitioning key range of
                values that need to be stored in this
                partition.

            comment:
                Optionally, a comment to add on this
                partition table.
        """

        # asserts the model is a model set up for partitioning
        self._partitioning_properties_for_model(model)

        table_name = self.create_partition_table_name(model, name)

        sql = self.sql_add_range_partition % (
            self.quote_name(table_name),
            self.quote_name(model._meta.db_table),
            "%s",
            "%s",
        )

        with transaction.atomic():
            self.execute(sql, (from_values, to_values))

            if comment:
                self.set_comment_on_table(table_name, comment)

    def add_list_partition(
        self,
        model: Type[Model],
        name: str,
        values: List[Any],
        comment: Optional[str] = None,
    ) -> None:
        """Creates a new list partition for the specified partitioned model.

        Arguments:
            model:
                Partitioned model to create a partition for.

            name:
                Name to give to the new partition.
                Final name will be "{table_name}_{partition_name}"

            values:
                Partition key values that should be
                stored in this partition.

            comment:
                Optionally, a comment to add on this
                partition table.
        """

        # asserts the model is a model set up for partitioning
        self._partitioning_properties_for_model(model)

        table_name = self.create_partition_table_name(model, name)

        sql = self.sql_add_list_partition % (
            self.quote_name(table_name),
            self.quote_name(model._meta.db_table),
            ",".join(["%s" for _ in range(len(values))]),
        )

        with transaction.atomic():
            self.execute(sql, values)

            if comment:
                self.set_comment_on_table(table_name, comment)

    def add_hash_partition(
        self,
        model: Type[Model],
        name: str,
        modulus: int,
        remainder: int,
        comment: Optional[str] = None,
    ) -> None:
        """Creates a new hash partition for the specified partitioned model.

        Arguments:
            model:
                Partitioned model to create a partition for.

            name:
                Name to give to the new partition.
                Final name will be "{table_name}_{partition_name}"

            modulus:
                Integer value by which the key is divided.

            remainder:
                The remainder of the hash value when divided by modulus.

            comment:
                Optionally, a comment to add on this partition table.
        """

        # asserts the model is a model set up for partitioning
        self._partitioning_properties_for_model(model)

        table_name = self.create_partition_table_name(model, name)

        sql = self.sql_add_hash_partition % (
            self.quote_name(table_name),
            self.quote_name(model._meta.db_table),
            "%s",
            "%s",
        )

        with transaction.atomic():
            self.execute(sql, (modulus, remainder))

            if comment:
                self.set_comment_on_table(table_name, comment)

    def add_default_partition(
        self, model: Type[Model], name: str, comment: Optional[str] = None
    ) -> None:
        """Creates a new default partition for the specified partitioned model.

        A default partition is a partition where rows are routed to when
        no more specific partition is a match.

        Arguments:
            model:
                Partitioned model to create a partition for.

            name:
                Name to give to the new partition.
                Final name will be "{table_name}_{partition_name}"

            comment:
                Optionally, a comment to add on this
                partition table.
        """

        # asserts the model is a model set up for partitioning
        self._partitioning_properties_for_model(model)

        table_name = self.create_partition_table_name(model, name)

        sql = self.sql_add_default_partition % (
            self.quote_name(table_name),
            self.quote_name(model._meta.db_table),
        )

        with transaction.atomic():
            self.execute(sql)

            if comment:
                self.set_comment_on_table(table_name, comment)

    def delete_partition(self, model: Type[Model], name: str) -> None:
        """Deletes the partition with the specified name."""

        sql = self.sql_delete_partition % self.quote_name(
            self.create_partition_table_name(model, name)
        )
        self.execute(sql)

    def alter_db_table(
        self, model: Type[Model], old_db_table: str, new_db_table: str
    ) -> None:
        """Alters a table/model."""

        super().alter_db_table(model, old_db_table, new_db_table)

        for side_effect in self.side_effects:
            side_effect.alter_db_table(model, old_db_table, new_db_table)

    def add_field(self, model: Type[Model], field: Field) -> None:
        """Adds a new field to an exisiting model."""

        super().add_field(model, field)

        for side_effect in self.side_effects:
            side_effect.add_field(model, field)

    def remove_field(self, model: Type[Model], field: Field) -> None:
        """Removes a field from an existing model."""

        for side_effect in self.side_effects:
            side_effect.remove_field(model, field)

        super().remove_field(model, field)

    def alter_field(
        self,
        model: Type[Model],
        old_field: Field,
        new_field: Field,
        strict: bool = False,
    ) -> None:
        """Alters an existing field on an existing model."""

        super().alter_field(model, old_field, new_field, strict)

        for side_effect in self.side_effects:
            side_effect.alter_field(model, old_field, new_field, strict)

    def vacuum_table(
        self,
        table_name: str,
        columns: List[str] = [],
        *,
        full: bool = False,
        freeze: bool = False,
        verbose: bool = False,
        analyze: bool = False,
        disable_page_skipping: bool = False,
        skip_locked: bool = False,
        index_cleanup: bool = False,
        truncate: bool = False,
        parallel: Optional[int] = None,
    ) -> None:
        """Runs the VACUUM statement on the specified table with the specified
        options.

        Arguments:
            table_name:
                Name of the table to run VACUUM on.

            columns:
                Optionally, a list of columns to vacuum. If not
                specified, all columns are vacuumed.
        """

        if self.connection.in_atomic_block:
            raise SuspiciousOperation("Vacuum cannot be done in a transaction")

        options = []
        if full:
            options.append("FULL")
        if freeze:
            options.append("FREEZE")
        if verbose:
            options.append("VERBOSE")
        if analyze:
            options.append("ANALYZE")
        if disable_page_skipping:
            options.append("DISABLE_PAGE_SKIPPING")
        if skip_locked:
            options.append("SKIP_LOCKED")
        if index_cleanup:
            options.append("INDEX_CLEANUP")
        if truncate:
            options.append("TRUNCATE")
        if parallel is not None:
            options.append(f"PARALLEL {parallel}")

        sql = "VACUUM"

        if options:
            options_sql = ", ".join(options)
            sql += f" ({options_sql})"

        sql += f" {self.quote_name(table_name)}"

        if columns:
            columns_sql = ", ".join(
                [self.quote_name(column) for column in columns]
            )
            sql += f" ({columns_sql})"

        self.execute(sql)

    def vacuum_model(
        self, model: Type[Model], fields: List[Field] = [], **kwargs
    ) -> None:
        """Runs the VACUUM statement on the table of the specified model with
        the specified options.

        Arguments:
            table_name:
            model:
                Model of which to run VACUUM the table.

            fields:
                Optionally, a list of fields to vacuum. If not
                specified, all fields are vacuumed.
        """

        columns = [
            field.column
            for field in fields
            if getattr(field, "concrete", False) and field.column
        ]
        self.vacuum_table(model._meta.db_table, columns, **kwargs)

    def set_comment_on_table(self, table_name: str, comment: str) -> None:
        """Sets the comment on the specified table."""

        sql = self.sql_table_comment % (self.quote_name(table_name), "%s")
        self.execute(sql, (comment,))

    def _create_view_model(self, sql: str, model: Type[Model]) -> None:
        """Creates a new view model using the specified SQL query."""

        meta = self._view_properties_for_model(model)

        with self.connection.cursor() as cursor:
            view_sql = cursor.mogrify(*meta.query)
            if isinstance(view_sql, bytes):
                view_sql = view_sql.decode("utf-8")

        self.execute(sql % (self.quote_name(model._meta.db_table), view_sql))

    def _extract_sql(self, method, *args):
        """Calls the specified method with the specified arguments and
        intercepts the SQL statement it WOULD execute.

        We use this to figure out the exact SQL statement Django would
        execute. We can then make a small modification and execute it
        ourselves.
        """

        with mock.patch.object(self, "execute") as execute:
            method(*args)

            return tuple(execute.mock_calls[0])[1]

    @staticmethod
    def _view_properties_for_model(model: Type[Model]):
        """Gets the view options for the specified model.

        Raises:
            ImproperlyConfigured:
                When the specified model is not set up
                as a view.
        """

        meta = getattr(model, "_view_meta", None)
        if not meta:
            raise ImproperlyConfigured(
                (
                    "Model '%s' is not properly configured to be a view."
                    " Create the `ViewMeta` class as a child of '%s'."
                )
                % (model.__name__, model.__name__)
            )

        if not is_sql_with_params(meta.query):
            raise ImproperlyConfigured(
                (
                    "Model '%s' is not properly configured to be a view."
                    " Set the `query` and `key` attribute on the"
                    " `ViewMeta` class as a child of '%s'"
                )
                % (model.__name__, model.__name__)
            )

        return meta

    @staticmethod
    def _partitioning_properties_for_model(model: Type[Model]):
        """Gets the partitioning options for the specified model.

        Raises:
            ImproperlyConfigured:
                When the specified model is not set up
                for partitioning.
        """

        meta = getattr(model, "_partitioning_meta", None)
        if not meta:
            raise ImproperlyConfigured(
                (
                    "Model '%s' is not properly configured to be partitioned."
                    " Create the `PartitioningMeta` class as a child of '%s'."
                )
                % (model.__name__, model.__name__)
            )

        if not meta.method or not meta.key:
            raise ImproperlyConfigured(
                (
                    "Model '%s' is not properly configured to be partitioned."
                    " Set the `method` and `key` attributes on the"
                    " `PartitioningMeta` class as a child of '%s'"
                )
                % (model.__name__, model.__name__)
            )

        if meta.method not in PostgresPartitioningMethod:
            raise ImproperlyConfigured(
                (
                    "Model '%s' is not properly configured to be partitioned."
                    " '%s' is not a member of the PostgresPartitioningMethod enum."
                )
                % (model.__name__, meta.method)
            )

        if not isinstance(meta.key, list):
            raise ImproperlyConfigured(
                (
                    "Model '%s' is not properly configured to be partitioned."
                    " Partitioning key should be a list (of field names or values,"
                    " depending on the partitioning method)."
                )
                % model.__name__
            )

        try:
            for field_name in meta.key:
                model._meta.get_field(field_name)
        except FieldDoesNotExist:
            raise ImproperlyConfigured(
                (
                    "Model '%s' is not properly configured to be partitioned."
                    " Field '%s' in partitioning key %s is not a valid field on"
                    " '%s'."
                )
                % (model.__name__, field_name, meta.key, model.__name__)
            )

        return meta

    def create_partition_table_name(self, model: Type[Model], name: str) -> str:
        return "%s_%s" % (model._meta.db_table.lower(), name.lower())

    def _clone_model_field(self, field: Field, **overrides) -> Field:
        """Clones the specified model field and overrides its kwargs with the
        specified overrides.

        The cloned field will not be contributed to the model.
        """

        _, _, field_args, field_kwargs = field.deconstruct()

        cloned_field_args = field_args[:]
        cloned_field_kwargs = {**field_kwargs, **overrides}

        cloned_field = field.__class__(
            *cloned_field_args, **cloned_field_kwargs
        )
        cloned_field.model = field.model
        cloned_field.set_attributes_from_name(field.name)

        if cloned_field.remote_field and field.remote_field:
            cloned_field.remote_field.model = field.remote_field.model
            cloned_field.set_attributes_from_rel()  # type: ignore[attr-defined]

        return cloned_field
