File: process.py

package info (click to toggle)
python-pebble 5.1.1-1
  • links: PTS
  • area: main
  • in suites: sid, trixie
  • size: 436 kB
  • sloc: python: 5,491; makefile: 2
file content (165 lines) | stat: -rw-r--r-- 5,452 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# This file is part of Pebble.
# Copyright (c) 2013-2025, Matteo Cafasso

# Pebble is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.

# Pebble is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public License
# along with Pebble.  If not, see <http://www.gnu.org/licenses/>.

import os
import types
import asyncio
import multiprocessing

from itertools import count
from functools import wraps
from concurrent.futures import TimeoutError
from typing import Any, Callable, Optional, overload

from pebble import common
from pebble.pool.process import ProcessPool


# Typing overload for bare usage: the decorator receives the callable itself.
@overload
def process(func: common.CallableType) -> common.AsyncIODecoratorReturnType:
    ...
# Typing overload for parametrized usage: options are given as keywords.
@overload
def process(
        name: Optional[str] = None,
        daemon: bool = True,
        timeout: Optional[float] = None,
        mp_context: Optional[multiprocessing.context.BaseContext] = None,
        pool: Optional[ProcessPool] = None
) -> common.AsyncIODecoratorParamsReturnType:
    ...
def process(*args, **kwargs):
    """Runs the decorated function in a concurrent process,
    taking care of the result and error management.

    Decorated functions will return an asyncio.Future object
    once called.

    The timeout parameter will set a maximum execution time
    for the decorated function. If the execution exceeds the timeout,
    the process will be stopped and the Future will raise TimeoutError.

    The name parameter will set the process name.

    The daemon parameter controls the underlying process daemon flag.
    Default is True.

    The mp_context parameter allows to provide the multiprocessing.context
    object used for starting the process.

    The pool parameter accepts a pebble.ProcessPool instance to be used
    instead of running the function in a new process.

    """
    # Argument handling (bare vs. parametrized usage) is delegated to
    # common.decorate_function, which is handed _process_wrapper to build
    # the actual wrapper.
    return common.decorate_function(_process_wrapper, *args, **kwargs)


def _process_wrapper(
        function: Callable,
        name: str,
        daemon: bool,
        timeout: float,
        mp_context: multiprocessing.context.BaseContext,
        pool: ProcessPool
) -> Callable:
    """Build the asyncio wrapper which runs *function* in another process.

    function: the callable being decorated.
    name: name given to the process launched for each call.
    daemon: daemon flag given to the launched process.
    timeout: maximum execution time of a call, None for no limit.
    mp_context: multiprocessing context used to create the pipe
    and to launch the process.
    pool: optional ProcessPool; when given, calls are scheduled on it
    instead of launching a dedicated process.

    Returns the wrapper; calling it returns an asyncio.Future.

    Raises TypeError if pool is given but is not a ProcessPool.

    """
    if isinstance(function, types.FunctionType):
        common.register_function(function)

    # Contexts lacking get_start_method fall back to the platform default.
    if hasattr(mp_context, 'get_start_method'):
        start_method = mp_context.get_start_method()
    else:
        start_method = 'spawn' if os.name == 'nt' else 'fork'

    if pool is not None:
        if not isinstance(pool, ProcessPool):
            raise TypeError('Pool expected to be ProcessPool')
        start_method = 'pool'

    @wraps(function)
    def wrapper(*args, **kwargs) -> asyncio.Future:
        loop = common.get_asyncio_loop()
        target, args = common.maybe_install_trampoline(function, args, start_method)

        if pool is not None:
            # loop.run_in_executor() only forwards positional arguments:
            # handing it **kwargs raises TypeError as soon as the decorated
            # function is called with a keyword argument. Schedule on the
            # pool directly and bridge the returned future onto the loop,
            # which is what run_in_executor does internally.
            # NOTE(review): relies on ProcessPool.schedule(function, args=,
            # kwargs=, timeout=) - confirm against pebble.pool.process.
            pool_future = pool.schedule(
                target, args=args, kwargs=kwargs, timeout=timeout)
            future = asyncio.wrap_future(pool_future, loop=loop)
        else:
            future = loop.create_future()
            reader, writer = mp_context.Pipe(duplex=False)

            worker = common.launch_process(
                name, common.function_handler, daemon, mp_context,
                target, args, kwargs, (reader, writer))

            # The parent only reads from the pipe: drop its copy of the
            # writing end once the worker has been launched.
            writer.close()

            loop.create_task(_worker_handler(future, worker, reader, timeout))

        return future

    return wrapper


async def _worker_handler(
        future: asyncio.Future,
        worker: multiprocessing.Process,
        pipe: multiprocessing.Pipe,
        timeout: float
):
    """Worker lifecycle manager.

    Waits for the worker to perform its task, collects the result,
    stops the process if it is still alive and publishes the outcome
    on the future.

    future: the asyncio.Future handed back to the caller.
    worker: the process running the task.
    pipe: the reading end of the pipe the worker sends its result to.
    timeout: maximum execution time, None for no limit.

    On success the future receives the result value. Otherwise it receives
    the exception carried by the result; for ERROR results the exception
    is first annotated with the worker exitcode and pid.
    Nothing is published on a future which has been cancelled.

    """
    result = await _get_result(future, pipe, timeout)

    if worker.is_alive():
        common.stop_process(worker)

    # The caller may cancel the future while the result is in flight or
    # right as the timeout elapses. A cancelled future is already done:
    # set_result/set_exception would raise InvalidStateError inside this
    # task, so there is nothing left to publish.
    if future.cancelled():
        return

    if result.status == common.ResultStatus.SUCCESS:
        future.set_result(result.value)
    else:
        if result.status == common.ResultStatus.ERROR:
            result.value.exitcode = worker.exitcode
            result.value.pid = worker.pid
        if not isinstance(result.value, asyncio.CancelledError):
            future.set_exception(result.value)

async def _get_result(
        future: asyncio.Future,
        pipe: multiprocessing.Pipe,
        timeout: float
) -> Any:
    """Polls the pipe until the worker result can be collected.

    Returns what the worker sent through the pipe once data is available.
    While waiting, between two polls:

      - if timeout is not None and the accumulated polling intervals
        reach it, a FAILURE Result wrapping TimeoutError is returned;
      - if the future has been cancelled, a FAILURE Result wrapping
        asyncio.CancelledError is returned.

    EOFError and OSError are reported as an ERROR Result wrapping
    ProcessExpired, any other Exception as an ERROR Result wrapping
    the exception itself.

    """
    # Each tick accounts for one sleep interval spent waiting.
    ticks = count(step=common.CONSTS.sleep_unit)

    try:
        while True:
            if pipe.poll():
                return pipe.recv()

            # The tick is consumed only when a timeout is actually set.
            timed_out = timeout is not None and next(ticks) >= timeout
            if timed_out:
                return common.Result(
                    common.ResultStatus.FAILURE,
                    TimeoutError('Task Timeout', timeout))

            if future.cancelled():
                return common.Result(
                    common.ResultStatus.FAILURE, asyncio.CancelledError())

            await asyncio.sleep(common.CONSTS.sleep_unit)
    except (EOFError, OSError):
        return common.Result(
            common.ResultStatus.ERROR,
            common.ProcessExpired('Abnormal termination'))
    except Exception as failure:
        return common.Result(common.ResultStatus.ERROR, failure)