1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2011-2016 David Robillard <d@drobilla.net>
# SPDX-License-Identifier: ISC
import math
import lilv
import sys
import wave
import numpy
class WavFile(object):
"""Helper class for accessing wav file data. Should work on the most common
formats (8 bit unsigned, 16 bit signed, 32 bit signed). Audio data is
converted to float32."""
# (struct format code, is_signedtype) for each sample width:
WAV_SPECS = {
1: ("B", False),
2: ("h", True),
4: ("l", True),
}
def __init__(self, wav_in_path):
self.wav_in = wave.open(wav_in_path, 'r')
self.framerate = self.wav_in.getframerate()
self.nframes = self.wav_in.getnframes()
self.nchannels = self.wav_in.getnchannels()
self.sampwidth = self.wav_in.getsampwidth()
wav_spec = self.WAV_SPECS[self.sampwidth]
self.struct_fmt_code, self.signed = wav_spec
self.range = 2 ** (8*self.sampwidth)
def read(self):
"""Read data from an open wav file. Return a list of channels, where each
channel is a list of floats."""
raw_bytes = self.wav_in.readframes(self.nframes)
struct_fmt = "%u%s" % (len(raw_bytes) / self.sampwidth, self.struct_fmt_code)
data = wave.struct.unpack(struct_fmt, raw_bytes)
if self.signed:
data = [i / float(self.range/2) for i in data]
else:
data = [(i - float(range/2)) / float(range/2) for i in data]
channels = []
for i in range(self.nchannels):
channels.append([data[j] for j in range(0, len(data), self.nchannels) ])
return channels
def close(self):
self.wav_in.close()
def main():
# Read command line arguments
if len(sys.argv) != 4:
print('USAGE: lv2_apply.py PLUGIN_URI INPUT_WAV OUTPUT_WAV')
sys.exit(1)
# Initialise Lilv
world = lilv.World()
ns = world.ns
world.load_all()
plugin_uri = sys.argv[1]
wav_in_path = sys.argv[2]
wav_out_path = sys.argv[3]
# Find plugin
plugin_uri_node = world.new_uri(plugin_uri)
plugins = world.get_all_plugins()
if plugin_uri_node not in plugins:
print("Unknown plugin `%s'" % plugin_uri)
sys.exit(1)
plugin = plugins[plugin_uri_node]
n_audio_in = plugin.get_num_ports_of_class(ns.lv2.InputPort, ns.lv2.AudioPort)
n_audio_out = plugin.get_num_ports_of_class(ns.lv2.OutputPort, ns.lv2.AudioPort)
if n_audio_out == 0:
print("Plugin has no audio outputs\n")
sys.exit(1)
# Open input file
try:
wav_in = WavFile(wav_in_path)
except:
print("Failed to open input `%s'\n" % wav_in_path)
sys.exit(1)
if wav_in.nchannels != n_audio_in:
print("Input has %d channels, but plugin has %d audio inputs\n" % (
wav_in.nchannels, n_audio_in))
sys.exit(1)
# Open output file
wav_out = wave.open(wav_out_path, 'w')
if not wav_out:
print("Failed to open output `%s'\n" % wav_out_path)
sys.exit(1)
# Set output file to same format as input (except possibly nchannels)
wav_out.setparams(wav_in.wav_in.getparams())
wav_out.setnchannels(n_audio_out)
print('%s => %s => %s @ %d Hz'
% (wav_in_path, plugin.get_name(), wav_out_path, wav_in.framerate))
instance = lilv.Instance(plugin, wav_in.framerate)
channels = wav_in.read()
wav_in.close()
# Connect all ports to buffers. NB if we fail to connect any buffer, lilv
# will segfault.
audio_input_buffers = []
audio_output_buffers = []
control_input_buffers = []
control_output_buffers = []
for index in range(plugin.get_num_ports()):
port = plugin.get_port_by_index(index)
if port.is_a(ns.lv2.InputPort):
if port.is_a(ns.lv2.AudioPort):
audio_input_buffers.append(numpy.array(channels[len(audio_input_buffers)], numpy.float32))
instance.connect_port(index, audio_input_buffers[-1])
elif port.is_a(ns.lv2.ControlPort):
default = float(port.get(ns.lv2.default))
control_input_buffers.append(numpy.array([default], numpy.float32))
instance.connect_port(index, control_input_buffers[-1])
else:
raise ValueError("Unhandled port type")
elif port.is_a(ns.lv2.OutputPort):
if port.is_a(ns.lv2.AudioPort):
audio_output_buffers.append(numpy.array([0] * wav_in.nframes, numpy.float32))
instance.connect_port(index, audio_output_buffers[-1])
elif port.is_a(ns.lv2.ControlPort):
control_output_buffers.append(numpy.array([0], numpy.float32))
instance.connect_port(index, control_output_buffers[-1])
else:
raise ValueError("Unhandled port type")
# Run the plugin:
instance.run(wav_in.nframes)
# Interleave output buffers:
data = numpy.dstack(audio_output_buffers).flatten()
# Return to original int range:
if wav_in.signed:
data = data * float(wav_in.range / 2)
else:
data = (data + 1) * float(wav_in.range/2)
# Write output file in chunks to stop memory usage getting out of hand:
CHUNK_SIZE = 8192
for chunk in numpy.array_split(data, CHUNK_SIZE):
wav_out.writeframes(wave.struct.pack("%u%s" % (len(chunk), wav_in.struct_fmt_code), *chunk.astype(int)))
wav_out.close()
if __name__ == "__main__":
main()
|