# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

from pathlib import Path
from shutil import copytree

import numpy as np
import pytest
from numpy.testing import (
    assert_allclose,
    assert_array_equal,
    assert_array_less,
    assert_equal,
)

import mne
from mne import (
    SourceEstimate,
    add_source_space_distances,
    compute_source_morph,
    get_volume_labels_from_src,
    make_sphere_model,
    morph_source_spaces,
    pick_types,
    read_bem_solution,
    read_bem_surfaces,
    read_freesurfer_lut,
    read_source_spaces,
    read_trans,
    setup_source_space,
    setup_volume_source_space,
    write_source_spaces,
)
from mne._fiff.constants import FIFF
from mne._fiff.pick import _picks_to_idx
from mne.datasets import testing
from mne.fixes import _get_img_fdata
from mne.source_estimate import _get_src_type
from mne.source_space import (
    compute_distance_to_sensors,
    get_decimated_surfaces,
)
from mne.source_space._source_space import _compare_source_spaces
from mne.surface import _accumulate_normals, _triangle_neighbors
from mne.utils import _record_warnings, requires_mne, run_subprocess

# Location of the (optional, download-on-demand) MNE testing dataset
data_path = testing.data_path(download=False)
subjects_dir = data_path / "subjects"
# MRI volumes for subject "sample"
fname_mri = data_path / "subjects" / "sample" / "mri" / "T1.mgz"
aseg_fname = data_path / "subjects" / "sample" / "mri" / "aseg.mgz"
# surface / volume source spaces and BEM files for subject "sample"
fname = subjects_dir / "sample" / "bem" / "sample-oct-6-src.fif"
fname_vol = subjects_dir / "sample" / "bem" / "sample-volume-7mm-src.fif"
fname_bem = data_path / "subjects" / "sample" / "bem" / "sample-1280-bem.fif"
fname_bem_sol = data_path / "subjects" / "sample" / "bem" / "sample-1280-bem-sol.fif"
fname_bem_3 = (
    data_path / "subjects" / "sample" / "bem" / "sample-1280-1280-1280-bem.fif"
)
fname_bem_3_sol = (
    data_path / "subjects" / "sample" / "bem" / "sample-1280-1280-1280-bem-sol.fif"
)
# fsaverage source space and a sample<->fsaverage morphed space
fname_fs = subjects_dir / "fsaverage" / "bem" / "fsaverage-ico-5-src.fif"
fname_morph = subjects_dir / "sample" / "bem" / "sample-fsaverage-ico-5-src.fif"
fname_src = data_path / "subjects" / "sample" / "bem" / "sample-oct-4-src.fif"
fname_fwd = data_path / "MEG" / "sample" / "sample_audvis_trunc-meg-eeg-oct-4-fwd.fif"
trans_fname = data_path / "MEG" / "sample" / "sample_audvis_trunc-trans.fif"
# small fixtures shipped with mne-python itself (no dataset download needed)
base_dir = Path(__file__).parents[2] / "io" / "tests" / "data"
fname_small = base_dir / "small-src.fif.gz"
fname_ave = base_dir / "test-ave.fif"
# module-level RNG so generated test data is reproducible across runs
rng = np.random.RandomState(0)


@testing.requires_testing_data
@pytest.mark.parametrize(
    "picks, limits",
    [
        ("meg", (0.02, 0.250)),
        (None, (0.01, 0.250)),  # should be same as EEG
        ("eeg", (0.01, 0.250)),
    ],
)
def test_compute_distance_to_sensors(picks, limits):
    """Test computation of distances between vertices and sensors.

    ``picks`` selects the channel type (None means the default data picks),
    ``limits`` gives the (lower, upper) bounds in meters that every computed
    depth must satisfy.
    """
    src = read_source_spaces(fname_src)
    fwd = mne.read_forward_solution(fname_fwd)
    info = fwd["info"]
    trans = read_trans(trans_fname)
    # trans = fwd['info']['mri_head_t']
    if isinstance(picks, str):
        # turn the channel-type string into a pick_types() boolean kwarg
        kwargs = dict()
        kwargs[picks] = True
        if picks == "eeg":
            info["dev_head_t"] = None  # should not break anything
        use_picks = pick_types(info, **kwargs, exclude=())
    else:
        use_picks = picks
    n_picks = len(_picks_to_idx(info, use_picks, "data", exclude=()))

    # Make sure same vertices are used in src and fwd
    src[0]["inuse"] = fwd["src"][0]["inuse"]
    src[1]["inuse"] = fwd["src"][1]["inuse"]
    src[0]["nuse"] = fwd["src"][0]["nuse"]
    src[1]["nuse"] = fwd["src"][1]["nuse"]

    n_verts = src[0]["nuse"] + src[1]["nuse"]

    # minimum distances between vertices and sensors
    depths = compute_distance_to_sensors(src, info=info, picks=use_picks, trans=trans)
    assert depths.shape == (n_verts, n_picks)
    assert limits[0] * 5 > depths.min()  # meaningful choice of limits
    assert_array_less(limits[0], depths)
    assert_array_less(depths, limits[1])

    # If source space from Forward Solution and trans=None (i.e. identity) then
    # depths2 should be the same as depth.
    depths2 = compute_distance_to_sensors(
        src=fwd["src"], info=info, picks=use_picks, trans=None
    )
    assert_allclose(depths, depths2, rtol=1e-5)

    if picks != "eeg":
        # this should break things (MEG needs a device<->head transform)
        info["dev_head_t"] = None
        with pytest.raises(ValueError, match="Transform between meg<->head"):
            compute_distance_to_sensors(src, info, use_picks, trans)


def _read_small_src(remove=True):
    """Read the small test source space, optionally stripping patch info.

    When ``remove`` is True, the ``nearest``, ``nearest_dist``, and ``pinfo``
    entries of each hemisphere are reset to None so tests can check that
    they get (re)computed.
    """
    src = read_source_spaces(fname_small)
    if not remove:
        return src
    for hemi in src:
        for key in ("nearest", "nearest_dist", "pinfo"):
            hemi[key] = None
    return src


