File: urlqueue.py

Package: linkchecker 5.2-2
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2000-2009 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Handle a queue of URLs to check.
"""
from __future__ import with_statement
import threading
import collections
from time import time as _time
from .. import log, LOG_CACHE


class Timeout (StandardError):
    """Raised by join()"""
    pass

class Empty (StandardError):
    """Exception raised by get()."""
    pass


class UrlQueue (object):
    """A queue supporting several consumer tasks. The task_done() idea is
    from the Python 2.5 implementation of Queue.Queue()."""

    def __init__ (self):
        """Initialize the queue state and task counters."""
        # Note: don't put a maximum size on the queue since it would
        # lead to deadlocks when all worker threads called put().
        self.queue = collections.deque()
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the two conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
        self.finished_tasks = 0
        self.in_progress = {}
        self.checked = {}
        self.shutdown = False
        self.unsorted = 0

    def qsize (self):
        """Return the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        n = len(self.queue)
        self.mutex.release()
        return n

    def empty (self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""
        self.mutex.acquire()
        n = self._empty()
        self.mutex.release()
        return n

    def _empty (self):
        return not self.queue

    def get (self, timeout=None):
        """Get first not-in-progress url from the queue and
        return it. If no such url is available return None. The
        url might be already cached.
        """
        with self.not_empty:
            return self._get(timeout)

    def _get (self, timeout):
        if timeout is None:
            while self._empty():
                self.not_empty.wait()
        else:
            if timeout < 0:
                raise ValueError("'timeout' must be a positive number")
            endtime = _time() + timeout
            while self._empty():
                remaining = endtime - _time()
                if remaining <= 0.0:
                    raise Empty()
                self.not_empty.wait(remaining)
        url_data = self.queue.popleft()
        key = url_data.cache_url_key
        if url_data.has_result:
            # Already checked and copied from cache.
            pass
        elif key in self.checked:
            # Already checked; copy the cached result, even if the
            # url also happens to be in progress.
            url_data.copy_from_cache(self.checked[key])
        elif key in self.in_progress:
            # It's being checked currently; put it back in the queue.
            self.queue.append(url_data)
            url_data = None
        else:
            self.in_progress[key] = url_data
        return url_data

    def put (self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        """
        with self.mutex:
            self._put(item)
            self.not_empty.notify()

    def _put (self, url_data):
        """Put URL in queue, increase number of unfished tasks."""
        if self.shutdown:
            # don't accept more URLs
            return
        log.debug(LOG_CACHE, "queueing %s", url_data)
        key = url_data.cache_url_key
        if key in self.checked:
            # Put at beginning of queue to get consumed quickly.
            url_data.copy_from_cache(self.checked[key])
            self.queue.appendleft(url_data)
        elif key in self.in_progress:
            # Put at beginning of queue since it will be cached soon.
            self.queue.appendleft(url_data)
        else:
            self.queue.append(url_data)
            self.unsorted += 1
        if self.unsorted > 2000:
            self._sort()
            self.unsorted = 0
        self.unfinished_tasks += 1

    def task_done (self, url_data):
        """
        Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        with self.all_tasks_done:
            log.debug(LOG_CACHE, "task_done %s", url_data)
            if url_data is not None:
                key = url_data.cache_url_key
                if key is not None and key not in self.checked:
                    self._cache_url(key, url_data)
                else:
                    assert key not in self.in_progress
            self.finished_tasks += 1
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notifyAll()
            self.unfinished_tasks = unfinished

    def _cache_url (self, key, url_data):
        """Put URL result data into cache."""
        log.debug(LOG_CACHE, "Caching %r", key)
        if key in self.in_progress:
            del self.in_progress[key]
        data = url_data.get_cache_data()
        self.checked[key] = data
        # check for aliases (eg. through HTTP redirections)
        if hasattr(url_data, "aliases"):
            data = url_data.get_alias_cache_data()
            for key in url_data.aliases:
                if key in self.checked or key in self.in_progress:
                    continue
                log.debug(LOG_CACHE, "Caching alias %r", key)
                self.checked[key] = data

    def _sort (self):
        """Sort URL queue by putting all cached URLs at the beginning."""
        newqueue = collections.deque()
        while self.queue:
            url_data = self.queue.popleft()
            key = url_data.cache_url_key
            if url_data.has_result:
                # Already checked and copied from cache.
                newqueue.appendleft(url_data)
            elif key in self.checked:
                # Already checked; copy the cached result, even if the
                # url also happens to be in progress.
                url_data.copy_from_cache(self.checked[key])
                newqueue.appendleft(url_data)
            else:
                newqueue.append(url_data)
        self.queue = newqueue

    def join (self, timeout=None):
        """Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        with self.all_tasks_done:
            if timeout is None:
                while self.unfinished_tasks:
                    self.all_tasks_done.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                while self.unfinished_tasks:
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Timeout()
                    self.all_tasks_done.wait(remaining)

    def do_shutdown (self):
        """Shutdown the queue by not accepting any more URLs."""
        with self.mutex:
            unfinished = self.unfinished_tasks - len(self.queue)
            self.queue.clear()
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('shutdown: more tasks finished than were queued')
                self.all_tasks_done.notifyAll()
            self.unfinished_tasks = unfinished
            self.shutdown = True

    def status (self):
        """
        Get tuple (finished tasks, in progress, queue size).
        """
        with self.mutex:
            return (self.finished_tasks,
                    len(self.in_progress), len(self.queue))

    def checked_redirect (self, redirect, url_data):
        """
        Check if the redirect URL is already in the cache; used after
        HTTP redirections to avoid re-checking already cached URLs.
        If it is found, the cached result is copied into url_data
        and True is returned.
        """
        with self.mutex:
            if redirect in self.checked:
                url_data.copy_from_cache(self.checked[redirect])
                return True
            return False