File: fixture_tables.py

package info (click to toggle)
django-nose 1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 340 kB
  • sloc: python: 567; sh: 29; makefile: 14
file content (155 lines) | stat: -rw-r--r-- 6,385 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""A copy of Django 1.3.0's stock loaddata.py, adapted so that, instead of
loading any data, it returns the tables referenced by a set of fixtures so we
can truncate them (and no others) quickly after we're finished with them."""

import os
import gzip
import zipfile

from django.conf import settings
from django.core import serializers
from django.db import router, DEFAULT_DB_ALIAS
from django.db.models import get_apps
from django.utils.itercompat import product

try:
    import bz2
    has_bz2 = True
except ImportError:
    has_bz2 = False


def tables_used_by_fixtures(fixture_labels, using=DEFAULT_DB_ALIAS):
    """Act like Django's stock loaddata command, but, instead of loading data,
    return an iterable of the names of the tables into which data would be
    loaded."""
    # Keep a count of the installed objects and fixtures
    fixture_count = 0
    loaded_object_count = 0
    fixture_object_count = 0
    tables = set()

    class SingleZipReader(zipfile.ZipFile):
        def __init__(self, *args, **kwargs):
            zipfile.ZipFile.__init__(self, *args, **kwargs)
            if settings.DEBUG:
                assert len(self.namelist()) == 1, "Zip-compressed fixtures must contain only one file."
        def read(self):
            return zipfile.ZipFile.read(self, self.namelist()[0])

    compression_types = {
        None:   file,
        'gz':   gzip.GzipFile,
        'zip':  SingleZipReader
    }
    if has_bz2:
        compression_types['bz2'] = bz2.BZ2File

    app_module_paths = []
    for app in get_apps():
        if hasattr(app, '__path__'):
            # It's a 'models/' subpackage
            for path in app.__path__:
                app_module_paths.append(path)
        else:
            # It's a models.py module
            app_module_paths.append(app.__file__)

    app_fixtures = [os.path.join(os.path.dirname(path), 'fixtures') for path in app_module_paths]
    for fixture_label in fixture_labels:
        parts = fixture_label.split('.')

        if len(parts) > 1 and parts[-1] in compression_types:
            compression_formats = [parts[-1]]
            parts = parts[:-1]
        else:
            compression_formats = compression_types.keys()

        if len(parts) == 1:
            fixture_name = parts[0]
            formats = serializers.get_public_serializer_formats()
        else:
            fixture_name, format = '.'.join(parts[:-1]), parts[-1]
            if format in serializers.get_public_serializer_formats():
                formats = [format]
            else:
                formats = []

        if not formats:
            # stderr.write(style.ERROR("Problem installing fixture '%s': %s is
            # not a known serialization format.\n" % (fixture_name, format)))
            return set()

        if os.path.isabs(fixture_name):
            fixture_dirs = [fixture_name]
        else:
            fixture_dirs = app_fixtures + list(settings.FIXTURE_DIRS) + ['']

        for fixture_dir in fixture_dirs:
            # stdout.write("Checking %s for fixtures...\n" %
            # humanize(fixture_dir))

            label_found = False
            for combo in product([using, None], formats, compression_formats):
                database, format, compression_format = combo
                file_name = '.'.join(
                    p for p in [
                        fixture_name, database, format, compression_format
                    ]
                    if p
                )

                # stdout.write("Trying %s for %s fixture '%s'...\n" % \
                # (humanize(fixture_dir), file_name, fixture_name))
                full_path = os.path.join(fixture_dir, file_name)
                open_method = compression_types[compression_format]
                try:
                    fixture = open_method(full_path, 'r')
                    if label_found:
                        fixture.close()
                        # stderr.write(style.ERROR("Multiple fixtures named
                        # '%s' in %s. Aborting.\n" % (fixture_name,
                        # humanize(fixture_dir))))
                        return set()
                    else:
                        fixture_count += 1
                        objects_in_fixture = 0
                        loaded_objects_in_fixture = 0
                        # stdout.write("Installing %s fixture '%s' from %s.\n"
                        # % (format, fixture_name, humanize(fixture_dir)))
                        try:
                            objects = serializers.deserialize(format, fixture, using=using)
                            for obj in objects:
                                objects_in_fixture += 1
                                if router.allow_syncdb(using, obj.object.__class__):
                                    loaded_objects_in_fixture += 1
                                    tables.add(
                                        obj.object.__class__._meta.db_table)
                            loaded_object_count += loaded_objects_in_fixture
                            fixture_object_count += objects_in_fixture
                            label_found = True
                        except (SystemExit, KeyboardInterrupt):
                            raise
                        except Exception:
                            fixture.close()
                            # stderr.write( style.ERROR("Problem installing
                            # fixture '%s': %s\n" % (full_path, ''.join(tra
                            # ceback.format_exception(sys.exc_type,
                            # sys.exc_value, sys.exc_traceback)))))
                            return set()
                        fixture.close()

                        # If the fixture we loaded contains 0 objects, assume that an
                        # error was encountered during fixture loading.
                        if objects_in_fixture == 0:
                            # stderr.write( style.ERROR("No fixture data found
                            # for '%s'. (File format may be invalid.)\n" %
                            # (fixture_name)))
                            return set()

                except Exception:
                    # stdout.write("No %s fixture '%s' in %s.\n" % \ (format,
                    # fixture_name, humanize(fixture_dir)))
                    pass

    return tables