File: utils.py

package info (click to toggle)
django-dbbackup 4.2.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 512 kB
  • sloc: python: 3,767; makefile: 7
file content (122 lines) | stat: -rw-r--r-- 3,534 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import contextlib
import os
import subprocess

from django.conf import settings
from django.core.files import File
from django.core.files.storage import Storage
from django.utils import timezone

from dbbackup.db.base import get_connector

BASE_FILE = os.path.join(settings.BLOB_DIR, "test.txt")
ENCRYPTED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.gpg")
COMPRESSED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.gz")
TARED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.tar")
ENCRYPTED_COMPRESSED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.gz.gpg")
TEST_DATABASE = {
    "ENGINE": "django.db.backends.sqlite3",
    "NAME": "/tmp/foo.db",
    "USER": "foo",
    "PASSWORD": "bar",
    "HOST": "foo",
    "PORT": 122,
}
TEST_MONGODB = {
    "ENGINE": "django_mongodb_engine",
    "NAME": "mongo_test",
    "USER": "foo",
    "PASSWORD": "bar",
    "HOST": "foo",
    "PORT": 122,
}
TEST_DATABASE = settings.DATABASES["default"]

GPG_PRIVATE_PATH = os.path.join(settings.BLOB_DIR, "gpg/secring.gpg")
GPG_PUBLIC_PATH = os.path.join(settings.BLOB_DIR, "gpg/pubring.gpg")
GPG_FINGERPRINT = "7438 8D4E 02AF C011 4E2F  1E79 F7D1 BBF0 1F63 FDE9"
DEV_NULL = open(os.devnull, "w")


class handled_files(dict):
    """
    Dict for gather information about fake storage and clean between tests.
    You should use the constant instance ``HANDLED_FILES`` and clean it
    before tests.
    """

    def __init__(self):
        super().__init__()
        self.clean()

    def clean(self):
        self["written_files"] = []
        self["deleted_files"] = []


HANDLED_FILES = handled_files()


class FakeStorage(Storage):
    name = "FakeStorage"

    def exists(self, name):
        return name in HANDLED_FILES["written_files"]

    def get_available_name(self, name, max_length=None):
        return name[:max_length]

    def get_valid_name(self, name):
        return name

    def listdir(self, path):
        return ([], [f[0] for f in HANDLED_FILES["written_files"]])

    def accessed_time(self, name):
        return timezone.now()

    created_time = modified_time = accessed_time

    def _open(self, name, mode="rb"):
        file_ = [f[1] for f in HANDLED_FILES["written_files"] if f[0] == name][0]
        file_.seek(0)
        return file_

    def _save(self, name, content):
        HANDLED_FILES["written_files"].append((name, File(content)))
        return name

    def delete(self, name):
        HANDLED_FILES["deleted_files"].append(name)


def clean_gpg_keys():
    with contextlib.suppress(Exception):
        cmd = "gpg --batch --yes --delete-key '%s'" % GPG_FINGERPRINT
        subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)
    with contextlib.suppress(Exception):
        cmd = "gpg --batch --yes --delete-secrect-key '%s'" % GPG_FINGERPRINT
        subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)


def add_private_gpg():
    cmd = f"gpg --import {GPG_PRIVATE_PATH}".split()
    subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)


def add_public_gpg():
    cmd = f"gpg --import {GPG_PUBLIC_PATH}".split()
    subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)


def callable_for_filename_template(datetime, **kwargs):
    return f"{datetime}_foo"


def get_dump(database=TEST_DATABASE):
    return get_connector().create_dump()


def get_dump_name(database=None):
    database = database or TEST_DATABASE
    return get_connector().generate_filename()