File: QueueSet.py

package info (click to toggle)
python-dirq 1.8-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, trixie
  • size: 336 kB
  • sloc: python: 2,200; makefile: 166
file content (176 lines) | stat: -rw-r--r-- 5,257 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""Interface to elements on a set of directory based queues.

Author
------

Konstantin Skaburskas <konstantin.skaburskas@gmail.com>

License and Copyright
---------------------

ASL 2.0

Copyright (C) CERN 2011-2021
"""

import dirq

from dirq.QueueBase import QueueBase
from dirq.Exceptions import QueueError

__version__ = dirq.VERSION
__author__ = dirq.AUTHOR
__date__ = dirq.DATE


class QueueSet(object):
    """Interface to elements on a set of directory based queues.
    """
    def __init__(self, *queues):
        """Generate queue set on the given lists of queues. Copies of the
        object instances are used.

        Arguments:
            *queues - QueueSet([q1,..]/(q1,..)) or QueueSet(q1,..)

        Raise:
            QueueError - queues should be list/tuple or Queue object
            TypeError  - one of objects provided is not instance of Queue
        """
        self.qset = []  # set of queues
        self.elts = []  # local (queue, element) cache
        self._next_exception = False

        self._add(*queues)

    def __iter__(self):
        """ Return iterator over element names on the set of queues. """
        self._reset()
        self._next_exception = True
        return self

    def names(self):
        """Return iterator over element names on the set of queues.
        """
        return self.__iter__()

    def _reset(self):
        """Regenerate lists of intermediate directories and drop cached
        elements lists.

        Raise:
            OSError - can't list directories
        """
        for queue in self.qset:
            queue._reset()
        self.elts = []

    def first(self):
        """Return the first element in the queue set and cache information
        about the next ones.

        Raise:
            OSError - can't list directories
        """
        self._reset()
        return self.next()

    def __next__(self):
        """Return (queue, next element) tuple from the queue set, only using
        cached information.

        Raise:
            StopIteration - when used as Python iterator via
                            __iter__() method

            OSError       - can't list element directories
        """
        if not self.elts:
            for queue in self.qset:
                self.elts.append((queue, queue.next()))
            if not self.elts:
                return (None, None)
        self.elts.sort(key=lambda x: x[1])
        for index, queue_elt in enumerate(self.elts):
            self.elts[index] = (queue_elt[0], queue_elt[0].next())
            if queue_elt[1]:
                return queue_elt
        if self._next_exception:
            self._next_exception = False
            raise StopIteration
        else:
            return (None, None)
    next = __next__

    def count(self):
        """Return the number of elements in the queue set, regardless of
        their state.

        Raise:
            OSError - can't list/stat element directories
        """
        count = 0
        for queue in self.qset:
            count += queue.count()
        return count

    def _add(self, *queues):
        """Add lists of queues to existing ones. Copies of the object
        instances are used.

        Arguments:
            *queues - add([q1,..]/(q1,..)) or add(q1,..)

        Raise:
            QueueError - queue already in the set
            TypeError  - wrong queue object type provided
        """
        type_queue = False
        for queue in queues:
            if type(queue) in [list, tuple] and not type_queue:
                for _queue in queue:
                    if isinstance(_queue, QueueBase):
                        if _queue.id in [x.id for x in self.qset]:
                            raise QueueError("queue already in the set: %s" %
                                             _queue.path)
                        self.qset.append(_queue.copy())
                    else:
                        raise TypeError("QueueBase objects expected.")
                break
            elif isinstance(queue, QueueBase):
                type_queue = True
                self.qset.append(queue.copy())
            else:
                raise TypeError("expected QueueBase object(s) or list/tuple "
                                "of QueueBase objects")

    def add(self, *queues):
        """Add lists of queues to existing ones. Copies of the object
        instances are used.

        Arguments:
            *queues - add([q1,..]/(q1,..)) or add(q1,..)

        Raise:
            QueueError - queue already in the set
            TypeError  - wrong queue object type provided
        """
        self._add(*queues)
        self._reset()

    def remove(self, given_queue):
        """Remove a queue and its respective elements from in memory cache.

        Arguments:
            queue - queue to be removed

        Raise:
            TypeError - wrong queue object type provided
        """
        if not isinstance(given_queue, QueueBase):
            raise TypeError("QueueBase objects expected.")
        for index, queue in enumerate(self.qset):
            if given_queue.id == queue.id:
                del self.qset[index]
                if self.elts:
                    del self.elts[index]