1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
|
# Copyright 2014 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
from unittest import mock
from gi.repository import GLib
from quodlibet import print_d
from quodlibet.util import is_windows
from tests import TestCase, skipIf
from quodlibet.util.fifo import FIFOError, split_message, FIFO, fifo_exists, write_fifo
from tests.helper import temp_filename
class Tsplit_message(TestCase):
    """Checks ``split_message`` on plain, response-format and mixed payloads."""

    def test_main(self):
        """Compare the drained ``split_message`` output with the expected pairs."""

        def parse(payload):
            # Drain the iterable so the outcome can be compared against a list
            return [*split_message(payload)]

        # Each entry is (raw payload, expected list of result pairs);
        # entries are checked in the order listed.
        expectations = [
            # no response format
            (b"", []),
            (b"foo", [(b"foo", None)]),
            (b"foo\nbar", [(b"foo", None), (b"bar", None)]),
            # response format
            (b"\x00a\x00/dev/null\x00", [(b"a", b"/dev/null")]),
            (
                b"\x00a\x00/dev/null\x00\x00b\x00/dev/foo\x00",
                [(b"a", b"/dev/null"), (b"b", b"/dev/foo")],
            ),
            # mixed
            (b"foo\x00a\x00/dev\x00", [(b"foo", None), (b"a", b"/dev")]),
            (b"\x00a\x00/dev\x00foo", [(b"a", b"/dev"), (b"foo", None)]),
            (
                b"\x00a\x00/dev\x00foo\x00b\x00/arg\x00bla",
                [(b"a", b"/dev"), (b"foo", None), (b"b", b"/arg"), (b"bla", None)],
            ),
        ]
        for payload, expected in expectations:
            self.assertEqual(parse(payload), expected)

        # inval
        with self.assertRaises(ValueError):
            parse(b"foo\x00bar")
@skipIf(is_windows(), "not on Windows")
class TFIFO(TestCase):
    """Tests for ``FIFO`` creation/teardown and its ``_process`` handler."""

    def test_creation_destruction(self):
        """The FIFO path exists only after ``open()``; ``destroy()`` runs last."""

        def log_message(data, _):
            print_d(data)

        with temp_filename() as path:
            fifo = FIFO(path, log_message)
            assert not fifo_exists(fifo._path)
            fifo.open()
            assert fifo_exists(fifo._path)

        # Should *not* error if file is gone
        fifo.destroy()

    def test_unwriteable_location(self):
        """``write_fifo`` raises ``FIFOError`` for a FIFO at ``/dev/not-here``."""
        fifo = FIFO("/dev/not-here", None)
        fifo.open()
        with self.assertRaises(FIFOError):
            write_fifo(fifo._path, b"foobar")
        fifo.destroy()

    def test_empty_read(self):
        """Empty read: False, one ``open(ignore_lock=True)``, no callback data."""
        received = []
        with temp_filename() as path:
            with mock.patch.object(FIFO, "open") as open_mock:
                fifo = FIFO(path, received.append)
                # The source hands back no bytes when read
                source = mock.Mock(**{"read.return_value": b""})
                outcome = fifo._process(source, GLib.IO_IN)
                assert outcome is False
                assert open_mock.mock_calls == [mock.call(ignore_lock=True)]
                assert received == []

    def test_glib_err_read(self):
        """``IO_ERR``: False, one ``open(ignore_lock=True)``, no callback data."""
        received = []
        with temp_filename() as path:
            with mock.patch.object(FIFO, "open") as open_mock:
                fifo = FIFO(path, received.append)
                source = mock.Mock()
                outcome = fifo._process(source, GLib.IO_ERR)
                assert outcome is False
                assert open_mock.mock_calls == [mock.call(ignore_lock=True)]
                assert received == []

    def test_oserror_read(self):
        """``OSError`` on read: False, one ``open(ignore_lock=True)``, no data."""
        received = []
        with temp_filename() as path:
            with mock.patch.object(FIFO, "open") as open_mock:
                fifo = FIFO(path, received.append)
                # Reading from the source raises OSError
                source = mock.Mock(**{"read.side_effect": OSError})
                outcome = fifo._process(source, GLib.IO_IN)
                assert outcome is False
                assert open_mock.mock_calls == [mock.call(ignore_lock=True)]
                assert received == []

    def test_successful_read(self):
        """Non-empty read: True, ``open`` untouched, callback gets the bytes."""
        received = []
        with temp_filename() as path:
            with mock.patch.object(FIFO, "open") as open_mock:
                fifo = FIFO(path, received.append)
                source = mock.Mock(**{"read.return_value": b"foo"})
                outcome = fifo._process(source, GLib.IO_IN)
                assert outcome is True
                assert open_mock.mock_calls == []
                assert received == [b"foo"]
|