1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
#! /usr/bin/env python
from numpy.testing import TestCase
from aubio import fvec, source, sink
from utils import list_all_sounds, get_tmp_sink_path, del_tmp_sink_path
from utils import parse_file_samplerate
from _tools import parametrize, skipTest, assert_raises, assert_warns
list_of_sounds = list_all_sounds('sounds')
samplerates = [0, 44100, 8000, 32000]
hop_sizes = [512, 1024, 64]
path = None
many_files = 300 # 256 opened files is too much
all_params = []
for soundfile in list_of_sounds:
for hop_size in hop_sizes:
for samplerate in samplerates:
all_params.append((hop_size, samplerate, soundfile))
class Test_aubio_sink(object):
def test_wrong_filename(self):
with assert_raises(RuntimeError):
sink('')
def test_wrong_samplerate(self):
with assert_raises(RuntimeError):
sink(get_tmp_sink_path(), -1)
def test_wrong_samplerate_too_large(self):
with assert_raises(RuntimeError):
sink(get_tmp_sink_path(), 1536001, 2)
def test_wrong_channels(self):
with assert_raises(RuntimeError):
sink(get_tmp_sink_path(), 44100, -1)
def test_wrong_channels_too_large(self):
with assert_raises(RuntimeError):
sink(get_tmp_sink_path(), 44100, 202020)
def test_many_sinks(self):
from tempfile import mkdtemp
import os.path
import shutil
tmpdir = mkdtemp()
sink_list = []
for i in range(many_files):
path = os.path.join(tmpdir, 'f-' + str(i) + '.wav')
g = sink(path, 0)
sink_list.append(g)
write = 32
for _ in range(200):
vec = fvec(write)
g(vec, write)
g.close()
shutil.rmtree(tmpdir)
@parametrize('hop_size, samplerate, path', all_params)
def test_read_and_write(self, hop_size, samplerate, path):
orig_samplerate = parse_file_samplerate(soundfile)
try:
if orig_samplerate is not None and orig_samplerate < samplerate:
# upsampling should emit a warning
with assert_warns(UserWarning):
f = source(soundfile, samplerate, hop_size)
else:
f = source(soundfile, samplerate, hop_size)
except RuntimeError as e:
err_msg = '{:s} (hop_s = {:d}, samplerate = {:d})'
skipTest(err_msg.format(str(e), hop_size, samplerate))
if samplerate == 0: samplerate = f.samplerate
sink_path = get_tmp_sink_path()
g = sink(sink_path, samplerate)
total_frames = 0
while True:
vec, read = f()
g(vec, read)
total_frames += read
if read < f.hop_size: break
del_tmp_sink_path(sink_path)
@parametrize('hop_size, samplerate, path', all_params)
def test_read_and_write_multi(self, hop_size, samplerate, path):
orig_samplerate = parse_file_samplerate(soundfile)
try:
if orig_samplerate is not None and orig_samplerate < samplerate:
# upsampling should emit a warning
with assert_warns(UserWarning):
f = source(soundfile, samplerate, hop_size)
else:
f = source(soundfile, samplerate, hop_size)
except RuntimeError as e:
err_msg = '{:s} (hop_s = {:d}, samplerate = {:d})'
skipTest(err_msg.format(str(e), hop_size, samplerate))
if samplerate == 0: samplerate = f.samplerate
sink_path = get_tmp_sink_path()
g = sink(sink_path, samplerate, channels = f.channels)
total_frames = 0
while True:
vec, read = f.do_multi()
g.do_multi(vec, read)
total_frames += read
if read < f.hop_size: break
del_tmp_sink_path(sink_path)
def test_close_file(self):
samplerate = 44100
sink_path = get_tmp_sink_path()
g = sink(sink_path, samplerate)
g.close()
del_tmp_sink_path(sink_path)
def test_close_file_twice(self):
samplerate = 44100
sink_path = get_tmp_sink_path()
g = sink(sink_path, samplerate)
g.close()
g.close()
del_tmp_sink_path(sink_path)
def test_read_with(self):
samplerate = 44100
sink_path = get_tmp_sink_path()
vec = fvec(128)
with sink(sink_path, samplerate) as g:
for _ in range(10):
g(vec, 128)
if __name__ == '__main__':
from _tools import run_module_suite
run_module_suite()
|