File: dbutils.py

package info (click to toggle)
python-pgspecial 1.11.10%2Bdfsg1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 352 kB
  • sloc: python: 3,143; makefile: 3
file content (144 lines) | stat: -rw-r--r-- 5,156 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from contextlib import contextmanager

import pytest
import psycopg2
import psycopg2.extras
from os import getenv


# TODO: should this be somehow be divined from environment?
POSTGRES_USER = getenv('PGUSER', 'postgres')
POSTGRES_HOST = getenv('PGHOST', 'localhost')
POSTGRES_PORT = getenv('PGPORT', 5432)
POSTGRES_PASSWORD = getenv('PGPASSWORD', '')

TEST_DB_NAME = '_test_db'
FOREIGN_TEST_DB_NAME = '_foreign_test_db'

def db_connection(dbname=None):
    conn = psycopg2.connect(user=POSTGRES_USER,
                            host=POSTGRES_HOST,
                            password=POSTGRES_PASSWORD,
                            port=POSTGRES_PORT,
                            database=dbname)
    conn.autocommit = True
    return conn

try:
    conn = db_connection(dbname=None)
    CAN_CONNECT_TO_DB = True
    SERVER_VERSION = conn.server_version
except:
    CAN_CONNECT_TO_DB = False
    SERVER_VERSION = 0


dbtest = pytest.mark.skipif(
    not CAN_CONNECT_TO_DB,
    reason="Need a postgres instance at localhost accessible by user "
           "'%s'" % POSTGRES_USER)


def create_db(dbname=TEST_DB_NAME):
    with db_connection(dbname=None).cursor() as cur:
        try:
            cur.execute('''CREATE DATABASE %s''' % dbname)
        except:
            pass


def setup_db(conn):
    with conn.cursor() as cur:
        # schemas
        cur.execute('create schema schema1')
        cur.execute('create schema schema2')
        cur.execute('create schema schema3')

        # tables
        cur.execute('create table tbl1(id1 integer, txt1 text, CONSTRAINT id_text PRIMARY KEY(id1, txt1))')
        cur.execute('create table tbl2(id2 serial, txt2 text)')
        cur.execute('create table schema2.tbl2(id2 serial, txt2 text)')
        cur.execute('create table schema1.tbl2(id2 serial, txt2 text)')
        cur.execute('create table schema1.s1_tbl1(id1 integer, txt1 text)')
        cur.execute('create table tbl3(c3 circle, exclude using gist (c3 with &&))')
        cur.execute('create table "Inh1"(value1 integer) inherits (tbl1)')
        cur.execute('create table inh2(value2 integer) inherits (tbl1, tbl2)')
        cur.execute('''
        create table schema3.test_generated_default 
        (id int GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, some_stuff text)''')

        # views
        cur.execute('create view vw1 as select * from tbl1')
        cur.execute('''create view schema1.s1_vw1 as
                       select * from schema1.s1_tbl1''')

        # materialized views
        cur.execute('create materialized view mvw1 as select * from tbl1')
        cur.execute('''create materialized view schema1.s1_mvw1 as
                       select * from schema1.s1_tbl1''')

        # datatype
        cur.execute('create type foo AS (a int, b text)')

        # functions
        cur.execute('''create function func1() returns int language sql as
                       $$select 1$$''')
        cur.execute('''create function schema1.s1_func1() returns int language
                       sql as $$select 2$$''')

        # domains
        cur.execute("create domain gender_t char(1)"
                    " check (value in ('F', 'M'))")
        cur.execute("create domain schema1.smallint_t smallint")
        cur.execute("create domain schema1.bigint_t bigint")
        cur.execute("comment on domain schema1.bigint_t is"
                    " 'a really large integer'")


def teardown_db(conn):
    with conn.cursor() as cur:
        cur.execute('''
            DROP SCHEMA public CASCADE;
            CREATE SCHEMA public;
            DROP SCHEMA IF EXISTS schema1 CASCADE;
            DROP SCHEMA IF EXISTS schema2 CASCADE;
            DROP SCHEMA IF EXISTS schema3 CASCADE;''')


def setup_foreign(conn):

    foreign_conn = db_connection(FOREIGN_TEST_DB_NAME)
    with foreign_conn.cursor() as foreign_cur:
        foreign_cur.execute('create table if not exists foreign_foo (a int, b text)')

    with conn.cursor() as cur:

        # foreign database wrapper
        cur.execute("create extension if not exists postgres_fdw")
        cur.execute("create server if not exists foreign_db_server "
                    "foreign data wrapper postgres_fdw "
                    "options (host '127.0.0.1', dbname %s )", 
                    (FOREIGN_TEST_DB_NAME, ))
        cur.execute("create user mapping if not exists for current_user "
                    "server foreign_db_server "
                    "options (user 'postgres') ")
        cur.execute("create foreign table if not exists foreign_foo (a int, b text) "
                    "server foreign_db_server "
                    "options (schema_name 'public', table_name 'foreign_foo') ")
       
def teardown_foreign(conn):

    with conn.cursor() as cur:

        cur.execute("drop server if exists foreign_db_server cascade")
        cur.execute("drop extension if exists postgres_fdw")
        cur.execute("drop database if exists %s" % FOREIGN_TEST_DB_NAME) 


@contextmanager
def foreign_db_environ():
    conn2 = db_connection(dbname=TEST_DB_NAME)
    create_db(FOREIGN_TEST_DB_NAME)
    setup_foreign(conn2)
    yield
    teardown_foreign(conn2)