from sqlite_utils import cli, Database
from sqlite_utils.db import Index, ForeignKey
from click.testing import CliRunner
from pathlib import Path
import subprocess
import sys
from unittest import mock
import json
import os
import pytest
import textwrap


def write_json(file_path, data):
    """Serialize ``data`` as JSON into ``file_path``, overwriting existing content."""
    with open(file_path, mode="w") as handle:
        json.dump(data, fp=handle)


def _supports_pragma_function_list():
    """Return True when ``pragma_function_list()`` can be queried.

    The query is attempted against a throwaway in-memory database; any
    exception raised by it is reported as "not supported".
    """
    probe = Database(memory=True)
    try:
        probe.execute("select * from pragma_function_list()")
        return True
    except Exception:
        return False


def _has_compiled_ext():
    """Return True if ``ext.dylib``, ``ext.so`` or ``ext.dll`` is a file next to this module."""
    here = Path(__file__).parent
    return any(
        (here / f"ext.{suffix}").is_file() for suffix in ("dylib", "so", "dll")
    )


# Path of a sibling named "ext" (no file suffix) in this module's directory.
COMPILED_EXTENSION_PATH = str(Path(__file__).with_name("ext"))


@pytest.mark.parametrize(
    "options",
    [
        prefix + [flag]
        for prefix in ([], ["insert"])
        for flag in ("-h", "--help")
    ],
)
def test_help(options):
    """``-h`` and ``--help`` print usage for the top-level command and for ``insert``."""
    outcome = CliRunner().invoke(cli.cli, options)
    assert outcome.exit_code == 0
    help_text = outcome.output
    assert help_text.startswith("Usage: ")
    assert "-h, --help" in help_text


def test_tables(db_path):
    """``tables`` prints the tables of the fixture database as JSON."""
    runner = CliRunner()
    outcome = runner.invoke(cli.cli, ["tables", db_path], catch_exceptions=False)
    expected_output = '[{"table": "Gosh"},\n {"table": "Gosh2"}]'
    assert outcome.output.strip() == expected_output


