File: perf_device_trigger.py

package info (click to toggle)
chromium 73.0.3683.75-1~deb9u1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 1,792,156 kB
  • sloc: cpp: 13,473,466; ansic: 1,577,080; python: 898,539; javascript: 655,737; xml: 341,883; asm: 306,070; java: 289,969; perl: 80,911; objc: 67,198; sh: 43,184; cs: 27,853; makefile: 12,092; php: 11,064; yacc: 10,373; tcl: 8,875; ruby: 3,941; lex: 1,800; pascal: 1,473; lisp: 812; awk: 41; jsp: 39; sed: 19; sql: 3
file content (304 lines) | stat: -rwxr-xr-x 11,887 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
#!/usr/bin/env python
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Custom swarming triggering script.

This script does custom swarming triggering logic, to enable device affinity
for our bots, while lumping all trigger calls under one logical step.

For the perf use case of device affinity, this script now enables soft device
affinity.  This means that it tries to smartly allocate jobs to bots based
on what is currently alive and what bot the task was last triggered on,
preferring that last triggered bot if available.  If the
--multiple-trigger-configs flag is specified than this script overrides
the soft device affinity functionality in favor of the provided ids.

The algorithm is roughly the following:

Find eligible bots, healthy or not.
  * Query swarming for eligible bots based on the dimensions passed in
    on the swarming call.  Determine their health status based on
    is not quarantied and is not is_dead

Of the eligible bots determine what bot id to run the shard on.
(Implementation in _select_config_indices_with_soft_affinity)
  * First query swarming for the last task that ran that shard with
    given dimensions.  Assuming they are returned with most recent first.
  * Check if the bot id that ran that task is alive, if so trigger
    on that bot again.
  * If that bot isn't alive, allocate to another alive bot or if no
    other alive bots exist, trigger on the same dead one.

Scripts inheriting must have roughly the same command line interface as
swarming.py trigger. It modifies it in the following ways:

 * Intercepts the dump-json argument, and creates its own by combining the
   results from each trigger call.
 * Intercepts the dimensions from the swarming call and determines what bots
   are healthy based on the above device affinity algorithm, and triggers
 * Adds a tag to the swarming trigger job with the shard so we know the last
   bot that ran this shard.

This script is normally called from the swarming recipe module in tools/build.

