1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
from tests.dialects.test_dialect import Validator
class TestRisingWave(Validator):
dialect = "risingwave"
maxDiff = None
def test_risingwave(self):
self.validate_all(
"SELECT a FROM tbl",
read={
"": "SELECT a FROM tbl FOR UPDATE",
},
)
self.validate_identity(
"CREATE SOURCE from_kafka (*, gen_i32_field INT AS int32_field + 2, gen_i64_field INT AS int64_field + 2, WATERMARK FOR time_col AS time_col - INTERVAL '5 SECOND') INCLUDE header foo VARCHAR AS myheader INCLUDE key AS mykey WITH (connector='kafka', topic='my_topic') FORMAT PLAIN ENCODE PROTOBUF (A=1, B=2) KEY ENCODE PROTOBUF (A=3, B=4)"
)
self.validate_identity(
"CREATE SINK my_sink AS SELECT * FROM A WITH (connector='kafka', topic='my_topic') FORMAT PLAIN ENCODE PROTOBUF (A=1, B=2) KEY ENCODE PROTOBUF (A=3, B=4)"
)
self.validate_identity(
"WITH t1 AS MATERIALIZED (SELECT 1), t2 AS NOT MATERIALIZED (SELECT 2) SELECT * FROM t1, t2"
)
def test_datatypes(self):
self.validate_identity("SELECT CAST(NULL AS MAP(VARCHAR, INT)) AS map_column")
self.validate_identity(
"SELECT NULL::MAP<VARCHAR, INT> AS map_column",
"SELECT CAST(NULL AS MAP(VARCHAR, INT)) AS map_column",
)
self.validate_identity("CREATE TABLE t (map_col MAP(VARCHAR, INT))")
self.validate_identity(
"CREATE TABLE t (map_col MAP<VARCHAR, INT>)",
"CREATE TABLE t (map_col MAP(VARCHAR, INT))",
)
|