File: why.rst

package info (click to toggle)
python-anyio 4.11.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,292 kB
  • sloc: python: 16,605; sh: 21; makefile: 9
file content (543 lines) | stat: -rw-r--r-- 26,195 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
Why you should be using AnyIO APIs instead of asyncio APIs
==========================================================

.. py:currentmodule:: anyio

AnyIO is not just a compatibility layer for bridging asyncio and Trio_. For one, it comes with its own diverse set of
Trio-inspired APIs which have been designed to be a step up from asyncio. Secondly, asyncio has numerous design issues
and missing features that AnyIO fixes for you. Therefore there are strong merits in switching to AnyIO APIs even if
you are developing an application and not a library.

Design problems with task management
++++++++++++++++++++++++++++++++++++

While the :class:`asyncio.TaskGroup` class, introduced in Python 3.11, is a major step towards structured concurrency,
it only provides a very narrow API that severely limits its usefulness.

First and foremost, the :class:`asyncio.TaskGroup` class does not offer any way to cancel, or even list all of the
contained tasks, so in order to do that, you would still have to keep track of any tasks you create. This also makes it
problematic to pass the task group to a child tasks, as tracking the tasks becomes a lot more tedious in such cases.

Secondly, while AnyIO (and Trio_) has long provided a way to wait until a newly launched task signals readiness,
:class:`asyncio.TaskGroup` still does not provide any such mechanism, leaving users to devise their own, often
error-prone methods to achieve this.

How does AnyIO fix these problems?
----------------------------------

An AnyIO task group contains its own cancel scope which can be used to cancel all the child tasks, regardless of where
they were launched from. Furthermore, if the task group's cancel scope is cancelled, any tasks launched from the task
group since then are *also* automatically subject to cancellation, thus ensuring that nothing can accidentally hang the
task group and prevent it from exiting.

As for tasks signalling readiness, :ref:`here <start_initialize>` is an example of waiting until a child task is
ready.

.. note:: In all fairness, AnyIO's task groups have their own ergonomics issues, like the inability to retrieve the
    tasks' return values and not being easily able to cancel individual tasks. This is something that
    `#890 <https://github.com/agronholm/anyio/pull/890>`_ aims to rectify.

Design problems with cancellation
+++++++++++++++++++++++++++++++++

The most significant problems with asyncio relate to its handling of cancellation. Asyncio employs a cancellation
mechanism where cancelling a task schedules a :exc:`~asyncio.CancelledError` exception to be raised in the task (once).
This mechanism is called *edge cancellation*.

