File: test_gio.py

package info (click to toggle)
quodlibet 4.7.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 19,228 kB
  • sloc: python: 89,728; sh: 381; xml: 110; makefile: 91
file content (88 lines) | stat: -rw-r--r-- 2,941 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
import shutil
from pathlib import Path
from time import sleep, time

from _pytest.fixtures import fixture
from gi.repository import Gio

from quodlibet import print_d
from quodlibet.library.file import Event
from quodlibet.util.path import normalize_path
from tests import mkdtemp, run_gtk_loop
from tests.helper import temp_filename

SLEEP_SECS = 0.2
"""Enough for OS events / caches to filter through to GIO etc"""


@fixture
def temp_dir() -> Path:
    out_path = Path(mkdtemp())
    yield out_path
    shutil.rmtree(out_path)


class BasicMonitor:
    def __init__(self, path: Path):
        self.changed = []
        f = Gio.File.new_for_path(str(path))
        monitor = f.monitor_directory(Gio.FileMonitorFlags.WATCH_MOVES, None)
        handler_id = monitor.connect("changed", self._file_changed)
        self._monitors = {path: (monitor, handler_id)}
        print_d(f"Monitoring {path!s}")

    def _file_changed(
        self,
        _monitor,
        main_file: Gio.File,
        other_file: Gio.File | None,
        event_type: Gio.FileMonitorEvent,
    ) -> None:
        file_path = main_file.get_path()
        other_path = (
            Path(normalize_path(other_file.get_path(), True)) if other_file else None
        )
        print_d(f"Got event {event_type} on {file_path}->{other_path}")
        self.changed.append((event_type, file_path))

    @property
    def event_types(self) -> set[Event]:
        return {changed[0] for changed in self.changed}


class TestFileMonitor:
    def test_create_delete(self, temp_dir: Path):
        path = temp_dir
        monitor = BasicMonitor(path)
        some_file = path / "foo.txt"
        some_file.write_text("test")
        sleep(SLEEP_SECS)
        run_gtk_loop()
        assert monitor.changed, "No events after creation"
        assert monitor.event_types >= {Event.CREATED}
        monitor.changed.clear()
        some_file.unlink()
        sleep(SLEEP_SECS)
        run_gtk_loop()
        assert monitor.changed, "No events after deletion"
        assert monitor.event_types >= {Event.DELETED}

    def test_move(self, temp_dir: Path):
        monitor = BasicMonitor(temp_dir)
        with temp_filename(dir=temp_dir, suffix=".txt", as_path=True) as path:
            path.write_text("test\n")
            sleep(SLEEP_SECS)
            run_gtk_loop()
            assert monitor.changed, "No events after creation"
            monitor.changed.clear()

            new_name = f"new-{time()}.txt"
            path.rename(path.parent / new_name)
            sleep(SLEEP_SECS)
            run_gtk_loop()
            assert monitor.changed
            assert monitor.event_types >= {Event.RENAMED}, f"Got {monitor.changed}"