# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Graph morphs are modifications to task-graphs that take place *after* the
optimization phase.

These graph morphs are largely invisible to developers running `./mach`
locally, so they should be limited to changes that do not modify the meaning of
the graph.
"""

# Note that the translations of `{'task-reference': '..'}` and
# `artifact-reference` are handled in the optimization phase (since
# optimization involves dealing with taskIds directly).  Similarly,
# `{'relative-datestamp': '..'}` is handled at the last possible moment during
# task creation.


import copy
import logging
import os
import re

from slugid import nice as slugid
from taskgraph.graph import Graph
from taskgraph.morph import register_morph
from taskgraph.task import Task
from taskgraph.taskgraph import TaskGraph

from .util.workertypes import get_worker_type

# Absolute path of the directory that contains this module.
here = os.path.abspath(os.path.dirname(__file__))
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Route-count threshold: `add_index_tasks` only acts on tasks whose `routes`
# list is longer than this.
MAX_ROUTES = 64


def amend_taskgraph(taskgraph, label_to_taskid, to_add):
    """Return a new taskgraph containing the existing tasks plus `to_add`.

    Every task in `to_add` is stored under its `task_id`, and one edge
    `(task_id, dependency, dependency_name)` is recorded for each entry of its
    `dependencies` mapping.  `label_to_taskid` is updated *in place* with the
    new labels and is also returned alongside the new taskgraph.

    A label that is already present in `label_to_taskid` trips an assertion.
    """
    tasks_by_id = taskgraph.tasks.copy()
    edges = set(taskgraph.graph.edges)

    for extra in to_add:
        extra_id = extra.task_id
        tasks_by_id[extra_id] = extra
        # labels must stay unique across the whole graph
        assert extra.label not in label_to_taskid
        label_to_taskid[extra.label] = extra_id
        edges.update(
            (extra_id, dep_id, dep_name)
            for dep_name, dep_id in extra.dependencies.items()
        )

    amended = TaskGraph(tasks_by_id, Graph(set(tasks_by_id), edges))
    return amended, label_to_taskid


def derive_misc_task(
    target_task,
    purpose,
    image,
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    dependencies,
):
    """Build the skeleton of a "misc" task derived from `target_task`.

    The new task is labelled `<purpose>-<target label>`, runs in the docker
    image named `image`, and lists the values of `dependencies` plus the image
    task in its definition.  Deadline, owner and source are copied from
    `target_task`.  The caller is expected to fill in the rest of the payload.

    `dependencies` itself is not modified; a shallow copy is used.  The
    returned `Task` already carries a freshly generated `task_id`.
    """
    derived_label = f"{purpose}-{target_task.label}"

    # this is why all docker image tasks are included in the target task graph: we
    # need to find them in label_to_taskid, even if nothing else required them
    image_taskid = label_to_taskid["docker-image-" + image]

    provisioner_id, worker_type = get_worker_type(graph_config, parameters, "misc")

    task_deps = copy.copy(dependencies)
    task_deps["docker-image"] = image_taskid

    parent_def = target_task.task
    payload = {
        "image": {
            "path": "public/image.tar.zst",
            "taskId": image_taskid,
            "type": "task-image",
        },
        "features": {"taskclusterProxy": True},
        "maxRunTime": 600,
    }
    task_def = {
        "provisionerId": provisioner_id,
        "workerType": worker_type,
        # snapshot taken while the docker image is still among the dependencies
        "dependencies": list(task_deps.values()),
        "created": {"relative-datestamp": "0 seconds"},
        "deadline": parent_def["deadline"],
        # no point existing past the parent task's deadline
        "expires": parent_def["deadline"],
        "metadata": {
            "name": derived_label,
            "description": f"{purpose} for {target_task.description}",
            "owner": parent_def["metadata"]["owner"],
            "source": parent_def["metadata"]["source"],
        },
        "scopes": [],
        "payload": payload,
    }

    if image_taskid not in taskgraph.tasks:
        # The definition above points at a replaced docker-image task that is
        # not part of this graph, so it must not become a graph-level
        # dependency of the new task.
        del task_deps["docker-image"]

    derived = Task(
        kind="misc",
        label=derived_label,
        attributes={},
        task=task_def,
        dependencies=task_deps,
    )
    derived.task_id = slugid()
    return derived


# these regular expressions capture route prefixes for which we have a star
# scope, allowing them to be summarized.  Each should correspond to a star scope
# in each Gecko `assume:repo:hg.mozilla.org/...` role.
#
# Each pattern is applied with `match()` to a full `index:insert-task:<path>`
# scope; group 1 is the prefix that is kept and then suffixed with `*`.  The
# list is searched in order and the first hit wins, so the more specific
# `gecko.v2.trunk.revision.` pattern must stay ahead of the generic
# `gecko.v2.<segment>.` one.
SCOPE_SUMMARY_REGEXPS = [
    re.compile(r"(index:insert-task:docker\.images\.v1\.[^.]*\.).*"),
    re.compile(r"(index:insert-task:gecko\.v2\.trunk\.revision\.).*"),
    re.compile(r"(index:insert-task:gecko\.v2\.[^.]*\.).*"),
    re.compile(r"(index:insert-task:comm\.v2\.[^.]*\.).*"),
]


def make_index_task(
    parent_task,
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    index_paths,
    index_rank,
    purpose,
    dependencies,
):
    """Build a misc task that inserts `parent_task` into the index.

    The task shell comes from `derive_misc_task` (using the `index-task` docker
    image); this function then adds the `index:insert-task:` scopes, the
    `insert-indexes.js` command line listing `index_paths`, and the
    `TARGET_TASKID` / `INDEX_RANK` environment variables.

    `index_paths` must be a list, since it is concatenated onto the command.
    """

    def summarize(scope):
        # Collapse the scope to `<prefix>*` using the first pattern that matches;
        # scopes matching no pattern are kept verbatim.
        for pattern in SCOPE_SUMMARY_REGEXPS:
            found = pattern.match(scope)
            if found:
                return found.group(1) + "*"
        return scope

    index_task = derive_misc_task(
        parent_task,
        purpose,
        "index-task",
        taskgraph,
        label_to_taskid,
        parameters,
        graph_config,
        dependencies,
    )

    # we need to "summarize" the scopes, otherwise a particularly
    # namespace-heavy index task might have more scopes than can fit in a
    # temporary credential.
    index_task.task["scopes"] = sorted(
        {summarize(f"index:insert-task:{path}") for path in index_paths}
    )

    payload = index_task.task["payload"]
    payload["command"] = ["insert-indexes.js"] + index_paths
    payload["env"] = {
        "TARGET_TASKID": parent_task.task_id,
        "INDEX_RANK": f"{index_rank}",
    }
    return index_task


@register_morph
def add_index_tasks(taskgraph, label_to_taskid, parameters, graph_config):
    """
    Move index routes off tasks that carry too many routes.

    The TaskCluster queue only allows 64 routes on a task.  For every task
    whose route list is longer than MAX_ROUTES, the `index.` routes are removed
    from the task definition (in place) and handed to a new "index task" that
    depends on the original task and performs the index insertions directly.

    Returns the (possibly amended) taskgraph and `label_to_taskid`.
    """
    logger.debug("Morphing: adding index tasks")

    index_tasks = []
    for parent in taskgraph.tasks.values():
        routes = parent.task.get("routes", [])
        if len(routes) <= MAX_ROUTES:
            continue

        # Split the routes: index routes become index paths (the leading
        # "index." removed), everything else stays on the task.
        index_paths = []
        remaining_routes = []
        for route in routes:
            if route.startswith("index."):
                index_paths.append(route.split(".", 1)[1])
            else:
                remaining_routes.append(route)
        parent.task["routes"] = remaining_routes

        rank = parent.task.get("extra", {}).get("index", {}).get("rank", 0)
        index_tasks.append(
            make_index_task(
                parent,
                taskgraph,
                label_to_taskid,
                parameters,
                graph_config,
                index_paths=index_paths,
                index_rank=rank,
                purpose="index-task",
                dependencies={"parent": parent.task_id},
            )
        )

    if not index_tasks:
        return taskgraph, label_to_taskid

    taskgraph, label_to_taskid = amend_taskgraph(
        taskgraph, label_to_taskid, index_tasks
    )
    logger.info(f"Added {len(index_tasks)} index tasks")
    return taskgraph, label_to_taskid


@register_morph
def add_eager_cache_index_tasks(taskgraph, label_to_taskid, parameters, graph_config):
    """
    Add "eager-index" tasks for tasks carrying an `eager_indexes` attribute.

    Some tasks (e.g. cached tasks) should exist in the index before they even
    run/complete, so that future pushes can depend on an unfinished cached
    task.  The index tasks created here are given no dependency on the task
    they point at, so the insertion does not wait for that task to complete.

    Returns the (possibly amended) taskgraph and `label_to_taskid`.
    """
    logger.debug("Morphing: Adding eager cached index's")

    eager_tasks = [
        make_index_task(
            candidate,
            taskgraph,
            label_to_taskid,
            parameters,
            graph_config,
            index_paths=candidate.attributes["eager_indexes"],
            index_rank=0,  # Be sure complete tasks get priority
            purpose="eager-index",
            dependencies={},
        )
        for candidate in taskgraph.tasks.values()
        if "eager_indexes" in candidate.attributes
    ]

    if not eager_tasks:
        return taskgraph, label_to_taskid

    taskgraph, label_to_taskid = amend_taskgraph(
        taskgraph, label_to_taskid, eager_tasks
    )
    logger.info(f"Added {len(eager_tasks)} eager index tasks")
    return taskgraph, label_to_taskid


@register_morph
def add_try_task_duplicates(taskgraph, label_to_taskid, parameters, graph_config):
    """Registered morph entry point; all work is done by `_add_try_task_duplicates`."""
    morphed = _add_try_task_duplicates(
        taskgraph,
        label_to_taskid,
        parameters,
        graph_config,
    )
    return morphed


# this shim function exists so we can call it from the unittests.
# this works around an issue with
# third_party/python/taskcluster_taskgraph/taskgraph/morph.py#40
def _add_try_task_duplicates(taskgraph, label_to_taskid, parameters, graph_config):
    try_config = parameters.get("try_task_config", {})
    tasks = try_config.get("tasks", [])
    glob_tasks = {x.strip("-*") for x in tasks if x.endswith("-*")}
    tasks = set(tasks) - glob_tasks

    rebuild = try_config.get("rebuild")
    if rebuild:
        for task in taskgraph.tasks.values():
            chunk_index = -1
            if task.label.endswith("-cf"):
                chunk_index = -2
            label_parts = task.label.split("-")
            label_no_chunk = "-".join(label_parts[:chunk_index])

            if label_parts[chunk_index].isnumeric() and label_no_chunk in glob_tasks:
                task.attributes["task_duplicates"] = rebuild
            elif task.label in tasks:
                task.attributes["task_duplicates"] = rebuild
    return taskgraph, label_to_taskid