def test_add_patch_info(monkeypatch):
    """Test adding patch info to source space."""
    # let's setup a small source space
    src = _read_small_src(remove=False)
    src_new = _read_small_src()

    # test that no patch info is added for small dist_limit
    add_source_space_distances(src_new, dist_limit=0.00001)
    assert all(s["nearest"] is None for s in src_new)
    assert all(s["nearest_dist"] is None for s in src_new)
    assert all(s["pinfo"] is None for s in src_new)

    # now let's use one that works (and test our warning-throwing)
    with monkeypatch.context() as m:
        # lower the warning threshold so the RuntimeWarning fires even for
        # this small source space
        m.setattr(mne.source_space._source_space, "_DIST_WARN_LIMIT", 1)
        with pytest.warns(RuntimeWarning, match="Computing distances for 258"):
            add_source_space_distances(src_new)
    _compare_source_spaces(src, src_new, "approx")

    # NOTE(review): src_nodist is constructed below but never used -- the
    # dist_limit=0 call operates on src_new again. Either this is dead code
    # or the call was meant to take src_nodist; confirm the intent.
    src_nodist = src.copy()
    for s in src_nodist:
        for key in ("dist", "dist_limit"):
            s[key] = None
    add_source_space_distances(src_new, dist_limit=0)
    _compare_source_spaces(src, src_new, "approx")


# We could test "src_py" here, but we can rely on our existing tests to
# make sure the pinfo/patch_inds/nearest match
@testing.requires_testing_data
@pytest.mark.parametrize("src_kind", ["fwd", "src"])
def test_surface_source_space_doc(src_kind):
    """Test surface source space docstring.

    Verifies the documented relationships between 'pinfo', 'patch_inds',
    'nearest', 'vertno', and 'nuse', both for an original source space
    ("src") and one restricted by a forward solution ("fwd").
    """
    # make sure we're correct about this stuff for both kinds!
    if src_kind == "fwd":
        src = mne.read_source_spaces(fname_fwd)
    else:
        assert src_kind == "src"
        src = mne.read_source_spaces(fname_src)
    for s in src:
        if src_kind == "src":  # original
            assert len(s["pinfo"]) == s["nuse"]
            assert_array_equal(s["patch_inds"], np.arange(s["nuse"]))
        else:  # pts removed
            assert len(s["pinfo"]) > s["nuse"]
        # every dense vertex appears in exactly one patch
        all_pinfo = np.concatenate(s["pinfo"])
        assert_array_equal(np.sort(all_pinfo), np.arange(s["np"]))
        # one patch index and one vertex number per used source point
        # (duplicate patch_inds assertion removed)
        assert len(s["patch_inds"]) == s["nuse"]
        assert len(s["vertno"]) == s["nuse"]
        for idx in (0, 42, 173):  # spot-check a few source points
            this_dense_vertex = s["vertno"][idx]
            # 'pinfo': the patch of dense vertices this point represents
            this_vertex_represents = s["pinfo"][s["patch_inds"][idx]]
            assert len(this_vertex_represents) > 1
            # 'nearest': each represented vertex maps back to this point
            for other in this_vertex_represents:
                assert s["nearest"][other] == this_dense_vertex


@testing.requires_testing_data
def test_add_source_space_distances_limited(tmp_path):
    """Test adding distances to source space with a dist_limit."""
    src = read_source_spaces(fname)
    src_new = read_source_spaces(fname)
    del src_new[0]["dist"]
    del src_new[1]["dist"]
    n_do = 200  # limit this for speed
    src_new[0]["vertno"] = src_new[0]["vertno"][:n_do].copy()
    src_new[1]["vertno"] = src_new[1]["vertno"][:n_do].copy()
    out_name = tmp_path / "temp-src.fif"
    add_source_space_distances(src_new, dist_limit=0.007)
    # round-trip through disk so we compare what actually gets stored
    write_source_spaces(out_name, src_new)
    src_new = read_source_spaces(out_name)

    # iterate over both hemispheres
    for so, sn in zip(src, src_new):
        # reference file stores a negative limit (MNE-C convention,
        # presumably -- see the cleanup below); ours stores the positive one
        assert_array_equal(so["dist_limit"], np.array([-0.007], np.float32))
        assert_array_equal(sn["dist_limit"], np.array([0.007], np.float32))
        do = so["dist"]
        dn = sn["dist"]

        # clean out distances > 0.007 in C code
        do.data[do.data > 0.007] = 0
        do.eliminate_zeros()

        # make sure we have some comparable distances
        assert np.sum(do.data < 0.007) > 400

        # do comparison over the region computed
        d = (do - dn)[: sn["vertno"][n_do - 1]][:, : sn["vertno"][n_do - 1]]
        assert_allclose(np.zeros_like(d.data), d.data, rtol=0, atol=1e-6)


@pytest.mark.slowtest
@testing.requires_testing_data
def test_add_source_space_distances(tmp_path):
    """Test adding distances to source space."""
    src = read_source_spaces(fname)
    src_new = read_source_spaces(fname)
    del src_new[0]["dist"]
    del src_new[1]["dist"]
    n_do = 19  # limit this for speed
    src_new[0]["vertno"] = src_new[0]["vertno"][:n_do].copy()
    src_new[1]["vertno"] = src_new[1]["vertno"][:n_do].copy()
    out_name = tmp_path / "temp-src.fif"
    n_jobs = 2
    # vertex count must not split evenly across jobs, so the uneven-chunk
    # code path gets exercised
    assert n_do % n_jobs != 0
    with pytest.raises(ValueError, match="non-negative"):
        add_source_space_distances(src_new, dist_limit=-1)
    add_source_space_distances(src_new, n_jobs=n_jobs)
    # round-trip through disk so we compare what actually gets stored
    write_source_spaces(out_name, src_new)
    src_new = read_source_spaces(out_name)

    # iterate over both hemispheres
    for so, sn in zip(src, src_new):
        v = so["vertno"][:n_do]
        assert_array_equal(so["dist_limit"], np.array([-0.007], np.float32))
        assert_array_equal(sn["dist_limit"], np.array([np.inf], np.float32))
        do = so["dist"]
        dn = sn["dist"]

        # clean out distances > 0.007 in C code (some residual), and Python
        ds = list()
        for d in [do, dn]:
            d.data[d.data > 0.007] = 0
            d = d[v][:, v]  # restrict to the vertices we computed
            d.eliminate_zeros()
            ds.append(d)

        # make sure we actually calculated some comparable distances
        assert np.sum(ds[0].data < 0.007) > 10

        # do comparison
        d = ds[0] - ds[1]
        assert_allclose(np.zeros_like(d.data), d.data, rtol=0, atol=1e-9)


