File: loader.py

package info (click to toggle)
python-peachpy 0.0~git20211013.257881e-1.1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 2,452 kB
  • sloc: python: 29,286; ansic: 54; makefile: 44; cpp: 31
file content (153 lines) | stat: -rw-r--r-- 6,811 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# This file is part of PeachPy package and is licensed under the Simplified BSD license.
#    See license.rst for the full text of the license.

import sys


class Loader:
    """Owner of an executable code segment and an optional data segment.

    On construction the loader allocates a read/write/execute memory region
    for code and, when ``data_size`` is positive, a read/write region for
    data, using the native allocator of the host OS (``mmap`` on macOS, Linux
    and FreeBSD, ``VirtualAlloc`` on Windows, ``dynacl`` on Native Client).
    The regions are released again when the object is finalized.

    Attributes:
        allocation_granularity: unit to which segment sizes are rounded up.
        code_address: integer address of the code segment, or None.
        code_size: size of the code segment in bytes (rounded up).
        data_address: integer address of the data segment, or None when no
            data segment was requested.
        data_size: size of the data segment in bytes (rounded up).
    """

    def __init__(self, code_size, data_size=0):
        """Allocate the code segment and, if requested, the data segment.

        Args:
            code_size: required size of the code segment in bytes; must be a
                positive integer.
            data_size: required size of the data segment in bytes; must be a
                non-negative integer. Zero means no data segment.

        Raises:
            TypeError: if either size is not an integer.
            ValueError: if a size is out of range or the host OS is unknown.
            OSError: if the operating system refuses an allocation.
        """
        from peachpy.util import is_int
        if not is_int(code_size):
            raise TypeError("code size must be an integer")
        if not is_int(data_size):
            raise TypeError("data size must be an integer")
        if code_size <= 0:
            raise ValueError("code size must be positive")
        if data_size < 0:
            raise ValueError("data size must be non-negative")

        import mmap
        self.allocation_granularity = max(mmap.ALLOCATIONGRANULARITY, mmap.PAGESIZE)
        self.code_address = None
        self.code_size = self.allocation_size(code_size)
        self.data_address = None
        self.data_size = self.allocation_size(data_size)

        # Callable taking an (address, size) tuple; stays None on platforms
        # where this class does not release memory itself.
        self._release_memory = None

        osname = sys.platform.lower()
        if osname == "darwin" or osname.startswith(("linux", "freebsd")):
            self._allocate_posix(osname)
        elif osname == "win32":
            self._allocate_windows()
        elif osname == "nacl":
            self._allocate_nacl()
        else:
            raise ValueError("Unknown host OS: " + osname)

    def _allocate_posix(self, osname):
        """Allocate the segments with libc mmap (macOS, Linux, FreeBSD)."""
        import ctypes
        import mmap

        if osname == "darwin":
            libc = ctypes.cdll.LoadLibrary("libc.dylib")
        elif osname.startswith("freebsd"):
            libc = ctypes.cdll.LoadLibrary("libc.so.7")
        else:
            libc = ctypes.cdll.LoadLibrary("libc.so.6")

        # void* mmap(void* addr, size_t len, int prot, int flags, int fd, off_t offset)
        # The attribute must be spelled "argtypes": ctypes ignores any other
        # name, and without a prototype every integer is passed as a C int.
        # The offset passed below is always 0, so a pointer-sized integer is
        # sufficient for it.
        mmap_function = libc.mmap
        mmap_function.restype = ctypes.c_void_p
        mmap_function.argtypes = [ctypes.c_void_p, ctypes.c_size_t,
                                  ctypes.c_int, ctypes.c_int,
                                  ctypes.c_int, ctypes.c_size_t]
        # int munmap(void* addr, size_t len)
        munmap_function = libc.munmap
        munmap_function.restype = ctypes.c_int
        munmap_function.argtypes = [ctypes.c_void_p, ctypes.c_size_t]

        # mmap reports failure as (void*) -1. With a c_void_p restype ctypes
        # hands that back as an unsigned pointer-sized integer, never as -1.
        map_failed = ctypes.c_void_p(-1).value

        def munmap(address, size):
            munmap_result = munmap_function(ctypes.c_void_p(address), size)
            assert munmap_result == 0

        self._release_memory = lambda address_size: munmap(address_size[0], address_size[1])

        def allocate(size, protection, failure_message):
            address = mmap_function(None, size, protection,
                                    mmap.MAP_ANON | mmap.MAP_PRIVATE,
                                    -1, 0)
            # ctypes converts a NULL pointer result to None.
            if address is None or address == map_failed:
                raise OSError(failure_message)
            return address

        # Allocate code segment
        self.code_address = allocate(self.code_size,
                                     mmap.PROT_READ | mmap.PROT_WRITE | mmap.PROT_EXEC,
                                     "Failed to allocate memory for code segment")

        if self.data_size > 0:
            # Allocate data segment
            self.data_address = allocate(self.data_size,
                                         mmap.PROT_READ | mmap.PROT_WRITE,
                                         "Failed to allocate memory for data segment")

    def _allocate_windows(self):
        """Allocate the segments with kernel32 VirtualAlloc (Windows)."""
        import ctypes

        # From WinNT.h
        PAGE_READWRITE = 0x04
        PAGE_EXECUTE_READWRITE = 0x40
        MEM_COMMIT = 0x1000
        MEM_RESERVE = 0x2000
        MEM_RELEASE = 0x8000

        # LPVOID WINAPI VirtualAlloc(LPVOID address, SIZE_T size, DWORD allocationType, DWORD protect)
        VirtualAlloc_function = ctypes.windll.kernel32.VirtualAlloc
        VirtualAlloc_function.restype = ctypes.c_void_p
        VirtualAlloc_function.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_ulong, ctypes.c_ulong]
        # BOOL WINAPI VirtualFree(LPVOID lpAddress, SIZE_T dwSize, DWORD  dwFreeType)
        VirtualFree_function = ctypes.windll.kernel32.VirtualFree
        VirtualFree_function.restype = ctypes.c_int
        VirtualFree_function.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_ulong]

        def VirtualFree(address, size):
            # The size argument is deliberately not forwarded: the call
            # passes 0 together with MEM_RELEASE.
            VirtualFree_result = VirtualFree_function(address, 0, MEM_RELEASE)
            assert VirtualFree_result != 0

        self._release_memory = lambda address_size: VirtualFree(address_size[0], address_size[1])

        # Allocate code segment
        code_address = VirtualAlloc_function(None, self.code_size,
                                             MEM_RESERVE | MEM_COMMIT,
                                             PAGE_EXECUTE_READWRITE)
        if not code_address:
            raise OSError("Failed to allocate memory for code segment")
        self.code_address = code_address

        if self.data_size > 0:
            # Allocate data segment
            data_address = VirtualAlloc_function(None, self.data_size,
                                                 MEM_RESERVE | MEM_COMMIT,
                                                 PAGE_READWRITE)
            if not data_address:
                raise OSError("Failed to allocate memory for data segment")
            self.data_address = data_address

    def _allocate_nacl(self):
        """Allocate the segments through the dynacl module (Native Client)."""
        import dynacl

        # Allocate both segments in one call. _release_memory stays None, so
        # __del__ does not free these addresses itself.
        self.allocation = dynacl.allocate(self.code_size, self.data_size)
        self.code_address = self.allocation.code_address
        self.data_address = self.allocation.data_address
        # Code is installed through the allocation object on this platform
        # instead of a plain memory copy.
        self.copy_code = self._nacl_copy_code

    def allocation_size(self, segment_size):
        """Return segment_size rounded up with the allocation granularity."""
        import peachpy.util
        return peachpy.util.roundup(segment_size, self.allocation_granularity)

    def _copy_segment(self, address, capacity, segment, segment_name):
        """Copy the bytes of segment to address after validating the target.

        Raises:
            ValueError: if there is no allocated target for a non-empty
                segment, or the segment is larger than capacity.
        """
        import ctypes
        length = len(segment)
        if length == 0:
            # Nothing to copy; also avoids memmove on a missing segment.
            return
        if address is None:
            raise ValueError(segment_name + " segment is not allocated")
        if length > capacity:
            raise ValueError("%s of %d bytes does not fit into %s segment of %d bytes" %
                             (segment_name, length, segment_name, capacity))
        ctypes.memmove(address, ctypes.c_char_p(bytes(segment)), length)

    def copy_code(self, code_segment):
        """Copy code_segment to the start of the allocated code segment.

        Raises:
            ValueError: if code_segment is larger than the code segment.
        """
        self._copy_segment(self.code_address, self.code_size, code_segment, "code")

    def _nacl_copy_code(self, code_segment):
        """Install code_segment at offset 0 through the dynacl allocation."""
        code_offset = 0
        self.allocation.dyncode_create(code_segment, code_offset)

    def copy_data(self, data_segment):
        """Copy data_segment to the start of the allocated data segment.

        Raises:
            ValueError: if data_segment is non-empty and either no data
                segment was allocated or it is too small.
        """
        self._copy_segment(self.data_address, self.data_size, data_segment, "data")

    def __del__(self):
        """Release the code and data segments if this object allocated them."""
        # __init__ may have raised before any attribute was assigned, so
        # every attribute is read defensively.
        release_memory = getattr(self, "_release_memory", None)
        if release_memory is None:
            return
        code_address = getattr(self, "code_address", None)
        if code_address is not None:
            release_memory((code_address, self.code_size))
            self.code_address = None
        data_address = getattr(self, "data_address", None)
        if data_address is not None:
            release_memory((data_address, self.data_size))
            self.data_address = None