File: test_complex_threads.py

package info (click to toggle)
python-promise 2.3.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 400 kB
  • sloc: python: 2,681; sh: 13; makefile: 4
file content (23 lines) | stat: -rw-r--r-- 485 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from time import sleep
from concurrent.futures import ThreadPoolExecutor
from promise import Promise
from operator import mul

# Shared thread pool that promise_factorial submits its recursive steps to.
# max_workers=40000 is the upper bound handed to ThreadPoolExecutor.
# NOTE(review): the very large bound looks intended to ensure a recursive
# submit never waits for a free worker -- confirm this is deliberate.
executor = ThreadPoolExecutor(max_workers=40000)


def promise_factorial(n):
    """Build the factorial of ``n`` by chaining promises over pool futures.

    For ``n < 2`` the plain integer ``1`` is returned directly. Otherwise the
    call pauses for 0.02 seconds, submits the ``n - 1`` step to the
    module-level ``executor`` and returns ``Promise.resolve(future)`` chained
    with a callback that multiplies the delivered value by ``n``.

    NOTE(review): for ``n - 1 >= 2`` the submitted step itself returns a
    Promise, so this presumably relies on the promise library unwrapping
    nested futures/promises -- confirm against the library documentation.
    """
    if n < 2:
        return 1

    def scale_by_n(previous):
        # ``previous`` is the value delivered by the (n - 1) step.
        return mul(previous, n)

    sleep(.02)
    pending = executor.submit(promise_factorial, n - 1)
    return Promise.resolve(pending).then(scale_by_n)


def test_factorial():
    """Check that ``promise_factorial(10)`` yields 3628800 via ``get()``."""
    expected = 3628800
    outcome = promise_factorial(10).get()
    assert outcome == expected