# Tests for sqlite-utils cached row counts: enable_counts(), the _counts
# table and its triggers, and reset_counts() (Python API and CLI).
from sqlite_utils import Database
from sqlite_utils import cli
from click.testing import CliRunner
import pytest
def test_enable_counts_specific_table(fresh_db):
    """Enable counts on a single table and follow the cached value.

    Populates ``foo`` with ten rows, calls ``enable_counts()`` on it, checks
    the exact SQL of the two triggers it creates, and then inserts and
    deletes rows to confirm the ``_counts`` row for ``foo`` stays equal to
    the table's real row count.
    """

    def cached_rows():
        # Look the table up afresh on every call, exactly as a direct
        # fresh_db["_counts"].rows access would.
        return list(fresh_db["_counts"].rows)

    def expected_rows(total):
        return [{"count": total, "table": "foo"}]

    table = fresh_db["foo"]
    assert fresh_db.table_names() == []
    for n in range(10):
        table.insert({"name": "item {}".format(n)})
    assert fresh_db.table_names() == ["foo"]
    assert table.count == 10

    # Now enable counts
    table.enable_counts()

    # NOTE(review): these literals are compared for exact equality with the
    # stored trigger SQL, so their whitespace must match it — confirm.
    insert_trigger_sql = (
        "CREATE TRIGGER [foo_counts_insert] AFTER INSERT ON [foo]\n"
        "BEGIN\n"
        " INSERT OR REPLACE INTO [_counts]\n"
        " VALUES (\n 'foo',\n"
        " COALESCE(\n"
        " (SELECT count FROM [_counts] WHERE [table] = 'foo'),\n"
        " 0\n"
        " ) + 1\n"
        " );\n"
        "END"
    )
    delete_trigger_sql = (
        "CREATE TRIGGER [foo_counts_delete] AFTER DELETE ON [foo]\n"
        "BEGIN\n"
        " INSERT OR REPLACE INTO [_counts]\n"
        " VALUES (\n"
        " 'foo',\n"
        " COALESCE(\n"
        " (SELECT count FROM [_counts] WHERE [table] = 'foo'),\n"
        " 0\n"
        " ) - 1\n"
        " );\n"
        "END"
    )
    assert table.triggers_dict == {
        "foo_counts_insert": insert_trigger_sql,
        "foo_counts_delete": delete_trigger_sql,
    }
    assert fresh_db.table_names() == ["foo", "_counts"]
    assert cached_rows() == expected_rows(10)

    # Add some items to test the triggers
    for n in range(10, 15):
        table.insert({"name": "item {}".format(n)})
    assert table.count == 15
    assert cached_rows() == expected_rows(15)

    # Delete some items, then everything that is left
    table.delete_where("rowid < 7")
    assert table.count == 9
    assert cached_rows() == expected_rows(9)

    table.delete_where()
    assert table.count == 0
    assert cached_rows() == expected_rows(0)
def test_enable_counts_all_tables(fresh_db):
    """Enable counts database-wide and check the resulting ``_counts`` rows.

    ``foo`` gets full-text search enabled first, so the expected ``_counts``
    rows include the ``foo_fts_*`` tables alongside ``foo`` and ``bar``.
    """
    foo, bar = fresh_db["foo"], fresh_db["bar"]
    for table in (foo, bar):
        table.insert({"name": "Cleo"})
    foo.enable_fts(["name"])
    fresh_db.enable_counts()

    fts_tables = {
        "foo_fts",
        "foo_fts_data",
        "foo_fts_idx",
        "foo_fts_docsize",
        "foo_fts_config",
    }
    assert set(fresh_db.table_names()) == {"foo", "bar", "_counts"} | fts_tables

    # (table, cached count) in the order the rows are expected to come back.
    expected_counts = [
        ("foo", 1),
        ("bar", 1),
        ("foo_fts_data", 3),
        ("foo_fts_idx", 1),
        ("foo_fts_docsize", 1),
        ("foo_fts_config", 1),
    ]
    assert list(fresh_db["_counts"].rows) == [
        {"count": total, "table": name} for name, total in expected_counts
    ]
@pytest.fixture
def counts_db_path(tmpdir):
    """Return the path of an on-disk database holding three small tables.

    After setup ``foo`` has one row, ``bar`` has two and ``baz`` has one;
    every row is ``{"name": "bar"}``.
    """
    db_path = str(tmpdir / "test.db")
    database = Database(db_path)
    # One insert per entry, in this order: foo, bar, bar, baz.
    for table_name in ("foo", "bar", "bar", "baz"):
        database[table_name].insert({"name": "bar"})
    return db_path
