File: test_models_legacy.py

package info (click to toggle)
python-pbcommand 2.1.1%2Bgit20231020.28d1635-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,016 kB
  • sloc: python: 7,676; makefile: 220; sh: 73
file content (66 lines) | stat: -rw-r--r-- 1,813 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import tempfile

from pbcommand.models.legacy import Pipeline
from pbcommand.pb_io.common import load_pipeline_interface_from


class TestLegacyModels:

    def test_load_pipeline_from_json(self):
        pipeline_json = """
{
  "_comment": "Automatically generated by pbcromwell.wdl2json",
  "description": "Cromwell workflow dev_diagnostic_subreads",
  "entryPoints": [
    {
      "entryId": "eid_subread",
      "fileTypeId": "PacBio.DataSet.SubreadSet",
      "name": "Entry eid_subread",
      "optional": false
    },
    {
      "entryId": "eid_ref_dataset",
      "fileTypeId": "PacBio.DataSet.ReferenceSet",
      "name": "Entry eid_ref_dataset",
      "optional": true
    }
  ],
  "id": "cromwell.workflows.dev_diagnostic_subreads",
  "name": "SubreadSet Diagnostics Workflow",
  "options": [],
  "schemaVersion": "2.0.0",
  "tags": [
    "dev",
    "cromwell"
  ],
  "taskOptions": [
    {
      "default": 0,
      "description": "Cromwell workflow option exit_code",
      "id": "exit_code",
      "name": "Exit code",
      "optionTypeId": "integer"
    },
    {
      "default": false,
      "description": "Cromwell workflow option emit_warn_alarm",
      "id": "emit_warn_alarm",
      "name": "Emit warn alarm",
      "optionTypeId": "boolean"
    },
    {
      "default": false,
      "description": "Cromwell workflow option emit_error_alarm",
      "id": "emit_error_alarm",
      "name": "Emit error alarm",
      "optionTypeId": "boolean"
    }
  ],
  "version": "0.5.0"
}"""
        json_file = tempfile.NamedTemporaryFile(suffix=".json").name
        with open(json_file, "w") as json_out:
            json_out.write(pipeline_json)
        p = load_pipeline_interface_from(json_file)
        assert p.pipeline_id == "cromwell.workflows.dev_diagnostic_subreads"
        s = p.summary()