1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
|
#!/usr/bin/python3
"""
Utility to obtain paths to important analysis files from SMRT Link jobs,
compatible with multiple applications and versions.
"""
import logging
import os
import os.path as op
import sys
from pbcommand.models.common import FileTypes
from pbcommand.services._service_access_layer import get_smrtlink_client, run_client_with_retry
from pbcommand.utils import setup_log
from pbcommand.cli import get_default_argparser_with_base_opts, pacbio_args_runner
log = logging.getLogger(__name__)
__version__ = "0.1.1"
class ResolverFailure(Exception):
    """Raised when the requested resource cannot be located for a job."""
class UnknownResourceTypeError(KeyError, ValueError):
    """
    Raised by ResourceTypes.from_string for an unrecognized type name.

    Subclasses KeyError so existing callers that catch KeyError keep working.
    It also subclasses ValueError so that argparse, when from_string is used
    as an argument ``type``, reports a usage error (exit status 2) instead of
    letting the exception escape as a traceback.
    """


class ResourceTypes:
    """String identifiers for the resources this tool can resolve."""
    JOB_PATH = "path"
    ALIGNMENTS = "alignments"
    PREASSEMBLY = "preassembly"
    POLISHED_ASSEMBLY = "polished-assembly"
    MAPPING_STATS = "mapping-stats"
    SUBREADS_ENTRY = "subreads-in"
    # Every supported resource type, in display order.
    ALL = [JOB_PATH, ALIGNMENTS, PREASSEMBLY,
           POLISHED_ASSEMBLY, MAPPING_STATS, SUBREADS_ENTRY]

    @staticmethod
    def from_string(s):
        """
        Validate a resource type name.

        :param s: candidate resource type string.
        :returns: ``s`` unchanged if it is one of ``ResourceTypes.ALL``.
        :raises UnknownResourceTypeError: (a KeyError and ValueError) if
            ``s`` is not a known resource type.
        """
        if s in ResourceTypes.ALL:
            return s
        raise UnknownResourceTypeError("Unknown resource type '%s'" % s)
def _is_report(ds_file):
    """Return True when *ds_file* has the REPORT file type id."""
    report_type_id = FileTypes.REPORT.file_type_id
    return ds_file.file_type_id == report_type_id
def _is_alignments(ds_file):
    """Return True when *ds_file* has the DS_ALIGN or DS_ALIGN_CCS type id."""
    accepted_type_ids = (FileTypes.DS_ALIGN.file_type_id,
                         FileTypes.DS_ALIGN_CCS.file_type_id)
    return ds_file.file_type_id in accepted_type_ids
def _get_by_partial_source_id(ds_files, source_id_str):
for ds_file in ds_files:
if source_id_str in ds_file.source_id:
return ds_file.path
# Known source ids (last dot-separated component of a datastore file's
# source_id) of alignment outputs, in priority order: _find_alignments tries
# them from first to last when a job has several alignment datasets.
ALIGNMENT_SOURCES = [
    # new (Cromwell) mapping and resequencing workflows
    "mapped",
    # new resequencing workflow
    "consolidated_xml",
    # old (pbsmrtpipe) resequencing pipeline
    "consolidate_alignments-out-0",
    # old CCS mapping pipelines
    "consolidate_alignments_ccs-out-0",
    # old mapping/resequencing pipelines
    "datastore_to_alignments-out-0",
    # old CCS mapping pipelines
    "datastore_to_ccs_alignments-out-0",
]
def _find_alignments(datastore):
    """
    Return the path of the job's alignment dataset output.

    A single alignment dataset in the datastore is returned directly. With
    several, each candidate is keyed by the last dot-separated component of
    its source_id. The first ALIGNMENT_SOURCES entry with a candidate wins,
    and datastore order breaks ties within one source.

    :raises ResolverFailure: if no alignment output can be identified.
    """
    candidates = list(filter(_is_alignments, datastore.files.values()))
    if len(candidates) == 1:
        return candidates[0].path
    path_by_source = {}
    for candidate in candidates:
        short_id = candidate.source_id.split(".")[-1]
        # setdefault keeps the earliest file for each source id.
        path_by_source.setdefault(short_id, candidate.path)
    for preferred in ALIGNMENT_SOURCES:
        if preferred in path_by_source:
            return path_by_source[preferred]
    raise ResolverFailure("Can't find alignments output for job")
class Resolver:
    """Locate outputs and inputs of a SMRT Link analysis job via a services client."""

    def __init__(self, client):
        # Services client; run_args passes the one supplied by
        # run_client_with_retry.
        self._client = client

    def _get_job_datastore_reports(self, job_id):
        """Return the report-type files in the job's datastore."""
        store_files = self._client.get_analysis_job_datastore(job_id).files
        return list(filter(_is_report, store_files.values()))

    def _report_path(self, job_id, partial_source_id):
        """Return the first report path whose source_id contains the fragment, else None."""
        reports = self._get_job_datastore_reports(job_id)
        return _get_by_partial_source_id(reports, partial_source_id)

    def resolve_alignments(self, job_id):
        """Return the path of the job's alignment dataset."""
        return _find_alignments(self._client.get_analysis_job_datastore(job_id))

    def resolve_preassembly_stats(self, job_id):
        """Return the preassembly report path, or None if absent."""
        return self._report_path(job_id, "preassembly")

    def resolve_polished_assembly_stats(self, job_id):
        """Return the polished assembly report path, or None if absent."""
        return self._report_path(job_id, "polished_assembly")

    def resolve_mapping_stats(self, job_id):
        """Return the mapping statistics report path, or None if absent."""
        return self._report_path(job_id, "mapping_stats")

    def resolve_job(self, job_id):
        """Return the job's directory path."""
        job = self._client.get_job_by_id(job_id)
        return job.path

    def resolve_input_subreads(self, job_id):
        """
        Return the path of the job's single SubreadSet entry point.

        :raises ResolverFailure: if there is no SubreadSet entry point, or
            more than one.
        """
        entry_points = self._client.get_analysis_job_entry_points(job_id)
        uuids = [ep.dataset_uuid for ep in entry_points
                 if ep.dataset_metatype == FileTypes.DS_SUBREADS.file_type_id]
        if not uuids:
            raise ResolverFailure(
                "Can't find a SubreadSet entry point for this job")
        if len(uuids) > 1:
            raise ResolverFailure(
                "Multiple SubreadSet entry points found for this job")
        subreadset = self._client.get_subreadset_by_id(uuids[0])
        return subreadset["path"]
