1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
|
# -*- coding: utf-8 -*-
"""Automated unit tests for testing audio playback and capture.
These tests require an OS loopback sound device that forwards audio
output, generated by PyAudio for playback, to an input device, which
PyAudio can then record and verify against a test signal.
On Mac OS X, Soundflower can create such a device.
On GNU/Linux, the snd-aloop kernel module provides a loopback ALSA
device. Use examples/system_info.py to identify the name of the loopback
device.
"""
import math
import os
import struct
import time
import unittest
import wave
import sys
import numpy
import pyaudio
import alsa_utils
# To skip tests requiring hardware, set this environment variable (only its
# presence is checked, not its value):
SKIP_HW_TESTS = 'PYAUDIO_SKIP_HW_TESTS' in os.environ
# To run tests that require a loopback device (disabled by default), set this
# variable. If SKIP_HW_TESTS is set, this variable has no effect.
ENABLE_LOOPBACK_TESTS = 'PYAUDIO_ENABLE_LOOPBACK_TESTS' in os.environ
# Debugging aid: when True, the tests also write the captured audio to
# 'test_blocking.wav' / 'test_callback.wav' in the current directory.
DUMP_CAPTURE = False
# unittest module-level fixtures, run once before/after all tests in this
# module.
# NOTE(review): tearDownModule is bound to the same disable_* helper as
# setUpModule rather than a re-enabling counterpart -- confirm this is
# intended.
setUpModule = alsa_utils.disable_error_handler_output
tearDownModule = alsa_utils.disable_error_handler_output
def _create_reference_signal(freqs, sampling_rate, width, duration):
    """Return a reference signal made of several summed sinusoids.

    Args:
        freqs: Frequencies (Hz) of the sinusoids; must be non-empty.
        sampling_rate: Number of frames per second.
        width: Sample width in bytes; determines the peak amplitude.
        duration: Length of the signal in seconds.

    Returns:
        A list of int(sampling_rate * duration) integer sample values.
    """
    frame_count = int(sampling_rate * duration)
    # Largest positive value of a signed sample that is `width` bytes wide.
    peak = float(2**(width * 8 - 1) - 1)
    # Each tone gets an equal share of the peak so the sum cannot clip.
    per_tone_amp = peak / len(freqs)

    def sample_at(k):
        seconds = k / float(sampling_rate)
        return int(
            sum(per_tone_amp * math.sin(2 * math.pi * tone * seconds)
                for tone in freqs))

    return [sample_at(k) for k in range(frame_count)]
def _signal_to_chunks(frame_data, frames_per_chunk, channels):
    """Split a signal into binary chunks of frames_per_chunk frames each.

    Every value in frame_data becomes one frame: the value packed as a
    native-order 16-bit integer, repeated once per channel. The final
    chunk may hold fewer than frames_per_chunk frames.

    Returns:
        A list of bytes objects, one per chunk.
    """
    frame_bytes = [
        struct.pack('h', sample) * channels for sample in frame_data
    ]
    chunk_starts = range(0, len(frame_bytes), frames_per_chunk)
    return [
        b''.join(frame_bytes[start:start + frames_per_chunk])
        for start in chunk_starts
    ]
def _pcm16_to_numpy(bytestring):
    """Unpack native-order PCM 16-bit bytes into a tuple of sample values.

    Despite the function's name, the result is a plain tuple of ints, not
    a numpy array.

    Args:
        bytestring: Bytes holding consecutive 16-bit signed samples.

    Returns:
        A tuple with one int per 2-byte sample.

    Raises:
        struct.error: if len(bytestring) is not a multiple of 2.
    """
    # Integer division keeps the sample count an int instead of relying on
    # '%d' truncating a float.
    sample_count = len(bytestring) // 2
    return struct.unpack('%dh' % sample_count, bytestring)
