import base64
import click
from click_default_group import DefaultGroup  # type: ignore
from datetime import datetime
import hashlib
import pathlib
from runpy import run_module
import sqlite_utils
from sqlite_utils.db import AlterError, BadMultiValues, DescIndex, NoTable
from sqlite_utils.plugins import pm, get_plugins
from sqlite_utils.utils import maximize_csv_field_size_limit
from sqlite_utils import recipes
import textwrap
import inspect
import io
import itertools
import json
import os
import pdb
import sys
import csv as csv_std
import tabulate
from .utils import (
    OperationalError,
    _compile_code,
    chunks,
    file_progress,
    find_spatialite,
    flatten as _flatten,
    sqlite3,
    decode_base64_values,
    progressbar,
    rows_from_file,
    Format,
    TypeTracker,
)

try:
    import trogon  # type: ignore
except ImportError:
    trogon = None


# Passed to the top-level click group as context_settings so that -h is
# accepted as an alias for --help
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])

# Upper-case column type names.
# NOTE(review): not referenced in this part of the file - presumably used to
# validate user-supplied column types; confirm against the callers.
VALID_COLUMN_TYPES = ("INTEGER", "TEXT", "FLOAT", "BLOB")

# Message template shown when the input is not valid utf-8. The leading {}
# placeholder is filled in by the caller (presumably with the underlying
# decoding error - TODO confirm). .strip() removes the blank first/last lines.
UNICODE_ERROR = """
{}

The input you provided uses a character encoding other than utf-8.

You can fix this by passing the --encoding= option with the encoding of the file.

If you do not know the encoding, running 'file filename.csv' may tell you.

It's often worth trying: --encoding=latin-1
""".strip()

# Runs once, at import time.
# NOTE(review): judging by its name this raises the csv module's field size
# limit so very large cells can be read - confirm in sqlite_utils.utils.
maximize_csv_field_size_limit()


class CaseInsensitiveChoice(click.Choice):
    """A ``click.Choice`` that compares values without regard to case.

    Both the permitted choices and each incoming value are lower-cased
    before being handed to the regular ``click.Choice`` machinery.
    """

    def __init__(self, choices):
        lowered = [option.lower() for option in choices]
        super().__init__(lowered)

    def convert(self, value, param, ctx):
        normalized = value.lower()
        return super().convert(normalized, param, ctx)


def output_options(fn):
    """Decorate *fn* with the output-format options shared by several commands.

    Adds --nl, --arrays, --csv, --tsv, --no-headers, -t/--table, --fmt and
    --json-cols, and returns the decorated function.
    """
    known_formats = ", ".join(tabulate.tabulate_formats)
    shared_options = (
        click.option(
            "--nl",
            help="Output newline-delimited JSON",
            is_flag=True,
            default=False,
        ),
        click.option(
            "--arrays",
            help="Output rows as arrays instead of objects",
            is_flag=True,
            default=False,
        ),
        click.option("--csv", is_flag=True, help="Output CSV"),
        click.option("--tsv", is_flag=True, help="Output TSV"),
        click.option("--no-headers", is_flag=True, help="Omit CSV headers"),
        click.option(
            "-t", "--table", is_flag=True, help="Output as a formatted table"
        ),
        click.option(
            "--fmt",
            help="Table format - one of {}".format(known_formats),
        ),
        click.option(
            "--json-cols",
            help="Detect JSON cols and output them as JSON, not escaped strings",
            is_flag=True,
            default=False,
        ),
    )
    # Walk the tuple backwards so the first entry is applied last, exactly as
    # if it had been written as the topmost decorator in a stack
    for add_option in reversed(shared_options):
        fn = add_option(fn)
    return fn


def load_extension_option(fn):
    """Decorate *fn* with the repeatable ``--load-extension`` option."""
    add_option = click.option(
        "--load-extension",
        multiple=True,
        help="Path to SQLite extension, with optional :entrypoint",
    )
    return add_option(fn)


# Root command group. It is built with DefaultGroup, configured with
# default="query" and default_if_no_args=True.
@click.group(
    cls=DefaultGroup,
    default="query",
    default_if_no_args=True,
    context_settings=CONTEXT_SETTINGS,
)
@click.version_option()
def cli():
    "Commands for interacting with a SQLite database"
    # Intentionally empty: the group only hosts the commands registered on it


# trogon is an optional dependency (it is None when the guarded import above
# failed); when it is available, wrap the CLI group with its tui() decorator
if trogon is not None:
    cli = trogon.tui()(cli)


@cli.command()
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.option(
    "--fts4", help="Just show FTS4 enabled tables", default=False, is_flag=True
)
@click.option(
    "--fts5", help="Just show FTS5 enabled tables", default=False, is_flag=True
)
@click.option(
    "--counts", help="Include row counts per table", default=False, is_flag=True
)
@output_options
@click.option(
    "--columns",
    help="Include list of columns for each table",
    is_flag=True,
    default=False,
)
@click.option(
    "--schema",
    help="Include schema for each table",
    is_flag=True,
    default=False,
)
@load_extension_option
def tables(
    path,
    fts4,
    fts5,
    counts,
    nl,
    arrays,
    csv,
    tsv,
    no_headers,
    table,
    fmt,
    json_cols,
    columns,
    schema,
    load_extension,
    views=False,
):
    """List the tables in the database

    Example:

    \b
        sqlite-utils tables trees.db
    """
    # views= is not a CLI option: the "views" command passes views=True when it
    # re-uses this callback to list views instead of tables
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)

    # The first column is always the name; the others are opt-in
    headers = ["view" if views else "table"]
    for enabled, label in (
        (counts, "count"),
        (columns, "columns"),
        (schema, "schema"),
    ):
        if enabled:
            headers.append(label)

    def _iter():
        # Yield one list per table/view, in the same column order as headers
        names = db.view_names() if views else db.table_names(fts4=fts4, fts5=fts5)
        for name in names:
            row = [name]
            if counts:
                row.append(db[name].count)
            if columns:
                column_names = [column.name for column in db[name].columns]
                # For --csv the column names share one cell, newline separated
                row.append("\n".join(column_names) if csv else column_names)
            if schema:
                row.append(db[name].schema)
            yield row

    if table or fmt:
        print(tabulate.tabulate(_iter(), headers=headers, tablefmt=fmt or "simple"))
        return

    if csv or tsv:
        dialect = "excel-tab" if tsv else "excel"
        writer = csv_std.writer(sys.stdout, dialect=dialect)
        if not no_headers:
            writer.writerow(headers)
        for row in _iter():
            writer.writerow(row)
        return

    # Default output: JSON, shaped by the --nl / --arrays / --json-cols flags
    for line in output_rows(_iter(), headers, nl, arrays, json_cols):
        click.echo(line)


@cli.command()
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.option(
    "--counts", help="Include row counts per view", default=False, is_flag=True
)
@output_options
@click.option(
    "--columns",
    help="Include list of columns for each view",
    is_flag=True,
    default=False,
)
@click.option(
    "--schema",
    help="Include schema for each view",
    is_flag=True,
    default=False,
)
@load_extension_option
def views(
    path,
    counts,
    nl,
    arrays,
    csv,
    tsv,
    no_headers,
    table,
    fmt,
    json_cols,
    columns,
    schema,
    load_extension,
):
    """List the views in the database

    Example:

    \b
        sqlite-utils views trees.db
    """
    # Everything the user can set is forwarded unchanged to the "tables"
    # command's callback
    forwarded = dict(
        path=path,
        counts=counts,
        nl=nl,
        arrays=arrays,
        csv=csv,
        tsv=tsv,
        no_headers=no_headers,
        table=table,
        fmt=fmt,
        json_cols=json_cols,
        columns=columns,
        schema=schema,
        load_extension=load_extension,
    )
    # The FTS filters are table-only, so they are switched off; views=True
    # makes the shared callback list views
    tables.callback(fts4=False, fts5=False, views=True, **forwarded)


@cli.command()
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("tables", nargs=-1)
@click.option("--no-vacuum", help="Don't run VACUUM", default=False, is_flag=True)
@load_extension_option
def optimize(path, tables, no_vacuum, load_extension):
    """Optimize all full-text search tables and then run VACUUM - should shrink the database file

    Example:

    \b
        sqlite-utils optimize chickens.db
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    # With no tables named, fall back to every FTS4 table followed by every
    # FTS5 table
    targets = tables or (db.table_names(fts4=True) + db.table_names(fts5=True))
    with db.conn:
        for name in targets:
            db[name].optimize()
    if no_vacuum:
        return
    db.vacuum()


@cli.command(name="rebuild-fts")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("tables", nargs=-1)
@load_extension_option
def rebuild_fts(path, tables, load_extension):
    """Rebuild all or specific full-text search tables

    Example:

    \b
        sqlite-utils rebuild-fts chickens.db chickens
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    # With no tables named, fall back to every FTS4 table followed by every
    # FTS5 table
    targets = tables or (db.table_names(fts4=True) + db.table_names(fts5=True))
    with db.conn:
        for name in targets:
            db[name].rebuild_fts()


@cli.command()
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("names", nargs=-1)
def analyze(path, names):
    """Run ANALYZE against the whole database, or against specific named indexes and tables

    Example:

    \b
        sqlite-utils analyze chickens.db
    """
    db = sqlite_utils.Database(path)
    try:
        if names:
            for name in names:
                db.analyze(name)
        else:
            db.analyze()
    except OperationalError as e:
        # ClickException expects a str message: handing it the exception
        # object makes str() on the ClickException raise TypeError
        raise click.ClickException(str(e)) from e


@cli.command()
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
def vacuum(path):
    """Run VACUUM against the database

    Example:

    \b
        sqlite-utils vacuum chickens.db
    """
    db = sqlite_utils.Database(path)
    db.vacuum()


@cli.command()
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@load_extension_option
def dump(path, load_extension):
    """Output a SQL dump of the schema and full contents of the database

    Example:

    \b
        sqlite-utils dump chickens.db
    """
    database = sqlite_utils.Database(path)
    _load_extensions(database, load_extension)
    # Echo the dump one item at a time, as iterdump() yields it
    for statement in database.iterdump():
        click.echo(statement)


@cli.command(name="add-column")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.argument("col_name")
@click.argument(
    "col_type",
    type=click.Choice(
        ["integer", "int", "float", "text", "str", "blob", "bytes"],
        case_sensitive=False,
    ),
    required=False,
)
@click.option(
    "--fk", type=str, required=False, help="Table to reference as a foreign key"
)
@click.option(
    "--fk-col",
    type=str,
    required=False,
    help="Referenced column on that foreign key table - if omitted will automatically use the primary key",
)
@click.option(
    "--not-null-default",
    type=str,
    required=False,
    help="Add NOT NULL DEFAULT 'TEXT' constraint",
)
@click.option(
    "--ignore",
    is_flag=True,
    help="If column already exists, do nothing",
)
@load_extension_option
def add_column(
    path,
    table,
    col_name,
    col_type,
    fk,
    fk_col,
    not_null_default,
    ignore,
    load_extension,
):
    """Add a column to the specified table

    Example:

    \b
        sqlite-utils add-column chickens.db chickens weight float
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    target = db[table]
    try:
        target.add_column(
            col_name,
            col_type,
            fk=fk,
            fk_col=fk_col,
            not_null_default=not_null_default,
        )
    except OperationalError as ex:
        # With --ignore every OperationalError is swallowed silently
        if ignore:
            return
        raise click.ClickException(str(ex))


@cli.command(name="add-foreign-key")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.argument("column")
@click.argument("other_table", required=False)
@click.argument("other_column", required=False)
@click.option(
    "--ignore",
    is_flag=True,
    help="If foreign key already exists, do nothing",
)
@load_extension_option
def add_foreign_key(
    path, table, column, other_table, other_column, ignore, load_extension
):
    """
    Add a new foreign key constraint to an existing table

    Example:

        sqlite-utils add-foreign-key my.db books author_id authors id
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    try:
        # other_table / other_column are optional arguments, so they may be
        # None here; they are passed through as given
        db[table].add_foreign_key(column, other_table, other_column, ignore=ignore)
    except AlterError as e:
        # ClickException expects a str message: handing it the exception
        # object makes str() on the ClickException raise TypeError
        raise click.ClickException(str(e)) from e


