The `av.open` function checks input metadata to determine the mode to open with ("r" or "w"). If an input to `decode_audio` is detected as write-mode, it cannot be read without this change. Forcing read mode solves this.
105 lines
2.7 KiB
Python
105 lines
2.7 KiB
Python
"""We use the PyAV library to decode the audio: https://github.com/PyAV-Org/PyAV
|
|
|
|
The advantage of PyAV is that it bundles the FFmpeg libraries so there are no additional
|
|
system dependencies. FFmpeg does not need to be installed on the system.
|
|
|
|
However, the API is quite low-level so we need to manipulate audio frames directly.
|
|
"""
|
|
|
|
import gc
|
|
import io
|
|
import itertools
|
|
|
|
from typing import BinaryIO, Union
|
|
|
|
import av
|
|
import numpy as np
|
|
|
|
|
|
def decode_audio(
    input_file: Union[str, BinaryIO],
    sampling_rate: int = 16000,
    split_stereo: bool = False,
):
    """Decode an audio source into float32 samples.

    The first audio stream of the input is decoded, resampled to signed
    16-bit samples at ``sampling_rate`` (mono, or stereo when
    ``split_stereo`` is set), and finally scaled by 1/32768 into float32.

    Args:
      input_file: Path to the input file or a file-like object.
      sampling_rate: Sample rate the audio is resampled to.
      split_stereo: When true, return the left and right channels separately.

    Returns:
      A float32 Numpy array.

      When ``split_stereo`` is enabled, a 2-tuple ``(left, right)`` of
      float32 arrays taken from the even and odd sample positions.
    """
    channel_layout = "stereo" if split_stereo else "mono"
    resampler = av.audio.resampler.AudioResampler(
        format="s16",
        layout=channel_layout,
        rate=sampling_rate,
    )

    pcm_bytes = io.BytesIO()
    sample_dtype = None

    # Read mode is requested explicitly instead of letting av.open choose it.
    with av.open(input_file, mode="r", metadata_errors="ignore") as container:
        # Decode stream 0 -> drop undecodable frames -> regroup into chunks of
        # at least 500000 samples -> resample each chunk.
        pipeline = _resample_frames(
            _group_frames(
                _ignore_invalid_frames(container.decode(audio=0)),
                500000,
            ),
            resampler,
        )

        for resampled in pipeline:
            chunk = resampled.to_ndarray()
            sample_dtype = chunk.dtype
            pcm_bytes.write(chunk)

    # Some objects tied to the resampler are apparently only released once
    # the garbage collector has been run by hand.
    del resampler
    gc.collect()

    samples = np.frombuffer(pcm_bytes.getbuffer(), dtype=sample_dtype)

    # Scale the s16 samples back to f32.
    samples = samples.astype(np.float32) / 32768.0

    if not split_stereo:
        return samples

    # Stereo samples are interleaved: even positions left, odd positions right.
    return samples[0::2], samples[1::2]
|
|
|
|
|
|
def _ignore_invalid_frames(frames):
    """Yield the items of ``frames``, skipping ones that fail with invalid data.

    Items are pulled one at a time; whenever pulling raises
    ``av.error.InvalidDataError`` the error is swallowed and the next item is
    tried. Iteration ends when the underlying iterator is exhausted.
    """
    source = iter(frames)

    while True:
        try:
            # Pull and hand over in one step so both stay under the handlers.
            yield next(source)
        except StopIteration:
            return
        except av.error.InvalidDataError:
            continue
|
|
|
|
|
|
def _group_frames(frames, num_samples=None):
    """Regroup incoming audio frames into larger frames through an AudioFifo.

    Every frame is appended to the FIFO. When ``num_samples`` is given and
    the FIFO holds at least that many samples, its content is read out and
    yielded. Whatever is still buffered once ``frames`` is exhausted is
    yielded as a final frame; with ``num_samples=None`` that is the only
    frame produced.
    """
    buffer = av.audio.fifo.AudioFifo()

    for incoming in frames:
        incoming.pts = None  # Ignore timestamp check.
        buffer.write(incoming)

        if num_samples is None:
            continue
        if buffer.samples < num_samples:
            continue
        yield buffer.read()

    # Emit the remainder, if any samples are still queued.
    if buffer.samples > 0:
        yield buffer.read()
|
|
|
|
|
|
def _resample_frames(frames, resampler):
    """Run every frame through ``resampler`` and yield the resulting frames.

    After the input is exhausted, ``resampler.resample(None)`` is called once
    to flush the resampler, and its output is yielded as well.
    """
    for pending in frames:
        yield from resampler.resample(pending)

    # A final call with None flushes the resampler.
    yield from resampler.resample(None)
|