|
.. include:: ../../global.inc
.. include:: manual_chapter_numbers.inc
.. index::
    pair: split; Tutorial

.. _new_manual.split:
######################################################################################################
|new_manual.split.chapter_num|: Splitting up large tasks / files with **@split**
######################################################################################################

.. seealso::

    * :ref:`Manual Table of Contents <new_manual.table_of_contents>`
    * :ref:`@split <decorators.split>` syntax
    * :ref:`Example code for this chapter <new_manual.split.code>`

**************************************************************************************
Overview
**************************************************************************************

A common requirement in computational pipelines is to split up a large task into
smaller jobs that can be run on different processors (or sent to a computational
cluster). Very often, the number of jobs depends dynamically on the size of the
task and cannot be known beforehand.

*Ruffus* uses the :ref:`@split <decorators.split>` decorator to indicate that
the :term:`task` function will produce an indeterminate number of independent *Outputs* from a single *Input*.

**************************************************************************************
Example: Calculate variance for a large list of numbers in parallel
**************************************************************************************

Suppose we want to calculate the `variance <http://en.wikipedia.org/wiki/Variance>`__ of
100,000 numbers. How can we parallelise the calculation so that we get an answer as
speedily as possible?

We need to:

* break down the problem into manageable chunks,
* solve these in parallel, possibly on a computational cluster, and then
* merge the partial solutions back together for a final result.

To complicate things, we usually do not want to hard-code the number of parallel chunks beforehand.
The degree of parallelism is often only apparent as we process our data.
**Ruffus** was designed to solve such problems which are common, for example, in bioinformatics and genomics.
A flowchart for our variance problem might look like this:

.. image:: ../../images/manual_split_merge_example.jpg
   :scale: 30

(In this toy example, we create our own starting data in ``create_random_numbers()``.)
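Why does splitting the numbers into chunks work at all? Because the (population) variance can be recovered from per-chunk partial sums alone: ``Var(x) = E[x**2] - (E[x])**2``. The following is a minimal pure-Python sketch of the split/compute/merge idea, independent of Ruffus; the chunk size and helper names are illustrative assumptions, not part of the chapter's example code.

```python
import random


def partial_sums(chunk):
    """Per-chunk work: return (sum, sum of squares, count)."""
    return sum(chunk), sum(x * x for x in chunk), len(chunk)


def merged_variance(partials):
    """Merge step: population variance from per-chunk partial sums."""
    total = sum(s for s, _, _ in partials)
    total_sq = sum(sq for _, sq, _ in partials)
    n = sum(c for _, _, c in partials)
    mean = total / n
    return total_sq / n - mean * mean


random.seed(0)
numbers = [random.random() for _ in range(1000)]

# Split into 10 chunks; the per-chunk work is what could run in
# parallel, and merged_variance() is the final combining step.
chunks = [numbers[i:i + 100] for i in range(0, len(numbers), 100)]
variance = merged_variance([partial_sums(c) for c in chunks])
```

Because only three numbers per chunk need to be merged, the combining step stays cheap no matter how many chunks the data is split into.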

**************************************************************************************
Output files for :ref:`@split <decorators.split>`
**************************************************************************************

The *Ruffus* decorator :ref:`@split<decorators.split>` is designed specifically with this run-time flexibility in mind:

.. code-block:: python

    @split(create_random_numbers, "*.chunks")
    def split_problem(input_file_names, output_files):
        pass

This will split the incoming ``input_file_names`` into ``NNN`` *Outputs*, where ``NNN`` is not predetermined.

The *Output* (second) parameter of :ref:`@split<decorators.split>` often contains a |glob|_ pattern like the ``*.chunks`` above.

Only **after** the task function has completed will Ruffus match the **Output** parameter (``*.chunks``)
against the files which have been created by ``split_problem()`` (e.g. ``1.chunks``, ``2.chunks``, ``3.chunks``).

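A standalone sketch of the chunk-writing logic such a split task might contain (``write_chunk_files`` is a hypothetical helper, and the one-value-per-line format and ``chunk_size`` are assumptions of this sketch, not part of Ruffus):

```python
import glob
import os


def write_chunk_files(input_file_name, chunk_size=100, dest_dir="."):
    """Split input_file_name into numbered NNN.chunks files of
    chunk_size lines each, deleting stale chunk files first."""
    # clean up chunk files left over from previous runs
    for stale in glob.glob(os.path.join(dest_dir, "*.chunks")):
        os.unlink(stale)
    with open(input_file_name) as src:
        lines = src.readlines()
    # one output file per chunk_size lines: 1.chunks, 2.chunks, ...
    for start in range(0, len(lines), chunk_size):
        chunk_name = "%d.chunks" % (start // chunk_size + 1)
        with open(os.path.join(dest_dir, chunk_name), "w") as out:
            out.writelines(lines[start:start + chunk_size])
```

With a 250-line input and ``chunk_size=100`` this produces ``1.chunks``, ``2.chunks`` and ``3.chunks``; only after the task returns does Ruffus match ``*.chunks`` against them.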

**************************************************************************************
Be careful in specifying **Output** globs
**************************************************************************************

Note that it is your responsibility to keep the **Output** specification tight enough so that Ruffus does not
pick up extraneous files.
You can specify multiple |glob|_ patterns to match *all* the files which are the
result of the splitting task function. These can even cover different directories,
or groups of file names. This is a more extreme example:

::

    @split("input.file", ['a*.bits', 'b*.pieces', 'somewhere_else/c*.stuff'])
    def split_function(input_filename, output_files):
        "Code to split up 'input.file'"


**************************************************************************************
Clean up previous pipeline runs
**************************************************************************************

Problems arise when the current directory contains results of previous pipeline runs.

* For example, if the previous analysis involved a large data set, there might be 3 chunks: ``1.chunks``, ``2.chunks``, ``3.chunks``.
* In the current analysis, there might be a smaller data set which divides into only 2 chunks, ``1.chunks`` and ``2.chunks``.
* Unfortunately, ``3.chunks`` from the previous run is still hanging around and will be included erroneously by the glob ``*.chunks``.

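The hazard is easy to reproduce with plain ``glob`` (a self-contained toy demonstration, not Ruffus itself):

```python
import glob
import os
import tempfile

work_dir = tempfile.mkdtemp()

# A previous (larger) analysis left three chunk files behind.
for name in ("1.chunks", "2.chunks", "3.chunks"):
    open(os.path.join(work_dir, name), "w").close()

# The current (smaller) analysis writes only two chunks -- but
# without a cleanup step, the stale 3.chunks survives from before.
for name in ("1.chunks", "2.chunks"):
    open(os.path.join(work_dir, name), "w").close()

matched = sorted(os.path.basename(p)
                 for p in glob.glob(os.path.join(work_dir, "*.chunks")))
# matched includes the stale file, which would be passed erroneously
# to all downstream tasks.
```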
.. warning::

    **Your first duty in** :ref:`@split <decorators.split>` **task functions should be to clean up.**

To help you clean up thoroughly, Ruffus initialises the **Output** parameter to all files which match the specification.
The first order of business is thus invariably to clean up, i.e. delete (with ``os.unlink``) all files in **Output**.

.. code-block:: python
    :emphasize-lines: 11

    #---------------------------------------------------------------
    #
    #   split initial file
    #
    @split(create_random_numbers, "*.chunks")
    def split_problem(input_file_names, output_files):
        """
        splits random numbers file into XXX files of chunk_size each
        """
        #
        #   clean up any files from previous runs
        #
        #for ff in glob.glob("*.chunks"):
        for ff in output_files:
            os.unlink(ff)

(The first time you run the example code, ``*.chunks`` will initialise ``output_files`` to an empty list.)

.. _new_manual.split.one_to_many:

**************************************************************************************
1 to many
**************************************************************************************

:ref:`@split <decorators.split>` is a **one-to-many** operator because its
outputs are a list of *independent* items.
If :ref:`@split <decorators.split>` generates 5 files, this will lead to 5 jobs downstream.

This means we can simply connect our old friend :ref:`@transform <decorators.transform>` to our pipeline,
and the results of :ref:`@split <decorators.split>` will be analysed in parallel. This code should look
familiar:

.. code-block:: python

    #---------------------------------------------------------------
    #
    #   Calculate sum and sum of squares for each chunk file
    #
    @transform(split_problem, suffix(".chunks"), ".sums")
    def sum_of_squares(input_file_name, output_file_name):
        pass

Which results in output like this:

.. code-block:: pycon

    >>> pipeline_run()

        Job = [[random_numbers.list] -> *.chunks] completed
        Completed Task = split_problem
        Job = [1.chunks -> 1.sums] completed
        Job = [10.chunks -> 10.sums] completed
        Job = [2.chunks -> 2.sums] completed
        Job = [3.chunks -> 3.sums] completed
        Job = [4.chunks -> 4.sums] completed
        Job = [5.chunks -> 5.sums] completed
        Job = [6.chunks -> 6.sums] completed
        Job = [7.chunks -> 7.sums] completed
        Job = [8.chunks -> 8.sums] completed
        Job = [9.chunks -> 9.sums] completed
        Completed Task = sum_of_squares

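The ``sum_of_squares()`` stub above leaves the body as ``pass``. A sketch of what it might write, assuming (purely for illustration) one number per line in each ``.chunks`` file and a single ``sum sum_of_squares count`` output line:

```python
def sum_of_squares(input_file_name, output_file_name):
    """Write 'sum sum_of_squares count' for one chunk of numbers."""
    with open(input_file_name) as chunk:
        values = [float(line) for line in chunk if line.strip()]
    with open(output_file_name, "w") as out:
        out.write("%g %g %d\n" % (sum(values),
                                  sum(v * v for v in values),
                                  len(values)))
```

These per-chunk partial results are exactly what a downstream merging task needs to compute the final variance.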
Have a look at the :ref:`Example code for this chapter <new_manual.split.code>`.

.. _new_manual.split.nothing_to_many:

**************************************************************************************
Nothing to many
**************************************************************************************

Normally we would use :ref:`@originate <new_manual.originate>` to create files from
scratch, for example at the beginning of the pipeline.
However, sometimes it is not possible to determine ahead of time how many files you
will be creating from scratch. :ref:`@split <decorators.split>` can be useful even in such cases:

.. code-block:: python
    :emphasize-lines: 6

    from random import randint
    from ruffus import *
    import os

    #   create between 2 and 5 files
    @split(None, "*.start")
    def create_initial_files(no_input_file, output_files):
        # clean up files from previous runs first
        for oo in output_files:
            os.unlink(oo)
        # make new files
        for ii in range(randint(2, 5)):
            open("%d.start" % ii, "w").close()

    @transform(create_initial_files, suffix(".start"), ".processed")
    def process_files(input_file, output_file):
        open(output_file, "w").close()

    pipeline_run()

Giving:

.. code-block:: pycon

    >>> pipeline_run()

        Job = [None -> *.start] completed
        Completed Task = create_initial_files
        Job = [0.start -> 0.processed] completed
        Job = [1.start -> 1.processed] completed
        Completed Task = process_files

|