@cli.command(name="add-foreign-keys")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("foreign_key", nargs=-1)
@load_extension_option
def add_foreign_keys(path, foreign_key, load_extension):
    """
    Add multiple new foreign key constraints to a database

    Example:

    \b
        sqlite-utils add-foreign-keys my.db \\
            books author_id authors id \\
            authors country_id countries id
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    if len(foreign_key) % 4 != 0:
        raise click.ClickException(
            "Each foreign key requires four values: table, column, other_table, other_column"
        )
    # Group the flat argument list into consecutive
    # (table, column, other_table, other_column) tuples
    tuples = [
        tuple(foreign_key[start : start + 4])
        for start in range(0, len(foreign_key), 4)
    ]
    try:
        db.add_foreign_keys(tuples)
    except AlterError as e:
        # ClickException expects a str message: handing it the exception
        # object makes str() on the ClickException raise TypeError
        raise click.ClickException(str(e)) from e


@cli.command(name="index-foreign-keys")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@load_extension_option
def index_foreign_keys(path, load_extension):
    """
    Ensure every foreign key column has an index on it

    Example:

    \b
        sqlite-utils index-foreign-keys chickens.db
    """
    database = sqlite_utils.Database(path)
    # Extensions are loaded before the database is touched
    _load_extensions(database, load_extension)
    database.index_foreign_keys()


@cli.command(name="create-index")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.argument("column", nargs=-1, required=True)
@click.option("--name", help="Explicit name for the new index")
@click.option("--unique", help="Make this a unique index", default=False, is_flag=True)
@click.option(
    "--if-not-exists",
    "--ignore",
    help="Ignore if index already exists",
    default=False,
    is_flag=True,
)
@click.option(
    "--analyze",
    help="Run ANALYZE after creating the index",
    is_flag=True,
)
@load_extension_option
def create_index(
    path, table, column, name, unique, if_not_exists, analyze, load_extension
):
    """
    Add an index to the specified table for the specified columns

    Example:

    \b
        sqlite-utils create-index chickens.db chickens name

    To create an index in descending order:

    \b
        sqlite-utils create-index chickens.db chickens -- -name
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    # A leading "-" marks a descending column: strip it and wrap the remaining
    # name in DescIndex; other names are passed through untouched
    index_columns = [
        DescIndex(col[1:]) if col.startswith("-") else col for col in column
    ]
    db[table].create_index(
        index_columns,
        index_name=name,
        unique=unique,
        if_not_exists=if_not_exists,
        analyze=analyze,
    )


@cli.command(name="enable-fts")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.argument("column", nargs=-1, required=True)
@click.option("--fts4", help="Use FTS4", default=False, is_flag=True)
@click.option("--fts5", help="Use FTS5", default=False, is_flag=True)
@click.option("--tokenize", help="Tokenizer to use, e.g. porter")
@click.option(
    "--create-triggers",
    help="Create triggers to update the FTS tables when the parent table changes.",
    default=False,
    is_flag=True,
)
@click.option(
    "--replace",
    is_flag=True,
    help="Replace existing FTS configuration if it exists",
)
@load_extension_option
def enable_fts(
    path, table, column, fts4, fts5, tokenize, create_triggers, replace, load_extension
):
    """Enable full-text search for specific table and columns

    Example:

    \b
        sqlite-utils enable-fts chickens.db chickens name
    """
    if fts4 and fts5:
        # The two flags are mutually exclusive: report on stderr and stop
        # before the database is opened
        click.echo("Can only use one of --fts4 or --fts5", err=True)
        return
    # FTS5 is the default when neither flag is given
    fts_version = "FTS4" if fts4 else "FTS5"

    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    try:
        db[table].enable_fts(
            column,
            fts_version=fts_version,
            tokenize=tokenize,
            create_triggers=create_triggers,
            replace=replace,
        )
    except OperationalError as ex:
        # ClickException expects a str message: handing it the exception
        # object makes str() on the ClickException raise TypeError
        raise click.ClickException(str(ex)) from ex


@cli.command(name="populate-fts")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.argument("column", nargs=-1, required=True)
@load_extension_option
def populate_fts(path, table, column, load_extension):
    """Re-populate full-text search for specific table and columns

    Example:

    \b
        sqlite-utils populate-fts chickens.db chickens name
    """
    database = sqlite_utils.Database(path)
    _load_extensions(database, load_extension)
    # column is the tuple of every column name given on the command line
    target = database[table]
    target.populate_fts(column)


@cli.command(name="disable-fts")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@load_extension_option
def disable_fts(path, table, load_extension):
    """Disable full-text search for specific table

    Example:

    \b
        sqlite-utils disable-fts chickens.db chickens
    """
    database = sqlite_utils.Database(path)
    _load_extensions(database, load_extension)
    target = database[table]
    target.disable_fts()


