# Tests that run SQL and special commands through the pgcli executor fixture.
import re
from textwrap import dedent
import psycopg
import pytest
from unittest.mock import patch, MagicMock
from pgspecial.main import PGSpecial, NO_QUERY
from utils import run, dbtest, requires_json, requires_jsonb
from pgcli.main import PGCli, exception_formatter as main_exception_formatter
from pgcli.packages.parseutils.meta import FunctionMetadata
def function_meta_data(
    func_name,
    schema_name="public",
    arg_names=None,
    arg_types=None,
    arg_modes=None,
    return_type=None,
    is_aggregate=False,
    is_window=False,
    is_set_returning=False,
    is_extension=False,
    arg_defaults=None,
):
    """Build a ``FunctionMetadata`` for use in expected-value sets.

    Only ``func_name`` is required; every other field falls back to the
    default shown in the signature.  The values are forwarded positionally,
    with the schema name placed *before* the function name even though this
    helper accepts them in the opposite order.
    """
    positional_fields = (
        schema_name,
        func_name,
        arg_names,
        arg_types,
        arg_modes,
        return_type,
        is_aggregate,
        is_window,
        is_set_returning,
        is_extension,
        arg_defaults,
    )
    return FunctionMetadata(*positional_fields)
@dbtest
def test_conn(executor):
    """A row inserted through the executor is rendered back as a psql-style table."""
    run(executor, """create table test(a text)""")
    run(executor, """insert into test values('abc')""")
    # The header cell is padded to the column width (5), matching the border.
    expected = dedent(
        """\
        +-----+
        | a   |
        |-----|
        | abc |
        +-----+
        SELECT 1"""
    )
    assert run(executor, """select * from test""", join=True) == expected
@dbtest
def test_copy(executor):
    """An executor obtained via ``copy()`` can create, fill and query a table."""
    executor_copy = executor.copy()
    run(executor_copy, """create table test(a text)""")
    run(executor_copy, """insert into test values('abc')""")
    # The header cell is padded to the column width (5), matching the border.
    expected = dedent(
        """\
        +-----+
        | a   |
        |-----|
        | abc |
        +-----+
        SELECT 1"""
    )
    assert run(executor_copy, """select * from test""", join=True) == expected
@dbtest
def test_bools_are_treated_as_strings(executor):
    """A boolean column value is rendered as the text ``True``."""
    run(executor, """create table test(a boolean)""")
    run(executor, """insert into test values(True)""")
    # The header cell is padded to the column width (6), matching the border.
    expected = dedent(
        """\
        +------+
        | a    |
        |------|
        | True |
        +------+
        SELECT 1"""
    )
    assert run(executor, """select * from test""", join=True) == expected
@dbtest
def test_expanded_slash_G(executor, pgspecial):
    r"""Expanded output must be reset once a ``\G`` query has run."""
    run(executor, """create table test(a boolean)""")
    run(executor, """insert into test values(True)""")
    # Only the side effect on pgspecial matters here; the output is discarded.
    run(executor, r"""select * from test \G""", pgspecial=pgspecial)
    assert pgspecial.expanded_output is False
@dbtest
def test_schemata_table_views_and_columns_query(executor):
    """Metadata queries report the schemas, tables, views and columns created here."""
    setup_statements = (
        "create table a(x text, y text)",
        "create table b(z text)",
        "create view d as select 1 as e",
        "create schema schema1",
        "create table schema1.c (w text DEFAULT 'meow')",
        "create schema schema2",
    )
    for statement in setup_statements:
        run(executor, statement)

    # schemata
    # don't enforce all members of the schemas since they may include postgres
    # temporary schemas
    expected_schemata = {
        "public",
        "pg_catalog",
        "information_schema",
        "schema1",
        "schema2",
    }
    assert set(executor.schemata()) >= expected_schemata
    assert executor.search_path() == ["pg_catalog", "public"]

    # tables
    expected_tables = {
        ("public", "a"),
        ("public", "b"),
        ("schema1", "c"),
    }
    assert set(executor.tables()) >= expected_tables

    expected_table_columns = {
        ("public", "a", "x", "text", False, None),
        ("public", "a", "y", "text", False, None),
        ("public", "b", "z", "text", False, None),
        ("schema1", "c", "w", "text", True, "'meow'::text"),
    }
    assert set(executor.table_columns()) >= expected_table_columns

    # views
    assert set(executor.views()) >= {("public", "d")}

    expected_view_columns = {("public", "d", "e", "integer", False, None)}
    assert set(executor.view_columns()) >= expected_view_columns
