1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
import pytest
import psycopg
from pgcli.main import format_output, OutputSettings
from os import getenv
POSTGRES_USER = getenv("PGUSER", "postgres")
POSTGRES_HOST = getenv("PGHOST", "localhost")
POSTGRES_PORT = getenv("PGPORT", 5432)
POSTGRES_PASSWORD = getenv("PGPASSWORD", "postgres")
def db_connection(dbname=None):
conn = psycopg.connect(
user=POSTGRES_USER,
host=POSTGRES_HOST,
password=POSTGRES_PASSWORD,
port=POSTGRES_PORT,
dbname=dbname,
)
conn.autocommit = True
return conn
try:
conn = db_connection()
CAN_CONNECT_TO_DB = True
SERVER_VERSION = conn.info.parameter_status("server_version")
JSON_AVAILABLE = True
JSONB_AVAILABLE = True
except Exception:
CAN_CONNECT_TO_DB = JSON_AVAILABLE = JSONB_AVAILABLE = False
SERVER_VERSION = 0
dbtest = pytest.mark.skipif(
not CAN_CONNECT_TO_DB,
reason="Need a postgres instance at localhost accessible by user 'postgres'",
)
requires_json = pytest.mark.skipif(not JSON_AVAILABLE, reason="Postgres server unavailable or json type not defined")
requires_jsonb = pytest.mark.skipif(not JSONB_AVAILABLE, reason="Postgres server unavailable or jsonb type not defined")
def create_db(dbname):
with db_connection().cursor() as cur:
try:
cur.execute("""CREATE DATABASE _test_db""")
except Exception:
pass
def drop_tables(conn):
with conn.cursor() as cur:
cur.execute(
"""
DROP SCHEMA public CASCADE;
CREATE SCHEMA public;
DROP SCHEMA IF EXISTS schema1 CASCADE;
DROP SCHEMA IF EXISTS schema2 CASCADE"""
)
def run(executor, sql, join=False, expanded=False, pgspecial=None, exception_formatter=None):
"Return string output for the sql to be run"
results = executor.run(sql, pgspecial, exception_formatter)
formatted = []
settings = OutputSettings(table_format="psql", dcmlfmt="d", floatfmt="g", expanded=expanded)
for title, rows, headers, status, sql, success, is_special in results:
formatted.extend(format_output(title, rows, headers, status, settings))
if join:
formatted = "\n".join(formatted)
return formatted
def completions_to_set(completions):
return {(completion.display_text, completion.display_meta_text) for completion in completions}
|