def _write_wav(filename, data, width, channels, rate):
    """Write PCM data to a wave file.

    Args:
        filename: Path (or writable file-like object) passed to wave.open.
        data: Raw PCM frames as bytes.
        width: Sample width in bytes.
        channels: Number of interleaved channels.
        rate: Frame rate in frames per second.
    """
    # The context manager closes the writer even if a setter or
    # writeframes() raises, so the handle is never leaked.
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(width)
        wf.setframerate(rate)
        wf.writeframes(data)
class StreamWireTests(unittest.TestCase):
    """Play a reference signal through a loopback device and verify it.

    Each test opens an output and an input stream with the same format,
    plays a sum-of-sinusoids reference signal on the output, records from
    the input, and compares the spectra of the recorded and reference
    signals.
    """

    def setUp(self):
        """Create the PyAudio instance and resolve the devices to use.

        Sets self.p, self.loopback_input_idx, self.loopback_output_idx
        (None means "default device") and self.input_channels.

        Raises:
            OSError: if loopback tests are enabled but no loopback device
                is found, the input device cannot be queried, or it
                reports fewer than one input channel.
        """
        self.p = pyaudio.PyAudio()
        try:
            self._configure_loopback_devices()
        except Exception:
            # unittest does not call tearDown() when setUp() raises, so
            # release PyAudio here before propagating the error.
            self.p.terminate()
            raise

    def tearDown(self):
        """Release the PyAudio instance created in setUp()."""
        self.p.terminate()

    def _configure_loopback_devices(self):
        """Resolve loopback device indices and the input channel count."""
        self.loopback_input_idx = None
        self.loopback_output_idx = None

        if ENABLE_LOOPBACK_TESTS:
            (self.loopback_input_idx,
             self.loopback_output_idx) = self._get_audio_loopback()
            # None means "use the default device"; a negative index means
            # the lookup failed.
            for device_idx in (self.loopback_input_idx,
                               self.loopback_output_idx):
                if device_idx is not None and device_idx < 0:
                    raise OSError("No loopback device found")

        # Different platforms/devices support different number of channels
        # for input streams. Inspect the desired input device and use the
        # maximum number of channels.
        try:
            # Compare against None explicitly: device index 0 is a valid
            # device and must not fall back to the default input device.
            if self.loopback_input_idx is not None:
                input_device_info = self.p.get_device_info_by_index(
                    self.loopback_input_idx)
            else:
                input_device_info = self.p.get_default_input_device_info()
        except OSError as err:
            raise OSError(
                f"Invalid device index {self.loopback_input_idx}") from err

        self.input_channels = input_device_info['maxInputChannels']
        if self.input_channels < 1:
            raise OSError("Invalid number of input channels for device")

    def _get_audio_loopback(self):
        """Return (input_idx, output_idx) of the platform's loopback device.

        Returns (None, None) on Windows to select the default devices, and
        (-1, -1) on platforms with no known loopback device. On macOS and
        Linux, either index is -1 when the named device is not found.
        """
        if sys.platform == 'darwin':
            return self._find_audio_device(
                'Soundflower (2ch)', 'Soundflower (2ch)')
        if sys.platform in ('linux', 'linux2'):
            return self._find_audio_device(
                'Loopback: PCM (hw:1,0)', 'Loopback: PCM (hw:1,1)')
        if sys.platform == 'win32':
            # Assumes running in a VM, in which the hypervisor can
            # set up a loopback device to back the "default" audio devices.
            # Here, None indicates default device.
            return None, None
        return -1, -1

    def _find_audio_device(self, indev, outdev):
        """Find the indices of the named input and output devices.

        Args:
            indev: Exact name of the device to capture from; it must
                report at least one input channel.
            outdev: Exact name of the device to play to; it must report
                at least one output channel.

        Returns:
            A tuple (input_idx, output_idx); an index is -1 when no
            matching device exists.
        """
        input_idx, output_idx = -1, -1
        for device_idx in range(self.p.get_device_count()):
            devinfo = self.p.get_device_info_by_index(device_idx)
            name = devinfo.get('name')
            if name == outdev and devinfo.get('maxOutputChannels', 0) > 0:
                output_idx = device_idx
            if name == indev and devinfo.get('maxInputChannels', 0) > 0:
                input_idx = device_idx
            # Stop scanning as soon as both devices are known.
            if output_idx > -1 and input_idx > -1:
                break
        return input_idx, output_idx

    @unittest.skipIf(SKIP_HW_TESTS or not ENABLE_LOOPBACK_TESTS,
                     'Loopback device required.')
    def test_input_output_blocking(self):
        """Test blocking-based record and playback."""
        rate = 44100  # frames per second
        width = 2  # bytes per sample
        channels = self.input_channels
        # Blocking-mode might add some initial choppiness on some
        # platforms/loopback devices, so set a longer duration.
        duration = 3  # seconds
        frames_per_chunk = 1024

        freqs = [130.81, 329.63, 440.0, 466.16, 587.33, 739.99]
        test_signal = _create_reference_signal(freqs, rate, width, duration)
        audio_chunks = _signal_to_chunks(
            test_signal, frames_per_chunk, channels)

        out_stream = self.p.open(
            format=self.p.get_format_from_width(width),
            channels=channels,
            rate=rate,
            output=True,
            frames_per_buffer=frames_per_chunk,
            output_device_index=self.loopback_output_idx)
        in_stream = self.p.open(
            format=self.p.get_format_from_width(width),
            channels=channels,
            rate=rate,
            input=True,
            frames_per_buffer=frames_per_chunk,
            input_device_index=self.loopback_input_idx)

        # Interleave playback and capture one chunk at a time.
        captured = []
        for chunk in audio_chunks:
            out_stream.write(chunk)
            captured.append(in_stream.read(frames_per_chunk))
        # Capture a few more frames, since there is some lag.
        for _ in range(8):
            captured.append(in_stream.read(frames_per_chunk))

        in_stream.stop_stream()
        out_stream.stop_stream()

        if DUMP_CAPTURE:
            _write_wav('test_blocking.wav', b''.join(captured),
                       width, channels, rate)

        self._assert_capture_matches_reference(
            captured, channels, rate, test_signal, len(freqs))

    @unittest.skipIf(SKIP_HW_TESTS or not ENABLE_LOOPBACK_TESTS,
                     'Loopback device required.')
    def test_input_output_callback(self):
        """Test callback-based record and playback."""
        rate = 44100  # frames per second
        width = 2  # bytes per sample
        channels = self.input_channels
        duration = 1  # second
        frames_per_chunk = 1024

        freqs = [130.81, 329.63, 440.0, 466.16, 587.33, 739.99]
        test_signal = _create_reference_signal(freqs, rate, width, duration)
        audio_chunks = _signal_to_chunks(
            test_signal, frames_per_chunk, channels)

        # Hand out one chunk per output callback until none are left.
        pending_chunks = iter(audio_chunks)

        def out_callback(_, frame_count, time_info, status):
            chunk = next(pending_chunks, None)
            if chunk is None:
                return ('', pyaudio.paComplete)
            return (chunk, pyaudio.paContinue)

        captured = []

        def in_callback(in_data, frame_count, time_info, status):
            captured.append(in_data)
            return (None, pyaudio.paContinue)

        out_stream = self.p.open(
            format=self.p.get_format_from_width(width),
            channels=channels,
            rate=rate,
            output=True,
            frames_per_buffer=frames_per_chunk,
            output_device_index=self.loopback_output_idx,
            stream_callback=out_callback)
        in_stream = self.p.open(
            format=self.p.get_format_from_width(width),
            channels=channels,
            rate=rate,
            input=True,
            frames_per_buffer=frames_per_chunk,
            input_device_index=self.loopback_input_idx,
            stream_callback=in_callback)

        in_stream.start_stream()
        out_stream.start_stream()
        # Let the callbacks run for the whole signal plus one second of
        # slack before stopping the streams.
        time.sleep(duration + 1)
        in_stream.stop_stream()
        out_stream.stop_stream()

        if DUMP_CAPTURE:
            _write_wav('test_callback.wav', b''.join(captured),
                       width, channels, rate)

        self._assert_capture_matches_reference(
            captured, channels, rate, test_signal, len(freqs))

    def _assert_capture_matches_reference(self, captured, channels, rate,
                                          test_signal,
                                          num_freq_peaks_expected):
        """Check every captured channel against the reference signal.

        Args:
            captured: List of bytes objects holding interleaved PCM 16-bit
                frames, in capture order.
            channels: Number of interleaved channels in the capture.
            rate: Sampling rate in frames per second.
            test_signal: The reference signal that was played.
            num_freq_peaks_expected: Number of spectral peaks to compare.
        """
        captured_signal = _pcm16_to_numpy(b''.join(captured))
        # De-interleave with the real channel count rather than assuming
        # stereo; the reference is duplicated on every played channel.
        for channel in range(channels):
            self._assert_pcm16_spectrum_nearly_equal(
                rate,
                captured_signal[channel::channels],
                test_signal,
                num_freq_peaks_expected)

    def _assert_pcm16_spectrum_nearly_equal(self, sampling_rate, cap, ref,
                                            num_freq_peaks_expected):
        """Compares the discrete fourier transform of a captured signal
        against the reference signal and ensures that the frequency peaks
        match.

        Args:
            sampling_rate: Sampling rate of both signals, in frames/second.
            cap: Captured single-channel sample values.
            ref: Reference single-channel sample values.
            num_freq_peaks_expected: Number of strongest spectral peaks to
                compare.
        """
        # When passing a reference signal through the loopback device,
        # the captured signal may include additional noise, as well as
        # time lag, so testing that the captured signal is "similar
        # enough" to the reference using bit-wise equality won't work
        # well. Instead, the approach here a) assumes the reference
        # signal is a sum of sinusoids and b) computes the discrete
        # fourier transform of the reference and captured signals, and
        # ensures that the frequencies of the top
        # num_freq_peaks_expected frequency peaks are close.
        cap_fft = numpy.absolute(numpy.fft.rfft(cap))
        ref_fft = numpy.absolute(numpy.fft.rfft(ref))

        # Find the indices of the peaks:
        cap_peak_indices = sorted(numpy.argpartition(
            cap_fft, -num_freq_peaks_expected)[-num_freq_peaks_expected:])
        ref_peak_indices = sorted(numpy.argpartition(
            ref_fft, -num_freq_peaks_expected)[-num_freq_peaks_expected:])

        # Ensure that the corresponding frequencies of the peaks are close.
        # NOTE(review): an rfft bin k of an N-sample signal corresponds to
        # k * sampling_rate / N Hz, so the values below appear to be half
        # the true frequency. Both sides use the same scale, so the
        # comparison is still consistent (effective tolerance ~2 Hz) --
        # confirm before changing.
        for cap_freq_index, ref_freq_index in zip(cap_peak_indices,
                                                  ref_peak_indices):
            cap_freq = cap_freq_index / float(len(cap)) * (sampling_rate / 2)
            ref_freq = ref_freq_index / float(len(ref)) * (sampling_rate / 2)
            diff = abs(cap_freq - ref_freq)
            self.assertLess(diff, 1.0)

        # As an additional test, verify that the spectrum (not just
        # the peaks) of the reference and captured signal are similar
        # by computing the cross-correlation of the spectra. Assuming they
        # are nearly identical, the cross-correlation should contain a large
        # peak when the spectra overlap and mostly 0s elsewhere. Verify that
        # using a histogram of the cross-correlation:
        freq_corr_hist, _ = numpy.histogram(
            numpy.correlate(cap_fft, ref_fft, mode='full'),
            bins=10)
        self.assertLess(sum(freq_corr_hist[2:])/sum(freq_corr_hist), 1e-2)
|