File: rrqueue.py

package info (click to toggle)
python-queuelib 1.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 176 kB
  • sloc: python: 1,034; sh: 8; makefile: 5
file content (85 lines) | stat: -rw-r--r-- 2,715 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from __future__ import annotations

from collections import deque
from typing import TYPE_CHECKING, Any, Callable

if TYPE_CHECKING:
    from collections.abc import Hashable, Iterable

    from queuelib.queue import BaseQueue


class RoundRobinQueue:
    """A round robin queue implemented using multiple internal queues (typically,
    FIFO queues). The internal queue must implement the following methods:
        * push(obj)
        * pop()
        * peek()
        * close()
        * __len__()
    The constructor receives a qfactory argument, which is a callable used to
    instantiate a new (internal) queue when a new key is allocated. The
    qfactory function is called with the key as first and only argument.
    start_domains is an iterable of keys to initialize the queue with. If the
    queue was previously closed leaving some domain buckets non-empty, those
    domains should be passed in start_domains. It is consumed exactly once and
    repeated keys are ignored.

    The queue maintains a rotation of keys in ``key_queue``. The key at the
    right end of the rotation is served next; once served it moves to the left
    end if its internal queue still holds objects, otherwise that internal
    queue is closed and the key is dropped. Keys seen for the first time by
    push() enter at the left end, i.e. they are served after every key that is
    already waiting.

    ``queues`` and ``key_queue`` always hold the same set of keys.
    """

    def __init__(
        self,
        qfactory: Callable[[Hashable], BaseQueue],
        start_domains: Iterable[Hashable] = (),
    ) -> None:
        self.qfactory = qfactory
        # key -> internal queue, for every key currently in the rotation
        self.queues = {}
        # rotation order of the keys; the rightmost key is served next
        self.key_queue = deque()
        # Single pass on purpose: start_domains may be a one-shot iterator, and
        # a repeated key must not get a second internal queue nor a second
        # slot in the rotation (pop() would later fail on the stale slot).
        for key in start_domains:
            if key not in self.queues:
                self.queues[key] = self.qfactory(key)
                self.key_queue.append(key)

    def push(self, obj: Any, key: Hashable) -> None:
        """Store obj in the internal queue for key, creating it if needed.

        Any exception raised by qfactory or by the internal queue's push()
        propagates to the caller.
        """
        # The dict holds the same keys as the rotation; testing it is O(1)
        # whereas testing the deque is O(number of keys).
        if key not in self.queues:
            self.queues[key] = self.qfactory(key)
            # pop() serves from the right, so a new key waits behind the
            # keys that are already in the rotation.
            self.key_queue.appendleft(key)
        # This may fail (eg. serialization error). A queue created just above
        # is then left empty; pop() closes and drops it when it reaches it.
        self.queues[key].push(obj)

    def peek(self) -> Any | None:
        """Return the object pop() would return next, without removing it.

        Returns None when no internal queue has an object to offer.
        """
        # Walk the keys in serving order, skipping internal queues with
        # nothing to offer (e.g. left empty by a failed push), exactly as
        # pop() would skip them.
        for key in reversed(self.key_queue):
            item = self.queues[key].peek()
            if item is not None:
                return item
        return None

    def pop(self) -> Any | None:
        """Remove and return the next object in round robin order.

        Internal queues found empty along the way are closed and dropped.
        Returns None when there is nothing left to pop. If an internal
        queue's pop() raises, the exception propagates and the rotation is
        left untouched.
        """
        # pop until we find a valid object, closing necessary queues
        while self.key_queue:
            key = self.key_queue[-1]
            q = self.queues[key]
            # Nothing is mutated before this call succeeds, so a failure
            # cannot leave the key in self.queues but out of the rotation.
            m = q.pop()

            self.key_queue.pop()
            if len(q) == 0:
                del self.queues[key]
                q.close()
            else:
                self.key_queue.appendleft(key)

            # Compare against None rather than truthiness: falsy objects such
            # as 0, "" or b"" are legitimate queue contents.
            if m is not None:
                return m
        return None

    def close(self) -> list[Hashable]:
        """Close every internal queue.

        Returns the keys whose internal queue still held objects when it was
        closed; those are the keys to pass back as start_domains.
        """
        active = []
        for key, q in self.queues.items():
            if len(q):
                active.append(key)
            q.close()
        return active

    def __len__(self) -> int:
        """Return the total number of objects held by all internal queues."""
        return sum(len(q) for q in self.queues.values())