1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
from __future__ import absolute_import, print_function
import time
import parser
import warnings
from numpy import (float32, float64, complex64, complex128,
zeros, random, array)
from numpy.testing import (TestCase, assert_equal,
assert_allclose, run_module_suite)
from scipy.weave import blitz_tools, blitz, BlitzWarning
from scipy.weave.ast_tools import harvest_variables
from weave_test_utils import remove_whitespace, debug_print, TempdirBlitz, dec
class TestAstToBlitzExpr(TestCase):
def generic_check(self,expr,desired):
ast = parser.suite(expr)
ast_list = ast.tolist()
actual = blitz_tools.ast_to_blitz_expr(ast_list)
actual = remove_whitespace(actual)
desired = remove_whitespace(desired)
assert_equal(actual,desired,expr)
def test_simple_expr(self):
# convert simple expr to blitz
expr = "a[:1:2] = b[:1+i+2:]"
desired = "a(blitz::Range(_beg,1-1,2))="\
"b(blitz::Range(_beg,1+i+2-1));"
self.generic_check(expr,desired)
def test_fdtd_expr(self):
# Convert fdtd equation to blitz.
# Note: This really should have "\" at the end of each line to
# indicate continuation.
expr = "ex[:,1:,1:] = ca_x[:,1:,1:] * ex[:,1:,1:]" \
"+ cb_y_x[:,1:,1:] * (hz[:,1:,1:] - hz[:,:-1,:])"\
"- cb_z_x[:,1:,1:] * (hy[:,1:,1:] - hy[:,1:,:-1])"
desired = 'ex(_all,blitz::Range(1,_end),blitz::Range(1,_end))='\
' ca_x(_all,blitz::Range(1,_end),blitz::Range(1,_end))'\
' *ex(_all,blitz::Range(1,_end),blitz::Range(1,_end))'\
'+cb_y_x(_all,blitz::Range(1,_end),blitz::Range(1,_end))'\
'*(hz(_all,blitz::Range(1,_end),blitz::Range(1,_end))'\
' -hz(_all,blitz::Range(_beg,Nhz(1)-1-1),_all))'\
' -cb_z_x(_all,blitz::Range(1,_end),blitz::Range(1,_end))'\
'*(hy(_all,blitz::Range(1,_end),blitz::Range(1,_end))'\
'-hy(_all,blitz::Range(1,_end),blitz::Range(_beg,Nhy(2)-1-1)));'
self.generic_check(expr,desired)
class TestBlitz(TestCase):
"""These are long running tests...
Would be useful to benchmark these things somehow.
"""
def generic_check(self, expr, arg_dict, type, size):
clean_result = array(arg_dict['result'],copy=1)
t1 = time.time()
exec(expr, globals(),arg_dict)
t2 = time.time()
standard = t2 - t1
desired = arg_dict['result']
arg_dict['result'] = clean_result
t1 = time.time()
blitz_tools.blitz(expr,arg_dict,{},verbose=0)
t2 = time.time()
compiled = t2 - t1
actual = arg_dict['result']
# TODO: this isn't very stringent. Need to tighten this up and
# learn where failures are occurring.
assert_allclose(abs(actual.ravel()), abs(desired.ravel()),
rtol=1e-4, atol=1e-6)
return standard, compiled
def generic_2d(self,expr,typ):
# The complex testing is pretty lame...
ast = parser.suite(expr)
arg_list = harvest_variables(ast.tolist())
all_sizes = [(10,10), (50,50), (100,100), (500,500), (1000,1000)]
debug_print('\nExpression:', expr)
with TempdirBlitz():
for size in all_sizes:
arg_dict = {}
for arg in arg_list:
arg_dict[arg] = random.normal(0,1,size).astype(typ)
# set imag part of complex values to non-zero value
try:
arg_dict[arg].imag = arg_dict[arg].real
except:
pass
debug_print('Run:', size,typ)
standard,compiled = self.generic_check(expr,arg_dict,type,size)
try:
speed_up = standard/compiled
except:
speed_up = -1.
debug_print("1st run(numpy,compiled,speed up): %3.4f, %3.4f, "
"%3.4f" % (standard,compiled,speed_up))
standard,compiled = self.generic_check(expr,arg_dict,type,size)
try:
speed_up = standard/compiled
except:
speed_up = -1.
debug_print("2nd run(numpy,compiled,speed up): %3.4f, %3.4f, "
"%3.4f" % (standard,compiled,speed_up))
@dec.slow
def test_5point_avg_2d_float(self):
expr = "result[1:-1,1:-1] = (b[1:-1,1:-1] + b[2:,1:-1] + b[:-2,1:-1]" \
"+ b[1:-1,2:] + b[1:-1,:-2]) / 5."
self.generic_2d(expr,float32)
@dec.slow
def test_5point_avg_2d_double(self):
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=BlitzWarning)
expr = "result[1:-1,1:-1] = (b[1:-1,1:-1] + b[2:,1:-1] + b[:-2,1:-1]" \
"+ b[1:-1,2:] + b[1:-1,:-2]) / 5."
self.generic_2d(expr,float64)
@dec.slow
def _check_5point_avg_2d_complex_float(self):
""" Note: THIS TEST is KNOWN TO FAIL ON GCC 3.x.
It will not adversely affect 99.99 percent of weave
result[1:-1,1:-1] = (b[1:-1,1:-1] + b[2:,1:-1] + b[:-2,1:-1]
+ b[1:-1,2:] + b[1:-1,:-2]) / 5.
Note: THIS TEST is KNOWN TO FAIL ON GCC 3.x. The reason is that
5. is a double and b is a complex32. blitz doesn't know
how to handle complex32/double. See:
http://www.oonumerics.org/MailArchives/blitz-support/msg00541.php
Unfortunately, the fix isn't trivial. Instead of fixing it, I
prefer to wait until we replace blitz++ with Pat Miller's code
that doesn't rely on blitz..
"""
expr = "result[1:-1,1:-1] = (b[1:-1,1:-1] + b[2:,1:-1] + b[:-2,1:-1]" \
"+ b[1:-1,2:] + b[1:-1,:-2]) / 5."
self.generic_2d(expr,complex64)
@dec.slow
def test_5point_avg_2d_complex_double(self):
expr = "result[1:-1,1:-1] = (b[1:-1,1:-1] + b[2:,1:-1] + b[:-2,1:-1]" \
"+ b[1:-1,2:] + b[1:-1,:-2]) / 5."
self.generic_2d(expr,complex128)
@dec.slow
def test_blitz_bug():
# Assignment to arr[i:] used to fail inside blitz expressions.
with TempdirBlitz():
N = 4
expr_buggy = 'arr_blitz_buggy[{0}:] = arr[{0}:]'
expr_not_buggy = 'arr_blitz_not_buggy[{0}:{1}] = arr[{0}:]'
random.seed(7)
arr = random.randn(N)
sh = arr.shape[0]
for lim in [0, 1, 2]:
arr_blitz_buggy = zeros(N)
arr_blitz_not_buggy = zeros(N)
arr_np = zeros(N)
blitz(expr_buggy.format(lim))
blitz(expr_not_buggy.format(lim, 'sh'))
arr_np[lim:] = arr[lim:]
assert_allclose(arr_blitz_buggy, arr_np)
assert_allclose(arr_blitz_not_buggy, arr_np)
if __name__ == "__main__":
run_module_suite()
|