File: test_sqlite3.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (862 lines) | stat: -rw-r--r-- 28,006 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
# -*- coding: utf-8 -*-
"""Tests for _sqlite3.py"""

from __future__ import absolute_import
import pytest
import sys
import gc
import io
import contextlib

# Import the module under test through pytest so that a failed import is
# reported as a skip instead of a collection error.
_sqlite3 = pytest.importorskip('_sqlite3')

# Marker for tests that are only meaningful on PyPy, detected through the
# ``__pypy__`` entry in ``sys.builtin_module_names``.
pypy_only = pytest.mark.skipif(
    '__pypy__' not in sys.builtin_module_names,
    reason="PyPy-only test",
)


@pytest.fixture
def con():
    """Yield a fresh in-memory connection and close it after the test."""
    connection = _sqlite3.connect(':memory:')
    yield connection
    connection.close()

# Second name for the same fixture function: allow using two connections
# in one test by requesting both ``con`` and ``con2``.
con2 = con


def test_list_ddl(con):
    """From issue996.  Mostly just looking for lack of exceptions."""
    cur = con.cursor()

    # Iterating the cursor after DDL or an INSERT yields no rows.
    cur.execute('CREATE TABLE foo (bar INTEGER)')
    assert list(cur) == []
    cur.execute('INSERT INTO foo (bar) VALUES (42)')
    assert list(cur) == []

    # The SELECT yields the single inserted row.
    cur.execute('SELECT * FROM foo')
    assert list(cur) == [(42,)]

@pypy_only
def test_connect_takes_same_positional_args_as_Connection(con):
    """connect() and Connection.__init__ must name the same arguments."""
    import inspect

    init_spec = inspect.getfullargspec(_sqlite3.Connection.__init__)
    connect_spec = inspect.getfullargspec(_sqlite3.connect)
    # Drop the leading ``self`` of the method before comparing.
    assert init_spec.args[1:] == connect_spec.args

def test_total_changes_after_close(con):
    """Reading total_changes on a closed connection raises ProgrammingError."""
    con.close()
    expected_error = _sqlite3.ProgrammingError
    with pytest.raises(expected_error):
        con.total_changes

def test_connection_check_init():
    """Using a Connection whose base __init__ never ran raises ProgrammingError."""
    class Connection(_sqlite3.Connection):
        def __init__(self, name):
            # Deliberately skip the base class initializer.
            pass

    uninitialized = Connection(":memory:")
    with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
        uninitialized.cursor()
    # The message has to point the user at the missing __init__ call.
    message = str(excinfo.value)
    assert '__init__' in message


def test_cursor_check_init(con):
    """Using a Cursor whose base __init__ never ran raises ProgrammingError."""
    class Cursor(_sqlite3.Cursor):
        def __init__(self, name):
            # Deliberately skip the base class initializer.
            pass

    uninitialized = Cursor(con)
    with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
        uninitialized.execute('select 1')
    # The message has to point the user at the missing __init__ call.
    message = str(excinfo.value)
    assert '__init__' in message

def test_connection_after_close(con):
    """Calling a connection checks for closure before it checks arguments."""
    # While the connection is open, a call without arguments is a TypeError.
    with pytest.raises(TypeError):
        con()

    con.close()

    # raises ProgrammingError because should check closed before check args
    with pytest.raises(_sqlite3.ProgrammingError):
        con()

def test_cursor_iter(con):
    """next() on a cursor raises StopIteration whenever no row is pending."""
    def assert_exhausted(cursor):
        with pytest.raises(StopIteration):
            next(cursor)

    cur = con.cursor()
    # A cursor that never executed anything has no rows.
    assert_exhausted(cur)

    # A one-row SELECT yields exactly one row.
    cur.execute('select 1')
    next(cur)
    assert_exhausted(cur)

    # A commit between execute and fetch does not lose the pending row.
    cur.execute('select 1')
    con.commit()
    next(cur)
    assert_exhausted(cur)

    # executemany() with an empty parameter list leaves nothing to fetch.
    cur.executemany('select 1', [])
    assert_exhausted(cur)

    # Executing DDL after a SELECT leaves no row to fetch.
    cur.execute('select 1')
    cur.execute('create table test(ing)')
    assert_exhausted(cur)

    # Same for an INSERT followed by a commit.
    cur.execute('select 1')
    cur.execute('insert into test values(1)')
    con.commit()
    assert_exhausted(cur)

def test_cursor_after_close(con):
    """Cursor methods raise ProgrammingError once the connection is closed."""
    cur = con.execute('select 1')
    cur.close()
    con.close()

    with pytest.raises(_sqlite3.ProgrammingError):
        cur.close()

    # raises ProgrammingError because should check closed before check args
    bogus_args = (1, 2, 3, 4, 5)
    for method in (cur.execute, cur.executemany):
        with pytest.raises(_sqlite3.ProgrammingError):
            method(*bogus_args)

