1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.core.management.base import BaseCommand, CommandError
from django.db import DEFAULT_DB_ALIAS, connections
from django.db.migrations.loader import MigrationLoader
class Command(BaseCommand):
    """
    Management command that prints the project's migrations, either as a
    per-app list (the default) or as a single ordered plan.
    """
    help = "Shows all available migrations for the current project"

    def add_arguments(self, parser):
        """
        Register the optional app labels, the --database option and the
        mutually exclusive --list / --plan output formats.
        """
        parser.add_argument(
            'app_label', nargs='*',
            help='App labels of applications to limit the output to.',
        )
        parser.add_argument(
            '--database', action='store', dest='database', default=DEFAULT_DB_ALIAS,
            help='Nominates a database to synchronize. Defaults to the "default" database.',
        )

        # --list and --plan both store into options['format'], so only one of
        # them may be given on the command line.
        formats = parser.add_mutually_exclusive_group()
        formats.add_argument(
            '--list', '-l', action='store_const', dest='format', const='list',
            help='Shows a list of all migrations and which are applied.',
        )
        formats.add_argument(
            '--plan', '-p', action='store_const', dest='format', const='plan',
            help=(
                'Shows all migrations in the order they will be applied. '
                'With a verbosity level of 2 or above all direct migration dependencies '
                'and reverse dependencies (run_before) will be included.'
            )
        )

        # Fall back to the list format when neither flag is supplied.
        parser.set_defaults(format='list')

    def handle(self, *args, **options):
        """
        Dispatch to show_plan() or show_list() according to options['format'],
        using the connection named by options['database'].
        """
        self.verbosity = options['verbosity']

        # Get the database we're operating from
        db = options['database']
        connection = connections[db]

        if options['format'] == "plan":
            return self.show_plan(connection, options['app_label'])
        return self.show_list(connection, options['app_label'])

    def _validate_app_names(self, loader, app_names):
        """
        Raise CommandError if any of app_names is missing from
        loader.migrated_apps.

        Each offending label is reported once, in sorted order, even when it
        was passed several times.
        """
        # A set keeps a label that was repeated on the command line from being
        # repeated in the error message.
        invalid_apps = {
            app_name for app_name in app_names
            if app_name not in loader.migrated_apps
        }
        if invalid_apps:
            raise CommandError('No migrations present for: %s' % (', '.join(sorted(invalid_apps))))

    def show_list(self, connection, app_names=None):
        """
        Shows a list of all migrations on the system, or only those of
        some named apps.
        """
        # Load migrations from disk/DB
        loader = MigrationLoader(connection, ignore_no_migrations=True)
        graph = loader.graph

        # If we were passed a list of apps, validate it
        if app_names:
            self._validate_app_names(loader, app_names)
        # Otherwise, show all apps in alphabetic order
        else:
            app_names = sorted(loader.migrated_apps)

        # For each app, print its migrations in order from oldest (roots) to
        # newest (leaves).
        for app_name in app_names:
            self.stdout.write(app_name, self.style.MIGRATE_LABEL)
            shown = set()
            for node in graph.leaf_nodes(app_name):
                for plan_node in graph.forwards_plan(node):
                    # The forwards plan may include nodes from other apps and
                    # nodes already printed for a previous leaf; skip both.
                    if plan_node in shown or plan_node[0] != app_name:
                        continue

                    # Give it a nice title if it's a squashed one
                    title = plan_node[1]
                    replaces = graph.nodes[plan_node].replaces
                    if replaces:
                        title += " (%s squashed migrations)" % len(replaces)

                    # Mark it as applied/unapplied
                    marker = "X" if plan_node in loader.applied_migrations else " "
                    self.stdout.write(" [%s] %s" % (marker, title))
                    shown.add(plan_node)

            # If we didn't print anything, then a small message
            if not shown:
                self.stdout.write(" (no migrations)", self.style.ERROR)

    @staticmethod
    def _format_deps(node):
        """
        Return the " ... (app.name, ...)" suffix listing node's parents in
        sorted order, or an empty string when node.parents is empty.
        """
        deps = ["%s.%s" % parent.key for parent in sorted(node.parents)]
        if deps:
            return " ... (%s)" % ", ".join(deps)
        return ""

    def show_plan(self, connection, app_names=None):
        """
        Shows all known migrations (or only those of the specified app_names)
        in the order they will be applied.
        """
        # Load migrations from disk/DB
        loader = MigrationLoader(connection)
        graph = loader.graph

        if app_names:
            self._validate_app_names(loader, app_names)
            # Build the lookup set once instead of scanning the list of labels
            # for every leaf node.
            requested = set(app_names)
            targets = [key for key in graph.leaf_nodes() if key[0] in requested]
        else:
            targets = graph.leaf_nodes()

        plan = []
        seen = set()

        # Generate the plan; `seen` keeps a migration reachable from several
        # targets from being listed more than once.
        for target in targets:
            for migration in graph.forwards_plan(target):
                if migration not in seen:
                    plan.append(graph.node_map[migration])
                    seen.add(migration)

        # Output
        show_deps = self.verbosity >= 2
        for node in plan:
            deps = self._format_deps(node) if show_deps else ""
            marker = "X" if node.key in loader.applied_migrations else " "
            self.stdout.write("[%s] %s.%s%s" % (marker, node.key[0], node.key[1], deps))
|