File: test_tailpipe.py

package info (click to toggle)
con-duct 0.17.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 584 kB
  • sloc: python: 4,324; sh: 22; makefile: 18
file content (73 lines) | stat: -rw-r--r-- 2,515 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from __future__ import annotations
from pathlib import Path
import subprocess
import tempfile
from unittest.mock import patch
import pytest
from utils import MockStream
from con_duct.__main__ import TailPipe

# Fixture ids are "ten_<N>" for N = 1..7; each id yields a file with 10**N lines.
# The largest one (10^7 lines) is about 70MB.
FIXTURE_LIST = ["ten_" + str(exponent) for exponent in range(1, 8)]


@pytest.fixture(scope="module", params=FIXTURE_LIST)
def fixture_path(
    request: pytest.FixtureRequest, tmp_path_factory: pytest.TempPathFactory
) -> str:
    """Write a file of consecutive integers, one per line, and return its path.

    The fixture parameter has the form ``ten_<N>``. The generated file is
    named ``<param>.txt``, lives in a fresh ``fixture_data`` temp directory,
    and contains the integers ``0`` through ``10**N - 1``, each followed by
    a newline.

    Returns:
        The path of the generated file as a string.
    """
    exponent_text = request.param.split("_")[1]
    line_count = 10 ** int(exponent_text)

    target_dir = tmp_path_factory.mktemp("fixture_data")
    target = target_dir / f"{request.param}.txt"

    with target.open("w") as out:
        out.writelines(f"{number}\n" for number in range(line_count))

    return str(target)


@patch("sys.stdout", new_callable=MockStream)
def test_high_throughput_stdout(mock_stdout: MockStream, fixture_path: str) -> None:
    """Check that everything ``cat`` writes to its stdout reaches the mock stream.

    ``cat`` dumps the fixture file into a temporary file while a TailPipe,
    built from that temporary file's name and ``mock_stdout.buffer``, is
    running. Once the child has exited and the pipe is closed, the bytes
    collected by the mock must equal the fixture's bytes exactly.
    """
    command = ["cat", fixture_path]
    with tempfile.NamedTemporaryFile(mode="wb") as capture:
        child = subprocess.Popen(command, stdout=capture)
        pipe = TailPipe(capture.name, mock_stdout.buffer)
        pipe.start()
        # Let the child finish writing before shutting the pipe down.
        child.wait()
        pipe.close()

    assert child.returncode == 0
    expected = Path(fixture_path).read_bytes()
    assert mock_stdout.getvalue() == expected


@patch("sys.stderr", new_callable=MockStream)
def test_high_throughput_stderr(mock_stderr: MockStream, fixture_path: str) -> None:
    """Check that everything the helper writes to stderr reaches the mock stream.

    The ``data/cat_to_err.py`` script next to this test file is run with the
    fixture path as its argument; its stdout is discarded and its stderr is
    redirected into a temporary file. A TailPipe built from that temporary
    file's name and ``mock_stderr.buffer`` runs alongside it. Once the child
    has exited and the pipe is closed, the bytes collected by the mock must
    equal the fixture's bytes exactly.
    """
    helper_script = Path(__file__).with_name("data") / "cat_to_err.py"
    with tempfile.NamedTemporaryFile(mode="wb") as capture:
        child = subprocess.Popen(
            [helper_script, fixture_path],
            stdout=subprocess.DEVNULL,
            stderr=capture,
        )
        pipe = TailPipe(capture.name, mock_stderr.buffer)
        pipe.start()
        # Let the child finish writing before shutting the pipe down.
        child.wait()
        pipe.close()

    assert child.returncode == 0
    expected = Path(fixture_path).read_bytes()
    assert mock_stderr.getvalue() == expected


@patch("sys.stdout", new_callable=MockStream)
def test_close(mock_stdout: MockStream) -> None:
    """Check that a started TailPipe has a closed ``infile`` after ``close()``."""
    with tempfile.NamedTemporaryFile(mode="wb") as scratch:
        pipe = TailPipe(scratch.name, mock_stdout.buffer)
        pipe.start()
        pipe.close()

        infile = pipe.infile
        # The input handle must still be set, and it must be closed.
        assert infile is not None
        assert infile.closed