# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Methods related to querying builder information from Buildbucket."""

import concurrent.futures
import json
import logging
import os
import subprocess
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

# vpython-provided modules.
import six

# //testing imports.
from unexpected_passes_common import constants
from unexpected_passes_common import data_types

# Symlink-resolved paths to the directories holding builder configuration JSON.
# The "INTERNAL_" variants point at the same relative locations under
# constants.SRC_INTERNAL_DIR instead of constants.CHROMIUM_SRC_DIR.
TESTING_BUILDBOT_DIR = os.path.realpath(
    os.path.join(constants.CHROMIUM_SRC_DIR, 'testing', 'buildbot'))
INTERNAL_TESTING_BUILDBOT_DIR = os.path.realpath(
    os.path.join(constants.SRC_INTERNAL_DIR, 'testing', 'buildbot'))
INFRA_CONFIG_BUILDERS_DIR = os.path.realpath(
    os.path.join(constants.CHROMIUM_SRC_DIR, 'infra', 'config', 'generated',
                 'builders'))
INTERNAL_INFRA_CONFIG_BUILDERS_DIR = os.path.realpath(
    os.path.join(constants.SRC_INTERNAL_DIR, 'infra', 'config', 'generated',
                 'builders'))

# Public JSON files for internal builders, which should be treated as internal.
# These are filenames (not paths) that are matched against the entries of
# TESTING_BUILDBOT_DIR.
PUBLIC_INTERNAL_JSON_FILES = {
    'chrome.json',
    'chrome.gpu.fyi.json',
    'chromeos.preuprev.json',
    'internal.chrome.fyi.json',
    'internal.chromeos.fyi.json',
}

# Top-level key used to recognize //testing/buildbot JSON files that contain
# builder information; files without this key are skipped when processing.
AUTOGENERATED_JSON_KEY = 'AAAAA1 AUTOGENERATED FILE DO NOT EDIT'

# Mapping of a (fake) CI builder to the set of try builders that mirror it, as
# returned by Builders.GetFakeCiBuilders().
FakeBuildersDict = Dict[data_types.BuilderEntry, Set[data_types.BuilderEntry]]

# TODO(crbug.com/358591565): Refactor this to remove the need for global
# statements.
# Module-wide Builders instance managed through GetInstance(),
# RegisterInstance() and ClearInstance(). None when nothing is registered.
_registered_instance = None


def GetInstance() -> 'Builders':
  """Gets the module-wide Builders instance.

  Returns:
    The value currently stored in |_registered_instance|. This is None if no
    instance has been registered via RegisterInstance() or if ClearInstance()
    has been called since.
  """
  return _registered_instance


def RegisterInstance(instance: 'Builders') -> None:
  """Stores |instance| as the module-wide Builders instance.

  Asserts that no instance is currently registered and that |instance| is a
  Builders object. Call ClearInstance() first to replace an existing instance.

  Args:
    instance: The Builders instance that GetInstance() should return.
  """
  global _registered_instance  # pylint: disable=global-statement
  assert _registered_instance is None
  assert isinstance(instance, Builders)
  _registered_instance = instance


def ClearInstance() -> None:
  """Resets the module-wide Builders instance back to None.

  After this call, RegisterInstance() can be used again.
  """
  global _registered_instance  # pylint: disable=global-statement
  _registered_instance = None