@testing.requires_testing_data
@requires_mne
def test_discrete_source_space(tmp_path):
    """Test setting up (and reading/writing) discrete source spaces."""
    pytest.importorskip("nibabel")
    src = read_source_spaces(fname)
    v = src[0]["vertno"]

    # let's make a discrete version with the C code, and with ours
    temp_name = tmp_path / "temp-src.fif"
    # save positions + normals as the text format mne_volume_source_space reads
    temp_pos = tmp_path / "temp-pos.txt"
    np.savetxt(str(temp_pos), np.c_[src[0]["rr"][v], src[0]["nn"][v]])
    # let's try the spherical one (no bem or surf supplied)
    run_subprocess(
        ["mne_volume_source_space", "--meters", "--pos", temp_pos, "--src", temp_name]
    )
    src_c = read_source_spaces(temp_name)
    pos_dict = dict(rr=src[0]["rr"][v], nn=src[0]["nn"][v])
    src_new = setup_volume_source_space(pos=pos_dict)
    assert src_new.kind == "discrete"
    # C-code result and Python result must agree
    _compare_source_spaces(src_c, src_new, mode="approx")
    assert_allclose(src[0]["rr"][v], src_new[0]["rr"], rtol=1e-3, atol=1e-6)
    assert_allclose(src[0]["nn"][v], src_new[0]["nn"], rtol=1e-3, atol=1e-6)

    # now do writing
    write_source_spaces(temp_name, src_c, overwrite=True)
    src_c2 = read_source_spaces(temp_name)
    _compare_source_spaces(src_c, src_c2)

    # now do MRI
    with pytest.raises(ValueError, match="Cannot create interpolation"):
        setup_volume_source_space("sample", pos=pos_dict, mri=fname_mri)
    assert repr(src_new).split("~")[0] == repr(src_c).split("~")[0]
    assert " KiB" in repr(src_new)
    assert src_new.kind == "discrete"
    assert _get_src_type(src_new, None) == "discrete"

    # non-finite positions must be rejected
    with pytest.raises(RuntimeError, match="finite"):
        setup_volume_source_space(pos=dict(rr=[[0, 0, float("inf")]], nn=[[0, 1, 0]]))


@pytest.mark.slowtest
@testing.requires_testing_data
def test_volume_source_space(tmp_path):
    """Test setting up volume source spaces."""
    pytest.importorskip("nibabel")
    src = read_source_spaces(fname_vol)
    temp_name = tmp_path / "temp-src.fif"
    surf = read_bem_surfaces(fname_bem, s_id=FIFF.FIFFV_BEM_SURF_ID_BRAIN)
    surf["rr"] *= 1e3  # convert to mm
    bem_sol = read_bem_solution(fname_bem_3_sol)
    bem = read_bem_solution(fname_bem_sol)
    # The one in the testing dataset (uses bem as bounds)
    # every (bem, surface) bounding combination must reproduce the reference
    for this_bem, this_surf in zip(
        (bem, fname_bem, fname_bem_3, bem_sol, fname_bem_3_sol, None),
        (None, None, None, None, None, surf),
    ):
        src_new = setup_volume_source_space(
            "sample",
            pos=7.0,
            bem=this_bem,
            surface=this_surf,
            subjects_dir=subjects_dir,
        )
        write_source_spaces(temp_name, src_new, overwrite=True)
        src[0]["subject_his_id"] = "sample"  # XXX: to make comparison pass
        _compare_source_spaces(src, src_new, mode="approx")
        del src_new
        # also check the round trip through disk
        src_new = read_source_spaces(temp_name)
        _compare_source_spaces(src, src_new, mode="approx")
    with pytest.raises(OSError, match="surface file.*not exist"):
        setup_volume_source_space(
            "sample", surface="foo", mri=fname_mri, subjects_dir=subjects_dir
        )
    # a BEM in head coordinates cannot be used as a bounding surface
    bem["surfs"][-1]["coord_frame"] = FIFF.FIFFV_COORD_HEAD
    with pytest.raises(ValueError, match="BEM is not in MRI coord.* got head"):
        setup_volume_source_space(
            "sample", bem=bem, mri=fname_mri, subjects_dir=subjects_dir
        )
    bem["surfs"] = bem["surfs"][:-1]  # no inner skull surf
    with pytest.raises(ValueError, match="Could not get inner skul.*from BEM"):
        setup_volume_source_space(
            "sample", bem=bem, mri=fname_mri, subjects_dir=subjects_dir
        )
    del bem
    assert repr(src) == repr(src_new)
    assert " MiB" in repr(src)
    assert src.kind == "volume"
    # Spheres
    sphere = make_sphere_model(
        r0=(0.0, 0.0, 0.0),
        head_radius=0.1,
        relative_radii=(0.9, 1.0),
        sigmas=(0.33, 1.0),
    )
    # a plain 4-tuple sphere and the equivalent sphere model must agree
    src = setup_volume_source_space(pos=10, sphere=(0.0, 0.0, 0.0, 0.09))
    src_new = setup_volume_source_space(pos=10, sphere=sphere)
    _compare_source_spaces(src, src_new, mode="exact")
    with pytest.raises(ValueError, match="sphere, if str"):
        setup_volume_source_space(sphere="foo")
    # Need a radius
    sphere = make_sphere_model(head_radius=None)
    with pytest.raises(ValueError, match="be spherical with multiple layers"):
        setup_volume_source_space(sphere=sphere)


