|
.. include:: ../../global.inc
.. include:: manual_chapter_numbers.inc
.. index::
    pair: multiprocessing; Tutorial

.. _new_manual.multiprocessing:

####################################################################################################################################################
|new_manual.multiprocessing.chapter_num|: Multiprocessing, ``drmaa`` and Computation Clusters
####################################################################################################################################################
.. seealso::

    * :ref:`Manual Table of Contents <new_manual.table_of_contents>`
    * :ref:`@jobs_limit <decorators.jobs_limit>` syntax
    * :ref:`pipeline_run() <pipeline_functions.pipeline_run>` syntax
    * :ref:`drmaa_wrapper.run_job() <drmaa_wrapper.run_job>` syntax

.. note::

    Remember to look at the example code:

    * :ref:`new_manual.multiprocessing.code`

***********************
Overview
***********************
.. index::
    pair: pipeline_run(multiprocess); Tutorial

=====================
Multi Processing
=====================
*Ruffus* uses python `multiprocessing <http://docs.python.org/library/multiprocessing.html>`_ to run
each job in a separate process.

This means that jobs do *not* necessarily complete in the order of the defined parameters.
Task hierarchies are, of course, inviolate: upstream tasks always run before downstream, dependent tasks.

Tasks that are independent of one another (i.e. do not precede each other) may run in parallel as well.

The number of concurrent jobs can be set in :ref:`pipeline_run() <pipeline_functions.pipeline_run>`:

::

    pipeline_run([parallel_task], multiprocess = 5)

If ``multiprocess`` is set to 1, then jobs will be run in a single process, i.e. sequentially.
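
As a minimal, self-contained sketch (the file patterns and task bodies here are illustrative placeholders):

.. code-block:: python

    from ruffus import transform, suffix, pipeline_run

    # two independent tasks: their jobs may be interleaved across processes
    @transform("*.input_a", suffix(".input_a"), ".output_a")
    def task_a(input_file, output_file):
        open(output_file, "w").close()

    @transform("*.input_b", suffix(".input_b"), ".output_b")
    def task_b(input_file, output_file):
        open(output_file, "w").close()

    # run up to 5 jobs at a time, each in a separate process
    pipeline_run([task_a, task_b], multiprocess = 5)
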
.. index::
    pair: data sharing across processes; Tutorial

=====================
Data sharing
=====================
Running jobs in separate processes allows *Ruffus* to make full use of the multiple
processors in modern computers. However, some `multiprocessing guidelines <http://docs.python.org/library/multiprocessing.html#multiprocessing-programming>`_
should be borne in mind when writing *Ruffus* pipelines. In particular:

* Try not to pass large amounts of data between jobs, or at least be aware that it has to be
  marshalled across process boundaries.
* Only data which can be `pickled <http://docs.python.org/library/pickle.html>`_ can be passed as
  parameters to *Ruffus* task functions. Happily, that covers almost any native Python data type.
  The rare unpicklable object will cause Python to complain (fail) loudly when *Ruffus* pipelines
  are run.
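
If you are unsure whether a particular parameter will survive the trip across process boundaries, a quick (purely illustrative) check is to try pickling it yourself:

.. code-block:: python

    import pickle

    # plain data types (strings, lists, dicts, ...) pickle happily
    pickle.dumps(["input.txt", "output.txt", {"threads": 4}])

    # open file handles, database connections etc. do not:
    # pass file *names* to task functions instead
    try:
        pickle.dumps(open(__file__))
    except TypeError as err:
        print("unpicklable parameter:", err)
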
.. index::
    pair: @jobs_limit; Tutorial

.. _new_manual.jobs_limit:

********************************************************************************************
Restricting parallelism with :ref:`@jobs_limit <decorators.jobs_limit>`
********************************************************************************************
Calling :ref:`pipeline_run(multiprocess = NNN) <pipeline_functions.pipeline_run>` allows
multiple jobs (from multiple independent tasks) to be run in parallel. However, there
are some operations that consume so many resources that we might want them to run
with reduced or no concurrency.

For example, we might want to download some files via FTP but the server restricts the
number of simultaneous requests from each IP address. Even if the rest of the pipeline is
running 100 jobs in parallel, the FTP downloads must be restricted to 2 files at a time.
We would really like to keep the rest of the pipeline running as is, but let this one
operation run either serially or with little concurrency.
* :ref:`pipeline_run(multiprocess = NNN) <pipeline_functions.pipeline_run>` sets the pipeline-wide concurrency, but
* :ref:`@jobs_limit(MMM) <decorators.jobs_limit>` sets the concurrency to ``MMM`` only for jobs in the decorated task.

The optional name (e.g. ``@jobs_limit(3, "ftp_download_limit")``) allows the same limit to
be shared across multiple tasks. To be pedantic: a limit of ``3`` jobs at a time would be applied
across all tasks which have a ``@jobs_limit`` named ``"ftp_download_limit"``.

The :ref:`example code <new_manual.multiprocessing.code>` uses up to 10 processes across the
pipeline, but runs the ``stage1_big`` and ``stage1_small`` tasks 3 at a time (shared across
both tasks), as in the sketch below. ``stage2`` jobs run 5 at a time.
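
A sketch of how the shared limit might be declared (the task names follow the example code; the bodies are placeholders):

.. code-block:: python

    from ruffus import originate, jobs_limit, pipeline_run

    # at most 3 concurrent jobs *summed* across both stage1 tasks,
    # because they share the same named limit
    @jobs_limit(3, "ftp_download_limit")
    @originate(["big.file.1", "big.file.2", "big.file.3"])
    def stage1_big(output_file):
        open(output_file, "w").close()

    @jobs_limit(3, "ftp_download_limit")
    @originate(["small.file.1", "small.file.2"])
    def stage1_small(output_file):
        open(output_file, "w").close()

    # the rest of the pipeline can still use up to 10 processes
    pipeline_run([stage1_big, stage1_small], multiprocess = 10)
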
.. _new_manual.ruffus.drmaa_wrapper.run_job:
********************************************************************************************
Using ``drmaa`` to dispatch work to Computational Clusters or Grid engines from Ruffus jobs
********************************************************************************************
Ruffus has been widely used to manage work on computational clusters or grid engines. Though Ruffus
task functions cannot (yet!) run natively and transparently on remote cluster nodes, it is trivial
to dispatch work across the cluster.

From version 2.4 onwards, Ruffus includes an optional helper module which interacts with the
`python bindings <https://github.com/drmaa-python/drmaa-python>`__ for the widely used `drmaa <http://en.wikipedia.org/wiki/DRMAA>`__
Open Grid Forum API specification. This allows jobs to dispatch work to a computational cluster and wait until it completes.

Here are the necessary steps:

==============================================================================
1) Use a shared drmaa session:
==============================================================================
Before your pipeline runs:

.. code-block:: python

    #
    #   start shared drmaa session for all jobs / tasks in pipeline
    #
    import drmaa
    drmaa_session = drmaa.Session()
    drmaa_session.initialize()

Clean up after your pipeline completes:

.. code-block:: python

    #
    #   pipeline functions go here
    #
    if __name__ == '__main__':
        # run the pipeline (e.g. pipeline_run(...)) before
        # closing the shared session
        drmaa_session.exit()

==============================================================================
2) import ``ruffus.drmaa_wrapper``
==============================================================================
* The optional ``ruffus.drmaa_wrapper`` module needs to be imported explicitly:

  .. code-block:: python
      :emphasize-lines: 1

      # import ruffus.drmaa_wrapper explicitly
      from ruffus.drmaa_wrapper import run_job, error_drmaa_job

==============================================================================
3) call :ref:`drmaa_wrapper.run_job()<drmaa_wrapper.run_job>`
==============================================================================
:ref:`drmaa_wrapper.run_job() <drmaa_wrapper.run_job>` dispatches the work to a cluster node from within a normal Ruffus job and waits for completion.

This is the equivalent of `os.system <http://docs.python.org/2/library/os.html#os.system>`__ or
`subprocess.check_output <http://docs.python.org/2/library/subprocess.html#subprocess.check_output>`__ but the command will run remotely as specified:

.. code-block:: python
    :emphasize-lines: 1

    # ruffus.drmaa_wrapper.run_job
    stdout_res, stderr_res = run_job(cmd_str           = "touch " + output_file,
                                     job_name          = job_name,
                                     logger            = logger,
                                     drmaa_session     = drmaa_session,
                                     run_locally       = options.local_run,
                                     job_other_options = job_other_options)

The complete code is available :ref:`here <using_ruffus.drmaa_wrapper>`.

* :ref:`drmaa_wrapper.run_job() <drmaa_wrapper.run_job>` is a convenience wrapper around the `python drmaa bindings <https://github.com/drmaa-python/drmaa-python>`__
  `RunJob <http://drmaa-python.readthedocs.org/en/latest/tutorials.html#waiting-for-a-job>`__ function.
  It takes care of writing drmaa *job templates* for you.
* Each call creates a separate drmaa *job template*.
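
Putting the pieces together, a complete task dispatching to the cluster might look like the following sketch. The session setup follows step 1; the queue name in ``job_other_options`` is an illustrative Grid Engine option which you should adjust for your own scheduler:

.. code-block:: python

    from ruffus import transform, suffix, pipeline_run
    from ruffus.drmaa_wrapper import run_job, error_drmaa_job
    import drmaa

    drmaa_session = drmaa.Session()
    drmaa_session.initialize()

    @transform("*.input", suffix(".input"), ".output")
    def run_on_cluster(input_file, output_file):
        try:
            stdout_res, stderr_res = run_job(
                cmd_str           = "touch " + output_file,
                job_name          = "touch_job",
                drmaa_session     = drmaa_session,
                # illustrative Grid Engine queue option
                job_other_options = "-q all.q")
        # relay failures on the cluster node back to the pipeline
        except error_drmaa_job as err:
            raise Exception("Failed to run:\n%s" % str(err))

    if __name__ == '__main__':
        # note multithread, not multiprocess: see step 4 below
        pipeline_run([run_on_cluster], multithread = 5)
        drmaa_session.exit()
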
==================================================================================================
4) Use multithread: :ref:`pipeline_run(multithread = NNN) <pipeline_functions.pipeline_run>`
==================================================================================================
.. warning::

    :ref:`drmaa_wrapper.run_job() <drmaa_wrapper.run_job>`
    **requires** ``pipeline_run`` :ref:`(multithread = NNN) <pipeline_functions.pipeline_run>`
    **and will not work with** ``pipeline_run`` :ref:`(multiprocess = NNN) <pipeline_functions.pipeline_run>`.

Using multithreading rather than multiprocessing:

* allows the drmaa session to be shared
* prevents "processing storms" which lock up the queue submission node when hundreds or thousands of grid engine / cluster commands complete at the same time.

.. code-block:: python

    pipeline_run (..., multithread = NNN, ...)

or, if you are using ``ruffus.cmdline``:

.. code-block:: python

    cmdline.run (options, multithread = options.jobs)

Normally, multithreading reduces the amount of parallelism in python due to the python `Global Interpreter Lock (GIL) <http://en.wikipedia.org/wiki/Global_Interpreter_Lock>`__.
However, as the workload is almost entirely on another computer (i.e. a cluster / grid engine node) with a separate python interpreter, any cost-benefit calculations of this sort are moot.

==================================================================================================
5) Develop locally
==================================================================================================
:ref:`drmaa_wrapper.run_job() <drmaa_wrapper.run_job>` provides two convenience parameters for developing grid engine pipelines:

* Commands can be run locally, i.e. on the local machine rather than on cluster nodes:

  .. code-block:: python

      run_job(cmd_str, run_locally = True)

* Output files can be `touch <http://en.wikipedia.org/wiki/Touch_(Unix)>`__\ ed, i.e. given the appearance of the work having been done, without actually running the commands:

  .. code-block:: python

      run_job(cmd_str, touch_only = True)
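
In practice, you might expose both switches on your pipeline's command line, so the same task code runs on the cluster in production but locally (or as a dry run) during development. A sketch, using hypothetical ``--local_run`` and ``--touch_only`` flags added to the standard ``ruffus.cmdline`` parser:

.. code-block:: python

    from ruffus import cmdline

    parser = cmdline.get_argparse(description = "my pipeline")

    # hypothetical development switches
    parser.add_argument("--local_run",  action = "store_true")
    parser.add_argument("--touch_only", action = "store_true")
    options = parser.parse_args()

    # ... then, inside a task function:
    stdout_res, stderr_res = run_job(cmd_str,
                                     run_locally   = options.local_run,
                                     touch_only    = options.touch_only,
                                     drmaa_session = drmaa_session)
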
.. index::
    pair: pipeline_run touch mode; Tutorial
    pair: touch mode pipeline_run; Tutorial

.. _new_manual.pipeline_run_touch:

********************************************************************************************
Forcing a pipeline to appear up to date
********************************************************************************************
Sometimes, we *know* that a pipeline has run to completion and that everything is up to date. However, on the basis
of file modification times, Ruffus still insists that parts of the pipeline need to be rerun.

For example, sometimes a trivial accounting modification needs to be made to a data file.
Even though you know that this changes nothing in practice, Ruffus will detect the modification and
ask to rerun everything from that point forwards.

One way to convince Ruffus that everything is fine is to manually `touch <http://en.wikipedia.org/wiki/Touch_(Unix)>`__
all subsequent data files one by one, in sequence, so that the file timestamps follow the appropriate progression.

You can also ask *Ruffus* to do this automatically for you by running the pipeline in `touch <http://en.wikipedia.org/wiki/Touch_(Unix)>`__
mode:

.. code-block:: python

    pipeline_run(touch_files_only = True)

:ref:`pipeline_run <pipeline_functions.pipeline_run>` will run your pipeline script normally, working backwards from any specified final target, or else the
last task in the pipeline. It works out where it should begin running, i.e. with the first out-of-date data files.

After that point, instead of calling your pipeline task functions, each missing or out-of-date file is
`touch-ed <http://en.wikipedia.org/wiki/Touch_(Unix)>`__ in turn so that the file modification dates
follow on successively.

This turns out to be a useful way of checking that your pipeline runs correctly, by creating a series of dummy (empty) files.
However, *Ruffus* cannot read your mind to know which files would be created by :ref:`@split <decorators.split>` or
:ref:`@subdivide <decorators.subdivide>` tasks.
Using :ref:`ruffus.cmdline <new_manual.cmdline>` from version 2.4, you can just specify:

.. code-block:: bash

    your script --touch_files_only [--other_options_of_your_own_etc]
|