@dbtest
def test_foreign_key_query(executor):
    """A cross-schema REFERENCES constraint shows up in ``foreignkeys()``."""
    for statement in (
        "create schema schema1",
        "create schema schema2",
        "create table schema1.parent(parentid int PRIMARY KEY)",
        "create table schema2.child(childid int PRIMARY KEY, motherid int REFERENCES schema1.parent)",
    ):
        run(executor, statement)

    expected_key = ("schema1", "parent", "parentid", "schema2", "child", "motherid")
    assert set(executor.foreignkeys()) >= {expected_key}
@dbtest
def test_functions_query(executor):
    """``functions()`` reports scalar, table-returning and set-returning functions."""
    create_func1 = (
        "create function func1() returns int\n"
        "language sql as $$select 1$$"
    )
    create_func2 = (
        "create function schema1.func2() returns int\n"
        "language sql as $$select 2$$"
    )
    create_func3 = (
        "create function func3()\n"
        "returns table(x int, y int) language sql\n"
        "as $$select 1, 2 from generate_series(1,5)$$;"
    )
    create_func4 = (
        "create function func4(x int) returns setof int language sql\n"
        "as $$select generate_series(1,5)$$;"
    )

    run(executor, create_func1)
    # func2 lives in schema1, so the schema has to exist first.
    run(executor, "create schema schema1")
    run(executor, create_func2)
    run(executor, create_func3)
    run(executor, create_func4)

    expected = {
        function_meta_data(func_name="func1", return_type="integer"),
        function_meta_data(
            func_name="func3",
            arg_names=["x", "y"],
            arg_types=["integer", "integer"],
            arg_modes=["t", "t"],
            return_type="record",
            is_set_returning=True,
        ),
        function_meta_data(
            schema_name="public",
            func_name="func4",
            arg_names=("x",),
            arg_types=("integer",),
            return_type="integer",
            is_set_returning=True,
        ),
        function_meta_data(
            schema_name="schema1", func_name="func2", return_type="integer"
        ),
    }
    found = set(executor.functions())
    assert found >= expected
@dbtest
def test_datatypes_query(executor):
    """A newly created composite type is the only entry from ``datatypes()``."""
    run(executor, "create type foo AS (a int, b text)")
    assert list(executor.datatypes()) == [("public", "foo")]
@dbtest
def test_database_list(executor):
    """The ``_test_db`` database is listed by ``databases()``."""
    assert "_test_db" in executor.databases()
@dbtest
def test_invalid_syntax(executor, exception_formatter):
    """With terse errors, a syntax error is reported without SQLSTATE details."""

    def terse_formatter(error):
        return main_exception_formatter(error, verbose_errors=False)

    output = run(executor, "invalid syntax!", exception_formatter=terse_formatter)
    message = output[0]
    assert 'syntax error at or near "invalid"' in message
    assert "SQLSTATE" not in message
@dbtest
def test_invalid_syntax_verbose(executor):
    """With verbose errors, a syntax error lists the individual diagnostic fields."""

    def verbose_formatter(error):
        return main_exception_formatter(error, verbose_errors=True)

    output = run(executor, "invalid syntax!", exception_formatter=verbose_formatter)
    # Regex over the expected field block; the line number varies, so it is
    # matched with \d+.
    expected_fields = r"""
Severity: ERROR
Severity \(non-localized\): ERROR
SQLSTATE code: 42601
Message: syntax error at or near "invalid"
Position: 1
File: scan\.l
Line: \d+
Routine: scanner_yyerror
    """.strip()
    assert re.search(expected_fields, output[0])
