File: test_util_fifo.py

package info (click to toggle)
quodlibet 4.7.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 19,228 kB
  • sloc: python: 89,728; sh: 381; xml: 110; makefile: 91
file content (114 lines) | stat: -rw-r--r-- 3,892 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# Copyright 2014 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

from unittest import mock

from gi.repository import GLib
from quodlibet import print_d
from quodlibet.util import is_windows
from tests import TestCase, skipIf

from quodlibet.util.fifo import FIFOError, split_message, FIFO, fifo_exists, write_fifo
from tests.helper import temp_filename


class Tsplit_message(TestCase):
    def test_main(self):
        def func(d):
            return list(split_message(d))

        # no response format
        self.assertEqual(func(b""), [])
        self.assertEqual(func(b"foo"), [(b"foo", None)])
        self.assertEqual(func(b"foo\nbar"), [(b"foo", None), (b"bar", None)])

        # response format
        self.assertEqual(func(b"\x00a\x00/dev/null\x00"), [(b"a", b"/dev/null")])
        self.assertEqual(
            func(b"\x00a\x00/dev/null\x00\x00b\x00/dev/foo\x00"),
            [(b"a", b"/dev/null"), (b"b", b"/dev/foo")],
        )

        # mixed
        self.assertEqual(
            func(b"foo\x00a\x00/dev\x00"), [(b"foo", None), (b"a", b"/dev")]
        )
        self.assertEqual(
            func(b"\x00a\x00/dev\x00foo"), [(b"a", b"/dev"), (b"foo", None)]
        )
        self.assertEqual(
            func(b"\x00a\x00/dev\x00foo\x00b\x00/arg\x00bla"),
            [(b"a", b"/dev"), (b"foo", None), (b"b", b"/arg"), (b"bla", None)],
        )

        # inval
        self.assertRaises(ValueError, func, b"foo\x00bar")


@skipIf(is_windows(), "not on Windows")
class TFIFO(TestCase):
    def test_creation_destruction(self):
        def cb(bs, _):
            print_d(bs)

        with temp_filename() as fn:
            fifo = FIFO(fn, cb)
            assert not fifo_exists(fifo._path)
            fifo.open()
            assert fifo_exists(fifo._path)
        # Should *not* error if file is gone
        fifo.destroy()

    def test_unwriteable_location(self):
        fifo = FIFO("/dev/not-here", None)
        fifo.open()
        with self.assertRaises(FIFOError):
            write_fifo(fifo._path, b"foobar")
        fifo.destroy()

    def test_empty_read(self):
        result = []

        with temp_filename() as f, mock.patch.object(FIFO, "open") as m_open:
            fifo = FIFO(f, result.append)
            source = mock.Mock()
            source.read.return_value = b""
            assert fifo._process(source, GLib.IO_IN) is False
            assert m_open.mock_calls == [mock.call(ignore_lock=True)]
            assert result == []

    def test_glib_err_read(self):
        result = []

        with temp_filename() as f, mock.patch.object(FIFO, "open") as m_open:
            fifo = FIFO(f, result.append)
            source = mock.Mock()
            assert fifo._process(source, GLib.IO_ERR) is False
            assert m_open.mock_calls == [mock.call(ignore_lock=True)]
            assert result == []

    def test_oserror_read(self):
        result = []

        with temp_filename() as f, mock.patch.object(FIFO, "open") as m_open:
            fifo = FIFO(f, result.append)
            source = mock.Mock()
            source.read.side_effect = OSError
            assert fifo._process(source, GLib.IO_IN) is False
            assert m_open.mock_calls == [mock.call(ignore_lock=True)]
            assert result == []

    def test_successful_read(self):
        result = []

        with temp_filename() as f, mock.patch.object(FIFO, "open") as m_open:
            fifo = FIFO(f, result.append)
            source = mock.Mock()
            source.read.return_value = b"foo"
            assert fifo._process(source, GLib.IO_IN) is True
            assert m_open.mock_calls == []
            assert result == [b"foo"]