.. include:: ../../global.inc
.. include:: manual_chapter_numbers.inc

.. index::
    pair: multiprocessing; Tutorial

.. _new_manual.multiprocessing:

####################################################################################################################################################
|new_manual.multiprocessing.chapter_num|: Multiprocessing, ``drmaa`` and Computation Clusters
####################################################################################################################################################


.. seealso::

    * :ref:`Manual Table of Contents <new_manual.table_of_contents>`
    * :ref:`@jobs_limit <decorators.jobs_limit>` syntax
    * :ref:`pipeline_run() <pipeline_functions.pipeline_run>` syntax
    * :ref:`drmaa_wrapper.run_job() <drmaa_wrapper.run_job>` syntax

.. note::

    Remember to look at the example code:

        * :ref:`new_manual.multiprocessing.code`

***********************
Overview
***********************

.. index::
    pair: pipeline_run(multiprocess); Tutorial

=====================
Multi Processing
=====================

    *Ruffus* uses Python `multiprocessing <http://docs.python.org/library/multiprocessing.html>`_ to run
    each job in a separate process.

    This means that jobs do *not* necessarily complete in the order of the defined parameters.
    Task hierarchies are, of course, inviolate: upstream tasks always run before the downstream tasks that depend on them.

    Tasks that are independent (i.e. do not precede each other) may be run in parallel as well.

    The number of concurrent jobs can be set in :ref:`pipeline_run<pipeline_functions.pipeline_run>`:

        ::

            pipeline_run([parallel_task], multiprocess = 5)


    If ``multiprocess`` is set to 1, then jobs will be run in a single process, i.e. one at a time.
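
    The ordering rules above can be pictured as successive "waves": a task becomes runnable once every
    upstream task it depends on has finished, and all tasks runnable at the same moment are independent
    and may run in parallel. The following sketch uses the standard library ``graphlib.TopologicalSorter``
    (Python 3.9 or later) on a hypothetical four-task pipeline. It only illustrates the rule; it is not
    how *Ruffus* schedules jobs internally.

    .. code-block:: python

        from graphlib import TopologicalSorter

        # Hypothetical pipeline: each task maps to the set of upstream tasks it depends on
        PIPELINE = {
            "download": set(),
            "index_reference": set(),
            "align": {"download", "index_reference"},
            "report": {"align"},
        }


        def parallel_waves(dependencies):
            """Group tasks into waves; tasks within one wave are independent of each other."""
            sorter = TopologicalSorter(dependencies)
            sorter.prepare()
            waves = []
            while sorter.is_active():
                ready = sorted(sorter.get_ready())   # tasks whose upstream tasks are all done
                waves.append(ready)
                sorter.done(*ready)                  # pretend the whole wave has finished
            return waves


        if __name__ == "__main__":
            for number, wave in enumerate(parallel_waves(PIPELINE), start=1):
                print(number, wave)

    Here ``download`` and ``index_reference`` could run in parallel, while ``align`` and ``report``
    must wait for their upstream tasks.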



.. index::
    pair: data sharing across processes; Tutorial

=====================
Data sharing
=====================

    Running jobs in separate processes allows *Ruffus* to make full use of the multiple
    processors in modern computers. However, some `multiprocessing guidelines <http://docs.python.org/library/multiprocessing.html#multiprocessing-programming>`_
    should be borne in mind when writing *Ruffus* pipelines. In particular:

    * Try not to pass large amounts of data between jobs, or at least be aware that this has to be marshalled
      across process boundaries.

    * Only data which can be `pickled <http://docs.python.org/library/pickle.html>`_ can be passed as
      parameters to *Ruffus* task functions. Happily, that applies to almost any native Python data type.
      Using one of the rare unpicklable objects will cause Python to fail loudly when *Ruffus* pipelines
      are run.
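
    Whether an object can cross a process boundary can be checked up front with the standard library
    ``pickle`` module. The helpers below are hypothetical conveniences, not part of *Ruffus*:
    ``is_picklable()`` reports whether ``pickle.dumps()`` succeeds, and ``pickled_size()`` gives a rough
    idea of how many bytes would have to be marshalled for a given parameter.

    .. code-block:: python

        import pickle
        import threading


        def is_picklable(obj):
            """Return True if obj can be pickled, i.e. passed to a job in another process."""
            try:
                pickle.dumps(obj)
            except (pickle.PicklingError, TypeError, AttributeError):
                return False
            return True


        def pickled_size(obj):
            """Approximate number of bytes that must cross the process boundary."""
            return len(pickle.dumps(obj))


        if __name__ == "__main__":
            print(is_picklable({"sample": "a.fastq", "lanes": [1, 2, 3]}))  # True
            print(is_picklable(threading.Lock()))                           # False
            print(is_picklable(lambda x: x))                                # False
            print(pickled_size(list(range(100000))))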



.. index::
    pair: @jobs_limit; Tutorial

.. _new_manual.jobs_limit:


********************************************************************************************
Restricting parallelism with :ref:`@jobs_limit <decorators.jobs_limit>`
********************************************************************************************

    Calling :ref:`pipeline_run(multiprocess = NNN)<pipeline_functions.pipeline_run>` allows
    multiple jobs (from multiple independent tasks) to be run in parallel. However, there
    are some operations that consume so many resources that we might want them to run
    with little or no concurrency.

    For example, we might want to download some files via FTP but the server restricts
    requests from each IP address. Even if the rest of the pipeline is running 100 jobs in
    parallel, the FTP downloading must be restricted to 2 files at a time. We would really
    like to keep the pipeline running as is, but let this one operation run either serially,
    or with little concurrency.


    * :ref:`pipeline_run(multiprocess = NNN)<pipeline_functions.pipeline_run>` sets the pipeline-wide concurrency, but
    * :ref:`@jobs_limit(MMM)<decorators.jobs_limit>` limits concurrency to ``MMM`` jobs at a time for the decorated task only.

    The optional name (e.g. ``@jobs_limit(3, "ftp_download_limit")``) allows the same limit to
    be shared across multiple tasks. To be pedantic: a limit of ``3`` jobs at a time would be applied
    across all tasks which have a ``@jobs_limit`` named ``"ftp_download_limit"``.

    The :ref:`example code<new_manual.multiprocessing.code>` uses up to 10 processes across the
    pipeline, but runs the ``stage1_big`` and ``stage1_small`` tasks 3 at a time (shared across
    both tasks). ``stage2`` jobs run 5 at a time.
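
    The effect of a shared, named limit can be sketched with a standard library
    ``threading.BoundedSemaphore``. This is an analogy under stated assumptions, not the way *Ruffus*
    implements :ref:`@jobs_limit <decorators.jobs_limit>`: a thread pool of 10 workers stands in for
    ``multiprocess = 10``, and two hypothetical download tasks both acquire the single semaphore
    registered under ``"ftp_download_limit"``, so at most 3 of their jobs run at any moment.

    .. code-block:: python

        import threading
        import time
        from concurrent.futures import ThreadPoolExecutor


        class PeakCounter:
            """Thread-safe context manager recording the most jobs running at once."""

            def __init__(self):
                self._lock = threading.Lock()
                self.running = 0
                self.peak = 0

            def __enter__(self):
                with self._lock:
                    self.running += 1
                    self.peak = max(self.peak, self.running)
                return self

            def __exit__(self, *exc_info):
                with self._lock:
                    self.running -= 1
                return False      # never swallow exceptions


        def run_with_named_limit(n_big, n_small, pipeline_width=10, shared_limit=3):
            """Run jobs from two hypothetical tasks that share one named limit.

            Returns the peak number of download jobs that ran simultaneously.
            """
            # One semaphore per limit name: every task using the name shares it
            named_limits = {"ftp_download_limit": threading.BoundedSemaphore(shared_limit)}
            limit = named_limits["ftp_download_limit"]
            counter = PeakCounter()

            def big_download():            # hypothetical task 1
                with limit, counter:
                    time.sleep(0.02)       # stand-in for a slow download

            def small_download():          # hypothetical task 2
                with limit, counter:
                    time.sleep(0.01)       # stand-in for a quick download

            with ThreadPoolExecutor(max_workers=pipeline_width) as pool:
                futures = [pool.submit(big_download) for _ in range(n_big)]
                futures += [pool.submit(small_download) for _ in range(n_small)]
                for future in futures:
                    future.result()        # re-raise any exception from a job
            return counter.peak


        if __name__ == "__main__":
            print(run_with_named_limit(n_big=6, n_small=6))

    In a real *Ruffus* pipeline, the same effect comes from decorating both tasks with
    ``@jobs_limit(3, "ftp_download_limit")`` and calling ``pipeline_run(multiprocess = 10)``.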



.. _new_manual.ruffus.drmaa_wrapper.run_job:

********************************************************************************************
Using ``drmaa`` to dispatch work to Computational Clusters or Grid engines from Ruffus jobs
********************************************************************************************

    Ruffus has been widely used to manage work on computational clusters or grid engines. Though Ruffus
    task functions cannot (yet!) run natively and transparently on remote cluster nodes, it is trivial
    to dispatch work across the cluster.

    From version 2.4 onwards, Ruffus includes an optional helper module which interacts with
    `python bindings  <https://github.com/drmaa-python/drmaa-python>`__ for the widely used `drmaa  <http://en.wikipedia.org/wiki/DRMAA>`__
    Open Grid Forum API specification. This allows jobs to dispatch work to a computational cluster and wait until it completes.


    Here are the necessary steps:

==============================================================================
1) Use a shared drmaa session:
==============================================================================

    Before your pipeline runs:

    .. code-block:: python

        #
        #   start shared drmaa session for all jobs / tasks in pipeline
        #
        import drmaa
        drmaa_session = drmaa.Session()
        drmaa_session.initialize()


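    Clean up after your pipeline completes:

    .. code-block:: python

        from ruffus import pipeline_run

        #
        #   pipeline functions go here
        #
        if __name__ == '__main__':
            try:
                # drmaa_wrapper.run_job() requires multithread, not multiprocess
                pipeline_run(multithread = 10)
            finally:
                # close the shared session even if the pipeline raises an error
                drmaa_session.exit()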


==============================================================================
2) import ``ruffus.drmaa_wrapper``
==============================================================================

    * The optional ``ruffus.drmaa_wrapper`` module needs to be imported explicitly:

    .. code-block:: python
        :emphasize-lines: 2

        # import ruffus.drmaa_wrapper explicitly
        from ruffus.drmaa_wrapper import run_job, error_drmaa_job


==============================================================================
3) call :ref:`drmaa_wrapper.run_job()<drmaa_wrapper.run_job>`
==============================================================================

    :ref:`drmaa_wrapper.run_job() <drmaa_wrapper.run_job>` dispatches the work to a cluster node from within a normal Ruffus job and waits for it to complete.

    This is the equivalent of `os.system  <http://docs.python.org/2/library/os.html#os.system>`__  or
    `subprocess.check_output  <http://docs.python.org/2/library/subprocess.html#subprocess.check_output>`__, except that the command runs remotely, as specified:

        .. code-block:: python
            :emphasize-lines: 5

                # job_name, logger, options and job_other_options are assumed to be
                # defined elsewhere in your pipeline script
                try:
                    # ruffus.drmaa_wrapper.run_job
                    stdout_res, stderr_res  = run_job(cmd_str           = "touch " + output_file,
                                                      job_name          = job_name,
                                                      logger            = logger,
                                                      drmaa_session     = drmaa_session,
                                                      run_locally       = options.local_run,
                                                      job_other_options = job_other_options)
                except error_drmaa_job as err:
                    # name the failing command in the error message, then re-raise
                    raise Exception("Failed to run: touch %s\n%s" % (output_file, err))

        The complete code is available :ref:`here <using_ruffus.drmaa_wrapper>`.

    * :ref:`drmaa_wrapper.run_job() <drmaa_wrapper.run_job>` is a convenience wrapper around the `python drmaa bindings <https://github.com/drmaa-python/drmaa-python>`__
      `RunJob <http://drmaa-python.readthedocs.org/en/latest/tutorials.html#waiting-for-a-job>`__ function.
      It takes care of writing drmaa *job templates* for you.
    * Each call creates a separate drmaa *job template*.
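
    For comparison, running the same kind of command string on the local machine with the standard
    library might look like the sketch below. ``run_command_locally()`` is a hypothetical helper, not
    part of ``ruffus.drmaa_wrapper``, and the format of its return value is an assumption; it is not a
    description of what ``run_job()`` returns in ``stdout_res`` and ``stderr_res``.

    .. code-block:: python

        import subprocess


        def run_command_locally(cmd_str):
            """Run a shell command here and return (stdout_lines, stderr_lines).

            Raises subprocess.CalledProcessError if the command exits with a
            non-zero status, much like subprocess.check_output().
            """
            completed = subprocess.run(cmd_str, shell=True, capture_output=True,
                                       text=True, check=True)
            return completed.stdout.splitlines(), completed.stderr.splitlines()


        if __name__ == "__main__":
            stdout_lines, stderr_lines = run_command_locally("echo hello")
            print(stdout_lines, stderr_lines)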

==================================================================================================
4) Use multithread: :ref:`pipeline_run(multithread = NNN) <pipeline_functions.pipeline_run>`
==================================================================================================

    .. warning ::

        :ref:`drmaa_wrapper.run_job()<drmaa_wrapper.run_job>` **requires** ``pipeline_run`` :ref:`(multithread = NNN)<pipeline_functions.pipeline_run>`
        **and will not work with** ``pipeline_run`` :ref:`(multiprocess = NNN)<pipeline_functions.pipeline_run>`.


    Using multithreading rather than multiprocessing
        * allows the drmaa session to be shared
        * prevents "processing storms" which lock up the queue submission node when hundreds or thousands of grid engine / cluster commands complete at the same time.

        .. code-block:: python

            pipeline_run (..., multithread = NNN, ...)

        or if you are using ruffus.cmdline:

        .. code-block:: python

            cmdline.run (options, multithread = options.jobs)


    Normally, multithreading limits parallelism in Python because of the `Global Interpreter Lock (GIL)  <http://en.wikipedia.org/wiki/Global_Interpreter_Lock>`__.
    Here, however, almost all of the work happens on another computer (a cluster / grid engine node) in a separate process, and each local thread spends most of its time simply waiting for its remote job to finish, so the GIL is not a bottleneck.
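
    The point can be checked with a small experiment. In the sketch below, each of 4 threads starts a
    separate Python process that merely sleeps, standing in for a command running on a cluster node.
    Because blocking in ``subprocess.run()`` releases the GIL, the threads wait concurrently and the whole
    batch finishes in roughly the time of one job rather than four. The sleep duration and thread count
    are arbitrary choices for illustration.

    .. code-block:: python

        import subprocess
        import sys
        import time
        from concurrent.futures import ThreadPoolExecutor

        # Stand-in for a remote job: a separate Python process that sleeps for 0.5 s
        SLEEP_CMD = [sys.executable, "-c", "import time; time.sleep(0.5)"]


        def wait_for_external_job(_job_index):
            """Block until the external process exits; raises if it fails."""
            subprocess.run(SLEEP_CMD, check=True)


        def elapsed_with_threads(n_jobs):
            """Run n_jobs external jobs from n_jobs threads; return wall-clock seconds."""
            start = time.perf_counter()
            with ThreadPoolExecutor(max_workers=n_jobs) as pool:
                list(pool.map(wait_for_external_job, range(n_jobs)))  # propagate errors
            return time.perf_counter() - start


        if __name__ == "__main__":
            # Run one after another, 4 jobs would need at least 2 seconds
            print("4 jobs took %.2f s" % elapsed_with_threads(4))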

==================================================================================================
5) Develop locally
==================================================================================================

    :ref:`drmaa_wrapper.run_job() <drmaa_wrapper.run_job>` provides two convenience parameters for developing grid engine pipelines:

    * Commands can be run locally, i.e. on the local machine rather than on cluster nodes:

          .. code-block:: python

              run_job(cmd_str, run_locally = True)

    * Output files can be `touch  <http://en.wikipedia.org/wiki/Touch_(Unix)>`__\ed, i.e. given the appearance of the work having been done without actually running the commands:

          .. code-block:: python

              run_job(cmd_str, touch_only = True)
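
    The idea behind ``touch_only`` can be sketched as follows: instead of running the command, make sure
    each expected output file exists and carries a fresh modification time. ``touch_outputs()`` is a
    hypothetical helper written for illustration; how ``run_job()`` itself decides which files to touch
    is not shown here.

    .. code-block:: python

        import os
        import tempfile
        from pathlib import Path


        def touch_outputs(output_files):
            """Create or refresh each output file without running any command.

            Hypothetical helper: missing parent directories are created, missing
            files are created empty, and existing files only get a new timestamp.
            """
            for name in output_files:
                path = Path(name)
                path.parent.mkdir(parents=True, exist_ok=True)
                path.touch()


        if __name__ == "__main__":
            with tempfile.TemporaryDirectory() as work_dir:
                output = os.path.join(work_dir, "results", "sample1.bam")
                touch_outputs([output])
                print(os.path.exists(output), os.path.getsize(output))  # True 0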


.. index::
    pair: pipeline_run touch mode; Tutorial
    pair: touch mode pipeline_run; Tutorial

.. _new_manual.pipeline_run_touch:


********************************************************************************************
Forcing a pipeline to appear up to date
********************************************************************************************

    Sometimes, we *know* that a pipeline has run to completion and that everything is up-to-date. However, on the basis
    of file modification times, Ruffus still insists that you need to rerun.

    For example, sometimes a trivial accounting modification needs to be made to a data file.
    Even though you know that this changes nothing in practice, Ruffus will detect the modification and
    ask to rerun everything from that point forwards.

    One way to convince Ruffus that everything is fine is to manually `touch  <http://en.wikipedia.org/wiki/Touch_(Unix)>`__
    all subsequent data files one by one in sequence so that the file timestamps follow the appropriate progression.
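
    Touching files one by one may not produce a clear progression on filesystems with coarse (e.g.
    one-second) timestamps. A minimal sketch, assuming you pass the modified file followed by every
    downstream file in pipeline order, sets strictly increasing modification times explicitly with
    ``os.utime()``. ``touch_in_sequence()`` is a hypothetical helper, not part of *Ruffus*.

    .. code-block:: python

        import os
        import tempfile
        import time


        def touch_in_sequence(paths, step_seconds=2.0):
            """Give files (in pipeline order) strictly increasing modification times.

            The last file is stamped with the current time and each earlier file
            step_seconds before the next one. Missing files are created empty.
            """
            now = time.time()
            n_files = len(paths)
            for i, path in enumerate(paths):
                if not os.path.exists(path):
                    open(path, "a").close()
                stamp = now - (n_files - 1 - i) * step_seconds
                os.utime(path, (stamp, stamp))   # (access time, modification time)


        if __name__ == "__main__":
            with tempfile.TemporaryDirectory() as work_dir:
                files = [os.path.join(work_dir, name)
                         for name in ("edited.txt", "step1.out", "step2.out")]
                touch_in_sequence(files)
                for path in files:
                    print(os.path.basename(path), os.path.getmtime(path))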

    You can also ask *Ruffus* to do this automatically for you by running the pipeline in `touch  <http://en.wikipedia.org/wiki/Touch_(Unix)>`__
    mode:

        .. code-block:: python

            pipeline_run(touch_files_only = True)


    :ref:`pipeline_run <pipeline_functions.pipeline_run>` will run your pipeline script normally, working backwards from any specified final target, or else from the
    last task in the pipeline. It works out where it should begin running, i.e. with the first out-of-date data files.
    After that point, instead of calling your pipeline task functions, it `touches  <http://en.wikipedia.org/wiki/Touch_(Unix)>`__
    each missing or out-of-date file in turn so that the file modification dates follow on successively.
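
    Deciding which files are out of date follows the familiar ``make``-style rule. A simplified sketch of
    that rule, based only on modification times, is shown below; ``is_out_of_date()`` is hypothetical
    and ignores any other information *Ruffus* may take into account. It assumes all input files exist.

    .. code-block:: python

        import os
        import tempfile


        def is_out_of_date(input_files, output_files):
            """Simplified timestamp check: should this job be rerun?

            Out of date if there are no outputs to check, any output is missing,
            or the newest input is newer than the oldest output.
            """
            if not output_files or not all(os.path.exists(f) for f in output_files):
                return True
            if not input_files:
                return False
            newest_input = max(os.path.getmtime(f) for f in input_files)
            oldest_output = min(os.path.getmtime(f) for f in output_files)
            return newest_input > oldest_output


        if __name__ == "__main__":
            with tempfile.TemporaryDirectory() as work_dir:
                source = os.path.join(work_dir, "input.txt")
                result = os.path.join(work_dir, "output.txt")
                for path, stamp in ((source, 1000), (result, 2000)):
                    open(path, "w").close()
                    os.utime(path, (stamp, stamp))
                print(is_out_of_date([source], [result]))   # False: output is newer
                os.utime(source, (3000, 3000))               # simulate editing the input
                print(is_out_of_date([source], [result]))   # True: input is now newer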


    This turns out to be a useful way to check that your pipeline runs correctly, by creating a series of dummy (empty) files.
    However, *Ruffus* cannot read your mind: it does not know which files to create for :ref:`@split <decorators.split>` or
    :ref:`@subdivide <decorators.subdivide>` tasks.


    If you use :ref:`ruffus.cmdline <new_manual.cmdline>` (from version 2.4), you can simply specify:

        .. code-block:: bash

            python your_script.py --touch_files_only [--other_options_of_your_own_etc]