File: test_gio.py

package info (click to toggle)
quodlibet 4.6.0-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 18,016 kB
  • sloc: python: 85,817; sh: 385; xml: 110; makefile: 91
file content (86 lines) | stat: -rw-r--r-- 3,056 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
import shutil
from pathlib import Path
from time import sleep, time
from typing import Iterator, Optional, Set

from _pytest.fixtures import fixture
from gi.repository import Gio

from quodlibet import print_d
from quodlibet.library.file import Event
from quodlibet.util.path import normalize_path
from tests import mkdtemp, run_gtk_loop
from tests.helper import temp_filename

# Pause, in seconds, that the tests below take after each filesystem change
# (write / unlink / rename) before they call run_gtk_loop().
SLEEP_SECS = 0.2
"""Enough for OS events / caches to filter through to GIO etc"""


@fixture
def temp_dir() -> Iterator[Path]:
    """Yield a freshly created temporary directory and remove it afterwards.

    The directory comes from the test suite's ``mkdtemp`` helper. Everything
    after the ``yield`` is teardown: the whole tree is deleted with
    ``shutil.rmtree`` once the fixture is finalised.

    The function is a generator, hence the ``Iterator[Path]`` return
    annotation; the value a test receives is the yielded ``Path``.
    """
    out_path = Path(mkdtemp())
    yield out_path
    shutil.rmtree(out_path)


class BasicMonitor:
    """Watch one directory through GIO and record every event received.

    Each "changed" signal is stored in ``changed`` as an
    ``(event_type, file_path)`` tuple, in arrival order.
    """

    def __init__(self, path: Path):
        """Start monitoring the directory at ``path`` (moves included)."""
        self.changed = []
        directory = Gio.File.new_for_path(str(path))
        watcher = directory.monitor_directory(
            Gio.FileMonitorFlags.WATCH_MOVES, None)
        signal_id = watcher.connect("changed", self._file_changed)
        # Keep the monitor and its handler id referenced on the instance
        # (presumably so the monitor stays alive while tests run).
        self._monitors = {path: (watcher, signal_id)}
        print_d(f"Monitoring {path!s}")

    def _file_changed(self, _monitor, main_file: Gio.File,
                      other_file: Optional[Gio.File],
                      event_type: Gio.FileMonitorEvent) -> None:
        """Signal handler: log the event and append it to ``changed``.

        ``other_file`` is only used for the debug message; the recorded
        tuple holds the event type and the main file's path.
        """
        file_path = main_file.get_path()
        if other_file:
            other_path = Path(normalize_path(other_file.get_path(), True))
        else:
            other_path = None
        print_d(f"Got event {event_type} on {file_path}->{other_path}")
        self.changed.append((event_type, file_path))

    @property
    def event_types(self) -> Set[Event]:
        """The distinct event types among the events recorded so far."""
        # NOTE(review): the stored values are whatever GIO passed as
        # ``event_type``; confirm they compare equal to ``Event`` members.
        return {entry[0] for entry in self.changed}


class TestFileMonitor:
    """Check that a GIO directory monitor reports create, delete and rename
    events for files inside a temporary directory."""

    @staticmethod
    def _let_events_arrive() -> None:
        """Sleep for SLEEP_SECS, then call run_gtk_loop()."""
        sleep(SLEEP_SECS)
        run_gtk_loop()

    def test_create_delete(self, temp_dir: Path):
        """Writing a file reports CREATED; unlinking it reports DELETED."""
        monitor = BasicMonitor(temp_dir)
        target = temp_dir / "foo.txt"

        target.write_text("test")
        self._let_events_arrive()
        assert monitor.changed, "No events after creation"
        # Superset check: other event types alongside CREATED are tolerated
        assert monitor.event_types >= {Event.CREATED}

        # Forget the creation events so only the deletion is inspected
        monitor.changed.clear()
        target.unlink()
        self._let_events_arrive()
        assert monitor.changed, "No events after deletion"
        assert monitor.event_types >= {Event.DELETED}

    def test_move(self, temp_dir: Path):
        """Renaming a file within the watched directory reports RENAMED."""
        monitor = BasicMonitor(temp_dir)
        with temp_filename(dir=temp_dir, suffix=".txt", as_path=True) as src:
            src.write_text("test\n")
            self._let_events_arrive()
            assert monitor.changed, "No events after creation"
            # Forget the creation events so only the rename is inspected
            monitor.changed.clear()

            # Destination stays in the same directory as the source
            new_name = f"new-{time()}.txt"
            destination = src.parent / new_name
            src.rename(destination)
            self._let_events_arrive()
            assert monitor.changed
            assert monitor.event_types >= {Event.RENAMED}, f"Got {monitor.changed}"