File: taskcluster.py

package info (click to toggle)
firefox 147.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,683,324 kB
  • sloc: cpp: 7,607,156; javascript: 6,532,492; ansic: 3,775,158; python: 1,415,368; xml: 634,556; asm: 438,949; java: 186,241; sh: 62,751; makefile: 18,079; objc: 13,092; perl: 12,808; yacc: 4,583; cs: 3,846; pascal: 3,448; lex: 1,720; ruby: 1,003; php: 436; lisp: 258; awk: 247; sql: 66; sed: 54; csh: 10; exp: 6
file content (130 lines) | stat: -rw-r--r-- 3,664 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


import logging

import taskcluster_urls as liburls
from taskcluster import Hooks
from taskgraph.util import taskcluster as tc_util
from taskgraph.util.taskcluster import (
    get_root_url,
    get_task_definition,
    get_taskcluster_client,
)

logger = logging.getLogger(__name__)


def get_index_url(index_path, multiple=False, block_proxy=True):
    """Build the index service URL for ``index_path``.

    Args:
        index_path (str): The index path to append to the URL.
        multiple (bool): When true the ``tasks`` endpoint is used instead of
            the ``task`` endpoint.
        block_proxy (bool): Forwarded unchanged to ``get_root_url``.

    Returns:
        str: The formatted index API URL.
    """
    root_url = get_root_url(block_proxy=block_proxy)
    # The two placeholders are the optional plural suffix and the index path.
    url_template = liburls.api(root_url, "index", "v1", "task{}/{}")
    if multiple:
        suffix = "s"
    else:
        suffix = ""
    return url_template.format(suffix, index_path)


def insert_index(index_path, task_id, data=None):
    """Insert ``task_id`` into the index under ``index_path``.

    The index entry reuses the ``expires`` value from the task's own
    definition and is always inserted with rank 0.

    Args:
        index_path (str): The index path to insert the task at.
        task_id (str): The id of the task to index.
        data (dict): Optional extra data stored with the entry; an empty
            dict is sent when this is ``None`` or otherwise falsy.

    Returns:
        The response of the index client's ``insertTask`` call.
    """
    # Look up the task's expiry first so the index entry shares it.
    task_expiry = get_task_definition(task_id)["expires"]

    payload = {
        "taskId": task_id,
        "rank": 0,
        "data": data or {},
        "expires": task_expiry,
    }
    return get_taskcluster_client("index").insertTask(index_path, payload)


def status_task(task_id):
    """Fetch the status of the task identified by ``task_id``.

    When ``tc_util.testing`` is set, nothing is fetched; a message is logged
    instead.

    Args:
        task_id (str): A task id.

    Returns:
        dict: The ``status`` entry of the queue response (an empty dict when
          the response has no such key), as described here:
          https://docs.taskcluster.net/docs/reference/platform/queue/api#status
          ``None`` in testing mode or when the queue response is empty.
    """
    if tc_util.testing:
        logger.info(f"Would have gotten status for {task_id}.")
        return None

    reply = get_taskcluster_client("queue").status(task_id)
    if not reply:
        # An empty response yields no status at all.
        return None
    return reply.get("status", {})


def state_task(task_id):
    """Gets the state of a task given a task_id.

    In testing mode, just logs that it would have retrieved state. This is a subset of the
    data returned by :func:`status_task`.

    Args:
        task_id (str): A task id.

    Returns:
        str: The state of the task, one of
          ``pending, running, completed, failed, exception, unknown``.
          ``None`` in testing mode.
    """
    if tc_util.testing:
        logger.info(f"Would have gotten state for {task_id}.")
        return None

    # status_task() returns None when the queue hands back an empty response;
    # treat that the same as a missing "state" key and report "unknown"
    # instead of raising AttributeError on None.
    status = status_task(task_id) or {}
    return status.get("state") or "unknown"


def trigger_hook(hook_group_id, hook_id, hook_payload):
    """Trigger a hook and log where the resulting task can be seen.

    Args:
        hook_group_id (str): Passed to ``Hooks.triggerHook`` as the group id.
        hook_id (str): Passed to ``Hooks.triggerHook`` as the hook id.
        hook_payload: Passed to ``Hooks.triggerHook`` as the payload.

    Raises:
        KeyError: If the response has no ``["status"]["taskId"]`` entry.
    """
    client = Hooks({"rootUrl": get_root_url()})
    result = client.triggerHook(hook_group_id, hook_id, hook_payload)

    root_url = get_root_url()
    triggered_task_id = result["status"]["taskId"]
    logger.info("Task seen here: {}/tasks/{}".format(root_url, triggered_task_id))


def list_task_group_tasks(task_group_id):
    """Return a list of the tasks in a task group.

    Every response handed to the pagination handler contributes its
    ``"tasks"`` entries to the returned list.

    Args:
        task_group_id (str): The id of the task group to list.

    Returns:
        list: The accumulated ``"tasks"`` entries.
    """
    collected = []
    get_taskcluster_client("queue").listTaskGroup(
        task_group_id,
        paginationHandler=lambda page: collected.extend(page["tasks"]),
    )
    return collected


def list_task_group_incomplete_task_ids(task_group_id):
    """Yield the ids of tasks in a task group that have not finished.

    A task counts as incomplete when its state is ``running``, ``pending``
    or ``unscheduled``.

    Args:
        task_group_id (str): The id of the task group to inspect.

    Yields:
        str: The ``taskId`` of each incomplete task.
    """
    unfinished_states = ("running", "pending", "unscheduled")
    # Collect every status up front, before the first id is yielded.
    statuses = [entry["status"] for entry in list_task_group_tasks(task_group_id)]
    yield from (
        status["taskId"] for status in statuses if status["state"] in unfinished_states
    )


def list_task_group_complete_tasks(task_group_id):
    """Map the names of completed tasks in a task group to their task ids.

    Missing keys are tolerated: a task without a metadata name is stored
    under ``""`` and a missing task id is stored as ``""``. When several
    completed tasks share a name, the last one listed wins.

    Args:
        task_group_id (str): The id of the task group to inspect.

    Returns:
        dict: ``{task name: task id}`` for every task whose state is
          ``completed``.
    """
    completed = {}
    for entry in list_task_group_tasks(task_group_id):
        status = entry.get("status", {})
        if status.get("state", "") != "completed":
            continue
        metadata = entry.get("task", {}).get("metadata", {})
        completed[metadata.get("name", "")] = status.get("taskId", "")
    return completed


def find_task(index_path):
    """Look up ``index_path`` with the index client's ``findTask``.

    Args:
        index_path (str): The index path to look up.

    Returns:
        The response of the ``findTask`` call.
    """
    return get_taskcluster_client("index").findTask(index_path)