File: playerctl.py

package info (click to toggle)
playerctl 2.4.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 704 kB
  • sloc: ansic: 5,157; python: 1,107; xml: 198; sh: 133; makefile: 62
file content (85 lines) | stat: -rw-r--r-- 2,448 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import asyncio
import os
from shlex import join


class CommandResult:
    """Decoded outcome of a finished command.

    Attributes are plain ``str`` for the two output streams (decoded with
    the default codec and stripped of surrounding whitespace) plus the
    return code exactly as it was passed in.
    """

    def __init__(self, stdout, stderr, returncode):
        """Store the text form of ``stdout``/``stderr`` and the return code.

        Args:
            stdout: Raw bytes captured from the command's standard output.
            stderr: Raw bytes captured from the command's standard error.
            returncode: Exit status of the command; stored unchanged.
        """
        # Decode stdout first, then stderr, trimming each the same way.
        out_text, err_text = (
            raw.decode().strip() for raw in (stdout, stderr)
        )
        self.stdout = out_text
        self.stderr = err_text
        self.returncode = returncode


class PlayerctlProcess:
    """Wrap a running subprocess and collect its stdout lines in a queue.

    A background task reads the process's stdout line by line. Each line is
    decoded and stripped; lines containing ``playerctl-DEBUG:`` are printed,
    every other line is put on ``self.queue``.

    Attributes:
        queue: ``asyncio.Queue`` receiving the non-debug stdout lines.
        proc: The wrapped process object.
        debug: The ``debug`` flag given to the constructor.
    """

    def __init__(self, proc, debug=False):
        """Start the background reader task(s) for ``proc``.

        Must be called while an event loop is available, since the reader
        tasks are scheduled on it immediately.

        Args:
            proc: Process object exposing a ``stdout`` stream with an
                awaitable ``readline()`` and a ``returncode`` attribute.
            debug: When true (and ``proc.stderr`` is not ``None``), stderr
                is read as well and echoed with ``print``.
        """
        self.queue = asyncio.Queue()
        self.proc = proc
        self.debug = debug

        async def reader(stream):
            while True:
                line = await stream.readline()
                if not line:
                    # An empty read signals end of stream.
                    break
                line = line.decode().strip()
                if 'playerctl-DEBUG:' in line:
                    print(line)
                else:
                    self.queue.put_nowait(line)

        async def printer(stream):
            while True:
                line = await stream.readline()
                if not line:
                    # Check for EOF before printing so the empty sentinel
                    # is not echoed.
                    break
                print(line.decode(errors='replace').rstrip())

        loop = asyncio.get_event_loop()
        # The event loop only keeps weak references to tasks; hold strong
        # references here so the readers cannot be garbage-collected while
        # they are still running.
        self._tasks = [loop.create_task(reader(proc.stdout))]
        if debug and getattr(proc, 'stderr', None) is not None:
            self._tasks.append(loop.create_task(printer(proc.stderr)))

    def running(self):
        """Return True while the wrapped process has no return code yet."""
        return self.proc.returncode is None


class PlayerctlCli:
    """Helper for invoking the ``playerctl`` command line asynchronously.

    Attributes:
        bus_address: If truthy, exported to the child process as
            ``DBUS_SESSION_BUS_ADDRESS``.
        debug: If truthy, ``G_MESSAGES_DEBUG=playerctl`` is exported to the
            child process.
        proc: Initialised to ``None``; not assigned by any method of this
            class.
    """

    def __init__(self, bus_address=None, debug=False):
        self.bus_address = bus_address
        self.debug = debug
        self.proc = None

    async def _start(self, cmd):
        """Spawn ``playerctl <cmd>`` through the shell with piped output.

        Args:
            cmd: Argument string appended verbatim after ``playerctl``.
                NOTE: it is interpolated into a shell command line without
                escaping, so callers are responsible for quoting (``list``
                builds its string with ``shlex.join``).

        Returns:
            The asyncio subprocess, with stdout and stderr both piped.
        """
        # Work on a copy so the parent's environment is never modified.
        env = os.environ.copy()
        if self.bus_address:
            env['DBUS_SESSION_BUS_ADDRESS'] = self.bus_address
        if self.debug:
            env['G_MESSAGES_DEBUG'] = 'playerctl'

        return await asyncio.create_subprocess_shell(
            f'playerctl {cmd}',
            env=env,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)

    async def start(self, cmd):
        """Spawn ``playerctl <cmd>`` and return it wrapped, still running.

        Returns:
            A ``PlayerctlProcess`` wrapping the spawned process.
        """
        proc = await self._start(cmd)
        return PlayerctlProcess(proc)

    async def run(self, cmd):
        """Run ``playerctl <cmd>`` to completion and capture its output.

        Returns:
            A ``CommandResult`` built from the captured stdout, stderr and
            the process's return code.
        """
        proc = await self._start(cmd)
        # communicate() reads both pipes to EOF and waits for the process
        # to terminate, so returncode is set once it returns.
        stdout, stderr = await proc.communicate()
        return CommandResult(stdout, stderr, proc.returncode)

    async def list(self, players=None, ignored=None):
        """Run ``playerctl --list-all`` and return its output lines.

        Args:
            players: Optional iterable of names; when non-empty they are
                comma-joined and passed via ``--player``.
            ignored: Optional iterable of names; when non-empty they are
                comma-joined and passed via ``--ignored-players``.

        Returns:
            The command's stripped stdout split into a list of lines.

        Raises:
            AssertionError: If the command exits with a non-zero status;
                the message is the command's stderr.
        """
        args = ['--list-all']

        if players:
            args.extend(['--player', ','.join(players)])

        if ignored:
            args.extend(['--ignored-players', ','.join(ignored)])

        cmd = await self.run(join(args))
        # Raised explicitly (rather than via an ``assert`` statement) so the
        # check still happens when Python runs with -O.
        if cmd.returncode != 0:
            raise AssertionError(cmd.stderr)
        return cmd.stdout.splitlines()