def test_connection_del(tmpdir):
    """For issue1325.

    Lowers the soft RLIMIT_NOFILE of the process, checks that keeping
    three connections to the same database file alive hits that limit,
    and then checks that connections which are dropped and garbage
    collected (without an explicit close()) no longer do.  Skipped when
    the ``resource`` module cannot be imported.
    """
    import os
    import gc
    resource = pytest.importorskip('resource')

    # Remember the original (soft, hard) pair so it can be restored below.
    limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        # Raise the soft limit one descriptor at a time until a pipe can
        # just be created (both ends are closed again right away); the
        # assert aborts the search once 100 descriptors are not enough.
        fds = 0
        while True:
            fds += 1
            resource.setrlimit(resource.RLIMIT_NOFILE, (fds, limit[1]))
            try:
                for p in os.pipe(): os.close(p)
            except OSError:
                assert fds < 100
            else:
                break

        def open_many(cleanup):
            # Open three connections to the same file.  With ``cleanup``
            # each one is dropped and collected before the next is opened;
            # without it all three stay referenced from the local list.
            con = []
            for i in range(3):
                con.append(_sqlite3.connect(str(tmpdir.join('test.db'))))
                if cleanup:
                    con[i] = None
                    gc.collect(); gc.collect()

        # NOTE(review): presumably wrapped in its own function so that the
        # frames referenced by the caught exception (and the connections
        # they hold) become unreachable on return -- confirm.
        def assert_fds_limit_triggers_operational_error():
            with pytest.raises(_sqlite3.OperationalError):
                open_many(False)
        assert_fds_limit_triggers_operational_error()
        gc.collect(); gc.collect()
        # Must not raise: every connection is collected before the next opens.
        open_many(True)
    finally:
        resource.setrlimit(resource.RLIMIT_NOFILE, limit)

def test_on_conflict_rollback_executemany(con):
    """commit() must still work after executemany() triggered a ROLLBACK."""
    major, minor, micro = map(int, _sqlite3.sqlite_version.split('.')[:3])
    if (major, minor, micro) < (3, 2, 2):
        pytest.skip("requires sqlite3 version >= 3.2.2")

    con.execute("create table foo(x, unique(x) on conflict rollback)")
    con.execute("insert into foo(x) values (1)")

    # Inserting the duplicate value violates the UNIQUE constraint; a
    # DatabaseError here is tolerated, it is not what this test checks.
    duplicate_rows = [[1]]
    try:
        con.executemany("insert into foo(x) values (?)", duplicate_rows)
    except _sqlite3.DatabaseError:
        pass

    con.execute("insert into foo(x) values (2)")
    try:
        con.commit()
    except _sqlite3.OperationalError:
        pytest.fail("_sqlite3 knew nothing about the implicit ROLLBACK")

def test_statement_arg_checking(con):
    """Non-string statements are rejected with TypeError by every entry point."""
    with pytest.raises(TypeError):
        con(123)
    with pytest.raises(TypeError):
        con.execute(123)
    with pytest.raises(TypeError):
        con.executemany(123, 123)

    # Only the executescript() message is checked.
    with pytest.raises(TypeError) as excinfo:
        con.executescript(123)
    message = str(excinfo.value)
    assert message.startswith('script argument must be unicode')

def test_statement_param_checking(con):
    """Check which kinds of objects are accepted as ``?`` parameters."""
    insert = 'insert into foo(x) values (?)'
    con.execute('create table foo(x)')

    # Lists and tuples are accepted.
    for params in ([2], (2,)):
        con.execute(insert, params)

    # So is any object providing __len__ and __getitem__ ...
    class seq(object):
        def __len__(self):
            return 1
        def __getitem__(self, key):
            return 2
    con.execute(insert, seq())

    # ... but not once __len__ is gone, and not a dict either.
    del seq.__len__
    for rejected in (seq(), {2: 2}):
        with pytest.raises(_sqlite3.ProgrammingError):
            con.execute(insert, rejected)

    # A plain integer is rejected with a specific message.
    with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
        con.execute(insert, 2)
    assert str(excinfo.value) == 'parameters are of unsupported type'

def test_explicit_begin(con):
    """A second explicit BEGIN inside an open transaction fails."""
    con.execute('BEGIN')
    # Note the trailing space: this statement text differs from the first.
    with pytest.raises(_sqlite3.OperationalError):
        con.execute('BEGIN ')
    con.commit()

    # After the commit a new transaction can be started and committed.
    con.execute('BEGIN')
    con.commit()

