File: create.py

package info (click to toggle)
firefox 147.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,683,324 kB
  • sloc: cpp: 7,607,156; javascript: 6,532,492; ansic: 3,775,158; python: 1,415,368; xml: 634,556; asm: 438,949; java: 186,241; sh: 62,751; makefile: 18,079; objc: 13,092; perl: 12,808; yacc: 4,583; cs: 3,846; pascal: 3,448; lex: 1,720; ruby: 1,003; php: 436; lisp: 258; awk: 247; sql: 66; sed: 54; csh: 10; exp: 6
file content (123 lines) | stat: -rw-r--r-- 4,865 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


import logging
import sys
from concurrent import futures

from slugid import nice as slugid

from taskgraph.util import json
from taskgraph.util.parameterization import resolve_timestamps
from taskgraph.util.taskcluster import CONCURRENCY, get_session, get_taskcluster_client
from taskgraph.util.time import current_json_time

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# this is set to true for `mach taskgraph action-callback --test`.  While it
# is true, create_tasks submits tasks without parallelization and create_task
# dumps each [task_id, task_def] pair to stdout as JSON instead of calling
# the queue.
testing = False


def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task_id):
    """Submit every task in ``taskgraph``, dependencies first.

    Each task definition is first updated in place: ``taskGroupId`` is set to
    ``decision_task_id``, ``schedulerId`` to ``<trust-domain>-level-<level>``,
    and, when none of its dependencies belong to this graph,
    ``decision_task_id`` is appended to its dependencies.  The tasks are then
    passed to ``create_task`` on a thread pool, each one only after the
    submissions of all of its in-graph dependencies have finished.

    Args:
        graph_config: mapping read for its ``"trust-domain"`` entry.
        taskgraph: object providing ``graph.nodes``,
            ``graph.visit_postorder()`` and a ``tasks`` mapping whose values
            expose ``task`` (the definition) and ``attributes``.
        label_to_taskid: mapping of label to task id; inverted here so the
            label of each submitted task can be looked up.
        params: mapping read for its ``"level"`` entry.
        decision_task_id: id used as the task group id and as the dependency
            added to tasks without in-graph dependencies.

    Raises:
        Any exception raised by a ``create_task`` call; it surfaces from the
        final result check after scheduling has stopped.
    """
    label_by_taskid = {task_id: label for label, task_id in label_to_taskid.items()}

    trust_domain = graph_config["trust-domain"]
    level = params["level"]
    scheduler_id = "{}-level-{}".format(trust_domain, level)

    # Stamp the group id and scheduler id on every task and, where needed,
    # the dependency on the decision task.
    for task_id in taskgraph.graph.nodes:
        definition = taskgraph.tasks[task_id].task

        # A task that already depends on another task of this graph depends
        # on the decision task implicitly.  One that does not gets the
        # dependency added explicitly, so that tasks do not start immediately:
        # if submission fails halfway through, none of the already-created
        # tasks run.
        depends_on_graph = any(
            dep in taskgraph.tasks for dep in definition.get("dependencies", [])
        )
        if not depends_on_graph:
            definition.setdefault("dependencies", []).append(decision_task_id)

        definition["taskGroupId"] = decision_task_id
        definition["schedulerId"] = scheduler_id

    # Under `testing`, submit one task at a time; otherwise use CONCURRENCY.
    max_workers = 1 if testing else CONCURRENCY
    session = get_session()
    with futures.ThreadPoolExecutor(max_workers) as executor:
        # Maps each submitted task id (duplicates included) to its future.
        submitted = {}

        # A task may only be submitted once its dependencies have been, so
        # the graph is walked repeatedly, each pass submitting whatever has
        # become ready.
        pending = set(taskgraph.graph.visit_postorder())
        graph_task_ids = pending.copy()

        def schedule_tasks():
            # Stop handing out work as soon as any submission has failed.
            for fut in submitted.values():
                if fut.done() and fut.exception():
                    return

            ready_ids = set()
            just_submitted = set()

            def submit(task_id, label, task_def):
                fut = executor.submit(create_task, session, task_id, label, task_def)
                just_submitted.add(fut)
                submitted[task_id] = fut

            for task_id in pending:
                task = taskgraph.tasks[task_id]
                task_def = task.task

                # Only dependencies that are part of this graph need to be
                # waited for; skip the task for now if any is unfinished.
                in_graph = set(task_def.get("dependencies", [])) & graph_task_ids
                deps_done = all(
                    dep in submitted and submitted[dep].done() for dep in in_graph
                )
                if not deps_done:
                    continue

                label = label_by_taskid[task_id]
                submit(task_id, label, task_def)
                ready_ids.add(task_id)

                # Submit the extra copies requested by `task_duplicates`,
                # each under a fresh slugid so the task ids stay distinct.
                for _ in range(1, task.attributes.get("task_duplicates", 1)):
                    submit(slugid(), label, task_def)
            pending.difference_update(ready_ids)

            # Every completed submission may have unblocked more tasks.
            for _ in futures.as_completed(just_submitted):
                schedule_tasks()

        # Kick off scheduling; this returns once nothing more can be submitted.
        schedule_tasks()

        # Surface the exception of any submission that failed.
        for fut in futures.as_completed(submitted.values()):
            fut.result()


def create_task(session, task_id, label, task_def):
    """Create a single task, or print it when the module is in test mode.

    The timestamps in ``task_def`` are resolved against the current time
    before anything else happens.  With ``testing`` set, the resulting
    ``[task_id, task_def]`` pair is written to stdout as sorted, indented
    JSON followed by a newline, and nothing is sent to the queue.  Otherwise
    the task is created through the "queue" client.

    Args:
        session: accepted for the caller's benefit; not used in this body.
        task_id: id under which the task is created.
        label: task label, used only in the log message.
        task_def: task definition whose timestamps are still unresolved.
    """
    resolved_def = resolve_timestamps(
        current_json_time(datetime_format=True), task_def
    )

    if not testing:
        logger.info(f"Creating task with taskId {task_id} for {label}")
        get_taskcluster_client("queue").createTask(task_id, resolved_def)
        return

    json.dump([task_id, resolved_def], sys.stdout, sort_keys=True, indent=2)
    # json.dump does not end the output with a newline, so add one
    print("")