File: multinode_server.in

package info (click to toggle)
morse-simulator 1.4-8
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 187,116 kB
  • sloc: ansic: 108,311; python: 25,694; cpp: 786; makefile: 126; xml: 34; sh: 7
file content (121 lines) | stat: -rwxr-xr-x 4,379 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#! @PYTHON_EXECUTABLE@
""" Simulation Manager that coordinates multiple MORSE nodes.

TEST
====

from pymorse.stream import StreamJSON, PollThread
node_stream = StreamJSON('localhost', 65000)
poll_thread = PollThread()
poll_thread.start()
node_stream.publish(['node1', {'robot1': [[0, 0, 0], [0, 0, 0]]}])
node_stream.publish(['node1', {'robot2': [[0, 0, 1], [1, 0, 0]]}])
node_stream.publish(['node1', {'robot3': [[0, 1, 0], [0, 1, 0]]}])
node_stream.last()
node_stream.close()
poll_thread.syncstop()
"""

import socket
import logging
import asyncore

# initialize the logger
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter( logging.Formatter('[%(asctime)s][%(name)s][%(levelname)s] %(message)s') )
logger.addHandler( handler )
logger.setLevel(logging.INFO)

from pymorse.stream import StreamJSON, PollThread

class MorseMultinode(asyncore.dispatcher):
    def __init__(self, host='0.0.0.0', port=65000):
        logger.debug("Starting Morse Multinode on %s:%i" % (str(host), port))
        asyncore.dispatcher.__init__(self)
        #self.nodes = {}
        self.robots = {}
        self.create_socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)

    def handle_accepted(self, sock, addr):
        logger.info("Incoming connection from %s" % repr(addr))
        #self.nodes[addr] = MorseNode(sock, self)
        MorseNode(sock, self)

def check_pose(rot, loc):
    inf = float('inf')
    return len(rot) == 3 and len(loc) == 3 and \
           [-inf < val < +inf for val in rot] == \
           [-inf < val < +inf for val in loc] == \
           [True, True, True]

class MorseNode(object):
    def __init__(self, sock, master):
        self._stream = StreamJSON(sock=sock)
        self._master = master
        self._stream.subscribe(self.on_message)

    def on_message(self, message):
        client_name   = message[0]
        client_robots = message[1]
        try:
            self.handle_robots(client_name, client_robots)
        except Exception as e:
            logger.warning("error while processing data from %s" % client_name)
        logger.debug("%s"%str(message))

    def handle_robots(self, client_name, client_robots):
        # Build/update the list of robots from
        # the data received from all the clients
        # XXX The special key __time is not a robot, but some specific
        # time information. Process them differently
        for robot_name, robot_position in client_robots.items():
            if type(robot_name) is str and robot_name == '__time':
                simulated_time, dt, real_time = robot_position
                if '__time' in self._master.robots:
                    # Here, we are supposed to test that messages are
                    # well-ordered, but, it does not seem to work really
                    # correctly for now, at least for two Blender local
                    # nodes
                    # current_time = self._master.robots['__time']
                    # if simulated_time + dt < current_time[0]:
                    #     logger.warning("%s seems to be late in the federation (last %f, received %f)" \
                    #             % (client_name, current_time[0], simulated_time))
                    pass
                self._master.robots[robot_name] = robot_position
            elif type(robot_name) is str and check_pose(*robot_position):
                self._master.robots[robot_name] = robot_position
            else:
                logger.info("received unexpected robot data, discarding.")

        # (self._master.robots - client_robots.keys())
        # do not send back the data we just received
        data = self._master.robots.copy()
        for robot in client_robots.keys():
            if robot is not '__time':
                del data[robot] # faster than: data.pop(robot)
        self._stream.publish(data)

def main(argv):
    if '-d' in argv[1:]:
        logger.setLevel(logging.DEBUG)

    serv = MorseMultinode()

    try:
        asyncore.loop(timeout=0.01)
    except KeyboardInterrupt:
        logger.info("Quit (Ctrl+C)")
    finally:
        logger.info("Closing all connections")
        asyncore.close_all()

    logger.info("Bye!")
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main(sys.argv))