def test_row_factory_use(con):
    """execute() must not fail when row_factory is set to a non-callable."""
    not_callable = 42
    con.row_factory = not_callable
    # No row is fetched here, only the execution itself is exercised.
    con.execute('select 1')

def test_returning_blob_must_own_memory(con):
    """A memoryview returned by an SQL function comes back as bytes."""
    def returnblob():
        return memoryview(b"blob")

    con.create_function("returnblob", 0, returnblob)
    row = con.execute("select returnblob()").fetchone()
    assert isinstance(row[0], bytes)

def test_function_arg_str_null_char(con):
    """A string argument with an embedded NUL reaches the function intact."""
    def strlen(a):
        return len(a)

    con.create_function("strlen", 1, strlen)
    row = con.execute("select strlen(?)", ["x\0y"]).fetchone()
    # All three characters, including the NUL, must be counted.
    assert row[0] == 3


def test_description_after_fetchall(con):
    """description stays available after all rows have been fetched."""
    cur = con.cursor()
    # Nothing executed yet, so there is no description.
    assert cur.description is None

    executed = cur.execute("select 42")
    executed.fetchall()
    assert cur.description is not None

def test_executemany_lastrowid(con):
    """lastrowid is 0 after executemany() and truthy after a single INSERT."""
    cur = con.cursor()
    cur.execute("create table test(a)")

    rows = [[1], [2], [3]]
    cur.executemany("insert into test values (?)", rows)
    assert cur.lastrowid == 0

    # issue 2682: INSERT statements with a newline or a tab after the keyword
    single = (1, )
    cur.execute('''insert
                into test
                values (?)
                ''', single)
    assert cur.lastrowid
    cur.execute('''insert\t into test values (?) ''', single)
    assert cur.lastrowid

def test_authorizer_bad_value(con):
    """An authorizer returning an invalid code makes statements fail."""
    # 42 is the deliberately invalid authorizer result.
    con.set_authorizer(lambda action, arg1, arg2, dbname, source: 42)

    with pytest.raises(_sqlite3.OperationalError) as excinfo:
        con.execute('select 123')
    message = str(excinfo.value)

    # The wording of the error depends on the SQLite library version.
    major, minor, micro = map(int, _sqlite3.sqlite_version.split('.')[:3])
    if (major, minor, micro) >= (3, 6, 14):
        assert message == 'authorizer malfunction'
    else:
        assert message == \
            ("illegal return value (1) from the authorization function - "
                "should be SQLITE_OK, SQLITE_IGNORE, or SQLITE_DENY")

def test_authorizer_clear(con):
    """set_authorizer(None) removes a previously installed authorizer."""
    def deny_everything(action, arg1, arg2, dbname, source):
        return _sqlite3.SQLITE_DENY

    con.set_authorizer(deny_everything)
    with pytest.raises(_sqlite3.DatabaseError):
        con.execute('select 123')

    # Once cleared, the same statement runs without error.
    con.set_authorizer(None)
    con.execute('select 123')

def test_issue1573(con):
    """A non-ASCII column alias is reported unchanged in description."""
    cur = con.cursor()
    cur.execute(u'SELECT 1 as méil')
    column_name = cur.description[0][0]
    assert column_name == u"méil"

def test_adapter_exception(con):
    """An exception raised inside a registered adapter propagates to the caller."""
    def cast(obj):
        raise ZeroDivisionError

    _sqlite3.register_adapter(int, cast)
    try:
        cur = con.cursor()
        params = (4,)
        with pytest.raises(ZeroDivisionError):
            cur.execute("select ?", params)
    finally:
        # Undo the registration so that it cannot leak into other tests.
        adapter_key = (int, _sqlite3.PrepareProtocol)
        del _sqlite3.adapters[adapter_key]
        _sqlite3.BASE_TYPE_ADAPTED = False

def test_null_character(con):
    """Statements containing a NUL character are rejected with a clear message.

    Both ways of running a statement are covered (calling the connection
    and ``Cursor.execute``), each with a leading and a trailing NUL.
    """
    expected = "the query contains a null character"
    cur = con.cursor()
    cases = [
        (con, "\0select 1"),
        (con, "select 1\0"),
        (cur.execute, "\0select 2"),
        (cur.execute, "select 2\0"),
    ]
    for run, sql in cases:
        with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
            run(sql)
        assert str(excinfo.value) == expected