@testing.requires_testing_data
@requires_mne
def test_other_volume_source_spaces(tmp_path):
    """Test setting up other volume source spaces."""
    # these are split off because they require the MNE tools, and
    # Travis doesn't seem to like them
    pytest.importorskip("nibabel")

    # let's try the spherical one (no bem or surf supplied)
    temp_name = tmp_path / "temp-src.fif"
    run_subprocess(
        [
            "mne_volume_source_space",
            "--grid",
            "7.0",
            "--src",
            temp_name,
            "--mri",
            fname_mri,
        ]
    )
    src = read_source_spaces(temp_name)
    sphere = (0.0, 0.0, 0.0, 0.09)
    src_new = setup_volume_source_space(
        None, pos=7.0, mri=fname_mri, subjects_dir=subjects_dir, sphere=sphere
    )
    # we use a more accurate elimination criteria, so let's fix the MNE-C
    # source space
    assert len(src_new[0]["vertno"]) == 7497
    assert len(src) == 1
    assert len(src_new) == 1
    good_mask = np.isin(src[0]["vertno"], src_new[0]["vertno"])
    src[0]["inuse"][src[0]["vertno"][~good_mask]] = 0
    assert src[0]["inuse"].sum() == 7497
    src[0]["vertno"] = src[0]["vertno"][good_mask]
    assert len(src[0]["vertno"]) == 7497
    src[0]["nuse"] = len(src[0]["vertno"])
    assert src[0]["nuse"] == 7497
    _compare_source_spaces(src_new, src, mode="approx")
    assert "volume, shape" in repr(src)
    del src
    del src_new
    # a malformed sphere specification (wrong length) must be rejected
    # (modernized from the legacy pytest.raises call form for consistency
    # with the rest of the file)
    with pytest.raises(ValueError):
        setup_volume_source_space(
            "sample",
            pos=7.0,
            sphere=[1.0, 1.0],  # bad sphere
            mri=fname_mri,
            subjects_dir=subjects_dir,
        )

    # now without MRI argument, it should give an error when we try
    # to read it
    run_subprocess(["mne_volume_source_space", "--grid", "7.0", "--src", temp_name])
    with pytest.raises(ValueError):
        read_source_spaces(temp_name)


@pytest.mark.timeout(60)  # can be slow on OSX Travis
@pytest.mark.slowtest
@testing.requires_testing_data
def test_triangle_neighbors():
    """Test efficient vertex neighboring triangles for surfaces."""
    surf = read_source_spaces(fname)[0]
    # Reference implementation: register each triangle with each of its
    # three vertices using a plain Python loop.
    ref = [list() for _ in range(surf["np"])]
    for tri_idx in range(surf["ntri"]):
        for vert in surf["tris"][tri_idx]:
            ref[vert].append(tri_idx)
    surf["neighbor_tri"] = [np.array(neighbors, int) for neighbors in ref]

    # The optimized implementation must match the reference exactly
    neighbor_tri = _triangle_neighbors(surf["tris"], surf["np"])
    assert all(
        np.array_equal(a, b) for a, b in zip(neighbor_tri, surf["neighbor_tri"])
    )


def test_accumulate_normals():
    """Test efficient normal accumulation for surfaces."""
    # set up comparison
    n_pts = int(1.6e5)  # approx number in sample source space
    n_tris = int(3.2e5)
    # use all positive to make a worst-case for cumulative summation
    # (real "nn" vectors will have both positive and negative values)
    first_vert = (rng.rand(n_tris, 1) * (n_pts - 2)).astype(int)
    tris = np.c_[first_vert, first_vert + 1, first_vert + 2]
    tri_nn = rng.rand(n_tris, 3)

    # reference: cut-and-paste from the original code in surface.py --
    # accumulate each triangle normal onto its three vertices in pure Python
    expected = np.zeros((n_pts, 3))
    for tri_idx in range(n_tris):
        expected[tris[tri_idx], :] += tri_nn[tri_idx, :]

    nn = _accumulate_normals(tris, tri_nn, n_pts)

    # the moment of truth (or reckoning)
    assert_allclose(nn, expected, rtol=1e-7, atol=1e-7)