The most common problem with edge cancellation is that if the task catches the :exc:`~asyncio.CancelledError` (which
often happens by accident when the user code has a ``except BaseException:`` block and doesn't re-raise the exception),
then no further action is taken, and the task keeps happily running until it is explicitly cancelled again::

    import asyncio


    async def sleeper():
        try:
            await asyncio.sleep(1)
        except BaseException:
            pass  # the first cancellation is caught here

        # This call will never return unless the task is cancelled again
        await asyncio.sleep(float("inf"))

    async def main():
        async with asyncio.TaskGroup() as tg:
            task = tg.create_task(sleeper())
            await asyncio.sleep(0)  # ensure that the task reaches the first sleep()
            task.cancel()

        print("done")

    # Execution hangs
    asyncio.run(main())

Another issue is that if a task that has already been scheduled to resume with a value (that is, the ``await`` is about
to yield a result) is cancelled, a :exc:`~asyncio.CancelledError` will instead be raised in the task's coroutine when it
resumes, thus potentially causing the awaitable result to be lost, even if the task catches the exception::

    import asyncio


    async def receive(f):
        print(await f)
        await asyncio.sleep(1)
        print("The task will be cancelled before this is printed")


    async def main():
        f = asyncio.get_running_loop().create_future()
        task = asyncio.create_task(receive(f))
        await asyncio.sleep(0)  # make sure the task has started
        f.set_result("hello")
        task.cancel()

        # The "hello" result is lost due to the cancellation
        try:
            await task
        except asyncio.CancelledError:
            pass


    # No output
    asyncio.run(main())

Similarly, if a newly created task is cancelled, its coroutine function may never get to run and thus react to the
cancellation. While the :external+python:doc:`asyncio documentation <library/asyncio-task>` claims that:

   Tasks can easily and safely be cancelled. When a task is cancelled, :exc:`~asyncio.CancelledError` will be raised in
   the task at the next opportunity.

This is simply **not true** for tasks that are cancelled before they have had a chance to start! This is problematic in
cases where the newly launched task is responsible for managing a resource. If the task is cancelled without getting to
handle the :exc:`~asyncio.CancelledError`, it won't have a chance to close the managed resource::

    import asyncio


    class Resource:
        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            # Here would be the code that cleanly closes the resource
            print("closed")


    async def handle_resource(resource):
        async with resource:
            ...


    async def main():
        async with asyncio.TaskGroup() as tg:
            task = tg.create_task(handle_resource(Resource()))
            task.cancel()


    # No output
    asyncio.run(main())

.. note:: :func:`Eager task factories <asyncio.eager_task_factory>` and, likewise, tasks started with
   ``eager_start=True`` do not suffer from this particular issue, as there is no opportunity to cancel the task before
   its first iteration.

Asyncio cancellation shielding is a major footgun
-------------------------------------------------

Asyncio has a function, :func:`asyncio.shield`, to shield a coroutine from cancellation. It launches a new task in such
a way that the cancellation of the task awaiting on it will not propagate to the new task.

The trouble with this is that if the host task (the task that awaits for the shielded operation to complete) **is**
cancelled, the shielded task is orphaned. If the shielded task then raises an exception, only a warning might be printed
on the console, but the exception will not propagate anywhere. Worse yet, since asyncio only holds weak references to
each task, there is nothing preventing the shielded task from being garbage collected, mid-execution::

    import asyncio
    import gc


    async def shielded_task():
        fut = asyncio.get_running_loop().create_future()
        await fut


    async def host_task():
        await asyncio.shield(shielded_task())


    async def main():
        async with asyncio.TaskGroup() as tg:
            task = tg.create_task(host_task())
            await asyncio.sleep(0)  # allow the host task to start
            task.cancel()
            await asyncio.sleep(0)  # allow the cancellation to take effect on the host task
            gc.collect()

    # Prints warning: Task was destroyed but it is pending!
    asyncio.run(main())

To make matters even worse, the shielding only prevents indirect cancellation through the host task. If the event loop
is shut down, it will automatically cancel all tasks, including the supposedly shielded one::

    import asyncio
    import signal


    async def finalizer():
        await asyncio.sleep(1)
        print("Finalizer done")

    async def main():
        ...  # the business logic goes here
        asyncio.get_running_loop().call_soon(signal.raise_signal, signal.SIGINT)  # simulate ctrl+C
        await asyncio.shield(finalizer())

    # Prints a traceback containing a KeyboardInterrupt and a CancelledError, but not the "Finalizer done" message
    asyncio.run(main())

A good practical example of the issues with :func:`~asyncio.shield` can be drawn from the `Python Redis client`_ where
the incorrect use of this function was responsible for a `significant outage of ChatGPT`_. The point here is not to lay
blame on the downstream developers, but to demonstrate that :func:`~asyncio.shield` is difficult, if not impossible, to
use correctly for any practical purpose.

.. _Python Redis client: https://github.com/redis/redis-py
.. _significant outage of ChatGPT: https://openai.com/index/march-20-chatgpt-outage/

How does AnyIO fix these problems?
----------------------------------

To provide for more precise and predictable cancellation control, AnyIO (and Trio_) uses *cancel scopes*. Cancel scopes
select sections of a coroutine function to be cancelled. Cancel scopes are stateful in nature, meaning once a cancel
scope has been cancelled, it will stay that way. On asyncio, AnyIO cancel scopes work by cancelling the enclosed task(s)
every time they try to await on something as long as the task's active cancel scope is *effectively cancelled* (i.e.
either directly or via an ancestor scope). This mechanism of stateful cancellation is called *level cancellation*.

AnyIO's cancel scopes have two notable differences from asyncio's cancellation:

#. Cancel scopes never try to cancel a task when it's scheduled to resume with a value
#. Cancel scopes always allow the task a chance to react to the cancellation

In addition to providing the ability to cancel specific code sections, cancel scopes also provide two important
features: shielding and timeouts.

Shielding a section of code from cancellation also works in a more straightforward fashion – not by launching another
task, but by preventing the propagation of cancellation from parent cancel scope to a shielded scope.

Cancel scopes with a set *deadline* are roughly equivalent to :func:`asyncio.timeout`, except for the level cancellation
semantics and the ability to combine timeouts with shielding to easily implement finalization with a timeout. The
:func:`move_on_after` context manager is often used for this purpose.

.. note:: Shielded cancel scopes only protect against cancellation by other cancel scopes, not direct calls to
    :meth:`~asyncio.Task.cancel`.