@dbtest
def test_invalid_column_name(executor, exception_formatter):
    """Selecting an unknown column reports that the column does not exist."""
    output = run(
        executor,
        "select invalid command",
        exception_formatter=exception_formatter,
    )
    first_entry = output[0]
    assert 'column "invalid" does not exist' in first_entry
@pytest.fixture(params=[True, False])
def expanded(request):
    """Parametrized fixture yielding ``True`` and then ``False``."""
    flag = request.param
    return flag
@dbtest
def test_unicode_support_in_output(executor, expanded):
    """A non-ASCII text value survives the round trip in both output modes."""
    run(executor, "create table unicodechars(t text)")
    run(executor, "insert into unicodechars (t) values ('é')")

    # See issue #24, this raises an exception without proper handling
    rendered = run(
        executor, "select * from unicodechars", join=True, expanded=expanded
    )
    assert "é" in rendered
@dbtest
def test_not_is_special(executor, pgspecial):
    """is_special is set to false for database queries."""
    query = "select 1"
    result = list(executor.run(query, pgspecial=pgspecial))
    # The last two fields of each result tuple are (success, is_special).
    success, is_special = result[0][5:]
    assert success is True
    assert is_special is False
@dbtest
def test_execute_from_file_no_arg(executor, pgspecial):
    r"""\i without a filename returns an error."""
    result = list(executor.run(r"\i", pgspecial=pgspecial))
    # Trailing fields of a result tuple: status, sql, success, is_special.
    status, _sql, success, is_special = result[0][3:]
    assert "missing required argument" in status
    assert success is False
    assert is_special is True
@dbtest
@patch("pgcli.main.os")
def test_execute_from_file_io_error(os, executor, pgspecial):
    r"""\i with an os_error returns an error."""
    # Inject an OSError.
    os.path.expanduser.side_effect = OSError("test")

    # Check the result.
    result = list(executor.run(r"\i test", pgspecial=pgspecial))
    # Trailing fields of a result tuple: status, sql, success, is_special.
    status, _sql, success, is_special = result[0][3:]
    assert status == "test"
    assert success is False
    assert is_special is True
@dbtest
def test_execute_from_commented_file_that_executes_another_file(
    executor, pgspecial, tmpdir
):
    r"""A ``\h`` that follows a comment line still produces the help listing."""
    # https://github.com/dbcli/pgcli/issues/1336
    sqlfile1 = tmpdir.join("test01.sql")
    sqlfile1.write("-- asdf \n\\h")
    sqlfile2 = tmpdir.join("test00.sql")
    sqlfile2.write("--An useless comment;\nselect now();\n-- another useless comment")

    rcfile = str(tmpdir.join("rcfile"))
    cli = PGCli(pgexecute=executor, pgclirc_file=rcfile)
    assert cli is not None

    statement = "--comment\n\\h"
    result = run(executor, statement, pgspecial=cli.pgspecial)
    assert result is not None
    # The previous check was the bare truthiness of ``str.find``, which is
    # truthy (-1) when the text is missing.  Check for real presence, in any
    # output line, since the first line need not be the one listing commands.
    assert any("ALTER TABLE" in line for line in result)
