1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396
|
#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Custom swarming triggering script.
This script does custom swarming triggering logic, to enable device affinity
for our bots, while lumping all trigger calls under one logical step.
For the perf use case of device affinity, this script now enables soft device
affinity. This means that it tries to smartly allocate jobs to bots based
on what is currently alive and what bot the task was last triggered on,
preferring that last triggered bot if available. If the
--multiple-trigger-configs flag is specified than this script overrides
the soft device affinity functionality in favor of the provided ids.
The algorithm is roughly the following:
Find eligible bots, healthy or not.
* Query swarming for eligible bots based on the dimensions passed in
on the swarming call. Determine their health status based on
is not quarantied and is not is_dead
Of the eligible bots determine what bot id to run the shard on.
(Implementation in _select_config_indices_with_soft_affinity)
* First query swarming for the last task that ran that shard with
given dimensions. Assuming they are returned with most recent first.
* Check if the bot id that ran that task is alive, if so trigger
on that bot again.
* If that bot isn't alive, allocate to another alive bot or if no
other alive bots exist, trigger on the same dead one.
Scripts inheriting must have roughly the same command line interface as
swarming.py trigger. It modifies it in the following ways:
* Intercepts the dump-json argument, and creates its own by combining the
results from each trigger call.
* Intercepts the dimensions from the swarming call and determines what bots
are healthy based on the above device affinity algorithm, and triggers
* Adds a tag to the swarming trigger job with the shard so we know the last
bot that ran this shard.
This script is normally called from the swarming recipe module in tools/build.
"""
import argparse
import copy
import os
import subprocess
import sys
import logging
import random
import base_test_triggerer
SRC_DIR = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(os.path.join(SRC_DIR, 'tools', 'perf'))
# //tools/perf imports.
import generate_perf_sharding
from core import bot_platforms
class Bot(object): # pylint: disable=useless-object-inheritance
"""Eligible bots to run the task."""
def __init__(self, bot_id, is_alive):
self._bot_id = bot_id
self._is_alive = is_alive
def id(self):
return self._bot_id
def is_alive(self):
return self._is_alive
def as_json_config(self):
return {'id': self._bot_id}
class PerfDeviceTriggerer(base_test_triggerer.BaseTestTriggerer):
def __init__(self, args, swarming_args):
# pylint: disable=super-with-arguments
super(PerfDeviceTriggerer, self).__init__()
# pylint: enable=super-with-arguments
self._sharded_query_failed = False
if not args.multiple_trigger_configs:
# Represents the list of current dimensions requested
# by the parent swarming job.
self._dimensions = _get_swarming_dimensions(swarming_args)
# Store what swarming server we need and whether or not we need
# to send down authentication with it
self._swarming_server = _get_swarming_server(swarming_args)
# Map of all existing bots in swarming that satisfy the current
# set of dimensions indexed by bot id.
# Note: this assumes perf bot dimensions are unique between
# configurations.
self._eligible_bots_by_ids = (
self._query_swarming_for_eligible_bot_configs(self._dimensions))
if args.multiple_dimension_script_verbose:
logging.basicConfig(level=logging.DEBUG)
def generate_shard_map(self, args, buildername, selected_config):
shard_map = None
num_of_shards = len(selected_config)
builder = bot_platforms.find_bot_platform(buildername)
if args.use_dynamic_shards and builder and num_of_shards:
logging.info(
'Generating dynamic shardmap for builder: %s with %d shards',
buildername, num_of_shards)
shard_map = generate_perf_sharding.GenerateShardMap(
builder=builder, num_of_shards=num_of_shards)
for shard_index, bot_index in selected_config:
bot_id = self._bot_configs[bot_index]['id']
shard_map['extra_infos']['bot #%s' % shard_index] = bot_id
return shard_map
def append_additional_args(self, args, shard_index):
# Append a tag to the swarming task with the shard number
# so we can query for the last bot that ran a specific shard.
tag = 'shard:%d' % shard_index
shard_tag = ['--tag', tag]
# Need to append this before the dash if present so it gets fed to
# the swarming task itself.
if '--' in args:
dash_ind = args.index('--')
return args[:dash_ind] + shard_tag + args[dash_ind:]
return args + shard_tag
def parse_bot_configs(self, args):
if args.multiple_trigger_configs:
# pylint: disable=super-with-arguments
super(PerfDeviceTriggerer, self).parse_bot_configs(args)
# pylint: enable=super-with-arguments
else:
self._bot_configs = []
# For each eligible bot, append the dimension
# to the eligible bot_configs
for _, bot in self._eligible_bots_by_ids.items():
self._bot_configs.append(bot.as_json_config())
def select_config_indices(self, args):
if args.multiple_trigger_configs:
configs = []
# If specific bot ids were passed in, we want to trigger a job for
# every valid config regardless of health status since
# each config represents exactly one bot in the perf swarming pool.
for index in range(len(
base_test_triggerer.indices_to_trigger(args))):
configs.append((index, index))
if args.use_dynamic_shards:
return self._select_config_indices_with_dynamic_sharding()
return self._select_config_indices_with_soft_affinity(args)
def _select_config_indices_with_dynamic_sharding(self):
alive_bot_ids = [
bot_id for bot_id, b in self._eligible_bots_by_ids.items()
if b.is_alive()
]
trigger_count = len(alive_bot_ids)
indexes = list(range(trigger_count))
random.shuffle(indexes)
selected_config = [(indexes[i],
self._find_bot_config_index(alive_bot_ids[i]))
for i in range(trigger_count)]
selected_config.sort()
for shard_index, bot_index in selected_config:
logging.info('Shard %d\n\tBot: %s', shard_index,
self._bot_configs[bot_index]['id'])
return selected_config
def _select_config_indices_with_soft_affinity(self, args):
trigger_count = len(base_test_triggerer.indices_to_trigger(args))
# First make sure the number of shards doesn't exceed the
# number of eligible bots. This means there is a config error somewhere.
if trigger_count > len(self._eligible_bots_by_ids):
_print_device_affinity_info({}, {}, self._eligible_bots_by_ids,
trigger_count)
raise ValueError(
'Not enough available machines exist in swarming '
'pool. Shards requested (%d) exceeds available bots '
'(%d).' % (trigger_count, len(self._eligible_bots_by_ids)))
shard_to_bot_assignment_map = {}
unallocated_bots_by_ids = copy.deepcopy(self._eligible_bots_by_ids)
for shard_index in base_test_triggerer.indices_to_trigger(args):
bot_id = self._query_swarming_for_last_shard_id(shard_index)
if bot_id and bot_id in unallocated_bots_by_ids:
bot = unallocated_bots_by_ids[bot_id]
shard_to_bot_assignment_map[shard_index] = bot
unallocated_bots_by_ids.pop(bot_id)
else:
shard_to_bot_assignment_map[shard_index] = None
# Maintain the current map for debugging purposes
existing_shard_bot_to_shard_map = copy.deepcopy(
shard_to_bot_assignment_map)
# Now create sets of remaining healthy and bad bots
unallocated_healthy_bots = {
b
for b in unallocated_bots_by_ids.values() if b.is_alive()
}
unallocated_bad_bots = {
b
for b in unallocated_bots_by_ids.values() if not b.is_alive()
}
# Try assigning healthy bots for new shards first.
for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
if not bot and unallocated_healthy_bots:
shard_to_bot_assignment_map[shard_index] = \
unallocated_healthy_bots.pop()
logging.info('First time shard %d has been triggered',
shard_index)
elif not bot:
shard_to_bot_assignment_map[
shard_index] = unallocated_bad_bots.pop()
# Handle the rest of shards that were assigned dead bots:
for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
if not bot.is_alive() and unallocated_healthy_bots:
dead_bot = bot
healthy_bot = unallocated_healthy_bots.pop()
shard_to_bot_assignment_map[shard_index] = healthy_bot
logging.info(
'Device affinity broken for shard #%d. bot %s is dead,'
' new mapping to bot %s', shard_index, dead_bot.id(),
healthy_bot.id())
# Now populate the indices into the bot_configs array
selected_configs = []
for shard_index in base_test_triggerer.indices_to_trigger(args):
selected_configs.append(
(shard_index,
self._find_bot_config_index(
shard_to_bot_assignment_map[shard_index].id())))
_print_device_affinity_info(shard_to_bot_assignment_map,
existing_shard_bot_to_shard_map,
self._eligible_bots_by_ids, trigger_count)
return selected_configs
def _query_swarming_for_eligible_bot_configs(self, dimensions):
"""Query Swarming to figure out which bots are available.
Returns: a dictionary in which the keys are the bot id and
the values are Bot object that indicate the health status
of the bots.
"""
query_result = self.list_bots(dimensions, server=self._swarming_server)
perf_bots = {}
for bot in query_result:
# Device maintenance is usually quick, and we can wait for it to
# finish. However, if the device is too hot, it can take a long time
# for it to cool down, so check for 'Device temperature' in
# maintenance_msg.
alive = (not bot.get('is_dead') and not bot.get('quarantined')
and 'Device temperature' not in bot.get(
'maintenance_msg', ''))
perf_bots[bot['bot_id']] = Bot(bot['bot_id'], alive)
return perf_bots
def _find_bot_config_index(self, bot_id):
# Find the index into the bot_config map that
# maps to the bot id in question
for i, dimensions in enumerate(self._bot_configs):
if dimensions['id'] == bot_id:
return i
return None
def _query_swarming_for_last_shard_id(self, shard_index):
"""Per shard, query swarming for the last bot that ran the task.
Example: swarming.py query -S server-url.com --limit 1 \\
'tasks/list?tags=os:Windows&tags=pool:chrome.tests.perf&tags=shard:12'
"""
values = ['%s:%s' % (k, v) for k, v in self._dimensions.items()]
values.sort()
# Append the shard as a tag
values_with_shard = list(values)
values_with_shard.append('%s:%s' % ('shard', str(shard_index)))
values_with_shard.sort()
# TODO(eyaich): For now we are ignoring the state of the returned
# task (ie completed, timed_out, bot_died, etc) as we are just
# answering the question "What bot did we last trigger this shard on?"
# Evaluate if this is the right decision going forward.
# Query for the last task that ran with these dimensions and this shard.
# Query with the shard param first. This will sometimes time out for
# queries we've never done before, so try querying without it if that
# happens.
try:
if not self._sharded_query_failed:
tasks = self.list_tasks(values_with_shard,
limit='1',
server=self._swarming_server)
except subprocess.CalledProcessError:
self._sharded_query_failed = True
if self._sharded_query_failed:
tasks = self.list_tasks(values,
limit='1',
server=self._swarming_server)
if tasks:
# We queried with a limit of 1 so we could only get back
# the most recent which is what we care about.
task = tasks[0]
if 'bot_id' in task:
return task['bot_id']
for tag in task['tags']:
if tag.startswith('id:'):
return tag[len('id:'):]
# No eligible shard for this bot
return None
def _print_device_affinity_info(new_map, existing_map, health_map, num_shards):
logging.info('')
for shard_index in range(num_shards):
existing = existing_map.get(shard_index, None)
new = new_map.get(shard_index, None)
existing_id = ''
if existing:
existing_id = existing.id()
new_id = ''
if new:
new_id = new.id()
logging.info('Shard %d\n\tprevious: %s\n\tnew: %s', shard_index,
existing_id, new_id)
healthy_bots = []
dead_bots = []
for _, b in health_map.items():
if b.is_alive():
healthy_bots.append(b.id())
else:
dead_bots.append(b.id())
logging.info('Shards needed: %d', num_shards)
logging.info('Total bots (dead + healthy): %d',
len(dead_bots) + len(healthy_bots))
logging.info('Healthy bots, %d: %s', len(healthy_bots), healthy_bots)
logging.info('Dead Bots, %d: %s', len(dead_bots), dead_bots)
logging.info('')
def _get_swarming_dimensions(args):
dimensions = {}
for i in range(len(args) - 2):
if args[i] == '--dimension':
dimensions[args[i + 1]] = args[i + 2]
return dimensions
def _get_swarming_server(args):
for i, argument in enumerate(args):
if '--swarming' in argument:
server = args[i + 1]
slashes_index = server.index('//') + 2
# Strip out the protocol
return server[slashes_index:]
return None
def main():
logging.basicConfig(level=logging.INFO,
format='(%(levelname)s) %(asctime)s pid=%(process)d'
' %(module)s.%(funcName)s:%(lineno)d %(message)s')
# Setup args for common contract of base class
parser = base_test_triggerer.BaseTestTriggerer.setup_parser_contract(
argparse.ArgumentParser(description=__doc__))
parser.add_argument('--use-dynamic-shards',
action='store_true',
required=False,
help='Ignore --shards and the existing shard map. Will '
'generate a shard map at run time and use as much '
'device as possible.')
args, remaining = parser.parse_known_args()
triggerer = PerfDeviceTriggerer(args, remaining)
return triggerer.trigger_tasks(args, remaining)
if __name__ == '__main__':
sys.exit(main())
|