The first asyncio example above demonstrated how a task cancellation is only delivered once, unless explicitly repeated.
But with AnyIO's cancel scopes, every attempt to yield control to the event loop from a cancelled task results in a new
:exc:`~asyncio.CancelledError`::

    import asyncio

    import anyio


    async def sleeper():
        try:
            await asyncio.sleep(1)
        except BaseException:
            pass  # the first cancellation is caught here

        # This will raise another CancelledError
        await asyncio.sleep(float("inf"))

    async def main():
        async with anyio.create_task_group() as tg:
            tg.start_soon(sleeper)
            await asyncio.sleep(0)  # ensure that the task reaches the first sleep()
            tg.cancel_scope.cancel()

        print("done")

    # Output: "done"
    asyncio.run(main())

The AnyIO version of the second example demonstrates that a task which is scheduled to resume will be able to process
the result of the ``await`` before it gets cancelled::

    import asyncio

    import anyio


    async def receive(f):
        print(await f)
        await asyncio.sleep(1)
        print("The task will be cancelled before this is printed")


    async def main():
        f = asyncio.get_running_loop().create_future()
        async with anyio.create_task_group() as tg:
            tg.start_soon(receive, f)
            await asyncio.sleep(0)  # make sure the task has started
            f.set_result("hello")
            tg.cancel_scope.cancel()


    # Output: "hello"
    asyncio.run(main())

The third example demonstrated that if a newly created task is cancelled, it would not get an opportunity to react to
the cancellation. With AnyIO's task groups, they do::

    import asyncio

    import anyio


    class Resource:
        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            # Here would be the code that cleanly closes the resource
            print("closed")


    async def handle_resource(resource):
        async with resource:
            ...


    async def main():
        async with anyio.create_task_group() as tg:
            tg.start_soon(handle_resource, Resource())
            tg.cancel_scope.cancel()


    # Output: "closed"
    asyncio.run(main())


.. seealso:: :doc:`cancellation`

Design problems with asyncio queues
+++++++++++++++++++++++++++++++++++

While the :class:`asyncio.Queue` class was upgraded in Python 3.13 to support the notion of shutting down, there are
still a number of issues with it:

#. Queues are unbounded by default
#. Queues don't support async iteration
#. Queue shutdown doesn't play nice with multiple producers

The problem with unbounded queues is that careless use of such queues may cause runaway memory use and thus lead to out
of memory errors. This default behavior is unfortunately unfixable due to backwards compatibility reasons.

The second problem is mostly an ergonomics issue. A PR was made to address this, but was
`declined <https://github.com/python/cpython/pull/120925#issuecomment-2370151879>`_.

The third problem manifests itself when multiple producer tasks put items to the same queue. If one producer shuts down
the queue, the others will get unwarranted errors when trying to put more items to the queue. Therefore the producers
either need another means to coordinate the queue shutdown, or they need to be launched in a task group in such a manner
that the host task shuts down the queue after the producer tasks have exited. Either way, the design is not ideal for
multiple producer tasks.

.. _queue_fix:

How does AnyIO fix these problems?
----------------------------------

AnyIO offers an alternative to queues: :ref:`memory object streams <memory object streams>`. They were modeled after
Trio's `memory channels`_. When you create a memory object stream, you get a "send" stream and a "receive" stream. The
separation is necessary for the purpose of cloning (explained below). By default, memory object streams have an item
capacity of 0, meaning the stream does not store anything. In other words, a send operation will not complete until
another task shows up to receive the item.

Memory object streams support cloning. This enables each consumer and producer task to close its own clone of the
receive or send stream. Only after all clones have been closed is the respective send or receive memory object stream
considered to be closed.

Unlike :class:`asyncio.Queue`, memory object receive streams support async iteration. The ``async for`` loop then ends
naturally when all send stream clones have been closed. For send streams, attempting to send an item when all receive
stream clones have been closed raises a :exc:`BrokenResourceError`.

Memory object streams also provide better debugging facilities via the
:meth:`~.streams.memory.MemoryObjectReceiveStream.statistics` method which can tell you:

* the number of queued items
* the number of open send and receive streams
* how many tasks are waiting to send or receive to/from the stream

.. _memory channels: https://trio.readthedocs.io/en/stable/reference-core.html#using-channels-to-pass-values-between-tasks

Design problems with the streams API
++++++++++++++++++++++++++++++++++++

