Task management
===============
.. highlight:: python
.. _class-Task:
Structure of Task
-----------------
A ClusterShell *Task* and its underlying *Engine* class are the fundamental
infrastructure associated with a thread. An *Engine* implements an event
processing loop that you use to schedule work and coordinate the receipt of
incoming events. The purpose of this run loop is to keep your thread busy when
there is work to do and put your thread to sleep when there is none. When
calling the :meth:`.Task.resume()` or :meth:`.Task.run()` methods, your thread
enters the Task Engine run loop and calls installed event handlers in response
to incoming events.
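
The run loop idea described above can be illustrated with a small toy. This is only a sketch and not ClusterShell's actual *Engine*: the ``ToyEngine`` class, its ``install``/``post``/``run`` methods and the in-memory queue used as the event source are all assumptions made for illustration.

```python
import queue


class ToyEngine:
    """Toy run loop (NOT ClusterShell's Engine, illustration only)."""

    def __init__(self):
        self._events = queue.Queue()
        self._handlers = {}

    def install(self, name, handler):
        """Register the callable invoked for events called `name`."""
        self._handlers[name] = handler

    def post(self, name, payload=None):
        """Queue an event for the run loop."""
        self._events.put((name, payload))

    def run(self):
        """Dispatch events to installed handlers until a 'stop' event."""
        while True:
            # get() blocks, so the thread sleeps while there is no work.
            name, payload = self._events.get()
            if name == "stop":
                break
            handler = self._handlers.get(name)
            if handler is not None:
                handler(payload)


received = []
engine = ToyEngine()
engine.install("read", received.append)
engine.post("read", "node1: ok")
engine.post("read", "node2: ok")
engine.post("stop")
engine.run()
print(received)
```

The calling thread stays inside ``run()`` until the loop ends, which mirrors how the thread is described as entering the Task Engine run loop.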
Using Task objects
------------------
A *Task* object provides the main interface for adding shell commands, file
copies, or timers, and then running them. Every thread has a single *Task* object
(and underlying *Engine* object) associated with it. The *Task* object is an
instance of the :class:`.Task` class.
Getting a Task object
^^^^^^^^^^^^^^^^^^^^^
To get the *Task* object bound to the **current thread**, you use one of the following:
* Use the :func:`.Task.task_self()` function available at the root of the Task
  module
* or use ``task = Task()``; Task objects are only instantiated when needed.
Example of getting the current task object::
    >>> from ClusterShell.Task import task_self
    >>> task = task_self()
So for a single-threaded application, a Task is a simple singleton (whose
instance is also available through :func:`.Task.task_self()`).
To get the *Task* object associated with a specific thread identified by the
identifier *tid*, you use the following::
    >>> from ClusterShell.Task import Task
    >>> task = Task(thread_id=tid)
.. _class-Task-configure:
Configuring the Task object
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Each *Task* provides an info dictionary that shares both internal
*Task*-specific parameters and user-defined (key, value) parameters. Use the
following :class:`.Task` methods to get or set parameters:
* :meth:`.Task.info`
* :meth:`.Task.set_info`
For example, to configure the task debugging behavior::
    >>> task.set_info('debug', True)
    >>> task.info('debug')
    True
You can also use the *Task* info dictionary to set your own *Task*-specific
key, value pairs. You may use any free keys but only keys starting with
*USER_* are guaranteed not to be used by ClusterShell in the future.
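
As a minimal sketch of the *USER_* convention, the two helpers below prefix every user key before storing or reading it. The helper names ``set_user_info`` and ``get_user_info`` are hypothetical and not part of ClusterShell; they only rely on the ``set_info`` and ``info`` methods listed above, and assume that ``info`` accepts a default value as its second argument.

```python
def set_user_info(task, **values):
    """Store each keyword as a 'USER_'-prefixed key in the task info dict."""
    for name, value in values.items():
        task.set_info("USER_" + name, value)


def get_user_info(task, name, default=None):
    """Read back a 'USER_'-prefixed key, or `default` when it is unset."""
    return task.info("USER_" + name, default)


# Typical use (requires ClusterShell to be installed):
#   from ClusterShell.Task import task_self
#   set_user_info(task_self(), campaign="nightly")
#   get_user_info(task_self(), "campaign")
```

Keeping the prefix in one place avoids accidentally choosing a key that a later ClusterShell release might reserve.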
Task info keys and their default values:
+-----------------+----------------+------------------------------------+
| Info key string | Default value  | Comment                            |
+=================+================+====================================+
| debug           | False          | Enable debugging support (boolean) |
+-----------------+----------------+------------------------------------+
| print_debug     | internal using | Default is to print debug lines to |
|                 | *print*        | stdout using *print*. To override  |
|                 |                | this behavior, set a function that |
|                 |                | takes two arguments (the task      |
|                 |                | object and a string) as the value. |
+-----------------+----------------+------------------------------------+
| fanout          | 64             | Ssh *fanout* window (integer)      |
+-----------------+----------------+------------------------------------+
| connect_timeout | 10             | Value passed to ssh or pdsh        |
|                 |                | (integer)                          |
+-----------------+----------------+------------------------------------+
| command_timeout | 0 (no timeout) | Value passed to ssh or pdsh        |
|                 |                | (integer)                          |
+-----------------+----------------+------------------------------------+
Below is an example of `print_debug` override. As you can see, we set the
function `print_csdebug(task, s)` as the value. When debugging is enabled,
this function will be called for any debug text line. For example, this
function searches for any known patterns and prints a modified debug line to
stdout when found::
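    import re
    from ClusterShell.Task import task_self

    def print_csdebug(task, s):
        # Look for a known pattern in the debug line
        m = re.search(r"(\w+): SHINE:\d:(\w+):", s)
        if m:
            # Print a modified line when the pattern is found
            print("%s<pickle>" % m.group(0))
        else:
            print(s)

    # Install the new debug printing function
    task_self().set_info("print_debug", print_csdebug)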
.. _taskshell:
Submitting a shell command
^^^^^^^^^^^^^^^^^^^^^^^^^^
You can submit a set of commands for local or distant execution in parallel
with :meth:`.Task.shell`.
Local usage::
    task.shell(command [, key=key] [, handler=handler] [, timeout=secs])
Distant usage::
    task.shell(command, nodes=nodeset [, handler=handler] [, timeout=secs])
This method makes use of the default local or distant worker. ClusterShell
uses a default Worker based on the Python Popen2 standard module to execute
local commands, and a Worker based on *ssh* (Secure SHell) for distant
commands.
If the Task is not running, the command is scheduled for later execution. If
the Task is currently running, the command is executed as soon as possible
(depending on the current *fanout*).
To set a per-worker (i.e. per-command) timeout value, just use the timeout
parameter (in seconds), for example::
    task.shell("uname -r", nodes=remote_nodes, handler=ehandler, timeout=5)
This is the preferred way to specify a command timeout.
An :meth:`.EventHandler.ev_timeout` event is generated before the worker has
finished to indicate that some nodes have timed out. You may then retrieve the
timed-out nodes with :meth:`.DistantWorker.iter_keys_timeout()`.
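
A hedged sketch of an event handler that records timed-out nodes follows. The class name ``TimeoutCollector`` is hypothetical, and the ``ev_timeout(self, worker)`` signature is an assumption based on the method named above; timeout reporting may differ between ClusterShell releases, so check the EventHandler reference for the version you use. The import fallback only exists so the sketch can be read and exercised without ClusterShell installed.

```python
try:
    from ClusterShell.Event import EventHandler
except ImportError:
    # Fallback so the sketch can be exercised without ClusterShell.
    EventHandler = object


class TimeoutCollector(EventHandler):
    """Remember which nodes hit the per-worker timeout."""

    def __init__(self):
        super().__init__()
        self.timed_out = []

    def ev_timeout(self, worker):
        # iter_keys_timeout() yields the keys (nodes) that timed out.
        self.timed_out.extend(worker.iter_keys_timeout())


# Typical use (requires ClusterShell and reachable nodes):
#   collector = TimeoutCollector()
#   task.shell("uname -r", nodes=remote_nodes, handler=collector, timeout=5)
#   task.resume()
#   print(collector.timed_out)
```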
Submitting a file copy action
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Local file copy to distant nodes is supported. You can submit a copy action
with :meth:`.Task.copy`::
    task.copy(source, dest, nodes=nodeset [, handler=handler] [, timeout=secs])
This method makes use of the default distant copy worker, which is based on
scp (Secure CoPy), shipped with OpenSSH.
If the Task is not running, the copy is scheduled for later execution. If the
Task is currently running, the copy is started as soon as possible (depending
on the current *fanout*).
Starting the Task
^^^^^^^^^^^^^^^^^
Before you run a Task, you must add at least one worker (shell command, file
copy) or timer to it. If a Task does not have any worker to execute and
monitor, it exits immediately when you try to run it with::
    task.resume()
At that point, all previously submitted commands will start in the associated
Task thread. From the library user's point of view, the task thread is blocked
until all command executions have ended.

Please note that the special method :meth:`.Task.run` does a
:meth:`.Task.shell` and a :meth:`.Task.resume` in a single call.

To set a Task execution timeout, use the optional *timeout* parameter of
:meth:`.Task.resume` to set the timeout value in seconds. If the Task is still
running once this time has elapsed, it raises a ``TimeoutError`` exception and
cleans up all scheduled workers and timers. Using such a timeout ensures that
the Task will not exceed a given time for all its scheduled work. You can also
configure a per-worker timeout, which generates an
:meth:`.EventHandler.ev_timeout` event but does not raise an exception, allowing
the Task to continue. Indeed, using a per-worker timeout is the preferred way
for most applications.
Getting Task results
^^^^^^^^^^^^^^^^^^^^
After the task is finished (after :meth:`.Task.resume` or :meth:`.Task.run`)
or after a worker is completed when you have previously defined an event
handler (at :meth:`.EventHandler.ev_close`), you can use Task result getters:
* :meth:`.Task.iter_buffers`
* :meth:`.Task.iter_errors`
* :meth:`.Task.node_buffer`
* :meth:`.Task.node_error`
* :meth:`.Task.max_retcode`
* :meth:`.Task.num_timeout`
* :meth:`.Task.iter_keys_timeout`
Note: *buffer* refers to standard output, *error* to standard error.
Please see some examples in :ref:`prog-examples`.
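
As a rough sketch of how the getters above can be combined, the hypothetical helper ``summarize`` below groups standard output by content and returns the highest return code. It assumes that :meth:`.Task.iter_buffers` yields ``(buffer, nodes)`` pairs and that a buffer is either text or convertible with ``bytes()``; verify both points against the API documentation of your ClusterShell release.

```python
def summarize(task):
    """Return (outputs, worst_rc) for a finished task.

    `outputs` maps each distinct stdout text to the sorted list of nodes
    that produced it.
    """
    outputs = {}
    for buf, nodes in task.iter_buffers():
        # Assumption: buffers are text, bytes, or support bytes(buf).
        if isinstance(buf, str):
            text = buf
        else:
            text = bytes(buf).decode(errors="replace")
        outputs[text] = sorted(nodes)
    return outputs, task.max_retcode()


# Typical use (requires ClusterShell and reachable nodes):
#   task.run("uname -r", nodes=remote_nodes)
#   outputs, worst_rc = summarize(task)
```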
Exiting the Task
^^^^^^^^^^^^^^^^
If a Task no longer has any scheduled worker or timer (for example, if you
run one shell command and then it closes), it exits automatically from
:meth:`.Task.resume`. Still, except from a signal handler, you can always call
the following method to abort the Task execution:
* :meth:`.Task.abort`
For example, it is safe to call this method from an event handler within the
task itself. On abort, all scheduled workers (shell command, file copy) and
timers are cleaned and :meth:`.Task.resume` returns, unblocking the Task
thread from the library user's point of view. Please note that commands being
executed remotely are not necessarily stopped (this is due to *ssh(1)*
behavior).
.. _configuring-a-timer:
Configuring a Timer
^^^^^^^^^^^^^^^^^^^
A timer is bound to a Task (and its underlying Engine) and fires at a preset
time in the future. Timers can fire either only once or repeatedly at fixed
time intervals. Repeating timers can also have their next firing time manually
adjusted (see :meth:`.Task.timer`).
A timer is not a real-time mechanism; it fires when the Task's underlying
Engine to which the timer has been added is running and able to check if the
timer firing time has passed.
When a timer fires, the method :meth:`.EventHandler.ev_timer` of the
associated EventHandler is called.
To configure a timer, use the following (secs in seconds with floating point
precision)::
    task.timer(fire=secs, handler=handler [, interval=secs])
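
The following sketch shows one possible handler for a repeating timer: it counts firings and cancels the timer after a given number. The class name ``CountdownHandler`` and its ``limit`` parameter are hypothetical; the sketch assumes the timer object passed to ``ev_timer`` provides an ``invalidate()`` method. The import fallback only exists so the sketch can be exercised without ClusterShell installed.

```python
try:
    from ClusterShell.Event import EventHandler
except ImportError:
    # Fallback so the sketch can be exercised without ClusterShell.
    EventHandler = object


class CountdownHandler(EventHandler):
    """Let a repeating timer fire `limit` times, then cancel it."""

    def __init__(self, limit):
        super().__init__()
        self.limit = limit
        self.fired = 0

    def ev_timer(self, timer):
        self.fired += 1
        if self.fired >= self.limit:
            # Assumption: invalidate() stops further firing of this timer.
            timer.invalidate()


# Typical use (requires ClusterShell to be installed):
#   task.timer(fire=1.0, handler=CountdownHandler(3), interval=2.0)
#   task.resume()
```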
.. _task-default-worker:
Changing default worker
^^^^^^^^^^^^^^^^^^^^^^^
When calling :meth:`.Task.shell` or :meth:`.Task.copy` the Task object creates
a worker instance for each call. When the *nodes* argument is defined, the
worker class used for these calls is based on Task default *distant_worker*.
Change this value to use another worker class, for example **Rsh**::
    from ClusterShell.Task import task_self
    from ClusterShell.Worker.Rsh import WorkerRsh
    task_self().set_default('distant_worker', WorkerRsh)
Thread safety and Task objects
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ClusterShell is an event-based library and one of its advantages is to avoid
the use of threads (and their safety issues), so it is mostly not thread-safe.
When possible, avoid the use of threads with ClusterShell. However, this is
not always easy: another library you want to use in an event handler may not
be event-based and may block the current thread (which is enough to break the
model). Also, in some cases, it could be useful for you to
run several Tasks at the same time. Since version 1.1, ClusterShell provides
support for launching a Task in another thread and some experimental support
for multiple Tasks, but:
* you should ensure that a Task is configured and accessed from one thread at
  a time before it's running (there is no API lock/mutex protection),
* once the Task is running, you should modify it only from the same thread
  that owns that Task (for example, you cannot call :meth:`.Task.abort` from
  another thread).
The library provides two thread-safe methods and a function for basic Task
interactions: :meth:`.Task.wait`, :meth:`.Task.join` and
:func:`.Task.task_wait` (function defined at the root of the Task module).
Please refer to the API documentation.
Configuring explicit Shell Worker objects
-----------------------------------------
We have seen in :ref:`taskshell` how to easily submit shell commands to the
Task. The :meth:`.Task.shell` method returns an already scheduled Worker
object. It is possible to instantiate the Worker object explicitly, for
example::
    from ClusterShell.Worker.Ssh import WorkerSsh
    worker = WorkerSsh('node3', command="/bin/echo alright")
To be used in a Task, add the worker to it with::
    task.schedule(worker)
If you have pdsh installed, you can easily switch to the Pdsh worker, which
should behave in the same manner as the Ssh worker::
    from ClusterShell.Worker.Pdsh import WorkerPdsh
    worker = WorkerPdsh('node3', command="/bin/echo alright")