def test_views(db_path):
    """``views --table --schema`` prints each view alongside its CREATE statement."""
    Database(db_path).create_view("hello", "select sqlite_version()")
    cli_args = ["views", db_path, "--table", "--schema"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    expected_lines = [
        "view    schema",
        "------  --------------------------------------------",
        "hello   CREATE VIEW hello AS select sqlite_version()",
    ]
    assert outcome.output.strip() == "\n".join(expected_lines)


def test_tables_fts4(db_path):
    """``tables --fts4`` reports the FTS4 table created for ``Gosh``."""
    Database(db_path)["Gosh"].enable_fts(["c2"], fts_version="FTS4")
    cli_args = ["tables", "--fts4", db_path]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.output.strip() == '[{"table": "Gosh_fts"}]'


def test_tables_fts5(db_path):
    """``tables --fts5`` reports the FTS5 table created for ``Gosh``."""
    Database(db_path)["Gosh"].enable_fts(["c2"], fts_version="FTS5")
    cli_args = ["tables", "--fts5", db_path]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.output.strip() == '[{"table": "Gosh_fts"}]'


def test_tables_counts_and_columns(db_path):
    """``tables --counts --columns`` adds row counts and column names to each entry."""
    db = Database(db_path)
    records = [{"id": n, "age": n + 1} for n in range(30)]
    with db.conn:
        db["lots"].insert_all(records)
    cli_args = ["tables", "--counts", "--columns", db_path]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    expected_lines = [
        '[{"table": "Gosh", "count": 0, "columns": ["c1", "c2", "c3"]},',
        ' {"table": "Gosh2", "count": 0, "columns": ["c1", "c2", "c3"]},',
        ' {"table": "lots", "count": 30, "columns": ["id", "age"]}]',
    ]
    assert outcome.output.strip() == "\n".join(expected_lines)


@pytest.mark.parametrize(
    "format,expected",
    [
        (
            "--csv",
            (
                "table,count,columns\n"
                'Gosh,0,"c1\n'
                "c2\n"
                'c3"\n'
                'Gosh2,0,"c1\n'
                "c2\n"
                'c3"\n'
                'lots,30,"id\n'
                'age"'
            ),
        ),
        (
            "--tsv",
            "table\tcount\tcolumns\nGosh\t0\t['c1', 'c2', 'c3']\nGosh2\t0\t['c1', 'c2', 'c3']\nlots\t30\t['id', 'age']",
        ),
    ],
)
def test_tables_counts_and_columns_csv(db_path, format, expected):
    """``tables --counts --columns`` output in CSV and TSV form."""
    db = Database(db_path)
    records = [{"id": n, "age": n + 1} for n in range(30)]
    with db.conn:
        db["lots"].insert_all(records)
    cli_args = ["tables", "--counts", "--columns", format, db_path]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    # Carriage returns are removed so CRLF line endings compare equal to LF.
    normalized = outcome.output.strip().replace("\r", "")
    assert normalized == expected


def test_tables_schema(db_path):
    """``tables --schema`` includes each table's CREATE TABLE statement."""
    db = Database(db_path)
    records = [{"id": n, "age": n + 1} for n in range(30)]
    with db.conn:
        db["lots"].insert_all(records)
    outcome = CliRunner().invoke(cli.cli, ["tables", "--schema", db_path])
    expected_output = (
        '[{"table": "Gosh", "schema": "CREATE TABLE Gosh (c1 text, c2 text, c3 text)"},\n'
        ' {"table": "Gosh2", "schema": "CREATE TABLE Gosh2 (c1 text, c2 text, c3 text)"},\n'
        ' {"table": "lots", "schema": "CREATE TABLE [lots] (\\n   [id] INTEGER,\\n   [age] INTEGER\\n)"}]'
    )
    assert outcome.output.strip() == expected_output


@pytest.mark.parametrize(
    "options,expected",
    [
        (
            ["--fmt", "simple"],
            (
                "c1     c2     c3\n"
                "-----  -----  ----------\n"
                "verb0  noun0  adjective0\n"
                "verb1  noun1  adjective1\n"
                "verb2  noun2  adjective2\n"
                "verb3  noun3  adjective3"
            ),
        ),
        (
            ["-t"],
            (
                "c1     c2     c3\n"
                "-----  -----  ----------\n"
                "verb0  noun0  adjective0\n"
                "verb1  noun1  adjective1\n"
                "verb2  noun2  adjective2\n"
                "verb3  noun3  adjective3"
            ),
        ),
        (
            ["--fmt", "rst"],
            (
                "=====  =====  ==========\n"
                "c1     c2     c3\n"
                "=====  =====  ==========\n"
                "verb0  noun0  adjective0\n"
                "verb1  noun1  adjective1\n"
                "verb2  noun2  adjective2\n"
                "verb3  noun3  adjective3\n"
                "=====  =====  =========="
            ),
        ),
    ],
)
def test_output_table(db_path, options, expected):
    """``rows`` renders four sample rows in the table format chosen by ``options``."""
    db = Database(db_path)
    records = [
        {"c1": f"verb{n}", "c2": f"noun{n}", "c3": f"adjective{n}"}
        for n in range(4)
    ]
    with db.conn:
        db["rows"].insert_all(records)
    outcome = CliRunner().invoke(cli.cli, ["rows", db_path, "rows", *options])
    assert outcome.exit_code == 0
    assert outcome.output.strip() == expected


def test_create_index(db_path):
    """Walk ``create-index`` through default naming, ``--name``, ``--unique`` and duplicates."""
    db = Database(db_path)

    def invoke(cli_args):
        # A fresh runner for every command, as in separate CLI invocations.
        return CliRunner().invoke(cli.cli, cli_args)

    # Default index name on a single column
    assert db["Gosh"].indexes == []
    outcome = invoke(["create-index", db_path, "Gosh", "c1"])
    assert outcome.exit_code == 0
    assert db["Gosh"].indexes == [
        Index(
            seq=0, name="idx_Gosh_c1", unique=0, origin="c", partial=0, columns=["c1"]
        )
    ]

    # Custom name supplied with --name
    outcome = invoke(["create-index", db_path, "Gosh", "c2", "--name", "blah"])
    assert outcome.exit_code == 0
    assert db["Gosh"].indexes == [
        Index(seq=0, name="blah", unique=0, origin="c", partial=0, columns=["c2"]),
        Index(
            seq=1, name="idx_Gosh_c1", unique=0, origin="c", partial=0, columns=["c1"]
        ),
    ]

    # Two-column unique index
    unique_args = ["create-index", db_path, "Gosh2", "c1", "c2", "--unique"]
    outcome = invoke(unique_args)
    assert outcome.exit_code == 0
    assert db["Gosh2"].indexes == [
        Index(
            seq=0,
            name="idx_Gosh2_c1_c2",
            unique=1,
            origin="c",
            partial=0,
            columns=["c1", "c2"],
        )
    ]

    # Creating the same index a second time fails ...
    assert invoke(unique_args).exit_code != 0
    # ... unless --if-not-exists or --ignore is passed
    for flag in ("--if-not-exists", "--ignore"):
        assert invoke(unique_args + [flag]).exit_code == 0


def test_create_index_analyze(db_path):
    """``create-index --analyze`` leaves a ``sqlite_stat1`` table behind."""
    db = Database(db_path)
    assert "sqlite_stat1" not in db.table_names()
    assert db["Gosh"].indexes == []
    cli_args = ["create-index", db_path, "Gosh", "c1", "--analyze"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    assert "sqlite_stat1" in db.table_names()


def test_create_index_desc(db_path):
    """A column written as ``-c1`` (after ``--``) produces a descending index."""
    db = Database(db_path)
    assert db["Gosh"].indexes == []
    cli_args = ["create-index", db_path, "Gosh", "--", "-c1"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    index_row = db.execute(
        "select sql from sqlite_master where type='index'"
    ).fetchone()
    assert index_row[0] == "CREATE INDEX [idx_Gosh_c1]\n    ON [Gosh] ([c1] desc)"


@pytest.mark.parametrize(
    "col_name,col_type,expected_schema",
    (
        ("text", "TEXT", "CREATE TABLE [dogs] (\n   [name] TEXT\n, [text] TEXT)"),
        ("text", "str", "CREATE TABLE [dogs] (\n   [name] TEXT\n, [text] TEXT)"),
        ("text", "STR", "CREATE TABLE [dogs] (\n   [name] TEXT\n, [text] TEXT)"),
        (
            "integer",
            "INTEGER",
            "CREATE TABLE [dogs] (\n   [name] TEXT\n, [integer] INTEGER)",
        ),
        (
            "integer",
            "int",
            "CREATE TABLE [dogs] (\n   [name] TEXT\n, [integer] INTEGER)",
        ),
        ("float", "FLOAT", "CREATE TABLE [dogs] (\n   [name] TEXT\n, [float] FLOAT)"),
        ("blob", "blob", "CREATE TABLE [dogs] (\n   [name] TEXT\n, [blob] BLOB)"),
        ("blob", "BLOB", "CREATE TABLE [dogs] (\n   [name] TEXT\n, [blob] BLOB)"),
        ("blob", "bytes", "CREATE TABLE [dogs] (\n   [name] TEXT\n, [blob] BLOB)"),
        ("blob", "BYTES", "CREATE TABLE [dogs] (\n   [name] TEXT\n, [blob] BLOB)"),
        ("default", None, "CREATE TABLE [dogs] (\n   [name] TEXT\n, [default] TEXT)"),
    ),
)
def test_add_column(db_path, col_name, col_type, expected_schema):
    """``add-column`` accepts several spellings of each type; no type yields TEXT."""
    db = Database(db_path)
    db.create_table("dogs", {"name": str})
    assert db["dogs"].schema == "CREATE TABLE [dogs] (\n   [name] TEXT\n)"
    # The type argument is omitted entirely when the case supplies None.
    type_args = [] if col_type is None else [col_type]
    cli_args = ["add-column", db_path, "dogs", col_name] + type_args
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    assert db["dogs"].schema == expected_schema


@pytest.mark.parametrize("ignore", (True, False))
def test_add_column_ignore(db_path, ignore):
    """Adding an existing column fails with exit code 1 unless ``--ignore`` is given."""
    db = Database(db_path)
    db.create_table("dogs", {"name": str})
    cli_args = ["add-column", db_path, "dogs", "name"]
    if ignore:
        cli_args.append("--ignore")
    outcome = CliRunner().invoke(cli.cli, cli_args)
    if not ignore:
        assert outcome.exit_code == 1
        assert outcome.output == "Error: duplicate column name: name\n"
        return
    assert outcome.exit_code == 0


def test_add_column_not_null_default(db_path):
    """``--not-null-default`` adds NOT NULL DEFAULT and doubles the single quote in the value."""
    db = Database(db_path)
    db.create_table("dogs", {"name": str})
    assert db["dogs"].schema == "CREATE TABLE [dogs] (\n   [name] TEXT\n)"
    cli_args = ["add-column", db_path, "dogs", "nickname"]
    cli_args += ["--not-null-default", "dogs'dawg"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    expected_schema = (
        "CREATE TABLE [dogs] (\n"
        "   [name] TEXT\n"
        ", [nickname] TEXT NOT NULL DEFAULT 'dogs''dawg')"
    )
    assert db["dogs"].schema == expected_schema


@pytest.mark.parametrize(
    "args,assert_message",
    (
        (
            ["books", "author_id", "authors", "id"],
            "Explicit other_table and other_column",
        ),
        (["books", "author_id", "authors"], "Explicit other_table, guess other_column"),
        (["books", "author_id"], "Automatically guess other_table and other_column"),
    ),
)
def test_add_foreign_key(db_path, args, assert_message):
    """``add-foreign-key`` with explicit or omitted targets, plus its error cases."""
    db = Database(db_path)
    author_rows = [{"id": 1, "name": "Sally"}, {"id": 2, "name": "Asheesh"}]
    book_rows = [
        {"title": "Hedgehogs of the world", "author_id": 1},
        {"title": "How to train your wolf", "author_id": 2},
    ]
    db["authors"].insert_all(author_rows, pk="id")
    db["books"].insert_all(book_rows)

    def add_foreign_key(*extra):
        return CliRunner().invoke(cli.cli, ["add-foreign-key", db_path, *extra])

    first = add_foreign_key(*args)
    assert first.exit_code == 0, assert_message
    assert db["books"].foreign_keys == [
        ForeignKey(
            table="books", column="author_id", other_table="authors", other_column="id"
        )
    ]

    # Adding the same key a second time is an error:
    duplicate = add_foreign_key("books", "author_id", "authors", "id")
    assert duplicate.exit_code != 0
    assert (
        duplicate.output.strip()
        == "Error: Foreign key already exists for author_id => authors.id"
    )

    # --ignore turns the duplicate into a no-error case
    ignored = add_foreign_key("books", "author_id", "authors", "id", "--ignore")
    assert ignored.exit_code == 0

    # Pointing at a column that does not exist is an error
    invalid = add_foreign_key("books", "author_id", "authors", "bad")
    assert invalid.exit_code != 0
    assert invalid.output.strip() == "Error: No such column: authors.bad"


def test_add_column_foreign_key(db_path):
    """``add-column --fk`` creates a referencing column; ``--fk-col`` picks the target column."""
    db = Database(db_path)
    db["authors"].insert({"id": 1, "name": "Sally"}, pk="id")
    db["books"].insert({"title": "Hedgehogs of the world"})

    def add_column(*extra):
        return CliRunner().invoke(cli.cli, ["add-column", db_path, "books", *extra])

    # author_id column referencing the authors table
    outcome = add_column("author_id", "--fk", "authors")
    assert outcome.exit_code == 0, outcome.output
    expected_schema = (
        'CREATE TABLE "books" (\n'
        "   [title] TEXT,\n"
        "   [author_id] INTEGER REFERENCES [authors]([id])\n"
        ")"
    )
    assert db["books"].schema == expected_schema

    # Second column, this time with an explicit --fk-col
    outcome = add_column("author_name_ref", "--fk", "authors", "--fk-col", "name")
    assert outcome.exit_code == 0, outcome.output
    expected_schema = (
        'CREATE TABLE "books" (\n'
        "   [title] TEXT,\n"
        "   [author_id] INTEGER REFERENCES [authors]([id]),\n"
        "   [author_name_ref] TEXT REFERENCES [authors]([name])\n"
        ")"
    )
    assert db["books"].schema == expected_schema

    # A --fk table that does not exist is rejected
    outcome = add_column("author_id", "--fk", "bobcats")
    assert outcome.exit_code != 0
    assert "table 'bobcats' does not exist" in str(outcome.exception)


def test_suggest_alter_if_column_missing(db_path):
    """Inserting a record with an unknown column fails and suggests ``--alter``."""
    db = Database(db_path)
    db["authors"].insert({"id": 1, "name": "Sally"}, pk="id")
    outcome = CliRunner().invoke(
        cli.cli,
        ["insert", db_path, "authors", "-"],
        input='{"id": 2, "name": "Barry", "age": 43}',
    )
    assert outcome.exit_code != 0
    expected_message = (
        "Error: table authors has no column named age\n\n"
        "Try using --alter to add additional columns"
    )
    assert outcome.output.strip() == expected_message


def test_index_foreign_keys(db_path):
    """``index-foreign-keys`` adds one index per foreign key column on ``books``."""
    # Reuse the other test to build the books table and its two foreign keys.
    test_add_column_foreign_key(db_path)
    db = Database(db_path)
    assert db["books"].indexes == []
    outcome = CliRunner().invoke(cli.cli, ["index-foreign-keys", db_path])
    assert outcome.exit_code == 0
    indexed_columns = [index.columns for index in db["books"].indexes]
    assert indexed_columns == [["author_id"], ["author_name_ref"]]


def test_enable_fts(db_path):
    """``enable-fts`` works for a plain table and for a name containing colons and dots."""
    db = Database(db_path)
    assert db["Gosh"].detect_fts() is None
    outcome = CliRunner().invoke(
        cli.cli, ["enable-fts", db_path, "Gosh", "c1", "--fts4"]
    )
    assert outcome.exit_code == 0
    assert db["Gosh"].detect_fts() == "Gosh_fts"

    # Table names with restricted chars are handled correctly.
    # Colons and dots are restricted characters for table names.
    url_table = "http://example.com"
    db[url_table].create({"c1": str, "c2": str, "c3": str})
    assert db[url_table].detect_fts() is None
    cli_args = ["enable-fts", db_path, url_table, "c1", "--fts4"]
    cli_args += ["--tokenize", "porter"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    assert db[url_table].detect_fts() == "http://example.com_fts"
    # The porter tokenizer shows up in the virtual table's schema
    expected_schema = (
        "CREATE VIRTUAL TABLE [http://example.com_fts] USING FTS4 (\n"
        "    [c1],\n"
        "    tokenize='porter',\n"
        "    content=[http://example.com]"
        "\n)"
    )
    assert db["http://example.com_fts"].schema == expected_schema
    db[url_table].drop()


def test_enable_fts_replace(db_path):
    """A second ``enable-fts`` fails unless ``--replace`` is passed."""
    db = Database(db_path)
    enable_c1 = ["enable-fts", db_path, "Gosh", "c1", "--fts4"]

    assert db["Gosh"].detect_fts() is None
    first = CliRunner().invoke(cli.cli, enable_c1)
    assert first.exit_code == 0
    assert db["Gosh"].detect_fts() == "Gosh_fts"
    assert db["Gosh_fts"].columns_dict == {"c1": str}

    # Repeating the identical command is an error
    second = CliRunner().invoke(cli.cli, enable_c1)
    assert second.exit_code == 1
    assert second.output == "Error: table [Gosh_fts] already exists\n"

    # --replace swaps the FTS table for one over c2
    third = CliRunner().invoke(
        cli.cli, ["enable-fts", db_path, "Gosh", "c2", "--fts4", "--replace"]
    )
    assert third.exit_code == 0
    assert db["Gosh_fts"].columns_dict == {"c2": str}


def test_enable_fts_with_triggers(db_path):
    """With ``--create-triggers`` a row inserted afterwards is immediately searchable."""
    Database(db_path)["Gosh"].insert_all([{"c1": "baz"}])
    cli_args = ["enable-fts", db_path, "Gosh", "c1", "--fts4", "--create-triggers"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0

    def matching_rows(term):
        cursor = Database(db_path).execute(
            "select c1 from Gosh_fts where c1 match ?", [term]
        )
        return cursor.fetchall()

    assert matching_rows("baz") == [("baz",)]
    Database(db_path)["Gosh"].insert_all([{"c1": "martha"}])
    assert matching_rows("martha") == [("martha",)]


def test_populate_fts(db_path):
    """Without triggers a new row is not searchable until ``populate-fts`` is run."""
    Database(db_path)["Gosh"].insert_all([{"c1": "baz"}])
    enabled = CliRunner().invoke(
        cli.cli, ["enable-fts", db_path, "Gosh", "c1", "--fts4"]
    )
    assert enabled.exit_code == 0

    def matching_rows(term):
        cursor = Database(db_path).execute(
            "select c1 from Gosh_fts where c1 match ?", [term]
        )
        return cursor.fetchall()

    assert matching_rows("baz") == [("baz",)]
    Database(db_path)["Gosh"].insert_all([{"c1": "martha"}])
    # The new row has not reached the FTS index yet
    assert matching_rows("martha") == []
    populated = CliRunner().invoke(cli.cli, ["populate-fts", db_path, "Gosh", "c1"])
    assert populated.exit_code == 0
    assert matching_rows("martha") == [("martha",)]


def test_disable_fts(db_path):
    """``disable-fts`` removes the FTS tables that ``enable_fts`` created."""
    db = Database(db_path)
    base_tables = {"Gosh", "Gosh2"}
    fts_tables = {
        "Gosh_fts",
        "Gosh_fts_idx",
        "Gosh_fts_data",
        "Gosh_fts_config",
        "Gosh_fts_docsize",
    }
    assert set(db.table_names()) == base_tables
    db["Gosh"].enable_fts(["c1"], create_triggers=True)
    assert set(db.table_names()) == base_tables | fts_tables
    outcome = CliRunner().invoke(cli.cli, ["disable-fts", db_path, "Gosh"])
    assert outcome.exit_code == 0
    assert set(db.table_names()) == base_tables


def test_vacuum(db_path):
    """``vacuum`` exits with status 0 on the fixture database."""
    runner = CliRunner()
    outcome = runner.invoke(cli.cli, ["vacuum", db_path])
    assert outcome.exit_code == 0


def test_dump(db_path):
    """``dump`` output is wrapped in ``BEGIN TRANSACTION;`` ... ``COMMIT;``."""
    outcome = CliRunner().invoke(cli.cli, ["dump", db_path])
    assert outcome.exit_code == 0
    dumped_sql = outcome.output
    assert dumped_sql.startswith("BEGIN TRANSACTION;")
    assert dumped_sql.strip().endswith("COMMIT;")


@pytest.mark.parametrize("tables", ([], ["Gosh"], ["Gosh2"]))
def test_optimize(db_path, tables):
    """``optimize`` succeeds for all or named tables and does not grow the file much."""
    db = Database(db_path)
    with db.conn:
        for table_name in ("Gosh", "Gosh2"):
            db[table_name].insert_all(
                [
                    {"c1": f"verb{n}", "c2": f"noun{n}", "c3": f"adjective{n}"}
                    for n in range(10000)
                ]
            )
        for table_name, version in (("Gosh", "FTS4"), ("Gosh2", "FTS5")):
            db[table_name].enable_fts(["c1", "c2", "c3"], fts_version=version)
    size_before = os.stat(db_path).st_size
    outcome = CliRunner().invoke(cli.cli, ["optimize", db_path, *tables])
    assert outcome.exit_code == 0
    size_after = os.stat(db_path).st_size
    # Weirdest thing: tests started failing because size after
    # ended up larger than size before in some cases. I think
    # it's OK to tolerate that happening, though it's very strange.
    allowed_size = size_before + 10000
    assert size_after <= allowed_size
    # Soundness check that --no-vacuum doesn't throw errors:
    outcome = CliRunner().invoke(cli.cli, ["optimize", "--no-vacuum", db_path])
    assert outcome.exit_code == 0


def test_rebuild_fts_fixes_docsize_error(db_path):
    """``rebuild-fts`` brings the FTS5 docsize table back to one row per record."""
    db = Database(db_path, recursive_triggers=False)
    records = [
        {"c1": f"verb{n}", "c2": f"noun{n}", "c3": f"adjective{n}"}
        for n in range(10000)
    ]
    with db.conn:
        table = db["fts5_table"]
        table.insert_all(records, pk="c1")
        db["fts5_table"].enable_fts(
            ["c1", "c2", "c3"], fts_version="FTS5", create_triggers=True
        )
    # Search should return at least one row
    hits = list(db["fts5_table"].search("verb1"))
    assert hits
    # Replicate docsize error from this issue for FTS5
    # https://github.com/simonw/sqlite-utils/issues/149
    assert db["fts5_table_fts_docsize"].count == 10000
    db["fts5_table"].insert_all(records, replace=True)
    assert db["fts5_table"].count == 10000
    assert db["fts5_table_fts_docsize"].count == 20000
    # Running rebuild-fts should fix this
    outcome = CliRunner().invoke(cli.cli, ["rebuild-fts", db_path, "fts5_table"])
    assert outcome.exit_code == 0
    assert db["fts5_table_fts_docsize"].count == 10000


@pytest.mark.parametrize(
    "format,expected",
    [
        ("--csv", "id,name,age\n1,Cleo,4\n2,Pancakes,2\n"),
        ("--tsv", "id\tname\tage\n1\tCleo\t4\n2\tPancakes\t2\n"),
    ],
)
def test_query_csv(db_path, format, expected):
    """A SQL query renders as CSV or TSV, with and without the header row."""
    db = Database(db_path)
    dogs = [
        {"id": 1, "age": 4, "name": "Cleo"},
        {"id": 2, "age": 2, "name": "Pancakes"},
    ]
    with db.conn:
        db["dogs"].insert_all(dogs)
    sql = "select id, name, age from dogs"
    with_headers = CliRunner().invoke(cli.cli, [db_path, sql, format])
    assert with_headers.exit_code == 0
    assert with_headers.output.replace("\r", "") == expected
    # Test the no-headers option:
    without_headers = CliRunner().invoke(
        cli.cli, [db_path, sql, "--no-headers", format]
    )
    # Everything after the first newline, i.e. the expected text minus its header.
    expected_rest = expected.partition("\n")[2].strip()
    assert without_headers.output.strip().replace("\r", "") == expected_rest


# SQL shared by the test_query_json cases: every dog, and only the dog with id 1.
_all_query = "select id, name, age from dogs"
_one_query = f"{_all_query} where id = 1"


@pytest.mark.parametrize(
    "sql,args,expected",
    [
        (
            _all_query,
            [],
            '[{"id": 1, "name": "Cleo", "age": 4},\n {"id": 2, "name": "Pancakes", "age": 2}]',
        ),
        (
            _all_query,
            ["--nl"],
            '{"id": 1, "name": "Cleo", "age": 4}\n{"id": 2, "name": "Pancakes", "age": 2}',
        ),
        (_all_query, ["--arrays"], '[[1, "Cleo", 4],\n [2, "Pancakes", 2]]'),
        (_all_query, ["--arrays", "--nl"], '[1, "Cleo", 4]\n[2, "Pancakes", 2]'),
        (_one_query, [], '[{"id": 1, "name": "Cleo", "age": 4}]'),
        (_one_query, ["--nl"], '{"id": 1, "name": "Cleo", "age": 4}'),
        (_one_query, ["--arrays"], '[[1, "Cleo", 4]]'),
        (_one_query, ["--arrays", "--nl"], '[1, "Cleo", 4]'),
        (
            "select id, dog(age) from dogs",
            ["--functions", "def dog(i):\n  return i * 7"],
            '[{"id": 1, "dog(age)": 28},\n {"id": 2, "dog(age)": 14}]',
        ),
    ],
)
def test_query_json(db_path, sql, args, expected):
    """Query output as JSON under ``--nl``, ``--arrays`` and ``--functions``."""
    db = Database(db_path)
    dogs = [
        {"id": 1, "age": 4, "name": "Cleo"},
        {"id": 2, "age": 2, "name": "Pancakes"},
    ]
    with db.conn:
        db["dogs"].insert_all(dogs)
    outcome = CliRunner().invoke(cli.cli, [db_path, sql, *args])
    assert outcome.output.strip() == expected


def test_query_json_empty(db_path):
    """A query that matches no rows prints an empty JSON array."""
    sql = "select * from sqlite_master where 0"
    outcome = CliRunner().invoke(cli.cli, [db_path, sql])
    assert outcome.output.strip() == "[]"


def test_query_invalid_function(db_path):
    """Unparseable ``--functions`` code exits 1 with a functions-definition error."""
    cli_args = [db_path, "select bad()", "--functions", "def invalid_python"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 1
    assert outcome.output.startswith("Error: Error in functions definition:")


# Python source handed to the CLI through --functions by the tests below.
# It defines zero(), one(a) and two(a, b), plus the underscore-prefixed
# helper _two(a, b) that two() delegates to; test_hidden_functions_are_hidden
# asserts that "_two" is absent from pragma_function_list().
TEST_FUNCTIONS = """
def zero():
    return 0

def one(a):
    return a

def _two(a, b):
    return a + b

def two(a, b):
    return _two(a, b)
"""


def test_query_complex_function(db_path):
    """Functions defined via ``--functions`` can be called from SQL, including one that calls a helper."""
    sql = "select zero(), one(1), two(1, 2)"
    cli_args = [db_path, sql, "--functions", TEST_FUNCTIONS]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    rows = json.loads(outcome.output.strip())
    assert rows == [{"zero()": 0, "one(1)": 1, "two(1, 2)": 3}]


@pytest.mark.skipif(
    not _supports_pragma_function_list(),
    reason="Needs SQLite version that supports pragma_function_list()",
)
def test_hidden_functions_are_hidden(db_path):
    """``zero``, ``one`` and ``two`` are listed as SQL functions; ``_two`` is not."""
    sql = "select name from pragma_function_list()"
    cli_args = [db_path, sql, "--functions", TEST_FUNCTIONS]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    assert outcome.exit_code == 0
    registered = {row["name"] for row in json.loads(outcome.output.strip())}
    for public_name in ("zero", "one", "two"):
        assert public_name in registered
    assert "_two" not in registered


# Binary fixture stored in a BLOB column by test_query_json_binary, which
# expects the CLI to emit it as a {"$base64": true, "encoded": ...} object.
# NOTE(review): the name and the leading b"x\x9c" bytes suggest zlib-compressed
# lorem ipsum text - confirm before relying on that.
LOREM_IPSUM_COMPRESSED = (
    b"x\x9c\xed\xd1\xcdq\x03!\x0c\x05\xe0\xbb\xabP\x01\x1eW\x91\xdc|M\x01\n\xc8\x8e"
    b"f\xf83H\x1e\x97\x1f\x91M\x8e\xe9\xe0\xdd\x96\x05\x84\xf4\xbek\x9fRI\xc7\xf2J"
    b"\xb9\x97>i\xa9\x11W\xb13\xa5\xde\x96$\x13\xf3I\x9cu\xe8J\xda\xee$EcsI\x8e\x0b"
    b"$\xea\xab\xf6L&u\xc4emI\xb3foFnT\xf83\xca\x93\xd8QZ\xa8\xf2\xbd1q\xd1\x87\xf3"
    b"\x85>\x8c\xa4i\x8d\xdaTu\x7f<c\xc9\xf5L\x0f\xd7E\xad/\x9b\x9eI^2\x93\x1a\x9b"
    b"\xf6F^\n\xd7\xd4\x8f\xca\xfb\x90.\xdd/\xfd\x94\xd4\x11\x87I8\x1a\xaf\xd1S?\x06"
    b"\x88\xa7\xecBo\xbb$\xbb\t\xe9\xf4\xe8\xe4\x98U\x1bM\x19S\xbe\xa4e\x991x\xfc"
    b"x\xf6\xe2#\x9e\x93h'&%YK(i)\x7f\t\xc5@N7\xbf+\x1b\xb5\xdd\x10\r\x9e\xb1\xf0"
    b"y\xa1\xf7W\x92a\xe2;\xc6\xc8\xa0\xa7\xc4\x92\xe2\\\xf2\xa1\x99m\xdf\x88)\xc6"
    b"\xec\x9a\xa5\xed\x14wR\xf1h\xf22x\xcfM\xfdv\xd3\xa4LY\x96\xcc\xbd[{\xd9m\xf0"
    b"\x0eH#\x8e\xf5\x9b\xab\xd7\xcb\xe9t\x05\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03"
    b"\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03"
    b"\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03"
    b"\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\x03"
    b"\x1f\xf8\xc0\x07>\xf0\x81\x0f|\xe0\xfb\x8f\xef\x1b\x9b\x06\x83}"
)


def test_query_json_binary(db_path):
    """A BLOB value is emitted as a ``{"$base64": true, "encoded": ...}`` object."""
    db = Database(db_path)
    file_record = {
        "name": "lorem.txt",
        "sz": 16984,
        "data": LOREM_IPSUM_COMPRESSED,
    }
    with db.conn:
        db["files"].insert(file_record, pk="name")
    outcome = CliRunner().invoke(
        cli.cli, [db_path, "select name, sz, data from files"]
    )
    assert outcome.exit_code == 0, str(outcome)
    expected_encoding = (
        "eJzt0c1xAyEMBeC7q1ABHleR3HxNAQrIjmb4M0gelx+RTY7p4N2WBYT0vmufUknH"
        "8kq5lz5pqRFXsTOl3pYkE/NJnHXoStruJEVjc0mOCyTqq/ZMJnXEZW1Js2ZvRm5U+"
        "DPKk9hRWqjyvTFx0YfzhT6MpGmN2lR1fzxjyfVMD9dFrS+bnkleMpMam/ZGXgrX1I"
        "/K+5Au3S/9lNQRh0k4Gq/RUz8GiKfsQm+7JLsJ6fTo5JhVG00ZU76kZZkxePx49uI"
        "jnpNoJyYlWUsoaSl/CcVATje/Kxu13RANnrHweaH3V5Jh4jvGyKCnxJLiXPKhmW3f"
        "iCnG7Jql7RR3UvFo8jJ4z039dtOkTFmWzL1be9lt8A5II471m6vXy+l0BR/4wAc+8"
        "IEPfOADH/jABz7wgQ984AMf+MAHPvCBD3zgAx/4wAc+8IEPfOADH/jABz7wgQ984A"
        "Mf+MAHPvCBD3zgAx/4wAc+8IEPfOADH/jABz7wgQ984PuP7xubBoN9"
    )
    rows = json.loads(outcome.output.strip())
    assert rows == [
        {
            "name": "lorem.txt",
            "sz": 16984,
            "data": {"$base64": True, "encoded": expected_encoding},
        }
    ]


@pytest.mark.parametrize(
    "sql,params,expected",
    [
        ("select 1 + 1 as out", {"p": "2"}, 2),
        ("select 1 + :p as out", {"p": "2"}, 3),
        (
            "select :hello as out",
            {"hello": """This"has'many'quote"s"""},
            """This"has'many'quote"s""",
        ),
    ],
)
def test_query_params(db_path, sql, params, expected):
    """Named parameters passed with ``-p NAME VALUE`` are bound into the query."""
    # Flatten {"name": "value"} into ["-p", "name", "value", ...]
    param_flags = [
        token for name, value in params.items() for token in ("-p", name, value)
    ]
    outcome = CliRunner().invoke(cli.cli, [db_path, sql, *param_flags])
    assert outcome.exit_code == 0, str(outcome)
    rows = json.loads(outcome.output.strip())
    assert rows == [{"out": expected}]


def test_query_json_with_json_cols(db_path):
    """``--json-cols`` emits JSON-valued columns as nested JSON instead of strings."""
    db = Database(db_path)
    dog = {
        "id": 1,
        "name": "Cleo",
        "friends": [{"name": "Pancakes"}, {"name": "Bailey"}],
    }
    with db.conn:
        db["dogs"].insert(dog)
    sql = "select id, name, friends from dogs"

    # Default: the friends column comes back as an escaped JSON string
    as_string = r'[{"id": 1, "name": "Cleo", "friends": "[{\"name\": \"Pancakes\"}, {\"name\": \"Bailey\"}]"}]'
    plain = CliRunner().invoke(cli.cli, [db_path, sql])
    assert plain.output.strip() == as_string

    # With --json-cols:
    expected = r'[{"id": 1, "name": "Cleo", "friends": [{"name": "Pancakes"}, {"name": "Bailey"}]}]'
    nested = CliRunner().invoke(cli.cli, [db_path, sql, "--json-cols"])
    assert nested.output.strip() == expected

    # Test rows command too
    from_rows = CliRunner().invoke(cli.cli, ["rows", db_path, "dogs", "--json-cols"])
    assert from_rows.output.strip() == expected


@pytest.mark.parametrize(
    "content,is_binary",
    [(b"\x00\x0Fbinary", True), ("this is text", False), (1, False), (1.5, False)],
)
def test_query_raw(db_path, content, is_binary):
    """``--raw`` writes the single selected value without any formatting."""
    Database(db_path)["files"].insert({"content": content})
    cli_args = [db_path, "select content from files", "--raw"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    if not is_binary:
        assert outcome.output == str(content)
        return
    assert outcome.stdout_bytes == content


@pytest.mark.parametrize(
    "content,is_binary",
    [(b"\x00\x0Fbinary", True), ("this is text", False), (1, False), (1.5, False)],
)
def test_query_raw_lines(db_path, content, is_binary):
    """``--raw-lines`` writes each selected value followed by a newline."""
    Database(db_path)["files"].insert_all({"content": content} for _ in range(3))
    cli_args = [db_path, "select content from files", "--raw-lines"]
    outcome = CliRunner().invoke(cli.cli, cli_args)
    if not is_binary:
        # Three rows, each value on its own newline-terminated line.
        assert outcome.output == (str(content) + "\n") * 3
        return
    assert outcome.stdout_bytes == (content + b"\n") * 3


def test_query_memory_does_not_create_file(tmpdir):
    """Querying ``:memory:`` leaves no file behind, unlike querying ``foo.db``."""
    original_cwd = os.getcwd()
    sql = "select sqlite_version()"
    try:
        os.chdir(tmpdir)
        # This should create a foo.db file
        CliRunner().invoke(cli.cli, ["foo.db", sql])
        # This should NOT create a file
        outcome = CliRunner().invoke(cli.cli, [":memory:", sql])
        first_row = json.loads(outcome.output)[0]
        assert list(first_row.keys()) == ["sqlite_version()"]
    finally:
        # Restore the working directory whether or not the assertions passed.
        os.chdir(original_cwd)
    assert os.listdir(tmpdir) == ["foo.db"]


@pytest.mark.parametrize(
    "args,expected",
    [
        (
            [],
            '[{"id": 1, "name": "Cleo", "age": 4},\n {"id": 2, "name": "Pancakes", "age": 2}]',
        ),
        (
            ["--nl"],
            '{"id": 1, "name": "Cleo", "age": 4}\n{"id": 2, "name": "Pancakes", "age": 2}',
        ),
        (["--arrays"], '[[1, "Cleo", 4],\n [2, "Pancakes", 2]]'),
        (["--arrays", "--nl"], '[1, "Cleo", 4]\n[2, "Pancakes", 2]'),
        (
            ["--nl", "-c", "age", "-c", "name"],
            '{"age": 4, "name": "Cleo"}\n{"age": 2, "name": "Pancakes"}',
        ),
        # --limit and --offset
        (["-c", "name", "--limit", "1"], '[{"name": "Cleo"}]'),
        (["-c", "name", "--limit", "1", "--offset", "1"], '[{"name": "Pancakes"}]'),
        # --where, with and without named parameters
        (["-c", "name", "--where", "id = 1"], '[{"name": "Cleo"}]'),
        (["-c", "name", "--where", "id = :id", "-p", "id", "1"], '[{"name": "Cleo"}]'),
        (
            ["-c", "name", "--where", "id = :id", "--param", "id", "1"],
            '[{"name": "Cleo"}]',
        ),
        # --order
        (["-c", "id", "--order", "id desc", "--limit", "1"], '[{"id": 2}]'),
        (["-c", "id", "--order", "id", "--limit", "1"], '[{"id": 1}]'),
    ],
)
def test_rows(db_path, args, expected):
    """Run the "rows" command with each option set and compare its output."""
    dog_records = [
        {"id": 1, "age": 4, "name": "Cleo"},
        {"id": 2, "age": 2, "name": "Pancakes"},
    ]
    db = Database(db_path)
    with db.conn:
        # column_order fixes the column sequence the expected output relies on.
        db["dogs"].insert_all(dog_records, column_order=("id", "name", "age"))
    command = ["rows", db_path, "dogs", *args]
    outcome = CliRunner().invoke(cli.cli, command)
    assert expected == outcome.output.strip()


def test_upsert(db_path, tmpdir):
    """Insert two dogs, then upsert new ages and check the names survive."""
    json_path = str(tmpdir / "dogs.json")
    db = Database(db_path)

    def run(command):
        # Both steps share the same arguments apart from the command name.
        return CliRunner().invoke(
            cli.cli,
            [command, db_path, "dogs", json_path, "--pk", "id"],
            catch_exceptions=False,
        )

    write_json(
        json_path,
        [
            {"id": 1, "name": "Cleo", "age": 4},
            {"id": 2, "name": "Nixie", "age": 4},
        ],
    )
    inserted = run("insert")
    assert inserted.exit_code == 0, inserted.output
    assert 2 == db["dogs"].count
    # Now run the upsert with records that only carry the id and a new age.
    write_json(json_path, [{"id": 1, "age": 5}, {"id": 2, "age": 5}])
    upserted = run("upsert")
    assert upserted.exit_code == 0, upserted.output
    stored = list(db.query("select * from dogs order by id"))
    assert stored == [
        {"id": 1, "name": "Cleo", "age": 5},
        {"id": 2, "name": "Nixie", "age": 5},
    ]


def test_upsert_pk_required(db_path, tmpdir):
    """Upsert without --pk exits with code 2 and a missing-option error."""
    json_path = str(tmpdir / "dogs.json")
    write_json(
        json_path,
        [
            {"id": 1, "name": "Cleo", "age": 4},
            {"id": 2, "name": "Nixie", "age": 4},
        ],
    )
    command = ["upsert", db_path, "dogs", json_path]
    outcome = CliRunner().invoke(cli.cli, command, catch_exceptions=False)
    assert outcome.exit_code == 2
    assert "Error: Missing option '--pk'" in outcome.output


def test_upsert_analyze(db_path, tmpdir):
    """Upsert with --analyze should make the sqlite_stat1 table appear."""
    db = Database(db_path)
    rows_table = db["rows"]
    rows_table.insert({"id": 1, "foo": "x", "n": 3}, pk="id")
    rows_table.create_index(["n"])
    # Precondition: no statistics table exists before the command runs.
    assert "sqlite_stat1" not in db.table_names()
    command = ["upsert", db_path, "rows", "-", "--nl", "--analyze", "--pk", "id"]
    outcome = CliRunner().invoke(
        cli.cli, command, input='{"id": 2, "foo": "bar", "n": 1}'
    )
    assert outcome.exit_code == 0, outcome.output
    assert "sqlite_stat1" in db.table_names()


def test_upsert_flatten(tmpdir):
    """Upsert with --flatten stores a nested key as a nested_two column."""
    db_path = str(tmpdir / "flat.db")
    db = Database(db_path)
    db["upsert_me"].insert({"id": 1, "name": "Example"}, pk="id")
    payload = json.dumps({"id": 1, "nested": {"two": 2}})
    command = [
        "upsert",
        db_path,
        "upsert_me",
        "-",
        "--flatten",
        "--pk",
        "id",
        "--alter",
    ]
    outcome = CliRunner().invoke(cli.cli, command, input=payload)
    assert outcome.exit_code == 0
    stored = list(db.query("select * from upsert_me"))
    assert stored == [{"id": 1, "name": "Example", "nested_two": 2}]


def test_upsert_alter(db_path, tmpdir):
    """Upserting an unknown column fails unless --alter is passed."""
    json_path = str(tmpdir / "dogs.json")
    db = Database(db_path)
    runner = CliRunner()
    base_args = [db_path, "dogs", json_path, "--pk", "id"]

    write_json(json_path, [{"id": 1, "name": "Cleo"}])
    inserted = runner.invoke(cli.cli, ["insert"] + base_args)
    assert inserted.exit_code == 0, inserted.output

    # The upsert payload introduces an "age" column the table does not have.
    write_json(json_path, [{"id": 1, "age": 5}])
    upsert_command = ["upsert"] + base_args

    # Without --alter the command exits with an error describing the SQL.
    failed = runner.invoke(cli.cli, upsert_command)
    assert failed.exit_code == 1
    expected_error = (
        "Error: no such column: age\n\n"
        "sql = UPDATE [dogs] SET [age] = ? WHERE [id] = ?\n"
        "parameters = [5, 1]"
    )
    assert expected_error == failed.output.strip()

    # With --alter the same payload is accepted.
    altered = runner.invoke(cli.cli, upsert_command + ["--alter"])
    assert altered.exit_code == 0
    stored = list(db.query("select * from dogs order by id"))
    assert stored == [
        {"id": 1, "name": "Cleo", "age": 5},
    ]


@pytest.mark.parametrize(
    "args,schema",
    [
        # No primary key
        (
            ["name", "text", "age", "integer"],
            ("CREATE TABLE [t] (\n   [name] TEXT,\n   [age] INTEGER\n)"),
        ),
        # All types:
        (
            [
                "id",
                "integer",
                "name",
                "text",
                "age",
                "integer",
                "weight",
                "float",
                "thumbnail",
                "blob",
                "--pk",
                "id",
            ],
            (
                "CREATE TABLE [t] (\n"
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [name] TEXT,\n"
                "   [age] INTEGER,\n"
                "   [weight] FLOAT,\n"
                "   [thumbnail] BLOB\n"
                ")"
            ),
        ),
        # Not null:
        (
            ["name", "text", "--not-null", "name"],
            ("CREATE TABLE [t] (\n" "   [name] TEXT NOT NULL\n" ")"),
        ),
        # Default:
        (
            ["age", "integer", "--default", "age", "3"],
            ("CREATE TABLE [t] (\n" "   [age] INTEGER DEFAULT '3'\n" ")"),
        ),
        # Compound primary key
        (
            ["category", "text", "name", "text", "--pk", "category", "--pk", "name"],
            (
                "CREATE TABLE [t] (\n   [category] TEXT,\n   [name] TEXT,\n"
                "   PRIMARY KEY ([category], [name])\n)"
            ),
        ),
    ],
)
def test_create_table(args, schema):
    """Run create-table for table "t" and compare the resulting schema."""
    runner = CliRunner()
    command = ["create-table", "test.db", "t", *args]
    with runner.isolated_filesystem():
        outcome = runner.invoke(cli.cli, command, catch_exceptions=False)
        assert outcome.exit_code == 0
        created = Database("test.db")["t"]
        assert schema == created.schema


def test_create_table_foreign_key():
    """create-table --fk should add a REFERENCES clause to the column."""
    runner = CliRunner()
    authors_args = ["authors", "id", "integer", "name", "text", "--pk", "id"]
    books_args = [
        "books",
        "id",
        "integer",
        "title",
        "text",
        "author_id",
        "integer",
        "--pk",
        "id",
        "--fk",
        "author_id",
        "authors",
        "id",
    ]
    expected_authors = (
        "CREATE TABLE [authors] (\n"
        "   [id] INTEGER PRIMARY KEY,\n"
        "   [name] TEXT\n"
        ")"
    )
    expected_books = (
        "CREATE TABLE [books] (\n"
        "   [id] INTEGER PRIMARY KEY,\n"
        "   [title] TEXT,\n"
        "   [author_id] INTEGER REFERENCES [authors]([id])\n"
        ")"
    )
    with runner.isolated_filesystem():
        # authors is created first because books references it.
        for table_args in (authors_args, books_args):
            outcome = runner.invoke(
                cli.cli,
                ["create-table", "books.db", *table_args],
                catch_exceptions=False,
            )
            assert outcome.exit_code == 0
        db = Database("books.db")
        assert expected_authors == db["authors"].schema
        assert expected_books == db["books"].schema


def test_create_table_error_if_table_exists():
    """create-table on an existing table exits 1 and suggests --replace."""
    runner = CliRunner()
    command = ["create-table", "test.db", "dogs", "id", "integer"]
    expected_message = (
        'Error: Table "dogs" already exists. Use --replace to delete and replace it.'
    )
    with runner.isolated_filesystem():
        # Inserting a row creates the table up front.
        Database("test.db")["dogs"].insert({"name": "Cleo"})
        outcome = runner.invoke(cli.cli, command)
        assert outcome.exit_code == 1
        assert expected_message == outcome.output.strip()


def test_create_table_ignore():
    """create-table --ignore leaves an existing table's schema untouched."""
    runner = CliRunner()
    command = ["create-table", "test.db", "dogs", "id", "integer", "--ignore"]
    with runner.isolated_filesystem():
        db = Database("test.db")
        db["dogs"].insert({"name": "Cleo"})
        outcome = runner.invoke(cli.cli, command)
        assert outcome.exit_code == 0
        # The schema is still the one produced by the insert above.
        assert "CREATE TABLE [dogs] (\n   [name] TEXT\n)" == db["dogs"].schema


def test_create_table_replace():
    """create-table --replace swaps an existing table for the new definition."""
    runner = CliRunner()
    command = ["create-table", "test.db", "dogs", "id", "integer", "--replace"]
    with runner.isolated_filesystem():
        db = Database("test.db")
        db["dogs"].insert({"name": "Cleo"})
        outcome = runner.invoke(cli.cli, command)
        assert outcome.exit_code == 0
        # The schema now reflects the columns given on the command line.
        assert "CREATE TABLE [dogs] (\n   [id] INTEGER\n)" == db["dogs"].schema


def test_create_view():
    """create-view should define a view with the given SQL."""
    runner = CliRunner()
    command = ["create-view", "test.db", "version", "select sqlite_version()"]
    with runner.isolated_filesystem():
        db = Database("test.db")
        outcome = runner.invoke(cli.cli, command)
        assert outcome.exit_code == 0
        view_schema = db["version"].schema
        assert "CREATE VIEW version AS select sqlite_version()" == view_schema


def test_create_view_error_if_view_exists():
    """create-view on an existing view exits 1 and suggests --replace."""
    runner = CliRunner()
    command = ["create-view", "test.db", "version", "select sqlite_version()"]
    expected_message = (
        'Error: View "version" already exists. Use --replace to delete and replace it.'
    )
    with runner.isolated_filesystem():
        # Define a view with the same name before invoking the command.
        Database("test.db").create_view("version", "select sqlite_version() + 1")
        outcome = runner.invoke(cli.cli, command)
        assert outcome.exit_code == 1
        assert expected_message == outcome.output.strip()


def test_create_view_ignore():
    """create-view --ignore keeps the definition of an existing view."""
    runner = CliRunner()
    command = [
        "create-view",
        "test.db",
        "version",
        "select sqlite_version()",
        "--ignore",
    ]
    with runner.isolated_filesystem():
        db = Database("test.db")
        db.create_view("version", "select sqlite_version() + 1")
        outcome = runner.invoke(cli.cli, command)
        assert outcome.exit_code == 0
        # The original "+ 1" definition must still be in place.
        view_schema = db["version"].schema
        assert "CREATE VIEW version AS select sqlite_version() + 1" == view_schema


def test_create_view_replace():
    """create-view --replace overwrites the definition of an existing view."""
    runner = CliRunner()
    command = [
        "create-view",
        "test.db",
        "version",
        "select sqlite_version()",
        "--replace",
    ]
    with runner.isolated_filesystem():
        db = Database("test.db")
        db.create_view("version", "select sqlite_version() + 1")
        outcome = runner.invoke(cli.cli, command)
        assert outcome.exit_code == 0
        # The "+ 1" from the original definition is gone.
        view_schema = db["version"].schema
        assert "CREATE VIEW version AS select sqlite_version()" == view_schema


def test_drop_table():
    """drop-table removes an existing table."""
    runner = CliRunner()
    command = ["drop-table", "test.db", "t"]
    with runner.isolated_filesystem():
        db = Database("test.db")
        db["t"].create({"pk": int}, pk="pk")
        # Precondition: the table exists before the command runs.
        assert "t" in db.table_names()
        outcome = runner.invoke(cli.cli, command)
        assert outcome.exit_code == 0
        assert "t" not in db.table_names()


def test_drop_table_error():
    """drop-table on a missing table exits 1 unless --ignore is given."""
    runner = CliRunner()
    command = ["drop-table", "test.db", "t2"]
    with runner.isolated_filesystem():
        # Only table "t" exists; the command targets "t2".
        Database("test.db")["t"].create({"pk": int}, pk="pk")
        failed = runner.invoke(cli.cli, command)
        assert failed.exit_code == 1
        assert 'Error: Table "t2" does not exist' == failed.output.strip()
        # Using --ignore suppresses that error
        ignored = runner.invoke(cli.cli, command + ["--ignore"])
        assert ignored.exit_code == 0


def test_drop_view():
    """drop-view removes an existing view."""
    runner = CliRunner()
    command = ["drop-view", "test.db", "hello"]
    with runner.isolated_filesystem():
        db = Database("test.db")
        db.create_view("hello", "select 1")
        # Precondition: the view exists before the command runs.
        assert "hello" in db.view_names()
        outcome = runner.invoke(cli.cli, command)
        assert outcome.exit_code == 0
        assert "hello" not in db.view_names()


def test_drop_view_error():
    """drop-view on a missing view exits 1 unless --ignore is given."""
    runner = CliRunner()
    command = ["drop-view", "test.db", "t2"]
    with runner.isolated_filesystem():
        # The database only holds a table "t"; no view named "t2" exists.
        Database("test.db")["t"].create({"pk": int}, pk="pk")
        failed = runner.invoke(cli.cli, command)
        assert failed.exit_code == 1
        assert 'Error: View "t2" does not exist' == failed.output.strip()
        # Using --ignore suppresses that error
        ignored = runner.invoke(cli.cli, command + ["--ignore"])
        assert ignored.exit_code == 0


def test_enable_wal():
    """enable-wal switches every listed database to WAL journal mode."""
    runner = CliRunner()
    db_names = ["test.db", "test2.db"]
    with runner.isolated_filesystem():
        for name in db_names:
            fresh = Database(name)
            fresh["t"].create({"pk": int}, pk="pk")
            # Each database starts out in "delete" journal mode.
            assert fresh.journal_mode == "delete"
        outcome = runner.invoke(
            cli.cli, ["enable-wal", *db_names], catch_exceptions=False
        )
        assert outcome.exit_code == 0
        for name in db_names:
            assert Database(name).journal_mode == "wal"


def test_disable_wal():
    """disable-wal switches every listed database back to "delete" mode."""
    runner = CliRunner()
    db_names = ["test.db", "test2.db"]
    with runner.isolated_filesystem():
        for name in db_names:
            prepared = Database(name)
            prepared["t"].create({"pk": int}, pk="pk")
            # Put the database into WAL mode so the command has work to do.
            prepared.enable_wal()
            assert prepared.journal_mode == "wal"
        outcome = runner.invoke(cli.cli, ["disable-wal", *db_names])
        assert outcome.exit_code == 0
        for name in db_names:
            assert Database(name).journal_mode == "delete"


@pytest.mark.parametrize(
    "args,expected",
    [
        ([], '[{"rows_affected": 1}]'),
        (["-t"], "rows_affected\n---------------\n              1"),
    ],
)
def test_query_update(db_path, args, expected):
    """An UPDATE run via the query command reports rows_affected and applies."""
    db = Database(db_path)
    with db.conn:
        db["dogs"].insert_all([{"id": 1, "age": 4, "name": "Cleo"}])
    update_sql = "update dogs set age = 5 where name = 'Cleo'"
    outcome = CliRunner().invoke(cli.cli, [db_path, update_sql, *args])
    assert expected == outcome.output.strip()
    # The change must be visible in the database afterwards.
    stored = list(db.query("select * from dogs"))
    assert stored == [
        {"id": 1, "age": 5, "name": "Cleo"},
    ]


def test_add_foreign_keys(db_path):
    """add-foreign-keys accepts several four-argument key definitions at once."""
    db = Database(db_path)
    db["countries"].insert({"id": 7, "name": "Panama"}, pk="id")
    db["authors"].insert({"id": 3, "name": "Matilda", "country_id": 7}, pk="id")
    db["books"].insert({"id": 2, "title": "Wolf anatomy", "author_id": 3}, pk="id")
    # Neither table has any foreign keys to begin with.
    for table_name in ("authors", "books"):
        assert db[table_name].foreign_keys == []

    # Each key is given as: table, column, other table, other column.
    authors_fk = ["authors", "country_id", "countries", "id"]
    books_fk = ["books", "author_id", "authors", "id"]
    command = ["add-foreign-keys", db_path] + authors_fk + books_fk
    outcome = CliRunner().invoke(cli.cli, command)
    assert outcome.exit_code == 0

    expected_authors_fk = ForeignKey(
        table="authors",
        column="country_id",
        other_table="countries",
        other_column="id",
    )
    expected_books_fk = ForeignKey(
        table="books",
        column="author_id",
        other_table="authors",
        other_column="id",
    )
    assert db["authors"].foreign_keys == [expected_authors_fk]
    assert db["books"].foreign_keys == [expected_books_fk]


@pytest.mark.parametrize(
    "args,expected_schema",
    [
        (
            [],
            (
                'CREATE TABLE "dogs" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [age] INTEGER NOT NULL DEFAULT '1',\n"
                "   [name] TEXT\n"
                ")"
            ),
        ),
        (
            ["--type", "age", "text"],
            (
                'CREATE TABLE "dogs" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [age] TEXT NOT NULL DEFAULT '1',\n"
                "   [name] TEXT\n"
                ")"
            ),
        ),
        (
            ["--drop", "age"],
            (
                'CREATE TABLE "dogs" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [name] TEXT\n"
                ")"
            ),
        ),
        (
            ["--rename", "age", "age2", "--rename", "id", "pk"],
            (
                'CREATE TABLE "dogs" (\n'
                "   [pk] INTEGER PRIMARY KEY,\n"
                "   [age2] INTEGER NOT NULL DEFAULT '1',\n"
                "   [name] TEXT\n"
                ")"
            ),
        ),
        (
            ["--not-null", "name"],
            (
                'CREATE TABLE "dogs" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [age] INTEGER NOT NULL DEFAULT '1',\n"
                "   [name] TEXT NOT NULL\n"
                ")"
            ),
        ),
        (
            ["--not-null-false", "age"],
            (
                'CREATE TABLE "dogs" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [age] INTEGER DEFAULT '1',\n"
                "   [name] TEXT\n"
                ")"
            ),
        ),
        (
            ["--pk", "name"],
            (
                'CREATE TABLE "dogs" (\n'
                "   [id] INTEGER,\n"
                "   [age] INTEGER NOT NULL DEFAULT '1',\n"
                "   [name] TEXT PRIMARY KEY\n"
                ")"
            ),
        ),
        (
            ["--pk-none"],
            (
                'CREATE TABLE "dogs" (\n'
                "   [id] INTEGER,\n"
                "   [age] INTEGER NOT NULL DEFAULT '1',\n"
                "   [name] TEXT\n"
                ")"
            ),
        ),
        (
            ["--default", "name", "Turnip"],
            (
                'CREATE TABLE "dogs" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [age] INTEGER NOT NULL DEFAULT '1',\n"
                "   [name] TEXT DEFAULT 'Turnip'\n"
                ")"
            ),
        ),
        (
            ["--default-none", "age"],
            (
                'CREATE TABLE "dogs" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [age] INTEGER NOT NULL,\n"
                "   [name] TEXT\n"
                ")"
            ),
        ),
        (
            ["-o", "name", "--column-order", "age", "-o", "id"],
            (
                'CREATE TABLE "dogs" (\n'
                "   [name] TEXT,\n"
                "   [age] INTEGER NOT NULL DEFAULT '1',\n"
                "   [id] INTEGER PRIMARY KEY\n"
                ")"
            ),
        ),
    ],
)
def test_transform(db_path, args, expected_schema):
    """Run "transform" on a dogs table and compare the resulting schema.

    The table is created by inserting one row with ``id`` as primary key and
    ``age`` declared NOT NULL with a default of 1; each parametrized case
    passes a different set of transform options in ``args``.
    """
    db = Database(db_path)
    with db.conn:
        db["dogs"].insert(
            {"id": 1, "age": 4, "name": "Cleo"},
            not_null={"age"},
            defaults={"age": 1},
            pk="id",
        )
    result = CliRunner().invoke(cli.cli, ["transform", db_path, "dogs"] + args)
    # Attach the CLI output to the assertion so a failure shows the error
    # text without relying on a stray debug print.
    assert result.exit_code == 0, result.output
    schema = db["dogs"].schema
    assert schema == expected_schema


@pytest.mark.parametrize(
    "extra_args,expected_schema",
    (
        (
            ["--drop-foreign-key", "country"],
            (
                'CREATE TABLE "places" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [name] TEXT,\n"
                "   [country] INTEGER,\n"
                "   [city] INTEGER REFERENCES [city]([id]),\n"
                "   [continent] INTEGER\n"
                ")"
            ),
        ),
        (
            ["--drop-foreign-key", "country", "--drop-foreign-key", "city"],
            (
                'CREATE TABLE "places" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [name] TEXT,\n"
                "   [country] INTEGER,\n"
                "   [city] INTEGER,\n"
                "   [continent] INTEGER\n"
                ")"
            ),
        ),
        (
            ["--add-foreign-key", "continent", "continent", "id"],
            (
                'CREATE TABLE "places" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [name] TEXT,\n"
                "   [country] INTEGER REFERENCES [country]([id]),\n"
                "   [city] INTEGER REFERENCES [city]([id]),\n"
                "   [continent] INTEGER REFERENCES [continent]([id])\n"
                ")"
            ),
        ),
    ),
)
def test_transform_add_or_drop_foreign_key(db_path, extra_args, expected_schema):
    """Check transform's --drop-foreign-key and --add-foreign-key options."""
    place = {
        "id": 32,
        "name": "Caveau de la Huchette",
        "country": 1,
        "city": 24,
        "continent": 1,
    }
    db = Database(db_path)
    with db.conn:
        # Three lookup tables exist, but places only declares foreign keys
        # for country and city; continent is left for --add-foreign-key.
        db["continent"].insert({"id": 1, "name": "Europe"}, pk="id")
        db["country"].insert({"id": 1, "name": "France"}, pk="id")
        db["city"].insert({"id": 24, "name": "Paris"}, pk="id")
        db["places"].insert(place, foreign_keys=("country", "city"), pk="id")
    command = ["transform", db_path, "places", *extra_args]
    outcome = CliRunner().invoke(cli.cli, command)
    assert outcome.exit_code == 0
    assert db["places"].schema == expected_schema


# Schema of the [species] lookup table expected by several test_extract cases.
_common_other_schema = (
    "CREATE TABLE [species] (\n"
    "   [id] INTEGER PRIMARY KEY,\n"
    "   [species] TEXT\n"
    ")"
)


@pytest.mark.parametrize(
    "args,expected_table_schema,expected_other_schema",
    [
        (
            [],
            (
                'CREATE TABLE "trees" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [address] TEXT,\n"
                "   [species_id] INTEGER REFERENCES [species]([id])\n"
                ")"
            ),
            _common_other_schema,
        ),
        (
            ["--table", "custom_table"],
            (
                'CREATE TABLE "trees" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [address] TEXT,\n"
                "   [custom_table_id] INTEGER REFERENCES [custom_table]([id])\n"
                ")"
            ),
            "CREATE TABLE [custom_table] (\n   [id] INTEGER PRIMARY KEY,\n   [species] TEXT\n)",
        ),
        (
            ["--fk-column", "custom_fk"],
            (
                'CREATE TABLE "trees" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [address] TEXT,\n"
                "   [custom_fk] INTEGER REFERENCES [species]([id])\n"
                ")"
            ),
            _common_other_schema,
        ),
        (
            ["--rename", "name", "name2"],
            (
                'CREATE TABLE "trees" (\n'
                "   [id] INTEGER PRIMARY KEY,\n"
                "   [address] TEXT,\n"
                "   [species_id] INTEGER REFERENCES [species]([id])\n"
                ")"
            ),
            # Same lookup-table schema as the default case.
            _common_other_schema,
        ),
    ],
)
def test_extract(db_path, args, expected_table_schema, expected_other_schema):
    """Run "extract" on trees.species and check both resulting schemas.

    ``expected_table_schema`` is the schema of the rewritten ``trees`` table;
    ``expected_other_schema`` is the schema of the lookup table the command
    created.
    """
    db = Database(db_path)
    with db.conn:
        db["trees"].insert(
            {"id": 1, "address": "4 Park Ave", "species": "Palm"},
            pk="id",
        )
    result = CliRunner().invoke(
        cli.cli, ["extract", db_path, "trees", "species"] + args
    )
    # Attach the CLI output to the assertion so a failure shows the error
    # text without relying on a stray debug print.
    assert result.exit_code == 0, result.output
    schema = db["trees"].schema
    assert schema == expected_table_schema
    # The lookup table is whichever table is neither "trees" nor one of the
    # Gosh/Gosh2 tables excluded by name here.
    other_tables = [t for t in db.tables if t.name not in ("trees", "Gosh", "Gosh2")]
    other_schema = other_tables[0].schema
    assert other_schema == expected_other_schema


def test_insert_encoding(tmpdir):
    """Latin-1 CSV input fails by default and works with --encoding latin-1."""
    db_path = str(tmpdir / "test.db")
    csv_path = str(tmpdir / "test.csv")
    latin1_csv = (
        b"date,name,latitude,longitude\n"
        b"2020-01-01,Barra da Lagoa,-27.574,-48.422\n"
        b"2020-03-04,S\xe3o Paulo,-23.561,-46.645\n"
        b"2020-04-05,Salta,-24.793:-65.408"
    )
    # Sanity check: the bytes really are Latin-1 for "São Paulo".
    third_line = latin1_csv.decode("latin-1").split("\n")[2]
    assert third_line.split(",")[1] == "São Paulo"
    with open(csv_path, "wb") as fp:
        fp.write(latin1_csv)

    base_command = ["insert", db_path, "places", csv_path]

    # First attempt should error:
    bad_result = CliRunner().invoke(
        cli.cli, base_command + ["--csv"], catch_exceptions=False
    )
    assert bad_result.exit_code == 1
    encoding_hint = "The input you provided uses a character encoding other than utf-8"
    assert encoding_hint in bad_result.output

    # Using --encoding=latin-1 should work
    good_result = CliRunner().invoke(
        cli.cli,
        base_command + ["--encoding", "latin-1", "--csv"],
        catch_exceptions=False,
    )
    assert good_result.exit_code == 0

    expected_rows = [
        {
            "date": "2020-01-01",
            "name": "Barra da Lagoa",
            "latitude": "-27.574",
            "longitude": "-48.422",
        },
        {
            "date": "2020-03-04",
            "name": "São Paulo",
            "latitude": "-23.561",
            "longitude": "-46.645",
        },
        # The last CSV line has a ":" where a "," would be, so the expected
        # row has the whole value in latitude and no longitude.
        {
            "date": "2020-04-05",
            "name": "Salta",
            "latitude": "-24.793:-65.408",
            "longitude": None,
        },
    ]
    assert list(Database(db_path)["places"].rows) == expected_rows


@pytest.mark.parametrize("fts", ["FTS4", "FTS5"])
@pytest.mark.parametrize(
    "extra_arg,expected",
    [
        (None, '[{"rowid": 2, "id": 2, "title": "Title the second"}]\n'),
        ("--csv", "rowid,id,title\n2,2,Title the second\n"),
    ],
)
def test_search(tmpdir, fts, extra_arg, expected):
    """Search an FTS-enabled table for "second", as JSON and as CSV."""
    db_path = str(tmpdir / "test.db")
    articles = Database(db_path)["articles"]
    articles.insert_all(
        [
            {"id": 1, "title": "Title the first"},
            {"id": 2, "title": "Title the second"},
            {"id": 3, "title": "Title the third"},
        ],
        pk="id",
    )
    articles.enable_fts(["title"], fts_version=fts)
    command = ["search", db_path, "articles", "second"]
    if extra_arg:
        command.append(extra_arg)
    outcome = CliRunner().invoke(cli.cli, command, catch_exceptions=False)
    assert outcome.exit_code == 0
    # Carriage returns are stripped before comparing with the expected text.
    assert outcome.output.replace("\r", "") == expected


def test_search_quote(tmpdir):
    """A search term with a stray quote fails unless --quote is passed."""
    db_path = str(tmpdir / "test.db")
    db = Database(db_path)
    db["creatures"].insert({"name": "dog."}).enable_fts(["name"])
    runner = CliRunner()
    command = ["search", db_path, "creatures", 'dog"']
    # Without --quote should return an error
    error_result = runner.invoke(cli.cli, command)
    assert error_result.exit_code == 1
    expected_error = (
        "Error: unterminated string\n\n"
        "Try running this again with the --quote option\n"
    )
    assert error_result.output == expected_error
    # With --quote it should work
    quoted_result = runner.invoke(cli.cli, command + ["--quote"])
    assert quoted_result.exit_code == 0
    assert quoted_result.output.strip() == '[{"rowid": 1, "name": "dog."}]'


def test_indexes(tmpdir):
    """Check the "indexes" command output with and without --aux."""
    db_path = str(tmpdir / "test.db")
    db = Database(db_path)
    db.conn.executescript(
        """
        create table Gosh (c1 text, c2 text, c3 text);
        create index Gosh_idx on Gosh(c2, c3 desc);
    """
    )

    def index_row(seqno, cid, name, desc, key):
        # Every expected row shares the table, index name and collation.
        return {
            "table": "Gosh",
            "index_name": "Gosh_idx",
            "seqno": seqno,
            "cid": cid,
            "name": name,
            "desc": desc,
            "coll": "BINARY",
            "key": key,
        }

    key_rows = [
        index_row(0, 1, "c2", 0, 1),
        index_row(1, 2, "c3", 1, 1),
    ]
    # The extra row expected only when --aux is passed.
    aux_row = index_row(2, -1, None, 0, 0)

    runner = CliRunner()
    plain = runner.invoke(cli.cli, ["indexes", db_path], catch_exceptions=False)
    assert plain.exit_code == 0
    assert json.loads(plain.output) == key_rows

    with_aux = runner.invoke(
        cli.cli, ["indexes", db_path, "--aux"], catch_exceptions=False
    )
    assert with_aux.exit_code == 0
    assert json.loads(with_aux.output) == key_rows + [aux_row]


# JSON that test_triggers expects the "triggers" command to print for "blah".
_TRIGGERS_EXPECTED = (
    '[{"name": "blah", "table": "articles", '
    '"sql": "CREATE TRIGGER blah AFTER INSERT ON articles\\n'
    "BEGIN\\n"
    "    UPDATE counter SET count = count + 1;\\n"
    'END"}]\n'
)


@pytest.mark.parametrize(
    "extra_args,expected",
    [
        ([], _TRIGGERS_EXPECTED),
        (["articles"], _TRIGGERS_EXPECTED),
        (["counter"], "[]\n"),
    ],
)
def test_triggers(tmpdir, extra_args, expected):
    """Check "triggers" output with no table, and with each table named."""
    db_path = str(tmpdir / "test.db")
    db = Database(db_path)
    db["articles"].insert({"id": 1, "title": "Title the first"}, pk="id")
    db["counter"].insert({"count": 1})
    # The expected output embeds this SQL text, so its layout matters.
    trigger_sql = textwrap.dedent(
        """
        CREATE TRIGGER blah AFTER INSERT ON articles
        BEGIN
            UPDATE counter SET count = count + 1;
        END
    """
    )
    db.conn.execute(trigger_sql)
    command = ["triggers", db_path, *extra_args]
    outcome = CliRunner().invoke(cli.cli, command, catch_exceptions=False)
    assert outcome.exit_code == 0
    assert outcome.output == expected


@pytest.mark.parametrize(
    "options,expected",
    [
        (
            [],
            "CREATE TABLE [dogs] (\n"
            "   [id] INTEGER,\n"
            "   [name] TEXT\n"
            ");\n"
            "CREATE TABLE [chickens] (\n"
            "   [id] INTEGER,\n"
            "   [name] TEXT,\n"
            "   [breed] TEXT\n"
            ");\n"
            "CREATE INDEX [idx_chickens_breed]\n"
            "    ON [chickens] ([breed]);\n",
        ),
        (
            ["dogs"],
            "CREATE TABLE [dogs] (\n"
            "   [id] INTEGER,\n"
            "   [name] TEXT\n"
            ")\n",
        ),
        (
            ["chickens", "dogs"],
            "CREATE TABLE [chickens] (\n"
            "   [id] INTEGER,\n"
            "   [name] TEXT,\n"
            "   [breed] TEXT\n"
            ")\n"
            "CREATE TABLE [dogs] (\n"
            "   [id] INTEGER,\n"
            "   [name] TEXT\n"
            ")\n",
        ),
    ],
)
def test_schema(tmpdir, options, expected):
    """``schema`` prints the whole schema, or just the tables named in ``options``."""
    db_path = str(tmpdir / "test.db")
    database = Database(db_path)
    dogs, chickens = database["dogs"], database["chickens"]
    dogs.create({"id": int, "name": str})
    chickens.create({"id": int, "name": str, "breed": str})
    chickens.create_index(["breed"])

    outcome = CliRunner().invoke(
        cli.cli,
        ["schema", db_path, *options],
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0
    assert outcome.output == expected


def test_long_csv_column_value(tmpdir):
    """A CSV field of 131073 characters is imported without truncation."""
    db_path = str(tmpdir / "test.db")
    csv_path = str(tmpdir / "test.csv")
    # 131073 is one more than 131072, the csv module's default field size limit.
    long_string = "a" * 131073
    with open(csv_path, "w") as csv_file:
        csv_file.writelines(["id,text\n", "1,{}\n".format(long_string)])

    outcome = CliRunner().invoke(
        cli.cli,
        ["insert", db_path, "bigtable", csv_path, "--csv"],
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0

    imported = list(Database(db_path)["bigtable"].rows)
    assert len(imported) == 1
    assert imported[0]["text"] == long_string


@pytest.mark.parametrize(
    "args,tsv",
    [
        (["--csv", "--no-headers"], False),
        (["--no-headers"], False),
        (["--tsv", "--no-headers"], True),
    ],
)
def test_import_no_headers(tmpdir, args, tsv):
    """With ``--no-headers`` every line is data and columns are ``untitled_N``."""
    db_path = str(tmpdir / "test.db")
    csv_path = str(tmpdir / "test.csv")
    delimiter = "\t" if tsv else ","
    file_lines = [
        "Cleo{sep}Dog{sep}5\n".format(sep=delimiter),
        "Tracy{sep}Spider{sep}7\n".format(sep=delimiter),
    ]
    with open(csv_path, "w") as csv_file:
        csv_file.writelines(file_lines)

    outcome = CliRunner().invoke(
        cli.cli,
        ["insert", db_path, "creatures", csv_path, *args],
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0, outcome.output

    creatures = Database(db_path)["creatures"]
    expected_schema = (
        "CREATE TABLE [creatures] (\n"
        "   [untitled_1] TEXT,\n"
        "   [untitled_2] TEXT,\n"
        "   [untitled_3] TEXT\n"
        ")"
    )
    assert creatures.schema == expected_schema
    assert list(creatures.rows) == [
        {"untitled_1": "Cleo", "untitled_2": "Dog", "untitled_3": "5"},
        {"untitled_1": "Tracy", "untitled_2": "Spider", "untitled_3": "7"},
    ]


def test_attach(tmpdir):
    """``--attach ALIAS PATH`` lets a query reference a second database file.

    Two single-row databases are created, then a ``union all`` query run
    against the first one with the second attached as ``bar`` must return
    both rows.
    """
    foo_path = str(tmpdir / "foo.db")
    bar_path = str(tmpdir / "bar.db")
    db = Database(foo_path)
    with db.conn:
        db["foo"].insert({"id": 1, "text": "foo"})
    db2 = Database(bar_path)
    with db2.conn:
        db2["bar"].insert({"id": 1, "text": "bar"})
    db.attach("bar", bar_path)
    sql = "select * from foo union all select * from bar.bar"
    result = CliRunner().invoke(
        cli.cli,
        [foo_path, "--attach", "bar", bar_path, sql],
        catch_exceptions=False,
    )
    # Check the exit code first: if the command fails, the CLI's own error
    # text is reported instead of an opaque JSON decoding error below.
    assert result.exit_code == 0, result.output
    assert json.loads(result.output) == [
        {"id": 1, "text": "foo"},
        {"id": 1, "text": "bar"},
    ]


def test_csv_insert_bom(tmpdir):
    """A UTF-8 BOM is kept with ``--encoding utf-8`` and dropped by default."""
    db_path = str(tmpdir / "test.db")
    bom_csv_path = str(tmpdir / "bom.csv")
    with open(bom_csv_path, "wb") as bom_file:
        bom_file.write(b"\xef\xbb\xbfname,age\nCleo,5")

    def insert_csv(table, *extra_options):
        # Extra options go before "--csv" to keep the argument order stable.
        return CliRunner().invoke(
            cli.cli,
            ["insert", db_path, table, bom_csv_path, *extra_options, "--csv"],
            catch_exceptions=False,
        )

    # Explicit utf-8 leaves the BOM as part of the first column name.
    assert insert_csv("broken", "--encoding", "utf-8").exit_code == 0
    assert insert_csv("fixed").exit_code == 0

    schemas = Database(db_path).execute("select name, sql from sqlite_master")
    assert schemas.fetchall() == [
        ("broken", "CREATE TABLE [broken] (\n   [\ufeffname] TEXT,\n   [age] TEXT\n)"),
        ("fixed", "CREATE TABLE [fixed] (\n   [name] TEXT,\n   [age] TEXT\n)"),
    ]


@pytest.mark.parametrize("option_or_env_var", (None, "-d", "--detect-types"))
def test_insert_detect_types(tmpdir, option_or_env_var):
    """Type detection on insert works via ``-d``, ``--detect-types`` or env var.

    A ``None`` parameter means no option is passed and
    ``SQLITE_UTILS_DETECT_TYPES=1`` is set in the environment instead.
    """
    db_path = str(tmpdir / "test.db")
    data = "name,age,weight\nCleo,6,45.5\nDori,1,3.5"
    cli_args = ["insert", db_path, "creatures", "-", "--csv"]
    if option_or_env_var:
        cli_args.append(option_or_env_var)

    def insert_and_check():
        outcome = CliRunner().invoke(
            cli.cli,
            cli_args,
            catch_exceptions=False,
            input=data,
        )
        assert outcome.exit_code == 0
        creatures = Database(db_path)["creatures"]
        assert list(creatures.rows) == [
            {"name": "Cleo", "age": 6, "weight": 45.5},
            {"name": "Dori", "age": 1, "weight": 3.5},
        ]

    if option_or_env_var is not None:
        insert_and_check()
        return

    # No option given: enable detection through the environment variable.
    with mock.patch.dict(os.environ, {"SQLITE_UTILS_DETECT_TYPES": "1"}):
        insert_and_check()


@pytest.mark.parametrize("option", ("-d", "--detect-types"))
def test_upsert_detect_types(tmpdir, option):
    """``upsert`` with ``-d``/``--detect-types`` stores numeric CSV values as numbers."""
    db_path = str(tmpdir / "test.db")
    csv_input = "id,name,age,weight\n1,Cleo,6,45.5\n2,Dori,1,3.5"
    cli_args = ["upsert", db_path, "creatures", "-", "--csv", "--pk", "id", option]

    outcome = CliRunner().invoke(
        cli.cli,
        cli_args,
        catch_exceptions=False,
        input=csv_input,
    )
    assert outcome.exit_code == 0

    stored = list(Database(db_path)["creatures"].rows)
    assert stored == [
        {"id": 1, "name": "Cleo", "age": 6, "weight": 45.5},
        {"id": 2, "name": "Dori", "age": 1, "weight": 3.5},
    ]


def test_integer_overflow_error(tmpdir):
    """Inserting an oversized integer exits 1 and prints the SQL and parameters."""
    db_path = str(tmpdir / "test.db")
    payload = json.dumps({"bignumber": 34223049823094832094802398430298048240})
    expected_output = (
        "Error: Python int too large to convert to SQLite INTEGER\n\n"
        "sql = INSERT INTO [items] ([bignumber]) VALUES (?);\n"
        "parameters = [34223049823094832094802398430298048240]\n"
    )

    outcome = CliRunner().invoke(
        cli.cli,
        ["insert", db_path, "items", "-"],
        input=payload,
    )

    assert outcome.exit_code == 1
    assert outcome.output == expected_output


def test_python_dash_m():
    """The tool can be run as ``python -m sqlite_utils``."""
    command = [sys.executable, "-m", "sqlite_utils", "--help"]
    completed = subprocess.run(command, stdout=subprocess.PIPE)
    assert completed.returncode == 0
    help_text = completed.stdout
    assert b"Commands for interacting with a SQLite database" in help_text


@pytest.mark.parametrize("enable_wal", (False, True))
def test_create_database(tmpdir, enable_wal):
    """``create-database`` writes a SQLite file, in WAL mode when requested."""
    db_path = tmpdir / "test.db"
    assert not db_path.exists()

    wal_flag = ["--enable-wal"] if enable_wal else []
    outcome = CliRunner().invoke(
        cli.cli, ["create-database", str(db_path)] + wal_flag
    )
    assert outcome.exit_code == 0, outcome.output
    assert db_path.exists()

    # The first 16 bytes of the new file must be the SQLite header string.
    header = db_path.read_binary()[:16]
    assert header == b"SQLite format 3\x00"

    expected_mode = "wal" if enable_wal else "delete"
    assert Database(str(db_path)).journal_mode == expected_mode


@pytest.mark.parametrize(
    "options,expected",
    [
        (
            [],
            [
                {"tbl": "two_indexes", "idx": "idx_two_indexes_species", "stat": "1 1"},
                {"tbl": "two_indexes", "idx": "idx_two_indexes_name", "stat": "1 1"},
                {"tbl": "one_index", "idx": "idx_one_index_name", "stat": "1 1"},
            ],
        ),
        (
            ["one_index"],
            [{"tbl": "one_index", "idx": "idx_one_index_name", "stat": "1 1"}],
        ),
        (
            ["idx_two_indexes_name"],
            [{"tbl": "two_indexes", "idx": "idx_two_indexes_name", "stat": "1 1"}],
        ),
    ],
)
def test_analyze(tmpdir, options, expected):
    """``analyze`` fills ``sqlite_stat1`` for everything, one table, or one index."""
    db_path = str(tmpdir / "test.db")
    database = Database(db_path)

    one_index = database["one_index"]
    one_index.insert({"id": 1, "name": "Cleo"}, pk="id")
    one_index.create_index(["name"])

    two_indexes = database["two_indexes"]
    two_indexes.insert({"id": 1, "name": "Cleo", "species": "dog"}, pk="id")
    two_indexes.create_index(["name"])
    two_indexes.create_index(["species"])

    outcome = CliRunner().invoke(cli.cli, ["analyze", db_path, *options])
    assert outcome.exit_code == 0
    stats = list(database["sqlite_stat1"].rows)
    assert stats == expected


def test_rename_table(tmpdir):
    """``rename-table`` errors on a missing table unless ``--ignore`` is given."""
    db_path = str(tmpdir / "test.db")
    database = Database(db_path)
    database["one"].insert({"id": 1, "name": "Cleo"}, pk="id")

    def rename(*cli_args):
        return CliRunner().invoke(
            cli.cli,
            ["rename-table", db_path, *cli_args],
            catch_exceptions=False,
        )

    # A table that does not exist is reported as an error...
    missing = rename("missing", "two")
    assert missing.exit_code == 1
    assert missing.output == (
        'Error: Table "missing" could not be renamed. no such table: missing\n'
    )

    # ...unless --ignore is passed.
    ignored = rename("missing", "two", "--ignore")
    assert ignored.exit_code == 0

    # An existing table keeps its columns under the new name.
    columns_before = database["one"].columns_dict
    renamed = rename("one", "two")
    assert renamed.exit_code == 0
    assert database["two"].columns_dict == columns_before


def test_duplicate_table(tmpdir):
    """``duplicate`` copies columns and rows; a missing source needs ``--ignore``."""
    db_path = str(tmpdir / "test.db")
    database = Database(db_path)
    database["one"].insert({"id": 1, "name": "Cleo"}, pk="id")

    def duplicate(*cli_args):
        return CliRunner().invoke(
            cli.cli,
            ["duplicate", db_path, *cli_args],
            catch_exceptions=False,
        )

    # A source table that does not exist is reported as an error...
    missing = duplicate("missing", "two")
    assert missing.exit_code == 1
    assert missing.output == 'Error: Table "missing" does not exist\n'

    # ...unless --ignore is passed.
    ignored = duplicate("missing", "two", "--ignore")
    assert ignored.exit_code == 0

    # An existing table is copied with the same columns and the same rows.
    copied = duplicate("one", "two")
    assert copied.exit_code == 0
    assert database["one"].columns_dict == database["two"].columns_dict
    assert list(database["one"].rows) == list(database["two"].rows)


@pytest.mark.skipif(not _has_compiled_ext(), reason="Requires compiled ext.c")
@pytest.mark.parametrize(
    "entrypoint,should_pass,should_fail",
    (
        # One-element tuples need the trailing comma: ("b") is just the
        # string "b", which only worked because it is a single character.
        (None, ("a",), ("b", "c")),
        ("sqlite3_ext_b_init", ("b",), ("a", "c")),
        ("sqlite3_ext_c_init", ("c",), ("a", "b")),
    ),
)
def test_load_extension(entrypoint, should_pass, should_fail):
    """``--load-extension PATH[:ENTRYPOINT]`` exposes only the expected functions.

    ``should_pass`` names SQL functions whose call must exit 0 with the given
    entrypoint; ``should_fail`` names functions whose call must exit 1.
    """
    ext = COMPILED_EXTENSION_PATH
    if entrypoint:
        ext += ":" + entrypoint
    for funcs, expected_exit_code in ((should_pass, 0), (should_fail, 1)):
        for func in funcs:
            result = CliRunner().invoke(
                cli.cli,
                ["memory", "select {}()".format(func), "--load-extension", ext],
                catch_exceptions=False,
            )
            # Include the CLI output so a wrong exit code is easy to diagnose.
            assert result.exit_code == expected_exit_code, result.output


@pytest.mark.parametrize("strict", (False, True))
def test_create_table_strict(strict):
    """``create-table --strict`` creates a strict table where SQLite supports it."""
    runner = CliRunner()
    cli_args = ["create-table", "test.db", "items", "id", "integer", "w", "float"]
    if strict:
        cli_args.append("--strict")

    with runner.isolated_filesystem():
        db = Database("test.db")
        outcome = runner.invoke(cli.cli, cli_args)
        assert outcome.exit_code == 0
        # When strict tables are unsupported the strict flag is not checked.
        assert db["items"].strict == strict or not db.supports_strict
        # Should have a floating point column
        expected_columns = {"id": int, "w": float}
        assert db["items"].columns_dict == expected_columns


@pytest.mark.parametrize("method", ("insert", "upsert"))
@pytest.mark.parametrize("strict", (False, True))
def test_insert_upsert_strict(tmpdir, method, strict):
    """``insert``/``upsert`` with ``--strict`` create a strict table where supported."""
    db_path = str(tmpdir / "test.db")
    cli_args = [method, db_path, "items", "-", "--csv", "--pk", "id"]
    if strict:
        cli_args.append("--strict")

    outcome = CliRunner().invoke(cli.cli, cli_args, input="id\n1")
    assert outcome.exit_code == 0

    database = Database(db_path)
    # When strict tables are unsupported the strict flag is not checked.
    assert database["items"].strict == strict or not database.supports_strict
