| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 
 | import builtins
import locale
import os
import sys
import threading
from test import support
from test.support import os_helper
from .utils import print_warning
class SkipTestEnvironment(Exception):
    pass
# Unit tests are supposed to leave the execution environment unchanged
# once they complete.  But sometimes tests have bugs, especially when
# tests fail, and the changes to environment go on to mess up other
# tests.  This can cause issues with buildbot stability, since tests
# are run in random order and so problems may appear to come and go.
# There are a few things we can save and restore to mitigate this, and
# the following context manager handles this task.
class saved_test_environment:
    """Save bits of the test environment and restore them at block exit.
        with saved_test_environment(test_name, verbose, quiet):
            #stuff
    Unless quiet is True, a warning is printed to stderr if any of
    the saved items was changed by the test. The support.environment_altered
    attribute is set to True if a change is detected.
    If verbose is more than 1, the before and after state of changed
    items is also printed.
    """
    def __init__(self, test_name, verbose, quiet, *, pgo):
        self.test_name = test_name
        self.verbose = verbose
        self.quiet = quiet
        self.pgo = pgo
    # To add things to save and restore, add a name XXX to the resources list
    # and add corresponding get_XXX/restore_XXX functions.  get_XXX should
    # return the value to be saved and compared against a second call to the
    # get function when test execution completes.  restore_XXX should accept
    # the saved value and restore the resource using it.  It will be called if
    # and only if a change in the value is detected.
    #
    # Note: XXX will have any '.' replaced with '_' characters when determining
    # the corresponding method names.
    resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
                 'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
                 'warnings.filters', 'asyncore.socket_map',
                 'logging._handlers', 'logging._handlerList', 'sys.gettrace',
                 'sys.warnoptions',
                 # multiprocessing.process._cleanup() may release ref
                 # to a thread, so check processes first.
                 'multiprocessing.process._dangling', 'threading._dangling',
                 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
                 'files', 'locale', 'warnings.showwarning',
                 'shutil_archive_formats', 'shutil_unpack_formats',
                 'asyncio.events._event_loop_policy',
                 'urllib.requests._url_tempfiles', 'urllib.requests._opener',
                )
    def get_module(self, name):
        # function for restore() methods
        return sys.modules[name]
    def try_get_module(self, name):
        # function for get() methods
        try:
            return self.get_module(name)
        except KeyError:
            raise SkipTestEnvironment
    def get_urllib_requests__url_tempfiles(self):
        urllib_request = self.try_get_module('urllib.request')
        return list(urllib_request._url_tempfiles)
    def restore_urllib_requests__url_tempfiles(self, tempfiles):
        for filename in tempfiles:
            os_helper.unlink(filename)
    def get_urllib_requests__opener(self):
        urllib_request = self.try_get_module('urllib.request')
        return urllib_request._opener
    def restore_urllib_requests__opener(self, opener):
        urllib_request = self.get_module('urllib.request')
        urllib_request._opener = opener
    def get_asyncio_events__event_loop_policy(self):
        self.try_get_module('asyncio')
        return support.maybe_get_event_loop_policy()
    def restore_asyncio_events__event_loop_policy(self, policy):
        asyncio = self.get_module('asyncio')
        asyncio.set_event_loop_policy(policy)
    def get_sys_argv(self):
        return id(sys.argv), sys.argv, sys.argv[:]
    def restore_sys_argv(self, saved_argv):
        sys.argv = saved_argv[1]
        sys.argv[:] = saved_argv[2]
    def get_cwd(self):
        return os.getcwd()
    def restore_cwd(self, saved_cwd):
        os.chdir(saved_cwd)
    def get_sys_stdout(self):
        return sys.stdout
    def restore_sys_stdout(self, saved_stdout):
        sys.stdout = saved_stdout
    def get_sys_stderr(self):
        return sys.stderr
    def restore_sys_stderr(self, saved_stderr):
        sys.stderr = saved_stderr
    def get_sys_stdin(self):
        return sys.stdin
    def restore_sys_stdin(self, saved_stdin):
        sys.stdin = saved_stdin
    def get_os_environ(self):
        return id(os.environ), os.environ, dict(os.environ)
    def restore_os_environ(self, saved_environ):
        os.environ = saved_environ[1]
        os.environ.clear()
        os.environ.update(saved_environ[2])
    def get_sys_path(self):
        return id(sys.path), sys.path, sys.path[:]
    def restore_sys_path(self, saved_path):
        sys.path = saved_path[1]
        sys.path[:] = saved_path[2]
    def get_sys_path_hooks(self):
        return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:]
    def restore_sys_path_hooks(self, saved_hooks):
        sys.path_hooks = saved_hooks[1]
        sys.path_hooks[:] = saved_hooks[2]
    def get_sys_gettrace(self):
        return sys.gettrace()
    def restore_sys_gettrace(self, trace_fxn):
        sys.settrace(trace_fxn)
    def get___import__(self):
        return builtins.__import__
    def restore___import__(self, import_):
        builtins.__import__ = import_
    def get_warnings_filters(self):
        warnings = self.try_get_module('warnings')
        return id(warnings.filters), warnings.filters, warnings.filters[:]
    def restore_warnings_filters(self, saved_filters):
        warnings = self.get_module('warnings')
        warnings.filters = saved_filters[1]
        warnings.filters[:] = saved_filters[2]
    def get_asyncore_socket_map(self):
        asyncore = sys.modules.get('test.support.asyncore')
        # XXX Making a copy keeps objects alive until __exit__ gets called.
        return asyncore and asyncore.socket_map.copy() or {}
    def restore_asyncore_socket_map(self, saved_map):
        asyncore = sys.modules.get('test.support.asyncore')
        if asyncore is not None:
            asyncore.close_all(ignore_all=True)
            asyncore.socket_map.update(saved_map)
    def get_shutil_archive_formats(self):
        shutil = self.try_get_module('shutil')
        # we could call get_archives_formats() but that only returns the
        # registry keys; we want to check the values too (the functions that
        # are registered)
        return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy()
    def restore_shutil_archive_formats(self, saved):
        shutil = self.get_module('shutil')
        shutil._ARCHIVE_FORMATS = saved[0]
        shutil._ARCHIVE_FORMATS.clear()
        shutil._ARCHIVE_FORMATS.update(saved[1])
    def get_shutil_unpack_formats(self):
        shutil = self.try_get_module('shutil')
        return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy()
    def restore_shutil_unpack_formats(self, saved):
        shutil = self.get_module('shutil')
        shutil._UNPACK_FORMATS = saved[0]
        shutil._UNPACK_FORMATS.clear()
        shutil._UNPACK_FORMATS.update(saved[1])
    def get_logging__handlers(self):
        logging = self.try_get_module('logging')
        # _handlers is a WeakValueDictionary
        return id(logging._handlers), logging._handlers, logging._handlers.copy()
    def restore_logging__handlers(self, saved_handlers):
        # Can't easily revert the logging state
        pass
    def get_logging__handlerList(self):
        logging = self.try_get_module('logging')
        # _handlerList is a list of weakrefs to handlers
        return id(logging._handlerList), logging._handlerList, logging._handlerList[:]
    def restore_logging__handlerList(self, saved_handlerList):
        # Can't easily revert the logging state
        pass
    def get_sys_warnoptions(self):
        return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:]
    def restore_sys_warnoptions(self, saved_options):
        sys.warnoptions = saved_options[1]
        sys.warnoptions[:] = saved_options[2]
    # Controlling dangling references to Thread objects can make it easier
    # to track reference leaks.
    def get_threading__dangling(self):
        # This copies the weakrefs without making any strong reference
        return threading._dangling.copy()
    def restore_threading__dangling(self, saved):
        threading._dangling.clear()
        threading._dangling.update(saved)
    # Same for Process objects
    def get_multiprocessing_process__dangling(self):
        multiprocessing_process = self.try_get_module('multiprocessing.process')
        # Unjoined process objects can survive after process exits
        multiprocessing_process._cleanup()
        # This copies the weakrefs without making any strong reference
        return multiprocessing_process._dangling.copy()
    def restore_multiprocessing_process__dangling(self, saved):
        multiprocessing_process = self.get_module('multiprocessing.process')
        multiprocessing_process._dangling.clear()
        multiprocessing_process._dangling.update(saved)
    def get_sysconfig__CONFIG_VARS(self):
        # make sure the dict is initialized
        sysconfig = self.try_get_module('sysconfig')
        sysconfig.get_config_var('prefix')
        return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS,
                dict(sysconfig._CONFIG_VARS))
    def restore_sysconfig__CONFIG_VARS(self, saved):
        sysconfig = self.get_module('sysconfig')
        sysconfig._CONFIG_VARS = saved[1]
        sysconfig._CONFIG_VARS.clear()
        sysconfig._CONFIG_VARS.update(saved[2])
    def get_sysconfig__INSTALL_SCHEMES(self):
        sysconfig = self.try_get_module('sysconfig')
        return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES,
                sysconfig._INSTALL_SCHEMES.copy())
    def restore_sysconfig__INSTALL_SCHEMES(self, saved):
        sysconfig = self.get_module('sysconfig')
        sysconfig._INSTALL_SCHEMES = saved[1]
        sysconfig._INSTALL_SCHEMES.clear()
        sysconfig._INSTALL_SCHEMES.update(saved[2])
    def get_files(self):
        # XXX: Maybe add an allow-list here?
        return sorted(fn + ('/' if os.path.isdir(fn) else '')
                      for fn in os.listdir()
                      if not fn.startswith(".hypothesis"))
    def restore_files(self, saved_value):
        fn = os_helper.TESTFN
        if fn not in saved_value and (fn + '/') not in saved_value:
            if os.path.isfile(fn):
                os_helper.unlink(fn)
            elif os.path.isdir(fn):
                os_helper.rmtree(fn)
    _lc = [getattr(locale, lc) for lc in dir(locale)
           if lc.startswith('LC_')]
    def get_locale(self):
        pairings = []
        for lc in self._lc:
            try:
                pairings.append((lc, locale.setlocale(lc, None)))
            except (TypeError, ValueError):
                continue
        return pairings
    def restore_locale(self, saved):
        for lc, setting in saved:
            locale.setlocale(lc, setting)
    def get_warnings_showwarning(self):
        warnings = self.try_get_module('warnings')
        return warnings.showwarning
    def restore_warnings_showwarning(self, fxn):
        warnings = self.get_module('warnings')
        warnings.showwarning = fxn
    def resource_info(self):
        for name in self.resources:
            method_suffix = name.replace('.', '_')
            get_name = 'get_' + method_suffix
            restore_name = 'restore_' + method_suffix
            yield name, getattr(self, get_name), getattr(self, restore_name)
    def __enter__(self):
        self.saved_values = []
        for name, get, restore in self.resource_info():
            try:
                original = get()
            except SkipTestEnvironment:
                continue
            self.saved_values.append((name, get, restore, original))
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        saved_values = self.saved_values
        self.saved_values = None
        # Some resources use weak references
        support.gc_collect()
        for name, get, restore, original in saved_values:
            current = get()
            # Check for changes to the resource's value
            if current != original:
                support.environment_altered = True
                restore(original)
                if not self.quiet and not self.pgo:
                    print_warning(
                        f"{name} was modified by {self.test_name}\n"
                        f"  Before: {original}\n"
                        f"  After:  {current} ")
        return False
 |