@cli.command(name="enable-wal")
@click.argument(
    "path",
    nargs=-1,
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@load_extension_option
def enable_wal(path, load_extension):
    """Enable WAL for database files

    Example:

    \b
        sqlite-utils enable-wal chickens.db
    """
    # path is a tuple here (nargs=-1): each file is opened and switched in turn
    for db_path in path:
        database = sqlite_utils.Database(db_path)
        _load_extensions(database, load_extension)
        database.enable_wal()


@cli.command(name="disable-wal")
@click.argument(
    "path",
    nargs=-1,
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@load_extension_option
def disable_wal(path, load_extension):
    """Disable WAL for database files

    Example:

    \b
        sqlite-utils disable-wal chickens.db
    """
    # path is a tuple here (nargs=-1): each file is opened and switched in turn
    for db_path in path:
        database = sqlite_utils.Database(db_path)
        _load_extensions(database, load_extension)
        database.disable_wal()


@cli.command(name="enable-counts")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("tables", nargs=-1)
@load_extension_option
def enable_counts(path, tables, load_extension):
    """Configure triggers to update a _counts table with row counts

    Example:

    \b
        sqlite-utils enable-counts chickens.db
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    if not tables:
        # No tables named: use the database-level call
        db.enable_counts()
        return
    # Validate every name up front so nothing is changed when any is missing
    missing = [name for name in tables if not db[name].exists()]
    if missing:
        raise click.ClickException("Invalid tables: {}".format(missing))
    for name in tables:
        db[name].enable_counts()


@cli.command(name="reset-counts")
@click.argument(
    "path",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@load_extension_option
def reset_counts(path, load_extension):
    """Reset calculated counts in the _counts table

    Example:

    \b
        sqlite-utils reset-counts chickens.db
    """
    database = sqlite_utils.Database(path)
    # Extensions are loaded before the database is touched
    _load_extensions(database, load_extension)
    database.reset_counts()


# Input-format options, kept as a tuple of click.option decorators so they can
# be applied on their own (see import_options) or spliced into a longer
# sequence of decorators (see insert_upsert_options).
_import_options = (
    click.option(
        "--flatten",
        is_flag=True,
        help='Flatten nested JSON objects, so {"a": {"b": 1}} becomes {"a_b": 1}',
    ),
    click.option("--nl", is_flag=True, help="Expect newline-delimited JSON"),
    click.option("-c", "--csv", is_flag=True, help="Expect CSV input"),
    click.option("--tsv", is_flag=True, help="Expect TSV input"),
    click.option("--empty-null", is_flag=True, help="Treat empty strings as NULL"),
    click.option(
        "--lines",
        is_flag=True,
        help="Treat each line as a single value called 'line'",
    ),
    click.option(
        "--text",
        is_flag=True,
        help="Treat input as a single value called 'text'",
    ),
    click.option("--convert", help="Python code to convert each item"),
    # Exposed to the command function as "imports" ("import" is a keyword)
    click.option(
        "--import",
        "imports",
        type=str,
        multiple=True,
        help="Python modules to import",
    ),
    click.option("--delimiter", help="Delimiter to use for CSV files"),
    click.option("--quotechar", help="Quote character to use for CSV/TSV"),
    click.option("--sniff", is_flag=True, help="Detect delimiter and quote character"),
    click.option("--no-headers", is_flag=True, help="CSV file has no header row"),
    click.option(
        "--encoding",
        help="Character encoding for input, defaults to utf-8",
    ),
)


def import_options(fn):
    """Decorate *fn* with every option in ``_import_options``.

    The tuple is walked backwards so its first entry is applied last, as if
    it were the topmost decorator in a stack.
    """
    decorated = fn
    for add_option in reversed(_import_options):
        decorated = add_option(decorated)
    return decorated


def insert_upsert_options(*, require_pk=False):
    """Build a decorator adding the arguments and options of the insert-style commands.

    :param require_pk: used as ``required=`` for the ``--pk`` option
    :return: a decorator that takes the command function and returns it decorated
    """

    def inner(fn):
        leading_params = (
            click.argument(
                "path",
                type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
                required=True,
            ),
            click.argument("table"),
            click.argument("file", type=click.File("rb"), required=True),
            click.option(
                "--pk",
                help="Columns to use as the primary key, e.g. id",
                multiple=True,
                required=require_pk,
            ),
        )
        trailing_options = (
            click.option(
                "--batch-size", type=int, default=100, help="Commit every X records"
            ),
            click.option("--stop-after", type=int, help="Stop after X records"),
            click.option(
                "--alter",
                is_flag=True,
                help="Alter existing table to add any missing columns",
            ),
            click.option(
                "--not-null",
                multiple=True,
                help="Columns that should be created as NOT NULL",
            ),
            click.option(
                "--default",
                multiple=True,
                type=(str, str),
                help="Default value that should be set for a column",
            ),
            click.option(
                "-d",
                "--detect-types",
                is_flag=True,
                envvar="SQLITE_UTILS_DETECT_TYPES",
                help="Detect types for columns in CSV/TSV data",
            ),
            click.option(
                "--analyze",
                is_flag=True,
                help="Run ANALYZE at the end of this operation",
            ),
            load_extension_option,
            click.option("--silent", is_flag=True, help="Do not show progress bar"),
            click.option(
                "--strict",
                is_flag=True,
                default=False,
                help="Apply STRICT mode to created table",
            ),
        )
        # The shared input-format options sit between the two groups
        every_param = leading_params + _import_options + trailing_options
        # Walk backwards so the first entry is applied last, as if it were the
        # topmost decorator in a stack
        for add_param in reversed(every_param):
            fn = add_param(fn)
        return fn

    return inner


def insert_upsert_implementation(
    path,
    table,
    file,
    pk,
    flatten,
    nl,
    csv,
    tsv,
    empty_null,
    lines,
    text,
    convert,
    imports,
    delimiter,
    quotechar,
    sniff,
    no_headers,
    encoding,
    batch_size,
    stop_after,
    alter,
    upsert,
    ignore=False,
    replace=False,
    truncate=False,
    not_null=None,
    default=None,
    detect_types=None,
    analyze=False,
    load_extension=None,
    silent=False,
    bulk_sql=None,
    functions=None,
    strict=False,
):
    """Read records from *file* and write them to *table* in the database at *path*.

    The input is parsed as CSV/TSV, one-record-per-line (--lines), a single
    text value (--text), newline-delimited JSON (--nl) or, by default, a JSON
    object or array. Records can then be truncated to *stop_after* items and
    passed through the *convert* code before being written.

    Records are written with ``db[table].insert_all()`` unless *bulk_sql* is
    given, in which case they are used as parameters for
    ``cursor.executemany(bulk_sql, ...)`` in chunks of *batch_size*.

    Raises ``click.ClickException`` for conflicting format options, invalid
    JSON input, a --convert result that is neither a dict nor iterable (in
    --text mode), and for insert failures where a more helpful message can
    be built.
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    if functions:
        _register_functions(db, functions)
    # Any CSV-specific option implies --csv, unless --tsv was asked for
    if (delimiter or quotechar or sniff or no_headers) and not tsv:
        csv = True
    # The flags are booleans, so their sum is the number that were set
    if (nl + csv + tsv) >= 2:
        raise click.ClickException("Use just one of --nl, --csv or --tsv")
    if (csv or tsv) and flatten:
        raise click.ClickException("--flatten cannot be used with --csv or --tsv")
    if empty_null and not (csv or tsv):
        raise click.ClickException("--empty-null can only be used with --csv or --tsv")
    if encoding and not (csv or tsv):
        raise click.ClickException("--encoding must be used with --csv or --tsv")
    # A single --pk is passed on as a plain string rather than a 1-tuple
    if pk and len(pk) == 1:
        pk = pk[0]
    # utf-8-sig decodes plain utf-8 and also drops a leading byte order mark
    encoding = encoding or "utf-8-sig"

    # The --sniff option needs us to buffer the file to peek ahead
    sniff_buffer = None
    decoded_buffer = None
    if sniff:
        sniff_buffer = io.BufferedReader(file, buffer_size=4096)
        decoded_buffer = io.TextIOWrapper(sniff_buffer, encoding=encoding)
    else:
        decoded_buffer = io.TextIOWrapper(file, encoding=encoding)

    # Only set when --detect-types is used with CSV/TSV input
    tracker = None
    with file_progress(decoded_buffer, silent=silent) as decoded:
        if csv or tsv:
            if sniff:
                # Read first 2048 bytes and use that to detect
                first_bytes = sniff_buffer.peek(2048)
                dialect = csv_std.Sniffer().sniff(
                    first_bytes.decode(encoding, "ignore")
                )
            else:
                dialect = "excel-tab" if tsv else "excel"
            # Explicit --delimiter / --quotechar override the dialect's values
            csv_reader_args = {"dialect": dialect}
            if delimiter:
                csv_reader_args["delimiter"] = delimiter
            if quotechar:
                csv_reader_args["quotechar"] = quotechar
            reader = csv_std.reader(decoded, **csv_reader_args)
            # NOTE(review): next() raises StopIteration if the input has no
            # rows at all - confirm whether empty input needs a friendlier error
            first_row = next(reader)
            if no_headers:
                # Invent untitled_1..untitled_N headers and put the first row
                # back, since it is data rather than a header
                headers = ["untitled_{}".format(i + 1) for i in range(len(first_row))]
                reader = itertools.chain([first_row], reader)
            else:
                headers = first_row
            if empty_null:
                docs = (
                    dict(zip(headers, [None if cell == "" else cell for cell in row]))
                    for row in reader
                )
            else:
                docs = (dict(zip(headers, row)) for row in reader)
            if detect_types:
                tracker = TypeTracker()
                docs = tracker.wrap(docs)
        elif lines:
            docs = ({"line": line.strip()} for line in decoded)
        elif text:
            # One-element tuple: the whole input becomes a single record
            docs = ({"text": decoded.read()},)
        else:
            try:
                if nl:
                    # Blank lines are skipped.
                    # NOTE(review): this generator is lazy, so json.loads() runs
                    # later, outside this try block - a bad line here does not
                    # appear to reach the except clause below; confirm.
                    docs = (json.loads(line) for line in decoded if line.strip())
                else:
                    docs = json.load(decoded)
                    # A single JSON object is treated as a list of one record
                    if isinstance(docs, dict):
                        docs = [docs]
            except json.decoder.JSONDecodeError as ex:
                raise click.ClickException(
                    "Invalid JSON - use --csv for CSV or --tsv for TSV files\n\nJSON error: {}".format(
                        ex
                    )
                )
            if flatten:
                docs = (_flatten(doc) for doc in docs)

        if stop_after:
            docs = itertools.islice(docs, stop_after)

        if convert:
            # The name the user's code sees depends on the input mode
            variable = "row"
            if lines:
                variable = "line"
            elif text:
                variable = "text"
            fn = _compile_code(convert, imports, variable=variable)
            if lines:
                # The function gets the bare line string, not the {"line": ...} dict
                docs = (fn(doc["line"]) for doc in docs)
            elif text:
                # Special case: this is allowed to be an iterable
                text_value = list(docs)[0]["text"]
                fn_return = fn(text_value)
                if isinstance(fn_return, dict):
                    docs = [fn_return]
                else:
                    try:
                        docs = iter(fn_return)
                    except TypeError:
                        raise click.ClickException(
                            "--convert must return dict or iterator"
                        )
            else:
                # A falsy return value (such as None) keeps the original doc
                docs = (fn(doc) or doc for doc in docs)

        extra_kwargs = {
            "ignore": ignore,
            "replace": replace,
            "truncate": truncate,
            "analyze": analyze,
            "strict": strict,
        }
        # These three are only passed on when they were actually provided
        if not_null:
            extra_kwargs["not_null"] = set(not_null)
        if default:
            extra_kwargs["defaults"] = dict(default)
        if upsert:
            extra_kwargs["upsert"] = upsert

        # docs should all be dictionaries
        docs = (verify_is_dict(doc) for doc in docs)

        # Apply {"$base64": true, ...} decoding, if needed
        docs = (decode_base64_values(doc) for doc in docs)

        # For bulk_sql= we use cursor.executemany() instead
        if bulk_sql:
            if batch_size:
                doc_chunks = chunks(docs, batch_size)
            else:
                doc_chunks = [docs]
            # Each chunk runs inside its own "with db.conn" block
            for doc_chunk in doc_chunks:
                with db.conn:
                    db.conn.cursor().executemany(bulk_sql, doc_chunk)
            # NOTE(review): this early return skips the explicit close() calls
            # at the bottom of the function - confirm whether that is intended
            return

        try:
            db[table].insert_all(
                docs, pk=pk, batch_size=batch_size, alter=alter, **extra_kwargs
            )
        except Exception as e:
            # A missing column gets a hint about --alter
            if (
                isinstance(e, OperationalError)
                and e.args
                and "has no column named" in e.args[0]
            ):
                raise click.ClickException(
                    "{}\n\nTry using --alter to add additional columns".format(
                        e.args[0]
                    )
                )
            # If we can find sql= and parameters= arguments, show those
            variables = _find_variables(e.__traceback__, ["sql", "parameters"])
            if "sql" in variables and "parameters" in variables:
                raise click.ClickException(
                    "{}\n\nsql = {}\nparameters = {}".format(
                        str(e), variables["sql"], variables["parameters"]
                    )
                )
            else:
                raise
        # With --detect-types, convert the columns to the types the tracker saw
        if tracker is not None:
            db[table].transform(types=tracker.types)

        # Clean up open file-like objects
        # (only reached on the successful insert_all() path)
        if sniff_buffer:
            sniff_buffer.close()
        if decoded_buffer:
            decoded_buffer.close()


def _find_variables(tb, vars):
    to_find = list(vars)
    found = {}
    for var in to_find:
        if var in tb.tb_frame.f_locals:
            vars.remove(var)
            found[var] = tb.tb_frame.f_locals[var]
    if vars and tb.tb_next:
        found.update(_find_variables(tb.tb_next, vars))
    return found


@cli.command()
@insert_upsert_options()
@click.option(
    "--ignore", is_flag=True, default=False, help="Ignore records if pk already exists"
)
@click.option(
    "--replace",
    is_flag=True,
    default=False,
    help="Replace records if pk already exists",
)
@click.option(
    "--truncate",
    is_flag=True,
    default=False,
    help="Truncate table before inserting records, if table already exists",
)
def insert(
    path,
    table,
    file,
    pk,
    flatten,
    nl,
    csv,
    tsv,
    empty_null,
    lines,
    text,
    convert,
    imports,
    delimiter,
    quotechar,
    sniff,
    no_headers,
    encoding,
    batch_size,
    stop_after,
    alter,
    detect_types,
    analyze,
    load_extension,
    silent,
    ignore,
    replace,
    truncate,
    not_null,
    default,
    strict,
):
    """
    Insert records from FILE into a table, creating the table if it
    does not already exist.

    Example:

        echo '{"name": "Lila"}' | sqlite-utils insert data.db chickens -

    By default the input is expected to be a JSON object or array of objects.

    \b
    - Use --nl for newline-delimited JSON objects
    - Use --csv or --tsv for comma-separated or tab-separated input
    - Use --lines to write each incoming line to a column called "line"
    - Use --text to write the entire input to a column called "text"

    You can also use --convert to pass a fragment of Python code that will
    be used to convert each input.

    Your Python code will be passed a "row" variable representing the
    imported row, and can return a modified row.

    This example uses just the name, latitude and longitude columns from
    a CSV file, converting name to upper case and latitude and longitude
    to floating point numbers:

    \b
        sqlite-utils insert plants.db plants plants.csv --csv --convert '
          return {
            "name": row["name"].upper(),
            "latitude": float(row["latitude"]),
            "longitude": float(row["longitude"]),
          }'

    If you are using --lines your code will be passed a "line" variable,
    and for --text a "text" variable.

    When using --text your function can return an iterator of rows to
    insert. This example inserts one record per word in the input:

    \b
        echo 'A bunch of words' | sqlite-utils insert words.db words - \\
          --text --convert '({"word": w} for w in text.split())'
    """
    # All the work happens in the shared insert/upsert implementation; this
    # command only forwards its options (by keyword, so each value is visibly
    # paired with the parameter it feeds) and pins upsert=False.
    try:
        insert_upsert_implementation(
            path=path,
            table=table,
            file=file,
            pk=pk,
            flatten=flatten,
            nl=nl,
            csv=csv,
            tsv=tsv,
            empty_null=empty_null,
            lines=lines,
            text=text,
            convert=convert,
            imports=imports,
            delimiter=delimiter,
            quotechar=quotechar,
            sniff=sniff,
            no_headers=no_headers,
            encoding=encoding,
            batch_size=batch_size,
            stop_after=stop_after,
            alter=alter,
            upsert=False,
            ignore=ignore,
            replace=replace,
            truncate=truncate,
            detect_types=detect_types,
            analyze=analyze,
            load_extension=load_extension,
            silent=silent,
            not_null=not_null,
            default=default,
            strict=strict,
        )
    except UnicodeDecodeError as decode_error:
        # Replace the raw decoding failure with the --encoding hint
        raise click.ClickException(UNICODE_ERROR.format(decode_error))


@cli.command()
@insert_upsert_options(require_pk=True)
def upsert(
    path,
    table,
    file,
    pk,
    flatten,
    nl,
    csv,
    tsv,
    empty_null,
    lines,
    text,
    convert,
    imports,
    batch_size,
    stop_after,
    delimiter,
    quotechar,
    sniff,
    no_headers,
    encoding,
    alter,
    not_null,
    default,
    detect_types,
    analyze,
    load_extension,
    silent,
    strict,
):
    """
    Upsert records based on their primary key. Works like 'insert' but if
    an incoming record has a primary key that matches an existing record
    the existing record will be updated.

    Example:

    \b
        echo '[
            {"id": 1, "name": "Lila"},
            {"id": 2, "name": "Suna"}
        ]' | sqlite-utils upsert data.db chickens - --pk id
    """
    # Same shared implementation as the insert command, with upsert=True.
    # Options are forwarded by keyword so the mapping is explicit.
    try:
        insert_upsert_implementation(
            path=path,
            table=table,
            file=file,
            pk=pk,
            flatten=flatten,
            nl=nl,
            csv=csv,
            tsv=tsv,
            empty_null=empty_null,
            lines=lines,
            text=text,
            convert=convert,
            imports=imports,
            delimiter=delimiter,
            quotechar=quotechar,
            sniff=sniff,
            no_headers=no_headers,
            encoding=encoding,
            batch_size=batch_size,
            stop_after=stop_after,
            alter=alter,
            upsert=True,
            not_null=not_null,
            default=default,
            detect_types=detect_types,
            analyze=analyze,
            load_extension=load_extension,
            silent=silent,
            strict=strict,
        )
    except UnicodeDecodeError as decode_error:
        # Replace the raw decoding failure with the --encoding hint
        raise click.ClickException(UNICODE_ERROR.format(decode_error))


@cli.command()
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("sql")
@click.argument("file", type=click.File("rb"), required=True)
@click.option("--batch-size", type=int, default=100, help="Commit every X records")
@click.option(
    "--functions", help="Python code defining one or more custom SQL functions"
)
@import_options
@load_extension_option
def bulk(
    path,
    sql,
    file,
    batch_size,
    functions,
    flatten,
    nl,
    csv,
    tsv,
    empty_null,
    lines,
    text,
    convert,
    imports,
    delimiter,
    quotechar,
    sniff,
    no_headers,
    encoding,
    load_extension,
):
    """
    Execute parameterized SQL against the provided list of documents.

    Example:

    \b
        echo '[
            {"id": 1, "name": "Lila2"},
            {"id": 2, "name": "Suna2"}
        ]' | sqlite-utils bulk data.db '
            update chickens set name = :name where id = :id
        ' -
    """
    # Reuses the shared insert/upsert implementation in "bulk SQL" mode: no
    # target table or primary key, and the SQL is passed as bulk_sql.
    try:
        insert_upsert_implementation(
            path=path,
            table=None,
            file=file,
            pk=None,
            flatten=flatten,
            nl=nl,
            csv=csv,
            tsv=tsv,
            empty_null=empty_null,
            lines=lines,
            text=text,
            convert=convert,
            imports=imports,
            delimiter=delimiter,
            quotechar=quotechar,
            sniff=sniff,
            no_headers=no_headers,
            encoding=encoding,
            batch_size=batch_size,
            stop_after=None,
            alter=False,
            upsert=False,
            not_null=set(),
            default={},
            detect_types=False,
            load_extension=load_extension,
            silent=False,
            bulk_sql=sql,
            functions=functions,
        )
    except (OperationalError, sqlite3.IntegrityError) as e:
        raise click.ClickException(str(e))
    except UnicodeDecodeError as ex:
        # Same handling as the insert and upsert commands: badly-encoded input
        # gets the --encoding hint instead of a traceback
        raise click.ClickException(UNICODE_ERROR.format(ex))


@cli.command(name="create-database")
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.option(
    "--enable-wal", is_flag=True, help="Enable WAL mode on the created database"
)
@click.option(
    "--init-spatialite", is_flag=True, help="Enable SpatiaLite on the created database"
)
@load_extension_option
def create_database(path, enable_wal, init_spatialite, load_extension):
    """Create a new empty database file

    Example:

    \b
        sqlite-utils create-database trees.db
    """
    database = sqlite_utils.Database(path)

    # Optional setup steps, applied in a fixed order: WAL mode, then any
    # explicitly requested extensions, then SpatiaLite initialization.
    if enable_wal:
        database.enable_wal()
    if load_extension:
        _load_extensions(database, load_extension)
    if init_spatialite:
        database.init_spatialite()

    # Always finish by running a vacuum against the new database
    database.vacuum()


@cli.command(name="create-table")
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.argument("columns", nargs=-1, required=True)
@click.option("pks", "--pk", help="Column to use as primary key", multiple=True)
@click.option(
    "--not-null",
    multiple=True,
    help="Columns that should be created as NOT NULL",
)
@click.option(
    "--default",
    multiple=True,
    type=(str, str),
    help="Default value that should be set for a column",
)
@click.option(
    "--fk",
    multiple=True,
    type=(str, str, str),
    help="Column, other table, other column to set as a foreign key",
)
@click.option(
    "--ignore",
    is_flag=True,
    help="If table already exists, do nothing",
)
@click.option(
    "--replace",
    is_flag=True,
    help="If table already exists, replace it",
)
@click.option(
    "--transform",
    is_flag=True,
    help="If table already exists, try to transform the schema",
)
@load_extension_option
@click.option(
    "--strict",
    is_flag=True,
    help="Apply STRICT mode to created table",
)
def create_table(
    path,
    table,
    columns,
    pks,
    not_null,
    default,
    fk,
    ignore,
    replace,
    transform,
    load_extension,
    strict,
):
    """
    Add a table with the specified columns. Columns should be specified using
    name, type pairs, for example:

    \b
        sqlite-utils create-table my.db people \\
            id integer \\
            name text \\
            height float \\
            photo blob --pk id

    Valid column types are text, integer, float and blob.
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)

    # COLUMNS arrives as a flat sequence: name, type, name, type, ...
    if len(columns) % 2 != 0:
        raise click.ClickException(
            "columns must be an even number of 'name' 'type' pairs"
        )
    coltypes = {}
    for column_name, column_type in zip(columns[::2], columns[1::2]):
        normalized_type = column_type.upper()
        if normalized_type not in VALID_COLUMN_TYPES:
            raise click.ClickException(
                "column types must be one of {}".format(VALID_COLUMN_TYPES)
            )
        coltypes[column_name] = normalized_type

    # An existing table is only acceptable if the user said how to handle it
    already_exists = table in db.table_names()
    if already_exists and not (ignore or replace or transform):
        raise click.ClickException(
            'Table "{}" already exists. Use --replace to delete and replace it.'.format(
                table
            )
        )

    # A single --pk is passed as a plain string, anything else as the tuple
    primary_key = pks[0] if len(pks) == 1 else pks
    db[table].create(
        coltypes,
        pk=primary_key,
        not_null=not_null,
        defaults=dict(default),
        foreign_keys=fk,
        ignore=ignore,
        replace=replace,
        transform=transform,
        strict=strict,
    )


@cli.command(name="duplicate")
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.argument("new_table")
@click.option("--ignore", is_flag=True, help="If table does not exist, do nothing")
@load_extension_option
def duplicate(path, table, new_table, ignore, load_extension):
    """
    Create a duplicate of this table, copying across the schema and all row data.
    """
    database = sqlite_utils.Database(path)
    _load_extensions(database, load_extension)
    try:
        database[table].duplicate(new_table)
    except NoTable:
        # --ignore turns a missing source table into a silent no-op
        if ignore:
            return
        raise click.ClickException('Table "{}" does not exist'.format(table))


@cli.command(name="rename-table")
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.argument("new_name")
@click.option("--ignore", is_flag=True, help="If table does not exist, do nothing")
@load_extension_option
def rename_table(path, table, new_name, ignore, load_extension):
    """
    Rename this table.
    """
    database = sqlite_utils.Database(path)
    _load_extensions(database, load_extension)
    try:
        database.rename_table(table, new_name)
    except sqlite3.OperationalError as rename_error:
        # --ignore suppresses any operational error raised by the rename
        if ignore:
            return
        raise click.ClickException(
            'Table "{}" could not be renamed. {}'.format(table, str(rename_error))
        )


@cli.command(name="drop-table")
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.option("--ignore", is_flag=True, help="If table does not exist, do nothing")
@load_extension_option
def drop_table(path, table, ignore, load_extension):
    """Drop the specified table

    Example:

    \b
        sqlite-utils drop-table chickens.db chickens
    """
    database = sqlite_utils.Database(path)
    _load_extensions(database, load_extension)
    try:
        # The --ignore flag is passed straight through to drop()
        database[table].drop(ignore=ignore)
    except OperationalError:
        message = 'Table "{}" does not exist'.format(table)
        raise click.ClickException(message)


@cli.command(name="create-view")
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("view")
@click.argument("select")
@click.option(
    "--ignore",
    is_flag=True,
    help="If view already exists, do nothing",
)
@click.option(
    "--replace",
    is_flag=True,
    help="If view already exists, replace it",
)
@load_extension_option
def create_view(path, view, select, ignore, replace, load_extension):
    """Create a view for the provided SELECT query

    Example:

    \b
        sqlite-utils create-view chickens.db heavy_chickens \\
          'select * from chickens where weight > 3'
    """
    database = sqlite_utils.Database(path)
    _load_extensions(database, load_extension)

    if view in database.view_names():
        # Existing view: --ignore wins over --replace, otherwise it is an error
        if ignore:
            return
        if not replace:
            raise click.ClickException(
                'View "{}" already exists. Use --replace to delete and replace it.'.format(
                    view
                )
            )
        database[view].drop()

    database.create_view(view, select)


@cli.command(name="drop-view")
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("view")
@click.option("--ignore", is_flag=True, help="If view does not exist, do nothing")
@load_extension_option
def drop_view(path, view, ignore, load_extension):
    """Drop the specified view

    Example:

    \b
        sqlite-utils drop-view chickens.db heavy_chickens
    """
    database = sqlite_utils.Database(path)
    _load_extensions(database, load_extension)
    try:
        # The --ignore flag is passed straight through to drop()
        database[view].drop(ignore=ignore)
    except OperationalError:
        message = 'View "{}" does not exist'.format(view)
        raise click.ClickException(message)


@cli.command()
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("sql")
@click.option(
    "--attach",
    type=(str, click.Path(file_okay=True, dir_okay=False, allow_dash=False)),
    multiple=True,
    help="Additional databases to attach - specify alias and filepath",
)
@output_options
@click.option("-r", "--raw", is_flag=True, help="Raw output, first column of first row")
@click.option("--raw-lines", is_flag=True, help="Raw output, first column of each row")
@click.option(
    "-p",
    "--param",
    multiple=True,
    type=(str, str),
    help="Named :parameters for SQL query",
)
@click.option(
    "--functions", help="Python code defining one or more custom SQL functions"
)
@load_extension_option
def query(
    path,
    sql,
    attach,
    nl,
    arrays,
    csv,
    tsv,
    no_headers,
    table,
    fmt,
    json_cols,
    raw,
    raw_lines,
    param,
    load_extension,
    functions,
):
    """Execute SQL query and return the results as JSON

    Example:

    \b
        sqlite-utils data.db \\
            "select * from chickens where age > :age" \\
            -p age 1
    """
    database = sqlite_utils.Database(path)

    # Prepare the connection: attached databases first, then extensions,
    # the FTS4 bm25 helper and finally any user-supplied SQL functions.
    for db_alias, db_path in attach:
        database.attach(db_alias, db_path)
    _load_extensions(database, load_extension)
    database.register_fts4_bm25()
    if functions:
        _register_functions(database, functions)

    # Run the query and render it in the requested output format
    _execute_query(
        db=database,
        sql=sql,
        param=param,
        raw=raw,
        raw_lines=raw_lines,
        table=table,
        csv=csv,
        tsv=tsv,
        no_headers=no_headers,
        fmt=fmt,
        nl=nl,
        arrays=arrays,
        json_cols=json_cols,
    )


@cli.command()
@click.argument(
    "paths",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=True),
    required=False,
    nargs=-1,
)
@click.argument("sql")
@click.option(
    "--functions", help="Python code defining one or more custom SQL functions"
)
@click.option(
    "--attach",
    type=(str, click.Path(file_okay=True, dir_okay=False, allow_dash=False)),
    multiple=True,
    help="Additional databases to attach - specify alias and filepath",
)
@click.option(
    "--flatten",
    is_flag=True,
    help='Flatten nested JSON objects, so {"foo": {"bar": 1}} becomes {"foo_bar": 1}',
)
@output_options
@click.option("-r", "--raw", is_flag=True, help="Raw output, first column of first row")
@click.option("--raw-lines", is_flag=True, help="Raw output, first column of each row")
@click.option(
    "-p",
    "--param",
    multiple=True,
    type=(str, str),
    help="Named :parameters for SQL query",
)
@click.option(
    "--encoding",
    help="Character encoding for CSV input, defaults to utf-8",
)
@click.option(
    "-n",
    "--no-detect-types",
    is_flag=True,
    help="Treat all CSV/TSV columns as TEXT",
)
@click.option("--schema", is_flag=True, help="Show SQL schema for in-memory database")
@click.option("--dump", is_flag=True, help="Dump SQL for in-memory database")
@click.option(
    "--save",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    help="Save in-memory database to this file",
)
@click.option(
    "--analyze",
    is_flag=True,
    help="Analyze resulting tables and output results",
)
@load_extension_option
def memory(
    paths,
    sql,
    functions,
    attach,
    flatten,
    nl,
    arrays,
    csv,
    tsv,
    no_headers,
    table,
    fmt,
    json_cols,
    raw,
    raw_lines,
    param,
    encoding,
    no_detect_types,
    schema,
    dump,
    save,
    analyze,
    load_extension,
    return_db=False,
):
    """Execute SQL query against an in-memory database, optionally populated by imported data

    To import data from CSV, TSV or JSON files pass them on the command-line:

    \b
        sqlite-utils memory one.csv two.json \\
            "select * from one join two on one.two_id = two.id"

    For data piped into the tool from standard input, use "-" or "stdin":

    \b
        cat animals.csv | sqlite-utils memory - \\
            "select * from stdin where species = 'dog'"

    The format of the data will be automatically detected. You can specify the format
    explicitly using :json, :csv, :tsv or :nl (for newline-delimited JSON) - for example:

    \b
        cat animals.csv | sqlite-utils memory stdin:csv places.dat:nl \\
            "select * from stdin where place_id in (select id from places)"

    Use --schema to view the SQL schema of any imported files:

    \b
        sqlite-utils memory animals.csv --schema
    """
    db = sqlite_utils.Database(memory=True)

    # If --dump, --save, --schema or --analyze is used but no paths were
    # detected, assume the SQL query argument is actually a path:
    if (dump or save or schema or analyze) and not paths:
        paths = [sql]
        sql = None
    # Tracks how often each file stem has been seen, so that repeated stems
    # get distinct table names (stem, stem_2, stem_3, ...)
    stem_counts = {}
    for i, path in enumerate(paths):
        # Path may have a :format suffix
        if ":" in path and path.rsplit(":", 1)[-1].upper() in Format.__members__:
            path, suffix = path.rsplit(":", 1)
            file_format = Format[suffix.upper()]
        else:
            file_format = None
        if path in ("-", "stdin"):
            fp = sys.stdin.buffer
            file_table = "stdin"
        else:
            file_path = pathlib.Path(path)
            stem = file_path.stem
            if stem_counts.get(stem):
                file_table = "{}_{}".format(stem, stem_counts[stem])
            else:
                file_table = stem
            stem_counts[stem] = stem_counts.get(stem, 1) + 1
            fp = file_path.open("rb")
        try:
            rows, format_used = rows_from_file(
                fp, format=file_format, encoding=encoding
            )
            tracker = None
            if format_used in (Format.CSV, Format.TSV) and not no_detect_types:
                tracker = TypeTracker()
                rows = tracker.wrap(rows)
            if flatten:
                rows = (_flatten(row) for row in rows)

            db[file_table].insert_all(rows, alter=True)
            if tracker is not None:
                db[file_table].transform(types=tracker.types)
            # Add convenient t / t1 / t2 views
            view_names = ["t{}".format(i + 1)]
            if i == 0:
                view_names.append("t")
            for view_name in view_names:
                if not db[view_name].exists():
                    db.create_view(view_name, "select * from [{}]".format(file_table))
        finally:
            # Close the input even if parsing or inserting fails, so the
            # file handle is not leaked on error
            fp.close()

    if analyze:
        _analyze(db, tables=None, columns=None, save=False)
        return

    if dump:
        for line in db.iterdump():
            click.echo(line)
        return

    if schema:
        click.echo(db.schema)
        return

    if save:
        # Copy the in-memory database to disk by replaying its SQL dump
        db2 = sqlite_utils.Database(save)
        for line in db.iterdump():
            db2.execute(line)
        return

    for alias, attach_path in attach:
        db.attach(alias, attach_path)
    _load_extensions(db, load_extension)
    db.register_fts4_bm25()

    if functions:
        _register_functions(db, functions)

    # return_db is not a CLI option; it lets Python callers get the populated
    # database back instead of executing the query
    if return_db:
        return db

    _execute_query(
        db,
        sql,
        param,
        raw,
        raw_lines,
        table,
        csv,
        tsv,
        no_headers,
        fmt,
        nl,
        arrays,
        json_cols,
    )


def _execute_query(
    db,
    sql,
    param,
    raw,
    raw_lines,
    table,
    csv,
    tsv,
    no_headers,
    fmt,
    nl,
    arrays,
    json_cols,
):
    """
    Execute ``sql`` against ``db`` and write the results to standard output.

    The statement runs inside ``with db.conn:``. Statements that return no
    result set are reported as a single ``rows_affected`` row. The output
    format is picked in this order: ``raw``, ``raw_lines``, ``fmt``/``table``
    (tabulate), ``csv``/``tsv``, and otherwise the rows are passed to
    ``output_rows`` together with ``nl``, ``arrays`` and ``json_cols``.

    :param db: database object providing ``conn`` and ``execute(sql, params)``
    :param sql: SQL to execute
    :param param: iterable of ``(name, value)`` pairs used as named parameters
    :param raw: write only the first column of the first row; an empty
      result writes nothing
    :param raw_lines: write the first column of every row, one per line
    :param no_headers: omit the header row from CSV/TSV output
    :raises click.ClickException: if executing the SQL raises OperationalError
    """
    with db.conn:
        try:
            cursor = db.execute(sql, dict(param))
        except OperationalError as e:
            raise click.ClickException(str(e))
        if cursor.description is None:
            # This was an update/insert
            headers = ["rows_affected"]
            cursor = [[cursor.rowcount]]
        else:
            headers = [c[0] for c in cursor.description]
        if raw:
            # cursor may be a real cursor or the rows_affected list built
            # above, so take the first row by iteration rather than with
            # fetchone(); an empty result set produces no output
            first_row = next(iter(cursor), None)
            if first_row is not None:
                data = first_row[0]
                if isinstance(data, bytes):
                    sys.stdout.buffer.write(data)
                else:
                    sys.stdout.write(str(data))
        elif raw_lines:
            for row in cursor:
                data = row[0]
                if isinstance(data, bytes):
                    sys.stdout.buffer.write(data + b"\n")
                else:
                    sys.stdout.write(str(data) + "\n")
        elif fmt or table:
            print(
                tabulate.tabulate(
                    list(cursor), headers=headers, tablefmt=fmt or "simple"
                )
            )
        elif csv or tsv:
            writer = csv_std.writer(sys.stdout, dialect="excel-tab" if tsv else "excel")
            if not no_headers:
                writer.writerow(headers)
            for row in cursor:
                writer.writerow(row)
        else:
            for line in output_rows(cursor, headers, nl, arrays, json_cols):
                click.echo(line)


@cli.command()
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("dbtable")
@click.argument("q")
@click.option("-o", "--order", type=str, help="Order by ('column' or 'column desc')")
@click.option("-c", "--column", type=str, multiple=True, help="Columns to return")
@click.option(
    "--limit",
    type=int,
    help="Number of rows to return - defaults to everything",
)
@click.option(
    "--sql", "show_sql", is_flag=True, help="Show SQL query that would be run"
)
@click.option("--quote", is_flag=True, help="Apply FTS quoting rules to search term")
@output_options
@load_extension_option
@click.pass_context
def search(
    ctx,
    path,
    dbtable,
    q,
    order,
    show_sql,
    quote,
    column,
    limit,
    nl,
    arrays,
    csv,
    tsv,
    no_headers,
    table,
    fmt,
    json_cols,
    load_extension,
):
    """Execute a full-text search against this table

    Example:

        sqlite-utils search data.db chickens lila
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    # Check table exists
    table_obj = db[dbtable]
    if not table_obj.exists():
        raise click.ClickException("Table '{}' does not exist".format(dbtable))
    if not table_obj.detect_fts():
        raise click.ClickException(
            "Table '{}' is not configured for full-text search".format(dbtable)
        )
    if column:
        # Check they all exist
        table_columns = table_obj.columns_dict
        for c in column:
            if c not in table_columns:
                raise click.ClickException(
                    "Table '{}' has no column '{}'".format(dbtable, c)
                )
    sql = table_obj.search_sql(columns=column, order_by=order, limit=limit)
    if show_sql:
        click.echo(sql)
        return
    if quote:
        q = db.quote_fts(q)
    # Delegate execution and output formatting to the query command, binding
    # the search term as the :query parameter
    try:
        ctx.invoke(
            query,
            path=path,
            sql=sql,
            nl=nl,
            arrays=arrays,
            csv=csv,
            tsv=tsv,
            no_headers=no_headers,
            table=table,
            fmt=fmt,
            json_cols=json_cols,
            param=[("query", q)],
            load_extension=load_extension,
        )
    except click.ClickException as e:
        # These two errors indicate a search term that needs FTS quoting, so
        # point the user at --quote
        if "malformed MATCH expression" in str(e) or "unterminated string" in str(e):
            raise click.ClickException(
                "{}\n\nTry running this again with the --quote option".format(str(e))
            )
        else:
            raise


@cli.command()
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("dbtable")
@click.option("-c", "--column", type=str, multiple=True, help="Columns to return")
@click.option("--where", help="Optional where clause")
@click.option("-o", "--order", type=str, help="Order by ('column' or 'column desc')")
@click.option(
    "-p",
    "--param",
    multiple=True,
    type=(str, str),
    help="Named :parameters for where clause",
)
@click.option(
    "--limit",
    type=int,
    help="Number of rows to return - defaults to everything",
)
@click.option(
    "--offset",
    type=int,
    help="SQL offset to use",
)
@output_options
@load_extension_option
@click.pass_context
def rows(
    ctx,
    path,
    dbtable,
    column,
    where,
    order,
    param,
    limit,
    offset,
    nl,
    arrays,
    csv,
    tsv,
    no_headers,
    table,
    fmt,
    json_cols,
    load_extension,
):
    """Output all rows in the specified table

    Example:

    \b
        sqlite-utils rows trees.db Trees
    """
    columns = "*"
    if column:
        columns = ", ".join("[{}]".format(c) for c in column)
    sql = "select {} from [{}]".format(columns, dbtable)
    if where:
        sql += " where " + where
    if order:
        sql += " order by " + order
    # Compare against None so that an explicit --limit 0 is honoured
    if limit is not None:
        sql += " limit {}".format(limit)
    if offset:
        if limit is None:
            # SQLite only accepts OFFSET as part of a LIMIT clause; a
            # negative limit means "no upper bound"
            sql += " limit -1"
        sql += " offset {}".format(offset)
    # Delegate execution and output formatting to the query command
    ctx.invoke(
        query,
        path=path,
        sql=sql,
        nl=nl,
        arrays=arrays,
        csv=csv,
        tsv=tsv,
        no_headers=no_headers,
        table=table,
        fmt=fmt,
        param=param,
        json_cols=json_cols,
        load_extension=load_extension,
    )


@cli.command()
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("tables", nargs=-1)
@output_options
@load_extension_option
@click.pass_context
def triggers(
    ctx,
    path,
    tables,
    nl,
    arrays,
    csv,
    tsv,
    no_headers,
    table,
    fmt,
    json_cols,
    load_extension,
):
    """Show triggers configured in this database

    Example:

    \b
        sqlite-utils triggers trees.db
    """
    sql_parts = [
        "select name, tbl_name as [table], sql from sqlite_master where type = 'trigger'"
    ]
    if tables:
        # Restrict to the named tables, quoting each name as a SQL string
        # using the quote() method of a throwaway in-memory database
        quote = sqlite_utils.Database(memory=True).quote
        quoted_names = [quote(table_name) for table_name in tables]
        sql_parts.append(" and [table] in ({})".format(", ".join(quoted_names)))
    sql = "".join(sql_parts)

    # Delegate execution and output formatting to the query command
    ctx.invoke(
        query,
        path=path,
        sql=sql,
        nl=nl,
        arrays=arrays,
        csv=csv,
        tsv=tsv,
        no_headers=no_headers,
        table=table,
        fmt=fmt,
        json_cols=json_cols,
        load_extension=load_extension,
    )


@cli.command()
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("tables", nargs=-1)
@click.option("--aux", is_flag=True, help="Include auxiliary columns")
@output_options
@load_extension_option
@click.pass_context
def indexes(
    ctx,
    path,
    tables,
    aux,
    nl,
    arrays,
    csv,
    tsv,
    no_headers,
    table,
    fmt,
    json_cols,
    load_extension,
):
    """Show indexes for the whole database or specific tables

    Example:

    \b
        sqlite-utils indexes trees.db Trees
    """
    # Base query: one row per column of every index on every table
    sql_parts = [
        """
    select
      sqlite_master.name as "table",
      indexes.name as index_name,
      xinfo.*
    from sqlite_master
      join pragma_index_list(sqlite_master.name) indexes
      join pragma_index_xinfo(index_name) xinfo
    where
      sqlite_master.type = 'table'
    """
    ]
    if tables:
        # Restrict to the named tables, quoting each name as a SQL string
        quote = sqlite_utils.Database(memory=True).quote
        quoted_names = [quote(table_name) for table_name in tables]
        sql_parts.append(
            " and sqlite_master.name in ({})".format(", ".join(quoted_names))
        )
    if not aux:
        # Without --aux only rows with xinfo.key = 1 are returned
        sql_parts.append(" and xinfo.key = 1")
    sql = "".join(sql_parts)

    # Delegate execution and output formatting to the query command
    ctx.invoke(
        query,
        path=path,
        sql=sql,
        nl=nl,
        arrays=arrays,
        csv=csv,
        tsv=tsv,
        no_headers=no_headers,
        table=table,
        fmt=fmt,
        json_cols=json_cols,
        load_extension=load_extension,
    )


@cli.command()
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("tables", nargs=-1, required=False)
@load_extension_option
def schema(
    path,
    tables,
    load_extension,
):
    """Show full schema for this database or for specified tables

    Example:

    \b
        sqlite-utils schema trees.db
    """
    database = sqlite_utils.Database(path)
    _load_extensions(database, load_extension)

    # No tables named: print the schema of the whole database
    if not tables:
        click.echo(database.schema)
        return

    # Otherwise print each requested table's schema, in the order given
    for table_name in tables:
        click.echo(database[table_name].schema)


@cli.command()
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.option(
    "--type",
    type=(
        str,
        click.Choice(["INTEGER", "TEXT", "FLOAT", "BLOB"], case_sensitive=False),
    ),
    multiple=True,
    help="Change column type to INTEGER, TEXT, FLOAT or BLOB",
)
@click.option("--drop", type=str, multiple=True, help="Drop this column")
@click.option(
    "--rename", type=(str, str), multiple=True, help="Rename this column to X"
)
@click.option("-o", "--column-order", type=str, multiple=True, help="Reorder columns")
@click.option("--not-null", type=str, multiple=True, help="Set this column to NOT NULL")
@click.option(
    "--not-null-false", type=str, multiple=True, help="Remove NOT NULL from this column"
)
@click.option("--pk", type=str, multiple=True, help="Make this column the primary key")
@click.option(
    "--pk-none", is_flag=True, help="Remove primary key (convert to rowid table)"
)
@click.option(
    "--default",
    type=(str, str),
    multiple=True,
    help="Set default value for this column",
)
@click.option(
    "--default-none", type=str, multiple=True, help="Remove default from this column"
)
@click.option(
    "add_foreign_keys",
    "--add-foreign-key",
    type=(str, str, str),
    multiple=True,
    help="Add a foreign key constraint from a column to another table with another column",
)
@click.option(
    "drop_foreign_keys",
    "--drop-foreign-key",
    type=str,
    multiple=True,
    help="Drop foreign key constraint for this column",
)
@click.option("--sql", is_flag=True, help="Output SQL without executing it")
@load_extension_option
def transform(
    path,
    table,
    type,
    drop,
    rename,
    column_order,
    not_null,
    not_null_false,
    pk,
    pk_none,
    default,
    default_none,
    add_foreign_keys,
    drop_foreign_keys,
    sql,
    load_extension,
):
    """Transform a table beyond the capabilities of ALTER TABLE

    Example:

    \b
        sqlite-utils transform mydb.db mytable \\
            --drop column1 \\
            --rename column2 column_renamed
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)

    # Upper-case every requested column type, rejecting unknown ones
    types = {}
    for column, ctype in type:
        normalized = ctype.upper()
        if normalized not in VALID_COLUMN_TYPES:
            raise click.ClickException(
                "column types must be one of {}".format(VALID_COLUMN_TYPES)
            )
        types[column] = normalized

    # --not-null-false / --default-none are merged last, so they win when
    # the same column is named by both forms of the option
    not_null_flags = {
        **dict.fromkeys(not_null, True),
        **dict.fromkeys(not_null_false, False),
    }
    column_defaults = {**dict(default), **dict.fromkeys(default_none, None)}

    transform_kwargs = {
        "types": types,
        "drop": set(drop),
        "rename": dict(rename),
        "column_order": column_order or None,
        "not_null": not_null_flags,
        "defaults": column_defaults,
    }
    # "pk" is only passed when the user asked to change (or remove) the key
    if pk:
        transform_kwargs["pk"] = pk[0] if len(pk) == 1 else pk
    elif pk_none:
        transform_kwargs["pk"] = None
    # Foreign key changes are likewise only passed when requested
    if drop_foreign_keys:
        transform_kwargs["drop_foreign_keys"] = drop_foreign_keys
    if add_foreign_keys:
        transform_kwargs["add_foreign_keys"] = add_foreign_keys

    target = db[table]
    if not sql:
        target.transform(**transform_kwargs)
        return
    # --sql: print the statements instead of executing them
    for statement in target.transform_sql(**transform_kwargs):
        click.echo(statement)


@cli.command()
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.argument("columns", nargs=-1, required=True)
@click.option(
    "--table", "other_table", help="Name of the other table to extract columns to"
)
@click.option("--fk-column", help="Name of the foreign key column to add to the table")
@click.option(
    "--rename",
    type=(str, str),
    multiple=True,
    help="Rename this column in extracted table",
)
@load_extension_option
def extract(
    path,
    table,
    columns,
    other_table,
    fk_column,
    rename,
    load_extension,
):
    """Extract one or more columns into a separate table

    Example:

    \b
        sqlite-utils extract trees.db Street_Trees species
    """
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    # --rename arrives as (old, new) pairs; the table method takes a mapping
    renames = dict(rename)
    db[table].extract(
        columns=columns,
        table=other_table,
        fk_column=fk_column,
        rename=renames,
    )


@cli.command(name="insert-files")
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table")
@click.argument(
    "file_or_dir",
    nargs=-1,
    required=True,
    type=click.Path(file_okay=True, dir_okay=True, allow_dash=True),
)
@click.option(
    "-c",
    "--column",
    type=str,
    multiple=True,
    help="Column definitions for the table",
)
@click.option("pks", "--pk", help="Column to use as primary key", multiple=True)
@click.option("--alter", is_flag=True, help="Alter table to add missing columns")
@click.option("--replace", is_flag=True, help="Replace files with matching primary key")
@click.option("--upsert", is_flag=True, help="Upsert files with matching primary key")
@click.option("--name", type=str, help="File name to use")
@click.option("--text", is_flag=True, help="Store file content as TEXT, not BLOB")
@click.option(
    "--encoding",
    help="Character encoding for input, defaults to utf-8",
)
@click.option("-s", "--silent", is_flag=True, help="Don't show a progress bar")
@load_extension_option
def insert_files(
    path,
    table,
    file_or_dir,
    column,
    pks,
    alter,
    replace,
    upsert,
    name,
    text,
    encoding,
    silent,
    load_extension,
):
    """
    Insert one or more files using BLOB columns in the specified table

    Example:

    \b
        sqlite-utils insert-files pics.db images *.gif \\
            -c name:name \\
            -c content:content \\
            -c content_hash:sha256 \\
            -c created:ctime_iso \\
            -c modified:mtime_iso \\
            -c size:size \\
            --pk name
    """
    # No -c options: fall back to a default column set and, unless --pk was
    # given, use "path" as the primary key.
    if not column:
        if text:
            column = ["path:path", "content_text:content_text", "size:size"]
        else:
            column = ["path:path", "content:content", "size:size"]
        if not pks:
            pks = ["path"]

    def yield_paths_and_relative_paths():
        # "-" stands for standard input, directories are walked recursively
        # and regular files are yielded as-is. Arguments that are neither a
        # file nor a directory are skipped without an error.
        # The local "path" here is a filesystem path, not the database path.
        for f_or_d in file_or_dir:
            path = pathlib.Path(f_or_d)
            if f_or_d == "-":
                yield "-", "-"
            elif path.is_dir():
                for subpath in path.rglob("*"):
                    if subpath.is_file():
                        yield subpath, subpath.relative_to(path)
            elif path.is_file():
                yield path, path

    # Load all paths so we can show a progress bar
    paths_and_relative_paths = list(yield_paths_and_relative_paths())

    def _content_text(p):
        # content_text is the one lookup that honours --encoding. Fall back to
        # utf-8 explicitly: read_text(encoding=None) would use the locale
        # encoding, contradicting the documented default and the stdin branch.
        resolved = p.resolve()
        try:
            return resolved.read_text(encoding=encoding or "utf-8")
        except UnicodeDecodeError as e:
            raise UnicodeDecodeErrorForPath(e, resolved)

    # Lookup table for real files; identical for every file, so build it once
    file_lookups = dict(FILE_COLUMNS, content_text=_content_text)

    with progressbar(paths_and_relative_paths, silent=silent) as bar:

        def to_insert():
            # Lazily yields one row dict per input; consumed by insert_all().
            # NOTE(review): relative_path is unpacked but never used - the
            # lookups receive the full path. Confirm whether that is intended.
            for path, relative_path in bar:
                row = {}
                lookups = file_lookups
                if path == "-":
                    stdin_data = sys.stdin.buffer.read()
                    # We only support a subset of columns for this case
                    lookups = {
                        "name": lambda p: name or "-",
                        "path": lambda p: name or "-",
                        "content": lambda p: stdin_data,
                        "content_text": lambda p: stdin_data.decode(
                            encoding or "utf-8"
                        ),
                        "sha256": lambda p: hashlib.sha256(stdin_data).hexdigest(),
                        "md5": lambda p: hashlib.md5(stdin_data).hexdigest(),
                        "size": lambda p: len(stdin_data),
                    }
                for coldef in column:
                    # "colname:coltype", split on the last colon; a bare name
                    # is used as both the column name and the column type
                    if ":" in coldef:
                        colname, coltype = coldef.rsplit(":", 1)
                    else:
                        colname, coltype = coldef, coldef
                    # Only the dictionary lookup is guarded, so a KeyError
                    # raised inside a lookup function is not misreported as an
                    # invalid column definition.
                    try:
                        lookup = lookups[coltype]
                    except KeyError:
                        raise click.ClickException(
                            "'{}' is not a valid column definition - options are {}".format(
                                coltype, ", ".join(lookups.keys())
                            )
                        )
                    row[colname] = lookup(path)
                    # Special case for --name
                    if coltype == "name" and name:
                        row[colname] = name
                yield row

        db = sqlite_utils.Database(path)
        _load_extensions(db, load_extension)
        try:
            with db.conn:
                db[table].insert_all(
                    to_insert(),
                    # A single --pk is passed as a plain string
                    pk=pks[0] if len(pks) == 1 else pks,
                    alter=alter,
                    replace=replace,
                    upsert=upsert,
                )
        except UnicodeDecodeErrorForPath as e:
            # Report the offending file together with the --encoding hint
            raise click.ClickException(
                UNICODE_ERROR.format(
                    "Could not read file '{}' as text\n\n{}".format(e.path, e.exception)
                )
            )


@cli.command(name="analyze-tables")
@click.argument(
    "path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False, exists=True),
    required=True,
)
@click.argument("tables", nargs=-1)
@click.option(
    "-c",
    "--column",
    "columns",
    type=str,
    multiple=True,
    help="Specific columns to analyze",
)
# The help text names the table exactly as _analyze() writes it
@click.option("--save", is_flag=True, help="Save results to _analyze_tables_ table")
@click.option("--common-limit", type=int, default=10, help="How many common values")
@click.option("--no-most", is_flag=True, default=False, help="Skip most common values")
@click.option(
    "--no-least", is_flag=True, default=False, help="Skip least common values"
)
@load_extension_option
def analyze_tables(
    path,
    tables,
    columns,
    save,
    common_limit,
    no_most,
    no_least,
    load_extension,
):
    """Analyze the columns in one or more tables

    Example:

    \b
        sqlite-utils analyze-tables data.db trees
    """
    # Thin CLI wrapper: open the database, load any requested extensions and
    # hand everything else to _analyze().
    db = sqlite_utils.Database(path)
    _load_extensions(db, load_extension)
    _analyze(db, tables, columns, save, common_limit, no_most, no_least)


def _analyze(db, tables, columns, save, common_limit=10, no_most=False, no_least=False):
    """Analyze columns of the given tables and print a report for each one.

    With no ``tables`` every table in ``db`` is analyzed; with ``columns``
    only columns with those names are included. When ``save`` is true each
    result is also written to the ``_analyze_tables_`` table. Raises
    ``click.ClickException`` if a requested column exists in none of the
    tables.
    """
    tables = tables or db.table_names()

    pending = []
    row_counts = {}
    matched_columns = set()
    for table in tables:
        row_counts[table] = db[table].count
        for column in db[table].columns:
            if columns and column.name not in columns:
                continue
            pending.append((table, column.name))
            matched_columns.add(column.name)

    # Check the user didn't specify a column that doesn't exist
    if columns:
        missing = set(columns) - matched_columns
        if missing:
            raise click.ClickException(
                "These columns were not found: {}".format(", ".join(sorted(missing)))
            )

    report_template = textwrap.dedent(
        """
        {table}.{column}: ({i}/{total})

          Total rows: {total_rows}
          Null rows: {num_null}
          Blank rows: {num_blank}

          Distinct values: {num_distinct}{most_common_rendered}{least_common_rendered}
        """
    ).strip()

    # Now we know how many we need to do
    total = len(pending)
    for position, (table, column) in enumerate(pending, start=1):
        column_details = db[table].analyze_column(
            column,
            common_limit=common_limit,
            total_rows=row_counts[table],
            value_truncate=80,
            most_common=not no_most,
            least_common=not no_least,
        )
        if save:
            db["_analyze_tables_"].insert(
                column_details._asdict(), pk=("table", "column"), replace=True
            )
        # The "Most common" section is left out when every row is null
        if column_details.num_null == column_details.total_rows:
            most_common_rendered = ""
        else:
            most_common_rendered = _render_common(
                "\n\n  Most common:", column_details.most_common
            )
        least_common_rendered = _render_common(
            "\n\n  Least common:", column_details.least_common
        )
        report = report_template.format(
            i=position,
            total=total,
            most_common_rendered=most_common_rendered,
            least_common_rendered=least_common_rendered,
            **column_details._asdict(),
        )
        click.echo(report + "\n")


@cli.command()
@click.argument("packages", nargs=-1, required=False)
@click.option(
    "-U", "--upgrade", is_flag=True, help="Upgrade packages to latest version"
)
@click.option(
    "-e",
    "--editable",
    help="Install a project in editable mode from this path",
)
def install(packages, upgrade, editable):
    """Install packages from PyPI into the same environment as sqlite-utils"""
    # pip is run in-process as a module, so its command line is handed over
    # by replacing sys.argv
    pip_argv = ["pip", "install"]
    if upgrade:
        pip_argv.append("--upgrade")
    if editable:
        pip_argv.extend(("--editable", editable))
    pip_argv.extend(packages)
    sys.argv = pip_argv
    run_module("pip", run_name="__main__")


@cli.command()
@click.argument("packages", nargs=-1, required=True)
@click.option("-y", "--yes", is_flag=True, help="Don't ask for confirmation")
def uninstall(packages, yes):
    """Uninstall Python packages from the sqlite-utils environment"""
    # pip is run in-process as a module, so its command line is handed over
    # by replacing sys.argv
    pip_argv = ["pip", "uninstall", *packages]
    if yes:
        pip_argv.append("-y")
    sys.argv = pip_argv
    run_module("pip", run_name="__main__")


def _generate_convert_help():
    help = textwrap.dedent(
        """
    Convert columns using Python code you supply. For example:

    \b
    sqlite-utils convert my.db mytable mycolumn \\
        '"\\n".join(textwrap.wrap(value, 10))' \\
        --import=textwrap

    "value" is a variable with the column value to be converted.

    Use "-" for CODE to read Python code from standard input.

    The following common operations are available as recipe functions:
    """
    ).strip()
    recipe_names = [
        n
        for n in dir(recipes)
        if not n.startswith("_")
        and n not in ("json", "parser")
        and callable(getattr(recipes, n))
    ]
    for name in recipe_names:
        fn = getattr(recipes, name)
        help += "\n\nr.{}{}\n\n\b{}".format(
            name, str(inspect.signature(fn)), textwrap.dedent(fn.__doc__.rstrip())
        )
    help += "\n\n"
    help += textwrap.dedent(
        """
    You can use these recipes like so:

    \b
    sqlite-utils convert my.db mytable mycolumn \\
        'r.jsonsplit(value, delimiter=":")'
    """
    ).strip()
    return help


@cli.command(help=_generate_convert_help())
@click.argument(
    "db_path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table", type=str)
@click.argument("columns", type=str, nargs=-1, required=True)
@click.argument("code", type=str)
@click.option(
    "--import", "imports", type=str, multiple=True, help="Python modules to import"
)
@click.option(
    "--dry-run", is_flag=True, help="Show results of running this against first 10 rows"
)
@click.option(
    "--multi", is_flag=True, help="Populate columns for keys in returned dictionary"
)
@click.option("--where", help="Optional where clause")
@click.option(
    "-p",
    "--param",
    multiple=True,
    type=(str, str),
    help="Named :parameters for where clause",
)
@click.option("--output", help="Optional separate column to populate with the output")
@click.option(
    "--output-type",
    help="Column type to use for the output column",
    default="text",
    type=click.Choice(["integer", "float", "blob", "text"]),
)
@click.option("--drop", is_flag=True, help="Drop original column afterwards")
@click.option("--no-skip-false", is_flag=True, help="Don't skip falsey values")
@click.option("-s", "--silent", is_flag=True, help="Don't show a progress bar")
@click.option("pdb_", "--pdb", is_flag=True, help="Open pdb debugger on first error")
def convert(
    db_path,
    table,
    columns,
    code,
    imports,
    dry_run,
    multi,
    where,
    param,
    output,
    output_type,
    drop,
    no_skip_false,
    silent,
    pdb_,
):
    # The help text for this command is generated by _generate_convert_help()
    sqlite3.enable_callback_tracebacks(True)
    db = sqlite_utils.Database(db_path)

    # Reject option combinations that only make sense for a single column
    several_columns = len(columns) > 1
    if several_columns and output is not None:
        raise click.ClickException("Cannot use --output with more than one column")
    if several_columns and multi:
        raise click.ClickException("Cannot use --multi with more than one column")
    if drop and not output and not multi:
        raise click.ClickException("--drop can only be used with --output or --multi")

    if code == "-":
        # Read code from standard input
        code = sys.stdin.read()

    where_args = []
    if param:
        where_args = dict(param)

    # Compile the code into a function body called fn(value)
    try:
        fn = _compile_code(code, imports)
    except SyntaxError as e:
        raise click.ClickException(str(e))

    if dry_run:
        # Preview the conversion of the first column for up to 10 rows

        def preview(v):
            # Falsey values are passed through without calling the code
            if not v:
                return v
            converted = fn(v)
            if multi:
                return json.dumps(converted, default=repr)
            return converted

        db.conn.create_function("preview_transform", 1, preview)
        sql = """
            select
                [{column}] as value,
                preview_transform([{column}]) as preview
            from [{table}]{where} limit 10
        """.format(
            column=columns[0],
            table=table,
            where=" where {}".format(where) if where is not None else "",
        )
        for before, after in db.conn.execute(sql, where_args).fetchall():
            click.echo(str(before))
            click.echo(" --- becomes:")
            click.echo(str(after))
            click.echo()
        count = db[table].count_where(
            where=where,
            where_args=where_args,
        )
        click.echo("Would affect {} row{}".format(count, "" if count == 1 else "s"))
        return

    if pdb_:
        # Wrap fn so the first exception drops into the debugger, then exits
        original_fn = fn

        def wrapped_fn(value):
            try:
                return original_fn(value)
            except Exception as ex:
                print("\nException raised, dropping into pdb...:", ex)
                pdb.post_mortem(ex.__traceback__)
                sys.exit(1)

        fn = wrapped_fn

    try:
        db[table].convert(
            columns,
            fn,
            where=where,
            where_args=where_args,
            output=output,
            output_type=output_type,
            drop=drop,
            skip_false=not no_skip_false,
            multi=multi,
            show_progress=not silent,
        )
    except BadMultiValues as e:
        raise click.ClickException(
            "When using --multi code must return a Python dictionary - returned: {}".format(
                repr(e.values)
            )
        )


@cli.command("add-geometry-column")
@click.argument(
    "db_path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table", type=str)
@click.argument("column_name", type=str)
@click.option(
    "-t",
    "--type",
    "geometry_type",
    type=click.Choice(
        [
            "POINT",
            "LINESTRING",
            "POLYGON",
            "MULTIPOINT",
            "MULTILINESTRING",
            "MULTIPOLYGON",
            "GEOMETRYCOLLECTION",
            "GEOMETRY",
        ],
        case_sensitive=False,
    ),
    default="GEOMETRY",
    help="Specify a geometry type for this column.",
    show_default=True,
)
@click.option(
    "--srid",
    type=int,
    default=4326,
    show_default=True,
    help="Spatial Reference ID. See https://spatialreference.org for details on specific projections.",
)
@click.option(
    "--dimensions",
    "coord_dimension",
    type=str,
    default="XY",
    help="Coordinate dimensions. Use XYZ for three-dimensional geometries.",
)
@click.option("--not-null", "not_null", is_flag=True, help="Add a NOT NULL constraint.")
@load_extension_option
def add_geometry_column(
    db_path,
    table,
    column_name,
    geometry_type,
    srid,
    coord_dimension,
    not_null,
    load_extension,
):
    """Add a SpatiaLite geometry column to an existing table. Requires SpatiaLite extension.
    \n\n
    By default, this command will try to load the SpatiaLite extension from usual paths.
    To load it from a specific path, use --load-extension."""
    db = sqlite_utils.Database(db_path)

    # The target table has to exist before a geometry column can be added
    table_exists = db[table].exists()
    if not table_exists:
        raise click.ClickException(
            "You must create a table before adding a geometry column"
        )

    # Explicit --load-extension paths are loaded first, then SpatiaLite is
    # initialised on the connection
    if load_extension:
        _load_extensions(db, load_extension)
    db.init_spatialite()

    # Only report success when the table method returns a truthy result
    was_added = db[table].add_geometry_column(
        column_name, geometry_type, srid, coord_dimension, not_null
    )
    if was_added:
        click.echo(f"Added {geometry_type} column {column_name} to {table}")


@cli.command("create-spatial-index")
@click.argument(
    "db_path",
    type=click.Path(file_okay=True, dir_okay=False, allow_dash=False),
    required=True,
)
@click.argument("table", type=str)
@click.argument("column_name", type=str)
@load_extension_option
def create_spatial_index(db_path, table, column_name, load_extension):
    """Create a spatial index on a SpatiaLite geometry column.
    The table and geometry column must already exist before trying to add a spatial index.
    \n\n
    By default, this command will try to load the SpatiaLite extension from usual paths.
    To load it from a specific path, use --load-extension."""
    db = sqlite_utils.Database(db_path)

    # Precondition 1: the table itself must exist
    table_exists = db[table].exists()
    if not table_exists:
        raise click.ClickException(
            "You must create a table and add a geometry column before creating a spatial index"
        )

    # Explicit --load-extension paths are loaded first, then SpatiaLite is
    # initialised on the connection
    if load_extension:
        _load_extensions(db, load_extension)
    db.init_spatialite()

    # Precondition 2: the named column must already be part of the table
    known_columns = db[table].columns_dict
    if column_name not in known_columns:
        raise click.ClickException(
            "You must add a geometry column before creating a spatial index"
        )

    db[table].create_spatial_index(column_name)


@cli.command(name="plugins")
def plugins_list():
    "List installed plugins"
    # Emit the plugin details as indented JSON
    rendered = json.dumps(get_plugins(), indent=2)
    click.echo(rendered)


# Call the "register_commands" plugin hook, passing it the cli object.
# This runs at import time, after the command definitions above.
pm.hook.register_commands(cli=cli)


def _render_common(title, values):
    if values is None:
        return ""
    lines = [title]
    for value, count in values:
        lines.append("    {}: {}".format(count, value))
    return "\n".join(lines)


class UnicodeDecodeErrorForPath(Exception):
    """Pairs a decoding error with the path of the file that caused it."""

    def __init__(self, exception, path):
        super().__init__(exception, path)
        # exception: the original error; path: the file that was being read
        self.exception, self.path = exception, path


def _utc_isoformat(timestamp):
    """Return ``timestamp`` (seconds since the epoch) as a naive UTC ISO 8601 string."""
    # datetime.utcfromtimestamp() is deprecated since Python 3.12. Build an
    # aware UTC datetime and drop the tzinfo so the text carries no "+00:00"
    # suffix, matching what utcfromtimestamp(...).isoformat() returned.
    from datetime import timezone

    return (
        datetime.fromtimestamp(timestamp, timezone.utc)
        .replace(tzinfo=None)
        .isoformat()
    )


# Maps a column-type name to a function that takes a pathlib.Path and returns
# the value for that column. The *_iso entries are UTC timestamps without a
# timezone suffix.
FILE_COLUMNS = {
    "name": lambda p: p.name,
    "path": lambda p: str(p),
    "fullpath": lambda p: str(p.resolve()),
    "sha256": lambda p: hashlib.sha256(p.resolve().read_bytes()).hexdigest(),
    "md5": lambda p: hashlib.md5(p.resolve().read_bytes()).hexdigest(),
    "mode": lambda p: p.stat().st_mode,
    "content": lambda p: p.resolve().read_bytes(),
    "mtime": lambda p: p.stat().st_mtime,
    "ctime": lambda p: p.stat().st_ctime,
    "mtime_int": lambda p: int(p.stat().st_mtime),
    "ctime_int": lambda p: int(p.stat().st_ctime),
    "mtime_iso": lambda p: _utc_isoformat(p.stat().st_mtime),
    "ctime_iso": lambda p: _utc_isoformat(p.stat().st_ctime),
    "size": lambda p: p.stat().st_size,
    "stem": lambda p: p.stem,
    "suffix": lambda p: p.suffix,
}


def output_rows(iterator, headers, nl, arrays, json_cols):
    """Yield the lines of a JSON rendering of ``iterator``, one per row.

    With ``nl`` each line is a standalone JSON document. Otherwise the lines
    together form a JSON array: the first starts with ``[``, later ones with
    a space, every line but the last ends with ``,`` and the last ends with
    ``]``. Rows are emitted as objects keyed by ``headers`` unless ``arrays``
    is set. With ``json_cols`` each value is passed through ``maybe_json``
    first. When there are no rows a single ``[]`` line is yielded.
    """
    # Iterate with a one-row lookahead so the final row can be recognised
    rows, lookahead = itertools.tee(iterator, 2)
    next(lookahead, None)
    emitted_any = False
    for row, following in itertools.zip_longest(rows, lookahead):
        is_last = following is None
        if json_cols:
            # Any value that is a valid JSON string should be treated as JSON
            row = [maybe_json(value) for value in row]
        if not arrays:
            row = dict(zip(headers, row))
        serialized = json.dumps(row, default=json_binary)
        if nl:
            yield serialized
        else:
            opening = " " if emitted_any else "["
            closing = "]" if is_last else ","
            yield opening + serialized + closing
        emitted_any = True
    if not emitted_any:
        # We didn't output any rows, so yield the empty list
        yield "[]"


def maybe_json(value):
    """Return ``value`` parsed as JSON if it is a string holding an object or array.

    Only strings whose stripped text starts with ``{`` or ``[`` are parsed.
    Anything else, including text that fails to parse, is returned unchanged.
    """
    if isinstance(value, str):
        candidate = value.strip()
        if candidate.startswith(("{", "[")):
            try:
                return json.loads(candidate)
            except ValueError:
                pass
    return value


def json_binary(value):
    """``json.dumps`` ``default=`` hook that encodes ``bytes`` as base64.

    Returns ``{"$base64": True, "encoded": <base64 text>}`` for ``bytes``
    and raises ``TypeError`` for every other type.
    """
    if not isinstance(value, bytes):
        raise TypeError
    encoded = base64.b64encode(value).decode("latin-1")
    return {"$base64": True, "encoded": encoded}


def verify_is_dict(doc):
    """Return ``doc`` unchanged if it is a dictionary.

    Raises ``click.ClickException`` otherwise, quoting at most the first
    1000 characters of ``repr(doc)``.
    """
    if isinstance(doc, dict):
        return doc
    preview = repr(doc)[:1000]
    raise click.ClickException(
        "Rows must all be dictionaries, got: {}".format(preview)
    )


def _load_extensions(db, load_extension):
    if load_extension:
        db.conn.enable_load_extension(True)
        for ext in load_extension:
            if ext == "spatialite" and not os.path.exists(ext):
                ext = find_spatialite()
            if ":" in ext:
                path, _, entrypoint = ext.partition(":")
                db.conn.execute("SELECT load_extension(?, ?)", [path, entrypoint])
            else:
                db.conn.load_extension(ext)


def _register_functions(db, functions):
    # Register any Python functions as SQL functions:
    sqlite3.enable_callback_tracebacks(True)
    globals = {}
    try:
        exec(functions, globals)
    except SyntaxError as ex:
        raise click.ClickException("Error in functions definition: {}".format(ex))
    # Register all callables in the locals dict:
    for name, value in globals.items():
        if callable(value) and not name.startswith("_"):
            db.register_function(value, name=name)
