1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
|
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
import os
import sys
from pathlib import Path
from unittest import mock
import pytest
from gi.repository import GLib, Gio
from senf import fsn2bytes, bytes2fsn
from . import TestCase, skipIf
from .helper import temp_filename
import quodlibet
from quodlibet.remote import QuodLibetUnixRemote
from quodlibet.util import is_windows
QLPATH = str(Path(quodlibet.__file__).resolve().parent.parent)
class Mock:
def __init__(self, resp=None):
self.lines = []
self.resp = resp
def handle_line(self, app, line):
self.lines.append(line)
return self.resp
@skipIf(is_windows(), "unix only")
class TUnixRemote(TestCase):
def test_fifo(self):
mock = Mock()
remote = QuodLibetUnixRemote(None, mock)
remote._callback(b"foo\n")
remote._callback(b"bar\nbaz")
self.assertEqual(
mock.lines, [bytes2fsn(b, None) for b in [b"foo", b"bar", b"baz"]])
def test_response(self):
with temp_filename() as fn:
mock = Mock(resp=bytes2fsn(b"resp", None))
remote = QuodLibetUnixRemote(None, mock)
remote._callback(b"\x00foo\x00" + fsn2bytes(fn, None) + b"\x00")
self.assertEqual(mock.lines, [bytes2fsn(b"foo", None)])
with open(fn, "rb") as h:
self.assertEqual(h.read(), b"resp")
@skipIf(is_windows(), "unix only")
class TUnixRemoteFifoFullCycle(TestCase):
@pytest.fixture(autouse=True)
def tmp_fifo_path(self, tmp_path):
self.registry = Mock(resp=bytes2fsn(b"response", None))
self.tmp_path = tmp_path
with mock.patch.object(QuodLibetUnixRemote, "_PATH", str(tmp_path / "control")):
yield
def _send_message_remote_proc(self, msg, callback):
"""Execute QuodLibetUnixRemote.send_message() in a child process"""
# Using a child process is the only way to execute send_message, as it
# can only be run on the main thread, and blocks waiting for the
# response. It can't be run in a separate thread, because it uses
# signals to handle timeouts and we can't run the GLib mainloop in a
# thread as Gtk has already been initialised on the Python main thread.
# Attempting to use the loop in a thread leads to segfaults.
temp_script = self.tmp_path / "send_message.py"
with temp_script.open("w") as fpy:
fpy.write(
f"""\
import sys
import traceback
from quodlibet.remote import QuodLibetUnixRemote
QuodLibetUnixRemote._PATH = {QuodLibetUnixRemote._PATH!r}
msg = sys.stdin.read()
try:
result = QuodLibetUnixRemote.send_message(msg)
except Exception:
traceback.print_exc(file=sys.stderr)
else:
sys.stdout.buffer.write(result)
sys.stdout.flush()
"""
)
def finished(proc, result):
try:
success, stdout, stderr = proc.communicate_finish(result)
except GLib.Error as ex:
callback(None, ex)
else:
if not success:
return
if stderr.get_size():
callback(None, stderr.get_data().decode())
callback(stdout.get_data(), None)
try:
launcher = Gio.SubprocessLauncher.new(
Gio.SubprocessFlags.STDOUT_PIPE
| Gio.SubprocessFlags.STDERR_PIPE
| Gio.SubprocessFlags.STDIN_PIPE
)
path = [QLPATH]
if "PYTHONPATH" in os.environ:
path.append(os.environ["PYTHONPATH"])
launcher.setenv("PYTHONPATH", ":".join(path), True)
proc = launcher.spawnv([sys.executable, str(temp_script)])
input = GLib.Bytes.new(msg)
proc.communicate_async(input, None, finished)
except Exception as ex:
callback(None, ex)
def send_message(self, msg: str) -> bytes:
"""Send a message to a remote
Runs the GLib main loop, with the QuodLibetUnixRemote listener active,
sends it a message and waits for the response.
"""
result = error = None
remote = None
loop = GLib.MainLoop()
def proc_callback(*res):
nonlocal result, error
result, error = res
remote.stop()
loop.quit()
def run_receiver_and_remote():
nonlocal remote
remote = QuodLibetUnixRemote(None, self.registry)
remote.start()
GLib.idle_add(self._send_message_remote_proc, msg, proc_callback)
GLib.idle_add(run_receiver_and_remote)
loop.run()
if error is not None:
if isinstance(error, str):
pytest.fail(error, pytrace=False)
raise error
return result
def test_remote_send_message(self):
response = self.send_message(b"foo 42")
self.assertEqual(self.registry.lines, [bytes2fsn(b"foo 42", None)])
self.assertEqual(response, b"response")
|