"""
Test the fastica algorithm.
"""
import itertools
import warnings

import numpy as np
from scipy import stats

from sklearn.utils.testing import assert_almost_equal
from sklearn.utils.testing import assert_array_almost_equal
from sklearn.utils.testing import assert_less
from sklearn.utils.testing import assert_equal
from sklearn.utils.testing import assert_warns
from sklearn.utils.testing import assert_raises
from sklearn.utils.testing import assert_raises_regex

from sklearn.decomposition import FastICA, fastica, PCA
from sklearn.decomposition.fastica_ import _gs_decorrelation
from sklearn.externals.six import moves
from sklearn.exceptions import ConvergenceWarning


def center_and_norm(x, axis=-1):
    """ Centers and norms x **in place**

        Parameters
        -----------
        x: ndarray
            Array with an axis of observations (statistical units) measured on
            random variables.
        axis: int, optional
            Axis along which the mean and variance are calculated.
    """
    x = np.rollaxis(x, axis)
    x -= x.mean(axis=0)
    x /= x.std(axis=0)


def test_gs():
    # Test gram schmidt orthonormalization
    # generate a random orthogonal  matrix
    rng = np.random.RandomState(0)
    W, _, _ = np.linalg.svd(rng.randn(10, 10))
    w = rng.randn(10)
    _gs_decorrelation(w, W, 10)
    assert_less((w ** 2).sum(), 1.e-10)
    w = rng.randn(10)
    u = _gs_decorrelation(w, W, 5)
    tmp = np.dot(u, W.T)
    assert_less((tmp[:5] ** 2).sum(), 1.e-10)


def test_fastica_simple(add_noise=False):
    # Test the FastICA algorithm on very simple data.
    rng = np.random.RandomState(0)
    # scipy.stats uses the global RNG:
    np.random.seed(0)
    n_samples = 1000
    # Generate two sources:
    s1 = (2 * np.sin(np.linspace(0, 100, n_samples)) > 0) - 1
    s2 = stats.t.rvs(1, size=n_samples)
    s = np.c_[s1, s2].T
    center_and_norm(s)
    s1, s2 = s

    # Mixing angle
    phi = 0.6
    mixing = np.array([[np.cos(phi), np.sin(phi)],
                       [np.sin(phi), -np.cos(phi)]])
    m = np.dot(mixing, s)

    if add_noise:
        m += 0.1 * rng.randn(2, 1000)

    center_and_norm(m)

    # function as fun arg
    def g_test(x):
        return x ** 3, (3 * x ** 2).mean(axis=-1)

    algos = ['parallel', 'deflation']
    nls = ['logcosh', 'exp', 'cube', g_test]
    whitening = [True, False]
    for algo, nl, whiten in itertools.product(algos, nls, whitening):
        if whiten:
            k_, mixing_, s_ = fastica(m.T, fun=nl, algorithm=algo)
            assert_raises(ValueError, fastica, m.T, fun=np.tanh,
                          algorithm=algo)
        else:
            X = PCA(n_components=2, whiten=True).fit_transform(m.T)
            k_, mixing_, s_ = fastica(X, fun=nl, algorithm=algo, whiten=False)
            assert_raises(ValueError, fastica, X, fun=np.tanh,
                          algorithm=algo)
        s_ = s_.T
        # Check that the mixing model described in the docstring holds:
        if whiten:
            assert_almost_equal(s_, np.dot(np.dot(mixing_, k_), m))

        center_and_norm(s_)
        s1_, s2_ = s_
        # Check to see if the sources have been estimated
        # in the wrong order
        if abs(np.dot(s1_, s2)) > abs(np.dot(s1_, s1)):
            s2_, s1_ = s_
        s1_ *= np.sign(np.dot(s1_, s1))
        s2_ *= np.sign(np.dot(s2_, s2))

        # Check that we have estimated the original sources
        if not add_noise:
            assert_almost_equal(np.dot(s1_, s1) / n_samples, 1, decimal=2)
            assert_almost_equal(np.dot(s2_, s2) / n_samples, 1, decimal=2)
        else:
            assert_almost_equal(np.dot(s1_, s1) / n_samples, 1, decimal=1)
            assert_almost_equal(np.dot(s2_, s2) / n_samples, 1, decimal=1)

    # Test FastICA class
    _, _, sources_fun = fastica(m.T, fun=nl, algorithm=algo, random_state=0)
    ica = FastICA(fun=nl, algorithm=algo, random_state=0)
    sources = ica.fit_transform(m.T)
    assert_equal(ica.components_.shape, (2, 2))
    assert_equal(sources.shape, (1000, 2))

    assert_array_almost_equal(sources_fun, sources)
    assert_array_almost_equal(sources, ica.transform(m.T))

    assert_equal(ica.mixing_.shape, (2, 2))

    for fn in [np.tanh, "exp(-.5(x^2))"]:
        ica = FastICA(fun=fn, algorithm=algo, random_state=0)
        assert_raises(ValueError, ica.fit, m.T)

    assert_raises(TypeError, FastICA(fun=moves.xrange(10)).fit, m.T)


def test_fastica_nowhiten():
    m = [[0, 1], [1, 0]]

    # test for issue #697
    ica = FastICA(n_components=1, whiten=False, random_state=0)
    assert_warns(UserWarning, ica.fit, m)
    assert hasattr(ica, 'mixing_')


