Asynchronous Python HTTP Requests for Humans
============================================
.. image:: https://travis-ci.org/ross/requests-futures.svg?branch=master
    :target: https://travis-ci.org/ross/requests-futures

Small add-on for the Python requests_ HTTP library. Makes use of Python 3.2's
`concurrent.futures`_ or the backport_ for prior versions of Python.
The additional API and changes are minimal and strive to avoid surprises.
The following synchronous code:

.. code-block:: python

    from requests import Session

    session = Session()
    # first request starts and blocks until finished
    response_one = session.get('http://httpbin.org/get')
    # second request starts once first is finished
    response_two = session.get('http://httpbin.org/get?foo=bar')
    # both requests are complete
    print('response one status: {0}'.format(response_one.status_code))
    print(response_one.content)
    print('response two status: {0}'.format(response_two.status_code))
    print(response_two.content)

Can be translated to make use of futures, and thus be asynchronous, by creating
a FuturesSession and capturing the returned Future in place of the Response. The
Response can be retrieved by calling the result method on the Future:

.. code-block:: python

    from requests_futures.sessions import FuturesSession

    session = FuturesSession()
    # first request is started in background
    future_one = session.get('http://httpbin.org/get')
    # second request is started immediately
    future_two = session.get('http://httpbin.org/get?foo=bar')
    # wait for the first request to complete, if it hasn't already
    response_one = future_one.result()
    print('response one status: {0}'.format(response_one.status_code))
    print(response_one.content)
    # wait for the second request to complete, if it hasn't already
    response_two = future_two.result()
    print('response two status: {0}'.format(response_two.status_code))
    print(response_two.content)

By default a ThreadPoolExecutor is created with 8 workers. If you would like to
adjust that value or share an executor across multiple sessions you can provide
one to the FuturesSession constructor.

.. code-block:: python

    from concurrent.futures import ThreadPoolExecutor
    from requests_futures.sessions import FuturesSession

    session = FuturesSession(executor=ThreadPoolExecutor(max_workers=10))
    # ...

As a shortcut, if you only need to change the number of workers, you can pass
`max_workers` straight to the `FuturesSession` constructor:

.. code-block:: python

    from requests_futures.sessions import FuturesSession

    session = FuturesSession(max_workers=10)

FuturesSession will use an existing session object if supplied:

.. code-block:: python

    from requests import session
    from requests_futures.sessions import FuturesSession

    my_session = session()
    future_session = FuturesSession(session=my_session)

That's it. The API of requests.Session is preserved without any modifications
beyond returning a Future rather than a Response. As with all futures, exceptions
are shifted (thrown) to the future.result() call, so try/except blocks should be
moved there.
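
The following is a minimal sketch of that behaviour using only the standard
library, so it runs without network access. `fetch` is a hypothetical stand-in
for a request that fails; a Future returned by FuturesSession is assumed to
behave the same way, with the exception raised by requests surfacing at
`result()`.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch(url):
    # hypothetical stand-in for a request that fails in the worker thread
    raise ConnectionError('could not reach {0}'.format(url))


with ThreadPoolExecutor(max_workers=2) as executor:
    # submitting does not raise, the exception is stored on the future
    future = executor.submit(fetch, 'http://example.invalid/')

    try:
        # the stored exception is re-raised here, in the calling thread
        future.result()
        outcome = 'ok'
    except ConnectionError as exc:
        outcome = 'failed: {0}'.format(exc)

print(outcome)
```

If raising is not wanted, `future.exception()` returns the stored exception
(or `None`) instead of raising it.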
Tying extra information to the request/response
===============================================
The most common piece of information needed is the URL of the request. This can
be accessed without any extra steps using the `request` property of the
response object.

.. code-block:: python

    from concurrent.futures import as_completed
    from pprint import pprint
    from requests_futures.sessions import FuturesSession

    session = FuturesSession()

    futures = [session.get(f'http://httpbin.org/get?{i}') for i in range(3)]

    for future in as_completed(futures):
        resp = future.result()
        pprint({
            'url': resp.request.url,
            'content': resp.json(),
        })

There are situations in which you may want to tie additional information to a
request/response. There are a number of ways to go about this; the simplest is
to attach additional information to the future object itself.

.. code-block:: python

    from concurrent.futures import as_completed
    from pprint import pprint
    from requests_futures.sessions import FuturesSession

    session = FuturesSession()

    futures = []
    for i in range(3):
        future = session.get('http://httpbin.org/get')
        future.i = i
        futures.append(future)

    for future in as_completed(futures):
        resp = future.result()
        pprint({
            'i': future.i,
            'content': resp.json(),
        })

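
Another of those ways, sketched here under assumptions, is to keep the extra
information outside the future in a dictionary keyed by the future object. The
example uses a plain `ThreadPoolExecutor` and a hypothetical `fetch` function
in place of `session.get` so that it runs without network access; the same
mapping should work with futures returned by a FuturesSession.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(i):
    # hypothetical stand-in for the parsed body of a response
    return {'args': {'i': str(i)}}


with ThreadPoolExecutor(max_workers=3) as executor:
    # map each future to the extra information that belongs to it
    future_to_i = {executor.submit(fetch, i): i for i in range(3)}

    results = {}
    for future in as_completed(future_to_i):
        # look the extra information up again once the future completes
        i = future_to_i[future]
        results[i] = future.result()

print(sorted(results))
```

This avoids setting ad hoc attributes on the future, at the cost of carrying a
second data structure around.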
Canceling queued requests (a.k.a cleaning up after yourself)
============================================================
If you know that you won't be needing any additional responses from futures that
haven't yet resolved, it's a good idea to cancel those requests. You can do this
by using the session as a context manager:

.. code-block:: python

    from requests_futures.sessions import FuturesSession

    with FuturesSession(max_workers=1) as session:
        future = session.get('https://httpbin.org/get')
        future2 = session.get('https://httpbin.org/delay/10')
        future3 = session.get('https://httpbin.org/delay/10')
        response = future.result()

In this example, the second or third request will be skipped, saving time and
resources that would otherwise be wasted.
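
Individual futures can also be cancelled explicitly. The sketch below uses only
the standard library: `slow` and `queued` are hypothetical stand-ins for
requests, and the executor has a single worker so the second call stays queued.
`Future.cancel()` returns `True` only when the call has not started running; it
is an assumption here that futures from a FuturesSession are cancelled the same
way.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event

release = Event()


def slow():
    # hypothetical stand-in for a slow request, blocks until released
    release.wait(timeout=5)
    return 'slow done'


def queued():
    # hypothetical stand-in for a request still waiting in the queue
    return 'queued done'


executor = ThreadPoolExecutor(max_workers=1)
first = executor.submit(slow)
second = executor.submit(queued)

# `second` sits behind `first` on the only worker, so it has not started
cancelled = second.cancel()

# let the first call finish, then shut the executor down
release.set()
first_result = first.result()
executor.shutdown()

print(cancelled, second.cancelled(), first_result)
```

Calling `result()` on a cancelled future raises `CancelledError`, so check
`cancelled()` first when both outcomes are possible.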
Iterating over a list of request responses
===========================================
Without preserving the request order:

.. code-block:: python

    from concurrent.futures import as_completed
    from requests_futures.sessions import FuturesSession

    with FuturesSession() as session:
        futures = [session.get('https://httpbin.org/delay/{}'.format(i % 3)) for i in range(10)]
        for future in as_completed(futures):
            resp = future.result()
            print(resp.json()['url'])

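
When the original order does matter, one option is to iterate over the list of
futures itself instead of using `as_completed`; each `result()` call then
blocks until that particular future is done. This is a minimal sketch with a
plain `ThreadPoolExecutor` and a hypothetical `fetch` function standing in for
`session.get`, so it runs without network access.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def fetch(i):
    # hypothetical stand-in for a request, later items sleep for less time
    sleep(0.05 * (3 - i))
    return i


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(fetch, i) for i in range(4)]
    # iterating the list itself yields results in submission order
    ordered = [future.result() for future in futures]

print(ordered)
```

The trade-off is that a slow early request delays the handling of later ones
that have already finished.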
Working in the Background
=========================
Additional processing can be done in the background using requests' hooks_
functionality. This can be useful for shifting work out of the foreground; a
simple example is JSON parsing.

.. code-block:: python

    from pprint import pprint
    from requests_futures.sessions import FuturesSession

    session = FuturesSession()

    def response_hook(resp, *args, **kwargs):
        # parse the json storing the result on the response object
        resp.data = resp.json()

    future = session.get('http://httpbin.org/get', hooks={
        'response': response_hook,
    })
    # do some other stuff, send some more requests while this one works
    response = future.result()
    print('response status {0}'.format(response.status_code))
    # data will have been attached to the response object in the background
    pprint(response.data)

Hooks can also be applied to the session.

.. code-block:: python

    from pprint import pprint
    from requests_futures.sessions import FuturesSession

    def response_hook(resp, *args, **kwargs):
        # parse the json storing the result on the response object
        resp.data = resp.json()

    session = FuturesSession()
    session.hooks['response'] = response_hook

    future = session.get('http://httpbin.org/get')
    # do some other stuff, send some more requests while this one works
    response = future.result()
    print('response status {0}'.format(response.status_code))
    # data will have been attached to the response object in the background
    pprint(response.data)

A more advanced example that adds an `elapsed` property to all requests.

.. code-block:: python

    from pprint import pprint
    from requests_futures.sessions import FuturesSession
    from time import time


    class ElapsedFuturesSession(FuturesSession):

        def request(self, method, url, hooks=None, *args, **kwargs):
            start = time()
            if hooks is None:
                hooks = {}

            def timing(r, *args, **kwargs):
                r.elapsed = time() - start

            try:
                if isinstance(hooks['response'], (list, tuple)):
                    # needs to be first so we don't time other hooks execution;
                    # build a new list so a tuple of hooks works as well
                    hooks['response'] = [timing] + list(hooks['response'])
                else:
                    hooks['response'] = [timing, hooks['response']]
            except KeyError:
                hooks['response'] = timing

            return super(ElapsedFuturesSession, self) \
                .request(method, url, hooks=hooks, *args, **kwargs)


    session = ElapsedFuturesSession()
    future = session.get('http://httpbin.org/get')
    # do some other stuff, send some more requests while this one works
    response = future.result()
    print('response status {0}'.format(response.status_code))
    print('response elapsed {0}'.format(response.elapsed))

Using ProcessPoolExecutor
=========================
Similarly to `ThreadPoolExecutor`, it is possible to use an instance of
`ProcessPoolExecutor`. As the name suggests, the requests will be executed
concurrently in separate processes rather than threads.

.. code-block:: python

    from concurrent.futures import ProcessPoolExecutor
    from requests_futures.sessions import FuturesSession

    session = FuturesSession(executor=ProcessPoolExecutor(max_workers=10))
    # ... use as before


.. HINT::
    Using the `ProcessPoolExecutor` is useful in cases where memory
    usage per request is very high (large response) and cycling the interpreter
    is required to release memory back to the OS.

A base requirement of using `ProcessPoolExecutor` is that `Session.request` and
`FuturesSession` both be pickle-able.

This means that Python 3.5 and above are fully supported, while Python 3.4
REQUIRES an existing `requests.Session` instance to be passed when
initializing `FuturesSession`. Python 2.X and < 3.4 are currently not
supported.

.. code-block:: python

    # Using python 3.4
    from concurrent.futures import ProcessPoolExecutor
    from requests import Session
    from requests_futures.sessions import FuturesSession

    session = FuturesSession(executor=ProcessPoolExecutor(max_workers=10),
                             session=Session())
    # ... use as before

In case pickling fails, an exception is raised pointing to this documentation.

.. code-block:: python

    # Using python 2.7
    from concurrent.futures import ProcessPoolExecutor
    from requests import Session
    from requests_futures.sessions import FuturesSession

    session = FuturesSession(executor=ProcessPoolExecutor(max_workers=10),
                             session=Session())
    Traceback (most recent call last):
    ...
    RuntimeError: Cannot pickle function. Refer to documentation: https://github.com/ross/requests-futures/#using-processpoolexecutor


.. IMPORTANT::
    * Python >= 3.4 required
    * A session instance is required when using Python < 3.5
    * If sub-classing `FuturesSession` it must be importable (module global)

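
Whether an object meets the pickling requirement can be checked up front with
the standard `pickle` module. The helper below, `is_picklable`, is a
hypothetical name for illustration and not part of requests-futures; it only
attempts `pickle.dumps` and reports whether that succeeded.

```python
import pickle
import threading


def is_picklable(obj):
    # returns True when obj survives pickle.dumps, False otherwise
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


checks = {
    # builtins are pickled by reference to their importable name
    'builtin function': is_picklable(len),
    # a lambda has no importable name, so it cannot be pickled
    'lambda': is_picklable(lambda: None),
    # locks hold OS level state and refuse to be pickled
    'lock': is_picklable(threading.Lock()),
}
print(checks)
```

The lambda case illustrates why a `FuturesSession` subclass has to be defined
at module level: pickle stores functions and classes by their importable name.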
Installation
============
pip install requests-futures
.. _`requests`: https://github.com/kennethreitz/requests
.. _`concurrent.futures`: http://docs.python.org/dev/library/concurrent.futures.html
.. _backport: https://pypi.python.org/pypi/futures
.. _hooks: http://docs.python-requests.org/en/master/user/advanced/#event-hooks