File: threaded_runner.rst

package info (click to toggle)
datalad 1.1.5-2.1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 7,140 kB
  • sloc: python: 69,392; sh: 1,521; makefile: 220
file content (87 lines) | stat: -rw-r--r-- 7,496 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
.. -*- mode: rst -*-
.. vi: set ft=rst sts=4 ts=4 sw=4 et tw=79:

.. _chap_threaded_runner:


****************
Threaded runner
****************

.. topic:: Specification scope and status

   This specification provides an overview over the current implementation of the subprocess runner that is used throughout datalad.

Threads
=======

DataLad often requires the execution of subprocesses. While subprocesses are executed, datalad, i.e. its main thread, should be able to read data from stdout and stderr of the subprocess as well as write data to stdin of the subprocess. This requires a way to efficiently multiplex reading from stdout and stderr of the subprocess as well as writing to stdin of the subprocess.

Since non-blocking IO and waiting on multiple sources (poll or select) differs vastly in terms of capabilities and API on different OSs, we decided to use blocking IO and threads to multiplex reading from different sources.

Generally we have a number of threads that might be created and executed, depending on the need for writing to stdin or reading from stdout or stderr. Each thread can read from either a single queue or a file descriptor. Reading is done blocking. Each thread can put data into multiple queues. This is used to transport data that was read as well as for signaling conditions like closed file descriptors.

Conceptually, there are the main thread and two different types of threads:

 - type 1: transport threads (1 thread per process I/O descriptor)
 - type 2: process waiting thread (1 thread)

Transport Threads
.................

Besides the main thread, there might be up to three additional threads to handle data transfer to ``stdin``, and from ``stdout`` and ``stderr``. Each of those threads copies data between queues and file descriptors in a tight loop. The stdin-thread reads from an input-queue, the stdout- and stderr-threads write to an output queue. Each thread signals its exit to a set of signal queues, which might be identical to the output queues.

The ``stdin``-thread reads data from a queue and writes it to the ``stdin``-file descriptor of the sub-process. If it reads ``None`` from the queue, it will exit. The thread will also exit, if an exit is requested by calling ``thread.request_exit()``, or if an error occurs during writing. In all cases it will enqueue a ``None`` to all its signal-queues.

The ``stdout``- and ``stderr``-threads read from the respective file descriptor and enqueue data into their output queue, unless the data has zero length (which indicates a closed descriptor). On a zero-length read they exit and enqueue ``None`` into their signal queues.

All queues are infinite. Nevertheless signaling is performed with a timeout of one 100 milliseconds in order to ensure that threads can exit.


Process Waiting Thread
......................

The process waiting thread waits for a given process to exit and enqueues an exit notification into it signal queues.



Main Thread
...........

There is a single queue, the ``output_queue``, on which the main thread waits, after all transport threads, and the process waiting thread are started. The ``output_queue`` is the signaling queue and the output queue of the stderr-thread and the stdout-thread. It is also the signaling queue of the stdin-thread, and it is the signaling queue for the process waiting threads.

The main thread waits on the ``output_queue`` for data or signals and handles them accordingly, i.e. calls data callbacks of the protocol if data arrives, and calls connection-related callbacks of the protocol if other signals arrive. If no messages arrive on the  ``output_queue``, the main thread blocks for 100ms. If it is unblocked, either by getting a message or due to elapsing of the 100ms, it will process timeouts. If the ``timeout``-parameter to the constructor was not ``None``, it will check the last time any of the monitored files (stdout and/or stderr) yielded data. If the time is larger than the specified timeout, it will call the ``timeout`` method of the protocol instance. Due to this implementation, the resolution for timeouts is 100ms. The main thread handles the closing of ``stdin``-, ``stdout``-, and ``stderr``-file descriptors if all other threads have terminated and if ``output_queue`` is empty. These tasks are either performed in the method ``ThreadedRunner.run()`` or in a result generator that is returned by  ``ThreadedRunner.run()`` whenever ``send()`` is called on it.


Protocols
=========

Due to its history datalad uses the protocol defined in ``asyncio.protocols.SubprocessProtocol`` and in ``asyncio.protocols.BaseProtocol``. To keep compatibility with the code base, the threaded-runner implementation uses the same interface. Please note, although we use the same interface and although the interface is defined in the asyncio libraries, the threaded-runner implementation does not make any use of ``asyncio``. The description of the interface nevertheless applies in the context of the threaded-runner. The following methods of the ``SubprocessProtocol`` are supported.

    - ``SubprocessProtocol.pipe_data_received(fd, data)``
    - ``SubprocessProtocol.pipe_connection_lost(fd, exc)``
    - ``SubprocessProtocol.process_exited()``

In addition the following methods of ``BaseProtocol`` are supported:

    - ``BaseProtocol.connection_made(transport)``
    - ``BaseProtocol.connection_lost(exc)``


The datalad-provided protocol ``datalad.runners.protocol.WitlessProtocol`` provides an additional callback:

    - ``WitlessProtocol.timeout(fd)``

The method ``timeout()`` will be called when the parameter ``timeout`` in ``WitlessRunner.run``, ``ThreadedRunner.run``, or ``run_command`` is set to a number specifying the desired timeout in seconds. If no data is received from ``stdin``, or ``stderr`` (if those are supposed to be captured), the method ``WitlessProtocol.timeout(fd)`` is called with ``fd`` set to the respective file number, e.g. 1, or 2. If ``WitlessProtocol.timeout(fd)`` returns ``True``, only the corresponding file descriptor will be closed and the associated threads will exit.

The method ``WitlessProtocol.timeout(fd)`` is also called if stdout, stderr and stdin are closed and the process does not exit within the given interval. In this case ``fd`` is set to ``None``. If ``WitlessProtocol.timeout(fd)`` returns ``True`` the process is terminated.


Object and Generator Results
================================

If the protocol that is provided to ``run()`` does not inherit ``datalad.runner.protocol.GeneratorMixIn``, the final result that will be returned to the caller is determined by calling ``WitlessProtocol._prepare_result()``. Whatever object this method returns will be returned to the caller.

If the protocol that is provided to ``run()`` does inherit ``datalad.runner.protocol.GeneratorMixIn``, ``run()`` will return a ``Generator``. This generator will yield the elements that were sent to it in the protocol-implementation by calling ``GeneratorMixIn.send_result()`` in the order in which the method ``GeneratorMixIn.send_result()`` is called. For example, if ``GeneratorMixIn.send_result(43)`` is called, the generator will yield ``43``, and if ``GeneratorMixIn.send_result({"a": 123, "b": "some data"})`` is called, the generator will yield ``{"a": 123, "b": "some data"}``.

Internally the generator is implemented by keeping track of the process state and waiting in the ``output_queue`` once, when ``send`` (or ``__next__``) is called on it.