Source code for carat.util

# encoding: utf-8
# pylint: disable=C0103
# pylint: disable=too-many-arguments
"""
Util
====

Signal segmentation
-------------------
.. autosummary::
    :toctree: generated/

    segmentSignal
    beat2signal
    get_time_segment

Time-frequency
------------------
.. autosummary::
    :toctree: generated/

    STFT
    fft2mel
    hz2mel
    mel2hz

Miscellaneous
-------------
.. autosummary::
    :toctree: generated/

    example_audio_file
    example_beats_file
    find_nearest
    deltas
"""


import warnings
import numpy as np
import scipy as sp
import scipy.signal
import scipy.fftpack as fft
import pkg_resources
#import scipy.signal
#import exceptions
from .exceptions import ParameterError

EXAMPLE_AUDIO1 = 'example_data/candombe/csic.1995_ansina1_01.wav'
EXAMPLE_AUDIO2 = 'example_data/samba/[0216] S2-TB2-03-SE.wav'
EXAMPLE_BEATS1 = 'example_data/candombe/csic.1995_ansina1_01.csv'
EXAMPLE_BEATS2 = 'example_data/samba/[0216] S2-TB2-03-SE.beats.txt'


__all__ = ['find_nearest', 'STFT', 'hz2mel', 'mel2hz', 'deltas']


