File: interp_win32_py3.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (45 lines) | stat: -rw-r--r-- 1,683 bytes parent folder | download | duplicates (9)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from rpython.rtyper.lltypesystem import rffi
from rpython.rlib._rsocket_rffi import socketclose, geterrno, socketrecv, send
from rpython.rlib import rwin32
from pypy.interpreter.error import OperationError
from pypy.interpreter.gateway import unwrap_spec

def getWindowsError(space):
    """Build (but do not raise) a WindowsError OperationError.

    The error code is taken from geterrno() and its message from
    rwin32.FormatErrorW().  The exception value is the wrapped tuple
    (code, message, None, code).
    """
    code = geterrno()
    w_code = space.newint(code)
    text_args = rwin32.FormatErrorW(code)
    w_exc_args = space.newtuple([
        w_code,
        space.newtext(*text_args),
        space.w_None,
        w_code,
    ])
    return OperationError(space.w_WindowsError, w_exc_args)

@unwrap_spec(handle=int)
def multiprocessing_closesocket(space, handle):
    """Close the socket `handle` via socketclose().

    Raises the error built by getWindowsError() when socketclose()
    returns a non-zero result.
    """
    if socketclose(handle) == 0:
        return
    raise getWindowsError(space)

@unwrap_spec(handle=int, buffersize=int)
def multiprocessing_recv(space, handle, buffersize):
    """Receive up to `buffersize` bytes from socket `handle`.

    Returns the received data as a wrapped bytes object.  Raises the
    error built by getWindowsError() when socketrecv() returns a
    negative value.
    """
    w_received = None
    with rffi.scoped_alloc_buffer(buffersize) as scratch:
        count = socketrecv(handle, scratch.raw, buffersize, 0)
        if count >= 0:
            w_received = space.newbytes(scratch.str(count))
    # The error is built only after the scratch buffer scope has been left.
    if w_received is None:
        raise getWindowsError(space)
    return w_received

@unwrap_spec(handle=int, data='bufferstr')
def multiprocessing_send(space, handle, data):
    """Send `data` over socket `handle`.

    Returns the (non-negative) result of send() as a wrapped int.
    Raises ValueError if `data` is None, and the error built by
    getWindowsError() when send() returns a negative value.
    """
    if data is None:
        # The exception value must be a wrapped object, not a raw
        # interp-level string, so wrap the message with space.newtext().
        raise OperationError(space.w_ValueError,
                             space.newtext('data cannot be None'))
    with rffi.scoped_nonmovingbuffer(data) as dataptr:
        # Calls the raw `send` directly.  The original note here says that
        # rsocket checks socket writability with wait_for_data.
        # NOTE(review): its trailing "cpython does check" may have been
        # meant as "does not check" -- confirm.
        res = send(handle, dataptr, len(data), 0)
        if res < 0:
            raise getWindowsError(space)
    return space.newint(res)

def handle_w(space, w_handle):
    """Unwrap `w_handle` with space.int_w() and cast it to rwin32.HANDLE."""
    raw_value = space.int_w(w_handle)
    return rffi.cast(rwin32.HANDLE, raw_value)

# Binding created with rwin32.winexternal for the external function named
# 'GetTickCount': it is declared with an empty argument list and a result
# type of rwin32.DWORD.
_GetTickCount = rwin32.winexternal(
    'GetTickCount', [], rwin32.DWORD)