File: example.py

package info (click to toggle)
picobox 4.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 248 kB
  • sloc: python: 1,666; makefile: 16
file content (29 lines) | stat: -rw-r--r-- 976 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import concurrent.futures
import threading

import picobox
import requests


@picobox.pass_("session")
def spam(session):
    """Describe which thread and session served one request to httpbin.

    The ``session`` argument is supplied through the ``picobox.pass_``
    decorator under the ``"session"`` key.

    Returns a string containing the current thread ident, the ``id()`` of
    the session object, and the ``origin`` field of the JSON returned by
    ``https://httpbin.org/ip``.

    Raises whatever the session raises when the request times out, the
    connection fails, or the server answers with an HTTP error status.
    """
    # Without an explicit timeout the request can wait forever on a stalled
    # connection, blocking this worker thread and the executor's shutdown.
    response = session.get("https://httpbin.org/ip", timeout=10)
    # Report HTTP failures as such instead of as a JSON/KeyError further down.
    response.raise_for_status()
    return "thread={}; session={}; ip={}".format(
        threading.get_ident(),
        id(session),
        response.json()["origin"],
    )


# requests.Session() is not thread-safe (see
# https://github.com/kennethreitz/requests/issues/2766), so each thread
# needs a session of its own. Registering the factory with the thread-local
# scope is how this script arranges for that.
box = picobox.Box()
box.put("session", scope=picobox.threadlocal, factory=requests.Session)

with picobox.push(box):
    # 10 spam() calls are spread over 3 worker threads, so the printed
    # session IDs should show at most 3 distinct values.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
        pending = [pool.submit(spam) for _ in range(10)]
        # Print every result as soon as its call finishes (completion order).
        for finished in concurrent.futures.as_completed(pending):
            print(finished.result())