1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
#!/usr/bin/env python3
import os
import socket
import struct
# Object ids used on the wire.
# wl_display is the bootstrap object; the Wayland protocol fixes its id at 1.
WL_DISPLAY = 1
# Id this client assigns to the registry it requests in get_interfaces().
WL_REGISTRY = 2
# Id this client assigns to the object created by the sync request; the first
# message addressed to it ends the listing in get_interfaces().
WL_SYNC_DONE = 3
# Request opcodes sent to wl_display (client to compositor).
WL_DISPLAY_SYNC = 0
WL_DISPLAY_GET_REGISTRY = 1
# Event opcodes received from the registry (compositor to client).
WL_REGISTRY_GLOBAL = 0
class ArgString:
    """Decoder for a string argument in the Wayland wire format."""

    @staticmethod
    def parse(data):
        """Decode one string argument from the start of *data*.

        The argument starts with a 32-bit length in native byte order.  The
        length counts the final (terminator) byte, which is dropped from the
        returned text, and the content is followed by padding up to the next
        4-byte boundary.

        Args:
            data: Bytes-like object beginning at the string's length field.

        Returns:
            A ``(consumed, text)`` tuple: the number of bytes the argument
            occupies including length field and padding, and the decoded text.

        Raises:
            struct.error: If *data* holds fewer than 4 bytes.
            UnicodeDecodeError: If the content is not valid UTF-8.
        """
        (size,) = struct.unpack_from('=I', data)
        # The last of the `size` bytes is the terminator; a zero length yields
        # an empty slice and therefore an empty string.
        text = data[4:4 + size - 1]
        padding = (4 - (size % 4)) % 4
        return 4 + size + padding, text.decode()
class ArgRegistryGlobal:
    """Decoder for the argument payload of a registry "global" event."""

    @staticmethod
    def parse(data):
        """Decode the ``(id, interface, version)`` triple from *data*.

        The payload is a 32-bit id, a string argument (see ArgString.parse)
        and a 32-bit version, all in native byte order.

        Args:
            data: Bytes-like argument payload, without the message header.

        Returns:
            A ``(global_id, interface, version)`` tuple.

        Raises:
            struct.error: If *data* is too short for either integer field.
        """
        (global_id,) = struct.unpack_from('=I', data)
        consumed, interface = ArgString.parse(data[4:])
        # The version follows directly after the padded string.
        (version,) = struct.unpack_from('=I', data, 4 + consumed)
        return global_id, interface, version
class Wayland:
    """Minimal Wayland client that lists the globals a compositor announces.

    The constructor connects to the compositor socket; get_interfaces() then
    performs one registry round trip and closes the connection, so an
    instance is good for a single query.
    """

    def __init__(self, wl_socket, log=False):
        """Connect to the compositor listening on *wl_socket*.

        Args:
            wl_socket: Path of the compositor's Unix socket.
            log: When true, print progress lines to stdout.

        Raises:
            OSError: If the connection cannot be established.
        """
        if log:
            print()
            print(f' Connecting to {wl_socket}')
        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
        try:
            self.socket.connect(wl_socket)
        except OSError:
            # Do not leak the descriptor when the compositor is unreachable.
            self.socket.close()
            raise
        self.name = self._peer_name()
        if log:
            print(f" Connected to {self.name}\n")

    def _peer_name(self):
        """Return the peer process name read from /proc, or 'Unknown'."""
        try:
            creds = self.socket.getsockopt(
                socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize('3i')
            )
            pid, _uid, _gid = struct.unpack('3i', creds)
            with open(f'/proc/{pid}/comm', 'r') as f:
                return f.read().strip()
        except (AttributeError, OSError, struct.error, ValueError):
            # Best effort only: SO_PEERCRED may be missing on this platform
            # (AttributeError), the lookup or /proc read may fail (OSError),
            # or the result may be malformed (struct.error / decode errors).
            return 'Unknown'

    def wire(self, obj_id, opcode, data=b''):
        """Send one message: an 8-byte header followed by *data*.

        The second header word carries the total message size in its upper
        16 bits and the opcode in its lower 16 bits.
        """
        size = 8 + len(data)
        sizeop = size << 16 | opcode
        # sendall() retries until the whole message is written; send() may
        # stop after a partial write.
        self.socket.sendall(struct.pack('=II', obj_id, sizeop) + data)

    def wire_arg_uint32(self, to_obj_id, opcode, arg):
        """Send a message whose only argument is one 32-bit unsigned int."""
        self.wire(to_obj_id, opcode, struct.pack('=I', arg))

    def parse_msg(self, obj_id, opcode, arg_data, interfaces):
        """Record a registry "global" event in *interfaces*.

        Returns:
            True if the message was such an event (and *interfaces* was
            updated with ``interface -> version``), False otherwise.
        """
        if obj_id == WL_REGISTRY and opcode == WL_REGISTRY_GLOBAL:
            _, interface, version = ArgRegistryGlobal.parse(arg_data)
            interfaces[interface] = version
            return True
        return False

    def _drain_until_sync(self, interfaces):
        """Read messages into *interfaces* until the sync object is addressed.

        Returns:
            True once a message for WL_SYNC_DONE arrives, False if the peer
            closes the connection first or sends a malformed header.
        """
        pending = b''
        while True:
            chunk = self.socket.recv(4096)
            if not chunk:
                return False
            pending += chunk
            while len(pending) >= 8:
                obj_id, sizeop = struct.unpack_from('=II', pending)
                size = sizeop >> 16
                op = sizeop & 0xffff
                if size < 8:
                    # A message can never be shorter than its own header;
                    # slicing by such a size would desynchronise the stream
                    # (and a size of 0 would loop forever).
                    return False
                if len(pending) < size:
                    # Incomplete message: wait for more data.
                    break
                arg_data = pending[8:size]
                if obj_id == WL_DISPLAY:
                    # Ignore error and delete_id events
                    pass
                elif obj_id == WL_SYNC_DONE:
                    # All interfaces have been announced
                    return True
                elif not self.parse_msg(obj_id, op, arg_data, interfaces):
                    print(f"Unknown message received: obj_id {obj_id} op {op}")
                pending = pending[size:]

    def _close(self):
        """Shut down and close the socket, tolerating a dead connection."""
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except OSError:
            # The peer may already have dropped the connection.
            pass
        finally:
            self.socket.close()

    def get_interfaces(self):
        """Query the compositor's globals and close the connection.

        Sends a get_registry request followed by a sync request, then reads
        events until the sync object is addressed.

        Returns:
            Dict mapping interface name to advertised version.  If the
            connection ends or breaks early, an error line is printed and
            whatever was collected so far is returned.
        """
        interfaces = {}
        try:
            self.wire_arg_uint32(WL_DISPLAY, WL_DISPLAY_GET_REGISTRY, WL_REGISTRY)
            self.wire_arg_uint32(WL_DISPLAY, WL_DISPLAY_SYNC, WL_SYNC_DONE)
            if not self._drain_until_sync(interfaces):
                wl_socket = os.path.basename(self.socket.getpeername())
                print(f"error in wayland communication with {self.name} @ {wl_socket}\n")
        finally:
            # Always release the socket, including when send/recv raises.
            self._close()
        return interfaces
