File: flink_test.py

package info (click to toggle)
sqlfluff 3.5.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 34,000 kB
  • sloc: python: 106,131; sql: 34,188; makefile: 52; sh: 8
file content (350 lines) | stat: -rw-r--r-- 10,759 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
"""Tests for the FlinkSQL dialect."""

from sqlfluff.core import FluffConfig, Linter


class TestFlinkSQLDialect:
    """Test FlinkSQL dialect parsing.

    Apart from ``test_flink_dialect_basic``, the tests in this class only
    assert that linting returns a result; they do not inspect the reported
    violations.
    """

    def test_flink_dialect_basic(self):
        """Test basic FlinkSQL dialect functionality."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        # Test simple SELECT statement
        sql = "SELECT * FROM my_table;\n"
        result = linter.lint_string(sql)
        assert result is not None
        # Check for parsing errors only, ignore style warnings. Use rule_code()
        # rather than v.rule.code: only lint violations carry a `rule`
        # attribute, so reaching through it would raise AttributeError on the
        # very parse errors this filter is looking for.
        parsing_errors = [
            v for v in result.violations if v.rule_code().startswith("PRS")
        ]
        assert not parsing_errors, f"Unexpected parse errors: {parsing_errors}"

    def test_flink_create_table_basic(self):
        """Test basic CREATE TABLE statement."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        sql = """
        CREATE TABLE my_table (
            id INT,
            name STRING,
            age INT
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'my-topic'
        )
        """
        result = linter.lint_string(sql)
        assert result is not None
        # Allow for some parsing issues initially

    def test_flink_row_data_type(self):
        """Test FlinkSQL ROW data type."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        sql = """
        CREATE TABLE my_table (
            id INT,
            nested_data ROW<name STRING, age INT>
        ) WITH (
            'connector' = 'kafka'
        )
        """
        result = linter.lint_string(sql)
        assert result is not None

    def test_flink_timestamp_with_precision(self):
        """Test FlinkSQL TIMESTAMP with precision."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        sql = """
        CREATE TABLE my_table (
            id INT,
            event_time TIMESTAMP(3),
            processing_time TIMESTAMP_LTZ(3)
        ) WITH (
            'connector' = 'kafka'
        )
        """
        result = linter.lint_string(sql)
        assert result is not None

    def test_flink_watermark_definition(self):
        """Test FlinkSQL WATERMARK definition."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        sql = """
        CREATE TABLE my_table (
            id INT,
            event_time TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka'
        )
        """
        result = linter.lint_string(sql)
        assert result is not None

    def test_flink_computed_column(self):
        """Test FlinkSQL computed column."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        sql = """
        CREATE TABLE my_table (
            id INT,
            name STRING,
            full_name AS CONCAT(name, '_suffix')
        ) WITH (
            'connector' = 'kafka'
        )
        """
        result = linter.lint_string(sql)
        assert result is not None

    def test_flink_metadata_column(self):
        """Test FlinkSQL metadata column."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        sql = """
        CREATE TABLE my_table (
            id INT,
            name STRING,
            kafka_offset BIGINT METADATA FROM 'offset'
        ) WITH (
            'connector' = 'kafka'
        )
        """
        result = linter.lint_string(sql)
        assert result is not None

    def test_flink_show_statements(self):
        """Test FlinkSQL SHOW statements."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        statements = [
            "SHOW CATALOGS",
            "SHOW DATABASES",
            "SHOW TABLES",
            "SHOW VIEWS",
            "SHOW FUNCTIONS",
            "SHOW MODULES",
            "SHOW JARS",
            "SHOW JOBS",
        ]

        # The same linter instance is reused for every statement.
        for sql in statements:
            result = linter.lint_string(sql)
            assert result is not None

    def test_flink_use_statements(self):
        """Test FlinkSQL USE statements."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        statements = [
            "USE CATALOG my_catalog",
            "USE my_database",
            "USE my_catalog.my_database",
        ]

        for sql in statements:
            result = linter.lint_string(sql)
            assert result is not None

    def test_flink_describe_statement(self):
        """Test FlinkSQL DESCRIBE statement."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        sql = "DESCRIBE my_table"
        result = linter.lint_string(sql)
        assert result is not None

    def test_flink_explain_statement(self):
        """Test FlinkSQL EXPLAIN statement."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        sql = "EXPLAIN SELECT * FROM my_table"
        result = linter.lint_string(sql)
        assert result is not None

    def test_flink_set_statement(self):
        """Test FlinkSQL SET statement."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        # Covers the bare SET form as well as quoted key/value assignments.
        statements = [
            "SET",
            "SET 'table.exec.state.ttl' = '1h'",
            "SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE'",
            "SET 'execution.checkpointing.unaligned.enabled' = 'true'",
            "SET 'execution.checkpointing.timeout' = '600000'",
        ]

        for sql in statements:
            result = linter.lint_string(sql)
            assert result is not None

    def test_flink_create_catalog(self):
        """Test FlinkSQL CREATE CATALOG statement."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        sql = """
        CREATE CATALOG my_catalog WITH (
            'type' = 'hive',
            'hive-conf-dir' = '/path/to/hive/conf'
        )
        """
        result = linter.lint_string(sql)
        assert result is not None

    def test_flink_create_database(self):
        """Test FlinkSQL CREATE DATABASE statement."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        sql = """
        CREATE DATABASE IF NOT EXISTS my_db
        COMMENT 'My database'
        WITH (
            'key1' = 'value1'
        )
        """
        result = linter.lint_string(sql)
        assert result is not None

    def test_flink_alternative_with_syntax(self):
        """Test FlinkSQL WITH clause using alternative double equals syntax."""
        config = FluffConfig(overrides={"dialect": "flink"})
        linter = Linter(config=config)

        # Option keys are unquoted and paired with `==` instead of `=`.
        sql = """
        CREATE TABLE test_table (
          data_info ROW<`info` STRING>,
          name STRING,
          score DOUBLE,
          total_count DOUBLE,
          active_count DOUBLE,
          metadata ROW<`details` STRING>,
          change_rate DOUBLE,
          volume DOUBLE,
          change_percentage DOUBLE,
          updated_at TIMESTAMP(3),
          category STRING
        ) WITH (
          connector == 'test-connector',
          environment == 'development'
        )
        """
        result = linter.lint_string(sql)
        assert result is not None


class TestFlinkSQLComplexExamples:
    """Lint larger FlinkSQL CREATE TABLE examples covering several features."""

    @staticmethod
    def _lint_flink(statement):
        """Lint ``statement`` with a new linter set to the flink dialect."""
        flink_linter = Linter(config=FluffConfig(overrides={"dialect": "flink"}))
        return flink_linter.lint_string(statement)

    def test_flink_row_datatype_table(self):
        """Lint a table mixing ROW columns with scalar columns and options."""
        ddl = """
        CREATE TABLE table1 (
          data_info ROW<`name` STRING>,
          email STRING,
          score DOUBLE,
          total_points DOUBLE,
          active_points DOUBLE,
          metadata ROW<`description` STRING>,
          change_percentage DOUBLE,
          volume DOUBLE,
          rate_change_percentage DOUBLE,
          last_updated TIMESTAMP(3),
          status STRING
        )  WITH (
          'connector' = 'test-connector',
          'project' = 'test-project',
          'dataset' = 'test-dataset'
        )
        """
        # Only checks that linting yields a result object.
        assert self._lint_flink(ddl) is not None

    def test_flink_complex_table_structure(self):
        """Lint a wide table using STRING, DOUBLE, BOOLEAN, INT and TIMESTAMP."""
        ddl = """
        CREATE TABLE table2 (
          session_id STRING,
          session_ts TIMESTAMP(3),
          source_name STRING,
          service STRING,
          category STRING,
          category_id STRING,
          type STRING,
          type_id STRING,
          identifier STRING,
          identifier_id STRING,
          event_type STRING,
          action_type STRING,
          resource_type STRING,
          value DOUBLE,
          quantity DOUBLE,
          request_url STRING,
          is_deleted BOOLEAN,
          item_count INT,
          created_ts TIMESTAMP(3),
          updated_ts TIMESTAMP(3),
          processed_ts TIMESTAMP(3),
          received_ts TIMESTAMP(3),
          sequence_ts TIMESTAMP(3)
        ) WITH (
          'connector' = 'test-connector',
          'project' = 'test-project',
          'dataset' = 'test-dataset',
          'table' = 'test-table'
        )
        """
        # Only checks that linting yields a result object.
        assert self._lint_flink(ddl) is not None

    def test_flink_simple_record_table(self):
        """Lint a flat record table with four connector options."""
        ddl = """
        CREATE TABLE table3 (
          service STRING,
          type STRING,
          from_id STRING,
          to_id STRING,
          amount DOUBLE,
          quantity DOUBLE,
          executed_at TIMESTAMP(3),
          id STRING,
          request_url STRING,
          direction STRING,
          client_received_timestamp TIMESTAMP(3),
          job_timestamp TIMESTAMP(3),
          job_id STRING,
          processor STRING
        ) WITH (
          'connector' = 'test-connector',
          'project' = 'test-project',
          'dataset' = 'test-dataset',
          'table' = 'test-records'
        )
        """
        # Only checks that linting yields a result object.
        assert self._lint_flink(ddl) is not None