File: network.py

package info (click to toggle)
python-werkzeug 3.1.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,912 kB
  • sloc: python: 21,938; javascript: 292; makefile: 39; sh: 17; xml: 16
file content (124 lines) | stat: -rw-r--r-- 3,795 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""Query the servers for information."""
import socket
from datetime import datetime
from datetime import timezone
from math import log

from .utils import unicodecmp


class ServerError(Exception):
    """Signal that a game server could not be queried.

    Raised by ``Server.__init__`` when the initial sync does not succeed.
    """


class Syncable:
    """Mixin that turns a subclass's ``_sync()`` into a safe ``sync()``.

    Subclasses implement ``_sync()``; any ``OSError`` it raises (which
    includes socket timeouts) is treated as a failed sync rather than
    propagated.
    """

    # Naive UTC datetime of the most recent successful sync, or None if no
    # sync has succeeded yet.
    last_sync = None

    def sync(self):
        """Run ``_sync()`` and record when it last succeeded.

        Returns:
            True if ``_sync()`` completed, False if it raised ``OSError``.
            ``last_sync`` is only updated on success.
        """
        try:
            self._sync()
        except OSError:
            # socket.timeout is a subclass of OSError, so it is covered here.
            return False
        # datetime.utcnow() is deprecated; this yields the same naive UTC
        # value so existing comparisons against naive datetimes keep working.
        self.last_sync = datetime.now(timezone.utc).replace(tzinfo=None)
        return True


class ServerBrowser(Syncable):
    """Keep ``self.servers`` in step with what the master servers report.

    ``self.servers`` maps ``"ip:port"`` identifiers to ``Server`` objects
    and is obtained from ``cup.db`` under the key ``"servers"``.
    """

    def __init__(self, cup):
        self.cup = cup
        # NOTE(review): ``dict`` is passed uncalled; presumably
        # ``cup.db.setdefault`` expects a factory -- confirm against the db
        # implementation before changing this.
        self.servers = cup.db.setdefault("servers", dict)

    def _sync(self):
        """Refresh the server table from master1..master16.

        Servers known before the refresh that were not successfully synced
        via any master are removed afterwards.

        Raises:
            OSError: if no servers remain after the refresh.
        """
        to_delete = set(self.servers)
        for x in range(1, 17):
            addr = (f"master{x}.teeworlds.com", 8300)
            print(addr)
            try:
                self._sync_server_browser(addr, to_delete)
            except OSError:
                # An unreachable or slow master is skipped; socket timeouts
                # are OSError subclasses.
                continue
        for server_id in to_delete:
            self.servers.pop(server_id, None)
        if not self.servers:
            raise OSError("no servers found")
        self.cup.db.sync()

    def _sync_server_browser(self, addr, to_delete):
        """Ask one master for its server list and sync every listed server.

        Args:
            addr: ``(host, port)`` of the master server.
            to_delete: set of server ids still considered stale; ids of
                servers that sync successfully are discarded from it.

        Raises:
            OSError: if the master cannot be reached or does not answer
                within the timeout.
        """
        # The context manager closes the socket even when recvfrom times out.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(5)
            s.sendto(b"\x20\x00\x00\x00\x00\x48\xff\xff\xff\xffreqt", addr)
            # The first 14 bytes of the reply are skipped.
            data = s.recvfrom(1024)[0][14:]

        for server_addr in self._parse_addresses(data):
            server_id = f"{server_addr[0]}:{server_addr[1]}"
            if server_id in self.servers:
                if not self.servers[server_id].sync():
                    # Leave it in to_delete so the caller drops it.
                    continue
            else:
                try:
                    self.servers[server_id] = Server(server_addr, server_id)
                except ServerError:
                    # A server that could not be queried is not added.
                    continue
            to_delete.discard(server_id)

    @staticmethod
    def _parse_addresses(data):
        """Decode consecutive 6-byte records into ``(ip, port)`` tuples.

        Each record is four address bytes followed by the port with its low
        byte first. Trailing bytes that do not fill a record are ignored.
        Indexing or iterating ``bytes`` already yields ints on Python 3, so
        no ``ord()`` call is needed (it would raise ``TypeError``).
        """
        addresses = []
        for n in range(len(data) // 6):
            record = data[n * 6 : n * 6 + 6]
            ip = ".".join(map(str, record[:4]))
            port = record[5] * 256 + record[4]
            addresses.append((ip, port))
        return addresses


class Server(Syncable):
    """A single game server and the players reported on it.

    Constructing an instance performs an initial sync.

    Raises:
        ServerError: from the constructor, if the initial sync fails.
    """

    def __init__(self, addr, server_id):
        """Store the address and id, then query the server once.

        Args:
            addr: address passed to ``socket.sendto`` when querying.
            server_id: identifier stored as ``self.id``.
        """
        self.addr = addr
        self.id = server_id
        self.players = []
        if not self.sync():
            raise ServerError("server not responding in time")

    def _sync(self):
        """Query the server and update its metadata and player list.

        The reply is parsed completely before any attribute is touched, so a
        bad reply leaves the previous state intact.

        Raises:
            OSError: on socket errors or timeouts, and when the reply is
                truncated or malformed. The latter used to escape as
                IndexError/ValueError, which ``sync()`` does not handle.
        """
        # The context manager closes the socket even when recvfrom times out.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(1)
            s.sendto(b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xffgief", self.addr)
            payload = s.recvfrom(1024)[0]
        # The first 14 bytes are skipped; the rest is NUL-separated fields.
        bits = payload[14:].split(b"\x00")

        try:
            version, server_name, map_name, gametype = bits[:4]
            flags, progression, player_count, max_players = map(int, bits[4:8])
            # Players follow as alternating name / score fields.
            reported = [
                (bits[8 + i * 2].decode("latin1"), int(bits[9 + i * 2]))
                for i in range(player_count)
            ]
        except (IndexError, ValueError) as exc:
            raise OSError("malformed server info response") from exc

        # NOTE(review): version and gametype stay as bytes while name and map
        # are decoded; kept as-is because callers may rely on it.
        self.version = version
        self.name = server_name.decode("latin1")
        self.map = map_name.decode("latin1")
        self.gametype = gametype
        self.flags = flags
        self.progression = progression
        self.max_players = max_players

        # sync the player stats
        known = {p.name: p for p in self.players}
        for name, score in reported:
            player = known.pop(name, None)
            if player is None:
                # add new player
                self.players.append(Player(self, name, score))
            else:
                # update existing player
                player.score = score

        # delete players that left (whatever is still in ``known``); the list
        # is filtered in place so other references to it stay valid
        if known:
            departed = {id(p) for p in known.values()}
            self.players[:] = [p for p in self.players if id(p) not in departed]

        # sort the player list and count them
        self.players.sort(key=lambda x: -x.score)
        self.player_count = len(self.players)

    # NOTE(review): Python 3 never calls __cmp__, so this does not make
    # Server instances orderable; kept for any caller that invokes it directly.
    def __cmp__(self, other):
        return unicodecmp(self.name, other.name)


class Player:
    """A player on a server, with a display size derived from the score.

    Args:
        server: the object this player belongs to (stored as ``server``).
        name: the player's name.
        score: the player's current score.
    """

    def __init__(self, server, name, score):
        self.server = server
        self.name = name
        self.score = score

    @property
    def size(self):
        """Return ``100 + 25 * ln(score)`` rounded to two decimals.

        Computed from the current ``score`` on each access, so it stays
        correct when ``score`` is reassigned after construction. Scores
        below 1 are clamped to 1, giving the minimum size of 100.
        """
        return round(100 + log(max(self.score, 1)) * 25, 2)