import json
import os
import shutil
import tempfile
import time
from typing import (
    Any,
    Dict,
    List,
)

import pytest

from bioblend import ConnectionError
from . import (
    GalaxyTestBase,
    test_util,
)


class TestGalaxyWorkflows(GalaxyTestBase.GalaxyTestBase):
    @test_util.skip_unless_tool("cat1")
    @test_util.skip_unless_tool("cat")
    def test_workflow_scheduling(self):
        path = test_util.get_abspath(os.path.join("data", "test_workflow_pause.ga"))
        workflow = self.gi.workflows.import_workflow_from_local_path(path)
        workflow_id = workflow["id"]
        history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]

        invocations = self.gi.workflows.get_invocations(workflow_id)
        assert len(invocations) == 0

        # Try invalid invocation (no input)
        with pytest.raises(ConnectionError):
            self.gi.workflows.invoke_workflow(workflow["id"])

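        # Upload a test dataset and invoke the workflow, mapping step order
        # index "0" to the new HDA.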
        dataset1_id = self._test_dataset(history_id)
        invocation = self.gi.workflows.invoke_workflow(
            workflow["id"],
            inputs={"0": {"src": "hda", "id": dataset1_id}},
        )
        assert invocation["state"] == "new"
        invocation_id = invocation["id"]
        invocations = self.gi.workflows.get_invocations(workflow_id)
        assert len(invocations) == 1
        assert invocations[0]["id"] == invocation_id

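        # Index the invocation's steps by their order_index for easy lookup.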
        def invocation_steps_by_order_index() -> Dict[int, Dict[str, Any]]:
            invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
            return {s["order_index"]: s for s in invocation["steps"]}

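        # Poll (up to ~10 s) until the pause step (order_index 2) shows up in
        # the invocation.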
        for _ in range(20):
            if 2 in invocation_steps_by_order_index():
                break
            time.sleep(0.5)

        invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
        assert invocation["state"] == "ready"

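        # The pause step starts with no action; setting action=True lets the
        # workflow continue.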
        steps = invocation_steps_by_order_index()
        pause_step = steps[2]
        assert self.gi.workflows.show_invocation_step(workflow_id, invocation_id, pause_step["id"])["action"] is None
        self.gi.workflows.run_invocation_step_action(workflow_id, invocation_id, pause_step["id"], action=True)
        assert self.gi.workflows.show_invocation_step(workflow_id, invocation_id, pause_step["id"])["action"]
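        # Poll (up to ~10 s) until the invocation is fully scheduled.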
        for _ in range(20):
            invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
            if invocation["state"] == "scheduled":
                break
            time.sleep(0.5)

        invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
        assert invocation["state"] == "scheduled"

    def test_invoke_workflow_parameters_normalized(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns_subworkflow.ga"))
        workflow_id = self.gi.workflows.import_workflow_from_local_path(path)["id"]
        history_id = self.gi.histories.create_history(name="TestWorkflowInvokeParametersNormalized")["id"]
        dataset_id = self._test_dataset(history_id)
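        # The flat ("1|2") tool parameter is only accepted when the request is
        # sent with parameters_normalized=True.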
        with pytest.raises(ConnectionError):
            self.gi.workflows.invoke_workflow(
                workflow_id, inputs={"0": {"src": "hda", "id": dataset_id}}, params={"1": {"1|2": "comma"}}
            )
        self.gi.workflows.invoke_workflow(
            workflow_id,
            inputs={"0": {"src": "hda", "id": dataset_id}},
            params={"1": {"1|2": "comma"}},
            parameters_normalized=True,
        )

    @test_util.skip_unless_galaxy("release_19.09")
    @test_util.skip_unless_tool("cat1")
    @test_util.skip_unless_tool("cat")
    def test_cancelling_workflow_scheduling(self):
        path = test_util.get_abspath(os.path.join("data", "test_workflow_pause.ga"))
        workflow = self.gi.workflows.import_workflow_from_local_path(path)
        workflow_id = workflow["id"]
        history_id = self.gi.histories.create_history(name="TestWorkflowState")["id"]
        dataset1_id = self._test_dataset(history_id)

        invocations = self.gi.workflows.get_invocations(workflow_id)
        assert len(invocations) == 0

        invocation = self.gi.workflows.invoke_workflow(
            workflow["id"],
            inputs={"0": {"src": "hda", "id": dataset1_id}},
        )
        invocation_id = invocation["id"]
        invocations = self.gi.workflows.get_invocations(workflow_id)
        assert len(invocations) == 1
        assert invocations[0]["id"] == invocation_id

        invocation = self.gi.workflows.show_invocation(workflow_id, invocation_id)
        assert invocation["state"] in ["new", "ready"]

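        # Cancel the invocation before it finishes scheduling, then wait for it
        # to reach a terminal state.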
        self.gi.workflows.cancel_invocation(workflow_id, invocation_id)
        invocation = self.gi.invocations.wait_for_invocation(invocation_id, check=False)
        assert invocation["state"] == "cancelled"

    def test_import_export_workflow_from_local_path(self):
        with pytest.raises(TypeError):
            self.gi.workflows.import_workflow_from_local_path(None)  # type: ignore[arg-type]
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        imported_wf = self.gi.workflows.import_workflow_from_local_path(path)
        assert isinstance(imported_wf, dict)
        assert imported_wf["name"] == "paste_columns"
        assert imported_wf["url"].startswith("/api/workflows/")
        assert not imported_wf["deleted"]
        assert not imported_wf["published"]
        with pytest.raises(TypeError):
            self.gi.workflows.export_workflow_to_local_path(None, None, None)  # type: ignore[arg-type]
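        # Export the workflow to a temporary directory; exactly one JSON file
        # should be written.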
        export_dir = tempfile.mkdtemp(prefix="bioblend_test_")
        try:
            self.gi.workflows.export_workflow_to_local_path(imported_wf["id"], export_dir)
            dir_contents = os.listdir(export_dir)
            assert len(dir_contents) == 1
            export_path = os.path.join(export_dir, dir_contents[0])
            with open(export_path) as f:
                exported_wf_dict = json.load(f)
        finally:
            shutil.rmtree(export_dir)
        assert isinstance(exported_wf_dict, dict)

    def test_import_publish_workflow_from_local_path(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        imported_wf = self.gi.workflows.import_workflow_from_local_path(path, publish=True)
        assert isinstance(imported_wf, dict)
        assert not imported_wf["deleted"]
        assert imported_wf["published"]

    def test_import_export_workflow_dict(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        with open(path) as f:
            wf_dict = json.load(f)
        imported_wf = self.gi.workflows.import_workflow_dict(wf_dict)
        assert isinstance(imported_wf, dict)
        assert imported_wf["name"] == "paste_columns"
        assert imported_wf["url"].startswith("/api/workflows/")
        assert not imported_wf["deleted"]
        assert not imported_wf["published"]
        exported_wf_dict = self.gi.workflows.export_workflow_dict(imported_wf["id"])
        assert isinstance(exported_wf_dict, dict)

    def test_import_publish_workflow_dict(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        with open(path) as f:
            wf_dict = json.load(f)
        imported_wf = self.gi.workflows.import_workflow_dict(wf_dict, publish=True)
        assert isinstance(imported_wf, dict)
        assert not imported_wf["deleted"]
        assert imported_wf["published"]

    def test_get_workflows(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        workflow = self.gi.workflows.import_workflow_from_local_path(path)
        all_wfs = self.gi.workflows.get_workflows()
        assert len(all_wfs) > 0
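        # Filtering by name can match other copies of the workflow, so narrow
        # down to the one just imported by id.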
        wfs_with_name = self.gi.workflows.get_workflows(name=workflow["name"])
        wf_list = [w for w in wfs_with_name if w["id"] == workflow["id"]]
        assert len(wf_list) == 1
        wf_data = wf_list[0]
        if "create_time" in workflow:  # Galaxy >= 20.01
            assert wf_data["create_time"] == workflow["create_time"]
        else:  # Galaxy < 20.01
            assert wf_data["url"] == workflow["url"]

    def test_show_workflow(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        wf_data = self.gi.workflows.show_workflow(wf["id"])
        assert wf_data["id"] == wf["id"]
        assert wf_data["name"] == wf["name"]
        assert wf_data["url"] == wf["url"]
        assert len(wf_data["steps"]) == 3
        assert wf_data["inputs"] is not None

    def test_update_workflow_name(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        new_name = "new name"
        updated_wf = self.gi.workflows.update_workflow(wf["id"], name=new_name)
        assert updated_wf["name"] == new_name

    @test_util.skip_unless_galaxy("release_21.01")
    def test_update_workflow_published(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        assert not wf["published"]
        updated_wf = self.gi.workflows.update_workflow(wf["id"], published=True)
        assert updated_wf["published"]
        updated_wf = self.gi.workflows.update_workflow(wf["id"], published=False)
        assert not updated_wf["published"]

    @test_util.skip_unless_galaxy(
        "release_19.09"
    )  # due to Galaxy bug fixed in https://github.com/galaxyproject/galaxy/pull/9014
    def test_show_workflow_versions(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        wf_data = self.gi.workflows.show_workflow(wf["id"])
        assert wf_data["version"] == 0
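        # Renaming the workflow creates a new version; older versions remain
        # accessible through the version parameter.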
        new_name = "new name"
        self.gi.workflows.update_workflow(wf["id"], name=new_name)
        updated_wf = self.gi.workflows.show_workflow(wf["id"])
        assert updated_wf["name"] == new_name
        assert updated_wf["version"] == 1
        updated_wf = self.gi.workflows.show_workflow(wf["id"], version=0)
        assert updated_wf["name"] == "paste_columns"
        assert updated_wf["version"] == 0
        updated_wf = self.gi.workflows.show_workflow(wf["id"], version=1)
        assert updated_wf["name"] == new_name
        assert updated_wf["version"] == 1

    @test_util.skip_unless_galaxy("release_19.09")
    def test_extract_workflow_from_history(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        history_id = self.gi.histories.create_history(name="test_wf_invocation")["id"]
        dataset1_id = self._test_dataset(history_id)
        dataset = {"src": "hda", "id": dataset1_id}
        invocation_id = self.gi.workflows.invoke_workflow(
            wf["id"],
            inputs={"Input 1": dataset, "Input 2": dataset},
            history_id=history_id,
            inputs_by="name",
        )["id"]
        invocation = self.gi.invocations.wait_for_invocation(invocation_id)
        wf1 = self.gi.workflows.show_workflow(invocation["workflow_id"])
        datasets = self.gi.histories.show_history(invocation["history_id"], contents=True)
        dataset_hids = [dataset["hid"] for dataset in datasets]
        job_ids = [step["job_id"] for step in invocation["steps"] if step["job_id"]]

        for job_id in job_ids:
            self.gi.jobs.wait_for_job(job_id)

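        # Extract a new workflow from the history, using the jobs and dataset
        # hids produced by the invocation.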
        new_workflow_name = "My new workflow!"
        wf2 = self.gi.workflows.extract_workflow_from_history(
            history_id=invocation["history_id"],
            workflow_name=new_workflow_name,
            job_ids=job_ids,
            dataset_hids=dataset_hids,
        )
        wf2 = self.gi.workflows.show_workflow(wf2["id"])
        assert wf2["name"] == new_workflow_name
        assert len(wf1["steps"]) == len(wf2["steps"])
        for i in range(len(wf1["steps"])):
            assert wf1["steps"][str(i)]["type"] == wf2["steps"][str(i)]["type"]
            assert wf1["steps"][str(i)]["tool_id"] == wf2["steps"][str(i)]["tool_id"]

    def test_show_versions(self):
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        versions = self.gi.workflows.show_versions(wf["id"])
        assert len(versions) == 1
        version = versions[0]
        assert version["version"] == 0
        assert "update_time" in version
        assert "steps" in version

    @test_util.skip_unless_galaxy("release_21.01")
    def test_refactor_workflow(self):
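        # Two refactoring actions: add a new data input labelled "foo", then
        # relabel that step to "bar".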
        actions: List[Dict[str, Any]] = [
            {"action_type": "add_input", "type": "data", "label": "foo"},
            {"action_type": "update_step_label", "label": "bar", "step": {"label": "foo"}},
        ]
        path = test_util.get_abspath(os.path.join("data", "paste_columns.ga"))
        wf = self.gi.workflows.import_workflow_from_local_path(path)
        response = self.gi.workflows.refactor_workflow(wf["id"], actions, dry_run=True)
        assert len(response["action_executions"]) == len(actions)
        assert response["dry_run"] is True
        updated_steps = response["workflow"]["steps"]
        assert len(updated_steps) == 4
        assert {step["label"] for step in updated_steps.values()} == {"bar", None, "Input 1", "Input 2"}
