1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
import click
import difflib
import pathlib
import sqlite_utils
from sqlite_migrate import Migrations
import textwrap
@sqlite_utils.hookimpl
def register_commands(cli):
    """Register the ``migrate`` command on the given sqlite-utils CLI group."""

    @cli.command()
    @click.argument(
        "db_path", type=click.Path(dir_okay=False, readable=True, writable=True)
    )
    @click.argument("migrations", type=click.Path(dir_okay=True, exists=True), nargs=-1)
    @click.option("--stop-before", help="Stop before applying this migration")
    @click.option(
        "list_", "--list", is_flag=True, help="List migrations without running them"
    )
    @click.option("-v", "--verbose", is_flag=True, help="Show verbose output")
    def migrate(db_path, migrations, stop_before, list_, verbose):
        """
        Apply pending database migrations.

        Usage:

            sqlite-utils migrate database.db

        This will find the migrations.py file in the current directory
        or subdirectories and apply any pending migrations.

        Or pass paths to one or more migrations.py files directly:

            sqlite-utils migrate database.db path/to/migrations.py

        Pass --list to see a list of applied and pending migrations
        without applying them.
        """
        # NOTE: the docstring above is rendered by click as the command's help
        # text, so implementation notes live in comments instead.
        if not migrations:
            # Scan current directory for migrations.py files
            migrations = [pathlib.Path(".").resolve()]

        # Collect candidate files: directories are searched recursively,
        # anything else is taken as a migrations file as-is. A set removes
        # paths that were discovered more than once.
        files = set()
        for path_str in migrations:
            path = pathlib.Path(path_str)
            if path.is_dir():
                files.update(path.rglob("migrations.py"))
            else:
                files.add(path)

        migration_sets = []
        # Iterate in sorted order: set iteration order is arbitrary, which
        # would otherwise make the order in which migration sets are applied
        # and listed unpredictable when several files are found.
        for filepath in sorted(files):
            # Python source defaults to UTF-8, so do not rely on the locale
            # encoding when reading it.
            code = filepath.read_text(encoding="utf-8")
            namespace = {}
            # NOTE(security): exec runs the file's code with this process's
            # full privileges - only point this command at trusted files.
            exec(code, namespace)
            # Find all instances of Migrations
            migration_sets.extend(
                obj for obj in namespace.values() if isinstance(obj, Migrations)
            )

        if not migration_sets:
            raise click.ClickException("No migrations.py files found")
        if stop_before and len(migration_sets) > 1:
            raise click.ClickException(
                "--stop-before can only be used with a single migrations.py file"
            )

        db = sqlite_utils.Database(db_path)
        if list_:
            display_list(db, migration_sets)
            return

        # Captured before applying anything so the verbose output can report
        # the schema before/after and a diff between the two.
        prev_schema = db.schema
        if verbose:
            click.echo("Migrating {}".format(db_path))
            click.echo("\nSchema before:\n")
            click.echo(textwrap.indent(prev_schema, " ") or " (empty)")
            click.echo()

        for migration_set in migration_sets:
            migration_set.apply(db, stop_before=stop_before)

        if verbose:
            click.echo("Schema after:\n")
            post_schema = db.schema
            if post_schema == prev_schema:
                click.echo(" (unchanged)")
            else:
                click.echo(textwrap.indent(post_schema, " "))
                click.echo("\nSchema diff:\n")
                # Calculate and display a diff
                diff = list(
                    difflib.unified_diff(
                        prev_schema.splitlines(), post_schema.splitlines()
                    )
                )
                # Skipping the first two lines since they only make
                # sense if we provided filenames, and the next one
                # because it is just @@ -0,0 +1,15 @@
                click.echo("\n".join(diff[3:]))
def display_list(db, migration_sets):
    """
    Print the applied and pending migrations for each migration set.

    For every item in ``migration_sets`` this prints a header with the set's
    ``name``, then the ``name`` and ``applied_at`` of each migration returned
    by ``applied(db)``, then the ``name`` of each migration returned by
    ``pending(db)``. "(none)" is printed when no pending migration is shown.

    :param db: database object, passed through to ``applied()`` and
        ``pending()`` of each migration set
    :param migration_sets: iterable of objects exposing ``name``,
        ``applied(db)`` and ``pending(db)``
    """
    for migration_set in migration_sets:
        print("Migrations for: {}".format(migration_set.name))
        print()
        print(" Applied:")
        # Tracked per migration set: sharing one set of names across all
        # migration sets would hide a pending migration whenever a different
        # set had already applied a migration with the same name.
        applied = set()
        for migration in migration_set.applied(db):
            print(" {} - {}".format(migration.name, migration.applied_at))
            applied.add(migration.name)
        print()
        print(" Pending:")
        pending_names = [
            migration.name
            for migration in migration_set.pending(db)
            if migration.name not in applied
        ]
        for name in pending_names:
            print(" {}".format(name))
        # Decide on what was actually shown, so the section is never left
        # empty when every pending entry was filtered out.
        if not pending_names:
            print(" (none)")
        print()
|