"""Download handlers for http and https schemes"""
import re
from time import time
from cStringIO import StringIO
from urlparse import urldefrag
from zope.interface import implements
from twisted.internet import defer, reactor, protocol
from twisted.web.http_headers import Headers as TxHeaders
from twisted.web.iweb import IBodyProducer
from twisted.internet.error import TimeoutError
from twisted.web.http import PotentialDataLoss
from scrapy.xlib.tx import Agent, ProxyAgent, ResponseDone, \
    HTTPConnectionPool, TCP4ClientEndpoint
from scrapy.http import Headers
from scrapy.responsetypes import responsetypes
from scrapy.core.downloader.webclient import _parse
from scrapy.utils.misc import load_object


class HTTP11DownloadHandler(object):

    def __init__(self, settings):
        self._pool = HTTPConnectionPool(reactor, persistent=True)
        self._pool.maxPersistentPerHost = settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN')
        self._pool._factory.noisy = False
        self._contextFactoryClass = load_object(settings['DOWNLOADER_CLIENTCONTEXTFACTORY'])
        self._contextFactory = self._contextFactoryClass()

    def download_request(self, request, spider):
        """Return a deferred for the HTTP download"""
        agent = ScrapyAgent(contextFactory=self._contextFactory, pool=self._pool)
        return agent.download_request(request)

    def close(self):
        return self._pool.closeCachedConnections()
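
# A minimal usage sketch of the handler above. Hedged: this assumes Scrapy's
# Settings and Request APIs of this code's vintage; the URL is a placeholder.
#
#     from scrapy.settings import Settings
#     from scrapy.http import Request
#
#     handler = HTTP11DownloadHandler(Settings())
#     dfd = handler.download_request(Request('http://example.com'), spider=None)
#     dfd.addCallback(lambda response: response.status)
#     dfd.addBoth(lambda _: handler.close())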


class TunnelError(Exception):
    """An HTTP CONNECT tunnel could not be established by the proxy."""


class TunnelingTCP4ClientEndpoint(TCP4ClientEndpoint):
    """An endpoint that tunnels through proxies to allow HTTPS downloads. To
    accomplish that, this endpoint sends an HTTP CONNECT to the proxy.

    The HTTP CONNECT is always sent when using this endpoint. This could be
    improved: the CONNECT is redundant when the connection associated with
    this endpoint comes from the pool and a CONNECT has already been issued
    for it.
    """

    _responseMatcher = re.compile(r'HTTP/1\.. 200')

    def __init__(self, reactor, host, port, proxyConf, contextFactory,
                 timeout=30, bindAddress=None):
        proxyHost, proxyPort, self._proxyAuthHeader = proxyConf
        super(TunnelingTCP4ClientEndpoint, self).__init__(reactor, proxyHost,
            proxyPort, timeout, bindAddress)
        self._tunnelReadyDeferred = defer.Deferred()
        self._tunneledHost = host
        self._tunneledPort = port
        self._contextFactory = contextFactory

    def requestTunnel(self, protocol):
        """Asks the proxy to open a tunnel."""
        # HTTP header lines must end with CRLF, and the header block with a
        # blank line; strict proxies reject a bare LF.
        tunnelReq = 'CONNECT %s:%s HTTP/1.1\r\n' % (self._tunneledHost,
                                                    self._tunneledPort)
        if self._proxyAuthHeader:
            tunnelReq += 'Proxy-Authorization: %s\r\n' % self._proxyAuthHeader
        tunnelReq += '\r\n'
        protocol.transport.write(tunnelReq)
        self._protocolDataReceived = protocol.dataReceived
        protocol.dataReceived = self.processProxyResponse
        self._protocol = protocol
        return protocol

    def processProxyResponse(self, bytes):
        """Processes the response from the proxy. If the tunnel is
        successfully created, notifies the client that we are ready to send
        requests; if not, fails the tunnel-ready deferred with a TunnelError.
        """
        self._protocol.dataReceived = self._protocolDataReceived
        if TunnelingTCP4ClientEndpoint._responseMatcher.match(bytes):
            self._protocol.transport.startTLS(self._contextFactory,
                                              self._protocolFactory)
            self._tunnelReadyDeferred.callback(self._protocol)
        else:
            self._tunnelReadyDeferred.errback(
                TunnelError('Could not open CONNECT tunnel.'))

    def connectFailed(self, reason):
        """Propagates the errback to the appropriate deferred."""
        self._tunnelReadyDeferred.errback(reason)

    def connect(self, protocolFactory):
        self._protocolFactory = protocolFactory
        connectDeferred = super(TunnelingTCP4ClientEndpoint,
                                self).connect(protocolFactory)
        connectDeferred.addCallback(self.requestTunnel)
        connectDeferred.addErrback(self.connectFailed)
        return self._tunnelReadyDeferred


class TunnelingAgent(Agent):
    """An agent that uses a L{TunnelingTCP4ClientEndpoint} to make HTTPS
    downloads. It may seem strange to subclass Agent rather than ProxyAgent,
    but once the tunnel is open the proxy is transparent to the client, so
    the agent should behave as if no proxy were involved.
    """

    def __init__(self, reactor, proxyConf, contextFactory=None,
                 connectTimeout=None, bindAddress=None, pool=None):
        super(TunnelingAgent, self).__init__(reactor, contextFactory,
            connectTimeout, bindAddress, pool)
        self._proxyConf = proxyConf
        self._contextFactory = contextFactory

    def _getEndpoint(self, scheme, host, port):
        return TunnelingTCP4ClientEndpoint(self._reactor, host, port,
            self._proxyConf, self._contextFactory, self._connectTimeout,
            self._bindAddress)
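
# A hedged construction sketch for TunnelingAgent. The proxy address and the
# Proxy-Authorization value are placeholders; contextFactory must be a client
# TLS context factory such as the one HTTP11DownloadHandler loads.
#
#     proxyConf = ('10.0.0.1', 8080, None)  # (host, port, auth header or None)
#     agent = TunnelingAgent(reactor, proxyConf, contextFactory=contextFactory,
#                            connectTimeout=10, pool=None)
#     d = agent.request('GET', 'https://example.com/')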


class ScrapyAgent(object):

    _Agent = Agent
    _ProxyAgent = ProxyAgent
    _TunnelingAgent = TunnelingAgent

    def __init__(self, contextFactory=None, connectTimeout=10, bindAddress=None, pool=None):
        self._contextFactory = contextFactory
        self._connectTimeout = connectTimeout
        self._bindAddress = bindAddress
        self._pool = pool

    def _get_agent(self, request, timeout):
        bindaddress = request.meta.get('bindaddress') or self._bindAddress
        proxy = request.meta.get('proxy')
        if proxy:
            _, _, proxyHost, proxyPort, proxyParams = _parse(proxy)
            scheme = _parse(request.url)[0]
            omitConnectTunnel = 'noconnect' in proxyParams
            if scheme == 'https' and not omitConnectTunnel:
                proxyConf = (proxyHost, proxyPort,
                             request.headers.get('Proxy-Authorization', None))
                return self._TunnelingAgent(reactor, proxyConf,
                    contextFactory=self._contextFactory, connectTimeout=timeout,
                    bindAddress=bindaddress, pool=self._pool)
            else:
                endpoint = TCP4ClientEndpoint(reactor, proxyHost, proxyPort,
                    timeout=timeout, bindAddress=bindaddress)
                return self._ProxyAgent(endpoint)

        return self._Agent(reactor, contextFactory=self._contextFactory,
            connectTimeout=timeout, bindAddress=bindaddress, pool=self._pool)
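
    # How request.meta drives agent selection above (proxy URL below is a
    # placeholder):
    #   no 'proxy' key                                 -> plain Agent
    #   proxy='http://10.0.0.1:8080', https URL        -> TunnelingAgent (CONNECT)
    #   same proxy with 'noconnect' param, https URL   -> ProxyAgent (no tunnel)
    #   proxy='http://10.0.0.1:8080', http URL         -> ProxyAgent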

    def download_request(self, request):
        timeout = request.meta.get('download_timeout') or self._connectTimeout
        agent = self._get_agent(request, timeout)

        # request details
        url = urldefrag(request.url)[0]
        method = request.method
        headers = TxHeaders(request.headers)
        bodyproducer = _RequestBodyProducer(request.body) if request.body else None

        start_time = time()
        d = agent.request(method, url, headers, bodyproducer)
        # set download latency
        d.addCallback(self._cb_latency, request, start_time)
        # response body is ready to be consumed
        d.addCallback(self._cb_bodyready, request)
        d.addCallback(self._cb_bodydone, request, url)
        # check download timeout
        self._timeout_cl = reactor.callLater(timeout, d.cancel)
        d.addBoth(self._cb_timeout, request, url, timeout)
        return d
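
    # Sketch of the deferred chain built above:
    #   agent.request -> _cb_latency -> _cb_bodyready -> _cb_bodydone
    # _cb_timeout is added with addBoth, so it runs on success and on failure;
    # reactor.callLater cancels the deferred if the download takes too long.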

    def _cb_timeout(self, result, request, url, timeout):
        if self._timeout_cl.active():
            self._timeout_cl.cancel()
            return result
        raise TimeoutError("Getting %s took longer than %s seconds." % (url, timeout))

    def _cb_latency(self, result, request, start_time):
        request.meta['download_latency'] = time() - start_time
        return result

    def _cb_bodyready(self, txresponse, request):
        # deliverBody hangs for responses without body
        if txresponse.length == 0:
            return txresponse, '', None

        def _cancel(_):
            txresponse._transport._producer.loseConnection()

        d = defer.Deferred(_cancel)
        txresponse.deliverBody(_ResponseReader(d, txresponse, request))
        return d

    def _cb_bodydone(self, result, request, url):
        txresponse, body, flags = result
        status = int(txresponse.code)
        headers = Headers(txresponse.headers.getAllRawHeaders())
        respcls = responsetypes.from_args(headers=headers, url=url)
        return respcls(url=url, status=status, headers=headers, body=body, flags=flags)


class _RequestBodyProducer(object):
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        consumer.write(self.body)
        return defer.succeed(None)

    def pauseProducing(self):
        pass

    def stopProducing(self):
        pass
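
# _RequestBodyProducer implements Twisted's IBodyProducer: the agent reads
# .length to set Content-Length, then calls startProducing() once to write the
# whole body. A hedged standalone sketch (the consumer is anything with a
# write() method):
#
#     producer = _RequestBodyProducer('a=1&b=2')
#     producer.length                           # 7
#     producer.startProducing(some_transport)   # writes the body, fires with None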


class _ResponseReader(protocol.Protocol):

    def __init__(self, finished, txresponse, request):
        self._finished = finished
        self._txresponse = txresponse
        self._request = request
        self._bodybuf = StringIO()

    def dataReceived(self, bodyBytes):
        self._bodybuf.write(bodyBytes)

    def connectionLost(self, reason):
        if self._finished.called:
            return

        body = self._bodybuf.getvalue()
        if reason.check(ResponseDone):
            self._finished.callback((self._txresponse, body, None))
        elif reason.check(PotentialDataLoss):
            self._finished.callback((self._txresponse, body, ['partial']))
        else:
            self._finished.errback(reason)
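
# Note: the ['partial'] flag above surfaces on the final Scrapy Response as
# response.flags, marking bodies where the server closed the connection with
# no length-delimited end (Twisted's PotentialDataLoss).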