def test_close_in_del_ordering():
    """Closing a connection from the __del__ of its owner must not fail.

    ``success`` is only set when ``close()`` inside ``__del__`` returns
    without raising, and only if the finalizer has run by the time the
    two ``gc.collect()`` calls are done.
    """
    import gc
    class SQLiteBackend(object):
        # Flipped to True by __del__ after close() has returned.
        success = False
        def __init__(self):
            self.connection = _sqlite3.connect(":memory:")
        def close(self):
            self.connection.close()
        def __del__(self):
            self.close()
            SQLiteBackend.success = True
        def create_db_if_needed(self):
            # Use a cursor, close it and commit before the owner goes away.
            conn = self.connection
            cursor = conn.cursor()
            cursor.execute("""
                create table if not exists nameoftable(value text)
            """)
            cursor.close()
            conn.commit()
    # The instance is never bound to a name, so it is garbage right after
    # the call; the collections below give its finalizer a chance to run.
    SQLiteBackend().create_db_if_needed()
    gc.collect()
    gc.collect()
    assert SQLiteBackend.success

def test_locked_table(con):
    """Dropping a table fails while a cursor still has an active SELECT on it."""
    con.execute("CREATE TABLE foo(x)")
    con.execute("INSERT INTO foo(x) VALUES (?)", [42])

    # foo() is locked while cur is active, so the reference must be kept.
    cur = con.execute("SELECT * FROM foo")
    expected_error = _sqlite3.OperationalError
    with pytest.raises(expected_error):
        con.execute("DROP TABLE foo")

def test_cursor_close(con):
    """Closing the cursor of an active SELECT allows the table to be dropped."""
    con.execute("CREATE TABLE foo(x)")
    con.execute("INSERT INTO foo(x) VALUES (?)", [42])

    select_cursor = con.execute("SELECT * FROM foo")
    select_cursor.close()

    # no error
    con.execute("DROP TABLE foo")

def test_cursor_del(con):
    """A garbage-collected cursor no longer blocks dropping its table."""
    import gc

    con.execute("CREATE TABLE foo(x)")
    con.execute("INSERT INTO foo(x) VALUES (?)", [42])

    # The cursor returned here is deliberately not kept.
    con.execute("SELECT * FROM foo")
    gc.collect()

    # no error
    con.execute("DROP TABLE foo")

def test_open_path():
    """connect() accepts a path-like object whose __fspath__ returns bytes."""
    class P:
        def __fspath__(self):
            return b":memory:"

    path_like = P()
    _sqlite3.connect(path_like)

def test_isolation_bug():
    """DDL must work on a connection opened with isolation_level=None."""
    # Alternative setup kept from the original author's notes: connect
    # without the keyword and assign ``con.isolation_level = None`` after.
    con = _sqlite3.connect(":memory:", isolation_level=None)
    con.cursor().execute("create table foo(x);")

def test_reset_of_shared_statement(con):
    """Collecting one cursor must not reset a statement shared with another.

    Uses the ``con`` fixture directly, so the connection is closed by the
    fixture teardown instead of being left open.
    """
    c0 = con.cursor()
    c0.execute('CREATE TABLE data(n int, t int)')
    # insert two values
    c0.execute('INSERT INTO data(n, t) VALUES(?, ?)', (0, 1))
    c0.execute('INSERT INTO data(n, t) VALUES(?, ?)', (1, 2))

    c1 = con.execute('select * from data')
    list(c1)  # c1's statement is no longer in use afterwards
    c2 = con.execute('select * from data')
    # the statement between c1 and c2 is shared
    assert c1._Cursor__statement is c2._Cursor__statement
    val = next(c2)
    assert val == (0, 1)
    c1 = None  # make c1 unreachable
    gc.collect()  # calling c1.__del__ used to reset c2._Cursor__statement!
    val = next(c2)
    assert val == (1, 2)
    with pytest.raises(StopIteration):
        next(c2)

def test_row_index_unicode(con):
    """sqlite3.Row finds a non-ASCII column only under its exact name."""
    import sqlite3

    con.row_factory = sqlite3.Row
    row = con.execute("select 1 as \xff").fetchone()
    assert row["\xff"] == 1

    # Other non-ASCII keys are not treated as a match for that column.
    for other_key in ('\u0178', '\xdf'):
        with pytest.raises(IndexError):
            row[other_key]

@pytest.mark.skipif(not hasattr(_sqlite3.Connection, "backup"), reason="no backup")
class TestBackup:
    """Tests for Connection.backup(); skipped when the method is missing."""

    def test_target_is_connection(self, con):
        """The backup target has to be a connection object."""
        with pytest.raises(TypeError):
            con.backup(None)

    def test_target_different_self(self, con):
        """A connection cannot be backed up into itself."""
        with pytest.raises(ValueError):
            con.backup(con)

    def test_progress_callable(self, con, con2):
        """A non-callable ``progress`` argument is rejected."""
        with pytest.raises(TypeError):
            con.backup(con2, progress=34)

    def test_backup_simple(self, con, con2):
        """Rows committed in the source are present in the target afterwards."""
        con.execute('CREATE TABLE foo (key INTEGER)')
        con.executemany('INSERT INTO foo (key) VALUES (?)', [(3,), (4,)])
        con.commit()

        con.backup(con2)
        result = con2.execute("SELECT key FROM foo ORDER BY key").fetchall()
        # Compare the whole result so that extra rows are detected as well.
        assert result == [(3,), (4,)]

