File: ll_termios.py

package info (click to toggle)
pypy 2.4.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 86,992 kB
  • ctags: 170,715
  • sloc: python: 1,030,417; ansic: 43,437; cpp: 5,241; asm: 5,169; sh: 458; makefile: 408; xml: 231; lisp: 45
file content (135 lines) | stat: -rw-r--r-- 5,289 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135

"""
Low-level implementation of the termios module.

Note that this module should only be imported when the
termios module is available.
"""

import termios
from rpython.rtyper.lltypesystem import rffi
from rpython.rtyper.lltypesystem import lltype
from rpython.rtyper.extfunc import lazy_register, register_external
from rpython.rlib.rarithmetic import intmask
from rpython.rtyper.extregistry import ExtRegistryEntry
from rpython.annotator import model as annmodel
from rpython.rtyper import rclass
from rpython.rlib import rtermios, rposix
from rpython.rtyper.tool import rffi_platform
from rpython.translator.tool.cbuild import ExternalCompilationInfo

# Compilation info shared by the platform probes and every C external
# declared in this module: the headers that must be included.
eci = ExternalCompilationInfo(includes=['termios.h', 'unistd.h'])

class CConfig:
    """Platform probes handed to rffi_platform.configure().

    Each entry is looked up using the headers listed in `eci`.
    """
    _compilation_info_ = eci

    # Integer value of the NCCS macro; used below as the length of the
    # c_cc array in the termios struct.
    NCCS = rffi_platform.DefinedConstantInteger('NCCS')

    # Whether the platform headers define these macros; they decide if the
    # c_ispeed / c_ospeed fields are added to the struct declaration.
    _HAVE_STRUCT_TERMIOS_C_ISPEED = rffi_platform.Defined(
        '_HAVE_STRUCT_TERMIOS_C_ISPEED')
    _HAVE_STRUCT_TERMIOS_C_OSPEED = rffi_platform.Defined(
        '_HAVE_STRUCT_TERMIOS_C_OSPEED')

# Evaluate the probes declared in CConfig and keep the NCCS result handy.
c_config = rffi_platform.configure(CConfig)
NCCS = c_config['NCCS']

# Low-level integer types used for the termios struct fields and for the
# signatures of the C externals declared further down.
INT = rffi.INT
CC_T = rffi.UCHAR
TCFLAG_T = rffi.UINT
SPEED_T = rffi.UINT

# Optional speed fields: each one is declared only when c_config reports
# the corresponding _HAVE_STRUCT_TERMIOS_* macro.
_add = []
if c_config['_HAVE_STRUCT_TERMIOS_C_ISPEED']:
    _add += [('c_ispeed', SPEED_T)]
if c_config['_HAVE_STRUCT_TERMIOS_C_OSPEED']:
    _add += [('c_ospeed', SPEED_T)]

# Pointer type to the C `termios` struct as seen from RPython.
TERMIOSP = rffi.CStructPtr(
    'termios',
    ('c_iflag', TCFLAG_T),
    ('c_oflag', TCFLAG_T),
    ('c_cflag', TCFLAG_T),
    ('c_lflag', TCFLAG_T),
    ('c_line', CC_T),
    ('c_cc', lltype.FixedSizeArray(CC_T, NCCS)),
    *_add
)

def c_external(name, args, result):
    """Declare the C function `name` through rffi.llexternal.

    `args` is the list of low-level argument types and `result` the
    low-level return type.  The declaration always uses this module's
    `eci` as its compilation info.
    """
    external = rffi.llexternal(name, args, result, compilation_info=eci)
    return external

# Reading and writing the whole termios struct.
c_tcgetattr = c_external('tcgetattr', [INT, TERMIOSP], INT)
c_tcsetattr = c_external('tcsetattr', [INT, INT, TERMIOSP], INT)

# Speed accessors operating on a termios struct.
c_cfgetispeed = c_external('cfgetispeed', [TERMIOSP], SPEED_T)
c_cfgetospeed = c_external('cfgetospeed', [TERMIOSP], SPEED_T)
c_cfsetispeed = c_external('cfsetispeed', [TERMIOSP, SPEED_T], INT)
c_cfsetospeed = c_external('cfsetospeed', [TERMIOSP, SPEED_T], INT)

# Calls that take only a file descriptor and, for most, one int argument.
c_tcsendbreak = c_external('tcsendbreak', [INT, INT], INT)
c_tcdrain = c_external('tcdrain', [INT], INT)
c_tcflush = c_external('tcflush', [INT, INT], INT)
c_tcflow = c_external('tcflow', [INT, INT], INT)

def tcgetattr_llimpl(fd):
    """Read the terminal attributes of `fd` with tcgetattr().

    Returns the 7-tuple
    (iflag, oflag, cflag, lflag, ispeed, ospeed, cc), where the first six
    items are ints and `cc` is a list of NCCS one-character strings taken
    from the struct's c_cc array.

    Raises OSError carrying rposix.get_errno() when tcgetattr() returns a
    negative value.
    """
    termios_buf = lltype.malloc(TERMIOSP.TO, flavor='raw')
    try:
        status = c_tcgetattr(fd, termios_buf)
        if status < 0:
            raise OSError(rposix.get_errno(), 'tcgetattr failed')

        control_chars = []
        for index in range(NCCS):
            control_chars.append(chr(termios_buf.c_c_cc[index]))

        in_speed = c_cfgetispeed(termios_buf)
        out_speed = c_cfgetospeed(termios_buf)

        return (
            intmask(termios_buf.c_c_iflag),
            intmask(termios_buf.c_c_oflag),
            intmask(termios_buf.c_c_cflag),
            intmask(termios_buf.c_c_lflag),
            intmask(in_speed),
            intmask(out_speed),
            control_chars,
        )
    finally:
        # The struct is raw-allocated, so it must be released on every path.
        lltype.free(termios_buf, flavor='raw')