class Builders():
  """Base class for determining which CI and try builders to query.

  Builder information is read from the //testing/buildbot JSON files and the
  generated //infra/config JSON files. Trybot mirrors of CI builders are
  looked up through the `bb` (Buildbucket) command line tool.

  Subclasses must implement _BuilderRunsTestOfInterest(), GetIsolateNames(),
  GetFakeCiBuilders() and GetNonChromiumBuilders().
  """

  def __init__(self, suite: Optional[str], include_internal_builders: bool):
    """Initializes the instance.

    Args:
      suite: A string containing particular suite of interest if applicable,
          such as for Telemetry-based tests. Can be None if not applicable.
      include_internal_builders: A boolean indicating whether data from
          internal builders should be used in addition to external ones.
    """
    # Set to True once `bb auth-info` has succeeded so that the login check is
    # not repeated for every Buildbucket query.
    self._authenticated = False
    self._suite = suite
    self._include_internal_builders = include_internal_builders

  def _ProcessTestingBuildbotJsonFiles(
      self, files: List[str], are_internal_files: bool,
      builder_type: str) -> Set[data_types.BuilderEntry]:
    """Extracts the builders of interest from //testing/buildbot JSON files.

    Args:
      files: A list of filepaths to consider. Paths not ending in '.json' and
          JSON files without the autogenerated marker key are skipped.
      are_internal_files: A boolean that is passed through to every created
          data_types.BuilderEntry to mark it as internal or not.
      builder_type: A constants.BuilderTypes value. For CI, files with
          'tryserver' in their name are skipped. For TRY, only files with
          'tryserver' in their name are used.

    Returns:
      A set of data_types.BuilderEntry, one for each builder in |files| whose
      test spec satisfies _BuilderRunsTestOfInterest().
    """
    builders = set()
    for filepath in files:
      if not filepath.endswith('.json'):
        continue
      # Only inspect the filename. Checking the whole path would misclassify
      # every file if the checkout itself lives under a directory whose name
      # contains 'tryserver'.
      is_tryserver_file = 'tryserver' in os.path.basename(filepath)
      if builder_type == constants.BuilderTypes.CI:
        if is_tryserver_file:
          continue
      elif builder_type == constants.BuilderTypes.TRY:
        if not is_tryserver_file:
          continue
      with open(filepath, encoding='utf-8') as f:
        buildbot_json = json.load(f)
      # Skip any JSON files that don't contain builder information.
      if AUTOGENERATED_JSON_KEY not in buildbot_json:
        continue

      for builder, test_map in buildbot_json.items():
        # Remove the auto-generated comments.
        if 'AAAA' in builder:
          continue
        # Filter out any builders that don't run the suite in question.
        if not self._BuilderRunsTestOfInterest(test_map):
          continue
        builders.add(
            data_types.BuilderEntry(builder, builder_type, are_internal_files))
    return builders

  def _ProcessInfraConfigJsonFiles(
      self, files: List[Tuple[str, str]], are_internal_files: bool,
      builder_type: str) -> Set[data_types.BuilderEntry]:
    """Extracts the builders of interest from //infra/config JSON files.

    Args:
      files: A list of tuples (builder_name, filepath) such as the ones
          returned by _GetInfraConfigJsonFiles().
      are_internal_files: A boolean that is passed through to every created
          data_types.BuilderEntry to mark it as internal or not.
      builder_type: A constants.BuilderTypes value. Determines whether the
          builder name is taken from the JSON content (CI) or from the given
          |builder_name| (anything else).

    Returns:
      A set of data_types.BuilderEntry, one for each builder whose test spec
      satisfies _BuilderRunsTestOfInterest().

    Raises:
      RuntimeError: If a given filepath does not end in '.json'.
    """
    builders = set()
    for builder_name, filepath in files:
      if not filepath.endswith('.json'):
        raise RuntimeError(f'Given path {filepath} was not a JSON file')
      with open(filepath, encoding='utf-8') as f:
        targets_json = json.load(f)

      # For CI builders, we can directly use the builder name from the JSON
      # file, as this will always be a valid CI builder name. Additionally, this
      # properly handles cases of a parent builder triggering a child tester -
      # the parent builder's JSON contains the names of the child testers.
      # For trybots, we want to instead use the builder name from the filepath.
      # This is because trybots that mirror CI builders contain the CI builder
      # names in the JSON, but we want the trybot name.
      for ci_builder_name, test_map in targets_json.items():
        if not self._BuilderRunsTestOfInterest(test_map):
          continue
        if builder_type == constants.BuilderTypes.CI:
          builders.add(
              data_types.BuilderEntry(ci_builder_name, builder_type,
                                      are_internal_files))
        else:
          builders.add(
              data_types.BuilderEntry(builder_name, builder_type,
                                      are_internal_files))
    return builders

  def GetCiBuilders(self) -> Set[data_types.BuilderEntry]:
    """Gets the set of CI builders to query.

    Returns:
      A set of data_types.BuilderEntry, each element corresponding to either a
      public or internal CI builder to query results from.
    """
    logging.info('Getting CI builders')
    ci_builders = self._ProcessTestingBuildbotJsonFiles(
        _GetPublicTestingBuildbotJsonFiles(), False, constants.BuilderTypes.CI)
    ci_builders |= self._ProcessInfraConfigJsonFiles(
        _GetPublicInfraConfigCiJsonFiles(), False, constants.BuilderTypes.CI)
    if self._include_internal_builders:
      ci_builders |= self._ProcessTestingBuildbotJsonFiles(
          _GetInternalTestingBuildbotJsonFiles(), True,
          constants.BuilderTypes.CI)
      ci_builders |= self._ProcessInfraConfigJsonFiles(
          _GetInternalInfraConfigCiJsonFiles(), True, constants.BuilderTypes.CI)

    logging.debug('Got %d CI builders after trimming: %s', len(ci_builders),
                  ', '.join([b.name for b in ci_builders]))
    return ci_builders

  def _BuilderRunsTestOfInterest(self, test_map: Dict[str, Any]) -> bool:
    """Determines if a builder runs a test of interest.

    Must be implemented by subclasses.

    Args:
      test_map: A dict, corresponding to a builder's test spec from a
          //testing/buildbot or generated //infra/config JSON file.

    Returns:
      True if |test_map| contains a test of interest, else False.
    """
    raise NotImplementedError()

  def GetTryBuilders(self, ci_builders: Iterable[data_types.BuilderEntry]
                     ) -> Set[data_types.BuilderEntry]:
    """Gets the set of try builders to query.

    A try builder is of interest if it mirrors a builder in |ci_builders| or is
    a dedicated try builder.

    Args:
      ci_builders: An iterable of data_types.BuilderEntry, each element being a
          public or internal CI builder that results will be/were queried from.

    Returns:
      A set of data_types.BuilderEntry, each element being the name of a
      Chromium try builder to query results from.

    Raises:
      RuntimeError: If no Buildbucket output could be retrieved for one or more
          of |ci_builders|.
    """
    logging.info('Getting try builders')
    dedicated_try_builders = self._ProcessTestingBuildbotJsonFiles([
        os.path.join(TESTING_BUILDBOT_DIR, f)
        for f in os.listdir(TESTING_BUILDBOT_DIR)
    ], False, constants.BuilderTypes.TRY)
    dedicated_try_builders |= self._ProcessInfraConfigJsonFiles(
        _GetPublicInfraConfigTryJsonFiles(), False, constants.BuilderTypes.TRY)
    if self._include_internal_builders:
      dedicated_try_builders |= self._ProcessTestingBuildbotJsonFiles([
          os.path.join(INTERNAL_TESTING_BUILDBOT_DIR, f)
          for f in os.listdir(INTERNAL_TESTING_BUILDBOT_DIR)
      ], True, constants.BuilderTypes.TRY)
      dedicated_try_builders |= self._ProcessInfraConfigJsonFiles(
          _GetInternalInfraConfigTryJsonFiles(), True,
          constants.BuilderTypes.TRY)
    mirrored_builders = set()
    no_output_builders = set()

    # Each lookup shells out to `bb`, so run the per-builder queries in
    # parallel threads.
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=os.cpu_count()) as pool:
      results_iter = pool.map(self._GetMirroredBuildersForCiBuilder,
                              ci_builders)
      for (builders, found_mirror) in results_iter:
        if found_mirror:
          mirrored_builders |= builders
        else:
          # |builders| contains the CI builder itself in this case.
          no_output_builders |= builders

    if no_output_builders:
      raise RuntimeError(
          'Did not get Buildbucket output for the following builders. They may '
          'need to be added to the GetFakeCiBuilders or '
          'GetNonChromiumBuilders .\n%s' %
          '\n'.join([b.name for b in no_output_builders]))
    logging.debug('Got %d try builders: %s', len(mirrored_builders),
                  mirrored_builders)
    return dedicated_try_builders | mirrored_builders

  def _GetMirroredBuildersForCiBuilder(
      self, ci_builder: data_types.BuilderEntry
  ) -> Tuple[Set[data_types.BuilderEntry], bool]:
    """Gets the set of try builders that mirror a CI builder.

    Args:
      ci_builder: A data_types.BuilderEntry for a public or internal CI builder.

    Returns:
      A tuple (builders, found_mirror). |builders| is a set of
      data_types.BuilderEntry, either the set of try builders that mirror
      |ci_builder| or |ci_builder|, depending on the value of |found_mirror|.
      |found_mirror| is True if mirrors were actually found, in which case
      |builders| contains the try builders. Otherwise, |found_mirror| is False
      and |builders| contains |ci_builder|.
    """
    mirrored_builders = set()
    # Non-Chromium builders are reported as "found" with an empty mirror set so
    # that they are not treated as an error by the caller.
    if ci_builder in self.GetNonChromiumBuilders():
      logging.debug('%s is a non-Chromium CI builder', ci_builder.name)
      return mirrored_builders, True

    fake_builders = self.GetFakeCiBuilders()
    if ci_builder in fake_builders:
      mirrored_builders |= fake_builders[ci_builder]
      logging.debug('%s is a fake CI builder mirrored by %s', ci_builder.name,
                    ', '.join(b.name for b in fake_builders[ci_builder]))
      return mirrored_builders, True

    bb_output = self._GetBuildbucketOutputForCiBuilder(ci_builder)
    if not bb_output:
      mirrored_builders.add(ci_builder)
      logging.debug('Did not get Buildbucket output for builder %s',
                    ci_builder.name)
      return mirrored_builders, False

    bb_json = json.loads(bb_output)
    mirrored = bb_json.get('output', {}).get('properties',
                                             {}).get('mirrored_builders', [])
    # The mirror names from Buildbucket include the group separated by :, e.g.
    # tryserver.chromium.android:gpu-fyi-try-android-m-nexus-5x-64, so only grab
    # the builder name.
    for mirror in mirrored:
      split = mirror.split(':')
      assert len(split) == 2
      logging.debug('Got mirrored builder for %s: %s', ci_builder.name,
                    split[1])
      mirrored_builders.add(
          data_types.BuilderEntry(split[1], constants.BuilderTypes.TRY,
                                  ci_builder.is_internal_builder))
    return mirrored_builders, True

  def _GetBuildbucketOutputForCiBuilder(self,
                                        ci_builder: data_types.BuilderEntry
                                        ) -> str:
    """Gets the Buildbucket JSON for the most recent ended build of a builder.

    Split out for ease of testing.

    Args:
      ci_builder: A data_types.BuilderEntry for the CI builder to query.

    Returns:
      A string containing the output of `bb get -A -json` for the build ID
      reported by `bb ls` for |ci_builder|.

    Raises:
      RuntimeError: If `bb auth-info` fails, i.e. the user is not logged in.
      subprocess.CalledProcessError: If `bb get` exits with a non-zero code.
    """
    # Ensure the user is logged in to bb.
    if not self._authenticated:
      try:
        with open(os.devnull, 'w', newline='', encoding='utf-8') as devnull:
          subprocess.check_call(['bb', 'auth-info'],
                                stdout=devnull,
                                stderr=devnull)
      except subprocess.CalledProcessError as e:
        six.raise_from(
            RuntimeError('You are not logged into bb - run `bb auth-login`.'),
            e)
      self._authenticated = True
    # Get the Buildbucket ID for the most recent completed build for a builder.
    # Using Popen as a context manager closes the pipe and waits for the
    # process on exit, so no file descriptors or zombie processes are leaked
    # even if `bb get` below fails.
    with subprocess.Popen([
        'bb',
        'ls',
        '-id',
        '-1',
        '-status',
        'ended',
        '%s/ci/%s' % (ci_builder.project, ci_builder.name),
    ],
                          stdout=subprocess.PIPE) as id_process:
      # Use the ID to get the most recent build.
      bb_output = subprocess.check_output([
          'bb',
          'get',
          '-A',
          '-json',
      ],
                                          stdin=id_process.stdout,
                                          text=True)
    return bb_output

  def GetIsolateNames(self) -> Set[str]:
    """Gets the isolate names that are relevant to this implementation.

    Must be implemented by subclasses.

    Returns:
      A set of strings, each element being the name of an isolate of interest.
    """
    raise NotImplementedError()

  def GetFakeCiBuilders(self) -> FakeBuildersDict:
    """Gets a mapping of fake CI builders to their mirrored trybots.

    Must be implemented by subclasses.

    Returns:
      A dict of data_types.BuilderEntry -> set(data_types.BuilderEntry). Each
      key is a CI builder that doesn't actually exist and each value is a set of
      try builders that mirror the CI builder but do exist.
    """
    raise NotImplementedError()

  def GetNonChromiumBuilders(self) -> Set[data_types.BuilderEntry]:
    """Gets the builders that are not actual Chromium builders.

    These are listed in the Chromium //testing/buildbot files, but aren't under
    the Chromium Buildbucket project. These don't use the same recipes as
    Chromium builders, and thus don't have the list of trybot mirrors.

    Must be implemented by subclasses.

    Returns:
      A set of data_types.BuilderEntry, each element being a non-Chromium
      builder.
    """
    raise NotImplementedError()


