From 6ec82e6b957ca7e39b1d4673356f8d91b1cda96b Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 13:35:07 +0100 Subject: [PATCH 01/14] now producing raw_records_ext for external data --- amstrax/plugins/raw_records/daqreader.py | 481 ++++++++++++++--------- 1 file changed, 291 insertions(+), 190 deletions(-) diff --git a/amstrax/plugins/raw_records/daqreader.py b/amstrax/plugins/raw_records/daqreader.py index c08e9048..7f10b8ea 100644 --- a/amstrax/plugins/raw_records/daqreader.py +++ b/amstrax/plugins/raw_records/daqreader.py @@ -1,19 +1,21 @@ -import glob import os +import glob import warnings +from typing import Tuple from collections import Counter from immutabledict import immutabledict + import numpy as np import numba - import strax -# it was me - export, __all__ = strax.exporter() -__all__ += ['ARTIFICIAL_DEADTIME_CHANNEL'] +__all__.extend(["ARTIFICIAL_DEADTIME_CHANNEL"]) + +# Just below the TPC acquisition monitor, see +# https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:dsg:daq:channel_groups +ARTIFICIAL_DEADTIME_CHANNEL = 799 -ARTIFICIAL_DEADTIME_CHANNEL = 40 class ArtificialDeadtimeInserted(UserWarning): pass @@ -23,170 +25,147 @@ class ArtificialDeadtimeInserted(UserWarning): @strax.takes_config( # All these must have track=False, so the raw_records hash never changes! # DAQ settings -- should match settings given to redax - strax.Option('record_length', default=110, track=False, type=int, - help="Number of samples per raw_record"), - strax.Option('max_digitizer_sampling_time', - default=10, track=False, type=int, - help="Highest interval time of the digitizer sampling times(s) used."), - strax.Option('run_start_time', type=float, track=False, default=0, - help="time of start run (s since unix epoch)"), - strax.Option('daq_chunk_duration', track=False, - default=int(5e9), type=int, - help="Duration of regular chunks in ns"), - strax.Option('daq_overlap_chunk_duration', track=False, - default=int(5e8), type=int, - help="Duration of intermediate/overlap chunks in ns"), - strax.Option('daq_compressor', default="lz4", track=False, - help="Algorithm used for (de)compressing the live data"), - strax.Option('readout_threads', type=dict, track=False, - help="Dictionary of the readout threads where the keys " - "specify the reader and value the number of threads"), - strax.Option('daq_input_dir', type=str, track=False, - help="Directory where readers put data"), + strax.Option( + "record_length", default=110, track=False, type=int, help="Number of samples per raw_record" + ), + strax.Option( + "max_digitizer_sampling_time", + default=10, + track=False, + type=int, + help="Highest interval time of the digitizer sampling times(s) used.", + ), + strax.Option( + "run_start_time", + type=float, + track=False, + default=0, + help="time of start run (s since unix epoch)", + ), + strax.Option( + "daq_chunk_duration", + track=False, + default=int(5e9), + type=int, + help="Duration of regular chunks in ns", + ), + strax.Option( + "daq_overlap_chunk_duration", + track=False, + default=int(5e8), + type=int, + help="Duration of intermediate/overlap chunks in ns", + ), + strax.Option( + "daq_compressor", + default="lz4", + track=False, + help="Algorithm used for (de)compressing the live data", + ), + strax.Option( + "readout_threads", + type=dict, + track=False, + help=( + "Dictionary of the readout threads where the keys " + "specify the reader and value the number of threads" + ), + ), + strax.Option("daq_input_dir", type=str, track=False, help="Directory where readers put data"), # DAQReader settings - strax.Option('safe_break_in_pulses', default=1000, track=False, infer_type=False, - help="Time (ns) between pulses indicating a safe break " - "in the datastream -- gaps of this size cannot be " - "interior to peaklets."), - strax.Option('channel_map', track=False, type=immutabledict, infer_type=False, - help="immutabledict mapping subdetector to (min, max) " - "channel number.")) + strax.Option( + "safe_break_in_pulses", + default=1000, + track=False, + infer_type=False, + help=( + "Time (ns) between pulses indicating a safe break " + "in the datastream -- gaps of this size cannot be " + "interior to peaklets." + ), + ), + strax.Option( + "channel_map", + track=False, + type=immutabledict, + infer_type=False, + help="immutabledict mapping subdetector to (min, max) channel number.", + ), +) class DAQReader(strax.Plugin): - """ - Read the XAMS DAQ-live_data from redax and split it to the - appropriate raw_record data-types based on the channel-map. + """Read the XENONnT DAQ-live_data from redax and split it to the appropriate raw_record data- + types based on the channel-map. + Does nothing whatsoever to the live_data; not even baselining. - Provides: - - raw_records + + Provides: + - raw_records: (tpc)raw_records. + - raw_records_ext: (external)raw_records. + """ - depends_on = tuple() - parallel = 'process' - rechunk_on_save = False - # never change the version! - __version__ = '0.0.0' - compressor = 'zstd' + provides: Tuple[str, ...] = ( + "raw_records_ext", + "raw_records", # raw_records has to be last due to lineage + ) - provides = 'raw_records' - data_kind = 'raw_records' + data_kind = immutabledict(zip(provides, provides)) + depends_on: Tuple = tuple() + parallel = "process" + chunk_target_size_mb = 50 + rechunk_on_save = immutabledict( + raw_records=False, + raw_records_ext=False, + ) + compressor = "lz4" + __version__ = "0.0.0" + input_timeout = 300 def infer_dtype(self): - if not self.multi_output: - return strax.raw_record_dtype( - samples_per_record=self.config["record_length"]) - return { - d: strax.raw_record_dtype( - samples_per_record=self.config["record_length"]) - for d in self.provides} - - - def compute(self, chunk_i): - dt_central = self.config['daq_chunk_duration'] - dt_overlap = self.config['daq_overlap_chunk_duration'] - - t_start = chunk_i * (dt_central + dt_overlap) - t_end = t_start + dt_central - - pre, current, post = self._chunk_paths(chunk_i) - r_pre, r_post = None, None - break_pre, break_post = t_start, t_end - - if pre: - if chunk_i == 0: - warnings.warn( - f"DAQ is being sloppy: there should be no pre dir {pre} " - f"for chunk 0. We're ignoring it.", - UserWarning) - else: - r_pre, break_pre = self._load_chunk( - path=pre, - start=t_start - dt_overlap, - end=t_start, - kind='pre') - - r_main, _ = self._load_chunk( - path=current, - start=t_start, - end=t_end, - kind='central') - - if post: - r_post, break_post = self._load_chunk( - path=post, - start=t_end, - end=t_end + dt_overlap, - kind='post') - - # Concatenate the result. - records = np.concatenate([ - x for x in (r_pre, r_main, r_post) - if x is not None]) - - # Convert to strax chunks - dt = records['dt'] - - # Convert time to time in ns since unix epoch. - # Ensure the offset is a whole digitizer sample - records["time"] += dt * (self.t0 // dt) - - result_name = 'raw_records' - result = self.chunk( - start=self.t0 + break_pre, - end=self.t0 + break_post, - data=records, - data_type=result_name) - - print(f"Read chunk {chunk_i:06d} from DAQ") - # Print data rate / data type if any - print(f"\t{result}") - - return result - - - # The following functions are copied unchanged from straxen - # https://github.com/XENONnT/straxen/blob/master/straxen/plugins/raw_records/daqreader.py - # copied on 23 January 2023 + d: strax.raw_record_dtype(samples_per_record=self.config["record_length"]) + for d in self.provides + } def setup(self): - self.t0 = int(self.config['run_start_time']) * int(1e9) - self.dt_max = self.config['max_digitizer_sampling_time'] - self.n_readout_threads = sum(self.config['readout_threads'].values()) - if (self.config['safe_break_in_pulses'] - > min(self.config['daq_chunk_duration'], - self.config['daq_overlap_chunk_duration'])): + self.t0 = int(self.config["run_start_time"]) * int(1e9) + self.dt_max = self.config["max_digitizer_sampling_time"] + self.n_readout_threads = sum(self.config["readout_threads"].values()) + if self.config["safe_break_in_pulses"] > min( + self.config["daq_chunk_duration"], self.config["daq_overlap_chunk_duration"] + ): raise ValueError( "Chunk durations must be larger than the minimum safe break" - " duration (preferably a lot larger!)") + " duration (preferably a lot larger!)" + ) def _path(self, chunk_i): - return self.config["daq_input_dir"] + f'/{chunk_i:06d}' + return self.config["daq_input_dir"] + f"/{chunk_i:06d}" def _chunk_paths(self, chunk_i): - """Return paths to previous, current and next chunk - If any of them does not exist, or they are not yet populated - with data from all readers, their path is replaced by False. - """ + """Return paths to previous, current and next chunk If any of them does not exist, or they + are not yet populated with data from all readers, their path is replaced by False.""" p = self._path(chunk_i) result = [] - for q in [p + '_pre', p, p + '_post']: + for q in [p + "_pre", p, p + "_post"]: if os.path.exists(q): n_files = self._count_files_per_chunk(q) if n_files >= self.n_readout_threads: result.append(q) else: - print(f"Found incomplete folder {q}: " - f"contains {n_files} files but expected " - f"{self.n_readout_threads}. " - f"Waiting for more data.") + print( + f"Found incomplete folder {q}: " + f"contains {n_files} files but expected " + f"{self.n_readout_threads}. " + "Waiting for more data." + ) if self.source_finished(): # For low rates, different threads might end in a # different chunck at the end of a run, # still keep the results in this case. - print(f"Run finished correctly nonetheless: " - f"saving the results") + print(f"Run finished correctly nonetheless: saving the results") result.append(q) else: result.append(False) @@ -196,26 +175,25 @@ def _chunk_paths(self, chunk_i): @staticmethod def _partial_chunk_to_thread_name(partial_chunk): - """Convert name of part of the chunk to the thread_name that wrote it""" - return '_'.join(partial_chunk.split('_')[:-1]) + """Convert name of part of the chunk to the thread_name that wrote it.""" + return "_".join(partial_chunk.split("_")[:-1]) def _count_files_per_chunk(self, path_chunk_i): - """ - Check that the files in the chunks have names consistent with - the readout threads - """ - counted_files = Counter( - [self._partial_chunk_to_thread_name(p) for p in os.listdir(path_chunk_i)]) + """Check that the files in the chunks have names consistent with the readout threads.""" + counted_files = Counter([ + self._partial_chunk_to_thread_name(p) for p in os.listdir(path_chunk_i) + ]) for thread, n_counts in counted_files.items(): - if thread not in self.config['readout_threads']: - raise ValueError(f'Bad data for {path_chunk_i}. Got {thread}') - if n_counts > self.config['readout_threads'][thread]: - raise ValueError(f'{thread} wrote {n_counts}, expected' - f'{self.config["readout_threads"][thread]}') + if thread not in self.config["readout_threads"]: + raise ValueError(f"Bad data for {path_chunk_i}. Got {thread}") + if n_counts > self.config["readout_threads"][thread]: + raise ValueError( + f"{thread} wrote {n_counts}, expected{self.config['readout_threads'][thread]}" + ) return sum(counted_files.values()) def source_finished(self): - end_dir = self.config["daq_input_dir"] + '/THE_END' + end_dir = self.config["daq_input_dir"] + "/THE_END" if not os.path.exists(end_dir): return False else: @@ -225,49 +203,48 @@ def is_ready(self, chunk_i): ended = self.source_finished() pre, current, post = self._chunk_paths(chunk_i) next_ahead = os.path.exists(self._path(chunk_i + 1)) - if (current and ( - (pre and post - or chunk_i == 0 and post - or ended and (pre and not next_ahead)))): + if current and ( + (pre and post or chunk_i == 0 and post or ended and (pre and not next_ahead)) + ): return True return False - def _load_chunk(self, path, start, end, kind='central'): + def _load_chunk(self, path, start, end, kind="central"): records = [ strax.load_file( - fn, - compressor=self.config["daq_compressor"], - dtype=self.dtype_for('raw_records')) - for fn in sorted(glob.glob(f'{path}/*'))] + fn, compressor=self.config["daq_compressor"], dtype=self.dtype_for("raw_records") + ) + for fn in sorted(glob.glob(f"{path}/*")) + ] records = np.concatenate(records) records = strax.sort_by_time(records) first_start, last_start, last_end = None, None, None if len(records): - first_start, last_start = records[0]['time'], records[-1]['time'] + first_start, last_start = records[0]["time"], records[-1]["time"] # Records are sorted by (start)time and are of variable length. # Their end-times can differ. In the most pessimistic case we have # to look back one record length for each channel. - tot_channels = np.sum([np.diff(x) + 1 for x in - self.config['channel_map'].values()]) + tot_channels = np.sum([np.diff(x) + 1 for x in self.config["channel_map"].values()]) look_n_samples = self.config["record_length"] * tot_channels last_end = strax.endtime(records[-look_n_samples:]).max() if first_start < start or last_start >= end: raise ValueError( f"Bad data from DAQ: chunk {path} should contain data " f"that starts in [{start}, {end}), but we see start times " - f"ranging from {first_start} to {last_start}.") + f"ranging from {first_start} to {last_start}." + ) - if kind == 'central': + if kind == "central": result = records break_time = None else: # Find a time at which we can safely partition the data. - min_gap = self.config['safe_break_in_pulses'] + min_gap = self.config["safe_break_in_pulses"] if not len(records) or last_end + min_gap < end: # There is enough room at the end of the data break_time = end - min_gap - result = records if kind == 'post' else records[:0] + result = records if kind == "post" else records[:0] else: # Let's hope there is some quiet time in the middle try: @@ -275,10 +252,10 @@ def _load_chunk(self, path, start, end, kind='central'): records, safe_break=min_gap, # Records from the last chunk can extend as far as: - not_before=(start - + self.config['record_length'] * self.dt_max), - left=kind == 'post', - tolerant=False) + not_before=(start + self.config["record_length"] * self.dt_max), + left=kind == "post", + tolerant=False, + ) except strax.NoBreakFound: # We still have to break somewhere, but this can involve # throwing away data. @@ -289,34 +266,158 @@ def _load_chunk(self, path, start, end, kind='central'): # Mark the region where data /might/ be removed with # artificial deadtime. - dead_time_start = ( - break_time - self.config['record_length'] * self.dt_max) + dead_time_start = break_time - self.config["record_length"] * self.dt_max warnings.warn( f"Data in {path} is so dense that no {min_gap} " - f"ns break exists: data loss inevitable. " - f"Inserting artificial deadtime between " + "ns break exists: data loss inevitable. " + "Inserting artificial deadtime between " f"{dead_time_start} and {end}.", - ArtificialDeadtimeInserted) + ArtificialDeadtimeInserted, + ) - if kind == 'pre': + if kind == "pre": # Give the artificial deadtime past the break result = self._artificial_dead_time( - start=break_time, end=end, dt=self.dt_max) + start=break_time, end=end, dt=self.dt_max + ) else: # Remove data that would stick out result = records[strax.endtime(records) <= break_time] # Add the artificial deadtime until the break result = strax.sort_by_time( - np.concatenate([result, - self._artificial_dead_time( - start=dead_time_start, - end=break_time, dt=self.dt_max)])) + np.concatenate([ + result, + self._artificial_dead_time( + start=dead_time_start, end=break_time, dt=self.dt_max + ), + ]) + ) return result, break_time def _artificial_dead_time(self, start, end, dt): return strax.dict_to_rec( - dict(time=[start], - length=[(end - start) // dt], - dt=[dt], - channel=[ARTIFICIAL_DEADTIME_CHANNEL]), - self.dtype_for('raw_records')) + dict( + time=[start], + length=[(end - start) // dt], + dt=[dt], + channel=[ARTIFICIAL_DEADTIME_CHANNEL], + ), + self.dtype_for("raw_records"), + ) + + def compute(self, chunk_i): + dt_central = self.config["daq_chunk_duration"] + dt_overlap = self.config["daq_overlap_chunk_duration"] + + t_start = chunk_i * (dt_central + dt_overlap) + t_end = t_start + dt_central + + pre, current, post = self._chunk_paths(chunk_i) + r_pre, r_post = None, None + break_pre, break_post = t_start, t_end + + if pre: + if chunk_i == 0: + warnings.warn( + f"DAQ is being sloppy: there should be no pre dir {pre} " + "for chunk 0. We're ignoring it.", + UserWarning, + ) + else: + r_pre, break_pre = self._load_chunk( + path=pre, start=t_start - dt_overlap, end=t_start, kind="pre" + ) + + r_main, _ = self._load_chunk(path=current, start=t_start, end=t_end, kind="central") + + if post: + r_post, break_post = self._load_chunk( + path=post, start=t_end, end=t_end + dt_overlap, kind="post" + ) + + # Concatenate the result. + records = np.concatenate([x for x in (r_pre, r_main, r_post) if x is not None]) + + # Split records by channel + tpc_min = min(self.config['channel_map']['bottom'][0], self.config['channel_map']['top'][0]) + tpc_max = max(self.config['channel_map']['bottom'][1], self.config['channel_map']['top'][1]) + + channel_ranges = { + 'tpc': (tpc_min, tpc_max), + } + + if 'external' in self.config['channel_map']: + channel_ranges['external'] = self.config['channel_map']['external'] + else: + channel_ranges['external'] = (-1, -1) + + result_arrays = split_channel_ranges( + records, np.asarray(list(channel_ranges.values())) + ) + del records + + # Convert to strax chunks + result = dict() + for i, subd in enumerate(channel_ranges): + + print(f"{i}. Subdetector {subd} has {len(result_arrays[i])} records") + + if len(result_arrays[i]): + # dt may differ per subdetector + dt = result_arrays[i]['dt'][0] + # Convert time to time in ns since unix epoch. + # Ensure the offset is a whole digitizer sample + result_arrays[i]['time'] += dt * (self.t0 // dt) + + result_name = 'raw_records' + if subd == 'external': + result_name += '_ext' + result[result_name] = self.chunk( + start=self.t0 + break_pre, + end=self.t0 + break_post, + data=result_arrays[i], + data_type=result_name, + ) + + print(f"Read chunk {chunk_i:06d} from DAQ") + for r in result.values(): + # Print data rate / data type if any + if r._mbs() > 0: + print(f"\t{r}") + return result + + +@export +@numba.njit(nogil=True, cache=True) +def split_channel_ranges(records, channel_ranges): + """Return numba.List of record arrays in channel_ranges. + + channel_ranges is a list of tuples specifying the channel ranges for each subdetector. + """ + n_subdetectors = len(channel_ranges) + which_detector = np.zeros(len(records), dtype=np.int8) + n_in_detector = np.zeros(n_subdetectors, dtype=np.int64) + + # First loop to count number of records per detector + for r_i, r in enumerate(records): + for d_i, (left, right) in enumerate(channel_ranges): + if left <= r['channel'] <= right: + which_detector[r_i] = d_i + n_in_detector[d_i] += 1 + break + else: + raise ValueError(f"Bad data from DAQ: data in unknown channel {r['channel']}") + + # Allocate memory + results = numba.typed.List() + for d_i in range(n_subdetectors): + results.append(np.empty(n_in_detector[d_i], dtype=records.dtype)) + + # Second loop to fill results + n_placed = np.zeros(n_subdetectors, dtype=np.int64) + for r_i, r in enumerate(records): + d_i = which_detector[r_i] + results[d_i][n_placed[d_i]] = r + n_placed[d_i] += 1 + + return results From d7c5d2e79c43363e444575985720dae484d91b99 Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 13:35:29 +0100 Subject: [PATCH 02/14] added records_ext --- amstrax/plugins/records_ext/__init__.py | 2 + .../records_ext/pulse_processing_ext.py | 60 +++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 amstrax/plugins/records_ext/__init__.py create mode 100644 amstrax/plugins/records_ext/pulse_processing_ext.py diff --git a/amstrax/plugins/records_ext/__init__.py b/amstrax/plugins/records_ext/__init__.py new file mode 100644 index 00000000..59e8edd9 --- /dev/null +++ b/amstrax/plugins/records_ext/__init__.py @@ -0,0 +1,2 @@ +from . import pulse_processing_ext +from .pulse_processing_ext import * diff --git a/amstrax/plugins/records_ext/pulse_processing_ext.py b/amstrax/plugins/records_ext/pulse_processing_ext.py new file mode 100644 index 00000000..35907a37 --- /dev/null +++ b/amstrax/plugins/records_ext/pulse_processing_ext.py @@ -0,0 +1,60 @@ +import numba +import numpy as np +import strax +from immutabledict import immutabledict + +export, __all__ = strax.exporter() + +@export +@strax.takes_config( + strax.Option( + 'baseline_samples', + default=40, infer_type=False, + help='Number of samples to use at the start of the pulse to determine ' + 'the baseline'), +) +class PulseProcessingEXT(strax.Plugin): + """ + Plugin which performs the pulse processing steps: + 1. Baseline subtraction + 2. Pulse splitting + 3. Pulse merging + 4. Pulse counting + 5. Pulse length and area calculation + + """ + __version__ = '0.0.1' + + parallel = 'process' + rechunk_on_save = False + compressor = 'zstd' + + depends_on = 'raw_records_ext' + + provides = 'records_ext' + data_kind = 'records_ext' + + # I think in amstrax we can save everything + # default is ALWAYS + # save_when = strax.SaveWhen.TARGET + + def infer_dtype(self): + record_length = strax.record_length_from_dtype( + self.deps["raw_records_ext"].dtype_for("raw_records_ext") + ) + dtype = strax.record_dtype(record_length) + return dtype + + def compute(self, raw_records_ext): + # Do not trust in DAQ + strax.baseline to leave the + # out-of-bounds samples to zero. + r = strax.raw_to_records(raw_records_ext) + del raw_records_ext + + r = strax.sort_by_time(r) + strax.zero_out_of_bounds(r) + strax.baseline(r, baseline_samples=self.baseline_samples, flip=True) + + strax.integrate(r) + + return r \ No newline at end of file From 53c4ad86e9872d7358a3bd1bd4f5fa54d08c729f Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 13:35:44 +0100 Subject: [PATCH 03/14] added peaks and peaks_basics for ext data --- amstrax/plugins/peaks_ext/__init__.py | 6 + amstrax/plugins/peaks_ext/peak_basics_ext.py | 234 +++++++++++++++++++ amstrax/plugins/peaks_ext/peaks_ext.py | 81 +++++++ 3 files changed, 321 insertions(+) create mode 100644 amstrax/plugins/peaks_ext/__init__.py create mode 100644 amstrax/plugins/peaks_ext/peak_basics_ext.py create mode 100644 amstrax/plugins/peaks_ext/peaks_ext.py diff --git a/amstrax/plugins/peaks_ext/__init__.py b/amstrax/plugins/peaks_ext/__init__.py new file mode 100644 index 00000000..f871ca0f --- /dev/null +++ b/amstrax/plugins/peaks_ext/__init__.py @@ -0,0 +1,6 @@ +from . import peaks_ext +from .peaks_ext import * + +from . import peak_basics_ext +from .peak_basics_ext import * + diff --git a/amstrax/plugins/peaks_ext/peak_basics_ext.py b/amstrax/plugins/peaks_ext/peak_basics_ext.py new file mode 100644 index 00000000..e8e13fe2 --- /dev/null +++ b/amstrax/plugins/peaks_ext/peak_basics_ext.py @@ -0,0 +1,234 @@ +import numba +import numpy as np +import strax +from immutabledict import immutabledict + +export, __all__ = strax.exporter() + + +# For n_competing, which is temporarily added to PeakBasics +@export +@strax.takes_config( + strax.Option( + "channel_map", + type=immutabledict, + track=False, + help="Map of channel numbers to top, bottom and aqmon, to be defined in the context", + ), + strax.Option( + "check_peak_sum_area_rtol", + default=1e-4, + help="Check if the area of the sum-wf is the same as the total area" + " (if the area of the peak is positively defined)." + " Set to None to disable.", + ), + strax.Option( + 's1_min_width', + default=10, + help="Minimum (IQR) width of S1s" + ), + strax.Option( + 's1_max_width', + default=225, + help="Maximum (IQR) width of S1s" + ), + strax.Option( + 's1_min_area', + default=10, + help="Minimum area (PE) for S1s" + ), + strax.Option( + 's2_min_area', + default=10, + help="Minimum area (PE) for S2s" + ), + strax.Option( + 's2_min_width', + default=225, + help="Minimum width for S2s" + ), + strax.Option( + 's1_min_channels', + default=5, + help="Minimum number of channels for S1s" + ), + strax.Option( + 's2_min_channels', + default=5, + help="Minimum number of channels for S2s" + ), + strax.Option( + 's2_min_area_fraction_top', + default=0, + help="Minimum area fraction top for S2s" + ), + strax.Option( + 's1_max_area_fraction_top', + default=.2, + help="Maximum area fraction top for S1s" + ), +) +class PeakBasicsEXT(strax.Plugin): + provides = ("peak_basics_ext",) + depends_on = ("peaks_ext", ) + data_kind = "peaks_ext" + + parallel = "False" + rechunk_on_save = False + __version__ = "2.1" + dtype = [ + (('Start time of the peak (ns since unix epoch)', + 'time'), np.int64), + (('End time of the peak (ns since unix epoch)', + 'endtime'), np.int64), + (('Weighted center time of the peak (ns since unix epoch)', + 'center_time'), np.int64), + (('Peak integral in PE', + 'area'), np.float32), + (('Number of hits contributing at least one sample to the peak', + 'n_hits'), np.int32), + (('Number of PMTs contributing to the peak', + 'n_channels'), np.int16), + (('PMT number which contributes the most PE', + 'max_pmt'), np.int16), + (('Area of signal in the largest-contributing PMT (PE)', + 'max_pmt_area'), np.float32), + (('Total number of saturated channels', + 'n_saturated_channels'), np.int16), + (('Width (in ns) of the central 50% area of the peak', + 'range_50p_area'), np.float32), + (('Width (in ns) of the central 90% area of the peak', + 'range_90p_area'), np.float32), + (('Fraction of area seen by the top array ' + '(NaN for peaks with non-positive area)', + 'area_fraction_top'), np.float32), + (('Length of the peak waveform in samples', + 'length'), np.int32), + (('Time resolution of the peak waveform in ns', + 'dt'), np.int16), + (('Time between 10% and 50% area quantiles [ns]', + 'rise_time'), np.float32), + (('Number of PMTs with hits within tight range of mean', + 'tight_coincidence'), np.int16), + (('Type of peak (s1 or s2)', + 'type'), np.int16), + ] + + def compute(self, peaks_ext): + p = peaks_ext + r = np.zeros(len(p), self.dtype) + needed_fields = 'time length dt area type' + for q in needed_fields.split(): + r[q] = p[q] + + r['endtime'] = p['time'] + p['dt'] * p['length'] + r['n_channels'] = (p['area_per_channel'] > 0).sum(axis=1) + r['n_hits'] = p['n_hits'] + r['range_50p_area'] = p['width'][:, 5] + r['range_90p_area'] = p['width'][:, 9] + r['max_pmt'] = np.argmax(p['area_per_channel'], axis=1) + r['max_pmt_area'] = np.max(p['area_per_channel'], axis=1) + r['tight_coincidence'] = p['tight_coincidence'] + r['n_saturated_channels'] = p['n_saturated_channels'] + + # Negative-area peaks get NaN AFT + m = p['area'] > 0 + r['rise_time'] = -p['area_decile_from_midpoint'][:, 1] + + # Negative or zero-area peaks have centertime at startime + r["center_time"] = p["time"] + r["center_time"][m] += self.compute_center_times(p[m]) + + # Determine peak type + # 0 = unknown + # 1 = s1 + # 2 = s2 + + is_s1 = p['area'] >= 0 + is_s1 &= r['range_50p_area'] > self.config['s1_min_width'] + is_s1 &= r['range_50p_area'] < self.config['s1_max_width'] + is_s1 &= r['n_channels'] >= 0 + + is_s2 = p['area'] > 0 + is_s2 &= r['range_50p_area'] > self.config['s2_min_width'] + is_s2 &= r['n_channels'] >= 0 + + # if both are true, then it's an unknown peak + is_s1 &= ~is_s2 + is_s2 &= ~is_s1 + + r['type'][is_s1] = 1 + r['type'][is_s2] = 2 + + return r + + + @staticmethod + @numba.jit(nopython=True, nogil=True, cache=False) + def find_n_competing(peaks, window, fraction): + n = len(peaks) + t = peaks["time"] + a = peaks["area"] + results = np.zeros(n, dtype=np.int16) + left_i = 0 + right_i = 0 + for i, peak in enumerate(peaks): + while t[left_i] + window < t[i] and left_i < n - 1: + left_i += 1 + while t[right_i] - window < t[i] and right_i < n - 1: + right_i += 1 + results[i] = np.sum(a[left_i : right_i + 1] > a[i] * fraction) + + return results + + @staticmethod + @numba.njit(cache=True, nogil=True) + def compute_center_times(peaks): + result = np.zeros(len(peaks), dtype=np.int32) + for p_i, p in enumerate(peaks): + t = 0 + for t_i, weight in enumerate(p["data"]): + t += t_i * p["dt"] * weight + result[p_i] = t / p["area"] + return result + + @staticmethod + def check_area(area_per_channel_sum, peaks, rtol) -> None: + """ + Check if the area of the sum-wf is the same as the total area + (if the area of the peak is positively defined). + + :param area_per_channel_sum: the summation of the + peaks['area_per_channel'] which will be checked against the + values of peaks['area']. + :param peaks: array of peaks. + :param rtol: relative tolerance for difference between + area_per_channel_sum and peaks['area']. See np.isclose. + :raises: ValueError if the peak area and the area-per-channel + sum are not sufficiently close + """ + positive_area = peaks["area"] > 0 + if not np.sum(positive_area): + return + + is_close = np.isclose( + area_per_channel_sum[positive_area], + peaks[positive_area]["area"], + rtol=rtol, + ) + + if not is_close.all(): + for peak in peaks[positive_area][~is_close]: + print("bad area") + strax.print_record(peak) + + p_i = np.where(~is_close)[0][0] + peak = peaks[positive_area][p_i] + area_fraction_off = ( + 1 - area_per_channel_sum[positive_area][p_i] / peak["area"] + ) + message = ( + f"Area not calculated correctly, it's " + f'{100 * area_fraction_off} % off, time: {peak["time"]}' + ) + raise ValueError(message) diff --git a/amstrax/plugins/peaks_ext/peaks_ext.py b/amstrax/plugins/peaks_ext/peaks_ext.py new file mode 100644 index 00000000..0bf57572 --- /dev/null +++ b/amstrax/plugins/peaks_ext/peaks_ext.py @@ -0,0 +1,81 @@ +import numba +import numpy as np +import strax + +export, __all__ = strax.exporter() + +@export +@strax.takes_config( + strax.Option('peak_gap_threshold', default=300, + help="No hits for this many ns triggers a new peak"), + strax.Option('peak_left_extension', default=10, + help="Include this many ns left of hits in peaks"), + strax.Option('peak_right_extension', default=10, + help="Include this many ns right of hits in peaks"), + strax.Option('peak_min_area', default=10, + help="Minimum contributing PMTs needed to define a peak"), + strax.Option('peak_min_pmts', default=1, + help="Minimum contributing PMTs needed to define a peak"), + strax.Option('single_channel_peaks', default=False, + help='Whether single-channel peaks should be reported'), + strax.Option('peak_split_min_height', default=25, + help="Minimum height in PE above a local sum waveform" + "minimum, on either side, to trigger a split"), + strax.Option('peak_split_min_ratio', default=4, + help="Minimum ratio between local sum waveform" + "minimum and maxima on either side, to trigger a split"), + strax.Option('diagnose_sorting', track=False, default=False, + help="Enable runtime checks for sorting and disjointness"), + strax.Option('n_tpc_pmts', track=False, default=False, + help="Number of channels"), + strax.Option('gain_to_pe_array', default=None, + help="Gain to pe array"), +) +class PeaksEXT(strax.Plugin): + depends_on = ('records_ext',) + data_kind = 'peaks_ext' + parallel = 'process' + provides = ('peaks_ext') + rechunk_on_save = True + + __version__ = '0.0.1' + + def infer_dtype(self): + + return strax.peak_dtype(n_channels=self.config['n_tpc_pmts']) + + def compute(self, records_ext, start, end): + + r = records_ext + + if self.config['gain_to_pe_array'] is None: + self.to_pe = np.ones(self.config['n_tpc_pmts']) + else: + self.to_pe = self.config['gain_to_pe_array'] + + hits = strax.find_hits(r) + hits = strax.sort_by_time(hits) + + rlinks = strax.record_links(r) + + # Rewrite to just peaks/hits + peaks = strax.find_peaks( + hits, self.to_pe, + gap_threshold=self.config['peak_gap_threshold'], + left_extension=self.config['peak_left_extension'], + right_extension=self.config['peak_right_extension'], + min_area=self.config['peak_min_area'], + min_channels=1, + result_dtype=strax.peak_dtype(n_channels=self.config['n_tpc_pmts']) + ) + + strax.sum_waveform(peaks, hits, r, rlinks, self.to_pe) + + peaks = strax.split_peaks( + peaks, hits, r, rlinks, self.to_pe, + min_height=self.config['peak_split_min_height'], + min_ratio=self.config['peak_split_min_ratio']) + + strax.compute_widths(peaks) + + return peaks From c8c59893e57b5ea80486e01b8e89df53c7c83009 Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 13:55:52 +0100 Subject: [PATCH 04/14] add plugins to init --- amstrax/plugins/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/amstrax/plugins/__init__.py b/amstrax/plugins/__init__.py index 82a292f1..cfe0143c 100644 --- a/amstrax/plugins/__init__.py +++ b/amstrax/plugins/__init__.py @@ -14,3 +14,10 @@ from . import events from .events import * + +from . import records_ext +from .records_ext import * + +from . import peaks_ext +from .peaks_ext import * + From 659aa2bb434fe5d98eb4305d42614633fed20216 Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 13:56:15 +0100 Subject: [PATCH 05/14] register plugins and new channel map --- amstrax/contexts.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/amstrax/contexts.py b/amstrax/contexts.py index 60293f9e..50cf082c 100644 --- a/amstrax/contexts.py +++ b/amstrax/contexts.py @@ -36,6 +36,10 @@ ax.EventInfo, ax.EventWaveform, ax.EventAreaPerChannel, + # External PMT plugins + ax.PulseProcessingEXT, + ax.PeaksEXT, + ax.PeakBasicsEXT, # LED plugins not default # ax.RecordsLED, # ax.LEDCalibration, @@ -54,6 +58,7 @@ channel_map=immutabledict( bottom=(0, 0), top=(1, 4), + external=(5,10), aqmon=(40, 40), # register strax deadtime )) From 85bc3d654ff3191e247ef9f327436781bbfc23a8 Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 13:58:16 +0100 Subject: [PATCH 06/14] remove print debug statements --- amstrax/plugins/raw_records/daqreader.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/amstrax/plugins/raw_records/daqreader.py b/amstrax/plugins/raw_records/daqreader.py index 7f10b8ea..3476c095 100644 --- a/amstrax/plugins/raw_records/daqreader.py +++ b/amstrax/plugins/raw_records/daqreader.py @@ -359,9 +359,7 @@ def compute(self, chunk_i): # Convert to strax chunks result = dict() for i, subd in enumerate(channel_ranges): - - print(f"{i}. Subdetector {subd} has {len(result_arrays[i])} records") - + if len(result_arrays[i]): # dt may differ per subdetector dt = result_arrays[i]['dt'][0] From c65ac4e1f73b595ebf4f190eec6fd09febca9974 Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 14:03:40 +0100 Subject: [PATCH 07/14] fix indent --- amstrax/plugins/raw_records/daqreader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/amstrax/plugins/raw_records/daqreader.py b/amstrax/plugins/raw_records/daqreader.py index 3476c095..2acf8609 100644 --- a/amstrax/plugins/raw_records/daqreader.py +++ b/amstrax/plugins/raw_records/daqreader.py @@ -359,7 +359,7 @@ def compute(self, chunk_i): # Convert to strax chunks result = dict() for i, subd in enumerate(channel_ranges): - + if len(result_arrays[i]): # dt may differ per subdetector dt = result_arrays[i]['dt'][0] @@ -403,8 +403,8 @@ def split_channel_ranges(records, channel_ranges): which_detector[r_i] = d_i n_in_detector[d_i] += 1 break - else: - raise ValueError(f"Bad data from DAQ: data in unknown channel {r['channel']}") + else: + raise ValueError(f"Bad data from DAQ: data in unknown channel {r['channel']}") # Allocate memory results = numba.typed.List() From 705cf36302d3585d9313524f0dc38d7ee462ad8b Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 14:05:58 +0100 Subject: [PATCH 08/14] fix codefactor newlines --- amstrax/plugins/__init__.py | 1 - amstrax/plugins/peaks_ext/__init__.py | 1 - amstrax/plugins/records_ext/pulse_processing_ext.py | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/amstrax/plugins/__init__.py b/amstrax/plugins/__init__.py index cfe0143c..7f715722 100644 --- a/amstrax/plugins/__init__.py +++ b/amstrax/plugins/__init__.py @@ -20,4 +20,3 @@ from . import peaks_ext from .peaks_ext import * - diff --git a/amstrax/plugins/peaks_ext/__init__.py b/amstrax/plugins/peaks_ext/__init__.py index f871ca0f..73211cf4 100644 --- a/amstrax/plugins/peaks_ext/__init__.py +++ b/amstrax/plugins/peaks_ext/__init__.py @@ -3,4 +3,3 @@ from . import peak_basics_ext from .peak_basics_ext import * - diff --git a/amstrax/plugins/records_ext/pulse_processing_ext.py b/amstrax/plugins/records_ext/pulse_processing_ext.py index 35907a37..e4304769 100644 --- a/amstrax/plugins/records_ext/pulse_processing_ext.py +++ b/amstrax/plugins/records_ext/pulse_processing_ext.py @@ -57,4 +57,4 @@ def compute(self, raw_records_ext): strax.integrate(r) - return r \ No newline at end of file + return r From b54a7282dec433e9bc588213e96447f39ef3eddb Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 14:13:52 +0100 Subject: [PATCH 09/14] explain error --- amstrax/plugins/raw_records/daqreader.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/amstrax/plugins/raw_records/daqreader.py b/amstrax/plugins/raw_records/daqreader.py index 2acf8609..e1c34cb1 100644 --- a/amstrax/plugins/raw_records/daqreader.py +++ b/amstrax/plugins/raw_records/daqreader.py @@ -404,6 +404,8 @@ def split_channel_ranges(records, channel_ranges): n_in_detector[d_i] += 1 break else: + # channel_ranges should be sorted ascending. + print(r["time"], r["channel"], channel_ranges) raise ValueError(f"Bad data from DAQ: data in unknown channel {r['channel']}") # Allocate memory From 919ca4a8d819de2035b9d763d581e5c254093f78 Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 14:26:36 +0100 Subject: [PATCH 10/14] confused --- amstrax/plugins/raw_records/daqreader.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/amstrax/plugins/raw_records/daqreader.py b/amstrax/plugins/raw_records/daqreader.py index e1c34cb1..d94c4ce0 100644 --- a/amstrax/plugins/raw_records/daqreader.py +++ b/amstrax/plugins/raw_records/daqreader.py @@ -405,8 +405,7 @@ def split_channel_ranges(records, channel_ranges): break else: # channel_ranges should be sorted ascending. - print(r["time"], r["channel"], channel_ranges) - raise ValueError(f"Bad data from DAQ: data in unknown channel {r['channel']}") + raise ValueError(f"Bad data from DAQ: data in unknown channel {int(r['channel'])}") # Allocate memory results = numba.typed.List() From a1548554fdb6189d301b403eed2da9b0f9ffc58f Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 14:51:31 +0100 Subject: [PATCH 11/14] debug --- amstrax/plugins/raw_records/daqreader.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/amstrax/plugins/raw_records/daqreader.py b/amstrax/plugins/raw_records/daqreader.py index d94c4ce0..ba2b6777 100644 --- a/amstrax/plugins/raw_records/daqreader.py +++ b/amstrax/plugins/raw_records/daqreader.py @@ -392,6 +392,9 @@ def split_channel_ranges(records, channel_ranges): channel_ranges is a list of tuples specifying the channel ranges for each subdetector. """ + + print("Channel ranges", channel_ranges) + n_subdetectors = len(channel_ranges) which_detector = np.zeros(len(records), dtype=np.int8) n_in_detector = np.zeros(n_subdetectors, dtype=np.int64) @@ -405,6 +408,7 @@ def split_channel_ranges(records, channel_ranges): break else: # channel_ranges should be sorted ascending. + print(f"Unknown channel found: {r['channel']}") # Add this line for debugging raise ValueError(f"Bad data from DAQ: data in unknown channel {int(r['channel'])}") # Allocate memory From f43bb1d3de7131919c58ad4e762a1820e213d96e Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 14:52:50 +0100 Subject: [PATCH 12/14] debug --- amstrax/plugins/raw_records/daqreader.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/amstrax/plugins/raw_records/daqreader.py b/amstrax/plugins/raw_records/daqreader.py index ba2b6777..c39f1120 100644 --- a/amstrax/plugins/raw_records/daqreader.py +++ b/amstrax/plugins/raw_records/daqreader.py @@ -408,8 +408,9 @@ def split_channel_ranges(records, channel_ranges): break else: # channel_ranges should be sorted ascending. - print(f"Unknown channel found: {r['channel']}") # Add this line for debugging - raise ValueError(f"Bad data from DAQ: data in unknown channel {int(r['channel'])}") + channel_int = int(r['channel']) # Convert to int outside the f-string + print("Unknown channel found:", channel_int) # Add this line for debugging + raise ValueError(f"Bad data from DAQ: data in unknown channel") # Allocate memory results = numba.typed.List() From 0852dad12cf1ed8e80aa933b70a9f1c0910e1b22 Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 14:57:35 +0100 Subject: [PATCH 13/14] remove debug statements --- amstrax/plugins/raw_records/daqreader.py | 3 --- requirements.txt | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/amstrax/plugins/raw_records/daqreader.py b/amstrax/plugins/raw_records/daqreader.py index c39f1120..23ae3721 100644 --- a/amstrax/plugins/raw_records/daqreader.py +++ b/amstrax/plugins/raw_records/daqreader.py @@ -392,9 +392,6 @@ def split_channel_ranges(records, channel_ranges): channel_ranges is a list of tuples specifying the channel ranges for each subdetector. """ - - print("Channel ranges", channel_ranges) - n_subdetectors = len(channel_ranges) which_detector = np.zeros(len(records), dtype=np.int8) n_in_detector = np.zeros(n_subdetectors, dtype=np.int64) diff --git a/requirements.txt b/requirements.txt index c757681d..8e3046e0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ numpy -numba>=0.50.0 +numba>=0.58.1 strax>=0.12.0 pymongo<4.0 multihist>=0.6.3 From 566e5daa3147ec68c081772c1214e87153a4c716 Mon Sep 17 00:00:00 2001 From: cfuselli Date: Tue, 12 Dec 2023 15:01:16 +0100 Subject: [PATCH 14/14] remove debug statements --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 8e3046e0..c757681d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ numpy -numba>=0.58.1 +numba>=0.50.0 strax>=0.12.0 pymongo<4.0 multihist>=0.6.3