File: test_catalog.py

package info (click to toggle)
harlequin-postgres 1.2.0-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 324 kB
  • sloc: python: 1,651; makefile: 24
file content (92 lines) | stat: -rw-r--r-- 3,464 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import pytest
from harlequin.catalog import InteractiveCatalogItem
from harlequin_postgres.adapter import HarlequinPostgresConnection
from harlequin_postgres.catalog import (
    ColumnCatalogItem,
    DatabaseCatalogItem,
    RelationCatalogItem,
    SchemaCatalogItem,
    TableCatalogItem,
    ViewCatalogItem,
)


@pytest.fixture
def connection_with_objects(
    connection: HarlequinPostgresConnection,
) -> HarlequinPostgresConnection:
    connection.execute("create schema one")
    connection.execute("create table one.foo as select 1 as a, '2' as b")
    connection.execute("create table one.bar as select 1 as a, '2' as b")
    connection.execute("create table one.baz as select 1 as a, '2' as b")
    connection.execute("create schema two")
    connection.execute("create view two.qux as select * from one.foo")
    connection.execute("create schema three")
    # the original connection fixture will clean this up.
    return connection


def test_catalog(connection_with_objects: HarlequinPostgresConnection) -> None:
    conn = connection_with_objects

    catalog = conn.get_catalog()

    # at least two databases, postgres and test
    assert len(catalog.items) >= 2

    [test_db_item] = filter(lambda item: item.label == "test", catalog.items)
    assert isinstance(test_db_item, InteractiveCatalogItem)
    assert isinstance(test_db_item, DatabaseCatalogItem)
    assert not test_db_item.children
    assert not test_db_item.loaded

    schema_items = test_db_item.fetch_children()
    assert all(isinstance(item, SchemaCatalogItem) for item in schema_items)

    [schema_one_item] = filter(lambda item: item.label == "one", schema_items)
    assert isinstance(schema_one_item, SchemaCatalogItem)
    assert not schema_one_item.children
    assert not schema_one_item.loaded

    table_items = schema_one_item.fetch_children()
    assert all(isinstance(item, RelationCatalogItem) for item in table_items)

    [foo_item] = filter(lambda item: item.label == "foo", table_items)
    assert isinstance(foo_item, TableCatalogItem)
    assert not foo_item.children
    assert not foo_item.loaded

    foo_column_items = foo_item.fetch_children()
    assert all(isinstance(item, ColumnCatalogItem) for item in foo_column_items)

    [schema_two_item] = filter(lambda item: item.label == "two", schema_items)
    assert isinstance(schema_two_item, SchemaCatalogItem)
    assert not schema_two_item.children
    assert not schema_two_item.loaded

    view_items = schema_two_item.fetch_children()
    assert all(isinstance(item, ViewCatalogItem) for item in view_items)

    [qux_item] = filter(lambda item: item.label == "qux", view_items)
    assert isinstance(qux_item, ViewCatalogItem)
    assert not qux_item.children
    assert not qux_item.loaded

    qux_column_items = qux_item.fetch_children()
    assert all(isinstance(item, ColumnCatalogItem) for item in qux_column_items)

    assert [item.label for item in foo_column_items] == [
        item.label for item in qux_column_items
    ]

    # ensure calling fetch_children on cols doesn't raise
    children_items = foo_column_items[0].fetch_children()
    assert not children_items

    [schema_three_item] = filter(lambda item: item.label == "three", schema_items)
    assert isinstance(schema_two_item, SchemaCatalogItem)
    assert not schema_two_item.children
    assert not schema_two_item.loaded

    three_children = schema_three_item.fetch_children()
    assert not three_children