# Owner(s): ["oncall: distributed"]

# Copyright 2019 Kakao Brain
#
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
import pytest
import torch

from torch.distributed.pipeline.sync.stream import (
    CPUStream,
    current_stream,
    default_stream,
    get_device,
    is_cuda,
    new_stream,
    record_stream,
    use_device,
    use_stream,
    wait_stream,
)

# Marker applied to tests that need a GPU: pytest skips them (with the reason
# "cuda required") when ``torch.cuda.is_available()`` is false at import time.
skip_if_no_cuda = pytest.mark.skipif(
    not torch.cuda.is_available(),
    reason="cuda required",
)


class TestNewStream:
    """Tests for ``new_stream`` on CPU and CUDA devices."""

    def test_new_stream_cpu(self):
        """``new_stream`` on a CPU device returns the ``CPUStream`` object."""
        cpu = torch.device("cpu")
        assert new_stream(cpu) is CPUStream

    @skip_if_no_cuda
    def test_new_stream_cuda(self):
        """``new_stream`` on CUDA returns a stream other than the default one."""
        cuda = torch.device("cuda")
        created = new_stream(cuda)
        assert isinstance(created, torch.cuda.Stream)
        assert created != torch.cuda.default_stream()


class TestCurrentStream:
    """Tests for ``current_stream`` on CPU and CUDA devices."""

    def test_current_stream_cpu(self):
        """``current_stream`` on a CPU device returns the ``CPUStream`` object."""
        cpu = torch.device("cpu")
        assert current_stream(cpu) is CPUStream

    @skip_if_no_cuda
    def test_current_stream_cuda(self):
        """``current_stream`` on CUDA agrees with ``torch.cuda.current_stream()``."""
        cuda = torch.device("cuda")
        active = current_stream(cuda)
        assert isinstance(active, torch.cuda.Stream)
        assert active == torch.cuda.current_stream()


class TestDefaultStream:
    """Tests for ``default_stream`` on CPU and CUDA devices."""

    def test_default_stream_cpu(self):
        """``default_stream`` on a CPU device returns the ``CPUStream`` object."""
        cpu = torch.device("cpu")
        assert default_stream(cpu) is CPUStream

    @skip_if_no_cuda
    def test_default_stream_cuda(self):
        """``default_stream`` on CUDA agrees with ``torch.cuda.default_stream()``."""
        cuda = torch.device("cuda")
        fallback = default_stream(cuda)
        assert isinstance(fallback, torch.cuda.Stream)
        assert fallback == torch.cuda.default_stream()


class TestUseDevice:
    """Smoke tests: entering and leaving ``use_device`` must not raise."""

    def test_use_device_cpu(self):
        """``use_device`` works as a context manager for a CPU device."""
        cpu = torch.device("cpu")
        with use_device(cpu):
            pass

    @skip_if_no_cuda
    def test_use_device_cuda(self):
        """``use_device`` works as a context manager for a CUDA device."""
        cuda = torch.device("cuda")
        with use_device(cuda):
            pass


class TestUseStream:
    """Tests for the ``use_stream`` context manager."""

    def test_use_stream_cpu(self):
        """Entering ``use_stream`` with ``CPUStream`` must not raise."""
        with use_stream(CPUStream):
            pass

    @skip_if_no_cuda
    def test_use_stream_cuda(self):
        """Inside ``use_stream``, the given CUDA stream is the current stream."""
        fresh = new_stream(torch.device("cuda"))
        with use_stream(fresh):
            active = current_stream(torch.device("cuda"))
            assert active == fresh


class TestGetDevice:
    """Tests for ``get_device``: it reports the device type of a stream."""

    def test_get_device_cpu(self):
        """``CPUStream`` maps to a device whose type is ``"cpu"``."""
        device = get_device(CPUStream)
        assert device.type == "cpu"

    @skip_if_no_cuda
    def test_get_device_cuda(self):
        """The current CUDA stream maps to a device whose type is ``"cuda"``."""
        active = current_stream(torch.device("cuda"))
        device = get_device(active)
        assert device.type == "cuda"


