File: sharding_map_generator.py

package info (click to toggle)
chromium 138.0.7204.157-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 6,071,864 kB
  • sloc: cpp: 34,936,859; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,953; asm: 946,768; xml: 739,967; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,806; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (381 lines) | stat: -rw-r--r-- 15,600 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# Copyright 2018 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections

import core.bot_platforms
import core.path_util
import core.cli_utils

# NOTE(review): presumably makes the Telemetry package importable for code
# loaded after this module -- confirm against core.path_util.
core.path_util.AddTelemetryToPath()

# Default duration, in seconds, given to every story to be sharded that has no
# recorded timing data.
# The reasons for using 10 seconds are:
# 1) Even if the stories are skipped, they still have non-negligible
#    overhead.
# 2) When sharding a set of benchmarks with no existing timing data, giving
#    every story the same non-zero duration leads to a roughly equal
#    distribution of stories across the shards, whereas initializing them to
#    zero would make the algorithm put all the stories into the first shard.
# 3) When adding a new benchmark to a builder that hasn't run it before but
#    has run other benchmarks, 10 seconds is a reasonable guess for how long
#    its stories take to run and creates reasonably balanced shard maps.
DEFAULT_STORY_DURATION = 10


def generate_sharding_map(benchmarks_to_shard,
                          timing_data,
                          num_shards,
                          debug,
                          repeat_config=None):
  """Generate a sharding map that spreads stories over |num_shards| shards.

  Stories are handed out shard by shard. A shard keeps receiving stories
  until adding the next one would move its predicted time further away from
  the expected per-shard time; the last shard takes everything that is left.
  Stories of benchmarks that are repeated as a whole are handed out before
  the stories that run once.

    Args:
      benchmarks_to_shard: a list of bot_platforms.BenchmarkConfig and
      ExecutableConfig objects. NOTE: the list is sorted in place by name.
      timing_data: The timing data in json with 'name' and 'duration'
      num_shards: the total number of shards
      debug: if true, print out full list of stories of each shard in shard map.
      repeat_config: dict of the tests which need to repeat on multiple
      shards, keyed by benchmark name. An int value repeats the whole
      benchmark that many times; any other value is treated as a dict of
      story name to repeat count. Counts are capped at |num_shards|, counts
      of 1 or less are ignored, and benchmark names that are not in
      |benchmarks_to_shard| are skipped.
    Return:
      The shard map: an OrderedDict with one entry per shard, keyed by the
      shard index as a string, plus an 'extra_infos' entry holding the story
      count and the predicted shard timings.
    Raises:
      KeyError: if |repeat_config| names a story to repeat that is not in
      the story list of its benchmark.
  """
  # Now we have three ways to run the tests:
  # - Run the benchmark once;
  # - Run the benchmark multiple times;
  # - Run the test multiple times.
  # The current format of the shard map (which is using the benchmark and story
  # name as the key) does not directly support repeating the same test on the
  # same shard. Also considering reusing the existing code, now we will
  # allocate the test in the following fashion: the benchmarks or tests which
  # need repeats are always allocated first, in order to avoid the fact that
  # all of them need to be squeezed together.

  # Sort the list of benchmarks to be sharded by benchmark's name to make the
  # execution of this algorithm deterministic.
  benchmarks_to_shard.sort(key=lambda entry: entry.name)
  benchmark_name_to_config = {b.name: b for b in benchmarks_to_shard}
  stories_by_benchmark = {}
  for b in benchmarks_to_shard:
    stories_by_benchmark[b.name] = b.stories

  # Parse the test repeat config.
  # benchmarks_to_repeat: benchmark name -> number of repeats (always > 1).
  # stories_to_repeat: 'benchmark/story' -> number of repeats (always > 1).
  benchmarks_to_repeat = {}
  stories_to_repeat = {}
  if repeat_config:
    for b in repeat_config:
      if b not in benchmark_name_to_config:
        continue
      if isinstance(repeat_config[b], int):
        # A test cannot be repeated more times than there are shards.
        repeats = min(repeat_config[b], num_shards)
        if repeats > 1:
          # repeat the whole benchmark
          benchmarks_to_repeat[b] = repeats
      else:
        # repeat specific stories of the benchmark
        for s in repeat_config[b]:
          repeats = min(repeat_config[b][s], num_shards)
          if repeats > 1:
            stories_to_repeat['%s/%s' % (b, s)] = repeats

  # Generate timing list for each way of repeating.
  # A timing list of tuples of (benchmarkname/story_name, story_duration),
  # where stories are ordered as in the benchmarks.
  benchmarks_to_run_once = [
      b for b in benchmarks_to_shard if b.name not in benchmarks_to_repeat
  ]
  one_time_story_timing_list = _gather_timing_data(benchmarks_to_run_once,
                                                   timing_data, True)

  repeated_benchmark_to_timing_list = {}
  for b in benchmarks_to_repeat:
    timing_list = _gather_timing_data([benchmark_name_to_config[b]],
                                      timing_data, True)
    repeated_benchmark_to_timing_list[b] = timing_list

  # Duration of every individually repeated story. Only stories found in the
  # run-once timing list get an entry here.
  repeated_story_timing_list = {}
  for s, t in one_time_story_timing_list:
    if s in stories_to_repeat:
      repeated_story_timing_list[s] = t

  # Generate the total timing and story count
  total_time = sum(t for _, t in one_time_story_timing_list)
  total_story = len(one_time_story_timing_list)
  for benchmark, timing_list in repeated_benchmark_to_timing_list.items():
    total_time += sum(t * benchmarks_to_repeat[benchmark]
                      for s, t in timing_list)
    total_story += len(timing_list) * benchmarks_to_repeat[benchmark]
  # A repeated story is already counted once as part of the run-once list, so
  # only its extra repeats are added.
  for story, time in repeated_story_timing_list.items():
    total_time += time * (stories_to_repeat[story] - 1)
    total_story += stories_to_repeat[story] - 1
  expected_shard_time = total_time / num_shards

  sharding_map = collections.OrderedDict()
  num_stories = total_story
  min_shard_time = 0x7fffffff  # maxint
  min_shard_index = None
  max_shard_time = 0
  max_shard_index = None
  predicted_shard_timings = []
  debug_timing = collections.OrderedDict()

  # Number of passes over the repeated benchmarks that have been queued so
  # far.
  current_repeat_on_benchmark = 0
  repeating_benchmark_timing_list = []
  one_time_story_timing_list.reverse()
  # The algorithm below removes all the stories from story timing lists one by
  # one and adds them to the current shard until the shard's total time is
  # approximately equal to |expected_shard_time|. After that point,
  # it moves to the next shard. To make sure repeating benchmarks are not on the
  # same shard, the stories from repeated_benchmark_to_timing_list have higher
  # priority.
  # For efficient removal of story timing list's elements & to keep the
  # ordering of benchmark alphabetically sorted in the shards' assignment, we
  # reverse the |story_timing_list|.
  #TODO(crbug.com/40175917): fix extra story repeat
  for i in range(num_shards):
    # Once the previous pass over the repeated benchmarks has been fully
    # handed out, queue another pass containing every benchmark that still
    # needs more repeats than the number of passes queued so far.
    if len(repeating_benchmark_timing_list) == 0:
      for benchmark, timing_list in repeated_benchmark_to_timing_list.items():
        if benchmarks_to_repeat[benchmark] > current_repeat_on_benchmark:
          repeating_benchmark_timing_list += timing_list
      repeating_benchmark_timing_list.reverse()
      current_repeat_on_benchmark += 1

    shard_name = 'shard #%i' % i
    sharding_map[str(i)] = {'benchmarks': collections.OrderedDict()}

    pre_allocated_stories = []
    pre_allocated_time = 0
    # pre_allocated_stories = all_pre_allocated_stories[i]
    # for s in pre_allocated_stories:
    #   pre_allocated_time += repeated_story_timing_list[s]
    # Here |t| is the repeat count of story |s|, not a duration: a story that
    # repeats N times is pre-allocated to each of the first N shards.
    for s, t in stories_to_repeat.items():
      if t > i:
        pre_allocated_stories.append(s)
        pre_allocated_time += repeated_story_timing_list[s]

    debug_timing[shard_name] = collections.OrderedDict()
    shard_time = pre_allocated_time
    stories_in_shard = []

    # Keep adding stories to the current shard if:
    # 1. Adding the next story does not make the shard time further from
    # the expected;
    # Or
    # 2. The current shard is the last shard.
    while one_time_story_timing_list or repeating_benchmark_timing_list:
      # Stories of repeated benchmarks are taken before run-once stories.
      if repeating_benchmark_timing_list:
        story_timing_list = repeating_benchmark_timing_list
      else:
        story_timing_list = one_time_story_timing_list

      # Add one story anyway to avoid empty shard
      current_story, current_duration = story_timing_list[-1]
      story_timing_list.pop()
      # The time of a pre-allocated story is already part of |shard_time|.
      if current_story not in pre_allocated_stories:
        shard_time += current_duration
      stories_in_shard.append(current_story)
      debug_timing[shard_name][current_story] = current_duration

      if not one_time_story_timing_list and not repeating_benchmark_timing_list:
        # All stories sharded
        break

      # Peek at the duration of the story that would be added next.
      if repeating_benchmark_timing_list:
        _, next_duration = repeating_benchmark_timing_list[-1]
      else:
        _, next_duration = one_time_story_timing_list[-1]
      if (abs(shard_time + next_duration - expected_shard_time) >
          abs(shard_time - expected_shard_time)) and i != num_shards - 1:
        # it is not the last shard and we should not add the next story
        break
    # This replaces sharding_map[str(i)] with a new entry built from
    # |stories_in_shard|; that entry has no 'benchmarks' key when none of the
    # stories belong to a telemetry benchmark.
    _add_benchmarks_to_shard(sharding_map, i, stories_in_shard,
                             stories_by_benchmark, benchmark_name_to_config)

    # Collect, per benchmark, the (begin, end) story index ranges that were
    # just assigned to this shard.
    sharding_map_benchmarks = sharding_map[str(i)].get(
        'benchmarks', collections.OrderedDict())
    benchmark_sections = collections.OrderedDict()
    for benchmark, config in sharding_map_benchmarks.items():
      if 'sections' in config:
        section_list = [(s.get('begin', 0),
                         s.get('end', len(stories_by_benchmark[benchmark])))
                        for s in config['sections']]
      else:
        section_list = [(config.get('begin', 0),
                         config.get('end',
                                    len(stories_by_benchmark[benchmark])))]
      benchmark_sections[benchmark] = section_list

    # Add a one-story range for every story pre-allocated to this shard.
    for pre_allocated_story in pre_allocated_stories:
      benchmark, story = pre_allocated_story.split('/', 1)
      story_index = stories_by_benchmark[benchmark].index(story)
      if benchmark in benchmark_sections:
        benchmark_sections[benchmark].append((story_index, story_index + 1))
      else:
        benchmark_sections[benchmark] = [(story_index, story_index + 1)]

    # Rebuild the config of each benchmark from its ranges.
    new_benchmark_configs = collections.OrderedDict()
    for benchmark, sections in benchmark_sections.items():
      # NOTE(review): presumably combines overlapping or adjacent ranges into
      # a list of disjoint ranges -- confirm against core.cli_utils.
      merged_sections = core.cli_utils.MergeIndexRanges(sections)
      sections_config = []
      if len(merged_sections) == 1:
        # A single range uses the flat 'begin'/'end' form; a bound that sits
        # at the very start or end of the story list is left out.
        begin = merged_sections[0][0] if merged_sections[0][0] != 0 else None
        end = merged_sections[0][1] if merged_sections[0][1] != len(
            stories_by_benchmark[benchmark]) else None
        benchmark_config = {}
        if begin:
          benchmark_config['begin'] = begin
        if end:
          benchmark_config['end'] = end
      elif len(merged_sections) > 1:
        # Several disjoint ranges use the 'sections' form.
        for section in merged_sections:
          sections_config.append({'begin': section[0], 'end': section[1]})
        benchmark_config = {
            'sections': sections_config,
            'abridged': benchmark_name_to_config[benchmark].abridged
        }

      # Sets 'abridged' for the single-range form; for the 'sections' form
      # this re-assigns the value already stored above.
      abridged = benchmark_name_to_config[benchmark].abridged
      benchmark_config['abridged'] = abridged

      pageset_repeat_override = benchmark_name_to_config[
          benchmark].pageset_repeat_override
      if pageset_repeat_override:
        benchmark_config['pageset_repeat'] = pageset_repeat_override

      new_benchmark_configs[benchmark] = benchmark_config

    sharding_map[str(i)]['benchmarks'] = new_benchmark_configs
    # Re-balance: spread the time that is still unassigned evenly over the
    # shards that remain.
    if i != num_shards - 1:
      total_time -= shard_time
      expected_shard_time = total_time / (num_shards - i - 1)
    if shard_time > max_shard_time:
      max_shard_time = shard_time
      max_shard_index = i
    if shard_time < min_shard_time:
      min_shard_time = shard_time
      min_shard_index = i

    predicted_shard_timings.append((shard_name, shard_time))
    debug_timing[shard_name]['expected_total_time'] = shard_time

  sharding_map['extra_infos'] = collections.OrderedDict([
      ('num_stories', num_stories),
      ('predicted_min_shard_time', min_shard_time),
      ('predicted_min_shard_index', min_shard_index),
      ('predicted_max_shard_time', max_shard_time),
      ('predicted_max_shard_index', max_shard_index),
  ])

  # With |debug| the per-story timings of every shard are included; otherwise
  # only the predicted total time of each shard is.
  if debug:
    sharding_map['extra_infos'].update(debug_timing)
  else:
    sharding_map['extra_infos'].update(predicted_shard_timings)
  return sharding_map


