File: rsandbox.py

package info (click to toggle)
pypy 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 107,216 kB
  • sloc: python: 1,201,787; ansic: 62,419; asm: 5,169; cpp: 3,017; sh: 2,534; makefile: 545; xml: 243; lisp: 45; awk: 4
file content (176 lines) | stat: -rw-r--r-- 6,067 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""Generation of sandboxing stand-alone executable from RPython code.
In place of real calls to any external function, this code builds
trampolines that marshal their input arguments, dump them to STDOUT,
and wait for an answer on STDIN.  Enable with 'translate.py --sandbox'.
"""
import py

from rpython.rlib import rmarshal, types
from rpython.rlib.signature import signature

# ____________________________________________________________
#
# Sandboxing code generator for external functions
#

from rpython.rlib import rposix
from rpython.rtyper.lltypesystem import lltype, rffi
from rpython.rtyper.llannotation import lltype_to_annotation
from rpython.rtyper.annlowlevel import MixLevelHelperAnnotator
from rpython.tool.ansi_print import AnsiLogger

# Logger tagged "sandbox"; make_stub() uses it to warn about every external
# function that gets replaced by an always-raising stub.
log = AnsiLogger("sandbox")


# Versions of os.read() and os.write() that are not mangled by the
# sandboxing mechanism: both are declared with sandboxsafe=True.  They are
# what the trampolines themselves use to talk to the outside (see
# writeall_not_sandboxed() and FdLoader.need_more_data()).
#
# Callers in this file pass arguments that were already converted with
# rffi.cast(), and cast the SIZE_T result to lltype.Signed before testing
# it against 0.
ll_read_not_sandboxed = rposix.external('read',
                                        [rffi.INT, rffi.CCHARP, rffi.SIZE_T],
                                        rffi.SIZE_T,
                                        sandboxsafe=True,
                                        _nowrapper=True)

ll_write_not_sandboxed = rposix.external('write',
                                         [rffi.INT, rffi.CCHARP, rffi.SIZE_T],
                                         rffi.SIZE_T,
                                         sandboxsafe=True,
                                         _nowrapper=True)


@signature(types.int(), types.ptr(rffi.CCHARP.TO), types.int(),
    returns=types.none())
def writeall_not_sandboxed(fd, buf, length):
    """Write 'length' bytes starting at the raw pointer 'buf' to 'fd'.

    The non-sandboxed write() is called in a loop, moving forward in the
    buffer after every call, until everything has been written.  Raises
    IOError as soon as one call reports a count <= 0.
    """
    c_fd = rffi.cast(rffi.INT, fd)
    while length > 0:
        remaining = rffi.cast(rffi.SIZE_T, length)
        written = ll_write_not_sandboxed(c_fd, buf, remaining)
        written = rffi.cast(lltype.Signed, written)
        if written <= 0:
            raise IOError
        length = length - written
        # step the pointer past the bytes that were just written
        items = lltype.direct_arrayitems(buf)
        buf = rffi.cast(rffi.CCHARP, lltype.direct_ptradd(items, written))


class FdLoader(rmarshal.Loader):
    """An rmarshal Loader whose data comes from a file descriptor, read
    with the non-sandboxed read()."""

    def __init__(self, fd):
        # start from an empty string (presumably the initial buffer
        # contents -- confirm against rmarshal.Loader)
        rmarshal.Loader.__init__(self, "")
        self.buflen = 4096
        self.fd = fd

    def need_more_data(self):
        """Read up to self.buflen bytes from self.fd and append them to
        self.buf.  Raises IOError if read() reports a count <= 0."""
        capacity = self.buflen
        with lltype.scoped_alloc(rffi.CCHARP.TO, capacity) as raw:
            c_size = rffi.cast(rffi.SIZE_T, capacity)
            c_fd = rffi.cast(rffi.INT, self.fd)
            nread = rffi.cast(lltype.Signed,
                              ll_read_not_sandboxed(c_fd, raw, c_size))
            if nread <= 0:
                raise IOError
            # copy the bytes out before the raw buffer goes out of scope
            chunk = ''.join([raw[i] for i in range(nread)])
            self.buf += chunk
            # ask for twice as much on the next refill
            self.buflen *= 2

def sandboxed_io(buf):
    """Send the marshalled request 'buf' to STDOUT and read the answer's
    status from STDIN.

    Returns the FdLoader positioned after the error code when that code
    is 0, so that the caller can decode the actual result from it.  A
    non-zero code is turned into an exception by reraise_error().
    """
    STDOUT = 1
    STDIN = 0
    size = len(buf)
    # copy the marshalled fnname and input arguments into a raw buffer
    # and push all of it to STDOUT
    with lltype.scoped_alloc(rffi.CCHARP.TO, size) as raw:
        for i in range(size):
            raw[i] = buf[i]
        writeall_not_sandboxed(STDOUT, raw, size)
    # the answer comes back on STDIN; it starts with an error code
    loader = FdLoader(STDIN)
    error = load_int(loader)
    if error == 0:
        # no exception; the caller will decode the actual result
        return loader
    reraise_error(error, loader)

def reraise_error(error, loader):
    """Raise the exception that corresponds to the error code 'error'.

    Code 1 is an OSError whose first argument is an extra integer read
    from 'loader'; codes 2 to 8 map to fixed exception classes; any other
    code raises RuntimeError.  This function never returns normally.
    """
    if error == 1:
        errno = load_int(loader)
        raise OSError(errno, "external error")
    if error == 2:
        raise IOError
    if error == 3:
        raise OverflowError
    if error == 4:
        raise ValueError
    if error == 5:
        raise ZeroDivisionError
    if error == 6:
        raise MemoryError
    if error == 7:
        raise KeyError
    if error == 8:
        raise IndexError
    # unknown error code
    raise RuntimeError


@signature(types.str(), returns=types.impossible())
def not_implemented_stub(msg):
    """Write 'msg' followed by a newline to STDERR, then raise
    RuntimeError.  Never returns normally."""
    STDERR = 2
    line = msg + '\n'
    with rffi.scoped_str2charp(line) as buf:
        writeall_not_sandboxed(STDERR, buf, len(line))
    raise RuntimeError(msg)  # XXX in RPython, the msg is ignored

def make_stub(fnname, msg):
    """Return a function named 'sandboxed_<fnname>' that always raises.

    It stands in for an external function that is not supported: 'msg' is
    logged as a warning now, and reported again by not_implemented_stub()
    every time the returned function is called.
    """
    log.WARNING(msg)

    def execute(*args):
        # the arguments are ignored: just report and raise
        not_implemented_stub(msg)

    stub_name = 'sandboxed_%s' % (fnname,)
    execute.__name__ = stub_name
    return execute

def sig_ll(fnobj):
    """Return (args_s, s_result): the annotations corresponding to the
    low-level argument types and result type of 'fnobj'."""
    functype = lltype.typeOf(fnobj)
    args_s = []
    for argtype in functype.ARGS:
        args_s.append(lltype_to_annotation(argtype))
    return args_s, lltype_to_annotation(functype.RESULT)

# Shared (un)marshalling helpers: dump_string() marshals the external
# function's name at the start of every request, and load_int() reads the
# integer error code at the start of every answer (and the extra integer
# passed to OSError, see reraise_error()).
dump_string = rmarshal.get_marshaller(str)
load_int = rmarshal.get_loader(int)

def get_sandbox_stub(fnobj, rtyper):
    """Return the annotated graph of an always-raising stub that has the
    same low-level signature as the external function 'fnobj'."""
    fnname = fnobj._name
    args_s, s_result = sig_ll(fnobj)
    stub = make_stub(
        fnname,
        "Not implemented: sandboxing for external function '%s'" % (fnname,))
    return _annotate(rtyper, stub, args_s, s_result)

def make_sandbox_trampoline(fnname, args_s, s_result):
    """Return a function to use in place of the external function 'fnname'.

    The returned trampoline marshals the function name and its input
    arguments, dumps them to STDOUT, and waits for an answer on STDIN,
    from which it decodes the result.  If the arguments or the result
    cannot be (un)marshalled, an always-raising stub is returned instead.
    """
    try:
        marshal_args = rmarshal.get_marshaller(tuple(args_s))
        unmarshal_result = rmarshal.get_loader(s_result)
    except (rmarshal.CannotMarshal, rmarshal.CannotUnmarshall) as e:
        # this signature is not supported: fall back to a stub
        return make_stub(fnname,
                         "Cannot sandbox function '%s': %s" % (fnname, e))

    def execute(*args):
        # request = marshalled function name + marshalled arguments
        request = []
        dump_string(request, fnname)
        marshal_args(request, args)
        # send it and wait for the answer
        reply = sandboxed_io(request)
        # decode the result, then check that the loader is finished
        result = unmarshal_result(reply)
        reply.check_finished()
        return result

    execute.__name__ = 'sandboxed_%s' % (fnname,)
    return execute


def _annotate(rtyper, f, args_s, s_result):
    """Get the graph of 'f' for the signature (args_s, s_result) from a
    MixLevelHelperAnnotator, finish the annotator, and return the graph."""
    annotator = MixLevelHelperAnnotator(rtyper)
    graph = annotator.getgraph(f, args_s, s_result)
    annotator.finish()
    return graph