def run_args(args):
    """
    Resolve the requested resource for a SMRT Link job, print it, and
    optionally create a symlink to it.

    :param args: parsed namespace from ``_get_parser()``; reads ``job_id``,
        ``resource_type``, ``host``, ``port``, ``user``, ``password`` and
        ``make_symlink``.
    :returns: 0 on success.
    :raises ResolverFailure: if the resource cannot be located, including
        when a resolver finds no matching file and returns None.
    :raises NotImplementedError: for a resource type with no resolver.
    """
    def _run_resolver(client):
        resolver = Resolver(client)
        # Resource type -> resolver method that handles it.
        handlers = {
            ResourceTypes.JOB_PATH: resolver.resolve_job,
            ResourceTypes.ALIGNMENTS: resolver.resolve_alignments,
            ResourceTypes.PREASSEMBLY: resolver.resolve_preassembly_stats,
            ResourceTypes.POLISHED_ASSEMBLY: resolver.resolve_polished_assembly_stats,
            ResourceTypes.MAPPING_STATS: resolver.resolve_mapping_stats,
            ResourceTypes.SUBREADS_ENTRY: resolver.resolve_input_subreads,
        }
        handler = handlers.get(args.resource_type)
        if handler is None:
            raise NotImplementedError(
                "Can't retrieve resource type '%s'" % args.resource_type)
        return handler(args.job_id)

    resource = run_client_with_retry(_run_resolver,
                                     args.host,
                                     args.port,
                                     args.user,
                                     args.password)
    # Report lookups return None when no datastore file matches. Fail here
    # rather than printing "None" and exiting 0, or removing an existing
    # symlink and then crashing in os.symlink(None, ...).
    if resource is None:
        raise ResolverFailure("Can't find '%s' output for job %s" %
                              (args.resource_type, args.job_id))
    print(resource)
    if args.make_symlink is not None:
        # lexists() is also True for a dangling symlink, which exists()
        # reports as missing; such a stale link must be removed too or
        # os.symlink() fails with FileExistsError.
        if op.lexists(args.make_symlink):
            os.remove(args.make_symlink)
        os.symlink(resource, args.make_symlink)
    return 0
def _get_parser():
    """
    Build the command-line argument parser for this tool.

    Connection options default to the PB_SERVICE_HOST, PB_SERVICE_PORT,
    PB_SERVICE_AUTH_USER and PB_SERVICE_AUTH_PASSWORD environment variables.

    :returns: the configured argument parser.
    """
    p = get_default_argparser_with_base_opts(
        __version__,
        __doc__,
        default_level=logging.WARN)
    p.add_argument("job_id", help="SMRT Link analysis job ID")
    # Validate with ``choices`` rather than ``type=ResourceTypes.from_string``.
    # argparse turns only TypeError/ValueError/ArgumentTypeError from a
    # ``type`` callable into a usage error, so a KeyError from from_string
    # escaped as a traceback. ``metavar`` keeps the usage line unchanged.
    p.add_argument("resource_type", nargs="?",
                   default=ResourceTypes.JOB_PATH,
                   choices=ResourceTypes.ALL,
                   metavar="resource_type",
                   help="Resource type to resolve (choices: {c})".format(
                       c=", ".join(ResourceTypes.ALL)))
    p.add_argument("-u", "--host", dest="host", action="store",
                   default=os.environ.get("PB_SERVICE_HOST", "localhost"),
                   help="Hostname of SMRT Link server. If this is anything other than 'localhost' you must supply authentication.")
    # Keep the default as a string. argparse applies type=int to string
    # defaults only when --port is not given, so a malformed PB_SERVICE_PORT
    # becomes a usage error instead of a ValueError raised while building the
    # parser (which broke even --help and an explicit --port).
    p.add_argument("-p", "--port", dest="port", action="store", type=int,
                   default=os.environ.get("PB_SERVICE_PORT", "8081"),
                   help="Services port number")
    p.add_argument("--user", dest="user", action="store",
                   default=os.environ.get("PB_SERVICE_AUTH_USER", None),
                   help="User to authenticate with (if using HTTPS)")
    p.add_argument("--password", dest="password", action="store",
                   default=os.environ.get("PB_SERVICE_AUTH_PASSWORD", None),
                   help="Password to authenticate with (if using HTTPS)")
    p.add_argument("--symlink", dest="make_symlink", action="store",
                   default=None,
                   help="If defined, will create a symlink to the retrieved file")
    return p
def main(argv):
    """
    Command-line entry point.

    :param argv: full argument vector; element 0 (the program name) is skipped.
    :returns: the exit code produced by pacbio_args_runner.
    """
    cli_args = argv[1:]
    parser = _get_parser()
    return pacbio_args_runner(cli_args, parser, run_args, log,
                              setup_log_func=setup_log)
if __name__ == "__main__":
    # Equivalent to sys.exit(): propagate main()'s return value as the exit status.
    raise SystemExit(main(sys.argv))
|