# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from typing import Any, Callable, Coroutine, cast

import pytest

from playwright.async_api import Error, Page, Request, Route
from tests.server import Server


async def test_should_work(page: Page, server: Server) -> None:
    await page.route("**/*", lambda route: asyncio.create_task(route.fallback()))
    await page.goto(server.EMPTY_PAGE)


async def test_should_fall_back(page: Page, server: Server) -> None:
    intercepted = []

    def _handler1(route: Route) -> None:
        intercepted.append(1)
        asyncio.create_task(route.fallback())

    await page.route("**/empty.html", _handler1)

    def _handler2(route: Route) -> None:
        intercepted.append(2)
        asyncio.create_task(route.fallback())

    await page.route("**/empty.html", _handler2)

    def _handler3(route: Route) -> None:
        intercepted.append(3)
        asyncio.create_task(route.fallback())

    await page.route("**/empty.html", _handler3)

    await page.goto(server.EMPTY_PAGE)
    assert intercepted == [3, 2, 1]


async def test_should_fall_back_async_delayed(page: Page, server: Server) -> None:
    intercepted = []

    def create_handler(i: int) -> Callable[[Route], Coroutine]:
        async def handler(route: Route) -> None:
            intercepted.append(i)
            await asyncio.sleep(0.1)
            await route.fallback()

        return handler

    await page.route("**/empty.html", create_handler(1))
    await page.route("**/empty.html", create_handler(2))
    await page.route("**/empty.html", create_handler(3))
    await page.goto(server.EMPTY_PAGE)
    assert intercepted == [3, 2, 1]


async def test_should_chain_once(page: Page, server: Server) -> None:
    await page.route(
        "**/madeup.txt",
        lambda route: asyncio.create_task(
            route.fulfill(status=200, body="fulfilled one")
        ),
        times=1,
    )
    await page.route(
        "**/madeup.txt", lambda route: asyncio.create_task(route.fallback()), times=1
    )

    resp = await page.goto(server.PREFIX + "/madeup.txt")
    assert resp
    body = await resp.body()
    assert body == b"fulfilled one"


async def test_should_not_chain_fulfill(page: Page, server: Server) -> None:
    failed = [False]

    def handler(route: Route) -> None:
        failed[0] = True

    await page.route("**/empty.html", handler)
    await page.route(
        "**/empty.html",
        lambda route: asyncio.create_task(route.fulfill(status=200, body="fulfilled")),
    )
    await page.route(
        "**/empty.html", lambda route: asyncio.create_task(route.fallback())
    )

    response = await page.goto(server.EMPTY_PAGE)
    assert response
    body = await response.body()
    assert body == b"fulfilled"
    assert not failed[0]


async def test_should_not_chain_abort(
    page: Page, server: Server, is_webkit: bool, is_firefox: bool
) -> None:
    failed = [False]

    def handler(route: Route) -> None:
        failed[0] = True

    await page.route("**/empty.html", handler)
    await page.route("**/empty.html", lambda route: asyncio.create_task(route.abort()))
    await page.route(
        "**/empty.html", lambda route: asyncio.create_task(route.fallback())
    )

    with pytest.raises(Error) as excinfo:
        await page.goto(server.EMPTY_PAGE)
    if is_webkit:
        assert "Blocked by Web Inspector" in excinfo.value.message
    elif is_firefox:
        assert "NS_ERROR_FAILURE" in excinfo.value.message
    else:
        assert "net::ERR_FAILED" in excinfo.value.message
    assert not failed[0]


async def test_should_fall_back_after_exception(page: Page, server: Server) -> None:
    await page.route("**/empty.html", lambda route: route.continue_())

    async def handler(route: Route) -> None:
        try:
            await route.fulfill(response=cast(Any, {}))
        except Exception:
            await route.fallback()

    await page.route("**/empty.html", handler)

    await page.goto(server.EMPTY_PAGE)


async def test_should_amend_http_headers(page: Page, server: Server) -> None:
    values = []

    async def handler(route: Route) -> None:
        values.append(route.request.headers.get("foo"))
        values.append(await route.request.header_value("FOO"))
        await route.continue_()

    await page.route("**/sleep.zzz", handler)

    async def handler_with_header_mods(route: Route) -> None:
        await route.fallback(headers={**route.request.headers, "FOO": "bar"})

    await page.route("**/*", handler_with_header_mods)

    await page.goto(server.EMPTY_PAGE)
    with server.expect_request("/sleep.zzz") as server_request_info:
        await page.evaluate("() => fetch('/sleep.zzz')")
    values.append(server_request_info.value.getHeader("foo"))
    assert values == ["bar", "bar", "bar"]


async def test_should_delete_header_with_undefined_value(
    page: Page, server: Server
) -> None:
    await page.goto(server.EMPTY_PAGE)
    server.set_route(
        "/something",
        lambda r: (
            r.setHeader("Acces-Control-Allow-Origin", "*"),
            r.write(b"done"),
            r.finish(),
        ),
    )

    intercepted_request = []

    async def capture_and_continue(route: Route, request: Request) -> None:
        intercepted_request.append(request)
        await route.continue_()

    await page.route("**/*", capture_and_continue)

    async def delete_foo_header(route: Route, request: Request) -> None:
        headers = await request.all_headers()
        del headers["foo"]
        await route.fallback(headers=headers)

    await page.route(server.PREFIX + "/something", delete_foo_header)

    [server_req, text] = await asyncio.gather(
        server.wait_for_request("/something"),
        page.evaluate(
            """
            async url => {
                const data = await fetch(url, {
                    headers: {
                    foo: 'a',
                    bar: 'b',
                    }
                });
                return data.text();
                }
            """,
            server.PREFIX + "/something",
        ),
    )

    assert text == "done"
    assert not intercepted_request[0].headers.get("foo")
    assert intercepted_request[0].headers.get("bar") == "b"
    assert not server_req.getHeader("foo")
    assert server_req.getHeader("bar") == "b"


