1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
#!/usr/bin/env python3
"""Play an audio file using a limited amount of memory.
This is the same as play_long_file.py, but implemented without using NumPy.
"""
import argparse
import queue
import sys
import threading
import sounddevice as sd
import soundfile as sf
def int_or_str(text):
"""Helper function for argument parsing."""
try:
return int(text)
except ValueError:
return text
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
'-l', '--list-devices', action='store_true',
help='show list of audio devices and exit')
args, remaining = parser.parse_known_args()
if args.list_devices:
print(sd.query_devices())
parser.exit(0)
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
parents=[parser])
parser.add_argument(
'filename', metavar='FILENAME',
help='audio file to be played back')
parser.add_argument(
'-d', '--device', type=int_or_str,
help='output device (numeric ID or substring)')
parser.add_argument(
'-b', '--blocksize', type=int, default=2048,
help='block size (default: %(default)s)')
parser.add_argument(
'-q', '--buffersize', type=int, default=20,
help='number of blocks used for buffering (default: %(default)s)')
args = parser.parse_args(remaining)
if args.blocksize == 0:
parser.error('blocksize must not be zero')
if args.buffersize < 1:
parser.error('buffersize must be at least 1')
q = queue.Queue(maxsize=args.buffersize)
event = threading.Event()
def callback(outdata, frames, time, status):
assert frames == args.blocksize
if status.output_underflow:
print('Output underflow: increase blocksize?', file=sys.stderr)
raise sd.CallbackAbort
assert not status
try:
data = q.get_nowait()
except queue.Empty as e:
print('Buffer is empty: increase buffersize?', file=sys.stderr)
raise sd.CallbackAbort from e
if len(data) < len(outdata):
outdata[:len(data)] = data
outdata[len(data):] = b'\x00' * (len(outdata) - len(data))
raise sd.CallbackStop
else:
outdata[:] = data
try:
with sf.SoundFile(args.filename) as f:
for _ in range(args.buffersize):
data = f.buffer_read(args.blocksize, dtype='float32')
if not data:
break
q.put_nowait(data) # Pre-fill queue
stream = sd.RawOutputStream(
samplerate=f.samplerate, blocksize=args.blocksize,
device=args.device, channels=f.channels, dtype='float32',
callback=callback, finished_callback=event.set)
with stream:
timeout = args.blocksize * args.buffersize / f.samplerate
while data:
data = f.buffer_read(args.blocksize, dtype='float32')
q.put(data, timeout=timeout)
event.wait() # Wait until playback is finished
except KeyboardInterrupt:
parser.exit('\nInterrupted by user')
except queue.Full:
# A timeout occurred, i.e. there was an error in the callback
parser.exit(1)
except Exception as e:
parser.exit(type(e).__name__ + ': ' + str(e))
|