class TestWaitStream:
    """Tests for ``wait_stream`` across every CPU/CUDA source/target pairing."""

    def _test_wait_stream(self, source, target, cuda_sleep=None):
        """Shared scenario: produce data on ``target``, then consume it on ``source``.

        Args:
            source: stream that waits and afterwards reads the tensor.
            target: stream on which the tensor is created.
            cuda_sleep: callable invoked as ``cuda_sleep(0.5)`` before the
                tensor is created; only called when ``target`` is a CUDA
                stream, so it may stay ``None`` for a CPU target.
        """
        with use_stream(target):
            if is_cuda(target):
                cuda_sleep(0.5)
            ones = torch.ones(100, 100, device=get_device(target))

        wait_stream(source, target)

        # 100 * 100 ones must sum to exactly 10000 once the wait took effect.
        with use_stream(source):
            total = ones.sum().item()
            assert total == 10000

    def test_wait_stream_cpu_cpu(self):
        """CPU source waiting on a CPU target."""
        self._test_wait_stream(CPUStream, CPUStream)

    @skip_if_no_cuda
    def test_wait_stream_cpu_cuda(self, cuda_sleep):
        """CPU source waiting on a freshly created CUDA target stream."""
        self._test_wait_stream(
            CPUStream,
            new_stream(torch.device("cuda")),
            cuda_sleep,
        )

    @skip_if_no_cuda
    def test_wait_stream_cuda_cpu(self, cuda_sleep):
        """Freshly created CUDA source stream waiting on a CPU target."""
        self._test_wait_stream(
            new_stream(torch.device("cuda")),
            CPUStream,
            cuda_sleep,
        )

    @skip_if_no_cuda
    def test_wait_stream_cuda_cuda(self, cuda_sleep):
        """Current CUDA stream waiting on a freshly created CUDA target stream."""
        self._test_wait_stream(
            current_stream(torch.device("cuda")),
            new_stream(torch.device("cuda")),
            cuda_sleep,
        )


class TestRecordStream:
    """Tests for ``record_stream``, observed through reuse of memory blocks."""

    def test_record_stream_cpu(self):
        """Recording a CPU tensor on ``CPUStream`` must not raise."""
        # CPU tensors are expected to be silently ignored.
        cpu_tensor = torch.rand(1, device=torch.device("cpu"))
        record_stream(cpu_tensor, CPUStream)

    @skip_if_no_cuda
    def test_record_stream_cuda(self, cuda_sleep):
        """A recorded block is not reused until the recording stream is done."""
        cuda = torch.device("cuda")

        # To detect unexpected block reallocation reliably, every tensor is
        # allocated on a dedicated stream: the allocator will not reuse free
        # blocks that were allocated from another stream.
        alloc_stream = new_stream(cuda)
        with torch.cuda.stream(alloc_stream):
            first = torch.rand(1, device=cuda)

        busy_stream = new_stream(cuda)
        record_stream(first, busy_stream)
        with use_stream(busy_stream):
            cuda_sleep(0.5)

        # From Python's point of view 'first' is gone after the 'del', but its
        # block is still needed by 'busy_stream', so 'second' must be placed
        # in a different block.
        recorded_ptr = first.data_ptr()
        del first
        alloc_stream.synchronize()
        with torch.cuda.stream(alloc_stream):
            second = torch.rand(1, device=cuda)
        assert second.data_ptr() != recorded_ptr

        # Block Python until 'busy_stream' has drained its queue. After that
        # the block of 'first' may be handed out again.
        wait_stream(CPUStream, busy_stream)
        with torch.cuda.stream(alloc_stream):
            third = torch.rand(1, device=cuda)
        assert third.data_ptr() == recorded_ptr

    @skip_if_no_cuda
    def test_record_stream_shifted_view(self, cuda_sleep):
        """Recording an offset view still protects the underlying block."""
        # Issue: https://github.com/pytorch/pytorch/issues/27366
        cuda = torch.device("cuda")

        alloc_stream = new_stream(cuda)
        with torch.cuda.stream(alloc_stream):
            base = torch.rand(2, device=cuda)

        # The view starts one element in, so its pointer lies past the base's.
        shifted = base[1:]
        assert shifted.data_ptr() > base.data_ptr()

        busy_stream = new_stream(cuda)
        with use_stream(busy_stream):
            cuda_sleep(0.5)
        record_stream(shifted, busy_stream)

        base_ptr = base.data_ptr()
        del base, shifted

        # While 'busy_stream' is still occupied, a new allocation of the same
        # size must not land on the block that backed 'base'.
        alloc_stream.synchronize()
        with torch.cuda.stream(alloc_stream):
            replacement = torch.rand(2, device=cuda)
        assert replacement.data_ptr() != base_ptr