def _add_benchmarks_to_shard(sharding_map, shard_index, stories_in_shard,
    all_stories, benchmark_name_to_config):
  """Stores the entry for one shard, built from its stories, in the map.

  Args:
    sharding_map: the shard map to update; the entry keyed by
      str(shard_index) is replaced.
    shard_index: index of the shard being filled.
    stories_in_shard: list of 'benchmark/story' names assigned to the shard.
    all_stories: dict of benchmark name to its full ordered story list.
    benchmark_name_to_config: dict of benchmark name to its config object.
  """
  # Group the story names by benchmark, keeping first-seen benchmark order.
  picked_by_benchmark = collections.OrderedDict()
  for full_name in stories_in_shard:
    benchmark_name, story_name = full_name.split('/', 1)
    picked_by_benchmark.setdefault(benchmark_name, []).append(story_name)

  def story_bounds(benchmark_name):
    # Index range spanned by the first and last picked story. A bound that
    # coincides with the start or end of the full story list is left out.
    ordered = all_stories[benchmark_name]
    picked = picked_by_benchmark[benchmark_name]
    start = ordered.index(picked[0])
    stop = ordered.index(picked[-1]) + 1
    bounds = {}
    if start != 0:
      bounds['begin'] = start
    if stop != len(ordered):
      bounds['end'] = stop
    return bounds

  # Format each benchmark's stories by indices, one bucket per kind of config.
  telemetry_entries = collections.OrderedDict()
  executable_entries = collections.OrderedDict()
  crossbench_entries = collections.OrderedDict()
  for benchmark_name in picked_by_benchmark:
    config = benchmark_name_to_config[benchmark_name]
    if isinstance(config, core.bot_platforms.CrossbenchConfig):
      entry = {
          'display_name': benchmark_name,
          'arguments': config.arguments,
      }
      entry.update(story_bounds(benchmark_name))
      crossbench_entries[config.crossbench_name] = entry
      continue
    if config.is_telemetry:
      entry = story_bounds(benchmark_name)
      entry['abridged'] = config.abridged
      telemetry_entries[benchmark_name] = entry
      continue
    # Anything else is an executable; it is recorded without story indices.
    entry = {}
    if config.flags:
      entry['arguments'] = config.flags
    entry['path'] = config.path
    executable_entries[benchmark_name] = entry

  # Only non-empty buckets get a key in the shard entry.
  shard_entry = collections.OrderedDict()
  for key, entries in (('benchmarks', telemetry_entries),
                       ('executables', executable_entries),
                       ('crossbench', crossbench_entries)):
    if entries:
      shard_entry[key] = entries
  sharding_map[str(shard_index)] = shard_entry


