1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
|
# Copyright 2012-2014 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Manage a PostgreSQL cluster."""
from __future__ import (
absolute_import,
print_function,
unicode_literals,
)
# Python 2 only: make classes declared without explicit bases (such as
# ``class Cluster:`` below) new-style. Has no effect under Python 3.
__metaclass__ = type

# The public API of this module.
__all__ = [
    "Cluster",
    "PG_VERSION_MAX",
    "PG_VERSIONS",
]
from contextlib import closing
from glob import iglob
from os import (
devnull,
environ,
makedirs,
path,
)
import shlex
from shutil import rmtree
from subprocess import (
CalledProcessError,
check_call,
)
import sys
from tempfile import TemporaryFile
from packaging.version import Version
import psycopg2
from postgresfixture.utils import LockFile
PG_BASE = "/usr/lib/postgresql"

# Installed PostgreSQL versions, keyed by the name of their directory under
# PG_BASE (e.g. "9.4"), each mapped to that version's ``bin`` directory.
# Only directories that actually contain ``bin/pg_ctl`` are considered.
PG_VERSION_BINS = {
    path.basename(version_dir): path.join(version_dir, "bin")
    for version_dir in iglob(path.join(PG_BASE, "*"))
    if path.exists(path.join(version_dir, "bin", "pg_ctl"))
}

# Versions are ordered as release numbers via `Version`, not as strings, so
# that e.g. "10" ranks above "9.6". Note that `max` raises at import time if
# no PostgreSQL installation was found.
PG_VERSIONS = sorted(PG_VERSION_BINS, key=Version)
PG_VERSION_MAX = max(PG_VERSION_BINS, key=Version)
def get_pg_bin(version):
    """Look up the ``bin`` directory of an installed PostgreSQL version.

    :param version: A key of `PG_VERSION_BINS`, such as ``"9.4"``.
    :return: The path of that version's ``bin`` directory.
    :raises KeyError: If `version` was not found under `PG_BASE`.
    """
    pg_bin = PG_VERSION_BINS[version]
    return pg_bin
def path_with_pg_bin(exe_path, version):
    """Prepend the PostgreSQL ``bin`` directory for `version` to `exe_path`.

    Empty and whitespace-only entries are dropped from `exe_path`. The
    ``bin`` directory is added at the front only when it is not already
    one of the remaining entries; otherwise the order is left unchanged.

    :param exe_path: A search path such as the value of ``$PATH``, with
        entries separated by `os.path.pathsep`.
    :param version: The PostgreSQL version whose ``bin`` directory to use.
    :return: The resulting search path as a single string.
    """
    entries = [
        entry for entry in exe_path.split(path.pathsep)
        if entry and not entry.isspace()
    ]
    pg_bin = get_pg_bin(version)
    if pg_bin in entries:
        return path.pathsep.join(entries)
    return path.pathsep.join([pg_bin] + entries)
