-- SQL definitions for the pgq schema.
-- ----------------------------------------------------------------------
-- Section: Internal Tables
--
-- Overview:
-- pgq.queue - Queue configuration
-- pgq.consumer - Consumer names
-- pgq.subscription - Consumer registrations
-- pgq.tick - Per-queue snapshots (ticks)
-- pgq.event_* - Data tables
-- pgq.retry_queue - Events to be retried later
-- pgq.failed_queue - Events whose processing failed
--
-- It's basically a generalized and simplified Slony-I structure:
-- sl_node - pgq.consumer
-- sl_set - pgq.queue
-- sl_subscriber + sl_confirm - pgq.subscription
-- sl_event - pgq.tick
-- sl_setsync - pgq_ext.completed_*
-- sl_log_* - slony1 has per-cluster data tables,
-- pgq has per-queue data tables.
-- ----------------------------------------------------------------------
-- Session setup for the statements that follow: send only warnings and
-- above to the client, and turn the default_with_oids setting off.
set client_min_messages to warning;
set default_with_oids to off;

-- For a clean reinstall, the old schema can be dropped first:
--   drop schema if exists pgq cascade;
create schema pgq;
-- ----------------------------------------------------------------------
-- Table: pgq.consumer
--
-- Maps consumer names to the numeric ids used internally.
--
-- Columns:
--      co_id       - consumer's id for internal usage (serial, primary key)
--      co_name     - consumer's id for external usage (unique)
--
-- NOTE(review): co_name defaults to 'fooz' while also being unique, so
-- at most one row can ever rely on that default - confirm whether the
-- default is intentional.
-- ----------------------------------------------------------------------
create table pgq.consumer (
    co_id       serial
                constraint consumer_pkey primary key,
    co_name     text not null default 'fooz'
                constraint consumer_name_uq unique
);
-- ----------------------------------------------------------------------
-- Table: pgq.queue
--
-- One row per available queue, holding its configuration and the
-- state of its data table rotation.
--
-- Columns:
--      queue_id                    - queue id for internal usage
--      queue_name                  - queue name visible outside (unique)
--      queue_ntables               - how many data tables the queue has
--      queue_cur_table             - which data table is currently active
--      queue_rotation_period       - period for data table rotation
--      queue_switch_step1          - tx when rotation happened
--      queue_switch_step2          - tx after rotation was committed
--                                    (the only nullable column here)
--      queue_switch_time           - time when switch happened
--      queue_external_ticker       - ticks come from some external sources
--      queue_ticker_max_count      - batch should not contain more events
--      queue_ticker_max_lag        - events should not age more
--      queue_ticker_idle_period    - how often to tick when no events happen
--      queue_data_pfx              - prefix for data table names
--      queue_event_seq             - sequence for event ids (stored as text;
--                                    presumably the sequence name - TODO confirm)
--      queue_tick_seq              - sequence for tick ids (stored as text;
--                                    presumably the sequence name - TODO confirm)
-- ----------------------------------------------------------------------
create table pgq.queue (
    -- identity
    queue_id                    serial
                                constraint queue_pkey primary key,
    queue_name                  text not null
                                constraint queue_name_uq unique,

    -- data table rotation
    queue_ntables               integer     not null default 3,
    queue_cur_table             integer     not null default 0,
    queue_rotation_period       interval    not null default '2 hours',
    queue_switch_step1          bigint      not null default txid_current(),
    queue_switch_step2          bigint               default txid_current(),
    queue_switch_time           timestamptz not null default now(),

    -- ticker settings
    queue_external_ticker       boolean     not null default false,
    queue_ticker_max_count      integer     not null default 500,
    queue_ticker_max_lag        interval    not null default '3 seconds',
    queue_ticker_idle_period    interval    not null default '1 minute',

    -- names of the per-queue objects; no defaults, must be supplied
    queue_data_pfx              text        not null,
    queue_event_seq             text        not null,
    queue_tick_seq              text        not null
);
-- ----------------------------------------------------------------------
-- Table: pgq.tick
--
-- Snapshots for event batching.
--
-- Columns:
--      tick_queue      - queue id whose tick it is (references pgq.queue)
--      tick_id         - tick's id (per-queue)
--      tick_time       - time when tick happened, defaults to now()
--      tick_snapshot   - transaction state, defaults to
--                        txid_current_snapshot()
--
-- A tick is identified by the pair (tick_queue, tick_id).
-- ----------------------------------------------------------------------
create table pgq.tick (
    tick_queue      integer         not null
                    constraint tick_queue_fkey
                        references pgq.queue (queue_id),
    tick_id         bigint          not null,
    tick_time       timestamptz     not null default now(),
    tick_snapshot   txid_snapshot   not null default txid_current_snapshot(),

    constraint tick_pkey primary key (tick_queue, tick_id)
);
-- ----------------------------------------------------------------------
-- Sequence: pgq.batch_id_seq
--
-- Sequence for batch ids.
--
-- The options below are the documented defaults for an ascending
-- sequence, written out explicitly.
-- ----------------------------------------------------------------------
create sequence pgq.batch_id_seq
    increment by 1
    start with 1
    no cycle;
