File: taskmgnt.rst

package info (click to toggle)
clustershell 1.9.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,228 kB
  • sloc: python: 20,978; makefile: 149
file content (315 lines) | stat: -rw-r--r-- 12,102 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
Task management
===============

.. highlight:: python

.. _class-Task:

Structure of Task
-----------------

A ClusterShell *Task* and its underlying *Engine* class are the fundamental
infrastructure associated with a thread. An *Engine* implements an event
processing loop that you use to schedule work and coordinate the receipt of
incoming events. The purpose of this run loop is to keep your thread busy when
there is work to do and put your thread to sleep when there is none. When
calling the :meth:`.Task.resume()` or :meth:`.Task.run()` methods, your thread
enters the Task Engine run loop and calls installed event handlers in response
to incoming events.

Using Task objects
------------------

A *Task* object provides the main interface for adding shell commands, files
to copy or timer and then running it. Every thread has a single *Task* object
(and underlying *Engine* object) associated with it. The *Task* object is an
instance of the :class:`.Task` class.


Getting a Task object
^^^^^^^^^^^^^^^^^^^^^

To get the *Task* object bound to the **current thread**, you use one of the following:

* Use the :func:`.Task.task_self()` function available at the root of the Task
  module
* or use ``task = Task()``; Task objects are only instantiated when needed.

Example of getting the current task object::

    >>> from ClusterShell.Task import task_self
    >>> task = task_self()

So for a single-threaded application, a Task is a simple singleton (which
instance is also available through :func:`.Task.task_self()`).

To get the *Task* object associated to a specific thread identified by the
identifier *tid*, you use the following::

    >>> from ClusterShell.Task import Task
    >>> task = Task(thread_id=tid)


.. _class-Task-configure:

Configuring the Task object
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Each *Task* provides an info dictionary that shares both internal
*Task*-specific parameters and user-defined (key, value) parameters. Use the
following :class:`.Task` class methods to get or set parameters:

* :meth:`.Task.info`
* :meth:`.Task.set_info`


For example, to configure the task debugging behavior::

    >>> task.set_info('debug', True)
    >>> task.info('debug')
    True

You can also use the *Task* info dictionary to set your own *Task*-specific
key, value pairs. You may use any free keys but only keys starting with
*USER_* are guaranteed not to be used by ClusterShell in the future.

Task info keys and their default values:

+-----------------+----------------+------------------------------------+
| Info key string | Default value  | Comment                            |
+=================+================+====================================+
| debug           | False          | Enable debugging support (boolean) |
+-----------------+----------------+------------------------------------+
| print_debug     | internal using | Default is to print debug lines to |
|                 | *print*        | stdout using *print*. To override  |
|                 |                | this behavior, set a function that |
|                 |                | takes two arguments (the task      |
|                 |                | object and a string) as the value. |
+-----------------+----------------+------------------------------------+
| fanout          | 64             | Ssh *fanout* window (integer)      |
+-----------------+----------------+------------------------------------+
| connect_timeout | 10             | Value passed to ssh or pdsh        |
|                 |                | (integer)                          |
+-----------------+----------------+------------------------------------+
| command_timeout | 0 (no timeout) | Value passed to ssh or pdsh        |
|                 |                | (integer)                          |
+-----------------+----------------+------------------------------------+

Below is an example of `print_debug` override. As you can see, we set the
function `print_csdebug(task, s)` as the value. When debugging is enabled,
this function will be called for any debug text line. For example, this
function searches for any known patterns and print a modified debug line to
stdout when found::

    def print_csdebug(task, s):
       m = re.search("(\w+): SHINE:\d:(\w+):", s)
       if m:
           print "%s<pickle>" % m.group(0)
       else:
           print s

    # Install the new debug printing function
    task_self().set_info("print_debug", print_csdebug)


.. _taskshell:

Submitting a shell command
^^^^^^^^^^^^^^^^^^^^^^^^^^

You can submit a set of commands for local or distant execution in parallel
with :meth:`.Task.shell`.

Local usage::

    task.shell(command [, key=key] [, handler=handler] [, timeout=secs])

Distant usage::

    task.shell(command, nodes=nodeset [, handler=handler] [, timeout=secs])

This method makes use of the default local or distant worker. ClusterShell
uses a default Worker based on the Python Popen2 standard module to execute
local commands, and a Worker based on *ssh* (Secure SHell) for distant
commands.

If the Task is not running, the command is scheduled for later execution. If
the Task is currently running, the command is executed as soon as possible
(depending on the current *fanout*).

To set a per-worker (eg. per-command) timeout value, just use the timeout
parameter (in seconds), for example::

    task.shell("uname -r", nodes=remote_nodes, handler=ehandler, timeout=5)

This is the preferred way to specify a command timeout.
:meth:`.EventHandler.ev_timeout` event is generated before the worker has finished to
indicate that some nodes have timed out. You may then retrieve the nodes with
:meth:`.DistantWorker.iter_keys_timeout()`.

Submitting a file copy action
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Local file copy to distant nodes is supported. You can submit a copy action
with :meth:`.Task.copy`::

    task.copy(source, dest, nodes=nodeset [, handler=handler] [, timeout=secs])

This method makes use of the default distant copy worker which is based on scp
(Secure CoPy) which comes with OpenSSH.

If the Task is not running, the copy is scheduled for later execution. If the
Task is currently running, the copy is started as soon as possible (depending
on the current *fanout*).

