1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
|
from __future__ import annotations
from contextlib import contextmanager
from io import BufferedRandom, BufferedWriter
from typing import TYPE_CHECKING
import pytest
from streamlink_cli.output import FileOutput
if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path
@contextmanager
def _create_fd(root: Path, name: str) -> Iterator[BufferedRandom]:
fd = (root / name).open("w+b")
try:
yield fd
finally:
fd.close()
@pytest.fixture()
def fd(tmp_path: Path):
with _create_fd(tmp_path, "file") as fd:
yield fd
@pytest.fixture(autouse=True)
def fake_stdout(monkeypatch: pytest.MonkeyPatch, tmp_path: Path):
# can't use in-memory io.BytesIO, since fd.fileno() is called on Windows
with _create_fd(tmp_path, "stdout") as fd:
monkeypatch.setattr("streamlink_cli.output.file.stdout", fd)
yield fd
def test_early_close(tmp_path: Path, fd: BufferedRandom):
filename = tmp_path / "foo" / "bar"
fo = FileOutput(filename=filename, record=FileOutput(fd=fd))
assert isinstance(fo.record, FileOutput)
assert not fo.opened
assert not fo.record.opened
assert not filename.exists()
fo.close()
fo.record.close()
def test_early_write(tmp_path: Path):
filename = tmp_path / "foo" / "bar"
fo = FileOutput(filename=filename)
assert not fo.opened
assert not filename.exists()
with pytest.raises(OSError, match=r"^Output is not opened$"):
fo.write(b"foo")
def test_open_write_close(tmp_path: Path, fd: BufferedRandom):
filename = tmp_path / "foo" / "bar"
fo = FileOutput(filename=filename, record=FileOutput(fd=fd))
assert fo.fd is None
assert isinstance(fo.record, FileOutput)
fo.open()
assert fo.opened
assert fo.record.opened
assert filename.parent.is_dir()
assert filename.is_file()
assert isinstance(fo.fd, BufferedWriter)
assert isinstance(fo.record.fd, BufferedRandom)
fo.write(b"foo")
fo.write(b"bar")
fo.write(b"baz")
fo.fd.flush()
fo.record.fd.flush()
assert filename.read_bytes() == b"foobarbaz"
fo.record.fd.seek(0)
assert fo.record.fd.read() == b"foobarbaz"
fo.close()
assert not fo.opened
assert not fo.record.opened
assert fo.fd.closed
assert fo.record.fd.closed
def test_write_stdout(fake_stdout: BufferedRandom):
fo = FileOutput(fd=fake_stdout)
assert fo.fd is fake_stdout
assert fo.filename is None
assert fo.record is None
fo.open()
assert fo.opened
fo.write(b"foo")
fo.write(b"bar")
fo.write(b"baz")
fo.fd.seek(0)
assert fo.fd.read() == b"foobarbaz"
fo.close()
assert not fo.opened
assert not fo.fd.closed
|