From 5a86ea72b29666e052789bc8c3eb61d9e06e1141 Mon Sep 17 00:00:00 2001 From: Calum Chamberlain Date: Wed, 7 Feb 2024 11:42:57 +1300 Subject: [PATCH 1/6] Add test for min-stations --- eqcorrscan/tests/matched_filter/match_filter_test.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/eqcorrscan/tests/matched_filter/match_filter_test.py b/eqcorrscan/tests/matched_filter/match_filter_test.py index ff121c242..24a340878 100644 --- a/eqcorrscan/tests/matched_filter/match_filter_test.py +++ b/eqcorrscan/tests/matched_filter/match_filter_test.py @@ -729,6 +729,18 @@ def test_tribe_detect(self): party=party, party_in=self.party, float_tol=0.05, check_event=True) + def test_min_stations(self): + """ + Check that minimum stations is respected and templates + without sufficient stations are not run. + """ + for conc_proc in [True, False]: + party = self.tribe.detect( + stream=self.unproc_st, threshold=8.0, threshold_type='MAD', + trig_int=6.0, daylong=False, plot=False, min_stations=6, + parallel_process=False, concurrent_processing=conc_proc) + self.assertEqual(len(party), 0) + def test_tribe_detect_with_empty_streams(self): """ Compare the detect method for a tribe of one vs two templates and check From a45ba3a115afebeb35f38d60726a78c4bc01179b Mon Sep 17 00:00:00 2001 From: Calum Chamberlain Date: Wed, 7 Feb 2024 16:19:44 +1300 Subject: [PATCH 2/6] Add min_stations kwarg --- CHANGES.md | 5 ++++ .../core/match_filter/helpers/processes.py | 10 +++++++- eqcorrscan/core/match_filter/helpers/tribe.py | 25 +++++++++++++------ eqcorrscan/core/match_filter/template.py | 8 ++++-- eqcorrscan/core/match_filter/tribe.py | 24 +++++++++++++----- .../tests/matched_filter/match_filter_test.py | 20 +++++++++++++-- 6 files changed, 74 insertions(+), 18 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e49cdf69e..6503ac785 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,4 +1,9 @@ ## Current +- core.match_filter.tribe + - Add option to set minimum number of stations required to use a template in detect + (`min_stations` kwarg) + +## 0.5.0 * core.match_filter.tribe - Significant re-write of detect logic to take advantage of parallel steps (see #544) - Significant re-structure of hidden functions. diff --git a/eqcorrscan/core/match_filter/helpers/processes.py b/eqcorrscan/core/match_filter/helpers/processes.py index 7a85945c0..60c446318 100644 --- a/eqcorrscan/core/match_filter/helpers/processes.py +++ b/eqcorrscan/core/match_filter/helpers/processes.py @@ -445,6 +445,7 @@ def _prepper( output_queue: Queue, poison_queue: Queue, xcorr_func: str = None, + min_stations: int = 0, ): """ Prepare templates and stream for correlation. @@ -472,6 +473,8 @@ def _prepper( Queue to check for poison, or put poison into if something goes awry :param xcorr_func: Name of correlation function backend to be used. + :param min_stations: + Minimum number of stations to run a template. 
""" if isinstance(templates, dict): # We have been passed a db of template files on disk @@ -530,11 +533,16 @@ def _prepper( f"of {group_size} templates") try: template_groups = _group(sids=st_sids, templates=templates, - group_size=group_size, groups=groups) + group_size=group_size, groups=groups, + min_stations=min_stations) except Exception as e: Logger.error(e) poison_queue.put_nowait(Poison(e)) break + if template_groups is None: + output_queue.put_nowait(None) + return + Logger.info(f"Grouped into {len(template_groups)} groups") for i, template_group in enumerate(template_groups): killed = _check_for_poison(poison_queue) diff --git a/eqcorrscan/core/match_filter/helpers/tribe.py b/eqcorrscan/core/match_filter/helpers/tribe.py index 9786a68fc..fcaa29fe2 100644 --- a/eqcorrscan/core/match_filter/helpers/tribe.py +++ b/eqcorrscan/core/match_filter/helpers/tribe.py @@ -17,7 +17,7 @@ import numpy as np from collections import defaultdict -from typing import List, Set +from typing import List, Set, Union from timeit import default_timer from concurrent.futures import ThreadPoolExecutor @@ -261,8 +261,9 @@ def _group( sids: Set[str], templates: List[Template], group_size: int, - groups: List[List[str]] = None -) -> List[List[Template]]: + groups: List[List[str]] = None, + min_stations: int = 0, +) -> Union[List[List[Template]], None]: """ Group templates either by seed id, or using pre-computed groups @@ -270,27 +271,37 @@ def _group( :param templates: Templates to group :param group_size: Maximum group size :param groups: [Optional] List of List of template names in groups - :return: Groups of templates. + :param min_stations: Minimum number of stations to run a template. + + :return: Groups of templates. None if no templates meet criteria """ Logger.info(f"Grouping for {sids}") if groups: Logger.info("Using pre-computed groups") t_dict = {t.name: t for t in templates} + stations = {sid.split('.')[1] for sid in sids} template_groups = [] for grp in groups: template_group = [ t_dict.get(t_name) for t_name in grp - if t_name in t_dict.keys()] + if t_name in t_dict.keys() and + len({tr.stats.station for tr in + t_dict.get(t_name).st}.intersection(stations) + ) >= min_stations] + Logger.info(f"Dropping {len(grp) - len(template_group)} templates " + f"due to fewer than {min_stations} matched channels") if len(template_group): template_groups.append(template_group) return template_groups template_groups = group_templates_by_seedid( templates=templates, st_seed_ids=sids, - group_size=group_size) + group_size=group_size, + min_stations=min_stations) if len(template_groups) == 1 and len(template_groups[0]) == 0: Logger.error("No matching ids between stream and templates") - raise IndexError("No matching ids between stream and templates") + return None + # raise IndexError("No matching ids between stream and templates") return template_groups diff --git a/eqcorrscan/core/match_filter/template.py b/eqcorrscan/core/match_filter/template.py index 599d27ed3..96c7735f9 100644 --- a/eqcorrscan/core/match_filter/template.py +++ b/eqcorrscan/core/match_filter/template.py @@ -761,6 +761,7 @@ def group_templates_by_seedid( templates: List[Template], st_seed_ids: Set[str], group_size: int, + min_stations: int = 0, ) -> List[List[Template]]: """ Group templates to reduce dissimilar traces @@ -771,6 +772,9 @@ def group_templates_by_seedid( Seed ids in the stream to be matched with :param group_size: Maximum group size - will not exceed this size + :param min_stations: + Minimum number of overlapping stations 
between template + and stream to use the template for detection. :return: List of lists of templates grouped. @@ -788,9 +792,9 @@ def group_templates_by_seedid( # Don't use templates that don't have any overlap with the stream template_seed_ids = tuple( (t_name, t_chans) for t_name, t_chans in template_seed_ids - if len(t_chans)) + if len({sid.split('.')[1] for sid in t_chans}) >= min_stations) Logger.info(f"Dropping {len(templates) - len(template_seed_ids)} " - f"templates due to no matched channels") + f"templates due to fewer than {min_stations} matched channels") # We will need this dictionary at the end for getting the templates by id template_dict = {t.name: t for t in templates} # group_size can be None, in which case we don't actually need to group diff --git a/eqcorrscan/core/match_filter/tribe.py b/eqcorrscan/core/match_filter/tribe.py index e415d1897..0d3a1f419 100644 --- a/eqcorrscan/core/match_filter/tribe.py +++ b/eqcorrscan/core/match_filter/tribe.py @@ -657,7 +657,7 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False, concurrent_processing=False, ignore_length=False, ignore_bad_data=False, group_size=None, overlap="calculate", full_peaks=False, save_progress=False, process_cores=None, - pre_processed=False, check_processing=True, + pre_processed=False, check_processing=True, min_stations=0, **kwargs): """ Detect using a Tribe of templates within a continuous stream. @@ -749,6 +749,10 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False, :type check_processing: bool :param check_processing: Whether to check that all templates were processed the same. + :type min_stations: int + :param min_stations: + Minimum number of overlapping stations between template + and stream to use the template for detection. :return: :class:`eqcorrscan.core.match_filter.Party` of Families of @@ -871,7 +875,7 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False, ignore_bad_data, group_size, groups, sampling_rate, threshold, threshold_type, save_progress, xcorr_func, concurrency, cores, export_cccsums, parallel, peak_cores, trig_int, full_peaks, - plot, plotdir, plot_format,) + plot, plotdir, plot_format, min_stations,) if concurrent_processing: party = self._detect_concurrent(*args, **inner_kwargs) @@ -899,7 +903,7 @@ def _detect_serial( group_size, groups, sampling_rate, threshold, threshold_type, save_progress, xcorr_func, concurrency, cores, export_cccsums, parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format, - **kwargs + min_stations, **kwargs ): """ Internal serial detect workflow. """ from eqcorrscan.core.match_filter.helpers.tribe import ( @@ -929,7 +933,10 @@ def _detect_serial( delta = st_chunk[0].stats.delta template_groups = _group( sids={tr.id for tr in st_chunk}, - templates=self.templates, group_size=group_size, groups=groups) + templates=self.templates, group_size=group_size, groups=groups, + min_stations=min_stations) + if template_groups is None: + continue for i, template_group in enumerate(template_groups): templates = [_quick_copy_stream(t.st) for t in template_group] template_names = [t.name for t in template_group] @@ -986,7 +993,7 @@ def _detect_concurrent( group_size, groups, sampling_rate, threshold, threshold_type, save_progress, xcorr_func, concurrency, cores, export_cccsums, parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format, - **kwargs + min_stations, **kwargs ): """ Internal concurrent detect workflow. 
""" from eqcorrscan.core.match_filter.helpers.processes import ( @@ -1062,6 +1069,7 @@ def _detect_concurrent( output_queue=prepped_queue, poison_queue=poison_queue, xcorr_func=xcorr_func, + min_stations=min_stations, ), name="PrepProcess" ) @@ -1224,7 +1232,7 @@ def client_detect(self, client, starttime, endtime, threshold, ignore_bad_data=False, group_size=None, return_stream=False, full_peaks=False, save_progress=False, process_cores=None, retries=3, - check_processing=True, **kwargs): + check_processing=True, min_stations=0, **kwargs): """ Detect using a Tribe of templates within a continuous stream. @@ -1316,6 +1324,10 @@ def client_detect(self, client, starttime, endtime, threshold, :param retries: Number of attempts allowed for downloading - allows for transient server issues. + :type min_stations: int + :param min_stations: + Minimum number of overlapping stations between template + and stream to use the template for detection. :return: :class:`eqcorrscan.core.match_filter.Party` of Families of diff --git a/eqcorrscan/tests/matched_filter/match_filter_test.py b/eqcorrscan/tests/matched_filter/match_filter_test.py index 24a340878..f308248a5 100644 --- a/eqcorrscan/tests/matched_filter/match_filter_test.py +++ b/eqcorrscan/tests/matched_filter/match_filter_test.py @@ -734,8 +734,24 @@ def test_min_stations(self): Check that minimum stations is respected and templates without sufficient stations are not run. """ - for conc_proc in [True, False]: - party = self.tribe.detect( + local_tribe = self.tribe.copy() + # Drop some channels + local_tribe[0].st = local_tribe[0].st[0:-2] + for conc_proc in [False, True]: + party = local_tribe.detect( + stream=self.unproc_st, threshold=8.0, threshold_type='MAD', + trig_int=6.0, daylong=False, plot=False, min_stations=5, + parallel_process=False, concurrent_processing=conc_proc) + self.assertEqual(len(party), 3) + + def test_no_stations(self): + """ + Check that minimum stations is respected and templates + without sufficient stations are not run. 
+ """ + local_tribe = self.tribe.copy() + for conc_proc in [False, True]: + party = local_tribe.detect( stream=self.unproc_st, threshold=8.0, threshold_type='MAD', trig_int=6.0, daylong=False, plot=False, min_stations=6, parallel_process=False, concurrent_processing=conc_proc) From 32e367514deb3a4e46e81be45ea861c824267f7b Mon Sep 17 00:00:00 2001 From: Calum Chamberlain Date: Wed, 7 Feb 2024 17:10:27 +1300 Subject: [PATCH 3/6] Catch no matching data in the right places --- eqcorrscan/core/match_filter/helpers/processes.py | 11 +++++++++-- eqcorrscan/core/match_filter/helpers/tribe.py | 2 +- eqcorrscan/core/match_filter/template.py | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/eqcorrscan/core/match_filter/helpers/processes.py b/eqcorrscan/core/match_filter/helpers/processes.py index 60c446318..885384da9 100644 --- a/eqcorrscan/core/match_filter/helpers/processes.py +++ b/eqcorrscan/core/match_filter/helpers/processes.py @@ -388,14 +388,17 @@ def _pre_processor( break Logger.debug("Getting stream from queue") st = _get_and_check(input_stream_queue, poison_queue) + Logger.warning(st) if st is None: Logger.info("Ran out of streams, stopping processing") break elif isinstance(st, Poison): Logger.error("Killed") break - if len(st) == 0: - break + # if len(st) == 0: + # Logger.error("Empty stream provided") + # poison_queue.put_nowait(Poison(IndexError("No matching channels between stream and "))) + # break Logger.info(f"Processing stream:\n{st}") # Process stream @@ -528,6 +531,10 @@ def _prepper( f"Multiple channels in continuous data for " f"{', '.join(_duplicate_sids)}"))) break + template_sids = {tr.id for template in templates for tr in template.st} + if len(template_sids.intersection(st_sids)) == 0: + poison_queue.put_nowait(Poison( + IndexError("No matching channels between templates and data"))) # Do the grouping for this stream Logger.info(f"Grouping {len(templates)} templates into groups " f"of {group_size} templates") diff --git a/eqcorrscan/core/match_filter/helpers/tribe.py b/eqcorrscan/core/match_filter/helpers/tribe.py index fcaa29fe2..2b6ff2a1a 100644 --- a/eqcorrscan/core/match_filter/helpers/tribe.py +++ b/eqcorrscan/core/match_filter/helpers/tribe.py @@ -287,7 +287,7 @@ def _group( if t_name in t_dict.keys() and len({tr.stats.station for tr in t_dict.get(t_name).st}.intersection(stations) - ) >= min_stations] + ) >= max(1, min_stations)] Logger.info(f"Dropping {len(grp) - len(template_group)} templates " f"due to fewer than {min_stations} matched channels") if len(template_group): diff --git a/eqcorrscan/core/match_filter/template.py b/eqcorrscan/core/match_filter/template.py index 96c7735f9..6bdf5c4e3 100644 --- a/eqcorrscan/core/match_filter/template.py +++ b/eqcorrscan/core/match_filter/template.py @@ -792,7 +792,7 @@ def group_templates_by_seedid( # Don't use templates that don't have any overlap with the stream template_seed_ids = tuple( (t_name, t_chans) for t_name, t_chans in template_seed_ids - if len({sid.split('.')[1] for sid in t_chans}) >= min_stations) + if len({sid.split('.')[1] for sid in t_chans}) >= max(1, min_stations)) Logger.info(f"Dropping {len(templates) - len(template_seed_ids)} " f"templates due to fewer than {min_stations} matched channels") # We will need this dictionary at the end for getting the templates by id From b30c507695f424f6fdefb280695c76febb4b7ebe Mon Sep 17 00:00:00 2001 From: Calum Chamberlain Date: Wed, 7 Feb 2024 17:40:57 +1300 Subject: [PATCH 4/6] Pass min_stations through client_detect --- 
eqcorrscan/core/match_filter/tribe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eqcorrscan/core/match_filter/tribe.py b/eqcorrscan/core/match_filter/tribe.py index 0d3a1f419..60d2e8b3e 100644 --- a/eqcorrscan/core/match_filter/tribe.py +++ b/eqcorrscan/core/match_filter/tribe.py @@ -1437,7 +1437,8 @@ def client_detect(self, client, starttime, endtime, threshold, process_cores=process_cores, save_progress=save_progress, return_stream=return_stream, check_processing=False, poison_queue=poison_queue, shutdown=False, - concurrent_processing=concurrent_processing, groups=groups) + concurrent_processing=concurrent_processing, groups=groups, + min_stations=min_stations) if not concurrent_processing: Logger.warning("Using concurrent_processing=True can be faster if" From 2d972067ee22ff17935758e74041c5cb4f305e53 Mon Sep 17 00:00:00 2001 From: Calum Chamberlain Date: Fri, 9 Feb 2024 15:53:59 +1300 Subject: [PATCH 5/6] flake8 --- eqcorrscan/core/match_filter/helpers/processes.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eqcorrscan/core/match_filter/helpers/processes.py b/eqcorrscan/core/match_filter/helpers/processes.py index 885384da9..7eb44e52b 100644 --- a/eqcorrscan/core/match_filter/helpers/processes.py +++ b/eqcorrscan/core/match_filter/helpers/processes.py @@ -397,7 +397,8 @@ def _pre_processor( break # if len(st) == 0: # Logger.error("Empty stream provided") - # poison_queue.put_nowait(Poison(IndexError("No matching channels between stream and "))) + # poison_queue.put_nowait(Poison(IndexError + # ("No matching channels between stream and "))) # break Logger.info(f"Processing stream:\n{st}") From f7f08231a58763b5cc89451e269a34107b9498bb Mon Sep 17 00:00:00 2001 From: Calum Chamberlain Date: Fri, 9 Feb 2024 16:03:22 +1300 Subject: [PATCH 6/6] Add client_detect test for min stations --- .../tests/matched_filter/match_filter_test.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/eqcorrscan/tests/matched_filter/match_filter_test.py b/eqcorrscan/tests/matched_filter/match_filter_test.py index f308248a5..76180f897 100644 --- a/eqcorrscan/tests/matched_filter/match_filter_test.py +++ b/eqcorrscan/tests/matched_filter/match_filter_test.py @@ -744,6 +744,24 @@ def test_min_stations(self): parallel_process=False, concurrent_processing=conc_proc) self.assertEqual(len(party), 3) + @pytest.mark.network + def test_min_stations_network(self): + """ + Check that minimum stations is respected and templates + without sufficient stations are not run. + """ + local_tribe = self.tribe.copy() + # Drop some channels + local_tribe[0].st = local_tribe[0].st[0:-2] + for conc_proc in [False, True]: + party = local_tribe.client_detect( + client=Client('NCEDC'), starttime=UTCDateTime(2004, 9, 28, 17), + endtime=UTCDateTime(2004, 9, 28, 18), + threshold=8.0, threshold_type='MAD', + trig_int=6.0, daylong=False, plot=False, min_stations=5, + parallel_process=False, concurrent_processing=conc_proc) + self.assertEqual(len(party), 3) + def test_no_stations(self): """ Check that minimum stations is respected and templates
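
For reference, a minimal usage sketch of the new `min_stations` kwarg, mirroring the calls in the tests above. The file names are placeholders, and the threshold, trigger-interval, and `min_stations` values are copied from the test calls rather than being recommendations.

# Minimal usage sketch of the new ``min_stations`` kwarg, based on the test
# calls in this patch series. File names are hypothetical placeholders.
from obspy import read
from eqcorrscan.core.match_filter.tribe import Tribe

tribe = Tribe().read("tribe.tgz")   # hypothetical archive written by Tribe.write
st = read("continuous_day.ms")      # hypothetical continuous waveform data

# Templates that share fewer than five stations with ``st`` are dropped from
# the correlation groups rather than being run with too little data.
party = tribe.detect(
    stream=st, threshold=8.0, threshold_type="MAD", trig_int=6.0,
    daylong=False, plot=False, min_stations=5,
    parallel_process=False, concurrent_processing=False)
print(f"Detected {len(party)} families")

Note that the default of `min_stations=0` keeps the previous behaviour: the grouping helpers compare the station overlap against `max(1, min_stations)`, so a template still needs at least one station in common with the stream to be run.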