File: sqlite.py

package info (click to toggle)
django-dbbackup 4.2.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 512 kB
  • sloc: python: 3,767; makefile: 7
file content (110 lines) | stat: -rw-r--r-- 3,990 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import sqlite3
import warnings
from io import BytesIO
from shutil import copyfileobj
from tempfile import SpooledTemporaryFile

from django.db import IntegrityError, OperationalError

from .base import BaseDBConnector

# Query returning (name, type, sql) for every table recorded in
# ``sqlite_master`` that has a stored CREATE statement, ordered by name.
DUMP_TABLES = """
SELECT "name", "type", "sql"
FROM "sqlite_master"
WHERE "sql" NOT NULL AND "type" == 'table'
ORDER BY "name"
"""
# Query returning (name, type, sql) for every index, trigger and view recorded
# in ``sqlite_master`` that has a stored CREATE statement.
DUMP_ETC = """
SELECT "name", "type", "sql"
FROM "sqlite_master"
WHERE "sql" NOT NULL AND "type" IN ('index', 'trigger', 'view')
"""


class SqliteConnector(BaseDBConnector):
    """
    Dump a SQLite database as SQL text, much like ``.dump`` does in the
    sqlite3 shell, and restore it by executing that SQL statement by
    statement.
    """

    def _write_dump(self, fileobj):
        """
        Write the SQL dump of the connected database to ``fileobj``.

        For every table that is neither internal (``sqlite_*``) nor listed in
        ``self.exclude``, the ``CREATE TABLE`` statement is written followed
        by one ``INSERT`` statement per row. Indexes, triggers and views are
        written exactly once, after all tables, so that every object they
        refer to has already been created when the dump is replayed.

        ``fileobj`` must be a writable binary file object; the SQL is encoded
        as UTF-8.
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute(DUMP_TABLES)
            for table_name, _, sql in cursor.fetchall():
                if table_name.startswith("sqlite_") or table_name in self.exclude:
                    continue
                if sql.startswith("CREATE TABLE"):
                    # Rewrite the leading keywords only; the same words may
                    # legitimately appear later in the statement.
                    sql = sql.replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS", 1)
                    # Make SQL commands in 1 line
                    sql = sql.replace("\n    ", "")
                    sql = sql.replace("\n)", ")")
                fileobj.write(f"{sql};\n".encode())
                self._write_table_rows(cursor, table_name, fileobj)

            # Schema objects that depend on tables are dumped once, after the
            # tables, instead of being repeated for every table.
            cursor.execute(DUMP_ETC)
            for _name, _, sql in cursor.fetchall():
                for prefix in ("CREATE INDEX", "CREATE UNIQUE INDEX"):
                    if sql.startswith(prefix):
                        sql = f"{prefix} IF NOT EXISTS{sql[len(prefix):]}"
                        break
                fileobj.write(f"{sql};\n".encode())
        finally:
            cursor.close()

    def _write_table_rows(self, cursor, table_name, fileobj):
        """Write one ``INSERT`` statement per row of ``table_name``."""
        # Double quotes are doubled so the name is a valid quoted identifier.
        table_ident = table_name.replace('"', '""')
        # Inside the generated string literal single quotes must be doubled
        # as well, otherwise a name containing "'" would end the literal.
        table_in_literal = table_ident.replace("'", "''")

        cursor.execute(f'PRAGMA table_info("{table_ident}")')
        # Index 1 of each table_info row is the column name.
        column_names = [str(table_info[1]) for table_info in cursor.fetchall()]
        # Each fragment closes the literal, appends quote(<column>) and
        # reopens the literal, letting SQLite render every value as SQL.
        values = ",".join(
            """'||quote("{}")||'""".format(col.replace('"', '""'))
            for col in column_names
        )
        query = (
            f"""SELECT 'INSERT INTO "{table_in_literal}" VALUES({values})' """
            f"""FROM "{table_ident}";\n"""
        )
        for row in cursor.execute(query):
            fileobj.write(f"{row[0]};\n".encode())

    def create_dump(self):
        """
        Return a binary file object, positioned at its start, containing the
        SQL dump of the database.

        The dump is kept in memory up to 10 MiB and spills to a temporary
        file on disk beyond that.
        """
        if not self.connection.is_usable():
            self.connection.connect()
        dump_file = SpooledTemporaryFile(max_size=10 * 1024 * 1024)
        try:
            self._write_dump(dump_file)
        except BaseException:
            # Do not leak the temporary file when dumping fails.
            dump_file.close()
            raise
        dump_file.seek(0)
        return dump_file

    def restore_dump(self, dump):
        """
        Execute the SQL statements read from ``dump`` against the database.

        ``dump`` is a binary file object yielding UTF-8 encoded lines. Lines
        are accumulated until they form a complete SQL statement, so
        statements spanning several lines (for example text values with
        embedded newlines) are executed as a whole. Statements that fail
        with ``OperationalError`` or ``IntegrityError`` are reported through
        ``warnings.warn`` and skipped.
        """
        if not self.connection.is_usable():
            self.connection.connect()
        cursor = self.connection.cursor()
        try:
            pending = []
            # Iterate lazily rather than loading every line into memory.
            for raw_line in dump:
                pending.append(raw_line.decode("UTF-8"))
                statement = "".join(pending)
                # SQLite's own tokenizer decides whether the statement is
                # finished; a ");" inside a string literal does not count.
                if not sqlite3.complete_statement(statement):
                    continue
                self._execute_statement(cursor, statement)
                pending = []

            # Whatever is left (e.g. a final statement lacking its
            # terminator) is still attempted instead of silently dropped.
            leftover = "".join(pending)
            if leftover.strip():
                self._execute_statement(cursor, leftover)
        finally:
            cursor.close()

    @staticmethod
    def _execute_statement(cursor, statement):
        """Run ``statement``, turning expected database errors into warnings."""
        try:
            cursor.execute(statement)
        except (OperationalError, IntegrityError) as err:
            warnings.warn(f"Error in db restore: {err}")


class SqliteCPConnector(BaseDBConnector):
    """
    Back up a SQLite database by copying its data file byte for byte.
    Restore by writing those bytes back to the database file path.
    """

    def create_dump(self):
        """
        Return an in-memory binary copy of the database file, positioned at
        its start.

        The file path is taken from the ``NAME`` entry of the connection
        settings.
        """
        buffer = BytesIO()
        with open(self.connection.settings_dict["NAME"], "rb") as source:
            copyfileobj(source, buffer)
        buffer.seek(0)
        return buffer

    def restore_dump(self, dump):
        """
        Overwrite the database file with the content read from ``dump``.

        ``dump`` is a readable binary file object; the target path is the
        ``NAME`` entry of the connection settings.
        """
        target_path = self.connection.settings_dict["NAME"]
        with open(target_path, "wb") as target:
            copyfileobj(dump, target)