def _GetPublicTestingBuildbotJsonFiles() -> List[str]:
  """Gets the paths of the public //testing/buildbot files.

  Returns:
    A list of filepaths, one for every entry in TESTING_BUILDBOT_DIR whose
    name is not listed in PUBLIC_INTERNAL_JSON_FILES. No filtering on file
    extension is done here.
  """
  public_paths = []
  for filename in os.listdir(TESTING_BUILDBOT_DIR):
    # These live in the public directory but describe internal builders.
    if filename in PUBLIC_INTERNAL_JSON_FILES:
      continue
    public_paths.append(os.path.join(TESTING_BUILDBOT_DIR, filename))
  return public_paths


def _GetInternalTestingBuildbotJsonFiles() -> List[str]:
  """Gets the paths of the //testing/buildbot files for internal builders.

  Returns:
    A list of filepaths. Every entry of INTERNAL_TESTING_BUILDBOT_DIR comes
    first, followed by the entries of TESTING_BUILDBOT_DIR whose names are
    listed in PUBLIC_INTERNAL_JSON_FILES. No filtering on file extension is
    done here.
  """
  collected = []
  for filename in os.listdir(INTERNAL_TESTING_BUILDBOT_DIR):
    collected.append(os.path.join(INTERNAL_TESTING_BUILDBOT_DIR, filename))
  # Public files that describe internal builders are treated as internal too.
  for filename in os.listdir(TESTING_BUILDBOT_DIR):
    if filename in PUBLIC_INTERNAL_JSON_FILES:
      collected.append(os.path.join(TESTING_BUILDBOT_DIR, filename))
  return collected


