1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
import tempfile
from pbcommand.models.legacy import Pipeline
from pbcommand.pb_io.common import load_pipeline_interface_from
class TestLegacyModels:
def test_load_pipeline_from_json(self):
pipeline_json = """
{
"_comment": "Automatically generated by pbcromwell.wdl2json",
"description": "Cromwell workflow dev_diagnostic_subreads",
"entryPoints": [
{
"entryId": "eid_subread",
"fileTypeId": "PacBio.DataSet.SubreadSet",
"name": "Entry eid_subread",
"optional": false
},
{
"entryId": "eid_ref_dataset",
"fileTypeId": "PacBio.DataSet.ReferenceSet",
"name": "Entry eid_ref_dataset",
"optional": true
}
],
"id": "cromwell.workflows.dev_diagnostic_subreads",
"name": "SubreadSet Diagnostics Workflow",
"options": [],
"schemaVersion": "2.0.0",
"tags": [
"dev",
"cromwell"
],
"taskOptions": [
{
"default": 0,
"description": "Cromwell workflow option exit_code",
"id": "exit_code",
"name": "Exit code",
"optionTypeId": "integer"
},
{
"default": false,
"description": "Cromwell workflow option emit_warn_alarm",
"id": "emit_warn_alarm",
"name": "Emit warn alarm",
"optionTypeId": "boolean"
},
{
"default": false,
"description": "Cromwell workflow option emit_error_alarm",
"id": "emit_error_alarm",
"name": "Emit error alarm",
"optionTypeId": "boolean"
}
],
"version": "0.5.0"
}"""
json_file = tempfile.NamedTemporaryFile(suffix=".json").name
with open(json_file, "w") as json_out:
json_out.write(pipeline_json)
p = load_pipeline_interface_from(json_file)
assert p.pipeline_id == "cromwell.workflows.dev_diagnostic_subreads"
s = p.summary()
|