File: jack_bases.py

package info (click to toggle)
raysession 0.17.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 19,168 kB
  • sloc: python: 44,371; sh: 1,538; makefile: 208; xml: 86
file content (82 lines) | stat: -rw-r--r-- 2,387 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from enum import Enum, auto
from queue import Queue
import time
from typing import Iterator, TypeAlias, Optional

from .port_data import PortData


PatchEventArg: TypeAlias = str | PortData | tuple[str, str] | tuple[int, str, str]


class PatchEngineOuterMissing(Exception):
    def __init__(self) -> None:
        super().__init__(f'No PatchEngineOuter set for this PatchEngine')


class PatchEvent(Enum):
    CLIENT_ADDED = auto()
    CLIENT_REMOVED = auto()
    PORT_ADDED = auto()
    PORT_REMOVED = auto()
    PORT_RENAMED = auto()
    CONNECTION_ADDED = auto()
    CONNECTION_REMOVED = auto()
    CONNECTION_FAILED = auto()
    METADATA_CHANGED = auto()
    SHUTDOWN = auto()
    XRUN = auto()
    BLOCKSIZE_CHANGED = auto()
    SAMPLERATE_CHANGED = auto()


class PatchEventQueue(Queue[tuple[PatchEvent, PatchEventArg]]):
    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize)
        self.oldies_queue = Queue()
        
    def add(self, *args):
        nargs = len(args)
        if nargs > 2:
            event, *rest = args
        elif nargs == 2:
            event, rest = args
        elif nargs == 1:
            event = args[0]
            rest = None
        else:
            raise TypeError
        self.put((event, rest))
        self.oldies_queue.put((event, rest, time.time()))
        
    def __iter__(self) -> Iterator[tuple[PatchEvent, PatchEventArg]]:
        while self.qsize():
            event, event_arg = self.get()
            yield (event, event_arg)
    
    def oldies(self, required_time: float = 0.200) \
            -> Iterator[tuple[PatchEvent, PatchEventArg]]:
        '''Iter only events older than required_time'''
        while self.oldies_queue.qsize():
            if time.time() - self.oldies_queue.queue[0][2] < required_time:
                break

            event, event_arg, time_ = self.oldies_queue.get()
            yield (event, event_arg)


class ClientNamesUuids(dict[str, int]):
    def __init__(self):
        super().__init__()
        self._revd = dict[int, str]()
        
    def __setitem__(self, key: str, value: int):
        super().__setitem__(key, value)
        self._revd[value] = key
        
    def __delitem__(self, key):
        self._revd.pop(self[key])
        super().__delitem__(key)
    
    def name_from_uuid(self, uuid: int) -> Optional[str]:
        return self._revd.get(uuid)