@pytest.mark.slowtest
@testing.requires_testing_data
def test_setup_source_space(tmp_path):
    """Test setting up ico, oct, and all source spaces."""
    pytest.importorskip("nibabel")
    fname_ico = data_path / "subjects" / "fsaverage" / "bem" / "fsaverage-ico-5-src.fif"
    # first lets test some input params
    for spacing in ("oct", "oct6e"):
        with pytest.raises(ValueError, match="subdivision must be an integer"):
            setup_source_space(
                "sample", spacing=spacing, add_dist=False, subjects_dir=subjects_dir
            )
    for spacing in ("oct0", "oct-4"):
        with pytest.raises(ValueError, match="oct subdivision must be >= 1"):
            setup_source_space(
                "sample", spacing=spacing, add_dist=False, subjects_dir=subjects_dir
            )
    with pytest.raises(ValueError, match="ico subdivision must be >= 0"):
        setup_source_space(
            "sample", spacing="ico-4", add_dist=False, subjects_dir=subjects_dir
        )
    with pytest.raises(ValueError, match="must be a string with values"):
        setup_source_space(
            "sample", spacing="7emm", add_dist=False, subjects_dir=subjects_dir
        )
    with pytest.raises(ValueError, match="must be a string with values"):
        setup_source_space(
            "sample", spacing="ally", add_dist=False, subjects_dir=subjects_dir
        )

    # ico 5 (fsaverage) - write to temp file
    src = read_source_spaces(fname_ico)
    with _record_warnings():  # sklearn equiv neighbors
        src_new = setup_source_space(
            "fsaverage", spacing="ico5", subjects_dir=subjects_dir, add_dist=False
        )
    _compare_source_spaces(src, src_new, mode="approx")
    assert repr(src).split("~")[0] == repr(src_new).split("~")[0]
    assert repr(src).count("surface (") == 2
    assert_array_equal(src[0]["vertno"], np.arange(10242))
    assert_array_equal(src[1]["vertno"], np.arange(10242))

    # oct-6 (sample) - auto filename + IO
    src = read_source_spaces(fname)
    temp_name = tmp_path / "temp-src.fif"
    with _record_warnings():  # sklearn equiv neighbors
        src_new = setup_source_space(
            "sample", spacing="oct6", subjects_dir=subjects_dir, add_dist=False
        )
        write_source_spaces(temp_name, src_new, overwrite=True)
    assert_equal(src_new[0]["nuse"], 4098)
    _compare_source_spaces(src, src_new, mode="approx", nearest=False)
    src_new = read_source_spaces(temp_name)
    _compare_source_spaces(src, src_new, mode="approx", nearest=False)

    # all source points - no file writing
    src_new = setup_source_space(
        "sample", spacing="all", subjects_dir=subjects_dir, add_dist=False
    )
    assert src_new[0]["nuse"] == len(src_new[0]["rr"])
    assert src_new[1]["nuse"] == len(src_new[1]["rr"])

    # dense source space to hit surf['inuse'] lines of _create_surf_spacing
    # (modernized from the legacy pytest.raises call form for consistency
    # with the rest of the file)
    with pytest.raises(RuntimeError):
        setup_source_space(
            "sample", spacing="ico6", subjects_dir=subjects_dir, add_dist=False
        )


@testing.requires_testing_data
@requires_mne
@pytest.mark.slowtest
@pytest.mark.timeout(60)
@pytest.mark.parametrize("spacing", [2, 7])
def test_setup_source_space_spacing(tmp_path, spacing, monkeypatch):
    """Test setting up surface source spaces using a given spacing."""
    pytest.importorskip("nibabel")
    # work on a private copy of the subject dir so MNE-C can write into it
    copytree(subjects_dir / "sample", tmp_path / "sample")
    # spacing 7 passes no flag -- presumably the MNE-C default; confirm
    args = [] if spacing == 7 else ["--spacing", str(spacing)]
    monkeypatch.setenv("SUBJECTS_DIR", str(tmp_path))
    monkeypatch.setenv("SUBJECT", "sample")
    run_subprocess(["mne_setup_source_space"] + args)
    src = read_source_spaces(tmp_path / "sample" / "bem" / f"sample-{spacing}-src.fif")
    # No need to pass subjects_dir here because we've setenv'ed it
    src_new = setup_source_space("sample", spacing=spacing, add_dist=False)
    _compare_source_spaces(src, src_new, mode="approx", nearest=True)
    # Degenerate conditions
    with pytest.raises(TypeError, match="spacing must be.*got.*float.*"):
        setup_source_space("sample", 7.0)
    with pytest.raises(ValueError, match="spacing must be >= 2, got 1"):
        setup_source_space("sample", 1)


@testing.requires_testing_data
def test_read_source_spaces():
    """Test reading of source space meshes."""
    src = read_source_spaces(fname, patch_stats=True)

    # For each hemisphere: the full triangulation must index the entire
    # vertex array exactly, and the decimated ("use") triangulation must
    # stay within the same bounds.
    for hemi in src:
        points = hemi["rr"]
        faces = hemi["tris"]
        use_faces = hemi["use_tris"]
        assert faces.min() == 0
        assert faces.max() == points.shape[0] - 1
        assert use_faces.min() >= 0
        assert use_faces.max() <= points.shape[0] - 1


@pytest.mark.slowtest
@testing.requires_testing_data
def test_write_source_space(tmp_path):
    """Test reading and writing of source spaces."""
    src_orig = read_source_spaces(fname, patch_stats=False)
    out_fname = tmp_path / "tmp-src.fif"
    write_source_spaces(out_fname, src_orig)
    # round trip must preserve the source space
    src_round_trip = read_source_spaces(out_fname, patch_stats=False)
    _compare_source_spaces(src_orig, src_round_trip)

    # a filename that doesn't follow the -src.fif convention should warn
    # on both write and read
    bad_fname = tmp_path / "test-bad-name.fif.gz"
    with pytest.warns(RuntimeWarning, match="-src.fif"):
        write_source_spaces(bad_fname, src_orig)
    with pytest.warns(RuntimeWarning, match="-src.fif"):
        read_source_spaces(bad_fname)


@testing.requires_testing_data
@pytest.mark.parametrize("pass_ids", (True, False))
def test_source_space_from_label(tmp_path, pass_ids):
    """Test generating a source space from volume label."""
    pytest.importorskip("nibabel")
    aseg_short = "aseg.mgz"
    atlas_ids, _ = read_freesurfer_lut()
    volume_label = "Left-Cerebellum-Cortex"

    # Test pos as dict
    pos = dict()
    with pytest.raises(ValueError, match="mri must be None if pos is a dict"):
        setup_volume_source_space(
            "sample",
            pos=pos,
            volume_label=volume_label,
            mri=aseg_short,
            subjects_dir=subjects_dir,
        )

    # Test T1.mgz provided
    with pytest.raises(RuntimeError, match=r"Must use a \*aseg.mgz file"):
        setup_volume_source_space(
            "sample", mri="T1.mgz", volume_label=volume_label, subjects_dir=subjects_dir
        )

    # Test invalid volume label
    mri = aseg_short
    with pytest.raises(ValueError, match="'Left-Cerebral' not found.*Did you"):
        setup_volume_source_space(
            "sample", volume_label="Left-Cerebral", mri=mri, subjects_dir=subjects_dir
        )

    # These should be equivalent
    # (a {name: id} dict should behave the same as the bare label name)
    if pass_ids:
        use_volume_label = {volume_label: atlas_ids[volume_label]}
    else:
        use_volume_label = volume_label

    # ensure it works even when not provided (detect that it should be aseg)
    src = setup_volume_source_space(
        "sample",
        volume_label=use_volume_label,
        add_interpolator=False,
        subjects_dir=subjects_dir,
    )
    assert_equal(volume_label, src[0]["seg_name"])
    assert src[0]["nuse"] == 404  # for our given pos and label

    # test reading and writing
    out_name = tmp_path / "temp-src.fif"
    write_source_spaces(out_name, src)
    src_from_file = read_source_spaces(out_name)
    _compare_source_spaces(src, src_from_file, mode="approx")