While asyncio provides a limited set of `stream classes`_, their callback-based design unfortunately shines through from
the API. First of all, unlike regular sockets, you get a separate reader and writer object instead of a full-duplex
stream which you would essentially get from the :mod:`socket` functions. Second, in order to send data to the stream,
you have to first call the synchronous :meth:`~asyncio.StreamWriter.write` method which adds data to the internal
buffer, and then you have to remember to call the coroutine method :meth:`~asyncio.StreamWriter.drain` which then
*actually* causes the data to be written to the underlying socket. Likewise, when you close a stream, you first have to
call :meth:`~asyncio.StreamWriter.close` and *then* await on :meth:`~asyncio.StreamWriter.wait_closed` to make sure the
stream has *actually* closed! To add insult to injury, these classes don't even support the async context manager
protocol so you can't just do ``async with writer: ...``.

Another issue lies with the :meth:`~asyncio.StreamWriter.get_extra_info` method asyncio provides to get information like
the remote address for socket connections, or the raw socket object:

#. This method only exists in the writer class, not the reader (for whatever reason).
#. It returns a dictionary, so to get the information you want, you'll need to access one of the keys in the returned
   dict, based on the documentation.
#. It is not type safe, as Typeshed specifies the return type as ``dict[str, Any]``. Therefore, static type checkers
   cannot check the correctness of any access to the returned dict based on either the keys or the value types.

.. _stream classes: https://docs.python.org/3/library/asyncio-stream.html

How does AnyIO fix these problems?
----------------------------------

AnyIO comes with hierarchy of base stream classes:

* :class:`~.abc.UnreliableObjectStream`, :class:`~.abc.UnreliableObjectReceiveStream` and
  :class:`~.abc.UnreliableObjectSendStream`: for transporting objects; no guarantees of reliable or ordered delivery,
  just like with UDP sockets
* :class:`~.abc.ObjectStream`, :class:`~.abc.ObjectReceiveStream`, :class:`~.abc.ObjectSendStream`: like the above, but
  with added guarantees about reliable and ordered delivery
* :class:`~.abc.ByteStream`, :class:`~.abc.ByteReceiveStream`, :class:`~.abc.ByteSendStream`: for transporting bytes;
  may split chunks arbitrarily, just like TCP sockets
* :class:`~.abc.SocketStream`: byte streams backed by actual sockets

These interfaces are then implemented by a number of concrete classes, such as:

* :class:`~.streams.memory.MemoryObjectReceiveStream` and :class:`~.streams.memory.MemoryObjectSendStream`: for
  exchanging arbitrary objects between tasks within the same process (see :ref:`this section <queue_fix>` for the
  rationale for the sender/receiver split)
* :class:`~.streams.buffered.BufferedByteReceiveStream` and :class:`~.streams.buffered.BufferedByteStream`: for adapting
  bytes-oriented object streams into byte streams, and for supporting read operations that require a buffer, such as
  needing to read a precise amount of bytes, or reading up to a specific delimiter
* :class:`~.streams.tls.TLSStream`: for using TLS encryption over any arbitrary (bytes-oriented) stream
* :class:`~.streams.text.TextReceiveStream` and :class:`~.streams.text.TextStream`: for turning a bytes-oriented stream
  into a unicode string-oriented stream
* :class:`~.streams.file.FileReadStream` and :class:`~.streams.file.FileWriteStream`: for reading from or writing to
  files
* :class:`~.streams.stapled.StapledObjectStream` and :class:`~.streams.stapled.StapledByteStream`: for combining
  different read and write streams into full-duplex streams

.. important:: In contrast with regular sockets or asyncio streams, AnyIO streams raise :exc:`EndOfStream` instead of
    returning an empty bytes object or ``None`` when there is no more data to be read.

.. seealso:: :doc:`streams`

As a counterpart to :meth:`~asyncio.StreamWriter.get_extra_info`, AnyIO offers a system of typed attributes where stream
classes (and any kind of class, really) can offer such extra information in a type safe manner. This is
especially useful with stream wrappers such as :class:`~.streams.tls.TLSStream`. Stream wrapper
classes like that can pass through any typed attributes from the wrapped stream while adding their own on top. They can
also choose to just override any attributes they like, all the while preserving type safety.

.. seealso:: :doc:`typedattrs`

Design problems with the thread API
+++++++++++++++++++++++++++++++++++

Asyncio comes with two ways to call blocking code in worker threads, each with its own caveats:

#. :func:`asyncio.to_thread`
#. :meth:`asyncio.loop.run_in_executor`

The first function is the more modern one, and supports :mod:`contextvar <contextvars>` propagation. However, there is
no way to use it with a thread pool executor other than the default. And due to the design decision of allowing the
pass-through of arbitrary positional and keyword arguments, no such option can ever be added without breaking backwards
compatibility. The second function, on the other hand, allows for explicitly specifying a thread pool to use, but it
doesn't support context variable propagation.

