File: cluster.py

package info (click to toggle)
postgresfixture 0.5.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 224 kB
  • sloc: python: 1,094; makefile: 5
file content (242 lines) | stat: -rw-r--r-- 8,317 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# Copyright 2012-2014 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Manage a PostgreSQL cluster."""

from __future__ import (
    absolute_import,
    print_function,
    unicode_literals,
    )

# Python 2 compatibility: classes defined in this module without explicit
# bases become new-style classes. This has no effect on Python 3.
__metaclass__ = type
# Names exported by ``from <this module> import *``.
__all__ = [
    "Cluster",
    "PG_VERSION_MAX",
    "PG_VERSIONS",
    ]

from contextlib import closing
from glob import iglob
from os import (
    devnull,
    environ,
    makedirs,
    path,
    )
import shlex
from shutil import rmtree
from subprocess import (
    CalledProcessError,
    check_call,
    )
import sys
from tempfile import TemporaryFile

from packaging.version import Version
import psycopg2

from postgresfixture.utils import LockFile


# Directory that is scanned for PostgreSQL installations; each immediate
# child is treated as one version.
PG_BASE = "/usr/lib/postgresql"

# Maps the name of each child directory of PG_BASE to that child's ``bin``
# directory. Only children whose ``bin`` directory contains ``pg_ctl`` are
# included.
PG_VERSION_BINS = dict(
    (path.basename(candidate), path.join(candidate, "bin"))
    for candidate in iglob(path.join(PG_BASE, "*"))
    if path.exists(path.join(candidate, "bin", "pg_ctl"))
)

# All discovered versions, ordered from lowest to highest as parsed by
# `Version`.
PG_VERSIONS = sorted(PG_VERSION_BINS, key=Version)

# The highest discovered version.
# NOTE: `max` raises ValueError here, at import time, when no installation
# was discovered under PG_BASE.
PG_VERSION_MAX = max(PG_VERSION_BINS, key=Version)


def get_pg_bin(version):
    """Look up the PostgreSQL ``bin`` directory for the given `version`.

    `version` must be one of the keys of `PG_VERSION_BINS`; any other value
    raises `KeyError`.
    """
    pg_bin = PG_VERSION_BINS[version]
    return pg_bin


def path_with_pg_bin(exe_path, version):
    """Return `exe_path` with the PostgreSQL ``bin`` directory added.

    `exe_path` is a ``PATH``-style string. Empty and whitespace-only
    entries are dropped. The ``bin`` directory for `version` is put at the
    front unless it already appears somewhere in the path, in which case
    the existing position is kept.
    """
    entries = []
    for entry in exe_path.split(path.pathsep):
        # Skip entries that are empty or consist only of whitespace.
        if entry and not entry.isspace():
            entries.append(entry)
    pg_bin = get_pg_bin(version)
    if pg_bin in entries:
        return path.pathsep.join(entries)
    return path.pathsep.join([pg_bin] + entries)