def test_reset_already_committed_statements_bug(con):
    """A table can be dropped after commit() although a SELECT cursor is alive."""
    create_sql = '''CREATE TABLE COMPANY
             (ID INT PRIMARY KEY,
             A INT);'''
    insert_sql = "INSERT INTO COMPANY (ID, A) \
          VALUES (1, 2)"
    con.execute(create_sql)
    con.execute(insert_sql)

    # Keep the cursor referenced: the test is about a still-alive cursor.
    cursor = con.execute("SELECT id, a from COMPANY")
    con.commit()
    con.execute("DROP TABLE COMPANY")

def test_empty_statement():
    """Empty, blank and comment-only statements execute and return no rows."""
    con = _sqlite3.connect(":memory:")
    try:
        cur = con.cursor()
        for sql in ["", " ", "/*comment*/"]:
            result = cur.execute(sql)
            assert result.description is None
            assert cur.fetchall() == []
    finally:
        # Close explicitly instead of relying on garbage collection.
        con.close()

def test_uninit_connection():
    """A Connection created through __new__ alone rejects every operation."""
    con = _sqlite3.Connection.__new__(_sqlite3.Connection)

    # Plain attribute reads.
    for attribute in ("isolation_level", "total_changes", "in_transaction"):
        with pytest.raises(_sqlite3.ProgrammingError):
            getattr(con, attribute)

    # Method calls without arguments.
    for method_name in ("iterdump", "close"):
        with pytest.raises(_sqlite3.ProgrammingError):
            getattr(con, method_name)()

@pypy_only
@pytest.mark.parametrize(
    "param",
    (
        bytearray([1, 2, 3]),
        3.14,
        42,
        "i <3 pypy",
        b"i <3 pypy",
        None,
    ),
)
def test_need_adapt_optimization(param, con):
    """Base types bypass adapt() until an adapter for a base type is registered."""
    saved_adapters = dict(_sqlite3.adapters)

    def adapter(value):
        """dummy adapter that adapts only non-basetypes"""
        return 42

    assert _sqlite3.BASE_TYPE_ADAPTED == False

    # check that the fast path works for base types: with adapt() replaced
    # by None, any attempt to call it would fail
    saved_adapt = _sqlite3.adapt
    try:
        _sqlite3.adapt = None
        fast_cursor = con.cursor()
        fast_cursor.execute("SELECT ?", (param,))
        assert fast_cursor.fetchone() == (param,)
    finally:
        _sqlite3.adapt = saved_adapt

    # check that if an adapter for a base type has been registered,
    # the behaviour is as expected
    try:
        _sqlite3.register_adapter(type(param), adapter)
        assert _sqlite3.BASE_TYPE_ADAPTED == True
        adapted_cursor = con.cursor()
        adapted_cursor.execute("SELECT ?", (param,))
        assert adapted_cursor.fetchone() == (42,)
    finally:
        # Restore the module state captured at the start of the test.
        _sqlite3.adapters = saved_adapters
        _sqlite3.BASE_TYPE_ADAPTED = False

def test_description_insert():
    """INSERT ... RETURNING exposes the returned columns in description."""
    db = _sqlite3.connect(":memory:")
    cur = db.cursor()

    statements = (
        """create table foo (x int, y int)""",
        """insert into foo (x, y) values (1, 1), (2, 2), (3, 3), (4, 4)""",
        """insert into foo (x, y) values (5, 5), (6, 6)
        RETURNING x, y""",
    )
    for sql in statements:
        cur.execute(sql)

    # One 7-tuple per column: the name followed by six None entries.
    expected = tuple((name,) + (None,) * 6 for name in ("x", "y"))
    assert cur.description == expected


def test_description_update():
    """UPDATE ... RETURNING exposes the returned columns in description."""
    db = _sqlite3.connect(":memory:")
    cur = db.cursor()

    statements = (
        """create table foo (x int, y int)""",
        """insert into foo (x, y) values (1, 1), (2, 2), (3, 3), (4, 4)""",
        """update foo set y=y+5 where x in (2, 3)
        RETURNING x, y""",
    )
    for sql in statements:
        cur.execute(sql)

    # One 7-tuple per column: the name followed by six None entries.
    expected = tuple((name,) + (None,) * 6 for name in ("x", "y"))
    assert cur.description == expected


