diff --git a/lib_v2/dataset.py b/lib_v2/dataset.py deleted file mode 100644 index a4a8836..0000000 --- a/lib_v2/dataset.py +++ /dev/null @@ -1,119 +0,0 @@ -import os - -import numpy as np -import torch -from tqdm import tqdm - -from lib_v2 import spec_utils - - -class VocalRemoverValidationSet(torch.utils.data.Dataset): - - def __init__(self, filelist): - self.filelist = filelist - - def __len__(self): - return len(self.filelist) - - def __getitem__(self, idx): - path = self.filelist[idx] - data = np.load(path) - - return data['X'], data['y'] - - -def mixup_generator(X, y, rate, alpha): - perm = np.random.permutation(len(X))[:int(len(X) * rate)] - for i in range(len(perm) - 1): - lam = np.random.beta(alpha, alpha) - X[perm[i]] = lam * X[perm[i]] + (1 - lam) * X[perm[i + 1]] - y[perm[i]] = lam * y[perm[i]] + (1 - lam) * y[perm[i + 1]] - - return X, y - - -def get_oracle_data(X, y, instance_loss, oracle_rate, oracle_drop_rate): - k = int(len(X) * oracle_rate * (1 / (1 - oracle_drop_rate))) - n = int(len(X) * oracle_rate) - idx = np.argsort(instance_loss)[::-1][:k] - idx = np.random.choice(idx, n, replace=False) - oracle_X = X[idx].copy() - oracle_y = y[idx].copy() - - return oracle_X, oracle_y, idx - - -def make_padding(width, cropsize, offset): - left = offset - roi_size = cropsize - left * 2 - if roi_size == 0: - roi_size = cropsize - right = roi_size - (width % roi_size) + left - - return left, right, roi_size - - -def make_training_set(filelist, cropsize, patches, sr, hop_length, offset): - len_dataset = patches * len(filelist) - X_dataset = np.zeros( - (len_dataset, 2, hop_length, cropsize), dtype=np.float32) - y_dataset = np.zeros( - (len_dataset, 2, hop_length, cropsize), dtype=np.float32) - for i, (X_path, y_path) in enumerate(tqdm(filelist)): - p = np.random.uniform() - if p < 0.1: - X_path.replace(os.path.splitext(X_path)[1], '_pitch-1.wav') - y_path.replace(os.path.splitext(y_path)[1], '_pitch-1.wav') - elif p < 0.2: - X_path.replace(os.path.splitext(X_path)[1], '_pitch1.wav') - y_path.replace(os.path.splitext(y_path)[1], '_pitch1.wav') - - X, y = spec_utils.cache_or_load(X_path, y_path, sr, hop_length) - coeff = np.max([X.max(), y.max()]) - X, y = X / coeff, y / coeff - - l, r, roi_size = make_padding(X.shape[2], cropsize, offset) - X_pad = np.pad(X, ((0, 0), (0, 0), (l, r)), mode='constant') - y_pad = np.pad(y, ((0, 0), (0, 0), (l, r)), mode='constant') - - starts = np.random.randint(0, X_pad.shape[2] - cropsize, patches) - ends = starts + cropsize - for j in range(patches): - idx = i * patches + j - X_dataset[idx] = X_pad[:, :, starts[j]:ends[j]] - y_dataset[idx] = y_pad[:, :, starts[j]:ends[j]] - if np.random.uniform() < 0.5: - # swap channel - X_dataset[idx] = X_dataset[idx, ::-1] - y_dataset[idx] = y_dataset[idx, ::-1] - - return X_dataset, y_dataset - - -def make_validation_set(filelist, cropsize, sr, hop_length, offset): - patch_list = [] - outdir = 'cs{}_sr{}_hl{}_of{}'.format(cropsize, sr, hop_length, offset) - os.makedirs(outdir, exist_ok=True) - for i, (X_path, y_path) in enumerate(tqdm(filelist)): - basename = os.path.splitext(os.path.basename(X_path))[0] - - X, y = spec_utils.cache_or_load(X_path, y_path, sr, hop_length) - coeff = np.max([X.max(), y.max()]) - X, y = X / coeff, y / coeff - - l, r, roi_size = make_padding(X.shape[2], cropsize, offset) - X_pad = np.pad(X, ((0, 0), (0, 0), (l, r)), mode='constant') - y_pad = np.pad(y, ((0, 0), (0, 0), (l, r)), mode='constant') - - len_dataset = int(np.ceil(X.shape[2] / roi_size)) - for j in range(len_dataset): - outpath = os.path.join(outdir, '{}_p{}.npz'.format(basename, j)) - start = j * roi_size - if not os.path.exists(outpath): - np.savez( - outpath, - X=X_pad[:, :, start:start + cropsize], - y=y_pad[:, :, start:start + cropsize]) - patch_list.append(outpath) - - return VocalRemoverValidationSet(patch_list) diff --git a/lib_v2/layers.py b/lib_v2/layers.py deleted file mode 100644 index dee340b..0000000 --- a/lib_v2/layers.py +++ /dev/null @@ -1,117 +0,0 @@ -import torch -from torch import nn -import torch.nn.functional as F - -from lib_v2 import spec_utils - - -class Conv2DBNActiv(nn.Module): - - def __init__(self, nin, nout, ksize=3, stride=1, pad=1, dilation=1, activ=nn.ReLU): - super(Conv2DBNActiv, self).__init__() - self.conv = nn.Sequential( - nn.Conv2d( - nin, nout, - kernel_size=ksize, - stride=stride, - padding=pad, - dilation=dilation, - bias=False), - nn.BatchNorm2d(nout), - activ() - ) - - def __call__(self, x): - return self.conv(x) - - -class SeperableConv2DBNActiv(nn.Module): - - def __init__(self, nin, nout, ksize=3, stride=1, pad=1, dilation=1, activ=nn.ReLU): - super(SeperableConv2DBNActiv, self).__init__() - self.conv = nn.Sequential( - nn.Conv2d( - nin, nin, - kernel_size=ksize, - stride=stride, - padding=pad, - dilation=dilation, - groups=nin, - bias=False), - nn.Conv2d( - nin, nout, - kernel_size=1, - bias=False), - nn.BatchNorm2d(nout), - activ() - ) - - def __call__(self, x): - return self.conv(x) - - -class Encoder(nn.Module): - - def __init__(self, nin, nout, ksize=3, stride=1, pad=1, activ=nn.LeakyReLU): - super(Encoder, self).__init__() - self.conv1 = Conv2DBNActiv( - nin, nout, ksize, 1, pad, activ=activ) - self.conv2 = Conv2DBNActiv( - nout, nout, ksize, stride, pad, activ=activ) - - def __call__(self, x): - skip = self.conv1(x) - h = self.conv2(skip) - - return h, skip - - -class Decoder(nn.Module): - - def __init__(self, nin, nout, ksize=3, stride=1, pad=1, dropout=False): - super(Decoder, self).__init__() - self.conv = Conv2DBNActiv(nin, nout, ksize, 1, pad) - self.dropout = nn.Dropout2d(0.1) if dropout else None - - def __call__(self, x, skip=None): - x = F.interpolate(x, scale_factor=2, mode='bilinear', align_corners=True) - if skip is not None: - x = spec_utils.crop_center(x, skip) - h = self.conv(x) - - if self.dropout is not None: - h = self.dropout(h) - - return h - - -class ASPPModule(nn.Module): - - def __init__(self, nin, nout, dilations=(4, 8, 16)): - super(ASPPModule, self).__init__() - self.conv1 = nn.Sequential( - nn.AdaptiveAvgPool2d((1, None)), - Conv2DBNActiv(nin, nin, 1, 1, 0) - ) - self.conv2 = Conv2DBNActiv(nin, nin, 1, 1, 0) - self.conv3 = SeperableConv2DBNActiv( - nin, nin, 3, 1, dilations[0], dilations[0]) - self.conv4 = SeperableConv2DBNActiv( - nin, nin, 3, 1, dilations[1], dilations[1]) - self.conv5 = SeperableConv2DBNActiv( - nin, nin, 3, 1, dilations[2], dilations[2]) - self.bottleneck = nn.Sequential( - Conv2DBNActiv(nin * 5, nout, 1, 1, 0), - nn.Dropout2d(0.1) - ) - - def forward(self, x): - _, _, h, w = x.size() - feat1 = F.interpolate(self.conv1(x), size=(h, w), mode='bilinear', align_corners=True) - feat2 = self.conv2(x) - feat3 = self.conv3(x) - feat4 = self.conv4(x) - feat5 = self.conv5(x) - out = torch.cat((feat1, feat2, feat3, feat4, feat5), dim=1) - bottle = self.bottleneck(out) - return bottle diff --git a/lib_v2/nets.py b/lib_v2/nets.py deleted file mode 100644 index 0a3ff66..0000000 --- a/lib_v2/nets.py +++ /dev/null @@ -1,86 +0,0 @@ -import torch -from torch import nn - -from lib_v2 import layers - - -class BaseASPPNet(nn.Module): - - def __init__(self, nin, ch, dilations=(4, 8, 16)): - super(BaseASPPNet, self).__init__() - self.enc1 = layers.Encoder(nin, ch, 3, 2, 1) - self.enc2 = layers.Encoder(ch, ch * 2, 3, 2, 1) - self.enc3 = layers.Encoder(ch * 2, ch * 4, 3, 2, 1) - self.enc4 = layers.Encoder(ch * 4, ch * 8, 3, 2, 1) - - self.aspp = layers.ASPPModule(ch * 8, ch * 16, dilations) - - self.dec4 = layers.Decoder(ch * (8 + 16), ch * 8, 3, 1, 1) - self.dec3 = layers.Decoder(ch * (4 + 8), ch * 4, 3, 1, 1) - self.dec2 = layers.Decoder(ch * (2 + 4), ch * 2, 3, 1, 1) - self.dec1 = layers.Decoder(ch * (1 + 2), ch, 3, 1, 1) - - def __call__(self, x): - h, e1 = self.enc1(x) - h, e2 = self.enc2(h) - h, e3 = self.enc3(h) - h, e4 = self.enc4(h) - - h = self.aspp(h) - - h = self.dec4(h, e4) - h = self.dec3(h, e3) - h = self.dec2(h, e2) - h = self.dec1(h, e1) - - return h - - -class CascadedASPPNet(nn.Module): - - def __init__(self): - super(CascadedASPPNet, self).__init__() - self.low_band_net = BaseASPPNet(2, 32, ((2, 4), (4, 8), (8, 16))) - self.high_band_net = BaseASPPNet(2, 32, ((2, 4), (4, 8), (8, 16))) - - self.bridge = layers.Conv2DBNActiv(34, 16, 1, 1, 0) - self.full_band_net = BaseASPPNet(16, 32) - - self.out = nn.Sequential( - layers.Conv2DBNActiv(32, 16, 3, 1, 1), - nn.Conv2d(16, 2, 1, bias=False)) - self.aux_out = nn.Conv2d(32, 2, 1, bias=False) - - self.offset = 128 - - def __call__(self, x): - bandw = x.size()[2] // 2 - aux = torch.cat([ - self.low_band_net(x[:, :, :bandw]), - self.high_band_net(x[:, :, bandw:]) - ], dim=2) - - h = torch.cat([x, aux], dim=1) - h = self.full_band_net(self.bridge(h)) - - h = torch.sigmoid(self.out(h)) - aux = torch.sigmoid(self.aux_out(aux)) - - return h, aux - - def predict(self, x): - bandw = x.size()[2] // 2 - aux = torch.cat([ - self.low_band_net(x[:, :, :bandw]), - self.high_band_net(x[:, :, bandw:]) - ], dim=2) - - h = torch.cat([x, aux], dim=1) - h = self.full_band_net(self.bridge(h)) - - h = torch.sigmoid(self.out(h)) - if self.offset > 0: - h = h[:, :, :, self.offset:-self.offset] - assert h.size()[3] > 0 - - return h diff --git a/lib_v2/spec_utils.py b/lib_v2/spec_utils.py deleted file mode 100644 index be61986..0000000 --- a/lib_v2/spec_utils.py +++ /dev/null @@ -1,136 +0,0 @@ -import os - -import librosa -import numpy as np -import soundfile as sf -import torch - - -def crop_center(h1, h2, concat=True): - # s_freq = (h2.shape[2] - h1.shape[2]) // 2 - # e_freq = s_freq + h1.shape[2] - h1_shape = h1.size() - h2_shape = h2.size() - if h2_shape[3] < h1_shape[3]: - raise ValueError('h2_shape[3] must be greater than h1_shape[3]') - s_time = (h2_shape[3] - h1_shape[3]) // 2 - e_time = s_time + h1_shape[3] - h2 = h2[:, :, :, s_time:e_time] - if concat: - return torch.cat([h1, h2], dim=1) - else: - return h2 - - -def calc_spec(X, hop_length): - n_fft = (hop_length - 1) * 2 - audio_left = np.asfortranarray(X[0]) - audio_right = np.asfortranarray(X[1]) - spec_left = librosa.stft(audio_left, n_fft, hop_length=hop_length) - spec_right = librosa.stft(audio_right, n_fft, hop_length=hop_length) - spec = np.asfortranarray([spec_left, spec_right]) - - return spec - - -def mask_uninformative(mask, ref, thres=0.3, min_range=64, fade_area=32): - if min_range < fade_area * 2: - raise ValueError('min_range must be >= fade_area * 2') - idx = np.where(ref.mean(axis=(0, 1)) < thres)[0] - starts = np.insert(idx[np.where(np.diff(idx) != 1)[0] + 1], 0, idx[0]) - ends = np.append(idx[np.where(np.diff(idx) != 1)[0]], idx[-1]) - uninformative = np.where(ends - starts > min_range)[0] - if len(uninformative) > 0: - starts = starts[uninformative] - ends = ends[uninformative] - old_e = None - for s, e in zip(starts, ends): - if old_e is not None and s - old_e < fade_area: - s = old_e - fade_area * 2 - elif s != 0: - start_mask = mask[:, :, s:s + fade_area] - np.clip( - start_mask + np.linspace(0, 1, fade_area), 0, 1, - out=start_mask) - if e != mask.shape[2]: - end_mask = mask[:, :, e - fade_area:e] - np.clip( - end_mask + np.linspace(1, 0, fade_area), 0, 1, - out=end_mask) - mask[:, :, s + fade_area:e - fade_area] = 1 - old_e = e - - return mask - - -def align_wave_head_and_tail(a, b, sr): - a_mono = a[:, :sr * 4].sum(axis=0) - b_mono = b[:, :sr * 4].sum(axis=0) - a_mono -= a_mono.mean() - b_mono -= b_mono.mean() - offset = len(a_mono) - 1 - delay = np.argmax(np.correlate(a_mono, b_mono, 'full')) - offset - - if delay > 0: - a = a[:, delay:] - else: - b = b[:, np.abs(delay):] - if a.shape[1] < b.shape[1]: - b = b[:, :a.shape[1]] - else: - a = a[:, :b.shape[1]] - - return a, b - - -def cache_or_load(mix_path, inst_path, sr, hop_length): - _, mix_ext = os.path.splitext(mix_path) - _, inst_ext = os.path.splitext(inst_path) - spec_mix_path = mix_path.replace(mix_ext, '.npy') - spec_inst_path = inst_path.replace(inst_ext, '.npy') - - if os.path.exists(spec_mix_path) and os.path.exists(spec_inst_path): - X = np.load(spec_mix_path) - y = np.load(spec_inst_path) - else: - X, _ = librosa.load( - mix_path, sr, False, dtype=np.float32, res_type='kaiser_fast') - y, _ = librosa.load( - inst_path, sr, False, dtype=np.float32, res_type='kaiser_fast') - X, _ = librosa.effects.trim(X) - y, _ = librosa.effects.trim(y) - X, y = align_wave_head_and_tail(X, y, sr) - - X = np.abs(calc_spec(X, hop_length)) - y = np.abs(calc_spec(y, hop_length)) - - _, ext = os.path.splitext(mix_path) - np.save(spec_mix_path, X) - np.save(spec_inst_path, y) - - return X, y - - -def spec_to_wav(mag, phase, hop_length): - spec = mag * phase - spec_left = np.asfortranarray(spec[0]) - spec_right = np.asfortranarray(spec[1]) - wav_left = librosa.istft(spec_left, hop_length=hop_length) - wav_right = librosa.istft(spec_right, hop_length=hop_length) - wav = np.asfortranarray([wav_left, wav_right]) - - return wav - - -if __name__ == "__main__": - import sys - X, _ = librosa.load( - sys.argv[1], 44100, False, dtype=np.float32, res_type='kaiser_fast') - y, _ = librosa.load( - sys.argv[2], 44100, False, dtype=np.float32, res_type='kaiser_fast') - X, _ = librosa.effects.trim(X) - y, _ = librosa.effects.trim(y) - X, y = align_wave_head_and_tail(X, y, 44100) - sf.write('test_i.wav', y.T, 44100) - sf.write('test_m.wav', X.T, 44100) - sf.write('test_v.wav', (X - y).T, 44100)