File: procfs.py

package info (click to toggle)
rust-pathrs 0.2.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,912 kB
  • sloc: python: 1,138; sh: 371; ansic: 259; makefile: 151
file content (224 lines) | stat: -rw-r--r-- 8,238 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
#!/usr/bin/python3
# SPDX-License-Identifier: MPL-2.0
#
# libpathrs: safe path resolution on Linux
# Copyright (C) 2019-2025 Aleksa Sarai <cyphar@cyphar.com>
# Copyright (C) 2019-2025 SUSE LLC
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

import typing
from typing import Any, IO, cast

# TODO: Remove this once we only support Python >= 3.11.
from typing_extensions import Self, TypeAlias

from ._internal import (
    # File type helpers.
    WrappedFd,
    _convert_mode,
    # Error API.
    PathrsError,
    _is_pathrs_err,
    INTERNAL_ERROR,
    # CFFI helpers.
    _cstr,
    _cbuffer,
)
from ._libpathrs_cffi import lib as libpathrs_so

# CString and CBuffer are both aliases for cffi's generic cdata type; the two
# names exist so that annotations can say which kind of C value is expected.
if typing.TYPE_CHECKING:
    # mypy apparently cannot handle the "ffi: cffi.api.FFI" definition in
    # _libpathrs_cffi/__init__.pyi so we need to explicitly reference the type
    # from cffi here.
    import cffi

    ffi = cffi.FFI()
    CString: TypeAlias = cffi.FFI.CData
    CBuffer: TypeAlias = cffi.FFI.CData
else:
    # At runtime, use the ffi object provided by the _libpathrs_cffi module and
    # take the cdata type from it.
    from ._libpathrs_cffi import ffi

    CString: TypeAlias = ffi.CData
    CBuffer: TypeAlias = ffi.CData

# Names exported by a wildcard import of this module: the ProcfsBase values
# (and the PROC_PID constructor), the handle class, and the module-level
# shorthand functions.
__all__ = [
    "PROC_ROOT",
    "PROC_SELF",
    "PROC_THREAD_SELF",
    "PROC_PID",
    "ProcfsHandle",
    # Shorthand for ProcfsHandle.cached().<foo>.
    "open",
    "open_raw",
    "readlink",
]

# TODO: Switch to "type ..." syntax once we switch to Python >= 3.12...?
#: Integer value telling libpathrs what a procfs path is relative to. Use one
#: of the PROC_* constants below or the result of PROC_PID().
ProcfsBase: TypeAlias = int

#: Resolve proc_* operations relative to the /proc root. Note that this mode
#: may be more expensive because we have to take steps to try to avoid leaking
#: unmasked procfs handles, so you should use PROC_SELF if you can.
PROC_ROOT: ProcfsBase = libpathrs_so.PATHRS_PROC_ROOT

#: Resolve proc_* operations relative to /proc/self. For most programs, this is
#: the standard choice.
PROC_SELF: ProcfsBase = libpathrs_so.PATHRS_PROC_SELF

#: Resolve proc_* operations relative to /proc/thread-self. In multi-threaded
#: programs where one thread has a different CLONE_FS, it is possible for
#: /proc/self to point to the wrong thread, and so /proc/thread-self may be
#: necessary.
PROC_THREAD_SELF: ProcfsBase = libpathrs_so.PATHRS_PROC_THREAD_SELF


def PROC_PID(pid: int) -> ProcfsBase:
    """
    Build a ProcfsBase that resolves proc_* operations relative to /proc/<pid>.

    Be aware that due to PID recycling, using this is generally not safe
    except in certain circumstances. Namely:

     * PID 1 (the init process), as that PID cannot ever get recycled.
     * Your current PID (though you should just use PROC_SELF).
     * PIDs of child processes (as long as you are sure that no other part of
       your program incorrectly catches or ignores SIGCHLD, and that you do it
       *before* you call wait(2) or any equivalent method that could reap
       zombies).

    Raises ValueError if pid has any bit set that overlaps with libpathrs's
    __PATHRS_PROC_TYPE_MASK.
    """
    # A pid that collides with the type mask cannot be combined with the
    # PID type tag below, so reject it up front.
    colliding_bits = pid & libpathrs_so.__PATHRS_PROC_TYPE_MASK
    if colliding_bits != 0:
        raise ValueError(f"invalid PROC_PID value {pid}")

    tagged_pid = libpathrs_so.__PATHRS_PROC_TYPE_PID | pid
    return tagged_pid