@pytest.mark.slowtest
@testing.requires_testing_data
def test_source_space_exclusive_complete(src_volume_labels):
    """Test that we produce exclusive and complete labels."""
    # these two are neighbors and are quite large, so let's use them to
    # ensure no overlaps
    pytest.importorskip("nibabel")
    src, volume_labels, _ = src_volume_labels
    ii = volume_labels.index("Left-Cerebral-White-Matter")
    jj = volume_labels.index("Left-Cerebral-Cortex")
    assert src[ii]["nuse"] == 755  # 2034 with pos=5, was 2832
    assert src[jj]["nuse"] == 616  # 1520 with pos=5, was 2623
    src_full = read_source_spaces(fname_vol)
    # This implicitly checks for overlap because np.sort would preserve
    # duplicates, and it checks for completeness because the sets should match
    assert_array_equal(
        src_full[0]["vertno"], np.sort(np.concatenate([s["vertno"] for s in src]))
    )
    # every per-label space must share the full volume's grid positions
    for si, s in enumerate(src):  # si unused, kept for debugging convenience
        assert_allclose(src_full[0]["rr"], s["rr"], atol=1e-6)
    # also check single_volume=True -- should be the same result
    with (
        _record_warnings(),
        pytest.warns(RuntimeWarning, match="Found no usable.*Left-vessel.*"),
    ):
        src_single = setup_volume_source_space(
            src[0]["subject_his_id"],
            7.0,
            "aseg.mgz",
            bem=fname_bem,
            volume_label=volume_labels,
            single_volume=True,
            add_interpolator=False,
            subjects_dir=subjects_dir,
        )
    assert len(src_single) == 1
    assert "Unknown+Left-Cerebral-White-Matter+Left-" in repr(src_single)
    assert_array_equal(src_full[0]["vertno"], src_single[0]["vertno"])


@pytest.mark.timeout(60)  # ~24 s on Travis
@pytest.mark.slowtest
@testing.requires_testing_data
def test_read_volume_from_src():
    """Test reading volumes from a mixed source space."""
    pytest.importorskip("nibabel")
    labels_vol = ["Left-Amygdala", "Brain-Stem", "Right-Amygdala"]

    src = read_source_spaces(fname)

    # Setup a volume source space
    vol_src = setup_volume_source_space(
        "sample",
        mri=aseg_fname,
        pos=5.0,
        bem=fname_bem,
        volume_label=labels_vol,
        subjects_dir=subjects_dir,
    )
    # Generate the mixed source space, testing some list methods
    assert src.kind == "surface"
    assert vol_src.kind == "volume"
    src += vol_src  # in-place concatenation makes it a mixed space
    assert src.kind == "mixed"
    assert vol_src.kind == "volume"
    # slicing must derive the right kind from the selected members
    assert src[:2].kind == "surface"
    assert src[2:].kind == "volume"
    assert src[:].kind == "mixed"
    # a stride that interleaves surface and volume members is invalid
    with pytest.raises(RuntimeError, match="Invalid source space"):
        src[::2]

    volume_src = get_volume_labels_from_src(src, "sample", subjects_dir)
    volume_label = volume_src[0].name
    # label names come back hemisphere-suffixed ("...-lh"); convert back to
    # the FreeSurfer "Left-*" naming used by seg_name for comparison
    volume_label = "Left-" + volume_label.replace("-lh", "")

    # Test
    assert_equal(volume_label, src[2]["seg_name"])

    assert_equal(src[2]["type"], "vol")


