# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import logging
import os
import pathlib
import shutil
import time
from pathlib import Path
import yaml
from voluptuous import Optional
from taskgraph.actions import render_actions_json
from taskgraph.create import create_tasks
from taskgraph.generator import TaskGraphGenerator
from taskgraph.parameters import Parameters, get_version
from taskgraph.taskgraph import TaskGraph
from taskgraph.util import json
from taskgraph.util.python_path import find_object
from taskgraph.util.schema import Schema, validate_schema
from taskgraph.util.vcs import get_repository
from taskgraph.util.yaml import load_yaml
logger = logging.getLogger(__name__)

# All decision artifacts are written here, relative to the task's working
# directory (exposed by the worker as the task's public artifacts).
ARTIFACTS_DIR = Path("artifacts")

# For each project, this gives a set of parameters specific to the project.
# See `taskcluster/docs/parameters.rst` for information on parameters.
PER_PROJECT_PARAMETERS = {
    # the default parameters are used for projects that do not match above.
    "default": {
        "target_tasks_method": "default",
    }
}

#: Schema for try_task_config.json version 2.
#: "parameters" is optional and, when present, maps parameter names to
#: arbitrary override values.
try_task_config_schema_v2 = Schema(
    {
        Optional("parameters"): {str: object},
    }
)
def full_task_graph_to_runnable_tasks(full_task_json):
    """Build the ``runnable-jobs.json`` payload from a full task graph.

    Only tasks carrying Treeherder metadata (``task.extra.treeherder``) are
    included.  Each entry records the Treeherder symbol, plus group name,
    group symbol, collection and machine platform when available.

    Args:
        full_task_json (dict): full task graph as label -> node mappings.

    Returns:
        dict: label -> Treeherder summary for each runnable task.
    """
    runnable_tasks = {}
    for label, node in full_task_json.items():
        # Skip tasks without Treeherder annotations entirely.
        if "extra" not in node["task"] or "treeherder" not in node["task"]["extra"]:
            continue
        th = node["task"]["extra"]["treeherder"]
        entry = {"symbol": th["symbol"]}
        for key in ("groupName", "groupSymbol", "collection"):
            if key in th:
                entry[key] = th[key]
        # The platform lives under the nested "machine" mapping, if present.
        platform = th.get("machine", {}).get("platform")
        if platform:
            entry["platform"] = platform
        runnable_tasks[label] = entry
    return runnable_tasks
def taskgraph_decision(options, parameters=None):
    """
    Run the decision task. This function implements `mach taskgraph decision`,
    and is responsible for

     * processing decision task command-line options into parameters
     * running task-graph generation exactly the same way the other `mach
       taskgraph` commands do
     * generating a set of artifacts to memorialize the graph
     * calling TaskCluster APIs to create the graph

    Args:
        options (dict): command-line options for the decision task.
        parameters: optional pre-computed parameters, or a callable taking the
            graph config; defaults to deriving them from ``options`` via
            ``get_decision_parameters``.
    """
    # Verbosity is decided up front so everything below logs consistently.
    level = logging.INFO
    if options.get("verbose"):
        level = logging.DEBUG
    logging.root.setLevel(level)

    # Handlers must have an explicit level set for them to properly filter
    # messages from child loggers (such as the optimization log a few lines
    # down).
    for h in logging.root.handlers:
        h.setLevel(level)

    # Make sure the artifacts directory exists before attaching the
    # optimization log handler below.
    if not os.path.isdir(ARTIFACTS_DIR):
        os.mkdir(ARTIFACTS_DIR)

    # optimizations are difficult to debug after the fact, so we always
    # log them at DEBUG level, and write the log as a separate artifact
    opt_log = logging.getLogger("optimization")
    opt_log.setLevel(logging.DEBUG)
    opt_handler = logging.FileHandler(ARTIFACTS_DIR / "optimizations.log", mode="w")
    if logging.root.handlers:
        # reuse the root handler's formatter so the artifact log matches
        opt_handler.setFormatter(logging.root.handlers[0].formatter)
    opt_log.addHandler(opt_handler)

    # Defer parameter computation until the generator provides the graph
    # config to the callable.
    parameters = parameters or (
        lambda graph_config: get_decision_parameters(graph_config, options)
    )

    # TASK_ID is set by the worker for the running (decision) task; a missing
    # value is a hard error, hence no default.
    decision_task_id = os.environ["TASK_ID"]

    # create a TaskGraphGenerator instance
    tgg = TaskGraphGenerator(
        root_dir=options.get("root"),
        parameters=parameters,
        decision_task_id=decision_task_id,
        write_artifacts=True,
        enable_verifications=options.get("verify", True),
    )

    # write out the parameters used to generate this graph
    write_artifact("parameters.yml", dict(**tgg.parameters))

    # write out the public/actions.json file
    write_artifact(
        "actions.json",
        render_actions_json(tgg.parameters, tgg.graph_config, decision_task_id),
    )

    # write out the full graph for reference
    full_task_json = tgg.full_task_graph.to_json()
    write_artifact("full-task-graph.json", full_task_json)

    # write out the public/runnable-jobs.json file
    write_artifact(
        "runnable-jobs.json", full_task_graph_to_runnable_tasks(full_task_json)
    )

    # this is just a test to check whether the from_json() function is working
    _, _ = TaskGraph.from_json(full_task_json)

    # write out the target task set to allow reproducing this as input
    write_artifact("target-tasks.json", list(tgg.target_task_set.tasks.keys()))

    # write out the optimized task graph to describe what will actually happen,
    # and the map of labels to taskids
    write_artifact("task-graph.json", tgg.morphed_task_graph.to_json())
    write_artifact("label-to-taskid.json", tgg.label_to_taskid)

    # write out current run-task and fetch-content scripts
    RUN_TASK_DIR = pathlib.Path(__file__).parent / "run-task"
    shutil.copy2(RUN_TASK_DIR / "run-task", ARTIFACTS_DIR)
    shutil.copy2(RUN_TASK_DIR / "fetch-content", ARTIFACTS_DIR)

    # actually create the graph
    create_tasks(
        tgg.graph_config,
        tgg.morphed_task_graph,
        tgg.label_to_taskid,
        tgg.parameters,
        decision_task_id=decision_task_id,
    )
