1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
from __future__ import annotations
from collections import deque
from typing import TYPE_CHECKING, Any, Callable
if TYPE_CHECKING:
from collections.abc import Hashable, Iterable
from queuelib.queue import BaseQueue
class RoundRobinQueue:
    """A round robin queue implemented using multiple internal queues.

    Every key (typically a domain) owns one internal queue (typically a FIFO
    queue). The internal queue must implement the following methods:

    * push(obj)
    * pop()
    * peek()
    * close()
    * __len__()

    The constructor receives a ``qfactory`` argument, which is a callable
    used to instantiate a new (internal) queue when a new key is allocated.
    ``qfactory`` is called with the key as first and only argument.

    ``start_domains`` is an iterable of keys to initialize the queue with.
    If the queue was previously closed leaving some domain buckets
    non-empty, those domains should be passed in ``start_domains``.

    The rotation order is kept in ``key_queue``, a deque of keys: ``pop``
    takes the key at the right end, pops one object from that key's queue
    and, if that queue still holds objects, re-inserts the key at the left
    end so that the other keys get their turn first.
    """

    def __init__(
        self,
        qfactory: Callable[[Hashable], BaseQueue],
        start_domains: Iterable[Hashable] = (),
    ) -> None:
        """Create one internal queue per key found in ``start_domains``.

        Args:
            qfactory: callable building the internal queue for a given key.
            start_domains: keys to pre-allocate, in rotation order. It is
                consumed exactly once, so a one-shot iterator is accepted.
                Repeated keys are ignored after their first occurrence.
        """
        # Materialize once: the keys are needed both to build the queues and
        # to seed the rotation, and a generator would be exhausted after the
        # first pass. dict.fromkeys() drops duplicates while keeping order; a
        # duplicated key would outlive its queue in the rotation and make
        # pop() fail with KeyError.
        keys = list(dict.fromkeys(start_domains))
        self.qfactory = qfactory
        self.queues: dict[Hashable, BaseQueue] = {
            key: qfactory(key) for key in keys
        }
        self.key_queue: deque[Hashable] = deque(keys)

    def push(self, obj: Any, key: Hashable) -> None:
        """Push ``obj`` onto the internal queue of ``key``.

        A key that is not in the rotation gets a fresh internal queue from
        ``qfactory`` and is inserted at the left end of the rotation.
        Exceptions raised by the internal queue's ``push`` propagate; in that
        case a newly allocated (still empty) queue stays registered and is
        discarded by a later ``pop``.
        """
        if key not in self.key_queue:
            self.queues[key] = self.qfactory(key)
            self.key_queue.appendleft(key)  # it's new, might as well pop first
        q = self.queues[key]
        q.push(obj)  # this may fail (eg. serialization error)

    def peek(self) -> Any | None:
        """Return what the next key's queue reports from ``peek()``.

        Nothing is removed and the rotation is left untouched. Returns None
        when there is no key left.
        """
        try:
            key = self.key_queue[-1]
        except IndexError:
            return None
        return self.queues[key].peek()

    def pop(self) -> Any | None:
        """Pop and return the next object in round robin order.

        Internal queues that are empty after the pop are closed and dropped
        together with their key. Returns None when no key is left.
        """
        # Keep rotating until an internal queue yields an object: a key may
        # own an empty queue (pre-allocated key, or a push that failed).
        while True:
            try:
                key = self.key_queue.pop()
            except IndexError:
                return None
            q = self.queues[key]
            m = q.pop()
            if len(q) == 0:
                del self.queues[key]
                q.close()
            else:
                self.key_queue.appendleft(key)
            # Compare against None rather than truthiness so that stored
            # falsy objects (0, "", b"", empty containers) are not discarded.
            if m is not None:
                return m

    def close(self) -> list[Hashable]:
        """Close every internal queue.

        Returns:
            The keys whose internal queue still held objects when closed,
            i.e. the keys to pass as ``start_domains`` when reopening.
        """
        active = []
        for key, q in self.queues.items():
            # Measure before closing: only the pre-close length is relevant.
            if len(q):
                active.append(key)
            q.close()
        return active

    def __len__(self) -> int:
        """Return the total number of objects across all internal queues."""
        return sum(len(q) for q in self.queues.values())
|