def test_description_delete():
    """DELETE ... RETURNING exposes the returned columns in description."""
    db = _sqlite3.connect(":memory:")
    cur = db.cursor()

    statements = (
        """create table foo (x int, y int)""",
        """insert into foo (x, y) values (1, 1), (2, 2), (3, 3), (4, 4)""",
        """delete from foo where x in (1, 4)
        RETURNING x, y""",
    )
    for sql in statements:
        cur.execute(sql)

    # One 7-tuple per column: the name followed by six None entries.
    expected = tuple((name,) + (None,) * 6 for name in ("x", "y"))
    assert cur.description == expected

def test_description_parse_colnames():
    """The ``[type]`` suffix is cut from column names only with PARSE_COLNAMES."""
    query = u'SELECT 1 as "m [ä]"'

    # Without detect_types the alias is reported verbatim.
    plain_cursor = _sqlite3.connect(":memory:").cursor()
    plain_cursor.execute(query)
    assert plain_cursor.description[0][0] == u"m [ä]"

    def bar(x):
        return x

    try:
        _sqlite3.converters['Ä'] = bar
        parsing = _sqlite3.connect(":memory:", detect_types=_sqlite3.PARSE_COLNAMES)
        parsing_cursor = parsing.cursor()
        parsing_cursor.execute(query)
        # With PARSE_COLNAMES only the part before the bracket remains.
        assert parsing_cursor.description[0][0] == u"m"
    finally:
        del _sqlite3.converters['Ä']


def test_recursive_close():
    """Closing the cursor from inside a converter raises ProgrammingError."""
    conn = _sqlite3.connect(":memory:", detect_types=_sqlite3.PARSE_COLNAMES)
    cursor = conn.cursor()
    cursor.execute("create table test(x foo)")
    cursor.executemany("insert into test(x) values (?)",
                       [("foo",), ("bar",)])

    def conv(x):
        # Re-enter the cursor while it is busy converting a value.
        cursor.close()
        return x

    try:
        _sqlite3.converters['CLOSE'] = conv
        with pytest.raises(_sqlite3.ProgrammingError):
            # NOTE(review): both calls are kept inside the block, presumably
            # because the error may surface in either of them -- confirm.
            cursor.execute('select x as "x [CLOSE]", x from test')
            next(cursor)
    finally:
        del _sqlite3.converters['CLOSE']

def test_recursive_fetch():
    """Fetching from the cursor inside a converter raises ProgrammingError."""
    conn = _sqlite3.connect(":memory:", detect_types=_sqlite3.PARSE_COLNAMES)
    cursor = conn.cursor()
    cursor.execute("create table test(x foo)")
    cursor.executemany("insert into test(x) values (?)",
                       [("foo",), ("bar",)])

    def conv(x):
        # Re-enter the cursor while it is busy converting a value.
        cursor.fetchone()
        return x

    try:
        _sqlite3.converters['ITER'] = conv
        with pytest.raises(_sqlite3.ProgrammingError):
            # NOTE(review): both calls are kept inside the block, presumably
            # because the error may surface in either of them -- confirm.
            cursor.execute('select x as "x [ITER]", x from test')
            next(cursor)
    finally:
        del _sqlite3.converters['ITER']

def test_recursive_init():
    """Re-initialising the cursor inside a converter raises ProgrammingError."""
    conn = _sqlite3.connect(":memory:", detect_types=_sqlite3.PARSE_COLNAMES)
    cursor = conn.cursor()
    cursor.execute("create table test(x foo)")
    cursor.executemany("insert into test(x) values (?)",
                       [("foo",), ("bar",)])

    def conv(x):
        # Re-enter the cursor while it is busy converting a value.
        cursor.__init__(conn)
        return x

    try:
        _sqlite3.converters['INIT'] = conv
        with pytest.raises(_sqlite3.ProgrammingError):
            # NOTE(review): both calls are kept inside the block, presumably
            # because the error may surface in either of them -- confirm.
            cursor.execute('select x as "x [INIT]", x from test')
            next(cursor)
    finally:
        del _sqlite3.converters['INIT']

def test_complete_statement():
    """complete_statement() tells terminated statements from unterminated ones."""
    # tested in stdlib for cpython 3.11+
    unterminated = "SELECT"  # missing ';'
    terminated = "SELECT foo FROM bar;"
    assert not _sqlite3.complete_statement(unterminated)
    assert _sqlite3.complete_statement(terminated)

