File: test_sp.py

package info (click to toggle)
theano 1.0.3%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 30,752 kB
  • sloc: python: 141,182; ansic: 9,505; makefile: 259; sh: 214; pascal: 81
file content (221 lines) | stat: -rw-r--r-- 9,506 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
from __future__ import absolute_import, print_function, division
from nose.plugins.skip import SkipTest
import sys
import time
import unittest

import theano.sparse
if not theano.sparse.enable_sparse:
    raise SkipTest('Optional package sparse disabled')

import scipy.sparse
from scipy.signal import convolve2d
import scipy.sparse as sparse
import numpy as np
from six.moves import xrange

from theano import function, tensor
import theano
from theano.compat import next
from theano.sparse.sandbox import sp
from theano.sparse.tests.test_basic import random_lil
from theano.tests import unittest_tools as utt
from theano.sparse.tests.test_basic import sparse_random_inputs
from theano.tests.unittest_tools import attr


class TestSP(unittest.TestCase):
    """Tests for ``convolve`` and ``max_pool`` from ``theano.sparse.sandbox.sp``."""

    @attr('slow')
    def test_convolution(self):
        """Compare ``sp.convolve`` with a ``scipy.signal.convolve2d`` reference.

        Every combination of convolution mode ('full', 'valid'), subsampling
        step and Theano compilation mode ('FAST_COMPILE', 'FAST_RUN') is
        compiled and evaluated on the same batch of images, and the result
        must match the reference within an absolute tolerance of 1e-5.
        """
        # fixed parameters
        bsize = 10     # batch size
        imshp = (28, 28)
        kshp = (5, 5)
        nkern = 5
        ssizes = ((1, 1), (2, 2), (3, 3), (4, 4))
        convmodes = ('full', 'valid')
        compile_modes = ('FAST_COMPILE', 'FAST_RUN')

        # symbolic stuff
        bias = tensor.dvector()
        kerns = tensor.dmatrix()
        images = tensor.dmatrix()

        # concrete values, identical for every configuration
        rng = np.random.RandomState(3423489)
        filters = rng.randn(nkern, np.prod(kshp))
        biasvals = rng.randn(nkern)
        img2d = np.arange(bsize * np.prod(imshp)).reshape((bsize,) + imshp)
        img1d = img2d.reshape(bsize, -1)

        # Filters need to be flipped to use convolve2d: reverse each
        # flattened kernel, then lay it out row-major as a kshp image.
        filtersflipped = filters[:, ::-1].reshape((nkern,) + kshp)

        for conv_mode in convmodes:
            # The un-subsampled reference output depends only on conv_mode,
            # so it is computed once here and reused for every step size
            # and compilation mode.
            if conv_mode == 'valid':
                fulloutshp = np.array(imshp) - np.array(kshp) + 1
            else:
                fulloutshp = np.array(imshp) + np.array(kshp) - 1
            refout = np.zeros((bsize,) + tuple(fulloutshp) + (nkern,))
            for b in range(bsize):
                for n in range(nkern):
                    refout[b, ..., n] = convolve2d(img2d[b, :, :],
                                                   filtersflipped[n, ...],
                                                   conv_mode)

            for ss in ssizes:
                output, _ = sp.convolve(kerns, kshp, nkern, images,
                                        imshp, ss, bias=bias, mode=conv_mode)

                # Subsample the reference and flatten each image.  The bias
                # is added out of place: for ss == (1, 1) the reshaped slice
                # can be a view of refout, which must stay untouched because
                # it is reused for the remaining step sizes.
                bench1 = refout[:, 0::ss[0], 0::ss[1], :].reshape(
                    bsize, -1, nkern)
                bench1 = bench1 + biasvals.reshape(1, 1, nkern)

                # swap the last two dimensions
                # (output needs to be nkern x outshp)
                bench1 = np.swapaxes(bench1, 1, 2)

                for mode in compile_modes:
                    f = function([kerns, bias, images], output, mode=mode)
                    out1 = f(filters, biasvals, img1d)

                    # Compare magnitudes: a signed difference would let
                    # arbitrarily large negative errors slip through.
                    err = np.abs(bench1.flatten() - out1.flatten())
                    assert (err < 1e-5).all(), (conv_mode, ss, mode,
                                                err.max())

        # NOTE: the downward propagation (gradient w.r.t. the input) of
        # sp.convolve is not checked by this test.

    def test_multilayer_conv(self):
        """Smoke test: build and evaluate two stacked ``sp.convolve`` layers.

        This doesn't compare the output of anything; it only checks that
        both layers compile and run.  (The original author verified by hand
        that the patches are properly generated.)
        """
        # fixed parameters
        bsize = 10     # batch size
        imshp = (5, 5)
        kshp = ((3, 3), (2, 2))
        nkerns = (3, 6)  # per output pixel
        ssizes = (((1, 1), (2, 2)),)
        convmodes = ('full',)  # 'valid',)

        # symbolic stuff
        kerns = [tensor.dmatrix(), tensor.dmatrix()]
        images = tensor.dmatrix()

        # build actual input images
        img2d = np.arange(bsize * np.prod(imshp)).reshape((bsize,) + imshp)
        img1d = img2d.reshape(bsize, -1)

        # Integer-valued kernels keep the results easy to inspect by hand.
        l1_ncols = np.prod(kshp[0])
        l1kernvals = np.arange(nkerns[0] * l1_ncols).reshape(nkerns[0],
                                                             l1_ncols)
        l2_ncols = np.prod(kshp[1]) * nkerns[0]
        l2kernvals = np.arange(nkerns[1] * l2_ncols).reshape(nkerns[1],
                                                             l2_ncols)

        for mode in ('FAST_COMPILE', 'FAST_RUN'):
            for conv_mode in convmodes:
                for ss in ssizes:
                    # first layer
                    l1hid, l1shp = sp.convolve(kerns[0], kshp[0],
                                               nkerns[0], images, imshp,
                                               ss[0], mode=conv_mode)
                    l1propup = function([kerns[0], images], l1hid, mode=mode)
                    l1hidval = l1propup(l1kernvals, img1d)

                    # second layer, fed directly with the first layer's
                    # symbolic output
                    l2hid, l2shp = sp.convolve(kerns[1], kshp[1],
                                               nkerns[1], l1hid, l1shp,
                                               ss[1], mode=conv_mode)
                    l2propup = function([kerns[1], l1hid], l2hid, mode=mode)

                    # for debugging, we bring things back to integers
                    l1hidval = np.arange(np.size(l1hidval)).reshape(
                        l1hidval.shape)

                    l2propup(l2kernvals, l1hidval)

    def test_maxpool(self):
        """Check ``sp.max_pool`` values and gradient for several pool shapes.

        The forward output is compared with a pure NumPy loop that takes the
        maximum over non-overlapping patches, and the gradient is checked
        with ``utt.verify_grad``.
        """
        maxpoolshps = ((2, 2), (3, 3), (4, 4), (5, 5), (6, 6))

        # Seeded generator, like the other tests in this class, so that a
        # failure can be reproduced.
        rng = np.random.RandomState(3423489)
        imval = rng.rand(4, 5, 10, 10)
        # the op works on flattened images: one row per image
        flat_imval = imval.reshape(imval.shape[0], -1)

        images = tensor.dmatrix()
        for maxpoolshp in maxpoolshps:

            # symbolic stuff
            output, outshp = sp.max_pool(images, imval.shape[1:], maxpoolshp)
            f = function([images, ], [output, ])
            [output_val] = f(flat_imval)

            # numeric verification
            nrows = imval.shape[2] // maxpoolshp[0]
            ncols = imval.shape[3] // maxpoolshp[1]
            my_output_val = np.zeros((imval.shape[0], imval.shape[1],
                                      nrows, ncols))
            assert (np.prod(my_output_val.shape[1:]) ==
                    np.prod(np.r_[imval.shape[1], outshp]))

            for n in range(imval.shape[0]):
                for k in range(imval.shape[1]):
                    for i in range(nrows):
                        for j in range(ncols):
                            ii, jj = i * maxpoolshp[0], j * maxpoolshp[1]
                            patch = imval[n, k,
                                          ii:ii + maxpoolshp[0],
                                          jj:jj + maxpoolshp[1]]
                            my_output_val[n, k, i, j] = np.max(patch)
            my_output_val = my_output_val.reshape(imval.shape[0], -1)
            assert np.all(output_val == my_output_val)

            # The closure is called right away by verify_grad, so it always
            # sees the current maxpoolshp.
            def mp(flat_images):
                pooled, _ = sp.max_pool(flat_images, imval.shape[1:],
                                        maxpoolshp)
                return pooled
            utt.verify_grad(mp, [flat_imval])


if __name__ == '__main__':
    # Run only the TestSP cases, printing one line per test (verbosity=2).
    suite = unittest.TestLoader().loadTestsFromTestCase(TestSP)
    unittest.TextTestRunner(verbosity=2).run(suite)