class ProcfsHandle(WrappedFd):
    """
    A handle that libpathrs procfs operations are performed against.

    Obtain one with ProcfsHandle.cached() (recommended) or ProcfsHandle.new(),
    then use open(), open_raw() or readlink() to operate on paths relative to
    a ProcfsBase (PROC_ROOT, PROC_SELF, PROC_THREAD_SELF or PROC_PID(pid)).
    """

    # C type name handed to ffi.new() when allocating the argument structure
    # for pathrs_procfs_open().
    _PROCFS_OPEN_HOW_TYPE = "pathrs_procfs_open_how *"

    @classmethod
    def cached(cls) -> Self:
        """
        Returns a cached version of ProcfsHandle that will always remain valid
        and cannot be closed. This is the recommended usage of ProcfsHandle.
        """
        return cls.from_raw_fd(libpathrs_so.PATHRS_PROC_DEFAULT_ROOTFD)

    @classmethod
    def new(cls, /, *, unmasked: bool = False) -> Self:
        """
        Create a new procfs handle with the requested configuration settings.

        If unmasked is True, PATHRS_PROCFS_NEW_UNMASKED is set in the flags
        passed to libpathrs.

        Note that the requested configuration might be eligible for caching, in
        which case the ProcfsHandle.fileno() will contain a special sentinel
        value that cannot be used like a regular file descriptor.

        Raises PathrsError if libpathrs reports an error (falling back to
        INTERNAL_ERROR if no error details could be fetched).
        """

        # TODO: Is there a way to have ProcfsOpenHow actually subclass CData so
        # that we don't need to do any of these ugly casts?
        how = cast("libpathrs_so.ProcfsOpenHow", ffi.new(cls._PROCFS_OPEN_HOW_TYPE))
        # ffi.new() returns a *pointer* cdata, and ffi.sizeof() on a pointer
        # cdata is the size of the pointer itself. Dereference it so that we
        # pass the size of the structure rather than the size of a pointer
        # (the two are not guaranteed to be equal).
        how_size = ffi.sizeof(cast("Any", how)[0])

        if unmasked:
            how.flags = libpathrs_so.PATHRS_PROCFS_NEW_UNMASKED

        fd = libpathrs_so.pathrs_procfs_open(how, how_size)
        if _is_pathrs_err(fd):
            raise PathrsError._fetch(fd) or INTERNAL_ERROR
        return cls.from_raw_fd(fd)

    def open_raw(self, base: ProcfsBase, path: str, flags: int, /) -> WrappedFd:
        """
        Open a procfs file using Unix open flags.

        This function returns a WrappedFd file handle.

        base indicates what the path should be relative to. Valid values
        include PROC_{ROOT,SELF,THREAD_SELF} and PROC_PID(pid).

        path is a relative path to base indicating which procfs file you wish
        to open.

        flags is the set of O_* flags you wish to pass to the open operation.
        If you do not intend to open a symlink, you should pass O_NOFOLLOW to
        flags to let libpathrs know that it can be more strict when opening the
        path.

        Raises PathrsError if libpathrs reports an error (falling back to
        INTERNAL_ERROR if no error details could be fetched).
        """
        # TODO: Should we default to O_NOFOLLOW or put a separate argument for it?
        fd = libpathrs_so.pathrs_proc_openat(self.fileno(), base, _cstr(path), flags)
        if _is_pathrs_err(fd):
            raise PathrsError._fetch(fd) or INTERNAL_ERROR
        return WrappedFd(fd)

    def open(
        self, base: ProcfsBase, path: str, mode: str = "r", /, *, extra_flags: int = 0
    ) -> IO[Any]:
        """
        Open a procfs file using Pythonic mode strings.

        This function returns an os.fdopen() file handle.

        base indicates what the path should be relative to. Valid values
        include PROC_{ROOT,SELF,THREAD_SELF} and PROC_PID(pid).

        path is a relative path to base indicating which procfs file you wish
        to open.

        mode is a Python mode string, and extra_flags can be used to indicate
        extra O_* flags you wish to pass to the open operation. If you do not
        intend to open a symlink, you should pass O_NOFOLLOW to extra_flags to
        let libpathrs know that it can be more strict when opening the path.

        Raises whatever open_raw() raises if the underlying open fails.
        """
        flags = _convert_mode(mode) | extra_flags
        # NOTE(review): presumably fdopen() hands ownership of the descriptor
        # to the returned file object, so leaving the with-block does not close
        # it -- confirm against WrappedFd.
        with self.open_raw(base, path, flags) as file:
            return file.fdopen(mode)

    def readlink(self, base: ProcfsBase, path: str, /) -> str:
        """
        Fetch the target of a procfs symlink.

        Note that some procfs symlinks are "magic-links" where the returned
        string from readlink() is not how they are actually resolved.

        base indicates what the path should be relative to. Valid values
        include PROC_{ROOT,SELF,THREAD_SELF} and PROC_PID(pid).

        path is a relative path to base indicating which procfs symlink you
        wish to read.

        Raises PathrsError if libpathrs reports an error (falling back to
        INTERNAL_ERROR if no error details could be fetched).
        """
        # TODO: See if we can merge this with Root.readlink.
        cpath = _cstr(path)
        linkbuf_size = 128
        while True:
            linkbuf = _cbuffer(linkbuf_size)
            n = libpathrs_so.pathrs_proc_readlinkat(
                self.fileno(), base, cpath, linkbuf, linkbuf_size
            )
            if _is_pathrs_err(n):
                raise PathrsError._fetch(n) or INTERNAL_ERROR
            elif n <= linkbuf_size:
                # The whole link target fit in the buffer; only the first n
                # bytes are meaningful.
                buf = typing.cast(bytes, ffi.buffer(linkbuf, linkbuf_size)[:n])
                # NOTE(review): latin1 maps every byte to a code point, so
                # this never fails to decode, but non-ASCII UTF-8 targets will
                # not come back as the original text -- confirm this is the
                # intended encoding.
                return buf.decode("latin1")
            else:
                # The contents were truncated. Unlike readlinkat, pathrs
                # returns the size of the link when it checked. So use the
                # returned size as a basis for the reallocated size (but in
                # order to avoid a DoS where a magic-link is growing by a
                # single byte each iteration, make sure we are a fair bit
                # larger).
                linkbuf_size += n


# NOTE: within this module, the name "open" below shadows the builtin open().

#: Open a procfs file (with Pythonic mode strings).
#: Shorthand for ProcfsHandle.cached().open(...).
open = ProcfsHandle.cached().open

#: Open a procfs file (with unix open flags).
#: Shorthand for ProcfsHandle.cached().open_raw(...).
open_raw = ProcfsHandle.cached().open_raw

#: Fetch the target of a procfs symlink.
#: Shorthand for ProcfsHandle.cached().readlink(...).
readlink = ProcfsHandle.cached().readlink