File: consumer.py

package info (click to toggle)
python-pgq 3.8-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 320 kB
  • sloc: python: 2,822; makefile: 36; awk: 14
file content (140 lines) | stat: -rw-r--r-- 4,447 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""PgQ consumer framework for Python.
"""

from typing import Optional, Dict, Tuple, Iterator
from skytools.basetypes import Cursor, DictRow

from pgq.baseconsumer import BaseBatchWalker, BaseConsumer, EventList
from pgq.event import Event

# Public names of this module: a star-import exposes only Consumer.
__all__ = ['Consumer']


# Event status codes
# Kept per event in RetriableEvent._status, or per event id in
# RetriableBatchWalker.status_map when events come from a batch walker.
EV_UNTAGGED = -1  # never assigned in this module; Consumer._flush_retry raises on any status other than EV_RETRY / EV_DONE
EV_RETRY = 0  # set by tag_retry(); such events are passed to pgq.event_retry() when the batch is flushed
EV_DONE = 1  # initial status of every RetriableEvent, and the result of tag_done()


class RetriableEvent(Event):
    """Event that remembers whether it should be retried.

    Every event starts out tagged as done; while processing a batch
    the consumer may re-tag it with tag_retry() or tag_done().
    """
    __slots__ = ('_status', )

    # One of the EV_* status codes.
    _status: int

    def __init__(self, queue_name: str, row: DictRow) -> None:
        """Create the event from *row*, initially tagged as done."""
        super().__init__(queue_name, row)
        # Assign directly instead of calling tag_done(): subclasses override
        # tag_done() and may depend on state that is not set up yet.
        self._status = EV_DONE

    def get_status(self) -> int:
        """Return the current EV_* status code of this event."""
        return self._status

    def tag_retry(self, retry_time: int = 60) -> None:
        """Mark the event for retry and remember the requested retry_time."""
        self._status = EV_RETRY
        self.retry_time = retry_time

    def tag_done(self) -> None:
        """Mark the event as processed."""
        self._status = EV_DONE


class RetriableWalkerEvent(RetriableEvent):
    """Event that keeps its status in the owning RetriableBatchWalker.

    All tagging calls are forwarded to the walker, so the event object
    itself can be garbage-collected right away and events tagged as
    done need no bookkeeping at all.
    """
    __slots__ = ('_walker', )

    # Walker that created this event and stores its status.
    _walker: "RetriableBatchWalker"

    def __init__(self, walker: "RetriableBatchWalker", queue_name: str, row: DictRow) -> None:
        """Create the event from *row* and bind it to *walker*."""
        super().__init__(queue_name, row)
        self._walker = walker

    def get_status(self) -> int:
        """Return the status the walker has recorded for this event."""
        owner = self._walker
        return owner.get_status(self)

    def tag_retry(self, retry_time: int = 60) -> None:
        """Ask the walker to record a retry request for this event."""
        owner = self._walker
        owner.tag_event_retry(self, retry_time)

    def tag_done(self) -> None:
        """Ask the walker to record this event as done."""
        owner = self._walker
        owner.tag_event_done(self)


class RetriableBatchWalker(BaseBatchWalker):
    """Batch walker that produces RetriableWalkerEvent objects.

    Retry requests are recorded here, keyed by event id, instead of on
    the events themselves.  An event without an entry counts as done.
    """

    # event id -> (status code, retry_time); filled by tag_event_retry()
    status_map: Dict[int, Tuple[int,int]]

    def __init__(
        self,
        curs: Cursor,
        batch_id: int,
        queue_name: str,
        fetch_size: int = 300,
        consumer_filter: Optional[str] = None,
    ) -> None:
        """Set up the underlying walker and start with no retry requests."""
        super().__init__(curs, batch_id, queue_name, fetch_size, consumer_filter)
        self.status_map = {}

    def _make_event(self, queue_name: str, row: DictRow) -> RetriableWalkerEvent:
        """Wrap *row* in an event that reports its status back to this walker."""
        return RetriableWalkerEvent(self, queue_name, row)

    def get_status(self, event: Event) -> int:
        """Return the recorded status of *event*, or EV_DONE if none is recorded."""
        fallback = (EV_DONE, 0)
        entry = self.status_map.get(event.id, fallback)
        return entry[0]

    def tag_event_retry(self, event: Event, retry_time: int) -> None:
        """Record that *event* must be retried with the given retry_time."""
        self.status_map[event.id] = (EV_RETRY, retry_time)

    def tag_event_done(self, event: Event) -> None:
        """Forget any retry request recorded for *event*."""
        # Done is the implicit default, so it is enough to drop the entry.
        self.status_map.pop(event.id, None)

    def iter_status(self) -> Iterator[Tuple[int, Tuple[int, int]]]:
        """Yield (event id, (status, retry_time)) for every recorded entry."""
        yield from self.status_map.items()


class Consumer(BaseConsumer):
    """Normal consumer base class.

    Can retry events: events tagged with tag_retry() while a batch is
    processed are handed to pgq.event_retry() before the batch is
    finished.
    """

    # Walker used for lazily fetched batches; it records retry requests
    # by event id instead of keeping the event objects around.
    _batch_walker_class = RetriableBatchWalker

    def _make_event(self, queue_name: str, row: DictRow) -> RetriableEvent:
        """Wrap a fetched row in a RetriableEvent."""
        return RetriableEvent(queue_name, row)

    def _flush_retry(self, curs: Cursor, batch_id: int, ev_list: EventList) -> None:
        """Tag retry events.

        Walks the statuses collected for the batch and calls _tag_retry()
        for every event tagged EV_RETRY.  The number of such events is
        added to the 'retry-events' statistic.

        Raises:
            Exception: if an event has a status other than EV_RETRY or
                EV_DONE.
        """

        retry = 0
        if self.pgq_lazy_fetch and isinstance(ev_list, RetriableBatchWalker):
            # Lazy fetch: the walker holds the statuses keyed by event id,
            # so the events themselves are not iterated again.
            for ev_id, stat in ev_list.iter_status():
                if stat[0] == EV_RETRY:
                    self._tag_retry(curs, batch_id, ev_id, stat[1])
                    retry += 1
                elif stat[0] != EV_DONE:
                    raise Exception("Untagged event: id=%d" % ev_id)
        else:
            for ev in ev_list:
                # Go through the accessor rather than the private attribute,
                # so event classes that redirect their status (such as
                # RetriableWalkerEvent) are honoured too.
                status = ev.get_status()
                if status == EV_RETRY:
                    self._tag_retry(curs, batch_id, ev.id, ev.retry_time)
                    retry += 1
                elif status != EV_DONE:
                    raise Exception("Untagged event: (id=%d, type=%s, data=%s, ex1=%s)" % (
                                    ev.id, ev.type, ev.data, ev.extra1))

        # report how many events were sent to retry
        if retry:
            self.stat_increase('retry-events', retry)

    def _finish_batch(self, curs: Cursor, batch_id: int, ev_list: EventList) -> None:
        """Tag events and notify that the batch is done."""

        # Retry requests are flushed first, then the base class finishes
        # the batch.
        self._flush_retry(curs, batch_id, ev_list)

        super()._finish_batch(curs, batch_id, ev_list)

    def _tag_retry(self, cx: Cursor, batch_id: int, ev_id: int, retry_time: int) -> None:
        """Tag event for retry. (internal)

        Executes pgq.event_retry(batch_id, ev_id, retry_time) on *cx*;
        retry_time is passed through unchanged.
        """
        cx.execute("select pgq.event_retry(%s, %s, %s)",
                   [batch_id, ev_id, retry_time])