File: tasks.rst

Creating and managing tasks
===========================

.. py:currentmodule:: anyio

A *task* is a unit of execution that lets you do many things concurrently while waiting
on I/O or other events. While you can have any number of tasks, the asynchronous event
loop can only run one of them at a time. When a task encounters an ``await`` statement
that requires it to sleep until something happens, the event loop is free to work on
another task. When the thing the first task was waiting on is complete, the event loop
will resume the execution of that task at the first opportunity it gets.

Task handling in AnyIO loosely follows the Trio_ model. Tasks can be created (*spawned*)
using *task groups*. A task group is an asynchronous context manager that makes sure
that all its child tasks are finished one way or another after the context block is
exited. If a child task, or the code in the enclosed context block, raises an exception,
all child tasks are cancelled. Otherwise the context manager just waits until all child
tasks have exited before proceeding.

Here's a demonstration::

    from anyio import sleep, create_task_group, run


    async def sometask(num: int) -> None:
        print('Task', num, 'running')
        await sleep(1)
        print('Task', num, 'finished')


    async def main() -> None:
        async with create_task_group() as tg:
            for num in range(5):
                tg.start_soon(sometask, num)

        print('All tasks finished!')

    run(main)

.. _Trio: https://trio.readthedocs.io/en/latest/reference-core.html
   #tasks-let-you-do-multiple-things-at-once

Starting and initializing tasks
-------------------------------

Sometimes it is useful to wait until a task has successfully initialized itself. For
example, when starting network services, you can have your task start the listener and
then signal the caller that initialization is done. The caller can then start another
task that depends on that service being up and running. Also, if the socket bind fails
or something else goes wrong during initialization, the exception will be propagated to
the caller, which can then catch and handle it.

This can be done with :meth:`TaskGroup.start() <.abc.TaskGroup.start>`::

    from anyio import (
        TASK_STATUS_IGNORED,
        create_task_group,
        connect_tcp,
        create_tcp_listener,
        run,
    )
    from anyio.abc import TaskStatus


    async def handler(stream):
        ...


    async def start_some_service(
        port: int, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED
    ):
        async with await create_tcp_listener(
            local_host="127.0.0.1", local_port=port
        ) as listener:
            task_status.started()
            await listener.serve(handler)


    async def main():
        async with create_task_group() as tg:
            await tg.start(start_some_service, 5000)
            async with await connect_tcp("127.0.0.1", 5000) as stream:
                ...


    run(main)

The target coroutine function **must** call ``task_status.started()``, because the task
calling :meth:`TaskGroup.start() <.abc.TaskGroup.start>` is blocked until then. If the
spawned task exits without ever calling it, the
:meth:`TaskGroup.start() <.abc.TaskGroup.start>` call will raise a ``RuntimeError``.

.. note:: Unlike :meth:`~.abc.TaskGroup.start_soon`, :meth:`~.abc.TaskGroup.start` needs
   an ``await``.

Handling multiple errors in a task group
----------------------------------------

It is possible for more than one task to raise an exception in a task group. This can
happen when a task reacts to cancellation by entering either an exception handler block
or a ``finally:`` block and raises an exception there. This raises the question: which
exception is propagated from the task group context manager? The answer is "both". In
practice this means that a special exception, :exc:`ExceptionGroup` (or
:exc:`BaseExceptionGroup`) is raised which contains both exception objects.

To catch such exceptions potentially nested in groups, special measures are required.
On Python 3.11 and later, you can use the ``except*`` syntax to catch multiple
exceptions::

    from anyio import create_task_group

    try:
        async with create_task_group() as tg:
            tg.start_soon(some_task)
            tg.start_soon(another_task)
    except* ValueError as excgroup:
        for exc in excgroup.exceptions:
            ...  # handle each ValueError
    except* KeyError as excgroup:
        for exc in excgroup.exceptions:
            ...  # handle each KeyError

If compatibility with older Python versions is required, you can use the ``catch()``
function from the exceptiongroup_ package::

    from anyio import create_task_group
    from exceptiongroup import ExceptionGroup, catch

    def handle_valueerror(excgroup: ExceptionGroup) -> None:
        for exc in excgroup.exceptions:
            ...  # handle each ValueError

    def handle_keyerror(excgroup: ExceptionGroup) -> None:
        for exc in excgroup.exceptions:
            ...  # handle each KeyError

    with catch({
        ValueError: handle_valueerror,
        KeyError: handle_keyerror
    }):
        async with create_task_group() as tg:
            tg.start_soon(some_task)
            tg.start_soon(another_task)

If you need to set local variables of the enclosing function in the handlers, declare
them as ``nonlocal`` (this only works when the handler is defined inside another
function)::

    def handle_valueerror(excgroup):
        nonlocal somevariable
        somevariable = 'whatever'

.. _exceptiongroup: https://pypi.org/project/exceptiongroup/

Context propagation
-------------------

Whenever a new task is spawned, the `context`_ is copied to that task. It is important
to note *which* context gets copied: not the context of the task group's host task, but
the context of the task that calls
:meth:`TaskGroup.start() <.abc.TaskGroup.start>` or
:meth:`TaskGroup.start_soon() <.abc.TaskGroup.start_soon>`.

.. _context: https://docs.python.org/3/library/contextvars.html

Differences with asyncio.TaskGroup
----------------------------------

The :class:`asyncio.TaskGroup` class, added in Python 3.11, is very similar in design to
the AnyIO :class:`~.abc.TaskGroup` class. The asyncio counterpart has some important
differences in its semantics, however:

* The task group itself is instantiated directly, rather than using a factory function
* Tasks are spawned solely through :meth:`~asyncio.TaskGroup.create_task`; there is no
  ``start()`` or ``start_soon()`` method
* The :meth:`~asyncio.TaskGroup.create_task` method returns a task object which can be
  awaited on (or cancelled)
* Tasks spawned via :meth:`~asyncio.TaskGroup.create_task` can only be cancelled
  individually (there is no ``cancel()`` method or similar in the task group)
* When a task spawned via :meth:`~asyncio.TaskGroup.create_task` is cancelled before its
  coroutine has started running, it will not get a chance to handle the cancellation
  exception
* :class:`asyncio.TaskGroup` does not allow starting new tasks after an exception in
  one of the tasks has triggered a shutdown of the task group