File: inspect_plwm.py

package info (click to toggle)
python-plwm 2.6a%2B20080530-2
  • links: PTS
  • area: main
  • in suites: buster, stretch
  • size: 1,048 kB
  • ctags: 1,781
  • sloc: python: 7,574; ansic: 1,075; xml: 109; makefile: 83
file content (122 lines) | stat: -rwxr-xr-x 3,806 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python
#
# inspect_plwm.py -- PLWM inspect client
#
#    Copyright (C) 2001  Peter Liljenberg <petli@ctrl-c.liu.se>
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

import sys
import os

from Xlib import display, rdb
import readline
import socket
import struct
import string

class InspectError(Exception): pass

class Inspect:
    def __init__(self, disp):

        # Get property containing inspect port and cookie

        self.PLWM_INSPECT_SERVER = disp.intern_atom('_PLWM_INSPECT_SERVER')
        p = disp.screen().root.get_property(self.PLWM_INSPECT_SERVER,
                                            self.PLWM_INSPECT_SERVER,
                                            0, 2)
        if not p or p.format != 32 or len(p.value) != 2:
            raise InspectError('valid _PLWM_INSPECT_SERVER property not found')

        port = int(p.value[0])
        cookie = int(p.value[1])

        # Connect to the same host as the display
        host = string.split(disp.get_display_name(), ':')[0]
        if host == '':
            host = '127.0.0.1'

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((host, port))

        self.recv_buf = ''

        # Send cookie, little-endian coded
        self.send_raw(struct.pack('>l', cookie))

        # Recieve and print welcome message
        sys.stdout.write(self.recv())

    def send_raw(self, data):
        while data:
            n = self.socket.send(data)
            data = data[n:]

    def send(self, data):
        self.send_raw(struct.pack('>l', len(data)) + data)

    def recv(self):
        length = None
        while length is None or len(self.recv_buf) < length:
            d = self.socket.recv(1000)
            if not d:
                raise InspectError('connection closed by server')
            self.recv_buf = self.recv_buf + d

            if length is None:
                if len(self.recv_buf) < 4:
                    continue
                length = struct.unpack('>l', self.recv_buf[:4])[0]
                self.recv_buf = self.recv_buf[4:]

        d = self.recv_buf[:length]
        self.recv_buf = self.recv_buf[length:]

        return d

    def loop(self):
        try:
            while 1:
                expr = raw_input('>>> ')
                if expr:

                    # If first character of expr is a space,
                    # then this is a multiline statement.
                    # Read more lines until we get an empty one

                    if expr[0] == ' ':
                        lines = [expr[1:]]

                        while 1:
                            d = raw_input('... ')
                            lines.append(d[1:])
                            if not d:
                                break

                        expr = string.join(lines, '\n')

                    self.send(expr)
                    sys.stdout.write(self.recv())
        except EOFError:
            self.socket.close()


def main():
    d, name, db, argv = rdb.get_display_opts(rdb.stdopts)
    Inspect(d).loop()

if __name__ == '__main__':
    main()