1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
from contextlib import contextmanager
import pytest
import psycopg2
import psycopg2.extras
from os import getenv
# TODO: should this be somehow be divined from environment?
POSTGRES_USER = getenv('PGUSER', 'postgres')
POSTGRES_HOST = getenv('PGHOST', 'localhost')
POSTGRES_PORT = getenv('PGPORT', 5432)
POSTGRES_PASSWORD = getenv('PGPASSWORD', '')
TEST_DB_NAME = '_test_db'
FOREIGN_TEST_DB_NAME = '_foreign_test_db'
def db_connection(dbname=None):
conn = psycopg2.connect(user=POSTGRES_USER,
host=POSTGRES_HOST,
password=POSTGRES_PASSWORD,
port=POSTGRES_PORT,
database=dbname)
conn.autocommit = True
return conn
try:
conn = db_connection(dbname=None)
CAN_CONNECT_TO_DB = True
SERVER_VERSION = conn.server_version
except:
CAN_CONNECT_TO_DB = False
SERVER_VERSION = 0
dbtest = pytest.mark.skipif(
not CAN_CONNECT_TO_DB,
reason="Need a postgres instance at localhost accessible by user "
"'%s'" % POSTGRES_USER)
def create_db(dbname=TEST_DB_NAME):
with db_connection(dbname=None).cursor() as cur:
try:
cur.execute('''CREATE DATABASE %s''' % dbname)
except:
pass
def setup_db(conn):
with conn.cursor() as cur:
# schemas
cur.execute('create schema schema1')
cur.execute('create schema schema2')
cur.execute('create schema schema3')
# tables
cur.execute('create table tbl1(id1 integer, txt1 text, CONSTRAINT id_text PRIMARY KEY(id1, txt1))')
cur.execute('create table tbl2(id2 serial, txt2 text)')
cur.execute('create table schema2.tbl2(id2 serial, txt2 text)')
cur.execute('create table schema1.tbl2(id2 serial, txt2 text)')
cur.execute('create table schema1.s1_tbl1(id1 integer, txt1 text)')
cur.execute('create table tbl3(c3 circle, exclude using gist (c3 with &&))')
cur.execute('create table "Inh1"(value1 integer) inherits (tbl1)')
cur.execute('create table inh2(value2 integer) inherits (tbl1, tbl2)')
cur.execute('''
create table schema3.test_generated_default
(id int GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, some_stuff text)''')
# views
cur.execute('create view vw1 as select * from tbl1')
cur.execute('''create view schema1.s1_vw1 as
select * from schema1.s1_tbl1''')
# materialized views
cur.execute('create materialized view mvw1 as select * from tbl1')
cur.execute('''create materialized view schema1.s1_mvw1 as
select * from schema1.s1_tbl1''')
# datatype
cur.execute('create type foo AS (a int, b text)')
# functions
cur.execute('''create function func1() returns int language sql as
$$select 1$$''')
cur.execute('''create function schema1.s1_func1() returns int language
sql as $$select 2$$''')
# domains
cur.execute("create domain gender_t char(1)"
" check (value in ('F', 'M'))")
cur.execute("create domain schema1.smallint_t smallint")
cur.execute("create domain schema1.bigint_t bigint")
cur.execute("comment on domain schema1.bigint_t is"
" 'a really large integer'")
def teardown_db(conn):
with conn.cursor() as cur:
cur.execute('''
DROP SCHEMA public CASCADE;
CREATE SCHEMA public;
DROP SCHEMA IF EXISTS schema1 CASCADE;
DROP SCHEMA IF EXISTS schema2 CASCADE;
DROP SCHEMA IF EXISTS schema3 CASCADE;''')
def setup_foreign(conn):
foreign_conn = db_connection(FOREIGN_TEST_DB_NAME)
with foreign_conn.cursor() as foreign_cur:
foreign_cur.execute('create table if not exists foreign_foo (a int, b text)')
with conn.cursor() as cur:
# foreign database wrapper
cur.execute("create extension if not exists postgres_fdw")
cur.execute("create server if not exists foreign_db_server "
"foreign data wrapper postgres_fdw "
"options (host '127.0.0.1', dbname %s )",
(FOREIGN_TEST_DB_NAME, ))
cur.execute("create user mapping if not exists for current_user "
"server foreign_db_server "
"options (user 'postgres') ")
cur.execute("create foreign table if not exists foreign_foo (a int, b text) "
"server foreign_db_server "
"options (schema_name 'public', table_name 'foreign_foo') ")
def teardown_foreign(conn):
with conn.cursor() as cur:
cur.execute("drop server if exists foreign_db_server cascade")
cur.execute("drop extension if exists postgres_fdw")
cur.execute("drop database if exists %s" % FOREIGN_TEST_DB_NAME)
@contextmanager
def foreign_db_environ():
conn2 = db_connection(dbname=TEST_DB_NAME)
create_db(FOREIGN_TEST_DB_NAME)
setup_foreign(conn2)
yield
teardown_foreign(conn2)
|