from __future__ import annotations

import threading
from typing import TYPE_CHECKING, Literal

import pytest

pytest.importorskip("sqlite3")

import sqlite3

from filelock import Timeout
from filelock._read_write import (
    _MAX_SQLITE_TIMEOUT_MS,
    ReadWriteLock,
    _all_connections,
    _cleanup_connections,
    timeout_for_sqlite,
)

if TYPE_CHECKING:
    from collections.abc import Generator
    from pathlib import Path

    from pytest_mock import MockerFixture


@pytest.fixture(autouse=True)
def _clear_singleton_cache() -> Generator[None]:
    ReadWriteLock._instances.clear()
    yield
    for instance in list(ReadWriteLock._instances.valuerefs()):
        if (lock := instance()) is not None:
            lock.close()
    ReadWriteLock._instances.clear()


@pytest.fixture
def lock_file(tmp_path: Path) -> str:
    return str(tmp_path / "test_lock.db")


def test_timeout_for_sqlite_non_blocking() -> None:
    assert timeout_for_sqlite(10.0, blocking=False, already_waited=0.0) == 0


def test_timeout_for_sqlite_infinite_timeout() -> None:
    assert timeout_for_sqlite(-1, blocking=True, already_waited=0.0) == _MAX_SQLITE_TIMEOUT_MS


def test_timeout_for_sqlite_negative_timeout_raises() -> None:
    with pytest.raises(ValueError, match="timeout must be a non-negative number or -1"):
        timeout_for_sqlite(-2, blocking=True, already_waited=0.0)


def test_timeout_for_sqlite_positive_timeout_subtracts_waited() -> None:
    assert timeout_for_sqlite(5.0, blocking=True, already_waited=2.0) == 3000


def test_timeout_for_sqlite_waited_exceeds_timeout() -> None:
    assert timeout_for_sqlite(1.0, blocking=True, already_waited=2.0) == 0


def test_timeout_for_sqlite_zero_timeout() -> None:
    assert timeout_for_sqlite(0.0, blocking=True, already_waited=0.0) == 0


def test_timeout_for_sqlite_huge_timeout_clamped() -> None:
    assert timeout_for_sqlite(3_000_000.0, blocking=True, already_waited=0.0) == _MAX_SQLITE_TIMEOUT_MS


def test_singleton_returns_same_instance(lock_file: str) -> None:
    lock1 = ReadWriteLock(lock_file)
    lock2 = ReadWriteLock(lock_file)
    assert lock1 is lock2


def test_singleton_different_files(tmp_path: Path) -> None:
    lock1 = ReadWriteLock(str(tmp_path / "a.db"))
    lock2 = ReadWriteLock(str(tmp_path / "b.db"))
    assert lock1 is not lock2


def test_singleton_rejects_different_timeout(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, timeout=1.0)
    with pytest.raises(ValueError, match="Singleton lock created with"):
        ReadWriteLock(lock_file, timeout=2.0)
    del lock


def test_singleton_rejects_different_blocking(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, blocking=True)
    with pytest.raises(ValueError, match="Singleton lock created with"):
        ReadWriteLock(lock_file, blocking=False)
    del lock


def test_non_singleton_creates_new_instance(lock_file: str) -> None:
    lock1 = ReadWriteLock(lock_file, is_singleton=False)
    lock2 = ReadWriteLock(lock_file, is_singleton=False)
    assert lock1 is not lock2


def test_init_sets_attributes(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, timeout=5.0, blocking=False, is_singleton=False)
    assert lock.lock_file == lock_file
    assert lock.timeout == pytest.approx(5.0)
    assert lock.blocking is False
    assert lock._lock_level == 0
    assert lock._current_mode is None
    assert lock._write_thread_id is None
    assert lock._con is not None