async def test_should_amend_method(page: Page, server: Server) -> None:
    await page.goto(server.EMPTY_PAGE)

    method = []

    def _handler(route: Route) -> None:
        method.append(route.request.method)
        asyncio.create_task(route.continue_())

    await page.route("**/*", _handler)
    await page.route(
        "**/*", lambda route: asyncio.create_task(route.fallback(method="POST"))
    )

    [request, _] = await asyncio.gather(
        server.wait_for_request("/sleep.zzz"),
        page.evaluate("() => fetch('/sleep.zzz')"),
    )

    assert method == ["POST"]
    assert request.method == b"POST"


async def test_should_override_request_url(page: Page, server: Server) -> None:
    url = []

    def _handler1(route: Route) -> None:
        url.append(route.request.url)
        asyncio.create_task(route.continue_())

    await page.route("**/global-var.html", _handler1)

    def _handler2(route: Route) -> None:
        asyncio.create_task(route.fallback(url=server.PREFIX + "/global-var.html"))

    await page.route("**/foo", _handler2)

    [server_request, response, _] = await asyncio.gather(
        server.wait_for_request("/global-var.html"),
        page.wait_for_event("response"),
        page.goto(server.PREFIX + "/foo"),
    )

    assert url == [server.PREFIX + "/global-var.html"]
    assert response.url == server.PREFIX + "/global-var.html"
    assert response.request.url == server.PREFIX + "/global-var.html"
    assert await page.evaluate("() => window['globalVar']") == 123
    assert server_request.uri == b"/global-var.html"
    assert server_request.method == b"GET"


async def test_should_amend_post_data(page: Page, server: Server) -> None:
    await page.goto(server.EMPTY_PAGE)
    post_data = []

    def _handler(route: Route) -> None:
        post_data.append(route.request.post_data)
        asyncio.create_task(route.continue_())

    await page.route("**/*", _handler)
    await page.route(
        "**/*", lambda route: asyncio.create_task(route.fallback(post_data="doggo"))
    )
    [server_request, _] = await asyncio.gather(
        server.wait_for_request("/sleep.zzz"),
        page.evaluate("() => fetch('/sleep.zzz', { method: 'POST', body: 'birdy' })"),
    )
    assert post_data == ["doggo"]
    assert server_request.post_body == b"doggo"


async def test_should_amend_binary_post_data(page: Page, server: Server) -> None:
    await page.goto(server.EMPTY_PAGE)
    post_data_buffer = []

    def _handler1(route: Route) -> None:
        post_data_buffer.append(route.request.post_data)
        asyncio.create_task(route.continue_())

    await page.route("**/*", _handler1)

    async def _handler2(route: Route) -> None:
        await route.fallback(post_data=b"\x00\x01\x02\x03\x04")

    await page.route("**/*", _handler2)

    [server_request, result] = await asyncio.gather(
        server.wait_for_request("/sleep.zzz"),
        page.evaluate("fetch('/sleep.zzz', { method: 'POST', body: 'birdy' })"),
    )
    # FIXME: should this be bytes?
    assert post_data_buffer == ["\x00\x01\x02\x03\x04"]
    assert server_request.method == b"POST"
    assert server_request.post_body == b"\x00\x01\x02\x03\x04"


async def test_should_chain_fallback_with_dynamic_url(
    server: Server, page: Page
) -> None:
    intercepted = []

    def _handler1(route: Route) -> None:
        intercepted.append(1)
        asyncio.create_task(route.fallback(url=server.EMPTY_PAGE))

    await page.route("**/bar", _handler1)

    def _handler2(route: Route, request: Request) -> None:
        intercepted.append(2)
        asyncio.create_task(route.fallback(url="http://localhost/bar"))

    await page.route("**/foo", _handler2)

    def _handler3(route: Route, request: Request) -> None:
        intercepted.append(3)
        asyncio.create_task(route.fallback(url="http://localhost/foo"))

    await page.route("**/empty.html", _handler3)

    await page.goto(server.EMPTY_PAGE)
    assert intercepted == [3, 2, 1]


async def test_should_amend_json_post_data(server: Server, page: Page) -> None:
    await page.goto(server.EMPTY_PAGE)
    post_data = []

    def _handle1(route: Route, request: Request) -> None:
        post_data.append(route.request.post_data)
        asyncio.create_task(route.continue_())

    await page.route("**/*", _handle1)
    await page.route(
        "**/*",
        lambda route: asyncio.create_task(route.fallback(post_data={"foo": "bar"})),
    )

    [server_request, _] = await asyncio.gather(
        server.wait_for_request("/sleep.zzz"),
        page.evaluate("() => fetch('/sleep.zzz', { method: 'POST', body: 'birdy' })"),
    )
    assert post_data == ['{"foo": "bar"}']
    assert server_request.post_body == b'{"foo": "bar"}'