@dbtest
def test_execute_commented_first_line_and_special(executor, pgspecial, tmpdir):
    r"""Leading comments must not prevent a query or ``\h`` from running.

    Each group below shares one expectation, so the statements are driven
    through a loop; the statement is used as the assertion message so a
    failure identifies the offending input.
    """
    # just some base cases that should work also
    plain_queries = (
        "--comment\nselect now();",
        "/*comment*/\nselect now();",
    )
    for statement in plain_queries:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None, statement
        assert "now" in result[1], statement

    # https://github.com/dbcli/pgcli/issues/1362
    # Backslashes are escaped explicitly: "\h" is an invalid escape sequence
    # in a non-raw string literal.
    help_after_comments = (
        "--comment\n\\h",
        "--comment1\n--comment2\n\\h",
        "/*comment*/\n\\h;",
        "/*comment1\ncomment2*/\n\\h",
        "/*comment1\ncomment2*/\n/*comment 3\ncomment4*/\n\\h",
        " /*comment*/\n\\h;",
        "/*comment\ncomment line2*/\n\\h;",
        " /*comment\ncomment line2*/\n\\h;",
    )
    for statement in help_after_comments:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None, statement
        assert "ALTER" in result[1], statement
        assert "ABORT" in result[1], statement

    # TODO: we probably don't want to do this but sqlparse is not parsing things well
    # we really want it to find help but right now, sqlparse isn't dropping the
    # /*comment*/ style comments after command
    help_with_trailing_comment = (
        "\\h /*comment4 */",
        "/*comment1*/\n\\h\n/*comment4 */",
        "/*comment1\ncomment3\ncomment2*/\n\\h\n/*comment4\ncomment5\ncomment6*/",
    )
    for statement in help_with_trailing_comment:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None, statement
        assert "No help" in result[0], statement
@dbtest
def test_execute_commented_first_line_and_normal(executor, pgspecial, tmpdir):
    """Comments and embedded newlines must not break ordinary statements.

    Each group below shares one expectation, so the statements are driven
    through a loop; the statement is used as the assertion message so a
    failure identifies the offending input.
    """
    # https://github.com/dbcli/pgcli/issues/1403

    # just some base cases that should work also
    plain_queries = (
        "--comment\nselect now();",
        "/*comment*/\nselect now();",
    )
    for statement in plain_queries:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None, statement
        assert "now" in result[1], statement

    # this simulates the original error (1403) without having to add/drop tables
    # since it was just an error on reading input files and not the actual
    # command itself
    values_statements = (
        # test that the statement works
        "VALUES (1, 'one'), (2, 'two'), (3, 'three');",
        # test the statement with a \n in the middle
        "VALUES (1, 'one'),\n (2, 'two'), (3, 'three');",
        # test the statement with a newline in the middle
        "VALUES (1, 'one'),\n(2, 'two'), (3, 'three');",
        # now add a single comment line
        "--comment\nVALUES (1, 'one'), (2, 'two'), (3, 'three');",
        # single comment line, newline in the middle of the statement
        "--comment\nVALUES (1,'one'),\n(2, 'two'), (3, 'three');",
        # two comment lines
        "--comment\n--comment2\nVALUES (1,'one'), (2, 'two'), (3, 'three');",
        # two comment lines, trailing newline after the statement
        "--comment\n--comment2\nVALUES (1,'one'), (2, 'two'), (3, 'three');\n",
        # multiline comment + newline in middle of the statement
        "/*comment\ncomment2\ncomment3*/\nVALUES (1,'one'),\n(2, 'two'), (3, 'three');",
        # multiline comment + newline in middle of the statement
        # + comments after the statement
        "/*comment\ncomment2\ncomment3*/\nVALUES (1,'one'),\n(2, 'two'), (3, 'three');\n--comment4\n--comment5",
    )
    for statement in values_statements:
        result = run(executor, statement, pgspecial=pgspecial)
        assert result is not None, statement
        # Output line 5 is the one expected to hold the third VALUES row.
        assert "three" in result[5], statement
@dbtest
def test_multiple_queries_same_line(executor):
    """Two statements on one line each produce their own output block."""
    output_lines = run(executor, "select 'foo'; select 'bar'")
    assert len(output_lines) == 12  # 2 * (output+status) * 3 lines
    assert "foo" in output_lines[3]
    assert "bar" in output_lines[9]
@dbtest
def test_multiple_queries_with_special_command_same_line(executor, pgspecial):
    r"""A query and a ``\d`` special command can share one input line."""
    output_lines = run(executor, r"select 'foo'; \d", pgspecial=pgspecial)
    # 11 lines in total: the select block followed by the \d listing.
    assert len(output_lines) == 11
    assert "foo" in output_lines[3]
    # This is a lame check. :(
    assert "Schema" in output_lines[7]