if __name__ == '__main__':
    # Usage: pass socket names (relative to XDG_RUNTIME_DIR) as arguments, or
    # none to probe wayland-0, wayland-1, ... automatically.  With a single
    # socket the interface list is printed; with several, the interfaces each
    # compositor lacks compared to the others are printed.
    import sys

    runtime_dir = os.getenv('XDG_RUNTIME_DIR')
    if not runtime_dir:
        print("XDG_RUNTIME_DIR not set", file=sys.stderr)
        # sys.exit() rather than the site-provided exit(), which is absent
        # when Python runs without the site module.
        sys.exit(1)

    def find_wl_sockets(sockets):
        """Append 'wayland-N' to *sockets* for N = 0, 1, ... while they exist."""
        x = 0
        while True:
            candidate = f'wayland-{x}'
            try:
                os.stat(os.path.join(runtime_dir, candidate))
            except FileNotFoundError:
                break
            sockets.append(candidate)
            x += 1

    compositors = {}
    wl_sockets = sys.argv[1:]
    if not wl_sockets:
        find_wl_sockets(wl_sockets)

    single = len(wl_sockets) == 1
    for wl_socket in wl_sockets:
        wl = Wayland(os.path.join(runtime_dir, wl_socket), log=single)
        if single:
            print(" {:<45s} {:>2}".format("Interface", "Version"))
            for name, version in sorted(wl.get_interfaces().items()):
                print(" {:<45s} {:>2}".format(name, version))
            print()
            sys.exit(0)
        compositors[wl_socket] = (wl.name, wl.get_interfaces())

    # Union of every (interface, version) pair seen on any compositor.
    all_interfaces = set()
    for _, interfaces in compositors.values():
        all_interfaces |= set(interfaces.items())

    for compositor, (compositor_name, interfaces) in compositors.items():
        # A pair counts as missing unless this compositor already offers the
        # same interface at a higher version.
        missing = {
            (name, version)
            for name, version in all_interfaces - set(interfaces.items())
            if interfaces.get(name, 0) <= version
        }
        if missing:
            print()
            print(f"\x1b[1m Protocols missing from {compositor_name} @ {compositor}\x1b[m")
            for name, version in sorted(missing):
                own_version = interfaces.get(name, 0)
                print(" {:<45s} {:>2} {}".format(
                    name, version,
                    f'(has version {own_version})' if own_version else '',
                ))
    # NOTE(review): the source's indentation was lost; this trailing blank
    # line is assumed to follow the whole report -- confirm.
    print()
|