1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
|
#!/usr/bin/env python3
"""Play a web stream.
ffmpeg-python (https://github.com/kkroening/ffmpeg-python) has to be installed.
If you don't know a stream URL, try http://icecast.spc.org:8000/longplayer
(see https://longplayer.org/ for a description).
"""
import argparse
import queue
import sys
import ffmpeg
import sounddevice as sd
def int_or_str(text):
"""Helper function for argument parsing."""
try:
return int(text)
except ValueError:
return text
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
'-l', '--list-devices', action='store_true',
help='show list of audio devices and exit')
args, remaining = parser.parse_known_args()
if args.list_devices:
print(sd.query_devices())
parser.exit(0)
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
parents=[parser])
parser.add_argument(
'url', metavar='URL',
help='stream URL')
parser.add_argument(
'-d', '--device', type=int_or_str,
help='output device (numeric ID or substring)')
parser.add_argument(
'-b', '--blocksize', type=int, default=1024,
help='block size (default: %(default)s)')
parser.add_argument(
'-q', '--buffersize', type=int, default=20,
help='number of blocks used for buffering (default: %(default)s)')
args = parser.parse_args(remaining)
if args.blocksize == 0:
parser.error('blocksize must not be zero')
if args.buffersize < 1:
parser.error('buffersize must be at least 1')
q = queue.Queue(maxsize=args.buffersize)
print('Getting stream information ...')
try:
info = ffmpeg.probe(args.url)
except ffmpeg.Error as e:
sys.stderr.buffer.write(e.stderr)
parser.exit(1, str(e))
streams = info.get('streams', [])
if len(streams) != 1:
parser.exit(1, 'There must be exactly one stream available')
stream = streams[0]
if stream.get('codec_type') != 'audio':
parser.exit(1, 'The stream must be an audio stream')
channels = stream['channels']
samplerate = float(stream['sample_rate'])
def callback(outdata, frames, time, status):
assert frames == args.blocksize
if status.output_underflow:
print('Output underflow: increase blocksize?', file=sys.stderr)
raise sd.CallbackAbort
assert not status
try:
data = q.get_nowait()
except queue.Empty as e:
print('Buffer is empty: increase buffersize?', file=sys.stderr)
raise sd.CallbackAbort from e
assert len(data) == len(outdata)
outdata[:] = data
try:
print('Opening stream ...')
process = ffmpeg.input(
args.url
).output(
'pipe:',
format='f32le',
acodec='pcm_f32le',
ac=channels,
ar=samplerate,
loglevel='quiet',
).run_async(pipe_stdout=True)
stream = sd.RawOutputStream(
samplerate=samplerate, blocksize=args.blocksize,
device=args.device, channels=channels, dtype='float32',
callback=callback)
read_size = args.blocksize * channels * stream.samplesize
print('Buffering ...')
for _ in range(args.buffersize):
q.put_nowait(process.stdout.read(read_size))
print('Starting Playback ...')
with stream:
timeout = args.blocksize * args.buffersize / samplerate
while True:
q.put(process.stdout.read(read_size), timeout=timeout)
except KeyboardInterrupt:
parser.exit(1, '\nInterrupted by user')
except queue.Full:
# A timeout occurred, i.e. there was an error in the callback
parser.exit(1)
except Exception as e:
parser.exit(1, type(e).__name__ + ': ' + str(e))
|