File: EspGridHelper.py

package info (click to toggle)
python-renardo-lib 0.9.12-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,220 kB
  • sloc: python: 10,999; sh: 34; makefile: 7
file content (118 lines) | stat: -rw-r--r-- 3,483 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from renardo_lib.ServerManager import BidirectionalOSCServer, OSCMessage

class EspGrid:
    """Thin OSC client for talking to an EspGrid process.

    Every public method either sends a single OSC message to the address
    given at construction time, or sends a query (``.../q``) and then
    returns whatever ``self.server.receive`` yields for the matching reply
    address (``.../r``).
    """

    def __init__(self, address=("localhost", 5510), clock=None):
        """Connect to EspGrid and register the ``/reset`` handler.

        address -- (host, port) tuple the OSC server connects to.
        clock   -- optional object exposing ``reset()``; it is invoked when
                   a ``/reset`` message arrives. May be ``None``.
        """
        self.server_address = address
        self.metro = clock

        self.server = BidirectionalOSCServer(("0.0.0.0", 0))
        self.server.connect(self.server_address)

        self.server.addMsgHandler('/reset', self._reset_clock)

    def __repr__(self):
        return self.__class__.__name__

    # Internal helpers

    def _send(self, address, *values):
        """ Builds an OSCMessage for `address`, appends each of `values`
            with one `append` call per value, and sends it. Returns the
            result of `self.server.send`. """
        message = OSCMessage(address)
        for value in values:
            message.append(value)
        return self.server.send(message)

    def _request(self, query_address, reply_address):
        """ Sends an empty message to `query_address` and returns what
            `self.server.receive(reply_address)` gives back. """
        self.server.send(OSCMessage(query_address))
        return self.server.receive(reply_address)

    def _reset_clock(self, *args):
        """ Handler for incoming `/reset` messages: resets the attached
            clock. The handler arguments are ignored. """
        # `clock` defaults to None in __init__, so there may be nothing
        # to reset; don't raise inside the message handler in that case.
        if self.metro is not None:
            self.metro.reset()
        return

    # Clock mode

    def set_clock_mode(self, value):
        """ Sends `value` (coerced to int) to /esp/clockMode/s """
        return self._send("/esp/clockMode/s", int(value))

    def get_clock_mode(self):
        """ Queries /esp/clockMode/q and returns the /esp/clockMode/r reply """
        return self._request("/esp/clockMode/q", "/esp/clockMode/r")

    # Retrieve tempo information

    def query(self):
        """ Queries /esp/clock/q and returns the /esp/clock/r reply """
        return self._request("/esp/clock/q", "/esp/clock/r")

    def get_tempo(self):
        """ Queries the EspGrid Server for tempo and return a list:
            [on, bpm, seconds, nanoseconds, beat] """
        return self._request("/esp/tempo/q", "/esp/tempo/r")

    def get_tempo_cpu(self):
        """ Queries /esp/tempoCPU/q and returns the /esp/tempoCPU/r reply """
        return self._request("/esp/tempoCPU/q", "/esp/tempoCPU/r")

    # Setting tempo data

    def toggle_tempo(self, flag):
        """ Sends `flag` to /esp/beat/on (see start_tempo / stop_tempo) """
        return self._send("/esp/beat/on", flag)

    def start_tempo(self):
        """ Shortcut for toggle_tempo(1) """
        return self.toggle_tempo(1)

    def stop_tempo(self):
        """ Shortcut for toggle_tempo(0) """
        return self.toggle_tempo(0)

    def set_tempo(self, value):
        """ Sends `value` (coerced to float) to /esp/beat/tempo """
        return self._send("/esp/beat/tempo", float(value))

    def get_start_time(self):
        """ Returns the seconds and nanoseconds fields of `get_tempo()`
            (items 2 and 3) combined into a single float of seconds. """
        data = self.get_tempo()
        sec, nano = int(data[2]), int(data[3])
        # Joining the two fields as "sec.nano" text loses the leading zeros
        # of the nanosecond part (5 ns would be read as 0.5 s), so combine
        # them arithmetically. Integer operands keep the division exact up
        # to the final rounding to float.
        return (sec * 10 ** 9 + nano) / 10 ** 9

    # Subscription to immediate messages

    def subscribe(self):
        """ Sends an empty message to /esp/subscribe """
        return self._send("/esp/subscribe")

    def unsubscribe(self):
        """ Sends an empty message to /esp/unsubscribe """
        return self._send("/esp/unsubscribe")

    # Identification

    def set_name(self, name):
        """ Sends `name` (coerced to str) to /esp/person/s """
        return self._send("/esp/person/s", str(name))

    def get_name(self):
        """ Queries /esp/person/q and returns the /esp/person/r reply """
        return self._request("/esp/person/q", "/esp/person/r")

    def set_machine_name(self, name):
        """ Sends `name` (coerced to str) to /esp/machine/s """
        return self._send("/esp/machine/s", str(name))

    def get_machine_name(self):
        """ Queries /esp/machine/q and returns the /esp/machine/r reply """
        return self._request("/esp/machine/q", "/esp/machine/r")

    # Sending messages

    def now(self, args):
        """ Sends `args` to /esp/msg/now """
        return self._send("/esp/msg/now", args)

    def soon(self, args):
        """ Sends `args` to /esp/msg/soon """
        return self._send("/esp/msg/soon", args)

    def future(self, seconds, nanoseconds, args):
        """ Sends [seconds, nanoseconds, args] to /esp/msg/future, appended
            to the message as one list in a single `append` call. """
        return self._send("/esp/msg/future", [seconds, nanoseconds, args])

    # Receiving

    def recv(self, pattern):
        """ Returns `self.server.receive(pattern)` """
        return self.server.receive(pattern)