1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
|
from __future__ import annotations
import logging
from pathlib import Path, PurePath
import pytest
from filelock import AsyncFileLock, AsyncSoftFileLock, BaseAsyncFileLock, Timeout
@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock])
@pytest.mark.parametrize("path_type", [str, PurePath, Path])
@pytest.mark.parametrize("filename", ["a", "new/b", "new2/new3/c"])
@pytest.mark.asyncio
async def test_simple(
lock_type: type[BaseAsyncFileLock],
path_type: type[str | Path],
filename: str,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
caplog.set_level(logging.DEBUG)
# test lock creation by passing a `str`
lock_path = tmp_path / filename
lock = lock_type(path_type(lock_path))
async with lock as locked:
assert lock.is_locked
assert lock is locked
assert not lock.is_locked
assert caplog.messages == [
f"Attempting to acquire lock {id(lock)} on {lock_path}",
f"Lock {id(lock)} acquired on {lock_path}",
f"Attempting to release lock {id(lock)} on {lock_path}",
f"Lock {id(lock)} released on {lock_path}",
]
assert [r.levelno for r in caplog.records] == [logging.DEBUG, logging.DEBUG, logging.DEBUG, logging.DEBUG]
assert [r.name for r in caplog.records] == ["filelock", "filelock", "filelock", "filelock"]
assert logging.getLogger("filelock").level == logging.NOTSET
@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock])
@pytest.mark.parametrize("path_type", [str, PurePath, Path])
@pytest.mark.parametrize("filename", ["a", "new/b", "new2/new3/c"])
@pytest.mark.asyncio
async def test_acquire(
lock_type: type[BaseAsyncFileLock],
path_type: type[str | Path],
filename: str,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
caplog.set_level(logging.DEBUG)
# test lock creation by passing a `str`
lock_path = tmp_path / filename
lock = lock_type(path_type(lock_path))
async with await lock.acquire() as locked:
assert lock.is_locked
assert lock is locked
assert not lock.is_locked
assert caplog.messages == [
f"Attempting to acquire lock {id(lock)} on {lock_path}",
f"Lock {id(lock)} acquired on {lock_path}",
f"Attempting to release lock {id(lock)} on {lock_path}",
f"Lock {id(lock)} released on {lock_path}",
]
assert [r.levelno for r in caplog.records] == [logging.DEBUG, logging.DEBUG, logging.DEBUG, logging.DEBUG]
assert [r.name for r in caplog.records] == ["filelock", "filelock", "filelock", "filelock"]
assert logging.getLogger("filelock").level == logging.NOTSET
@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock])
@pytest.mark.asyncio
async def test_non_blocking(lock_type: type[BaseAsyncFileLock], tmp_path: Path) -> None:
# raises Timeout error when the lock cannot be acquired
lock_path = tmp_path / "a"
lock_1, lock_2 = lock_type(str(lock_path)), lock_type(str(lock_path))
lock_3 = lock_type(str(lock_path), blocking=False)
lock_4 = lock_type(str(lock_path), timeout=0)
lock_5 = lock_type(str(lock_path), blocking=False, timeout=-1)
# acquire lock 1
await lock_1.acquire()
assert lock_1.is_locked
assert not lock_2.is_locked
assert not lock_3.is_locked
assert not lock_4.is_locked
assert not lock_5.is_locked
# try to acquire lock 2
with pytest.raises(Timeout, match=r"The file lock '.*' could not be acquired."):
await lock_2.acquire(blocking=False)
assert not lock_2.is_locked
assert lock_1.is_locked
# try to acquire pre-parametrized `blocking=False` lock 3 with `acquire`
with pytest.raises(Timeout, match=r"The file lock '.*' could not be acquired."):
await lock_3.acquire()
assert not lock_3.is_locked
assert lock_1.is_locked
# try to acquire pre-parametrized `blocking=False` lock 3 with context manager
with pytest.raises(Timeout, match=r"The file lock '.*' could not be acquired."):
async with lock_3:
pass
assert not lock_3.is_locked
assert lock_1.is_locked
# try to acquire pre-parametrized `timeout=0` lock 4 with `acquire`
with pytest.raises(Timeout, match=r"The file lock '.*' could not be acquired."):
await lock_4.acquire()
assert not lock_4.is_locked
assert lock_1.is_locked
# try to acquire pre-parametrized `timeout=0` lock 4 with context manager
with pytest.raises(Timeout, match=r"The file lock '.*' could not be acquired."):
async with lock_4:
pass
assert not lock_4.is_locked
assert lock_1.is_locked
# blocking precedence over timeout
# try to acquire pre-parametrized `timeout=-1,blocking=False` lock 5 with `acquire`
with pytest.raises(Timeout, match=r"The file lock '.*' could not be acquired."):
await lock_5.acquire()
assert not lock_5.is_locked
assert lock_1.is_locked
# try to acquire pre-parametrized `timeout=-1,blocking=False` lock 5 with context manager
with pytest.raises(Timeout, match=r"The file lock '.*' could not be acquired."):
async with lock_5:
pass
assert not lock_5.is_locked
assert lock_1.is_locked
# release lock 1
await lock_1.release()
assert not lock_1.is_locked
assert not lock_2.is_locked
assert not lock_3.is_locked
assert not lock_4.is_locked
assert not lock_5.is_locked
@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock])
@pytest.mark.parametrize("thread_local", [True, False])
@pytest.mark.asyncio
async def test_non_executor(lock_type: type[BaseAsyncFileLock], thread_local: bool, tmp_path: Path) -> None:
lock_path = tmp_path / "a"
lock = lock_type(str(lock_path), thread_local=thread_local, run_in_executor=False)
async with lock as locked:
assert lock.is_locked
assert lock is locked
assert not lock.is_locked
@pytest.mark.asyncio
async def test_coroutine_function(tmp_path: Path) -> None:
acquired = released = False
class AioFileLock(BaseAsyncFileLock):
async def _acquire(self) -> None: # type: ignore[override]
nonlocal acquired
acquired = True
self._context.lock_file_fd = 1
async def _release(self) -> None: # type: ignore[override]
nonlocal released
released = True
self._context.lock_file_fd = None
lock = AioFileLock(str(tmp_path / "a"))
await lock.acquire()
assert acquired
assert not released
await lock.release()
assert acquired
assert released
@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock])
@pytest.mark.asyncio
async def test_wait_message_logged(
lock_type: type[BaseAsyncFileLock], tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
caplog.set_level(logging.DEBUG)
lock_path = tmp_path / "a"
first_lock = lock_type(str(lock_path))
second_lock = lock_type(str(lock_path), timeout=0.2)
# Hold the lock so second_lock has to wait
await first_lock.acquire()
with pytest.raises(Timeout):
await second_lock.acquire()
assert any("waiting" in msg for msg in caplog.messages)
@pytest.mark.parametrize("lock_type", [AsyncSoftFileLock, AsyncFileLock])
@pytest.mark.asyncio
async def test_attempting_to_acquire_branch(
lock_type: type[BaseAsyncFileLock], tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
caplog.set_level(logging.DEBUG)
lock = lock_type(str(tmp_path / "a"))
await lock.acquire()
assert any("Attempting to acquire lock" in m for m in caplog.messages)
await lock.release()
@pytest.mark.asyncio
async def test_thread_local_run_in_executor(tmp_path: Path) -> None: # noqa: RUF029
with pytest.raises(ValueError, match="run_in_executor is not supported when thread_local is True"):
AsyncSoftFileLock(str(tmp_path / "a"), thread_local=True, run_in_executor=True)
@pytest.mark.parametrize("lock_type", [AsyncSoftFileLock, AsyncFileLock])
@pytest.mark.asyncio
async def test_attempting_to_acquire(
lock_type: type[BaseAsyncFileLock], tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
caplog.set_level(logging.DEBUG)
lock = lock_type(str(tmp_path / "a.lock"), run_in_executor=False)
await lock.acquire(timeout=0.1)
assert any("Attempting to acquire lock" in m for m in caplog.messages)
await lock.release()
@pytest.mark.parametrize("lock_type", [AsyncSoftFileLock, AsyncFileLock])
@pytest.mark.asyncio
async def test_attempting_to_release(
lock_type: type[BaseAsyncFileLock], tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
caplog.set_level(logging.DEBUG)
lock = lock_type(str(tmp_path / "a.lock"), run_in_executor=False)
await lock.acquire(timeout=0.1) # lock_counter = 1, is_locked = True
await lock.acquire(timeout=0.1) # lock_counter = 2 (reentrant)
await lock.release(force=True)
assert any("Attempting to release lock" in m for m in caplog.messages)
assert any("released" in m for m in caplog.messages)
@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock])
@pytest.mark.asyncio
async def test_release_early_exit_when_unlocked(lock_type: type[BaseAsyncFileLock], tmp_path: Path) -> None:
lock = lock_type(str(tmp_path / "a.lock"), run_in_executor=False)
assert not lock.is_locked
await lock.release()
assert not lock.is_locked
@pytest.mark.parametrize("lock_type", [AsyncFileLock, AsyncSoftFileLock])
@pytest.mark.asyncio
async def test_release_nonzero_counter_exit(
lock_type: type[BaseAsyncFileLock], tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
caplog.set_level(logging.DEBUG)
lock = lock_type(str(tmp_path / "a.lock"), run_in_executor=False)
await lock.acquire()
await lock.acquire()
await lock.release() # counter goes 2→1
assert lock.lock_counter == 1
assert lock.is_locked
assert not any("Attempting to release" in m for m in caplog.messages)
await lock.release()
|