@testing.requires_testing_data
def test_combine_source_spaces(tmp_path):
    """Test combining source spaces."""
    nib = pytest.importorskip("nibabel")
    rng = np.random.RandomState(2)  # local RNG (intentionally shadows module rng)
    volume_labels = ["Brain-Stem", "Right-Hippocampus"]  # two fairly large

    # create a sparse surface source space to ensure all get mapped
    # when mri_resolution=False
    srf = setup_source_space(
        "sample", "oct3", add_dist=False, subjects_dir=subjects_dir
    )

    # setup 2 volume source spaces
    vol = setup_volume_source_space(
        "sample",
        subjects_dir=subjects_dir,
        volume_label=volume_labels[0],
        mri=aseg_fname,
        add_interpolator=False,
    )

    # setup a discrete source space
    rr = rng.randint(0, 11, (20, 3)) * 5e-3
    nn = np.zeros(rr.shape)
    nn[:, -1] = 1
    pos = {"rr": rr, "nn": nn}
    disc = setup_volume_source_space(
        "sample", subjects_dir=subjects_dir, pos=pos, verbose="error"
    )

    # combine source spaces
    assert srf.kind == "surface"
    assert vol.kind == "volume"
    assert disc.kind == "discrete"
    src = srf + vol + disc
    assert src.kind == "mixed"
    # addition must not mutate the operands' kinds
    assert srf.kind == "surface"
    assert vol.kind == "volume"
    assert disc.kind == "discrete"

    # test addition of source spaces
    assert len(src) == 4

    # test reading and writing
    src_out_name = tmp_path / "temp-src.fif"
    src.save(src_out_name)
    src_from_file = read_source_spaces(src_out_name)
    _compare_source_spaces(src, src_from_file, mode="approx")
    assert repr(src).split("~")[0] == repr(src_from_file).split("~")[0]
    assert_equal(src.kind, "mixed")

    # test that all source spaces are in MRI coordinates
    coord_frames = np.array([s["coord_frame"] for s in src])
    assert (coord_frames == FIFF.FIFFV_COORD_MRI).all()

    # test errors for export_volume
    image_fname = tmp_path / "temp-image.mgz"

    # source spaces with no volume
    with pytest.raises(ValueError, match="at least one volume"):
        srf.export_volume(image_fname, verbose="error")

    # unrecognized source type
    disc2 = disc.copy()
    disc2[0]["type"] = "kitty"
    with pytest.raises(ValueError, match="Invalid value"):
        src + disc2
    del disc2

    # unrecognized file type
    bad_image_fname = tmp_path / "temp-image.png"
    # vertices outside vol space warning
    pytest.raises(ValueError, src.export_volume, bad_image_fname, verbose="error")

    # mixed coordinate frames
    disc3 = disc.copy()
    disc3[0]["coord_frame"] = 10
    src_mixed_coord = src + disc3
    with pytest.raises(ValueError, match="must be in head coordinates"):
        src_mixed_coord.export_volume(image_fname, verbose="error")

    # now actually write it
    fname_img = tmp_path / "img.nii"
    for mri_resolution in (False, "sparse", True):
        # NB: this rebinds ``src`` (previously the combined space above);
        # ``up`` is the extra voxel count expected only at full MRI res
        for src, up in ((vol, 705), (srf + vol, 27272), (disc + vol, 705)):
            src.export_volume(
                fname_img, use_lut=False, mri_resolution=mri_resolution, overwrite=True
            )
            img_data = _get_img_fdata(nib.load(str(fname_img)))
            n_src = img_data.astype(bool).sum()
            n_want = sum(s["nuse"] for s in src)
            if mri_resolution is True:
                n_want += up
            assert n_src == n_want, src

    # gh-8004
    temp_aseg = tmp_path / "aseg.mgz"
    aseg_img = nib.load(aseg_fname)
    aseg_affine = aseg_img.affine
    # shrink the voxel size; NOTE this scales the array returned by
    # ``.affine`` in place (no copy is made) -- aseg_img is only used for
    # its dataobj afterwards, so that appears intentional
    aseg_affine[:3, :3] *= 0.7
    new_aseg = nib.MGHImage(aseg_img.dataobj, aseg_affine)
    nib.save(new_aseg, str(temp_aseg))
    lh_cereb = mne.setup_volume_source_space(
        "sample",
        mri=temp_aseg,
        volume_label="Left-Cerebellum-Cortex",
        add_interpolator=False,
        subjects_dir=subjects_dir,
    )
    src = srf + lh_cereb
    with pytest.warns(RuntimeWarning, match="2 surf vertices lay outside"):
        src.export_volume(image_fname, mri_resolution="sparse", overwrite=True)

    # gh-12495
    image_fname = tmp_path / "temp-image.nii"
    lh_cereb = mne.setup_volume_source_space(
        "sample",
        mri=aseg_fname,
        volume_label="Left-Cerebellum-Cortex",
        add_interpolator=False,
        subjects_dir=subjects_dir,
    )
    lh_cereb.export_volume(image_fname, mri_resolution=True)
    aseg = nib.load(str(aseg_fname))
    out = nib.load(str(image_fname))
    assert_allclose(out.affine, aseg.affine)
    src_data = _get_img_fdata(out).astype(bool)
    # 8 is the FreeSurfer LUT id for Left-Cerebellum-Cortex
    aseg_data = _get_img_fdata(aseg) == 8
    n_src = src_data.sum()
    n_aseg = aseg_data.sum()
    assert n_aseg == n_src
    n_overlap = (src_data & aseg_data).sum()
    assert n_src == n_overlap


@testing.requires_testing_data
def test_morph_source_spaces():
    """Check Python source-space morphing against a stored reference."""
    pytest.importorskip("nibabel")
    # Load the fsaverage source space and the precomputed fsaverage->sample
    # morph that ships with the testing dataset.
    src_fsaverage = read_source_spaces(fname_fs)
    src_expected = read_source_spaces(fname_morph)
    # Recompute the morph in Python and verify it matches the stored result.
    src_actual = morph_source_spaces(
        src_fsaverage, "sample", subjects_dir=subjects_dir
    )
    _compare_source_spaces(src_expected, src_actual, mode="approx")


