diff --git a/src/dspeed/processors/__init__.py b/src/dspeed/processors/__init__.py index 12bc5d9..fde47cf 100644 --- a/src/dspeed/processors/__init__.py +++ b/src/dspeed/processors/__init__.py @@ -92,7 +92,11 @@ from .soft_pileup_corr import soft_pileup_corr, soft_pileup_corr_bl from .svm import svm_predict from .time_over_threshold import time_over_threshold -from .time_point_thresh import interpolated_time_point_thresh, time_point_thresh +from .time_point_thresh import ( + interpolated_time_point_thresh, + multi_time_point_thresh, + time_point_thresh, +) from .transfer_function_convolver import transfer_function_convolver from .trap_filters import asym_trap_filter, trap_filter, trap_norm, trap_pickoff from .upsampler import interpolating_upsampler, upsampler @@ -143,6 +147,7 @@ "svm_predict", "time_point_thresh", "interpolated_time_point_thresh", + "multi_time_point_thresh", "asym_trap_filter", "trap_filter", "trap_norm", diff --git a/src/dspeed/processors/time_point_thresh.py b/src/dspeed/processors/time_point_thresh.py index f5de8c1..30595ac 100644 --- a/src/dspeed/processors/time_point_thresh.py +++ b/src/dspeed/processors/time_point_thresh.py @@ -71,7 +71,7 @@ def time_point_thresh( t_out[0] = i return else: - for i in range(int(t_start), 1, -1): + for i in range(int(t_start), 0, -1): if w_in[i - 1] < a_threshold <= w_in[i]: t_out[0] = i return @@ -115,13 +115,20 @@ def interpolated_time_point_thresh( * ``i`` -- integer `t_in`; equivalent to :func:`~.dsp.processors.fixed_sample_pickoff` - * ``f`` -- floor; interpolated values are at previous neighbor, so - threshold crossing is at next neighbor - * ``c`` -- ceiling, interpolated values are at next neighbor, so - threshold crossing is at previous neighbor + * ``b`` -- before; closest integer sample before threshold crossing + * ``a`` -- after; closest integer sample after threshold crossing + * ``r`` -- round; round to nearest integer sample to threshold crossing + * ``l`` -- linear interpolation + + The following modes are meant to mirror the options + dspeed.upsampler + + * ``f`` -- floor; interpolated values are at previous neighbor. + Equivalent to ``a`` + * ``c`` -- ceiling, interpolated values are at next neighbor. + Equivalent to ``b`` * ``n`` -- nearest-neighbor interpolation; threshold crossing is half-way between samples - * ``l`` -- linear interpolation * ``h`` -- Hermite cubic spline interpolation (*not implemented*) * ``s`` -- natural cubic spline interpolation (*not implemented*) t_out @@ -169,10 +176,15 @@ def interpolated_time_point_thresh( if mode_in == ord("i"): # return index before crossing t_out[0] = i_cross - elif mode_in == ord("f"): # return index after crossing + elif mode_in in (ord("a"), ord("f")): # return index after crossing t_out[0] = i_cross + 1 - elif mode_in == ord("c"): # return index before crossing + elif mode_in in (ord("b"), ord("c")): # return index before crossing t_out[0] = i_cross + elif mode_in == ord("r"): # return closest index to crossing + if abs(a_threshold - w_in[i_cross]) < abs(a_threshold - w_in[i_cross + 1]): + t_out[0] = i_cross + else: + t_out[0] = i_cross + 1 elif mode_in == ord("n"): # nearest-neighbor; return half-way between samps t_out[0] = i_cross + 0.5 elif mode_in == ord("l"): # linear @@ -181,3 +193,177 @@ def interpolated_time_point_thresh( ) else: raise DSPFatal("Unrecognized interpolation mode") + + +@guvectorize( + [ + "void(float32[:], float32[:], float32, float32, char, float32[:])", + "void(float64[:], float64[:], float64, float64, char, float64[:])", + ], + "(n),(m),(),(),()->(m)", + **nb_kwargs, +) +def multi_time_point_thresh( + w_in: np.ndarray, + a_threshold: np.ndarray, + t_start: int, + polarity: int, + mode_in: np.int8, + t_out: np.ndarray, +) -> None: + """Find the time where the waveform value crosses the threshold + + Search performed walking either forward or backward from the starting + index. Use interpolation to estimate a time between samples. Interpolation + mode selected with `mode_in`. + + Parameters + ---------- + w_in + the input waveform. + a_threshold + list of threshold values. + t_start + the starting index. + polarity + is the average slope of the waveform up (>0) or down (<0) in the + search region; only sign matters, not value. Raise Exception if 0. + mode_in + Character selecting which interpolation method to use. Note this + must be passed as a ``int8``, e.g. ``ord('i')``. Options: + + * ``i`` -- integer `t_in`; equivalent to + :func:`~.dsp.processors.fixed_sample_pickoff` + * ``b`` -- before; closest integer sample before threshold crossing + * ``a`` -- after; closest integer sample after threshold crossing + * ``r`` -- round; round to nearest integer sample to threshold crossing + * ``l`` -- linear interpolation + + The following modes are meant to mirror the options + dspeed.upsampler + + * ``f`` -- floor; interpolated values are at previous neighbor. + Equivalent to ``a`` + * ``c`` -- ceiling, interpolated values are at next neighbor. + Equivalent to ``b`` + * ``n`` -- nearest-neighbor interpolation; threshold crossing is + half-way between samples + * ``h`` -- Hermite cubic spline interpolation (*not implemented*) + * ``s`` -- natural cubic spline interpolation (*not implemented*) + t_out + the index where the waveform value crosses the threshold. + + JSON Configuration Example + -------------------------- + + .. code-block :: json + + "tp_0": { + "function": "time_point_thresh", + "module": "dspeed.processors", + "args": ["wf_atrap", "bl_std", "tp_start", 0, "'l'", "tp_0"], + "unit": "ns" + } + """ + t_out[:] = np.nan + + if np.isnan(w_in).any() or np.isnan(a_threshold).any() or np.isnan(t_start): + return + + if t_start < 0 or t_start >= len(w_in): + return + + # make polarity +/- 1 + if polarity > 0: + polarity = 1 + elif polarity < 0: + polarity = -1 + else: + raise DSPFatal("polarity cannot be 0") + + sorted_idx = np.argsort(a_threshold) + + # Get initial values for search + t_start = int(t_start) + a_start = w_in[t_start] + i_start = len(sorted_idx) + for i in range(len(sorted_idx)): + if a_threshold[sorted_idx[i]] >= a_start: + i_start = i + break + + # Search for timepoints at larger values + i_tp = i_start + if i_tp < len(sorted_idx): + idx = sorted_idx[i_tp] + for i_wf in range(t_start, len(w_in) - 1 if polarity > 0 else -1, polarity): + if i_tp >= len(sorted_idx): + break + while w_in[i_wf] <= a_threshold[idx] < w_in[i_wf + polarity]: + if mode_in == ord("i"): # return index closest to start of search + t_out[idx] = i_wf + elif mode_in in (ord("a"), ord("f")): # return index after crossing + t_out[idx] = i_wf if polarity < 0 else i_wf + 1 + elif mode_in in (ord("b"), ord("c")): # return index before crossing + t_out[idx] = i_wf if polarity > 0 else i_wf - 1 + elif mode_in == ord("r"): # round; return closest index + if ( + a_threshold[idx] - w_in[i_wf] + < w_in[i_wf + polarity] - a_threshold[sorted_idx[i_tp]] + ): + t_out[idx] = i_wf + else: + t_out[idx] = i_wf + polarity + elif mode_in == ord( + "n" + ): # nearest-neighbor; return half-way between samps + t_out[idx] = i_wf + 0.5 * polarity + elif mode_in == ord("l"): # linear + t_out[idx] = i_wf + (a_threshold[idx] - w_in[i_wf]) / ( + w_in[i_wf + polarity] - w_in[i_wf] + ) + else: + raise DSPFatal("Unrecognized interpolation mode") + i_tp += 1 + if i_tp >= len(sorted_idx): + break + idx = sorted_idx[i_tp] + + # Search for timepoints at smaller values + i_tp = i_start - 1 + if i_tp >= 0: + idx = sorted_idx[i_tp] + for i_wf in range( + t_start - 1, len(w_in) - 1 if polarity < 0 else -1, -polarity + ): + if i_tp < 0: + break + while w_in[i_wf] <= a_threshold[idx] < w_in[i_wf + polarity]: + if mode_in == ord("i"): # return index closest to start of search + t_out[idx] = i_wf + elif mode_in in (ord("a"), ord("f")): # return index after crossing + t_out[idx] = i_wf if polarity < 0 else i_wf + 1 + elif mode_in in (ord("b"), ord("c")): # return index before crossing + t_out[idx] = i_wf if polarity > 0 else i_wf - 1 + elif mode_in == ord("r"): # round; return closest index + if ( + a_threshold[idx] - w_in[i_wf] + < w_in[i_wf + polarity] - a_threshold[idx] + ): + t_out[idx] = i_wf + else: + t_out[idx] = i_wf + polarity + elif mode_in == ord( + "n" + ): # nearest-neighbor; return half-way between samps + t_out[idx] = i_wf + 0.5 * polarity + elif mode_in == ord("l"): # linear + t_out[idx] = i_wf + (a_threshold[idx] - w_in[i_wf]) / ( + w_in[i_wf + polarity] - w_in[i_wf] + ) + else: + raise DSPFatal("Unrecognized interpolation mode") + i_tp -= 1 + if i_tp < 0: + break + idx = sorted_idx[i_tp]