File: interp_termios.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (130 lines) | stat: -rw-r--r-- 5,100 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130

""" Termios module. I'm implementing it directly here, as I see
little use for a termios module at the RPython level by itself
"""

from pypy.interpreter.gateway import unwrap_spec
from pypy.interpreter.error import oefmt, wrap_oserror, exception_from_saved_errno
from rpython.rlib import rtermios
from rpython.rlib import rposix
from rpython.rtyper.lltypesystem import lltype, rffi
from rpython.rlib.rarithmetic import r_uint, widen

class Cache:
    """Holder for the app-level ``termios.error`` exception class.

    Instances are obtained through ``space.fromcache(Cache)``.
    """

    def __init__(self, space):
        # The exception class is created through the space and kept on
        # the instance as ``w_error``.
        error_class = space.new_exception_class("termios.error")
        self.w_error = error_class

def convert_error(space, error):
    """Wrap the OSError `error` using the termios.error exception class.

    The wrapped error is returned, not raised; callers do the raising.
    """
    return wrap_oserror(space, error,
                        w_exception_class=space.fromcache(Cache).w_error)

@unwrap_spec(when=int)
def tcsetattr(space, w_fd, when, w_attributes):
    """Set the tty attributes of the file descriptor behind w_fd.

    w_attributes must be a list of exactly seven items:
    [iflag, oflag, cflag, lflag, ispeed, ospeed, cc].  Any other shape
    raises TypeError.  An OSError from rtermios.tcsetattr is re-raised
    as termios.error.
    """
    fd = space.c_filedescriptor_w(w_fd)
    is_seven_item_list = (space.isinstance_w(w_attributes, space.w_list) and
                          space.len_w(w_attributes) == 7)
    if not is_seven_item_list:
        raise oefmt(space.w_TypeError,
                    "tcsetattr, arg 3: must be 7 element list")
    (w_iflag, w_oflag, w_cflag, w_lflag,
     w_ispeed, w_ospeed, w_cc) = space.unpackiterable(w_attributes,
                                                      expected_length=7)
    # Control characters are collected first; the integer fields are
    # only unwrapped afterwards.
    control_chars = []
    for w_char in space.unpackiterable(w_cc):
        if space.isinstance_w(w_char, space.w_int):
            # An integer entry is converted by calling bytes([n]).
            w_char = space.call_function(space.w_bytes,
                                         space.newlist([w_char]))
        control_chars.append(space.bytes_w(w_char))
    iflag = space.int_w(w_iflag)
    oflag = space.int_w(w_oflag)
    cflag = space.int_w(w_cflag)
    lflag = space.int_w(w_lflag)
    ispeed = space.int_w(w_ispeed)
    ospeed = space.int_w(w_ospeed)
    attributes = (iflag, oflag, cflag, lflag, ispeed, ospeed, control_chars)
    try:
        rtermios.tcsetattr(fd, when, attributes)
    except OSError as os_err:
        raise convert_error(space, os_err)

def tcgetattr(space, w_fd):
    """Return the tty attributes of the file descriptor behind w_fd.

    The result is the list [iflag, oflag, cflag, lflag, ispeed, ospeed, cc].
    The first six items are integers and cc is a list of bytes objects.
    In non-canonical mode (ICANON cleared in lflag) the VMIN and VTIME
    entries of cc are integers instead.

    An OSError from rtermios.tcgetattr is re-raised as termios.error.
    """
    fd = space.c_filedescriptor_w(w_fd)
    try:
        tup = rtermios.tcgetattr(fd)
    except OSError as e:
        raise convert_error(space, e)
    iflag, oflag, cflag, lflag, ispeed, ospeed, cc = tup
    l_w = [space.newint(i) for i in [iflag, oflag, cflag, lflag, ispeed, ospeed]]
    # The last item needs to be built carefully.
    cc_w = [space.newbytes(i) for i in cc]
    # MIN and TIME are only meaningful in non-canonical mode, and on some
    # systems they share slots with EOF/EOL.  So, like CPython, convert
    # them to integers only when ICANON is NOT set; in canonical mode
    # every entry stays a bytes object.
    if not (lflag & rtermios.ICANON):
        cc_w[rtermios.VMIN] = space.newint(ord(cc[rtermios.VMIN][0]))
        cc_w[rtermios.VTIME] = space.newint(ord(cc[rtermios.VTIME][0]))
    w_cc = space.newlist(cc_w)
    l_w.append(w_cc)
    return space.newlist(l_w)

@unwrap_spec(duration=int)
def tcsendbreak(space, w_fd, duration):
    """Call rtermios.tcsendbreak on the descriptor behind w_fd.

    `duration` is passed through unchanged.  An OSError from the call is
    re-raised as termios.error.
    """
    fileno = space.c_filedescriptor_w(w_fd)
    try:
        rtermios.tcsendbreak(fileno, duration)
    except OSError as os_err:
        raise convert_error(space, os_err)