def test_acquire_release_read(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    proxy = lock.acquire_read()
    assert lock._lock_level == 1
    assert lock._current_mode == "read"
    lock.release()
    assert lock._lock_level == 0
    assert lock._current_mode is None
    assert isinstance(proxy, object)


def test_acquire_release_write(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock.acquire_write()
    assert lock._lock_level == 1
    assert lock._current_mode == "write"
    assert lock._write_thread_id == threading.get_ident()
    lock.release()
    assert lock._lock_level == 0
    assert lock._current_mode is None
    assert lock._write_thread_id is None


@pytest.mark.parametrize(
    "mode",
    [
        pytest.param("read", id="read"),
        pytest.param("write", id="write"),
    ],
)
def test_reentrant_lock(lock_file: str, mode: Literal["read", "write"]) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    acquire = lock.acquire_read if mode == "read" else lock.acquire_write
    acquire()
    acquire()
    assert lock._lock_level == 2
    lock.release()
    assert lock._lock_level == 1
    assert lock._current_mode == mode
    lock.release()
    assert lock._lock_level == 0


def test_upgrade_read_to_write_prohibited(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock.acquire_read()
    with pytest.raises(RuntimeError, match=r"already holding a read lock.*upgrade not allowed"):
        lock.acquire_write()
    lock.release()


def test_downgrade_write_to_read_prohibited(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock.acquire_write()
    with pytest.raises(RuntimeError, match=r"already holding a write lock.*downgrade not allowed"):
        lock.acquire_read()
    lock.release()


def test_release_unheld_lock_raises(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    with pytest.raises(RuntimeError, match="not held"):
        lock.release()


def test_release_force_unheld_is_noop(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock.release(force=True)


def test_release_force_resets_level(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock.acquire_write()
    lock.acquire_write()
    assert lock._lock_level == 2
    lock.release(force=True)
    assert lock._lock_level == 0
    assert lock._current_mode is None


@pytest.mark.parametrize(
    "mode",
    [
        pytest.param("read", id="read"),
        pytest.param("write", id="write"),
    ],
)
def test_lock_context_manager(lock_file: str, mode: Literal["read", "write"]) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    ctx = lock.read_lock() if mode == "read" else lock.write_lock()
    with ctx:
        assert lock._lock_level == 1
        assert lock._current_mode == mode
    assert lock._lock_level == 0
    assert lock._current_mode is None


@pytest.mark.parametrize(
    "mode",
    [
        pytest.param("read", id="read"),
        pytest.param("write", id="write"),
    ],
)
def test_lock_uses_instance_defaults(lock_file: str, mode: Literal["read", "write"]) -> None:
    lock = ReadWriteLock(lock_file, timeout=3.0, blocking=True, is_singleton=False)
    ctx = lock.read_lock() if mode == "read" else lock.write_lock()
    with ctx:
        assert lock._current_mode == mode


def test_read_lock_custom_timeout(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    with lock.read_lock(timeout=5.0):
        assert lock._current_mode == "read"


def test_write_lock_custom_blocking(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    with lock.write_lock(blocking=True):
        assert lock._current_mode == "write"


def test_close_releases_lock_and_closes_connection(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock.acquire_write()
    assert lock._lock_level == 1
    lock.close()
    assert lock._lock_level == 0
    with pytest.raises(sqlite3.ProgrammingError, match="Cannot operate on a closed database"):
        lock._con.execute("SELECT 1;")


def test_close_on_unheld_lock_closes_connection(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock.close()
    with pytest.raises(sqlite3.ProgrammingError, match="Cannot operate on a closed database"):
        lock._con.execute("SELECT 1;")


def test_write_lock_reentrant_from_different_thread_prohibited(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock.acquire_write()
    error: RuntimeError | None = None

    def try_reenter() -> None:
        nonlocal error
        try:
            lock.acquire_write()
        except RuntimeError as exc:
            error = exc

    thread = threading.Thread(target=try_reenter)
    thread.start()
    thread.join(timeout=5)
    lock.release()
    assert error is not None
    assert "while it is held by thread" in str(error)


@pytest.mark.parametrize(
    ("first", "second"),
    [
        pytest.param("read", "write", id="read-then-write"),
        pytest.param("write", "read", id="write-then-read"),
    ],
)
def test_sequential_mode_switch(lock_file: str, first: str, second: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    first_ctx = lock.read_lock() if first == "read" else lock.write_lock()
    second_ctx = lock.read_lock() if second == "read" else lock.write_lock()
    with first_ctx:
        pass
    with second_ctx:
        pass


def test_non_blocking_read_fails_when_write_held(lock_file: str) -> None:
    holder_lock = ReadWriteLock(lock_file, is_singleton=False)
    contender_lock = ReadWriteLock(lock_file, is_singleton=False)
    acquired = threading.Event()

    def hold_write() -> None:
        with holder_lock.write_lock():
            acquired.set()
            threading.Event().wait(timeout=2)

    thread = threading.Thread(target=hold_write)
    thread.start()
    acquired.wait(timeout=5)

    with pytest.raises(Timeout):
        contender_lock.acquire_read(blocking=False)

    thread.join(timeout=5)


def test_non_blocking_write_fails_when_read_held(lock_file: str) -> None:
    holder_lock = ReadWriteLock(lock_file, is_singleton=False)
    contender_lock = ReadWriteLock(lock_file, is_singleton=False)
    acquired = threading.Event()

    def hold_read() -> None:
        with holder_lock.read_lock():
            acquired.set()
            threading.Event().wait(timeout=2)

    thread = threading.Thread(target=hold_read)
    thread.start()
    acquired.wait(timeout=5)

    with pytest.raises(Timeout):
        contender_lock.acquire_write(blocking=False)

    thread.join(timeout=5)


def test_timeout_read_expires(lock_file: str) -> None:
    holder_lock = ReadWriteLock(lock_file, is_singleton=False)
    contender_lock = ReadWriteLock(lock_file, is_singleton=False)
    acquired = threading.Event()

    def hold_write() -> None:
        with holder_lock.write_lock():
            acquired.set()
            threading.Event().wait(timeout=3)

    thread = threading.Thread(target=hold_write)
    thread.start()
    acquired.wait(timeout=5)

    with pytest.raises(Timeout):
        contender_lock.acquire_read(timeout=0.2)

    thread.join(timeout=5)


def test_nested_read_context_managers(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    with lock.read_lock():
        assert lock._lock_level == 1
        with lock.read_lock():
            assert lock._lock_level == 2
            with lock.read_lock():
                assert lock._lock_level == 3
            assert lock._lock_level == 2
        assert lock._lock_level == 1
    assert lock._lock_level == 0


def test_nested_write_context_managers(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    with lock.write_lock():
        assert lock._lock_level == 1
        with lock.write_lock():
            assert lock._lock_level == 2
        assert lock._lock_level == 1
    assert lock._lock_level == 0


@pytest.mark.parametrize(
    "mode",
    [
        pytest.param("read", id="read"),
        pytest.param("write", id="write"),
    ],
)
def test_non_blocking_transaction_lock_timeout(lock_file: str, mode: Literal["read", "write"]) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock._transaction_lock.acquire()
    try:
        acquire = lock.acquire_read if mode == "read" else lock.acquire_write
        with pytest.raises(Timeout):
            acquire(blocking=False)
    finally:
        lock._transaction_lock.release()


@pytest.mark.parametrize(
    "mode",
    [
        pytest.param("read", id="read"),
        pytest.param("write", id="write"),
    ],
)
def test_non_blocking_with_timeout_no_value_error(lock_file: str, mode: Literal["read", "write"]) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock._transaction_lock.acquire()
    try:
        acquire = lock.acquire_read if mode == "read" else lock.acquire_write
        with pytest.raises(Timeout):
            acquire(blocking=False, timeout=5.0)
    finally:
        lock._transaction_lock.release()


@pytest.mark.parametrize(
    "mode",
    [
        pytest.param("read", id="read"),
        pytest.param("write", id="write"),
    ],
)
def test_finite_timeout_transaction_lock(lock_file: str, mode: Literal["read", "write"]) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    lock._transaction_lock.acquire()
    try:
        acquire = lock.acquire_read if mode == "read" else lock.acquire_write
        with pytest.raises(Timeout):
            acquire(timeout=0.1)
    finally:
        lock._transaction_lock.release()


def test_double_check_read_reentrance(lock_file: str, mocker: MockerFixture) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    real_lock = lock._transaction_lock
    mock_lock = mocker.MagicMock()

    def fake_acquire(blocking: bool = True, timeout: float = -1) -> bool:
        lock._lock_level = 1
        lock._current_mode = "read"
        return real_lock.acquire(blocking, timeout)

    mock_lock.acquire = fake_acquire
    mock_lock.release = real_lock.release
    lock._transaction_lock = mock_lock
    proxy = lock.acquire_read()
    assert lock._lock_level == 2
    assert proxy is not None
    lock._lock_level = 0
    lock._current_mode = None
    lock._con.rollback()


def test_double_check_write_reentrance(lock_file: str, mocker: MockerFixture) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    real_lock = lock._transaction_lock
    mock_lock = mocker.MagicMock()

    def fake_acquire(blocking: bool = True, timeout: float = -1) -> bool:
        lock._lock_level = 1
        lock._current_mode = "write"
        lock._write_thread_id = threading.get_ident()
        return real_lock.acquire(blocking, timeout)

    mock_lock.acquire = fake_acquire
    mock_lock.release = real_lock.release
    lock._transaction_lock = mock_lock
    proxy = lock.acquire_write()
    assert lock._lock_level == 2
    assert proxy is not None
    lock._lock_level = 0
    lock._current_mode = None
    lock._write_thread_id = None
    lock._con.rollback()


@pytest.mark.parametrize(
    ("acquire_mode", "conflicting_mode", "match"),
    [
        pytest.param("read", "write", "downgrade not allowed", id="read-blocked-by-write"),
        pytest.param("write", "read", "upgrade not allowed", id="write-blocked-by-read"),
    ],
)
def test_double_check_wrong_mode_raises(
    lock_file: str,
    mocker: MockerFixture,
    acquire_mode: Literal["read", "write"],
    conflicting_mode: Literal["read", "write"],
    match: str,
) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    real_lock = lock._transaction_lock
    mock_lock = mocker.MagicMock()

    def fake_acquire(blocking: bool = True, timeout: float = -1) -> bool:
        lock._lock_level = 1
        lock._current_mode = conflicting_mode
        return real_lock.acquire(blocking, timeout)

    mock_lock.acquire = fake_acquire
    mock_lock.release = real_lock.release
    lock._transaction_lock = mock_lock
    acquire = lock.acquire_read if acquire_mode == "read" else lock.acquire_write
    with pytest.raises(RuntimeError, match=match):
        acquire()
    lock._lock_level = 0
    lock._current_mode = None


@pytest.mark.parametrize(
    ("error_msg", "expected_exception"),
    [
        pytest.param("disk I/O error", sqlite3.OperationalError, id="non-locked-reraised"),
        pytest.param("database is locked", Timeout, id="locked-becomes-timeout"),
    ],
)
@pytest.mark.parametrize(
    "mode",
    [
        pytest.param("read", id="read"),
        pytest.param("write", id="write"),
    ],
)
def test_operational_error_handling(
    lock_file: str,
    mocker: MockerFixture,
    error_msg: str,
    expected_exception: type[Exception],
    mode: Literal["read", "write"],
) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    con_mock = mocker.MagicMock()
    con_mock.execute.side_effect = sqlite3.OperationalError(error_msg)
    lock._con = con_mock
    acquire = lock.acquire_read if mode == "read" else lock.acquire_write
    with pytest.raises(expected_exception):
        acquire()


def test_write_lock_context_manager_overrides_defaults(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, timeout=10.0, blocking=False, is_singleton=False)
    with lock.write_lock(timeout=5.0, blocking=True):
        assert lock._current_mode == "write"


@pytest.mark.parametrize(
    "mode",
    [
        pytest.param("read", id="read"),
        pytest.param("write", id="write"),
    ],
)
def test_busy_timeout_recomputed_after_journal_mode(
    lock_file: str, mocker: MockerFixture, mode: Literal["read", "write"]
) -> None:
    counter = iter([0.0, 0.0, 0.5])
    mocker.patch("filelock._read_write.time.perf_counter", side_effect=counter)
    lock = ReadWriteLock(lock_file, is_singleton=False)
    acquire = lock.acquire_read if mode == "read" else lock.acquire_write
    acquire(timeout=2.0)
    lock.release()


def test_connection_tracked_in_all_connections(lock_file: str) -> None:
    lock = ReadWriteLock(lock_file, is_singleton=False)
    assert lock._con in _all_connections
    lock.close()


def test_cleanup_connections_closes_all(tmp_path: Path) -> None:
    lock1 = ReadWriteLock(str(tmp_path / "a.db"), is_singleton=False)
    lock2 = ReadWriteLock(str(tmp_path / "b.db"), is_singleton=False)
    con1, con2 = lock1._con, lock2._con
    _cleanup_connections()
    with pytest.raises(sqlite3.ProgrammingError, match="Cannot operate on a closed database"):
        con1.execute("SELECT 1;")
    with pytest.raises(sqlite3.ProgrammingError, match="Cannot operate on a closed database"):
        con2.execute("SELECT 1;")
