1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
|
from __future__ import absolute_import, print_function
import os
import tempfile
import glob
from distutils.errors import DistutilsFileError
import distutils.file_util
from numpy.testing import dec
from scipy.weave import catalog
def slow(t):
"""Replacement for numpy.testing.dec.slow for weave."""
t.slow = True
if ('RUN_WEAVE_TESTS' in os.environ and
bool(os.environ['RUN_WEAVE_TESTS'])):
t.__test__ = True
else:
t.__test__ = False
return t
dec.slow = slow
def remove_whitespace(in_str):
out = in_str.replace(" ","")
out = out.replace("\t","")
out = out.replace("\n","")
return out
###################################################
# mainly used by catalog tests
###################################################
def temp_catalog_files(prefix=''):
# might need to add some more platform specific catalog file
# suffixes to remove. The .pag was recently added for SunOS
d = catalog.default_dir()
f = catalog.os_dependent_catalog_name()
return glob.glob(os.path.join(d,prefix+f+'*'))
def clear_temp_catalog():
"""Remove any catalog from the temp dir."""
backup_dir = tempfile.mkdtemp()
for file in temp_catalog_files():
move_file(file,backup_dir)
return backup_dir
def restore_temp_catalog(backup_dir):
"""Remove any catalog from the temp dir"""
cat_dir = catalog.default_dir()
for file in os.listdir(backup_dir):
file = os.path.join(backup_dir,file)
d,f = os.path.split(file)
dst_file = os.path.join(cat_dir, f)
if os.path.exists(dst_file):
os.remove(dst_file)
move_file(file,dst_file)
os.rmdir(backup_dir)
def empty_temp_dir():
"""Create a sub directory in the temp directory for use in tests"""
d = catalog.default_dir()
for i in range(10000):
new_d = os.path.join(d,tempfile.gettempprefix()[1:-1]+repr(i))
if not os.path.exists(new_d):
os.mkdir(new_d)
break
return new_d
def cleanup_temp_dir(d):
"""Remove a directory created by empty_temp_dir().
This should probably catch some errors.
"""
files = map(lambda x,d=d: os.path.join(d,x),os.listdir(d))
for i in files:
try:
if os.path.isdir(i):
cleanup_temp_dir(i)
else:
os.remove(i)
except OSError:
# failed to remove file for whatever reason (maybe it is a DLL
# Python is currently using)
pass
try:
os.rmdir(d)
except OSError:
pass
class TempdirBlitz():
"""A context manager to create a tempdir and make blitz() use it.
Also cleans up the tempdir on exit. Usage::
with TempdirBlitz():
weave.blitz(expr)
When using blitz without this contextmanager, it tends to litter .so and
.cpp files all over the dir from which tests are run.
"""
def __init__(self):
self._entered = False
def __enter__(self):
self._entered = True
self.module_location = empty_temp_dir()
self._old_env = os.environ.get('PYTHONCOMPILED', None)
os.environ['PYTHONCOMPILED'] = self.module_location
def __exit__(self, *exc_info):
if not self._entered:
raise RuntimeError("Cannot exit %r without entering first" % self)
if self._old_env is None:
os.environ.pop('PYTHONCOMPILED')
else:
os.environ['PYTHONCOMPILED'] = self._old_env
cleanup_temp_dir(self.module_location)
# from distutils -- old versions had bug, so copying here to make sure
# a working version is available.
def move_file(src, dst,
verbose=0,
dry_run=0):
"""Move a file 'src' to 'dst'. If 'dst' is a directory, the file will
be moved into it with the same name; otherwise, 'src' is just renamed
to 'dst'. Return the new full name of the file.
Handles cross-device moves on Unix using 'copy_file()'. What about
other systems???
"""
from os.path import exists, isfile, isdir, basename, dirname
import errno
if verbose:
print("moving %s -> %s" % (src, dst))
if dry_run:
return dst
if not isfile(src):
raise DistutilsFileError("can't move '%s': not a regular file" % src)
if isdir(dst):
dst = os.path.join(dst, basename(src))
elif exists(dst):
raise DistutilsFileError("can't move '%s': destination '%s' already "
"exists" % (src, dst))
if not isdir(dirname(dst)):
raise DistutilsFileError("can't move '%s': destination '%s' not a "
"valid path" % (src, dst))
copy_it = 0
try:
os.rename(src, dst)
except os.error as xxx_todo_changeme1:
(num, msg) = xxx_todo_changeme1.args
if num == errno.EXDEV:
copy_it = 1
else:
raise DistutilsFileError("couldn't move '%s' to '%s': %s" %
(src, dst, msg))
if copy_it:
distutils.file_util.copy_file(src, dst)
try:
os.unlink(src)
except os.error as xxx_todo_changeme:
(num, msg) = xxx_todo_changeme.args
try:
os.unlink(dst)
except os.error:
pass
raise DistutilsFileError("couldn't move '%s' to '%s' by copy/delete: "
"delete '%s' failed: %s" % (src, dst, src, msg))
return dst
def debug_print(*arg):
# Set to true to enable printing debug / benchmark info when running
# these tests
WEAVE_DEBUG = False
if WEAVE_DEBUG:
print(*arg)
|