def _GetPublicInfraConfigCiJsonFiles() -> List[Tuple[str, str]]:
  """Gets the generated //infra/config JSON files for public CI builders.

  Returns:
    The result of _GetInfraConfigJsonFiles() for the 'ci' subdirectory of
    INFRA_CONFIG_BUILDERS_DIR, i.e. a list of (builder_name, filepath) tuples.
  """
  return _GetInfraConfigJsonFiles(INFRA_CONFIG_BUILDERS_DIR, 'ci')


def _GetInternalInfraConfigCiJsonFiles() -> List[Tuple[str, str]]:
  """Gets the generated //infra/config JSON files for internal CI builders.

  Returns:
    The result of _GetInfraConfigJsonFiles() for the 'ci' subdirectory of
    INTERNAL_INFRA_CONFIG_BUILDERS_DIR, i.e. a list of
    (builder_name, filepath) tuples.
  """
  return _GetInfraConfigJsonFiles(INTERNAL_INFRA_CONFIG_BUILDERS_DIR, 'ci')


def _GetPublicInfraConfigTryJsonFiles() -> List[Tuple[str, str]]:
  """Gets the generated //infra/config JSON files for public try builders.

  Returns:
    The result of _GetInfraConfigJsonFiles() for the 'try' subdirectory of
    INFRA_CONFIG_BUILDERS_DIR, i.e. a list of (builder_name, filepath) tuples.
  """
  return _GetInfraConfigJsonFiles(INFRA_CONFIG_BUILDERS_DIR, 'try')


