Skip to content

Commit

Permalink
Merge pull request #48 from eric-brandao/development
Browse files Browse the repository at this point in the history
necessary changes in deconvolution with regularization, and other stuff
  • Loading branch information
PyTTaMaster authored Aug 12, 2024
2 parents 84321c2 + ba3790f commit 1d30852
Show file tree
Hide file tree
Showing 8 changed files with 20,105 additions and 17 deletions.
19,891 changes: 19,891 additions & 0 deletions examples/RoomIR_notebook_demo.ipynb

Large diffs are not rendered by default.

75 changes: 75 additions & 0 deletions examples/deconv_methods.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
"""
Created on Wed Aug 7 08:44:21 2024
@author: Eric Brandao
Deconvolution - some methods
"""

import pytta
import numpy as np
import matplotlib.pyplot as plt

#%% Loading measured files
xt_dict = pytta.load('xt.hdf5')
xt = xt_dict[list(xt_dict.keys())[0]] # this is a signal object

yt_dict = pytta.load('yt.hdf5')
yt = yt_dict[list(yt_dict.keys())[0]] # this is a signal object
yt_split = yt.split()
#%% Deconv - regularized sweep as it is in class
ht = pytta.ImpulsiveResponse(excitation = xt, recording = yt_split[0],
samplingRate = xt.samplingRate, regularization = True,
freq_limits = [100, 10000], method = 'linear')

#%% Testing methods
ht_naive = ht._naive_deconv(xt, yt_split[0])
ht_regu = ht._regularized_deconv(xt, yt_split[0], freq_limits = [100, 10000])
ht_regu_zp = ht._regularized_zp_deconv(xt, yt_split[0], freq_limits = [100, 10000], num_zeros = None)
ht_welch_h1 = ht._welch_h1_deconv(xt, yt_split[0],winType = 'hann',
winSize = 2**16, overlap = 0.6)

#%% Plots
plt.figure(figsize = (8, 4))
plt.plot(ht.IR.timeVector, 20*np.log10(np.abs(ht.IR.timeSignal)/np.amax(np.abs(ht.IR.timeSignal))),
alpha = 1, label = 'original')
# plt.plot(ht_regu.timeVector, 20*np.log10(np.abs(ht_regu.timeSignal)/np.amax(np.abs(ht_regu.timeSignal))),
# alpha = 0.4, label = 'regularized')
# plt.plot(ht_regu_zp.timeVector, 20*np.log10(np.abs(ht_regu_zp.timeSignal)/np.amax(np.abs(ht_regu_zp.timeSignal))),
# alpha = 0.8, label = 'regularized')
plt.plot(ht_welch_h1.timeVector, 20*np.log10(np.abs(ht_welch_h1.timeSignal)/np.amax(np.abs(ht_welch_h1.timeSignal))),
alpha = 0.8, label = 'H1')

# plt.plot(ht_naive.timeVector, 20*np.log10(np.abs(ht_naive.timeSignal)/np.amax(np.abs(ht_naive.timeSignal))),
# alpha = 0.4, label = 'naive')
plt.legend()
plt.xlabel('Time (s)')
plt.ylabel('Magnitude (dB)')
#plt.xlim((-0.1, ht.IR.timeVector[-1]))
plt.ylim((-80, 10))
plt.grid()
plt.tight_layout()

#%%
plt.figure(figsize = (8, 4))
plt.semilogx(ht.IR.freqVector, 20*np.log10(np.abs(ht.IR.freqSignal)),
alpha = 1, label = 'original')
# plt.plot(ht_regu.timeVector, 20*np.log10(np.abs(ht_regu.timeSignal)/np.amax(np.abs(ht_regu.timeSignal))),
# alpha = 0.4, label = 'regularized')
plt.plot(ht_regu_zp.freqVector, 20*np.log10(np.abs(ht_regu_zp.freqSignal)),
alpha = 0.8, label = 'regularized w/ zp')
plt.plot(ht_welch_h1.freqVector, 20*np.log10(np.abs(ht_welch_h1.freqSignal)),
alpha = 0.8, label = 'H1')

# plt.plot(ht_naive.timeVector, 20*np.log10(np.abs(ht_naive.timeSignal)/np.amax(np.abs(ht_naive.timeSignal))),
# alpha = 0.4, label = 'naive')
plt.legend()
plt.xlabel('Time (s)')
plt.ylabel('Magnitude (dB)')
plt.xlim((100, 10000))
plt.ylim((-80, 10))
plt.grid()
plt.tight_layout()


2 changes: 1 addition & 1 deletion examples/rec_measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