def _gather_timing_data(benchmarks_to_shard, timing_data, repeat):
  """Builds the ordered list of stories with their durations.

  Args:
    benchmarks_to_shard: benchmark configs whose stories are listed.
    timing_data: iterable of dicts with 'name' and 'duration'; entries with
      a falsy duration are ignored.
    repeat: if true, a recorded duration is multiplied by the benchmark's
      repeat count.
  Return:
    A list of tuples of (story_name, story_duration), in the order of
    |benchmarks_to_shard| and of the stories within each benchmark. A story
    without recorded timing gets the benchmark's 'estimated_runtime' if it
    has one, else DEFAULT_STORY_DURATION; that fallback is not multiplied.
  """
  recorded_durations = {
      run['name']: float(run['duration'])
      for run in timing_data
      if run['duration']
  }

  ordered_timings = []
  for benchmark in benchmarks_to_shard:
    multiplier = benchmark.repeat if repeat else 1
    for story in benchmark.stories:
      full_name = '%s/%s' % (benchmark.name, story)
      fallback = getattr(benchmark, 'estimated_runtime',
                         DEFAULT_STORY_DURATION)
      duration = (recorded_durations[full_name] * multiplier
                  if full_name in recorded_durations else fallback)
      ordered_timings.append((full_name, duration))
  return ordered_timings


