Add min_stations argument for detection #565

Merged

merged 18 commits into from Feb 9, 2024

Changes from all commits (18 commits)
a571acd
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain Jan 8, 2023
0f9aed5
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain Mar 19, 2023
bd70736
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain Apr 12, 2023
1898b07
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain May 28, 2023
c92dc09
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain Jul 28, 2023
80af1a0
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain Oct 24, 2023
08113a0
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain Oct 29, 2023
4033675
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain Dec 3, 2023
8310f7e
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain Dec 11, 2023
3568b0c
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain Dec 11, 2023
eb52921
Merge branch 'develop' of https://github.com/eqcorrscan/EQcorrscan in…
calum-chamberlain Dec 12, 2023
5a86ea7
Add test for min-stations
calum-chamberlain Feb 6, 2024
a45ba3a
Add min_stations kwarg
calum-chamberlain Feb 7, 2024
32e3675
Catch no matching data in the right places
calum-chamberlain Feb 7, 2024
b30c507
Pass min_stations through client_detect
calum-chamberlain Feb 7, 2024
0bb9ed5
Merge branch 'min-stations' of https://github.com/eqcorrscan/EQcorrsc…
calum-chamberlain Feb 9, 2024
2d97206
flake8
calum-chamberlain Feb 9, 2024
f7f0823
Add client_detect test for min stations
calum-chamberlain Feb 9, 2024
5 changes: 5 additions & 0 deletions CHANGES.md
@@ -1,4 +1,9 @@
## Current
- core.match_filter.tribe
- Add option to set minimum number of stations required to use a template in detect
(`min_stations` kwarg)

## 0.5.0
* core.match_filter.tribe
- Significant re-write of detect logic to take advantage of parallel steps (see #544)
- Significant re-structure of hidden functions.
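A minimal usage sketch of the new `min_stations` kwarg, assuming a tribe has already been read from disk and `st` is an ObsPy Stream covering the detection window; the file names and threshold values below are illustrative, not taken from this PR.

from obspy import read
from eqcorrscan.core.match_filter import Tribe

st = read("continuous_data.ms")    # hypothetical waveform file
tribe = Tribe().read("tribe.tgz")  # hypothetical tribe archive

# Only run templates that share at least four stations with the stream.
party = tribe.detect(
    stream=st, threshold=8.0, threshold_type="MAD", trig_int=6.0,
    min_stations=4)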
22 changes: 19 additions & 3 deletions eqcorrscan/core/match_filter/helpers/processes.py
@@ -388,14 +388,18 @@
break
Logger.debug("Getting stream from queue")
st = _get_and_check(input_stream_queue, poison_queue)
Logger.warning(st)
if st is None:
Logger.info("Ran out of streams, stopping processing")
break
elif isinstance(st, Poison):
Logger.error("Killed")
break
if len(st) == 0:
break
# if len(st) == 0:
# Logger.error("Empty stream provided")
# poison_queue.put_nowait(Poison(IndexError
# ("No matching channels between stream and ")))
# break
Logger.info(f"Processing stream:\n{st}")

# Process stream
@@ -445,6 +449,7 @@
output_queue: Queue,
poison_queue: Queue,
xcorr_func: str = None,
min_stations: int = 0,
):
"""
Prepare templates and stream for correlation.
@@ -472,6 +477,8 @@
Queue to check for poison, or put poison into if something goes awry
:param xcorr_func:
Name of correlation function backend to be used.
:param min_stations:
Minimum number of stations to run a template.
"""
if isinstance(templates, dict):
# We have been passed a db of template files on disk
@@ -525,16 +532,25 @@
f"Multiple channels in continuous data for "
f"{', '.join(_duplicate_sids)}")))
break
template_sids = {tr.id for template in templates for tr in template.st}
if len(template_sids.intersection(st_sids)) == 0:
poison_queue.put_nowait(Poison(
IndexError("No matching channels between templates and data")))
# Do the grouping for this stream
Logger.info(f"Grouping {len(templates)} templates into groups "
f"of {group_size} templates")
try:
template_groups = _group(sids=st_sids, templates=templates,
group_size=group_size, groups=groups)
group_size=group_size, groups=groups,
min_stations=min_stations)
except Exception as e:
Logger.error(e)
poison_queue.put_nowait(Poison(e))
break
if template_groups is None:
output_queue.put_nowait(None)
return

Logger.info(f"Grouped into {len(template_groups)} groups")
for i, template_group in enumerate(template_groups):
killed = _check_for_poison(poison_queue)
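The concurrent pipeline signals "no work" to downstream consumers with a None sentinel on the output queue; the early-exit path added to the preparation process can be pictured as the sketch below, with the function name and queue wiring simplified for illustration (this is not the library's internal code).

from multiprocessing import Queue

def prep_sketch(output_queue: Queue, template_groups):
    """Schematic of the new early-exit behaviour."""
    if template_groups is None:
        # Grouping found no template meeting min_stations: tell consumers
        # there is no work, using the same sentinel as normal end-of-work.
        output_queue.put_nowait(None)
        return
    for group in template_groups:
        output_queue.put_nowait(group)
    output_queue.put_nowait(None)  # normal end-of-work sentinel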
25 changes: 18 additions & 7 deletions eqcorrscan/core/match_filter/helpers/tribe.py
@@ -17,7 +17,7 @@
import numpy as np

from collections import defaultdict
from typing import List, Set
from typing import List, Set, Union
from timeit import default_timer

from concurrent.futures import ThreadPoolExecutor
@@ -261,36 +261,47 @@ def _group(
sids: Set[str],
templates: List[Template],
group_size: int,
groups: List[List[str]] = None
) -> List[List[Template]]:
groups: List[List[str]] = None,
min_stations: int = 0,
) -> Union[List[List[Template]], None]:
"""
Group templates either by seed id, or using pre-computed groups

:param sids: Seed IDs available in stream
:param templates: Templates to group
:param group_size: Maximum group size
:param groups: [Optional] List of List of template names in groups
:return: Groups of templates.
:param min_stations: Minimum number of stations to run a template.

:return: Groups of templates. None if no templates meet criteria
"""
Logger.info(f"Grouping for {sids}")
if groups:
Logger.info("Using pre-computed groups")
t_dict = {t.name: t for t in templates}
stations = {sid.split('.')[1] for sid in sids}
template_groups = []
for grp in groups:
template_group = [
t_dict.get(t_name) for t_name in grp
if t_name in t_dict.keys()]
if t_name in t_dict.keys() and
len({tr.stats.station for tr in
t_dict.get(t_name).st}.intersection(stations)
) >= max(1, min_stations)]
Logger.info(f"Dropping {len(grp) - len(template_group)} templates "
f"due to fewer than {min_stations} matched channels")
if len(template_group):
template_groups.append(template_group)
return template_groups
template_groups = group_templates_by_seedid(
templates=templates,
st_seed_ids=sids,
group_size=group_size)
group_size=group_size,
min_stations=min_stations)
if len(template_groups) == 1 and len(template_groups[0]) == 0:
Logger.error("No matching ids between stream and templates")
raise IndexError("No matching ids between stream and templates")
return None
# raise IndexError("No matching ids between stream and templates")
return template_groups


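The overlap test applied to each template in `_group` can be pictured as the predicate below; it assumes templates carry an ObsPy Stream on `.st` and that seed IDs follow the NET.STA.LOC.CHA convention, and it is a sketch of the logic rather than the function used in helpers/tribe.py.

def meets_min_stations(template, stream_seed_ids, min_stations=0):
    # Stations present in the continuous data (station is field 2 of a seed id).
    stream_stations = {sid.split('.')[1] for sid in stream_seed_ids}
    # Stations the template actually has traces for.
    template_stations = {tr.stats.station for tr in template.st}
    # Keep the template only if enough stations overlap (always at least one).
    return len(template_stations & stream_stations) >= max(1, min_stations)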
8 changes: 6 additions & 2 deletions eqcorrscan/core/match_filter/template.py
@@ -761,6 +761,7 @@ def group_templates_by_seedid(
templates: List[Template],
st_seed_ids: Set[str],
group_size: int,
min_stations: int = 0,
) -> List[List[Template]]:
"""
Group templates to reduce dissimilar traces
@@ -771,6 +772,9 @@
Seed ids in the stream to be matched with
:param group_size:
Maximum group size - will not exceed this size
:param min_stations:
Minimum number of overlapping stations between template
and stream to use the template for detection.

:return:
List of lists of templates grouped.
@@ -788,9 +792,9 @@
# Don't use templates that don't have any overlap with the stream
template_seed_ids = tuple(
(t_name, t_chans) for t_name, t_chans in template_seed_ids
if len(t_chans))
if len({sid.split('.')[1] for sid in t_chans}) >= max(1, min_stations))
Logger.info(f"Dropping {len(templates) - len(template_seed_ids)} "
f"templates due to no matched channels")
f"templates due to fewer than {min_stations} matched channels")
# We will need this dictionary at the end for getting the templates by id
template_dict = {t.name: t for t in templates}
# group_size can be None, in which case we don't actually need to group
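For the seed-id grouping path, the new argument is threaded through to `group_templates_by_seedid`; a hedged call sketch follows, reusing the `tribe` and `st` names from the earlier sketch, with the group size and station count chosen arbitrarily.

from eqcorrscan.core.match_filter.template import group_templates_by_seedid

template_groups = group_templates_by_seedid(
    templates=tribe.templates,         # assumes `tribe` from the earlier sketch
    st_seed_ids={tr.id for tr in st},  # seed ids available in the stream
    group_size=10,
    min_stations=3)                    # drop templates with < 3 overlapping stations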
27 changes: 20 additions & 7 deletions eqcorrscan/core/match_filter/tribe.py
@@ -657,7 +657,7 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
concurrent_processing=False, ignore_length=False,
ignore_bad_data=False, group_size=None, overlap="calculate",
full_peaks=False, save_progress=False, process_cores=None,
pre_processed=False, check_processing=True,
pre_processed=False, check_processing=True, min_stations=0,
**kwargs):
"""
Detect using a Tribe of templates within a continuous stream.
@@ -749,6 +749,10 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
:type check_processing: bool
:param check_processing:
Whether to check that all templates were processed the same.
:type min_stations: int
:param min_stations:
Minimum number of overlapping stations between template
and stream to use the template for detection.

:return:
:class:`eqcorrscan.core.match_filter.Party` of Families of
@@ -871,7 +875,7 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
ignore_bad_data, group_size, groups, sampling_rate, threshold,
threshold_type, save_progress, xcorr_func, concurrency, cores,
export_cccsums, parallel, peak_cores, trig_int, full_peaks,
plot, plotdir, plot_format,)
plot, plotdir, plot_format, min_stations,)

if concurrent_processing:
party = self._detect_concurrent(*args, **inner_kwargs)
@@ -899,7 +903,7 @@ def _detect_serial(
group_size, groups, sampling_rate, threshold, threshold_type,
save_progress, xcorr_func, concurrency, cores, export_cccsums,
parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format,
**kwargs
min_stations, **kwargs
):
""" Internal serial detect workflow. """
from eqcorrscan.core.match_filter.helpers.tribe import (
@@ -929,7 +933,10 @@
delta = st_chunk[0].stats.delta
template_groups = _group(
sids={tr.id for tr in st_chunk},
templates=self.templates, group_size=group_size, groups=groups)
templates=self.templates, group_size=group_size, groups=groups,
min_stations=min_stations)
if template_groups is None:
continue
for i, template_group in enumerate(template_groups):
templates = [_quick_copy_stream(t.st) for t in template_group]
template_names = [t.name for t in template_group]
@@ -986,7 +993,7 @@ def _detect_concurrent(
group_size, groups, sampling_rate, threshold, threshold_type,
save_progress, xcorr_func, concurrency, cores, export_cccsums,
parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format,
**kwargs
min_stations, **kwargs
):
""" Internal concurrent detect workflow. """
from eqcorrscan.core.match_filter.helpers.processes import (
@@ -1062,6 +1069,7 @@
output_queue=prepped_queue,
poison_queue=poison_queue,
xcorr_func=xcorr_func,
min_stations=min_stations,
),
name="PrepProcess"
)
@@ -1224,7 +1232,7 @@ def client_detect(self, client, starttime, endtime, threshold,
ignore_bad_data=False, group_size=None,
return_stream=False, full_peaks=False,
save_progress=False, process_cores=None, retries=3,
check_processing=True, **kwargs):
check_processing=True, min_stations=0, **kwargs):
"""
Detect using a Tribe of templates within a continuous stream.

@@ -1316,6 +1324,10 @@ def client_detect(self, client, starttime, endtime, threshold,
:param retries:
Number of attempts allowed for downloading - allows for transient
server issues.
:type min_stations: int
:param min_stations:
Minimum number of overlapping stations between template
and stream to use the template for detection.

:return:
:class:`eqcorrscan.core.match_filter.Party` of Families of
@@ -1425,7 +1437,8 @@ def client_detect(self, client, starttime, endtime, threshold,
process_cores=process_cores, save_progress=save_progress,
return_stream=return_stream, check_processing=False,
poison_queue=poison_queue, shutdown=False,
concurrent_processing=concurrent_processing, groups=groups)
concurrent_processing=concurrent_processing, groups=groups,
min_stations=min_stations)

if not concurrent_processing:
Logger.warning("Using concurrent_processing=True can be faster if"
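And the equivalent for client-based detection, assuming an FDSN client; the client name, time window, and thresholds are placeholders rather than values from this PR, and `tribe` is again the object from the first sketch.

from obspy import UTCDateTime
from obspy.clients.fdsn import Client

# Run client-based detection while requiring at least three overlapping
# stations per template before that template is used.
party = tribe.client_detect(
    client=Client("GEONET"), starttime=UTCDateTime(2024, 1, 1),
    endtime=UTCDateTime(2024, 1, 2),
    threshold=8.0, threshold_type="MAD", trig_int=6.0,
    min_stations=3)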
46 changes: 46 additions & 0 deletions eqcorrscan/tests/matched_filter/match_filter_test.py
@@ -729,6 +729,52 @@ def test_tribe_detect(self):
party=party, party_in=self.party, float_tol=0.05,
check_event=True)

def test_min_stations(self):
"""
Check that minimum stations is respected and templates
without sufficient stations are not run.
"""
local_tribe = self.tribe.copy()
# Drop some channels
local_tribe[0].st = local_tribe[0].st[0:-2]
for conc_proc in [False, True]:
party = local_tribe.detect(
stream=self.unproc_st, threshold=8.0, threshold_type='MAD',
trig_int=6.0, daylong=False, plot=False, min_stations=5,
parallel_process=False, concurrent_processing=conc_proc)
self.assertEqual(len(party), 3)

@pytest.mark.network
def test_min_stations_network(self):
"""
Check that minimum stations is respected and templates
without sufficient stations are not run.
"""
local_tribe = self.tribe.copy()
# Drop some channels
local_tribe[0].st = local_tribe[0].st[0:-2]
for conc_proc in [False, True]:
party = local_tribe.client_detect(
client=Client('NCEDC'), starttime=UTCDateTime(2004, 9, 28, 17),
endtime=UTCDateTime(2004, 9, 28, 18),
threshold=8.0, threshold_type='MAD',
trig_int=6.0, daylong=False, plot=False, min_stations=5,
parallel_process=False, concurrent_processing=conc_proc)
self.assertEqual(len(party), 3)

def test_no_stations(self):
"""
Check that no templates are run when min_stations exceeds the
number of stations available in the stream.
"""
local_tribe = self.tribe.copy()
for conc_proc in [False, True]:
party = local_tribe.detect(
stream=self.unproc_st, threshold=8.0, threshold_type='MAD',
trig_int=6.0, daylong=False, plot=False, min_stations=6,
parallel_process=False, concurrent_processing=conc_proc)
self.assertEqual(len(party), 0)

def test_tribe_detect_with_empty_streams(self):
"""
Compare the detect method for a tribe of one vs two templates and check
Expand Down