# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

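"""Tests for SSP projector computation, I/O, and application."""
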
import copy as cp
from pathlib import Path

import numpy as np
import pytest
from numpy.testing import assert_allclose, assert_array_almost_equal, assert_equal
from scipy import linalg

from mne import (
    Epochs,
    compute_proj_epochs,
    compute_proj_evoked,
    compute_proj_raw,
    compute_raw_covariance,
    convert_forward_solution,
    create_info,
    pick_types,
    read_events,
    read_forward_solution,
    read_source_estimate,
    sensitivity_map,
)
from mne._fiff.proj import (
    _EEG_AVREF_PICK_DICT,
    _needs_eeg_average_ref_proj,
    activate_proj,
    make_projector,
    setup_proj,
)
from mne.cov import compute_whitener, regularize
from mne.datasets import testing
from mne.io import RawArray, read_raw_fif
from mne.preprocessing import maxwell_filter
from mne.proj import (
    _has_eeg_average_ref_proj,
    make_eeg_average_ref_proj,
    read_proj,
    write_proj,
)
from mne.rank import _compute_rank_int
from mne.utils import _record_warnings

base_dir = Path(__file__).parents[1] / "io" / "tests" / "data"
raw_fname = base_dir / "test_raw.fif"
event_fname = base_dir / "test-eve.fif"
proj_fname = base_dir / "test-proj.fif"
proj_gz_fname = base_dir / "test-proj.fif.gz"
bads_fname = base_dir / "test_bads.txt"
sample_path = testing.data_path(download=False) / "MEG" / "sample"
fwd_fname = sample_path / "sample_audvis_trunc-meg-eeg-oct-4-fwd.fif"
sensmap_fname = sample_path / "sample_audvis_trunc-%s-oct-4-fwd-sensmap-%s.w"
eog_fname = sample_path / "sample_audvis_eog-proj.fif"
ecg_fname = sample_path / "sample_audvis_ecg-proj.fif"


def test_bad_proj():
    """Test dealing with bad projection application."""
    raw = read_raw_fif(raw_fname, preload=True)
    events = read_events(event_fname)
    picks = pick_types(
        raw.info, meg=True, stim=False, ecg=False, eog=False, exclude="bads"
    )
    picks = picks[2:18:3]
    _check_warnings(raw, events, picks)
    # still bad
    raw.pick([raw.ch_names[ii] for ii in picks])
    _check_warnings(raw, events)
    # "fixed"
    raw.info.normalize_proj()  # avoid projection warnings
    _check_warnings(raw, events, count=0)
    # eeg avg ref is okay
    raw = read_raw_fif(raw_fname, preload=True).pick(picks="eeg")
    raw.set_eeg_reference(projection=True)
    _check_warnings(raw, events, count=0)
    raw.info["bads"] = raw.ch_names[:10]
    _check_warnings(raw, events, count=0)

    raw = read_raw_fif(raw_fname)
    pytest.raises(ValueError, raw.del_proj, "foo")
    n_proj = len(raw.info["projs"])
    raw.del_proj(0)
    assert_equal(len(raw.info["projs"]), n_proj - 1)
    raw.del_proj()
    assert_equal(len(raw.info["projs"]), 0)

    # Ensure we deal with newer-style Neuromag projs properly; we were getting:
    #
    #     Projection vector "PCA-v2" has magnitude 1.00 (should be unity),
    #     applying projector with 101/306 of the original channels available
    #     may be dangerous.
    raw = read_raw_fif(raw_fname).crop(0, 1)
    raw.set_eeg_reference(projection=True)
    raw.info["bads"] = ["MEG 0111"]
    meg_picks = pick_types(raw.info, meg=True, exclude=())
    ch_names = [raw.ch_names[pick] for pick in meg_picks]
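    # zero-pad each MEG proj so it spans the full MEG channel set, mimicking
    # the newer-style Neuromag projs described above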
    for p in raw.info["projs"][:-1]:
        data = np.zeros((1, len(ch_names)))
        idx = [ch_names.index(ch_name) for ch_name in p["data"]["col_names"]]
        data[:, idx] = p["data"]["data"]
        p["data"].update(ncol=len(meg_picks), col_names=ch_names, data=data)
    # smoke test for no warnings during reg
    regularize(compute_raw_covariance(raw, verbose="error"), raw.info)


def _check_warnings(raw, events, picks=None, count=3):
    """Count warnings."""
    with _record_warnings() as w:
        Epochs(
            raw,
            events,
            dict(aud_l=1, vis_l=3),
            -0.2,
            0.5,
            picks=picks,
            preload=True,
            proj=True,
        )
    assert len(w) == count
    assert all("dangerous" in str(ww.message) for ww in w)


@testing.requires_testing_data
def test_sensitivity_maps():
    """Test sensitivity map computation."""
    fwd = read_forward_solution(fwd_fname)
    fwd = convert_forward_solution(fwd, surf_ori=True)
    projs = read_proj(eog_fname)
    projs.extend(read_proj(ecg_fname))
    decim = 6
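    # the stored reference maps (.w files) are compared to `decim` decimal
    # places below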
    for ch_type in ["eeg", "grad", "mag"]:
        w = read_source_estimate(str(sensmap_fname) % (ch_type, "lh")).data
        stc = sensitivity_map(
            fwd, projs=None, ch_type=ch_type, mode="free", exclude="bads"
        )
        assert_array_almost_equal(stc.data, w, decim)
        assert stc.subject == "sample"
        # let's just make sure the others run
        if ch_type == "grad":
            # fixed (2)
            w = read_source_estimate(str(sensmap_fname) % (ch_type, "2-lh")).data
            stc = sensitivity_map(
                fwd, projs=None, mode="fixed", ch_type=ch_type, exclude="bads"
            )
            assert_array_almost_equal(stc.data, w, decim)
        if ch_type == "mag":
            # ratio (3)
            w = read_source_estimate(str(sensmap_fname) % (ch_type, "3-lh")).data
            stc = sensitivity_map(
                fwd, projs=None, mode="ratio", ch_type=ch_type, exclude="bads"
            )
            assert_array_almost_equal(stc.data, w, decim)
        if ch_type == "eeg":
            # radiality (4), angle (5), remaining (6), and dampening (7)
            modes = ["radiality", "angle", "remaining", "dampening"]
            ends = ["4-lh", "5-lh", "6-lh", "7-lh"]
            for mode, end in zip(modes, ends):
                w = read_source_estimate(str(sensmap_fname) % (ch_type, end)).data
                stc = sensitivity_map(
                    fwd, projs=projs, mode=mode, ch_type=ch_type, exclude="bads"
                )
                assert_array_almost_equal(stc.data, w, decim)

    # test corner case for EEG
    stc = sensitivity_map(
        fwd,
        projs=[make_eeg_average_ref_proj(fwd["info"])],
        ch_type="eeg",
        exclude="bads",
    )
    # test corner cases: mode="angle" requires projs (ValueError for None) and
    # raises RuntimeError when projs are passed but none are valid (#3135)
    pytest.raises(ValueError, sensitivity_map, fwd, projs=None, mode="angle")
    pytest.raises(RuntimeError, sensitivity_map, fwd, projs=[], mode="angle")
    # test volume source space
    fname = sample_path / "sample_audvis_trunc-meg-vol-7-fwd.fif"
    fwd = read_forward_solution(fname)
    sensitivity_map(fwd)


def test_compute_proj_epochs(tmp_path):
    """Test SSP computation on epochs."""
    event_id, tmin, tmax = 1, -0.2, 0.3

    raw = read_raw_fif(raw_fname, preload=True)
    events = read_events(event_fname)
    bad_ch = "MEG 2443"
    picks = pick_types(raw.info, meg=True, eeg=False, stim=False, eog=False, exclude=[])
    epochs = Epochs(
        raw, events, event_id, tmin, tmax, picks=picks, baseline=None, proj=False
    )

    evoked = epochs.average()
    projs = compute_proj_epochs(epochs, n_grad=1, n_mag=1, n_eeg=0)
    write_proj(tmp_path / "test-proj.fif.gz", projs)
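    # round-trip the projs through disk and also compare against the stored
    # reference projection files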
    for p_fname in [proj_fname, proj_gz_fname, tmp_path / "test-proj.fif.gz"]:
        projs2 = read_proj(p_fname)
        assert len(projs) == len(projs2)

        for p1, p2 in zip(projs, projs2):
            assert p1["desc"] == p2["desc"]
            assert p1["data"]["col_names"] == p2["data"]["col_names"]
            assert p1["active"] == p2["active"]
            # compare with sign invariance
            p1_data = p1["data"]["data"] * np.sign(p1["data"]["data"][0, 0])
            p2_data = p2["data"]["data"] * np.sign(p2["data"]["data"][0, 0])
            if bad_ch in p1["data"]["col_names"]:
                bad = p1["data"]["col_names"].index("MEG 2443")
                mask = np.ones(p1_data.size, dtype=bool)
                mask[bad] = False
                p1_data = p1_data[:, mask]
                p2_data = p2_data[:, mask]
            corr = np.corrcoef(p1_data, p2_data)[0, 1]
            assert_array_almost_equal(corr, 1.0, 5)
            if p2["explained_var"]:
                assert isinstance(p2["explained_var"], float)
                assert_array_almost_equal(p1["explained_var"], p2["explained_var"])

    # test that you can compute the projection matrix
    projs = activate_proj(projs)
    proj, nproj, U = make_projector(projs, epochs.ch_names, bads=[])
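    # make_projector returns the projection operator, the number of projection
    # vectors actually applied, and the orthogonalized projection vectors U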

    assert nproj == 2
    assert U.shape[1] == 2

    # test that you can save them
    with epochs.info._unlock():
        epochs.info["projs"] += projs
    evoked = epochs.average()
    evoked.save(tmp_path / "foo-ave.fif")

    projs = read_proj(proj_fname)

    projs_evoked = compute_proj_evoked(evoked, n_grad=1, n_mag=1, n_eeg=0)
    assert len(projs_evoked) == 2
    # XXX : test something

    # test parallelization
    projs = compute_proj_epochs(
        epochs, n_grad=1, n_mag=1, n_eeg=0, n_jobs=2, desc_prefix="foobar"
    )
    assert all("foobar" in x["desc"] for x in projs)
    projs = activate_proj(projs)
    proj_par, _, _ = make_projector(projs, epochs.ch_names, bads=[])
    assert_allclose(proj, proj_par, rtol=1e-8, atol=1e-16)

    # test warnings on bad filenames
    proj_badname = tmp_path / "test-bad-name.fif.gz"
    with pytest.warns(RuntimeWarning, match="-proj.fif"):
        write_proj(proj_badname, projs)
    with pytest.warns(RuntimeWarning, match="-proj.fif"):
        read_proj(proj_badname)

    # bad inputs
    fname = tmp_path / "out-proj.fif"
    with pytest.raises(TypeError, match="projs"):
        write_proj(fname, "foo")
    with pytest.raises(TypeError, match=r"projs\[0\] must be .*"):
        write_proj(fname, ["foo"], overwrite=True)


@pytest.mark.slowtest
def test_compute_proj_raw(tmp_path):
    """Test SSP computation on raw."""
    # Test that the raw projectors work
    raw_time = 2.5  # Do shorter amount for speed
    raw = read_raw_fif(raw_fname).crop(0, raw_time)
    raw.load_data()
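    # each segment is short relative to the number of channels, so
    # compute_proj_raw is expected to warn that there are too few samples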
    for ii in (0.25, 0.5, 1, 2):
        with pytest.warns(RuntimeWarning, match="Too few samples"):
            projs = compute_proj_raw(
                raw, duration=ii - 0.1, stop=raw_time, n_grad=1, n_mag=1, n_eeg=0
            )

        # test that you can compute the projection matrix
        projs = activate_proj(projs)
        proj, nproj, U = make_projector(projs, raw.ch_names, bads=[])

        assert nproj == 2
        assert U.shape[1] == 2

        # test that you can save them
        with raw.info._unlock():
            raw.info["projs"] += projs
        raw.save(tmp_path / f"foo_{ii}_raw.fif", overwrite=True)

    # Test that purely continuous (no duration) raw projection works
    with pytest.warns(RuntimeWarning, match="Too few samples"):
        projs = compute_proj_raw(
            raw, duration=None, stop=raw_time, n_grad=1, n_mag=1, n_eeg=0
        )

    # test that you can compute the projection matrix
    projs = activate_proj(projs)
    proj, nproj, U = make_projector(projs, raw.ch_names, bads=[])

    assert nproj == 2
    assert U.shape[1] == 2

    # test that you can save them
    with raw.info._unlock():
        raw.info["projs"] += projs
    raw.save(tmp_path / "foo_rawproj_continuous_raw.fif")

    # test resampled-data projector, upsampling instead of downsampling
    # here to save an extra filtering step (raw would have to be low-passed
    # to be equivalent)
    raw_resamp = cp.deepcopy(raw)
    raw_resamp.resample(raw.info["sfreq"] * 2, n_jobs=2, npad="auto")
    projs = compute_proj_raw(
        raw_resamp, duration=None, stop=raw_time, n_grad=1, n_mag=1, n_eeg=0
    )
    projs = activate_proj(projs)
    proj_new, _, _ = make_projector(projs, raw.ch_names, bads=[])
    assert_array_almost_equal(proj_new, proj, 4)

    # test with bads
    raw.load_bad_channels(bads_fname)  # adds 2 bad mag channels
    with pytest.warns(RuntimeWarning, match="Too few samples"):
        projs = compute_proj_raw(raw, n_grad=0, n_mag=0, n_eeg=1)
    assert len(projs) == 1

    # test that bad channels can be excluded, and that an empty proj list works
    for projs_ in (projs, []):
        proj, nproj, U = make_projector(projs_, raw.ch_names, bads=raw.ch_names)
        assert_array_almost_equal(proj, np.eye(len(raw.ch_names)))
        assert nproj == 0  # all channels excluded
        assert U.shape == (len(raw.ch_names), nproj)


@pytest.mark.parametrize("duration", [1, np.pi / 2.0])
@pytest.mark.parametrize("sfreq", [600.614990234375, 1000.0])
def test_proj_raw_duration(duration, sfreq):
    """Test equivalence of `duration` options."""
    n_ch, n_dim = 30, 3
    rng = np.random.RandomState(0)
    signals = rng.randn(n_dim, 10000)
    mixing = rng.randn(n_ch, n_dim) + [0, 1, 2]
    data = np.dot(mixing, signals)
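    # the data are a rank-`n_dim` mixture, so the leading n_dim SSP components
    # should span the column space of `mixing`; the subspace-angle check at
    # the end of the test relies on this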
    raw = RawArray(data, create_info(n_ch, sfreq, "eeg"))
    raw.set_eeg_reference(projection=True)
    n_eff = int(round(raw.info["sfreq"] * duration))
    # crop to an even "duration" number of epochs
    stop = ((len(raw.times) // n_eff) * n_eff - 1) / raw.info["sfreq"]
    raw.crop(0, stop)
    proj_def = compute_proj_raw(raw, n_eeg=n_dim)
    proj_dur = compute_proj_raw(raw, duration=duration, n_eeg=n_dim)
    proj_none = compute_proj_raw(raw, duration=None, n_eeg=n_dim)
    assert len(proj_dur) == len(proj_none) == len(proj_def) == n_dim
    # proj_def is not in here because it does not necessarily evenly divide
    # the signal length:
    for pu, pn in zip(proj_dur, proj_none):
        assert_allclose(pu["data"]["data"], pn["data"]["data"])
    # but we can test it here since it should still be a small subspace angle:
    for proj in (proj_dur, proj_none, proj_def):
        computed = np.concatenate([p["data"]["data"] for p in proj], 0)
        angle = np.rad2deg(linalg.subspace_angles(computed.T, mixing)[0])
        assert angle < 1e-5


def test_make_eeg_average_ref_proj():
    """Test EEG average reference projection."""
    raw = read_raw_fif(raw_fname, preload=True)
    eeg = pick_types(raw.info, meg=False, eeg=True)

    # No average EEG reference
    assert not np.all(raw._data[eeg].mean(axis=0) < 1e-19)

    # Apply average EEG reference
    car = make_eeg_average_ref_proj(raw.info)
    reref = raw.copy()
    reref.add_proj(car)
    reref.apply_proj()
    assert_array_almost_equal(reref._data[eeg].mean(axis=0), 0, decimal=18)

    # Error when custom reference has already been applied
    with raw.info._unlock():
        raw.info["custom_ref_applied"] = True
    pytest.raises(RuntimeError, make_eeg_average_ref_proj, raw.info)

    # test that apply_proj does not add an average EEG reference back
    raw.set_eeg_reference(projection=True)
    assert _has_eeg_average_ref_proj(raw.info)
    raw.del_proj(idx=-1)
    assert not _has_eeg_average_ref_proj(raw.info)
    raw.apply_proj()
    assert not _has_eeg_average_ref_proj(raw.info)


@pytest.mark.parametrize("ch_type", tuple(_EEG_AVREF_PICK_DICT) + ("all",))
def test_has_eeg_average_ref_proj(ch_type):
    """Test checking whether an (i)EEG average reference exists."""
    all_ref_ch_types = list(_EEG_AVREF_PICK_DICT)
    if ch_type == "all":
        ch_types = all_ref_ch_types
        set_eeg_ref_ch_type = all_ref_ch_types
    else:
        ch_types = [ch_type] * len(all_ref_ch_types)
        set_eeg_ref_ch_type = ch_type
    empty_info = create_info(len(all_ref_ch_types), 1000.0, ch_types)
    assert not _has_eeg_average_ref_proj(empty_info)

    raw = read_raw_fif(raw_fname)
    raw.del_proj()
    raw.load_data()
    picks = pick_types(raw.info, eeg=True)
    assert len(picks) == 60
    # repeat `ch_types` over and over again
    ch_types = sum([ch_types] * (len(picks) // len(ch_types) + 1), [])[: len(picks)]
    raw.set_channel_types(
        {raw.ch_names[pick]: ch_type for pick, ch_type in zip(picks, ch_types)}
    )
    raw._data[picks] = 1.0  # set all to unity
    # For individual channel types, set them and return from the test early
    if ch_type != "all":
        for ch_type in ("auto", set_eeg_ref_ch_type):
            raw.set_eeg_reference(projection=True, ch_type=ch_type)
            assert len(raw.info["projs"]) == 1
            assert _has_eeg_average_ref_proj(raw.info)
            assert not _needs_eeg_average_ref_proj(raw.info)
            if ch_type == "auto":
                with pytest.warns(RuntimeWarning, match="added.*untouched"):
                    raw.set_eeg_reference(projection=True, ch_type=ch_type)
                raw.del_proj()
        desc = raw.info["projs"][0]["desc"].lower()
        assert ch_type in desc
        data = raw.copy().apply_proj()[picks][0]
        assert_allclose(data, 0.0, atol=1e-12)  # zeroed out
        return

    # Now for ch_type == 'all', ensure that we can make one proj or
    # len(all_ref_ch_types) projs

    # Without joint=True we get one proj per type; with joint=True, one big
    # joint proj
    raw.set_eeg_reference(projection=True, ch_type=set_eeg_ref_ch_type)
    assert len(raw.info["projs"]) == len(all_ref_ch_types)
    raw.del_proj()
    raw.set_eeg_reference(projection=True, ch_type=set_eeg_ref_ch_type, joint=True)
    assert len(raw.info["projs"]) == 1
    assert _has_eeg_average_ref_proj(raw.info)
    assert not _needs_eeg_average_ref_proj(raw.info)
    desc = raw.info["projs"][0]["desc"].lower()
    assert all(ch_type in desc for ch_type in all_ref_ch_types)
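    # re-adding an average reference for any single type (or for all at once)
    # should warn and leave the existing projector untouched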
    for ch_type in all_ref_ch_types + [all_ref_ch_types]:
        with pytest.warns(RuntimeWarning, match="already added.*untouch"):
            raw.set_eeg_reference(projection=True, ch_type=ch_type)
    data = raw.copy().apply_proj()[picks][0]
    assert_allclose(data, 0.0, atol=1e-12)  # zeroed out
    raw.del_proj()

    # len(all_ref_ch_types) separate projs, with data for each channel type
    # set to a different non-zero integer (EEG=1, SEEG=2, ...)
    for ci, ch_type in enumerate(all_ref_ch_types):
        raw._data[pick_types(raw.info, **{ch_type: True})] = ci + 1
    for ci, ch_type in enumerate(all_ref_ch_types):
        raw.set_eeg_reference(projection=True, ch_type=ch_type)
        assert len(raw.info["projs"]) == ci + 1
        if ci < len(all_ref_ch_types) - 1:
            assert not _has_eeg_average_ref_proj(raw.info)
            assert _needs_eeg_average_ref_proj(raw.info)
    assert len(raw.info["projs"]) == len(all_ref_ch_types)
    assert _has_eeg_average_ref_proj(raw.info)
    assert not _needs_eeg_average_ref_proj(raw.info)
    descs = [p["desc"].lower() for p in raw.info["projs"]]
    assert len(descs) == len(all_ref_ch_types)
    for desc, ch_type in zip(descs, all_ref_ch_types):
        assert ch_type in desc
    assert_allclose(raw[picks][0], raw[all_ref_ch_types][0], atol=1e-12)
    data = raw.copy().apply_proj()[picks][0]
    assert len(data) == len(picks)
    assert_allclose(data, 0.0, atol=1e-12)  # zeroed out
    # a single joint proj will *not* zero out given these integer 1/2/3...
    # values per channel type: with an even number of channel types, the joint
    # mean falls between the per-type values. If this changes we'll have to
    # make this check better
    assert len(all_ref_ch_types) % 2 == 0
    data_nz = (
        raw.del_proj()
        .set_eeg_reference(projection=True, ch_type=all_ref_ch_types, joint=True)
        .apply_proj()[picks][0]
    )
    assert not np.isclose(data_nz, 0.0).any()


def test_needs_eeg_average_ref_proj():
    """Test checking whether a recording needs an EEG average reference."""
    raw = read_raw_fif(raw_fname)
    assert _needs_eeg_average_ref_proj(raw.info)

    raw.set_eeg_reference(projection=True)
    assert not _needs_eeg_average_ref_proj(raw.info)

    # No EEG channels
    raw = read_raw_fif(raw_fname, preload=True)
    eeg = [raw.ch_names[c] for c in pick_types(raw.info, meg=False, eeg=True)]
    raw.drop_channels(eeg)
    assert not _needs_eeg_average_ref_proj(raw.info)

    # Custom ref flag set
    raw = read_raw_fif(raw_fname)
    with raw.info._unlock():
        raw.info["custom_ref_applied"] = True
    assert not _needs_eeg_average_ref_proj(raw.info)


def test_sss_proj():
    """Test `meg` proj option."""
    raw = read_raw_fif(raw_fname)
    raw.crop(0, 1.0).load_data().pick(picks="meg")
    raw.pick(raw.ch_names[:51]).del_proj()
    raw_sss = maxwell_filter(raw, int_order=5, ext_order=2)
    sss_rank = 21  # really low due to channel picking
    assert len(raw_sss.info["projs"]) == 0
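    # "separate" yields 3 grad + 3 mag projectors but leaves the SSS data rank
    # unchanged, whereas "combined" yields 3 joint MEG projectors that reduce
    # the rank by 3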
    for meg, n_proj, want_rank in (
        ("separate", 6, sss_rank),
        ("combined", 3, sss_rank - 3),
    ):
        proj = compute_proj_raw(raw_sss, n_grad=3, n_mag=3, meg=meg, verbose="error")
        this_raw = raw_sss.copy().add_proj(proj).apply_proj()
        assert len(this_raw.info["projs"]) == n_proj
        sss_proj_rank = _compute_rank_int(this_raw)
        cov = compute_raw_covariance(this_raw, verbose="error")
        W, ch_names, rank = compute_whitener(cov, this_raw.info, return_rank=True)
        assert ch_names == this_raw.ch_names
        assert want_rank == sss_proj_rank == rank  # proper reduction
        if meg == "combined":
            assert this_raw.info["projs"][0]["data"]["col_names"] == ch_names
        else:
            mag_names = ch_names[2::3]
            assert this_raw.info["projs"][3]["data"]["col_names"] == mag_names


def test_eq_ne():
    """Test == and != between projectors."""
    raw = read_raw_fif(raw_fname, preload=False)
    assert len(raw.info["projs"]) == 3
    raw.set_eeg_reference(projection=True)
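    # set_eeg_reference appends the average EEG reference projector at index 3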

    pca1 = cp.deepcopy(raw.info["projs"][0])
    pca2 = cp.deepcopy(raw.info["projs"][1])
    car = cp.deepcopy(raw.info["projs"][3])

    assert pca1 != pca2
    assert pca1 != car
    assert pca2 != car
    assert pca1 == raw.info["projs"][0]
    assert pca2 == raw.info["projs"][1]
    assert car == raw.info["projs"][3]


def test_setup_proj():
    """Test setup_proj."""
    raw = read_raw_fif(raw_fname)
    assert _needs_eeg_average_ref_proj(raw.info)
    raw.del_proj()
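    # setup_proj should add the missing average EEG reference projector by
    # default and activate the projectors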
    setup_proj(raw.info)


@testing.requires_testing_data
def test_compute_proj_explained_variance():
    """Test computation based on the explained variance."""
    raw = read_raw_fif(sample_path / "sample_audvis_trunc_raw.fif", preload=False)
    raw.crop(0, 5).load_data()
    raw.del_proj("all")
    with pytest.warns(RuntimeWarning, match="Too few samples"):
        projs = compute_proj_raw(raw, duration=None, n_grad=0.7, n_mag=0.9, n_eeg=0.8)
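    # float values for n_grad/n_mag/n_eeg request enough components to explain
    # at least that fraction of the variance, checked per channel type below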
    for type_, min_explained_ in (("planar", 0.7), ("axial", 0.9), ("eeg", 0.8)):
        explained_var = [
            proj["explained_var"]
            for proj in projs
            if proj["desc"].split("-")[0] == type_
        ]
        assert min_explained_ <= sum(explained_var)