measurementParams = {
'lengthDomain': 'time',
'timeLength': 3,
'timeLength': 10,
'samplingRate': 48000,
'freqMin': 20,
'freqMax': 20000,
Expand Down
4 changes: 2 additions & 2 deletions examples/reverberation_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

myRT = pytta.RoomAnalysis(myIRsignal,
nthOct=3,
minFreq=float(2e1),
maxFreq=float(2e4))
minFreq=float(100),
maxFreq=float(10000))
fig = myRT.plot_T30()
fig.show()
Binary file added examples/xt.hdf5
Binary file not shown.
Binary file added examples/yt.hdf5
Binary file not shown.
6 changes: 3 additions & 3 deletions pytta/classes/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ def __init__(self, ir: SignalObj, nthOct: int = 1,
nbands = maxBand - minBand + 1
super().__init__('mixed', nthOct, minFreq, maxFreq, nbands*[0], *args, **kwargs)
self.ir = crop_IR(_ir, ircut)
self._params = self.estimate_energy_parameters(self.ir, self.bands, plotLundeby,
self._params, self.listEDC, self.fhSignal = self.estimate_energy_parameters(self.ir, self.bands, plotLundeby,
bypassLundeby, suppressWarnings,
nthOct=nthOct, minFreq=minFreq,
maxFreq=maxFreq)
Expand Down Expand Up @@ -1006,7 +1006,7 @@ def estimate_energy_parameters(ir: SignalObj, bands: np.ndarray,
params['T20'] = reverberation_time(20, listEDC)
params['T30'] = reverberation_time(30, listEDC)
# self._params['BR'], self._params['TR'] = timbre_ratios(self.T20)
return params
return params, listEDC, fhSignal

@property
def parameters(self):
Expand Down Expand Up @@ -1609,7 +1609,7 @@ def central_time(sqrIR: np.ndarray, tstamp: np.ndarray) -> np.ndarray:
"""
sumSIR = sqrIR.sum(axis=0)
sumTSIR = (tstamp[:, None] * sqrIR).sum(axis=0)
central_time = (sumTSIR / sumSIR) * 1000 # milisseconds
central_time = (sumTSIR / sumSIR) #* 1000 # milisseconds
return central_time


Expand Down
144 changes: 133 additions & 11 deletions pytta/classes/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -1281,7 +1281,7 @@ class ImpulsiveResponse(_base.PyTTaObj):

def __init__(self, excitation=None, recording=None,
method='linear', winType=None, winSize=None, overlap=None,
regularization=True, ir=None, *args, **kwargs):
regularization=True, freq_limits = None, ir=None, *args, **kwargs):
super().__init__(*args, **kwargs)
if excitation is None or recording is None:
if ir is None:
Expand Down Expand Up @@ -1328,8 +1328,8 @@ def __init__(self, excitation=None, recording=None,
winType=winType,
winSize=winSize,
overlap=overlap,
regularization=
regularization)
regularization=regularization,
freq_limits = freq_limits)
return

def __repr__(self):
Expand Down Expand Up @@ -1413,16 +1413,26 @@ def _to_dict(self):
out = {'methodInfo': self.methodInfo}
return out

def _calculate_regu_spk(self, inputSignal, outputSignal):
def _calculate_regu_spk(self, inputSignal, outputSignal, freq_limits = None):
""" Computes regularized spectrum
Parameters
-----------------
freq_limits : None or list with 2 values
List with desired freqMin and freqMax of your regularized IR. None is default, in which case
freqMin and freqMax will be computed from properties of the inputSignal and outputSignal
"""
data = _make_pk_spectra(inputSignal.freqSignal)
outputFreqSignal = _make_pk_spectra(outputSignal.freqSignal)
freqVector = inputSignal.freqVector
b = data * 0 + 10**(-200/20) # inside signal freq range
a = data * 0 + 1 # outside signal freq range
minFreq = np.max([inputSignal.freqMin, outputSignal.freqMin])
maxFreq = np.min([inputSignal.freqMax, outputSignal.freqMax])
if freq_limits is None:
minFreq = np.max([inputSignal.freqMin, outputSignal.freqMin])
maxFreq = np.min([inputSignal.freqMax, outputSignal.freqMax])
else:
minFreq = freq_limits[0]
maxFreq = freq_limits[1]
# Calculate epsilon
eps = self._crossfade_spectruns(a, b,
[minFreq/np.sqrt(2),
Expand All @@ -1446,10 +1456,120 @@ def _calculate_regu_spk(self, inputSignal, outputSignal):
inputSignal.samplingRate,
signalType='energy')
return C

def _naive_deconv(self, inputSignal, outputSignal):
""" Deconvolution via naive method
H(jw) = Y(jw)/X(jw)
no zero padding is performed
"""
result = outputSignal / inputSignal
return result

def _regularized_deconv(self, inputSignal, outputSignal, freq_limits = None):
""" Deconvolution with regularization
H = Y(jw)*C(jw), with C(jw) = conj(X(jw))/(X(jw)+ e(jw))
no zero padding is performed
"""
C = self._calculate_regu_spk(inputSignal, outputSignal, freq_limits = freq_limits)
result = outputSignal * C
return result

def _regularized_zp_deconv(self, inputSignal, outputSignal, freq_limits = None,
num_zeros = None):
""" Deconvolution with regularization
H = Y(jw)*C(jw), with C(jw) = conj(X(jw))/(X(jw)+ e(jw))
zero padding is performed
"""
if num_zeros is None:
num_zeros = inputSignal.timeSignal.shape[0]

inputSignal_zp = inputSignal.zero_padding(num_zeros = num_zeros)
outputSignal_zp = outputSignal.zero_padding(num_zeros = num_zeros)

C = self._calculate_regu_spk(inputSignal_zp, outputSignal_zp, freq_limits = freq_limits)
result = outputSignal_zp * C
return result

def _deconv_invfilter(self, inputSignal, outputSignal):
""" deconvolve by generating time response of an inverse filter for the sweep
"""
R = np.log(inputSignal.freqMax/inputSignal.freqMin)

# Inverse filter
k = np.exp(inputSignal.timeVector*R/inputSignal.timeVector[-1])
f = inputSignal.timeSignal.flatten()[::-1]/k
f = f/np.amax(np.abs(f))
# Impulse response
#outputSignal.timeSignal.flatten()
ht = ss.fftconvolve(outputSignal.timeSignal.flatten(), f, mode='same')#*inputSignal.numSamples/inputSignal.samplingRate
ht = np.roll(ht, shift = int(len(ht)/2))
result = SignalObj(signalArray = ht, domain='time', signalType = 'power',
samplingRate = inputSignal.samplingRate, freqMin = inputSignal.freqMin,
freqMax = inputSignal.freqMax)
return result


def _welch_h1_deconv(self, inputSignal, outputSignal, winType = None,
winSize = None, overlap = None):
""" Deconvolution with H1 statistical estimator
Welch's method is used
"""

if winType is None:
winType = 'hann'
if winSize is None:
winSize = inputSignal.samplingRate//2
if overlap is None:
overlap = 0.5
# sig1.shape[0]
result = SignalObj(np.zeros((inputSignal.timeSignal.shape[0]//2 + 1,
outputSignal.freqSignal.shape[1])),
domain='freq',
samplingRate=inputSignal.samplingRate,
signalType='energy')
if outputSignal.numChannels > 1:
if inputSignal.numChannels > 1:
if inputSignal.numChannels\
!= outputSignal.numChannels:
raise ValueError("Both signal-like objects must have\
the same number of channels.")
for channel in range(outputSignal.numChannels):
XY, XX = self._calc_csd_tf(
inputSignal.timeSignal[:, channel],
outputSignal.timeSignal[:, channel],
inputSignal.samplingRate,
winType, winSize, winSize*overlap)
result.freqSignal[:, channel] \
= np.array(XY/XX, ndmin=2).T
else:
for channel in range(outputSignal.numChannels):
XY, XX = self._calc_csd_tf(
inputSignal.timeSignal,
outputSignal.timeSignal[:, channel],
inputSignal.samplingRate,
winType, winSize, winSize*overlap)
result.freqSignal[:, channel] \
= np.array(XY/XX, ndmin=2).T
else:
XY, XX = self._calc_csd_tf(
inputSignal.timeSignal,
outputSignal.timeSignal,
inputSignal.samplingRate,
winType, winSize, winSize*overlap)
result.freqSignal = np.array(XY/XX, ndmin=2).T
return result


def _calculate_tf_ir(self, inputSignal, outputSignal, method='linear',
winType=None, winSize=None, overlap=None,
regularization=False):
regularization=False, freq_limits = None):
if type(inputSignal) is not type(outputSignal):
raise TypeError("Only signal-like objects can become an \
Impulsive Response.")
Expand All @@ -1459,7 +1579,7 @@ def _calculate_tf_ir(self, inputSignal, outputSignal, method='linear',

if method == 'linear':
if regularization:
C = self._calculate_regu_spk(inputSignal, outputSignal)
C = self._calculate_regu_spk(inputSignal, outputSignal, freq_limits = freq_limits)
result = outputSignal * C
else:
result = outputSignal / inputSignal
Expand All @@ -1471,7 +1591,7 @@ def _calculate_tf_ir(self, inputSignal, outputSignal, method='linear',
outputSignal.timeSignal.shape[0])

if regularization:
C = self._calculate_regu_spk(inputSignal_zp, outputSignal_zp)
C = self._calculate_regu_spk(inputSignal_zp, outputSignal_zp, freq_limits = freq_limits)
result = outputSignal_zp * C
else:
result = outputSignal_zp / inputSignal_zp
Expand Down Expand Up @@ -1660,10 +1780,12 @@ def _calc_csd_tf(self, sig1, sig2, samplingRate, windowName,
numberOfSamples, overlapSamples):
f, S11 = ss.csd(sig1, sig1, samplingRate, window=windowName,
nperseg=numberOfSamples, noverlap=overlapSamples,
axis=0)
nfft = sig1.shape[0],
axis=0, scaling = 'spectrum')
f, S12 = ss.csd(sig1, sig2, samplingRate, window=windowName,
nperseg=numberOfSamples, noverlap=overlapSamples,
axis=0)
nfft = sig1.shape[0],
axis=0, scaling = 'spectrum')
del f
return S12, S11

Expand Down

0 comments on commit 1d30852

Please sign in to comment.