class Cluster:
    """Represents a PostgreSQL cluster, running or not.

    The cluster lives in `datadir` and is driven with the command-line
    tools of the chosen PostgreSQL `version`. The data directory is also
    used as the server's socket directory, which is why it doubles as the
    "host" when connecting.

    `create`, `start`, `stop` and `destroy` do their work while holding
    ``self.lock.exclusive``; the underscore-prefixed variants do the same
    work without taking the lock.
    """

    def __init__(self, datadir, version=PG_VERSION_MAX):
        """Describe the cluster at `datadir`.

        :param datadir: The cluster's data directory. It is stored as an
            absolute path.
        :param version: The PostgreSQL version whose tools are used; it
            must be a version known to `get_pg_bin`. Defaults to
            `PG_VERSION_MAX`.
        """
        self.datadir = path.abspath(datadir)
        self.version = version
        # The lock file sits beside the data directory, not inside it:
        # <parent>/.<basename>.lock
        self.lock = LockFile(path.join(
            path.dirname(self.datadir),
            ".%s.lock" % path.basename(self.datadir)))

    def execute(self, *command, **options):
        """Execute a command with an environment suitable for this cluster.

        The environment is a copy of the ``env`` option when given,
        otherwise of `os.environ`. In that copy ``PATH`` gains this
        version's PostgreSQL ``bin`` directory, and ``PGDATA`` and
        ``PGHOST`` both point at the data directory. All other options are
        passed through to `subprocess.check_call`.

        :raises CalledProcessError: If the command exits with a non-zero
            status.
        """
        env = options.pop("env", environ).copy()
        env["PATH"] = path_with_pg_bin(env.get("PATH", ""), self.version)
        env["PGDATA"] = env["PGHOST"] = self.datadir
        check_call(command, env=env, **options)

    @property
    def exists(self):
        """Whether or not this cluster exists on disk.

        The cluster is considered to exist when the data directory
        contains a ``PG_VERSION`` file.
        """
        version_file = path.join(self.datadir, "PG_VERSION")
        return path.exists(version_file)

    @property
    def pidfile(self):
        """The (expected) pidfile for a running cluster.

        Does *not* guarantee that the pidfile exists.
        """
        return path.join(self.datadir, "postmaster.pid")

    @property
    def logfile(self):
        """The log file used (or will be used) by this cluster."""
        return path.join(self.datadir, "backend.log")

    @property
    def running(self):
        """Whether this cluster is running or not.

        This runs ``pg_ctl status``. A zero exit status means running. A
        non-zero exit status is interpreted according to `self.version`;
        when it is not one of the codes recognised as "not running", the
        captured stderr of ``pg_ctl`` is written to `sys.stderr` and the
        `CalledProcessError` is re-raised.
        """
        # stdout is discarded; stderr is kept in a temporary file so that
        # it can be replayed if the failure turns out to be unexpected.
        with open(devnull, "wb") as stdout, TemporaryFile() as stderr:
            try:
                self.execute("pg_ctl", "status", stdout=stdout, stderr=stderr)
            except CalledProcessError as error:
                # PostgreSQL has evolved to return different error codes in
                # later versions, so here we check for specific codes to avoid
                # masking errors from insufficient permissions or missing
                # executables, for example.
                version = Version(self.version)
                if version >= Version("9.4"):
                    if error.returncode == 3:
                        # 3 means that the data directory is present and
                        # accessible but that the server is not running.
                        return False
                    elif error.returncode == 4:
                        # 4 means that the data directory is not present or is
                        # not accessible. If it's missing, then the server is
                        # not running. If it is present but not accessible
                        # then crash because we can't know if the server is
                        # running or not.
                        if not self.exists:
                            return False
                elif version >= Version("9.2"):
                    if error.returncode == 3:
                        # 3 means that the data directory is present and
                        # accessible but that the server is not running OR
                        # that the data directory is not present.
                        return False
                else:
                    if error.returncode == 1:
                        # 1 means that the server is not running OR the data
                        # directory is not present OR that the data directory
                        # is not accessible.
                        return False

                # This is not a recognised error. First print out the cached
                # stderr then re-raise the CalledProcessError.
                try:
                    stderr.seek(0)  # Rewind.
                    stderr_data = stderr.read()
                    # The temporary file is binary, so on Python 3 the
                    # data must be decoded before it can be written to
                    # the text-mode sys.stderr.
                    if sys.version_info[0] >= 3:
                        stderr_data = stderr_data.decode(errors="replace")
                    sys.stderr.write(stderr_data)
                finally:
                    raise
            else:
                return True

    def create(self):
        """Create this cluster, if it does not exist."""
        with self.lock.exclusive:
            self._create()

    def _create(self):
        """Create the cluster if needed, without taking the lock."""
        if not self.exists:
            if not path.isdir(self.datadir):
                makedirs(self.datadir)
            # pg_ctl options:
            #  -s -- no informational messages.
            #  -o -- options passed on to initdb.
            # initdb options:
            #  -E utf8 -- default database encoding.
            #  -A trust -- default authentication method.
            self.execute("pg_ctl", "init", "-s", "-o", "-E utf8 -A trust")

    def start(self):
        """Start this cluster, if it's not already started."""
        with self.lock.exclusive:
            self._start()

    def _start(self):
        """Start the cluster if needed, without taking the lock.

        The cluster is created first if it does not exist yet.
        """
        if not self.running:
            self._create()
            # pg_ctl options:
            #  -l <file> -- log file.
            #  -s -- no informational messages.
            #  -w -- wait until startup is complete.
            # postgres options:
            #  -h <arg> -- host name; empty arg means Unix socket only.
            #  -F -- don't bother fsync'ing.
            #  -k -- socket directory.
            self.execute(
                "pg_ctl", "start", "-l", self.logfile, "-s", "-w",
                "-o", "-h '' -F -k %s" % shlex.quote(self.datadir))

    def connect(self, database="template1", autocommit=True):
        """Connect to this cluster.

        :param database: Name of the database to connect to.
        :param autocommit: Value assigned to the new connection's
            ``autocommit`` attribute.
        :return: A `psycopg2` connection. The data directory is given as
            the host because the server's socket lives there.
        """
        connection = psycopg2.connect(
            database=database, host=self.datadir)
        connection.autocommit = autocommit
        return connection

    def shell(self, database="template1"):
        """Run ``psql`` against the given database in this cluster.

        :raises CalledProcessError: If ``psql`` exits with a non-zero
            status.
        """
        self.execute("psql", "--quiet", "--", database)

    @property
    def databases(self):
        """The names of databases in this cluster, as a set."""
        with closing(self.connect("postgres")) as conn:
            with closing(conn.cursor()) as cur:
                cur.execute("SELECT datname FROM pg_catalog.pg_database")
                return {name for (name,) in cur.fetchall()}

    def _execute_database_statement(self, statement, name):
        """Run `statement` with `name` substituted as a quoted identifier.

        `statement` is a SQL template containing a single ``{}``
        placeholder. The database name is quoted as an identifier rather
        than pasted into the SQL text, so the database acted upon is named
        exactly `name`: the same name `connect` and `databases` use. Names
        with uppercase letters, hyphens or other special characters work,
        and a name cannot smuggle in additional SQL.
        """
        # Imported here so that the module's import block is unaffected;
        # psycopg2 itself is already a dependency of this module.
        from psycopg2 import sql

        query = sql.SQL(statement).format(sql.Identifier(name))
        with closing(self.connect()) as conn:
            with closing(conn.cursor()) as cur:
                cur.execute(query)

    def createdb(self, name):
        """Create the named database."""
        self._execute_database_statement("CREATE DATABASE {}", name)

    def dropdb(self, name):
        """Drop the named database."""
        self._execute_database_statement("DROP DATABASE {}", name)

    def stop(self):
        """Stop this cluster, if started."""
        with self.lock.exclusive:
            self._stop()

    def _stop(self):
        """Stop the cluster if it is running, without taking the lock."""
        if self.running:
            # pg_ctl options:
            #  -s -- no informational messages.
            #  -w -- wait for shutdown to complete.
            #  -m <mode> -- shutdown mode.
            self.execute("pg_ctl", "stop", "-s", "-w", "-m", "fast")

    def destroy(self):
        """Destroy this cluster, if it exists.

        The cluster will be stopped if it's started.
        """
        with self.lock.exclusive:
            self._destroy()

    def _destroy(self):
        """Stop and delete the cluster if it exists, without the lock.

        The whole data directory is removed.
        """
        if self.exists:
            self._stop()
            rmtree(self.datadir)