"""

import argparse
import copy
import json
import os
import subprocess
import sys
import tempfile
import urllib

import base_test_triggerer

class Bot(object):
  """ Eligible bots to run the task"""
  def __init__(self, bot_id, is_alive):
    self._bot_id = bot_id
    self._is_alive = is_alive

  def id(self):
    return self._bot_id

  def is_alive(self):
    return self._is_alive

  def as_json_config(self):
    return {'id': self._bot_id}

class PerfDeviceTriggerer(base_test_triggerer.BaseTestTriggerer):
  def __init__(self, args, swarming_args):
    super(PerfDeviceTriggerer, self).__init__()
    if not args.multiple_trigger_configs:
      # Represents the list of current dimensions requested
      # by the parent swarming job.
      self._dimensions = self._get_swarming_dimensions(swarming_args)

      # Store what swarming server we need and whether or not we need
      # to send down authentication with it
      self._swarming_server = self._get_swarming_server(swarming_args)
      self._service_account = self._get_service_account(swarming_args)

      # Map of all existing bots in swarming that satisfy the current
      # set of dimensions indexed by bot id.
      # Note: this assumes perf bot dimensions are unique between
      # configurations.
      self._eligible_bots_by_ids = (
          self._query_swarming_for_eligible_bot_configs(self._dimensions))

  def append_additional_args(self, args, shard_index):
    # Append a tag to the swarming task with the shard number
    # so we can query for the last bot that ran a specific shard.
    tag = 'shard:%d' % shard_index
    shard_tag = ['--tag', tag]
    # Need to append this before the dash if present so it gets fed to
    # the swarming task itself.
    if '--' in args:
      dash_ind = args.index('--')
      return args[:dash_ind] + shard_tag + args[dash_ind:]
    else:
      return args + shard_tag

  def parse_bot_configs(self, args):
    if args.multiple_trigger_configs:
      super(PerfDeviceTriggerer, self).parse_bot_configs(args)
    else:
      self._bot_configs = []
      # For each eligible bot, append the dimension
      # to the eligible bot_configs
      for  _, bot in self._eligible_bots_by_ids.iteritems():
        self._bot_configs.append(bot.as_json_config())

  def select_config_indices(self, args, verbose):
    if args.multiple_trigger_configs:
      # If specific bot ids were passed in, we want to trigger a job for
      # every valid config regardless of health status since
      # each config represents exactly one bot in the perf swarming pool.
      return range(args.shards)
    return self._select_config_indices_with_soft_affinity(args, verbose)

  def _select_config_indices_with_soft_affinity(self, args, verbose):
    # First make sure the number of shards doesn't exceed the
    # number of eligible bots.  This means there is a config error somewhere.
    if args.shards > len(self._eligible_bots_by_ids):
      if verbose:
        self._print_device_affinity_info({}, {},
          self._eligible_bots_by_ids, args.shards)
      raise ValueError('Not enough available machines exist in in swarming'
                       'pool.  Shards requested (%d) exceeds available bots '
                       '(%d).' % (
                           args.shards, len(self._eligible_bots_by_ids)))

    shard_to_bot_assignment_map = {}
    unallocated_bots_by_ids = copy.deepcopy(self._eligible_bots_by_ids)
    for shard_index in xrange(args.shards):
      bot_id = self._query_swarming_for_last_shard_id(shard_index)
      if bot_id and bot_id in unallocated_bots_by_ids:
        bot = unallocated_bots_by_ids[bot_id]
        shard_to_bot_assignment_map[shard_index] = bot
        unallocated_bots_by_ids.pop(bot_id)
      else:
        shard_to_bot_assignment_map[shard_index] = None

    # Maintain the current map for debugging purposes
    existing_shard_bot_to_shard_map = copy.deepcopy(shard_to_bot_assignment_map)
    # Now create sets of remaining healthy and bad bots
    unallocated_healthy_bots = {
        b for b in unallocated_bots_by_ids.values() if b.is_alive()}
    unallocated_bad_bots = {
        b for b in unallocated_bots_by_ids.values() if not b.is_alive()}

    # Try assigning healthy bots for new shards first.
    for shard_index, bot in sorted(shard_to_bot_assignment_map.iteritems()):
      if not bot and unallocated_healthy_bots:
        shard_to_bot_assignment_map[shard_index] = \
            unallocated_healthy_bots.pop()
        if verbose:
          print 'First time shard %d has been triggered' % shard_index
      elif not bot:
        shard_to_bot_assignment_map[shard_index] = unallocated_bad_bots.pop()

    # Handle the rest of shards that were assigned dead bots:
    for shard_index, bot in sorted(shard_to_bot_assignment_map.iteritems()):
      if not bot.is_alive() and unallocated_healthy_bots:
        dead_bot = bot
        healthy_bot = unallocated_healthy_bots.pop()
        shard_to_bot_assignment_map[shard_index] = healthy_bot
        if verbose:
          print ('Device affinity broken for bot %s, new '
                 'mapping to bot %s' % (dead_bot.id(), healthy_bot.id()))

    # Now populate the indices into the bot_configs array
    selected_configs = []
    for shard_index in xrange(args.shards):
      selected_configs.append(self._find_bot_config_index(
          shard_to_bot_assignment_map[shard_index].id()))
    if verbose:
      self._print_device_affinity_info(
        shard_to_bot_assignment_map,
        existing_shard_bot_to_shard_map,
        self._eligible_bots_by_ids, args.shards)
    return selected_configs


  def _print_device_affinity_info(
      self, new_map, existing_map, health_map, num_shards):
    for shard_index in xrange(num_shards):
      existing = existing_map.get(shard_index, None)
      new = new_map.get(shard_index, None)
      existing_id = ""
      if existing:
        existing_id = existing.id()
      new_id = ""
      if new:
        new_id = new.id()
      print "Shard %d\n\tprevious: %s\n\tnew: %s" % (
          shard_index, existing_id, new_id)

    healthy_bots = []
    dead_bots = []
    for _, b in health_map.iteritems():
      if b.is_alive():
        healthy_bots.append(b.id())
      else:
        dead_bots.append(b.id())
    print "Healthy bots: %s" % healthy_bots
    print "Dead Bots: %s" % dead_bots


  def _query_swarming_for_eligible_bot_configs(self, dimensions):
    """ Query Swarming to figure out which bots are available.

      Returns: a dictionary in which the keys are the bot id and
      the values are Bot object that indicate the health status
      of the bots.
    """
    values = []
    for key, value in sorted(dimensions.iteritems()):
      values.append(('dimensions', '%s:%s' % (key, value)))

    query_result = self.query_swarming(
        'bots/list', values, True, server=self._swarming_server,
        service_account=self._service_account)
    perf_bots = {}
    for bot in query_result['items']:
      alive = (not bot['is_dead'] and not bot['quarantined'])
      perf_bots[bot['bot_id']] = Bot(bot['bot_id'], alive)
    return perf_bots

  def _find_bot_config_index(self, bot_id):
    # Find the index into the bot_config map that
    # maps to the bot id in question
    for i, dimensions in enumerate(self._bot_configs):
      if dimensions['id'] == bot_id:
        return i
    return None

  def _query_swarming_for_last_shard_id(self, shard_index):
    # Per shard, query swarming for the last bot that ran the task
    # Example: swarming.py query -S server-url.com --limit 1 \\
    #  'tasks/list?tags=os:Windows&tags=pool:chrome.tests.perf&tags=shard:12'
    values = [
      ('tags', '%s:%s' % (k, v)) for k, v in self._dimensions.iteritems()
    ]
    # Append the shard as a tag
    values.append(('tags', '%s:%s' % ('shard', str(shard_index))))
    values.sort()
    # TODO(eyaich): For now we are ignoring the state of the returned
    # task (ie completed, timed_out, bot_died, etc) as we are just
    # answering the question "What bot did we last trigger this shard on?"
    # Evaluate if this is the right decision going forward.

    # Query for the last task that ran with these dimensions and this shard
    query_result = self.query_swarming(
          'tasks/list', values, True, limit='1', server=self._swarming_server,
         service_account=self._service_account)
    tasks = query_result.get('items')
    if tasks:
      # We queried with a limit of 1 so we could only get back
      # the most recent which is what we care about.
      task = tasks[0]
      if 'bot_id' in task:
        return task['bot_id']
      for tag in task['tags']:
        if tag.startswith('id:'):
          return tag[len('id:'):]
    # No eligible shard for this bot
    return None

  def _get_swarming_dimensions(self, args):
    dimensions = {}
    for i in xrange(len(args) - 2):
      if args[i] == '--dimension':
        dimensions[args[i+1]] = args[i+2]
    return dimensions

  def _get_swarming_server(self, args):
    for i in xrange(len(args)):
      if '--swarming' in args[i]:
        server = args[i+1]
        slashes_index = server.index('//') + 2
        # Strip out the protocol
        return server[slashes_index:]

  def _get_service_account(self, args):
    for i in xrange(len(args) - 1):
      if '--auth-service-account-json' in args[i]:
        return args[i+1]

def main():
  # Setup args for common contract of base class
  parser = base_test_triggerer.BaseTestTriggerer.setup_parser_contract(
      argparse.ArgumentParser(description=__doc__))
  args, remaining = parser.parse_known_args()

  triggerer = PerfDeviceTriggerer(args, remaining)
  return triggerer.trigger_tasks(args, remaining)

if __name__ == '__main__':
  sys.exit(main())