# EspGrid client: sends OSC messages to an EspGrid server and reads its replies.
from renardo_lib.ServerManager import BidirectionalOSCServer, OSCMessage
class EspGrid:
    """Client for an EspGrid server reached over OSC.

    Every public method either sends a single OSC message to the server at
    ``address`` or sends a query (``.../q``) and returns whatever
    ``self.server.receive`` yields for the matching reply address
    (``.../r``).

    Args:
        address: ``(host, port)`` of the EspGrid server to connect to.
        clock: Optional object exposing ``reset()``; it is reset whenever a
            ``/reset`` message is handled. May be ``None``.
    """

    def __init__(self, address=("localhost", 5510), clock=None):
        self.server_address = address
        self.metro = clock
        # NOTE(review): port 0 presumably lets the OS choose the local port;
        # confirm against BidirectionalOSCServer.
        self.server = BidirectionalOSCServer(("0.0.0.0", 0))
        self.server.connect(self.server_address)
        self.server.addMsgHandler('/reset', self._reset_clock)

    def __repr__(self):
        return self.__class__.__name__

    # Internal helpers

    def _send(self, osc_address, *values):
        """Build an OSCMessage for ``osc_address``, append each value, send it."""
        message = OSCMessage(osc_address)
        for value in values:
            message.append(value)
        return self.server.send(message)

    def _request(self, query_address, reply_address):
        """Send an empty query message and return the reply received for it."""
        self.server.send(OSCMessage(query_address))
        return self.server.receive(reply_address)

    def _reset_clock(self, *args):
        """Handler for ``/reset``: reset the attached clock, if there is one."""
        # ``clock`` defaults to None in __init__, so guard before calling it.
        if self.metro is not None:
            self.metro.reset()
        return

    # Clock mode

    def set_clock_mode(self, value):
        """Send ``value`` (coerced to int) to ``/esp/clockMode/s``."""
        return self._send("/esp/clockMode/s", int(value))

    def get_clock_mode(self):
        """Query ``/esp/clockMode/q`` and return the ``/esp/clockMode/r`` reply."""
        return self._request("/esp/clockMode/q", "/esp/clockMode/r")

    # Retrieve tempo information

    def query(self):
        """Query ``/esp/clock/q`` and return the ``/esp/clock/r`` reply."""
        return self._request("/esp/clock/q", "/esp/clock/r")

    def get_tempo(self):
        """ Queries the EspGrid Server for tempo and return a list:
            [on, bpm, seconds, nanoseconds, beat] """
        return self._request("/esp/tempo/q", "/esp/tempo/r")

    def get_tempo_cpu(self):
        """Query ``/esp/tempoCPU/q`` and return the ``/esp/tempoCPU/r`` reply."""
        return self._request("/esp/tempoCPU/q", "/esp/tempoCPU/r")

    # Setting tempo data

    def toggle_tempo(self, flag):
        """Send ``flag`` unchanged to ``/esp/beat/on``."""
        return self._send("/esp/beat/on", flag)

    def start_tempo(self):
        """Send ``1`` to ``/esp/beat/on``."""
        return self.toggle_tempo(1)

    def stop_tempo(self):
        """Send ``0`` to ``/esp/beat/on``."""
        return self.toggle_tempo(0)

    def set_tempo(self, value):
        """Send ``value`` (coerced to float) to ``/esp/beat/tempo``."""
        return self._send("/esp/beat/tempo", float(value))

    def get_start_time(self):
        """Return the tempo start time as float seconds.

        Combines the ``seconds`` and ``nanoseconds`` fields (indices 2 and 3)
        of the list returned by :meth:`get_tempo`.
        """
        data = self.get_tempo()
        sec, nano = data[2], data[3]
        # Nanoseconds must be zero-padded to nine digits: without padding
        # sec=1, nano=5000 would read as "1.5000" rather than 1.000005.
        return float("{}.{:09d}".format(int(sec), int(nano)))

    # Subscription to immediate messages

    def subscribe(self):
        """Send an empty message to ``/esp/subscribe``."""
        return self._send("/esp/subscribe")

    def unsubscribe(self):
        """Send an empty message to ``/esp/unsubscribe``."""
        return self._send("/esp/unsubscribe")

    # Identification

    def set_name(self, name):
        """Send ``str(name)`` to ``/esp/person/s``."""
        return self._send("/esp/person/s", str(name))

    def get_name(self):
        """Query ``/esp/person/q`` and return the ``/esp/person/r`` reply."""
        return self._request("/esp/person/q", "/esp/person/r")

    def set_machine_name(self, name):
        """Send ``str(name)`` to ``/esp/machine/s``."""
        return self._send("/esp/machine/s", str(name))

    def get_machine_name(self):
        """Query ``/esp/machine/q`` and return the ``/esp/machine/r`` reply."""
        return self._request("/esp/machine/q", "/esp/machine/r")

    # Sending messages

    def now(self, args):
        """Send ``args`` to ``/esp/msg/now``."""
        return self._send("/esp/msg/now", args)

    def soon(self, args):
        """Send ``args`` to ``/esp/msg/soon``."""
        return self._send("/esp/msg/soon", args)

    def future(self, seconds, nanoseconds, args):
        """Send ``[seconds, nanoseconds, args]`` to ``/esp/msg/future``."""
        # The three values are appended as one list, as the original code did.
        return self._send("/esp/msg/future", [seconds, nanoseconds, args])

    # Receiving

    def recv(self, pattern):
        """Return whatever ``self.server.receive`` yields for ``pattern``."""
        return self.server.receive(pattern)
|