.. include:: ../../global.inc
.. include:: manual_chapter_numbers.inc
.. index::
    pair: Up to date; Tutorial
    pair: Task completion; Tutorial
    pair: Exceptions; Tutorial
    pair: Interrupted Pipeline; Tutorial
.. _new_manual.checkpointing:
######################################################################################################
|new_manual.checkpointing.chapter_num|: Checkpointing: Interrupted Pipelines and Exceptions
######################################################################################################
.. seealso::

    * :ref:`Manual Table of Contents <new_manual.table_of_contents>`

.. note::

    Remember to look at the example code:

    * :ref:`new_manual.checkpointing.code`
***************************************
Overview
***************************************
.. image:: ../../images/theoretical_pipeline_schematic.png
    :scale: 50
Computational pipelines transform your data in stages until the final result is produced.
By default, *Ruffus* uses file modification times for the **Input** and **Output** files to determine
whether each stage of a pipeline is up to date. But what happens when a task
function is interrupted, whether from the command line or by an error, halfway through writing its output?
In this case, the half-formed, truncated and corrupt **Output** file will look newer than its **Input** and hence appear up to date.
.. index::
    pair: Tutorial; interrupting tasks
.. _new_manual.interrupting_tasks:
***************************************
Interrupting tasks
***************************************
Let us try with an example:
.. code-block:: python
    :emphasize-lines: 19

    from ruffus import *
    import sys, time

    # create initial files
    @originate(['job1.start'])
    def create_initial_files(output_file):
        with open(output_file, "w") as oo: pass

    #---------------------------------------------------------------
    #
    #   long task to interrupt
    #
    @transform(create_initial_files, suffix(".start"), ".output")
    def long_task(input_files, output_file):
        with open(output_file, "w") as ff:
            ff.write("Unfinished...")
            # sleep for 2 seconds here so you can interrupt me
            sys.stderr.write("Job started. Press ^C to interrupt me now...\n")
            time.sleep(2)
            ff.write("\nFinished")
            sys.stderr.write("Job completed.\n")

    # Run
    pipeline_run([long_task])
When this script runs, it pauses in the middle with this message::

    Job started. Press ^C to interrupt me now...

If you interrupt the script by pressing Control-C at this point, you will see that ``job1.output`` contains only ``Unfinished...``.
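You can confirm the truncated state of the file yourself (a minimal check, assuming the interrupted run left ``job1.output`` behind):

.. code-block:: pycon

    >>> open("job1.output").read()
    'Unfinished...'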
However, when you rerun the interrupted pipeline, *Ruffus* ignores the corrupt, incomplete file and runs the job again:
.. code-block:: pycon

    >>> pipeline_run([long_task])
    Job started. Press ^C to interrupt me now...
    Job completed.
And if you had run ``pipeline_printout``:
.. code-block:: pycon
    :emphasize-lines: 8

    >>> pipeline_printout(sys.stdout, [long_task], verbose=3)

    ________________________________________
    Tasks which will be run:
    Task = long_task
           Job  = [job1.start
                 -> job1.output]
             # Job needs update: Previous incomplete run leftover: [job1.output]
    ________________________________________
We can see that *Ruffus* magically knows that the previous run was incomplete, and that ``job1.output`` is detritus that needs to be discarded.
.. _new_manual.logging_completed_jobs:
******************************************
Checkpointing: only log completed jobs
******************************************
All is revealed if you look in the working directory: *Ruffus* has created a file called ``.ruffus_history.sqlite``.
In this `SQLite <https://sqlite.org/>`_ database, *Ruffus* logs only those files which are the result of a completed job;
all other files are suspect.
This file checkpoint database is a fail-safe, not a substitute for checking file modification times. If the **Input** or **Output** files are
modified, the pipeline will rerun.
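If you are curious, you can peek inside the checkpoint database with Python's standard ``sqlite3`` module. The internal schema is a *Ruffus* implementation detail and may change between versions, so this minimal sketch only lists whatever tables happen to be present:

.. code-block:: python

    import sqlite3

    # open the checkpoint database that Ruffus created in the working directory
    connection = sqlite3.connect(".ruffus_history.sqlite")

    # the table layout is an implementation detail; just list what is there
    for (table_name,) in connection.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table'"):
        print(table_name)

    connection.close()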
By default, *Ruffus* saves only file timestamps to the SQLite database, but you can also add a checksum of the pipeline task function body or parameters.
This behaviour can be controlled by setting the ``checksum_level`` parameter
in ``pipeline_run()``. For example, if you do not want to save any timestamps or checksums:
.. code-block:: python

    pipeline_run(checksum_level = 0)

    CHECKSUM_FILE_TIMESTAMPS      = 0     # only rerun when the file timestamps are out of date (classic mode)
    CHECKSUM_HISTORY_TIMESTAMPS   = 1     # Default: also rerun when the history shows a job as being out of date
    CHECKSUM_FUNCTIONS            = 2     # also rerun when the function body has changed
    CHECKSUM_FUNCTIONS_AND_PARAMS = 3     # also rerun when the function parameters or function body change
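Rather than a bare integer, you can pass one of the named constants shown above. A minimal sketch, assuming the constants can be imported from the top-level ``ruffus`` namespace (in some versions they are defined in ``ruffus.ruffus_utility``):

.. code-block:: python

    from ruffus import pipeline_run, CHECKSUM_FUNCTIONS_AND_PARAMS

    # rerun jobs when timestamps, the function body, or its parameters change
    pipeline_run([long_task], checksum_level = CHECKSUM_FUNCTIONS_AND_PARAMS)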
.. note::

    Checksums are calculated from the `pickled <http://docs.python.org/2/library/pickle.html>`_ string for the function code and parameters.

    If pickling fails, Ruffus will degrade gracefully to saving just the timestamp in the SQLite database.
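To give a flavour of how such a checksum might be derived (a rough illustration only, emphatically not *Ruffus*'s actual code; the helper name is invented):

.. code-block:: python

    import hashlib
    import pickle

    def job_checksum(task_function, params):
        # Illustrative: hash the pickled function bytecode and job parameters.
        # If anything refuses to pickle, degrade to timestamp-only behaviour.
        try:
            pickled = pickle.dumps((task_function.__code__.co_code, params))
            return hashlib.md5(pickled).hexdigest()
        except Exception:
            return None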
.. _new_manual.history_files_cannot_be_shared:
****************************************************************************
Do not share the same checkpoint file across multiple pipelines!
****************************************************************************
The name of the Ruffus python script is not saved in the checkpoint file alongside the timestamps and checksums.
That means that you can rename your pipeline source code file without having to rerun the pipeline!
The tradeoff is that if multiple pipelines run from the same directory, save their histories to the
same SQLite database file, and have overlapping file names (all of these are bad ideas anyway!), this is
bound to be a source of confusion.
Luckily, the name and path of the checkpoint file can also be changed for each pipeline.
.. _new_manual.changing_history_file_name:
****************************************************************************
Setting checkpoint file names
****************************************************************************
.. warning::

    Some file systems do not appear to support SQLite at all:
    there are reports that SQLite databases have `file locking problems <http://beets.radbox.org/blog/sqlite-nightmare.html>`_ on Lustre.

    The best solution is to keep the SQLite database on a compatible file system away from the working directory if possible.
============================================================================================================================================================
environment variable ``DEFAULT_RUFFUS_HISTORY_FILE``
============================================================================================================================================================
The name of the checkpoint file is taken from the environment variable ``DEFAULT_RUFFUS_HISTORY_FILE``.

.. code-block:: bash

    export DEFAULT_RUFFUS_HISTORY_FILE=/some/where/.ruffus_history.sqlite
This gives considerable flexibility and allows a system-wide policy to be set, so that all Ruffus checkpoint files are kept in logical, predictable locations.
.. note::

    It is your responsibility to make sure that the requisite destination directories for the checkpoint files exist beforehand!

    If this environment variable is not set, the checkpoint file defaults to ``.ruffus_history.sqlite`` in your working directory.
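One way to guard against a missing destination directory (a minimal sketch; the path below is just an example) is to create it before the pipeline runs:

.. code-block:: python

    import os

    # example path: wherever DEFAULT_RUFFUS_HISTORY_FILE points
    history_file = "/some/where/.ruffus_history.sqlite"

    # make sure the destination directory exists before Ruffus opens the database
    os.makedirs(os.path.dirname(history_file), exist_ok=True)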
============================================================================================================================================================
Setting the checkpoint file name manually
============================================================================================================================================================
This checkpoint file name can always be overridden as a parameter to Ruffus functions:
.. code-block:: python

    pipeline_run(history_file = "XXX")
    pipeline_printout(history_file = "XXX")
    pipeline_printout_graph(history_file = "XXX")
There is also built-in support in ``Ruffus.cmdline``. If you use this module, you can simply add to your command line:
.. code-block:: bash

    # use a custom checkpoint file
    myscript --checksum_file_name .myscript.ruffus_history.sqlite
This takes precedence over everything else.
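For reference, a minimal ``Ruffus.cmdline`` script that accepts this option looks something like the following sketch (the pipeline itself is elided):

.. code-block:: python

    from ruffus import *
    import ruffus.cmdline as cmdline

    parser = cmdline.get_argparse(description = "My pipeline")
    options = parser.parse_args()

    # ... task definitions go here ...

    # cmdline.run() picks up --checksum_file_name among the other options
    cmdline.run(options)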
****************************************************************************
Useful checkpoint file name policies using ``DEFAULT_RUFFUS_HISTORY_FILE``
****************************************************************************
If the pipeline script is called ``test/bin/scripts/run.me.py``, then these are the resulting checkpoint file locations:
============================================================================================================================================================
Example 1: same directory, different name
============================================================================================================================================================
If the environment variable is:
.. code-block:: bash

    export DEFAULT_RUFFUS_HISTORY_FILE=.{basename}.ruffus_history.sqlite

Then the job checkpoint database for ``run.me.py`` will be ``.run.me.ruffus_history.sqlite``, alongside the script:

.. code-block:: bash

    /test/bin/scripts/run.me.py
    /test/bin/scripts/.run.me.ruffus_history.sqlite
============================================================================================================================================================
Example 2: Different directory, same name
============================================================================================================================================================
.. code-block:: bash

    export DEFAULT_RUFFUS_HISTORY_FILE=/common/path/for/job_history/.{basename}.ruffus_history.sqlite

Then the checkpoint file will be:

.. code-block:: bash

    /common/path/for/job_history/.run.me.ruffus_history.sqlite
============================================================================================================================================================
Example 3: Different directory, same name, but keep one level of subdirectory to disambiguate
============================================================================================================================================================
.. code-block:: bash

    export DEFAULT_RUFFUS_HISTORY_FILE=/common/path/for/job_history/{subdir[0]}/.{basename}.ruffus_history.sqlite

Then the checkpoint file will be:

.. code-block:: bash

    /common/path/for/job_history/scripts/.run.me.ruffus_history.sqlite
============================================================================================================================================================
Example 4: Nested in a common directory
============================================================================================================================================================
.. code-block:: bash

    export DEFAULT_RUFFUS_HISTORY_FILE=/common/path/for/job_history/{path}/.{basename}.ruffus_history.sqlite

Then the checkpoint file will be:

.. code-block:: bash

    /common/path/for/job_history/test/bin/scripts/.run.me.ruffus_history.sqlite
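To make the substitutions concrete, here is a hypothetical helper (not *Ruffus*'s actual implementation; the function name and the exact semantics of ``{path}``, ``{basename}`` and ``{subdir}`` are inferred from the examples above):

.. code-block:: python

    import os

    def expand_history_template(template, script_path):
        # {path}     : the script's directory (leading separator stripped here
        #              so the examples above concatenate cleanly)
        # {basename} : the script's file name, minus its final extension
        # {subdir}   : directories of the path, innermost first
        directory = os.path.dirname(os.path.abspath(script_path))
        basename = os.path.splitext(os.path.basename(script_path))[0]
        subdir = list(reversed(directory.strip(os.sep).split(os.sep)))
        return template.format(path = directory.lstrip(os.sep),
                               basename = basename,
                               subdir = subdir)

    print(expand_history_template(
        "/common/path/for/job_history/{subdir[0]}/.{basename}.ruffus_history.sqlite",
        "/test/bin/scripts/run.me.py"))
    # prints: /common/path/for/job_history/scripts/.run.me.ruffus_history.sqlite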
.. index::
    pair: Tutorial; Regenerating the checkpoint file
.. _new_manual.regenerating_history_file:
******************************************************************************
Regenerating the checkpoint file
******************************************************************************
Occasionally you may need to re-generate the checkpoint file.
This could be necessary:
* because you are upgrading from a previous version of Ruffus without checkpoint file support
* on the rare occasions when the SQLite file becomes corrupted and has to be deleted
* if you wish to circumvent the file checking of Ruffus after making some manual changes!
To do this, it is only necessary to call ``pipeline_run`` appropriately:
.. code-block:: python

    CHECKSUM_REGENERATE = 2

    pipeline_run(touch_files_only = CHECKSUM_REGENERATE)
Similarly, if you are using ``Ruffus.cmdline``, you can call:
.. code-block:: bash

    myscript --recreate_database
Note that this regenerates the checkpoint file to reflect the existing **Input** and **Output** files on disk.
In other words, the onus is on you to make sure there are no half-formed, corrupt files. On the other hand,
the pipeline does not need to have been run successfully before for this to work. Essentially, *Ruffus*
pretends to run the pipeline, logging all files with consistent file modification times, and stopping
at the first task which appears out of date or incomplete.
.. index::
    pair: rules; for rerunning jobs
.. _new_manual.skip_up_to_date.rules:
******************************************************************************
Rules for determining if files are up to date
******************************************************************************
The following simple rules are used by *Ruffus*; rule 2 is illustrated in the sketch below.

#. The pipeline stage will be rerun if:

    * any of the **Input** files are new (newer than the **Output** files)
    * any of the **Output** files are missing

#. In addition, it is possible to run jobs which create files from scratch:

    * if no **Input** file names are supplied, the job will only run if any **Output** file is missing.

#. Finally, if no **Output** file names are supplied, the job will always run.
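For example, here is a minimal sketch of rule 2 (the file and task names are invented for illustration): a job with no **Input** reruns only when its **Output** is missing.

.. code-block:: python

    from ruffus import *

    # no Input files: this job runs only when "from_scratch.txt" is missing
    @originate("from_scratch.txt")
    def make_from_scratch(output_file):
        with open(output_file, "w") as oo:
            oo.write("made from scratch\n")

    pipeline_run([make_from_scratch])   # first run: creates the file
    pipeline_run([make_from_scratch])   # second run: up to date, job is skipped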
.. index::
    pair: Exception; Missing input files
******************************************************************************
Missing files generate exceptions
******************************************************************************
If the **Input** files for a job are missing, the task function will have no way
to produce its **Output**. In this case, a ``MissingInputFileError`` exception will be raised
automatically. For example,

::

    task.MissingInputFileError: No way to run job: Input file ['a.1'] does not exist
    for Job = ["a.1" -> "a.2", "A file"]
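A minimal way to provoke this exception (hypothetical file names; in practice Ruffus re-raises it inside its job error wrapper):

.. code-block:: python

    from ruffus import *

    # "a.1" does not exist and no upstream task creates it
    @transform("a.1", suffix(".1"), ".2")
    def copy_file(input_file, output_file):
        open(output_file, "w").close()

    pipeline_run([copy_file])   # raises: Input file ['a.1'] does not exist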
.. index::
    pair: Manual; Timestamp resolution
******************************************************************************
Caveats: Coarse Timestamp resolution
******************************************************************************
Note that modification times have a precision to the nearest second under some older file systems
(ext2/ext3?). This may also be true for networked file systems.
*Ruffus* supplements the file system time resolution by independently recording the timestamp at
full OS resolution (usually to at least the millisecond) at job completion, when presumably the **Output**
files will have been created.
However, *Ruffus* only does this if the discrepancy between file time and system time is less than a second
(presumably due to poor file system timestamp resolution). If there are large mismatches between the two, for example due
to network time slippage or misconfiguration, *Ruffus* reverts to using the file system time and adds a one second
delay between jobs (via ``time.sleep()``) to make sure input and output file stamps are different.
If you know that your file system has coarse-grained timestamp resolution, you can always revert to this very conservative behaviour,
at the price of some annoying 1s pauses, by setting :ref:`pipeline_run(one_second_per_job = True) <pipeline_functions.pipeline_run>`.
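For example, reusing ``long_task`` from the start of this chapter:

.. code-block:: python

    # force a one second pause between jobs on coarse-timestamp file systems
    pipeline_run([long_task], one_second_per_job = True)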
.. index::
    pair: Manual; flag files
******************************************************************************
Flag files: Checkpointing for the paranoid
******************************************************************************
One other way of checkpointing your pipelines is to create an extra "flag" file as an additional
**Output** file name. The flag file is only created or updated when everything else in the
job has completed successfully and been written to disk. A missing or out of date flag file would
then be a sign to Ruffus that the task never completed properly in the first place.

This used to be by far the best way of performing checkpointing in Ruffus, and it is still
the most bulletproof way of proceeding. For example, even the loss or corruption
of the checkpoint file would not affect things greatly.

Nevertheless, flag files are largely superfluous in modern *Ruffus*. The pattern is sketched below for completeness.
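A minimal sketch of the pattern, reusing ``create_initial_files`` from the example at the start of this chapter (the ``.done`` suffix and the task name are invented for illustration):

.. code-block:: python

    from ruffus import *

    # both the real output and the ".done" flag are declared as Outputs,
    # so a missing or stale flag marks the job as incomplete
    @transform(create_initial_files, suffix(".start"), [".output", ".done"])
    def long_task_with_flag(input_file, output_files):
        real_output, flag_file = output_files
        with open(real_output, "w") as ff:
            ff.write("Finished\n")
        # touch the flag only after the real output is safely on disk
        open(flag_file, "w").close()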