def test_fastica_convergence_fail():
    # Test the FastICA algorithm on very simple data
    # (see test_non_square_fastica).
    # Ensure a ConvergenceWarning raised if the tolerance is sufficiently low.
    rng = np.random.RandomState(0)

    n_samples = 1000
    # Generate two sources:
    t = np.linspace(0, 100, n_samples)
    s1 = np.sin(t)
    s2 = np.ceil(np.sin(np.pi * t))
    s = np.c_[s1, s2].T
    center_and_norm(s)
    s1, s2 = s

    # Mixing matrix
    mixing = rng.randn(6, 2)
    m = np.dot(mixing, s)

    # Do fastICA with tolerance 0. to ensure failing convergence
    ica = FastICA(algorithm="parallel", n_components=2, random_state=rng,
                  max_iter=2, tol=0.)
    assert_warns(ConvergenceWarning, ica.fit, m.T)


def test_non_square_fastica(add_noise=False):
    # Test the FastICA algorithm on very simple data.
    rng = np.random.RandomState(0)

    n_samples = 1000
    # Generate two sources:
    t = np.linspace(0, 100, n_samples)
    s1 = np.sin(t)
    s2 = np.ceil(np.sin(np.pi * t))
    s = np.c_[s1, s2].T
    center_and_norm(s)
    s1, s2 = s

    # Mixing matrix
    mixing = rng.randn(6, 2)
    m = np.dot(mixing, s)

    if add_noise:
        m += 0.1 * rng.randn(6, n_samples)

    center_and_norm(m)

    k_, mixing_, s_ = fastica(m.T, n_components=2, random_state=rng)
    s_ = s_.T

    # Check that the mixing model described in the docstring holds:
    assert_almost_equal(s_, np.dot(np.dot(mixing_, k_), m))

    center_and_norm(s_)
    s1_, s2_ = s_
    # Check to see if the sources have been estimated
    # in the wrong order
    if abs(np.dot(s1_, s2)) > abs(np.dot(s1_, s1)):
        s2_, s1_ = s_
    s1_ *= np.sign(np.dot(s1_, s1))
    s2_ *= np.sign(np.dot(s2_, s2))

    # Check that we have estimated the original sources
    if not add_noise:
        assert_almost_equal(np.dot(s1_, s1) / n_samples, 1, decimal=3)
        assert_almost_equal(np.dot(s2_, s2) / n_samples, 1, decimal=3)


def test_fit_transform():
    # Test FastICA.fit_transform
    rng = np.random.RandomState(0)
    X = rng.random_sample((100, 10))
    for whiten, n_components in [[True, 5], [False, None]]:
        n_components_ = (n_components if n_components is not None else
                         X.shape[1])

        ica = FastICA(n_components=n_components, whiten=whiten, random_state=0)
        Xt = ica.fit_transform(X)
        assert_equal(ica.components_.shape, (n_components_, 10))
        assert_equal(Xt.shape, (100, n_components_))

        ica = FastICA(n_components=n_components, whiten=whiten, random_state=0)
        ica.fit(X)
        assert_equal(ica.components_.shape, (n_components_, 10))
        Xt2 = ica.transform(X)

        assert_array_almost_equal(Xt, Xt2)


def test_inverse_transform():
    # Test FastICA.inverse_transform
    n_features = 10
    n_samples = 100
    n1, n2 = 5, 10
    rng = np.random.RandomState(0)
    X = rng.random_sample((n_samples, n_features))
    expected = {(True, n1): (n_features, n1),
                (True, n2): (n_features, n2),
                (False, n1): (n_features, n2),
                (False, n2): (n_features, n2)}
    for whiten in [True, False]:
        for n_components in [n1, n2]:
            n_components_ = (n_components if n_components is not None else
                             X.shape[1])
            ica = FastICA(n_components=n_components, random_state=rng,
                          whiten=whiten)
            with warnings.catch_warnings(record=True):
                # catch "n_components ignored" warning
                Xt = ica.fit_transform(X)
            expected_shape = expected[(whiten, n_components_)]
            assert_equal(ica.mixing_.shape, expected_shape)
            X2 = ica.inverse_transform(Xt)
            assert_equal(X.shape, X2.shape)

            # reversibility test in non-reduction case
            if n_components == X.shape[1]:
                assert_array_almost_equal(X, X2)


def test_fastica_errors():
    n_features = 3
    n_samples = 10
    rng = np.random.RandomState(0)
    X = rng.random_sample((n_samples, n_features))
    w_init = rng.randn(n_features + 1, n_features + 1)
    assert_raises_regex(ValueError, 'max_iter should be greater than 1',
                        FastICA, max_iter=0)
    assert_raises_regex(ValueError, r'alpha must be in \[1,2\]',
                        fastica, X, fun_args={'alpha': 0})
    assert_raises_regex(ValueError, 'w_init has invalid shape.+'
                        r'should be \(3L?, 3L?\)',
                        fastica, X, w_init=w_init)
    assert_raises_regex(ValueError,
                        'Invalid algorithm.+must be.+parallel.+or.+deflation',
                        fastica, X, algorithm='pizza')
