1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import TYPE_CHECKING, Generic, TypeVar
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_conn
from synapse.storage.databases.main.events import PersistEventsStore
from synapse.storage.databases.state import StateGroupDataStore
from synapse.storage.databases.state.deletion import StateDeletionDataStore
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.storage.databases.main import DataStore
logger = logging.getLogger(__name__)
DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True)
background_update_status = LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=["database_name", SERVER_NAME_LABEL],
)
class Databases(Generic[DataStoreT]):
"""The various databases.
These are low level interfaces to physical databases.
Attributes:
databases
main
state
persist_events
state_deletion
"""
databases: list[DatabasePool]
main: "DataStore" # FIXME: https://github.com/matrix-org/synapse/issues/11165: actually an instance of `main_store_class`
state: StateGroupDataStore
persist_events: PersistEventsStore | None
state_deletion: StateDeletionDataStore
def __init__(self, main_store_class: type[DataStoreT], hs: "HomeServer"):
# Note we pass in the main store class here as workers use a different main
# store.
self.databases = []
main: DataStoreT | None = None
state: StateGroupDataStore | None = None
state_deletion: StateDeletionDataStore | None = None
persist_events: PersistEventsStore | None = None
server_name = hs.hostname
for database_config in hs.config.database.databases:
db_name = database_config.name
engine = create_engine(database_config.config)
with make_conn(
db_config=database_config,
engine=engine,
default_txn_name="startup",
server_name=server_name,
) as db_conn:
logger.info("[database config %r]: Checking database server", db_name)
engine.check_database(db_conn)
logger.info(
"[database config %r]: Preparing for databases %r",
db_name,
database_config.databases,
)
prepare_database(
db_conn,
engine,
hs.config,
databases=database_config.databases,
)
database = DatabasePool(hs, database_config, engine)
if "main" in database_config.databases:
logger.info(
"[database config %r]: Starting 'main' database", db_name
)
# Sanity check we don't try and configure the main store on
# multiple databases.
if main:
raise Exception("'main' data store already configured")
main = main_store_class(database, db_conn, hs)
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main, db_conn) # type: ignore[arg-type]
if "state" in database_config.databases:
logger.info(
"[database config %r]: Starting 'state' database", db_name
)
# Sanity check we don't try and configure the state store on
# multiple databases.
if state:
raise Exception("'state' data store already configured")
state_deletion = StateDeletionDataStore(database, db_conn, hs)
state = StateGroupDataStore(database, db_conn, hs, state_deletion)
db_conn.commit()
self.databases.append(database)
logger.info("[database config %r]: prepared", db_name)
# Closing the context manager doesn't close the connection.
# psycopg will close the connection when the object gets GCed, but *only*
# if the PID is the same as when the connection was opened [1], and
# it may not be if we fork in the meantime.
#
# [1]: https://github.com/psycopg/psycopg2/blob/2_8_5/psycopg/connection_type.c#L1378
db_conn.close()
# Track the background update status for each database
background_update_status.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(database.name(), server_name): database.updates.get_status()
for database in self.databases
},
)
# Sanity check that we have actually configured all the required stores.
if not main:
raise Exception("No 'main' database configured")
if not state or not state_deletion:
raise Exception("No 'state' database configured")
# We use local variables here to ensure that the databases do not have
# optional types.
self.main = main # type: ignore[assignment]
self.state = state
self.persist_events = persist_events
self.state_deletion = state_deletion
|