File: test_parallel.py

package info (click to toggle)
cwltool 1.0.20181217162649+dfsg-10
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 2,632 kB
  • sloc: python: 11,008; makefile: 153; sh: 22
file content (33 lines) | stat: -rw-r--r-- 1,265 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import json

from cwltool.context import RuntimeContext
from cwltool.executors import MultithreadedJobExecutor
from cwltool import load_tool

from .util import get_data, get_windows_safe_factory, windows_needs_docker


@windows_needs_docker
def test_sequential_workflow(tmpdir):
    """Run the count-lines workflow under MultithreadedJobExecutor and check its output."""
    # Reset cached loaders so this test starts from a clean loader state.
    load_tool.loaders = {}
    executor = MultithreadedJobExecutor()

    # Route output to pytest's tmpdir and let the executor pick resources.
    runtime_context = RuntimeContext()
    runtime_context.outdir = str(tmpdir)
    runtime_context.select_resources = executor.select_resources

    factory = get_windows_safe_factory(
        executor=executor, runtime_context=runtime_context)
    workflow = factory.make(get_data("tests/wf/count-lines1-wf.cwl"))

    whale_file = {"class": "File",
                  "location": get_data("tests/wf/whale.txt")}
    result = workflow(file1=whale_file)
    assert result == {"count_output": 16}

@windows_needs_docker
def test_scattered_workflow():
    """Run the scatter workflow under MultithreadedJobExecutor and check both scattered outputs."""
    # Reset cached loaders so this test starts from a clean loader state.
    load_tool.loaders = {}
    factory = get_windows_safe_factory(executor=MultithreadedJobExecutor())
    workflow = factory.make(get_data("tests/wf/scatter-wf4.cwl"))

    # Job inputs come from a JSON fixture shipped alongside the workflow.
    with open(get_data("tests/wf/scatter-job2.json")) as job:
        inputs = json.load(job)

    result = workflow(**inputs)
    assert result == {'out': ['foo one three', 'foo two four']}