-- ----------------------------------------------------------------------
-- Table: pgq.subscription
--
-- Consumer registration on a queue.  A consumer can be registered on
-- a given queue only once (subscription_ukey).
--
-- Columns:
--
--      sub_id          - subscription id for internal usage
--      sub_queue       - queue id (references pgq.queue)
--      sub_consumer    - consumer's id (references pgq.consumer)
--      sub_last_tick   - last tick the consumer processed
--      sub_active      - timestamp, defaults to now(); presumably when the
--                        consumer was last active - TODO confirm
--      sub_batch       - shortcut for queue_id/consumer_id/tick_id
--      sub_next_tick   - batch end pos
-- ----------------------------------------------------------------------
create table pgq.subscription (
    -- serial already implies not null
    sub_id          serial
                    constraint subscription_pkey primary key,
    sub_queue       integer         not null
                    constraint sub_queue_fkey
                        references pgq.queue (queue_id),
    sub_consumer    integer         not null
                    constraint sub_consumer_fkey
                        references pgq.consumer (co_id),
    sub_last_tick   bigint          not null,
    sub_active      timestamptz     not null default now(),
    sub_batch       bigint,
    sub_next_tick   bigint,

    constraint subscription_ukey unique (sub_queue, sub_consumer)
);
-- ----------------------------------------------------------------------
-- Table: pgq.event_template
--
-- Parent table for all event tables.  Its column list is also copied
-- into pgq.retry_queue and pgq.failed_queue via "like".
--
-- Columns:
--      ev_id       - event's id, supposed to be unique per queue
--                    (no constraint enforces this on the template itself)
--      ev_time     - when the event was inserted (required, no default)
--      ev_txid     - transaction id which inserted the event,
--                    defaults to txid_current()
--      ev_owner    - subscription id that wanted to retry this
--      ev_retry    - how many times the event has been retried,
--                    NULL for new events
--      ev_type     - consumer/producer can specify what the data fields contain
--      ev_data     - data field
--      ev_extra1   - extra data field
--      ev_extra2   - extra data field
--      ev_extra3   - extra data field
--      ev_extra4   - extra data field
-- ----------------------------------------------------------------------
create table pgq.event_template (
    -- mandatory bookkeeping
    ev_id       bigint          not null,
    ev_time     timestamptz     not null,
    ev_txid     bigint          not null default txid_current(),

    -- retry bookkeeping, both nullable
    ev_owner    integer,
    ev_retry    integer,

    -- payload
    ev_type     text,
    ev_data     text,
    ev_extra1   text,
    ev_extra2   text,
    ev_extra3   text,
    ev_extra4   text
);
-- ----------------------------------------------------------------------
-- Table: pgq.retry_queue
--
-- Events to be retried.  Once an event's retry time is reached, it is
-- put back into the main queue.
--
-- Columns:
--      ev_retry_after  - time when it should be re-inserted to main queue
--      *               - same as pgq.event_template
--
-- Notes:
--      "like" is used without "including defaults", so the copied columns
--      keep their not-null markers but not their default expressions.
--      The alter statements then adjust nullability: ev_owner becomes
--      mandatory (it is part of the primary key and references
--      pgq.subscription), while ev_txid becomes optional.
-- ----------------------------------------------------------------------
create table pgq.retry_queue (
    ev_retry_after  timestamptz     not null,

    -- all event fields
    like pgq.event_template,

    constraint rq_pkey primary key (ev_owner, ev_id),
    constraint rq_owner_fkey foreign key (ev_owner)
        references pgq.subscription (sub_id)
);
alter table pgq.retry_queue alter column ev_owner set not null;
alter table pgq.retry_queue alter column ev_txid drop not null;

-- Index for finding rows by their retry time.
create index rq_retry_idx on pgq.retry_queue (ev_retry_after);

-- No separate index on (ev_owner, ev_id) is created: the unique index
-- behind rq_pkey already covers exactly those columns, so a second one
-- would only add write and storage overhead.
-- ----------------------------------------------------------------------
-- Table: pgq.failed_queue
--
-- Events whose processing failed.
--
-- Columns:
--      ev_failed_reason    - consumer's excuse for not processing (optional)
--      ev_failed_time      - when it was tagged failed
--      *                   - same as pgq.event_template
--
-- After creation, ev_owner is made mandatory (it is part of the primary
-- key and references pgq.subscription) and ev_txid is made optional.
-- ----------------------------------------------------------------------
create table pgq.failed_queue (
    ev_failed_reason    text,
    ev_failed_time      timestamptz     not null,

    -- all event fields
    like pgq.event_template,

    constraint fq_pkey primary key (ev_owner, ev_id),
    constraint fq_owner_fkey foreign key (ev_owner)
        references pgq.subscription (sub_id)
);

alter table pgq.failed_queue
    alter column ev_owner set not null,
    alter column ev_txid drop not null;
-- End of section: Internal Tables