@dbtest
def test_multiple_queries_same_line_syntaxerror(executor, exception_formatter):
    """A valid query's output is kept when a later statement on the line fails."""
    sql = "select 'fooé'; invalid syntax é"
    output_lines = run(executor, sql, exception_formatter=exception_formatter)
    assert "fooé" in output_lines[3]
    assert 'syntax error at or near "invalid"' in output_lines[-1]
@pytest.fixture
def pgspecial():
    """Provide the ``pgspecial`` attribute of a freshly constructed ``PGCli``."""
    cli = PGCli()
    return cli.pgspecial
@dbtest
def test_special_command_help(executor, pgspecial):
    r"""``\?`` prints a table whose header names Command and Description."""
    output_lines = run(executor, "\\?", pgspecial=pgspecial)
    # The second output line is split into cells on the pipe separator.
    header_cells = output_lines[1].split("|")
    assert "Command" in header_cells[1]
    assert "Description" in header_cells[2]
@dbtest
def test_bytea_field_support_in_output(executor):
    """A bytea value is rendered in ``\\x`` hex notation."""
    run(executor, "create table binarydata(c bytea)")
    run(executor, "insert into binarydata (c) values (decode('DEADBEEF', 'hex'))")

    rendered = run(executor, "select * from binarydata", join=True)
    assert "\\xdeadbeef" in rendered
@dbtest
def test_unicode_support_in_unknown_type(executor):
    """A non-ASCII string literal selected directly appears in the output."""
    rendered = run(executor, "SELECT '日本語' AS japanese;", join=True)
    assert "日本語" in rendered
@dbtest
def test_unicode_support_in_enum_type(executor):
    """A non-ASCII enum label stored in a table appears in the output."""
    for statement in (
        "CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy', '日本語')",
        "CREATE TABLE person (name TEXT, current_mood mood)",
        "INSERT INTO person VALUES ('Moe', '日本語')",
    ):
        run(executor, statement)

    rendered = run(executor, "SELECT * FROM person", join=True)
    assert "日本語" in rendered
@requires_json
def test_json_renders_without_u_prefix(executor, expanded):
    """A json value is rendered exactly as it was inserted."""
    run(executor, "create table jsontest(d json)")
    run(executor, """insert into jsontest (d) values ('{"name": "Éowyn"}')""")

    rendered = run(
        executor, "SELECT d FROM jsontest LIMIT 1", join=True, expanded=expanded
    )
    assert '{"name": "Éowyn"}' in rendered
@requires_jsonb
def test_jsonb_renders_without_u_prefix(executor, expanded):
    """A jsonb value is rendered exactly as it was inserted."""
    run(executor, "create table jsonbtest(d jsonb)")
    run(executor, """insert into jsonbtest (d) values ('{"name": "Éowyn"}')""")

    rendered = run(
        executor, "SELECT d FROM jsonbtest LIMIT 1", join=True, expanded=expanded
    )
    assert '{"name": "Éowyn"}' in rendered
@dbtest
def test_date_time_types(executor):
    """Date/time values, including BC and extreme ones, are rendered as text."""
    run(executor, "SET TIME ZONE UTC")

    # (query, expected text of output line 3 — the data row of the table)
    cases = (
        ("SELECT (CAST('00:00:00' AS time))", "| 00:00:00 |"),
        ("SELECT (CAST('00:00:00+14:59' AS timetz))", "| 00:00:00+14:59 |"),
        ("SELECT (CAST('4713-01-01 BC' AS date))", "| 4713-01-01 BC |"),
        (
            "SELECT (CAST('4713-01-01 00:00:00 BC' AS timestamp))",
            "| 4713-01-01 00:00:00 BC |",
        ),
        (
            "SELECT (CAST('4713-01-01 00:00:00+00 BC' AS timestamptz))",
            "| 4713-01-01 00:00:00+00 BC |",
        ),
        (
            "SELECT (CAST('-123456789 days 12:23:56' AS interval))",
            "| -123456789 days, 12:23:56 |",
        ),
    )
    for query, expected_row in cases:
        rendered = run(executor, query, join=True)
        data_row = rendered.split("\n")[3]
        assert data_row == expected_row
