File: min-tempfile.diff

package info (click to toggle)
python3.13 3.13.7-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 121,328 kB
  • sloc: python: 704,014; ansic: 653,914; xml: 31,250; sh: 5,844; cpp: 4,326; makefile: 1,981; objc: 787; lisp: 502; javascript: 213; asm: 75; csh: 12
file content (219 lines) | stat: -rw-r--r-- 8,714 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
Description: Debian: Degrade tempfile gracefully without shutil
 python3.X-minimal includes tempfile but not shutil. Use a fallback racy
 rmtree, if shutil can't be imported.

 This needs constant maintainance, as the code is a copy.

Forwarded: not-needed

--- a/Lib/tempfile.py
+++ b/Lib/tempfile.py
@@ -40,7 +40,198 @@
 import warnings as _warnings
 import io as _io
 import os as _os
-import shutil as _shutil
+try:
+  import shutil as _shutil
+  _rmtree = _shutil.rmtree
+except ImportError:
+  import sys as _sys
+  import stat as _stat
+
+  # version vulnerable to race conditions
+  def _rmtree_unsafe(path, onexc):
+      def onerror(err):
+          if not isinstance(err, FileNotFoundError):
+              onexc(os.scandir, err.filename, err)
+      results = os.walk(path, topdown=False, onerror=onerror, followlinks=os._walk_symlinks_as_files)
+      for dirpath, dirnames, filenames in results:
+          for name in dirnames:
+              fullname = os.path.join(dirpath, name)
+              try:
+                  os.rmdir(fullname)
+              except FileNotFoundError:
+                  continue
+              except OSError as err:
+                  onexc(os.rmdir, fullname, err)
+          for name in filenames:
+              fullname = os.path.join(dirpath, name)
+              try:
+                  os.unlink(fullname)
+              except FileNotFoundError:
+                  continue
+              except OSError as err:
+                  onexc(os.unlink, fullname, err)
+      try:
+          os.rmdir(path)
+      except FileNotFoundError:
+          pass
+      except OSError as err:
+          onexc(os.rmdir, path, err)
+
+  # Version using fd-based APIs to protect against races
+  def _rmtree_safe_fd(stack, onexc):
+      # Each stack item has four elements:
+      # * func: The first operation to perform: os.lstat, os.close or os.rmdir.
+      #   Walking a directory starts with an os.lstat() to detect symlinks; in
+      #   this case, func is updated before subsequent operations and passed to
+      #   onexc() if an error occurs.
+      # * dirfd: Open file descriptor, or None if we're processing the top-level
+      #   directory given to rmtree() and the user didn't supply dir_fd.
+      # * path: Path of file to operate upon. This is passed to onexc() if an
+      #   error occurs.
+      # * orig_entry: os.DirEntry, or None if we're processing the top-level
+      #   directory given to rmtree(). We used the cached stat() of the entry to
+      #   save a call to os.lstat() when walking subdirectories.
+      func, dirfd, path, orig_entry = stack.pop()
+      name = path if orig_entry is None else orig_entry.name
+      try:
+          if func is os.close:
+              os.close(dirfd)
+              return
+          if func is os.rmdir:
+              os.rmdir(name, dir_fd=dirfd)
+              return
+
+          # Note: To guard against symlink races, we use the standard
+          # lstat()/open()/fstat() trick.
+          assert func is os.lstat
+          if orig_entry is None:
+              orig_st = os.lstat(name, dir_fd=dirfd)
+          else:
+              orig_st = orig_entry.stat(follow_symlinks=False)
+
+          func = os.open  # For error reporting.
+          topfd = os.open(name, os.O_RDONLY | os.O_NONBLOCK, dir_fd=dirfd)
+
+          func = os.path.islink  # For error reporting.
+          try:
+              if not os.path.samestat(orig_st, os.fstat(topfd)):
+                  # Symlinks to directories are forbidden, see GH-46010.
+                  raise OSError("Cannot call rmtree on a symbolic link")
+              stack.append((os.rmdir, dirfd, path, orig_entry))
+          finally:
+              stack.append((os.close, topfd, path, orig_entry))
+
+          func = os.scandir  # For error reporting.
+          with os.scandir(topfd) as scandir_it:
+              entries = list(scandir_it)
+          for entry in entries:
+              fullname = os.path.join(path, entry.name)
+              try:
+                  if entry.is_dir(follow_symlinks=False):
+                      # Traverse into sub-directory.
+                      stack.append((os.lstat, topfd, fullname, entry))
+                      continue
+              except FileNotFoundError:
+                  continue
+              except OSError:
+                  pass
+              try:
+                  os.unlink(entry.name, dir_fd=topfd)
+              except FileNotFoundError:
+                  continue
+              except OSError as err:
+                  onexc(os.unlink, fullname, err)
+      except FileNotFoundError as err:
+          if orig_entry is None or func is os.close:
+              err.filename = path
+              onexc(func, path, err)
+      except OSError as err:
+          err.filename = path
+          onexc(func, path, err)
+
+  _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
+                       os.supports_dir_fd and
+                       os.scandir in os.supports_fd and
+                       os.stat in os.supports_follow_symlinks)
+
+  def _rmtree(path, ignore_errors=False, onerror=None, *, onexc=None, dir_fd=None):
+      """Recursively delete a directory tree.
+
+      If dir_fd is not None, it should be a file descriptor open to a directory;
+      path will then be relative to that directory.
+      dir_fd may not be implemented on your platform.
+      If it is unavailable, using it will raise a NotImplementedError.
+
+      If ignore_errors is set, errors are ignored; otherwise, if onexc or
+      onerror is set, it is called to handle the error with arguments (func,
+      path, exc_info) where func is platform and implementation dependent;
+      path is the argument to that function that caused it to fail; and
+      the value of exc_info describes the exception. For onexc it is the
+      exception instance, and for onerror it is a tuple as returned by
+      sys.exc_info().  If ignore_errors is false and both onexc and
+      onerror are None, the exception is reraised.
+
+      onerror is deprecated and only remains for backwards compatibility.
+      If both onerror and onexc are set, onerror is ignored and onexc is used.
+      """
+
+      sys.audit("tempfile._rmtree", path, dir_fd)
+      if ignore_errors:
+          def onexc(*args):
+              pass
+      elif onerror is None and onexc is None:
+          def onexc(*args):
+              raise
+      elif onexc is None:
+          if onerror is None:
+              def onexc(*args):
+                  raise
+          else:
+              # delegate to onerror
+              def onexc(*args):
+                  func, path, exc = args
+                  if exc is None:
+                      exc_info = None, None, None
+                  else:
+                      exc_info = type(exc), exc, exc.__traceback__
+                  return onerror(func, path, exc_info)
+
+      if _use_fd_functions:
+          # While the unsafe rmtree works fine on bytes, the fd based does not.
+          if isinstance(path, bytes):
+              path = os.fsdecode(path)
+          stack = [(os.lstat, dir_fd, path, None)]
+          try:
+              while stack:
+                  _rmtree_safe_fd(stack, onexc)
+          finally:
+              # Close any file descriptors still on the stack.
+              while stack:
+                  func, fd, path, entry = stack.pop()
+                  if func is not os.close:
+                      continue
+                  try:
+                      os.close(fd)
+                  except OSError as err:
+                      onexc(os.close, path, err)
+      else:
+          if dir_fd is not None:
+              raise NotImplementedError("dir_fd unavailable on this platform")
+          try:
+              st = os.lstat(path)
+          except OSError as err:
+              onexc(os.lstat, path, err)
+              return
+          try:
+              if _rmtree_islink(st):
+                  # symlinks to directories are forbidden, see bug #1669
+                  raise OSError("Cannot call rmtree on a symbolic link")
+          except OSError as err:
+              onexc(os.path.islink, path, err)
+              # can't continue even if onexc hook returns
+              return
+          return _rmtree_unsafe(path, onexc)
+
 import errno as _errno
 from random import Random as _Random
 import sys as _sys
@@ -932,7 +1123,7 @@
                 if not ignore_errors:
                     raise
 
-        _shutil.rmtree(name, onexc=onexc)
+        _rmtree(name, onexc=onexc)
 
     @classmethod
     def _cleanup(cls, name, warn_message, ignore_errors=False, delete=True):