def tcdrain(space, w_fd):
    """Call rtermios.tcdrain on the descriptor behind w_fd.

    An OSError from the call is re-raised as termios.error.
    """
    fileno = space.c_filedescriptor_w(w_fd)
    try:
        rtermios.tcdrain(fileno)
    except OSError as os_err:
        raise convert_error(space, os_err)

@unwrap_spec(queue=int)
def tcflush(space, w_fd, queue):
    """Call rtermios.tcflush on the descriptor behind w_fd.

    `queue` is passed through unchanged.  An OSError from the call is
    re-raised as termios.error.
    """
    fileno = space.c_filedescriptor_w(w_fd)
    try:
        rtermios.tcflush(fileno, queue)
    except OSError as os_err:
        raise convert_error(space, os_err)

@unwrap_spec(action=int)
def tcflow(space, w_fd, action):
    """Call rtermios.tcflow on the descriptor behind w_fd.

    `action` is passed through unchanged.  An OSError from the call is
    re-raised as termios.error.
    """
    fileno = space.c_filedescriptor_w(w_fd)
    try:
        rtermios.tcflow(fileno, action)
    except OSError as os_err:
        raise convert_error(space, os_err)

@unwrap_spec(fd=int)
def tcgetwinsize(space, fd):
    """Return the tty window size of `fd` as a (rows, columns) tuple.

    The size is read with the TIOCGWINSZ ioctl.  A failing ioctl raises
    termios.error built from the saved errno.  NotImplementedError is
    raised when rtermios.TIOCGWINSZ is not available.
    """
    # Deal with the unsupported configurations first.
    if not rtermios.TIOCGWINSZ:
        if rtermios.TIOCGSIZE:
            raise oefmt(space.w_NotImplementedError, "TIOCGSIZE not supported")
        raise oefmt(space.w_NotImplementedError, "requires termios.TIOCGWINSZ")
    with lltype.scoped_alloc(rposix.WINSIZE) as ws:
        if rposix.c_ioctl_voidp(fd, rtermios.TIOCGWINSZ, ws):
            raise exception_from_saved_errno(space,
                                             space.fromcache(Cache).w_error)
        w_columns = space.newint(r_uint(ws.c_ws_col))
        w_rows = space.newint(r_uint(ws.c_ws_row))
        return space.newtuple([w_rows, w_columns])
         
@unwrap_spec(fd=int)
def tcsetwinsize(space, fd, w_winsz):
    """Set the tty window size of `fd` from the pair (rows, columns).

    w_winsz must contain exactly two integers.

    Raises TypeError when w_winsz does not have exactly two items.
    Raises OverflowError when a value does not survive the round trip
    through an unsigned short.  Raises termios.error (from the saved
    errno) when either ioctl fails.  Raises NotImplementedError when
    TIOCGWINSZ/TIOCSWINSZ are not available.
    """
    winsz_w = space.listview(w_winsz)
    # Check the length before indexing; CPython rejects anything that is
    # not a two-item sequence with the same TypeError.
    if len(winsz_w) != 2:
        raise oefmt(space.w_TypeError,
                    "tcsetwinsize, arg 2: must be a two-item sequence")
    rows = space.int_w(winsz_w[0])
    cols = space.int_w(winsz_w[1])
    if rtermios.TIOCGWINSZ and rtermios.TIOCSWINSZ:
        with lltype.scoped_alloc(rposix.WINSIZE) as winsize:
            # Fetch the current settings first; only the row and column
            # fields are overwritten below.  Use the rtermios constant
            # that the guard above checked (as tcgetwinsize does).
            failed = rposix.c_ioctl_voidp(fd, rtermios.TIOCGWINSZ, winsize)
            if failed:
                w_error = space.fromcache(Cache).w_error
                raise exception_from_saved_errno(space, w_error)
            winsize.c_ws_row = rffi.cast(rffi.USHORT, rows)
            winsize.c_ws_col = rffi.cast(rffi.USHORT, cols)
            # The cast truncates silently; compare against the original
            # values to detect numbers that do not fit.
            if (widen(winsize.c_ws_row) != rows or
                widen(winsize.c_ws_col) != cols):
                raise oefmt(space.w_OverflowError, "winsize value(s) out of range")
            failed = rposix.c_ioctl_voidp(fd, rtermios.TIOCSWINSZ, winsize)
            if failed:
                w_error = space.fromcache(Cache).w_error
                raise exception_from_saved_errno(space, w_error)
    elif rtermios.TIOCSSIZE:
        raise oefmt(space.w_NotImplementedError, "TIOCSSIZE not supported")
    else:
        raise oefmt(space.w_NotImplementedError, "requires termios.TIOCGWINSZ, termios.TIOCSWINSZ")