@dbtest
@pytest.mark.parametrize("value", ["10000000", "10000000.0", "10000000000000"])
def test_large_numbers_render_directly(executor, value):
    """Large numeric values appear in the output exactly as they were written."""
    run(executor, "create table numbertest(a numeric)")
    run(executor, f"insert into numbertest (a) values ({value})")

    rendered = run(executor, "select * from numbertest", join=True)
    assert value in rendered
@dbtest
@pytest.mark.parametrize("command", ["di", "dv", "ds", "df", "dT"])
@pytest.mark.parametrize("verbose", ["", "+"])
@pytest.mark.parametrize("pattern", ["", "x", "*.*", "x.y", "x.*", "*.y"])
def test_describe_special(executor, command, verbose, pattern, pgspecial):
    r"""Every ``\d*`` command / verbosity / pattern combination runs cleanly."""
    # We don't have any tests for the output of any of the special commands,
    # but we can at least make sure they run without error
    # An f-string names its inputs explicitly, unlike ``.format(**locals())``.
    sql = f"\\{command}{verbose} {pattern}"
    list(executor.run(sql, pgspecial=pgspecial))
@dbtest
@pytest.mark.parametrize("sql", ["invalid sql", "SELECT 1; select error;"])
def test_raises_with_no_formatter(executor, sql):
    """Without an exception formatter, a failing statement raises."""
    with pytest.raises(psycopg.ProgrammingError):
        # Consuming the results is what drives execution.
        for _ in executor.run(sql):
            pass
@dbtest
def test_on_error_resume(executor, exception_formatter):
    """With ``on_error_resume=True`` all three statements yield a result."""
    sql = "select 1; error; select 1;"
    outcomes = list(
        executor.run(
            sql,
            on_error_resume=True,
            exception_formatter=exception_formatter,
        )
    )
    assert len(outcomes) == 3
@dbtest
def test_on_error_stop(executor, exception_formatter):
    """With ``on_error_resume=False`` execution stops after the failing statement."""
    sql = "select 1; error; select 1;"
    outcomes = list(
        executor.run(
            sql,
            on_error_resume=False,
            exception_formatter=exception_formatter,
        )
    )
    assert len(outcomes) == 2
# @dbtest
# def test_unicode_notices(executor):
# sql = "DO language plpgsql $$ BEGIN RAISE NOTICE '有人更改'; END $$;"
# result = list(executor.run(sql))
# assert result[0][0] == u'NOTICE: 有人更改\n'
@dbtest
def test_nonexistent_function_definition(executor):
    """Looking up a definition for an unknown name raises ``RuntimeError``."""
    # NOTE(review): despite the test name this calls view_definition, not
    # function_definition — looks like a copy/paste slip; confirm that
    # function_definition also raises RuntimeError before switching.
    with pytest.raises(RuntimeError):
        executor.view_definition("there_is_no_such_function")
@dbtest
def test_function_definition(executor):
    """Fetching the definition of an existing function completes without error."""
    run(
        executor,
        """
CREATE OR REPLACE FUNCTION public.the_number_three()
RETURNS int
LANGUAGE sql
AS $function$
select 3;
$function$
""",
    )
    # Smoke test only: the returned definition text is not inspected.
    executor.function_definition("the_number_three")