def test_enable_callback_traceback():
    """With callback tracebacks enabled, handler errors reach sys.unraisablehook."""
    # tested in stdlib for cpython 3.11+
    def bad_progress():
        1 // 0

    con = _sqlite3.connect(":memory:")
    con.set_progress_handler(bad_progress, 1)
    # sanity check: the failing progress handler aborts the statement
    with pytest.raises(_sqlite3.OperationalError):
        con.execute("create table foo(a, b);")

    unraisables = []
    old_hook = sys.unraisablehook
    try:
        # Both global switches are flipped inside the ``try`` so that the
        # ``finally`` clause always puts them back.
        _sqlite3.enable_callback_tracebacks(True)
        sys.unraisablehook = unraisables.append
        with pytest.raises(_sqlite3.OperationalError):
            con.execute("create table foo(a, b);")
    finally:
        _sqlite3.enable_callback_tracebacks(False)
        sys.unraisablehook = old_hook

    assert len(unraisables) > 0
    # The most recent report has to describe the handler's own error.
    assert unraisables[-1].exc_type is ZeroDivisionError

def test_adapt_three_args():
    """adapt() returns ``alt`` for an unadaptable object and raises without it."""
    fallback = 123
    assert _sqlite3.adapt(print, alt=fallback) == fallback
    # Without a fallback the same object cannot be adapted.
    with pytest.raises(_sqlite3.ProgrammingError):
        _sqlite3.adapt(print)

def test_bad_conform(con):
    """A __conform__ that raises TypeError is reported as ProgrammingError."""
    class BadConform:
        def __conform__(self, protocol):
            raise TypeError

    con.execute("create table test(foo)")
    params = (BadConform(),)
    with pytest.raises(_sqlite3.ProgrammingError) as info:
        con.execute("insert into test(foo) values (?)", params)
    # The message has to name the offending type.
    message = str(info.value)
    assert 'BadConform' in message

def test_collation_unicode(con):
    """create_collation() accepts a non-ASCII collation name."""
    def collate(*args):
        return None

    con.create_collation('ä', collate)

def test_function_raises_memory_error(con):
    """A MemoryError raised by an SQL function propagates unchanged."""
    def mem():
        raise MemoryError()

    con.create_function("mem", 0, mem)
    query = "select mem()"
    with pytest.raises(MemoryError):
        con.execute(query)

def test_function_raises_overflow_error(con):
    """An OverflowError raised by an SQL function surfaces as DataError."""
    def ovf():
        raise OverflowError()

    con.create_function("ovf", 0, ovf)
    query = "select ovf()"
    with pytest.raises(_sqlite3.DataError):
        con.execute(query)

def test_window_function(con):
    """A user-defined aggregate window function sums over a sliding frame."""
    class WindowSumInt:
        """Running integer sum with the four window-function callbacks."""

        def __init__(self):
            self.count = 0

        def step(self, value):
            # A row enters the frame.
            self.count += value

        def inverse(self, value):
            # A row leaves the frame.
            self.count -= value

        def value(self):
            return self.count

        def finalize(self):
            return self.count

    con.create_window_function('sumint', 1, WindowSumInt)
    con.execute("create table test(x, y)")
    rows = [("a", 4), ("b", 5), ("c", 3), ("d", 8), ("e", 1)]
    con.executemany("insert into test values(?, ?)", rows)

    query = """
            select x, sumint(y) over (
                order by x rows between 1 preceding and 1 following
            ) as sum_y
            from test order by x
    """
    # Every row sums its own y with that of its neighbours in x order.
    expected = [("a", 9), ("b", 12), ("c", 16), ("d", 12), ("e", 9)]
    assert list(con.execute(query)) == expected

def test_query_limit(con):
    """Statements longer than SQLITE_LIMIT_SQL_LENGTH are rejected with DataError."""
    con.setlimit(_sqlite3.SQLITE_LIMIT_SQL_LENGTH, 100)
    limit = con.getlimit(_sqlite3.SQLITE_LIMIT_SQL_LENGTH)

    # Pad a valid statement with spaces to one character over the limit.
    oversized = 'select 1'.ljust(limit + 1)
    for run in (con.execute, con.executescript):
        with pytest.raises(_sqlite3.DataError) as info:
            run(oversized)
        assert "query string is too large" in str(info.value)

@pytest.mark.skipif(
    not hasattr(_sqlite3.Connection, 'serialize'),
    reason='sqlite3 version too old for serialize')
def test_serialize(con):
    """serialize() / deserialize() round-trip the content of a database."""
    with con:
        con.execute("create table t(t)")
    snapshot = con.serialize()
    assert len(snapshot) == 8192

    # Remove test table, verify that it was removed.
    probe = "select t from t"
    with con:
        con.execute("drop table t")
    with pytest.raises(_sqlite3.OperationalError):
        con.execute(probe)

    # Deserialize and verify that test table is restored.
    con.deserialize(snapshot)
    con.execute(probe)

