1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
import argparse
import os
import sys
from collections.abc import Iterator
from typing import cast
import ruamel.yaml
import cwl_utils.parser as cwl
from cwl_utils.image_puller import (
DockerImagePuller,
ImagePuller,
SingularityImagePuller,
)
def arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the image-saving tool.

    The parser accepts one positional ``input`` (the CWL document) plus the
    optional ``--dir``, ``-s/--singularity``, ``--container-engine`` and
    ``--force-download`` switches.
    """
    summary = (
        "Save container images specified in a CWL document (Workflow or CommandLineTool). "
        "For CWL Workflows, all steps will also be searched (recursively)."
    )
    cli = argparse.ArgumentParser(description=summary)

    # Positional: the document to inspect.
    cli.add_argument(
        "input",
        help="Input CWL document (CWL Workflow or CWL CommandLineTool)",
    )

    # Where pulled images are stored.
    cli.add_argument("--dir", help="Directory in which to save images")

    # Boolean switch selecting the Singularity puller.
    cli.add_argument(
        "-s",
        "--singularity",
        action="store_true",
        help="Use singularity to pull the image",
    )

    # Optional override of the container command; stays None when not given.
    engine_help = (
        "Specify which command to use to run OCI containers. "
        "Defaults to 'docker' (or 'singularity' if --singularity/-s is passed)."
    )
    cli.add_argument(
        "--container-engine",
        dest="container_engine",
        help=engine_help,
    )

    cli.add_argument(
        "--force-download",
        action="store_true",
        help="Force pulling a newer container.",
    )
    return cli
def run(args: argparse.Namespace) -> list[cwl.DockerRequirement]:
    """Extract the docker reqs and download them using Singularity or Docker.

    :param args: parsed command-line options; the attributes ``input``,
        ``dir``, ``singularity``, ``container_engine`` and ``force_download``
        are read.
    :return: every DockerRequirement yielded by ``traverse`` for the input
        document, including those skipped for lack of ``dockerPull``.
    :raises SystemExit: with status 1 when ``--singularity`` is requested
        without ``--dir``.
    """
    if args.singularity and not args.dir:
        # A fatal usage error is a diagnostic, so it goes to stderr.
        print("Error! Must specify --dir if using --singularity", file=sys.stderr)
        sys.exit(1)

    if args.dir:
        os.makedirs(args.dir, exist_ok=True)

    top = cwl.load_document_by_uri(args.input)

    # The puller class and the engine command depend only on the CLI options,
    # so they are chosen once instead of once per requirement.
    if args.singularity:
        puller_class: type[ImagePuller] = SingularityImagePuller
        default_engine = "singularity"
    else:
        puller_class = DockerImagePuller
        default_engine = "docker"
    engine = (
        args.container_engine if args.container_engine is not None else default_engine
    )

    reqs: list[cwl.DockerRequirement] = []
    yaml = None  # created on first use, then shared by all diagnostics
    for req in traverse(top):
        reqs.append(req)
        if not req.dockerPull:
            print(
                "Unable to save image due to lack of 'dockerPull':",
                file=sys.stderr,
            )
            if yaml is None:
                yaml = ruamel.yaml.YAML()
            # Show the offending requirement so the user can locate it.
            yaml.dump(req.save(), sys.stderr)
            continue
        image_puller = puller_class(
            req.dockerPull,
            args.dir,
            engine,
            args.force_download,
        )
        image_puller.save_docker_image()
    return reqs
def extract_docker_requirements(
    process: cwl.Process,
) -> Iterator[cwl.DockerRequirement]:
    """Yield the docker reqs of *process*, normalizing the pull request.

    Every requirement produced by ``extract_docker_reqs`` is yielded. When
    its ``dockerPull`` is a non-empty string whose last path component has no
    tag or digest, ``:latest`` is appended to ``dockerPull`` in place first.

    :param process: the CWL process whose requirements and hints are searched.
    :return: an iterator over the (possibly updated) DockerRequirement objects.
    """
    for req in extract_docker_reqs(process):
        image = req.dockerPull
        # An empty string is left alone so that callers testing
        # ``not req.dockerPull`` still see it as missing.
        if isinstance(image, str) and image:
            # A tag or digest can only appear in the last path component; a
            # colon earlier in the reference is a registry port, as in
            # "localhost:5000/tool".
            last_component = image.rsplit("/", 1)[-1]
            if ":" not in last_component:
                req.dockerPull = image + ":latest"
        yield req
def extract_docker_reqs(process: cwl.Process) -> Iterator[cwl.DockerRequirement]:
    """Yield the DockerRequirement entries declared on the given process.

    Entries found under ``process.requirements`` come first, followed by
    those under ``process.hints``; a missing or empty collection contributes
    nothing.
    """
    # ``or ()`` turns an absent/empty collection into an empty iteration.
    yield from (
        entry
        for entry in (process.requirements or ())
        if isinstance(entry, cwl.DockerRequirementTypes)
    )
    yield from (
        entry
        for entry in (process.hints or ())
        if isinstance(entry, cwl.DockerRequirementTypes)
    )
def traverse(process: cwl.Process) -> Iterator[cwl.DockerRequirement]:
    """Yield the docker reqs of a process, including any workflow steps.

    The process's own requirements are produced first; if the process is a
    workflow, the requirements found through its steps follow.
    """
    yield from extract_docker_requirements(process)
    if not isinstance(process, cwl.WorkflowTypes):
        # Not a workflow: there are no steps to descend into.
        return
    yield from traverse_workflow(process)
def get_process_from_step(step: cwl.WorkflowStep) -> cwl.Process:
    """Return the process that a workflow step runs.

    When ``step.run`` is a string it is handed to
    ``cwl.load_document_by_uri`` and the loaded document is returned;
    otherwise ``step.run`` itself is returned.
    """
    target = step.run
    if isinstance(target, str):
        target = cwl.load_document_by_uri(target)
    return cast(cwl.Process, target)
def traverse_workflow(workflow: cwl.Workflow) -> Iterator[cwl.DockerRequirement]:
    """Yield the docker reqs reachable through each step of this workflow.

    For every step, the requirements declared on the step itself come first,
    followed by everything ``traverse`` finds in the process the step runs.
    """
    for wf_step in workflow.steps:
        # Requirements attached directly to the step.
        yield from extract_docker_reqs(wf_step)
        # Then descend into whatever the step executes.
        step_process = get_process_from_step(wf_step)
        yield from traverse(step_process)
def main() -> int:
    """Parse ``sys.argv`` and run the tool; return 0 as the exit status."""
    cli_args = arg_parser().parse_args(sys.argv[1:])
    run(cli_args)
    return 0
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|