@dbtest
def test_function_notice_order(executor):
    """Notices raised by a function are reported in the order they were raised."""
    run(
        executor,
        """
CREATE OR REPLACE FUNCTION demo_order() RETURNS VOID AS
$$
BEGIN
RAISE NOTICE 'first';
RAISE NOTICE 'second';
RAISE NOTICE 'third';
RAISE NOTICE 'fourth';
RAISE NOTICE 'fifth';
RAISE NOTICE 'sixth';
END;
$$
LANGUAGE plpgsql;
""",
    )

    executor.function_definition("demo_order")

    result = run(executor, "select demo_order()")
    assert "first\nsecond\nthird\nfourth\nfifth\nsixth" in result[0]
    assert "+------------+" in result[1]
    assert "| demo_order |" in result[2]
    assert "|------------|" in result[3]
    # The void result renders as an empty cell padded to the 12-wide column.
    assert "|            |" in result[4]
    assert "+------------+" in result[5]
    assert "SELECT 1" in result[6]
@dbtest
def test_view_definition(executor):
    """Definitions can be fetched for both plain and materialized views."""
    for statement in (
        "create table tbl1 (a text, b numeric)",
        "create view vw1 AS SELECT * FROM tbl1",
        "create materialized view mvw1 AS SELECT * FROM tbl1",
    ):
        run(executor, statement)

    plain_definition = executor.view_definition("vw1")
    assert 'VIEW "public"."vw1" AS' in plain_definition
    assert "FROM tbl1" in plain_definition

    materialized_definition = executor.view_definition("mvw1")
    assert "MATERIALIZED VIEW" in materialized_definition
@dbtest
def test_nonexistent_view_definition(executor):
    """Asking for the definition of a view that does not exist raises."""
    # Neither name is created in this test, so both lookups must fail.
    for missing_view in ("there_is_no_such_view", "mvw1"):
        with pytest.raises(RuntimeError):
            executor.view_definition(missing_view)
@dbtest
def test_short_host(executor):
    """``short_host`` for plain names, FQDNs, host lists and IP addresses."""
    # (value patched onto executor.host, expected executor.short_host)
    cases = (
        ("localhost", "localhost"),
        ("localhost.example.org", "localhost"),
        ("localhost1.example.org,localhost2.example.org", "localhost1"),
        ("ec2-11-222-333-444.compute-1.amazonaws.com", "ec2-11-222-333-444"),
        ("1.2.3.4", "1.2.3.4"),
    )
    for host, expected_short in cases:
        with patch.object(executor, "host", host):
            assert executor.short_host == expected_short
class VirtualCursor:
    """Mock a cursor to virtual database like pgbouncer.

    A fresh instance reports no protocol error.  Calling ``execute`` never
    runs anything; it only flags a protocol error and stores the message
    ``"Command not supported"``.
    """

    def __init__(self) -> None:
        self.protocol_error = False
        self.protocol_message = ""
        self.description = None
        self.status = None
        self.statusmessage = "Error"

    def execute(self, *args, **kwargs) -> None:
        """Ignore all arguments and record a protocol error on the cursor."""
        self.protocol_error = True
        self.protocol_message = "Command not supported"
@dbtest
def test_exit_without_active_connection(executor):
    r"""``\q`` must still work when the connection can no longer give a cursor."""
    quit_handler = MagicMock()
    special = PGSpecial()
    special.register(
        quit_handler,
        "\\q",
        "\\q",
        "Quit pgcli.",
        arg_type=NO_QUERY,
        case_sensitive=True,
        aliases=(":q",),
    )

    broken_cursor = patch.object(
        executor.conn, "cursor", side_effect=psycopg.InterfaceError("I'm broken!")
    )
    with broken_cursor:
        # we should be able to quit the app, even without active connection
        run(executor, "\\q", pgspecial=special)
        quit_handler.assert_called_once()

        # an exception should be raised when running a query without active connection
        with pytest.raises(psycopg.InterfaceError):
            run(executor, "select 1", pgspecial=special)
@dbtest
def test_virtual_database(executor):
    """A cursor that only flags a protocol error surfaces its message as output."""
    fake_connection = MagicMock()
    fake_connection.cursor.return_value = VirtualCursor()

    with patch.object(executor, "conn", fake_connection):
        output = run(executor, "select 1")

    assert "Command not supported" in output