File: file_reading.py

package info (click to toggle)
chromium 138.0.7204.183-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 6,071,908 kB
  • sloc: cpp: 34,937,088; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,953; asm: 946,768; xml: 739,971; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,806; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (642 lines) | stat: -rwxr-xr-x 28,280 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
#!/usr/bin/env python3
# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Parsing logic to read files for the Web App testing framework.
"""

from collections import defaultdict
import logging
import os
from posixpath import split
import re
from typing import Dict, List, Set, Tuple, Optional

from numpy import append

from models import Action, TestId
from models import ArgEnum
from models import ActionType
from models import ActionsByName
from models import CoverageTest
from models import CoverageTestsByPlatformSet
from models import EnumsByType
from models import PartialAndFullCoverageByBaseName
from models import TestIdsTestNamesByPlatform
from models import TestIdsTestNamesByPlatformSet
from models import TestIdTestNameTuple
from models import TestPartitionDescription
from models import TestPlatform

# Minimum number of columns a parsed row must contain in each input file. The
# reader functions in this module raise ValueError for rows with fewer
# entries than the corresponding constant.
MIN_COLUMNS_ENUMS_FILE = 2
MIN_COLUMNS_ACTIONS_FILE = 5
MIN_COLUMNS_SUPPORTED_ACTIONS_FILE = 5
MIN_COLUMNS_UNPROCESSED_COVERAGE_FILE = 2


def enumerate_markdown_file_lines_to_table_rows(
        lines: List[str]) -> List[Tuple[int, List[str]]]:
    """Extracts the data rows of any markdown tables found in |lines|.

    Only lines that start with "|" (ignoring surrounding whitespace) are
    treated as table rows. A row is dropped when its first cell starts with
    "#" (a commented-out row), or when its first cell is empty or consists
    solely of "-" characters (e.g. the header separator row).

    Args:
        lines: The lines of a markdown file.

    Returns:
        A list of (index into |lines|, stripped cell values) tuples, in the
        order the rows appear.
    """
    table_rows: List[Tuple[int, List[str]]] = []
    for line_index, raw_line in enumerate(lines):
        text = raw_line.strip()
        if not text.startswith("|"):
            continue
        # Dropping the outer pipes before splitting prevents an empty first
        # and last cell.
        cells = [cell.strip() for cell in text.strip("|").split("|")]
        leading_cell = cells[0]
        is_comment = leading_cell.startswith("#")
        # The empty string also satisfies this, so rows with a blank first
        # cell are skipped too.
        is_dashes_only = leading_cell.strip("-") == ""
        if is_comment or is_dashes_only:
            continue
        table_rows.append((line_index, cells))
    return table_rows


def enumerate_all_argument_combinations(argument_types: List[ArgEnum]
                                        ) -> List[List[str]]:
    """Returns every combination of values for the given argument types.

    Each combination holds one value per entry of |argument_types|, in the
    same order. Combinations are ordered so that the value of the last
    argument type varies fastest. With no argument types, the result is a
    single empty combination.
    """
    combinations: List[List[str]] = [[]]
    # Extend every partial combination with each value of the next type.
    for argument_type in argument_types:
        combinations = [
            partial + [value] for partial in combinations
            for value in argument_type.values
        ]
    return combinations


def expand_wildcards_in_action(action: str, enums: EnumsByType) -> List[str]:
    """
    Takes an action string that could contain enum wildcards, and returns the
    list of all combinations of actions with all wildcards fully expanded.

    Wildcards are expanded one at a time, always starting with the leftmost
    wildcard that belongs to a known enum type. This guarantees that every
    combination is produced exactly once, even when the action mixes
    wildcards of different enum types, and that a type name which is a suffix
    of another known type name (e.g. 'Option' vs 'WindowOption') does not
    match inside the longer wildcard.

    If the action contains '::All' but no known enum type matches it, a
    warning is logged and an empty list is returned.

    Example input:
    - action: 'Action(EnumType::All, EnumType::All)'
    - enums: {'EnumType': EnumType('EnumType', ['Value1', 'Value2'])}
    Example output:
    - ['Action(Value1, Value1)', 'Action(Value1, Value2)',
       'Action(Value2, Value1)', 'Action(Value2, Value2)']
    """
    if "::All" not in action:
        return [action]

    # Find the leftmost wildcard of a known enum type. Expanding only that
    # one (and recursing for the rest) avoids emitting each combination once
    # per matching enum type.
    leftmost_index = -1
    leftmost_wildcard = ""
    leftmost_enum = None
    for type_name, enum in enums.items():
        wildcard_str = type_name + "::All"
        index = action.find(wildcard_str)
        if index == -1:
            continue
        if leftmost_enum is None or index < leftmost_index:
            leftmost_index = index
            leftmost_wildcard = wildcard_str
            leftmost_enum = enum

    if leftmost_enum is None:
        logging.warning(f"No known enum type matches the wildcard in action "
                        f"{action!r}; it expands to no actions.")
        return []

    prefix = action[:leftmost_index]
    postfix = action[leftmost_index + len(leftmost_wildcard):]
    output: List[str] = []
    for value in leftmost_enum.values:
        output.extend(
            expand_wildcards_in_action(prefix + value + postfix, enums))
    return output


def expand_tests_from_action_parameter_wildcards(enums: EnumsByType,
                                                 actions: List[str]
                                                 ) -> List[List[str]]:
    """
    Expands a test, given as a list of actions that may contain argument
    wildcards, into one test per combination of wildcard values.

    Every returned test has one entry per input action. The expansions of the
    first action vary fastest in the output ordering. An empty |actions| list
    produces a single empty test.

    Example input:
    - actions: ['Action1(EnumType::All)', 'Action2(EnumType::All)']
    - enums: {'EnumType': EnumType('EnumType', ['Value1', 'Value2'])}
    Example output:
    - [['Action1(Value1)', 'Action2(Value1)'],
       ['Action1(Value2)', 'Action2(Value1)'],
       ['Action1(Value1)', 'Action2(Value2)'],
       ['Action1(Value2)', 'Action2(Value2)']]
    """
    if not actions:
        return [[]]
    per_action_expansions: List[List[str]] = [
        expand_wildcards_in_action(action, enums) for action in actions
    ]
    # Build the tests back to front: each step prepends every expansion of
    # one action onto all the tails assembled so far.
    tests: List[List[str]] = [[]]
    for expansions in reversed(per_action_expansions):
        tests = [[expanded] + tail for tail in tests
                 for expanded in expansions]
    return tests


def resolve_bash_style_replacement(output_action_str: str,
                                   argument_values: List[str]):
    """Substitutes "$1", "$2", ... in |output_action_str| with arguments.

    "$N" is replaced by the N-th (1-based) entry of |argument_values|. The
    substitutions are applied one argument at a time, in order, each over the
    result of the previous one.
    """
    resolved = output_action_str
    for position, argument in enumerate(argument_values, start=1):
        resolved = resolved.replace("$" + str(position), argument)
    return resolved


def human_friendly_name_to_canonical_action_name(
        human_friendly_action_name: str,
        action_base_name_to_default_args: Dict[str, str]) -> str:
    """
    Converts a human-friendly action name (used in the spreadsheet) into the
    format compatible with this testing framework. This does two things:
    1) Resolving specified arguments, from "()" format into a "_". For example,
       "action(with_argument)" turns into "action_with_argument", and
       "action(a, b)" turns into "action_a_b". Arguments may be separated by
       a comma with or without following whitespace.
    2) Resolving modeless actions (that have a default argument) to
       include the default argument. For example, if
       |action_base_name_to_default_args| contains an entry for
       |human_friendly_action_name|, then that entry is appended to the
       action. "action" and {"action": "default_argument"} will return
       "action_default_argument".
    If neither of those cases apply, then the (whitespace-stripped)
    |human_friendly_action_name| is returned.
    """
    name = human_friendly_action_name.strip()
    if name in action_base_name_to_default_args:
        # Handle default arguments.
        return name + "_" + action_base_name_to_default_args[name]
    if "(" in name:
        # Handle arguments being specified. The separator accepts any amount
        # of whitespace after the comma so "a(x,y)" and "a(x, y)" resolve to
        # the same canonical name.
        name = re.sub(r",\s*", "_", name.replace("(", "_"))
        # Also strip trailing _, which appears if the action is
        # "action_name()" without arguments.
        return name.rstrip(")_")
    return name


def read_platform_supported_actions(csv_file
                                    ) -> PartialAndFullCoverageByBaseName:
    """Builds the per-action platform coverage index from parsed csv rows.

    Each row holds an action base name followed by one coverage marker per
    platform, in the column order Mac, Windows, Linux, ChromeOS. The marker
    is '🌕' for fully supported, '🌓' for partially supported, and '🌑' or
    blank for unsupported. Empty rows and rows whose first entry starts with
    "#" are ignored.

    Args:
        csv_file: An iterable of rows, where each row is a sequence of
                  string entries.

    Returns:
        A dictionary of action base name to a tuple of (partially supported
        platforms, fully supported platforms).

    Raises:
        ValueError: A row is too short, repeats an action base name, has a
                    base name that is not snake_case, or has an unknown
                    coverage marker.
    """
    # Columns 1-4 of every row map, in order, onto these platforms.
    platform_by_column = (TestPlatform.MAC, TestPlatform.WINDOWS,
                          TestPlatform.LINUX, TestPlatform.CHROME_OS)
    coverage_by_base_name: PartialAndFullCoverageByBaseName = {}
    for i, row in enumerate(csv_file):
        if not row or row[0].startswith("#"):
            continue
        if len(row) < MIN_COLUMNS_SUPPORTED_ACTIONS_FILE:
            raise ValueError(f"Row {i} does not contain enough entries. "
                             f"Got {row}.")
        base_name: str = row[0].strip()
        if base_name in coverage_by_base_name:
            raise ValueError(f"Action base name '{base_name}' on "
                             f"row {i} is already specified.")
        if re.fullmatch(r'[a-z_]+', base_name) is None:
            raise ValueError(
                f"Invald action base name '{base_name}' on "
                f"row {i}. Please use snake_case.")

        fully_supported: Set[TestPlatform] = set()
        partially_supported: Set[TestPlatform] = set()
        # Markers that record a platform, and the set each one records into.
        platforms_by_marker = {
            "🌕": fully_supported,
            "🌓": partially_supported,
        }
        for platform, cell in zip(platform_by_column, row[1:5]):
            marker = cell.strip()
            if marker in ("", "🌑"):
                continue
            if marker not in platforms_by_marker:
                raise ValueError(f"Invalid coverage '{marker}' on row {i}. "
                                 f"Please use '🌕', '🌓', or '🌑'.")
            platforms_by_marker[marker].add(platform)

        coverage_by_base_name[base_name] = (partially_supported,
                                            fully_supported)
    return coverage_by_base_name


def read_enums_file(enums_file_lines: List[str]) -> EnumsByType:
    """Parses the enums markdown table into an index of enums by type name.

    The first cell of every table row is the enum type name and the remaining
    non-blank cells are its values. At most one value per row may be marked
    with "*", which makes it the default value of that enum; the trailing "*"
    is not part of the stored value.

    Args:
        enums_file_lines: The lines of the enums markdown file.

    Returns:
        A dictionary of enum type name to the ArgEnum built from its row.

    Raises:
        ValueError: A row is too short, a type name or value fails the
                    naming check, or a row marks more than one default.
    """
    name_pattern = re.compile(r'([A-Z]\w*\*?)|')
    parsed_enums: EnumsByType = {}
    for i, row in enumerate_markdown_file_lines_to_table_rows(
            enums_file_lines):
        if len(row) < MIN_COLUMNS_ENUMS_FILE:
            raise ValueError(f"Row {i!r} does not contain enough entries. "
                             f"Got {row}.")
        type_name = row[0].strip()
        if name_pattern.fullmatch(type_name) is None:
            raise ValueError(f"Invald enum type name {type_name!r} on row "
                             f"{i!r}. Please use PascalCase.")

        enum_values: List[str] = []
        default_value: Optional[str] = None
        for cell in row[1:]:
            candidate = cell.strip()
            if candidate == "":
                continue
            if "*" in candidate:
                # Only a single value per enum may carry the default marker.
                if default_value is not None:
                    raise ValueError(
                        f"Cannot have two default values for enum type "
                        f"{type_name!r} on row {i!r}.")

                candidate = candidate.rstrip("*")
                default_value = candidate
            if name_pattern.fullmatch(candidate) is None:
                raise ValueError(f"Invald enum value {candidate!r} on row "
                                 f"{i!r}. Please use PascalCase.")
            enum_values.append(candidate)

        parsed_enum: ArgEnum = ArgEnum(type_name, enum_values, default_value)
        parsed_enums[parsed_enum.type_name] = parsed_enum
    return parsed_enums


def read_actions_file(
        actions_file_lines: List[str], enums_by_type: Dict[str, ArgEnum],
        supported_platform_actions: PartialAndFullCoverageByBaseName
) -> Tuple[ActionsByName, Dict[str, str]]:
    """Reads the actions markdown file.

    Every table row describes one action base name. The columns read here are:
      - column 0: the action base name (snake_case).
      - column 1: a comma-separated list of argument enum type names.
      - column 2: the output actions, separated by "&". If present, the
                  action is a PARAMETERIZED action.
      - column 7: the shortened base name (optional).

    If argument types are specified for an action, then one action is added
    to the results dictionary per combination of argument values, named
    "<action_base_name>_<value1>_<value2>...". An enum value marked with a
    "*" in the enums file is the default value for that argument type.

    If output actions are specified for an action, then it will be a
    PARAMETERIZED action and the output actions will be resolved into the
    `Action.output_actions` field. Otherwise the action is a STATE_CHECK
    action if its base name starts with "check_", and a STATE_CHANGE action
    if not.

    See the README.md for more information about actions and action templates.

    Args:
        actions_file_lines: The lines of the markdown file to parse all
                            actions from.
        enums_by_type: An index of enum type name to ArgEnum, used to look up
                       the argument types named in column 1 and to expand
                       wildcards in output actions.
        supported_platform_actions: A dictionary of action base name to a
                                    tuple of (partially supported platforms,
                                    fully supported platforms). Actions
                                    without an entry are treated as supported
                                    on no platform.

    Returns (actions_by_name,
             action_base_name_to_default_args):
        actions_by_name:
            Index of all actions by action name.
        action_base_name_to_default_args:
            Index of action base names to the default arguments (joined with
            "_"). Only populated for actions where all argument types have
            defaults.

    Raises:
        ValueError: The input file is invalid: a row has too few columns, a
            base name is not snake_case, an argument enum type is unknown, an
            action name is duplicated, |supported_platform_actions| lists a
            base name that is not in the file, or none of the output actions
            of a parameterized action could be found.
        AssertionError: A parameterized action ended up with an empty list of
            output action names.
    """
    actions_by_name: Dict[str, Action] = {}
    action_base_name_to_default_args: Dict[str, str] = {}
    action_base_names: Set[str] = set()
    for i, row in enumerate_markdown_file_lines_to_table_rows(
            actions_file_lines):
        if len(row) < MIN_COLUMNS_ACTIONS_FILE:
            raise ValueError(f"Row {i!r} does not contain enough entries. "
                             f"Got {row}.")

        # The shortened base name column is optional.
        shortened_base_name = row[7].strip() if len(row) > 7 else None
        action_base_name = row[0].strip()
        # Recorded before validation; only used below to detect supported
        # actions that are missing from this file.
        action_base_names.add(action_base_name)
        if not re.fullmatch(r'[a-z_]+', action_base_name):
            raise ValueError(f"Invald action base name {action_base_name} on "
                             f"row {i!r}. Please use snake_case.")

        # The action type defaults to a state change, becomes a state check
        # for "check_" actions, and is overridden below when output actions
        # are present.
        type = ActionType.STATE_CHANGE
        if action_base_name.startswith("check_"):
            type = ActionType.STATE_CHECK

        output_unresolved_action_names = []
        output_actions_str = row[2].strip()
        if output_actions_str:
            type = ActionType.PARAMETERIZED
            # Output actions for parameterized actions can also specify
            # action arguments (e.g. `do_action(arg1)`). However, they cannot
            # be fully resolved yet without reading all actions. So the
            # resolution must happen later.
            output_unresolved_action_names = [
                output.strip() for output in output_actions_str.split("&")
            ]

        # Actions that are not listed as supported get empty platform sets.
        (partially_supported_platforms,
         fully_supported_platforms) = supported_platform_actions.get(
             action_base_name, (set(), set()))

        # Parse the argument types, and save the defaults if they exist.
        arg_types: List[ArgEnum] = []
        defaults: List[str] = []
        for arg_type_str in row[1].split(","):
            arg_type_str = arg_type_str.strip()
            if not arg_type_str:
                continue
            if arg_type_str not in enums_by_type:
                raise ValueError(
                    f"Cannot find enum type {arg_type_str!r} on row {i!r}.")
            enum = enums_by_type[arg_type_str]
            arg_types.append(enum)
            if enum.default_value:
                defaults.append(enum.default_value)

        # If all arguments types have defaults, then save these defaults as the
        # default argument for this base action name.
        if len(defaults) > 0 and len(defaults) == len(arg_types):
            action_base_name_to_default_args[action_base_name] = (
                "_".join(defaults))

        # From each action row, resolve out the possible parameter arguments
        # and create one action per combination of arguments.

        all_arg_value_combinations: List[List[str]] = (
            enumerate_all_argument_combinations(arg_types))

        for arg_combination in all_arg_value_combinations:
            name = "_".join([action_base_name] + arg_combination)

            # If the action has arguments, then modify the output actions,
            # and cpp method. Each argument is rendered as
            # "<EnumType>::k<Value>". The `i` below is local to the
            # comprehension and does not affect the row index `i`.
            joined_cpp_arguments = ", ".join([
                f"{arg_types[i].type_name}::k{arg}"
                for i, arg in enumerate(arg_combination)
            ])

            # Convert the `cpp_method` to Pascal-case
            cpp_method = ''.join(word.title()
                                 for word in action_base_name.split('_'))
            cpp_method += "(" + joined_cpp_arguments + ")"

            # Resolve bash-replacement for any output actions. Resolving to
            # canonical names is not done here because the defaults map is not
            # fully populated yet.
            output_canonical_action_names: List[str] = []
            for human_friendly_action_name in output_unresolved_action_names:
                bash_replaced_name = resolve_bash_style_replacement(
                    human_friendly_action_name, arg_combination)

                # Handle any wildcards in the actions
                wildcart_expanded_actions = expand_wildcards_in_action(
                    bash_replaced_name, enums_by_type)

                # Output actions for parameterized actions are not allowed to
                # use 'defaults', and the action author must explicitly
                # populate all arguments with bash-style replacements or static
                # values. This is why an empty defaults map is passed here.
                for action_name in wildcart_expanded_actions:
                    output_canonical_action_names.append(
                        human_friendly_name_to_canonical_action_name(
                            action_name, {}))

            if name in actions_by_name:
                raise ValueError(f"Cannot add duplicate action {name} on row "
                                 f"{i!r}")

            action = Action(name, action_base_name, shortened_base_name,
                            cpp_method, type, fully_supported_platforms,
                            partially_supported_platforms)
            # Stash the unresolved names on the action; they are looked up
            # once every action in the file has been created.
            action._output_canonical_action_names = (
                output_canonical_action_names)
            actions_by_name[action.name] = action

    # Every base name listed as supported must exist in the actions file.
    unused_supported_actions = set(
        supported_platform_actions.keys()).difference(action_base_names)
    if unused_supported_actions:
        raise ValueError(f"Actions specified as supported that are not in "
                         f"the actions list: {unused_supported_actions}.")

    # Resolve the output actions
    for action in actions_by_name.values():
        if action.type is not ActionType.PARAMETERIZED:
            continue
        assert (action._output_canonical_action_names)
        for canonical_name in action._output_canonical_action_names:
            if canonical_name in actions_by_name:
                action.output_actions.append(actions_by_name[canonical_name])
            else:
                # Having this lookup fail is a feature, it allows a
                # parameterized action to reference output actions that might
                # not all support every value of the parameterized action.
                # When that argument is specified in a test case, then that
                # action would be excluded & one less test case would be
                # generated.
                logging.info(f"Output action {canonical_name} not found for "
                             f"parameterized action {action.name}.")
        # A parameterized action must resolve to at least one output action.
        if not action.output_actions:
            raise ValueError(
                f"Action {action} is a parameterized action, but "
                f"none of it's possible parameterized actions were"
                f" found: {action._output_canonical_action_names}")
    return (actions_by_name, action_base_name_to_default_args)


def read_unprocessed_coverage_tests_file(
        coverage_file_lines: List[str], actions_by_name: ActionsByName,
        enums_by_type: EnumsByType,
        action_base_name_to_default_arg: Dict[str, str]) -> List[CoverageTest]:
    """Reads the coverage tests markdown file.

    The first cell of every table row holds the platforms of the test and the
    remaining cells hold its actions. Blank action cells are ignored, and the
    rows do not carry test names. Actions with argument wildcards expand the
    row into one test per combination of wildcard values.

    Args:
        coverage_file_lines: The lines of the markdown file with all coverage
                             tests.
        actions_by_name: An index of action name to Action.
        enums_by_type: An index of enum type name to enum, used to expand
                       argument wildcards.
        action_base_name_to_default_arg: An index of action base name to
                                         default argument, if there is one.

    Returns:
        A list of CoverageTests read from the file.

    Raises:
        ValueError: The input file is invalid. Unknown actions are logged for
                    every row first and then reported together in a single
                    error.
    """
    unknown_action_names = []
    coverage_tests = []
    for i, row in enumerate_markdown_file_lines_to_table_rows(
            coverage_file_lines):
        if len(row) < MIN_COLUMNS_UNPROCESSED_COVERAGE_FILE:
            raise ValueError(f"Row {i!r} does not have test actions: {row}")
        platforms = TestPlatform.get_platforms_from_chars(row[0])
        if len(platforms) == 0:
            raise ValueError(f"Row {i} has invalid platforms: {row[0]}")
        # Keep only the cells that actually contain an action.
        non_blank_action_strs = list(filter(str.strip, row[1:]))
        # Any parameter wildcards (like "WindowOption::All") turn this one
        # row into multiple tests.
        for test_actions in expand_tests_from_action_parameter_wildcards(
                enums_by_type, non_blank_action_strs):
            resolved_actions: List[Action] = []
            for raw_name in test_actions:
                stripped_name = raw_name.strip()
                if not stripped_name:
                    continue
                canonical_name = human_friendly_name_to_canonical_action_name(
                    stripped_name, action_base_name_to_default_arg)
                if canonical_name in actions_by_name:
                    resolved_actions.append(actions_by_name[canonical_name])
                    continue
                # Keep going so that every unknown action gets reported.
                unknown_action_names.append(canonical_name)
                logging.error(f"Could not find action on row {i!r}: "
                              f"{canonical_name}")
            coverage_tests.append(CoverageTest(resolved_actions, platforms))
    if unknown_action_names:
        raise ValueError(f"Actions missing from actions dictionary: "
                         f"{', '.join(unknown_action_names)}")
    return coverage_tests


def get_and_maybe_delete_tests_in_browsertest(
    filename: str,
    required_tests: Optional[Set[TestIdTestNameTuple]] = None,
    delete_in_place: bool = False
) -> Dict[TestIdTestNameTuple, Set[TestPlatform]]:
    """
    Returns a dictionary of all test ids and test names found to
    the set of detected platforms the test is enabled on.

    When delete_in_place is set to True, overwrite the file to remove tests not
    in required_tests. Only tests that are not disabled are removed; a test
    whose "DISABLED_" name appears in the file is always kept.

    For reference, this is what a disabled test by a sheriff typically looks
    like:

    TEST_F(WebAppIntegrationTestBase, DISABLED_NavStandalone_InstallIconShown) {
        ...
    }

    In the above case, the test will be considered disabled on all platforms.
    This is what a test disabled by a sheriff on a specific platform looks like:

    #if BUILDFLAG(IS_WIN)
    #define MAYBE_NavStandalone_InstallIconShown \
            DISABLED_NavStandalone_InstallIconShown
    #else
    #define MAYBE_NavStandalone_InstallIconShown NavStandalone_InstallIconShown
    #endif
    TEST_F(WebAppIntegrationTestBase, MAYBE_NavStandalone_InstallIconShown) {
        ...
    }

    In the above case, the test will be considered disabled on
    `TestPlatform.WINDOWS` and thus enabled on {`TestPlatform.MAC`,
    `TestPlatform.CHROME_OS`, and `TestPlatform.LINUX`}.

    Args:
        filename: Path of the browsertest file to read (and to rewrite when
                  |delete_in_place| is True).
        required_tests: The (test id, test name) tuples that must be kept.
                        Only the test ids are compared. None is treated as
                        an empty set.
        delete_in_place: Whether to rewrite the file without the tests that
                         are not required.

    Returns:
        A dictionary of (test id, test name) to the set of platforms the test
        is enabled on. The set is empty for a test disabled on all platforms.
        Tests removed from the file are not part of the result.
    """
    tests: Dict[TestIdTestNameTuple, Set[TestPlatform]] = {}
    # Only the ids matter for deciding what to keep. Building the set once,
    # up front, avoids rebuilding a list for every test found in the file.
    required_test_ids = {t[0] for t in (required_tests or ())}

    with open(filename, 'r') as fp:
        file = fp.read()
    result_file = file
    # Attempts to match a full test case, where the name contains the test
    # id prefix. Purposefully allows any prefixes on the test name (like
    # MAYBE_ or DISABLED_). Examples can be found here.
    # https://regex101.com/r/l1xnAJ/2
    for match in re.finditer(
            'IN_PROC_BROWSER_TEST_F[\\(\\w\\s,]+'
            fr'{CoverageTest.TEST_ID_PREFIX}([a-zA-Z0-9._-]+)\)'
            '\\s*{\n(?:\\s*\\/\\/.*\n)+((?:[^;^}}]+;\n)+)}', file):
        test_steps: List[str] = []
        if match.group(2):
            test_body = match.group(2).split(";")
            for line in test_body:
                assert not line.strip().startswith("//")
                test_steps.append(line.strip())
        test_id = generate_test_id_from_test_steps(test_steps)
        test_name = match.group(1)
        test_key = TestIdTestNameTuple(test_id, test_name)
        # Start out enabled everywhere; platforms are removed below when the
        # test turns out to be disabled.
        tests[test_key] = set(TestPlatform)
        browser_test_name = f"{CoverageTest.TEST_ID_PREFIX}{test_name}"
        if f"DISABLED_{browser_test_name}" not in file:
            if delete_in_place and test_id not in required_test_ids:
                del tests[test_key]
                # Remove the matching test code block when the test is not
                # in required_tests
                result_file = result_file.replace(match.group(0), '')
            continue
        enabled_platforms: Set[TestPlatform] = tests[test_key]
        # The test name may contain regex metacharacters (e.g. '.'), so it is
        # escaped before being embedded in the pattern below.
        name_for_regex = re.escape(browser_test_name)
        for platform in TestPlatform:
            # Search for macro that specifies the given platform before
            # the string "DISABLED_<test_name>".
            macro_for_regex = re.escape(platform.macro)
            # This pattern ensures that there aren't any '{' or '}'
            # characters between the macro and the disabled test name, which
            # ensures that the macro is applying to the correct test.
            if re.search(
                    fr"{macro_for_regex}[^{{}}]+DISABLED_{name_for_regex}",
                    file):
                enabled_platforms.remove(platform)
        # No platform macro guards the DISABLED_ name, so the test is
        # disabled on every platform.
        if len(enabled_platforms) == len(TestPlatform):
            enabled_platforms.clear()
    if delete_in_place:
        with open(filename, 'w') as fp:
            fp.write(result_file)
    return tests


def find_existing_and_disabled_tests(
    test_partitions: List[TestPartitionDescription],
    required_coverage_by_platform_set: CoverageTestsByPlatformSet,
    delete_in_place: bool = False
) -> Tuple[TestIdsTestNamesByPlatformSet, TestIdsTestNamesByPlatform]:
    """
    Returns a dictionary of platform set to the (test id, test name) tuples
    found in the browsertest files, and a dictionary of platform to the
    (test id, test name) tuples of the tests disabled on that platform.

    Args:
        test_partitions: The partitions whose browsertest directories are
                         scanned. Only files starting with the partition's
                         test file prefix are read.
        required_coverage_by_platform_set: The required coverage tests per
                                           platform set. Used to decide which
                                           tests to keep when deleting.
        delete_in_place: Whether tests that are not required should be
                         removed from the browsertest files.

    Raises:
        ValueError: The same test id was found more than once for a platform
                    set.
    """
    existing_tests: TestIdsTestNamesByPlatformSet = defaultdict(set)
    disabled_tests: TestIdsTestNamesByPlatform = defaultdict(set)
    # |existing_tests| stores (test id, test name) tuples, so a bare test id
    # can never be found in it. The ids seen so far are tracked separately to
    # make the duplicate check effective.
    existing_test_ids = defaultdict(set)
    for partition in test_partitions:
        for file in os.listdir(partition.browsertest_dir):
            if not file.startswith(partition.test_file_prefix):
                continue
            platforms = frozenset(
                TestPlatform.get_platforms_from_browsertest_filename(file))
            filename = os.path.join(partition.browsertest_dir, file)
            required_tests = set(
                TestIdTestNameTuple(i.id, i.generate_test_name())
                for i in required_coverage_by_platform_set.get(platforms, []))
            tests = get_and_maybe_delete_tests_in_browsertest(
                filename, required_tests, delete_in_place)
            for test_id, test_name in tests.keys():
                if test_id in existing_test_ids[platforms]:
                    raise ValueError(f"Already found test {test_name}. "
                                     f"Duplicate test in {filename}")
                existing_test_ids[platforms].add(test_id)
                existing_tests[platforms].add(
                    TestIdTestNameTuple(test_id, test_name))
            # A test counts as disabled on every platform of this file that
            # is missing from its set of enabled platforms.
            for platform in platforms:
                for (test_id, test_name), enabled_platforms in tests.items():
                    if platform not in enabled_platforms:
                        disabled_tests[platform].add(
                            TestIdTestNameTuple(test_id, test_name))
            test_names = [test_name for (test_id, test_name) in tests.keys()]
            logging.info(f"Found tests in {filename}:\n{test_names}")
    return (existing_tests, disabled_tests)


def generate_test_id_from_test_steps(test_steps: List[str]) -> str:
    """Builds a test id from the statements of a generated test body.

    Every step of the form "helper_.<PascalCaseAction>(<arguments>)"
    contributes the action name, split at its capital letters and lowercased,
    followed by the value of each "<EnumType>::k<Value>" argument. Steps that
    do not have this form, and arguments that are not "::k" enum values, are
    ignored. All parts are joined with "_".

    Example:
        ["helper_.InstallMenuOption(Site::kStandalone)"] returns
        "install_menu_option_Standalone".
    """
    test_id: List[str] = []
    for test_step in test_steps:
        # Examples of the matching regex.
        # https://regex101.com/r/UYlzkK/1
        match_test_step = re.search(r"helper_.(\w+)\(([\w,\s:]*)\)", test_step)
        if not match_test_step:
            continue
        actions = re.findall('[A-Z][^A-Z]*', match_test_step.group(1))
        test_id += [a.lower() for a in actions]
        if not match_test_step.group(2):
            continue
        for parameter in match_test_step.group(2).split(','):
            match_param_value = re.match(r".*::k(.*)", parameter.strip())
            # An argument that is not an enum value (or is blank) produces no
            # match; skip it instead of dereferencing None.
            if match_param_value and match_param_value.group(1):
                test_id.append(match_param_value.group(1))
    return "_".join(test_id)