[docs]def find_nearest(array, value): """Find index of the nearest value of an array to a given value Parameters ---------- array (numpy.ndarray) : array value (float) : value Returns ------- idx (int) : index of nearest value in the array """ idx = (np.abs(array-value)).argmin() return idx
[docs]def STFT(x, window_length, hop, windowing_function=sp.hanning, dft_length=None, zp_flag=False): """ Calculates the Short-Time Fourier Transform a signal. Given an input signal, it calculates the DFT of frames of the signal and stores them in bi-dimensional Scipy array. **Args**: window_len (float): length of the window in seconds (must be positive). window (callable): a callable object that receives the window length in samples and returns a numpy array containing the windowing function samples. hop (float): frame hop between adjacent frames in seconds. final_time (positive integer): time (in seconds) up to which the spectrogram is calculated. zp_flag (bool): a flag indicating if the *Zero-Phase Windowing* should be performed. **Returns**: spec: numpy array time: numpy array frequency: numpy array **Raises**: """ # Checking input: if x.ndim != 1: raise AttributeError("Data must be one-dimensional.") # Window length must be odd: if window_length%2 == 0: window_length = window_length + 1 # DFT length is equal the window_len+1 (always even) if dft_length is None: dft_length = window_length + 1 # If dft_length was set by the user, it should always be larger than the window length. if dft_length < window_length: warnings.warn("DFT length is smaller than window length.", RuntimeWarning) # Partitioning the input signal: part_sig = segmentSignal(x, window_length, hop) no_cols = part_sig.shape[1] # Applying the window: window = windowing_function(window_length) win_sig = part_sig * sp.transpose(sp.tile(window, (no_cols, 1))) # Zero-phase windowing: if zp_flag: win_sig = fft.fftshift(win_sig, axes=0) # Taking the FFT of the partitioned signal spec = fft.fftshift(fft.fft(win_sig, n=dft_length, axis=0), axes=0) # Normalizing energy spec /= sp.sum(window) # Calculating time and frequency indices for the data frequency = fft.fftshift(fft.fftfreq(dft_length)) time = sp.arange(no_cols)*float(hop) + ((window_length-1)/2) # Creating output spectrogram return spec, time, frequency
[docs]def segmentSignal(signal, window_len, hop): """ Segmentation of an array-like input: Given an array-like, this function calculates the DFT of frames of the signal and stores them in bi-dimensional Scipy array. **Args**: signal (array-like): object to be windowed. Must be a one-dimensional array-like object. window_len (int): window size in samples. hop (int): frame hop between adjacent frames in seconds. **Returns**: A 2-D numpy array containing the windowed signal. Each element of this array X can be defined as: X[m,n] = x[n+Hm] where, H is the HOP in samples, 0<=n<=N, N = window_len, and 0<m<floor(((len(x)-N)/H)+1). **Raises**: AttributeError if signal is not one-dimensional. ValueError if window_len or hop are not strictly positives. """ if(window_len <= 0 or hop <= 0): raise ValueError("window_len and hop values must be strictly positive numbers.") if signal.ndim != 1: raise AttributeError("Input signal must be one dimensional.") # Calculating the number of columns: no_cols = int(sp.floor((sp.size(signal)-window_len)/float(hop))+1) # Windowing indices (which element goes to which position in the windowed matrix). ind_col = sp.tile(sp.arange(window_len, dtype=np.uint64), (no_cols, 1)) ind_line = sp.tile(sp.arange(no_cols, dtype=np.uint64)*hop, (window_len, 1)) ind = sp.transpose(ind_col) + ind_line # Partitioned signal: part_sig = signal[ind].copy() # Windowing partitioned signal return part_sig
def __get_segment(y, idx_ini, idx_end): """ Get a segment of an array, given by initial and ending indexes. **Args**: y (numpy array): Must be a one-dimensional array. idx_ini (int): initial index. idx_end (int): ending index. **Returns**: segment (numpy array): segment of the signal. **Raises**: AttributeError if y is not a one-dimensional numpy array. ValueError if idx_ini or idx_end fall outside the signal bounds. ValueError if idx_ini >= idx_end. """ if not isinstance(y, np.ndarray): raise ParameterError('Input must be of type numpy.ndarray, ' 'given type(y)={}'.format(type(y))) if y.ndim != 1: raise ParameterError('Input must be one-dimensional, ' 'given y.ndim={}'.format(y.ndim)) if idx_ini >= idx_end: raise ValueError('Ending index is smaller than initial index.' ' idx_ini={:d}, idx_end={:d}'.format(idx_ini, idx_end)) if idx_ini < 0 or idx_end >= y.size: raise ValueError('Index out of signal bounds.' ' y.size={:d}, idx_ini={:d},' ' idx_end={:d}'.format(y.size, idx_ini, idx_end)) y_segment = y[idx_ini:idx_end] return y_segment
[docs]def get_time_segment(y, time, time_ini, time_end): """ Get a segment of an array, given by initial and ending indexes. **Args**: y (numpy array): signal array. Must be a one-dimensional array. time (numpy array): corresponding time. Must be a one-dimensional array. time_ini (int): initial time value. time_end (int): ending time value. **Returns**: segment (numpy array): segment of the signal. **Raises**: AttributeError if y or time is not a one-dimensional numpy array. ValueError if idx_ini or idx_end fall outside the signal bounds. ValueError if idx_ini >= idx_end. """ if not isinstance(y, np.ndarray): raise ParameterError('Input must be of type numpy.ndarray, ' 'given type(y)={}'.format(type(y))) if not isinstance(time, np.ndarray): raise ParameterError('Input must be of type numpy.ndarray, ' 'given type(time)={}'.format(type(time))) if y.ndim != 1: raise ParameterError('Input must be one-dimensional, ' 'given y.ndim={}'.format(y.ndim)) if time.ndim != 1: raise ParameterError('Input must be one-dimensional, ' 'given time.ndim={}'.format(time.ndim)) if time.size != y.size: raise ParameterError('Input y and time must be of the same size, ' 'time.size={:d}, y.size={:d}'.format(time.size, y.size)) if time_ini >= time_end: raise ValueError('Ending time is smaller than initial time.' ' idx_ini={:.3f}, idx_end={:.3f}'.format(time_ini, time_end)) if time_ini < time[0] or time_end > time[-1]: raise ValueError('Time valures are out of signal bounds.' ' time[0]={:.3f}, time[-1]={:.3f}, time_ini={:.3f},\ time_end={:.3f},'.format(time[0], time[-1], time_ini, time_end)) y_segment = y[(time >= time_ini) & (time <= time_end)] return y_segment
[docs]def beat2signal(y, time, beats, ind_beat): """ Get the signal fragment corresponding to a beat given by index ind_beat. If instead of beats, downbeats are used, then a bar is returned. **Args**: y (numpy array): signal array. Must be a one-dimensional array. time (numpy array): corresponding time. Must be a one-dimensional array. beats (numpy array): time instants of the beats. ind_beat (int): index of the desired beat. **Returns**: beat_segment (numpy array): segment of the signal corresponding to the beat. **Raises**: AttributeError if y or time is not a one-dimensional numpy array. ValueError if ind_beat fall outside the beats bounds. """ if not isinstance(y, np.ndarray): raise ParameterError('Input must be of type numpy.ndarray, ' 'given type(y)={}'.format(type(y))) if not isinstance(time, np.ndarray): raise ParameterError('Input must be of type numpy.ndarray, ' 'given type(time)={}'.format(type(time))) if not isinstance(beats, np.ndarray): raise ParameterError('Input must be of type numpy.ndarray, ' 'given type(beats)={}'.format(type(beats))) if y.ndim != 1: raise ParameterError('Input must be one-dimensional, ' 'given y.ndim={}'.format(y.ndim)) if time.ndim != 1: raise ParameterError('Input must be one-dimensional, ' 'given time.ndim={}'.format(time.ndim)) if beats.ndim != 1: raise ParameterError('Input must be one-dimensional, ' 'given beats.ndim={}'.format(beats.ndim)) if time.size != y.size: raise ParameterError('Input y and time must be of the same size, ' 'time.size={:d}, y.size={:d}'.format(time.size, y.size)) if ind_beat >= beats.size or ind_beat < 0: raise ValueError('Index out of bounds.' ' ind_beat={:d}, beats.size={:d}'.format(ind_beat, beats.size)) time_ini = beats[ind_beat] time_end = beats[ind_beat+1] beat_segment = get_time_segment(y, time, time_ini, time_end) return beat_segment
[docs]def fft2mel(freq, nfilts, minfreq, maxfreq): """ This method returns a 2-D Numpy array of weights that map a linearly spaced spectrogram to the Mel scale. **Args**: freq (1-D Numpy array): frequency of the components of the DFT. nfilts (): number of output bands. minfreq (): frequency of the first MEL coefficient. maxfreq (): frequency of the last MEL coefficient. **Returns**: The center frequencies in Hz of the Mel bands. """ minmel = hz2mel(minfreq) maxmel = hz2mel(maxfreq) binfrqs = mel2hz(minmel+sp.arange(nfilts+2)/(float(nfilts)+1)*(maxmel-minmel)) wts = sp.zeros((nfilts, (freq.size))) for i in range(nfilts): slp = binfrqs[i + sp.arange(3)] loslope = (freq - slp[0])/(slp[1] - slp[0]) hislope = (slp[2] - freq)/(slp[2] - slp[1]) wts[i, :] = sp.maximum(0.0, sp.minimum(loslope, hislope)) wts[:, freq < 0] = 0 wts = sp.dot(sp.diag(2./(binfrqs[2+sp.arange(nfilts)]-binfrqs[sp.arange(nfilts)])), wts) binfrqs = binfrqs[1:nfilts+1] return wts, binfrqs
[docs]def hz2mel(f_hz): """ Converts a given frequency in Hz to the Mel scale. **Args**: f_hz (Numpy array): Array containing the frequencies in HZ that should be converted. **Returns**: A Numpy array (of same shape as f_zh) containing the converted frequencies. """ f_0 = 0 f_sp = 200.0/3.0 # Log step brkfrq = 1000.0 # Frequency above which the distribution stays linear. brkpt = (brkfrq - f_0)/f_sp # First Mel value for linear region. logstep = sp.exp(sp.log(6.4)/27) # Step in the log region z_mel = sp.where(f_hz < brkfrq, (f_hz - f_0)/f_sp, brkpt + (sp.log(f_hz/brkfrq))/sp.log(logstep)) return z_mel
[docs]def mel2hz(z_mel): """ Converts a given frequency in the Mel scale to Hz scale. **Args**: z_mel (Numpy array): Array of frequencies in the Mel scale that should be converted. **Returns**: A Numpy array (of same shape as z_mel) containing the converted frequencies. """ f_0 = 0 f_sp = 200.0/3.0 brkfrq = 1000.0 brkpt = (brkfrq - f_0)/f_sp logstep = sp.exp(sp.log(6.4)/27) f_hz = sp.where(z_mel < brkpt, f_0 + f_sp*z_mel, brkfrq*sp.exp(sp.log(logstep)*(z_mel-brkpt))) return f_hz
[docs]def deltas(x, w=3): """ this function estimates the derivative of x """ if x.ndim == 1: y = x.reshape((-1, 1)).T else: y = x if not w%2: w -= 1 hlen = int(sp.floor(w/2)) if w == 1: # first-order difference: win = sp.r_[1, -1] else: win = sp.r_[hlen:-hlen-1:-1] # Extending the input data (avoid border problems) extended_x = sp.c_[sp.repeat(y[:, 0].reshape((-1, 1)), hlen, axis=1), y, sp.repeat(y[:, -1].reshape((-1, 1)), hlen, axis=1)] d = scipy.signal.lfilter(win, 1, extended_x, axis=1) d = d[:, 2*hlen:] if x.ndim == 1: d = d[0, :] return d
def getValidKeywords(kw, func): """ This function returns a dictionary containing the keywords arguments in initial_kw that are valid for function func. """ import inspect valid_kw = {} invalid_kw = kw.copy() # args, varargs, varkw, defaults = inspect.getargspec(func) # NOTE: inspect.getargspec was deprecated, and inspect.getfullargspec is suggested as the # standard interface for single-source Python 2/3 code migrating away from legacy getargspec() args = inspect.getfullargspec(func) #for k, v in kw.iteritems(): for k, v in kw.items(): if k in args.args: valid_kw[k] = v del invalid_kw[k] return valid_kw, invalid_kw
[docs]def example_audio_file(num_file=None): '''Get the path to an included audio example file. Examples -------- >>> # Load the waveform from the default example track >>> y, sr = carat.audio.load(carat.util.example_audio_file()) >>> # Load 10 seconds of the waveform from the example track number 1 >>> y, sr = carat.audio.load(carat.util.example_audio_file(num_file=1), duration=10.0)) >>> # Load the waveform from the example track number 2 >>> y, sr = carat.audio.load(carat.util.example_audio_file(num_file=2)) Parameters ---------- num_file : int Number to select among the example files available. Returns ------- filename : str Path to the audio example file included with `carat`. ''' if num_file == 1: EXAMPLE_AUDIO = EXAMPLE_AUDIO1 elif num_file == 2: EXAMPLE_AUDIO = EXAMPLE_AUDIO2 else: EXAMPLE_AUDIO = EXAMPLE_AUDIO2 return pkg_resources.resource_filename(__name__, EXAMPLE_AUDIO)
[docs]def example_beats_file(num_file=None): '''Get the path to an included example file of beats annotations. Examples -------- >>> # Load beats and downbeats from the example audio file number 1 >>> beats, b_labs = carat.load_beats(carat.util.example_beats_file(num_file=1)) >>> downbeats, d_labs = carat.load_downbeats(carat.util.example_beats_file(num_file=1)) Parameters ---------- num_file : int Number to select among the example files available. Returns ------- filename : str Path to the beats annotations example file included with `carat`. ''' if num_file == 1: EXAMPLE_BEATS = EXAMPLE_BEATS1 elif num_file == 2: EXAMPLE_BEATS = EXAMPLE_BEATS2 else: EXAMPLE_BEATS = EXAMPLE_BEATS2 return pkg_resources.resource_filename(__name__, EXAMPLE_BEATS)