File: models.py

package info (click to toggle)
python-pbcommand 2.1.1%2Bgit20231020.28d1635-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,016 kB
  • sloc: python: 7,676; makefile: 220; sh: 73
file content (463 lines) | stat: -rw-r--r-- 15,389 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
"""
Services Specific Data Models
"""

import json
import uuid
from collections import namedtuple
import os

import iso8601
from requests.exceptions import RequestException

from pbcommand.utils import to_ascii

__all__ = [
    'ServiceJob',
    'ServiceEntryPoint',
    'JobEntryPoint',
    'JobStates',
    'JobTypes',
]


# These are mirrored from the BaseSMRTServer
class LogLevels:
    """Namespace of the log level names, plus a membership check."""
    TRACE = "TRACE"
    DEBUG = "DEBUG"
    INFO = "INFO"
    NOTICE = "NOTICE"
    WARN = "WARN"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
    FATAL = "FATAL"

    # Every level name defined above, in declaration order
    ALL = (
        TRACE,
        DEBUG,
        INFO,
        NOTICE,
        WARN,
        ERROR,
        CRITICAL,
        FATAL,
    )

    @classmethod
    def is_valid(cls, level):
        """Return True when `level` is one of the names listed in `ALL`."""
        known_levels = cls.ALL
        return level in known_levels


# Identifier used as the `id` of the pbsmrtpipe log resource defined below
SERVICE_LOGGER_RESOURCE_ID = "pbsmrtpipe"

# Lightweight records describing a log resource and a single log message
LogResource = namedtuple("LogResource", ["id", "name", "description"])
LogMessage = namedtuple("LogMessage", ["sourceId", "level", "message"])

PbsmrtpipeLogResource = LogResource(
    id=SERVICE_LOGGER_RESOURCE_ID,
    name="Pbsmrtpipe",
    description="Secondary Analysis Pbsmrtpipe Job logger",
)