@pytest.mark.parametrize(
    "extra_args,expected_triggers",
    [
        # No table arguments: triggers expected on every table.
        (
            [],
            [
                "foo_counts_insert",
                "foo_counts_delete",
                "bar_counts_insert",
                "bar_counts_delete",
                "baz_counts_insert",
                "baz_counts_delete",
            ],
        ),
        # A single table named on the command line: only that table.
        (["bar"], ["bar_counts_insert", "bar_counts_delete"]),
    ],
)
def test_cli_enable_counts(counts_db_path, extra_args, expected_triggers):
    """Run ``enable-counts`` through the CLI and check which triggers exist.

    ``extra_args`` are appended after the database path; the database must
    start with no triggers and end with exactly ``expected_triggers``, in
    that order.
    """
    db = Database(counts_db_path)
    assert list(db.triggers_dict) == []

    argv = ["enable-counts", counts_db_path, *extra_args]
    result = CliRunner().invoke(cli.cli, argv)

    assert result.exit_code == 0
    assert list(db.triggers_dict) == expected_triggers
def test_uses_counts_after_enable_counts(counts_db_path):
    """Check which SQL ``.count`` issues before and after ``enable_counts()``.

    A tracer records every ``(sql, parameters)`` pair. Before counts are
    enabled, ``db["foo"].count`` is expected to run ``select count(*)``;
    afterwards the final recorded statement is expected to read from
    ``_counts`` instead.
    """
    db = Database(counts_db_path)
    logged = []

    def record(sql, parameters):
        logged.append((sql, parameters))

    # This statement recurs throughout the expected trace.
    view_lookup = ("select name from sqlite_master where type = 'view'", None)

    with db.tracer(record):
        assert db["foo"].count == 1
        assert logged == [
            view_lookup,
            ("select count(*) from [foo]", []),
        ]
        logged.clear()
        assert not db.use_counts_table
        db.enable_counts()
        assert db.use_counts_table
        assert db["foo"].count == 1

    # Expected trace for enable_counts() followed by one .count access.
    expected = [
        (
            "CREATE TABLE IF NOT EXISTS [_counts](\n [table] TEXT PRIMARY KEY,\n count INTEGER DEFAULT 0\n);",
            None,
        ),
        ("select name from sqlite_master where type = 'table'", None),
    ]
    expected.extend([view_lookup] * 4)
    # Each user table contributes a schema lookup and a quote() call.
    for name in ("foo", "bar", "baz"):
        expected.append(("select sql from sqlite_master where name = ?", (name,)))
        expected.append(("SELECT quote(:value)", {"value": name}))
    expected.append(("select sql from sqlite_master where name = ?", ("_counts",)))
    expected.append(view_lookup)
    expected.append(
        ("select [table], count from _counts where [table] in (?)", ["foo"])
    )
    assert logged == expected
def test_reset_counts(counts_db_path):
    """Corrupt a cached count, then check ``reset_counts()`` repairs it.

    While the cached value is wrong, ``db["foo"].count`` is expected to
    report the wrong value too; after the reset both agree with the real
    row counts again.
    """
    db = Database(counts_db_path)
    for name in ("foo", "bar"):
        db[name].enable_counts()

    true_counts = {"foo": 1, "bar": 2}
    assert db.cached_counts() == true_counts

    # Corrupt the value
    db["_counts"].update("foo", {"count": 3})
    assert db.cached_counts() == {"foo": 3, "bar": 2}
    assert db["foo"].count == 3

    # Reset them
    db.reset_counts()
    assert db.cached_counts() == true_counts
    assert db["foo"].count == 1
def test_reset_counts_cli(counts_db_path):
    """Corrupt a cached count, then check the ``reset-counts`` CLI repairs it.

    Counts are enabled on ``foo`` and ``bar``, the cached value for ``foo``
    is overwritten with a wrong number, and the command is run against the
    database file. Afterwards the cached counts must match the real row
    counts again.
    """
    db = Database(counts_db_path)
    db["foo"].enable_counts()
    db["bar"].enable_counts()
    assert db.cached_counts() == {"foo": 1, "bar": 2}

    # Corrupt the value and confirm the corruption is visible. Without this
    # check the final assertion would hold even if the command did nothing.
    db["_counts"].update("foo", {"count": 3})
    assert db.cached_counts() == {"foo": 3, "bar": 2}

    result = CliRunner().invoke(cli.cli, ["reset-counts", counts_db_path])
    # Show the command's output if it exits non-zero.
    assert result.exit_code == 0, result.output
    assert db.cached_counts() == {"foo": 1, "bar": 2}
|