File: concurrency.py

# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com>
#
# This code is distributed under the terms and conditions
# from the MIT License (MIT).
#

"""Common functionality for concurrent processing. The main entry point is :func:`create_pool`."""

import concurrent.futures
import contextlib
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor

logger = logging.getLogger(__name__)


class ThreadPoolExecutor(_ThreadPoolExecutor):
    """Subclass with a lazy consuming imap method."""

    def imap(self, fn, *iterables, timeout=None, queued_tasks_per_worker=2):
        """Ordered imap that consumes iterables just-in-time.

        References:
            https://gist.github.com/ddelange/c98b05437f80e4b16bf4fc20fde9c999

        Args:
            fn: Function to apply.
            iterables: One (or more) iterable(s) to pass to fn (using zip) as positional argument(s).
            timeout: Per-future result retrieval timeout in seconds.
            queued_tasks_per_worker: Number of additional items per worker to fetch from the
                iterables to fill the queue; this determines the total queue size.
                Setting 0 results in true just-in-time behaviour: when a worker finishes a task,
                it waits until a result is consumed from the imap generator, at which point
                next() is called on the input iterable(s) and a new task is submitted.
                The default of 2 ensures there is always some work ready to pick up. Note that
                at imap startup, the queue fills up before the first yield occurs.

        Example:
            import itertools

            def fn(x):
                return x * x

            long_generator = itertools.count()
            with ThreadPoolExecutor(42) as pool:
                result_generator = pool.imap(fn, long_generator)
                for result in result_generator:
                    print(result)
        """
        futures, maxlen = deque(), self._max_workers * (queued_tasks_per_worker + 1)
        popleft, append, submit = futures.popleft, futures.append, self.submit

        def get():
            """Block until the next task is done and return the result."""
            return popleft().result(timeout)

        for args in zip(*iterables):
            append(submit(fn, *args))
            if len(futures) == maxlen:
                yield get()

        while futures:
            yield get()
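

# A minimal usage sketch (the ``square`` helper and worker count below are
# illustrative, not part of this module).  Because imap consumes its input lazily,
# an unbounded generator is safe to pass in:
#
#     import itertools
#
#     def square(x):
#         return x * x
#
#     with ThreadPoolExecutor(max_workers=4) as pool:
#         # queued_tasks_per_worker=0 means a new input item is only fetched after a
#         # previous result has been consumed from the generator.
#         for result in pool.imap(square, itertools.count(), queued_tasks_per_worker=0):
#             if result > 100:
#                 break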


# ConcurrentFuturesPool and create_pool were once used in smart_open.s3.iter_bucket.
# Left here for backwards compatibility.


class ConcurrentFuturesPool(object):
    """A class that mimics multiprocessing.pool.Pool but uses concurrent futures instead of processes."""
    def __init__(self, max_workers):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def imap_unordered(self, function, items):
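        # Unlike ThreadPoolExecutor.imap above, this submits all items eagerly and
        # yields results in completion order rather than input order.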
        futures = [self.executor.submit(function, item) for item in items]
        for future in concurrent.futures.as_completed(futures):
            yield future.result()

    def terminate(self):
        self.executor.shutdown(wait=True)
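

# A minimal usage sketch of the pool above (the ``double`` helper is illustrative
# only); results arrive in completion order, not input order:
#
#     def double(x):
#         return 2 * x
#
#     pool = ConcurrentFuturesPool(max_workers=8)
#     try:
#         for result in pool.imap_unordered(double, range(100)):
#             print(result)
#     finally:
#         pool.terminate()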


@contextlib.contextmanager
def create_pool(processes=1):  # arg is called processes for historical reasons
    logger.info("creating concurrent futures pool with %i workers", processes)
    pool = ConcurrentFuturesPool(max_workers=processes)
    yield pool
    pool.terminate()
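

# A minimal sketch of the backwards-compatible entry point above (the ``double``
# helper is illustrative only):
#
#     def double(x):
#         return 2 * x
#
#     with create_pool(processes=4) as pool:
#         for result in pool.imap_unordered(double, range(10)):
#             print(result)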