def get_decision_parameters(graph_config, options):
    """
    Load parameters from the command-line options for 'taskgraph decision'.
    This also applies per-project parameters, based on the given project.

    Args:
        graph_config: the graph configuration; consulted for an optional
            ``taskgraph.decision-parameters`` hook.
        options (dict): command-line options for the decision task.

    Returns:
        Parameters: a validated ``Parameters`` instance (``check()`` has run).
    """
    # Copy the options that map one-to-one onto parameters (only those that
    # were actually supplied).
    parameters = {
        n: options[n]
        for n in [
            "base_repository",
            "base_ref",
            "base_rev",
            "head_repository",
            "head_rev",
            "head_ref",
            "head_tag",
            "project",
            "pushlog_id",
            "pushdate",
            "repository_type",
            "owner",
            "level",
            "target_tasks_method",
            "tasks_for",
        ]
        if n in options
    }

    repo_path = os.getcwd()
    repo = get_repository(repo_path)
    try:
        commit_message = repo.get_commit_message()
    except UnicodeDecodeError:
        # An undecodable commit message is treated as empty rather than
        # failing the decision task; it only affects DONTBUILD detection.
        commit_message = ""

    # Define default filter list, as most configurations shouldn't need
    # custom filters.
    parameters["files_changed"] = repo.get_changed_files(
        rev=parameters["head_rev"], base=parameters["base_rev"]
    )
    parameters["filters"] = [
        "target_tasks_method",
    ]
    # Defaults for parameters that are not derived from the command line.
    parameters["optimize_strategies"] = None
    parameters["optimize_target_tasks"] = True
    parameters["existing_tasks"] = {}
    parameters["do_not_optimize"] = []
    parameters["enable_always_target"] = True
    parameters["build_number"] = 1
    parameters["version"] = get_version(repo_path)
    parameters["next_version"] = None

    # owner must be an email, but sometimes (e.g., for ffxbld) it is not, in which
    # case, fake it
    if "@" not in parameters["owner"]:
        parameters["owner"] += "@noreply.mozilla.org"

    # use the pushdate as build_date if given, else use current time
    parameters["build_date"] = parameters["pushdate"] or int(time.time())
    # moz_build_date is the build identifier based on build_date
    parameters["moz_build_date"] = time.strftime(
        "%Y%m%d%H%M%S", time.gmtime(parameters["build_date"])
    )

    # Layer in per-project parameters, falling back to the defaults for
    # unknown projects (with a hint about how to customize).
    project = parameters["project"]
    try:
        parameters.update(PER_PROJECT_PARAMETERS[project])
    except KeyError:
        logger.warning(
            f"using default project parameters; add {project} to "
            f"PER_PROJECT_PARAMETERS in {__file__} to customize behavior "
            "for this project"
        )
        parameters.update(PER_PROJECT_PARAMETERS["default"])

    # `target_tasks_method` has higher precedence than `project` parameters
    if options.get("target_tasks_method"):
        parameters["target_tasks_method"] = options["target_tasks_method"]

    # ..but can be overridden by the commit message: if it contains the special
    # string "DONTBUILD" and this is an on-push decision task, then use the
    # special 'nothing' target task method.
    if "DONTBUILD" in commit_message and (
        options["tasks_for"] in ("hg-push", "github-push")
    ):
        parameters["target_tasks_method"] = "nothing"

    if options.get("optimize_target_tasks") is not None:
        parameters["optimize_target_tasks"] = options["optimize_target_tasks"]

    # Give the graph config a chance to adjust parameters via its
    # `decision-parameters` hook (a dotted path resolved to a callable).
    if "decision-parameters" in graph_config["taskgraph"]:
        decision_params = find_object(graph_config["taskgraph"]["decision-parameters"])
        assert callable(decision_params)
        decision_params(graph_config, parameters)

    if options.get("try_task_config_file"):
        task_config_file = os.path.abspath(options.get("try_task_config_file"))
    else:
        # if try_task_config.json is present, load it
        task_config_file = os.path.join(os.getcwd(), "try_task_config.json")

    # load try settings
    if ("try" in project and options["tasks_for"] == "hg-push") or options[
        "tasks_for"
    ] == "github-pull-request":
        set_try_config(parameters, task_config_file)

    result = Parameters(**parameters)
    result.check()
    return result