def _GetInternalInfraConfigTryJsonFiles() -> List[Tuple[str, str]]:
  """Gets the generated //infra/config JSON files for internal try builders.

  Returns:
    The result of _GetInfraConfigJsonFiles() for the 'try' subdirectory of
    INTERNAL_INFRA_CONFIG_BUILDERS_DIR, i.e. a list of
    (builder_name, filepath) tuples.
  """
  return _GetInfraConfigJsonFiles(INTERNAL_INFRA_CONFIG_BUILDERS_DIR, 'try')


def _GetInfraConfigJsonFiles(builders_dir: str,
                             subdirectory: str) -> List[Tuple[str, str]]:
  """Collects the generated //infra/config JSON target files for builders.

  Each entry of |builders_dir|/|subdirectory| is treated as a builder name.
  Builders without a 'targets' entry inside them are ignored, as are files in
  'targets' that do not end in '.json'.

  Args:
    builders_dir: The generated builders directory to search, which selects
        between the public and internal files.
    subdirectory: The subdirectory of |builders_dir| to search, which selects
        between CI and try builders.

  Returns:
    A list of tuples (builder_name, filepath). |builder_name| is the name of a
    found builder and |filepath| is the path to one of its generated JSON
    files. A builder with several JSON files shows up once per file.
  """
  group_dir = os.path.join(builders_dir, subdirectory)
  found = []
  for builder in os.listdir(group_dir):
    targets = os.path.join(group_dir, builder, 'targets')
    if os.path.exists(targets):
      found.extend((builder, os.path.join(targets, filename))
                   for filename in os.listdir(targets)
                   if filename.endswith('.json'))
  return found