@pytest.mark.timeout(60)  # can be slow on OSX Travis
@pytest.mark.slowtest
@testing.requires_testing_data
def test_morphed_source_space_return():
    """Test returning a morphed source space to the original subject."""
    # let's create some random data on fsaverage
    pytest.importorskip("nibabel")
    data = rng.randn(20484, 1)
    tmin, tstep = 0, 1.0
    src_fs = read_source_spaces(fname_fs)
    stc_fs = SourceEstimate(
        data, [s["vertno"] for s in src_fs], tmin, tstep, "fsaverage"
    )
    n_verts_fs = sum(len(s["vertno"]) for s in src_fs)

    # Create our morph source space; vertex count must be preserved
    src_morph = morph_source_spaces(src_fs, "sample", subjects_dir=subjects_dir)
    n_verts_sample = sum(len(s["vertno"]) for s in src_morph)
    assert n_verts_fs == n_verts_sample

    # Morph the data over using standard methods
    stc_morph = compute_source_morph(
        src_fs,
        "fsaverage",
        "sample",
        spacing=[s["vertno"] for s in src_morph],
        smooth=1,
        subjects_dir=subjects_dir,
        warn=False,
    ).apply(stc_fs)
    assert stc_morph.data.shape[0] == n_verts_sample

    # We can now pretend like this was real data we got e.g. from an inverse.
    # To be complete, let's remove some vertices (10 per hemisphere)
    keeps = [
        np.sort(rng.permutation(np.arange(len(v)))[: len(v) - 10])
        for v in stc_morph.vertices
    ]
    stc_morph = SourceEstimate(
        np.concatenate([stc_morph.lh_data[keeps[0]], stc_morph.rh_data[keeps[1]]]),
        [v[k] for v, k in zip(stc_morph.vertices, keeps)],
        tmin,
        tstep,
        "sample",
    )

    # Return it to the original subject
    stc_morph_return = stc_morph.to_original_src(src_fs, subjects_dir=subjects_dir)

    # This should fail (has too many verts in SourceMorph)
    with pytest.warns(RuntimeWarning, match="vertices not included"):
        morph = compute_source_morph(
            src_morph,
            subject_from="sample",
            spacing=stc_morph_return.vertices,
            smooth=1,
            subjects_dir=subjects_dir,
        )
    with pytest.raises(ValueError, match="vertices do not match"):
        morph.apply(stc_morph)

    # Compare to the original data
    with pytest.warns(RuntimeWarning, match="vertices not included"):
        stc_morph_morph = compute_source_morph(
            src=stc_morph,
            subject_from="sample",
            spacing=stc_morph_return.vertices,
            smooth=1,
            subjects_dir=subjects_dir,
        ).apply(stc_morph)

    assert_equal(stc_morph_return.subject, stc_morph_morph.subject)
    for ii in range(2):
        assert_array_equal(stc_morph_return.vertices[ii], stc_morph_morph.vertices[ii])
    # These will not match perfectly because morphing pushes data around
    corr = np.corrcoef(stc_morph_return.data[:, 0], stc_morph_morph.data[:, 0])[0, 1]
    assert corr > 0.99, corr

    # Explicitly test having two vertices map to the same target vertex. We
    # simulate this by having two vertices be at the same position.
    src_fs2 = src_fs.copy()
    vert1, vert2 = src_fs2[0]["vertno"][:2]
    src_fs2[0]["rr"][vert1] = src_fs2[0]["rr"][vert2]
    stc_morph_return = stc_morph.to_original_src(src_fs2, subjects_dir=subjects_dir)

    # test to_original_src method result equality
    for ii in range(2):
        assert_array_equal(stc_morph_return.vertices[ii], stc_morph_morph.vertices[ii])

    # These will not match perfectly because morphing pushes data around
    corr = np.corrcoef(stc_morph_return.data[:, 0], stc_morph_morph.data[:, 0])[0, 1]
    assert corr > 0.99, corr

    # Degenerate cases (context-manager form of pytest.raises, consistent
    # with the checks above and with modern pytest style)
    stc_morph.subject = None  # no .subject provided
    with pytest.raises(ValueError):
        stc_morph.to_original_src(
            src_fs, subject_orig="fsaverage", subjects_dir=subjects_dir
        )
    stc_morph.subject = "sample"
    del src_fs[0]["subject_his_id"]  # no name in src_fsaverage
    with pytest.raises(ValueError):
        stc_morph.to_original_src(src_fs, subjects_dir=subjects_dir)
    src_fs[0]["subject_his_id"] = "fsaverage"  # name mismatch
    with pytest.raises(ValueError):
        stc_morph.to_original_src(
            src_fs, subject_orig="foo", subjects_dir=subjects_dir
        )
    src_fs[0]["subject_his_id"] = "sample"
    src = read_source_spaces(fname)  # wrong source space
    with pytest.raises(RuntimeError):
        stc_morph.to_original_src(src, subjects_dir=subjects_dir)


@testing.requires_testing_data
@pytest.mark.parametrize(
    "src, n, nv",
    [
        (fname_fs, 2, 10242),
        (fname_src, 2, 258),
        (fname_vol, 0, 0),
    ],
)
def test_get_decimated_surfaces(src, n, nv):
    """Test get_decimated_surfaces."""
    surfaces = get_decimated_surfaces(src)
    assert len(surfaces) == n
    # Volume source spaces produce no surfaces; nothing further to check.
    if n == 0:
        return
    for surf in surfaces:
        # Each decimated surface has exactly vertex positions and triangles.
        assert set(surf) == {"rr", "tris"}
        assert len(surf["rr"]) == nv
        # Every vertex index 0..nv-1 must appear in the triangulation.
        assert_array_equal(np.unique(surf["tris"]), np.arange(nv))


# The following code was used to generate small-src.fif.gz.
# Unfortunately the C code bombs when trying to add source space distances,
# possibly due to incomplete "faking" of a smaller surface on our part here.
"""

import os
import numpy as np
import mne

data_path = mne.datasets.sample.data_path()
src = mne.setup_source_space('sample', fname=None, spacing='oct5')
hemis = ['lh', 'rh']
fnames = [
    str(data_path) + f'/subjects/sample/surf/{h}.decimated' for h in hemis]

vs = list()
for s, fname in zip(src, fnames):
    coords = s['rr'][s['vertno']]
    vs.append(s['vertno'])
    idx = -1 * np.ones(len(s['rr']))
    idx[s['vertno']] = np.arange(s['nuse'])
    faces = s['use_tris']
    faces = idx[faces]
    mne.write_surface(fname, coords, faces)

# we need to move sphere surfaces
spheres = [
    str(data_path) + f'/subjects/sample/surf/{h}.sphere' for h in hemis]
for s in spheres:
    os.rename(s, s + '.bak')
try:
    for s, v in zip(spheres, vs):
        coords, faces = mne.read_surface(s + '.bak')
        coords = coords[v]
        mne.write_surface(s, coords, faces)
    src = mne.setup_source_space('sample', fname=None, spacing='oct4',
                                 surface='decimated')
finally:
    for s in spheres:
        os.rename(s + '.bak', s)

fname = 'small-src.fif'
fname_gz = fname + '.gz'
mne.write_source_spaces(fname, src)
mne.utils.run_subprocess(['mne_add_patch_info', '--src', fname,
                          '--srcp', fname])
mne.write_source_spaces(fname_gz, mne.read_source_spaces(fname))
"""