def set_try_config(parameters, task_config_file):
    """Apply overrides from a ``try_task_config.json`` file onto *parameters*.

    If *task_config_file* does not exist, this is a no-op.

    Args:
        parameters (dict): the parameters dict to update in place.
        task_config_file (str): path to the try task config JSON file.

    Raises:
        Exception: if the file's ``version`` key is missing or not 2.
    """
    if not os.path.isfile(task_config_file):
        return

    logger.info(f"using try tasks from {task_config_file}")
    with open(task_config_file) as fh:
        task_config = json.load(fh)

    # A missing "version" key previously surfaced as an opaque KeyError from
    # pop(); report it through the same "unknown version" error instead.
    task_config_version = task_config.pop("version", None)
    if task_config_version != 2:
        raise Exception(
            f"Unknown `try_task_config.json` version: {task_config_version}"
        )

    validate_schema(
        try_task_config_schema_v2,
        task_config,
        "Invalid v2 `try_task_config.json`.",
    )
    # "parameters" is Optional in the v2 schema, so a valid config may omit
    # it entirely; default to no overrides rather than raising KeyError.
    parameters.update(task_config.get("parameters", {}))
def write_artifact(filename, data):
    """Serialize *data* to ``ARTIFACTS_DIR / filename``.

    The format is chosen by extension: ``.yml`` writes YAML, ``.json`` writes
    pretty-printed JSON, and ``.gz`` writes gzip-compressed JSON.

    Args:
        filename (str): artifact file name, relative to ``ARTIFACTS_DIR``.
        data: the JSON-/YAML-serializable payload to write.

    Raises:
        TypeError: if the extension is not one of the supported formats.
    """
    # The message previously interpolated nothing; include the file name.
    logger.info(f"writing artifact file `{filename}`")
    if not os.path.isdir(ARTIFACTS_DIR):
        os.mkdir(ARTIFACTS_DIR)
    path = ARTIFACTS_DIR / filename
    if filename.endswith(".yml"):
        with open(path, "w") as f:
            yaml.safe_dump(data, f, allow_unicode=True, default_flow_style=False)
    elif filename.endswith(".json"):
        with open(path, "w") as f:
            json.dump(data, f, sort_keys=True, indent=2)
    elif filename.endswith(".gz"):
        import gzip  # noqa: PLC0415

        with gzip.open(path, "wb") as f:
            # gzip handles opened in binary mode require bytes; encode when
            # the serializer hands back a str (the `# type: ignore` here used
            # to paper over exactly this mismatch).
            serialized = json.dumps(data)
            if isinstance(serialized, str):
                serialized = serialized.encode("utf-8")
            f.write(serialized)
    else:
        raise TypeError(f"Don't know how to write to {filename}")
def read_artifact(filename):
    """Deserialize and return ``ARTIFACTS_DIR / filename``.

    The format is chosen by extension, mirroring ``write_artifact``:
    ``.yml`` reads YAML, ``.json`` reads JSON, ``.gz`` reads gzipped JSON.

    Args:
        filename (str): artifact file name, relative to ``ARTIFACTS_DIR``.

    Returns:
        The deserialized payload.

    Raises:
        TypeError: if the extension is not one of the supported formats.
    """
    path = ARTIFACTS_DIR / filename
    if filename.endswith(".yml"):
        return load_yaml(path, filename)
    elif filename.endswith(".json"):
        with open(path) as f:
            return json.load(f)
    elif filename.endswith(".gz"):
        import gzip  # noqa: PLC0415

        with gzip.open(path, "rb") as f:
            return json.load(f)
    else:
        # The message previously interpolated nothing; include the file name.
        raise TypeError(f"Don't know how to read {filename}")
def rename_artifact(src, dest):
    """Rename an artifact within ``ARTIFACTS_DIR``.

    Args:
        src (str): existing artifact name, relative to ``ARTIFACTS_DIR``.
        dest (str): new artifact name, relative to ``ARTIFACTS_DIR``.
    """
    # Path.rename delegates to os.rename, so semantics are unchanged.
    (ARTIFACTS_DIR / src).rename(ARTIFACTS_DIR / dest)