1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
# This is not public. Might want to move this into service_access_layer
from collections import defaultdict
from .models import ServiceJob, JobStates, JobTypes
# for backward compatibility
from pbcommand.utils import to_ascii
def _jobs_by_state_gen(sal, job_states):
""":type sal: ServiceAccessLayer"""
states = job_states if isinstance(
job_states, (tuple, list)) else [job_states]
jobs = sal.get_analysis_jobs()
for job in jobs:
sjob = ServiceJob.from_d(job)
if sjob.state in states:
yield sjob
def get_failed_jobs(sal):
return sorted(_jobs_by_state_gen(sal, JobStates.FAILED),
key=lambda x: x.created_at, reverse=True)
def jobs_summary(jobs):
"""dict(state) -> count (int) """
states_counts = defaultdict(lambda: 0)
if jobs:
for job in jobs:
states_counts[job.state] += 1
return states_counts
def to_jobs_summary(jobs, header=None):
"""Return string of jobs summary"""
header = "Jobs" if header is None else header
# Make easier to handle Option[Seq[Job]]
xjobs = [] if jobs is None else jobs
outs = []
x = outs.append
states_counts = jobs_summary(xjobs)
x("{h} {n}".format(n=len(xjobs), h=header))
for state, c in states_counts.items():
x("State {s} {c}".format(c=c, s=state))
return "\n".join(outs)
def to_all_job_types_summary(sal, sep="*****"):
# only use a subset of the job types
funcs = [(JobTypes.IMPORT_DS, sal.get_import_dataset_jobs),
(JobTypes.MERGE_DS, sal.get_merge_dataset_jobs),
(JobTypes.CONVERT_FASTA, sal.get_fasta_convert_jobs),
(JobTypes.ANALYSIS, sal.get_analysis_jobs)]
outs = []
x = outs.append
x("All Job types Summary")
x(sep)
for name, func in funcs:
out = to_jobs_summary(func(), header="{n} Jobs".format(n=name))
x(out)
x(sep)
return "\n".join(outs)
def to_all_datasets_summary(sal, sep="****"):
ds_types = [("SubreadSets", sal.get_subreadsets),
("HdfSubreadSets", sal.get_hdfsubreadsets),
("ReferenceSets", sal.get_referencesets),
("AlignmentSets", sal.get_alignmentsets),
#("ConsensusSets", sal.get_ccsreadsets)
]
outs = []
x = outs.append
x("Dataset Summary")
x(sep)
for name, func in ds_types:
d = func()
ndatasets = len(d)
x("{n} {d}".format(n=name, d=ndatasets))
return "\n".join(outs)
def to_sal_summary(sal):
"""
:type sal: ServiceAccessLayer
:rtype: str
"""
status = sal.get_status()
outs = []
x = outs.append
sep = "-" * 30
x(repr(sal))
x("SystemId : {}".format(status['id']))
x("Version : {}".format(status['version']))
x("Status : {}".format(status['message']))
x(sep)
x(to_all_datasets_summary(sal, sep=sep))
x(sep)
x(to_all_job_types_summary(sal, sep=sep))
return "\n".join(outs)
|