import unittest

from reactivex import Observable, operators
from reactivex.testing import ReactiveTest
from reactivex.testing.subscription import Subscription
from reactivex.testing.testscheduler import TestScheduler

on_next = ReactiveTest.on_next
on_completed = ReactiveTest.on_completed
on_error = ReactiveTest.on_error
subscribe = ReactiveTest.subscribe
subscribed = ReactiveTest.subscribed
disposed = ReactiveTest.disposed
created = ReactiveTest.created


class RxException(Exception):
    pass


class TestConcatMap(unittest.TestCase):
    def test_concat_map_and_flatten_each_item(self):
        scheduler = TestScheduler()
        e1 = scheduler.create_hot_observable(
            on_next(220, 1),
            on_next(300, 3),
            on_next(330, 5),
            on_completed(500),
        )
        e2 = scheduler.create_cold_observable(
            on_next(0, 10), on_next(10, 10), on_next(20, 10), on_completed(30)
        )

        def create_inner(x: int) -> Observable[int]:
            return e2.pipe(operators.map(lambda i: i * x))

        def test_create():
            return e1.pipe(operators.concat_map(create_inner))

        results = scheduler.start(test_create)
        assert results.messages == [
            on_next(220, 10),
            on_next(230, 10),
            on_next(240, 10),
            on_next(300, 30),
            on_next(310, 30),
            on_next(320, 30),
            on_next(330, 50),
            on_next(340, 50),
            on_next(350, 50),
            on_completed(500),
        ]
        assert e1.subscriptions == [Subscription(200, 500)]
        assert e2.subscriptions == [
            Subscription(220, 250),
            Subscription(300, 330),
            Subscription(330, 360),
        ]

    def test_concat_map_many_inner_inner_never_completes(self):
        """should concat_ap many outer to many inner, inner never completes"""
        scheduler = TestScheduler()
        e1 = scheduler.create_hot_observable(
            on_next(210, 1),
            on_next(300, 4),  # take 4 will never complete
            on_next(400, 5),
        )
        e2 = scheduler.create_cold_observable(
            on_next(0, 5), on_next(10, 55), on_next(20, 555)
        )

        def create_inner(x: int) -> Observable[int]:
            return e2.pipe(operators.take(x))

        def test_create():
            return e1.pipe(operators.concat_map(create_inner))

        results = scheduler.start(test_create)
        assert results.messages == [
            on_next(210, 5),
            on_next(300, 5),
            on_next(310, 55),
            on_next(320, 555),
        ]
        assert e1.subscriptions == [Subscription(200, 1000)]
        assert e2.subscriptions == [
            Subscription(210, 210),
            Subscription(300, 1000),  # unsubs when we dispose of e1 at end of test
        ]

    def test_concat_map_finalize_before_next(self):
        """should finalize before moving to the next observable"""
        scheduler = TestScheduler()
        e1 = scheduler.create_hot_observable(
            on_next(210, 2),
            on_next(220, 4),
            on_next(600, 6),
        )
        e2 = scheduler.create_cold_observable(
            on_next(50, 5), on_next(100, 55), on_completed(100)
        )

        def create_inner(x: int) -> Observable[int]:
            return e2.pipe(operators.map(lambda i: i * x))

        def test_create():
            return e1.pipe(operators.concat_map(create_inner))

        results = scheduler.start(test_create)
        assert results.messages == [
            on_next(260, 10),
            on_next(310, 110),
            on_next(310 + 50, 20),
            on_next(410, 220),
            on_next(650, 30),
            on_next(700, 330),
        ]
        assert e1.subscriptions == [Subscription(200, 1000)]
        assert e2.subscriptions == [
            Subscription(210, 310),
            Subscription(310, 410),
            Subscription(600, 700),
        ]

    def test_concat_map_inner_errors(self):
        """should propagate errors if the mapped inner throws"""
        scheduler = TestScheduler()
        e1 = scheduler.create_cold_observable(
            on_next(0, 0),
            on_next(50, 1),
            on_next(100, 2),
        )

        inners = [
            scheduler.create_cold_observable(
                on_next(10, 1), on_next(100, 2), on_completed(100)
            ),
            scheduler.create_cold_observable(
                on_next(10, 50), on_error(80, Exception("no"))
            ),
            scheduler.create_cold_observable(
                on_next(10, 1), on_next(100, 2), on_completed(100)
            ),
        ]

        def create_inner(x: int) -> Observable[int]:
            return inners[x]

        def test_create():
            return e1.pipe(operators.concat_map(create_inner))

        results = scheduler.start(test_create)
        assert results.messages == [
            on_next(210, 1),
            on_next(300, 2),
            on_next(310, 50),
            on_error(380, Exception("no")),
        ]
        assert e1.subscriptions == [Subscription(200, 380)]
        e2, e3, e4 = inners
        assert e2.subscriptions == [
            Subscription(200, 300),
        ]
        assert e3.subscriptions == [
            Subscription(300, 380),
        ]
        assert e4.subscriptions == []

    def test_concat_map_outer_errors(self):
        scheduler = TestScheduler()
        e1 = scheduler.create_hot_observable(
            on_next(210, 2),
            on_next(220, 4),
            on_error(230, Exception("a")),
        )
        e2 = scheduler.create_cold_observable(on_next(0, 5), on_completed(100))

        def create_inner(x: int) -> Observable[int]:
            return e2.pipe(operators.map(lambda i: i * x))

        def test_create():
            return e1.pipe(operators.concat_map(create_inner))

        results = scheduler.start(test_create)

        assert results.messages == [on_next(210, 10), on_error(230, Exception("a"))]
        assert e1.subscriptions == [Subscription(200, 230)]
        assert e2.subscriptions == [
            Subscription(210, 230)
        ]  # should not be any further sub and should unsub from e2 on outer error