Starting the Task
^^^^^^^^^^^^^^^^^

Before you run a Task, you must add at least one worker (shell command, file
copy) or timer to it. If a Task does not have any worker to execute and
monitor, it exits immediately when you try to run it with::

    task.resume()

At this time, all previously submitted commands will start in the associated
Task thread. From a library user point of view, the task thread is blocked
until the end of the command executions.

Please note that the special method :meth:`.Task.run` does a
:meth:`.Task.shell` and a :meth:`.Task.resume` in once.

To set a Task execution timeout, use the optional *timeout* parameter to set
the timeout value in seconds. Once this time is elapsed when the Task is still
running, the running Task raises ``TimeoutError`` exception, cleaning by the
way all scheduled workers and timers. Using such a timeout ensures that the
Task will not exceed a given time for all its scheduled works. You can also
configure per-worker timeout that generates an event
:meth:`.EventHandler.ev_timeout` but will not raise an exception, allowing the
Task to continue. Indeed, using a per-worker timeout is the preferred way for
most applications.


Getting Task results
^^^^^^^^^^^^^^^^^^^^

After the task is finished (after :meth:`.Task.resume` or :meth:`.Task.run`)
or after a worker is completed when you have previously defined an event
handler (at :meth:`.EventHandler.ev_close`), you can use Task result getters:

* :meth:`.Task.iter_buffers`
* :meth:`.Task.iter_errors`
* :meth:`.Task.node_buffer`
* :meth:`.Task.node_error`
* :meth:`.Task.max_retcode`
* :meth:`.Task.num_timeout`
* :meth:`.Task.iter_keys_timeout`

Note: *buffer* refers to standard output, *error* to standard error.

Please see some examples in :ref:`prog-examples`.


Exiting the Task
^^^^^^^^^^^^^^^^

If a Task does not have anymore scheduled worker or timer (for example, if you
run one shell command and then it closes), it exits automatically from
:meth:`.Task.resume`. Still, except from a signal handler, you can always call
the following method to abort the Task execution:

* :meth:`.Task.abort`

For example, it is safe to call this method from an event handler within the
task itself. On abort, all scheduled workers (shell command, file copy) and
timers are cleaned and :meth:`.Task.resume` returns, unblocking the Task
thread from a library user point of view. Please note that commands being
executed remotely are not necessary stopped (this is due to *ssh(1)*
behavior).


.. _configuring-a-timer:

Configuring a Timer
^^^^^^^^^^^^^^^^^^^

A timer is bound to a Task (and its underlying Engine) and fires at a preset
time in the future. Timers can fire either only once or repeatedly at fixed
time intervals. Repeating timers can also have their next firing time manually
adjusted (see :meth:`.Task.timer`).

A timer is not a real-time mechanism; it fires when the Task's underlying
Engine to which the timer has been added is running and able to check if the
timer firing time has passed.

When a timer fires, the method :meth:`.EventHandler.ev_timer` of the
associated EventHandler is called.

To configure a timer, use the following (secs in seconds with floating point
precision)::

    task.timer(self, fire=secs, handler=handler [, interval=secs])


.. _task-default-worker:

Changing default worker
^^^^^^^^^^^^^^^^^^^^^^^

When calling :meth:`.Task.shell` or :meth:`.Task.copy` the Task object creates
a worker instance for each call. When the *nodes* argument is defined, the
worker class used for these calls is based on Task default *distant_worker*.
Change this value to use another worker class, by example **Rsh**::

    from ClusterShell.Task import task_self
    from ClusterShell.Worker.Rsh import WorkerRsh

    task_self().set_default('distant_worker', WorkerRsh)


Thread safety and Task objects
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


ClusterShell is an event-based library and one of its advantage is to avoid
the use of threads (and their safety issues), so it's mainly not thread-safe.
When possible, avoid the use of threads with ClusterShell. However, it's
sometimes not so easy, first because another library you want to use in some
event handler is not event-based and may block the current thread (that's
enough to break the deal). Also, in some cases, it could be useful for you to
run several Tasks at the same time. Since version 1.1, ClusterShell provides
support for launching a Task in another thread and some experimental support
for multiple Tasks, but:

* you should ensure that a Task is configured and accessed from one thread at
  a time before it's running (there is no API lock/mutex protection),
* once the Task is running, you should modify it only from the same thread
  that owns that Task (for example, you cannot call :meth:`.Task.abort` from
  another thread).

The library provides two thread-safe methods and a function for basic Task
interactions: :meth:`.Task.wait`, :meth:`.Task.join` and
:func:`.Task.task_wait` (function defined at the root of the Task module).
Please refer to the API documentation.

Configuring explicit Shell Worker objects
-----------------------------------------

We have seen in :ref:`taskshell` how to easily submit shell commands to the
Task. The :meth:`.Task.shell` method returns an already scheduled Worker
object. It is possible to instantiate the Worker object explicitly, for
example::

    from ClusterShell.Worker.Ssh import WorkerSsh

    worker = WorkerSsh('node3', command="/bin/echo alright")

To be used in a Task, add the worker to it with::

    task.schedule(worker)

If you have pdsh installed, you can use it by easily switching to the Pdsh
worker, which should behave the same manner as the Ssh worker::

    from ClusterShell.Worker.Pdsh import WorkerPdsh

    worker = WorkerPdsh('node3', command="/bin/echo alright")