File: utils.py

package info (click to toggle)
python-pbcommand 2.1.1%2Bgit20231020.28d1635-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,016 kB
  • sloc: python: 7,676; makefile: 220; sh: 73
file content (118 lines) | stat: -rw-r--r-- 3,016 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# This is not public. Might want to move this into service_access_layer
from collections import defaultdict

from .models import ServiceJob, JobStates, JobTypes
# for backward compatibility
from pbcommand.utils import to_ascii


def _jobs_by_state_gen(sal, job_states):
    """Yield the analysis jobs whose state is one of ``job_states``.

    :type sal: ServiceAccessLayer
    :param job_states: a single state, or a tuple/list of states

    Each item returned by ``sal.get_analysis_jobs()`` is converted with
    ``ServiceJob.from_d`` before its ``state`` is checked.
    """
    # NOTE(review): to_all_job_types_summary reads ``.state`` directly off the
    # items of sal.get_analysis_jobs(), while this function first converts
    # them with ServiceJob.from_d -- confirm which shape the service returns.

    # A lone state is wrapped so the membership test below works uniformly.
    if isinstance(job_states, (tuple, list)):
        wanted = job_states
    else:
        wanted = [job_states]

    for raw_job in sal.get_analysis_jobs():
        candidate = ServiceJob.from_d(raw_job)
        if candidate.state in wanted:
            yield candidate


def get_failed_jobs(sal):
    """Return the analysis jobs in the FAILED state as a list.

    The list is ordered by ``created_at`` in descending order.

    :type sal: ServiceAccessLayer
    :rtype: list
    """
    failed = list(_jobs_by_state_gen(sal, JobStates.FAILED))
    failed.sort(key=lambda job: job.created_at, reverse=True)
    return failed


def jobs_summary(jobs):
    """Count jobs per state.

    :param jobs: iterable of objects exposing a ``state`` attribute; ``None``
                 or an empty collection yields an empty result
    :return: mapping of state -> number of jobs in that state; a state that
             was never seen reads as 0
    :rtype: defaultdict
    """
    # Using int (rather than a lambda) as the default factory keeps the
    # returned mapping picklable while still defaulting unseen states to 0.
    states_counts = defaultdict(int)

    # ``jobs or ()`` covers both None and empty input.
    for job in jobs or ():
        states_counts[job.state] += 1

    return states_counts


def to_jobs_summary(jobs, header=None):
    """Return a multi-line text summary of jobs grouped by state.

    The first line is ``"<header> <number of jobs>"``. It is followed by one
    ``"State <state> <count>"`` line per distinct state, in the order the
    states are returned by ``jobs_summary``.

    :param jobs: iterable of objects exposing a ``state`` attribute, or
                 ``None`` (treated as no jobs); one-shot iterables such as
                 generators are accepted
    :param header: label used on the first line, defaults to ``"Jobs"``
    :rtype: str
    """
    header = "Jobs" if header is None else header

    # Materialize once: the jobs are both iterated (by jobs_summary) and
    # counted with len(), which a generator -- e.g. the output of
    # _jobs_by_state_gen -- cannot support.
    xjobs = [] if jobs is None else list(jobs)

    states_counts = jobs_summary(xjobs)

    outs = ["{h} {n}".format(n=len(xjobs), h=header)]
    outs.extend("State {s} {c}".format(c=c, s=state)
                for state, c in states_counts.items())

    return "\n".join(outs)


def to_all_job_types_summary(sal, sep="*****"):
    """Return a text report with one jobs summary per selected job type.

    The report starts with a title line and ``sep``; every job type section
    is produced by ``to_jobs_summary`` and is followed by ``sep``.

    :type sal: ServiceAccessLayer
    :param sep: separator line written between sections
    :rtype: str
    """
    # Only a subset of the job types is reported.
    job_sources = [(JobTypes.IMPORT_DS, sal.get_import_dataset_jobs),
                   (JobTypes.MERGE_DS, sal.get_merge_dataset_jobs),
                   (JobTypes.CONVERT_FASTA, sal.get_fasta_convert_jobs),
                   (JobTypes.ANALYSIS, sal.get_analysis_jobs)]

    report = ["All Job types Summary", sep]
    for job_type, fetch_jobs in job_sources:
        title = "{n} Jobs".format(n=job_type)
        report.append(to_jobs_summary(fetch_jobs(), header=title))
        report.append(sep)

    return "\n".join(report)


def to_all_datasets_summary(sal, sep="****"):
    """Return a text report with the number of datasets per dataset type.

    The report is a title line, ``sep``, then one ``"<name> <count>"`` line
    per dataset type, where the count is ``len()`` of what the matching
    ``sal`` accessor returns.

    :type sal: ServiceAccessLayer
    :param sep: separator line written under the title
    :rtype: str
    """
    # ConsensusSets (sal.get_ccsreadsets) is currently not reported.
    accessors = (
        ("SubreadSets", sal.get_subreadsets),
        ("HdfSubreadSets", sal.get_hdfsubreadsets),
        ("ReferenceSets", sal.get_referencesets),
        ("AlignmentSets", sal.get_alignmentsets),
    )

    lines = ["Dataset Summary", sep]
    lines.extend("{n} {d}".format(n=label, d=len(fetch()))
                 for label, fetch in accessors)

    return "\n".join(lines)


def to_sal_summary(sal):
    """Return a text report describing the service behind ``sal``.

    The report contains ``repr(sal)``, the ``id``, ``version`` and
    ``message`` entries of ``sal.get_status()``, then the datasets summary
    and the job types summary, separated by a line of 30 dashes.

    :type sal: ServiceAccessLayer
    :rtype: str
    """
    status = sal.get_status()
    rule = "-" * 30

    # The list literal is evaluated top to bottom, so the service calls are
    # made in the same order as the lines appear in the report.
    sections = [
        repr(sal),
        "SystemId : {}".format(status['id']),
        "Version  : {}".format(status['version']),
        "Status   : {}".format(status['message']),
        rule,
        to_all_datasets_summary(sal, sep=rule),
        rule,
        to_all_job_types_summary(sal, sep=rule),
    ]

    return "\n".join(sections)