File: pgq_core.sql

package info (click to toggle)
pgq 3.2.6-8
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 836 kB
  • sloc: sql: 3,412; ansic: 1,990; python: 302; makefile: 89; sh: 2
file content (100 lines) | stat: -rw-r--r-- 3,783 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
\set VERBOSITY 'terse'
set client_min_messages = 'warning';

select * from pgq.maint_tables_to_vacuum();
select * from pgq.maint_retry_events();

select pgq.create_queue('tmpqueue');
select pgq.register_consumer('tmpqueue', 'consumer');
select pgq.unregister_consumer('tmpqueue', 'consumer');
select pgq.drop_queue('tmpqueue');

select pgq.create_queue('myqueue');
select pgq.register_consumer('myqueue', 'consumer');
update pgq.queue set queue_ticker_max_lag = '0', queue_ticker_idle_period = '0';
select pgq.next_batch('myqueue', 'consumer');
select pgq.next_batch('myqueue', 'consumer');
select pgq.ticker();
select pgq.next_batch('myqueue', 'consumer');
select pgq.next_batch('myqueue', 'consumer');

select queue_name, consumer_name, prev_tick_id, tick_id, lag < '30 seconds' as lag_exists from pgq.get_batch_info(1);

select queue_name, queue_ntables, queue_cur_table, queue_rotation_period,
       queue_switch_time <= now() as switch_time_exists,
       queue_external_ticker, queue_ticker_max_count, queue_ticker_max_lag,
       queue_ticker_idle_period, ticker_lag < '2 hours' as ticker_lag_exists,
       last_tick_id
  from pgq.get_queue_info() order by 1;
select queue_name, consumer_name, lag < '30 seconds' as lag_exists,
       last_seen < '30 seconds' as last_seen_exists,
       last_tick, current_batch, next_tick
  from pgq.get_consumer_info() order by 1, 2;

select pgq.finish_batch(1);
select pgq.finish_batch(1);

select pgq.ticker();
select pgq.next_batch('myqueue', 'consumer');
select * from pgq.batch_event_tables(2);
select * from pgq.get_batch_events(2);
select pgq.finish_batch(2);

select pgq.insert_event('myqueue', 'r1', 'data');
select pgq.insert_event('myqueue', 'r2', 'data', 'extra1', 'extra2', 'extra3', 'extra4');
select pgq.insert_event('myqueue', 'r3', 'data');
select pgq.current_event_table('myqueue');
select pgq.ticker();

select * from pgq.next_batch_custom('myqueue', 'consumer', '1 hour', null, null);
select * from pgq.next_batch_custom('myqueue', 'consumer', null, 10000, null);
select * from pgq.next_batch_custom('myqueue', 'consumer', null, null, '10 minutes');
select pgq.next_batch('myqueue', 'consumer');
select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4 from pgq.get_batch_events(3);

begin;
select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4
    from pgq.get_batch_cursor(3, 'acurs', 10);
close acurs;
select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4
    from pgq.get_batch_cursor(3, 'acurs', 2);
close acurs;
select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4
    from pgq.get_batch_cursor(3, 'acurs', 2, 'ev_id = 1');
close acurs;
end;

select pgq.event_retry(3, 2, 0);
select pgq.batch_retry(3, 0);
select pgq.finish_batch(3);

select pgq.event_retry_raw('myqueue', 'consumer', now(), 666, now(), 0,
        'rawtest', 'data', null, null, null, null);

select pgq.ticker();

-- test maint
update pgq.queue set queue_rotation_period = '0 seconds';
select queue_name, pgq.maint_rotate_tables_step1(queue_name) from pgq.queue;
select pgq.maint_rotate_tables_step2();

-- test extra
select nextval(queue_event_seq) from pgq.queue where queue_name = 'myqueue';
select pgq.force_tick('myqueue');
select nextval(queue_event_seq) from pgq.queue where queue_name = 'myqueue';

create sequence tmptest_seq;

select pgq.seq_getval('tmptest_seq');
select pgq.seq_setval('tmptest_seq', 10);
select pgq.seq_setval('tmptest_seq', 5);
select pgq.seq_setval('tmptest_seq', 15);
select pgq.seq_getval('tmptest_seq');

drop sequence tmptest_seq;

select * from pgq.maint_operations();
update pgq.queue set queue_extra_maint = array['baz', 'foo.bar'];
select * from pgq.maint_operations();

select pgq.drop_queue('myqueue', true);