class TestBlob:
    """Tests for the blob objects returned by Connection.blobopen()."""

    # Payload stored in the single row of the test table.
    DATA = b"this blob data string is exactly fifty bytes long!"
    # Bytes written over the beginning of the payload ...
    NEW = b"some new data"
    # ... and the content expected afterwards.
    PATCHED = b'some new dataa string is exactly fifty bytes long!'

    def _fill(self, con):
        # Create the table and insert DATA as its only row (rowid 1).
        con.execute("create table test(b blob)")
        con.execute("insert into test(b) values (?)", (self.DATA, ))

    def test_simple(self, con):
        """read(n) returns the stored bytes."""
        self._fill(con)
        blob = con.blobopen("test", "b", 1)
        assert blob.read(len(self.DATA)) == self.DATA

    def test_write(self, con):
        """write() advances the position and the change is visible on reopen."""
        self._fill(con)
        with con.blobopen("test", "b", 1) as blob:
            blob.write(self.NEW)
            assert blob.tell() == len(self.NEW)
        with con.blobopen("test", "b", 1) as blob:
            assert blob.read() == self.PATCHED

    def test_seek(self, con):
        """seek(0) after a write lets the whole blob be read back."""
        self._fill(con)
        with con.blobopen("test", "b", 1) as blob:
            blob.write(self.NEW)
            blob.seek(0)
            assert blob.read() == self.PATCHED

    def test_seek_to_end(self, con):
        """Reading at the end of the blob returns an empty bytes object."""
        self._fill(con)
        with con.blobopen("test", "b", 1) as blob:
            blob.seek(len(self.DATA))
            assert blob.read() == b''

    def test_getitem(self, con):
        """Indexing and slicing (including strides) read from the blob."""
        self._fill(con)
        with con.blobopen("test", "b", 1) as blob:
            assert blob[0] == ord('t')
            assert blob[:2] == b'th'
            assert blob[::2] == self.DATA[::2]

    def test_setitem(self, con):
        """Item and slice assignment write without moving the position."""
        self._fill(con)
        with con.blobopen("test", "b", 1) as blob:
            blob[0] = 14
            assert blob[0] == 14
            blob[:len(self.NEW)] = self.NEW
            assert blob.tell() == 0
        with con.blobopen("test", "b", 1) as blob:
            assert blob.read() == self.PATCHED
        # An index far outside the blob is rejected.
        with pytest.raises(IndexError):
            with con.blobopen("test", "b", 1) as blob:
                blob[-1999] = 14

    def test_setitem_stride(self, con):
        """Assignment to an extended slice matches the bytearray behaviour."""
        self._fill(con)
        with con.blobopen("test", "b", 1) as blob:
            blob[0] = 14
            assert blob[0] == 14
            stride = slice(None, len(self.NEW) * 2, 2)
            blob[stride] = self.NEW
            assert blob.tell() == 0
            # Apply the same assignment to a bytearray copy of the payload.
            expected = bytearray(self.DATA)
            expected[stride] = self.NEW
            assert blob[:] == expected


def test_threadsafe():
    """The module reports one of the threadsafety levels 0, 1 or 3."""
    allowed_levels = {0, 1, 3}
    assert _sqlite3.threadsafety in allowed_levels

def test_sql_lstrip_comments():
    """_sql_lstrip_comments() drops leading whitespace and SQL comments only."""
    strip = _sqlite3._sql_lstrip_comments
    # (input, expected output) pairs
    cases = [
        (' *not a comment', '*not a comment'),
        (' -*not a comment', '-*not a comment'),
        (' -- foo bar', ''),
        (' --', ''),
        (' -\n- select 2', '-\n- select 2'),
        (' /', '/'),
        (' /* */ a', 'a'),
        (' /*', ''),
        (' // c++ comments are not allowed', '// c++ comments are not allowed'),
        (' select 2', 'select 2'),
        ('\n               -- comment\n               select 2\n            ', 'select 2\n            '),
        ('\n        ', ''),
        ('\n\n            /*\n            foo\n            */\n            ', ''),
    ]
    for sql, expected in cases:
        assert strip(sql) == expected

def test_weird_reinit(con):
    """A failed re-__init__ leaves the connection unusable until it is re-opened."""
    # not quite sure why that's useful, but oh well
    missing_path = 'hopefull/does/not/exist/in/this/directory/äöúß'
    with pytest.raises(_sqlite3.OperationalError):
        con.__init__(missing_path)

    # After the failed attempt the connection refuses to execute anything.
    with pytest.raises(_sqlite3.ProgrammingError):
        con.execute('select 1')

    # Re-initialising with a valid database does not raise.
    con.__init__(':memory:')