File: sqlite_utils_plugin.py

package info (click to toggle)
python-sqlite-migrate 0.1~beta0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 144 kB
  • sloc: python: 371; makefile: 3
file content (118 lines) | stat: -rw-r--r-- 4,199 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import click
import difflib
import pathlib
import sqlite_utils
from sqlite_migrate import Migrations
import textwrap


@sqlite_utils.hookimpl
def register_commands(cli):
    @cli.command()
    @click.argument(
        "db_path", type=click.Path(dir_okay=False, readable=True, writable=True)
    )
    @click.argument("migrations", type=click.Path(dir_okay=True, exists=True), nargs=-1)
    @click.option("--stop-before", help="Stop before applying this migration")
    @click.option(
        "list_", "--list", is_flag=True, help="List migrations without running them"
    )
    @click.option("-v", "--verbose", is_flag=True, help="Show verbose output")
    def migrate(db_path, migrations, stop_before, list_, verbose):
        """
        Apply pending database migrations.

        Usage:

            sqlite-utils migrate database.db

        This will find the migrations.py file in the current directory
        or subdirectories and apply any pending migrations.

        Or pass paths to one or more migrations.py files directly:

            sqlite-utils migrate database.db path/to/migrations.py

        Pass --list to see a list of applied and pending migrations
        without applying them.
        """
        if not migrations:
            # Scan current directory for migrations.py files
            migrations = [pathlib.Path(".").resolve()]
        files = set()
        for path_str in migrations:
            path = pathlib.Path(path_str)
            if path.is_dir():
                files.update(path.rglob("migrations.py"))
            else:
                files.add(path)
        migration_sets = []
        for filepath in files:
            code = filepath.read_text()
            namespace = {}
            exec(code, namespace)
            # Find all instances of Migrations
            for obj in namespace.values():
                if isinstance(obj, Migrations):
                    migration_sets.append(obj)
        if not migration_sets:
            raise click.ClickException("No migrations.py files found")

        if stop_before and len(migration_sets) > 1:
            raise click.ClickException(
                "--stop-before can only be used with a single migrations.py file"
            )

        db = sqlite_utils.Database(db_path)

        if list_:
            display_list(db, migration_sets)
            return

        prev_schema = db.schema
        if verbose:
            click.echo("Migrating {}".format(db_path))
            click.echo("\nSchema before:\n")
            click.echo(textwrap.indent(prev_schema, "  ") or "  (empty)")
            click.echo()
        for migration_set in migration_sets:
            migration_set.apply(db, stop_before=stop_before)
        if verbose:
            click.echo("Schema after:\n")
            post_schema = db.schema
            if post_schema == prev_schema:
                click.echo("  (unchanged)")
            else:
                click.echo(textwrap.indent(post_schema, "  "))
                click.echo("\nSchema diff:\n")
                # Calculate and display a diff
                diff = list(
                    difflib.unified_diff(
                        prev_schema.splitlines(), post_schema.splitlines()
                    )
                )
                # Skipping the first two lines since they only make
                # sense if we provided filenames, and the next one
                # because it is just @@ -0,0 +1,15 @@
                click.echo("\n".join(diff[3:]))


def display_list(db, migration_sets):
    applied = set()
    for migration_set in migration_sets:
        print("Migrations for: {}".format(migration_set.name))
        print()
        print("  Applied:")
        for migration in migration_set.applied(db):
            print("    {} - {}".format(migration.name, migration.applied_at))
            applied.add(migration.name)
        print()
        print("  Pending:")
        output = False
        for migration in migration_set.pending(db):
            output = True
            if migration.name not in applied:
                print("    {}".format(migration.name))
        if not output:
            print("    (none)")
        print()