class ServiceJob:
    """In-memory model of a Service Job entity, as returned by the services"""

    def __init__(self, ix, job_uuid, name, state, path, job_type, created_at,
                 settings,
                 is_active=True,
                 smrtlink_version=None,
                 created_by=None,
                 updated_at=None,
                 error_message=None,
                 imported_at=None,
                 job_updated_at=None,
                 created_by_email=None,
                 is_multi_job=False,
                 tags="",
                 parent_multi_job_id=None,
                 workflow=None,
                 project_id=1,
                 job_started_at=None,
                 job_completed_at=None,
                 sub_job_type_id=None,
                 external_job_id=None):
        """

        :param ix: Job Integer Id (anything accepted by `int`)
        :param job_uuid: Globally unique Job UUID
        :param name:  Display name of job
        :param state:  Job State
        :param path: Absolute Path to the job directory
        :param job_type:  Job Type
        :param created_at: when the job was created at
        :param settings: dict of job specific settings
        :param is_active:  If the Job is active (only active jobs are displayed in the SL UI)
        :param smrtlink_version: SMRT Link Version (if known)
        :param created_by: User that created the job
        :param updated_at: when the last update of the job occurred
        :param error_message: Error message if the job has failed
        :param imported_at: when the job was imported from another system (if it was)
        :param job_updated_at: job-level update time; falls back to `updated_at` when None
        :param created_by_email: Email of the user that created the job
        :param is_multi_job: If the job is a MultiJob
        :param tags: Job tags
        :param parent_multi_job_id: Id of the parent MultiJob (if any)
        :param workflow: MultiJob workflow state; None is stored as an empty dict
        :param project_id: Id of the project the job belongs to
        :param job_started_at: Job start time (if the job has started running)
        :param job_completed_at: Job completed time (if the job has completed)
        :param sub_job_type_id: Sub job type id (if any)
        :param external_job_id: External job id (if any)

        :raises ValueError: if `job_uuid` is not a well-formed UUID string

        :type ix: int
        :type job_uuid: str
        :type name: str
        :type state: str
        :type job_type: str
        :type created_at: DateTime
        :type updated_at: DateTime | None
        :type job_updated_at: DateTime | None
        :type settings: dict
        :type is_active: bool
        :type smrtlink_version: str | None
        :type created_by: str | None
        :type created_by_email: str | None
        :type error_message: str | None
        :type is_multi_job: bool
        :type tags: str
        :type workflow: dict | None
        :type project_id: int
        """
        self.id = int(ix)
        # validation only; the original string form is what gets stored
        _ = uuid.UUID(job_uuid)
        self.uuid = job_uuid
        self.name = name
        self.state = state
        self.path = path
        self.job_type = job_type
        self.created_at = created_at
        self.settings = settings
        self.is_active = is_active
        self.smrtlink_version = smrtlink_version
        self.created_by = created_by
        self.created_by_email = created_by_email
        # Is this Option[T] or T?
        self.updated_at = updated_at
        self.error_message = error_message
        # Job was imported from another system
        self.imported_at = imported_at
        # fall back to the entity update time when no job-level one is given
        self.job_updated_at = updated_at if job_updated_at is None else job_updated_at
        self.is_multi_job = is_multi_job
        self.tags = tags
        self.parent_multi_job_id = parent_multi_job_id
        # for MultiJob state
        self.workflow = {} if workflow is None else workflow
        self.project_id = project_id
        self.sub_job_type_id = sub_job_type_id
        self.external_job_id = external_job_id

        # Prior to SL 6.0.X, there was a lack of clear mechanism of communication of the
        # job start and completed at time stamps, the job created at was used.
        # The created_at refers to the data model entity, not when the job is run.
        # Note this is only defined when the job has been completed
        self.job_started_at = job_started_at
        self.job_completed_at = job_completed_at

    def __repr__(self):
        # truncate the name to avoid having a useless repr; the ellipsis is
        # only added when characters were actually dropped
        max_length = 15
        if len(self.name) > max_length:
            name = self.name[:max_length] + "..."
        else:
            name = self.name

        created_by = "Unknown" if self.created_by is None else self.created_by

        ix = str(self.id).rjust(5)
        state = self.state.rjust(11)
        # simpler format
        created_at = self.created_at.strftime("%m-%d-%Y %I:%M.%S")

        # this should really use humanize. But this would take forever
        # to get into the nightly build
        def _format_dt(n_seconds):
            if n_seconds >= 60:
                # for most cases, you don't really
                # care about the seconds
                return "{m} min ".format(m=int(n_seconds // 60))
            else:
                return "{s:.2f} sec".format(s=n_seconds)

        run_time_sec = self.run_time_sec
        run_time = "NA" if run_time_sec is None else _format_dt(run_time_sec)

        _d = dict(k=self.__class__.__name__,
                  i=ix,
                  n=name,
                  c=created_at,
                  s=state, b=created_by,
                  r=run_time)

        return "<{k} i:{i} state:{s} created:{c} by:{b} name:{n} runtime: {r} >".format(
            **_d)

    @property
    def execution_time_sec(self):
        """
        Return the Job Execution time (in sec) for completed jobs or return None for
        non-completed jobs

        Computed as `job_completed_at - job_started_at`; None unless both
        timestamps are set.

        Note, for Jobs from SL versions older than 6.0.0, the start/completed
        timestamps were not defined and this will always return None

        :rtype: None | float
        """
        if self.job_started_at is not None:
            if self.job_completed_at is not None:
                return (self.job_completed_at -
                        self.job_started_at).total_seconds()
        return None

    @property
    def run_time_sec(self):
        """
        Return `job_updated_at - created_at` in seconds, or None when
        `job_updated_at` is not set.

        Note, prior to SL 6.0.X, jobs did not have a well defined job start/complete mechanism
        and the Job entity timestamps were used. This has assumptions that the Job is
        started when the job is created. This is often not true.

        For completed jobs from SL version >= 6.0.x, use execution_time_sec.

        :rtype: None | float
        """
        if self.job_updated_at is not None:
            return (self.job_updated_at - self.created_at).total_seconds()

        return None

    @staticmethod
    def from_d(d):
        """
        Convert from Service JSON response to `ServiceJob` instance

        :param d: decoded JSON object (dict) of a single job
        :raises KeyError: if a required key ('id', 'uuid', 'name', 'state',
                          'path', 'jobTypeId', 'createdAt') is missing
        :rtype: ServiceJob
        """

        # required key
        def sx(x):
            return d[x]

        # optional key
        def s_or(x, default=None):
            return d.get(x, default)

        # Convert to string key value to ascii
        def se(x):
            return to_ascii(sx(x))

        def se_or(x, default=None):
            v = s_or(x, default=default)
            return v if v is None else to_ascii(v)

        def to_t(x):
            return iso8601.parse_date(se(x))

        def to_opt_datetime(k):
            x = s_or(k)
            return iso8601.parse_date(x) if x is not None else None

        ix = int(sx('id'))
        job_uuid = sx('uuid')
        name = se('name')
        state = se('state')
        path = se('path')
        job_type = se('jobTypeId')
        created_at = to_t('createdAt')
        updated_at = to_opt_datetime('updatedAt')
        job_started_at = to_opt_datetime("jobStartedAt")
        job_completed_at = to_opt_datetime("jobCompletedAt")
        job_updated_at = to_opt_datetime('jobUpdatedAt')
        imported_at = to_opt_datetime('importedAt')

        project_id = s_or("projectId", 1)

        smrtlink_version = se_or("smrtlinkVersion")
        error_message = se_or("errorMessage")
        created_by = se_or("createdBy")
        created_by_email = se_or('createdByEmail')
        is_active = d.get('isActive', True)
        # NOTE: "jsonSettings" is only passed through to_ascii here; it is
        # not JSON-decoded by this method
        settings = se_or('jsonSettings')
        sub_job_type_id = se_or("subJobTypeId")
        external_job_id = se_or("externalJobId")

        is_multi_job = d.get("isMultiJob", False)
        parent_multi_job_id = s_or("parentMultiJobId")

        # "workflow" is a JSON encoded string. A missing key and an explicit
        # null both yield an empty workflow (json.loads(None) would raise).
        raw_workflow = se_or("workflow")
        workflow = {} if raw_workflow is None else json.loads(raw_workflow)

        tags = se_or("tags", "")

        return ServiceJob(ix,
                          job_uuid,
                          name,
                          state,
                          path,
                          job_type,
                          created_at,
                          settings, is_active=is_active,
                          smrtlink_version=smrtlink_version,
                          created_by=created_by,
                          created_by_email=created_by_email,
                          updated_at=updated_at,
                          error_message=error_message,
                          imported_at=imported_at,
                          job_updated_at=job_updated_at,
                          project_id=project_id,
                          tags=tags,
                          is_multi_job=is_multi_job,
                          parent_multi_job_id=parent_multi_job_id,
                          workflow=workflow,
                          job_started_at=job_started_at,
                          job_completed_at=job_completed_at,
                          sub_job_type_id=sub_job_type_id,
                          external_job_id=external_job_id)

    def was_successful(self):
        """
        True when the job state equals `JobStates.SUCCESSFUL`

        :rtype: bool
        """
        return self.state == JobStates.SUCCESSFUL


class JobExeError(ValueError):
    """Raised when a Service Job does not complete successfully"""


class SmrtServerConnectionError(RequestException):
    """Blunt, catch-all error type for status related failures"""


class SMRTServiceBaseError(Exception):
    """Base error data structure of the SMRT Server"""

    def __init__(self, http_code, error_type, message, **kwargs):
        """
        :param http_code: HTTP status code, stored as `http_code`
        :param error_type: error type, stored as `error_type`
        :param message: raw message, stored as `msg`
        :param kwargs: accepted and ignored
        """
        # The exception text is a one-line summary of all three fields
        summary = "Http code={h} msg={m} type={t}".format(
            h=http_code, m=message, t=error_type)
        super().__init__(summary)
        self.http_code = http_code
        self.error_type = error_type
        self.msg = message

    @staticmethod
    def from_d(d):
        """Build a `SMRTServiceBaseError` from a SMRT Link Service Error JSON response"""
        fields = (d['httpCode'], d['errorType'], d['message'])
        return SMRTServiceBaseError(*fields)


# "Job" is the raw output from the jobs/1234
JobResult = namedtuple("JobResult", ["job", "run_time", "errors"])


class JobTask(namedtuple("JobTask", [
        "task_uuid",
        "job_id",
        "task_id",
        "task_type",
        "name",
        "state",
        "created_at",
        "updated_at",
        "error_message"])):
    """Record describing a single task of a job"""

    @staticmethod
    def from_d(d):
        """
        Build a `JobTask` from a dict.

        All keys are required except 'errorMessage', which becomes None
        when absent.
        """
        required_keys = ('uuid', 'jobId', 'taskId', 'taskTypeId',
                         'name', 'state', 'createdAt', 'updatedAt')
        values = [d[key] for key in required_keys]
        values.append(d.get('errorMessage'))
        return JobTask(*values)


def _to_resource_id(x):
    """
    Validate that `x` can be used as a resource id and return it unchanged.

    :param x: an int, or a string that `uuid.UUID` is able to parse
    :return: `x` itself (no conversion is performed)
    :raises ValueError: if `x` is neither an int nor a UUID string
    """
    if isinstance(x, int):
        return x
    try:
        # validation only, the parsed UUID is discarded
        uuid.UUID(x)
    except (ValueError, TypeError, AttributeError) as e:
        # uuid.UUID raises TypeError/AttributeError (not ValueError) for
        # non-string input such as None or a float; report every invalid
        # input with the same ValueError and keep the cause chained.
        raise ValueError(
            "Resource id '{x}' must be given as int or uuid".format(
                x=x)) from e
    return x


class ServiceEntryPoint:
    """Entry Point used to initialize a Pipeline"""

    def __init__(self, entry_id, dataset_type, path_or_uri):
        """
        :param entry_id: id of the entry point
        :param dataset_type: dataset (file) type id
        :param path_or_uri: the resource the entry point refers to
        """
        # int (only supported), UUID or path to XML dataset will be added
        self._resource = path_or_uri
        self.dataset_type = dataset_type
        self.entry_id = entry_id

    @property
    def resource(self):
        """The value given as `path_or_uri` (kept for backwards compatibility)"""
        return self._resource

    def __repr__(self):
        return "<{k} {e} {d} {r} >".format(
            k=self.__class__.__name__,
            e=self.entry_id,
            d=self.dataset_type,
            r=self._resource)

    @staticmethod
    def from_d(d):
        """Build a `ServiceEntryPoint` from a Service JSON response"""
        # the dataset id is validated first, then the ids are ascii-converted
        resource_id = _to_resource_id(d['datasetId'])
        entry_id = to_ascii(d['entryId'])
        file_type_id = to_ascii(d['fileTypeId'])
        return ServiceEntryPoint(entry_id, file_type_id, resource_id)

    def to_d(self):
        """Return the dict form, using the same keys that `from_d` reads"""
        return {
            "entryId": self.entry_id,
            "fileTypeId": self.dataset_type,
            "datasetId": self.resource,
        }


class JobEntryPoint(namedtuple(
        "JobEntryPoint", ["job_id", "dataset_uuid", "dataset_metatype"])):
    """ Returned from the Services /job/1234/entry-points """

    @staticmethod
    def from_d(d):
        """Build a `JobEntryPoint` from a Service JSON response"""
        job_id = d['jobId']
        dataset_uuid = d['datasetUUID']
        dataset_metatype = d['datasetType']
        return JobEntryPoint(job_id, dataset_uuid, dataset_metatype)


class JobStates:
    """Allowed SMRT Link Service Job states"""
    CREATED = "CREATED"
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    SUCCESSFUL = "SUCCESSFUL"
    TERMINATED = "TERMINATED"
    ABORTED = "ABORTED"

    # Every state declared above. TERMINATED and ABORTED are appended at the
    # end so that the position of the previously listed states is unchanged.
    ALL = (RUNNING, CREATED, FAILED, SUCCESSFUL, SUBMITTED,
           TERMINATED, ABORTED)

    # End points
    ALL_COMPLETED = (FAILED, SUCCESSFUL, TERMINATED, ABORTED)
    # End points other than SUCCESSFUL
    ALL_FAILED = (FAILED, TERMINATED, ABORTED)


class JobTypes:
    """SMRT Link Analysis Job Types"""
    IMPORT_DS = "import-dataset"
    IMPORT_DSTORE = "import-datastore"
    MERGE_DS = "merge-datasets"
    CROMWELL = "cromwell"
    ANALYSIS = "analysis"
    MOCK_PB_PIPE = "mock-pbsmrtpipe"
    CONVERT_FASTA = 'convert-fasta-reference'

    @classmethod
    def ALL(cls):
        """
        ALL allowed SL Analysis Job Types

        ANALYSIS is appended last so that the position of the previously
        returned types is unchanged.

        :rtype: tuple[str, ...]
        """
        return (cls.IMPORT_DS, cls.IMPORT_DSTORE, cls.MERGE_DS,
                cls.CROMWELL, cls.MOCK_PB_PIPE, cls.CONVERT_FASTA,
                cls.ANALYSIS)


class ServiceResourceTypes:
    """Names of the service resource types"""
    ENTRY_POINTS = "entry-points"
    DATASTORE = "datastore"
    REPORTS = "reports"


def add_smrtlink_server_args(p):
    """
    Add the --host, --port, --user and --password options to the parser `p`.

    The defaults are read from the PB_SERVICE_HOST, PB_SERVICE_PORT,
    PB_SERVICE_AUTH_USER and PB_SERVICE_AUTH_PASSWORD environment variables
    at call time, falling back to "localhost", 8000, None and None.

    :param p: parser object exposing `add_argument`
    :return: the same parser `p`
    """
    env = os.environ
    # resolve every default up front, before the parser is touched
    host_default = env.get("PB_SERVICE_HOST", "localhost")
    port_default = int(env.get("PB_SERVICE_PORT", 8000))
    user_default = env.get("PB_SERVICE_AUTH_USER", None)
    password_default = env.get("PB_SERVICE_AUTH_PASSWORD", None)

    option_specs = (
        ("--host", dict(default=host_default, help="SL Server Hostname")),
        ("--port", dict(type=int, default=port_default,
                        help="SL Server Port")),
        ("--user", dict(default=user_default, help="SL Server User Name")),
        ("--password", dict(default=password_default,
                            help="SL Server Password")),
    )
    for flag, options in option_specs:
        p.add_argument(flag, action="store", **options)
    return p