"""
Download web pages using asynchronous IO
"""
from time import time
from twisted.internet import reactor, defer
from twisted.python.failure import Failure
from scrapy.core.exceptions import IgnoreRequest
from scrapy.conf import settings
from scrapy.utils.defer import mustbe_deferred
from scrapy import log
from .middleware import DownloaderMiddlewareManager
from .handlers import download_any


class SpiderInfo(object):
    """Simple class to keep information and state for each open spider"""

    def __init__(self, download_delay=None, max_concurrent_requests=None):
        if download_delay is None:
            self.download_delay = settings.getint('DOWNLOAD_DELAY')
        else:
            self.download_delay = download_delay
        # a non-zero delay forces requests from this spider to be sequential
        if self.download_delay:
            self.max_concurrent_requests = 1
        elif max_concurrent_requests is None:
            self.max_concurrent_requests = settings.getint('CONCURRENT_REQUESTS_PER_SPIDER')
        else:
            self.max_concurrent_requests = max_concurrent_requests
        self.active = set()        # all requests in progress, incl. middleware
        self.queue = []            # (request, deferred) pairs awaiting a slot
        self.transferring = set()  # requests currently being downloaded
        self.closing = False
        self.lastseen = 0          # time the last request left the queue
        self.next_request_calls = set()  # pending delayed calls (download_delay)

    def free_transfer_slots(self):
        return self.max_concurrent_requests - len(self.transferring)

    def needs_backout(self):
        # use self.active to include requests in the downloader middleware
        return len(self.active) > 2 * self.max_concurrent_requests

    def cancel_request_calls(self):
        for call in self.next_request_calls:
            call.cancel()
        self.next_request_calls.clear()


class Downloader(object):
    """Maintain many concurrent downloads and provide an HTTP abstraction.

    It supports a limited number of connections per spider and many spiders
    in parallel.
    """

    def __init__(self):
        self.sites = {}  # SpiderInfo objects, keyed by spider
        self.middleware = DownloaderMiddlewareManager()
        self.concurrent_spiders = settings.getint('CONCURRENT_SPIDERS')

    def fetch(self, request, spider):
        """Main method used to request a download.

        This method includes downloader middleware mangling. A middleware can
        return a Response object, in which case the request never reaches the
        download queue and is not downloaded from the site.
        """
        site = self.sites[spider]
        if site.closing:
            raise IgnoreRequest('Cannot fetch on a closing spider')
        site.active.add(request)
        def _deactivate(_):
            site.active.remove(request)
            self._close_if_idle(spider)
            return _
        dfd = self.middleware.download(self.enqueue, request, spider)
        return dfd.addBoth(_deactivate)

    def enqueue(self, request, spider):
        """Enqueue a Request for an actual download from the site"""
        site = self.sites[spider]
        if site.closing:
            raise IgnoreRequest
        deferred = defer.Deferred()
        site.queue.append((request, deferred))
        self._process_queue(spider)
        return deferred

    def _process_queue(self, spider):
        """Download requests from the site queue while transfer slots are free"""
        site = self.sites.get(spider)
        if not site:
            return

        # Delay queue processing if a download_delay is configured
        now = time()
        if site.download_delay:
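            # Reschedule if the configured delay since the last request has
            # not fully elapsed yet. For example, with download_delay=2.0 and
            # the last request seen 1.5 seconds ago, penalty is 0.5 and the
            # queue is processed again after 0.5 seconds.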
            penalty = site.download_delay - now + site.lastseen
            if penalty > 0:
                d = defer.Deferred()
                d.addCallback(self._process_queue)
                call = reactor.callLater(penalty, d.callback, spider)
                site.next_request_calls.add(call)
                d.addBoth(lambda x: site.next_request_calls.remove(call))
                return
        site.lastseen = now

        # Process enqueued requests while there are free transfer slots for this site
        while site.queue and site.free_transfer_slots() > 0:
            request, deferred = site.queue.pop(0)
            if site.closing:
                dfd = defer.fail(Failure(IgnoreRequest()))
            else:
                dfd = self._download(site, request, spider)
            dfd.chainDeferred(deferred)
        self._close_if_idle(spider)

    def _close_if_idle(self, spider):
        site = self.sites.get(spider)
        if site and site.closing and not site.active:
            del self.sites[spider]

    def _download(self, site, request, spider):
        # The order is very important for the following deferreds. Do not change!

        # 1. Create the download deferred
        dfd = mustbe_deferred(download_any, request, spider)
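        # (mustbe_deferred wraps the call so that a plain return value, an
        # already-built Deferred or a raised exception all end up as a single
        # Deferred firing with the result or a Failure)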

        # 2. After the response arrives, remove the request from the
        # transferring state to free up the transfer slot so it can be used
        # by the following requests (perhaps those which came from the
        # downloader middleware itself)
        site.transferring.add(request)
        def finish_transferring(_):
            site.transferring.remove(request)
            self._process_queue(spider)
            # prevent responses that arrive while the spider is closing from
            # propagating to the downloader middleware, to speed up the
            # closing process
            if site.closing:
                log.msg("Crawled while closing spider: %s" % request,
                        level=log.DEBUG, spider=spider)
                raise IgnoreRequest
            return _
        return dfd.addBoth(finish_transferring)

    def open_spider(self, spider):
        """Allocate resources to begin processing a spider"""
        if spider in self.sites:
            raise RuntimeError('Downloader spider already opened: %s' % spider)
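        # Per-spider overrides come from optional spider attributes: e.g. a
        # spider defining download_delay = 2 gets a two-second delay (and a
        # single concurrent request), taking precedence over project settings.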
        self.sites[spider] = SpiderInfo(
            download_delay=getattr(spider, 'download_delay', None),
            max_concurrent_requests=getattr(spider, 'max_concurrent_requests', None)
        )

    def close_spider(self, spider):
        """Free any resources associated with the given spider"""
        site = self.sites.get(spider)
        if not site or site.closing:
            raise RuntimeError('Downloader spider already closed: %s' % spider)
        site.closing = True
        site.cancel_request_calls()
        self._process_queue(spider)

    def has_capacity(self):
        """Does the downloader have capacity to handle more spiders?"""
        return len(self.sites) < self.concurrent_spiders

    def is_idle(self):
        return not self.sites