class Cluster:
    """Represents a PostgreSQL cluster, running or not.

    A cluster is identified by its data directory. The public state-changing
    operations (`create`, `start`, `stop`, `destroy`) run under
    ``self.lock.exclusive``, where the lock file is ``.<basename>.lock``
    next to the data directory.
    """

    def __init__(self, datadir, version=PG_VERSION_MAX):
        """
        :param datadir: Path to the cluster's data directory; it is made
            absolute immediately.
        :param version: The PostgreSQL version whose binaries manage this
            cluster; defaults to the newest installed version.
        """
        self.datadir = path.abspath(datadir)
        self.version = version
        self.lock = LockFile(path.join(
            path.dirname(self.datadir),
            ".%s.lock" % path.basename(self.datadir)))

    def execute(self, *command, **options):
        """Execute a command with an environment suitable for this cluster.

        This version's PostgreSQL ``bin`` directory is put on ``PATH``, and
        both ``PGDATA`` and ``PGHOST`` point at the data directory (which is
        also the server's socket directory, see `_start`).

        :param command: The program to run followed by its arguments.
        :param options: Passed through to `subprocess.check_call`. An
            ``env`` option, if given, is copied and used instead of
            `os.environ`; the caller's mapping is not modified.
        :raises CalledProcessError: If the command exits non-zero.
        """
        env = options.pop("env", environ).copy()
        env["PATH"] = path_with_pg_bin(env.get("PATH", ""), self.version)
        env["PGDATA"] = env["PGHOST"] = self.datadir
        check_call(command, env=env, **options)

    @property
    def exists(self):
        """Whether or not this cluster exists on disk.

        Judged by the presence of the ``PG_VERSION`` file in the data
        directory, not merely by the directory itself.
        """
        version_file = path.join(self.datadir, "PG_VERSION")
        return path.exists(version_file)

    @property
    def pidfile(self):
        """The (expected) pidfile for a running cluster.

        Does *not* guarantee that the pidfile exists.
        """
        return path.join(self.datadir, "postmaster.pid")

    @property
    def logfile(self):
        """The log file used (or will be used) by this cluster."""
        return path.join(self.datadir, "backend.log")

    @property
    def running(self):
        """Whether this cluster is running or not.

        Runs ``pg_ctl status``. Which exit codes mean "not running" depends
        on the PostgreSQL version (see `_status_means_stopped`). Any other
        failure is treated as an error: the stderr captured from ``pg_ctl``
        is echoed to `sys.stderr` and the `CalledProcessError` is re-raised.
        """
        with open(devnull, "wb") as stdout, TemporaryFile() as stderr:
            try:
                self.execute("pg_ctl", "status", stdout=stdout, stderr=stderr)
            except CalledProcessError as error:
                if self._status_means_stopped(error.returncode):
                    return False
                # This is not a recognised error. Echo the captured stderr,
                # then re-raise the CalledProcessError itself. It is raised
                # explicitly so that a failure while echoing cannot replace
                # it; such a failure is kept as its __context__ instead.
                try:
                    self._echo_stderr(stderr)
                finally:
                    raise error
            else:
                return True

    def _status_means_stopped(self, returncode):
        """Whether a ``pg_ctl status`` exit code means "not running".

        PostgreSQL has evolved to return different error codes in later
        versions, so specific codes are checked per version to avoid masking
        errors from insufficient permissions or missing executables, for
        example. `False` means the code is not recognised as "stopped".
        """
        version = Version(self.version)
        if version >= Version("9.4"):
            if returncode == 3:
                # 3 means that the data directory is present and accessible
                # but that the server is not running.
                return True
            if returncode == 4:
                # 4 means that the data directory is not present or is not
                # accessible. If it's missing, then the server is not
                # running. If it is present but not accessible then we
                # can't know if the server is running or not.
                return not self.exists
            return False
        if version >= Version("9.2"):
            # 3 means that the data directory is present and accessible but
            # that the server is not running OR that the data directory is
            # not present.
            return returncode == 3
        # 1 means that the server is not running OR the data directory is
        # not present OR that the data directory is not accessible.
        return returncode == 1

    @staticmethod
    def _echo_stderr(captured):
        """Write everything captured in binary file `captured` to stderr."""
        captured.seek(0)  # Rewind.
        data = captured.read()
        if sys.version_info[0] >= 3:
            data = data.decode(errors="replace")
        sys.stderr.write(data)

    def create(self):
        """Create this cluster, if it does not exist."""
        with self.lock.exclusive:
            self._create()

    def _create(self):
        """Unlocked implementation of `create`."""
        if not self.exists:
            if not path.isdir(self.datadir):
                makedirs(self.datadir)
            # `-o` passes options through to initdb: UTF-8 encoding and
            # "trust" authentication.
            self.execute("pg_ctl", "init", "-s", "-o", "-E utf8 -A trust")

    def start(self):
        """Start this cluster, if it's not already started.

        The cluster is created first if necessary.
        """
        with self.lock.exclusive:
            self._start()

    def _start(self):
        """Unlocked implementation of `start`."""
        if not self.running:
            self._create()
            # pg_ctl options:
            #  -l <file> -- log file.
            #  -s -- no informational messages.
            #  -w -- wait until startup is complete.
            # postgres options:
            #  -h <arg> -- host name; empty arg means Unix socket only.
            #  -F -- don't bother fsync'ing.
            #  -k -- socket directory.
            # The `-o` string is split into arguments shell-style, hence
            # the quoting of the data directory.
            self.execute(
                "pg_ctl", "start", "-l", self.logfile, "-s", "-w",
                "-o", "-h '' -F -k %s" % shlex.quote(self.datadir))

    def connect(self, database="template1", autocommit=True):
        """Connect to this cluster.

        Connects through the socket in the data directory, which is passed
        as the host.

        :param database: Name of the database to connect to.
        :param autocommit: Value for the connection's ``autocommit``.
        :return: A new `psycopg2` connection; the caller must close it.
        """
        connection = psycopg2.connect(
            database=database, host=self.datadir)
        connection.autocommit = autocommit
        return connection

    def shell(self, database="template1"):
        """Run ``psql`` against `database` in this cluster.

        :raises CalledProcessError: If ``psql`` exits non-zero.
        """
        self.execute("psql", "--quiet", "--", database)

    @property
    def databases(self):
        """The names of databases in this cluster, as a set."""
        with closing(self.connect("postgres")) as conn:
            with closing(conn.cursor()) as cur:
                cur.execute("SELECT datname FROM pg_catalog.pg_database")
                return {name for (name,) in cur.fetchall()}

    def createdb(self, name):
        """Create the named database.

        NOTE(review): `name` is interpolated into the SQL unquoted, so it is
        subject to PostgreSQL identifier case-folding and must only come
        from a trusted source.
        """
        with closing(self.connect()) as conn:
            with closing(conn.cursor()) as cur:
                cur.execute("CREATE DATABASE %s" % name)

    def dropdb(self, name):
        """Drop the named database.

        NOTE(review): as with `createdb`, `name` is interpolated unquoted
        and must only come from a trusted source.
        """
        with closing(self.connect()) as conn:
            with closing(conn.cursor()) as cur:
                cur.execute("DROP DATABASE %s" % name)

    def stop(self):
        """Stop this cluster, if started."""
        with self.lock.exclusive:
            self._stop()

    def _stop(self):
        """Unlocked implementation of `stop`."""
        if self.running:
            # pg_ctl options:
            #  -s -- no informational messages.
            #  -w -- wait for shutdown to complete.
            #  -m <mode> -- shutdown mode.
            self.execute("pg_ctl", "stop", "-s", "-w", "-m", "fast")

    def destroy(self):
        """Destroy this cluster, if it exists.

        The cluster will be stopped if it's started, then its data
        directory is removed.
        """
        with self.lock.exclusive:
            self._destroy()

    def _destroy(self):
        """Unlocked implementation of `destroy`."""
        if self.exists:
            self._stop()
            rmtree(self.datadir)
|