1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
"""
The low-level implementation of termios module
note that this module should only be imported when
termios module is there
"""
import termios
from rpython.rtyper.lltypesystem import rffi
from rpython.rtyper.lltypesystem import lltype
from rpython.rtyper.extfunc import lazy_register, register_external
from rpython.rlib.rarithmetic import intmask
from rpython.rtyper.extregistry import ExtRegistryEntry
from rpython.annotator import model as annmodel
from rpython.rtyper import rclass
from rpython.rlib import rtermios, rposix
from rpython.rtyper.tool import rffi_platform
from rpython.translator.tool.cbuild import ExternalCompilationInfo
eci = ExternalCompilationInfo(
includes = ['termios.h', 'unistd.h']
)
class CConfig:
_compilation_info_ = eci
NCCS = rffi_platform.DefinedConstantInteger('NCCS')
_HAVE_STRUCT_TERMIOS_C_ISPEED = rffi_platform.Defined(
'_HAVE_STRUCT_TERMIOS_C_ISPEED')
_HAVE_STRUCT_TERMIOS_C_OSPEED = rffi_platform.Defined(
'_HAVE_STRUCT_TERMIOS_C_OSPEED')
c_config = rffi_platform.configure(CConfig)
NCCS = c_config['NCCS']
TCFLAG_T = rffi.UINT
CC_T = rffi.UCHAR
SPEED_T = rffi.UINT
INT = rffi.INT
_add = []
if c_config['_HAVE_STRUCT_TERMIOS_C_ISPEED']:
_add.append(('c_ispeed', SPEED_T))
if c_config['_HAVE_STRUCT_TERMIOS_C_OSPEED']:
_add.append(('c_ospeed', SPEED_T))
TERMIOSP = rffi.CStructPtr('termios', ('c_iflag', TCFLAG_T), ('c_oflag', TCFLAG_T),
('c_cflag', TCFLAG_T), ('c_lflag', TCFLAG_T),
('c_line', CC_T),
('c_cc', lltype.FixedSizeArray(CC_T, NCCS)), *_add)
def c_external(name, args, result):
return rffi.llexternal(name, args, result, compilation_info=eci)
c_tcsetattr = c_external('tcsetattr', [INT, INT, TERMIOSP], INT)
c_cfgetispeed = c_external('cfgetispeed', [TERMIOSP], SPEED_T)
c_cfgetospeed = c_external('cfgetospeed', [TERMIOSP], SPEED_T)
c_cfsetispeed = c_external('cfsetispeed', [TERMIOSP, SPEED_T], INT)
c_cfsetospeed = c_external('cfsetospeed', [TERMIOSP, SPEED_T], INT)
c_tcsendbreak = c_external('tcsendbreak', [INT, INT], INT)
c_tcdrain = c_external('tcdrain', [INT], INT)
c_tcflush = c_external('tcflush', [INT, INT], INT)
c_tcflow = c_external('tcflow', [INT, INT], INT)
c_tcgetattr = c_external('tcgetattr', [INT, TERMIOSP], INT)
def tcgetattr_llimpl(fd):
c_struct = lltype.malloc(TERMIOSP.TO, flavor='raw')
try:
if c_tcgetattr(fd, c_struct) < 0:
raise OSError(rposix.get_errno(), 'tcgetattr failed')
cc = [chr(c_struct.c_c_cc[i]) for i in range(NCCS)]
ispeed = c_cfgetispeed(c_struct)
ospeed = c_cfgetospeed(c_struct)
result = (intmask(c_struct.c_c_iflag), intmask(c_struct.c_c_oflag),
intmask(c_struct.c_c_cflag), intmask(c_struct.c_c_lflag),
intmask(ispeed), intmask(ospeed), cc)
return result
finally:
lltype.free(c_struct, flavor='raw')
register_external(rtermios.tcgetattr, [int], (int, int, int, int, int, int, [str]),
llimpl=tcgetattr_llimpl, export_name='termios.tcgetattr')
def tcsetattr_llimpl(fd, when, attributes):
c_struct = lltype.malloc(TERMIOSP.TO, flavor='raw')
try:
c_struct.c_c_iflag = r_uint(attributes[0])
c_struct.c_c_oflag = r_uint(attributes[1])
c_struct.c_c_cflag = r_uint(attributes[2])
c_struct.c_c_lflag = r_uint(attributes[3])
ispeed = r_uint(attributes[4])
ospeed = r_uint(attributes[5])
cc = attributes[6]
for i in range(NCCS):
c_struct.c_c_cc[i] = rffi.r_uchar(ord(cc[i][0]))
if c_cfsetispeed(c_struct, ispeed) < 0:
raise OSError(rposix.get_errno(), 'tcsetattr failed')
if c_cfsetospeed(c_struct, ospeed) < 0:
raise OSError(rposix.get_errno(), 'tcsetattr failed')
if c_tcsetattr(fd, when, c_struct) < 0:
raise OSError(rposix.get_errno(), 'tcsetattr failed')
finally:
lltype.free(c_struct, flavor='raw')
r_uint = rffi.r_uint
register_external(rtermios.tcsetattr, [int, int, (int, int, int,
int, int, int, [str])], llimpl=tcsetattr_llimpl,
export_name='termios.tcsetattr')
# a bit C-c C-v code follows...
def tcsendbreak_llimpl(fd, duration):
if c_tcsendbreak(fd, duration):
raise OSError(rposix.get_errno(), 'tcsendbreak failed')
register_external(termios.tcsendbreak, [int, int],
llimpl=tcsendbreak_llimpl,
export_name='termios.tcsendbreak')
def tcdrain_llimpl(fd):
if c_tcdrain(fd) < 0:
raise OSError(rposix.get_errno(), 'tcdrain failed')
register_external(termios.tcdrain, [int], llimpl=tcdrain_llimpl,
export_name='termios.tcdrain')
def tcflush_llimpl(fd, queue_selector):
if c_tcflush(fd, queue_selector) < 0:
raise OSError(rposix.get_errno(), 'tcflush failed')
register_external(termios.tcflush, [int, int], llimpl=tcflush_llimpl,
export_name='termios.tcflush')
def tcflow_llimpl(fd, action):
if c_tcflow(fd, action) < 0:
raise OSError(rposix.get_errno(), 'tcflow failed')
register_external(termios.tcflow, [int, int], llimpl=tcflow_llimpl,
export_name='termios.tcflow')
|