Another inconvenience comes from the inability to synchronously call synchronous functions in the event loop thread from
a worker thread. That is, running a synchronous function in the event loop thread and then returning its return value
from that call. While asyncio provides a way to do this for coroutine functions
(:func:`~asyncio.run_coroutine_threadsafe`), there is no counterpart for synchronous functions. The closest match would
be :meth:`~asyncio.loop.call_soon_threadsafe`, this function only schedules a callback to be run on the event loop
thread and does not provide any means to retrieve the return value.

How does AnyIO fix these problems?
----------------------------------

AnyIO uses its own thread pooling mechanism, based on :ref:`capacity limiters <capacity-limiters>` which are similar to
semaphores. To call a function in a worker thread, you would use :func:`.to_thread.run_sync`. This function can be
passed a specific capacity limiter to count against. All worker threads will be spawned in a thread pool specific to the
current event loop, and can be reused in any call to :func:`.to_thread.run_sync`, regardless of the capacity limiter
used. More worker threads will be spawned as necessary, so long as the capacity limiter allows it. The event loop's
thread pool is homogeneous, meaning idle threads in it are reused regardless of which capacity limiter was passed to the
call that spawned them.

.. note:: :func:`anyio.to_thread.run_sync` propagates context variables just like :func:`asyncio.to_thread`.

From inside AnyIO worker threads, you can call functions in the event loop thread using :func:`.from_thread.run` and
:func:`.from_thread.run_sync`, for coroutine functions and synchronous functions, respectively. The former is a direct
counterpart to asyncio's :func:`~asyncio.run_coroutine_threadsafe`, but the latter will wait for the function to run and
return its return value, unlike :meth:`~asyncio.loop.call_soon_threadsafe`.

Design problems with signal handling APIs
+++++++++++++++++++++++++++++++++++++++++

Asyncio only provides facilities to set or remove signal handlers. The :meth:`~asyncio.loop.add_signal_handler` method
will replace any existing handler for that signal, and won't return the previous handler for potential chaining. There
is also no way to get the current handler for a signal.

AnyIO provides an alternate mechanism to handle signals with its :func:`open_signal_receiver` context manager.

.. seealso:: :doc:`signals`

Missing file I/O and async path support
+++++++++++++++++++++++++++++++++++++++

Asyncio contains no facilities to help with file I/O, forcing you to use :func:`~asyncio.to_thread` or
:meth:`~asyncio.loop.run_in_executor` with every single file operation to prevent blocking the event loop thread.

To overcome this shortcoming, users often turn to libraries such as aiofiles_ and aiopath_ which offer async interfaces
for file and path access. However, AnyIO provides its own set of async file I/O APIs, including an async compatible
counterpart for the :class:`~pathlib.Path` class. Additionally, it should be noted that AnyIO provides
:ref:`file streams <FileStreams>` compatible with its stream class hierarchy.

.. seealso:: :doc:`fileio`

.. _aiofiles: https://github.com/Tinche/aiofiles
.. _aiopath: https://github.com/alexdelorenzo/aiopath

Features not in asyncio which you might be interested in
++++++++++++++++++++++++++++++++++++++++++++++++++++++++

AnyIO doesn't just offer replacements for asyncio APIs, but provides a bunch of its own conveniences which you may find
helpful.

Built-in pytest plugin
----------------------

AnyIO contains its own pytest_ plugin for running asynchronous tests. It can completely replace pytest-asyncio_ for
testing asynchronous code. It is somewhat simpler to use too, in addition to supporting more event loop implementations
such as Trio_.

.. seealso:: :doc:`testing`

.. _pytest: https://docs.pytest.org/
.. _pytest-asyncio: https://github.com/pytest-dev/pytest-asyncio

Connectables
------------

To complement its stream class hierarchy, AnyIO offers an abstraction for producing connected streams, either object or
bytes-oriented. This can be very useful when writing network clients, as abstracting out the connection mechanism allows
for a lot of customization, including mocking connections without having to resort to monkey patching.

.. seealso:: :ref:`connectables`

Context manager mix-in classes
------------------------------

AnyIO provides mix-in classes for safely implementing context managers which embed other context managers. Typically
this would require implementing ``__aenter__`` and ``__aexit__``, often requiring these classes to store state in the
instance and dealing with exceptions raised in ``__aenter__()``. The context manager mix-ins allow you to replace these
method pairs with a single method where you write your logic just like with
:func:`@asynccontextmanager <contextlib.asynccontextmanager>`, albeit at the cost of sacrificing re-entrancy.

.. seealso:: :doc:`contextmanagers`

.. _Trio: https://github.com/python-trio/trio