def test_sharding_map(
    sharding_map, benchmarks_to_shard, test_timing_data):
  """Computes the timing of every shard in |sharding_map|.

  Only the 'benchmarks' part of each shard is examined. Both benchmark config
  forms are understood: the flat form with optional 'begin'/'end' indices,
  and the 'sections' form holding a list of such ranges. A missing 'begin'
  means the first story and a missing 'end' means the end of the story list.

  Args:
    sharding_map: the shard map to evaluate. NOTE: its 'extra_infos' entry,
      if present, is removed from the passed-in map.
    benchmarks_to_shard: the benchmark configs the map was generated from.
    test_timing_data: timing data with 'name' and 'duration', used to look up
      story durations (not multiplied by any repeat count).
  Return:
    An OrderedDict of shard key to an OrderedDict that maps each
    'benchmark/story' on the shard to its duration as a string, plus a
    'full_time' entry with the summed duration of the shard.
  """
  story_timing_dict = dict(
      _gather_timing_data(benchmarks_to_shard, test_timing_data, False))
  all_stories = {b.name: b.stories for b in benchmarks_to_shard}

  # 'extra_infos' holds summary data, not a shard.
  sharding_map.pop('extra_infos', None)

  results = collections.OrderedDict()
  for shard in sharding_map:
    results[shard] = collections.OrderedDict()
    shard_total_time = 0
    # A shard that only runs executables or crossbench has no 'benchmarks'.
    shard_benchmarks = sharding_map[shard].get('benchmarks', {})
    for benchmark_name, benchmark in shard_benchmarks.items():
      stories = all_stories[benchmark_name]
      if 'sections' in benchmark:
        # Several disjoint ranges of the benchmark run on this shard.
        index_ranges = [(section.get('begin', 0),
                         section.get('end', len(stories)))
                        for section in benchmark['sections']]
      else:
        index_ranges = [(benchmark.get('begin', 0),
                         benchmark.get('end', len(stories)))]
      for begin, end in index_ranges:
        for story in stories[begin:end]:
          story_key = benchmark_name + '/' + story
          story_timing = story_timing_dict[story_key]
          results[shard][story_key] = str(story_timing)
          shard_total_time += story_timing
    results[shard]['full_time'] = shard_total_time
  return results