1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
|
import contextlib
import os
import subprocess
from django.conf import settings
from django.core.files import File
from django.core.files.storage import Storage
from django.utils import timezone
from dbbackup.db.base import get_connector
# Paths to fixture files under settings.BLOB_DIR. The names and extensions
# suggest plain / gpg-encrypted / gzip / tar / gzip+gpg variants of one
# file -- presumably all shipped in the blob directory; confirm.
BASE_FILE = os.path.join(settings.BLOB_DIR, "test.txt")
ENCRYPTED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.gpg")
COMPRESSED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.gz")
TARED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.tar")
ENCRYPTED_COMPRESSED_FILE = os.path.join(settings.BLOB_DIR, "test.txt.gz.gpg")
# NOTE(review): this literal is rebound to settings.DATABASES["default"]
# further down, so after import TEST_DATABASE does not hold these values.
TEST_DATABASE = {
    "ENGINE": "django.db.backends.sqlite3",
    "NAME": "/tmp/foo.db",
    "USER": "foo",
    "PASSWORD": "bar",
    "HOST": "foo",
    "PORT": 122,
}
# Settings-style dict naming the django_mongodb_engine backend.
TEST_MONGODB = {
    "ENGINE": "django_mongodb_engine",
    "NAME": "mongo_test",
    "USER": "foo",
    "PASSWORD": "bar",
    "HOST": "foo",
    "PORT": 122,
}
# Replaces the literal above with the project's default database settings.
TEST_DATABASE = settings.DATABASES["default"]
# Key ring files under settings.BLOB_DIR and the fingerprint that the gpg
# helpers in this module import and delete.
GPG_PRIVATE_PATH = os.path.join(settings.BLOB_DIR, "gpg/secring.gpg")
GPG_PUBLIC_PATH = os.path.join(settings.BLOB_DIR, "gpg/pubring.gpg")
GPG_FINGERPRINT = "7438 8D4E 02AF C011 4E2F 1E79 F7D1 BBF0 1F63 FDE9"
# Write handle on os.devnull, opened once at import time and used as the
# stdout/stderr sink for the gpg subprocess calls in this module.
DEV_NULL = open(os.devnull, "w")
class handled_files(dict):
    """
    Bookkeeping dict for the fake storage used in tests.

    It always carries two lists, under the keys ``written_files`` and
    ``deleted_files``. Use the shared ``HANDLED_FILES`` instance and call
    ``clean()`` before each test so entries do not carry over.
    """

    _TRACKED_KEYS = ("written_files", "deleted_files")

    def __init__(self):
        super().__init__()
        self.clean()

    def clean(self):
        """Rebind every tracked key to a new, empty list."""
        for key in self._TRACKED_KEYS:
            self[key] = []


# Shared instance.
HANDLED_FILES = handled_files()
class FakeStorage(Storage):
    """
    Storage backend for tests that records operations instead of doing I/O.

    ``_save`` appends ``(name, File)`` tuples to
    ``HANDLED_FILES["written_files"]`` and ``delete`` appends names to
    ``HANDLED_FILES["deleted_files"]``; nothing is written to disk here.
    """

    name = "FakeStorage"

    def exists(self, name):
        """Return True if an entry was saved under ``name``."""
        # Entries are (name, File) tuples, so compare against the stored
        # name; testing the bare string against the tuples never matches.
        return any(
            saved_name == name for saved_name, _ in HANDLED_FILES["written_files"]
        )

    def get_available_name(self, name, max_length=None):
        """Return ``name`` cut to ``max_length``; no uniqueness check is made."""
        return name[:max_length]

    def get_valid_name(self, name):
        """Return ``name`` unchanged."""
        return name

    def listdir(self, path):
        """Return ``([], names)`` for every saved entry; ``path`` is ignored."""
        return ([], [saved_name for saved_name, _ in HANDLED_FILES["written_files"]])

    def accessed_time(self, name):
        """Return the current time, whatever ``name`` is."""
        return timezone.now()

    # Creation and modification times share the same implementation.
    created_time = modified_time = accessed_time

    def _open(self, name, mode="rb"):
        """
        Return the first file object saved under ``name``, rewound to 0.

        ``mode`` is ignored. Raises ``IndexError`` when nothing was saved
        under ``name``.
        """
        matches = [
            file_ for saved_name, file_ in HANDLED_FILES["written_files"]
            if saved_name == name
        ]
        file_ = matches[0]
        file_.seek(0)
        return file_

    def _save(self, name, content):
        """Record ``content`` wrapped in ``File`` under ``name``; return ``name``."""
        HANDLED_FILES["written_files"].append((name, File(content)))
        return name

    def delete(self, name):
        """Record the deletion request; the ``written_files`` entry is kept."""
        HANDLED_FILES["deleted_files"].append(name)
def clean_gpg_keys():
    """
    Best-effort removal of the test key pair from the local gpg keyring.

    Runs gpg once for the secret key and once for the public key identified
    by ``GPG_FINGERPRINT``. gpg's output is discarded and its exit status is
    ignored, so a key that is not present is not an error.
    """
    # Hand gpg the fingerprint as a single argument without display spaces.
    fingerprint = GPG_FINGERPRINT.replace(" ", "")
    # Secret key first: gpg refuses to delete a public key while the
    # matching secret key is still in the keyring.
    for action in ("--delete-secret-keys", "--delete-keys"):
        # A missing or non-executable gpg surfaces as OSError; ignore it so
        # cleanup stays best-effort.
        with contextlib.suppress(OSError):
            # argv is a list: a single command string without shell=True
            # would be treated as the program name and never run gpg.
            subprocess.call(
                ["gpg", "--batch", "--yes", action, fingerprint],
                stdout=DEV_NULL,
                stderr=DEV_NULL,
            )
def add_private_gpg():
    """
    Run ``gpg --import`` on the key file at ``GPG_PRIVATE_PATH``.

    gpg's output is discarded and its exit status is ignored.
    """
    # Build argv as a list so a path containing whitespace stays one
    # argument instead of being split apart.
    cmd = ["gpg", "--import", GPG_PRIVATE_PATH]
    subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)
def add_public_gpg():
    """
    Run ``gpg --import`` on the key file at ``GPG_PUBLIC_PATH``.

    gpg's output is discarded and its exit status is ignored.
    """
    # Build argv as a list so a path containing whitespace stays one
    # argument instead of being split apart.
    cmd = ["gpg", "--import", GPG_PUBLIC_PATH]
    subprocess.call(cmd, stdout=DEV_NULL, stderr=DEV_NULL)
def callable_for_filename_template(datetime, **kwargs):
    """
    Return the default formatting of ``datetime`` followed by ``_foo``.

    Any extra keyword arguments are accepted and ignored.
    """
    suffix = "_foo"
    return format(datetime) + suffix
def get_dump(database=TEST_DATABASE):
    """
    Create a dump with the connector returned by ``get_connector()``.

    ``database`` is accepted but not used by the body.
    """
    connector = get_connector()
    return connector.create_dump()
def get_dump_name(database=None):
    """
    Return the filename generated by the connector from ``get_connector()``.

    ``database`` is accepted for call compatibility but has no effect on
    the result: the connector is obtained without arguments.
    """
    # The former ``database = database or TEST_DATABASE`` rebinding was
    # never read afterwards, so it has been dropped.
    return get_connector().generate_filename()
|