1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
|
# -*- coding: utf-8 -*-
# :Project: pglast — Test the __main__.py module
# :Created: lun 07 ago 2017 12:50:37 CEST
# :Author: Lele Gaifax <lele@metapensiero.it>
# :License: GNU General Public License version 3 or later
# :Copyright: © 2017, 2018, 2019, 2021, 2022, 2024 Lele Gaifax
#
from contextlib import _RedirectStream, redirect_stdout
from io import StringIO
from os import unlink
from tempfile import NamedTemporaryFile
import pytest
from pglast.__main__ import main
class redirect_stdin(_RedirectStream):
_stream = "stdin"
class UnclosableStream(StringIO):
def close(self):
pass
def test_cli_workhorse():
with StringIO() as output:
with redirect_stdout(output):
with pytest.raises(SystemExit) as status:
main(['-h'])
assert status.value.args[0] == 0
assert 'usage:' in output.getvalue()
with StringIO("Select foo,bar Fron sometable") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
with pytest.raises(SystemExit) as status:
main([])
assert str(status.value.args[0]) == \
'syntax error at or near "sometable", at index 20'
with StringIO("Select foo,bar From sometable Where foo<>0") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main([])
assert output.getvalue() == """\
SELECT foo
, bar
FROM sometable
WHERE foo <> 0
"""
output = NamedTemporaryFile(delete=False)
try:
main(['-S', "select 1", '-', output.name])
with open(output.name) as f:
assert f.read() == "SELECT 1\n"
finally:
try:
unlink(output.name)
except PermissionError:
# This happens under Windows, for some reason the file is still
# in use...
pass
with StringIO("Select 1") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--parse-tree'])
assert "'val': {'@': 'Integer', 'ival': 1}" in output.getvalue()
with StringIO("select a,b,c from st where a='foo'") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--normalize'])
assert output.getvalue() == "SELECT a, b, c FROM st WHERE a = 'foo'\n"
with StringIO("""\
CREATE FUNCTION add (a integer, b integer) RETURNS integer AS $$
BEGIN RETURN a + b; END;
$$ LANGUAGE plpgsql""") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--plpgsql', '--parse-tree'])
assert '"PLpgSQL_function":' in output.getvalue()
with StringIO("Select 1") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main([])
assert output.getvalue() == "SELECT 1\n"
with StringIO("Select 1;") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main([])
assert output.getvalue() == "SELECT 1\n"
with StringIO("Select 1; Select 2") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main([])
assert output.getvalue() == "SELECT 1;\n\nSELECT 2\n"
with StringIO("Select 1; Select 2") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--semicolon-after-last-statement'])
assert output.getvalue() == "SELECT 1;\n\nSELECT 2;\n"
with StringIO("Select 'abcdef'") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--split-string-literals', '0'])
assert output.getvalue() == "SELECT 'abcdef'\n"
with StringIO("Select 'abcdef'") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--split-string-literals', '3'])
assert output.getvalue() == """\
SELECT 'abc'
'def'
"""
with StringIO("Select /* one */ 1") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--preserve-comments'])
assert output.getvalue() == "SELECT /* one */ 1\n"
with StringIO("CREATE TABLESPACE tbl LOCATION 'dir' -- comment") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--preserve-comments'])
assert output.getvalue() == "CREATE TABLESPACE tbl LOCATION 'dir' -- comment\n\n"
with StringIO("""\
select collation for ('abc'),
normalize('abc', nfc),
overlay('Txxxxas' placing 'hom' FROM 2 for 4),
position('bc' in 'abcd'),
trim(both ' abc '),
trim(both '*' from '***abc***'),
trim(leading '*' from '***abc***'),
trim(trailing '*' from '***abc***'),
xmlexists('//town[text() = ''Toronto'']'
passing '<towns><town>Toronto</town><town>Ottawa</town></towns>');
""") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--special-functions', '--compact-lists-margin', '100'])
assert output.getvalue() == """\
SELECT COLLATION FOR ('abc')
, normalize('abc', NFC)
, overlay('Txxxxas' PLACING 'hom' FROM 2 FOR 4)
, position('bc' IN 'abcd')
, trim(BOTH FROM ' abc ')
, trim(BOTH '*' FROM '***abc***')
, trim(LEADING '*' FROM '***abc***')
, trim(TRAILING '*' FROM '***abc***')
, xmlexists('//town[text() = ''Toronto'']'\
PASSING BY REF '<towns><town>Toronto</town><town>Ottawa</town></towns>')
"""
with StringIO("select extract(hour from t1.modtime), count(*) from t1") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--special-functions', '--compact-lists-margin', '120'])
assert output.getvalue() == ("SELECT EXTRACT(HOUR FROM t1.modtime), count(*)\n"
"FROM t1\n")
with StringIO("select extract(hour from t1.modtime), count(*) from t1") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--special-functions', '--compact-lists-margin', '40'])
assert output.getvalue() == ("SELECT EXTRACT(HOUR FROM t1.modtime)\n"
" , count(*)\n"
"FROM t1\n")
with StringIO("select substring('123' from 1 for 2)") as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--special-functions', '--compact-lists-margin', '40'])
assert output.getvalue() == "SELECT substring('123' FROM 1 FOR 2)\n"
in_stmt = """\
select substring('123',2,3),
regexp_split_to_array('x,x,x', ','),
btrim('xxx'), trim('xxx'),
POSITION('hour' in trim(substring('xyz hour ',1,6)))
"""
with StringIO(in_stmt) as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--compact-lists-margin', '100'])
assert output.getvalue() == """\
SELECT substring('123', 2, 3)
, regexp_split_to_array('x,x,x', ',')
, btrim('xxx')
, pg_catalog.btrim('xxx')
, pg_catalog.position(pg_catalog.btrim(substring('xyz hour ', 1, 6)), 'hour')
"""
with StringIO(in_stmt) as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--remove-pg_catalog-from-functions', '--compact-lists-margin', '100'])
assert output.getvalue() == """\
SELECT substring('123', 2, 3)
, regexp_split_to_array('x,x,x', ',')
, btrim('xxx')
, btrim('xxx')
, pg_catalog.position(btrim(substring('xyz hour ', 1, 6)), 'hour')
"""
with StringIO('SELECT NULLIF(1, 0)') as input:
with UnclosableStream() as output:
with redirect_stdin(input), redirect_stdout(output):
main(['--remove-pg_catalog-from-functions'])
assert output.getvalue() == "SELECT NULLIF(1, 0)\n"
|