register_external(
    rtermios.tcgetattr,
    [int],
    (int, int, int, int, int, int, [str]),
    llimpl=tcgetattr_llimpl,
    export_name='termios.tcgetattr'
)

# Defined ahead of tcsetattr_llimpl so the name is visibly in scope where
# it is used.
r_uint = rffi.r_uint


def tcsetattr_llimpl(fd, when, attributes):
    """Apply `attributes` to the terminal behind `fd` with tcsetattr().

    `when` is passed through unchanged to tcsetattr().  `attributes` is
    the 7-tuple (iflag, oflag, cflag, lflag, ispeed, ospeed, cc); `cc`
    must provide at least NCCS non-empty strings, of which only the first
    character of each is used.

    Raises OSError carrying rposix.get_errno() when tcgetattr(),
    cfsetispeed(), cfsetospeed() or tcsetattr() returns a negative value.
    """
    c_struct = lltype.malloc(TERMIOSP.TO, flavor='raw')
    try:
        # The raw allocation is uninitialized and only the fields listed
        # below are filled in.  Load the current settings first so that
        # every other field of the C struct (for example c_line) is handed
        # back to tcsetattr() unchanged rather than as garbage.  CPython's
        # termios module does the same.
        if c_tcgetattr(fd, c_struct) < 0:
            raise OSError(rposix.get_errno(), 'tcsetattr failed')
        c_struct.c_c_iflag = r_uint(attributes[0])
        c_struct.c_c_oflag = r_uint(attributes[1])
        c_struct.c_c_cflag = r_uint(attributes[2])
        c_struct.c_c_lflag = r_uint(attributes[3])
        ispeed = r_uint(attributes[4])
        ospeed = r_uint(attributes[5])
        cc = attributes[6]
        for i in range(NCCS):
            c_struct.c_c_cc[i] = rffi.r_uchar(ord(cc[i][0]))
        # The speeds are stored through the cfset* helpers instead of
        # writing struct fields directly.
        if c_cfsetispeed(c_struct, ispeed) < 0:
            raise OSError(rposix.get_errno(), 'tcsetattr failed')
        if c_cfsetospeed(c_struct, ospeed) < 0:
            raise OSError(rposix.get_errno(), 'tcsetattr failed')
        if c_tcsetattr(fd, when, c_struct) < 0:
            raise OSError(rposix.get_errno(), 'tcsetattr failed')
    finally:
        # The struct is raw-allocated, so it must be released on every path.
        lltype.free(c_struct, flavor='raw')


register_external(rtermios.tcsetattr, [int, int, (int, int, int,
                  int, int, int, [str])], llimpl=tcsetattr_llimpl,
                  export_name='termios.tcsetattr')

# The wrappers below are near-identical copy-and-paste (C-c C-v) code.

def tcsendbreak_llimpl(fd, duration):
    """Call tcsendbreak(fd, duration).

    Raises OSError carrying rposix.get_errno() when the C call returns a
    negative value.
    """
    # Test for a negative result, like every other wrapper in this module,
    # instead of treating any non-zero value as failure.
    if c_tcsendbreak(fd, duration) < 0:
        raise OSError(rposix.get_errno(), 'tcsendbreak failed')
register_external(termios.tcsendbreak, [int, int],
                  llimpl=tcsendbreak_llimpl,
                  export_name='termios.tcsendbreak')

def tcdrain_llimpl(fd):
    """Call tcdrain(fd).

    Raises OSError carrying rposix.get_errno() when the C call returns a
    negative value.
    """
    status = c_tcdrain(fd)
    if status < 0:
        raise OSError(rposix.get_errno(), 'tcdrain failed')
register_external(
    termios.tcdrain,
    [int],
    llimpl=tcdrain_llimpl,
    export_name='termios.tcdrain'
)

def tcflush_llimpl(fd, queue_selector):
    """Call tcflush(fd, queue_selector).

    Raises OSError carrying rposix.get_errno() when the C call returns a
    negative value.
    """
    status = c_tcflush(fd, queue_selector)
    if status < 0:
        raise OSError(rposix.get_errno(), 'tcflush failed')
register_external(
    termios.tcflush,
    [int, int],
    llimpl=tcflush_llimpl,
    export_name='termios.tcflush'
)

def tcflow_llimpl(fd, action):
    """Call tcflow(fd, action).

    Raises OSError carrying rposix.get_errno() when the C call returns a
    negative value.
    """
    status = c_tcflow(fd, action)
    if status < 0:
        raise OSError(rposix.get_errno(), 'tcflow failed')
register_external(
    termios.tcflow,
    [int, int],
    llimpl=tcflow_llimpl,
    export_name='termios.tcflow'
)