File: test_pgq.py

package info (click to toggle)
python-pgq 3.8-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 320 kB
  • sloc: python: 2,822; makefile: 36; awk: 14
file content (55 lines) | stat: -rw-r--r-- 1,484 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

import os
import secrets

import pytest
import skytools
import datetime

import pgq


# Queue name used by the database tests, read from the TEST_Q_NAME
# environment variable.  When it is unset or empty the tests in this module
# are skipped via their ``skipif`` markers.
TEST_Q_NAME = os.environ.get("TEST_Q_NAME")


@pytest.fixture(scope="session")
def dbconn():
    """Yield one database connection shared by the whole test session.

    The connection is opened with an empty connect string and switched to
    isolation level 0 before it is handed to the tests.  It is closed when
    the session ends so the connection is not leaked.
    """
    db = skytools.connect_database("")
    # NOTE(review): level 0 is presumably autocommit (psycopg2 convention) - confirm.
    db.set_isolation_level(0)
    try:
        yield db
    finally:
        # Teardown runs once, after the last test that used the fixture.
        db.close()


@pytest.mark.skipif(not TEST_Q_NAME, reason="no db setup")
def test_insert_event(dbconn):
    """Insert a single event and read it back by its returned id."""
    with dbconn.cursor() as insert_cur:
        new_id = pgq.insert_event(insert_cur, TEST_Q_NAME, "mytype", "payload")
    assert new_id > 0

    lookup_sql = "select * from pgq.event_template where ev_id = %s"
    with dbconn.cursor() as lookup_cur:
        lookup_cur.execute(lookup_sql, (new_id,))
        found = lookup_cur.fetchall()

    # Exactly one stored row must match, and it must carry what was inserted.
    assert len(found) == 1
    stored = found[0]
    assert stored["ev_id"] == new_id
    assert stored["ev_type"] == "mytype"


@pytest.mark.skipif(not TEST_Q_NAME, reason="no db setup")
def test_bulk_insert_events(dbconn):
    """Bulk-insert three events under a random type and count them back."""
    columns = ['ev_type', 'ev_data', 'ev_time']
    # A random event type lets the lookup below select only this run's rows.
    unique_type = secrets.token_urlsafe(12)
    stamp = datetime.datetime.now()
    events = [
        {'ev_type': unique_type, 'ev_data': payload, 'ev_time': stamp}
        for payload in ('data1', 'data2', 'data3')
    ]

    with dbconn.cursor() as insert_cur:
        pgq.bulk_insert_events(insert_cur, events, columns, TEST_Q_NAME)

    count_sql = "select * from pgq.event_template where ev_type = %s"
    with dbconn.cursor() as lookup_cur:
        lookup_cur.execute(count_sql, (unique_type,))
        stored = lookup_cur.fetchall()

    # Every submitted event must be present, and nothing extra.
    assert len(events) == len(stored)