File: _db.py

package info (click to toggle)
python-wn 1.0.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,100 kB
  • sloc: python: 8,429; xml: 566; sql: 238; makefile: 12
file content (156 lines) | stat: -rw-r--r-- 4,736 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
Storage back-end interface.
"""

import json
import logging
import sqlite3
from importlib import resources
from pathlib import Path

from wn._config import config
from wn._exceptions import DatabaseError
from wn._types import AnyPath
from wn._util import format_lexicon_specifier, short_hash

logger = logging.getLogger("wn")


# Module Constants

DEBUG = False

# This stores hashes of the schema to check for version differences.
# When the schema changes, the hash will change. If the new hash is
# not added here, the 'test_schema_compatibility' test will fail. It
# is the developer's responsibility to only add compatible schema
# hashes here. If the schema change is not backwards-compatible, then
# clear all old hashes and only put the latest hash here. A hash can
# be generated like this:
#
# >>> import sqlite3
# >>> import wn
# >>> conn = sqlite3.connect(wn.config.database_path)
# >>> wn._db.schema_hash(conn)
#
COMPATIBLE_SCHEMA_HASHES = {
    "8348fc1a6254f514294a1dc70458e0733742935d",
}


# Optional metadata is stored as a JSON string


def _adapt_dict(d: dict) -> bytes:
    return json.dumps(d).encode("utf-8")


def _convert_dict(s: bytes) -> dict:
    return json.loads(s)


def _convert_boolean(s: bytes) -> bool:
    return bool(int(s))


sqlite3.register_adapter(dict, _adapt_dict)
sqlite3.register_converter("meta", _convert_dict)
sqlite3.register_converter("boolean", _convert_boolean)


# The pool is a cache of open connections. Unless the database path is
# changed, there should only be zero or one.
pool: dict[AnyPath, sqlite3.Connection] = {}


# The connect() function should be used for all connections


def connect(check_schema: bool = True) -> sqlite3.Connection:
    dbpath = config.database_path
    if dbpath not in pool:
        if not config.data_directory.exists():
            config.data_directory.mkdir(parents=True, exist_ok=True)
        initialized = dbpath.is_file()
        conn = sqlite3.connect(
            str(dbpath),
            detect_types=sqlite3.PARSE_DECLTYPES,
            check_same_thread=not config.allow_multithreading,
        )
        # foreign key support needs to be enabled for each connection
        conn.execute("PRAGMA foreign_keys = ON")
        if DEBUG:
            conn.set_trace_callback(print)
        if not initialized:
            logger.info("initializing database: %s", dbpath)
            _init_db(conn)
        if check_schema:
            _check_schema_compatibility(conn, dbpath)

        pool[dbpath] = conn
    return pool[dbpath]


def _init_db(conn: sqlite3.Connection) -> None:
    schema = (resources.files("wn") / "schema.sql").read_text()
    conn.executescript(schema)
    with conn:
        conn.executemany(
            "INSERT INTO ili_statuses VALUES (null,?)",
            [("presupposed",), ("proposed",)],
        )


def _check_schema_compatibility(conn: sqlite3.Connection, dbpath: Path) -> None:
    hash = schema_hash(conn)

    # if the hash is known, then we're all good here
    if hash in COMPATIBLE_SCHEMA_HASHES:
        return

    logger.debug("current schema hash:\n  %s", hash)
    logger.debug(
        "compatible schema hashes:\n  %s", "\n  ".join(COMPATIBLE_SCHEMA_HASHES)
    )
    # otherwise, try to raise a helpful error message
    msg = "Wn's schema has changed and is no longer compatible with the database."
    try:
        specs = list_lexicons_safe(conn)
    except DatabaseError as exc:
        raise DatabaseError(msg) from exc
    if specs:
        installed = "\n  ".join(specs)
        msg += (
            f"\nLexicons currently installed:\n  {installed}"
            "\nRun wn.reset_database(rebuild=True) to rebuild the database."
        )
    else:
        msg += (
            "\nNo lexicons are currently installed."
            "\nRun wn.reset_database() to re-initialize the database."
        )
    raise DatabaseError(msg)


def list_lexicons_safe(conn: sqlite3.Connection | None = None) -> list[str]:
    """Return the list of lexicon specifiers for added lexicons."""
    if conn is None:
        conn = connect(check_schema=False)
    try:
        specs = conn.execute("SELECT id, version FROM lexicons").fetchall()
    except sqlite3.OperationalError as exc:
        raise DatabaseError("could not list lexicons") from exc
    return [format_lexicon_specifier(id, ver) for id, ver in specs]


def schema_hash(conn: sqlite3.Connection) -> str:
    query = "SELECT sql FROM sqlite_master WHERE NOT sql ISNULL"
    schema = "\n\n".join(row[0] for row in conn.execute(query))
    return short_hash(schema)


def clear_connections() -> None:
    """Close and delete any open database connections."""
    for path in list(pool):
        pool[path].close()
        del pool[path]